[/ Copyright Oliver Kowalke 2013. Distributed under the Boost Software License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt ] [section:channels Channels] __boost_fiber__ provides a bounded and a unbounded channel suitable to synchonize fibers via message passing. typedef boost::fibers::unbounded_channel< int > channel_t; void send( channel_t & channel) { for ( int i = 0; i < 5; ++i) { channel.push( i); } channel.close(); } void recv( channel_t & channel) { int i; while ( boost::fibers::channel_op_status::success == channel.pop(i) ) { std::cout << "received " << i << std::endl; } } channel_t channel; boost::fibers::fiber f1( std::bind( send, ref( channel) ) ); boost::fibers::fiber f2( std::bind( recv, ref( channel) ) ); f1.join(); f2.join(); [#class_channel_op_status] [heading Enumeration `channel_op_status`] channel operations return the state of the channel. enum class channel_op_status { success, empty, full, closed, timeout }; [heading `success`] [variablelist [[Effects:] [Operation was successful.]] ] [heading `empty`] [variablelist [[Effects:] [channel is empty, operation failed.]] ] [heading `full`] [variablelist [[Effects:] [channel is full, operation failed.]] ] [heading `closed`] [variablelist [[Effects:] [channel is closed, operation failed.]] ] [heading `timeout`] [variablelist [[Effects:] [The operation did not become ready before specified timeout elapsed.]] ] [template_heading unbounded_channel] #include namespace boost { namespace fibers { template< typename T, typename __Allocator__ = __allocator__ > class unbounded_channel { public: typedef T value_type; explicit unbounded_channel( __Allocator__ const& alloc = Allocator() ) noexcept; unbounded_channel( unbounded_channel const& other) = delete; unbounded_channel & operator=( unbounded_channel const& other) = delete; void close() noexcept; channel_op_status push( value_type const& va); channel_op_status push( value_type && va); channel_op_status pop( value_type & va); value_type value_pop(); channel_op_status try_pop( value_type & va); template< typename Rep, typename Period > channel_op_status pop_wait_for( value_type & va, std::chrono::duration< Rep, Period > const& timeout_duration); template< typename Clock, typename Duration > channel_op_status pop_wait_until( value_type & va, std::chrono::time_point< Clock, Duration > const& timeout_time); }; }} [heading Constructor] explicit unbounded_channel( __Allocator__ const& alloc = Allocator() ) noexcept; [variablelist [[Effects:] [Constructs an object of class `unbounded_channel`. Internal nodes are allocated using `alloc` - C++11-allocators are supported.]] [[Throws:] [Nothing.]] [[See also:] [__Allocator__ concept, __allocator__]] ] [template xchannel_close[cls] [member_heading [cls]..close] void close() noexcept; [variablelist [[Effects:] [Deactivates the channel. No values can be put after calling `this->close()`. Fibers blocked in `this->pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` will return `closed`. Fibers blocked in `this->value_pop()` will receive an exception.]] [[Throws:] [Nothing.]] [[Note:] [`close()` is like closing a pipe. It informs waiting consumers that no more values will arrive.]] ] ] [xchannel_close unbounded_channel] [template xchannel_push_effects[enqueues] If channel is closed, returns `closed`. [enqueues] the value in the channel, wakes up a fiber blocked on `this->pop()`, `this->value_pop()`, `this->pop_wait_for()` or `this->pop_wait_until()` and returns `success`.] [member_heading unbounded_channel..push] channel_op_status push( value_type const& va); channel_op_status push( value_type && va); [variablelist [[Effects:] [[xchannel_push_effects Otherwise enqueues]]] [[Throws:] [Exceptions thrown by memory allocation and copying or moving `va`.]] ] [template xchannel_pop[cls unblocking] [member_heading [cls]..pop] channel_op_status pop( value_type & va); [variablelist [[Effects:] [Dequeues a value from the channel. If the channel is empty, the fiber gets suspended until at least one new item is `push()`ed (return value `success` and `va` contains dequeued value) or the channel gets `close()`d (return value `closed`)[unblocking]]] [[Throws:] [Nothing]] ] ] [xchannel_pop unbounded_channel .] [template xchannel_value_pop[cls unblocking] [member_heading [cls]..value_pop] value_type value_pop(); [variablelist [[Effects:] [Dequeues a value from the channel. If the channel is empty, the fiber gets suspended until at least one new item is `push()`ed or the channel gets `close()`d (which throws an exception)[unblocking]]] [[Throws:] [`fiber_error` if `*this` is closed]] [[Error conditions:] [`std::errc::operation_not_permitted`]] ] ] [xchannel_value_pop unbounded_channel .] [template xchannel_try_pop[cls unblocking] [member_heading [cls]..try_pop] channel_op_status try_pop( value_type & va); [variablelist [[Effects:] [If channel is empty, returns `empty`. If channel is closed, returns `closed`. Otherwise it returns `success` and `va` contains the dequeued value[unblocking]]] [[Throws:] [Exceptions thrown by copy- or move-operations.]] ] ] [xchannel_try_pop unbounded_channel .] [template xchannel_pop_wait_until_effects[endtime unblocking] If channel is not empty, immediately dequeues a value from the channel. Otherwise the fiber gets suspended until at least one new item is `push()`ed (return value `success` and `va` contains dequeued value), or the channel gets `close()`d (return value `closed`), or the system time reaches [endtime] (return value `timeout`)[unblocking]] [template xchannel_pop_wait_for[cls unblocking] [member_heading [cls]..pop_wait_for] template< typename Rep, typename Period > channel_op_status pop_wait_for( value_type & va, std::chrono::duration< Rep, Period > const& timeout_duration) [variablelist [[Effects:] [Accepts `std::chrono::duration` and internally computes a timeout time as (system time + `timeout_duration`). [xchannel_pop_wait_until_effects the computed timeout time..[unblocking]]]] [[Throws:] [timeout-related exceptions.]] ] ] [xchannel_pop_wait_for unbounded_channel .] [template xchannel_pop_wait_until[cls unblocking] [member_heading [cls]..pop_wait_until] template< typename Clock, typename Duration > channel_op_status pop_wait_until( value_type & va, std::chrono::time_point< Clock, Duration > const& timeout_time) [variablelist [[Effects:] [Accepts a `std::chrono::time_point< Clock, Duration >`. [xchannel_pop_wait_until_effects the passed `time_point`..[unblocking]]]] [[Throws:] [timeout-related exceptions.]] ] ] [xchannel_pop_wait_until unbounded_channel .] [template_heading bounded_channel] #include namespace boost { namespace fibers { template< typename T, typename __Allocator__ = __allocator__ > class bounded_channel { public: typedef T value_type; bounded_channel( std::size_t wm, __Allocator__ const& alloc = Allocator() ); bounded_channel( std::size_t hwm, std::size_t lwm, __Allocator__ const& alloc = Allocator() ); bounded_channel( bounded_channel const& other) = delete; bounded_channel & operator=( bounded_channel const& other) = delete; std::size_t upper_bound() const noexcept; std::size_t lower_bound() const noexcept; void close() noexcept; channel_op_status push( value_type const& va); channel_op_status push( value_type && va); template< typename Rep, typename Period > channel_op_status push_wait_for( value_type const& va, std::chrono::duration< Rep, Period > const& timeout_duration); channel_op_status push_wait_for( value_type && va, std::chrono::duration< Rep, Period > const& timeout_duration); template< typename Clock, typename Duration > channel_op_status push_wait_until( value_type const& va, std::chrono::time_point< Clock, Duration > const& timeout_time); template< typename Clock, typename Duration > channel_op_status push_wait_until( value_type && va, std::chrono::time_point< Clock, Duration > const& timeout_time); channel_op_status try_push( value_type const& va); channel_op_status try_push( value_type && va); channel_op_status pop( value_type & va); value_type value_pop(); template< typename Rep, typename Period > channel_op_status pop_wait_for( value_type & va, std::chrono::duration< Rep, Period > const& timeout_duration); template< typename Clock, typename Duration > channel_op_status pop_wait_until( value_type & va, std::chrono::time_point< Clock, Duration > const& timeout_time); channel_op_status try_pop( value_type & va); }; }} [heading Constructor] bounded_channel( std::size_t wm, __Allocator__ const& alloc = Allocator() ); bounded_channel( std::size_t hwm, std::size_t lwm, __Allocator__ const& alloc = Allocator() ); [variablelist [[Preconditions:] [`hwm > lwm`]] [[Effects:] [Constructs an object of class `bounded_channel`. The constructor with two arguments constructs an object of class `bounded_channel` with a high-watermark of `hwm` and a low-watermark of `lwm` items. The constructor with one `std::size_t` argument is effectively the same as `bounded_channel(wm, (wm-1), alloc)`. Internal nodes are allocated using `alloc` - C++11-allocators are supported.]] [[Throws:] [`fiber_error`]] [[Error Conditions:] [ [*invalid_argument]: if `lwm >= hwm`.]] [[Notes:] [Once the number of values in the channel reaches `hwm`, any call to `push()`, `push_wait_for()` or `push_wait_until()` will block until the number of values in the channel is at most `lwm`. That is, if `lwm < (hwm-1)`, the channel can be in a state in which `push()`, `push_wait_for()` or `push_wait_until()` calls will block (channel is full) even though the number of values in the channel is less than `hwm`.]] [[See also:] [__Allocator__ concept, __allocator__]] ] [member_heading bounded_channel..upper_bound] std::size_t upper_bound() const noexcept; [variablelist [[Returns:] [the high-watermark with which `*this` was constructed.]] [[Throws:] [Nothing.]] ] [member_heading bounded_channel..lower_bound] std::size_t lower_bound() const noexcept; [variablelist [[Returns:] [the low-watermark with which `*this` was constructed.]] [[Throws:] [Nothing.]] ] [xchannel_close bounded_channel] [template bounded_channel_push_effects[or] [xchannel_push_effects If channel is not full, enqueues] Otherwise the calling fiber is suspended until the number of values in the channel drops to `lwm` (return value `success`)[or] the channel is `close()`d (return value `closed`)] [member_heading bounded_channel..push] channel_op_status push( value_type const& va); channel_op_status push( value_type && va); [variablelist [[Effects:] [[bounded_channel_push_effects or].]] [[Throws:] [exceptions thrown by memory allocation and copying or moving `va`.]] ] [member_heading bounded_channel..push_wait_for] template< typename Rep, typename Period > channel_op_status push_wait_for( value_type const& va, std::chrono::duration< Rep, Period > const& timeout_duration); template< typename Rep, typename Period > channel_op_status push_wait_for( value_type && va, std::chrono::duration< Rep, Period > const& timeout_duration); [variablelist [[Effects:] [Accepts `std::chrono::duration` and internally computes a time_point as (system time + `timeout_duration`). [bounded_channel_push_effects ,], or the system time reaches the computed time_point (return value `timeout`).]] [[Throws:] [exceptions thrown by memory allocation and copying or moving `va` or timeout-related exceptions.]] ] [member_heading bounded_channel..push_wait_until] template< typename Clock, typename Duration > channel_op_status push_wait_until( value_type const& va, std::chrono::time_point< Clock, Duration > const& timeout_time); template< typename Clock, typename Duration > channel_op_status push_wait_until( value_type && va, std::chrono::time_point< Clock, Duration > const& timeout_time); [variablelist [[Effects:] [Accepts an absolute `timeout_time` in any supported time_point type. [bounded_channel_push_effects ,], or the system time reaches the passed time_point (return value `timeout`).]] [[Throws:] [exceptions thrown by memory allocation and copying or moving `va` or timeout-related exceptions.]] ] [member_heading bounded_channel..try_push] channel_op_status try_push( value_type const& va); channel_op_status try_push( value_type && va); [variablelist [[Effects:] [If channel is full, returns `full`. [xchannel_push_effects Otherwise enqueues]]] [[Throws:] [Exceptions thrown by memory allocation and copying or moving `va`.]] ] [template bounded_pop_unblocking[] Once the number of items remaining in the channel drops to `lwm`, any fibers blocked on `push()`, `push_wait_for()` or `push_wait_until()` may resume.] [xchannel_pop bounded_channel... [bounded_pop_unblocking]] [xchannel_value_pop bounded_channel... [bounded_pop_unblocking]] [xchannel_try_pop bounded_channel... [bounded_pop_unblocking]] [xchannel_pop_wait_for bounded_channel... [bounded_pop_unblocking]] [xchannel_pop_wait_until bounded_channel... [bounded_pop_unblocking]] [endsect]