## Turning Our Single-Threaded Server into a Multithreaded Server

Right now, the server will process each request in turn, meaning it won’t
process a second connection until the first is finished processing. If the
server received more and more requests, this serial execution would be less and
less optimal. If the server receives a request that takes a long time to
process, subsequent requests will have to wait until the long request is
finished, even if the new requests can be processed quickly. We’ll need to fix
this, but first, we’ll look at the problem in action.

### Simulating a Slow Request in the Current Server Implementation

We’ll look at how a slow-processing request can affect other requests made to
our current server implementation. Listing 20-10 implements handling a request
to */sleep* with a simulated slow response that will cause the server to sleep
for 5 seconds before responding.

<span class="filename">Filename: src/main.rs</span>

```rust
use std::thread;
use std::time::Duration;
# use std::io::prelude::*;
# use std::net::TcpStream;
# use std::fs::File;
// --snip--

fn handle_connection(mut stream: TcpStream) {
#     let mut buffer = [0; 512];
#     stream.read(&mut buffer).unwrap();
    // --snip--

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    // --snip--
}
```

<span class="caption">Listing 20-10: Simulating a slow request by recognizing
*/sleep* and sleeping for 5 seconds</span>

This code is a bit messy, but it’s good enough for simulation purposes. We
created a second request `sleep`, whose data our server recognizes. We added an
`else if` after the `if` block to check for the request to */sleep*. When that
request is received, the server will sleep for 5 seconds before rendering the
successful HTML page.

You can see how primitive our server is: real libraries would handle the
recognition of multiple requests in a much less verbose way!

Start the server using `cargo run`. Then open two browser windows: one for
*http://127.0.0.1:7878/* and the other for *http://127.0.0.1:7878/sleep*. If
you enter the */* URI a few times, as before, you’ll see it respond quickly.
But if you enter */sleep* and then load */*, you’ll see that */* waits until
`sleep` has slept for its full 5 seconds before loading.

There are multiple ways we could change how our web server works to avoid
having more requests back up behind a slow request; the one we’ll implement is
a thread pool.

### Improving Throughput with a Thread Pool

A *thread pool* is a group of spawned threads that are waiting and ready to
handle a task. When the program receives a new task, it assigns one of the
threads in the pool to the task, and that thread will process the task. The
remaining threads in the pool are available to handle any other tasks that come
in while the first thread is processing. When the first thread is done
processing its task, it’s returned to the pool of idle threads, ready to handle
a new task. A thread pool allows you to process connections concurrently,
increasing the throughput of your server.

We’ll limit the number of threads in the pool to a small number to protect us
from Denial of Service (DoS) attacks; if we had our program create a new thread
for each request as it came in, someone making 10 million requests to our
server could create havoc by using up all our server’s resources and grinding
the processing of requests to a halt.

Rather than spawning unlimited threads, we’ll have a fixed number of threads
waiting in the pool. As requests come in, they’ll be sent to the pool for
processing. The pool will maintain a queue of incoming requests. Each of the
threads in the pool will pop off a request from this queue, handle the request,
and then ask the queue for another request. With this design, we can process
`N` requests concurrently, where `N` is the number of threads. If each thread
is responding to a long-running request, subsequent requests can still back up
in the queue, but we’ve increased the number of long-running requests we can
handle before reaching that point.

This technique is just one of many ways to improve the throughput of a web
server. Other options you might explore are the fork/join model and the
single-threaded async I/O model. If you’re interested in this topic, you can
read more about other solutions and try to implement them in Rust; with a
low-level language like Rust, all of these options are possible.

Before we begin implementing a thread pool, let’s talk about what using the
pool should look like. When you’re trying to design code, writing the client
interface first can help guide your design. Write the API of the code so it’s
structured in the way you want to call it; then implement the functionality
within that structure rather than implementing the functionality and then
designing the public API.

Similar to how we used test-driven development in the project in Chapter 12,
we’ll use compiler-driven development here. We’ll write the code that calls the
functions we want, and then we’ll look at errors from the compiler to determine
what we should change next to get the code to work.

#### Code Structure If We Could Spawn a Thread for Each Request

First, let’s explore how our code might look if it did create a new thread for
every connection. As mentioned earlier, this isn’t our final plan due to the
problems with potentially spawning an unlimited number of threads, but it is a
starting point. Listing 20-11 shows the changes to make to `main` to spawn a
new thread to handle each stream within the `for` loop.

<span class="filename">Filename: src/main.rs</span>

```rust,no_run
# use std::thread;
# use std::io::prelude::*;
# use std::net::TcpListener;
# use std::net::TcpStream;
#
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}
# fn handle_connection(mut stream: TcpStream) {}
```

<span class="caption">Listing 20-11: Spawning a new thread for each
stream</span>

As you learned in Chapter 16, `thread::spawn` will create a new thread and then
run the code in the closure in the new thread. If you run this code and load
*/sleep* in your browser, then */* in two more browser tabs, you’ll indeed see
that the requests to */* don’t have to wait for */sleep* to finish. But as we
mentioned, this will eventually overwhelm the system because you’d be making
new threads without any limit.

#### Creating a Similar Interface for a Finite Number of Threads

We want our thread pool to work in a similar, familiar way so switching from
threads to a thread pool doesn’t require large changes to the code that uses
our API. Listing 20-12 shows the hypothetical interface for a `ThreadPool`
struct we want to use instead of `thread::spawn`.

<span class="filename">Filename: src/main.rs</span>

```rust,no_run
# use std::thread;
# use std::io::prelude::*;
# use std::net::TcpListener;
# use std::net::TcpStream;
# struct ThreadPool;
# impl ThreadPool {
#    fn new(size: u32) -> ThreadPool { ThreadPool }
#    fn execute<F>(&self, f: F)
#        where F: FnOnce() + Send + 'static {}
# }
#
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}
# fn handle_connection(mut stream: TcpStream) {}
```

<span class="caption">Listing 20-12: Our ideal `ThreadPool` interface</span>

We use `ThreadPool::new` to create a new thread pool with a configurable number
of threads, in this case four. Then, in the `for` loop, `pool.execute` has an
interface similar to `thread::spawn` in that it takes a closure the pool should
run for each stream. We need to implement `pool.execute` so it takes the
closure and gives it to a thread in the pool to run. This code won’t yet
compile, but we’ll try so the compiler can guide us in how to fix it.

#### Building the `ThreadPool` Struct Using Compiler Driven Development

Make the changes in Listing 20-12 to *src/main.rs*, and then let’s use the
compiler errors from `cargo check` to drive our development. Here is the first
error we get:

```text
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve. Use of undeclared type or module `ThreadPool`
  --> src/main.rs:10:16
   |
10 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^^^^^^ Use of undeclared type or module
   `ThreadPool`

error: aborting due to previous error
```

Great! This error tells us we need a `ThreadPool` type or module, so we’ll
build one now. Our `ThreadPool` implementation will be independent of the kind
of work our web server is doing. So, let’s switch the `hello` crate from a
binary crate to a library crate to hold our `ThreadPool` implementation. After
we change to a library crate, we could also use the separate thread pool
library for any work we want to do using a thread pool, not just for serving
web requests.

Create a *src/lib.rs* that contains the following, which is the simplest
definition of a `ThreadPool` struct that we can have for now:

<span class="filename">Filename: src/lib.rs</span>

```rust
pub struct ThreadPool;
```

Then create a new directory, *src/bin*, and move the binary crate rooted in
*src/main.rs* into *src/bin/main.rs*. Doing so will make the library crate the
primary crate in the *hello* directory; we can still run the binary in
*src/bin/main.rs* using `cargo run`. After moving the *main.rs* file, edit it
to bring the library crate in and bring `ThreadPool` into scope by adding the
following code to the top of *src/bin/main.rs*:

<span class="filename">Filename: src/bin/main.rs</span>

```rust,ignore
use hello::ThreadPool;
```
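
After these moves, the package layout looks like this:

```text
hello
├── Cargo.toml
└── src
    ├── bin
    │   └── main.rs
    └── lib.rs
```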

This code still won’t work, but let’s check it again to get the next error that
we need to address:

```text
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for type
`hello::ThreadPool` in the current scope
  --> src/bin/main.rs:13:16
   |
13 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^^^^^^ function or associated item not found in
   `hello::ThreadPool`
```

This error indicates that next we need to create an associated function named
`new` for `ThreadPool`. We also know that `new` needs to have one parameter
that can accept `4` as an argument and should return a `ThreadPool` instance.
Let’s implement the simplest `new` function that will have those
characteristics:

<span class="filename">Filename: src/lib.rs</span>

```rust
pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}
```

We chose `usize` as the type of the `size` parameter, because we know that a
negative number of threads doesn’t make any sense. We also know we’ll use this
4 as the number of elements in a collection of threads, which is what the
`usize` type is for, as discussed in the [“Integer Types”][integer-types]<!--
ignore --> section of Chapter 3.

Let’s check the code again:

```text
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
 --> src/lib.rs:4:16
  |
4 |     pub fn new(size: usize) -> ThreadPool {
  |                ^^^^
  |
  = note: #[warn(unused_variables)] on by default
  = note: to avoid this warning, consider using `_size` instead

error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope
  --> src/bin/main.rs:18:14
   |
18 |     pool.execute(|| {
   |          ^^^^^^^
```

Now we get a warning and an error. Ignoring the warning for a moment, the error
occurs because we don’t have an `execute` method on `ThreadPool`. Recall from
the [“Creating a Similar Interface for a Finite Number of
Threads”](#creating-a-similar-interface-for-a-finite-number-of-threads)<!--
ignore --> section that we decided our thread pool should have an interface
similar to `thread::spawn`. In addition, we’ll implement the `execute` function
so it takes the closure it’s given and gives it to an idle thread in the pool
to run.

We’ll define the `execute` method on `ThreadPool` to take a closure as a
parameter. Recall from the [“Storing Closures Using Generic Parameters and the
`Fn` Traits”][storing-closures-using-generic-parameters-and-the-fn-traits]<!--
ignore --> section in Chapter 13 that we can take closures as parameters with
three different traits: `Fn`, `FnMut`, and `FnOnce`. We need to decide which
kind of closure to use here. We know we’ll end up doing something similar to
the standard library `thread::spawn` implementation, so we can look at what
bounds the signature of `thread::spawn` has on its parameter. The documentation
shows us the following:

```rust,ignore
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static
```

The `F` type parameter is the one we’re concerned with here; the `T` type
parameter is related to the return value, and we’re not concerned with that. We
can see that `spawn` uses `FnOnce` as the trait bound on `F`. This is probably
what we want as well, because we’ll eventually pass the argument we get in
`execute` to `spawn`. We can be further confident that `FnOnce` is the trait we
want to use because the thread for running a request will only execute that
request’s closure one time, which matches the `Once` in `FnOnce`.

The `F` type parameter also has the trait bound `Send` and the lifetime bound
`'static`, which are useful in our situation: we need `Send` to transfer the
closure from one thread to another and `'static` because we don’t know how long
the thread will take to execute. Let’s create an `execute` method on
`ThreadPool` that will take a generic parameter of type `F` with these bounds:

<span class="filename">Filename: src/lib.rs</span>

```rust
# pub struct ThreadPool;
impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {

    }
}
```

We still use the `()` after `FnOnce` because this `FnOnce` represents a closure
that takes no parameters and doesn’t return a value. Just like function
definitions, the return type can be omitted from the signature, but even if we
have no parameters, we still need the parentheses.
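
If the distinction between the `Fn` traits still feels hazy, here’s a minimal,
self-contained illustration of why `FnOnce` fits: a closure that consumes a
value it captures can only be called once, which is exactly how a worker thread
will treat each job.

```rust
fn main() {
    let s = String::from("job data");
    let job = move || drop(s); // consumes `s`, so `job` is an FnOnce closure

    job(); // the first call moves `job`...
    // job(); // ...so a second call would be an error: use of moved value
}
```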
370
371Again, this is the simplest implementation of the `execute` method: it does
372nothing, but we’re trying only to make our code compile. Let’s check it again:
373
374```text
375$ cargo check
376 Compiling hello v0.1.0 (file:///projects/hello)
377warning: unused variable: `size`
378 --> src/lib.rs:4:16
379 |
3804 | pub fn new(size: usize) -> ThreadPool {
381 | ^^^^
382 |
383 = note: #[warn(unused_variables)] on by default
384 = note: to avoid this warning, consider using `_size` instead
385
386warning: unused variable: `f`
387 --> src/lib.rs:8:30
388 |
3898 | pub fn execute<F>(&self, f: F)
390 | ^
391 |
392 = note: to avoid this warning, consider using `_f` instead
393```
394
395We’re receiving only warnings now, which means it compiles! But note that if
396you try `cargo run` and make a request in the browser, you’ll see the errors in
397the browser that we saw at the beginning of the chapter. Our library isn’t
398actually calling the closure passed to `execute` yet!
399
400> Note: A saying you might hear about languages with strict compilers, such as
401> Haskell and Rust, is “if the code compiles, it works.” But this saying is not
402> universally true. Our project compiles, but it does absolutely nothing! If we
403> were building a real, complete project, this would be a good time to start
404> writing unit tests to check that the code compiles *and* has the behavior we
405> want.
406
407#### Validating the Number of Threads in `new`
408
409We’ll continue to get warnings because we aren’t doing anything with the
410parameters to `new` and `execute`. Let’s implement the bodies of these
411functions with the behavior we want. To start, let’s think about `new`. Earlier
412we chose an unsigned type for the `size` parameter, because a pool with a
413negative number of threads makes no sense. However, a pool with zero threads
414also makes no sense, yet zero is a perfectly valid `usize`. We’ll add code to
415check that `size` is greater than zero before we return a `ThreadPool` instance
416and have the program panic if it receives a zero by using the `assert!` macro,
417as shown in Listing 20-13.
418
419<span class="filename">Filename: src/lib.rs</span>
420
421```rust
422# pub struct ThreadPool;
423impl ThreadPool {
424 /// Create a new ThreadPool.
425 ///
426 /// The size is the number of threads in the pool.
427 ///
428 /// # Panics
429 ///
430 /// The `new` function will panic if the size is zero.
431 pub fn new(size: usize) -> ThreadPool {
432 assert!(size > 0);
433
434 ThreadPool
435 }
436
437 // --snip--
438}
439```
440
441<span class="caption">Listing 20-13: Implementing `ThreadPool::new` to panic if
442`size` is zero</span>
443
444We’ve added some documentation for our `ThreadPool` with doc comments. Note
445that we followed good documentation practices by adding a section that calls
446out the situations in which our function can panic, as discussed in Chapter 14.
447Try running `cargo doc --open` and clicking the `ThreadPool` struct to see what
448the generated docs for `new` look like!
449
450Instead of adding the `assert!` macro as we’ve done here, we could make `new`
451return a `Result` like we did with `Config::new` in the I/O project in Listing
45212-9. But we’ve decided in this case that trying to create a thread pool
453without any threads should be an unrecoverable error. If you’re feeling
454ambitious, try to write a version of `new` with the following signature to
455compare both versions:
456
457```rust,ignore
458pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {
459```
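
A minimal sketch of that alternative might look like the following;
`PoolCreationError` isn’t defined in this chapter, so the unit-like struct here
is just one reasonable shape for it:

```rust
pub struct ThreadPool;

// A hypothetical error type for pool creation; its exact shape is up to you.
#[derive(Debug)]
pub struct PoolCreationError;

impl ThreadPool {
    pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {
        if size == 0 {
            return Err(PoolCreationError);
        }

        Ok(ThreadPool)
    }
}
```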

#### Creating Space to Store the Threads

Now that we have a way to know we have a valid number of threads to store in
the pool, we can create those threads and store them in the `ThreadPool` struct
before returning it. But how do we “store” a thread? Let’s take another look at
the `thread::spawn` signature:

```rust,ignore
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static
```

The `spawn` function returns a `JoinHandle<T>`, where `T` is the type that the
closure returns. Let’s try using `JoinHandle` too and see what happens. In our
case, the closures we’re passing to the thread pool will handle the connection
and not return anything, so `T` will be the unit type `()`.

The code in Listing 20-14 will compile but doesn’t create any threads yet.
We’ve changed the definition of `ThreadPool` to hold a vector of
`thread::JoinHandle<()>` instances, initialized the vector with a capacity of
`size`, set up a `for` loop that will run some code to create the threads, and
returned a `ThreadPool` instance containing them.

<span class="filename">Filename: src/lib.rs</span>

```rust,ignore,not_desired_behavior
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool {
            threads
        }
    }

    // --snip--
}
```

<span class="caption">Listing 20-14: Creating a vector for `ThreadPool` to hold
the threads</span>

We’ve brought `std::thread` into scope in the library crate, because we’re
using `thread::JoinHandle` as the type of the items in the vector in
`ThreadPool`.

Once a valid size is received, our `ThreadPool` creates a new vector that can
hold `size` items. We haven’t used the `with_capacity` function in this book
yet; it performs the same task as `Vec::new` but with an important difference:
it preallocates space in the vector. Because we know we need to store `size`
elements in the vector, doing this allocation up front is slightly more
efficient than using `Vec::new`, which resizes itself as elements are inserted.
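
The difference is easy to observe directly; this small standalone example isn’t
part of our server:

```rust
fn main() {
    let empty: Vec<i32> = Vec::new();
    let reserved: Vec<i32> = Vec::with_capacity(4);

    assert_eq!(empty.capacity(), 0); // `Vec::new` doesn't allocate up front
    assert!(reserved.capacity() >= 4); // space for 4 elements is preallocated
    assert_eq!(reserved.len(), 0); // but the vector is still empty
}
```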

When you run `cargo check` again, you’ll get a few more warnings, but it should
succeed.

#### A `Worker` Struct Responsible for Sending Code from the `ThreadPool` to a Thread

We left a comment in the `for` loop in Listing 20-14 regarding the creation of
threads. Here, we’ll look at how we actually create threads. The standard
library provides `thread::spawn` as a way to create threads, and
`thread::spawn` expects to get some code the thread should run as soon as the
thread is created. However, in our case, we want to create the threads and have
them *wait* for code that we’ll send later. The standard library’s
implementation of threads doesn’t include any way to do that; we have to
implement it manually.

We’ll implement this behavior by introducing a new data structure between the
`ThreadPool` and the threads that will manage this new behavior. We’ll call
this data structure `Worker`, which is a common term in pooling
implementations. Think of people working in the kitchen at a restaurant: the
workers wait until orders come in from customers, and then they’re responsible
for taking those orders and filling them.

Instead of storing a vector of `JoinHandle<()>` instances in the thread pool,
we’ll store instances of the `Worker` struct. Each `Worker` will store a single
`JoinHandle<()>` instance. Then we’ll implement a method on `Worker` that will
take a closure of code to run and send it to the already running thread for
execution. We’ll also give each worker an `id` so we can distinguish between
the different workers in the pool when logging or debugging.

Let’s make the following changes to what happens when we create a `ThreadPool`.
We’ll implement the code that sends the closure to the thread after we have
`Worker` set up in this way:

1. Define a `Worker` struct that holds an `id` and a `JoinHandle<()>`.
2. Change `ThreadPool` to hold a vector of `Worker` instances.
3. Define a `Worker::new` function that takes an `id` number and returns a
   `Worker` instance that holds the `id` and a thread spawned with an empty
   closure.
4. In `ThreadPool::new`, use the `for` loop counter to generate an `id`, create
   a new `Worker` with that `id`, and store the worker in the vector.

If you’re up for a challenge, try implementing these changes on your own before
looking at the code in Listing 20-15.

Ready? Here is Listing 20-15 with one way to make the preceding modifications.

<span class="filename">Filename: src/lib.rs</span>

```rust
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool {
            workers
        }
    }
    // --snip--
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker {
            id,
            thread,
        }
    }
}
```

<span class="caption">Listing 20-15: Modifying `ThreadPool` to hold `Worker`
instances instead of holding threads directly</span>

We’ve changed the name of the field on `ThreadPool` from `threads` to `workers`
because it’s now holding `Worker` instances instead of `JoinHandle<()>`
instances. We use the counter in the `for` loop as an argument to
`Worker::new`, and we store each new `Worker` in the vector named `workers`.

External code (like our server in *src/bin/main.rs*) doesn’t need to know the
implementation details regarding using a `Worker` struct within `ThreadPool`,
so we make the `Worker` struct and its `new` function private. The
`Worker::new` function uses the `id` we give it and stores a `JoinHandle<()>`
instance that is created by spawning a new thread using an empty closure.

This code will compile and will store the number of `Worker` instances we
specified as an argument to `ThreadPool::new`. But we’re *still* not processing
the closure that we get in `execute`. Let’s look at how to do that next.

#### Sending Requests to Threads via Channels

Now we’ll tackle the problem that the closures given to `thread::spawn` do
absolutely nothing. Currently, we get the closure we want to execute in the
`execute` method. But we need to give `thread::spawn` a closure to run when we
create each `Worker` during the creation of the `ThreadPool`.

We want the `Worker` structs that we just created to fetch code to run from a
queue held in the `ThreadPool` and send that code to their threads to run.

In Chapter 16, you learned about *channels*—a simple way to communicate between
two threads—that would be perfect for this use case. We’ll use a channel to
function as the queue of jobs, and `execute` will send a job from the
`ThreadPool` to the `Worker` instances, each of which will send the job to its
thread. Here is the plan:

1. The `ThreadPool` will create a channel and hold on to the sending side of
   the channel.
2. Each `Worker` will hold on to the receiving side of the channel.
3. We’ll create a new `Job` struct that will hold the closures we want to send
   down the channel.
4. The `execute` method will send the job it wants to execute down the sending
   side of the channel.
5. In its thread, the `Worker` will loop over its receiving side of the channel
   and execute the closures of any jobs it receives.

Let’s start by creating a channel in `ThreadPool::new` and holding the sending
side in the `ThreadPool` instance, as shown in Listing 20-16. The `Job` struct
doesn’t hold anything for now but will be the type of item we’re sending down
the channel.

<span class="filename">Filename: src/lib.rs</span>

```rust
# use std::thread;
// --snip--
use std::sync::mpsc;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool {
            workers,
            sender,
        }
    }
    // --snip--
}
#
# struct Worker {
#     id: usize,
#     thread: thread::JoinHandle<()>,
# }
#
# impl Worker {
#     fn new(id: usize) -> Worker {
#         let thread = thread::spawn(|| {});
#
#         Worker {
#             id,
#             thread,
#         }
#     }
# }
```

<span class="caption">Listing 20-16: Modifying `ThreadPool` to store the
sending end of a channel that sends `Job` instances</span>

In `ThreadPool::new`, we create our new channel and have the pool hold the
sending end. This will successfully compile, still with warnings.

Let’s try passing a receiving end of the channel into each worker as the thread
pool creates the channel. We know we want to use the receiving end in the
thread that the workers spawn, so we’ll reference the `receiver` parameter in
the closure. The code in Listing 20-17 won’t quite compile yet.

<span class="filename">Filename: src/lib.rs</span>

```rust,ignore,does_not_compile
impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool {
            workers,
            sender,
        }
    }
    // --snip--
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker {
            id,
            thread,
        }
    }
}
```

<span class="caption">Listing 20-17: Passing the receiving end of the channel
to the workers</span>

We’ve made some small and straightforward changes: we pass the receiving end of
the channel into `Worker::new`, and then we use it inside the closure.

When we try to check this code, we get this error:

```text
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:27:42
   |
27 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here in
   previous iteration of loop
   |
   = note: move occurs because `receiver` has type
   `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
```

The code is trying to pass `receiver` to multiple `Worker` instances. This
won’t work, as you’ll recall from Chapter 16: the channel implementation that
Rust provides is multiple *producer*, single *consumer*. This means we can’t
just clone the consuming end of the channel to fix this code. Even if we could,
that is not the technique we would want to use; instead, we want to distribute
the jobs across threads by sharing the single `receiver` among all the workers.

Additionally, taking a job off the channel queue involves mutating the
`receiver`, so the threads need a safe way to share and modify `receiver`;
otherwise, we might get race conditions (as covered in Chapter 16).

Recall the thread-safe smart pointers discussed in Chapter 16: to share
ownership across multiple threads and allow the threads to mutate the value, we
need to use `Arc<Mutex<T>>`. The `Arc` type will let multiple workers own the
receiver, and `Mutex` will ensure that only one worker gets a job from the
receiver at a time. Listing 20-18 shows the changes we need to make.

<span class="filename">Filename: src/lib.rs</span>

```rust
# use std::thread;
# use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
// --snip--

# pub struct ThreadPool {
#     workers: Vec<Worker>,
#     sender: mpsc::Sender<Job>,
# }
# struct Job;
#
impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender,
        }
    }

    // --snip--
}

# struct Worker {
#     id: usize,
#     thread: thread::JoinHandle<()>,
# }
#
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
#         let thread = thread::spawn(|| {
#             receiver;
#         });
#
#         Worker {
#             id,
#             thread,
#         }
    }
}
```

<span class="caption">Listing 20-18: Sharing the receiving end of the channel
among the workers using `Arc` and `Mutex`</span>

In `ThreadPool::new`, we put the receiving end of the channel in an `Arc` and a
`Mutex`. For each new worker, we clone the `Arc` to bump the reference count so
the workers can share ownership of the receiving end.

With these changes, the code compiles! We’re getting there!

#### Implementing the `execute` Method

Let’s finally implement the `execute` method on `ThreadPool`. We’ll also change
`Job` from a struct to a type alias for a trait object that holds the type of
closure that `execute` receives. As discussed in the [“Creating Type Synonyms
with Type Aliases”][creating-type-synonyms-with-type-aliases]<!-- ignore -->
section of Chapter 19, type aliases allow us to make long types shorter. Look
at Listing 20-19.

<span class="filename">Filename: src/lib.rs</span>

```rust
// --snip--
# pub struct ThreadPool {
#     workers: Vec<Worker>,
#     sender: mpsc::Sender<Job>,
# }
# use std::sync::mpsc;
# struct Worker {}

type Job = Box<FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--
```

<span class="caption">Listing 20-19: Creating a `Job` type alias for a `Box`
that holds each closure and then sending the job down the channel</span>

After creating a new `Job` instance using the closure we get in `execute`, we
send that job down the sending end of the channel. We’re calling `unwrap` on
`send` for the case that sending fails. This might happen if, for example, we
stop all our threads from executing, meaning the receiving end has stopped
receiving new messages. At the moment, we can’t stop our threads from
executing: our threads continue executing as long as the pool exists. The
reason we use `unwrap` is that we know the failure case won’t happen, but the
compiler doesn’t know that.
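
To see the failure case that `unwrap` guards against, here’s a small standalone
demonstration of `send` returning an `Err` once the receiving end is gone:

```rust
use std::sync::mpsc;

fn main() {
    let (sender, receiver) = mpsc::channel::<i32>();
    drop(receiver); // shut down the receiving end

    // With no receiver, `send` returns an Err value, which our `unwrap`
    // would turn into a panic:
    assert!(sender.send(1).is_err());
}
```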

But we’re not quite done yet! In the worker, our closure being passed to
`thread::spawn` still only *references* the receiving end of the channel.
Instead, we need the closure to loop forever, asking the receiving end of the
channel for a job and running the job when it gets one. Let’s make the change
shown in Listing 20-20 to `Worker::new`.

<span class="filename">Filename: src/lib.rs</span>

```rust,ignore,does_not_compile
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                (*job)();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}
```

<span class="caption">Listing 20-20: Receiving and executing the jobs in the
worker’s thread</span>

Here, we first call `lock` on the `receiver` to acquire the mutex, and then we
call `unwrap` to panic on any errors. Acquiring a lock might fail if the mutex
is in a *poisoned* state, which can happen if some other thread panicked while
holding the lock rather than releasing the lock. In this situation, calling
`unwrap` to have this thread panic is the correct action to take. Feel free to
change this `unwrap` to an `expect` with an error message that is meaningful to
you.

If we get the lock on the mutex, we call `recv` to receive a `Job` from the
channel. A final `unwrap` moves past any errors here as well, which might occur
if the thread holding the sending side of the channel has shut down, similar to
how the `send` method returns `Err` if the receiving side shuts down.

The call to `recv` blocks, so if there is no job yet, the current thread will
wait until a job becomes available. The `Mutex<T>` ensures that only one
`Worker` thread at a time is trying to request a job.

Theoretically, this code should compile. Unfortunately, the Rust compiler isn’t
perfect yet, and we get this error:

```text
error[E0161]: cannot move a value of type std::ops::FnOnce() +
std::marker::Send: the size of std::ops::FnOnce() + std::marker::Send cannot be
statically determined
  --> src/lib.rs:63:17
   |
63 |                 (*job)();
   |                 ^^^^^^
```

This error is fairly cryptic because the problem is fairly cryptic. To call a
`FnOnce` closure that is stored in a `Box<T>` (which is what our `Job` type
alias is), the closure needs to move itself *out* of the `Box<T>` because the
closure takes ownership of `self` when we call it. In general, Rust doesn’t
allow us to move a value out of a `Box<T>` because Rust doesn’t know how big
the value inside the `Box<T>` will be: recall in Chapter 15 that we used
`Box<T>` precisely because we had something of an unknown size that we wanted
to store in a `Box<T>` to get a value of a known size.
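
That general rule is easy to reproduce in isolation: moving any unsized value
out of a `Box<T>` is rejected because the destination would need a statically
known size. Here’s a minimal sketch, with `dyn Debug` standing in for our boxed
closure:

```rust,ignore,does_not_compile
fn main() {
    let boxed: Box<dyn std::fmt::Debug> = Box::new(5);

    // `dyn Debug` is unsized, so this move out of the Box is rejected:
    let moved = *boxed; // error[E0161]
}
```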

As you saw in Listing 17-15, we can write methods that use the syntax `self:
Box<Self>`, which allows the method to take ownership of a `Self` value stored
in a `Box<T>`. That’s exactly what we want to do here, but unfortunately Rust
won’t let us: the part of Rust that implements behavior when a closure is
called isn’t implemented using `self: Box<Self>`. So Rust doesn’t yet
understand that it could use `self: Box<Self>` in this situation to take
ownership of the closure and move the closure out of the `Box<T>`.
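
As a refresher, here’s a minimal sketch of that syntax on a hypothetical `Draw`
trait; the names are illustrative, not part of this chapter’s code:

```rust
trait Draw {
    // `self: Box<Self>` lets the method take ownership of the boxed value.
    fn draw_then_consume(self: Box<Self>);
}

struct Button;

impl Draw for Button {
    fn draw_then_consume(self: Box<Self>) {
        println!("drawing a button, then dropping it");
    }
}

fn main() {
    let button: Box<dyn Draw> = Box::new(Button);
    button.draw_then_consume(); // moves the Box; `button` can't be used again
}
```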

Rust is still a work in progress with places where the compiler could be
improved, but in the future, the code in Listing 20-20 should work just fine.
People just like you are working to fix this and other issues! After you’ve
finished this book, we would love for you to join in.

But for now, let’s work around this problem using a handy trick. We can tell
Rust explicitly that in this case we can take ownership of the value inside the
`Box<T>` using `self: Box<Self>`; then, once we have ownership of the closure,
we can call it. This involves defining a new trait `FnBox` with the method
`call_box` that will use `self: Box<Self>` in its signature, defining `FnBox`
for any type that implements `FnOnce()`, changing our type alias to use the new
trait, and changing `Worker` to use the `call_box` method. These changes are
shown in Listing 20-21.

<span class="filename">Filename: src/lib.rs</span>

```rust,ignore
trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}

type Job = Box<dyn FnBox + Send + 'static>;

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                job.call_box();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}
```

<span class="caption">Listing 20-21: Adding a new trait `FnBox` to work around
the current limitations of `Box<FnOnce()>`</span>

First, we create a new trait named `FnBox`. This trait has the one method
`call_box`, which is similar to the `call` methods on the other `Fn*` traits
except that it takes `self: Box<Self>` to take ownership of `self` and move the
value out of the `Box<T>`.

Next, we implement the `FnBox` trait for any type `F` that implements the
`FnOnce()` trait. Effectively, this means that any `FnOnce()` closures can use
our `call_box` method. The implementation of `call_box` uses `(*self)()` to
move the closure out of the `Box<T>` and call the closure.

We now need our `Job` type alias to be a `Box` of anything that implements our
new trait `FnBox`. This will allow us to use `call_box` in `Worker` when we get
a `Job` value instead of invoking the closure directly. Implementing the
`FnBox` trait for any `FnOnce()` closure means we don’t have to change anything
about the actual values we’re sending down the channel. Now Rust is able to
recognize that what we want to do is fine.

This trick is very sneaky and complicated. Don’t worry if it doesn’t make
perfect sense; someday, it will be completely unnecessary.

With the implementation of this trick, our thread pool is in a working state!
Give it a `cargo run` and make some requests:

```text
$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never used: `workers`
 --> src/lib.rs:7:5
  |
7 |     workers: Vec<Worker>,
  |     ^^^^^^^^^^^^^^^^^^^^
  |
  = note: #[warn(dead_code)] on by default

warning: field is never used: `id`
  --> src/lib.rs:61:5
   |
61 |     id: usize,
   |     ^^^^^^^^^
   |
   = note: #[warn(dead_code)] on by default

warning: field is never used: `thread`
  --> src/lib.rs:62:5
   |
62 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: #[warn(dead_code)] on by default

    Finished dev [unoptimized + debuginfo] target(s) in 0.99 secs
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
```

Success! We now have a thread pool that executes connections asynchronously.
There are never more than four threads created, so our system won’t get
overloaded if the server receives a lot of requests. If we make a request to
*/sleep*, the server will be able to serve other requests by having another
thread run them.

> Note: if you open */sleep* in multiple browser windows simultaneously, they
> might load one at a time in 5-second intervals. Some web browsers execute
> multiple instances of the same request sequentially for caching reasons. This
> limitation is not caused by our web server.

After learning about the `while let` loop in Chapter 18, you might be wondering
why we didn’t write the worker thread code as shown in Listing 20-22.

<span class="filename">Filename: src/lib.rs</span>

```rust,ignore,not_desired_behavior
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {} got a job; executing.", id);

                job.call_box();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}
```

<span class="caption">Listing 20-22: An alternative implementation of
`Worker::new` using `while let`</span>

This code compiles and runs but doesn’t result in the desired threading
behavior: a slow request will still cause other requests to wait to be
processed. The reason is somewhat subtle: the `Mutex` struct has no public
`unlock` method because the ownership of the lock is based on the lifetime of
the `MutexGuard<T>` within the `LockResult<MutexGuard<T>>` that the `lock`
method returns. At compile time, the borrow checker can then enforce the rule
that a resource guarded by a `Mutex` cannot be accessed unless we hold the
lock. But this implementation can also result in the lock being held longer
than intended if we don’t think carefully about the lifetime of the
`MutexGuard<T>`. Because the values in the `while` expression remain in scope
for the duration of the block, the lock remains held for the duration of the
call to `job.call_box()`, meaning other workers cannot receive jobs.

By using `loop` instead and acquiring the lock and a job within the block
rather than outside it, the `MutexGuard` returned from the `lock` method is
dropped as soon as the `let job` statement ends. This ensures that the lock is
held during the call to `recv`, but it is released before the call to
`job.call_box()`, allowing multiple requests to be serviced concurrently.
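
The scoping rule at play can be seen in a standalone sketch, using a plain
`Mutex<Vec<i32>>` instead of our channel; this reflects the behavior of the
Rust editions this book covers:

```rust
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(vec![1, 2, 3]);

    // In a `let` statement, the temporary `MutexGuard` from `lock()` is
    // dropped when the statement ends, so the mutex is free on the next line:
    let first = m.lock().unwrap().first().copied();
    assert!(m.try_lock().is_ok());
    println!("{:?}", first);

    // In an `if let` (or `while let`), temporaries in the condition live for
    // the whole block, so the mutex stays locked inside the body:
    if let Some(n) = m.lock().unwrap().first().copied() {
        assert!(m.try_lock().is_err()); // still locked here
        println!("{}", n);
    }
}
```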

[creating-type-synonyms-with-type-aliases]: ch19-04-advanced-types.html#creating-type-synonyms-with-type-aliases
[integer-types]: ch03-02-data-types.html#integer-types
[storing-closures-using-generic-parameters-and-the-fn-traits]: ch13-01-closures.html#storing-closures-using-generic-parameters-and-the-fn-traits