[/
  Copyright Oliver Kowalke, Nat Goodspeed 2015.
  Distributed under the Boost Software License, Version 1.0.
  (See accompanying file LICENSE_1_0.txt or copy at
  http://www.boost.org/LICENSE_1_0.txt)
]

[/ import path is relative to this .qbk file]
[import ../examples/wait_stuff.cpp]

[section:when_any when_any / when_all functionality]

A bit of wisdom from the early days of computing still holds true today:
prefer to model program state using the instruction pointer rather than with
Boolean flags. In other words, if the program must ["do something] and then
do something almost the same, but with minor changes... perhaps parts of that
something should be broken out as smaller separate functions, rather than
introducing flags to alter the internal behavior of a monolithic function.

To that we would add: prefer to describe control flow using C++ native
constructs such as function calls, `if`, `while`, `for`, `do` et al.,
rather than as chains of callbacks.

One of the great strengths of __boost_fiber__ is the flexibility it confers on
the coder to restructure an application from chains of callbacks to a
straightforward C++ statement sequence, even when code in that fiber is
in fact interleaved with code running in other fibers.

There has been much recent discussion about the benefits of when_any and
when_all functionality. When dealing with asynchronous and possibly unreliable
services, these are valuable idioms. But of course when_any and when_all are
closely tied to the use of chains of callbacks.

This section presents recipes for achieving the same ends, in the context of a
fiber that wants to ["do something] when one or more other independent
activities have completed. Accordingly, these are `wait_something()`
functions rather than `when_something()` functions. The expectation is that
the calling fiber asks to launch those independent activities, then waits for
them, then sequentially proceeds with whatever processing depends on those
results.

The function names shown (e.g. [link wait_first_simple `wait_first_simple()`])
are for illustrative purposes only, because all these functions have been
bundled into a single source file. Presumably, if (say) [link
wait_first_success `wait_first_success()`] best suits your application needs,
you could introduce that variant with the name `wait_any()`.

[note The functions presented in this section accept variadic argument lists
of task functions. Corresponding `wait_something()` functions accepting a
container of task functions are left as an exercise for the interested reader.
Those should actually be simpler. Most of the complexity would arise from
overloading the same name for both purposes.]

[/ @path link is relative to (eventual) doc/html/index.html, hence ../..]
All the source code for this section is found in
[@../../examples/wait_stuff.cpp wait_stuff.cpp].

[heading Example Task Function]

[#wait_sleeper]
We found it convenient to model an asynchronous task using this function:

[wait_sleeper]

with type-specific `sleeper()` ["front ends] for `std::string`, `double` and
`int`.

`Verbose` simply prints a message to `std::cout` on construction and
destruction.

Basically:

# `sleeper()` prints a start message;
# sleeps for the specified number of milliseconds;
# if `thrw` is passed as `true`, throws a string description of the passed
  `item`;
# else returns the passed `item`.

On the way out, `sleeper()` produces a stop message.

This function will feature in the example calls to the various functions
presented below.

[section when_any functionality]

[#wait_first_simple_section]
[section when_any, simple completion]

The simplest case is when you only need to know that the first of a set of
asynchronous tasks has completed [mdash] but you don't need to obtain a return
value, and you're confident that they will not throw exceptions.

[#wait_done]
For this we introduce a `Done` class to wrap a `bool` variable with a
[class_link condition_variable] and a [class_link mutex]:

[wait_done]

The pattern we follow throughout this section is to pass a
[@http://www.cplusplus.com/reference/memory/shared_ptr/ `std::shared_ptr<>`]
to the relevant synchronization object to the various tasks' fiber functions.
This eliminates nagging questions about the lifespan of the synchronization
object relative to the last of the fibers.

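For readers who want to experiment with this pattern outside Boost.Fiber, here is a minimal sketch using `std::thread`, `std::condition_variable` and `std::mutex` as stand-ins for the fiber equivalents. The `Done` class and thread-based `wait_first_simple()` below are illustrative assumptions mirroring the shape described in the text, not the library code:

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Illustrative analogue of the Done class described in the text, with
// std::condition_variable / std::mutex standing in for the Boost.Fiber types.
class Done {
private:
    std::condition_variable cond_;
    std::mutex mutex_;
    bool ready_ = false;

public:
    typedef std::shared_ptr<Done> ptr;

    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this]() { return ready_; });
    }

    void notify() {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            ready_ = true;
        }
        cond_.notify_one();
    }
};

// Launch every task on its own thread and return as soon as the first one
// finishes. The shared_ptr keeps Done alive until the last thread exits.
template <typename... Fns>
void wait_first_simple(Fns&&... functions) {
    Done::ptr done = std::make_shared<Done>();
    std::vector<std::thread> threads;
    (threads.emplace_back([done, fn = std::forward<Fns>(functions)]() mutable {
         fn();
         done->notify();
     }),
     ...);
    done->wait();
    // Stragglers may still be running; they only touch the shared Done.
    for (std::thread& t : threads) t.detach();
}
```

Detaching the stragglers is safe here precisely because the `shared_ptr` keeps the `Done` object alive: the same lifespan argument the text makes for the fiber version.
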
[#wait_first_simple]
`wait_first_simple()` uses that tactic for [link wait_done `Done`]:

[wait_first_simple]

[#wait_first_simple_impl]
`wait_first_simple_impl()` is an ordinary recursion over the argument pack,
capturing `Done::ptr` for each new fiber:

[wait_first_simple_impl]

The body of the fiber's lambda is extremely simple, as promised: call the
function, notify [link wait_done `Done`] when it returns. The first fiber to
do so allows `wait_first_simple()` to return [mdash] which is why it's useful
to have `std::shared_ptr<Done>` manage the lifespan of our `Done` object
rather than declaring it as a stack variable in `wait_first_simple()`.

This is how you might call it:

[wait_first_simple_ex]

In this example, control resumes after `wait_first_simple()` when [link
wait_sleeper `sleeper("wfs_short", 50)`] completes [mdash] even though the
other two `sleeper()` fibers are still running.

[endsect]

[section when_any, return value]

It seems more useful to add the ability to capture the return value from the
first of the task functions to complete. Again, we assume that none will throw
an exception.

One tactic would be to adapt our [link wait_done `Done`] class to store the
first of the return values, rather than a simple `bool`. However, we choose
instead to use an [template_link unbounded_channel]. We'll only need to enqueue
the first value, so we'll [member_link unbounded_channel..close] it once we've
retrieved that value. Subsequent `push()` calls will return `closed`.

[#wait_first_value]
[wait_first_value]

[#wait_first_value_impl]
The meat of the `wait_first_value_impl()` function is as you might expect:

[wait_first_value_impl]

It calls the passed function, pushes its return value and ignores the `push()`
result. You might call it like this:

[wait_first_value_ex]

[endsect]

[section when_any, produce first outcome, whether result or exception]

We may not be running in an environment in which we can guarantee no exception
will be thrown by any of our task functions. In that case, the above
implementations of `wait_first_something()` would be naïve: as mentioned in
[link exceptions the section on Fiber Management], an uncaught exception in one
of our task fibers would cause `std::terminate()` to be called.

Let's at least ensure that such an exception would propagate to the fiber
awaiting the first result. We can use [template_link future] to transport
either a return value or an exception. Therefore, we will change [link
wait_first_value `wait_first_value()`]'s [template_link unbounded_channel] to
hold `future< T >` items instead of simply `T`.

Once we have a `future<>` in hand, all we need do is call [member_link
future..get], which will either return the value or rethrow the exception.

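As a quick illustration, `std::future` exhibits the same value-or-rethrow behavior of `get()` that we rely on here; the two helper functions below are invented purely for the demonstration:

```cpp
#include <future>
#include <stdexcept>
#include <string>

// get() on a ready future either returns the stored value or rethrows the
// stored exception; everything downstream relies on exactly this behavior.
std::future<std::string> make_value_future() {
    std::promise<std::string> p;
    p.set_value("hello");
    return p.get_future();
}

std::future<std::string> make_error_future() {
    std::promise<std::string> p;
    p.set_exception(std::make_exception_ptr(std::runtime_error("boom")));
    return p.get_future();
}
```
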
[#wait_first_outcome]
[wait_first_outcome]

So far so good [mdash] but there's a timing issue. How should we obtain the
`future<>` to [member_link unbounded_channel..push] on the channel?

We could call [ns_function_link fibers..async]. That would certainly produce a
`future<>` for the task function. The trouble is that it would return too
quickly! We only want `future<>` items for ['completed] tasks on our
`unbounded_channel<>`. In fact, we only want the `future<>` for the one that
completes first. If each fiber launched by `wait_first_outcome()` were to
`push()` the result of calling `async()`, the channel would only ever report
the result of the leftmost task item [mdash] ['not] the one that completes most
quickly.

Calling [member_link future..get] on the future returned by `async()` wouldn't
be right. You can only call `get()` once per `future<>` instance! And if there
were an exception, it would be rethrown inside the helper fiber at the
producer end of the channel, rather than propagated to the consumer end.

We could call [member_link future..wait]. That would block the helper fiber
until the `future<>` became ready, at which point we could `push()` it to be
retrieved by `wait_first_outcome()`.

That would work [mdash] but there's a simpler tactic that avoids creating an extra
fiber. We can wrap the task function in a [template_link packaged_task]. While
one naturally thinks of passing a `packaged_task<>` to a new fiber [mdash] that is,
in fact, what `async()` does [mdash] in this case, we're already running in the
helper fiber at the producer end of the channel! We can simply ['call] the
`packaged_task<>`. On return from that call, the task function has completed,
meaning that the `future<>` obtained from the `packaged_task<>` is certain to
be ready. At that point we can simply `push()` it to the channel.

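The call-the-`packaged_task<>`-in-place tactic can be sketched with std:: types standing in for the fiber ones; `run_and_package()` is an invented helper name for this example:

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <stdexcept>
#include <string>

// Wrap the task in a packaged_task, run it right here in the "producer", and
// only then hand over the future -- which is guaranteed already ready, so a
// later get() cannot block.
std::future<std::string> run_and_package(std::string (*task)()) {
    std::packaged_task<std::string()> pt(task);
    std::future<std::string> f = pt.get_future();
    pt();  // stores either the task's return value or its exception
    assert(f.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
    return f;
}
```
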
[#wait_first_outcome_impl]
[wait_first_outcome_impl]

Calling it might look like this:

[wait_first_outcome_ex]

[endsect]

[section when_any, produce first success]

One scenario for ["when_any] functionality is when we're redundantly contacting
some number of possibly-unreliable web services. Not only might they be slow
[mdash] any one of them might produce a failure rather than the desired
response.

In such a case, [link wait_first_outcome `wait_first_outcome()`] isn't the
right approach. If one of the services produces an error quickly, while
another follows up with a real answer, we don't want to prefer the error just
because it arrived first!

Given the `unbounded_channel< future< T > >` we already constructed for
`wait_first_outcome()`, though, we can readily recast the interface function
to deliver the first ['successful] result.

That does beg the question: what if ['all] the task functions throw an
exception? In that case we'd probably better know about it.

[#exception_list]
The
[@http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/n4407.html#parallel.exceptions.synopsis
C++ Parallelism Draft Technical Specification] proposes a
`std::exception_list` exception capable of delivering a collection of
`std::exception_ptr`s. Until that becomes universally available, let's fake up
an `exception_list` of our own:

[exception_list]

Now we can build `wait_first_success()`, using [link wait_first_outcome_impl
`wait_first_outcome_impl()`].

Instead of retrieving only the first `future<>` from the channel, we must now
loop over `future<>` items. Of course we must limit that iteration! If we
launch only `count` producer fibers, the `(count+1)`[superscript st]
[member_link unbounded_channel..pop] call would block forever.

Given a ready `future<>`, we can distinguish failure by calling [member_link
future..get_exception_ptr]. If the `future<>` in fact contains a result rather
than an exception, `get_exception_ptr()` returns `nullptr`. In that case, we
can confidently call [member_link future..get] to return that result to our
caller.

If the `std::exception_ptr` is ['not] `nullptr`, though, we collect it into
our pending `exception_list` and loop back for the next `future<>` from the
channel.

If we fall out of the loop [mdash] if every single task fiber threw an
exception [mdash] we throw the `exception_list` exception into which we've
been collecting those `std::exception_ptr`s.

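Both pieces can be sketched with std equivalents. `std::future` has no `get_exception_ptr()`, so this stand-in distinguishes failure with try/catch instead; the class shape and the `first_success()` helper are assumptions for the example, not the library code:

```cpp
#include <exception>
#include <future>
#include <stdexcept>
#include <utility>
#include <vector>

// Stand-in for the exception_list described above: one exception object that
// carries every std::exception_ptr collected from failed tasks.
class exception_list : public std::runtime_error {
public:
    typedef std::vector<std::exception_ptr> bundle_t;

    explicit exception_list(bundle_t bundle)
        : std::runtime_error("exception_list"), bundle_(std::move(bundle)) {}

    bundle_t const& get_exceptions() const { return bundle_; }

private:
    bundle_t bundle_;
};

// The wait_first_success() loop, over a plain vector of ready futures instead
// of a channel: return the first success, collect every failure, and throw
// the whole collection only if nothing succeeded.
template <typename T>
T first_success(std::vector<std::future<T>>& futures) {
    exception_list::bundle_t errors;
    for (std::future<T>& f : futures) {
        try {
            return f.get();  // first success wins
        } catch (...) {
            errors.push_back(std::current_exception());
        }
    }
    throw exception_list(std::move(errors));  // every task failed
}
```
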
[#wait_first_success]
[wait_first_success]

A call might look like this:

[wait_first_success_ex]

[endsect]

[section when_any, heterogeneous types]

We would be remiss to ignore the case in which the various task functions have
distinct return types. That means that the value returned by the first of them
might have any one of those types. We can express that with
[@http://www.boost.org/doc/libs/release/doc/html/variant.html Boost.Variant].

To keep the example simple, we'll revert to pretending that none of them can
throw an exception. That makes `wait_first_value_het()` strongly resemble
[link wait_first_value `wait_first_value()`]. We can actually reuse [link
wait_first_value_impl `wait_first_value_impl()`], merely passing
`boost::variant<T0, T1, ...>` as the channel's value type rather than the
common `T`.

Naturally this could be extended to use [link wait_first_success
`wait_first_success()`] semantics instead.

[wait_first_value_het]

It might be called like this:

[wait_first_value_het_ex]

[endsect]

[section when_any, a dubious alternative]

Certain topics in C++ can arouse strong passions, and exceptions are no
exception. We cannot resist mentioning [mdash] for purely informational
purposes [mdash] that when you need only the ['first] result from some number
of concurrently-running fibers, it would be possible to pass a
[^shared_ptr<[template_link promise]>] to the participating fibers, then cause
the initiating fiber to call [member_link future..get] on its [template_link
future]. The first fiber to call [member_link promise..set_value] on that
shared `promise` will succeed; subsequent `set_value()` calls on the same
`promise` instance will throw `future_error`.

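The same race is easy to observe with `std::promise` and `std::thread`; `first_via_shared_promise()` below is an invented demonstration of the tactic, not library code:

```cpp
#include <future>
#include <memory>
#include <thread>
#include <vector>

// Every task shares one promise. Only the first set_value() succeeds; each
// later one throws future_error, which the losing tasks simply swallow.
int first_via_shared_promise(std::vector<int (*)()> tasks) {
    auto prom = std::make_shared<std::promise<int>>();
    std::future<int> fut = prom->get_future();
    std::vector<std::thread> threads;
    for (int (*task)() : tasks) {
        threads.emplace_back([prom, task]() {
            try {
                prom->set_value(task());
            } catch (std::future_error const&) {
                // somebody else won the race; discard our result
            }
        });
    }
    int result = fut.get();  // ready as soon as the first task finishes
    for (std::thread& t : threads) t.join();
    return result;
}
```
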
Use this information at your own discretion. Beware the dark side.

[endsect]
[endsect][/ when_any]

[section when_all functionality]
[section when_all, simple completion]

For the case in which we must wait for ['all] task functions to complete
[mdash] but we don't need results (or expect exceptions) from any of them
[mdash] we can write `wait_all_simple()` that looks remarkably like [link
wait_first_simple `wait_first_simple()`]. The difference is that instead of
our [link wait_done `Done`] class, we instantiate a [class_link barrier] and
call its [member_link barrier..wait].

We initialize the `barrier` with `(count+1)` because we are launching `count`
fibers, plus the `wait()` call within `wait_all_simple()` itself.

[wait_all_simple]

As stated above, the only difference between `wait_all_simple_impl()` and
[link wait_first_simple_impl `wait_first_simple_impl()`] is that the former
calls `barrier::wait()` rather than `Done::notify()`:

[wait_all_simple_impl]

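The `(count+1)` counting argument can be sketched portably. The one-shot `barrier` below is a simplified std-based analogue of [class_link barrier], and `std::thread` again stands in for fibers; treat it as an illustration of the arithmetic, not as the library implementation:

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Minimal one-shot barrier: wait() blocks until `count` callers have arrived.
class barrier {
public:
    explicit barrier(std::size_t count) : remaining_(count) {}

    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        if (--remaining_ == 0) {
            cond_.notify_all();
        } else {
            cond_.wait(lock, [this]() { return remaining_ == 0; });
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::size_t remaining_;
};

// The barrier counts every task *plus* the waiting caller: count + 1.
template <typename... Fns>
void wait_all_simple(Fns&&... functions) {
    std::size_t count = sizeof...(functions);
    auto b = std::make_shared<barrier>(count + 1);
    std::vector<std::thread> threads;
    (threads.emplace_back([b, fn = std::forward<Fns>(functions)]() mutable {
         fn();
         b->wait();  // each task arrives exactly once
     }),
     ...);
    b->wait();  // the (count+1)st arrival: the caller itself
    for (std::thread& t : threads) t.join();  // everyone is past the barrier
}
```
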
You might call it like this:

[wait_all_simple_ex]

Control will not return from the `wait_all_simple()` call until the last of
its task functions has completed.

[endsect]

[section when_all, return values]

As soon as we want to collect return values from all the task functions, we
can see right away how to reuse [link wait_first_value `wait_first_value()`]'s
`unbounded_channel<T>` for the purpose. All we have to do is avoid closing it
after the first value.

But in fact, collecting multiple values raises an interesting question: do we
['really] want to wait until the slowest of them has arrived? Wouldn't we
rather process each result as soon as it becomes available?

Fortunately we can present both APIs. Let's define `wait_all_values_source()`
to return `shared_ptr<unbounded_channel<T>>`.[footnote We could have used
either [template_link bounded_channel] or [template_link unbounded_channel].
We chose `unbounded_channel<>` on the assumption that its simpler semantics
imply a cheaper implementation.]

[#wait_all_values]
Given `wait_all_values_source()`, it's straightforward to implement
`wait_all_values()`:

[wait_all_values]

It might be called like this:

[wait_all_values_ex]

As you can see from the loop in `wait_all_values()`, instead of requiring its
caller to count values, we define `wait_all_values_source()` to [member_link
unbounded_channel..close] the channel when done. But how do we do that? Each
producer fiber is independent. It has no idea whether it is the last one to
[member_link unbounded_channel..push] a value.

We can address that problem with a counting façade for the
`unbounded_channel<>`. In fact, our façade need only support the producer end of
the channel:

[nchannel]

[#wait_all_values_source]
Armed with `nchannel<>`, we can implement `wait_all_values_source()`. It
starts just like [link wait_first_value `wait_first_value()`]. The difference
is that we wrap the `unbounded_channel<T>` with an `nchannel<T>` to pass to
the producer fibers.

Then, of course, instead of popping the first value, closing the channel and
returning it, we simply return the `shared_ptr<unbounded_channel<T>>`.

[wait_all_values_source]

It might be called like this:

[wait_all_values_source_ex]

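The counting-façade idea can be sketched as a toy std-based analogue. The real `unbounded_channel<>` API differs (for instance, its `push()` returns a status code), so treat both classes below purely as a shape sketch under those assumptions:

```cpp
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// Toy closable queue standing in for unbounded_channel<T>: pop() returns
// false once the queue is closed and drained.
template <typename T>
class channel {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(value));
        }
        cond_.notify_one();
    }

    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        cond_.notify_all();
    }

    bool pop(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this]() { return closed_ || !queue_.empty(); });
        if (queue_.empty()) return false;  // closed and fully drained
        out = std::move(queue_.front());
        queue_.pop();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::queue<T> queue_;
    bool closed_ = false;
};

// Counting facade: forwards push() and closes the wrapped channel after the
// Nth value, so no individual producer needs to know it was the last.
template <typename T>
class nchannel {
public:
    nchannel(std::shared_ptr<channel<T>> chan, std::size_t limit)
        : chan_(std::move(chan)), limit_(limit) {}

    void push(T value) {
        chan_->push(std::move(value));
        std::lock_guard<std::mutex> lock(mutex_);
        if (--limit_ == 0) chan_->close();  // that was the last expected value
    }

private:
    std::shared_ptr<channel<T>> chan_;
    std::mutex mutex_;
    std::size_t limit_;
};
```
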
[#wait_all_values_impl]
`wait_all_values_impl()` really is just like [link wait_first_value_impl
`wait_first_value_impl()`] except for the use of `nchannel<T>` rather than
`unbounded_channel<T>`:

[wait_all_values_impl]

[endsect]

[section when_all, until first exception]

Naturally, just as with [link wait_first_outcome `wait_first_outcome()`], we
can elaborate [link wait_all_values `wait_all_values()`] and [link
wait_all_values_source `wait_all_values_source()`] by passing `future< T >`
instead of plain `T`.

[#wait_all_until_error]
`wait_all_until_error()` pops that `future< T >` and calls its [member_link
future..get] method:

[wait_all_until_error]

It might be called like this:

[wait_all_until_error_ex]

[#wait_all_until_error_source]
Naturally this complicates the API for `wait_all_until_error_source()`. The
caller must both retrieve a `future< T >` and call its `get()` method. It would,
of course, be possible to return a façade over the consumer end of the
channel that would implicitly perform the `get()` and return a simple `T` (or
throw).

The implementation is just as you would expect. Notice, however, that we can
reuse [link wait_first_outcome_impl `wait_first_outcome_impl()`], passing the
`nchannel<T>` rather than `unbounded_channel<T>`.

[wait_all_until_error_source]

It might be called like this:

[wait_all_until_error_source_ex]

[endsect]

[section when_all, collecting all exceptions]

[#wait_all_collect_errors]
Given [link wait_all_until_error_source `wait_all_until_error_source()`], it
might be more reasonable to make a `wait_all_...()` that collects ['all]
errors instead of presenting only the first:

[wait_all_collect_errors]

The implementation is a simple variation on [link wait_first_success
`wait_first_success()`], using the same [link exception_list `exception_list`]
exception class.

[endsect]

[section when_all, heterogeneous types]

But what about the case when we must wait for all results of different types?

We can present an API that is frankly quite cool. Consider a sample struct:

[wait_all_members_data]

Let's fill its members from task functions all running concurrently:

[wait_all_members_data_ex]

Note that for this case, we abandon the notion of capturing the earliest
result first, and so on: we must fill exactly the passed struct in
member order.

That permits a beautifully simple implementation:

[wait_all_members]

[wait_all_members_get]

It is tempting to try to implement `wait_all_members()` as a one-liner like
this:

    return Result{ boost::fibers::async(functions).get()... };

The trouble with this tactic is that it would serialize all the task
functions. The runtime makes a single pass through `functions`, calling
[ns_function_link fibers..async] for each and then immediately calling
[member_link future..get] on its returned `future<>`. That blocks the implicit
loop. The above is almost equivalent to writing:

    return Result{ functions()... };

in which, of course, there is no concurrency at all.

Passing the argument pack through a function-call boundary
(`wait_all_members_get()`) forces the runtime to make ['two] passes: one in
`wait_all_members()` to collect the `future<>`s from all the `async()` calls,
the second in `wait_all_members_get()` to fetch each of the results.

As noted in comments, within the `wait_all_members_get()` parameter pack
expansion pass, the blocking behavior of `get()` becomes irrelevant. Along the
way, we will hit the `get()` for the slowest task function; after that every
subsequent `get()` will complete in trivial time.

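The two-pass trick works identically with `std::async` in plain threaded C++; the `Result` struct and task lambdas below are invented for the demonstration, standing in for the fiber-based originals:

```cpp
#include <future>
#include <string>
#include <utility>

// Hypothetical result struct; member order must match the task order.
struct Result {
    std::string scribble;
    int number;
};

// Second pass: every task has already been launched by the time the first
// get() runs, so a slow get() no longer serializes the tasks.
template <typename... Futures>
Result wait_all_members_get(Futures&&... futures) {
    // Braced-init-list evaluation is left-to-right, filling Result in order.
    return Result{ futures.get()... };
}

// First pass: launch all tasks, collecting one future apiece.
template <typename... Fns>
Result wait_all_members(Fns&&... functions) {
    return wait_all_members_get(
        std::async(std::launch::async, std::forward<Fns>(functions))...);
}
```
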
By the way, we could also use this same API to fill a vector or other
collection:

[wait_all_members_vector_ex]

[endsect]
[endsect][/ when_all]

[endsect][/ outermost]