]> git.proxmox.com Git - rustc.git/blob - vendor/tokio/src/runtime/io/mod.rs
New upstream version 1.72.1+dfsg1
[rustc.git] / vendor / tokio / src / runtime / io / mod.rs
1 #![cfg_attr(not(all(feature = "rt", feature = "net")), allow(dead_code))]
2
3 mod registration;
4 pub(crate) use registration::Registration;
5
6 mod scheduled_io;
7 use scheduled_io::ScheduledIo;
8
9 mod metrics;
10
11 use crate::io::interest::Interest;
12 use crate::io::ready::Ready;
13 use crate::runtime::driver;
14 use crate::util::slab::{self, Slab};
15 use crate::{loom::sync::RwLock, util::bit};
16
17 use metrics::IoDriverMetrics;
18
19 use std::fmt;
20 use std::io;
21 use std::time::Duration;
22
23 /// I/O driver, backed by Mio.
24 pub(crate) struct Driver {
25 /// Tracks the number of times `turn` is called. It is safe for this to wrap
26 /// as it is mostly used to determine when to call `compact()`.
27 tick: u8,
28
29 /// True when an event with the signal token is received
30 signal_ready: bool,
31
32 /// Reuse the `mio::Events` value across calls to poll.
33 events: mio::Events,
34
35 /// Primary slab handle containing the state for each resource registered
36 /// with this driver.
37 resources: Slab<ScheduledIo>,
38
39 /// The system event queue.
40 poll: mio::Poll,
41 }
42
43 /// A reference to an I/O driver.
44 pub(crate) struct Handle {
45 /// Registers I/O resources.
46 registry: mio::Registry,
47
48 /// Allocates `ScheduledIo` handles when creating new resources.
49 io_dispatch: RwLock<IoDispatcher>,
50
51 /// Used to wake up the reactor from a call to `turn`.
52 /// Not supported on Wasi due to lack of threading support.
53 #[cfg(not(tokio_wasi))]
54 waker: mio::Waker,
55
56 pub(crate) metrics: IoDriverMetrics,
57 }
58
59 #[derive(Debug)]
60 pub(crate) struct ReadyEvent {
61 tick: u8,
62 pub(crate) ready: Ready,
63 is_shutdown: bool,
64 }
65
66 cfg_net_unix!(
67 impl ReadyEvent {
68 pub(crate) fn with_ready(&self, ready: Ready) -> Self {
69 Self {
70 ready,
71 tick: self.tick,
72 is_shutdown: self.is_shutdown,
73 }
74 }
75 }
76 );
77
78 struct IoDispatcher {
79 allocator: slab::Allocator<ScheduledIo>,
80 is_shutdown: bool,
81 }
82
83 #[derive(Debug, Eq, PartialEq, Clone, Copy)]
84 enum Direction {
85 Read,
86 Write,
87 }
88
89 enum Tick {
90 Set(u8),
91 Clear(u8),
92 }
93
94 // TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
95 // token.
96 const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
97 const TOKEN_SIGNAL: mio::Token = mio::Token(1 + (1 << 31));
98
99 const ADDRESS: bit::Pack = bit::Pack::least_significant(24);
100
101 // Packs the generation value in the `readiness` field.
102 //
103 // The generation prevents a race condition where a slab slot is reused for a
104 // new socket while the I/O driver is about to apply a readiness event. The
105 // generation value is checked when setting new readiness. If the generation do
106 // not match, then the readiness event is discarded.
107 const GENERATION: bit::Pack = ADDRESS.then(7);
108
109 fn _assert_kinds() {
110 fn _assert<T: Send + Sync>() {}
111
112 _assert::<Handle>();
113 }
114
115 // ===== impl Driver =====
116
117 impl Driver {
118 /// Creates a new event loop, returning any error that happened during the
119 /// creation.
120 pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
121 let poll = mio::Poll::new()?;
122 #[cfg(not(tokio_wasi))]
123 let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
124 let registry = poll.registry().try_clone()?;
125
126 let slab = Slab::new();
127 let allocator = slab.allocator();
128
129 let driver = Driver {
130 tick: 0,
131 signal_ready: false,
132 events: mio::Events::with_capacity(nevents),
133 poll,
134 resources: slab,
135 };
136
137 let handle = Handle {
138 registry,
139 io_dispatch: RwLock::new(IoDispatcher::new(allocator)),
140 #[cfg(not(tokio_wasi))]
141 waker,
142 metrics: IoDriverMetrics::default(),
143 };
144
145 Ok((driver, handle))
146 }
147
148 pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
149 let handle = rt_handle.io();
150 self.turn(handle, None);
151 }
152
153 pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
154 let handle = rt_handle.io();
155 self.turn(handle, Some(duration));
156 }
157
158 pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
159 let handle = rt_handle.io();
160
161 if handle.shutdown() {
162 self.resources.for_each(|io| {
163 // If a task is waiting on the I/O resource, notify it that the
164 // runtime is being shutdown. And shutdown will clear all wakers.
165 io.shutdown();
166 });
167 }
168 }
169
170 fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
171 // How often to call `compact()` on the resource slab
172 const COMPACT_INTERVAL: u8 = 255;
173
174 self.tick = self.tick.wrapping_add(1);
175
176 if self.tick == COMPACT_INTERVAL {
177 self.resources.compact()
178 }
179
180 let events = &mut self.events;
181
182 // Block waiting for an event to happen, peeling out how many events
183 // happened.
184 match self.poll.poll(events, max_wait) {
185 Ok(_) => {}
186 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
187 #[cfg(tokio_wasi)]
188 Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
189 // In case of wasm32_wasi this error happens, when trying to poll without subscriptions
190 // just return from the park, as there would be nothing, which wakes us up.
191 }
192 Err(e) => panic!("unexpected error when polling the I/O driver: {:?}", e),
193 }
194
195 // Process all the events that came in, dispatching appropriately
196 let mut ready_count = 0;
197 for event in events.iter() {
198 let token = event.token();
199
200 if token == TOKEN_WAKEUP {
201 // Nothing to do, the event is used to unblock the I/O driver
202 } else if token == TOKEN_SIGNAL {
203 self.signal_ready = true;
204 } else {
205 Self::dispatch(
206 &mut self.resources,
207 self.tick,
208 token,
209 Ready::from_mio(event),
210 );
211 ready_count += 1;
212 }
213 }
214
215 handle.metrics.incr_ready_count_by(ready_count);
216 }
217
218 fn dispatch(resources: &mut Slab<ScheduledIo>, tick: u8, token: mio::Token, ready: Ready) {
219 let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
220
221 let io = match resources.get(addr) {
222 Some(io) => io,
223 None => return,
224 };
225
226 let res = io.set_readiness(Some(token.0), Tick::Set(tick), |curr| curr | ready);
227
228 if res.is_err() {
229 // token no longer valid!
230 return;
231 }
232
233 io.wake(ready);
234 }
235 }
236
237 impl fmt::Debug for Driver {
238 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239 write!(f, "Driver")
240 }
241 }
242
243 impl Handle {
244 /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
245 /// makes the next call to `turn` return immediately.
246 ///
247 /// This method is intended to be used in situations where a notification
248 /// needs to otherwise be sent to the main reactor. If the reactor is
249 /// currently blocked inside of `turn` then it will wake up and soon return
250 /// after this method has been called. If the reactor is not currently
251 /// blocked in `turn`, then the next call to `turn` will not block and
252 /// return immediately.
253 pub(crate) fn unpark(&self) {
254 #[cfg(not(tokio_wasi))]
255 self.waker.wake().expect("failed to wake I/O driver");
256 }
257
258 /// Registers an I/O resource with the reactor for a given `mio::Ready` state.
259 ///
260 /// The registration token is returned.
261 pub(super) fn add_source(
262 &self,
263 source: &mut impl mio::event::Source,
264 interest: Interest,
265 ) -> io::Result<slab::Ref<ScheduledIo>> {
266 let (address, shared) = self.allocate()?;
267
268 let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));
269
270 self.registry
271 .register(source, mio::Token(token), interest.to_mio())?;
272
273 self.metrics.incr_fd_count();
274
275 Ok(shared)
276 }
277
278 /// Deregisters an I/O resource from the reactor.
279 pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> {
280 self.registry.deregister(source)?;
281
282 self.metrics.dec_fd_count();
283
284 Ok(())
285 }
286
287 /// shutdown the dispatcher.
288 fn shutdown(&self) -> bool {
289 let mut io = self.io_dispatch.write().unwrap();
290 if io.is_shutdown {
291 return false;
292 }
293 io.is_shutdown = true;
294 true
295 }
296
297 fn allocate(&self) -> io::Result<(slab::Address, slab::Ref<ScheduledIo>)> {
298 let io = self.io_dispatch.read().unwrap();
299 if io.is_shutdown {
300 return Err(io::Error::new(
301 io::ErrorKind::Other,
302 crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
303 ));
304 }
305 io.allocator.allocate().ok_or_else(|| {
306 io::Error::new(
307 io::ErrorKind::Other,
308 "reactor at max registered I/O resources",
309 )
310 })
311 }
312 }
313
314 impl fmt::Debug for Handle {
315 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
316 write!(f, "Handle")
317 }
318 }
319
320 // ===== impl IoDispatcher =====
321
322 impl IoDispatcher {
323 fn new(allocator: slab::Allocator<ScheduledIo>) -> Self {
324 Self {
325 allocator,
326 is_shutdown: false,
327 }
328 }
329 }
330
331 impl Direction {
332 pub(super) fn mask(self) -> Ready {
333 match self {
334 Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
335 Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
336 }
337 }
338 }
339
340 // Signal handling
341 cfg_signal_internal_and_unix! {
342 impl Handle {
343 pub(crate) fn register_signal_receiver(&self, receiver: &mut mio::net::UnixStream) -> io::Result<()> {
344 self.registry.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?;
345 Ok(())
346 }
347 }
348
349 impl Driver {
350 pub(crate) fn consume_signal_ready(&mut self) -> bool {
351 let ret = self.signal_ready;
352 self.signal_ready = false;
353 ret
354 }
355 }
356 }