2 use core
::cell
::UnsafeCell
;
4 use core
::marker
::PhantomData
;
6 use core
::sync
::atomic
::{self, AtomicPtr, AtomicUsize, Ordering}
;
8 use crossbeam_utils
::{Backoff, CachePadded}
;
10 use maybe_uninit
::MaybeUninit
;
// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold.
// One slot per lap is sacrificed so that an offset equal to `BLOCK_CAP`
// unambiguously means "end of block".
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
// Head/tail indices are shifted left by `SHIFT`, leaving the low bit free.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
// Stored in the low (metadata) bit of the head index.
const HAS_NEXT: usize = 1;
/// A slot in a block.
struct Slot<T> {
    /// The value stored in the slot, written by `push` and taken by `pop`.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot (a combination of `WRITE`, `READ` and `DESTROY` bits).
    state: AtomicUsize,
}
41 /// Waits until a value is written into the slot.
42 fn wait_write(&self) {
43 let backoff
= Backoff
::new();
44 while self.state
.load(Ordering
::Acquire
) & WRITE
== 0 {
50 /// A block in a linked list.
52 /// Each block in the list can hold up to `BLOCK_CAP` values.
54 /// The next block in the linked list.
55 next
: AtomicPtr
<Block
<T
>>,
58 slots
: [Slot
<T
>; BLOCK_CAP
],
62 /// Creates an empty block that starts at `start_index`.
63 fn new() -> Block
<T
> {
64 // SAFETY: This is safe because:
65 // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
66 // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
67 // [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
68 // holds a MaybeUninit.
69 // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
70 unsafe { MaybeUninit::zeroed().assume_init() }
73 /// Waits until the next pointer is set.
74 fn wait_next(&self) -> *mut Block
<T
> {
75 let backoff
= Backoff
::new();
77 let next
= self.next
.load(Ordering
::Acquire
);
85 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
86 unsafe fn destroy(this
: *mut Block
<T
>, start
: usize) {
87 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
88 // begun destruction of the block.
89 for i
in start
..BLOCK_CAP
- 1 {
90 let slot
= (*this
).slots
.get_unchecked(i
);
92 // Mark the `DESTROY` bit if a thread is still using the slot.
93 if slot
.state
.load(Ordering
::Acquire
) & READ
== 0
94 && slot
.state
.fetch_or(DESTROY
, Ordering
::AcqRel
) & READ
== 0
96 // If a thread is still using the slot, it will continue destruction of the block.
101 // No thread is using the block, now it is safe to destroy it.
102 drop(Box
::from_raw(this
));
106 /// A position in a queue.
108 /// The index in the queue.
111 /// The block in the linked list.
112 block
: AtomicPtr
<Block
<T
>>,
115 /// An unbounded multi-producer multi-consumer queue.
117 /// This queue is implemented as a linked list of segments, where each segment is a small buffer
118 /// that can hold a handful of elements. There is no limit to how many elements can be in the queue
119 /// at a time. However, since segments need to be dynamically allocated as elements get pushed,
120 /// this queue is somewhat slower than [`ArrayQueue`].
122 /// [`ArrayQueue`]: struct.ArrayQueue.html
127 /// use crossbeam_queue::{PopError, SegQueue};
129 /// let q = SegQueue::new();
134 /// assert_eq!(q.pop(), Ok('a'));
135 /// assert_eq!(q.pop(), Ok('b'));
136 /// assert_eq!(q.pop(), Err(PopError));
138 pub struct SegQueue
<T
> {
139 /// The head of the queue.
140 head
: CachePadded
<Position
<T
>>,
142 /// The tail of the queue.
143 tail
: CachePadded
<Position
<T
>>,
145 /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
146 _marker
: PhantomData
<T
>,
// SAFETY: the queue moves values of type `T` between threads (they may be pushed on one thread
// and popped or dropped on another), so `T: Send` is required and sufficient for both `Send`
// and `Sync`. No `T: Sync` bound is needed because shared references to the stored values are
// never handed out.
unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}
152 impl<T
> SegQueue
<T
> {
153 /// Creates a new unbounded queue.
158 /// use crossbeam_queue::SegQueue;
160 /// let q = SegQueue::<i32>::new();
162 pub fn new() -> SegQueue
<T
> {
164 head
: CachePadded
::new(Position
{
165 block
: AtomicPtr
::new(ptr
::null_mut()),
166 index
: AtomicUsize
::new(0),
168 tail
: CachePadded
::new(Position
{
169 block
: AtomicPtr
::new(ptr
::null_mut()),
170 index
: AtomicUsize
::new(0),
172 _marker
: PhantomData
,
176 /// Pushes an element into the queue.
181 /// use crossbeam_queue::SegQueue;
183 /// let q = SegQueue::new();
188 pub fn push(&self, value
: T
) {
189 let backoff
= Backoff
::new();
190 let mut tail
= self.tail
.index
.load(Ordering
::Acquire
);
191 let mut block
= self.tail
.block
.load(Ordering
::Acquire
);
192 let mut next_block
= None
;
195 // Calculate the offset of the index into the block.
196 let offset
= (tail
>> SHIFT
) % LAP
;
198 // If we reached the end of the block, wait until the next one is installed.
199 if offset
== BLOCK_CAP
{
201 tail
= self.tail
.index
.load(Ordering
::Acquire
);
202 block
= self.tail
.block
.load(Ordering
::Acquire
);
206 // If we're going to have to install the next block, allocate it in advance in order to
207 // make the wait for other threads as short as possible.
208 if offset
+ 1 == BLOCK_CAP
&& next_block
.is_none() {
209 next_block
= Some(Box
::new(Block
::<T
>::new()));
212 // If this is the first push operation, we need to allocate the first block.
214 let new
= Box
::into_raw(Box
::new(Block
::<T
>::new()));
219 .compare_and_swap(block
, new
, Ordering
::Release
)
222 self.head
.block
.store(new
, Ordering
::Release
);
225 next_block
= unsafe { Some(Box::from_raw(new)) }
;
226 tail
= self.tail
.index
.load(Ordering
::Acquire
);
227 block
= self.tail
.block
.load(Ordering
::Acquire
);
232 let new_tail
= tail
+ (1 << SHIFT
);
234 // Try advancing the tail forward.
235 match self.tail
.index
.compare_exchange_weak(
242 // If we've reached the end of the block, install the next one.
243 if offset
+ 1 == BLOCK_CAP
{
244 let next_block
= Box
::into_raw(next_block
.unwrap());
245 let next_index
= new_tail
.wrapping_add(1 << SHIFT
);
247 self.tail
.block
.store(next_block
, Ordering
::Release
);
248 self.tail
.index
.store(next_index
, Ordering
::Release
);
249 (*block
).next
.store(next_block
, Ordering
::Release
);
252 // Write the value into the slot.
253 let slot
= (*block
).slots
.get_unchecked(offset
);
254 slot
.value
.get().write(MaybeUninit
::new(value
));
255 slot
.state
.fetch_or(WRITE
, Ordering
::Release
);
261 block
= self.tail
.block
.load(Ordering
::Acquire
);
268 /// Pops an element from the queue.
270 /// If the queue is empty, an error is returned.
275 /// use crossbeam_queue::{PopError, SegQueue};
277 /// let q = SegQueue::new();
280 /// assert_eq!(q.pop(), Ok(10));
281 /// assert_eq!(q.pop(), Err(PopError));
283 pub fn pop(&self) -> Result
<T
, PopError
> {
284 let backoff
= Backoff
::new();
285 let mut head
= self.head
.index
.load(Ordering
::Acquire
);
286 let mut block
= self.head
.block
.load(Ordering
::Acquire
);
289 // Calculate the offset of the index into the block.
290 let offset
= (head
>> SHIFT
) % LAP
;
292 // If we reached the end of the block, wait until the next one is installed.
293 if offset
== BLOCK_CAP
{
295 head
= self.head
.index
.load(Ordering
::Acquire
);
296 block
= self.head
.block
.load(Ordering
::Acquire
);
300 let mut new_head
= head
+ (1 << SHIFT
);
302 if new_head
& HAS_NEXT
== 0 {
303 atomic
::fence(Ordering
::SeqCst
);
304 let tail
= self.tail
.index
.load(Ordering
::Relaxed
);
306 // If the tail equals the head, that means the queue is empty.
307 if head
>> SHIFT
== tail
>> SHIFT
{
308 return Err(PopError
);
311 // If head and tail are not in the same block, set `HAS_NEXT` in head.
312 if (head
>> SHIFT
) / LAP
!= (tail
>> SHIFT
) / LAP
{
313 new_head
|= HAS_NEXT
;
317 // The block can be null here only if the first push operation is in progress. In that
318 // case, just wait until it gets initialized.
321 head
= self.head
.index
.load(Ordering
::Acquire
);
322 block
= self.head
.block
.load(Ordering
::Acquire
);
326 // Try moving the head index forward.
327 match self.head
.index
.compare_exchange_weak(
334 // If we've reached the end of the block, move to the next one.
335 if offset
+ 1 == BLOCK_CAP
{
336 let next
= (*block
).wait_next();
337 let mut next_index
= (new_head
& !HAS_NEXT
).wrapping_add(1 << SHIFT
);
338 if !(*next
).next
.load(Ordering
::Relaxed
).is_null() {
339 next_index
|= HAS_NEXT
;
342 self.head
.block
.store(next
, Ordering
::Release
);
343 self.head
.index
.store(next_index
, Ordering
::Release
);
347 let slot
= (*block
).slots
.get_unchecked(offset
);
349 let value
= slot
.value
.get().read().assume_init();
351 // Destroy the block if we've reached the end, or if another thread wanted to
352 // destroy but couldn't because we were busy reading from the slot.
353 if offset
+ 1 == BLOCK_CAP
{
354 Block
::destroy(block
, 0);
355 } else if slot
.state
.fetch_or(READ
, Ordering
::AcqRel
) & DESTROY
!= 0 {
356 Block
::destroy(block
, offset
+ 1);
363 block
= self.head
.block
.load(Ordering
::Acquire
);
370 /// Returns `true` if the queue is empty.
375 /// use crossbeam_queue::SegQueue;
377 /// let q = SegQueue::new();
379 /// assert!(q.is_empty());
381 /// assert!(!q.is_empty());
383 pub fn is_empty(&self) -> bool
{
384 let head
= self.head
.index
.load(Ordering
::SeqCst
);
385 let tail
= self.tail
.index
.load(Ordering
::SeqCst
);
386 head
>> SHIFT
== tail
>> SHIFT
389 /// Returns the number of elements in the queue.
394 /// use crossbeam_queue::{SegQueue, PopError};
396 /// let q = SegQueue::new();
397 /// assert_eq!(q.len(), 0);
400 /// assert_eq!(q.len(), 1);
403 /// assert_eq!(q.len(), 2);
405 pub fn len(&self) -> usize {
407 // Load the tail index, then load the head index.
408 let mut tail
= self.tail
.index
.load(Ordering
::SeqCst
);
409 let mut head
= self.head
.index
.load(Ordering
::SeqCst
);
411 // If the tail index didn't change, we've got consistent indices to work with.
412 if self.tail
.index
.load(Ordering
::SeqCst
) == tail
{
413 // Erase the lower bits.
414 tail
&= !((1 << SHIFT
) - 1);
415 head
&= !((1 << SHIFT
) - 1);
417 // Rotate indices so that head falls into the first block.
418 let lap
= (head
>> SHIFT
) / LAP
;
419 tail
= tail
.wrapping_sub((lap
* LAP
) << SHIFT
);
420 head
= head
.wrapping_sub((lap
* LAP
) << SHIFT
);
422 // Remove the lower bits.
426 // Fix up indices if they fall onto block ends.
427 if head
== BLOCK_CAP
{
431 if tail
== BLOCK_CAP
{
435 // Return the difference minus the number of blocks between tail and head.
436 return tail
- head
- tail
/ LAP
;
442 impl<T
> Drop
for SegQueue
<T
> {
444 let mut head
= self.head
.index
.load(Ordering
::Relaxed
);
445 let mut tail
= self.tail
.index
.load(Ordering
::Relaxed
);
446 let mut block
= self.head
.block
.load(Ordering
::Relaxed
);
448 // Erase the lower bits.
449 head
&= !((1 << SHIFT
) - 1);
450 tail
&= !((1 << SHIFT
) - 1);
453 // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
455 let offset
= (head
>> SHIFT
) % LAP
;
457 if offset
< BLOCK_CAP
{
458 // Drop the value in the slot.
459 let slot
= (*block
).slots
.get_unchecked(offset
);
460 let p
= &mut *slot
.value
.get();
461 p
.as_mut_ptr().drop_in_place();
463 // Deallocate the block and move to the next one.
464 let next
= (*block
).next
.load(Ordering
::Relaxed
);
465 drop(Box
::from_raw(block
));
469 head
= head
.wrapping_add(1 << SHIFT
);
472 // Deallocate the last remaining block.
473 if !block
.is_null() {
474 drop(Box
::from_raw(block
));
480 impl<T
> fmt
::Debug
for SegQueue
<T
> {
481 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
482 f
.pad("SegQueue { .. }")
486 impl<T
> Default
for SegQueue
<T
> {
487 fn default() -> SegQueue
<T
> {