//! Unbounded channel implemented as a linked list.

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;

// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
// following changes by @kleimkuhler:
//
// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101

// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;

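// Worked example (illustrative, not part of the upstream sources): with
// `SHIFT = 1` and `LAP = 32`, a head/tail index encodes
// `sequence << SHIFT | mark`. An index value of 70 = 0b100_0110 has a clear
// mark bit, sequence number 35, and offset 35 % 32 = 3 within its block.
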
/// A slot in a block.
struct Slot<T> {
    /// The message.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a message is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for messages.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        // SAFETY: This is safe because:
        // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
        //     holds a MaybeUninit.
        // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        unsafe { MaybeUninit::zeroed().assume_init() }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a channel.
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// The token type for the list flavor.
#[derive(Debug)]
pub struct ListToken {
    /// The block of slots.
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}

impl Default for ListToken {
    #[inline]
    fn default() -> Self {
        ListToken {
            block: ptr::null(),
            offset: 0,
        }
    }
}

/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
pub struct Channel<T> {
    /// The head of the channel.
    head: CachePadded<Position<T>>,

    /// The tail of the channel.
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}

impl<T> Channel<T> {
    /// Creates a new unbounded channel.
    pub fn new() -> Self {
        Channel {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            receivers: SyncWaker::new(),
            _marker: PhantomData,
        }
    }

    /// Returns a receiver handle to the channel.
    pub fn receiver(&self) -> Receiver<'_, T> {
        Receiver(self)
    }

    /// Returns a sender handle to the channel.
    pub fn sender(&self) -> Sender<'_, T> {
        Sender(self)
    }

    /// Attempts to reserve a slot for sending a message.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected.
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first message to be sent into the channel, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

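                // NOTE: `compare_and_swap` is deprecated as of Rust 1.50;
                // later crossbeam releases migrate this call to
                // `compare_exchange`. It returns the previous value, so
                // success means the result still equals `block`.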
                if self
                    .tail
                    .block
                    .compare_and_swap(block, new, Ordering::Release)
                    == block
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
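                        // The extra index increment skips the end-of-block
                        // offset `BLOCK_CAP`, landing the tail on slot 0 of
                        // the freshly installed block.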
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Writes a message into the channel.
    pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.list.block.is_null() {
            return Err(msg);
        }

        // Write the message into the slot.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.msg.get().write(MaybeUninit::new(msg));
        slot.state.fetch_or(WRITE, Ordering::Release);

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }

    /// Attempts to reserve a slot for receiving a message.
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // If the channel is disconnected...
                    if tail & MARK_BIT != 0 {
                        // ...then receive an error.
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first message is being sent into the channel.
            // In that case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Reads a message from the channel.
    pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.list.block.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        // Read the message.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.wait_write();
        let msg = slot.msg.get().read().assume_init();

        // Destroy the block if we've reached the end, or if another thread wanted to destroy but
        // couldn't because we were busy reading from the slot.
        if offset + 1 == BLOCK_CAP {
            Block::destroy(block, 0);
        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
            Block::destroy(block, offset + 1);
        }

        Ok(msg)
    }

    /// Attempts to send a message into the channel.
    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        self.send(msg, None).map_err(|err| match err {
            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
            SendTimeoutError::Timeout(_) => unreachable!(),
        })
    }

    /// Sends a message into the channel.
    pub fn send(&self, msg: T, _deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        assert!(self.start_send(token));
        unsafe {
            self.write(token, msg)
                .map_err(SendTimeoutError::Disconnected)
        }
    }

    /// Attempts to receive a message without blocking.
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();

        if self.start_recv(token) {
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else {
            Err(TryRecvError::Empty)
        }
    }

    /// Receives a message from the channel.
    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            // Try receiving a message several times.
            let backoff = Backoff::new();
            loop {
                if self.start_recv(token) {
                    unsafe {
                        return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
                    }
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            // Prepare for blocking until a sender wakes us up.
            Context::with(|cx| {
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                        // If the channel was disconnected, we still have to check for remaining
                        // messages.
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

    /// Returns the current number of messages inside the channel.
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
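                // Worked example (illustrative): after 31 sends that fill
                // the first block, the tail's sequence part is 32 (the
                // block installation adds an extra increment), so
                // len = 32 - 0 - 32 / 32 = 31.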
                return tail - head - tail / LAP;
            }
        }
    }

    /// Returns the capacity of the channel.
    pub fn capacity(&self) -> Option<usize> {
        None
    }

    /// Disconnects the channel and wakes up all blocked receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub fn disconnect(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            self.receivers.disconnect();
            true
        } else {
            false
        }
    }

    /// Returns `true` if the channel is disconnected.
    pub fn is_disconnected(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }

    /// Returns `true` if the channel is empty.
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
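        // Shifting by `SHIFT` discards the mark bit of both indices, so
        // neither disconnection (tail) nor the not-the-last-block flag
        // (head) affects the comparison.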
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns `true` if the channel is full.
    pub fn is_full(&self) -> bool {
        false
    }
}

impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
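        // `&mut self` guarantees exclusive access, so relaxed loads are
        // sufficient here.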
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.msg.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

/// Receiver handle to a channel.
pub struct Receiver<'a, T>(&'a Channel<T>);

/// Sender handle to a channel.
pub struct Sender<'a, T>(&'a Channel<T>);

impl<T> SelectHandle for Receiver<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_recv(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.receivers.unregister(oper);
    }

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        !self.0.is_empty() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.receivers.unwatch(oper);
    }
}

impl<T> SelectHandle for Sender<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_send(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unregister(&self, _oper: Operation) {}

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        true
    }

    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unwatch(&self, _oper: Operation) {}
}
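
// A minimal sanity test, added here as a sketch (the upstream crate keeps
// its tests under `tests/` instead; the test below and its name are ours).
// It exercises only the public methods defined above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn send_recv_len_disconnect() {
        let chan = Channel::<usize>::new();
        assert!(chan.is_empty());
        assert_eq!(chan.capacity(), None);

        // Send more than one block's worth of messages (BLOCK_CAP == 31)
        // to exercise the block-installation path in `start_send`.
        for i in 0..100 {
            chan.try_send(i).unwrap();
        }
        assert_eq!(chan.len(), 100);

        // Messages come out in FIFO order.
        for i in 0..100 {
            assert_eq!(chan.try_recv().unwrap(), i);
        }
        assert!(matches!(chan.try_recv(), Err(TryRecvError::Empty)));

        // Only the first `disconnect` call returns `true`; afterwards an
        // empty channel reports `Disconnected` instead of `Empty`.
        assert!(chan.disconnect());
        assert!(!chan.disconnect());
        assert!(matches!(chan.try_recv(), Err(TryRecvError::Disconnected)));
    }
}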