#include <vector>
#include <boost/fiber/all.hpp>
-#include <boost/noncopyable.hpp>
#include <boost/variant/variant.hpp>
#include <boost/variant/get.hpp>
/*****************************************************************************
* Verbose
*****************************************************************************/
-class Verbose: boost::noncopyable {
+class Verbose {
public:
Verbose( std::string const& d):
desc( d) {
std::cout << desc << " stop" << std::endl;
}
+ Verbose( Verbose const&) = delete;
+ Verbose & operator=( Verbose const&) = delete;
+
private:
const std::string desc;
};
// When there's only one function, call this overload
//[wait_first_value_impl
template< typename T, typename Fn >
-void wait_first_value_impl( std::shared_ptr< boost::fibers::unbounded_channel< T > > channel,
+void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
Fn && function) {
- boost::fibers::fiber( [channel, function](){
+ boost::fibers::fiber( [chan, function](){
// Ignore channel_op_status returned by push():
// might be closed; we simply don't care.
- channel->push( function() );
+ chan->push( function() );
}).detach();
}
//]
// When there are two or more functions, call this overload
template< typename T, typename Fn0, typename Fn1, typename ... Fns >
-void wait_first_value_impl( std::shared_ptr< boost::fibers::unbounded_channel< T > > channel,
+void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
Fn0 && function0,
Fn1 && function1,
Fns && ... functions) {
// process the first function using the single-function overload
- wait_first_value_impl< T >( channel,
+ wait_first_value_impl< T >( chan,
std::forward< Fn0 >( function0) );
// then recur to process the rest
- wait_first_value_impl< T >( channel,
+ wait_first_value_impl< T >( chan,
std::forward< Fn1 >( function1),
std::forward< Fns >( functions) ... );
}
typename std::result_of< Fn() >::type
wait_first_value( Fn && function, Fns && ... functions) {
typedef typename std::result_of< Fn() >::type return_t;
- typedef boost::fibers::unbounded_channel< return_t > channel_t;
- auto channelp( std::make_shared< channel_t >() );
+ typedef boost::fibers::buffered_channel< return_t > channel_t;
+ auto chanp( std::make_shared< channel_t >( 64) );
// launch all the relevant fibers
- wait_first_value_impl< return_t >( channelp,
+ wait_first_value_impl< return_t >( chanp,
std::forward< Fn >( function),
std::forward< Fns >( functions) ... );
// retrieve the first value
- return_t value( channelp->value_pop() );
+ return_t value( chanp->value_pop() );
// close the channel: no subsequent push() has to succeed
- channelp->close();
+ chanp->close();
return value;
}
//]
*****************************************************************************/
// When there's only one function, call this overload.
//[wait_first_outcome_impl
-template< typename T, typename CHANNELP, typename Fn >
-void wait_first_outcome_impl( CHANNELP channel, Fn && function) {
+template< typename T, typename CHANP, typename Fn >
+void wait_first_outcome_impl( CHANP chan, Fn && function) {
boost::fibers::fiber(
// Use std::bind() here for C++11 compatibility. C++11 lambda capture
// can't move a move-only Fn type, but bind() can. Let bind() move the
// channel pointer and the function into the bound object, passing
// references into the lambda.
std::bind(
- []( CHANNELP & channel,
+ []( CHANP & chan,
typename std::decay< Fn >::type & function) {
// Instantiate a packaged_task to capture any exception thrown by
// function.
// Pass the corresponding future to consumer. Ignore
// channel_op_status returned by push(): might be closed; we
// simply don't care.
- channel->push( task.get_future() );
+ chan->push( task.get_future() );
},
- channel,
+ chan,
std::forward< Fn >( function)
)).detach();
}
//]
// When there are two or more functions, call this overload
-template< typename T, typename CHANNELP, typename Fn0, typename Fn1, typename ... Fns >
-void wait_first_outcome_impl( CHANNELP channel,
+template< typename T, typename CHANP, typename Fn0, typename Fn1, typename ... Fns >
+void wait_first_outcome_impl( CHANP chan,
Fn0 && function0,
Fn1 && function1,
Fns && ... functions) {
// process the first function using the single-function overload
- wait_first_outcome_impl< T >( channel,
+ wait_first_outcome_impl< T >( chan,
std::forward< Fn0 >( function0) );
// then recur to process the rest
- wait_first_outcome_impl< T >( channel,
+ wait_first_outcome_impl< T >( chan,
std::forward< Fn1 >( function1),
std::forward< Fns >( functions) ... );
}
// exception.
typedef typename std::result_of< Fn() >::type return_t;
typedef boost::fibers::future< return_t > future_t;
- typedef boost::fibers::unbounded_channel< future_t > channel_t;
- auto channelp(std::make_shared< channel_t >() );
+ typedef boost::fibers::buffered_channel< future_t > channel_t;
+ auto chanp(std::make_shared< channel_t >( 64) );
// launch all the relevant fibers
- wait_first_outcome_impl< return_t >( channelp,
+ wait_first_outcome_impl< return_t >( chanp,
std::forward< Fn >( function),
std::forward< Fns >( functions) ... );
// retrieve the first future
- future_t future( channelp->value_pop() );
+ future_t future( chanp->value_pop() );
// close the channel: no subsequent push() has to succeed
- channelp->close();
+ chanp->close();
// either return value or throw exception
return future.get();
}
// exception.
typedef typename std::result_of< typename std::decay< Fn >::type() >::type return_t;
typedef boost::fibers::future< return_t > future_t;
- typedef boost::fibers::unbounded_channel< future_t > channel_t;
- auto channelp( std::make_shared< channel_t >() );
+ typedef boost::fibers::buffered_channel< future_t > channel_t;
+ auto chanp( std::make_shared< channel_t >( 64) );
// launch all the relevant fibers
- wait_first_outcome_impl< return_t >( channelp,
+ wait_first_outcome_impl< return_t >( chanp,
std::forward< Fn >( function),
std::forward< Fns >( functions) ... );
// instantiate exception_list, just in case
// retrieve up to 'count' results -- but stop there!
for ( std::size_t i = 0; i < count; ++i) {
// retrieve the next future
- future_t future( channelp->value_pop() );
+ future_t future( chanp->value_pop() );
// retrieve exception_ptr if any
std::exception_ptr error( future.get_exception_ptr() );
// if no error, then yay, return value
if ( ! error) {
// close the channel: no subsequent push() has to succeed
- channelp->close();
+ chanp->close();
// show caller the value we got
return future.get();
}
template< typename ... Fns >
boost::variant< typename std::result_of< Fns() >::type ... >
wait_first_value_het( Fns && ... functions) {
- // Use unbounded_channel<boost::variant<T1, T2, ...>>; see remarks above.
+ // Use buffered_channel<boost::variant<T1, T2, ...>>; see remarks above.
typedef boost::variant< typename std::result_of< Fns() >::type ... > return_t;
- typedef boost::fibers::unbounded_channel< return_t > channel_t;
- auto channelp( std::make_shared< channel_t >() );
+ typedef boost::fibers::buffered_channel< return_t > channel_t;
+ auto chanp( std::make_shared< channel_t >( 64) );
// launch all the relevant fibers
- wait_first_value_impl< return_t >( channelp,
+ wait_first_value_impl< return_t >( chanp,
std::forward< Fns >( functions) ... );
// retrieve the first value
- return_t value( channelp->value_pop() );
+ return_t value( chanp->value_pop() );
// close the channel: no subsequent push() has to succeed
- channelp->close();
+ chanp->close();
return value;
}
//]
template< typename T >
class nchannel {
public:
- nchannel( std::shared_ptr< boost::fibers::unbounded_channel< T > > cp,
+ nchannel( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
std::size_t lm):
- channel_( cp),
+ chan_( chan),
limit_( lm) {
- assert(channel_);
+ assert(chan_);
if ( 0 == limit_) {
- channel_->close();
+ chan_->close();
}
}
boost::fibers::channel_op_status push( T && va) {
boost::fibers::channel_op_status ok =
- channel_->push( std::forward< T >( va) );
+ chan_->push( std::forward< T >( va) );
if ( ok == boost::fibers::channel_op_status::success &&
--limit_ == 0) {
// after the 'limit_'th successful push, close the channel
- channel_->close();
+ chan_->close();
}
return ok;
}
private:
- std::shared_ptr< boost::fibers::unbounded_channel< T > > channel_;
+ std::shared_ptr< boost::fibers::buffered_channel< T > > chan_;
std::size_t limit_;
};
//]
// When there's only one function, call this overload
//[wait_all_values_impl
template< typename T, typename Fn >
-void wait_all_values_impl( std::shared_ptr< nchannel< T > > channel,
+void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
Fn && function) {
- boost::fibers::fiber( [channel, function](){
- channel->push(function());
+ boost::fibers::fiber( [chan, function](){
+ chan->push(function());
}).detach();
}
//]
// When there are two or more functions, call this overload
template< typename T, typename Fn0, typename Fn1, typename ... Fns >
-void wait_all_values_impl( std::shared_ptr< nchannel< T > > channel,
+void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
Fn0 && function0,
Fn1 && function1,
Fns && ... functions) {
// process the first function using the single-function overload
- wait_all_values_impl< T >( channel, std::forward< Fn0 >( function0) );
+ wait_all_values_impl< T >( chan, std::forward< Fn0 >( function0) );
// then recur to process the rest
- wait_all_values_impl< T >( channel,
+ wait_all_values_impl< T >( chan,
std::forward< Fn1 >( function1),
std::forward< Fns >( functions) ... );
}
//[wait_all_values_source
-// Return a shared_ptr<unbounded_channel<T>> from which the caller can
+// Return a shared_ptr<buffered_channel<T>> from which the caller can
// retrieve each new result as it arrives, until 'closed'.
template< typename Fn, typename ... Fns >
-std::shared_ptr< boost::fibers::unbounded_channel< typename std::result_of< Fn() >::type > >
+std::shared_ptr< boost::fibers::buffered_channel< typename std::result_of< Fn() >::type > >
wait_all_values_source( Fn && function, Fns && ... functions) {
std::size_t count( 1 + sizeof ... ( functions) );
typedef typename std::result_of< Fn() >::type return_t;
- typedef boost::fibers::unbounded_channel< return_t > channel_t;
+ typedef boost::fibers::buffered_channel< return_t > channel_t;
// make the channel
- auto channelp( std::make_shared< channel_t >() );
+ auto chanp( std::make_shared< channel_t >( 64) );
// and make an nchannel facade to close it after 'count' items
- auto ncp( std::make_shared< nchannel< return_t > >( channelp, count) );
+ auto ncp( std::make_shared< nchannel< return_t > >( chanp, count) );
// pass that nchannel facade to all the relevant fibers
wait_all_values_impl< return_t >( ncp,
std::forward< Fn >( function),
std::forward< Fns >( functions) ... );
// then return the channel for consumer
- return channelp;
+ return chanp;
}
//]
results.reserve( count);
// get channel
- std::shared_ptr< boost::fibers::unbounded_channel< return_t > > channel =
+ std::shared_ptr< boost::fibers::buffered_channel< return_t > > chan =
wait_all_values_source( std::forward< Fn >( function),
std::forward< Fns >( functions) ... );
// fill results vector
return_t value;
- while ( boost::fibers::channel_op_status::success == channel->pop(value) ) {
+ while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
results.push_back( value);
}
// return vector to caller
Example wav( runner, "wait_all_values()", [](){
//[wait_all_values_source_ex
- std::shared_ptr< boost::fibers::unbounded_channel< std::string > > channel =
+ std::shared_ptr< boost::fibers::buffered_channel< std::string > > chan =
wait_all_values_source(
[](){ return sleeper("wavs_third", 150); },
[](){ return sleeper("wavs_second", 100); },
[](){ return sleeper("wavs_first", 50); });
std::string value;
- while ( boost::fibers::channel_op_status::success == channel->pop(value) ) {
+ while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
std::cout << "wait_all_values_source() => '" << value
<< "'" << std::endl;
}
* when_all, throw first exception
*****************************************************************************/
//[wait_all_until_error_source
-// Return a shared_ptr<unbounded_channel<future<T>>> from which the caller can
+// Return a shared_ptr<buffered_channel<future<T>>> from which the caller can
// get() each new result as it arrives, until 'closed'.
template< typename Fn, typename ... Fns >
std::shared_ptr<
- boost::fibers::unbounded_channel<
+ boost::fibers::buffered_channel<
boost::fibers::future<
typename std::result_of< Fn() >::type > > >
wait_all_until_error_source( Fn && function, Fns && ... functions) {
std::size_t count( 1 + sizeof ... ( functions) );
typedef typename std::result_of< Fn() >::type return_t;
typedef boost::fibers::future< return_t > future_t;
- typedef boost::fibers::unbounded_channel< future_t > channel_t;
+ typedef boost::fibers::buffered_channel< future_t > channel_t;
// make the channel
- auto channelp( std::make_shared< channel_t >() );
+ auto chanp( std::make_shared< channel_t >( 64) );
// and make an nchannel facade to close it after 'count' items
- auto ncp( std::make_shared< nchannel< future_t > >( channelp, count) );
+ auto ncp( std::make_shared< nchannel< future_t > >( chanp, count) );
// pass that nchannel facade to all the relevant fibers
wait_first_outcome_impl< return_t >( ncp,
std::forward< Fn >( function),
std::forward< Fns >( functions) ... );
// then return the channel for consumer
- return channelp;
+ return chanp;
}
//]
// get channel
std::shared_ptr<
- boost::fibers::unbounded_channel< future_t > > channel(
+ boost::fibers::buffered_channel< future_t > > chan(
wait_all_until_error_source( std::forward< Fn >( function),
std::forward< Fns >( functions) ... ) );
// fill results vector
future_t future;
- while ( boost::fibers::channel_op_status::success == channel->pop( future) ) {
+ while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
results.push_back( future.get() );
}
// return vector to caller
Example waue( runner, "wait_all_until_error()", [](){
//[wait_all_until_error_source_ex
typedef boost::fibers::future< std::string > future_t;
- std::shared_ptr< boost::fibers::unbounded_channel< future_t > > channel =
+ std::shared_ptr< boost::fibers::buffered_channel< future_t > > chan =
wait_all_until_error_source(
[](){ return sleeper("wauess_third", 150); },
[](){ return sleeper("wauess_second", 100); },
[](){ return sleeper("wauess_first", 50); });
future_t future;
- while ( boost::fibers::channel_op_status::success == channel->pop( future) ) {
+ while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
std::string value( future.get() );
std::cout << "wait_all_until_error_source(success) => '" << value
<< "'" << std::endl;
}
//]
- channel = wait_all_until_error_source(
+ chan = wait_all_until_error_source(
[](){ return sleeper("wauesf_third", 150); },
[](){ return sleeper("wauesf_second", 100, true); },
[](){ return sleeper("wauesf_first", 50); });
std::string thrown;
//<-
try {
- while ( boost::fibers::channel_op_status::success == channel->pop( future) ) {
+ while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
std::string value( future.get() );
std::cout << "wait_all_until_error_source(fail) => '" << value
<< "'" << std::endl;
// get channel
std::shared_ptr<
- boost::fibers::unbounded_channel< future_t > > channel(
+ boost::fibers::buffered_channel< future_t > > chan(
wait_all_until_error_source( std::forward< Fn >( function),
std::forward< Fns >( functions) ... ) );
// fill results and/or exceptions vectors
future_t future;
- while ( boost::fibers::channel_op_status::success == channel->pop( future) ) {
+ while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
std::exception_ptr exp = future.get_exception_ptr();
if ( ! exp) {
results.push_back( future.get() );