123456789101112131415161718192021222324252627282930313233343536373839 |
- //Promise并发控制池
- const EventEmitter = require('events')
- class PromisePool extends EventEmitter {
- constructor(concurrent, promiseCreator) {
- super()
- this.concurrent = concurrent || 5
- this.promiseCreator = promiseCreator
- }
- start(datas) {
- let self = this
- let index = 0
- let running = 0
- return new Promise(resolved => {
- let execute = () => {
- running++
- self.promiseCreator(datas[index++]).finally(() => {
- self.emit('count', index)
- running--
- if (running < self.concurrent && datas.length > index) {
- execute()
- } else if (datas.length == index && running == 0) {
- resolved()
- }
- })
- }
- while (running < self.concurrent && datas.length > index) {
- execute()
- }
- })
- }
- }
- exports.create = function(concurrent, promiseCreator) {
- return new PromisePool(concurrent, promiseCreator)
- }
|