
How to implement worker pools in Rust

Think twice. Worker pools are not a great fit for Rust due to its ownership model. Instead, embrace functional programming and immutable data. Rust provides simpler and more elegant tools: parallel Iterators and Streams.

Update: someone (rightly) pointed out that I never defined what I mean by a worker pool, so here it is: I consider a worker pool to be some kind of class that encourages stateful, mutable data while providing methods to execute tasks concurrently.

It’s important to note that, as with worker pools in any programming language, an upper limit on concurrency should always be set. Otherwise, you may quickly exhaust your system’s resources.

Compute intensive jobs

For compute-intensive (CPU-bound) jobs, there is the rayon crate, which provides parallel Iterators: Iterators whose combinators are dispatched to a thread pool. The nice thing is that the thread pool is hidden from us, the developers; we just code as if we were using standard Iterators.

Cargo.toml

[package]
name = "rust_worker_pool"
version = "0.1.0"
authors = ["Sylvain Kerkour <sylvain@kerkour.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rand = "0.8"

rayon = "1"

main.rs

use rand::{thread_rng, Rng};
use rayon::prelude::*;
use std::time::Duration;

fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    std::thread::sleep(Duration::from_millis(sleep_ms));

    job * job
}

fn process_result(result: i64) {
    println!("{}", result);
}

fn main() {
    let jobs = 0..100;

    jobs.into_par_iter()
        .map(compute_job)
        .for_each(process_result);
}

By default, the thread pool has a size equal to the number of logical CPUs of the machine.

I/O intensive jobs

For I/O (Input/Output) bound jobs, we need to move to async land. More precisely, we will use Streams, which are async Iterators that can process items concurrently.

But the Stream trait does not provide combinators itself. We need to import the StreamExt trait from the futures crate.

Cargo.toml

[package]
name = "rust_worker_pool"
version = "0.1.0"
authors = ["Sylvain Kerkour <sylvain@kerkour.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rand = "0.8"

tokio = { version = "1", features = ["full"] }
futures = "0.3"

for_each_concurrent

for_each_concurrent is the easiest to use because it consumes the Stream: it returns not a Stream but a Future, which can be .awaited.

main.rs

use futures::{stream, StreamExt};
use rand::{thread_rng, Rng};
use std::time::Duration;

async fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    tokio::time::sleep(Duration::from_millis(sleep_ms)).await;

    job * job
}

async fn process_result(result: i64) {
    println!("{}", result);
}

#[tokio::main]
async fn main() {
    let jobs = 0..100;
    let concurrency = 42;

    stream::iter(jobs)
        .for_each_concurrent(concurrency, |job| async move {
            let result = compute_job(job).await;
            process_result(result).await;
        })
        .await;
}


buffer_unordered

On the other hand, buffer_unordered does not consume the Stream; it returns another Stream. This is why we need for_each as a sink to consume it.

main.rs

use futures::{stream, StreamExt};
use rand::{thread_rng, Rng};
use std::time::Duration;

async fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    tokio::time::sleep(Duration::from_millis(sleep_ms)).await;

    job * job
}

async fn process_result(result: i64) {
    println!("{}", result);
}

#[tokio::main]
async fn main() {
    let jobs = 0..100;
    let concurrency = 42;

    stream::iter(jobs)
        .map(compute_job)
        .buffer_unordered(concurrency)
        .for_each(process_result)
        .await;
}

Collecting results

Sometimes, instead of processing results directly, we may need to collect them, for example to send them later in a batch. Good news: the collect method is also available on Streams:

main.rs

use futures::{stream, StreamExt};
use rand::{thread_rng, Rng};
use std::time::Duration;

async fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    tokio::time::sleep(Duration::from_millis(sleep_ms)).await;

    job * job
}

#[tokio::main]
async fn main() {
    let jobs = 0..100;
    let concurrency = 42;

    let results: Vec<i64> = stream::iter(jobs)
        .map(compute_job)
        .buffer_unordered(concurrency)
        .collect()
        .await;

    println!("{:?}", results);
}

The code is on GitHub

As usual, you can find the code on GitHub: github.com/skerkour/kerkour.com (please don’t forget to star the repo 🙏).


Tags: rust, programming

Want to learn Rust and offensive security? Take a look at my book Black Hat Rust. All early-access supporters get a special discount and awesome bonuses: https://academy.kerkour.com/black-hat-rust?coupon=BLOG.
Warning: this offer is limited in time!
