]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/api2/admin/datastore.rs
replace 'disk_usage' with 'fs_info' from proxmox-sys
[proxmox-backup.git] / src / api2 / admin / datastore.rs
index bc75da56c7235b4953aa2065a25a30233c37187d..ad2744b718263c7cc08f3173f65dfe9fcb85e2e6 100644 (file)
@@ -32,11 +32,12 @@ use pxar::accessor::aio::Accessor;
 use pxar::EntryKind;
 
 use pbs_api_types::{
-    Authid, BackupContent, BackupNamespace, BackupType, Counts, CryptMode, DataStoreListItem,
-    DataStoreStatus, GarbageCollectionStatus, GroupListItem, Operation, PruneOptions, RRDMode,
-    RRDTimeFrame, SnapshotListItem, SnapshotVerifyState, BACKUP_ARCHIVE_NAME_SCHEMA,
-    BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA,
-    DATASTORE_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
+    print_ns_and_snapshot, print_store_and_ns, Authid, BackupContent, BackupNamespace, BackupType,
+    Counts, CryptMode, DataStoreListItem, DataStoreStatus, GarbageCollectionStatus, GroupListItem,
+    KeepOptions, Operation, PruneJobOptions, RRDMode, RRDTimeFrame, SnapshotListItem,
+    SnapshotVerifyState, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA,
+    BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, DATASTORE_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA,
+    MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
     PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY,
     UPID_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
 };
@@ -61,7 +62,10 @@ use proxmox_rest_server::{formatter, WorkerTask};
 
 use crate::api2::backup::optional_ns_param;
 use crate::api2::node::rrd::create_value_from_rrd;
-use crate::backup::{verify_all_backups, verify_backup_dir, verify_backup_group, verify_filter};
+use crate::backup::{
+    check_ns_privs_full, verify_all_backups, verify_backup_dir, verify_backup_group, verify_filter,
+    ListAccessibleBackupGroups, NS_PRIVS_OK,
+};
 
 use crate::server::jobstate::Job;
 
@@ -77,43 +81,35 @@ fn get_group_note_path(
     note_path
 }
 
