socket.js 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619
  1. import { transports } from "./transports/index.js";
  2. import { installTimerFunctions, byteLength } from "./util.js";
  3. import { decode } from "./contrib/parseqs.js";
  4. import { parse } from "./contrib/parseuri.js";
  5. import debugModule from "debug"; // debug()
  6. import { Emitter } from "@socket.io/component-emitter";
  7. import { protocol } from "engine.io-parser";
  8. import { defaultBinaryType } from "./transports/websocket-constructor.js";
  9. const debug = debugModule("engine.io-client:socket"); // debug()
  10. export class Socket extends Emitter {
  11. /**
  12. * Socket constructor.
  13. *
  14. * @param {String|Object} uri - uri or options
  15. * @param {Object} opts - options
  16. */
  17. constructor(uri, opts = {}) {
  18. super();
  19. this.binaryType = defaultBinaryType;
  20. this.writeBuffer = [];
  21. if (uri && "object" === typeof uri) {
  22. opts = uri;
  23. uri = null;
  24. }
  25. if (uri) {
  26. uri = parse(uri);
  27. opts.hostname = uri.host;
  28. opts.secure = uri.protocol === "https" || uri.protocol === "wss";
  29. opts.port = uri.port;
  30. if (uri.query)
  31. opts.query = uri.query;
  32. }
  33. else if (opts.host) {
  34. opts.hostname = parse(opts.host).host;
  35. }
  36. installTimerFunctions(this, opts);
  37. this.secure =
  38. null != opts.secure
  39. ? opts.secure
  40. : typeof location !== "undefined" && "https:" === location.protocol;
  41. if (opts.hostname && !opts.port) {
  42. // if no port is specified manually, use the protocol default
  43. opts.port = this.secure ? "443" : "80";
  44. }
  45. this.hostname =
  46. opts.hostname ||
  47. (typeof location !== "undefined" ? location.hostname : "localhost");
  48. this.port =
  49. opts.port ||
  50. (typeof location !== "undefined" && location.port
  51. ? location.port
  52. : this.secure
  53. ? "443"
  54. : "80");
  55. this.transports = opts.transports || [
  56. "polling",
  57. "websocket",
  58. "webtransport",
  59. ];
  60. this.writeBuffer = [];
  61. this.prevBufferLen = 0;
  62. this.opts = Object.assign({
  63. path: "/engine.io",
  64. agent: false,
  65. withCredentials: false,
  66. upgrade: true,
  67. timestampParam: "t",
  68. rememberUpgrade: false,
  69. addTrailingSlash: true,
  70. rejectUnauthorized: true,
  71. perMessageDeflate: {
  72. threshold: 1024,
  73. },
  74. transportOptions: {},
  75. closeOnBeforeunload: false,
  76. }, opts);
  77. this.opts.path =
  78. this.opts.path.replace(/\/$/, "") +
  79. (this.opts.addTrailingSlash ? "/" : "");
  80. if (typeof this.opts.query === "string") {
  81. this.opts.query = decode(this.opts.query);
  82. }
  83. // set on handshake
  84. this.id = null;
  85. this.upgrades = null;
  86. this.pingInterval = null;
  87. this.pingTimeout = null;
  88. // set on heartbeat
  89. this.pingTimeoutTimer = null;
  90. if (typeof addEventListener === "function") {
  91. if (this.opts.closeOnBeforeunload) {
  92. // Firefox closes the connection when the "beforeunload" event is emitted but not Chrome. This event listener
  93. // ensures every browser behaves the same (no "disconnect" event at the Socket.IO level when the page is
  94. // closed/reloaded)
  95. this.beforeunloadEventListener = () => {
  96. if (this.transport) {
  97. // silently close the transport
  98. this.transport.removeAllListeners();
  99. this.transport.close();
  100. }
  101. };
  102. addEventListener("beforeunload", this.beforeunloadEventListener, false);
  103. }
  104. if (this.hostname !== "localhost") {
  105. this.offlineEventListener = () => {
  106. this.onClose("transport close", {
  107. description: "network connection lost",
  108. });
  109. };
  110. addEventListener("offline", this.offlineEventListener, false);
  111. }
  112. }
  113. this.open();
  114. }
  115. /**
  116. * Creates transport of the given type.
  117. *
  118. * @param {String} name - transport name
  119. * @return {Transport}
  120. * @private
  121. */
  122. createTransport(name) {
  123. debug('creating transport "%s"', name);
  124. const query = Object.assign({}, this.opts.query);
  125. // append engine.io protocol identifier
  126. query.EIO = protocol;
  127. // transport name
  128. query.transport = name;
  129. // session id if we already have one
  130. if (this.id)
  131. query.sid = this.id;
  132. const opts = Object.assign({}, this.opts, {
  133. query,
  134. socket: this,
  135. hostname: this.hostname,
  136. secure: this.secure,
  137. port: this.port,
  138. }, this.opts.transportOptions[name]);
  139. debug("options: %j", opts);
  140. return new transports[name](opts);
  141. }
  142. /**
  143. * Initializes transport to use and starts probe.
  144. *
  145. * @private
  146. */
  147. open() {
  148. let transport;
  149. if (this.opts.rememberUpgrade &&
  150. Socket.priorWebsocketSuccess &&
  151. this.transports.indexOf("websocket") !== -1) {
  152. transport = "websocket";
  153. }
  154. else if (0 === this.transports.length) {
  155. // Emit error on next tick so it can be listened to
  156. this.setTimeoutFn(() => {
  157. this.emitReserved("error", "No transports available");
  158. }, 0);
  159. return;
  160. }
  161. else {
  162. transport = this.transports[0];
  163. }
  164. this.readyState = "opening";
  165. // Retry with the next transport if the transport is disabled (jsonp: false)
  166. try {
  167. transport = this.createTransport(transport);
  168. }
  169. catch (e) {
  170. debug("error while creating transport: %s", e);
  171. this.transports.shift();
  172. this.open();
  173. return;
  174. }
  175. transport.open();
  176. this.setTransport(transport);
  177. }
  178. /**
  179. * Sets the current transport. Disables the existing one (if any).
  180. *
  181. * @private
  182. */
  183. setTransport(transport) {
  184. debug("setting transport %s", transport.name);
  185. if (this.transport) {
  186. debug("clearing existing transport %s", this.transport.name);
  187. this.transport.removeAllListeners();
  188. }
  189. // set up transport
  190. this.transport = transport;
  191. // set up transport listeners
  192. transport
  193. .on("drain", this.onDrain.bind(this))
  194. .on("packet", this.onPacket.bind(this))
  195. .on("error", this.onError.bind(this))
  196. .on("close", (reason) => this.onClose("transport close", reason));
  197. }
  198. /**
  199. * Probes a transport.
  200. *
  201. * @param {String} name - transport name
  202. * @private
  203. */
  204. probe(name) {
  205. debug('probing transport "%s"', name);
  206. let transport = this.createTransport(name);
  207. let failed = false;
  208. Socket.priorWebsocketSuccess = false;
  209. const onTransportOpen = () => {
  210. if (failed)
  211. return;
  212. debug('probe transport "%s" opened', name);
  213. transport.send([{ type: "ping", data: "probe" }]);
  214. transport.once("packet", (msg) => {
  215. if (failed)
  216. return;
  217. if ("pong" === msg.type && "probe" === msg.data) {
  218. debug('probe transport "%s" pong', name);
  219. this.upgrading = true;
  220. this.emitReserved("upgrading", transport);
  221. if (!transport)
  222. return;
  223. Socket.priorWebsocketSuccess = "websocket" === transport.name;
  224. debug('pausing current transport "%s"', this.transport.name);
  225. this.transport.pause(() => {
  226. if (failed)
  227. return;
  228. if ("closed" === this.readyState)
  229. return;
  230. debug("changing transport and sending upgrade packet");
  231. cleanup();
  232. this.setTransport(transport);
  233. transport.send([{ type: "upgrade" }]);
  234. this.emitReserved("upgrade", transport);
  235. transport = null;
  236. this.upgrading = false;
  237. this.flush();
  238. });
  239. }
  240. else {
  241. debug('probe transport "%s" failed', name);
  242. const err = new Error("probe error");
  243. // @ts-ignore
  244. err.transport = transport.name;
  245. this.emitReserved("upgradeError", err);
  246. }
  247. });
  248. };
  249. function freezeTransport() {
  250. if (failed)
  251. return;
  252. // Any callback called by transport should be ignored since now
  253. failed = true;
  254. cleanup();
  255. transport.close();
  256. transport = null;
  257. }
  258. // Handle any error that happens while probing
  259. const onerror = (err) => {
  260. const error = new Error("probe error: " + err);
  261. // @ts-ignore
  262. error.transport = transport.name;
  263. freezeTransport();
  264. debug('probe transport "%s" failed because of error: %s', name, err);
  265. this.emitReserved("upgradeError", error);
  266. };
  267. function onTransportClose() {
  268. onerror("transport closed");
  269. }
  270. // When the socket is closed while we're probing
  271. function onclose() {
  272. onerror("socket closed");
  273. }
  274. // When the socket is upgraded while we're probing
  275. function onupgrade(to) {
  276. if (transport && to.name !== transport.name) {
  277. debug('"%s" works - aborting "%s"', to.name, transport.name);
  278. freezeTransport();
  279. }
  280. }
  281. // Remove all listeners on the transport and on self
  282. const cleanup = () => {
  283. transport.removeListener("open", onTransportOpen);
  284. transport.removeListener("error", onerror);
  285. transport.removeListener("close", onTransportClose);
  286. this.off("close", onclose);
  287. this.off("upgrading", onupgrade);
  288. };
  289. transport.once("open", onTransportOpen);
  290. transport.once("error", onerror);
  291. transport.once("close", onTransportClose);
  292. this.once("close", onclose);
  293. this.once("upgrading", onupgrade);
  294. if (this.upgrades.indexOf("webtransport") !== -1 &&
  295. name !== "webtransport") {
  296. // favor WebTransport
  297. this.setTimeoutFn(() => {
  298. if (!failed) {
  299. transport.open();
  300. }
  301. }, 200);
  302. }
  303. else {
  304. transport.open();
  305. }
  306. }
  307. /**
  308. * Called when connection is deemed open.
  309. *
  310. * @private
  311. */
  312. onOpen() {
  313. debug("socket open");
  314. this.readyState = "open";
  315. Socket.priorWebsocketSuccess = "websocket" === this.transport.name;
  316. this.emitReserved("open");
  317. this.flush();
  318. // we check for `readyState` in case an `open`
  319. // listener already closed the socket
  320. if ("open" === this.readyState && this.opts.upgrade) {
  321. debug("starting upgrade probes");
  322. let i = 0;
  323. const l = this.upgrades.length;
  324. for (; i < l; i++) {
  325. this.probe(this.upgrades[i]);
  326. }
  327. }
  328. }
  329. /**
  330. * Handles a packet.
  331. *
  332. * @private
  333. */
  334. onPacket(packet) {
  335. if ("opening" === this.readyState ||
  336. "open" === this.readyState ||
  337. "closing" === this.readyState) {
  338. debug('socket receive: type "%s", data "%s"', packet.type, packet.data);
  339. this.emitReserved("packet", packet);
  340. // Socket is live - any packet counts
  341. this.emitReserved("heartbeat");
  342. this.resetPingTimeout();
  343. switch (packet.type) {
  344. case "open":
  345. this.onHandshake(JSON.parse(packet.data));
  346. break;
  347. case "ping":
  348. this.sendPacket("pong");
  349. this.emitReserved("ping");
  350. this.emitReserved("pong");
  351. break;
  352. case "error":
  353. const err = new Error("server error");
  354. // @ts-ignore
  355. err.code = packet.data;
  356. this.onError(err);
  357. break;
  358. case "message":
  359. this.emitReserved("data", packet.data);
  360. this.emitReserved("message", packet.data);
  361. break;
  362. }
  363. }
  364. else {
  365. debug('packet received with socket readyState "%s"', this.readyState);
  366. }
  367. }
  368. /**
  369. * Called upon handshake completion.
  370. *
  371. * @param {Object} data - handshake obj
  372. * @private
  373. */
  374. onHandshake(data) {
  375. this.emitReserved("handshake", data);
  376. this.id = data.sid;
  377. this.transport.query.sid = data.sid;
  378. this.upgrades = this.filterUpgrades(data.upgrades);
  379. this.pingInterval = data.pingInterval;
  380. this.pingTimeout = data.pingTimeout;
  381. this.maxPayload = data.maxPayload;
  382. this.onOpen();
  383. // In case open handler closes socket
  384. if ("closed" === this.readyState)
  385. return;
  386. this.resetPingTimeout();
  387. }
  388. /**
  389. * Sets and resets ping timeout timer based on server pings.
  390. *
  391. * @private
  392. */
  393. resetPingTimeout() {
  394. this.clearTimeoutFn(this.pingTimeoutTimer);
  395. this.pingTimeoutTimer = this.setTimeoutFn(() => {
  396. this.onClose("ping timeout");
  397. }, this.pingInterval + this.pingTimeout);
  398. if (this.opts.autoUnref) {
  399. this.pingTimeoutTimer.unref();
  400. }
  401. }
  402. /**
  403. * Called on `drain` event
  404. *
  405. * @private
  406. */
  407. onDrain() {
  408. this.writeBuffer.splice(0, this.prevBufferLen);
  409. // setting prevBufferLen = 0 is very important
  410. // for example, when upgrading, upgrade packet is sent over,
  411. // and a nonzero prevBufferLen could cause problems on `drain`
  412. this.prevBufferLen = 0;
  413. if (0 === this.writeBuffer.length) {
  414. this.emitReserved("drain");
  415. }
  416. else {
  417. this.flush();
  418. }
  419. }
  420. /**
  421. * Flush write buffers.
  422. *
  423. * @private
  424. */
  425. flush() {
  426. if ("closed" !== this.readyState &&
  427. this.transport.writable &&
  428. !this.upgrading &&
  429. this.writeBuffer.length) {
  430. const packets = this.getWritablePackets();
  431. debug("flushing %d packets in socket", packets.length);
  432. this.transport.send(packets);
  433. // keep track of current length of writeBuffer
  434. // splice writeBuffer and callbackBuffer on `drain`
  435. this.prevBufferLen = packets.length;
  436. this.emitReserved("flush");
  437. }
  438. }
  439. /**
  440. * Ensure the encoded size of the writeBuffer is below the maxPayload value sent by the server (only for HTTP
  441. * long-polling)
  442. *
  443. * @private
  444. */
  445. getWritablePackets() {
  446. const shouldCheckPayloadSize = this.maxPayload &&
  447. this.transport.name === "polling" &&
  448. this.writeBuffer.length > 1;
  449. if (!shouldCheckPayloadSize) {
  450. return this.writeBuffer;
  451. }
  452. let payloadSize = 1; // first packet type
  453. for (let i = 0; i < this.writeBuffer.length; i++) {
  454. const data = this.writeBuffer[i].data;
  455. if (data) {
  456. payloadSize += byteLength(data);
  457. }
  458. if (i > 0 && payloadSize > this.maxPayload) {
  459. debug("only send %d out of %d packets", i, this.writeBuffer.length);
  460. return this.writeBuffer.slice(0, i);
  461. }
  462. payloadSize += 2; // separator + packet type
  463. }
  464. debug("payload size is %d (max: %d)", payloadSize, this.maxPayload);
  465. return this.writeBuffer;
  466. }
  467. /**
  468. * Sends a message.
  469. *
  470. * @param {String} msg - message.
  471. * @param {Object} options.
  472. * @param {Function} callback function.
  473. * @return {Socket} for chaining.
  474. */
  475. write(msg, options, fn) {
  476. this.sendPacket("message", msg, options, fn);
  477. return this;
  478. }
  479. send(msg, options, fn) {
  480. this.sendPacket("message", msg, options, fn);
  481. return this;
  482. }
  483. /**
  484. * Sends a packet.
  485. *
  486. * @param {String} type: packet type.
  487. * @param {String} data.
  488. * @param {Object} options.
  489. * @param {Function} fn - callback function.
  490. * @private
  491. */
  492. sendPacket(type, data, options, fn) {
  493. if ("function" === typeof data) {
  494. fn = data;
  495. data = undefined;
  496. }
  497. if ("function" === typeof options) {
  498. fn = options;
  499. options = null;
  500. }
  501. if ("closing" === this.readyState || "closed" === this.readyState) {
  502. return;
  503. }
  504. options = options || {};
  505. options.compress = false !== options.compress;
  506. const packet = {
  507. type: type,
  508. data: data,
  509. options: options,
  510. };
  511. this.emitReserved("packetCreate", packet);
  512. this.writeBuffer.push(packet);
  513. if (fn)
  514. this.once("flush", fn);
  515. this.flush();
  516. }
  517. /**
  518. * Closes the connection.
  519. */
  520. close() {
  521. const close = () => {
  522. this.onClose("forced close");
  523. debug("socket closing - telling transport to close");
  524. this.transport.close();
  525. };
  526. const cleanupAndClose = () => {
  527. this.off("upgrade", cleanupAndClose);
  528. this.off("upgradeError", cleanupAndClose);
  529. close();
  530. };
  531. const waitForUpgrade = () => {
  532. // wait for upgrade to finish since we can't send packets while pausing a transport
  533. this.once("upgrade", cleanupAndClose);
  534. this.once("upgradeError", cleanupAndClose);
  535. };
  536. if ("opening" === this.readyState || "open" === this.readyState) {
  537. this.readyState = "closing";
  538. if (this.writeBuffer.length) {
  539. this.once("drain", () => {
  540. if (this.upgrading) {
  541. waitForUpgrade();
  542. }
  543. else {
  544. close();
  545. }
  546. });
  547. }
  548. else if (this.upgrading) {
  549. waitForUpgrade();
  550. }
  551. else {
  552. close();
  553. }
  554. }
  555. return this;
  556. }
  557. /**
  558. * Called upon transport error
  559. *
  560. * @private
  561. */
  562. onError(err) {
  563. debug("socket error %j", err);
  564. Socket.priorWebsocketSuccess = false;
  565. this.emitReserved("error", err);
  566. this.onClose("transport error", err);
  567. }
  568. /**
  569. * Called upon transport close.
  570. *
  571. * @private
  572. */
  573. onClose(reason, description) {
  574. if ("opening" === this.readyState ||
  575. "open" === this.readyState ||
  576. "closing" === this.readyState) {
  577. debug('socket close with reason: "%s"', reason);
  578. // clear timers
  579. this.clearTimeoutFn(this.pingTimeoutTimer);
  580. // stop event from firing again for transport
  581. this.transport.removeAllListeners("close");
  582. // ensure transport won't stay open
  583. this.transport.close();
  584. // ignore further transport communication
  585. this.transport.removeAllListeners();
  586. if (typeof removeEventListener === "function") {
  587. removeEventListener("beforeunload", this.beforeunloadEventListener, false);
  588. removeEventListener("offline", this.offlineEventListener, false);
  589. }
  590. // set ready state
  591. this.readyState = "closed";
  592. // clear session id
  593. this.id = null;
  594. // emit close event
  595. this.emitReserved("close", reason, description);
  596. // clean buffers after, so users can still
  597. // grab the buffers on `close` event
  598. this.writeBuffer = [];
  599. this.prevBufferLen = 0;
  600. }
  601. }
  602. /**
  603. * Filters upgrades, returning only those matching client transports.
  604. *
  605. * @param {Array} upgrades - server upgrades
  606. * @private
  607. */
  608. filterUpgrades(upgrades) {
  609. const filteredUpgrades = [];
  610. let i = 0;
  611. const j = upgrades.length;
  612. for (; i < j; i++) {
  613. if (~this.transports.indexOf(upgrades[i]))
  614. filteredUpgrades.push(upgrades[i]);
  615. }
  616. return filteredUpgrades;
  617. }
  618. }
  619. Socket.protocol = protocol;