]> git.proxmox.com Git - rustc.git/blob - vendor/crossbeam-queue/src/array_queue.rs
New upstream version 1.46.0~beta.2+dfsg1
[rustc.git] / vendor / crossbeam-queue / src / array_queue.rs
1 //! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
2 //!
3 //! Source:
4 //! - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
5 //!
6 //! Copyright & License:
7 //! - Copyright (c) 2010-2011 Dmitry Vyukov
8 //! - Simplified BSD License and Apache License, Version 2.0
9 //! - http://www.1024cores.net/home/code-license
10
11 use alloc::vec::Vec;
12 use core::cell::UnsafeCell;
13 use core::fmt;
14 use core::marker::PhantomData;
15 use core::mem;
16 use core::ptr;
17 use core::sync::atomic::{self, AtomicUsize, Ordering};
18
19 use crossbeam_utils::{Backoff, CachePadded};
20
21 use maybe_uninit::MaybeUninit;
22
23 use err::{PopError, PushError};
24
/// One cell of the queue's ring buffer.
struct Slot<T> {
    /// Sequence stamp that tells producers and consumers whose turn it is.
    ///
    /// A producer may write into the slot when the stamp equals the queue's tail; after writing
    /// it stores `tail + 1`. A consumer may read from the slot when the stamp equals `head + 1`;
    /// after reading it stores the head advanced by one lap.
    stamp: AtomicUsize,

