]>
Commit | Line | Data |
---|---|---|
58421ec1 | 1 | //! Block file access via a small QEMU restore VM using the PBS block driver in QEMU |
2b7f8dd5 WB |
2 | use std::collections::HashMap; |
3 | use std::fs::{File, OpenOptions}; | |
4 | use std::io::{prelude::*, SeekFrom}; | |
5 | ||
58421ec1 SR |
6 | use anyhow::{bail, Error}; |
7 | use futures::FutureExt; | |
8 | use serde::{Deserialize, Serialize}; | |
9 | use serde_json::json; | |
10 | ||
58421ec1 | 11 | use proxmox::tools::fs::lock_file; |
2b7f8dd5 WB |
12 | |
13 | use pbs_client::{DEFAULT_VSOCK_PORT, BackupRepository, VsockClient}; | |
14 | ||
801ec1db | 15 | use proxmox_backup::api2::types::ArchiveEntry; |
58421ec1 | 16 | use proxmox_backup::backup::BackupDir; |
58421ec1 SR |
17 | use proxmox_backup::tools; |
18 | ||
19 | use super::block_driver::*; | |
2b7f8dd5 | 20 | use crate::get_user_run_dir; |
58421ec1 SR |
21 | |
/// File name, inside the per-user run directory, of the JSON map that tracks restore VMs.
const RESTORE_VM_MAP: &str = "restore-vm-map.json";

/// Block restore driver that provides file access through a small QEMU restore VM.
pub struct QemuBlockDriver {}

/// Persisted state of a single restore VM, as stored in the VM map file.
#[derive(Clone, Hash, Serialize, Deserialize)]
struct VMState {
    // PID of the VM process; handed to `try_kill_vm` when the VM is considered dead
    pid: i32,
    // vsock context ID used to reach the VM's API
    cid: i32,
    // ticket passed to `VsockClient` for calls to this VM
    ticket: String,
}

