1 //! Interface to the select mechanism.
4 use std
::marker
::PhantomData
;
6 use std
::time
::{Duration, Instant}
;
8 use crossbeam_utils
::Backoff
;
10 use crate::channel
::{self, Receiver, Sender}
;
11 use crate::context
::Context
;
12 use crate::err
::{ReadyTimeoutError, TryReadyError}
;
13 use crate::err
::{RecvError, SendError}
;
14 use crate::err
::{SelectTimeoutError, TrySelectError}
;
18 /// Temporary data that gets initialized during select or a blocking operation, and is consumed by
19 /// `read` or `write`.
21 /// Each field contains data associated with a specific channel flavor.
22 #[derive(Debug, Default)]
24 pub at
: flavors
::at
::AtToken
,
25 pub array
: flavors
::array
::ArrayToken
,
26 pub list
: flavors
::list
::ListToken
,
27 pub never
: flavors
::never
::NeverToken
,
28 pub tick
: flavors
::tick
::TickToken
,
29 pub zero
: flavors
::zero
::ZeroToken
,
32 /// Identifier associated with an operation by a specific thread on a specific channel.
33 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
34 pub struct Operation(usize);
37 /// Creates an operation identifier from a mutable reference.
39 /// This function essentially just turns the address of the reference into a number. The
40 /// reference should point to a variable that is specific to the thread and the operation,
41 /// and is alive for the entire duration of select or blocking operation.
43 pub fn hook
<T
>(r
: &mut T
) -> Operation
{
44 let val
= r
as *mut T
as usize;
45 // Make sure that the pointer address doesn't equal the numerical representation of
46 // `Selected::{Waiting, Aborted, Disconnected}`.
52 /// Current state of a select or a blocking operation.
53 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
55 /// Still waiting for an operation.
58 /// The attempt to block the current thread has been aborted.
61 /// An operation became ready because a channel is disconnected.
64 /// An operation became ready because a message can be sent or received.
68 impl From
<usize> for Selected
{
70 fn from(val
: usize) -> Selected
{
72 0 => Selected
::Waiting
,
73 1 => Selected
::Aborted
,
74 2 => Selected
::Disconnected
,
75 oper
=> Selected
::Operation(Operation(oper
)),
80 impl Into
<usize> for Selected
{
82 fn into(self) -> usize {
84 Selected
::Waiting
=> 0,
85 Selected
::Aborted
=> 1,
86 Selected
::Disconnected
=> 2,
87 Selected
::Operation(Operation(val
)) => val
,
92 /// A receiver or a sender that can participate in select.
94 /// This is a handle that assists select in executing an operation, registration, deciding on the
95 /// appropriate deadline for blocking, etc.
96 pub trait SelectHandle
{
97 /// Attempts to select an operation and returns `true` on success.
98 fn try_select(&self, token
: &mut Token
) -> bool
;
100 /// Returns a deadline for an operation, if there is one.
101 fn deadline(&self) -> Option
<Instant
>;
103 /// Registers an operation for execution and returns `true` if it is now ready.
104 fn register(&self, oper
: Operation
, cx
: &Context
) -> bool
;
106 /// Unregisters an operation for execution.
107 fn unregister(&self, oper
: Operation
);
109 /// Attempts to select an operation the thread got woken up for and returns `true` on success.
110 fn accept(&self, token
: &mut Token
, cx
: &Context
) -> bool
;
112 /// Returns `true` if an operation can be executed without blocking.
113 fn is_ready(&self) -> bool
;
115 /// Registers an operation for readiness notification and returns `true` if it is now ready.
116 fn watch(&self, oper
: Operation
, cx
: &Context
) -> bool
;
118 /// Unregisters an operation for readiness notification.
119 fn unwatch(&self, oper
: Operation
);
122 impl<T
: SelectHandle
> SelectHandle
for &T
{
123 fn try_select(&self, token
: &mut Token
) -> bool
{
124 (**self).try_select(token
)
127 fn deadline(&self) -> Option
<Instant
> {
131 fn register(&self, oper
: Operation
, cx
: &Context
) -> bool
{
132 (**self).register(oper
, cx
)
135 fn unregister(&self, oper
: Operation
) {
136 (**self).unregister(oper
);
139 fn accept(&self, token
: &mut Token
, cx
: &Context
) -> bool
{
140 (**self).accept(token
, cx
)
143 fn is_ready(&self) -> bool
{
147 fn watch(&self, oper
: Operation
, cx
: &Context
) -> bool
{
148 (**self).watch(oper
, cx
)
151 fn unwatch(&self, oper
: Operation
) {
152 (**self).unwatch(oper
)
156 /// Determines when a select operation should time out.
157 #[derive(Clone, Copy, Eq, PartialEq)]
165 /// Time out after the time instant.
169 /// Runs until one of the operations is selected, potentially blocking the current thread.
171 /// Successful receive operations will have to be followed up by `channel::read()` and successful
172 /// send operations by `channel::write()`.
174 handles
: &mut [(&dyn SelectHandle
, usize, *const u8)],
176 ) -> Option
<(Token
, usize, *const u8)> {
177 if handles
.is_empty() {
178 // Wait until the timeout and return.
180 Timeout
::Now
=> return None
,
182 utils
::sleep_until(None
);
185 Timeout
::At(when
) => {
186 utils
::sleep_until(Some(when
));
192 // Shuffle the operations for fairness.
193 utils
::shuffle(handles
);
195 // Create a token, which serves as a temporary variable that gets initialized in this function
196 // and is later used by a call to `channel::read()` or `channel::write()` that completes the
197 // selected operation.
198 let mut token
= Token
::default();
200 // Try selecting one of the operations without blocking.
201 for &(handle
, i
, ptr
) in handles
.iter() {
202 if handle
.try_select(&mut token
) {
203 return Some((token
, i
, ptr
));
208 // Prepare for blocking.
209 let res
= Context
::with(|cx
| {
210 let mut sel
= Selected
::Waiting
;
211 let mut registered_count
= 0;
212 let mut index_ready
= None
;
214 if let Timeout
::Now
= timeout
{
215 cx
.try_select(Selected
::Aborted
).unwrap();
218 // Register all operations.
219 for (handle
, i
, _
) in handles
.iter_mut() {
220 registered_count
+= 1;
222 // If registration returns `false`, that means the operation has just become ready.
223 if handle
.register(Operation
::hook
::<&dyn SelectHandle
>(handle
), cx
) {
224 // Try aborting select.
225 sel
= match cx
.try_select(Selected
::Aborted
) {
227 index_ready
= Some(*i
);
235 // If another thread has already selected one of the operations, stop registration.
237 if sel
!= Selected
::Waiting
{
242 if sel
== Selected
::Waiting
{
243 // Check with each operation for how long we're allowed to block, and compute the
244 // earliest deadline.
245 let mut deadline
: Option
<Instant
> = match timeout
{
246 Timeout
::Now
=> return None
,
247 Timeout
::Never
=> None
,
248 Timeout
::At(when
) => Some(when
),
250 for &(handle
, _
, _
) in handles
.iter() {
251 if let Some(x
) = handle
.deadline() {
252 deadline
= deadline
.map(|y
| x
.min(y
)).or(Some(x
));
256 // Block the current thread.
257 sel
= cx
.wait_until(deadline
);
260 // Unregister all registered operations.
261 for (handle
, _
, _
) in handles
.iter_mut().take(registered_count
) {
262 handle
.unregister(Operation
::hook
::<&dyn SelectHandle
>(handle
));
266 Selected
::Waiting
=> unreachable
!(),
267 Selected
::Aborted
=> {
268 // If an operation became ready during registration, try selecting it.
269 if let Some(index_ready
) = index_ready
{
270 for &(handle
, i
, ptr
) in handles
.iter() {
271 if i
== index_ready
&& handle
.try_select(&mut token
) {
272 return Some((i
, ptr
));
277 Selected
::Disconnected
=> {}
278 Selected
::Operation(_
) => {
279 // Find the selected operation.
280 for (handle
, i
, ptr
) in handles
.iter_mut() {
281 // Is this the selected operation?
282 if sel
== Selected
::Operation(Operation
::hook
::<&dyn SelectHandle
>(handle
))
284 // Try selecting this operation.
285 if handle
.accept(&mut token
, cx
) {
286 return Some((*i
, *ptr
));
296 // Return if an operation was selected.
297 if let Some((i
, ptr
)) = res
{
298 return Some((token
, i
, ptr
));
301 // Try selecting one of the operations without blocking.
302 for &(handle
, i
, ptr
) in handles
.iter() {
303 if handle
.try_select(&mut token
) {
304 return Some((token
, i
, ptr
));
309 Timeout
::Now
=> return None
,
311 Timeout
::At(when
) => {
312 if Instant
::now() >= when
{
320 /// Runs until one of the operations becomes ready, potentially blocking the current thread.
322 handles
: &mut [(&dyn SelectHandle
, usize, *const u8)],
325 if handles
.is_empty() {
326 // Wait until the timeout and return.
328 Timeout
::Now
=> return None
,
330 utils
::sleep_until(None
);
333 Timeout
::At(when
) => {
334 utils
::sleep_until(Some(when
));
340 // Shuffle the operations for fairness.
341 utils
::shuffle(handles
);
344 let backoff
= Backoff
::new();
346 // Check operations for readiness.
347 for &(handle
, i
, _
) in handles
.iter() {
348 if handle
.is_ready() {
353 if backoff
.is_completed() {
360 // Check for timeout.
362 Timeout
::Now
=> return None
,
364 Timeout
::At(when
) => {
365 if Instant
::now() >= when
{
371 // Prepare for blocking.
372 let res
= Context
::with(|cx
| {
373 let mut sel
= Selected
::Waiting
;
374 let mut registered_count
= 0;
376 // Begin watching all operations.
377 for (handle
, _
, _
) in handles
.iter_mut() {
378 registered_count
+= 1;
379 let oper
= Operation
::hook
::<&dyn SelectHandle
>(handle
);
381 // If registration returns `false`, that means the operation has just become ready.
382 if handle
.watch(oper
, cx
) {
383 sel
= match cx
.try_select(Selected
::Operation(oper
)) {
384 Ok(()) => Selected
::Operation(oper
),
390 // If another thread has already chosen one of the operations, stop registration.
392 if sel
!= Selected
::Waiting
{
397 if sel
== Selected
::Waiting
{
398 // Check with each operation for how long we're allowed to block, and compute the
399 // earliest deadline.
400 let mut deadline
: Option
<Instant
> = match timeout
{
401 Timeout
::Now
=> unreachable
!(),
402 Timeout
::Never
=> None
,
403 Timeout
::At(when
) => Some(when
),
405 for &(handle
, _
, _
) in handles
.iter() {
406 if let Some(x
) = handle
.deadline() {
407 deadline
= deadline
.map(|y
| x
.min(y
)).or(Some(x
));
411 // Block the current thread.
412 sel
= cx
.wait_until(deadline
);
415 // Unwatch all operations.
416 for (handle
, _
, _
) in handles
.iter_mut().take(registered_count
) {
417 handle
.unwatch(Operation
::hook
::<&dyn SelectHandle
>(handle
));
421 Selected
::Waiting
=> unreachable
!(),
422 Selected
::Aborted
=> {}
423 Selected
::Disconnected
=> {}
424 Selected
::Operation(_
) => {
425 for (handle
, i
, _
) in handles
.iter_mut() {
426 let oper
= Operation
::hook
::<&dyn SelectHandle
>(handle
);
427 if sel
== Selected
::Operation(oper
) {
437 // Return if an operation became ready.
444 /// Attempts to select one of the operations without blocking.
446 pub fn try_select
<'a
>(
447 handles
: &mut [(&'a
dyn SelectHandle
, usize, *const u8)],
448 ) -> Result
<SelectedOperation
<'a
>, TrySelectError
> {
449 match run_select(handles
, Timeout
::Now
) {
450 None
=> Err(TrySelectError
),
451 Some((token
, index
, ptr
)) => Ok(SelectedOperation
{
455 _marker
: PhantomData
,
460 /// Blocks until one of the operations becomes ready and selects it.
463 handles
: &mut [(&'a
dyn SelectHandle
, usize, *const u8)],
464 ) -> SelectedOperation
<'a
> {
465 if handles
.is_empty() {
466 panic
!("no operations have been added to `Select`");
469 let (token
, index
, ptr
) = run_select(handles
, Timeout
::Never
).unwrap();
474 _marker
: PhantomData
,
478 /// Blocks for a limited time until one of the operations becomes ready and selects it.
480 pub fn select_timeout
<'a
>(
481 handles
: &mut [(&'a
dyn SelectHandle
, usize, *const u8)],
483 ) -> Result
<SelectedOperation
<'a
>, SelectTimeoutError
> {
484 select_deadline(handles
, Instant
::now() + timeout
)
487 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
489 pub fn select_deadline
<'a
>(
490 handles
: &mut [(&'a
dyn SelectHandle
, usize, *const u8)],
492 ) -> Result
<SelectedOperation
<'a
>, SelectTimeoutError
> {
493 match run_select(handles
, Timeout
::At(deadline
)) {
494 None
=> Err(SelectTimeoutError
),
495 Some((token
, index
, ptr
)) => Ok(SelectedOperation
{
499 _marker
: PhantomData
,
504 /// Selects from a set of channel operations.
506 /// `Select` allows you to define a set of channel operations, wait until any one of them becomes
507 /// ready, and finally execute it. If multiple operations are ready at the same time, a random one
508 /// among them is selected.
510 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
511 /// when it will simply return an error because the channel is disconnected.
513 /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
514 /// dynamically created list of channel operations.
516 /// Once a list of operations has been built with `Select`, there are two different ways of
519 /// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
520 /// the returned selected operation has already begun and **must** be completed. If we don't
521 /// complete it, a panic will occur.
523 /// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
524 /// successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
525 /// possible for another thread to make the operation not ready just before we try executing it,
526 /// so it's wise to use a retry loop. However, note that these methods might return with success
527 /// spuriously, so it's a good idea to always double check if the operation is really ready.
531 /// Use [`select`] to receive a message from a list of receivers:
534 /// use crossbeam_channel::{Receiver, RecvError, Select};
536 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
537 /// // Build a list of operations.
538 /// let mut sel = Select::new();
543 /// // Complete the selected operation.
544 /// let oper = sel.select();
545 /// let index = oper.index();
546 /// oper.recv(&rs[index])
550 /// Use [`ready`] to receive a message from a list of receivers:
553 /// use crossbeam_channel::{Receiver, RecvError, Select};
555 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
556 /// // Build a list of operations.
557 /// let mut sel = Select::new();
563 /// // Wait until a receive operation becomes ready and try executing it.
564 /// let index = sel.ready();
565 /// let res = rs[index].try_recv();
567 /// // If the operation turns out not to be ready, retry.
568 /// if let Err(e) = res {
569 /// if e.is_empty() {
575 /// return res.map_err(|_| RecvError);
580 /// [`try_select`]: Select::try_select
581 /// [`select`]: Select::select
582 /// [`select_timeout`]: Select::select_timeout
583 /// [`try_ready`]: Select::try_ready
584 /// [`ready`]: Select::ready
585 /// [`ready_timeout`]: Select::ready_timeout
586 pub struct Select
<'a
> {
587 /// A list of senders and receivers participating in selection.
588 handles
: Vec
<(&'a
dyn SelectHandle
, usize, *const u8)>,
590 /// The next index to assign to an operation.
594 unsafe impl Send
for Select
<'_
> {}
595 unsafe impl Sync
for Select
<'_
> {}
597 impl<'a
> Select
<'a
> {
598 /// Creates an empty list of channel operations for selection.
603 /// use crossbeam_channel::Select;
605 /// let mut sel = Select::new();
607 /// // The list of operations is empty, which means no operation can be selected.
608 /// assert!(sel.try_select().is_err());
610 pub fn new() -> Select
<'a
> {
612 handles
: Vec
::with_capacity(4),
617 /// Adds a send operation.
619 /// Returns the index of the added operation.
624 /// use crossbeam_channel::{unbounded, Select};
626 /// let (s, r) = unbounded::<i32>();
628 /// let mut sel = Select::new();
629 /// let index = sel.send(&s);
631 pub fn send
<T
>(&mut self, s
: &'a Sender
<T
>) -> usize {
632 let i
= self.next_index
;
633 let ptr
= s
as *const Sender
<_
> as *const u8;
634 self.handles
.push((s
, i
, ptr
));
635 self.next_index
+= 1;
639 /// Adds a receive operation.
641 /// Returns the index of the added operation.
646 /// use crossbeam_channel::{unbounded, Select};
648 /// let (s, r) = unbounded::<i32>();
650 /// let mut sel = Select::new();
651 /// let index = sel.recv(&r);
653 pub fn recv
<T
>(&mut self, r
: &'a Receiver
<T
>) -> usize {
654 let i
= self.next_index
;
655 let ptr
= r
as *const Receiver
<_
> as *const u8;
656 self.handles
.push((r
, i
, ptr
));
657 self.next_index
+= 1;
661 /// Removes a previously added operation.
663 /// This is useful when an operation is selected because the channel got disconnected and we
664 /// want to try again to select a different operation instead.
666 /// If new operations are added after removing some, the indices of removed operations will not
671 /// An attempt to remove a non-existing or already removed operation will panic.
676 /// use crossbeam_channel::{unbounded, Select};
678 /// let (s1, r1) = unbounded::<i32>();
679 /// let (_, r2) = unbounded::<i32>();
681 /// let mut sel = Select::new();
682 /// let oper1 = sel.recv(&r1);
683 /// let oper2 = sel.recv(&r2);
685 /// // Both operations are initially ready, so a random one will be executed.
686 /// let oper = sel.select();
687 /// assert_eq!(oper.index(), oper2);
688 /// assert!(oper.recv(&r2).is_err());
689 /// sel.remove(oper2);
691 /// s1.send(10).unwrap();
693 /// let oper = sel.select();
694 /// assert_eq!(oper.index(), oper1);
695 /// assert_eq!(oper.recv(&r1), Ok(10));
697 pub fn remove(&mut self, index
: usize) {
699 index
< self.next_index
,
700 "index out of bounds; {} >= {}",
709 .find(|(_
, (_
, i
, _
))| *i
== index
)
710 .expect("no operation with this index")
713 self.handles
.swap_remove(i
);
716 /// Attempts to select one of the operations without blocking.
718 /// If an operation is ready, it is selected and returned. If multiple operations are ready at
719 /// the same time, a random one among them is selected. If none of the operations are ready, an
720 /// error is returned.
722 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
723 /// even when it will simply return an error because the channel is disconnected.
725 /// The selected operation must be completed with [`SelectedOperation::send`]
726 /// or [`SelectedOperation::recv`].
731 /// use crossbeam_channel::{unbounded, Select};
733 /// let (s1, r1) = unbounded();
734 /// let (s2, r2) = unbounded();
736 /// s1.send(10).unwrap();
737 /// s2.send(20).unwrap();
739 /// let mut sel = Select::new();
740 /// let oper1 = sel.recv(&r1);
741 /// let oper2 = sel.recv(&r2);
743 /// // Both operations are initially ready, so a random one will be executed.
744 /// let oper = sel.try_select();
746 /// Err(_) => panic!("both operations should be ready"),
747 /// Ok(oper) => match oper.index() {
748 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
749 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
750 /// _ => unreachable!(),
754 pub fn try_select(&mut self) -> Result
<SelectedOperation
<'a
>, TrySelectError
> {
755 try_select(&mut self.handles
)
758 /// Blocks until one of the operations becomes ready and selects it.
760 /// Once an operation becomes ready, it is selected and returned. If multiple operations are
761 /// ready at the same time, a random one among them is selected.
763 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
764 /// even when it will simply return an error because the channel is disconnected.
766 /// The selected operation must be completed with [`SelectedOperation::send`]
767 /// or [`SelectedOperation::recv`].
771 /// Panics if no operations have been added to `Select`.
777 /// use std::time::Duration;
778 /// use crossbeam_channel::{unbounded, Select};
780 /// let (s1, r1) = unbounded();
781 /// let (s2, r2) = unbounded();
783 /// thread::spawn(move || {
784 /// thread::sleep(Duration::from_secs(1));
785 /// s1.send(10).unwrap();
787 /// thread::spawn(move || s2.send(20).unwrap());
789 /// let mut sel = Select::new();
790 /// let oper1 = sel.recv(&r1);
791 /// let oper2 = sel.recv(&r2);
793 /// // The second operation will be selected because it becomes ready first.
794 /// let oper = sel.select();
795 /// match oper.index() {
796 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
797 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
798 /// _ => unreachable!(),
801 pub fn select(&mut self) -> SelectedOperation
<'a
> {
802 select(&mut self.handles
)
805 /// Blocks for a limited time until one of the operations becomes ready and selects it.
807 /// If an operation becomes ready, it is selected and returned. If multiple operations are
808 /// ready at the same time, a random one among them is selected. If none of the operations
809 /// become ready for the specified duration, an error is returned.
811 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
812 /// even when it will simply return an error because the channel is disconnected.
814 /// The selected operation must be completed with [`SelectedOperation::send`]
815 /// or [`SelectedOperation::recv`].
821 /// use std::time::Duration;
822 /// use crossbeam_channel::{unbounded, Select};
824 /// let (s1, r1) = unbounded();
825 /// let (s2, r2) = unbounded();
827 /// thread::spawn(move || {
828 /// thread::sleep(Duration::from_secs(1));
829 /// s1.send(10).unwrap();
831 /// thread::spawn(move || s2.send(20).unwrap());
833 /// let mut sel = Select::new();
834 /// let oper1 = sel.recv(&r1);
835 /// let oper2 = sel.recv(&r2);
837 /// // The second operation will be selected because it becomes ready first.
838 /// let oper = sel.select_timeout(Duration::from_millis(500));
840 /// Err(_) => panic!("should not have timed out"),
841 /// Ok(oper) => match oper.index() {
842 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
843 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
844 /// _ => unreachable!(),
848 pub fn select_timeout(
851 ) -> Result
<SelectedOperation
<'a
>, SelectTimeoutError
> {
852 select_timeout(&mut self.handles
, timeout
)
855 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
857 /// If an operation becomes ready, it is selected and returned. If multiple operations are
858 /// ready at the same time, a random one among them is selected. If none of the operations
859 /// become ready before the given deadline, an error is returned.
861 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
862 /// even when it will simply return an error because the channel is disconnected.
864 /// The selected operation must be completed with [`SelectedOperation::send`]
865 /// or [`SelectedOperation::recv`].
867 /// [`SelectedOperation::send`]: struct.SelectedOperation.html#method.send
868 /// [`SelectedOperation::recv`]: struct.SelectedOperation.html#method.recv
874 /// use std::time::{Instant, Duration};
875 /// use crossbeam_channel::{unbounded, Select};
877 /// let (s1, r1) = unbounded();
878 /// let (s2, r2) = unbounded();
880 /// thread::spawn(move || {
881 /// thread::sleep(Duration::from_secs(1));
882 /// s1.send(10).unwrap();
884 /// thread::spawn(move || s2.send(20).unwrap());
886 /// let mut sel = Select::new();
887 /// let oper1 = sel.recv(&r1);
888 /// let oper2 = sel.recv(&r2);
890 /// let deadline = Instant::now() + Duration::from_millis(500);
892 /// // The second operation will be selected because it becomes ready first.
893 /// let oper = sel.select_deadline(deadline);
895 /// Err(_) => panic!("should not have timed out"),
896 /// Ok(oper) => match oper.index() {
897 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
898 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
899 /// _ => unreachable!(),
903 pub fn select_deadline(
906 ) -> Result
<SelectedOperation
<'a
>, SelectTimeoutError
> {
907 select_deadline(&mut self.handles
, deadline
)
910 /// Attempts to find a ready operation without blocking.
912 /// If an operation is ready, its index is returned. If multiple operations are ready at the
913 /// same time, a random one among them is chosen. If none of the operations are ready, an error
916 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
917 /// even when it will simply return an error because the channel is disconnected.
919 /// Note that this method might return with success spuriously, so it's a good idea to always
920 /// double check if the operation is really ready.
925 /// use crossbeam_channel::{unbounded, Select};
927 /// let (s1, r1) = unbounded();
928 /// let (s2, r2) = unbounded();
930 /// s1.send(10).unwrap();
931 /// s2.send(20).unwrap();
933 /// let mut sel = Select::new();
934 /// let oper1 = sel.recv(&r1);
935 /// let oper2 = sel.recv(&r2);
937 /// // Both operations are initially ready, so a random one will be chosen.
938 /// match sel.try_ready() {
939 /// Err(_) => panic!("both operations should be ready"),
940 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
941 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
942 /// Ok(_) => unreachable!(),
945 pub fn try_ready(&mut self) -> Result
<usize, TryReadyError
> {
946 match run_ready(&mut self.handles
, Timeout
::Now
) {
947 None
=> Err(TryReadyError
),
948 Some(index
) => Ok(index
),
952 /// Blocks until one of the operations becomes ready.
954 /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
955 /// the same time, a random one among them is chosen.
957 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
958 /// even when it will simply return an error because the channel is disconnected.
960 /// Note that this method might return with success spuriously, so it's a good idea to always
961 /// double check if the operation is really ready.
965 /// Panics if no operations have been added to `Select`.
971 /// use std::time::Duration;
972 /// use crossbeam_channel::{unbounded, Select};
974 /// let (s1, r1) = unbounded();
975 /// let (s2, r2) = unbounded();
977 /// thread::spawn(move || {
978 /// thread::sleep(Duration::from_secs(1));
979 /// s1.send(10).unwrap();
981 /// thread::spawn(move || s2.send(20).unwrap());
983 /// let mut sel = Select::new();
984 /// let oper1 = sel.recv(&r1);
985 /// let oper2 = sel.recv(&r2);
987 /// // The second operation will be selected because it becomes ready first.
988 /// match sel.ready() {
989 /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
990 /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
991 /// _ => unreachable!(),
994 pub fn ready(&mut self) -> usize {
995 if self.handles
.is_empty() {
996 panic
!("no operations have been added to `Select`");
999 run_ready(&mut self.handles
, Timeout
::Never
).unwrap()
1002 /// Blocks for a limited time until one of the operations becomes ready.
1004 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1005 /// the same time, a random one among them is chosen. If none of the operations become ready
1006 /// for the specified duration, an error is returned.
1008 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1009 /// even when it will simply return an error because the channel is disconnected.
1011 /// Note that this method might return with success spuriously, so it's a good idea to double
1012 /// check if the operation is really ready.
1017 /// use std::thread;
1018 /// use std::time::Duration;
1019 /// use crossbeam_channel::{unbounded, Select};
1021 /// let (s1, r1) = unbounded();
1022 /// let (s2, r2) = unbounded();
1024 /// thread::spawn(move || {
1025 /// thread::sleep(Duration::from_secs(1));
1026 /// s1.send(10).unwrap();
1028 /// thread::spawn(move || s2.send(20).unwrap());
1030 /// let mut sel = Select::new();
1031 /// let oper1 = sel.recv(&r1);
1032 /// let oper2 = sel.recv(&r2);
1034 /// // The second operation will be selected because it becomes ready first.
1035 /// match sel.ready_timeout(Duration::from_millis(500)) {
1036 /// Err(_) => panic!("should not have timed out"),
1037 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1038 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1039 /// Ok(_) => unreachable!(),
1042 pub fn ready_timeout(&mut self, timeout
: Duration
) -> Result
<usize, ReadyTimeoutError
> {
1043 self.ready_deadline(Instant
::now() + timeout
)
1046 /// Blocks until a given deadline, or until one of the operations becomes ready.
1048 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1049 /// the same time, a random one among them is chosen. If none of the operations become ready
1050 /// before the deadline, an error is returned.
1052 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1053 /// even when it will simply return an error because the channel is disconnected.
1055 /// Note that this method might return with success spuriously, so it's a good idea to double
1056 /// check if the operation is really ready.
1061 /// use std::thread;
1062 /// use std::time::{Duration, Instant};
1063 /// use crossbeam_channel::{unbounded, Select};
1065 /// let deadline = Instant::now() + Duration::from_millis(500);
1067 /// let (s1, r1) = unbounded();
1068 /// let (s2, r2) = unbounded();
1070 /// thread::spawn(move || {
1071 /// thread::sleep(Duration::from_secs(1));
1072 /// s1.send(10).unwrap();
1074 /// thread::spawn(move || s2.send(20).unwrap());
1076 /// let mut sel = Select::new();
1077 /// let oper1 = sel.recv(&r1);
1078 /// let oper2 = sel.recv(&r2);
1080 /// // The second operation will be selected because it becomes ready first.
1081 /// match sel.ready_deadline(deadline) {
1082 /// Err(_) => panic!("should not have timed out"),
1083 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1084 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1085 /// Ok(_) => unreachable!(),
1088 pub fn ready_deadline(&mut self, deadline
: Instant
) -> Result
<usize, ReadyTimeoutError
> {
1089 match run_ready(&mut self.handles
, Timeout
::At(deadline
)) {
1090 None
=> Err(ReadyTimeoutError
),
1091 Some(index
) => Ok(index
),
1096 impl<'a
> Clone
for Select
<'a
> {
1097 fn clone(&self) -> Select
<'a
> {
1099 handles
: self.handles
.clone(),
1100 next_index
: self.next_index
,
1105 impl<'a
> Default
for Select
<'a
> {
1106 fn default() -> Select
<'a
> {
1111 impl fmt
::Debug
for Select
<'_
> {
1112 fn fmt(&self, f
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
1113 f
.pad("Select { .. }")
1117 /// A selected operation that needs to be completed.
1119 /// To complete the operation, call [`send`] or [`recv`].
1123 /// Forgetting to complete the operation is an error and might lead to deadlocks. If a
1124 /// `SelectedOperation` is dropped without completion, a panic occurs.
1126 /// [`send`]: SelectedOperation::send
1127 /// [`recv`]: SelectedOperation::recv
1129 pub struct SelectedOperation
<'a
> {
1130 /// Token needed to complete the operation.
1133 /// The index of the selected operation.
1136 /// The address of the selected `Sender` or `Receiver`.
1139 /// Indicates that `Sender`s and `Receiver`s are borrowed.
1140 _marker
: PhantomData
<&'
a ()>,
1143 impl SelectedOperation
<'_
> {
1144 /// Returns the index of the selected operation.
1149 /// use crossbeam_channel::{bounded, Select};
1151 /// let (s1, r1) = bounded::<()>(0);
1152 /// let (s2, r2) = bounded::<()>(0);
1153 /// let (s3, r3) = bounded::<()>(1);
1155 /// let mut sel = Select::new();
1156 /// let oper1 = sel.send(&s1);
1157 /// let oper2 = sel.recv(&r2);
1158 /// let oper3 = sel.send(&s3);
1160 /// // Only the last operation is ready.
1161 /// let oper = sel.select();
1162 /// assert_eq!(oper.index(), 2);
1163 /// assert_eq!(oper.index(), oper3);
1165 /// // Complete the operation.
1166 /// oper.send(&s3, ()).unwrap();
1168 pub fn index(&self) -> usize {
1172 /// Completes the send operation.
1174 /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1175 /// when the operation was added.
1179 /// Panics if an incorrect [`Sender`] reference is passed.
1184 /// use crossbeam_channel::{bounded, Select, SendError};
1186 /// let (s, r) = bounded::<i32>(0);
1189 /// let mut sel = Select::new();
1190 /// let oper1 = sel.send(&s);
1192 /// let oper = sel.select();
1193 /// assert_eq!(oper.index(), oper1);
1194 /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1196 pub fn send
<T
>(mut self, s
: &Sender
<T
>, msg
: T
) -> Result
<(), SendError
<T
>> {
1198 s
as *const Sender
<T
> as *const u8 == self.ptr
,
1199 "passed a sender that wasn't selected",
1201 let res
= unsafe { channel::write(s, &mut self.token, msg) }
;
1203 res
.map_err(SendError
)
1206 /// Completes the receive operation.
1208 /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1209 /// when the operation was added.
1213 /// Panics if an incorrect [`Receiver`] reference is passed.
1218 /// use crossbeam_channel::{bounded, Select, RecvError};
1220 /// let (s, r) = bounded::<i32>(0);
1223 /// let mut sel = Select::new();
1224 /// let oper1 = sel.recv(&r);
1226 /// let oper = sel.select();
1227 /// assert_eq!(oper.index(), oper1);
1228 /// assert_eq!(oper.recv(&r), Err(RecvError));
1230 pub fn recv
<T
>(mut self, r
: &Receiver
<T
>) -> Result
<T
, RecvError
> {
1232 r
as *const Receiver
<T
> as *const u8 == self.ptr
,
1233 "passed a receiver that wasn't selected",
1235 let res
= unsafe { channel::read(r, &mut self.token) }
;
1237 res
.map_err(|_
| RecvError
)
1241 impl fmt
::Debug
for SelectedOperation
<'_
> {
1242 fn fmt(&self, f
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
1243 f
.pad("SelectedOperation { .. }")
1247 impl Drop
for SelectedOperation
<'_
> {
1248 fn drop(&mut self) {
1249 panic
!("dropped `SelectedOperation` without completing the operation");