/// Synchronous channels/ports
///
/// This channel implementation differs significantly from the asynchronous
/// implementations found next to it (oneshot/stream/share). This is an
/// implementation of a synchronous, bounded buffer channel.
///
/// Each channel is created with some amount of backing buffer, and sends will
/// *block* until buffer space becomes available. A buffer size of 0 is valid,
/// which means that every successful send is paired with a successful recv.
///
/// This flavor of channels defines a new `send_opt` method for channels which
/// is the method by which a message is sent but the thread does not panic if it
/// cannot be delivered.
///
/// Another major difference is that send() will *always* return back the data
/// if it couldn't be sent. This is because it is deterministically known when
/// the data is received and when it is not received.
///
/// Implementation-wise, it can all be summed up with "use a mutex plus some
/// logic". The mutex used here is an OS native mutex, meaning that no user code
/// is run inside of the mutex (to prevent context switching). This
/// implementation shares almost all code for the buffered and unbuffered cases
/// of a synchronous channel. There are a few branches for the unbuffered case,
/// but they're mostly just relevant to blocking senders.
26 pub use self::Failure
::*;
28 use core
::intrinsics
::abort
;
32 use crate::sync
::atomic
::{AtomicUsize, Ordering}
;
33 use crate::sync
::mpsc
::blocking
::{self, SignalToken, WaitToken}
;
34 use crate::sync
::{Mutex, MutexGuard}
;
35 use crate::time
::Instant
;
/// Upper bound on the channel reference count. `clone_chan` checks its
/// `fetch_add` result against this and treats exceeding it as a fatal error
/// (the same `isize::MAX` guard `Arc::clone` uses, so that leaked clones via
/// `mem::forget` can never overflow the count into the sign bit).
const MAX_REFCOUNT: usize = isize::MAX as usize;
39 pub struct Packet
<T
> {
40 /// Only field outside of the mutex. Just done for kicks, but mainly because
41 /// the other shared channel already had the code implemented
42 channels
: AtomicUsize
,
44 lock
: Mutex
<State
<T
>>,
// SAFETY: the fields of `Packet` visible here are an `AtomicUsize` refcount
// and a `Mutex<State<T>>`, both of which may be moved across threads; the
// `T: Send` bound covers the buffered messages stored inside the mutex.
// NOTE(review): assumes `Packet` has no other thread-unsafe fields — confirm
// against the full struct definition.
unsafe impl<T: Send> Send for Packet<T> {}
// SAFETY: all shared mutation of `Packet` goes through the atomic refcount
// or through the `Mutex<State<T>>`, so concurrent `&Packet<T>` access is
// serialized; `T: Send` is required because receivers on other threads take
// ownership of buffered `T`s.
// NOTE(review): assumes `Packet` has no other fields outside the mutex
// besides the atomic — confirm against the full struct definition.
unsafe impl<T: Send> Sync for Packet<T> {}
52 disconnected
: bool
, // Is the channel disconnected yet?
53 queue
: Queue
, // queue of senders waiting to send data
54 blocker
: Blocker
, // currently blocked thread on this channel
55 buf
: Buffer
<T
>, // storage for buffered messages
56 cap
: usize, // capacity of this channel
58 /// A curious flag used to indicate whether a sender failed or succeeded in
59 /// blocking. This is used to transmit information back to the thread that it
60 /// must dequeue its message from the buffer because it was not received.
61 /// This is only relevant in the 0-buffer case. This obviously cannot be
62 /// safely constructed, but it's guaranteed to always have a valid pointer
64 canceled
: Option
<&'
static mut bool
>,
// SAFETY: `State` is only ever reached through `Packet`'s mutex, so it is
// never accessed concurrently. The `canceled: Option<&'static mut bool>`
// field is a lifetime-laundered pointer into a blocked sender's stack frame
// (installed in `send`, consumed in `drop_port`); that sender stays parked
// while the pointer is registered, which is what keeps it valid across
// threads. `T: Send` covers the buffered messages.
unsafe impl<T: Send> Send for State<T> {}
69 /// Possible flavors of threads who can be blocked on this channel.
71 BlockedSender(SignalToken
),
72 BlockedReceiver(SignalToken
),
76 /// Simple queue for threading threads together. Nodes are stack-allocated, so
77 /// this structure is not safe at all
84 token
: Option
<SignalToken
>,
// SAFETY: `Node`s are stack-allocated by blocking senders (see
// `acquire_send_slot`) and linked into the queue by raw pointer; the owning
// thread remains blocked for as long as its node is enqueued, so another
// thread dequeuing the node and taking its `SignalToken` is sound.
unsafe impl Send for Node {}
90 /// A simple ring-buffer
103 /// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
104 /// in the meantime. This re-locks the mutex upon returning.
106 lock
: &'a Mutex
<State
<T
>>,
107 mut guard
: MutexGuard
<'b
, State
<T
>>,
108 f
: fn(SignalToken
) -> Blocker
,
109 ) -> MutexGuard
<'a
, State
<T
>> {
110 let (wait_token
, signal_token
) = blocking
::tokens();
111 match mem
::replace(&mut guard
.blocker
, f(signal_token
)) {
115 drop(guard
); // unlock
116 wait_token
.wait(); // block
117 lock
.lock().unwrap() // relock
120 /// Same as wait, but waiting at most until `deadline`.
121 fn wait_timeout_receiver
<'a
, 'b
, T
>(
122 lock
: &'a Mutex
<State
<T
>>,
124 mut guard
: MutexGuard
<'b
, State
<T
>>,
126 ) -> MutexGuard
<'a
, State
<T
>> {
127 let (wait_token
, signal_token
) = blocking
::tokens();
128 match mem
::replace(&mut guard
.blocker
, BlockedReceiver(signal_token
)) {
132 drop(guard
); // unlock
133 *success
= wait_token
.wait_max_until(deadline
); // block
134 let mut new_guard
= lock
.lock().unwrap(); // relock
136 abort_selection(&mut new_guard
);
141 fn abort_selection
<T
>(guard
: &mut MutexGuard
<'_
, State
<T
>>) -> bool
{
142 match mem
::replace(&mut guard
.blocker
, NoneBlocked
) {
144 BlockedSender(token
) => {
145 guard
.blocker
= BlockedSender(token
);
148 BlockedReceiver(token
) => {
155 /// Wakes up a thread, dropping the lock at the correct time
156 fn wakeup
<T
>(token
: SignalToken
, guard
: MutexGuard
<'_
, State
<T
>>) {
157 // We need to be careful to wake up the waiting thread *outside* of the mutex
158 // in case it incurs a context switch.
164 pub fn new(capacity
: usize) -> Packet
<T
> {
166 channels
: AtomicUsize
::new(1),
167 lock
: Mutex
::new(State
{
169 blocker
: NoneBlocked
,
172 queue
: Queue { head: ptr::null_mut(), tail: ptr::null_mut() }
,
174 buf
: (0..capacity
+ if capacity
== 0 { 1 }
else { 0 }
).map(|_
| None
).collect(),
182 // wait until a send slot is available, returning locked access to
183 // the channel state.
184 fn acquire_send_slot(&self) -> MutexGuard
<'_
, State
<T
>> {
185 let mut node
= Node { token: None, next: ptr::null_mut() }
;
187 let mut guard
= self.lock
.lock().unwrap();
188 // are we ready to go?
189 if guard
.disconnected
|| guard
.buf
.size() < guard
.buf
.capacity() {
192 // no room; actually block
193 let wait_token
= guard
.queue
.enqueue(&mut node
);
199 pub fn send(&self, t
: T
) -> Result
<(), T
> {
200 let mut guard
= self.acquire_send_slot();
201 if guard
.disconnected
{
204 guard
.buf
.enqueue(t
);
206 match mem
::replace(&mut guard
.blocker
, NoneBlocked
) {
207 // if our capacity is 0, then we need to wait for a receiver to be
208 // available to take our data. After waiting, we check again to make
209 // sure the port didn't go away in the meantime. If it did, we need
210 // to hand back our data.
211 NoneBlocked
if guard
.cap
== 0 => {
212 let mut canceled
= false;
213 assert
!(guard
.canceled
.is_none());
214 guard
.canceled
= Some(unsafe { mem::transmute(&mut canceled) }
);
215 let mut guard
= wait(&self.lock
, guard
, BlockedSender
);
216 if canceled { Err(guard.buf.dequeue()) }
else { Ok(()) }
219 // success, we buffered some data
220 NoneBlocked
=> Ok(()),
222 // success, someone's about to receive our buffered data.
223 BlockedReceiver(token
) => {
224 wakeup(token
, guard
);
228 BlockedSender(..) => panic
!("lolwut"),
232 pub fn try_send(&self, t
: T
) -> Result
<(), super::TrySendError
<T
>> {
233 let mut guard
= self.lock
.lock().unwrap();
234 if guard
.disconnected
{
235 Err(super::TrySendError
::Disconnected(t
))
236 } else if guard
.buf
.size() == guard
.buf
.capacity() {
237 Err(super::TrySendError
::Full(t
))
238 } else if guard
.cap
== 0 {
239 // With capacity 0, even though we have buffer space we can't
240 // transfer the data unless there's a receiver waiting.
241 match mem
::replace(&mut guard
.blocker
, NoneBlocked
) {
242 NoneBlocked
=> Err(super::TrySendError
::Full(t
)),
243 BlockedSender(..) => unreachable
!(),
244 BlockedReceiver(token
) => {
245 guard
.buf
.enqueue(t
);
246 wakeup(token
, guard
);
251 // If the buffer has some space and the capacity isn't 0, then we
252 // just enqueue the data for later retrieval, ensuring to wake up
253 // any blocked receiver if there is one.
254 assert
!(guard
.buf
.size() < guard
.buf
.capacity());
255 guard
.buf
.enqueue(t
);
256 match mem
::replace(&mut guard
.blocker
, NoneBlocked
) {
257 BlockedReceiver(token
) => wakeup(token
, guard
),
259 BlockedSender(..) => unreachable
!(),
265 // Receives a message from this channel
267 // When reading this, remember that there can only ever be one receiver at
269 pub fn recv(&self, deadline
: Option
<Instant
>) -> Result
<T
, Failure
> {
270 let mut guard
= self.lock
.lock().unwrap();
272 let mut woke_up_after_waiting
= false;
273 // Wait for the buffer to have something in it. No need for a
274 // while loop because we're the only receiver.
275 if !guard
.disconnected
&& guard
.buf
.size() == 0 {
276 if let Some(deadline
) = deadline
{
278 wait_timeout_receiver(&self.lock
, deadline
, guard
, &mut woke_up_after_waiting
);
280 guard
= wait(&self.lock
, guard
, BlockedReceiver
);
281 woke_up_after_waiting
= true;
285 // N.B., channel could be disconnected while waiting, so the order of
286 // these conditionals is important.
287 if guard
.disconnected
&& guard
.buf
.size() == 0 {
288 return Err(Disconnected
);
291 // Pick up the data, wake up our neighbors, and carry on
292 assert
!(guard
.buf
.size() > 0 || (deadline
.is_some() && !woke_up_after_waiting
));
294 if guard
.buf
.size() == 0 {
298 let ret
= guard
.buf
.dequeue();
299 self.wakeup_senders(woke_up_after_waiting
, guard
);
303 pub fn try_recv(&self) -> Result
<T
, Failure
> {
304 let mut guard
= self.lock
.lock().unwrap();
307 if guard
.disconnected
&& guard
.buf
.size() == 0 {
308 return Err(Disconnected
);
310 if guard
.buf
.size() == 0 {
314 // Be sure to wake up neighbors
315 let ret
= Ok(guard
.buf
.dequeue());
316 self.wakeup_senders(false, guard
);
320 // Wake up pending senders after some data has been received
322 // * `waited` - flag if the receiver blocked to receive some data, or if it
323 // just picked up some data on the way out
324 // * `guard` - the lock guard that is held over this channel's lock
325 fn wakeup_senders(&self, waited
: bool
, mut guard
: MutexGuard
<'_
, State
<T
>>) {
326 let pending_sender1
: Option
<SignalToken
> = guard
.queue
.dequeue();
328 // If this is a no-buffer channel (cap == 0), then if we didn't wait we
329 // need to ACK the sender. If we waited, then the sender waking us up
330 // was already the ACK.
331 let pending_sender2
= if guard
.cap
== 0 && !waited
{
332 match mem
::replace(&mut guard
.blocker
, NoneBlocked
) {
334 BlockedReceiver(..) => unreachable
!(),
335 BlockedSender(token
) => {
336 guard
.canceled
.take();
345 // only outside of the lock do we wake up the pending threads
346 pending_sender1
.map(|t
| t
.signal());
347 pending_sender2
.map(|t
| t
.signal());
350 // Prepares this shared packet for a channel clone, essentially just bumping
352 pub fn clone_chan(&self) {
353 let old_count
= self.channels
.fetch_add(1, Ordering
::SeqCst
);
355 // See comments on Arc::clone() on why we do this (for `mem::forget`).
356 if old_count
> MAX_REFCOUNT
{
363 pub fn drop_chan(&self) {
364 // Only flag the channel as disconnected if we're the last channel
365 match self.channels
.fetch_sub(1, Ordering
::SeqCst
) {
370 // Not much to do other than wake up a receiver if one's there
371 let mut guard
= self.lock
.lock().unwrap();
372 if guard
.disconnected
{
375 guard
.disconnected
= true;
376 match mem
::replace(&mut guard
.blocker
, NoneBlocked
) {
378 BlockedSender(..) => unreachable
!(),
379 BlockedReceiver(token
) => wakeup(token
, guard
),
383 pub fn drop_port(&self) {
384 let mut guard
= self.lock
.lock().unwrap();
386 if guard
.disconnected
{
389 guard
.disconnected
= true;
391 // If the capacity is 0, then the sender may want its data back after
392 // we're disconnected. Otherwise it's now our responsibility to destroy
393 // the buffered data. As with many other portions of this code, this
394 // needs to be careful to destroy the data *outside* of the lock to
396 let _data
= if guard
.cap
!= 0 { mem::take(&mut guard.buf.buf) }
else { Vec::new() }
;
398 mem
::replace(&mut guard
.queue
, Queue { head: ptr::null_mut(), tail: ptr::null_mut() }
);
400 let waiter
= match mem
::replace(&mut guard
.blocker
, NoneBlocked
) {
402 BlockedSender(token
) => {
403 *guard
.canceled
.take().unwrap() = true;
406 BlockedReceiver(..) => unreachable
!(),
410 while let Some(token
) = queue
.dequeue() {
413 waiter
.map(|t
| t
.signal());
417 impl<T
> Drop
for Packet
<T
> {
419 assert_eq
!(self.channels
.load(Ordering
::SeqCst
), 0);
420 let mut guard
= self.lock
.lock().unwrap();
421 assert
!(guard
.queue
.dequeue().is_none());
422 assert
!(guard
.canceled
.is_none());
426 ////////////////////////////////////////////////////////////////////////////////
427 // Buffer, a simple ring buffer backed by Vec<T>
428 ////////////////////////////////////////////////////////////////////////////////
431 fn enqueue(&mut self, t
: T
) {
432 let pos
= (self.start
+ self.size
) % self.buf
.len();
434 let prev
= mem
::replace(&mut self.buf
[pos
], Some(t
));
435 assert
!(prev
.is_none());
438 fn dequeue(&mut self) -> T
{
439 let start
= self.start
;
441 self.start
= (self.start
+ 1) % self.buf
.len();
442 let result
= &mut self.buf
[start
];
443 result
.take().unwrap()
446 fn size(&self) -> usize {
449 fn capacity(&self) -> usize {
454 ////////////////////////////////////////////////////////////////////////////////
455 // Queue, a simple queue to enqueue threads with (stack-allocated nodes)
456 ////////////////////////////////////////////////////////////////////////////////
459 fn enqueue(&mut self, node
: &mut Node
) -> WaitToken
{
460 let (wait_token
, signal_token
) = blocking
::tokens();
461 node
.token
= Some(signal_token
);
462 node
.next
= ptr
::null_mut();
464 if self.tail
.is_null() {
465 self.head
= node
as *mut Node
;
466 self.tail
= node
as *mut Node
;
469 (*self.tail
).next
= node
as *mut Node
;
470 self.tail
= node
as *mut Node
;
477 fn dequeue(&mut self) -> Option
<SignalToken
> {
478 if self.head
.is_null() {
481 let node
= self.head
;
482 self.head
= unsafe { (*node).next }
;
483 if self.head
.is_null() {
484 self.tail
= ptr
::null_mut();
487 (*node
).next
= ptr
::null_mut();
488 Some((*node
).token
.take().unwrap())