]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // lock-free single-producer/single-consumer ringbuffer |
2 | // this algorithm is implemented in various projects (linux kernel) | |
3 | // | |
4 | // Copyright (C) 2009-2013 Tim Blechmann | |
5 | // | |
6 | // Distributed under the Boost Software License, Version 1.0. (See | |
7 | // accompanying file LICENSE_1_0.txt or copy at | |
8 | // http://www.boost.org/LICENSE_1_0.txt) | |
9 | ||
10 | #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED | |
11 | #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED | |
12 | ||
13 | #include <algorithm> | |
14 | #include <memory> | |
15 | ||
16 | #include <boost/aligned_storage.hpp> | |
17 | #include <boost/assert.hpp> | |
18 | #include <boost/static_assert.hpp> | |
20effc67 | 19 | #include <boost/core/allocator_access.hpp> |
7c673cae | 20 | #include <boost/utility.hpp> |
92f5a8d4 | 21 | #include <boost/next_prior.hpp> |
7c673cae FG |
22 | #include <boost/utility/enable_if.hpp> |
23 | #include <boost/config.hpp> // for BOOST_LIKELY | |
24 | ||
25 | #include <boost/type_traits/has_trivial_destructor.hpp> | |
26 | #include <boost/type_traits/is_convertible.hpp> | |
27 | ||
28 | #include <boost/lockfree/detail/atomic.hpp> | |
29 | #include <boost/lockfree/detail/copy_payload.hpp> | |
30 | #include <boost/lockfree/detail/parameter.hpp> | |
31 | #include <boost/lockfree/detail/prefix.hpp> | |
32 | ||
33 | #include <boost/lockfree/lockfree_forward.hpp> | |
34 | ||
35 | #ifdef BOOST_HAS_PRAGMA_ONCE | |
36 | #pragma once | |
37 | #endif | |
38 | ||
39 | namespace boost { | |
40 | namespace lockfree { | |
41 | namespace detail { | |
42 | ||
43 | typedef parameter::parameters<boost::parameter::optional<tag::capacity>, | |
44 | boost::parameter::optional<tag::allocator> | |
45 | > ringbuffer_signature; | |
46 | ||
47 | template <typename T> | |
48 | class ringbuffer_base | |
49 | { | |
50 | #ifndef BOOST_DOXYGEN_INVOKED | |
51 | protected: | |
52 | typedef std::size_t size_t; | |
53 | static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t); | |
54 | atomic<size_t> write_index_; | |
55 | char padding1[padding_size]; /* force read_index and write_index to different cache lines */ | |
56 | atomic<size_t> read_index_; | |
57 | ||
58 | BOOST_DELETED_FUNCTION(ringbuffer_base(ringbuffer_base const&)) | |
59 | BOOST_DELETED_FUNCTION(ringbuffer_base& operator= (ringbuffer_base const&)) | |
60 | ||
61 | protected: | |
62 | ringbuffer_base(void): | |
63 | write_index_(0), read_index_(0) | |
64 | {} | |
65 | ||
66 | static size_t next_index(size_t arg, size_t max_size) | |
67 | { | |
68 | size_t ret = arg + 1; | |
69 | while (BOOST_UNLIKELY(ret >= max_size)) | |
70 | ret -= max_size; | |
71 | return ret; | |
72 | } | |
73 | ||
74 | static size_t read_available(size_t write_index, size_t read_index, size_t max_size) | |
75 | { | |
76 | if (write_index >= read_index) | |
77 | return write_index - read_index; | |
78 | ||
79 | const size_t ret = write_index + max_size - read_index; | |
80 | return ret; | |
81 | } | |
82 | ||
83 | static size_t write_available(size_t write_index, size_t read_index, size_t max_size) | |
84 | { | |
85 | size_t ret = read_index - write_index - 1; | |
86 | if (write_index >= read_index) | |
87 | ret += max_size; | |
88 | return ret; | |
89 | } | |
90 | ||
91 | size_t read_available(size_t max_size) const | |
92 | { | |
93 | size_t write_index = write_index_.load(memory_order_acquire); | |
94 | const size_t read_index = read_index_.load(memory_order_relaxed); | |
95 | return read_available(write_index, read_index, max_size); | |
96 | } | |
97 | ||
98 | size_t write_available(size_t max_size) const | |
99 | { | |
100 | size_t write_index = write_index_.load(memory_order_relaxed); | |
101 | const size_t read_index = read_index_.load(memory_order_acquire); | |
102 | return write_available(write_index, read_index, max_size); | |
103 | } | |
104 | ||
105 | bool push(T const & t, T * buffer, size_t max_size) | |
106 | { | |
107 | const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread | |
108 | const size_t next = next_index(write_index, max_size); | |
109 | ||
110 | if (next == read_index_.load(memory_order_acquire)) | |
111 | return false; /* ringbuffer is full */ | |
112 | ||
113 | new (buffer + write_index) T(t); // copy-construct | |
114 | ||
115 | write_index_.store(next, memory_order_release); | |
116 | ||
117 | return true; | |
118 | } | |
119 | ||
120 | size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size) | |
121 | { | |
122 | return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer; | |
123 | } | |
124 | ||
125 | template <typename ConstIterator> | |
126 | ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size) | |
127 | { | |
128 | // FIXME: avoid std::distance | |
129 | ||
130 | const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread | |
131 | const size_t read_index = read_index_.load(memory_order_acquire); | |
132 | const size_t avail = write_available(write_index, read_index, max_size); | |
133 | ||
134 | if (avail == 0) | |
135 | return begin; | |
136 | ||
137 | size_t input_count = std::distance(begin, end); | |
138 | input_count = (std::min)(input_count, avail); | |
139 | ||
140 | size_t new_write_index = write_index + input_count; | |
141 | ||
142 | const ConstIterator last = boost::next(begin, input_count); | |
143 | ||
144 | if (write_index + input_count > max_size) { | |
145 | /* copy data in two sections */ | |
146 | const size_t count0 = max_size - write_index; | |
147 | const ConstIterator midpoint = boost::next(begin, count0); | |
148 | ||
149 | std::uninitialized_copy(begin, midpoint, internal_buffer + write_index); | |
150 | std::uninitialized_copy(midpoint, last, internal_buffer); | |
151 | new_write_index -= max_size; | |
152 | } else { | |
153 | std::uninitialized_copy(begin, last, internal_buffer + write_index); | |
154 | ||
155 | if (new_write_index == max_size) | |
156 | new_write_index = 0; | |
157 | } | |
158 | ||
159 | write_index_.store(new_write_index, memory_order_release); | |
160 | return last; | |
161 | } | |
162 | ||
163 | template <typename Functor> | |
164 | bool consume_one(Functor & functor, T * buffer, size_t max_size) | |
165 | { | |
166 | const size_t write_index = write_index_.load(memory_order_acquire); | |
167 | const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread | |
168 | if ( empty(write_index, read_index) ) | |
169 | return false; | |
170 | ||
171 | T & object_to_consume = buffer[read_index]; | |
172 | functor( object_to_consume ); | |
173 | object_to_consume.~T(); | |
174 | ||
175 | size_t next = next_index(read_index, max_size); | |
176 | read_index_.store(next, memory_order_release); | |
177 | return true; | |
178 | } | |
179 | ||
180 | template <typename Functor> | |
181 | bool consume_one(Functor const & functor, T * buffer, size_t max_size) | |
182 | { | |
183 | const size_t write_index = write_index_.load(memory_order_acquire); | |
184 | const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread | |
185 | if ( empty(write_index, read_index) ) | |
186 | return false; | |
187 | ||
188 | T & object_to_consume = buffer[read_index]; | |
189 | functor( object_to_consume ); | |
190 | object_to_consume.~T(); | |
191 | ||
192 | size_t next = next_index(read_index, max_size); | |
193 | read_index_.store(next, memory_order_release); | |
194 | return true; | |
195 | } | |
196 | ||
197 | template <typename Functor> | |
198 | size_t consume_all (Functor const & functor, T * internal_buffer, size_t max_size) | |
199 | { | |
200 | const size_t write_index = write_index_.load(memory_order_acquire); | |
201 | const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread | |
202 | ||
203 | const size_t avail = read_available(write_index, read_index, max_size); | |
204 | ||
205 | if (avail == 0) | |
206 | return 0; | |
207 | ||
208 | const size_t output_count = avail; | |
209 | ||
210 | size_t new_read_index = read_index + output_count; | |
211 | ||
212 | if (read_index + output_count > max_size) { | |
213 | /* copy data in two sections */ | |
214 | const size_t count0 = max_size - read_index; | |
215 | const size_t count1 = output_count - count0; | |
216 | ||
217 | run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor); | |
218 | run_functor_and_delete(internal_buffer, internal_buffer + count1, functor); | |
219 | ||
220 | new_read_index -= max_size; | |
221 | } else { | |
222 | run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor); | |
223 | ||
224 | if (new_read_index == max_size) | |
225 | new_read_index = 0; | |
226 | } | |
227 | ||
228 | read_index_.store(new_read_index, memory_order_release); | |
229 | return output_count; | |
230 | } | |
231 | ||
232 | template <typename Functor> | |
233 | size_t consume_all (Functor & functor, T * internal_buffer, size_t max_size) | |
234 | { | |
235 | const size_t write_index = write_index_.load(memory_order_acquire); | |
236 | const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread | |
237 | ||
238 | const size_t avail = read_available(write_index, read_index, max_size); | |
239 | ||
240 | if (avail == 0) | |
241 | return 0; | |
242 | ||
243 | const size_t output_count = avail; | |
244 | ||
245 | size_t new_read_index = read_index + output_count; | |
246 | ||
247 | if (read_index + output_count > max_size) { | |
248 | /* copy data in two sections */ | |
249 | const size_t count0 = max_size - read_index; | |
250 | const size_t count1 = output_count - count0; | |
251 | ||
252 | run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor); | |
253 | run_functor_and_delete(internal_buffer, internal_buffer + count1, functor); | |
254 | ||
255 | new_read_index -= max_size; | |
256 | } else { | |
257 | run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor); | |
258 | ||
259 | if (new_read_index == max_size) | |
260 | new_read_index = 0; | |
261 | } | |
262 | ||
263 | read_index_.store(new_read_index, memory_order_release); | |
264 | return output_count; | |
265 | } | |
266 | ||
267 | size_t pop (T * output_buffer, size_t output_count, T * internal_buffer, size_t max_size) | |
268 | { | |
269 | const size_t write_index = write_index_.load(memory_order_acquire); | |
270 | const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread | |
271 | ||
272 | const size_t avail = read_available(write_index, read_index, max_size); | |
273 | ||
274 | if (avail == 0) | |
275 | return 0; | |
276 | ||
277 | output_count = (std::min)(output_count, avail); | |
278 | ||
279 | size_t new_read_index = read_index + output_count; | |
280 | ||
281 | if (read_index + output_count > max_size) { | |
282 | /* copy data in two sections */ | |
283 | const size_t count0 = max_size - read_index; | |
284 | const size_t count1 = output_count - count0; | |
285 | ||
286 | copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer); | |
287 | copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0); | |
288 | ||
289 | new_read_index -= max_size; | |
290 | } else { | |
291 | copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer); | |
292 | if (new_read_index == max_size) | |
293 | new_read_index = 0; | |
294 | } | |
295 | ||
296 | read_index_.store(new_read_index, memory_order_release); | |
297 | return output_count; | |
298 | } | |
299 | ||
300 | template <typename OutputIterator> | |
301 | size_t pop_to_output_iterator (OutputIterator it, T * internal_buffer, size_t max_size) | |
302 | { | |
303 | const size_t write_index = write_index_.load(memory_order_acquire); | |
304 | const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread | |
305 | ||
306 | const size_t avail = read_available(write_index, read_index, max_size); | |
307 | if (avail == 0) | |
308 | return 0; | |
309 | ||
310 | size_t new_read_index = read_index + avail; | |
311 | ||
312 | if (read_index + avail > max_size) { | |
313 | /* copy data in two sections */ | |
314 | const size_t count0 = max_size - read_index; | |
315 | const size_t count1 = avail - count0; | |
316 | ||
317 | it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it); | |
318 | copy_and_delete(internal_buffer, internal_buffer + count1, it); | |
319 | ||
320 | new_read_index -= max_size; | |
321 | } else { | |
322 | copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it); | |
323 | if (new_read_index == max_size) | |
324 | new_read_index = 0; | |
325 | } | |
326 | ||
327 | read_index_.store(new_read_index, memory_order_release); | |
328 | return avail; | |
329 | } | |
330 | ||
331 | const T& front(const T * internal_buffer) const | |
332 | { | |
333 | const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread | |
334 | return *(internal_buffer + read_index); | |
335 | } | |
336 | ||
337 | T& front(T * internal_buffer) | |
338 | { | |
339 | const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread | |
340 | return *(internal_buffer + read_index); | |
341 | } | |
342 | #endif | |
343 | ||
344 | ||
345 | public: | |
346 | /** reset the ringbuffer | |
347 | * | |
348 | * \note Not thread-safe | |
349 | * */ | |
350 | void reset(void) | |
351 | { | |
352 | if ( !boost::has_trivial_destructor<T>::value ) { | |
353 | // make sure to call all destructors! | |
354 | ||
355 | T dummy_element; | |
356 | while (pop(dummy_element)) | |
357 | {} | |
358 | } else { | |
359 | write_index_.store(0, memory_order_relaxed); | |
360 | read_index_.store(0, memory_order_release); | |
361 | } | |
362 | } | |
363 | ||
364 | /** Check if the ringbuffer is empty | |
365 | * | |
366 | * \return true, if the ringbuffer is empty, false otherwise | |
367 | * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate. | |
368 | * */ | |
369 | bool empty(void) | |
370 | { | |
371 | return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed)); | |
372 | } | |
373 | ||
374 | /** | |
375 | * \return true, if implementation is lock-free. | |
376 | * | |
377 | * */ | |
378 | bool is_lock_free(void) const | |
379 | { | |
380 | return write_index_.is_lock_free() && read_index_.is_lock_free(); | |
381 | } | |
382 | ||
383 | private: | |
384 | bool empty(size_t write_index, size_t read_index) | |
385 | { | |
386 | return write_index == read_index; | |
387 | } | |
388 | ||
389 | template< class OutputIterator > | |
390 | OutputIterator copy_and_delete( T * first, T * last, OutputIterator out ) | |
391 | { | |
392 | if (boost::has_trivial_destructor<T>::value) { | |
393 | return std::copy(first, last, out); // will use memcpy if possible | |
394 | } else { | |
395 | for (; first != last; ++first, ++out) { | |
396 | *out = *first; | |
397 | first->~T(); | |
398 | } | |
399 | return out; | |
400 | } | |
401 | } | |
402 | ||
403 | template< class Functor > | |
404 | void run_functor_and_delete( T * first, T * last, Functor & functor ) | |
405 | { | |
406 | for (; first != last; ++first) { | |
407 | functor(*first); | |
408 | first->~T(); | |
409 | } | |
410 | } | |
411 | ||
412 | template< class Functor > | |
413 | void run_functor_and_delete( T * first, T * last, Functor const & functor ) | |
414 | { | |
415 | for (; first != last; ++first) { | |
416 | functor(*first); | |
417 | first->~T(); | |
418 | } | |
419 | } | |
420 | }; | |
421 | ||
422 | template <typename T, std::size_t MaxSize> | |
423 | class compile_time_sized_ringbuffer: | |
424 | public ringbuffer_base<T> | |
425 | { | |
426 | typedef std::size_t size_type; | |
427 | static const std::size_t max_size = MaxSize + 1; | |
428 | ||
429 | typedef typename boost::aligned_storage<max_size * sizeof(T), | |
430 | boost::alignment_of<T>::value | |
431 | >::type storage_type; | |
432 | ||
433 | storage_type storage_; | |
434 | ||
435 | T * data() | |
436 | { | |
437 | return static_cast<T*>(storage_.address()); | |
438 | } | |
439 | ||
440 | const T * data() const | |
441 | { | |
442 | return static_cast<const T*>(storage_.address()); | |
443 | } | |
444 | ||
445 | protected: | |
446 | size_type max_number_of_elements() const | |
447 | { | |
448 | return max_size; | |
449 | } | |
450 | ||
451 | public: | |
452 | bool push(T const & t) | |
453 | { | |
454 | return ringbuffer_base<T>::push(t, data(), max_size); | |
455 | } | |
456 | ||
457 | template <typename Functor> | |
458 | bool consume_one(Functor & f) | |
459 | { | |
460 | return ringbuffer_base<T>::consume_one(f, data(), max_size); | |
461 | } | |
462 | ||
463 | template <typename Functor> | |
464 | bool consume_one(Functor const & f) | |
465 | { | |
466 | return ringbuffer_base<T>::consume_one(f, data(), max_size); | |
467 | } | |
468 | ||
469 | template <typename Functor> | |
470 | size_type consume_all(Functor & f) | |
471 | { | |
472 | return ringbuffer_base<T>::consume_all(f, data(), max_size); | |
473 | } | |
474 | ||
475 | template <typename Functor> | |
476 | size_type consume_all(Functor const & f) | |
477 | { | |
478 | return ringbuffer_base<T>::consume_all(f, data(), max_size); | |
479 | } | |
480 | ||
481 | size_type push(T const * t, size_type size) | |
482 | { | |
483 | return ringbuffer_base<T>::push(t, size, data(), max_size); | |
484 | } | |
485 | ||
486 | template <size_type size> | |
487 | size_type push(T const (&t)[size]) | |
488 | { | |
489 | return push(t, size); | |
490 | } | |
491 | ||
492 | template <typename ConstIterator> | |
493 | ConstIterator push(ConstIterator begin, ConstIterator end) | |
494 | { | |
495 | return ringbuffer_base<T>::push(begin, end, data(), max_size); | |
496 | } | |
497 | ||
498 | size_type pop(T * ret, size_type size) | |
499 | { | |
500 | return ringbuffer_base<T>::pop(ret, size, data(), max_size); | |
501 | } | |
502 | ||
503 | template <typename OutputIterator> | |
504 | size_type pop_to_output_iterator(OutputIterator it) | |
505 | { | |
506 | return ringbuffer_base<T>::pop_to_output_iterator(it, data(), max_size); | |
507 | } | |
508 | ||
509 | const T& front(void) const | |
510 | { | |
511 | return ringbuffer_base<T>::front(data()); | |
512 | } | |
513 | ||
514 | T& front(void) | |
515 | { | |
516 | return ringbuffer_base<T>::front(data()); | |
517 | } | |
518 | }; | |
519 | ||
520 | template <typename T, typename Alloc> | |
521 | class runtime_sized_ringbuffer: | |
522 | public ringbuffer_base<T>, | |
523 | private Alloc | |
524 | { | |
525 | typedef std::size_t size_type; | |
526 | size_type max_elements_; | |
92f5a8d4 | 527 | #ifdef BOOST_NO_CXX11_ALLOCATOR |
7c673cae | 528 | typedef typename Alloc::pointer pointer; |
92f5a8d4 TL |
529 | #else |
530 | typedef std::allocator_traits<Alloc> allocator_traits; | |
531 | typedef typename allocator_traits::pointer pointer; | |
532 | #endif | |
7c673cae FG |
533 | pointer array_; |
534 | ||
535 | protected: | |
536 | size_type max_number_of_elements() const | |
537 | { | |
538 | return max_elements_; | |
539 | } | |
540 | ||
541 | public: | |
542 | explicit runtime_sized_ringbuffer(size_type max_elements): | |
543 | max_elements_(max_elements + 1) | |
544 | { | |
92f5a8d4 | 545 | #ifdef BOOST_NO_CXX11_ALLOCATOR |
7c673cae | 546 | array_ = Alloc::allocate(max_elements_); |
92f5a8d4 TL |
547 | #else |
548 | Alloc& alloc = *this; | |
549 | array_ = allocator_traits::allocate(alloc, max_elements_); | |
550 | #endif | |
7c673cae FG |
551 | } |
552 | ||
553 | template <typename U> | |
20effc67 | 554 | runtime_sized_ringbuffer(typename boost::allocator_rebind<Alloc, U>::type const & alloc, size_type max_elements): |
7c673cae FG |
555 | Alloc(alloc), max_elements_(max_elements + 1) |
556 | { | |
92f5a8d4 | 557 | #ifdef BOOST_NO_CXX11_ALLOCATOR |
7c673cae | 558 | array_ = Alloc::allocate(max_elements_); |
92f5a8d4 TL |
559 | #else |
560 | Alloc& allocator = *this; | |
561 | array_ = allocator_traits::allocate(allocator, max_elements_); | |
562 | #endif | |
7c673cae FG |
563 | } |
564 | ||
565 | runtime_sized_ringbuffer(Alloc const & alloc, size_type max_elements): | |
566 | Alloc(alloc), max_elements_(max_elements + 1) | |
567 | { | |
92f5a8d4 | 568 | #ifdef BOOST_NO_CXX11_ALLOCATOR |
7c673cae | 569 | array_ = Alloc::allocate(max_elements_); |
92f5a8d4 TL |
570 | #else |
571 | Alloc& allocator = *this; | |
572 | array_ = allocator_traits::allocate(allocator, max_elements_); | |
573 | #endif | |
7c673cae FG |
574 | } |
575 | ||
576 | ~runtime_sized_ringbuffer(void) | |
577 | { | |
578 | // destroy all remaining items | |
579 | T out; | |
580 | while (pop(&out, 1)) {} | |
581 | ||
92f5a8d4 | 582 | #ifdef BOOST_NO_CXX11_ALLOCATOR |
7c673cae | 583 | Alloc::deallocate(array_, max_elements_); |
92f5a8d4 TL |
584 | #else |
585 | Alloc& allocator = *this; | |
586 | allocator_traits::deallocate(allocator, array_, max_elements_); | |
587 | #endif | |
7c673cae FG |
588 | } |
589 | ||
590 | bool push(T const & t) | |
591 | { | |
592 | return ringbuffer_base<T>::push(t, &*array_, max_elements_); | |
593 | } | |
594 | ||
595 | template <typename Functor> | |
596 | bool consume_one(Functor & f) | |
597 | { | |
598 | return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_); | |
599 | } | |
600 | ||
601 | template <typename Functor> | |
602 | bool consume_one(Functor const & f) | |
603 | { | |
604 | return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_); | |
605 | } | |
606 | ||
607 | template <typename Functor> | |
608 | size_type consume_all(Functor & f) | |
609 | { | |
610 | return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_); | |
611 | } | |
612 | ||
613 | template <typename Functor> | |
614 | size_type consume_all(Functor const & f) | |
615 | { | |
616 | return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_); | |
617 | } | |
618 | ||
619 | size_type push(T const * t, size_type size) | |
620 | { | |
621 | return ringbuffer_base<T>::push(t, size, &*array_, max_elements_); | |
622 | } | |
623 | ||
624 | template <size_type size> | |
625 | size_type push(T const (&t)[size]) | |
626 | { | |
627 | return push(t, size); | |
628 | } | |
629 | ||
630 | template <typename ConstIterator> | |
631 | ConstIterator push(ConstIterator begin, ConstIterator end) | |
632 | { | |
633 | return ringbuffer_base<T>::push(begin, end, &*array_, max_elements_); | |
634 | } | |
635 | ||
636 | size_type pop(T * ret, size_type size) | |
637 | { | |
638 | return ringbuffer_base<T>::pop(ret, size, &*array_, max_elements_); | |
639 | } | |
640 | ||
641 | template <typename OutputIterator> | |
642 | size_type pop_to_output_iterator(OutputIterator it) | |
643 | { | |
644 | return ringbuffer_base<T>::pop_to_output_iterator(it, &*array_, max_elements_); | |
645 | } | |
646 | ||
647 | const T& front(void) const | |
648 | { | |
649 | return ringbuffer_base<T>::front(&*array_); | |
650 | } | |
651 | ||
652 | T& front(void) | |
653 | { | |
654 | return ringbuffer_base<T>::front(&*array_); | |
655 | } | |
656 | }; | |
657 | ||
658 | #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES | |
659 | template <typename T, typename A0, typename A1> | |
660 | #else | |
661 | template <typename T, typename ...Options> | |
662 | #endif | |
663 | struct make_ringbuffer | |
664 | { | |
665 | #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES | |
666 | typedef typename ringbuffer_signature::bind<A0, A1>::type bound_args; | |
667 | #else | |
668 | typedef typename ringbuffer_signature::bind<Options...>::type bound_args; | |
669 | #endif | |
670 | ||
671 | typedef extract_capacity<bound_args> extract_capacity_t; | |
672 | ||
673 | static const bool runtime_sized = !extract_capacity_t::has_capacity; | |
674 | static const size_t capacity = extract_capacity_t::capacity; | |
675 | ||
676 | typedef extract_allocator<bound_args, T> extract_allocator_t; | |
677 | typedef typename extract_allocator_t::type allocator; | |
678 | ||
679 | // allocator argument is only sane, for run-time sized ringbuffers | |
680 | BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>, | |
681 | mpl::bool_<!extract_allocator_t::has_allocator>, | |
682 | mpl::true_ | |
683 | >::type::value)); | |
684 | ||
685 | typedef typename mpl::if_c<runtime_sized, | |
686 | runtime_sized_ringbuffer<T, allocator>, | |
687 | compile_time_sized_ringbuffer<T, capacity> | |
688 | >::type ringbuffer_type; | |
689 | }; | |
690 | ||
691 | ||
692 | } /* namespace detail */ | |
693 | ||
694 | ||
695 | /** The spsc_queue class provides a single-writer/single-reader FIFO queue; pushing and popping are wait-free. | |
696 | * | |
697 | * \b Policies: | |
698 | * - \c boost::lockfree::capacity<>, optional <br> | |
699 | * If this template argument is passed to the options, the size of the ringbuffer is set at compile-time. | |
700 | * | |
701 | * - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br> | |
702 | * Specifies the allocator that is used to allocate the ringbuffer. This option is only valid, if the ringbuffer is configured | |
703 | * to be sized at run-time | |
704 | * | |
705 | * \b Requirements: | |
706 | * - T must have a default constructor | |
707 | * - T must be copyable | |
708 | * */ | |
709 | #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES | |
710 | template <typename T, class A0, class A1> | |
711 | #else | |
712 | template <typename T, typename ...Options> | |
713 | #endif | |
714 | class spsc_queue: | |
715 | #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES | |
716 | public detail::make_ringbuffer<T, A0, A1>::ringbuffer_type | |
717 | #else | |
718 | public detail::make_ringbuffer<T, Options...>::ringbuffer_type | |
719 | #endif | |
720 | { | |
721 | private: | |
722 | ||
723 | #ifndef BOOST_DOXYGEN_INVOKED | |
724 | ||
725 | #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES | |
726 | typedef typename detail::make_ringbuffer<T, A0, A1>::ringbuffer_type base_type; | |
727 | static const bool runtime_sized = detail::make_ringbuffer<T, A0, A1>::runtime_sized; | |
728 | typedef typename detail::make_ringbuffer<T, A0, A1>::allocator allocator_arg; | |
729 | #else | |
730 | typedef typename detail::make_ringbuffer<T, Options...>::ringbuffer_type base_type; | |
731 | static const bool runtime_sized = detail::make_ringbuffer<T, Options...>::runtime_sized; | |
732 | typedef typename detail::make_ringbuffer<T, Options...>::allocator allocator_arg; | |
733 | #endif | |
734 | ||
735 | ||
736 | struct implementation_defined | |
737 | { | |
738 | typedef allocator_arg allocator; | |
739 | typedef std::size_t size_type; | |
740 | }; | |
741 | #endif | |
742 | ||
743 | public: | |
744 | typedef T value_type; | |
745 | typedef typename implementation_defined::allocator allocator; | |
746 | typedef typename implementation_defined::size_type size_type; | |
747 | ||
748 | /** Constructs a spsc_queue | |
749 | * | |
750 | * \pre spsc_queue must be configured to be sized at compile-time | |
751 | */ | |
7c673cae FG |
752 | spsc_queue(void) |
753 | { | |
20effc67 TL |
754 | // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling |
755 | // this function and this function may be compiled even when it isn't being used. | |
7c673cae FG |
756 | BOOST_ASSERT(!runtime_sized); |
757 | } | |
758 | ||
20effc67 TL |
759 | /** Constructs a spsc_queue with a custom allocator |
760 | * | |
761 | * \pre spsc_queue must be configured to be sized at compile-time | |
762 | * | |
763 | * \note This is just for API compatibility: an allocator isn't actually needed | |
764 | */ | |
7c673cae | 765 | template <typename U> |
20effc67 | 766 | explicit spsc_queue(typename boost::allocator_rebind<allocator, U>::type const &) |
7c673cae | 767 | { |
7c673cae FG |
768 | BOOST_STATIC_ASSERT(!runtime_sized); |
769 | } | |
770 | ||
20effc67 TL |
771 | /** Constructs a spsc_queue with a custom allocator |
772 | * | |
773 | * \pre spsc_queue must be configured to be sized at compile-time | |
774 | * | |
775 | * \note This is just for API compatibility: an allocator isn't actually needed | |
776 | */ | |
7c673cae FG |
777 | explicit spsc_queue(allocator const &) |
778 | { | |
20effc67 TL |
779 | // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling |
780 | // this function and this function may be compiled even when it isn't being used. | |
7c673cae FG |
781 | BOOST_ASSERT(!runtime_sized); |
782 | } | |
7c673cae FG |
783 | |
784 | /** Constructs a spsc_queue for element_count elements | |
785 | * | |
786 | * \pre spsc_queue must be configured to be sized at run-time | |
787 | */ | |
7c673cae FG |
788 | explicit spsc_queue(size_type element_count): |
789 | base_type(element_count) | |
790 | { | |
20effc67 TL |
791 | // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling |
792 | // this function and this function may be compiled even when it isn't being used. | |
7c673cae FG |
793 | BOOST_ASSERT(runtime_sized); |
794 | } | |
795 | ||
20effc67 TL |
796 | /** Constructs a spsc_queue for element_count elements with a custom allocator |
797 | * | |
798 | * \pre spsc_queue must be configured to be sized at run-time | |
799 | */ | |
7c673cae | 800 | template <typename U> |
20effc67 | 801 | spsc_queue(size_type element_count, typename boost::allocator_rebind<allocator, U>::type const & alloc): |
7c673cae FG |
802 | base_type(alloc, element_count) |
803 | { | |
804 | BOOST_STATIC_ASSERT(runtime_sized); | |
805 | } | |
806 | ||
20effc67 TL |
807 | /** Constructs a spsc_queue for element_count elements with a custom allocator |
808 | * | |
809 | * \pre spsc_queue must be configured to be sized at run-time | |
810 | */ | |
7c673cae FG |
811 | spsc_queue(size_type element_count, allocator_arg const & alloc): |
812 | base_type(alloc, element_count) | |
813 | { | |
20effc67 TL |
814 | // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling |
815 | // this function and this function may be compiled even when it isn't being used. | |
7c673cae FG |
816 | BOOST_ASSERT(runtime_sized); |
817 | } | |
7c673cae FG |
818 | |
819 | /** Pushes object t to the ringbuffer. | |
820 | * | |
821 | * \pre only one thread is allowed to push data to the spsc_queue | |
822 | * \post object will be pushed to the spsc_queue, unless it is full. | |
823 | * \return true, if the push operation is successful. | |
824 | * | |
825 | * \note Thread-safe and wait-free | |
826 | * */ | |
827 | bool push(T const & t) | |
828 | { | |
829 | return base_type::push(t); | |
830 | } | |
831 | ||
832 | /** Pops one object from ringbuffer. | |
833 | * | |
834 | * \pre only one thread is allowed to pop data from the spsc_queue | |
835 | * \post if ringbuffer is not empty, object will be discarded. | |
836 | * \return true, if the pop operation is successful, false if ringbuffer was empty. | |
837 | * | |
838 | * \note Thread-safe and wait-free | |
839 | */ | |
840 | bool pop () | |
841 | { | |
842 | detail::consume_noop consume_functor; | |
843 | return consume_one( consume_functor ); | |
844 | } | |
845 | ||
846 | /** Pops one object from ringbuffer. | |
847 | * | |
848 | * \pre only one thread is allowed to pop data from the spsc_queue | |
849 | * \post if ringbuffer is not empty, object will be copied to ret. | |
850 | * \return true, if the pop operation is successful, false if ringbuffer was empty. | |
851 | * | |
852 | * \note Thread-safe and wait-free | |
853 | */ | |
854 | template <typename U> | |
855 | typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type | |
856 | pop (U & ret) | |
857 | { | |
858 | detail::consume_via_copy<U> consume_functor(ret); | |
859 | return consume_one( consume_functor ); | |
860 | } | |
861 | ||
862 | /** Pushes as many objects from the array t as there is space. | |
863 | * | |
864 | * \pre only one thread is allowed to push data to the spsc_queue | |
865 | * \return number of pushed items | |
866 | * | |
867 | * \note Thread-safe and wait-free | |
868 | */ | |
869 | size_type push(T const * t, size_type size) | |
870 | { | |
871 | return base_type::push(t, size); | |
872 | } | |
873 | ||
874 | /** Pushes as many objects from the array t as there is space available. | |
875 | * | |
876 | * \pre only one thread is allowed to push data to the spsc_queue | |
877 | * \return number of pushed items | |
878 | * | |
879 | * \note Thread-safe and wait-free | |
880 | */ | |
881 | template <size_type size> | |
882 | size_type push(T const (&t)[size]) | |
883 | { | |
884 | return push(t, size); | |
885 | } | |
886 | ||
887 | /** Pushes as many objects from the range [begin, end) as there is space . | |
888 | * | |
889 | * \pre only one thread is allowed to push data to the spsc_queue | |
890 | * \return iterator to the first element, which has not been pushed | |
891 | * | |
892 | * \note Thread-safe and wait-free | |
893 | */ | |
894 | template <typename ConstIterator> | |
895 | ConstIterator push(ConstIterator begin, ConstIterator end) | |
896 | { | |
897 | return base_type::push(begin, end); | |
898 | } | |
899 | ||
900 | /** Pops a maximum of size objects from ringbuffer. | |
901 | * | |
902 | * \pre only one thread is allowed to pop data to the spsc_queue | |
903 | * \return number of popped items | |
904 | * | |
905 | * \note Thread-safe and wait-free | |
906 | * */ | |
907 | size_type pop(T * ret, size_type size) | |
908 | { | |
909 | return base_type::pop(ret, size); | |
910 | } | |
911 | ||
912 | /** Pops a maximum of size objects from spsc_queue. | |
913 | * | |
914 | * \pre only one thread is allowed to pop data to the spsc_queue | |
915 | * \return number of popped items | |
916 | * | |
917 | * \note Thread-safe and wait-free | |
918 | * */ | |
919 | template <size_type size> | |
920 | size_type pop(T (&ret)[size]) | |
921 | { | |
922 | return pop(ret, size); | |
923 | } | |
924 | ||
925 | /** Pops objects to the output iterator it | |
926 | * | |
927 | * \pre only one thread is allowed to pop data to the spsc_queue | |
928 | * \return number of popped items | |
929 | * | |
930 | * \note Thread-safe and wait-free | |
931 | * */ | |
932 | template <typename OutputIterator> | |
933 | typename boost::disable_if<typename is_convertible<T, OutputIterator>::type, size_type>::type | |
934 | pop(OutputIterator it) | |
935 | { | |
936 | return base_type::pop_to_output_iterator(it); | |
937 | } | |
938 | ||
939 | /** consumes one element via a functor | |
940 | * | |
941 | * pops one element from the queue and applies the functor on this object | |
942 | * | |
943 | * \returns true, if one element was consumed | |
944 | * | |
945 | * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking | |
946 | * */ | |
947 | template <typename Functor> | |
948 | bool consume_one(Functor & f) | |
949 | { | |
950 | return base_type::consume_one(f); | |
951 | } | |
952 | ||
953 | /// \copydoc boost::lockfree::spsc_queue::consume_one(Functor & rhs) | |
954 | template <typename Functor> | |
955 | bool consume_one(Functor const & f) | |
956 | { | |
957 | return base_type::consume_one(f); | |
958 | } | |
959 | ||
960 | /** consumes all elements via a functor | |
961 | * | |
962 | * sequentially pops all elements from the queue and applies the functor on each object | |
963 | * | |
964 | * \returns number of elements that are consumed | |
965 | * | |
966 | * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking | |
967 | * */ | |
968 | template <typename Functor> | |
969 | size_type consume_all(Functor & f) | |
970 | { | |
971 | return base_type::consume_all(f); | |
972 | } | |
973 | ||
974 | /// \copydoc boost::lockfree::spsc_queue::consume_all(Functor & rhs) | |
975 | template <typename Functor> | |
976 | size_type consume_all(Functor const & f) | |
977 | { | |
978 | return base_type::consume_all(f); | |
979 | } | |
980 | ||
981 | /** get number of elements that are available for read | |
982 | * | |
983 | * \return number of available elements that can be popped from the spsc_queue | |
984 | * | |
985 | * \note Thread-safe and wait-free, should only be called from the consumer thread | |
986 | * */ | |
987 | size_type read_available() const | |
988 | { | |
989 | return base_type::read_available(base_type::max_number_of_elements()); | |
990 | } | |
991 | ||
992 | /** get write space to write elements | |
993 | * | |
994 | * \return number of elements that can be pushed to the spsc_queue | |
995 | * | |
996 | * \note Thread-safe and wait-free, should only be called from the producer thread | |
997 | * */ | |
998 | size_type write_available() const | |
999 | { | |
1000 | return base_type::write_available(base_type::max_number_of_elements()); | |
1001 | } | |
1002 | ||
1003 | /** get reference to element in the front of the queue | |
1004 | * | |
1005 | * Availability of front element can be checked using read_available(). | |
1006 | * | |
1007 | * \pre only a consuming thread is allowed to check front element | |
1008 | * \pre read_available() > 0. If ringbuffer is empty, it's undefined behaviour to invoke this method. | |
1009 | * \return reference to the first element in the queue | |
1010 | * | |
1011 | * \note Thread-safe and wait-free | |
1012 | */ | |
1013 | const T& front() const | |
1014 | { | |
1015 | BOOST_ASSERT(read_available() > 0); | |
1016 | return base_type::front(); | |
1017 | } | |
1018 | ||
1019 | /// \copydoc boost::lockfree::spsc_queue::front() const | |
1020 | T& front() | |
1021 | { | |
1022 | BOOST_ASSERT(read_available() > 0); | |
1023 | return base_type::front(); | |
1024 | } | |
1025 | ||
1026 | /** reset the ringbuffer | |
1027 | * | |
1028 | * \note Not thread-safe | |
1029 | * */ | |
1030 | void reset(void) | |
1031 | { | |
1032 | if ( !boost::has_trivial_destructor<T>::value ) { | |
1033 | // make sure to call all destructors! | |
1034 | ||
1035 | T dummy_element; | |
1036 | while (pop(dummy_element)) | |
1037 | {} | |
1038 | } else { | |
1039 | base_type::write_index_.store(0, memory_order_relaxed); | |
1040 | base_type::read_index_.store(0, memory_order_release); | |
1041 | } | |
1042 | } | |
1043 | }; | |
1044 | ||
1045 | } /* namespace lockfree */ | |
1046 | } /* namespace boost */ | |
1047 | ||
1048 | ||
1049 | #endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */ |