body-streams.js 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", {
  3. value: true
  4. });
  5. exports.requestToBodyStream = requestToBodyStream;
  6. exports.bodyStreamToNodeStream = bodyStreamToNodeStream;
  7. exports.getClonableBody = getClonableBody;
  8. var _stream = require("stream");
  /**
   * Wraps an event-emitting stream in a `context.ReadableStream`.
   *
   * Every `"data"` chunk is enqueued as-is, `"end"` closes the resulting
   * stream and `"error"` puts it into the errored state. Only a `start` hook
   * is supplied, so no `pull` or `cancel` handling is installed.
   *
   * @param {{ ReadableStream: Function }} context - Object exposing the
   *   `ReadableStream` constructor to instantiate.
   * @param {object} stream - Source exposing `on(event, listener)`.
   * @returns {object} The newly constructed `context.ReadableStream`.
   */
  function requestToBodyStream(context, stream) {
    const underlyingSource = {
      start(controller) {
        stream.on("data", (chunk) => {
          controller.enqueue(chunk);
        });
        stream.on("end", () => {
          controller.close();
        });
        stream.on("error", (err) => {
          controller.error(err);
        });
      },
    };
    return new context.ReadableStream(underlyingSource);
  }
  /**
   * Adapts a reader-based body stream into a Node.js `Readable`.
   *
   * The reader is obtained immediately, before the returned stream is
   * consumed. Chunks are then pulled one `read()` at a time and forwarded
   * until the reader reports `done`.
   *
   * @param {object} bodyStream - Object exposing `getReader()`, whose reader
   *   resolves `read()` to `{ done, value }`.
   * @returns {import("stream").Readable} Stream yielding each `value` in order.
   */
  function bodyStreamToNodeStream(bodyStream) {
    const reader = bodyStream.getReader();

    // Lazily drains the reader; nothing is read until the Readable is consumed.
    async function* drainReader() {
      for (;;) {
        const { done: finished, value: chunk } = await reader.read();
        if (finished) {
          return;
        }
        yield chunk;
      }
    }

    return _stream.Readable.from(drainReader());
  }
  /**
   * Copies every enumerable member of `stream` (own and inherited, as visited
   * by `for...in`) onto `base`, mutating `base` in place.
   *
   * Function members are bound to `base` before being assigned, so they run
   * with `base` as `this`; all other values are assigned unchanged.
   *
   * @param {object} base - Object that receives the members.
   * @param {object} stream - Object whose enumerable members are copied.
   * @returns {object} The same `base` object.
   */
  function replaceRequestBody(base, stream) {
    for (const name in stream) {
      const member = stream[name];
      base[name] = typeof member === "function" ? member.bind(base) : member;
    }
    return base;
  }
  /**
   * Builds a helper that lets a request body stream be read more than once.
   *
   * Listeners for `"end"` and `"error"` are attached to `readable` as soon as
   * this function is called; their outcome is what `finalize()` later awaits.
   *
   * @param {object} readable - The original body stream (event emitter).
   * @returns {{ finalize: () => Promise<void>, cloneBodyStream: () => import("stream").PassThrough }}
   */
  function getClonableBody(readable) {
    // Spare copy kept aside for the next consumer; null until the first clone.
    let buffered = null;

    const completion = new Promise((resolve, reject) => {
      readable.on("end", resolve);
      readable.on("error", reject);
    });
    // A failure is turned into a `{ error }` value so this promise never
    // rejects on its own; `finalize()` rethrows the error when it runs.
    const endPromise = completion.catch((error) => {
      return {
        error
      };
    });

    return {
      /**
       * Replaces the original request body if necessary.
       * Needed because a body that has been read from the original request
       * cannot be read a second time. Does nothing when no clone was made.
       * Rethrows the error emitted by the original stream, if any.
       */
      async finalize() {
        if (!buffered) {
          return;
        }
        const outcome = await endPromise;
        if (outcome && typeof outcome === "object" && outcome.error) {
          throw outcome.error;
        }
        replaceRequestBody(readable, buffered);
        buffered = readable;
      },
      /**
       * Clones the body stream to pass into a middleware.
       * Each chunk from the current source is pushed to two PassThrough
       * streams: one is returned, the other becomes the source for the next
       * clone (or for `finalize()`).
       */
      cloneBodyStream() {
        const input = buffered == null ? readable : buffered;
        const handedOut = new _stream.PassThrough();
        const keptBack = new _stream.PassThrough();
        const sinks = [handedOut, keptBack];
        input.on("data", (chunk) => {
          for (const sink of sinks) {
            sink.push(chunk);
          }
        });
        input.on("end", () => {
          for (const sink of sinks) {
            sink.push(null);
          }
        });
        buffered = keptBack;
        return handedOut;
      }
    };
  }
  85. //# sourceMappingURL=body-streams.js.map