]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // Copyright Nat Goodspeed 2015. |
2 | // Distributed under the Boost Software License, Version 1.0. | |
3 | // (See accompanying file LICENSE_1_0.txt or copy at | |
4 | // http://www.boost.org/LICENSE_1_0.txt) | |
5 | // | |
#include <algorithm>
#include <cassert>
#include <chrono>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include <boost/fiber/all.hpp>
#include <boost/variant/variant.hpp>
#include <boost/variant/get.hpp>
20 | ||
21 | // These are wait_something() functions rather than when_something() | |
22 | // functions. A big part of the point of the Fiber library is to model | |
23 | // sequencing using the processor's instruction pointer rather than chains of | |
24 | // callbacks. The future-oriented when_all() / when_any() functions are still | |
25 | // based on chains of callbacks. With Fiber, we can do better. | |
26 | ||
27 | /***************************************************************************** | |
28 | * Verbose | |
29 | *****************************************************************************/ | |
// Scope tracer: prints "<desc> start" to std::cout when constructed and
// "<desc> stop" when destroyed, so the extent of a scope shows up in the log.
class Verbose {
public:
    // explicit: a plain std::string must never silently turn into a tracer
    // (which would print start/stop as a side effect of a conversion).
    explicit Verbose( std::string const& d):
        desc( d) {
        std::cout << desc << " start" << std::endl;
    }

    ~Verbose() {
        std::cout << desc << " stop" << std::endl;
    }

