## Turning Our Single-Threaded Server into a Multithreaded Server

Right now, the server will process each request in turn, meaning it won’t
process a second connection until the first is finished processing. If the
server received more and more requests, this serial execution would be less and
less optimal. If the server receives a request that takes a long time to
process, subsequent requests will have to wait until the long request is
finished, even if the new requests can be processed quickly. We’ll need to fix
this, but first, we’ll look at the problem in action.

### Simulating a Slow Request in the Current Server Implementation

We’ll look at how a slow-processing request can affect other requests made to
our current server implementation. Listing 20-10 implements handling a request
to */sleep* with a simulated slow response that will cause the server to sleep
for 5 seconds before responding.

<span class="filename">Filename: src/main.rs</span>

```rust
use std::thread;
use std::time::Duration;
# use std::io::prelude::*;
# use std::net::TcpStream;
# use std::fs::File;
// --snip--

fn handle_connection(mut stream: TcpStream) {
#     let mut buffer = [0; 512];
#     stream.read(&mut buffer).unwrap();
    // --snip--

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    // --snip--
}
```

<span class="caption">Listing 20-10: Simulating a slow request by recognizing
*/sleep* and sleeping for 5 seconds</span>

This code is a bit messy, but it’s good enough for simulation purposes. We
created a second request `sleep`, whose data our server recognizes. We added an
`else if` after the `if` block to check for the request to */sleep*. When that
request is received, the server will sleep for 5 seconds before rendering the
successful HTML page.

You can see how primitive our server is: real libraries would handle the
recognition of multiple requests in a much less verbose way!

Start the server using `cargo run`. Then open two browser windows: one for
*http://127.0.0.1:7878/* and the other for *http://127.0.0.1:7878/sleep*. If
you enter the */* URI a few times, as before, you’ll see it respond quickly.
But if you enter */sleep* and then load */*, you’ll see that */* waits until
`sleep` has slept for its full 5 seconds before loading.

There are multiple ways we could change how our web server works to avoid
having more requests back up behind a slow request; the one we’ll implement is
a thread pool.

### Improving Throughput with a Thread Pool

A *thread pool* is a group of spawned threads that are waiting and ready to
handle a task. When the program receives a new task, it assigns one of the
threads in the pool to the task, and that thread will process the task. The
remaining threads in the pool are available to handle any other tasks that come
in while the first thread is processing. When the first thread is done
processing its task, it’s returned to the pool of idle threads, ready to handle
a new task. A thread pool allows you to process connections concurrently,
increasing the throughput of your server.

We’ll limit the number of threads in the pool to a small number to protect us
from Denial of Service (DoS) attacks; if we had our program create a new thread
for each request as it came in, someone making 10 million requests to our
server could create havoc by using up all our server’s resources and grinding
the processing of requests to a halt.

Rather than spawning unlimited threads, we’ll have a fixed number of threads
waiting in the pool. As requests come in, they’ll be sent to the pool for
processing. The pool will maintain a queue of incoming requests. Each of the
threads in the pool will pop off a request from this queue, handle the request,
and then ask the queue for another request. With this design, we can process
`N` requests concurrently, where `N` is the number of threads. If each thread
is responding to a long-running request, subsequent requests can still back up
in the queue, but we’ve increased the number of long-running requests we can
handle before reaching that point.

This technique is just one of many ways to improve the throughput of a web
server. Other options you might explore are the fork/join model and the
single-threaded async I/O model. If you’re interested in this topic, you can
read more about other solutions and try to implement them in Rust; with a
low-level language like Rust, all of these options are possible.

Before we begin implementing a thread pool, let’s talk about what using the
pool should look like. When you’re trying to design code, writing the client
interface first can help guide your design. Write the API of the code so it’s
structured in the way you want to call it; then implement the functionality
within that structure rather than implementing the functionality and then
designing the public API.

Similar to how we used test-driven development in the project in Chapter 12,
we’ll use compiler-driven development here. We’ll write the code that calls the
functions we want, and then we’ll look at errors from the compiler to determine
what we should change next to get the code to work.

#### Code Structure If We Could Spawn a Thread for Each Request

First, let’s explore how our code might look if it did create a new thread for
every connection. As mentioned earlier, this isn’t our final plan due to the
problems with potentially spawning an unlimited number of threads, but it is a
starting point. Listing 20-11 shows the changes to make to `main` to spawn a
new thread to handle each stream within the `for` loop.

<span class="filename">Filename: src/main.rs</span>

```rust,no_run
# use std::thread;
# use std::io::prelude::*;
# use std::net::TcpListener;
# use std::net::TcpStream;
#
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}
# fn handle_connection(mut stream: TcpStream) {}
```

<span class="caption">Listing 20-11: Spawning a new thread for each
stream</span>

As you learned in Chapter 16, `thread::spawn` will create a new thread and then
run the code in the closure in the new thread. If you run this code and load
*/sleep* in your browser, then */* in two more browser tabs, you’ll indeed see
that the requests to */* don’t have to wait for */sleep* to finish. But as we
mentioned, this will eventually overwhelm the system because you’d be making
new threads without any limit.

#### Creating a Similar Interface for a Finite Number of Threads

We want our thread pool to work in a similar, familiar way so switching from
threads to a thread pool doesn’t require large changes to the code that uses
our API. Listing 20-12 shows the hypothetical interface for a `ThreadPool`
struct we want to use instead of `thread::spawn`.

<span class="filename">Filename: src/main.rs</span>

```rust,no_run
# use std::thread;
# use std::io::prelude::*;
# use std::net::TcpListener;
# use std::net::TcpStream;
# struct ThreadPool;
# impl ThreadPool {
#     fn new(size: u32) -> ThreadPool { ThreadPool }
#     fn execute<F>(&self, f: F)
#         where F: FnOnce() + Send + 'static {}
# }
#
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}
# fn handle_connection(mut stream: TcpStream) {}
```

<span class="caption">Listing 20-12: Our ideal `ThreadPool` interface</span>

We use `ThreadPool::new` to create a new thread pool with a configurable number
of threads, in this case four. Then, in the `for` loop, `pool.execute` has an
interface similar to `thread::spawn` in that it takes a closure the pool should
run for each stream. We need to implement `pool.execute` so it takes the
closure and gives it to a thread in the pool to run. This code won’t yet
compile, but we’ll try so the compiler can guide us in how to fix it.

#### Building the `ThreadPool` Struct Using Compiler-Driven Development

Make the changes in Listing 20-12 to *src/main.rs*, and then let’s use the
compiler errors from `cargo check` to drive our development. Here is the first
error we get:

```text
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve. Use of undeclared type or module `ThreadPool`
  --> src/main.rs:10:16
   |
10 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^^^^^^ Use of undeclared type or module
`ThreadPool`

error: aborting due to previous error
```

Great! This error tells us we need a `ThreadPool` type or module, so we’ll
build one now. Our `ThreadPool` implementation will be independent of the kind
of work our web server is doing. So, let’s switch the `hello` crate from a
binary crate to a library crate to hold our `ThreadPool` implementation. After
we change to a library crate, we could also use the separate thread pool
library for any work we want to do using a thread pool, not just for serving
web requests.

Create a *src/lib.rs* that contains the following, which is the simplest
definition of a `ThreadPool` struct that we can have for now:

<span class="filename">Filename: src/lib.rs</span>

```rust
pub struct ThreadPool;
```

Then create a new directory, *src/bin*, and move the binary crate rooted in
*src/main.rs* into *src/bin/main.rs*. Doing so will make the library crate the
primary crate in the *hello* directory; we can still run the binary in
*src/bin/main.rs* using `cargo run`. After moving the *main.rs* file, edit it
to bring the library crate in and bring `ThreadPool` into scope by adding the
following code to the top of *src/bin/main.rs*:

<span class="filename">Filename: src/bin/main.rs</span>

```rust,ignore
use hello::ThreadPool;
```

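At this point, the project layout should look something like this (a sketch,
assuming the default Cargo conventions):

```text
hello
├── Cargo.toml
└── src
    ├── bin
    │   └── main.rs
    └── lib.rs
```
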
This code still won’t work, but let’s check it again to get the next error that
we need to address:

```text
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for type
`hello::ThreadPool` in the current scope
  --> src/bin/main.rs:13:16
   |
13 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^^^^^^ function or associated item not found in
`hello::ThreadPool`
```

This error indicates that next we need to create an associated function named
`new` for `ThreadPool`. We also know that `new` needs to have one parameter
that can accept `4` as an argument and should return a `ThreadPool` instance.
Let’s implement the simplest `new` function that will have those
characteristics:

<span class="filename">Filename: src/lib.rs</span>

```rust
pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}
```

We chose `usize` as the type of the `size` parameter, because we know that a
negative number of threads doesn’t make any sense. We also know we’ll use this
4 as the number of elements in a collection of threads, which is what the
`usize` type is for, as discussed in the [“Integer Types”][integer-types]<!--
ignore --> section of Chapter 3.

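As a quick illustration of the first point, a negative thread count simply
doesn’t type check as a `usize`:

```rust,ignore
let size: usize = -1; // error: cannot apply unary operator `-` to type `usize`
```
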
Let’s check the code again:

```text
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
 --> src/lib.rs:4:16
  |
4 |     pub fn new(size: usize) -> ThreadPool {
  |                ^^^^
  |
  = note: #[warn(unused_variables)] on by default
  = note: to avoid this warning, consider using `_size` instead

error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope
  --> src/bin/main.rs:18:14
   |
18 |         pool.execute(|| {
   |              ^^^^^^^
```

Now we get a warning and an error. Ignoring the warning for a moment, the error
occurs because we don’t have an `execute` method on `ThreadPool`. Recall from
the [“Creating a Similar Interface for a Finite Number of
Threads”](#creating-a-similar-interface-for-a-finite-number-of-threads)<!--
ignore --> section that we decided our thread pool should have an interface
similar to `thread::spawn`. In addition, we’ll implement the `execute` function
so it takes the closure it’s given and gives it to an idle thread in the pool
to run.

We’ll define the `execute` method on `ThreadPool` to take a closure as a
parameter. Recall from the [“Storing Closures Using Generic Parameters and the
`Fn` Traits”][storing-closures-using-generic-parameters-and-the-fn-traits]<!--
ignore --> section in Chapter 13 that we can take closures as parameters with
three different traits: `Fn`, `FnMut`, and `FnOnce`. We need to decide which
kind of closure to use here. We know we’ll end up doing something similar to
the standard library `thread::spawn` implementation, so we can look at what
bounds the signature of `thread::spawn` has on its parameter. The documentation
shows us the following:

```rust,ignore
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static
```

The `F` type parameter is the one we’re concerned with here; the `T` type
parameter is related to the return value, and we’re not concerned with that. We
can see that `spawn` uses `FnOnce` as the trait bound on `F`. This is probably
what we want as well, because we’ll eventually pass the argument we get in
`execute` to `spawn`. We can be further confident that `FnOnce` is the trait we
want to use because the thread for running a request will only execute that
request’s closure one time, which matches the `Once` in `FnOnce`.

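To see why `FnOnce` is the right fit, here is a minimal sketch (separate from
our server code): a closure that consumes a value it captures can only be
called once, and calling it consumes the closure itself.

```rust
fn main() {
    let greeting = String::from("hello");
    let job = move || drop(greeting); // consumes `greeting`, so `job` is `FnOnce`

    job();
    // job(); // a second call would not compile: `job` was moved by the first call
}
```
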
The `F` type parameter also has the trait bound `Send` and the lifetime bound
`'static`, which are useful in our situation: we need `Send` to transfer the
closure from one thread to another and `'static` because we don’t know how long
the thread will take to execute. Let’s create an `execute` method on
`ThreadPool` that will take a generic parameter of type `F` with these bounds:

<span class="filename">Filename: src/lib.rs</span>

```rust
# pub struct ThreadPool;
impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {

    }
}
```

We still use the `()` after `FnOnce` because this `FnOnce` represents a closure
that takes no parameters and returns the unit type `()`. Just like function
definitions, the return type can be omitted from the signature, but even if we
have no parameters, we still need the parentheses.

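For contrast, here is a hypothetical bound (not one our pool needs) where the
closure does return a value; in that case the return type appears explicitly
in the bound, just as in the `thread::spawn` signature above:

```rust
fn run_and_print<F>(f: F)
where
    F: FnOnce() -> String + Send + 'static, // returns a String instead of ()
{
    println!("{}", f());
}

fn main() {
    run_and_print(|| String::from("hello from a closure"));
}
```
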
Again, this is the simplest implementation of the `execute` method: it does
nothing, but we’re trying only to make our code compile. Let’s check it again:

```text
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
 --> src/lib.rs:4:16
  |
4 |     pub fn new(size: usize) -> ThreadPool {
  |                ^^^^
  |
  = note: #[warn(unused_variables)] on by default
  = note: to avoid this warning, consider using `_size` instead

warning: unused variable: `f`
 --> src/lib.rs:8:30
  |
8 |     pub fn execute<F>(&self, f: F)
  |                              ^
  |
  = note: to avoid this warning, consider using `_f` instead
```

We’re receiving only warnings now, which means it compiles! But note that if
you try `cargo run` and make a request in the browser, you’ll see the errors in
the browser that we saw at the beginning of the chapter. Our library isn’t
actually calling the closure passed to `execute` yet!

> Note: A saying you might hear about languages with strict compilers, such as
> Haskell and Rust, is “if the code compiles, it works.” But this saying is not
> universally true. Our project compiles, but it does absolutely nothing! If we
> were building a real, complete project, this would be a good time to start
> writing unit tests to check that the code compiles *and* has the behavior we
> want.

#### Validating the Number of Threads in `new`

We’ll continue to get warnings because we aren’t doing anything with the
parameters to `new` and `execute`. Let’s implement the bodies of these
functions with the behavior we want. To start, let’s think about `new`. Earlier
we chose an unsigned type for the `size` parameter, because a pool with a
negative number of threads makes no sense. However, a pool with zero threads
also makes no sense, yet zero is a perfectly valid `usize`. We’ll add code to
check that `size` is greater than zero before we return a `ThreadPool` instance
and have the program panic if it receives a zero by using the `assert!` macro,
as shown in Listing 20-13.

<span class="filename">Filename: src/lib.rs</span>

```rust
# pub struct ThreadPool;
impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
}
```

<span class="caption">Listing 20-13: Implementing `ThreadPool::new` to panic if
`size` is zero</span>

We’ve added some documentation for our `ThreadPool` with doc comments. Note
that we followed good documentation practices by adding a section that calls
out the situations in which our function can panic, as discussed in Chapter 14.
Try running `cargo doc --open` and clicking the `ThreadPool` struct to see what
the generated docs for `new` look like!

Instead of adding the `assert!` macro as we’ve done here, we could make `new`
return a `Result` like we did with `Config::new` in the I/O project in Listing
12-9. But we’ve decided in this case that trying to create a thread pool
without any threads should be an unrecoverable error. If you’re feeling
ambitious, try to write a version of `new` with the following signature to
compare both versions:

```rust,ignore
pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {
```

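If you’d like a starting point, here is one minimal sketch of that version
(`PoolCreationError` here is just a bare marker type we invent for the
exercise; it appears nowhere else in this chapter):

```rust
pub struct ThreadPool;

#[derive(Debug)]
pub struct PoolCreationError;

impl ThreadPool {
    pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {
        if size == 0 {
            // A zero-sized pool becomes a recoverable error for the caller
            return Err(PoolCreationError);
        }

        Ok(ThreadPool)
    }
}
```
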
#### Creating Space to Store the Threads

Now that we have a way to know we have a valid number of threads to store in
the pool, we can create those threads and store them in the `ThreadPool` struct
before returning it. But how do we “store” a thread? Let’s take another look at
the `thread::spawn` signature:

```rust,ignore
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static
```

The `spawn` function returns a `JoinHandle<T>`, where `T` is the type that the
closure returns. Let’s try using `JoinHandle` too and see what happens. In our
case, the closures we’re passing to the thread pool will handle the connection
and not return anything, so `T` will be the unit type `()`.

The code in Listing 20-14 will compile but doesn’t create any threads yet.
We’ve changed the definition of `ThreadPool` to hold a vector of
`thread::JoinHandle<()>` instances, initialized the vector with a capacity of
`size`, set up a `for` loop that will run some code to create the threads, and
returned a `ThreadPool` instance containing them.

<span class="filename">Filename: src/lib.rs</span>

```rust,ignore,not_desired_behavior
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool {
            threads
        }
    }

    // --snip--
}
```

<span class="caption">Listing 20-14: Creating a vector for `ThreadPool` to hold
the threads</span>

We’ve brought `std::thread` into scope in the library crate, because we’re
using `thread::JoinHandle` as the type of the items in the vector in
`ThreadPool`.

Once a valid size is received, our `ThreadPool` creates a new vector that can
hold `size` items. We haven’t used the `with_capacity` function in this book
yet, which performs the same task as `Vec::new` but with an important
difference: it preallocates space in the vector. Because we know we need to
store `size` elements in the vector, doing this allocation up front is slightly
more efficient than using `Vec::new`, which resizes itself as elements are
inserted.

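Here is a quick standalone sketch of the difference (unrelated to the pool
itself): the capacity is reserved up front, even though the vector starts
empty.

```rust
fn main() {
    let mut v: Vec<u32> = Vec::with_capacity(4);
    assert!(v.capacity() >= 4); // space is reserved up front
    assert_eq!(v.len(), 0);     // but the vector holds no elements yet

    v.push(1); // pushes within the capacity don't need to reallocate
}
```
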
When you run `cargo check` again, you’ll get a few more warnings, but it should
succeed.

#### A `Worker` Struct Responsible for Sending Code from the `ThreadPool` to a Thread

We left a comment in the `for` loop in Listing 20-14 regarding the creation of
threads. Here, we’ll look at how we actually create threads. The standard
library provides `thread::spawn` as a way to create threads, and
`thread::spawn` expects to get some code the thread should run as soon as the
thread is created. However, in our case, we want to create the threads and have
them *wait* for code that we’ll send later. The standard library’s
implementation of threads doesn’t include any way to do that; we have to
implement it manually.

We’ll implement this behavior by introducing a new data structure between the
`ThreadPool` and the threads that will manage this new behavior. We’ll call
this data structure `Worker`, which is a common term in pooling
implementations. Think of people working in the kitchen at a restaurant: the
workers wait until orders come in from customers, and then they’re responsible
for taking those orders and filling them.

Instead of storing a vector of `JoinHandle<()>` instances in the thread pool,
we’ll store instances of the `Worker` struct. Each `Worker` will store a single
`JoinHandle<()>` instance. Then we’ll implement a method on `Worker` that will
take a closure of code to run and send it to the already running thread for
execution. We’ll also give each worker an `id` so we can distinguish between
the different workers in the pool when logging or debugging.

Let’s make the following changes to what happens when we create a `ThreadPool`.
We’ll implement the code that sends the closure to the thread after we have
`Worker` set up in this way:

1. Define a `Worker` struct that holds an `id` and a `JoinHandle<()>`.
2. Change `ThreadPool` to hold a vector of `Worker` instances.
3. Define a `Worker::new` function that takes an `id` number and returns a
   `Worker` instance that holds the `id` and a thread spawned with an empty
   closure.
4. In `ThreadPool::new`, use the `for` loop counter to generate an `id`, create
   a new `Worker` with that `id`, and store the worker in the vector.

If you’re up for a challenge, try implementing these changes on your own before
looking at the code in Listing 20-15.

Ready? Here is Listing 20-15 with one way to make the preceding modifications.

<span class="filename">Filename: src/lib.rs</span>

```rust
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool {
            workers
        }
    }
    // --snip--
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker {
            id,
            thread,
        }
    }
}
```

<span class="caption">Listing 20-15: Modifying `ThreadPool` to hold `Worker`
instances instead of holding threads directly</span>

We’ve changed the name of the field on `ThreadPool` from `threads` to `workers`
because it’s now holding `Worker` instances instead of `JoinHandle<()>`
instances. We use the counter in the `for` loop as an argument to
`Worker::new`, and we store each new `Worker` in the vector named `workers`.

External code (like our server in *src/bin/main.rs*) doesn’t need to know the
implementation details regarding using a `Worker` struct within `ThreadPool`,
so we make the `Worker` struct and its `new` function private. The
`Worker::new` function uses the `id` we give it and stores a `JoinHandle<()>`
instance that is created by spawning a new thread using an empty closure.

This code will compile and will store the number of `Worker` instances we
specified as an argument to `ThreadPool::new`. But we’re *still* not processing
the closure that we get in `execute`. Let’s look at how to do that next.

#### Sending Requests to Threads via Channels

Now we’ll tackle the problem that the closures given to `thread::spawn` do
absolutely nothing. Currently, we get the closure we want to execute in the
`execute` method. But we need to give `thread::spawn` a closure to run when we
create each `Worker` during the creation of the `ThreadPool`.

We want the `Worker` structs that we just created to fetch code to run from a
queue held in the `ThreadPool` and send that code to its thread to run.

In Chapter 16, you learned about *channels*—a simple way to communicate between
two threads—that would be perfect for this use case. We’ll use a channel to
function as the queue of jobs, and `execute` will send a job from the
`ThreadPool` to the `Worker` instances, each of which will send the job to its
thread. Here is the plan:

1. The `ThreadPool` will create a channel and hold on to the sending side of
   the channel.
2. Each `Worker` will hold on to the receiving side of the channel.
3. We’ll create a new `Job` struct that will hold the closures we want to send
   down the channel.
4. The `execute` method will send the job it wants to execute down the sending
   side of the channel.
5. In its thread, the `Worker` will loop over its receiving side of the channel
   and execute the closures of any jobs it receives.

Let’s start by creating a channel in `ThreadPool::new` and holding the sending
side in the `ThreadPool` instance, as shown in Listing 20-16. The `Job` struct
doesn’t hold anything for now but will be the type of item we’re sending down
the channel.

<span class="filename">Filename: src/lib.rs</span>

```rust
# use std::thread;
// --snip--
use std::sync::mpsc;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool {
            workers,
            sender,
        }
    }
    // --snip--
}
#
# struct Worker {
#     id: usize,
#     thread: thread::JoinHandle<()>,
# }
#
# impl Worker {
#     fn new(id: usize) -> Worker {
#         let thread = thread::spawn(|| {});
#
#         Worker {
#             id,
#             thread,
#         }
#     }
# }
```

<span class="caption">Listing 20-16: Modifying `ThreadPool` to store the
sending end of a channel that sends `Job` instances</span>

In `ThreadPool::new`, we create our new channel and have the pool hold the
sending end. This will successfully compile, still with warnings.

Let’s try passing a receiving end of the channel into each worker as the thread
pool creates the channel. We know we want to use the receiving end in the
thread that the workers spawn, so we’ll reference the `receiver` parameter in
the closure. The code in Listing 20-17 won’t quite compile yet.

<span class="filename">Filename: src/lib.rs</span>

```rust,ignore,does_not_compile
impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool {
            workers,
            sender,
        }
    }
    // --snip--
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker {
            id,
            thread,
        }
    }
}
```

<span class="caption">Listing 20-17: Passing the receiving end of the channel
to the workers</span>

We’ve made some small and straightforward changes: we pass the receiving end of
the channel into `Worker::new`, and then we use it inside the closure.

When we try to check this code, we get this error:

```text
$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:27:42
   |
27 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here in
previous iteration of loop
   |
   = note: move occurs because `receiver` has type
`std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
```

The code is trying to pass `receiver` to multiple `Worker` instances. This
won’t work, as you’ll recall from Chapter 16: the channel implementation that
Rust provides is multiple *producer*, single *consumer*. This means we can’t
just clone the consuming end of the channel to fix this code. Even if we could,
that is not the technique we would want to use; instead, we want to distribute
the jobs across threads by sharing the single `receiver` among all the workers.

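As a refresher on what *multiple producer, single consumer* means, here is a
minimal sketch (separate from our pool): the sending end implements `Clone`,
but the receiving end does not.

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx2 = tx.clone(); // any number of senders is fine

    thread::spawn(move || tx.send("from tx").unwrap());
    thread::spawn(move || tx2.send("from tx2").unwrap());

    // but there is only ever one receiver
    for message in rx.iter().take(2) {
        println!("{}", message);
    }
}
```
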
Additionally, taking a job off the channel queue involves mutating the
`receiver`, so the threads need a safe way to share and modify `receiver`;
otherwise, we might get race conditions (as covered in Chapter 16).

Recall the thread-safe smart pointers discussed in Chapter 16: to share
ownership across multiple threads and allow the threads to mutate the value, we
need to use `Arc<Mutex<T>>`. The `Arc` type will let multiple workers own the
receiver, and `Mutex` will ensure that only one worker gets a job from the
receiver at a time. Listing 20-18 shows the changes we need to make.

<span class="filename">Filename: src/lib.rs</span>

```rust
# use std::thread;
# use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
// --snip--

# pub struct ThreadPool {
#     workers: Vec<Worker>,
#     sender: mpsc::Sender<Job>,
# }
# struct Job;
#
impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender,
        }
    }

    // --snip--
}

# struct Worker {
#     id: usize,
#     thread: thread::JoinHandle<()>,
# }
#
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
#         let thread = thread::spawn(|| {
#             receiver;
#         });
#
#         Worker {
#             id,
#             thread,
#         }
    }
}
```

<span class="caption">Listing 20-18: Sharing the receiving end of the channel
among the workers using `Arc` and `Mutex`</span>

In `ThreadPool::new`, we put the receiving end of the channel in an `Arc` and a
`Mutex`. For each new worker, we clone the `Arc` to bump the reference count so
the workers can share ownership of the receiving end.

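To make the shared-ownership part concrete, here is a minimal sketch of what
cloning an `Arc` does (not part of the pool code):

```rust
use std::sync::{Arc, Mutex};

fn main() {
    let receiver = Arc::new(Mutex::new(0));
    let handle = Arc::clone(&receiver); // bumps the reference count

    assert_eq!(Arc::strong_count(&receiver), 2);
    drop(handle); // dropping one owner decrements the count again
    assert_eq!(Arc::strong_count(&receiver), 1);
}
```
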
With these changes, the code compiles! We’re getting there!

#### Implementing the `execute` Method

Let’s finally implement the `execute` method on `ThreadPool`. We’ll also change
`Job` from a struct to a type alias for a trait object that holds the type of
closure that `execute` receives. As discussed in the [“Creating Type Synonyms
with Type Aliases”][creating-type-synonyms-with-type-aliases]<!-- ignore -->
section of Chapter 19, type aliases allow us to make long types shorter. Look
at Listing 20-19.

<span class="filename">Filename: src/lib.rs</span>

```rust
// --snip--
# pub struct ThreadPool {
#     workers: Vec<Worker>,
#     sender: mpsc::Sender<Job>,
# }
# use std::sync::mpsc;
# struct Worker {}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--
```

<span class="caption">Listing 20-19: Creating a `Job` type alias for a `Box`
that holds each closure and then sending the job down the channel</span>

After creating a new `Job` instance using the closure we get in `execute`, we
send that job down the sending end of the channel. We’re calling `unwrap` on
`send` for the case that sending fails. This might happen if, for example, we
stop all our threads from executing, meaning the receiving end has stopped
receiving new messages. At the moment, we can’t stop our threads from
executing: our threads continue executing as long as the pool exists. The
reason we use `unwrap` is that we know the failure case won’t happen, but the
compiler doesn’t know that.

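Here is the failure case in miniature (a standalone sketch): sending on a
channel whose receiver has been dropped returns an `Err`, which is exactly
what `unwrap` would turn into a panic.

```rust
use std::sync::mpsc;

fn main() {
    let (sender, receiver) = mpsc::channel::<u32>();
    drop(receiver);                   // the receiving end stops existing
    assert!(sender.send(1).is_err()); // so `send` now returns an error
}
```
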
But we’re not quite done yet! In the worker, our closure being passed to
`thread::spawn` still only *references* the receiving end of the channel.
Instead, we need the closure to loop forever, asking the receiving end of the
channel for a job and running the job when it gets one. Let’s make the change
shown in Listing 20-20 to `Worker::new`.

<span class="filename">Filename: src/lib.rs</span>

```rust,ignore,does_not_compile
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                (*job)();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}
```

<span class="caption">Listing 20-20: Receiving and executing the jobs in the
worker’s thread</span>

Here, we first call `lock` on the `receiver` to acquire the mutex, and then we
call `unwrap` to panic on any errors. Acquiring a lock might fail if the mutex
is in a *poisoned* state, which can happen if some other thread panicked while
holding the lock rather than releasing the lock. In this situation, calling
`unwrap` to have this thread panic is the correct action to take. Feel free to
change this `unwrap` to an `expect` with an error message that is meaningful to
you.

If we get the lock on the mutex, we call `recv` to receive a `Job` from the
channel. A final `unwrap` moves past any errors here as well, which might occur
if the thread holding the sending side of the channel has shut down, similar to
how the `send` method returns `Err` if the receiving side shuts down.

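The corresponding failure for `recv` looks like this in miniature (again a
standalone sketch): once every sender is gone, `recv` returns an `Err` instead
of blocking forever.

```rust
use std::sync::mpsc;

fn main() {
    let (sender, receiver) = mpsc::channel::<u32>();
    drop(sender);                      // the sending side shuts down
    assert!(receiver.recv().is_err()); // so `recv` returns an error
}
```
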
The call to `recv` blocks, so if there is no job yet, the current thread will
wait until a job becomes available. The `Mutex<T>` ensures that only one
`Worker` thread at a time is trying to request a job.

Theoretically, this code should compile. Unfortunately, the Rust compiler isn’t
perfect yet, and we get this error:

```text
error[E0161]: cannot move a value of type std::ops::FnOnce() +
std::marker::Send: the size of std::ops::FnOnce() + std::marker::Send cannot be
statically determined
  --> src/lib.rs:63:17
   |
63 |                 (*job)();
   |                 ^^^^^^
```

This error is fairly cryptic because the problem is fairly cryptic. To call a
`FnOnce` closure that is stored in a `Box<T>` (which is what our `Job` type
alias is), the closure needs to move itself *out* of the `Box<T>` because the
closure takes ownership of `self` when we call it. In general, Rust doesn’t
allow us to move a value out of a `Box<T>` because Rust doesn’t know how big
the value inside the `Box<T>` will be: recall in Chapter 15 that we used
`Box<T>` precisely because we had something of an unknown size that we wanted
to store in a `Box<T>` to get a value of a known size.

As you saw in Listing 17-15, we can write methods that use the syntax `self:
Box<Self>`, which allows the method to take ownership of a `Self` value stored
in a `Box<T>`. That’s exactly what we want to do here, but unfortunately Rust
won’t let us: the part of Rust that implements behavior when a closure is
called isn’t implemented using `self: Box<Self>`. So Rust doesn’t yet
understand that it could use `self: Box<Self>` in this situation to take
ownership of the closure and move the closure out of the `Box<T>`.

Rust is still a work in progress with places where the compiler could be
improved, but in the future, the code in Listing 20-20 should work just fine.
People just like you are working to fix this and other issues! After you’ve
finished this book, we would love for you to join in.

But for now, let’s work around this problem using a handy trick. We can tell
Rust explicitly that in this case we can take ownership of the value inside the
`Box<T>` using `self: Box<Self>`; then, once we have ownership of the closure,
we can call it. This involves defining a new trait `FnBox` with the method
`call_box` that will use `self: Box<Self>` in its signature, defining `FnBox`
for any type that implements `FnOnce()`, changing our type alias to use the new
trait, and changing `Worker` to use the `call_box` method. These changes are
shown in Listing 20-21.

<span class="filename">Filename: src/lib.rs</span>

```rust,ignore
trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}

type Job = Box<dyn FnBox + Send + 'static>;

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                job.call_box();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}
```

<span class="caption">Listing 20-21: Adding a new trait `FnBox` to work around
the current limitations of `Box<FnOnce()>`</span>

First, we create a new trait named `FnBox`. This trait has the one method
`call_box`, which is similar to the `call` methods on the other `Fn*` traits
except that it takes `self: Box<Self>` to take ownership of `self` and move the
value out of the `Box<T>`.

Next, we implement the `FnBox` trait for any type `F` that implements the
`FnOnce()` trait. Effectively, this means that any `FnOnce()` closures can use
our `call_box` method. The implementation of `call_box` uses `(*self)()` to
move the closure out of the `Box<T>` and call the closure.

We now need our `Job` type alias to be a `Box` of anything that implements our
new trait `FnBox`. This will allow us to use `call_box` in `Worker` when we get
a `Job` value instead of invoking the closure directly. Implementing the
`FnBox` trait for any `FnOnce()` closure means we don’t have to change anything
about the actual values we’re sending down the channel. Now Rust is able to
recognize that what we want to do is fine.

This trick is very sneaky and complicated. Don’t worry if it doesn’t make
perfect sense; someday, it will be completely unnecessary.

With the implementation of this trick, our thread pool is in a working state!
Give it a `cargo run` and make some requests:

```text
$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never used: `workers`
 --> src/lib.rs:7:5
  |
7 |     workers: Vec<Worker>,
  |     ^^^^^^^^^^^^^^^^^^^^
  |
  = note: #[warn(dead_code)] on by default

warning: field is never used: `id`
  --> src/lib.rs:61:5
   |
61 |     id: usize,
   |     ^^^^^^^^^
   |
   = note: #[warn(dead_code)] on by default

warning: field is never used: `thread`
  --> src/lib.rs:62:5
   |
62 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: #[warn(dead_code)] on by default

    Finished dev [unoptimized + debuginfo] target(s) in 0.99 secs
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
```

Success! We now have a thread pool that executes connections asynchronously.
There are never more than four threads created, so our system won’t get
overloaded if the server receives a lot of requests. If we make a request to
*/sleep*, the server will be able to serve other requests by having another
thread run them.

> Note: If you open */sleep* in multiple browser windows simultaneously, they
> might load one at a time at 5-second intervals. Some web browsers execute
> multiple instances of the same request sequentially for caching reasons. This
> limitation is not caused by our web server.

After learning about the `while let` loop in Chapter 18, you might be wondering
why we didn’t write the worker thread code as shown in Listing 20-22.

<span class="filename">Filename: src/lib.rs</span>

```rust,ignore,not_desired_behavior
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {} got a job; executing.", id);

                job.call_box();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}
```

<span class="caption">Listing 20-22: An alternative implementation of
`Worker::new` using `while let`</span>

This code compiles and runs but doesn’t result in the desired threading
behavior: a slow request will still cause other requests to wait to be
processed. The reason is somewhat subtle: the `Mutex` struct has no public
`unlock` method because the ownership of the lock is based on the lifetime of
the `MutexGuard<T>` within the `LockResult<MutexGuard<T>>` that the `lock`
method returns. At compile time, the borrow checker can then enforce the rule
that a resource guarded by a `Mutex` cannot be accessed unless we hold the
lock. But this implementation can also result in the lock being held longer
than intended if we don’t think carefully about the lifetime of the
`MutexGuard<T>`. Because the values in the `while` expression remain in scope
for the duration of the block, the lock remains held for the duration of the
call to `job.call_box()`, meaning other workers cannot receive jobs.

By using `loop` instead and acquiring the lock and a job within the block
rather than outside it, the `MutexGuard` returned from the `lock` method is
dropped as soon as the `let job` statement ends. This ensures that the lock is
held during the call to `recv`, but it is released before the call to
`job.call_box()`, allowing multiple requests to be serviced concurrently.

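The guard-lifetime rule is easier to see in miniature. In this standalone
sketch (unrelated to the pool), the temporary `MutexGuard` produced by `lock`
is dropped at the end of the statement that creates it, unless we bind it to a
variable:

```rust
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    // The temporary guard lives only to the end of this statement, so the
    // lock is already released on the next line.
    let value = *m.lock().unwrap();
    println!("copied value: {}", value);

    {
        let guard = m.lock().unwrap(); // lock held while `guard` is in scope
        println!("through guard: {}", *guard);
    } // `guard` dropped here; lock released
}
```
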
[creating-type-synonyms-with-type-aliases]:
ch19-04-advanced-types.html#creating-type-synonyms-with-type-aliases
[integer-types]: ch03-02-data-types.html#integer-types
[storing-closures-using-generic-parameters-and-the-fn-traits]:
ch13-01-closures.html#storing-closures-using-generic-parameters-and-the-fn-traits