stream.js 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. /**
  2. * @typedef {import('micromark-util-types').Options} Options
  3. * @typedef {import('micromark-util-types').Value} Value
  4. * @typedef {import('micromark-util-types').Encoding} Encoding
  5. */
  6. /**
  7. * @callback Callback
  8. * Function called when write was successful.
  9. * @returns {undefined}
  10. * Nothing.
  11. *
  12. * @typedef PipeOptions
  13. * @property {boolean | null | undefined} [end]
  14. *
  15. * @typedef {Omit<NodeJS.ReadableStream & NodeJS.WritableStream, 'isPaused' | 'pause' | 'read' | 'resume' | 'setEncoding' | 'unpipe' | 'unshift' | 'wrap'>} MinimalDuplex
  16. */
  17. import {EventEmitter} from 'node:events'
  18. import {compile} from './lib/compile.js'
  19. import {parse} from './lib/parse.js'
  20. import {postprocess} from './lib/postprocess.js'
  21. import {preprocess} from './lib/preprocess.js'
  22. /**
  23. * Create a duplex (readable and writable) stream.
  24. *
  25. * Some of the work to parse markdown can be done streaming, but in the
  26. * end buffering is required.
  27. *
  28. * micromark does not handle errors for you, so you must handle errors on whatever
  29. * streams you pipe into it.
  30. * As markdown does not know errors, `micromark` itself does not emit errors.
  31. *
  32. * @param {Options | null | undefined} [options]
  33. * Configuration (optional).
  34. * @returns {MinimalDuplex}
  35. * Duplex stream.
  36. */
  37. export function stream(options) {
  38. const prep = preprocess()
  39. const tokenize = parse(options).document().write
  40. const comp = compile(options)
  41. /** @type {boolean} */
  42. let ended
  43. /** @type {MinimalDuplex} */
  44. // @ts-expect-error `addListener` is fine.
  45. const emitter = Object.assign(new EventEmitter(), {
  46. end,
  47. pipe,
  48. readable: true,
  49. writable: true,
  50. write
  51. })
  52. return emitter
  53. /**
  54. * Write a chunk into memory.
  55. *
  56. * @overload
  57. * @param {Value | null | undefined} [chunk]
  58. * Slice of markdown to parse (`string` or `Uint8Array`).
  59. * @param {Encoding | null | undefined} [encoding]
  60. * Character encoding to understand `chunk` as when it’s a `Uint8Array`
  61. * (`string`, default: `'utf8'`).
  62. * @param {Callback | null | undefined} [callback]
  63. * Function called when write was successful.
  64. * @returns {boolean}
  65. * Whether write was successful.
  66. *
  67. * @overload
  68. * @param {Value | null | undefined} [chunk]
  69. * Slice of markdown to parse (`string` or `Uint8Array`).
  70. * @param {Callback | null | undefined} [callback]
  71. * Function called when write was successful.
  72. * @returns {boolean}
  73. * Whether write was successful.
  74. *
  75. * @param {Value | null | undefined} [chunk]
  76. * Slice of markdown to parse (`string` or `Uint8Array`).
  77. * @param {Callback | Encoding | null | undefined} [encoding]
  78. * Character encoding to understand `chunk` as when it’s a `Uint8Array`
  79. * (`string`, default: `'utf8'`).
  80. * @param {Callback | null | undefined} [callback]
  81. * Function called when write was successful.
  82. * @returns {boolean}
  83. * Whether write was successful.
  84. */
  85. function write(chunk, encoding, callback) {
  86. if (typeof encoding === 'function') {
  87. callback = encoding
  88. encoding = undefined
  89. }
  90. if (ended) {
  91. throw new Error('Did not expect `write` after `end`')
  92. }
  93. tokenize(prep(chunk || '', encoding))
  94. if (callback) {
  95. callback()
  96. }
  97. // Signal successful write.
  98. return true
  99. }
  100. /**
  101. * End the writing.
  102. *
  103. * Passes all arguments as a final `write`.
  104. *
  105. * @overload
  106. * @param {Value | null | undefined} [chunk]
  107. * Slice of markdown to parse (`string` or `Uint8Array`).
  108. * @param {Encoding | null | undefined} [encoding]
  109. * Character encoding to understand `chunk` as when it’s a `Uint8Array`
  110. * (`string`, default: `'utf8'`).
  111. * @param {Callback | null | undefined} [callback]
  112. * Function called when write was successful.
  113. * @returns {boolean}
  114. * Whether write was successful.
  115. *
  116. * @overload
  117. * @param {Value | null | undefined} [chunk]
  118. * Slice of markdown to parse (`string` or `Uint8Array`).
  119. * @param {Callback | null | undefined} [callback]
  120. * Function called when write was successful.
  121. * @returns {boolean}
  122. * Whether write was successful.
  123. *
  124. * @overload
  125. * @param {Callback | null | undefined} [callback]
  126. * Function called when write was successful.
  127. * @returns {boolean}
  128. *
  129. * @param {Callback | Value | null | undefined} [chunk]
  130. * Slice of markdown to parse (`string` or `Uint8Array`).
  131. * @param {Callback | Encoding | null | undefined} [encoding]
  132. * Character encoding to understand `chunk` as when it’s a `Uint8Array`
  133. * (`string`, default: `'utf8'`).
  134. * @param {Callback | null | undefined} [callback]
  135. * Function called when write was successful.
  136. * @returns {boolean}
  137. * Whether write was successful.
  138. */
  139. function end(chunk, encoding, callback) {
  140. if (typeof chunk === 'function') {
  141. encoding = chunk
  142. chunk = undefined
  143. }
  144. if (typeof encoding === 'function') {
  145. callback = encoding
  146. encoding = undefined
  147. }
  148. write(chunk, encoding, callback)
  149. emitter.emit('data', comp(postprocess(tokenize(prep('', encoding, true)))))
  150. emitter.emit('end')
  151. ended = true
  152. return true
  153. }
  154. /**
  155. * Pipe the processor into a writable stream.
  156. *
  157. * Basically `Stream#pipe`, but inlined and simplified to keep the bundled
  158. * size down.
  159. * See: <https://github.com/nodejs/node/blob/43a5170/lib/internal/streams/legacy.js#L13>.
  160. *
  161. * @template {NodeJS.WritableStream} Stream
  162. * @param {Stream} dest
  163. * @param {PipeOptions | null | undefined} [options]
  164. * @returns {Stream}
  165. */
  166. function pipe(dest, options) {
  167. emitter.on('data', ondata)
  168. emitter.on('error', onerror)
  169. emitter.on('end', cleanup)
  170. emitter.on('close', cleanup)
  171. // If the `end` option is not supplied, `dest.end()` will be
  172. // called when the `end` or `close` events are received.
  173. // @ts-expect-error `_isStdio` is available on `std{err,out}`
  174. if (!dest._isStdio && (!options || options.end !== false)) {
  175. emitter.on('end', onend)
  176. }
  177. dest.on('error', onerror)
  178. dest.on('close', cleanup)
  179. dest.emit('pipe', emitter)
  180. return dest
  181. /**
  182. * End destination stream.
  183. *
  184. * @returns {undefined}
  185. */
  186. function onend() {
  187. if (dest.end) {
  188. dest.end()
  189. }
  190. }
  191. /**
  192. * Handle data.
  193. *
  194. * @param {string} chunk
  195. * @returns {undefined}
  196. */
  197. function ondata(chunk) {
  198. if (dest.writable) {
  199. dest.write(chunk)
  200. }
  201. }
  202. /**
  203. * Clean listeners.
  204. *
  205. * @returns {undefined}
  206. */
  207. function cleanup() {
  208. emitter.removeListener('data', ondata)
  209. emitter.removeListener('end', onend)
  210. emitter.removeListener('error', onerror)
  211. emitter.removeListener('end', cleanup)
  212. emitter.removeListener('close', cleanup)
  213. dest.removeListener('error', onerror)
  214. dest.removeListener('close', cleanup)
  215. }
  216. /**
  217. * Close dangling pipes and handle unheard errors.
  218. *
  219. * @param {Error | null | undefined} [error]
  220. * @returns {undefined}
  221. */
  222. function onerror(error) {
  223. cleanup()
  224. if (!emitter.listenerCount('error')) {
  225. throw error // Unhandled stream error in pipe.
  226. }
  227. }
  228. }
  229. }