]> git.proxmox.com Git - proxmox-backup.git/blame - qemu-io/src/aio_context/standalone.rs
add qemu-io crate, AioContext reactor helper
[proxmox-backup.git] / qemu-io / src / aio_context / standalone.rs
CommitLineData
8bf7342c
WB
1//! This implements the parts of qemu's AioContext interface we need for testing outside qemu.
2
3use std::collections::HashMap;
4use std::os::unix::io::RawFd;
5use std::sync::{Arc, Mutex, RwLock};
6use std::thread;
7
8use failure::Error;
9use mio::{Events, Poll, Token};
10use mio::unix::EventedFd;
11
12use crate::util::{AioCb, AioHandlerState};
13
/// This is a reference to a standalone `AioContextImpl` and allows instantiating a new context
/// with a polling thread.
///
/// Cloning is cheap: it only bumps the strong count of the shared `Arc<AioContextImpl>`. The
/// polling thread spawned by `AioContextImpl::new` keeps running until its own `Arc` is the last
/// strong reference, i.e. until all `AioContext` clones have been dropped.
#[derive(Clone)]
// Guarantee this handle has the exact same memory layout as the inner `Arc<AioContextImpl>`.
#[repr(transparent)]
pub struct AioContext(Arc<AioContextImpl>);
19
20impl std::ops::Deref for AioContext {
21 type Target = AioContextImpl;
22
23 fn deref(&self) -> &Self::Target {
24 &*self.0
25 }
26}
27
28impl AioContext {
29 /// Create a new `AioContext` instance with an associated polling thread, which will live as
30 /// long as there are references to it.
31 pub fn new() -> Result<Self, Error> {
32 Ok(Self(AioContextImpl::new()?))
33 }
34}
35
/// Shared state behind an `AioContext` handle: the mio poller, the registered fd handlers, and
/// the background polling thread.
pub struct AioContextImpl {
    /// The mio poll instance every registered fd is watched with.
    poll: Poll,
    /// Read/write callbacks per registered fd, keyed by `Token(fd as usize)`.
    handlers: RwLock<HashMap<Token, AioHandlerState>>,
    /// Join handle of the polling thread spawned in `new()`. It is not joined anywhere in this
    /// file; the thread exits on its own once it holds the last `Arc` reference (see
    /// `main_loop`).
    poll_thread: Mutex<Option<thread::JoinHandle<()>>>,
}
41
42impl AioContextImpl {
43 pub fn new() -> Result<Arc<Self>, Error> {
44 let this = Arc::new(Self {
45 poll: Poll::new()?,
46 handlers: RwLock::new(HashMap::new()),
47 poll_thread: Mutex::new(None),
48 });
49
50 let this2 = Arc::clone(&this);
51 this.poll_thread.lock().unwrap().replace(thread::spawn(|| this2.main_loop()));
52
53 Ok(this)
54 }
55
56 /// Qemu's aio_set_fd_handler. We're skipping the `io_poll` parameter for this implementation
57 /// as we don't use it.
58 /// ```
59 /// void aio_set_fd_handler(AioContext *ctx,
60 /// int fd,
61 /// bool is_external,
62 /// IOHandler *io_read,
63 /// IOHandler *io_write,
64 /// AioPollFn *io_poll,
65 /// void *opaque);
66 /// ```
67 ///
68 /// Since this does not have any ways of returning errors, wrong usage will cause a panic in
69 /// this test implementation.
70 pub fn set_fd_handler(
71 &self,
72 fd: RawFd,
73 io_read: Option<AioCb>,
74 io_write: Option<AioCb>,
75 // skipping io_poll,
76 //opaque: *const (),
77 ) {
78 self.set_fd_handler_impl(fd, io_read, io_write, mio::PollOpt::level())
79 }
80
81 /// This is going to be a proposed new api for Qemu's AioContext.
82 pub fn set_fd_handler_edge(
83 &self,
84 fd: RawFd,
85 io_read: Option<AioCb>,
86 io_write: Option<AioCb>,
87 // skipping io_poll,
88 //opaque: *const (),
89 ) {
90 self.set_fd_handler_impl(fd, io_read, io_write, mio::PollOpt::edge())
91 }
92
93 fn set_fd_handler_impl(
94 &self,
95 fd: RawFd,
96 io_read: Option<AioCb>,
97 io_write: Option<AioCb>,
98 // skipping io_poll,
99 //opaque: *const (),
100 poll_opt: mio::PollOpt,
101 ) {
102 if io_read.is_none() && io_write.is_none() {
103 return self.remove_fd_handler(fd);
104 }
105
106 let handlers = AioHandlerState {
107 read: io_read,
108 write: io_write,
109 };
110
111 let mio_ready = handlers.mio_ready();
112
113 let token = Token(fd as usize);
114
115 use std::collections::hash_map::Entry;
116 match self.handlers.write().unwrap().entry(token) {
117 Entry::Vacant(entry) => {
118 self.poll.register(&EventedFd(&fd), token, mio_ready, poll_opt)
119 .expect("failed to register a new fd for polling");
120 entry.insert(handlers);
121 }
122 Entry::Occupied(mut entry) => {
123 self.poll.reregister(&EventedFd(&fd), token, mio_ready, poll_opt)
124 .expect("failed to update an existing poll fd");
125 entry.insert(handlers);
126 }
127 }
128 }
129
130 fn remove_fd_handler(&self, fd: RawFd) {
131 let mut guard = self.handlers.write().unwrap();
132 self.poll.deregister(&EventedFd(&fd))
133 .expect("failed to remove an existing poll fd");
134 guard.remove(&Token(fd as usize));
135 }
136
137 /// We don't use qemu's aio_poll, so let's make this easy:
138 ///
139 /// ```
140 /// bool aio_poll(AioContext *ctx, bool blocking);
141 /// ```
142 pub fn poll(&self) -> Result<(), Error> {
143 let timeout = Some(std::time::Duration::from_millis(100));
144
145 let mut events = Events::with_capacity(16);
146
147 if self.poll.poll(&mut events, timeout)? == 0 {
148 return Ok(());
149 }
150
151 for event in events.iter() {
152 let token = event.token();
153 let ready = event.readiness();
154 // NOTE: We need to read-lock while fetching handlers, but handlers need a write-lock!!!
155 // because they need to be edge-triggered and therefore *update* this handler list!
156 //
157 // While we could instead do this here (or use edge triggering from mio), this would
158 // not properly simulate Qemu's AioContext, so we enforce this behavior here as well.
159 //
160 // This means we cannot just hold a read lock during the events.iter() iteration
161 // though.
162 let handler = self.handlers.read().unwrap().get(&token).map(|h| AioHandlerState {
163 // Those are Option<Arc>!
164 read: h.read.clone(),
165 write: h.write.clone(),
166 });
167 if let Some(handler) = handler {
168 if ready.is_readable() {
169 handler.read.as_ref().map(|func| func());
170 }
171 if ready.is_writable() {
172 handler.write.as_ref().map(|func| func());
173 }
174 }
175 }
176
177 Ok(())
178 }
179
180 fn main_loop(mut self: Arc<Self>) {
181 while Arc::get_mut(&mut self).is_none() {
182 if let Err(err) = self.poll() {
183 dbg!("error AioContextImpl::poll(): {}", err);
184 break;
185 }
186 }
187 }
188}