]>
Commit | Line | Data |
---|---|---|
58421ec1 | 1 | //! Block file access via a small QEMU restore VM using the PBS block driver in QEMU |
2b7f8dd5 WB |
2 | use std::collections::HashMap; |
3 | use std::fs::{File, OpenOptions}; | |
4 | use std::io::{prelude::*, SeekFrom}; | |
5 | ||
58421ec1 SR |
6 | use anyhow::{bail, Error}; |
7 | use futures::FutureExt; | |
8 | use serde::{Deserialize, Serialize}; | |
9 | use serde_json::json; | |
10 | ||
25877d05 | 11 | use proxmox_sys::fs::lock_file; |
2b7f8dd5 WB |
12 | |
13 | use pbs_client::{DEFAULT_VSOCK_PORT, BackupRepository, VsockClient}; | |
b2065dc7 | 14 | use pbs_datastore::backup_info::BackupDir; |
013b1e8b | 15 | use pbs_datastore::catalog::ArchiveEntry; |
2b7f8dd5 | 16 | |
58421ec1 | 17 | use super::block_driver::*; |
2b7f8dd5 | 18 | use crate::get_user_run_dir; |
58421ec1 SR |
19 | |
/// File name of the JSON map tracking this user's restore VMs; `VMStateMap::open_file_raw`
/// looks it up inside the directory returned by `get_user_run_dir()`.
const RESTORE_VM_MAP: &str = "restore-vm-map.json";

