]> git.proxmox.com Git - cargo.git/blob - vendor/crossbeam-channel/src/channel.rs
New upstream version 0.33.0
[cargo.git] / vendor / crossbeam-channel / src / channel.rs
1 //! The channel interface.
2
3 use std::fmt;
4 use std::iter::FusedIterator;
5 use std::mem;
6 use std::panic::{RefUnwindSafe, UnwindSafe};
7 use std::sync::Arc;
8 use std::time::{Duration, Instant};
9
10 use context::Context;
11 use counter;
12 use err::{RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError};
13 use flavors;
14 use select::{Operation, SelectHandle, Token};
15
16 /// Creates a channel of unbounded capacity.
17 ///
18 /// This channel has a growable buffer that can hold any number of messages at a time.
19 ///
20 /// # Examples
21 ///
22 /// ```
23 /// use std::thread;
24 /// use crossbeam_channel::unbounded;
25 ///
26 /// let (s, r) = unbounded();
27 ///
28 /// // Computes the n-th Fibonacci number.
29 /// fn fib(n: i32) -> i32 {
30 /// if n <= 1 {
31 /// n
32 /// } else {
33 /// fib(n - 1) + fib(n - 2)
34 /// }
35 /// }
36 ///
37 /// // Spawn an asynchronous computation.
38 /// thread::spawn(move || s.send(fib(20)).unwrap());
39 ///
40 /// // Print the result of the computation.
41 /// println!("{}", r.recv().unwrap());
42 /// ```
43 pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
44 let (s, r) = counter::new(flavors::list::Channel::new());
45 let s = Sender {
46 flavor: SenderFlavor::List(s),
47 };
48 let r = Receiver {
49 flavor: ReceiverFlavor::List(r),
50 };
51 (s, r)
52 }
53
54 /// Creates a channel of bounded capacity.
55 ///
56 /// This channel has a buffer that can hold at most `cap` messages at a time.
57 ///
58 /// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
59 /// receive operations must appear at the same time in order to pair up and pass the message over.
60 ///
61 /// # Panics
62 ///
63 /// Panics if the capacity is greater than `usize::max_value() / 4`.
64 ///
65 /// # Examples
66 ///
67 /// A channel of capacity 1:
68 ///
69 /// ```
70 /// use std::thread;
71 /// use std::time::Duration;
72 /// use crossbeam_channel::bounded;
73 ///
74 /// let (s, r) = bounded(1);
75 ///
76 /// // This call returns immediately because there is enough space in the channel.
77 /// s.send(1).unwrap();
78 ///
79 /// thread::spawn(move || {
80 /// // This call blocks the current thread because the channel is full.
81 /// // It will be able to complete only after the first message is received.
82 /// s.send(2).unwrap();
83 /// });
84 ///
85 /// thread::sleep(Duration::from_secs(1));
86 /// assert_eq!(r.recv(), Ok(1));
87 /// assert_eq!(r.recv(), Ok(2));
88 /// ```
89 ///
90 /// A zero-capacity channel:
91 ///
92 /// ```
93 /// use std::thread;
94 /// use std::time::Duration;
95 /// use crossbeam_channel::bounded;
96 ///
97 /// let (s, r) = bounded(0);
98 ///
99 /// thread::spawn(move || {
100 /// // This call blocks the current thread until a receive operation appears
101 /// // on the other side of the channel.
102 /// s.send(1).unwrap();
103 /// });
104 ///
105 /// thread::sleep(Duration::from_secs(1));
106 /// assert_eq!(r.recv(), Ok(1));
107 /// ```
108 pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
109 if cap == 0 {
110 let (s, r) = counter::new(flavors::zero::Channel::new());
111 let s = Sender {
112 flavor: SenderFlavor::Zero(s),
113 };
114 let r = Receiver {
115 flavor: ReceiverFlavor::Zero(r),
116 };
117 (s, r)
118 } else {
119 let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap));
120 let s = Sender {
121 flavor: SenderFlavor::Array(s),
122 };
123 let r = Receiver {
124 flavor: ReceiverFlavor::Array(r),
125 };
126 (s, r)
127 }
128 }
129
130 /// Creates a receiver that delivers a message after a certain duration of time.
131 ///
132 /// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
133 /// be sent into the channel after `duration` elapses. The message is the instant at which it is
134 /// sent.
135 ///
136 /// # Examples
137 ///
138 /// Using an `after` channel for timeouts:
139 ///
140 /// ```
141 /// # #[macro_use]
142 /// # extern crate crossbeam_channel;
143 /// # fn main() {
144 /// use std::time::Duration;
145 /// use crossbeam_channel::{after, unbounded};
146 ///
147 /// let (s, r) = unbounded::<i32>();
148 /// let timeout = Duration::from_millis(100);
149 ///
150 /// select! {
151 /// recv(r) -> msg => println!("received {:?}", msg),
152 /// recv(after(timeout)) -> _ => println!("timed out"),
153 /// }
154 /// # }
155 /// ```
156 ///
157 /// When the message gets sent:
158 ///
159 /// ```
160 /// use std::thread;
161 /// use std::time::{Duration, Instant};
162 /// use crossbeam_channel::after;
163 ///
164 /// // Converts a number of milliseconds into a `Duration`.
165 /// let ms = |ms| Duration::from_millis(ms);
166 ///
167 /// // Returns `true` if `a` and `b` are very close `Instant`s.
168 /// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
169 ///
170 /// let start = Instant::now();
171 /// let r = after(ms(100));
172 ///
173 /// thread::sleep(ms(500));
174 ///
175 /// // This message was sent 100 ms from the start and received 500 ms from the start.
176 /// assert!(eq(r.recv().unwrap(), start + ms(100)));
177 /// assert!(eq(Instant::now(), start + ms(500)));
178 /// ```
179 pub fn after(duration: Duration) -> Receiver<Instant> {
180 Receiver {
181 flavor: ReceiverFlavor::After(Arc::new(flavors::after::Channel::new(duration))),
182 }
183 }
184
185 /// Creates a receiver that never delivers messages.
186 ///
187 /// The channel is bounded with capacity of 0 and never gets disconnected.
188 ///
189 /// # Examples
190 ///
191 /// Using a `never` channel to optionally add a timeout to [`select!`]:
192 ///
193 /// ```
194 /// # #[macro_use]
195 /// # extern crate crossbeam_channel;
196 /// # fn main() {
197 /// use std::thread;
198 /// use std::time::{Duration, Instant};
199 /// use crossbeam_channel::{after, never, unbounded};
200 ///
201 /// let (s, r) = unbounded();
202 ///
203 /// thread::spawn(move || {
204 /// thread::sleep(Duration::from_secs(1));
205 /// s.send(1).unwrap();
206 /// });
207 ///
208 /// // Suppose this duration can be a `Some` or a `None`.
209 /// let duration = Some(Duration::from_millis(100));
210 ///
211 /// // Create a channel that times out after the specified duration.
212 /// let timeout = duration
213 /// .map(|d| after(d))
214 /// .unwrap_or(never());
215 ///
216 /// select! {
217 /// recv(r) -> msg => assert_eq!(msg, Ok(1)),
218 /// recv(timeout) -> _ => println!("timed out"),
219 /// }
220 /// # }
221 /// ```
222 ///
223 /// [`select!`]: macro.select.html
224 pub fn never<T>() -> Receiver<T> {
225 Receiver {
226 flavor: ReceiverFlavor::Never(flavors::never::Channel::new()),
227 }
228 }
229
230 /// Creates a receiver that delivers messages periodically.
231 ///
232 /// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
233 /// sent into the channel in intervals of `duration`. Each message is the instant at which it is
234 /// sent.
235 ///
236 /// # Examples
237 ///
238 /// Using a `tick` channel to periodically print elapsed time:
239 ///
240 /// ```
241 /// use std::time::{Duration, Instant};
242 /// use crossbeam_channel::tick;
243 ///
244 /// let start = Instant::now();
245 /// let ticker = tick(Duration::from_millis(100));
246 ///
247 /// for _ in 0..5 {
248 /// ticker.recv().unwrap();
249 /// println!("elapsed: {:?}", start.elapsed());
250 /// }
251 /// ```
252 ///
253 /// When messages get sent:
254 ///
255 /// ```
256 /// use std::thread;
257 /// use std::time::{Duration, Instant};
258 /// use crossbeam_channel::tick;
259 ///
260 /// // Converts a number of milliseconds into a `Duration`.
261 /// let ms = |ms| Duration::from_millis(ms);
262 ///
263 /// // Returns `true` if `a` and `b` are very close `Instant`s.
264 /// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
265 ///
266 /// let start = Instant::now();
267 /// let r = tick(ms(100));
268 ///
269 /// // This message was sent 100 ms from the start and received 100 ms from the start.
270 /// assert!(eq(r.recv().unwrap(), start + ms(100)));
271 /// assert!(eq(Instant::now(), start + ms(100)));
272 ///
273 /// thread::sleep(ms(500));
274 ///
275 /// // This message was sent 200 ms from the start and received 600 ms from the start.
276 /// assert!(eq(r.recv().unwrap(), start + ms(200)));
277 /// assert!(eq(Instant::now(), start + ms(600)));
278 ///
279 /// // This message was sent 700 ms from the start and received 700 ms from the start.
280 /// assert!(eq(r.recv().unwrap(), start + ms(700)));
281 /// assert!(eq(Instant::now(), start + ms(700)));
282 /// ```
283 pub fn tick(duration: Duration) -> Receiver<Instant> {
284 Receiver {
285 flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))),
286 }
287 }
288
/// The sending side of a channel.
///
/// Senders can be cloned, and a `Sender<T>` is both `Send` and `Sync` whenever `T: Send`, so
/// clones may be moved to or shared among threads.
///
/// # Examples
///
/// ```
/// use std::thread;
/// use crossbeam_channel::unbounded;
///
/// let (s1, r) = unbounded();
/// let s2 = s1.clone();
///
/// thread::spawn(move || s1.send(1).unwrap());
/// thread::spawn(move || s2.send(2).unwrap());
///
/// let msg1 = r.recv().unwrap();
/// let msg2 = r.recv().unwrap();
///
/// assert_eq!(msg1 + msg2, 3);
/// ```
pub struct Sender<T> {
    // The concrete channel implementation this sender dispatches to.
    flavor: SenderFlavor<T>,
}
311
/// Sender flavors.
///
/// Each variant wraps the counted sending handle of one channel implementation. Only the
/// flavors that have a sending side appear here.
enum SenderFlavor<T> {
    /// Bounded channel based on a preallocated array (created by `bounded` with `cap > 0`).
    Array(counter::Sender<flavors::array::Channel<T>>),

