]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/api2/admin/datastore.rs
replace 'disk_usage' with 'fs_info' from proxmox-sys
[proxmox-backup.git] / src / api2 / admin / datastore.rs
index b6a51b6939886cf1d5a0635e0fe9719d7dd2d04f..ad2744b718263c7cc08f3173f65dfe9fcb85e2e6 100644 (file)
@@ -32,14 +32,14 @@ use pxar::accessor::aio::Accessor;
 use pxar::EntryKind;
 
 use pbs_api_types::{
-    Authid, BackupContent, BackupNamespace, BackupType, Counts, CryptMode, DataStoreListItem,
-    DataStoreStatus, DatastoreWithNamespace, GarbageCollectionStatus, GroupListItem, Operation,
-    PruneOptions, RRDMode, RRDTimeFrame, SnapshotListItem, SnapshotVerifyState,
-    BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
-    BACKUP_TYPE_SCHEMA, DATASTORE_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA, MAX_NAMESPACE_DEPTH,
-    NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY,
-    PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, UPID_SCHEMA,
-    VERIFICATION_OUTDATED_AFTER_SCHEMA,
+    print_ns_and_snapshot, print_store_and_ns, Authid, BackupContent, BackupNamespace, BackupType,
+    Counts, CryptMode, DataStoreListItem, DataStoreStatus, GarbageCollectionStatus, GroupListItem,
+    KeepOptions, Operation, PruneJobOptions, RRDMode, RRDTimeFrame, SnapshotListItem,
+    SnapshotVerifyState, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA,
+    BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, DATASTORE_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA,
+    MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
+    PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY,
+    UPID_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
 };
 use pbs_client::pxar::{create_tar, create_zip};
 use pbs_config::CachedUserInfo;
@@ -63,8 +63,8 @@ use proxmox_rest_server::{formatter, WorkerTask};
 use crate::api2::backup::optional_ns_param;
 use crate::api2::node::rrd::create_value_from_rrd;
 use crate::backup::{
-    verify_all_backups, verify_backup_dir, verify_backup_group, verify_filter,
-    ListAccessibleBackupGroups,
+    check_ns_privs_full, verify_all_backups, verify_backup_dir, verify_backup_group, verify_filter,
+    ListAccessibleBackupGroups, NS_PRIVS_OK,
 };
 
 use crate::server::jobstate::Job;
@@ -81,38 +81,6 @@ fn get_group_note_path(
     note_path
 }
 
-// TODO: move somewhere we can reuse it from (namespace has its own copy atm.)
-fn get_ns_privs(store: &str, ns: &BackupNamespace, auth_id: &Authid) -> Result<u64, Error> {
-    let user_info = CachedUserInfo::new()?;
-
-    Ok(if ns.is_root() {
-        user_info.lookup_privs(auth_id, &["datastore", store])
-    } else {
-        user_info.lookup_privs(auth_id, &["datastore", store, &ns.to_string()])
-    })
-}
-
-// asserts that either either `full_access_privs` or `partial_access_privs` are fulfilled,
-// returning value indicates whether further checks like group ownerships are required
-fn check_ns_privs(
-    store: &str,
-    ns: &BackupNamespace,
-    auth_id: &Authid,
-    full_access_privs: u64,
-    partial_access_privs: u64,
-) -> Result<bool, Error> {
-    let privs = get_ns_privs(store, ns, auth_id)?;
-
-    if full_access_privs != 0 && (privs & full_access_privs) != 0 {
-        return Ok(false);
-    }
-    if partial_access_privs != 0 && (privs & partial_access_privs) != 0 {
-        return Ok(true);
-    }
-
-    proxmox_router::http_bail!(FORBIDDEN, "permission check failed");
-}
-
 // helper to unify common sequence of checks:
 // 1. check privs on NS (full or limited access)
 // 2. load datastore
