]> git.proxmox.com Git - rustc.git/blob - vendor/crossbeam-channel/src/select.rs
New upstream version 1.48.0~beta.8+dfsg1
[rustc.git] / vendor / crossbeam-channel / src / select.rs
1 //! Interface to the select mechanism.
2
3 use std::fmt;
4 use std::marker::PhantomData;
5 use std::mem;
6 use std::time::{Duration, Instant};
7
8 use crossbeam_utils::Backoff;
9
10 use channel::{self, Receiver, Sender};
11 use context::Context;
12 use err::{ReadyTimeoutError, TryReadyError};
13 use err::{RecvError, SendError};
14 use err::{SelectTimeoutError, TrySelectError};
15 use flavors;
16 use utils;
17
18 /// Temporary data that gets initialized during select or a blocking operation, and is consumed by
19 /// `read` or `write`.
20 ///
21 /// Each field contains data associated with a specific channel flavor.
22 #[derive(Debug, Default)]
23 pub struct Token {
24 pub after: flavors::after::AfterToken,
25 pub array: flavors::array::ArrayToken,
26 pub list: flavors::list::ListToken,
27 pub never: flavors::never::NeverToken,
28 pub tick: flavors::tick::TickToken,
29 pub zero: flavors::zero::ZeroToken,
30 }
31
32 /// Identifier associated with an operation by a specific thread on a specific channel.
33 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
34 pub struct Operation(usize);
35
36 impl Operation {
37 /// Creates an operation identifier from a mutable reference.
38 ///
39 /// This function essentially just turns the address of the reference into a number. The
40 /// reference should point to a variable that is specific to the thread and the operation,
41 /// and is alive for the entire duration of select or blocking operation.
42 #[inline]
43 pub fn hook<T>(r: &mut T) -> Operation {
44 let val = r as *mut T as usize;
45 // Make sure that the pointer address doesn't equal the numerical representation of
46 // `Selected::{Waiting, Aborted, Disconnected}`.
47 assert!(val > 2);
48 Operation(val)
49 }
50 }
51
52 /// Current state of a select or a blocking operation.
53 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
54 pub enum Selected {
55 /// Still waiting for an operation.
56 Waiting,
57
58 /// The attempt to block the current thread has been aborted.
59 Aborted,
60
61 /// An operation became ready because a channel is disconnected.
62 Disconnected,
63
64 /// An operation became ready because a message can be sent or received.
65 Operation(Operation),
66 }
67
68 impl From<usize> for Selected {
69 #[inline]
70 fn from(val: usize) -> Selected {
71 match val {
72 0 => Selected::Waiting,
73 1 => Selected::Aborted,
74 2 => Selected::Disconnected,
75 oper => Selected::Operation(Operation(oper)),
76 }
77 }
78 }
79
80 impl Into<usize> for Selected {
81 #[inline]
82 fn into(self) -> usize {
83 match self {
84 Selected::Waiting => 0,
85 Selected::Aborted => 1,
86 Selected::Disconnected => 2,
87 Selected::Operation(Operation(val)) => val,
88 }
89 }
90 }
91
92 /// A receiver or a sender that can participate in select.
93 ///
94 /// This is a handle that assists select in executing an operation, registration, deciding on the
95 /// appropriate deadline for blocking, etc.
96 pub trait SelectHandle {
97 /// Attempts to select an operation and returns `true` on success.
98 fn try_select(&self, token: &mut Token) -> bool;
99
100 /// Returns a deadline for an operation, if there is one.
101 fn deadline(&self) -> Option<Instant>;
102
103 /// Registers an operation for execution and returns `true` if it is now ready.
104 fn register(&self, oper: Operation, cx: &Context) -> bool;
105
106 /// Unregisters an operation for execution.
107 fn unregister(&self, oper: Operation);
108
109 /// Attempts to select an operation the thread got woken up for and returns `true` on success.
110 fn accept(&self, token: &mut Token, cx: &Context) -> bool;
111
112 /// Returns `true` if an operation can be executed without blocking.
113 fn is_ready(&self) -> bool;
114
115 /// Registers an operation for readiness notification and returns `true` if it is now ready.
116 fn watch(&self, oper: Operation, cx: &Context) -> bool;
117
118 /// Unregisters an operation for readiness notification.
119 fn unwatch(&self, oper: Operation);
120 }
121
122 impl<'a, T: SelectHandle> SelectHandle for &'a T {
123 fn try_select(&self, token: &mut Token) -> bool {
124 (**self).try_select(token)
125 }
126
127 fn deadline(&self) -> Option<Instant> {
128 (**self).deadline()
129 }
130
131 fn register(&self, oper: Operation, cx: &Context) -> bool {
132 (**self).register(oper, cx)
133 }
134
135 fn unregister(&self, oper: Operation) {
136 (**self).unregister(oper);
137 }
138
139 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
140 (**self).accept(token, cx)
141 }
142
143 fn is_ready(&self) -> bool {
144 (**self).is_ready()
145 }
146
147 fn watch(&self, oper: Operation, cx: &Context) -> bool {
148 (**self).watch(oper, cx)
149 }
150
151 fn unwatch(&self, oper: Operation) {
152 (**self).unwatch(oper)
153 }
154 }
155
156 /// Determines when a select operation should time out.
157 #[derive(Clone, Copy, Eq, PartialEq)]
158 enum Timeout {
159 /// No blocking.
160 Now,
161
162 /// Block forever.
163 Never,
164
165 /// Time out after the time instant.
166 At(Instant),
167 }
168
169 /// Runs until one of the operations is selected, potentially blocking the current thread.
170 ///
171 /// Successful receive operations will have to be followed up by `channel::read()` and successful
172 /// send operations by `channel::write()`.
173 fn run_select(
174 handles: &mut [(&dyn SelectHandle, usize, *const u8)],
175 timeout: Timeout,
176 ) -> Option<(Token, usize, *const u8)> {
177 if handles.is_empty() {
178 // Wait until the timeout and return.
179 match timeout {
180 Timeout::Now => return None,
181 Timeout::Never => {
182 utils::sleep_until(None);
183 unreachable!();
184 }
185 Timeout::At(when) => {
186 utils::sleep_until(Some(when));
187 return None;
188 }
189 }
190 }
191
192 // Shuffle the operations for fairness.
193 utils::shuffle(handles);
194
195 // Create a token, which serves as a temporary variable that gets initialized in this function
196 // and is later used by a call to `channel::read()` or `channel::write()` that completes the
197 // selected operation.
198 let mut token = Token::default();
199
200 // Try selecting one of the operations without blocking.
201 for &(handle, i, ptr) in handles.iter() {
202 if handle.try_select(&mut token) {
203 return Some((token, i, ptr));
204 }
205 }
206
207 loop {
208 // Prepare for blocking.
209 let res = Context::with(|cx| {
210 let mut sel = Selected::Waiting;
211 let mut registered_count = 0;
212 let mut index_ready = None;
213
214 if let Timeout::Now = timeout {
215 cx.try_select(Selected::Aborted).unwrap();
216 }
217
218 // Register all operations.
219 for (handle, i, _) in handles.iter_mut() {
220 registered_count += 1;
221
222 // If registration returns `false`, that means the operation has just become ready.
223 if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
224 // Try aborting select.
225 sel = match cx.try_select(Selected::Aborted) {
226 Ok(()) => {
227 index_ready = Some(*i);
228 Selected::Aborted
229 }
230 Err(s) => s,
231 };
232 break;
233 }
234
235 // If another thread has already selected one of the operations, stop registration.
236 sel = cx.selected();
237 if sel != Selected::Waiting {
238 break;
239 }
240 }
241
242 if sel == Selected::Waiting {
243 // Check with each operation for how long we're allowed to block, and compute the
244 // earliest deadline.
245 let mut deadline: Option<Instant> = match timeout {
246 Timeout::Now => return None,
247 Timeout::Never => None,
248 Timeout::At(when) => Some(when),
249 };
250 for &(handle, _, _) in handles.iter() {
251 if let Some(x) = handle.deadline() {
252 deadline = deadline.map(|y| x.min(y)).or(Some(x));
253 }
254 }
255
256 // Block the current thread.
257 sel = cx.wait_until(deadline);
258 }
259
260 // Unregister all registered operations.
261 for (handle, _, _) in handles.iter_mut().take(registered_count) {
262 handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
263 }
264
265 match sel {
266 Selected::Waiting => unreachable!(),
267 Selected::Aborted => {
268 // If an operation became ready during registration, try selecting it.
269 if let Some(index_ready) = index_ready {
270 for &(handle, i, ptr) in handles.iter() {
271 if i == index_ready && handle.try_select(&mut token) {
272 return Some((i, ptr));
273 }
274 }
275 }
276 }
277 Selected::Disconnected => {}
278 Selected::Operation(_) => {
279 // Find the selected operation.
280 for (handle, i, ptr) in handles.iter_mut() {
281 // Is this the selected operation?
282 if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
283 {
284 // Try selecting this operation.
285 if handle.accept(&mut token, cx) {
286 return Some((*i, *ptr));
287 }
288 }
289 }
290 }
291 }
292
293 None
294 });
295
296 // Return if an operation was selected.
297 if let Some((i, ptr)) = res {
298 return Some((token, i, ptr));
299 }
300
301 // Try selecting one of the operations without blocking.
302 for &(handle, i, ptr) in handles.iter() {
303 if handle.try_select(&mut token) {
304 return Some((token, i, ptr));
305 }
306 }
307
308 match timeout {
309 Timeout::Now => return None,
310 Timeout::Never => {}
311 Timeout::At(when) => {
312 if Instant::now() >= when {
313 return None;
314 }
315 }
316 }
317 }
318 }
319
320 /// Runs until one of the operations becomes ready, potentially blocking the current thread.
321 fn run_ready(
322 handles: &mut [(&dyn SelectHandle, usize, *const u8)],
323 timeout: Timeout,
324 ) -> Option<usize> {
325 if handles.is_empty() {
326 // Wait until the timeout and return.
327 match timeout {
328 Timeout::Now => return None,
329 Timeout::Never => {
330 utils::sleep_until(None);
331 unreachable!();
332 }
333 Timeout::At(when) => {
334 utils::sleep_until(Some(when));
335 return None;
336 }
337 }
338 }
339
340 // Shuffle the operations for fairness.
341 utils::shuffle(handles);
342
343 loop {
344 let backoff = Backoff::new();
345 loop {
346 // Check operations for readiness.
347 for &(handle, i, _) in handles.iter() {
348 if handle.is_ready() {
349 return Some(i);
350 }
351 }
352
353 if backoff.is_completed() {
354 break;
355 } else {
356 backoff.snooze();
357 }
358 }
359
360 // Check for timeout.
361 match timeout {
362 Timeout::Now => return None,
363 Timeout::Never => {}
364 Timeout::At(when) => {
365 if Instant::now() >= when {
366 return None;
367 }
368 }
369 }
370
371 // Prepare for blocking.
372 let res = Context::with(|cx| {
373 let mut sel = Selected::Waiting;
374 let mut registered_count = 0;
375
376 // Begin watching all operations.
377 for (handle, _, _) in handles.iter_mut() {
378 registered_count += 1;
379 let oper = Operation::hook::<&dyn SelectHandle>(handle);
380
381 // If registration returns `false`, that means the operation has just become ready.
382 if handle.watch(oper, cx) {
383 sel = match cx.try_select(Selected::Operation(oper)) {
384 Ok(()) => Selected::Operation(oper),
385 Err(s) => s,
386 };
387 break;
388 }
389
390 // If another thread has already chosen one of the operations, stop registration.
391 sel = cx.selected();
392 if sel != Selected::Waiting {
393 break;
394 }
395 }
396
397 if sel == Selected::Waiting {
398 // Check with each operation for how long we're allowed to block, and compute the
399 // earliest deadline.
400 let mut deadline: Option<Instant> = match timeout {
401 Timeout::Now => unreachable!(),
402 Timeout::Never => None,
403 Timeout::At(when) => Some(when),
404 };
405 for &(handle, _, _) in handles.iter() {
406 if let Some(x) = handle.deadline() {
407 deadline = deadline.map(|y| x.min(y)).or(Some(x));
408 }
409 }
410
411 // Block the current thread.
412 sel = cx.wait_until(deadline);
413 }
414
415 // Unwatch all operations.
416 for (handle, _, _) in handles.iter_mut().take(registered_count) {
417 handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
418 }
419
420 match sel {
421 Selected::Waiting => unreachable!(),
422 Selected::Aborted => {}
423 Selected::Disconnected => {}
424 Selected::Operation(_) => {
425 for (handle, i, _) in handles.iter_mut() {
426 let oper = Operation::hook::<&dyn SelectHandle>(handle);
427 if sel == Selected::Operation(oper) {
428 return Some(*i);
429 }
430 }
431 }
432 }
433
434 None
435 });
436
437 // Return if an operation became ready.
438 if res.is_some() {
439 return res;
440 }
441 }
442 }
443
444 /// Attempts to select one of the operations without blocking.
445 #[inline]
446 pub fn try_select<'a>(
447 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
448 ) -> Result<SelectedOperation<'a>, TrySelectError> {
449 match run_select(handles, Timeout::Now) {
450 None => Err(TrySelectError),
451 Some((token, index, ptr)) => Ok(SelectedOperation {
452 token,
453 index,
454 ptr,
455 _marker: PhantomData,
456 }),
457 }
458 }
459
460 /// Blocks until one of the operations becomes ready and selects it.
461 #[inline]
462 pub fn select<'a>(
463 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
464 ) -> SelectedOperation<'a> {
465 if handles.is_empty() {
466 panic!("no operations have been added to `Select`");
467 }
468
469 let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
470 SelectedOperation {
471 token,
472 index,
473 ptr,
474 _marker: PhantomData,
475 }
476 }
477
478 /// Blocks for a limited time until one of the operations becomes ready and selects it.
479 #[inline]
480 pub fn select_timeout<'a>(
481 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
482 timeout: Duration,
483 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
484 let timeout = Timeout::At(Instant::now() + timeout);
485
486 match run_select(handles, timeout) {
487 None => Err(SelectTimeoutError),
488 Some((token, index, ptr)) => Ok(SelectedOperation {
489 token,
490 index,
491 ptr,
492 _marker: PhantomData,
493 }),
494 }
495 }
496
497 /// Selects from a set of channel operations.
498 ///
499 /// `Select` allows you to define a set of channel operations, wait until any one of them becomes
500 /// ready, and finally execute it. If multiple operations are ready at the same time, a random one
501 /// among them is selected.
502 ///
503 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
504 /// when it will simply return an error because the channel is disconnected.
505 ///
506 /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
507 /// dynamically created list of channel operations.
508 ///
509 /// Once a list of operations has been built with `Select`, there are two different ways of
510 /// proceeding:
511 ///
512 /// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
513 /// the returned selected operation has already begun and **must** be completed. If we don't
514 /// complete it, a panic will occur.
515 ///
516 /// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
517 /// successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
518 /// possible for another thread to make the operation not ready just before we try executing it,
519 /// so it's wise to use a retry loop. However, note that these methods might return with success
520 /// spuriously, so it's a good idea to always double check if the operation is really ready.
521 ///
522 /// # Examples
523 ///
524 /// Use [`select`] to receive a message from a list of receivers:
525 ///
526 /// ```
527 /// use crossbeam_channel::{Receiver, RecvError, Select};
528 ///
529 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
530 /// // Build a list of operations.
531 /// let mut sel = Select::new();
532 /// for r in rs {
533 /// sel.recv(r);
534 /// }
535 ///
536 /// // Complete the selected operation.
537 /// let oper = sel.select();
538 /// let index = oper.index();
539 /// oper.recv(&rs[index])
540 /// }
541 /// ```
542 ///
543 /// Use [`ready`] to receive a message from a list of receivers:
544 ///
545 /// ```
546 /// use crossbeam_channel::{Receiver, RecvError, Select};
547 ///
548 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
549 /// // Build a list of operations.
550 /// let mut sel = Select::new();
551 /// for r in rs {
552 /// sel.recv(r);
553 /// }
554 ///
555 /// loop {
556 /// // Wait until a receive operation becomes ready and try executing it.
557 /// let index = sel.ready();
558 /// let res = rs[index].try_recv();
559 ///
560 /// // If the operation turns out not to be ready, retry.
561 /// if let Err(e) = res {
562 /// if e.is_empty() {
563 /// continue;
564 /// }
565 /// }
566 ///
567 /// // Success!
568 /// return res.map_err(|_| RecvError);
569 /// }
570 /// }
571 /// ```
572 ///
573 /// [`select!`]: macro.select.html
574 /// [`try_select`]: struct.Select.html#method.try_select
575 /// [`select`]: struct.Select.html#method.select
576 /// [`select_timeout`]: struct.Select.html#method.select_timeout
577 /// [`try_ready`]: struct.Select.html#method.try_ready
578 /// [`ready`]: struct.Select.html#method.ready
579 /// [`ready_timeout`]: struct.Select.html#method.ready_timeout
580 pub struct Select<'a> {
581 /// A list of senders and receivers participating in selection.
582 handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,
583
584 /// The next index to assign to an operation.
585 next_index: usize,
586 }
587
588 unsafe impl<'a> Send for Select<'a> {}
589 unsafe impl<'a> Sync for Select<'a> {}
590
591 impl<'a> Select<'a> {
592 /// Creates an empty list of channel operations for selection.
593 ///
594 /// # Examples
595 ///
596 /// ```
597 /// use crossbeam_channel::Select;
598 ///
599 /// let mut sel = Select::new();
600 ///
601 /// // The list of operations is empty, which means no operation can be selected.
602 /// assert!(sel.try_select().is_err());
603 /// ```
604 pub fn new() -> Select<'a> {
605 Select {
606 handles: Vec::with_capacity(4),
607 next_index: 0,
608 }
609 }
610
611 /// Adds a send operation.
612 ///
613 /// Returns the index of the added operation.
614 ///
615 /// # Examples
616 ///
617 /// ```
618 /// use std::thread;
619 /// use crossbeam_channel::{unbounded, Select};
620 ///
621 /// let (s, r) = unbounded::<i32>();
622 ///
623 /// let mut sel = Select::new();
624 /// let index = sel.send(&s);
625 /// ```
626 pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
627 let i = self.next_index;
628 let ptr = s as *const Sender<_> as *const u8;
629 self.handles.push((s, i, ptr));
630 self.next_index += 1;
631 i
632 }
633
634 /// Adds a receive operation.
635 ///
636 /// Returns the index of the added operation.
637 ///
638 /// # Examples
639 ///
640 /// ```
641 /// use std::thread;
642 /// use crossbeam_channel::{unbounded, Select};
643 ///
644 /// let (s, r) = unbounded::<i32>();
645 ///
646 /// let mut sel = Select::new();
647 /// let index = sel.recv(&r);
648 /// ```
649 pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
650 let i = self.next_index;
651 let ptr = r as *const Receiver<_> as *const u8;
652 self.handles.push((r, i, ptr));
653 self.next_index += 1;
654 i
655 }
656
657 /// Removes a previously added operation.
658 ///
659 /// This is useful when an operation is selected because the channel got disconnected and we
660 /// want to try again to select a different operation instead.
661 ///
662 /// If new operations are added after removing some, the indices of removed operations will not
663 /// be reused.
664 ///
665 /// # Panics
666 ///
667 /// An attempt to remove a non-existing or already removed operation will panic.
668 ///
669 /// # Examples
670 ///
671 /// ```
672 /// use std::thread;
673 /// use crossbeam_channel::{unbounded, Select};
674 ///
675 /// let (s1, r1) = unbounded::<i32>();
676 /// let (_, r2) = unbounded::<i32>();
677 ///
678 /// let mut sel = Select::new();
679 /// let oper1 = sel.recv(&r1);
680 /// let oper2 = sel.recv(&r2);
681 ///
682 /// // Both operations are initially ready, so a random one will be executed.
683 /// let oper = sel.select();
684 /// assert_eq!(oper.index(), oper2);
685 /// assert!(oper.recv(&r2).is_err());
686 /// sel.remove(oper2);
687 ///
688 /// s1.send(10).unwrap();
689 ///
690 /// let oper = sel.select();
691 /// assert_eq!(oper.index(), oper1);
692 /// assert_eq!(oper.recv(&r1), Ok(10));
693 /// ```
694 pub fn remove(&mut self, index: usize) {
695 assert!(
696 index < self.next_index,
697 "index out of bounds; {} >= {}",
698 index,
699 self.next_index,
700 );
701
702 let i = self
703 .handles
704 .iter()
705 .enumerate()
706 .find(|(_, (_, i, _))| *i == index)
707 .expect("no operation with this index")
708 .0;
709
710 self.handles.swap_remove(i);
711 }
712
713 /// Attempts to select one of the operations without blocking.
714 ///
715 /// If an operation is ready, it is selected and returned. If multiple operations are ready at
716 /// the same time, a random one among them is selected. If none of the operations are ready, an
717 /// error is returned.
718 ///
719 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
720 /// even when it will simply return an error because the channel is disconnected.
721 ///
722 /// The selected operation must be completed with [`SelectedOperation::send`]
723 /// or [`SelectedOperation::recv`].
724 ///
725 /// [`SelectedOperation::send`]: struct.SelectedOperation.html#method.send
726 /// [`SelectedOperation::recv`]: struct.SelectedOperation.html#method.recv
727 ///
728 /// # Examples
729 ///
730 /// ```
731 /// use std::thread;
732 /// use crossbeam_channel::{unbounded, Select};
733 ///
734 /// let (s1, r1) = unbounded();
735 /// let (s2, r2) = unbounded();
736 ///
737 /// s1.send(10).unwrap();
738 /// s2.send(20).unwrap();
739 ///
740 /// let mut sel = Select::new();
741 /// let oper1 = sel.recv(&r1);
742 /// let oper2 = sel.recv(&r2);
743 ///
744 /// // Both operations are initially ready, so a random one will be executed.
745 /// let oper = sel.try_select();
746 /// match oper {
747 /// Err(_) => panic!("both operations should be ready"),
748 /// Ok(oper) => match oper.index() {
749 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
750 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
751 /// _ => unreachable!(),
752 /// }
753 /// }
754 /// ```
755 pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
756 try_select(&mut self.handles)
757 }
758
759 /// Blocks until one of the operations becomes ready and selects it.
760 ///
761 /// Once an operation becomes ready, it is selected and returned. If multiple operations are
762 /// ready at the same time, a random one among them is selected.
763 ///
764 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
765 /// even when it will simply return an error because the channel is disconnected.
766 ///
767 /// The selected operation must be completed with [`SelectedOperation::send`]
768 /// or [`SelectedOperation::recv`].
769 ///
770 /// [`SelectedOperation::send`]: struct.SelectedOperation.html#method.send
771 /// [`SelectedOperation::recv`]: struct.SelectedOperation.html#method.recv
772 ///
773 /// # Panics
774 ///
775 /// Panics if no operations have been added to `Select`.
776 ///
777 /// # Examples
778 ///
779 /// ```
780 /// use std::thread;
781 /// use std::time::Duration;
782 /// use crossbeam_channel::{unbounded, Select};
783 ///
784 /// let (s1, r1) = unbounded();
785 /// let (s2, r2) = unbounded();
786 ///
787 /// thread::spawn(move || {
788 /// thread::sleep(Duration::from_secs(1));
789 /// s1.send(10).unwrap();
790 /// });
791 /// thread::spawn(move || s2.send(20).unwrap());
792 ///
793 /// let mut sel = Select::new();
794 /// let oper1 = sel.recv(&r1);
795 /// let oper2 = sel.recv(&r2);
796 ///
797 /// // The second operation will be selected because it becomes ready first.
798 /// let oper = sel.select();
799 /// match oper.index() {
800 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
801 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
802 /// _ => unreachable!(),
803 /// }
804 /// ```
805 pub fn select(&mut self) -> SelectedOperation<'a> {
806 select(&mut self.handles)
807 }
808
809 /// Blocks for a limited time until one of the operations becomes ready and selects it.
810 ///
811 /// If an operation becomes ready, it is selected and returned. If multiple operations are
812 /// ready at the same time, a random one among them is selected. If none of the operations
813 /// become ready for the specified duration, an error is returned.
814 ///
815 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
816 /// even when it will simply return an error because the channel is disconnected.
817 ///
818 /// The selected operation must be completed with [`SelectedOperation::send`]
819 /// or [`SelectedOperation::recv`].
820 ///
821 /// [`SelectedOperation::send`]: struct.SelectedOperation.html#method.send
822 /// [`SelectedOperation::recv`]: struct.SelectedOperation.html#method.recv
823 ///
824 /// # Examples
825 ///
826 /// ```
827 /// use std::thread;
828 /// use std::time::Duration;
829 /// use crossbeam_channel::{unbounded, Select};
830 ///
831 /// let (s1, r1) = unbounded();
832 /// let (s2, r2) = unbounded();
833 ///
834 /// thread::spawn(move || {
835 /// thread::sleep(Duration::from_secs(1));
836 /// s1.send(10).unwrap();
837 /// });
838 /// thread::spawn(move || s2.send(20).unwrap());
839 ///
840 /// let mut sel = Select::new();
841 /// let oper1 = sel.recv(&r1);
842 /// let oper2 = sel.recv(&r2);
843 ///
844 /// // The second operation will be selected because it becomes ready first.
845 /// let oper = sel.select_timeout(Duration::from_millis(500));
846 /// match oper {
847 /// Err(_) => panic!("should not have timed out"),
848 /// Ok(oper) => match oper.index() {
849 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
850 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
851 /// _ => unreachable!(),
852 /// }
853 /// }
854 /// ```
855 pub fn select_timeout(
856 &mut self,
857 timeout: Duration,
858 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
859 select_timeout(&mut self.handles, timeout)
860 }
861
862 /// Attempts to find a ready operation without blocking.
863 ///
864 /// If an operation is ready, its index is returned. If multiple operations are ready at the
865 /// same time, a random one among them is chosen. If none of the operations are ready, an error
866 /// is returned.
867 ///
868 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
869 /// even when it will simply return an error because the channel is disconnected.
870 ///
871 /// Note that this method might return with success spuriously, so it's a good idea to always
872 /// double check if the operation is really ready.
873 ///
874 /// # Examples
875 ///
876 /// ```
877 /// use std::thread;
878 /// use crossbeam_channel::{unbounded, Select};
879 ///
880 /// let (s1, r1) = unbounded();
881 /// let (s2, r2) = unbounded();
882 ///
883 /// s1.send(10).unwrap();
884 /// s2.send(20).unwrap();
885 ///
886 /// let mut sel = Select::new();
887 /// let oper1 = sel.recv(&r1);
888 /// let oper2 = sel.recv(&r2);
889 ///
890 /// // Both operations are initially ready, so a random one will be chosen.
891 /// match sel.try_ready() {
892 /// Err(_) => panic!("both operations should be ready"),
893 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
894 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
895 /// Ok(_) => unreachable!(),
896 /// }
897 /// ```
898 pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
899 match run_ready(&mut self.handles, Timeout::Now) {
900 None => Err(TryReadyError),
901 Some(index) => Ok(index),
902 }
903 }
904
905 /// Blocks until one of the operations becomes ready.
906 ///
907 /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
908 /// the same time, a random one among them is chosen.
909 ///
910 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
911 /// even when it will simply return an error because the channel is disconnected.
912 ///
913 /// Note that this method might return with success spuriously, so it's a good idea to always
914 /// double check if the operation is really ready.
915 ///
916 /// # Panics
917 ///
918 /// Panics if no operations have been added to `Select`.
919 ///
920 /// # Examples
921 ///
922 /// ```
923 /// use std::thread;
924 /// use std::time::Duration;
925 /// use crossbeam_channel::{unbounded, Select};
926 ///
927 /// let (s1, r1) = unbounded();
928 /// let (s2, r2) = unbounded();
929 ///
930 /// thread::spawn(move || {
931 /// thread::sleep(Duration::from_secs(1));
932 /// s1.send(10).unwrap();
933 /// });
934 /// thread::spawn(move || s2.send(20).unwrap());
935 ///
936 /// let mut sel = Select::new();
937 /// let oper1 = sel.recv(&r1);
938 /// let oper2 = sel.recv(&r2);
939 ///
940 /// // The second operation will be selected because it becomes ready first.
941 /// match sel.ready() {
942 /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
943 /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
944 /// _ => unreachable!(),
945 /// }
946 /// ```
947 pub fn ready(&mut self) -> usize {
948 if self.handles.is_empty() {
949 panic!("no operations have been added to `Select`");
950 }
951
952 run_ready(&mut self.handles, Timeout::Never).unwrap()
953 }
954
955 /// Blocks for a limited time until one of the operations becomes ready.
956 ///
957 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
958 /// the same time, a random one among them is chosen. If none of the operations become ready
959 /// for the specified duration, an error is returned.
960 ///
961 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
962 /// even when it will simply return an error because the channel is disconnected.
963 ///
964 /// Note that this method might return with success spuriously, so it's a good idea to double
965 /// check if the operation is really ready.
966 ///
967 /// # Examples
968 ///
969 /// ```
970 /// use std::thread;
971 /// use std::time::Duration;
972 /// use crossbeam_channel::{unbounded, Select};
973 ///
974 /// let (s1, r1) = unbounded();
975 /// let (s2, r2) = unbounded();
976 ///
977 /// thread::spawn(move || {
978 /// thread::sleep(Duration::from_secs(1));
979 /// s1.send(10).unwrap();
980 /// });
981 /// thread::spawn(move || s2.send(20).unwrap());
982 ///
983 /// let mut sel = Select::new();
984 /// let oper1 = sel.recv(&r1);
985 /// let oper2 = sel.recv(&r2);
986 ///
987 /// // The second operation will be selected because it becomes ready first.
988 /// match sel.ready_timeout(Duration::from_millis(500)) {
989 /// Err(_) => panic!("should not have timed out"),
990 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
991 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
992 /// Ok(_) => unreachable!(),
993 /// }
994 /// ```
995 pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
996 let timeout = Timeout::At(Instant::now() + timeout);
997
998 match run_ready(&mut self.handles, timeout) {
999 None => Err(ReadyTimeoutError),
1000 Some(index) => Ok(index),
1001 }
1002 }
1003 }
1004
1005 impl<'a> Clone for Select<'a> {
1006 fn clone(&self) -> Select<'a> {
1007 Select {
1008 handles: self.handles.clone(),
1009 next_index: self.next_index,
1010 }
1011 }
1012 }
1013
1014 impl<'a> Default for Select<'a> {
1015 fn default() -> Select<'a> {
1016 Select::new()
1017 }
1018 }
1019
1020 impl<'a> fmt::Debug for Select<'a> {
1021 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1022 f.pad("Select { .. }")
1023 }
1024 }
1025
1026 /// A selected operation that needs to be completed.
1027 ///
1028 /// To complete the operation, call [`send`] or [`recv`].
1029 ///
1030 /// # Panics
1031 ///
1032 /// Forgetting to complete the operation is an error and might lead to deadlocks. If a
1033 /// `SelectedOperation` is dropped without completion, a panic occurs.
1034 ///
1035 /// [`send`]: struct.SelectedOperation.html#method.send
1036 /// [`recv`]: struct.SelectedOperation.html#method.recv
1037 #[must_use]
1038 pub struct SelectedOperation<'a> {
1039 /// Token needed to complete the operation.
1040 token: Token,
1041
1042 /// The index of the selected operation.
1043 index: usize,
1044
1045 /// The address of the selected `Sender` or `Receiver`.
1046 ptr: *const u8,
1047
1048 /// Indicates that `Sender`s and `Receiver`s are borrowed.
1049 _marker: PhantomData<&'a ()>,
1050 }
1051
1052 impl<'a> SelectedOperation<'a> {
1053 /// Returns the index of the selected operation.
1054 ///
1055 /// # Examples
1056 ///
1057 /// ```
1058 /// use crossbeam_channel::{bounded, Select};
1059 ///
1060 /// let (s1, r1) = bounded::<()>(0);
1061 /// let (s2, r2) = bounded::<()>(0);
1062 /// let (s3, r3) = bounded::<()>(1);
1063 ///
1064 /// let mut sel = Select::new();
1065 /// let oper1 = sel.send(&s1);
1066 /// let oper2 = sel.recv(&r2);
1067 /// let oper3 = sel.send(&s3);
1068 ///
1069 /// // Only the last operation is ready.
1070 /// let oper = sel.select();
1071 /// assert_eq!(oper.index(), 2);
1072 /// assert_eq!(oper.index(), oper3);
1073 ///
1074 /// // Complete the operation.
1075 /// oper.send(&s3, ()).unwrap();
1076 /// ```
1077 pub fn index(&self) -> usize {
1078 self.index
1079 }
1080
1081 /// Completes the send operation.
1082 ///
1083 /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1084 /// when the operation was added.
1085 ///
1086 /// # Panics
1087 ///
1088 /// Panics if an incorrect [`Sender`] reference is passed.
1089 ///
1090 /// # Examples
1091 ///
1092 /// ```
1093 /// use crossbeam_channel::{bounded, Select, SendError};
1094 ///
1095 /// let (s, r) = bounded::<i32>(0);
1096 /// drop(r);
1097 ///
1098 /// let mut sel = Select::new();
1099 /// let oper1 = sel.send(&s);
1100 ///
1101 /// let oper = sel.select();
1102 /// assert_eq!(oper.index(), oper1);
1103 /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1104 /// ```
1105 ///
1106 /// [`Sender`]: struct.Sender.html
1107 /// [`Select::send`]: struct.Select.html#method.send
1108 pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1109 assert!(
1110 s as *const Sender<T> as *const u8 == self.ptr,
1111 "passed a sender that wasn't selected",
1112 );
1113 let res = unsafe { channel::write(s, &mut self.token, msg) };
1114 mem::forget(self);
1115 res.map_err(SendError)
1116 }
1117
1118 /// Completes the receive operation.
1119 ///
1120 /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1121 /// when the operation was added.
1122 ///
1123 /// # Panics
1124 ///
1125 /// Panics if an incorrect [`Receiver`] reference is passed.
1126 ///
1127 /// # Examples
1128 ///
1129 /// ```
1130 /// use crossbeam_channel::{bounded, Select, RecvError};
1131 ///
1132 /// let (s, r) = bounded::<i32>(0);
1133 /// drop(s);
1134 ///
1135 /// let mut sel = Select::new();
1136 /// let oper1 = sel.recv(&r);
1137 ///
1138 /// let oper = sel.select();
1139 /// assert_eq!(oper.index(), oper1);
1140 /// assert_eq!(oper.recv(&r), Err(RecvError));
1141 /// ```
1142 ///
1143 /// [`Receiver`]: struct.Receiver.html
1144 /// [`Select::recv`]: struct.Select.html#method.recv
1145 pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1146 assert!(
1147 r as *const Receiver<T> as *const u8 == self.ptr,
1148 "passed a receiver that wasn't selected",
1149 );
1150 let res = unsafe { channel::read(r, &mut self.token) };
1151 mem::forget(self);
1152 res.map_err(|_| RecvError)
1153 }
1154 }
1155
1156 impl<'a> fmt::Debug for SelectedOperation<'a> {
1157 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1158 f.pad("SelectedOperation { .. }")
1159 }
1160 }
1161
1162 impl<'a> Drop for SelectedOperation<'a> {
1163 fn drop(&mut self) {
1164 panic!("dropped `SelectedOperation` without completing the operation");
1165 }
1166 }