]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/fiber/examples/wait_stuff.cpp
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / boost / libs / fiber / examples / wait_stuff.cpp
CommitLineData
7c673cae
FG
1// Copyright Nat Goodspeed 2015.
2// Distributed under the Boost Software License, Version 1.0.
3// (See accompanying file LICENSE_1_0.txt or copy at
4// http://www.boost.org/LICENSE_1_0.txt)
5//
6#include <algorithm>
7#include <cassert>
8#include <chrono>
9#include <iostream>
10#include <memory>
11#include <sstream>
12#include <string>
13#include <type_traits>
14#include <utility>
15#include <vector>
16
17#include <boost/fiber/all.hpp>
18#include <boost/noncopyable.hpp>
19#include <boost/variant/variant.hpp>
20#include <boost/variant/get.hpp>
21
22// These are wait_something() functions rather than when_something()
23// functions. A big part of the point of the Fiber library is to model
24// sequencing using the processor's instruction pointer rather than chains of
25// callbacks. The future-oriented when_all() / when_any() functions are still
26// based on chains of callbacks. With Fiber, we can do better.
27
28/*****************************************************************************
29* Verbose
30*****************************************************************************/
31class Verbose: boost::noncopyable {
32public:
33 Verbose( std::string const& d):
34 desc( d) {
35 std::cout << desc << " start" << std::endl;
36 }
37
38 ~Verbose() {
39 std::cout << desc << " stop" << std::endl;
40 }
41
42private:
43 const std::string desc;
44};
45
46/*****************************************************************************
47* Runner and Example
48*****************************************************************************/
49// collect and ultimately run every Example
50class Runner {
51 typedef std::vector< std::pair< std::string, std::function< void() > > > function_list;
52
53public:
54 void add( std::string const& desc, std::function< void() > const& func) {
55 functions_.push_back( function_list::value_type( desc, func) );
56 }
57
58 void run() {
59 for ( function_list::value_type const& pair : functions_) {
60 Verbose v( pair.first);
61 pair.second();
62 }
63 }
64
65private:
66 function_list functions_;
67};
68
69Runner runner;
70
71// Example allows us to embed Runner::add() calls at module scope
72struct Example {
73 Example( Runner & runner, std::string const& desc, std::function< void() > const& func) {
74 runner.add( desc, func);
75 }
76};
77
78/*****************************************************************************
79* example task functions
80*****************************************************************************/
81//[wait_sleeper
82template< typename T >
83T sleeper_impl( T item, int ms, bool thrw = false) {
84 std::ostringstream descb, funcb;
85 descb << item;
86 std::string desc( descb.str() );
87 funcb << " sleeper(" << item << ")";
88 Verbose v( funcb.str() );
89
90 boost::this_fiber::sleep_for( std::chrono::milliseconds( ms) );
91 if ( thrw) {
92 throw std::runtime_error( desc);
93 }
94 return item;
95}
96//]
97
98inline
99std::string sleeper( std::string const& item, int ms, bool thrw = false) {
100 return sleeper_impl( item, ms, thrw);
101}
102
103inline
104double sleeper( double item, int ms, bool thrw = false) {
105 return sleeper_impl( item, ms, thrw);
106}
107
108inline
109int sleeper(int item, int ms, bool thrw = false) {
110 return sleeper_impl( item, ms, thrw);
111}
112
113/*****************************************************************************
114* Done
115*****************************************************************************/
116//[wait_done
117// Wrap canonical pattern for condition_variable + bool flag
118struct Done {
119private:
120 boost::fibers::condition_variable cond;
121 boost::fibers::mutex mutex;
122 bool ready = false;
123
124public:
125 typedef std::shared_ptr< Done > ptr;
126
127 void wait() {
128 std::unique_lock< boost::fibers::mutex > lock( mutex);
129 cond.wait( lock, [this](){ return ready; });
130 }
131
132 void notify() {
133 {
134 std::unique_lock< boost::fibers::mutex > lock( mutex);
135 ready = true;
136 } // release mutex
137 cond.notify_one();
138 }
139};
140//]
141
142/*****************************************************************************
143* when_any, simple completion
144*****************************************************************************/
145//[wait_first_simple_impl
146// Degenerate case: when there are no functions to wait for, return
147// immediately.
148void wait_first_simple_impl( Done::ptr) {
149}
150
151// When there's at least one function to wait for, launch it and recur to
152// process the rest.
153template< typename Fn, typename ... Fns >
154void wait_first_simple_impl( Done::ptr done, Fn && function, Fns && ... functions) {
155 boost::fibers::fiber( [done, function](){
156 function();
157 done->notify();
158 }).detach();
159 wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
160}
161//]
162
163// interface function: instantiate Done, launch tasks, wait for Done
164//[wait_first_simple
165template< typename ... Fns >
166void wait_first_simple( Fns && ... functions) {
167 // Use shared_ptr because each function's fiber will bind it separately,
168 // and we're going to return before the last of them completes.
169 auto done( std::make_shared< Done >() );
170 wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
171 done->wait();
172}
173//]
174
175// example usage
176Example wfs( runner, "wait_first_simple()", [](){
177//[wait_first_simple_ex
178 wait_first_simple(
179 [](){ sleeper("wfs_long", 150); },
180 [](){ sleeper("wfs_medium", 100); },
181 [](){ sleeper("wfs_short", 50); });
182//]
183});
184
185/*****************************************************************************
186* when_any, return value
187*****************************************************************************/
188// When there's only one function, call this overload
189//[wait_first_value_impl
190template< typename T, typename Fn >
191void wait_first_value_impl( std::shared_ptr< boost::fibers::unbounded_channel< T > > channel,
192 Fn && function) {
193 boost::fibers::fiber( [channel, function](){
194 // Ignore channel_op_status returned by push():
195 // might be closed; we simply don't care.
196 channel->push( function() );
197 }).detach();
198}
199//]
200
201// When there are two or more functions, call this overload
202template< typename T, typename Fn0, typename Fn1, typename ... Fns >
203void wait_first_value_impl( std::shared_ptr< boost::fibers::unbounded_channel< T > > channel,
204 Fn0 && function0,
205 Fn1 && function1,
206 Fns && ... functions) {
207 // process the first function using the single-function overload
208 wait_first_value_impl< T >( channel,
209 std::forward< Fn0 >( function0) );
210 // then recur to process the rest
211 wait_first_value_impl< T >( channel,
212 std::forward< Fn1 >( function1),
213 std::forward< Fns >( functions) ... );
214}
215
216//[wait_first_value
217// Assume that all passed functions have the same return type. The return type
218// of wait_first_value() is the return type of the first passed function. It is
219// simply invalid to pass NO functions.
220template< typename Fn, typename ... Fns >
221typename std::result_of< Fn() >::type
222wait_first_value( Fn && function, Fns && ... functions) {
223 typedef typename std::result_of< Fn() >::type return_t;
224 typedef boost::fibers::unbounded_channel< return_t > channel_t;
225 auto channelp( std::make_shared< channel_t >() );
226 // launch all the relevant fibers
227 wait_first_value_impl< return_t >( channelp,
228 std::forward< Fn >( function),
229 std::forward< Fns >( functions) ... );
230 // retrieve the first value
231 return_t value( channelp->value_pop() );
232 // close the channel: no subsequent push() has to succeed
233 channelp->close();
234 return value;
235}
236//]
237
238// example usage
239Example wfv( runner, "wait_first_value()", [](){
240//[wait_first_value_ex
241 std::string result = wait_first_value(
242 [](){ return sleeper("wfv_third", 150); },
243 [](){ return sleeper("wfv_second", 100); },
244 [](){ return sleeper("wfv_first", 50); });
245 std::cout << "wait_first_value() => " << result << std::endl;
246 assert(result == "wfv_first");
247//]
248});
249
250/*****************************************************************************
251* when_any, produce first outcome, whether result or exception
252*****************************************************************************/
253// When there's only one function, call this overload.
254//[wait_first_outcome_impl
255template< typename T, typename CHANNELP, typename Fn >
256void wait_first_outcome_impl( CHANNELP channel, Fn && function) {
257 boost::fibers::fiber(
258 // Use std::bind() here for C++11 compatibility. C++11 lambda capture
259 // can't move a move-only Fn type, but bind() can. Let bind() move the
260 // channel pointer and the function into the bound object, passing
261 // references into the lambda.
262 std::bind(
263 []( CHANNELP & channel,
264 typename std::decay< Fn >::type & function) {
265 // Instantiate a packaged_task to capture any exception thrown by
266 // function.
267 boost::fibers::packaged_task< T() > task( function);
268 // Immediately run this packaged_task on same fiber. We want
269 // function() to have completed BEFORE we push the future.
270 task();
271 // Pass the corresponding future to consumer. Ignore
272 // channel_op_status returned by push(): might be closed; we
273 // simply don't care.
274 channel->push( task.get_future() );
275 },
276 channel,
277 std::forward< Fn >( function)
278 )).detach();
279}
280//]
281
282// When there are two or more functions, call this overload
// Two-or-more-functions case: peel off the first function, then recur.
template< typename T, typename CHANNELP, typename Fn0, typename Fn1, typename ... Fns >
void wait_first_outcome_impl( CHANNELP channel,
                              Fn0 && function0,
                              Fn1 && function1,
                              Fns && ... functions) {
    // head: the single-function overload launches function0
    wait_first_outcome_impl< T >(
        channel,
        std::forward< Fn0 >( function0) );
    // tail: function1 and anything after it
    wait_first_outcome_impl< T >(
        channel,
        std::forward< Fn1 >( function1),
        std::forward< Fns >( functions) ... );
}
296
297// Assume that all passed functions have the same return type. The return type
298// of wait_first_outcome() is the return type of the first passed function. It is
299// simply invalid to pass NO functions.
300//[wait_first_outcome
301template< typename Fn, typename ... Fns >
302typename std::result_of< Fn() >::type
303wait_first_outcome( Fn && function, Fns && ... functions) {
304 // In this case, the value we pass through the channel is actually a
305 // future -- which is already ready. future can carry either a value or an
306 // exception.
307 typedef typename std::result_of< Fn() >::type return_t;
308 typedef boost::fibers::future< return_t > future_t;
309 typedef boost::fibers::unbounded_channel< future_t > channel_t;
310 auto channelp(std::make_shared< channel_t >() );
311 // launch all the relevant fibers
312 wait_first_outcome_impl< return_t >( channelp,
313 std::forward< Fn >( function),
314 std::forward< Fns >( functions) ... );
315 // retrieve the first future
316 future_t future( channelp->value_pop() );
317 // close the channel: no subsequent push() has to succeed
318 channelp->close();
319 // either return value or throw exception
320 return future.get();
321}
322//]
323
324// example usage
325Example wfo( runner, "wait_first_outcome()", [](){
326//[wait_first_outcome_ex
327 std::string result = wait_first_outcome(
328 [](){ return sleeper("wfos_first", 50); },
329 [](){ return sleeper("wfos_second", 100); },
330 [](){ return sleeper("wfos_third", 150); });
331 std::cout << "wait_first_outcome(success) => " << result << std::endl;
332 assert(result == "wfos_first");
333
334 std::string thrown;
335 try {
336 result = wait_first_outcome(
337 [](){ return sleeper("wfof_first", 50, true); },
338 [](){ return sleeper("wfof_second", 100); },
339 [](){ return sleeper("wfof_third", 150); });
340 } catch ( std::exception const& e) {
341 thrown = e.what();
342 }
343 std::cout << "wait_first_outcome(fail) threw '" << thrown
344 << "'" << std::endl;
345 assert(thrown == "wfof_first");
346//]
347});
348
349/*****************************************************************************
350* when_any, collect exceptions until success; throw exception_list if no
351* success
352*****************************************************************************/
353// define an exception to aggregate exception_ptrs; prefer
354// std::exception_list (N4407 et al.) once that becomes available
355//[exception_list
// Exception type that aggregates any number of std::exception_ptr values.
// what() reports the message passed to the constructor; the collected
// exceptions are reached through size() / begin() / end().
class exception_list : public std::runtime_error {
public:
    using bundle_t = std::vector< std::exception_ptr >;

