]>
Commit | Line | Data |
---|---|---|
1b1a35ee XL |
1 | //! Interface to the select mechanism. |
2 | ||
3 | use std::fmt; | |
4 | use std::marker::PhantomData; | |
5 | use std::mem; | |
6 | use std::time::{Duration, Instant}; | |
7 | ||
8 | use crossbeam_utils::Backoff; | |
9 | ||
5869c6ff XL |
10 | use crate::channel::{self, Receiver, Sender}; |
11 | use crate::context::Context; | |
12 | use crate::err::{ReadyTimeoutError, TryReadyError}; | |
13 | use crate::err::{RecvError, SendError}; | |
14 | use crate::err::{SelectTimeoutError, TrySelectError}; | |
15 | use crate::flavors; | |
16 | use crate::utils; | |
1b1a35ee XL |
17 | |
/// Temporary data that gets initialized during select or a blocking operation, and is consumed by
/// `read` or `write`.
///
/// Each field contains data associated with a specific channel flavor.
///
/// The `Default` derive is what `run_select` relies on to create a fresh token before trying any
/// operation.
#[derive(Debug, Default)]
pub struct Token {
    /// Token data for the `at` flavor.
    pub at: flavors::at::AtToken,
    /// Token data for the `array` flavor.
    pub array: flavors::array::ArrayToken,
    /// Token data for the `list` flavor.
    pub list: flavors::list::ListToken,
    /// Token data for the `never` flavor.
    pub never: flavors::never::NeverToken,
    /// Token data for the `tick` flavor.
    pub tick: flavors::tick::TickToken,
    /// Token data for the `zero` flavor.
    pub zero: flavors::zero::ZeroToken,
}
31 | ||
/// Identifier associated with an operation by a specific thread on a specific channel.
///
/// The wrapped integer is a memory address, see [`Operation::hook`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Operation(usize);

impl Operation {
    /// Builds an operation identifier out of a mutable reference.
    ///
    /// The identifier is simply the address `r` points to, stored as a number. The referent
    /// should be a variable that is specific to the thread and the operation, and it should stay
    /// alive for the entire duration of the select or blocking operation.
    ///
    /// # Panics
    ///
    /// Panics if the address is `0`, `1` or `2`, because those numbers are the numerical
    /// representation of `Selected::{Waiting, Aborted, Disconnected}`.
    #[inline]
    pub fn hook<T>(r: &mut T) -> Operation {
        let raw: *mut T = r;
        let addr = raw as usize;
        // Keep the address clear of the values reserved for the non-operation states.
        assert!(addr > 2);
        Operation(addr)
    }
}
51 | ||
/// Current state of a select or a blocking operation.
///
/// The state converts to and from `usize`: `0`, `1` and `2` stand for `Waiting`, `Aborted` and
/// `Disconnected`, and every other value is the address wrapped by an [`Operation`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Selected {
    /// Still waiting for an operation.
    Waiting,

    /// The attempt to block the current thread has been aborted.
    Aborted,

    /// An operation became ready because a channel is disconnected.
    Disconnected,

    /// An operation became ready because a message can be sent or received.
    Operation(Operation),
}
67 | ||
68 | impl From<usize> for Selected { | |
69 | #[inline] | |
70 | fn from(val: usize) -> Selected { | |
71 | match val { | |
72 | 0 => Selected::Waiting, | |
73 | 1 => Selected::Aborted, | |
74 | 2 => Selected::Disconnected, | |
75 | oper => Selected::Operation(Operation(oper)), | |
76 | } | |
77 | } | |
78 | } | |
79 | ||
80 | impl Into<usize> for Selected { | |
81 | #[inline] | |
82 | fn into(self) -> usize { | |
83 | match self { | |
84 | Selected::Waiting => 0, | |
85 | Selected::Aborted => 1, | |
86 | Selected::Disconnected => 2, | |
87 | Selected::Operation(Operation(val)) => val, | |
88 | } | |
89 | } | |
90 | } | |
91 | ||
/// A receiver or a sender that can participate in select.
///
/// This is a handle that assists select in executing an operation, registration, deciding on the
/// appropriate deadline for blocking, etc.
///
/// The methods come in two groups: `try_select`/`register`/`unregister`/`accept` are used when an
/// operation is to be selected (see `run_select`), while `is_ready`/`watch`/`unwatch` are used
/// when the caller only waits for readiness (see `run_ready`). `deadline` is consulted by both.
pub trait SelectHandle {
    /// Attempts to select an operation and returns `true` on success.
    fn try_select(&self, token: &mut Token) -> bool;

    /// Returns a deadline for an operation, if there is one.
    ///
    /// Callers combine it with their own timeout and block until the earliest of them.
    fn deadline(&self) -> Option<Instant>;

    /// Registers an operation for execution and returns `true` if it is now ready.
    fn register(&self, oper: Operation, cx: &Context) -> bool;

    /// Unregisters an operation for execution.
    fn unregister(&self, oper: Operation);

    /// Attempts to select an operation the thread got woken up for and returns `true` on success.
    fn accept(&self, token: &mut Token, cx: &Context) -> bool;

    /// Returns `true` if an operation can be executed without blocking.
    fn is_ready(&self) -> bool;

    /// Registers an operation for readiness notification and returns `true` if it is now ready.
    fn watch(&self, oper: Operation, cx: &Context) -> bool;

    /// Unregisters an operation for readiness notification.
    fn unwatch(&self, oper: Operation);
}
121 | ||
5869c6ff | 122 | impl<T: SelectHandle> SelectHandle for &T { |
1b1a35ee XL |
123 | fn try_select(&self, token: &mut Token) -> bool { |
124 | (**self).try_select(token) | |
125 | } | |
126 | ||
127 | fn deadline(&self) -> Option<Instant> { | |
128 | (**self).deadline() | |
129 | } | |
130 | ||
131 | fn register(&self, oper: Operation, cx: &Context) -> bool { | |
132 | (**self).register(oper, cx) | |
133 | } | |
134 | ||
135 | fn unregister(&self, oper: Operation) { | |
136 | (**self).unregister(oper); | |
137 | } | |
138 | ||
139 | fn accept(&self, token: &mut Token, cx: &Context) -> bool { | |
140 | (**self).accept(token, cx) | |
141 | } | |
142 | ||
143 | fn is_ready(&self) -> bool { | |
144 | (**self).is_ready() | |
145 | } | |
146 | ||
147 | fn watch(&self, oper: Operation, cx: &Context) -> bool { | |
148 | (**self).watch(oper, cx) | |
149 | } | |
150 | ||
151 | fn unwatch(&self, oper: Operation) { | |
152 | (**self).unwatch(oper) | |
153 | } | |
154 | } | |
155 | ||
/// Determines when a select operation should time out.
///
/// In this file `Now` is used by `try_select`, `Never` by `select`, and `At` by
/// `select_deadline`.
#[derive(Clone, Copy, Eq, PartialEq)]
enum Timeout {
    /// No blocking.
    Now,

