]> git.proxmox.com Git - rustc.git/blame - src/libstd/sync/mpsc/sync.rs
New upstream version 1.44.1+dfsg1
[rustc.git] / src / libstd / sync / mpsc / sync.rs
CommitLineData
60c5eb7d 1use self::Blocker::*;
1a4d82fc
JJ
2/// Synchronous channels/ports
3///
4/// This channel implementation differs significantly from the asynchronous
5/// implementations found next to it (oneshot/stream/share). This is an
6/// implementation of a synchronous, bounded buffer channel.
7///
8/// Each channel is created with some amount of backing buffer, and sends will
9/// *block* until buffer space becomes available. A buffer size of 0 is valid,
10/// which means that every successful send is paired with a successful recv.
11///
12/// This flavor of channels defines a new `send_opt` method for channels which
bd371182 13/// is the method by which a message is sent but the thread does not panic if it
1a4d82fc
JJ
14/// cannot be delivered.
15///
16/// Another major difference is that send() will *always* return back the data
17/// if it couldn't be sent. This is because it is deterministically known when
18/// the data is received and when it is not received.
19///
20/// Implementation-wise, it can all be summed up with "use a mutex plus some
21/// logic". The mutex used here is an OS native mutex, meaning that no user code
22/// is run inside of the mutex (to prevent context switching). This
23/// implementation shares almost all code for the buffered and unbuffered cases
24/// of a synchronous channel. There are a few branches for the unbuffered case,
25/// but they're mostly just relevant to blocking senders.
1a4d82fc 26pub use self::Failure::*;
1a4d82fc 27
9e0c209e 28use core::intrinsics::abort;
1a4d82fc 29use core::mem;
85aaf69f 30use core::ptr;
1a4d82fc 31
60c5eb7d
XL
32use crate::sync::atomic::{AtomicUsize, Ordering};
33use crate::sync::mpsc::blocking::{self, SignalToken, WaitToken};
532ac7d7
XL
34use crate::sync::{Mutex, MutexGuard};
35use crate::time::Instant;
1a4d82fc 36
9e0c209e
SL
37const MAX_REFCOUNT: usize = (isize::MAX) as usize;
38
1a4d82fc
JJ
39pub struct Packet<T> {
40 /// Only field outside of the mutex. Just done for kicks, but mainly because
41 /// the other shared channel already had the code implemented
85aaf69f 42 channels: AtomicUsize,
1a4d82fc
JJ
43
44 lock: Mutex<State<T>>,
45}
46
60c5eb7d 47unsafe impl<T: Send> Send for Packet<T> {}
1a4d82fc 48
60c5eb7d 49unsafe impl<T: Send> Sync for Packet<T> {}
1a4d82fc
JJ
50
51struct State<T> {
52 disconnected: bool, // Is the channel disconnected yet?
53 queue: Queue, // queue of senders waiting to send data
bd371182 54 blocker: Blocker, // currently blocked thread on this channel
1a4d82fc 55 buf: Buffer<T>, // storage for buffered messages
c34b1796 56 cap: usize, // capacity of this channel
1a4d82fc
JJ
57
58 /// A curious flag used to indicate whether a sender failed or succeeded in
bd371182 59 /// blocking. This is used to transmit information back to the thread that it
1a4d82fc
JJ
60 /// must dequeue its message from the buffer because it was not received.
61 /// This is only relevant in the 0-buffer case. This obviously cannot be
62 /// safely constructed, but it's guaranteed to always have a valid pointer
63 /// value.
64 canceled: Option<&'static mut bool>,
65}
66
c34b1796 67unsafe impl<T: Send> Send for State<T> {}
1a4d82fc
JJ
68
69/// Possible flavors of threads who can be blocked on this channel.
70enum Blocker {
71 BlockedSender(SignalToken),
72 BlockedReceiver(SignalToken),
60c5eb7d 73 NoneBlocked,
1a4d82fc
JJ
74}
75
bd371182 76/// Simple queue for threading threads together. Nodes are stack-allocated, so
1a4d82fc
JJ
77/// this structure is not safe at all
78struct Queue {
79 head: *mut Node,
80 tail: *mut Node,
81}
82
83struct Node {
84 token: Option<SignalToken>,
85 next: *mut Node,
86}
87
88unsafe impl Send for Node {}
89
90/// A simple ring-buffer
91struct Buffer<T> {
92 buf: Vec<Option<T>>,
c34b1796
AL
93 start: usize,
94 size: usize,
1a4d82fc
JJ
95}
96
85aaf69f 97#[derive(Debug)]
1a4d82fc
JJ
98pub enum Failure {
99 Empty,
100 Disconnected,
101}
102
103/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
104/// in the meantime. This re-locks the mutex upon returning.
60c5eb7d
XL
105fn wait<'a, 'b, T>(
106 lock: &'a Mutex<State<T>>,
107 mut guard: MutexGuard<'b, State<T>>,
108 f: fn(SignalToken) -> Blocker,
109) -> MutexGuard<'a, State<T>> {
1a4d82fc
JJ
110 let (wait_token, signal_token) = blocking::tokens();
111 match mem::replace(&mut guard.blocker, f(signal_token)) {
112 NoneBlocked => {}
113 _ => unreachable!(),
114 }
60c5eb7d
XL
115 drop(guard); // unlock
116 wait_token.wait(); // block
1a4d82fc
JJ
117 lock.lock().unwrap() // relock
118}
119
3157f602 120/// Same as wait, but waiting at most until `deadline`.
60c5eb7d
XL
121fn wait_timeout_receiver<'a, 'b, T>(
122 lock: &'a Mutex<State<T>>,
123 deadline: Instant,
124 mut guard: MutexGuard<'b, State<T>>,
125 success: &mut bool,
126) -> MutexGuard<'a, State<T>> {
3157f602
XL
127 let (wait_token, signal_token) = blocking::tokens();
128 match mem::replace(&mut guard.blocker, BlockedReceiver(signal_token)) {
129 NoneBlocked => {}
130 _ => unreachable!(),
131 }
60c5eb7d
XL
132 drop(guard); // unlock
133 *success = wait_token.wait_max_until(deadline); // block
3157f602
XL
134 let mut new_guard = lock.lock().unwrap(); // relock
135 if !*success {
136 abort_selection(&mut new_guard);
137 }
138 new_guard
139}
140
416331ca 141fn abort_selection<T>(guard: &mut MutexGuard<'_, State<T>>) -> bool {
3157f602
XL
142 match mem::replace(&mut guard.blocker, NoneBlocked) {
143 NoneBlocked => true,
144 BlockedSender(token) => {
145 guard.blocker = BlockedSender(token);
146 true
147 }
60c5eb7d
XL
148 BlockedReceiver(token) => {
149 drop(token);
150 false
151 }
3157f602
XL
152 }
153}
154
1a4d82fc 155/// Wakes up a thread, dropping the lock at the correct time
532ac7d7 156fn wakeup<T>(token: SignalToken, guard: MutexGuard<'_, State<T>>) {
bd371182 157 // We need to be careful to wake up the waiting thread *outside* of the mutex
1a4d82fc
JJ
158 // in case it incurs a context switch.
159 drop(guard);
160 token.signal();
161}
162
c34b1796 163impl<T> Packet<T> {
416331ca 164 pub fn new(capacity: usize) -> Packet<T> {
1a4d82fc 165 Packet {
85aaf69f 166 channels: AtomicUsize::new(1),
1a4d82fc
JJ
167 lock: Mutex::new(State {
168 disconnected: false,
169 blocker: NoneBlocked,
416331ca 170 cap: capacity,
1a4d82fc 171 canceled: None,
60c5eb7d 172 queue: Queue { head: ptr::null_mut(), tail: ptr::null_mut() },
1a4d82fc 173 buf: Buffer {
60c5eb7d 174 buf: (0..capacity + if capacity == 0 { 1 } else { 0 }).map(|_| None).collect(),
1a4d82fc
JJ
175 start: 0,
176 size: 0,
177 },
178 }),
179 }
180 }
181
182 // wait until a send slot is available, returning locked access to
183 // the channel state.
532ac7d7 184 fn acquire_send_slot(&self) -> MutexGuard<'_, State<T>> {
85aaf69f 185 let mut node = Node { token: None, next: ptr::null_mut() };
1a4d82fc
JJ
186 loop {
187 let mut guard = self.lock.lock().unwrap();
188 // are we ready to go?
416331ca 189 if guard.disconnected || guard.buf.size() < guard.buf.capacity() {
1a4d82fc
JJ
190 return guard;
191 }
192 // no room; actually block
193 let wait_token = guard.queue.enqueue(&mut node);
194 drop(guard);
195 wait_token.wait();
196 }
197 }
198
199 pub fn send(&self, t: T) -> Result<(), T> {
200 let mut guard = self.acquire_send_slot();
60c5eb7d
XL
201 if guard.disconnected {
202 return Err(t);
203 }
1a4d82fc
JJ
204 guard.buf.enqueue(t);
205
206 match mem::replace(&mut guard.blocker, NoneBlocked) {
207 // if our capacity is 0, then we need to wait for a receiver to be
208 // available to take our data. After waiting, we check again to make
209 // sure the port didn't go away in the meantime. If it did, we need
210 // to hand back our data.
211 NoneBlocked if guard.cap == 0 => {
212 let mut canceled = false;
213 assert!(guard.canceled.is_none());
214 guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
215 let mut guard = wait(&self.lock, guard, BlockedSender);
60c5eb7d 216 if canceled { Err(guard.buf.dequeue()) } else { Ok(()) }
1a4d82fc
JJ
217 }
218
219 // success, we buffered some data
220 NoneBlocked => Ok(()),
221
222 // success, someone's about to receive our buffered data.
60c5eb7d
XL
223 BlockedReceiver(token) => {
224 wakeup(token, guard);
225 Ok(())
226 }
1a4d82fc
JJ
227
228 BlockedSender(..) => panic!("lolwut"),
229 }
230 }
231
232 pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
233 let mut guard = self.lock.lock().unwrap();
234 if guard.disconnected {
235 Err(super::TrySendError::Disconnected(t))
416331ca 236 } else if guard.buf.size() == guard.buf.capacity() {
1a4d82fc
JJ
237 Err(super::TrySendError::Full(t))
238 } else if guard.cap == 0 {
239 // With capacity 0, even though we have buffer space we can't
240 // transfer the data unless there's a receiver waiting.
241 match mem::replace(&mut guard.blocker, NoneBlocked) {
242 NoneBlocked => Err(super::TrySendError::Full(t)),
243 BlockedSender(..) => unreachable!(),
244 BlockedReceiver(token) => {
245 guard.buf.enqueue(t);
246 wakeup(token, guard);
247 Ok(())
248 }
249 }
250 } else {
251 // If the buffer has some space and the capacity isn't 0, then we
252 // just enqueue the data for later retrieval, ensuring to wake up
253 // any blocked receiver if there is one.
416331ca 254 assert!(guard.buf.size() < guard.buf.capacity());
1a4d82fc
JJ
255 guard.buf.enqueue(t);
256 match mem::replace(&mut guard.blocker, NoneBlocked) {
257 BlockedReceiver(token) => wakeup(token, guard),
258 NoneBlocked => {}
259 BlockedSender(..) => unreachable!(),
260 }
261 Ok(())
262 }
263 }
264
265 // Receives a message from this channel
266 //
267 // When reading this, remember that there can only ever be one receiver at
268 // time.
3157f602 269 pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
1a4d82fc
JJ
270 let mut guard = self.lock.lock().unwrap();
271
3157f602
XL
272 let mut woke_up_after_waiting = false;
273 // Wait for the buffer to have something in it. No need for a
274 // while loop because we're the only receiver.
1a4d82fc 275 if !guard.disconnected && guard.buf.size() == 0 {
3157f602 276 if let Some(deadline) = deadline {
60c5eb7d
XL
277 guard =
278 wait_timeout_receiver(&self.lock, deadline, guard, &mut woke_up_after_waiting);
3157f602
XL
279 } else {
280 guard = wait(&self.lock, guard, BlockedReceiver);
281 woke_up_after_waiting = true;
282 }
283 }
284
0731742a 285 // N.B., channel could be disconnected while waiting, so the order of
3157f602
XL
286 // these conditionals is important.
287 if guard.disconnected && guard.buf.size() == 0 {
288 return Err(Disconnected);
1a4d82fc 289 }
1a4d82fc
JJ
290
291 // Pick up the data, wake up our neighbors, and carry on
3157f602
XL
292 assert!(guard.buf.size() > 0 || (deadline.is_some() && !woke_up_after_waiting));
293
60c5eb7d
XL
294 if guard.buf.size() == 0 {
295 return Err(Empty);
296 }
3157f602 297
1a4d82fc 298 let ret = guard.buf.dequeue();
3157f602 299 self.wakeup_senders(woke_up_after_waiting, guard);
e9174d1e 300 Ok(ret)
1a4d82fc
JJ
301 }
302
303 pub fn try_recv(&self) -> Result<T, Failure> {
304 let mut guard = self.lock.lock().unwrap();
305
306 // Easy cases first
60c5eb7d
XL
307 if guard.disconnected && guard.buf.size() == 0 {
308 return Err(Disconnected);
309 }
310 if guard.buf.size() == 0 {
311 return Err(Empty);
312 }
1a4d82fc
JJ
313
314 // Be sure to wake up neighbors
315 let ret = Ok(guard.buf.dequeue());
316 self.wakeup_senders(false, guard);
e9174d1e 317 ret
1a4d82fc
JJ
318 }
319
320 // Wake up pending senders after some data has been received
321 //
322 // * `waited` - flag if the receiver blocked to receive some data, or if it
323 // just picked up some data on the way out
324 // * `guard` - the lock guard that is held over this channel's lock
532ac7d7 325 fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<'_, State<T>>) {
1a4d82fc
JJ
326 let pending_sender1: Option<SignalToken> = guard.queue.dequeue();
327
328 // If this is a no-buffer channel (cap == 0), then if we didn't wait we
329 // need to ACK the sender. If we waited, then the sender waking us up
330 // was already the ACK.
331 let pending_sender2 = if guard.cap == 0 && !waited {
332 match mem::replace(&mut guard.blocker, NoneBlocked) {
333 NoneBlocked => None,
334 BlockedReceiver(..) => unreachable!(),
335 BlockedSender(token) => {
336 guard.canceled.take();
337 Some(token)
338 }
339 }
340 } else {
341 None
342 };
343 mem::drop(guard);
344
bd371182 345 // only outside of the lock do we wake up the pending threads
1a4d82fc
JJ
346 pending_sender1.map(|t| t.signal());
347 pending_sender2.map(|t| t.signal());
348 }
349
350 // Prepares this shared packet for a channel clone, essentially just bumping
351 // a refcount.
352 pub fn clone_chan(&self) {
9e0c209e
SL
353 let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
354
355 // See comments on Arc::clone() on why we do this (for `mem::forget`).
356 if old_count > MAX_REFCOUNT {
357 unsafe {
358 abort();
359 }
360 }
1a4d82fc
JJ
361 }
362
363 pub fn drop_chan(&self) {
364 // Only flag the channel as disconnected if we're the last channel
365 match self.channels.fetch_sub(1, Ordering::SeqCst) {
366 1 => {}
60c5eb7d 367 _ => return,
1a4d82fc
JJ
368 }
369
370 // Not much to do other than wake up a receiver if one's there
371 let mut guard = self.lock.lock().unwrap();
60c5eb7d
XL
372 if guard.disconnected {
373 return;
374 }
1a4d82fc
JJ
375 guard.disconnected = true;
376 match mem::replace(&mut guard.blocker, NoneBlocked) {
377 NoneBlocked => {}
378 BlockedSender(..) => unreachable!(),
379 BlockedReceiver(token) => wakeup(token, guard),
380 }
381 }
382
383 pub fn drop_port(&self) {
384 let mut guard = self.lock.lock().unwrap();
385
60c5eb7d
XL
386 if guard.disconnected {
387 return;
388 }
1a4d82fc
JJ
389 guard.disconnected = true;
390
391 // If the capacity is 0, then the sender may want its data back after
392 // we're disconnected. Otherwise it's now our responsibility to destroy
393 // the buffered data. As with many other portions of this code, this
394 // needs to be careful to destroy the data *outside* of the lock to
395 // prevent deadlock.
60c5eb7d
XL
396 let _data = if guard.cap != 0 { mem::take(&mut guard.buf.buf) } else { Vec::new() };
397 let mut queue =
398 mem::replace(&mut guard.queue, Queue { head: ptr::null_mut(), tail: ptr::null_mut() });
1a4d82fc
JJ
399
400 let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
401 NoneBlocked => None,
402 BlockedSender(token) => {
403 *guard.canceled.take().unwrap() = true;
404 Some(token)
405 }
406 BlockedReceiver(..) => unreachable!(),
407 };
408 mem::drop(guard);
409
60c5eb7d
XL
410 while let Some(token) = queue.dequeue() {
411 token.signal();
412 }
1a4d82fc
JJ
413 waiter.map(|t| t.signal());
414 }
1a4d82fc
JJ
415}
416
c34b1796 417impl<T> Drop for Packet<T> {
1a4d82fc
JJ
418 fn drop(&mut self) {
419 assert_eq!(self.channels.load(Ordering::SeqCst), 0);
420 let mut guard = self.lock.lock().unwrap();
421 assert!(guard.queue.dequeue().is_none());
422 assert!(guard.canceled.is_none());
423 }
424}
425
1a4d82fc
JJ
426////////////////////////////////////////////////////////////////////////////////
427// Buffer, a simple ring buffer backed by Vec<T>
428////////////////////////////////////////////////////////////////////////////////
429
430impl<T> Buffer<T> {
431 fn enqueue(&mut self, t: T) {
432 let pos = (self.start + self.size) % self.buf.len();
433 self.size += 1;
434 let prev = mem::replace(&mut self.buf[pos], Some(t));
435 assert!(prev.is_none());
436 }
437
438 fn dequeue(&mut self) -> T {
439 let start = self.start;
440 self.size -= 1;
441 self.start = (self.start + 1) % self.buf.len();
442 let result = &mut self.buf[start];
443 result.take().unwrap()
444 }
445
60c5eb7d
XL
446 fn size(&self) -> usize {
447 self.size
448 }
449 fn capacity(&self) -> usize {
450 self.buf.len()
451 }
1a4d82fc
JJ
452}
453
454////////////////////////////////////////////////////////////////////////////////
bd371182 455// Queue, a simple queue to enqueue threads with (stack-allocated nodes)
1a4d82fc
JJ
456////////////////////////////////////////////////////////////////////////////////
457
458impl Queue {
459 fn enqueue(&mut self, node: &mut Node) -> WaitToken {
460 let (wait_token, signal_token) = blocking::tokens();
461 node.token = Some(signal_token);
85aaf69f 462 node.next = ptr::null_mut();
1a4d82fc
JJ
463
464 if self.tail.is_null() {
465 self.head = node as *mut Node;
466 self.tail = node as *mut Node;
467 } else {
468 unsafe {
469 (*self.tail).next = node as *mut Node;
470 self.tail = node as *mut Node;
471 }
472 }
473
474 wait_token
475 }
476
477 fn dequeue(&mut self) -> Option<SignalToken> {
478 if self.head.is_null() {
60c5eb7d 479 return None;
1a4d82fc
JJ
480 }
481 let node = self.head;
482 self.head = unsafe { (*node).next };
483 if self.head.is_null() {
85aaf69f 484 self.tail = ptr::null_mut();
1a4d82fc
JJ
485 }
486 unsafe {
85aaf69f 487 (*node).next = ptr::null_mut();
1a4d82fc
JJ
488 Some((*node).token.take().unwrap())
489 }
490 }
491}