    // N4407 proposed std::exception_list API
    using iterator = bundle_t::const_iterator;

    // what: message reported by what(); the list starts out empty
    exception_list( std::string const& what) :
        std::runtime_error( what),
        bundle_() {
    }

    // number of collected exceptions
    std::size_t size() const noexcept {
        return bundle_.size();
    }

    // read-only iteration over the collected exceptions, in insertion order
    iterator begin() const noexcept {
        return bundle_.cbegin();
    }

    iterator end() const noexcept {
        return bundle_.cend();
    }

    // extension to populate: append one more exception
    void add( std::exception_ptr ep) {
        bundle_.emplace_back( ep);
    }

private:
    bundle_t bundle_;
};
387//]
388
389// Assume that all passed functions have the same return type. The return type
390// of wait_first_success() is the return type of the first passed function. It is
391// simply invalid to pass NO functions.
392//[wait_first_success
393template< typename Fn, typename ... Fns >
394typename std::result_of< Fn() >::type
395wait_first_success( Fn && function, Fns && ... functions) {
396 std::size_t count( 1 + sizeof ... ( functions) );
397 // In this case, the value we pass through the channel is actually a
398 // future -- which is already ready. future can carry either a value or an
399 // exception.
400 typedef typename std::result_of< typename std::decay< Fn >::type() >::type return_t;
401 typedef boost::fibers::future< return_t > future_t;
402 typedef boost::fibers::unbounded_channel< future_t > channel_t;
403 auto channelp( std::make_shared< channel_t >() );
404 // launch all the relevant fibers
405 wait_first_outcome_impl< return_t >( channelp,
406 std::forward< Fn >( function),
407 std::forward< Fns >( functions) ... );
408 // instantiate exception_list, just in case
409 exception_list exceptions("wait_first_success() produced only errors");
410 // retrieve up to 'count' results -- but stop there!
411 for ( std::size_t i = 0; i < count; ++i) {
412 // retrieve the next future
413 future_t future( channelp->value_pop() );
414 // retrieve exception_ptr if any
415 std::exception_ptr error( future.get_exception_ptr() );
416 // if no error, then yay, return value
417 if ( ! error) {
418 // close the channel: no subsequent push() has to succeed
419 channelp->close();
420 // show caller the value we got
421 return future.get();
422 }
423
424 // error is non-null: collect
425 exceptions.add( error);
426 }
427 // We only arrive here when every passed function threw an exception.
428 // Throw our collection to inform caller.
429 throw exceptions;
430}
431//]
432
433// example usage
434Example wfss( runner, "wait_first_success()", [](){
435//[wait_first_success_ex
436 std::string result = wait_first_success(
437 [](){ return sleeper("wfss_first", 50, true); },
438 [](){ return sleeper("wfss_second", 100); },
439 [](){ return sleeper("wfss_third", 150); });
440 std::cout << "wait_first_success(success) => " << result << std::endl;
441 assert(result == "wfss_second");
442//]
443
444 std::string thrown;
445 std::size_t count = 0;
446 try {
447 result = wait_first_success(
448 [](){ return sleeper("wfsf_first", 50, true); },
449 [](){ return sleeper("wfsf_second", 100, true); },
450 [](){ return sleeper("wfsf_third", 150, true); });
451 } catch ( exception_list const& e) {
452 thrown = e.what();
453 count = e.size();
454 } catch ( std::exception const& e) {
455 thrown = e.what();
456 }
457 std::cout << "wait_first_success(fail) threw '" << thrown << "': "
458 << count << " errors" << std::endl;
459 assert(thrown == "wait_first_success() produced only errors");
460 assert(count == 3);
461});
462
463/*****************************************************************************
464* when_any, heterogeneous
465*****************************************************************************/
466//[wait_first_value_het
467// No need to break out the first Fn for interface function: let the compiler
468// complain if empty.
469// Our functions have different return types, and we might have to return any
470// of them. Use a variant, expanding std::result_of<Fn()>::type for each Fn in
471// parameter pack.
472template< typename ... Fns >
473boost::variant< typename std::result_of< Fns() >::type ... >
474wait_first_value_het( Fns && ... functions) {
475 // Use unbounded_channel<boost::variant<T1, T2, ...>>; see remarks above.
476 typedef boost::variant< typename std::result_of< Fns() >::type ... > return_t;
477 typedef boost::fibers::unbounded_channel< return_t > channel_t;
478 auto channelp( std::make_shared< channel_t >() );
479 // launch all the relevant fibers
480 wait_first_value_impl< return_t >( channelp,
481 std::forward< Fns >( functions) ... );
482 // retrieve the first value
483 return_t value( channelp->value_pop() );
484 // close the channel: no subsequent push() has to succeed
485 channelp->close();
486 return value;
487}
488//]
489
490// example usage
491Example wfvh( runner, "wait_first_value_het()", [](){
492//[wait_first_value_het_ex
493 boost::variant< std::string, double, int > result =
494 wait_first_value_het(
495 [](){ return sleeper("wfvh_third", 150); },
496 [](){ return sleeper(3.14, 100); },
497 [](){ return sleeper(17, 50); });
498 std::cout << "wait_first_value_het() => " << result << std::endl;
499 assert(boost::get< int >( result) == 17);
500//]
501});
502
503/*****************************************************************************
504* when_all, simple completion
505*****************************************************************************/
506// Degenerate case: when there are no functions to wait for, return
507// immediately.
508void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier >) {
509}
510
511// When there's at least one function to wait for, launch it and recur to
512// process the rest.
513//[wait_all_simple_impl
514template< typename Fn, typename ... Fns >
515void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier > barrier,
516 Fn && function, Fns && ... functions) {
517 boost::fibers::fiber(
518 std::bind(
519 []( std::shared_ptr< boost::fibers::barrier > & barrier,
520 typename std::decay< Fn >::type & function) mutable {
521 function();
522 barrier->wait();
523 },
524 barrier,
525 std::forward< Fn >( function)
526 )).detach();
527 wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
528}
529//]
530
531// interface function: instantiate barrier, launch tasks, wait for barrier
532//[wait_all_simple
533template< typename ... Fns >
534void wait_all_simple( Fns && ... functions) {
535 std::size_t count( sizeof ... ( functions) );
536 // Initialize a barrier(count+1) because we'll immediately wait on it. We
537 // don't want to wake up until 'count' more fibers wait on it. Even though
538 // we'll stick around until the last of them completes, use shared_ptr
539 // anyway because it's easier to be confident about lifespan issues.
540 auto barrier( std::make_shared< boost::fibers::barrier >( count + 1) );
541 wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
542 barrier->wait();
543}
544//]
545
546// example usage
547Example was( runner, "wait_all_simple()", [](){
548//[wait_all_simple_ex
549 wait_all_simple(
550 [](){ sleeper("was_long", 150); },
551 [](){ sleeper("was_medium", 100); },
552 [](){ sleeper("was_short", 50); });
553//]
554});
555
556/*****************************************************************************
557* when_all, return values
558*****************************************************************************/
559//[wait_nchannel
560// Introduce a channel facade that closes the channel once a specific number
561// of items has been pushed. This allows an arbitrary consumer to read until
562// 'closed' without itself having to count items.
563template< typename T >
564class nchannel {
565public:
566 nchannel( std::shared_ptr< boost::fibers::unbounded_channel< T > > cp,
567 std::size_t lm):
568 channel_( cp),
569 limit_( lm) {
570 assert(channel_);
571 if ( 0 == limit_) {
572 channel_->close();
573 }
574 }
575
576 boost::fibers::channel_op_status push( T && va) {
577 boost::fibers::channel_op_status ok =
578 channel_->push( std::forward< T >( va) );
579 if ( ok == boost::fibers::channel_op_status::success &&
580 --limit_ == 0) {
581 // after the 'limit_'th successful push, close the channel
582 channel_->close();
583 }
584 return ok;
585 }
586
587private:
588 std::shared_ptr< boost::fibers::unbounded_channel< T > > channel_;
589 std::size_t limit_;
590};
591//]
592
593// When there's only one function, call this overload
594//[wait_all_values_impl
595template< typename T, typename Fn >
596void wait_all_values_impl( std::shared_ptr< nchannel< T > > channel,
597 Fn && function) {
598 boost::fibers::fiber( [channel, function](){
599 channel->push(function());
600 }).detach();
601}
602//]
603
604// When there are two or more functions, call this overload
605template< typename T, typename Fn0, typename Fn1, typename ... Fns >
606void wait_all_values_impl( std::shared_ptr< nchannel< T > > channel,
607 Fn0 && function0,
608 Fn1 && function1,
609 Fns && ... functions) {
610 // process the first function using the single-function overload
611 wait_all_values_impl< T >( channel, std::forward< Fn0 >( function0) );
612 // then recur to process the rest
613 wait_all_values_impl< T >( channel,
614 std::forward< Fn1 >( function1),
615 std::forward< Fns >( functions) ... );
616}
617
618//[wait_all_values_source
619// Return a shared_ptr<unbounded_channel<T>> from which the caller can
620// retrieve each new result as it arrives, until 'closed'.
621template< typename Fn, typename ... Fns >
622std::shared_ptr< boost::fibers::unbounded_channel< typename std::result_of< Fn() >::type > >
623wait_all_values_source( Fn && function, Fns && ... functions) {
624 std::size_t count( 1 + sizeof ... ( functions) );
625 typedef typename std::result_of< Fn() >::type return_t;
626 typedef boost::fibers::unbounded_channel< return_t > channel_t;
627 // make the channel
628 auto channelp( std::make_shared< channel_t >() );
629 // and make an nchannel facade to close it after 'count' items
630 auto ncp( std::make_shared< nchannel< return_t > >( channelp, count) );
631 // pass that nchannel facade to all the relevant fibers
632 wait_all_values_impl< return_t >( ncp,
633 std::forward< Fn >( function),
634 std::forward< Fns >( functions) ... );
635 // then return the channel for consumer
636 return channelp;
637}
638//]
639
640// When all passed functions have completed, return vector<T> containing
641// collected results. Assume that all passed functions have the same return
642// type. It is simply invalid to pass NO functions.
643//[wait_all_values
644template< typename Fn, typename ... Fns >
645std::vector< typename std::result_of< Fn() >::type >
646wait_all_values( Fn && function, Fns && ... functions) {
647 std::size_t count( 1 + sizeof ... ( functions) );
648 typedef typename std::result_of< Fn() >::type return_t;
649 typedef std::vector< return_t > vector_t;
650 vector_t results;
651 results.reserve( count);
652
653 // get channel
654 std::shared_ptr< boost::fibers::unbounded_channel< return_t > > channel =
655 wait_all_values_source( std::forward< Fn >( function),
656 std::forward< Fns >( functions) ... );
657 // fill results vector
658 return_t value;
659 while ( boost::fibers::channel_op_status::success == channel->pop(value) ) {
660 results.push_back( value);
661 }
662 // return vector to caller
663 return results;
664}
665//]
666
667Example wav( runner, "wait_all_values()", [](){
668//[wait_all_values_source_ex
669 std::shared_ptr< boost::fibers::unbounded_channel< std::string > > channel =
670 wait_all_values_source(
671 [](){ return sleeper("wavs_third", 150); },
672 [](){ return sleeper("wavs_second", 100); },
673 [](){ return sleeper("wavs_first", 50); });
674 std::string value;
675 while ( boost::fibers::channel_op_status::success == channel->pop(value) ) {
676 std::cout << "wait_all_values_source() => '" << value
677 << "'" << std::endl;
678 }
679//]
680
681//[wait_all_values_ex
682 std::vector< std::string > values =
683 wait_all_values(
684 [](){ return sleeper("wav_late", 150); },
685 [](){ return sleeper("wav_middle", 100); },
686 [](){ return sleeper("wav_early", 50); });
687//]
688 std::cout << "wait_all_values() =>";
689 for ( std::string const& v : values) {
690 std::cout << " '" << v << "'";
691 }
692 std::cout << std::endl;
693});
694
695/*****************************************************************************
696* when_all, throw first exception
697*****************************************************************************/
698//[wait_all_until_error_source
699// Return a shared_ptr<unbounded_channel<future<T>>> from which the caller can
700// get() each new result as it arrives, until 'closed'.
701template< typename Fn, typename ... Fns >
702std::shared_ptr<
703 boost::fibers::unbounded_channel<
704 boost::fibers::future<
705 typename std::result_of< Fn() >::type > > >
706wait_all_until_error_source( Fn && function, Fns && ... functions) {
707 std::size_t count( 1 + sizeof ... ( functions) );
708 typedef typename std::result_of< Fn() >::type return_t;
709 typedef boost::fibers::future< return_t > future_t;
710 typedef boost::fibers::unbounded_channel< future_t > channel_t;
711 // make the channel
712 auto channelp( std::make_shared< channel_t >() );
713 // and make an nchannel facade to close it after 'count' items
714 auto ncp( std::make_shared< nchannel< future_t > >( channelp, count) );
715 // pass that nchannel facade to all the relevant fibers
716 wait_first_outcome_impl< return_t >( ncp,
717 std::forward< Fn >( function),
718 std::forward< Fns >( functions) ... );
719 // then return the channel for consumer
720 return channelp;
721}
722//]
723
724// When all passed functions have completed, return vector<T> containing
725// collected results, or throw the first exception thrown by any of the passed
726// functions. Assume that all passed functions have the same return type. It
727// is simply invalid to pass NO functions.
728//[wait_all_until_error
729template< typename Fn, typename ... Fns >
730std::vector< typename std::result_of< Fn() >::type >
731wait_all_until_error( Fn && function, Fns && ... functions) {
732 std::size_t count( 1 + sizeof ... ( functions) );
733 typedef typename std::result_of< Fn() >::type return_t;
734 typedef typename boost::fibers::future< return_t > future_t;
735 typedef std::vector< return_t > vector_t;
736 vector_t results;
737 results.reserve( count);
738
739 // get channel
740 std::shared_ptr<
741 boost::fibers::unbounded_channel< future_t > > channel(
742 wait_all_until_error_source( std::forward< Fn >( function),
743 std::forward< Fns >( functions) ... ) );
744 // fill results vector
745 future_t future;
746 while ( boost::fibers::channel_op_status::success == channel->pop( future) ) {
747 results.push_back( future.get() );
748 }
749 // return vector to caller
750 return results;
751}
752//]
753
754Example waue( runner, "wait_all_until_error()", [](){
755//[wait_all_until_error_source_ex
756 typedef boost::fibers::future< std::string > future_t;
757 std::shared_ptr< boost::fibers::unbounded_channel< future_t > > channel =
758 wait_all_until_error_source(
759 [](){ return sleeper("wauess_third", 150); },
760 [](){ return sleeper("wauess_second", 100); },
761 [](){ return sleeper("wauess_first", 50); });
762 future_t future;
763 while ( boost::fibers::channel_op_status::success == channel->pop( future) ) {
764 std::string value( future.get() );
765 std::cout << "wait_all_until_error_source(success) => '" << value
766 << "'" << std::endl;
767 }
768//]
769
770 channel = wait_all_until_error_source(
771 [](){ return sleeper("wauesf_third", 150); },
772 [](){ return sleeper("wauesf_second", 100, true); },
773 [](){ return sleeper("wauesf_first", 50); });
774//[wait_all_until_error_ex
775 std::string thrown;
776//<-
777 try {
778 while ( boost::fibers::channel_op_status::success == channel->pop( future) ) {
779 std::string value( future.get() );
780 std::cout << "wait_all_until_error_source(fail) => '" << value
781 << "'" << std::endl;
782 }
783 } catch ( std::exception const& e) {
784 thrown = e.what();
785 }
786 std::cout << "wait_all_until_error_source(fail) threw '" << thrown
787 << "'" << std::endl;
788
789 thrown.clear();
790//->
791 try {
792 std::vector< std::string > values = wait_all_until_error(
793 [](){ return sleeper("waue_late", 150); },
794 [](){ return sleeper("waue_middle", 100, true); },
795 [](){ return sleeper("waue_early", 50); });
796//<-
797 std::cout << "wait_all_until_error(fail) =>";
798 for ( std::string const& v : values) {
799 std::cout << " '" << v << "'";
800 }
801 std::cout << std::endl;
802//->
803 } catch ( std::exception const& e) {
804 thrown = e.what();
805 }
806 std::cout << "wait_all_until_error(fail) threw '" << thrown
807 << "'" << std::endl;
808//]
809});
810
811/*****************************************************************************
812* when_all, collect exceptions
813*****************************************************************************/
814// When all passed functions have succeeded, return vector<T> containing
815// collected results, or throw exception_list containing all exceptions thrown
816// by any of the passed functions. Assume that all passed functions have the
817// same return type. It is simply invalid to pass NO functions.
818//[wait_all_collect_errors
819template< typename Fn, typename ... Fns >
820std::vector< typename std::result_of< Fn() >::type >
821wait_all_collect_errors( Fn && function, Fns && ... functions) {
822 std::size_t count( 1 + sizeof ... ( functions) );
823 typedef typename std::result_of< Fn() >::type return_t;
824 typedef typename boost::fibers::future< return_t > future_t;
825 typedef std::vector< return_t > vector_t;
826 vector_t results;
827 results.reserve( count);
828 exception_list exceptions("wait_all_collect_errors() exceptions");
829
830 // get channel
831 std::shared_ptr<
832 boost::fibers::unbounded_channel< future_t > > channel(
833 wait_all_until_error_source( std::forward< Fn >( function),
834 std::forward< Fns >( functions) ... ) );
835 // fill results and/or exceptions vectors
836 future_t future;
837 while ( boost::fibers::channel_op_status::success == channel->pop( future) ) {
838 std::exception_ptr exp = future.get_exception_ptr();
839 if ( ! exp) {
840 results.push_back( future.get() );
841 } else {
842 exceptions.add( exp);
843 }
844 }
845 // if there were any exceptions, throw
846 if ( exceptions.size() ) {
847 throw exceptions;
848 }
849 // no exceptions: return vector to caller
850 return results;
851}
852//]
853
854Example wace( runner, "wait_all_collect_errors()", [](){
855 std::vector< std::string > values = wait_all_collect_errors(
856 [](){ return sleeper("waces_late", 150); },
857 [](){ return sleeper("waces_middle", 100); },
858 [](){ return sleeper("waces_early", 50); });
859 std::cout << "wait_all_collect_errors(success) =>";
860 for ( std::string const& v : values) {
861 std::cout << " '" << v << "'";
862 }
863 std::cout << std::endl;
864
865 std::string thrown;
866 std::size_t errors = 0;
867 try {
868 values = wait_all_collect_errors(
869 [](){ return sleeper("wacef_late", 150, true); },
870 [](){ return sleeper("wacef_middle", 100, true); },
871 [](){ return sleeper("wacef_early", 50); });
872 std::cout << "wait_all_collect_errors(fail) =>";
873 for ( std::string const& v : values) {
874 std::cout << " '" << v << "'";
875 }
876 std::cout << std::endl;
877 } catch ( exception_list const& e) {
878 thrown = e.what();
879 errors = e.size();
880 } catch ( std::exception const& e) {
881 thrown = e.what();
882 }
883 std::cout << "wait_all_collect_errors(fail) threw '" << thrown
884 << "': " << errors << " errors" << std::endl;
885});
886
887/*****************************************************************************
888* when_all, heterogeneous
889*****************************************************************************/
//[wait_all_members_get
template< typename Result, typename ... Futures >
Result wait_all_members_get( Futures && ... pending) {
    // Build Result directly from a braced list holding the value of each
    // future's get(). Every get() may block, which stalls the walk over the
    // remaining futures -- harmless, since nothing can be returned before
    // the slowest one is ready anyway. Values are taken in argument-list
    // order, not in order of completion, so when several get() calls would
    // throw, it is the leftmost one whose exception reaches the caller.
    return Result{ pending.get() ... };
}
//]
903
904//[wait_all_members
905// Explicitly pass Result. This can be any type capable of being initialized
906// from the results of the passed functions, such as a struct.
907template< typename Result, typename ... Fns >
908Result wait_all_members( Fns && ... functions) {
909 // Run each of the passed functions on a separate fiber, passing all their
910 // futures to helper function for processing.
911 return wait_all_members_get< Result >(
912 boost::fibers::async( std::forward< Fns >( functions) ) ... );
913}
914//]
915
// aggregate filled in by the example that follows
//[wait_Data
struct Data {
    std::string str;
    double      inexact;
    int         exact;

