1 //! Block file access via a small QEMU restore VM using the PBS block driver in QEMU
2 use std
::collections
::HashMap
;
3 use std
::fs
::{File, OpenOptions}
;
4 use std
::io
::{prelude::*, SeekFrom}
;
6 use anyhow
::{bail, Error}
;
7 use futures
::FutureExt
;
8 use serde
::{Deserialize, Serialize}
;
11 use proxmox_sys
::fs
::lock_file
;
13 use pbs_client
::{DEFAULT_VSOCK_PORT, BackupRepository, VsockClient}
;
14 use pbs_datastore
::backup_info
::BackupDir
;
15 use pbs_datastore
::catalog
::ArchiveEntry
;
17 use super::block_driver
::*;
18 use crate::get_user_run_dir
;
20 const RESTORE_VM_MAP
: &str = "restore-vm-map.json";
22 pub struct QemuBlockDriver {}
24 #[derive(Clone, Hash, Serialize, Deserialize)]
32 map
: HashMap
<String
, VMState
>,
37 fn open_file_raw(write
: bool
) -> Result
<File
, Error
> {
38 use std
::os
::unix
::fs
::OpenOptionsExt
;
39 let mut path
= get_user_run_dir()?
;
40 path
.push(RESTORE_VM_MAP
);
50 /// Acquire a lock on the state map and retrieve a deserialized version
51 fn load() -> Result
<Self, Error
> {
52 let mut file
= Self::open_file_raw(true)?
;
53 lock_file(&mut file
, true, Some(std
::time
::Duration
::from_secs(120)))?
;
54 let map
= serde_json
::from_reader(&file
).unwrap_or_default();
55 Ok(Self { map, file }
)
58 /// Load a read-only copy of the current VM map. Only use for informational purposes, like
59 /// shell auto-completion; for anything requiring consistency, use load()!
60 fn load_read_only() -> Result
<HashMap
<String
, VMState
>, Error
> {
61 let file
= Self::open_file_raw(false)?
;
62 Ok(serde_json
::from_reader(&file
).unwrap_or_default())
65 /// Write back a potentially modified state map, consuming the held lock
66 fn write(mut self) -> Result
<(), Error
> {
67 self.file
.seek(SeekFrom
::Start(0))?
;
68 self.file
.set_len(0)?
;
69 serde_json
::to_writer(self.file
, &self.map
)?
;
71 // drop ourselves including file lock
75 /// Return the map, but drop the lock immediately
76 fn read_only(self) -> HashMap
<String
, VMState
> {
81 fn make_name(repo
: &BackupRepository
, snap
: &BackupDir
) -> String
{
82 let full
= format
!("qemu_{}/{}", repo
, snap
);
83 proxmox_sys
::systemd
::escape_unit(&full
, false)
86 /// remove non-responsive VMs from given map, returns 'true' if map was modified
87 async
fn cleanup_map(map
: &mut HashMap
<String
, VMState
>) -> bool
{
88 let mut to_remove
= Vec
::new();
89 for (name
, state
) in map
.iter() {
90 let client
= VsockClient
::new(state
.cid
, DEFAULT_VSOCK_PORT
, Some(state
.ticket
.clone()));
92 .get("api2/json/status", Some(json
!({"keep-timeout": true}
)))
95 // VM is not reachable, remove from map and inform user
96 to_remove
.push(name
.clone());
98 "VM '{}' (pid: {}, cid: {}) was not reachable, removing from map",
99 name
, state
.pid
, state
.cid
101 let _
= super::qemu_helper
::try_kill_vm(state
.pid
);
105 for tr
in &to_remove
{
109 !to_remove
.is_empty()
112 fn new_ticket() -> String
{
113 proxmox_uuid
::Uuid
::generate().to_string()
116 async
fn ensure_running(details
: &SnapRestoreDetails
) -> Result
<VsockClient
, Error
> {
117 let name
= make_name(&details
.repo
, &details
.snapshot
);
118 let mut state
= VMStateMap
::load()?
;
120 cleanup_map(&mut state
.map
).await
;
123 let vms
= match state
.map
.get(&name
) {
125 let client
= VsockClient
::new(vm
.cid
, DEFAULT_VSOCK_PORT
, Some(vm
.ticket
.clone()));
126 let res
= client
.get("api2/json/status", None
).await
;
129 // VM is running and we just reset its timeout, nothing to do
133 eprintln
!("stale VM detected, restarting ({})", err
);
134 // VM is dead, restart
135 let _
= super::qemu_helper
::try_kill_vm(vm
.pid
);
136 let vms
= start_vm(vm
.cid
, details
).await?
;
138 state
.map
.insert(name
, vms
.clone());
152 // offset cid by user id, to avoid unnecessary retries
153 let running_uid
= nix
::unistd
::Uid
::current();
154 cid
= cid
.wrapping_add(running_uid
.as_raw() as i32);
156 // some low CIDs have special meaning, start at 10 to avoid them
159 let vms
= start_vm(cid
, details
).await?
;
161 state
.map
.insert(name
, vms
.clone());
170 Some(vms
.ticket
.clone()),
174 async
fn start_vm(cid_request
: i32, details
: &SnapRestoreDetails
) -> Result
<VMState
, Error
> {
175 let ticket
= new_ticket();
180 .map(|file
| file
.filename
.clone())
181 .filter(|name
| name
.ends_with(".img.fidx"));
183 super::qemu_helper
::start_vm((cid_request
.abs() & 0xFFFF) as u16, details
, files
, &ticket
)
185 Ok(VMState { pid, cid, ticket }
)
188 impl BlockRestoreDriver
for QemuBlockDriver
{
191 details
: SnapRestoreDetails
,
194 ) -> Async
<Result
<Vec
<ArchiveEntry
>, Error
>> {
196 let client
= ensure_running(&details
).await?
;
197 if !path
.is_empty() && path
[0] != b'
/'
{
198 path
.insert(0, b'
/'
);
200 let path
= base64
::encode(img_file
.bytes().chain(path
).collect
::<Vec
<u8>>());
201 let mut result
= client
202 .get("api2/json/list", Some(json
!({ "path": path }
)))
204 serde_json
::from_value(result
["data"].take()).map_err(|err
| err
.into())
211 details
: SnapRestoreDetails
,
215 ) -> Async
<Result
<Box
<dyn tokio
::io
::AsyncRead
+ Unpin
+ Send
>, Error
>> {
217 let client
= ensure_running(&details
).await?
;
218 if !path
.is_empty() && path
[0] != b'
/'
{
219 path
.insert(0, b'
/'
);
221 let path
= base64
::encode(img_file
.bytes().chain(path
).collect
::<Vec
<u8>>());
222 let (mut tx
, rx
) = tokio
::io
::duplex(1024 * 4096);
223 tokio
::spawn(async
move {
224 if let Err(err
) = client
227 Some(json
!({ "path": path, "pxar": pxar }
)),
232 eprintln
!("reading file extraction stream failed - {}", err
);
233 std
::process
::exit(1);
237 Ok(Box
::new(rx
) as Box
<dyn tokio
::io
::AsyncRead
+ Unpin
+ Send
>)
242 fn status(&self) -> Async
<Result
<Vec
<DriverStatus
>, Error
>> {
244 let mut state_map
= VMStateMap
::load()?
;
245 let modified
= cleanup_map(&mut state_map
.map
).await
;
246 let map
= if modified
{
247 let m
= state_map
.map
.clone();
251 state_map
.read_only()
253 let mut result
= Vec
::new();
255 for (n
, s
) in map
.iter() {
256 let client
= VsockClient
::new(s
.cid
, DEFAULT_VSOCK_PORT
, Some(s
.ticket
.clone()));
258 .get("api2/json/status", Some(json
!({"keep-timeout": true}
)))
260 let name
= proxmox_sys
::systemd
::unescape_unit(n
)
261 .unwrap_or_else(|_
| "<invalid name>".to_owned());
262 let mut extra
= json
!({"pid": s.pid, "cid": s.cid}
);
265 Ok(status
) => match status
["data"].as_object() {
267 for (k
, v
) in map
.iter() {
268 extra
[k
] = v
.clone();
273 "invalid JSON received from /status call: {}",
276 extra
["error"] = json
!(err
);
280 let err
= format
!("error during /status API call: {}", err
);
281 extra
["error"] = json
!(err
);
285 result
.push(DriverStatus
{
296 fn stop(&self, id
: String
) -> Async
<Result
<(), Error
>> {
298 let name
= proxmox_sys
::systemd
::escape_unit(&id
, false);
299 let mut map
= VMStateMap
::load()?
;
300 let map_mod
= cleanup_map(&mut map
.map
).await
;
301 match map
.map
.get(&name
) {
304 VsockClient
::new(state
.cid
, DEFAULT_VSOCK_PORT
, Some(state
.ticket
.clone()));
305 // ignore errors, this either fails because:
306 // * the VM is unreachable/dead, in which case we don't want it in the map
307 // * the call was successful and the connection reset when the VM stopped
308 let _
= client
.get("api2/json/stop", None
).await
;
309 map
.map
.remove(&name
);
316 bail
!("VM with name '{}' not found", name
);
324 fn list(&self) -> Vec
<String
> {
325 match VMStateMap
::load_read_only() {
328 .filter_map(|(name
, _
)| proxmox_sys
::systemd
::unescape_unit(&name
).ok())
330 Err(_
) => Vec
::new(),