]>
Commit | Line | Data |
---|---|---|
1a4d82fc JJ |
1 | // Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT |
2 | // file at the top-level directory of this distribution and at | |
3 | // http://rust-lang.org/COPYRIGHT. | |
4 | // | |
5 | // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or | |
6 | // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license | |
7 | // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your | |
8 | // option. This file may not be copied, modified, or distributed | |
9 | // except according to those terms. | |
10 | ||
11 | //! Selection over an array of receivers | |
12 | //! | |
13 | //! This module contains the implementation machinery necessary for selecting | |
14 | //! over a number of receivers. One large goal of this module is to provide an | |
15 | //! efficient interface to selecting over any receiver of any type. | |
16 | //! | |
17 | //! This is achieved through an architecture of a "receiver set" in which | |
18 | //! receivers are added to a set and then the entire set is waited on at once. | |
19 | //! The set can be waited on multiple times to prevent re-adding each receiver | |
20 | //! to the set. | |
21 | //! | |
22 | //! Usage of this module is currently encouraged to go through the use of the | |
23 | //! `select!` macro. This macro allows naturally binding of variables to the | |
24 | //! received values of receivers in a much more natural syntax then usage of the | |
25 | //! `Select` structure directly. | |
26 | //! | |
c34b1796 | 27 | //! # Examples |
1a4d82fc JJ |
28 | //! |
29 | //! ```rust | |
c1a9b12d SL |
30 | //! #![feature(mpsc_select)] |
31 | //! | |
1a4d82fc JJ |
32 | //! use std::sync::mpsc::channel; |
33 | //! | |
34 | //! let (tx1, rx1) = channel(); | |
35 | //! let (tx2, rx2) = channel(); | |
36 | //! | |
85aaf69f SL |
37 | //! tx1.send(1).unwrap(); |
38 | //! tx2.send(2).unwrap(); | |
1a4d82fc JJ |
39 | //! |
40 | //! select! { | |
41 | //! val = rx1.recv() => { | |
85aaf69f | 42 | //! assert_eq!(val.unwrap(), 1); |
1a4d82fc JJ |
43 | //! }, |
44 | //! val = rx2.recv() => { | |
85aaf69f | 45 | //! assert_eq!(val.unwrap(), 2); |
1a4d82fc JJ |
46 | //! } |
47 | //! } | |
48 | //! ``` | |
49 | ||
50 | #![allow(dead_code)] | |
62682a34 | 51 | #![unstable(feature = "mpsc_select", |
85aaf69f SL |
52 | reason = "This implementation, while likely sufficient, is unsafe and \ |
53 | likely to be error prone. At some point in the future this \ | |
54 | module will likely be replaced, and it is currently \ | |
55 | unknown how much API breakage that will cause. The ability \ | |
56 | to select over a number of channels will remain forever, \ | |
57 | but no guarantees beyond this are being made")] | |
1a4d82fc JJ |
58 | |
59 | ||
60 | use core::prelude::*; | |
61 | ||
bd371182 | 62 | use core::cell::{Cell, UnsafeCell}; |
1a4d82fc JJ |
63 | use core::marker; |
64 | use core::mem; | |
85aaf69f SL |
65 | use core::ptr; |
66 | use core::usize; | |
1a4d82fc JJ |
67 | |
68 | use sync::mpsc::{Receiver, RecvError}; | |
69 | use sync::mpsc::blocking::{self, SignalToken}; | |
70 | ||
71 | /// The "receiver set" of the select interface. This structure is used to manage | |
72 | /// a set of receivers which are being selected over. | |
73 | pub struct Select { | |
bd371182 AL |
74 | inner: UnsafeCell<SelectInner>, |
75 | next_id: Cell<usize>, | |
76 | } | |
77 | ||
78 | struct SelectInner { | |
1a4d82fc JJ |
79 | head: *mut Handle<'static, ()>, |
80 | tail: *mut Handle<'static, ()>, | |
1a4d82fc JJ |
81 | } |
82 | ||
85aaf69f SL |
83 | impl !marker::Send for Select {} |
84 | ||
1a4d82fc JJ |
85 | /// A handle to a receiver which is currently a member of a `Select` set of |
86 | /// receivers. This handle is used to keep the receiver in the set as well as | |
87 | /// interact with the underlying receiver. | |
c34b1796 | 88 | pub struct Handle<'rx, T:Send+'rx> { |
1a4d82fc JJ |
89 | /// The ID of this handle, used to compare against the return value of |
90 | /// `Select::wait()` | |
c34b1796 | 91 | id: usize, |
bd371182 | 92 | selector: *mut SelectInner, |
1a4d82fc JJ |
93 | next: *mut Handle<'static, ()>, |
94 | prev: *mut Handle<'static, ()>, | |
95 | added: bool, | |
96 | packet: &'rx (Packet+'rx), | |
97 | ||
98 | // due to our fun transmutes, we be sure to place this at the end. (nothing | |
99 | // previous relies on T) | |
100 | rx: &'rx Receiver<T>, | |
101 | } | |
102 | ||
103 | struct Packets { cur: *mut Handle<'static, ()> } | |
104 | ||
105 | #[doc(hidden)] | |
106 | #[derive(PartialEq)] | |
107 | pub enum StartResult { | |
108 | Installed, | |
109 | Abort, | |
110 | } | |
111 | ||
112 | #[doc(hidden)] | |
113 | pub trait Packet { | |
114 | fn can_recv(&self) -> bool; | |
115 | fn start_selection(&self, token: SignalToken) -> StartResult; | |
116 | fn abort_selection(&self) -> bool; | |
117 | } | |
118 | ||
119 | impl Select { | |
85aaf69f SL |
120 | /// Creates a new selection structure. This set is initially empty. |
121 | /// | |
122 | /// Usage of this struct directly can sometimes be burdensome, and usage is much easier through | |
123 | /// the `select!` macro. | |
124 | /// | |
125 | /// # Examples | |
126 | /// | |
127 | /// ``` | |
c1a9b12d SL |
128 | /// #![feature(mpsc_select)] |
129 | /// | |
85aaf69f | 130 | /// use std::sync::mpsc::Select; |
1a4d82fc | 131 | /// |
85aaf69f SL |
132 | /// let select = Select::new(); |
133 | /// ``` | |
1a4d82fc JJ |
134 | pub fn new() -> Select { |
135 | Select { | |
bd371182 AL |
136 | inner: UnsafeCell::new(SelectInner { |
137 | head: ptr::null_mut(), | |
138 | tail: ptr::null_mut(), | |
139 | }), | |
1a4d82fc JJ |
140 | next_id: Cell::new(1), |
141 | } | |
142 | } | |
143 | ||
144 | /// Creates a new handle into this receiver set for a new receiver. Note | |
145 | /// that this does *not* add the receiver to the receiver set, for that you | |
146 | /// must call the `add` method on the handle itself. | |
c34b1796 | 147 | pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> { |
1a4d82fc JJ |
148 | let id = self.next_id.get(); |
149 | self.next_id.set(id + 1); | |
150 | Handle { | |
151 | id: id, | |
bd371182 | 152 | selector: self.inner.get(), |
85aaf69f SL |
153 | next: ptr::null_mut(), |
154 | prev: ptr::null_mut(), | |
1a4d82fc JJ |
155 | added: false, |
156 | rx: rx, | |
157 | packet: rx, | |
158 | } | |
159 | } | |
160 | ||
161 | /// Waits for an event on this receiver set. The returned value is *not* an | |
162 | /// index, but rather an id. This id can be queried against any active | |
163 | /// `Handle` structures (each one has an `id` method). The handle with | |
164 | /// the matching `id` will have some sort of event available on it. The | |
165 | /// event could either be that data is available or the corresponding | |
166 | /// channel has been closed. | |
c34b1796 | 167 | pub fn wait(&self) -> usize { |
1a4d82fc JJ |
168 | self.wait2(true) |
169 | } | |
170 | ||
171 | /// Helper method for skipping the preflight checks during testing | |
c34b1796 | 172 | fn wait2(&self, do_preflight_checks: bool) -> usize { |
1a4d82fc JJ |
173 | // Note that this is currently an inefficient implementation. We in |
174 | // theory have knowledge about all receivers in the set ahead of time, | |
175 | // so this method shouldn't really have to iterate over all of them yet | |
176 | // again. The idea with this "receiver set" interface is to get the | |
177 | // interface right this time around, and later this implementation can | |
178 | // be optimized. | |
179 | // | |
180 | // This implementation can be summarized by: | |
181 | // | |
182 | // fn select(receivers) { | |
183 | // if any receiver ready { return ready index } | |
184 | // deschedule { | |
185 | // block on all receivers | |
186 | // } | |
187 | // unblock on all receivers | |
188 | // return ready index | |
189 | // } | |
190 | // | |
191 | // Most notably, the iterations over all of the receivers shouldn't be | |
192 | // necessary. | |
193 | unsafe { | |
194 | // Stage 1: preflight checks. Look for any packets ready to receive | |
195 | if do_preflight_checks { | |
196 | for handle in self.iter() { | |
197 | if (*handle).packet.can_recv() { | |
198 | return (*handle).id(); | |
199 | } | |
200 | } | |
201 | } | |
202 | ||
203 | // Stage 2: begin the blocking process | |
204 | // | |
205 | // Create a number of signal tokens, and install each one | |
206 | // sequentially until one fails. If one fails, then abort the | |
207 | // selection on the already-installed tokens. | |
208 | let (wait_token, signal_token) = blocking::tokens(); | |
209 | for (i, handle) in self.iter().enumerate() { | |
210 | match (*handle).packet.start_selection(signal_token.clone()) { | |
211 | StartResult::Installed => {} | |
212 | StartResult::Abort => { | |
213 | // Go back and abort the already-begun selections | |
214 | for handle in self.iter().take(i) { | |
215 | (*handle).packet.abort_selection(); | |
216 | } | |
217 | return (*handle).id; | |
218 | } | |
219 | } | |
220 | } | |
221 | ||
222 | // Stage 3: no messages available, actually block | |
223 | wait_token.wait(); | |
224 | ||
225 | // Stage 4: there *must* be message available; find it. | |
226 | // | |
227 | // Abort the selection process on each receiver. If the abort | |
228 | // process returns `true`, then that means that the receiver is | |
229 | // ready to receive some data. Note that this also means that the | |
230 | // receiver may have yet to have fully read the `to_wake` field and | |
231 | // woken us up (although the wakeup is guaranteed to fail). | |
232 | // | |
233 | // This situation happens in the window of where a sender invokes | |
bd371182 | 234 | // increment(), sees -1, and then decides to wake up the thread. After |
1a4d82fc JJ |
235 | // all this is done, the sending thread will set `selecting` to |
236 | // `false`. Until this is done, we cannot return. If we were to | |
237 | // return, then a sender could wake up a receiver which has gone | |
238 | // back to sleep after this call to `select`. | |
239 | // | |
240 | // Note that it is a "fairly small window" in which an increment() | |
241 | // views that it should wake a thread up until the `selecting` bit | |
242 | // is set to false. For now, the implementation currently just spins | |
243 | // in a yield loop. This is very distasteful, but this | |
244 | // implementation is already nowhere near what it should ideally be. | |
245 | // A rewrite should focus on avoiding a yield loop, and for now this | |
246 | // implementation is tying us over to a more efficient "don't | |
247 | // iterate over everything every time" implementation. | |
85aaf69f | 248 | let mut ready_id = usize::MAX; |
1a4d82fc JJ |
249 | for handle in self.iter() { |
250 | if (*handle).packet.abort_selection() { | |
251 | ready_id = (*handle).id; | |
252 | } | |
253 | } | |
254 | ||
255 | // We must have found a ready receiver | |
85aaf69f | 256 | assert!(ready_id != usize::MAX); |
1a4d82fc JJ |
257 | return ready_id; |
258 | } | |
259 | } | |
260 | ||
bd371182 | 261 | fn iter(&self) -> Packets { Packets { cur: unsafe { &*self.inner.get() }.head } } |
1a4d82fc JJ |
262 | } |
263 | ||
c34b1796 | 264 | impl<'rx, T: Send> Handle<'rx, T> { |
9346a6ac | 265 | /// Retrieves the id of this handle. |
1a4d82fc | 266 | #[inline] |
c34b1796 | 267 | pub fn id(&self) -> usize { self.id } |
1a4d82fc | 268 | |
9346a6ac | 269 | /// Blocks to receive a value on the underlying receiver, returning `Some` on |
1a4d82fc JJ |
270 | /// success or `None` if the channel disconnects. This function has the same |
271 | /// semantics as `Receiver.recv` | |
272 | pub fn recv(&mut self) -> Result<T, RecvError> { self.rx.recv() } | |
273 | ||
274 | /// Adds this handle to the receiver set that the handle was created from. This | |
275 | /// method can be called multiple times, but it has no effect if `add` was | |
276 | /// called previously. | |
277 | /// | |
278 | /// This method is unsafe because it requires that the `Handle` is not moved | |
279 | /// while it is added to the `Select` set. | |
280 | pub unsafe fn add(&mut self) { | |
281 | if self.added { return } | |
bd371182 | 282 | let selector = &mut *self.selector; |
1a4d82fc JJ |
283 | let me: *mut Handle<'static, ()> = mem::transmute(&*self); |
284 | ||
285 | if selector.head.is_null() { | |
286 | selector.head = me; | |
287 | selector.tail = me; | |
288 | } else { | |
289 | (*me).prev = selector.tail; | |
290 | assert!((*me).next.is_null()); | |
291 | (*selector.tail).next = me; | |
292 | selector.tail = me; | |
293 | } | |
294 | self.added = true; | |
295 | } | |
296 | ||
297 | /// Removes this handle from the `Select` set. This method is unsafe because | |
298 | /// it has no guarantee that the `Handle` was not moved since `add` was | |
299 | /// called. | |
300 | pub unsafe fn remove(&mut self) { | |
301 | if !self.added { return } | |
302 | ||
bd371182 | 303 | let selector = &mut *self.selector; |
1a4d82fc JJ |
304 | let me: *mut Handle<'static, ()> = mem::transmute(&*self); |
305 | ||
306 | if self.prev.is_null() { | |
307 | assert_eq!(selector.head, me); | |
308 | selector.head = self.next; | |
309 | } else { | |
310 | (*self.prev).next = self.next; | |
311 | } | |
312 | if self.next.is_null() { | |
313 | assert_eq!(selector.tail, me); | |
314 | selector.tail = self.prev; | |
315 | } else { | |
316 | (*self.next).prev = self.prev; | |
317 | } | |
318 | ||
85aaf69f SL |
319 | self.next = ptr::null_mut(); |
320 | self.prev = ptr::null_mut(); | |
1a4d82fc JJ |
321 | |
322 | self.added = false; | |
323 | } | |
324 | } | |
325 | ||
1a4d82fc JJ |
326 | impl Drop for Select { |
327 | fn drop(&mut self) { | |
bd371182 AL |
328 | unsafe { |
329 | assert!((&*self.inner.get()).head.is_null()); | |
330 | assert!((&*self.inner.get()).tail.is_null()); | |
331 | } | |
1a4d82fc JJ |
332 | } |
333 | } | |
334 | ||
c34b1796 | 335 | impl<'rx, T: Send> Drop for Handle<'rx, T> { |
1a4d82fc JJ |
336 | fn drop(&mut self) { |
337 | unsafe { self.remove() } | |
338 | } | |
339 | } | |
340 | ||
341 | impl Iterator for Packets { | |
342 | type Item = *mut Handle<'static, ()>; | |
343 | ||
344 | fn next(&mut self) -> Option<*mut Handle<'static, ()>> { | |
345 | if self.cur.is_null() { | |
346 | None | |
347 | } else { | |
348 | let ret = Some(self.cur); | |
349 | unsafe { self.cur = (*self.cur).next; } | |
350 | ret | |
351 | } | |
352 | } | |
353 | } | |
354 | ||
355 | #[cfg(test)] | |
356 | #[allow(unused_imports)] | |
d9579d0f | 357 | mod tests { |
1a4d82fc JJ |
358 | use prelude::v1::*; |
359 | ||
85aaf69f | 360 | use thread; |
1a4d82fc JJ |
361 | use sync::mpsc::*; |
362 | ||
363 | // Don't use the libstd version so we can pull in the right Select structure | |
364 | // (std::comm points at the wrong one) | |
365 | macro_rules! select { | |
366 | ( | |
367 | $($name:pat = $rx:ident.$meth:ident() => $code:expr),+ | |
368 | ) => ({ | |
369 | let sel = Select::new(); | |
370 | $( let mut $rx = sel.handle(&$rx); )+ | |
371 | unsafe { | |
372 | $( $rx.add(); )+ | |
373 | } | |
374 | let ret = sel.wait(); | |
375 | $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+ | |
376 | { unreachable!() } | |
377 | }) | |
378 | } | |
379 | ||
380 | #[test] | |
381 | fn smoke() { | |
c34b1796 AL |
382 | let (tx1, rx1) = channel::<i32>(); |
383 | let (tx2, rx2) = channel::<i32>(); | |
1a4d82fc JJ |
384 | tx1.send(1).unwrap(); |
385 | select! { | |
386 | foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); }, | |
387 | _bar = rx2.recv() => { panic!() } | |
388 | } | |
389 | tx2.send(2).unwrap(); | |
390 | select! { | |
391 | _foo = rx1.recv() => { panic!() }, | |
392 | bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) } | |
393 | } | |
394 | drop(tx1); | |
395 | select! { | |
396 | foo = rx1.recv() => { assert!(foo.is_err()); }, | |
397 | _bar = rx2.recv() => { panic!() } | |
398 | } | |
399 | drop(tx2); | |
400 | select! { | |
401 | bar = rx2.recv() => { assert!(bar.is_err()); } | |
402 | } | |
403 | } | |
404 | ||
405 | #[test] | |
406 | fn smoke2() { | |
c34b1796 AL |
407 | let (_tx1, rx1) = channel::<i32>(); |
408 | let (_tx2, rx2) = channel::<i32>(); | |
409 | let (_tx3, rx3) = channel::<i32>(); | |
410 | let (_tx4, rx4) = channel::<i32>(); | |
411 | let (tx5, rx5) = channel::<i32>(); | |
1a4d82fc JJ |
412 | tx5.send(4).unwrap(); |
413 | select! { | |
414 | _foo = rx1.recv() => { panic!("1") }, | |
415 | _foo = rx2.recv() => { panic!("2") }, | |
416 | _foo = rx3.recv() => { panic!("3") }, | |
417 | _foo = rx4.recv() => { panic!("4") }, | |
418 | foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); } | |
419 | } | |
420 | } | |
421 | ||
422 | #[test] | |
423 | fn closed() { | |
c34b1796 AL |
424 | let (_tx1, rx1) = channel::<i32>(); |
425 | let (tx2, rx2) = channel::<i32>(); | |
1a4d82fc JJ |
426 | drop(tx2); |
427 | ||
428 | select! { | |
429 | _a1 = rx1.recv() => { panic!() }, | |
430 | a2 = rx2.recv() => { assert!(a2.is_err()); } | |
431 | } | |
432 | } | |
433 | ||
434 | #[test] | |
435 | fn unblocks() { | |
c34b1796 AL |
436 | let (tx1, rx1) = channel::<i32>(); |
437 | let (_tx2, rx2) = channel::<i32>(); | |
438 | let (tx3, rx3) = channel::<i32>(); | |
1a4d82fc | 439 | |
85aaf69f SL |
440 | let _t = thread::spawn(move|| { |
441 | for _ in 0..20 { thread::yield_now(); } | |
1a4d82fc JJ |
442 | tx1.send(1).unwrap(); |
443 | rx3.recv().unwrap(); | |
85aaf69f | 444 | for _ in 0..20 { thread::yield_now(); } |
1a4d82fc JJ |
445 | }); |
446 | ||
447 | select! { | |
448 | a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, | |
449 | _b = rx2.recv() => { panic!() } | |
450 | } | |
451 | tx3.send(1).unwrap(); | |
452 | select! { | |
453 | a = rx1.recv() => { assert!(a.is_err()) }, | |
454 | _b = rx2.recv() => { panic!() } | |
455 | } | |
456 | } | |
457 | ||
458 | #[test] | |
459 | fn both_ready() { | |
c34b1796 AL |
460 | let (tx1, rx1) = channel::<i32>(); |
461 | let (tx2, rx2) = channel::<i32>(); | |
1a4d82fc JJ |
462 | let (tx3, rx3) = channel::<()>(); |
463 | ||
85aaf69f SL |
464 | let _t = thread::spawn(move|| { |
465 | for _ in 0..20 { thread::yield_now(); } | |
1a4d82fc JJ |
466 | tx1.send(1).unwrap(); |
467 | tx2.send(2).unwrap(); | |
468 | rx3.recv().unwrap(); | |
469 | }); | |
470 | ||
471 | select! { | |
472 | a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, | |
473 | a = rx2.recv() => { assert_eq!(a.unwrap(), 2); } | |
474 | } | |
475 | select! { | |
476 | a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, | |
477 | a = rx2.recv() => { assert_eq!(a.unwrap(), 2); } | |
478 | } | |
479 | assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); | |
480 | assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty)); | |
481 | tx3.send(()).unwrap(); | |
482 | } | |
483 | ||
484 | #[test] | |
485 | fn stress() { | |
c34b1796 AL |
486 | const AMT: i32 = 10000; |
487 | let (tx1, rx1) = channel::<i32>(); | |
488 | let (tx2, rx2) = channel::<i32>(); | |
1a4d82fc JJ |
489 | let (tx3, rx3) = channel::<()>(); |
490 | ||
85aaf69f SL |
491 | let _t = thread::spawn(move|| { |
492 | for i in 0..AMT { | |
1a4d82fc JJ |
493 | if i % 2 == 0 { |
494 | tx1.send(i).unwrap(); | |
495 | } else { | |
496 | tx2.send(i).unwrap(); | |
497 | } | |
498 | rx3.recv().unwrap(); | |
499 | } | |
500 | }); | |
501 | ||
85aaf69f | 502 | for i in 0..AMT { |
1a4d82fc JJ |
503 | select! { |
504 | i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); }, | |
505 | i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); } | |
506 | } | |
507 | tx3.send(()).unwrap(); | |
508 | } | |
509 | } | |
510 | ||
511 | #[test] | |
512 | fn cloning() { | |
c34b1796 AL |
513 | let (tx1, rx1) = channel::<i32>(); |
514 | let (_tx2, rx2) = channel::<i32>(); | |
1a4d82fc JJ |
515 | let (tx3, rx3) = channel::<()>(); |
516 | ||
85aaf69f | 517 | let _t = thread::spawn(move|| { |
1a4d82fc JJ |
518 | rx3.recv().unwrap(); |
519 | tx1.clone(); | |
520 | assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty)); | |
521 | tx1.send(2).unwrap(); | |
522 | rx3.recv().unwrap(); | |
523 | }); | |
524 | ||
525 | tx3.send(()).unwrap(); | |
526 | select! { | |
527 | _i1 = rx1.recv() => {}, | |
528 | _i2 = rx2.recv() => panic!() | |
529 | } | |
530 | tx3.send(()).unwrap(); | |
531 | } | |
532 | ||
533 | #[test] | |
534 | fn cloning2() { | |
c34b1796 AL |
535 | let (tx1, rx1) = channel::<i32>(); |
536 | let (_tx2, rx2) = channel::<i32>(); | |
1a4d82fc JJ |
537 | let (tx3, rx3) = channel::<()>(); |
538 | ||
85aaf69f | 539 | let _t = thread::spawn(move|| { |
1a4d82fc JJ |
540 | rx3.recv().unwrap(); |
541 | tx1.clone(); | |
542 | assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty)); | |
543 | tx1.send(2).unwrap(); | |
544 | rx3.recv().unwrap(); | |
545 | }); | |
546 | ||
547 | tx3.send(()).unwrap(); | |
548 | select! { | |
549 | _i1 = rx1.recv() => {}, | |
550 | _i2 = rx2.recv() => panic!() | |
551 | } | |
552 | tx3.send(()).unwrap(); | |
553 | } | |
554 | ||
555 | #[test] | |
556 | fn cloning3() { | |
557 | let (tx1, rx1) = channel::<()>(); | |
558 | let (tx2, rx2) = channel::<()>(); | |
559 | let (tx3, rx3) = channel::<()>(); | |
85aaf69f | 560 | let _t = thread::spawn(move|| { |
1a4d82fc JJ |
561 | let s = Select::new(); |
562 | let mut h1 = s.handle(&rx1); | |
563 | let mut h2 = s.handle(&rx2); | |
564 | unsafe { h2.add(); } | |
565 | unsafe { h1.add(); } | |
566 | assert_eq!(s.wait(), h2.id); | |
567 | tx3.send(()).unwrap(); | |
568 | }); | |
569 | ||
85aaf69f | 570 | for _ in 0..1000 { thread::yield_now(); } |
1a4d82fc JJ |
571 | drop(tx1.clone()); |
572 | tx2.send(()).unwrap(); | |
573 | rx3.recv().unwrap(); | |
574 | } | |
575 | ||
576 | #[test] | |
577 | fn preflight1() { | |
578 | let (tx, rx) = channel(); | |
579 | tx.send(()).unwrap(); | |
580 | select! { | |
581 | _n = rx.recv() => {} | |
582 | } | |
583 | } | |
584 | ||
585 | #[test] | |
586 | fn preflight2() { | |
587 | let (tx, rx) = channel(); | |
588 | tx.send(()).unwrap(); | |
589 | tx.send(()).unwrap(); | |
590 | select! { | |
591 | _n = rx.recv() => {} | |
592 | } | |
593 | } | |
594 | ||
595 | #[test] | |
596 | fn preflight3() { | |
597 | let (tx, rx) = channel(); | |
598 | drop(tx.clone()); | |
599 | tx.send(()).unwrap(); | |
600 | select! { | |
601 | _n = rx.recv() => {} | |
602 | } | |
603 | } | |
604 | ||
605 | #[test] | |
606 | fn preflight4() { | |
607 | let (tx, rx) = channel(); | |
608 | tx.send(()).unwrap(); | |
609 | let s = Select::new(); | |
610 | let mut h = s.handle(&rx); | |
611 | unsafe { h.add(); } | |
612 | assert_eq!(s.wait2(false), h.id); | |
613 | } | |
614 | ||
615 | #[test] | |
616 | fn preflight5() { | |
617 | let (tx, rx) = channel(); | |
618 | tx.send(()).unwrap(); | |
619 | tx.send(()).unwrap(); | |
620 | let s = Select::new(); | |
621 | let mut h = s.handle(&rx); | |
622 | unsafe { h.add(); } | |
623 | assert_eq!(s.wait2(false), h.id); | |
624 | } | |
625 | ||
626 | #[test] | |
627 | fn preflight6() { | |
628 | let (tx, rx) = channel(); | |
629 | drop(tx.clone()); | |
630 | tx.send(()).unwrap(); | |
631 | let s = Select::new(); | |
632 | let mut h = s.handle(&rx); | |
633 | unsafe { h.add(); } | |
634 | assert_eq!(s.wait2(false), h.id); | |
635 | } | |
636 | ||
637 | #[test] | |
638 | fn preflight7() { | |
639 | let (tx, rx) = channel::<()>(); | |
640 | drop(tx); | |
641 | let s = Select::new(); | |
642 | let mut h = s.handle(&rx); | |
643 | unsafe { h.add(); } | |
644 | assert_eq!(s.wait2(false), h.id); | |
645 | } | |
646 | ||
647 | #[test] | |
648 | fn preflight8() { | |
649 | let (tx, rx) = channel(); | |
650 | tx.send(()).unwrap(); | |
651 | drop(tx); | |
652 | rx.recv().unwrap(); | |
653 | let s = Select::new(); | |
654 | let mut h = s.handle(&rx); | |
655 | unsafe { h.add(); } | |
656 | assert_eq!(s.wait2(false), h.id); | |
657 | } | |
658 | ||
659 | #[test] | |
660 | fn preflight9() { | |
661 | let (tx, rx) = channel(); | |
662 | drop(tx.clone()); | |
663 | tx.send(()).unwrap(); | |
664 | drop(tx); | |
665 | rx.recv().unwrap(); | |
666 | let s = Select::new(); | |
667 | let mut h = s.handle(&rx); | |
668 | unsafe { h.add(); } | |
669 | assert_eq!(s.wait2(false), h.id); | |
670 | } | |
671 | ||
672 | #[test] | |
673 | fn oneshot_data_waiting() { | |
674 | let (tx1, rx1) = channel(); | |
675 | let (tx2, rx2) = channel(); | |
85aaf69f | 676 | let _t = thread::spawn(move|| { |
1a4d82fc JJ |
677 | select! { |
678 | _n = rx1.recv() => {} | |
679 | } | |
680 | tx2.send(()).unwrap(); | |
681 | }); | |
682 | ||
85aaf69f | 683 | for _ in 0..100 { thread::yield_now() } |
1a4d82fc JJ |
684 | tx1.send(()).unwrap(); |
685 | rx2.recv().unwrap(); | |
686 | } | |
687 | ||
688 | #[test] | |
689 | fn stream_data_waiting() { | |
690 | let (tx1, rx1) = channel(); | |
691 | let (tx2, rx2) = channel(); | |
692 | tx1.send(()).unwrap(); | |
693 | tx1.send(()).unwrap(); | |
694 | rx1.recv().unwrap(); | |
695 | rx1.recv().unwrap(); | |
85aaf69f | 696 | let _t = thread::spawn(move|| { |
1a4d82fc JJ |
697 | select! { |
698 | _n = rx1.recv() => {} | |
699 | } | |
700 | tx2.send(()).unwrap(); | |
701 | }); | |
702 | ||
85aaf69f | 703 | for _ in 0..100 { thread::yield_now() } |
1a4d82fc JJ |
704 | tx1.send(()).unwrap(); |
705 | rx2.recv().unwrap(); | |
706 | } | |
707 | ||
708 | #[test] | |
709 | fn shared_data_waiting() { | |
710 | let (tx1, rx1) = channel(); | |
711 | let (tx2, rx2) = channel(); | |
712 | drop(tx1.clone()); | |
713 | tx1.send(()).unwrap(); | |
714 | rx1.recv().unwrap(); | |
85aaf69f | 715 | let _t = thread::spawn(move|| { |
1a4d82fc JJ |
716 | select! { |
717 | _n = rx1.recv() => {} | |
718 | } | |
719 | tx2.send(()).unwrap(); | |
720 | }); | |
721 | ||
85aaf69f | 722 | for _ in 0..100 { thread::yield_now() } |
1a4d82fc JJ |
723 | tx1.send(()).unwrap(); |
724 | rx2.recv().unwrap(); | |
725 | } | |
726 | ||
727 | #[test] | |
728 | fn sync1() { | |
c34b1796 | 729 | let (tx, rx) = sync_channel::<i32>(1); |
1a4d82fc JJ |
730 | tx.send(1).unwrap(); |
731 | select! { | |
732 | n = rx.recv() => { assert_eq!(n.unwrap(), 1); } | |
733 | } | |
734 | } | |
735 | ||
736 | #[test] | |
737 | fn sync2() { | |
c34b1796 | 738 | let (tx, rx) = sync_channel::<i32>(0); |
85aaf69f SL |
739 | let _t = thread::spawn(move|| { |
740 | for _ in 0..100 { thread::yield_now() } | |
1a4d82fc JJ |
741 | tx.send(1).unwrap(); |
742 | }); | |
743 | select! { | |
744 | n = rx.recv() => { assert_eq!(n.unwrap(), 1); } | |
745 | } | |
746 | } | |
747 | ||
748 | #[test] | |
749 | fn sync3() { | |
c34b1796 AL |
750 | let (tx1, rx1) = sync_channel::<i32>(0); |
751 | let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel(); | |
85aaf69f SL |
752 | let _t = thread::spawn(move|| { tx1.send(1).unwrap(); }); |
753 | let _t = thread::spawn(move|| { tx2.send(2).unwrap(); }); | |
1a4d82fc JJ |
754 | select! { |
755 | n = rx1.recv() => { | |
756 | let n = n.unwrap(); | |
757 | assert_eq!(n, 1); | |
758 | assert_eq!(rx2.recv().unwrap(), 2); | |
759 | }, | |
760 | n = rx2.recv() => { | |
761 | let n = n.unwrap(); | |
762 | assert_eq!(n, 2); | |
763 | assert_eq!(rx1.recv().unwrap(), 1); | |
764 | } | |
765 | } | |
766 | } | |
767 | } |