How to implement worker pools in Rust
Think twice. Worker pools are not a great fit for Rust due to its ownership model. Instead, embrace functional programming and immutable data: Rust provides simpler and more elegant tools in the form of parallel Iterators and Streams.
Update: Someone (rightfully) pointed out that I didn't really define what I mean by a worker pool, so here it is: I consider a worker pool to be some kind of class that encourages stateful and mutable data while providing methods to execute tasks concurrently.
It's important to note that, as with worker pools in any programming language, an upper limit on concurrency should always be set. Otherwise, you may quickly exhaust your system's resources.
Compute-intensive jobs
For compute-intensive (CPU-bound) jobs, there is the rayon crate, which provides parallel Iterators: Iterators whose combinators are dispatched to a thread pool. The nice thing is that the thread pool is hidden from us, the developers: we just have to write our code as if we were using standard Iterators.
Cargo.toml
[package]
name = "rust_worker_pool"
version = "0.1.0"
authors = ["Sylvain Kerkour <sylvain@kerkour.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
rand = "0.8"
rayon = "1"
main.rs
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use std::time::Duration;

// Simulate a job by sleeping for a few milliseconds before returning the result
fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    std::thread::sleep(Duration::from_millis(sleep_ms));

    job * job
}

fn process_result(result: i64) {
    println!("{}", result);
}

fn main() {
    let jobs = 0..100;

    // into_par_iter() dispatches the work to rayon's global thread pool
    jobs.into_par_iter()
        .map(compute_job)
        .for_each(process_result);
}
By default, the thread pool has a size equal to the number of logical CPUs of the machine.
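If you need a different limit, you can configure the global thread pool yourself before it is first used. Here is a minimal sketch using rayon's ThreadPoolBuilder (the thread count of 8 is an arbitrary example):
main.rs
fn main() {
    // Must run before the global pool is used for the first time;
    // 8 threads is an arbitrary number for this sketch
    rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build_global()
        .expect("building rayon's global thread pool");

    // ... then use parallel Iterators as in the example above
}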
I/O-intensive jobs
For I/O (Input/Output) bound jobs, we need to move to async land. More precisely, we will use Streams, which are async Iterators that can process items concurrently. But the Stream trait does not provide combinators itself: we need to import the StreamExt trait from the futures crate.
Cargo.toml
[package]
name = "rust_worker_pool"
version = "0.1.0"
authors = ["Sylvain Kerkour <sylvain@kerkour.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
rand = "0.8"
tokio = { version = "1", features = ["full"] }
futures = "0.3"
for_each_concurrent
for_each_concurrent is the easiest to use, as it consumes the Stream: it does not return a Stream itself but a Future, which can be .awaited.
main.rs
use futures::{stream, StreamExt};
use rand::{thread_rng, Rng};
use std::time::Duration;

// Simulate an I/O-bound job by sleeping asynchronously for a few milliseconds
async fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    tokio::time::sleep(Duration::from_millis(sleep_ms)).await;

    job * job
}

async fn process_result(result: i64) {
    println!("{}", result);
}

#[tokio::main]
async fn main() {
    let jobs = 0..100;
    let concurrency = 42;

    // At most `concurrency` jobs are in flight at any given time
    stream::iter(jobs)
        .for_each_concurrent(concurrency, |job| async move {
            let result = compute_job(job).await;
            process_result(result).await;
        })
        .await;
}
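One detail worth knowing: for_each_concurrent's limit parameter is an impl Into<Option<usize>>, so passing None removes the upper bound entirely and every job is started at once. That is exactly what the warning at the beginning of this post is about. A minimal sketch of what to avoid, reusing compute_job and process_result from the example above:
main.rs
#[tokio::main]
async fn main() {
    // Don't do this: a `None` limit means unbounded concurrency, which can
    // quickly exhaust your system's resources on large job sets
    let no_limit: Option<usize> = None;

    stream::iter(0..100)
        .for_each_concurrent(no_limit, |job| async move {
            let result = compute_job(job).await;
            process_result(result).await;
        })
        .await;
}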
buffer_unordered
On the other hand, buffer_unordered does not consume the Stream. This is why we need to use for_each as a sink to consume it.
main.rs
use futures::{stream, StreamExt};
use rand::{thread_rng, Rng};
use std::time::Duration;

async fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    tokio::time::sleep(Duration::from_millis(sleep_ms)).await;

    job * job
}

async fn process_result(result: i64) {
    println!("{}", result);
}

#[tokio::main]
async fn main() {
    let jobs = 0..100;
    let concurrency = 42;

    // buffer_unordered runs up to `concurrency` jobs at once and yields their
    // results as they complete; for_each then consumes the resulting Stream
    stream::iter(jobs)
        .map(compute_job)
        .buffer_unordered(concurrency)
        .for_each(process_result)
        .await;
}
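If the order of the results matters, the futures crate also provides buffered, which runs jobs concurrently like buffer_unordered but yields the results in the same order as the input Stream (a slow job will hold back the results of faster jobs that come after it). A minimal sketch, reusing compute_job and process_result from the example above:
main.rs
#[tokio::main]
async fn main() {
    let jobs = 0..100;
    let concurrency = 42;

    // Same pipeline as buffer_unordered, but results are yielded in job order
    stream::iter(jobs)
        .map(compute_job)
        .buffered(concurrency)
        .for_each(process_result)
        .await;
}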
Collecting results
Sometimes, instead of processing results directly, we may need to collect them, for example to send them later in a batch. Good news: the collect method is available on Streams:
main.rs
use futures::{stream, StreamExt};
use rand::{thread_rng, Rng};
use std::time::Duration;

async fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    tokio::time::sleep(Duration::from_millis(sleep_ms)).await;

    job * job
}

#[tokio::main]
async fn main() {
    let jobs = 0..100;
    let concurrency = 42;

    // collect() gathers all the results into a single Vec once every job has completed
    let results: Vec<i64> = stream::iter(jobs)
        .map(compute_job)
        .buffer_unordered(concurrency)
        .collect()
        .await;

    println!("{} results collected", results.len());
}
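For the CPU-bound case, rayon's parallel Iterators also have a collect method, so the pattern translates directly. A minimal sketch:
main.rs
use rayon::prelude::*;

fn main() {
    // collect() gathers the results of the parallel computation into a Vec
    let results: Vec<i64> = (0..100i64)
        .into_par_iter()
        .map(|job| job * job)
        .collect();

    println!("{} results collected", results.len());
}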
The code is on GitHub
As usual, you can find the code on GitHub: github.com/skerkour/kerkour.com (please don't forget to star the repo 🙏).