//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
//!
//! Source:
//!   - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
//!
//! Copyright & License:
//!   - Copyright (c) 2010-2011 Dmitry Vyukov
//!   - Simplified BSD License and Apache License, Version 2.0
//!   - http://www.1024cores.net/home/code-license
use alloc::vec::Vec;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem;
use core::ptr;
use core::sync::atomic::{self, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};
use maybe_uninit::MaybeUninit;

use err::{PopError, PushError};
/// A slot in a queue.
struct Slot<T> {
    /// The current stamp.
    ///
    /// If the stamp equals the tail, this node will be next written to. If it equals head + 1,
    /// this node will be next read from.
    stamp: AtomicUsize,

    /// The value in this slot.
    value: UnsafeCell<MaybeUninit<T>>,
}
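// A slot's stamp advances as the queue wraps around: slot `i` starts with
// stamp `i` (writable on lap 0), moves to `i + 1` once written (readable),
// then to `i + one_lap` once read (writable again on lap 1), and so on.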
/// A bounded multi-producer multi-consumer queue.
///
/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an
/// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit
/// faster than [`SegQueue`].
///
/// [`SegQueue`]: struct.SegQueue.html
///
/// # Examples
///
/// ```
/// use crossbeam_queue::{ArrayQueue, PushError};
///
/// let q = ArrayQueue::new(2);
///
/// assert_eq!(q.push('a'), Ok(()));
/// assert_eq!(q.push('b'), Ok(()));
/// assert_eq!(q.push('c'), Err(PushError('c')));
/// assert_eq!(q.pop(), Ok('a'));
/// ```
pub struct ArrayQueue<T> {
    /// The head of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are popped from the head of the queue.
    head: CachePadded<AtomicUsize>,

    /// The tail of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are pushed into the tail of the queue.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    buffer: *mut Slot<T>,

    /// The queue capacity.
    cap: usize,

    /// A stamp with the value of `{ lap: 1, index: 0 }`.
    one_lap: usize,
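    // For example, `cap = 100` gives `one_lap = (100 + 1).next_power_of_two() = 128`;
    // a stamp of `5 * 128 + 42 = 682` encodes `{ lap: 5, index: 42 }`, recovered with
    // `682 & (128 - 1) == 42` (index) and `682 & !(128 - 1) == 640` (lap bits).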
    /// Indicates that dropping an `ArrayQueue<T>` may drop elements of type `T`.
    _marker: PhantomData<T>,
}
unsafe impl<T: Send> Sync for ArrayQueue<T> {}
unsafe impl<T: Send> Send for ArrayQueue<T> {}
impl<T> ArrayQueue<T> {
    /// Creates a new bounded queue with the given capacity.
    ///
    /// # Panics
    ///
    /// Panics if the capacity is zero.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::<i32>::new(100);
    /// ```
    pub fn new(cap: usize) -> ArrayQueue<T> {
        assert!(cap > 0, "capacity must be non-zero");

        // Head is initialized to `{ lap: 0, index: 0 }`.
        // Tail is initialized to `{ lap: 0, index: 0 }`.
        let head = 0;
        let tail = 0;

        // Allocate a buffer of `cap` slots.
        let buffer = {
            let mut v = Vec::<Slot<T>>::with_capacity(cap);
            let ptr = v.as_mut_ptr();
            mem::forget(v);
            ptr
        };

        // Initialize stamps in the slots.
        for i in 0..cap {
            unsafe {
                // Set the stamp to `{ lap: 0, index: i }`.
                let slot = buffer.add(i);
                ptr::write(&mut (*slot).stamp, AtomicUsize::new(i));
            }
        }

        // One lap is the smallest power of two greater than `cap`.
        let one_lap = (cap + 1).next_power_of_two();

        ArrayQueue {
            buffer,
            cap,
            one_lap,
            head: CachePadded::new(AtomicUsize::new(head)),
            tail: CachePadded::new(AtomicUsize::new(tail)),
            _marker: PhantomData,
        }
    }
    /// Attempts to push an element into the queue.
    ///
    /// If the queue is full, the element is returned back as an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PushError};
    ///
    /// let q = ArrayQueue::new(1);
    ///
    /// assert_eq!(q.push(10), Ok(()));
    /// assert_eq!(q.push(20), Err(PushError(20)));
    /// ```
    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
        let backoff = Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            // Deconstruct the tail.
            let index = tail & (self.one_lap - 1);
            let lap = tail & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            let slot = unsafe { &*self.buffer.add(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the tail and the stamp match, we may attempt to push.
            if tail == stamp {
                let new_tail = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, index: index + 1 }`.
                    tail + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the tail.
                match self.tail.compare_exchange_weak(
                    tail,
                    new_tail,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Write the value into the slot and update the stamp.
                        unsafe {
                            slot.value.get().write(MaybeUninit::new(value));
                        }
                        slot.stamp.store(tail + 1, Ordering::Release);
                        return Ok(());
                    }
                    Err(t) => {
                        tail = t;
                        backoff.spin();
                    }
                }
            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
                atomic::fence(Ordering::SeqCst);
                let head = self.head.load(Ordering::Relaxed);
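                // Concretely: with `cap = 2` (`one_lap = 4`), after two pushes
                // `tail = 4` while slot 0 still carries stamp 1 from the previous
                // lap, so `stamp + one_lap == 5 == tail + 1`. If additionally
                // `head + one_lap == tail` (here `0 + 4 == 4`), no pop has freed
                // the slot and the queue is full.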
                // If the head lags one lap behind the tail as well...
                if head.wrapping_add(self.one_lap) == tail {
                    // ...then the queue is full.
                    return Err(PushError(value));
                }

                backoff.spin();
                tail = self.tail.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                tail = self.tail.load(Ordering::Relaxed);
            }
        }
    }
    /// Attempts to pop an element from the queue.
    ///
    /// If the queue is empty, an error is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PopError};
    ///
    /// let q = ArrayQueue::new(1);
    /// assert_eq!(q.push(10), Ok(()));
    ///
    /// assert_eq!(q.pop(), Ok(10));
    /// assert_eq!(q.pop(), Err(PopError));
    /// ```
    pub fn pop(&self) -> Result<T, PopError> {
        let backoff = Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            // Deconstruct the head.
            let index = head & (self.one_lap - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            let slot = unsafe { &*self.buffer.add(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the stamp is ahead of the head by 1, we may attempt to pop.
            if head + 1 == stamp {
                let new = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, index: index + 1 }`.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the head.
                match self.head.compare_exchange_weak(
                    head,
                    new,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Read the value from the slot and update the stamp.
                        let msg = unsafe { slot.value.get().read().assume_init() };
                        slot.stamp
                            .store(head.wrapping_add(self.one_lap), Ordering::Release);
                        return Ok(msg);
                    }
                    Err(h) => {
                        head = h;
                        backoff.spin();
                    }
                }
            } else if stamp == head {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.load(Ordering::Relaxed);
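                // Concretely: on a fresh queue `head = 0` and slot 0 still has
                // its initial stamp 0, so `stamp == head`; if `tail == head` as
                // well, nothing has been pushed and the queue is empty.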
                // If the tail equals the head, that means the channel is empty.
                if tail == head {
                    return Err(PopError);
                }

                backoff.spin();
                head = self.head.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                head = self.head.load(Ordering::Relaxed);
            }
        }
    }
    /// Returns the capacity of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PopError};
    ///
    /// let q = ArrayQueue::<i32>::new(100);
    ///
    /// assert_eq!(q.capacity(), 100);
    /// ```
    pub fn capacity(&self) -> usize {
        self.cap
    }
    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PopError};
    ///
    /// let q = ArrayQueue::new(100);
    ///
    /// assert!(q.is_empty());
    /// q.push(1).unwrap();
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);

        // Is the tail equal to the head?
        //
        // Note: If the head changes just before we load the tail, that means there was a moment
        // when the channel was not empty, so it is safe to just return `false`.
        tail == head
    }
    /// Returns `true` if the queue is full.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PopError};
    ///
    /// let q = ArrayQueue::new(1);
    ///
    /// assert!(!q.is_full());
    /// q.push(1).unwrap();
    /// assert!(q.is_full());
    /// ```
    pub fn is_full(&self) -> bool {
        let tail = self.tail.load(Ordering::SeqCst);
        let head = self.head.load(Ordering::SeqCst);

        // Is the head lagging one lap behind tail?
        //
        // Note: If the tail changes just before we load the head, that means there was a moment
        // when the queue was not full, so it is safe to just return `false`.
        head.wrapping_add(self.one_lap) == tail
    }
    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PopError};
    ///
    /// let q = ArrayQueue::new(100);
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10).unwrap();
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20).unwrap();
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail, then load the head.
            let tail = self.tail.load(Ordering::SeqCst);
            let head = self.head.load(Ordering::SeqCst);

            // If the tail didn't change, we've got consistent values to work with.
            if self.tail.load(Ordering::SeqCst) == tail {
                let hix = head & (self.one_lap - 1);
                let tix = tail & (self.one_lap - 1);
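                // Concretely: with `cap = 3` (`one_lap = 4`), a full queue has
                // `tail = 4` and `head = 0`, so `hix == tix == 0` with
                // `tail != head`, giving `len == cap == 3`; after one pop,
                // `head = 1`, so `hix > tix` and `len == 3 - 1 + 0 == 2`.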
                return if hix < tix {
                    tix - hix
                } else if hix > tix {
                    self.cap - hix + tix
                } else if tail == head {
                    0
                } else {
                    self.cap
                };
            }
        }
    }
}
impl<T> Drop for ArrayQueue<T> {
    fn drop(&mut self) {
        // Get the index of the head.
        let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1);

        // Loop over all slots that hold a message and drop them.
        for i in 0..self.len() {
            // Compute the index of the next slot holding a message.
            let index = if hix + i < self.cap {
                hix + i
            } else {
                hix + i - self.cap
            };

            unsafe {
                // Drop the value in the slot.
                let slot = &mut *self.buffer.add(index);
                let value = &mut *slot.value.get();
                value.as_mut_ptr().drop_in_place();
            }
        }

        // Finally, deallocate the buffer, but don't run any destructors.
        unsafe {
            Vec::from_raw_parts(self.buffer, 0, self.cap);
        }
    }
}
impl<T> fmt::Debug for ArrayQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.pad("ArrayQueue { .. }")
    }
}