forkJoin.js 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. import { Observable } from '../Observable';
  2. import { argsArgArrayOrObject } from '../util/argsArgArrayOrObject';
  3. import { innerFrom } from './innerFrom';
  4. import { popResultSelector } from '../util/args';
  5. import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
  6. import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
  7. import { createObject } from '../util/createObject';
  8. export function forkJoin() {
  9. var args = [];
  10. for (var _i = 0; _i < arguments.length; _i++) {
  11. args[_i] = arguments[_i];
  12. }
  13. var resultSelector = popResultSelector(args);
  14. var _a = argsArgArrayOrObject(args), sources = _a.args, keys = _a.keys;
  15. var result = new Observable(function (subscriber) {
  16. var length = sources.length;
  17. if (!length) {
  18. subscriber.complete();
  19. return;
  20. }
  21. var values = new Array(length);
  22. var remainingCompletions = length;
  23. var remainingEmissions = length;
  24. var _loop_1 = function (sourceIndex) {
  25. var hasValue = false;
  26. innerFrom(sources[sourceIndex]).subscribe(createOperatorSubscriber(subscriber, function (value) {
  27. if (!hasValue) {
  28. hasValue = true;
  29. remainingEmissions--;
  30. }
  31. values[sourceIndex] = value;
  32. }, function () { return remainingCompletions--; }, undefined, function () {
  33. if (!remainingCompletions || !hasValue) {
  34. if (!remainingEmissions) {
  35. subscriber.next(keys ? createObject(keys, values) : values);
  36. }
  37. subscriber.complete();
  38. }
  39. }));
  40. };
  41. for (var sourceIndex = 0; sourceIndex < length; sourceIndex++) {
  42. _loop_1(sourceIndex);
  43. }
  44. });
  45. return resultSelector ? result.pipe(mapOneOrManyArgs(resultSelector)) : result;
  46. }
  47. //# sourceMappingURL=forkJoin.js.map