]> git.proxmox.com Git - rustc.git/blame - vendor/crossbeam-channel/src/select.rs
New upstream version 1.51.0+dfsg1
[rustc.git] / vendor / crossbeam-channel / src / select.rs
CommitLineData
1b1a35ee
XL
1//! Interface to the select mechanism.
2
3use std::fmt;
4use std::marker::PhantomData;
5use std::mem;
6use std::time::{Duration, Instant};
7
8use crossbeam_utils::Backoff;
9
5869c6ff
XL
10use crate::channel::{self, Receiver, Sender};
11use crate::context::Context;
12use crate::err::{ReadyTimeoutError, TryReadyError};
13use crate::err::{RecvError, SendError};
14use crate::err::{SelectTimeoutError, TrySelectError};
15use crate::flavors;
16use crate::utils;
1b1a35ee
XL
17
18/// Temporary data that gets initialized during select or a blocking operation, and is consumed by
19/// `read` or `write`.
20///
21/// Each field contains data associated with a specific channel flavor.
22#[derive(Debug, Default)]
23pub struct Token {
5869c6ff 24 pub at: flavors::at::AtToken,
1b1a35ee
XL
25 pub array: flavors::array::ArrayToken,
26 pub list: flavors::list::ListToken,
27 pub never: flavors::never::NeverToken,
28 pub tick: flavors::tick::TickToken,
29 pub zero: flavors::zero::ZeroToken,
30}
31
32/// Identifier associated with an operation by a specific thread on a specific channel.
33#[derive(Debug, Clone, Copy, PartialEq, Eq)]
34pub struct Operation(usize);
35
36impl Operation {
37 /// Creates an operation identifier from a mutable reference.
38 ///
39 /// This function essentially just turns the address of the reference into a number. The
40 /// reference should point to a variable that is specific to the thread and the operation,
41 /// and is alive for the entire duration of select or blocking operation.
42 #[inline]
43 pub fn hook<T>(r: &mut T) -> Operation {
44 let val = r as *mut T as usize;
45 // Make sure that the pointer address doesn't equal the numerical representation of
46 // `Selected::{Waiting, Aborted, Disconnected}`.
47 assert!(val > 2);
48 Operation(val)
49 }
50}
51
52/// Current state of a select or a blocking operation.
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54pub enum Selected {
55 /// Still waiting for an operation.
56 Waiting,
57
58 /// The attempt to block the current thread has been aborted.
59 Aborted,
60
61 /// An operation became ready because a channel is disconnected.
62 Disconnected,
63
64 /// An operation became ready because a message can be sent or received.
65 Operation(Operation),
66}
67
68impl From<usize> for Selected {
69 #[inline]
70 fn from(val: usize) -> Selected {
71 match val {
72 0 => Selected::Waiting,
73 1 => Selected::Aborted,
74 2 => Selected::Disconnected,
75 oper => Selected::Operation(Operation(oper)),
76 }
77 }
78}
79
80impl Into<usize> for Selected {
81 #[inline]
82 fn into(self) -> usize {
83 match self {
84 Selected::Waiting => 0,
85 Selected::Aborted => 1,
86 Selected::Disconnected => 2,
87 Selected::Operation(Operation(val)) => val,
88 }
89 }
90}
91
92/// A receiver or a sender that can participate in select.
93///
94/// This is a handle that assists select in executing an operation, registration, deciding on the
95/// appropriate deadline for blocking, etc.
96pub trait SelectHandle {
97 /// Attempts to select an operation and returns `true` on success.
98 fn try_select(&self, token: &mut Token) -> bool;
99
100 /// Returns a deadline for an operation, if there is one.
101 fn deadline(&self) -> Option<Instant>;
102
103 /// Registers an operation for execution and returns `true` if it is now ready.
104 fn register(&self, oper: Operation, cx: &Context) -> bool;
105
106 /// Unregisters an operation for execution.
107 fn unregister(&self, oper: Operation);
108
109 /// Attempts to select an operation the thread got woken up for and returns `true` on success.
110 fn accept(&self, token: &mut Token, cx: &Context) -> bool;
111
112 /// Returns `true` if an operation can be executed without blocking.
113 fn is_ready(&self) -> bool;
114
115 /// Registers an operation for readiness notification and returns `true` if it is now ready.
116 fn watch(&self, oper: Operation, cx: &Context) -> bool;
117
118 /// Unregisters an operation for readiness notification.
119 fn unwatch(&self, oper: Operation);
120}
121
5869c6ff 122impl<T: SelectHandle> SelectHandle for &T {
1b1a35ee
XL
123 fn try_select(&self, token: &mut Token) -> bool {
124 (**self).try_select(token)
125 }
126
127 fn deadline(&self) -> Option<Instant> {
128 (**self).deadline()
129 }
130
131 fn register(&self, oper: Operation, cx: &Context) -> bool {
132 (**self).register(oper, cx)
133 }
134
135 fn unregister(&self, oper: Operation) {
136 (**self).unregister(oper);
137 }
138
139 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
140 (**self).accept(token, cx)
141 }
142
143 fn is_ready(&self) -> bool {
144 (**self).is_ready()
145 }
146
147 fn watch(&self, oper: Operation, cx: &Context) -> bool {
148 (**self).watch(oper, cx)
149 }
150
151 fn unwatch(&self, oper: Operation) {
152 (**self).unwatch(oper)
153 }
154}
155
156/// Determines when a select operation should time out.
157#[derive(Clone, Copy, Eq, PartialEq)]
158enum Timeout {
159 /// No blocking.
160 Now,
161
162 /// Block forever.
163 Never,
164
165 /// Time out after the time instant.
166 At(Instant),
167}
168
169/// Runs until one of the operations is selected, potentially blocking the current thread.
170///
171/// Successful receive operations will have to be followed up by `channel::read()` and successful
172/// send operations by `channel::write()`.
173fn run_select(
174 handles: &mut [(&dyn SelectHandle, usize, *const u8)],
175 timeout: Timeout,
176) -> Option<(Token, usize, *const u8)> {
177 if handles.is_empty() {
178 // Wait until the timeout and return.
179 match timeout {
180 Timeout::Now => return None,
181 Timeout::Never => {
182 utils::sleep_until(None);
183 unreachable!();
184 }
185 Timeout::At(when) => {
186 utils::sleep_until(Some(when));
187 return None;
188 }
189 }
190 }
191
192 // Shuffle the operations for fairness.
193 utils::shuffle(handles);
194
195 // Create a token, which serves as a temporary variable that gets initialized in this function
196 // and is later used by a call to `channel::read()` or `channel::write()` that completes the
197 // selected operation.
198 let mut token = Token::default();
199
200 // Try selecting one of the operations without blocking.
201 for &(handle, i, ptr) in handles.iter() {
202 if handle.try_select(&mut token) {
203 return Some((token, i, ptr));
204 }
205 }
206
207 loop {
208 // Prepare for blocking.
209 let res = Context::with(|cx| {
210 let mut sel = Selected::Waiting;
211 let mut registered_count = 0;
212 let mut index_ready = None;
213
214 if let Timeout::Now = timeout {
215 cx.try_select(Selected::Aborted).unwrap();
216 }
217
218 // Register all operations.
219 for (handle, i, _) in handles.iter_mut() {
220 registered_count += 1;
221
222 // If registration returns `false`, that means the operation has just become ready.
223 if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
224 // Try aborting select.
225 sel = match cx.try_select(Selected::Aborted) {
226 Ok(()) => {
227 index_ready = Some(*i);
228 Selected::Aborted
229 }
230 Err(s) => s,
231 };
232 break;
233 }
234
235 // If another thread has already selected one of the operations, stop registration.
236 sel = cx.selected();
237 if sel != Selected::Waiting {
238 break;
239 }
240 }
241
242 if sel == Selected::Waiting {
243 // Check with each operation for how long we're allowed to block, and compute the
244 // earliest deadline.
245 let mut deadline: Option<Instant> = match timeout {
246 Timeout::Now => return None,
247 Timeout::Never => None,
248 Timeout::At(when) => Some(when),
249 };
250 for &(handle, _, _) in handles.iter() {
251 if let Some(x) = handle.deadline() {
252 deadline = deadline.map(|y| x.min(y)).or(Some(x));
253 }
254 }
255
256 // Block the current thread.
257 sel = cx.wait_until(deadline);
258 }
259
260 // Unregister all registered operations.
261 for (handle, _, _) in handles.iter_mut().take(registered_count) {
262 handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
263 }
264
265 match sel {
266 Selected::Waiting => unreachable!(),
267 Selected::Aborted => {
268 // If an operation became ready during registration, try selecting it.
269 if let Some(index_ready) = index_ready {
270 for &(handle, i, ptr) in handles.iter() {
271 if i == index_ready && handle.try_select(&mut token) {
272 return Some((i, ptr));
273 }
274 }
275 }
276 }
277 Selected::Disconnected => {}
278 Selected::Operation(_) => {
279 // Find the selected operation.
280 for (handle, i, ptr) in handles.iter_mut() {
281 // Is this the selected operation?
282 if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
283 {
284 // Try selecting this operation.
285 if handle.accept(&mut token, cx) {
286 return Some((*i, *ptr));
287 }
288 }
289 }
290 }
291 }
292
293 None
294 });
295
296 // Return if an operation was selected.
297 if let Some((i, ptr)) = res {
298 return Some((token, i, ptr));
299 }
300
301 // Try selecting one of the operations without blocking.
302 for &(handle, i, ptr) in handles.iter() {
303 if handle.try_select(&mut token) {
304 return Some((token, i, ptr));
305 }
306 }
307
308 match timeout {
309 Timeout::Now => return None,
310 Timeout::Never => {}
311 Timeout::At(when) => {
312 if Instant::now() >= when {
313 return None;
314 }
315 }
316 }
317 }
318}
319
320/// Runs until one of the operations becomes ready, potentially blocking the current thread.
321fn run_ready(
322 handles: &mut [(&dyn SelectHandle, usize, *const u8)],
323 timeout: Timeout,
324) -> Option<usize> {
325 if handles.is_empty() {
326 // Wait until the timeout and return.
327 match timeout {
328 Timeout::Now => return None,
329 Timeout::Never => {
330 utils::sleep_until(None);
331 unreachable!();
332 }
333 Timeout::At(when) => {
334 utils::sleep_until(Some(when));
335 return None;
336 }
337 }
338 }
339
340 // Shuffle the operations for fairness.
341 utils::shuffle(handles);
342
343 loop {
344 let backoff = Backoff::new();
345 loop {
346 // Check operations for readiness.
347 for &(handle, i, _) in handles.iter() {
348 if handle.is_ready() {
349 return Some(i);
350 }
351 }
352
353 if backoff.is_completed() {
354 break;
355 } else {
356 backoff.snooze();
357 }
358 }
359
360 // Check for timeout.
361 match timeout {
362 Timeout::Now => return None,
363 Timeout::Never => {}
364 Timeout::At(when) => {
365 if Instant::now() >= when {
366 return None;
367 }
368 }
369 }
370
371 // Prepare for blocking.
372 let res = Context::with(|cx| {
373 let mut sel = Selected::Waiting;
374 let mut registered_count = 0;
375
376 // Begin watching all operations.
377 for (handle, _, _) in handles.iter_mut() {
378 registered_count += 1;
379 let oper = Operation::hook::<&dyn SelectHandle>(handle);
380
381 // If registration returns `false`, that means the operation has just become ready.
382 if handle.watch(oper, cx) {
383 sel = match cx.try_select(Selected::Operation(oper)) {
384 Ok(()) => Selected::Operation(oper),
385 Err(s) => s,
386 };
387 break;
388 }
389
390 // If another thread has already chosen one of the operations, stop registration.
391 sel = cx.selected();
392 if sel != Selected::Waiting {
393 break;
394 }
395 }
396
397 if sel == Selected::Waiting {
398 // Check with each operation for how long we're allowed to block, and compute the
399 // earliest deadline.
400 let mut deadline: Option<Instant> = match timeout {
401 Timeout::Now => unreachable!(),
402 Timeout::Never => None,
403 Timeout::At(when) => Some(when),
404 };
405 for &(handle, _, _) in handles.iter() {
406 if let Some(x) = handle.deadline() {
407 deadline = deadline.map(|y| x.min(y)).or(Some(x));
408 }
409 }
410
411 // Block the current thread.
412 sel = cx.wait_until(deadline);
413 }
414
415 // Unwatch all operations.
416 for (handle, _, _) in handles.iter_mut().take(registered_count) {
417 handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
418 }
419
420 match sel {
421 Selected::Waiting => unreachable!(),
422 Selected::Aborted => {}
423 Selected::Disconnected => {}
424 Selected::Operation(_) => {
425 for (handle, i, _) in handles.iter_mut() {
426 let oper = Operation::hook::<&dyn SelectHandle>(handle);
427 if sel == Selected::Operation(oper) {
428 return Some(*i);
429 }
430 }
431 }
432 }
433
434 None
435 });
436
437 // Return if an operation became ready.
438 if res.is_some() {
439 return res;
440 }
441 }
442}
443
444/// Attempts to select one of the operations without blocking.
445#[inline]
446pub fn try_select<'a>(
447 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
448) -> Result<SelectedOperation<'a>, TrySelectError> {
449 match run_select(handles, Timeout::Now) {
450 None => Err(TrySelectError),
451 Some((token, index, ptr)) => Ok(SelectedOperation {
452 token,
453 index,
454 ptr,
455 _marker: PhantomData,
456 }),
457 }
458}
459
460/// Blocks until one of the operations becomes ready and selects it.
461#[inline]
462pub fn select<'a>(
463 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
464) -> SelectedOperation<'a> {
465 if handles.is_empty() {
466 panic!("no operations have been added to `Select`");
467 }
468
469 let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
470 SelectedOperation {
471 token,
472 index,
473 ptr,
474 _marker: PhantomData,
475 }
476}
477
478/// Blocks for a limited time until one of the operations becomes ready and selects it.
479#[inline]
480pub fn select_timeout<'a>(
481 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
482 timeout: Duration,
483) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
5869c6ff
XL
484 select_deadline(handles, Instant::now() + timeout)
485}
1b1a35ee 486
5869c6ff
XL
487/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
488#[inline]
489pub fn select_deadline<'a>(
490 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
491 deadline: Instant,
492) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
493 match run_select(handles, Timeout::At(deadline)) {
1b1a35ee
XL
494 None => Err(SelectTimeoutError),
495 Some((token, index, ptr)) => Ok(SelectedOperation {
496 token,
497 index,
498 ptr,
499 _marker: PhantomData,
500 }),
501 }
502}
503
504/// Selects from a set of channel operations.
505///
506/// `Select` allows you to define a set of channel operations, wait until any one of them becomes
507/// ready, and finally execute it. If multiple operations are ready at the same time, a random one
508/// among them is selected.
509///
510/// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
511/// when it will simply return an error because the channel is disconnected.
512///
513/// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
514/// dynamically created list of channel operations.
515///
516/// Once a list of operations has been built with `Select`, there are two different ways of
517/// proceeding:
518///
519/// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
520/// the returned selected operation has already begun and **must** be completed. If we don't
521/// complete it, a panic will occur.
522///
523/// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
524/// successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
525/// possible for another thread to make the operation not ready just before we try executing it,
526/// so it's wise to use a retry loop. However, note that these methods might return with success
527/// spuriously, so it's a good idea to always double check if the operation is really ready.
528///
529/// # Examples
530///
531/// Use [`select`] to receive a message from a list of receivers:
532///
533/// ```
534/// use crossbeam_channel::{Receiver, RecvError, Select};
535///
536/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
537/// // Build a list of operations.
538/// let mut sel = Select::new();
539/// for r in rs {
540/// sel.recv(r);
541/// }
542///
543/// // Complete the selected operation.
544/// let oper = sel.select();
545/// let index = oper.index();
546/// oper.recv(&rs[index])
547/// }
548/// ```
549///
550/// Use [`ready`] to receive a message from a list of receivers:
551///
552/// ```
553/// use crossbeam_channel::{Receiver, RecvError, Select};
554///
555/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
556/// // Build a list of operations.
557/// let mut sel = Select::new();
558/// for r in rs {
559/// sel.recv(r);
560/// }
561///
562/// loop {
563/// // Wait until a receive operation becomes ready and try executing it.
564/// let index = sel.ready();
565/// let res = rs[index].try_recv();
566///
567/// // If the operation turns out not to be ready, retry.
568/// if let Err(e) = res {
569/// if e.is_empty() {
570/// continue;
571/// }
572/// }
573///
574/// // Success!
575/// return res.map_err(|_| RecvError);
576/// }
577/// }
578/// ```
579///
5869c6ff
XL
580/// [`try_select`]: Select::try_select
581/// [`select`]: Select::select
582/// [`select_timeout`]: Select::select_timeout
583/// [`try_ready`]: Select::try_ready
584/// [`ready`]: Select::ready
585/// [`ready_timeout`]: Select::ready_timeout
1b1a35ee
XL
586pub struct Select<'a> {
587 /// A list of senders and receivers participating in selection.
588 handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,
589
590 /// The next index to assign to an operation.
591 next_index: usize,
592}
593
5869c6ff
XL
594unsafe impl Send for Select<'_> {}
595unsafe impl Sync for Select<'_> {}
1b1a35ee
XL
596
597impl<'a> Select<'a> {
598 /// Creates an empty list of channel operations for selection.
599 ///
600 /// # Examples
601 ///
602 /// ```
603 /// use crossbeam_channel::Select;
604 ///
605 /// let mut sel = Select::new();
606 ///
607 /// // The list of operations is empty, which means no operation can be selected.
608 /// assert!(sel.try_select().is_err());
609 /// ```
610 pub fn new() -> Select<'a> {
611 Select {
612 handles: Vec::with_capacity(4),
613 next_index: 0,
614 }
615 }
616
617 /// Adds a send operation.
618 ///
619 /// Returns the index of the added operation.
620 ///
621 /// # Examples
622 ///
623 /// ```
1b1a35ee
XL
624 /// use crossbeam_channel::{unbounded, Select};
625 ///
626 /// let (s, r) = unbounded::<i32>();
627 ///
628 /// let mut sel = Select::new();
629 /// let index = sel.send(&s);
630 /// ```
631 pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
632 let i = self.next_index;
633 let ptr = s as *const Sender<_> as *const u8;
634 self.handles.push((s, i, ptr));
635 self.next_index += 1;
636 i
637 }
638
639 /// Adds a receive operation.
640 ///
641 /// Returns the index of the added operation.
642 ///
643 /// # Examples
644 ///
645 /// ```
1b1a35ee
XL
646 /// use crossbeam_channel::{unbounded, Select};
647 ///
648 /// let (s, r) = unbounded::<i32>();
649 ///
650 /// let mut sel = Select::new();
651 /// let index = sel.recv(&r);
652 /// ```
653 pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
654 let i = self.next_index;
655 let ptr = r as *const Receiver<_> as *const u8;
656 self.handles.push((r, i, ptr));
657 self.next_index += 1;
658 i
659 }
660
661 /// Removes a previously added operation.
662 ///
663 /// This is useful when an operation is selected because the channel got disconnected and we
664 /// want to try again to select a different operation instead.
665 ///
666 /// If new operations are added after removing some, the indices of removed operations will not
667 /// be reused.
668 ///
669 /// # Panics
670 ///
671 /// An attempt to remove a non-existing or already removed operation will panic.
672 ///
673 /// # Examples
674 ///
675 /// ```
1b1a35ee
XL
676 /// use crossbeam_channel::{unbounded, Select};
677 ///
678 /// let (s1, r1) = unbounded::<i32>();
679 /// let (_, r2) = unbounded::<i32>();
680 ///
681 /// let mut sel = Select::new();
682 /// let oper1 = sel.recv(&r1);
683 /// let oper2 = sel.recv(&r2);
684 ///
685 /// // Both operations are initially ready, so a random one will be executed.
686 /// let oper = sel.select();
687 /// assert_eq!(oper.index(), oper2);
688 /// assert!(oper.recv(&r2).is_err());
689 /// sel.remove(oper2);
690 ///
691 /// s1.send(10).unwrap();
692 ///
693 /// let oper = sel.select();
694 /// assert_eq!(oper.index(), oper1);
695 /// assert_eq!(oper.recv(&r1), Ok(10));
696 /// ```
697 pub fn remove(&mut self, index: usize) {
698 assert!(
699 index < self.next_index,
700 "index out of bounds; {} >= {}",
701 index,
702 self.next_index,
703 );
704
705 let i = self
706 .handles
707 .iter()
708 .enumerate()
709 .find(|(_, (_, i, _))| *i == index)
710 .expect("no operation with this index")
711 .0;
712
713 self.handles.swap_remove(i);
714 }
715
716 /// Attempts to select one of the operations without blocking.
717 ///
718 /// If an operation is ready, it is selected and returned. If multiple operations are ready at
719 /// the same time, a random one among them is selected. If none of the operations are ready, an
720 /// error is returned.
721 ///
722 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
723 /// even when it will simply return an error because the channel is disconnected.
724 ///
725 /// The selected operation must be completed with [`SelectedOperation::send`]
726 /// or [`SelectedOperation::recv`].
727 ///
1b1a35ee
XL
728 /// # Examples
729 ///
730 /// ```
1b1a35ee
XL
731 /// use crossbeam_channel::{unbounded, Select};
732 ///
733 /// let (s1, r1) = unbounded();
734 /// let (s2, r2) = unbounded();
735 ///
736 /// s1.send(10).unwrap();
737 /// s2.send(20).unwrap();
738 ///
739 /// let mut sel = Select::new();
740 /// let oper1 = sel.recv(&r1);
741 /// let oper2 = sel.recv(&r2);
742 ///
743 /// // Both operations are initially ready, so a random one will be executed.
744 /// let oper = sel.try_select();
745 /// match oper {
746 /// Err(_) => panic!("both operations should be ready"),
747 /// Ok(oper) => match oper.index() {
748 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
749 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
750 /// _ => unreachable!(),
751 /// }
752 /// }
753 /// ```
754 pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
755 try_select(&mut self.handles)
756 }
757
758 /// Blocks until one of the operations becomes ready and selects it.
759 ///
760 /// Once an operation becomes ready, it is selected and returned. If multiple operations are
761 /// ready at the same time, a random one among them is selected.
762 ///
763 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
764 /// even when it will simply return an error because the channel is disconnected.
765 ///
766 /// The selected operation must be completed with [`SelectedOperation::send`]
767 /// or [`SelectedOperation::recv`].
768 ///
1b1a35ee
XL
769 /// # Panics
770 ///
771 /// Panics if no operations have been added to `Select`.
772 ///
773 /// # Examples
774 ///
775 /// ```
776 /// use std::thread;
777 /// use std::time::Duration;
778 /// use crossbeam_channel::{unbounded, Select};
779 ///
780 /// let (s1, r1) = unbounded();
781 /// let (s2, r2) = unbounded();
782 ///
783 /// thread::spawn(move || {
784 /// thread::sleep(Duration::from_secs(1));
785 /// s1.send(10).unwrap();
786 /// });
787 /// thread::spawn(move || s2.send(20).unwrap());
788 ///
789 /// let mut sel = Select::new();
790 /// let oper1 = sel.recv(&r1);
791 /// let oper2 = sel.recv(&r2);
792 ///
793 /// // The second operation will be selected because it becomes ready first.
794 /// let oper = sel.select();
795 /// match oper.index() {
796 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
797 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
798 /// _ => unreachable!(),
799 /// }
800 /// ```
801 pub fn select(&mut self) -> SelectedOperation<'a> {
802 select(&mut self.handles)
803 }
804
805 /// Blocks for a limited time until one of the operations becomes ready and selects it.
806 ///
807 /// If an operation becomes ready, it is selected and returned. If multiple operations are
808 /// ready at the same time, a random one among them is selected. If none of the operations
809 /// become ready for the specified duration, an error is returned.
810 ///
811 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
812 /// even when it will simply return an error because the channel is disconnected.
813 ///
814 /// The selected operation must be completed with [`SelectedOperation::send`]
815 /// or [`SelectedOperation::recv`].
816 ///
1b1a35ee
XL
817 /// # Examples
818 ///
819 /// ```
820 /// use std::thread;
821 /// use std::time::Duration;
822 /// use crossbeam_channel::{unbounded, Select};
823 ///
824 /// let (s1, r1) = unbounded();
825 /// let (s2, r2) = unbounded();
826 ///
827 /// thread::spawn(move || {
828 /// thread::sleep(Duration::from_secs(1));
829 /// s1.send(10).unwrap();
830 /// });
831 /// thread::spawn(move || s2.send(20).unwrap());
832 ///
833 /// let mut sel = Select::new();
834 /// let oper1 = sel.recv(&r1);
835 /// let oper2 = sel.recv(&r2);
836 ///
837 /// // The second operation will be selected because it becomes ready first.
838 /// let oper = sel.select_timeout(Duration::from_millis(500));
839 /// match oper {
840 /// Err(_) => panic!("should not have timed out"),
841 /// Ok(oper) => match oper.index() {
842 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
843 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
844 /// _ => unreachable!(),
845 /// }
846 /// }
847 /// ```
848 pub fn select_timeout(
849 &mut self,
850 timeout: Duration,
851 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
852 select_timeout(&mut self.handles, timeout)
853 }
854
5869c6ff
XL
855 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
856 ///
857 /// If an operation becomes ready, it is selected and returned. If multiple operations are
858 /// ready at the same time, a random one among them is selected. If none of the operations
859 /// become ready before the given deadline, an error is returned.
860 ///
861 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
862 /// even when it will simply return an error because the channel is disconnected.
863 ///
864 /// The selected operation must be completed with [`SelectedOperation::send`]
865 /// or [`SelectedOperation::recv`].
866 ///
867 /// [`SelectedOperation::send`]: struct.SelectedOperation.html#method.send
868 /// [`SelectedOperation::recv`]: struct.SelectedOperation.html#method.recv
869 ///
870 /// # Examples
871 ///
872 /// ```
873 /// use std::thread;
874 /// use std::time::{Instant, Duration};
875 /// use crossbeam_channel::{unbounded, Select};
876 ///
877 /// let (s1, r1) = unbounded();
878 /// let (s2, r2) = unbounded();
879 ///
880 /// thread::spawn(move || {
881 /// thread::sleep(Duration::from_secs(1));
882 /// s1.send(10).unwrap();
883 /// });
884 /// thread::spawn(move || s2.send(20).unwrap());
885 ///
886 /// let mut sel = Select::new();
887 /// let oper1 = sel.recv(&r1);
888 /// let oper2 = sel.recv(&r2);
889 ///
890 /// let deadline = Instant::now() + Duration::from_millis(500);
891 ///
892 /// // The second operation will be selected because it becomes ready first.
893 /// let oper = sel.select_deadline(deadline);
894 /// match oper {
895 /// Err(_) => panic!("should not have timed out"),
896 /// Ok(oper) => match oper.index() {
897 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
898 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
899 /// _ => unreachable!(),
900 /// }
901 /// }
902 /// ```
903 pub fn select_deadline(
904 &mut self,
905 deadline: Instant,
906 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
907 select_deadline(&mut self.handles, deadline)
908 }
909
1b1a35ee
XL
910 /// Attempts to find a ready operation without blocking.
911 ///
912 /// If an operation is ready, its index is returned. If multiple operations are ready at the
913 /// same time, a random one among them is chosen. If none of the operations are ready, an error
914 /// is returned.
915 ///
916 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
917 /// even when it will simply return an error because the channel is disconnected.
918 ///
919 /// Note that this method might return with success spuriously, so it's a good idea to always
920 /// double check if the operation is really ready.
921 ///
922 /// # Examples
923 ///
924 /// ```
1b1a35ee
XL
925 /// use crossbeam_channel::{unbounded, Select};
926 ///
927 /// let (s1, r1) = unbounded();
928 /// let (s2, r2) = unbounded();
929 ///
930 /// s1.send(10).unwrap();
931 /// s2.send(20).unwrap();
932 ///
933 /// let mut sel = Select::new();
934 /// let oper1 = sel.recv(&r1);
935 /// let oper2 = sel.recv(&r2);
936 ///
937 /// // Both operations are initially ready, so a random one will be chosen.
938 /// match sel.try_ready() {
939 /// Err(_) => panic!("both operations should be ready"),
940 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
941 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
942 /// Ok(_) => unreachable!(),
943 /// }
944 /// ```
945 pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
946 match run_ready(&mut self.handles, Timeout::Now) {
947 None => Err(TryReadyError),
948 Some(index) => Ok(index),
949 }
950 }
951
952 /// Blocks until one of the operations becomes ready.
953 ///
954 /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
955 /// the same time, a random one among them is chosen.
956 ///
957 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
958 /// even when it will simply return an error because the channel is disconnected.
959 ///
960 /// Note that this method might return with success spuriously, so it's a good idea to always
961 /// double check if the operation is really ready.
962 ///
963 /// # Panics
964 ///
965 /// Panics if no operations have been added to `Select`.
966 ///
967 /// # Examples
968 ///
969 /// ```
970 /// use std::thread;
971 /// use std::time::Duration;
972 /// use crossbeam_channel::{unbounded, Select};
973 ///
974 /// let (s1, r1) = unbounded();
975 /// let (s2, r2) = unbounded();
976 ///
977 /// thread::spawn(move || {
978 /// thread::sleep(Duration::from_secs(1));
979 /// s1.send(10).unwrap();
980 /// });
981 /// thread::spawn(move || s2.send(20).unwrap());
982 ///
983 /// let mut sel = Select::new();
984 /// let oper1 = sel.recv(&r1);
985 /// let oper2 = sel.recv(&r2);
986 ///
987 /// // The second operation will be selected because it becomes ready first.
988 /// match sel.ready() {
989 /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
990 /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
991 /// _ => unreachable!(),
992 /// }
993 /// ```
994 pub fn ready(&mut self) -> usize {
995 if self.handles.is_empty() {
996 panic!("no operations have been added to `Select`");
997 }
998
999 run_ready(&mut self.handles, Timeout::Never).unwrap()
1000 }
1001
1002 /// Blocks for a limited time until one of the operations becomes ready.
1003 ///
1004 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1005 /// the same time, a random one among them is chosen. If none of the operations become ready
1006 /// for the specified duration, an error is returned.
1007 ///
1008 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1009 /// even when it will simply return an error because the channel is disconnected.
1010 ///
1011 /// Note that this method might return with success spuriously, so it's a good idea to double
1012 /// check if the operation is really ready.
1013 ///
1014 /// # Examples
1015 ///
1016 /// ```
1017 /// use std::thread;
1018 /// use std::time::Duration;
1019 /// use crossbeam_channel::{unbounded, Select};
1020 ///
1021 /// let (s1, r1) = unbounded();
1022 /// let (s2, r2) = unbounded();
1023 ///
1024 /// thread::spawn(move || {
1025 /// thread::sleep(Duration::from_secs(1));
1026 /// s1.send(10).unwrap();
1027 /// });
1028 /// thread::spawn(move || s2.send(20).unwrap());
1029 ///
1030 /// let mut sel = Select::new();
1031 /// let oper1 = sel.recv(&r1);
1032 /// let oper2 = sel.recv(&r2);
1033 ///
1034 /// // The second operation will be selected because it becomes ready first.
1035 /// match sel.ready_timeout(Duration::from_millis(500)) {
1036 /// Err(_) => panic!("should not have timed out"),
1037 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1038 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1039 /// Ok(_) => unreachable!(),
1040 /// }
1041 /// ```
1042 pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
5869c6ff
XL
1043 self.ready_deadline(Instant::now() + timeout)
1044 }
1b1a35ee 1045
5869c6ff
XL
1046 /// Blocks until a given deadline, or until one of the operations becomes ready.
1047 ///
1048 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1049 /// the same time, a random one among them is chosen. If none of the operations become ready
1050 /// before the deadline, an error is returned.
1051 ///
1052 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1053 /// even when it will simply return an error because the channel is disconnected.
1054 ///
1055 /// Note that this method might return with success spuriously, so it's a good idea to double
1056 /// check if the operation is really ready.
1057 ///
1058 /// # Examples
1059 ///
1060 /// ```
1061 /// use std::thread;
1062 /// use std::time::{Duration, Instant};
1063 /// use crossbeam_channel::{unbounded, Select};
1064 ///
1065 /// let deadline = Instant::now() + Duration::from_millis(500);
1066 ///
1067 /// let (s1, r1) = unbounded();
1068 /// let (s2, r2) = unbounded();
1069 ///
1070 /// thread::spawn(move || {
1071 /// thread::sleep(Duration::from_secs(1));
1072 /// s1.send(10).unwrap();
1073 /// });
1074 /// thread::spawn(move || s2.send(20).unwrap());
1075 ///
1076 /// let mut sel = Select::new();
1077 /// let oper1 = sel.recv(&r1);
1078 /// let oper2 = sel.recv(&r2);
1079 ///
1080 /// // The second operation will be selected because it becomes ready first.
1081 /// match sel.ready_deadline(deadline) {
1082 /// Err(_) => panic!("should not have timed out"),
1083 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1084 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1085 /// Ok(_) => unreachable!(),
1086 /// }
1087 /// ```
1088 pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1089 match run_ready(&mut self.handles, Timeout::At(deadline)) {
1b1a35ee
XL
1090 None => Err(ReadyTimeoutError),
1091 Some(index) => Ok(index),
1092 }
1093 }
1094}
1095
1096impl<'a> Clone for Select<'a> {
1097 fn clone(&self) -> Select<'a> {
1098 Select {
1099 handles: self.handles.clone(),
1100 next_index: self.next_index,
1101 }
1102 }
1103}
1104
1105impl<'a> Default for Select<'a> {
1106 fn default() -> Select<'a> {
1107 Select::new()
1108 }
1109}
1110
5869c6ff
XL
1111impl fmt::Debug for Select<'_> {
1112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1b1a35ee
XL
1113 f.pad("Select { .. }")
1114 }
1115}
1116
1117/// A selected operation that needs to be completed.
1118///
1119/// To complete the operation, call [`send`] or [`recv`].
1120///
1121/// # Panics
1122///
1123/// Forgetting to complete the operation is an error and might lead to deadlocks. If a
1124/// `SelectedOperation` is dropped without completion, a panic occurs.
1125///
5869c6ff
XL
1126/// [`send`]: SelectedOperation::send
1127/// [`recv`]: SelectedOperation::recv
1b1a35ee
XL
1128#[must_use]
1129pub struct SelectedOperation<'a> {
1130 /// Token needed to complete the operation.
1131 token: Token,
1132
1133 /// The index of the selected operation.
1134 index: usize,
1135
1136 /// The address of the selected `Sender` or `Receiver`.
1137 ptr: *const u8,
1138
1139 /// Indicates that `Sender`s and `Receiver`s are borrowed.
1140 _marker: PhantomData<&'a ()>,
1141}
1142
5869c6ff 1143impl SelectedOperation<'_> {
1b1a35ee
XL
1144 /// Returns the index of the selected operation.
1145 ///
1146 /// # Examples
1147 ///
1148 /// ```
1149 /// use crossbeam_channel::{bounded, Select};
1150 ///
1151 /// let (s1, r1) = bounded::<()>(0);
1152 /// let (s2, r2) = bounded::<()>(0);
1153 /// let (s3, r3) = bounded::<()>(1);
1154 ///
1155 /// let mut sel = Select::new();
1156 /// let oper1 = sel.send(&s1);
1157 /// let oper2 = sel.recv(&r2);
1158 /// let oper3 = sel.send(&s3);
1159 ///
1160 /// // Only the last operation is ready.
1161 /// let oper = sel.select();
1162 /// assert_eq!(oper.index(), 2);
1163 /// assert_eq!(oper.index(), oper3);
1164 ///
1165 /// // Complete the operation.
1166 /// oper.send(&s3, ()).unwrap();
1167 /// ```
1168 pub fn index(&self) -> usize {
1169 self.index
1170 }
1171
1172 /// Completes the send operation.
1173 ///
1174 /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1175 /// when the operation was added.
1176 ///
1177 /// # Panics
1178 ///
1179 /// Panics if an incorrect [`Sender`] reference is passed.
1180 ///
1181 /// # Examples
1182 ///
1183 /// ```
1184 /// use crossbeam_channel::{bounded, Select, SendError};
1185 ///
1186 /// let (s, r) = bounded::<i32>(0);
1187 /// drop(r);
1188 ///
1189 /// let mut sel = Select::new();
1190 /// let oper1 = sel.send(&s);
1191 ///
1192 /// let oper = sel.select();
1193 /// assert_eq!(oper.index(), oper1);
1194 /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1195 /// ```
1b1a35ee
XL
1196 pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1197 assert!(
1198 s as *const Sender<T> as *const u8 == self.ptr,
1199 "passed a sender that wasn't selected",
1200 );
1201 let res = unsafe { channel::write(s, &mut self.token, msg) };
1202 mem::forget(self);
1203 res.map_err(SendError)
1204 }
1205
1206 /// Completes the receive operation.
1207 ///
1208 /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1209 /// when the operation was added.
1210 ///
1211 /// # Panics
1212 ///
1213 /// Panics if an incorrect [`Receiver`] reference is passed.
1214 ///
1215 /// # Examples
1216 ///
1217 /// ```
1218 /// use crossbeam_channel::{bounded, Select, RecvError};
1219 ///
1220 /// let (s, r) = bounded::<i32>(0);
1221 /// drop(s);
1222 ///
1223 /// let mut sel = Select::new();
1224 /// let oper1 = sel.recv(&r);
1225 ///
1226 /// let oper = sel.select();
1227 /// assert_eq!(oper.index(), oper1);
1228 /// assert_eq!(oper.recv(&r), Err(RecvError));
1229 /// ```
1b1a35ee
XL
1230 pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1231 assert!(
1232 r as *const Receiver<T> as *const u8 == self.ptr,
1233 "passed a receiver that wasn't selected",
1234 );
1235 let res = unsafe { channel::read(r, &mut self.token) };
1236 mem::forget(self);
1237 res.map_err(|_| RecvError)
1238 }
1239}
1240
5869c6ff
XL
1241impl fmt::Debug for SelectedOperation<'_> {
1242 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1b1a35ee
XL
1243 f.pad("SelectedOperation { .. }")
1244 }
1245}
1246
5869c6ff 1247impl Drop for SelectedOperation<'_> {
1b1a35ee
XL
1248 fn drop(&mut self) {
1249 panic!("dropped `SelectedOperation` without completing the operation");
1250 }
1251}