bufferToggle.js 1.3 KB

123456789101112131415161718192021222324252627282930313233
  1. import { Subscription } from '../Subscription';
  2. import { operate } from '../util/lift';
  3. import { innerFrom } from '../observable/innerFrom';
  4. import { createOperatorSubscriber } from './OperatorSubscriber';
  5. import { noop } from '../util/noop';
  6. import { arrRemove } from '../util/arrRemove';
  /**
   * Collects source values into arrays whose lifetimes are driven by two
   * notifiers: each value from `openings` starts a new array, and an emission
   * from the notifier built by `closingSelector` for that value ends it.
   *
   * While an array is open, every source value is appended to it, so arrays
   * opened at overlapping times share values. When the source completes, any
   * arrays still open are emitted in the order they were opened, followed by
   * completion of the downstream subscriber.
   *
   * @param {*} openings - Input passed to `innerFrom`; each value it emits
   *   opens a fresh buffer.
   * @param {Function} closingSelector - Called with the opening value; its
   *   result is passed to `innerFrom`, and an emission from that notifier
   *   removes the matching buffer from the open set and sends it downstream.
   * @returns {*} The value returned by `operate` for the init callback below.
   */
  export function bufferToggle(openings, closingSelector) {
    return operate((source, subscriber) => {
      // Buffers that have been opened but not yet emitted, in opening order.
      const openBuffers = [];

      // Runs for every value emitted by the openings notifier.
      const startBuffer = (openValue) => {
        const collected = [];
        openBuffers.push(collected);

        // Dedicated subscription so the closing notifier can be torn down
        // as soon as it has closed this buffer.
        const closerTeardown = new Subscription();

        const finishBuffer = () => {
          arrRemove(openBuffers, collected);
          subscriber.next(collected);
          closerTeardown.unsubscribe();
        };

        const closingNotifier = innerFrom(closingSelector(openValue));
        const closingSubscriber = createOperatorSubscriber(subscriber, finishBuffer, noop);
        closerTeardown.add(closingNotifier.subscribe(closingSubscriber));
      };

      // Runs for every source value: record it in each buffer currently open.
      const appendToAll = (value) => {
        openBuffers.forEach((collected) => {
          collected.push(value);
        });
      };

      // Runs when the source completes: drain whatever is still open, then
      // complete downstream. The length is re-checked on every pass.
      const flushAndComplete = () => {
        while (openBuffers.length !== 0) {
          const leftover = openBuffers.shift();
          subscriber.next(leftover);
        }
        subscriber.complete();
      };

      // The openings notifier is subscribed before the source.
      const openingsNotifier = innerFrom(openings);
      openingsNotifier.subscribe(createOperatorSubscriber(subscriber, startBuffer, noop));

      source.subscribe(createOperatorSubscriber(subscriber, appendToAll, flushAndComplete));
    });
  }
  33. //# sourceMappingURL=bufferToggle.js.map