]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/boost/lockfree/spsc_queue.hpp
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / boost / boost / lockfree / spsc_queue.hpp
1 // lock-free single-producer/single-consumer ringbuffer
2 // this algorithm is implemented in various projects (linux kernel)
3 //
4 // Copyright (C) 2009-2013 Tim Blechmann
5 //
6 // Distributed under the Boost Software License, Version 1.0. (See
7 // accompanying file LICENSE_1_0.txt or copy at
8 // http://www.boost.org/LICENSE_1_0.txt)
9
10 #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
11 #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
12
13 #include <algorithm>
14 #include <memory>
15
16 #include <boost/aligned_storage.hpp>
17 #include <boost/assert.hpp>
18 #include <boost/static_assert.hpp>
19 #include <boost/utility.hpp>
20 #include <boost/next_prior.hpp>
21 #include <boost/utility/enable_if.hpp>
22 #include <boost/config.hpp> // for BOOST_LIKELY
23
24 #include <boost/type_traits/has_trivial_destructor.hpp>
25 #include <boost/type_traits/is_convertible.hpp>
26
27 #include <boost/lockfree/detail/allocator_rebind_helper.hpp>
28 #include <boost/lockfree/detail/atomic.hpp>
29 #include <boost/lockfree/detail/copy_payload.hpp>
30 #include <boost/lockfree/detail/parameter.hpp>
31 #include <boost/lockfree/detail/prefix.hpp>
32
33 #include <boost/lockfree/lockfree_forward.hpp>
34
35 #ifdef BOOST_HAS_PRAGMA_ONCE
36 #pragma once
37 #endif
38
39 namespace boost {
40 namespace lockfree {
41 namespace detail {
42
/* Boost.Parameter signature for the ringbuffer option pack:
 * both the capacity<> and allocator<> policies are optional and may be
 * given in any order; make_ringbuffer<> below binds and extracts them. */
typedef parameter::parameters<boost::parameter::optional<tag::capacity>,
                              boost::parameter::optional<tag::allocator>
                             > ringbuffer_signature;
46
/* Core of the single-producer/single-consumer ringbuffer.
 *
 * Invariants:
 *  - indices always satisfy 0 <= index < max_size; one slot is always left
 *    unused, so the buffer can hold at most max_size - 1 elements and
 *    "full" (next(write) == read) is distinguishable from "empty"
 *    (write == read).
 *  - write_index_ is only ever stored by the producer thread,
 *    read_index_ only by the consumer thread.
 *  - producer stores to write_index_ use memory_order_release after the
 *    element is constructed; the consumer's matching acquire load makes the
 *    element visible. Symmetrically for read_index_ (release after
 *    destruction, acquire on the producer side before reuse of the slot).
 */
template <typename T>
class ringbuffer_base
{
#ifndef BOOST_DOXYGEN_INVOKED
protected:
    typedef std::size_t size_t;
    // pad so read_index_ lands on a different cache line than write_index_,
    // avoiding false sharing between producer and consumer threads
    static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t);
    atomic<size_t> write_index_;
    char padding1[padding_size]; /* force read_index and write_index to different cache lines */
    atomic<size_t> read_index_;

    BOOST_DELETED_FUNCTION(ringbuffer_base(ringbuffer_base const&))
    BOOST_DELETED_FUNCTION(ringbuffer_base& operator= (ringbuffer_base const&))

protected:
    ringbuffer_base(void):
        write_index_(0), read_index_(0)
    {}

    // advance an index by one with wrap-around; arg < max_size on entry,
    // so the loop body executes at most once (written as a loop for safety)
    static size_t next_index(size_t arg, size_t max_size)
    {
        size_t ret = arg + 1;
        while (BOOST_UNLIKELY(ret >= max_size))
            ret -= max_size;
        return ret;
    }

    // number of elements readable given index snapshots (pure function)
    static size_t read_available(size_t write_index, size_t read_index, size_t max_size)
    {
        if (write_index >= read_index)
            return write_index - read_index;

        // write index has wrapped around
        const size_t ret = write_index + max_size - read_index;
        return ret;
    }

    // number of free slots given index snapshots; the -1 keeps one slot
    // unused so that full != empty (pure function)
    static size_t write_available(size_t write_index, size_t read_index, size_t max_size)
    {
        size_t ret = read_index - write_index - 1;
        if (write_index >= read_index)
            ret += max_size;
        return ret;
    }

    // consumer-side query: acquire on write_index_ synchronizes with the
    // producer's release store; own read_index_ needs only relaxed
    size_t read_available(size_t max_size) const
    {
        size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed);
        return read_available(write_index, read_index, max_size);
    }

    // producer-side query: acquire on read_index_ synchronizes with the
    // consumer's release store; own write_index_ needs only relaxed
    size_t write_available(size_t max_size) const
    {
        size_t write_index = write_index_.load(memory_order_relaxed);
        const size_t read_index = read_index_.load(memory_order_acquire);
        return write_available(write_index, read_index, max_size);
    }

    // push a single element (producer thread only);
    // returns false without copying when the ringbuffer is full
    bool push(T const & t, T * buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
        const size_t next = next_index(write_index, max_size);

        if (next == read_index_.load(memory_order_acquire))
            return false; /* ringbuffer is full */

        new (buffer + write_index) T(t); // copy-construct

        // release: publishes the constructed element before the new index
        write_index_.store(next, memory_order_release);

        return true;
    }

    // push up to input_count elements from an array; returns how many fit
    size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size)
    {
        return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer;
    }

    // push elements from [begin, end) until the buffer is full;
    // returns an iterator to the first element NOT pushed
    template <typename ConstIterator>
    ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size)
    {
        // FIXME: avoid std::distance

        const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
        const size_t read_index = read_index_.load(memory_order_acquire);
        const size_t avail = write_available(write_index, read_index, max_size);

        if (avail == 0)
            return begin;

        size_t input_count = std::distance(begin, end);
        input_count = (std::min)(input_count, avail);

        size_t new_write_index = write_index + input_count;

        const ConstIterator last = boost::next(begin, input_count);

        if (write_index + input_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - write_index;
            const ConstIterator midpoint = boost::next(begin, count0);

            std::uninitialized_copy(begin, midpoint, internal_buffer + write_index);
            std::uninitialized_copy(midpoint, last, internal_buffer);
            new_write_index -= max_size;
        } else {
            std::uninitialized_copy(begin, last, internal_buffer + write_index);

            if (new_write_index == max_size)
                new_write_index = 0;
        }

        write_index_.store(new_write_index, memory_order_release);
        return last;
    }

    // apply functor to the front element and destroy it (consumer thread only);
    // returns false if the buffer was empty.
    // Non-const overload so stateful functors can accumulate state.
    template <typename Functor>
    bool consume_one(Functor & functor, T * buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
        if ( empty(write_index, read_index) )
            return false;

        T & object_to_consume = buffer[read_index];
        functor( object_to_consume );
        object_to_consume.~T();

        size_t next = next_index(read_index, max_size);
        // release: slot reusable by the producer only after the destructor ran
        read_index_.store(next, memory_order_release);
        return true;
    }

    // const-functor overload of consume_one (e.g. for temporaries/lambdas)
    template <typename Functor>
    bool consume_one(Functor const & functor, T * buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
        if ( empty(write_index, read_index) )
            return false;

        T & object_to_consume = buffer[read_index];
        functor( object_to_consume );
        object_to_consume.~T();

        size_t next = next_index(read_index, max_size);
        read_index_.store(next, memory_order_release);
        return true;
    }

    // apply functor to every available element, destroying each;
    // returns the number consumed (const-functor overload)
    template <typename Functor>
    size_t consume_all (Functor const & functor, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        const size_t output_count = avail;

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
            run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);

            new_read_index -= max_size;
        } else {
            run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);

            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }

    // non-const-functor overload of consume_all
    template <typename Functor>
    size_t consume_all (Functor & functor, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        const size_t output_count = avail;

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
            run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);

            new_read_index -= max_size;
        } else {
            run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);

            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }

    // copy up to output_count elements into output_buffer, destroying the
    // originals; returns the number actually popped (consumer thread only)
    size_t pop (T * output_buffer, size_t output_count, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        output_count = (std::min)(output_count, avail);

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
            copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0);

            new_read_index -= max_size;
        } else {
            copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }

    // pop every available element to an output iterator; returns the count
    template <typename OutputIterator>
    size_t pop_to_output_iterator (OutputIterator it, T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread

        const size_t avail = read_available(write_index, read_index, max_size);
        if (avail == 0)
            return 0;

        size_t new_read_index = read_index + avail;

        if (read_index + avail > max_size) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = avail - count0;

            it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it);
            copy_and_delete(internal_buffer, internal_buffer + count1, it);

            new_read_index -= max_size;
        } else {
            copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it);
            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return avail;
    }

    // reference to the front element; caller must ensure non-emptiness
    const T& front(const T * internal_buffer) const
    {
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
        return *(internal_buffer + read_index);
    }

    T& front(T * internal_buffer)
    {
        const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
        return *(internal_buffer + read_index);
    }
