transport.js 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", {
  3. value: true
  4. });
  5. exports.PipeTransport = void 0;
  6. var _utils = require("../utils");
  7. /**
  8. * Copyright (c) Microsoft Corporation.
  9. *
  10. * Licensed under the Apache License, Version 2.0 (the "License");
  11. * you may not use this file except in compliance with the License.
  12. * You may obtain a copy of the License at
  13. *
  14. * http://www.apache.org/licenses/LICENSE-2.0
  15. *
  16. * Unless required by applicable law or agreed to in writing, software
  17. * distributed under the License is distributed on an "AS IS" BASIS,
  18. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  19. * See the License for the specific language governing permissions and
  20. * limitations under the License.
  21. */
  22. class PipeTransport {
  23. constructor(pipeWrite, pipeRead, closeable, endian = 'le') {
  24. this._pipeWrite = void 0;
  25. this._data = Buffer.from([]);
  26. this._waitForNextTask = (0, _utils.makeWaitForNextTask)();
  27. this._closed = false;
  28. this._bytesLeft = 0;
  29. this.onmessage = void 0;
  30. this.onclose = void 0;
  31. this._endian = void 0;
  32. this._closeableStream = void 0;
  33. this._pipeWrite = pipeWrite;
  34. this._endian = endian;
  35. this._closeableStream = closeable;
  36. pipeRead.on('data', buffer => this._dispatch(buffer));
  37. pipeRead.on('close', () => {
  38. this._closed = true;
  39. if (this.onclose) this.onclose();
  40. });
  41. this.onmessage = undefined;
  42. this.onclose = undefined;
  43. }
  44. send(message) {
  45. if (this._closed) throw new Error('Pipe has been closed');
  46. const data = Buffer.from(message, 'utf-8');
  47. const dataLength = Buffer.alloc(4);
  48. if (this._endian === 'be') dataLength.writeUInt32BE(data.length, 0);else dataLength.writeUInt32LE(data.length, 0);
  49. this._pipeWrite.write(dataLength);
  50. this._pipeWrite.write(data);
  51. }
  52. close() {
  53. // Let it throw.
  54. this._closeableStream.close();
  55. }
  56. _dispatch(buffer) {
  57. this._data = Buffer.concat([this._data, buffer]);
  58. while (true) {
  59. if (!this._bytesLeft && this._data.length < 4) {
  60. // Need more data.
  61. break;
  62. }
  63. if (!this._bytesLeft) {
  64. this._bytesLeft = this._endian === 'be' ? this._data.readUInt32BE(0) : this._data.readUInt32LE(0);
  65. this._data = this._data.slice(4);
  66. }
  67. if (!this._bytesLeft || this._data.length < this._bytesLeft) {
  68. // Need more data.
  69. break;
  70. }
  71. const message = this._data.slice(0, this._bytesLeft);
  72. this._data = this._data.slice(this._bytesLeft);
  73. this._bytesLeft = 0;
  74. this._waitForNextTask(() => {
  75. if (this.onmessage) this.onmessage(message.toString('utf-8'));
  76. });
  77. }
  78. }
  79. }
  80. exports.PipeTransport = PipeTransport;