| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 | var inherits = require('inherits')var EventEmitter = require('events').EventEmittermodule.exports = Queuemodule.exports.default = Queuefunction Queue (options) {  if (!(this instanceof Queue)) {    return new Queue(options)  }  EventEmitter.call(this)  options = options || {}  this.concurrency = options.concurrency || Infinity  this.timeout = options.timeout || 0  this.autostart = options.autostart || false  this.results = options.results || null  this.pending = 0  this.session = 0  this.running = false  this.jobs = []  this.timers = {}}inherits(Queue, EventEmitter)var arrayMethods = [  'pop',  'shift',  'indexOf',  'lastIndexOf']arrayMethods.forEach(function (method) {  Queue.prototype[method] = function () {    return Array.prototype[method].apply(this.jobs, arguments)  }})Queue.prototype.slice = function (begin, end) {  this.jobs = this.jobs.slice(begin, end)  return this}Queue.prototype.reverse = function () {  this.jobs.reverse()  return this}var arrayAddMethods = [  'push',  'unshift',  'splice']arrayAddMethods.forEach(function (method) {  Queue.prototype[method] = function () {    var methodResult = Array.prototype[method].apply(this.jobs, arguments)    if (this.autostart) {      this.start()    }    return methodResult  }})Object.defineProperty(Queue.prototype, 'length', {  get: function () {    return this.pending + this.jobs.length  }})Queue.prototype.start = function (cb) {  if (cb) {    callOnErrorOrEnd.call(this, cb)  }  this.running = true  if (this.pending >= this.concurrency) {    return  }  if (this.jobs.length === 0) {    if (this.pending === 0) {      done.call(this)    }    return  }  var self = this  var job = this.jobs.shift()  var once = true  var session = this.session  var timeoutId = null  var didTimeout = false  var resultIndex = null  var timeout = job.hasOwnProperty('timeout') ? job.timeout : this.timeout  function next (err, result) {    if (once && self.session === session) {      once = false      self.pending--      if (timeoutId !== null) {        delete self.timers[timeoutId]        clearTimeout(timeoutId)      }      if (err) {        self.emit('error', err, job)      } else if (didTimeout === false) {        if (resultIndex !== null) {          self.results[resultIndex] = Array.prototype.slice.call(arguments, 1)        }        self.emit('success', result, job)      }      if (self.session === session) {        if (self.pending === 0 && self.jobs.length === 0) {          done.call(self)        } else if (self.running) {          self.start()        }      }    }  }  if (timeout) {    timeoutId = setTimeout(function () {      didTimeout = true      if (self.listeners('timeout').length > 0) {        self.emit('timeout', next, job)      } else {        next()      }    }, timeout)    this.timers[timeoutId] = timeoutId  }  if (this.results) {    resultIndex = this.results.length    this.results[resultIndex] = null  }  this.pending++  self.emit('start', job)  var promise = job(next)  if (promise && promise.then && typeof promise.then === 'function') {    promise.then(function (result) {      return next(null, result)    }).catch(function (err) {      return next(err || true)    })  }  if (this.running && this.jobs.length > 0) {    this.start()  }}Queue.prototype.stop = function () {  this.running = false}Queue.prototype.end = function (err) {  clearTimers.call(this)  this.jobs.length = 0  this.pending = 0  done.call(this, err)}function clearTimers () {  for (var key in this.timers) {    var timeoutId = this.timers[key]    delete this.timers[key]    clearTimeout(timeoutId)  }}function callOnErrorOrEnd (cb) {  var self = this  this.on('error', onerror)  this.on('end', onend)  function onerror (err) { self.end(err) }  function onend (err) {    self.removeListener('error', onerror)    self.removeListener('end', onend)    cb(err, this.results)  }}function done (err) {  this.session++  this.running = false  this.emit('end', err)}
 |