]> git.proxmox.com Git - proxmox-backup.git/blob - proxmox-file-restore/src/block_driver_qemu.rs
b6eaf83afe9746766ec9096915272311da0ab2ab
[proxmox-backup.git] / proxmox-file-restore / src / block_driver_qemu.rs
1 //! Block file access via a small QEMU restore VM using the PBS block driver in QEMU
2 use std::collections::HashMap;
3 use std::fs::{File, OpenOptions};
4 use std::io::{prelude::*, SeekFrom};
5
6 use anyhow::{bail, Error};
7 use futures::FutureExt;
8 use serde::{Deserialize, Serialize};
9 use serde_json::json;
10
11 use proxmox::tools::fs::lock_file;
12
13 use pbs_client::{DEFAULT_VSOCK_PORT, BackupRepository, VsockClient};
14 use pbs_datastore::backup_info::BackupDir;
15 use pbs_datastore::catalog::ArchiveEntry;
16
17 use super::block_driver::*;
18 use crate::get_user_run_dir;
19
20 const RESTORE_VM_MAP: &str = "restore-vm-map.json";
21
/// Block restore driver backed by QEMU restore VMs.
///
/// The type itself carries no data; the VMs it manages are tracked in the state map file
/// (`RESTORE_VM_MAP`) inside the user's run directory.
pub struct QemuBlockDriver {}
23
24 #[derive(Clone, Hash, Serialize, Deserialize)]
25 struct VMState {
26 pid: i32,
27 cid: i32,
28 ticket: String,
29 }
30
31 struct VMStateMap {
32 map: HashMap<String, VMState>,
33 file: File,
34 }
35
36 impl VMStateMap {
37 fn open_file_raw(write: bool) -> Result<File, Error> {
38 use std::os::unix::fs::OpenOptionsExt;
39 let mut path = get_user_run_dir()?;
40 path.push(RESTORE_VM_MAP);
41 OpenOptions::new()
42 .read(true)
43 .write(write)
44 .create(write)
45 .mode(0o600)
46 .open(path)
47 .map_err(Error::from)
48 }
49
50 /// Acquire a lock on the state map and retrieve a deserialized version
51 fn load() -> Result<Self, Error> {
52 let mut file = Self::open_file_raw(true)?;
53 lock_file(&mut file, true, Some(std::time::Duration::from_secs(120)))?;
54 let map = serde_json::from_reader(&file).unwrap_or_default();
55 Ok(Self { map, file })
56 }
57
58 /// Load a read-only copy of the current VM map. Only use for informational purposes, like
59 /// shell auto-completion, for anything requiring consistency use load() !
60 fn load_read_only() -> Result<HashMap<String, VMState>, Error> {
61 let file = Self::open_file_raw(false)?;
62 Ok(serde_json::from_reader(&file).unwrap_or_default())
63 }
64
65 /// Write back a potentially modified state map, consuming the held lock
66 fn write(mut self) -> Result<(), Error> {
67 self.file.seek(SeekFrom::Start(0))?;
68 self.file.set_len(0)?;
69 serde_json::to_writer(self.file, &self.map)?;
70
71 // drop ourselves including file lock
72 Ok(())
73 }
74
75 /// Return the map, but drop the lock immediately
76 fn read_only(self) -> HashMap<String, VMState> {
77 self.map
78 }
79 }
80
81 fn make_name(repo: &BackupRepository, snap: &BackupDir) -> String {
82 let full = format!("qemu_{}/{}", repo, snap);
83 proxmox::tools::systemd::escape_unit(&full, false)
84 }
85
86 /// remove non-responsive VMs from given map, returns 'true' if map was modified
87 async fn cleanup_map(map: &mut HashMap<String, VMState>) -> bool {
88 let mut to_remove = Vec::new();
89 for (name, state) in map.iter() {
90 let client = VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone()));
91 let res = client
92 .get("api2/json/status", Some(json!({"keep-timeout": true})))
93 .await;
94 if res.is_err() {
95 // VM is not reachable, remove from map and inform user
96 to_remove.push(name.clone());
97 eprintln!(
98 "VM '{}' (pid: {}, cid: {}) was not reachable, removing from map",
99 name, state.pid, state.cid
100 );
101 let _ = super::qemu_helper::try_kill_vm(state.pid);
102 }
103 }
104
105 for tr in &to_remove {
106 map.remove(tr);
107 }
108
109 !to_remove.is_empty()
110 }
111
112 fn new_ticket() -> String {
113 proxmox::tools::Uuid::generate().to_string()
114 }
115
116 async fn ensure_running(details: &SnapRestoreDetails) -> Result<VsockClient, Error> {
117 let name = make_name(&details.repo, &details.snapshot);
118 let mut state = VMStateMap::load()?;
119
120 cleanup_map(&mut state.map).await;
121
122 let new_cid;
123 let vms = match state.map.get(&name) {
124 Some(vm) => {
125 let client = VsockClient::new(vm.cid, DEFAULT_VSOCK_PORT, Some(vm.ticket.clone()));
126 let res = client.get("api2/json/status", None).await;
127 match res {
128 Ok(_) => {
129 // VM is running and we just reset its timeout, nothing to do
130 return Ok(client);
131 }
132 Err(err) => {
133 eprintln!("stale VM detected, restarting ({})", err);
134 // VM is dead, restart
135 let _ = super::qemu_helper::try_kill_vm(vm.pid);
136 let vms = start_vm(vm.cid, details).await?;
137 new_cid = vms.cid;
138 state.map.insert(name, vms.clone());
139 vms
140 }
141 }
142 }
143 None => {
144 let mut cid = state
145 .map
146 .iter()
147 .map(|v| v.1.cid)
148 .max()
149 .unwrap_or(0)
150 .wrapping_add(1);
151
152 // offset cid by user id, to avoid unneccessary retries
153 let running_uid = nix::unistd::Uid::current();
154 cid = cid.wrapping_add(running_uid.as_raw() as i32);
155
156 // some low CIDs have special meaning, start at 10 to avoid them
157 cid = cid.max(10);
158
159 let vms = start_vm(cid, details).await?;
160 new_cid = vms.cid;
161 state.map.insert(name, vms.clone());
162 vms
163 }
164 };
165
166 state.write()?;
167 Ok(VsockClient::new(
168 new_cid,
169 DEFAULT_VSOCK_PORT,
170 Some(vms.ticket.clone()),
171 ))
172 }
173
174 async fn start_vm(cid_request: i32, details: &SnapRestoreDetails) -> Result<VMState, Error> {
175 let ticket = new_ticket();
176 let files = details
177 .manifest
178 .files()
179 .iter()
180 .map(|file| file.filename.clone())
181 .filter(|name| name.ends_with(".img.fidx"));
182 let (pid, cid) =
183 super::qemu_helper::start_vm((cid_request.abs() & 0xFFFF) as u16, details, files, &ticket)
184 .await?;
185 Ok(VMState { pid, cid, ticket })
186 }
187
188 impl BlockRestoreDriver for QemuBlockDriver {
189 fn data_list(
190 &self,
191 details: SnapRestoreDetails,
192 img_file: String,
193 mut path: Vec<u8>,
194 ) -> Async<Result<Vec<ArchiveEntry>, Error>> {
195 async move {
196 let client = ensure_running(&details).await?;
197 if !path.is_empty() && path[0] != b'/' {
198 path.insert(0, b'/');
199 }
200 let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>());
201 let mut result = client
202 .get("api2/json/list", Some(json!({ "path": path })))
203 .await?;
204 serde_json::from_value(result["data"].take()).map_err(|err| err.into())
205 }
206 .boxed()
207 }
208
209 fn data_extract(
210 &self,
211 details: SnapRestoreDetails,
212 img_file: String,
213 mut path: Vec<u8>,
214 pxar: bool,
215 ) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>> {
216 async move {
217 let client = ensure_running(&details).await?;
218 if !path.is_empty() && path[0] != b'/' {
219 path.insert(0, b'/');
220 }
221 let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>());
222 let (mut tx, rx) = tokio::io::duplex(1024 * 4096);
223 tokio::spawn(async move {
224 if let Err(err) = client
225 .download(
226 "api2/json/extract",
227 Some(json!({ "path": path, "pxar": pxar })),
228 &mut tx,
229 )
230 .await
231 {
232 eprintln!("reading file extraction stream failed - {}", err);
233 std::process::exit(1);
234 }
235 });
236
237 Ok(Box::new(rx) as Box<dyn tokio::io::AsyncRead + Unpin + Send>)
238 }
239 .boxed()
240 }
241
242 fn status(&self) -> Async<Result<Vec<DriverStatus>, Error>> {
243 async move {
244 let mut state_map = VMStateMap::load()?;
245 let modified = cleanup_map(&mut state_map.map).await;
246 let map = if modified {
247 let m = state_map.map.clone();
248 state_map.write()?;
249 m
250 } else {
251 state_map.read_only()
252 };
253 let mut result = Vec::new();
254
255 for (n, s) in map.iter() {
256 let client = VsockClient::new(s.cid, DEFAULT_VSOCK_PORT, Some(s.ticket.clone()));
257 let resp = client
258 .get("api2/json/status", Some(json!({"keep-timeout": true})))
259 .await;
260 let name = proxmox::tools::systemd::unescape_unit(n)
261 .unwrap_or_else(|_| "<invalid name>".to_owned());
262 let mut extra = json!({"pid": s.pid, "cid": s.cid});
263
264 match resp {
265 Ok(status) => match status["data"].as_object() {
266 Some(map) => {
267 for (k, v) in map.iter() {
268 extra[k] = v.clone();
269 }
270 }
271 None => {
272 let err = format!(
273 "invalid JSON received from /status call: {}",
274 status.to_string()
275 );
276 extra["error"] = json!(err);
277 }
278 },
279 Err(err) => {
280 let err = format!("error during /status API call: {}", err);
281 extra["error"] = json!(err);
282 }
283 }
284
285 result.push(DriverStatus {
286 id: name,
287 data: extra,
288 });
289 }
290
291 Ok(result)
292 }
293 .boxed()
294 }
295
296 fn stop(&self, id: String) -> Async<Result<(), Error>> {
297 async move {
298 let name = proxmox::tools::systemd::escape_unit(&id, false);
299 let mut map = VMStateMap::load()?;
300 let map_mod = cleanup_map(&mut map.map).await;
301 match map.map.get(&name) {
302 Some(state) => {
303 let client =
304 VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone()));
305 // ignore errors, this either fails because:
306 // * the VM is unreachable/dead, in which case we don't want it in the map
307 // * the call was successful and the connection reset when the VM stopped
308 let _ = client.get("api2/json/stop", None).await;
309 map.map.remove(&name);
310 map.write()?;
311 }
312 None => {
313 if map_mod {
314 map.write()?;
315 }
316 bail!("VM with name '{}' not found", name);
317 }
318 }
319 Ok(())
320 }
321 .boxed()
322 }
323
324 fn list(&self) -> Vec<String> {
325 match VMStateMap::load_read_only() {
326 Ok(state) => state
327 .iter()
328 .filter_map(|(name, _)| proxmox::tools::systemd::unescape_unit(&name).ok())
329 .collect(),
330 Err(_) => Vec::new(),
331 }
332 }
333 }