-// TODO: move somewhere we can reuse it from (namespace has its own copy atm.)
-fn get_ns_privs(store: &str, ns: &BackupNamespace, auth_id: &Authid) -> Result<u64, Error> {
-    let user_info = CachedUserInfo::new()?;
-
-    Ok(if ns.is_root() {
-        user_info.lookup_privs(auth_id, &["datastore", store])
-    } else {
-        user_info.lookup_privs(auth_id, &["datastore", store, &ns.to_string()])
-    })
-}
-
-// asserts that either either `full_access_privs` or `partial_access_privs` are fulfilled,
-// returning value indicates whether further checks like group ownerships are required
-fn check_ns_privs(
+// helper to unify common sequence of checks:
+// 1. check privs on NS (full or limited access)
+// 2. load datastore
+// 3. if needed (only limited access), check owner of group
+fn check_privs_and_load_store(
     store: &str,
     ns: &BackupNamespace,
     auth_id: &Authid,
     full_access_privs: u64,
     partial_access_privs: u64,
-) -> Result<bool, Error> {
-    let privs = get_ns_privs(store, ns, auth_id)?;
+    operation: Option<Operation>,
+    backup_group: &pbs_api_types::BackupGroup,
+) -> Result<Arc<DataStore>, Error> {
+    let limited = check_ns_privs_full(store, ns, auth_id, full_access_privs, partial_access_privs)?;
 
-    if full_access_privs != 0 && (privs & full_access_privs) != 0 {
-        return Ok(false);
-    }
-    if partial_access_privs != 0 && (privs & partial_access_privs) != 0 {
-        return Ok(true);
+    let datastore = DataStore::lookup_datastore(store, operation)?;
+
+    if limited {
+        let owner = datastore.get_owner(ns, backup_group)?;
+        check_backup_owner(&owner, &auth_id)?;
     }
 
-    proxmox_router::http_bail!(FORBIDDEN, "permission check failed");
+    Ok(datastore)
 }
 
 fn read_backup_index(
-    store: &DataStore,
     backup_dir: &BackupDir,
 ) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
-    let (manifest, index_size) = store.load_manifest(backup_dir)?;
+    let (manifest, index_size) = backup_dir.load_manifest()?;
 
     let mut result = Vec::new();
     for item in manifest.files() {
@@ -137,10 +133,9 @@ fn read_backup_index(
 }
 
 fn get_all_snapshot_files(
-    store: &DataStore,
     info: &BackupInfo,
 ) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
-    let (manifest, mut files) = read_backup_index(store, &info.backup_dir)?;
+    let (manifest, mut files) = read_backup_index(&info.backup_dir)?;
 
     let file_set = files.iter().fold(HashSet::new(), |mut acc, item| {
         acc.insert(item.filename.clone());
@@ -167,7 +162,7 @@ fn get_all_snapshot_files(
             store: {
                 schema: DATASTORE_SCHEMA,
             },
-            "backup-ns": {
+            ns: {
                 type: BackupNamespace,
                 optional: true,
             },
@@ -183,15 +178,15 @@ fn get_all_snapshot_files(
 /// List backup groups.
 pub fn list_groups(
     store: String,
-    backup_ns: Option<BackupNamespace>,
+    ns: Option<BackupNamespace>,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Vec<GroupListItem>, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let ns = ns.unwrap_or_default();
 
-    let backup_ns = backup_ns.unwrap_or_default();
-    let list_all = !check_ns_privs(
+    let list_all = !check_ns_privs_full(
         &store,
-        &backup_ns,
+        &ns,
         &auth_id,
         PRIV_DATASTORE_AUDIT,
         PRIV_DATASTORE_BACKUP,
@@ -200,14 +195,19 @@ pub fn list_groups(
     let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
 
     datastore
-        .iter_backup_groups(backup_ns.clone())? // FIXME: Namespaces and recursion parameters!
+        .iter_backup_groups(ns.clone())? // FIXME: Namespaces and recursion parameters!
         .try_fold(Vec::new(), |mut group_info, group| {
             let group = group?;
-            let owner = match datastore.get_owner(&backup_ns, group.as_ref()) {
+
+            let owner = match datastore.get_owner(&ns, group.as_ref()) {
                 Ok(auth_id) => auth_id,
                 Err(err) => {
-                    let id = &store;
-                    eprintln!("Failed to get owner of group '{}/{}' - {}", id, group, err);
+                    eprintln!(
+                        "Failed to get owner of group '{}' in {} - {}",
+                        group.group(),
+                        print_store_and_ns(&store, &ns),
+                        err
+                    );
                     return Ok(group_info);
                 }
             };
@@ -236,7 +236,7 @@ pub fn list_groups(
                 })
                 .to_owned();
 
-            let note_path = get_group_note_path(&datastore, &backup_ns, group.as_ref());
+            let note_path = get_group_note_path(&datastore, &ns, group.as_ref());
             let comment = file_read_firstline(&note_path).ok();
 
             group_info.push(GroupListItem {
@@ -256,7 +256,7 @@ pub fn list_groups(
     input: {
         properties: {
             store: { schema: DATASTORE_SCHEMA },
-            "backup-ns": {
+            ns: {
                 type: BackupNamespace,
                 optional: true,
             },
@@ -275,31 +275,25 @@ pub fn list_groups(
 /// Delete backup group including all snapshots.
 pub fn delete_group(
     store: String,
-    backup_ns: Option<BackupNamespace>,
+    ns: Option<BackupNamespace>,
     group: pbs_api_types::BackupGroup,
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let ns = ns.unwrap_or_default();
 
-    let backup_ns = backup_ns.unwrap_or_default();
-
-    let owner_check_required = check_ns_privs(
+    let datastore = check_privs_and_load_store(
         &store,
-        &backup_ns,
+        &ns,
         &auth_id,
         PRIV_DATASTORE_MODIFY,
         PRIV_DATASTORE_PRUNE,
+        Some(Operation::Write),
+        &group,
     )?;
 
-    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
-
-    if owner_check_required {
-        let owner = datastore.get_owner(&backup_ns, &group)?;
-        check_backup_owner(&owner, &auth_id)?;
-    }
-
-    if !datastore.remove_backup_group(&backup_ns, &group)? {
+    if !datastore.remove_backup_group(&ns, &group)? {
         bail!("group only partially deleted due to protected snapshots");
     }
 
@@ -310,7 +304,7 @@ pub fn delete_group(
     input: {
         properties: {
             store: { schema: DATASTORE_SCHEMA },
-            "backup-ns": {
+            ns: {
                 type: BackupNamespace,
                 optional: true,
             },
@@ -330,35 +324,29 @@ pub fn delete_group(
 /// List snapshot files.
 pub fn list_snapshot_files(
     store: String,
-    backup_ns: Option<BackupNamespace>,
+    ns: Option<BackupNamespace>,
     backup_dir: pbs_api_types::BackupDir,
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Vec<BackupContent>, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let ns = ns.unwrap_or_default();
 
-    let backup_ns = backup_ns.unwrap_or_default();
-
-    let owner_check_required = check_ns_privs(
+    let datastore = check_privs_and_load_store(
         &store,
-        &backup_ns,
+        &ns,
         &auth_id,
         PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_READ,
         PRIV_DATASTORE_BACKUP,
+        Some(Operation::Read),
+        &backup_dir.group,
     )?;
 
-    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
-
-    if owner_check_required {
-        let owner = datastore.get_owner(&backup_ns, &backup_dir.group)?;
-        check_backup_owner(&owner, &auth_id)?;
-    }
-
-    let snapshot = datastore.backup_dir(backup_ns, backup_dir)?;
+    let snapshot = datastore.backup_dir(ns, backup_dir)?;
 
     let info = BackupInfo::new(snapshot)?;
 
-    let (_manifest, files) = get_all_snapshot_files(&datastore, &info)?;
+    let (_manifest, files) = get_all_snapshot_files(&info)?;
 
     Ok(files)
 }
@@ -367,7 +355,7 @@ pub fn list_snapshot_files(
     input: {
         properties: {
             store: { schema: DATASTORE_SCHEMA },
-            "backup-ns": {
+            ns: {
                 type: BackupNamespace,
                 optional: true,
             },
@@ -386,31 +374,25 @@ pub fn list_snapshot_files(
 /// Delete backup snapshot.
 pub fn delete_snapshot(
     store: String,
-    backup_ns: Option<BackupNamespace>,
+    ns: Option<BackupNamespace>,
     backup_dir: pbs_api_types::BackupDir,
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let ns = ns.unwrap_or_default();
 
-    let backup_ns = backup_ns.unwrap_or_default();
-
-    let owner_check_required = check_ns_privs(
+    let datastore = check_privs_and_load_store(
         &store,
-        &backup_ns,
+        &ns,
         &auth_id,
         PRIV_DATASTORE_MODIFY,
         PRIV_DATASTORE_PRUNE,
+        Some(Operation::Write),
+        &backup_dir.group,
     )?;
 
-    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
-
-    if owner_check_required {
-        let owner = datastore.get_owner(&backup_ns, &backup_dir.group)?;
-        check_backup_owner(&owner, &auth_id)?;
-    }
-
-    let snapshot = datastore.backup_dir(backup_ns, backup_dir)?;
+    let snapshot = datastore.backup_dir(ns, backup_dir)?;
 
     snapshot.destroy(false)?;
 
@@ -422,7 +404,7 @@ pub fn delete_snapshot(
     input: {
         properties: {
             store: { schema: DATASTORE_SCHEMA },
-            "backup-ns": {
+            ns: {
                 type: BackupNamespace,
                 optional: true,
             },
@@ -446,7 +428,7 @@ pub fn delete_snapshot(
 /// List backup snapshots.
 pub fn list_snapshots(
     store: String,
-    backup_ns: Option<BackupNamespace>,
+    ns: Option<BackupNamespace>,
     backup_type: Option<BackupType>,
     backup_id: Option<String>,
     _param: Value,
@@ -455,11 +437,11 @@ pub fn list_snapshots(
 ) -> Result<Vec<SnapshotListItem>, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
 
-    let backup_ns = backup_ns.unwrap_or_default();
+    let ns = ns.unwrap_or_default();
 
-    let list_all = !check_ns_privs(
+    let list_all = !check_ns_privs_full(
         &store,
-        &backup_ns,
+        &ns,
         &auth_id,
         PRIV_DATASTORE_AUDIT,
         PRIV_DATASTORE_BACKUP,
@@ -471,20 +453,20 @@ pub fn list_snapshots(
     // backup group and provide an error free (Err -> None) accessor
     let groups = match (backup_type, backup_id) {
         (Some(backup_type), Some(backup_id)) => {
-            vec![datastore.backup_group_from_parts(backup_ns, backup_type, backup_id)]
+            vec![datastore.backup_group_from_parts(ns.clone(), backup_type, backup_id)]
         }
         // FIXME: Recursion
         (Some(backup_type), None) => datastore
-            .iter_backup_groups_ok(backup_ns)?
+            .iter_backup_groups_ok(ns.clone())?
             .filter(|group| group.backup_type() == backup_type)
             .collect(),
         // FIXME: Recursion
         (None, Some(backup_id)) => datastore
-            .iter_backup_groups_ok(backup_ns)?
+            .iter_backup_groups_ok(ns.clone())?
             .filter(|group| group.backup_id() == backup_id)
             .collect(),
         // FIXME: Recursion
-        (None, None) => datastore.list_backup_groups(backup_ns)?,
+        (None, None) => datastore.list_backup_groups(ns.clone())?,
     };
 
     let info_to_snapshot_list_item = |group: &BackupGroup, owner, info: BackupInfo| {
@@ -494,7 +476,7 @@ pub fn list_snapshots(
         };
         let protected = info.backup_dir.is_protected();
 
-        match get_all_snapshot_files(&datastore, &info) {
+        match get_all_snapshot_files(&info) {
             Ok((manifest, files)) => {
                 // extract the first line from notes
                 let comment: Option<String> = manifest.unprotected["notes"]
@@ -564,8 +546,10 @@ pub fn list_snapshots(
             Ok(auth_id) => auth_id,
             Err(err) => {
                 eprintln!(
-                    "Failed to get owner of group '{}/{}' - {}",
-                    &store, group, err
+                    "Failed to get owner of group '{}' in {} - {}",
+                    group.group(),
+                    print_store_and_ns(&store, &ns),
+                    err
                 );
                 return Ok(snapshots);
             }
@@ -587,45 +571,37 @@ pub fn list_snapshots(
     })
 }
 
-fn get_snapshots_count(
-    store: &Arc<DataStore>,
-    filter_owner: Option<&Authid>,
-) -> Result<Counts, Error> {
-    store
-        .iter_backup_groups_ok(Default::default())? // FIXME: Recurse!
-        .filter(|group| {
-            // FIXME: namespace:
-            let owner = match store.get_owner(&BackupNamespace::root(), group.as_ref()) {
-                Ok(owner) => owner,
-                Err(err) => {
-                    let id = store.name();
-                    eprintln!("Failed to get owner of group '{}/{}' - {}", id, group, err);
-                    return false;
-                }
+fn get_snapshots_count(store: &Arc<DataStore>, owner: Option<&Authid>) -> Result<Counts, Error> {
+    let root_ns = Default::default();
+    ListAccessibleBackupGroups::new_with_privs(
+        store,
+        root_ns,
+        MAX_NAMESPACE_DEPTH,
+        Some(PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_READ),
+        None,
+        owner,
+    )?
+    .try_fold(Counts::default(), |mut counts, group| {
+        let group = match group {
+            Ok(group) => group,
+            Err(_) => return Ok(counts), // TODO: add this as error counts?
+        };
+        let snapshot_count = group.list_backups()?.len() as u64;
+
+        // only include groups with snapshots, counting/displaying empty groups can confuse
+        if snapshot_count > 0 {
+            let type_count = match group.backup_type() {
+                BackupType::Ct => counts.ct.get_or_insert(Default::default()),
+                BackupType::Vm => counts.vm.get_or_insert(Default::default()),
+                BackupType::Host => counts.host.get_or_insert(Default::default()),
             };
 
-            match filter_owner {
-                Some(filter) => check_backup_owner(&owner, filter).is_ok(),
-                None => true,
-            }
-        })
-        .try_fold(Counts::default(), |mut counts, group| {
-            let snapshot_count = group.list_backups()?.len() as u64;
-
-            // only include groups with snapshots, counting/displaying emtpy groups can confuse
-            if snapshot_count > 0 {
-                let type_count = match group.backup_type() {
-                    BackupType::Ct => counts.ct.get_or_insert(Default::default()),
-                    BackupType::Vm => counts.vm.get_or_insert(Default::default()),
-                    BackupType::Host => counts.host.get_or_insert(Default::default()),
-                };
-
-                type_count.groups += 1;
-                type_count.snapshots += snapshot_count;
-            }
+            type_count.groups += 1;
+            type_count.snapshots += snapshot_count;
+        }
 
-            Ok(counts)
-        })
+        Ok(counts)
+    })
 }
 
 #[api(
@@ -647,8 +623,9 @@ fn get_snapshots_count(
         type: DataStoreStatus,
     },
     access: {
-        permission: &Permission::Privilege(
-            &["datastore", "{store}"], PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP, true),
+        permission: &Permission::Anybody,
+        description: "Requires on /datastore/{store} either DATASTORE_AUDIT or DATASTORE_BACKUP for \
+            the full statistics. Counts of accessible groups are always returned, if any",
     },
 )]
 /// Get datastore status.
@@ -658,13 +635,26 @@ pub fn status(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<DataStoreStatus, Error> {
-    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
-    let storage = crate::tools::disks::disk_usage(&datastore.base_path())?;
-    let (counts, gc_status) = if verbose {
-        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-        let user_info = CachedUserInfo::new()?;
+    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let user_info = CachedUserInfo::new()?;
+    let store_privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);
 
-        let store_privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);
+    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read));
+
+    let store_stats = if store_privs & (PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP) != 0 {
+        true
+    } else if store_privs & PRIV_DATASTORE_READ != 0 {
+        false // allow at least counts, user can read groups anyway..
+    } else {
+        match user_info.any_privs_below(&auth_id, &["datastore", &store], NS_PRIVS_OK) {
+            // avoid leaking existence info if users hasn't at least any priv. below
+            Ok(false) | Err(_) => return Err(http_err!(FORBIDDEN, "permission check failed")),
+            _ => false,
+        }
+    };
+    let datastore = datastore?; // only unwrap no to avoid leaking existence info
+
+    let (counts, gc_status) = if verbose {
         let filter_owner = if store_privs & PRIV_DATASTORE_AUDIT != 0 {
             None
         } else {
@@ -672,19 +662,34 @@ pub fn status(
         };
 
         let counts = Some(get_snapshots_count(&datastore, filter_owner)?);
-        let gc_status = Some(datastore.last_gc_status());
+        let gc_status = if store_stats {
+            Some(datastore.last_gc_status())
+        } else {
+            None
+        };
 
         (counts, gc_status)
     } else {
         (None, None)
     };
 
-    Ok(DataStoreStatus {
-        total: storage.total,
-        used: storage.used,
-        avail: storage.avail,
-        gc_status,
-        counts,
+    Ok(if store_stats {
+        let storage = proxmox_sys::fs::fs_info(&datastore.base_path())?;
+        DataStoreStatus {
+            total: storage.total,
+            used: storage.used,
+            avail: storage.available,
+            gc_status,
+            counts,
+        }
+    } else {
+        DataStoreStatus {
+            total: 0,
+            used: 0,
+            avail: 0,
+            gc_status,
+            counts,
+        }
     })
 }
 
@@ -694,7 +699,7 @@ pub fn status(
             store: {
                 schema: DATASTORE_SCHEMA,
             },
-            "backup-ns": {
+            ns: {
                 type: BackupNamespace,
                 optional: true,
             },
@@ -718,6 +723,10 @@ pub fn status(
                 schema: BACKUP_TIME_SCHEMA,
                 optional: true,
             },
+            "max-depth": {
+                schema: NS_MAX_DEPTH_SCHEMA,
+                optional: true,
+            },
         },
     },
     returns: {
@@ -735,19 +744,21 @@ pub fn status(
 /// or all backups in the datastore.
 pub fn verify(
     store: String,
-    backup_ns: Option<BackupNamespace>,
+    ns: Option<BackupNamespace>,
     backup_type: Option<BackupType>,
     backup_id: Option<String>,
     backup_time: Option<i64>,
     ignore_verified: Option<bool>,
     outdated_after: Option<i64>,
+    max_depth: Option<usize>,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let backup_ns = backup_ns.unwrap_or_default();
-    let owner_check_required = check_ns_privs(
+    let ns = ns.unwrap_or_default();
+
+    let owner_check_required = check_ns_privs_full(
         &store,
-        &backup_ns,
+        &ns,
         &auth_id,
         PRIV_DATASTORE_VERIFY,
         PRIV_DATASTORE_BACKUP,
@@ -762,20 +773,18 @@ pub fn verify(
     let mut backup_group = None;
     let mut worker_type = "verify";
 
-    // FIXME: Recursion
-    // FIXME: Namespaces and worker ID, could this be an issue?
     match (backup_type, backup_id, backup_time) {
         (Some(backup_type), Some(backup_id), Some(backup_time)) => {
             worker_id = format!(
                 "{}:{}/{}/{}/{:08X}",
                 store,
-                backup_ns.display_as_path(),
+                ns.display_as_path(),
                 backup_type,
                 backup_id,
                 backup_time
             );
             let dir =
-                datastore.backup_dir_from_parts(backup_ns, backup_type, backup_id, backup_time)?;
+                datastore.backup_dir_from_parts(ns.clone(), backup_type, backup_id, backup_time)?;
 
             if owner_check_required {
                 let owner = datastore.get_owner(dir.backup_ns(), dir.as_ref())?;
@@ -789,22 +798,26 @@ pub fn verify(
             worker_id = format!(
                 "{}:{}/{}/{}",
                 store,
-                backup_ns.display_as_path(),
+                ns.display_as_path(),
                 backup_type,
                 backup_id
             );
             let group = pbs_api_types::BackupGroup::from((backup_type, backup_id));
 
             if owner_check_required {
-                let owner = datastore.get_owner(&backup_ns, &group)?;
+                let owner = datastore.get_owner(&ns, &group)?;
                 check_backup_owner(&owner, &auth_id)?;
             }
 
-            backup_group = Some(datastore.backup_group(backup_ns, group));
+            backup_group = Some(datastore.backup_group(ns.clone(), group));
             worker_type = "verify_group";
         }
         (None, None, None) => {
-            worker_id = store.clone();
+            worker_id = if ns.is_root() {
+                store
+            } else {
+                format!("{}:{}", store, ns.display_as_path())
+            };
         }
         _ => bail!("parameters do not specify a backup group or snapshot"),
     }
@@ -826,7 +839,10 @@ pub fn verify(
                     worker.upid().clone(),
                     Some(&move |manifest| verify_filter(ignore_verified, outdated_after, manifest)),
                 )? {
-                    res.push(backup_dir.to_string());
+                    res.push(print_ns_and_snapshot(
+                        backup_dir.backup_ns(),
+                        backup_dir.as_ref(),
+                    ));
                 }
                 res
             } else if let Some(backup_group) = backup_group {
@@ -840,16 +856,16 @@ pub fn verify(
                 failed_dirs
             } else {
                 let owner = if owner_check_required {
-                    Some(auth_id)
+                    Some(&auth_id)
                 } else {
                     None
                 };
 
-                // FIXME namespace missing here..
-
                 verify_all_backups(
                     &verify_worker,
                     worker.upid(),
+                    ns,
+                    max_depth,
                     owner,
                     Some(&move |manifest| verify_filter(ignore_verified, outdated_after, manifest)),
                 )?
@@ -871,10 +887,6 @@ pub fn verify(
 #[api(
     input: {
         properties: {
-            "backup-ns": {
-                type: BackupNamespace,
-                optional: true,
-            },
             group: {
                 type: pbs_api_types::BackupGroup,
                 flatten: true,
@@ -885,13 +897,17 @@ pub fn verify(
                 default: false,
                 description: "Just show what prune would do, but do not delete anything.",
             },
-            "prune-options": {
-                type: PruneOptions,
+            "keep-options": {
+                type: KeepOptions,
                 flatten: true,
             },
             store: {
                 schema: DATASTORE_SCHEMA,
             },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
+            },
         },
     },
     returns: pbs_api_types::ADMIN_DATASTORE_PRUNE_RETURN_TYPE,
@@ -903,46 +919,38 @@ pub fn verify(
 )]
 /// Prune a group on the datastore
 pub fn prune(
-    backup_ns: Option<BackupNamespace>,
     group: pbs_api_types::BackupGroup,
     dry_run: bool,
-    prune_options: PruneOptions,
+    keep_options: KeepOptions,
     store: String,
+    ns: Option<BackupNamespace>,
     _param: Value,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-
-    let backup_ns = backup_ns.unwrap_or_default();
-
-    let owner_check_required = check_ns_privs(
+    let ns = ns.unwrap_or_default();
+    let datastore = check_privs_and_load_store(
         &store,
-        &backup_ns,
+        &ns,
         &auth_id,
         PRIV_DATASTORE_MODIFY,
         PRIV_DATASTORE_PRUNE,
+        Some(Operation::Write),
+        &group,
     )?;
 
-    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
-
-    if owner_check_required {
-        let owner = datastore.get_owner(&backup_ns, &group)?;
-        check_backup_owner(&owner, &auth_id)?;
-    }
-
-    let group = datastore.backup_group(backup_ns, group);
-
-    let worker_id = format!("{}:{}", store, group);
+    let worker_id = format!("{}:{}:{}", store, ns, group);
+    let group = datastore.backup_group(ns.clone(), group);
 
     let mut prune_result = Vec::new();
 
     let list = group.list_backups()?;
 
-    let mut prune_info = compute_prune_info(list, &prune_options)?;
+    let mut prune_info = compute_prune_info(list, &keep_options)?;
 
     prune_info.reverse(); // delete older snapshots first
 
-    let keep_all = !pbs_datastore::prune::keeps_something(&prune_options);
+    let keep_all = !keep_options.keeps_something();
 
     if dry_run {
         for (info, mark) in prune_info {
@@ -955,9 +963,9 @@ pub fn prune(
                 "keep": keep,
                 "protected": mark.protected(),
             });
-            let ns = info.backup_dir.backup_ns();
-            if !ns.is_root() {
-                result["backup-ns"] = serde_json::to_value(ns)?;
+            let prune_ns = info.backup_dir.backup_ns();
+            if !prune_ns.is_root() {
+                result["ns"] = serde_json::to_value(prune_ns)?;
             }
             prune_result.push(result);
         }
@@ -970,16 +978,18 @@ pub fn prune(
     if keep_all {
         task_log!(worker, "No prune selection - keeping all files.");
     } else {
+        let mut opts = Vec::new();
+        if !ns.is_root() {
+            opts.push(format!("--ns {ns}"));
+        }
+        crate::server::cli_keep_options(&mut opts, &keep_options);
+
+        task_log!(worker, "retention options: {}", opts.join(" "));
         task_log!(
             worker,
-            "retention options: {}",
-            pbs_datastore::prune::cli_options_string(&prune_options)
-        );
-        task_log!(
-            worker,
-            "Starting prune on store \"{}\" group \"{}\"",
-            store,
-            group,
+            "Starting prune on {} group \"{}\"",
+            print_store_and_ns(&store, &ns),
+            group.group(),
         );
     }
 
@@ -1029,57 +1039,54 @@ pub fn prune(
                 description: "Just show what prune would do, but do not delete anything.",
             },
             "prune-options": {
-                type: PruneOptions,
+                type: PruneJobOptions,
                 flatten: true,
             },
             store: {
                 schema: DATASTORE_SCHEMA,
             },
-            ns: {
-                type: BackupNamespace,
-                optional: true,
-            },
         },
     },
     returns: {
         schema: UPID_SCHEMA,
     },
     access: {
-        permission: &Permission::Privilege(
-            &["datastore", "{store}"], PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_PRUNE, true),
+        permission: &Permission::Anybody,
+        description: "Requires Datastore.Modify or Datastore.Prune on the datastore/namespace.",
     },
 )]
 /// Prune the datastore
 pub fn prune_datastore(
     dry_run: bool,
-    prune_options: PruneOptions,
+    prune_options: PruneJobOptions,
     store: String,
-    ns: Option<BackupNamespace>,
     _param: Value,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<String, Error> {
+    let user_info = CachedUserInfo::new()?;
+
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
 
+    user_info.check_privs(
+        &auth_id,
+        &prune_options.acl_path(&store),
+        PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_PRUNE,
+        true,
+    )?;
+
     let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
+    let ns = prune_options.ns.clone().unwrap_or_default();
+    let worker_id = format!("{}:{}", store, ns);
 
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    // FIXME: also allow a per-namespace pruning with max-depth
-
     let upid_str = WorkerTask::new_thread(
         "prune",
-        Some(store.clone()),
+        Some(worker_id),
         auth_id.to_string(),
         to_stdout,
         move |worker| {
-            crate::server::prune_datastore(
-                worker,
-                auth_id,
-                prune_options,
-                datastore,
-                ns.unwrap_or_default(),
-                dry_run,
-            )
+            crate::server::prune_datastore(worker, auth_id, prune_options, datastore, dry_run)
         },
     )?;
 
@@ -1156,24 +1163,6 @@ pub fn garbage_collection_status(
     Ok(status)
 }
 
-fn can_access_any_ns(store: Arc<DataStore>, auth_id: &Authid, user_info: &CachedUserInfo) -> bool {
-    // NOTE: traversing the datastore could be avoided if we had an "ACL tree: is there any priv
-    // below /datastore/{store}" helper
-    let mut iter =
-        if let Ok(iter) = store.recursive_iter_backup_ns_ok(BackupNamespace::root(), None) {
-            iter
-        } else {
-            return false;
-        };
-    let wanted =
-        PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP;
-    let name = store.name();
-    iter.any(|ns| -> bool {
-        let user_privs = user_info.lookup_privs(&auth_id, &["datastore", name, &ns.to_string()]);
-        user_privs & wanted != 0
-    })
-}
-
 #[api(
     returns: {
         description: "List the accessible datastores.",
@@ -1198,15 +1187,14 @@ pub fn get_datastore_list(
     let mut list = Vec::new();
 
     for (store, (_, data)) in &config.sections {
-        let user_privs = user_info.lookup_privs(&auth_id, &["datastore", store]);
+        let acl_path = &["datastore", store];
+        let user_privs = user_info.lookup_privs(&auth_id, acl_path);
         let allowed = (user_privs & (PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP)) != 0;
 
         let mut allow_id = false;
         if !allowed {
-            let scfg: pbs_api_types::DataStoreConfig = serde_json::from_value(data.to_owned())?;
-            // safety: we just cannot go through lookup as we must avoid an operation check
-            if let Ok(datastore) = unsafe { DataStore::open_from_config(scfg, None) } {
-                allow_id = can_access_any_ns(datastore, &auth_id, &user_info);
+            if let Ok(any_privs) = user_info.any_privs_below(&auth_id, acl_path, NS_PRIVS_OK) {
+                allow_id = any_privs;
             }
         }
 
@@ -1233,7 +1221,7 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
         "Download single raw file from backup snapshot.",
         &sorted!([
             ("store", false, &DATASTORE_SCHEMA),
-            ("backup-ns", true, &BACKUP_NAMESPACE_SCHEMA),
+            ("ns", true, &BACKUP_NAMESPACE_SCHEMA),
             ("backup-type", false, &BACKUP_TYPE_SCHEMA),
             ("backup-id", false, &BACKUP_ID_SCHEMA),
             ("backup-time", false, &BACKUP_TIME_SCHEMA),
@@ -1260,29 +1248,30 @@ pub fn download_file(
         let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
         let store = required_string_param(&param, "store")?;
         let backup_ns = optional_ns_param(&param)?;
+
         let backup_dir: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
-        let owner_check_required = check_ns_privs(
+        let datastore = check_privs_and_load_store(
             &store,
             &backup_ns,
             &auth_id,
             PRIV_DATASTORE_READ,
             PRIV_DATASTORE_BACKUP,
+            Some(Operation::Read),
+            &backup_dir.group,
         )?;
-        let datastore = DataStore::lookup_datastore(store, Some(Operation::Read))?;
-
-        if owner_check_required {
-            let owner = datastore.get_owner(&backup_ns, &backup_dir.group)?;
-            check_backup_owner(&owner, &auth_id)?;
-        }
-        let backup_dir = datastore.backup_dir(backup_ns, backup_dir)?;
 
         let file_name = required_string_param(&param, "file-name")?.to_owned();
 
         println!(
             "Download {} from {} ({}/{})",
-            file_name, store, backup_dir, file_name
+            file_name,
+            print_store_and_ns(&store, &backup_ns),
+            backup_dir,
+            file_name
         );
 
+        let backup_dir = datastore.backup_dir(backup_ns, backup_dir)?;
+
         let mut path = datastore.base_path();
         path.push(backup_dir.relative_path());
         path.push(&file_name);
@@ -1317,7 +1306,7 @@ pub const API_METHOD_DOWNLOAD_FILE_DECODED: ApiMethod = ApiMethod::new(
         "Download single decoded file from backup snapshot. Only works if it's not encrypted.",
         &sorted!([
             ("store", false, &DATASTORE_SCHEMA),
-            ("backup-ns", true, &BACKUP_NAMESPACE_SCHEMA),
+            ("ns", true, &BACKUP_NAMESPACE_SCHEMA),
             ("backup-type", false, &BACKUP_TYPE_SCHEMA),
             ("backup-id", false, &BACKUP_ID_SCHEMA),
             ("backup-time", false, &BACKUP_TIME_SCHEMA),
@@ -1344,26 +1333,22 @@ pub fn download_file_decoded(
         let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
         let store = required_string_param(&param, "store")?;
         let backup_ns = optional_ns_param(&param)?;
-        let backup_dir: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
-        let owner_check_required = check_ns_privs(
+
+        let backup_dir_api: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
+        let datastore = check_privs_and_load_store(
             &store,
             &backup_ns,
             &auth_id,
             PRIV_DATASTORE_READ,
             PRIV_DATASTORE_BACKUP,
+            Some(Operation::Read),
+            &backup_dir_api.group,
         )?;
-        let datastore = DataStore::lookup_datastore(store, Some(Operation::Read))?;
-
-        if owner_check_required {
-            let owner = datastore.get_owner(&backup_ns, &backup_dir.group)?;
-            check_backup_owner(&owner, &auth_id)?;
-        }
-
-        let backup_dir = datastore.backup_dir(backup_ns, backup_dir)?;
 
         let file_name = required_string_param(&param, "file-name")?.to_owned();
+        let backup_dir = datastore.backup_dir(backup_ns.clone(), backup_dir_api.clone())?;
 
-        let (manifest, files) = read_backup_index(&datastore, &backup_dir)?;
+        let (manifest, files) = read_backup_index(&backup_dir)?;
         for file in files {
             if file.filename == file_name && file.crypt_mode == Some(CryptMode::Encrypt) {
                 bail!("cannot decode '{}' - is encrypted", file_name);
@@ -1372,7 +1357,10 @@ pub fn download_file_decoded(
 
         println!(
             "Download {} from {} ({}/{})",
-            file_name, store, backup_dir, file_name
+            file_name,
+            print_store_and_ns(&store, &backup_ns),
+            backup_dir_api,
+            file_name
         );
 
         let mut path = datastore.base_path();
@@ -1452,7 +1440,7 @@ pub const API_METHOD_UPLOAD_BACKUP_LOG: ApiMethod = ApiMethod::new(
         "Upload the client backup log file into a backup snapshot ('client.log.blob').",
         &sorted!([
             ("store", false, &DATASTORE_SCHEMA),
-            ("backup-ns", true, &BACKUP_NAMESPACE_SCHEMA),
+            ("ns", true, &BACKUP_NAMESPACE_SCHEMA),
             ("backup-type", false, &BACKUP_TYPE_SCHEMA),
             ("backup-id", false, &BACKUP_ID_SCHEMA),
             ("backup-time", false, &BACKUP_TIME_SCHEMA),
@@ -1475,18 +1463,22 @@ pub fn upload_backup_log(
         let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
         let store = required_string_param(&param, "store")?;
         let backup_ns = optional_ns_param(&param)?;
-        let backup_dir: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
 
-        check_ns_privs(&store, &backup_ns, &auth_id, PRIV_DATASTORE_BACKUP, 0)?;
+        let backup_dir_api: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
 
-        let datastore = DataStore::lookup_datastore(store, Some(Operation::Write))?;
-        let backup_dir = datastore.backup_dir(backup_ns, backup_dir)?;
+        let datastore = check_privs_and_load_store(
+            &store,
+            &backup_ns,
+            &auth_id,
+            0,
+            PRIV_DATASTORE_BACKUP,
+            Some(Operation::Write),
+            &backup_dir_api.group,
+        )?;
+        let backup_dir = datastore.backup_dir(backup_ns.clone(), backup_dir_api.clone())?;
 
         let file_name = CLIENT_LOG_BLOB_NAME;
 
-        let owner = backup_dir.get_owner()?;
-        check_backup_owner(&owner, &auth_id)?;
-
         let mut path = backup_dir.full_path();
         path.push(&file_name);
 
@@ -1494,7 +1486,10 @@ pub fn upload_backup_log(
             bail!("backup already contains a log.");
         }
 
-        println!("Upload backup log to {store}/{backup_dir}/{file_name}");
+        println!(
+            "Upload backup log to {} {backup_dir_api}/{file_name}",
+            print_store_and_ns(&store, &backup_ns),
+        );
 
         let data = req_body
             .map_err(Error::from)
@@ -1519,7 +1514,7 @@ pub fn upload_backup_log(
     input: {
         properties: {
             store: { schema: DATASTORE_SCHEMA },
-            "backup-ns": {
+            ns: {
                 type: BackupNamespace,
                 optional: true,
             },
@@ -1542,33 +1537,29 @@ pub fn upload_backup_log(
 /// Get the entries of the given path of the catalog
 pub fn catalog(
     store: String,
-    backup_ns: Option<BackupNamespace>,
+    ns: Option<BackupNamespace>,
     backup_dir: pbs_api_types::BackupDir,
     filepath: String,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Vec<ArchiveEntry>, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let backup_ns = backup_ns.unwrap_or_default();
-    let owner_check_required = check_ns_privs(
+    let ns = ns.unwrap_or_default();
+
+    let datastore = check_privs_and_load_store(
         &store,
-        &backup_ns,
+        &ns,
         &auth_id,
         PRIV_DATASTORE_READ,
         PRIV_DATASTORE_BACKUP,
+        Some(Operation::Read),
+        &backup_dir.group,
     )?;
 
-    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
-
-    if owner_check_required {
-        let owner = datastore.get_owner(&backup_ns, &backup_dir.group)?;
-        check_backup_owner(&owner, &auth_id)?;
-    }
-
-    let backup_dir = datastore.backup_dir(backup_ns, backup_dir)?;
+    let backup_dir = datastore.backup_dir(ns, backup_dir)?;
 
     let file_name = CATALOG_NAME;
 
-    let (manifest, files) = read_backup_index(&datastore, &backup_dir)?;
+    let (manifest, files) = read_backup_index(&backup_dir)?;
     for file in files {
         if file.filename == file_name && file.crypt_mode == Some(CryptMode::Encrypt) {
             bail!("cannot decode '{}' - is encrypted", file_name);
@@ -1606,7 +1597,7 @@ pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new(
         "Download single file from pxar file of a backup snapshot. Only works if it's not encrypted.",
         &sorted!([
             ("store", false, &DATASTORE_SCHEMA),
-            ("backup-ns", true, &BACKUP_NAMESPACE_SCHEMA),
+            ("ns", true, &BACKUP_NAMESPACE_SCHEMA),
             ("backup-type", false, &BACKUP_TYPE_SCHEMA),
             ("backup-id", false,  &BACKUP_ID_SCHEMA),
             ("backup-time", false, &BACKUP_TIME_SCHEMA),
@@ -1632,23 +1623,20 @@ pub fn pxar_file_download(
     async move {
         let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
         let store = required_string_param(&param, "store")?;
-        let backup_ns = optional_ns_param(&param)?;
+        let ns = optional_ns_param(&param)?;
+
         let backup_dir: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
-        let owner_check_required = check_ns_privs(
+        let datastore = check_privs_and_load_store(
             &store,
-            &backup_ns,
+            &ns,
             &auth_id,
             PRIV_DATASTORE_READ,
             PRIV_DATASTORE_BACKUP,
+            Some(Operation::Read),
+            &backup_dir.group,
         )?;
-        let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
 
-        if owner_check_required {
-            let owner = datastore.get_owner(&backup_ns, &backup_dir.group)?;
-            check_backup_owner(&owner, &auth_id)?;
-        }
-
-        let backup_dir = datastore.backup_dir(backup_ns, backup_dir)?;
+        let backup_dir = datastore.backup_dir(ns, backup_dir)?;
 
         let filepath = required_string_param(&param, "filepath")?.to_owned();
 
@@ -1662,7 +1650,7 @@ pub fn pxar_file_download(
         let mut split = components.splitn(2, |c| *c == b'/');
         let pxar_name = std::str::from_utf8(split.next().unwrap())?;
         let file_path = split.next().unwrap_or(b"/");
-        let (manifest, files) = read_backup_index(&datastore, &backup_dir)?;
+        let (manifest, files) = read_backup_index(&backup_dir)?;
         for file in files {
             if file.filename == pxar_name && file.crypt_mode == Some(CryptMode::Encrypt) {
                 bail!("cannot decode '{}' - is encrypted", pxar_name);
@@ -1819,7 +1807,7 @@ pub fn get_active_operations(store: String, _param: Value) -> Result<Value, Erro
     input: {
         properties: {
             store: { schema: DATASTORE_SCHEMA },
-            "backup-ns": {
+            ns: {
                 type: BackupNamespace,
                 optional: true,
             },
@@ -1838,27 +1826,24 @@ pub fn get_active_operations(store: String, _param: Value) -> Result<Value, Erro
 /// Get "notes" for a backup group
 pub fn get_group_notes(
     store: String,
-    backup_ns: Option<BackupNamespace>,
+    ns: Option<BackupNamespace>,
     backup_group: pbs_api_types::BackupGroup,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<String, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let backup_ns = backup_ns.unwrap_or_default();
-    let owner_check_required = check_ns_privs(
+    let ns = ns.unwrap_or_default();
+
+    let datastore = check_privs_and_load_store(
         &store,
-        &backup_ns,
+        &ns,
         &auth_id,
         PRIV_DATASTORE_AUDIT,
         PRIV_DATASTORE_BACKUP,
+        Some(Operation::Read),
+        &backup_group,
     )?;
-    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
 
-    if owner_check_required {
-        let owner = datastore.get_owner(&backup_ns, &backup_group)?;
-        check_backup_owner(&owner, &auth_id)?;
-    }
-
-    let note_path = get_group_note_path(&datastore, &backup_ns, &backup_group);
+    let note_path = get_group_note_path(&datastore, &ns, &backup_group);
     Ok(file_read_optional_string(note_path)?.unwrap_or_else(|| "".to_owned()))
 }
 
@@ -1866,7 +1851,7 @@ pub fn get_group_notes(
     input: {
         properties: {
             store: { schema: DATASTORE_SCHEMA },
-            "backup-ns": {
+            ns: {
                 type: BackupNamespace,
                 optional: true,
             },
@@ -1888,27 +1873,25 @@ pub fn get_group_notes(
 /// Set "notes" for a backup group
 pub fn set_group_notes(
     store: String,
-    backup_ns: Option<BackupNamespace>,
+    ns: Option<BackupNamespace>,
     backup_group: pbs_api_types::BackupGroup,
     notes: String,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<(), Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let backup_ns = backup_ns.unwrap_or_default();
-    let owner_check_required = check_ns_privs(
+    let ns = ns.unwrap_or_default();
+
+    let datastore = check_privs_and_load_store(
         &store,
-        &backup_ns,
+        &ns,
         &auth_id,
         PRIV_DATASTORE_MODIFY,
         PRIV_DATASTORE_BACKUP,
+        Some(Operation::Write),
+        &backup_group,
     )?;
-    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
-    if owner_check_required {
-        let owner = datastore.get_owner(&backup_ns, &backup_group)?;
-        check_backup_owner(&owner, &auth_id)?;
-    }
 
-    let note_path = get_group_note_path(&datastore, &backup_ns, &backup_group);
+    let note_path = get_group_note_path(&datastore, &ns, &backup_group);
     replace_file(note_path, notes.as_bytes(), CreateOptions::new(), false)?;
 
     Ok(())
@@ -1918,7 +1901,7 @@ pub fn set_group_notes(
     input: {
         properties: {
             store: { schema: DATASTORE_SCHEMA },
-            "backup-ns": {
+            ns: {
                 type: BackupNamespace,
                 optional: true,
             },
@@ -1937,26 +1920,24 @@ pub fn set_group_notes(
 /// Get "notes" for a specific backup
 pub fn get_notes(
     store: String,
-    backup_ns: Option<BackupNamespace>,
+    ns: Option<BackupNamespace>,
     backup_dir: pbs_api_types::BackupDir,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<String, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let backup_ns = backup_ns.unwrap_or_default();
-    let owner_check_required = check_ns_privs(
+    let ns = ns.unwrap_or_default();
+
+    let datastore = check_privs_and_load_store(
         &store,
-        &backup_ns,
+        &ns,
         &auth_id,
         PRIV_DATASTORE_AUDIT,
         PRIV_DATASTORE_BACKUP,
+        Some(Operation::Read),
+        &backup_dir.group,
     )?;
-    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
-    if owner_check_required {
-        let owner = datastore.get_owner(&backup_ns, &backup_dir.group)?;
-        check_backup_owner(&owner, &auth_id)?;
-    }
 
-    let backup_dir = datastore.backup_dir(backup_ns, backup_dir)?;
+    let backup_dir = datastore.backup_dir(ns, backup_dir)?;
 
     let (manifest, _) = backup_dir.load_manifest()?;
 
@@ -1969,7 +1950,7 @@ pub fn get_notes(
     input: {
         properties: {
             store: { schema: DATASTORE_SCHEMA },
-            "backup-ns": {
+            ns: {
                 type: BackupNamespace,
                 optional: true,
             },
@@ -1991,27 +1972,25 @@ pub fn get_notes(
 /// Set "notes" for a specific backup
 pub fn set_notes(
     store: String,
-    backup_ns: Option<BackupNamespace>,
+    ns: Option<BackupNamespace>,
     backup_dir: pbs_api_types::BackupDir,
     notes: String,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<(), Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let backup_ns = backup_ns.unwrap_or_default();
-    let owner_check_required = check_ns_privs(
+    let ns = ns.unwrap_or_default();
+
+    let datastore = check_privs_and_load_store(
         &store,
-        &backup_ns,
+        &ns,
         &auth_id,
         PRIV_DATASTORE_MODIFY,
         PRIV_DATASTORE_BACKUP,
+        Some(Operation::Write),
+        &backup_dir.group,
     )?;
-    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
-    if owner_check_required {
-        let owner = datastore.get_owner(&backup_ns, &backup_dir.group)?;
-        check_backup_owner(&owner, &auth_id)?;
-    }
 
-    let backup_dir = datastore.backup_dir(backup_ns, backup_dir)?;
+    let backup_dir = datastore.backup_dir(ns, backup_dir)?;
 
     backup_dir
         .update_manifest(|manifest| {
@@ -2026,7 +2005,7 @@ pub fn set_notes(
     input: {
         properties: {
             store: { schema: DATASTORE_SCHEMA },
-            "backup-ns": {
+            ns: {
                 type: BackupNamespace,
                 optional: true,
             },
@@ -2045,27 +2024,23 @@ pub fn set_notes(
 /// Query protection for a specific backup
 pub fn get_protection(
     store: String,
-    backup_ns: Option<BackupNamespace>,
+    ns: Option<BackupNamespace>,
     backup_dir: pbs_api_types::BackupDir,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<bool, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let backup_ns = backup_ns.unwrap_or_default();
-    let owner_check_required = check_ns_privs(
+    let ns = ns.unwrap_or_default();
+    let datastore = check_privs_and_load_store(
         &store,
-        &backup_ns,
+        &ns,
         &auth_id,
         PRIV_DATASTORE_AUDIT,
         PRIV_DATASTORE_BACKUP,
+        Some(Operation::Read),
+        &backup_dir.group,
     )?;
-    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
-
-    if owner_check_required {
-        let owner = datastore.get_owner(&backup_ns, &backup_dir.group)?;
-        check_backup_owner(&owner, &auth_id)?;
-    }
 
-    let backup_dir = datastore.backup_dir(backup_ns, backup_dir)?;
+    let backup_dir = datastore.backup_dir(ns, backup_dir)?;
 
     Ok(backup_dir.is_protected())
 }
@@ -2074,7 +2049,7 @@ pub fn get_protection(
     input: {
         properties: {
             store: { schema: DATASTORE_SCHEMA },
-            "backup-ns": {
+            ns: {
                 type: BackupNamespace,
                 optional: true,
             },
@@ -2096,27 +2071,24 @@ pub fn get_protection(
 /// En- or disable protection for a specific backup
 pub fn set_protection(
     store: String,
-    backup_ns: Option<BackupNamespace>,
+    ns: Option<BackupNamespace>,
     backup_dir: pbs_api_types::BackupDir,
     protected: bool,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<(), Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let backup_ns = backup_ns.unwrap_or_default();
-    let owner_check_required = check_ns_privs(
+    let ns = ns.unwrap_or_default();
+    let datastore = check_privs_and_load_store(
         &store,
-        &backup_ns,
+        &ns,
         &auth_id,
         PRIV_DATASTORE_MODIFY,
         PRIV_DATASTORE_BACKUP,
+        Some(Operation::Write),
+        &backup_dir.group,
     )?;
-    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
-    if owner_check_required {
-        let owner = datastore.get_owner(&backup_ns, &backup_dir.group)?;
-        check_backup_owner(&owner, &auth_id)?;
-    }
 
-    let backup_dir = datastore.backup_dir(backup_ns, backup_dir)?;
+    let backup_dir = datastore.backup_dir(ns, backup_dir)?;
 
     datastore.update_protection(&backup_dir, protected)
 }
@@ -2125,7 +2097,7 @@ pub fn set_protection(
     input: {
         properties: {
             store: { schema: DATASTORE_SCHEMA },
-            "backup-ns": {
+            ns: {
                 type: BackupNamespace,
                 optional: true,
             },
@@ -2147,23 +2119,24 @@ pub fn set_protection(
 /// Change owner of a backup group
 pub fn set_backup_owner(
     store: String,
-    backup_ns: Option<BackupNamespace>,
+    ns: Option<BackupNamespace>,
     backup_group: pbs_api_types::BackupGroup,
     new_owner: Authid,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<(), Error> {
-    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
-
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let backup_ns = backup_ns.unwrap_or_default();
-    let owner_check_required = check_ns_privs(
+    let ns = ns.unwrap_or_default();
+    let owner_check_required = check_ns_privs_full(
         &store,
-        &backup_ns,
+        &ns,
         &auth_id,
         PRIV_DATASTORE_MODIFY,
         PRIV_DATASTORE_BACKUP,
     )?;
-    let backup_group = datastore.backup_group(backup_ns, backup_group);
+
+    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
+
+    let backup_group = datastore.backup_group(ns, backup_group);
 
     if owner_check_required {
         let owner = backup_group.get_owner()?;
@@ -2194,7 +2167,7 @@ pub fn set_backup_owner(
                 UNAUTHORIZED,
                 "{} does not have permission to change owner of backup group '{}' to {}",
                 auth_id,
-                backup_group,
+                backup_group.group(),
                 new_owner,
             ));
         }