]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/fiber/examples/wait_stuff.cpp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / libs / fiber / examples / wait_stuff.cpp
CommitLineData
7c673cae
FG
1// Copyright Nat Goodspeed 2015.
2// Distributed under the Boost Software License, Version 1.0.
3// (See accompanying file LICENSE_1_0.txt or copy at
4// http://www.boost.org/LICENSE_1_0.txt)
5//
6#include <algorithm>
7#include <cassert>
8#include <chrono>
9#include <iostream>
10#include <memory>
11#include <sstream>
12#include <string>
13#include <type_traits>
14#include <utility>
15#include <vector>
16
17#include <boost/fiber/all.hpp>
7c673cae
FG
18#include <boost/variant/variant.hpp>
19#include <boost/variant/get.hpp>
20
21// These are wait_something() functions rather than when_something()
22// functions. A big part of the point of the Fiber library is to model
23// sequencing using the processor's instruction pointer rather than chains of
24// callbacks. The future-oriented when_all() / when_any() functions are still
25// based on chains of callbacks. With Fiber, we can do better.
26
27/*****************************************************************************
28* Verbose
29*****************************************************************************/
b32b8144 30class Verbose {
7c673cae
FG
31public:
32 Verbose( std::string const& d):
33 desc( d) {
34 std::cout << desc << " start" << std::endl;
35 }
36
37 ~Verbose() {
38 std::cout << desc << " stop" << std::endl;
39 }
40
b32b8144
FG
41 Verbose( Verbose const&) = delete;
42 Verbose & operator=( Verbose const&) = delete;
43
7c673cae
FG
44private:
45 const std::string desc;
46};
47
48/*****************************************************************************
49* Runner and Example
50*****************************************************************************/
51// collect and ultimately run every Example
52class Runner {
53 typedef std::vector< std::pair< std::string, std::function< void() > > > function_list;
54
55public:
56 void add( std::string const& desc, std::function< void() > const& func) {
57 functions_.push_back( function_list::value_type( desc, func) );
58 }
59
60 void run() {
61 for ( function_list::value_type const& pair : functions_) {
62 Verbose v( pair.first);
63 pair.second();
64 }
65 }
66
67private:
68 function_list functions_;
69};
70
71Runner runner;
72
73// Example allows us to embed Runner::add() calls at module scope
74struct Example {
75 Example( Runner & runner, std::string const& desc, std::function< void() > const& func) {
76 runner.add( desc, func);
77 }
78};
79
80/*****************************************************************************
81* example task functions
82*****************************************************************************/
83//[wait_sleeper
84template< typename T >
85T sleeper_impl( T item, int ms, bool thrw = false) {
86 std::ostringstream descb, funcb;
87 descb << item;
88 std::string desc( descb.str() );
89 funcb << " sleeper(" << item << ")";
90 Verbose v( funcb.str() );
91
92 boost::this_fiber::sleep_for( std::chrono::milliseconds( ms) );
93 if ( thrw) {
94 throw std::runtime_error( desc);
95 }
96 return item;
97}
98//]
99
100inline
101std::string sleeper( std::string const& item, int ms, bool thrw = false) {
102 return sleeper_impl( item, ms, thrw);
103}
104
105inline
106double sleeper( double item, int ms, bool thrw = false) {
107 return sleeper_impl( item, ms, thrw);
108}
109
110inline
111int sleeper(int item, int ms, bool thrw = false) {
112 return sleeper_impl( item, ms, thrw);
113}
114
115/*****************************************************************************
116* Done
117*****************************************************************************/
118//[wait_done
119// Wrap canonical pattern for condition_variable + bool flag
120struct Done {
121private:
122 boost::fibers::condition_variable cond;
123 boost::fibers::mutex mutex;
124 bool ready = false;
125
126public:
127 typedef std::shared_ptr< Done > ptr;
128
129 void wait() {
130 std::unique_lock< boost::fibers::mutex > lock( mutex);
131 cond.wait( lock, [this](){ return ready; });
132 }
133
134 void notify() {
135 {
136 std::unique_lock< boost::fibers::mutex > lock( mutex);
137 ready = true;
138 } // release mutex
139 cond.notify_one();
140 }
141};
142//]
143
144/*****************************************************************************
145* when_any, simple completion
146*****************************************************************************/
147//[wait_first_simple_impl
148// Degenerate case: when there are no functions to wait for, return
149// immediately.
150void wait_first_simple_impl( Done::ptr) {
151}
152
153// When there's at least one function to wait for, launch it and recur to
154// process the rest.
155template< typename Fn, typename ... Fns >
156void wait_first_simple_impl( Done::ptr done, Fn && function, Fns && ... functions) {
157 boost::fibers::fiber( [done, function](){
158 function();
159 done->notify();
160 }).detach();
161 wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
162}
163//]
164
165// interface function: instantiate Done, launch tasks, wait for Done
166//[wait_first_simple
167template< typename ... Fns >
168void wait_first_simple( Fns && ... functions) {
169 // Use shared_ptr because each function's fiber will bind it separately,
170 // and we're going to return before the last of them completes.
171 auto done( std::make_shared< Done >() );
172 wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
173 done->wait();
174}
175//]
176
177// example usage
178Example wfs( runner, "wait_first_simple()", [](){
179//[wait_first_simple_ex
180 wait_first_simple(
181 [](){ sleeper("wfs_long", 150); },
182 [](){ sleeper("wfs_medium", 100); },
183 [](){ sleeper("wfs_short", 50); });
184//]
185});
186
187/*****************************************************************************
188* when_any, return value
189*****************************************************************************/
190// When there's only one function, call this overload
191//[wait_first_value_impl
192template< typename T, typename Fn >
b32b8144 193void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
7c673cae 194 Fn && function) {
b32b8144 195 boost::fibers::fiber( [chan, function](){
7c673cae
FG
196 // Ignore channel_op_status returned by push():
197 // might be closed; we simply don't care.
b32b8144 198 chan->push( function() );
7c673cae
FG
199 }).detach();
200}
201//]
202
203// When there are two or more functions, call this overload
204template< typename T, typename Fn0, typename Fn1, typename ... Fns >
b32b8144 205void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
7c673cae
FG
206 Fn0 && function0,
207 Fn1 && function1,
208 Fns && ... functions) {
209 // process the first function using the single-function overload
b32b8144 210 wait_first_value_impl< T >( chan,
7c673cae
FG
211 std::forward< Fn0 >( function0) );
212 // then recur to process the rest
b32b8144 213 wait_first_value_impl< T >( chan,
7c673cae
FG
214 std::forward< Fn1 >( function1),
215 std::forward< Fns >( functions) ... );
216}
217
218//[wait_first_value
219// Assume that all passed functions have the same return type. The return type
220// of wait_first_value() is the return type of the first passed function. It is
221// simply invalid to pass NO functions.
222template< typename Fn, typename ... Fns >
223typename std::result_of< Fn() >::type
224wait_first_value( Fn && function, Fns && ... functions) {
225 typedef typename std::result_of< Fn() >::type return_t;
b32b8144
FG
226 typedef boost::fibers::buffered_channel< return_t > channel_t;
227 auto chanp( std::make_shared< channel_t >( 64) );
7c673cae 228 // launch all the relevant fibers
b32b8144 229 wait_first_value_impl< return_t >( chanp,
7c673cae
FG
230 std::forward< Fn >( function),
231 std::forward< Fns >( functions) ... );
232 // retrieve the first value
b32b8144 233 return_t value( chanp->value_pop() );
7c673cae 234 // close the channel: no subsequent push() has to succeed
b32b8144 235 chanp->close();
7c673cae
FG
236 return value;
237}
238//]
239
240// example usage
241Example wfv( runner, "wait_first_value()", [](){
242//[wait_first_value_ex
243 std::string result = wait_first_value(
244 [](){ return sleeper("wfv_third", 150); },
245 [](){ return sleeper("wfv_second", 100); },
246 [](){ return sleeper("wfv_first", 50); });
247 std::cout << "wait_first_value() => " << result << std::endl;
248 assert(result == "wfv_first");
249//]
250});
251
252/*****************************************************************************
253* when_any, produce first outcome, whether result or exception
254*****************************************************************************/
255// When there's only one function, call this overload.
256//[wait_first_outcome_impl
b32b8144
FG
257template< typename T, typename CHANP, typename Fn >
258void wait_first_outcome_impl( CHANP chan, Fn && function) {
7c673cae
FG
259 boost::fibers::fiber(
260 // Use std::bind() here for C++11 compatibility. C++11 lambda capture
261 // can't move a move-only Fn type, but bind() can. Let bind() move the
262 // channel pointer and the function into the bound object, passing
263 // references into the lambda.
264 std::bind(
b32b8144 265 []( CHANP & chan,
7c673cae
FG
266 typename std::decay< Fn >::type & function) {
267 // Instantiate a packaged_task to capture any exception thrown by
268 // function.
269 boost::fibers::packaged_task< T() > task( function);
270 // Immediately run this packaged_task on same fiber. We want
271 // function() to have completed BEFORE we push the future.
272 task();
273 // Pass the corresponding future to consumer. Ignore
274 // channel_op_status returned by push(): might be closed; we
275 // simply don't care.
b32b8144 276 chan->push( task.get_future() );
7c673cae 277 },
b32b8144 278 chan,
7c673cae
FG
279 std::forward< Fn >( function)
280 )).detach();
281}
282//]
283
284// When there are two or more functions, call this overload
b32b8144
FG
285template< typename T, typename CHANP, typename Fn0, typename Fn1, typename ... Fns >
286void wait_first_outcome_impl( CHANP chan,
7c673cae
FG
287 Fn0 && function0,
288 Fn1 && function1,
289 Fns && ... functions) {
290 // process the first function using the single-function overload
b32b8144 291 wait_first_outcome_impl< T >( chan,
7c673cae
FG
292 std::forward< Fn0 >( function0) );
293 // then recur to process the rest
b32b8144 294 wait_first_outcome_impl< T >( chan,
7c673cae
FG
295 std::forward< Fn1 >( function1),
296 std::forward< Fns >( functions) ... );
297}
298
299// Assume that all passed functions have the same return type. The return type
300// of wait_first_outcome() is the return type of the first passed function. It is
301// simply invalid to pass NO functions.
302//[wait_first_outcome
303template< typename Fn, typename ... Fns >
304typename std::result_of< Fn() >::type
305wait_first_outcome( Fn && function, Fns && ... functions) {
306 // In this case, the value we pass through the channel is actually a
307 // future -- which is already ready. future can carry either a value or an
308 // exception.
309 typedef typename std::result_of< Fn() >::type return_t;
310 typedef boost::fibers::future< return_t > future_t;
b32b8144
FG
311 typedef boost::fibers::buffered_channel< future_t > channel_t;
312 auto chanp(std::make_shared< channel_t >( 64) );
7c673cae 313 // launch all the relevant fibers
b32b8144 314 wait_first_outcome_impl< return_t >( chanp,
7c673cae
FG
315 std::forward< Fn >( function),
316 std::forward< Fns >( functions) ... );
317 // retrieve the first future
b32b8144 318 future_t future( chanp->value_pop() );
7c673cae 319 // close the channel: no subsequent push() has to succeed
b32b8144 320 chanp->close();
7c673cae
FG
321 // either return value or throw exception
322 return future.get();
323}
324//]
325
326// example usage
327Example wfo( runner, "wait_first_outcome()", [](){
328//[wait_first_outcome_ex
329 std::string result = wait_first_outcome(
330 [](){ return sleeper("wfos_first", 50); },
331 [](){ return sleeper("wfos_second", 100); },
332 [](){ return sleeper("wfos_third", 150); });
333 std::cout << "wait_first_outcome(success) => " << result << std::endl;
334 assert(result == "wfos_first");
335
336 std::string thrown;
337 try {
338 result = wait_first_outcome(
339 [](){ return sleeper("wfof_first", 50, true); },
340 [](){ return sleeper("wfof_second", 100); },
341 [](){ return sleeper("wfof_third", 150); });
342 } catch ( std::exception const& e) {
343 thrown = e.what();
344 }
345 std::cout << "wait_first_outcome(fail) threw '" << thrown
346 << "'" << std::endl;
347 assert(thrown == "wfof_first");
348//]
349});
350
351/*****************************************************************************
352* when_any, collect exceptions until success; throw exception_list if no
353* success
354*****************************************************************************/
355// define an exception to aggregate exception_ptrs; prefer
356// std::exception_list (N4407 et al.) once that becomes available
357//[exception_list
358class exception_list : public std::runtime_error {
359public:
360 exception_list( std::string const& what) :
361 std::runtime_error( what) {
362 }
363
364 typedef std::vector< std::exception_ptr > bundle_t;
365
366 // N4407 proposed std::exception_list API
367 typedef bundle_t::const_iterator iterator;
368
369 std::size_t size() const noexcept {
370 return bundle_.size();
371 }
372
373 iterator begin() const noexcept {
374 return bundle_.begin();
375 }
376
377 iterator end() const noexcept {
378 return bundle_.end();
379 }
380
381 // extension to populate
382 void add( std::exception_ptr ep) {
383 bundle_.push_back( ep);
384 }
385
386private:
387 bundle_t bundle_;
388};
389//]
390
391// Assume that all passed functions have the same return type. The return type
392// of wait_first_success() is the return type of the first passed function. It is
393// simply invalid to pass NO functions.
394//[wait_first_success
395template< typename Fn, typename ... Fns >
396typename std::result_of< Fn() >::type
397wait_first_success( Fn && function, Fns && ... functions) {
398 std::size_t count( 1 + sizeof ... ( functions) );
399 // In this case, the value we pass through the channel is actually a
400 // future -- which is already ready. future can carry either a value or an
401 // exception.
402 typedef typename std::result_of< typename std::decay< Fn >::type() >::type return_t;
403 typedef boost::fibers::future< return_t > future_t;
b32b8144
FG
404 typedef boost::fibers::buffered_channel< future_t > channel_t;
405 auto chanp( std::make_shared< channel_t >( 64) );
7c673cae 406 // launch all the relevant fibers
b32b8144 407 wait_first_outcome_impl< return_t >( chanp,
7c673cae
FG
408 std::forward< Fn >( function),
409 std::forward< Fns >( functions) ... );
410 // instantiate exception_list, just in case
411 exception_list exceptions("wait_first_success() produced only errors");
412 // retrieve up to 'count' results -- but stop there!
413 for ( std::size_t i = 0; i < count; ++i) {
414 // retrieve the next future
b32b8144 415 future_t future( chanp->value_pop() );
7c673cae
FG
416 // retrieve exception_ptr if any
417 std::exception_ptr error( future.get_exception_ptr() );
418 // if no error, then yay, return value
419 if ( ! error) {
420 // close the channel: no subsequent push() has to succeed
b32b8144 421 chanp->close();
7c673cae
FG
422 // show caller the value we got
423 return future.get();
424 }
425
426 // error is non-null: collect
427 exceptions.add( error);
428 }
429 // We only arrive here when every passed function threw an exception.
430 // Throw our collection to inform caller.
431 throw exceptions;
432}
433//]
434
435// example usage
436Example wfss( runner, "wait_first_success()", [](){
437//[wait_first_success_ex
438 std::string result = wait_first_success(
439 [](){ return sleeper("wfss_first", 50, true); },
440 [](){ return sleeper("wfss_second", 100); },
441 [](){ return sleeper("wfss_third", 150); });
442 std::cout << "wait_first_success(success) => " << result << std::endl;
443 assert(result == "wfss_second");
444//]
445
446 std::string thrown;
447 std::size_t count = 0;
448 try {
449 result = wait_first_success(
450 [](){ return sleeper("wfsf_first", 50, true); },
451 [](){ return sleeper("wfsf_second", 100, true); },
452 [](){ return sleeper("wfsf_third", 150, true); });
453 } catch ( exception_list const& e) {
454 thrown = e.what();
455 count = e.size();
456 } catch ( std::exception const& e) {
457 thrown = e.what();
458 }
459 std::cout << "wait_first_success(fail) threw '" << thrown << "': "
460 << count << " errors" << std::endl;
461 assert(thrown == "wait_first_success() produced only errors");
462 assert(count == 3);
463});
464
465/*****************************************************************************
466* when_any, heterogeneous
467*****************************************************************************/
468//[wait_first_value_het
469// No need to break out the first Fn for interface function: let the compiler
470// complain if empty.
471// Our functions have different return types, and we might have to return any
472// of them. Use a variant, expanding std::result_of<Fn()>::type for each Fn in
473// parameter pack.
474template< typename ... Fns >
475boost::variant< typename std::result_of< Fns() >::type ... >
476wait_first_value_het( Fns && ... functions) {
b32b8144 477 // Use buffered_channel<boost::variant<T1, T2, ...>>; see remarks above.
7c673cae 478 typedef boost::variant< typename std::result_of< Fns() >::type ... > return_t;
b32b8144
FG
479 typedef boost::fibers::buffered_channel< return_t > channel_t;
480 auto chanp( std::make_shared< channel_t >( 64) );
7c673cae 481 // launch all the relevant fibers
b32b8144 482 wait_first_value_impl< return_t >( chanp,
7c673cae
FG
483 std::forward< Fns >( functions) ... );
484 // retrieve the first value
b32b8144 485 return_t value( chanp->value_pop() );
7c673cae 486 // close the channel: no subsequent push() has to succeed
b32b8144 487 chanp->close();
7c673cae
FG
488 return value;
489}
490//]
491
492// example usage
493Example wfvh( runner, "wait_first_value_het()", [](){
494//[wait_first_value_het_ex
495 boost::variant< std::string, double, int > result =
496 wait_first_value_het(
497 [](){ return sleeper("wfvh_third", 150); },
498 [](){ return sleeper(3.14, 100); },
499 [](){ return sleeper(17, 50); });
500 std::cout << "wait_first_value_het() => " << result << std::endl;
501 assert(boost::get< int >( result) == 17);
502//]
503});
504
505/*****************************************************************************
506* when_all, simple completion
507*****************************************************************************/
508// Degenerate case: when there are no functions to wait for, return
509// immediately.
510void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier >) {
511}
512
513// When there's at least one function to wait for, launch it and recur to
514// process the rest.
515//[wait_all_simple_impl
516template< typename Fn, typename ... Fns >
517void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier > barrier,
518 Fn && function, Fns && ... functions) {
519 boost::fibers::fiber(
520 std::bind(
521 []( std::shared_ptr< boost::fibers::barrier > & barrier,
522 typename std::decay< Fn >::type & function) mutable {
523 function();
524 barrier->wait();
525 },
526 barrier,
527 std::forward< Fn >( function)
528 )).detach();
529 wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
530}
531//]
532
533// interface function: instantiate barrier, launch tasks, wait for barrier
534//[wait_all_simple
535template< typename ... Fns >
536void wait_all_simple( Fns && ... functions) {
537 std::size_t count( sizeof ... ( functions) );
538 // Initialize a barrier(count+1) because we'll immediately wait on it. We
539 // don't want to wake up until 'count' more fibers wait on it. Even though
540 // we'll stick around until the last of them completes, use shared_ptr
541 // anyway because it's easier to be confident about lifespan issues.
542 auto barrier( std::make_shared< boost::fibers::barrier >( count + 1) );
543 wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
544 barrier->wait();
545}
546//]
547
548// example usage
549Example was( runner, "wait_all_simple()", [](){
550//[wait_all_simple_ex
551 wait_all_simple(
552 [](){ sleeper("was_long", 150); },
553 [](){ sleeper("was_medium", 100); },
554 [](){ sleeper("was_short", 50); });
555//]
556});
557
558/*****************************************************************************
559* when_all, return values
560*****************************************************************************/
561//[wait_nchannel
562// Introduce a channel facade that closes the channel once a specific number
563// of items has been pushed. This allows an arbitrary consumer to read until
564// 'closed' without itself having to count items.
565template< typename T >
566class nchannel {
567public:
b32b8144 568 nchannel( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
7c673cae 569 std::size_t lm):
b32b8144 570 chan_( chan),
7c673cae 571 limit_( lm) {
b32b8144 572 assert(chan_);
7c673cae 573 if ( 0 == limit_) {
b32b8144 574 chan_->close();
7c673cae
FG
575 }
576 }
577
578 boost::fibers::channel_op_status push( T && va) {
579 boost::fibers::channel_op_status ok =
b32b8144 580 chan_->push( std::forward< T >( va) );
7c673cae
FG
581 if ( ok == boost::fibers::channel_op_status::success &&
582 --limit_ == 0) {
583 // after the 'limit_'th successful push, close the channel
b32b8144 584 chan_->close();
7c673cae
FG
585 }
586 return ok;
587 }
588
589private:
b32b8144 590 std::shared_ptr< boost::fibers::buffered_channel< T > > chan_;
7c673cae
FG
591 std::size_t limit_;
592};
593//]
594
595// When there's only one function, call this overload
596//[wait_all_values_impl
597template< typename T, typename Fn >
b32b8144 598void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
7c673cae 599 Fn && function) {
b32b8144
FG
600 boost::fibers::fiber( [chan, function](){
601 chan->push(function());
7c673cae
FG
602 }).detach();
603}
604//]
605
606// When there are two or more functions, call this overload
607template< typename T, typename Fn0, typename Fn1, typename ... Fns >
b32b8144 608void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
7c673cae
FG
609 Fn0 && function0,
610 Fn1 && function1,
611 Fns && ... functions) {
612 // process the first function using the single-function overload
b32b8144 613 wait_all_values_impl< T >( chan, std::forward< Fn0 >( function0) );
7c673cae 614 // then recur to process the rest
b32b8144 615 wait_all_values_impl< T >( chan,
7c673cae
FG
616 std::forward< Fn1 >( function1),
617 std::forward< Fns >( functions) ... );
618}
619
620//[wait_all_values_source
b32b8144 621// Return a shared_ptr<buffered_channel<T>> from which the caller can
7c673cae
FG
622// retrieve each new result as it arrives, until 'closed'.
623template< typename Fn, typename ... Fns >
b32b8144 624std::shared_ptr< boost::fibers::buffered_channel< typename std::result_of< Fn() >::type > >
7c673cae
FG
625wait_all_values_source( Fn && function, Fns && ... functions) {
626 std::size_t count( 1 + sizeof ... ( functions) );
627 typedef typename std::result_of< Fn() >::type return_t;
b32b8144 628 typedef boost::fibers::buffered_channel< return_t > channel_t;
7c673cae 629 // make the channel
b32b8144 630 auto chanp( std::make_shared< channel_t >( 64) );
7c673cae 631 // and make an nchannel facade to close it after 'count' items
b32b8144 632 auto ncp( std::make_shared< nchannel< return_t > >( chanp, count) );
7c673cae
FG
633 // pass that nchannel facade to all the relevant fibers
634 wait_all_values_impl< return_t >( ncp,
635 std::forward< Fn >( function),
636 std::forward< Fns >( functions) ... );
637 // then return the channel for consumer
b32b8144 638 return chanp;
7c673cae
FG
639}
640//]
641
642// When all passed functions have completed, return vector<T> containing
643// collected results. Assume that all passed functions have the same return
644// type. It is simply invalid to pass NO functions.
645//[wait_all_values
646template< typename Fn, typename ... Fns >
647std::vector< typename std::result_of< Fn() >::type >
648wait_all_values( Fn && function, Fns && ... functions) {
649 std::size_t count( 1 + sizeof ... ( functions) );
650 typedef typename std::result_of< Fn() >::type return_t;
651 typedef std::vector< return_t > vector_t;
652 vector_t results;
653 results.reserve( count);
654
655 // get channel
b32b8144 656 std::shared_ptr< boost::fibers::buffered_channel< return_t > > chan =
7c673cae
FG
657 wait_all_values_source( std::forward< Fn >( function),
658 std::forward< Fns >( functions) ... );
659 // fill results vector
660 return_t value;
b32b8144 661 while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
7c673cae
FG
662 results.push_back( value);
663 }
664 // return vector to caller
665 return results;
666}
667//]
668
669Example wav( runner, "wait_all_values()", [](){
670//[wait_all_values_source_ex
b32b8144 671 std::shared_ptr< boost::fibers::buffered_channel< std::string > > chan =
7c673cae
FG
672 wait_all_values_source(
673 [](){ return sleeper("wavs_third", 150); },
674 [](){ return sleeper("wavs_second", 100); },
675 [](){ return sleeper("wavs_first", 50); });
676 std::string value;
b32b8144 677 while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
7c673cae
FG
678 std::cout << "wait_all_values_source() => '" << value
679 << "'" << std::endl;
680 }
681//]
682
683//[wait_all_values_ex
684 std::vector< std::string > values =
685 wait_all_values(
686 [](){ return sleeper("wav_late", 150); },
687 [](){ return sleeper("wav_middle", 100); },
688 [](){ return sleeper("wav_early", 50); });
689//]
690 std::cout << "wait_all_values() =>";
691 for ( std::string const& v : values) {
692 std::cout << " '" << v << "'";
693 }
694 std::cout << std::endl;
695});
696
697/*****************************************************************************
698* when_all, throw first exception
699*****************************************************************************/
700//[wait_all_until_error_source
b32b8144 701// Return a shared_ptr<buffered_channel<future<T>>> from which the caller can
7c673cae
FG
702// get() each new result as it arrives, until 'closed'.
703template< typename Fn, typename ... Fns >
704std::shared_ptr<
b32b8144 705 boost::fibers::buffered_channel<
7c673cae
FG
706 boost::fibers::future<
707 typename std::result_of< Fn() >::type > > >
708wait_all_until_error_source( Fn && function, Fns && ... functions) {
709 std::size_t count( 1 + sizeof ... ( functions) );
710 typedef typename std::result_of< Fn() >::type return_t;
711 typedef boost::fibers::future< return_t > future_t;
b32b8144 712 typedef boost::fibers::buffered_channel< future_t > channel_t;
7c673cae 713 // make the channel
b32b8144 714 auto chanp( std::make_shared< channel_t >( 64) );
7c673cae 715 // and make an nchannel facade to close it after 'count' items
b32b8144 716 auto ncp( std::make_shared< nchannel< future_t > >( chanp, count) );
7c673cae
FG
717 // pass that nchannel facade to all the relevant fibers
718 wait_first_outcome_impl< return_t >( ncp,
719 std::forward< Fn >( function),
720 std::forward< Fns >( functions) ... );
721 // then return the channel for consumer
b32b8144 722 return chanp;
7c673cae
FG
723}
724//]
725
726// When all passed functions have completed, return vector<T> containing
727// collected results, or throw the first exception thrown by any of the passed
728// functions. Assume that all passed functions have the same return type. It
729// is simply invalid to pass NO functions.
730//[wait_all_until_error
731template< typename Fn, typename ... Fns >
732std::vector< typename std::result_of< Fn() >::type >
733wait_all_until_error( Fn && function, Fns && ... functions) {
734 std::size_t count( 1 + sizeof ... ( functions) );
735 typedef typename std::result_of< Fn() >::type return_t;
736 typedef typename boost::fibers::future< return_t > future_t;
737 typedef std::vector< return_t > vector_t;
738 vector_t results;
739 results.reserve( count);
740
741 // get channel
742 std::shared_ptr<
b32b8144 743 boost::fibers::buffered_channel< future_t > > chan(
7c673cae
FG
744 wait_all_until_error_source( std::forward< Fn >( function),
745 std::forward< Fns >( functions) ... ) );
746 // fill results vector
747 future_t future;
b32b8144 748 while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
7c673cae
FG
749 results.push_back( future.get() );
750 }
751 // return vector to caller
752 return results;
753}
754//]
755
756Example waue( runner, "wait_all_until_error()", [](){
757//[wait_all_until_error_source_ex
758 typedef boost::fibers::future< std::string > future_t;
b32b8144 759 std::shared_ptr< boost::fibers::buffered_channel< future_t > > chan =
7c673cae
FG
760 wait_all_until_error_source(
761 [](){ return sleeper("wauess_third", 150); },
762 [](){ return sleeper("wauess_second", 100); },
763 [](){ return sleeper("wauess_first", 50); });
764 future_t future;
b32b8144 765 while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
7c673cae
FG
766 std::string value( future.get() );
767 std::cout << "wait_all_until_error_source(success) => '" << value
768 << "'" << std::endl;
769 }
770//]
771
b32b8144 772 chan = wait_all_until_error_source(
7c673cae
FG
773 [](){ return sleeper("wauesf_third", 150); },
774 [](){ return sleeper("wauesf_second", 100, true); },
775 [](){ return sleeper("wauesf_first", 50); });
776//[wait_all_until_error_ex
777 std::string thrown;
778//<-
779 try {
b32b8144 780 while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
7c673cae
FG
781 std::string value( future.get() );
782 std::cout << "wait_all_until_error_source(fail) => '" << value
783 << "'" << std::endl;
784 }
785 } catch ( std::exception const& e) {
786 thrown = e.what();
787 }
788 std::cout << "wait_all_until_error_source(fail) threw '" << thrown
789 << "'" << std::endl;
790
791 thrown.clear();
792//->
793 try {
794 std::vector< std::string > values = wait_all_until_error(
795 [](){ return sleeper("waue_late", 150); },
796 [](){ return sleeper("waue_middle", 100, true); },
797 [](){ return sleeper("waue_early", 50); });
798//<-
799 std::cout << "wait_all_until_error(fail) =>";
800 for ( std::string const& v : values) {
801 std::cout << " '" << v << "'";
802 }
803 std::cout << std::endl;
804//->
805 } catch ( std::exception const& e) {
806 thrown = e.what();
807 }
808 std::cout << "wait_all_until_error(fail) threw '" << thrown
809 << "'" << std::endl;
810//]
811});
812
813/*****************************************************************************
814* when_all, collect exceptions
815*****************************************************************************/
816// When all passed functions have succeeded, return vector<T> containing
817// collected results, or throw exception_list containing all exceptions thrown
818// by any of the passed functions. Assume that all passed functions have the
819// same return type. It is simply invalid to pass NO functions.
820//[wait_all_collect_errors
821template< typename Fn, typename ... Fns >
822std::vector< typename std::result_of< Fn() >::type >
823wait_all_collect_errors( Fn && function, Fns && ... functions) {
824 std::size_t count( 1 + sizeof ... ( functions) );
825 typedef typename std::result_of< Fn() >::type return_t;
826 typedef typename boost::fibers::future< return_t > future_t;
827 typedef std::vector< return_t > vector_t;
828 vector_t results;
829 results.reserve( count);
830 exception_list exceptions("wait_all_collect_errors() exceptions");
831
832 // get channel
833 std::shared_ptr<
b32b8144 834 boost::fibers::buffered_channel< future_t > > chan(
7c673cae
FG
835 wait_all_until_error_source( std::forward< Fn >( function),
836 std::forward< Fns >( functions) ... ) );
837 // fill results and/or exceptions vectors
838 future_t future;
b32b8144 839 while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
7c673cae
FG
840 std::exception_ptr exp = future.get_exception_ptr();
841 if ( ! exp) {
842 results.push_back( future.get() );
843 } else {
844 exceptions.add( exp);
845 }
846 }
847 // if there were any exceptions, throw
848 if ( exceptions.size() ) {
849 throw exceptions;
850 }
851 // no exceptions: return vector to caller
852 return results;
853}
854//]
855
856Example wace( runner, "wait_all_collect_errors()", [](){
857 std::vector< std::string > values = wait_all_collect_errors(
858 [](){ return sleeper("waces_late", 150); },
859 [](){ return sleeper("waces_middle", 100); },
860 [](){ return sleeper("waces_early", 50); });
861 std::cout << "wait_all_collect_errors(success) =>";
862 for ( std::string const& v : values) {
863 std::cout << " '" << v << "'";
864 }
865 std::cout << std::endl;
866
867 std::string thrown;
868 std::size_t errors = 0;
869 try {
870 values = wait_all_collect_errors(
871 [](){ return sleeper("wacef_late", 150, true); },
872 [](){ return sleeper("wacef_middle", 100, true); },
873 [](){ return sleeper("wacef_early", 50); });
874 std::cout << "wait_all_collect_errors(fail) =>";
875 for ( std::string const& v : values) {
876 std::cout << " '" << v << "'";
877 }
878 std::cout << std::endl;
879 } catch ( exception_list const& e) {
880 thrown = e.what();
881 errors = e.size();
882 } catch ( std::exception const& e) {
883 thrown = e.what();
884 }
885 std::cout << "wait_all_collect_errors(fail) threw '" << thrown
886 << "': " << errors << " errors" << std::endl;
887});
888
889/*****************************************************************************
890* when_all, heterogeneous
891*****************************************************************************/
892//[wait_all_members_get
893template< typename Result, typename ... Futures >
894Result wait_all_members_get( Futures && ... futures) {
895 // Fetch the results from the passed futures into Result's initializer
896 // list. It's true that the get() calls here will block the implicit
897 // iteration over futures -- but that doesn't matter because we won't be
898 // done until the slowest of them finishes anyway. As results are
899 // processed in argument-list order rather than order of completion, the
900 // leftmost get() to throw an exception will cause that exception to
901 // propagate to the caller.
902 return Result{ futures.get() ... };
903}
904//]
905
906//[wait_all_members
907// Explicitly pass Result. This can be any type capable of being initialized
908// from the results of the passed functions, such as a struct.
909template< typename Result, typename ... Fns >
910Result wait_all_members( Fns && ... functions) {
911 // Run each of the passed functions on a separate fiber, passing all their
912 // futures to helper function for processing.
913 return wait_all_members_get< Result >(
914 boost::fibers::async( std::forward< Fns >( functions) ) ... );
915}
916//]
917
918// used by following example
919//[wait_Data
920struct Data {
921 std::string str;
922 double inexact;
923 int exact;
924
925 friend std::ostream& operator<<( std::ostream& out, Data const& data)/*=;
926 ...*/
927//<-
928 {
929 return out << "Data{str='" << data.str << "', inexact=" << data.inexact
930 << ", exact=" << data.exact << "}";
931 }
932//->
933};
934//]
935
936// example usage
937Example wam( runner, "wait_all_members()", [](){
938//[wait_all_members_data_ex
939 Data data = wait_all_members< Data >(
940 [](){ return sleeper("wams_left", 100); },
941 [](){ return sleeper(3.14, 150); },
942 [](){ return sleeper(17, 50); });
943 std::cout << "wait_all_members<Data>(success) => " << data << std::endl;
944//]
945
946 std::string thrown;
947 try {
948 data = wait_all_members< Data >(
949 [](){ return sleeper("wamf_left", 100, true); },
950 [](){ return sleeper(3.14, 150); },
951 [](){ return sleeper(17, 50, true); });
952 std::cout << "wait_all_members<Data>(fail) => " << data << std::endl;
953 } catch ( std::exception const& e) {
954 thrown = e.what();
955 }
956 std::cout << "wait_all_members<Data>(fail) threw '" << thrown
957 << '"' << std::endl;
958
959//[wait_all_members_vector_ex
960 // If we don't care about obtaining results as soon as they arrive, and we
961 // prefer a result vector in passed argument order rather than completion
962 // order, wait_all_members() is another possible implementation of
963 // wait_all_until_error().
964 auto strings = wait_all_members< std::vector< std::string > >(
965 [](){ return sleeper("wamv_left", 150); },
966 [](){ return sleeper("wamv_middle", 100); },
967 [](){ return sleeper("wamv_right", 50); });
968 std::cout << "wait_all_members<vector>() =>";
969 for ( std::string const& str : strings) {
970 std::cout << " '" << str << "'";
971 }
972 std::cout << std::endl;
973//]
974});
975
976
977/*****************************************************************************
978* main()
979*****************************************************************************/
980int main( int argc, char *argv[]) {
981 runner.run();
982 std::cout << "done." << std::endl;
983 return EXIT_SUCCESS;
984}