1 ///! File-restore API running inside the restore VM
4 use std
::os
::unix
::ffi
::OsStrExt
;
5 use std
::path
::{Path, PathBuf}
;
7 use anyhow
::{bail, Error}
;
8 use futures
::FutureExt
;
9 use hyper
::http
::request
::Parts
;
10 use hyper
::{header, Body, Response, StatusCode}
;
12 use serde_json
::Value
;
13 use tokio
::sync
::Semaphore
;
15 use pathpatterns
::{MatchEntry, MatchPattern, MatchType, Pattern}
;
16 use proxmox
::{identity, sortable}
;
18 list_subdirs_api_method
,
19 ApiHandler
, ApiMethod
, ApiResponseFuture
, Permission
, Router
, RpcEnvironment
, SubdirMap
,
21 use proxmox_schema
::*;
22 use proxmox_async
::zip
::zip_directory
;
24 use pbs_api_types
::file_restore
::RestoreDaemonStatus
;
25 use pbs_client
::pxar
::{create_archive, Flags, PxarCreateOptions, ENCODER_MAX_ENTRIES}
;
26 use pbs_datastore
::catalog
::{ArchiveEntry, DirEntryAttribute}
;
27 use pbs_tools
::fs
::read_subdir
;
28 use pbs_tools
::json
::required_string_param
;
30 use pxar
::encoder
::aio
::TokioWriter
;
32 use super::{disk::ResolveResult, watchdog_remaining, watchdog_inhibit, watchdog_ping}
;
34 // NOTE: All API endpoints must have Permission::Superuser, as the configs for authentication do
35 // not exist within the restore VM. Safety is guaranteed by checking a ticket via a custom ApiAuth.
37 const SUBDIRS
: SubdirMap
= &[
38 ("extract", &Router
::new().get(&API_METHOD_EXTRACT
)),
39 ("list", &Router
::new().get(&API_METHOD_LIST
)),
40 ("status", &Router
::new().get(&API_METHOD_STATUS
)),
41 ("stop", &Router
::new().get(&API_METHOD_STOP
)),
44 pub const ROUTER
: Router
= Router
::new()
45 .get(&list_subdirs_api_method
!(SUBDIRS
))
48 static DOWNLOAD_SEM
: Semaphore
= Semaphore
::const_new(8);
50 fn read_uptime() -> Result
<f32, Error
> {
51 let uptime
= fs
::read_to_string("/proc/uptime")?
;
52 // unwrap the Option, if /proc/uptime is empty we have bigger problems
53 Ok(uptime
.split_ascii_whitespace().next().unwrap().parse()?
)
61 description
: "If true, do not reset the watchdog timer on this API call.",
68 description
: "Permissions are handled outside restore VM. This call can be made without a ticket, but keep-timeout is always assumed 'true' then.",
69 permission
: &Permission
::World
,
72 type: RestoreDaemonStatus
,
75 /// General status information
76 fn status(rpcenv
: &mut dyn RpcEnvironment
, keep_timeout
: bool
) -> Result
<RestoreDaemonStatus
, Error
> {
77 if !keep_timeout
&& rpcenv
.get_auth_id().is_some() {
80 Ok(RestoreDaemonStatus
{
81 uptime
: read_uptime()?
as i64,
82 timeout
: watchdog_remaining(),
88 description
: "Permissions are handled outside restore VM.",
89 permission
: &Permission
::Superuser
,
92 /// Stop the restore VM immediately, this will never return if successful
95 println
!("/stop called, shutting down");
96 let err
= reboot
::reboot(reboot
::RebootMode
::RB_POWER_OFF
).unwrap_err();
97 println
!("'reboot' syscall failed: {}", err
);
98 std
::process
::exit(1);
101 fn get_dir_entry(path
: &Path
) -> Result
<DirEntryAttribute
, Error
> {
104 let stat
= stat
::stat(path
)?
;
105 Ok(match stat
.st_mode
& libc
::S_IFMT
{
106 libc
::S_IFREG
=> DirEntryAttribute
::File
{
107 size
: stat
.st_size
as u64,
108 mtime
: stat
.st_mtime
,
110 libc
::S_IFDIR
=> DirEntryAttribute
::Directory { start: 0 }
,
111 _
=> bail
!("unsupported file type: {}", stat
.st_mode
),
120 description
: "base64-encoded path to list files and directories under",
125 description
: "Permissions are handled outside restore VM.",
126 permission
: &Permission
::Superuser
,
129 /// List file details for given file or a list of files and directories under the given path if it
130 /// points to a directory.
134 _rpcenv
: &mut dyn RpcEnvironment
,
135 ) -> Result
<Vec
<ArchiveEntry
>, Error
> {
138 let mut res
= Vec
::new();
140 let param_path
= base64
::decode(path
)?
;
141 let mut path
= param_path
.clone();
142 if let Some(b'
/'
) = path
.last() {
145 let path_str
= OsStr
::from_bytes(&path
[..]);
146 let param_path_buf
= Path
::new(path_str
);
148 let mut disk_state
= crate::DISK_STATE
.lock().unwrap();
149 let query_result
= disk_state
.resolve(¶m_path_buf
)?
;
152 ResolveResult
::Path(vm_path
) => {
153 let root_entry
= get_dir_entry(&vm_path
)?
;
155 DirEntryAttribute
::File { .. }
=> {
156 // list on file, return details
157 res
.push(ArchiveEntry
::new(¶m_path
, Some(&root_entry
)));
159 DirEntryAttribute
::Directory { .. }
=> {
160 // list on directory, return all contained files/dirs
161 for f
in read_subdir(libc
::AT_FDCWD
, &vm_path
)?
{
163 let name
= f
.file_name().to_bytes();
164 let path
= &Path
::new(OsStr
::from_bytes(name
));
165 if path
.components().count() == 1 {
166 // ignore '.' and '..'
167 match path
.components().next().unwrap() {
168 std
::path
::Component
::CurDir
169 | std
::path
::Component
::ParentDir
=> continue,
174 let mut full_vm_path
= PathBuf
::new();
175 full_vm_path
.push(&vm_path
);
176 full_vm_path
.push(path
);
177 let mut full_path
= PathBuf
::new();
178 full_path
.push(param_path_buf
);
179 full_path
.push(path
);
181 let entry
= get_dir_entry(&full_vm_path
);
182 if let Ok(entry
) = entry
{
183 res
.push(ArchiveEntry
::new(
184 full_path
.as_os_str().as_bytes(),
194 ResolveResult
::BucketTypes(types
) => {
196 let mut t_path
= path
.clone();
198 t_path
.extend(t
.as_bytes());
199 res
.push(ArchiveEntry
::new(
205 ResolveResult
::BucketComponents(comps
) => {
207 let mut c_path
= path
.clone();
209 c_path
.extend(c
.0.as_bytes());
210 res
.push(ArchiveEntry
::new_with_size(
212 // this marks the beginning of a filesystem, i.e. '/', so this is a Directory
213 Some(&DirEntryAttribute
::Directory { start: 0 }
),
224 pub const API_METHOD_EXTRACT
: ApiMethod
= ApiMethod
::new(
225 &ApiHandler
::AsyncHttp(&extract
),
227 "Extract a file or directory from the VM as a pxar archive.",
232 &StringSchema
::new("base64-encoded path to list files and directories under")
238 &BooleanSchema
::new(concat
!(
239 "if true, return a pxar archive, otherwise either the ",
240 "file content or the directory as a zip file"
248 .access(None
, &Permission
::Superuser
);
255 _rpcenv
: Box
<dyn RpcEnvironment
>,
256 ) -> ApiResponseFuture
{
257 // download can take longer than watchdog timeout, inhibit until done
258 let _inhibitor
= watchdog_inhibit();
260 let _inhibitor
= _inhibitor
;
262 let _permit
= match DOWNLOAD_SEM
.try_acquire() {
263 Ok(permit
) => permit
,
264 Err(_
) => bail
!("maximum concurrent download limit reached, please wait for another restore to finish before attempting a new one"),
267 let path
= required_string_param(¶m
, "path")?
;
268 let mut path
= base64
::decode(path
)?
;
269 if let Some(b'
/'
) = path
.last() {
272 let path
= Path
::new(OsStr
::from_bytes(&path
[..]));
274 let pxar
= param
["pxar"].as_bool().unwrap_or(true);
277 let mut disk_state
= crate::DISK_STATE
.lock().unwrap();
278 disk_state
.resolve(&path
)?
281 let vm_path
= match query_result
{
282 ResolveResult
::Path(vm_path
) => vm_path
,
283 _
=> bail
!("invalid path, cannot restore meta-directory: {:?}", path
),
286 // check here so we can return a real error message, failing in the async task will stop
287 // the transfer, but not return a useful message
288 if !vm_path
.exists() {
289 bail
!("file or directory {:?} does not exist", path
);
292 let (mut writer
, reader
) = tokio
::io
::duplex(1024 * 64);
295 tokio
::spawn(async
move {
296 let _inhibitor
= _inhibitor
;
297 let _permit
= _permit
;
298 let result
= async
move {
299 // pxar always expects a directory as it's root, so to accommodate files as
300 // well we encode the parent dir with a filter only matching the target instead
301 let mut patterns
= vec
![MatchEntry
::new(
302 MatchPattern
::Pattern(Pattern
::path(b
"*").unwrap()),
306 let name
= match vm_path
.file_name() {
308 None
=> bail
!("no file name found for path: {:?}", vm_path
),
311 if vm_path
.is_dir() {
312 let mut pat
= name
.as_bytes().to_vec();
313 patterns
.push(MatchEntry
::new(
314 MatchPattern
::Pattern(Pattern
::path(pat
.clone())?
),
317 pat
.extend(b
"/**/*".iter());
318 patterns
.push(MatchEntry
::new(
319 MatchPattern
::Pattern(Pattern
::path(pat
)?
),
323 patterns
.push(MatchEntry
::new(
324 MatchPattern
::Literal(name
.as_bytes().to_vec()),
329 let dir_path
= vm_path
.parent().unwrap_or_else(|| Path
::new("/"));
330 let dir
= nix
::dir
::Dir
::open(
332 nix
::fcntl
::OFlag
::O_NOFOLLOW
,
333 nix
::sys
::stat
::Mode
::empty(),
336 let options
= PxarCreateOptions
{
337 entries_max
: ENCODER_MAX_ENTRIES
,
341 skip_lost_and_found
: false,
344 let pxar_writer
= TokioWriter
::new(writer
);
345 create_archive(dir
, pxar_writer
, Flags
::DEFAULT
, |_
| Ok(()), None
, options
)
349 if let Err(err
) = result
{
350 error
!("pxar streaming task failed - {}", err
);
354 tokio
::spawn(async
move {
355 let _inhibitor
= _inhibitor
;
356 let _permit
= _permit
;
357 let result
= async
move {
358 if vm_path
.is_dir() {
359 zip_directory(&mut writer
, &vm_path
).await?
;
361 } else if vm_path
.is_file() {
362 let mut file
= tokio
::fs
::OpenOptions
::new()
366 tokio
::io
::copy(&mut file
, &mut writer
).await?
;
369 bail
!("invalid entry type for path: {:?}", vm_path
);
373 if let Err(err
) = result
{
374 error
!("file or dir streaming task failed - {}", err
);
379 let stream
= tokio_util
::io
::ReaderStream
::new(reader
);
381 let body
= Body
::wrap_stream(stream
);
382 Ok(Response
::builder()
383 .status(StatusCode
::OK
)
384 .header(header
::CONTENT_TYPE
, "application/octet-stream")