forkJoin.js 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. import { Observable } from '../Observable';
  2. import { argsArgArrayOrObject } from '../util/argsArgArrayOrObject';
  3. import { innerFrom } from './innerFrom';
  4. import { popResultSelector } from '../util/args';
  5. import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
  6. import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
  7. import { createObject } from '../util/createObject';
  8. export function forkJoin(...args) {
  9. const resultSelector = popResultSelector(args);
  10. const { args: sources, keys } = argsArgArrayOrObject(args);
  11. const result = new Observable((subscriber) => {
  12. const { length } = sources;
  13. if (!length) {
  14. subscriber.complete();
  15. return;
  16. }
  17. const values = new Array(length);
  18. let remainingCompletions = length;
  19. let remainingEmissions = length;
  20. for (let sourceIndex = 0; sourceIndex < length; sourceIndex++) {
  21. let hasValue = false;
  22. innerFrom(sources[sourceIndex]).subscribe(createOperatorSubscriber(subscriber, (value) => {
  23. if (!hasValue) {
  24. hasValue = true;
  25. remainingEmissions--;
  26. }
  27. values[sourceIndex] = value;
  28. }, () => remainingCompletions--, undefined, () => {
  29. if (!remainingCompletions || !hasValue) {
  30. if (!remainingEmissions) {
  31. subscriber.next(keys ? createObject(keys, values) : values);
  32. }
  33. subscriber.complete();
  34. }
  35. }));
  36. }
  37. });
  38. return resultSelector ? result.pipe(mapOneOrManyArgs(resultSelector)) : result;
  39. }
  40. //# sourceMappingURL=forkJoin.js.map