/// Shared channels.
///
/// This is the flavor of channels which are not necessarily optimized for any
/// particular use case, but are the most general in how they are used. Shared
/// channels are cloneable allowing for multiple senders.
///
/// High level implementation details can be found in the comment of the parent
/// module. You'll also note that the implementations of the shared and stream
/// channels are quite similar, and this is no coincidence!
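//
// For orientation, this is roughly the user-visible behavior this module
// backs (a hedged sketch against the public `std::sync::mpsc` API; cloning
// a sender is what upgrades a channel to this shared flavor):
//
//     use std::sync::mpsc::channel;
//     use std::thread;
//
//     let (tx, rx) = channel::<i32>();
//     let handles: Vec<_> = (0..4)
//         .map(|i| {
//             let tx = tx.clone(); // multiple senders
//             thread::spawn(move || tx.send(i).unwrap())
//         })
//         .collect();
//     drop(tx); // drop the original so `rx` can observe disconnect
//     let mut got: Vec<i32> = rx.iter().collect();
//     got.sort();
//     assert_eq!(got, vec![0, 1, 2, 3]);
//     for h in handles { h.join().unwrap(); }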
pub use self::Failure::*;
use self::StartResult::*;

use core::cmp;
use core::intrinsics::abort;

use crate::cell::UnsafeCell;
use crate::ptr;
use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
use crate::sync::mpsc::blocking::{self, SignalToken};
use crate::sync::mpsc::mpsc_queue as mpsc;
use crate::sync::{Mutex, MutexGuard};
use crate::thread;
use crate::time::Instant;

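// Descriptive notes on the constants below (added commentary; see the send,
// try_recv, and drop_chan paths for where each is used):
// - DISCONNECTED is a sentinel stored into `cnt` once either side goes away.
// - FUDGE bounds how far racing senders can push `cnt` above DISCONNECTED
//   before it is re-stored.
// - MAX_REFCOUNT guards the `channels` count against overflow, as in Arc.
// - MAX_STEALS caps consecutive non-blocking receives before `cnt` and
//   `steals` are rebalanced (kept small under #[cfg(test)] to exercise it).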
const DISCONNECTED: isize = isize::MIN;
const FUDGE: isize = 1024;
const MAX_REFCOUNT: usize = (isize::MAX) as usize;
#[cfg(test)]
const MAX_STEALS: isize = 5;
#[cfg(not(test))]
const MAX_STEALS: isize = 1 << 20;

pub struct Packet<T> {
    queue: mpsc::Queue<T>,
    cnt: AtomicIsize,          // How many items are on this channel
    steals: UnsafeCell<isize>, // How many times has a port received without blocking?
    to_wake: AtomicUsize,      // SignalToken for wake up

    // The number of channels which are currently using this packet.
    channels: AtomicUsize,

    // See the discussion in Port::drop and the channel send methods for what
    // these are used for.
    port_dropped: AtomicBool,
    sender_drain: AtomicIsize,

    // This lock protects various portions of this implementation during
    // select().
    select_lock: Mutex<()>,
}
51 | ||
52 | pub enum Failure { | |
53 | Empty, | |
54 | Disconnected, | |
55 | } | |
56 | ||
48663c56 XL |
57 | #[derive(PartialEq, Eq)] |
58 | enum StartResult { | |
59 | Installed, | |
60 | Abort, | |
61 | } | |
62 | ||
impl<T> Packet<T> {
    // Creation of a packet *must* be followed by a call to postinit_lock
    // and later by inherit_blocker.
    pub fn new() -> Packet<T> {
        Packet {
            queue: mpsc::Queue::new(),
            cnt: AtomicIsize::new(0),
            steals: UnsafeCell::new(0),
            to_wake: AtomicUsize::new(0),
            channels: AtomicUsize::new(2),
            port_dropped: AtomicBool::new(false),
            sender_drain: AtomicIsize::new(0),
            select_lock: Mutex::new(()),
        }
    }

    // This function should be used after the newly created Packet has been
    // wrapped in an Arc. Otherwise the mutex data will be duplicated while
    // cloning, which could cause problems on platforms where a mutex is
    // represented by an opaque data structure.
    pub fn postinit_lock(&self) -> MutexGuard<'_, ()> {
        self.select_lock.lock().unwrap()
    }
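
    // A hedged sketch of the intended construction order (the real caller
    // lives in the channel upgrade path outside this file; the names below
    // are illustrative only):
    //
    //     let a = Arc::new(Packet::new());
    //     let guard = a.postinit_lock();      // lock before publishing
    //     // ... caller installs the receiver and migrates any blocker ...
    //     a.inherit_blocker(sleeper, guard);  // consumes and drops the guard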
87 | ||
88 | // This function is used at the creation of a shared packet to inherit a | |
bd371182 AL |
89 | // previously blocked thread. This is done to prevent spurious wakeups of |
90 | // threads in select(). | |
1a4d82fc JJ |
91 | // |
92 | // This can only be called at channel-creation time | |
60c5eb7d | 93 | pub fn inherit_blocker(&self, token: Option<SignalToken>, guard: MutexGuard<'_, ()>) { |
1a4d82fc JJ |
94 | token.map(|token| { |
95 | assert_eq!(self.cnt.load(Ordering::SeqCst), 0); | |
96 | assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); | |
c34b1796 | 97 | self.to_wake.store(unsafe { token.cast_to_usize() }, Ordering::SeqCst); |
1a4d82fc JJ |
98 | self.cnt.store(-1, Ordering::SeqCst); |
99 | ||
100 | // This store is a little sketchy. What's happening here is that | |
101 | // we're transferring a blocker from a oneshot or stream channel to | |
102 | // this shared channel. In doing so, we never spuriously wake them | |
103 | // up and rather only wake them up at the appropriate time. This | |
104 | // implementation of shared channels assumes that any blocking | |
105 | // recv() will undo the increment of steals performed in try_recv() | |
106 | // once the recv is complete. This thread that we're inheriting, | |
107 | // however, is not in the middle of recv. Hence, the first time we | |
108 | // wake them up, they're going to wake up from their old port, move | |
109 | // on to the upgraded port, and then call the block recv() function. | |
110 | // | |
111 | // When calling this function, they'll find there's data immediately | |
112 | // available, counting it as a steal. This in fact wasn't a steal | |
113 | // because we appropriately blocked them waiting for data. | |
114 | // | |
115 | // To offset this bad increment, we initially set the steal count to | |
116 | // -1. You'll find some special code in abort_selection() as well to | |
117 | // ensure that this -1 steal count doesn't escape too far. | |
60c5eb7d XL |
118 | unsafe { |
119 | *self.steals.get() = -1; | |
120 | } | |
1a4d82fc JJ |
121 | }); |
122 | ||
123 | // When the shared packet is constructed, we grabbed this lock. The | |
124 | // purpose of this lock is to ensure that abort_selection() doesn't | |
125 | // interfere with this method. After we unlock this lock, we're | |
126 | // signifying that we're done modifying self.cnt and self.to_wake and | |
127 | // the port is ready for the world to continue using it. | |
128 | drop(guard); | |
129 | } | |
130 | ||
    pub fn send(&self, t: T) -> Result<(), T> {
        // See Port::drop for what's going on
        if self.port_dropped.load(Ordering::SeqCst) {
            return Err(t);
        }

        // Note that the multiple sender case is a little trickier
        // semantically than the single sender case. The logic for
        // incrementing is "add and if disconnected store disconnected".
        // This could end up leading some senders to believe that there
        // wasn't a disconnect if in fact there was a disconnect. This means
        // that while one thread is attempting to re-store the disconnected
        // state, other threads could walk through merrily incrementing
        // this very-negative disconnected count. To prevent senders from
        // spuriously attempting to send when the channel is actually
        // disconnected, the count has a ranged check here.
        //
        // This is also done for another reason. Remember that the return
        // value of this function is:
        //
        //   `true` == the data *may* be received, this essentially has no
        //             meaning
        //   `false` == the data will *never* be received, this has a lot of
        //              meaning
        //
        // In the SPSC case, we have a check of 'queue.is_empty()' to see
        // whether the data was actually received, but this same condition
        // means nothing in a multi-producer context. As a result, this
        // preflight check serves as the definitive "this will never be
        // received". Once we get beyond this check, we have permanently
        // entered the realm of "this may be received".
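        //
        // A worked illustration of the ranged check (added commentary): with
        // DISCONNECTED == isize::MIN and FUDGE == 1024, suppose the count was
        // stored as DISCONNECTED and three racing senders have each done a
        // `fetch_add(1)` before anyone re-stored DISCONNECTED. The count then
        // reads isize::MIN + 3, which is still below DISCONNECTED + FUDGE, so
        // the check below still reports "disconnected" despite the drift
        // (assuming fewer than FUDGE senders race in at once).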
        if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE {
            return Err(t);
        }
165 | ||
166 | self.queue.push(t); | |
167 | match self.cnt.fetch_add(1, Ordering::SeqCst) { | |
168 | -1 => { | |
169 | self.take_to_wake().signal(); | |
170 | } | |
171 | ||
172 | // In this case, we have possibly failed to send our data, and | |
173 | // we need to consider re-popping the data in order to fully | |
174 | // destroy it. We must arbitrate among the multiple senders, | |
175 | // however, because the queues that we're using are | |
176 | // single-consumer queues. In order to do this, all exiting | |
177 | // pushers will use an atomic count in order to count those | |
178 | // flowing through. Pushers who see 0 are required to drain as | |
179 | // much as possible, and then can only exit when they are the | |
180 | // only pusher (otherwise they must try again). | |
181 | n if n < DISCONNECTED + FUDGE => { | |
182 | // see the comment in 'try' for a shared channel for why this | |
183 | // window of "not disconnected" is ok. | |
184 | self.cnt.store(DISCONNECTED, Ordering::SeqCst); | |
185 | ||
186 | if self.sender_drain.fetch_add(1, Ordering::SeqCst) == 0 { | |
187 | loop { | |
188 | // drain the queue, for info on the thread yield see the | |
189 | // discussion in try_recv | |
190 | loop { | |
191 | match self.queue.pop() { | |
192 | mpsc::Data(..) => {} | |
193 | mpsc::Empty => break, | |
85aaf69f | 194 | mpsc::Inconsistent => thread::yield_now(), |
1a4d82fc JJ |
195 | } |
196 | } | |
197 | // maybe we're done, if we're not the last ones | |
198 | // here, then we need to go try again. | |
199 | if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 { | |
60c5eb7d | 200 | break; |
1a4d82fc JJ |
201 | } |
202 | } | |
203 | ||
204 | // At this point, there may still be data on the queue, | |
205 | // but only if the count hasn't been incremented and | |
206 | // some other sender hasn't finished pushing data just | |
207 | // yet. That sender in question will drain its own data. | |
208 | } | |
209 | } | |
210 | ||
211 | // Can't make any assumptions about this case like in the SPSC case. | |
212 | _ => {} | |
213 | } | |
214 | ||
215 | Ok(()) | |
216 | } | |
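
    // A worked trace of send()'s drain arbitration above (added commentary,
    // illustration only): senders A and B both push after a disconnect. A's
    // sender_drain.fetch_add(1) returns 0, so A becomes the drainer; B's
    // returns 1, so B leaves immediately, its increment still counted. A
    // drains, but its fetch_sub(1) returns 2 (B's increment is outstanding),
    // so A loops and drains once more, catching anything B pushed, and exits
    // when fetch_sub finally returns 1.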
217 | ||
476ff2be | 218 | pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> { |
1a4d82fc JJ |
219 | // This code is essentially the exact same as that found in the stream |
220 | // case (see stream.rs) | |
221 | match self.try_recv() { | |
222 | Err(Empty) => {} | |
223 | data => return data, | |
224 | } | |
225 | ||
226 | let (wait_token, signal_token) = blocking::tokens(); | |
227 | if self.decrement(signal_token) == Installed { | |
3157f602 XL |
228 | if let Some(deadline) = deadline { |
229 | let timed_out = !wait_token.wait_max_until(deadline); | |
230 | if timed_out { | |
231 | self.abort_selection(false); | |
232 | } | |
233 | } else { | |
234 | wait_token.wait(); | |
235 | } | |
1a4d82fc JJ |
236 | } |
237 | ||
238 | match self.try_recv() { | |
60c5eb7d XL |
239 | data @ Ok(..) => unsafe { |
240 | *self.steals.get() -= 1; | |
241 | data | |
242 | }, | |
1a4d82fc JJ |
243 | data => data, |
244 | } | |
245 | } | |
246 | ||
247 | // Essentially the exact same thing as the stream decrement function. | |
248 | // Returns true if blocking should proceed. | |
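    //
    // A worked trace (added commentary, illustration only): suppose the
    // receiver stole twice without blocking (steals == 2) while two items
    // were pushed (cnt == 2). fetch_sub(1 + 2) returns n == 2 and leaves
    // cnt == -1; since n - steals == 0, the receiver keeps its token
    // installed and sleeps. A sender's later fetch_add(1) sees -1 and
    // signals the stored SignalToken.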
    fn decrement(&self, token: SignalToken) -> StartResult {
        unsafe {
            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
            let ptr = token.cast_to_usize();
            self.to_wake.store(ptr, Ordering::SeqCst);

            let steals = ptr::replace(self.steals.get(), 0);

            match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
                DISCONNECTED => {
                    self.cnt.store(DISCONNECTED, Ordering::SeqCst);
                }
                // If we factor in our steals and notice that the channel has no
                // data, we successfully sleep
                n => {
                    assert!(n >= 0);
                    if n - steals <= 0 {
                        return Installed;
                    }
                }
            }

            self.to_wake.store(0, Ordering::SeqCst);
            drop(SignalToken::cast_from_usize(ptr));
            Abort
        }
    }

    pub fn try_recv(&self) -> Result<T, Failure> {
        let ret = match self.queue.pop() {
            mpsc::Data(t) => Some(t),
            mpsc::Empty => None,

            // This is a bit of an interesting case. The channel is reported as
            // having data available, but our pop() has failed due to the queue
            // being in an inconsistent state. This means that there is some
            // pusher somewhere which has yet to complete, but we are guaranteed
            // that a pop will eventually succeed. In this case, we spin in a
            // yield loop because the remote sender should finish their enqueue
            // operation "very quickly".
            //
            // Avoiding this yield loop would require a different queue
            // abstraction which provides the guarantee that after M pushes have
            // succeeded, at least M pops will succeed. The current queues
            // guarantee that if there are N active pushes, you can pop N times
            // once all N have finished.
            mpsc::Inconsistent => {
                let data;
                loop {
                    thread::yield_now();
                    match self.queue.pop() {
                        mpsc::Data(t) => {
                            data = t;
                            break;
                        }
                        mpsc::Empty => panic!("inconsistent => empty"),
                        mpsc::Inconsistent => {}
                    }
                }
                Some(data)
            }
        };
        match ret {
            // See the discussion in the stream implementation for why we
            // might decrement steals.
            Some(data) => unsafe {
                if *self.steals.get() > MAX_STEALS {
                    match self.cnt.swap(0, Ordering::SeqCst) {
                        DISCONNECTED => {
                            self.cnt.store(DISCONNECTED, Ordering::SeqCst);
                        }
                        n => {
                            let m = cmp::min(n, *self.steals.get());
                            *self.steals.get() -= m;
                            self.bump(n - m);
                        }
                    }
                    assert!(*self.steals.get() >= 0);
                }
                *self.steals.get() += 1;
                Ok(data)
            },

            // See the discussion in the stream implementation for why we try
            // again.
            None => {
                match self.cnt.load(Ordering::SeqCst) {
                    n if n != DISCONNECTED => Err(Empty),
                    _ => {
                        match self.queue.pop() {
                            mpsc::Data(t) => Ok(t),
                            mpsc::Empty => Err(Disconnected),
                            // with no senders, an inconsistency is impossible.
                            mpsc::Inconsistent => unreachable!(),
                        }
                    }
                }
            }
        }
    }
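
    // A worked rebalancing trace for the MAX_STEALS path in try_recv (added
    // commentary, illustration only): under #[cfg(test)], MAX_STEALS == 5.
    // Suppose a pop succeeds while *steals == 6, and cnt.swap(0) returns
    // n == 8. Then m = min(8, 6) = 6 steals are cancelled against the count,
    // bump(8 - 6) restores the two still-unaccounted items to cnt, and
    // steals drops to 0 before the final increment for the current pop.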
349 | ||
350 | // Prepares this shared packet for a channel clone, essentially just bumping | |
351 | // a refcount. | |
476ff2be | 352 | pub fn clone_chan(&self) { |
9e0c209e SL |
353 | let old_count = self.channels.fetch_add(1, Ordering::SeqCst); |
354 | ||
355 | // See comments on Arc::clone() on why we do this (for `mem::forget`). | |
356 | if old_count > MAX_REFCOUNT { | |
357 | unsafe { | |
358 | abort(); | |
359 | } | |
360 | } | |
1a4d82fc JJ |
361 | } |
362 | ||
363 | // Decrement the reference count on a channel. This is called whenever a | |
364 | // Chan is dropped and may end up waking up a receiver. It's the receiver's | |
365 | // responsibility on the other end to figure out that we've disconnected. | |
476ff2be | 366 | pub fn drop_chan(&self) { |
1a4d82fc JJ |
367 | match self.channels.fetch_sub(1, Ordering::SeqCst) { |
368 | 1 => {} | |
369 | n if n > 1 => return, | |
370 | n => panic!("bad number of channels left {}", n), | |
371 | } | |
372 | ||
373 | match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) { | |
60c5eb7d XL |
374 | -1 => { |
375 | self.take_to_wake().signal(); | |
376 | } | |
1a4d82fc | 377 | DISCONNECTED => {} |
60c5eb7d XL |
378 | n => { |
379 | assert!(n >= 0); | |
380 | } | |
1a4d82fc JJ |
381 | } |
382 | } | |
383 | ||
384 | // See the long discussion inside of stream.rs for why the queue is drained, | |
385 | // and why it is done in this fashion. | |
476ff2be | 386 | pub fn drop_port(&self) { |
1a4d82fc | 387 | self.port_dropped.store(true, Ordering::SeqCst); |
476ff2be | 388 | let mut steals = unsafe { *self.steals.get() }; |
1a4d82fc JJ |
389 | while { |
390 | let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, Ordering::SeqCst); | |
391 | cnt != DISCONNECTED && cnt != steals | |
392 | } { | |
393 | // See the discussion in 'try_recv' for why we yield | |
394 | // control of this thread. | |
395 | loop { | |
396 | match self.queue.pop() { | |
60c5eb7d XL |
397 | mpsc::Data(..) => { |
398 | steals += 1; | |
399 | } | |
1a4d82fc JJ |
400 | mpsc::Empty | mpsc::Inconsistent => break, |
401 | } | |
402 | } | |
403 | } | |
404 | } | |
405 | ||
406 | // Consumes ownership of the 'to_wake' field. | |
476ff2be | 407 | fn take_to_wake(&self) -> SignalToken { |
1a4d82fc JJ |
408 | let ptr = self.to_wake.load(Ordering::SeqCst); |
409 | self.to_wake.store(0, Ordering::SeqCst); | |
410 | assert!(ptr != 0); | |
c34b1796 | 411 | unsafe { SignalToken::cast_from_usize(ptr) } |
1a4d82fc JJ |
412 | } |
413 | ||
414 | //////////////////////////////////////////////////////////////////////////// | |
415 | // select implementation | |
416 | //////////////////////////////////////////////////////////////////////////// | |
417 | ||
1a4d82fc | 418 | // increment the count on the channel (used for selection) |
476ff2be | 419 | fn bump(&self, amt: isize) -> isize { |
1a4d82fc JJ |
420 | match self.cnt.fetch_add(amt, Ordering::SeqCst) { |
421 | DISCONNECTED => { | |
422 | self.cnt.store(DISCONNECTED, Ordering::SeqCst); | |
423 | DISCONNECTED | |
424 | } | |
60c5eb7d | 425 | n => n, |
1a4d82fc JJ |
426 | } |
427 | } | |
428 | ||
bd371182 | 429 | // Cancels a previous thread waiting on this port, returning whether there's |
1a4d82fc JJ |
430 | // data on the port. |
431 | // | |
432 | // This is similar to the stream implementation (hence fewer comments), but | |
433 | // uses a different value for the "steals" variable. | |
476ff2be | 434 | pub fn abort_selection(&self, _was_upgrade: bool) -> bool { |
1a4d82fc JJ |
435 | // Before we do anything else, we bounce on this lock. The reason for |
436 | // doing this is to ensure that any upgrade-in-progress is gone and | |
437 | // done with. Without this bounce, we can race with inherit_blocker | |
438 | // about looking at and dealing with to_wake. Once we have acquired the | |
439 | // lock, we are guaranteed that inherit_blocker is done. | |
440 | { | |
441 | let _guard = self.select_lock.lock().unwrap(); | |
442 | } | |
443 | ||
444 | // Like the stream implementation, we want to make sure that the count | |
445 | // on the channel goes non-negative. We don't know how negative the | |
446 | // stream currently is, so instead of using a steal value of 1, we load | |
447 | // the channel count and figure out what we should do to make it | |
448 | // positive. | |
449 | let steals = { | |
450 | let cnt = self.cnt.load(Ordering::SeqCst); | |
60c5eb7d | 451 | if cnt < 0 && cnt != DISCONNECTED { -cnt } else { 0 } |
1a4d82fc JJ |
452 | }; |
453 | let prev = self.bump(steals + 1); | |
454 | ||
455 | if prev == DISCONNECTED { | |
456 | assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); | |
457 | true | |
458 | } else { | |
459 | let cur = prev + steals + 1; | |
460 | assert!(cur >= 0); | |
461 | if prev < 0 { | |
462 | drop(self.take_to_wake()); | |
463 | } else { | |
464 | while self.to_wake.load(Ordering::SeqCst) != 0 { | |
85aaf69f | 465 | thread::yield_now(); |
1a4d82fc JJ |
466 | } |
467 | } | |
476ff2be SL |
468 | unsafe { |
469 | // if the number of steals is -1, it was the pre-emptive -1 steal | |
470 | // count from when we inherited a blocker. This is fine because | |
471 | // we're just going to overwrite it with a real value. | |
472 | let old = self.steals.get(); | |
473 | assert!(*old == 0 || *old == -1); | |
474 | *old = steals; | |
475 | prev >= 0 | |
476 | } | |
1a4d82fc JJ |
477 | } |
478 | } | |
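
    // A worked trace of abort_selection (added commentary, illustration
    // only): if a receiver blocked with cnt == -3, then steals == 3 and
    // bump(4) returns prev == -3, leaving cnt == 1 (cur == 1 >= 0). Since
    // prev < 0, a blocker was installed, so its token is taken and dropped;
    // `prev >= 0` is false, meaning no data was on the port.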
}
480 | ||
c34b1796 | 481 | impl<T> Drop for Packet<T> { |
1a4d82fc JJ |
482 | fn drop(&mut self) { |
483 | // Note that this load is not only an assert for correctness about | |
484 | // disconnection, but also a proper fence before the read of | |
485 | // `to_wake`, so this assert cannot be removed with also removing | |
486 | // the `to_wake` assert. | |
487 | assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED); | |
488 | assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); | |
489 | assert_eq!(self.channels.load(Ordering::SeqCst), 0); | |
490 | } | |
491 | } |