]>
Commit | Line | Data |
---|---|---|
13cf67c4 XL |
1 | ## Turning Our Single-Threaded Server into a Multithreaded Server |
2 | ||
3 | Right now, the server will process each request in turn, meaning it won’t | |
4 | process a second connection until the first is finished processing. If the | |
5 | server received more and more requests, this serial execution would be less and | |
6 | less optimal. If the server receives a request that takes a long time to | |
7 | process, subsequent requests will have to wait until the long request is | |
8 | finished, even if the new requests can be processed quickly. We’ll need to fix | |
9 | this, but first, we’ll look at the problem in action. | |
10 | ||
11 | ### Simulating a Slow Request in the Current Server Implementation | |
12 | ||
13 | We’ll look at how a slow-processing request can affect other requests made to | |
14 | our current server implementation. Listing 20-10 implements handling a request | |
15 | to */sleep* with a simulated slow response that will cause the server to sleep | |
16 | for 5 seconds before responding. | |
17 | ||
18 | <span class="filename">Filename: src/main.rs</span> | |
19 | ||
20 | ```rust | |
21 | use std::thread; | |
22 | use std::time::Duration; | |
23 | # use std::io::prelude::*; | |
24 | # use std::net::TcpStream; | |
25 | # use std::fs::File; | |
26 | // --snip-- | |
27 | ||
28 | fn handle_connection(mut stream: TcpStream) { | |
29 | # let mut buffer = [0; 512]; | |
30 | # stream.read(&mut buffer).unwrap(); | |
31 | // --snip-- | |
32 | ||
33 | let get = b"GET / HTTP/1.1\r\n"; | |
34 | let sleep = b"GET /sleep HTTP/1.1\r\n"; | |
35 | ||
36 | let (status_line, filename) = if buffer.starts_with(get) { | |
37 | ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") | |
38 | } else if buffer.starts_with(sleep) { | |
39 | thread::sleep(Duration::from_secs(5)); | |
40 | ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") | |
41 | } else { | |
42 | ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") | |
43 | }; | |
44 | ||
45 | // --snip-- | |
46 | } | |
47 | ``` | |
48 | ||
49 | <span class="caption">Listing 20-10: Simulating a slow request by recognizing | |
50 | */sleep* and sleeping for 5 seconds</span> | |
51 | ||
52 | This code is a bit messy, but it’s good enough for simulation purposes. We | |
53 | created a second request `sleep`, whose data our server recognizes. We added an | |
54 | `else if` after the `if` block to check for the request to */sleep*. When that | |
55 | request is received, the server will sleep for 5 seconds before rendering the | |
56 | successful HTML page. | |
57 | ||
58 | You can see how primitive our server is: real libraries would handle the | |
59 | recognition of multiple requests in a much less verbose way! | |
60 | ||
61 | Start the server using `cargo run`. Then open two browser windows: one for | |
62 | *http://127.0.0.1:7878/* and the other for *http://127.0.0.1:7878/sleep*. If | |
63 | you enter the */* URI a few times, as before, you’ll see it respond quickly. | |
64 | But if you enter */sleep* and then load */*, you’ll see that */* waits until | |
65 | `sleep` has slept for its full 5 seconds before loading. | |
66 | ||
67 | There are multiple ways we could change how our web server works to avoid | |
68 | having more requests back up behind a slow request; the one we’ll implement is | |
69 | a thread pool. | |
70 | ||
71 | ### Improving Throughput with a Thread Pool | |
72 | ||
73 | A *thread pool* is a group of spawned threads that are waiting and ready to | |
74 | handle a task. When the program receives a new task, it assigns one of the | |
75 | threads in the pool to the task, and that thread will process the task. The | |
76 | remaining threads in the pool are available to handle any other tasks that come | |
77 | in while the first thread is processing. When the first thread is done | |
78 | processing its task, it’s returned to the pool of idle threads, ready to handle | |
79 | a new task. A thread pool allows you to process connections concurrently, | |
80 | increasing the throughput of your server. | |
81 | ||
82 | We’ll limit the number of threads in the pool to a small number to protect us | |
83 | from Denial of Service (DoS) attacks; if we had our program create a new thread | |
84 | for each request as it came in, someone making 10 million requests to our | |
85 | server could create havoc by using up all our server’s resources and grinding | |
86 | the processing of requests to a halt. | |
87 | ||
88 | Rather than spawning unlimited threads, we’ll have a fixed number of threads | |
89 | waiting in the pool. As requests come in, they’ll be sent to the pool for | |
90 | processing. The pool will maintain a queue of incoming requests. Each of the | |
91 | threads in the pool will pop off a request from this queue, handle the request, | |
92 | and then ask the queue for another request. With this design, we can process | |
93 | `N` requests concurrently, where `N` is the number of threads. If each thread | |
94 | is responding to a long-running request, subsequent requests can still back up | |
95 | in the queue, but we’ve increased the number of long-running requests we can | |
96 | handle before reaching that point. | |
97 | ||
98 | This technique is just one of many ways to improve the throughput of a web | |
99 | server. Other options you might explore are the fork/join model and the | |
100 | single-threaded async I/O model. If you’re interested in this topic, you can | |
101 | read more about other solutions and try to implement them in Rust; with a | |
102 | low-level language like Rust, all of these options are possible. | |
103 | ||
104 | Before we begin implementing a thread pool, let’s talk about what using the | |
105 | pool should look like. When you’re trying to design code, writing the client | |
106 | interface first can help guide your design. Write the API of the code so it’s | |
107 | structured in the way you want to call it; then implement the functionality | |
108 | within that structure rather than implementing the functionality and then | |
109 | designing the public API. | |
110 | ||
111 | Similar to how we used test-driven development in the project in Chapter 12, | |
112 | we’ll use compiler-driven development here. We’ll write the code that calls the | |
113 | functions we want, and then we’ll look at errors from the compiler to determine | |
114 | what we should change next to get the code to work. | |
115 | ||
116 | #### Code Structure If We Could Spawn a Thread for Each Request | |
117 | ||
118 | First, let’s explore how our code might look if it did create a new thread for | |
119 | every connection. As mentioned earlier, this isn’t our final plan due to the | |
120 | problems with potentially spawning an unlimited number of threads, but it is a | |
121 | starting point. Listing 20-11 shows the changes to make to `main` to spawn a | |
122 | new thread to handle each stream within the `for` loop. | |
123 | ||
124 | <span class="filename">Filename: src/main.rs</span> | |
125 | ||
126 | ```rust,no_run | |
127 | # use std::thread; | |
128 | # use std::io::prelude::*; | |
129 | # use std::net::TcpListener; | |
130 | # use std::net::TcpStream; | |
131 | # | |
132 | fn main() { | |
133 | let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); | |
134 | ||
135 | for stream in listener.incoming() { | |
136 | let stream = stream.unwrap(); | |
137 | ||
138 | thread::spawn(|| { | |
139 | handle_connection(stream); | |
140 | }); | |
141 | } | |
142 | } | |
143 | # fn handle_connection(mut stream: TcpStream) {} | |
144 | ``` | |
145 | ||
146 | <span class="caption">Listing 20-11: Spawning a new thread for each | |
147 | stream</span> | |
148 | ||
149 | As you learned in Chapter 16, `thread::spawn` will create a new thread and then | |
150 | run the code in the closure in the new thread. If you run this code and load | |
151 | */sleep* in your browser, then */* in two more browser tabs, you’ll indeed see | |
152 | that the requests to */* don’t have to wait for */sleep* to finish. But as we | |
153 | mentioned, this will eventually overwhelm the system because you’d be making | |
154 | new threads without any limit. | |
155 | ||
156 | #### Creating a Similar Interface for a Finite Number of Threads | |
157 | ||
158 | We want our thread pool to work in a similar, familiar way so switching from | |
159 | threads to a thread pool doesn’t require large changes to the code that uses | |
160 | our API. Listing 20-12 shows the hypothetical interface for a `ThreadPool` | |
161 | struct we want to use instead of `thread::spawn`. | |
162 | ||
163 | <span class="filename">Filename: src/main.rs</span> | |
164 | ||
165 | ```rust,no_run | |
166 | # use std::thread; | |
167 | # use std::io::prelude::*; | |
168 | # use std::net::TcpListener; | |
169 | # use std::net::TcpStream; | |
170 | # struct ThreadPool; | |
171 | # impl ThreadPool { | |
172 | # fn new(size: u32) -> ThreadPool { ThreadPool } | |
173 | # fn execute<F>(&self, f: F) | |
174 | # where F: FnOnce() + Send + 'static {} | |
175 | # } | |
176 | # | |
177 | fn main() { | |
178 | let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); | |
179 | let pool = ThreadPool::new(4); | |
180 | ||
181 | for stream in listener.incoming() { | |
182 | let stream = stream.unwrap(); | |
183 | ||
184 | pool.execute(|| { | |
185 | handle_connection(stream); | |
186 | }); | |
187 | } | |
188 | } | |
189 | # fn handle_connection(mut stream: TcpStream) {} | |
190 | ``` | |
191 | ||
192 | <span class="caption">Listing 20-12: Our ideal `ThreadPool` interface</span> | |
193 | ||
194 | We use `ThreadPool::new` to create a new thread pool with a configurable number | |
195 | of threads, in this case four. Then, in the `for` loop, `pool.execute` has a | |
196 | similar interface as `thread::spawn` in that it takes a closure the pool should | |
197 | run for each stream. We need to implement `pool.execute` so it takes the | |
198 | closure and gives it to a thread in the pool to run. This code won’t yet | |
199 | compile, but we’ll try so the compiler can guide us in how to fix it. | |
200 | ||
201 | #### Building the `ThreadPool` Struct Using Compiler Driven Development | |
202 | ||
203 | Make the changes in Listing 20-12 to *src/main.rs*, and then let’s use the | |
204 | compiler errors from `cargo check` to drive our development. Here is the first | |
205 | error we get: | |
206 | ||
207 | ```text | |
208 | $ cargo check | |
209 | Compiling hello v0.1.0 (file:///projects/hello) | |
210 | error[E0433]: failed to resolve. Use of undeclared type or module `ThreadPool` | |
211 | --> src\main.rs:10:16 | |
212 | | | |
213 | 10 | let pool = ThreadPool::new(4); | |
214 | | ^^^^^^^^^^^^^^^ Use of undeclared type or module | |
215 | `ThreadPool` | |
216 | ||
217 | error: aborting due to previous error | |
218 | ``` | |
219 | ||
220 | Great! This error tells us we need a `ThreadPool` type or module, so we’ll | |
221 | build one now. Our `ThreadPool` implementation will be independent of the kind | |
222 | of work our web server is doing. So, let’s switch the `hello` crate from a | |
223 | binary crate to a library crate to hold our `ThreadPool` implementation. After | |
224 | we change to a library crate, we could also use the separate thread pool | |
225 | library for any work we want to do using a thread pool, not just for serving | |
226 | web requests. | |
227 | ||
228 | Create a *src/lib.rs* that contains the following, which is the simplest | |
229 | definition of a `ThreadPool` struct that we can have for now: | |
230 | ||
231 | <span class="filename">Filename: src/lib.rs</span> | |
232 | ||
233 | ```rust | |
234 | pub struct ThreadPool; | |
235 | ``` | |
236 | ||
237 | Then create a new directory, *src/bin*, and move the binary crate rooted in | |
238 | *src/main.rs* into *src/bin/main.rs*. Doing so will make the library crate the | |
239 | primary crate in the *hello* directory; we can still run the binary in | |
240 | *src/bin/main.rs* using `cargo run`. After moving the *main.rs* file, edit it | |
241 | to bring the library crate in and bring `ThreadPool` into scope by adding the | |
242 | following code to the top of *src/bin/main.rs*: | |
243 | ||
244 | <span class="filename">Filename: src/bin/main.rs</span> | |
245 | ||
246 | ```rust,ignore | |
247 | use hello::ThreadPool; | |
248 | ``` | |
249 | ||
250 | This code still won’t work, but let’s check it again to get the next error that | |
251 | we need to address: | |
252 | ||
253 | ```text | |
254 | $ cargo check | |
255 | Compiling hello v0.1.0 (file:///projects/hello) | |
256 | error[E0599]: no function or associated item named `new` found for type | |
257 | `hello::ThreadPool` in the current scope | |
258 | --> src/bin/main.rs:13:16 | |
259 | | | |
260 | 13 | let pool = ThreadPool::new(4); | |
261 | | ^^^^^^^^^^^^^^^ function or associated item not found in | |
262 | `hello::ThreadPool` | |
263 | ``` | |
264 | ||
265 | This error indicates that next we need to create an associated function named | |
266 | `new` for `ThreadPool`. We also know that `new` needs to have one parameter | |
267 | that can accept `4` as an argument and should return a `ThreadPool` instance. | |
268 | Let’s implement the simplest `new` function that will have those | |
269 | characteristics: | |
270 | ||
271 | <span class="filename">Filename: src/lib.rs</span> | |
272 | ||
273 | ```rust | |
274 | pub struct ThreadPool; | |
275 | ||
276 | impl ThreadPool { | |
277 | pub fn new(size: usize) -> ThreadPool { | |
278 | ThreadPool | |
279 | } | |
280 | } | |
281 | ``` | |
282 | ||
283 | We chose `usize` as the type of the `size` parameter, because we know that a | |
284 | negative number of threads doesn’t make any sense. We also know we’ll use this | |
285 | 4 as the number of elements in a collection of threads, which is what the | |
9fa01778 XL |
286 | `usize` type is for, as discussed in the [“Integer Types”][integer-types]<!-- |
287 | ignore --> section of Chapter 3. | |
13cf67c4 XL |
288 | |
289 | Let’s check the code again: | |
290 | ||
291 | ```text | |
292 | $ cargo check | |
293 | Compiling hello v0.1.0 (file:///projects/hello) | |
294 | warning: unused variable: `size` | |
295 | --> src/lib.rs:4:16 | |
296 | | | |
297 | 4 | pub fn new(size: usize) -> ThreadPool { | |
298 | | ^^^^ | |
299 | | | |
300 | = note: #[warn(unused_variables)] on by default | |
301 | = note: to avoid this warning, consider using `_size` instead | |
302 | ||
303 | error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope | |
304 | --> src/bin/main.rs:18:14 | |
305 | | | |
306 | 18 | pool.execute(|| { | |
307 | | ^^^^^^^ | |
308 | ``` | |
309 | ||
310 | Now we get a warning and an error. Ignoring the warning for a moment, the error | |
311 | occurs because we don’t have an `execute` method on `ThreadPool`. Recall from | |
9fa01778 XL |
312 | the [“Creating a Similar Interface for a Finite Number of |
313 | Threads”](#creating-a-similar-interface-for-a-finite-number-of-threads)<!-- | |
314 | ignore --> section that we decided our thread pool should have an interface | |
315 | similar to `thread::spawn`. In addition, we’ll implement the `execute` function | |
316 | so it takes the closure it’s given and gives it to an idle thread in the pool | |
317 | to run. | |
13cf67c4 XL |
318 | |
319 | We’ll define the `execute` method on `ThreadPool` to take a closure as a | |
9fa01778 XL |
320 | parameter. Recall from the [“Storing Closures Using Generic Parameters and the |
321 | `Fn` Traits”][storing-closures-using-generic-parameters-and-the-fn-traits]<!-- | |
322 | ignore --> section in Chapter 13 that we can take closures as parameters with | |
13cf67c4 XL |
323 | three different traits: `Fn`, `FnMut`, and `FnOnce`. We need to decide which |
324 | kind of closure to use here. We know we’ll end up doing something similar to | |
325 | the standard library `thread::spawn` implementation, so we can look at what | |
326 | bounds the signature of `thread::spawn` has on its parameter. The documentation | |
327 | shows us the following: | |
328 | ||
329 | ```rust,ignore | |
330 | pub fn spawn<F, T>(f: F) -> JoinHandle<T> | |
331 | where | |
332 | F: FnOnce() -> T + Send + 'static, | |
333 | T: Send + 'static | |
334 | ``` | |
335 | ||
336 | The `F` type parameter is the one we’re concerned with here; the `T` type | |
337 | parameter is related to the return value, and we’re not concerned with that. We | |
338 | can see that `spawn` uses `FnOnce` as the trait bound on `F`. This is probably | |
339 | what we want as well, because we’ll eventually pass the argument we get in | |
340 | `execute` to `spawn`. We can be further confident that `FnOnce` is the trait we | |
341 | want to use because the thread for running a request will only execute that | |
342 | request’s closure one time, which matches the `Once` in `FnOnce`. | |
343 | ||
344 | The `F` type parameter also has the trait bound `Send` and the lifetime bound | |
345 | `'static`, which are useful in our situation: we need `Send` to transfer the | |
346 | closure from one thread to another and `'static` because we don’t know how long | |
347 | the thread will take to execute. Let’s create an `execute` method on | |
348 | `ThreadPool` that will take a generic parameter of type `F` with these bounds: | |
349 | ||
350 | <span class="filename">Filename: src/lib.rs</span> | |
351 | ||
352 | ```rust | |
353 | # pub struct ThreadPool; | |
354 | impl ThreadPool { | |
355 | // --snip-- | |
356 | ||
357 | pub fn execute<F>(&self, f: F) | |
358 | where | |
359 | F: FnOnce() + Send + 'static | |
360 | { | |
361 | ||
362 | } | |
363 | } | |
364 | ``` | |
365 | ||
366 | We still use the `()` after `FnOnce` because this `FnOnce` represents a closure | |
367 | that takes no parameters and doesn’t return a value. Just like function | |
368 | definitions, the return type can be omitted from the signature, but even if we | |
369 | have no parameters, we still need the parentheses. | |
370 | ||
371 | Again, this is the simplest implementation of the `execute` method: it does | |
372 | nothing, but we’re trying only to make our code compile. Let’s check it again: | |
373 | ||
374 | ```text | |
375 | $ cargo check | |
376 | Compiling hello v0.1.0 (file:///projects/hello) | |
377 | warning: unused variable: `size` | |
378 | --> src/lib.rs:4:16 | |
379 | | | |
380 | 4 | pub fn new(size: usize) -> ThreadPool { | |
381 | | ^^^^ | |
382 | | | |
383 | = note: #[warn(unused_variables)] on by default | |
384 | = note: to avoid this warning, consider using `_size` instead | |
385 | ||
386 | warning: unused variable: `f` | |
387 | --> src/lib.rs:8:30 | |
388 | | | |
389 | 8 | pub fn execute<F>(&self, f: F) | |
390 | | ^ | |
391 | | | |
392 | = note: to avoid this warning, consider using `_f` instead | |
393 | ``` | |
394 | ||
395 | We’re receiving only warnings now, which means it compiles! But note that if | |
396 | you try `cargo run` and make a request in the browser, you’ll see the errors in | |
397 | the browser that we saw at the beginning of the chapter. Our library isn’t | |
398 | actually calling the closure passed to `execute` yet! | |
399 | ||
400 | > Note: A saying you might hear about languages with strict compilers, such as | |
401 | > Haskell and Rust, is “if the code compiles, it works.” But this saying is not | |
402 | > universally true. Our project compiles, but it does absolutely nothing! If we | |
403 | > were building a real, complete project, this would be a good time to start | |
404 | > writing unit tests to check that the code compiles *and* has the behavior we | |
405 | > want. | |
406 | ||
407 | #### Validating the Number of Threads in `new` | |
408 | ||
409 | We’ll continue to get warnings because we aren’t doing anything with the | |
410 | parameters to `new` and `execute`. Let’s implement the bodies of these | |
411 | functions with the behavior we want. To start, let’s think about `new`. Earlier | |
412 | we chose an unsigned type for the `size` parameter, because a pool with a | |
413 | negative number of threads makes no sense. However, a pool with zero threads | |
414 | also makes no sense, yet zero is a perfectly valid `usize`. We’ll add code to | |
415 | check that `size` is greater than zero before we return a `ThreadPool` instance | |
416 | and have the program panic if it receives a zero by using the `assert!` macro, | |
417 | as shown in Listing 20-13. | |
418 | ||
419 | <span class="filename">Filename: src/lib.rs</span> | |
420 | ||
421 | ```rust | |
422 | # pub struct ThreadPool; | |
423 | impl ThreadPool { | |
424 | /// Create a new ThreadPool. | |
425 | /// | |
426 | /// The size is the number of threads in the pool. | |
427 | /// | |
428 | /// # Panics | |
429 | /// | |
430 | /// The `new` function will panic if the size is zero. | |
431 | pub fn new(size: usize) -> ThreadPool { | |
432 | assert!(size > 0); | |
433 | ||
434 | ThreadPool | |
435 | } | |
436 | ||
437 | // --snip-- | |
438 | } | |
439 | ``` | |
440 | ||
441 | <span class="caption">Listing 20-13: Implementing `ThreadPool::new` to panic if | |
442 | `size` is zero</span> | |
443 | ||
444 | We’ve added some documentation for our `ThreadPool` with doc comments. Note | |
445 | that we followed good documentation practices by adding a section that calls | |
446 | out the situations in which our function can panic, as discussed in Chapter 14. | |
447 | Try running `cargo doc --open` and clicking the `ThreadPool` struct to see what | |
448 | the generated docs for `new` look like! | |
449 | ||
450 | Instead of adding the `assert!` macro as we’ve done here, we could make `new` | |
451 | return a `Result` like we did with `Config::new` in the I/O project in Listing | |
452 | 12-9. But we’ve decided in this case that trying to create a thread pool | |
453 | without any threads should be an unrecoverable error. If you’re feeling | |
454 | ambitious, try to write a version of `new` with the following signature to | |
455 | compare both versions: | |
456 | ||
457 | ```rust,ignore | |
458 | pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> { | |
459 | ``` | |
460 | ||
461 | #### Creating Space to Store the Threads | |
462 | ||
463 | Now that we have a way to know we have a valid number of threads to store in | |
464 | the pool, we can create those threads and store them in the `ThreadPool` struct | |
465 | before returning it. But how do we “store” a thread? Let’s take another look at | |
466 | the `thread::spawn` signature: | |
467 | ||
468 | ```rust,ignore | |
469 | pub fn spawn<F, T>(f: F) -> JoinHandle<T> | |
470 | where | |
471 | F: FnOnce() -> T + Send + 'static, | |
472 | T: Send + 'static | |
473 | ``` | |
474 | ||
475 | The `spawn` function returns a `JoinHandle<T>`, where `T` is the type that the | |
476 | closure returns. Let’s try using `JoinHandle` too and see what happens. In our | |
477 | case, the closures we’re passing to the thread pool will handle the connection | |
478 | and not return anything, so `T` will be the unit type `()`. | |
479 | ||
480 | The code in Listing 20-14 will compile but doesn’t create any threads yet. | |
481 | We’ve changed the definition of `ThreadPool` to hold a vector of | |
482 | `thread::JoinHandle<()>` instances, initialized the vector with a capacity of | |
483 | `size`, set up a `for` loop that will run some code to create the threads, and | |
484 | returned a `ThreadPool` instance containing them. | |
485 | ||
486 | <span class="filename">Filename: src/lib.rs</span> | |
487 | ||
488 | ```rust,ignore,not_desired_behavior | |
489 | use std::thread; | |
490 | ||
491 | pub struct ThreadPool { | |
492 | threads: Vec<thread::JoinHandle<()>>, | |
493 | } | |
494 | ||
495 | impl ThreadPool { | |
496 | // --snip-- | |
497 | pub fn new(size: usize) -> ThreadPool { | |
498 | assert!(size > 0); | |
499 | ||
500 | let mut threads = Vec::with_capacity(size); | |
501 | ||
502 | for _ in 0..size { | |
503 | // create some threads and store them in the vector | |
504 | } | |
505 | ||
506 | ThreadPool { | |
507 | threads | |
508 | } | |
509 | } | |
510 | ||
511 | // --snip-- | |
512 | } | |
513 | ``` | |
514 | ||
515 | <span class="caption">Listing 20-14: Creating a vector for `ThreadPool` to hold | |
516 | the threads</span> | |
517 | ||
518 | We’ve brought `std::thread` into scope in the library crate, because we’re | |
519 | using `thread::JoinHandle` as the type of the items in the vector in | |
520 | `ThreadPool`. | |
521 | ||
522 | Once a valid size is received, our `ThreadPool` creates a new vector that can | |
523 | hold `size` items. We haven’t used the `with_capacity` function in this book | |
524 | yet, which performs the same task as `Vec::new` but with an important | |
525 | difference: it preallocates space in the vector. Because we know we need to | |
526 | store `size` elements in the vector, doing this allocation up front is slightly | |
527 | more efficient than using `Vec::new`, which resizes itself as elements are | |
528 | inserted. | |
529 | ||
530 | When you run `cargo check` again, you’ll get a few more warnings, but it should | |
531 | succeed. | |
532 | ||
533 | #### A `Worker` Struct Responsible for Sending Code from the `ThreadPool` to a Thread | |
534 | ||
535 | We left a comment in the `for` loop in Listing 20-14 regarding the creation of | |
536 | threads. Here, we’ll look at how we actually create threads. The standard | |
537 | library provides `thread::spawn` as a way to create threads, and | |
538 | `thread::spawn` expects to get some code the thread should run as soon as the | |
539 | thread is created. However, in our case, we want to create the threads and have | |
540 | them *wait* for code that we’ll send later. The standard library’s | |
541 | implementation of threads doesn’t include any way to do that; we have to | |
542 | implement it manually. | |
543 | ||
544 | We’ll implement this behavior by introducing a new data structure between the | |
545 | `ThreadPool` and the threads that will manage this new behavior. We’ll call | |
546 | this data structure `Worker`, which is a common term in pooling | |
547 | implementations. Think of people working in the kitchen at a restaurant: the | |
548 | workers wait until orders come in from customers, and then they’re responsible | |
549 | for taking those orders and filling them. | |
550 | ||
551 | Instead of storing a vector of `JoinHandle<()>` instances in the thread pool, | |
552 | we’ll store instances of the `Worker` struct. Each `Worker` will store a single | |
553 | `JoinHandle<()>` instance. Then we’ll implement a method on `Worker` that will | |
554 | take a closure of code to run and send it to the already running thread for | |
555 | execution. We’ll also give each worker an `id` so we can distinguish between | |
556 | the different workers in the pool when logging or debugging. | |
557 | ||
558 | Let’s make the following changes to what happens when we create a `ThreadPool`. | |
559 | We’ll implement the code that sends the closure to the thread after we have | |
560 | `Worker` set up in this way: | |
561 | ||
562 | 1. Define a `Worker` struct that holds an `id` and a `JoinHandle<()>`. | |
563 | 2. Change `ThreadPool` to hold a vector of `Worker` instances. | |
564 | 3. Define a `Worker::new` function that takes an `id` number and returns a | |
565 | `Worker` instance that holds the `id` and a thread spawned with an empty | |
566 | closure. | |
567 | 4. In `ThreadPool::new`, use the `for` loop counter to generate an `id`, create | |
568 | a new `Worker` with that `id`, and store the worker in the vector. | |
569 | ||
570 | If you’re up for a challenge, try implementing these changes on your own before | |
571 | looking at the code in Listing 20-15. | |
572 | ||
573 | Ready? Here is Listing 20-15 with one way to make the preceding modifications. | |
574 | ||
575 | <span class="filename">Filename: src/lib.rs</span> | |
576 | ||
577 | ```rust | |
578 | use std::thread; | |
579 | ||
580 | pub struct ThreadPool { | |
581 | workers: Vec<Worker>, | |
582 | } | |
583 | ||
584 | impl ThreadPool { | |
585 | // --snip-- | |
586 | pub fn new(size: usize) -> ThreadPool { | |
587 | assert!(size > 0); | |
588 | ||
589 | let mut workers = Vec::with_capacity(size); | |
590 | ||
591 | for id in 0..size { | |
592 | workers.push(Worker::new(id)); | |
593 | } | |
594 | ||
595 | ThreadPool { | |
596 | workers | |
597 | } | |
598 | } | |
599 | // --snip-- | |
600 | } | |
601 | ||
602 | struct Worker { | |
603 | id: usize, | |
604 | thread: thread::JoinHandle<()>, | |
605 | } | |
606 | ||
607 | impl Worker { | |
608 | fn new(id: usize) -> Worker { | |
609 | let thread = thread::spawn(|| {}); | |
610 | ||
611 | Worker { | |
612 | id, | |
613 | thread, | |
614 | } | |
615 | } | |
616 | } | |
617 | ``` | |
618 | ||
619 | <span class="caption">Listing 20-15: Modifying `ThreadPool` to hold `Worker` | |
620 | instances instead of holding threads directly</span> | |
621 | ||
622 | We’ve changed the name of the field on `ThreadPool` from `threads` to `workers` | |
623 | because it’s now holding `Worker` instances instead of `JoinHandle<()>` | |
624 | instances. We use the counter in the `for` loop as an argument to | |
625 | `Worker::new`, and we store each new `Worker` in the vector named `workers`. | |
626 | ||
627 | External code (like our server in *src/bin/main.rs*) doesn’t need to know the | |
628 | implementation details regarding using a `Worker` struct within `ThreadPool`, | |
629 | so we make the `Worker` struct and its `new` function private. The | |
630 | `Worker::new` function uses the `id` we give it and stores a `JoinHandle<()>` | |
631 | instance that is created by spawning a new thread using an empty closure. | |
632 | ||
633 | This code will compile and will store the number of `Worker` instances we | |
634 | specified as an argument to `ThreadPool::new`. But we’re *still* not processing | |
635 | the closure that we get in `execute`. Let’s look at how to do that next. | |
636 | ||
637 | #### Sending Requests to Threads via Channels | |
638 | ||
639 | Now we’ll tackle the problem that the closures given to `thread::spawn` do | |
640 | absolutely nothing. Currently, we get the closure we want to execute in the | |
641 | `execute` method. But we need to give `thread::spawn` a closure to run when we | |
642 | create each `Worker` during the creation of the `ThreadPool`. | |
643 | ||
644 | We want the `Worker` structs that we just created to fetch code to run from a | |
645 | queue held in the `ThreadPool` and send that code to its thread to run. | |
646 | ||
647 | In Chapter 16, you learned about *channels*—a simple way to communicate between | |
648 | two threads—that would be perfect for this use case. We’ll use a channel to | |
649 | function as the queue of jobs, and `execute` will send a job from the | |
650 | `ThreadPool` to the `Worker` instances, which will send the job to its thread. | |
651 | Here is the plan: | |
652 | ||
653 | 1. The `ThreadPool` will create a channel and hold on to the sending side of | |
654 | the channel. | |
655 | 2. Each `Worker` will hold on to the receiving side of the channel. | |
656 | 3. We’ll create a new `Job` struct that will hold the closures we want to send | |
657 | down the channel. | |
658 | 4. The `execute` method will send the job it wants to execute down the sending | |
659 | side of the channel. | |
660 | 5. In its thread, the `Worker` will loop over its receiving side of the channel | |
661 | and execute the closures of any jobs it receives. | |
662 | ||
663 | Let’s start by creating a channel in `ThreadPool::new` and holding the sending | |
664 | side in the `ThreadPool` instance, as shown in Listing 20-16. The `Job` struct | |
665 | doesn’t hold anything for now but will be the type of item we’re sending down | |
666 | the channel. | |
667 | ||
668 | <span class="filename">Filename: src/lib.rs</span> | |
669 | ||
670 | ```rust | |
671 | # use std::thread; | |
672 | // --snip-- | |
673 | use std::sync::mpsc; | |
674 | ||
675 | pub struct ThreadPool { | |
676 | workers: Vec<Worker>, | |
677 | sender: mpsc::Sender<Job>, | |
678 | } | |
679 | ||
680 | struct Job; | |
681 | ||
682 | impl ThreadPool { | |
683 | // --snip-- | |
684 | pub fn new(size: usize) -> ThreadPool { | |
685 | assert!(size > 0); | |
686 | ||
687 | let (sender, receiver) = mpsc::channel(); | |
688 | ||
689 | let mut workers = Vec::with_capacity(size); | |
690 | ||
691 | for id in 0..size { | |
692 | workers.push(Worker::new(id)); | |
693 | } | |
694 | ||
695 | ThreadPool { | |
696 | workers, | |
697 | sender, | |
698 | } | |
699 | } | |
700 | // --snip-- | |
701 | } | |
702 | # | |
703 | # struct Worker { | |
704 | # id: usize, | |
705 | # thread: thread::JoinHandle<()>, | |
706 | # } | |
707 | # | |
708 | # impl Worker { | |
709 | # fn new(id: usize) -> Worker { | |
710 | # let thread = thread::spawn(|| {}); | |
711 | # | |
712 | # Worker { | |
713 | # id, | |
714 | # thread, | |
715 | # } | |
716 | # } | |
717 | # } | |
718 | ``` | |
719 | ||
720 | <span class="caption">Listing 20-16: Modifying `ThreadPool` to store the | |
721 | sending end of a channel that sends `Job` instances</span> | |
722 | ||
723 | In `ThreadPool::new`, we create our new channel and have the pool hold the | |
724 | sending end. This will successfully compile, still with warnings. | |
725 | ||
726 | Let’s try passing a receiving end of the channel into each worker as the thread | |
727 | pool creates the channel. We know we want to use the receiving end in the | |
728 | thread that the workers spawn, so we’ll reference the `receiver` parameter in | |
729 | the closure. The code in Listing 20-17 won’t quite compile yet. | |
730 | ||
731 | <span class="filename">Filename: src/lib.rs</span> | |
732 | ||
733 | ```rust,ignore,does_not_compile | |
734 | impl ThreadPool { | |
735 | // --snip-- | |
736 | pub fn new(size: usize) -> ThreadPool { | |
737 | assert!(size > 0); | |
738 | ||
739 | let (sender, receiver) = mpsc::channel(); | |
740 | ||
741 | let mut workers = Vec::with_capacity(size); | |
742 | ||
743 | for id in 0..size { | |
744 | workers.push(Worker::new(id, receiver)); | |
745 | } | |
746 | ||
747 | ThreadPool { | |
748 | workers, | |
749 | sender, | |
750 | } | |
751 | } | |
752 | // --snip-- | |
753 | } | |
754 | ||
755 | // --snip-- | |
756 | ||
757 | impl Worker { | |
758 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker { | |
759 | let thread = thread::spawn(|| { | |
760 | receiver; | |
761 | }); | |
762 | ||
763 | Worker { | |
764 | id, | |
765 | thread, | |
766 | } | |
767 | } | |
768 | } | |
769 | ``` | |
770 | ||
771 | <span class="caption">Listing 20-17: Passing the receiving end of the channel | |
772 | to the workers</span> | |
773 | ||
774 | We’ve made some small and straightforward changes: we pass the receiving end of | |
775 | the channel into `Worker::new`, and then we use it inside the closure. | |
776 | ||
777 | When we try to check this code, we get this error: | |
778 | ||
779 | ```text | |
780 | $ cargo check | |
781 | Compiling hello v0.1.0 (file:///projects/hello) | |
782 | error[E0382]: use of moved value: `receiver` | |
783 | --> src/lib.rs:27:42 | |
784 | | | |
785 | 27 | workers.push(Worker::new(id, receiver)); | |
786 | | ^^^^^^^^ value moved here in | |
787 | previous iteration of loop | |
788 | | | |
789 | = note: move occurs because `receiver` has type | |
790 | `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait | |
791 | ``` | |
792 | ||
793 | The code is trying to pass `receiver` to multiple `Worker` instances. This | |
794 | won’t work, as you’ll recall from Chapter 16: the channel implementation that | |
795 | Rust provides is multiple *producer*, single *consumer*. This means we can’t | |
796 | just clone the consuming end of the channel to fix this code. Even if we could, | |
797 | that is not the technique we would want to use; instead, we want to distribute | |
798 | the jobs across threads by sharing the single `receiver` among all the workers. | |
799 | ||
800 | Additionally, taking a job off the channel queue involves mutating the | |
801 | `receiver`, so the threads need a safe way to share and modify `receiver`; | |
802 | otherwise, we might get race conditions (as covered in Chapter 16). | |
803 | ||
804 | Recall the thread-safe smart pointers discussed in Chapter 16: to share | |
805 | ownership across multiple threads and allow the threads to mutate the value, we | |
806 | need to use `Arc<Mutex<T>>`. The `Arc` type will let multiple workers own the | |
807 | receiver, and `Mutex` will ensure that only one worker gets a job from the | |
808 | receiver at a time. Listing 20-18 shows the changes we need to make. | |
809 | ||
810 | <span class="filename">Filename: src/lib.rs</span> | |
811 | ||
812 | ```rust | |
813 | # use std::thread; | |
814 | # use std::sync::mpsc; | |
815 | use std::sync::Arc; | |
816 | use std::sync::Mutex; | |
817 | // --snip-- | |
818 | ||
819 | # pub struct ThreadPool { | |
820 | # workers: Vec<Worker>, | |
821 | # sender: mpsc::Sender<Job>, | |
822 | # } | |
823 | # struct Job; | |
824 | # | |
825 | impl ThreadPool { | |
826 | // --snip-- | |
827 | pub fn new(size: usize) -> ThreadPool { | |
828 | assert!(size > 0); | |
829 | ||
830 | let (sender, receiver) = mpsc::channel(); | |
831 | ||
832 | let receiver = Arc::new(Mutex::new(receiver)); | |
833 | ||
834 | let mut workers = Vec::with_capacity(size); | |
835 | ||
836 | for id in 0..size { | |
837 | workers.push(Worker::new(id, Arc::clone(&receiver))); | |
838 | } | |
839 | ||
840 | ThreadPool { | |
841 | workers, | |
842 | sender, | |
843 | } | |
844 | } | |
845 | ||
846 | // --snip-- | |
847 | } | |
848 | ||
849 | # struct Worker { | |
850 | # id: usize, | |
851 | # thread: thread::JoinHandle<()>, | |
852 | # } | |
853 | # | |
854 | impl Worker { | |
855 | fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { | |
856 | // --snip-- | |
857 | # let thread = thread::spawn(|| { | |
858 | # receiver; | |
859 | # }); | |
860 | # | |
861 | # Worker { | |
862 | # id, | |
863 | # thread, | |
864 | # } | |
865 | } | |
866 | } | |
867 | ``` | |
868 | ||
869 | <span class="caption">Listing 20-18: Sharing the receiving end of the channel | |
870 | among the workers using `Arc` and `Mutex`</span> | |
871 | ||
872 | In `ThreadPool::new`, we put the receiving end of the channel in an `Arc` and a | |
873 | `Mutex`. For each new worker, we clone the `Arc` to bump the reference count so | |
874 | the workers can share ownership of the receiving end. | |
875 | ||
876 | With these changes, the code compiles! We’re getting there! | |
877 | ||
878 | #### Implementing the `execute` Method | |
879 | ||
880 | Let’s finally implement the `execute` method on `ThreadPool`. We’ll also change | |
881 | `Job` from a struct to a type alias for a trait object that holds the type of | |
9fa01778 XL |
882 | closure that `execute` receives. As discussed in the [“Creating Type Synonyms |
883 | with Type Aliases”][creating-type-synonyms-with-type-aliases]<!-- ignore --> | |
884 | section of Chapter 19, type aliases allow us to make long types shorter. Look | |
885 | at Listing 20-19. | |
13cf67c4 XL |
886 | |
887 | <span class="filename">Filename: src/lib.rs</span> | |
888 | ||
889 | ```rust | |
890 | // --snip-- | |
891 | # pub struct ThreadPool { | |
892 | # workers: Vec<Worker>, | |
893 | # sender: mpsc::Sender<Job>, | |
894 | # } | |
895 | # use std::sync::mpsc; | |
896 | # struct Worker {} | |
897 | ||
898 | type Job = Box<FnOnce() + Send + 'static>; | |
899 | ||
900 | impl ThreadPool { | |
901 | // --snip-- | |
902 | ||
903 | pub fn execute<F>(&self, f: F) | |
904 | where | |
905 | F: FnOnce() + Send + 'static | |
906 | { | |
907 | let job = Box::new(f); | |
908 | ||
909 | self.sender.send(job).unwrap(); | |
910 | } | |
911 | } | |
912 | ||
913 | // --snip-- | |
914 | ``` | |
915 | ||
916 | <span class="caption">Listing 20-19: Creating a `Job` type alias for a `Box` | |
917 | that holds each closure and then sending the job down the channel</span> | |
918 | ||
919 | After creating a new `Job` instance using the closure we get in `execute`, we | |
920 | send that job down the sending end of the channel. We’re calling `unwrap` on | |
921 | `send` for the case that sending fails. This might happen if, for example, we | |
922 | stop all our threads from executing, meaning the receiving end has stopped | |
923 | receiving new messages. At the moment, we can’t stop our threads from | |
924 | executing: our threads continue executing as long as the pool exists. The | |
925 | reason we use `unwrap` is that we know the failure case won’t happen, but the | |
926 | compiler doesn’t know that. | |
927 | ||
928 | But we’re not quite done yet! In the worker, our closure being passed to | |
929 | `thread::spawn` still only *references* the receiving end of the channel. | |
930 | Instead, we need the closure to loop forever, asking the receiving end of the | |
931 | channel for a job and running the job when it gets one. Let’s make the change | |
932 | shown in Listing 20-20 to `Worker::new`. | |
933 | ||
934 | <span class="filename">Filename: src/lib.rs</span> | |
935 | ||
936 | ```rust,ignore,does_not_compile | |
937 | // --snip-- | |
938 | ||
939 | impl Worker { | |
940 | fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { | |
941 | let thread = thread::spawn(move || { | |
942 | loop { | |
943 | let job = receiver.lock().unwrap().recv().unwrap(); | |
944 | ||
945 | println!("Worker {} got a job; executing.", id); | |
946 | ||
947 | (*job)(); | |
948 | } | |
949 | }); | |
950 | ||
951 | Worker { | |
952 | id, | |
953 | thread, | |
954 | } | |
955 | } | |
956 | } | |
957 | ``` | |
958 | ||
959 | <span class="caption">Listing 20-20: Receiving and executing the jobs in the | |
960 | worker’s thread</span> | |
961 | ||
962 | Here, we first call `lock` on the `receiver` to acquire the mutex, and then we | |
963 | call `unwrap` to panic on any errors. Acquiring a lock might fail if the mutex | |
964 | is in a *poisoned* state, which can happen if some other thread panicked while | |
965 | holding the lock rather than releasing the lock. In this situation, calling | |
966 | `unwrap` to have this thread panic is the correct action to take. Feel free to | |
967 | change this `unwrap` to an `expect` with an error message that is meaningful to | |
968 | you. | |
969 | ||
970 | If we get the lock on the mutex, we call `recv` to receive a `Job` from the | |
971 | channel. A final `unwrap` moves past any errors here as well, which might occur | |
972 | if the thread holding the sending side of the channel has shut down, similar to | |
973 | how the `send` method returns `Err` if the receiving side shuts down. | |
974 | ||
975 | The call to `recv` blocks, so if there is no job yet, the current thread will | |
976 | wait until a job becomes available. The `Mutex<T>` ensures that only one | |
977 | `Worker` thread at a time is trying to request a job. | |
978 | ||
979 | Theoretically, this code should compile. Unfortunately, the Rust compiler isn’t | |
980 | perfect yet, and we get this error: | |
981 | ||
982 | ```text | |
983 | error[E0161]: cannot move a value of type std::ops::FnOnce() + | |
984 | std::marker::Send: the size of std::ops::FnOnce() + std::marker::Send cannot be | |
985 | statically determined | |
986 | --> src/lib.rs:63:17 | |
987 | | | |
988 | 63 | (*job)(); | |
989 | | ^^^^^^ | |
990 | ``` | |
991 | ||
992 | This error is fairly cryptic because the problem is fairly cryptic. To call a | |
993 | `FnOnce` closure that is stored in a `Box<T>` (which is what our `Job` type | |
994 | alias is), the closure needs to move itself *out* of the `Box<T>` because the | |
995 | closure takes ownership of `self` when we call it. In general, Rust doesn’t | |
996 | allow us to move a value out of a `Box<T>` because Rust doesn’t know how big | |
997 | the value inside the `Box<T>` will be: recall in Chapter 15 that we used | |
998 | `Box<T>` precisely because we had something of an unknown size that we wanted | |
999 | to store in a `Box<T>` to get a value of a known size. | |
1000 | ||
1001 | As you saw in Listing 17-15, we can write methods that use the syntax `self: | |
1002 | Box<Self>`, which allows the method to take ownership of a `Self` value stored | |
1003 | in a `Box<T>`. That’s exactly what we want to do here, but unfortunately Rust | |
1004 | won’t let us: the part of Rust that implements behavior when a closure is | |
1005 | called isn’t implemented using `self: Box<Self>`. So Rust doesn’t yet | |
1006 | understand that it could use `self: Box<Self>` in this situation to take | |
1007 | ownership of the closure and move the closure out of the `Box<T>`. | |
1008 | ||
1009 | Rust is still a work in progress with places where the compiler could be | |
1010 | improved, but in the future, the code in Listing 20-20 should work just fine. | |
1011 | People just like you are working to fix this and other issues! After you’ve | |
1012 | finished this book, we would love for you to join in. | |
1013 | ||
1014 | But for now, let’s work around this problem using a handy trick. We can tell | |
1015 | Rust explicitly that in this case we can take ownership of the value inside the | |
1016 | `Box<T>` using `self: Box<Self>`; then, once we have ownership of the closure, | |
1017 | we can call it. This involves defining a new trait `FnBox` with the method | |
1018 | `call_box` that will use `self: Box<Self>` in its signature, defining `FnBox` | |
1019 | for any type that implements `FnOnce()`, changing our type alias to use the new | |
1020 | trait, and changing `Worker` to use the `call_box` method. These changes are | |
1021 | shown in Listing 20-21. | |
1022 | ||
1023 | <span class="filename">Filename: src/lib.rs</span> | |
1024 | ||
1025 | ```rust,ignore | |
1026 | trait FnBox { | |
1027 | fn call_box(self: Box<Self>); | |
1028 | } | |
1029 | ||
1030 | impl<F: FnOnce()> FnBox for F { | |
1031 | fn call_box(self: Box<F>) { | |
1032 | (*self)() | |
1033 | } | |
1034 | } | |
1035 | ||
532ac7d7 | 1036 | type Job = Box<dyn FnBox + Send + 'static>; |
13cf67c4 XL |
1037 | |
1038 | // --snip-- | |
1039 | ||
1040 | impl Worker { | |
1041 | fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { | |
1042 | let thread = thread::spawn(move || { | |
1043 | loop { | |
1044 | let job = receiver.lock().unwrap().recv().unwrap(); | |
1045 | ||
1046 | println!("Worker {} got a job; executing.", id); | |
1047 | ||
1048 | job.call_box(); | |
1049 | } | |
1050 | }); | |
1051 | ||
1052 | Worker { | |
1053 | id, | |
1054 | thread, | |
1055 | } | |
1056 | } | |
1057 | } | |
1058 | ``` | |
1059 | ||
1060 | <span class="caption">Listing 20-21: Adding a new trait `FnBox` to work around | |
1061 | the current limitations of `Box<FnOnce()>`</span> | |
1062 | ||
1063 | First, we create a new trait named `FnBox`. This trait has the one method | |
1064 | `call_box`, which is similar to the `call` methods on the other `Fn*` traits | |
1065 | except that it takes `self: Box<Self>` to take ownership of `self` and move the | |
1066 | value out of the `Box<T>`. | |
1067 | ||
1068 | Next, we implement the `FnBox` trait for any type `F` that implements the | |
1069 | `FnOnce()` trait. Effectively, this means that any `FnOnce()` closures can use | |
1070 | our `call_box` method. The implementation of `call_box` uses `(*self)()` to | |
1071 | move the closure out of the `Box<T>` and call the closure. | |
1072 | ||
1073 | We now need our `Job` type alias to be a `Box` of anything that implements our | |
1074 | new trait `FnBox`. This will allow us to use `call_box` in `Worker` when we get | |
1075 | a `Job` value instead of invoking the closure directly. Implementing the | |
1076 | `FnBox` trait for any `FnOnce()` closure means we don’t have to change anything | |
1077 | about the actual values we’re sending down the channel. Now Rust is able to | |
1078 | recognize that what we want to do is fine. | |
1079 | ||
1080 | This trick is very sneaky and complicated. Don’t worry if it doesn’t make | |
1081 | perfect sense; someday, it will be completely unnecessary. | |
1082 | ||
1083 | With the implementation of this trick, our thread pool is in a working state! | |
1084 | Give it a `cargo run` and make some requests: | |
1085 | ||
1086 | ```text | |
1087 | $ cargo run | |
1088 | Compiling hello v0.1.0 (file:///projects/hello) | |
1089 | warning: field is never used: `workers` | |
1090 | --> src/lib.rs:7:5 | |
1091 | | | |
1092 | 7 | workers: Vec<Worker>, | |
1093 | | ^^^^^^^^^^^^^^^^^^^^ | |
1094 | | | |
1095 | = note: #[warn(dead_code)] on by default | |
1096 | ||
1097 | warning: field is never used: `id` | |
1098 | --> src/lib.rs:61:5 | |
1099 | | | |
1100 | 61 | id: usize, | |
1101 | | ^^^^^^^^^ | |
1102 | | | |
1103 | = note: #[warn(dead_code)] on by default | |
1104 | ||
1105 | warning: field is never used: `thread` | |
1106 | --> src/lib.rs:62:5 | |
1107 | | | |
1108 | 62 | thread: thread::JoinHandle<()>, | |
1109 | | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | |
1110 | | | |
1111 | = note: #[warn(dead_code)] on by default | |
1112 | ||
1113 | Finished dev [unoptimized + debuginfo] target(s) in 0.99 secs | |
1114 | Running `target/debug/hello` | |
1115 | Worker 0 got a job; executing. | |
1116 | Worker 2 got a job; executing. | |
1117 | Worker 1 got a job; executing. | |
1118 | Worker 3 got a job; executing. | |
1119 | Worker 0 got a job; executing. | |
1120 | Worker 2 got a job; executing. | |
1121 | Worker 1 got a job; executing. | |
1122 | Worker 3 got a job; executing. | |
1123 | Worker 0 got a job; executing. | |
1124 | Worker 2 got a job; executing. | |
1125 | ``` | |
1126 | ||
1127 | Success! We now have a thread pool that executes connections asynchronously. | |
1128 | There are never more than four threads created, so our system won’t get | |
1129 | overloaded if the server receives a lot of requests. If we make a request to | |
1130 | */sleep*, the server will be able to serve other requests by having another | |
1131 | thread run them. | |
1132 | ||
9fa01778 XL |
1133 | > Note: if you open */sleep* in multiple browser windows simultaneously, they |
1134 | > might load one at a time in 5 second intervals. Some web browsers execute | |
1135 | > multiple instances of the same request sequentially for caching reasons. This | |
1136 | > limitation is not caused by our web server. | |
13cf67c4 XL |
1137 | |
1138 | After learning about the `while let` loop in Chapter 18, you might be wondering | |
1139 | why we didn’t write the worker thread code as shown in Listing 20-22. | |
1140 | ||
1141 | <span class="filename">Filename: src/lib.rs</span> | |
1142 | ||
1143 | ```rust,ignore,not_desired_behavior | |
1144 | // --snip-- | |
1145 | ||
1146 | impl Worker { | |
1147 | fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { | |
1148 | let thread = thread::spawn(move || { | |
1149 | while let Ok(job) = receiver.lock().unwrap().recv() { | |
1150 | println!("Worker {} got a job; executing.", id); | |
1151 | ||
1152 | job.call_box(); | |
1153 | } | |
1154 | }); | |
1155 | ||
1156 | Worker { | |
1157 | id, | |
1158 | thread, | |
1159 | } | |
1160 | } | |
1161 | } | |
1162 | ``` | |
1163 | ||
1164 | <span class="caption">Listing 20-22: An alternative implementation of | |
1165 | `Worker::new` using `while let`</span> | |
1166 | ||
1167 | This code compiles and runs but doesn’t result in the desired threading | |
1168 | behavior: a slow request will still cause other requests to wait to be | |
1169 | processed. The reason is somewhat subtle: the `Mutex` struct has no public | |
1170 | `unlock` method because the ownership of the lock is based on the lifetime of | |
1171 | the `MutexGuard<T>` within the `LockResult<MutexGuard<T>>` that the `lock` | |
1172 | method returns. At compile time, the borrow checker can then enforce the rule | |
1173 | that a resource guarded by a `Mutex` cannot be accessed unless we hold the | |
1174 | lock. But this implementation can also result in the lock being held longer | |
1175 | than intended if we don’t think carefully about the lifetime of the | |
1176 | `MutexGuard<T>`. Because the values in the `while` expression remain in scope | |
1177 | for the duration of the block, the lock remains held for the duration of the | |
1178 | call to `job.call_box()`, meaning other workers cannot receive jobs. | |
1179 | ||
1180 | By using `loop` instead and acquiring the lock and a job within the block | |
1181 | rather than outside it, the `MutexGuard` returned from the `lock` method is | |
1182 | dropped as soon as the `let job` statement ends. This ensures that the lock is | |
1183 | held during the call to `recv`, but it is released before the call to | |
1184 | `job.call_box()`, allowing multiple requests to be serviced concurrently. | |
9fa01778 XL |
1185 | |
1186 | [creating-type-synonyms-with-type-aliases]: | |
1187 | ch19-04-advanced-types.html#creating-type-synonyms-with-type-aliases | |
1188 | [integer-types]: ch03-02-data-types.html#integer-types | |
1189 | [storing-closures-using-generic-parameters-and-the-fn-traits]: | |
1190 | ch13-01-closures.html#storing-closures-using-generic-parameters-and-the-fn-traits |