]> git.proxmox.com Git - rustc.git/blob - vendor/crossbeam-queue/src/array_queue.rs
New upstream version 1.42.0+dfsg0+pve1
[rustc.git] / vendor / crossbeam-queue / src / array_queue.rs
1 //! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
2 //!
3 //! Source:
4 //! - http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
5 //!
6 //! Copyright & License:
7 //! - Copyright (c) 2010-2011 Dmitry Vyukov
8 //! - Simplified BSD License and Apache License, Version 2.0
9 //! - http://www.1024cores.net/home/code-license
10
11 use std::cell::UnsafeCell;
12 use std::fmt;
13 use std::marker::PhantomData;
14 use std::mem;
15 use std::ptr;
16 use std::sync::atomic::{self, AtomicUsize, Ordering};
17
18 use crossbeam_utils::{Backoff, CachePadded};
19
20 use err::{PopError, PushError};
21
/// A single slot in the queue's buffer.
struct Slot<T> {
    /// The current stamp.
    ///
    /// If the stamp equals the tail, this node will be next written to. If it equals the head,
    /// this node will be next read from.
    stamp: AtomicUsize,

    /// The value in this slot.
    ///
    /// Only initialized while the stamp indicates the slot holds a message; all access goes
    /// through raw pointers so the uninitialized state is never observed through a reference.
    value: UnsafeCell<T>,
}
33
/// A bounded multi-producer multi-consumer queue.
///
/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an
/// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit
/// faster than [`SegQueue`].
///
/// [`SegQueue`]: struct.SegQueue.html
///
/// # Examples
///
/// ```
/// use crossbeam_queue::{ArrayQueue, PushError};
///
/// let q = ArrayQueue::new(2);
///
/// assert_eq!(q.push('a'), Ok(()));
/// assert_eq!(q.push('b'), Ok(()));
/// assert_eq!(q.push('c'), Err(PushError('c')));
/// assert_eq!(q.pop(), Ok('a'));
/// ```
pub struct ArrayQueue<T> {
    /// The head of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are popped from the head of the queue.
    head: CachePadded<AtomicUsize>,

    /// The tail of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are pushed into the tail of the queue.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    ///
    /// Allocated in `new` (via a forgotten `Vec`) and freed in `Drop`.
    buffer: *mut Slot<T>,

    /// The queue capacity (number of slots in `buffer`).
    cap: usize,

    /// A stamp with the value of `{ lap: 1, index: 0 }`.
    ///
    /// This is the smallest power of two greater than `cap`, so `stamp & (one_lap - 1)` extracts
    /// the index and `stamp & !(one_lap - 1)` extracts the lap.
    one_lap: usize,

