1 //! Channel that delivers a message after a certain amount of time.
3 //! Messages cannot be sent into this kind of channel; they are materialized on demand.
5 use std
::sync
::atomic
::{AtomicBool, Ordering}
;
7 use std
::time
::{Duration, Instant}
;
10 use err
::{RecvTimeoutError, TryRecvError}
;
11 use select
::{Operation, SelectHandle, Token}
;
14 /// Result of a receive operation.
15 pub type AfterToken
= Option
<Instant
>;
17 /// Channel that delivers a message after a certain amount of time.
19 /// The instant at which the message will be delivered.
20 delivery_time
: Instant
,
22 /// `true` if the message has been received.
27 /// Creates a channel that delivers a message after a certain duration of time.
29 pub fn new(dur
: Duration
) -> Self {
31 delivery_time
: Instant
::now() + dur
,
32 received
: AtomicBool
::new(false),
36 /// Attempts to receive a message without blocking.
38 pub fn try_recv(&self) -> Result
<Instant
, TryRecvError
> {
39 // We use relaxed ordering because this is just an optional optimistic check.
40 if self.received
.load(Ordering
::Relaxed
) {
41 // The message has already been received.
42 return Err(TryRecvError
::Empty
);
45 if Instant
::now() < self.delivery_time
{
46 // The message was not delivered yet.
47 return Err(TryRecvError
::Empty
);
50 // Try receiving the message if it is still available.
51 if !self.received
.swap(true, Ordering
::SeqCst
) {
52 // Success! Return delivery time as the message.
53 Ok(self.delivery_time
)
55 // The message was already received.
56 Err(TryRecvError
::Empty
)
60 /// Receives a message from the channel.
62 pub fn recv(&self, deadline
: Option
<Instant
>) -> Result
<Instant
, RecvTimeoutError
> {
63 // We use relaxed ordering because this is just an optional optimistic check.
64 if self.received
.load(Ordering
::Relaxed
) {
65 // The message has already been received.
66 utils
::sleep_until(deadline
);
67 return Err(RecvTimeoutError
::Timeout
);
70 // Wait until the message is received or the deadline is reached.
72 let now
= Instant
::now();
74 // Check if we can receive the next message.
75 if now
>= self.delivery_time
{
79 // Check if the deadline has been reached.
80 if let Some(d
) = deadline
{
82 return Err(RecvTimeoutError
::Timeout
);
85 thread
::sleep(self.delivery_time
.min(d
) - now
);
87 thread
::sleep(self.delivery_time
- now
);
91 // Try receiving the message if it is still available.
92 if !self.received
.swap(true, Ordering
::SeqCst
) {
93 // Success! Return the message, which is the instant at which it was delivered.
94 Ok(self.delivery_time
)
96 // The message was already received. Block forever.
97 utils
::sleep_until(None
);
102 /// Reads a message from the channel.
104 pub unsafe fn read(&self, token
: &mut Token
) -> Result
<Instant
, ()> {
105 token
.after
.ok_or(())
108 /// Returns `true` if the channel is empty.
110 pub fn is_empty(&self) -> bool
{
111 // We use relaxed ordering because this is just an optional optimistic check.
112 if self.received
.load(Ordering
::Relaxed
) {
116 // If the delivery time hasn't been reached yet, the channel is empty.
117 if Instant
::now() < self.delivery_time
{
121 // The delivery time has been reached. The channel is empty only if the message has already
123 self.received
.load(Ordering
::SeqCst
)
126 /// Returns `true` if the channel is full.
128 pub fn is_full(&self) -> bool
{
132 /// Returns the number of messages in the channel.
134 pub fn len(&self) -> usize {
142 /// Returns the capacity of the channel.
144 pub fn capacity(&self) -> Option
<usize> {
149 impl SelectHandle
for Channel
{
151 fn try_select(&self, token
: &mut Token
) -> bool
{
152 match self.try_recv() {
154 token
.after
= Some(msg
);
157 Err(TryRecvError
::Disconnected
) => {
161 Err(TryRecvError
::Empty
) => false,
166 fn deadline(&self) -> Option
<Instant
> {
167 // We use relaxed ordering because this is just an optional optimistic check.
168 if self.received
.load(Ordering
::Relaxed
) {
171 Some(self.delivery_time
)
176 fn register(&self, _oper
: Operation
, _cx
: &Context
) -> bool
{
181 fn unregister(&self, _oper
: Operation
) {}
184 fn accept(&self, token
: &mut Token
, _cx
: &Context
) -> bool
{
185 self.try_select(token
)
189 fn is_ready(&self) -> bool
{
194 fn watch(&self, _oper
: Operation
, _cx
: &Context
) -> bool
{
199 fn unwatch(&self, _oper
: Operation
) {}