use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};

use maybe_uninit::MaybeUninit;

use err::PopError;

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;

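// Index layout (an illustrative note derived from the constants above): the lowest `SHIFT` bits
// of an index are metadata and the rest is a running slot counter, so for an index `i`:
//
//     let offset = (i >> SHIFT) % LAP; // slot within the current block
//     let lap = (i >> SHIFT) / LAP;    // which block the index falls in
//
// Offsets `0..BLOCK_CAP` hold values, while offset `BLOCK_CAP` marks the end of a block, where
// threads wait for the next block to be installed. The metadata bit (`HAS_NEXT`) is used only in
// the head index, to record that the head block already has a successor.
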
/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        // SAFETY: This is safe because:
        //   [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        //   [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //   [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
        //       holds a MaybeUninit.
        //   [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        unsafe { MaybeUninit::zeroed().assume_init() }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because the thread that
        // reads from the last slot is the one that begins destroying the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

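// A note on the destruction handoff (added for clarity): the pop that consumes the last slot of a
// block calls `Block::destroy(block, 0)`. For every earlier slot, `destroy` sets `DESTROY`; if
// that slot's `READ` bit is not yet set, a reader is still inside the slot, so `destroy` returns
// and that reader, once it sets `READ` and observes `DESTROY`, resumes destruction from the
// following slot. Exactly one thread therefore ends up freeing the block.
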
/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded multi-producer multi-consumer queue.
///
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
/// this queue is somewhat slower than [`ArrayQueue`].
///
/// [`ArrayQueue`]: struct.ArrayQueue.html
///
/// # Examples
///
/// ```
/// use crossbeam_queue::{PopError, SegQueue};
///
/// let q = SegQueue::new();
///
/// q.push('a');
/// q.push('b');
///
/// assert_eq!(q.pop(), Ok('a'));
/// assert_eq!(q.pop(), Ok('b'));
/// assert_eq!(q.pop(), Err(PopError));
/// ```
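///
/// As a further usage sketch (illustrative; `i32` elements and `std` threads are assumed here),
/// a queue can be shared between threads via `std::sync::Arc`:
///
/// ```
/// use std::sync::Arc;
/// use std::thread;
///
/// use crossbeam_queue::SegQueue;
///
/// let q = Arc::new(SegQueue::new());
/// let producer = Arc::clone(&q);
///
/// let handle = thread::spawn(move || {
///     for i in 0..100 {
///         producer.push(i);
///     }
/// });
///
/// handle.join().unwrap();
/// assert_eq!(q.len(), 100);
/// ```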
pub struct SegQueue<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}

impl<T> SegQueue<T> {
    /// Creates a new unbounded queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::<i32>::new();
    /// ```
    pub fn new() -> SegQueue<T> {
        SegQueue {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }

    /// Pushes an element into the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// ```
    pub fn push(&self, value: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first push operation, we need to allocate the first block.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_and_swap(block, new, Ordering::Release)
                    == block
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Pops an element from the queue.
    ///
    /// If the queue is empty, an error is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::{PopError, SegQueue};
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// assert_eq!(q.pop(), Ok(10));
    /// assert_eq!(q.pop(), Err(PopError));
    /// ```
    pub fn pop(&self) -> Result<T, PopError> {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

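            // If `HAS_NEXT` is set in the head index, the head block is already known not to be
            // the last one, so the queue cannot be empty and the emptiness check against the tail
            // below can be skipped.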
            if new_head & HAS_NEXT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    return Err(PopError);
                }

                // If head and tail are not in the same block, set `HAS_NEXT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= HAS_NEXT;
                }
            }

            // The block can be null here only if the first push operation is in progress. In that
            // case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= HAS_NEXT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Ok(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// assert!(q.is_empty());
    /// q.push(1);
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10);
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Fix up indices if they fall onto block ends.
                if head == BLOCK_CAP {
                    head = 0;
                    tail -= LAP;
                }
                if tail == BLOCK_CAP {
                    tail += 1;
                }

                // Return the difference minus the number of blocks between tail and head.
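                // Worked example: with `LAP == 32` and `BLOCK_CAP == 31`, pushing 31 values and
                // popping none leaves `head == 0` and `tail == 32` (the skipped end-of-block slot
                // is counted), so the result is `32 - 0 - 32 / 32 == 31`.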
                return tail - head - tail / LAP;
            }
        }
    }
}

impl<T> Drop for SegQueue<T> {
    fn drop(&mut self) {
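        // We have `&mut self`, so no other thread can access the queue concurrently and relaxed
        // loads of the indices and the head block are sufficient.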
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.value.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

impl<T> fmt::Debug for SegQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.pad("SegQueue { .. }")
    }
}

impl<T> Default for SegQueue<T> {
    fn default() -> SegQueue<T> {
        SegQueue::new()
    }
}