    // Non-copyable: a copy would emit a second "stop" for a single "start".
    Verbose( Verbose const&) = delete;
    Verbose & operator=( Verbose const&) = delete;

private:
    // label printed in both messages
    const std::string desc;
};
47 | ||
48 | /***************************************************************************** | |
49 | * Runner and Example | |
50 | *****************************************************************************/ | |
51 | // collect and ultimately run every Example | |
52 | class Runner { | |
53 | typedef std::vector< std::pair< std::string, std::function< void() > > > function_list; | |
54 | ||
55 | public: | |
56 | void add( std::string const& desc, std::function< void() > const& func) { | |
57 | functions_.push_back( function_list::value_type( desc, func) ); | |
58 | } | |
59 | ||
60 | void run() { | |
61 | for ( function_list::value_type const& pair : functions_) { | |
62 | Verbose v( pair.first); | |
63 | pair.second(); | |
64 | } | |
65 | } | |
66 | ||
67 | private: | |
68 | function_list functions_; | |
69 | }; | |
70 | ||
71 | Runner runner; | |
72 | ||
73 | // Example allows us to embed Runner::add() calls at module scope | |
74 | struct Example { | |
75 | Example( Runner & runner, std::string const& desc, std::function< void() > const& func) { | |
76 | runner.add( desc, func); | |
77 | } | |
78 | }; | |
79 | ||
80 | /***************************************************************************** | |
81 | * example task functions | |
82 | *****************************************************************************/ | |
83 | //[wait_sleeper | |
84 | template< typename T > | |
85 | T sleeper_impl( T item, int ms, bool thrw = false) { | |
86 | std::ostringstream descb, funcb; | |
87 | descb << item; | |
88 | std::string desc( descb.str() ); | |
89 | funcb << " sleeper(" << item << ")"; | |
90 | Verbose v( funcb.str() ); | |
91 | ||
92 | boost::this_fiber::sleep_for( std::chrono::milliseconds( ms) ); | |
93 | if ( thrw) { | |
94 | throw std::runtime_error( desc); | |
95 | } | |
96 | return item; | |
97 | } | |
98 | //] | |
99 | ||
100 | inline | |
101 | std::string sleeper( std::string const& item, int ms, bool thrw = false) { | |
102 | return sleeper_impl( item, ms, thrw); | |
103 | } | |
104 | ||
105 | inline | |
106 | double sleeper( double item, int ms, bool thrw = false) { | |
107 | return sleeper_impl( item, ms, thrw); | |
108 | } | |
109 | ||
110 | inline | |
111 | int sleeper(int item, int ms, bool thrw = false) { | |
112 | return sleeper_impl( item, ms, thrw); | |
113 | } | |
114 | ||
115 | /***************************************************************************** | |
116 | * Done | |
117 | *****************************************************************************/ | |
118 | //[wait_done | |
119 | // Wrap canonical pattern for condition_variable + bool flag | |
120 | struct Done { | |
121 | private: | |
122 | boost::fibers::condition_variable cond; | |
123 | boost::fibers::mutex mutex; | |
124 | bool ready = false; | |
125 | ||
126 | public: | |
127 | typedef std::shared_ptr< Done > ptr; | |
128 | ||
129 | void wait() { | |
130 | std::unique_lock< boost::fibers::mutex > lock( mutex); | |
131 | cond.wait( lock, [this](){ return ready; }); | |
132 | } | |
133 | ||
134 | void notify() { | |
135 | { | |
136 | std::unique_lock< boost::fibers::mutex > lock( mutex); | |
137 | ready = true; | |
138 | } // release mutex | |
139 | cond.notify_one(); | |
140 | } | |
141 | }; | |
142 | //] | |
143 | ||
144 | /***************************************************************************** | |
145 | * when_any, simple completion | |
146 | *****************************************************************************/ | |
147 | //[wait_first_simple_impl | |
148 | // Degenerate case: when there are no functions to wait for, return | |
149 | // immediately. | |
150 | void wait_first_simple_impl( Done::ptr) { | |
151 | } | |
152 | ||
153 | // When there's at least one function to wait for, launch it and recur to | |
154 | // process the rest. | |
155 | template< typename Fn, typename ... Fns > | |
156 | void wait_first_simple_impl( Done::ptr done, Fn && function, Fns && ... functions) { | |
157 | boost::fibers::fiber( [done, function](){ | |
158 | function(); | |
159 | done->notify(); | |
160 | }).detach(); | |
161 | wait_first_simple_impl( done, std::forward< Fns >( functions) ... ); | |
162 | } | |
163 | //] | |
164 | ||
165 | // interface function: instantiate Done, launch tasks, wait for Done | |
166 | //[wait_first_simple | |
167 | template< typename ... Fns > | |
168 | void wait_first_simple( Fns && ... functions) { | |
169 | // Use shared_ptr because each function's fiber will bind it separately, | |
170 | // and we're going to return before the last of them completes. | |
171 | auto done( std::make_shared< Done >() ); | |
172 | wait_first_simple_impl( done, std::forward< Fns >( functions) ... ); | |
173 | done->wait(); | |
174 | } | |
175 | //] | |
176 | ||
177 | // example usage | |
178 | Example wfs( runner, "wait_first_simple()", [](){ | |
179 | //[wait_first_simple_ex | |
180 | wait_first_simple( | |
181 | [](){ sleeper("wfs_long", 150); }, | |
182 | [](){ sleeper("wfs_medium", 100); }, | |
183 | [](){ sleeper("wfs_short", 50); }); | |
184 | //] | |
185 | }); | |
186 | ||
187 | /***************************************************************************** | |
188 | * when_any, return value | |
189 | *****************************************************************************/ | |
190 | // When there's only one function, call this overload | |
191 | //[wait_first_value_impl | |
192 | template< typename T, typename Fn > | |
b32b8144 | 193 | void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan, |
7c673cae | 194 | Fn && function) { |
b32b8144 | 195 | boost::fibers::fiber( [chan, function](){ |
7c673cae FG |
196 | // Ignore channel_op_status returned by push(): |
197 | // might be closed; we simply don't care. | |
b32b8144 | 198 | chan->push( function() ); |
7c673cae FG |
199 | }).detach(); |
200 | } | |
201 | //] | |
202 | ||
203 | // When there are two or more functions, call this overload | |
204 | template< typename T, typename Fn0, typename Fn1, typename ... Fns > | |
b32b8144 | 205 | void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan, |
7c673cae FG |
206 | Fn0 && function0, |
207 | Fn1 && function1, | |
208 | Fns && ... functions) { | |
209 | // process the first function using the single-function overload | |
b32b8144 | 210 | wait_first_value_impl< T >( chan, |
7c673cae FG |
211 | std::forward< Fn0 >( function0) ); |
212 | // then recur to process the rest | |
b32b8144 | 213 | wait_first_value_impl< T >( chan, |
7c673cae FG |
214 | std::forward< Fn1 >( function1), |
215 | std::forward< Fns >( functions) ... ); | |
216 | } | |
217 | ||
218 | //[wait_first_value | |
219 | // Assume that all passed functions have the same return type. The return type | |
220 | // of wait_first_value() is the return type of the first passed function. It is | |
221 | // simply invalid to pass NO functions. | |
222 | template< typename Fn, typename ... Fns > | |
223 | typename std::result_of< Fn() >::type | |
224 | wait_first_value( Fn && function, Fns && ... functions) { | |
225 | typedef typename std::result_of< Fn() >::type return_t; | |
b32b8144 FG |
226 | typedef boost::fibers::buffered_channel< return_t > channel_t; |
227 | auto chanp( std::make_shared< channel_t >( 64) ); | |
7c673cae | 228 | // launch all the relevant fibers |
b32b8144 | 229 | wait_first_value_impl< return_t >( chanp, |
7c673cae FG |
230 | std::forward< Fn >( function), |
231 | std::forward< Fns >( functions) ... ); | |
232 | // retrieve the first value | |
b32b8144 | 233 | return_t value( chanp->value_pop() ); |
7c673cae | 234 | // close the channel: no subsequent push() has to succeed |
b32b8144 | 235 | chanp->close(); |
7c673cae FG |
236 | return value; |
237 | } | |
238 | //] | |
239 | ||
240 | // example usage | |
241 | Example wfv( runner, "wait_first_value()", [](){ | |
242 | //[wait_first_value_ex | |
243 | std::string result = wait_first_value( | |
244 | [](){ return sleeper("wfv_third", 150); }, | |
245 | [](){ return sleeper("wfv_second", 100); }, | |
246 | [](){ return sleeper("wfv_first", 50); }); | |
247 | std::cout << "wait_first_value() => " << result << std::endl; | |
248 | assert(result == "wfv_first"); | |
249 | //] | |
250 | }); | |
251 | ||
252 | /***************************************************************************** | |
253 | * when_any, produce first outcome, whether result or exception | |
254 | *****************************************************************************/ | |
255 | // When there's only one function, call this overload. | |
256 | //[wait_first_outcome_impl | |
b32b8144 FG |
257 | template< typename T, typename CHANP, typename Fn > |
258 | void wait_first_outcome_impl( CHANP chan, Fn && function) { | |
7c673cae FG |
259 | boost::fibers::fiber( |
260 | // Use std::bind() here for C++11 compatibility. C++11 lambda capture | |
261 | // can't move a move-only Fn type, but bind() can. Let bind() move the | |
262 | // channel pointer and the function into the bound object, passing | |
263 | // references into the lambda. | |
264 | std::bind( | |
b32b8144 | 265 | []( CHANP & chan, |
7c673cae FG |
266 | typename std::decay< Fn >::type & function) { |
267 | // Instantiate a packaged_task to capture any exception thrown by | |
268 | // function. | |
269 | boost::fibers::packaged_task< T() > task( function); | |
270 | // Immediately run this packaged_task on same fiber. We want | |
271 | // function() to have completed BEFORE we push the future. | |
272 | task(); | |
273 | // Pass the corresponding future to consumer. Ignore | |
274 | // channel_op_status returned by push(): might be closed; we | |
275 | // simply don't care. | |
b32b8144 | 276 | chan->push( task.get_future() ); |
7c673cae | 277 | }, |
b32b8144 | 278 | chan, |
7c673cae FG |
279 | std::forward< Fn >( function) |
280 | )).detach(); | |
281 | } | |
282 | //] | |
283 | ||
284 | // When there are two or more functions, call this overload | |
b32b8144 FG |
// Peel off the leading function, hand it to the single-function overload,
// and recur with whatever is left (always at least one function).
template< typename T, typename CHANP, typename Fn0, typename Fn1, typename ... Fns >
void wait_first_outcome_impl(
        CHANP chan,
        Fn0 && function0,
        Fn1 && function1,
        Fns && ... functions) {
    // head
    wait_first_outcome_impl< T >( chan, std::forward< Fn0 >( function0) );
    // tail
    wait_first_outcome_impl< T >(
            chan,
            std::forward< Fn1 >( function1),
            std::forward< Fns >( functions) ... );
}
298 | ||
299 | // Assume that all passed functions have the same return type. The return type | |
300 | // of wait_first_outcome() is the return type of the first passed function. It is | |
301 | // simply invalid to pass NO functions. | |
302 | //[wait_first_outcome | |
303 | template< typename Fn, typename ... Fns > | |
304 | typename std::result_of< Fn() >::type | |
305 | wait_first_outcome( Fn && function, Fns && ... functions) { | |
306 | // In this case, the value we pass through the channel is actually a | |
307 | // future -- which is already ready. future can carry either a value or an | |
308 | // exception. | |
309 | typedef typename std::result_of< Fn() >::type return_t; | |
310 | typedef boost::fibers::future< return_t > future_t; | |
b32b8144 FG |
311 | typedef boost::fibers::buffered_channel< future_t > channel_t; |
312 | auto chanp(std::make_shared< channel_t >( 64) ); | |
7c673cae | 313 | // launch all the relevant fibers |
b32b8144 | 314 | wait_first_outcome_impl< return_t >( chanp, |
7c673cae FG |
315 | std::forward< Fn >( function), |
316 | std::forward< Fns >( functions) ... ); | |
317 | // retrieve the first future | |
b32b8144 | 318 | future_t future( chanp->value_pop() ); |
7c673cae | 319 | // close the channel: no subsequent push() has to succeed |
b32b8144 | 320 | chanp->close(); |
7c673cae FG |
321 | // either return value or throw exception |
322 | return future.get(); | |
323 | } | |
324 | //] | |
325 | ||
326 | // example usage | |
327 | Example wfo( runner, "wait_first_outcome()", [](){ | |
328 | //[wait_first_outcome_ex | |
329 | std::string result = wait_first_outcome( | |
330 | [](){ return sleeper("wfos_first", 50); }, | |
331 | [](){ return sleeper("wfos_second", 100); }, | |
332 | [](){ return sleeper("wfos_third", 150); }); | |
333 | std::cout << "wait_first_outcome(success) => " << result << std::endl; | |
334 | assert(result == "wfos_first"); | |
335 | ||
336 | std::string thrown; | |
337 | try { | |
338 | result = wait_first_outcome( | |
339 | [](){ return sleeper("wfof_first", 50, true); }, | |
340 | [](){ return sleeper("wfof_second", 100); }, | |
341 | [](){ return sleeper("wfof_third", 150); }); | |
342 | } catch ( std::exception const& e) { | |
343 | thrown = e.what(); | |
344 | } | |
345 | std::cout << "wait_first_outcome(fail) threw '" << thrown | |
346 | << "'" << std::endl; | |
347 | assert(thrown == "wfof_first"); | |
348 | //] | |
349 | }); | |
350 | ||
351 | /***************************************************************************** | |
352 | * when_any, collect exceptions until success; throw exception_list if no | |
353 | * success | |
354 | *****************************************************************************/ | |
355 | // define an exception to aggregate exception_ptrs; prefer | |
356 | // std::exception_list (N4407 et al.) once that becomes available | |
357 | //[exception_list | |
// Exception that carries a collection of std::exception_ptr in addition to
// its own what() message. Read access follows the N4407 proposal
// (size/begin/end); add() is an extension used to fill the collection.
class exception_list : public std::runtime_error {
public:
    // explicit: a std::string must not silently convert to an exception
    explicit exception_list( std::string const& what) :
        std::runtime_error( what) {
    }

