// AxiosTransformStream.js (5.0 KB)

  1. 'use strict';
  2. import stream from 'stream';
  3. import utils from '../utils.js';
  4. import throttle from './throttle.js';
  5. import speedometer from './speedometer.js';
  6. const kInternals = Symbol('internals');
  7. class AxiosTransformStream extends stream.Transform{
  8. constructor(options) {
  9. options = utils.toFlatObject(options, {
  10. maxRate: 0,
  11. chunkSize: 64 * 1024,
  12. minChunkSize: 100,
  13. timeWindow: 500,
  14. ticksRate: 2,
  15. samplesCount: 15
  16. }, null, (prop, source) => {
  17. return !utils.isUndefined(source[prop]);
  18. });
  19. super({
  20. readableHighWaterMark: options.chunkSize
  21. });
  22. const self = this;
  23. const internals = this[kInternals] = {
  24. length: options.length,
  25. timeWindow: options.timeWindow,
  26. ticksRate: options.ticksRate,
  27. chunkSize: options.chunkSize,
  28. maxRate: options.maxRate,
  29. minChunkSize: options.minChunkSize,
  30. bytesSeen: 0,
  31. isCaptured: false,
  32. notifiedBytesLoaded: 0,
  33. ts: Date.now(),
  34. bytes: 0,
  35. onReadCallback: null
  36. };
  37. const _speedometer = speedometer(internals.ticksRate * options.samplesCount, internals.timeWindow);
  38. this.on('newListener', event => {
  39. if (event === 'progress') {
  40. if (!internals.isCaptured) {
  41. internals.isCaptured = true;
  42. }
  43. }
  44. });
  45. let bytesNotified = 0;
  46. internals.updateProgress = throttle(function throttledHandler() {
  47. const totalBytes = internals.length;
  48. const bytesTransferred = internals.bytesSeen;
  49. const progressBytes = bytesTransferred - bytesNotified;
  50. if (!progressBytes || self.destroyed) return;
  51. const rate = _speedometer(progressBytes);
  52. bytesNotified = bytesTransferred;
  53. process.nextTick(() => {
  54. self.emit('progress', {
  55. 'loaded': bytesTransferred,
  56. 'total': totalBytes,
  57. 'progress': totalBytes ? (bytesTransferred / totalBytes) : undefined,
  58. 'bytes': progressBytes,
  59. 'rate': rate ? rate : undefined,
  60. 'estimated': rate && totalBytes && bytesTransferred <= totalBytes ?
  61. (totalBytes - bytesTransferred) / rate : undefined
  62. });
  63. });
  64. }, internals.ticksRate);
  65. const onFinish = () => {
  66. internals.updateProgress(true);
  67. };
  68. this.once('end', onFinish);
  69. this.once('error', onFinish);
  70. }
  71. _read(size) {
  72. const internals = this[kInternals];
  73. if (internals.onReadCallback) {
  74. internals.onReadCallback();
  75. }
  76. return super._read(size);
  77. }
  78. _transform(chunk, encoding, callback) {
  79. const self = this;
  80. const internals = this[kInternals];
  81. const maxRate = internals.maxRate;
  82. const readableHighWaterMark = this.readableHighWaterMark;
  83. const timeWindow = internals.timeWindow;
  84. const divider = 1000 / timeWindow;
  85. const bytesThreshold = (maxRate / divider);
  86. const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;
  87. function pushChunk(_chunk, _callback) {
  88. const bytes = Buffer.byteLength(_chunk);
  89. internals.bytesSeen += bytes;
  90. internals.bytes += bytes;
  91. if (internals.isCaptured) {
  92. internals.updateProgress();
  93. }
  94. if (self.push(_chunk)) {
  95. process.nextTick(_callback);
  96. } else {
  97. internals.onReadCallback = () => {
  98. internals.onReadCallback = null;
  99. process.nextTick(_callback);
  100. };
  101. }
  102. }
  103. const transformChunk = (_chunk, _callback) => {
  104. const chunkSize = Buffer.byteLength(_chunk);
  105. let chunkRemainder = null;
  106. let maxChunkSize = readableHighWaterMark;
  107. let bytesLeft;
  108. let passed = 0;
  109. if (maxRate) {
  110. const now = Date.now();
  111. if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) {
  112. internals.ts = now;
  113. bytesLeft = bytesThreshold - internals.bytes;
  114. internals.bytes = bytesLeft < 0 ? -bytesLeft : 0;
  115. passed = 0;
  116. }
  117. bytesLeft = bytesThreshold - internals.bytes;
  118. }
  119. if (maxRate) {
  120. if (bytesLeft <= 0) {
  121. // next time window
  122. return setTimeout(() => {
  123. _callback(null, _chunk);
  124. }, timeWindow - passed);
  125. }
  126. if (bytesLeft < maxChunkSize) {
  127. maxChunkSize = bytesLeft;
  128. }
  129. }
  130. if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) {
  131. chunkRemainder = _chunk.subarray(maxChunkSize);
  132. _chunk = _chunk.subarray(0, maxChunkSize);
  133. }
  134. pushChunk(_chunk, chunkRemainder ? () => {
  135. process.nextTick(_callback, null, chunkRemainder);
  136. } : _callback);
  137. };
  138. transformChunk(chunk, function transformNextChunk(err, _chunk) {
  139. if (err) {
  140. return callback(err);
  141. }
  142. if (_chunk) {
  143. transformChunk(_chunk, transformNextChunk);
  144. } else {
  145. callback(null);
  146. }
  147. });
  148. }
  149. setLength(length) {
  150. this[kInternals].length = +length;
  151. return this;
  152. }
  153. }
  154. export default AxiosTransformStream;