]>
Commit | Line | Data |
---|---|---|
8bf7342c WB |
1 | //! This implements the parts of qemu's AioContext interface we need for testing outside qemu. |
2 | ||
3 | use std::collections::HashMap; | |
4 | use std::os::unix::io::RawFd; | |
5 | use std::sync::{Arc, Mutex, RwLock}; | |
6 | use std::thread; | |
7 | ||
8 | use failure::Error; | |
9 | use mio::{Events, Poll, Token}; | |
10 | use mio::unix::EventedFd; | |
11 | ||
12 | use crate::util::{AioCb, AioHandlerState}; | |
13 | ||
14 | /// This is a reference to a standalone `AioContextImpl` and allows instantiating a new context | |
15 | /// with a polling thread. | |
16 | #[derive(Clone)] | |
17 | #[repr(transparent)] | |
18 | pub struct AioContext(Arc<AioContextImpl>); | |
19 | ||
20 | impl std::ops::Deref for AioContext { | |
21 | type Target = AioContextImpl; | |
22 | ||
23 | fn deref(&self) -> &Self::Target { | |
24 | &*self.0 | |
25 | } | |
26 | } | |
27 | ||
28 | impl AioContext { | |
29 | /// Create a new `AioContext` instance with an associated polling thread, which will live as | |
30 | /// long as there are references to it. | |
31 | pub fn new() -> Result<Self, Error> { | |
32 | Ok(Self(AioContextImpl::new()?)) | |
33 | } | |
34 | } | |
35 | ||
36 | pub struct AioContextImpl { | |
37 | poll: Poll, | |
38 | handlers: RwLock<HashMap<Token, AioHandlerState>>, | |
39 | poll_thread: Mutex<Option<thread::JoinHandle<()>>>, | |
40 | } | |
41 | ||
42 | impl AioContextImpl { | |
43 | pub fn new() -> Result<Arc<Self>, Error> { | |
44 | let this = Arc::new(Self { | |
45 | poll: Poll::new()?, | |
46 | handlers: RwLock::new(HashMap::new()), | |
47 | poll_thread: Mutex::new(None), | |
48 | }); | |
49 | ||
50 | let this2 = Arc::clone(&this); | |
51 | this.poll_thread.lock().unwrap().replace(thread::spawn(|| this2.main_loop())); | |
52 | ||
53 | Ok(this) | |
54 | } | |
55 | ||
56 | /// Qemu's aio_set_fd_handler. We're skipping the `io_poll` parameter for this implementation | |
57 | /// as we don't use it. | |
58 | /// ``` | |
59 | /// void aio_set_fd_handler(AioContext *ctx, | |
60 | /// int fd, | |
61 | /// bool is_external, | |
62 | /// IOHandler *io_read, | |
63 | /// IOHandler *io_write, | |
64 | /// AioPollFn *io_poll, | |
65 | /// void *opaque); | |
66 | /// ``` | |
67 | /// | |
68 | /// Since this does not have any ways of returning errors, wrong usage will cause a panic in | |
69 | /// this test implementation. | |
70 | pub fn set_fd_handler( | |
71 | &self, | |
72 | fd: RawFd, | |
73 | io_read: Option<AioCb>, | |
74 | io_write: Option<AioCb>, | |
75 | // skipping io_poll, | |
76 | //opaque: *const (), | |
77 | ) { | |
78 | self.set_fd_handler_impl(fd, io_read, io_write, mio::PollOpt::level()) | |
79 | } | |
80 | ||
81 | /// This is going to be a proposed new api for Qemu's AioContext. | |
82 | pub fn set_fd_handler_edge( | |
83 | &self, | |
84 | fd: RawFd, | |
85 | io_read: Option<AioCb>, | |
86 | io_write: Option<AioCb>, | |
87 | // skipping io_poll, | |
88 | //opaque: *const (), | |
89 | ) { | |
90 | self.set_fd_handler_impl(fd, io_read, io_write, mio::PollOpt::edge()) | |
91 | } | |
92 | ||
93 | fn set_fd_handler_impl( | |
94 | &self, | |
95 | fd: RawFd, | |
96 | io_read: Option<AioCb>, | |
97 | io_write: Option<AioCb>, | |
98 | // skipping io_poll, | |
99 | //opaque: *const (), | |
100 | poll_opt: mio::PollOpt, | |
101 | ) { | |
102 | if io_read.is_none() && io_write.is_none() { | |
103 | return self.remove_fd_handler(fd); | |
104 | } | |
105 | ||
106 | let handlers = AioHandlerState { | |
107 | read: io_read, | |
108 | write: io_write, | |
109 | }; | |
110 | ||
111 | let mio_ready = handlers.mio_ready(); | |
112 | ||
113 | let token = Token(fd as usize); | |
114 | ||
115 | use std::collections::hash_map::Entry; | |
116 | match self.handlers.write().unwrap().entry(token) { | |
117 | Entry::Vacant(entry) => { | |
118 | self.poll.register(&EventedFd(&fd), token, mio_ready, poll_opt) | |
119 | .expect("failed to register a new fd for polling"); | |
120 | entry.insert(handlers); | |
121 | } | |
122 | Entry::Occupied(mut entry) => { | |
123 | self.poll.reregister(&EventedFd(&fd), token, mio_ready, poll_opt) | |
124 | .expect("failed to update an existing poll fd"); | |
125 | entry.insert(handlers); | |
126 | } | |
127 | } | |
128 | } | |
129 | ||
130 | fn remove_fd_handler(&self, fd: RawFd) { | |
131 | let mut guard = self.handlers.write().unwrap(); | |
132 | self.poll.deregister(&EventedFd(&fd)) | |
133 | .expect("failed to remove an existing poll fd"); | |
134 | guard.remove(&Token(fd as usize)); | |
135 | } | |
136 | ||
137 | /// We don't use qemu's aio_poll, so let's make this easy: | |
138 | /// | |
139 | /// ``` | |
140 | /// bool aio_poll(AioContext *ctx, bool blocking); | |
141 | /// ``` | |
142 | pub fn poll(&self) -> Result<(), Error> { | |
143 | let timeout = Some(std::time::Duration::from_millis(100)); | |
144 | ||
145 | let mut events = Events::with_capacity(16); | |
146 | ||
147 | if self.poll.poll(&mut events, timeout)? == 0 { | |
148 | return Ok(()); | |
149 | } | |
150 | ||
151 | for event in events.iter() { | |
152 | let token = event.token(); | |
153 | let ready = event.readiness(); | |
154 | // NOTE: We need to read-lock while fetching handlers, but handlers need a write-lock!!! | |
155 | // because they need to be edge-triggered and therefore *update* this handler list! | |
156 | // | |
157 | // While we could instead do this here (or use edge triggering from mio), this would | |
158 | // not properly simulate Qemu's AioContext, so we enforce this behavior here as well. | |
159 | // | |
160 | // This means we cannot just hold a read lock during the events.iter() iteration | |
161 | // though. | |
162 | let handler = self.handlers.read().unwrap().get(&token).map(|h| AioHandlerState { | |
163 | // Those are Option<Arc>! | |
164 | read: h.read.clone(), | |
165 | write: h.write.clone(), | |
166 | }); | |
167 | if let Some(handler) = handler { | |
168 | if ready.is_readable() { | |
169 | handler.read.as_ref().map(|func| func()); | |
170 | } | |
171 | if ready.is_writable() { | |
172 | handler.write.as_ref().map(|func| func()); | |
173 | } | |
174 | } | |
175 | } | |
176 | ||
177 | Ok(()) | |
178 | } | |
179 | ||
180 | fn main_loop(mut self: Arc<Self>) { | |
181 | while Arc::get_mut(&mut self).is_none() { | |
182 | if let Err(err) = self.poll() { | |
183 | dbg!("error AioContextImpl::poll(): {}", err); | |
184 | break; | |
185 | } | |
186 | } | |
187 | } | |
188 | } |