]>
Commit | Line | Data |
---|---|---|
1a4d82fc JJ |
1 | // Copyright 2014 The Rust Project Developers. See the COPYRIGHT |
2 | // file at the top-level directory of this distribution and at | |
3 | // http://rust-lang.org/COPYRIGHT. | |
4 | // | |
5 | // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or | |
6 | // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license | |
7 | // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your | |
8 | // option. This file may not be copied, modified, or distributed | |
9 | // except according to those terms. | |
10 | ||
11 | /// Shared channels | |
12 | /// | |
13 | /// This is the flavor of channels which are not necessarily optimized for any | |
14 | /// particular use case, but are the most general in how they are used. Shared | |
15 | /// channels are cloneable allowing for multiple senders. | |
16 | /// | |
17 | /// High level implementation details can be found in the comment of the parent | |
18 | /// module. You'll also note that the implementations of the shared and stream | |
19 | /// channels are quite similar, and this is no coincidence! | |
20 | ||
21 | pub use self::Failure::*; | |
22 | ||
1a4d82fc | 23 | use core::cmp; |
9e0c209e | 24 | use core::intrinsics::abort; |
85aaf69f | 25 | use core::isize; |
1a4d82fc | 26 | |
85aaf69f | 27 | use sync::atomic::{AtomicUsize, AtomicIsize, AtomicBool, Ordering}; |
1a4d82fc JJ |
28 | use sync::mpsc::blocking::{self, SignalToken}; |
29 | use sync::mpsc::mpsc_queue as mpsc; | |
30 | use sync::mpsc::select::StartResult::*; | |
31 | use sync::mpsc::select::StartResult; | |
32 | use sync::{Mutex, MutexGuard}; | |
85aaf69f | 33 | use thread; |
3157f602 | 34 | use time::Instant; |
1a4d82fc | 35 | |
85aaf69f SL |
// Sentinel written into `cnt` once the channel has been disconnected.
const DISCONNECTED: isize = isize::MIN;
// Width of the band above DISCONNECTED that `send` still treats as
// "disconnected" (see the ranged check in `send`).
const FUDGE: isize = 1024;
// Largest channel count `clone_chan` tolerates before aborting the process.
const MAX_REFCOUNT: usize = isize::MAX as usize;
// Number of steals after which `try_recv` folds the steal count back into
// `cnt`. Kept tiny under test so that path is actually exercised.
#[cfg(test)]
const MAX_STEALS: isize = 5;
#[cfg(not(test))]
const MAX_STEALS: isize = 1 << 20;
1a4d82fc JJ |
43 | |
44 | pub struct Packet<T> { | |
45 | queue: mpsc::Queue<T>, | |
85aaf69f SL |
46 | cnt: AtomicIsize, // How many items are on this channel |
47 | steals: isize, // How many times has a port received without blocking? | |
48 | to_wake: AtomicUsize, // SignalToken for wake up | |
1a4d82fc JJ |
49 | |
50 | // The number of channels which are currently using this packet. | |
9e0c209e | 51 | channels: AtomicUsize, |
1a4d82fc JJ |
52 | |
53 | // See the discussion in Port::drop and the channel send methods for what | |
54 | // these are used for | |
55 | port_dropped: AtomicBool, | |
85aaf69f | 56 | sender_drain: AtomicIsize, |
1a4d82fc JJ |
57 | |
58 | // this lock protects various portions of this implementation during | |
59 | // select() | |
60 | select_lock: Mutex<()>, | |
61 | } | |
62 | ||
// Reasons a receive on a shared channel can come back without data.
pub enum Failure {
    // Nothing was available on the queue and the count was not DISCONNECTED.
    Empty,
    // The count was DISCONNECTED and the queue held no more data.
    Disconnected,
}
67 | ||
c34b1796 | 68 | impl<T> Packet<T> { |
1a4d82fc JJ |
69 | // Creation of a packet *must* be followed by a call to postinit_lock |
70 | // and later by inherit_blocker | |
71 | pub fn new() -> Packet<T> { | |
3157f602 | 72 | Packet { |
1a4d82fc | 73 | queue: mpsc::Queue::new(), |
85aaf69f | 74 | cnt: AtomicIsize::new(0), |
1a4d82fc | 75 | steals: 0, |
85aaf69f | 76 | to_wake: AtomicUsize::new(0), |
9e0c209e | 77 | channels: AtomicUsize::new(2), |
1a4d82fc | 78 | port_dropped: AtomicBool::new(false), |
85aaf69f | 79 | sender_drain: AtomicIsize::new(0), |
1a4d82fc | 80 | select_lock: Mutex::new(()), |
3157f602 | 81 | } |
1a4d82fc JJ |
82 | } |
83 | ||
84 | // This function should be used after newly created Packet | |
85 | // was wrapped with an Arc | |
86 | // In other case mutex data will be duplicated while cloning | |
87 | // and that could cause problems on platforms where it is | |
88 | // represented by opaque data structure | |
89 | pub fn postinit_lock(&self) -> MutexGuard<()> { | |
90 | self.select_lock.lock().unwrap() | |
91 | } | |
92 | ||
93 | // This function is used at the creation of a shared packet to inherit a | |
bd371182 AL |
94 | // previously blocked thread. This is done to prevent spurious wakeups of |
95 | // threads in select(). | |
1a4d82fc JJ |
96 | // |
97 | // This can only be called at channel-creation time | |
98 | pub fn inherit_blocker(&mut self, | |
99 | token: Option<SignalToken>, | |
100 | guard: MutexGuard<()>) { | |
101 | token.map(|token| { | |
102 | assert_eq!(self.cnt.load(Ordering::SeqCst), 0); | |
103 | assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); | |
c34b1796 | 104 | self.to_wake.store(unsafe { token.cast_to_usize() }, Ordering::SeqCst); |
1a4d82fc JJ |
105 | self.cnt.store(-1, Ordering::SeqCst); |
106 | ||
107 | // This store is a little sketchy. What's happening here is that | |
108 | // we're transferring a blocker from a oneshot or stream channel to | |
109 | // this shared channel. In doing so, we never spuriously wake them | |
110 | // up and rather only wake them up at the appropriate time. This | |
111 | // implementation of shared channels assumes that any blocking | |
112 | // recv() will undo the increment of steals performed in try_recv() | |
113 | // once the recv is complete. This thread that we're inheriting, | |
114 | // however, is not in the middle of recv. Hence, the first time we | |
115 | // wake them up, they're going to wake up from their old port, move | |
116 | // on to the upgraded port, and then call the block recv() function. | |
117 | // | |
118 | // When calling this function, they'll find there's data immediately | |
119 | // available, counting it as a steal. This in fact wasn't a steal | |
120 | // because we appropriately blocked them waiting for data. | |
121 | // | |
122 | // To offset this bad increment, we initially set the steal count to | |
123 | // -1. You'll find some special code in abort_selection() as well to | |
124 | // ensure that this -1 steal count doesn't escape too far. | |
125 | self.steals = -1; | |
126 | }); | |
127 | ||
128 | // When the shared packet is constructed, we grabbed this lock. The | |
129 | // purpose of this lock is to ensure that abort_selection() doesn't | |
130 | // interfere with this method. After we unlock this lock, we're | |
131 | // signifying that we're done modifying self.cnt and self.to_wake and | |
132 | // the port is ready for the world to continue using it. | |
133 | drop(guard); | |
134 | } | |
135 | ||
136 | pub fn send(&mut self, t: T) -> Result<(), T> { | |
137 | // See Port::drop for what's going on | |
138 | if self.port_dropped.load(Ordering::SeqCst) { return Err(t) } | |
139 | ||
140 | // Note that the multiple sender case is a little trickier | |
141 | // semantically than the single sender case. The logic for | |
142 | // incrementing is "add and if disconnected store disconnected". | |
143 | // This could end up leading some senders to believe that there | |
144 | // wasn't a disconnect if in fact there was a disconnect. This means | |
145 | // that while one thread is attempting to re-store the disconnected | |
146 | // states, other threads could walk through merrily incrementing | |
147 | // this very-negative disconnected count. To prevent senders from | |
148 | // spuriously attempting to send when the channels is actually | |
149 | // disconnected, the count has a ranged check here. | |
150 | // | |
151 | // This is also done for another reason. Remember that the return | |
152 | // value of this function is: | |
153 | // | |
154 | // `true` == the data *may* be received, this essentially has no | |
155 | // meaning | |
156 | // `false` == the data will *never* be received, this has a lot of | |
157 | // meaning | |
158 | // | |
159 | // In the SPSC case, we have a check of 'queue.is_empty()' to see | |
160 | // whether the data was actually received, but this same condition | |
161 | // means nothing in a multi-producer context. As a result, this | |
162 | // preflight check serves as the definitive "this will never be | |
163 | // received". Once we get beyond this check, we have permanently | |
164 | // entered the realm of "this may be received" | |
165 | if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE { | |
166 | return Err(t) | |
167 | } | |
168 | ||
169 | self.queue.push(t); | |
170 | match self.cnt.fetch_add(1, Ordering::SeqCst) { | |
171 | -1 => { | |
172 | self.take_to_wake().signal(); | |
173 | } | |
174 | ||
175 | // In this case, we have possibly failed to send our data, and | |
176 | // we need to consider re-popping the data in order to fully | |
177 | // destroy it. We must arbitrate among the multiple senders, | |
178 | // however, because the queues that we're using are | |
179 | // single-consumer queues. In order to do this, all exiting | |
180 | // pushers will use an atomic count in order to count those | |
181 | // flowing through. Pushers who see 0 are required to drain as | |
182 | // much as possible, and then can only exit when they are the | |
183 | // only pusher (otherwise they must try again). | |
184 | n if n < DISCONNECTED + FUDGE => { | |
185 | // see the comment in 'try' for a shared channel for why this | |
186 | // window of "not disconnected" is ok. | |
187 | self.cnt.store(DISCONNECTED, Ordering::SeqCst); | |
188 | ||
189 | if self.sender_drain.fetch_add(1, Ordering::SeqCst) == 0 { | |
190 | loop { | |
191 | // drain the queue, for info on the thread yield see the | |
192 | // discussion in try_recv | |
193 | loop { | |
194 | match self.queue.pop() { | |
195 | mpsc::Data(..) => {} | |
196 | mpsc::Empty => break, | |
85aaf69f | 197 | mpsc::Inconsistent => thread::yield_now(), |
1a4d82fc JJ |
198 | } |
199 | } | |
200 | // maybe we're done, if we're not the last ones | |
201 | // here, then we need to go try again. | |
202 | if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 { | |
203 | break | |
204 | } | |
205 | } | |
206 | ||
207 | // At this point, there may still be data on the queue, | |
208 | // but only if the count hasn't been incremented and | |
209 | // some other sender hasn't finished pushing data just | |
210 | // yet. That sender in question will drain its own data. | |
211 | } | |
212 | } | |
213 | ||
214 | // Can't make any assumptions about this case like in the SPSC case. | |
215 | _ => {} | |
216 | } | |
217 | ||
218 | Ok(()) | |
219 | } | |
220 | ||
3157f602 | 221 | pub fn recv(&mut self, deadline: Option<Instant>) -> Result<T, Failure> { |
1a4d82fc JJ |
222 | // This code is essentially the exact same as that found in the stream |
223 | // case (see stream.rs) | |
224 | match self.try_recv() { | |
225 | Err(Empty) => {} | |
226 | data => return data, | |
227 | } | |
228 | ||
229 | let (wait_token, signal_token) = blocking::tokens(); | |
230 | if self.decrement(signal_token) == Installed { | |
3157f602 XL |
231 | if let Some(deadline) = deadline { |
232 | let timed_out = !wait_token.wait_max_until(deadline); | |
233 | if timed_out { | |
234 | self.abort_selection(false); | |
235 | } | |
236 | } else { | |
237 | wait_token.wait(); | |
238 | } | |
1a4d82fc JJ |
239 | } |
240 | ||
241 | match self.try_recv() { | |
242 | data @ Ok(..) => { self.steals -= 1; data } | |
243 | data => data, | |
244 | } | |
245 | } | |
246 | ||
247 | // Essentially the exact same thing as the stream decrement function. | |
248 | // Returns true if blocking should proceed. | |
249 | fn decrement(&mut self, token: SignalToken) -> StartResult { | |
250 | assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); | |
c34b1796 | 251 | let ptr = unsafe { token.cast_to_usize() }; |
1a4d82fc JJ |
252 | self.to_wake.store(ptr, Ordering::SeqCst); |
253 | ||
254 | let steals = self.steals; | |
255 | self.steals = 0; | |
256 | ||
257 | match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) { | |
258 | DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); } | |
259 | // If we factor in our steals and notice that the channel has no | |
260 | // data, we successfully sleep | |
261 | n => { | |
262 | assert!(n >= 0); | |
263 | if n - steals <= 0 { return Installed } | |
264 | } | |
265 | } | |
266 | ||
267 | self.to_wake.store(0, Ordering::SeqCst); | |
c34b1796 | 268 | drop(unsafe { SignalToken::cast_from_usize(ptr) }); |
1a4d82fc JJ |
269 | Abort |
270 | } | |
271 | ||
272 | pub fn try_recv(&mut self) -> Result<T, Failure> { | |
273 | let ret = match self.queue.pop() { | |
274 | mpsc::Data(t) => Some(t), | |
275 | mpsc::Empty => None, | |
276 | ||
277 | // This is a bit of an interesting case. The channel is reported as | |
278 | // having data available, but our pop() has failed due to the queue | |
279 | // being in an inconsistent state. This means that there is some | |
280 | // pusher somewhere which has yet to complete, but we are guaranteed | |
281 | // that a pop will eventually succeed. In this case, we spin in a | |
282 | // yield loop because the remote sender should finish their enqueue | |
283 | // operation "very quickly". | |
284 | // | |
285 | // Avoiding this yield loop would require a different queue | |
286 | // abstraction which provides the guarantee that after M pushes have | |
287 | // succeeded, at least M pops will succeed. The current queues | |
288 | // guarantee that if there are N active pushes, you can pop N times | |
289 | // once all N have finished. | |
290 | mpsc::Inconsistent => { | |
291 | let data; | |
292 | loop { | |
85aaf69f | 293 | thread::yield_now(); |
1a4d82fc JJ |
294 | match self.queue.pop() { |
295 | mpsc::Data(t) => { data = t; break } | |
296 | mpsc::Empty => panic!("inconsistent => empty"), | |
297 | mpsc::Inconsistent => {} | |
298 | } | |
299 | } | |
300 | Some(data) | |
301 | } | |
302 | }; | |
303 | match ret { | |
304 | // See the discussion in the stream implementation for why we | |
305 | // might decrement steals. | |
306 | Some(data) => { | |
307 | if self.steals > MAX_STEALS { | |
308 | match self.cnt.swap(0, Ordering::SeqCst) { | |
309 | DISCONNECTED => { | |
310 | self.cnt.store(DISCONNECTED, Ordering::SeqCst); | |
311 | } | |
312 | n => { | |
313 | let m = cmp::min(n, self.steals); | |
314 | self.steals -= m; | |
315 | self.bump(n - m); | |
316 | } | |
317 | } | |
318 | assert!(self.steals >= 0); | |
319 | } | |
320 | self.steals += 1; | |
321 | Ok(data) | |
322 | } | |
323 | ||
324 | // See the discussion in the stream implementation for why we try | |
325 | // again. | |
326 | None => { | |
327 | match self.cnt.load(Ordering::SeqCst) { | |
328 | n if n != DISCONNECTED => Err(Empty), | |
329 | _ => { | |
330 | match self.queue.pop() { | |
331 | mpsc::Data(t) => Ok(t), | |
332 | mpsc::Empty => Err(Disconnected), | |
333 | // with no senders, an inconsistency is impossible. | |
334 | mpsc::Inconsistent => unreachable!(), | |
335 | } | |
336 | } | |
337 | } | |
338 | } | |
339 | } | |
340 | } | |
341 | ||
342 | // Prepares this shared packet for a channel clone, essentially just bumping | |
343 | // a refcount. | |
344 | pub fn clone_chan(&mut self) { | |
9e0c209e SL |
345 | let old_count = self.channels.fetch_add(1, Ordering::SeqCst); |
346 | ||
347 | // See comments on Arc::clone() on why we do this (for `mem::forget`). | |
348 | if old_count > MAX_REFCOUNT { | |
349 | unsafe { | |
350 | abort(); | |
351 | } | |
352 | } | |
1a4d82fc JJ |
353 | } |
354 | ||
355 | // Decrement the reference count on a channel. This is called whenever a | |
356 | // Chan is dropped and may end up waking up a receiver. It's the receiver's | |
357 | // responsibility on the other end to figure out that we've disconnected. | |
358 | pub fn drop_chan(&mut self) { | |
359 | match self.channels.fetch_sub(1, Ordering::SeqCst) { | |
360 | 1 => {} | |
361 | n if n > 1 => return, | |
362 | n => panic!("bad number of channels left {}", n), | |
363 | } | |
364 | ||
365 | match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) { | |
366 | -1 => { self.take_to_wake().signal(); } | |
367 | DISCONNECTED => {} | |
368 | n => { assert!(n >= 0); } | |
369 | } | |
370 | } | |
371 | ||
372 | // See the long discussion inside of stream.rs for why the queue is drained, | |
373 | // and why it is done in this fashion. | |
374 | pub fn drop_port(&mut self) { | |
375 | self.port_dropped.store(true, Ordering::SeqCst); | |
376 | let mut steals = self.steals; | |
377 | while { | |
378 | let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, Ordering::SeqCst); | |
379 | cnt != DISCONNECTED && cnt != steals | |
380 | } { | |
381 | // See the discussion in 'try_recv' for why we yield | |
382 | // control of this thread. | |
383 | loop { | |
384 | match self.queue.pop() { | |
385 | mpsc::Data(..) => { steals += 1; } | |
386 | mpsc::Empty | mpsc::Inconsistent => break, | |
387 | } | |
388 | } | |
389 | } | |
390 | } | |
391 | ||
392 | // Consumes ownership of the 'to_wake' field. | |
393 | fn take_to_wake(&mut self) -> SignalToken { | |
394 | let ptr = self.to_wake.load(Ordering::SeqCst); | |
395 | self.to_wake.store(0, Ordering::SeqCst); | |
396 | assert!(ptr != 0); | |
c34b1796 | 397 | unsafe { SignalToken::cast_from_usize(ptr) } |
1a4d82fc JJ |
398 | } |
399 | ||
400 | //////////////////////////////////////////////////////////////////////////// | |
401 | // select implementation | |
402 | //////////////////////////////////////////////////////////////////////////// | |
403 | ||
404 | // Helper function for select, tests whether this port can receive without | |
405 | // blocking (obviously not an atomic decision). | |
406 | // | |
407 | // This is different than the stream version because there's no need to peek | |
408 | // at the queue, we can just look at the local count. | |
409 | pub fn can_recv(&mut self) -> bool { | |
410 | let cnt = self.cnt.load(Ordering::SeqCst); | |
411 | cnt == DISCONNECTED || cnt - self.steals > 0 | |
412 | } | |
413 | ||
414 | // increment the count on the channel (used for selection) | |
c34b1796 | 415 | fn bump(&mut self, amt: isize) -> isize { |
1a4d82fc JJ |
416 | match self.cnt.fetch_add(amt, Ordering::SeqCst) { |
417 | DISCONNECTED => { | |
418 | self.cnt.store(DISCONNECTED, Ordering::SeqCst); | |
419 | DISCONNECTED | |
420 | } | |
421 | n => n | |
422 | } | |
423 | } | |
424 | ||
425 | // Inserts the signal token for selection on this port, returning true if | |
426 | // blocking should proceed. | |
427 | // | |
428 | // The code here is the same as in stream.rs, except that it doesn't need to | |
429 | // peek at the channel to see if an upgrade is pending. | |
430 | pub fn start_selection(&mut self, token: SignalToken) -> StartResult { | |
431 | match self.decrement(token) { | |
432 | Installed => Installed, | |
433 | Abort => { | |
434 | let prev = self.bump(1); | |
435 | assert!(prev == DISCONNECTED || prev >= 0); | |
436 | Abort | |
437 | } | |
438 | } | |
439 | } | |
440 | ||
bd371182 | 441 | // Cancels a previous thread waiting on this port, returning whether there's |
1a4d82fc JJ |
442 | // data on the port. |
443 | // | |
444 | // This is similar to the stream implementation (hence fewer comments), but | |
445 | // uses a different value for the "steals" variable. | |
446 | pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool { | |
447 | // Before we do anything else, we bounce on this lock. The reason for | |
448 | // doing this is to ensure that any upgrade-in-progress is gone and | |
449 | // done with. Without this bounce, we can race with inherit_blocker | |
450 | // about looking at and dealing with to_wake. Once we have acquired the | |
451 | // lock, we are guaranteed that inherit_blocker is done. | |
452 | { | |
453 | let _guard = self.select_lock.lock().unwrap(); | |
454 | } | |
455 | ||
456 | // Like the stream implementation, we want to make sure that the count | |
457 | // on the channel goes non-negative. We don't know how negative the | |
458 | // stream currently is, so instead of using a steal value of 1, we load | |
459 | // the channel count and figure out what we should do to make it | |
460 | // positive. | |
461 | let steals = { | |
462 | let cnt = self.cnt.load(Ordering::SeqCst); | |
463 | if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0} | |
464 | }; | |
465 | let prev = self.bump(steals + 1); | |
466 | ||
467 | if prev == DISCONNECTED { | |
468 | assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); | |
469 | true | |
470 | } else { | |
471 | let cur = prev + steals + 1; | |
472 | assert!(cur >= 0); | |
473 | if prev < 0 { | |
474 | drop(self.take_to_wake()); | |
475 | } else { | |
476 | while self.to_wake.load(Ordering::SeqCst) != 0 { | |
85aaf69f | 477 | thread::yield_now(); |
1a4d82fc JJ |
478 | } |
479 | } | |
480 | // if the number of steals is -1, it was the pre-emptive -1 steal | |
481 | // count from when we inherited a blocker. This is fine because | |
482 | // we're just going to overwrite it with a real value. | |
483 | assert!(self.steals == 0 || self.steals == -1); | |
484 | self.steals = steals; | |
485 | prev >= 0 | |
486 | } | |
487 | } | |
488 | } | |
489 | ||
c34b1796 | 490 | impl<T> Drop for Packet<T> { |
1a4d82fc JJ |
491 | fn drop(&mut self) { |
492 | // Note that this load is not only an assert for correctness about | |
493 | // disconnection, but also a proper fence before the read of | |
494 | // `to_wake`, so this assert cannot be removed with also removing | |
495 | // the `to_wake` assert. | |
496 | assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED); | |
497 | assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); | |
498 | assert_eq!(self.channels.load(Ordering::SeqCst), 0); | |
499 | } | |
500 | } |