/// Block restore driver that serves listing/extraction requests by talking to a QEMU restore VM
/// over vsock.
pub struct QemuBlockDriver {}
23 | ||
/// Bookkeeping for one restore VM, persisted as a value of the JSON state map.
#[derive(Clone, Hash, Serialize, Deserialize)]
struct VMState {
    /// PID reported by `qemu_helper::start_vm`, used to kill the VM when it is unreachable
    pid: i32,
    /// vsock context ID the VM's API is reached at
    cid: i32,
    /// ticket handed to the VM on start and passed to every `VsockClient` talking to it
    /// (presumably used for authentication - confirm in qemu_helper)
    ticket: String,
}
30 | ||
/// In-memory copy of the VM state map together with the file it was loaded from.
///
/// Only constructed by `VMStateMap::load()`, which locks `file` before reading it; the lock is
/// released when the value (and thereby `file`) is dropped.
struct VMStateMap {
    /// VM states keyed by the escaped name produced by `make_name`
    map: HashMap<String, VMState>,
    /// open, locked handle of the state map file
    file: File,
}
35 | ||
36 | impl VMStateMap { | |
37 | fn open_file_raw(write: bool) -> Result<File, Error> { | |
38 | use std::os::unix::fs::OpenOptionsExt; | |
39 | let mut path = get_user_run_dir()?; | |
40 | path.push(RESTORE_VM_MAP); | |
41 | OpenOptions::new() | |
42 | .read(true) | |
43 | .write(write) | |
44 | .create(write) | |
45 | .mode(0o600) | |
46 | .open(path) | |
47 | .map_err(Error::from) | |
48 | } | |
49 | ||
50 | /// Acquire a lock on the state map and retrieve a deserialized version | |
51 | fn load() -> Result<Self, Error> { | |
52 | let mut file = Self::open_file_raw(true)?; | |
66501529 | 53 | lock_file(&mut file, true, Some(std::time::Duration::from_secs(120)))?; |
58421ec1 SR |
54 | let map = serde_json::from_reader(&file).unwrap_or_default(); |
55 | Ok(Self { map, file }) | |
56 | } | |
57 | ||
58 | /// Load a read-only copy of the current VM map. Only use for informational purposes, like | |
59 | /// shell auto-completion, for anything requiring consistency use load() ! | |
60 | fn load_read_only() -> Result<HashMap<String, VMState>, Error> { | |
61 | let file = Self::open_file_raw(false)?; | |
62 | Ok(serde_json::from_reader(&file).unwrap_or_default()) | |
63 | } | |
64 | ||
65 | /// Write back a potentially modified state map, consuming the held lock | |
66 | fn write(mut self) -> Result<(), Error> { | |
67 | self.file.seek(SeekFrom::Start(0))?; | |
68 | self.file.set_len(0)?; | |
69 | serde_json::to_writer(self.file, &self.map)?; | |
70 | ||
71 | // drop ourselves including file lock | |
72 | Ok(()) | |
73 | } | |
74 | ||
75 | /// Return the map, but drop the lock immediately | |
76 | fn read_only(self) -> HashMap<String, VMState> { | |
77 | self.map | |
78 | } | |
79 | } | |
80 | ||
81 | fn make_name(repo: &BackupRepository, snap: &BackupDir) -> String { | |
82 | let full = format!("qemu_{}/{}", repo, snap); | |
25877d05 | 83 | proxmox_sys::systemd::escape_unit(&full, false) |
58421ec1 SR |
84 | } |
85 | ||
86 | /// remove non-responsive VMs from given map, returns 'true' if map was modified | |
87 | async fn cleanup_map(map: &mut HashMap<String, VMState>) -> bool { | |
88 | let mut to_remove = Vec::new(); | |
89 | for (name, state) in map.iter() { | |
90 | let client = VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone())); | |
91 | let res = client | |
92 | .get("api2/json/status", Some(json!({"keep-timeout": true}))) | |
93 | .await; | |
94 | if res.is_err() { | |
95 | // VM is not reachable, remove from map and inform user | |
96 | to_remove.push(name.clone()); | |
1011fb55 | 97 | eprintln!( |
58421ec1 SR |
98 | "VM '{}' (pid: {}, cid: {}) was not reachable, removing from map", |
99 | name, state.pid, state.cid | |
100 | ); | |
7d39e471 | 101 | let _ = super::qemu_helper::try_kill_vm(state.pid); |
58421ec1 SR |
102 | } |
103 | } | |
104 | ||
105 | for tr in &to_remove { | |
106 | map.remove(tr); | |
107 | } | |
108 | ||
109 | !to_remove.is_empty() | |
110 | } | |
111 | ||
112 | fn new_ticket() -> String { | |
6ef1b649 | 113 | proxmox_uuid::Uuid::generate().to_string() |
58421ec1 SR |
114 | } |
115 | ||
116 | async fn ensure_running(details: &SnapRestoreDetails) -> Result<VsockClient, Error> { | |
117 | let name = make_name(&details.repo, &details.snapshot); | |
118 | let mut state = VMStateMap::load()?; | |
119 | ||
120 | cleanup_map(&mut state.map).await; | |
121 | ||
122 | let new_cid; | |
123 | let vms = match state.map.get(&name) { | |
124 | Some(vm) => { | |
125 | let client = VsockClient::new(vm.cid, DEFAULT_VSOCK_PORT, Some(vm.ticket.clone())); | |
126 | let res = client.get("api2/json/status", None).await; | |
127 | match res { | |
128 | Ok(_) => { | |
129 | // VM is running and we just reset its timeout, nothing to do | |
130 | return Ok(client); | |
131 | } | |
132 | Err(err) => { | |
1011fb55 | 133 | eprintln!("stale VM detected, restarting ({})", err); |
58421ec1 | 134 | // VM is dead, restart |
7d39e471 | 135 | let _ = super::qemu_helper::try_kill_vm(vm.pid); |
58421ec1 SR |
136 | let vms = start_vm(vm.cid, details).await?; |
137 | new_cid = vms.cid; | |
138 | state.map.insert(name, vms.clone()); | |
139 | vms | |
140 | } | |
141 | } | |
142 | } | |
143 | None => { | |
144 | let mut cid = state | |
145 | .map | |
146 | .iter() | |
147 | .map(|v| v.1.cid) | |
148 | .max() | |
149 | .unwrap_or(0) | |
150 | .wrapping_add(1); | |
151 | ||
152 | // offset cid by user id, to avoid unneccessary retries | |
153 | let running_uid = nix::unistd::Uid::current(); | |
154 | cid = cid.wrapping_add(running_uid.as_raw() as i32); | |
155 | ||
156 | // some low CIDs have special meaning, start at 10 to avoid them | |
157 | cid = cid.max(10); | |
158 | ||
159 | let vms = start_vm(cid, details).await?; | |
160 | new_cid = vms.cid; | |
161 | state.map.insert(name, vms.clone()); | |
162 | vms | |
163 | } | |
164 | }; | |
165 | ||
166 | state.write()?; | |
167 | Ok(VsockClient::new( | |
168 | new_cid, | |
169 | DEFAULT_VSOCK_PORT, | |
170 | Some(vms.ticket.clone()), | |
171 | )) | |
172 | } | |
173 | ||
174 | async fn start_vm(cid_request: i32, details: &SnapRestoreDetails) -> Result<VMState, Error> { | |
175 | let ticket = new_ticket(); | |
176 | let files = details | |
177 | .manifest | |
178 | .files() | |
179 | .iter() | |
180 | .map(|file| file.filename.clone()) | |
181 | .filter(|name| name.ends_with(".img.fidx")); | |
182 | let (pid, cid) = | |
183 | super::qemu_helper::start_vm((cid_request.abs() & 0xFFFF) as u16, details, files, &ticket) | |
184 | .await?; | |
185 | Ok(VMState { pid, cid, ticket }) | |
186 | } | |
187 | ||
188 | impl BlockRestoreDriver for QemuBlockDriver { | |
801ec1db SR |
189 | fn data_list( |
190 | &self, | |
191 | details: SnapRestoreDetails, | |
192 | img_file: String, | |
193 | mut path: Vec<u8>, | |
194 | ) -> Async<Result<Vec<ArchiveEntry>, Error>> { | |
195 | async move { | |
196 | let client = ensure_running(&details).await?; | |
197 | if !path.is_empty() && path[0] != b'/' { | |
198 | path.insert(0, b'/'); | |
199 | } | |
200 | let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>()); | |
201 | let mut result = client | |
202 | .get("api2/json/list", Some(json!({ "path": path }))) | |
203 | .await?; | |
204 | serde_json::from_value(result["data"].take()).map_err(|err| err.into()) | |
205 | } | |
206 | .boxed() | |
207 | } | |
208 | ||
b13089cd SR |
209 | fn data_extract( |
210 | &self, | |
211 | details: SnapRestoreDetails, | |
212 | img_file: String, | |
213 | mut path: Vec<u8>, | |
214 | pxar: bool, | |
215 | ) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>> { | |
216 | async move { | |
217 | let client = ensure_running(&details).await?; | |
218 | if !path.is_empty() && path[0] != b'/' { | |
219 | path.insert(0, b'/'); | |
220 | } | |
221 | let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>()); | |
222 | let (mut tx, rx) = tokio::io::duplex(1024 * 4096); | |
223 | tokio::spawn(async move { | |
224 | if let Err(err) = client | |
225 | .download( | |
226 | "api2/json/extract", | |
227 | Some(json!({ "path": path, "pxar": pxar })), | |
228 | &mut tx, | |
229 | ) | |
230 | .await | |
231 | { | |
232 | eprintln!("reading file extraction stream failed - {}", err); | |
ddbd63ed | 233 | std::process::exit(1); |
b13089cd SR |
234 | } |
235 | }); | |
236 | ||
237 | Ok(Box::new(rx) as Box<dyn tokio::io::AsyncRead + Unpin + Send>) | |
238 | } | |
239 | .boxed() | |
240 | } | |
241 | ||
58421ec1 SR |
242 | fn status(&self) -> Async<Result<Vec<DriverStatus>, Error>> { |
243 | async move { | |
244 | let mut state_map = VMStateMap::load()?; | |
245 | let modified = cleanup_map(&mut state_map.map).await; | |
246 | let map = if modified { | |
247 | let m = state_map.map.clone(); | |
248 | state_map.write()?; | |
249 | m | |
250 | } else { | |
251 | state_map.read_only() | |
252 | }; | |
253 | let mut result = Vec::new(); | |
254 | ||
255 | for (n, s) in map.iter() { | |
256 | let client = VsockClient::new(s.cid, DEFAULT_VSOCK_PORT, Some(s.ticket.clone())); | |
257 | let resp = client | |
258 | .get("api2/json/status", Some(json!({"keep-timeout": true}))) | |
259 | .await; | |
25877d05 | 260 | let name = proxmox_sys::systemd::unescape_unit(n) |
58421ec1 SR |
261 | .unwrap_or_else(|_| "<invalid name>".to_owned()); |
262 | let mut extra = json!({"pid": s.pid, "cid": s.cid}); | |
263 | ||
264 | match resp { | |
265 | Ok(status) => match status["data"].as_object() { | |
266 | Some(map) => { | |
267 | for (k, v) in map.iter() { | |
268 | extra[k] = v.clone(); | |
269 | } | |
270 | } | |
271 | None => { | |
272 | let err = format!( | |
273 | "invalid JSON received from /status call: {}", | |
274 | status.to_string() | |
275 | ); | |
276 | extra["error"] = json!(err); | |
277 | } | |
278 | }, | |
279 | Err(err) => { | |
280 | let err = format!("error during /status API call: {}", err); | |
281 | extra["error"] = json!(err); | |
282 | } | |
283 | } | |
284 | ||
285 | result.push(DriverStatus { | |
286 | id: name, | |
287 | data: extra, | |
288 | }); | |
289 | } | |
290 | ||
291 | Ok(result) | |
292 | } | |
293 | .boxed() | |
294 | } | |
295 | ||
296 | fn stop(&self, id: String) -> Async<Result<(), Error>> { | |
297 | async move { | |
25877d05 | 298 | let name = proxmox_sys::systemd::escape_unit(&id, false); |
58421ec1 SR |
299 | let mut map = VMStateMap::load()?; |
300 | let map_mod = cleanup_map(&mut map.map).await; | |
301 | match map.map.get(&name) { | |
302 | Some(state) => { | |
303 | let client = | |
304 | VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone())); | |
305 | // ignore errors, this either fails because: | |
306 | // * the VM is unreachable/dead, in which case we don't want it in the map | |
307 | // * the call was successful and the connection reset when the VM stopped | |
308 | let _ = client.get("api2/json/stop", None).await; | |
309 | map.map.remove(&name); | |
310 | map.write()?; | |
311 | } | |
312 | None => { | |
313 | if map_mod { | |
314 | map.write()?; | |
315 | } | |
316 | bail!("VM with name '{}' not found", name); | |
317 | } | |
318 | } | |
319 | Ok(()) | |
320 | } | |
321 | .boxed() | |
322 | } | |
323 | ||
324 | fn list(&self) -> Vec<String> { | |
325 | match VMStateMap::load_read_only() { | |
326 | Ok(state) => state | |
327 | .iter() | |
9a37bd6c | 328 | .filter_map(|(name, _)| proxmox_sys::systemd::unescape_unit(name).ok()) |
58421ec1 SR |
329 | .collect(), |
330 | Err(_) => Vec::new(), | |
331 | } | |
332 | } | |
333 | } |