]> git.proxmox.com Git - proxmox-backup.git/blame - src/bin/proxmox_file_restore/block_driver_qemu.rs
move client to pbs-client subcrate
[proxmox-backup.git] / src / bin / proxmox_file_restore / block_driver_qemu.rs
CommitLineData
//! Block file access via a small QEMU restore VM using the PBS block driver in QEMU
2b7f8dd5
WB
2use std::collections::HashMap;
3use std::fs::{File, OpenOptions};
4use std::io::{prelude::*, SeekFrom};
5
58421ec1
SR
6use anyhow::{bail, Error};
7use futures::FutureExt;
8use serde::{Deserialize, Serialize};
9use serde_json::json;
10
58421ec1 11use proxmox::tools::fs::lock_file;
2b7f8dd5
WB
12
13use pbs_client::{DEFAULT_VSOCK_PORT, BackupRepository, VsockClient};
14
801ec1db 15use proxmox_backup::api2::types::ArchiveEntry;
58421ec1 16use proxmox_backup::backup::BackupDir;
58421ec1
SR
17use proxmox_backup::tools;
18
19use super::block_driver::*;
2b7f8dd5 20use crate::get_user_run_dir;
58421ec1
SR
21
/// File name of the JSON state file that maps restore VM names to their `VMState`.
///
/// The file lives inside the directory returned by `get_user_run_dir()`.
const RESTORE_VM_MAP: &str = "restore-vm-map.json";
23
/// Block restore driver that serves file access through a QEMU restore VM.
///
/// The type carries no data; all VM bookkeeping is kept in the on-disk state map.
pub struct QemuBlockDriver {}
25
26#[derive(Clone, Hash, Serialize, Deserialize)]
27struct VMState {
28 pid: i32,
29 cid: i32,
30 ticket: String,
31}
32
33struct VMStateMap {
34 map: HashMap<String, VMState>,
35 file: File,
36}
37
38impl VMStateMap {
39 fn open_file_raw(write: bool) -> Result<File, Error> {
40 use std::os::unix::fs::OpenOptionsExt;
41 let mut path = get_user_run_dir()?;
42 path.push(RESTORE_VM_MAP);
43 OpenOptions::new()
44 .read(true)
45 .write(write)
46 .create(write)
47 .mode(0o600)
48 .open(path)
49 .map_err(Error::from)
50 }
51
52 /// Acquire a lock on the state map and retrieve a deserialized version
53 fn load() -> Result<Self, Error> {
54 let mut file = Self::open_file_raw(true)?;
66501529 55 lock_file(&mut file, true, Some(std::time::Duration::from_secs(120)))?;
58421ec1
SR
56 let map = serde_json::from_reader(&file).unwrap_or_default();
57 Ok(Self { map, file })
58 }
59
60 /// Load a read-only copy of the current VM map. Only use for informational purposes, like
61 /// shell auto-completion, for anything requiring consistency use load() !
62 fn load_read_only() -> Result<HashMap<String, VMState>, Error> {
63 let file = Self::open_file_raw(false)?;
64 Ok(serde_json::from_reader(&file).unwrap_or_default())
65 }
66
67 /// Write back a potentially modified state map, consuming the held lock
68 fn write(mut self) -> Result<(), Error> {
69 self.file.seek(SeekFrom::Start(0))?;
70 self.file.set_len(0)?;
71 serde_json::to_writer(self.file, &self.map)?;
72
73 // drop ourselves including file lock
74 Ok(())
75 }
76
77 /// Return the map, but drop the lock immediately
78 fn read_only(self) -> HashMap<String, VMState> {
79 self.map
80 }
81}
82
83fn make_name(repo: &BackupRepository, snap: &BackupDir) -> String {
84 let full = format!("qemu_{}/{}", repo, snap);
85 tools::systemd::escape_unit(&full, false)
86}
87
88/// remove non-responsive VMs from given map, returns 'true' if map was modified
89async fn cleanup_map(map: &mut HashMap<String, VMState>) -> bool {
90 let mut to_remove = Vec::new();
91 for (name, state) in map.iter() {
92 let client = VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone()));
93 let res = client
94 .get("api2/json/status", Some(json!({"keep-timeout": true})))
95 .await;
96 if res.is_err() {
97 // VM is not reachable, remove from map and inform user
98 to_remove.push(name.clone());
1011fb55 99 eprintln!(
58421ec1
SR
100 "VM '{}' (pid: {}, cid: {}) was not reachable, removing from map",
101 name, state.pid, state.cid
102 );
7d39e471 103 let _ = super::qemu_helper::try_kill_vm(state.pid);
58421ec1
SR
104 }
105 }
106
107 for tr in &to_remove {
108 map.remove(tr);
109 }
110
111 !to_remove.is_empty()
112}
113
114fn new_ticket() -> String {
115 proxmox::tools::Uuid::generate().to_string()
116}
117
118async fn ensure_running(details: &SnapRestoreDetails) -> Result<VsockClient, Error> {
119 let name = make_name(&details.repo, &details.snapshot);
120 let mut state = VMStateMap::load()?;
121
122 cleanup_map(&mut state.map).await;
123
124 let new_cid;
125 let vms = match state.map.get(&name) {
126 Some(vm) => {
127 let client = VsockClient::new(vm.cid, DEFAULT_VSOCK_PORT, Some(vm.ticket.clone()));
128 let res = client.get("api2/json/status", None).await;
129 match res {
130 Ok(_) => {
131 // VM is running and we just reset its timeout, nothing to do
132 return Ok(client);
133 }
134 Err(err) => {
1011fb55 135 eprintln!("stale VM detected, restarting ({})", err);
58421ec1 136 // VM is dead, restart
7d39e471 137 let _ = super::qemu_helper::try_kill_vm(vm.pid);
58421ec1
SR
138 let vms = start_vm(vm.cid, details).await?;
139 new_cid = vms.cid;
140 state.map.insert(name, vms.clone());
141 vms
142 }
143 }
144 }
145 None => {
146 let mut cid = state
147 .map
148 .iter()
149 .map(|v| v.1.cid)
150 .max()
151 .unwrap_or(0)
152 .wrapping_add(1);
153
154 // offset cid by user id, to avoid unneccessary retries
155 let running_uid = nix::unistd::Uid::current();
156 cid = cid.wrapping_add(running_uid.as_raw() as i32);
157
158 // some low CIDs have special meaning, start at 10 to avoid them
159 cid = cid.max(10);
160
161 let vms = start_vm(cid, details).await?;
162 new_cid = vms.cid;
163 state.map.insert(name, vms.clone());
164 vms
165 }
166 };
167
168 state.write()?;
169 Ok(VsockClient::new(
170 new_cid,
171 DEFAULT_VSOCK_PORT,
172 Some(vms.ticket.clone()),
173 ))
174}
175
176async fn start_vm(cid_request: i32, details: &SnapRestoreDetails) -> Result<VMState, Error> {
177 let ticket = new_ticket();
178 let files = details
179 .manifest
180 .files()
181 .iter()
182 .map(|file| file.filename.clone())
183 .filter(|name| name.ends_with(".img.fidx"));
184 let (pid, cid) =
185 super::qemu_helper::start_vm((cid_request.abs() & 0xFFFF) as u16, details, files, &ticket)
186 .await?;
187 Ok(VMState { pid, cid, ticket })
188}
189
190impl BlockRestoreDriver for QemuBlockDriver {
801ec1db
SR
191 fn data_list(
192 &self,
193 details: SnapRestoreDetails,
194 img_file: String,
195 mut path: Vec<u8>,
196 ) -> Async<Result<Vec<ArchiveEntry>, Error>> {
197 async move {
198 let client = ensure_running(&details).await?;
199 if !path.is_empty() && path[0] != b'/' {
200 path.insert(0, b'/');
201 }
202 let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>());
203 let mut result = client
204 .get("api2/json/list", Some(json!({ "path": path })))
205 .await?;
206 serde_json::from_value(result["data"].take()).map_err(|err| err.into())
207 }
208 .boxed()
209 }
210
b13089cd
SR
211 fn data_extract(
212 &self,
213 details: SnapRestoreDetails,
214 img_file: String,
215 mut path: Vec<u8>,
216 pxar: bool,
217 ) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>> {
218 async move {
219 let client = ensure_running(&details).await?;
220 if !path.is_empty() && path[0] != b'/' {
221 path.insert(0, b'/');
222 }
223 let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>());
224 let (mut tx, rx) = tokio::io::duplex(1024 * 4096);
225 tokio::spawn(async move {
226 if let Err(err) = client
227 .download(
228 "api2/json/extract",
229 Some(json!({ "path": path, "pxar": pxar })),
230 &mut tx,
231 )
232 .await
233 {
234 eprintln!("reading file extraction stream failed - {}", err);
ddbd63ed 235 std::process::exit(1);
b13089cd
SR
236 }
237 });
238
239 Ok(Box::new(rx) as Box<dyn tokio::io::AsyncRead + Unpin + Send>)
240 }
241 .boxed()
242 }
243
58421ec1
SR
244 fn status(&self) -> Async<Result<Vec<DriverStatus>, Error>> {
245 async move {
246 let mut state_map = VMStateMap::load()?;
247 let modified = cleanup_map(&mut state_map.map).await;
248 let map = if modified {
249 let m = state_map.map.clone();
250 state_map.write()?;
251 m
252 } else {
253 state_map.read_only()
254 };
255 let mut result = Vec::new();
256
257 for (n, s) in map.iter() {
258 let client = VsockClient::new(s.cid, DEFAULT_VSOCK_PORT, Some(s.ticket.clone()));
259 let resp = client
260 .get("api2/json/status", Some(json!({"keep-timeout": true})))
261 .await;
262 let name = tools::systemd::unescape_unit(n)
263 .unwrap_or_else(|_| "<invalid name>".to_owned());
264 let mut extra = json!({"pid": s.pid, "cid": s.cid});
265
266 match resp {
267 Ok(status) => match status["data"].as_object() {
268 Some(map) => {
269 for (k, v) in map.iter() {
270 extra[k] = v.clone();
271 }
272 }
273 None => {
274 let err = format!(
275 "invalid JSON received from /status call: {}",
276 status.to_string()
277 );
278 extra["error"] = json!(err);
279 }
280 },
281 Err(err) => {
282 let err = format!("error during /status API call: {}", err);
283 extra["error"] = json!(err);
284 }
285 }
286
287 result.push(DriverStatus {
288 id: name,
289 data: extra,
290 });
291 }
292
293 Ok(result)
294 }
295 .boxed()
296 }
297
298 fn stop(&self, id: String) -> Async<Result<(), Error>> {
299 async move {
300 let name = tools::systemd::escape_unit(&id, false);
301 let mut map = VMStateMap::load()?;
302 let map_mod = cleanup_map(&mut map.map).await;
303 match map.map.get(&name) {
304 Some(state) => {
305 let client =
306 VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone()));
307 // ignore errors, this either fails because:
308 // * the VM is unreachable/dead, in which case we don't want it in the map
309 // * the call was successful and the connection reset when the VM stopped
310 let _ = client.get("api2/json/stop", None).await;
311 map.map.remove(&name);
312 map.write()?;
313 }
314 None => {
315 if map_mod {
316 map.write()?;
317 }
318 bail!("VM with name '{}' not found", name);
319 }
320 }
321 Ok(())
322 }
323 .boxed()
324 }
325
326 fn list(&self) -> Vec<String> {
327 match VMStateMap::load_read_only() {
328 Ok(state) => state
329 .iter()
330 .filter_map(|(name, _)| tools::systemd::unescape_unit(&name).ok())
331 .collect(),
332 Err(_) => Vec::new(),
333 }
334 }
335}