    typedef std::vector< std::exception_ptr > bundle_t;

    // N4407 proposed std::exception_list API
    typedef bundle_t::const_iterator iterator;

    // number of collected exception_ptrs
    std::size_t size() const noexcept {
        return bundle_.size();
    }

    // iteration is read-only and in insertion order
    iterator begin() const noexcept {
        return bundle_.begin();
    }

    iterator end() const noexcept {
        return bundle_.end();
    }

    // extension to populate: append 'ep' to the collection
    void add( std::exception_ptr ep) {
        // 'ep' is our own by-value copy: move it in rather than copy again
        bundle_.push_back( std::move( ep) );
    }

private:
    bundle_t bundle_;
};
389 | //] | |
390 | ||
391 | // Assume that all passed functions have the same return type. The return type | |
392 | // of wait_first_success() is the return type of the first passed function. It is | |
393 | // simply invalid to pass NO functions. | |
394 | //[wait_first_success | |
395 | template< typename Fn, typename ... Fns > | |
396 | typename std::result_of< Fn() >::type | |
397 | wait_first_success( Fn && function, Fns && ... functions) { | |
398 | std::size_t count( 1 + sizeof ... ( functions) ); | |
399 | // In this case, the value we pass through the channel is actually a | |
400 | // future -- which is already ready. future can carry either a value or an | |
401 | // exception. | |
402 | typedef typename std::result_of< typename std::decay< Fn >::type() >::type return_t; | |
403 | typedef boost::fibers::future< return_t > future_t; | |
b32b8144 FG |
404 | typedef boost::fibers::buffered_channel< future_t > channel_t; |
405 | auto chanp( std::make_shared< channel_t >( 64) ); | |
7c673cae | 406 | // launch all the relevant fibers |
b32b8144 | 407 | wait_first_outcome_impl< return_t >( chanp, |
7c673cae FG |
408 | std::forward< Fn >( function), |
409 | std::forward< Fns >( functions) ... ); | |
410 | // instantiate exception_list, just in case | |
411 | exception_list exceptions("wait_first_success() produced only errors"); | |
412 | // retrieve up to 'count' results -- but stop there! | |
413 | for ( std::size_t i = 0; i < count; ++i) { | |
414 | // retrieve the next future | |
b32b8144 | 415 | future_t future( chanp->value_pop() ); |
7c673cae FG |
416 | // retrieve exception_ptr if any |
417 | std::exception_ptr error( future.get_exception_ptr() ); | |
418 | // if no error, then yay, return value | |
419 | if ( ! error) { | |
420 | // close the channel: no subsequent push() has to succeed | |
b32b8144 | 421 | chanp->close(); |
7c673cae FG |
422 | // show caller the value we got |
423 | return future.get(); | |
424 | } | |
425 | ||
426 | // error is non-null: collect | |
427 | exceptions.add( error); | |
428 | } | |
429 | // We only arrive here when every passed function threw an exception. | |
430 | // Throw our collection to inform caller. | |
431 | throw exceptions; | |
432 | } | |
433 | //] | |
434 | ||
435 | // example usage | |
436 | Example wfss( runner, "wait_first_success()", [](){ | |
437 | //[wait_first_success_ex | |
438 | std::string result = wait_first_success( | |
439 | [](){ return sleeper("wfss_first", 50, true); }, | |
440 | [](){ return sleeper("wfss_second", 100); }, | |
441 | [](){ return sleeper("wfss_third", 150); }); | |
442 | std::cout << "wait_first_success(success) => " << result << std::endl; | |
443 | assert(result == "wfss_second"); | |
444 | //] | |
445 | ||
446 | std::string thrown; | |
447 | std::size_t count = 0; | |
448 | try { | |
449 | result = wait_first_success( | |
450 | [](){ return sleeper("wfsf_first", 50, true); }, | |
451 | [](){ return sleeper("wfsf_second", 100, true); }, | |
452 | [](){ return sleeper("wfsf_third", 150, true); }); | |
453 | } catch ( exception_list const& e) { | |
454 | thrown = e.what(); | |
455 | count = e.size(); | |
456 | } catch ( std::exception const& e) { | |
457 | thrown = e.what(); | |
458 | } | |
459 | std::cout << "wait_first_success(fail) threw '" << thrown << "': " | |
460 | << count << " errors" << std::endl; | |
461 | assert(thrown == "wait_first_success() produced only errors"); | |
462 | assert(count == 3); | |
463 | }); | |
464 | ||
465 | /***************************************************************************** | |
466 | * when_any, heterogeneous | |
467 | *****************************************************************************/ | |
468 | //[wait_first_value_het | |
469 | // No need to break out the first Fn for interface function: let the compiler | |
470 | // complain if empty. | |
471 | // Our functions have different return types, and we might have to return any | |
472 | // of them. Use a variant, expanding std::result_of<Fn()>::type for each Fn in | |
473 | // parameter pack. | |
474 | template< typename ... Fns > | |
475 | boost::variant< typename std::result_of< Fns() >::type ... > | |
476 | wait_first_value_het( Fns && ... functions) { | |
b32b8144 | 477 | // Use buffered_channel<boost::variant<T1, T2, ...>>; see remarks above. |
7c673cae | 478 | typedef boost::variant< typename std::result_of< Fns() >::type ... > return_t; |
b32b8144 FG |
479 | typedef boost::fibers::buffered_channel< return_t > channel_t; |
480 | auto chanp( std::make_shared< channel_t >( 64) ); | |
7c673cae | 481 | // launch all the relevant fibers |
b32b8144 | 482 | wait_first_value_impl< return_t >( chanp, |
7c673cae FG |
483 | std::forward< Fns >( functions) ... ); |
484 | // retrieve the first value | |
b32b8144 | 485 | return_t value( chanp->value_pop() ); |
7c673cae | 486 | // close the channel: no subsequent push() has to succeed |
b32b8144 | 487 | chanp->close(); |
7c673cae FG |
488 | return value; |
489 | } | |
490 | //] | |
491 | ||
492 | // example usage | |
493 | Example wfvh( runner, "wait_first_value_het()", [](){ | |
494 | //[wait_first_value_het_ex | |
495 | boost::variant< std::string, double, int > result = | |
496 | wait_first_value_het( | |
497 | [](){ return sleeper("wfvh_third", 150); }, | |
498 | [](){ return sleeper(3.14, 100); }, | |
499 | [](){ return sleeper(17, 50); }); | |
500 | std::cout << "wait_first_value_het() => " << result << std::endl; | |
501 | assert(boost::get< int >( result) == 17); | |
502 | //] | |
503 | }); | |
504 | ||
505 | /***************************************************************************** | |
506 | * when_all, simple completion | |
507 | *****************************************************************************/ | |
508 | // Degenerate case: when there are no functions to wait for, return | |
509 | // immediately. | |
510 | void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier >) { | |
511 | } | |
512 | ||
513 | // When there's at least one function to wait for, launch it and recur to | |
514 | // process the rest. | |
515 | //[wait_all_simple_impl | |
516 | template< typename Fn, typename ... Fns > | |
517 | void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier > barrier, | |
518 | Fn && function, Fns && ... functions) { | |
519 | boost::fibers::fiber( | |
520 | std::bind( | |
521 | []( std::shared_ptr< boost::fibers::barrier > & barrier, | |
522 | typename std::decay< Fn >::type & function) mutable { | |
523 | function(); | |
524 | barrier->wait(); | |
525 | }, | |
526 | barrier, | |
527 | std::forward< Fn >( function) | |
528 | )).detach(); | |
529 | wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... ); | |
530 | } | |
531 | //] | |
532 | ||
533 | // interface function: instantiate barrier, launch tasks, wait for barrier | |
534 | //[wait_all_simple | |
535 | template< typename ... Fns > | |
536 | void wait_all_simple( Fns && ... functions) { | |
537 | std::size_t count( sizeof ... ( functions) ); | |
538 | // Initialize a barrier(count+1) because we'll immediately wait on it. We | |
539 | // don't want to wake up until 'count' more fibers wait on it. Even though | |
540 | // we'll stick around until the last of them completes, use shared_ptr | |
541 | // anyway because it's easier to be confident about lifespan issues. | |
542 | auto barrier( std::make_shared< boost::fibers::barrier >( count + 1) ); | |
543 | wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... ); | |
544 | barrier->wait(); | |
545 | } | |
546 | //] | |
547 | ||
548 | // example usage | |
549 | Example was( runner, "wait_all_simple()", [](){ | |
550 | //[wait_all_simple_ex | |
551 | wait_all_simple( | |
552 | [](){ sleeper("was_long", 150); }, | |
553 | [](){ sleeper("was_medium", 100); }, | |
554 | [](){ sleeper("was_short", 50); }); | |
555 | //] | |
556 | }); | |
557 | ||
558 | /***************************************************************************** | |
559 | * when_all, return values | |
560 | *****************************************************************************/ | |
561 | //[wait_nchannel | |
562 | // Introduce a channel facade that closes the channel once a specific number | |
563 | // of items has been pushed. This allows an arbitrary consumer to read until | |
564 | // 'closed' without itself having to count items. | |
565 | template< typename T > | |
566 | class nchannel { | |
567 | public: | |
b32b8144 | 568 | nchannel( std::shared_ptr< boost::fibers::buffered_channel< T > > chan, |
7c673cae | 569 | std::size_t lm): |
b32b8144 | 570 | chan_( chan), |
7c673cae | 571 | limit_( lm) { |
b32b8144 | 572 | assert(chan_); |
7c673cae | 573 | if ( 0 == limit_) { |
b32b8144 | 574 | chan_->close(); |
7c673cae FG |
575 | } |
576 | } | |
577 | ||
578 | boost::fibers::channel_op_status push( T && va) { | |
579 | boost::fibers::channel_op_status ok = | |
b32b8144 | 580 | chan_->push( std::forward< T >( va) ); |
7c673cae FG |
581 | if ( ok == boost::fibers::channel_op_status::success && |
582 | --limit_ == 0) { | |
583 | // after the 'limit_'th successful push, close the channel | |
b32b8144 | 584 | chan_->close(); |
7c673cae FG |
585 | } |
586 | return ok; | |
587 | } | |
588 | ||
589 | private: | |
b32b8144 | 590 | std::shared_ptr< boost::fibers::buffered_channel< T > > chan_; |
7c673cae FG |
591 | std::size_t limit_; |
592 | }; | |
593 | //] | |
594 | ||
595 | // When there's only one function, call this overload | |
596 | //[wait_all_values_impl | |
597 | template< typename T, typename Fn > | |
b32b8144 | 598 | void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan, |
7c673cae | 599 | Fn && function) { |
b32b8144 FG |
600 | boost::fibers::fiber( [chan, function](){ |
601 | chan->push(function()); | |
7c673cae FG |
602 | }).detach(); |
603 | } | |
604 | //] | |
605 | ||
606 | // When there are two or more functions, call this overload | |
607 | template< typename T, typename Fn0, typename Fn1, typename ... Fns > | |
b32b8144 | 608 | void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan, |
7c673cae FG |
609 | Fn0 && function0, |
610 | Fn1 && function1, | |
611 | Fns && ... functions) { | |
612 | // process the first function using the single-function overload | |
b32b8144 | 613 | wait_all_values_impl< T >( chan, std::forward< Fn0 >( function0) ); |
7c673cae | 614 | // then recur to process the rest |
b32b8144 | 615 | wait_all_values_impl< T >( chan, |
7c673cae FG |
616 | std::forward< Fn1 >( function1), |
617 | std::forward< Fns >( functions) ... ); | |
618 | } | |
619 | ||
620 | //[wait_all_values_source | |
b32b8144 | 621 | // Return a shared_ptr<buffered_channel<T>> from which the caller can |
7c673cae FG |
622 | // retrieve each new result as it arrives, until 'closed'. |
623 | template< typename Fn, typename ... Fns > | |
b32b8144 | 624 | std::shared_ptr< boost::fibers::buffered_channel< typename std::result_of< Fn() >::type > > |
7c673cae FG |
625 | wait_all_values_source( Fn && function, Fns && ... functions) { |
626 | std::size_t count( 1 + sizeof ... ( functions) ); | |
627 | typedef typename std::result_of< Fn() >::type return_t; | |
b32b8144 | 628 | typedef boost::fibers::buffered_channel< return_t > channel_t; |
7c673cae | 629 | // make the channel |
b32b8144 | 630 | auto chanp( std::make_shared< channel_t >( 64) ); |
7c673cae | 631 | // and make an nchannel facade to close it after 'count' items |
b32b8144 | 632 | auto ncp( std::make_shared< nchannel< return_t > >( chanp, count) ); |
7c673cae FG |
633 | // pass that nchannel facade to all the relevant fibers |
634 | wait_all_values_impl< return_t >( ncp, | |
635 | std::forward< Fn >( function), | |
636 | std::forward< Fns >( functions) ... ); | |
637 | // then return the channel for consumer | |
b32b8144 | 638 | return chanp; |
7c673cae FG |
639 | } |
640 | //] | |
641 | ||
642 | // When all passed functions have completed, return vector<T> containing | |
643 | // collected results. Assume that all passed functions have the same return | |
644 | // type. It is simply invalid to pass NO functions. | |
645 | //[wait_all_values | |
646 | template< typename Fn, typename ... Fns > | |
647 | std::vector< typename std::result_of< Fn() >::type > | |
648 | wait_all_values( Fn && function, Fns && ... functions) { | |
649 | std::size_t count( 1 + sizeof ... ( functions) ); | |
650 | typedef typename std::result_of< Fn() >::type return_t; | |
651 | typedef std::vector< return_t > vector_t; | |
652 | vector_t results; | |
653 | results.reserve( count); | |
654 | ||
655 | // get channel | |
b32b8144 | 656 | std::shared_ptr< boost::fibers::buffered_channel< return_t > > chan = |
7c673cae FG |
657 | wait_all_values_source( std::forward< Fn >( function), |
658 | std::forward< Fns >( functions) ... ); | |
659 | // fill results vector | |
660 | return_t value; | |
b32b8144 | 661 | while ( boost::fibers::channel_op_status::success == chan->pop(value) ) { |
7c673cae FG |
662 | results.push_back( value); |
663 | } | |
664 | // return vector to caller | |
665 | return results; | |
666 | } | |
667 | //] | |
668 | ||
669 | Example wav( runner, "wait_all_values()", [](){ | |
670 | //[wait_all_values_source_ex | |
b32b8144 | 671 | std::shared_ptr< boost::fibers::buffered_channel< std::string > > chan = |
7c673cae FG |
672 | wait_all_values_source( |
673 | [](){ return sleeper("wavs_third", 150); }, | |
674 | [](){ return sleeper("wavs_second", 100); }, | |
675 | [](){ return sleeper("wavs_first", 50); }); | |
676 | std::string value; | |
b32b8144 | 677 | while ( boost::fibers::channel_op_status::success == chan->pop(value) ) { |
7c673cae FG |
678 | std::cout << "wait_all_values_source() => '" << value |
679 | << "'" << std::endl; | |
680 | } | |
681 | //] | |
682 | ||
683 | //[wait_all_values_ex | |
684 | std::vector< std::string > values = | |
685 | wait_all_values( | |
686 | [](){ return sleeper("wav_late", 150); }, | |
687 | [](){ return sleeper("wav_middle", 100); }, | |
688 | [](){ return sleeper("wav_early", 50); }); | |
689 | //] | |
690 | std::cout << "wait_all_values() =>"; | |
691 | for ( std::string const& v : values) { | |
692 | std::cout << " '" << v << "'"; | |
693 | } | |
694 | std::cout << std::endl; | |
695 | }); | |
696 | ||
697 | /***************************************************************************** | |
698 | * when_all, throw first exception | |
699 | *****************************************************************************/ | |
700 | //[wait_all_until_error_source | |
b32b8144 | 701 | // Return a shared_ptr<buffered_channel<future<T>>> from which the caller can |
7c673cae FG |
702 | // get() each new result as it arrives, until 'closed'. |
703 | template< typename Fn, typename ... Fns > | |
704 | std::shared_ptr< | |
b32b8144 | 705 | boost::fibers::buffered_channel< |
7c673cae FG |
706 | boost::fibers::future< |
707 | typename std::result_of< Fn() >::type > > > | |
708 | wait_all_until_error_source( Fn && function, Fns && ... functions) { | |
709 | std::size_t count( 1 + sizeof ... ( functions) ); | |
710 | typedef typename std::result_of< Fn() >::type return_t; | |
711 | typedef boost::fibers::future< return_t > future_t; | |
b32b8144 | 712 | typedef boost::fibers::buffered_channel< future_t > channel_t; |
7c673cae | 713 | // make the channel |
b32b8144 | 714 | auto chanp( std::make_shared< channel_t >( 64) ); |
7c673cae | 715 | // and make an nchannel facade to close it after 'count' items |
b32b8144 | 716 | auto ncp( std::make_shared< nchannel< future_t > >( chanp, count) ); |
7c673cae FG |
717 | // pass that nchannel facade to all the relevant fibers |
718 | wait_first_outcome_impl< return_t >( ncp, | |
719 | std::forward< Fn >( function), | |
720 | std::forward< Fns >( functions) ... ); | |
721 | // then return the channel for consumer | |
b32b8144 | 722 | return chanp; |
7c673cae FG |
723 | } |
724 | //] | |
725 | ||
726 | // When all passed functions have completed, return vector<T> containing | |
727 | // collected results, or throw the first exception thrown by any of the passed | |
728 | // functions. Assume that all passed functions have the same return type. It | |
729 | // is simply invalid to pass NO functions. | |
730 | //[wait_all_until_error | |
731 | template< typename Fn, typename ... Fns > | |
732 | std::vector< typename std::result_of< Fn() >::type > | |
733 | wait_all_until_error( Fn && function, Fns && ... functions) { | |
734 | std::size_t count( 1 + sizeof ... ( functions) ); | |
735 | typedef typename std::result_of< Fn() >::type return_t; | |
736 | typedef typename boost::fibers::future< return_t > future_t; | |
737 | typedef std::vector< return_t > vector_t; | |
738 | vector_t results; | |
739 | results.reserve( count); | |
740 | ||
741 | // get channel | |
742 | std::shared_ptr< | |
b32b8144 | 743 | boost::fibers::buffered_channel< future_t > > chan( |
7c673cae FG |
744 | wait_all_until_error_source( std::forward< Fn >( function), |
745 | std::forward< Fns >( functions) ... ) ); | |
746 | // fill results vector | |
747 | future_t future; | |
b32b8144 | 748 | while ( boost::fibers::channel_op_status::success == chan->pop( future) ) { |
7c673cae FG |
749 | results.push_back( future.get() ); |
750 | } | |
751 | // return vector to caller | |
752 | return results; | |
753 | } | |
754 | //] | |
755 | ||
756 | Example waue( runner, "wait_all_until_error()", [](){ | |
757 | //[wait_all_until_error_source_ex | |
758 | typedef boost::fibers::future< std::string > future_t; | |
b32b8144 | 759 | std::shared_ptr< boost::fibers::buffered_channel< future_t > > chan = |
7c673cae FG |
760 | wait_all_until_error_source( |
761 | [](){ return sleeper("wauess_third", 150); }, | |
762 | [](){ return sleeper("wauess_second", 100); }, | |
763 | [](){ return sleeper("wauess_first", 50); }); | |
764 | future_t future; | |
b32b8144 | 765 | while ( boost::fibers::channel_op_status::success == chan->pop( future) ) { |
7c673cae FG |
766 | std::string value( future.get() ); |
767 | std::cout << "wait_all_until_error_source(success) => '" << value | |
768 | << "'" << std::endl; | |
769 | } | |
770 | //] | |
771 | ||
b32b8144 | 772 | chan = wait_all_until_error_source( |
7c673cae FG |
773 | [](){ return sleeper("wauesf_third", 150); }, |
774 | [](){ return sleeper("wauesf_second", 100, true); }, | |
775 | [](){ return sleeper("wauesf_first", 50); }); | |
776 | //[wait_all_until_error_ex | |
777 | std::string thrown; | |
778 | //<- | |
779 | try { | |
b32b8144 | 780 | while ( boost::fibers::channel_op_status::success == chan->pop( future) ) { |
7c673cae FG |
781 | std::string value( future.get() ); |
782 | std::cout << "wait_all_until_error_source(fail) => '" << value | |
783 | << "'" << std::endl; | |
784 | } | |
785 | } catch ( std::exception const& e) { | |
786 | thrown = e.what(); | |
787 | } | |
788 | std::cout << "wait_all_until_error_source(fail) threw '" << thrown | |
789 | << "'" << std::endl; | |
790 | ||
791 | thrown.clear(); | |
792 | //-> | |
793 | try { | |
794 | std::vector< std::string > values = wait_all_until_error( | |
795 | [](){ return sleeper("waue_late", 150); }, | |
796 | [](){ return sleeper("waue_middle", 100, true); }, | |
797 | [](){ return sleeper("waue_early", 50); }); | |
798 | //<- | |
799 | std::cout << "wait_all_until_error(fail) =>"; | |
800 | for ( std::string const& v : values) { | |
801 | std::cout << " '" << v << "'"; | |
802 | } | |
803 | std::cout << std::endl; | |
804 | //-> | |
805 | } catch ( std::exception const& e) { | |
806 | thrown = e.what(); | |
807 | } | |
808 | std::cout << "wait_all_until_error(fail) threw '" << thrown | |
809 | << "'" << std::endl; | |
810 | //] | |
811 | }); | |
812 | ||
813 | /***************************************************************************** | |
814 | * when_all, collect exceptions | |
815 | *****************************************************************************/ | |
816 | // When all passed functions have succeeded, return vector<T> containing | |
817 | // collected results, or throw exception_list containing all exceptions thrown | |
818 | // by any of the passed functions. Assume that all passed functions have the | |
819 | // same return type. It is simply invalid to pass NO functions. | |
820 | //[wait_all_collect_errors | |
821 | template< typename Fn, typename ... Fns > | |
822 | std::vector< typename std::result_of< Fn() >::type > | |
823 | wait_all_collect_errors( Fn && function, Fns && ... functions) { | |
824 | std::size_t count( 1 + sizeof ... ( functions) ); | |
825 | typedef typename std::result_of< Fn() >::type return_t; | |
826 | typedef typename boost::fibers::future< return_t > future_t; | |
827 | typedef std::vector< return_t > vector_t; | |
828 | vector_t results; | |
829 | results.reserve( count); | |
830 | exception_list exceptions("wait_all_collect_errors() exceptions"); | |
831 | ||
832 | // get channel | |
833 | std::shared_ptr< | |
b32b8144 | 834 | boost::fibers::buffered_channel< future_t > > chan( |
7c673cae FG |
835 | wait_all_until_error_source( std::forward< Fn >( function), |
836 | std::forward< Fns >( functions) ... ) ); | |
837 | // fill results and/or exceptions vectors | |
838 | future_t future; | |
b32b8144 | 839 | while ( boost::fibers::channel_op_status::success == chan->pop( future) ) { |
7c673cae FG |
840 | std::exception_ptr exp = future.get_exception_ptr(); |
841 | if ( ! exp) { | |
842 | results.push_back( future.get() ); | |
843 | } else { | |
844 | exceptions.add( exp); | |
845 | } | |
846 | } | |
847 | // if there were any exceptions, throw | |
848 | if ( exceptions.size() ) { | |
849 | throw exceptions; | |
850 | } | |
851 | // no exceptions: return vector to caller | |
852 | return results; | |
853 | } | |
854 | //] | |
855 | ||
856 | Example wace( runner, "wait_all_collect_errors()", [](){ | |
857 | std::vector< std::string > values = wait_all_collect_errors( | |
858 | [](){ return sleeper("waces_late", 150); }, | |
859 | [](){ return sleeper("waces_middle", 100); }, | |
860 | [](){ return sleeper("waces_early", 50); }); | |
861 | std::cout << "wait_all_collect_errors(success) =>"; | |
862 | for ( std::string const& v : values) { | |
863 | std::cout << " '" << v << "'"; | |
864 | } | |
865 | std::cout << std::endl; | |
866 | ||
867 | std::string thrown; | |
868 | std::size_t errors = 0; | |
869 | try { | |
870 | values = wait_all_collect_errors( | |
871 | [](){ return sleeper("wacef_late", 150, true); }, | |
872 | [](){ return sleeper("wacef_middle", 100, true); }, | |
873 | [](){ return sleeper("wacef_early", 50); }); | |
874 | std::cout << "wait_all_collect_errors(fail) =>"; | |
875 | for ( std::string const& v : values) { | |
876 | std::cout << " '" << v << "'"; | |
877 | } | |
878 | std::cout << std::endl; | |
879 | } catch ( exception_list const& e) { | |
880 | thrown = e.what(); | |
881 | errors = e.size(); | |
882 | } catch ( std::exception const& e) { | |
883 | thrown = e.what(); | |
884 | } | |
885 | std::cout << "wait_all_collect_errors(fail) threw '" << thrown | |
886 | << "': " << errors << " errors" << std::endl; | |
887 | }); | |
888 | ||
889 | /***************************************************************************** | |
890 | * when_all, heterogeneous | |
891 | *****************************************************************************/ | |
892 | //[wait_all_members_get | |
template< typename Result, typename ... Futures >
auto wait_all_members_get( Futures && ... futures) -> Result {
    // Brace-initialize Result from each future's get(), in argument-list
    // order. Each get() blocks until its value is ready, which costs nothing
    // overall: we cannot finish before the slowest future anyway. Because
    // the futures are consumed left to right rather than in completion
    // order, the exception the caller sees is the one from the leftmost
    // get() that throws.
    return Result{ futures.get() ... };
}
904 | //] | |
905 | ||
906 | //[wait_all_members | |
907 | // Explicitly pass Result. This can be any type capable of being initialized | |
908 | // from the results of the passed functions, such as a struct. | |
909 | template< typename Result, typename ... Fns > | |
910 | Result wait_all_members( Fns && ... functions) { | |
911 | // Run each of the passed functions on a separate fiber, passing all their | |
912 | // futures to helper function for processing. | |
913 | return wait_all_members_get< Result >( | |
914 | boost::fibers::async( std::forward< Fns >( functions) ) ... ); | |
915 | } | |
916 | //] | |
917 | ||
918 | // used by following example | |
919 | //[wait_Data | |
struct Data {
    std::string str;
    double      inexact;
    int         exact;