#endif


public:
    /** reset the ringbuffer
     *
     * \note Not thread-safe
     * */
    // NOTE(review): the non-trivial-destructor branch calls a single-argument
    // pop(T&), which this class does not itself declare — this only compiles
    // if the member is never instantiated (spsc_queue defines its own reset()
    // that shadows this one); confirm against actual call sites.
    void reset(void)
    {
        if ( !boost::has_trivial_destructor<T>::value ) {
            // make sure to call all destructors!

            T dummy_element;
            while (pop(dummy_element))
            {}
        } else {
            write_index_.store(0, memory_order_relaxed);
            read_index_.store(0, memory_order_release);
        }
    }

    /** Check if the ringbuffer is empty
     *
     * \return true, if the ringbuffer is empty, false otherwise
     * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate.
     * */
    bool empty(void)
    {
        // relaxed loads: the result is only a snapshot anyway
        return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
    }

    /**
     * \return true, if implementation is lock-free.
     *
     * */
    bool is_lock_free(void) const
    {
        return write_index_.is_lock_free() && read_index_.is_lock_free();
    }

private:
    bool empty(size_t write_index, size_t read_index)
    {
        return write_index == read_index;
    }

    // copy [first, last) to out, destroying the sources;
    // skips the per-element destructor calls for trivially destructible T
    template< class OutputIterator >
    OutputIterator copy_and_delete( T * first, T * last, OutputIterator out )
    {
        if (boost::has_trivial_destructor<T>::value) {
            return std::copy(first, last, out); // will use memcpy if possible
        } else {
            for (; first != last; ++first, ++out) {
                *out = *first;
                first->~T();
            }
            return out;
        }
    }

    // invoke functor on each element of [first, last), then destroy it
    template< class Functor >
    void run_functor_and_delete( T * first, T * last, Functor & functor )
    {
        for (; first != last; ++first) {
            functor(*first);
            first->~T();
        }
    }

    // const-functor overload of run_functor_and_delete
    template< class Functor >
    void run_functor_and_delete( T * first, T * last, Functor const & functor )
    {
        for (; first != last; ++first) {
            functor(*first);
            first->~T();
        }
    }
};
421
422 template <typename T, std::size_t MaxSize>
423 class compile_time_sized_ringbuffer:
424 public ringbuffer_base<T>
425 {
426 typedef std::size_t size_type;
427 static const std::size_t max_size = MaxSize + 1;
428
429 typedef typename boost::aligned_storage<max_size * sizeof(T),
430 boost::alignment_of<T>::value
431 >::type storage_type;
432
433 storage_type storage_;
434
435 T * data()
436 {
437 return static_cast<T*>(storage_.address());
438 }
439
440 const T * data() const
441 {
442 return static_cast<const T*>(storage_.address());
443 }
444
445 protected:
446 size_type max_number_of_elements() const
447 {
448 return max_size;
449 }
450
451 public:
452 bool push(T const & t)
453 {
454 return ringbuffer_base<T>::push(t, data(), max_size);
455 }
456
457 template <typename Functor>
458 bool consume_one(Functor & f)
459 {
460 return ringbuffer_base<T>::consume_one(f, data(), max_size);
461 }
462
463 template <typename Functor>
464 bool consume_one(Functor const & f)
465 {
466 return ringbuffer_base<T>::consume_one(f, data(), max_size);
467 }
468
469 template <typename Functor>
470 size_type consume_all(Functor & f)
471 {
472 return ringbuffer_base<T>::consume_all(f, data(), max_size);
473 }
474
475 template <typename Functor>
476 size_type consume_all(Functor const & f)
477 {
478 return ringbuffer_base<T>::consume_all(f, data(), max_size);
479 }
480
481 size_type push(T const * t, size_type size)
482 {
483 return ringbuffer_base<T>::push(t, size, data(), max_size);
484 }
485
486 template <size_type size>
487 size_type push(T const (&t)[size])
488 {
489 return push(t, size);
490 }
491
492 template <typename ConstIterator>
493 ConstIterator push(ConstIterator begin, ConstIterator end)
494 {
495 return ringbuffer_base<T>::push(begin, end, data(), max_size);
496 }
497
498 size_type pop(T * ret, size_type size)
499 {
500 return ringbuffer_base<T>::pop(ret, size, data(), max_size);
501 }
502
503 template <typename OutputIterator>
504 size_type pop_to_output_iterator(OutputIterator it)
505 {
506 return ringbuffer_base<T>::pop_to_output_iterator(it, data(), max_size);
507 }
508
509 const T& front(void) const
510 {
511 return ringbuffer_base<T>::front(data());
512 }
513
514 T& front(void)
515 {
516 return ringbuffer_base<T>::front(data());
517 }
518 };
519
/* Ringbuffer whose capacity is chosen at run time.
 *
 * Storage for max_elements + 1 objects is obtained from Alloc (the extra
 * slot is the full/empty sentinel required by ringbuffer_base). Alloc is a
 * private base class so an empty allocator adds no size overhead (EBO).
 * Pre-C++11-allocator toolchains (BOOST_NO_CXX11_ALLOCATOR) call the
 * allocator directly; otherwise std::allocator_traits is used. */
