]>
Commit | Line | Data |
---|---|---|
60c5eb7d | 1 | use self::Blocker::*; |
1a4d82fc JJ |
2 | /// Synchronous channels/ports |
3 | /// | |
4 | /// This channel implementation differs significantly from the asynchronous | |
5 | /// implementations found next to it (oneshot/stream/share). This is an | |
6 | /// implementation of a synchronous, bounded buffer channel. | |
7 | /// | |
8 | /// Each channel is created with some amount of backing buffer, and sends will | |
9 | /// *block* until buffer space becomes available. A buffer size of 0 is valid, | |
10 | /// which means that every successful send is paired with a successful recv. | |
11 | /// | |
/// On this flavor of channel a failed send does not panic the thread: `send`
/// and `try_send` hand the undelivered message back to the caller inside an
/// `Err` instead.
15 | /// | |
16 | /// Another major difference is that send() will *always* return back the data | |
17 | /// if it couldn't be sent. This is because it is deterministically known when | |
18 | /// the data is received and when it is not received. | |
19 | /// | |
20 | /// Implementation-wise, it can all be summed up with "use a mutex plus some | |
21 | /// logic". The mutex used here is an OS native mutex, meaning that no user code | |
22 | /// is run inside of the mutex (to prevent context switching). This | |
23 | /// implementation shares almost all code for the buffered and unbuffered cases | |
24 | /// of a synchronous channel. There are a few branches for the unbuffered case, | |
25 | /// but they're mostly just relevant to blocking senders. | |
1a4d82fc | 26 | pub use self::Failure::*; |
1a4d82fc | 27 | |
9e0c209e | 28 | use core::intrinsics::abort; |
1a4d82fc | 29 | use core::mem; |
85aaf69f | 30 | use core::ptr; |
1a4d82fc | 31 | |
60c5eb7d XL |
32 | use crate::sync::atomic::{AtomicUsize, Ordering}; |
33 | use crate::sync::mpsc::blocking::{self, SignalToken, WaitToken}; | |
532ac7d7 XL |
34 | use crate::sync::{Mutex, MutexGuard}; |
35 | use crate::time::Instant; | |
1a4d82fc | 36 | |
9e0c209e SL |
/// Largest handle count tolerated by `Packet::clone_chan`; when the count read
/// before an increment exceeds this value the process is aborted.
const MAX_REFCOUNT: usize = isize::MAX as usize;
38 | ||
1a4d82fc JJ |
39 | pub struct Packet<T> { |
40 | /// Only field outside of the mutex. Just done for kicks, but mainly because | |
41 | /// the other shared channel already had the code implemented | |
85aaf69f | 42 | channels: AtomicUsize, |
1a4d82fc JJ |
43 | |
44 | lock: Mutex<State<T>>, | |
45 | } | |
46 | ||
60c5eb7d | 47 | unsafe impl<T: Send> Send for Packet<T> {} |
1a4d82fc | 48 | |
60c5eb7d | 49 | unsafe impl<T: Send> Sync for Packet<T> {} |
1a4d82fc JJ |
50 | |
51 | struct State<T> { | |
52 | disconnected: bool, // Is the channel disconnected yet? | |
53 | queue: Queue, // queue of senders waiting to send data | |
bd371182 | 54 | blocker: Blocker, // currently blocked thread on this channel |
1a4d82fc | 55 | buf: Buffer<T>, // storage for buffered messages |
c34b1796 | 56 | cap: usize, // capacity of this channel |
1a4d82fc JJ |
57 | |
58 | /// A curious flag used to indicate whether a sender failed or succeeded in | |
bd371182 | 59 | /// blocking. This is used to transmit information back to the thread that it |
1a4d82fc JJ |
60 | /// must dequeue its message from the buffer because it was not received. |
61 | /// This is only relevant in the 0-buffer case. This obviously cannot be | |
62 | /// safely constructed, but it's guaranteed to always have a valid pointer | |
63 | /// value. | |
64 | canceled: Option<&'static mut bool>, | |
65 | } | |
66 | ||
c34b1796 | 67 | unsafe impl<T: Send> Send for State<T> {} |
1a4d82fc JJ |
68 | |
69 | /// Possible flavors of threads who can be blocked on this channel. | |
70 | enum Blocker { | |
71 | BlockedSender(SignalToken), | |
72 | BlockedReceiver(SignalToken), | |
60c5eb7d | 73 | NoneBlocked, |
1a4d82fc JJ |
74 | } |
75 | ||
bd371182 | 76 | /// Simple queue for threading threads together. Nodes are stack-allocated, so |
1a4d82fc JJ |
77 | /// this structure is not safe at all |
78 | struct Queue { | |
79 | head: *mut Node, | |
80 | tail: *mut Node, | |
81 | } | |
82 | ||
83 | struct Node { | |
84 | token: Option<SignalToken>, | |
85 | next: *mut Node, | |
86 | } | |
87 | ||
88 | unsafe impl Send for Node {} | |
89 | ||
/// Fixed-size ring buffer of optional slots.
struct Buffer<T> {
    /// Backing slots; a slot is `Some` while it holds a queued message.
    buf: Vec<Option<T>>,
    /// Index of the oldest queued message.
    start: usize,
    /// Number of messages currently queued.
    size: usize,
}
96 | ||
/// Reasons a receive operation can fail.
#[derive(Debug)]
pub enum Failure {
    /// No message was available.
    Empty,
    /// The channel is disconnected and no buffered message remains.
    Disconnected,
}
102 | ||
103 | /// Atomically blocks the current thread, placing it into `slot`, unlocking `lock` | |
104 | /// in the meantime. This re-locks the mutex upon returning. | |
60c5eb7d XL |
105 | fn wait<'a, 'b, T>( |
106 | lock: &'a Mutex<State<T>>, | |
107 | mut guard: MutexGuard<'b, State<T>>, | |
108 | f: fn(SignalToken) -> Blocker, | |
109 | ) -> MutexGuard<'a, State<T>> { | |
1a4d82fc JJ |
110 | let (wait_token, signal_token) = blocking::tokens(); |
111 | match mem::replace(&mut guard.blocker, f(signal_token)) { | |
112 | NoneBlocked => {} | |
113 | _ => unreachable!(), | |
114 | } | |
60c5eb7d XL |
115 | drop(guard); // unlock |
116 | wait_token.wait(); // block | |
1a4d82fc JJ |
117 | lock.lock().unwrap() // relock |
118 | } | |
119 | ||
3157f602 | 120 | /// Same as wait, but waiting at most until `deadline`. |
60c5eb7d XL |
121 | fn wait_timeout_receiver<'a, 'b, T>( |
122 | lock: &'a Mutex<State<T>>, | |
123 | deadline: Instant, | |
124 | mut guard: MutexGuard<'b, State<T>>, | |
125 | success: &mut bool, | |
126 | ) -> MutexGuard<'a, State<T>> { | |
3157f602 XL |
127 | let (wait_token, signal_token) = blocking::tokens(); |
128 | match mem::replace(&mut guard.blocker, BlockedReceiver(signal_token)) { | |
129 | NoneBlocked => {} | |
130 | _ => unreachable!(), | |
131 | } | |
60c5eb7d XL |
132 | drop(guard); // unlock |
133 | *success = wait_token.wait_max_until(deadline); // block | |
3157f602 XL |
134 | let mut new_guard = lock.lock().unwrap(); // relock |
135 | if !*success { | |
136 | abort_selection(&mut new_guard); | |
137 | } | |
138 | new_guard | |
139 | } | |
140 | ||
416331ca | 141 | fn abort_selection<T>(guard: &mut MutexGuard<'_, State<T>>) -> bool { |
3157f602 XL |
142 | match mem::replace(&mut guard.blocker, NoneBlocked) { |
143 | NoneBlocked => true, | |
144 | BlockedSender(token) => { | |
145 | guard.blocker = BlockedSender(token); | |
146 | true | |
147 | } | |
60c5eb7d XL |
148 | BlockedReceiver(token) => { |
149 | drop(token); | |
150 | false | |
151 | } | |
3157f602 XL |
152 | } |
153 | } | |
154 | ||
1a4d82fc | 155 | /// Wakes up a thread, dropping the lock at the correct time |
532ac7d7 | 156 | fn wakeup<T>(token: SignalToken, guard: MutexGuard<'_, State<T>>) { |
bd371182 | 157 | // We need to be careful to wake up the waiting thread *outside* of the mutex |
1a4d82fc JJ |
158 | // in case it incurs a context switch. |
159 | drop(guard); | |
160 | token.signal(); | |
161 | } | |
162 | ||
c34b1796 | 163 | impl<T> Packet<T> { |
416331ca | 164 | pub fn new(capacity: usize) -> Packet<T> { |
1a4d82fc | 165 | Packet { |
85aaf69f | 166 | channels: AtomicUsize::new(1), |
1a4d82fc JJ |
167 | lock: Mutex::new(State { |
168 | disconnected: false, | |
169 | blocker: NoneBlocked, | |
416331ca | 170 | cap: capacity, |
1a4d82fc | 171 | canceled: None, |
60c5eb7d | 172 | queue: Queue { head: ptr::null_mut(), tail: ptr::null_mut() }, |
1a4d82fc | 173 | buf: Buffer { |
60c5eb7d | 174 | buf: (0..capacity + if capacity == 0 { 1 } else { 0 }).map(|_| None).collect(), |
1a4d82fc JJ |
175 | start: 0, |
176 | size: 0, | |
177 | }, | |
178 | }), | |
179 | } | |
180 | } | |
181 | ||
182 | // wait until a send slot is available, returning locked access to | |
183 | // the channel state. | |
532ac7d7 | 184 | fn acquire_send_slot(&self) -> MutexGuard<'_, State<T>> { |
85aaf69f | 185 | let mut node = Node { token: None, next: ptr::null_mut() }; |
1a4d82fc JJ |
186 | loop { |
187 | let mut guard = self.lock.lock().unwrap(); | |
188 | // are we ready to go? | |
416331ca | 189 | if guard.disconnected || guard.buf.size() < guard.buf.capacity() { |
1a4d82fc JJ |
190 | return guard; |
191 | } | |
192 | // no room; actually block | |
193 | let wait_token = guard.queue.enqueue(&mut node); | |
194 | drop(guard); | |
195 | wait_token.wait(); | |
196 | } | |
197 | } | |
198 | ||
199 | pub fn send(&self, t: T) -> Result<(), T> { | |
200 | let mut guard = self.acquire_send_slot(); | |
60c5eb7d XL |
201 | if guard.disconnected { |
202 | return Err(t); | |
203 | } | |
1a4d82fc JJ |
204 | guard.buf.enqueue(t); |
205 | ||
206 | match mem::replace(&mut guard.blocker, NoneBlocked) { | |
207 | // if our capacity is 0, then we need to wait for a receiver to be | |
208 | // available to take our data. After waiting, we check again to make | |
209 | // sure the port didn't go away in the meantime. If it did, we need | |
210 | // to hand back our data. | |
211 | NoneBlocked if guard.cap == 0 => { | |
212 | let mut canceled = false; | |
213 | assert!(guard.canceled.is_none()); | |
214 | guard.canceled = Some(unsafe { mem::transmute(&mut canceled) }); | |
215 | let mut guard = wait(&self.lock, guard, BlockedSender); | |
60c5eb7d | 216 | if canceled { Err(guard.buf.dequeue()) } else { Ok(()) } |
1a4d82fc JJ |
217 | } |
218 | ||
219 | // success, we buffered some data | |
220 | NoneBlocked => Ok(()), | |
221 | ||
222 | // success, someone's about to receive our buffered data. | |
60c5eb7d XL |
223 | BlockedReceiver(token) => { |
224 | wakeup(token, guard); | |
225 | Ok(()) | |
226 | } | |
1a4d82fc JJ |
227 | |
228 | BlockedSender(..) => panic!("lolwut"), | |
229 | } | |
230 | } | |
231 | ||
232 | pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> { | |
233 | let mut guard = self.lock.lock().unwrap(); | |
234 | if guard.disconnected { | |
235 | Err(super::TrySendError::Disconnected(t)) | |
416331ca | 236 | } else if guard.buf.size() == guard.buf.capacity() { |
1a4d82fc JJ |
237 | Err(super::TrySendError::Full(t)) |
238 | } else if guard.cap == 0 { | |
239 | // With capacity 0, even though we have buffer space we can't | |
240 | // transfer the data unless there's a receiver waiting. | |
241 | match mem::replace(&mut guard.blocker, NoneBlocked) { | |
242 | NoneBlocked => Err(super::TrySendError::Full(t)), | |
243 | BlockedSender(..) => unreachable!(), | |
244 | BlockedReceiver(token) => { | |
245 | guard.buf.enqueue(t); | |
246 | wakeup(token, guard); | |
247 | Ok(()) | |
248 | } | |
249 | } | |
250 | } else { | |
251 | // If the buffer has some space and the capacity isn't 0, then we | |
252 | // just enqueue the data for later retrieval, ensuring to wake up | |
253 | // any blocked receiver if there is one. | |
416331ca | 254 | assert!(guard.buf.size() < guard.buf.capacity()); |
1a4d82fc JJ |
255 | guard.buf.enqueue(t); |
256 | match mem::replace(&mut guard.blocker, NoneBlocked) { | |
257 | BlockedReceiver(token) => wakeup(token, guard), | |
258 | NoneBlocked => {} | |
259 | BlockedSender(..) => unreachable!(), | |
260 | } | |
261 | Ok(()) | |
262 | } | |
263 | } | |
264 | ||
265 | // Receives a message from this channel | |
266 | // | |
267 | // When reading this, remember that there can only ever be one receiver at | |
268 | // time. | |
3157f602 | 269 | pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> { |
1a4d82fc JJ |
270 | let mut guard = self.lock.lock().unwrap(); |
271 | ||
3157f602 XL |
272 | let mut woke_up_after_waiting = false; |
273 | // Wait for the buffer to have something in it. No need for a | |
274 | // while loop because we're the only receiver. | |
1a4d82fc | 275 | if !guard.disconnected && guard.buf.size() == 0 { |
3157f602 | 276 | if let Some(deadline) = deadline { |
60c5eb7d XL |
277 | guard = |
278 | wait_timeout_receiver(&self.lock, deadline, guard, &mut woke_up_after_waiting); | |
3157f602 XL |
279 | } else { |
280 | guard = wait(&self.lock, guard, BlockedReceiver); | |
281 | woke_up_after_waiting = true; | |
282 | } | |
283 | } | |
284 | ||
0731742a | 285 | // N.B., channel could be disconnected while waiting, so the order of |
3157f602 XL |
286 | // these conditionals is important. |
287 | if guard.disconnected && guard.buf.size() == 0 { | |
288 | return Err(Disconnected); | |
1a4d82fc | 289 | } |
1a4d82fc JJ |
290 | |
291 | // Pick up the data, wake up our neighbors, and carry on | |
3157f602 XL |
292 | assert!(guard.buf.size() > 0 || (deadline.is_some() && !woke_up_after_waiting)); |
293 | ||
60c5eb7d XL |
294 | if guard.buf.size() == 0 { |
295 | return Err(Empty); | |
296 | } | |
3157f602 | 297 | |
1a4d82fc | 298 | let ret = guard.buf.dequeue(); |
3157f602 | 299 | self.wakeup_senders(woke_up_after_waiting, guard); |
e9174d1e | 300 | Ok(ret) |
1a4d82fc JJ |
301 | } |
302 | ||
303 | pub fn try_recv(&self) -> Result<T, Failure> { | |
304 | let mut guard = self.lock.lock().unwrap(); | |
305 | ||
306 | // Easy cases first | |
60c5eb7d XL |
307 | if guard.disconnected && guard.buf.size() == 0 { |
308 | return Err(Disconnected); | |
309 | } | |
310 | if guard.buf.size() == 0 { | |
311 | return Err(Empty); | |
312 | } | |
1a4d82fc JJ |
313 | |
314 | // Be sure to wake up neighbors | |
315 | let ret = Ok(guard.buf.dequeue()); | |
316 | self.wakeup_senders(false, guard); | |
e9174d1e | 317 | ret |
1a4d82fc JJ |
318 | } |
319 | ||
320 | // Wake up pending senders after some data has been received | |
321 | // | |
322 | // * `waited` - flag if the receiver blocked to receive some data, or if it | |
323 | // just picked up some data on the way out | |
324 | // * `guard` - the lock guard that is held over this channel's lock | |
532ac7d7 | 325 | fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<'_, State<T>>) { |
1a4d82fc JJ |
326 | let pending_sender1: Option<SignalToken> = guard.queue.dequeue(); |
327 | ||
328 | // If this is a no-buffer channel (cap == 0), then if we didn't wait we | |
329 | // need to ACK the sender. If we waited, then the sender waking us up | |
330 | // was already the ACK. | |
331 | let pending_sender2 = if guard.cap == 0 && !waited { | |
332 | match mem::replace(&mut guard.blocker, NoneBlocked) { | |
333 | NoneBlocked => None, | |
334 | BlockedReceiver(..) => unreachable!(), | |
335 | BlockedSender(token) => { | |
336 | guard.canceled.take(); | |
337 | Some(token) | |
338 | } | |
339 | } | |
340 | } else { | |
341 | None | |
342 | }; | |
343 | mem::drop(guard); | |
344 | ||
bd371182 | 345 | // only outside of the lock do we wake up the pending threads |
1a4d82fc JJ |
346 | pending_sender1.map(|t| t.signal()); |
347 | pending_sender2.map(|t| t.signal()); | |
348 | } | |
349 | ||
350 | // Prepares this shared packet for a channel clone, essentially just bumping | |
351 | // a refcount. | |
352 | pub fn clone_chan(&self) { | |
9e0c209e SL |
353 | let old_count = self.channels.fetch_add(1, Ordering::SeqCst); |
354 | ||
355 | // See comments on Arc::clone() on why we do this (for `mem::forget`). | |
356 | if old_count > MAX_REFCOUNT { | |
357 | unsafe { | |
358 | abort(); | |
359 | } | |
360 | } | |
1a4d82fc JJ |
361 | } |
362 | ||
363 | pub fn drop_chan(&self) { | |
364 | // Only flag the channel as disconnected if we're the last channel | |
365 | match self.channels.fetch_sub(1, Ordering::SeqCst) { | |
366 | 1 => {} | |
60c5eb7d | 367 | _ => return, |
1a4d82fc JJ |
368 | } |
369 | ||
370 | // Not much to do other than wake up a receiver if one's there | |
371 | let mut guard = self.lock.lock().unwrap(); | |
60c5eb7d XL |
372 | if guard.disconnected { |
373 | return; | |
374 | } | |
1a4d82fc JJ |
375 | guard.disconnected = true; |
376 | match mem::replace(&mut guard.blocker, NoneBlocked) { | |
377 | NoneBlocked => {} | |
378 | BlockedSender(..) => unreachable!(), | |
379 | BlockedReceiver(token) => wakeup(token, guard), | |
380 | } | |
381 | } | |
382 | ||
383 | pub fn drop_port(&self) { | |
384 | let mut guard = self.lock.lock().unwrap(); | |
385 | ||
60c5eb7d XL |
386 | if guard.disconnected { |
387 | return; | |
388 | } | |
1a4d82fc JJ |
389 | guard.disconnected = true; |
390 | ||
391 | // If the capacity is 0, then the sender may want its data back after | |
392 | // we're disconnected. Otherwise it's now our responsibility to destroy | |
393 | // the buffered data. As with many other portions of this code, this | |
394 | // needs to be careful to destroy the data *outside* of the lock to | |
395 | // prevent deadlock. | |
60c5eb7d XL |
396 | let _data = if guard.cap != 0 { mem::take(&mut guard.buf.buf) } else { Vec::new() }; |
397 | let mut queue = | |
398 | mem::replace(&mut guard.queue, Queue { head: ptr::null_mut(), tail: ptr::null_mut() }); | |
1a4d82fc JJ |
399 | |
400 | let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) { | |
401 | NoneBlocked => None, | |
402 | BlockedSender(token) => { | |
403 | *guard.canceled.take().unwrap() = true; | |
404 | Some(token) | |
405 | } | |
406 | BlockedReceiver(..) => unreachable!(), | |
407 | }; | |
408 | mem::drop(guard); | |
409 | ||
60c5eb7d XL |
410 | while let Some(token) = queue.dequeue() { |
411 | token.signal(); | |
412 | } | |
1a4d82fc JJ |
413 | waiter.map(|t| t.signal()); |
414 | } | |
1a4d82fc JJ |
415 | } |
416 | ||
c34b1796 | 417 | impl<T> Drop for Packet<T> { |
1a4d82fc JJ |
418 | fn drop(&mut self) { |
419 | assert_eq!(self.channels.load(Ordering::SeqCst), 0); | |
420 | let mut guard = self.lock.lock().unwrap(); | |
421 | assert!(guard.queue.dequeue().is_none()); | |
422 | assert!(guard.canceled.is_none()); | |
423 | } | |
424 | } | |
425 | ||
1a4d82fc JJ |
426 | //////////////////////////////////////////////////////////////////////////////// |
427 | // Buffer, a simple ring buffer backed by Vec<T> | |
428 | //////////////////////////////////////////////////////////////////////////////// | |
429 | ||
430 | impl<T> Buffer<T> { | |
431 | fn enqueue(&mut self, t: T) { | |
432 | let pos = (self.start + self.size) % self.buf.len(); | |
433 | self.size += 1; | |
434 | let prev = mem::replace(&mut self.buf[pos], Some(t)); | |
435 | assert!(prev.is_none()); | |
436 | } | |
437 | ||
438 | fn dequeue(&mut self) -> T { | |
439 | let start = self.start; | |
440 | self.size -= 1; | |
441 | self.start = (self.start + 1) % self.buf.len(); | |
442 | let result = &mut self.buf[start]; | |
443 | result.take().unwrap() | |
444 | } | |
445 | ||
60c5eb7d XL |
446 | fn size(&self) -> usize { |
447 | self.size | |
448 | } | |
449 | fn capacity(&self) -> usize { | |
450 | self.buf.len() | |
451 | } | |
1a4d82fc JJ |
452 | } |
453 | ||
454 | //////////////////////////////////////////////////////////////////////////////// | |
bd371182 | 455 | // Queue, a simple queue to enqueue threads with (stack-allocated nodes) |
1a4d82fc JJ |
456 | //////////////////////////////////////////////////////////////////////////////// |
457 | ||
458 | impl Queue { | |
459 | fn enqueue(&mut self, node: &mut Node) -> WaitToken { | |
460 | let (wait_token, signal_token) = blocking::tokens(); | |
461 | node.token = Some(signal_token); | |
85aaf69f | 462 | node.next = ptr::null_mut(); |
1a4d82fc JJ |
463 | |
464 | if self.tail.is_null() { | |
465 | self.head = node as *mut Node; | |
466 | self.tail = node as *mut Node; | |
467 | } else { | |
468 | unsafe { | |
469 | (*self.tail).next = node as *mut Node; | |
470 | self.tail = node as *mut Node; | |
471 | } | |
472 | } | |
473 | ||
474 | wait_token | |
475 | } | |
476 | ||
477 | fn dequeue(&mut self) -> Option<SignalToken> { | |
478 | if self.head.is_null() { | |
60c5eb7d | 479 | return None; |
1a4d82fc JJ |
480 | } |
481 | let node = self.head; | |
482 | self.head = unsafe { (*node).next }; | |
483 | if self.head.is_null() { | |
85aaf69f | 484 | self.tail = ptr::null_mut(); |
1a4d82fc JJ |
485 | } |
486 | unsafe { | |
85aaf69f | 487 | (*node).next = ptr::null_mut(); |
1a4d82fc JJ |
488 | Some((*node).token.take().unwrap()) |
489 | } | |
490 | } | |
491 | } |