index.js 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. 'use strict';
  2. const from = require('from2');
  3. const pIsPromise = require('p-is-promise');
  /**
   * Turn `input` into a readable stream created by `from()` (from2).
   *
   * Inputs handled by the code below:
   * - string or Buffer: emitted in slices of at most `size` per read;
   * - ArrayBuffer or ArrayBuffer view (typed array, DataView): converted to a
   *   Buffer holding the same bytes, then sliced like a Buffer;
   * - sync or async iterable: one chunk per yielded value;
   * - promise: resolved first, then its value is handled as above.
   *
   * @param {*} input - Value to stream. Arrays are shallow-copied up front.
   * @returns {*} The stream returned by `from()` for the reader function.
   */
  const intoStream = input => {
    if (Array.isArray(input)) {
      // Work on a shallow copy rather than on the caller's array.
      input = input.slice();
    }

    let promise;
    let iterator;
    let asyncIterator;

    prepare(input);

    // (Re)classifies `value` and sets `input`, `promise`, `iterator` and
    // `asyncIterator` accordingly. Runs once up front and again once a promise
    // input has resolved.
    function prepare(value) {
      input = value;

      if (input instanceof ArrayBuffer) {
        input = Buffer.from(input);
      } else if (ArrayBuffer.isView(input) && !Buffer.isBuffer(input)) {
        // `Buffer.from(view)` copies element *values*, truncating each one to a
        // single byte (and does not understand `DataView`). Copy the view's
        // underlying bytes instead so multi-byte element types survive intact.
        // The outer `Buffer.from` makes a copy, so the stream does not alias
        // the caller's memory.
        input = Buffer.from(Buffer.from(input.buffer, input.byteOffset, input.byteLength));
      }

      promise = pIsPromise(input) ? input : null;

      // We don't iterate on strings and buffers since slicing them is ~7x faster
      const shouldIterate = !promise && input[Symbol.iterator] && typeof input !== 'string' && !Buffer.isBuffer(input);
      iterator = shouldIterate ? input[Symbol.iterator]() : null;

      const shouldAsyncIterate = !promise && input[Symbol.asyncIterator];
      asyncIterator = shouldAsyncIterate ? input[Symbol.asyncIterator]() : null;
    }

    return from(function reader(size, callback) {
      if (promise) {
        (async () => {
          try {
            // `prepare` clears `promise` unless the resolved value is itself
            // promise-like, so the re-entrant call below takes another branch.
            prepare(await promise);
            reader.call(this, size, callback);
          } catch (error) {
            callback(error);
          }
        })();

        return;
      }

      if (iterator) {
        let object;
        try {
          object = iterator.next();
        } catch (error) {
          // Report a throwing iterator through the callback, like the async
          // iterator branch does, instead of throwing out of the reader.
          setImmediate(callback, error);
          return;
        }

        setImmediate(callback, null, object.done ? null : object.value);
        return;
      }

      if (asyncIterator) {
        (async () => {
          try {
            const object = await asyncIterator.next();
            setImmediate(callback, null, object.done ? null : object.value);
          } catch (error) {
            setImmediate(callback, error);
          }
        })();

        return;
      }

      // String/Buffer path: a `null` chunk is passed once nothing is left.
      if (input.length === 0) {
        setImmediate(callback, null, null);
        return;
      }

      const chunk = input.slice(0, size);
      input = input.slice(size);

      setImmediate(callback, null, chunk);
    });
  };
  64. module.exports = intoStream;
  /**
   * Object-mode counterpart of `intoStream`, built with `from.obj()`.
   *
   * Inputs handled by the code below:
   * - sync or async iterable: one object per yielded value;
   * - promise: resolved first, then its value is handled the same way;
   * - anything else: pushed as a single object, after which a `null` chunk is
   *   passed to the callback.
   *
   * @param {*} input - Value to stream. Arrays are shallow-copied up front.
   * @returns {*} The stream returned by `from.obj()` for the reader function.
   */
  module.exports.object = input => {
    if (Array.isArray(input)) {
      // Work on a shallow copy rather than on the caller's array.
      input = input.slice();
    }

    let promise;
    let iterator;
    let asyncIterator;

    prepare(input);

    // (Re)classifies `value` and sets `input`, `promise`, `iterator` and
    // `asyncIterator` accordingly. Runs once up front and again once a promise
    // input has resolved.
    function prepare(value) {
      input = value;
      promise = pIsPromise(input) ? input : null;
      iterator = !promise && input[Symbol.iterator] ? input[Symbol.iterator]() : null;
      asyncIterator = !promise && input[Symbol.asyncIterator] ? input[Symbol.asyncIterator]() : null;
    }

    return from.obj(function reader(size, callback) {
      if (promise) {
        (async () => {
          try {
            // `prepare` clears `promise` unless the resolved value is itself
            // promise-like, so the re-entrant call below takes another branch.
            prepare(await promise);
            reader.call(this, size, callback);
          } catch (error) {
            callback(error);
          }
        })();

        return;
      }

      if (iterator) {
        let object;
        try {
          object = iterator.next();
        } catch (error) {
          // Report a throwing iterator through the callback, like the async
          // iterator branch does, instead of throwing out of the reader.
          setImmediate(callback, error);
          return;
        }

        setImmediate(callback, null, object.done ? null : object.value);
        return;
      }

      if (asyncIterator) {
        (async () => {
          try {
            const object = await asyncIterator.next();
            setImmediate(callback, null, object.done ? null : object.value);
          } catch (error) {
            setImmediate(callback, error);
          }
        })();

        return;
      }

      // Non-iterable value: push it as the only object, then pass a `null` chunk.
      this.push(input);
      setImmediate(callback, null, null);
    });
  };