How to use a Concurrent Task Queue in your Redux-Sagas
Learn what a concurrent task queue is, the best use cases, and how to write one.
The queue is one of the most used data structures.
You probably use it every day when you shop for groceries (even online) or when you send a text message to your friends.
The concurrent task queue is a pattern that lets a fixed number of workers pull tasks from a single queue, so you can process a backlog faster and one slow task doesn't block the rest.
Let’s start with the basics
What is a Queue? 🚶🚶🚶
A queue is a linear structure in which values are added at one end and removed from the other. This discipline gives rise to a first-in/first-out behavior (FIFO) that is the defining feature of queues. The two fundamental queue operations are enqueue (add to the back) and dequeue (remove from the front) (source).
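To make the two operations concrete, here is a minimal sketch of a FIFO queue backed by a plain array. The class and method names are just illustrative, and `Array.prototype.shift` is O(n), so this is meant for explanation rather than for very large queues.

```javascript
// A minimal FIFO queue backed by an array.
class Queue {
  constructor() {
    this.items = [];
  }

  // Add a value to the back of the queue.
  enqueue(value) {
    this.items.push(value);
  }

  // Remove and return the value at the front (undefined when empty).
  dequeue() {
    return this.items.shift();
  }

  get size() {
    return this.items.length;
  }
}

const queue = new Queue();
queue.enqueue('a');
queue.enqueue('b');
queue.enqueue('c');

// 'a' went in first, so it comes out first.
const first = queue.dequeue();
```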
Ok, when should we use it?
Use a queue when you need to maintain the order of events and process the values in that order.
Great, you convinced me! But why do I need the concurrency thing?
A plain queue hands out one value at a time, and the next one waits until the current one is done. Sometimes that's not fast enough.
Consider the following case 🙌:
You are at your favorite grocery store and have just arrived at the cashier, but unfortunately, there are many people waiting. To speed up the process, the store opened several more registers and each additional cashier has its own queue. So you just have to choose one. If one of the cashiers is having a technical problem or they’re just slow, that queue will be delayed even if the other slots are free.
Concurrent task queue to the rescue! 💪
We will use only one queue for our purposes. That way, every time a slot (a cashier) becomes free, we dequeue the next person and send them to the free slot.
Hooray! 🎉
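Before bringing Redux-Saga into it, the idea can be sketched with plain promises. This is only an illustration of the pattern under simple assumptions: `runConcurrentQueue` is a made-up name, each task is a function that returns a promise, and error handling is left out.

```javascript
// Run every task with at most `workersCount` of them in flight at once.
// Each task is a function that returns a promise.
async function runConcurrentQueue(tasks, workersCount) {
  // The single shared queue; the index is kept so results stay in task order.
  const queue = tasks.map((task, index) => ({ task, index }));
  const results = new Array(tasks.length);

  // A worker (a "cashier") keeps taking the next task until the queue is empty.
  async function worker() {
    while (queue.length > 0) {
      const { task, index } = queue.shift();
      results[index] = await task();
    }
  }

  // Start the workers together and wait for all of them to run dry.
  const workerCount = Math.min(workersCount, tasks.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

A slow task only occupies its own worker; the other workers keep pulling from the same queue, which is exactly what the single line at the grocery store buys you.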
Let’s examine a use case
Last week, I was working on a Google Chrome extension that sniffs and downloads HLS (HTTP Live Streaming) streams.
An HLS stream is made up of many chunks that are fetched one by one and played in your browser as a single video. You can have thousands of files per stream, and you need to download them all.
We will use our beloved queue to speed up the process and make sure that one slow fetch is not gonna hold up the others.
TL;DR: here’s the code
https://medium.com/media/e3e893d2c8e3dfbf70703ed75ed07963/href
Now let’s look at it piece-by-piece.
1. The handler
https://medium.com/media/10cd4512fe167910e06fdedc4559f221/href
This simple handler gets the URI from the payload and then:
fetches the chunk
transforms it to a blob
emits a chunk-ready redux event
gets the current count of ready chunks
checks if it’s “all done”
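As a rough sketch only (not the extension's actual code), a handler following these steps might look like the snippet below. The action types `CHUNK_READY` and `ALL_DONE`, the selectors, the `uri` field and the two helpers are hypothetical names. `call`, `put` and `select` are simplified stand-ins for the effect creators of the same name in `redux-saga/effects`, defined inline so the snippet runs without dependencies; in a real project you would import them instead.

```javascript
// Simplified stand-ins for effect creators from 'redux-saga/effects'.
// The real ones return richer effect descriptions; these only record the call.
const effect = (type) => (...args) => ({ type, args });
const call = effect('CALL');
const put = effect('PUT');
const select = effect('SELECT');

// Hypothetical helpers and selectors.
const fetchChunk = (uri) => fetch(uri);
const toBlob = (response) => response.blob();
const getReadyCount = (state) => state.readyCount;
const getTotalCount = (state) => state.totalCount;

// The handler receives the task payload (assumed here to carry a `uri`).
function* downloadChunk({ uri }) {
  const response = yield call(fetchChunk, uri); // fetch the chunk
  const blob = yield call(toBlob, response); // transform it to a blob
  yield put({ type: 'CHUNK_READY', payload: { uri, blob } }); // emit chunk-ready

  const ready = yield select(getReadyCount); // current count of ready chunks
  const total = yield select(getTotalCount);
  if (ready === total) {
    yield put({ type: 'ALL_DONE' }); // every chunk has arrived
  }
}
```

Because a saga only yields descriptions of what it wants done, the handler can be stepped through by hand with `next()` and fake values, which makes it easy to test without a network.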
2. Create the queue
https://medium.com/media/01b3b84e98100cbcaabc663269e353d7/href
Using the handler, we create a new queue with 5 workers. We get back a watcher and a queue channel. Then we fork the watcher so that it runs in the background and starts listening for tasks.
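One way such a queue factory could be written is sketched below. It is a simplified take on the pattern, not necessarily how the extension does it: `createConcurrentTaskQueue` is an assumed name, and `channel`, `call`, `fork`, `take` and `all` are inline stand-ins for the functions of the same name in `redux-saga` and `redux-saga/effects`, so the snippet runs on its own.

```javascript
// Simplified stand-ins for redux-saga's channel() and effect creators.
const effect = (type) => (...args) => ({ type, args });
const call = effect('CALL');
const fork = effect('FORK');
const take = effect('TAKE');
const all = effect('ALL');
const channel = () => ({ kind: 'channel' });

function* createConcurrentTaskQueue(handler, workersCount) {
  // Tasks are pushed into this channel; workers take them out one at a time.
  const queueChannel = yield call(channel);

  // Each worker waits for the next task and runs the handler on its payload.
  function* worker() {
    while (true) {
      const { payload } = yield take(queueChannel);
      yield call(handler, payload);
    }
  }

  // The watcher starts the workers. Forked workers stay attached to it,
  // so cancelling the watcher task cancels them as well.
  function* watcher() {
    yield all(Array.from({ length: workersCount }, () => fork(worker)));
  }

  return { watcher, queueChannel };
}
```

With the real library, a parent saga would get both pieces with `yield call(createConcurrentTaskQueue, handler, 5)` and then start the workers with `yield fork(watcher)`. Since every worker takes from the same channel, each task goes to whichever worker is free next.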
3. Push the tasks
https://medium.com/media/03897e90b6ebc2b002f08d2307c018e4/href
We map each segment to a put effect on the queue channel we got back, and then we fire all the puts together.
4. Wait for all the chunks to be ready or for the action to be cancelled
https://medium.com/media/56b752d8889ce6e2e30bce01cf7cdb71/href
Now we wait for whichever action arrives first: all-done or cancel. After that, we cancel the watcher and act according to the action we received.
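Steps 3 and 4 could be sketched together roughly like this. The saga name, the action types (`ALL_DONE`, `DOWNLOAD_CANCELLED`, `DOWNLOAD_FINISHED`, `DOWNLOAD_ABORTED`) and the shape of a segment are assumptions made for illustration, and the effect creators are again inline stand-ins for the ones in `redux-saga/effects`.

```javascript
// Simplified stand-ins for effect creators from 'redux-saga/effects'.
const effect = (type) => (...args) => ({ type, args });
const all = effect('ALL');
const put = effect('PUT');
const take = effect('TAKE');
const race = effect('RACE');
const cancel = effect('CANCEL');

// `queueChannel` and `watcherTask` are assumed to come from step 2.
function* downloadSegments(segments, queueChannel, watcherTask) {
  // Step 3: one put per segment, all fired together.
  yield all(segments.map((segment) => put(queueChannel, { payload: segment })));

  // Step 4: wait for whichever action arrives first.
  // Only the winner's key is set on the result.
  const { allDone } = yield race({
    allDone: take('ALL_DONE'),
    cancelled: take('DOWNLOAD_CANCELLED'),
  });

  // Either way the workers are no longer needed.
  yield cancel(watcherTask);

  if (allDone) {
    yield put({ type: 'DOWNLOAD_FINISHED' });
  } else {
    yield put({ type: 'DOWNLOAD_ABORTED' });
  }
}
```

Cancelling the watcher after the race matters in the cancel case: it stops the workers from picking up the segments that are still sitting in the channel.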
And that’s it!
If you want to see it live, visit https://github.com/puemos/hls-downloader, and download the Chrome extension.
I hope you learned something new! If you have any questions, please comment below so everybody can benefit.