// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

/// Stream channels
///
/// This is the flavor of channels which are optimized for one sender and one
/// receiver. The sender will be upgraded to a shared channel if the channel is
/// cloned.
///
/// High level implementation details can be found in the comment of the parent
/// module.
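///
/// For reference, a sketch of the user-visible behavior this flavor backs,
/// using only the public `std::sync::mpsc` API (the upgrade itself is
/// invisible to callers):
///
///     use std::sync::mpsc::channel;
///
///     let (tx, rx) = channel::<i32>();
///     tx.send(1).unwrap();               // one sender, one receiver
///     let tx2 = tx.clone();              // cloning upgrades to the shared flavor
///     tx2.send(2).unwrap();
///     assert_eq!(rx.recv().unwrap(), 1);
///     assert_eq!(rx.recv().unwrap(), 2);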

pub use self::Failure::*;
pub use self::UpgradeResult::*;
pub use self::SelectionResult::*;
use self::Message::*;

use core::prelude::*;

use core::cmp;
use core::isize;
use thread;

use sync::atomic::{AtomicIsize, AtomicUsize, Ordering, AtomicBool};
use sync::mpsc::Receiver;
use sync::mpsc::blocking::{self, SignalToken};
use sync::mpsc::spsc_queue as spsc;

const DISCONNECTED: isize = isize::MIN;
#[cfg(test)]
const MAX_STEALS: isize = 5;
#[cfg(not(test))]
const MAX_STEALS: isize = 1 << 20;

pub struct Packet<T> {
    queue: spsc::Queue<Message<T>>, // internal queue for all messages

    cnt: AtomicIsize, // How many items are on this channel
    steals: isize, // How many times has a port received without blocking?
    to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up

    port_dropped: AtomicBool, // flag if the channel has been destroyed.
}

pub enum Failure<T> {
    Empty,
    Disconnected,
    Upgraded(Receiver<T>),
}

pub enum UpgradeResult {
    UpSuccess,
    UpDisconnected,
    UpWoke(SignalToken),
}

pub enum SelectionResult<T> {
    SelSuccess,
    SelCanceled,
    SelUpgraded(SignalToken, Receiver<T>),
}

// Any message could contain an "upgrade request" to a new shared port, so the
// internal queue is not just a queue of `T`, but rather one of `Message<T>`.
enum Message<T> {
    Data(T),
    GoUp(Receiver<T>),
}

impl<T: Send + 'static> Packet<T> {
    pub fn new() -> Packet<T> {
        Packet {
            queue: unsafe { spsc::Queue::new(128) },

            cnt: AtomicIsize::new(0),
            steals: 0,
            to_wake: AtomicUsize::new(0),

            port_dropped: AtomicBool::new(false),
        }
    }

    pub fn send(&mut self, t: T) -> Result<(), T> {
        // If the other port has deterministically gone away, then we
        // definitely must return the data back up the stack. Otherwise, the
        // data is considered as being sent.
        if self.port_dropped.load(Ordering::SeqCst) { return Err(t) }

        match self.do_send(Data(t)) {
            UpSuccess | UpDisconnected => {},
            UpWoke(token) => { token.signal(); }
        }
        Ok(())
    }

    pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
        // If the port has gone away, then there's no need to proceed any
        // further.
        if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected }

        self.do_send(GoUp(up))
    }

    fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
        self.queue.push(t);
        match self.cnt.fetch_add(1, Ordering::SeqCst) {
            // As described in the mod's doc comment, -1 == wakeup
            -1 => UpWoke(self.take_to_wake()),
            // As described before, SPSC queues must be >= -2
            -2 => UpSuccess,

            // Be sure to preserve the disconnected state, and the return value
            // in this case is going to be whether our data was received or not.
            // This manifests itself in whether we have an empty queue or not.
            //
            // Primarily, we are required to drain the queue here because the
            // port will never remove this data. We can only have at most one
            // item to drain (the port drains the rest).
            DISCONNECTED => {
                self.cnt.store(DISCONNECTED, Ordering::SeqCst);
                let first = self.queue.pop();
                let second = self.queue.pop();
                assert!(second.is_none());

                match first {
                    Some(..) => UpSuccess,  // we failed to send the data
                    None => UpDisconnected, // we successfully sent data
                }
            }

            // Otherwise we just sent some data on a non-waiting queue, so just
            // make sure the world is sane and carry on!
            n => { assert!(n >= 0); UpSuccess }
        }
    }
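
    // For reference, a minimal standalone sketch of the `cnt` handshake that
    // `do_send` performs above (illustrative only; `DISCONNECTED` mirrors this
    // module's constant, everything else is a plain `AtomicIsize`, and the
    // -2 == UpSuccess case is omitted):
    //
    //     use std::sync::atomic::{AtomicIsize, Ordering};
    //
    //     const DISCONNECTED: isize = isize::MIN;
    //     let cnt = AtomicIsize::new(-1); // -1: a receiver is blocked
    //     match cnt.fetch_add(1, Ordering::SeqCst) {
    //         -1 => { /* raced with a sleeper: wake it via the stored token */ }
    //         DISCONNECTED => { /* port is gone: re-pin cnt, drain our item */ }
    //         n => assert!(n >= 0), // plain send; n items were already queued
    //     }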

    // Consumes ownership of the 'to_wake' field.
    fn take_to_wake(&mut self) -> SignalToken {
        let ptr = self.to_wake.load(Ordering::SeqCst);
        self.to_wake.store(0, Ordering::SeqCst);
        assert!(ptr != 0);
        unsafe { SignalToken::cast_from_uint(ptr) }
    }

    // Decrements the count on the channel for a sleeper, returning the sleeper
    // back if it shouldn't sleep. Note that this is the location where we take
    // steals into account.
    fn decrement(&mut self, token: SignalToken) -> Result<(), SignalToken> {
        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
        let ptr = unsafe { token.cast_to_uint() };
        self.to_wake.store(ptr, Ordering::SeqCst);

        let steals = self.steals;
        self.steals = 0;

        match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
            DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
            // If we factor in our steals and notice that the channel has no
            // data, we successfully sleep
            n => {
                assert!(n >= 0);
                if n - steals <= 0 { return Ok(()) }
            }
        }

        self.to_wake.store(0, Ordering::SeqCst);
        Err(unsafe { SignalToken::cast_from_uint(ptr) })
    }
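
    // Worked example of the accounting above, with illustrative numbers: with
    // steals = 2 and cnt = 2 (two sends, both already consumed by non-blocking
    // receives), fetch_sub(3) returns n = 2 and n - steals = 0, so there is no
    // unclaimed data and the receiver may sleep. With cnt = 3 instead,
    // n - steals = 1 message remains, so the token is handed back and we
    // don't block.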

    pub fn recv(&mut self) -> Result<T, Failure<T>> {
        // Optimistic preflight check (scheduling is expensive).
        match self.try_recv() {
            Err(Empty) => {}
            data => return data,
        }

        // Welp, our channel has no data. Deschedule the current task and
        // initiate the blocking protocol.
        let (wait_token, signal_token) = blocking::tokens();
        if self.decrement(signal_token).is_ok() {
            wait_token.wait()
        }

        match self.try_recv() {
            // Messages which actually popped from the queue shouldn't count as
            // a steal, so offset the decrement here (we already have our
            // "steal" factored into the channel count above).
            data @ Ok(..) |
            data @ Err(Upgraded(..)) => {
                self.steals -= 1;
                data
            }

            data => data,
        }
    }

    pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
        match self.queue.pop() {
            // If we stole some data, record to that effect (this will be
            // factored into cnt later on).
            //
            // Note that we don't allow steals to grow without bound in order to
            // prevent eventual overflow of either steals or cnt as an overflow
            // would have catastrophic results. Sometimes, steals > cnt, but
            // other times cnt > steals, so we don't know the relation between
            // steals and cnt. This code path is executed only rarely, so we do
            // a pretty slow operation, of swapping 0 into cnt, taking steals
            // down as much as possible (without going negative), and then
            // adding back in whatever we couldn't factor into steals.
            Some(data) => {
                if self.steals > MAX_STEALS {
                    match self.cnt.swap(0, Ordering::SeqCst) {
                        DISCONNECTED => {
                            self.cnt.store(DISCONNECTED, Ordering::SeqCst);
                        }
                        n => {
                            let m = cmp::min(n, self.steals);
                            self.steals -= m;
                            self.bump(n - m);
                        }
                    }
                    assert!(self.steals >= 0);
                }
                self.steals += 1;
                match data {
                    Data(t) => Ok(t),
                    GoUp(up) => Err(Upgraded(up)),
                }
            }

            None => {
                match self.cnt.load(Ordering::SeqCst) {
                    n if n != DISCONNECTED => Err(Empty),

                    // This is a little bit of a tricky case. We failed to pop
                    // data above, and then we have viewed that the channel is
                    // disconnected. In this window more data could have been
                    // sent on the channel. It doesn't really make sense to
                    // return that the channel is disconnected when there's
                    // actually data on it, so be extra sure there's no data by
                    // popping one more time.
                    //
                    // We can ignore steals because the other end is
                    // disconnected and we'll never need to really factor in our
                    // steals again.
                    _ => {
                        match self.queue.pop() {
                            Some(Data(t)) => Ok(t),
                            Some(GoUp(up)) => Err(Upgraded(up)),
                            None => Err(Disconnected),
                        }
                    }
                }
            }
        }
    }
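
    // Worked example of the rebalance in `try_recv` above, with assumed
    // numbers: say steals has just exceeded MAX_STEALS while cnt = 3. The swap
    // returns n = 3 and zeroes cnt; m = min(3, steals) = 3 cancels three local
    // steals, and bump(3 - 3) adds nothing back. Had cnt exceeded steals, the
    // surplus n - m would be added back so cnt again reflects undelivered
    // messages.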

    pub fn drop_chan(&mut self) {
        // Dropping a channel is pretty simple: we just flag it as disconnected
        // and then wake up a blocker if there is one.
        match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
            -1 => { self.take_to_wake().signal(); }
            DISCONNECTED => {}
            n => { assert!(n >= 0); }
        }
    }

    pub fn drop_port(&mut self) {
        // Dropping a port seems like a fairly trivial thing. In theory all we
        // need to do is flag that we're disconnected and then everything else
        // can take over (we don't have anyone to wake up).
        //
        // The catch for Ports is that we want to drop the entire contents of
        // the queue. There are multiple reasons for having this property, the
        // largest of which is that if another chan is waiting in this channel
        // (but not received yet), then waiting on that port will cause a
        // deadlock.
        //
        // So if we accept that we must now destroy the entire contents of the
        // queue, this code may make a bit more sense. The tricky part is that
        // we can't let any in-flight sends go un-dropped; we have to make sure
        // *everything* is dropped and nothing new will come onto the channel.

        // The first thing we do is set a flag saying that we're done for. All
        // sends are gated on this flag, so we're immediately guaranteed that
        // there are a bounded number of active sends that we'll have to deal
        // with.
        self.port_dropped.store(true, Ordering::SeqCst);

        // Now that we're guaranteed to deal with a bounded number of senders,
        // we need to drain the queue. This draining process happens atomically
        // with respect to the "count" of the channel. If the count is nonzero
        // (with steals taken into account), then there must be data on the
        // channel. In this case we drain everything and then try again. We will
        // continue to fail while active senders send data while we're dropping
        // data, but eventually we're guaranteed to break out of this loop
        // (because there is a bounded number of senders).
        let mut steals = self.steals;
        while {
            let cnt = self.cnt.compare_and_swap(
                steals, DISCONNECTED, Ordering::SeqCst);
            cnt != DISCONNECTED && cnt != steals
        } {
            loop {
                match self.queue.pop() {
                    Some(..) => { steals += 1; }
                    None => break
                }
            }
        }

        // At this point in time, we have gated all future senders from sending,
        // and we have flagged the channel as being disconnected. The senders
        // still have some responsibility, however, because some sends may not
        // complete until after we flag the disconnection. There are more
        // details in the sending methods that see DISCONNECTED.
    }
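
    // Illustrative trace of the drain loop above: suppose steals = 0 while two
    // messages sit in the queue (cnt = 2). The compare_and_swap expecting 0
    // fails and returns 2, so we pop both messages and retry expecting 2; if
    // no sender raced in the meantime, cnt is still 2 and the CAS installs
    // DISCONNECTED, ending the loop.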

    ////////////////////////////////////////////////////////////////////////////
    // select implementation
    ////////////////////////////////////////////////////////////////////////////

    // Tests to see whether this port can receive without blocking. If Ok is
    // returned, then that's the answer. If Err is returned, then the returned
    // port needs to be queried instead (an upgrade happened)
    pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
        // We peek at the queue to see if there's anything on it, and we use
        // this return value to determine if we should pop from the queue and
        // upgrade this channel immediately. If it looks like we've got an
        // upgrade pending, then go through the whole recv rigamarole to update
        // the internal state.
        match self.queue.peek() {
            Some(&mut GoUp(..)) => {
                match self.recv() {
                    Err(Upgraded(port)) => Err(port),
                    _ => unreachable!(),
                }
            }
            Some(..) => Ok(true),
            None => Ok(false)
        }
    }

    // increment the count on the channel (used for selection)
    fn bump(&mut self, amt: isize) -> isize {
        match self.cnt.fetch_add(amt, Ordering::SeqCst) {
            DISCONNECTED => {
                self.cnt.store(DISCONNECTED, Ordering::SeqCst);
                DISCONNECTED
            }
            n => n
        }
    }

    // Attempts to start selecting on this port. Like a oneshot, this can fail
    // immediately because of an upgrade.
    pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> {
        match self.decrement(token) {
            Ok(()) => SelSuccess,
            Err(token) => {
                let ret = match self.queue.peek() {
                    Some(&mut GoUp(..)) => {
                        match self.queue.pop() {
                            Some(GoUp(port)) => SelUpgraded(token, port),
                            _ => unreachable!(),
                        }
                    }
                    Some(..) => SelCanceled,
                    None => SelCanceled,
                };
                // Undo our decrement above, and we should be guaranteed that
                // the previous value is positive because we're not going to
                // sleep
                let prev = self.bump(1);
                assert!(prev == DISCONNECTED || prev >= 0);
                return ret;
            }
        }
    }

    // Removes a previous task from being blocked in this port
    pub fn abort_selection(&mut self,
                           was_upgrade: bool) -> Result<bool, Receiver<T>> {
        // If we're aborting selection after upgrading from a oneshot, then
        // we're guaranteed that no one is waiting. The only way that we could
        // have seen the upgrade is if data was actually sent on the channel
        // half again. For us, this means that there is guaranteed to be data on
        // this channel. Furthermore, we're guaranteed that there was no
        // start_selection previously, so there's no need to modify `self.cnt`
        // at all.
        //
        // Hence, because of these invariants, we immediately return `Ok(true)`.
        // Note that the data may not actually be sent on the channel just yet.
        // The other end could have flagged the upgrade but not sent data to
        // this end. This is fine because we know it's a small bounded window
        // of time until the data is actually sent.
        if was_upgrade {
            assert_eq!(self.steals, 0);
            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
            return Ok(true)
        }

        // We want to make sure that the count on the channel goes non-negative,
        // and in the stream case we can have at most one steal, so just assume
        // that we had one steal.
        let steals = 1;
        let prev = self.bump(steals + 1);

        // If we were previously disconnected, then we know for sure that there
        // is no task in to_wake, so just keep going
        let has_data = if prev == DISCONNECTED {
            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
            true // there is data: that data is that we're disconnected
        } else {
            let cur = prev + steals + 1;
            assert!(cur >= 0);

            // If the previous count was negative, then we just made things go
            // positive, hence we passed the -1 boundary and we're responsible
            // for removing the to_wake() field and trashing it.
            //
            // If the previous count was positive then we're in a tougher
            // situation. A possible race is that a sender just incremented
            // through -1 (meaning it's going to try to wake a task up), but it
            // hasn't yet read the to_wake. In order to prevent a future recv()
            // from waking up too early (this sender picking up the plastered
            // over to_wake), we spin loop here waiting for to_wake to be 0.
            // Note that this entire select() implementation needs an overhaul,
            // and this is *not* the worst part of it, so this is not done as a
            // final solution but rather out of necessity for now to get
            // something working.
            if prev < 0 {
                drop(self.take_to_wake());
            } else {
                while self.to_wake.load(Ordering::SeqCst) != 0 {
                    thread::yield_now();
                }
            }
            assert_eq!(self.steals, 0);
            self.steals = steals;

            // if we were previously positive, then there's surely data to
            // receive
            prev >= 0
        };

        // Now that we've determined that this queue "has data", we peek at the
        // queue to see if the data is an upgrade or not. If it's an upgrade,
        // then we need to destroy this port and abort selection on the
        // upgraded port.
        if has_data {
            match self.queue.peek() {
                Some(&mut GoUp(..)) => {
                    match self.queue.pop() {
                        Some(GoUp(port)) => Err(port),
                        _ => unreachable!(),
                    }
                }
                _ => Ok(true),
            }
        } else {
            Ok(false)
        }
    }
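
    // Worked instance of the arithmetic above, under assumed values: if
    // prev = -1, the token parked by our own start_selection is still
    // registered and no data ever arrived. bump(2) (one assumed steal plus the
    // undone decrement) brings cnt to 1; since we crossed the -1 boundary
    // ourselves, we clear and discard to_wake, and has_data ends up false.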
}

#[unsafe_destructor]
impl<T: Send + 'static> Drop for Packet<T> {
    fn drop(&mut self) {
        // Note that this load is not only an assert for correctness about
        // disconnection, but also a proper fence before the read of
        // `to_wake`, so this assert cannot be removed without also removing
        // the `to_wake` assert.
        assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
    }
}