//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
//!
//! Source:
//!   - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
//!
//! Copyright & License:
//!   - Copyright (c) 2010-2011 Dmitry Vyukov
//!   - Simplified BSD License and Apache License, Version 2.0
//!   - http://www.1024cores.net/home/code-license
use std::cell::UnsafeCell;
use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::atomic::{self, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};

use err::{PopError, PushError};
22 /// A slot in a queue.
24 /// The current stamp.
26 /// If the stamp equals the tail, this node will be next written to. If it equals the head,
27 /// this node will be next read from.
30 /// The value in this slot.
34 /// A bounded multi-producer multi-consumer queue.
36 /// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
37 /// elements. The queue cannot hold more elements that the buffer allows. Attempting to push an
38 /// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit
39 /// faster than [`SegQueue`].
41 /// [`SegQueue`]: struct.SegQueue.html
46 /// use crossbeam_queue::{ArrayQueue, PushError};
48 /// let q = ArrayQueue::new(2);
50 /// assert_eq!(q.push('a'), Ok(()));
51 /// assert_eq!(q.push('b'), Ok(()));
52 /// assert_eq!(q.push('c'), Err(PushError('c')));
53 /// assert_eq!(q.pop(), Ok('a'));
55 pub struct ArrayQueue
<T
> {
56 /// The head of the queue.
58 /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
59 /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
61 /// Elements are popped from the head of the queue.
62 head
: CachePadded
<AtomicUsize
>,
64 /// The tail of the queue.
66 /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
67 /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
69 /// Elements are pushed into the tail of the queue.
70 tail
: CachePadded
<AtomicUsize
>,
72 /// The buffer holding slots.
75 /// The queue capacity.
78 /// A stamp with the value of `{ lap: 1, index: 0 }`.
81 /// Indicates that dropping an `ArrayQueue<T>` may drop elements of type `T`.
82 _marker
: PhantomData
<T
>,
85 unsafe impl<T
: Send
> Sync
for ArrayQueue
<T
> {}
86 unsafe impl<T
: Send
> Send
for ArrayQueue
<T
> {}
88 impl<T
> ArrayQueue
<T
> {
89 /// Creates a new bounded queue with the given capacity.
93 /// Panics if the capacity is zero.
98 /// use crossbeam_queue::ArrayQueue;
100 /// let q = ArrayQueue::<i32>::new(100);
102 pub fn new(cap
: usize) -> ArrayQueue
<T
> {
103 assert
!(cap
> 0, "capacity must be non-zero");
105 // Head is initialized to `{ lap: 0, index: 0 }`.
106 // Tail is initialized to `{ lap: 0, index: 0 }`.
110 // Allocate a buffer of `cap` slots.
112 let mut v
= Vec
::<Slot
<T
>>::with_capacity(cap
);
113 let ptr
= v
.as_mut_ptr();
118 // Initialize stamps in the slots.
121 // Set the stamp to `{ lap: 0, index: i }`.
122 let slot
= buffer
.add(i
);
123 ptr
::write(&mut (*slot
).stamp
, AtomicUsize
::new(i
));
127 // One lap is the smallest power of two greater than `cap`.
128 let one_lap
= (cap
+ 1).next_power_of_two();
134 head
: CachePadded
::new(AtomicUsize
::new(head
)),
135 tail
: CachePadded
::new(AtomicUsize
::new(tail
)),
136 _marker
: PhantomData
,
140 /// Attempts to push an element into the queue.
142 /// If the queue is full, the element is returned back as an error.
147 /// use crossbeam_queue::{ArrayQueue, PushError};
149 /// let q = ArrayQueue::new(1);
151 /// assert_eq!(q.push(10), Ok(()));
152 /// assert_eq!(q.push(20), Err(PushError(20)));
154 pub fn push(&self, value
: T
) -> Result
<(), PushError
<T
>> {
155 let backoff
= Backoff
::new();
156 let mut tail
= self.tail
.load(Ordering
::Relaxed
);
159 // Deconstruct the tail.
160 let index
= tail
& (self.one_lap
- 1);
161 let lap
= tail
& !(self.one_lap
- 1);
163 // Inspect the corresponding slot.
164 let slot
= unsafe { &*self.buffer.add(index) }
;
165 let stamp
= slot
.stamp
.load(Ordering
::Acquire
);
167 // If the tail and the stamp match, we may attempt to push.
169 let new_tail
= if index
+ 1 < self.cap
{
170 // Same lap, incremented index.
171 // Set to `{ lap: lap, index: index + 1 }`.
174 // One lap forward, index wraps around to zero.
175 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
176 lap
.wrapping_add(self.one_lap
)
179 // Try moving the tail.
182 .compare_exchange_weak(tail
, new_tail
, Ordering
::SeqCst
, Ordering
::Relaxed
)
185 // Write the value into the slot and update the stamp.
186 unsafe { slot.value.get().write(value); }
187 slot
.stamp
.store(tail
+ 1, Ordering
::Release
);
195 } else if stamp
.wrapping_add(self.one_lap
) == tail
+ 1 {
196 atomic
::fence(Ordering
::SeqCst
);
197 let head
= self.head
.load(Ordering
::Relaxed
);
199 // If the head lags one lap behind the tail as well...
200 if head
.wrapping_add(self.one_lap
) == tail
{
201 // ...then the queue is full.
202 return Err(PushError(value
));
206 tail
= self.tail
.load(Ordering
::Relaxed
);
208 // Snooze because we need to wait for the stamp to get updated.
210 tail
= self.tail
.load(Ordering
::Relaxed
);
215 /// Attempts to pop an element from the queue.
217 /// If the queue is empty, an error is returned.
222 /// use crossbeam_queue::{ArrayQueue, PopError};
224 /// let q = ArrayQueue::new(1);
225 /// assert_eq!(q.push(10), Ok(()));
227 /// assert_eq!(q.pop(), Ok(10));
228 /// assert_eq!(q.pop(), Err(PopError));
230 pub fn pop(&self) -> Result
<T
, PopError
> {
231 let backoff
= Backoff
::new();
232 let mut head
= self.head
.load(Ordering
::Relaxed
);
235 // Deconstruct the head.
236 let index
= head
& (self.one_lap
- 1);
237 let lap
= head
& !(self.one_lap
- 1);
239 // Inspect the corresponding slot.
240 let slot
= unsafe { &*self.buffer.add(index) }
;
241 let stamp
= slot
.stamp
.load(Ordering
::Acquire
);
243 // If the the stamp is ahead of the head by 1, we may attempt to pop.
244 if head
+ 1 == stamp
{
245 let new
= if index
+ 1 < self.cap
{
246 // Same lap, incremented index.
247 // Set to `{ lap: lap, index: index + 1 }`.
250 // One lap forward, index wraps around to zero.
251 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
252 lap
.wrapping_add(self.one_lap
)
255 // Try moving the head.
258 .compare_exchange_weak(head
, new
, Ordering
::SeqCst
, Ordering
::Relaxed
)
261 // Read the value from the slot and update the stamp.
262 let msg
= unsafe { slot.value.get().read() }
;
263 slot
.stamp
.store(head
.wrapping_add(self.one_lap
), Ordering
::Release
);
271 } else if stamp
== head
{
272 atomic
::fence(Ordering
::SeqCst
);
273 let tail
= self.tail
.load(Ordering
::Relaxed
);
275 // If the tail equals the head, that means the channel is empty.
277 return Err(PopError
);
281 head
= self.head
.load(Ordering
::Relaxed
);
283 // Snooze because we need to wait for the stamp to get updated.
285 head
= self.head
.load(Ordering
::Relaxed
);
290 /// Returns the capacity of the queue.
295 /// use crossbeam_queue::{ArrayQueue, PopError};
297 /// let q = ArrayQueue::<i32>::new(100);
299 /// assert_eq!(q.capacity(), 100);
301 pub fn capacity(&self) -> usize {
305 /// Returns `true` if the queue is empty.
310 /// use crossbeam_queue::{ArrayQueue, PopError};
312 /// let q = ArrayQueue::new(100);
314 /// assert!(q.is_empty());
315 /// q.push(1).unwrap();
316 /// assert!(!q.is_empty());
318 pub fn is_empty(&self) -> bool
{
319 let head
= self.head
.load(Ordering
::SeqCst
);
320 let tail
= self.tail
.load(Ordering
::SeqCst
);
322 // Is the tail lagging one lap behind head?
323 // Is the tail equal to the head?
325 // Note: If the head changes just before we load the tail, that means there was a moment
326 // when the channel was not empty, so it is safe to just return `false`.
330 /// Returns `true` if the queue is full.
335 /// use crossbeam_queue::{ArrayQueue, PopError};
337 /// let q = ArrayQueue::new(1);
339 /// assert!(!q.is_full());
340 /// q.push(1).unwrap();
341 /// assert!(q.is_full());
343 pub fn is_full(&self) -> bool
{
344 let tail
= self.tail
.load(Ordering
::SeqCst
);
345 let head
= self.head
.load(Ordering
::SeqCst
);
347 // Is the head lagging one lap behind tail?
349 // Note: If the tail changes just before we load the head, that means there was a moment
350 // when the queue was not full, so it is safe to just return `false`.
351 head
.wrapping_add(self.one_lap
) == tail
354 /// Returns the number of elements in the queue.
359 /// use crossbeam_queue::{ArrayQueue, PopError};
361 /// let q = ArrayQueue::new(100);
362 /// assert_eq!(q.len(), 0);
364 /// q.push(10).unwrap();
365 /// assert_eq!(q.len(), 1);
367 /// q.push(20).unwrap();
368 /// assert_eq!(q.len(), 2);
370 pub fn len(&self) -> usize {
372 // Load the tail, then load the head.
373 let tail
= self.tail
.load(Ordering
::SeqCst
);
374 let head
= self.head
.load(Ordering
::SeqCst
);
376 // If the tail didn't change, we've got consistent values to work with.
377 if self.tail
.load(Ordering
::SeqCst
) == tail
{
378 let hix
= head
& (self.one_lap
- 1);
379 let tix
= tail
& (self.one_lap
- 1);
381 return if hix
< tix
{
383 } else if hix
> tix
{
385 } else if tail
== head
{
395 impl<T
> Drop
for ArrayQueue
<T
> {
397 // Get the index of the head.
398 let hix
= self.head
.load(Ordering
::Relaxed
) & (self.one_lap
- 1);
400 // Loop over all slots that hold a message and drop them.
401 for i
in 0..self.len() {
402 // Compute the index of the next slot holding a message.
403 let index
= if hix
+ i
< self.cap
{
410 self.buffer
.add(index
).drop_in_place();
414 // Finally, deallocate the buffer, but don't run any destructors.
416 Vec
::from_raw_parts(self.buffer
, 0, self.cap
);
421 impl<T
> fmt
::Debug
for ArrayQueue
<T
> {
422 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
423 f
.pad("ArrayQueue { .. }")