]>
Commit | Line | Data |
---|---|---|
1a4d82fc JJ |
1 | /// Stream channels |
2 | /// | |
3 | /// This is the flavor of channels which are optimized for one sender and one | |
4 | /// receiver. The sender will be upgraded to a shared channel if the channel is | |
5 | /// cloned. | |
6 | /// | |
7 | /// High level implementation details can be found in the comment of the parent | |
8 | /// module. | |
1a4d82fc | 9 | pub use self::Failure::*; |
1a4d82fc | 10 | use self::Message::*; |
60c5eb7d | 11 | pub use self::UpgradeResult::*; |
1a4d82fc | 12 | |
1a4d82fc | 13 | use core::cmp; |
85aaf69f | 14 | use core::isize; |
532ac7d7 XL |
15 | |
16 | use crate::cell::UnsafeCell; | |
17 | use crate::ptr; | |
18 | use crate::thread; | |
19 | use crate::time::Instant; | |
20 | ||
60c5eb7d | 21 | use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; |
532ac7d7 XL |
22 | use crate::sync::mpsc::blocking::{self, SignalToken}; |
23 | use crate::sync::mpsc::spsc_queue as spsc; | |
60c5eb7d | 24 | use crate::sync::mpsc::Receiver; |
1a4d82fc | 25 | |
// Sentinel stored into `cnt` once either half of the channel disconnects.
// isize::MIN is unreachable by ordinary counting; code paths that `fetch_add`
// or `swap` over it immediately re-store DISCONNECTED to repair the sentinel.
const DISCONNECTED: isize = isize::MIN;
// Upper bound on the receiver-local `steals` counter. Bounding it prevents
// eventual overflow of `steals`/`cnt` (see the long comment in `try_recv`);
// the bound is kept tiny under test so the rebalancing path gets exercised.
#[cfg(test)]
const MAX_STEALS: isize = 5;
#[cfg(not(test))]
const MAX_STEALS: isize = 1 << 20;
1a4d82fc JJ |
31 | |
/// Shared state for a stream (one-sender) channel.
///
/// All state lives inside the SPSC queue itself: the messages are queue
/// nodes, sender-side bookkeeping is the queue's `ProducerAddition`, and
/// receiver-side bookkeeping is its `ConsumerAddition`.
pub struct Packet<T> {
    // internal queue for all messages
    queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
}
1a4d82fc | 36 | |
// Sender-side bookkeeping. `cnt` is the channel's protocol word: a value
// >= 0 counts queued messages, -1 means a receiver is blocked (its wakeup
// token is parked in `to_wake` — see `do_send`/`decrement`), and
// DISCONNECTED is the permanent "either side hung up" sentinel.
struct ProducerAddition {
    cnt: AtomicIsize,     // How many items are on this channel
    to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up

    port_dropped: AtomicBool, // flag if the channel has been destroyed.
}
43 | ||
// Receiver-side bookkeeping. `steals` is plain data in an `UnsafeCell`
// rather than an atomic because, as used in this module, it is only read
// and written from receiver-side operations (`try_recv`, `decrement`,
// `drop_port`, `abort_selection`).
struct ConsumerAddition {
    steals: UnsafeCell<isize>, // How many times has a port received without blocking?
}
47 | ||
1a4d82fc JJ |
/// Ways in which a receive operation can fail.
pub enum Failure<T> {
    /// No data was available, but the channel is still connected.
    Empty,
    /// The sending half is gone and the queue has been fully drained.
    Disconnected,
    /// The sender upgraded this channel; further receives must go through
    /// the contained receiver of the new (shared) channel.
    Upgraded(Receiver<T>),
}
53 | ||
/// Outcome of pushing a message (or an upgrade request) onto the channel.
pub enum UpgradeResult {
    /// The message was accepted by the channel.
    UpSuccess,
    /// The receiving half is gone; the message was not delivered.
    UpDisconnected,
    /// A receiver was blocked on the channel; the caller is responsible for
    /// signaling this token to wake it up.
    UpWoke(SignalToken),
}
59 | ||
1a4d82fc JJ |
// Any message could contain an "upgrade request" to a new shared port, so the
// internal queue isn't a queue of T, but rather one of Message<T>.
enum Message<T> {
    Data(T),
    GoUp(Receiver<T>),
}
66 | ||
impl<T> Packet<T> {
    /// Creates a new stream packet: empty queue, zero message count, no
    /// blocked receiver, and the port flagged as alive.
    pub fn new() -> Packet<T> {
        Packet {
            queue: unsafe {
                spsc::Queue::with_additions(
                    // 128: bound for the queue's internal node cache —
                    // see `spsc_queue` for the exact semantics.
                    128,
                    ProducerAddition {
                        cnt: AtomicIsize::new(0),
                        to_wake: AtomicUsize::new(0),

                        port_dropped: AtomicBool::new(false),
                    },
                    ConsumerAddition { steals: UnsafeCell::new(0) },
                )
            },
        }
    }