/// In-memory copy of the VM map together with the file it was read from.
struct VMStateMap {
    // VM states keyed by the escaped name built in `make_name`
    map: HashMap<String, VMState>,
    // the map file; when obtained via `load()` it holds the lock, released on drop
    file: File,
}
37 | ||
38 | impl VMStateMap { | |
39 | fn open_file_raw(write: bool) -> Result<File, Error> { | |
40 | use std::os::unix::fs::OpenOptionsExt; | |
41 | let mut path = get_user_run_dir()?; | |
42 | path.push(RESTORE_VM_MAP); | |
43 | OpenOptions::new() | |
44 | .read(true) | |
45 | .write(write) | |
46 | .create(write) | |
47 | .mode(0o600) | |
48 | .open(path) | |
49 | .map_err(Error::from) | |
50 | } | |
51 | ||
52 | /// Acquire a lock on the state map and retrieve a deserialized version | |
53 | fn load() -> Result<Self, Error> { | |
54 | let mut file = Self::open_file_raw(true)?; | |
66501529 | 55 | lock_file(&mut file, true, Some(std::time::Duration::from_secs(120)))?; |
58421ec1 SR |
56 | let map = serde_json::from_reader(&file).unwrap_or_default(); |
57 | Ok(Self { map, file }) | |
58 | } | |
59 | ||
60 | /// Load a read-only copy of the current VM map. Only use for informational purposes, like | |
61 | /// shell auto-completion, for anything requiring consistency use load() ! | |
62 | fn load_read_only() -> Result<HashMap<String, VMState>, Error> { | |
63 | let file = Self::open_file_raw(false)?; | |
64 | Ok(serde_json::from_reader(&file).unwrap_or_default()) | |
65 | } | |
66 | ||
67 | /// Write back a potentially modified state map, consuming the held lock | |
68 | fn write(mut self) -> Result<(), Error> { | |
69 | self.file.seek(SeekFrom::Start(0))?; | |
70 | self.file.set_len(0)?; | |
71 | serde_json::to_writer(self.file, &self.map)?; | |
72 | ||
73 | // drop ourselves including file lock | |
74 | Ok(()) | |
75 | } | |
76 | ||
77 | /// Return the map, but drop the lock immediately | |
78 | fn read_only(self) -> HashMap<String, VMState> { | |
79 | self.map | |
80 | } | |
81 | } | |
82 | ||
83 | fn make_name(repo: &BackupRepository, snap: &BackupDir) -> String { | |
84 | let full = format!("qemu_{}/{}", repo, snap); | |
85 | tools::systemd::escape_unit(&full, false) | |
86 | } | |
87 | ||
88 | /// remove non-responsive VMs from given map, returns 'true' if map was modified | |
89 | async fn cleanup_map(map: &mut HashMap<String, VMState>) -> bool { | |
90 | let mut to_remove = Vec::new(); | |
91 | for (name, state) in map.iter() { | |
92 | let client = VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone())); | |
93 | let res = client | |
94 | .get("api2/json/status", Some(json!({"keep-timeout": true}))) | |
95 | .await; | |
96 | if res.is_err() { | |
97 | // VM is not reachable, remove from map and inform user | |
98 | to_remove.push(name.clone()); | |
1011fb55 | 99 | eprintln!( |
58421ec1 SR |
100 | "VM '{}' (pid: {}, cid: {}) was not reachable, removing from map", |
101 | name, state.pid, state.cid | |
102 | ); | |
7d39e471 | 103 | let _ = super::qemu_helper::try_kill_vm(state.pid); |
58421ec1 SR |
104 | } |
105 | } | |
106 | ||
107 | for tr in &to_remove { | |
108 | map.remove(tr); | |
109 | } | |
110 | ||
111 | !to_remove.is_empty() | |
112 | } | |
113 | ||
114 | fn new_ticket() -> String { | |
115 | proxmox::tools::Uuid::generate().to_string() | |
116 | } | |
117 | ||
118 | async fn ensure_running(details: &SnapRestoreDetails) -> Result<VsockClient, Error> { | |
119 | let name = make_name(&details.repo, &details.snapshot); | |
120 | let mut state = VMStateMap::load()?; | |
121 | ||
122 | cleanup_map(&mut state.map).await; | |
123 | ||
124 | let new_cid; | |
125 | let vms = match state.map.get(&name) { | |
126 | Some(vm) => { | |
127 | let client = VsockClient::new(vm.cid, DEFAULT_VSOCK_PORT, Some(vm.ticket.clone())); | |
128 | let res = client.get("api2/json/status", None).await; | |
129 | match res { | |
130 | Ok(_) => { | |
131 | // VM is running and we just reset its timeout, nothing to do | |
132 | return Ok(client); | |
133 | } | |
134 | Err(err) => { | |
1011fb55 | 135 | eprintln!("stale VM detected, restarting ({})", err); |
58421ec1 | 136 | // VM is dead, restart |
7d39e471 | 137 | let _ = super::qemu_helper::try_kill_vm(vm.pid); |
58421ec1 SR |
138 | let vms = start_vm(vm.cid, details).await?; |
139 | new_cid = vms.cid; | |
140 | state.map.insert(name, vms.clone()); | |
141 | vms | |
142 | } | |
143 | } | |
144 | } | |
145 | None => { | |
146 | let mut cid = state | |
147 | .map | |
148 | .iter() | |
149 | .map(|v| v.1.cid) | |
150 | .max() | |
151 | .unwrap_or(0) | |
152 | .wrapping_add(1); | |
153 | ||
154 | // offset cid by user id, to avoid unneccessary retries | |
155 | let running_uid = nix::unistd::Uid::current(); | |
156 | cid = cid.wrapping_add(running_uid.as_raw() as i32); | |
157 | ||
158 | // some low CIDs have special meaning, start at 10 to avoid them | |
159 | cid = cid.max(10); | |
160 | ||
161 | let vms = start_vm(cid, details).await?; | |
162 | new_cid = vms.cid; | |
163 | state.map.insert(name, vms.clone()); | |
164 | vms | |
165 | } | |
166 | }; | |
167 | ||
168 | state.write()?; | |
169 | Ok(VsockClient::new( | |
170 | new_cid, | |
171 | DEFAULT_VSOCK_PORT, | |
172 | Some(vms.ticket.clone()), | |
173 | )) | |
174 | } | |
175 | ||
176 | async fn start_vm(cid_request: i32, details: &SnapRestoreDetails) -> Result<VMState, Error> { | |
177 | let ticket = new_ticket(); | |
178 | let files = details | |
179 | .manifest | |
180 | .files() | |
181 | .iter() | |
182 | .map(|file| file.filename.clone()) | |
183 | .filter(|name| name.ends_with(".img.fidx")); | |
184 | let (pid, cid) = | |
185 | super::qemu_helper::start_vm((cid_request.abs() & 0xFFFF) as u16, details, files, &ticket) | |
186 | .await?; | |
187 | Ok(VMState { pid, cid, ticket }) | |
188 | } | |
189 | ||
190 | impl BlockRestoreDriver for QemuBlockDriver { | |
801ec1db SR |
191 | fn data_list( |
192 | &self, | |
193 | details: SnapRestoreDetails, | |
194 | img_file: String, | |
195 | mut path: Vec<u8>, | |
196 | ) -> Async<Result<Vec<ArchiveEntry>, Error>> { | |
197 | async move { | |
198 | let client = ensure_running(&details).await?; | |
199 | if !path.is_empty() && path[0] != b'/' { | |
200 | path.insert(0, b'/'); | |
201 | } | |
202 | let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>()); | |
203 | let mut result = client | |
204 | .get("api2/json/list", Some(json!({ "path": path }))) | |
205 | .await?; | |
206 | serde_json::from_value(result["data"].take()).map_err(|err| err.into()) | |
207 | } | |
208 | .boxed() | |
209 | } | |
210 | ||
b13089cd SR |
211 | fn data_extract( |
212 | &self, | |
213 | details: SnapRestoreDetails, | |
214 | img_file: String, | |
215 | mut path: Vec<u8>, | |
216 | pxar: bool, | |
217 | ) -> Async<Result<Box<dyn tokio::io::AsyncRead + Unpin + Send>, Error>> { | |
218 | async move { | |
219 | let client = ensure_running(&details).await?; | |
220 | if !path.is_empty() && path[0] != b'/' { | |
221 | path.insert(0, b'/'); | |
222 | } | |
223 | let path = base64::encode(img_file.bytes().chain(path).collect::<Vec<u8>>()); | |
224 | let (mut tx, rx) = tokio::io::duplex(1024 * 4096); | |
225 | tokio::spawn(async move { | |
226 | if let Err(err) = client | |
227 | .download( | |
228 | "api2/json/extract", | |
229 | Some(json!({ "path": path, "pxar": pxar })), | |
230 | &mut tx, | |
231 | ) | |
232 | .await | |
233 | { | |
234 | eprintln!("reading file extraction stream failed - {}", err); | |
ddbd63ed | 235 | std::process::exit(1); |
b13089cd SR |
236 | } |
237 | }); | |
238 | ||
239 | Ok(Box::new(rx) as Box<dyn tokio::io::AsyncRead + Unpin + Send>) | |
240 | } | |
241 | .boxed() | |
242 | } | |
243 | ||
58421ec1 SR |
244 | fn status(&self) -> Async<Result<Vec<DriverStatus>, Error>> { |
245 | async move { | |
246 | let mut state_map = VMStateMap::load()?; | |
247 | let modified = cleanup_map(&mut state_map.map).await; | |
248 | let map = if modified { | |
249 | let m = state_map.map.clone(); | |
250 | state_map.write()?; | |
251 | m | |
252 | } else { | |
253 | state_map.read_only() | |
254 | }; | |
255 | let mut result = Vec::new(); | |
256 | ||
257 | for (n, s) in map.iter() { | |
258 | let client = VsockClient::new(s.cid, DEFAULT_VSOCK_PORT, Some(s.ticket.clone())); | |
259 | let resp = client | |
260 | .get("api2/json/status", Some(json!({"keep-timeout": true}))) | |
261 | .await; | |
262 | let name = tools::systemd::unescape_unit(n) | |
263 | .unwrap_or_else(|_| "<invalid name>".to_owned()); | |
264 | let mut extra = json!({"pid": s.pid, "cid": s.cid}); | |
265 | ||
266 | match resp { | |
267 | Ok(status) => match status["data"].as_object() { | |
268 | Some(map) => { | |
269 | for (k, v) in map.iter() { | |
270 | extra[k] = v.clone(); | |
271 | } | |
272 | } | |
273 | None => { | |
274 | let err = format!( | |
275 | "invalid JSON received from /status call: {}", | |
276 | status.to_string() | |
277 | ); | |
278 | extra["error"] = json!(err); | |
279 | } | |
280 | }, | |
281 | Err(err) => { | |
282 | let err = format!("error during /status API call: {}", err); | |
283 | extra["error"] = json!(err); | |
284 | } | |
285 | } | |
286 | ||
287 | result.push(DriverStatus { | |
288 | id: name, | |
289 | data: extra, | |
290 | }); | |
291 | } | |
292 | ||
293 | Ok(result) | |
294 | } | |
295 | .boxed() | |
296 | } | |
297 | ||
298 | fn stop(&self, id: String) -> Async<Result<(), Error>> { | |
299 | async move { | |
300 | let name = tools::systemd::escape_unit(&id, false); | |
301 | let mut map = VMStateMap::load()?; | |
302 | let map_mod = cleanup_map(&mut map.map).await; | |
303 | match map.map.get(&name) { | |
304 | Some(state) => { | |
305 | let client = | |
306 | VsockClient::new(state.cid, DEFAULT_VSOCK_PORT, Some(state.ticket.clone())); | |
307 | // ignore errors, this either fails because: | |
308 | // * the VM is unreachable/dead, in which case we don't want it in the map | |
309 | // * the call was successful and the connection reset when the VM stopped | |
310 | let _ = client.get("api2/json/stop", None).await; | |
311 | map.map.remove(&name); | |
312 | map.write()?; | |
313 | } | |
314 | None => { | |
315 | if map_mod { | |
316 | map.write()?; | |
317 | } | |
318 | bail!("VM with name '{}' not found", name); | |
319 | } | |
320 | } | |
321 | Ok(()) | |
322 | } | |
323 | .boxed() | |
324 | } | |
325 | ||
326 | fn list(&self) -> Vec<String> { | |
327 | match VMStateMap::load_read_only() { | |
328 | Ok(state) => state | |
329 | .iter() | |
330 | .filter_map(|(name, _)| tools::systemd::unescape_unit(&name).ok()) | |
331 | .collect(), | |
332 | Err(_) => Vec::new(), | |
333 | } | |
334 | } | |
335 | } |