Home | Libraries | People | FAQ | More |
As soon as we want to collect return values from all the task functions,
we can see right away how to reuse wait_first_value()
's
channel<T> for the purpose. All we have to do is avoid closing it
after the first value!
But in fact, collecting multiple values raises an interesting question: do we really want to wait until the slowest of them has arrived? Wouldn't we rather process each result as soon as it becomes available?
Fortunately we can present both APIs. Let's define wait_all_values_source()
to return shared_ptr<unbounded_channel<T>>
.[6]
Given wait_all_values_source()
, it's straightforward to implement wait_all_values()
:
template< typename Fn, typename ... Fns > std::vector< typename std::result_of< Fn() >::type > wait_all_values( Fn && function, Fns && ... functions) { std::size_t count( 1 + sizeof ... ( functions) ); typedef typename std::result_of< Fn() >::type return_t; typedef std::vector< return_t > vector_t; vector_t results; results.reserve( count); // get channel std::shared_ptr< boost::fibers::unbounded_channel< return_t > > channel = wait_all_values_source( std::forward< Fn >( function), std::forward< Fns >( functions) ... ); // fill results vector return_t value; while ( boost::fibers::channel_op_status::success == channel->pop(value) ) { results.push_back( value); } // return vector to caller return results; }
It might be called like this:
std::vector< std::string > values = wait_all_values( [](){ return sleeper("wav_late", 150); }, [](){ return sleeper("wav_middle", 100); }, [](){ return sleeper("wav_early", 50); });
As you can see from the loop in wait_all_values()
, instead of requiring its caller to count
values, we define wait_all_values_source()
to unbounded_channel::close()
the
channel when done. But how do we do that? Each producer fiber is independent.
It has no idea whether it is the last one to unbounded_channel::push()
a
value.
We can address that problem with a counting
façade for the unbounded_channel<>
. In fact, our façade need only
support the producer end of the channel.
// Introduce a channel facade that closes the channel once a specific number // of items has been pushed. This allows an arbitrary consumer to read until // 'closed' without itself having to count items. template< typename T > class nchannel { public: nchannel( std::shared_ptr< boost::fibers::unbounded_channel< T > > cp, std::size_t lm): channel_( cp), limit_( lm) { assert(channel_); if ( 0 == limit_) { channel_->close(); } } boost::fibers::channel_op_status push( T && va) { boost::fibers::channel_op_status ok = channel_->push( std::forward< T >( va) ); if ( ok == boost::fibers::channel_op_status::success && --limit_ == 0) { // after the 'limit_'th successful push, close the channel channel_->close(); } return ok; } private: std::shared_ptr< boost::fibers::unbounded_channel< T > > channel_; std::size_t limit_; };
Armed with nchannel<>
, we can implement wait_all_values_source()
.
It starts just like wait_first_value()
. The difference is that we wrap
the unbounded_channel<T>
with an nchannel<T>
to pass to the producer fibers.
Then, of course, instead of popping the first value, closing the channel
and returning it, we simply return the shared_ptr<unbounded_channel<T>>
.
// Return a shared_ptr<unbounded_channel<T>> from which the caller can // retrieve each new result as it arrives, until 'closed'. template< typename Fn, typename ... Fns > std::shared_ptr< boost::fibers::unbounded_channel< typename std::result_of< Fn() >::type > > wait_all_values_source( Fn && function, Fns && ... functions) { std::size_t count( 1 + sizeof ... ( functions) ); typedef typename std::result_of< Fn() >::type return_t; typedef boost::fibers::unbounded_channel< return_t > channel_t; // make the channel auto channelp( std::make_shared< channel_t >() ); // and make an nchannel facade to close it after 'count' items auto ncp( std::make_shared< nchannel< return_t > >( channelp, count) ); // pass that nchannel facade to all the relevant fibers wait_all_values_impl< return_t >( ncp, std::forward< Fn >( function), std::forward< Fns >( functions) ... ); // then return the channel for consumer return channelp; }
For example:
std::shared_ptr< boost::fibers::unbounded_channel< std::string > > channel = wait_all_values_source( [](){ return sleeper("wavs_third", 150); }, [](){ return sleeper("wavs_second", 100); }, [](){ return sleeper("wavs_first", 50); }); std::string value; while ( boost::fibers::channel_op_status::success == channel->pop(value) ) { std::cout << "wait_all_values_source() => '" << value << "'" << std::endl; }
wait_all_values_impl()
really is just like wait_first_value_impl()
except for the use of nchannel<T>
rather than unbounded_channel<T>
:
template< typename T, typename Fn > void wait_all_values_impl( std::shared_ptr< nchannel< T > > channel, Fn && function) { boost::fibers::fiber( [channel, function](){ channel->push(function()); }).detach(); }
[6]
We could have used either bounded_channel<>
or
unbounded_channel<>
. We chose unbounded_channel<>
on the assumption that its simpler semantics imply a cheaper implementation.