]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // Copyright Nat Goodspeed 2015. |
2 | // Distributed under the Boost Software License, Version 1.0. | |
3 | // (See accompanying file LICENSE_1_0.txt or copy at | |
4 | // http://www.boost.org/LICENSE_1_0.txt) | |
5 | // | |
6 | #include <algorithm> | |
7 | #include <cassert> | |
8 | #include <chrono> | |
9 | #include <iostream> | |
10 | #include <memory> | |
11 | #include <sstream> | |
12 | #include <string> | |
13 | #include <type_traits> | |
14 | #include <utility> | |
15 | #include <vector> | |
16 | ||
17 | #include <boost/fiber/all.hpp> | |
18 | #include <boost/noncopyable.hpp> | |
19 | #include <boost/variant/variant.hpp> | |
20 | #include <boost/variant/get.hpp> | |
21 | ||
22 | // These are wait_something() functions rather than when_something() | |
23 | // functions. A big part of the point of the Fiber library is to model | |
24 | // sequencing using the processor's instruction pointer rather than chains of | |
25 | // callbacks. The future-oriented when_all() / when_any() functions are still | |
26 | // based on chains of callbacks. With Fiber, we can do better. | |
27 | ||
28 | /***************************************************************************** | |
29 | * Verbose | |
30 | *****************************************************************************/ | |
31 | class Verbose: boost::noncopyable { | |
32 | public: | |
33 | Verbose( std::string const& d): | |
34 | desc( d) { | |
35 | std::cout << desc << " start" << std::endl; | |
36 | } | |
37 | ||
38 | ~Verbose() { | |
39 | std::cout << desc << " stop" << std::endl; | |
40 | } | |
41 | ||
42 | private: | |
43 | const std::string desc; | |
44 | }; | |
45 | ||
46 | /***************************************************************************** | |
47 | * Runner and Example | |
48 | *****************************************************************************/ | |
49 | // collect and ultimately run every Example | |
50 | class Runner { | |
51 | typedef std::vector< std::pair< std::string, std::function< void() > > > function_list; | |
52 | ||
53 | public: | |
54 | void add( std::string const& desc, std::function< void() > const& func) { | |
55 | functions_.push_back( function_list::value_type( desc, func) ); | |
56 | } | |
57 | ||
58 | void run() { | |
59 | for ( function_list::value_type const& pair : functions_) { | |
60 | Verbose v( pair.first); | |
61 | pair.second(); | |
62 | } | |
63 | } | |
64 | ||
65 | private: | |
66 | function_list functions_; | |
67 | }; | |
68 | ||
69 | Runner runner; | |
70 | ||
71 | // Example allows us to embed Runner::add() calls at module scope | |
72 | struct Example { | |
73 | Example( Runner & runner, std::string const& desc, std::function< void() > const& func) { | |
74 | runner.add( desc, func); | |
75 | } | |
76 | }; | |
77 | ||
78 | /***************************************************************************** | |
79 | * example task functions | |
80 | *****************************************************************************/ | |
81 | //[wait_sleeper | |
82 | template< typename T > | |
83 | T sleeper_impl( T item, int ms, bool thrw = false) { | |
84 | std::ostringstream descb, funcb; | |
85 | descb << item; | |
86 | std::string desc( descb.str() ); | |
87 | funcb << " sleeper(" << item << ")"; | |
88 | Verbose v( funcb.str() ); | |
89 | ||
90 | boost::this_fiber::sleep_for( std::chrono::milliseconds( ms) ); | |
91 | if ( thrw) { | |
92 | throw std::runtime_error( desc); | |
93 | } | |
94 | return item; | |
95 | } | |
96 | //] | |
97 | ||
98 | inline | |
99 | std::string sleeper( std::string const& item, int ms, bool thrw = false) { | |
100 | return sleeper_impl( item, ms, thrw); | |
101 | } | |
102 | ||
103 | inline | |
104 | double sleeper( double item, int ms, bool thrw = false) { | |
105 | return sleeper_impl( item, ms, thrw); | |
106 | } | |
107 | ||
108 | inline | |
109 | int sleeper(int item, int ms, bool thrw = false) { | |
110 | return sleeper_impl( item, ms, thrw); | |
111 | } | |
112 | ||
113 | /***************************************************************************** | |
114 | * Done | |
115 | *****************************************************************************/ | |
116 | //[wait_done | |
117 | // Wrap canonical pattern for condition_variable + bool flag | |
118 | struct Done { | |
119 | private: | |
120 | boost::fibers::condition_variable cond; | |
121 | boost::fibers::mutex mutex; | |
122 | bool ready = false; | |
123 | ||
124 | public: | |
125 | typedef std::shared_ptr< Done > ptr; | |
126 | ||
127 | void wait() { | |
128 | std::unique_lock< boost::fibers::mutex > lock( mutex); | |
129 | cond.wait( lock, [this](){ return ready; }); | |
130 | } | |
131 | ||
132 | void notify() { | |
133 | { | |
134 | std::unique_lock< boost::fibers::mutex > lock( mutex); | |
135 | ready = true; | |
136 | } // release mutex | |
137 | cond.notify_one(); | |
138 | } | |
139 | }; | |
140 | //] | |
141 | ||
142 | /***************************************************************************** | |
143 | * when_any, simple completion | |
144 | *****************************************************************************/ | |
145 | //[wait_first_simple_impl | |
146 | // Degenerate case: when there are no functions to wait for, return | |
147 | // immediately. | |
148 | void wait_first_simple_impl( Done::ptr) { | |
149 | } | |
150 | ||
151 | // When there's at least one function to wait for, launch it and recur to | |
152 | // process the rest. | |
153 | template< typename Fn, typename ... Fns > | |
154 | void wait_first_simple_impl( Done::ptr done, Fn && function, Fns && ... functions) { | |
155 | boost::fibers::fiber( [done, function](){ | |
156 | function(); | |
157 | done->notify(); | |
158 | }).detach(); | |
159 | wait_first_simple_impl( done, std::forward< Fns >( functions) ... ); | |
160 | } | |
161 | //] | |
162 | ||
163 | // interface function: instantiate Done, launch tasks, wait for Done | |
164 | //[wait_first_simple | |
165 | template< typename ... Fns > | |
166 | void wait_first_simple( Fns && ... functions) { | |
167 | // Use shared_ptr because each function's fiber will bind it separately, | |
168 | // and we're going to return before the last of them completes. | |
169 | auto done( std::make_shared< Done >() ); | |
170 | wait_first_simple_impl( done, std::forward< Fns >( functions) ... ); | |
171 | done->wait(); | |
172 | } | |
173 | //] | |
174 | ||
175 | // example usage | |
176 | Example wfs( runner, "wait_first_simple()", [](){ | |
177 | //[wait_first_simple_ex | |
178 | wait_first_simple( | |
179 | [](){ sleeper("wfs_long", 150); }, | |
180 | [](){ sleeper("wfs_medium", 100); }, | |
181 | [](){ sleeper("wfs_short", 50); }); | |
182 | //] | |
183 | }); | |
184 | ||
185 | /***************************************************************************** | |
186 | * when_any, return value | |
187 | *****************************************************************************/ | |
188 | // When there's only one function, call this overload | |
189 | //[wait_first_value_impl | |
190 | template< typename T, typename Fn > | |
191 | void wait_first_value_impl( std::shared_ptr< boost::fibers::unbounded_channel< T > > channel, | |
192 | Fn && function) { | |
193 | boost::fibers::fiber( [channel, function](){ | |
194 | // Ignore channel_op_status returned by push(): | |
195 | // might be closed; we simply don't care. | |
196 | channel->push( function() ); | |
197 | }).detach(); | |
198 | } | |
199 | //] | |
200 | ||
201 | // When there are two or more functions, call this overload | |
202 | template< typename T, typename Fn0, typename Fn1, typename ... Fns > | |
203 | void wait_first_value_impl( std::shared_ptr< boost::fibers::unbounded_channel< T > > channel, | |
204 | Fn0 && function0, | |
205 | Fn1 && function1, | |
206 | Fns && ... functions) { | |
207 | // process the first function using the single-function overload | |
208 | wait_first_value_impl< T >( channel, | |
209 | std::forward< Fn0 >( function0) ); | |
210 | // then recur to process the rest | |
211 | wait_first_value_impl< T >( channel, | |
212 | std::forward< Fn1 >( function1), | |
213 | std::forward< Fns >( functions) ... ); | |
214 | } | |
215 | ||
216 | //[wait_first_value | |
217 | // Assume that all passed functions have the same return type. The return type | |
218 | // of wait_first_value() is the return type of the first passed function. It is | |
219 | // simply invalid to pass NO functions. | |
220 | template< typename Fn, typename ... Fns > | |
221 | typename std::result_of< Fn() >::type | |
222 | wait_first_value( Fn && function, Fns && ... functions) { | |
223 | typedef typename std::result_of< Fn() >::type return_t; | |
224 | typedef boost::fibers::unbounded_channel< return_t > channel_t; | |
225 | auto channelp( std::make_shared< channel_t >() ); | |
226 | // launch all the relevant fibers | |
227 | wait_first_value_impl< return_t >( channelp, | |
228 | std::forward< Fn >( function), | |
229 | std::forward< Fns >( functions) ... ); | |
230 | // retrieve the first value | |
231 | return_t value( channelp->value_pop() ); | |
232 | // close the channel: no subsequent push() has to succeed | |
233 | channelp->close(); | |
234 | return value; | |
235 | } | |
236 | //] | |
237 | ||
238 | // example usage | |
239 | Example wfv( runner, "wait_first_value()", [](){ | |
240 | //[wait_first_value_ex | |
241 | std::string result = wait_first_value( | |
242 | [](){ return sleeper("wfv_third", 150); }, | |
243 | [](){ return sleeper("wfv_second", 100); }, | |
244 | [](){ return sleeper("wfv_first", 50); }); | |
245 | std::cout << "wait_first_value() => " << result << std::endl; | |
246 | assert(result == "wfv_first"); | |
247 | //] | |
248 | }); | |
249 | ||
250 | /***************************************************************************** | |
251 | * when_any, produce first outcome, whether result or exception | |
252 | *****************************************************************************/ | |
253 | // When there's only one function, call this overload. | |
254 | //[wait_first_outcome_impl | |
255 | template< typename T, typename CHANNELP, typename Fn > | |
256 | void wait_first_outcome_impl( CHANNELP channel, Fn && function) { | |
257 | boost::fibers::fiber( | |
258 | // Use std::bind() here for C++11 compatibility. C++11 lambda capture | |
259 | // can't move a move-only Fn type, but bind() can. Let bind() move the | |
260 | // channel pointer and the function into the bound object, passing | |
261 | // references into the lambda. | |
262 | std::bind( | |
263 | []( CHANNELP & channel, | |
264 | typename std::decay< Fn >::type & function) { | |
265 | // Instantiate a packaged_task to capture any exception thrown by | |
266 | // function. | |
267 | boost::fibers::packaged_task< T() > task( function); | |
268 | // Immediately run this packaged_task on same fiber. We want | |
269 | // function() to have completed BEFORE we push the future. | |
270 | task(); | |
271 | // Pass the corresponding future to consumer. Ignore | |
272 | // channel_op_status returned by push(): might be closed; we | |
273 | // simply don't care. | |
274 | channel->push( task.get_future() ); | |
275 | }, | |
276 | channel, | |
277 | std::forward< Fn >( function) | |
278 | )).detach(); | |
279 | } | |
280 | //] | |
281 | ||
282 | // When there are two or more functions, call this overload | |
283 | template< typename T, typename CHANNELP, typename Fn0, typename Fn1, typename ... Fns > | |
284 | void wait_first_outcome_impl( CHANNELP channel, | |
285 | Fn0 && function0, | |
286 | Fn1 && function1, | |
287 | Fns && ... functions) { | |
288 | // process the first function using the single-function overload | |
289 | wait_first_outcome_impl< T >( channel, | |
290 | std::forward< Fn0 >( function0) ); | |
291 | // then recur to process the rest | |
292 | wait_first_outcome_impl< T >( channel, | |
293 | std::forward< Fn1 >( function1), | |
294 | std::forward< Fns >( functions) ... ); | |
295 | } | |
296 | ||
297 | // Assume that all passed functions have the same return type. The return type | |
298 | // of wait_first_outcome() is the return type of the first passed function. It is | |
299 | // simply invalid to pass NO functions. | |
300 | //[wait_first_outcome | |
301 | template< typename Fn, typename ... Fns > | |
302 | typename std::result_of< Fn() >::type | |
303 | wait_first_outcome( Fn && function, Fns && ... functions) { | |
304 | // In this case, the value we pass through the channel is actually a | |
305 | // future -- which is already ready. future can carry either a value or an | |
306 | // exception. | |
307 | typedef typename std::result_of< Fn() >::type return_t; | |
308 | typedef boost::fibers::future< return_t > future_t; | |
309 | typedef boost::fibers::unbounded_channel< future_t > channel_t; | |
310 | auto channelp(std::make_shared< channel_t >() ); | |
311 | // launch all the relevant fibers | |
312 | wait_first_outcome_impl< return_t >( channelp, | |
313 | std::forward< Fn >( function), | |
314 | std::forward< Fns >( functions) ... ); | |
315 | // retrieve the first future | |
316 | future_t future( channelp->value_pop() ); | |
317 | // close the channel: no subsequent push() has to succeed | |
318 | channelp->close(); | |
319 | // either return value or throw exception | |
320 | return future.get(); | |
321 | } | |
322 | //] | |
323 | ||
324 | // example usage | |
325 | Example wfo( runner, "wait_first_outcome()", [](){ | |
326 | //[wait_first_outcome_ex | |
327 | std::string result = wait_first_outcome( | |
328 | [](){ return sleeper("wfos_first", 50); }, | |
329 | [](){ return sleeper("wfos_second", 100); }, | |
330 | [](){ return sleeper("wfos_third", 150); }); | |
331 | std::cout << "wait_first_outcome(success) => " << result << std::endl; | |
332 | assert(result == "wfos_first"); | |
333 | ||
334 | std::string thrown; | |
335 | try { | |
336 | result = wait_first_outcome( | |
337 | [](){ return sleeper("wfof_first", 50, true); }, | |
338 | [](){ return sleeper("wfof_second", 100); }, | |
339 | [](){ return sleeper("wfof_third", 150); }); | |
340 | } catch ( std::exception const& e) { | |
341 | thrown = e.what(); | |
342 | } | |
343 | std::cout << "wait_first_outcome(fail) threw '" << thrown | |
344 | << "'" << std::endl; | |
345 | assert(thrown == "wfof_first"); | |
346 | //] | |
347 | }); | |
348 | ||
349 | /***************************************************************************** | |
350 | * when_any, collect exceptions until success; throw exception_list if no | |
351 | * success | |
352 | *****************************************************************************/ | |
353 | // define an exception to aggregate exception_ptrs; prefer | |
354 | // std::exception_list (N4407 et al.) once that becomes available | |
355 | //[exception_list | |
356 | class exception_list : public std::runtime_error { | |
357 | public: | |
358 | exception_list( std::string const& what) : | |
359 | std::runtime_error( what) { | |
360 | } | |
361 | ||
362 | typedef std::vector< std::exception_ptr > bundle_t; | |
363 | ||
364 | // N4407 proposed std::exception_list API | |
365 | typedef bundle_t::const_iterator iterator; | |
366 | ||
367 | std::size_t size() const noexcept { | |
368 | return bundle_.size(); | |
369 | } | |
370 | ||
371 | iterator begin() const noexcept { | |
372 | return bundle_.begin(); | |
373 | } | |
374 | ||
375 | iterator end() const noexcept { | |
376 | return bundle_.end(); | |
377 | } | |
378 | ||
379 | // extension to populate | |
380 | void add( std::exception_ptr ep) { | |
381 | bundle_.push_back( ep); | |
382 | } | |
383 | ||
384 | private: | |
385 | bundle_t bundle_; | |
386 | }; | |
387 | //] | |
388 | ||
389 | // Assume that all passed functions have the same return type. The return type | |
390 | // of wait_first_success() is the return type of the first passed function. It is | |
391 | // simply invalid to pass NO functions. | |
392 | //[wait_first_success | |
393 | template< typename Fn, typename ... Fns > | |
394 | typename std::result_of< Fn() >::type | |
395 | wait_first_success( Fn && function, Fns && ... functions) { | |
396 | std::size_t count( 1 + sizeof ... ( functions) ); | |
397 | // In this case, the value we pass through the channel is actually a | |
398 | // future -- which is already ready. future can carry either a value or an | |
399 | // exception. | |
400 | typedef typename std::result_of< typename std::decay< Fn >::type() >::type return_t; | |
401 | typedef boost::fibers::future< return_t > future_t; | |
402 | typedef boost::fibers::unbounded_channel< future_t > channel_t; | |
403 | auto channelp( std::make_shared< channel_t >() ); | |
404 | // launch all the relevant fibers | |
405 | wait_first_outcome_impl< return_t >( channelp, | |
406 | std::forward< Fn >( function), | |
407 | std::forward< Fns >( functions) ... ); | |
408 | // instantiate exception_list, just in case | |
409 | exception_list exceptions("wait_first_success() produced only errors"); | |
410 | // retrieve up to 'count' results -- but stop there! | |
411 | for ( std::size_t i = 0; i < count; ++i) { | |
412 | // retrieve the next future | |
413 | future_t future( channelp->value_pop() ); | |
414 | // retrieve exception_ptr if any | |
415 | std::exception_ptr error( future.get_exception_ptr() ); | |
416 | // if no error, then yay, return value | |
417 | if ( ! error) { | |
418 | // close the channel: no subsequent push() has to succeed | |
419 | channelp->close(); | |
420 | // show caller the value we got | |
421 | return future.get(); | |
422 | } | |
423 | ||
424 | // error is non-null: collect | |
425 | exceptions.add( error); | |
426 | } | |
427 | // We only arrive here when every passed function threw an exception. | |
428 | // Throw our collection to inform caller. | |
429 | throw exceptions; | |
430 | } | |
431 | //] | |
432 | ||
433 | // example usage | |
434 | Example wfss( runner, "wait_first_success()", [](){ | |
435 | //[wait_first_success_ex | |
436 | std::string result = wait_first_success( | |
437 | [](){ return sleeper("wfss_first", 50, true); }, | |
438 | [](){ return sleeper("wfss_second", 100); }, | |
439 | [](){ return sleeper("wfss_third", 150); }); | |
440 | std::cout << "wait_first_success(success) => " << result << std::endl; | |
441 | assert(result == "wfss_second"); | |
442 | //] | |
443 | ||
444 | std::string thrown; | |
445 | std::size_t count = 0; | |
446 | try { | |
447 | result = wait_first_success( | |
448 | [](){ return sleeper("wfsf_first", 50, true); }, | |
449 | [](){ return sleeper("wfsf_second", 100, true); }, | |
450 | [](){ return sleeper("wfsf_third", 150, true); }); | |
451 | } catch ( exception_list const& e) { | |
452 | thrown = e.what(); | |
453 | count = e.size(); | |
454 | } catch ( std::exception const& e) { | |
455 | thrown = e.what(); | |
456 | } | |
457 | std::cout << "wait_first_success(fail) threw '" << thrown << "': " | |
458 | << count << " errors" << std::endl; | |
459 | assert(thrown == "wait_first_success() produced only errors"); | |
460 | assert(count == 3); | |
461 | }); | |
462 | ||
463 | /***************************************************************************** | |
464 | * when_any, heterogeneous | |
465 | *****************************************************************************/ | |
466 | //[wait_first_value_het | |
467 | // No need to break out the first Fn for interface function: let the compiler | |
468 | // complain if empty. | |
469 | // Our functions have different return types, and we might have to return any | |
470 | // of them. Use a variant, expanding std::result_of<Fn()>::type for each Fn in | |
471 | // parameter pack. | |
472 | template< typename ... Fns > | |
473 | boost::variant< typename std::result_of< Fns() >::type ... > | |
474 | wait_first_value_het( Fns && ... functions) { | |
475 | // Use unbounded_channel<boost::variant<T1, T2, ...>>; see remarks above. | |
476 | typedef boost::variant< typename std::result_of< Fns() >::type ... > return_t; | |
477 | typedef boost::fibers::unbounded_channel< return_t > channel_t; | |
478 | auto channelp( std::make_shared< channel_t >() ); | |
479 | // launch all the relevant fibers | |
480 | wait_first_value_impl< return_t >( channelp, | |
481 | std::forward< Fns >( functions) ... ); | |
482 | // retrieve the first value | |
483 | return_t value( channelp->value_pop() ); | |
484 | // close the channel: no subsequent push() has to succeed | |
485 | channelp->close(); | |
486 | return value; | |
487 | } | |
488 | //] | |
489 | ||
490 | // example usage | |
491 | Example wfvh( runner, "wait_first_value_het()", [](){ | |
492 | //[wait_first_value_het_ex | |
493 | boost::variant< std::string, double, int > result = | |
494 | wait_first_value_het( | |
495 | [](){ return sleeper("wfvh_third", 150); }, | |
496 | [](){ return sleeper(3.14, 100); }, | |
497 | [](){ return sleeper(17, 50); }); | |
498 | std::cout << "wait_first_value_het() => " << result << std::endl; | |
499 | assert(boost::get< int >( result) == 17); | |
500 | //] | |
501 | }); | |
502 | ||
503 | /***************************************************************************** | |
504 | * when_all, simple completion | |
505 | *****************************************************************************/ | |
506 | // Degenerate case: when there are no functions to wait for, return | |
507 | // immediately. | |
508 | void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier >) { | |
509 | } | |
510 | ||
511 | // When there's at least one function to wait for, launch it and recur to | |
512 | // process the rest. | |
513 | //[wait_all_simple_impl | |
514 | template< typename Fn, typename ... Fns > | |
515 | void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier > barrier, | |
516 | Fn && function, Fns && ... functions) { | |
517 | boost::fibers::fiber( | |
518 | std::bind( | |
519 | []( std::shared_ptr< boost::fibers::barrier > & barrier, | |
520 | typename std::decay< Fn >::type & function) mutable { | |
521 | function(); | |
522 | barrier->wait(); | |
523 | }, | |
524 | barrier, | |
525 | std::forward< Fn >( function) | |
526 | )).detach(); | |
527 | wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... ); | |
528 | } | |
529 | //] | |
530 | ||
531 | // interface function: instantiate barrier, launch tasks, wait for barrier | |
532 | //[wait_all_simple | |
533 | template< typename ... Fns > | |
534 | void wait_all_simple( Fns && ... functions) { | |
535 | std::size_t count( sizeof ... ( functions) ); | |
536 | // Initialize a barrier(count+1) because we'll immediately wait on it. We | |
537 | // don't want to wake up until 'count' more fibers wait on it. Even though | |
538 | // we'll stick around until the last of them completes, use shared_ptr | |
539 | // anyway because it's easier to be confident about lifespan issues. | |
540 | auto barrier( std::make_shared< boost::fibers::barrier >( count + 1) ); | |
541 | wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... ); | |
542 | barrier->wait(); | |
543 | } | |
544 | //] | |
545 | ||
546 | // example usage | |
547 | Example was( runner, "wait_all_simple()", [](){ | |
548 | //[wait_all_simple_ex | |
549 | wait_all_simple( | |
550 | [](){ sleeper("was_long", 150); }, | |
551 | [](){ sleeper("was_medium", 100); }, | |
552 | [](){ sleeper("was_short", 50); }); | |
553 | //] | |
554 | }); | |
555 | ||
556 | /***************************************************************************** | |
557 | * when_all, return values | |
558 | *****************************************************************************/ | |
559 | //[wait_nchannel | |
560 | // Introduce a channel facade that closes the channel once a specific number | |
561 | // of items has been pushed. This allows an arbitrary consumer to read until | |
562 | // 'closed' without itself having to count items. | |
563 | template< typename T > | |
564 | class nchannel { | |
565 | public: | |
566 | nchannel( std::shared_ptr< boost::fibers::unbounded_channel< T > > cp, | |
567 | std::size_t lm): | |
568 | channel_( cp), | |
569 | limit_( lm) { | |
570 | assert(channel_); | |
571 | if ( 0 == limit_) { | |
572 | channel_->close(); | |
573 | } | |
574 | } | |
575 | ||
576 | boost::fibers::channel_op_status push( T && va) { | |
577 | boost::fibers::channel_op_status ok = | |
578 | channel_->push( std::forward< T >( va) ); | |
579 | if ( ok == boost::fibers::channel_op_status::success && | |
580 | --limit_ == 0) { | |
581 | // after the 'limit_'th successful push, close the channel | |
582 | channel_->close(); | |
583 | } | |
584 | return ok; | |
585 | } | |
586 | ||
587 | private: | |
588 | std::shared_ptr< boost::fibers::unbounded_channel< T > > channel_; | |
589 | std::size_t limit_; | |
590 | }; | |
591 | //] | |
592 | ||
593 | // When there's only one function, call this overload | |
594 | //[wait_all_values_impl | |
595 | template< typename T, typename Fn > | |
596 | void wait_all_values_impl( std::shared_ptr< nchannel< T > > channel, | |
597 | Fn && function) { | |
598 | boost::fibers::fiber( [channel, function](){ | |
599 | channel->push(function()); | |
600 | }).detach(); | |
601 | } | |
602 | //] | |
603 | ||
604 | // When there are two or more functions, call this overload | |
605 | template< typename T, typename Fn0, typename Fn1, typename ... Fns > | |
606 | void wait_all_values_impl( std::shared_ptr< nchannel< T > > channel, | |
607 | Fn0 && function0, | |
608 | Fn1 && function1, | |
609 | Fns && ... functions) { | |
610 | // process the first function using the single-function overload | |
611 | wait_all_values_impl< T >( channel, std::forward< Fn0 >( function0) ); | |
612 | // then recur to process the rest | |
613 | wait_all_values_impl< T >( channel, | |
614 | std::forward< Fn1 >( function1), | |
615 | std::forward< Fns >( functions) ... ); | |
616 | } | |
617 | ||
618 | //[wait_all_values_source | |
619 | // Return a shared_ptr<unbounded_channel<T>> from which the caller can | |
620 | // retrieve each new result as it arrives, until 'closed'. | |
621 | template< typename Fn, typename ... Fns > | |
622 | std::shared_ptr< boost::fibers::unbounded_channel< typename std::result_of< Fn() >::type > > | |
623 | wait_all_values_source( Fn && function, Fns && ... functions) { | |
624 | std::size_t count( 1 + sizeof ... ( functions) ); | |
625 | typedef typename std::result_of< Fn() >::type return_t; | |
626 | typedef boost::fibers::unbounded_channel< return_t > channel_t; | |
627 | // make the channel | |
628 | auto channelp( std::make_shared< channel_t >() ); | |
629 | // and make an nchannel facade to close it after 'count' items | |
630 | auto ncp( std::make_shared< nchannel< return_t > >( channelp, count) ); | |
631 | // pass that nchannel facade to all the relevant fibers | |
632 | wait_all_values_impl< return_t >( ncp, | |
633 | std::forward< Fn >( function), | |
634 | std::forward< Fns >( functions) ... ); | |
635 | // then return the channel for consumer | |
636 | return channelp; | |
637 | } | |
638 | //] | |
639 | ||
640 | // When all passed functions have completed, return vector<T> containing | |
641 | // collected results. Assume that all passed functions have the same return | |
642 | // type. It is simply invalid to pass NO functions. | |
643 | //[wait_all_values | |
644 | template< typename Fn, typename ... Fns > | |
645 | std::vector< typename std::result_of< Fn() >::type > | |
646 | wait_all_values( Fn && function, Fns && ... functions) { | |
647 | std::size_t count( 1 + sizeof ... ( functions) ); | |
648 | typedef typename std::result_of< Fn() >::type return_t; | |
649 | typedef std::vector< return_t > vector_t; | |
650 | vector_t results; | |
651 | results.reserve( count); | |
652 | ||
653 | // get channel | |
654 | std::shared_ptr< boost::fibers::unbounded_channel< return_t > > channel = | |
655 | wait_all_values_source( std::forward< Fn >( function), | |
656 | std::forward< Fns >( functions) ... ); | |
657 | // fill results vector | |
658 | return_t value; | |
659 | while ( boost::fibers::channel_op_status::success == channel->pop(value) ) { | |
660 | results.push_back( value); | |
661 | } | |
662 | // return vector to caller | |
663 | return results; | |
664 | } | |
665 | //] | |
666 | ||
667 | Example wav( runner, "wait_all_values()", [](){ | |
668 | //[wait_all_values_source_ex | |
669 | std::shared_ptr< boost::fibers::unbounded_channel< std::string > > channel = | |
670 | wait_all_values_source( | |
671 | [](){ return sleeper("wavs_third", 150); }, | |
672 | [](){ return sleeper("wavs_second", 100); }, | |
673 | [](){ return sleeper("wavs_first", 50); }); | |
674 | std::string value; | |
675 | while ( boost::fibers::channel_op_status::success == channel->pop(value) ) { | |
676 | std::cout << "wait_all_values_source() => '" << value | |
677 | << "'" << std::endl; | |
678 | } | |
679 | //] | |
680 | ||
681 | //[wait_all_values_ex | |
682 | std::vector< std::string > values = | |
683 | wait_all_values( | |
684 | [](){ return sleeper("wav_late", 150); }, | |
685 | [](){ return sleeper("wav_middle", 100); }, | |
686 | [](){ return sleeper("wav_early", 50); }); | |
687 | //] | |
688 | std::cout << "wait_all_values() =>"; | |
689 | for ( std::string const& v : values) { | |
690 | std::cout << " '" << v << "'"; | |
691 | } | |
692 | std::cout << std::endl; | |
693 | }); | |
694 | ||
695 | /***************************************************************************** | |
696 | * when_all, throw first exception | |
697 | *****************************************************************************/ | |
698 | //[wait_all_until_error_source | |
699 | // Return a shared_ptr<unbounded_channel<future<T>>> from which the caller can | |
700 | // get() each new result as it arrives, until 'closed'. | |
701 | template< typename Fn, typename ... Fns > | |
702 | std::shared_ptr< | |
703 | boost::fibers::unbounded_channel< | |
704 | boost::fibers::future< | |
705 | typename std::result_of< Fn() >::type > > > | |
706 | wait_all_until_error_source( Fn && function, Fns && ... functions) { | |
707 | std::size_t count( 1 + sizeof ... ( functions) ); | |
708 | typedef typename std::result_of< Fn() >::type return_t; | |
709 | typedef boost::fibers::future< return_t > future_t; | |
710 | typedef boost::fibers::unbounded_channel< future_t > channel_t; | |
711 | // make the channel | |
712 | auto channelp( std::make_shared< channel_t >() ); | |
713 | // and make an nchannel facade to close it after 'count' items | |
714 | auto ncp( std::make_shared< nchannel< future_t > >( channelp, count) ); | |
715 | // pass that nchannel facade to all the relevant fibers | |
716 | wait_first_outcome_impl< return_t >( ncp, | |
717 | std::forward< Fn >( function), | |
718 | std::forward< Fns >( functions) ... ); | |
719 | // then return the channel for consumer | |
720 | return channelp; | |
721 | } | |
722 | //] | |
723 | ||
724 | // When all passed functions have completed, return vector<T> containing | |
725 | // collected results, or throw the first exception thrown by any of the passed | |
726 | // functions. Assume that all passed functions have the same return type. It | |
727 | // is simply invalid to pass NO functions. | |
728 | //[wait_all_until_error | |
729 | template< typename Fn, typename ... Fns > | |
730 | std::vector< typename std::result_of< Fn() >::type > | |
731 | wait_all_until_error( Fn && function, Fns && ... functions) { | |
732 | std::size_t count( 1 + sizeof ... ( functions) ); | |
733 | typedef typename std::result_of< Fn() >::type return_t; | |
734 | typedef typename boost::fibers::future< return_t > future_t; | |
735 | typedef std::vector< return_t > vector_t; | |
736 | vector_t results; | |
737 | results.reserve( count); | |
738 | ||
739 | // get channel | |
740 | std::shared_ptr< | |
741 | boost::fibers::unbounded_channel< future_t > > channel( | |
742 | wait_all_until_error_source( std::forward< Fn >( function), | |
743 | std::forward< Fns >( functions) ... ) ); | |
744 | // fill results vector | |
745 | future_t future; | |
746 | while ( boost::fibers::channel_op_status::success == channel->pop( future) ) { | |
747 | results.push_back( future.get() ); | |
748 | } | |
749 | // return vector to caller | |
750 | return results; | |
751 | } | |
752 | //] | |
753 | ||
754 | Example waue( runner, "wait_all_until_error()", [](){ | |
755 | //[wait_all_until_error_source_ex | |
756 | typedef boost::fibers::future< std::string > future_t; | |
757 | std::shared_ptr< boost::fibers::unbounded_channel< future_t > > channel = | |
758 | wait_all_until_error_source( | |
759 | [](){ return sleeper("wauess_third", 150); }, | |
760 | [](){ return sleeper("wauess_second", 100); }, | |
761 | [](){ return sleeper("wauess_first", 50); }); | |
762 | future_t future; | |
763 | while ( boost::fibers::channel_op_status::success == channel->pop( future) ) { | |
764 | std::string value( future.get() ); | |
765 | std::cout << "wait_all_until_error_source(success) => '" << value | |
766 | << "'" << std::endl; | |
767 | } | |
768 | //] | |
769 | ||
770 | channel = wait_all_until_error_source( | |
771 | [](){ return sleeper("wauesf_third", 150); }, | |
772 | [](){ return sleeper("wauesf_second", 100, true); }, | |
773 | [](){ return sleeper("wauesf_first", 50); }); | |
774 | //[wait_all_until_error_ex | |
775 | std::string thrown; | |
776 | //<- | |
777 | try { | |
778 | while ( boost::fibers::channel_op_status::success == channel->pop( future) ) { | |
779 | std::string value( future.get() ); | |
780 | std::cout << "wait_all_until_error_source(fail) => '" << value | |
781 | << "'" << std::endl; | |
782 | } | |
783 | } catch ( std::exception const& e) { | |
784 | thrown = e.what(); | |
785 | } | |
786 | std::cout << "wait_all_until_error_source(fail) threw '" << thrown | |
787 | << "'" << std::endl; | |
788 | ||
789 | thrown.clear(); | |
790 | //-> | |
791 | try { | |
792 | std::vector< std::string > values = wait_all_until_error( | |
793 | [](){ return sleeper("waue_late", 150); }, | |
794 | [](){ return sleeper("waue_middle", 100, true); }, | |
795 | [](){ return sleeper("waue_early", 50); }); | |
796 | //<- | |
797 | std::cout << "wait_all_until_error(fail) =>"; | |
798 | for ( std::string const& v : values) { | |
799 | std::cout << " '" << v << "'"; | |
800 | } | |
801 | std::cout << std::endl; | |
802 | //-> | |
803 | } catch ( std::exception const& e) { | |
804 | thrown = e.what(); | |
805 | } | |
806 | std::cout << "wait_all_until_error(fail) threw '" << thrown | |
807 | << "'" << std::endl; | |
808 | //] | |
809 | }); | |
810 | ||
811 | /***************************************************************************** | |
812 | * when_all, collect exceptions | |
813 | *****************************************************************************/ | |
814 | // When all passed functions have succeeded, return vector<T> containing | |
815 | // collected results, or throw exception_list containing all exceptions thrown | |
816 | // by any of the passed functions. Assume that all passed functions have the | |
817 | // same return type. It is simply invalid to pass NO functions. | |
818 | //[wait_all_collect_errors | |
819 | template< typename Fn, typename ... Fns > | |
820 | std::vector< typename std::result_of< Fn() >::type > | |
821 | wait_all_collect_errors( Fn && function, Fns && ... functions) { | |
822 | std::size_t count( 1 + sizeof ... ( functions) ); | |
823 | typedef typename std::result_of< Fn() >::type return_t; | |
824 | typedef typename boost::fibers::future< return_t > future_t; | |
825 | typedef std::vector< return_t > vector_t; | |
826 | vector_t results; | |
827 | results.reserve( count); | |
828 | exception_list exceptions("wait_all_collect_errors() exceptions"); | |
829 | ||
830 | // get channel | |
831 | std::shared_ptr< | |
832 | boost::fibers::unbounded_channel< future_t > > channel( | |
833 | wait_all_until_error_source( std::forward< Fn >( function), | |
834 | std::forward< Fns >( functions) ... ) ); | |
835 | // fill results and/or exceptions vectors | |
836 | future_t future; | |
837 | while ( boost::fibers::channel_op_status::success == channel->pop( future) ) { | |
838 | std::exception_ptr exp = future.get_exception_ptr(); | |
839 | if ( ! exp) { | |
840 | results.push_back( future.get() ); | |
841 | } else { | |
842 | exceptions.add( exp); | |
843 | } | |
844 | } | |
845 | // if there were any exceptions, throw | |
846 | if ( exceptions.size() ) { | |
847 | throw exceptions; | |
848 | } | |
849 | // no exceptions: return vector to caller | |
850 | return results; | |
851 | } | |
852 | //] | |
853 | ||
854 | Example wace( runner, "wait_all_collect_errors()", [](){ | |
855 | std::vector< std::string > values = wait_all_collect_errors( | |
856 | [](){ return sleeper("waces_late", 150); }, | |
857 | [](){ return sleeper("waces_middle", 100); }, | |
858 | [](){ return sleeper("waces_early", 50); }); | |
859 | std::cout << "wait_all_collect_errors(success) =>"; | |
860 | for ( std::string const& v : values) { | |
861 | std::cout << " '" << v << "'"; | |
862 | } | |
863 | std::cout << std::endl; | |
864 | ||
865 | std::string thrown; | |
866 | std::size_t errors = 0; | |
867 | try { | |
868 | values = wait_all_collect_errors( | |
869 | [](){ return sleeper("wacef_late", 150, true); }, | |
870 | [](){ return sleeper("wacef_middle", 100, true); }, | |
871 | [](){ return sleeper("wacef_early", 50); }); | |
872 | std::cout << "wait_all_collect_errors(fail) =>"; | |
873 | for ( std::string const& v : values) { | |
874 | std::cout << " '" << v << "'"; | |
875 | } | |
876 | std::cout << std::endl; | |
877 | } catch ( exception_list const& e) { | |
878 | thrown = e.what(); | |
879 | errors = e.size(); | |
880 | } catch ( std::exception const& e) { | |
881 | thrown = e.what(); | |
882 | } | |
883 | std::cout << "wait_all_collect_errors(fail) threw '" << thrown | |
884 | << "': " << errors << " errors" << std::endl; | |
885 | }); | |
886 | ||
887 | /***************************************************************************** | |
888 | * when_all, heterogeneous | |
889 | *****************************************************************************/ | |
890 | //[wait_all_members_get | |
891 | template< typename Result, typename ... Futures > | |
892 | Result wait_all_members_get( Futures && ... futures) { | |
893 | // Fetch the results from the passed futures into Result's initializer | |
894 | // list. It's true that the get() calls here will block the implicit | |
895 | // iteration over futures -- but that doesn't matter because we won't be | |
896 | // done until the slowest of them finishes anyway. As results are | |
897 | // processed in argument-list order rather than order of completion, the | |
898 | // leftmost get() to throw an exception will cause that exception to | |
899 | // propagate to the caller. | |
900 | return Result{ futures.get() ... }; | |
901 | } | |
902 | //] | |
903 | ||
904 | //[wait_all_members | |
905 | // Explicitly pass Result. This can be any type capable of being initialized | |
906 | // from the results of the passed functions, such as a struct. | |
907 | template< typename Result, typename ... Fns > | |
908 | Result wait_all_members( Fns && ... functions) { | |
909 | // Run each of the passed functions on a separate fiber, passing all their | |
910 | // futures to helper function for processing. | |
911 | return wait_all_members_get< Result >( | |
912 | boost::fibers::async( std::forward< Fns >( functions) ) ... ); | |
913 | } | |
914 | //] | |
915 | ||
916 | // used by following example | |
917 | //[wait_Data | |
918 | struct Data { | |
919 | std::string str; | |
920 | double inexact; | |
921 | int exact; | |
922 | ||
923 | friend std::ostream& operator<<( std::ostream& out, Data const& data)/*=; | |
924 | ...*/ | |
925 | //<- | |
926 | { | |
927 | return out << "Data{str='" << data.str << "', inexact=" << data.inexact | |
928 | << ", exact=" << data.exact << "}"; | |
929 | } | |
930 | //-> | |
931 | }; | |
932 | //] | |
933 | ||
934 | // example usage | |
935 | Example wam( runner, "wait_all_members()", [](){ | |
936 | //[wait_all_members_data_ex | |
937 | Data data = wait_all_members< Data >( | |
938 | [](){ return sleeper("wams_left", 100); }, | |
939 | [](){ return sleeper(3.14, 150); }, | |
940 | [](){ return sleeper(17, 50); }); | |
941 | std::cout << "wait_all_members<Data>(success) => " << data << std::endl; | |
942 | //] | |
943 | ||
944 | std::string thrown; | |
945 | try { | |
946 | data = wait_all_members< Data >( | |
947 | [](){ return sleeper("wamf_left", 100, true); }, | |
948 | [](){ return sleeper(3.14, 150); }, | |
949 | [](){ return sleeper(17, 50, true); }); | |
950 | std::cout << "wait_all_members<Data>(fail) => " << data << std::endl; | |
951 | } catch ( std::exception const& e) { | |
952 | thrown = e.what(); | |
953 | } | |
954 | std::cout << "wait_all_members<Data>(fail) threw '" << thrown | |
955 | << '"' << std::endl; | |
956 | ||
957 | //[wait_all_members_vector_ex | |
958 | // If we don't care about obtaining results as soon as they arrive, and we | |
959 | // prefer a result vector in passed argument order rather than completion | |
960 | // order, wait_all_members() is another possible implementation of | |
961 | // wait_all_until_error(). | |
962 | auto strings = wait_all_members< std::vector< std::string > >( | |
963 | [](){ return sleeper("wamv_left", 150); }, | |
964 | [](){ return sleeper("wamv_middle", 100); }, | |
965 | [](){ return sleeper("wamv_right", 50); }); | |
966 | std::cout << "wait_all_members<vector>() =>"; | |
967 | for ( std::string const& str : strings) { | |
968 | std::cout << " '" << str << "'"; | |
969 | } | |
970 | std::cout << std::endl; | |
971 | //] | |
972 | }); | |
973 | ||
974 | ||
975 | /***************************************************************************** | |
976 | * main() | |
977 | *****************************************************************************/ | |
978 | int main( int argc, char *argv[]) { | |
979 | runner.run(); | |
980 | std::cout << "done." << std::endl; | |
981 | return EXIT_SUCCESS; | |
982 | } |