promise-pool.js 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. //Promise并发控制池
  2. const EventEmitter = require('events')
  3. class PromisePool extends EventEmitter {
  4. constructor(concurrent, promiseCreator) {
  5. super()
  6. this.concurrent = concurrent || 5
  7. this.promiseCreator = promiseCreator
  8. }
  9. start(datas) {
  10. let self = this
  11. let index = 0
  12. let running = 0
  13. return new Promise(resolved => {
  14. let execute = () => {
  15. running++
  16. self.promiseCreator(datas[index++]).finally(() => {
  17. self.emit('count', index)
  18. running--
  19. if (running < self.concurrent && datas.length > index) {
  20. execute()
  21. } else if (datas.length == index && running == 0) {
  22. resolved()
  23. }
  24. })
  25. }
  26. while (running < self.concurrent && datas.length > index) {
  27. execute()
  28. }
  29. })
  30. }
  31. }
  32. exports.create = function(concurrent, promiseCreator) {
  33. return new PromisePool(concurrent, promiseCreator)
  34. }