]>
Commit | Line | Data |
---|---|---|
1b1a35ee XL |
1 | //! Unbounded channel implemented as a linked list. |
2 | ||
3 | use std::cell::UnsafeCell; | |
4 | use std::marker::PhantomData; | |
5869c6ff | 5 | use std::mem::MaybeUninit; |
1b1a35ee XL |
6 | use std::ptr; |
7 | use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; | |
8 | use std::time::Instant; | |
9 | ||
10 | use crossbeam_utils::{Backoff, CachePadded}; | |
11 | ||
5869c6ff XL |
12 | use crate::context::Context; |
13 | use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; | |
14 | use crate::select::{Operation, SelectHandle, Selected, Token}; | |
15 | use crate::waker::SyncWaker; | |
1b1a35ee XL |
16 | |
17 | // TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the | |
18 | // following changes by @kleimkuhler: | |
19 | // | |
20 | // 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100 | |
21 | // 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101 | |
22 | ||
// Flags stored in `Slot::state`; each occupies its own bit so they can be combined:
// * `WRITE` is set once a message has been written into the slot.
// * `READ` is set once the message has been read out of the slot.
// * `DESTROY` is set while the owning block is being destroyed.
const WRITE: usize = 0b001;
const READ: usize = 0b010;
const DESTROY: usize = 0b100;

// Number of indices covered by one block (one "lap").
const LAP: usize = 32;
// Number of messages a block can actually hold; the final index of every lap carries no message.
const BLOCK_CAP: usize = LAP - 1;
// Number of low bits of an index that are reserved for metadata.
const SHIFT: usize = 1;
// Metadata bit with two meanings, depending on where it is stored:
// * In the head index: the head block is not the last block.
// * In the tail index: the channel is disconnected.
const MARK_BIT: usize = 1;
41 | ||
42 | /// A slot in a block. | |
43 | struct Slot<T> { | |
44 | /// The message. | |
45 | msg: UnsafeCell<MaybeUninit<T>>, | |
46 | ||
47 | /// The state of the slot. | |
48 | state: AtomicUsize, | |
49 | } | |
50 | ||
51 | impl<T> Slot<T> { | |
52 | /// Waits until a message is written into the slot. | |
53 | fn wait_write(&self) { | |
54 | let backoff = Backoff::new(); | |
55 | while self.state.load(Ordering::Acquire) & WRITE == 0 { | |
56 | backoff.snooze(); | |
57 | } | |
58 | } | |
59 | } | |
60 | ||
61 | /// A block in a linked list. | |
62 | /// | |
63 | /// Each block in the list can hold up to `BLOCK_CAP` messages. | |
64 | struct Block<T> { | |
65 | /// The next block in the linked list. | |
66 | next: AtomicPtr<Block<T>>, | |
67 | ||
68 | /// Slots for messages. | |
69 | slots: [Slot<T>; BLOCK_CAP], | |
70 | } | |
71 | ||
72 | impl<T> Block<T> { | |
73 | /// Creates an empty block. | |
74 | fn new() -> Block<T> { | |
75 | // SAFETY: This is safe because: | |
76 | // [1] `Block::next` (AtomicPtr) may be safely zero initialized. | |
77 | // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. | |
78 | // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it | |
79 | // holds a MaybeUninit. | |
80 | // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. | |
81 | unsafe { MaybeUninit::zeroed().assume_init() } | |
82 | } | |
83 | ||
84 | /// Waits until the next pointer is set. | |
85 | fn wait_next(&self) -> *mut Block<T> { | |
86 | let backoff = Backoff::new(); | |
87 | loop { | |
88 | let next = self.next.load(Ordering::Acquire); | |
89 | if !next.is_null() { | |
90 | return next; | |
91 | } | |
92 | backoff.snooze(); | |
93 | } | |
94 | } | |
95 | ||
96 | /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. | |
97 | unsafe fn destroy(this: *mut Block<T>, start: usize) { | |
5869c6ff XL |
98 | // It is not necessary to set the `DESTROY` bit in the last slot because that slot has |
99 | // begun destruction of the block. | |
1b1a35ee XL |
100 | for i in start..BLOCK_CAP - 1 { |
101 | let slot = (*this).slots.get_unchecked(i); | |
102 | ||
103 | // Mark the `DESTROY` bit if a thread is still using the slot. | |
104 | if slot.state.load(Ordering::Acquire) & READ == 0 | |
105 | && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 | |
106 | { | |
107 | // If a thread is still using the slot, it will continue destruction of the block. | |
108 | return; | |
109 | } | |
110 | } | |
111 | ||
112 | // No thread is using the block, now it is safe to destroy it. | |
113 | drop(Box::from_raw(this)); | |
114 | } | |
115 | } | |
116 | ||
/// A position in a channel: one end (head or tail) of the linked list.
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel. The lowest `SHIFT` bits carry metadata (`MARK_BIT`); the
    /// remaining bits are the sequence number.
    index: AtomicUsize,

    /// The block in the linked list that this position currently refers to; null until the
    /// first block has been installed.
    block: AtomicPtr<Block<T>>,
}
126 | ||
/// Token used by the list flavor to carry a reserved slot between the "start" and the
/// "read"/"write" halves of an operation.
#[derive(Debug)]
pub struct ListToken {
    /// Type-erased pointer to the block holding the slot; null means "no slot".
    block: *const u8,

