// index.js (4.0 KB)
  var inherits = require('inherits')
  var EventEmitter = require('events').EventEmitter

  // Expose the constructor as the module value and also under `default`.
  module.exports = Queue
  module.exports.default = Queue
  /**
   * Job queue constructor. Works with or without `new`.
   *
   * Every option falls back to its default when falsy, so `concurrency: 0`
   * yields Infinity and `timeout: 0` leaves timeouts disabled.
   *
   * @param {Object} [options]
   * @param {number} [options.concurrency=Infinity] - upper bound on jobs in flight
   * @param {number} [options.timeout=0] - default per-job timeout passed to setTimeout
   * @param {boolean} [options.autostart=false] - call start() whenever jobs are added
   * @param {Array} [options.results=null] - array that receives each job's callback results
   */
  function Queue (options) {
    // Allow `Queue(opts)` as a shorthand for `new Queue(opts)`.
    if (!(this instanceof Queue)) {
      return new Queue(options)
    }

    EventEmitter.call(this)
    options = options || {}

    this.concurrency = options.concurrency || Infinity
    this.timeout = options.timeout || 0
    this.autostart = options.autostart || false
    this.results = options.results || null

    this.pending = 0 // jobs started but not yet finished
    this.session = 0 // incremented each time a run ends
    this.running = false
    this.jobs = [] // jobs waiting to be started
    this.timers = {} // active timeout handles
  }
  // Queue instances get the EventEmitter API through the prototype chain.
  inherits(Queue, EventEmitter)

  // Array methods that are exposed on the queue unchanged: each one is applied
  // to the internal `jobs` array and its return value is handed back as-is.
  var arrayMethods = [
    'pop',
    'shift',
    'indexOf',
    'lastIndexOf'
  ]

  arrayMethods.forEach(function (method) {
    Queue.prototype[method] = function () {
      return Array.prototype[method].apply(this.jobs, arguments)
    }
  })
  /**
   * Replace the queued jobs with `jobs.slice(begin, end)`.
   * Unlike Array.prototype.slice this changes the queue itself.
   *
   * @param {number} [begin]
   * @param {number} [end]
   * @returns {Queue} this queue, for chaining
   */
  Queue.prototype.slice = function (begin, end) {
    this.jobs = this.jobs.slice(begin, end)
    return this
  }
  /**
   * Reverse the order of the queued jobs in place.
   *
   * @returns {Queue} this queue, for chaining
   */
  Queue.prototype.reverse = function () {
    this.jobs.reverse()
    return this
  }
  // Array methods that can add jobs. Each is applied to the internal `jobs`
  // array; when `autostart` is set the queue is started right afterwards.
  // The underlying Array method's return value is passed through.
  var arrayAddMethods = [
    'push',
    'unshift',
    'splice'
  ]

  arrayAddMethods.forEach(function (method) {
    Queue.prototype[method] = function () {
      var methodResult = Array.prototype[method].apply(this.jobs, arguments)
      if (this.autostart) {
        this.start()
      }
      return methodResult
    }
  })
  // `length` is a read-only accessor: jobs currently in flight plus jobs
  // still waiting in the queue.
  Object.defineProperty(Queue.prototype, 'length', {
    get: function () {
      return this.pending + this.jobs.length
    }
  })
  /**
   * Start (or continue) processing. Takes the job at the front of the queue,
   * runs it, and calls itself again while the queue is running and jobs
   * remain, stopping once `concurrency` jobs are in flight.
   *
   * A job is invoked with a node-style callback `next(err, result)`. If the
   * job returns a thenable, its settlement is forwarded to `next` instead.
   *
   * Events: 'start' before a job runs; 'success' or 'error' when it finishes;
   * 'timeout' when a job's timer fires and a 'timeout' listener exists;
   * 'end' (through done) when nothing is pending or queued.
   *
   * @param {Function} [cb] - registered through callOnErrorOrEnd: receives
   *   (err, results) on 'end', and an 'error' event ends the queue.
   */
  Queue.prototype.start = function (cb) {
    if (cb) {
      callOnErrorOrEnd.call(this, cb)
    }

    this.running = true

    // At the concurrency limit: a finishing job will call start() again.
    if (this.pending >= this.concurrency) {
      return
    }

    if (this.jobs.length === 0) {
      // Nothing queued and nothing in flight means this run is complete.
      if (this.pending === 0) {
        done.call(this)
      }
      return
    }

    var self = this
    var job = this.jobs.shift()
    var once = true
    var session = this.session
    var timeoutId = null
    var didTimeout = false
    var resultIndex = null
    // A `timeout` set directly on the job overrides the queue-wide default.
    // hasOwnProperty is borrowed from Object.prototype so that a job which
    // lacks or shadows that method cannot break the lookup.
    var timeout = Object.prototype.hasOwnProperty.call(job, 'timeout')
      ? job.timeout
      : this.timeout

    // Completion callback for this job. Only the first call counts, and only
    // while the run that started the job is still the current session.
    function next (err, result) {
      if (once && self.session === session) {
        once = false
        self.pending--
        if (timeoutId !== null) {
          delete self.timers[timeoutId]
          clearTimeout(timeoutId)
        }

        if (err) {
          self.emit('error', err, job)
        } else if (didTimeout === false) {
          if (resultIndex !== null) {
            // Store every argument after `err`, not just the first result.
            self.results[resultIndex] = Array.prototype.slice.call(arguments, 1)
          }
          self.emit('success', result, job)
        }

        // Re-check the session: the emits above run listeners, and the
        // session changes whenever done() is reached (e.g. through end()).
        if (self.session === session) {
          if (self.pending === 0 && self.jobs.length === 0) {
            done.call(self)
          } else if (self.running) {
            self.start()
          }
        }
      }
    }

    if (timeout) {
      timeoutId = setTimeout(function () {
        didTimeout = true
        if (self.listeners('timeout').length > 0) {
          // A listener decides how to proceed; it is handed `next` to call.
          self.emit('timeout', next, job)
        } else {
          next()
        }
      }, timeout)
      this.timers[timeoutId] = timeoutId
    }

    if (this.results) {
      // Reserve this job's slot now so results keep start order.
      resultIndex = this.results.length
      this.results[resultIndex] = null
    }

    this.pending++
    self.emit('start', job)

    var promise = job(next)
    if (promise && typeof promise.then === 'function') {
      var onFulfilled = function (result) {
        return next(null, result)
      }
      // A falsy rejection reason is replaced by `true` so that next() still
      // takes its error branch.
      var onRejected = function (err) {
        return next(err || true)
      }
      if (typeof promise.catch === 'function') {
        promise.then(onFulfilled).catch(onRejected)
      } else {
        // Bare thenable without `.catch`: chaining `.catch` onto the value
        // returned by then() is not safe, so pass both handlers to then().
        promise.then(onFulfilled, onRejected)
      }
    }

    // Keep filling free concurrency slots.
    if (this.running && this.jobs.length > 0) {
      this.start()
    }
  }
  /**
   * Pause the queue by clearing the `running` flag. Jobs already in flight
   * are not interrupted; start() checks the flag before launching more.
   */
  Queue.prototype.stop = function () {
    this.running = false
  }
  /**
   * Abort the current run: cancel all job timers, drop every queued job,
   * reset the in-flight counter and finish through done(), which emits 'end'.
   *
   * @param {*} [err] - forwarded to done() and from there to 'end' listeners
   */
  Queue.prototype.end = function (err) {
    clearTimers.call(this)
    // Truncate in place so existing references to the array see it emptied.
    this.jobs.length = 0
    this.pending = 0
    done.call(this, err)
  }
  /**
   * Cancel every timeout recorded in `this.timers` and empty the registry.
   * Must be invoked with the queue as `this`.
   */
  function clearTimers () {
    for (var key in this.timers) {
      var timeoutId = this.timers[key]
      delete this.timers[key]
      clearTimeout(timeoutId)
    }
  }
  /**
   * Wire a completion callback to the queue (invoked with the queue as
   * `this`). An 'error' event ends the queue with that error; on 'end' both
   * listeners are removed and `cb(err, results)` is called.
   *
   * @param {Function} cb - receives the 'end' argument and the queue's results
   */
  function callOnErrorOrEnd (cb) {
    var self = this
    this.on('error', onerror)
    this.on('end', onend)

    function onerror (err) { self.end(err) }

    function onend (err) {
      self.removeListener('error', onerror)
      self.removeListener('end', onend)
      // Read results through `self`, like the rest of this helper, instead of
      // relying on the `this` the emitter happens to bind for listeners.
      cb(err, self.results)
    }
  }
  /**
   * Finish the current run (invoked with the queue as `this`): advance the
   * session counter, clear the `running` flag and emit 'end'.
   *
   * @param {*} [err] - passed to 'end' listeners
   */
  function done (err) {
    // Advancing the session makes start()'s per-job callbacks from this run
    // fail their session check, so late completions are ignored.
    this.session++
    this.running = false
    this.emit('end', err)
  }