123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- var inherits = require('inherits')
- var EventEmitter = require('events').EventEmitter
- module.exports = Queue
- module.exports.default = Queue
- function Queue (options) {
- if (!(this instanceof Queue)) {
- return new Queue(options)
- }
- EventEmitter.call(this)
- options = options || {}
- this.concurrency = options.concurrency || Infinity
- this.timeout = options.timeout || 0
- this.autostart = options.autostart || false
- this.results = options.results || null
- this.pending = 0
- this.session = 0
- this.running = false
- this.jobs = []
- this.timers = {}
- }
- inherits(Queue, EventEmitter)
- var arrayMethods = [
- 'pop',
- 'shift',
- 'indexOf',
- 'lastIndexOf'
- ]
- arrayMethods.forEach(function (method) {
- Queue.prototype[method] = function () {
- return Array.prototype[method].apply(this.jobs, arguments)
- }
- })
- Queue.prototype.slice = function (begin, end) {
- this.jobs = this.jobs.slice(begin, end)
- return this
- }
- Queue.prototype.reverse = function () {
- this.jobs.reverse()
- return this
- }
- var arrayAddMethods = [
- 'push',
- 'unshift',
- 'splice'
- ]
- arrayAddMethods.forEach(function (method) {
- Queue.prototype[method] = function () {
- var methodResult = Array.prototype[method].apply(this.jobs, arguments)
- if (this.autostart) {
- this.start()
- }
- return methodResult
- }
- })
- Object.defineProperty(Queue.prototype, 'length', {
- get: function () {
- return this.pending + this.jobs.length
- }
- })
- Queue.prototype.start = function (cb) {
- if (cb) {
- callOnErrorOrEnd.call(this, cb)
- }
- this.running = true
- if (this.pending >= this.concurrency) {
- return
- }
- if (this.jobs.length === 0) {
- if (this.pending === 0) {
- done.call(this)
- }
- return
- }
- var self = this
- var job = this.jobs.shift()
- var once = true
- var session = this.session
- var timeoutId = null
- var didTimeout = false
- var resultIndex = null
- var timeout = job.hasOwnProperty('timeout') ? job.timeout : this.timeout
- function next (err, result) {
- if (once && self.session === session) {
- once = false
- self.pending--
- if (timeoutId !== null) {
- delete self.timers[timeoutId]
- clearTimeout(timeoutId)
- }
- if (err) {
- self.emit('error', err, job)
- } else if (didTimeout === false) {
- if (resultIndex !== null) {
- self.results[resultIndex] = Array.prototype.slice.call(arguments, 1)
- }
- self.emit('success', result, job)
- }
- if (self.session === session) {
- if (self.pending === 0 && self.jobs.length === 0) {
- done.call(self)
- } else if (self.running) {
- self.start()
- }
- }
- }
- }
- if (timeout) {
- timeoutId = setTimeout(function () {
- didTimeout = true
- if (self.listeners('timeout').length > 0) {
- self.emit('timeout', next, job)
- } else {
- next()
- }
- }, timeout)
- this.timers[timeoutId] = timeoutId
- }
- if (this.results) {
- resultIndex = this.results.length
- this.results[resultIndex] = null
- }
- this.pending++
- self.emit('start', job)
- var promise = job(next)
- if (promise && promise.then && typeof promise.then === 'function') {
- promise.then(function (result) {
- return next(null, result)
- }).catch(function (err) {
- return next(err || true)
- })
- }
- if (this.running && this.jobs.length > 0) {
- this.start()
- }
- }
- Queue.prototype.stop = function () {
- this.running = false
- }
- Queue.prototype.end = function (err) {
- clearTimers.call(this)
- this.jobs.length = 0
- this.pending = 0
- done.call(this, err)
- }
- function clearTimers () {
- for (var key in this.timers) {
- var timeoutId = this.timers[key]
- delete this.timers[key]
- clearTimeout(timeoutId)
- }
- }
- function callOnErrorOrEnd (cb) {
- var self = this
- this.on('error', onerror)
- this.on('end', onend)
- function onerror (err) { self.end(err) }
- function onend (err) {
- self.removeListener('error', onerror)
- self.removeListener('end', onend)
- cb(err, this.results)
- }
- }
- function done (err) {
- this.session++
- this.running = false
- this.emit('end', err)
- }
|