]>
git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/fiber/examples/wait_stuff.cpp
1 // Copyright Nat Goodspeed 2015.
2 // Distributed under the Boost Software License, Version 1.0.
3 // (See accompanying file LICENSE_1_0.txt or copy at
4 // http://www.boost.org/LICENSE_1_0.txt)
13 #include <type_traits>
17 #include <boost/fiber/all.hpp>
18 #include <boost/variant/variant.hpp>
19 #include <boost/variant/get.hpp>
21 // These are wait_something() functions rather than when_something()
22 // functions. A big part of the point of the Fiber library is to model
23 // sequencing using the processor's instruction pointer rather than chains of
24 // callbacks. The future-oriented when_all() / when_any() functions are still
25 // based on chains of callbacks. With Fiber, we can do better.
27 /*****************************************************************************
29 *****************************************************************************/
32 Verbose( std::string
const& d
):
34 std::cout
<< desc
<< " start" << std::endl
;
38 std::cout
<< desc
<< " stop" << std::endl
;
41 Verbose( Verbose
const&) = delete;
42 Verbose
& operator=( Verbose
const&) = delete;
45 const std::string desc
;
48 /*****************************************************************************
50 *****************************************************************************/
51 // collect and ultimately run every Example
53 typedef std::vector
< std::pair
< std::string
, std::function
< void() > > > function_list
;
56 void add( std::string
const& desc
, std::function
< void() > const& func
) {
57 functions_
.push_back( function_list::value_type( desc
, func
) );
61 for ( function_list::value_type
const& pair
: functions_
) {
62 Verbose
v( pair
.first
);
68 function_list functions_
;
73 // Example allows us to embed Runner::add() calls at module scope
75 Example( Runner
& runner
, std::string
const& desc
, std::function
< void() > const& func
) {
76 runner
.add( desc
, func
);
80 /*****************************************************************************
81 * example task functions
82 *****************************************************************************/
84 template< typename T
>
85 T
sleeper_impl( T item
, int ms
, bool thrw
= false) {
86 std::ostringstream descb
, funcb
;
88 std::string
desc( descb
.str() );
89 funcb
<< " sleeper(" << item
<< ")";
90 Verbose
v( funcb
.str() );
92 boost::this_fiber::sleep_for( std::chrono::milliseconds( ms
) );
94 throw std::runtime_error( desc
);
101 std::string
sleeper( std::string
const& item
, int ms
, bool thrw
= false) {
102 return sleeper_impl( item
, ms
, thrw
);
106 double sleeper( double item
, int ms
, bool thrw
= false) {
107 return sleeper_impl( item
, ms
, thrw
);
111 int sleeper(int item
, int ms
, bool thrw
= false) {
112 return sleeper_impl( item
, ms
, thrw
);
115 /*****************************************************************************
117 *****************************************************************************/
119 // Wrap canonical pattern for condition_variable + bool flag
122 boost::fibers::condition_variable cond
;
123 boost::fibers::mutex mutex
;
127 typedef std::shared_ptr
< Done
> ptr
;
130 std::unique_lock
< boost::fibers::mutex
> lock( mutex
);
131 cond
.wait( lock
, [this](){ return ready
; });
136 std::unique_lock
< boost::fibers::mutex
> lock( mutex
);
144 /*****************************************************************************
145 * when_any, simple completion
146 *****************************************************************************/
147 //[wait_first_simple_impl
148 // Degenerate case: when there are no functions to wait for, return
150 void wait_first_simple_impl( Done::ptr
) {
153 // When there's at least one function to wait for, launch it and recur to
155 template< typename Fn
, typename
... Fns
>
156 void wait_first_simple_impl( Done::ptr done
, Fn
&& function
, Fns
&& ... functions
) {
157 boost::fibers::fiber( [done
, function
](){
161 wait_first_simple_impl( done
, std::forward
< Fns
>( functions
) ... );
165 // interface function: instantiate Done, launch tasks, wait for Done
167 template< typename
... Fns
>
168 void wait_first_simple( Fns
&& ... functions
) {
169 // Use shared_ptr because each function's fiber will bind it separately,
170 // and we're going to return before the last of them completes.
171 auto done( std::make_shared
< Done
>() );
172 wait_first_simple_impl( done
, std::forward
< Fns
>( functions
) ... );
178 Example
wfs( runner
, "wait_first_simple()", [](){
179 //[wait_first_simple_ex
181 [](){ sleeper("wfs_long", 150); },
182 [](){ sleeper("wfs_medium", 100); },
183 [](){ sleeper("wfs_short", 50); });
187 /*****************************************************************************
188 * when_any, return value
189 *****************************************************************************/
190 // When there's only one function, call this overload
191 //[wait_first_value_impl
192 template< typename T
, typename Fn
>
193 void wait_first_value_impl( std::shared_ptr
< boost::fibers::buffered_channel
< T
> > chan
,
195 boost::fibers::fiber( [chan
, function
](){
196 // Ignore channel_op_status returned by push():
197 // might be closed; we simply don't care.
198 chan
->push( function() );
203 // When there are two or more functions, call this overload
204 template< typename T
, typename Fn0
, typename Fn1
, typename
... Fns
>
205 void wait_first_value_impl( std::shared_ptr
< boost::fibers::buffered_channel
< T
> > chan
,
208 Fns
&& ... functions
) {
209 // process the first function using the single-function overload
210 wait_first_value_impl
< T
>( chan
,
211 std::forward
< Fn0
>( function0
) );
212 // then recur to process the rest
213 wait_first_value_impl
< T
>( chan
,
214 std::forward
< Fn1
>( function1
),
215 std::forward
< Fns
>( functions
) ... );
219 // Assume that all passed functions have the same return type. The return type
220 // of wait_first_value() is the return type of the first passed function. It is
221 // simply invalid to pass NO functions.
222 template< typename Fn
, typename
... Fns
>
223 typename
std::result_of
< Fn() >::type
224 wait_first_value( Fn
&& function
, Fns
&& ... functions
) {
225 typedef typename
std::result_of
< Fn() >::type return_t
;
226 typedef boost::fibers::buffered_channel
< return_t
> channel_t
;
227 auto chanp( std::make_shared
< channel_t
>( 64) );
228 // launch all the relevant fibers
229 wait_first_value_impl
< return_t
>( chanp
,
230 std::forward
< Fn
>( function
),
231 std::forward
< Fns
>( functions
) ... );
232 // retrieve the first value
233 return_t
value( chanp
->value_pop() );
234 // close the channel: no subsequent push() has to succeed
241 Example
wfv( runner
, "wait_first_value()", [](){
242 //[wait_first_value_ex
243 std::string result
= wait_first_value(
244 [](){ return sleeper("wfv_third", 150); },
245 [](){ return sleeper("wfv_second", 100); },
246 [](){ return sleeper("wfv_first", 50); });
247 std::cout
<< "wait_first_value() => " << result
<< std::endl
;
248 assert(result
== "wfv_first");
252 /*****************************************************************************
253 * when_any, produce first outcome, whether result or exception
254 *****************************************************************************/
255 // When there's only one function, call this overload.
256 //[wait_first_outcome_impl
257 template< typename T
, typename CHANP
, typename Fn
>
258 void wait_first_outcome_impl( CHANP chan
, Fn
&& function
) {
259 boost::fibers::fiber(
260 // Use std::bind() here for C++11 compatibility. C++11 lambda capture
261 // can't move a move-only Fn type, but bind() can. Let bind() move the
262 // channel pointer and the function into the bound object, passing
263 // references into the lambda.
266 typename
std::decay
< Fn
>::type
& function
) {
267 // Instantiate a packaged_task to capture any exception thrown by
269 boost::fibers::packaged_task
< T() > task( function
);
270 // Immediately run this packaged_task on same fiber. We want
271 // function() to have completed BEFORE we push the future.
273 // Pass the corresponding future to consumer. Ignore
274 // channel_op_status returned by push(): might be closed; we
275 // simply don't care.
276 chan
->push( task
.get_future() );
279 std::forward
< Fn
>( function
)
284 // When there are two or more functions, call this overload
285 template< typename T
, typename CHANP
, typename Fn0
, typename Fn1
, typename
... Fns
>
286 void wait_first_outcome_impl( CHANP chan
,
289 Fns
&& ... functions
) {
290 // process the first function using the single-function overload
291 wait_first_outcome_impl
< T
>( chan
,
292 std::forward
< Fn0
>( function0
) );
293 // then recur to process the rest
294 wait_first_outcome_impl
< T
>( chan
,
295 std::forward
< Fn1
>( function1
),
296 std::forward
< Fns
>( functions
) ... );
299 // Assume that all passed functions have the same return type. The return type
300 // of wait_first_outcome() is the return type of the first passed function. It is
301 // simply invalid to pass NO functions.
302 //[wait_first_outcome
303 template< typename Fn
, typename
... Fns
>
304 typename
std::result_of
< Fn() >::type
305 wait_first_outcome( Fn
&& function
, Fns
&& ... functions
) {
306 // In this case, the value we pass through the channel is actually a
307 // future -- which is already ready. future can carry either a value or an
309 typedef typename
std::result_of
< Fn() >::type return_t
;
310 typedef boost::fibers::future
< return_t
> future_t
;
311 typedef boost::fibers::buffered_channel
< future_t
> channel_t
;
312 auto chanp(std::make_shared
< channel_t
>( 64) );
313 // launch all the relevant fibers
314 wait_first_outcome_impl
< return_t
>( chanp
,
315 std::forward
< Fn
>( function
),
316 std::forward
< Fns
>( functions
) ... );
317 // retrieve the first future
318 future_t
future( chanp
->value_pop() );
319 // close the channel: no subsequent push() has to succeed
321 // either return value or throw exception
327 Example
wfo( runner
, "wait_first_outcome()", [](){
328 //[wait_first_outcome_ex
329 std::string result
= wait_first_outcome(
330 [](){ return sleeper("wfos_first", 50); },
331 [](){ return sleeper("wfos_second", 100); },
332 [](){ return sleeper("wfos_third", 150); });
333 std::cout
<< "wait_first_outcome(success) => " << result
<< std::endl
;
334 assert(result
== "wfos_first");
338 result
= wait_first_outcome(
339 [](){ return sleeper("wfof_first", 50, true); },
340 [](){ return sleeper("wfof_second", 100); },
341 [](){ return sleeper("wfof_third", 150); });
342 } catch ( std::exception
const& e
) {
345 std::cout
<< "wait_first_outcome(fail) threw '" << thrown
347 assert(thrown
== "wfof_first");
351 /*****************************************************************************
352 * when_any, collect exceptions until success; throw exception_list if no
354 *****************************************************************************/
355 // define an exception to aggregate exception_ptrs; prefer
356 // std::exception_list (N4407 et al.) once that becomes available
358 class exception_list
: public std::runtime_error
{
360 exception_list( std::string
const& what
) :
361 std::runtime_error( what
) {
364 typedef std::vector
< std::exception_ptr
> bundle_t
;
366 // N4407 proposed std::exception_list API
367 typedef bundle_t::const_iterator iterator
;
369 std::size_t size() const noexcept
{
370 return bundle_
.size();
373 iterator
begin() const noexcept
{
374 return bundle_
.begin();
377 iterator
end() const noexcept
{
378 return bundle_
.end();
381 // extension to populate
382 void add( std::exception_ptr ep
) {
383 bundle_
.push_back( ep
);
391 // Assume that all passed functions have the same return type. The return type
392 // of wait_first_success() is the return type of the first passed function. It is
393 // simply invalid to pass NO functions.
394 //[wait_first_success
395 template< typename Fn
, typename
... Fns
>
396 typename
std::result_of
< Fn() >::type
397 wait_first_success( Fn
&& function
, Fns
&& ... functions
) {
398 std::size_t count( 1 + sizeof ... ( functions
) );
399 // In this case, the value we pass through the channel is actually a
400 // future -- which is already ready. future can carry either a value or an
402 typedef typename
std::result_of
< typename
std::decay
< Fn
>::type() >::type return_t
;
403 typedef boost::fibers::future
< return_t
> future_t
;
404 typedef boost::fibers::buffered_channel
< future_t
> channel_t
;
405 auto chanp( std::make_shared
< channel_t
>( 64) );
406 // launch all the relevant fibers
407 wait_first_outcome_impl
< return_t
>( chanp
,
408 std::forward
< Fn
>( function
),
409 std::forward
< Fns
>( functions
) ... );
410 // instantiate exception_list, just in case
411 exception_list
exceptions("wait_first_success() produced only errors");
412 // retrieve up to 'count' results -- but stop there!
413 for ( std::size_t i
= 0; i
< count
; ++i
) {
414 // retrieve the next future
415 future_t
future( chanp
->value_pop() );
416 // retrieve exception_ptr if any
417 std::exception_ptr
error( future
.get_exception_ptr() );
418 // if no error, then yay, return value
420 // close the channel: no subsequent push() has to succeed
422 // show caller the value we got
426 // error is non-null: collect
427 exceptions
.add( error
);
429 // We only arrive here when every passed function threw an exception.
430 // Throw our collection to inform caller.
436 Example
wfss( runner
, "wait_first_success()", [](){
437 //[wait_first_success_ex
438 std::string result
= wait_first_success(
439 [](){ return sleeper("wfss_first", 50, true); },
440 [](){ return sleeper("wfss_second", 100); },
441 [](){ return sleeper("wfss_third", 150); });
442 std::cout
<< "wait_first_success(success) => " << result
<< std::endl
;
443 assert(result
== "wfss_second");
447 std::size_t count
= 0;
449 result
= wait_first_success(
450 [](){ return sleeper("wfsf_first", 50, true); },
451 [](){ return sleeper("wfsf_second", 100, true); },
452 [](){ return sleeper("wfsf_third", 150, true); });
453 } catch ( exception_list
const& e
) {
456 } catch ( std::exception
const& e
) {
459 std::cout
<< "wait_first_success(fail) threw '" << thrown
<< "': "
460 << count
<< " errors" << std::endl
;
461 assert(thrown
== "wait_first_success() produced only errors");
465 /*****************************************************************************
466 * when_any, heterogeneous
467 *****************************************************************************/
468 //[wait_first_value_het
469 // No need to break out the first Fn for interface function: let the compiler
470 // complain if empty.
471 // Our functions have different return types, and we might have to return any
472 // of them. Use a variant, expanding std::result_of<Fn()>::type for each Fn in
474 template< typename
... Fns
>
475 boost::variant
< typename
std::result_of
< Fns() >::type
... >
476 wait_first_value_het( Fns
&& ... functions
) {
477 // Use buffered_channel<boost::variant<T1, T2, ...>>; see remarks above.
478 typedef boost::variant
< typename
std::result_of
< Fns() >::type
... > return_t
;
479 typedef boost::fibers::buffered_channel
< return_t
> channel_t
;
480 auto chanp( std::make_shared
< channel_t
>( 64) );
481 // launch all the relevant fibers
482 wait_first_value_impl
< return_t
>( chanp
,
483 std::forward
< Fns
>( functions
) ... );
484 // retrieve the first value
485 return_t
value( chanp
->value_pop() );
486 // close the channel: no subsequent push() has to succeed
493 Example
wfvh( runner
, "wait_first_value_het()", [](){
494 //[wait_first_value_het_ex
495 boost::variant
< std::string
, double, int > result
=
496 wait_first_value_het(
497 [](){ return sleeper("wfvh_third", 150); },
498 [](){ return sleeper(3.14, 100); },
499 [](){ return sleeper(17, 50); });
500 std::cout
<< "wait_first_value_het() => " << result
<< std::endl
;
501 assert(boost::get
< int >( result
) == 17);
505 /*****************************************************************************
506 * when_all, simple completion
507 *****************************************************************************/
508 // Degenerate case: when there are no functions to wait for, return
510 void wait_all_simple_impl( std::shared_ptr
< boost::fibers::barrier
>) {
513 // When there's at least one function to wait for, launch it and recur to
515 //[wait_all_simple_impl
516 template< typename Fn
, typename
... Fns
>
517 void wait_all_simple_impl( std::shared_ptr
< boost::fibers::barrier
> barrier
,
518 Fn
&& function
, Fns
&& ... functions
) {
519 boost::fibers::fiber(
521 []( std::shared_ptr
< boost::fibers::barrier
> & barrier
,
522 typename
std::decay
< Fn
>::type
& function
) mutable {
527 std::forward
< Fn
>( function
)
529 wait_all_simple_impl( barrier
, std::forward
< Fns
>( functions
) ... );
533 // interface function: instantiate barrier, launch tasks, wait for barrier
535 template< typename
... Fns
>
536 void wait_all_simple( Fns
&& ... functions
) {
537 std::size_t count( sizeof ... ( functions
) );
538 // Initialize a barrier(count+1) because we'll immediately wait on it. We
539 // don't want to wake up until 'count' more fibers wait on it. Even though
540 // we'll stick around until the last of them completes, use shared_ptr
541 // anyway because it's easier to be confident about lifespan issues.
542 auto barrier( std::make_shared
< boost::fibers::barrier
>( count
+ 1) );
543 wait_all_simple_impl( barrier
, std::forward
< Fns
>( functions
) ... );
549 Example
was( runner
, "wait_all_simple()", [](){
550 //[wait_all_simple_ex
552 [](){ sleeper("was_long", 150); },
553 [](){ sleeper("was_medium", 100); },
554 [](){ sleeper("was_short", 50); });
558 /*****************************************************************************
559 * when_all, return values
560 *****************************************************************************/
562 // Introduce a channel facade that closes the channel once a specific number
563 // of items has been pushed. This allows an arbitrary consumer to read until
564 // 'closed' without itself having to count items.
565 template< typename T
>
568 nchannel( std::shared_ptr
< boost::fibers::buffered_channel
< T
> > chan
,
578 boost::fibers::channel_op_status
push( T
&& va
) {
579 boost::fibers::channel_op_status ok
=
580 chan_
->push( std::forward
< T
>( va
) );
581 if ( ok
== boost::fibers::channel_op_status::success
&&
583 // after the 'limit_'th successful push, close the channel
590 std::shared_ptr
< boost::fibers::buffered_channel
< T
> > chan_
;
595 // When there's only one function, call this overload
596 //[wait_all_values_impl
597 template< typename T
, typename Fn
>
598 void wait_all_values_impl( std::shared_ptr
< nchannel
< T
> > chan
,
600 boost::fibers::fiber( [chan
, function
](){
601 chan
->push(function());
606 // When there are two or more functions, call this overload
607 template< typename T
, typename Fn0
, typename Fn1
, typename
... Fns
>
608 void wait_all_values_impl( std::shared_ptr
< nchannel
< T
> > chan
,
611 Fns
&& ... functions
) {
612 // process the first function using the single-function overload
613 wait_all_values_impl
< T
>( chan
, std::forward
< Fn0
>( function0
) );
614 // then recur to process the rest
615 wait_all_values_impl
< T
>( chan
,
616 std::forward
< Fn1
>( function1
),
617 std::forward
< Fns
>( functions
) ... );
620 //[wait_all_values_source
621 // Return a shared_ptr<buffered_channel<T>> from which the caller can
622 // retrieve each new result as it arrives, until 'closed'.
623 template< typename Fn
, typename
... Fns
>
624 std::shared_ptr
< boost::fibers::buffered_channel
< typename
std::result_of
< Fn() >::type
> >
625 wait_all_values_source( Fn
&& function
, Fns
&& ... functions
) {
626 std::size_t count( 1 + sizeof ... ( functions
) );
627 typedef typename
std::result_of
< Fn() >::type return_t
;
628 typedef boost::fibers::buffered_channel
< return_t
> channel_t
;
630 auto chanp( std::make_shared
< channel_t
>( 64) );
631 // and make an nchannel facade to close it after 'count' items
632 auto ncp( std::make_shared
< nchannel
< return_t
> >( chanp
, count
) );
633 // pass that nchannel facade to all the relevant fibers
634 wait_all_values_impl
< return_t
>( ncp
,
635 std::forward
< Fn
>( function
),
636 std::forward
< Fns
>( functions
) ... );
637 // then return the channel for consumer
642 // When all passed functions have completed, return vector<T> containing
643 // collected results. Assume that all passed functions have the same return
644 // type. It is simply invalid to pass NO functions.
646 template< typename Fn
, typename
... Fns
>
647 std::vector
< typename
std::result_of
< Fn() >::type
>
648 wait_all_values( Fn
&& function
, Fns
&& ... functions
) {
649 std::size_t count( 1 + sizeof ... ( functions
) );
650 typedef typename
std::result_of
< Fn() >::type return_t
;
651 typedef std::vector
< return_t
> vector_t
;
653 results
.reserve( count
);
656 std::shared_ptr
< boost::fibers::buffered_channel
< return_t
> > chan
=
657 wait_all_values_source( std::forward
< Fn
>( function
),
658 std::forward
< Fns
>( functions
) ... );
659 // fill results vector
661 while ( boost::fibers::channel_op_status::success
== chan
->pop(value
) ) {
662 results
.push_back( value
);
664 // return vector to caller
669 Example
wav( runner
, "wait_all_values()", [](){
670 //[wait_all_values_source_ex
671 std::shared_ptr
< boost::fibers::buffered_channel
< std::string
> > chan
=
672 wait_all_values_source(
673 [](){ return sleeper("wavs_third", 150); },
674 [](){ return sleeper("wavs_second", 100); },
675 [](){ return sleeper("wavs_first", 50); });
677 while ( boost::fibers::channel_op_status::success
== chan
->pop(value
) ) {
678 std::cout
<< "wait_all_values_source() => '" << value
683 //[wait_all_values_ex
684 std::vector
< std::string
> values
=
686 [](){ return sleeper("wav_late", 150); },
687 [](){ return sleeper("wav_middle", 100); },
688 [](){ return sleeper("wav_early", 50); });
690 std::cout
<< "wait_all_values() =>";
691 for ( std::string
const& v
: values
) {
692 std::cout
<< " '" << v
<< "'";
694 std::cout
<< std::endl
;
697 /*****************************************************************************
698 * when_all, throw first exception
699 *****************************************************************************/
700 //[wait_all_until_error_source
701 // Return a shared_ptr<buffered_channel<future<T>>> from which the caller can
702 // get() each new result as it arrives, until 'closed'.
703 template< typename Fn
, typename
... Fns
>
705 boost::fibers::buffered_channel
<
706 boost::fibers::future
<
707 typename
std::result_of
< Fn() >::type
> > >
708 wait_all_until_error_source( Fn
&& function
, Fns
&& ... functions
) {
709 std::size_t count( 1 + sizeof ... ( functions
) );
710 typedef typename
std::result_of
< Fn() >::type return_t
;
711 typedef boost::fibers::future
< return_t
> future_t
;
712 typedef boost::fibers::buffered_channel
< future_t
> channel_t
;
714 auto chanp( std::make_shared
< channel_t
>( 64) );
715 // and make an nchannel facade to close it after 'count' items
716 auto ncp( std::make_shared
< nchannel
< future_t
> >( chanp
, count
) );
717 // pass that nchannel facade to all the relevant fibers
718 wait_first_outcome_impl
< return_t
>( ncp
,
719 std::forward
< Fn
>( function
),
720 std::forward
< Fns
>( functions
) ... );
721 // then return the channel for consumer
726 // When all passed functions have completed, return vector<T> containing
727 // collected results, or throw the first exception thrown by any of the passed
728 // functions. Assume that all passed functions have the same return type. It
729 // is simply invalid to pass NO functions.
730 //[wait_all_until_error
731 template< typename Fn
, typename
... Fns
>
732 std::vector
< typename
std::result_of
< Fn() >::type
>
733 wait_all_until_error( Fn
&& function
, Fns
&& ... functions
) {
734 std::size_t count( 1 + sizeof ... ( functions
) );
735 typedef typename
std::result_of
< Fn() >::type return_t
;
736 typedef typename
boost::fibers::future
< return_t
> future_t
;
737 typedef std::vector
< return_t
> vector_t
;
739 results
.reserve( count
);
743 boost::fibers::buffered_channel
< future_t
> > chan(
744 wait_all_until_error_source( std::forward
< Fn
>( function
),
745 std::forward
< Fns
>( functions
) ... ) );
746 // fill results vector
748 while ( boost::fibers::channel_op_status::success
== chan
->pop( future
) ) {
749 results
.push_back( future
.get() );
751 // return vector to caller
756 Example
waue( runner
, "wait_all_until_error()", [](){
757 //[wait_all_until_error_source_ex
758 typedef boost::fibers::future
< std::string
> future_t
;
759 std::shared_ptr
< boost::fibers::buffered_channel
< future_t
> > chan
=
760 wait_all_until_error_source(
761 [](){ return sleeper("wauess_third", 150); },
762 [](){ return sleeper("wauess_second", 100); },
763 [](){ return sleeper("wauess_first", 50); });
765 while ( boost::fibers::channel_op_status::success
== chan
->pop( future
) ) {
766 std::string
value( future
.get() );
767 std::cout
<< "wait_all_until_error_source(success) => '" << value
772 chan
= wait_all_until_error_source(
773 [](){ return sleeper("wauesf_third", 150); },
774 [](){ return sleeper("wauesf_second", 100, true); },
775 [](){ return sleeper("wauesf_first", 50); });
776 //[wait_all_until_error_ex
780 while ( boost::fibers::channel_op_status::success
== chan
->pop( future
) ) {
781 std::string
value( future
.get() );
782 std::cout
<< "wait_all_until_error_source(fail) => '" << value
785 } catch ( std::exception
const& e
) {
788 std::cout
<< "wait_all_until_error_source(fail) threw '" << thrown
794 std::vector
< std::string
> values
= wait_all_until_error(
795 [](){ return sleeper("waue_late", 150); },
796 [](){ return sleeper("waue_middle", 100, true); },
797 [](){ return sleeper("waue_early", 50); });
799 std::cout
<< "wait_all_until_error(fail) =>";
800 for ( std::string
const& v
: values
) {
801 std::cout
<< " '" << v
<< "'";
803 std::cout
<< std::endl
;
805 } catch ( std::exception
const& e
) {
808 std::cout
<< "wait_all_until_error(fail) threw '" << thrown
813 /*****************************************************************************
814 * when_all, collect exceptions
815 *****************************************************************************/
816 // When all passed functions have succeeded, return vector<T> containing
817 // collected results, or throw exception_list containing all exceptions thrown
818 // by any of the passed functions. Assume that all passed functions have the
819 // same return type. It is simply invalid to pass NO functions.
820 //[wait_all_collect_errors
821 template< typename Fn
, typename
... Fns
>
822 std::vector
< typename
std::result_of
< Fn() >::type
>
823 wait_all_collect_errors( Fn
&& function
, Fns
&& ... functions
) {
824 std::size_t count( 1 + sizeof ... ( functions
) );
825 typedef typename
std::result_of
< Fn() >::type return_t
;
826 typedef typename
boost::fibers::future
< return_t
> future_t
;
827 typedef std::vector
< return_t
> vector_t
;
829 results
.reserve( count
);
830 exception_list
exceptions("wait_all_collect_errors() exceptions");
834 boost::fibers::buffered_channel
< future_t
> > chan(
835 wait_all_until_error_source( std::forward
< Fn
>( function
),
836 std::forward
< Fns
>( functions
) ... ) );
837 // fill results and/or exceptions vectors
839 while ( boost::fibers::channel_op_status::success
== chan
->pop( future
) ) {
840 std::exception_ptr exp
= future
.get_exception_ptr();
842 results
.push_back( future
.get() );
844 exceptions
.add( exp
);
847 // if there were any exceptions, throw
848 if ( exceptions
.size() ) {
851 // no exceptions: return vector to caller
856 Example
wace( runner
, "wait_all_collect_errors()", [](){
857 std::vector
< std::string
> values
= wait_all_collect_errors(
858 [](){ return sleeper("waces_late", 150); },
859 [](){ return sleeper("waces_middle", 100); },
860 [](){ return sleeper("waces_early", 50); });
861 std::cout
<< "wait_all_collect_errors(success) =>";
862 for ( std::string
const& v
: values
) {
863 std::cout
<< " '" << v
<< "'";
865 std::cout
<< std::endl
;
868 std::size_t errors
= 0;
870 values
= wait_all_collect_errors(
871 [](){ return sleeper("wacef_late", 150, true); },
872 [](){ return sleeper("wacef_middle", 100, true); },
873 [](){ return sleeper("wacef_early", 50); });
874 std::cout
<< "wait_all_collect_errors(fail) =>";
875 for ( std::string
const& v
: values
) {
876 std::cout
<< " '" << v
<< "'";
878 std::cout
<< std::endl
;
879 } catch ( exception_list
const& e
) {
882 } catch ( std::exception
const& e
) {
885 std::cout
<< "wait_all_collect_errors(fail) threw '" << thrown
886 << "': " << errors
<< " errors" << std::endl
;
889 /*****************************************************************************
890 * when_all, heterogeneous
891 *****************************************************************************/
892 //[wait_all_members_get
893 template< typename Result
, typename
... Futures
>
894 Result
wait_all_members_get( Futures
&& ... futures
) {
895 // Fetch the results from the passed futures into Result's initializer
896 // list. It's true that the get() calls here will block the implicit
897 // iteration over futures -- but that doesn't matter because we won't be
898 // done until the slowest of them finishes anyway. As results are
899 // processed in argument-list order rather than order of completion, the
900 // leftmost get() to throw an exception will cause that exception to
901 // propagate to the caller.
902 return Result
{ futures
.get() ... };
907 // Explicitly pass Result. This can be any type capable of being initialized
908 // from the results of the passed functions, such as a struct.
909 template< typename Result
, typename
... Fns
>
910 Result
wait_all_members( Fns
&& ... functions
) {
911 // Run each of the passed functions on a separate fiber, passing all their
912 // futures to helper function for processing.
913 return wait_all_members_get
< Result
>(
914 boost::fibers::async( std::forward
< Fns
>( functions
) ) ... );
918 // used by following example
925 friend std::ostream
& operator<<( std::ostream
& out
, Data
const& data
)/*=;
929 return out
<< "Data{str='" << data
.str
<< "', inexact=" << data
.inexact
930 << ", exact=" << data
.exact
<< "}";
937 Example
wam( runner
, "wait_all_members()", [](){
938 //[wait_all_members_data_ex
939 Data data
= wait_all_members
< Data
>(
940 [](){ return sleeper("wams_left", 100); },
941 [](){ return sleeper(3.14, 150); },
942 [](){ return sleeper(17, 50); });
943 std::cout
<< "wait_all_members<Data>(success) => " << data
<< std::endl
;
948 data
= wait_all_members
< Data
>(
949 [](){ return sleeper("wamf_left", 100, true); },
950 [](){ return sleeper(3.14, 150); },
951 [](){ return sleeper(17, 50, true); });
952 std::cout
<< "wait_all_members<Data>(fail) => " << data
<< std::endl
;
953 } catch ( std::exception
const& e
) {
956 std::cout
<< "wait_all_members<Data>(fail) threw '" << thrown
959 //[wait_all_members_vector_ex
960 // If we don't care about obtaining results as soon as they arrive, and we
961 // prefer a result vector in passed argument order rather than completion
962 // order, wait_all_members() is another possible implementation of
963 // wait_all_until_error().
964 auto strings
= wait_all_members
< std::vector
< std::string
> >(
965 [](){ return sleeper("wamv_left", 150); },
966 [](){ return sleeper("wamv_middle", 100); },
967 [](){ return sleeper("wamv_right", 50); });
968 std::cout
<< "wait_all_members<vector>() =>";
969 for ( std::string
const& str
: strings
) {
970 std::cout
<< " '" << str
<< "'";
972 std::cout
<< std::endl
;
977 /*****************************************************************************
979 *****************************************************************************/
980 int main( int argc
, char *argv
[]) {
982 std::cout
<< "done." << std::endl
;