    /// Block forever.
    Never,

    /// Time out after the time instant.
    At(Instant),
}
168 | ||
/// Runs until one of the operations is selected, potentially blocking the current thread.
///
/// Successful receive operations will have to be followed up by `channel::read()` and successful
/// send operations by `channel::write()`.
///
/// Each entry of `handles` is `(handle, index, ptr)`. The slice is shuffled in place before any
/// operation is tried. On success the initialized [`Token`] is returned together with the `index`
/// and `ptr` of the selected entry; `None` is returned when nothing could be selected within
/// `timeout`.
///
/// With an empty `handles` slice the function only waits: it returns `None` immediately for
/// `Timeout::Now`, sleeps until the given instant for `Timeout::At`, and is not expected to return
/// for `Timeout::Never`.
fn run_select(
    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
    timeout: Timeout,
) -> Option<(Token, usize, *const u8)> {
    if handles.is_empty() {
        // Wait until the timeout and return.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {
                utils::sleep_until(None);
                unreachable!();
            }
            Timeout::At(when) => {
                utils::sleep_until(Some(when));
                return None;
            }
        }
    }

    // Shuffle the operations for fairness.
    utils::shuffle(handles);

    // Create a token, which serves as a temporary variable that gets initialized in this function
    // and is later used by a call to `channel::read()` or `channel::write()` that completes the
    // selected operation.
    let mut token = Token::default();

    // Try selecting one of the operations without blocking.
    for &(handle, i, ptr) in handles.iter() {
        if handle.try_select(&mut token) {
            return Some((token, i, ptr));
        }
    }

    loop {
        // Prepare for blocking.
        let res = Context::with(|cx| {
            let mut sel = Selected::Waiting;
            // Number of handles `register` was called on; exactly that many are unregistered
            // below.
            let mut registered_count = 0;
            // Index of the handle that reported readiness during registration, if any.
            let mut index_ready = None;

            // NOTE(review): for a non-blocking select the context is marked as aborted up front;
            // presumably this makes `cx.selected()` below report `Aborted` so that the thread
            // never blocks — confirm against `Context`.
            if let Timeout::Now = timeout {
                cx.try_select(Selected::Aborted).unwrap();
            }

            // Register all operations.
            for (handle, i, _) in handles.iter_mut() {
                registered_count += 1;

                // If registration returns `true`, that means the operation has just become ready.
                if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
                    // Try aborting select.
                    sel = match cx.try_select(Selected::Aborted) {
                        Ok(()) => {
                            index_ready = Some(*i);
                            Selected::Aborted
                        }
                        Err(s) => s,
                    };
                    break;
                }

                // If another thread has already selected one of the operations, stop registration.
                sel = cx.selected();
                if sel != Selected::Waiting {
                    break;
                }
            }

            if sel == Selected::Waiting {
                // Check with each operation for how long we're allowed to block, and compute the
                // earliest deadline.
                let mut deadline: Option<Instant> = match timeout {
                    Timeout::Now => return None,
                    Timeout::Never => None,
                    Timeout::At(when) => Some(when),
                };
                for &(handle, _, _) in handles.iter() {
                    if let Some(x) = handle.deadline() {
                        // Keep the earlier of the two, or adopt `x` if there was no deadline yet.
                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
                    }
                }

                // Block the current thread.
                sel = cx.wait_until(deadline);
            }

            // Unregister all registered operations.
            for (handle, _, _) in handles.iter_mut().take(registered_count) {
                handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
            }

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {
                    // If an operation became ready during registration, try selecting it.
                    if let Some(index_ready) = index_ready {
                        for &(handle, i, ptr) in handles.iter() {
                            if i == index_ready && handle.try_select(&mut token) {
                                return Some((i, ptr));
                            }
                        }
                    }
                }
                Selected::Disconnected => {}
                Selected::Operation(_) => {
                    // Find the selected operation.
                    for (handle, i, ptr) in handles.iter_mut() {
                        // Is this the selected operation?
                        if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
                        {
                            // Try selecting this operation.
                            if handle.accept(&mut token, cx) {
                                return Some((*i, *ptr));
                            }
                        }
                    }
                }
            }

            None
        });

        // Return if an operation was selected.
        if let Some((i, ptr)) = res {
            return Some((token, i, ptr));
        }

        // Try selecting one of the operations without blocking.
        for &(handle, i, ptr) in handles.iter() {
            if handle.try_select(&mut token) {
                return Some((token, i, ptr));
            }
        }

        // Give up once the timeout has passed; otherwise go around and block again.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {}
            Timeout::At(when) => {
                if Instant::now() >= when {
                    return None;
                }
            }
        }
    }
}
319 | ||
/// Runs until one of the operations becomes ready, potentially blocking the current thread.
///
/// Each entry of `handles` is `(handle, index, ptr)`; the slice is shuffled in place first. The
/// `index` of a ready entry is returned, or `None` when nothing became ready within `timeout`.
/// Unlike `run_select`, no operation is actually started here.
///
/// With an empty `handles` slice the function only waits: it returns `None` immediately for
/// `Timeout::Now`, sleeps until the given instant for `Timeout::At`, and is not expected to return
/// for `Timeout::Never`.
fn run_ready(
    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
    timeout: Timeout,
) -> Option<usize> {
    if handles.is_empty() {
        // Wait until the timeout and return.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {
                utils::sleep_until(None);
                unreachable!();
            }
            Timeout::At(when) => {
                utils::sleep_until(Some(when));
                return None;
            }
        }
    }

    // Shuffle the operations for fairness.
    utils::shuffle(handles);

    loop {
        // Poll for readiness with a fresh backoff before resorting to blocking.
        let backoff = Backoff::new();
        loop {
            // Check operations for readiness.
            for &(handle, i, _) in handles.iter() {
                if handle.is_ready() {
                    return Some(i);
                }
            }

            if backoff.is_completed() {
                break;
            } else {
                backoff.snooze();
            }
        }

        // Check for timeout.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {}
            Timeout::At(when) => {
                if Instant::now() >= when {
                    return None;
                }
            }
        }

        // Prepare for blocking.
        let res = Context::with(|cx| {
            let mut sel = Selected::Waiting;
            // Number of handles `watch` was called on; exactly that many are unwatched below.
            let mut registered_count = 0;

            // Begin watching all operations.
            for (handle, _, _) in handles.iter_mut() {
                registered_count += 1;
                let oper = Operation::hook::<&dyn SelectHandle>(handle);

                // If `watch` returns `true`, that means the operation has just become ready.
                if handle.watch(oper, cx) {
                    sel = match cx.try_select(Selected::Operation(oper)) {
                        Ok(()) => Selected::Operation(oper),
                        Err(s) => s,
                    };
                    break;
                }

                // If another thread has already chosen one of the operations, stop registration.
                sel = cx.selected();
                if sel != Selected::Waiting {
                    break;
                }
            }

            if sel == Selected::Waiting {
                // Check with each operation for how long we're allowed to block, and compute the
                // earliest deadline.
                let mut deadline: Option<Instant> = match timeout {
                    // `Timeout::Now` has already returned from the timeout check above.
                    Timeout::Now => unreachable!(),
                    Timeout::Never => None,
                    Timeout::At(when) => Some(when),
                };
                for &(handle, _, _) in handles.iter() {
                    if let Some(x) = handle.deadline() {
                        // Keep the earlier of the two, or adopt `x` if there was no deadline yet.
                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
                    }
                }

                // Block the current thread.
                sel = cx.wait_until(deadline);
            }

            // Unwatch all operations.
            for (handle, _, _) in handles.iter_mut().take(registered_count) {
                handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
            }

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {}
                Selected::Disconnected => {}
                Selected::Operation(_) => {
                    // Map the selected operation back to the index of its handle.
                    for (handle, i, _) in handles.iter_mut() {
                        let oper = Operation::hook::<&dyn SelectHandle>(handle);
                        if sel == Selected::Operation(oper) {
                            return Some(*i);
                        }
                    }
                }
            }

            None
        });

        // Return if an operation became ready.
        if res.is_some() {
            return res;
        }
    }
}
443 | ||
444 | /// Attempts to select one of the operations without blocking. | |
445 | #[inline] | |
446 | pub fn try_select<'a>( | |
447 | handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], | |
448 | ) -> Result<SelectedOperation<'a>, TrySelectError> { | |
449 | match run_select(handles, Timeout::Now) { | |
450 | None => Err(TrySelectError), | |
451 | Some((token, index, ptr)) => Ok(SelectedOperation { | |
452 | token, | |
453 | index, | |
454 | ptr, | |
455 | _marker: PhantomData, | |
456 | }), | |
457 | } | |
458 | } | |
459 | ||
460 | /// Blocks until one of the operations becomes ready and selects it. | |
461 | #[inline] | |
462 | pub fn select<'a>( | |
463 | handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], | |
464 | ) -> SelectedOperation<'a> { | |
465 | if handles.is_empty() { | |
466 | panic!("no operations have been added to `Select`"); | |
467 | } | |
468 | ||
469 | let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap(); | |
470 | SelectedOperation { | |
471 | token, | |
472 | index, | |
473 | ptr, | |
474 | _marker: PhantomData, | |
475 | } | |
476 | } | |
477 | ||
478 | /// Blocks for a limited time until one of the operations becomes ready and selects it. | |
479 | #[inline] | |
480 | pub fn select_timeout<'a>( | |
481 | handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], | |
482 | timeout: Duration, | |
483 | ) -> Result<SelectedOperation<'a>, SelectTimeoutError> { | |
5869c6ff XL |
484 | select_deadline(handles, Instant::now() + timeout) |
485 | } | |
1b1a35ee | 486 | |
5869c6ff XL |
487 | /// Blocks until a given deadline, or until one of the operations becomes ready and selects it. |
488 | #[inline] | |
489 | pub fn select_deadline<'a>( | |
490 | handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], | |
491 | deadline: Instant, | |
492 | ) -> Result<SelectedOperation<'a>, SelectTimeoutError> { | |
493 | match run_select(handles, Timeout::At(deadline)) { | |
1b1a35ee XL |
494 | None => Err(SelectTimeoutError), |
495 | Some((token, index, ptr)) => Ok(SelectedOperation { | |
496 | token, | |
497 | index, | |
498 | ptr, | |
499 | _marker: PhantomData, | |
500 | }), | |
501 | } | |
502 | } | |
503 | ||
504 | /// Selects from a set of channel operations. | |
505 | /// | |
506 | /// `Select` allows you to define a set of channel operations, wait until any one of them becomes | |
507 | /// ready, and finally execute it. If multiple operations are ready at the same time, a random one | |
508 | /// among them is selected. | |
509 | /// | |
510 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even | |
511 | /// when it will simply return an error because the channel is disconnected. | |
512 | /// | |
513 | /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a | |
514 | /// dynamically created list of channel operations. | |
515 | /// | |
516 | /// Once a list of operations has been built with `Select`, there are two different ways of | |
517 | /// proceeding: | |
518 | /// | |
519 | /// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful, | |
520 | /// the returned selected operation has already begun and **must** be completed. If we don't | |
521 | /// complete it, a panic will occur. | |
522 | /// | |
523 | /// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If | |
524 | /// successful, we may attempt to execute the operation, but are not obliged to. In fact, it's | |
525 | /// possible for another thread to make the operation not ready just before we try executing it, | |
526 | /// so it's wise to use a retry loop. However, note that these methods might return with success | |
527 | /// spuriously, so it's a good idea to always double check if the operation is really ready. | |
528 | /// | |
529 | /// # Examples | |
530 | /// | |
531 | /// Use [`select`] to receive a message from a list of receivers: | |
532 | /// | |
533 | /// ``` | |
534 | /// use crossbeam_channel::{Receiver, RecvError, Select}; | |
535 | /// | |
536 | /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> { | |
537 | /// // Build a list of operations. | |
538 | /// let mut sel = Select::new(); | |
539 | /// for r in rs { | |
540 | /// sel.recv(r); | |
541 | /// } | |
542 | /// | |
543 | /// // Complete the selected operation. | |
544 | /// let oper = sel.select(); | |
545 | /// let index = oper.index(); | |
546 | /// oper.recv(&rs[index]) | |
547 | /// } | |
548 | /// ``` | |
549 | /// | |
550 | /// Use [`ready`] to receive a message from a list of receivers: | |
551 | /// | |
552 | /// ``` | |
553 | /// use crossbeam_channel::{Receiver, RecvError, Select}; | |
554 | /// | |
555 | /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> { | |
556 | /// // Build a list of operations. | |
557 | /// let mut sel = Select::new(); | |
558 | /// for r in rs { | |
559 | /// sel.recv(r); | |
560 | /// } | |
561 | /// | |
562 | /// loop { | |
563 | /// // Wait until a receive operation becomes ready and try executing it. | |
564 | /// let index = sel.ready(); | |
565 | /// let res = rs[index].try_recv(); | |
566 | /// | |
567 | /// // If the operation turns out not to be ready, retry. | |
568 | /// if let Err(e) = res { | |
569 | /// if e.is_empty() { | |
570 | /// continue; | |
571 | /// } | |
572 | /// } | |
573 | /// | |
574 | /// // Success! | |
575 | /// return res.map_err(|_| RecvError); | |
576 | /// } | |
577 | /// } | |
578 | /// ``` | |
579 | /// | |
5869c6ff XL |
580 | /// [`try_select`]: Select::try_select |
581 | /// [`select`]: Select::select | |
582 | /// [`select_timeout`]: Select::select_timeout | |
583 | /// [`try_ready`]: Select::try_ready | |
584 | /// [`ready`]: Select::ready | |
585 | /// [`ready_timeout`]: Select::ready_timeout | |
1b1a35ee XL |
pub struct Select<'a> {
    /// A list of senders and receivers participating in selection.
    ///
    /// Each entry is `(handle, index, ptr)`: the handle itself, the index handed out when the
    /// operation was added, and the address of the sender or receiver as a byte pointer.
    handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,

    /// The next index to assign to an operation.
    ///
    /// Incremented by `send` and `recv`; `remove` leaves it untouched, so indices are not reused.
    next_index: usize,
}
593 | ||
5869c6ff XL |
// `Select` does not get `Send`/`Sync` automatically: its `handles` field stores raw `*const u8`
// pointers (and `&dyn SelectHandle` references).
// NOTE(review): these impls presumably rely on the raw pointers being used only as addresses and
// on every handle type being safe to share between threads — TODO confirm and turn this into a
// proper `SAFETY:` justification.
unsafe impl Send for Select<'_> {}
unsafe impl Sync for Select<'_> {}
1b1a35ee XL |
596 | |
597 | impl<'a> Select<'a> { | |
598 | /// Creates an empty list of channel operations for selection. | |
599 | /// | |
600 | /// # Examples | |
601 | /// | |
602 | /// ``` | |
603 | /// use crossbeam_channel::Select; | |
604 | /// | |
605 | /// let mut sel = Select::new(); | |
606 | /// | |
607 | /// // The list of operations is empty, which means no operation can be selected. | |
608 | /// assert!(sel.try_select().is_err()); | |
609 | /// ``` | |
610 | pub fn new() -> Select<'a> { | |
611 | Select { | |
612 | handles: Vec::with_capacity(4), | |
613 | next_index: 0, | |
614 | } | |
615 | } | |
616 | ||
617 | /// Adds a send operation. | |
618 | /// | |
619 | /// Returns the index of the added operation. | |
620 | /// | |
621 | /// # Examples | |
622 | /// | |
623 | /// ``` | |
1b1a35ee XL |
624 | /// use crossbeam_channel::{unbounded, Select}; |
625 | /// | |
626 | /// let (s, r) = unbounded::<i32>(); | |
627 | /// | |
628 | /// let mut sel = Select::new(); | |
629 | /// let index = sel.send(&s); | |
630 | /// ``` | |
631 | pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize { | |
632 | let i = self.next_index; | |
633 | let ptr = s as *const Sender<_> as *const u8; | |
634 | self.handles.push((s, i, ptr)); | |
635 | self.next_index += 1; | |
636 | i | |
637 | } | |
638 | ||
639 | /// Adds a receive operation. | |
640 | /// | |
641 | /// Returns the index of the added operation. | |
642 | /// | |
643 | /// # Examples | |
644 | /// | |
645 | /// ``` | |
1b1a35ee XL |
646 | /// use crossbeam_channel::{unbounded, Select}; |
647 | /// | |
648 | /// let (s, r) = unbounded::<i32>(); | |
649 | /// | |
650 | /// let mut sel = Select::new(); | |
651 | /// let index = sel.recv(&r); | |
652 | /// ``` | |
653 | pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize { | |
654 | let i = self.next_index; | |
655 | let ptr = r as *const Receiver<_> as *const u8; | |
656 | self.handles.push((r, i, ptr)); | |
657 | self.next_index += 1; | |
658 | i | |
659 | } | |
660 | ||
661 | /// Removes a previously added operation. | |
662 | /// | |
663 | /// This is useful when an operation is selected because the channel got disconnected and we | |
664 | /// want to try again to select a different operation instead. | |
665 | /// | |
666 | /// If new operations are added after removing some, the indices of removed operations will not | |
667 | /// be reused. | |
668 | /// | |
669 | /// # Panics | |
670 | /// | |
671 | /// An attempt to remove a non-existing or already removed operation will panic. | |
672 | /// | |
673 | /// # Examples | |
674 | /// | |
675 | /// ``` | |
1b1a35ee XL |
676 | /// use crossbeam_channel::{unbounded, Select}; |
677 | /// | |
678 | /// let (s1, r1) = unbounded::<i32>(); | |
679 | /// let (_, r2) = unbounded::<i32>(); | |
680 | /// | |
681 | /// let mut sel = Select::new(); | |
682 | /// let oper1 = sel.recv(&r1); | |
683 | /// let oper2 = sel.recv(&r2); | |
684 | /// | |
685 | /// // Only the second operation is ready: its sender was dropped, so the channel is disconnected. | |
686 | /// let oper = sel.select(); | |
687 | /// assert_eq!(oper.index(), oper2); | |
688 | /// assert!(oper.recv(&r2).is_err()); | |
689 | /// sel.remove(oper2); | |
690 | /// | |
691 | /// s1.send(10).unwrap(); | |
692 | /// | |
693 | /// let oper = sel.select(); | |
694 | /// assert_eq!(oper.index(), oper1); | |
695 | /// assert_eq!(oper.recv(&r1), Ok(10)); | |
696 | /// ``` | |
697 | pub fn remove(&mut self, index: usize) { | |
698 | assert!( | |
699 | index < self.next_index, | |
700 | "index out of bounds; {} >= {}", | |
701 | index, | |
702 | self.next_index, | |
703 | ); | |
704 | ||
705 | let i = self | |
706 | .handles | |
707 | .iter() | |
708 | .enumerate() | |
709 | .find(|(_, (_, i, _))| *i == index) | |
710 | .expect("no operation with this index") | |
711 | .0; | |
712 | ||
713 | self.handles.swap_remove(i); | |
714 | } | |
715 | ||
716 | /// Attempts to select one of the operations without blocking. | |
717 | /// | |
718 | /// If an operation is ready, it is selected and returned. If multiple operations are ready at | |
719 | /// the same time, a random one among them is selected. If none of the operations are ready, an | |
720 | /// error is returned. | |
721 | /// | |
722 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready | |
723 | /// even when it will simply return an error because the channel is disconnected. | |
724 | /// | |
725 | /// The selected operation must be completed with [`SelectedOperation::send`] | |
726 | /// or [`SelectedOperation::recv`]. | |
727 | /// | |
1b1a35ee XL |
728 | /// # Examples |
729 | /// | |
730 | /// ``` | |
1b1a35ee XL |
731 | /// use crossbeam_channel::{unbounded, Select}; |
732 | /// | |
733 | /// let (s1, r1) = unbounded(); | |
734 | /// let (s2, r2) = unbounded(); | |
735 | /// | |
736 | /// s1.send(10).unwrap(); | |
737 | /// s2.send(20).unwrap(); | |
738 | /// | |
739 | /// let mut sel = Select::new(); | |
740 | /// let oper1 = sel.recv(&r1); | |
741 | /// let oper2 = sel.recv(&r2); | |
742 | /// | |
743 | /// // Both operations are initially ready, so a random one will be executed. | |
744 | /// let oper = sel.try_select(); | |
745 | /// match oper { | |
746 | /// Err(_) => panic!("both operations should be ready"), | |
747 | /// Ok(oper) => match oper.index() { | |
748 | /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)), | |
749 | /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)), | |
750 | /// _ => unreachable!(), | |
751 | /// } | |
752 | /// } | |
753 | /// ``` | |
754 | pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> { | |
755 | try_select(&mut self.handles) | |
756 | } | |
757 | ||
758 | /// Blocks until one of the operations becomes ready and selects it. | |
759 | /// | |
760 | /// Once an operation becomes ready, it is selected and returned. If multiple operations are | |
761 | /// ready at the same time, a random one among them is selected. | |
762 | /// | |
763 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready | |
764 | /// even when it will simply return an error because the channel is disconnected. | |
765 | /// | |
766 | /// The selected operation must be completed with [`SelectedOperation::send`] | |
767 | /// or [`SelectedOperation::recv`]. | |
768 | /// | |
1b1a35ee XL |
769 | /// # Panics |
770 | /// | |
771 | /// Panics if no operations have been added to `Select`. | |
772 | /// | |
773 | /// # Examples | |
774 | /// | |
775 | /// ``` | |
776 | /// use std::thread; | |
777 | /// use std::time::Duration; | |
778 | /// use crossbeam_channel::{unbounded, Select}; | |
779 | /// | |
780 | /// let (s1, r1) = unbounded(); | |
781 | /// let (s2, r2) = unbounded(); | |
782 | /// | |
783 | /// thread::spawn(move || { | |
784 | /// thread::sleep(Duration::from_secs(1)); | |
785 | /// s1.send(10).unwrap(); | |
786 | /// }); | |
787 | /// thread::spawn(move || s2.send(20).unwrap()); | |
788 | /// | |
789 | /// let mut sel = Select::new(); | |
790 | /// let oper1 = sel.recv(&r1); | |
791 | /// let oper2 = sel.recv(&r2); | |
792 | /// | |
793 | /// // The second operation will be selected because it becomes ready first. | |
794 | /// let oper = sel.select(); | |
795 | /// match oper.index() { | |
796 | /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)), | |
797 | /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)), | |
798 | /// _ => unreachable!(), | |
799 | /// } | |
800 | /// ``` | |
801 | pub fn select(&mut self) -> SelectedOperation<'a> { | |
802 | select(&mut self.handles) | |
803 | } | |
804 | ||
805 | /// Blocks for a limited time until one of the operations becomes ready and selects it. | |
806 | /// | |
807 | /// If an operation becomes ready, it is selected and returned. If multiple operations are | |
808 | /// ready at the same time, a random one among them is selected. If none of the operations | |
809 | /// become ready for the specified duration, an error is returned. | |
810 | /// | |
811 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready | |
812 | /// even when it will simply return an error because the channel is disconnected. | |
813 | /// | |
814 | /// The selected operation must be completed with [`SelectedOperation::send`] | |
815 | /// or [`SelectedOperation::recv`]. | |
816 | /// | |
1b1a35ee XL |
817 | /// # Examples |
818 | /// | |
819 | /// ``` | |
820 | /// use std::thread; | |
821 | /// use std::time::Duration; | |
822 | /// use crossbeam_channel::{unbounded, Select}; | |
823 | /// | |
824 | /// let (s1, r1) = unbounded(); | |
825 | /// let (s2, r2) = unbounded(); | |
826 | /// | |
827 | /// thread::spawn(move || { | |
828 | /// thread::sleep(Duration::from_secs(1)); | |
829 | /// s1.send(10).unwrap(); | |
830 | /// }); | |
831 | /// thread::spawn(move || s2.send(20).unwrap()); | |
832 | /// | |
833 | /// let mut sel = Select::new(); | |
834 | /// let oper1 = sel.recv(&r1); | |
835 | /// let oper2 = sel.recv(&r2); | |
836 | /// | |
837 | /// // The second operation will be selected because it becomes ready first. | |
838 | /// let oper = sel.select_timeout(Duration::from_millis(500)); | |
839 | /// match oper { | |
840 | /// Err(_) => panic!("should not have timed out"), | |
841 | /// Ok(oper) => match oper.index() { | |
842 | /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)), | |
843 | /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)), | |
844 | /// _ => unreachable!(), | |
845 | /// } | |
846 | /// } | |
847 | /// ``` | |
848 | pub fn select_timeout( | |
849 | &mut self, | |
850 | timeout: Duration, | |
851 | ) -> Result<SelectedOperation<'a>, SelectTimeoutError> { | |
852 | select_timeout(&mut self.handles, timeout) | |
853 | } | |
854 | ||
5869c6ff XL |
855 | /// Blocks until a given deadline, or until one of the operations becomes ready and selects it. |
856 | /// | |
857 | /// If an operation becomes ready, it is selected and returned. If multiple operations are | |
858 | /// ready at the same time, a random one among them is selected. If none of the operations | |
859 | /// become ready before the given deadline, an error is returned. | |
860 | /// | |
861 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready | |
862 | /// even when it will simply return an error because the channel is disconnected. | |
863 | /// | |
864 | /// The selected operation must be completed with [`SelectedOperation::send`] | |
865 | /// or [`SelectedOperation::recv`]. | |
866 | /// | |
867 | /// [`SelectedOperation::send`]: struct.SelectedOperation.html#method.send | |
868 | /// [`SelectedOperation::recv`]: struct.SelectedOperation.html#method.recv | |
869 | /// | |
870 | /// # Examples | |
871 | /// | |
872 | /// ``` | |
873 | /// use std::thread; | |
874 | /// use std::time::{Instant, Duration}; | |
875 | /// use crossbeam_channel::{unbounded, Select}; | |
876 | /// | |
877 | /// let (s1, r1) = unbounded(); | |
878 | /// let (s2, r2) = unbounded(); | |
879 | /// | |
880 | /// thread::spawn(move || { | |
881 | /// thread::sleep(Duration::from_secs(1)); | |
882 | /// s1.send(10).unwrap(); | |
883 | /// }); | |
884 | /// thread::spawn(move || s2.send(20).unwrap()); | |
885 | /// | |
886 | /// let mut sel = Select::new(); | |
887 | /// let oper1 = sel.recv(&r1); | |
888 | /// let oper2 = sel.recv(&r2); | |
889 | /// | |
890 | /// let deadline = Instant::now() + Duration::from_millis(500); | |
891 | /// | |
892 | /// // The second operation will be selected because it becomes ready first. | |
893 | /// let oper = sel.select_deadline(deadline); | |
894 | /// match oper { | |
895 | /// Err(_) => panic!("should not have timed out"), | |
896 | /// Ok(oper) => match oper.index() { | |
897 | /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)), | |
898 | /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)), | |
899 | /// _ => unreachable!(), | |
900 | /// } | |
901 | /// } | |
902 | /// ``` | |
903 | pub fn select_deadline( | |
904 | &mut self, | |
905 | deadline: Instant, | |
906 | ) -> Result<SelectedOperation<'a>, SelectTimeoutError> { | |
907 | select_deadline(&mut self.handles, deadline) | |
908 | } | |
909 | ||
1b1a35ee XL |
910 | /// Attempts to find a ready operation without blocking. |
911 | /// | |
912 | /// If an operation is ready, its index is returned. If multiple operations are ready at the | |
913 | /// same time, a random one among them is chosen. If none of the operations are ready, an error | |
914 | /// is returned. | |
915 | /// | |
916 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready | |
917 | /// even when it will simply return an error because the channel is disconnected. | |
918 | /// | |
919 | /// Note that this method might return with success spuriously, so it's a good idea to always | |
920 | /// double check if the operation is really ready. | |
921 | /// | |
922 | /// # Examples | |
923 | /// | |
924 | /// ``` | |
1b1a35ee XL |
925 | /// use crossbeam_channel::{unbounded, Select}; |
926 | /// | |
927 | /// let (s1, r1) = unbounded(); | |
928 | /// let (s2, r2) = unbounded(); | |
929 | /// | |
930 | /// s1.send(10).unwrap(); | |
931 | /// s2.send(20).unwrap(); | |
932 | /// | |
933 | /// let mut sel = Select::new(); | |
934 | /// let oper1 = sel.recv(&r1); | |
935 | /// let oper2 = sel.recv(&r2); | |
936 | /// | |
937 | /// // Both operations are initially ready, so a random one will be chosen. | |
938 | /// match sel.try_ready() { | |
939 | /// Err(_) => panic!("both operations should be ready"), | |
940 | /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)), | |
941 | /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)), | |
942 | /// Ok(_) => unreachable!(), | |
943 | /// } | |
944 | /// ``` | |
945 | pub fn try_ready(&mut self) -> Result<usize, TryReadyError> { | |
946 | match run_ready(&mut self.handles, Timeout::Now) { | |
947 | None => Err(TryReadyError), | |
948 | Some(index) => Ok(index), | |
949 | } | |
950 | } | |
951 | ||
952 | /// Blocks until one of the operations becomes ready. | |
953 | /// | |
954 | /// Once an operation becomes ready, its index is returned. If multiple operations are ready at | |
955 | /// the same time, a random one among them is chosen. | |
956 | /// | |
957 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready | |
958 | /// even when it will simply return an error because the channel is disconnected. | |
959 | /// | |
960 | /// Note that this method might return with success spuriously, so it's a good idea to always | |
961 | /// double check if the operation is really ready. | |
962 | /// | |
963 | /// # Panics | |
964 | /// | |
965 | /// Panics if no operations have been added to `Select`. | |
966 | /// | |
967 | /// # Examples | |
968 | /// | |
969 | /// ``` | |
970 | /// use std::thread; | |
971 | /// use std::time::Duration; | |
972 | /// use crossbeam_channel::{unbounded, Select}; | |
973 | /// | |
974 | /// let (s1, r1) = unbounded(); | |
975 | /// let (s2, r2) = unbounded(); | |
976 | /// | |
977 | /// thread::spawn(move || { | |
978 | /// thread::sleep(Duration::from_secs(1)); | |
979 | /// s1.send(10).unwrap(); | |
980 | /// }); | |
981 | /// thread::spawn(move || s2.send(20).unwrap()); | |
982 | /// | |
983 | /// let mut sel = Select::new(); | |
984 | /// let oper1 = sel.recv(&r1); | |
985 | /// let oper2 = sel.recv(&r2); | |
986 | /// | |
987 | /// // The second operation will be selected because it becomes ready first. | |
988 | /// match sel.ready() { | |
989 | /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)), | |
990 | /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)), | |
991 | /// _ => unreachable!(), | |
992 | /// } | |
993 | /// ``` | |
994 | pub fn ready(&mut self) -> usize { | |
995 | if self.handles.is_empty() { | |
996 | panic!("no operations have been added to `Select`"); | |
997 | } | |
998 | ||
999 | run_ready(&mut self.handles, Timeout::Never).unwrap() | |
1000 | } | |
1001 | ||
1002 | /// Blocks for a limited time until one of the operations becomes ready. | |
1003 | /// | |
1004 | /// If an operation becomes ready, its index is returned. If multiple operations are ready at | |
1005 | /// the same time, a random one among them is chosen. If none of the operations become ready | |
1006 | /// for the specified duration, an error is returned. | |
1007 | /// | |
1008 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready | |
1009 | /// even when it will simply return an error because the channel is disconnected. | |
1010 | /// | |
1011 | /// Note that this method might return with success spuriously, so it's a good idea to double | |
1012 | /// check if the operation is really ready. | |
1013 | /// | |
1014 | /// # Examples | |
1015 | /// | |
1016 | /// ``` | |
1017 | /// use std::thread; | |
1018 | /// use std::time::Duration; | |
1019 | /// use crossbeam_channel::{unbounded, Select}; | |
1020 | /// | |
1021 | /// let (s1, r1) = unbounded(); | |
1022 | /// let (s2, r2) = unbounded(); | |
1023 | /// | |
1024 | /// thread::spawn(move || { | |
1025 | /// thread::sleep(Duration::from_secs(1)); | |
1026 | /// s1.send(10).unwrap(); | |
1027 | /// }); | |
1028 | /// thread::spawn(move || s2.send(20).unwrap()); | |
1029 | /// | |
1030 | /// let mut sel = Select::new(); | |
1031 | /// let oper1 = sel.recv(&r1); | |
1032 | /// let oper2 = sel.recv(&r2); | |
1033 | /// | |
1034 | /// // The second operation will be selected because it becomes ready first. | |
1035 | /// match sel.ready_timeout(Duration::from_millis(500)) { | |
1036 | /// Err(_) => panic!("should not have timed out"), | |
1037 | /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)), | |
1038 | /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)), | |
1039 | /// Ok(_) => unreachable!(), | |
1040 | /// } | |
1041 | /// ``` | |
1042 | pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> { | |
5869c6ff XL |
1043 | self.ready_deadline(Instant::now() + timeout) |
1044 | } | |
1b1a35ee | 1045 | |
5869c6ff XL |
1046 | /// Blocks until a given deadline, or until one of the operations becomes ready. |
1047 | /// | |
1048 | /// If an operation becomes ready, its index is returned. If multiple operations are ready at | |
1049 | /// the same time, a random one among them is chosen. If none of the operations become ready | |
1050 | /// before the deadline, an error is returned. | |
1051 | /// | |
1052 | /// An operation is considered to be ready if it doesn't have to block. Note that it is ready | |
1053 | /// even when it will simply return an error because the channel is disconnected. | |
1054 | /// | |
1055 | /// Note that this method might return with success spuriously, so it's a good idea to double | |
1056 | /// check if the operation is really ready. | |
1057 | /// | |
1058 | /// # Examples | |
1059 | /// | |
1060 | /// ``` | |
1061 | /// use std::thread; | |
1062 | /// use std::time::{Duration, Instant}; | |
1063 | /// use crossbeam_channel::{unbounded, Select}; | |
1064 | /// | |
1065 | /// let deadline = Instant::now() + Duration::from_millis(500); | |
1066 | /// | |
1067 | /// let (s1, r1) = unbounded(); | |
1068 | /// let (s2, r2) = unbounded(); | |
1069 | /// | |
1070 | /// thread::spawn(move || { | |
1071 | /// thread::sleep(Duration::from_secs(1)); | |
1072 | /// s1.send(10).unwrap(); | |
1073 | /// }); | |
1074 | /// thread::spawn(move || s2.send(20).unwrap()); | |
1075 | /// | |
1076 | /// let mut sel = Select::new(); | |
1077 | /// let oper1 = sel.recv(&r1); | |
1078 | /// let oper2 = sel.recv(&r2); | |
1079 | /// | |
1080 | /// // The second operation will be selected because it becomes ready first. | |
1081 | /// match sel.ready_deadline(deadline) { | |
1082 | /// Err(_) => panic!("should not have timed out"), | |
1083 | /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)), | |
1084 | /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)), | |
1085 | /// Ok(_) => unreachable!(), | |
1086 | /// } | |
1087 | /// ``` | |
1088 | pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> { | |
1089 | match run_ready(&mut self.handles, Timeout::At(deadline)) { | |
1b1a35ee XL |
1090 | None => Err(ReadyTimeoutError), |
1091 | Some(index) => Ok(index), | |
1092 | } | |
1093 | } | |
1094 | } | |
1095 | ||
1096 | impl<'a> Clone for Select<'a> { | |
1097 | fn clone(&self) -> Select<'a> { | |
1098 | Select { | |
1099 | handles: self.handles.clone(), | |
1100 | next_index: self.next_index, | |
1101 | } | |
1102 | } | |
1103 | } | |
1104 | ||
1105 | impl<'a> Default for Select<'a> { | |
1106 | fn default() -> Select<'a> { | |
1107 | Select::new() | |
1108 | } | |
1109 | } | |
1110 | ||
5869c6ff XL |
1111 | impl fmt::Debug for Select<'_> { |
1112 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | |
1b1a35ee XL |
1113 | f.pad("Select { .. }") |
1114 | } | |
1115 | } | |
1116 | ||
/// A selected operation that needs to be completed.
///
/// To complete the operation, call [`send`] or [`recv`]. Both methods take the
/// `SelectedOperation` by value, so each selected operation can be completed only once.
///
/// # Panics
///
/// Forgetting to complete the operation is an error and might lead to deadlocks. If a
/// `SelectedOperation` is dropped without completion, a panic occurs.
///
/// [`send`]: SelectedOperation::send
/// [`recv`]: SelectedOperation::recv
#[must_use]
pub struct SelectedOperation<'a> {
    /// Token needed to complete the operation.
    ///
    /// It is passed on to `channel::write` or `channel::read` when the operation is completed.
    token: Token,

    /// The index of the selected operation.
    index: usize,

    /// The address of the selected `Sender` or `Receiver`.
    ///
    /// `send` and `recv` compare this against the address of the reference they are given and
    /// panic on a mismatch.
    ptr: *const u8,

    /// Indicates that `Sender`s and `Receiver`s are borrowed.
    _marker: PhantomData<&'a ()>,
}
1142 | ||
5869c6ff | 1143 | impl SelectedOperation<'_> { |
1b1a35ee XL |
1144 | /// Returns the index of the selected operation. |
1145 | /// | |
1146 | /// # Examples | |
1147 | /// | |
1148 | /// ``` | |
1149 | /// use crossbeam_channel::{bounded, Select}; | |
1150 | /// | |
1151 | /// let (s1, r1) = bounded::<()>(0); | |
1152 | /// let (s2, r2) = bounded::<()>(0); | |
1153 | /// let (s3, r3) = bounded::<()>(1); | |
1154 | /// | |
1155 | /// let mut sel = Select::new(); | |
1156 | /// let oper1 = sel.send(&s1); | |
1157 | /// let oper2 = sel.recv(&r2); | |
1158 | /// let oper3 = sel.send(&s3); | |
1159 | /// | |
1160 | /// // Only the last operation is ready. | |
1161 | /// let oper = sel.select(); | |
1162 | /// assert_eq!(oper.index(), 2); | |
1163 | /// assert_eq!(oper.index(), oper3); | |
1164 | /// | |
1165 | /// // Complete the operation. | |
1166 | /// oper.send(&s3, ()).unwrap(); | |
1167 | /// ``` | |
1168 | pub fn index(&self) -> usize { | |
1169 | self.index | |
1170 | } | |
1171 | ||
1172 | /// Completes the send operation. | |
1173 | /// | |
1174 | /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`] | |
1175 | /// when the operation was added. | |
1176 | /// | |
1177 | /// # Panics | |
1178 | /// | |
1179 | /// Panics if an incorrect [`Sender`] reference is passed. | |
1180 | /// | |
1181 | /// # Examples | |
1182 | /// | |
1183 | /// ``` | |
1184 | /// use crossbeam_channel::{bounded, Select, SendError}; | |
1185 | /// | |
1186 | /// let (s, r) = bounded::<i32>(0); | |
1187 | /// drop(r); | |
1188 | /// | |
1189 | /// let mut sel = Select::new(); | |
1190 | /// let oper1 = sel.send(&s); | |
1191 | /// | |
1192 | /// let oper = sel.select(); | |
1193 | /// assert_eq!(oper.index(), oper1); | |
1194 | /// assert_eq!(oper.send(&s, 10), Err(SendError(10))); | |
1195 | /// ``` | |
1b1a35ee XL |
1196 | pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> { |
1197 | assert!( | |
1198 | s as *const Sender<T> as *const u8 == self.ptr, | |
1199 | "passed a sender that wasn't selected", | |
1200 | ); | |
1201 | let res = unsafe { channel::write(s, &mut self.token, msg) }; | |
1202 | mem::forget(self); | |
1203 | res.map_err(SendError) | |
1204 | } | |
1205 | ||
1206 | /// Completes the receive operation. | |
1207 | /// | |
1208 | /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`] | |
1209 | /// when the operation was added. | |
1210 | /// | |
1211 | /// # Panics | |
1212 | /// | |
1213 | /// Panics if an incorrect [`Receiver`] reference is passed. | |
1214 | /// | |
1215 | /// # Examples | |
1216 | /// | |
1217 | /// ``` | |
1218 | /// use crossbeam_channel::{bounded, Select, RecvError}; | |
1219 | /// | |
1220 | /// let (s, r) = bounded::<i32>(0); | |
1221 | /// drop(s); | |
1222 | /// | |
1223 | /// let mut sel = Select::new(); | |
1224 | /// let oper1 = sel.recv(&r); | |
1225 | /// | |
1226 | /// let oper = sel.select(); | |
1227 | /// assert_eq!(oper.index(), oper1); | |
1228 | /// assert_eq!(oper.recv(&r), Err(RecvError)); | |
1229 | /// ``` | |
1b1a35ee XL |
1230 | pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> { |
1231 | assert!( | |
1232 | r as *const Receiver<T> as *const u8 == self.ptr, | |
1233 | "passed a receiver that wasn't selected", | |
1234 | ); | |
1235 | let res = unsafe { channel::read(r, &mut self.token) }; | |
1236 | mem::forget(self); | |
1237 | res.map_err(|_| RecvError) | |
1238 | } | |
1239 | } | |
1240 | ||
5869c6ff XL |
1241 | impl fmt::Debug for SelectedOperation<'_> { |
1242 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | |
1b1a35ee XL |
1243 | f.pad("SelectedOperation { .. }") |
1244 | } | |
1245 | } | |
1246 | ||
impl Drop for SelectedOperation<'_> {
    /// Always panics: a `SelectedOperation` must be completed, not dropped.
    ///
    /// `send` and `recv` call `mem::forget` on the value once they have completed the
    /// operation, so this destructor does not run for completed operations.
    fn drop(&mut self) {
        panic!("dropped `SelectedOperation` without completing the operation");
    }
}