// queue.js
  1. 'use strict'
  2. var reusify = require('reusify')
  3. function fastqueue (context, worker, concurrency) {
  4. if (typeof context === 'function') {
  5. concurrency = worker
  6. worker = context
  7. context = null
  8. }
  9. var cache = reusify(Task)
  10. var queueHead = null
  11. var queueTail = null
  12. var _running = 0
  13. var errorHandler = null
  14. var self = {
  15. push: push,
  16. drain: noop,
  17. saturated: noop,
  18. pause: pause,
  19. paused: false,
  20. concurrency: concurrency,
  21. running: running,
  22. resume: resume,
  23. idle: idle,
  24. length: length,
  25. getQueue: getQueue,
  26. unshift: unshift,
  27. empty: noop,
  28. kill: kill,
  29. killAndDrain: killAndDrain,
  30. error: error
  31. }
  32. return self
  33. function running () {
  34. return _running
  35. }
  36. function pause () {
  37. self.paused = true
  38. }
  39. function length () {
  40. var current = queueHead
  41. var counter = 0
  42. while (current) {
  43. current = current.next
  44. counter++
  45. }
  46. return counter
  47. }
  48. function getQueue () {
  49. var current = queueHead
  50. var tasks = []
  51. while (current) {
  52. tasks.push(current.value)
  53. current = current.next
  54. }
  55. return tasks
  56. }
  57. function resume () {
  58. if (!self.paused) return
  59. self.paused = false
  60. for (var i = 0; i < self.concurrency; i++) {
  61. _running++
  62. release()
  63. }
  64. }
  65. function idle () {
  66. return _running === 0 && self.length() === 0
  67. }
  68. function push (value, done) {
  69. var current = cache.get()
  70. current.context = context
  71. current.release = release
  72. current.value = value
  73. current.callback = done || noop
  74. current.errorHandler = errorHandler
  75. if (_running === self.concurrency || self.paused) {
  76. if (queueTail) {
  77. queueTail.next = current
  78. queueTail = current
  79. } else {
  80. queueHead = current
  81. queueTail = current
  82. self.saturated()
  83. }
  84. } else {
  85. _running++
  86. worker.call(context, current.value, current.worked)
  87. }
  88. }
  89. function unshift (value, done) {
  90. var current = cache.get()
  91. current.context = context
  92. current.release = release
  93. current.value = value
  94. current.callback = done || noop
  95. if (_running === self.concurrency || self.paused) {
  96. if (queueHead) {
  97. current.next = queueHead
  98. queueHead = current
  99. } else {
  100. queueHead = current
  101. queueTail = current
  102. self.saturated()
  103. }
  104. } else {
  105. _running++
  106. worker.call(context, current.value, current.worked)
  107. }
  108. }
  109. function release (holder) {
  110. if (holder) {
  111. cache.release(holder)
  112. }
  113. var next = queueHead
  114. if (next) {
  115. if (!self.paused) {
  116. if (queueTail === queueHead) {
  117. queueTail = null
  118. }
  119. queueHead = next.next
  120. next.next = null
  121. worker.call(context, next.value, next.worked)
  122. if (queueTail === null) {
  123. self.empty()
  124. }
  125. } else {
  126. _running--
  127. }
  128. } else if (--_running === 0) {
  129. self.drain()
  130. }
  131. }
  132. function kill () {
  133. queueHead = null
  134. queueTail = null
  135. self.drain = noop
  136. }
  137. function killAndDrain () {
  138. queueHead = null
  139. queueTail = null
  140. self.drain()
  141. self.drain = noop
  142. }
  143. function error (handler) {
  144. errorHandler = handler
  145. }
  146. }
  147. function noop () {}
  148. function Task () {
  149. this.value = null
  150. this.callback = noop
  151. this.next = null
  152. this.release = noop
  153. this.context = null
  154. this.errorHandler = null
  155. var self = this
  156. this.worked = function worked (err, result) {
  157. var callback = self.callback
  158. var errorHandler = self.errorHandler
  159. var val = self.value
  160. self.value = null
  161. self.callback = noop
  162. if (self.errorHandler) {
  163. errorHandler(err, val)
  164. }
  165. callback.call(self.context, err, result)
  166. self.release(self)
  167. }
  168. }
  // Expose the fastqueue factory function as this module's export.
  169. module.exports = fastqueue