1 //! The channel interface.
4 use std
::iter
::FusedIterator
;
6 use std
::panic
::{RefUnwindSafe, UnwindSafe}
;
8 use std
::time
::{Duration, Instant}
;
12 use err
::{RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError}
;
14 use select
::{Operation, SelectHandle, Token}
;
16 /// Creates a channel of unbounded capacity.
18 /// This channel has a growable buffer that can hold any number of messages at a time.
24 /// use crossbeam_channel::unbounded;
26 /// let (s, r) = unbounded();
28 /// // Computes the n-th Fibonacci number.
29 /// fn fib(n: i32) -> i32 {
33 /// fib(n - 1) + fib(n - 2)
37 /// // Spawn an asynchronous computation.
38 /// thread::spawn(move || s.send(fib(20)).unwrap());
40 /// // Print the result of the computation.
41 /// println!("{}", r.recv().unwrap());
43 pub fn unbounded
<T
>() -> (Sender
<T
>, Receiver
<T
>) {
44 let (s
, r
) = counter
::new(flavors
::list
::Channel
::new());
46 flavor
: SenderFlavor
::List(s
),
49 flavor
: ReceiverFlavor
::List(r
),
54 /// Creates a channel of bounded capacity.
56 /// This channel has a buffer that can hold at most `cap` messages at a time.
58 /// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
59 /// receive operations must appear at the same time in order to pair up and pass the message over.
63 /// Panics if the capacity is greater than `usize::max_value() / 4`.
67 /// A channel of capacity 1:
71 /// use std::time::Duration;
72 /// use crossbeam_channel::bounded;
74 /// let (s, r) = bounded(1);
76 /// // This call returns immediately because there is enough space in the channel.
77 /// s.send(1).unwrap();
79 /// thread::spawn(move || {
80 /// // This call blocks the current thread because the channel is full.
81 /// // It will be able to complete only after the first message is received.
82 /// s.send(2).unwrap();
85 /// thread::sleep(Duration::from_secs(1));
86 /// assert_eq!(r.recv(), Ok(1));
87 /// assert_eq!(r.recv(), Ok(2));
90 /// A zero-capacity channel:
94 /// use std::time::Duration;
95 /// use crossbeam_channel::bounded;
97 /// let (s, r) = bounded(0);
99 /// thread::spawn(move || {
100 /// // This call blocks the current thread until a receive operation appears
101 /// // on the other side of the channel.
102 /// s.send(1).unwrap();
105 /// thread::sleep(Duration::from_secs(1));
106 /// assert_eq!(r.recv(), Ok(1));
108 pub fn bounded
<T
>(cap
: usize) -> (Sender
<T
>, Receiver
<T
>) {
110 let (s
, r
) = counter
::new(flavors
::zero
::Channel
::new());
112 flavor
: SenderFlavor
::Zero(s
),
115 flavor
: ReceiverFlavor
::Zero(r
),
119 let (s
, r
) = counter
::new(flavors
::array
::Channel
::with_capacity(cap
));
121 flavor
: SenderFlavor
::Array(s
),
124 flavor
: ReceiverFlavor
::Array(r
),
130 /// Creates a receiver that delivers a message after a certain duration of time.
132 /// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
133 /// be sent into the channel after `duration` elapses. The message is the instant at which it is
138 /// Using an `after` channel for timeouts:
142 /// # extern crate crossbeam_channel;
144 /// use std::time::Duration;
145 /// use crossbeam_channel::{after, unbounded};
147 /// let (s, r) = unbounded::<i32>();
148 /// let timeout = Duration::from_millis(100);
151 /// recv(r) -> msg => println!("received {:?}", msg),
152 /// recv(after(timeout)) -> _ => println!("timed out"),
157 /// When the message gets sent:
161 /// use std::time::{Duration, Instant};
162 /// use crossbeam_channel::after;
164 /// // Converts a number of milliseconds into a `Duration`.
165 /// let ms = |ms| Duration::from_millis(ms);
167 /// // Returns `true` if `a` and `b` are very close `Instant`s.
168 /// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
170 /// let start = Instant::now();
171 /// let r = after(ms(100));
173 /// thread::sleep(ms(500));
175 /// // This message was sent 100 ms from the start and received 500 ms from the start.
176 /// assert!(eq(r.recv().unwrap(), start + ms(100)));
177 /// assert!(eq(Instant::now(), start + ms(500)));
179 pub fn after(duration
: Duration
) -> Receiver
<Instant
> {
181 flavor
: ReceiverFlavor
::After(Arc
::new(flavors
::after
::Channel
::new(duration
))),
185 /// Creates a receiver that never delivers messages.
187 /// The channel is bounded with capacity of 0 and never gets disconnected.
191 /// Using a `never` channel to optionally add a timeout to [`select!`]:
195 /// # extern crate crossbeam_channel;
198 /// use std::time::{Duration, Instant};
199 /// use crossbeam_channel::{after, never, unbounded};
201 /// let (s, r) = unbounded();
203 /// thread::spawn(move || {
204 /// thread::sleep(Duration::from_secs(1));
205 /// s.send(1).unwrap();
208 /// // Suppose this duration can be a `Some` or a `None`.
209 /// let duration = Some(Duration::from_millis(100));
211 /// // Create a channel that times out after the specified duration.
212 /// let timeout = duration
213 /// .map(|d| after(d))
214 /// .unwrap_or(never());
217 /// recv(r) -> msg => assert_eq!(msg, Ok(1)),
218 /// recv(timeout) -> _ => println!("timed out"),
223 /// [`select!`]: macro.select.html
224 pub fn never
<T
>() -> Receiver
<T
> {
226 flavor
: ReceiverFlavor
::Never(flavors
::never
::Channel
::new()),
230 /// Creates a receiver that delivers messages periodically.
232 /// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
233 /// sent into the channel in intervals of `duration`. Each message is the instant at which it is
238 /// Using a `tick` channel to periodically print elapsed time:
241 /// use std::time::{Duration, Instant};
242 /// use crossbeam_channel::tick;
244 /// let start = Instant::now();
245 /// let ticker = tick(Duration::from_millis(100));
248 /// ticker.recv().unwrap();
249 /// println!("elapsed: {:?}", start.elapsed());
253 /// When messages get sent:
257 /// use std::time::{Duration, Instant};
258 /// use crossbeam_channel::tick;
260 /// // Converts a number of milliseconds into a `Duration`.
261 /// let ms = |ms| Duration::from_millis(ms);
263 /// // Returns `true` if `a` and `b` are very close `Instant`s.
264 /// let eq = |a, b| a + ms(50) > b && b + ms(50) > a;
266 /// let start = Instant::now();
267 /// let r = tick(ms(100));
269 /// // This message was sent 100 ms from the start and received 100 ms from the start.
270 /// assert!(eq(r.recv().unwrap(), start + ms(100)));
271 /// assert!(eq(Instant::now(), start + ms(100)));
273 /// thread::sleep(ms(500));
275 /// // This message was sent 200 ms from the start and received 600 ms from the start.
276 /// assert!(eq(r.recv().unwrap(), start + ms(200)));
277 /// assert!(eq(Instant::now(), start + ms(600)));
279 /// // This message was sent 700 ms from the start and received 700 ms from the start.
280 /// assert!(eq(r.recv().unwrap(), start + ms(700)));
281 /// assert!(eq(Instant::now(), start + ms(700)));
283 pub fn tick(duration
: Duration
) -> Receiver
<Instant
> {
285 flavor
: ReceiverFlavor
::Tick(Arc
::new(flavors
::tick
::Channel
::new(duration
))),
289 /// The sending side of a channel.
295 /// use crossbeam_channel::unbounded;
297 /// let (s1, r) = unbounded();
298 /// let s2 = s1.clone();
300 /// thread::spawn(move || s1.send(1).unwrap());
301 /// thread::spawn(move || s2.send(2).unwrap());
303 /// let msg1 = r.recv().unwrap();
304 /// let msg2 = r.recv().unwrap();
306 /// assert_eq!(msg1 + msg2, 3);
308 pub struct Sender
<T
> {
309 flavor
: SenderFlavor
<T
>,
313 enum SenderFlavor
<T
> {
314 /// Bounded channel based on a preallocated array.
315 Array(counter
::Sender
<flavors
::array
::Channel
<T
>>),
317 /// Unbounded channel implemented as a linked list.
318 List(counter
::Sender
<flavors
::list
::Channel
<T
>>),
320 /// Zero-capacity channel.
321 Zero(counter
::Sender
<flavors
::zero
::Channel
<T
>>),
324 unsafe impl<T
: Send
> Send
for Sender
<T
> {}
325 unsafe impl<T
: Send
> Sync
for Sender
<T
> {}
327 impl<T
> UnwindSafe
for Sender
<T
> {}
328 impl<T
> RefUnwindSafe
for Sender
<T
> {}
331 /// Attempts to send a message into the channel without blocking.
333 /// This method will either send a message into the channel immediately or return an error if
334 /// the channel is full or disconnected. The returned error contains the original message.
336 /// If called on a zero-capacity channel, this method will send the message only if there
337 /// happens to be a receive operation on the other side of the channel at the same time.
342 /// use crossbeam_channel::{bounded, TrySendError};
344 /// let (s, r) = bounded(1);
346 /// assert_eq!(s.try_send(1), Ok(()));
347 /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
350 /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3)));
352 pub fn try_send(&self, msg
: T
) -> Result
<(), TrySendError
<T
>> {
354 SenderFlavor
::Array(chan
) => chan
.try_send(msg
),
355 SenderFlavor
::List(chan
) => chan
.try_send(msg
),
356 SenderFlavor
::Zero(chan
) => chan
.try_send(msg
),
360 /// Blocks the current thread until a message is sent or the channel is disconnected.
362 /// If the channel is full and not disconnected, this call will block until the send operation
363 /// can proceed. If the channel becomes disconnected, this call will wake up and return an
364 /// error. The returned error contains the original message.
366 /// If called on a zero-capacity channel, this method will wait for a receive operation to
367 /// appear on the other side of the channel.
373 /// use std::time::Duration;
374 /// use crossbeam_channel::{bounded, SendError};
376 /// let (s, r) = bounded(1);
377 /// assert_eq!(s.send(1), Ok(()));
379 /// thread::spawn(move || {
380 /// assert_eq!(r.recv(), Ok(1));
381 /// thread::sleep(Duration::from_secs(1));
385 /// assert_eq!(s.send(2), Ok(()));
386 /// assert_eq!(s.send(3), Err(SendError(3)));
388 pub fn send(&self, msg
: T
) -> Result
<(), SendError
<T
>> {
390 SenderFlavor
::Array(chan
) => chan
.send(msg
, None
),
391 SenderFlavor
::List(chan
) => chan
.send(msg
, None
),
392 SenderFlavor
::Zero(chan
) => chan
.send(msg
, None
),
393 }.map_err(|err
| match err
{
394 SendTimeoutError
::Disconnected(msg
) => SendError(msg
),
395 SendTimeoutError
::Timeout(_
) => unreachable
!(),
399 /// Waits for a message to be sent into the channel, but only for a limited time.
401 /// If the channel is full and not disconnected, this call will block until the send operation
402 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
403 /// wake up and return an error. The returned error contains the original message.
405 /// If called on a zero-capacity channel, this method will wait for a receive operation to
406 /// appear on the other side of the channel.
412 /// use std::time::Duration;
413 /// use crossbeam_channel::{bounded, SendTimeoutError};
415 /// let (s, r) = bounded(0);
417 /// thread::spawn(move || {
418 /// thread::sleep(Duration::from_secs(1));
419 /// assert_eq!(r.recv(), Ok(2));
424 /// s.send_timeout(1, Duration::from_millis(500)),
425 /// Err(SendTimeoutError::Timeout(1)),
428 /// s.send_timeout(2, Duration::from_secs(1)),
432 /// s.send_timeout(3, Duration::from_millis(500)),
433 /// Err(SendTimeoutError::Disconnected(3)),
436 pub fn send_timeout(&self, msg
: T
, timeout
: Duration
) -> Result
<(), SendTimeoutError
<T
>> {
437 let deadline
= Instant
::now() + timeout
;
440 SenderFlavor
::Array(chan
) => chan
.send(msg
, Some(deadline
)),
441 SenderFlavor
::List(chan
) => chan
.send(msg
, Some(deadline
)),
442 SenderFlavor
::Zero(chan
) => chan
.send(msg
, Some(deadline
)),
446 /// Returns `true` if the channel is empty.
448 /// Note: Zero-capacity channels are always empty.
453 /// use crossbeam_channel::unbounded;
455 /// let (s, r) = unbounded();
456 /// assert!(s.is_empty());
458 /// s.send(0).unwrap();
459 /// assert!(!s.is_empty());
461 pub fn is_empty(&self) -> bool
{
463 SenderFlavor
::Array(chan
) => chan
.is_empty(),
464 SenderFlavor
::List(chan
) => chan
.is_empty(),
465 SenderFlavor
::Zero(chan
) => chan
.is_empty(),
469 /// Returns `true` if the channel is full.
471 /// Note: Zero-capacity channels are always full.
476 /// use crossbeam_channel::bounded;
478 /// let (s, r) = bounded(1);
480 /// assert!(!s.is_full());
481 /// s.send(0).unwrap();
482 /// assert!(s.is_full());
484 pub fn is_full(&self) -> bool
{
486 SenderFlavor
::Array(chan
) => chan
.is_full(),
487 SenderFlavor
::List(chan
) => chan
.is_full(),
488 SenderFlavor
::Zero(chan
) => chan
.is_full(),
492 /// Returns the number of messages in the channel.
497 /// use crossbeam_channel::unbounded;
499 /// let (s, r) = unbounded();
500 /// assert_eq!(s.len(), 0);
502 /// s.send(1).unwrap();
503 /// s.send(2).unwrap();
504 /// assert_eq!(s.len(), 2);
506 pub fn len(&self) -> usize {
508 SenderFlavor
::Array(chan
) => chan
.len(),
509 SenderFlavor
::List(chan
) => chan
.len(),
510 SenderFlavor
::Zero(chan
) => chan
.len(),
514 /// If the channel is bounded, returns its capacity.
519 /// use crossbeam_channel::{bounded, unbounded};
521 /// let (s, _) = unbounded::<i32>();
522 /// assert_eq!(s.capacity(), None);
524 /// let (s, _) = bounded::<i32>(5);
525 /// assert_eq!(s.capacity(), Some(5));
527 /// let (s, _) = bounded::<i32>(0);
528 /// assert_eq!(s.capacity(), Some(0));
530 pub fn capacity(&self) -> Option
<usize> {
532 SenderFlavor
::Array(chan
) => chan
.capacity(),
533 SenderFlavor
::List(chan
) => chan
.capacity(),
534 SenderFlavor
::Zero(chan
) => chan
.capacity(),
539 impl<T
> Drop
for Sender
<T
> {
543 SenderFlavor
::Array(chan
) => chan
.release(|c
| c
.disconnect()),
544 SenderFlavor
::List(chan
) => chan
.release(|c
| c
.disconnect()),
545 SenderFlavor
::Zero(chan
) => chan
.release(|c
| c
.disconnect()),
551 impl<T
> Clone
for Sender
<T
> {
552 fn clone(&self) -> Self {
553 let flavor
= match &self.flavor
{
554 SenderFlavor
::Array(chan
) => SenderFlavor
::Array(chan
.acquire()),
555 SenderFlavor
::List(chan
) => SenderFlavor
::List(chan
.acquire()),
556 SenderFlavor
::Zero(chan
) => SenderFlavor
::Zero(chan
.acquire()),
563 impl<T
> fmt
::Debug
for Sender
<T
> {
564 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
565 f
.pad("Sender { .. }")
569 /// The receiving side of a channel.
575 /// use std::time::Duration;
576 /// use crossbeam_channel::unbounded;
578 /// let (s, r) = unbounded();
580 /// thread::spawn(move || {
582 /// thread::sleep(Duration::from_secs(1));
586 /// assert_eq!(r.recv(), Ok(1)); // Received immediately.
587 /// assert_eq!(r.recv(), Ok(2)); // Received after 1 second.
589 pub struct Receiver
<T
> {
590 flavor
: ReceiverFlavor
<T
>,
593 /// Receiver flavors.
594 enum ReceiverFlavor
<T
> {
595 /// Bounded channel based on a preallocated array.
596 Array(counter
::Receiver
<flavors
::array
::Channel
<T
>>),
598 /// Unbounded channel implemented as a linked list.
599 List(counter
::Receiver
<flavors
::list
::Channel
<T
>>),
601 /// Zero-capacity channel.
602 Zero(counter
::Receiver
<flavors
::zero
::Channel
<T
>>),
604 /// The after flavor.
605 After(Arc
<flavors
::after
::Channel
>),
608 Tick(Arc
<flavors
::tick
::Channel
>),
610 /// The never flavor.
611 Never(flavors
::never
::Channel
<T
>),
614 unsafe impl<T
: Send
> Send
for Receiver
<T
> {}
615 unsafe impl<T
: Send
> Sync
for Receiver
<T
> {}
617 impl<T
> UnwindSafe
for Receiver
<T
> {}
618 impl<T
> RefUnwindSafe
for Receiver
<T
> {}
620 impl<T
> Receiver
<T
> {
621 /// Attempts to receive a message from the channel without blocking.
623 /// This method will either receive a message from the channel immediately or return an error
624 /// if the channel is empty.
626 /// If called on a zero-capacity channel, this method will receive a message only if there
627 /// happens to be a send operation on the other side of the channel at the same time.
632 /// use crossbeam_channel::{unbounded, TryRecvError};
634 /// let (s, r) = unbounded();
635 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
637 /// s.send(5).unwrap();
640 /// assert_eq!(r.try_recv(), Ok(5));
641 /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
643 pub fn try_recv(&self) -> Result
<T
, TryRecvError
> {
645 ReceiverFlavor
::Array(chan
) => chan
.try_recv(),
646 ReceiverFlavor
::List(chan
) => chan
.try_recv(),
647 ReceiverFlavor
::Zero(chan
) => chan
.try_recv(),
648 ReceiverFlavor
::After(chan
) => {
649 let msg
= chan
.try_recv();
651 mem
::transmute_copy
::<Result
<Instant
, TryRecvError
>, Result
<T
, TryRecvError
>>(
656 ReceiverFlavor
::Tick(chan
) => {
657 let msg
= chan
.try_recv();
659 mem
::transmute_copy
::<Result
<Instant
, TryRecvError
>, Result
<T
, TryRecvError
>>(
664 ReceiverFlavor
::Never(chan
) => chan
.try_recv(),
668 /// Blocks the current thread until a message is received or the channel is empty and
671 /// If the channel is empty and not disconnected, this call will block until the receive
672 /// operation can proceed. If the channel is empty and becomes disconnected, this call will
673 /// wake up and return an error.
675 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
676 /// on the other side of the channel.
682 /// use std::time::Duration;
683 /// use crossbeam_channel::{unbounded, RecvError};
685 /// let (s, r) = unbounded();
687 /// thread::spawn(move || {
688 /// thread::sleep(Duration::from_secs(1));
689 /// s.send(5).unwrap();
693 /// assert_eq!(r.recv(), Ok(5));
694 /// assert_eq!(r.recv(), Err(RecvError));
696 pub fn recv(&self) -> Result
<T
, RecvError
> {
698 ReceiverFlavor
::Array(chan
) => chan
.recv(None
),
699 ReceiverFlavor
::List(chan
) => chan
.recv(None
),
700 ReceiverFlavor
::Zero(chan
) => chan
.recv(None
),
701 ReceiverFlavor
::After(chan
) => {
702 let msg
= chan
.recv(None
);
704 mem
::transmute_copy
::<
705 Result
<Instant
, RecvTimeoutError
>,
706 Result
<T
, RecvTimeoutError
>,
710 ReceiverFlavor
::Tick(chan
) => {
711 let msg
= chan
.recv(None
);
713 mem
::transmute_copy
::<
714 Result
<Instant
, RecvTimeoutError
>,
715 Result
<T
, RecvTimeoutError
>,
719 ReceiverFlavor
::Never(chan
) => chan
.recv(None
),
720 }.map_err(|_
| RecvError
)
723 /// Waits for a message to be received from the channel, but only for a limited time.
725 /// If the channel is empty and not disconnected, this call will block until the receive
726 /// operation can proceed or the operation times out. If the channel is empty and becomes
727 /// disconnected, this call will wake up and return an error.
729 /// If called on a zero-capacity channel, this method will wait for a send operation to appear
730 /// on the other side of the channel.
736 /// use std::time::Duration;
737 /// use crossbeam_channel::{unbounded, RecvTimeoutError};
739 /// let (s, r) = unbounded();
741 /// thread::spawn(move || {
742 /// thread::sleep(Duration::from_secs(1));
743 /// s.send(5).unwrap();
748 /// r.recv_timeout(Duration::from_millis(500)),
749 /// Err(RecvTimeoutError::Timeout),
752 /// r.recv_timeout(Duration::from_secs(1)),
756 /// r.recv_timeout(Duration::from_secs(1)),
757 /// Err(RecvTimeoutError::Disconnected),
760 pub fn recv_timeout(&self, timeout
: Duration
) -> Result
<T
, RecvTimeoutError
> {
761 let deadline
= Instant
::now() + timeout
;
764 ReceiverFlavor
::Array(chan
) => chan
.recv(Some(deadline
)),
765 ReceiverFlavor
::List(chan
) => chan
.recv(Some(deadline
)),
766 ReceiverFlavor
::Zero(chan
) => chan
.recv(Some(deadline
)),
767 ReceiverFlavor
::After(chan
) => {
768 let msg
= chan
.recv(Some(deadline
));
770 mem
::transmute_copy
::<
771 Result
<Instant
, RecvTimeoutError
>,
772 Result
<T
, RecvTimeoutError
>,
776 ReceiverFlavor
::Tick(chan
) => {
777 let msg
= chan
.recv(Some(deadline
));
779 mem
::transmute_copy
::<
780 Result
<Instant
, RecvTimeoutError
>,
781 Result
<T
, RecvTimeoutError
>,
785 ReceiverFlavor
::Never(chan
) => chan
.recv(Some(deadline
)),
789 /// Returns `true` if the channel is empty.
791 /// Note: Zero-capacity channels are always empty.
796 /// use crossbeam_channel::unbounded;
798 /// let (s, r) = unbounded();
800 /// assert!(r.is_empty());
801 /// s.send(0).unwrap();
802 /// assert!(!r.is_empty());
804 pub fn is_empty(&self) -> bool
{
806 ReceiverFlavor
::Array(chan
) => chan
.is_empty(),
807 ReceiverFlavor
::List(chan
) => chan
.is_empty(),
808 ReceiverFlavor
::Zero(chan
) => chan
.is_empty(),
809 ReceiverFlavor
::After(chan
) => chan
.is_empty(),
810 ReceiverFlavor
::Tick(chan
) => chan
.is_empty(),
811 ReceiverFlavor
::Never(chan
) => chan
.is_empty(),
815 /// Returns `true` if the channel is full.
817 /// Note: Zero-capacity channels are always full.
822 /// use crossbeam_channel::bounded;
824 /// let (s, r) = bounded(1);
826 /// assert!(!r.is_full());
827 /// s.send(0).unwrap();
828 /// assert!(r.is_full());
830 pub fn is_full(&self) -> bool
{
832 ReceiverFlavor
::Array(chan
) => chan
.is_full(),
833 ReceiverFlavor
::List(chan
) => chan
.is_full(),
834 ReceiverFlavor
::Zero(chan
) => chan
.is_full(),
835 ReceiverFlavor
::After(chan
) => chan
.is_full(),
836 ReceiverFlavor
::Tick(chan
) => chan
.is_full(),
837 ReceiverFlavor
::Never(chan
) => chan
.is_full(),
841 /// Returns the number of messages in the channel.
846 /// use crossbeam_channel::unbounded;
848 /// let (s, r) = unbounded();
849 /// assert_eq!(r.len(), 0);
851 /// s.send(1).unwrap();
852 /// s.send(2).unwrap();
853 /// assert_eq!(r.len(), 2);
855 pub fn len(&self) -> usize {
857 ReceiverFlavor
::Array(chan
) => chan
.len(),
858 ReceiverFlavor
::List(chan
) => chan
.len(),
859 ReceiverFlavor
::Zero(chan
) => chan
.len(),
860 ReceiverFlavor
::After(chan
) => chan
.len(),
861 ReceiverFlavor
::Tick(chan
) => chan
.len(),
862 ReceiverFlavor
::Never(chan
) => chan
.len(),
866 /// If the channel is bounded, returns its capacity.
871 /// use crossbeam_channel::{bounded, unbounded};
873 /// let (_, r) = unbounded::<i32>();
874 /// assert_eq!(r.capacity(), None);
876 /// let (_, r) = bounded::<i32>(5);
877 /// assert_eq!(r.capacity(), Some(5));
879 /// let (_, r) = bounded::<i32>(0);
880 /// assert_eq!(r.capacity(), Some(0));
882 pub fn capacity(&self) -> Option
<usize> {
884 ReceiverFlavor
::Array(chan
) => chan
.capacity(),
885 ReceiverFlavor
::List(chan
) => chan
.capacity(),
886 ReceiverFlavor
::Zero(chan
) => chan
.capacity(),
887 ReceiverFlavor
::After(chan
) => chan
.capacity(),
888 ReceiverFlavor
::Tick(chan
) => chan
.capacity(),
889 ReceiverFlavor
::Never(chan
) => chan
.capacity(),
893 /// A blocking iterator over messages in the channel.
895 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if
896 /// the channel becomes empty and disconnected, it returns [`None`] without blocking.
898 /// [`next`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next
899 /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
905 /// use crossbeam_channel::unbounded;
907 /// let (s, r) = unbounded();
909 /// thread::spawn(move || {
910 /// s.send(1).unwrap();
911 /// s.send(2).unwrap();
912 /// s.send(3).unwrap();
913 /// drop(s); // Disconnect the channel.
916 /// // Collect all messages from the channel.
917 /// // Note that the call to `collect` blocks until the sender is dropped.
918 /// let v: Vec<_> = r.iter().collect();
920 /// assert_eq!(v, [1, 2, 3]);
922 pub fn iter(&self) -> Iter
<T
> {
923 Iter { receiver: self }
926 /// A non-blocking iterator over messages in the channel.
928 /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
929 /// never blocks waiting for the next message.
931 /// [`next`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next
937 /// use std::time::Duration;
938 /// use crossbeam_channel::unbounded;
940 /// let (s, r) = unbounded::<i32>();
942 /// thread::spawn(move || {
943 /// s.send(1).unwrap();
944 /// thread::sleep(Duration::from_secs(1));
945 /// s.send(2).unwrap();
946 /// thread::sleep(Duration::from_secs(2));
947 /// s.send(3).unwrap();
950 /// thread::sleep(Duration::from_secs(2));
952 /// // Collect all messages from the channel without blocking.
953 /// // The third message hasn't been sent yet so we'll collect only the first two.
954 /// let v: Vec<_> = r.try_iter().collect();
956 /// assert_eq!(v, [1, 2]);
958 pub fn try_iter(&self) -> TryIter
<T
> {
959 TryIter { receiver: self }
963 impl<T
> Drop
for Receiver
<T
> {
967 ReceiverFlavor
::Array(chan
) => chan
.release(|c
| c
.disconnect()),
968 ReceiverFlavor
::List(chan
) => chan
.release(|c
| c
.disconnect()),
969 ReceiverFlavor
::Zero(chan
) => chan
.release(|c
| c
.disconnect()),
970 ReceiverFlavor
::After(_
) => {}
,
971 ReceiverFlavor
::Tick(_
) => {}
,
972 ReceiverFlavor
::Never(_
) => {}
,
978 impl<T
> Clone
for Receiver
<T
> {
979 fn clone(&self) -> Self {
980 let flavor
= match &self.flavor
{
981 ReceiverFlavor
::Array(chan
) => ReceiverFlavor
::Array(chan
.acquire()),
982 ReceiverFlavor
::List(chan
) => ReceiverFlavor
::List(chan
.acquire()),
983 ReceiverFlavor
::Zero(chan
) => ReceiverFlavor
::Zero(chan
.acquire()),
984 ReceiverFlavor
::After(chan
) => ReceiverFlavor
::After(chan
.clone()),
985 ReceiverFlavor
::Tick(chan
) => ReceiverFlavor
::Tick(chan
.clone()),
986 ReceiverFlavor
::Never(_
) => ReceiverFlavor
::Never(flavors
::never
::Channel
::new()),
993 impl<T
> fmt
::Debug
for Receiver
<T
> {
994 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
995 f
.pad("Receiver { .. }")
999 impl<'a
, T
> IntoIterator
for &'a Receiver
<T
> {
1001 type IntoIter
= Iter
<'a
, T
>;
1003 fn into_iter(self) -> Self::IntoIter
{
1008 impl<T
> IntoIterator
for Receiver
<T
> {
1010 type IntoIter
= IntoIter
<T
>;
1012 fn into_iter(self) -> Self::IntoIter
{
1013 IntoIter { receiver: self }
1017 /// A blocking iterator over messages in a channel.
1019 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1020 /// channel becomes empty and disconnected, it returns [`None`] without blocking.
1022 /// [`next`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next
1023 /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
1028 /// use std::thread;
1029 /// use crossbeam_channel::unbounded;
1031 /// let (s, r) = unbounded();
1033 /// thread::spawn(move || {
1034 /// s.send(1).unwrap();
1035 /// s.send(2).unwrap();
1036 /// s.send(3).unwrap();
1037 /// drop(s); // Disconnect the channel.
1040 /// // Collect all messages from the channel.
1041 /// // Note that the call to `collect` blocks until the sender is dropped.
1042 /// let v: Vec<_> = r.iter().collect();
1044 /// assert_eq!(v, [1, 2, 3]);
1046 pub struct Iter
<'a
, T
: 'a
> {
1047 receiver
: &'a Receiver
<T
>,
1050 impl<'a
, T
> FusedIterator
for Iter
<'a
, T
> {}
1052 impl<'a
, T
> Iterator
for Iter
<'a
, T
> {
1055 fn next(&mut self) -> Option
<Self::Item
> {
1056 self.receiver
.recv().ok()
1060 impl<'a
, T
> fmt
::Debug
for Iter
<'a
, T
> {
1061 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1062 f
.pad("Iter { .. }")
1066 /// A non-blocking iterator over messages in a channel.
1068 /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1069 /// never blocks waiting for the next message.
1071 /// [`next`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next
1076 /// use std::thread;
1077 /// use std::time::Duration;
1078 /// use crossbeam_channel::unbounded;
1080 /// let (s, r) = unbounded::<i32>();
1082 /// thread::spawn(move || {
1083 /// s.send(1).unwrap();
1084 /// thread::sleep(Duration::from_secs(1));
1085 /// s.send(2).unwrap();
1086 /// thread::sleep(Duration::from_secs(2));
1087 /// s.send(3).unwrap();
1090 /// thread::sleep(Duration::from_secs(2));
1092 /// // Collect all messages from the channel without blocking.
1093 /// // The third message hasn't been sent yet so we'll collect only the first two.
1094 /// let v: Vec<_> = r.try_iter().collect();
1096 /// assert_eq!(v, [1, 2]);
1098 pub struct TryIter
<'a
, T
: 'a
> {
1099 receiver
: &'a Receiver
<T
>,
1102 impl<'a
, T
> Iterator
for TryIter
<'a
, T
> {
1105 fn next(&mut self) -> Option
<Self::Item
> {
1106 self.receiver
.try_recv().ok()
1110 impl<'a
, T
> fmt
::Debug
for TryIter
<'a
, T
> {
1111 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1112 f
.pad("TryIter { .. }")
1116 /// A blocking iterator over messages in a channel.
1118 /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1119 /// channel becomes empty and disconnected, it returns [`None`] without blocking.
1121 /// [`next`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next
1122 /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
1127 /// use std::thread;
1128 /// use crossbeam_channel::unbounded;
1130 /// let (s, r) = unbounded();
1132 /// thread::spawn(move || {
1133 /// s.send(1).unwrap();
1134 /// s.send(2).unwrap();
1135 /// s.send(3).unwrap();
1136 /// drop(s); // Disconnect the channel.
1139 /// // Collect all messages from the channel.
1140 /// // Note that the call to `collect` blocks until the sender is dropped.
1141 /// let v: Vec<_> = r.into_iter().collect();
1143 /// assert_eq!(v, [1, 2, 3]);
1145 pub struct IntoIter
<T
> {
1146 receiver
: Receiver
<T
>,
1149 impl<T
> FusedIterator
for IntoIter
<T
> {}
1151 impl<T
> Iterator
for IntoIter
<T
> {
1154 fn next(&mut self) -> Option
<Self::Item
> {
1155 self.receiver
.recv().ok()
1159 impl<T
> fmt
::Debug
for IntoIter
<T
> {
1160 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1161 f
.pad("IntoIter { .. }")
1165 impl<T
> SelectHandle
for Sender
<T
> {
1166 fn try_select(&self, token
: &mut Token
) -> bool
{
1167 match &self.flavor
{
1168 SenderFlavor
::Array(chan
) => chan
.sender().try_select(token
),
1169 SenderFlavor
::List(chan
) => chan
.sender().try_select(token
),
1170 SenderFlavor
::Zero(chan
) => chan
.sender().try_select(token
),
1174 fn deadline(&self) -> Option
<Instant
> {
1178 fn register(&self, oper
: Operation
, cx
: &Context
) -> bool
{
1179 match &self.flavor
{
1180 SenderFlavor
::Array(chan
) => chan
.sender().register(oper
, cx
),
1181 SenderFlavor
::List(chan
) => chan
.sender().register(oper
, cx
),
1182 SenderFlavor
::Zero(chan
) => chan
.sender().register(oper
, cx
),
1186 fn unregister(&self, oper
: Operation
) {
1187 match &self.flavor
{
1188 SenderFlavor
::Array(chan
) => chan
.sender().unregister(oper
),
1189 SenderFlavor
::List(chan
) => chan
.sender().unregister(oper
),
1190 SenderFlavor
::Zero(chan
) => chan
.sender().unregister(oper
),
1194 fn accept(&self, token
: &mut Token
, cx
: &Context
) -> bool
{
1195 match &self.flavor
{
1196 SenderFlavor
::Array(chan
) => chan
.sender().accept(token
, cx
),
1197 SenderFlavor
::List(chan
) => chan
.sender().accept(token
, cx
),
1198 SenderFlavor
::Zero(chan
) => chan
.sender().accept(token
, cx
),
1202 fn is_ready(&self) -> bool
{
1203 match &self.flavor
{
1204 SenderFlavor
::Array(chan
) => chan
.sender().is_ready(),
1205 SenderFlavor
::List(chan
) => chan
.sender().is_ready(),
1206 SenderFlavor
::Zero(chan
) => chan
.sender().is_ready(),
1210 fn watch(&self, oper
: Operation
, cx
: &Context
) -> bool
{
1211 match &self.flavor
{
1212 SenderFlavor
::Array(chan
) => chan
.sender().watch(oper
, cx
),
1213 SenderFlavor
::List(chan
) => chan
.sender().watch(oper
, cx
),
1214 SenderFlavor
::Zero(chan
) => chan
.sender().watch(oper
, cx
),
1218 fn unwatch(&self, oper
: Operation
) {
1219 match &self.flavor
{
1220 SenderFlavor
::Array(chan
) => chan
.sender().unwatch(oper
),
1221 SenderFlavor
::List(chan
) => chan
.sender().unwatch(oper
),
1222 SenderFlavor
::Zero(chan
) => chan
.sender().unwatch(oper
),
1227 impl<T
> SelectHandle
for Receiver
<T
> {
1228 fn try_select(&self, token
: &mut Token
) -> bool
{
1229 match &self.flavor
{
1230 ReceiverFlavor
::Array(chan
) => chan
.receiver().try_select(token
),
1231 ReceiverFlavor
::List(chan
) => chan
.receiver().try_select(token
),
1232 ReceiverFlavor
::Zero(chan
) => chan
.receiver().try_select(token
),
1233 ReceiverFlavor
::After(chan
) => chan
.try_select(token
),
1234 ReceiverFlavor
::Tick(chan
) => chan
.try_select(token
),
1235 ReceiverFlavor
::Never(chan
) => chan
.try_select(token
),
1239 fn deadline(&self) -> Option
<Instant
> {
1240 match &self.flavor
{
1241 ReceiverFlavor
::Array(_
) => None
,
1242 ReceiverFlavor
::List(_
) => None
,
1243 ReceiverFlavor
::Zero(_
) => None
,
1244 ReceiverFlavor
::After(chan
) => chan
.deadline(),
1245 ReceiverFlavor
::Tick(chan
) => chan
.deadline(),
1246 ReceiverFlavor
::Never(chan
) => chan
.deadline(),
1250 fn register(&self, oper
: Operation
, cx
: &Context
) -> bool
{
1251 match &self.flavor
{
1252 ReceiverFlavor
::Array(chan
) => chan
.receiver().register(oper
, cx
),
1253 ReceiverFlavor
::List(chan
) => chan
.receiver().register(oper
, cx
),
1254 ReceiverFlavor
::Zero(chan
) => chan
.receiver().register(oper
, cx
),
1255 ReceiverFlavor
::After(chan
) => chan
.register(oper
, cx
),
1256 ReceiverFlavor
::Tick(chan
) => chan
.register(oper
, cx
),
1257 ReceiverFlavor
::Never(chan
) => chan
.register(oper
, cx
),
1261 fn unregister(&self, oper
: Operation
) {
1262 match &self.flavor
{
1263 ReceiverFlavor
::Array(chan
) => chan
.receiver().unregister(oper
),
1264 ReceiverFlavor
::List(chan
) => chan
.receiver().unregister(oper
),
1265 ReceiverFlavor
::Zero(chan
) => chan
.receiver().unregister(oper
),
1266 ReceiverFlavor
::After(chan
) => chan
.unregister(oper
),
1267 ReceiverFlavor
::Tick(chan
) => chan
.unregister(oper
),
1268 ReceiverFlavor
::Never(chan
) => chan
.unregister(oper
),
1272 fn accept(&self, token
: &mut Token
, cx
: &Context
) -> bool
{
1273 match &self.flavor
{
1274 ReceiverFlavor
::Array(chan
) => chan
.receiver().accept(token
, cx
),
1275 ReceiverFlavor
::List(chan
) => chan
.receiver().accept(token
, cx
),
1276 ReceiverFlavor
::Zero(chan
) => chan
.receiver().accept(token
, cx
),
1277 ReceiverFlavor
::After(chan
) => chan
.accept(token
, cx
),
1278 ReceiverFlavor
::Tick(chan
) => chan
.accept(token
, cx
),
1279 ReceiverFlavor
::Never(chan
) => chan
.accept(token
, cx
),
1283 fn is_ready(&self) -> bool
{
1284 match &self.flavor
{
1285 ReceiverFlavor
::Array(chan
) => chan
.receiver().is_ready(),
1286 ReceiverFlavor
::List(chan
) => chan
.receiver().is_ready(),
1287 ReceiverFlavor
::Zero(chan
) => chan
.receiver().is_ready(),
1288 ReceiverFlavor
::After(chan
) => chan
.is_ready(),
1289 ReceiverFlavor
::Tick(chan
) => chan
.is_ready(),
1290 ReceiverFlavor
::Never(chan
) => chan
.is_ready(),
1294 fn watch(&self, oper
: Operation
, cx
: &Context
) -> bool
{
1295 match &self.flavor
{
1296 ReceiverFlavor
::Array(chan
) => chan
.receiver().watch(oper
, cx
),
1297 ReceiverFlavor
::List(chan
) => chan
.receiver().watch(oper
, cx
),
1298 ReceiverFlavor
::Zero(chan
) => chan
.receiver().watch(oper
, cx
),
1299 ReceiverFlavor
::After(chan
) => chan
.watch(oper
, cx
),
1300 ReceiverFlavor
::Tick(chan
) => chan
.watch(oper
, cx
),
1301 ReceiverFlavor
::Never(chan
) => chan
.watch(oper
, cx
),
1305 fn unwatch(&self, oper
: Operation
) {
1306 match &self.flavor
{
1307 ReceiverFlavor
::Array(chan
) => chan
.receiver().unwatch(oper
),
1308 ReceiverFlavor
::List(chan
) => chan
.receiver().unwatch(oper
),
1309 ReceiverFlavor
::Zero(chan
) => chan
.receiver().unwatch(oper
),
1310 ReceiverFlavor
::After(chan
) => chan
.unwatch(oper
),
1311 ReceiverFlavor
::Tick(chan
) => chan
.unwatch(oper
),
1312 ReceiverFlavor
::Never(chan
) => chan
.unwatch(oper
),
1317 /// Writes a message into the channel.
1318 pub unsafe fn write
<T
>(s
: &Sender
<T
>, token
: &mut Token
, msg
: T
) -> Result
<(), T
> {
1320 SenderFlavor
::Array(chan
) => chan
.write(token
, msg
),
1321 SenderFlavor
::List(chan
) => chan
.write(token
, msg
),
1322 SenderFlavor
::Zero(chan
) => chan
.write(token
, msg
),
1326 /// Reads a message from the channel.
1327 pub unsafe fn read
<T
>(r
: &Receiver
<T
>, token
: &mut Token
) -> Result
<T
, ()> {
1329 ReceiverFlavor
::Array(chan
) => chan
.read(token
),
1330 ReceiverFlavor
::List(chan
) => chan
.read(token
),
1331 ReceiverFlavor
::Zero(chan
) => chan
.read(token
),
1332 ReceiverFlavor
::After(chan
) => {
1333 mem
::transmute_copy
::<Result
<Instant
, ()>, Result
<T
, ()>>(&chan
.read(token
))
1335 ReceiverFlavor
::Tick(chan
) => {
1336 mem
::transmute_copy
::<Result
<Instant
, ()>, Result
<T
, ()>>(&chan
.read(token
))
1338 ReceiverFlavor
::Never(chan
) => chan
.read(token
),