]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/bin/proxmox_restore_daemon/api.rs
move client to pbs-client subcrate
[proxmox-backup.git] / src / bin / proxmox_restore_daemon / api.rs
index 7ac70278b8e223dad11b8489b4aee2d3164201a6..fbbda13cc4ea0c9dc7349123ee753695bd901ac3 100644 (file)
@@ -1,4 +1,9 @@
 ///! File-restore API running inside the restore VM
+use std::ffi::OsStr;
+use std::fs;
+use std::os::unix::ffi::OsStrExt;
+use std::path::{Path, PathBuf};
+
 use anyhow::{bail, Error};
 use futures::FutureExt;
 use hyper::http::request::Parts;
@@ -6,11 +11,7 @@ use hyper::{header, Body, Response, StatusCode};
 use log::error;
 use pathpatterns::{MatchEntry, MatchPattern, MatchType, Pattern};
 use serde_json::Value;
-
-use std::ffi::OsStr;
-use std::fs;
-use std::os::unix::ffi::OsStrExt;
-use std::path::{Path, PathBuf};
+use tokio::sync::Semaphore;
 
 use proxmox::api::{
     api, schema::*, ApiHandler, ApiMethod, ApiResponseFuture, Permission, Router, RpcEnvironment,
@@ -18,14 +19,17 @@ use proxmox::api::{
 };
 use proxmox::{identity, list_subdirs_api_method, sortable};
 
+use pbs_client::pxar::{create_archive, Flags, PxarCreateOptions, ENCODER_MAX_ENTRIES};
+use pbs_tools::fs::read_subdir;
+use pbs_tools::zip::zip_directory;
+
 use proxmox_backup::api2::types::*;
 use proxmox_backup::backup::DirEntryAttribute;
-use proxmox_backup::pxar::{create_archive, Flags, PxarCreateOptions, ENCODER_MAX_ENTRIES};
-use proxmox_backup::tools::{self, fs::read_subdir, zip::zip_directory};
+use proxmox_backup::tools;
 
 use pxar::encoder::aio::TokioWriter;
 
-use super::{disk::ResolveResult, watchdog_remaining, watchdog_ping};
+use super::{disk::ResolveResult, watchdog_remaining, watchdog_inhibit, watchdog_ping};
 
 // NOTE: All API endpoints must have Permission::Superuser, as the configs for authentication do
 // not exist within the restore VM. Safety is guaranteed by checking a ticket via a custom ApiAuth.
@@ -41,6 +45,8 @@ pub const ROUTER: Router = Router::new()
     .get(&list_subdirs_api_method!(SUBDIRS))
     .subdirs(SUBDIRS);
 
+static DOWNLOAD_SEM: Semaphore = Semaphore::const_new(8);
+
 fn read_uptime() -> Result<f32, Error> {
     let uptime = fs::read_to_string("/proc/uptime")?;
     // unwrap the Option, if /proc/uptime is empty we have bigger problems
@@ -148,7 +154,7 @@ fn list(
             match root_entry {
                 DirEntryAttribute::File { .. } => {
                     // list on file, return details
-                    res.push(ArchiveEntry::new(&param_path, &root_entry));
+                    res.push(ArchiveEntry::new(&param_path, Some(&root_entry)));
                 }
                 DirEntryAttribute::Directory { .. } => {
                     // list on directory, return all contained files/dirs
@@ -176,7 +182,7 @@ fn list(
                             if let Ok(entry) = entry {
                                 res.push(ArchiveEntry::new(
                                     full_path.as_os_str().as_bytes(),
-                                    &entry,
+                                    Some(&entry),
                                 ));
                             }
                         }
@@ -192,7 +198,7 @@ fn list(
                 t_path.extend(t.as_bytes());
                 res.push(ArchiveEntry::new(
                     &t_path[..],
-                    &DirEntryAttribute::Directory { start: 0 },
+                    None,
                 ));
             }
         }
@@ -200,10 +206,12 @@ fn list(
             for c in comps {
                 let mut c_path = path.clone();
                 c_path.push(b'/');
-                c_path.extend(c.as_bytes());
-                res.push(ArchiveEntry::new(
+                c_path.extend(c.0.as_bytes());
+                res.push(ArchiveEntry::new_with_size(
                     &c_path[..],
-                    &DirEntryAttribute::Directory { start: 0 },
+                    // this marks the beginning of a filesystem, i.e. '/', so this is a Directory
+                    Some(&DirEntryAttribute::Directory { start: 0 }),
+                    c.1,
                 ));
             }
         }
@@ -246,8 +254,16 @@ fn extract(
     _info: &ApiMethod,
     _rpcenv: Box<dyn RpcEnvironment>,
 ) -> ApiResponseFuture {
-    watchdog_ping();
+    // download can take longer than watchdog timeout, inhibit until done
+    let _inhibitor = watchdog_inhibit();
     async move {
+        let _inhibitor = _inhibitor;
+
+        let _permit = match DOWNLOAD_SEM.try_acquire() {
+            Ok(permit) => permit,
+            Err(_) => bail!("maximum concurrent download limit reached, please wait for another restore to finish before attempting a new one"),
+        };
+
         let path = tools::required_string_param(&param, "path")?;
         let mut path = base64::decode(path)?;
         if let Some(b'/') = path.last() {
@@ -277,6 +293,8 @@ fn extract(
 
         if pxar {
             tokio::spawn(async move {
+                let _inhibitor = _inhibitor;
+                let _permit = _permit;
                 let result = async move {
                     // pxar always expects a directory as it's root, so to accommodate files as
                     // well we encode the parent dir with a filter only matching the target instead
@@ -334,6 +352,8 @@ fn extract(
             });
         } else {
             tokio::spawn(async move {
+                let _inhibitor = _inhibitor;
+                let _permit = _permit;
                 let result = async move {
                     if vm_path.is_dir() {
                         zip_directory(&mut writer, &vm_path).await?;