git.proxmox.com Git - rustc.git/blob - vendor/crossbeam-queue/src/seg_queue.rs
New upstream version 1.64.0+dfsg1
[rustc.git] / vendor / crossbeam-queue / src / seg_queue.rs
1 use alloc::boxed::Box;
2 use core::cell::UnsafeCell;
3 use core::fmt;
4 use core::marker::PhantomData;
5 use core::mem::MaybeUninit;
6 use core::ptr;
7 use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
8
9 use crossbeam_utils::{Backoff, CachePadded};
10
// Flags stored in the `state` word of a slot:
// * `WRITE`   - a value has been written into the slot.
// * `READ`    - the value has been read out of the slot.
// * `DESTROY` - the block owning the slot is being destroyed.
const WRITE: usize = 0b001;
const READ: usize = 0b010;
const DESTROY: usize = 0b100;

// Number of consecutive indices covered by a single block (one "lap").
const LAP: usize = 32;
// Number of values a block can actually store (one less than a lap).
const BLOCK_CAP: usize = LAP - 1;
// Number of low bits of an index that carry metadata instead of position.
const SHIFT: usize = 1;
// Metadata bit: set when the block is known not to be the last one.
const HAS_NEXT: usize = 1;
27
/// One cell of a block: storage for a single value plus its state flags.
struct Slot<T> {
    /// Storage for the value; only initialized once `WRITE` is set in `state`.
    value: UnsafeCell<MaybeUninit<T>>,

