WebSocketSubject.js 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. import { Subject, AnonymousSubject } from '../../Subject';
  2. import { Subscriber } from '../../Subscriber';
  3. import { Observable } from '../../Observable';
  4. import { Subscription } from '../../Subscription';
  5. import { ReplaySubject } from '../../ReplaySubject';
  6. const DEFAULT_WEBSOCKET_CONFIG = {
  7. url: '',
  8. deserializer: (e) => JSON.parse(e.data),
  9. serializer: (value) => JSON.stringify(value),
  10. };
  11. const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT = 'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';
  12. export class WebSocketSubject extends AnonymousSubject {
  13. constructor(urlConfigOrSource, destination) {
  14. super();
  15. this._socket = null;
  16. if (urlConfigOrSource instanceof Observable) {
  17. this.destination = destination;
  18. this.source = urlConfigOrSource;
  19. }
  20. else {
  21. const config = (this._config = Object.assign({}, DEFAULT_WEBSOCKET_CONFIG));
  22. this._output = new Subject();
  23. if (typeof urlConfigOrSource === 'string') {
  24. config.url = urlConfigOrSource;
  25. }
  26. else {
  27. for (const key in urlConfigOrSource) {
  28. if (urlConfigOrSource.hasOwnProperty(key)) {
  29. config[key] = urlConfigOrSource[key];
  30. }
  31. }
  32. }
  33. if (!config.WebSocketCtor && WebSocket) {
  34. config.WebSocketCtor = WebSocket;
  35. }
  36. else if (!config.WebSocketCtor) {
  37. throw new Error('no WebSocket constructor can be found');
  38. }
  39. this.destination = new ReplaySubject();
  40. }
  41. }
  42. lift(operator) {
  43. const sock = new WebSocketSubject(this._config, this.destination);
  44. sock.operator = operator;
  45. sock.source = this;
  46. return sock;
  47. }
  48. _resetState() {
  49. this._socket = null;
  50. if (!this.source) {
  51. this.destination = new ReplaySubject();
  52. }
  53. this._output = new Subject();
  54. }
  55. multiplex(subMsg, unsubMsg, messageFilter) {
  56. const self = this;
  57. return new Observable((observer) => {
  58. try {
  59. self.next(subMsg());
  60. }
  61. catch (err) {
  62. observer.error(err);
  63. }
  64. const subscription = self.subscribe({
  65. next: (x) => {
  66. try {
  67. if (messageFilter(x)) {
  68. observer.next(x);
  69. }
  70. }
  71. catch (err) {
  72. observer.error(err);
  73. }
  74. },
  75. error: (err) => observer.error(err),
  76. complete: () => observer.complete(),
  77. });
  78. return () => {
  79. try {
  80. self.next(unsubMsg());
  81. }
  82. catch (err) {
  83. observer.error(err);
  84. }
  85. subscription.unsubscribe();
  86. };
  87. });
  88. }
  89. _connectSocket() {
  90. const { WebSocketCtor, protocol, url, binaryType } = this._config;
  91. const observer = this._output;
  92. let socket = null;
  93. try {
  94. socket = protocol ? new WebSocketCtor(url, protocol) : new WebSocketCtor(url);
  95. this._socket = socket;
  96. if (binaryType) {
  97. this._socket.binaryType = binaryType;
  98. }
  99. }
  100. catch (e) {
  101. observer.error(e);
  102. return;
  103. }
  104. const subscription = new Subscription(() => {
  105. this._socket = null;
  106. if (socket && socket.readyState === 1) {
  107. socket.close();
  108. }
  109. });
  110. socket.onopen = (evt) => {
  111. const { _socket } = this;
  112. if (!_socket) {
  113. socket.close();
  114. this._resetState();
  115. return;
  116. }
  117. const { openObserver } = this._config;
  118. if (openObserver) {
  119. openObserver.next(evt);
  120. }
  121. const queue = this.destination;
  122. this.destination = Subscriber.create((x) => {
  123. if (socket.readyState === 1) {
  124. try {
  125. const { serializer } = this._config;
  126. socket.send(serializer(x));
  127. }
  128. catch (e) {
  129. this.destination.error(e);
  130. }
  131. }
  132. }, (err) => {
  133. const { closingObserver } = this._config;
  134. if (closingObserver) {
  135. closingObserver.next(undefined);
  136. }
  137. if (err && err.code) {
  138. socket.close(err.code, err.reason);
  139. }
  140. else {
  141. observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
  142. }
  143. this._resetState();
  144. }, () => {
  145. const { closingObserver } = this._config;
  146. if (closingObserver) {
  147. closingObserver.next(undefined);
  148. }
  149. socket.close();
  150. this._resetState();
  151. });
  152. if (queue && queue instanceof ReplaySubject) {
  153. subscription.add(queue.subscribe(this.destination));
  154. }
  155. };
  156. socket.onerror = (e) => {
  157. this._resetState();
  158. observer.error(e);
  159. };
  160. socket.onclose = (e) => {
  161. if (socket === this._socket) {
  162. this._resetState();
  163. }
  164. const { closeObserver } = this._config;
  165. if (closeObserver) {
  166. closeObserver.next(e);
  167. }
  168. if (e.wasClean) {
  169. observer.complete();
  170. }
  171. else {
  172. observer.error(e);
  173. }
  174. };
  175. socket.onmessage = (e) => {
  176. try {
  177. const { deserializer } = this._config;
  178. observer.next(deserializer(e));
  179. }
  180. catch (err) {
  181. observer.error(err);
  182. }
  183. };
  184. }
  185. _subscribe(subscriber) {
  186. const { source } = this;
  187. if (source) {
  188. return source.subscribe(subscriber);
  189. }
  190. if (!this._socket) {
  191. this._connectSocket();
  192. }
  193. this._output.subscribe(subscriber);
  194. subscriber.add(() => {
  195. const { _socket } = this;
  196. if (this._output.observers.length === 0) {
  197. if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
  198. _socket.close();
  199. }
  200. this._resetState();
  201. }
  202. });
  203. return subscriber;
  204. }
  205. unsubscribe() {
  206. const { _socket } = this;
  207. if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
  208. _socket.close();
  209. }
  210. this._resetState();
  211. super.unsubscribe();
  212. }
  213. }
  214. //# sourceMappingURL=WebSocketSubject.js.map