    /// Index of the slot inside that block.
    offset: usize,
}

impl Default for ListToken {
    /// Produces a token that refers to no slot (null block, offset zero).
    #[inline]
    fn default() -> Self {
        Self {
            offset: 0,
            block: ptr::null(),
        }
    }
}
146 | ||
/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
pub struct Channel<T> {
    /// The head of the channel, i.e. the position receivers read from.
    head: CachePadded<Position<T>>,

    /// The tail of the channel, i.e. the position senders write to. Its index also carries the
    /// "disconnected" flag (`MARK_BIT`).
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}
167 | ||
168 | impl<T> Channel<T> { | |
169 | /// Creates a new unbounded channel. | |
170 | pub fn new() -> Self { | |
171 | Channel { | |
172 | head: CachePadded::new(Position { | |
173 | block: AtomicPtr::new(ptr::null_mut()), | |
174 | index: AtomicUsize::new(0), | |
175 | }), | |
176 | tail: CachePadded::new(Position { | |
177 | block: AtomicPtr::new(ptr::null_mut()), | |
178 | index: AtomicUsize::new(0), | |
179 | }), | |
180 | receivers: SyncWaker::new(), | |
181 | _marker: PhantomData, | |
182 | } | |
183 | } | |
184 | ||
/// Returns a receiver handle to the channel.
///
/// The handle is a thin wrapper around a shared borrow of this channel.
pub fn receiver(&self) -> Receiver<'_, T> {
    Receiver(self)
}
189 | ||
/// Returns a sender handle to the channel.
///
/// The handle is a thin wrapper around a shared borrow of this channel.
pub fn sender(&self) -> Sender<'_, T> {
    Sender(self)
}
194 | ||
195 | /// Attempts to reserve a slot for sending a message. | |
196 | fn start_send(&self, token: &mut Token) -> bool { | |
197 | let backoff = Backoff::new(); | |
198 | let mut tail = self.tail.index.load(Ordering::Acquire); | |
199 | let mut block = self.tail.block.load(Ordering::Acquire); | |
200 | let mut next_block = None; | |
201 | ||
202 | loop { | |
203 | // Check if the channel is disconnected. | |
204 | if tail & MARK_BIT != 0 { | |
205 | token.list.block = ptr::null(); | |
206 | return true; | |
207 | } | |
208 | ||
209 | // Calculate the offset of the index into the block. | |
210 | let offset = (tail >> SHIFT) % LAP; | |
211 | ||
212 | // If we reached the end of the block, wait until the next one is installed. | |
213 | if offset == BLOCK_CAP { | |
214 | backoff.snooze(); | |
215 | tail = self.tail.index.load(Ordering::Acquire); | |
216 | block = self.tail.block.load(Ordering::Acquire); | |
217 | continue; | |
218 | } | |
219 | ||
220 | // If we're going to have to install the next block, allocate it in advance in order to | |
221 | // make the wait for other threads as short as possible. | |
222 | if offset + 1 == BLOCK_CAP && next_block.is_none() { | |
223 | next_block = Some(Box::new(Block::<T>::new())); | |
224 | } | |
225 | ||
226 | // If this is the first message to be sent into the channel, we need to allocate the | |
227 | // first block and install it. | |
228 | if block.is_null() { | |
229 | let new = Box::into_raw(Box::new(Block::<T>::new())); | |
230 | ||
231 | if self | |
232 | .tail | |
233 | .block | |
234 | .compare_and_swap(block, new, Ordering::Release) | |
235 | == block | |
236 | { | |
237 | self.head.block.store(new, Ordering::Release); | |
238 | block = new; | |
239 | } else { | |
240 | next_block = unsafe { Some(Box::from_raw(new)) }; | |
241 | tail = self.tail.index.load(Ordering::Acquire); | |
242 | block = self.tail.block.load(Ordering::Acquire); | |
243 | continue; | |
244 | } | |
245 | } | |
246 | ||
247 | let new_tail = tail + (1 << SHIFT); | |
248 | ||
249 | // Try advancing the tail forward. | |
250 | match self.tail.index.compare_exchange_weak( | |
251 | tail, | |
252 | new_tail, | |
253 | Ordering::SeqCst, | |
254 | Ordering::Acquire, | |
255 | ) { | |
256 | Ok(_) => unsafe { | |
257 | // If we've reached the end of the block, install the next one. | |
258 | if offset + 1 == BLOCK_CAP { | |
259 | let next_block = Box::into_raw(next_block.unwrap()); | |
260 | self.tail.block.store(next_block, Ordering::Release); | |
261 | self.tail.index.fetch_add(1 << SHIFT, Ordering::Release); | |
262 | (*block).next.store(next_block, Ordering::Release); | |
263 | } | |
264 | ||
265 | token.list.block = block as *const u8; | |
266 | token.list.offset = offset; | |
267 | return true; | |
268 | }, | |
269 | Err(t) => { | |
270 | tail = t; | |
271 | block = self.tail.block.load(Ordering::Acquire); | |
272 | backoff.spin(); | |
273 | } | |
274 | } | |
275 | } | |
276 | } | |
277 | ||
278 | /// Writes a message into the channel. | |
279 | pub unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { | |
280 | // If there is no slot, the channel is disconnected. | |
281 | if token.list.block.is_null() { | |
282 | return Err(msg); | |
283 | } | |
284 | ||
285 | // Write the message into the slot. | |
286 | let block = token.list.block as *mut Block<T>; | |
287 | let offset = token.list.offset; | |
288 | let slot = (*block).slots.get_unchecked(offset); | |
289 | slot.msg.get().write(MaybeUninit::new(msg)); | |
290 | slot.state.fetch_or(WRITE, Ordering::Release); | |
291 | ||
292 | // Wake a sleeping receiver. | |
293 | self.receivers.notify(); | |
294 | Ok(()) | |
295 | } | |
296 | ||
/// Attempts to reserve a slot for receiving a message.
///
/// Returns `false` if the channel is empty and not disconnected (the operation is not
/// ready). Returns `true` otherwise, with `token.list` describing the outcome:
/// * a non-null `block` together with `offset` identifies the slot to read from, or
/// * a null `block` means the channel is empty and disconnected.
fn start_recv(&self, token: &mut Token) -> bool {
    let backoff = Backoff::new();
    let mut head = self.head.index.load(Ordering::Acquire);
    let mut block = self.head.block.load(Ordering::Acquire);

    loop {
        // Calculate the offset of the index into the block.
        let offset = (head >> SHIFT) % LAP;

        // If we reached the end of the block, wait until the next one is installed.
        if offset == BLOCK_CAP {
            backoff.snooze();
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);
            continue;
        }

        let mut new_head = head + (1 << SHIFT);

        // Adding `1 << SHIFT` leaves the mark bit untouched, so this tests the mark bit of
        // `head`. When it is already set the head block is known not to be the last one, and
        // the comparison against the tail below is skipped.
        if new_head & MARK_BIT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the channel is empty.
            if head >> SHIFT == tail >> SHIFT {
                // If the channel is disconnected...
                if tail & MARK_BIT != 0 {
                    // ...then receive an error.
                    token.list.block = ptr::null();
                    return true;
                } else {
                    // Otherwise, the receive operation is not ready.
                    return false;
                }
            }

            // If head and tail are not in the same block, set `MARK_BIT` in head.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= MARK_BIT;
            }
        }

        // The block can be null here only if the first message is being sent into the channel.
        // In that case, just wait until it gets initialized.
        if block.is_null() {
            backoff.snooze();
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);
            continue;
        }

        // Try moving the head index forward.
        match self.head.index.compare_exchange_weak(
            head,
            new_head,
            Ordering::SeqCst,
            Ordering::Acquire,
        ) {
            Ok(_) => unsafe {
                // If we've reached the end of the block, move to the next one.
                if offset + 1 == BLOCK_CAP {
                    let next = (*block).wait_next();
                    // Clear the mark bit and step over the index at the end of the lap.
                    let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                    // If the next block already has a successor, it is not the last block.
                    if !(*next).next.load(Ordering::Relaxed).is_null() {
                        next_index |= MARK_BIT;
                    }

                    self.head.block.store(next, Ordering::Release);
                    self.head.index.store(next_index, Ordering::Release);
                }

                token.list.block = block as *const u8;
                token.list.offset = offset;
                return true;
            },
            Err(h) => {
                // The exchange failed; retry with the head value it reported.
                head = h;
                block = self.head.block.load(Ordering::Acquire);
                backoff.spin();
            }
        }
    }
}
381 | ||
382 | /// Reads a message from the channel. | |
383 | pub unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { | |
384 | if token.list.block.is_null() { | |
385 | // The channel is disconnected. | |
386 | return Err(()); | |
387 | } | |
388 | ||
389 | // Read the message. | |
390 | let block = token.list.block as *mut Block<T>; | |
391 | let offset = token.list.offset; | |
392 | let slot = (*block).slots.get_unchecked(offset); | |
393 | slot.wait_write(); | |
394 | let msg = slot.msg.get().read().assume_init(); | |
395 | ||
396 | // Destroy the block if we've reached the end, or if another thread wanted to destroy but | |
397 | // couldn't because we were busy reading from the slot. | |
398 | if offset + 1 == BLOCK_CAP { | |
399 | Block::destroy(block, 0); | |
400 | } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { | |
401 | Block::destroy(block, offset + 1); | |
402 | } | |
403 | ||
404 | Ok(msg) | |
405 | } | |
406 | ||
407 | /// Attempts to send a message into the channel. | |
408 | pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { | |
409 | self.send(msg, None).map_err(|err| match err { | |
410 | SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg), | |
411 | SendTimeoutError::Timeout(_) => unreachable!(), | |
412 | }) | |
413 | } | |
414 | ||
415 | /// Sends a message into the channel. | |
416 | pub fn send(&self, msg: T, _deadline: Option<Instant>) -> Result<(), SendTimeoutError<T>> { | |
417 | let token = &mut Token::default(); | |
418 | assert!(self.start_send(token)); | |
419 | unsafe { | |
420 | self.write(token, msg) | |
421 | .map_err(SendTimeoutError::Disconnected) | |
422 | } | |
423 | } | |
424 | ||
425 | /// Attempts to receive a message without blocking. | |
426 | pub fn try_recv(&self) -> Result<T, TryRecvError> { | |
427 | let token = &mut Token::default(); | |
428 | ||
429 | if self.start_recv(token) { | |
430 | unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } | |
431 | } else { | |
432 | Err(TryRecvError::Empty) | |
433 | } | |
434 | } | |
435 | ||
436 | /// Receives a message from the channel. | |
437 | pub fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { | |
438 | let token = &mut Token::default(); | |
439 | loop { | |
440 | // Try receiving a message several times. | |
441 | let backoff = Backoff::new(); | |
442 | loop { | |
443 | if self.start_recv(token) { | |
444 | unsafe { | |
445 | return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); | |
446 | } | |
447 | } | |
448 | ||
449 | if backoff.is_completed() { | |
450 | break; | |
451 | } else { | |
452 | backoff.snooze(); | |
453 | } | |
454 | } | |
455 | ||
456 | if let Some(d) = deadline { | |
457 | if Instant::now() >= d { | |
458 | return Err(RecvTimeoutError::Timeout); | |
459 | } | |
460 | } | |
461 | ||
462 | // Prepare for blocking until a sender wakes us up. | |
463 | Context::with(|cx| { | |
464 | let oper = Operation::hook(token); | |
465 | self.receivers.register(oper, cx); | |
466 | ||
467 | // Has the channel become ready just now? | |
468 | if !self.is_empty() || self.is_disconnected() { | |
469 | let _ = cx.try_select(Selected::Aborted); | |
470 | } | |
471 | ||
472 | // Block the current thread. | |
473 | let sel = cx.wait_until(deadline); | |
474 | ||
475 | match sel { | |
476 | Selected::Waiting => unreachable!(), | |
477 | Selected::Aborted | Selected::Disconnected => { | |
478 | self.receivers.unregister(oper).unwrap(); | |
479 | // If the channel was disconnected, we still have to check for remaining | |
480 | // messages. | |
481 | } | |
482 | Selected::Operation(_) => {} | |
483 | } | |
484 | }); | |
485 | } | |
486 | } | |
487 | ||
488 | /// Returns the current number of messages inside the channel. | |
489 | pub fn len(&self) -> usize { | |
490 | loop { | |
491 | // Load the tail index, then load the head index. | |
492 | let mut tail = self.tail.index.load(Ordering::SeqCst); | |
493 | let mut head = self.head.index.load(Ordering::SeqCst); | |
494 | ||
495 | // If the tail index didn't change, we've got consistent indices to work with. | |
496 | if self.tail.index.load(Ordering::SeqCst) == tail { | |
497 | // Erase the lower bits. | |
498 | tail &= !((1 << SHIFT) - 1); | |
499 | head &= !((1 << SHIFT) - 1); | |
500 | ||
5869c6ff XL |
501 | // Fix up indices if they fall onto block ends. |
502 | if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { | |
503 | tail = tail.wrapping_add(1 << SHIFT); | |
504 | } | |
505 | if (head >> SHIFT) & (LAP - 1) == LAP - 1 { | |
506 | head = head.wrapping_add(1 << SHIFT); | |
507 | } | |
508 | ||
1b1a35ee XL |
509 | // Rotate indices so that head falls into the first block. |
510 | let lap = (head >> SHIFT) / LAP; | |
511 | tail = tail.wrapping_sub((lap * LAP) << SHIFT); | |
512 | head = head.wrapping_sub((lap * LAP) << SHIFT); | |
513 | ||
514 | // Remove the lower bits. | |
515 | tail >>= SHIFT; | |
516 | head >>= SHIFT; | |
517 | ||
1b1a35ee XL |
518 | // Return the difference minus the number of blocks between tail and head. |
519 | return tail - head - tail / LAP; | |
520 | } | |
521 | } | |
522 | } | |
523 | ||
/// Returns the capacity of the channel.
///
/// Always `None`: this channel flavor is unbounded.
pub fn capacity(&self) -> Option<usize> {
    None
}
528 | ||
529 | /// Disconnects the channel and wakes up all blocked receivers. | |
530 | /// | |
531 | /// Returns `true` if this call disconnected the channel. | |
532 | pub fn disconnect(&self) -> bool { | |
533 | let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); | |
534 | ||
535 | if tail & MARK_BIT == 0 { | |
536 | self.receivers.disconnect(); | |
537 | true | |
538 | } else { | |
539 | false | |
540 | } | |
541 | } | |
542 | ||
543 | /// Returns `true` if the channel is disconnected. | |
544 | pub fn is_disconnected(&self) -> bool { | |
545 | self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0 | |
546 | } | |
547 | ||
548 | /// Returns `true` if the channel is empty. | |
549 | pub fn is_empty(&self) -> bool { | |
550 | let head = self.head.index.load(Ordering::SeqCst); | |
551 | let tail = self.tail.index.load(Ordering::SeqCst); | |
552 | head >> SHIFT == tail >> SHIFT | |
553 | } | |
554 | ||
/// Returns `true` if the channel is full.
///
/// Always `false`: this channel flavor is unbounded.
pub fn is_full(&self) -> bool {
    false
}
559 | } | |
560 | ||
561 | impl<T> Drop for Channel<T> { | |
562 | fn drop(&mut self) { | |
563 | let mut head = self.head.index.load(Ordering::Relaxed); | |
564 | let mut tail = self.tail.index.load(Ordering::Relaxed); | |
565 | let mut block = self.head.block.load(Ordering::Relaxed); | |
566 | ||
567 | // Erase the lower bits. | |
568 | head &= !((1 << SHIFT) - 1); | |
569 | tail &= !((1 << SHIFT) - 1); | |
570 | ||
571 | unsafe { | |
572 | // Drop all messages between head and tail and deallocate the heap-allocated blocks. | |
573 | while head != tail { | |
574 | let offset = (head >> SHIFT) % LAP; | |
575 | ||
576 | if offset < BLOCK_CAP { | |
577 | // Drop the message in the slot. | |
578 | let slot = (*block).slots.get_unchecked(offset); | |
579 | let p = &mut *slot.msg.get(); | |
580 | p.as_mut_ptr().drop_in_place(); | |
581 | } else { | |
582 | // Deallocate the block and move to the next one. | |
583 | let next = (*block).next.load(Ordering::Relaxed); | |
584 | drop(Box::from_raw(block)); | |
585 | block = next; | |
586 | } | |
587 | ||
588 | head = head.wrapping_add(1 << SHIFT); | |
589 | } | |
590 | ||
591 | // Deallocate the last remaining block. | |
592 | if !block.is_null() { | |
593 | drop(Box::from_raw(block)); | |
594 | } | |
595 | } | |
596 | } | |
597 | } | |
598 | ||
/// Receiver handle to a channel.
///
/// Wraps a shared borrow of the channel.
pub struct Receiver<'a, T>(&'a Channel<T>);
1b1a35ee XL |
601 | |
/// Sender handle to a channel.
///
/// Wraps a shared borrow of the channel.
pub struct Sender<'a, T>(&'a Channel<T>);
1b1a35ee | 604 | |
5869c6ff | 605 | impl<T> SelectHandle for Receiver<'_, T> { |
1b1a35ee XL |
606 | fn try_select(&self, token: &mut Token) -> bool { |
607 | self.0.start_recv(token) | |
608 | } | |
609 | ||
610 | fn deadline(&self) -> Option<Instant> { | |
611 | None | |
612 | } | |
613 | ||
614 | fn register(&self, oper: Operation, cx: &Context) -> bool { | |
615 | self.0.receivers.register(oper, cx); | |
616 | self.is_ready() | |
617 | } | |
618 | ||
619 | fn unregister(&self, oper: Operation) { | |
620 | self.0.receivers.unregister(oper); | |
621 | } | |
622 | ||
623 | fn accept(&self, token: &mut Token, _cx: &Context) -> bool { | |
624 | self.try_select(token) | |
625 | } | |
626 | ||
627 | fn is_ready(&self) -> bool { | |
628 | !self.0.is_empty() || self.0.is_disconnected() | |
629 | } | |
630 | ||
631 | fn watch(&self, oper: Operation, cx: &Context) -> bool { | |
632 | self.0.receivers.watch(oper, cx); | |
633 | self.is_ready() | |
634 | } | |
635 | ||
636 | fn unwatch(&self, oper: Operation) { | |
637 | self.0.receivers.unwatch(oper); | |
638 | } | |
639 | } | |
640 | ||
5869c6ff | 641 | impl<T> SelectHandle for Sender<'_, T> { |
1b1a35ee XL |
642 | fn try_select(&self, token: &mut Token) -> bool { |
643 | self.0.start_send(token) | |
644 | } | |
645 | ||
646 | fn deadline(&self) -> Option<Instant> { | |
647 | None | |
648 | } | |
649 | ||
650 | fn register(&self, _oper: Operation, _cx: &Context) -> bool { | |
651 | self.is_ready() | |
652 | } | |
653 | ||
654 | fn unregister(&self, _oper: Operation) {} | |
655 | ||
656 | fn accept(&self, token: &mut Token, _cx: &Context) -> bool { | |
657 | self.try_select(token) | |
658 | } | |
659 | ||
660 | fn is_ready(&self) -> bool { | |
661 | true | |
662 | } | |
663 | ||
664 | fn watch(&self, _oper: Operation, _cx: &Context) -> bool { | |
665 | self.is_ready() | |
666 | } | |
667 | ||
668 | fn unwatch(&self, _oper: Operation) {} | |
669 | } |