    /// Unbounded channel implemented as a linked list (created by `unbounded`).
    List(counter::Sender<flavors::list::Channel<T>>),

    /// Zero-capacity channel (created by `bounded(0)`).
    Zero(counter::Sender<flavors::zero::Channel<T>>),
}
323
324 unsafe impl<T: Send> Send for Sender<T> {}
325 unsafe impl<T: Send> Sync for Sender<T> {}
326
327 impl<T> UnwindSafe for Sender<T> {}
328 impl<T> RefUnwindSafe for Sender<T> {}
329
330 impl<T> Sender<T> {
331 /// Attempts to send a message into the channel without blocking.
332 ///
333 /// This method will either send a message into the channel immediately or return an error if
334 /// the channel is full or disconnected. The returned error contains the original message.
335 ///
336 /// If called on a zero-capacity channel, this method will send the message only if there
337 /// happens to be a receive operation on the other side of the channel at the same time.
338 ///
339 /// # Examples
340 ///
341 /// ```
342 /// use crossbeam_channel::{bounded, TrySendError};
343 ///
344 /// let (s, r) = bounded(1);
345 ///
346 /// assert_eq!(s.try_send(1), Ok(()));
347 /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
348 ///
349 /// drop(r);
350 /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3)));
351 /// ```
352 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
353 match &self.flavor {
354 SenderFlavor::Array(chan) => chan.try_send(msg),
355 SenderFlavor::List(chan) => chan.try_send(msg),
356 SenderFlavor::Zero(chan) => chan.try_send(msg),
357 }
358 }
359
360 /// Blocks the current thread until a message is sent or the channel is disconnected.
361 ///
362 /// If the channel is full and not disconnected, this call will block until the send operation
363 /// can proceed. If the channel becomes disconnected, this call will wake up and return an
364 /// error. The returned error contains the original message.
365 ///
366 /// If called on a zero-capacity channel, this method will wait for a receive operation to
367 /// appear on the other side of the channel.
368 ///
369 /// # Examples
370 ///
371 /// ```
372 /// use std::thread;
373 /// use std::time::Duration;
374 /// use crossbeam_channel::{bounded, SendError};
375 ///
376 /// let (s, r) = bounded(1);
377 /// assert_eq!(s.send(1), Ok(()));
378 ///
379 /// thread::spawn(move || {
380 /// assert_eq!(r.recv(), Ok(1));
381 /// thread::sleep(Duration::from_secs(1));
382 /// drop(r);
383 /// });
384 ///
385 /// assert_eq!(s.send(2), Ok(()));
386 /// assert_eq!(s.send(3), Err(SendError(3)));
387 /// ```
388 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
389 match &self.flavor {
390 SenderFlavor::Array(chan) => chan.send(msg, None),
391 SenderFlavor::List(chan) => chan.send(msg, None),
392 SenderFlavor::Zero(chan) => chan.send(msg, None),
393 }.map_err(|err| match err {
394 SendTimeoutError::Disconnected(msg) => SendError(msg),
395 SendTimeoutError::Timeout(_) => unreachable!(),
396 })
397 }
398
399 /// Waits for a message to be sent into the channel, but only for a limited time.
400 ///
401 /// If the channel is full and not disconnected, this call will block until the send operation
402 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
403 /// wake up and return an error. The returned error contains the original message.
404 ///
405 /// If called on a zero-capacity channel, this method will wait for a receive operation to
406 /// appear on the other side of the channel.
407 ///
408 /// # Examples
409 ///
410 /// ```
411 /// use std::thread;
412 /// use std::time::Duration;
413 /// use crossbeam_channel::{bounded, SendTimeoutError};
414 ///
415 /// let (s, r) = bounded(0);
416 ///
417 /// thread::spawn(move || {
418 /// thread::sleep(Duration::from_secs(1));
419 /// assert_eq!(r.recv(), Ok(2));
420 /// drop(r);
421 /// });
422 ///
423 /// assert_eq!(
424 /// s.send_timeout(1, Duration::from_millis(500)),
425 /// Err(SendTimeoutError::Timeout(1)),
426 /// );
427 /// assert_eq!(
428 /// s.send_timeout(2, Duration::from_secs(1)),
429 /// Ok(()),
430 /// );
431 /// assert_eq!(
432 /// s.send_timeout(3, Duration::from_millis(500)),
433 /// Err(SendTimeoutError::Disconnected(3)),
434 /// );
435 /// ```
436 pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
437 let deadline = Instant::now() + timeout;
438
439 match &self.flavor {
440 SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
441 SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
442 SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
443 }
444 }
445
446 /// Returns `true` if the channel is empty.
447 ///
448 /// Note: Zero-capacity channels are always empty.
449 ///
450 /// # Examples
451 ///
452 /// ```
453 /// use crossbeam_channel::unbounded;
454 ///
455 /// let (s, r) = unbounded();
456 /// assert!(s.is_empty());
457 ///
458 /// s.send(0).unwrap();
459 /// assert!(!s.is_empty());
460 /// ```
461 pub fn is_empty(&self) -> bool {
462 match &self.flavor {
463 SenderFlavor::Array(chan) => chan.is_empty(),
464 SenderFlavor::List(chan) => chan.is_empty(),
465 SenderFlavor::Zero(chan) => chan.is_empty(),
466 }
467 }
468
469 /// Returns `true` if the channel is full.
470 ///
471 /// Note: Zero-capacity channels are always full.
472 ///
473 /// # Examples
474 ///
475 /// ```
476 /// use crossbeam_channel::bounded;
477 ///
478 /// let (s, r) = bounded(1);
479 ///
480 /// assert!(!s.is_full());
481 /// s.send(0).unwrap();
482 /// assert!(s.is_full());
483 /// ```
484 pub fn is_full(&self) -> bool {
485 match &self.flavor {
486 SenderFlavor::Array(chan) => chan.is_full(),
487 SenderFlavor::List(chan) => chan.is_full(),
488 SenderFlavor::Zero(chan) => chan.is_full(),
489 }
490 }
491
492 /// Returns the number of messages in the channel.
493 ///
494 /// # Examples
495 ///
496 /// ```
497 /// use crossbeam_channel::unbounded;
498 ///
499 /// let (s, r) = unbounded();
500 /// assert_eq!(s.len(), 0);
501 ///
502 /// s.send(1).unwrap();
503 /// s.send(2).unwrap();
504 /// assert_eq!(s.len(), 2);
505 /// ```
506 pub fn len(&self) -> usize {
507 match &self.flavor {
508 SenderFlavor::Array(chan) => chan.len(),
509 SenderFlavor::List(chan) => chan.len(),
510 SenderFlavor::Zero(chan) => chan.len(),
511 }
512 }
513
514 /// If the channel is bounded, returns its capacity.
515 ///
516 /// # Examples
517 ///
518 /// ```
519 /// use crossbeam_channel::{bounded, unbounded};
520 ///
521 /// let (s, _) = unbounded::<i32>();
522 /// assert_eq!(s.capacity(), None);
523 ///
524 /// let (s, _) = bounded::<i32>(5);
525 /// assert_eq!(s.capacity(), Some(5));
526 ///
527 /// let (s, _) = bounded::<i32>(0);
528 /// assert_eq!(s.capacity(), Some(0));
529 /// ```
530 pub fn capacity(&self) -> Option<usize> {
531 match &self.flavor {
532 SenderFlavor::Array(chan) => chan.capacity(),
533 SenderFlavor::List(chan) => chan.capacity(),
534 SenderFlavor::Zero(chan) => chan.capacity(),
535 }
536 }
537 }
538
539 impl<T> Drop for Sender<T> {
540 fn drop(&mut self) {
541 unsafe {
542 match &self.flavor {
543 SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
544 SenderFlavor::List(chan) => chan.release(|c| c.disconnect()),
545 SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
546 }
547 }
548 }
549 }
550
551 impl<T> Clone for Sender<T> {
552 fn clone(&self) -> Self {
553 let flavor = match &self.flavor {
554 SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
555 SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
556 SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
557 };
558
559 Sender { flavor }
560 }
561 }
562
563 impl<T> fmt::Debug for Sender<T> {
564 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
565 f.pad("Sender { .. }")
566 }
567 }
568
/// The receiving side of a channel.
///
/// Receivers can be cloned, and a `Receiver<T>` is both `Send` and `Sync` whenever `T: Send`,
/// so clones may be moved to or shared among threads.
///
/// # Examples
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_channel::unbounded;
///
/// let (s, r) = unbounded();
///
/// thread::spawn(move || {
///     s.send(1).unwrap();
///     thread::sleep(Duration::from_secs(1));
///     s.send(2).unwrap();
/// });
///
/// assert_eq!(r.recv(), Ok(1)); // Received immediately.
/// assert_eq!(r.recv(), Ok(2)); // Received after 1 second.
/// ```
pub struct Receiver<T> {
    // The concrete channel implementation this receiver dispatches to.
    flavor: ReceiverFlavor<T>,
}
592
/// Receiver flavors.
///
/// The first three variants mirror `SenderFlavor`. The remaining three have no sending side
/// and are created by `after`, `tick` and `never`.
enum ReceiverFlavor<T> {
    /// Bounded channel based on a preallocated array.
    Array(counter::Receiver<flavors::array::Channel<T>>),

