]> git.proxmox.com Git - rustc.git/blame - src/libstd/sync/mpsc/select.rs
Imported Upstream version 1.3.0+dfsg1
[rustc.git] / src / libstd / sync / mpsc / select.rs
CommitLineData
1a4d82fc
JJ
1// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution and at
3// http://rust-lang.org/COPYRIGHT.
4//
5// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8// option. This file may not be copied, modified, or distributed
9// except according to those terms.
10
11//! Selection over an array of receivers
12//!
13//! This module contains the implementation machinery necessary for selecting
14//! over a number of receivers. One large goal of this module is to provide an
15//! efficient interface to selecting over any receiver of any type.
16//!
17//! This is achieved through an architecture of a "receiver set" in which
18//! receivers are added to a set and then the entire set is waited on at once.
19//! The set can be waited on multiple times to prevent re-adding each receiver
20//! to the set.
21//!
22//! Usage of this module is currently encouraged to go through the use of the
23//! `select!` macro. This macro allows naturally binding of variables to the
24//! received values of receivers in a much more natural syntax then usage of the
25//! `Select` structure directly.
26//!
c34b1796 27//! # Examples
1a4d82fc
JJ
28//!
29//! ```rust
c1a9b12d
SL
30//! #![feature(mpsc_select)]
31//!
1a4d82fc
JJ
32//! use std::sync::mpsc::channel;
33//!
34//! let (tx1, rx1) = channel();
35//! let (tx2, rx2) = channel();
36//!
85aaf69f
SL
37//! tx1.send(1).unwrap();
38//! tx2.send(2).unwrap();
1a4d82fc
JJ
39//!
40//! select! {
41//! val = rx1.recv() => {
85aaf69f 42//! assert_eq!(val.unwrap(), 1);
1a4d82fc
JJ
43//! },
44//! val = rx2.recv() => {
85aaf69f 45//! assert_eq!(val.unwrap(), 2);
1a4d82fc
JJ
46//! }
47//! }
48//! ```
49
50#![allow(dead_code)]
62682a34 51#![unstable(feature = "mpsc_select",
85aaf69f
SL
52 reason = "This implementation, while likely sufficient, is unsafe and \
53 likely to be error prone. At some point in the future this \
54 module will likely be replaced, and it is currently \
55 unknown how much API breakage that will cause. The ability \
56 to select over a number of channels will remain forever, \
57 but no guarantees beyond this are being made")]
1a4d82fc
JJ
58
59
60use core::prelude::*;
61
bd371182 62use core::cell::{Cell, UnsafeCell};
1a4d82fc
JJ
63use core::marker;
64use core::mem;
85aaf69f
SL
65use core::ptr;
66use core::usize;
1a4d82fc
JJ
67
68use sync::mpsc::{Receiver, RecvError};
69use sync::mpsc::blocking::{self, SignalToken};
70
71/// The "receiver set" of the select interface. This structure is used to manage
72/// a set of receivers which are being selected over.
73pub struct Select {
bd371182
AL
74 inner: UnsafeCell<SelectInner>,
75 next_id: Cell<usize>,
76}
77
78struct SelectInner {
1a4d82fc
JJ
79 head: *mut Handle<'static, ()>,
80 tail: *mut Handle<'static, ()>,
1a4d82fc
JJ
81}
82
85aaf69f
SL
83impl !marker::Send for Select {}
84
1a4d82fc
JJ
85/// A handle to a receiver which is currently a member of a `Select` set of
86/// receivers. This handle is used to keep the receiver in the set as well as
87/// interact with the underlying receiver.
c34b1796 88pub struct Handle<'rx, T:Send+'rx> {
1a4d82fc
JJ
89 /// The ID of this handle, used to compare against the return value of
90 /// `Select::wait()`
c34b1796 91 id: usize,
bd371182 92 selector: *mut SelectInner,
1a4d82fc
JJ
93 next: *mut Handle<'static, ()>,
94 prev: *mut Handle<'static, ()>,
95 added: bool,
96 packet: &'rx (Packet+'rx),
97
98 // due to our fun transmutes, we be sure to place this at the end. (nothing
99 // previous relies on T)
100 rx: &'rx Receiver<T>,
101}
102
103struct Packets { cur: *mut Handle<'static, ()> }
104
105#[doc(hidden)]
106#[derive(PartialEq)]
107pub enum StartResult {
108 Installed,
109 Abort,
110}
111
112#[doc(hidden)]
113pub trait Packet {
114 fn can_recv(&self) -> bool;
115 fn start_selection(&self, token: SignalToken) -> StartResult;
116 fn abort_selection(&self) -> bool;
117}
118
119impl Select {
85aaf69f
SL
120 /// Creates a new selection structure. This set is initially empty.
121 ///
122 /// Usage of this struct directly can sometimes be burdensome, and usage is much easier through
123 /// the `select!` macro.
124 ///
125 /// # Examples
126 ///
127 /// ```
c1a9b12d
SL
128 /// #![feature(mpsc_select)]
129 ///
85aaf69f 130 /// use std::sync::mpsc::Select;
1a4d82fc 131 ///
85aaf69f
SL
132 /// let select = Select::new();
133 /// ```
1a4d82fc
JJ
134 pub fn new() -> Select {
135 Select {
bd371182
AL
136 inner: UnsafeCell::new(SelectInner {
137 head: ptr::null_mut(),
138 tail: ptr::null_mut(),
139 }),
1a4d82fc
JJ
140 next_id: Cell::new(1),
141 }
142 }
143
144 /// Creates a new handle into this receiver set for a new receiver. Note
145 /// that this does *not* add the receiver to the receiver set, for that you
146 /// must call the `add` method on the handle itself.
c34b1796 147 pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
1a4d82fc
JJ
148 let id = self.next_id.get();
149 self.next_id.set(id + 1);
150 Handle {
151 id: id,
bd371182 152 selector: self.inner.get(),
85aaf69f
SL
153 next: ptr::null_mut(),
154 prev: ptr::null_mut(),
1a4d82fc
JJ
155 added: false,
156 rx: rx,
157 packet: rx,
158 }
159 }
160
161 /// Waits for an event on this receiver set. The returned value is *not* an
162 /// index, but rather an id. This id can be queried against any active
163 /// `Handle` structures (each one has an `id` method). The handle with
164 /// the matching `id` will have some sort of event available on it. The
165 /// event could either be that data is available or the corresponding
166 /// channel has been closed.
c34b1796 167 pub fn wait(&self) -> usize {
1a4d82fc
JJ
168 self.wait2(true)
169 }
170
171 /// Helper method for skipping the preflight checks during testing
c34b1796 172 fn wait2(&self, do_preflight_checks: bool) -> usize {
1a4d82fc
JJ
173 // Note that this is currently an inefficient implementation. We in
174 // theory have knowledge about all receivers in the set ahead of time,
175 // so this method shouldn't really have to iterate over all of them yet
176 // again. The idea with this "receiver set" interface is to get the
177 // interface right this time around, and later this implementation can
178 // be optimized.
179 //
180 // This implementation can be summarized by:
181 //
182 // fn select(receivers) {
183 // if any receiver ready { return ready index }
184 // deschedule {
185 // block on all receivers
186 // }
187 // unblock on all receivers
188 // return ready index
189 // }
190 //
191 // Most notably, the iterations over all of the receivers shouldn't be
192 // necessary.
193 unsafe {
194 // Stage 1: preflight checks. Look for any packets ready to receive
195 if do_preflight_checks {
196 for handle in self.iter() {
197 if (*handle).packet.can_recv() {
198 return (*handle).id();
199 }
200 }
201 }
202
203 // Stage 2: begin the blocking process
204 //
205 // Create a number of signal tokens, and install each one
206 // sequentially until one fails. If one fails, then abort the
207 // selection on the already-installed tokens.
208 let (wait_token, signal_token) = blocking::tokens();
209 for (i, handle) in self.iter().enumerate() {
210 match (*handle).packet.start_selection(signal_token.clone()) {
211 StartResult::Installed => {}
212 StartResult::Abort => {
213 // Go back and abort the already-begun selections
214 for handle in self.iter().take(i) {
215 (*handle).packet.abort_selection();
216 }
217 return (*handle).id;
218 }
219 }
220 }
221
222 // Stage 3: no messages available, actually block
223 wait_token.wait();
224
225 // Stage 4: there *must* be message available; find it.
226 //
227 // Abort the selection process on each receiver. If the abort
228 // process returns `true`, then that means that the receiver is
229 // ready to receive some data. Note that this also means that the
230 // receiver may have yet to have fully read the `to_wake` field and
231 // woken us up (although the wakeup is guaranteed to fail).
232 //
233 // This situation happens in the window of where a sender invokes
bd371182 234 // increment(), sees -1, and then decides to wake up the thread. After
1a4d82fc
JJ
235 // all this is done, the sending thread will set `selecting` to
236 // `false`. Until this is done, we cannot return. If we were to
237 // return, then a sender could wake up a receiver which has gone
238 // back to sleep after this call to `select`.
239 //
240 // Note that it is a "fairly small window" in which an increment()
241 // views that it should wake a thread up until the `selecting` bit
242 // is set to false. For now, the implementation currently just spins
243 // in a yield loop. This is very distasteful, but this
244 // implementation is already nowhere near what it should ideally be.
245 // A rewrite should focus on avoiding a yield loop, and for now this
246 // implementation is tying us over to a more efficient "don't
247 // iterate over everything every time" implementation.
85aaf69f 248 let mut ready_id = usize::MAX;
1a4d82fc
JJ
249 for handle in self.iter() {
250 if (*handle).packet.abort_selection() {
251 ready_id = (*handle).id;
252 }
253 }
254
255 // We must have found a ready receiver
85aaf69f 256 assert!(ready_id != usize::MAX);
1a4d82fc
JJ
257 return ready_id;
258 }
259 }
260
bd371182 261 fn iter(&self) -> Packets { Packets { cur: unsafe { &*self.inner.get() }.head } }
1a4d82fc
JJ
262}
263
c34b1796 264impl<'rx, T: Send> Handle<'rx, T> {
9346a6ac 265 /// Retrieves the id of this handle.
1a4d82fc 266 #[inline]
c34b1796 267 pub fn id(&self) -> usize { self.id }
1a4d82fc 268
9346a6ac 269 /// Blocks to receive a value on the underlying receiver, returning `Some` on
1a4d82fc
JJ
270 /// success or `None` if the channel disconnects. This function has the same
271 /// semantics as `Receiver.recv`
272 pub fn recv(&mut self) -> Result<T, RecvError> { self.rx.recv() }
273
274 /// Adds this handle to the receiver set that the handle was created from. This
275 /// method can be called multiple times, but it has no effect if `add` was
276 /// called previously.
277 ///
278 /// This method is unsafe because it requires that the `Handle` is not moved
279 /// while it is added to the `Select` set.
280 pub unsafe fn add(&mut self) {
281 if self.added { return }
bd371182 282 let selector = &mut *self.selector;
1a4d82fc
JJ
283 let me: *mut Handle<'static, ()> = mem::transmute(&*self);
284
285 if selector.head.is_null() {
286 selector.head = me;
287 selector.tail = me;
288 } else {
289 (*me).prev = selector.tail;
290 assert!((*me).next.is_null());
291 (*selector.tail).next = me;
292 selector.tail = me;
293 }
294 self.added = true;
295 }
296
297 /// Removes this handle from the `Select` set. This method is unsafe because
298 /// it has no guarantee that the `Handle` was not moved since `add` was
299 /// called.
300 pub unsafe fn remove(&mut self) {
301 if !self.added { return }
302
bd371182 303 let selector = &mut *self.selector;
1a4d82fc
JJ
304 let me: *mut Handle<'static, ()> = mem::transmute(&*self);
305
306 if self.prev.is_null() {
307 assert_eq!(selector.head, me);
308 selector.head = self.next;
309 } else {
310 (*self.prev).next = self.next;
311 }
312 if self.next.is_null() {
313 assert_eq!(selector.tail, me);
314 selector.tail = self.prev;
315 } else {
316 (*self.next).prev = self.prev;
317 }
318
85aaf69f
SL
319 self.next = ptr::null_mut();
320 self.prev = ptr::null_mut();
1a4d82fc
JJ
321
322 self.added = false;
323 }
324}
325
1a4d82fc
JJ
326impl Drop for Select {
327 fn drop(&mut self) {
bd371182
AL
328 unsafe {
329 assert!((&*self.inner.get()).head.is_null());
330 assert!((&*self.inner.get()).tail.is_null());
331 }
1a4d82fc
JJ
332 }
333}
334
c34b1796 335impl<'rx, T: Send> Drop for Handle<'rx, T> {
1a4d82fc
JJ
336 fn drop(&mut self) {
337 unsafe { self.remove() }
338 }
339}
340
341impl Iterator for Packets {
342 type Item = *mut Handle<'static, ()>;
343
344 fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
345 if self.cur.is_null() {
346 None
347 } else {
348 let ret = Some(self.cur);
349 unsafe { self.cur = (*self.cur).next; }
350 ret
351 }
352 }
353}
354
355#[cfg(test)]
356#[allow(unused_imports)]
d9579d0f 357mod tests {
1a4d82fc
JJ
358 use prelude::v1::*;
359
85aaf69f 360 use thread;
1a4d82fc
JJ
361 use sync::mpsc::*;
362
363 // Don't use the libstd version so we can pull in the right Select structure
364 // (std::comm points at the wrong one)
365 macro_rules! select {
366 (
367 $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
368 ) => ({
369 let sel = Select::new();
370 $( let mut $rx = sel.handle(&$rx); )+
371 unsafe {
372 $( $rx.add(); )+
373 }
374 let ret = sel.wait();
375 $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
376 { unreachable!() }
377 })
378 }
379
380 #[test]
381 fn smoke() {
c34b1796
AL
382 let (tx1, rx1) = channel::<i32>();
383 let (tx2, rx2) = channel::<i32>();
1a4d82fc
JJ
384 tx1.send(1).unwrap();
385 select! {
386 foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); },
387 _bar = rx2.recv() => { panic!() }
388 }
389 tx2.send(2).unwrap();
390 select! {
391 _foo = rx1.recv() => { panic!() },
392 bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) }
393 }
394 drop(tx1);
395 select! {
396 foo = rx1.recv() => { assert!(foo.is_err()); },
397 _bar = rx2.recv() => { panic!() }
398 }
399 drop(tx2);
400 select! {
401 bar = rx2.recv() => { assert!(bar.is_err()); }
402 }
403 }
404
405 #[test]
406 fn smoke2() {
c34b1796
AL
407 let (_tx1, rx1) = channel::<i32>();
408 let (_tx2, rx2) = channel::<i32>();
409 let (_tx3, rx3) = channel::<i32>();
410 let (_tx4, rx4) = channel::<i32>();
411 let (tx5, rx5) = channel::<i32>();
1a4d82fc
JJ
412 tx5.send(4).unwrap();
413 select! {
414 _foo = rx1.recv() => { panic!("1") },
415 _foo = rx2.recv() => { panic!("2") },
416 _foo = rx3.recv() => { panic!("3") },
417 _foo = rx4.recv() => { panic!("4") },
418 foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); }
419 }
420 }
421
422 #[test]
423 fn closed() {
c34b1796
AL
424 let (_tx1, rx1) = channel::<i32>();
425 let (tx2, rx2) = channel::<i32>();
1a4d82fc
JJ
426 drop(tx2);
427
428 select! {
429 _a1 = rx1.recv() => { panic!() },
430 a2 = rx2.recv() => { assert!(a2.is_err()); }
431 }
432 }
433
434 #[test]
435 fn unblocks() {
c34b1796
AL
436 let (tx1, rx1) = channel::<i32>();
437 let (_tx2, rx2) = channel::<i32>();
438 let (tx3, rx3) = channel::<i32>();
1a4d82fc 439
85aaf69f
SL
440 let _t = thread::spawn(move|| {
441 for _ in 0..20 { thread::yield_now(); }
1a4d82fc
JJ
442 tx1.send(1).unwrap();
443 rx3.recv().unwrap();
85aaf69f 444 for _ in 0..20 { thread::yield_now(); }
1a4d82fc
JJ
445 });
446
447 select! {
448 a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
449 _b = rx2.recv() => { panic!() }
450 }
451 tx3.send(1).unwrap();
452 select! {
453 a = rx1.recv() => { assert!(a.is_err()) },
454 _b = rx2.recv() => { panic!() }
455 }
456 }
457
458 #[test]
459 fn both_ready() {
c34b1796
AL
460 let (tx1, rx1) = channel::<i32>();
461 let (tx2, rx2) = channel::<i32>();
1a4d82fc
JJ
462 let (tx3, rx3) = channel::<()>();
463
85aaf69f
SL
464 let _t = thread::spawn(move|| {
465 for _ in 0..20 { thread::yield_now(); }
1a4d82fc
JJ
466 tx1.send(1).unwrap();
467 tx2.send(2).unwrap();
468 rx3.recv().unwrap();
469 });
470
471 select! {
472 a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
473 a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
474 }
475 select! {
476 a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
477 a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
478 }
479 assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
480 assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
481 tx3.send(()).unwrap();
482 }
483
484 #[test]
485 fn stress() {
c34b1796
AL
486 const AMT: i32 = 10000;
487 let (tx1, rx1) = channel::<i32>();
488 let (tx2, rx2) = channel::<i32>();
1a4d82fc
JJ
489 let (tx3, rx3) = channel::<()>();
490
85aaf69f
SL
491 let _t = thread::spawn(move|| {
492 for i in 0..AMT {
1a4d82fc
JJ
493 if i % 2 == 0 {
494 tx1.send(i).unwrap();
495 } else {
496 tx2.send(i).unwrap();
497 }
498 rx3.recv().unwrap();
499 }
500 });
501
85aaf69f 502 for i in 0..AMT {
1a4d82fc
JJ
503 select! {
504 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
505 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
506 }
507 tx3.send(()).unwrap();
508 }
509 }
510
511 #[test]
512 fn cloning() {
c34b1796
AL
513 let (tx1, rx1) = channel::<i32>();
514 let (_tx2, rx2) = channel::<i32>();
1a4d82fc
JJ
515 let (tx3, rx3) = channel::<()>();
516
85aaf69f 517 let _t = thread::spawn(move|| {
1a4d82fc
JJ
518 rx3.recv().unwrap();
519 tx1.clone();
520 assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
521 tx1.send(2).unwrap();
522 rx3.recv().unwrap();
523 });
524
525 tx3.send(()).unwrap();
526 select! {
527 _i1 = rx1.recv() => {},
528 _i2 = rx2.recv() => panic!()
529 }
530 tx3.send(()).unwrap();
531 }
532
533 #[test]
534 fn cloning2() {
c34b1796
AL
535 let (tx1, rx1) = channel::<i32>();
536 let (_tx2, rx2) = channel::<i32>();
1a4d82fc
JJ
537 let (tx3, rx3) = channel::<()>();
538
85aaf69f 539 let _t = thread::spawn(move|| {
1a4d82fc
JJ
540 rx3.recv().unwrap();
541 tx1.clone();
542 assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
543 tx1.send(2).unwrap();
544 rx3.recv().unwrap();
545 });
546
547 tx3.send(()).unwrap();
548 select! {
549 _i1 = rx1.recv() => {},
550 _i2 = rx2.recv() => panic!()
551 }
552 tx3.send(()).unwrap();
553 }
554
555 #[test]
556 fn cloning3() {
557 let (tx1, rx1) = channel::<()>();
558 let (tx2, rx2) = channel::<()>();
559 let (tx3, rx3) = channel::<()>();
85aaf69f 560 let _t = thread::spawn(move|| {
1a4d82fc
JJ
561 let s = Select::new();
562 let mut h1 = s.handle(&rx1);
563 let mut h2 = s.handle(&rx2);
564 unsafe { h2.add(); }
565 unsafe { h1.add(); }
566 assert_eq!(s.wait(), h2.id);
567 tx3.send(()).unwrap();
568 });
569
85aaf69f 570 for _ in 0..1000 { thread::yield_now(); }
1a4d82fc
JJ
571 drop(tx1.clone());
572 tx2.send(()).unwrap();
573 rx3.recv().unwrap();
574 }
575
576 #[test]
577 fn preflight1() {
578 let (tx, rx) = channel();
579 tx.send(()).unwrap();
580 select! {
581 _n = rx.recv() => {}
582 }
583 }
584
585 #[test]
586 fn preflight2() {
587 let (tx, rx) = channel();
588 tx.send(()).unwrap();
589 tx.send(()).unwrap();
590 select! {
591 _n = rx.recv() => {}
592 }
593 }
594
595 #[test]
596 fn preflight3() {
597 let (tx, rx) = channel();
598 drop(tx.clone());
599 tx.send(()).unwrap();
600 select! {
601 _n = rx.recv() => {}
602 }
603 }
604
605 #[test]
606 fn preflight4() {
607 let (tx, rx) = channel();
608 tx.send(()).unwrap();
609 let s = Select::new();
610 let mut h = s.handle(&rx);
611 unsafe { h.add(); }
612 assert_eq!(s.wait2(false), h.id);
613 }
614
615 #[test]
616 fn preflight5() {
617 let (tx, rx) = channel();
618 tx.send(()).unwrap();
619 tx.send(()).unwrap();
620 let s = Select::new();
621 let mut h = s.handle(&rx);
622 unsafe { h.add(); }
623 assert_eq!(s.wait2(false), h.id);
624 }
625
626 #[test]
627 fn preflight6() {
628 let (tx, rx) = channel();
629 drop(tx.clone());
630 tx.send(()).unwrap();
631 let s = Select::new();
632 let mut h = s.handle(&rx);
633 unsafe { h.add(); }
634 assert_eq!(s.wait2(false), h.id);
635 }
636
637 #[test]
638 fn preflight7() {
639 let (tx, rx) = channel::<()>();
640 drop(tx);
641 let s = Select::new();
642 let mut h = s.handle(&rx);
643 unsafe { h.add(); }
644 assert_eq!(s.wait2(false), h.id);
645 }
646
647 #[test]
648 fn preflight8() {
649 let (tx, rx) = channel();
650 tx.send(()).unwrap();
651 drop(tx);
652 rx.recv().unwrap();
653 let s = Select::new();
654 let mut h = s.handle(&rx);
655 unsafe { h.add(); }
656 assert_eq!(s.wait2(false), h.id);
657 }
658
659 #[test]
660 fn preflight9() {
661 let (tx, rx) = channel();
662 drop(tx.clone());
663 tx.send(()).unwrap();
664 drop(tx);
665 rx.recv().unwrap();
666 let s = Select::new();
667 let mut h = s.handle(&rx);
668 unsafe { h.add(); }
669 assert_eq!(s.wait2(false), h.id);
670 }
671
672 #[test]
673 fn oneshot_data_waiting() {
674 let (tx1, rx1) = channel();
675 let (tx2, rx2) = channel();
85aaf69f 676 let _t = thread::spawn(move|| {
1a4d82fc
JJ
677 select! {
678 _n = rx1.recv() => {}
679 }
680 tx2.send(()).unwrap();
681 });
682
85aaf69f 683 for _ in 0..100 { thread::yield_now() }
1a4d82fc
JJ
684 tx1.send(()).unwrap();
685 rx2.recv().unwrap();
686 }
687
688 #[test]
689 fn stream_data_waiting() {
690 let (tx1, rx1) = channel();
691 let (tx2, rx2) = channel();
692 tx1.send(()).unwrap();
693 tx1.send(()).unwrap();
694 rx1.recv().unwrap();
695 rx1.recv().unwrap();
85aaf69f 696 let _t = thread::spawn(move|| {
1a4d82fc
JJ
697 select! {
698 _n = rx1.recv() => {}
699 }
700 tx2.send(()).unwrap();
701 });
702
85aaf69f 703 for _ in 0..100 { thread::yield_now() }
1a4d82fc
JJ
704 tx1.send(()).unwrap();
705 rx2.recv().unwrap();
706 }
707
708 #[test]
709 fn shared_data_waiting() {
710 let (tx1, rx1) = channel();
711 let (tx2, rx2) = channel();
712 drop(tx1.clone());
713 tx1.send(()).unwrap();
714 rx1.recv().unwrap();
85aaf69f 715 let _t = thread::spawn(move|| {
1a4d82fc
JJ
716 select! {
717 _n = rx1.recv() => {}
718 }
719 tx2.send(()).unwrap();
720 });
721
85aaf69f 722 for _ in 0..100 { thread::yield_now() }
1a4d82fc
JJ
723 tx1.send(()).unwrap();
724 rx2.recv().unwrap();
725 }
726
727 #[test]
728 fn sync1() {
c34b1796 729 let (tx, rx) = sync_channel::<i32>(1);
1a4d82fc
JJ
730 tx.send(1).unwrap();
731 select! {
732 n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
733 }
734 }
735
736 #[test]
737 fn sync2() {
c34b1796 738 let (tx, rx) = sync_channel::<i32>(0);
85aaf69f
SL
739 let _t = thread::spawn(move|| {
740 for _ in 0..100 { thread::yield_now() }
1a4d82fc
JJ
741 tx.send(1).unwrap();
742 });
743 select! {
744 n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
745 }
746 }
747
748 #[test]
749 fn sync3() {
c34b1796
AL
750 let (tx1, rx1) = sync_channel::<i32>(0);
751 let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel();
85aaf69f
SL
752 let _t = thread::spawn(move|| { tx1.send(1).unwrap(); });
753 let _t = thread::spawn(move|| { tx2.send(2).unwrap(); });
1a4d82fc
JJ
754 select! {
755 n = rx1.recv() => {
756 let n = n.unwrap();
757 assert_eq!(n, 1);
758 assert_eq!(rx2.recv().unwrap(), 2);
759 },
760 n = rx2.recv() => {
761 let n = n.unwrap();
762 assert_eq!(n, 2);
763 assert_eq!(rx1.recv().unwrap(), 1);
764 }
765 }
766 }
767}