## Turning Our Single-Threaded Server into a Multithreaded Server

Right now, the server will process each request in turn, meaning it won’t
process a second connection until the first is finished processing. If the
server received more and more requests, this serial execution would be less and
less optimal. If the server receives a request that takes a long time to
process, subsequent requests will have to wait until the long request is
finished, even if the new requests can be processed quickly. We’ll need to fix
this, but first, we’ll look at the problem in action.

### Simulating a Slow Request in the Current Server Implementation

We’ll look at how a slow-processing request can affect other requests made to
our current server implementation. Listing 20-10 implements handling a request
to */sleep* with a simulated slow response that will cause the server to sleep
for 5 seconds before responding.

<span class="filename">Filename: src/main.rs</span>

```rust
use std::thread;
use std::time::Duration;
# use std::io::prelude::*;
# use std::net::TcpStream;
// --snip--

fn handle_connection(mut stream: TcpStream) {
#     let mut buffer = [0; 512];
#     stream.read(&mut buffer).unwrap();
    // --snip--

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    // --snip--
}
```

<span class="caption">Listing 20-10: Simulating a slow request by recognizing
*/sleep* and sleeping for 5 seconds</span>

This code is a bit messy, but it’s good enough for simulation purposes. We
created a second request `sleep`, whose data our server recognizes. We added an
`else if` after the `if` block to check for the request to */sleep*. When that
request is received, the server will sleep for 5 seconds before rendering the
successful HTML page.

You can see how primitive our server is: real libraries would handle the
recognition of multiple requests in a much less verbose way!

Start the server using `cargo run`. Then open two browser windows: one for
*http://127.0.0.1:7878/* and the other for *http://127.0.0.1:7878/sleep*. If
you enter the */* URI a few times, as before, you’ll see it respond quickly.
But if you enter */sleep* and then load */*, you’ll see that */* waits until
`sleep` has slept for its full 5 seconds before loading.

There are multiple ways we could change how our web server works to avoid
having more requests back up behind a slow request; the one we’ll implement is
a thread pool.

### Improving Throughput with a Thread Pool

A *thread pool* is a group of spawned threads that are waiting and ready to
handle a task. When the program receives a new task, it assigns one of the
threads in the pool to the task, and that thread will process the task. The
remaining threads in the pool are available to handle any other tasks that come
in while the first thread is processing. When the first thread is done
processing its task, it’s returned to the pool of idle threads, ready to handle
a new task. A thread pool allows you to process connections concurrently,
increasing the throughput of your server.

We’ll limit the number of threads in the pool to a small number to protect us
from Denial of Service (DoS) attacks; if we had our program create a new thread
for each request as it came in, someone making 10 million requests to our
server could create havoc by using up all our server’s resources and grinding
the processing of requests to a halt.

Rather than spawning unlimited threads, we’ll have a fixed number of threads
waiting in the pool. As requests come in, they’ll be sent to the pool for
processing. The pool will maintain a queue of incoming requests. Each of the
threads in the pool will pop off a request from this queue, handle the request,
and then ask the queue for another request. With this design, we can process
`N` requests concurrently, where `N` is the number of threads. If each thread
is responding to a long-running request, subsequent requests can still back up
in the queue, but we’ve increased the number of long-running requests we can
handle before reaching that point.

This technique is just one of many ways to improve the throughput of a web
server. Other options you might explore are the fork/join model and the
single-threaded async I/O model. If you’re interested in this topic, you can
read more about other solutions and try to implement them in Rust; with a
low-level language like Rust, all of these options are possible.

Before we begin implementing a thread pool, let’s talk about what using the
pool should look like. When you’re trying to design code, writing the client
interface first can help guide your design. Write the API of the code so it’s
structured in the way you want to call it; then implement the functionality
within that structure rather than implementing the functionality and then
designing the public API.

Similar to how we used test-driven development in the project in Chapter 12,
we’ll use compiler-driven development here. We’ll write the code that calls the
functions we want, and then we’ll look at errors from the compiler to determine
what we should change next to get the code to work.

#### Code Structure If We Could Spawn a Thread for Each Request

First, let’s explore how our code might look if it did create a new thread for
every connection. As mentioned earlier, this isn’t our final plan due to the
problems with potentially spawning an unlimited number of threads, but it is a
starting point. Listing 20-11 shows the changes to make to `main` to spawn a
new thread to handle each stream within the `for` loop.

<span class="filename">Filename: src/main.rs</span>

```rust,no_run
# use std::io::prelude::*;
# use std::net::TcpListener;
# use std::net::TcpStream;
use std::thread;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}
# fn handle_connection(mut stream: TcpStream) {}
```

<span class="caption">Listing 20-11: Spawning a new thread for each
stream</span>

As you learned in Chapter 16, `thread::spawn` will create a new thread and then
run the code in the closure in the new thread. If you run this code and load
*/sleep* in your browser, then */* in two more browser tabs, you’ll indeed see
that the requests to */* don’t have to wait for */sleep* to finish. But as we
mentioned, this will eventually overwhelm the system because you’d be making
new threads without any limit.

#### Creating a Similar Interface for a Finite Number of Threads

We want our thread pool to work in a similar, familiar way so switching from
threads to a thread pool doesn’t require large changes to the code that uses
our API. Listing 20-12 shows the hypothetical interface for a `ThreadPool`
struct we want to use instead of `thread::spawn`.

<span class="filename">Filename: src/main.rs</span>

```rust,no_run
# use std::io::prelude::*;
# use std::net::TcpListener;
# use std::net::TcpStream;
# struct ThreadPool;
# impl ThreadPool {
#    fn new(size: u32) -> ThreadPool { ThreadPool }
#    fn execute<F>(&self, f: F)
#        where F: FnOnce() + Send + 'static {}
# }

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}
# fn handle_connection(mut stream: TcpStream) {}
```

<span class="caption">Listing 20-12: Our ideal `ThreadPool` interface</span>

We use `ThreadPool::new` to create a new thread pool with a configurable number
of threads, in this case four. Then, in the `for` loop, `pool.execute` has an
interface similar to `thread::spawn` in that it takes a closure the pool should
run for each stream. We need to implement `pool.execute` so it takes the
closure and gives it to a thread in the pool to run. This code won’t yet
compile, but we’ll try so the compiler can guide us in how to fix it.

#### Building the `ThreadPool` Struct Using Compiler-Driven Development

Make the changes in Listing 20-12 to *src/main.rs*, and then let’s use the
compiler errors from `cargo check` to drive our development. Here is the first
error we get:

```text
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve. Use of undeclared type or module `ThreadPool`
  --> src/main.rs:10:16
   |
10 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^^^^^^ Use of undeclared type or module

error: aborting due to previous error
```

Great! This error tells us we need a `ThreadPool` type or module, so we’ll
build one now. Our `ThreadPool` implementation will be independent of the kind
of work our web server is doing. So, let’s switch the `hello` crate from a
binary crate to a library crate to hold our `ThreadPool` implementation. After
we change to a library crate, we could also use the separate thread pool
library for any work we want to do using a thread pool, not just for serving
web requests.

Create a *src/lib.rs* that contains the following, which is the simplest
definition of a `ThreadPool` struct that we can have for now:

<span class="filename">Filename: src/lib.rs</span>

```rust
pub struct ThreadPool;
```

Then create a new directory, *src/bin*, and move the binary crate rooted in
*src/main.rs* into *src/bin/main.rs*. Doing so will make the library crate the
primary crate in the *hello* directory; we can still run the binary in
*src/bin/main.rs* using `cargo run`. After moving the *main.rs* file, edit it
to bring the library crate in and bring `ThreadPool` into scope by adding the
following code to the top of *src/bin/main.rs*:

<span class="filename">Filename: src/bin/main.rs</span>

```rust,ignore
use hello::ThreadPool;
```

This code still won’t work, but let’s check it again to get the next error that
we need to address:

```text
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for type
`hello::ThreadPool` in the current scope
  --> src/bin/main.rs:13:16
   |
13 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^^^^^^ function or associated item not found in
   `hello::ThreadPool`
```

This error indicates that next we need to create an associated function named
`new` for `ThreadPool`. We also know that `new` needs to have one parameter
that can accept `4` as an argument and should return a `ThreadPool` instance.
Let’s implement the simplest `new` function that will have those
characteristics:

<span class="filename">Filename: src/lib.rs</span>

```rust
pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}
```

We chose `usize` as the type of the `size` parameter, because we know that a
negative number of threads doesn’t make any sense. We also know we’ll use this
value as the number of elements in a collection of threads, which is what the
`usize` type is for, as discussed in the [“Integer Types”][integer-types]<!--
ignore --> section of Chapter 3.

Let’s check the code again:

```text
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
  |
4 |     pub fn new(size: usize) -> ThreadPool {
  |
  = note: #[warn(unused_variables)] on by default
  = note: to avoid this warning, consider using `_size` instead

error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope
  --> src/bin/main.rs:18:14
   |
18 |     pool.execute(|| {
```

Now we get a warning and an error. Ignoring the warning for a moment, the error
occurs because we don’t have an `execute` method on `ThreadPool`. Recall from
the [“Creating a Similar Interface for a Finite Number of
Threads”](#creating-a-similar-interface-for-a-finite-number-of-threads)<!--
ignore --> section that we decided our thread pool should have an interface
similar to `thread::spawn`. In addition, we’ll implement the `execute` function
so it takes the closure it’s given and gives it to an idle thread in the pool
to run.

We’ll define the `execute` method on `ThreadPool` to take a closure as a
parameter. Recall from the [“Storing Closures Using Generic Parameters and the
`Fn` Traits”][storing-closures-using-generic-parameters-and-the-fn-traits]<!--
ignore --> section in Chapter 13 that we can take closures as parameters with
three different traits: `Fn`, `FnMut`, and `FnOnce`. We need to decide which
kind of closure to use here. We know we’ll end up doing something similar to
the standard library `thread::spawn` implementation, so we can look at what
bounds the signature of `thread::spawn` has on its parameter. The documentation
shows us the following:

```rust,ignore
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
```

The `F` type parameter is the one we’re concerned with here; the `T` type
parameter is related to the return value, and we’re not concerned with that. We
can see that `spawn` uses `FnOnce` as the trait bound on `F`. This is probably
what we want as well, because we’ll eventually pass the argument we get in
`execute` to `spawn`. We can be further confident that `FnOnce` is the trait we
want to use because the thread for running a request will only execute that
request’s closure one time, which matches the `Once` in `FnOnce`.

The `F` type parameter also has the trait bound `Send` and the lifetime bound
`'static`, which are useful in our situation: we need `Send` to transfer the
closure from one thread to another and `'static` because we don’t know how long
the thread will take to execute. Let’s create an `execute` method on
`ThreadPool` that will take a generic parameter of type `F` with these bounds:

<span class="filename">Filename: src/lib.rs</span>

```rust
# pub struct ThreadPool;
impl ThreadPool {
    // --snip--
    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
    }
}
```

We still use the `()` after `FnOnce` because this `FnOnce` represents a closure
that takes no parameters and doesn’t return a value. Just like function
definitions, the return type can be omitted from the signature, but even if we
have no parameters, we still need the parentheses.

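As a standalone illustration of these bounds (the `run_on_new_thread` function is our own example, not part of the server code), a function with the same `F: FnOnce() + Send + 'static` bound can forward its closure straight to `thread::spawn`:

```rust
use std::thread;

// Same bounds as our planned `execute`: a closure that takes no arguments,
// returns nothing, can be sent to another thread, and owns its captures.
fn run_on_new_thread<F>(f: F) -> thread::JoinHandle<()>
where
    F: FnOnce() + Send + 'static,
{
    thread::spawn(f)
}

fn main() {
    let message = String::from("hello from another thread");
    // The closure consumes `message`, and calling it once is all we need,
    // which is why `FnOnce` is the right trait here.
    let handle = run_on_new_thread(move || {
        println!("{}", message);
    });
    handle.join().unwrap();
}
```

If you tighten the bound to `Fn` or `FnMut`, this still compiles; `FnOnce` is simply the most permissive choice, accepting closures that consume their captured values.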
Again, this is the simplest implementation of the `execute` method: it does
nothing, but we’re trying only to make our code compile. Let’s check it again:

```text
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
  |
4 |     pub fn new(size: usize) -> ThreadPool {
  |
  = note: #[warn(unused_variables)] on by default
  = note: to avoid this warning, consider using `_size` instead

warning: unused variable: `f`
  |
8 |     pub fn execute<F>(&self, f: F)
  |
  = note: to avoid this warning, consider using `_f` instead
```

We’re receiving only warnings now, which means it compiles! But note that if
you try `cargo run` and make a request in the browser, you’ll see the errors in
the browser that we saw at the beginning of the chapter. Our library isn’t
actually calling the closure passed to `execute` yet!

> Note: A saying you might hear about languages with strict compilers, such as
> Haskell and Rust, is “if the code compiles, it works.” But this saying is not
> universally true. Our project compiles, but it does absolutely nothing! If we
> were building a real, complete project, this would be a good time to start
> writing unit tests to check that the code compiles *and* has the behavior we
> want.

#### Validating the Number of Threads in `new`

We’ll continue to get warnings because we aren’t doing anything with the
parameters to `new` and `execute`. Let’s implement the bodies of these
functions with the behavior we want. To start, let’s think about `new`. Earlier
we chose an unsigned type for the `size` parameter, because a pool with a
negative number of threads makes no sense. However, a pool with zero threads
also makes no sense, yet zero is a perfectly valid `usize`. We’ll add code to
check that `size` is greater than zero before we return a `ThreadPool` instance
and have the program panic if it receives a zero by using the `assert!` macro,
as shown in Listing 20-13.

<span class="filename">Filename: src/lib.rs</span>

```rust
# pub struct ThreadPool;
impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
}
```

<span class="caption">Listing 20-13: Implementing `ThreadPool::new` to panic if
`size` is zero</span>

We’ve added some documentation for our `ThreadPool` with doc comments. Note
that we followed good documentation practices by adding a section that calls
out the situations in which our function can panic, as discussed in Chapter 14.
Try running `cargo doc --open` and clicking the `ThreadPool` struct to see what
the generated docs for `new` look like!

Instead of adding the `assert!` macro as we’ve done here, we could make `new`
return a `Result` like we did with `Config::new` in the I/O project in Listing
12-9. But we’ve decided in this case that trying to create a thread pool
without any threads should be an unrecoverable error. If you’re feeling
ambitious, try to write a version of `new` with the following signature to
compare both versions:

```rust,ignore
pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {
```

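If you attempt the exercise, one possible sketch looks like the following; `PoolCreationError` is a hypothetical error type you would define yourself, not something provided by the standard library:

```rust
// A sketch of the Result-returning alternative. `PoolCreationError` is a
// hypothetical error type defined only for this exercise.
#[derive(Debug)]
pub struct PoolCreationError;

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {
        if size == 0 {
            // Report the invalid size to the caller instead of panicking.
            Err(PoolCreationError)
        } else {
            Ok(ThreadPool)
        }
    }
}

fn main() {
    assert!(ThreadPool::new(4).is_ok());
    assert!(ThreadPool::new(0).is_err());
}
```

The trade-off is the usual one between `panic!` and `Result`: the `Result` version forces every caller to handle a case we’ve argued is a programming error rather than a runtime condition.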
#### Creating Space to Store the Threads

Now that we have a way to know we have a valid number of threads to store in
the pool, we can create those threads and store them in the `ThreadPool` struct
before returning it. But how do we “store” a thread? Let’s take another look at
the `thread::spawn` signature:

```rust,ignore
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
```

The `spawn` function returns a `JoinHandle<T>`, where `T` is the type that the
closure returns. Let’s try using `JoinHandle` too and see what happens. In our
case, the closures we’re passing to the thread pool will handle the connection
and not return anything, so `T` will be the unit type `()`.

The code in Listing 20-14 will compile but doesn’t create any threads yet.
We’ve changed the definition of `ThreadPool` to hold a vector of
`thread::JoinHandle<()>` instances, initialized the vector with a capacity of
`size`, set up a `for` loop that will run some code to create the threads, and
returned a `ThreadPool` instance containing them.

<span class="filename">Filename: src/lib.rs</span>

```rust,ignore,not_desired_behavior
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    // --snip--
}
```

<span class="caption">Listing 20-14: Creating a vector for `ThreadPool` to hold
the threads</span>

We’ve brought `std::thread` into scope in the library crate, because we’re
using `thread::JoinHandle` as the type of the items in the vector in
`ThreadPool`.

Once a valid size is received, our `ThreadPool` creates a new vector that can
hold `size` items. We haven’t used the `with_capacity` function in this book
yet, which performs the same task as `Vec::new` but with an important
difference: it preallocates space in the vector. Because we know we need to
store `size` elements in the vector, doing this allocation up front is slightly
more efficient than using `Vec::new`, which resizes itself as elements are
inserted.

When you run `cargo check` again, you’ll get a few more warnings, but it should
succeed.

#### A `Worker` Struct Responsible for Sending Code from the `ThreadPool` to a Thread

We left a comment in the `for` loop in Listing 20-14 regarding the creation of
threads. Here, we’ll look at how we actually create threads. The standard
library provides `thread::spawn` as a way to create threads, and
`thread::spawn` expects to get some code the thread should run as soon as the
thread is created. However, in our case, we want to create the threads and have
them *wait* for code that we’ll send later. The standard library’s
implementation of threads doesn’t include any way to do that; we have to
implement it manually.

We’ll implement this behavior by introducing a new data structure between the
`ThreadPool` and the threads that will manage this new behavior. We’ll call
this data structure `Worker`, which is a common term in pooling
implementations. Think of people working in the kitchen at a restaurant: the
workers wait until orders come in from customers, and then they’re responsible
for taking those orders and filling them.

Instead of storing a vector of `JoinHandle<()>` instances in the thread pool,
we’ll store instances of the `Worker` struct. Each `Worker` will store a single
`JoinHandle<()>` instance. Then we’ll implement a method on `Worker` that will
take a closure of code to run and send it to the already running thread for
execution. We’ll also give each worker an `id` so we can distinguish between
the different workers in the pool when logging or debugging.

Let’s make the following changes to what happens when we create a `ThreadPool`.
We’ll implement the code that sends the closure to the thread after we have
`Worker` set up in this way:

1. Define a `Worker` struct that holds an `id` and a `JoinHandle<()>`.
2. Change `ThreadPool` to hold a vector of `Worker` instances.
3. Define a `Worker::new` function that takes an `id` number and returns a
   `Worker` instance that holds the `id` and a thread spawned with an empty
   closure.
4. In `ThreadPool::new`, use the `for` loop counter to generate an `id`, create
   a new `Worker` with that `id`, and store the worker in the vector.

If you’re up for a challenge, try implementing these changes on your own before
looking at the code in Listing 20-15.

Ready? Here is Listing 20-15 with one way to make the preceding modifications.

<span class="filename">Filename: src/lib.rs</span>

```rust
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
```

<span class="caption">Listing 20-15: Modifying `ThreadPool` to hold `Worker`
instances instead of holding threads directly</span>

We’ve changed the name of the field on `ThreadPool` from `threads` to `workers`
because it’s now holding `Worker` instances instead of `JoinHandle<()>`
instances. We use the counter in the `for` loop as an argument to
`Worker::new`, and we store each new `Worker` in the vector named `workers`.

External code (like our server in *src/bin/main.rs*) doesn’t need to know the
implementation details regarding using a `Worker` struct within `ThreadPool`,
so we make the `Worker` struct and its `new` function private. The
`Worker::new` function uses the `id` we give it and stores a `JoinHandle<()>`
instance that is created by spawning a new thread using an empty closure.

This code will compile and will store the number of `Worker` instances we
specified as an argument to `ThreadPool::new`. But we’re *still* not processing
the closure that we get in `execute`. Let’s look at how to do that next.

#### Sending Requests to Threads via Channels

Now we’ll tackle the problem that the closures given to `thread::spawn` do
absolutely nothing. Currently, we get the closure we want to execute in the
`execute` method. But we need to give `thread::spawn` a closure to run when we
create each `Worker` during the creation of the `ThreadPool`.

We want the `Worker` structs that we just created to fetch code to run from a
queue held in the `ThreadPool` and send that code to its thread to run.

In Chapter 16, you learned about *channels*—a simple way to communicate between
two threads—that would be perfect for this use case. We’ll use a channel to
function as the queue of jobs, and `execute` will send a job from the
`ThreadPool` to the `Worker` instances, each of which will send the job to its
thread. Here is the plan:

1. The `ThreadPool` will create a channel and hold on to the sending side of
   the channel.
2. Each `Worker` will hold on to the receiving side of the channel.
3. We’ll create a new `Job` struct that will hold the closures we want to send
   down the channel.
4. The `execute` method will send the job it wants to execute down the sending
   side of the channel.
5. In its thread, the `Worker` will loop over its receiving side of the channel
   and execute the closures of any jobs it receives.

Let’s start by creating a channel in `ThreadPool::new` and holding the sending
side in the `ThreadPool` instance, as shown in Listing 20-16. The `Job` struct
doesn’t hold anything for now but will be the type of item we’re sending down
the channel.

<span class="filename">Filename: src/lib.rs</span>

```rust
# use std::thread;
use std::sync::mpsc;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--
}
#
# struct Worker {
#     id: usize,
#     thread: thread::JoinHandle<()>,
# }
#
# impl Worker {
#     fn new(id: usize) -> Worker {
#         let thread = thread::spawn(|| {});
#
#         Worker { id, thread }
#     }
# }
```

<span class="caption">Listing 20-16: Modifying `ThreadPool` to store the
sending end of a channel that sends `Job` instances</span>

In `ThreadPool::new`, we create our new channel and have the pool hold the
sending end. This will successfully compile, still with warnings.

Let’s try passing a receiving end of the channel into each worker as the thread
pool creates the channel. We know we want to use the receiving end in the
thread that the workers spawn, so we’ll reference the `receiver` parameter in
the closure. The code in Listing 20-17 won’t quite compile yet.

<span class="filename">Filename: src/lib.rs</span>

```rust,ignore,does_not_compile
impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
```

<span class="caption">Listing 20-17: Passing the receiving end of the channel
to the workers</span>

We’ve made some small and straightforward changes: we pass the receiving end of
the channel into `Worker::new`, and then we use it inside the closure.

When we try to check this code, we get this error:

```text
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
   |
27 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here in
   previous iteration of loop
   |
   = note: move occurs because `receiver` has type
   `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
```

The code is trying to pass `receiver` to multiple `Worker` instances. This
won’t work, as you’ll recall from Chapter 16: the channel implementation that
Rust provides is multiple *producer*, single *consumer*. This means we can’t
just clone the consuming end of the channel to fix this code. Even if we could,
that is not the technique we would want to use; instead, we want to distribute
the jobs across threads by sharing the single `receiver` among all the workers.

Additionally, taking a job off the channel queue involves mutating the
`receiver`, so the threads need a safe way to share and modify `receiver`;
otherwise, we might get race conditions (as covered in Chapter 16).

Recall the thread-safe smart pointers discussed in Chapter 16: to share
ownership across multiple threads and allow the threads to mutate the value, we
need to use `Arc<Mutex<T>>`. The `Arc` type will let multiple workers own the
receiver, and `Mutex` will ensure that only one worker gets a job from the
receiver at a time. Listing 20-18 shows the changes we need to make.

<span class="filename">Filename: src/lib.rs</span>

```rust
# use std::thread;
# use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
// --snip--

# pub struct ThreadPool {
#     workers: Vec<Worker>,
#     sender: mpsc::Sender<Job>,
# }
# struct Job;
#
impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }
    // --snip--
}

# struct Worker {
#     id: usize,
#     thread: thread::JoinHandle<()>,
# }
#
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
#         let thread = thread::spawn(move || {
#             receiver;
#         });
#
#         Worker { id, thread }
    }
}
```

<span class="caption">Listing 20-18: Sharing the receiving end of the channel
among the workers using `Arc` and `Mutex`</span>

In `ThreadPool::new`, we put the receiving end of the channel in an `Arc` and a
`Mutex`. For each new worker, we clone the `Arc` to bump the reference count so
the workers can share ownership of the receiving end.

With these changes, the code compiles! We’re getting there!

#### Implementing the `execute` Method

Let’s finally implement the `execute` method on `ThreadPool`. We’ll also change
`Job` from a struct to a type alias for a trait object that holds the type of
closure that `execute` receives. As discussed in the [“Creating Type Synonyms
with Type Aliases”][creating-type-synonyms-with-type-aliases]<!-- ignore -->
section of Chapter 19, type aliases allow us to make long types shorter. Look
at Listing 20-19.

<span class="filename">Filename: src/lib.rs</span>

```rust
# pub struct ThreadPool {
#     workers: Vec<Worker>,
#     sender: mpsc::Sender<Job>,
# }
# use std::sync::mpsc;
# struct Worker {}

// --snip--

type Job = Box<FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}
```

<span class="caption">Listing 20-19: Creating a `Job` type alias for a `Box`
that holds each closure and then sending the job down the channel</span>

After creating a new `Job` instance using the closure we get in `execute`, we
send that job down the sending end of the channel. We’re calling `unwrap` on
`send` for the case that sending fails. This might happen if, for example, we
stop all our threads from executing, meaning the receiving end has stopped
receiving new messages. At the moment, we can’t stop our threads from
executing: our threads continue executing as long as the pool exists. The
reason we use `unwrap` is that we know the failure case won’t happen, but the
compiler doesn’t know that.

But we’re not quite done yet! In the worker, our closure being passed to
`thread::spawn` still only *references* the receiving end of the channel.
Instead, we need the closure to loop forever, asking the receiving end of the
channel for a job and running the job when it gets one. Let’s make the change
shown in Listing 20-20 to `Worker::new`.

<span class="filename">Filename: src/lib.rs</span>

```rust,ignore,does_not_compile
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                (*job)();
            }
        });

        Worker { id, thread }
    }
}
```

<span class="caption">Listing 20-20: Receiving and executing the jobs in the
worker’s thread</span>

Here, we first call `lock` on the `receiver` to acquire the mutex, and then we
call `unwrap` to panic on any errors. Acquiring a lock might fail if the mutex
is in a *poisoned* state, which can happen if some other thread panicked while
holding the lock rather than releasing the lock. In this situation, calling
`unwrap` to have this thread panic is the correct action to take. Feel free to
change this `unwrap` to an `expect` with an error message that is meaningful to
you.

If we get the lock on the mutex, we call `recv` to receive a `Job` from the
channel. A final `unwrap` moves past any errors here as well, which might occur
if the thread holding the sending side of the channel has shut down, similar to
how the `send` method returns `Err` if the receiving side shuts down.

The call to `recv` blocks, so if there is no job yet, the current thread will
wait until a job becomes available. The `Mutex<T>` ensures that only one
`Worker` thread at a time is trying to request a job.
Theoretically, this code should compile. Unfortunately, the Rust compiler isn’t
perfect yet, and we get this error:
```text
error[E0161]: cannot move a value of type std::ops::FnOnce() +
std::marker::Send: the size of std::ops::FnOnce() + std::marker::Send cannot be
statically determined
```
This error is fairly cryptic because the problem is fairly cryptic. To call a
`FnOnce` closure that is stored in a `Box<T>` (which is what our `Job` type
alias is), the closure needs to move itself *out* of the `Box<T>` because the
closure takes ownership of `self` when we call it. In general, Rust doesn’t
allow us to move a value out of a `Box<T>` because Rust doesn’t know how big
the value inside the `Box<T>` will be: recall in Chapter 15 that we used
`Box<T>` precisely because we had something of an unknown size that we wanted
to store in a `Box<T>` to get a value of a known size.
As you saw in Listing 17-15, we can write methods that use the syntax `self:
Box<Self>`, which allows the method to take ownership of a `Self` value stored
in a `Box<T>`. That’s exactly what we want to do here, but unfortunately Rust
won’t let us: the part of Rust that implements behavior when a closure is
called isn’t implemented using `self: Box<Self>`. So Rust doesn’t yet
understand that it could use `self: Box<Self>` in this situation to take
ownership of the closure and move the closure out of the `Box<T>`.
Rust is still a work in progress with places where the compiler could be
improved, but in the future, the code in Listing 20-20 should work just fine.
People just like you are working to fix this and other issues! After you’ve
finished this book, we would love for you to join in.
But for now, let’s work around this problem using a handy trick. We can tell
Rust explicitly that in this case we can take ownership of the value inside the
`Box<T>` using `self: Box<Self>`; then, once we have ownership of the closure,
we can call it. This involves defining a new trait `FnBox` with the method
`call_box` that will use `self: Box<Self>` in its signature, defining `FnBox`
for any type that implements `FnOnce()`, changing our type alias to use the new
trait, and changing `Worker` to use the `call_box` method. These changes are
shown in Listing 20-21.
<span class="filename">Filename: src/lib.rs</span>
```rust,ignore
trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}

type Job = Box<dyn FnBox + Send + 'static>;

// --snip--
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
    let thread = thread::spawn(move || loop {
        let job = receiver.lock().unwrap().recv().unwrap();

        println!("Worker {} got a job; executing.", id);

        job.call_box();
    });
    // --snip--
}
```
<span class="caption">Listing 20-21: Adding a new trait `FnBox` to work around
the current limitations of `Box<FnOnce()>`</span>
First, we create a new trait named `FnBox`. This trait has the one method
`call_box`, which is similar to the `call` methods on the other `Fn*` traits
except that it takes `self: Box<Self>` to take ownership of `self` and move the
value out of the `Box<T>`.

Next, we implement the `FnBox` trait for any type `F` that implements the
`FnOnce()` trait. Effectively, this means that any `FnOnce()` closures can use
our `call_box` method. The implementation of `call_box` uses `(*self)()` to
move the closure out of the `Box<T>` and call the closure.
We now need our `Job` type alias to be a `Box` of anything that implements our
new trait `FnBox`. This will allow us to use `call_box` in `Worker` when we get
a `Job` value instead of invoking the closure directly. Implementing the
`FnBox` trait for any `FnOnce()` closure means we don’t have to change anything
about the actual values we’re sending down the channel. Now Rust is able to
recognize that what we want to do is fine.

This trick is very sneaky and complicated. Don’t worry if it doesn’t make
perfect sense; someday, it will be completely unnecessary.
With the implementation of this trick, our thread pool is in a working state!
Give it a `cargo run` and make some requests:
```text
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never used: `workers`
  |
7 |     workers: Vec<Worker>,
  |     ^^^^^^^^^^^^^^^^^^^^
  |
  = note: #[warn(dead_code)] on by default

warning: field is never used: `id`
  = note: #[warn(dead_code)] on by default

warning: field is never used: `thread`
   |
62 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: #[warn(dead_code)] on by default

    Finished dev [unoptimized + debuginfo] target(s) in 0.99 secs
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
```
Success! We now have a thread pool that executes connections asynchronously.
There are never more than four threads created, so our system won’t get
overloaded if the server receives a lot of requests. If we make a request to
*/sleep*, the server will be able to serve other requests by having another
thread run them.
> Note: if you open */sleep* in multiple browser windows simultaneously, they
> might load one at a time in 5-second intervals. Some web browsers execute
> multiple instances of the same request sequentially for caching reasons. This
> limitation is not caused by our web server.
After learning about the `while let` loop in Chapter 18, you might be wondering
why we didn’t write the worker thread code as shown in Listing 20-22.
<span class="filename">Filename: src/lib.rs</span>

```rust,ignore,not_desired_behavior
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
    let thread = thread::spawn(move || {
        while let Ok(job) = receiver.lock().unwrap().recv() {
            println!("Worker {} got a job; executing.", id);

            job.call_box();
        }
    });
    // --snip--
}
```
<span class="caption">Listing 20-22: An alternative implementation of
`Worker::new` using `while let`</span>
This code compiles and runs but doesn’t result in the desired threading
behavior: a slow request will still cause other requests to wait to be
processed. The reason is somewhat subtle: the `Mutex` struct has no public
`unlock` method because the ownership of the lock is based on the lifetime of
the `MutexGuard<T>` within the `LockResult<MutexGuard<T>>` that the `lock`
method returns. At compile time, the borrow checker can then enforce the rule
that a resource guarded by a `Mutex` cannot be accessed unless we hold the
lock. But this implementation can also result in the lock being held longer
than intended if we don’t think carefully about the lifetime of the
`MutexGuard<T>`. Because the values in the `while` expression remain in scope
for the duration of the block, the lock remains held for the duration of the
call to `job.call_box()`, meaning other workers cannot receive jobs.
By using `loop` instead and acquiring the lock and a job within the block
rather than outside it, the `MutexGuard` returned from the `lock` method is
dropped as soon as the `let job` statement ends. This ensures that the lock is
held during the call to `recv`, but it is released before the call to
`job.call_box()`, allowing multiple requests to be serviced concurrently.
[creating-type-synonyms-with-type-aliases]:
ch19-04-advanced-types.html#creating-type-synonyms-with-type-aliases
[integer-types]: ch03-02-data-types.html#integer-types
[storing-closures-using-generic-parameters-and-the-fn-traits]:
ch13-01-closures.html#storing-closures-using-generic-parameters-and-the-fn-traits