    /// Unbounded channel implemented as a linked list.
    List(counter::Receiver<flavors::list::Channel<T>>),

    /// Zero-capacity channel.
    Zero(counter::Receiver<flavors::zero::Channel<T>>),

    /// The after flavor (created by `after`, which only ever returns `Receiver<Instant>`).
    After(Arc<flavors::after::Channel>),

    /// The tick flavor (created by `tick`, which only ever returns `Receiver<Instant>`).
    Tick(Arc<flavors::tick::Channel>),

    /// The never flavor (created by `never`).
    Never(flavors::never::Channel<T>),
}
613
614 unsafe impl<T: Send> Send for Receiver<T> {}
615 unsafe impl<T: Send> Sync for Receiver<T> {}
616
617 impl<T> UnwindSafe for Receiver<T> {}
618 impl<T> RefUnwindSafe for Receiver<T> {}
619
620 impl<T> Receiver<T> {
621 /// Attempts to receive a message from the channel without blocking.
622 ///
623 /// This method will either receive a message from the channel immediately or return an error
624 /// if the channel is empty.
625 ///
626 /// If called on a zero-capacity channel, this method will receive a message only if there
627 /// happens to be a send operation on the other side of the channel at the same time.
628 ///
629 /// # Examples
630 ///
631 /// ```
632 /// use crossbeam_channel::{unbounded, TryRecvError};
633 ///
634 /// let (s, r) = unbounded();
635 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
636 ///
637 /// s.send(5).unwrap();
638 /// drop(s);
639 ///
640 /// assert_eq!(r.try_recv(), Ok(5));
641 /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
642 /// ```
643 pub fn try_recv(&self) -> Result<T, TryRecvError> {
644 match &self.flavor {
645 ReceiverFlavor::Array(chan) => chan.try_recv(),
646 ReceiverFlavor::List(chan) => chan.try_recv(),
647 ReceiverFlavor::Zero(chan) => chan.try_recv(),
648 ReceiverFlavor::After(chan) => {
649 let msg = chan.try_recv();
650 unsafe {
651 mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
652 &msg,
653 )
654 }
655 }
656 ReceiverFlavor::Tick(chan) => {
657 let msg = chan.try_recv();
658 unsafe {
659 mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
660 &msg,
661 )
662 }
663 }
664 ReceiverFlavor::Never(chan) => chan.try_recv(),
665 }
666 }
667
668 /// Blocks the current thread until a message is received or the channel is empty and
669 /// disconnected.
670 ///
671 /// If the channel is empty and not disconnected, this call will block until the receive
672 /// operation can proceed. If the channel is empty and becomes disconnected, this call will
673 /// wake up and return an error.
674 ///
675 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
676 /// on the other side of the channel.
677 ///
678 /// # Examples
679 ///
680 /// ```
681 /// use std::thread;
682 /// use std::time::Duration;
683 /// use crossbeam_channel::{unbounded, RecvError};
684 ///
685 /// let (s, r) = unbounded();
686 ///
687 /// thread::spawn(move || {
688 /// thread::sleep(Duration::from_secs(1));
689 /// s.send(5).unwrap();
690 /// drop(s);
691 /// });
692 ///
693 /// assert_eq!(r.recv(), Ok(5));
694 /// assert_eq!(r.recv(), Err(RecvError));
695 /// ```
696 pub fn recv(&self) -> Result<T, RecvError> {
697 match &self.flavor {
698 ReceiverFlavor::Array(chan) => chan.recv(None),
699 ReceiverFlavor::List(chan) => chan.recv(None),
700 ReceiverFlavor::Zero(chan) => chan.recv(None),
701 ReceiverFlavor::After(chan) => {
702 let msg = chan.recv(None);
703 unsafe {
704 mem::transmute_copy::<
705 Result<Instant, RecvTimeoutError>,
706 Result<T, RecvTimeoutError>,
707 >(&msg)
708 }
709 }
710 ReceiverFlavor::Tick(chan) => {
711 let msg = chan.recv(None);
712 unsafe {
713 mem::transmute_copy::<
714 Result<Instant, RecvTimeoutError>,
715 Result<T, RecvTimeoutError>,
716 >(&msg)
717 }
718 }
719 ReceiverFlavor::Never(chan) => chan.recv(None),
720 }.map_err(|_| RecvError)
721 }
722
723 /// Waits for a message to be received from the channel, but only for a limited time.
724 ///
725 /// If the channel is empty and not disconnected, this call will block until the receive
726 /// operation can proceed or the operation times out. If the channel is empty and becomes
727 /// disconnected, this call will wake up and return an error.
728 ///
729 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
730 /// on the other side of the channel.
731 ///
732 /// # Examples
733 ///
734 /// ```
735 /// use std::thread;
736 /// use std::time::Duration;
737 /// use crossbeam_channel::{unbounded, RecvTimeoutError};
738 ///
739 /// let (s, r) = unbounded();
740 ///
741 /// thread::spawn(move || {
742 /// thread::sleep(Duration::from_secs(1));
743 /// s.send(5).unwrap();
744 /// drop(s);
745 /// });
746 ///
747 /// assert_eq!(
748 /// r.recv_timeout(Duration::from_millis(500)),
749 /// Err(RecvTimeoutError::Timeout),
750 /// );
751 /// assert_eq!(
752 /// r.recv_timeout(Duration::from_secs(1)),
753 /// Ok(5),
754 /// );
755 /// assert_eq!(
756 /// r.recv_timeout(Duration::from_secs(1)),
757 /// Err(RecvTimeoutError::Disconnected),
758 /// );
759 /// ```
760 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
761 let deadline = Instant::now() + timeout;
762
763 match &self.flavor {
764 ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
765 ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
766 ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
767 ReceiverFlavor::After(chan) => {
768 let msg = chan.recv(Some(deadline));
769 unsafe {
770 mem::transmute_copy::<
771 Result<Instant, RecvTimeoutError>,
772 Result<T, RecvTimeoutError>,
773 >(&msg)
774 }
775 }
776 ReceiverFlavor::Tick(chan) => {
777 let msg = chan.recv(Some(deadline));
778 unsafe {
779 mem::transmute_copy::<
780 Result<Instant, RecvTimeoutError>,
781 Result<T, RecvTimeoutError>,
782 >(&msg)
783 }
784 }
785 ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
786 }
787 }
788
789 /// Returns `true` if the channel is empty.
790 ///
791 /// Note: Zero-capacity channels are always empty.
792 ///
793 /// # Examples
794 ///
795 /// ```
796 /// use crossbeam_channel::unbounded;
797 ///
798 /// let (s, r) = unbounded();
799 ///
800 /// assert!(r.is_empty());
801 /// s.send(0).unwrap();
802 /// assert!(!r.is_empty());
803 /// ```
804 pub fn is_empty(&self) -> bool {
805 match &self.flavor {
806 ReceiverFlavor::Array(chan) => chan.is_empty(),
807 ReceiverFlavor::List(chan) => chan.is_empty(),
808 ReceiverFlavor::Zero(chan) => chan.is_empty(),
809 ReceiverFlavor::After(chan) => chan.is_empty(),
810 ReceiverFlavor::Tick(chan) => chan.is_empty(),
811 ReceiverFlavor::Never(chan) => chan.is_empty(),
812 }
813 }
814
815 /// Returns `true` if the channel is full.
816 ///
817 /// Note: Zero-capacity channels are always full.
818 ///
819 /// # Examples
820 ///
821 /// ```
822 /// use crossbeam_channel::bounded;
823 ///
824 /// let (s, r) = bounded(1);
825 ///
826 /// assert!(!r.is_full());
827 /// s.send(0).unwrap();
828 /// assert!(r.is_full());
829 /// ```
830 pub fn is_full(&self) -> bool {
831 match &self.flavor {
832 ReceiverFlavor::Array(chan) => chan.is_full(),
833 ReceiverFlavor::List(chan) => chan.is_full(),
834 ReceiverFlavor::Zero(chan) => chan.is_full(),
835 ReceiverFlavor::After(chan) => chan.is_full(),
836 ReceiverFlavor::Tick(chan) => chan.is_full(),
837 ReceiverFlavor::Never(chan) => chan.is_full(),
838 }
839 }
840
841 /// Returns the number of messages in the channel.
842 ///
843 /// # Examples
844 ///
845 /// ```
846 /// use crossbeam_channel::unbounded;
847 ///
848 /// let (s, r) = unbounded();
849 /// assert_eq!(r.len(), 0);
850 ///
851 /// s.send(1).unwrap();
852 /// s.send(2).unwrap();
853 /// assert_eq!(r.len(), 2);
854 /// ```
855 pub fn len(&self) -> usize {
856 match &self.flavor {
857 ReceiverFlavor::Array(chan) => chan.len(),
858 ReceiverFlavor::List(chan) => chan.len(),
859 ReceiverFlavor::Zero(chan) => chan.len(),
860 ReceiverFlavor::After(chan) => chan.len(),
861 ReceiverFlavor::Tick(chan) => chan.len(),
862 ReceiverFlavor::Never(chan) => chan.len(),
863 }
864 }
865
866 /// If the channel is bounded, returns its capacity.
867 ///
868 /// # Examples
869 ///
870 /// ```
871 /// use crossbeam_channel::{bounded, unbounded};
872 ///
873 /// let (_, r) = unbounded::<i32>();
874 /// assert_eq!(r.capacity(), None);
875 ///
876 /// let (_, r) = bounded::<i32>(5);
877 /// assert_eq!(r.capacity(), Some(5));
878 ///
879 /// let (_, r) = bounded::<i32>(0);
880 /// assert_eq!(r.capacity(), Some(0));
881 /// ```
882 pub fn capacity(&self) -> Option<usize> {
883 match &self.flavor {
884 ReceiverFlavor::Array(chan) => chan.capacity(),
885 ReceiverFlavor::List(chan) => chan.capacity(),
886 ReceiverFlavor::Zero(chan) => chan.capacity(),
887 ReceiverFlavor::After(chan) => chan.capacity(),
888 ReceiverFlavor::Tick(chan) => chan.capacity(),
889 ReceiverFlavor::Never(chan) => chan.capacity(),
890 }
891 }
892
893 /// A blocking iterator over messages in the channel.
894 ///
895 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if
896 /// the channel becomes empty and disconnected, it returns [`None`] without blocking.
897 ///
898 /// [`next`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next
899 /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
900 ///
901 /// # Examples
902 ///
903 /// ```
904 /// use std::thread;
905 /// use crossbeam_channel::unbounded;
906 ///
907 /// let (s, r) = unbounded();
908 ///
909 /// thread::spawn(move || {
910 /// s.send(1).unwrap();
911 /// s.send(2).unwrap();
912 /// s.send(3).unwrap();
913 /// drop(s); // Disconnect the channel.
914 /// });
915 ///
916 /// // Collect all messages from the channel.
917 /// // Note that the call to `collect` blocks until the sender is dropped.
918 /// let v: Vec<_> = r.iter().collect();
919 ///
920 /// assert_eq!(v, [1, 2, 3]);
921 /// ```
922 pub fn iter(&self) -> Iter<T> {
923 Iter { receiver: self }
924 }
925
926 /// A non-blocking iterator over messages in the channel.
927 ///
928 /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
929 /// never blocks waiting for the next message.
930 ///
931 /// [`next`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next
932 ///
933 /// # Examples
934 ///
935 /// ```
936 /// use std::thread;
937 /// use std::time::Duration;
938 /// use crossbeam_channel::unbounded;
939 ///
940 /// let (s, r) = unbounded::<i32>();
941 ///
942 /// thread::spawn(move || {
943 /// s.send(1).unwrap();
944 /// thread::sleep(Duration::from_secs(1));
945 /// s.send(2).unwrap();
946 /// thread::sleep(Duration::from_secs(2));
947 /// s.send(3).unwrap();
948 /// });
949 ///
950 /// thread::sleep(Duration::from_secs(2));
951 ///
952 /// // Collect all messages from the channel without blocking.
953 /// // The third message hasn't been sent yet so we'll collect only the first two.
954 /// let v: Vec<_> = r.try_iter().collect();
955 ///
956 /// assert_eq!(v, [1, 2]);
957 /// ```
958 pub fn try_iter(&self) -> TryIter<T> {
959 TryIter { receiver: self }
960 }
961 }
962
963 impl<T> Drop for Receiver<T> {
964 fn drop(&mut self) {
965 unsafe {
966 match &self.flavor {
967 ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
968 ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect()),
969 ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
970 ReceiverFlavor::After(_) => {},
971 ReceiverFlavor::Tick(_) => {},
972 ReceiverFlavor::Never(_) => {},
973 }
974 }
975 }
976 }
977
978 impl<T> Clone for Receiver<T> {
979 fn clone(&self) -> Self {
980 let flavor = match &self.flavor {
981 ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
982 ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
983 ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
984 ReceiverFlavor::After(chan) => ReceiverFlavor::After(chan.clone()),
985 ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
986 ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
987 };
988
989 Receiver { flavor }
990 }
991 }
992
993 impl<T> fmt::Debug for Receiver<T> {
994 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
995 f.pad("Receiver { .. }")
996 }
997 }
998
999 impl<'a, T> IntoIterator for &'a Receiver<T> {
1000 type Item = T;
1001 type IntoIter = Iter<'a, T>;
1002
1003 fn into_iter(self) -> Self::IntoIter {
1004 self.iter()
1005 }
1006 }
1007
1008 impl<T> IntoIterator for Receiver<T> {
1009 type Item = T;
1010 type IntoIter = IntoIter<T>;
1011
1012 fn into_iter(self) -> Self::IntoIter {
1013 IntoIter { receiver: self }
1014 }
1015 }
1016
1017 /// A blocking iterator over messages in a channel.
1018 ///
1019 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1020 /// channel becomes empty and disconnected, it returns [`None`] without blocking.
1021 ///
1022 /// [`next`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next
1023 /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
1024 ///
1025 /// # Examples
1026 ///
1027 /// ```
1028 /// use std::thread;
1029 /// use crossbeam_channel::unbounded;
1030 ///
1031 /// let (s, r) = unbounded();
1032 ///
1033 /// thread::spawn(move || {
1034 /// s.send(1).unwrap();
1035 /// s.send(2).unwrap();
1036 /// s.send(3).unwrap();
1037 /// drop(s); // Disconnect the channel.
1038 /// });
1039 ///
1040 /// // Collect all messages from the channel.
1041 /// // Note that the call to `collect` blocks until the sender is dropped.
1042 /// let v: Vec<_> = r.iter().collect();
1043 ///
1044 /// assert_eq!(v, [1, 2, 3]);
1045 /// ```
1046 pub struct Iter<'a, T: 'a> {
1047 receiver: &'a Receiver<T>,
1048 }
1049
1050 impl<'a, T> FusedIterator for Iter<'a, T> {}
1051
1052 impl<'a, T> Iterator for Iter<'a, T> {
1053 type Item = T;
1054
1055 fn next(&mut self) -> Option<Self::Item> {
1056 self.receiver.recv().ok()
1057 }
1058 }
1059
1060 impl<'a, T> fmt::Debug for Iter<'a, T> {
1061 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1062 f.pad("Iter { .. }")
1063 }
1064 }
1065
1066 /// A non-blocking iterator over messages in a channel.
1067 ///
1068 /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1069 /// never blocks waiting for the next message.
1070 ///
1071 /// [`next`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next
1072 ///
1073 /// # Examples
1074 ///
1075 /// ```
1076 /// use std::thread;
1077 /// use std::time::Duration;
1078 /// use crossbeam_channel::unbounded;
1079 ///
1080 /// let (s, r) = unbounded::<i32>();
1081 ///
1082 /// thread::spawn(move || {
1083 /// s.send(1).unwrap();
1084 /// thread::sleep(Duration::from_secs(1));
1085 /// s.send(2).unwrap();
1086 /// thread::sleep(Duration::from_secs(2));
1087 /// s.send(3).unwrap();
1088 /// });
1089 ///
1090 /// thread::sleep(Duration::from_secs(2));
1091 ///
1092 /// // Collect all messages from the channel without blocking.
1093 /// // The third message hasn't been sent yet so we'll collect only the first two.
1094 /// let v: Vec<_> = r.try_iter().collect();
1095 ///
1096 /// assert_eq!(v, [1, 2]);
1097 /// ```
1098 pub struct TryIter<'a, T: 'a> {
1099 receiver: &'a Receiver<T>,
1100 }
1101
1102 impl<'a, T> Iterator for TryIter<'a, T> {
1103 type Item = T;
1104
1105 fn next(&mut self) -> Option<Self::Item> {
1106 self.receiver.try_recv().ok()
1107 }
1108 }
1109
1110 impl<'a, T> fmt::Debug for TryIter<'a, T> {
1111 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1112 f.pad("TryIter { .. }")
1113 }
1114 }
1115
1116 /// A blocking iterator over messages in a channel.
1117 ///
1118 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1119 /// channel becomes empty and disconnected, it returns [`None`] without blocking.
1120 ///
1121 /// [`next`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next
1122 /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
1123 ///
1124 /// # Examples
1125 ///
1126 /// ```
1127 /// use std::thread;
1128 /// use crossbeam_channel::unbounded;
1129 ///
1130 /// let (s, r) = unbounded();
1131 ///
1132 /// thread::spawn(move || {
1133 /// s.send(1).unwrap();
1134 /// s.send(2).unwrap();
1135 /// s.send(3).unwrap();
1136 /// drop(s); // Disconnect the channel.
1137 /// });
1138 ///
1139 /// // Collect all messages from the channel.
1140 /// // Note that the call to `collect` blocks until the sender is dropped.
1141 /// let v: Vec<_> = r.into_iter().collect();
1142 ///
1143 /// assert_eq!(v, [1, 2, 3]);
1144 /// ```
1145 pub struct IntoIter<T> {
1146 receiver: Receiver<T>,
1147 }
1148
1149 impl<T> FusedIterator for IntoIter<T> {}
1150
1151 impl<T> Iterator for IntoIter<T> {
1152 type Item = T;
1153
1154 fn next(&mut self) -> Option<Self::Item> {
1155 self.receiver.recv().ok()
1156 }
1157 }
1158
1159 impl<T> fmt::Debug for IntoIter<T> {
1160 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1161 f.pad("IntoIter { .. }")
1162 }
1163 }
1164
1165 impl<T> SelectHandle for Sender<T> {
1166 fn try_select(&self, token: &mut Token) -> bool {
1167 match &self.flavor {
1168 SenderFlavor::Array(chan) => chan.sender().try_select(token),
1169 SenderFlavor::List(chan) => chan.sender().try_select(token),
1170 SenderFlavor::Zero(chan) => chan.sender().try_select(token),
1171 }
1172 }
1173
1174 fn deadline(&self) -> Option<Instant> {
1175 None
1176 }
1177
1178 fn register(&self, oper: Operation, cx: &Context) -> bool {
1179 match &self.flavor {
1180 SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
1181 SenderFlavor::List(chan) => chan.sender().register(oper, cx),
1182 SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
1183 }
1184 }
1185
1186 fn unregister(&self, oper: Operation) {
1187 match &self.flavor {
1188 SenderFlavor::Array(chan) => chan.sender().unregister(oper),
1189 SenderFlavor::List(chan) => chan.sender().unregister(oper),
1190 SenderFlavor::Zero(chan) => chan.sender().unregister(oper),
1191 }
1192 }
1193
1194 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1195 match &self.flavor {
1196 SenderFlavor::Array(chan) => chan.sender().accept(token, cx),
1197 SenderFlavor::List(chan) => chan.sender().accept(token, cx),
1198 SenderFlavor::Zero(chan) => chan.sender().accept(token, cx),
1199 }
1200 }
1201
1202 fn is_ready(&self) -> bool {
1203 match &self.flavor {
1204 SenderFlavor::Array(chan) => chan.sender().is_ready(),
1205 SenderFlavor::List(chan) => chan.sender().is_ready(),
1206 SenderFlavor::Zero(chan) => chan.sender().is_ready(),
1207 }
1208 }
1209
1210 fn watch(&self, oper: Operation, cx: &Context) -> bool {
1211 match &self.flavor {
1212 SenderFlavor::Array(chan) => chan.sender().watch(oper, cx),
1213 SenderFlavor::List(chan) => chan.sender().watch(oper, cx),
1214 SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx),
1215 }
1216 }
1217
1218 fn unwatch(&self, oper: Operation) {
1219 match &self.flavor {
1220 SenderFlavor::Array(chan) => chan.sender().unwatch(oper),
1221 SenderFlavor::List(chan) => chan.sender().unwatch(oper),
1222 SenderFlavor::Zero(chan) => chan.sender().unwatch(oper),
1223 }
1224 }
1225 }
1226
1227 impl<T> SelectHandle for Receiver<T> {
1228 fn try_select(&self, token: &mut Token) -> bool {
1229 match &self.flavor {
1230 ReceiverFlavor::Array(chan) => chan.receiver().try_select(token),
1231 ReceiverFlavor::List(chan) => chan.receiver().try_select(token),
1232 ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token),
1233 ReceiverFlavor::After(chan) => chan.try_select(token),
1234 ReceiverFlavor::Tick(chan) => chan.try_select(token),
1235 ReceiverFlavor::Never(chan) => chan.try_select(token),
1236 }
1237 }
1238
1239 fn deadline(&self) -> Option<Instant> {
1240 match &self.flavor {
1241 ReceiverFlavor::Array(_) => None,
1242 ReceiverFlavor::List(_) => None,
1243 ReceiverFlavor::Zero(_) => None,
1244 ReceiverFlavor::After(chan) => chan.deadline(),
1245 ReceiverFlavor::Tick(chan) => chan.deadline(),
1246 ReceiverFlavor::Never(chan) => chan.deadline(),
1247 }
1248 }
1249
1250 fn register(&self, oper: Operation, cx: &Context) -> bool {
1251 match &self.flavor {
1252 ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx),
1253 ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx),
1254 ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx),
1255 ReceiverFlavor::After(chan) => chan.register(oper, cx),
1256 ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
1257 ReceiverFlavor::Never(chan) => chan.register(oper, cx),
1258 }
1259 }
1260
1261 fn unregister(&self, oper: Operation) {
1262 match &self.flavor {
1263 ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper),
1264 ReceiverFlavor::List(chan) => chan.receiver().unregister(oper),
1265 ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper),
1266 ReceiverFlavor::After(chan) => chan.unregister(oper),
1267 ReceiverFlavor::Tick(chan) => chan.unregister(oper),
1268 ReceiverFlavor::Never(chan) => chan.unregister(oper),
1269 }
1270 }
1271
1272 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1273 match &self.flavor {
1274 ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx),
1275 ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx),
1276 ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx),
1277 ReceiverFlavor::After(chan) => chan.accept(token, cx),
1278 ReceiverFlavor::Tick(chan) => chan.accept(token, cx),
1279 ReceiverFlavor::Never(chan) => chan.accept(token, cx),
1280 }
1281 }
1282
1283 fn is_ready(&self) -> bool {
1284 match &self.flavor {
1285 ReceiverFlavor::Array(chan) => chan.receiver().is_ready(),
1286 ReceiverFlavor::List(chan) => chan.receiver().is_ready(),
1287 ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(),
1288 ReceiverFlavor::After(chan) => chan.is_ready(),
1289 ReceiverFlavor::Tick(chan) => chan.is_ready(),
1290 ReceiverFlavor::Never(chan) => chan.is_ready(),
1291 }
1292 }
1293
1294 fn watch(&self, oper: Operation, cx: &Context) -> bool {
1295 match &self.flavor {
1296 ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx),
1297 ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx),
1298 ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx),
1299 ReceiverFlavor::After(chan) => chan.watch(oper, cx),
1300 ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
1301 ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
1302 }
1303 }
1304
1305 fn unwatch(&self, oper: Operation) {
1306 match &self.flavor {
1307 ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper),
1308 ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper),
1309 ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper),
1310 ReceiverFlavor::After(chan) => chan.unwatch(oper),
1311 ReceiverFlavor::Tick(chan) => chan.unwatch(oper),
1312 ReceiverFlavor::Never(chan) => chan.unwatch(oper),
1313 }
1314 }
1315 }
1316
1317 /// Writes a message into the channel.
1318 pub unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
1319 match &s.flavor {
1320 SenderFlavor::Array(chan) => chan.write(token, msg),
1321 SenderFlavor::List(chan) => chan.write(token, msg),
1322 SenderFlavor::Zero(chan) => chan.write(token, msg),
1323 }
1324 }
1325
1326 /// Reads a message from the channel.
1327 pub unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
1328 match &r.flavor {
1329 ReceiverFlavor::Array(chan) => chan.read(token),
1330 ReceiverFlavor::List(chan) => chan.read(token),
1331 ReceiverFlavor::Zero(chan) => chan.read(token),
1332 ReceiverFlavor::After(chan) => {
1333 mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1334 }
1335 ReceiverFlavor::Tick(chan) => {
1336 mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1337 }
1338 ReceiverFlavor::Never(chan) => chan.read(token),
1339 }
1340 }