]> git.proxmox.com Git - proxmox-backup.git/blame - proxmox-file-restore/src/block_driver_qemu.rs
update to rrd-api-types 1.0.2
[proxmox-backup.git] / proxmox-file-restore / src / block_driver_qemu.rs
CommitLineData
58421ec1 1//! Block file access via a small QEMU restore VM using the PBS block driver in QEMU
2b7f8dd5 2use std::collections::HashMap;
72220d79 3use std::ffi::OsStr;
2b7f8dd5 4use std::fs::{File, OpenOptions};
b300e6fb 5use std::io::{prelude::*, BufReader, BufWriter, SeekFrom};
72220d79
DC
6use std::os::unix::prelude::OsStrExt;
7use std::path::Path;
2b7f8dd5 8
58421ec1
SR
9use anyhow::{bail, Error};
10use futures::FutureExt;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13
25877d05 14use proxmox_sys::fs::lock_file;
05d22be1 15use proxmox_systemd;
2b7f8dd5 16
cc900ae2 17use pbs_api_types::{file_restore::FileRestoreFormat, BackupDir, BackupNamespace};
40ea990c 18use pbs_client::{BackupRepository, VsockClient, DEFAULT_VSOCK_PORT};
013b1e8b 19use pbs_datastore::catalog::ArchiveEntry;
2b7f8dd5 20
58421ec1 21use super::block_driver::*;
2b7f8dd5 22use crate::get_user_run_dir;
2f0f3e99 23use crate::qemu_helper::{self, MAX_MEMORY_DIMM_SIZE};
58421ec1
SR
24
25const RESTORE_VM_MAP: &str = "restore-vm-map.json";
26
/// Block restore driver that serves file-restore requests through a small QEMU restore VM.
///
/// The type carries no state of its own; the known VMs are tracked in the on-disk VM map.
pub struct QemuBlockDriver {}
28
29#[derive(Clone, Hash, Serialize, Deserialize)]
30struct VMState {
31 pid: i32,
32 cid: i32,
33 ticket: String,
34}
35
36struct VMStateMap {
37 map: HashMap<String, VMState>,
38 file: File,
39}
40
41impl VMStateMap {
42 fn open_file_raw(write: bool) -> Result<File, Error> {
43 use std::os::unix::fs::OpenOptionsExt;
44 let mut path = get_user_run_dir()?;
45 path.push(RESTORE_VM_MAP);
46 OpenOptions::new()
47 .read(true)
48 .write(write)
49 .create(write)
50 .mode(0o600)
51 .open(path)
52 .map_err(Error::from)
53 }
54
55 /// Acquire a lock on the state map and retrieve a deserialized version
56 fn load() -> Result<Self, Error> {
57 let mut file = Self::open_file_raw(true)?;
66501529 58 lock_file(&mut file, true, Some(std::time::Duration::from_secs(120)))?;
b300e6fb 59 let map = serde_json::from_reader(BufReader::new(&mut file)).unwrap_or_default();
58421ec1
SR
60 Ok(Self { map, file })
61 }
62
63 /// Load a read-only copy of the current VM map. Only use for informational purposes, like
64 /// shell auto-completion, for anything requiring consistency use load() !
65 fn load_read_only() -> Result<HashMap<String, VMState>, Error> {
66 let file = Self::open_file_raw(false)?;
b300e6fb 67 Ok(serde_json::from_reader(BufReader::new(file)).unwrap_or_default())
58421ec1
SR
68 }
69
70 /// Write back a potentially modified state map, consuming the held lock
71 fn write(mut self) -> Result<(), Error> {
72 self.file.seek(SeekFrom::Start(0))?;
73 self.file.set_len(0)?;
b300e6fb 74 serde_json::to_writer(BufWriter::new(&mut self.file), &self.map)?;
58421ec1
SR
75
76 // drop ourselves including file lock
77 Ok(())
78 }
79
80 /// Return the map, but drop the lock immediately
81 fn read_only(self) -> HashMap<String, VMState> {
82 self.map
83 }
84}
85
8ca7cccf
WB
86fn make_name(repo: &BackupRepository, ns: &BackupNamespace, snap: &BackupDir) -> String {
87 let full = if ns.is_root() {
2aaf3ef1 88 format!("qemu_{repo}/{snap}")
8ca7cccf 89 } else {
2aaf3ef1 90 format!("qemu_{repo}:{ns}/{snap}")
8ca7cccf 91 };
05d22be1 92 proxmox_systemd::escape_unit(full, false)
58421ec1
SR
93}
94
95/// remove non-responsive VMs from given map, returns 'true' if map was modified
96async fn cleanup_map(map: &mut HashMap<String, VMState>) -> bool {
97 let mut to_remove = Vec::new();
98 for (name, state) in map.iter() {
99 let client = VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone()));
100 let res = client
101 .get("api2/json/status", Some(json!({"keep-timeout": true})))
102 .await;
103 if res.is_err() {
104 // VM is not reachable, remove from map and inform user
105 to_remove.push(name.clone());
095b3c1c 106 log::warn!(
58421ec1 107 "VM '{}' (pid: {}, cid: {}) was not reachable, removing from map",
b58dd1d4
WB
108 name,
109 state.pid,
110 state.cid
58421ec1 111 );
7d39e471 112 let _ = super::qemu_helper::try_kill_vm(state.pid);
58421ec1
SR
113 }
114 }
115
116 for tr in &to_remove {
117 map.remove(tr);
118 }
119
120 !to_remove.is_empty()
121}
122
123fn new_ticket() -> String {
6ef1b649 124 proxmox_uuid::Uuid::generate().to_string()
58421ec1
SR
125}
126
72220d79 127async fn ensure_running(details: &SnapRestoreDetails) -> Result<(i32, VsockClient), Error> {
8ca7cccf 128 let name = make_name(&details.repo, &details.namespace, &details.snapshot);
58421ec1
SR
129 let mut state = VMStateMap::load()?;
130
131 cleanup_map(&mut state.map).await;
132
133 let new_cid;
134 let vms = match state.map.get(&name) {
135 Some(vm) => {
136 let client = VsockClient::new(vm.cid, DEFAULT_VSOCK_PORT, Some(vm.ticket.clone()));
137 let res = client.get("api2/json/status", None).await;
138 match res {
139 Ok(_) => {
140 // VM is running and we just reset its timeout, nothing to do
72220d79 141 return Ok((vm.cid, client));
58421ec1
SR
142 }
143 Err(err) => {
095b3c1c 144 log::warn!("stale VM detected, restarting ({})", err);
58421ec1 145 // VM is dead, restart
7d39e471 146 let _ = super::qemu_helper::try_kill_vm(vm.pid);
58421ec1
SR
147 let vms = start_vm(vm.cid, details).await?;
148 new_cid = vms.cid;
149 state.map.insert(name, vms.clone());
150 vms
151 }
152 }
153 }
154 None => {
155 let mut cid = state
156 .map
157 .iter()
158 .map(|v| v.1.cid)
159 .max()
160 .unwrap_or(0)
161 .wrapping_add(1);
162
74cad4a8 163 // offset cid by user id, to avoid unnecessary retries
58421ec1
SR
164 let running_uid = nix::unistd::Uid::current();
165 cid = cid.wrapping_add(running_uid.as_raw() as i32);
166
167 // some low CIDs have special meaning, start at 10 to avoid them
168 cid = cid.max(10);
169
170 let vms = start_vm(cid, details).await?;
171 new_cid = vms.cid;
172 state.map.insert(name, vms.clone());
173 vms
174 }
175 };
176
177 state.write()?;
72220d79 178 Ok((
58421ec1 179 new_cid,
72220d79 180 VsockClient::new(new_cid, DEFAULT_VSOCK_PORT, Some(vms.ticket)),
58421ec1
SR
181 ))
182}
183
72220d79
DC
/// Check whether `path` points below the `zpool` top-level directory.
///
/// An optional leading root (`/`) is ignored. The first regular component must be exactly
/// `zpool` and at least one further component has to follow, so `/zpool` alone does not count.
fn path_is_zfs(path: &[u8]) -> bool {
    use std::path::Component;

    let mut parts = Path::new(OsStr::from_bytes(path)).components().peekable();

    // skip the root directory marker of absolute paths
    if let Some(Component::RootDir) = parts.peek() {
        parts.next();
    }

    match parts.next() {
        Some(Component::Normal(first)) if first == OsStr::new("zpool") => parts.next().is_some(),
        _ => false,
    }
}
201
69e3beb9
TL
202async fn handle_extra_guest_memory_needs(cid: i32, path: &[u8]) {
203 use std::env::var;
204 match var("PBS_FILE_RESTORE_MEM_HOTPLUG_ALLOW").ok().as_deref() {
205 Some("true") => (),
206 _ => return, // this is opt-in
207 }
2f0f3e99
TL
208 let size = match var("PBS_FILE_RESTORE_MEM_HOTPLUG_SIZE_MB").map(|v| v.parse::<usize>()) {
209 Ok(Ok(size)) if size > MAX_MEMORY_DIMM_SIZE => {
210 log::warn!("limit memory request of {size} to {MAX_MEMORY_DIMM_SIZE}");
211 MAX_MEMORY_DIMM_SIZE
212 }
213 Ok(Ok(size)) => size,
214 _ => 256, // in practice this means a total of ~ 512 MB depending on disk count
215 };
69e3beb9
TL
216
217 if path_is_zfs(path) {
2f0f3e99 218 if let Err(err) = qemu_helper::hotplug_memory(cid, size).await {
69e3beb9
TL
219 log::error!("could not increase memory: {err}");
220 }
221 }
222}
223
58421ec1
SR
224async fn start_vm(cid_request: i32, details: &SnapRestoreDetails) -> Result<VMState, Error> {
225 let ticket = new_ticket();
226 let files = details
227 .manifest
228 .files()
229 .iter()
230 .map(|file| file.filename.clone())
231 .filter(|name| name.ends_with(".img.fidx"));
232 let (pid, cid) =
233 super::qemu_helper::start_vm((cid_request.abs() & 0xFFFF) as u16, details, files, &ticket)
234 .await?;
235 Ok(VMState { pid, cid, ticket })
236}
237
238impl BlockRestoreDriver for QemuBlockDriver {
801ec1db
SR
239 fn data_list(
240 &self,
241 details: SnapRestoreDetails,
242 img_file: String,
243 mut path: Vec<u8>,
244 ) -> Async<Result<Vec<ArchiveEntry>, Error>> {
245 async move {
72220d79 246 let (cid, client) = ensure_running(&details).await?;
801ec1db
SR
247 if !path.is_empty() && path[0] != b'/' {
248 path.insert(0, b'/');
249 }
69e3beb9 250 handle_extra_guest_memory_needs(cid, &path).await;
801ec1db
SR
251 let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>());
252 let mut result = client
253 .get("api2/json/list", Some(json!({ "path": path })))
254 .await?;
255 serde_json::from_value(result["data"].take()).map_err(|err| err.into())
256 }
257 .boxed()
258 }
259
b13089cd
SR
260 fn data_extract(
261 &self,
262 details: SnapRestoreDetails,
263 img_file: String,
264 mut path: Vec<u8>,
cc900ae2
DC
265 format: Option<FileRestoreFormat>,
266 zstd: bool,
b13089cd
SR
267 ) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>> {
268 async move {
72220d79 269 let (cid, client) = ensure_running(&details).await?;
b13089cd
SR
270 if !path.is_empty() && path[0] != b'/' {
271 path.insert(0, b'/');
272 }
69e3beb9 273 handle_extra_guest_memory_needs(cid, &path).await;
b13089cd
SR
274 let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>());
275 let (mut tx, rx) = tokio::io::duplex(1024 * 4096);
cc900ae2
DC
276 let mut data = json!({ "path": path, "zstd": zstd });
277 if let Some(format) = format {
278 data["format"] = serde_json::to_value(format)?;
279 }
b13089cd
SR
280 tokio::spawn(async move {
281 if let Err(err) = client
cc900ae2 282 .download("api2/json/extract", Some(data), &mut tx)
b13089cd
SR
283 .await
284 {
095b3c1c 285 log::error!("reading file extraction stream failed - {}", err);
ddbd63ed 286 std::process::exit(1);
b13089cd
SR
287 }
288 });
289
290 Ok(Box::new(rx) as Box<dyn tokio::io::AsyncRead + Unpin + Send>)
291 }
292 .boxed()
293 }
294
58421ec1
SR
295 fn status(&self) -> Async<Result<Vec<DriverStatus>, Error>> {
296 async move {
297 let mut state_map = VMStateMap::load()?;
298 let modified = cleanup_map(&mut state_map.map).await;
299 let map = if modified {
300 let m = state_map.map.clone();
301 state_map.write()?;
302 m
303 } else {
304 state_map.read_only()
305 };
306 let mut result = Vec::new();
307
308 for (n, s) in map.iter() {
309 let client = VsockClient::new(s.cid, DEFAULT_VSOCK_PORT, Some(s.ticket.clone()));
310 let resp = client
311 .get("api2/json/status", Some(json!({"keep-timeout": true})))
312 .await;
05d22be1 313 let name = proxmox_systemd::unescape_unit(n)
58421ec1
SR
314 .unwrap_or_else(|_| "<invalid name>".to_owned());
315 let mut extra = json!({"pid": s.pid, "cid": s.cid});
316
317 match resp {
318 Ok(status) => match status["data"].as_object() {
319 Some(map) => {
320 for (k, v) in map.iter() {
321 extra[k] = v.clone();
322 }
323 }
324 None => {
2aaf3ef1 325 let err = format!("invalid JSON received from /status call: {status}");
58421ec1
SR
326 extra["error"] = json!(err);
327 }
328 },
329 Err(err) => {
2aaf3ef1 330 let err = format!("error during /status API call: {err}");
58421ec1
SR
331 extra["error"] = json!(err);
332 }
333 }
334
335 result.push(DriverStatus {
336 id: name,
337 data: extra,
338 });
339 }
340
341 Ok(result)
342 }
343 .boxed()
344 }
345
346 fn stop(&self, id: String) -> Async<Result<(), Error>> {
347 async move {
05d22be1 348 let name = proxmox_systemd::escape_unit(&id, false);
58421ec1
SR
349 let mut map = VMStateMap::load()?;
350 let map_mod = cleanup_map(&mut map.map).await;
351 match map.map.get(&name) {
352 Some(state) => {
353 let client =
354 VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone()));
355 // ignore errors, this either fails because:
356 // * the VM is unreachable/dead, in which case we don't want it in the map
357 // * the call was successful and the connection reset when the VM stopped
358 let _ = client.get("api2/json/stop", None).await;
359 map.map.remove(&name);
360 map.write()?;
361 }
362 None => {
363 if map_mod {
364 map.write()?;
365 }
2aaf3ef1 366 bail!("VM with name '{name}' not found");
58421ec1
SR
367 }
368 }
369 Ok(())
370 }
371 .boxed()
372 }
373
374 fn list(&self) -> Vec<String> {
375 match VMStateMap::load_read_only() {
376 Ok(state) => state
377 .iter()
05d22be1 378 .filter_map(|(name, _)| proxmox_systemd::unescape_unit(name).ok())
58421ec1
SR
379 .collect(),
380 Err(_) => Vec::new(),
381 }
382 }
383}