WebSocketSubject.js 7.8 KB


  1. import { __assign, __extends } from "tslib";
  2. import { Subject, AnonymousSubject } from '../../Subject';
  3. import { Subscriber } from '../../Subscriber';
  4. import { Observable } from '../../Observable';
  5. import { Subscription } from '../../Subscription';
  6. import { ReplaySubject } from '../../ReplaySubject';
  7. var DEFAULT_WEBSOCKET_CONFIG = {
  8. url: '',
  9. deserializer: function (e) { return JSON.parse(e.data); },
  10. serializer: function (value) { return JSON.stringify(value); },
  11. };
  12. var WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT = 'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';
  13. var WebSocketSubject = (function (_super) {
  14. __extends(WebSocketSubject, _super);
  15. function WebSocketSubject(urlConfigOrSource, destination) {
  16. var _this = _super.call(this) || this;
  17. _this._socket = null;
  18. if (urlConfigOrSource instanceof Observable) {
  19. _this.destination = destination;
  20. _this.source = urlConfigOrSource;
  21. }
  22. else {
  23. var config = (_this._config = __assign({}, DEFAULT_WEBSOCKET_CONFIG));
  24. _this._output = new Subject();
  25. if (typeof urlConfigOrSource === 'string') {
  26. config.url = urlConfigOrSource;
  27. }
  28. else {
  29. for (var key in urlConfigOrSource) {
  30. if (urlConfigOrSource.hasOwnProperty(key)) {
  31. config[key] = urlConfigOrSource[key];
  32. }
  33. }
  34. }
  35. if (!config.WebSocketCtor && WebSocket) {
  36. config.WebSocketCtor = WebSocket;
  37. }
  38. else if (!config.WebSocketCtor) {
  39. throw new Error('no WebSocket constructor can be found');
  40. }
  41. _this.destination = new ReplaySubject();
  42. }
  43. return _this;
  44. }
  45. WebSocketSubject.prototype.lift = function (operator) {
  46. var sock = new WebSocketSubject(this._config, this.destination);
  47. sock.operator = operator;
  48. sock.source = this;
  49. return sock;
  50. };
  51. WebSocketSubject.prototype._resetState = function () {
  52. this._socket = null;
  53. if (!this.source) {
  54. this.destination = new ReplaySubject();
  55. }
  56. this._output = new Subject();
  57. };
  58. WebSocketSubject.prototype.multiplex = function (subMsg, unsubMsg, messageFilter) {
  59. var self = this;
  60. return new Observable(function (observer) {
  61. try {
  62. self.next(subMsg());
  63. }
  64. catch (err) {
  65. observer.error(err);
  66. }
  67. var subscription = self.subscribe({
  68. next: function (x) {
  69. try {
  70. if (messageFilter(x)) {
  71. observer.next(x);
  72. }
  73. }
  74. catch (err) {
  75. observer.error(err);
  76. }
  77. },
  78. error: function (err) { return observer.error(err); },
  79. complete: function () { return observer.complete(); },
  80. });
  81. return function () {
  82. try {
  83. self.next(unsubMsg());
  84. }
  85. catch (err) {
  86. observer.error(err);
  87. }
  88. subscription.unsubscribe();
  89. };
  90. });
  91. };
  92. WebSocketSubject.prototype._connectSocket = function () {
  93. var _this = this;
  94. var _a = this._config, WebSocketCtor = _a.WebSocketCtor, protocol = _a.protocol, url = _a.url, binaryType = _a.binaryType;
  95. var observer = this._output;
  96. var socket = null;
  97. try {
  98. socket = protocol ? new WebSocketCtor(url, protocol) : new WebSocketCtor(url);
  99. this._socket = socket;
  100. if (binaryType) {
  101. this._socket.binaryType = binaryType;
  102. }
  103. }
  104. catch (e) {
  105. observer.error(e);
  106. return;
  107. }
  108. var subscription = new Subscription(function () {
  109. _this._socket = null;
  110. if (socket && socket.readyState === 1) {
  111. socket.close();
  112. }
  113. });
  114. socket.onopen = function (evt) {
  115. var _socket = _this._socket;
  116. if (!_socket) {
  117. socket.close();
  118. _this._resetState();
  119. return;
  120. }
  121. var openObserver = _this._config.openObserver;
  122. if (openObserver) {
  123. openObserver.next(evt);
  124. }
  125. var queue = _this.destination;
  126. _this.destination = Subscriber.create(function (x) {
  127. if (socket.readyState === 1) {
  128. try {
  129. var serializer = _this._config.serializer;
  130. socket.send(serializer(x));
  131. }
  132. catch (e) {
  133. _this.destination.error(e);
  134. }
  135. }
  136. }, function (err) {
  137. var closingObserver = _this._config.closingObserver;
  138. if (closingObserver) {
  139. closingObserver.next(undefined);
  140. }
  141. if (err && err.code) {
  142. socket.close(err.code, err.reason);
  143. }
  144. else {
  145. observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
  146. }
  147. _this._resetState();
  148. }, function () {
  149. var closingObserver = _this._config.closingObserver;
  150. if (closingObserver) {
  151. closingObserver.next(undefined);
  152. }
  153. socket.close();
  154. _this._resetState();
  155. });
  156. if (queue && queue instanceof ReplaySubject) {
  157. subscription.add(queue.subscribe(_this.destination));
  158. }
  159. };
  160. socket.onerror = function (e) {
  161. _this._resetState();
  162. observer.error(e);
  163. };
  164. socket.onclose = function (e) {
  165. if (socket === _this._socket) {
  166. _this._resetState();
  167. }
  168. var closeObserver = _this._config.closeObserver;
  169. if (closeObserver) {
  170. closeObserver.next(e);
  171. }
  172. if (e.wasClean) {
  173. observer.complete();
  174. }
  175. else {
  176. observer.error(e);
  177. }
  178. };
  179. socket.onmessage = function (e) {
  180. try {
  181. var deserializer = _this._config.deserializer;
  182. observer.next(deserializer(e));
  183. }
  184. catch (err) {
  185. observer.error(err);
  186. }
  187. };
  188. };
  189. WebSocketSubject.prototype._subscribe = function (subscriber) {
  190. var _this = this;
  191. var source = this.source;
  192. if (source) {
  193. return source.subscribe(subscriber);
  194. }
  195. if (!this._socket) {
  196. this._connectSocket();
  197. }
  198. this._output.subscribe(subscriber);
  199. subscriber.add(function () {
  200. var _socket = _this._socket;
  201. if (_this._output.observers.length === 0) {
  202. if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
  203. _socket.close();
  204. }
  205. _this._resetState();
  206. }
  207. });
  208. return subscriber;
  209. };
  210. WebSocketSubject.prototype.unsubscribe = function () {
  211. var _socket = this._socket;
  212. if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
  213. _socket.close();
  214. }
  215. this._resetState();
  216. _super.prototype.unsubscribe.call(this);
  217. };
  218. return WebSocketSubject;
  219. }(AnonymousSubject));
  220. export { WebSocketSubject };
  221. //# sourceMappingURL=WebSocketSubject.js.map