ConnectableObservable.js 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. import { Observable } from '../Observable';
  2. import { Subscription } from '../Subscription';
  3. import { refCount as higherOrderRefCount } from '../operators/refCount';
  4. import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
  5. import { hasLift } from '../util/lift';
  6. export class ConnectableObservable extends Observable {
  7. constructor(source, subjectFactory) {
  8. super();
  9. this.source = source;
  10. this.subjectFactory = subjectFactory;
  11. this._subject = null;
  12. this._refCount = 0;
  13. this._connection = null;
  14. if (hasLift(source)) {
  15. this.lift = source.lift;
  16. }
  17. }
  18. _subscribe(subscriber) {
  19. return this.getSubject().subscribe(subscriber);
  20. }
  21. getSubject() {
  22. const subject = this._subject;
  23. if (!subject || subject.isStopped) {
  24. this._subject = this.subjectFactory();
  25. }
  26. return this._subject;
  27. }
  28. _teardown() {
  29. this._refCount = 0;
  30. const { _connection } = this;
  31. this._subject = this._connection = null;
  32. _connection === null || _connection === void 0 ? void 0 : _connection.unsubscribe();
  33. }
  34. connect() {
  35. let connection = this._connection;
  36. if (!connection) {
  37. connection = this._connection = new Subscription();
  38. const subject = this.getSubject();
  39. connection.add(this.source.subscribe(createOperatorSubscriber(subject, undefined, () => {
  40. this._teardown();
  41. subject.complete();
  42. }, (err) => {
  43. this._teardown();
  44. subject.error(err);
  45. }, () => this._teardown())));
  46. if (connection.closed) {
  47. this._connection = null;
  48. connection = Subscription.EMPTY;
  49. }
  50. }
  51. return connection;
  52. }
  53. refCount() {
  54. return higherOrderRefCount()(this);
  55. }
  56. }
  57. //# sourceMappingURL=ConnectableObservable.js.map