    friend std::ostream& operator<<( std::ostream& out, Data const& data)/*=;
    ...*/
//<-
    {
        // renders as Data{str='...', inexact=..., exact=...}
        out << "Data{str='" << data.str << "', inexact=" << data.inexact;
        out << ", exact=" << data.exact << "}";
        return out;
    }
//->
};
934 | //] | |
935 | ||
936 | // example usage | |
937 | Example wam( runner, "wait_all_members()", [](){ | |
938 | //[wait_all_members_data_ex | |
939 | Data data = wait_all_members< Data >( | |
940 | [](){ return sleeper("wams_left", 100); }, | |
941 | [](){ return sleeper(3.14, 150); }, | |
942 | [](){ return sleeper(17, 50); }); | |
943 | std::cout << "wait_all_members<Data>(success) => " << data << std::endl; | |
944 | //] | |
945 | ||
946 | std::string thrown; | |
947 | try { | |
948 | data = wait_all_members< Data >( | |
949 | [](){ return sleeper("wamf_left", 100, true); }, | |
950 | [](){ return sleeper(3.14, 150); }, | |
951 | [](){ return sleeper(17, 50, true); }); | |
952 | std::cout << "wait_all_members<Data>(fail) => " << data << std::endl; | |
953 | } catch ( std::exception const& e) { | |
954 | thrown = e.what(); | |
955 | } | |
956 | std::cout << "wait_all_members<Data>(fail) threw '" << thrown | |
957 | << '"' << std::endl; | |
958 | ||
959 | //[wait_all_members_vector_ex | |
960 | // If we don't care about obtaining results as soon as they arrive, and we | |
961 | // prefer a result vector in passed argument order rather than completion | |
962 | // order, wait_all_members() is another possible implementation of | |
963 | // wait_all_until_error(). | |
964 | auto strings = wait_all_members< std::vector< std::string > >( | |
965 | [](){ return sleeper("wamv_left", 150); }, | |
966 | [](){ return sleeper("wamv_middle", 100); }, | |
967 | [](){ return sleeper("wamv_right", 50); }); | |
968 | std::cout << "wait_all_members<vector>() =>"; | |
969 | for ( std::string const& str : strings) { | |
970 | std::cout << " '" << str << "'"; | |
971 | } | |
972 | std::cout << std::endl; | |
973 | //] | |
974 | }); | |
975 | ||
976 | ||
977 | /***************************************************************************** | |
978 | * main() | |
979 | *****************************************************************************/ | |
980 | int main( int argc, char *argv[]) { | |
981 | runner.run(); | |
982 | std::cout << "done." << std::endl; | |
983 | return EXIT_SUCCESS; | |
984 | } |