1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
13 /// This is the flavor of channels which are not necessarily optimized for any
14 /// particular use case, but are the most general in how they are used. Shared
15 /// channels are cloneable allowing for multiple senders.
17 /// High level implementation details can be found in the comment of the parent
18 /// module. You'll also note that the implementation of the shared and stream
19 /// channels are quite similar, and this is no coincidence!
21 pub use self::Failure
::*;
24 use core
::intrinsics
::abort
;
27 use sync
::atomic
::{AtomicUsize, AtomicIsize, AtomicBool, Ordering}
;
28 use sync
::mpsc
::blocking
::{self, SignalToken}
;
29 use sync
::mpsc
::mpsc_queue
as mpsc
;
30 use sync
::mpsc
::select
::StartResult
::*;
31 use sync
::mpsc
::select
::StartResult
;
32 use sync
::{Mutex, MutexGuard}
;
/// Sentinel stored in `cnt` once the channel is disconnected (port gone).
const DISCONNECTED: isize = isize::MIN;
/// Senders treat any count within `FUDGE` of `DISCONNECTED` as already
/// disconnected. This ranged check prevents concurrent senders from
/// incrementing the very-negative disconnected count back toward zero and
/// spuriously believing the channel is still live.
const FUDGE: isize = 1024;
/// Upper bound on the channel reference count; exceeding it (only possible
/// through `mem::forget`-style abuse) is treated as fatal rather than
/// risking a refcount overflow.
const MAX_REFCOUNT: usize = (isize::MAX) as usize;
/// How many receives a port may "steal" without blocking before the steal
/// count is reconciled back into `cnt`. Kept tiny in test builds so the
/// reconciliation path is actually exercised; the `cfg` attributes make the
/// two definitions mutually exclusive (a plain duplicate `const` would be a
/// compile error).
#[cfg(test)]
const MAX_STEALS: isize = 5;
#[cfg(not(test))]
const MAX_STEALS: isize = 1 << 20;
44 pub struct Packet
<T
> {
45 queue
: mpsc
::Queue
<T
>,
46 cnt
: AtomicIsize
, // How many items are on this channel
47 steals
: isize, // How many times has a port received without blocking?
48 to_wake
: AtomicUsize
, // SignalToken for wake up
50 // The number of channels which are currently using this packet.
51 channels
: AtomicUsize
,
53 // See the discussion in Port::drop and the channel send methods for what
55 port_dropped
: AtomicBool
,
56 sender_drain
: AtomicIsize
,
58 // this lock protects various portions of this implementation during
60 select_lock
: Mutex
<()>,
69 // Creation of a packet *must* be followed by a call to postinit_lock
70 // and later by inherit_blocker
71 pub fn new() -> Packet
<T
> {
73 queue
: mpsc
::Queue
::new(),
74 cnt
: AtomicIsize
::new(0),
76 to_wake
: AtomicUsize
::new(0),
77 channels
: AtomicUsize
::new(2),
78 port_dropped
: AtomicBool
::new(false),
79 sender_drain
: AtomicIsize
::new(0),
80 select_lock
: Mutex
::new(()),
84 // This function should be used after newly created Packet
85 // was wrapped with an Arc
86 // In other case mutex data will be duplicated while cloning
87 // and that could cause problems on platforms where it is
88 // represented by opaque data structure
89 pub fn postinit_lock(&self) -> MutexGuard
<()> {
90 self.select_lock
.lock().unwrap()
93 // This function is used at the creation of a shared packet to inherit a
94 // previously blocked thread. This is done to prevent spurious wakeups of
95 // threads in select().
97 // This can only be called at channel-creation time
98 pub fn inherit_blocker(&mut self,
99 token
: Option
<SignalToken
>,
100 guard
: MutexGuard
<()>) {
102 assert_eq
!(self.cnt
.load(Ordering
::SeqCst
), 0);
103 assert_eq
!(self.to_wake
.load(Ordering
::SeqCst
), 0);
104 self.to_wake
.store(unsafe { token.cast_to_usize() }
, Ordering
::SeqCst
);
105 self.cnt
.store(-1, Ordering
::SeqCst
);
107 // This store is a little sketchy. What's happening here is that
108 // we're transferring a blocker from a oneshot or stream channel to
109 // this shared channel. In doing so, we never spuriously wake them
110 // up and rather only wake them up at the appropriate time. This
111 // implementation of shared channels assumes that any blocking
112 // recv() will undo the increment of steals performed in try_recv()
113 // once the recv is complete. This thread that we're inheriting,
114 // however, is not in the middle of recv. Hence, the first time we
115 // wake them up, they're going to wake up from their old port, move
116 // on to the upgraded port, and then call the block recv() function.
118 // When calling this function, they'll find there's data immediately
119 // available, counting it as a steal. This in fact wasn't a steal
120 // because we appropriately blocked them waiting for data.
122 // To offset this bad increment, we initially set the steal count to
123 // -1. You'll find some special code in abort_selection() as well to
124 // ensure that this -1 steal count doesn't escape too far.
128 // When the shared packet is constructed, we grabbed this lock. The
129 // purpose of this lock is to ensure that abort_selection() doesn't
130 // interfere with this method. After we unlock this lock, we're
131 // signifying that we're done modifying self.cnt and self.to_wake and
132 // the port is ready for the world to continue using it.
136 pub fn send(&mut self, t
: T
) -> Result
<(), T
> {
137 // See Port::drop for what's going on
138 if self.port_dropped
.load(Ordering
::SeqCst
) { return Err(t) }
140 // Note that the multiple sender case is a little trickier
141 // semantically than the single sender case. The logic for
142 // incrementing is "add and if disconnected store disconnected".
143 // This could end up leading some senders to believe that there
144 // wasn't a disconnect if in fact there was a disconnect. This means
145 // that while one thread is attempting to re-store the disconnected
146 // states, other threads could walk through merrily incrementing
147 // this very-negative disconnected count. To prevent senders from
148 // spuriously attempting to send when the channels is actually
149 // disconnected, the count has a ranged check here.
151 // This is also done for another reason. Remember that the return
152 // value of this function is:
154 // `true` == the data *may* be received, this essentially has no
156 // `false` == the data will *never* be received, this has a lot of
159 // In the SPSC case, we have a check of 'queue.is_empty()' to see
160 // whether the data was actually received, but this same condition
161 // means nothing in a multi-producer context. As a result, this
162 // preflight check serves as the definitive "this will never be
163 // received". Once we get beyond this check, we have permanently
164 // entered the realm of "this may be received"
165 if self.cnt
.load(Ordering
::SeqCst
) < DISCONNECTED
+ FUDGE
{
170 match self.cnt
.fetch_add(1, Ordering
::SeqCst
) {
172 self.take_to_wake().signal();
175 // In this case, we have possibly failed to send our data, and
176 // we need to consider re-popping the data in order to fully
177 // destroy it. We must arbitrate among the multiple senders,
178 // however, because the queues that we're using are
179 // single-consumer queues. In order to do this, all exiting
180 // pushers will use an atomic count in order to count those
181 // flowing through. Pushers who see 0 are required to drain as
182 // much as possible, and then can only exit when they are the
183 // only pusher (otherwise they must try again).
184 n
if n
< DISCONNECTED
+ FUDGE
=> {
185 // see the comment in 'try' for a shared channel for why this
186 // window of "not disconnected" is ok.
187 self.cnt
.store(DISCONNECTED
, Ordering
::SeqCst
);
189 if self.sender_drain
.fetch_add(1, Ordering
::SeqCst
) == 0 {
191 // drain the queue, for info on the thread yield see the
192 // discussion in try_recv
194 match self.queue
.pop() {
196 mpsc
::Empty
=> break,
197 mpsc
::Inconsistent
=> thread
::yield_now(),
200 // maybe we're done, if we're not the last ones
201 // here, then we need to go try again.
202 if self.sender_drain
.fetch_sub(1, Ordering
::SeqCst
) == 1 {
207 // At this point, there may still be data on the queue,
208 // but only if the count hasn't been incremented and
209 // some other sender hasn't finished pushing data just
210 // yet. That sender in question will drain its own data.
214 // Can't make any assumptions about this case like in the SPSC case.
221 pub fn recv(&mut self, deadline
: Option
<Instant
>) -> Result
<T
, Failure
> {
222 // This code is essentially the exact same as that found in the stream
223 // case (see stream.rs)
224 match self.try_recv() {
229 let (wait_token
, signal_token
) = blocking
::tokens();
230 if self.decrement(signal_token
) == Installed
{
231 if let Some(deadline
) = deadline
{
232 let timed_out
= !wait_token
.wait_max_until(deadline
);
234 self.abort_selection(false);
241 match self.try_recv() {
242 data @
Ok(..) => { self.steals -= 1; data }
247 // Essentially the exact same thing as the stream decrement function.
248 // Returns true if blocking should proceed.
249 fn decrement(&mut self, token
: SignalToken
) -> StartResult
{
250 assert_eq
!(self.to_wake
.load(Ordering
::SeqCst
), 0);
251 let ptr
= unsafe { token.cast_to_usize() }
;
252 self.to_wake
.store(ptr
, Ordering
::SeqCst
);
254 let steals
= self.steals
;
257 match self.cnt
.fetch_sub(1 + steals
, Ordering
::SeqCst
) {
258 DISCONNECTED
=> { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
259 // If we factor in our steals and notice that the channel has no
260 // data, we successfully sleep
263 if n
- steals
<= 0 { return Installed }
267 self.to_wake
.store(0, Ordering
::SeqCst
);
268 drop(unsafe { SignalToken::cast_from_usize(ptr) }
);
272 pub fn try_recv(&mut self) -> Result
<T
, Failure
> {
273 let ret
= match self.queue
.pop() {
274 mpsc
::Data(t
) => Some(t
),
277 // This is a bit of an interesting case. The channel is reported as
278 // having data available, but our pop() has failed due to the queue
279 // being in an inconsistent state. This means that there is some
280 // pusher somewhere which has yet to complete, but we are guaranteed
281 // that a pop will eventually succeed. In this case, we spin in a
282 // yield loop because the remote sender should finish their enqueue
283 // operation "very quickly".
285 // Avoiding this yield loop would require a different queue
286 // abstraction which provides the guarantee that after M pushes have
287 // succeeded, at least M pops will succeed. The current queues
288 // guarantee that if there are N active pushes, you can pop N times
289 // once all N have finished.
290 mpsc
::Inconsistent
=> {
294 match self.queue
.pop() {
295 mpsc
::Data(t
) => { data = t; break }
296 mpsc
::Empty
=> panic
!("inconsistent => empty"),
297 mpsc
::Inconsistent
=> {}
304 // See the discussion in the stream implementation for why we
305 // might decrement steals.
307 if self.steals
> MAX_STEALS
{
308 match self.cnt
.swap(0, Ordering
::SeqCst
) {
310 self.cnt
.store(DISCONNECTED
, Ordering
::SeqCst
);
313 let m
= cmp
::min(n
, self.steals
);
318 assert
!(self.steals
>= 0);
324 // See the discussion in the stream implementation for why we try
327 match self.cnt
.load(Ordering
::SeqCst
) {
328 n
if n
!= DISCONNECTED
=> Err(Empty
),
330 match self.queue
.pop() {
331 mpsc
::Data(t
) => Ok(t
),
332 mpsc
::Empty
=> Err(Disconnected
),
333 // with no senders, an inconsistency is impossible.
334 mpsc
::Inconsistent
=> unreachable
!(),
342 // Prepares this shared packet for a channel clone, essentially just bumping
344 pub fn clone_chan(&mut self) {
345 let old_count
= self.channels
.fetch_add(1, Ordering
::SeqCst
);
347 // See comments on Arc::clone() on why we do this (for `mem::forget`).
348 if old_count
> MAX_REFCOUNT
{
355 // Decrement the reference count on a channel. This is called whenever a
356 // Chan is dropped and may end up waking up a receiver. It's the receiver's
357 // responsibility on the other end to figure out that we've disconnected.
358 pub fn drop_chan(&mut self) {
359 match self.channels
.fetch_sub(1, Ordering
::SeqCst
) {
361 n
if n
> 1 => return,
362 n
=> panic
!("bad number of channels left {}", n
),
365 match self.cnt
.swap(DISCONNECTED
, Ordering
::SeqCst
) {
366 -1 => { self.take_to_wake().signal(); }
368 n
=> { assert!(n >= 0); }
372 // See the long discussion inside of stream.rs for why the queue is drained,
373 // and why it is done in this fashion.
374 pub fn drop_port(&mut self) {
375 self.port_dropped
.store(true, Ordering
::SeqCst
);
376 let mut steals
= self.steals
;
378 let cnt
= self.cnt
.compare_and_swap(steals
, DISCONNECTED
, Ordering
::SeqCst
);
379 cnt
!= DISCONNECTED
&& cnt
!= steals
381 // See the discussion in 'try_recv' for why we yield
382 // control of this thread.
384 match self.queue
.pop() {
385 mpsc
::Data(..) => { steals += 1; }
386 mpsc
::Empty
| mpsc
::Inconsistent
=> break,
392 // Consumes ownership of the 'to_wake' field.
393 fn take_to_wake(&mut self) -> SignalToken
{
394 let ptr
= self.to_wake
.load(Ordering
::SeqCst
);
395 self.to_wake
.store(0, Ordering
::SeqCst
);
397 unsafe { SignalToken::cast_from_usize(ptr) }
400 ////////////////////////////////////////////////////////////////////////////
401 // select implementation
402 ////////////////////////////////////////////////////////////////////////////
404 // Helper function for select, tests whether this port can receive without
405 // blocking (obviously not an atomic decision).
407 // This is different than the stream version because there's no need to peek
408 // at the queue, we can just look at the local count.
409 pub fn can_recv(&mut self) -> bool
{
410 let cnt
= self.cnt
.load(Ordering
::SeqCst
);
411 cnt
== DISCONNECTED
|| cnt
- self.steals
> 0
414 // increment the count on the channel (used for selection)
415 fn bump(&mut self, amt
: isize) -> isize {
416 match self.cnt
.fetch_add(amt
, Ordering
::SeqCst
) {
418 self.cnt
.store(DISCONNECTED
, Ordering
::SeqCst
);
425 // Inserts the signal token for selection on this port, returning true if
426 // blocking should proceed.
428 // The code here is the same as in stream.rs, except that it doesn't need to
429 // peek at the channel to see if an upgrade is pending.
430 pub fn start_selection(&mut self, token
: SignalToken
) -> StartResult
{
431 match self.decrement(token
) {
432 Installed
=> Installed
,
434 let prev
= self.bump(1);
435 assert
!(prev
== DISCONNECTED
|| prev
>= 0);
441 // Cancels a previous thread waiting on this port, returning whether there's
444 // This is similar to the stream implementation (hence fewer comments), but
445 // uses a different value for the "steals" variable.
446 pub fn abort_selection(&mut self, _was_upgrade
: bool
) -> bool
{
447 // Before we do anything else, we bounce on this lock. The reason for
448 // doing this is to ensure that any upgrade-in-progress is gone and
449 // done with. Without this bounce, we can race with inherit_blocker
450 // about looking at and dealing with to_wake. Once we have acquired the
451 // lock, we are guaranteed that inherit_blocker is done.
453 let _guard
= self.select_lock
.lock().unwrap();
456 // Like the stream implementation, we want to make sure that the count
457 // on the channel goes non-negative. We don't know how negative the
458 // stream currently is, so instead of using a steal value of 1, we load
459 // the channel count and figure out what we should do to make it
462 let cnt
= self.cnt
.load(Ordering
::SeqCst
);
463 if cnt
< 0 && cnt
!= DISCONNECTED {-cnt}
else {0}
465 let prev
= self.bump(steals
+ 1);
467 if prev
== DISCONNECTED
{
468 assert_eq
!(self.to_wake
.load(Ordering
::SeqCst
), 0);
471 let cur
= prev
+ steals
+ 1;
474 drop(self.take_to_wake());
476 while self.to_wake
.load(Ordering
::SeqCst
) != 0 {
480 // if the number of steals is -1, it was the pre-emptive -1 steal
481 // count from when we inherited a blocker. This is fine because
482 // we're just going to overwrite it with a real value.
483 assert
!(self.steals
== 0 || self.steals
== -1);
484 self.steals
= steals
;
490 impl<T
> Drop
for Packet
<T
> {
492 // Note that this load is not only an assert for correctness about
493 // disconnection, but also a proper fence before the read of
494 // `to_wake`, so this assert cannot be removed with also removing
495 // the `to_wake` assert.
496 assert_eq
!(self.cnt
.load(Ordering
::SeqCst
), DISCONNECTED
);
497 assert_eq
!(self.to_wake
.load(Ordering
::SeqCst
), 0);
498 assert_eq
!(self.channels
.load(Ordering
::SeqCst
), 0);