// windowTime.js (2.5 KB)
  1. import { Subject } from '../Subject';
  2. import { asyncScheduler } from '../scheduler/async';
  3. import { Subscription } from '../Subscription';
  4. import { operate } from '../util/lift';
  5. import { createOperatorSubscriber } from './OperatorSubscriber';
  6. import { arrRemove } from '../util/arrRemove';
  7. import { popScheduler } from '../util/args';
  8. import { executeSchedule } from '../util/executeSchedule';
  /**
   * Creates an operator that distributes the source's values into time-limited
   * "window" Subjects and emits each window (via `asObservable()`) downstream.
   *
   * Every open window receives every source value. A window is closed when its
   * scheduled close task fires or once it has received `maxWindowSize` values.
   * When the source completes or errors, the notification is forwarded to every
   * open window, then to the downstream subscriber, which is then unsubscribed.
   *
   * @param {number} windowTimeSpan Delay handed to `executeSchedule` for the task
   *   that closes each window (units are whatever the scheduler uses).
   * @param {...*} otherArgs Optional trailing arguments, read positionally after
   *   `popScheduler` has processed the list:
   *   - `otherArgs[0]`: window creation interval. When it is null/undefined or
   *     negative, a new window is instead opened each time one closes.
   *   - `otherArgs[1]`: maximum number of values per window. Any falsy value
   *     (including `0`) means "unlimited" (`Infinity`).
   *   - scheduler: whatever `popScheduler` returns; falls back to `asyncScheduler`
   *     when that is null/undefined.
   * @returns {*} The result of `operate(...)` for the initializer below
   *   (presumably an operator function -- confirm against `../util/lift`).
   */
  export function windowTime(windowTimeSpan, ...otherArgs) {
    // NOTE(review): popScheduler presumably removes a trailing scheduler from
    // `otherArgs`; the positional reads below must stay after this call.
    const poppedScheduler = popScheduler(otherArgs);
    const scheduler = poppedScheduler !== null && poppedScheduler !== undefined ? poppedScheduler : asyncScheduler;
    const requestedInterval = otherArgs[0];
    const windowCreationInterval = requestedInterval !== null && requestedInterval !== undefined ? requestedInterval : null;
    // Deliberately `||`: a falsy size (0, undefined, null) means "no limit".
    const maxWindowSize = otherArgs[1] || Infinity;
    return operate((source, subscriber) => {
      // Open windows, oldest first. Set to null on teardown so that late calls
      // to startWindow become no-ops.
      let windowRecords = [];
      // True when windows are opened back-to-back instead of on an interval.
      let restartOnClose = false;
      // Completes one window, releases its close task and forgets the record.
      const closeWindow = (record) => {
        const { window, subs } = record;
        window.complete();
        subs.unsubscribe();
        arrRemove(windowRecords, record);
        if (restartOnClose) {
          startWindow();
        }
      };
      // Opens a new window, emits it downstream and schedules its closing.
      const startWindow = () => {
        if (!windowRecords) {
          return;
        }
        const subs = new Subscription();
        subscriber.add(subs);
        const window = new Subject();
        const record = { window, subs, seen: 0 };
        windowRecords.push(record);
        subscriber.next(window.asObservable());
        executeSchedule(subs, scheduler, () => closeWindow(record), windowTimeSpan);
      };
      if (windowCreationInterval !== null && windowCreationInterval >= 0) {
        // NOTE(review): the trailing `true` presumably asks executeSchedule to
        // repeat the task -- confirm in `../util/executeSchedule`.
        executeSchedule(subscriber, scheduler, startWindow, windowCreationInterval, true);
      } else {
        restartOnClose = true;
      }
      // The first window is opened immediately.
      startWindow();
      // Iterates over a snapshot because callbacks may add or remove records.
      const loop = (cb) => windowRecords.slice().forEach(cb);
      // Forwards a terminal notification to every open window, then downstream.
      const terminate = (cb) => {
        loop(({ window }) => cb(window));
        cb(subscriber);
        subscriber.unsubscribe();
      };
      const handleNext = (value) => {
        loop((record) => {
          record.window.next(value);
          record.seen += 1;
          if (maxWindowSize <= record.seen) {
            closeWindow(record);
          }
        });
      };
      const handleComplete = () => terminate((consumer) => consumer.complete());
      const handleError = (err) => terminate((consumer) => consumer.error(err));
      source.subscribe(createOperatorSubscriber(subscriber, handleNext, handleComplete, handleError));
      // Teardown: drop the records so startWindow stops opening windows.
      return () => {
        windowRecords = null;
      };
    });
  }
  63. //# sourceMappingURL=windowTime.js.map