con·cur·rent /kənˈkərənt/
adjective
- existing, happening, or done at the same time.
con·trol /kənˈtrōl/
verb
- determine the behavior or supervise the running of.
Think about the last time you went to the supermarket. After grabbing every item on your list, scoping out all the deals, and making sure you clipped the very best coupons from the weekly ad, you were faced with the most important decision of the trip: which register to choose. Since standing in line is no one's favorite activity, your goal is to minimize the time between choosing a register and the cashier starting to scan your items. The question: what's the optimal strategy?
A typical strategy might be to scan the lines at the registers closest to you and choose the shortest one, maybe accounting for how many items the other people in line have, and hope for the best. While this will likely produce a satisfactory result most of the time, there are always those few times when someone ahead of you has an abnormality with their order. Maybe they need a price check, or maybe their card got declined and they need to pay with a check. Whatever the case, their order takes longer than you anticipated, and you notice the other lines moving faster than yours. Ugh, there's got to be a better way!
Fortunately, there is. Unfortunately, it's not likely to be implemented at your supermarket, but let's discuss it anyway. If you put all the waiting shoppers into a single queue and send the person at the front to the next available register (and only let people reach the registers through the queue), each shopper is guaranteed the shortest possible wait for the next open register. An abnormality at any particular register holds up only the single shopper already being served there; it gets handled on its own terms, and that register eventually frees up to assist another patron.
Now, let's deconstruct this scenario. We have a finite number of registers (processors), a variable number of customers (requests), and a desire to serve (process) the customers who have been waiting the longest. It's not hard to see how this maps onto computer science. We've already established our ideal solution, and since this article deals with Node.js specifically, the only thing left to figure out is the implementation.
The basic implementation should do the following:
- Keep track of requests to process
- Process multiple requests at once, up to a configured maximum
- Process the next available request once a processor becomes free
Keeping that in mind, I’ve implemented the following:
// Resolve after ms milliseconds; used to yield control back to the event loop
function sleep (ms) {
  return new Promise(resolve => setTimeout(resolve, ms))
}

// Wait until the queue has a free slot, then claim it by
// incrementing the count of in-flight requests
async function concurrencyControl (Queue) {
  while (true) {
    if (Queue.concurrencyCount < Queue.maxConcurrency) {
      Queue.concurrencyCount++
      return
    }
    await sleep(100) // no slot free yet; yield and check again in 100ms
  }
}
class Queue {
  constructor ({ maxConcurrency, items = [] } = {}) {
    this.items = []
    this._executing = false
    if (items.length) this.add(...items)
    // Callers are expected to supply maxConcurrency (or set it before start())
    if (maxConcurrency !== undefined) this.maxConcurrency = maxConcurrency
  }

  // Append one or more { func, params } items to the end of the queue
  add (...items) {
    this.items.push(...items)
    return this
  }

  // Drop any items that have not started processing yet
  clear () {
    this.items.length = 0
    return this
  }

  async start () {
    if (this._executing) return // already running
    this._executing = true
    this.concurrencyCount = 0
    while (this._executing && this.items.length) {
      // Wait for (and claim) a free concurrency slot
      await concurrencyControl(this)
      if (this._executing === false) this.concurrencyCount-- // stopped while waiting; give the slot back
      else {
        const next = this.items.shift()
        if (next === undefined) this.concurrencyCount-- // queue was cleared while waiting
        else {
          // Run the item without awaiting; free the slot when it settles,
          // whether the item resolves or rejects
          const release = () => this.concurrencyCount--
          Promise.resolve(next.func(...next.params)).then(release, release)
        }
      }
    }
    // Let any in-flight requests finish before resolving
    while (this.concurrencyCount > 0) {
      await sleep(100)
    }
    this._executing = false
  }

  // Prevent new requests from starting; in-flight requests still finish
  stop () {
    this._executing = false
  }
}
The Queue class keeps track of the items waiting to be processed while tracking how many requests may run concurrently. Once the maximum is reached, the next request waits until a previous one finishes. The concurrencyControl function makes sure a single Queue instance doesn't handle more requests than it's configured to, and the sleep function yields control back to the event loop so other work can proceed (Node's JavaScript execution is single-threaded, so we can't just spin in a tight loop indefinitely).
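As a design note: the 100ms polling loop keeps the example simple, but it can add up to 100ms of latency before each dispatch and wakes the event loop even when nothing has changed. A common alternative is to park waiters as Promise resolvers and hand a freed slot directly to the next one. Here's a minimal sketch of that idea (the Semaphore name and its acquire/release methods are illustrative, not part of the code above):

// Sketch of an event-driven alternative to the polling in concurrencyControl.
// This class is illustrative and not part of the article's Queue.
class Semaphore {
  constructor (max) {
    this.max = max
    this.count = 0
    this.waiters = [] // resolve callbacks for tasks waiting on a slot
  }

  acquire () {
    if (this.count < this.max) {
      this.count++
      return Promise.resolve()
    }
    // No slot free: park this caller until release() wakes it
    return new Promise(resolve => this.waiters.push(resolve))
  }

  release () {
    const next = this.waiters.shift()
    if (next) next() // hand the slot directly to the next waiter
    else this.count--
  }
}

With this approach a waiting task resumes as soon as a slot frees up, at the cost of slightly more bookkeeping.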
The test for this looks like the following:
(note: the sleep function and Queue class are the same as the above code block)
// Simulate a request that takes between 200ms and 1200ms to complete
async function simulateRequest (name) {
  const length = 200 + Math.floor(Math.random() * 1000)
  console.log(`[Thread ${name}] Starting -- ${length}`)
  await sleep(length)
  console.log(`[Thread ${name}] Finishing`)
}

// Wrap an item in the { func, params } shape the Queue expects
function generator (item) {
  return { func: simulateRequest, params: [item] }
}

async function runQueue () {
  const requests = [1, 2, 3, 4, 5, 6, 7, 8, 9]
  const queue = new Queue({ maxConcurrency: 4, items: requests.map(generator) })
  console.log('Starting queue')
  Promise.resolve(queue.start()).then(() => {
    console.log('Queue finished')
    if (queue.items.length) {
      console.log(`Remaining items in queue: ${queue.items.length}`)
    }
  })
  // Add more items while the queue is already processing
  await sleep(500)
  queue.add(generator(10))
  await sleep(200)
  queue.add(generator(11))
  await sleep(700)
  queue.add(generator(12), generator(13), generator(14))
  await sleep(300)
  // Roughly half the time, stop the queue early to demonstrate stop()
  if (Math.floor(Math.random() * 2)) queue.stop()
}

runQueue()
The best way to look at the test output is to pull the code from GitHub (https://github.com/crispwaters/limited-concurrency-queue) and run the test file from a terminal with the command: node .\src\queue.test.js
Sample terminal output might look like:
Starting queue
[Thread 1] Starting -- 568
[Thread 2] Starting -- 878
[Thread 3] Starting -- 787
[Thread 4] Starting -- 334
[Thread 4] Finishing
[Thread 5] Starting -- 929
[Thread 1] Finishing
[Thread 6] Starting -- 687
[Thread 3] Finishing
[Thread 7] Starting -- 999
[Thread 2] Finishing
[Thread 8] Starting -- 349
[Thread 8] Finishing
[Thread 6] Finishing
[Thread 9] Starting -- 544
[Thread 10] Starting -- 414
[Thread 5] Finishing
[Thread 11] Starting -- 502
[Thread 10] Finishing
[Thread 7] Finishing
[Thread 12] Starting -- 835
[Thread 13] Starting -- 460
[Thread 9] Finishing
[Thread 11] Finishing
[Thread 14] Starting -- 455
[Thread 13] Finishing
[Thread 14] Finishing
[Thread 12] Finishing
Queue finished
Starting queue
[Thread 1] Starting -- 641
[Thread 2] Starting -- 366
[Thread 3] Starting -- 645
[Thread 4] Starting -- 846
[Thread 2] Finishing
[Thread 5] Starting -- 1070
[Thread 1] Finishing
[Thread 3] Finishing
[Thread 6] Starting -- 424
[Thread 7] Starting -- 795
[Thread 4] Finishing
[Thread 8] Starting -- 417
[Thread 6] Finishing
[Thread 9] Starting -- 986
[Thread 8] Finishing
[Thread 10] Starting -- 825
[Thread 5] Finishing
[Thread 7] Finishing
[Thread 11] Starting -- 814
[Thread 12] Starting -- 1076
[Thread 9] Finishing
[Thread 13] Starting -- 889
[Thread 10] Finishing
[Thread 14] Starting -- 766
[Thread 11] Finishing
[Thread 12] Finishing
[Thread 14] Finishing
[Thread 13] Finishing
Queue finished
Starting queue
[Thread 1] Starting -- 794
[Thread 2] Starting -- 444
[Thread 3] Starting -- 1183
[Thread 4] Starting -- 741
[Thread 2] Finishing
[Thread 5] Starting -- 1174
[Thread 4] Finishing
[Thread 1] Finishing
[Thread 6] Starting -- 901
[Thread 7] Starting -- 905
[Thread 3] Finishing
[Thread 8] Starting -- 900
[Thread 5] Finishing
[Thread 6] Finishing
[Thread 7] Finishing
[Thread 8] Finishing
Queue finished
Remaining items in queue: 6
The output will differ a little from run to run because of the random request durations (and the random chance of an early stop), but you should notice the following behavior:
- Requests always start in the order they were added to the queue
- Items can be added to the queue after processing has started
- If processing is stopped, no new requests will be handled (though in-flight requests still finish)
This is certainly not a production-ready example, but hopefully it provides more insight into managing multiple concurrent tasks in Node and gives some practical examples of using Promises and asynchronous functions.
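As one more illustration of practical usage, here is a sketch of throttling outbound HTTP requests with the same Queue. The fetchPage helper and the URLs are made up for this example, and the global fetch used here assumes Node 18+:

// Hypothetical usage: limit concurrent fetches to 2 at a time.
// fetchPage and the example URLs are illustrative, not part of the package.
async function fetchPage (url) {
  const res = await fetch(url)
  console.log(`${url} -> ${res.status}`)
}

const urls = [
  'https://example.com/a',
  'https://example.com/b',
  'https://example.com/c',
  'https://example.com/d'
]

const queue = new Queue({
  maxConcurrency: 2,
  items: urls.map(url => ({ func: fetchPage, params: [url] }))
})

queue.start().then(() => console.log('All requests finished'))

Only two requests are ever in flight at once; the rest wait their turn in insertion order.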
UPDATE:
I have published this package to npm; please visit https://www.npmjs.com/package/limited-concurrency-queue for more information.