git.proxmox.com Git - cargo.git/blob - vendor/crossbeam-channel/src/flavors/after.rs
New upstream version 0.33.0
[cargo.git] / vendor / crossbeam-channel / src / flavors / after.rs
1 //! Channel that delivers a message after a certain amount of time.
2 //!
3 //! Messages cannot be sent into this kind of channel; they are materialized on demand.
4
5 use std::sync::atomic::{AtomicBool, Ordering};
6 use std::thread;
7 use std::time::{Duration, Instant};
8
9 use context::Context;
10 use err::{RecvTimeoutError, TryRecvError};
11 use select::{Operation, SelectHandle, Token};
12 use utils;
13
/// Outcome of a selected receive on this channel flavor, carried in the select `Token`.
///
/// `Some(instant)` holds the message (the delivery instant); `None` makes `Channel::read`
/// report an error.
pub type AfterToken = Option<Instant>;
16
/// A one-shot channel whose single message becomes available at a fixed instant.
///
/// Nothing is ever sent into it: the message is the delivery instant itself, and it is
/// handed out at most once.
pub struct Channel {
    /// Point in time from which the message may be received; also the message value.
    delivery_time: Instant,

    /// Flipped to `true` by the receiver that takes the message.
    received: AtomicBool,
}
25
26 impl Channel {
27 /// Creates a channel that delivers a message after a certain duration of time.
28 #[inline]
29 pub fn new(dur: Duration) -> Self {
30 Channel {
31 delivery_time: Instant::now() + dur,
32 received: AtomicBool::new(false),
33 }
34 }
35
36 /// Attempts to receive a message without blocking.
37 #[inline]
38 pub fn try_recv(&self) -> Result<Instant, TryRecvError> {
39 // We use relaxed ordering because this is just an optional optimistic check.
40 if self.received.load(Ordering::Relaxed) {
41 // The message has already been received.
42 return Err(TryRecvError::Empty);
43 }
44
45 if Instant::now() < self.delivery_time {
46 // The message was not delivered yet.
47 return Err(TryRecvError::Empty);
48 }
49
50 // Try receiving the message if it is still available.
51 if !self.received.swap(true, Ordering::SeqCst) {
52 // Success! Return delivery time as the message.
53 Ok(self.delivery_time)
54 } else {
55 // The message was already received.
56 Err(TryRecvError::Empty)
57 }
58 }
59
60 /// Receives a message from the channel.
61 #[inline]
62 pub fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
63 // We use relaxed ordering because this is just an optional optimistic check.
64 if self.received.load(Ordering::Relaxed) {
65 // The message has already been received.
66 utils::sleep_until(deadline);
67 return Err(RecvTimeoutError::Timeout);
68 }
69
70 // Wait until the message is received or the deadline is reached.
71 loop {
72 let now = Instant::now();
73
74 // Check if we can receive the next message.
75 if now >= self.delivery_time {
76 break;
77 }
78
79 // Check if the deadline has been reached.
80 if let Some(d) = deadline {
81 if now >= d {
82 return Err(RecvTimeoutError::Timeout);
83 }
84
85 thread::sleep(self.delivery_time.min(d) - now);
86 } else {
87 thread::sleep(self.delivery_time - now);
88 }
89 }
90
91 // Try receiving the message if it is still available.
92 if !self.received.swap(true, Ordering::SeqCst) {
93 // Success! Return the message, which is the instant at which it was delivered.
94 Ok(self.delivery_time)
95 } else {
96 // The message was already received. Block forever.
97 utils::sleep_until(None);
98 unreachable!()
99 }
100 }
101
102 /// Reads a message from the channel.
103 #[inline]
104 pub unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
105 token.after.ok_or(())
106 }
107
108 /// Returns `true` if the channel is empty.
109 #[inline]
110 pub fn is_empty(&self) -> bool {
111 // We use relaxed ordering because this is just an optional optimistic check.
112 if self.received.load(Ordering::Relaxed) {
113 return true;
114 }
115
116 // If the delivery time hasn't been reached yet, the channel is empty.
117 if Instant::now() < self.delivery_time {
118 return true;
119 }
120
121 // The delivery time has been reached. The channel is empty only if the message has already
122 // been received.
123 self.received.load(Ordering::SeqCst)
124 }
125
126 /// Returns `true` if the channel is full.
127 #[inline]
128 pub fn is_full(&self) -> bool {
129 !self.is_empty()
130 }
131
132 /// Returns the number of messages in the channel.
133 #[inline]
134 pub fn len(&self) -> usize {
135 if self.is_empty() {
136 0
137 } else {
138 1
139 }
140 }
141
142 /// Returns the capacity of the channel.
143 #[inline]
144 pub fn capacity(&self) -> Option<usize> {
145 Some(1)
146 }
147 }
148
149 impl SelectHandle for Channel {
150 #[inline]
151 fn try_select(&self, token: &mut Token) -> bool {
152 match self.try_recv() {
153 Ok(msg) => {
154 token.after = Some(msg);
155 true
156 }
157 Err(TryRecvError::Disconnected) => {
158 token.after = None;
159 true
160 }
161 Err(TryRecvError::Empty) => false,
162 }
163 }
164
165 #[inline]
166 fn deadline(&self) -> Option<Instant> {
167 // We use relaxed ordering because this is just an optional optimistic check.
168 if self.received.load(Ordering::Relaxed) {
169 None
170 } else {
171 Some(self.delivery_time)
172 }
173 }
174
175 #[inline]
176 fn register(&self, _oper: Operation, _cx: &Context) -> bool {
177 self.is_ready()
178 }
179
180 #[inline]
181 fn unregister(&self, _oper: Operation) {}
182
183 #[inline]
184 fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
185 self.try_select(token)
186 }
187
188 #[inline]
189 fn is_ready(&self) -> bool {
190 !self.is_empty()
191 }
192
193 #[inline]
194 fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
195 self.is_ready()
196 }
197
198 #[inline]
199 fn unwatch(&self, _oper: Operation) {}
200 }