// mergeInternals.js (2.4 KB)
  1. import { innerFrom } from '../observable/innerFrom';
  2. import { executeSchedule } from '../util/executeSchedule';
  3. import { createOperatorSubscriber } from './OperatorSubscriber';
  /**
   * Shared subscription logic for merge-style operators.
   *
   * Each value from `source` is passed to `project`; the result goes through
   * `innerFrom` and is subscribed to. While `active` (the number of running
   * inner subscriptions) is not below `concurrent`, further source values are
   * queued in `buffer` and started later, in arrival order, as running inner
   * subscriptions complete. `subscriber.complete()` is called once the source
   * has completed, the buffer is empty and no inner subscription is active.
   *
   * @param source Outer source; only its `subscribe` method is used here.
   * @param subscriber Destination receiving `next`, `error` and `complete` calls.
   * @param project Called as `project(value, index)`, where `index` counts the
   *   projections performed so far, starting at 0.
   * @param concurrent Upper bound on simultaneously active inner subscriptions.
   * @param onBeforeNext Optional callback invoked with every inner value before
   *   that value is forwarded or re-projected.
   * @param expand When truthy, each value is emitted to `subscriber` right
   *   before it is projected, and inner values are fed back through the same
   *   project/buffer path instead of being emitted directly.
   * @param innerSubScheduler Optional; when truthy, buffered values are started
   *   through `executeSchedule` instead of being started inline.
   * @param additionalFinalizer Optional callback run by the returned function.
   * @returns A function that calls `additionalFinalizer` if one was supplied.
   */
  export function mergeInternals(source, subscriber, project, concurrent, onBeforeNext, expand, innerSubScheduler, additionalFinalizer) {
    // Values that arrived while all `concurrent` slots were taken.
    var buffer = [];
    // Number of inner subscriptions that have started and not yet completed.
    var active = 0;
    // Second argument for the next `project` call.
    var index = 0;
    // Set once the outer source has signalled completion.
    var isComplete = false;

    // Completes the destination only when nothing is running or waiting.
    function checkComplete() {
      if (isComplete && buffer.length === 0 && active === 0) {
        subscriber.complete();
      }
    }

    // Starts the value right away if a slot is free, otherwise queues it.
    // The ternary result is returned as in the original implementation.
    function outerNext(value) {
      return active < concurrent ? doInnerSub(value) : buffer.push(value);
    }

    // Starts one previously buffered value, via the scheduler when one is set.
    function startBuffered(bufferedValue) {
      if (innerSubScheduler) {
        executeSchedule(subscriber, innerSubScheduler, function () {
          doInnerSub(bufferedValue);
        });
      }
      else {
        doInnerSub(bufferedValue);
      }
    }

    // Projects `value` and subscribes to the resulting inner source.
    function doInnerSub(value) {
      if (expand) {
        subscriber.next(value);
      }
      active++;
      var innerComplete = false;
      var inner = innerFrom(project(value, index++));
      // NOTE(review): the callback roles below (next, complete, error, finalize)
      // are inferred from how the callbacks are written; confirm against
      // createOperatorSubscriber.
      inner.subscribe(createOperatorSubscriber(subscriber, function (innerValue) {
        if (onBeforeNext != null) {
          onBeforeNext(innerValue);
        }
        if (expand) {
          outerNext(innerValue);
        }
        else {
          subscriber.next(innerValue);
        }
      }, function () {
        innerComplete = true;
      }, undefined, function () {
        // Only an inner source that actually completed frees its slot here.
        if (!innerComplete) {
          return;
        }
        try {
          active--;
          // NOTE(review): with `innerSubScheduler` set, `active` is only
          // incremented once the scheduled callback runs doInnerSub. If
          // executeSchedule defers that callback, this loop empties the whole
          // buffer and checkComplete() may see an idle state while scheduled
          // work is still pending. Confirm this is intended.
          while (buffer.length && active < concurrent) {
            startBuffered(buffer.shift());
          }
          checkComplete();
        }
        catch (err) {
          // A failure while starting a buffered value (for example `project`
          // throwing) is reported to the destination.
          subscriber.error(err);
        }
      }));
    }

    source.subscribe(createOperatorSubscriber(subscriber, outerNext, function () {
      isComplete = true;
      checkComplete();
    }));

    return function () {
      if (additionalFinalizer != null) {
        additionalFinalizer();
      }
    };
  }
  61. //# sourceMappingURL=mergeInternals.js.map