zip.js 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. import { Observable } from '../Observable';
  2. import { innerFrom } from './innerFrom';
  3. import { argsOrArgArray } from '../util/argsOrArgArray';
  4. import { EMPTY } from './empty';
  5. import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
  6. import { popResultSelector } from '../util/args';
  /**
   * Combines several input sources into one Observable that emits values
   * grouped by arrival position: the first value of every source together,
   * then the second value of every source, and so on.
   *
   * Each emission is an array holding one value per source (in source order),
   * or, when a result selector was supplied, whatever that selector returns
   * when called with those values spread as arguments.
   *
   * The output completes as soon as any source has completed and has no
   * buffered values left, since no further full group can ever be formed.
   *
   * @param {...*} args The input sources. NOTE(review): `popResultSelector`
   *   presumably strips a trailing selector function from `args`, and
   *   `argsOrArgArray` presumably accepts either spread sources or a single
   *   array of sources — confirm against those helpers.
   * @returns {Observable} The zipped Observable, or `EMPTY` when no sources
   *   remain after argument normalization.
   */
  export function zip(...args) {
    const project = popResultSelector(args);
    const inputs = argsOrArgArray(args);

    // Nothing to combine: hand back the shared EMPTY observable.
    if (!inputs.length) {
      return EMPTY;
    }

    return new Observable((subscriber) => {
      // One FIFO queue of pending values per input, plus a parallel
      // "has this input completed?" flag per input.
      let queues = inputs.map(() => []);
      let finished = inputs.map(() => false);

      // Drops the references to the per-subscription state on teardown.
      const releaseState = () => {
        queues = finished = null;
      };
      subscriber.add(releaseState);

      // Stop subscribing to further inputs if the subscriber closed while an
      // earlier input was being subscribed to.
      for (let index = 0; !subscriber.closed && index < inputs.length; index++) {
        const handleValue = (value) => {
          queues[index].push(value);

          // A group can only be emitted once every input has a pending value.
          if (!queues.every((queue) => queue.length)) {
            return;
          }

          const group = queues.map((queue) => queue.shift());
          subscriber.next(project ? project(...group) : group);

          // If any input is both drained and completed, no further group can
          // ever be assembled, so finish now.
          if (queues.some((queue, i) => !queue.length && finished[i])) {
            subscriber.complete();
          }
        };

        const handleComplete = () => {
          finished[index] = true;
          // A completed input with nothing buffered ends the whole zip;
          // otherwise its buffered values may still be paired up later.
          if (!queues[index].length) {
            subscriber.complete();
          }
        };

        innerFrom(inputs[index]).subscribe(
          createOperatorSubscriber(subscriber, handleValue, handleComplete)
        );
      }

      // Returned as the subscription's teardown as well, mirroring the
      // registration made via `subscriber.add` above.
      return releaseState;
    });
  }
  38. //# sourceMappingURL=zip.js.map