    /// Indicates that dropping an `ArrayQueue<T>` may drop elements of type `T`.
    _marker: PhantomData<T>,
}
84
// SAFETY: all access to the buffer is synchronized through the atomic head/tail stamps and the
// per-slot stamps, so the queue may be shared and sent across threads whenever the element type
// itself may be sent between threads (`T: Send`).
unsafe impl<T: Send> Sync for ArrayQueue<T> {}
unsafe impl<T: Send> Send for ArrayQueue<T> {}
87
88 impl<T> ArrayQueue<T> {
    /// Creates a new bounded queue with the given capacity.
    ///
    /// # Panics
    ///
    /// Panics if the capacity is zero.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::<i32>::new(100);
    /// ```
    pub fn new(cap: usize) -> ArrayQueue<T> {
        assert!(cap > 0, "capacity must be non-zero");

        // Head is initialized to `{ lap: 0, index: 0 }`.
        // Tail is initialized to `{ lap: 0, index: 0 }`.
        let head = 0;
        let tail = 0;

        // Allocate a buffer of `cap` slots, taking over the raw allocation from a `Vec`.
        // The matching deallocation happens in `Drop` via `Vec::from_raw_parts`.
        let buffer = {
            let mut v = Vec::<Slot<T>>::with_capacity(cap);
            let ptr = v.as_mut_ptr();
            mem::forget(v);
            ptr
        };

        // Initialize stamps in the slots; the `value` fields deliberately stay uninitialized.
        for i in 0..cap {
            unsafe {
                // Set the stamp to `{ lap: 0, index: i }`.
                // NOTE(review): taking `&mut` to a field of a not-yet-initialized slot is
                // questionable under current validity rules; later upstream versions avoid
                // materializing this reference — worth confirming against the latest guidance.
                let slot = buffer.add(i);
                ptr::write(&mut (*slot).stamp, AtomicUsize::new(i));
            }
        }

        // One lap is the smallest power of two greater than `cap`, so the index occupies the
        // low bits of a stamp and the lap counter occupies the remaining high bits.
        let one_lap = (cap + 1).next_power_of_two();

        ArrayQueue {
            buffer,
            cap,
            one_lap,
            head: CachePadded::new(AtomicUsize::new(head)),
            tail: CachePadded::new(AtomicUsize::new(tail)),
            _marker: PhantomData,
        }
    }
139
    /// Attempts to push an element into the queue.
    ///
    /// If the queue is full, the element is returned back as an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PushError};
    ///
    /// let q = ArrayQueue::new(1);
    ///
    /// assert_eq!(q.push(10), Ok(()));
    /// assert_eq!(q.push(20), Err(PushError(20)));
    /// ```
    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
        let backoff = Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            // Deconstruct the tail into an index into the buffer and a lap counter.
            let index = tail & (self.one_lap - 1);
            let lap = tail & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            let slot = unsafe { &*self.buffer.add(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the tail and the stamp match, the slot is vacant and we may attempt to push.
            if tail == stamp {
                let new_tail = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, index: index + 1 }`.
                    tail + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the tail; a successful CAS claims this slot for the current thread.
                match self
                    .tail
                    .compare_exchange_weak(tail, new_tail, Ordering::SeqCst, Ordering::Relaxed)
                {
                    Ok(_) => {
                        // Write the value into the slot and update the stamp. The `Release`
                        // store publishes the written value to the thread that later
                        // `Acquire`-loads this stamp in `pop`.
                        unsafe { slot.value.get().write(value); }
                        slot.stamp.store(tail + 1, Ordering::Release);
                        return Ok(());
                    }
                    Err(t) => {
                        // Another thread moved the tail first; retry with the fresh value.
                        tail = t;
                        backoff.spin();
                    }
                }
            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
                // The stamp is a full lap behind `tail + 1`: the slot still holds a value from
                // the previous lap, so the queue might be full.
                atomic::fence(Ordering::SeqCst);
                let head = self.head.load(Ordering::Relaxed);

                // If the head lags one lap behind the tail as well...
                if head.wrapping_add(self.one_lap) == tail {
                    // ...then the queue is full.
                    return Err(PushError(value));
                }

                // A pop is in progress on this slot; back off briefly and retry.
                backoff.spin();
                tail = self.tail.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                tail = self.tail.load(Ordering::Relaxed);
            }
        }
    }
214
    /// Attempts to pop an element from the queue.
    ///
    /// If the queue is empty, an error is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{ArrayQueue, PopError};
    ///
    /// let q = ArrayQueue::new(1);
    /// assert_eq!(q.push(10), Ok(()));
    ///
    /// assert_eq!(q.pop(), Ok(10));
    /// assert_eq!(q.pop(), Err(PopError));
    /// ```
    pub fn pop(&self) -> Result<T, PopError> {
        let backoff = Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            // Deconstruct the head into an index into the buffer and a lap counter.
            let index = head & (self.one_lap - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            let slot = unsafe { &*self.buffer.add(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the stamp is ahead of the head by 1, the slot holds a message written on this
            // lap and we may attempt to pop it.
            if head + 1 == stamp {
                let new = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, index: index + 1 }`.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the head; a successful CAS claims this slot for the current thread.
                match self
                    .head
                    .compare_exchange_weak(head, new, Ordering::SeqCst, Ordering::Relaxed)
                {
                    Ok(_) => {
                        // Read the value from the slot and update the stamp. The new stamp is
                        // the head stamp one lap forward — exactly what `push` expects to see
                        // when it reaches this slot on the next lap.
                        let msg = unsafe { slot.value.get().read() };
                        slot.stamp.store(head.wrapping_add(self.one_lap), Ordering::Release);
                        return Ok(msg);
                    }
                    Err(h) => {
                        // Another thread moved the head first; retry with the fresh value.
                        head = h;
                        backoff.spin();
                    }
                }
            } else if stamp == head {
                // The slot has not been written on this lap yet — the queue might be empty.
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if tail == head {
                    return Err(PopError);
                }

                // A push is in progress on this slot; back off briefly and retry.
                backoff.spin();
                head = self.head.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                head = self.head.load(Ordering::Relaxed);
            }
        }
    }
289
    /// Returns the capacity of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::<i32>::new(100);
    ///
    /// assert_eq!(q.capacity(), 100);
    /// ```
    pub fn capacity(&self) -> usize {
        self.cap
    }
304
    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(100);
    ///
    /// assert!(q.is_empty());
    /// q.push(1).unwrap();
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        // The head is loaded before the tail on purpose; see the note below.
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);

        // The queue is empty exactly when the head and tail stamps (lap + index) are equal.
        //
        // Note: If the head changes just before we load the tail, that means there was a moment
        // when the queue was not empty, so it is safe to just return `false`.
        tail == head
    }
329
    /// Returns `true` if the queue is full.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(1);
    ///
    /// assert!(!q.is_full());
    /// q.push(1).unwrap();
    /// assert!(q.is_full());
    /// ```
    pub fn is_full(&self) -> bool {
        // The tail is loaded before the head on purpose; see the note below.
        let tail = self.tail.load(Ordering::SeqCst);
        let head = self.head.load(Ordering::SeqCst);

        // The queue is full exactly when the head is lagging one whole lap behind the tail.
        //
        // Note: If the tail changes just before we load the head, that means there was a moment
        // when the queue was not full, so it is safe to just return `false`.
        head.wrapping_add(self.one_lap) == tail
    }
353
    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::ArrayQueue;
    ///
    /// let q = ArrayQueue::new(100);
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10).unwrap();
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20).unwrap();
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail, then load the head.
            let tail = self.tail.load(Ordering::SeqCst);
            let head = self.head.load(Ordering::SeqCst);

            // If the tail didn't change while we were loading the head, the two values form a
            // consistent snapshot and the length can be computed from them; otherwise retry.
            if self.tail.load(Ordering::SeqCst) == tail {
                // Strip the lap counters, keeping only the buffer indices.
                let hix = head & (self.one_lap - 1);
                let tix = tail & (self.one_lap - 1);

                return if hix < tix {
                    // Head and tail are on the same lap.
                    tix - hix
                } else if hix > tix {
                    // The tail has wrapped around into the next lap.
                    self.cap - hix + tix
                } else if tail == head {
                    // Same index and same lap: the queue is empty.
                    0
                } else {
                    // Same index but different laps: the queue is full.
                    self.cap
                };
            }
        }
    }
393 }
394
impl<T> Drop for ArrayQueue<T> {
    fn drop(&mut self) {
        // `&mut self` guarantees exclusive access, so relaxed loads and plain pointer reads
        // suffice here — no other thread can touch the queue anymore.

        // Get the index of the head.
        let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1);

        // Loop over all slots that hold a message and drop them.
        for i in 0..self.len() {
            // Compute the index of the next slot holding a message, wrapping around the end of
            // the buffer manually.
            let index = if hix + i < self.cap {
                hix + i
            } else {
                hix + i - self.cap
            };

            unsafe {
                // Dropping the slot in place drops the contained value; slots outside the
                // occupied range hold uninitialized values and must not be dropped.
                self.buffer.add(index).drop_in_place();
            }
        }

        // Finally, deallocate the buffer, but don't run any destructors.
        unsafe {
            // Reconstructing the `Vec` with length 0 frees the allocation on drop without
            // touching the (already dropped or never initialized) elements.
            Vec::from_raw_parts(self.buffer, 0, self.cap);
        }
    }
}
420
421 impl<T> fmt::Debug for ArrayQueue<T> {
422 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
423 f.pad("ArrayQueue { .. }")
424 }
425 }