// vendor/crossbeam-channel/src/flavors/list.rs
//! Unbounded channel implemented as a linked list.

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::ptr;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};

use maybe_uninit::MaybeUninit;

use context::Context;
use err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use select::{Operation, SelectHandle, Selected, Token};
use waker::SyncWaker;

// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
// following changes by @kleimkuhler:
//
// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101

// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;

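// A minimal, test-only sanity sketch of the index layout described above: an
// index advances in steps of `1 << SHIFT`, its slot offset within a block is
// `(index >> SHIFT) % LAP`, and the metadata bits below `SHIFT` are left
// intact by that arithmetic.
#[cfg(test)]
mod index_layout_sketch {
    use super::{BLOCK_CAP, LAP, MARK_BIT, SHIFT};

    #[test]
    fn offsets_advance_one_slot_per_step() {
        let mut index = MARK_BIT;
        for step in 0..2 * LAP {
            // The offset cycles through one lap of slots every `LAP` steps.
            assert_eq!((index >> SHIFT) % LAP, step % LAP);
            // The metadata bit survives the index arithmetic.
            assert_eq!(index & MARK_BIT, MARK_BIT);
            index = index.wrapping_add(1 << SHIFT);
        }
        // The last offset in each lap is reserved as an end-of-block marker,
        // which is why a block holds only `LAP - 1` messages.
        assert_eq!(BLOCK_CAP, LAP - 1);
    }
}
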
/// A slot in a block.
struct Slot<T> {
    /// The message.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a message is written into the slot.
    ///
    /// A receiver can reserve a slot (by advancing the head index) before the
    /// sender has finished writing into it, so reads must spin until the
    /// `WRITE` bit appears.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for messages.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        // SAFETY: This is safe because:
        // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
        //     holds a MaybeUninit.
        // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        unsafe { MaybeUninit::zeroed().assume_init() }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

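// A minimal, test-only sketch of the `destroy` contract: if every slot in
// `start..BLOCK_CAP - 1` already has its `READ` bit set, no thread can still
// be using the block, so `destroy` deallocates it immediately instead of
// handing destruction off to a reader.
#[cfg(test)]
mod block_destroy_sketch {
    use std::sync::atomic::Ordering;

    use super::{Block, BLOCK_CAP, READ};

    #[test]
    fn destroy_fully_read_block() {
        let block = Box::into_raw(Box::new(Block::<String>::new()));
        unsafe {
            // Mark every slot (except the reserved last one) as read.
            for i in 0..BLOCK_CAP - 1 {
                (*block).slots[i].state.store(READ, Ordering::Relaxed);
            }
            // All slots are marked `READ`, so this frees the block.
            Block::destroy(block, 0);
        }
    }
}
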
/// A position in a channel.
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// The token type for the list flavor.
#[derive(Debug)]
pub struct ListToken {
    /// The block of slots.
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}

impl Default for ListToken {
    #[inline]
    fn default() -> Self {
        ListToken {
            block: ptr::null(),
            offset: 0,
        }
    }
}

/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
pub struct Channel<T> {
    /// The head of the channel.
    head: CachePadded<Position<T>>,

    /// The tail of the channel.
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}

impl<T> Channel<T> {
    /// Creates a new unbounded channel.
    pub fn new() -> Self {
        Channel {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            receivers: SyncWaker::new(),
            _marker: PhantomData,
        }
    }

    /// Returns a receiver handle to the channel.
    pub fn receiver(&self) -> Receiver<T> {
        Receiver(self)
    }

    /// Returns a sender handle to the channel.
    pub fn sender(&self) -> Sender<T> {
        Sender(self)
    }

    /// Attempts to reserve a slot for sending a message.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected.
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first message to be sent into the channel, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_and_swap(block, new, Ordering::Release)
                    == block
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Writes a message into the channel.
    pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.list.block.is_null() {
            return Err(msg);
        }

        // Write the message into the slot.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.msg.get().write(MaybeUninit::new(msg));
        slot.state.fetch_or(WRITE, Ordering::Release);

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }

    /// Attempts to reserve a slot for receiving a message.
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // If the channel is disconnected...
                    if tail & MARK_BIT != 0 {
                        // ...then receive an error.
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first message is being sent into the channel.
            // In that case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Reads a message from the channel.
    pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.list.block.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        // Read the message.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.wait_write();
        let msg = slot.msg.get().read().assume_init();

        // Destroy the block if we've reached the end, or if another thread wanted to destroy but
        // couldn't because we were busy reading from the slot.
        if offset + 1 == BLOCK_CAP {
            Block::destroy(block, 0);
        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
            Block::destroy(block, offset + 1);
        }

        Ok(msg)
    }

    /// Attempts to send a message into the channel.
    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        self.send(msg, None).map_err(|err| match err {
            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
            SendTimeoutError::Timeout(_) => unreachable!(),
        })
    }

    /// Sends a message into the channel.
    pub fn send(&self, msg: T, _deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        // Reserving a slot in an unbounded channel always succeeds (even when disconnected, in
        // which case `write` reports the error), so this assertion cannot fail.
        assert!(self.start_send(token));
        unsafe {
            self.write(token, msg)
                .map_err(SendTimeoutError::Disconnected)
        }
    }

    /// Attempts to receive a message without blocking.
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();

        if self.start_recv(token) {
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else {
            Err(TryRecvError::Empty)
        }
    }

    /// Receives a message from the channel.
    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            // Try receiving a message several times.
            let backoff = Backoff::new();
            loop {
                if self.start_recv(token) {
                    unsafe {
                        return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
                    }
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            // Prepare for blocking until a sender wakes us up.
            Context::with(|cx| {
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                        // If the channel was disconnected, we still have to check for remaining
                        // messages.
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

    /// Returns the current number of messages inside the channel.
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Fix up indices if they fall onto block ends.
                if head == BLOCK_CAP {
                    head = 0;
                    tail -= LAP;
                }
                if tail == BLOCK_CAP {
                    tail += 1;
                }

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }

    /// Returns the capacity of the channel.
    pub fn capacity(&self) -> Option<usize> {
        None
    }

    /// Disconnects the channel and wakes up all blocked receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub fn disconnect(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            self.receivers.disconnect();
            true
        } else {
            false
        }
    }

    /// Returns `true` if the channel is disconnected.
    pub fn is_disconnected(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }

    /// Returns `true` if the channel is empty.
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns `true` if the channel is full.
    pub fn is_full(&self) -> bool {
        false
    }
}

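// A minimal, test-only usage sketch: this flavor backs the crate's unbounded
// channel, so `try_send` never reports a full channel and only fails after
// disconnection, while buffered messages can still be drained afterwards.
#[cfg(test)]
mod channel_sketch {
    use super::Channel;
    use err::{TryRecvError, TrySendError};

    #[test]
    fn send_recv_disconnect() {
        let chan = Channel::new();

        chan.try_send(1).unwrap();
        chan.try_send(2).unwrap();
        assert_eq!(chan.len(), 2);

        assert_eq!(chan.try_recv(), Ok(1));
        assert_eq!(chan.try_recv(), Ok(2));
        assert_eq!(chan.try_recv(), Err(TryRecvError::Empty));

        // Disconnecting still lets buffered messages drain first.
        chan.try_send(3).unwrap();
        assert!(chan.disconnect());
        assert_eq!(chan.try_recv(), Ok(3));
        assert_eq!(chan.try_recv(), Err(TryRecvError::Disconnected));
        assert_eq!(chan.try_send(4), Err(TrySendError::Disconnected(4)));
    }
}
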
impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.msg.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

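// A minimal, test-only sketch of the two-phase token protocol used by the
// select machinery: `start_send`/`start_recv` only reserve a slot and record
// it in the token, and the unsafe `write`/`read` calls then commit the
// operation.
#[cfg(test)]
mod token_protocol_sketch {
    use super::Channel;
    use select::Token;

    #[test]
    fn reserve_then_commit() {
        let chan = Channel::new();
        let mut token = Token::default();

        // Reserve a slot for sending, then write the message into it.
        assert!(chan.start_send(&mut token));
        unsafe { chan.write(&mut token, "hello").unwrap() };

        // Reserve the slot for receiving, then read the message out of it.
        assert!(chan.start_recv(&mut token));
        assert_eq!(unsafe { chan.read(&mut token) }, Ok("hello"));
    }
}
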
/// Receiver handle to a channel.
pub struct Receiver<'a, T: 'a>(&'a Channel<T>);

/// Sender handle to a channel.
pub struct Sender<'a, T: 'a>(&'a Channel<T>);

impl<'a, T> SelectHandle for Receiver<'a, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_recv(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.receivers.unregister(oper);
    }

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        !self.0.is_empty() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.receivers.unwatch(oper);
    }
}

impl<'a, T> SelectHandle for Sender<'a, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_send(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unregister(&self, _oper: Operation) {}

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        true
    }

    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unwatch(&self, _oper: Operation) {}
}
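
// A minimal, test-only sketch of how the select machinery probes this flavor:
// `is_ready` is a cheap readiness check, while `try_select` actually reserves
// a slot into the token.
#[cfg(test)]
mod select_handle_sketch {
    use super::Channel;
    use select::{SelectHandle, Token};

    #[test]
    fn receiver_becomes_ready_after_send() {
        let chan = Channel::new();
        let receiver = chan.receiver();

        // Empty and connected: not ready yet.
        assert!(!receiver.is_ready());

        chan.try_send(7).unwrap();
        assert!(receiver.is_ready());

        let mut token = Token::default();
        assert!(receiver.try_select(&mut token));
        assert_eq!(unsafe { chan.read(&mut token) }, Ok(7));
    }
}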