1 //! Interface to the select mechanism.
4 use std
::marker
::PhantomData
;
6 use std
::time
::{Duration, Instant}
;
8 use crossbeam_utils
::Backoff
;
10 use channel
::{self, Receiver, Sender}
;
12 use err
::{ReadyTimeoutError, TryReadyError}
;
13 use err
::{RecvError, SendError}
;
14 use err
::{SelectTimeoutError, TrySelectError}
;
18 /// Temporary data that gets initialized during select or a blocking operation, and is consumed by
19 /// `read` or `write`.
21 /// Each field contains data associated with a specific channel flavor.
22 #[derive(Debug, Default)]
24 pub after
: flavors
::after
::AfterToken
,
25 pub array
: flavors
::array
::ArrayToken
,
26 pub list
: flavors
::list
::ListToken
,
27 pub never
: flavors
::never
::NeverToken
,
28 pub tick
: flavors
::tick
::TickToken
,
29 pub zero
: flavors
::zero
::ZeroToken
,
32 /// Identifier associated with an operation by a specific thread on a specific channel.
33 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
34 pub struct Operation(usize);
37 /// Creates an operation identifier from a mutable reference.
39 /// This function essentially just turns the address of the reference into a number. The
40 /// reference should point to a variable that is specific to the thread and the operation,
41 /// and is alive for the entire duration of select or blocking operation.
43 pub fn hook
<T
>(r
: &mut T
) -> Operation
{
44 let val
= r
as *mut T
as usize;
45 // Make sure that the pointer address doesn't equal the numerical representation of
46 // `Selected::{Waiting, Aborted, Disconnected}`.
52 /// Current state of a select or a blocking operation.
53 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
55 /// Still waiting for an operation.
58 /// The attempt to block the current thread has been aborted.
61 /// An operation became ready because a channel is disconnected.
64 /// An operation became ready because a message can be sent or received.
68 impl From
<usize> for Selected
{
70 fn from(val
: usize) -> Selected
{
72 0 => Selected
::Waiting
,
73 1 => Selected
::Aborted
,
74 2 => Selected
::Disconnected
,
75 oper
=> Selected
::Operation(Operation(oper
)),
80 impl Into
<usize> for Selected
{
82 fn into(self) -> usize {
84 Selected
::Waiting
=> 0,
85 Selected
::Aborted
=> 1,
86 Selected
::Disconnected
=> 2,
87 Selected
::Operation(Operation(val
)) => val
,
92 /// A receiver or a sender that can participate in select.
94 /// This is a handle that assists select in executing an operation, registration, deciding on the
95 /// appropriate deadline for blocking, etc.
96 pub trait SelectHandle
{
97 /// Attempts to select an operation and returns `true` on success.
98 fn try_select(&self, token
: &mut Token
) -> bool
;
100 /// Returns a deadline for an operation, if there is one.
101 fn deadline(&self) -> Option
<Instant
>;
103 /// Registers an operation for execution and returns `true` if it is now ready.
104 fn register(&self, oper
: Operation
, cx
: &Context
) -> bool
;
106 /// Unregisters an operation for execution.
107 fn unregister(&self, oper
: Operation
);
109 /// Attempts to select an operation the thread got woken up for and returns `true` on success.
110 fn accept(&self, token
: &mut Token
, cx
: &Context
) -> bool
;
112 /// Returns `true` if an operation can be executed without blocking.
113 fn is_ready(&self) -> bool
;
115 /// Registers an operation for readiness notification and returns `true` if it is now ready.
116 fn watch(&self, oper
: Operation
, cx
: &Context
) -> bool
;
118 /// Unregisters an operation for readiness notification.
119 fn unwatch(&self, oper
: Operation
);
122 impl<'a
, T
: SelectHandle
> SelectHandle
for &'a T
{
123 fn try_select(&self, token
: &mut Token
) -> bool
{
124 (**self).try_select(token
)
127 fn deadline(&self) -> Option
<Instant
> {
131 fn register(&self, oper
: Operation
, cx
: &Context
) -> bool
{
132 (**self).register(oper
, cx
)
135 fn unregister(&self, oper
: Operation
) {
136 (**self).unregister(oper
);
139 fn accept(&self, token
: &mut Token
, cx
: &Context
) -> bool
{
140 (**self).accept(token
, cx
)
143 fn is_ready(&self) -> bool
{
147 fn watch(&self, oper
: Operation
, cx
: &Context
) -> bool
{
148 (**self).watch(oper
, cx
)
151 fn unwatch(&self, oper
: Operation
) {
152 (**self).unwatch(oper
)
156 /// Determines when a select operation should time out.
157 #[derive(Clone, Copy, Eq, PartialEq)]
165 /// Time out after the time instant.
169 /// Runs until one of the operations is selected, potentially blocking the current thread.
171 /// Successful receive operations will have to be followed up by `channel::read()` and successful
172 /// send operations by `channel::write()`.
174 handles
: &mut [(&dyn SelectHandle
, usize, *const u8)],
176 ) -> Option
<(Token
, usize, *const u8)> {
177 if handles
.is_empty() {
178 // Wait until the timeout and return.
180 Timeout
::Now
=> return None
,
182 utils
::sleep_until(None
);
185 Timeout
::At(when
) => {
186 utils
::sleep_until(Some(when
));
192 // Shuffle the operations for fairness.
193 utils
::shuffle(handles
);
195 // Create a token, which serves as a temporary variable that gets initialized in this function
196 // and is later used by a call to `channel::read()` or `channel::write()` that completes the
197 // selected operation.
198 let mut token
= Token
::default();
200 // Try selecting one of the operations without blocking.
201 for &(handle
, i
, ptr
) in handles
.iter() {
202 if handle
.try_select(&mut token
) {
203 return Some((token
, i
, ptr
));
208 // Prepare for blocking.
209 let res
= Context
::with(|cx
| {
210 let mut sel
= Selected
::Waiting
;
211 let mut registered_count
= 0;
212 let mut index_ready
= None
;
214 if let Timeout
::Now
= timeout
{
215 cx
.try_select(Selected
::Aborted
).unwrap();
218 // Register all operations.
219 for (handle
, i
, _
) in handles
.iter_mut() {
220 registered_count
+= 1;
222 // If registration returns `false`, that means the operation has just become ready.
223 if handle
.register(Operation
::hook
::<&dyn SelectHandle
>(handle
), cx
) {
224 // Try aborting select.
225 sel
= match cx
.try_select(Selected
::Aborted
) {
227 index_ready
= Some(*i
);
235 // If another thread has already selected one of the operations, stop registration.
237 if sel
!= Selected
::Waiting
{
242 if sel
== Selected
::Waiting
{
243 // Check with each operation for how long we're allowed to block, and compute the
244 // earliest deadline.
245 let mut deadline
: Option
<Instant
> = match timeout
{
246 Timeout
::Now
=> return None
,
247 Timeout
::Never
=> None
,
248 Timeout
::At(when
) => Some(when
),
250 for &(handle
, _
, _
) in handles
.iter() {
251 if let Some(x
) = handle
.deadline() {
252 deadline
= deadline
.map(|y
| x
.min(y
)).or(Some(x
));
256 // Block the current thread.
257 sel
= cx
.wait_until(deadline
);
260 // Unregister all registered operations.
261 for (handle
, _
, _
) in handles
.iter_mut().take(registered_count
) {
262 handle
.unregister(Operation
::hook
::<&dyn SelectHandle
>(handle
));
266 Selected
::Waiting
=> unreachable
!(),
267 Selected
::Aborted
=> {
268 // If an operation became ready during registration, try selecting it.
269 if let Some(index_ready
) = index_ready
{
270 for &(handle
, i
, ptr
) in handles
.iter() {
271 if i
== index_ready
&& handle
.try_select(&mut token
) {
272 return Some((i
, ptr
));
277 Selected
::Disconnected
=> {}
278 Selected
::Operation(_
) => {
279 // Find the selected operation.
280 for (handle
, i
, ptr
) in handles
.iter_mut() {
281 // Is this the selected operation?
282 if sel
== Selected
::Operation(Operation
::hook
::<&dyn SelectHandle
>(handle
))
284 // Try selecting this operation.
285 if handle
.accept(&mut token
, cx
) {
286 return Some((*i
, *ptr
));
296 // Return if an operation was selected.
297 if let Some((i
, ptr
)) = res
{
298 return Some((token
, i
, ptr
));
301 // Try selecting one of the operations without blocking.
302 for &(handle
, i
, ptr
) in handles
.iter() {
303 if handle
.try_select(&mut token
) {
304 return Some((token
, i
, ptr
));
309 Timeout
::Now
=> return None
,
311 Timeout
::At(when
) => {
312 if Instant
::now() >= when
{
320 /// Runs until one of the operations becomes ready, potentially blocking the current thread.
322 handles
: &mut [(&dyn SelectHandle
, usize, *const u8)],
325 if handles
.is_empty() {
326 // Wait until the timeout and return.
328 Timeout
::Now
=> return None
,
330 utils
::sleep_until(None
);
333 Timeout
::At(when
) => {
334 utils
::sleep_until(Some(when
));
340 // Shuffle the operations for fairness.
341 utils
::shuffle(handles
);
344 let backoff
= Backoff
::new();
346 // Check operations for readiness.
347 for &(handle
, i
, _
) in handles
.iter() {
348 if handle
.is_ready() {
353 if backoff
.is_completed() {
360 // Check for timeout.
362 Timeout
::Now
=> return None
,
364 Timeout
::At(when
) => {
365 if Instant
::now() >= when
{
371 // Prepare for blocking.
372 let res
= Context
::with(|cx
| {
373 let mut sel
= Selected
::Waiting
;
374 let mut registered_count
= 0;
376 // Begin watching all operations.
377 for (handle
, _
, _
) in handles
.iter_mut() {
378 registered_count
+= 1;
379 let oper
= Operation
::hook
::<&dyn SelectHandle
>(handle
);
381 // If registration returns `false`, that means the operation has just become ready.
382 if handle
.watch(oper
, cx
) {
383 sel
= match cx
.try_select(Selected
::Operation(oper
)) {
384 Ok(()) => Selected
::Operation(oper
),
390 // If another thread has already chosen one of the operations, stop registration.
392 if sel
!= Selected
::Waiting
{
397 if sel
== Selected
::Waiting
{
398 // Check with each operation for how long we're allowed to block, and compute the
399 // earliest deadline.
400 let mut deadline
: Option
<Instant
> = match timeout
{
401 Timeout
::Now
=> unreachable
!(),
402 Timeout
::Never
=> None
,
403 Timeout
::At(when
) => Some(when
),
405 for &(handle
, _
, _
) in handles
.iter() {
406 if let Some(x
) = handle
.deadline() {
407 deadline
= deadline
.map(|y
| x
.min(y
)).or(Some(x
));
411 // Block the current thread.
412 sel
= cx
.wait_until(deadline
);
415 // Unwatch all operations.
416 for (handle
, _
, _
) in handles
.iter_mut().take(registered_count
) {
417 handle
.unwatch(Operation
::hook
::<&dyn SelectHandle
>(handle
));
421 Selected
::Waiting
=> unreachable
!(),
422 Selected
::Aborted
=> {}
423 Selected
::Disconnected
=> {}
424 Selected
::Operation(_
) => {
425 for (handle
, i
, _
) in handles
.iter_mut() {
426 let oper
= Operation
::hook
::<&dyn SelectHandle
>(handle
);
427 if sel
== Selected
::Operation(oper
) {
437 // Return if an operation became ready.
444 /// Attempts to select one of the operations without blocking.
446 pub fn try_select
<'a
>(
447 handles
: &mut [(&'a
dyn SelectHandle
, usize, *const u8)],
448 ) -> Result
<SelectedOperation
<'a
>, TrySelectError
> {
449 match run_select(handles
, Timeout
::Now
) {
450 None
=> Err(TrySelectError
),
451 Some((token
, index
, ptr
)) => Ok(SelectedOperation
{
455 _marker
: PhantomData
,
460 /// Blocks until one of the operations becomes ready and selects it.
463 handles
: &mut [(&'a
dyn SelectHandle
, usize, *const u8)],
464 ) -> SelectedOperation
<'a
> {
465 if handles
.is_empty() {
466 panic
!("no operations have been added to `Select`");
469 let (token
, index
, ptr
) = run_select(handles
, Timeout
::Never
).unwrap();
474 _marker
: PhantomData
,
478 /// Blocks for a limited time until one of the operations becomes ready and selects it.
480 pub fn select_timeout
<'a
>(
481 handles
: &mut [(&'a
dyn SelectHandle
, usize, *const u8)],
483 ) -> Result
<SelectedOperation
<'a
>, SelectTimeoutError
> {
484 let timeout
= Timeout
::At(Instant
::now() + timeout
);
486 match run_select(handles
, timeout
) {
487 None
=> Err(SelectTimeoutError
),
488 Some((token
, index
, ptr
)) => Ok(SelectedOperation
{
492 _marker
: PhantomData
,
497 /// Selects from a set of channel operations.
499 /// `Select` allows you to define a set of channel operations, wait until any one of them becomes
500 /// ready, and finally execute it. If multiple operations are ready at the same time, a random one
501 /// among them is selected.
503 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
504 /// when it will simply return an error because the channel is disconnected.
506 /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
507 /// dynamically created list of channel operations.
509 /// Once a list of operations has been built with `Select`, there are two different ways of
512 /// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
513 /// the returned selected operation has already begun and **must** be completed. If we don't
514 /// complete it, a panic will occur.
516 /// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
517 /// successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
518 /// possible for another thread to make the operation not ready just before we try executing it,
519 /// so it's wise to use a retry loop. However, note that these methods might return with success
520 /// spuriously, so it's a good idea to always double check if the operation is really ready.
524 /// Use [`select`] to receive a message from a list of receivers:
527 /// use crossbeam_channel::{Receiver, RecvError, Select};
529 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
530 /// // Build a list of operations.
531 /// let mut sel = Select::new();
536 /// // Complete the selected operation.
537 /// let oper = sel.select();
538 /// let index = oper.index();
539 /// oper.recv(&rs[index])
543 /// Use [`ready`] to receive a message from a list of receivers:
546 /// use crossbeam_channel::{Receiver, RecvError, Select};
548 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
549 /// // Build a list of operations.
550 /// let mut sel = Select::new();
556 /// // Wait until a receive operation becomes ready and try executing it.
557 /// let index = sel.ready();
558 /// let res = rs[index].try_recv();
560 /// // If the operation turns out not to be ready, retry.
561 /// if let Err(e) = res {
562 /// if e.is_empty() {
568 /// return res.map_err(|_| RecvError);
573 /// [`select!`]: macro.select.html
574 /// [`try_select`]: struct.Select.html#method.try_select
575 /// [`select`]: struct.Select.html#method.select
576 /// [`select_timeout`]: struct.Select.html#method.select_timeout
577 /// [`try_ready`]: struct.Select.html#method.try_ready
578 /// [`ready`]: struct.Select.html#method.ready
579 /// [`ready_timeout`]: struct.Select.html#method.ready_timeout
580 pub struct Select
<'a
> {
581 /// A list of senders and receivers participating in selection.
582 handles
: Vec
<(&'a
dyn SelectHandle
, usize, *const u8)>,
584 /// The next index to assign to an operation.
588 unsafe impl<'a
> Send
for Select
<'a
> {}
589 unsafe impl<'a
> Sync
for Select
<'a
> {}
591 impl<'a
> Select
<'a
> {
592 /// Creates an empty list of channel operations for selection.
597 /// use crossbeam_channel::Select;
599 /// let mut sel = Select::new();
601 /// // The list of operations is empty, which means no operation can be selected.
602 /// assert!(sel.try_select().is_err());
604 pub fn new() -> Select
<'a
> {
606 handles
: Vec
::with_capacity(4),
611 /// Adds a send operation.
613 /// Returns the index of the added operation.
619 /// use crossbeam_channel::{unbounded, Select};
621 /// let (s, r) = unbounded::<i32>();
623 /// let mut sel = Select::new();
624 /// let index = sel.send(&s);
626 pub fn send
<T
>(&mut self, s
: &'a Sender
<T
>) -> usize {
627 let i
= self.next_index
;
628 let ptr
= s
as *const Sender
<_
> as *const u8;
629 self.handles
.push((s
, i
, ptr
));
630 self.next_index
+= 1;
634 /// Adds a receive operation.
636 /// Returns the index of the added operation.
642 /// use crossbeam_channel::{unbounded, Select};
644 /// let (s, r) = unbounded::<i32>();
646 /// let mut sel = Select::new();
647 /// let index = sel.recv(&r);
649 pub fn recv
<T
>(&mut self, r
: &'a Receiver
<T
>) -> usize {
650 let i
= self.next_index
;
651 let ptr
= r
as *const Receiver
<_
> as *const u8;
652 self.handles
.push((r
, i
, ptr
));
653 self.next_index
+= 1;
657 /// Removes a previously added operation.
659 /// This is useful when an operation is selected because the channel got disconnected and we
660 /// want to try again to select a different operation instead.
662 /// If new operations are added after removing some, the indices of removed operations will not
667 /// An attempt to remove a non-existing or already removed operation will panic.
673 /// use crossbeam_channel::{unbounded, Select};
675 /// let (s1, r1) = unbounded::<i32>();
676 /// let (_, r2) = unbounded::<i32>();
678 /// let mut sel = Select::new();
679 /// let oper1 = sel.recv(&r1);
680 /// let oper2 = sel.recv(&r2);
682 /// // Both operations are initially ready, so a random one will be executed.
683 /// let oper = sel.select();
684 /// assert_eq!(oper.index(), oper2);
685 /// assert!(oper.recv(&r2).is_err());
686 /// sel.remove(oper2);
688 /// s1.send(10).unwrap();
690 /// let oper = sel.select();
691 /// assert_eq!(oper.index(), oper1);
692 /// assert_eq!(oper.recv(&r1), Ok(10));
694 pub fn remove(&mut self, index
: usize) {
696 index
< self.next_index
,
697 "index out of bounds; {} >= {}",
706 .find(|(_
, (_
, i
, _
))| *i
== index
)
707 .expect("no operation with this index")
710 self.handles
.swap_remove(i
);
713 /// Attempts to select one of the operations without blocking.
715 /// If an operation is ready, it is selected and returned. If multiple operations are ready at
716 /// the same time, a random one among them is selected. If none of the operations are ready, an
717 /// error is returned.
719 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
720 /// even when it will simply return an error because the channel is disconnected.
722 /// The selected operation must be completed with [`SelectedOperation::send`]
723 /// or [`SelectedOperation::recv`].
725 /// [`SelectedOperation::send`]: struct.SelectedOperation.html#method.send
726 /// [`SelectedOperation::recv`]: struct.SelectedOperation.html#method.recv
732 /// use crossbeam_channel::{unbounded, Select};
734 /// let (s1, r1) = unbounded();
735 /// let (s2, r2) = unbounded();
737 /// s1.send(10).unwrap();
738 /// s2.send(20).unwrap();
740 /// let mut sel = Select::new();
741 /// let oper1 = sel.recv(&r1);
742 /// let oper2 = sel.recv(&r2);
744 /// // Both operations are initially ready, so a random one will be executed.
745 /// let oper = sel.try_select();
747 /// Err(_) => panic!("both operations should be ready"),
748 /// Ok(oper) => match oper.index() {
749 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
750 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
751 /// _ => unreachable!(),
755 pub fn try_select(&mut self) -> Result
<SelectedOperation
<'a
>, TrySelectError
> {
756 try_select(&mut self.handles
)
759 /// Blocks until one of the operations becomes ready and selects it.
761 /// Once an operation becomes ready, it is selected and returned. If multiple operations are
762 /// ready at the same time, a random one among them is selected.
764 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
765 /// even when it will simply return an error because the channel is disconnected.
767 /// The selected operation must be completed with [`SelectedOperation::send`]
768 /// or [`SelectedOperation::recv`].
770 /// [`SelectedOperation::send`]: struct.SelectedOperation.html#method.send
771 /// [`SelectedOperation::recv`]: struct.SelectedOperation.html#method.recv
775 /// Panics if no operations have been added to `Select`.
781 /// use std::time::Duration;
782 /// use crossbeam_channel::{unbounded, Select};
784 /// let (s1, r1) = unbounded();
785 /// let (s2, r2) = unbounded();
787 /// thread::spawn(move || {
788 /// thread::sleep(Duration::from_secs(1));
789 /// s1.send(10).unwrap();
791 /// thread::spawn(move || s2.send(20).unwrap());
793 /// let mut sel = Select::new();
794 /// let oper1 = sel.recv(&r1);
795 /// let oper2 = sel.recv(&r2);
797 /// // The second operation will be selected because it becomes ready first.
798 /// let oper = sel.select();
799 /// match oper.index() {
800 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
801 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
802 /// _ => unreachable!(),
805 pub fn select(&mut self) -> SelectedOperation
<'a
> {
806 select(&mut self.handles
)
809 /// Blocks for a limited time until one of the operations becomes ready and selects it.
811 /// If an operation becomes ready, it is selected and returned. If multiple operations are
812 /// ready at the same time, a random one among them is selected. If none of the operations
813 /// become ready for the specified duration, an error is returned.
815 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
816 /// even when it will simply return an error because the channel is disconnected.
818 /// The selected operation must be completed with [`SelectedOperation::send`]
819 /// or [`SelectedOperation::recv`].
821 /// [`SelectedOperation::send`]: struct.SelectedOperation.html#method.send
822 /// [`SelectedOperation::recv`]: struct.SelectedOperation.html#method.recv
828 /// use std::time::Duration;
829 /// use crossbeam_channel::{unbounded, Select};
831 /// let (s1, r1) = unbounded();
832 /// let (s2, r2) = unbounded();
834 /// thread::spawn(move || {
835 /// thread::sleep(Duration::from_secs(1));
836 /// s1.send(10).unwrap();
838 /// thread::spawn(move || s2.send(20).unwrap());
840 /// let mut sel = Select::new();
841 /// let oper1 = sel.recv(&r1);
842 /// let oper2 = sel.recv(&r2);
844 /// // The second operation will be selected because it becomes ready first.
845 /// let oper = sel.select_timeout(Duration::from_millis(500));
847 /// Err(_) => panic!("should not have timed out"),
848 /// Ok(oper) => match oper.index() {
849 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
850 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
851 /// _ => unreachable!(),
855 pub fn select_timeout(
858 ) -> Result
<SelectedOperation
<'a
>, SelectTimeoutError
> {
859 select_timeout(&mut self.handles
, timeout
)
862 /// Attempts to find a ready operation without blocking.
864 /// If an operation is ready, its index is returned. If multiple operations are ready at the
865 /// same time, a random one among them is chosen. If none of the operations are ready, an error
868 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
869 /// even when it will simply return an error because the channel is disconnected.
871 /// Note that this method might return with success spuriously, so it's a good idea to always
872 /// double check if the operation is really ready.
878 /// use crossbeam_channel::{unbounded, Select};
880 /// let (s1, r1) = unbounded();
881 /// let (s2, r2) = unbounded();
883 /// s1.send(10).unwrap();
884 /// s2.send(20).unwrap();
886 /// let mut sel = Select::new();
887 /// let oper1 = sel.recv(&r1);
888 /// let oper2 = sel.recv(&r2);
890 /// // Both operations are initially ready, so a random one will be chosen.
891 /// match sel.try_ready() {
892 /// Err(_) => panic!("both operations should be ready"),
893 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
894 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
895 /// Ok(_) => unreachable!(),
898 pub fn try_ready(&mut self) -> Result
<usize, TryReadyError
> {
899 match run_ready(&mut self.handles
, Timeout
::Now
) {
900 None
=> Err(TryReadyError
),
901 Some(index
) => Ok(index
),
905 /// Blocks until one of the operations becomes ready.
907 /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
908 /// the same time, a random one among them is chosen.
910 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
911 /// even when it will simply return an error because the channel is disconnected.
913 /// Note that this method might return with success spuriously, so it's a good idea to always
914 /// double check if the operation is really ready.
918 /// Panics if no operations have been added to `Select`.
924 /// use std::time::Duration;
925 /// use crossbeam_channel::{unbounded, Select};
927 /// let (s1, r1) = unbounded();
928 /// let (s2, r2) = unbounded();
930 /// thread::spawn(move || {
931 /// thread::sleep(Duration::from_secs(1));
932 /// s1.send(10).unwrap();
934 /// thread::spawn(move || s2.send(20).unwrap());
936 /// let mut sel = Select::new();
937 /// let oper1 = sel.recv(&r1);
938 /// let oper2 = sel.recv(&r2);
940 /// // The second operation will be selected because it becomes ready first.
941 /// match sel.ready() {
942 /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
943 /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
944 /// _ => unreachable!(),
947 pub fn ready(&mut self) -> usize {
948 if self.handles
.is_empty() {
949 panic
!("no operations have been added to `Select`");
952 run_ready(&mut self.handles
, Timeout
::Never
).unwrap()
955 /// Blocks for a limited time until one of the operations becomes ready.
957 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
958 /// the same time, a random one among them is chosen. If none of the operations become ready
959 /// for the specified duration, an error is returned.
961 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
962 /// even when it will simply return an error because the channel is disconnected.
964 /// Note that this method might return with success spuriously, so it's a good idea to double
965 /// check if the operation is really ready.
971 /// use std::time::Duration;
972 /// use crossbeam_channel::{unbounded, Select};
974 /// let (s1, r1) = unbounded();
975 /// let (s2, r2) = unbounded();
977 /// thread::spawn(move || {
978 /// thread::sleep(Duration::from_secs(1));
979 /// s1.send(10).unwrap();
981 /// thread::spawn(move || s2.send(20).unwrap());
983 /// let mut sel = Select::new();
984 /// let oper1 = sel.recv(&r1);
985 /// let oper2 = sel.recv(&r2);
987 /// // The second operation will be selected because it becomes ready first.
988 /// match sel.ready_timeout(Duration::from_millis(500)) {
989 /// Err(_) => panic!("should not have timed out"),
990 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
991 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
992 /// Ok(_) => unreachable!(),
995 pub fn ready_timeout(&mut self, timeout
: Duration
) -> Result
<usize, ReadyTimeoutError
> {
996 let timeout
= Timeout
::At(Instant
::now() + timeout
);
998 match run_ready(&mut self.handles
, timeout
) {
999 None
=> Err(ReadyTimeoutError
),
1000 Some(index
) => Ok(index
),
1005 impl<'a
> Clone
for Select
<'a
> {
1006 fn clone(&self) -> Select
<'a
> {
1008 handles
: self.handles
.clone(),
1009 next_index
: self.next_index
,
1014 impl<'a
> Default
for Select
<'a
> {
1015 fn default() -> Select
<'a
> {
1020 impl<'a
> fmt
::Debug
for Select
<'a
> {
1021 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1022 f
.pad("Select { .. }")
1026 /// A selected operation that needs to be completed.
1028 /// To complete the operation, call [`send`] or [`recv`].
1032 /// Forgetting to complete the operation is an error and might lead to deadlocks. If a
1033 /// `SelectedOperation` is dropped without completion, a panic occurs.
1035 /// [`send`]: struct.SelectedOperation.html#method.send
1036 /// [`recv`]: struct.SelectedOperation.html#method.recv
1038 pub struct SelectedOperation
<'a
> {
1039 /// Token needed to complete the operation.
1042 /// The index of the selected operation.
1045 /// The address of the selected `Sender` or `Receiver`.
1048 /// Indicates that `Sender`s and `Receiver`s are borrowed.
1049 _marker
: PhantomData
<&'
a ()>,
1052 impl<'a
> SelectedOperation
<'a
> {
1053 /// Returns the index of the selected operation.
1058 /// use crossbeam_channel::{bounded, Select};
1060 /// let (s1, r1) = bounded::<()>(0);
1061 /// let (s2, r2) = bounded::<()>(0);
1062 /// let (s3, r3) = bounded::<()>(1);
1064 /// let mut sel = Select::new();
1065 /// let oper1 = sel.send(&s1);
1066 /// let oper2 = sel.recv(&r2);
1067 /// let oper3 = sel.send(&s3);
1069 /// // Only the last operation is ready.
1070 /// let oper = sel.select();
1071 /// assert_eq!(oper.index(), 2);
1072 /// assert_eq!(oper.index(), oper3);
1074 /// // Complete the operation.
1075 /// oper.send(&s3, ()).unwrap();
1077 pub fn index(&self) -> usize {
1081 /// Completes the send operation.
1083 /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1084 /// when the operation was added.
1088 /// Panics if an incorrect [`Sender`] reference is passed.
1093 /// use crossbeam_channel::{bounded, Select, SendError};
1095 /// let (s, r) = bounded::<i32>(0);
1098 /// let mut sel = Select::new();
1099 /// let oper1 = sel.send(&s);
1101 /// let oper = sel.select();
1102 /// assert_eq!(oper.index(), oper1);
1103 /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1106 /// [`Sender`]: struct.Sender.html
1107 /// [`Select::send`]: struct.Select.html#method.send
1108 pub fn send
<T
>(mut self, s
: &Sender
<T
>, msg
: T
) -> Result
<(), SendError
<T
>> {
1110 s
as *const Sender
<T
> as *const u8 == self.ptr
,
1111 "passed a sender that wasn't selected",
1113 let res
= unsafe { channel::write(s, &mut self.token, msg) }
;
1115 res
.map_err(SendError
)
1118 /// Completes the receive operation.
1120 /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1121 /// when the operation was added.
1125 /// Panics if an incorrect [`Receiver`] reference is passed.
1130 /// use crossbeam_channel::{bounded, Select, RecvError};
1132 /// let (s, r) = bounded::<i32>(0);
1135 /// let mut sel = Select::new();
1136 /// let oper1 = sel.recv(&r);
1138 /// let oper = sel.select();
1139 /// assert_eq!(oper.index(), oper1);
1140 /// assert_eq!(oper.recv(&r), Err(RecvError));
1143 /// [`Receiver`]: struct.Receiver.html
1144 /// [`Select::recv`]: struct.Select.html#method.recv
1145 pub fn recv
<T
>(mut self, r
: &Receiver
<T
>) -> Result
<T
, RecvError
> {
1147 r
as *const Receiver
<T
> as *const u8 == self.ptr
,
1148 "passed a receiver that wasn't selected",
1150 let res
= unsafe { channel::read(r, &mut self.token) }
;
1152 res
.map_err(|_
| RecvError
)
1156 impl<'a
> fmt
::Debug
for SelectedOperation
<'a
> {
1157 fn fmt(&self, f
: &mut fmt
::Formatter
) -> fmt
::Result
{
1158 f
.pad("SelectedOperation { .. }")
1162 impl<'a
> Drop
for SelectedOperation
<'a
> {
1163 fn drop(&mut self) {
1164 panic
!("dropped `SelectedOperation` without completing the operation");