@@ -126,12 +94,12 @@ fn check_privs_and_load_store(
     operation: Option<Operation>,
     backup_group: &pbs_api_types::BackupGroup,
 ) -> Result<Arc<DataStore>, Error> {
-    let limited = check_ns_privs(store, ns, auth_id, full_access_privs, partial_access_privs)?;
+    let limited = check_ns_privs_full(store, ns, auth_id, full_access_privs, partial_access_privs)?;
 
-    let datastore = DataStore::lookup_datastore(&store, operation)?;
+    let datastore = DataStore::lookup_datastore(store, operation)?;
 
     if limited {
-        let owner = datastore.get_owner(&ns, backup_group)?;
+        let owner = datastore.get_owner(ns, backup_group)?;
         check_backup_owner(&owner, &auth_id)?;
     }
 
@@ -214,9 +182,9 @@ pub fn list_groups(
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Vec<GroupListItem>, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-
     let ns = ns.unwrap_or_default();
-    let list_all = !check_ns_privs(
+
+    let list_all = !check_ns_privs_full(
         &store,
         &ns,
         &auth_id,
@@ -230,11 +198,16 @@ pub fn list_groups(
         .iter_backup_groups(ns.clone())? // FIXME: Namespaces and recursion parameters!
         .try_fold(Vec::new(), |mut group_info, group| {
             let group = group?;
+
             let owner = match datastore.get_owner(&ns, group.as_ref()) {
                 Ok(auth_id) => auth_id,
                 Err(err) => {
-                    let id = &store;
-                    eprintln!("Failed to get owner of group '{}/{}' - {}", id, group, err);
+                    eprintln!(
+                        "Failed to get owner of group '{}' in {} - {}",
+                        group.group(),
+                        print_store_and_ns(&store, &ns),
+                        err
+                    );
                     return Ok(group_info);
                 }
             };
@@ -308,7 +281,6 @@ pub fn delete_group(
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-
     let ns = ns.unwrap_or_default();
 
     let datastore = check_privs_and_load_store(
@@ -358,7 +330,6 @@ pub fn list_snapshot_files(
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Vec<BackupContent>, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-
     let ns = ns.unwrap_or_default();
 
     let datastore = check_privs_and_load_store(
@@ -409,8 +380,8 @@ pub fn delete_snapshot(
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-
     let ns = ns.unwrap_or_default();
+
     let datastore = check_privs_and_load_store(
         &store,
         &ns,
@@ -468,7 +439,7 @@ pub fn list_snapshots(
 
     let ns = ns.unwrap_or_default();
 
-    let list_all = !check_ns_privs(
+    let list_all = !check_ns_privs_full(
         &store,
         &ns,
         &auth_id,
@@ -482,20 +453,20 @@ pub fn list_snapshots(
     // backup group and provide an error free (Err -> None) accessor
     let groups = match (backup_type, backup_id) {
         (Some(backup_type), Some(backup_id)) => {
-            vec![datastore.backup_group_from_parts(ns, backup_type, backup_id)]
+            vec![datastore.backup_group_from_parts(ns.clone(), backup_type, backup_id)]
         }
         // FIXME: Recursion
         (Some(backup_type), None) => datastore
-            .iter_backup_groups_ok(ns)?
+            .iter_backup_groups_ok(ns.clone())?
             .filter(|group| group.backup_type() == backup_type)
             .collect(),
         // FIXME: Recursion
         (None, Some(backup_id)) => datastore
-            .iter_backup_groups_ok(ns)?
+            .iter_backup_groups_ok(ns.clone())?
             .filter(|group| group.backup_id() == backup_id)
             .collect(),
         // FIXME: Recursion
-        (None, None) => datastore.list_backup_groups(ns)?,
+        (None, None) => datastore.list_backup_groups(ns.clone())?,
     };
 
     let info_to_snapshot_list_item = |group: &BackupGroup, owner, info: BackupInfo| {
@@ -575,8 +546,10 @@ pub fn list_snapshots(
             Ok(auth_id) => auth_id,
             Err(err) => {
                 eprintln!(
-                    "Failed to get owner of group '{}/{}' - {}",
-                    &store, group, err
+                    "Failed to get owner of group '{}' in {} - {}",
+                    group.group(),
+                    print_store_and_ns(&store, &ns),
+                    err
                 );
                 return Ok(snapshots);
             }
@@ -600,30 +573,35 @@ pub fn list_snapshots(
 
 fn get_snapshots_count(store: &Arc<DataStore>, owner: Option<&Authid>) -> Result<Counts, Error> {
     let root_ns = Default::default();
-    ListAccessibleBackupGroups::new(store, root_ns, MAX_NAMESPACE_DEPTH, owner)?.try_fold(
-        Counts::default(),
-        |mut counts, group| {
-            let group = match group {
-                Ok(group) => group,
-                Err(_) => return Ok(counts), // TODO: add this as error counts?
+    ListAccessibleBackupGroups::new_with_privs(
+        store,
+        root_ns,
+        MAX_NAMESPACE_DEPTH,
+        Some(PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_READ),
+        None,
+        owner,
+    )?
+    .try_fold(Counts::default(), |mut counts, group| {
+        let group = match group {
+            Ok(group) => group,
+            Err(_) => return Ok(counts), // TODO: add this as error counts?
+        };
+        let snapshot_count = group.list_backups()?.len() as u64;
+
+        // only include groups with snapshots, counting/displaying empty groups can confuse
+        if snapshot_count > 0 {
+            let type_count = match group.backup_type() {
+                BackupType::Ct => counts.ct.get_or_insert(Default::default()),
+                BackupType::Vm => counts.vm.get_or_insert(Default::default()),
+                BackupType::Host => counts.host.get_or_insert(Default::default()),
             };
-            let snapshot_count = group.list_backups()?.len() as u64;
-
-            // only include groups with snapshots, counting/displaying emtpy groups can confuse
-            if snapshot_count > 0 {
-                let type_count = match group.backup_type() {
-                    BackupType::Ct => counts.ct.get_or_insert(Default::default()),
-                    BackupType::Vm => counts.vm.get_or_insert(Default::default()),
-                    BackupType::Host => counts.host.get_or_insert(Default::default()),
-                };
 
-                type_count.groups += 1;
-                type_count.snapshots += snapshot_count;
-            }
+            type_count.groups += 1;
+            type_count.snapshots += snapshot_count;
+        }
 
-            Ok(counts)
-        },
-    )
+        Ok(counts)
+    })
 }
 
 #[api(
@@ -645,8 +623,9 @@ fn get_snapshots_count(store: &Arc<DataStore>, owner: Option<&Authid>) -> Result
         type: DataStoreStatus,
     },
     access: {
-        permission: &Permission::Privilege(
-            &["datastore", "{store}"], PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP, true),
+        permission: &Permission::Anybody,
+        description: "Requires on /datastore/{store} either DATASTORE_AUDIT or DATASTORE_BACKUP for \
+            the full statistics. Counts of accessible groups are always returned, if any",
     },
 )]
 /// Get datastore status.
@@ -656,13 +635,26 @@ pub fn status(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<DataStoreStatus, Error> {
-    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
-    let storage = crate::tools::disks::disk_usage(&datastore.base_path())?;
-    let (counts, gc_status) = if verbose {
-        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-        let user_info = CachedUserInfo::new()?;
+    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let user_info = CachedUserInfo::new()?;
+    let store_privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);
+
+    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read));
+
+    let store_stats = if store_privs & (PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP) != 0 {
+        true
+    } else if store_privs & PRIV_DATASTORE_READ != 0 {
+        false // allow at least counts, user can read groups anyway..
+    } else {
+        match user_info.any_privs_below(&auth_id, &["datastore", &store], NS_PRIVS_OK) {
+            // avoid leaking existence info if users hasn't at least any priv. below
+            Ok(false) | Err(_) => return Err(http_err!(FORBIDDEN, "permission check failed")),
+            _ => false,
+        }
+    };
+    let datastore = datastore?; // only unwrap no to avoid leaking existence info
 
-        let store_privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);
+    let (counts, gc_status) = if verbose {
         let filter_owner = if store_privs & PRIV_DATASTORE_AUDIT != 0 {
             None
         } else {
@@ -670,19 +662,34 @@ pub fn status(
         };
 
         let counts = Some(get_snapshots_count(&datastore, filter_owner)?);
-        let gc_status = Some(datastore.last_gc_status());
+        let gc_status = if store_stats {
+            Some(datastore.last_gc_status())
+        } else {
+            None
+        };
 
         (counts, gc_status)
     } else {
         (None, None)
     };
 
-    Ok(DataStoreStatus {
-        total: storage.total,
-        used: storage.used,
-        avail: storage.avail,
-        gc_status,
-        counts,
+    Ok(if store_stats {
+        let storage = proxmox_sys::fs::fs_info(&datastore.base_path())?;
+        DataStoreStatus {
+            total: storage.total,
+            used: storage.used,
+            avail: storage.available,
+            gc_status,
+            counts,
+        }
+    } else {
+        DataStoreStatus {
+            total: 0,
+            used: 0,
+            avail: 0,
+            gc_status,
+            counts,
+        }
     })
 }
 
@@ -748,7 +755,8 @@ pub fn verify(
 ) -> Result<Value, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
     let ns = ns.unwrap_or_default();
-    let owner_check_required = check_ns_privs(
+
+    let owner_check_required = check_ns_privs_full(
         &store,
         &ns,
         &auth_id,
@@ -806,9 +814,9 @@ pub fn verify(
         }
         (None, None, None) => {
             worker_id = if ns.is_root() {
-                store.clone()
+                store
             } else {
-                format!("{store}:{}", ns.display_as_path())
+                format!("{}:{}", store, ns.display_as_path())
             };
         }
         _ => bail!("parameters do not specify a backup group or snapshot"),
@@ -831,7 +839,10 @@ pub fn verify(
                     worker.upid().clone(),
                     Some(&move |manifest| verify_filter(ignore_verified, outdated_after, manifest)),
                 )? {
-                    res.push(backup_dir.to_string());
+                    res.push(print_ns_and_snapshot(
+                        backup_dir.backup_ns(),
+                        backup_dir.as_ref(),
+                    ));
                 }
                 res
             } else if let Some(backup_group) = backup_group {
@@ -876,10 +887,6 @@ pub fn verify(
 #[api(
     input: {
         properties: {
-            ns: {
-                type: BackupNamespace,
-                optional: true,
-            },
             group: {
                 type: pbs_api_types::BackupGroup,
                 flatten: true,
@@ -890,13 +897,17 @@ pub fn verify(
                 default: false,
                 description: "Just show what prune would do, but do not delete anything.",
             },
-            "prune-options": {
-                type: PruneOptions,
+            "keep-options": {
+                type: KeepOptions,
                 flatten: true,
             },
             store: {
                 schema: DATASTORE_SCHEMA,
             },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
+            },
         },
     },
     returns: pbs_api_types::ADMIN_DATASTORE_PRUNE_RETURN_TYPE,
@@ -908,11 +919,11 @@ pub fn verify(
 )]
 /// Prune a group on the datastore
 pub fn prune(
-    ns: Option<BackupNamespace>,
     group: pbs_api_types::BackupGroup,
     dry_run: bool,
-    prune_options: PruneOptions,
+    keep_options: KeepOptions,
     store: String,
+    ns: Option<BackupNamespace>,
     _param: Value,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
@@ -928,19 +939,18 @@ pub fn prune(
         &group,
     )?;
 
-    let group = datastore.backup_group(ns, group);
-
-    let worker_id = format!("{}:{}", store, group);
+    let worker_id = format!("{}:{}:{}", store, ns, group);
+    let group = datastore.backup_group(ns.clone(), group);
 
     let mut prune_result = Vec::new();
 
     let list = group.list_backups()?;
 
-    let mut prune_info = compute_prune_info(list, &prune_options)?;
+    let mut prune_info = compute_prune_info(list, &keep_options)?;
 
     prune_info.reverse(); // delete older snapshots first
 
-    let keep_all = !pbs_datastore::prune::keeps_something(&prune_options);
+    let keep_all = !keep_options.keeps_something();
 
     if dry_run {
         for (info, mark) in prune_info {
@@ -968,16 +978,18 @@ pub fn prune(
     if keep_all {
         task_log!(worker, "No prune selection - keeping all files.");
     } else {
+        let mut opts = Vec::new();
+        if !ns.is_root() {
+            opts.push(format!("--ns {ns}"));
+        }
+        crate::server::cli_keep_options(&mut opts, &keep_options);
+
+        task_log!(worker, "retention options: {}", opts.join(" "));
         task_log!(
             worker,
-            "retention options: {}",
-            pbs_datastore::prune::cli_options_string(&prune_options)
-        );
-        task_log!(
-            worker,
-            "Starting prune on store \"{}\" group \"{}\"",
-            store,
-            group,
+            "Starting prune on {} group \"{}\"",
+            print_store_and_ns(&store, &ns),
+            group.group(),
         );
     }
 
@@ -1027,57 +1039,54 @@ pub fn prune(
                 description: "Just show what prune would do, but do not delete anything.",
             },
             "prune-options": {
-                type: PruneOptions,
+                type: PruneJobOptions,
                 flatten: true,
             },
             store: {
                 schema: DATASTORE_SCHEMA,
             },
-            ns: {
-                type: BackupNamespace,
-                optional: true,
-            },
         },
     },
     returns: {
         schema: UPID_SCHEMA,
     },
     access: {
-        permission: &Permission::Privilege(
-            &["datastore", "{store}"], PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_PRUNE, true),
+        permission: &Permission::Anybody,
+        description: "Requires Datastore.Modify or Datastore.Prune on the datastore/namespace.",
     },
 )]
 /// Prune the datastore
 pub fn prune_datastore(
     dry_run: bool,
-    prune_options: PruneOptions,
+    prune_options: PruneJobOptions,
     store: String,
-    ns: Option<BackupNamespace>,
     _param: Value,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<String, Error> {
+    let user_info = CachedUserInfo::new()?;
+
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
 
+    user_info.check_privs(
+        &auth_id,
+        &prune_options.acl_path(&store),
+        PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_PRUNE,
+        true,
+    )?;
+
     let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
+    let ns = prune_options.ns.clone().unwrap_or_default();
+    let worker_id = format!("{}:{}", store, ns);
 
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    // FIXME: add max-depth
-
     let upid_str = WorkerTask::new_thread(
         "prune",
-        Some(store.clone()),
+        Some(worker_id),
         auth_id.to_string(),
         to_stdout,
         move |worker| {
-            crate::server::prune_datastore(
-                worker,
-                auth_id,
-                prune_options,
-                datastore,
-                ns.unwrap_or_default(),
-                dry_run,
-            )
+            crate::server::prune_datastore(worker, auth_id, prune_options, datastore, dry_run)
         },
     )?;
 
@@ -1154,24 +1163,6 @@ pub fn garbage_collection_status(
     Ok(status)
 }
 
-fn can_access_any_ns(store: Arc<DataStore>, auth_id: &Authid, user_info: &CachedUserInfo) -> bool {
-    // NOTE: traversing the datastore could be avoided if we had an "ACL tree: is there any priv
-    // below /datastore/{store}" helper
-    let mut iter =
-        if let Ok(iter) = store.recursive_iter_backup_ns_ok(BackupNamespace::root(), None) {
-            iter
-        } else {
-            return false;
-        };
-    let wanted =
-        PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP;
-    let name = store.name();
-    iter.any(|ns| -> bool {
-        let user_privs = user_info.lookup_privs(&auth_id, &["datastore", name, &ns.to_string()]);
-        user_privs & wanted != 0
-    })
-}
-
 #[api(
     returns: {
         description: "List the accessible datastores.",
@@ -1196,15 +1187,14 @@ pub fn get_datastore_list(
     let mut list = Vec::new();
 
     for (store, (_, data)) in &config.sections {
-        let user_privs = user_info.lookup_privs(&auth_id, &["datastore", store]);
+        let acl_path = &["datastore", store];
+        let user_privs = user_info.lookup_privs(&auth_id, acl_path);
         let allowed = (user_privs & (PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP)) != 0;
 
         let mut allow_id = false;
         if !allowed {
-            let scfg: pbs_api_types::DataStoreConfig = serde_json::from_value(data.to_owned())?;
-            // safety: we just cannot go through lookup as we must avoid an operation check
-            if let Ok(datastore) = unsafe { DataStore::open_from_config(scfg, None) } {
-                allow_id = can_access_any_ns(datastore, &auth_id, &user_info);
+            if let Ok(any_privs) = user_info.any_privs_below(&auth_id, acl_path, NS_PRIVS_OK) {
+                allow_id = any_privs;
             }
         }
 
@@ -1259,10 +1249,6 @@ pub fn download_file(
         let store = required_string_param(&param, "store")?;
         let backup_ns = optional_ns_param(&param)?;
 
-        let store_with_ns = DatastoreWithNamespace {
-            store: store.to_owned(),
-            ns: backup_ns.clone(),
-        };
         let backup_dir: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
         let datastore = check_privs_and_load_store(
             &store,
@@ -1278,7 +1264,10 @@ pub fn download_file(
 
         println!(
             "Download {} from {} ({}/{})",
-            file_name, store_with_ns, backup_dir, file_name
+            file_name,
+            print_store_and_ns(&store, &backup_ns),
+            backup_dir,
+            file_name
         );
 
         let backup_dir = datastore.backup_dir(backup_ns, backup_dir)?;
@@ -1344,10 +1333,7 @@ pub fn download_file_decoded(
         let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
         let store = required_string_param(&param, "store")?;
         let backup_ns = optional_ns_param(&param)?;
-        let store_with_ns = DatastoreWithNamespace {
-            store: store.to_owned(),
-            ns: backup_ns.clone(),
-        };
+
         let backup_dir_api: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
         let datastore = check_privs_and_load_store(
             &store,
@@ -1360,7 +1346,7 @@ pub fn download_file_decoded(
         )?;
 
         let file_name = required_string_param(&param, "file-name")?.to_owned();
-        let backup_dir = datastore.backup_dir(backup_ns, backup_dir_api.clone())?;
+        let backup_dir = datastore.backup_dir(backup_ns.clone(), backup_dir_api.clone())?;
 
         let (manifest, files) = read_backup_index(&backup_dir)?;
         for file in files {
@@ -1371,7 +1357,10 @@ pub fn download_file_decoded(
 
         println!(
             "Download {} from {} ({}/{})",
-            file_name, store_with_ns, backup_dir_api, file_name
+            file_name,
+            print_store_and_ns(&store, &backup_ns),
+            backup_dir_api,
+            file_name
         );
 
         let mut path = datastore.base_path();
@@ -1474,10 +1463,7 @@ pub fn upload_backup_log(
         let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
         let store = required_string_param(&param, "store")?;
         let backup_ns = optional_ns_param(&param)?;
-        let store_with_ns = DatastoreWithNamespace {
-            store: store.to_owned(),
-            ns: backup_ns.clone(),
-        };
+
         let backup_dir_api: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
 
         let datastore = check_privs_and_load_store(
@@ -1489,7 +1475,7 @@ pub fn upload_backup_log(
             Some(Operation::Write),
             &backup_dir_api.group,
         )?;
-        let backup_dir = datastore.backup_dir(backup_ns, backup_dir_api.clone())?;
+        let backup_dir = datastore.backup_dir(backup_ns.clone(), backup_dir_api.clone())?;
 
         let file_name = CLIENT_LOG_BLOB_NAME;
 
@@ -1500,7 +1486,10 @@ pub fn upload_backup_log(
             bail!("backup already contains a log.");
         }
 
-        println!("Upload backup log to {store_with_ns} {backup_dir_api}/{file_name}");
+        println!(
+            "Upload backup log to {} {backup_dir_api}/{file_name}",
+            print_store_and_ns(&store, &backup_ns),
+        );
 
         let data = req_body
             .map_err(Error::from)
@@ -1555,6 +1544,7 @@ pub fn catalog(
 ) -> Result<Vec<ArchiveEntry>, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
     let ns = ns.unwrap_or_default();
+
     let datastore = check_privs_and_load_store(
         &store,
         &ns,
@@ -1634,6 +1624,7 @@ pub fn pxar_file_download(
         let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
         let store = required_string_param(&param, "store")?;
         let ns = optional_ns_param(&param)?;
+
         let backup_dir: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
         let datastore = check_privs_and_load_store(
             &store,
@@ -1841,6 +1832,7 @@ pub fn get_group_notes(
 ) -> Result<String, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
     let ns = ns.unwrap_or_default();
+
     let datastore = check_privs_and_load_store(
         &store,
         &ns,
@@ -1888,6 +1880,7 @@ pub fn set_group_notes(
 ) -> Result<(), Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
     let ns = ns.unwrap_or_default();
+
     let datastore = check_privs_and_load_store(
         &store,
         &ns,
@@ -1933,6 +1926,7 @@ pub fn get_notes(
 ) -> Result<String, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
     let ns = ns.unwrap_or_default();
+
     let datastore = check_privs_and_load_store(
         &store,
         &ns,
@@ -1985,6 +1979,7 @@ pub fn set_notes(
 ) -> Result<(), Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
     let ns = ns.unwrap_or_default();
+
     let datastore = check_privs_and_load_store(
         &store,
         &ns,
@@ -2131,7 +2126,7 @@ pub fn set_backup_owner(
 ) -> Result<(), Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
     let ns = ns.unwrap_or_default();
-    let owner_check_required = check_ns_privs(
+    let owner_check_required = check_ns_privs_full(
         &store,
         &ns,
         &auth_id,
@@ -2172,7 +2167,7 @@ pub fn set_backup_owner(
                 UNAUTHORIZED,
                 "{} does not have permission to change owner of backup group '{}' to {}",
                 auth_id,
-                backup_group,
+                backup_group.group(),
                 new_owner,
             ));
         }