]> git.proxmox.com Git - rustc.git/blame - src/libstd/sync/mpsc/shared.rs
New upstream version 1.44.1+dfsg1
[rustc.git] / src / libstd / sync / mpsc / shared.rs
CommitLineData
/// Shared channels.
///
/// This is the flavor of channels which are not necessarily optimized for any
/// particular use case, but are the most general in how they are used. Shared
/// channels are cloneable allowing for multiple senders.
///
/// High level implementation details can be found in the comment of the parent
/// module. You'll also note that the implementations of the shared and stream
/// channels are quite similar, and this is no coincidence!
1a4d82fc 10pub use self::Failure::*;
48663c56 11use self::StartResult::*;
1a4d82fc 12
1a4d82fc 13use core::cmp;
9e0c209e 14use core::intrinsics::abort;
1a4d82fc 15
532ac7d7
XL
16use crate::cell::UnsafeCell;
17use crate::ptr;
60c5eb7d 18use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
532ac7d7
XL
19use crate::sync::mpsc::blocking::{self, SignalToken};
20use crate::sync::mpsc::mpsc_queue as mpsc;
532ac7d7
XL
21use crate::sync::{Mutex, MutexGuard};
22use crate::thread;
23use crate::time::Instant;
1a4d82fc 24
85aaf69f
SL
// Sentinel stored in `cnt` once the channel has been disconnected (see
// `drop_chan` and `drop_port`).
const DISCONNECTED: isize = isize::MIN;
// Width of the window above `DISCONNECTED` that `send` still treats as
// "disconnected": concurrent senders may have bumped the very-negative count
// before one of them re-stores `DISCONNECTED`.
const FUDGE: isize = 1024;
// Upper bound on the sender refcount; `clone_chan` aborts the process when
// the previous count exceeds it.
const MAX_REFCOUNT: usize = (isize::MAX) as usize;
// Once the receiver-local steal counter exceeds this value, `try_recv` folds
// it back into `cnt`. Kept tiny under test so that path is exercised.
#[cfg(test)]
const MAX_STEALS: isize = 5;
#[cfg(not(test))]
const MAX_STEALS: isize = 1 << 20;
1a4d82fc
JJ
32
33pub struct Packet<T> {
34 queue: mpsc::Queue<T>,
60c5eb7d 35 cnt: AtomicIsize, // How many items are on this channel
476ff2be 36 steals: UnsafeCell<isize>, // How many times has a port received without blocking?
60c5eb7d 37 to_wake: AtomicUsize, // SignalToken for wake up
1a4d82fc
JJ
38
39 // The number of channels which are currently using this packet.
9e0c209e 40 channels: AtomicUsize,
1a4d82fc
JJ
41
42 // See the discussion in Port::drop and the channel send methods for what
43 // these are used for
44 port_dropped: AtomicBool,
85aaf69f 45 sender_drain: AtomicIsize,
1a4d82fc
JJ
46
47 // this lock protects various portions of this implementation during
48 // select()
49 select_lock: Mutex<()>,
50}
51
// Reasons a receive operation can fail to produce a value.
pub enum Failure {
    // No data was available; the channel was not observed as disconnected.
    Empty,
    // The channel is disconnected and its queue has been fully drained.
    Disconnected,
}
56
48663c56
XL
// Outcome of `decrement`: whether the caller's wake-up token was left
// installed (the caller should block) or was removed again (the caller should
// not block).
#[derive(PartialEq, Eq)]
enum StartResult {
    Installed,
    Abort,
}
62
c34b1796 63impl<T> Packet<T> {
1a4d82fc
JJ
64 // Creation of a packet *must* be followed by a call to postinit_lock
65 // and later by inherit_blocker
66 pub fn new() -> Packet<T> {
3157f602 67 Packet {
1a4d82fc 68 queue: mpsc::Queue::new(),
85aaf69f 69 cnt: AtomicIsize::new(0),
476ff2be 70 steals: UnsafeCell::new(0),
85aaf69f 71 to_wake: AtomicUsize::new(0),
9e0c209e 72 channels: AtomicUsize::new(2),
1a4d82fc 73 port_dropped: AtomicBool::new(false),
85aaf69f 74 sender_drain: AtomicIsize::new(0),
1a4d82fc 75 select_lock: Mutex::new(()),
3157f602 76 }
1a4d82fc
JJ
77 }
78
79 // This function should be used after newly created Packet
80 // was wrapped with an Arc
81 // In other case mutex data will be duplicated while cloning
82 // and that could cause problems on platforms where it is
83 // represented by opaque data structure
532ac7d7 84 pub fn postinit_lock(&self) -> MutexGuard<'_, ()> {
1a4d82fc
JJ
85 self.select_lock.lock().unwrap()
86 }
87
88 // This function is used at the creation of a shared packet to inherit a
bd371182
AL
89 // previously blocked thread. This is done to prevent spurious wakeups of
90 // threads in select().
1a4d82fc
JJ
91 //
92 // This can only be called at channel-creation time
60c5eb7d 93 pub fn inherit_blocker(&self, token: Option<SignalToken>, guard: MutexGuard<'_, ()>) {
1a4d82fc
JJ
94 token.map(|token| {
95 assert_eq!(self.cnt.load(Ordering::SeqCst), 0);
96 assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
c34b1796 97 self.to_wake.store(unsafe { token.cast_to_usize() }, Ordering::SeqCst);
1a4d82fc
JJ
98 self.cnt.store(-1, Ordering::SeqCst);
99
100 // This store is a little sketchy. What's happening here is that
101 // we're transferring a blocker from a oneshot or stream channel to
102 // this shared channel. In doing so, we never spuriously wake them
103 // up and rather only wake them up at the appropriate time. This
104 // implementation of shared channels assumes that any blocking
105 // recv() will undo the increment of steals performed in try_recv()
106 // once the recv is complete. This thread that we're inheriting,
107 // however, is not in the middle of recv. Hence, the first time we
108 // wake them up, they're going to wake up from their old port, move
109 // on to the upgraded port, and then call the block recv() function.
110 //
111 // When calling this function, they'll find there's data immediately
112 // available, counting it as a steal. This in fact wasn't a steal
113 // because we appropriately blocked them waiting for data.
114 //
115 // To offset this bad increment, we initially set the steal count to
116 // -1. You'll find some special code in abort_selection() as well to
117 // ensure that this -1 steal count doesn't escape too far.
60c5eb7d
XL
118 unsafe {
119 *self.steals.get() = -1;
120 }
1a4d82fc
JJ
121 });
122
123 // When the shared packet is constructed, we grabbed this lock. The
124 // purpose of this lock is to ensure that abort_selection() doesn't
125 // interfere with this method. After we unlock this lock, we're
126 // signifying that we're done modifying self.cnt and self.to_wake and
127 // the port is ready for the world to continue using it.
128 drop(guard);
129 }
130
476ff2be 131 pub fn send(&self, t: T) -> Result<(), T> {
1a4d82fc 132 // See Port::drop for what's going on
60c5eb7d
XL
133 if self.port_dropped.load(Ordering::SeqCst) {
134 return Err(t);
135 }
1a4d82fc
JJ
136
137 // Note that the multiple sender case is a little trickier
138 // semantically than the single sender case. The logic for
139 // incrementing is "add and if disconnected store disconnected".
140 // This could end up leading some senders to believe that there
141 // wasn't a disconnect if in fact there was a disconnect. This means
142 // that while one thread is attempting to re-store the disconnected
143 // states, other threads could walk through merrily incrementing
144 // this very-negative disconnected count. To prevent senders from
145 // spuriously attempting to send when the channels is actually
146 // disconnected, the count has a ranged check here.
147 //
148 // This is also done for another reason. Remember that the return
149 // value of this function is:
150 //
151 // `true` == the data *may* be received, this essentially has no
152 // meaning
153 // `false` == the data will *never* be received, this has a lot of
154 // meaning
155 //
156 // In the SPSC case, we have a check of 'queue.is_empty()' to see
157 // whether the data was actually received, but this same condition
158 // means nothing in a multi-producer context. As a result, this
159 // preflight check serves as the definitive "this will never be
160 // received". Once we get beyond this check, we have permanently
161 // entered the realm of "this may be received"
162 if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE {
60c5eb7d 163 return Err(t);
1a4d82fc
JJ
164 }
165
166 self.queue.push(t);
167 match self.cnt.fetch_add(1, Ordering::SeqCst) {
168 -1 => {
169 self.take_to_wake().signal();
170 }
171
172 // In this case, we have possibly failed to send our data, and
173 // we need to consider re-popping the data in order to fully
174 // destroy it. We must arbitrate among the multiple senders,
175 // however, because the queues that we're using are
176 // single-consumer queues. In order to do this, all exiting
177 // pushers will use an atomic count in order to count those
178 // flowing through. Pushers who see 0 are required to drain as
179 // much as possible, and then can only exit when they are the
180 // only pusher (otherwise they must try again).
181 n if n < DISCONNECTED + FUDGE => {
182 // see the comment in 'try' for a shared channel for why this
183 // window of "not disconnected" is ok.
184 self.cnt.store(DISCONNECTED, Ordering::SeqCst);
185
186 if self.sender_drain.fetch_add(1, Ordering::SeqCst) == 0 {
187 loop {
188 // drain the queue, for info on the thread yield see the
189 // discussion in try_recv
190 loop {
191 match self.queue.pop() {
192 mpsc::Data(..) => {}
193 mpsc::Empty => break,
85aaf69f 194 mpsc::Inconsistent => thread::yield_now(),
1a4d82fc
JJ
195 }
196 }
197 // maybe we're done, if we're not the last ones
198 // here, then we need to go try again.
199 if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 {
60c5eb7d 200 break;
1a4d82fc
JJ
201 }
202 }
203
204 // At this point, there may still be data on the queue,
205 // but only if the count hasn't been incremented and
206 // some other sender hasn't finished pushing data just
207 // yet. That sender in question will drain its own data.
208 }
209 }
210
211 // Can't make any assumptions about this case like in the SPSC case.
212 _ => {}
213 }
214
215 Ok(())
216 }
217
476ff2be 218 pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
1a4d82fc
JJ
219 // This code is essentially the exact same as that found in the stream
220 // case (see stream.rs)
221 match self.try_recv() {
222 Err(Empty) => {}
223 data => return data,
224 }
225
226 let (wait_token, signal_token) = blocking::tokens();
227 if self.decrement(signal_token) == Installed {
3157f602
XL
228 if let Some(deadline) = deadline {
229 let timed_out = !wait_token.wait_max_until(deadline);
230 if timed_out {
231 self.abort_selection(false);
232 }
233 } else {
234 wait_token.wait();
235 }
1a4d82fc
JJ
236 }
237
238 match self.try_recv() {
60c5eb7d
XL
239 data @ Ok(..) => unsafe {
240 *self.steals.get() -= 1;
241 data
242 },
1a4d82fc
JJ
243 data => data,
244 }
245 }
246
247 // Essentially the exact same thing as the stream decrement function.
248 // Returns true if blocking should proceed.
476ff2be
SL
249 fn decrement(&self, token: SignalToken) -> StartResult {
250 unsafe {
251 assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
252 let ptr = token.cast_to_usize();
253 self.to_wake.store(ptr, Ordering::SeqCst);
254
255 let steals = ptr::replace(self.steals.get(), 0);
256
257 match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
60c5eb7d
XL
258 DISCONNECTED => {
259 self.cnt.store(DISCONNECTED, Ordering::SeqCst);
260 }
476ff2be
SL
261 // If we factor in our steals and notice that the channel has no
262 // data, we successfully sleep
263 n => {
264 assert!(n >= 0);
60c5eb7d
XL
265 if n - steals <= 0 {
266 return Installed;
267 }
476ff2be 268 }
1a4d82fc 269 }
1a4d82fc 270
476ff2be
SL
271 self.to_wake.store(0, Ordering::SeqCst);
272 drop(SignalToken::cast_from_usize(ptr));
273 Abort
274 }
1a4d82fc
JJ
275 }
276
476ff2be 277 pub fn try_recv(&self) -> Result<T, Failure> {
1a4d82fc
JJ
278 let ret = match self.queue.pop() {
279 mpsc::Data(t) => Some(t),
280 mpsc::Empty => None,
281
282 // This is a bit of an interesting case. The channel is reported as
283 // having data available, but our pop() has failed due to the queue
284 // being in an inconsistent state. This means that there is some
285 // pusher somewhere which has yet to complete, but we are guaranteed
286 // that a pop will eventually succeed. In this case, we spin in a
287 // yield loop because the remote sender should finish their enqueue
288 // operation "very quickly".
289 //
290 // Avoiding this yield loop would require a different queue
291 // abstraction which provides the guarantee that after M pushes have
292 // succeeded, at least M pops will succeed. The current queues
293 // guarantee that if there are N active pushes, you can pop N times
294 // once all N have finished.
295 mpsc::Inconsistent => {
296 let data;
297 loop {
85aaf69f 298 thread::yield_now();
1a4d82fc 299 match self.queue.pop() {
60c5eb7d
XL
300 mpsc::Data(t) => {
301 data = t;
302 break;
303 }
1a4d82fc
JJ
304 mpsc::Empty => panic!("inconsistent => empty"),
305 mpsc::Inconsistent => {}
306 }
307 }
308 Some(data)
309 }
310 };
311 match ret {
312 // See the discussion in the stream implementation for why we
313 // might decrement steals.
476ff2be
SL
314 Some(data) => unsafe {
315 if *self.steals.get() > MAX_STEALS {
1a4d82fc
JJ
316 match self.cnt.swap(0, Ordering::SeqCst) {
317 DISCONNECTED => {
318 self.cnt.store(DISCONNECTED, Ordering::SeqCst);
319 }
320 n => {
476ff2be
SL
321 let m = cmp::min(n, *self.steals.get());
322 *self.steals.get() -= m;
1a4d82fc
JJ
323 self.bump(n - m);
324 }
325 }
476ff2be 326 assert!(*self.steals.get() >= 0);
1a4d82fc 327 }
476ff2be 328 *self.steals.get() += 1;
1a4d82fc 329 Ok(data)
476ff2be 330 },
1a4d82fc
JJ
331
332 // See the discussion in the stream implementation for why we try
333 // again.
334 None => {
335 match self.cnt.load(Ordering::SeqCst) {
336 n if n != DISCONNECTED => Err(Empty),
337 _ => {
338 match self.queue.pop() {
339 mpsc::Data(t) => Ok(t),
340 mpsc::Empty => Err(Disconnected),
341 // with no senders, an inconsistency is impossible.
342 mpsc::Inconsistent => unreachable!(),
343 }
344 }
345 }
346 }
347 }
348 }
349
350 // Prepares this shared packet for a channel clone, essentially just bumping
351 // a refcount.
476ff2be 352 pub fn clone_chan(&self) {
9e0c209e
SL
353 let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
354
355 // See comments on Arc::clone() on why we do this (for `mem::forget`).
356 if old_count > MAX_REFCOUNT {
357 unsafe {
358 abort();
359 }
360 }
1a4d82fc
JJ
361 }
362
363 // Decrement the reference count on a channel. This is called whenever a
364 // Chan is dropped and may end up waking up a receiver. It's the receiver's
365 // responsibility on the other end to figure out that we've disconnected.
476ff2be 366 pub fn drop_chan(&self) {
1a4d82fc
JJ
367 match self.channels.fetch_sub(1, Ordering::SeqCst) {
368 1 => {}
369 n if n > 1 => return,
370 n => panic!("bad number of channels left {}", n),
371 }
372
373 match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
60c5eb7d
XL
374 -1 => {
375 self.take_to_wake().signal();
376 }
1a4d82fc 377 DISCONNECTED => {}
60c5eb7d
XL
378 n => {
379 assert!(n >= 0);
380 }
1a4d82fc
JJ
381 }
382 }
383
384 // See the long discussion inside of stream.rs for why the queue is drained,
385 // and why it is done in this fashion.
476ff2be 386 pub fn drop_port(&self) {
1a4d82fc 387 self.port_dropped.store(true, Ordering::SeqCst);
476ff2be 388 let mut steals = unsafe { *self.steals.get() };
1a4d82fc
JJ
389 while {
390 let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, Ordering::SeqCst);
391 cnt != DISCONNECTED && cnt != steals
392 } {
393 // See the discussion in 'try_recv' for why we yield
394 // control of this thread.
395 loop {
396 match self.queue.pop() {
60c5eb7d
XL
397 mpsc::Data(..) => {
398 steals += 1;
399 }
1a4d82fc
JJ
400 mpsc::Empty | mpsc::Inconsistent => break,
401 }
402 }
403 }
404 }
405
406 // Consumes ownership of the 'to_wake' field.
476ff2be 407 fn take_to_wake(&self) -> SignalToken {
1a4d82fc
JJ
408 let ptr = self.to_wake.load(Ordering::SeqCst);
409 self.to_wake.store(0, Ordering::SeqCst);
410 assert!(ptr != 0);
c34b1796 411 unsafe { SignalToken::cast_from_usize(ptr) }
1a4d82fc
JJ
412 }
413
414 ////////////////////////////////////////////////////////////////////////////
415 // select implementation
416 ////////////////////////////////////////////////////////////////////////////
417
1a4d82fc 418 // increment the count on the channel (used for selection)
476ff2be 419 fn bump(&self, amt: isize) -> isize {
1a4d82fc
JJ
420 match self.cnt.fetch_add(amt, Ordering::SeqCst) {
421 DISCONNECTED => {
422 self.cnt.store(DISCONNECTED, Ordering::SeqCst);
423 DISCONNECTED
424 }
60c5eb7d 425 n => n,
1a4d82fc
JJ
426 }
427 }
428
bd371182 429 // Cancels a previous thread waiting on this port, returning whether there's
1a4d82fc
JJ
430 // data on the port.
431 //
432 // This is similar to the stream implementation (hence fewer comments), but
433 // uses a different value for the "steals" variable.
476ff2be 434 pub fn abort_selection(&self, _was_upgrade: bool) -> bool {
1a4d82fc
JJ
435 // Before we do anything else, we bounce on this lock. The reason for
436 // doing this is to ensure that any upgrade-in-progress is gone and
437 // done with. Without this bounce, we can race with inherit_blocker
438 // about looking at and dealing with to_wake. Once we have acquired the
439 // lock, we are guaranteed that inherit_blocker is done.
440 {
441 let _guard = self.select_lock.lock().unwrap();
442 }
443
444 // Like the stream implementation, we want to make sure that the count
445 // on the channel goes non-negative. We don't know how negative the
446 // stream currently is, so instead of using a steal value of 1, we load
447 // the channel count and figure out what we should do to make it
448 // positive.
449 let steals = {
450 let cnt = self.cnt.load(Ordering::SeqCst);
60c5eb7d 451 if cnt < 0 && cnt != DISCONNECTED { -cnt } else { 0 }
1a4d82fc
JJ
452 };
453 let prev = self.bump(steals + 1);
454
455 if prev == DISCONNECTED {
456 assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
457 true
458 } else {
459 let cur = prev + steals + 1;
460 assert!(cur >= 0);
461 if prev < 0 {
462 drop(self.take_to_wake());
463 } else {
464 while self.to_wake.load(Ordering::SeqCst) != 0 {
85aaf69f 465 thread::yield_now();
1a4d82fc
JJ
466 }
467 }
476ff2be
SL
468 unsafe {
469 // if the number of steals is -1, it was the pre-emptive -1 steal
470 // count from when we inherited a blocker. This is fine because
471 // we're just going to overwrite it with a real value.
472 let old = self.steals.get();
473 assert!(*old == 0 || *old == -1);
474 *old = steals;
475 prev >= 0
476 }
1a4d82fc
JJ
477 }
478 }
479}
480
c34b1796 481impl<T> Drop for Packet<T> {
1a4d82fc
JJ
482 fn drop(&mut self) {
483 // Note that this load is not only an assert for correctness about
484 // disconnection, but also a proper fence before the read of
485 // `to_wake`, so this assert cannot be removed with also removing
486 // the `to_wake` assert.
487 assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
488 assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
489 assert_eq!(self.channels.load(Ordering::SeqCst), 0);
490 }
491}