
// Copyright Oliver Kowalke 2016.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
#define BOOST_FIBERS_BUFFERED_CHANNEL_H

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <type_traits>

#include <boost/config.hpp>

#include <boost/fiber/channel_op_status.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/exceptions.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif

namespace boost {
namespace fibers {

template< typename T >
class buffered_channel {
public:
    typedef T   value_type;

private:
    typedef context::wait_queue_t   wait_queue_type;
    typedef T                       slot_type;

    mutable detail::spinlock    splk_{};
    wait_queue_type             waiting_producers_{};
    wait_queue_type             waiting_consumers_{};
    slot_type                   *   slots_;
    std::size_t                 pidx_{ 0 };
    std::size_t                 cidx_{ 0 };
    std::size_t                 capacity_;
    bool                        closed_{ false };

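    // Ring-buffer bookkeeping: pidx_ is the next slot to write, cidx_ the next
    // slot to read. One slot is always left unused so that is_full_() and
    // is_empty_() can be told apart; the channel therefore holds at most
    // capacity_ - 1 elements at any time.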
    bool is_full_() const noexcept {
        return cidx_ == ((pidx_ + 1) % capacity_);
    }

    bool is_empty_() const noexcept {
        return cidx_ == pidx_;
    }

    bool is_closed_() const noexcept {
        return closed_;
    }

public:
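    // The constructor requires the capacity to be at least 2 and a power of
    // two: `capacity_ & (capacity_ - 1)` is non-zero exactly when capacity_
    // is not a power of two. Because one slot stays unused (see is_full_()),
    // a channel constructed with capacity c buffers at most c - 1 elements.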
    explicit buffered_channel( std::size_t capacity) :
            capacity_{ capacity } {
        if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) {
            throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
                               "boost fiber: buffer capacity is invalid" };
        }
        slots_ = new slot_type[capacity_];
    }

    ~buffered_channel() {
        close();
        delete [] slots_;
    }

    buffered_channel( buffered_channel const&) = delete;
    buffered_channel & operator=( buffered_channel const&) = delete;

    bool is_closed() const noexcept {
        detail::spinlock_lock lk{ splk_ };
        return is_closed_();
    }

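    // Summary of the twstatus protocol used by the notification loops below
    // (inferred from the inline comments): a waiting context stores 0 for a
    // plain, untimed wait and this channel's address for a pending timed
    // wait. A notifier CASes the channel address to -1 to claim the waiter
    // before its timeout fires; if the CAS fails and the observed value is 0,
    // the waiter is a plain wait and is simply scheduled, otherwise the timed
    // wait already expired (or another timed wait was started) and only the
    // reference is dropped.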
    void close() noexcept {
        context * active_ctx = context::active();
        detail::spinlock_lock lk{ splk_ };
        closed_ = true;
        // notify all waiting producers
        while ( ! waiting_producers_.empty() ) {
            context * producer_ctx = & waiting_producers_.front();
            waiting_producers_.pop_front();
            std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
            if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                // notify before timeout
                intrusive_ptr_release( producer_ctx);
                // notify context
                active_ctx->schedule( producer_ctx);
            } else if ( static_cast< std::intptr_t >( 0) == expected) {
                // no timed-wait op.
                // notify context
                active_ctx->schedule( producer_ctx);
            } else {
                // timed-wait op.
                // expected == -1: notify after timeout, same timed-wait op.
                // expected == <any>: notify after timeout, another timed-wait op. was already started
                intrusive_ptr_release( producer_ctx);
                // re-schedule next
            }
        }
        // notify all waiting consumers
        while ( ! waiting_consumers_.empty() ) {
            context * consumer_ctx = & waiting_consumers_.front();
            waiting_consumers_.pop_front();
            std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
            if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                // notify before timeout
                intrusive_ptr_release( consumer_ctx);
                // notify context
                active_ctx->schedule( consumer_ctx);
            } else if ( static_cast< std::intptr_t >( 0) == expected) {
                // no timed-wait op.
                // notify context
                active_ctx->schedule( consumer_ctx);
            } else {
                // timed-wait op.
                // expected == -1: notify after timeout, same timed-wait op.
                // expected == <any>: notify after timeout, another timed-wait op. was already started
                intrusive_ptr_release( consumer_ctx);
                // re-schedule next
            }
        }
    }

    channel_op_status try_push( value_type const& value) {
        context * active_ctx = context::active();
        detail::spinlock_lock lk{ splk_ };
        if ( BOOST_UNLIKELY( is_closed_() ) ) {
            return channel_op_status::closed;
        } else if ( is_full_() ) {
            return channel_op_status::full;
        } else {
            slots_[pidx_] = value;
            pidx_ = (pidx_ + 1) % capacity_;
            // notify one waiting consumer
            while ( ! waiting_consumers_.empty() ) {
                context * consumer_ctx = & waiting_consumers_.front();
                waiting_consumers_.pop_front();
                std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                    // notify before timeout
                    intrusive_ptr_release( consumer_ctx);
                    // notify context
                    active_ctx->schedule( consumer_ctx);
                    break;
                } else if ( static_cast< std::intptr_t >( 0) == expected) {
                    // no timed-wait op.
                    // notify context
                    active_ctx->schedule( consumer_ctx);
                    break;
                } else {
                    // timed-wait op.
                    // expected == -1: notify after timeout, same timed-wait op.
                    // expected == <any>: notify after timeout, another timed-wait op. was already started
                    intrusive_ptr_release( consumer_ctx);
                    // re-schedule next
                }
            }
            return channel_op_status::success;
        }
    }

    channel_op_status try_push( value_type && value) {
        context * active_ctx = context::active();
        detail::spinlock_lock lk{ splk_ };
        if ( BOOST_UNLIKELY( is_closed_() ) ) {
            return channel_op_status::closed;
        } else if ( is_full_() ) {
            return channel_op_status::full;
        } else {
            slots_[pidx_] = std::move( value);
            pidx_ = (pidx_ + 1) % capacity_;
            // notify one waiting consumer
            while ( ! waiting_consumers_.empty() ) {
                context * consumer_ctx = & waiting_consumers_.front();
                waiting_consumers_.pop_front();
                lk.unlock();
                std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                    // notify before timeout
                    intrusive_ptr_release( consumer_ctx);
                    // notify context
                    active_ctx->schedule( consumer_ctx);
                    break;
                } else if ( static_cast< std::intptr_t >( 0) == expected) {
                    // no timed-wait op.
                    // notify context
                    active_ctx->schedule( consumer_ctx);
                    break;
                } else {
                    // timed-wait op.
                    // expected == -1: notify after timeout, same timed-wait op.
                    // expected == <any>: notify after timeout, another timed-wait op. was already started
                    intrusive_ptr_release( consumer_ctx);
                    // re-schedule next
                }
            }
            return channel_op_status::success;
        }
    }

    channel_op_status push( value_type const& value) {
        context * active_ctx = context::active();
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( BOOST_UNLIKELY( is_closed_() ) ) {
                return channel_op_status::closed;
            } else if ( is_full_() ) {
                active_ctx->wait_link( waiting_producers_);
                active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                // suspend this producer
                active_ctx->suspend( lk);
            } else {
                slots_[pidx_] = value;
                pidx_ = (pidx_ + 1) % capacity_;
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    lk.unlock();
                    std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        // notify before timeout
                        intrusive_ptr_release( consumer_ctx);
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    } else if ( static_cast< std::intptr_t >( 0) == expected) {
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    } else {
                        // timed-wait op.
                        // expected == -1: notify after timeout, same timed-wait op.
                        // expected == <any>: notify after timeout, another timed-wait op. was already started
                        intrusive_ptr_release( consumer_ctx);
                        // re-schedule next
                    }
                }
                return channel_op_status::success;
            }
        }
    }

    channel_op_status push( value_type && value) {
        context * active_ctx = context::active();
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( BOOST_UNLIKELY( is_closed_() ) ) {
                return channel_op_status::closed;
            } else if ( is_full_() ) {
                active_ctx->wait_link( waiting_producers_);
                active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                // suspend this producer
                active_ctx->suspend( lk);
            } else {
                slots_[pidx_] = std::move( value);
                pidx_ = (pidx_ + 1) % capacity_;
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    lk.unlock();
                    std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        // notify before timeout
                        intrusive_ptr_release( consumer_ctx);
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    } else if ( static_cast< std::intptr_t >( 0) == expected) {
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    } else {
                        // timed-wait op.
                        // expected == -1: notify after timeout, same timed-wait op.
                        // expected == <any>: notify after timeout, another timed-wait op. was already started
                        intrusive_ptr_release( consumer_ctx);
                        // re-schedule next
                    }
                }
                return channel_op_status::success;
            }
        }
    }

    template< typename Rep, typename Period >
    channel_op_status push_wait_for( value_type const& value,
                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
        return push_wait_until( value,
                                std::chrono::steady_clock::now() + timeout_duration);
    }

    template< typename Rep, typename Period >
    channel_op_status push_wait_for( value_type && value,
                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
        return push_wait_until( std::forward< value_type >( value),
                                std::chrono::steady_clock::now() + timeout_duration);
    }

    template< typename Clock, typename Duration >
    channel_op_status push_wait_until( value_type const& value,
                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {
        context * active_ctx = context::active();
        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( BOOST_UNLIKELY( is_closed_() ) ) {
                return channel_op_status::closed;
            } else if ( is_full_() ) {
                active_ctx->wait_link( waiting_producers_);
                intrusive_ptr_add_ref( active_ctx);
                active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
                // suspend this producer
                if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                    // relock local lk
                    lk.lock();
                    // remove from waiting-queue
                    waiting_producers_.remove( * active_ctx);
                    return channel_op_status::timeout;
                }
            } else {
                slots_[pidx_] = value;
                pidx_ = (pidx_ + 1) % capacity_;
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    lk.unlock();
                    std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        // notify before timeout
                        intrusive_ptr_release( consumer_ctx);
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    } else if ( static_cast< std::intptr_t >( 0) == expected) {
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    } else {
                        // timed-wait op.
                        // expected == -1: notify after timeout, same timed-wait op.
                        // expected == <any>: notify after timeout, another timed-wait op. was already started
                        intrusive_ptr_release( consumer_ctx);
                        // re-schedule next
                    }
                }
                return channel_op_status::success;
            }
        }
    }

    template< typename Clock, typename Duration >
    channel_op_status push_wait_until( value_type && value,
                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {
        context * active_ctx = context::active();
        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( BOOST_UNLIKELY( is_closed_() ) ) {
                return channel_op_status::closed;
            } else if ( is_full_() ) {
                active_ctx->wait_link( waiting_producers_);
                intrusive_ptr_add_ref( active_ctx);
                active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
                // suspend this producer
                if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                    // relock local lk
                    lk.lock();
                    // remove from waiting-queue
                    waiting_producers_.remove( * active_ctx);
                    return channel_op_status::timeout;
                }
            } else {
                slots_[pidx_] = std::move( value);
                pidx_ = (pidx_ + 1) % capacity_;
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    lk.unlock();
                    std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        // notify before timeout
                        intrusive_ptr_release( consumer_ctx);
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    } else if ( static_cast< std::intptr_t >( 0) == expected) {
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    } else {
                        // timed-wait op.
                        // expected == -1: notify after timeout, same timed-wait op.
                        // expected == <any>: notify after timeout, another timed-wait op. was already started
                        intrusive_ptr_release( consumer_ctx);
                        // re-schedule next
                    }
                }
                return channel_op_status::success;
            }
        }
    }

    channel_op_status try_pop( value_type & value) {
        context * active_ctx = context::active();
        detail::spinlock_lock lk{ splk_ };
        if ( is_empty_() ) {
            return is_closed_()
                ? channel_op_status::closed
                : channel_op_status::empty;
        } else {
            value = std::move( slots_[cidx_]);
            cidx_ = (cidx_ + 1) % capacity_;
            // notify one waiting producer
            while ( ! waiting_producers_.empty() ) {
                context * producer_ctx = & waiting_producers_.front();
                waiting_producers_.pop_front();
                lk.unlock();
                std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                    // notify before timeout
                    intrusive_ptr_release( producer_ctx);
                    // notify context
                    active_ctx->schedule( producer_ctx);
                    break;
                } else if ( static_cast< std::intptr_t >( 0) == expected) {
                    // no timed-wait op.
                    // notify context
                    active_ctx->schedule( producer_ctx);
                    break;
                } else {
                    // timed-wait op.
                    // expected == -1: notify after timeout, same timed-wait op.
                    // expected == <any>: notify after timeout, another timed-wait op. was already started
                    intrusive_ptr_release( producer_ctx);
                    // re-schedule next
                }
            }
            return channel_op_status::success;
        }
    }

    channel_op_status pop( value_type & value) {
        context * active_ctx = context::active();
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( is_empty_() ) {
                if ( BOOST_UNLIKELY( is_closed_() ) ) {
                    return channel_op_status::closed;
                } else {
                    active_ctx->wait_link( waiting_consumers_);
                    active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                    // suspend this consumer
                    active_ctx->suspend( lk);
                }
            } else {
                value = std::move( slots_[cidx_]);
                cidx_ = (cidx_ + 1) % capacity_;
                // notify one waiting producer
                while ( ! waiting_producers_.empty() ) {
                    context * producer_ctx = & waiting_producers_.front();
                    waiting_producers_.pop_front();
                    lk.unlock();
                    std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                    if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        // notify before timeout
                        intrusive_ptr_release( producer_ctx);
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    } else if ( static_cast< std::intptr_t >( 0) == expected) {
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    } else {
                        // timed-wait op.
                        // expected == -1: notify after timeout, same timed-wait op.
                        // expected == <any>: notify after timeout, another timed-wait op. was already started
                        intrusive_ptr_release( producer_ctx);
                        // re-schedule next
                    }
                }
                return channel_op_status::success;
            }
        }
    }

    value_type value_pop() {
        context * active_ctx = context::active();
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( is_empty_() ) {
                if ( BOOST_UNLIKELY( is_closed_() ) ) {
                    throw fiber_error{
                            std::make_error_code( std::errc::operation_not_permitted),
                            "boost fiber: channel is closed" };
                } else {
                    active_ctx->wait_link( waiting_consumers_);
                    active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                    // suspend this consumer
                    active_ctx->suspend( lk);
                }
            } else {
                value_type value = std::move( slots_[cidx_]);
                cidx_ = (cidx_ + 1) % capacity_;
                // notify one waiting producer
                while ( ! waiting_producers_.empty() ) {
                    context * producer_ctx = & waiting_producers_.front();
                    waiting_producers_.pop_front();
                    lk.unlock();
                    std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                    if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        // notify before timeout
                        intrusive_ptr_release( producer_ctx);
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    } else if ( static_cast< std::intptr_t >( 0) == expected) {
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    } else {
                        // timed-wait op.
                        // expected == -1: notify after timeout, same timed-wait op.
                        // expected == <any>: notify after timeout, another timed-wait op. was already started
                        intrusive_ptr_release( producer_ctx);
                        // re-schedule next
                    }
                }
                return std::move( value);
            }
        }
    }

    template< typename Rep, typename Period >
    channel_op_status pop_wait_for( value_type & value,
                                    std::chrono::duration< Rep, Period > const& timeout_duration) {
        return pop_wait_until( value,
                               std::chrono::steady_clock::now() + timeout_duration);
    }

    template< typename Clock, typename Duration >
    channel_op_status pop_wait_until( value_type & value,
                                      std::chrono::time_point< Clock, Duration > const& timeout_time_) {
        context * active_ctx = context::active();
        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( is_empty_() ) {
                if ( BOOST_UNLIKELY( is_closed_() ) ) {
                    return channel_op_status::closed;
                } else {
                    active_ctx->wait_link( waiting_consumers_);
                    intrusive_ptr_add_ref( active_ctx);
                    active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
                    // suspend this consumer
                    if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                        // relock local lk
                        lk.lock();
                        // remove from waiting-queue
                        waiting_consumers_.remove( * active_ctx);
                        return channel_op_status::timeout;
                    }
                }
            } else {
                value = std::move( slots_[cidx_]);
                cidx_ = (cidx_ + 1) % capacity_;
                // notify one waiting producer
                while ( ! waiting_producers_.empty() ) {
                    context * producer_ctx = & waiting_producers_.front();
                    waiting_producers_.pop_front();
                    lk.unlock();
                    std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                    if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        // notify before timeout
                        intrusive_ptr_release( producer_ctx);
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    } else if ( static_cast< std::intptr_t >( 0) == expected) {
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    } else {
                        // timed-wait op.
                        // expected == -1: notify after timeout, same timed-wait op.
                        // expected == <any>: notify after timeout, another timed-wait op. was already started
                        intrusive_ptr_release( producer_ctx);
                        // re-schedule next
                    }
                }
                return channel_op_status::success;
            }
        }
    }

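    // Input iterator over the channel: each increment pops the next value via
    // value_pop(); once the channel is closed and drained, value_pop() throws
    // fiber_error, which increment_() catches to turn this iterator into the
    // default-constructed end iterator.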
    class iterator : public std::iterator< std::input_iterator_tag, typename std::remove_reference< value_type >::type > {
    private:
        typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type  storage_type;

        buffered_channel   *   chan_{ nullptr };
        storage_type            storage_;

        void increment_() {
            BOOST_ASSERT( nullptr != chan_);
            try {
                ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
            } catch ( fiber_error const&) {
                chan_ = nullptr;
            }
        }

    public:
        typedef typename iterator::pointer pointer_t;
        typedef typename iterator::reference reference_t;

        iterator() noexcept = default;

        explicit iterator( buffered_channel< T > * chan) noexcept :
            chan_{ chan } {
            increment_();
        }

        iterator( iterator const& other) noexcept :
            chan_{ other.chan_ } {
        }

        iterator & operator=( iterator const& other) noexcept {
            if ( BOOST_LIKELY( this != & other) ) {
                chan_ = other.chan_;
            }
            return * this;
        }

        bool operator==( iterator const& other) const noexcept {
            return other.chan_ == chan_;
        }

        bool operator!=( iterator const& other) const noexcept {
            return other.chan_ != chan_;
        }

        iterator & operator++() {
            increment_();
            return * this;
        }

        iterator operator++( int) = delete;

        reference_t operator*() noexcept {
            return * reinterpret_cast< value_type * >( std::addressof( storage_) );
        }

        pointer_t operator->() noexcept {
            return reinterpret_cast< value_type * >( std::addressof( storage_) );
        }
    };

    friend class iterator;
};

template< typename T >
typename buffered_channel< T >::iterator
begin( buffered_channel< T > & chan) {
    return typename buffered_channel< T >::iterator( & chan);
}

template< typename T >
typename buffered_channel< T >::iterator
end( buffered_channel< T > &) {
    return typename buffered_channel< T >::iterator();
}

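// Usage sketch (illustrative only, not part of the header's API): one fiber
// pushes values into a bounded channel while another drains it via the
// range-for support provided by begin()/end() above. Names and values are
// assumptions made up for this example.
//
//   boost::fibers::buffered_channel< int > chan{ 8 };   // capacity: power of two >= 2
//   boost::fibers::fiber producer{ [&chan]{
//       for ( int i = 0; i < 100; ++i) {
//           chan.push( i);            // blocks while the ring buffer is full
//       }
//       chan.close();                 // wakes all waiting consumers
//   } };
//   boost::fibers::fiber consumer{ [&chan]{
//       for ( int v : chan) {         // ends once the channel is closed and drained
//           /* process v */
//       }
//   } };
//   producer.join();
//   consumer.join();
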
}}

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif

#endif // BOOST_FIBERS_BUFFERED_CHANNEL_H