streamDispatcher.js 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  // Module preamble of the compiled CommonJS output: enable strict mode for the whole file.
  1. "use strict";
  // Set the `__esModule` flag on the exports object (the interop marker that
  // transpiler-generated import helpers conventionally look for).
  2. Object.defineProperty(exports, "__esModule", {
  3. value: true
  4. });
  // Declare the export up front as `undefined`; the real class is assigned
  // to `exports.StreamDispatcher` at the bottom of this file.
  5. exports.StreamDispatcher = void 0;
  6. var _dispatcher = require("./dispatcher");
  7. var _utils = require("../../utils");
  8. /**
  9. * Copyright (c) Microsoft Corporation.
  10. *
  11. * Licensed under the Apache License, Version 2.0 (the "License");
  12. * you may not use this file except in compliance with the License.
  13. * You may obtain a copy of the License at
  14. *
  15. * http://www.apache.org/licenses/LICENSE-2.0
  16. *
  17. * Unless required by applicable law or agreed to in writing, software
  18. * distributed under the License is distributed on an "AS IS" BASIS,
  19. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  20. * See the License for the specific language governing permissions and
  21. * limitations under the License.
  22. */
  /**
   * Dispatcher that wraps a readable stream and exposes it through `read()`
   * and `close()`.
   *
   * The stream is stored on the dispatcher's backing object (`{ guid, stream }`)
   * that is handed to the base `Dispatcher` constructor.
   */
  class StreamDispatcher extends _dispatcher.Dispatcher {
    /**
     * @param scope Parent scope; forwarded unchanged to the base Dispatcher.
     * @param stream Readable stream to wrap. Must provide `once`/`on`/`off`,
     *   `read`, `readableLength` and `destroy`.
     */
    constructor(scope, stream) {
      super(scope, {
        guid: 'stream@' + (0, _utils.createGuid)(),
        stream
      }, 'Stream', {});
      // In Node v12.9.0+ we can use readableEnded.
      this._type_Stream = true;
      // Set to true once the stream has emitted 'end' or 'error'; from then on
      // read() answers with an empty buffer without touching the stream.
      this._ended = false;
      stream.once('end', () => this._ended = true);
      stream.once('error', () => this._ended = true);
    }

    /**
     * Reads the next chunk from the wrapped stream.
     *
     * If nothing is buffered, waits until the stream signals 'readable', 'end',
     * 'error' or 'close' before reading.
     *
     * @param params Request parameters. `params.size` caps the number of bytes
     *   returned; when it is missing or 0, everything currently buffered is read.
     * @returns {Promise<{binary: Buffer}>} The chunk that was read, or an empty
     *   buffer when the stream has ended, errored or been destroyed, or when
     *   `stream.read()` yields nothing.
     */
    async read(params) {
      const stream = this._object.stream;
      // A stream destroyed without an error (e.g. via close()) emits neither
      // 'end' nor 'error', so `_ended` alone would let this call wait forever.
      if (this._ended || stream.destroyed) {
        return {
          binary: Buffer.from('')
        };
      }
      if (!stream.readableLength) {
        const readyPromise = new _utils.ManualPromise();
        const done = () => readyPromise.resolve();
        // 'close' is included so that a close() issued while this read is
        // pending wakes it up instead of leaving it unresolved.
        const wakeEvents = ['readable', 'end', 'error', 'close'];
        for (const eventName of wakeEvents) stream.on(eventName, done);
        await readyPromise;
        for (const eventName of wakeEvents) stream.off(eventName, done);
      }
      // Never ask for more than is buffered; `size` of 0/undefined means "all of it".
      const buffer = stream.read(Math.min(stream.readableLength, params.size || stream.readableLength));
      return {
        // stream.read() may return null; normalize that to an empty buffer.
        binary: buffer || Buffer.from('')
      };
    }

    /**
     * Destroys the wrapped stream.
     */
    async close() {
      this._object.stream.destroy();
    }
  }
  exports.StreamDispatcher = StreamDispatcher;