]> git.proxmox.com Git - rustc.git/blob - vendor/crossbeam-channel/src/select.rs
New upstream version 1.51.0+dfsg1
[rustc.git] / vendor / crossbeam-channel / src / select.rs
1 //! Interface to the select mechanism.
2
3 use std::fmt;
4 use std::marker::PhantomData;
5 use std::mem;
6 use std::time::{Duration, Instant};
7
8 use crossbeam_utils::Backoff;
9
10 use crate::channel::{self, Receiver, Sender};
11 use crate::context::Context;
12 use crate::err::{ReadyTimeoutError, TryReadyError};
13 use crate::err::{RecvError, SendError};
14 use crate::err::{SelectTimeoutError, TrySelectError};
15 use crate::flavors;
16 use crate::utils;
17
18 /// Temporary data that gets initialized during select or a blocking operation, and is consumed by
19 /// `read` or `write`.
20 ///
21 /// Each field contains data associated with a specific channel flavor.
22 #[derive(Debug, Default)]
23 pub struct Token {
24 pub at: flavors::at::AtToken,
25 pub array: flavors::array::ArrayToken,
26 pub list: flavors::list::ListToken,
27 pub never: flavors::never::NeverToken,
28 pub tick: flavors::tick::TickToken,
29 pub zero: flavors::zero::ZeroToken,
30 }
31
32 /// Identifier associated with an operation by a specific thread on a specific channel.
33 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
34 pub struct Operation(usize);
35
36 impl Operation {
37 /// Creates an operation identifier from a mutable reference.
38 ///
39 /// This function essentially just turns the address of the reference into a number. The
40 /// reference should point to a variable that is specific to the thread and the operation,
41 /// and is alive for the entire duration of select or blocking operation.
42 #[inline]
43 pub fn hook<T>(r: &mut T) -> Operation {
44 let val = r as *mut T as usize;
45 // Make sure that the pointer address doesn't equal the numerical representation of
46 // `Selected::{Waiting, Aborted, Disconnected}`.
47 assert!(val > 2);
48 Operation(val)
49 }
50 }
51
52 /// Current state of a select or a blocking operation.
53 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
54 pub enum Selected {
55 /// Still waiting for an operation.
56 Waiting,
57
58 /// The attempt to block the current thread has been aborted.
59 Aborted,
60
61 /// An operation became ready because a channel is disconnected.
62 Disconnected,
63
64 /// An operation became ready because a message can be sent or received.
65 Operation(Operation),
66 }
67
68 impl From<usize> for Selected {
69 #[inline]
70 fn from(val: usize) -> Selected {
71 match val {
72 0 => Selected::Waiting,
73 1 => Selected::Aborted,
74 2 => Selected::Disconnected,
75 oper => Selected::Operation(Operation(oper)),
76 }
77 }
78 }
79
80 impl Into<usize> for Selected {
81 #[inline]
82 fn into(self) -> usize {
83 match self {
84 Selected::Waiting => 0,
85 Selected::Aborted => 1,
86 Selected::Disconnected => 2,
87 Selected::Operation(Operation(val)) => val,
88 }
89 }
90 }
91
92 /// A receiver or a sender that can participate in select.
93 ///
94 /// This is a handle that assists select in executing an operation, registration, deciding on the
95 /// appropriate deadline for blocking, etc.
96 pub trait SelectHandle {
97 /// Attempts to select an operation and returns `true` on success.
98 fn try_select(&self, token: &mut Token) -> bool;
99
100 /// Returns a deadline for an operation, if there is one.
101 fn deadline(&self) -> Option<Instant>;
102
103 /// Registers an operation for execution and returns `true` if it is now ready.
104 fn register(&self, oper: Operation, cx: &Context) -> bool;
105
106 /// Unregisters an operation for execution.
107 fn unregister(&self, oper: Operation);
108
109 /// Attempts to select an operation the thread got woken up for and returns `true` on success.
110 fn accept(&self, token: &mut Token, cx: &Context) -> bool;
111
112 /// Returns `true` if an operation can be executed without blocking.
113 fn is_ready(&self) -> bool;
114
115 /// Registers an operation for readiness notification and returns `true` if it is now ready.
116 fn watch(&self, oper: Operation, cx: &Context) -> bool;
117
118 /// Unregisters an operation for readiness notification.
119 fn unwatch(&self, oper: Operation);
120 }
121
122 impl<T: SelectHandle> SelectHandle for &T {
123 fn try_select(&self, token: &mut Token) -> bool {
124 (**self).try_select(token)
125 }
126
127 fn deadline(&self) -> Option<Instant> {
128 (**self).deadline()
129 }
130
131 fn register(&self, oper: Operation, cx: &Context) -> bool {
132 (**self).register(oper, cx)
133 }
134
135 fn unregister(&self, oper: Operation) {
136 (**self).unregister(oper);
137 }
138
139 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
140 (**self).accept(token, cx)
141 }
142
143 fn is_ready(&self) -> bool {
144 (**self).is_ready()
145 }
146
147 fn watch(&self, oper: Operation, cx: &Context) -> bool {
148 (**self).watch(oper, cx)
149 }
150
151 fn unwatch(&self, oper: Operation) {
152 (**self).unwatch(oper)
153 }
154 }
155
156 /// Determines when a select operation should time out.
157 #[derive(Clone, Copy, Eq, PartialEq)]
158 enum Timeout {
159 /// No blocking.
160 Now,
161
162 /// Block forever.
163 Never,
164
165 /// Time out after the time instant.
166 At(Instant),
167 }
168
169 /// Runs until one of the operations is selected, potentially blocking the current thread.
170 ///
171 /// Successful receive operations will have to be followed up by `channel::read()` and successful
172 /// send operations by `channel::write()`.
173 fn run_select(
174 handles: &mut [(&dyn SelectHandle, usize, *const u8)],
175 timeout: Timeout,
176 ) -> Option<(Token, usize, *const u8)> {
177 if handles.is_empty() {
178 // Wait until the timeout and return.
179 match timeout {
180 Timeout::Now => return None,
181 Timeout::Never => {
182 utils::sleep_until(None);
183 unreachable!();
184 }
185 Timeout::At(when) => {
186 utils::sleep_until(Some(when));
187 return None;
188 }
189 }
190 }
191
192 // Shuffle the operations for fairness.
193 utils::shuffle(handles);
194
195 // Create a token, which serves as a temporary variable that gets initialized in this function
196 // and is later used by a call to `channel::read()` or `channel::write()` that completes the
197 // selected operation.
198 let mut token = Token::default();
199
200 // Try selecting one of the operations without blocking.
201 for &(handle, i, ptr) in handles.iter() {
202 if handle.try_select(&mut token) {
203 return Some((token, i, ptr));
204 }
205 }
206
207 loop {
208 // Prepare for blocking.
209 let res = Context::with(|cx| {
210 let mut sel = Selected::Waiting;
211 let mut registered_count = 0;
212 let mut index_ready = None;
213
214 if let Timeout::Now = timeout {
215 cx.try_select(Selected::Aborted).unwrap();
216 }
217
218 // Register all operations.
219 for (handle, i, _) in handles.iter_mut() {
220 registered_count += 1;
221
222 // If registration returns `false`, that means the operation has just become ready.
223 if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
224 // Try aborting select.
225 sel = match cx.try_select(Selected::Aborted) {
226 Ok(()) => {
227 index_ready = Some(*i);
228 Selected::Aborted
229 }
230 Err(s) => s,
231 };
232 break;
233 }
234
235 // If another thread has already selected one of the operations, stop registration.
236 sel = cx.selected();
237 if sel != Selected::Waiting {
238 break;
239 }
240 }
241
242 if sel == Selected::Waiting {
243 // Check with each operation for how long we're allowed to block, and compute the
244 // earliest deadline.
245 let mut deadline: Option<Instant> = match timeout {
246 Timeout::Now => return None,
247 Timeout::Never => None,
248 Timeout::At(when) => Some(when),
249 };
250 for &(handle, _, _) in handles.iter() {
251 if let Some(x) = handle.deadline() {
252 deadline = deadline.map(|y| x.min(y)).or(Some(x));
253 }
254 }
255
256 // Block the current thread.
257 sel = cx.wait_until(deadline);
258 }
259
260 // Unregister all registered operations.
261 for (handle, _, _) in handles.iter_mut().take(registered_count) {
262 handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
263 }
264
265 match sel {
266 Selected::Waiting => unreachable!(),
267 Selected::Aborted => {
268 // If an operation became ready during registration, try selecting it.
269 if let Some(index_ready) = index_ready {
270 for &(handle, i, ptr) in handles.iter() {
271 if i == index_ready && handle.try_select(&mut token) {
272 return Some((i, ptr));
273 }
274 }
275 }
276 }
277 Selected::Disconnected => {}
278 Selected::Operation(_) => {
279 // Find the selected operation.
280 for (handle, i, ptr) in handles.iter_mut() {
281 // Is this the selected operation?
282 if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
283 {
284 // Try selecting this operation.
285 if handle.accept(&mut token, cx) {
286 return Some((*i, *ptr));
287 }
288 }
289 }
290 }
291 }
292
293 None
294 });
295
296 // Return if an operation was selected.
297 if let Some((i, ptr)) = res {
298 return Some((token, i, ptr));
299 }
300
301 // Try selecting one of the operations without blocking.
302 for &(handle, i, ptr) in handles.iter() {
303 if handle.try_select(&mut token) {
304 return Some((token, i, ptr));
305 }
306 }
307
308 match timeout {
309 Timeout::Now => return None,
310 Timeout::Never => {}
311 Timeout::At(when) => {
312 if Instant::now() >= when {
313 return None;
314 }
315 }
316 }
317 }
318 }
319
320 /// Runs until one of the operations becomes ready, potentially blocking the current thread.
321 fn run_ready(
322 handles: &mut [(&dyn SelectHandle, usize, *const u8)],
323 timeout: Timeout,
324 ) -> Option<usize> {
325 if handles.is_empty() {
326 // Wait until the timeout and return.
327 match timeout {
328 Timeout::Now => return None,
329 Timeout::Never => {
330 utils::sleep_until(None);
331 unreachable!();
332 }
333 Timeout::At(when) => {
334 utils::sleep_until(Some(when));
335 return None;
336 }
337 }
338 }
339
340 // Shuffle the operations for fairness.
341 utils::shuffle(handles);
342
343 loop {
344 let backoff = Backoff::new();
345 loop {
346 // Check operations for readiness.
347 for &(handle, i, _) in handles.iter() {
348 if handle.is_ready() {
349 return Some(i);
350 }
351 }
352
353 if backoff.is_completed() {
354 break;
355 } else {
356 backoff.snooze();
357 }
358 }
359
360 // Check for timeout.
361 match timeout {
362 Timeout::Now => return None,
363 Timeout::Never => {}
364 Timeout::At(when) => {
365 if Instant::now() >= when {
366 return None;
367 }
368 }
369 }
370
371 // Prepare for blocking.
372 let res = Context::with(|cx| {
373 let mut sel = Selected::Waiting;
374 let mut registered_count = 0;
375
376 // Begin watching all operations.
377 for (handle, _, _) in handles.iter_mut() {
378 registered_count += 1;
379 let oper = Operation::hook::<&dyn SelectHandle>(handle);
380
381 // If registration returns `false`, that means the operation has just become ready.
382 if handle.watch(oper, cx) {
383 sel = match cx.try_select(Selected::Operation(oper)) {
384 Ok(()) => Selected::Operation(oper),
385 Err(s) => s,
386 };
387 break;
388 }
389
390 // If another thread has already chosen one of the operations, stop registration.
391 sel = cx.selected();
392 if sel != Selected::Waiting {
393 break;
394 }
395 }
396
397 if sel == Selected::Waiting {
398 // Check with each operation for how long we're allowed to block, and compute the
399 // earliest deadline.
400 let mut deadline: Option<Instant> = match timeout {
401 Timeout::Now => unreachable!(),
402 Timeout::Never => None,
403 Timeout::At(when) => Some(when),
404 };
405 for &(handle, _, _) in handles.iter() {
406 if let Some(x) = handle.deadline() {
407 deadline = deadline.map(|y| x.min(y)).or(Some(x));
408 }
409 }
410
411 // Block the current thread.
412 sel = cx.wait_until(deadline);
413 }
414
415 // Unwatch all operations.
416 for (handle, _, _) in handles.iter_mut().take(registered_count) {
417 handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
418 }
419
420 match sel {
421 Selected::Waiting => unreachable!(),
422 Selected::Aborted => {}
423 Selected::Disconnected => {}
424 Selected::Operation(_) => {
425 for (handle, i, _) in handles.iter_mut() {
426 let oper = Operation::hook::<&dyn SelectHandle>(handle);
427 if sel == Selected::Operation(oper) {
428 return Some(*i);
429 }
430 }
431 }
432 }
433
434 None
435 });
436
437 // Return if an operation became ready.
438 if res.is_some() {
439 return res;
440 }
441 }
442 }
443
444 /// Attempts to select one of the operations without blocking.
445 #[inline]
446 pub fn try_select<'a>(
447 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
448 ) -> Result<SelectedOperation<'a>, TrySelectError> {
449 match run_select(handles, Timeout::Now) {
450 None => Err(TrySelectError),
451 Some((token, index, ptr)) => Ok(SelectedOperation {
452 token,
453 index,
454 ptr,
455 _marker: PhantomData,
456 }),
457 }
458 }
459
460 /// Blocks until one of the operations becomes ready and selects it.
461 #[inline]
462 pub fn select<'a>(
463 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
464 ) -> SelectedOperation<'a> {
465 if handles.is_empty() {
466 panic!("no operations have been added to `Select`");
467 }
468
469 let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
470 SelectedOperation {
471 token,
472 index,
473 ptr,
474 _marker: PhantomData,
475 }
476 }
477
478 /// Blocks for a limited time until one of the operations becomes ready and selects it.
479 #[inline]
480 pub fn select_timeout<'a>(
481 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
482 timeout: Duration,
483 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
484 select_deadline(handles, Instant::now() + timeout)
485 }
486
487 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
488 #[inline]
489 pub fn select_deadline<'a>(
490 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
491 deadline: Instant,
492 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
493 match run_select(handles, Timeout::At(deadline)) {
494 None => Err(SelectTimeoutError),
495 Some((token, index, ptr)) => Ok(SelectedOperation {
496 token,
497 index,
498 ptr,
499 _marker: PhantomData,
500 }),
501 }
502 }
503
504 /// Selects from a set of channel operations.
505 ///
506 /// `Select` allows you to define a set of channel operations, wait until any one of them becomes
507 /// ready, and finally execute it. If multiple operations are ready at the same time, a random one
508 /// among them is selected.
509 ///
510 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
511 /// when it will simply return an error because the channel is disconnected.
512 ///
513 /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
514 /// dynamically created list of channel operations.
515 ///
516 /// Once a list of operations has been built with `Select`, there are two different ways of
517 /// proceeding:
518 ///
519 /// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
520 /// the returned selected operation has already begun and **must** be completed. If we don't
521 /// complete it, a panic will occur.
522 ///
523 /// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
524 /// successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
525 /// possible for another thread to make the operation not ready just before we try executing it,
526 /// so it's wise to use a retry loop. However, note that these methods might return with success
527 /// spuriously, so it's a good idea to always double check if the operation is really ready.
528 ///
529 /// # Examples
530 ///
531 /// Use [`select`] to receive a message from a list of receivers:
532 ///
533 /// ```
534 /// use crossbeam_channel::{Receiver, RecvError, Select};
535 ///
536 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
537 /// // Build a list of operations.
538 /// let mut sel = Select::new();
539 /// for r in rs {
540 /// sel.recv(r);
541 /// }
542 ///
543 /// // Complete the selected operation.
544 /// let oper = sel.select();
545 /// let index = oper.index();
546 /// oper.recv(&rs[index])
547 /// }
548 /// ```
549 ///
550 /// Use [`ready`] to receive a message from a list of receivers:
551 ///
552 /// ```
553 /// use crossbeam_channel::{Receiver, RecvError, Select};
554 ///
555 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
556 /// // Build a list of operations.
557 /// let mut sel = Select::new();
558 /// for r in rs {
559 /// sel.recv(r);
560 /// }
561 ///
562 /// loop {
563 /// // Wait until a receive operation becomes ready and try executing it.
564 /// let index = sel.ready();
565 /// let res = rs[index].try_recv();
566 ///
567 /// // If the operation turns out not to be ready, retry.
568 /// if let Err(e) = res {
569 /// if e.is_empty() {
570 /// continue;
571 /// }
572 /// }
573 ///
574 /// // Success!
575 /// return res.map_err(|_| RecvError);
576 /// }
577 /// }
578 /// ```
579 ///
580 /// [`try_select`]: Select::try_select
581 /// [`select`]: Select::select
582 /// [`select_timeout`]: Select::select_timeout
583 /// [`try_ready`]: Select::try_ready
584 /// [`ready`]: Select::ready
585 /// [`ready_timeout`]: Select::ready_timeout
586 pub struct Select<'a> {
587 /// A list of senders and receivers participating in selection.
588 handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,
589
590 /// The next index to assign to an operation.
591 next_index: usize,
592 }
593
594 unsafe impl Send for Select<'_> {}
595 unsafe impl Sync for Select<'_> {}
596
597 impl<'a> Select<'a> {
598 /// Creates an empty list of channel operations for selection.
599 ///
600 /// # Examples
601 ///
602 /// ```
603 /// use crossbeam_channel::Select;
604 ///
605 /// let mut sel = Select::new();
606 ///
607 /// // The list of operations is empty, which means no operation can be selected.
608 /// assert!(sel.try_select().is_err());
609 /// ```
610 pub fn new() -> Select<'a> {
611 Select {
612 handles: Vec::with_capacity(4),
613 next_index: 0,
614 }
615 }
616
617 /// Adds a send operation.
618 ///
619 /// Returns the index of the added operation.
620 ///
621 /// # Examples
622 ///
623 /// ```
624 /// use crossbeam_channel::{unbounded, Select};
625 ///
626 /// let (s, r) = unbounded::<i32>();
627 ///
628 /// let mut sel = Select::new();
629 /// let index = sel.send(&s);
630 /// ```
631 pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
632 let i = self.next_index;
633 let ptr = s as *const Sender<_> as *const u8;
634 self.handles.push((s, i, ptr));
635 self.next_index += 1;
636 i
637 }
638
639 /// Adds a receive operation.
640 ///
641 /// Returns the index of the added operation.
642 ///
643 /// # Examples
644 ///
645 /// ```
646 /// use crossbeam_channel::{unbounded, Select};
647 ///
648 /// let (s, r) = unbounded::<i32>();
649 ///
650 /// let mut sel = Select::new();
651 /// let index = sel.recv(&r);
652 /// ```
653 pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
654 let i = self.next_index;
655 let ptr = r as *const Receiver<_> as *const u8;
656 self.handles.push((r, i, ptr));
657 self.next_index += 1;
658 i
659 }
660
661 /// Removes a previously added operation.
662 ///
663 /// This is useful when an operation is selected because the channel got disconnected and we
664 /// want to try again to select a different operation instead.
665 ///
666 /// If new operations are added after removing some, the indices of removed operations will not
667 /// be reused.
668 ///
669 /// # Panics
670 ///
671 /// An attempt to remove a non-existing or already removed operation will panic.
672 ///
673 /// # Examples
674 ///
675 /// ```
676 /// use crossbeam_channel::{unbounded, Select};
677 ///
678 /// let (s1, r1) = unbounded::<i32>();
679 /// let (_, r2) = unbounded::<i32>();
680 ///
681 /// let mut sel = Select::new();
682 /// let oper1 = sel.recv(&r1);
683 /// let oper2 = sel.recv(&r2);
684 ///
685 /// // Both operations are initially ready, so a random one will be executed.
686 /// let oper = sel.select();
687 /// assert_eq!(oper.index(), oper2);
688 /// assert!(oper.recv(&r2).is_err());
689 /// sel.remove(oper2);
690 ///
691 /// s1.send(10).unwrap();
692 ///
693 /// let oper = sel.select();
694 /// assert_eq!(oper.index(), oper1);
695 /// assert_eq!(oper.recv(&r1), Ok(10));
696 /// ```
697 pub fn remove(&mut self, index: usize) {
698 assert!(
699 index < self.next_index,
700 "index out of bounds; {} >= {}",
701 index,
702 self.next_index,
703 );
704
705 let i = self
706 .handles
707 .iter()
708 .enumerate()
709 .find(|(_, (_, i, _))| *i == index)
710 .expect("no operation with this index")
711 .0;
712
713 self.handles.swap_remove(i);
714 }
715
716 /// Attempts to select one of the operations without blocking.
717 ///
718 /// If an operation is ready, it is selected and returned. If multiple operations are ready at
719 /// the same time, a random one among them is selected. If none of the operations are ready, an
720 /// error is returned.
721 ///
722 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
723 /// even when it will simply return an error because the channel is disconnected.
724 ///
725 /// The selected operation must be completed with [`SelectedOperation::send`]
726 /// or [`SelectedOperation::recv`].
727 ///
728 /// # Examples
729 ///
730 /// ```
731 /// use crossbeam_channel::{unbounded, Select};
732 ///
733 /// let (s1, r1) = unbounded();
734 /// let (s2, r2) = unbounded();
735 ///
736 /// s1.send(10).unwrap();
737 /// s2.send(20).unwrap();
738 ///
739 /// let mut sel = Select::new();
740 /// let oper1 = sel.recv(&r1);
741 /// let oper2 = sel.recv(&r2);
742 ///
743 /// // Both operations are initially ready, so a random one will be executed.
744 /// let oper = sel.try_select();
745 /// match oper {
746 /// Err(_) => panic!("both operations should be ready"),
747 /// Ok(oper) => match oper.index() {
748 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
749 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
750 /// _ => unreachable!(),
751 /// }
752 /// }
753 /// ```
754 pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
755 try_select(&mut self.handles)
756 }
757
758 /// Blocks until one of the operations becomes ready and selects it.
759 ///
760 /// Once an operation becomes ready, it is selected and returned. If multiple operations are
761 /// ready at the same time, a random one among them is selected.
762 ///
763 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
764 /// even when it will simply return an error because the channel is disconnected.
765 ///
766 /// The selected operation must be completed with [`SelectedOperation::send`]
767 /// or [`SelectedOperation::recv`].
768 ///
769 /// # Panics
770 ///
771 /// Panics if no operations have been added to `Select`.
772 ///
773 /// # Examples
774 ///
775 /// ```
776 /// use std::thread;
777 /// use std::time::Duration;
778 /// use crossbeam_channel::{unbounded, Select};
779 ///
780 /// let (s1, r1) = unbounded();
781 /// let (s2, r2) = unbounded();
782 ///
783 /// thread::spawn(move || {
784 /// thread::sleep(Duration::from_secs(1));
785 /// s1.send(10).unwrap();
786 /// });
787 /// thread::spawn(move || s2.send(20).unwrap());
788 ///
789 /// let mut sel = Select::new();
790 /// let oper1 = sel.recv(&r1);
791 /// let oper2 = sel.recv(&r2);
792 ///
793 /// // The second operation will be selected because it becomes ready first.
794 /// let oper = sel.select();
795 /// match oper.index() {
796 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
797 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
798 /// _ => unreachable!(),
799 /// }
800 /// ```
801 pub fn select(&mut self) -> SelectedOperation<'a> {
802 select(&mut self.handles)
803 }
804
805 /// Blocks for a limited time until one of the operations becomes ready and selects it.
806 ///
807 /// If an operation becomes ready, it is selected and returned. If multiple operations are
808 /// ready at the same time, a random one among them is selected. If none of the operations
809 /// become ready for the specified duration, an error is returned.
810 ///
811 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
812 /// even when it will simply return an error because the channel is disconnected.
813 ///
814 /// The selected operation must be completed with [`SelectedOperation::send`]
815 /// or [`SelectedOperation::recv`].
816 ///
817 /// # Examples
818 ///
819 /// ```
820 /// use std::thread;
821 /// use std::time::Duration;
822 /// use crossbeam_channel::{unbounded, Select};
823 ///
824 /// let (s1, r1) = unbounded();
825 /// let (s2, r2) = unbounded();
826 ///
827 /// thread::spawn(move || {
828 /// thread::sleep(Duration::from_secs(1));
829 /// s1.send(10).unwrap();
830 /// });
831 /// thread::spawn(move || s2.send(20).unwrap());
832 ///
833 /// let mut sel = Select::new();
834 /// let oper1 = sel.recv(&r1);
835 /// let oper2 = sel.recv(&r2);
836 ///
837 /// // The second operation will be selected because it becomes ready first.
838 /// let oper = sel.select_timeout(Duration::from_millis(500));
839 /// match oper {
840 /// Err(_) => panic!("should not have timed out"),
841 /// Ok(oper) => match oper.index() {
842 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
843 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
844 /// _ => unreachable!(),
845 /// }
846 /// }
847 /// ```
848 pub fn select_timeout(
849 &mut self,
850 timeout: Duration,
851 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
852 select_timeout(&mut self.handles, timeout)
853 }
854
855 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
856 ///
857 /// If an operation becomes ready, it is selected and returned. If multiple operations are
858 /// ready at the same time, a random one among them is selected. If none of the operations
859 /// become ready before the given deadline, an error is returned.
860 ///
861 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
862 /// even when it will simply return an error because the channel is disconnected.
863 ///
864 /// The selected operation must be completed with [`SelectedOperation::send`]
865 /// or [`SelectedOperation::recv`].
866 ///
867 /// [`SelectedOperation::send`]: struct.SelectedOperation.html#method.send
868 /// [`SelectedOperation::recv`]: struct.SelectedOperation.html#method.recv
869 ///
870 /// # Examples
871 ///
872 /// ```
873 /// use std::thread;
874 /// use std::time::{Instant, Duration};
875 /// use crossbeam_channel::{unbounded, Select};
876 ///
877 /// let (s1, r1) = unbounded();
878 /// let (s2, r2) = unbounded();
879 ///
880 /// thread::spawn(move || {
881 /// thread::sleep(Duration::from_secs(1));
882 /// s1.send(10).unwrap();
883 /// });
884 /// thread::spawn(move || s2.send(20).unwrap());
885 ///
886 /// let mut sel = Select::new();
887 /// let oper1 = sel.recv(&r1);
888 /// let oper2 = sel.recv(&r2);
889 ///
890 /// let deadline = Instant::now() + Duration::from_millis(500);
891 ///
892 /// // The second operation will be selected because it becomes ready first.
893 /// let oper = sel.select_deadline(deadline);
894 /// match oper {
895 /// Err(_) => panic!("should not have timed out"),
896 /// Ok(oper) => match oper.index() {
897 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
898 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
899 /// _ => unreachable!(),
900 /// }
901 /// }
902 /// ```
903 pub fn select_deadline(
904 &mut self,
905 deadline: Instant,
906 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
907 select_deadline(&mut self.handles, deadline)
908 }
909
910 /// Attempts to find a ready operation without blocking.
911 ///
912 /// If an operation is ready, its index is returned. If multiple operations are ready at the
913 /// same time, a random one among them is chosen. If none of the operations are ready, an error
914 /// is returned.
915 ///
916 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
917 /// even when it will simply return an error because the channel is disconnected.
918 ///
919 /// Note that this method might return with success spuriously, so it's a good idea to always
920 /// double check if the operation is really ready.
921 ///
922 /// # Examples
923 ///
924 /// ```
925 /// use crossbeam_channel::{unbounded, Select};
926 ///
927 /// let (s1, r1) = unbounded();
928 /// let (s2, r2) = unbounded();
929 ///
930 /// s1.send(10).unwrap();
931 /// s2.send(20).unwrap();
932 ///
933 /// let mut sel = Select::new();
934 /// let oper1 = sel.recv(&r1);
935 /// let oper2 = sel.recv(&r2);
936 ///
937 /// // Both operations are initially ready, so a random one will be chosen.
938 /// match sel.try_ready() {
939 /// Err(_) => panic!("both operations should be ready"),
940 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
941 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
942 /// Ok(_) => unreachable!(),
943 /// }
944 /// ```
945 pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
946 match run_ready(&mut self.handles, Timeout::Now) {
947 None => Err(TryReadyError),
948 Some(index) => Ok(index),
949 }
950 }
951
952 /// Blocks until one of the operations becomes ready.
953 ///
954 /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
955 /// the same time, a random one among them is chosen.
956 ///
957 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
958 /// even when it will simply return an error because the channel is disconnected.
959 ///
960 /// Note that this method might return with success spuriously, so it's a good idea to always
961 /// double check if the operation is really ready.
962 ///
963 /// # Panics
964 ///
965 /// Panics if no operations have been added to `Select`.
966 ///
967 /// # Examples
968 ///
969 /// ```
970 /// use std::thread;
971 /// use std::time::Duration;
972 /// use crossbeam_channel::{unbounded, Select};
973 ///
974 /// let (s1, r1) = unbounded();
975 /// let (s2, r2) = unbounded();
976 ///
977 /// thread::spawn(move || {
978 /// thread::sleep(Duration::from_secs(1));
979 /// s1.send(10).unwrap();
980 /// });
981 /// thread::spawn(move || s2.send(20).unwrap());
982 ///
983 /// let mut sel = Select::new();
984 /// let oper1 = sel.recv(&r1);
985 /// let oper2 = sel.recv(&r2);
986 ///
987 /// // The second operation will be selected because it becomes ready first.
988 /// match sel.ready() {
989 /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
990 /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
991 /// _ => unreachable!(),
992 /// }
993 /// ```
994 pub fn ready(&mut self) -> usize {
995 if self.handles.is_empty() {
996 panic!("no operations have been added to `Select`");
997 }
998
999 run_ready(&mut self.handles, Timeout::Never).unwrap()
1000 }
1001
1002 /// Blocks for a limited time until one of the operations becomes ready.
1003 ///
1004 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1005 /// the same time, a random one among them is chosen. If none of the operations become ready
1006 /// for the specified duration, an error is returned.
1007 ///
1008 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1009 /// even when it will simply return an error because the channel is disconnected.
1010 ///
1011 /// Note that this method might return with success spuriously, so it's a good idea to double
1012 /// check if the operation is really ready.
1013 ///
1014 /// # Examples
1015 ///
1016 /// ```
1017 /// use std::thread;
1018 /// use std::time::Duration;
1019 /// use crossbeam_channel::{unbounded, Select};
1020 ///
1021 /// let (s1, r1) = unbounded();
1022 /// let (s2, r2) = unbounded();
1023 ///
1024 /// thread::spawn(move || {
1025 /// thread::sleep(Duration::from_secs(1));
1026 /// s1.send(10).unwrap();
1027 /// });
1028 /// thread::spawn(move || s2.send(20).unwrap());
1029 ///
1030 /// let mut sel = Select::new();
1031 /// let oper1 = sel.recv(&r1);
1032 /// let oper2 = sel.recv(&r2);
1033 ///
1034 /// // The second operation will be selected because it becomes ready first.
1035 /// match sel.ready_timeout(Duration::from_millis(500)) {
1036 /// Err(_) => panic!("should not have timed out"),
1037 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1038 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1039 /// Ok(_) => unreachable!(),
1040 /// }
1041 /// ```
1042 pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1043 self.ready_deadline(Instant::now() + timeout)
1044 }
1045
1046 /// Blocks until a given deadline, or until one of the operations becomes ready.
1047 ///
1048 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1049 /// the same time, a random one among them is chosen. If none of the operations become ready
1050 /// before the deadline, an error is returned.
1051 ///
1052 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1053 /// even when it will simply return an error because the channel is disconnected.
1054 ///
1055 /// Note that this method might return with success spuriously, so it's a good idea to double
1056 /// check if the operation is really ready.
1057 ///
1058 /// # Examples
1059 ///
1060 /// ```
1061 /// use std::thread;
1062 /// use std::time::{Duration, Instant};
1063 /// use crossbeam_channel::{unbounded, Select};
1064 ///
1065 /// let deadline = Instant::now() + Duration::from_millis(500);
1066 ///
1067 /// let (s1, r1) = unbounded();
1068 /// let (s2, r2) = unbounded();
1069 ///
1070 /// thread::spawn(move || {
1071 /// thread::sleep(Duration::from_secs(1));
1072 /// s1.send(10).unwrap();
1073 /// });
1074 /// thread::spawn(move || s2.send(20).unwrap());
1075 ///
1076 /// let mut sel = Select::new();
1077 /// let oper1 = sel.recv(&r1);
1078 /// let oper2 = sel.recv(&r2);
1079 ///
1080 /// // The second operation will be selected because it becomes ready first.
1081 /// match sel.ready_deadline(deadline) {
1082 /// Err(_) => panic!("should not have timed out"),
1083 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1084 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1085 /// Ok(_) => unreachable!(),
1086 /// }
1087 /// ```
1088 pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1089 match run_ready(&mut self.handles, Timeout::At(deadline)) {
1090 None => Err(ReadyTimeoutError),
1091 Some(index) => Ok(index),
1092 }
1093 }
1094 }
1095
1096 impl<'a> Clone for Select<'a> {
1097 fn clone(&self) -> Select<'a> {
1098 Select {
1099 handles: self.handles.clone(),
1100 next_index: self.next_index,
1101 }
1102 }
1103 }
1104
1105 impl<'a> Default for Select<'a> {
1106 fn default() -> Select<'a> {
1107 Select::new()
1108 }
1109 }
1110
1111 impl fmt::Debug for Select<'_> {
1112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1113 f.pad("Select { .. }")
1114 }
1115 }
1116
1117 /// A selected operation that needs to be completed.
1118 ///
1119 /// To complete the operation, call [`send`] or [`recv`].
1120 ///
1121 /// # Panics
1122 ///
1123 /// Forgetting to complete the operation is an error and might lead to deadlocks. If a
1124 /// `SelectedOperation` is dropped without completion, a panic occurs.
1125 ///
1126 /// [`send`]: SelectedOperation::send
1127 /// [`recv`]: SelectedOperation::recv
1128 #[must_use]
1129 pub struct SelectedOperation<'a> {
1130 /// Token needed to complete the operation.
1131 token: Token,
1132
1133 /// The index of the selected operation.
1134 index: usize,
1135
1136 /// The address of the selected `Sender` or `Receiver`.
1137 ptr: *const u8,
1138
1139 /// Indicates that `Sender`s and `Receiver`s are borrowed.
1140 _marker: PhantomData<&'a ()>,
1141 }
1142
1143 impl SelectedOperation<'_> {
1144 /// Returns the index of the selected operation.
1145 ///
1146 /// # Examples
1147 ///
1148 /// ```
1149 /// use crossbeam_channel::{bounded, Select};
1150 ///
1151 /// let (s1, r1) = bounded::<()>(0);
1152 /// let (s2, r2) = bounded::<()>(0);
1153 /// let (s3, r3) = bounded::<()>(1);
1154 ///
1155 /// let mut sel = Select::new();
1156 /// let oper1 = sel.send(&s1);
1157 /// let oper2 = sel.recv(&r2);
1158 /// let oper3 = sel.send(&s3);
1159 ///
1160 /// // Only the last operation is ready.
1161 /// let oper = sel.select();
1162 /// assert_eq!(oper.index(), 2);
1163 /// assert_eq!(oper.index(), oper3);
1164 ///
1165 /// // Complete the operation.
1166 /// oper.send(&s3, ()).unwrap();
1167 /// ```
1168 pub fn index(&self) -> usize {
1169 self.index
1170 }
1171
1172 /// Completes the send operation.
1173 ///
1174 /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1175 /// when the operation was added.
1176 ///
1177 /// # Panics
1178 ///
1179 /// Panics if an incorrect [`Sender`] reference is passed.
1180 ///
1181 /// # Examples
1182 ///
1183 /// ```
1184 /// use crossbeam_channel::{bounded, Select, SendError};
1185 ///
1186 /// let (s, r) = bounded::<i32>(0);
1187 /// drop(r);
1188 ///
1189 /// let mut sel = Select::new();
1190 /// let oper1 = sel.send(&s);
1191 ///
1192 /// let oper = sel.select();
1193 /// assert_eq!(oper.index(), oper1);
1194 /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1195 /// ```
1196 pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1197 assert!(
1198 s as *const Sender<T> as *const u8 == self.ptr,
1199 "passed a sender that wasn't selected",
1200 );
1201 let res = unsafe { channel::write(s, &mut self.token, msg) };
1202 mem::forget(self);
1203 res.map_err(SendError)
1204 }
1205
1206 /// Completes the receive operation.
1207 ///
1208 /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1209 /// when the operation was added.
1210 ///
1211 /// # Panics
1212 ///
1213 /// Panics if an incorrect [`Receiver`] reference is passed.
1214 ///
1215 /// # Examples
1216 ///
1217 /// ```
1218 /// use crossbeam_channel::{bounded, Select, RecvError};
1219 ///
1220 /// let (s, r) = bounded::<i32>(0);
1221 /// drop(s);
1222 ///
1223 /// let mut sel = Select::new();
1224 /// let oper1 = sel.recv(&r);
1225 ///
1226 /// let oper = sel.select();
1227 /// assert_eq!(oper.index(), oper1);
1228 /// assert_eq!(oper.recv(&r), Err(RecvError));
1229 /// ```
1230 pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1231 assert!(
1232 r as *const Receiver<T> as *const u8 == self.ptr,
1233 "passed a receiver that wasn't selected",
1234 );
1235 let res = unsafe { channel::read(r, &mut self.token) };
1236 mem::forget(self);
1237 res.map_err(|_| RecvError)
1238 }
1239 }
1240
1241 impl fmt::Debug for SelectedOperation<'_> {
1242 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1243 f.pad("SelectedOperation { .. }")
1244 }
1245 }
1246
1247 impl Drop for SelectedOperation<'_> {
1248 fn drop(&mut self) {
1249 panic!("dropped `SelectedOperation` without completing the operation");
1250 }
1251 }