    /// Attempts to send `t` on the channel, handing it back in `Err` if the
    /// receiving half has already been dropped.
    pub fn send(&self, t: T) -> Result<(), T> {
        // If the other port has deterministically gone away, then we
        // definitely must return the data back up the stack. Otherwise, the
        // data is considered as being sent.
        if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
            return Err(t);
        }

        match self.do_send(Data(t)) {
            // Note: `UpDisconnected` here still means the data was delivered
            // (see the DISCONNECTED arm of `do_send`), so it maps to Ok.
            UpSuccess | UpDisconnected => {}
            UpWoke(token) => {
                token.signal();
            }
        }
        Ok(())
    }

    /// Sends an upgrade request redirecting this channel's receiver to `up`,
    /// the port of a new (shared) channel.
    pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
        // If the port has gone away, then there's no need to proceed any
        // further.
        if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
            return UpDisconnected;
        }

        self.do_send(GoUp(up))
    }

    /// Pushes a message and runs the counter protocol, reporting whether a
    /// blocked receiver must be woken up or whether the port is gone.
    fn do_send(&self, t: Message<T>) -> UpgradeResult {
        self.queue.push(t);
        match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) {
            // As described in the mod's doc comment, -1 == wakeup
            -1 => UpWoke(self.take_to_wake()),
            // As described before, SPSC queues must be >= -2
            -2 => UpSuccess,

            // Be sure to preserve the disconnected state, and the return value
            // in this case is going to be whether our data was received or not.
            // This manifests itself on whether we have an empty queue or not.
            //
            // Primarily, we're required to drain the queue here because the
            // port will never remove this data. We can only have at most one
            // item to drain (the port drains the rest).
            DISCONNECTED => {
                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
                let first = self.queue.pop();
                let second = self.queue.pop();
                assert!(second.is_none());

                match first {
                    Some(..) => UpSuccess,  // we failed to send the data
                    None => UpDisconnected, // we successfully sent data
                }
            }

            // Otherwise we just sent some data on a non-waiting queue, so just
            // make sure the world is sane and carry on!
            n => {
                assert!(n >= 0);
                UpSuccess
            }
        }
    }

    // Consumes ownership of the 'to_wake' field.
    fn take_to_wake(&self) -> SignalToken {
        let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
        self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
        assert!(ptr != 0);
        // SAFETY: a nonzero value in `to_wake` was stored via
        // `SignalToken::cast_to_usize` in `decrement`, and zeroing the slot
        // above ensures the token is reified exactly once.
        unsafe { SignalToken::cast_from_usize(ptr) }
    }

    // Decrements the count on the channel for a sleeper, returning the sleeper
    // back if it shouldn't sleep. Note that this is the location where we take
    // steals into account.
    fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
        let ptr = unsafe { token.cast_to_usize() };
        // Publish our wakeup token *before* the count goes negative below, so
        // a sender that observes -1 finds the token ready in `take_to_wake`.
        self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);

        // Fold the locally-accumulated steals back into the shared count.
        let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };

        match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
            DISCONNECTED => {
                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
            }
            // If we factor in our steals and notice that the channel has no
            // data, we successfully sleep
            n => {
                assert!(n >= 0);
                if n - steals <= 0 {
                    return Ok(());
                }
            }
        }

        // Data is available (or we're disconnected): retract the token and
        // hand it back so the caller does not block.
        self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
        Err(unsafe { SignalToken::cast_from_usize(ptr) })
    }

    /// Receives a message, blocking (optionally until `deadline`) while the
    /// channel is empty. A timeout with still no data surfaces as
    /// `Err(Empty)` from the final `try_recv` below.
    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
        // Optimistic preflight check (scheduling is expensive).
        match self.try_recv() {
            Err(Empty) => {}
            data => return data,
        }

        // Welp, our channel has no data. Deschedule the current thread and
        // initiate the blocking protocol.
        let (wait_token, signal_token) = blocking::tokens();
        if self.decrement(signal_token).is_ok() {
            if let Some(deadline) = deadline {
                let timed_out = !wait_token.wait_max_until(deadline);
                // On timeout we must unregister ourselves from the channel;
                // aborting selection can report a pending upgrade instead.
                if timed_out {
                    self.abort_selection(/* was_upgrade = */ false).map_err(Upgraded)?;
                }
            } else {
                wait_token.wait();
            }
        }

        match self.try_recv() {
            // Messages which actually popped from the queue shouldn't count as
            // a steal, so offset the decrement here (we already have our
            // "steal" factored into the channel count above).
            data @ Ok(..) | data @ Err(Upgraded(..)) => unsafe {
                *self.queue.consumer_addition().steals.get() -= 1;
                data
            },

            data => data,
        }
    }

    /// Non-blocking receive: pops a message if one is ready, otherwise
    /// reports `Empty`, `Disconnected`, or a pending `Upgraded` port.
    pub fn try_recv(&self) -> Result<T, Failure<T>> {
        match self.queue.pop() {
            // If we stole some data, record to that effect (this will be
            // factored into cnt later on).
            //
            // Note that we don't allow steals to grow without bound in order to
            // prevent eventual overflow of either steals or cnt as an overflow
            // would have catastrophic results. Sometimes, steals > cnt, but
            // other times cnt > steals, so we don't know the relation between
            // steals and cnt. This code path is executed only rarely, so we do
            // a pretty slow operation, of swapping 0 into cnt, taking steals
            // down as much as possible (without going negative), and then
            // adding back in whatever we couldn't factor into steals.
            Some(data) => unsafe {
                if *self.queue.consumer_addition().steals.get() > MAX_STEALS {
                    match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) {
                        DISCONNECTED => {
                            self.queue
                                .producer_addition()
                                .cnt
                                .store(DISCONNECTED, Ordering::SeqCst);
                        }
                        n => {
                            let m = cmp::min(n, *self.queue.consumer_addition().steals.get());
                            *self.queue.consumer_addition().steals.get() -= m;
                            self.bump(n - m);
                        }
                    }
                    assert!(*self.queue.consumer_addition().steals.get() >= 0);
                }
                *self.queue.consumer_addition().steals.get() += 1;
                match data {
                    Data(t) => Ok(t),
                    GoUp(up) => Err(Upgraded(up)),
                }
            },

            None => {
                match self.queue.producer_addition().cnt.load(Ordering::SeqCst) {
                    n if n != DISCONNECTED => Err(Empty),

                    // This is a little bit of a tricky case. We failed to pop
                    // data above, and then we have viewed that the channel is
                    // disconnected. In this window more data could have been
                    // sent on the channel. It doesn't really make sense to
                    // return that the channel is disconnected when there's
                    // actually data on it, so be extra sure there's no data by
                    // popping one more time.
                    //
                    // We can ignore steals because the other end is
                    // disconnected and we'll never need to really factor in our
                    // steals again.
                    _ => match self.queue.pop() {
                        Some(Data(t)) => Ok(t),
                        Some(GoUp(up)) => Err(Upgraded(up)),
                        None => Err(Disconnected),
                    },
                }
            }
        }
    }

    /// Called when the sending half is dropped: flags disconnection through
    /// `cnt` and wakes the receiver if one is blocked.
    pub fn drop_chan(&self) {
        // Dropping a channel is pretty simple, we just flag it as disconnected
        // and then wakeup a blocker if there is one.
        match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) {
            // -1: a receiver was parked; take its token and signal it.
            -1 => {
                self.take_to_wake().signal();
            }
            DISCONNECTED => {}
            n => {
                assert!(n >= 0);
            }
        }
    }

    /// Called when the receiving half is dropped: gates all future sends and
    /// drains the queue so no in-flight message is leaked.
    pub fn drop_port(&self) {
        // Dropping a port seems like a fairly trivial thing. In theory all we
        // need to do is flag that we're disconnected and then everything else
        // can take over (we don't have anyone to wake up).
        //
        // The catch for Ports is that we want to drop the entire contents of
        // the queue. There are multiple reasons for having this property, the
        // largest of which is that if another chan is waiting in this channel
        // (but not received yet), then waiting on that port will cause a
        // deadlock.
        //
        // So if we accept that we must now destroy the entire contents of the
        // queue, this code may make a bit more sense. The tricky part is that
        // we can't let any in-flight sends go un-dropped, we have to make sure
        // *everything* is dropped and nothing new will come onto the channel.

        // The first thing we do is set a flag saying that we're done for. All
        // sends are gated on this flag, so we're immediately guaranteed that
        // there are a bounded number of active sends that we'll have to deal
        // with.
        self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst);

        // Now that we're guaranteed to deal with a bounded number of senders,
        // we need to drain the queue. This draining process happens atomically
        // with respect to the "count" of the channel. If the count is nonzero
        // (with steals taken into account), then there must be data on the
        // channel. In this case we drain everything and then try again. We will
        // continue to fail while active senders send data while we're dropping
        // data, but eventually we're guaranteed to break out of this loop
        // (because there is a bounded number of senders).
        let mut steals = unsafe { *self.queue.consumer_addition().steals.get() };
        while {
            let cnt = self.queue.producer_addition().cnt.compare_and_swap(
                steals,
                DISCONNECTED,
                Ordering::SeqCst,
            );
            cnt != DISCONNECTED && cnt != steals
        } {
            while let Some(_) = self.queue.pop() {
                steals += 1;
            }
        }

        // At this point in time, we have gated all future senders from sending,
        // and we have flagged the channel as being disconnected. The senders
        // still have some responsibility, however, because some sends may not
        // complete until after we flag the disconnection. There are more
        // details in the sending methods that see DISCONNECTED
    }

    ////////////////////////////////////////////////////////////////////////////
    // select implementation
    ////////////////////////////////////////////////////////////////////////////

    // increment the count on the channel (used for selection)
    fn bump(&self, amt: isize) -> isize {
        match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) {
            DISCONNECTED => {
                // Re-plaster the sentinel that fetch_add just clobbered.
                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
                DISCONNECTED
            }
            n => n,
        }
    }

    // Removes a previous thread from being blocked in this port
    pub fn abort_selection(&self, was_upgrade: bool) -> Result<bool, Receiver<T>> {
        // If we're aborting selection after upgrading from a oneshot, then
        // we're guarantee that no one is waiting. The only way that we could
        // have seen the upgrade is if data was actually sent on the channel
        // half again. For us, this means that there is guaranteed to be data on
        // this channel. Furthermore, we're guaranteed that there was no
        // start_selection previously, so there's no need to modify `self.cnt`
        // at all.
        //
        // Hence, because of these invariants, we immediately return `Ok(true)`.
        // Note that the data may not actually be sent on the channel just yet.
        // The other end could have flagged the upgrade but not sent data to
        // this end. This is fine because we know it's a small bounded window
        // of time until the data is actually sent.
        if was_upgrade {
            assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
            return Ok(true);
        }

        // We want to make sure that the count on the channel goes non-negative,
        // and in the stream case we can have at most one steal, so just assume
        // that we had one steal.
        let steals = 1;
        let prev = self.bump(steals + 1);

        // If we were previously disconnected, then we know for sure that there
        // is no thread in to_wake, so just keep going
        let has_data = if prev == DISCONNECTED {
            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
            true // there is data, that data is that we're disconnected
        } else {
            let cur = prev + steals + 1;
            assert!(cur >= 0);

            // If the previous count was negative, then we just made things go
            // positive, hence we passed the -1 boundary and we're responsible
            // for removing the to_wake() field and trashing it.
            //
            // If the previous count was positive then we're in a tougher
            // situation. A possible race is that a sender just incremented
            // through -1 (meaning it's going to try to wake a thread up), but
            // it hasn't yet read the to_wake. In order to prevent a future
            // recv() from waking up too early (this sender picking up the
            // plastered over to_wake), we spin loop here waiting for to_wake
            // to be 0. Note that this entire select() implementation needs an
            // overhaul, and this is *not* the worst part of it, so this is not
            // done as a final solution but rather out of necessity for now to
            // get something working.
            if prev < 0 {
                drop(self.take_to_wake());
            } else {
                while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 {
                    thread::yield_now();
                }
            }
            unsafe {
                assert_eq!(*self.queue.consumer_addition().steals.get(), 0);
                *self.queue.consumer_addition().steals.get() = steals;
            }

            // if we were previously positive, then there's surely data to
            // receive
            prev >= 0
        };

        // Now that we've determined that this queue "has data", we peek at the
        // queue to see if the data is an upgrade or not. If it's an upgrade,
        // then we need to destroy this port and abort selection on the
        // upgraded port.
        if has_data {
            match self.queue.peek() {
                Some(&mut GoUp(..)) => match self.queue.pop() {
                    Some(GoUp(port)) => Err(port),
                    _ => unreachable!(),
                },
                _ => Ok(true),
            }
        } else {
            Ok(false)
        }
    }
}
444 | ||
impl<T> Drop for Packet<T> {
    /// Sanity-checks that both halves disconnected before destruction: `cnt`
    /// must hold the DISCONNECTED sentinel and no wakeup token may remain.
    fn drop(&mut self) {
        // Note that this load is not only an assert for correctness about
        // disconnection, but also a proper fence before the read of
        // `to_wake`, so this assert cannot be removed with also removing
        // the `to_wake` assert.
        assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
    }
}