template <typename T, typename Alloc>
class runtime_sized_ringbuffer:
    public ringbuffer_base<T>,
    private Alloc
{
    typedef std::size_t size_type;
    size_type max_elements_;  // requested capacity + 1 (sentinel slot)
#ifdef BOOST_NO_CXX11_ALLOCATOR
    typedef typename Alloc::pointer pointer;
#else
    typedef std::allocator_traits<Alloc> allocator_traits;
    typedef typename allocator_traits::pointer pointer;
#endif
    pointer array_;

protected:
    // internal size (including the sentinel slot), used by spsc_queue
    size_type max_number_of_elements() const
    {
        return max_elements_;
    }

public:
    // allocate storage with a default-constructed allocator
    explicit runtime_sized_ringbuffer(size_type max_elements):
        max_elements_(max_elements + 1)
    {
#ifdef BOOST_NO_CXX11_ALLOCATOR
        array_ = Alloc::allocate(max_elements_);
#else
        Alloc& alloc = *this;
        array_ = allocator_traits::allocate(alloc, max_elements_);
#endif
    }

    // construct from a rebound allocator (API compatibility with containers)
    template <typename U>
    runtime_sized_ringbuffer(typename detail::allocator_rebind_helper<Alloc, U>::type const & alloc, size_type max_elements):
        Alloc(alloc), max_elements_(max_elements + 1)
    {
#ifdef BOOST_NO_CXX11_ALLOCATOR
        array_ = Alloc::allocate(max_elements_);
#else
        Alloc& allocator = *this;
        array_ = allocator_traits::allocate(allocator, max_elements_);
#endif
    }

    // construct with a caller-supplied allocator instance
    runtime_sized_ringbuffer(Alloc const & alloc, size_type max_elements):
        Alloc(alloc), max_elements_(max_elements + 1)
    {
#ifdef BOOST_NO_CXX11_ALLOCATOR
        array_ = Alloc::allocate(max_elements_);
#else
        Alloc& allocator = *this;
        array_ = allocator_traits::allocate(allocator, max_elements_);
#endif
    }

    // destroys all remaining elements, then returns the storage to the
    // allocator. NOTE: popping into a local requires T to be
    // default-constructible and copy-assignable.
    ~runtime_sized_ringbuffer(void)
    {
        // destroy all remaining items
        T out;
        while (pop(&out, 1)) {}

#ifdef BOOST_NO_CXX11_ALLOCATOR
        Alloc::deallocate(array_, max_elements_);
#else
        Alloc& allocator = *this;
        allocator_traits::deallocate(allocator, array_, max_elements_);
#endif
    }

    // the forwarders below dereference array_ (&*array_) to turn the
    // allocator's pointer type into a plain T* for the base class

    bool push(T const & t)
    {
        return ringbuffer_base<T>::push(t, &*array_, max_elements_);
    }

    template <typename Functor>
    bool consume_one(Functor & f)
    {
        return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
    }

    template <typename Functor>
    bool consume_one(Functor const & f)
    {
        return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
    }

    template <typename Functor>
    size_type consume_all(Functor & f)
    {
        return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
    }

    template <typename Functor>
    size_type consume_all(Functor const & f)
    {
        return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
    }

    size_type push(T const * t, size_type size)
    {
        return ringbuffer_base<T>::push(t, size, &*array_, max_elements_);
    }

    template <size_type size>
    size_type push(T const (&t)[size])
    {
        return push(t, size);
    }

    template <typename ConstIterator>
    ConstIterator push(ConstIterator begin, ConstIterator end)
    {
        return ringbuffer_base<T>::push(begin, end, &*array_, max_elements_);
    }

    size_type pop(T * ret, size_type size)
    {
        return ringbuffer_base<T>::pop(ret, size, &*array_, max_elements_);
    }

    template <typename OutputIterator>
    size_type pop_to_output_iterator(OutputIterator it)
    {
        return ringbuffer_base<T>::pop_to_output_iterator(it, &*array_, max_elements_);
    }

    const T& front(void) const
    {
        return ringbuffer_base<T>::front(&*array_);
    }

    T& front(void)
    {
        return ringbuffer_base<T>::front(&*array_);
    }
};
657
/* Metafunction: binds the user-supplied option pack against
 * ringbuffer_signature and selects the ringbuffer implementation —
 * compile_time_sized_ringbuffer when capacity<> was given,
 * runtime_sized_ringbuffer otherwise. */