    /// Storage for the element; initialized only between a completed push and the matching pop.
    value: UnsafeCell<MaybeUninit<T>>,
}
36
37 /// A bounded multi-producer multi-consumer queue.
38 ///
39 /// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
40 /// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an
41 /// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit
42 /// faster than [`SegQueue`].
43 ///
44 /// [`SegQueue`]: struct.SegQueue.html
45 ///
46 /// # Examples
47 ///
48 /// ```
49 /// use crossbeam_queue::{ArrayQueue, PushError};
50 ///
51 /// let q = ArrayQueue::new(2);
52 ///
53 /// assert_eq!(q.push('a'), Ok(()));
54 /// assert_eq!(q.push('b'), Ok(()));
55 /// assert_eq!(q.push('c'), Err(PushError('c')));
56 /// assert_eq!(q.pop(), Ok('a'));
57 /// ```
58 pub struct ArrayQueue<T> {
59 /// The head of the queue.
60 ///
61 /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
62 /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
63 ///
64 /// Elements are popped from the head of the queue.
65 head: CachePadded<AtomicUsize>,
66
67 /// The tail of the queue.
68 ///
69 /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
70 /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
71 ///
72 /// Elements are pushed into the tail of the queue.
73 tail: CachePadded<AtomicUsize>,
74
75 /// The buffer holding slots.
76 buffer: *mut Slot<T>,
77
78 /// The queue capacity.
79 cap: usize,
80
81 /// A stamp with the value of `{ lap: 1, index: 0 }`.
82 one_lap: usize,
83
84 /// Indicates that dropping an `ArrayQueue<T>` may drop elements of type `T`.
85 _marker: PhantomData<T>,
86 }
87
88 unsafe impl<T: Send> Sync for ArrayQueue<T> {}
89 unsafe impl<T: Send> Send for ArrayQueue<T> {}
90
91 impl<T> ArrayQueue<T> {
92 /// Creates a new bounded queue with the given capacity.
93 ///
94 /// # Panics
95 ///
96 /// Panics if the capacity is zero.
97 ///
98 /// # Examples
99 ///
100 /// ```
101 /// use crossbeam_queue::ArrayQueue;
102 ///
103 /// let q = ArrayQueue::<i32>::new(100);
104 /// ```
105 pub fn new(cap: usize) -> ArrayQueue<T> {
106 assert!(cap > 0, "capacity must be non-zero");
107
108 // Head is initialized to `{ lap: 0, index: 0 }`.
109 // Tail is initialized to `{ lap: 0, index: 0 }`.
110 let head = 0;
111 let tail = 0;
112
113 // Allocate a buffer of `cap` slots.
114 let buffer = {
115 let mut v = Vec::<Slot<T>>::with_capacity(cap);
116 let ptr = v.as_mut_ptr();
117 mem::forget(v);
118 ptr
119 };
120
121 // Initialize stamps in the slots.
122 for i in 0..cap {
123 unsafe {
124 // Set the stamp to `{ lap: 0, index: i }`.
125 let slot = buffer.add(i);
126 ptr::write(&mut (*slot).stamp, AtomicUsize::new(i));
127 }
128 }
129
130 // One lap is the smallest power of two greater than `cap`.
131 let one_lap = (cap + 1).next_power_of_two();
132
133 ArrayQueue {
134 buffer,
135 cap,
136 one_lap,
137 head: CachePadded::new(AtomicUsize::new(head)),
138 tail: CachePadded::new(AtomicUsize::new(tail)),
139 _marker: PhantomData,
140 }
141 }
142
143 /// Attempts to push an element into the queue.
144 ///
145 /// If the queue is full, the element is returned back as an error.
146 ///
147 /// # Examples
148 ///
149 /// ```
150 /// use crossbeam_queue::{ArrayQueue, PushError};
151 ///
152 /// let q = ArrayQueue::new(1);
153 ///
154 /// assert_eq!(q.push(10), Ok(()));
155 /// assert_eq!(q.push(20), Err(PushError(20)));
156 /// ```
157 pub fn push(&self, value: T) -> Result<(), PushError<T>> {
158 let backoff = Backoff::new();
159 let mut tail = self.tail.load(Ordering::Relaxed);
160
161 loop {
162 // Deconstruct the tail.
163 let index = tail & (self.one_lap - 1);
164 let lap = tail & !(self.one_lap - 1);
165
166 // Inspect the corresponding slot.
167 let slot = unsafe { &*self.buffer.add(index) };
168 let stamp = slot.stamp.load(Ordering::Acquire);
169
170 // If the tail and the stamp match, we may attempt to push.
171 if tail == stamp {
172 let new_tail = if index + 1 < self.cap {
173 // Same lap, incremented index.
174 // Set to `{ lap: lap, index: index + 1 }`.
175 tail + 1
176 } else {
177 // One lap forward, index wraps around to zero.
178 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
179 lap.wrapping_add(self.one_lap)
180 };
181
182 // Try moving the tail.
183 match self.tail.compare_exchange_weak(
184 tail,
185 new_tail,
186 Ordering::SeqCst,
187 Ordering::Relaxed,
188 ) {
189 Ok(_) => {
190 // Write the value into the slot and update the stamp.
191 unsafe {
192 slot.value.get().write(MaybeUninit::new(value));
193 }
194 slot.stamp.store(tail + 1, Ordering::Release);
195 return Ok(());
196 }
197 Err(t) => {
198 tail = t;
199 backoff.spin();
200 }
201 }
202 } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
203 atomic::fence(Ordering::SeqCst);
204 let head = self.head.load(Ordering::Relaxed);
205
206 // If the head lags one lap behind the tail as well...
207 if head.wrapping_add(self.one_lap) == tail {
208 // ...then the queue is full.
209 return Err(PushError(value));
210 }
211
212 backoff.spin();
213 tail = self.tail.load(Ordering::Relaxed);
214 } else {
215 // Snooze because we need to wait for the stamp to get updated.
216 backoff.snooze();
217 tail = self.tail.load(Ordering::Relaxed);
218 }
219 }
220 }
221
222 /// Attempts to pop an element from the queue.
223 ///
224 /// If the queue is empty, an error is returned.
225 ///
226 /// # Examples
227 ///
228 /// ```
229 /// use crossbeam_queue::{ArrayQueue, PopError};
230 ///
231 /// let q = ArrayQueue::new(1);
232 /// assert_eq!(q.push(10), Ok(()));
233 ///
234 /// assert_eq!(q.pop(), Ok(10));
235 /// assert_eq!(q.pop(), Err(PopError));
236 /// ```
237 pub fn pop(&self) -> Result<T, PopError> {
238 let backoff = Backoff::new();
239 let mut head = self.head.load(Ordering::Relaxed);
240
241 loop {
242 // Deconstruct the head.
243 let index = head & (self.one_lap - 1);
244 let lap = head & !(self.one_lap - 1);
245
246 // Inspect the corresponding slot.
247 let slot = unsafe { &*self.buffer.add(index) };
248 let stamp = slot.stamp.load(Ordering::Acquire);
249
250 // If the the stamp is ahead of the head by 1, we may attempt to pop.
251 if head + 1 == stamp {
252 let new = if index + 1 < self.cap {
253 // Same lap, incremented index.
254 // Set to `{ lap: lap, index: index + 1 }`.
255 head + 1
256 } else {
257 // One lap forward, index wraps around to zero.
258 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
259 lap.wrapping_add(self.one_lap)
260 };
261
262 // Try moving the head.
263 match self.head.compare_exchange_weak(
264 head,
265 new,
266 Ordering::SeqCst,
267 Ordering::Relaxed,
268 ) {
269 Ok(_) => {
270 // Read the value from the slot and update the stamp.
271 let msg = unsafe { slot.value.get().read().assume_init() };
272 slot.stamp
273 .store(head.wrapping_add(self.one_lap), Ordering::Release);
274 return Ok(msg);
275 }
276 Err(h) => {
277 head = h;
278 backoff.spin();
279 }
280 }
281 } else if stamp == head {
282 atomic::fence(Ordering::SeqCst);
283 let tail = self.tail.load(Ordering::Relaxed);
284
285 // If the tail equals the head, that means the channel is empty.
286 if tail == head {
287 return Err(PopError);
288 }
289
290 backoff.spin();
291 head = self.head.load(Ordering::Relaxed);
292 } else {
293 // Snooze because we need to wait for the stamp to get updated.
294 backoff.snooze();
295 head = self.head.load(Ordering::Relaxed);
296 }
297 }
298 }
299
300 /// Returns the capacity of the queue.
301 ///
302 /// # Examples
303 ///
304 /// ```
305 /// use crossbeam_queue::{ArrayQueue, PopError};
306 ///
307 /// let q = ArrayQueue::<i32>::new(100);
308 ///
309 /// assert_eq!(q.capacity(), 100);
310 /// ```
311 pub fn capacity(&self) -> usize {
312 self.cap
313 }
314
315 /// Returns `true` if the queue is empty.
316 ///
317 /// # Examples
318 ///
319 /// ```
320 /// use crossbeam_queue::{ArrayQueue, PopError};
321 ///
322 /// let q = ArrayQueue::new(100);
323 ///
324 /// assert!(q.is_empty());
325 /// q.push(1).unwrap();
326 /// assert!(!q.is_empty());
327 /// ```
328 pub fn is_empty(&self) -> bool {
329 let head = self.head.load(Ordering::SeqCst);
330 let tail = self.tail.load(Ordering::SeqCst);
331
332 // Is the tail lagging one lap behind head?
333 // Is the tail equal to the head?
334 //
335 // Note: If the head changes just before we load the tail, that means there was a moment
336 // when the channel was not empty, so it is safe to just return `false`.
337 tail == head
338 }
339
340 /// Returns `true` if the queue is full.
341 ///
342 /// # Examples
343 ///
344 /// ```
345 /// use crossbeam_queue::{ArrayQueue, PopError};
346 ///
347 /// let q = ArrayQueue::new(1);
348 ///
349 /// assert!(!q.is_full());
350 /// q.push(1).unwrap();
351 /// assert!(q.is_full());
352 /// ```
353 pub fn is_full(&self) -> bool {
354 let tail = self.tail.load(Ordering::SeqCst);
355 let head = self.head.load(Ordering::SeqCst);
356
357 // Is the head lagging one lap behind tail?
358 //
359 // Note: If the tail changes just before we load the head, that means there was a moment
360 // when the queue was not full, so it is safe to just return `false`.
361 head.wrapping_add(self.one_lap) == tail
362 }
363
364 /// Returns the number of elements in the queue.
365 ///
366 /// # Examples
367 ///
368 /// ```
369 /// use crossbeam_queue::{ArrayQueue, PopError};
370 ///
371 /// let q = ArrayQueue::new(100);
372 /// assert_eq!(q.len(), 0);
373 ///
374 /// q.push(10).unwrap();
375 /// assert_eq!(q.len(), 1);
376 ///
377 /// q.push(20).unwrap();
378 /// assert_eq!(q.len(), 2);
379 /// ```
380 pub fn len(&self) -> usize {
381 loop {
382 // Load the tail, then load the head.
383 let tail = self.tail.load(Ordering::SeqCst);
384 let head = self.head.load(Ordering::SeqCst);
385
386 // If the tail didn't change, we've got consistent values to work with.
387 if self.tail.load(Ordering::SeqCst) == tail {
388 let hix = head & (self.one_lap - 1);
389 let tix = tail & (self.one_lap - 1);
390
391 return if hix < tix {
392 tix - hix
393 } else if hix > tix {
394 self.cap - hix + tix
395 } else if tail == head {
396 0
397 } else {
398 self.cap
399 };
400 }
401 }
402 }
403 }
404
405 impl<T> Drop for ArrayQueue<T> {
406 fn drop(&mut self) {
407 // Get the index of the head.
408 let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1);
409
410 // Loop over all slots that hold a message and drop them.
411 for i in 0..self.len() {
412 // Compute the index of the next slot holding a message.
413 let index = if hix + i < self.cap {
414 hix + i
415 } else {
416 hix + i - self.cap
417 };
418
419 unsafe {
420 let p = {
421 let slot = &mut *self.buffer.add(index);
422 let value = &mut *slot.value.get();
423 value.as_mut_ptr()
424 };
425 p.drop_in_place();
426 }
427 }
428
429 // Finally, deallocate the buffer, but don't run any destructors.
430 unsafe {
431 Vec::from_raw_parts(self.buffer, 0, self.cap);
432 }
433 }
434 }
435
436 impl<T> fmt::Debug for ArrayQueue<T> {
437 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
438 f.pad("ArrayQueue { .. }")
439 }
440 }