// combineLatest.js (2.5 KB)
  1. import { Observable } from '../Observable';
  2. import { argsArgArrayOrObject } from '../util/argsArgArrayOrObject';
  3. import { from } from './from';
  4. import { identity } from '../util/identity';
  5. import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
  6. import { popResultSelector, popScheduler } from '../util/args';
  7. import { createObject } from '../util/createObject';
  8. import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
  9. import { executeSchedule } from '../util/executeSchedule';
  /**
   * Builds an Observable that combines the values of several input sources.
   *
   * The trailing scheduler and result selector are extracted from `args`
   * first (in that order), then the remaining arguments are normalised by
   * `argsArgArrayOrObject` into a list of sources plus optional `keys`.
   * NOTE(review): the exact accepted argument shapes are defined by the
   * helpers in `../util/args` and `../util/argsArgArrayOrObject` — confirm
   * there.
   *
   * @param {...*} args Input sources, optionally followed by a result
   *   selector and/or a scheduler (as recognised by the pop* helpers).
   * @returns {Observable} `from([], scheduler)` when there are no sources;
   *   otherwise an Observable driven by `combineLatestInit`, piped through
   *   `mapOneOrManyArgs(resultSelector)` when a result selector was given.
   */
  export function combineLatest(...args) {
    // Order matters: the scheduler is popped before the result selector.
    const scheduler = popScheduler(args);
    const resultSelector = popResultSelector(args);
    const { args: observables, keys } = argsArgArrayOrObject(args);

    if (observables.length === 0) {
      return from([], scheduler);
    }

    // When keys are present each emission is turned into an object built from
    // those keys; otherwise the values array is passed through unchanged.
    const valueTransform = keys
      ? (values) => createObject(keys, values)
      : identity;

    const result = new Observable(
      combineLatestInit(observables, scheduler, valueTransform)
    );

    if (resultSelector) {
      return result.pipe(mapOneOrManyArgs(resultSelector));
    }
    return result;
  }
  /**
   * Creates the subscribe function used to combine the latest values of
   * `observables`.
   *
   * On subscription, every entry of `observables` is converted with
   * `from(entry, scheduler)` and subscribed to. The most recent value of each
   * source is kept by index; once every source has produced at least one
   * value, each incoming value causes a copy of the current values array to be
   * passed through `valueTransform` and emitted. The subscriber is completed
   * when the last still-active source completes.
   *
   * Both the overall setup and each individual source subscription go through
   * `maybeSchedule`, so they are scheduled when a scheduler is supplied and run
   * synchronously otherwise.
   * NOTE(review): error forwarding is whatever `createOperatorSubscriber` does
   * when no error handler is passed — not visible in this file.
   *
   * @param {Array} observables Inputs accepted by `from`.
   * @param {*} [scheduler] Optional scheduler; falsy means run synchronously.
   * @param {Function} [valueTransform=identity] Maps the copied values array
   *   to the emitted value.
   * @returns {Function} A function taking the destination subscriber.
   */
  export function combineLatestInit(observables, scheduler, valueTransform = identity) {
    return (subscriber) => {
      const subscribeToAll = () => {
        const total = observables.length;
        // Latest value per source, addressed by the source's position.
        const latest = new Array(total);
        // Sources that have not completed yet.
        let stillActive = total;
        // Sources that have not emitted their first value yet.
        let awaitingFirst = total;

        for (let index = 0; index < total; index++) {
          maybeSchedule(scheduler, () => {
            const source = from(observables[index], scheduler);
            let seenValue = false;

            const onNext = (value) => {
              latest[index] = value;
              if (!seenValue) {
                seenValue = true;
                awaitingFirst--;
              }
              if (!awaitingFirst) {
                // Emit a copy so later updates do not mutate emitted arrays.
                subscriber.next(valueTransform(latest.slice()));
              }
            };

            const onComplete = () => {
              if (!--stillActive) {
                subscriber.complete();
              }
            };

            source.subscribe(
              createOperatorSubscriber(subscriber, onNext, onComplete)
            );
          }, subscriber);
        }
      };

      maybeSchedule(scheduler, subscribeToAll, subscriber);
    };
  }
  /**
   * Runs `execute` immediately when no scheduler is given; otherwise hands it
   * to `executeSchedule` together with the scheduler and `subscription`.
   *
   * @param {*} scheduler Scheduler to use; any falsy value means "run now".
   * @param {Function} execute The work to perform.
   * @param {*} subscription Passed through to `executeSchedule`; unused on
   *   the synchronous path.
   * @returns {void}
   */
  function maybeSchedule(scheduler, execute, subscription) {
    if (!scheduler) {
      execute();
      return;
    }
    executeSchedule(subscription, scheduler, execute);
  }
  62. //# sourceMappingURL=combineLatest.js.map