#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
template <typename T, typename A0, typename A1>
#else
template <typename T, typename ...Options>
#endif
struct make_ringbuffer
{
#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
    typedef typename ringbuffer_signature::bind<A0, A1>::type bound_args;
#else
    typedef typename ringbuffer_signature::bind<Options...>::type bound_args;
#endif

    typedef extract_capacity<bound_args> extract_capacity_t;

    // run-time sized exactly when no capacity<> option was passed
    static const bool runtime_sized = !extract_capacity_t::has_capacity;
    static const size_t capacity = extract_capacity_t::capacity;

    typedef extract_allocator<bound_args, T> extract_allocator_t;
    typedef typename extract_allocator_t::type allocator;

    // allocator argument is only sane, for run-time sized ringbuffers
    BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>,
                                  mpl::bool_<!extract_allocator_t::has_allocator>,
                                  mpl::true_
                                 >::type::value));

    typedef typename mpl::if_c<runtime_sized,
                               runtime_sized_ringbuffer<T, allocator>,
                               compile_time_sized_ringbuffer<T, capacity>
                              >::type ringbuffer_type;
};
690
691
692 } /* namespace detail */
693
694
695 /** The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-free.
696 *
697 * \b Policies:
698 * - \c boost::lockfree::capacity<>, optional <br>
699 * If this template argument is passed to the options, the size of the ringbuffer is set at compile-time.
700 *
701 * - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br>
702 * Specifies the allocator that is used to allocate the ringbuffer. This option is only valid, if the ringbuffer is configured
703 * to be sized at run-time
704 *
705 * \b Requirements:
706 * - T must have a default constructor
707 * - T must be copyable
708 * */
#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
template <typename T, class A0, class A1>
#else
template <typename T, typename ...Options>
#endif
class spsc_queue:
#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
    public detail::make_ringbuffer<T, A0, A1>::ringbuffer_type
