bindCallbackInternals.js 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. import { isScheduler } from '../util/isScheduler';
  2. import { Observable } from '../Observable';
  3. import { subscribeOn } from '../operators/subscribeOn';
  4. import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
  5. import { observeOn } from '../operators/observeOn';
  6. import { AsyncSubject } from '../AsyncSubject';
  /**
   * Builds a function that, when called with arguments, returns an Observable
   * wrapping a single invocation of `callbackFunc`.
   *
   * `callbackFunc` is invoked lazily, on the first subscription only, with the
   * supplied arguments followed by one extra trailing callback. Every
   * subscriber is attached to the same subject, so the wrapped function runs
   * at most once per returned Observable.
   *
   * @param {boolean} isNodeStyle When truthy, the first argument handed to the
   *   trailing callback is treated as an error slot: a non-nullish value is
   *   forwarded to `subject.error`, otherwise it is dropped from the results.
   * @param {Function} callbackFunc Function to wrap; called with the caller's
   *   `this`, the caller's arguments, and the trailing callback.
   * @param {*} [resultSelector] Either a scheduler (as decided by
   *   `isScheduler`), in which case it is used as `scheduler`, or a value that
   *   is handed to `mapOneOrManyArgs` to transform the emitted result.
   * @param {*} [scheduler] When truthy, the resulting Observable is piped
   *   through `subscribeOn(scheduler)` and `observeOn(scheduler)`.
   * @returns {Function} A function taking the wrapped function's arguments
   *   (minus the callback) and returning an Observable.
   */
  export function bindCallbackInternals(isNodeStyle, callbackFunc, resultSelector, scheduler) {
    if (resultSelector) {
      if (!isScheduler(resultSelector)) {
        // A result selector was supplied: build the plain variant (forwarding
        // the scheduler, if any) and map its output through the selector.
        return function (...args) {
          const bound = bindCallbackInternals(isNodeStyle, callbackFunc, scheduler);
          const source = bound.apply(this, args);
          return source.pipe(mapOneOrManyArgs(resultSelector));
        };
      }
      // The third argument was really the scheduler.
      scheduler = resultSelector;
    }

    if (scheduler) {
      // Compose scheduling on top of the scheduler-less variant.
      return function (...args) {
        const bound = bindCallbackInternals(isNodeStyle, callbackFunc);
        const source = bound.apply(this, args);
        return source.pipe(subscribeOn(scheduler), observeOn(scheduler));
      };
    }

    return function (...args) {
      // One subject per call of the bound function; all subscribers share it.
      const subject = new AsyncSubject();
      // Flips to true once `callbackFunc` has been invoked.
      let hasInvoked = false;

      return new Observable((subscriber) => {
        const subscription = subject.subscribe(subscriber);
        if (hasInvoked) {
          return subscription;
        }
        hasInvoked = true;

        // True once the call to `callbackFunc` has returned; a callback that
        // fires after that point is asynchronous.
        let callReturned = false;
        // True once the callback has delivered a (non-error) result.
        let hasResult = false;

        const onCallback = (...results) => {
          if (isNodeStyle) {
            const err = results.shift();
            if (err != null) {
              subject.error(err);
              return;
            }
          }
          // Several results are emitted as an array, a single result as-is
          // (`undefined` when the callback received nothing).
          subject.next(results.length > 1 ? results : results[0]);
          hasResult = true;
          // When fired synchronously, completion is left to the code after the
          // call below — presumably so an exception thrown by `callbackFunc`
          // after it invoked the callback is not preceded by a completion.
          if (callReturned) {
            subject.complete();
          }
        };

        callbackFunc.apply(this, [...args, onCallback]);

        if (hasResult) {
          subject.complete();
        }
        callReturned = true;
        return subscription;
      });
    };
  }
  62. //# sourceMappingURL=bindCallbackInternals.js.map