1 #![cfg_attr(not(all(feature = "rt", feature = "net")), allow(dead_code))]
4 pub(crate) use registration
::Registration
;
7 use scheduled_io
::ScheduledIo
;
11 use crate::io
::interest
::Interest
;
12 use crate::io
::ready
::Ready
;
13 use crate::runtime
::driver
;
14 use crate::util
::slab
::{self, Slab}
;
15 use crate::{loom::sync::RwLock, util::bit}
;
17 use metrics
::IoDriverMetrics
;
21 use std
::time
::Duration
;
23 /// I/O driver, backed by Mio.
24 pub(crate) struct Driver
{
25 /// Tracks the number of times `turn` is called. It is safe for this to wrap
26 /// as it is mostly used to determine when to call `compact()`.
29 /// True when an event with the signal token is received
32 /// Reuse the `mio::Events` value across calls to poll.
35 /// Primary slab handle containing the state for each resource registered
37 resources
: Slab
<ScheduledIo
>,
39 /// The system event queue.
43 /// A reference to an I/O driver.
44 pub(crate) struct Handle
{
45 /// Registers I/O resources.
46 registry
: mio
::Registry
,
48 /// Allocates `ScheduledIo` handles when creating new resources.
49 io_dispatch
: RwLock
<IoDispatcher
>,
51 /// Used to wake up the reactor from a call to `turn`.
52 /// Not supported on Wasi due to lack of threading support.
53 #[cfg(not(tokio_wasi))]
56 pub(crate) metrics
: IoDriverMetrics
,
60 pub(crate) struct ReadyEvent
{
62 pub(crate) ready
: Ready
,
68 pub(crate) fn with_ready(&self, ready
: Ready
) -> Self {
72 is_shutdown
: self.is_shutdown
,
79 allocator
: slab
::Allocator
<ScheduledIo
>,
83 #[derive(Debug, Eq, PartialEq, Clone, Copy)]
94 // TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
96 const TOKEN_WAKEUP
: mio
::Token
= mio
::Token(1 << 31);
97 const TOKEN_SIGNAL
: mio
::Token
= mio
::Token(1 + (1 << 31));
99 const ADDRESS
: bit
::Pack
= bit
::Pack
::least_significant(24);
101 // Packs the generation value in the `readiness` field.
103 // The generation prevents a race condition where a slab slot is reused for a
104 // new socket while the I/O driver is about to apply a readiness event. The
105 // generation value is checked when setting new readiness. If the generation do
106 // not match, then the readiness event is discarded.
107 const GENERATION
: bit
::Pack
= ADDRESS
.then(7);
/// Compile-time check helper: naming `_assert::<T>` only type-checks when `T`
/// is both `Send` and `Sync`. The body is intentionally empty.
fn _assert<T>()
where
    T: Send + Sync,
{
}
115 // ===== impl Driver =====
118 /// Creates a new event loop, returning any error that happened during the
120 pub(crate) fn new(nevents
: usize) -> io
::Result
<(Driver
, Handle
)> {
121 let poll
= mio
::Poll
::new()?
;
122 #[cfg(not(tokio_wasi))]
123 let waker
= mio
::Waker
::new(poll
.registry(), TOKEN_WAKEUP
)?
;
124 let registry
= poll
.registry().try_clone()?
;
126 let slab
= Slab
::new();
127 let allocator
= slab
.allocator();
129 let driver
= Driver
{
132 events
: mio
::Events
::with_capacity(nevents
),
137 let handle
= Handle
{
139 io_dispatch
: RwLock
::new(IoDispatcher
::new(allocator
)),
140 #[cfg(not(tokio_wasi))]
142 metrics
: IoDriverMetrics
::default(),
148 pub(crate) fn park(&mut self, rt_handle
: &driver
::Handle
) {
149 let handle
= rt_handle
.io();
150 self.turn(handle
, None
);
153 pub(crate) fn park_timeout(&mut self, rt_handle
: &driver
::Handle
, duration
: Duration
) {
154 let handle
= rt_handle
.io();
155 self.turn(handle
, Some(duration
));
158 pub(crate) fn shutdown(&mut self, rt_handle
: &driver
::Handle
) {
159 let handle
= rt_handle
.io();
161 if handle
.shutdown() {
162 self.resources
.for_each(|io
| {
163 // If a task is waiting on the I/O resource, notify it that the
164 // runtime is being shut down. Shutting down also clears all wakers.
170 fn turn(&mut self, handle
: &Handle
, max_wait
: Option
<Duration
>) {
171 // How often to call `compact()` on the resource slab
172 const COMPACT_INTERVAL
: u8 = 255;
174 self.tick
= self.tick
.wrapping_add(1);
176 if self.tick
== COMPACT_INTERVAL
{
177 self.resources
.compact()
180 let events
= &mut self.events
;
182 // Block waiting for an event to happen, peeling out how many events
184 match self.poll
.poll(events
, max_wait
) {
186 Err(ref e
) if e
.kind() == io
::ErrorKind
::Interrupted
=> {}
188 Err(e
) if e
.kind() == io
::ErrorKind
::InvalidInput
=> {
189 // On wasm32_wasi this error occurs when polling without any subscriptions;
190 // just return from the park, since nothing would be able to wake us up.
192 Err(e
) => panic
!("unexpected error when polling the I/O driver: {:?}", e
),
195 // Process all the events that came in, dispatching appropriately
196 let mut ready_count
= 0;
197 for event
in events
.iter() {
198 let token
= event
.token();
200 if token
== TOKEN_WAKEUP
{
201 // Nothing to do, the event is used to unblock the I/O driver
202 } else if token
== TOKEN_SIGNAL
{
203 self.signal_ready
= true;
209 Ready
::from_mio(event
),
215 handle
.metrics
.incr_ready_count_by(ready_count
);
218 fn dispatch(resources
: &mut Slab
<ScheduledIo
>, tick
: u8, token
: mio
::Token
, ready
: Ready
) {
219 let addr
= slab
::Address
::from_usize(ADDRESS
.unpack(token
.0));
221 let io
= match resources
.get(addr
) {
226 let res
= io
.set_readiness(Some(token
.0), Tick
::Set(tick
), |curr
| curr
| ready
);
229 // token no longer valid!
237 impl fmt
::Debug
for Driver
{
238 fn fmt(&self, f
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
244 /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
245 /// makes the next call to `turn` return immediately.
247 /// This method is intended to be used in situations where a notification
248 /// needs to otherwise be sent to the main reactor. If the reactor is
249 /// currently blocked inside of `turn` then it will wake up and soon return
250 /// after this method has been called. If the reactor is not currently
251 /// blocked in `turn`, then the next call to `turn` will not block and
252 /// return immediately.
253 pub(crate) fn unpark(&self) {
254 #[cfg(not(tokio_wasi))]
255 self.waker
.wake().expect("failed to wake I/O driver");
258 /// Registers an I/O resource with the reactor for a given `mio::Ready` state.
260 /// The registration token is returned.
261 pub(super) fn add_source(
263 source
: &mut impl mio
::event
::Source
,
265 ) -> io
::Result
<slab
::Ref
<ScheduledIo
>> {
266 let (address
, shared
) = self.allocate()?
;
268 let token
= GENERATION
.pack(shared
.generation(), ADDRESS
.pack(address
.as_usize(), 0));
271 .register(source
, mio
::Token(token
), interest
.to_mio())?
;
273 self.metrics
.incr_fd_count();
278 /// Deregisters an I/O resource from the reactor.
279 pub(super) fn deregister_source(&self, source
: &mut impl mio
::event
::Source
) -> io
::Result
<()> {
280 self.registry
.deregister(source
)?
;
282 self.metrics
.dec_fd_count();
287 /// Shuts down the dispatcher.
288 fn shutdown(&self) -> bool
{
289 let mut io
= self.io_dispatch
.write().unwrap();
293 io
.is_shutdown
= true;
297 fn allocate(&self) -> io
::Result
<(slab
::Address
, slab
::Ref
<ScheduledIo
>)> {
298 let io
= self.io_dispatch
.read().unwrap();
300 return Err(io
::Error
::new(
301 io
::ErrorKind
::Other
,
302 crate::util
::error
::RUNTIME_SHUTTING_DOWN_ERROR
,
305 io
.allocator
.allocate().ok_or_else(|| {
307 io
::ErrorKind
::Other
,
308 "reactor at max registered I/O resources",
314 impl fmt
::Debug
for Handle
{
315 fn fmt(&self, f
: &mut fmt
::Formatter
<'_
>) -> fmt
::Result
{
320 // ===== impl IoDispatcher =====
323 fn new(allocator
: slab
::Allocator
<ScheduledIo
>) -> Self {
332 pub(super) fn mask(self) -> Ready
{
334 Direction
::Read
=> Ready
::READABLE
| Ready
::READ_CLOSED
,
335 Direction
::Write
=> Ready
::WRITABLE
| Ready
::WRITE_CLOSED
,
341 cfg_signal_internal_and_unix
! {
343 pub(crate) fn register_signal_receiver(&self, receiver
: &mut mio
::net
::UnixStream
) -> io
::Result
<()> {
344 self.registry
.register(receiver
, TOKEN_SIGNAL
, mio
::Interest
::READABLE
)?
;
350 pub(crate) fn consume_signal_ready(&mut self) -> bool
{
351 let ret
= self.signal_ready
;
352 self.signal_ready
= false;