use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};
// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;
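
// Illustrative sketch (added commentary, not from the upstream source): how an index
// value decomposes under the constants above. The sequence number lives in the bits
// above `SHIFT`; the single low bit is metadata (`HAS_NEXT` on the head index).
//
//     let index = (37 << SHIFT) | HAS_NEXT; // sequence number 37, next block known to exist
//     let seq = index >> SHIFT;             // 37
//     let offset = seq % LAP;               // 37 % 32 = 5 -> the sixth slot of its block
//     // An offset equal to BLOCK_CAP (31) never holds a value; it marks the end of a block.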
/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}
/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        // SAFETY: This is safe because:
        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //  [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
        //       holds a MaybeUninit.
        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        unsafe { MaybeUninit::zeroed().assume_init() }
    }
    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }
    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}
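
// Note (added commentary): `destroy` and `pop` cooperate through the `READ` and `DESTROY`
// bits. If `destroy` finds a slot whose reader has not finished yet, it sets `DESTROY` and
// returns; that reader later observes `DESTROY` when it sets `READ` and finishes the job by
// calling `Block::destroy(block, offset + 1)` itself (see `pop` below).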
/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}
/// An unbounded multi-producer multi-consumer queue.
///
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
/// this queue is somewhat slower than [`ArrayQueue`].
///
/// [`ArrayQueue`]: super::ArrayQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
///
/// q.push('a');
/// q.push('b');
///
/// assert_eq!(q.pop(), Some('a'));
/// assert_eq!(q.pop(), Some('b'));
/// assert!(q.pop().is_none());
/// ```
pub struct SegQueue<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}
unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}
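
// Illustrative sketch (added commentary, not from the upstream source): because the queue
// is `Send` and `Sync` for `T: Send`, producers and consumers on different threads can
// share one instance behind an `Arc`. Assumes a `std` environment.
//
//     use std::sync::Arc;
//     use std::thread;
//
//     let q = Arc::new(SegQueue::new());
//     let producer = {
//         let q = Arc::clone(&q);
//         thread::spawn(move || {
//             for i in 0..1000 {
//                 q.push(i);
//             }
//         })
//     };
//     producer.join().unwrap();
//     assert_eq!(q.len(), 1000);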
impl<T> SegQueue<T> {
    /// Creates a new unbounded queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::<i32>::new();
    /// ```
    pub const fn new() -> SegQueue<T> {
        SegQueue {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }
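
    // Illustrative sketch (added commentary, not from the upstream source): since `new` is a
    // `const fn`, a queue can also be created in a `static`, e.g.:
    //
    //     static QUEUE: SegQueue<i32> = SegQueue::new();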
    /// Pushes an element into the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// ```
    pub fn push(&self, value: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }
            // If this is the first push operation, we need to allocate the first block.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }
            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
    /// Pops an element from the queue.
    ///
    /// If the queue is empty, `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// assert_eq!(q.pop(), Some(10));
    /// assert!(q.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & HAS_NEXT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    return None;
                }

                // If head and tail are not in the same block, set `HAS_NEXT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= HAS_NEXT;
                }
            }

            // The block can be null here only if the first push operation is in progress. In that
            // case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }
            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= HAS_NEXT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Some(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// assert!(q.is_empty());
    /// q.push(1);
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }
    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10);
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }
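
    // Worked example (added commentary, not from the upstream source): suppose 32 values
    // were pushed and none popped. Pushing the 31st value fills the first block, so the tail
    // index skips the end-of-block position and the tail sequence number becomes 33 while the
    // head stays at 0. After erasing the metadata bit and shifting, `tail = 33`, `head = 0`,
    // and the result is `33 - 0 - 33 / LAP = 33 - 1 = 32`: the subtraction removes the one
    // skipped end-of-block index between head and tail.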
}

impl<T> Drop for SegQueue<T> {
    fn drop(&mut self) {
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.value.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}
impl<T> fmt::Debug for SegQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("SegQueue { .. }")
    }
}
impl<T> Default for SegQueue<T> {
    fn default() -> SegQueue<T> {
        SegQueue::new()
    }
}
impl<T> IntoIterator for SegQueue<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter { value: self }
    }
}
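
// Illustrative sketch (added commentary, not from the upstream source): consuming the
// queue by value drains it in FIFO order, e.g.:
//
//     let q = SegQueue::new();
//     q.push(1);
//     q.push(2);
//     for x in q {
//         // yields 1, then 2
//     }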
pub struct IntoIter<T> {
    value: SegQueue<T>,
}

impl<T> Iterator for IntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        let value = &mut self.value;
        let head = *value.head.index.get_mut();
        let tail = *value.tail.index.get_mut();
        if head >> SHIFT == tail >> SHIFT {
            None
        } else {
            let block = *value.head.block.get_mut();
            let offset = (head >> SHIFT) % LAP;

            // SAFETY: We have mutable access to this, so we can read without
            // worrying about concurrency. Furthermore, we know this is
            // initialized because it is the value pointed at by `value.head`
            // and this is a non-empty queue.
            let item = unsafe {
                let slot = (*block).slots.get_unchecked(offset);
                let p = &mut *slot.value.get();
                p.as_mut_ptr().read()
            };

            if offset + 1 == BLOCK_CAP {
                // Deallocate the block and move to the next one.
                // SAFETY: The block is initialized because we've been reading
                // from it this entire time. We can drop it b/c everything has
                // been read out of it, so nothing is pointing to it anymore.
                unsafe {
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    *value.head.block.get_mut() = next;
                }

                // The last value in a block is empty, so skip it.
                *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT);
                // Double-check that we're pointing to the first item in a block.
                debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0);
            } else {
                *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT);
            }

            Some(item)
        }
    }
}
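
// Illustrative smoke test (a sketch added here, not part of the upstream crate): exercises
// `push`, `len`, `pop`, and the consuming iterator on a single thread. The module name is
// arbitrary.
#[cfg(test)]
mod seg_queue_sketch_tests {
    use super::SegQueue;

    #[test]
    fn fifo_across_blocks() {
        let q = SegQueue::new();

        // Push enough values to span several blocks (BLOCK_CAP is 31).
        for i in 0..100 {
            q.push(i);
        }
        assert_eq!(q.len(), 100);

        // Values come back out in FIFO order.
        for i in 0..50 {
            assert_eq!(q.pop(), Some(i));
        }

        // The rest can be drained by consuming the queue.
        let mut iter = q.into_iter();
        for i in 50..100 {
            assert_eq!(iter.next(), Some(i));
        }
        assert_eq!(iter.next(), None);
    }
}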
);