Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | [/ |
2 | Copyright Oliver Kowalke, Nat Goodspeed 2015. | |
3 | Distributed under the Boost Software License, Version 1.0. | |
4 | (See accompanying file LICENSE_1_0.txt or copy at | |
5 | http://www.boost.org/LICENSE_1_0.txt | |
6 | ] | |
7 | ||
8 | [/ import path is relative to this .qbk file] | |
9 | [import ../examples/wait_stuff.cpp] | |
10 | ||
11 | [#when_any] | |
12 | [section:when_any when_any / when_all functionality] | |
13 | ||
14 | [heading Overview] | |
15 | ||
16 | A bit of wisdom from the early days of computing still holds true today: | |
17 | prefer to model program state using the instruction pointer rather than with | |
18 | Boolean flags. In other words, if the program must ["do something] and then | |
19 | do something almost the same, but with minor changes... perhaps parts of that | |
20 | something should be broken out as smaller separate functions, rather than | |
21 | introducing flags to alter the internal behavior of a monolithic function. | |
22 | ||
23 | To that we would add: prefer to describe control flow using C++ native | |
24 | constructs such as function calls, `if`, `while`, `for`, `do` et al. | |
25 | rather than as chains of callbacks. | |
26 | ||
27 | One of the great strengths of __boost_fiber__ is the flexibility it confers on | |
28 | the coder to restructure an application from chains of callbacks into a | |
29 | straightforward sequence of C++ statements, even when code in that fiber is | |
30 | in fact interleaved with code running in other fibers. | |
31 | ||
32 | There has been much recent discussion about the benefits of when_any and | |
33 | when_all functionality. When dealing with asynchronous and possibly unreliable | |
34 | services, these are valuable idioms. But of course when_any and when_all are | |
35 | closely tied to the use of chains of callbacks. | |
36 | ||
37 | This section presents recipes for achieving the same ends, in the context of a | |
38 | fiber that wants to ["do something] when one or more other independent | |
39 | activities have completed. Accordingly, these are `wait_something()` | |
40 | functions rather than `when_something()` functions. The expectation is that | |
41 | the calling fiber asks to launch those independent activities, then waits for | |
42 | them, then sequentially proceeds with whatever processing depends on those | |
43 | results. | |
44 | ||
45 | The function names shown (e.g. [link wait_first_simple `wait_first_simple()`]) | |
46 | are for illustrative purposes only, because all these functions have been | |
47 | bundled into a single source file. Presumably, if (say) [link | |
48 | wait_first_success `wait_first_success()`] best suits your application needs, | |
49 | you could introduce that variant with the name `wait_any()`. | |
50 | ||
51 | [note The functions presented in this section accept variadic argument lists | |
52 | of task functions. Corresponding `wait_something()` functions accepting a | |
53 | container of task functions are left as an exercise for the interested reader. | |
54 | Those should actually be simpler. Most of the complexity would arise from | |
55 | overloading the same name for both purposes.] | |
56 | ||
57 | [/ @path link is relative to (eventual) doc/html/index.html, hence ../..] | |
58 | All the source code for this section is found in | |
59 | [@../../examples/wait_stuff.cpp wait_stuff.cpp]. | |
60 | ||
61 | [heading Example Task Function] | |
62 | ||
63 | [#wait_sleeper] | |
64 | We found it convenient to model an asynchronous task using this function: | |
65 | ||
66 | [wait_sleeper] | |
67 | ||
68 | with type-specific `sleeper()` ["front ends] for `std::string`, `double` and | |
69 | `int`. | |
70 | ||
71 | `Verbose` simply prints a message to `std::cout` on construction and | |
72 | destruction. | |
73 | ||
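The definition of `Verbose` is not reproduced in this section. As a rough sketch only, a class with that behavior might look like the following. It is a hypothetical stand-in, not the code from `wait_stuff.cpp`: the extra `std::ostream&` parameter and the `verbose_demo()` helper are assumptions added here so that the output can be inspected.

```cpp
#include <iostream>
#include <sstream>
#include <string>
#include <utility>

// Hypothetical stand-in for the Verbose helper: announces entry to and exit
// from a scope. The output stream parameter is an assumption made for this
// sketch; the real helper is described as writing to std::cout.
class Verbose {
public:
    explicit Verbose(std::string desc, std::ostream& out = std::cout)
        : desc_(std::move(desc)), out_(out) {
        out_ << desc_ << " start\n";
    }
    ~Verbose() {
        out_ << desc_ << " stop\n";
    }
    Verbose(Verbose const&) = delete;
    Verbose& operator=(Verbose const&) = delete;

private:
    std::string desc_;
    std::ostream& out_;
};

// Returns the text produced while a Verbose object is alive in a scope.
std::string verbose_demo() {
    std::ostringstream log;
    {
        Verbose v("sleeper(demo)", log);
        log << "working\n";
    }   // v is destroyed here, which writes the stop message
    return log.str();
}
```

Because the stop message comes from the destructor, it is produced whether the enclosing function returns normally or exits by throwing.
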
74 | Basically: | |
75 | ||
76 | # `sleeper()` prints a start message; | |
77 | # sleeps for the specified number of milliseconds; | |
78 | # if `thrw` is passed as `true`, throws a string description of the passed | |
79 | `item`; | |
80 | # else returns the passed `item`. | |
81 | # On the way out, `sleeper()` produces a stop message. | |
82 | ||
83 | This function will feature in the example calls to the various functions | |
84 | presented below. | |
85 | ||
86 | [section when_any] | |
87 | [#wait_first_simple_section] | |
88 | [section when_any, simple completion] | |
89 | ||
90 | The simplest case is when you only need to know that the first of a set of | |
91 | asynchronous tasks has completed [mdash] but you don't need to obtain a return | |
92 | value, and you're confident that they will not throw exceptions. | |
93 | ||
94 | [#wait_done] | |
95 | For this we introduce a `Done` class to wrap a `bool` variable with a | |
96 | [class_link condition_variable] and a [class_link mutex]: | |
97 | ||
98 | [wait_done] | |
99 | ||
100 | The pattern we follow throughout this section is to pass each task's fiber | |
101 | function a [@http://www.cplusplus.com/reference/memory/shared_ptr/ `std::shared_ptr<>`] | |
102 | to the relevant synchronization object. | |
103 | This eliminates nagging questions about the lifespan of the synchronization | |
104 | object relative to the last of the fibers. | |
105 | ||
106 | [#wait_first_simple] | |
107 | `wait_first_simple()` uses that tactic for [link wait_done `Done`]: | |
108 | ||
109 | [wait_first_simple] | |
110 | ||
111 | [#wait_first_simple_impl] | |
112 | `wait_first_simple_impl()` is an ordinary recursion over the argument pack, | |
113 | capturing `Done::ptr` for each new fiber: | |
114 | ||
115 | [wait_first_simple_impl] | |
116 | ||
117 | The body of the fiber's lambda is extremely simple: call the | |
118 | function, notify [link wait_done `Done`] when it returns. The first fiber to | |
119 | do so allows `wait_first_simple()` to return [mdash] which is why it's useful | |
120 | to have `std::shared_ptr<Done>` manage the lifespan of our `Done` object | |
121 | rather than declaring it as a stack variable in `wait_first_simple()`. | |
122 | ||
123 | This is how you might call it: | |
124 | ||
125 | [wait_first_simple_ex] | |
126 | ||
127 | In this example, control resumes after `wait_first_simple()` when [link | |
128 | wait_sleeper `sleeper("wfs_short", 50)`] completes [mdash] even though the | |
129 | other two `sleeper()` fibers are still running. | |
130 | ||
131 | [endsect] | |
132 | [section when_any, return value] | |
133 | ||
134 | It seems more useful to add the ability to capture the return value from the | |
135 | first of the task functions to complete. Again, we assume that none will throw | |
136 | an exception. | |
137 | ||
138 | One tactic would be to adapt our [link wait_done `Done`] class to store the | |
139 | first of the return values, rather than a simple `bool`. However, we choose | |
140 | instead to use a [template_link unbounded_channel]. We'll only need to enqueue | |
141 | the first value, so we'll [member_link unbounded_channel..close] it once we've | |
142 | retrieved that value. Subsequent `push()` calls will return `closed`. | |
143 | ||
144 | [#wait_first_value] | |
145 | [wait_first_value] | |
146 | ||
147 | [#wait_first_value_impl] | |
148 | The meat of the `wait_first_value_impl()` function is as you might expect: | |
149 | ||
150 | [wait_first_value_impl] | |
151 | ||
152 | It calls the passed function, pushes its return value and ignores the `push()` | |
153 | result. You might call it like this: | |
154 | ||
155 | [wait_first_value_ex] | |
156 | ||
157 | [endsect] | |
158 | [section when_any, produce first outcome, whether result or exception] | |
159 | ||
160 | We may not be running in an environment in which we can guarantee no exception | |
161 | will be thrown by any of our task functions. In that case, the above | |
162 | implementations of `wait_first_something()` would be naïve: as mentioned in | |
163 | [link exceptions the section on Fiber Management], an uncaught exception in one | |
164 | of our task fibers would cause `std::terminate()` to be called. | |
165 | ||
166 | Let's at least ensure that such an exception would propagate to the fiber | |
167 | awaiting the first result. We can use [template_link future] to transport | |
168 | either a return value or an exception. Therefore, we will change [link | |
169 | wait_first_value `wait_first_value()`]'s [template_link unbounded_channel] to | |
170 | hold `future< T >` items instead of simply `T`. | |
171 | ||
172 | Once we have a `future<>` in hand, all we need do is call [member_link | |
173 | future..get], which will either return the value or rethrow the exception. | |
174 | ||
175 | [#wait_first_outcome] | |
176 | [wait_first_outcome] | |
177 | ||
178 | So far so good [mdash] but there's a timing issue. How should we obtain the | |
179 | `future<>` to [member_link unbounded_channel..push] on the channel? | |
180 | ||
181 | We could call [ns_function_link fibers..async]. That would certainly produce a | |
182 | `future<>` for the task function. The trouble is that it would return too | |
183 | quickly! We only want `future<>` items for ['completed] tasks on our | |
184 | `unbounded_channel<>`. In fact, we only want the `future<>` for the one that | |
185 | completes first. If each fiber launched by `wait_first_outcome()` were to | |
186 | `push()` the result of calling `async()`, the channel would only ever report | |
187 | the result of the leftmost task item [mdash] ['not] the one that completes most | |
188 | quickly. | |
189 | ||
190 | Calling [member_link future..get] on the future returned by `async()` wouldn't | |
191 | be right either. You can only call `get()` once per `future<>` instance, so the | |
192 | consumer could not call it again; and an exception would be rethrown inside the | |
193 | helper fiber at the producer end of the channel, not propagated to the consumer. | |
194 | ||
195 | We could call [member_link future..wait]. That would block the helper fiber | |
196 | until the `future<>` became ready, at which point we could `push()` it to be | |
197 | retrieved by `wait_first_outcome()`. | |
198 | ||
199 | That would work [mdash] but there's a simpler tactic that avoids creating an extra | |
200 | fiber. We can wrap the task function in a [template_link packaged_task]. While | |
201 | one naturally thinks of passing a `packaged_task<>` to a new fiber [mdash] that is, | |
202 | in fact, what `async()` does [mdash] in this case, we're already running in the | |
203 | helper fiber at the producer end of the channel! We can simply ['call] the | |
204 | `packaged_task<>`. On return from that call, the task function has completed, | |
205 | meaning that the `future<>` obtained from the `packaged_task<>` is certain to | |
206 | be ready. At that point we can simply `push()` it to the channel. | |
207 | ||
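The key property, that invoking a `packaged_task<>` directly leaves its `future<>` ready on return, can be illustrated without any fibers at all. The sketch below uses `std::packaged_task` and `std::future` from the standard library as stand-ins for their __boost_fiber__ counterparts. The names `run_packaged()`, `is_ready()`, `outcome_of_success()` and `outcome_of_failure()` are hypothetical, introduced only for illustration.

```cpp
#include <chrono>
#include <future>
#include <stdexcept>
#include <string>
#include <utility>

// Runs fn in the calling thread through a std::packaged_task and returns the
// resulting future. The task has already run by the time we return, so the
// future is ready and holds either the value or the exception.
template <typename Fn>
auto run_packaged(Fn fn) -> std::future<decltype(fn())> {
    using T = decltype(fn());
    std::packaged_task<T()> task(std::move(fn));
    std::future<T> fut = task.get_future();
    task();   // direct call: no new thread or fiber is launched
    return fut;
}

// True if the future can be read without blocking.
template <typename T>
bool is_ready(std::future<T> const& f) {
    return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}

std::string outcome_of_success() {
    auto fut = run_packaged([] { return std::string("ok"); });
    return is_ready(fut) ? fut.get() : std::string("not ready");
}

std::string outcome_of_failure() {
    // run_packaged() itself does not throw: the exception is stored in the
    // future and is rethrown only when get() is called.
    auto fut = run_packaged([]() -> std::string {
        throw std::runtime_error("boom");
    });
    try {
        return fut.get();
    } catch (std::runtime_error const& e) {
        return std::string("caught: ") + e.what();
    }
}
```

In the failure case the exception surfaces only at `get()`, which is the behavior wanted at the consumer end of the channel.
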
208 | [#wait_first_outcome_impl] | |
209 | [wait_first_outcome_impl] | |
210 | ||
211 | Calling it might look like this: | |
212 | ||
213 | [wait_first_outcome_ex] | |
214 | ||
215 | [endsect] | |
216 | [section when_any, produce first success] | |
217 | ||
218 | One scenario for ["when_any] functionality is when we're redundantly contacting | |
219 | some number of possibly-unreliable web services. Not only might they be slow | |
220 | [mdash] any one of them might produce a failure rather than the desired | |
221 | result. | |
222 | ||
223 | In such a case, [link wait_first_outcome `wait_first_outcome()`] isn't the | |
224 | right approach. If one of the services produces an error quickly, while | |
225 | another follows up with a real answer, we don't want to prefer the error just | |
226 | because it arrived first! | |
227 | ||
228 | Given the `unbounded_channel< future< T > >` we already constructed for | |
229 | `wait_first_outcome()`, though, we can readily recast the interface function | |
230 | to deliver the first ['successful] result. | |
231 | ||
232 | That does raise the question: what if ['all] the task functions throw an | |
233 | exception? In that case we had better know about it. | |
234 | ||
235 | [#exception_list] | |
236 | The | |
237 | [@http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/n4407.html#parallel.exceptions.synopsis | |
238 | C++ Parallelism Draft Technical Specification] proposes a | |
239 | `std::exception_list` exception capable of delivering a collection of | |
240 | `std::exception_ptr`s. Until that becomes universally available, let's fake up | |
241 | an `exception_list` of our own: | |
242 | ||
243 | [exception_list] | |
244 | ||
245 | Now we can build `wait_first_success()`, using [link wait_first_outcome_impl | |
246 | `wait_first_outcome_impl()`]. | |
247 | ||
248 | Instead of retrieving only the first `future<>` from the channel, we must now | |
249 | loop over `future<>` items. Of course we must limit that iteration! If we | |
250 | launch only `count` producer fibers, the `(count+1)`[superscript st] | |
251 | [member_link unbounded_channel..pop] call would block forever. | |
252 | ||
253 | Given a ready `future<>`, we can distinguish failure by calling [member_link | |
254 | future..get_exception_ptr]. If the `future<>` in fact contains a result rather | |
255 | than an exception, `get_exception_ptr()` returns `nullptr`. In that case, we | |
256 | can confidently call [member_link future..get] to return that result to our | |
257 | caller. | |
258 | ||
259 | If the `std::exception_ptr` is ['not] `nullptr`, though, we collect it into | |
260 | our pending `exception_list` and loop back for the next `future<>` from the | |
261 | channel. | |
262 | ||
263 | If we fall out of the loop [mdash] if every single task fiber threw an | |
264 | exception [mdash] we throw the `exception_list` exception into which we've | |
265 | been collecting those `std::exception_ptr`s. | |
266 | ||
267 | [#wait_first_success] | |
268 | [wait_first_success] | |
269 | ||
270 | A call might look like this: | |
271 | ||
272 | [wait_first_success_ex] | |
273 | ||
274 | [endsect] | |
275 | [section when_any, heterogeneous types] | |
276 | ||
277 | We would be remiss to ignore the case in which the various task functions have | |
278 | distinct return types. That means that the value returned by the first of them | |
279 | might have any one of those types. We can express that with | |
280 | [@http://www.boost.org/doc/libs/release/doc/html/variant.html Boost.Variant]. | |
281 | ||
282 | To keep the example simple, we'll revert to pretending that none of them can | |
283 | throw an exception. That makes `wait_first_value_het()` strongly resemble | |
284 | [link wait_first_value `wait_first_value()`]. We can actually reuse [link | |
285 | wait_first_value_impl `wait_first_value_impl()`], merely passing | |
286 | `boost::variant<T0, T1, ...>` as the channel's value type rather than the | |
287 | common `T`! | |
288 | ||
289 | Naturally this could be extended to use [link wait_first_success | |
290 | `wait_first_success()`] semantics instead. | |
291 | ||
292 | [wait_first_value_het] | |
293 | ||
294 | It might be called like this: | |
295 | ||
296 | [wait_first_value_het_ex] | |
297 | ||
298 | [endsect] | |
299 | [section when_any, a dubious alternative] | |
300 | ||
301 | Certain topics in C++ can arouse strong passions, and exceptions are no | |
302 | exception. We cannot resist mentioning [mdash] for purely informational | |
303 | purposes [mdash] that when you need only the ['first] result from some number | |
304 | of concurrently-running fibers, it would be possible to pass a | |
305 | [^shared_ptr<[template_link promise]>] to the participating fibers, then cause | |
306 | the initiating fiber to call [member_link future..get] on its [template_link | |
307 | future]. The first fiber to call [member_link promise..set_value] on that | |
308 | shared `promise` will succeed; subsequent `set_value()` calls on the same | |
309 | `promise` instance will throw `future_error`. | |
310 | ||
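For the curious, the shape of that alternative can be sketched with the standard library, using `std::promise`, `std::future` and `std::thread` as stand-ins for the __boost_fiber__ types and for fibers. The names `FirstResult` and `first_of()` are hypothetical, and the sketch assumes a non-empty list of candidate values.

```cpp
#include <atomic>
#include <future>
#include <memory>
#include <string>
#include <thread>
#include <vector>

struct FirstResult {
    std::string value;   // value delivered by the winning task
    int rejected;        // number of set_value() calls that threw
};

// Every task tries to fulfil the same shared promise. Only the first
// set_value() succeeds; later attempts throw std::future_error, which each
// task catches and counts. Assumes candidates is non-empty, since otherwise
// get() would never return.
FirstResult first_of(std::vector<std::string> const& candidates) {
    auto promise = std::make_shared<std::promise<std::string>>();
    std::future<std::string> fut = promise->get_future();
    std::atomic<int> rejected{0};

    std::vector<std::thread> workers;
    for (std::string const& c : candidates) {
        workers.emplace_back([promise, c, &rejected] {
            try {
                promise->set_value(c);
            } catch (std::future_error const&) {
                ++rejected;   // another task got there first
            }
        });
    }

    std::string value = fut.get();   // unblocks once the first value is set
    for (std::thread& t : workers) {
        t.join();                    // joined here only to keep the sketch tidy
    }
    return FirstResult{value, rejected.load()};
}
```

Which candidate wins depends on scheduling. With `n` candidates, exactly `n - 1` calls are rejected, and each of those costs a thrown exception, which is part of what makes the tactic dubious.
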
311 | Use this information at your own discretion. Beware the dark side. | |
312 | ||
313 | [endsect] | |
314 | [endsect][/ when_any] | |
315 | ||
316 | [section when_all functionality] | |
317 | [section when_all, simple completion] | |
318 | ||
319 | For the case in which we must wait for ['all] task functions to complete | |
320 | [mdash] but we don't need results (or expect exceptions) from any of them | |
321 | [mdash] we can write `wait_all_simple()` that looks remarkably like [link | |
322 | wait_first_simple `wait_first_simple()`]. The difference is that instead of | |
323 | our [link wait_done `Done`] class, we instantiate a [class_link barrier] and | |
324 | call its [member_link barrier..wait]. | |
325 | ||
326 | We initialize the `barrier` with `(count+1)` because we are launching `count` | |
327 | fibers, plus the `wait()` call within `wait_all_simple()` itself. | |
328 | ||
329 | [wait_all_simple] | |
330 | ||
331 | As stated above, the only difference between `wait_all_simple_impl()` and | |
332 | [link wait_first_simple_impl `wait_first_simple_impl()`] is that the former | |
333 | calls `barrier::wait()` rather than `Done::notify()`: | |
334 | ||
335 | [wait_all_simple_impl] | |
336 | ||
337 | You might call it like this: | |
338 | ||
339 | [wait_all_simple_ex] | |
340 | ||
341 | Control will not return from the `wait_all_simple()` call until the last of | |
342 | its task functions has completed. | |
343 | ||
344 | [endsect] | |
345 | [section when_all, return values] | |
346 | ||
347 | As soon as we want to collect return values from all the task functions, we | |
348 | can see right away how to reuse [link wait_first_value `wait_first_value()`]'s | |
349 | `unbounded_channel<T>` for the purpose. All we have to do is avoid closing it after the | |
350 | first value! | |
351 | ||
352 | But in fact, collecting multiple values raises an interesting question: do we | |
353 | ['really] want to wait until the slowest of them has arrived? Wouldn't we | |
354 | rather process each result as soon as it becomes available? | |
355 | ||
356 | Fortunately we can present both APIs. Let's define `wait_all_values_source()` | |
357 | to return `shared_ptr<unbounded_channel<T>>`.[footnote We could have used | |
358 | either [template_link bounded_channel] or [template_link unbounded_channel]. | |
359 | We chose `unbounded_channel<>` on the assumption that its simpler semantics | |
360 | imply a cheaper implementation.] | |
361 | ||
362 | [#wait_all_values] | |
363 | Given `wait_all_values_source()`, it's straightforward to implement | |
364 | `wait_all_values()`: | |
365 | ||
366 | [wait_all_values] | |
367 | ||
368 | It might be called like this: | |
369 | ||
370 | [wait_all_values_ex] | |
371 | ||
372 | As you can see from the loop in `wait_all_values()`, instead of requiring its | |
373 | caller to count values, we define `wait_all_values_source()` to [member_link | |
374 | unbounded_channel..close] the channel when done. But how do we do that? Each | |
375 | producer fiber is independent. It has no idea whether it is the last one to | |
376 | [member_link unbounded_channel..push] a value. | |
377 | ||
378 | [#wait_nchannel] | |
379 | We can address that problem with a counting façade for the | |
380 | `unbounded_channel<>`. In fact, our façade need only support the producer end of | |
381 | the channel. | |
382 | ||
383 | [wait_nchannel] | |
384 | ||
385 | [#wait_all_values_source] | |
386 | Armed with `nchannel<>`, we can implement `wait_all_values_source()`. It | |
387 | starts just like [link wait_first_value `wait_first_value()`]. The difference | |
388 | is that we wrap the `unbounded_channel<T>` with an `nchannel<T>` to pass to | |
389 | the producer fibers. | |
390 | ||
391 | Then, of course, instead of popping the first value, closing the channel and | |
392 | returning it, we simply return the `shared_ptr<unbounded_channel<T>>`. | |
393 | ||
394 | [wait_all_values_source] | |
395 | ||
396 | For example: | |
397 | ||
398 | [wait_all_values_source_ex] | |
399 | ||
400 | [#wait_all_values_impl] | |
401 | `wait_all_values_impl()` really is just like [link wait_first_value_impl | |
402 | `wait_first_value_impl()`] except for the use of `nchannel<T>` rather than | |
403 | `unbounded_channel<T>`: | |
404 | ||
405 | [wait_all_values_impl] | |
406 | ||
407 | [endsect] | |
408 | [section when_all until first exception] | |
409 | ||
410 | Naturally, just as with [link wait_first_outcome `wait_first_outcome()`], we | |
411 | can elaborate [link wait_all_values `wait_all_values()`] and [link | |
412 | wait_all_values_source `wait_all_values_source()`] by passing `future< T >` | |
413 | instead of plain `T`. | |
414 | ||
415 | [#wait_all_until_error] | |
416 | `wait_all_until_error()` pops that `future< T >` and calls its [member_link | |
417 | future..get]: | |
418 | ||
419 | [wait_all_until_error] | |
420 | ||
421 | For example: | |
422 | ||
423 | [wait_all_until_error_ex] | |
424 | ||
425 | [#wait_all_until_error_source] | |
426 | Naturally this complicates the API for `wait_all_until_error_source()`. The | |
427 | caller must both retrieve a `future< T >` and call its `get()` method. It would, | |
428 | of course, be possible to return a façade over the consumer end of the | |
429 | channel that would implicitly perform the `get()` and return a simple `T` (or | |
430 | throw). | |
431 | ||
432 | The implementation is just as you would expect. Notice, however, that we can | |
433 | reuse [link wait_first_outcome_impl `wait_first_outcome_impl()`], passing the | |
434 | `nchannel<T>` rather than `unbounded_channel<T>`. | |
435 | ||
436 | [wait_all_until_error_source] | |
437 | ||
438 | For example: | |
439 | ||
440 | [wait_all_until_error_source_ex] | |
441 | ||
442 | [endsect] | |
443 | [section wait_all, collecting all exceptions] | |
444 | ||
445 | [#wait_all_collect_errors] | |
446 | Given [link wait_all_until_error_source `wait_all_until_error_source()`], it | |
447 | might be more reasonable to make a `wait_all_...()` that collects ['all] | |
448 | errors instead of presenting only the first: | |
449 | ||
450 | [wait_all_collect_errors] | |
451 | ||
452 | The implementation is a simple variation on [link wait_first_success | |
453 | `wait_first_success()`], using the same [link exception_list `exception_list`] | |
454 | exception class. | |
455 | ||
456 | [endsect] | |
457 | [section when_all, heterogeneous types] | |
458 | ||
459 | But what about the case when we must wait for all results of different types? | |
460 | ||
461 | We can present an API that is frankly quite cool. Consider a sample struct: | |
462 | ||
463 | [wait_Data] | |
464 | ||
465 | Let's fill its members from task functions all running concurrently: | |
466 | ||
467 | [wait_all_members_data_ex] | |
468 | ||
469 | Note that for this case, we abandon the notion of capturing the earliest | |
470 | result first, and so on: we must fill exactly the passed struct in | |
471 | left-to-right order. | |
472 | ||
473 | That permits a beautifully simple implementation: | |
474 | ||
475 | [wait_all_members] | |
476 | ||
477 | [wait_all_members_get] | |
478 | ||
479 | It is tempting to try to implement `wait_all_members()` as a one-liner like | |
480 | this: | |
481 | ||
482 | return Result{ boost::fibers::async(functions).get()... }; | |
483 | ||
484 | The trouble with this tactic is that it would serialize all the task | |
485 | functions. The runtime makes a single pass through `functions`, calling | |
486 | [ns_function_link fibers..async] for each and then immediately calling | |
487 | [member_link future..get] on its returned `future<>`. That blocks the implicit | |
488 | loop. The above is almost equivalent to writing: | |
489 | ||
490 | return Result{ functions()... }; | |
491 | ||
492 | in which, of course, there is no concurrency at all. | |
493 | ||
494 | Passing the argument pack through a function-call boundary | |
495 | (`wait_all_members_get()`) forces the runtime to make ['two] passes: one in | |
496 | `wait_all_members()` to collect the `future<>`s from all the `async()` calls, | |
497 | the second in `wait_all_members_get()` to fetch each of the results. | |
498 | ||
499 | As noted in comments, within the `wait_all_members_get()` parameter pack | |
500 | expansion pass, the blocking behavior of `get()` becomes irrelevant. Along the | |
501 | way, we will hit the `get()` for the slowest task function; after that every | |
502 | subsequent `get()` will complete in trivial time. | |
503 | ||
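The two-pass structure can be tried out with the standard library alone. The sketch below substitutes `std::async` with `std::launch::async` for [ns_function_link fibers..async], so the tasks run on threads rather than fibers. The `Data` struct shown here, along with `collect_members()`, `wait_all_members_std()` and `demo()`, are hypothetical names used only for illustration.

```cpp
#include <future>
#include <string>
#include <utility>

struct Data {   // hypothetical aggregate, for illustration only
    std::string str;
    double inexact;
    int exact;
};

// Second pass: every task has already been launched, so blocking in get()
// on one future does not delay the start of any other task. The braced
// initializer list is evaluated left to right.
template <typename Result, typename... Futures>
Result collect_members(Futures&&... futures) {
    return Result{ futures.get()... };
}

// First pass: launch all the tasks. All the std::async() calls complete
// before collect_members() is entered, because function arguments are
// evaluated before the call.
template <typename Result, typename... Fns>
Result wait_all_members_std(Fns&&... functions) {
    return collect_members<Result>(
        std::async(std::launch::async, std::forward<Fns>(functions))...);
}

Data demo() {
    return wait_all_members_std<Data>(
        [] { return std::string("text"); },
        [] { return 3.14; },
        [] { return 17; });
}
```

The members are filled in declaration order regardless of which task finishes first, matching the left-to-right behavior described above.
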
504 | By the way, we could also use this same API to fill a vector or other | |
505 | collection: | |
506 | ||
507 | [wait_all_members_vector_ex] | |
508 | ||
509 | [endsect] | |
510 | [endsect][/ when_all] | |
511 | ||
512 | [endsect][/ outermost] |