]>
Commit | Line | Data |
---|---|---|
58421ec1 | 1 | //! Block file access via a small QEMU restore VM using the PBS block driver in QEMU |
2b7f8dd5 | 2 | use std::collections::HashMap; |
72220d79 | 3 | use std::ffi::OsStr; |
2b7f8dd5 | 4 | use std::fs::{File, OpenOptions}; |
b300e6fb | 5 | use std::io::{prelude::*, BufReader, BufWriter, SeekFrom}; |
72220d79 DC |
6 | use std::os::unix::prelude::OsStrExt; |
7 | use std::path::Path; | |
2b7f8dd5 | 8 | |
58421ec1 SR |
9 | use anyhow::{bail, Error}; |
10 | use futures::FutureExt; | |
11 | use serde::{Deserialize, Serialize}; | |
12 | use serde_json::json; | |
13 | ||
25877d05 | 14 | use proxmox_sys::fs::lock_file; |
05d22be1 | 15 | use proxmox_systemd; |
2b7f8dd5 | 16 | |
cc900ae2 | 17 | use pbs_api_types::{file_restore::FileRestoreFormat, BackupDir, BackupNamespace}; |
40ea990c | 18 | use pbs_client::{BackupRepository, VsockClient, DEFAULT_VSOCK_PORT}; |
013b1e8b | 19 | use pbs_datastore::catalog::ArchiveEntry; |
2b7f8dd5 | 20 | |
58421ec1 | 21 | use super::block_driver::*; |
2b7f8dd5 | 22 | use crate::get_user_run_dir; |
2f0f3e99 | 23 | use crate::qemu_helper::{self, MAX_MEMORY_DIMM_SIZE}; |
58421ec1 SR |
24 | |
/// File name of the JSON-serialized VM state map; it is looked up inside the directory
/// returned by `get_user_run_dir()`.
const RESTORE_VM_MAP: &str = "restore-vm-map.json";
26 | ||
/// Block restore driver that accesses image contents through QEMU restore VMs.
///
/// The struct itself has no fields; the set of known VMs is kept in the on-disk state map.
pub struct QemuBlockDriver {}
28 | ||
/// Bookkeeping entry for a single restore VM, as stored in the state map file.
#[derive(Clone, Hash, Serialize, Deserialize)]
struct VMState {
    /// Process id handed to `try_kill_vm` when the VM has to be cleaned up
    /// (presumably the pid of the QEMU process - confirm against `qemu_helper::start_vm`).
    pid: i32,
    /// vsock context id used to reach the VM via `VsockClient`.
    cid: i32,
    /// Ticket passed to `VsockClient` for requests to this VM.
    ticket: String,
}
35 | ||
/// In-memory view of the state map file together with the open file it was read from.
struct VMStateMap {
    /// Known VMs, keyed by the escaped name produced by `make_name`.
    map: HashMap<String, VMState>,
    /// Handle of the state map file; `load()` takes a lock on it, which is given up when the
    /// handle is dropped.
    file: File,
}
40 | ||
41 | impl VMStateMap { | |
42 | fn open_file_raw(write: bool) -> Result<File, Error> { | |
43 | use std::os::unix::fs::OpenOptionsExt; | |
44 | let mut path = get_user_run_dir()?; | |
45 | path.push(RESTORE_VM_MAP); | |
46 | OpenOptions::new() | |
47 | .read(true) | |
48 | .write(write) | |
49 | .create(write) | |
50 | .mode(0o600) | |
51 | .open(path) | |
52 | .map_err(Error::from) | |
53 | } | |
54 | ||
55 | /// Acquire a lock on the state map and retrieve a deserialized version | |
56 | fn load() -> Result<Self, Error> { | |
57 | let mut file = Self::open_file_raw(true)?; | |
66501529 | 58 | lock_file(&mut file, true, Some(std::time::Duration::from_secs(120)))?; |
b300e6fb | 59 | let map = serde_json::from_reader(BufReader::new(&mut file)).unwrap_or_default(); |
58421ec1 SR |
60 | Ok(Self { map, file }) |
61 | } | |
62 | ||
63 | /// Load a read-only copy of the current VM map. Only use for informational purposes, like | |
64 | /// shell auto-completion, for anything requiring consistency use load() ! | |
65 | fn load_read_only() -> Result<HashMap<String, VMState>, Error> { | |
66 | let file = Self::open_file_raw(false)?; | |
b300e6fb | 67 | Ok(serde_json::from_reader(BufReader::new(file)).unwrap_or_default()) |
58421ec1 SR |
68 | } |
69 | ||
70 | /// Write back a potentially modified state map, consuming the held lock | |
71 | fn write(mut self) -> Result<(), Error> { | |
72 | self.file.seek(SeekFrom::Start(0))?; | |
73 | self.file.set_len(0)?; | |
b300e6fb | 74 | serde_json::to_writer(BufWriter::new(&mut self.file), &self.map)?; |
58421ec1 SR |
75 | |
76 | // drop ourselves including file lock | |
77 | Ok(()) | |
78 | } | |
79 | ||
80 | /// Return the map, but drop the lock immediately | |
81 | fn read_only(self) -> HashMap<String, VMState> { | |
82 | self.map | |
83 | } | |
84 | } | |
85 | ||
8ca7cccf WB |
86 | fn make_name(repo: &BackupRepository, ns: &BackupNamespace, snap: &BackupDir) -> String { |
87 | let full = if ns.is_root() { | |
2aaf3ef1 | 88 | format!("qemu_{repo}/{snap}") |
8ca7cccf | 89 | } else { |
2aaf3ef1 | 90 | format!("qemu_{repo}:{ns}/{snap}") |
8ca7cccf | 91 | }; |
05d22be1 | 92 | proxmox_systemd::escape_unit(full, false) |
58421ec1 SR |
93 | } |
94 | ||
95 | /// remove non-responsive VMs from given map, returns 'true' if map was modified | |
96 | async fn cleanup_map(map: &mut HashMap<String, VMState>) -> bool { | |
97 | let mut to_remove = Vec::new(); | |
98 | for (name, state) in map.iter() { | |
99 | let client = VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone())); | |
100 | let res = client | |
101 | .get("api2/json/status", Some(json!({"keep-timeout": true}))) | |
102 | .await; | |
103 | if res.is_err() { | |
104 | // VM is not reachable, remove from map and inform user | |
105 | to_remove.push(name.clone()); | |
095b3c1c | 106 | log::warn!( |
58421ec1 | 107 | "VM '{}' (pid: {}, cid: {}) was not reachable, removing from map", |
b58dd1d4 WB |
108 | name, |
109 | state.pid, | |
110 | state.cid | |
58421ec1 | 111 | ); |
7d39e471 | 112 | let _ = super::qemu_helper::try_kill_vm(state.pid); |
58421ec1 SR |
113 | } |
114 | } | |
115 | ||
116 | for tr in &to_remove { | |
117 | map.remove(tr); | |
118 | } | |
119 | ||
120 | !to_remove.is_empty() | |
121 | } | |
122 | ||
123 | fn new_ticket() -> String { | |
6ef1b649 | 124 | proxmox_uuid::Uuid::generate().to_string() |
58421ec1 SR |
125 | } |
126 | ||
72220d79 | 127 | async fn ensure_running(details: &SnapRestoreDetails) -> Result<(i32, VsockClient), Error> { |
8ca7cccf | 128 | let name = make_name(&details.repo, &details.namespace, &details.snapshot); |
58421ec1 SR |
129 | let mut state = VMStateMap::load()?; |
130 | ||
131 | cleanup_map(&mut state.map).await; | |
132 | ||
133 | let new_cid; | |
134 | let vms = match state.map.get(&name) { | |
135 | Some(vm) => { | |
136 | let client = VsockClient::new(vm.cid, DEFAULT_VSOCK_PORT, Some(vm.ticket.clone())); | |
137 | let res = client.get("api2/json/status", None).await; | |
138 | match res { | |
139 | Ok(_) => { | |
140 | // VM is running and we just reset its timeout, nothing to do | |
72220d79 | 141 | return Ok((vm.cid, client)); |
58421ec1 SR |
142 | } |
143 | Err(err) => { | |
095b3c1c | 144 | log::warn!("stale VM detected, restarting ({})", err); |
58421ec1 | 145 | // VM is dead, restart |
7d39e471 | 146 | let _ = super::qemu_helper::try_kill_vm(vm.pid); |
58421ec1 SR |
147 | let vms = start_vm(vm.cid, details).await?; |
148 | new_cid = vms.cid; | |
149 | state.map.insert(name, vms.clone()); | |
150 | vms | |
151 | } | |
152 | } | |
153 | } | |
154 | None => { | |
155 | let mut cid = state | |
156 | .map | |
157 | .iter() | |
158 | .map(|v| v.1.cid) | |
159 | .max() | |
160 | .unwrap_or(0) | |
161 | .wrapping_add(1); | |
162 | ||
74cad4a8 | 163 | // offset cid by user id, to avoid unnecessary retries |
58421ec1 SR |
164 | let running_uid = nix::unistd::Uid::current(); |
165 | cid = cid.wrapping_add(running_uid.as_raw() as i32); | |
166 | ||
167 | // some low CIDs have special meaning, start at 10 to avoid them | |
168 | cid = cid.max(10); | |
169 | ||
170 | let vms = start_vm(cid, details).await?; | |
171 | new_cid = vms.cid; | |
172 | state.map.insert(name, vms.clone()); | |
173 | vms | |
174 | } | |
175 | }; | |
176 | ||
177 | state.write()?; | |
72220d79 | 178 | Ok(( |
58421ec1 | 179 | new_cid, |
72220d79 | 180 | VsockClient::new(new_cid, DEFAULT_VSOCK_PORT, Some(vms.ticket)), |
58421ec1 SR |
181 | )) |
182 | } | |
183 | ||
/// Check whether `path` points below a ZFS pool entry.
///
/// This is the case if the first component (after an optional leading `/`) is `zpool` and
/// at least one more component follows it.
fn path_is_zfs(path: &[u8]) -> bool {
    use std::path::Component;

    if path.is_empty() {
        return false;
    }

    let mut parts = Path::new(OsStr::from_bytes(path)).components().peekable();

    // an optional root directory in front of the first name is skipped
    if let Some(Component::RootDir) = parts.peek() {
        parts.next();
    }

    match parts.next() {
        Some(Component::Normal(first)) if first == OsStr::new("zpool") => parts.next().is_some(),
        _ => false,
    }
}
201 | ||
69e3beb9 TL |
202 | async fn handle_extra_guest_memory_needs(cid: i32, path: &[u8]) { |
203 | use std::env::var; | |
204 | match var("PBS_FILE_RESTORE_MEM_HOTPLUG_ALLOW").ok().as_deref() { | |
205 | Some("true") => (), | |
206 | _ => return, // this is opt-in | |
207 | } | |
2f0f3e99 TL |
208 | let size = match var("PBS_FILE_RESTORE_MEM_HOTPLUG_SIZE_MB").map(|v| v.parse::<usize>()) { |
209 | Ok(Ok(size)) if size > MAX_MEMORY_DIMM_SIZE => { | |
210 | log::warn!("limit memory request of {size} to {MAX_MEMORY_DIMM_SIZE}"); | |
211 | MAX_MEMORY_DIMM_SIZE | |
212 | } | |
213 | Ok(Ok(size)) => size, | |
214 | _ => 256, // in practice this means a total of ~ 512 MB depending on disk count | |
215 | }; | |
69e3beb9 TL |
216 | |
217 | if path_is_zfs(path) { | |
2f0f3e99 | 218 | if let Err(err) = qemu_helper::hotplug_memory(cid, size).await { |
69e3beb9 TL |
219 | log::error!("could not increase memory: {err}"); |
220 | } | |
221 | } | |
222 | } | |
223 | ||
58421ec1 SR |
224 | async fn start_vm(cid_request: i32, details: &SnapRestoreDetails) -> Result<VMState, Error> { |
225 | let ticket = new_ticket(); | |
226 | let files = details | |
227 | .manifest | |
228 | .files() | |
229 | .iter() | |
230 | .map(|file| file.filename.clone()) | |
231 | .filter(|name| name.ends_with(".img.fidx")); | |
232 | let (pid, cid) = | |
233 | super::qemu_helper::start_vm((cid_request.abs() & 0xFFFF) as u16, details, files, &ticket) | |
234 | .await?; | |
235 | Ok(VMState { pid, cid, ticket }) | |
236 | } | |
237 | ||
238 | impl BlockRestoreDriver for QemuBlockDriver { | |
801ec1db SR |
239 | fn data_list( |
240 | &self, | |
241 | details: SnapRestoreDetails, | |
242 | img_file: String, | |
243 | mut path: Vec<u8>, | |
244 | ) -> Async<Result<Vec<ArchiveEntry>, Error>> { | |
245 | async move { | |
72220d79 | 246 | let (cid, client) = ensure_running(&details).await?; |
801ec1db SR |
247 | if !path.is_empty() && path[0] != b'/' { |
248 | path.insert(0, b'/'); | |
249 | } | |
69e3beb9 | 250 | handle_extra_guest_memory_needs(cid, &path).await; |
801ec1db SR |
251 | let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>()); |
252 | let mut result = client | |
253 | .get("api2/json/list", Some(json!({ "path": path }))) | |
254 | .await?; | |
255 | serde_json::from_value(result["data"].take()).map_err(|err| err.into()) | |
256 | } | |
257 | .boxed() | |
258 | } | |
259 | ||
b13089cd SR |
260 | fn data_extract( |
261 | &self, | |
262 | details: SnapRestoreDetails, | |
263 | img_file: String, | |
264 | mut path: Vec<u8>, | |
cc900ae2 DC |
265 | format: Option<FileRestoreFormat>, |
266 | zstd: bool, | |
b13089cd SR |
267 | ) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>> { |
268 | async move { | |
72220d79 | 269 | let (cid, client) = ensure_running(&details).await?; |
b13089cd SR |
270 | if !path.is_empty() && path[0] != b'/' { |
271 | path.insert(0, b'/'); | |
272 | } | |
69e3beb9 | 273 | handle_extra_guest_memory_needs(cid, &path).await; |
b13089cd SR |
274 | let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>()); |
275 | let (mut tx, rx) = tokio::io::duplex(1024 * 4096); | |
cc900ae2 DC |
276 | let mut data = json!({ "path": path, "zstd": zstd }); |
277 | if let Some(format) = format { | |
278 | data["format"] = serde_json::to_value(format)?; | |
279 | } | |
b13089cd SR |
280 | tokio::spawn(async move { |
281 | if let Err(err) = client | |
cc900ae2 | 282 | .download("api2/json/extract", Some(data), &mut tx) |
b13089cd SR |
283 | .await |
284 | { | |
095b3c1c | 285 | log::error!("reading file extraction stream failed - {}", err); |
ddbd63ed | 286 | std::process::exit(1); |
b13089cd SR |
287 | } |
288 | }); | |
289 | ||
290 | Ok(Box::new(rx) as Box<dyn tokio::io::AsyncRead + Unpin + Send>) | |
291 | } | |
292 | .boxed() | |
293 | } | |
294 | ||
58421ec1 SR |
295 | fn status(&self) -> Async<Result<Vec<DriverStatus>, Error>> { |
296 | async move { | |
297 | let mut state_map = VMStateMap::load()?; | |
298 | let modified = cleanup_map(&mut state_map.map).await; | |
299 | let map = if modified { | |
300 | let m = state_map.map.clone(); | |
301 | state_map.write()?; | |
302 | m | |
303 | } else { | |
304 | state_map.read_only() | |
305 | }; | |
306 | let mut result = Vec::new(); | |
307 | ||
308 | for (n, s) in map.iter() { | |
309 | let client = VsockClient::new(s.cid, DEFAULT_VSOCK_PORT, Some(s.ticket.clone())); | |
310 | let resp = client | |
311 | .get("api2/json/status", Some(json!({"keep-timeout": true}))) | |
312 | .await; | |
05d22be1 | 313 | let name = proxmox_systemd::unescape_unit(n) |
58421ec1 SR |
314 | .unwrap_or_else(|_| "<invalid name>".to_owned()); |
315 | let mut extra = json!({"pid": s.pid, "cid": s.cid}); | |
316 | ||
317 | match resp { | |
318 | Ok(status) => match status["data"].as_object() { | |
319 | Some(map) => { | |
320 | for (k, v) in map.iter() { | |
321 | extra[k] = v.clone(); | |
322 | } | |
323 | } | |
324 | None => { | |
2aaf3ef1 | 325 | let err = format!("invalid JSON received from /status call: {status}"); |
58421ec1 SR |
326 | extra["error"] = json!(err); |
327 | } | |
328 | }, | |
329 | Err(err) => { | |
2aaf3ef1 | 330 | let err = format!("error during /status API call: {err}"); |
58421ec1 SR |
331 | extra["error"] = json!(err); |
332 | } | |
333 | } | |
334 | ||
335 | result.push(DriverStatus { | |
336 | id: name, | |
337 | data: extra, | |
338 | }); | |
339 | } | |
340 | ||
341 | Ok(result) | |
342 | } | |
343 | .boxed() | |
344 | } | |
345 | ||
346 | fn stop(&self, id: String) -> Async<Result<(), Error>> { | |
347 | async move { | |
05d22be1 | 348 | let name = proxmox_systemd::escape_unit(&id, false); |
58421ec1 SR |
349 | let mut map = VMStateMap::load()?; |
350 | let map_mod = cleanup_map(&mut map.map).await; | |
351 | match map.map.get(&name) { | |
352 | Some(state) => { | |
353 | let client = | |
354 | VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone())); | |
355 | // ignore errors, this either fails because: | |
356 | // * the VM is unreachable/dead, in which case we don't want it in the map | |
357 | // * the call was successful and the connection reset when the VM stopped | |
358 | let _ = client.get("api2/json/stop", None).await; | |
359 | map.map.remove(&name); | |
360 | map.write()?; | |
361 | } | |
362 | None => { | |
363 | if map_mod { | |
364 | map.write()?; | |
365 | } | |
2aaf3ef1 | 366 | bail!("VM with name '{name}' not found"); |
58421ec1 SR |
367 | } |
368 | } | |
369 | Ok(()) | |
370 | } | |
371 | .boxed() | |
372 | } | |
373 | ||
374 | fn list(&self) -> Vec<String> { | |
375 | match VMStateMap::load_read_only() { | |
376 | Ok(state) => state | |
377 | .iter() | |
05d22be1 | 378 | .filter_map(|(name, _)| proxmox_systemd::unescape_unit(name).ok()) |
58421ec1 SR |
379 | .collect(), |
380 | Err(_) => Vec::new(), | |
381 | } | |
382 | } | |
383 | } |