    /// Bit set of `WRITE` / `READ` / `DESTROY` describing the slot.
    state: AtomicUsize,
}
36
37 impl<T> Slot<T> {
38 /// Waits until a value is written into the slot.
39 fn wait_write(&self) {
40 let backoff = Backoff::new();
41 while self.state.load(Ordering::Acquire) & WRITE == 0 {
42 backoff.snooze();
43 }
44 }
45 }
46
47 /// A block in a linked list.
48 ///
49 /// Each block in the list can hold up to `BLOCK_CAP` values.
50 struct Block<T> {
51 /// The next block in the linked list.
52 next: AtomicPtr<Block<T>>,
53
54 /// Slots for values.
55 slots: [Slot<T>; BLOCK_CAP],
56 }
57
58 impl<T> Block<T> {
59 /// Creates an empty block that starts at `start_index`.
60 fn new() -> Block<T> {
61 // SAFETY: This is safe because:
62 // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
63 // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
64 // [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
65 // holds a MaybeUninit.
66 // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
67 unsafe { MaybeUninit::zeroed().assume_init() }
68 }
69
70 /// Waits until the next pointer is set.
71 fn wait_next(&self) -> *mut Block<T> {
72 let backoff = Backoff::new();
73 loop {
74 let next = self.next.load(Ordering::Acquire);
75 if !next.is_null() {
76 return next;
77 }
78 backoff.snooze();
79 }
80 }
81
82 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
83 unsafe fn destroy(this: *mut Block<T>, start: usize) {
84 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
85 // begun destruction of the block.
86 for i in start..BLOCK_CAP - 1 {
87 let slot = (*this).slots.get_unchecked(i);
88
89 // Mark the `DESTROY` bit if a thread is still using the slot.
90 if slot.state.load(Ordering::Acquire) & READ == 0
91 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
92 {
93 // If a thread is still using the slot, it will continue destruction of the block.
94 return;
95 }
96 }
97
98 // No thread is using the block, now it is safe to destroy it.
99 drop(Box::from_raw(this));
100 }
101 }
102
103 /// A position in a queue.
104 struct Position<T> {
105 /// The index in the queue.
106 index: AtomicUsize,
107
108 /// The block in the linked list.
109 block: AtomicPtr<Block<T>>,
110 }
111
112 /// An unbounded multi-producer multi-consumer queue.
113 ///
114 /// This queue is implemented as a linked list of segments, where each segment is a small buffer
115 /// that can hold a handful of elements. There is no limit to how many elements can be in the queue
116 /// at a time. However, since segments need to be dynamically allocated as elements get pushed,
117 /// this queue is somewhat slower than [`ArrayQueue`].
118 ///
119 /// [`ArrayQueue`]: super::ArrayQueue
120 ///
121 /// # Examples
122 ///
123 /// ```
124 /// use crossbeam_queue::SegQueue;
125 ///
126 /// let q = SegQueue::new();
127 ///
128 /// q.push('a');
129 /// q.push('b');
130 ///
131 /// assert_eq!(q.pop(), Some('a'));
132 /// assert_eq!(q.pop(), Some('b'));
133 /// assert!(q.pop().is_none());
134 /// ```
135 pub struct SegQueue<T> {
136 /// The head of the queue.
137 head: CachePadded<Position<T>>,
138
139 /// The tail of the queue.
140 tail: CachePadded<Position<T>>,
141
142 /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
143 _marker: PhantomData<T>,
144 }
145
146 unsafe impl<T: Send> Send for SegQueue<T> {}
147 unsafe impl<T: Send> Sync for SegQueue<T> {}
148
149 impl<T> SegQueue<T> {
150 /// Creates a new unbounded queue.
151 ///
152 /// # Examples
153 ///
154 /// ```
155 /// use crossbeam_queue::SegQueue;
156 ///
157 /// let q = SegQueue::<i32>::new();
158 /// ```
159 pub const fn new() -> SegQueue<T> {
160 SegQueue {
161 head: CachePadded::new(Position {
162 block: AtomicPtr::new(ptr::null_mut()),
163 index: AtomicUsize::new(0),
164 }),
165 tail: CachePadded::new(Position {
166 block: AtomicPtr::new(ptr::null_mut()),
167 index: AtomicUsize::new(0),
168 }),
169 _marker: PhantomData,
170 }
171 }
172
173 /// Pushes an element into the queue.
174 ///
175 /// # Examples
176 ///
177 /// ```
178 /// use crossbeam_queue::SegQueue;
179 ///
180 /// let q = SegQueue::new();
181 ///
182 /// q.push(10);
183 /// q.push(20);
184 /// ```
185 pub fn push(&self, value: T) {
186 let backoff = Backoff::new();
187 let mut tail = self.tail.index.load(Ordering::Acquire);
188 let mut block = self.tail.block.load(Ordering::Acquire);
189 let mut next_block = None;
190
191 loop {
192 // Calculate the offset of the index into the block.
193 let offset = (tail >> SHIFT) % LAP;
194
195 // If we reached the end of the block, wait until the next one is installed.
196 if offset == BLOCK_CAP {
197 backoff.snooze();
198 tail = self.tail.index.load(Ordering::Acquire);
199 block = self.tail.block.load(Ordering::Acquire);
200 continue;
201 }
202
203 // If we're going to have to install the next block, allocate it in advance in order to
204 // make the wait for other threads as short as possible.
205 if offset + 1 == BLOCK_CAP && next_block.is_none() {
206 next_block = Some(Box::new(Block::<T>::new()));
207 }
208
209 // If this is the first push operation, we need to allocate the first block.
210 if block.is_null() {
211 let new = Box::into_raw(Box::new(Block::<T>::new()));
212
213 if self
214 .tail
215 .block
216 .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
217 .is_ok()
218 {
219 self.head.block.store(new, Ordering::Release);
220 block = new;
221 } else {
222 next_block = unsafe { Some(Box::from_raw(new)) };
223 tail = self.tail.index.load(Ordering::Acquire);
224 block = self.tail.block.load(Ordering::Acquire);
225 continue;
226 }
227 }
228
229 let new_tail = tail + (1 << SHIFT);
230
231 // Try advancing the tail forward.
232 match self.tail.index.compare_exchange_weak(
233 tail,
234 new_tail,
235 Ordering::SeqCst,
236 Ordering::Acquire,
237 ) {
238 Ok(_) => unsafe {
239 // If we've reached the end of the block, install the next one.
240 if offset + 1 == BLOCK_CAP {
241 let next_block = Box::into_raw(next_block.unwrap());
242 let next_index = new_tail.wrapping_add(1 << SHIFT);
243
244 self.tail.block.store(next_block, Ordering::Release);
245 self.tail.index.store(next_index, Ordering::Release);
246 (*block).next.store(next_block, Ordering::Release);
247 }
248
249 // Write the value into the slot.
250 let slot = (*block).slots.get_unchecked(offset);
251 slot.value.get().write(MaybeUninit::new(value));
252 slot.state.fetch_or(WRITE, Ordering::Release);
253
254 return;
255 },
256 Err(t) => {
257 tail = t;
258 block = self.tail.block.load(Ordering::Acquire);
259 backoff.spin();
260 }
261 }
262 }
263 }
264
265 /// Pops an element from the queue.
266 ///
267 /// If the queue is empty, `None` is returned.
268 ///
269 /// # Examples
270 ///
271 /// ```
272 /// use crossbeam_queue::SegQueue;
273 ///
274 /// let q = SegQueue::new();
275 ///
276 /// q.push(10);
277 /// assert_eq!(q.pop(), Some(10));
278 /// assert!(q.pop().is_none());
279 /// ```
280 pub fn pop(&self) -> Option<T> {
281 let backoff = Backoff::new();
282 let mut head = self.head.index.load(Ordering::Acquire);
283 let mut block = self.head.block.load(Ordering::Acquire);
284
285 loop {
286 // Calculate the offset of the index into the block.
287 let offset = (head >> SHIFT) % LAP;
288
289 // If we reached the end of the block, wait until the next one is installed.
290 if offset == BLOCK_CAP {
291 backoff.snooze();
292 head = self.head.index.load(Ordering::Acquire);
293 block = self.head.block.load(Ordering::Acquire);
294 continue;
295 }
296
297 let mut new_head = head + (1 << SHIFT);
298
299 if new_head & HAS_NEXT == 0 {
300 atomic::fence(Ordering::SeqCst);
301 let tail = self.tail.index.load(Ordering::Relaxed);
302
303 // If the tail equals the head, that means the queue is empty.
304 if head >> SHIFT == tail >> SHIFT {
305 return None;
306 }
307
308 // If head and tail are not in the same block, set `HAS_NEXT` in head.
309 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
310 new_head |= HAS_NEXT;
311 }
312 }
313
314 // The block can be null here only if the first push operation is in progress. In that
315 // case, just wait until it gets initialized.
316 if block.is_null() {
317 backoff.snooze();
318 head = self.head.index.load(Ordering::Acquire);
319 block = self.head.block.load(Ordering::Acquire);
320 continue;
321 }
322
323 // Try moving the head index forward.
324 match self.head.index.compare_exchange_weak(
325 head,
326 new_head,
327 Ordering::SeqCst,
328 Ordering::Acquire,
329 ) {
330 Ok(_) => unsafe {
331 // If we've reached the end of the block, move to the next one.
332 if offset + 1 == BLOCK_CAP {
333 let next = (*block).wait_next();
334 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
335 if !(*next).next.load(Ordering::Relaxed).is_null() {
336 next_index |= HAS_NEXT;
337 }
338
339 self.head.block.store(next, Ordering::Release);
340 self.head.index.store(next_index, Ordering::Release);
341 }
342
343 // Read the value.
344 let slot = (*block).slots.get_unchecked(offset);
345 slot.wait_write();
346 let value = slot.value.get().read().assume_init();
347
348 // Destroy the block if we've reached the end, or if another thread wanted to
349 // destroy but couldn't because we were busy reading from the slot.
350 if offset + 1 == BLOCK_CAP {
351 Block::destroy(block, 0);
352 } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
353 Block::destroy(block, offset + 1);
354 }
355
356 return Some(value);
357 },
358 Err(h) => {
359 head = h;
360 block = self.head.block.load(Ordering::Acquire);
361 backoff.spin();
362 }
363 }
364 }
365 }
366
367 /// Returns `true` if the queue is empty.
368 ///
369 /// # Examples
370 ///
371 /// ```
372 /// use crossbeam_queue::SegQueue;
373 ///
374 /// let q = SegQueue::new();
375 ///
376 /// assert!(q.is_empty());
377 /// q.push(1);
378 /// assert!(!q.is_empty());
379 /// ```
380 pub fn is_empty(&self) -> bool {
381 let head = self.head.index.load(Ordering::SeqCst);
382 let tail = self.tail.index.load(Ordering::SeqCst);
383 head >> SHIFT == tail >> SHIFT
384 }
385
386 /// Returns the number of elements in the queue.
387 ///
388 /// # Examples
389 ///
390 /// ```
391 /// use crossbeam_queue::SegQueue;
392 ///
393 /// let q = SegQueue::new();
394 /// assert_eq!(q.len(), 0);
395 ///
396 /// q.push(10);
397 /// assert_eq!(q.len(), 1);
398 ///
399 /// q.push(20);
400 /// assert_eq!(q.len(), 2);
401 /// ```
402 pub fn len(&self) -> usize {
403 loop {
404 // Load the tail index, then load the head index.
405 let mut tail = self.tail.index.load(Ordering::SeqCst);
406 let mut head = self.head.index.load(Ordering::SeqCst);
407
408 // If the tail index didn't change, we've got consistent indices to work with.
409 if self.tail.index.load(Ordering::SeqCst) == tail {
410 // Erase the lower bits.
411 tail &= !((1 << SHIFT) - 1);
412 head &= !((1 << SHIFT) - 1);
413
414 // Fix up indices if they fall onto block ends.
415 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
416 tail = tail.wrapping_add(1 << SHIFT);
417 }
418 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
419 head = head.wrapping_add(1 << SHIFT);
420 }
421
422 // Rotate indices so that head falls into the first block.
423 let lap = (head >> SHIFT) / LAP;
424 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
425 head = head.wrapping_sub((lap * LAP) << SHIFT);
426
427 // Remove the lower bits.
428 tail >>= SHIFT;
429 head >>= SHIFT;
430
431 // Return the difference minus the number of blocks between tail and head.
432 return tail - head - tail / LAP;
433 }
434 }
435 }
436 }
437
438 impl<T> Drop for SegQueue<T> {
439 fn drop(&mut self) {
440 let mut head = self.head.index.load(Ordering::Relaxed);
441 let mut tail = self.tail.index.load(Ordering::Relaxed);
442 let mut block = self.head.block.load(Ordering::Relaxed);
443
444 // Erase the lower bits.
445 head &= !((1 << SHIFT) - 1);
446 tail &= !((1 << SHIFT) - 1);
447
448 unsafe {
449 // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
450 while head != tail {
451 let offset = (head >> SHIFT) % LAP;
452
453 if offset < BLOCK_CAP {
454 // Drop the value in the slot.
455 let slot = (*block).slots.get_unchecked(offset);
456 let p = &mut *slot.value.get();
457 p.as_mut_ptr().drop_in_place();
458 } else {
459 // Deallocate the block and move to the next one.
460 let next = (*block).next.load(Ordering::Relaxed);
461 drop(Box::from_raw(block));
462 block = next;
463 }
464
465 head = head.wrapping_add(1 << SHIFT);
466 }
467
468 // Deallocate the last remaining block.
469 if !block.is_null() {
470 drop(Box::from_raw(block));
471 }
472 }
473 }
474 }
475
476 impl<T> fmt::Debug for SegQueue<T> {
477 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
478 f.pad("SegQueue { .. }")
479 }
480 }
481
482 impl<T> Default for SegQueue<T> {
483 fn default() -> SegQueue<T> {
484 SegQueue::new()
485 }
486 }
487
488 impl<T> IntoIterator for SegQueue<T> {
489 type Item = T;
490
491 type IntoIter = IntoIter<T>;
492
493 fn into_iter(self) -> Self::IntoIter {
494 IntoIter { value: self }
495 }
496 }
497
498 #[derive(Debug)]
499 pub struct IntoIter<T> {
500 value: SegQueue<T>,
501 }
502
503 impl<T> Iterator for IntoIter<T> {
504 type Item = T;
505
506 fn next(&mut self) -> Option<Self::Item> {
507 let value = &mut self.value;
508 let head = *value.head.index.get_mut();
509 let tail = *value.tail.index.get_mut();
510 if head >> SHIFT == tail >> SHIFT {
511 None
512 } else {
513 let block = *value.head.block.get_mut();
514 let offset = (head >> SHIFT) % LAP;
515
516 // SAFETY: We have mutable access to this, so we can read without
517 // worrying about concurrency. Furthermore, we know this is
518 // initialized because it is the value pointed at by `value.head`
519 // and this is a non-empty queue.
520 let item = unsafe {
521 let slot = (*block).slots.get_unchecked(offset);
522 let p = &mut *slot.value.get();
523 p.as_mut_ptr().read()
524 };
525 if offset + 1 == BLOCK_CAP {
526 // Deallocate the block and move to the next one.
527 // SAFETY: The block is initialized because we've been reading
528 // from it this entire time. We can drop it b/c everything has
529 // been read out of it, so nothing is pointing to it anymore.
530 unsafe {
531 let next = *(*block).next.get_mut();
532 drop(Box::from_raw(block));
533 *value.head.block.get_mut() = next;
534 }
535 // The last value in a block is empty, so skip it
536 *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT);
537 // Double-check that we're pointing to the first item in a block.
538 debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0);
539 } else {
540 *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT);
541 }
542 Some(item)
543 }
544 }
545 }