]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/lockfree/include/boost/lockfree/spsc_queue.hpp
bump version to 12.2.2-pve1
[ceph.git] / ceph / src / boost / libs / lockfree / include / boost / lockfree / spsc_queue.hpp
1 // lock-free single-producer/single-consumer ringbuffer
2 // this algorithm is implemented in various projects (linux kernel)
3 //
4 // Copyright (C) 2009-2013 Tim Blechmann
5 //
6 // Distributed under the Boost Software License, Version 1.0. (See
7 // accompanying file LICENSE_1_0.txt or copy at
8 // http://www.boost.org/LICENSE_1_0.txt)
9
10 #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
11 #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
12
13 #include <algorithm>
14 #include <memory>
15
16 #include <boost/aligned_storage.hpp>
17 #include <boost/assert.hpp>
18 #include <boost/static_assert.hpp>
19 #include <boost/utility.hpp>
20 #include <boost/utility/enable_if.hpp>
21 #include <boost/config.hpp> // for BOOST_LIKELY
22
23 #include <boost/type_traits/has_trivial_destructor.hpp>
24 #include <boost/type_traits/is_convertible.hpp>
25
26 #include <boost/lockfree/detail/atomic.hpp>
27 #include <boost/lockfree/detail/copy_payload.hpp>
28 #include <boost/lockfree/detail/parameter.hpp>
29 #include <boost/lockfree/detail/prefix.hpp>
30
31 #include <boost/lockfree/lockfree_forward.hpp>
32
33 #ifdef BOOST_HAS_PRAGMA_ONCE
34 #pragma once
35 #endif
36
37 namespace boost {
38 namespace lockfree {
39 namespace detail {
40
41 typedef parameter::parameters<boost::parameter::optional<tag::capacity>,
42 boost::parameter::optional<tag::allocator>
43 > ringbuffer_signature;
44
45 template <typename T>
46 class ringbuffer_base
47 {
48 #ifndef BOOST_DOXYGEN_INVOKED
49 protected:
50 typedef std::size_t size_t;
51 static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t);
52 atomic<size_t> write_index_;
53 char padding1[padding_size]; /* force read_index and write_index to different cache lines */
54 atomic<size_t> read_index_;
55
56 BOOST_DELETED_FUNCTION(ringbuffer_base(ringbuffer_base const&))
57 BOOST_DELETED_FUNCTION(ringbuffer_base& operator= (ringbuffer_base const&))
58
59 protected:
60 ringbuffer_base(void):
61 write_index_(0), read_index_(0)
62 {}
63
64 static size_t next_index(size_t arg, size_t max_size)
65 {
66 size_t ret = arg + 1;
67 while (BOOST_UNLIKELY(ret >= max_size))
68 ret -= max_size;
69 return ret;
70 }
71
72 static size_t read_available(size_t write_index, size_t read_index, size_t max_size)
73 {
74 if (write_index >= read_index)
75 return write_index - read_index;
76
77 const size_t ret = write_index + max_size - read_index;
78 return ret;
79 }
80
81 static size_t write_available(size_t write_index, size_t read_index, size_t max_size)
82 {
83 size_t ret = read_index - write_index - 1;
84 if (write_index >= read_index)
85 ret += max_size;
86 return ret;
87 }
88
89 size_t read_available(size_t max_size) const
90 {
91 size_t write_index = write_index_.load(memory_order_acquire);
92 const size_t read_index = read_index_.load(memory_order_relaxed);
93 return read_available(write_index, read_index, max_size);
94 }
95
96 size_t write_available(size_t max_size) const
97 {
98 size_t write_index = write_index_.load(memory_order_relaxed);
99 const size_t read_index = read_index_.load(memory_order_acquire);
100 return write_available(write_index, read_index, max_size);
101 }
102
103 bool push(T const & t, T * buffer, size_t max_size)
104 {
105 const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
106 const size_t next = next_index(write_index, max_size);
107
108 if (next == read_index_.load(memory_order_acquire))
109 return false; /* ringbuffer is full */
110
111 new (buffer + write_index) T(t); // copy-construct
112
113 write_index_.store(next, memory_order_release);
114
115 return true;
116 }
117
118 size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size)
119 {
120 return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer;
121 }
122
123 template <typename ConstIterator>
124 ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size)
125 {
126 // FIXME: avoid std::distance
127
128 const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
129 const size_t read_index = read_index_.load(memory_order_acquire);
130 const size_t avail = write_available(write_index, read_index, max_size);
131
132 if (avail == 0)
133 return begin;
134
135 size_t input_count = std::distance(begin, end);
136 input_count = (std::min)(input_count, avail);
137
138 size_t new_write_index = write_index + input_count;
139
140 const ConstIterator last = boost::next(begin, input_count);
141
142 if (write_index + input_count > max_size) {
143 /* copy data in two sections */
144 const size_t count0 = max_size - write_index;
145 const ConstIterator midpoint = boost::next(begin, count0);
146
147 std::uninitialized_copy(begin, midpoint, internal_buffer + write_index);
148 std::uninitialized_copy(midpoint, last, internal_buffer);
149 new_write_index -= max_size;
150 } else {
151 std::uninitialized_copy(begin, last, internal_buffer + write_index);
152
153 if (new_write_index == max_size)
154 new_write_index = 0;
155 }
156
157 write_index_.store(new_write_index, memory_order_release);
158 return last;
159 }
160
161 template <typename Functor>
162 bool consume_one(Functor & functor, T * buffer, size_t max_size)
163 {
164 const size_t write_index = write_index_.load(memory_order_acquire);
165 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
166 if ( empty(write_index, read_index) )
167 return false;
168
169 T & object_to_consume = buffer[read_index];
170 functor( object_to_consume );
171 object_to_consume.~T();
172
173 size_t next = next_index(read_index, max_size);
174 read_index_.store(next, memory_order_release);
175 return true;
176 }
177
178 template <typename Functor>
179 bool consume_one(Functor const & functor, T * buffer, size_t max_size)
180 {
181 const size_t write_index = write_index_.load(memory_order_acquire);
182 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
183 if ( empty(write_index, read_index) )
184 return false;
185
186 T & object_to_consume = buffer[read_index];
187 functor( object_to_consume );
188 object_to_consume.~T();
189
190 size_t next = next_index(read_index, max_size);
191 read_index_.store(next, memory_order_release);
192 return true;
193 }
194
195 template <typename Functor>
196 size_t consume_all (Functor const & functor, T * internal_buffer, size_t max_size)
197 {
198 const size_t write_index = write_index_.load(memory_order_acquire);
199 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
200
201 const size_t avail = read_available(write_index, read_index, max_size);
202
203 if (avail == 0)
204 return 0;
205
206 const size_t output_count = avail;
207
208 size_t new_read_index = read_index + output_count;
209
210 if (read_index + output_count > max_size) {
211 /* copy data in two sections */
212 const size_t count0 = max_size - read_index;
213 const size_t count1 = output_count - count0;
214
215 run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
216 run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);
217
218 new_read_index -= max_size;
219 } else {
220 run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);
221
222 if (new_read_index == max_size)
223 new_read_index = 0;
224 }
225
226 read_index_.store(new_read_index, memory_order_release);
227 return output_count;
228 }
229
230 template <typename Functor>
231 size_t consume_all (Functor & functor, T * internal_buffer, size_t max_size)
232 {
233 const size_t write_index = write_index_.load(memory_order_acquire);
234 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
235
236 const size_t avail = read_available(write_index, read_index, max_size);
237
238 if (avail == 0)
239 return 0;
240
241 const size_t output_count = avail;
242
243 size_t new_read_index = read_index + output_count;
244
245 if (read_index + output_count > max_size) {
246 /* copy data in two sections */
247 const size_t count0 = max_size - read_index;
248 const size_t count1 = output_count - count0;
249
250 run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
251 run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);
252
253 new_read_index -= max_size;
254 } else {
255 run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);
256
257 if (new_read_index == max_size)
258 new_read_index = 0;
259 }
260
261 read_index_.store(new_read_index, memory_order_release);
262 return output_count;
263 }
264
265 size_t pop (T * output_buffer, size_t output_count, T * internal_buffer, size_t max_size)
266 {
267 const size_t write_index = write_index_.load(memory_order_acquire);
268 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
269
270 const size_t avail = read_available(write_index, read_index, max_size);
271
272 if (avail == 0)
273 return 0;
274
275 output_count = (std::min)(output_count, avail);
276
277 size_t new_read_index = read_index + output_count;
278
279 if (read_index + output_count > max_size) {
280 /* copy data in two sections */
281 const size_t count0 = max_size - read_index;
282 const size_t count1 = output_count - count0;
283
284 copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
285 copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0);
286
287 new_read_index -= max_size;
288 } else {
289 copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
290 if (new_read_index == max_size)
291 new_read_index = 0;
292 }
293
294 read_index_.store(new_read_index, memory_order_release);
295 return output_count;
296 }
297
298 template <typename OutputIterator>
299 size_t pop_to_output_iterator (OutputIterator it, T * internal_buffer, size_t max_size)
300 {
301 const size_t write_index = write_index_.load(memory_order_acquire);
302 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
303
304 const size_t avail = read_available(write_index, read_index, max_size);
305 if (avail == 0)
306 return 0;
307
308 size_t new_read_index = read_index + avail;
309
310 if (read_index + avail > max_size) {
311 /* copy data in two sections */
312 const size_t count0 = max_size - read_index;
313 const size_t count1 = avail - count0;
314
315 it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it);
316 copy_and_delete(internal_buffer, internal_buffer + count1, it);
317
318 new_read_index -= max_size;
319 } else {
320 copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it);
321 if (new_read_index == max_size)
322 new_read_index = 0;
323 }
324
325 read_index_.store(new_read_index, memory_order_release);
326 return avail;
327 }
328
329 const T& front(const T * internal_buffer) const
330 {
331 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
332 return *(internal_buffer + read_index);
333 }
334
335 T& front(T * internal_buffer)
336 {
337 const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
338 return *(internal_buffer + read_index);
339 }
340 #endif
341
342
343 public:
344 /** reset the ringbuffer
345 *
346 * \note Not thread-safe
347 * */
348 void reset(void)
349 {
350 if ( !boost::has_trivial_destructor<T>::value ) {
351 // make sure to call all destructors!
352
353 T dummy_element;
354 while (pop(dummy_element))
355 {}
356 } else {
357 write_index_.store(0, memory_order_relaxed);
358 read_index_.store(0, memory_order_release);
359 }
360 }
361
362 /** Check if the ringbuffer is empty
363 *
364 * \return true, if the ringbuffer is empty, false otherwise
365 * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate.
366 * */
367 bool empty(void)
368 {
369 return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
370 }
371
372 /**
373 * \return true, if implementation is lock-free.
374 *
375 * */
376 bool is_lock_free(void) const
377 {
378 return write_index_.is_lock_free() && read_index_.is_lock_free();
379 }
380
381 private:
382 bool empty(size_t write_index, size_t read_index)
383 {
384 return write_index == read_index;
385 }
386
387 template< class OutputIterator >
388 OutputIterator copy_and_delete( T * first, T * last, OutputIterator out )
389 {
390 if (boost::has_trivial_destructor<T>::value) {
391 return std::copy(first, last, out); // will use memcpy if possible
392 } else {
393 for (; first != last; ++first, ++out) {
394 *out = *first;
395 first->~T();
396 }
397 return out;
398 }
399 }
400
401 template< class Functor >
402 void run_functor_and_delete( T * first, T * last, Functor & functor )
403 {
404 for (; first != last; ++first) {
405 functor(*first);
406 first->~T();
407 }
408 }
409
410 template< class Functor >
411 void run_functor_and_delete( T * first, T * last, Functor const & functor )
412 {
413 for (; first != last; ++first) {
414 functor(*first);
415 first->~T();
416 }
417 }
418 };
419
420 template <typename T, std::size_t MaxSize>
421 class compile_time_sized_ringbuffer:
422 public ringbuffer_base<T>
423 {
424 typedef std::size_t size_type;
425 static const std::size_t max_size = MaxSize + 1;
426
427 typedef typename boost::aligned_storage<max_size * sizeof(T),
428 boost::alignment_of<T>::value
429 >::type storage_type;
430
431 storage_type storage_;
432
433 T * data()
434 {
435 return static_cast<T*>(storage_.address());
436 }
437
438 const T * data() const
439 {
440 return static_cast<const T*>(storage_.address());
441 }
442
443 protected:
444 size_type max_number_of_elements() const
445 {
446 return max_size;
447 }
448
449 public:
450 bool push(T const & t)
451 {
452 return ringbuffer_base<T>::push(t, data(), max_size);
453 }
454
455 template <typename Functor>
456 bool consume_one(Functor & f)
457 {
458 return ringbuffer_base<T>::consume_one(f, data(), max_size);
459 }
460
461 template <typename Functor>
462 bool consume_one(Functor const & f)
463 {
464 return ringbuffer_base<T>::consume_one(f, data(), max_size);
465 }
466
467 template <typename Functor>
468 size_type consume_all(Functor & f)
469 {
470 return ringbuffer_base<T>::consume_all(f, data(), max_size);
471 }
472
473 template <typename Functor>
474 size_type consume_all(Functor const & f)
475 {
476 return ringbuffer_base<T>::consume_all(f, data(), max_size);
477 }
478
479 size_type push(T const * t, size_type size)
480 {
481 return ringbuffer_base<T>::push(t, size, data(), max_size);
482 }
483
484 template <size_type size>
485 size_type push(T const (&t)[size])
486 {
487 return push(t, size);
488 }
489
490 template <typename ConstIterator>
491 ConstIterator push(ConstIterator begin, ConstIterator end)
492 {
493 return ringbuffer_base<T>::push(begin, end, data(), max_size);
494 }
495
496 size_type pop(T * ret, size_type size)
497 {
498 return ringbuffer_base<T>::pop(ret, size, data(), max_size);
499 }
500
501 template <typename OutputIterator>
502 size_type pop_to_output_iterator(OutputIterator it)
503 {
504 return ringbuffer_base<T>::pop_to_output_iterator(it, data(), max_size);
505 }
506
507 const T& front(void) const
508 {
509 return ringbuffer_base<T>::front(data());
510 }
511
512 T& front(void)
513 {
514 return ringbuffer_base<T>::front(data());
515 }
516 };
517
518 template <typename T, typename Alloc>
519 class runtime_sized_ringbuffer:
520 public ringbuffer_base<T>,
521 private Alloc
522 {
523 typedef std::size_t size_type;
524 size_type max_elements_;
525 typedef typename Alloc::pointer pointer;
526 pointer array_;
527
528 protected:
529 size_type max_number_of_elements() const
530 {
531 return max_elements_;
532 }
533
534 public:
535 explicit runtime_sized_ringbuffer(size_type max_elements):
536 max_elements_(max_elements + 1)
537 {
538 array_ = Alloc::allocate(max_elements_);
539 }
540
541 template <typename U>
542 runtime_sized_ringbuffer(typename Alloc::template rebind<U>::other const & alloc, size_type max_elements):
543 Alloc(alloc), max_elements_(max_elements + 1)
544 {
545 array_ = Alloc::allocate(max_elements_);
546 }
547
548 runtime_sized_ringbuffer(Alloc const & alloc, size_type max_elements):
549 Alloc(alloc), max_elements_(max_elements + 1)
550 {
551 array_ = Alloc::allocate(max_elements_);
552 }
553
554 ~runtime_sized_ringbuffer(void)
555 {
556 // destroy all remaining items
557 T out;
558 while (pop(&out, 1)) {}
559
560 Alloc::deallocate(array_, max_elements_);
561 }
562
563 bool push(T const & t)
564 {
565 return ringbuffer_base<T>::push(t, &*array_, max_elements_);
566 }
567
568 template <typename Functor>
569 bool consume_one(Functor & f)
570 {
571 return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
572 }
573
574 template <typename Functor>
575 bool consume_one(Functor const & f)
576 {
577 return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
578 }
579
580 template <typename Functor>
581 size_type consume_all(Functor & f)
582 {
583 return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
584 }
585
586 template <typename Functor>
587 size_type consume_all(Functor const & f)
588 {
589 return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
590 }
591
592 size_type push(T const * t, size_type size)
593 {
594 return ringbuffer_base<T>::push(t, size, &*array_, max_elements_);
595 }
596
597 template <size_type size>
598 size_type push(T const (&t)[size])
599 {
600 return push(t, size);
601 }
602
603 template <typename ConstIterator>
604 ConstIterator push(ConstIterator begin, ConstIterator end)
605 {
606 return ringbuffer_base<T>::push(begin, end, &*array_, max_elements_);
607 }
608
609 size_type pop(T * ret, size_type size)
610 {
611 return ringbuffer_base<T>::pop(ret, size, &*array_, max_elements_);
612 }
613
614 template <typename OutputIterator>
615 size_type pop_to_output_iterator(OutputIterator it)
616 {
617 return ringbuffer_base<T>::pop_to_output_iterator(it, &*array_, max_elements_);
618 }
619
620 const T& front(void) const
621 {
622 return ringbuffer_base<T>::front(&*array_);
623 }
624
625 T& front(void)
626 {
627 return ringbuffer_base<T>::front(&*array_);
628 }
629 };
630
631 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
632 template <typename T, typename A0, typename A1>
633 #else
634 template <typename T, typename ...Options>
635 #endif
636 struct make_ringbuffer
637 {
638 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
639 typedef typename ringbuffer_signature::bind<A0, A1>::type bound_args;
640 #else
641 typedef typename ringbuffer_signature::bind<Options...>::type bound_args;
642 #endif
643
644 typedef extract_capacity<bound_args> extract_capacity_t;
645
646 static const bool runtime_sized = !extract_capacity_t::has_capacity;
647 static const size_t capacity = extract_capacity_t::capacity;
648
649 typedef extract_allocator<bound_args, T> extract_allocator_t;
650 typedef typename extract_allocator_t::type allocator;
651
652 // allocator argument is only sane, for run-time sized ringbuffers
653 BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>,
654 mpl::bool_<!extract_allocator_t::has_allocator>,
655 mpl::true_
656 >::type::value));
657
658 typedef typename mpl::if_c<runtime_sized,
659 runtime_sized_ringbuffer<T, allocator>,
660 compile_time_sized_ringbuffer<T, capacity>
661 >::type ringbuffer_type;
662 };
663
664
665 } /* namespace detail */
666
667
668 /** The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-free.
669 *
670 * \b Policies:
671 * - \c boost::lockfree::capacity<>, optional <br>
672 * If this template argument is passed to the options, the size of the ringbuffer is set at compile-time.
673 *
674 * - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br>
675 * Specifies the allocator that is used to allocate the ringbuffer. This option is only valid, if the ringbuffer is configured
676 * to be sized at run-time
677 *
678 * \b Requirements:
679 * - T must have a default constructor
680 * - T must be copyable
681 * */
682 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
683 template <typename T, class A0, class A1>
684 #else
685 template <typename T, typename ...Options>
686 #endif
687 class spsc_queue:
688 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
689 public detail::make_ringbuffer<T, A0, A1>::ringbuffer_type
690 #else
691 public detail::make_ringbuffer<T, Options...>::ringbuffer_type
692 #endif
693 {
694 private:
695
696 #ifndef BOOST_DOXYGEN_INVOKED
697
698 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
699 typedef typename detail::make_ringbuffer<T, A0, A1>::ringbuffer_type base_type;
700 static const bool runtime_sized = detail::make_ringbuffer<T, A0, A1>::runtime_sized;
701 typedef typename detail::make_ringbuffer<T, A0, A1>::allocator allocator_arg;
702 #else
703 typedef typename detail::make_ringbuffer<T, Options...>::ringbuffer_type base_type;
704 static const bool runtime_sized = detail::make_ringbuffer<T, Options...>::runtime_sized;
705 typedef typename detail::make_ringbuffer<T, Options...>::allocator allocator_arg;
706 #endif
707
708
709 struct implementation_defined
710 {
711 typedef allocator_arg allocator;
712 typedef std::size_t size_type;
713 };
714 #endif
715
716 public:
717 typedef T value_type;
718 typedef typename implementation_defined::allocator allocator;
719 typedef typename implementation_defined::size_type size_type;
720
721 /** Constructs a spsc_queue
722 *
723 * \pre spsc_queue must be configured to be sized at compile-time
724 */
725 // @{
726 spsc_queue(void)
727 {
728 BOOST_ASSERT(!runtime_sized);
729 }
730
731 template <typename U>
732 explicit spsc_queue(typename allocator::template rebind<U>::other const &)
733 {
734 // just for API compatibility: we don't actually need an allocator
735 BOOST_STATIC_ASSERT(!runtime_sized);
736 }
737
738 explicit spsc_queue(allocator const &)
739 {
740 // just for API compatibility: we don't actually need an allocator
741 BOOST_ASSERT(!runtime_sized);
742 }
743 // @}
744
745
746 /** Constructs a spsc_queue for element_count elements
747 *
748 * \pre spsc_queue must be configured to be sized at run-time
749 */
750 // @{
751 explicit spsc_queue(size_type element_count):
752 base_type(element_count)
753 {
754 BOOST_ASSERT(runtime_sized);
755 }
756
757 template <typename U>
758 spsc_queue(size_type element_count, typename allocator::template rebind<U>::other const & alloc):
759 base_type(alloc, element_count)
760 {
761 BOOST_STATIC_ASSERT(runtime_sized);
762 }
763
764 spsc_queue(size_type element_count, allocator_arg const & alloc):
765 base_type(alloc, element_count)
766 {
767 BOOST_ASSERT(runtime_sized);
768 }
769 // @}
770
771 /** Pushes object t to the ringbuffer.
772 *
773 * \pre only one thread is allowed to push data to the spsc_queue
774 * \post object will be pushed to the spsc_queue, unless it is full.
775 * \return true, if the push operation is successful.
776 *
777 * \note Thread-safe and wait-free
778 * */
779 bool push(T const & t)
780 {
781 return base_type::push(t);
782 }
783
784 /** Pops one object from ringbuffer.
785 *
786 * \pre only one thread is allowed to pop data to the spsc_queue
787 * \post if ringbuffer is not empty, object will be discarded.
788 * \return true, if the pop operation is successful, false if ringbuffer was empty.
789 *
790 * \note Thread-safe and wait-free
791 */
792 bool pop ()
793 {
794 detail::consume_noop consume_functor;
795 return consume_one( consume_functor );
796 }
797
798 /** Pops one object from ringbuffer.
799 *
800 * \pre only one thread is allowed to pop data to the spsc_queue
801 * \post if ringbuffer is not empty, object will be copied to ret.
802 * \return true, if the pop operation is successful, false if ringbuffer was empty.
803 *
804 * \note Thread-safe and wait-free
805 */
806 template <typename U>
807 typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type
808 pop (U & ret)
809 {
810 detail::consume_via_copy<U> consume_functor(ret);
811 return consume_one( consume_functor );
812 }
813
814 /** Pushes as many objects from the array t as there is space.
815 *
816 * \pre only one thread is allowed to push data to the spsc_queue
817 * \return number of pushed items
818 *
819 * \note Thread-safe and wait-free
820 */
821 size_type push(T const * t, size_type size)
822 {
823 return base_type::push(t, size);
824 }
825
826 /** Pushes as many objects from the array t as there is space available.
827 *
828 * \pre only one thread is allowed to push data to the spsc_queue
829 * \return number of pushed items
830 *
831 * \note Thread-safe and wait-free
832 */
833 template <size_type size>
834 size_type push(T const (&t)[size])
835 {
836 return push(t, size);
837 }
838
839 /** Pushes as many objects from the range [begin, end) as there is space .
840 *
841 * \pre only one thread is allowed to push data to the spsc_queue
842 * \return iterator to the first element, which has not been pushed
843 *
844 * \note Thread-safe and wait-free
845 */
846 template <typename ConstIterator>
847 ConstIterator push(ConstIterator begin, ConstIterator end)
848 {
849 return base_type::push(begin, end);
850 }
851
852 /** Pops a maximum of size objects from ringbuffer.
853 *
854 * \pre only one thread is allowed to pop data to the spsc_queue
855 * \return number of popped items
856 *
857 * \note Thread-safe and wait-free
858 * */
859 size_type pop(T * ret, size_type size)
860 {
861 return base_type::pop(ret, size);
862 }
863
864 /** Pops a maximum of size objects from spsc_queue.
865 *
866 * \pre only one thread is allowed to pop data to the spsc_queue
867 * \return number of popped items
868 *
869 * \note Thread-safe and wait-free
870 * */
871 template <size_type size>
872 size_type pop(T (&ret)[size])
873 {
874 return pop(ret, size);
875 }
876
877 /** Pops objects to the output iterator it
878 *
879 * \pre only one thread is allowed to pop data to the spsc_queue
880 * \return number of popped items
881 *
882 * \note Thread-safe and wait-free
883 * */
884 template <typename OutputIterator>
885 typename boost::disable_if<typename is_convertible<T, OutputIterator>::type, size_type>::type
886 pop(OutputIterator it)
887 {
888 return base_type::pop_to_output_iterator(it);
889 }
890
891 /** consumes one element via a functor
892 *
893 * pops one element from the queue and applies the functor on this object
894 *
895 * \returns true, if one element was consumed
896 *
897 * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
898 * */
899 template <typename Functor>
900 bool consume_one(Functor & f)
901 {
902 return base_type::consume_one(f);
903 }
904
905 /// \copydoc boost::lockfree::spsc_queue::consume_one(Functor & rhs)
906 template <typename Functor>
907 bool consume_one(Functor const & f)
908 {
909 return base_type::consume_one(f);
910 }
911
912 /** consumes all elements via a functor
913 *
914 * sequentially pops all elements from the queue and applies the functor on each object
915 *
916 * \returns number of elements that are consumed
917 *
918 * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
919 * */
920 template <typename Functor>
921 size_type consume_all(Functor & f)
922 {
923 return base_type::consume_all(f);
924 }
925
926 /// \copydoc boost::lockfree::spsc_queue::consume_all(Functor & rhs)
927 template <typename Functor>
928 size_type consume_all(Functor const & f)
929 {
930 return base_type::consume_all(f);
931 }
932
933 /** get number of elements that are available for read
934 *
935 * \return number of available elements that can be popped from the spsc_queue
936 *
937 * \note Thread-safe and wait-free, should only be called from the consumer thread
938 * */
939 size_type read_available() const
940 {
941 return base_type::read_available(base_type::max_number_of_elements());
942 }
943
944 /** get write space to write elements
945 *
946 * \return number of elements that can be pushed to the spsc_queue
947 *
948 * \note Thread-safe and wait-free, should only be called from the producer thread
949 * */
950 size_type write_available() const
951 {
952 return base_type::write_available(base_type::max_number_of_elements());
953 }
954
955 /** get reference to element in the front of the queue
956 *
957 * Availability of front element can be checked using read_available().
958 *
959 * \pre only a consuming thread is allowed to check front element
960 * \pre read_available() > 0. If ringbuffer is empty, it's undefined behaviour to invoke this method.
961 * \return reference to the first element in the queue
962 *
963 * \note Thread-safe and wait-free
964 */
965 const T& front() const
966 {
967 BOOST_ASSERT(read_available() > 0);
968 return base_type::front();
969 }
970
971 /// \copydoc boost::lockfree::spsc_queue::front() const
972 T& front()
973 {
974 BOOST_ASSERT(read_available() > 0);
975 return base_type::front();
976 }
977
978 /** reset the ringbuffer
979 *
980 * \note Not thread-safe
981 * */
982 void reset(void)
983 {
984 if ( !boost::has_trivial_destructor<T>::value ) {
985 // make sure to call all destructors!
986
987 T dummy_element;
988 while (pop(dummy_element))
989 {}
990 } else {
991 base_type::write_index_.store(0, memory_order_relaxed);
992 base_type::read_index_.store(0, memory_order_release);
993 }
994 }
995 };
996
997 } /* namespace lockfree */
998 } /* namespace boost */
999
1000
1001 #endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */