// share.js (2.8 KB)
  1. import { innerFrom } from '../observable/innerFrom';
  2. import { Subject } from '../Subject';
  3. import { SafeSubscriber } from '../Subscriber';
  4. import { operate } from '../util/lift';
  /**
   * Builds an operator function that lets every subscriber share a single
   * subscription to the source, relayed through a subject created by
   * `options.connector`.
   *
   * @param {object} [options] Configuration; every field is optional.
   * @param {Function} [options.connector] Factory for the relay subject.
   *   Defaults to `() => new Subject()`. The returned object is used via
   *   `subscribe`, `next`, `error` and `complete`.
   * @param {boolean|Function} [options.resetOnError=true] Reset policy applied
   *   when the source errors. A function is called with the error and its
   *   result is handed to `innerFrom`; the reset happens on its first value.
   * @param {boolean|Function} [options.resetOnComplete=true] Reset policy
   *   applied when the source completes.
   * @param {boolean|Function} [options.resetOnRefCountZero=true] Reset policy
   *   applied when the subscriber count drops to zero before the source has
   *   errored or completed.
   * @returns {Function} A function that takes the source (`wrapperSource`) and
   *   returns whatever `operate(...)` produces for it.
   */
  export function share(options = {}) {
    const { connector = () => new Subject(), resetOnError = true, resetOnComplete = true, resetOnRefCountZero = true } = options;
    return (wrapperSource) => {
      // State shared by all subscribers of this one operator application.
      let connection; // SafeSubscriber currently subscribed to the source
      let resetConnection; // pending notifier-driven reset, if any
      let subject; // relay obtained from connector()
      let refCount = 0;
      let hasCompleted = false;
      let hasErrored = false;

      // Abort a reset that has been scheduled but has not fired yet.
      const cancelReset = () => {
        if (resetConnection != null) {
          resetConnection.unsubscribe();
        }
        resetConnection = undefined;
      };

      // Forget the current connection and subject so the next subscriber
      // starts from scratch. Does not unsubscribe from the source.
      const reset = () => {
        cancelReset();
        connection = subject = undefined;
        hasCompleted = hasErrored = false;
      };

      // Reset first, then tear down the previous source subscription; the
      // local copy is needed because reset() clears `connection`.
      const resetAndUnsubscribe = () => {
        const conn = connection;
        reset();
        if (conn != null) {
          conn.unsubscribe();
        }
      };

      return operate((source, subscriber) => {
        refCount++;
        if (!hasErrored && !hasCompleted) {
          // A new subscriber arrived while the source is still live, so a
          // pending ref-count-zero reset must not go through.
          cancelReset();
        }

        // Reuse the existing relay subject or create one on demand.
        if (subject == null) {
          subject = connector();
        }
        const dest = subject;

        subscriber.add(() => {
          refCount--;
          if (refCount === 0 && !hasErrored && !hasCompleted) {
            resetConnection = handleReset(resetAndUnsubscribe, resetOnRefCountZero);
          }
        });

        dest.subscribe(subscriber);

        // Connect to the source only once, and only if this subscriber is
        // still counted. NOTE(review): presumably refCount can already be 0
        // here when dest.subscribe() tears the subscriber down synchronously.
        if (!connection && refCount > 0) {
          connection = new SafeSubscriber({
            next: (value) => dest.next(value),
            error: (err) => {
              hasErrored = true;
              cancelReset();
              resetConnection = handleReset(reset, resetOnError, err);
              // `dest` was captured above, so the error still reaches the
              // current subscribers even if reset() already ran.
              dest.error(err);
            },
            complete: () => {
              hasCompleted = true;
              cancelReset();
              resetConnection = handleReset(reset, resetOnComplete);
              dest.complete();
            },
          });
          innerFrom(source).subscribe(connection);
        }
      })(wrapperSource);
    };
  }
  /**
   * Applies a reset policy.
   *
   * @param {Function} reset Callback that performs the reset.
   * @param {boolean|Function} on The policy: `true` runs `reset` immediately,
   *   `false` never resets, and a function is called with `args` to obtain a
   *   notifier (anything `innerFrom` accepts) whose first value triggers
   *   `reset`.
   * @param {...*} args Arguments forwarded to `on` when it is a function.
   * @returns {*} `undefined` for the boolean policies; otherwise the value
   *   returned by subscribing to the notifier, which callers in this file
   *   cancel through its `unsubscribe()` method.
   */
  function handleReset(reset, on, ...args) {
    if (on === true) {
      reset();
      return;
    }
    if (on === false) {
      return;
    }
    // Function policy: wait for the notifier's first value, and detach from
    // the notifier before resetting so the reset happens at most once.
    const onSubscriber = new SafeSubscriber({
      next: () => {
        onSubscriber.unsubscribe();
        reset();
      },
    });
    return innerFrom(on(...args)).subscribe(onSubscriber);
  }
  //# sourceMappingURL=share.js.map