]>
Commit | Line | Data |
---|---|---|
1a4d82fc JJ |
1 | // Copyright 2014 The Rust Project Developers. See the COPYRIGHT |
2 | // file at the top-level directory of this distribution and at | |
3 | // http://rust-lang.org/COPYRIGHT. | |
4 | // | |
5 | // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or | |
6 | // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license | |
7 | // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your | |
8 | // option. This file may not be copied, modified, or distributed | |
9 | // except according to those terms. | |
10 | ||
11 | /// Stream channels | |
12 | /// | |
13 | /// This is the flavor of channels which are optimized for one sender and one | |
14 | /// receiver. The sender will be upgraded to a shared channel if the channel is | |
15 | /// cloned. | |
16 | /// | |
17 | /// High level implementation details can be found in the comment of the parent | |
18 | /// module. | |
19 | ||
20 | pub use self::Failure::*; | |
21 | pub use self::UpgradeResult::*; | |
22 | pub use self::SelectionResult::*; | |
23 | use self::Message::*; | |
24 | ||
25 | use core::prelude::*; | |
26 | ||
27 | use core::cmp; | |
85aaf69f SL |
28 | use core::isize; |
29 | use thread; | |
1a4d82fc | 30 | |
85aaf69f | 31 | use sync::atomic::{AtomicIsize, AtomicUsize, Ordering, AtomicBool}; |
1a4d82fc JJ |
32 | use sync::mpsc::Receiver; |
33 | use sync::mpsc::blocking::{self, SignalToken}; | |
34 | use sync::mpsc::spsc_queue as spsc; | |
35 | ||
85aaf69f | 36 | const DISCONNECTED: isize = isize::MIN; |
1a4d82fc | 37 | #[cfg(test)] |
85aaf69f | 38 | const MAX_STEALS: isize = 5; |
1a4d82fc | 39 | #[cfg(not(test))] |
85aaf69f | 40 | const MAX_STEALS: isize = 1 << 20; |
1a4d82fc JJ |
41 | |
42 | pub struct Packet<T> { | |
43 | queue: spsc::Queue<Message<T>>, // internal queue for all message | |
44 | ||
85aaf69f | 45 | cnt: AtomicIsize, // How many items are on this channel |
c34b1796 | 46 | steals: isize, // How many times has a port received without blocking? |
85aaf69f | 47 | to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up |
1a4d82fc JJ |
48 | |
49 | port_dropped: AtomicBool, // flag if the channel has been destroyed. | |
50 | } | |
51 | ||
52 | pub enum Failure<T> { | |
53 | Empty, | |
54 | Disconnected, | |
55 | Upgraded(Receiver<T>), | |
56 | } | |
57 | ||
58 | pub enum UpgradeResult { | |
59 | UpSuccess, | |
60 | UpDisconnected, | |
61 | UpWoke(SignalToken), | |
62 | } | |
63 | ||
64 | pub enum SelectionResult<T> { | |
65 | SelSuccess, | |
66 | SelCanceled, | |
67 | SelUpgraded(SignalToken, Receiver<T>), | |
68 | } | |
69 | ||
70 | // Any message could contain an "upgrade request" to a new shared port, so the | |
71 | // internal queue it's a queue of T, but rather Message<T> | |
72 | enum Message<T> { | |
73 | Data(T), | |
74 | GoUp(Receiver<T>), | |
75 | } | |
76 | ||
c34b1796 | 77 | impl<T> Packet<T> { |
1a4d82fc JJ |
78 | pub fn new() -> Packet<T> { |
79 | Packet { | |
80 | queue: unsafe { spsc::Queue::new(128) }, | |
81 | ||
85aaf69f | 82 | cnt: AtomicIsize::new(0), |
1a4d82fc | 83 | steals: 0, |
85aaf69f | 84 | to_wake: AtomicUsize::new(0), |
1a4d82fc JJ |
85 | |
86 | port_dropped: AtomicBool::new(false), | |
87 | } | |
88 | } | |
89 | ||
90 | pub fn send(&mut self, t: T) -> Result<(), T> { | |
91 | // If the other port has deterministically gone away, then definitely | |
92 | // must return the data back up the stack. Otherwise, the data is | |
93 | // considered as being sent. | |
94 | if self.port_dropped.load(Ordering::SeqCst) { return Err(t) } | |
95 | ||
96 | match self.do_send(Data(t)) { | |
97 | UpSuccess | UpDisconnected => {}, | |
98 | UpWoke(token) => { token.signal(); } | |
99 | } | |
100 | Ok(()) | |
101 | } | |
102 | ||
103 | pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult { | |
104 | // If the port has gone away, then there's no need to proceed any | |
105 | // further. | |
106 | if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected } | |
107 | ||
108 | self.do_send(GoUp(up)) | |
109 | } | |
110 | ||
111 | fn do_send(&mut self, t: Message<T>) -> UpgradeResult { | |
112 | self.queue.push(t); | |
113 | match self.cnt.fetch_add(1, Ordering::SeqCst) { | |
114 | // As described in the mod's doc comment, -1 == wakeup | |
115 | -1 => UpWoke(self.take_to_wake()), | |
116 | // As as described before, SPSC queues must be >= -2 | |
117 | -2 => UpSuccess, | |
118 | ||
119 | // Be sure to preserve the disconnected state, and the return value | |
120 | // in this case is going to be whether our data was received or not. | |
121 | // This manifests itself on whether we have an empty queue or not. | |
122 | // | |
123 | // Primarily, are required to drain the queue here because the port | |
124 | // will never remove this data. We can only have at most one item to | |
125 | // drain (the port drains the rest). | |
126 | DISCONNECTED => { | |
127 | self.cnt.store(DISCONNECTED, Ordering::SeqCst); | |
128 | let first = self.queue.pop(); | |
129 | let second = self.queue.pop(); | |
130 | assert!(second.is_none()); | |
131 | ||
132 | match first { | |
133 | Some(..) => UpSuccess, // we failed to send the data | |
134 | None => UpDisconnected, // we successfully sent data | |
135 | } | |
136 | } | |
137 | ||
138 | // Otherwise we just sent some data on a non-waiting queue, so just | |
139 | // make sure the world is sane and carry on! | |
140 | n => { assert!(n >= 0); UpSuccess } | |
141 | } | |
142 | } | |
143 | ||
144 | // Consumes ownership of the 'to_wake' field. | |
145 | fn take_to_wake(&mut self) -> SignalToken { | |
146 | let ptr = self.to_wake.load(Ordering::SeqCst); | |
147 | self.to_wake.store(0, Ordering::SeqCst); | |
148 | assert!(ptr != 0); | |
c34b1796 | 149 | unsafe { SignalToken::cast_from_usize(ptr) } |
1a4d82fc JJ |
150 | } |
151 | ||
152 | // Decrements the count on the channel for a sleeper, returning the sleeper | |
153 | // back if it shouldn't sleep. Note that this is the location where we take | |
154 | // steals into account. | |
155 | fn decrement(&mut self, token: SignalToken) -> Result<(), SignalToken> { | |
156 | assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); | |
c34b1796 | 157 | let ptr = unsafe { token.cast_to_usize() }; |
1a4d82fc JJ |
158 | self.to_wake.store(ptr, Ordering::SeqCst); |
159 | ||
160 | let steals = self.steals; | |
161 | self.steals = 0; | |
162 | ||
163 | match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) { | |
164 | DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); } | |
165 | // If we factor in our steals and notice that the channel has no | |
166 | // data, we successfully sleep | |
167 | n => { | |
168 | assert!(n >= 0); | |
169 | if n - steals <= 0 { return Ok(()) } | |
170 | } | |
171 | } | |
172 | ||
173 | self.to_wake.store(0, Ordering::SeqCst); | |
c34b1796 | 174 | Err(unsafe { SignalToken::cast_from_usize(ptr) }) |
1a4d82fc JJ |
175 | } |
176 | ||
177 | pub fn recv(&mut self) -> Result<T, Failure<T>> { | |
178 | // Optimistic preflight check (scheduling is expensive). | |
179 | match self.try_recv() { | |
180 | Err(Empty) => {} | |
181 | data => return data, | |
182 | } | |
183 | ||
bd371182 | 184 | // Welp, our channel has no data. Deschedule the current thread and |
1a4d82fc JJ |
185 | // initiate the blocking protocol. |
186 | let (wait_token, signal_token) = blocking::tokens(); | |
187 | if self.decrement(signal_token).is_ok() { | |
188 | wait_token.wait() | |
189 | } | |
190 | ||
191 | match self.try_recv() { | |
192 | // Messages which actually popped from the queue shouldn't count as | |
193 | // a steal, so offset the decrement here (we already have our | |
194 | // "steal" factored into the channel count above). | |
195 | data @ Ok(..) | | |
196 | data @ Err(Upgraded(..)) => { | |
197 | self.steals -= 1; | |
198 | data | |
199 | } | |
200 | ||
201 | data => data, | |
202 | } | |
203 | } | |
204 | ||
205 | pub fn try_recv(&mut self) -> Result<T, Failure<T>> { | |
206 | match self.queue.pop() { | |
207 | // If we stole some data, record to that effect (this will be | |
208 | // factored into cnt later on). | |
209 | // | |
210 | // Note that we don't allow steals to grow without bound in order to | |
211 | // prevent eventual overflow of either steals or cnt as an overflow | |
212 | // would have catastrophic results. Sometimes, steals > cnt, but | |
213 | // other times cnt > steals, so we don't know the relation between | |
214 | // steals and cnt. This code path is executed only rarely, so we do | |
215 | // a pretty slow operation, of swapping 0 into cnt, taking steals | |
216 | // down as much as possible (without going negative), and then | |
217 | // adding back in whatever we couldn't factor into steals. | |
218 | Some(data) => { | |
219 | if self.steals > MAX_STEALS { | |
220 | match self.cnt.swap(0, Ordering::SeqCst) { | |
221 | DISCONNECTED => { | |
222 | self.cnt.store(DISCONNECTED, Ordering::SeqCst); | |
223 | } | |
224 | n => { | |
225 | let m = cmp::min(n, self.steals); | |
226 | self.steals -= m; | |
227 | self.bump(n - m); | |
228 | } | |
229 | } | |
230 | assert!(self.steals >= 0); | |
231 | } | |
232 | self.steals += 1; | |
233 | match data { | |
234 | Data(t) => Ok(t), | |
235 | GoUp(up) => Err(Upgraded(up)), | |
236 | } | |
237 | } | |
238 | ||
239 | None => { | |
240 | match self.cnt.load(Ordering::SeqCst) { | |
241 | n if n != DISCONNECTED => Err(Empty), | |
242 | ||
243 | // This is a little bit of a tricky case. We failed to pop | |
244 | // data above, and then we have viewed that the channel is | |
245 | // disconnected. In this window more data could have been | |
246 | // sent on the channel. It doesn't really make sense to | |
247 | // return that the channel is disconnected when there's | |
248 | // actually data on it, so be extra sure there's no data by | |
249 | // popping one more time. | |
250 | // | |
251 | // We can ignore steals because the other end is | |
252 | // disconnected and we'll never need to really factor in our | |
253 | // steals again. | |
254 | _ => { | |
255 | match self.queue.pop() { | |
256 | Some(Data(t)) => Ok(t), | |
257 | Some(GoUp(up)) => Err(Upgraded(up)), | |
258 | None => Err(Disconnected), | |
259 | } | |
260 | } | |
261 | } | |
262 | } | |
263 | } | |
264 | } | |
265 | ||
266 | pub fn drop_chan(&mut self) { | |
267 | // Dropping a channel is pretty simple, we just flag it as disconnected | |
268 | // and then wakeup a blocker if there is one. | |
269 | match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) { | |
270 | -1 => { self.take_to_wake().signal(); } | |
271 | DISCONNECTED => {} | |
272 | n => { assert!(n >= 0); } | |
273 | } | |
274 | } | |
275 | ||
276 | pub fn drop_port(&mut self) { | |
277 | // Dropping a port seems like a fairly trivial thing. In theory all we | |
278 | // need to do is flag that we're disconnected and then everything else | |
279 | // can take over (we don't have anyone to wake up). | |
280 | // | |
281 | // The catch for Ports is that we want to drop the entire contents of | |
282 | // the queue. There are multiple reasons for having this property, the | |
283 | // largest of which is that if another chan is waiting in this channel | |
284 | // (but not received yet), then waiting on that port will cause a | |
285 | // deadlock. | |
286 | // | |
287 | // So if we accept that we must now destroy the entire contents of the | |
288 | // queue, this code may make a bit more sense. The tricky part is that | |
289 | // we can't let any in-flight sends go un-dropped, we have to make sure | |
290 | // *everything* is dropped and nothing new will come onto the channel. | |
291 | ||
292 | // The first thing we do is set a flag saying that we're done for. All | |
293 | // sends are gated on this flag, so we're immediately guaranteed that | |
294 | // there are a bounded number of active sends that we'll have to deal | |
295 | // with. | |
296 | self.port_dropped.store(true, Ordering::SeqCst); | |
297 | ||
298 | // Now that we're guaranteed to deal with a bounded number of senders, | |
299 | // we need to drain the queue. This draining process happens atomically | |
300 | // with respect to the "count" of the channel. If the count is nonzero | |
301 | // (with steals taken into account), then there must be data on the | |
302 | // channel. In this case we drain everything and then try again. We will | |
303 | // continue to fail while active senders send data while we're dropping | |
304 | // data, but eventually we're guaranteed to break out of this loop | |
305 | // (because there is a bounded number of senders). | |
306 | let mut steals = self.steals; | |
307 | while { | |
308 | let cnt = self.cnt.compare_and_swap( | |
309 | steals, DISCONNECTED, Ordering::SeqCst); | |
310 | cnt != DISCONNECTED && cnt != steals | |
311 | } { | |
312 | loop { | |
313 | match self.queue.pop() { | |
314 | Some(..) => { steals += 1; } | |
315 | None => break | |
316 | } | |
317 | } | |
318 | } | |
319 | ||
320 | // At this point in time, we have gated all future senders from sending, | |
321 | // and we have flagged the channel as being disconnected. The senders | |
322 | // still have some responsibility, however, because some sends may not | |
323 | // complete until after we flag the disconnection. There are more | |
324 | // details in the sending methods that see DISCONNECTED | |
325 | } | |
326 | ||
327 | //////////////////////////////////////////////////////////////////////////// | |
328 | // select implementation | |
329 | //////////////////////////////////////////////////////////////////////////// | |
330 | ||
331 | // Tests to see whether this port can receive without blocking. If Ok is | |
332 | // returned, then that's the answer. If Err is returned, then the returned | |
333 | // port needs to be queried instead (an upgrade happened) | |
334 | pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> { | |
335 | // We peek at the queue to see if there's anything on it, and we use | |
336 | // this return value to determine if we should pop from the queue and | |
337 | // upgrade this channel immediately. If it looks like we've got an | |
338 | // upgrade pending, then go through the whole recv rigamarole to update | |
339 | // the internal state. | |
340 | match self.queue.peek() { | |
341 | Some(&mut GoUp(..)) => { | |
342 | match self.recv() { | |
343 | Err(Upgraded(port)) => Err(port), | |
344 | _ => unreachable!(), | |
345 | } | |
346 | } | |
347 | Some(..) => Ok(true), | |
348 | None => Ok(false) | |
349 | } | |
350 | } | |
351 | ||
352 | // increment the count on the channel (used for selection) | |
c34b1796 | 353 | fn bump(&mut self, amt: isize) -> isize { |
1a4d82fc JJ |
354 | match self.cnt.fetch_add(amt, Ordering::SeqCst) { |
355 | DISCONNECTED => { | |
356 | self.cnt.store(DISCONNECTED, Ordering::SeqCst); | |
357 | DISCONNECTED | |
358 | } | |
359 | n => n | |
360 | } | |
361 | } | |
362 | ||
363 | // Attempts to start selecting on this port. Like a oneshot, this can fail | |
364 | // immediately because of an upgrade. | |
365 | pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> { | |
366 | match self.decrement(token) { | |
367 | Ok(()) => SelSuccess, | |
368 | Err(token) => { | |
369 | let ret = match self.queue.peek() { | |
370 | Some(&mut GoUp(..)) => { | |
371 | match self.queue.pop() { | |
372 | Some(GoUp(port)) => SelUpgraded(token, port), | |
373 | _ => unreachable!(), | |
374 | } | |
375 | } | |
376 | Some(..) => SelCanceled, | |
377 | None => SelCanceled, | |
378 | }; | |
379 | // Undo our decrement above, and we should be guaranteed that the | |
380 | // previous value is positive because we're not going to sleep | |
381 | let prev = self.bump(1); | |
382 | assert!(prev == DISCONNECTED || prev >= 0); | |
383 | return ret; | |
384 | } | |
385 | } | |
386 | } | |
387 | ||
bd371182 | 388 | // Removes a previous thread from being blocked in this port |
1a4d82fc JJ |
389 | pub fn abort_selection(&mut self, |
390 | was_upgrade: bool) -> Result<bool, Receiver<T>> { | |
391 | // If we're aborting selection after upgrading from a oneshot, then | |
392 | // we're guarantee that no one is waiting. The only way that we could | |
393 | // have seen the upgrade is if data was actually sent on the channel | |
394 | // half again. For us, this means that there is guaranteed to be data on | |
395 | // this channel. Furthermore, we're guaranteed that there was no | |
396 | // start_selection previously, so there's no need to modify `self.cnt` | |
397 | // at all. | |
398 | // | |
399 | // Hence, because of these invariants, we immediately return `Ok(true)`. | |
400 | // Note that the data may not actually be sent on the channel just yet. | |
401 | // The other end could have flagged the upgrade but not sent data to | |
402 | // this end. This is fine because we know it's a small bounded windows | |
403 | // of time until the data is actually sent. | |
404 | if was_upgrade { | |
405 | assert_eq!(self.steals, 0); | |
406 | assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); | |
407 | return Ok(true) | |
408 | } | |
409 | ||
410 | // We want to make sure that the count on the channel goes non-negative, | |
411 | // and in the stream case we can have at most one steal, so just assume | |
412 | // that we had one steal. | |
413 | let steals = 1; | |
414 | let prev = self.bump(steals + 1); | |
415 | ||
416 | // If we were previously disconnected, then we know for sure that there | |
bd371182 | 417 | // is no thread in to_wake, so just keep going |
1a4d82fc JJ |
418 | let has_data = if prev == DISCONNECTED { |
419 | assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); | |
420 | true // there is data, that data is that we're disconnected | |
421 | } else { | |
422 | let cur = prev + steals + 1; | |
423 | assert!(cur >= 0); | |
424 | ||
425 | // If the previous count was negative, then we just made things go | |
426 | // positive, hence we passed the -1 boundary and we're responsible | |
427 | // for removing the to_wake() field and trashing it. | |
428 | // | |
429 | // If the previous count was positive then we're in a tougher | |
430 | // situation. A possible race is that a sender just incremented | |
bd371182 | 431 | // through -1 (meaning it's going to try to wake a thread up), but it |
1a4d82fc JJ |
432 | // hasn't yet read the to_wake. In order to prevent a future recv() |
433 | // from waking up too early (this sender picking up the plastered | |
434 | // over to_wake), we spin loop here waiting for to_wake to be 0. | |
435 | // Note that this entire select() implementation needs an overhaul, | |
436 | // and this is *not* the worst part of it, so this is not done as a | |
437 | // final solution but rather out of necessity for now to get | |
438 | // something working. | |
439 | if prev < 0 { | |
440 | drop(self.take_to_wake()); | |
441 | } else { | |
442 | while self.to_wake.load(Ordering::SeqCst) != 0 { | |
85aaf69f | 443 | thread::yield_now(); |
1a4d82fc JJ |
444 | } |
445 | } | |
446 | assert_eq!(self.steals, 0); | |
447 | self.steals = steals; | |
448 | ||
449 | // if we were previously positive, then there's surely data to | |
450 | // receive | |
451 | prev >= 0 | |
452 | }; | |
453 | ||
454 | // Now that we've determined that this queue "has data", we peek at the | |
455 | // queue to see if the data is an upgrade or not. If it's an upgrade, | |
456 | // then we need to destroy this port and abort selection on the | |
457 | // upgraded port. | |
458 | if has_data { | |
459 | match self.queue.peek() { | |
460 | Some(&mut GoUp(..)) => { | |
461 | match self.queue.pop() { | |
462 | Some(GoUp(port)) => Err(port), | |
463 | _ => unreachable!(), | |
464 | } | |
465 | } | |
466 | _ => Ok(true), | |
467 | } | |
468 | } else { | |
469 | Ok(false) | |
470 | } | |
471 | } | |
472 | } | |
473 | ||
474 | #[unsafe_destructor] | |
c34b1796 | 475 | impl<T> Drop for Packet<T> { |
1a4d82fc JJ |
476 | fn drop(&mut self) { |
477 | // Note that this load is not only an assert for correctness about | |
478 | // disconnection, but also a proper fence before the read of | |
479 | // `to_wake`, so this assert cannot be removed with also removing | |
480 | // the `to_wake` assert. | |
481 | assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED); | |
482 | assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); | |
483 | } | |
484 | } |