#else
    public detail::make_ringbuffer<T, Options...>::ringbuffer_type
#endif
{
private:

#ifndef BOOST_DOXYGEN_INVOKED

    // make_ringbuffer selects the compile-time- or run-time-sized
    // implementation from the option pack; the selected type is the base
#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
    typedef typename detail::make_ringbuffer<T, A0, A1>::ringbuffer_type base_type;
    static const bool runtime_sized = detail::make_ringbuffer<T, A0, A1>::runtime_sized;
    typedef typename detail::make_ringbuffer<T, A0, A1>::allocator allocator_arg;
#else
    typedef typename detail::make_ringbuffer<T, Options...>::ringbuffer_type base_type;
    static const bool runtime_sized = detail::make_ringbuffer<T, Options...>::runtime_sized;
    typedef typename detail::make_ringbuffer<T, Options...>::allocator allocator_arg;
#endif


    // groups the public member typedefs (doxygen-friendly indirection)
    struct implementation_defined
    {
        typedef allocator_arg allocator;
        typedef std::size_t size_type;
    };
#endif

public:
    typedef T value_type;
    typedef typename implementation_defined::allocator allocator;
    typedef typename implementation_defined::size_type size_type;

    /** Constructs a spsc_queue
     *
     * \pre spsc_queue must be configured to be sized at compile-time
     */
    // @{
    spsc_queue(void)
    {
        BOOST_ASSERT(!runtime_sized);
    }

    template <typename U>
    explicit spsc_queue(typename detail::allocator_rebind_helper<allocator, U>::type const &)
    {
        // just for API compatibility: we don't actually need an allocator
        BOOST_STATIC_ASSERT(!runtime_sized);
    }

    explicit spsc_queue(allocator const &)
    {
        // just for API compatibility: we don't actually need an allocator
        BOOST_ASSERT(!runtime_sized);
    }
    // @}


    /** Constructs a spsc_queue for element_count elements
     *
     * \pre spsc_queue must be configured to be sized at run-time
     */
    // @{
    explicit spsc_queue(size_type element_count):
        base_type(element_count)
    {
        BOOST_ASSERT(runtime_sized);
    }

    template <typename U>
    spsc_queue(size_type element_count, typename detail::allocator_rebind_helper<allocator, U>::type const & alloc):
        base_type(alloc, element_count)
    {
        BOOST_STATIC_ASSERT(runtime_sized);
    }

    spsc_queue(size_type element_count, allocator_arg const & alloc):
        base_type(alloc, element_count)
    {
        BOOST_ASSERT(runtime_sized);
    }
    // @}

    /** Pushes object t to the ringbuffer.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \post object will be pushed to the spsc_queue, unless it is full.
     * \return true, if the push operation is successful.
     *
     * \note Thread-safe and wait-free
     * */
    bool push(T const & t)
    {
        return base_type::push(t);
    }

    /** Pops one object from ringbuffer.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \post if ringbuffer is not empty, object will be discarded.
     * \return true, if the pop operation is successful, false if ringbuffer was empty.
     *
     * \note Thread-safe and wait-free
     */
    bool pop ()
    {
        // consume_noop discards the element; only the destructor runs
        detail::consume_noop consume_functor;
        return consume_one( consume_functor );
    }

    /** Pops one object from ringbuffer.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \post if ringbuffer is not empty, object will be copied to ret.
     * \return true, if the pop operation is successful, false if ringbuffer was empty.
     *
     * \note Thread-safe and wait-free
     */
    // enable_if restricts this overload to targets T is convertible to,
    // keeping the OutputIterator pop() overload below unambiguous
    template <typename U>
    typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type
    pop (U & ret)
    {
        detail::consume_via_copy<U> consume_functor(ret);
        return consume_one( consume_functor );
    }

    /** Pushes as many objects from the array t as there is space.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \return number of pushed items
     *
     * \note Thread-safe and wait-free
     */
    size_type push(T const * t, size_type size)
    {
        return base_type::push(t, size);
    }

    /** Pushes as many objects from the array t as there is space available.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \return number of pushed items
     *
     * \note Thread-safe and wait-free
     */
    template <size_type size>
    size_type push(T const (&t)[size])
    {
        return push(t, size);
    }

    /** Pushes as many objects from the range [begin, end) as there is space available.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \return iterator to the first element, which has not been pushed
     *
     * \note Thread-safe and wait-free
     */
    template <typename ConstIterator>
    ConstIterator push(ConstIterator begin, ConstIterator end)
    {
        return base_type::push(begin, end);
    }

    /** Pops a maximum of size objects from ringbuffer.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \return number of popped items
     *
     * \note Thread-safe and wait-free
     * */
    size_type pop(T * ret, size_type size)
    {
        return base_type::pop(ret, size);
    }

    /** Pops a maximum of size objects from spsc_queue.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \return number of popped items
     *
     * \note Thread-safe and wait-free
     * */
    template <size_type size>
    size_type pop(T (&ret)[size])
    {
        return pop(ret, size);
    }

    /** Pops objects to the output iterator it
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \return number of popped items
     *
     * \note Thread-safe and wait-free
     * */
    // disable_if is the mirror image of the pop(U&) constraint above:
    // anything T converts to goes to pop(U&), everything else is treated
    // as an output iterator
    template <typename OutputIterator>
    typename boost::disable_if<typename is_convertible<T, OutputIterator>::type, size_type>::type
    pop(OutputIterator it)
    {
        return base_type::pop_to_output_iterator(it);
    }

    /** consumes one element via a functor
     *
     * pops one element from the queue and applies the functor on this object
     *
     * \returns true, if one element was consumed
     *
     * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
     * */
    template <typename Functor>
    bool consume_one(Functor & f)
    {
        return base_type::consume_one(f);
    }

    /// \copydoc boost::lockfree::spsc_queue::consume_one(Functor & rhs)
    template <typename Functor>
    bool consume_one(Functor const & f)
    {
        return base_type::consume_one(f);
    }

    /** consumes all elements via a functor
     *
     * sequentially pops all elements from the queue and applies the functor on each object
     *
     * \returns number of elements that are consumed
     *
     * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
     * */
    template <typename Functor>
    size_type consume_all(Functor & f)
    {
        return base_type::consume_all(f);
    }

    /// \copydoc boost::lockfree::spsc_queue::consume_all(Functor & rhs)
    template <typename Functor>
    size_type consume_all(Functor const & f)
    {
        return base_type::consume_all(f);
    }

    /** get number of elements that are available for read
     *
     * \return number of available elements that can be popped from the spsc_queue
     *
     * \note Thread-safe and wait-free, should only be called from the consumer thread
     * */
    size_type read_available() const
    {
        return base_type::read_available(base_type::max_number_of_elements());
    }

    /** get write space to write elements
     *
     * \return number of elements that can be pushed to the spsc_queue
     *
     * \note Thread-safe and wait-free, should only be called from the producer thread
     * */
    size_type write_available() const
    {
        return base_type::write_available(base_type::max_number_of_elements());
    }

    /** get reference to element in the front of the queue
     *
     * Availability of front element can be checked using read_available().
     *
     * \pre only a consuming thread is allowed to check front element
     * \pre read_available() > 0. If ringbuffer is empty, it's undefined behaviour to invoke this method.
     * \return reference to the first element in the queue
     *
     * \note Thread-safe and wait-free
     */
    const T& front() const
    {
        BOOST_ASSERT(read_available() > 0);
        return base_type::front();
    }

    /// \copydoc boost::lockfree::spsc_queue::front() const
    T& front()
    {
        BOOST_ASSERT(read_available() > 0);
        return base_type::front();
    }

    /** reset the ringbuffer
     *
     * \note Not thread-safe
     * */
    // Shadows ringbuffer_base::reset(); this is the overload actually
    // reached through a spsc_queue object. Non-trivially-destructible
    // elements are drained one by one so their destructors run; otherwise
    // both indices are simply zeroed.
    void reset(void)
    {
        if ( !boost::has_trivial_destructor<T>::value ) {
            // make sure to call all destructors!

            T dummy_element;
            while (pop(dummy_element))
            {}
        } else {
            base_type::write_index_.store(0, memory_order_relaxed);
            base_type::read_index_.store(0, memory_order_release);
        }
    }
};
1023
1024 } /* namespace lockfree */
1025 } /* namespace boost */
1026
1027
1028 #endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */