api: datastore create: allow re-using existing dirs if empty & not a mountpoint
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index 7028593aaf8ac13fee5254fb04ebb6e20852b287..59db36603f5cb2edf1235a2aec1e4f6007a96176 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
 //! Sync datastore from remote server
-
-use failure::*;
-use serde_json::json;
-use std::convert::TryFrom;
-use std::sync::Arc;
-use std::collections::HashMap;
-use std::io::{Seek, SeekFrom};
-use chrono::{Utc, TimeZone};
-
-use proxmox::api::api;
-use proxmox::api::{ApiMethod, Router, RpcEnvironment};
-
-use crate::server::{WorkerTask};
-use crate::backup::*;
-use crate::client::*;
-use crate::config::remote;
-use crate::api2::types::*;
-
-// fixme: implement filters
-// fixme: delete vanished groups
-// Todo: correctly lock backup groups
-
-async fn pull_index_chunks<I: IndexFile>(
-    _worker: &WorkerTask,
-    chunk_reader: &mut RemoteChunkReader,
-    target: Arc<DataStore>,
-    index: I,
+use anyhow::{bail, format_err, Error};
+use futures::{future::FutureExt, select};
+
+use proxmox_router::{Permission, Router, RpcEnvironment};
+use proxmox_schema::api;
+use proxmox_sys::task_log;
+
+use pbs_api_types::{
+    Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
+    GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
+    PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
+    TRANSFER_LAST_SCHEMA,
+};
+use pbs_config::CachedUserInfo;
+use proxmox_human_byte::HumanByte;
+use proxmox_rest_server::WorkerTask;
+
+use crate::server::jobstate::Job;
+use crate::server::pull::{pull_store, PullParameters};
+
+pub fn check_pull_privs(
+    auth_id: &Authid,
+    store: &str,
+    ns: Option<&str>,
+    remote: Option<&str>,
+    remote_store: &str,
+    delete: bool,
 ) -> Result<(), Error> {
+    let user_info = CachedUserInfo::new()?;
+
+    let local_store_ns_acl_path = match ns {
+        Some(ns) => vec!["datastore", store, ns],
+        None => vec!["datastore", store],
+    };
+
+    user_info.check_privs(
+        auth_id,
+        &local_store_ns_acl_path,
+        PRIV_DATASTORE_BACKUP,
+        false,
+    )?;
+
+    if let Some(remote) = remote {
+        user_info.check_privs(
+            auth_id,
+            &["remote", remote, remote_store],
+            PRIV_REMOTE_READ,
+            false,
+        )?;
+    } else {
+        user_info.check_privs(
+            auth_id,
+            &["datastore", remote_store],
+            PRIV_DATASTORE_BACKUP,
+            false,
+        )?;
+    }
 
-
-    for pos in 0..index.index_count() {
-        let digest = index.index_digest(pos).unwrap();
-        let chunk_exists = target.cond_touch_chunk(digest, false)?;
-        if chunk_exists {
-            //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
-            continue;
-        }
-        //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
-        let chunk = chunk_reader.read_raw_chunk(&digest)?;
-
-        target.insert_chunk(&chunk, &digest)?;
+    if delete {
+        user_info.check_privs(
+            auth_id,
+            &local_store_ns_acl_path,
+            PRIV_DATASTORE_PRUNE,
+            false,
+        )?;
     }
 
     Ok(())
 }
 
-async fn download_manifest(
-    reader: &BackupReader,
-    filename: &std::path::Path,
-) -> Result<std::fs::File, Error> {
-
-    let tmp_manifest_file = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .read(true)
-        .open(&filename)?;
-
-    let mut tmp_manifest_file = reader.download(MANIFEST_BLOB_NAME, tmp_manifest_file).await?;
-
-    tmp_manifest_file.seek(SeekFrom::Start(0))?;
-
-    Ok(tmp_manifest_file)
-}
-
-async fn pull_single_archive(
-    worker: &WorkerTask,
-    reader: &BackupReader,
-    chunk_reader: &mut RemoteChunkReader,
-    tgt_store: Arc<DataStore>,
-    snapshot: &BackupDir,
-    archive_name: &str,
-) -> Result<(), Error> {
-
-    let mut path = tgt_store.base_path();
-    path.push(snapshot.relative_path());
-    path.push(archive_name);
-
-    let mut tmp_path = path.clone();
-    tmp_path.set_extension("tmp");
-
-    worker.log(format!("sync archive {}", archive_name));
-    let tmpfile = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .read(true)
-        .open(&tmp_path)?;
-
-    let tmpfile = reader.download(archive_name, tmpfile).await?;
-
-    match archive_type(archive_name)? {
-        ArchiveType::DynamicIndex => {
-            let index = DynamicIndexReader::new(tmpfile)
-                .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;
-
-            pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
-        }
-        ArchiveType::FixedIndex => {
-            let index = FixedIndexReader::new(tmpfile)
-                .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;
-
-            pull_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?;
-        }
-        ArchiveType::Blob => { /* nothing to do */ }
+impl TryFrom<&SyncJobConfig> for PullParameters {
+    type Error = Error;
+
+    fn try_from(sync_job: &SyncJobConfig) -> Result<Self, Self::Error> {
+        PullParameters::new(
+            &sync_job.store,
+            sync_job.ns.clone().unwrap_or_default(),
+            sync_job.remote.as_deref(),
+            &sync_job.remote_store,
+            sync_job.remote_ns.clone().unwrap_or_default(),
+            sync_job
+                .owner
+                .as_ref()
+                .unwrap_or_else(|| Authid::root_auth_id())
+                .clone(),
+            sync_job.remove_vanished,
+            sync_job.max_depth,
+            sync_job.group_filter.clone(),
+            sync_job.limit.clone(),
+            sync_job.transfer_last,
+        )
     }
-    if let Err(err) = std::fs::rename(&tmp_path, &path) {
-        bail!("Atomic rename file {:?} failed - {}", path, err);
-    }
-    Ok(())
 }
 
-async fn pull_snapshot(
-    worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    tgt_store: Arc<DataStore>,
-    snapshot: &BackupDir,
-) -> Result<(), Error> {
-
-    let mut manifest_name = tgt_store.base_path();
-    manifest_name.push(snapshot.relative_path());
-    manifest_name.push(MANIFEST_BLOB_NAME);
-
-    let mut tmp_manifest_name = manifest_name.clone();
-    tmp_manifest_name.set_extension("tmp");
+pub fn do_sync_job(
+    mut job: Job,
+    sync_job: SyncJobConfig,
+    auth_id: &Authid,
+    schedule: Option<String>,
+    to_stdout: bool,
+) -> Result<String, Error> {
+    let job_id = format!(
+        "{}:{}:{}:{}:{}",
+        sync_job.remote.as_deref().unwrap_or("-"),
+        sync_job.remote_store,
+        sync_job.store,
+        sync_job.ns.clone().unwrap_or_default(),
+        job.jobname()
+    );
+    let worker_type = job.jobtype().to_string();
+
+    if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
+        bail!("can't sync to same datastore");
+    }
 
-    let mut tmp_manifest_file = download_manifest(&reader, &tmp_manifest_name).await?;
-    let tmp_manifest_blob = DataBlob::load(&mut tmp_manifest_file)?;
-    tmp_manifest_blob.verify_crc()?;
+    let (email, notify) = crate::server::lookup_datastore_notify_settings(&sync_job.store);
 
-    if manifest_name.exists() {
-        let manifest_blob = proxmox::tools::try_block!({
-            let mut manifest_file = std::fs::File::open(&manifest_name)
-                .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?;
+    let upid_str = WorkerTask::spawn(
+        &worker_type,
+        Some(job_id.clone()),
+        auth_id.to_string(),
+        to_stdout,
+        move |worker| async move {
+            job.start(&worker.upid().to_string())?;
 
-            let manifest_blob = DataBlob::load(&mut manifest_file)?;
-            manifest_blob.verify_crc()?;
-            Ok(manifest_blob)
-        }).map_err(|err: Error| {
-            format_err!("unable to read local manifest {:?} - {}", manifest_name, err)
-        })?;
+            let worker2 = worker.clone();
+            let sync_job2 = sync_job.clone();
 
-        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
-            return Ok(()); // nothing changed
-        }
-    }
+            let worker_future = async move {
+                let pull_params = PullParameters::try_from(&sync_job)?;
 
-    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
-
-    let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, HashMap::new());
-
-    for item in manifest.files() {
-        let mut path = tgt_store.base_path();
-        path.push(snapshot.relative_path());
-        path.push(&item.filename);
-
-        if path.exists() {
-            match archive_type(&item.filename)? {
-                ArchiveType::DynamicIndex => {
-                    let index = DynamicIndexReader::open(&path)?;
-                    let (csum, size) = index.compute_csum();
-                    match manifest.verify_file(&item.filename, &csum, size) {
-                        Ok(_) => continue,
-                        Err(err) => {
-                            worker.log(format!("detected changed file {:?} - {}", path, err));
-                        }
-                    }
-                }
-                ArchiveType::FixedIndex => {
-                    let index = FixedIndexReader::open(&path)?;
-                    let (csum, size) = index.compute_csum();
-                    match manifest.verify_file(&item.filename, &csum, size) {
-                        Ok(_) => continue,
-                        Err(err) => {
-                            worker.log(format!("detected changed file {:?} - {}", path, err));
-                        }
-                    }
+                task_log!(worker, "Starting datastore sync job '{}'", job_id);
+                if let Some(event_str) = schedule {
+                    task_log!(worker, "task triggered by schedule '{}'", event_str);
                 }
-                ArchiveType::Blob => {
-                    let mut tmpfile = std::fs::File::open(&path)?;
-                    let (csum, size) = compute_file_csum(&mut tmpfile)?;
-                    match manifest.verify_file(&item.filename, &csum, size) {
-                        Ok(_) => continue,
-                        Err(err) => {
-                            worker.log(format!("detected changed file {:?} - {}", path, err));
-                        }
-                    }
+                task_log!(
+                    worker,
+                    "sync datastore '{}' from '{}{}'",
+                    sync_job.store,
+                    sync_job
+                        .remote
+                        .as_deref()
+                        .map_or(String::new(), |remote| format!("{remote}/")),
+                    sync_job.remote_store,
+                );
+
+                let pull_stats = pull_store(&worker, pull_params).await?;
+
+                if pull_stats.bytes != 0 {
+                    let amount = HumanByte::from(pull_stats.bytes);
+                    let rate = HumanByte::new_binary(
+                        pull_stats.bytes as f64 / pull_stats.elapsed.as_secs_f64(),
+                    );
+                    task_log!(
+                        worker,
+                        "Summary: sync job pulled {amount} in {} chunks (average rate: {rate}/s)",
+                        pull_stats.chunk_count,
+                    );
+                } else {
+                    task_log!(worker, "Summary: sync job found no new data to pull");
                 }
-            }
-        }
-
-        pull_single_archive(
-            worker,
-            &reader,
-            &mut chunk_reader,
-            tgt_store.clone(),
-            snapshot,
-            &item.filename,
-        ).await?;
-    }
 
-    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
-        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
-    }
+                if let Some(removed) = pull_stats.removed {
+                    task_log!(
+                        worker,
+                        "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
+                        removed.snapshots,
+                        removed.groups,
+                        removed.namespaces,
+                    );
+                }
 
-    // cleanup - remove stale files
-    tgt_store.cleanup_backup_dir(snapshot, &manifest)?;
+                task_log!(worker, "sync job '{}' end", &job_id);
 
-    Ok(())
-}
+                Ok(())
+            };
 
-pub async fn pull_snapshot_from(
-    worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    tgt_store: Arc<DataStore>,
-    snapshot: &BackupDir,
-) -> Result<(), Error> {
+            let mut abort_future = worker2
+                .abort_future()
+                .map(|_| Err(format_err!("sync aborted")));
 
-    let (_path, is_new) = tgt_store.create_backup_dir(&snapshot)?;
+            let result = select! {
+                worker = worker_future.fuse() => worker,
+                abort = abort_future => abort,
+            };
 
-    if is_new {
-        worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
+            let status = worker2.create_state(&result);
 
-        if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await {
-            if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot) {
-                worker.log(format!("cleanup error - {}", cleanup_err));
+            match job.finish(status) {
+                Ok(_) => {}
+                Err(err) => {
+                    eprintln!("could not finish job state: {}", err);
+                }
             }
-            return Err(err);
-        }
-    } else {
-        worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
-        pull_snapshot(worker, reader, tgt_store.clone(), &snapshot).await?
-    }
-
-    Ok(())
-}
-
-pub async fn pull_group(
-    worker: &WorkerTask,
-    client: &HttpClient,
-    src_repo: &BackupRepository,
-    tgt_store: Arc<DataStore>,
-    group: &BackupGroup,
-) -> Result<(), Error> {
-
-    let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
-
-    let args = json!({
-        "backup-type": group.backup_type(),
-        "backup-id": group.backup_id(),
-    });
-
-    let mut result = client.get(&path, Some(args)).await?;
-    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
 
-    list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
-
-    let auth_info = client.login().await?;
-
-    let last_sync = group.last_successful_backup(&tgt_store.base_path())?;
-
-    for item in list {
-        let backup_time = Utc.timestamp(item.backup_time, 0);
-        if let Some(last_sync_time) = last_sync {
-            if last_sync_time > backup_time { continue; }
-        }
-
-        let new_client = HttpClient::new(
-            src_repo.host(),
-            src_repo.user(),
-            Some(auth_info.ticket.clone())
-        )?;
-
-        let reader = BackupReader::start(
-            new_client,
-            None,
-            src_repo.store(),
-            &item.backup_type,
-            &item.backup_id,
-            backup_time,
-            true,
-        ).await?;
-
-        let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time);
-
-        pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?;
-    }
-
-    Ok(())
-}
-
-pub async fn pull_store(
-    worker: &WorkerTask,
-    client: &HttpClient,
-    src_repo: &BackupRepository,
-    tgt_store: Arc<DataStore>,
-) -> Result<(), Error> {
-
-    let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
-
-    let mut result = client.get(&path, None).await?;
-
-    let list = result["data"].as_array_mut().unwrap();
-
-    list.sort_unstable_by(|a, b| {
-        let a_id = a["backup-id"].as_str().unwrap();
-        let a_backup_type = a["backup-type"].as_str().unwrap();
-        let b_id = b["backup-id"].as_str().unwrap();
-        let b_backup_type = b["backup-type"].as_str().unwrap();
-
-        let type_order = a_backup_type.cmp(b_backup_type);
-        if type_order == std::cmp::Ordering::Equal {
-            a_id.cmp(b_id)
-        } else {
-            type_order
-        }
-    });
-
-    let mut errors = false;
-
-    for item in list {
-
-        let id = item["backup-id"].as_str().unwrap();
-        let btype = item["backup-type"].as_str().unwrap();
-
-        let group = BackupGroup::new(btype, id);
-        if let Err(err) = pull_group(worker, client, src_repo, tgt_store.clone(), &group).await {
-            worker.log(format!("sync group {}/{} failed - {}", btype, id, err));
-            errors = true;
-            // continue
-        }
-    }
+            if let Some(email) = email {
+                if let Err(err) =
+                    crate::server::send_sync_status(&email, notify, &sync_job2, &result)
+                {
+                    eprintln!("send sync notification failed: {}", err);
+                }
+            }
 
-    if errors {
-        bail!("sync failed with some errors.");
-    }
+            result
+        },
+    )?;
 
-    Ok(())
+    Ok(upid_str)
 }
 
 #[api(
@@ -343,55 +215,133 @@ pub async fn pull_store(
             store: {
                 schema: DATASTORE_SCHEMA,
             },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
+            },
             remote: {
                 schema: REMOTE_ID_SCHEMA,
+                optional: true,
             },
             "remote-store": {
                 schema: DATASTORE_SCHEMA,
             },
+            "remote-ns": {
+                type: BackupNamespace,
+                optional: true,
+            },
+            "remove-vanished": {
+                schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
+                optional: true,
+            },
+            "max-depth": {
+                schema: NS_MAX_DEPTH_REDUCED_SCHEMA,
+                optional: true,
+            },
+            "group-filter": {
+                schema: GROUP_FILTER_LIST_SCHEMA,
+                optional: true,
+            },
+            limit: {
+                type: RateLimitConfig,
+                flatten: true,
+            },
+            "transfer-last": {
+                schema: TRANSFER_LAST_SCHEMA,
+                optional: true,
+            },
         },
     },
+    access: {
+        // Note: used parameters are no uri parameters, so we need to test inside function body
+        description: r###"The user needs Datastore.Backup privilege on '/datastore/{store}',
+and needs to own the backup group. Remote.Read is required on '/remote/{remote}/{remote-store}'.
+The delete flag additionally requires the Datastore.Prune privilege on '/datastore/{store}'.
+"###,
+        permission: &Permission::Anybody,
+    },
 )]
 /// Sync store from other repository
-async fn pull (
+#[allow(clippy::too_many_arguments)]
+async fn pull(
     store: String,
-    remote: String,
+    ns: Option<BackupNamespace>,
+    remote: Option<String>,
     remote_store: String,
-    _info: &ApiMethod,
+    remote_ns: Option<BackupNamespace>,
+    remove_vanished: Option<bool>,
+    max_depth: Option<usize>,
+    group_filter: Option<Vec<GroupFilter>>,
+    limit: RateLimitConfig,
+    transfer_last: Option<usize>,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<String, Error> {
+    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let delete = remove_vanished.unwrap_or(false);
 
-    let username = rpcenv.get_user().unwrap();
-
-    let tgt_store = DataStore::lookup_datastore(&store)?;
-
-    let (remote_config, _digest) = remote::config()?;
-    let remote: remote::Remote = remote_config.lookup("remote", &remote)?;
-
-    let client = HttpClient::new(&remote.host, &remote.userid, Some(remote.password.clone()))?;
-    let _auth_info = client.login() // make sure we can auth
-        .await
-        .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote.host, err))?;
+    if remote.is_none() && store == remote_store {
+        bail!("can't sync to same datastore");
+    }
 
-    let src_repo = BackupRepository::new(Some(remote.userid), Some(remote.host), remote_store);
+    let ns = ns.unwrap_or_default();
+    let ns_str = if ns.is_root() {
+        None
+    } else {
+        Some(ns.to_string())
+    };
+
+    check_pull_privs(
+        &auth_id,
+        &store,
+        ns_str.as_deref(),
+        remote.as_deref(),
+        &remote_store,
+        delete,
+    )?;
+
+    let pull_params = PullParameters::new(
+        &store,
+        ns,
+        remote.as_deref(),
+        &remote_store,
+        remote_ns.unwrap_or_default(),
+        auth_id.clone(),
+        remove_vanished,
+        max_depth,
+        group_filter,
+        limit,
+        transfer_last,
+    )?;
 
     // fixme: set to_stdout to false?
-    let upid_str = WorkerTask::spawn("sync", Some(store.clone()), &username.clone(), true, move |worker| async move {
-
-        worker.log(format!("sync datastore '{}' start", store));
-
-        // explicit create shared lock to prevent GC on newly created chunks
-        let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
-
-        pull_store(&worker, &client, &src_repo, tgt_store.clone()).await?;
-
-        worker.log(format!("sync datastore '{}' end", store));
-
-        Ok(())
-    })?;
+    // FIXME: add namespace to worker id?
+    let upid_str = WorkerTask::spawn(
+        "sync",
+        Some(store.clone()),
+        auth_id.to_string(),
+        true,
+        move |worker| async move {
+            task_log!(
+                worker,
+                "pull datastore '{}' from '{}/{}'",
+                store,
+                remote.as_deref().unwrap_or("-"),
+                remote_store,
+            );
+
+            let pull_future = pull_store(&worker, pull_params);
+            (select! {
+                success = pull_future.fuse() => success,
+                abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
+            })?;
+
+            task_log!(worker, "pull datastore '{}' end", store);
+
+            Ok(())
+        },
+    )?;
 
     Ok(upid_str)
 }
 
-pub const ROUTER: Router = Router::new()
-    .post(&API_METHOD_PULL);
+pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
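
The new check_pull_privs above derives two ACL object paths from the request: the local target datastore (optionally namespaced) and the sync source (a remote store, or another local store when no remote is given). A minimal standalone sketch of that path selection, using a hypothetical helper with plain string inputs instead of the real pbs_config::CachedUserInfo API:

// Illustrative sketch only (hypothetical helper, not part of this patch):
// mirrors how check_pull_privs builds the ACL paths it verifies.
fn acl_paths<'a>(
    store: &'a str,
    ns: Option<&'a str>,
    remote: Option<&'a str>,
    remote_store: &'a str,
) -> (Vec<&'a str>, Vec<&'a str>) {
    // Local target needs Datastore.Backup on /datastore/{store}[/{ns}]
    let local = match ns {
        Some(ns) => vec!["datastore", store, ns],
        None => vec!["datastore", store],
    };
    // A remote source needs Remote.Read on /remote/{remote}/{remote-store};
    // a local source instead needs Datastore.Backup on /datastore/{remote-store}.
    let source = match remote {
        Some(remote) => vec!["remote", remote, remote_store],
        None => vec!["datastore", remote_store],
    };
    (local, source)
}

fn main() {
    let (local, source) = acl_paths("store1", Some("prod"), Some("pbs2"), "offsite");
    println!("/{}", local.join("/"));  // /datastore/store1/prod
    println!("/{}", source.join("/")); // /remote/pbs2/offsite
}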