How to implement worker pools in Rust

Think twice. Worker pools are not a great fit for Rust due to its ownership model. Instead, embrace functional programming and immutable data. Rust provides simpler-to-use and more elegant tools: parallel Iterators and Streams.

Update: Someone (rightfully) pointed out that I didn’t really define what I mean by a worker pool, so here it is: I consider a worker pool to be some kind of class that encourages stateful and mutable data while providing methods to execute tasks concurrently.

It’s important to note that, as with worker pools in any programming language, an upper limit on concurrency should always be set. Otherwise, you may quickly exhaust your system’s resources.

Compute-intensive jobs

For compute-intensive jobs (CPU bound), there is the rayon crate, which provides parallel Iterators: Iterators whose combinators are dispatched to a thread pool. The nice thing is that the thread pool is hidden from us, the developers. We just have to code as if we were using standard Iterators.

Cargo.toml

[package]
name = "rust_worker_pool"
version = "0.1.0"
authors = ["Sylvain Kerkour <sylvain@kerkour.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rand = "0.8"

rayon = "1"

main.rs

use rand::{thread_rng, Rng};
use rayon::prelude::*;
use std::time::Duration;

fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    std::thread::sleep(Duration::from_millis(sleep_ms));

    job * job
}

fn process_result(result: i64) {
    println!("{}", result);
}

fn main() {
    let jobs = 0..100;

    jobs.into_par_iter()
        .map(compute_job)
        .for_each(process_result);
}

By default, the thread pool has a size equal to the number of logical CPUs of the machine.
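
If the default does not fit your workload, the limit can be set explicitly through rayon's ThreadPoolBuilder before the pool is first used. Here is a minimal sketch (the limit of 8 threads is an arbitrary example):

use rayon::prelude::*;

fn main() {
    // Cap the global thread pool at 8 threads instead of one per logical CPU.
    // build_global can only be called once, before the pool is first used.
    rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build_global()
        .expect("the global thread pool is already initialized");

    let sum: i64 = (0..100_i64).into_par_iter().map(|x| x * x).sum();
    println!("{}", sum);
}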

I/O-intensive jobs

For I/O (Input/Output) bound jobs, we need to move to async land. More precisely, we will use Streams, which are async Iterators that can process items concurrently.

But the Stream trait does not provide combinators itself. We need to import the StreamExt trait from the futures crate.

Cargo.toml

[package]
name = "rust_worker_pool"
version = "0.1.0"
authors = ["Sylvain Kerkour <sylvain@kerkour.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rand = "0.8"

tokio = { version = "1", features = ["full"] }
futures = "0.3"

for_each_concurrent

for_each_concurrent is the easiest to use, as it consumes the Stream: it does not return a Stream itself, but a Future that can be .awaited.

main.rs

use futures::{stream, StreamExt};
use rand::{thread_rng, Rng};
use std::time::Duration;

async fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    tokio::time::sleep(Duration::from_millis(sleep_ms)).await;

    job * job
}

async fn process_result(result: i64) {
    println!("{}", result);
}

#[tokio::main]
async fn main() {
    let jobs = 0..100;
    let concurrency = 42;

    stream::iter(jobs)
        .for_each_concurrent(concurrency, |job| async move {
            let result = compute_job(job).await;
            process_result(result).await;
        })
        .await;
}


buffer_unordered

On the other hand, buffer_unordered does not consume the Stream. This is why we need a sink such as for_each to consume the resulting Stream.

main.rs

use futures::{stream, StreamExt};
use rand::{thread_rng, Rng};
use std::time::Duration;

async fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    tokio::time::sleep(Duration::from_millis(sleep_ms)).await;

    job * job
}

async fn process_result(result: i64) {
    println!("{}", result);
}

#[tokio::main]
async fn main() {
    let jobs = 0..100;
    let concurrency = 42;

    stream::iter(jobs)
        .map(compute_job)
        .buffer_unordered(concurrency)
        .for_each(process_result)
        .await;
}
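
Note that buffer_unordered yields results as soon as they are ready, so their order is not guaranteed to match the order of the jobs. If ordering matters, the futures crate also provides buffered, which keeps the input order. A sketch of the same main function using it, reusing compute_job and process_result from above:

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    let jobs = 0..100;
    let concurrency = 42;

    // buffered runs up to `concurrency` futures at once, but yields
    // their results in the same order as the input jobs.
    stream::iter(jobs)
        .map(compute_job)
        .buffered(concurrency)
        .for_each(process_result)
        .await;
}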

Collecting results

Sometimes, instead of processing results directly, we may need to collect them, for example to send them later in a batch. Good news: the collect method is also available on Streams:

main.rs

use futures::{stream, StreamExt};
use rand::{thread_rng, Rng};
use std::time::Duration;

async fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    tokio::time::sleep(Duration::from_millis(sleep_ms)).await;

    job * job
}

async fn process_result(result: i64) {
    println!("{}", result);
}

#[tokio::main]
async fn main() {
    let jobs = 0..100;
    let concurrency = 42;

    let results: Vec<i64> = stream::iter(jobs)
        .map(compute_job)
        .buffer_unordered(concurrency)
        .collect()
        .await;

    // use the collected results, for example by processing them as one batch
    for result in results {
        process_result(result).await;
    }
}
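
If the goal is really to send the results in batches, the Stream can also be grouped into fixed-size chunks before a sink consumes them. A sketch, reusing compute_job from above and assuming a hypothetical send_batch function and an arbitrary batch size of 10:

use futures::{stream, StreamExt};

// hypothetical function that ships a whole batch of results at once
async fn send_batch(batch: Vec<i64>) {
    println!("sending a batch of {} results", batch.len());
}

#[tokio::main]
async fn main() {
    let jobs = 0..100;
    let concurrency = 42;
    let batch_size = 10;

    // compute results concurrently, then group them into batches of `batch_size`
    stream::iter(jobs)
        .map(compute_job)
        .buffer_unordered(concurrency)
        .chunks(batch_size)
        .for_each(send_batch)
        .await;
}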

You can find the complete code for this article on GitHub: https://github.com/skerkour/kerkour.com/tree/main/2021/rust_worker_pool

P.S.: I'm starting a new way to subscribe to this blog: a Matrix room :)
Join us on #blog:kerkour.com

Want to learn Rust and offensive security? Take a look at my book Black Hat Rust. All early-access supporters get a special discount and awesome bonuses: https://academy.kerkour.com/black-hat-rust?coupon=BLOG.
Warning: this offer is available for a limited time!