    friend std::ostream& operator<<( std::ostream& out, Data const& data)/*=;
    ...*/
//<-
    {
        // emit the three members, labelled, in declaration order
        out << "Data{str='" << data.str;
        out << "', inexact=" << data.inexact;
        out << ", exact=" << data.exact << "}";
        return out;
    }
//->
};
//]
933
934// example usage
935Example wam( runner, "wait_all_members()", [](){
936//[wait_all_members_data_ex
937 Data data = wait_all_members< Data >(
938 [](){ return sleeper("wams_left", 100); },
939 [](){ return sleeper(3.14, 150); },
940 [](){ return sleeper(17, 50); });
941 std::cout << "wait_all_members<Data>(success) => " << data << std::endl;
942//]
943
944 std::string thrown;
945 try {
946 data = wait_all_members< Data >(
947 [](){ return sleeper("wamf_left", 100, true); },
948 [](){ return sleeper(3.14, 150); },
949 [](){ return sleeper(17, 50, true); });
950 std::cout << "wait_all_members<Data>(fail) => " << data << std::endl;
951 } catch ( std::exception const& e) {
952 thrown = e.what();
953 }
954 std::cout << "wait_all_members<Data>(fail) threw '" << thrown
955 << '"' << std::endl;
956
957//[wait_all_members_vector_ex
958 // If we don't care about obtaining results as soon as they arrive, and we
959 // prefer a result vector in passed argument order rather than completion
960 // order, wait_all_members() is another possible implementation of
961 // wait_all_until_error().
962 auto strings = wait_all_members< std::vector< std::string > >(
963 [](){ return sleeper("wamv_left", 150); },
964 [](){ return sleeper("wamv_middle", 100); },
965 [](){ return sleeper("wamv_right", 50); });
966 std::cout << "wait_all_members<vector>() =>";
967 for ( std::string const& str : strings) {
968 std::cout << " '" << str << "'";
969 }
970 std::cout << std::endl;
971//]
972});
973
974
975/*****************************************************************************
976* main()
977*****************************************************************************/
978int main( int argc, char *argv[]) {
979 runner.run();
980 std::cout << "done." << std::endl;
981 return EXIT_SUCCESS;
982}