bufferTime.js 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. import { Subscription } from '../Subscription';
  2. import { operate } from '../util/lift';
  3. import { createOperatorSubscriber } from './OperatorSubscriber';
  4. import { arrRemove } from '../util/arrRemove';
  5. import { asyncScheduler } from '../scheduler/async';
  6. import { popScheduler } from '../util/args';
  7. import { executeSchedule } from '../util/executeSchedule';
  /**
   * Collects values from the source into arrays ("buffers") and forwards each
   * buffer downstream when its scheduled time span runs out, when it reaches
   * `maxBufferSize`, or when the source completes.
   *
   * @param {number} bufferTimeSpan Delay passed to `executeSchedule` for the
   *   task that emits a buffer.
   * @param {...*} otherArgs Optional values, in order: `bufferCreationInterval`
   *   (delay of the repeating task that opens additional buffers; when it is
   *   nullish or negative a new buffer is opened each time one is emitted
   *   instead), `maxBufferSize` (a buffer is emitted as soon as it holds this
   *   many values; any falsy value means "no limit"), and a scheduler, which
   *   is taken from the arguments by `popScheduler` (presumably only when it
   *   is the last argument — confirm against `../util/args`).
   * @returns The operator function produced by `operate`.
   */
  export function bufferTime(bufferTimeSpan, ...otherArgs) {
    // popScheduler must run before the positional reads below: it is handed
    // the argument list itself and may take a scheduler out of it.
    const explicitScheduler = popScheduler(otherArgs);
    const scheduler = explicitScheduler != null ? explicitScheduler : asyncScheduler;
    const intervalArg = otherArgs[0];
    const bufferCreationInterval = intervalArg != null ? intervalArg : null;
    // `||` (not a nullish check) is intentional: 0 also falls back to Infinity.
    const maxBufferSize = otherArgs[1] || Infinity;

    return operate((source, subscriber) => {
      // Buffers that are currently open, oldest first. The finalizer passed to
      // createOperatorSubscriber below replaces this with null.
      let bufferRecords = [];
      // True when emitting a buffer should immediately open the next one.
      let restartOnEmit = false;

      // Closes one buffer: cancels its schedule, forgets it, and sends it on.
      const emit = (record) => {
        const { buffer, subs } = record;
        subs.unsubscribe();
        arrRemove(bufferRecords, record);
        subscriber.next(buffer);
        if (restartOnEmit) {
          startBuffer();
        }
      };

      // Opens a new buffer and schedules its emission after bufferTimeSpan.
      const startBuffer = () => {
        if (!bufferRecords) {
          // Already finalized; nothing may be opened any more.
          return;
        }
        const subs = new Subscription();
        subscriber.add(subs);
        const record = { buffer: [], subs };
        bufferRecords.push(record);
        executeSchedule(subs, scheduler, () => emit(record), bufferTimeSpan);
      };

      if (bufferCreationInterval !== null && bufferCreationInterval >= 0) {
        // Buffers are opened by a repeating scheduled task.
        executeSchedule(subscriber, scheduler, startBuffer, bufferCreationInterval, true);
      } else {
        restartOnEmit = true;
      }
      // The first buffer is always opened right away.
      startBuffer();

      const bufferTimeSubscriber = createOperatorSubscriber(
        subscriber,
        (value) => {
          // Iterate over a snapshot: emit() removes entries from bufferRecords
          // (and may append a new one) while the loop is running.
          for (const record of bufferRecords.slice()) {
            record.buffer.push(value);
            if (maxBufferSize <= record.buffer.length) {
              emit(record);
            }
          }
        },
        () => {
          // Flush whatever is still open, oldest first. bufferRecords is
          // re-checked on every pass because it can be nulled by finalization.
          while (bufferRecords != null && bufferRecords.length) {
            subscriber.next(bufferRecords.shift().buffer);
          }
          if (bufferTimeSubscriber != null) {
            bufferTimeSubscriber.unsubscribe();
          }
          subscriber.complete();
          subscriber.unsubscribe();
        },
        undefined,
        () => (bufferRecords = null)
      );
      source.subscribe(bufferTimeSubscriber);
    });
  }
  61. //# sourceMappingURL=bufferTime.js.map