[/
  Copyright Oliver Kowalke, Nat Goodspeed 2015.
  Distributed under the Boost Software License, Version 1.0.
  (See accompanying file LICENSE_1_0.txt or copy at
  http://www.boost.org/LICENSE_1_0.txt)
]

[/ import path is relative to this .qbk file]
[import ../examples/wait_stuff.cpp]

[#when_any]
[section:when_any when_any / when_all functionality]

[heading Overview]

A bit of wisdom from the early days of computing still holds true today:
prefer to model program state using the instruction pointer rather than with
Boolean flags. In other words, if the program must ["do something] and then
do something almost the same, but with minor changes... perhaps parts of that
something should be broken out as smaller separate functions, rather than
introducing flags to alter the internal behavior of a monolithic function.

To that we would add: prefer to describe control flow using C++ native
constructs such as function calls, `if`, `while`, `for`, `do` et al.
rather than as chains of callbacks.

One of the great strengths of __boost_fiber__ is the flexibility it confers on
the coder to restructure an application from chains of callbacks to a
straightforward sequence of C++ statements, even when code in that fiber is
in fact interleaved with code running in other fibers.

There has been much recent discussion about the benefits of when_any and
when_all functionality. When dealing with asynchronous and possibly unreliable
services, these are valuable idioms. But of course when_any and when_all are
closely tied to the use of chains of callbacks.

This section presents recipes for achieving the same ends, in the context of a
fiber that wants to ["do something] when one or more other independent
activities have completed. Accordingly, these are `wait_something()`
functions rather than `when_something()` functions. The expectation is that
the calling fiber asks to launch those independent activities, then waits for
them, then sequentially proceeds with whatever processing depends on those
results.

The function names shown (e.g. [link wait_first_simple `wait_first_simple()`])
are for illustrative purposes only, because all these functions have been
bundled into a single source file. Presumably, if (say) [link
wait_first_success `wait_first_success()`] best suits your application needs,
you could introduce that variant with the name `wait_any()`.

[note The functions presented in this section accept variadic argument lists
of task functions. Corresponding `wait_something()` functions accepting a
container of task functions are left as an exercise for the interested reader.
Those should actually be simpler. Most of the complexity would arise from
overloading the same name for both purposes.]

[/ @path link is relative to (eventual) doc/html/index.html, hence ../..]
All the source code for this section is found in
[@../../examples/wait_stuff.cpp wait_stuff.cpp].

[heading Example Task Function]

[#wait_sleeper]
We found it convenient to model an asynchronous task using this function:

[wait_sleeper]

with type-specific `sleeper()` ["front ends] for `std::string`, `double` and
`int`.
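
For example, the `std::string` front end might look like the following sketch
[mdash] hypothetical code, assuming the common implementation shown in the
snippet above is factored into a `sleeper_impl()` template:

    inline std::string sleeper( std::string const& item, int ms, bool thrw = false) {
        // forward to the (hypothetical) common sleeper_impl() template,
        // fixing the value type to std::string
        return sleeper_impl( item, ms, thrw);
    }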

`Verbose` simply prints a message to `std::cout` on construction and
destruction.
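
A minimal sketch of such a helper might look like this [mdash] hypothetical
code, assuming only `<iostream>` and `<string>`; the real class lives in
[@../../examples/wait_stuff.cpp wait_stuff.cpp]:

    class Verbose {
    public:
        explicit Verbose( std::string d) :
            desc( std::move( d) ) {
            // announce construction
            std::cout << desc << " start" << std::endl;
        }

        ~Verbose() {
            // announce destruction
            std::cout << desc << " stop" << std::endl;
        }

    private:
        const std::string desc;
    };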

Basically:

# `sleeper()` prints a start message;
# sleeps for the specified number of milliseconds;
# if `thrw` is passed as `true`, throws a string description of the passed
  `item`;
# else returns the passed `item`;
# on the way out, `sleeper()` produces a stop message.

This function will feature in the example calls to the various functions
presented below.

[section when_any]
[#wait_first_simple_section]
[section when_any, simple completion]

The simplest case is when you only need to know that the first of a set of
asynchronous tasks has completed [mdash] but you don't need to obtain a return
value, and you're confident that they will not throw exceptions.

[#wait_done]
For this we introduce a `Done` class to wrap a `bool` variable with a
[class_link condition_variable] and a [class_link mutex]:

[wait_done]

The pattern we follow throughout this section is to hand each task's fiber
function a
[@http://www.cplusplus.com/reference/memory/shared_ptr/ `std::shared_ptr<>`]
to the relevant synchronization object.
This eliminates nagging questions about the lifespan of the synchronization
object relative to the last of the fibers.

[#wait_first_simple]
`wait_first_simple()` uses that tactic for [link wait_done `Done`]:

[wait_first_simple]

[#wait_first_simple_impl]
`wait_first_simple_impl()` is an ordinary recursion over the argument pack,
capturing `Done::ptr` for each new fiber:

[wait_first_simple_impl]

The body of the fiber's lambda is extremely simple, as promised: call the
function, notify [link wait_done `Done`] when it returns. The first fiber to
do so allows `wait_first_simple()` to return [mdash] which is why it's useful
to have `std::shared_ptr<Done>` manage the lifespan of our `Done` object
rather than declaring it as a stack variable in `wait_first_simple()`.

This is how you might call it:

[wait_first_simple_ex]

In this example, control resumes after `wait_first_simple()` when [link
wait_sleeper `sleeper("wfs_short", 50)`] completes [mdash] even though the
other two `sleeper()` fibers are still running.

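As an aside, the container-accepting variant mentioned in the note in the
Overview might look like the following sketch [mdash] hypothetical code that
reuses [link wait_done `Done`] and sidesteps the overloading question by
using a distinct name:

    // hypothetical container-accepting counterpart of wait_first_simple()
    template< typename Fn >
    void wait_first_simple_container( std::vector< Fn > const& functions) {
        // shared_ptr, as before, so the last fiber keeps Done alive
        auto done( std::make_shared< Done >() );
        for ( Fn const& function : functions) {
            boost::fibers::fiber( [done, function](){
                function();         // run this task
                done->notify();     // wake the waiter if we're first
            }).detach();
        }
        done->wait();               // return once any task completes
    }
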
[endsect]
[section when_any, return value]

It seems more useful to add the ability to capture the return value from the
first of the task functions to complete. Again, we assume that none will throw
an exception.

One tactic would be to adapt our [link wait_done `Done`] class to store the
first of the return values, rather than a simple `bool`. However, we choose
instead to use a [template_link unbounded_channel]. We'll only need to enqueue
the first value, so we'll [member_link unbounded_channel..close] it once we've
retrieved that value. Subsequent `push()` calls will return `closed`.
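
Those close semantics look roughly like this [mdash] a minimal standalone
sketch of the channel operations we rely on, not code from wait_stuff.cpp:

    #include <boost/fiber/all.hpp>
    #include <cassert>

    int main() {
        boost::fibers::unbounded_channel< int > chan;
        chan.push( 1);                      // the first (and only) value we care about
        int first( chan.value_pop() );      // retrieve it: first == 1
        chan.close();                       // we have what we came for
        // any subsequent push() is simply turned away
        boost::fibers::channel_op_status rc( chan.push( 2) );
        assert( boost::fibers::channel_op_status::closed == rc);
        return 0;
    }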

[#wait_first_value]
[wait_first_value]

[#wait_first_value_impl]
The meat of the `wait_first_value_impl()` function is as you might expect:

[wait_first_value_impl]

It calls the passed function, pushes its return value and ignores the `push()`
result. You might call it like this:

[wait_first_value_ex]

[endsect]
[section when_any, produce first outcome, whether result or exception]

We may not be running in an environment in which we can guarantee no exception
will be thrown by any of our task functions. In that case, the above
implementations of `wait_first_something()` would be naïve: as mentioned in
[link exceptions the section on Fiber Management], an uncaught exception in one
of our task fibers would cause `std::terminate()` to be called.

Let's at least ensure that such an exception would propagate to the fiber
awaiting the first result. We can use [template_link future] to transport
either a return value or an exception. Therefore, we will change [link
wait_first_value `wait_first_value()`]'s [template_link unbounded_channel] to
hold `future< T >` items instead of simply `T`.

Once we have a `future<>` in hand, all we need do is call [member_link
future..get], which will either return the value or rethrow the exception.

[#wait_first_outcome]
[wait_first_outcome]

So far so good [mdash] but there's a timing issue. How should we obtain the
`future<>` to [member_link unbounded_channel..push] on the channel?

We could call [ns_function_link fibers..async]. That would certainly produce a
`future<>` for the task function. The trouble is that it would return too
quickly! We only want `future<>` items for ['completed] tasks on our
`unbounded_channel<>`. In fact, we only want the `future<>` for the one that
completes first. If each fiber launched by `wait_first_outcome()` were to
`push()` the result of calling `async()`, the channel would only ever report
the result of the leftmost task item [mdash] ['not] the one that completes most
quickly.

Calling [member_link future..get] on the future returned by `async()` wouldn't
be right. You can only call `get()` once per `future<>` instance! And if there
were an exception, it would be rethrown inside the helper fiber at the
producer end of the channel, rather than propagated to the consumer end.

We could call [member_link future..wait]. That would block the helper fiber
until the `future<>` became ready, at which point we could `push()` it to be
retrieved by `wait_first_outcome()`.

That would work [mdash] but there's a simpler tactic that avoids creating an extra
fiber. We can wrap the task function in a [template_link packaged_task]. While
one naturally thinks of passing a `packaged_task<>` to a new fiber [mdash] that is,
in fact, what `async()` does [mdash] in this case, we're already running in the
helper fiber at the producer end of the channel! We can simply ['call] the
`packaged_task<>`. On return from that call, the task function has completed,
meaning that the `future<>` obtained from the `packaged_task<>` is certain to
be ready. At that point we can simply `push()` it to the channel.

[#wait_first_outcome_impl]
[wait_first_outcome_impl]

Calling it might look like this:

[wait_first_outcome_ex]

[endsect]
[section when_any, produce first success]

One scenario for ["when_any] functionality is when we're redundantly contacting
some number of possibly-unreliable web services. Not only might they be slow
[mdash] any one of them might produce a failure rather than the desired
result.

In such a case, [link wait_first_outcome `wait_first_outcome()`] isn't the
right approach. If one of the services produces an error quickly, while
another follows up with a real answer, we don't want to prefer the error just
because it arrived first!

Given the `unbounded_channel< future< T > >` we already constructed for
`wait_first_outcome()`, though, we can readily recast the interface function
to deliver the first ['successful] result.

That raises the question: what if ['all] the task functions throw an
exception? In that case we'd probably better know about it.

[#exception_list]
The
[@http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/n4407.html#parallel.exceptions.synopsis
C++ Parallelism Draft Technical Specification] proposes a
`std::exception_list` exception capable of delivering a collection of
`std::exception_ptr`s. Until that becomes universally available, let's fake up
an `exception_list` of our own:

[exception_list]

Now we can build `wait_first_success()`, using [link wait_first_outcome_impl
`wait_first_outcome_impl()`].

Instead of retrieving only the first `future<>` from the channel, we must now
loop over `future<>` items. Of course we must limit that iteration! If we
launch only `count` producer fibers, the `(count+1)`[superscript st]
[member_link unbounded_channel..pop] call would block forever.

Given a ready `future<>`, we can distinguish failure by calling [member_link
future..get_exception_ptr]. If the `future<>` in fact contains a result rather
than an exception, `get_exception_ptr()` returns `nullptr`. In that case, we
can confidently call [member_link future..get] to return that result to our
caller.

If the `std::exception_ptr` is ['not] `nullptr`, though, we collect it into
our pending `exception_list` and loop back for the next `future<>` from the
channel.

If we fall out of the loop [mdash] if every single task fiber threw an
exception [mdash] we throw the `exception_list` exception into which we've
been collecting those `std::exception_ptr`s.

[#wait_first_success]
[wait_first_success]

A call might look like this:

[wait_first_success_ex]

[endsect]
[section when_any, heterogeneous types]

We would be remiss to ignore the case in which the various task functions have
distinct return types. That means that the value returned by the first of them
might have any one of those types. We can express that with
[@http://www.boost.org/doc/libs/release/doc/html/variant.html Boost.Variant].

To keep the example simple, we'll revert to pretending that none of them can
throw an exception. That makes `wait_first_value_het()` strongly resemble
[link wait_first_value `wait_first_value()`]. We can actually reuse [link
wait_first_value_impl `wait_first_value_impl()`], merely passing
`boost::variant<T0, T1, ...>` as the channel's value type rather than the
common `T`!

Naturally this could be extended to use [link wait_first_success
`wait_first_success()`] semantics instead.

[wait_first_value_het]

It might be called like this:

[wait_first_value_het_ex]

[endsect]
[section when_any, a dubious alternative]

Certain topics in C++ can arouse strong passions, and exceptions are no
exception. We cannot resist mentioning [mdash] for purely informational
purposes [mdash] that when you need only the ['first] result from some number
of concurrently-running fibers, it would be possible to pass a
[^shared_ptr<[template_link promise]>] to the participating fibers, then cause
the initiating fiber to call [member_link future..get] on its [template_link
future]. The first fiber to call [member_link promise..set_value] on that
shared `promise` will succeed; subsequent `set_value()` calls on the same
`promise` instance will throw `future_error`.
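
For the record, that tactic might look like the following sketch [mdash]
hypothetical code, not part of wait_stuff.cpp, reusing the [link wait_sleeper
`sleeper()`] task and assuming no task throws:

    std::string wait_first_promise() {
        auto prom( std::make_shared< boost::fibers::promise< std::string > >() );
        boost::fibers::future< std::string > result( prom->get_future() );
        for ( int ms : { 150, 50 } ) {
            boost::fibers::fiber( [prom, ms](){
                try {
                    // the first set_value() call wins...
                    prom->set_value( sleeper( "wfp", ms) );
                } catch ( boost::fibers::future_error const&) {
                    // ...and any slower sibling quietly discards its result
                }
            }).detach();
        }
        return result.get();                // unblocked by the first set_value()
    }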

Use this information at your own discretion. Beware the dark side.

[endsect]
[endsect][/ when_any]

[section when_all functionality]
[section when_all, simple completion]

For the case in which we must wait for ['all] task functions to complete
[mdash] but we don't need results (or expect exceptions) from any of them
[mdash] we can write `wait_all_simple()` that looks remarkably like [link
wait_first_simple `wait_first_simple()`]. The difference is that instead of
our [link wait_done `Done`] class, we instantiate a [class_link barrier] and
call its [member_link barrier..wait].

We initialize the `barrier` with `(count+1)` because, in addition to the
`count` fibers we launch, `wait_all_simple()` itself also calls `wait()` on
that same `barrier`.

[wait_all_simple]

As stated above, the only difference between `wait_all_simple_impl()` and
[link wait_first_simple_impl `wait_first_simple_impl()`] is that the former
calls `barrier::wait()` rather than `Done::notify()`:

[wait_all_simple_impl]

You might call it like this:

[wait_all_simple_ex]

Control will not return from the `wait_all_simple()` call until the last of
its task functions has completed.

[endsect]
[section when_all, return values]

As soon as we want to collect return values from all the task functions, we
can see right away how to reuse [link wait_first_value `wait_first_value()`]'s
`unbounded_channel< T >` for the purpose. All we have to do is avoid closing
it after the first value!

But in fact, collecting multiple values raises an interesting question: do we
['really] want to wait until the slowest of them has arrived? Wouldn't we
rather process each result as soon as it becomes available?

Fortunately we can present both APIs. Let's define `wait_all_values_source()`
to return `shared_ptr<unbounded_channel<T>>`.[footnote We could have used
either [template_link bounded_channel] or [template_link unbounded_channel].
We chose `unbounded_channel<>` on the assumption that its simpler semantics
imply a cheaper implementation.]

[#wait_all_values]
Given `wait_all_values_source()`, it's straightforward to implement
`wait_all_values()`:

[wait_all_values]

It might be called like this:

[wait_all_values_ex]

As you can see from the loop in `wait_all_values()`, instead of requiring its
caller to count values, we define `wait_all_values_source()` to [member_link
unbounded_channel..close] the channel when done. But how do we do that? Each
producer fiber is independent. It has no idea whether it is the last one to
[member_link unbounded_channel..push] a value.

[#wait_nchannel]
We can address that problem with a counting façade for the
`unbounded_channel<>`. In fact, our façade need only support the producer end of
the channel.

[wait_nchannel]

[#wait_all_values_source]
Armed with `nchannel<>`, we can implement `wait_all_values_source()`. It
starts just like [link wait_first_value `wait_first_value()`]. The difference
is that we wrap the `unbounded_channel<T>` with an `nchannel<T>` to pass to
the producer fibers.

Then, of course, instead of popping the first value, closing the channel and
returning it, we simply return the `shared_ptr<unbounded_channel<T>>`.

[wait_all_values_source]

For example:

[wait_all_values_source_ex]

[#wait_all_values_impl]
`wait_all_values_impl()` really is just like [link wait_first_value_impl
`wait_first_value_impl()`] except for the use of `nchannel<T>` rather than
`unbounded_channel<T>`:

[wait_all_values_impl]

[endsect]
[section when_all, until first exception]

Naturally, just as with [link wait_first_outcome `wait_first_outcome()`], we
can elaborate [link wait_all_values `wait_all_values()`] and [link
wait_all_values_source `wait_all_values_source()`] by passing `future< T >`
instead of plain `T`.

[#wait_all_until_error]
`wait_all_until_error()` pops that `future< T >` and calls its [member_link
future..get]:

[wait_all_until_error]

For example:

[wait_all_until_error_ex]

[#wait_all_until_error_source]
Naturally this complicates the API for `wait_all_until_error_source()`. The
caller must both retrieve a `future< T >` and call its `get()` method. It would,
of course, be possible to return a façade over the consumer end of the
channel that would implicitly perform the `get()` and return a simple `T` (or
throw).
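
Such a consumer-side façade might look like the following sketch [mdash]
hypothetical code, not part of wait_stuff.cpp:

    // wrap the consumer end of an unbounded_channel< future< T > >,
    // unwrapping each future as it is popped
    template< typename T >
    class unwrapping_channel {
    public:
        typedef boost::fibers::unbounded_channel< boost::fibers::future< T > > channel_t;

        explicit unwrapping_channel( std::shared_ptr< channel_t > chan) :
            chan_( chan) {
        }

        // return the next result, or rethrow the exception with which
        // the corresponding task function terminated
        T value_pop() {
            return chan_->value_pop().get();
        }

    private:
        std::shared_ptr< channel_t > chan_;
    };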

The implementation is just as you would expect. Notice, however, that we can
reuse [link wait_first_outcome_impl `wait_first_outcome_impl()`], passing the
`nchannel<T>` rather than `unbounded_channel<T>`.

[wait_all_until_error_source]

For example:

[wait_all_until_error_source_ex]

[endsect]
[section when_all, collecting all exceptions]

[#wait_all_collect_errors]
Given [link wait_all_until_error_source `wait_all_until_error_source()`], it
might be more reasonable to make a `wait_all_...()` that collects ['all]
errors instead of presenting only the first:

[wait_all_collect_errors]

The implementation is a simple variation on [link wait_first_success
`wait_first_success()`], using the same [link exception_list `exception_list`]
exception class.

[endsect]
[section when_all, heterogeneous types]

But what about the case when we must wait for all results of different types?

We can present an API that is frankly quite cool. Consider a sample struct:

[wait_Data]

Let's fill its members from task functions all running concurrently:

[wait_all_members_data_ex]

Note that for this case we abandon the notion of capturing results in
completion order: we must fill the members of the passed struct in
left-to-right order.

That permits a beautifully simple implementation:

[wait_all_members]

[wait_all_members_get]

It is tempting to try to implement `wait_all_members()` as a one-liner like
this:

        return Result{ boost::fibers::async(functions).get()... };

The trouble with this tactic is that it would serialize all the task
functions. The runtime makes a single pass through `functions`, calling
[ns_function_link fibers..async] for each and then immediately calling
[member_link future..get] on its returned `future<>`. That blocks the implicit
loop. The above is almost equivalent to writing:

        return Result{ functions()... };

in which, of course, there is no concurrency at all.

Passing the argument pack through a function-call boundary
(`wait_all_members_get()`) forces the runtime to make ['two] passes: one in
`wait_all_members()` to collect the `future<>`s from all the `async()` calls,
the second in `wait_all_members_get()` to fetch each of the results.

As noted in comments, within the `wait_all_members_get()` parameter pack
expansion pass, the blocking behavior of `get()` becomes irrelevant. Along the
way, we will hit the `get()` for the slowest task function; after that every
subsequent `get()` will complete in trivial time.

By the way, we could also use this same API to fill a vector or other
collection:

[wait_all_members_vector_ex]

[endsect]
[endsect][/ when_all]

[endsect][/ outermost]