1 ///! File-restore API running inside the restore VM
4 use std
::os
::unix
::ffi
::OsStrExt
;
5 use std
::path
::{Path, PathBuf}
;
7 use anyhow
::{bail, Error}
;
8 use futures
::FutureExt
;
9 use hyper
::http
::request
::Parts
;
10 use hyper
::{header, Body, Response, StatusCode}
;
12 use serde_json
::Value
;
13 use tokio
::sync
::Semaphore
;
15 use pathpatterns
::{MatchEntry, MatchPattern, MatchType, Pattern}
;
17 api
, schema
::*, ApiHandler
, ApiMethod
, ApiResponseFuture
, Permission
, Router
, RpcEnvironment
,
20 use proxmox
::{identity, list_subdirs_api_method, sortable}
;
22 use pbs_api_types
::file_restore
::RestoreDaemonStatus
;
23 use pbs_client
::pxar
::{create_archive, Flags, PxarCreateOptions, ENCODER_MAX_ENTRIES}
;
24 use pbs_datastore
::catalog
::{ArchiveEntry, DirEntryAttribute}
;
25 use pbs_tools
::fs
::read_subdir
;
26 use pbs_tools
::json
::required_string_param
;
27 use pbs_tools
::zip
::zip_directory
;
29 use pxar
::encoder
::aio
::TokioWriter
;
31 use super::{disk::ResolveResult, watchdog_remaining, watchdog_inhibit, watchdog_ping}
;
33 // NOTE: All API endpoints must have Permission::Superuser, as the configs for authentication do
34 // not exist within the restore VM. Safety is guaranteed by checking a ticket via a custom ApiAuth.
36 const SUBDIRS
: SubdirMap
= &[
37 ("extract", &Router
::new().get(&API_METHOD_EXTRACT
)),
38 ("list", &Router
::new().get(&API_METHOD_LIST
)),
39 ("status", &Router
::new().get(&API_METHOD_STATUS
)),
40 ("stop", &Router
::new().get(&API_METHOD_STOP
)),
43 pub const ROUTER
: Router
= Router
::new()
44 .get(&list_subdirs_api_method
!(SUBDIRS
))
47 static DOWNLOAD_SEM
: Semaphore
= Semaphore
::const_new(8);
49 fn read_uptime() -> Result
<f32, Error
> {
50 let uptime
= fs
::read_to_string("/proc/uptime")?
;
51 // unwrap the Option, if /proc/uptime is empty we have bigger problems
52 Ok(uptime
.split_ascii_whitespace().next().unwrap().parse()?
)
60 description
: "If true, do not reset the watchdog timer on this API call.",
67 description
: "Permissions are handled outside restore VM. This call can be made without a ticket, but keep-timeout is always assumed 'true' then.",
68 permission
: &Permission
::World
,
71 type: RestoreDaemonStatus
,
74 /// General status information
75 fn status(rpcenv
: &mut dyn RpcEnvironment
, keep_timeout
: bool
) -> Result
<RestoreDaemonStatus
, Error
> {
76 if !keep_timeout
&& rpcenv
.get_auth_id().is_some() {
79 Ok(RestoreDaemonStatus
{
80 uptime
: read_uptime()?
as i64,
81 timeout
: watchdog_remaining(),
87 description
: "Permissions are handled outside restore VM.",
88 permission
: &Permission
::Superuser
,
91 /// Stop the restore VM immediately, this will never return if successful
94 println
!("/stop called, shutting down");
95 let err
= reboot
::reboot(reboot
::RebootMode
::RB_POWER_OFF
).unwrap_err();
96 println
!("'reboot' syscall failed: {}", err
);
97 std
::process
::exit(1);
100 fn get_dir_entry(path
: &Path
) -> Result
<DirEntryAttribute
, Error
> {
103 let stat
= stat
::stat(path
)?
;
104 Ok(match stat
.st_mode
& libc
::S_IFMT
{
105 libc
::S_IFREG
=> DirEntryAttribute
::File
{
106 size
: stat
.st_size
as u64,
107 mtime
: stat
.st_mtime
,
109 libc
::S_IFDIR
=> DirEntryAttribute
::Directory { start: 0 }
,
110 _
=> bail
!("unsupported file type: {}", stat
.st_mode
),
119 description
: "base64-encoded path to list files and directories under",
124 description
: "Permissions are handled outside restore VM.",
125 permission
: &Permission
::Superuser
,
128 /// List file details for given file or a list of files and directories under the given path if it
129 /// points to a directory.
133 _rpcenv
: &mut dyn RpcEnvironment
,
134 ) -> Result
<Vec
<ArchiveEntry
>, Error
> {
137 let mut res
= Vec
::new();
139 let param_path
= base64
::decode(path
)?
;
140 let mut path
= param_path
.clone();
141 if let Some(b'
/'
) = path
.last() {
144 let path_str
= OsStr
::from_bytes(&path
[..]);
145 let param_path_buf
= Path
::new(path_str
);
147 let mut disk_state
= crate::DISK_STATE
.lock().unwrap();
148 let query_result
= disk_state
.resolve(¶m_path_buf
)?
;
151 ResolveResult
::Path(vm_path
) => {
152 let root_entry
= get_dir_entry(&vm_path
)?
;
154 DirEntryAttribute
::File { .. }
=> {
155 // list on file, return details
156 res
.push(ArchiveEntry
::new(¶m_path
, Some(&root_entry
)));
158 DirEntryAttribute
::Directory { .. }
=> {
159 // list on directory, return all contained files/dirs
160 for f
in read_subdir(libc
::AT_FDCWD
, &vm_path
)?
{
162 let name
= f
.file_name().to_bytes();
163 let path
= &Path
::new(OsStr
::from_bytes(name
));
164 if path
.components().count() == 1 {
165 // ignore '.' and '..'
166 match path
.components().next().unwrap() {
167 std
::path
::Component
::CurDir
168 | std
::path
::Component
::ParentDir
=> continue,
173 let mut full_vm_path
= PathBuf
::new();
174 full_vm_path
.push(&vm_path
);
175 full_vm_path
.push(path
);
176 let mut full_path
= PathBuf
::new();
177 full_path
.push(param_path_buf
);
178 full_path
.push(path
);
180 let entry
= get_dir_entry(&full_vm_path
);
181 if let Ok(entry
) = entry
{
182 res
.push(ArchiveEntry
::new(
183 full_path
.as_os_str().as_bytes(),
193 ResolveResult
::BucketTypes(types
) => {
195 let mut t_path
= path
.clone();
197 t_path
.extend(t
.as_bytes());
198 res
.push(ArchiveEntry
::new(
204 ResolveResult
::BucketComponents(comps
) => {
206 let mut c_path
= path
.clone();
208 c_path
.extend(c
.0.as_bytes());
209 res
.push(ArchiveEntry
::new_with_size(
211 // this marks the beginning of a filesystem, i.e. '/', so this is a Directory
212 Some(&DirEntryAttribute
::Directory { start: 0 }
),
223 pub const API_METHOD_EXTRACT
: ApiMethod
= ApiMethod
::new(
224 &ApiHandler
::AsyncHttp(&extract
),
226 "Extract a file or directory from the VM as a pxar archive.",
231 &StringSchema
::new("base64-encoded path to list files and directories under")
237 &BooleanSchema
::new(concat
!(
238 "if true, return a pxar archive, otherwise either the ",
239 "file content or the directory as a zip file"
247 .access(None
, &Permission
::Superuser
);
254 _rpcenv
: Box
<dyn RpcEnvironment
>,
255 ) -> ApiResponseFuture
{
256 // download can take longer than watchdog timeout, inhibit until done
257 let _inhibitor
= watchdog_inhibit();
259 let _inhibitor
= _inhibitor
;
261 let _permit
= match DOWNLOAD_SEM
.try_acquire() {
262 Ok(permit
) => permit
,
263 Err(_
) => bail
!("maximum concurrent download limit reached, please wait for another restore to finish before attempting a new one"),
266 let path
= required_string_param(¶m
, "path")?
;
267 let mut path
= base64
::decode(path
)?
;
268 if let Some(b'
/'
) = path
.last() {
271 let path
= Path
::new(OsStr
::from_bytes(&path
[..]));
273 let pxar
= param
["pxar"].as_bool().unwrap_or(true);
276 let mut disk_state
= crate::DISK_STATE
.lock().unwrap();
277 disk_state
.resolve(&path
)?
280 let vm_path
= match query_result
{
281 ResolveResult
::Path(vm_path
) => vm_path
,
282 _
=> bail
!("invalid path, cannot restore meta-directory: {:?}", path
),
285 // check here so we can return a real error message, failing in the async task will stop
286 // the transfer, but not return a useful message
287 if !vm_path
.exists() {
288 bail
!("file or directory {:?} does not exist", path
);
291 let (mut writer
, reader
) = tokio
::io
::duplex(1024 * 64);
294 tokio
::spawn(async
move {
295 let _inhibitor
= _inhibitor
;
296 let _permit
= _permit
;
297 let result
= async
move {
298 // pxar always expects a directory as it's root, so to accommodate files as
299 // well we encode the parent dir with a filter only matching the target instead
300 let mut patterns
= vec
![MatchEntry
::new(
301 MatchPattern
::Pattern(Pattern
::path(b
"*").unwrap()),
305 let name
= match vm_path
.file_name() {
307 None
=> bail
!("no file name found for path: {:?}", vm_path
),
310 if vm_path
.is_dir() {
311 let mut pat
= name
.as_bytes().to_vec();
312 patterns
.push(MatchEntry
::new(
313 MatchPattern
::Pattern(Pattern
::path(pat
.clone())?
),
316 pat
.extend(b
"/**/*".iter());
317 patterns
.push(MatchEntry
::new(
318 MatchPattern
::Pattern(Pattern
::path(pat
)?
),
322 patterns
.push(MatchEntry
::new(
323 MatchPattern
::Literal(name
.as_bytes().to_vec()),
328 let dir_path
= vm_path
.parent().unwrap_or_else(|| Path
::new("/"));
329 let dir
= nix
::dir
::Dir
::open(
331 nix
::fcntl
::OFlag
::O_NOFOLLOW
,
332 nix
::sys
::stat
::Mode
::empty(),
335 let options
= PxarCreateOptions
{
336 entries_max
: ENCODER_MAX_ENTRIES
,
340 skip_lost_and_found
: false,
343 let pxar_writer
= TokioWriter
::new(writer
);
344 create_archive(dir
, pxar_writer
, Flags
::DEFAULT
, |_
| Ok(()), None
, options
)
348 if let Err(err
) = result
{
349 error
!("pxar streaming task failed - {}", err
);
353 tokio
::spawn(async
move {
354 let _inhibitor
= _inhibitor
;
355 let _permit
= _permit
;
356 let result
= async
move {
357 if vm_path
.is_dir() {
358 zip_directory(&mut writer
, &vm_path
).await?
;
360 } else if vm_path
.is_file() {
361 let mut file
= tokio
::fs
::OpenOptions
::new()
365 tokio
::io
::copy(&mut file
, &mut writer
).await?
;
368 bail
!("invalid entry type for path: {:?}", vm_path
);
372 if let Err(err
) = result
{
373 error
!("file or dir streaming task failed - {}", err
);
378 let stream
= tokio_util
::io
::ReaderStream
::new(reader
);
380 let body
= Body
::wrap_stream(stream
);
381 Ok(Response
::builder()
382 .status(StatusCode
::OK
)
383 .header(header
::CONTENT_TYPE
, "application/octet-stream")