]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/api2/admin/datastore.rs
replace 'disk_usage' with 'fs_info' from proxmox-sys
[proxmox-backup.git] / src / api2 / admin / datastore.rs
index b9bcde12b130449a53298343b1272a1638e0a2f4..ad2744b718263c7cc08f3173f65dfe9fcb85e2e6 100644 (file)
@@ -4,100 +4,112 @@ use std::collections::HashSet;
 use std::ffi::OsStr;
 use std::os::unix::ffi::OsStrExt;
 use std::path::PathBuf;
+use std::sync::Arc;
 
 use anyhow::{bail, format_err, Error};
 use futures::*;
 use hyper::http::request::Parts;
 use hyper::{header, Body, Response, StatusCode};
+use serde::Deserialize;
 use serde_json::{json, Value};
 use tokio_stream::wrappers::ReceiverStream;
 
-use proxmox::{identity, sortable};
-use proxmox::tools::fs::{
-    file_read_firstline, file_read_optional_string, replace_file, CreateOptions,
-};
+use proxmox_async::blocking::WrappedReaderStream;
+use proxmox_async::{io::AsyncChannelWriter, stream::AsyncReaderStream};
+use proxmox_compression::zstd::ZstdEncoder;
 use proxmox_router::{
-    list_subdirs_api_method, http_err, ApiResponseFuture, ApiHandler, ApiMethod, Router,
-    RpcEnvironment, RpcEnvironmentType, SubdirMap, Permission,
+    http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission,
+    Router, RpcEnvironment, RpcEnvironmentType, SubdirMap,
 };
 use proxmox_schema::*;
+use proxmox_sys::fs::{
+    file_read_firstline, file_read_optional_string, replace_file, CreateOptions,
+};
+use proxmox_sys::sortable;
+use proxmox_sys::{task_log, task_warn};
 
 use pxar::accessor::aio::Accessor;
 use pxar::EntryKind;
 
-use pbs_api_types::{ Authid, BackupContent, Counts, CryptMode,
-    DataStoreListItem, GarbageCollectionStatus, GroupListItem,
-    SnapshotListItem, SnapshotVerifyState, PruneOptions,
-    DataStoreStatus, RRDMode, RRDTimeFrame,
-    BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_TIME_SCHEMA,
-    BACKUP_TYPE_SCHEMA, DATASTORE_SCHEMA,
-    IGNORE_VERIFIED_BACKUPS_SCHEMA, UPID_SCHEMA,
-    VERIFICATION_OUTDATED_AFTER_SCHEMA, PRIV_DATASTORE_AUDIT,
-    PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_READ, PRIV_DATASTORE_PRUNE,
-    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_VERIFY,
-
-};
-use pbs_client::pxar::create_zip;
-use pbs_datastore::{
-    check_backup_owner, DataStore, BackupDir, BackupGroup, StoreProgress, LocalChunkReader,
-    CATALOG_NAME,
+use pbs_api_types::{
+    print_ns_and_snapshot, print_store_and_ns, Authid, BackupContent, BackupNamespace, BackupType,
+    Counts, CryptMode, DataStoreListItem, DataStoreStatus, GarbageCollectionStatus, GroupListItem,
+    KeepOptions, Operation, PruneJobOptions, RRDMode, RRDTimeFrame, SnapshotListItem,
+    SnapshotVerifyState, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA,
+    BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, DATASTORE_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA,
+    MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
+    PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY,
+    UPID_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
 };
+use pbs_client::pxar::{create_tar, create_zip};
+use pbs_config::CachedUserInfo;
 use pbs_datastore::backup_info::BackupInfo;
 use pbs_datastore::cached_chunk_reader::CachedChunkReader;
 use pbs_datastore::catalog::{ArchiveEntry, CatalogReader};
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::data_blob_reader::DataBlobReader;
 use pbs_datastore::dynamic_index::{BufferedDynamicReader, DynamicIndexReader, LocalDynamicReadAt};
-use pbs_datastore::fixed_index::{FixedIndexReader};
+use pbs_datastore::fixed_index::FixedIndexReader;
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{BackupManifest, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME};
 use pbs_datastore::prune::compute_prune_info;
-use pbs_tools::blocking::WrappedReaderStream;
-use pbs_tools::stream::{AsyncReaderStream, AsyncChannelWriter};
-use pbs_tools::json::{required_integer_param, required_string_param};
-use pbs_tools::{task_log, task_warn};
-use pbs_config::CachedUserInfo;
-use proxmox_rest_server::{WorkerTask, formatter};
+use pbs_datastore::{
+    check_backup_owner, task_tracking, BackupDir, BackupGroup, DataStore, LocalChunkReader,
+    StoreProgress, CATALOG_NAME,
+};
+use pbs_tools::json::required_string_param;
+use proxmox_rest_server::{formatter, WorkerTask};
 
+use crate::api2::backup::optional_ns_param;
 use crate::api2::node::rrd::create_value_from_rrd;
 use crate::backup::{
-    verify_all_backups, verify_backup_group, verify_backup_dir, verify_filter,
+    check_ns_privs_full, verify_all_backups, verify_backup_dir, verify_backup_group, verify_filter,
+    ListAccessibleBackupGroups, NS_PRIVS_OK,
 };
 
 use crate::server::jobstate::Job;
 
-
 const GROUP_NOTES_FILE_NAME: &str = "notes";
 
-fn get_group_note_path(store: &DataStore, group: &BackupGroup) -> PathBuf {
-    let mut note_path = store.base_path();
-    note_path.push(group.group_path());
+fn get_group_note_path(
+    store: &DataStore,
+    ns: &BackupNamespace,
+    group: &pbs_api_types::BackupGroup,
+) -> PathBuf {
+    let mut note_path = store.group_path(ns, group);
     note_path.push(GROUP_NOTES_FILE_NAME);
     note_path
 }
 
-fn check_priv_or_backup_owner(
-    store: &DataStore,
-    group: &BackupGroup,
+// helper to unify common sequence of checks:
+// 1. check privs on NS (full or limited access)
+// 2. load datastore
+// 3. if needed (only limited access), check owner of group
+fn check_privs_and_load_store(
+    store: &str,
+    ns: &BackupNamespace,
     auth_id: &Authid,
-    required_privs: u64,
-) -> Result<(), Error> {
-    let user_info = CachedUserInfo::new()?;
-    let privs = user_info.lookup_privs(&auth_id, &["datastore", store.name()]);
+    full_access_privs: u64,
+    partial_access_privs: u64,
+    operation: Option<Operation>,
+    backup_group: &pbs_api_types::BackupGroup,
+) -> Result<Arc<DataStore>, Error> {
+    let limited = check_ns_privs_full(store, ns, auth_id, full_access_privs, partial_access_privs)?;
+
+    let datastore = DataStore::lookup_datastore(store, operation)?;
 
-    if privs & required_privs == 0 {
-        let owner = store.get_owner(group)?;
-        check_backup_owner(&owner, auth_id)?;
+    if limited {
+        let owner = datastore.get_owner(ns, backup_group)?;
+        check_backup_owner(&owner, &auth_id)?;
     }
-    Ok(())
+
+    Ok(datastore)
 }
 
 fn read_backup_index(
-    store: &DataStore,
     backup_dir: &BackupDir,
 ) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
-
-    let (manifest, index_size) = store.load_manifest(backup_dir)?;
+    let (manifest, index_size) = backup_dir.load_manifest()?;
 
     let mut result = Vec::new();
     for item in manifest.files() {
@@ -121,11 +133,9 @@ fn read_backup_index(
 }
 
 fn get_all_snapshot_files(
-    store: &DataStore,
     info: &BackupInfo,
 ) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
-
-    let (manifest, mut files) = read_backup_index(&store, &info.backup_dir)?;
+    let (manifest, mut files) = read_backup_index(&info.backup_dir)?;
 
     let file_set = files.iter().fold(HashSet::new(), |mut acc, item| {
         acc.insert(item.filename.clone());
@@ -133,7 +143,9 @@ fn get_all_snapshot_files(
     });
 
     for file in &info.files {
-        if file_set.contains(file) { continue; }
+        if file_set.contains(file) {
+            continue;
+        }
         files.push(BackupContent {
             filename: file.to_string(),
             size: None,
@@ -150,78 +162,85 @@ fn get_all_snapshot_files(
             store: {
                 schema: DATASTORE_SCHEMA,
             },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
+            },
         },
     },
     returns: pbs_api_types::ADMIN_DATASTORE_LIST_GROUPS_RETURN_TYPE,
     access: {
-        permission: &Permission::Privilege(
-            &["datastore", "{store}"],
-            PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP,
-            true),
+        permission: &Permission::Anybody,
+        description: "Requires DATASTORE_AUDIT for all or DATASTORE_BACKUP for owned groups on \
+            /datastore/{store}[/{namespace}]",
     },
 )]
 /// List backup groups.
 pub fn list_groups(
     store: String,
+    ns: Option<BackupNamespace>,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Vec<GroupListItem>, Error> {
-
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let user_info = CachedUserInfo::new()?;
-    let user_privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);
+    let ns = ns.unwrap_or_default();
+
+    let list_all = !check_ns_privs_full(
+        &store,
+        &ns,
+        &auth_id,
+        PRIV_DATASTORE_AUDIT,
+        PRIV_DATASTORE_BACKUP,
+    )?;
 
-    let datastore = DataStore::lookup_datastore(&store)?;
-    let list_all = (user_privs & PRIV_DATASTORE_AUDIT) != 0;
+    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
 
-    let backup_groups = BackupInfo::list_backup_groups(&datastore.base_path())?;
+    datastore
+        .iter_backup_groups(ns.clone())? // FIXME: Namespaces and recursion parameters!
+        .try_fold(Vec::new(), |mut group_info, group| {
+            let group = group?;
 
-    let group_info = backup_groups
-        .into_iter()
-        .fold(Vec::new(), |mut group_info, group| {
-            let owner = match datastore.get_owner(&group) {
+            let owner = match datastore.get_owner(&ns, group.as_ref()) {
                 Ok(auth_id) => auth_id,
                 Err(err) => {
-                    eprintln!("Failed to get owner of group '{}/{}' - {}",
-                             &store,
-                             group,
-                             err);
-                    return group_info;
-                },
+                    eprintln!(
+                        "Failed to get owner of group '{}' in {} - {}",
+                        group.group(),
+                        print_store_and_ns(&store, &ns),
+                        err
+                    );
+                    return Ok(group_info);
+                }
             };
             if !list_all && check_backup_owner(&owner, &auth_id).is_err() {
-                return group_info;
+                return Ok(group_info);
             }
 
-            let snapshots = match group.list_backups(&datastore.base_path()) {
+            let snapshots = match group.list_backups() {
                 Ok(snapshots) => snapshots,
-                Err(_) => {
-                    return group_info;
-                },
+                Err(_) => return Ok(group_info),
             };
 
             let backup_count: u64 = snapshots.len() as u64;
             if backup_count == 0 {
-                return group_info;
+                return Ok(group_info);
             }
 
             let last_backup = snapshots
                 .iter()
-                .fold(&snapshots[0], |last, curr| {
-                    if curr.is_finished()
-                        && curr.backup_dir.backup_time() > last.backup_dir.backup_time() {
-                        curr
+                .fold(&snapshots[0], |a, b| {
+                    if a.is_finished() && a.backup_dir.backup_time() > b.backup_dir.backup_time() {
+                        a
                     } else {
-                        last
+                        b
                     }
                 })
                 .to_owned();
 
-            let note_path = get_group_note_path(&datastore, &group);
+            let note_path = get_group_note_path(&datastore, &ns, group.as_ref());
             let comment = file_read_firstline(&note_path).ok();
 
             group_info.push(GroupListItem {
-                backup_type: group.backup_type().to_string(),
-                backup_id: group.backup_id().to_string(),
+                backup: group.into(),
                 last_backup: last_backup.backup_dir.backup_time(),
                 owner: Some(owner),
                 backup_count,
@@ -229,51 +248,53 @@ pub fn list_groups(
                 comment,
             });
 
-            group_info
-        });
-
-    Ok(group_info)
+            Ok(group_info)
+        })
 }
 
 #[api(
     input: {
         properties: {
-            store: {
-                schema: DATASTORE_SCHEMA,
-            },
-            "backup-type": {
-                schema: BACKUP_TYPE_SCHEMA,
+            store: { schema: DATASTORE_SCHEMA },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
             },
-            "backup-id": {
-                schema: BACKUP_ID_SCHEMA,
+            group: {
+                type: pbs_api_types::BackupGroup,
+                flatten: true,
             },
         },
     },
     access: {
-        permission: &Permission::Privilege(
-            &["datastore", "{store}"],
-            PRIV_DATASTORE_MODIFY| PRIV_DATASTORE_PRUNE,
-            true),
+        permission: &Permission::Anybody,
+        description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_MODIFY for any\
+            or DATASTORE_PRUNE and being the owner of the group",
     },
 )]
 /// Delete backup group including all snapshots.
 pub fn delete_group(
     store: String,
-    backup_type: String,
-    backup_id: String,
+    ns: Option<BackupNamespace>,
+    group: pbs_api_types::BackupGroup,
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let ns = ns.unwrap_or_default();
+
+    let datastore = check_privs_and_load_store(
+        &store,
+        &ns,
+        &auth_id,
+        PRIV_DATASTORE_MODIFY,
+        PRIV_DATASTORE_PRUNE,
+        Some(Operation::Write),
+        &group,
+    )?;
 
-    let group = BackupGroup::new(backup_type, backup_id);
-    let datastore = DataStore::lookup_datastore(&store)?;
-
-    check_priv_or_backup_owner(&datastore, &group, &auth_id, PRIV_DATASTORE_MODIFY)?;
-
-    if !datastore.remove_backup_group(&group)? {
-        bail!("did not delete whole group because of protected snapthots");
+    if !datastore.remove_backup_group(&ns, &group)? {
+        bail!("group only partially deleted due to protected snapshots");
     }
 
     Ok(Value::Null)
@@ -282,48 +303,50 @@ pub fn delete_group(
 #[api(
     input: {
         properties: {
-            store: {
-                schema: DATASTORE_SCHEMA,
-            },
-            "backup-type": {
-                schema: BACKUP_TYPE_SCHEMA,
-            },
-            "backup-id": {
-                schema: BACKUP_ID_SCHEMA,
+            store: { schema: DATASTORE_SCHEMA },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
             },
-            "backup-time": {
-                schema: BACKUP_TIME_SCHEMA,
+            backup_dir: {
+                type: pbs_api_types::BackupDir,
+                flatten: true,
             },
         },
     },
     returns: pbs_api_types::ADMIN_DATASTORE_LIST_SNAPSHOT_FILES_RETURN_TYPE,
     access: {
-        permission: &Permission::Privilege(
-            &["datastore", "{store}"],
-            PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP,
-            true),
+        permission: &Permission::Anybody,
+        description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_AUDIT or \
+            DATASTORE_READ for any or DATASTORE_BACKUP and being the owner of the group",
     },
 )]
 /// List snapshot files.
 pub fn list_snapshot_files(
     store: String,
-    backup_type: String,
-    backup_id: String,
-    backup_time: i64,
+    ns: Option<BackupNamespace>,
+    backup_dir: pbs_api_types::BackupDir,
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Vec<BackupContent>, Error> {
-
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let datastore = DataStore::lookup_datastore(&store)?;
-
-    let snapshot = BackupDir::new(backup_type, backup_id, backup_time)?;
+    let ns = ns.unwrap_or_default();
+
+    let datastore = check_privs_and_load_store(
+        &store,
+        &ns,
+        &auth_id,
+        PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_READ,
+        PRIV_DATASTORE_BACKUP,
+        Some(Operation::Read),
+        &backup_dir.group,
+    )?;
 
-    check_priv_or_backup_owner(&datastore, snapshot.group(), &auth_id, PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_READ)?;
+    let snapshot = datastore.backup_dir(ns, backup_dir)?;
 
-    let info = BackupInfo::new(&datastore.base_path(), snapshot)?;
+    let info = BackupInfo::new(snapshot)?;
 
-    let (_manifest, files) = get_all_snapshot_files(&datastore, &info)?;
+    let (_manifest, files) = get_all_snapshot_files(&info)?;
 
     Ok(files)
 }
@@ -331,58 +354,63 @@ pub fn list_snapshot_files(
 #[api(
     input: {
         properties: {
-            store: {
-                schema: DATASTORE_SCHEMA,
-            },
-            "backup-type": {
-                schema: BACKUP_TYPE_SCHEMA,
-            },
-            "backup-id": {
-                schema: BACKUP_ID_SCHEMA,
+            store: { schema: DATASTORE_SCHEMA },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
             },
-            "backup-time": {
-                schema: BACKUP_TIME_SCHEMA,
+            backup_dir: {
+                type: pbs_api_types::BackupDir,
+                flatten: true,
             },
         },
     },
     access: {
-        permission: &Permission::Privilege(
-            &["datastore", "{store}"],
-            PRIV_DATASTORE_MODIFY| PRIV_DATASTORE_PRUNE,
-            true),
+        permission: &Permission::Anybody,
+        description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_MODIFY for any\
+            or DATASTORE_PRUNE and being the owner of the group",
     },
 )]
 /// Delete backup snapshot.
 pub fn delete_snapshot(
     store: String,
-    backup_type: String,
-    backup_id: String,
-    backup_time: i64,
+    ns: Option<BackupNamespace>,
+    backup_dir: pbs_api_types::BackupDir,
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let ns = ns.unwrap_or_default();
+
+    let datastore = check_privs_and_load_store(
+        &store,
+        &ns,
+        &auth_id,
+        PRIV_DATASTORE_MODIFY,
+        PRIV_DATASTORE_PRUNE,
+        Some(Operation::Write),
+        &backup_dir.group,
+    )?;
 
-    let snapshot = BackupDir::new(backup_type, backup_id, backup_time)?;
-    let datastore = DataStore::lookup_datastore(&store)?;
-
-    check_priv_or_backup_owner(&datastore, snapshot.group(), &auth_id, PRIV_DATASTORE_MODIFY)?;
+    let snapshot = datastore.backup_dir(ns, backup_dir)?;
 
-    datastore.remove_backup_dir(&snapshot, false)?;
+    snapshot.destroy(false)?;
 
     Ok(Value::Null)
 }
 
 #[api(
+    streaming: true,
     input: {
         properties: {
-            store: {
-                schema: DATASTORE_SCHEMA,
+            store: { schema: DATASTORE_SCHEMA },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
             },
             "backup-type": {
                 optional: true,
-                schema: BACKUP_TYPE_SCHEMA,
+                type: BackupType,
             },
             "backup-id": {
                 optional: true,
@@ -392,60 +420,63 @@ pub fn delete_snapshot(
     },
     returns: pbs_api_types::ADMIN_DATASTORE_LIST_SNAPSHOTS_RETURN_TYPE,
     access: {
-        permission: &Permission::Privilege(
-            &["datastore", "{store}"],
-            PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP,
-            true),
+        permission: &Permission::Anybody,
+        description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_AUDIT for any \
+            or DATASTORE_BACKUP and being the owner of the group",
     },
 )]
 /// List backup snapshots.
-pub fn list_snapshots (
+pub fn list_snapshots(
     store: String,
-    backup_type: Option<String>,
+    ns: Option<BackupNamespace>,
+    backup_type: Option<BackupType>,
     backup_id: Option<String>,
     _param: Value,
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Vec<SnapshotListItem>, Error> {
-
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let user_info = CachedUserInfo::new()?;
-    let user_privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);
 
-    let list_all = (user_privs & PRIV_DATASTORE_AUDIT) != 0;
+    let ns = ns.unwrap_or_default();
 
-    let datastore = DataStore::lookup_datastore(&store)?;
+    let list_all = !check_ns_privs_full(
+        &store,
+        &ns,
+        &auth_id,
+        PRIV_DATASTORE_AUDIT,
+        PRIV_DATASTORE_BACKUP,
+    )?;
 
-    let base_path = datastore.base_path();
+    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
 
+    // FIXME: filter also owner before collecting, for doing that nicely the owner should move into
+    // backup group and provide an error free (Err -> None) accessor
     let groups = match (backup_type, backup_id) {
         (Some(backup_type), Some(backup_id)) => {
-            let mut groups = Vec::with_capacity(1);
-            groups.push(BackupGroup::new(backup_type, backup_id));
-            groups
-        },
-        (Some(backup_type), None) => {
-            BackupInfo::list_backup_groups(&base_path)?
-                .into_iter()
-                .filter(|group| group.backup_type() == backup_type)
-                .collect()
-        },
-        (None, Some(backup_id)) => {
-            BackupInfo::list_backup_groups(&base_path)?
-                .into_iter()
-                .filter(|group| group.backup_id() == backup_id)
-                .collect()
-        },
-        _ => BackupInfo::list_backup_groups(&base_path)?,
+            vec![datastore.backup_group_from_parts(ns.clone(), backup_type, backup_id)]
+        }
+        // FIXME: Recursion
+        (Some(backup_type), None) => datastore
+            .iter_backup_groups_ok(ns.clone())?
+            .filter(|group| group.backup_type() == backup_type)
+            .collect(),
+        // FIXME: Recursion
+        (None, Some(backup_id)) => datastore
+            .iter_backup_groups_ok(ns.clone())?
+            .filter(|group| group.backup_id() == backup_id)
+            .collect(),
+        // FIXME: Recursion
+        (None, None) => datastore.list_backup_groups(ns.clone())?,
     };
 
     let info_to_snapshot_list_item = |group: &BackupGroup, owner, info: BackupInfo| {
-        let backup_type = group.backup_type().to_string();
-        let backup_id = group.backup_id().to_string();
-        let backup_time = info.backup_dir.backup_time();
-        let protected = info.backup_dir.is_protected(base_path.clone());
+        let backup = pbs_api_types::BackupDir {
+            group: group.into(),
+            time: info.backup_dir.backup_time(),
+        };
+        let protected = info.backup_dir.is_protected();
 
-        match get_all_snapshot_files(&datastore, &info) {
+        match get_all_snapshot_files(&info) {
             Ok((manifest, files)) => {
                 // extract the first line from notes
                 let comment: Option<String> = manifest.unprotected["notes"]
@@ -458,24 +489,23 @@ pub fn list_snapshots (
                     Err(err) => {
                         eprintln!("error parsing fingerprint: '{}'", err);
                         None
-                    },
+                    }
                 };
 
                 let verification = manifest.unprotected["verify_state"].clone();
-                let verification: Option<SnapshotVerifyState> = match serde_json::from_value(verification) {
-                    Ok(verify) => verify,
-                    Err(err) => {
-                        eprintln!("error parsing verification state : '{}'", err);
-                        None
-                    }
-                };
+                let verification: Option<SnapshotVerifyState> =
+                    match serde_json::from_value(verification) {
+                        Ok(verify) => verify,
+                        Err(err) => {
+                            eprintln!("error parsing verification state : '{}'", err);
+                            None
+                        }
+                    };
 
                 let size = Some(files.iter().map(|x| x.size.unwrap_or(0)).sum());
 
                 SnapshotListItem {
-                    backup_type,
-                    backup_id,
-                    backup_time,
+                    backup,
                     comment,
                     verification,
                     fingerprint,
@@ -484,23 +514,21 @@ pub fn list_snapshots (
                     owner,
                     protected,
                 }
-            },
+            }
             Err(err) => {
                 eprintln!("error during snapshot file listing: '{}'", err);
                 let files = info
-                        .files
-                        .into_iter()
-                        .map(|filename| BackupContent {
-                            filename,
-                            size: None,
-                            crypt_mode: None,
-                        })
-                        .collect();
+                    .files
+                    .into_iter()
+                    .map(|filename| BackupContent {
+                        filename,
+                        size: None,
+                        crypt_mode: None,
+                    })
+                    .collect();
 
                 SnapshotListItem {
-                    backup_type,
-                    backup_id,
-                    backup_time,
+                    backup,
                     comment: None,
                     verification: None,
                     fingerprint: None,
@@ -509,77 +537,71 @@ pub fn list_snapshots (
                     owner,
                     protected,
                 }
-            },
+            }
         }
     };
 
-    groups
-        .iter()
-        .try_fold(Vec::new(), |mut snapshots, group| {
-            let owner = match datastore.get_owner(group) {
-                Ok(auth_id) => auth_id,
-                Err(err) => {
-                    eprintln!("Failed to get owner of group '{}/{}' - {}",
-                              &store,
-                              group,
-                              err);
-                    return Ok(snapshots);
-                },
-            };
-
-            if !list_all && check_backup_owner(&owner, &auth_id).is_err() {
+    groups.iter().try_fold(Vec::new(), |mut snapshots, group| {
+        let owner = match group.get_owner() {
+            Ok(auth_id) => auth_id,
+            Err(err) => {
+                eprintln!(
+                    "Failed to get owner of group '{}' in {} - {}",
+                    group.group(),
+                    print_store_and_ns(&store, &ns),
+                    err
+                );
                 return Ok(snapshots);
             }
+        };
 
-            let group_backups = group.list_backups(&datastore.base_path())?;
+        if !list_all && check_backup_owner(&owner, &auth_id).is_err() {
+            return Ok(snapshots);
+        }
 
-            snapshots.extend(
-                group_backups
-                    .into_iter()
-                    .map(|info| info_to_snapshot_list_item(&group, Some(owner.clone()), info))
-            );
+        let group_backups = group.list_backups()?;
 
-            Ok(snapshots)
-        })
-}
-
-fn get_snapshots_count(store: &DataStore, filter_owner: Option<&Authid>) -> Result<Counts, Error> {
-    let base_path = store.base_path();
-    let groups = BackupInfo::list_backup_groups(&base_path)?;
+        snapshots.extend(
+            group_backups
+                .into_iter()
+                .map(|info| info_to_snapshot_list_item(group, Some(owner.clone()), info)),
+        );
 
-    groups.iter()
-        .filter(|group| {
-            let owner = match store.get_owner(&group) {
-                Ok(owner) => owner,
-                Err(err) => {
-                    eprintln!("Failed to get owner of group '{}/{}' - {}",
-                              store.name(),
-                              group,
-                              err);
-                    return false;
-                },
-            };
+        Ok(snapshots)
+    })
+}
 
-            match filter_owner {
-                Some(filter) => check_backup_owner(&owner, filter).is_ok(),
-                None => true,
-            }
-        })
-        .try_fold(Counts::default(), |mut counts, group| {
-            let snapshot_count = group.list_backups(&base_path)?.len() as u64;
+fn get_snapshots_count(store: &Arc<DataStore>, owner: Option<&Authid>) -> Result<Counts, Error> {
+    let root_ns = Default::default();
+    ListAccessibleBackupGroups::new_with_privs(
+        store,
+        root_ns,
+        MAX_NAMESPACE_DEPTH,
+        Some(PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_READ),
+        None,
+        owner,
+    )?
+    .try_fold(Counts::default(), |mut counts, group| {
+        let group = match group {
+            Ok(group) => group,
+            Err(_) => return Ok(counts), // TODO: add this as error counts?
+        };
+        let snapshot_count = group.list_backups()?.len() as u64;
 
+        // only include groups with snapshots, counting/displaying empty groups can confuse
+        if snapshot_count > 0 {
             let type_count = match group.backup_type() {
-                "ct" => counts.ct.get_or_insert(Default::default()),
-                "vm" => counts.vm.get_or_insert(Default::default()),
-                "host" => counts.host.get_or_insert(Default::default()),
-                _ => counts.other.get_or_insert(Default::default()),
+                BackupType::Ct => counts.ct.get_or_insert(Default::default()),
+                BackupType::Vm => counts.vm.get_or_insert(Default::default()),
+                BackupType::Host => counts.host.get_or_insert(Default::default()),
             };
 
             type_count.groups += 1;
             type_count.snapshots += snapshot_count;
+        }
 
-            Ok(counts)
-        })
+        Ok(counts)
+    })
 }
 
 #[api(
@@ -601,7 +623,9 @@ fn get_snapshots_count(store: &DataStore, filter_owner: Option<&Authid>) -> Resu
         type: DataStoreStatus,
     },
     access: {
-        permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP, true),
+        permission: &Permission::Anybody,
+        description: "Requires on /datastore/{store} either DATASTORE_AUDIT or DATASTORE_BACKUP for \
+            the full statistics. Counts of accessible groups are always returned, if any",
     },
 )]
 /// Get datastore status.
@@ -611,13 +635,26 @@ pub fn status(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<DataStoreStatus, Error> {
-    let datastore = DataStore::lookup_datastore(&store)?;
-    let storage = crate::tools::disks::disk_usage(&datastore.base_path())?;
-    let (counts, gc_status) = if verbose {
-        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-        let user_info = CachedUserInfo::new()?;
+    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let user_info = CachedUserInfo::new()?;
+    let store_privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);
 
-        let store_privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);
+    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read));
+
+    let store_stats = if store_privs & (PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP) != 0 {
+        true
+    } else if store_privs & PRIV_DATASTORE_READ != 0 {
+        false // allow at least counts, user can read groups anyway..
+    } else {
+        match user_info.any_privs_below(&auth_id, &["datastore", &store], NS_PRIVS_OK) {
+            // avoid leaking existence info if users hasn't at least any priv. below
+            Ok(false) | Err(_) => return Err(http_err!(FORBIDDEN, "permission check failed")),
+            _ => false,
+        }
+    };
+    let datastore = datastore?; // only unwrap no to avoid leaking existence info
+
+    let (counts, gc_status) = if verbose {
         let filter_owner = if store_privs & PRIV_DATASTORE_AUDIT != 0 {
             None
         } else {
@@ -625,19 +662,34 @@ pub fn status(
         };
 
         let counts = Some(get_snapshots_count(&datastore, filter_owner)?);
-        let gc_status = Some(datastore.last_gc_status());
+        let gc_status = if store_stats {
+            Some(datastore.last_gc_status())
+        } else {
+            None
+        };
 
         (counts, gc_status)
     } else {
         (None, None)
     };
 
-    Ok(DataStoreStatus {
-        total: storage.total,
-        used: storage.used,
-        avail: storage.avail,
-        gc_status,
-        counts,
+    Ok(if store_stats {
+        let storage = proxmox_sys::fs::fs_info(&datastore.base_path())?;
+        DataStoreStatus {
+            total: storage.total,
+            used: storage.used,
+            avail: storage.available,
+            gc_status,
+            counts,
+        }
+    } else {
+        DataStoreStatus {
+            total: 0,
+            used: 0,
+            avail: 0,
+            gc_status,
+            counts,
+        }
     })
 }
 
@@ -647,8 +699,12 @@ pub fn status(
             store: {
                 schema: DATASTORE_SCHEMA,
             },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
+            },
             "backup-type": {
-                schema: BACKUP_TYPE_SCHEMA,
+                type: BackupType,
                 optional: true,
             },
             "backup-id": {
@@ -667,13 +723,19 @@ pub fn status(
                 schema: BACKUP_TIME_SCHEMA,
                 optional: true,
             },
+            "max-depth": {
+                schema: NS_MAX_DEPTH_SCHEMA,
+                optional: true,
+            },
         },
     },
     returns: {
         schema: UPID_SCHEMA,
     },
     access: {
-        permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_VERIFY | PRIV_DATASTORE_BACKUP, true),
+        permission: &Permission::Anybody,
+        description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_VERIFY for any \
+            or DATASTORE_BACKUP and being the owner of the group",
     },
 )]
 /// Verify backups.
@@ -682,17 +744,29 @@ pub fn status(
 /// or all backups in the datastore.
 pub fn verify(
     store: String,
-    backup_type: Option<String>,
+    ns: Option<BackupNamespace>,
+    backup_type: Option<BackupType>,
     backup_id: Option<String>,
     backup_time: Option<i64>,
     ignore_verified: Option<bool>,
     outdated_after: Option<i64>,
+    max_depth: Option<usize>,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-    let datastore = DataStore::lookup_datastore(&store)?;
+    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let ns = ns.unwrap_or_default();
+
+    let owner_check_required = check_ns_privs_full(
+        &store,
+        &ns,
+        &auth_id,
+        PRIV_DATASTORE_VERIFY,
+        PRIV_DATASTORE_BACKUP,
+    )?;
+
+    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
     let ignore_verified = ignore_verified.unwrap_or(true);
 
-    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
     let worker_id;
 
     let mut backup_dir = None;
@@ -701,25 +775,49 @@ pub fn verify(
 
     match (backup_type, backup_id, backup_time) {
         (Some(backup_type), Some(backup_id), Some(backup_time)) => {
-            worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_time);
-            let dir = BackupDir::new(backup_type, backup_id, backup_time)?;
+            worker_id = format!(
+                "{}:{}/{}/{}/{:08X}",
+                store,
+                ns.display_as_path(),
+                backup_type,
+                backup_id,
+                backup_time
+            );
+            let dir =
+                datastore.backup_dir_from_parts(ns.clone(), backup_type, backup_id, backup_time)?;
 
-            check_priv_or_backup_owner(&datastore, dir.group(), &auth_id, PRIV_DATASTORE_VERIFY)?;
+            if owner_check_required {
+                let owner = datastore.get_owner(dir.backup_ns(), dir.as_ref())?;
+                check_backup_owner(&owner, &auth_id)?;
+            }
 
             backup_dir = Some(dir);
             worker_type = "verify_snapshot";
         }
         (Some(backup_type), Some(backup_id), None) => {
-            worker_id = format!("{}:{}/{}", store, backup_type, backup_id);
-            let group = BackupGroup::new(backup_type, backup_id);
+            worker_id = format!(
+                "{}:{}/{}/{}",
+                store,
+                ns.display_as_path(),
+                backup_type,
+                backup_id
+            );
+            let group = pbs_api_types::BackupGroup::from((backup_type, backup_id));
 
-            check_priv_or_backup_owner(&datastore, &group, &auth_id, PRIV_DATASTORE_VERIFY)?;
+            if owner_check_required {
+                let owner = datastore.get_owner(&ns, &group)?;
+                check_backup_owner(&owner, &auth_id)?;
+            }
 
-            backup_group = Some(group);
+            backup_group = Some(datastore.backup_group(ns.clone(), group));
             worker_type = "verify_group";
         }
         (None, None, None) => {
-            worker_id = store.clone();
+            worker_id = if ns.is_root() {
+                store
+            } else {
+                format!("{}:{}", store, ns.display_as_path())
+            };
         }
         _ => bail!("parameters do not specify a backup group or snapshot"),
     }
@@ -739,11 +837,12 @@ pub fn verify(
                     &verify_worker,
                     &backup_dir,
                     worker.upid().clone(),
-                    Some(&move |manifest| {
-                        verify_filter(ignore_verified, outdated_after, manifest)
-                    }),
+                    Some(&move |manifest| verify_filter(ignore_verified, outdated_after, manifest)),
                 )? {
-                    res.push(backup_dir.to_string());
+                    res.push(print_ns_and_snapshot(
+                        backup_dir.backup_ns(),
+                        backup_dir.as_ref(),
+                    ));
                 }
                 res
             } else if let Some(backup_group) = backup_group {
@@ -752,17 +851,12 @@ pub fn verify(
                     &backup_group,
                     &mut StoreProgress::new(1),
                     worker.upid(),
-                    Some(&move |manifest| {
-                        verify_filter(ignore_verified, outdated_after, manifest)
-                    }),
+                    Some(&move |manifest| verify_filter(ignore_verified, outdated_after, manifest)),
                 )?;
                 failed_dirs
             } else {
-                let privs = CachedUserInfo::new()?
-                    .lookup_privs(&auth_id, &["datastore", &store]);
-
-                let owner = if privs & PRIV_DATASTORE_VERIFY == 0 {
-                    Some(auth_id)
+                let owner = if owner_check_required {
+                    Some(&auth_id)
                 } else {
                     None
                 };
@@ -770,10 +864,10 @@ pub fn verify(
                 verify_all_backups(
                     &verify_worker,
                     worker.upid(),
+                    ns,
+                    max_depth,
                     owner,
-                    Some(&move |manifest| {
-                        verify_filter(ignore_verified, outdated_after, manifest)
-                    }),
+                    Some(&move |manifest| verify_filter(ignore_verified, outdated_after, manifest)),
                 )?
             };
             if !failed_dirs.is_empty() {
@@ -793,11 +887,9 @@ pub fn verify(
 #[api(
     input: {
         properties: {
-            "backup-id": {
-                schema: BACKUP_ID_SCHEMA,
-            },
-            "backup-type": {
-                schema: BACKUP_TYPE_SCHEMA,
+            group: {
+                type: pbs_api_types::BackupGroup,
+                flatten: true,
             },
             "dry-run": {
                 optional: true,
@@ -805,79 +897,100 @@ pub fn verify(
                 default: false,
                 description: "Just show what prune would do, but do not delete anything.",
             },
-            "prune-options": {
-                type: PruneOptions,
+            "keep-options": {
+                type: KeepOptions,
                 flatten: true,
             },
             store: {
                 schema: DATASTORE_SCHEMA,
             },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
+            },
         },
     },
     returns: pbs_api_types::ADMIN_DATASTORE_PRUNE_RETURN_TYPE,
     access: {
-        permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_PRUNE, true),
+        permission: &Permission::Anybody,
+        description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_MODIFY for any\
+            or DATASTORE_PRUNE and being the owner of the group",
     },
 )]
 /// Prune a group on the datastore
 pub fn prune(
-    backup_id: String,
-    backup_type: String,
+    group: pbs_api_types::BackupGroup,
     dry_run: bool,
-    prune_options: PruneOptions,
+    keep_options: KeepOptions,
     store: String,
+    ns: Option<BackupNamespace>,
     _param: Value,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let ns = ns.unwrap_or_default();
+    let datastore = check_privs_and_load_store(
+        &store,
+        &ns,
+        &auth_id,
+        PRIV_DATASTORE_MODIFY,
+        PRIV_DATASTORE_PRUNE,
+        Some(Operation::Write),
+        &group,
+    )?;
 
-    let group = BackupGroup::new(&backup_type, &backup_id);
-
-    let datastore = DataStore::lookup_datastore(&store)?;
-
-    check_priv_or_backup_owner(&datastore, &group, &auth_id, PRIV_DATASTORE_MODIFY)?;
-
-    let worker_id = format!("{}:{}/{}", store, &backup_type, &backup_id);
+    let worker_id = format!("{}:{}:{}", store, ns, group);
+    let group = datastore.backup_group(ns.clone(), group);
 
     let mut prune_result = Vec::new();
 
-    let list = group.list_backups(&datastore.base_path())?;
+    let list = group.list_backups()?;
 
-    let mut prune_info = compute_prune_info(list, &prune_options)?;
+    let mut prune_info = compute_prune_info(list, &keep_options)?;
 
     prune_info.reverse(); // delete older snapshots first
 
-    let keep_all = !pbs_datastore::prune::keeps_something(&prune_options);
+    let keep_all = !keep_options.keeps_something();
 
     if dry_run {
         for (info, mark) in prune_info {
             let keep = keep_all || mark.keep();
 
-            let backup_time = info.backup_dir.backup_time();
-            let group = info.backup_dir.group();
-
-            prune_result.push(json!({
-                "backup-type": group.backup_type(),
-                "backup-id": group.backup_id(),
-                "backup-time": backup_time,
+            let mut result = json!({
+                "backup-type": info.backup_dir.backup_type(),
+                "backup-id": info.backup_dir.backup_id(),
+                "backup-time": info.backup_dir.backup_time(),
                 "keep": keep,
                 "protected": mark.protected(),
-            }));
+            });
+            let prune_ns = info.backup_dir.backup_ns();
+            if !prune_ns.is_root() {
+                result["ns"] = serde_json::to_value(prune_ns)?;
+            }
+            prune_result.push(result);
         }
         return Ok(json!(prune_result));
     }
 
-
     // We use a WorkerTask just to have a task log, but run synchrounously
     let worker = WorkerTask::new("prune", Some(worker_id), auth_id.to_string(), true)?;
 
     if keep_all {
         task_log!(worker, "No prune selection - keeping all files.");
     } else {
-        task_log!(worker, "retention options: {}", pbs_datastore::prune::cli_options_string(&prune_options));
-        task_log!(worker, "Starting prune on store \"{}\" group \"{}/{}\"",
-                  store, backup_type, backup_id);
+        let mut opts = Vec::new();
+        if !ns.is_root() {
+            opts.push(format!("--ns {ns}"));
+        }
+        crate::server::cli_keep_options(&mut opts, &keep_options);
+
+        task_log!(worker, "retention options: {}", opts.join(" "));
+        task_log!(
+            worker,
+            "Starting prune on {} group \"{}\"",
+            print_store_and_ns(&store, &ns),
+            group.group(),
+        );
     }
 
     for (info, mark) in prune_info {
@@ -885,29 +998,22 @@ pub fn prune(
 
         let backup_time = info.backup_dir.backup_time();
         let timestamp = info.backup_dir.backup_time_string();
-        let group = info.backup_dir.group();
-
+        let group: &pbs_api_types::BackupGroup = info.backup_dir.as_ref();
 
-        let msg = format!(
-            "{}/{}/{} {}",
-            group.backup_type(),
-            group.backup_id(),
-            timestamp,
-            mark,
-        );
+        let msg = format!("{}/{}/{} {}", group.ty, group.id, timestamp, mark,);
 
         task_log!(worker, "{}", msg);
 
         prune_result.push(json!({
-            "backup-type": group.backup_type(),
-            "backup-id": group.backup_id(),
+            "backup-type": group.ty,
+            "backup-id": group.id,
             "backup-time": backup_time,
             "keep": keep,
             "protected": mark.protected(),
         }));
 
         if !(dry_run || keep) {
-            if let Err(err) = datastore.remove_backup_dir(&info.backup_dir, false) {
+            if let Err(err) = info.backup_dir.destroy(false) {
                 task_warn!(
                     worker,
                     "failed to remove dir {:?}: {}",
@@ -933,7 +1039,7 @@ pub fn prune(
                 description: "Just show what prune would do, but do not delete anything.",
             },
             "prune-options": {
-                type: PruneOptions,
+                type: PruneJobOptions,
                 flatten: true,
             },
             store: {
@@ -945,37 +1051,43 @@ pub fn prune(
         schema: UPID_SCHEMA,
     },
     access: {
-        permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_PRUNE, true),
+        permission: &Permission::Anybody,
+        description: "Requires Datastore.Modify or Datastore.Prune on the datastore/namespace.",
     },
 )]
 /// Prune the datastore
 pub fn prune_datastore(
     dry_run: bool,
-    prune_options: PruneOptions,
+    prune_options: PruneJobOptions,
     store: String,
     _param: Value,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<String, Error> {
+    let user_info = CachedUserInfo::new()?;
 
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
 
-    let datastore = DataStore::lookup_datastore(&store)?;
+    user_info.check_privs(
+        &auth_id,
+        &prune_options.acl_path(&store),
+        PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_PRUNE,
+        true,
+    )?;
+
+    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
+    let ns = prune_options.ns.clone().unwrap_or_default();
+    let worker_id = format!("{}:{}", store, ns);
 
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
     let upid_str = WorkerTask::new_thread(
         "prune",
-        Some(store.clone()),
+        Some(worker_id),
         auth_id.to_string(),
         to_stdout,
-        move |worker| crate::server::prune_datastore(
-            worker.clone(),
-            auth_id,
-            prune_options,
-            &store,
-            datastore,
-            dry_run
-        ),
+        move |worker| {
+            crate::server::prune_datastore(worker, auth_id, prune_options, datastore, dry_run)
+        },
     )?;
 
     Ok(upid_str)
@@ -1002,17 +1114,23 @@ pub fn start_garbage_collection(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-
-    let datastore = DataStore::lookup_datastore(&store)?;
+    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
 
-    let job =  Job::new("garbage_collection", &store)
+    let job = Job::new("garbage_collection", &store)
         .map_err(|_| format_err!("garbage collection already running"))?;
 
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    let upid_str = crate::server::do_garbage_collection_job(job, datastore, &auth_id, None, to_stdout)
-        .map_err(|err| format_err!("unable to start garbage collection job on datastore {} - {}", store, err))?;
+    let upid_str =
+        crate::server::do_garbage_collection_job(job, datastore, &auth_id, None, to_stdout)
+            .map_err(|err| {
+                format_err!(
+                    "unable to start garbage collection job on datastore {} - {}",
+                    store,
+                    err
+                )
+            })?;
 
     Ok(json!(upid_str))
 }
@@ -1038,8 +1156,7 @@ pub fn garbage_collection_status(
     _info: &ApiMethod,
     _rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<GarbageCollectionStatus, Error> {
-
-    let datastore = DataStore::lookup_datastore(&store)?;
+    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
 
     let status = datastore.last_gc_status();
 
@@ -1062,7 +1179,6 @@ pub fn get_datastore_list(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Vec<DataStoreListItem>, Error> {
-
     let (config, _digest) = pbs_config::datastore::config()?;
 
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -1071,15 +1187,27 @@ pub fn get_datastore_list(
     let mut list = Vec::new();
 
     for (store, (_, data)) in &config.sections {
-        let user_privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);
-        let allowed = (user_privs & (PRIV_DATASTORE_AUDIT| PRIV_DATASTORE_BACKUP)) != 0;
-        if allowed {
-            list.push(
-                DataStoreListItem {
-                    store: store.clone(),
-                    comment: data["comment"].as_str().map(String::from),
-                }
-            );
+        let acl_path = &["datastore", store];
+        let user_privs = user_info.lookup_privs(&auth_id, acl_path);
+        let allowed = (user_privs & (PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP)) != 0;
+
+        let mut allow_id = false;
+        if !allowed {
+            if let Ok(any_privs) = user_info.any_privs_below(&auth_id, acl_path, NS_PRIVS_OK) {
+                allow_id = any_privs;
+            }
+        }
+
+        if allowed || allow_id {
+            list.push(DataStoreListItem {
+                store: store.clone(),
+                comment: if !allowed {
+                    None
+                } else {
+                    data["comment"].as_str().map(String::from)
+                },
+                maintenance: data["maintenance-mode"].as_str().map(String::from),
+            });
         }
     }
 
@@ -1093,16 +1221,20 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
         "Download single raw file from backup snapshot.",
         &sorted!([
             ("store", false, &DATASTORE_SCHEMA),
+            ("ns", true, &BACKUP_NAMESPACE_SCHEMA),
             ("backup-type", false, &BACKUP_TYPE_SCHEMA),
-            ("backup-id", false,  &BACKUP_ID_SCHEMA),
+            ("backup-id", false, &BACKUP_ID_SCHEMA),
             ("backup-time", false, &BACKUP_TIME_SCHEMA),
             ("file-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA),
         ]),
-    )
-).access(None, &Permission::Privilege(
-    &["datastore", "{store}"],
-    PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP,
-    true)
+    ),
+)
+.access(
+    Some(
+        "Requires on /datastore/{store}[/{namespace}] either DATASTORE_READ for any or \
+        DATASTORE_BACKUP and being the owner of the group",
+    ),
+    &Permission::Anybody,
 );
 
 pub fn download_file(
@@ -1112,24 +1244,33 @@ pub fn download_file(
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
 ) -> ApiResponseFuture {
-
     async move {
+        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
         let store = required_string_param(&param, "store")?;
-        let datastore = DataStore::lookup_datastore(store)?;
+        let backup_ns = optional_ns_param(&param)?;
 
-        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+        let backup_dir: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
+        let datastore = check_privs_and_load_store(
+            &store,
+            &backup_ns,
+            &auth_id,
+            PRIV_DATASTORE_READ,
+            PRIV_DATASTORE_BACKUP,
+            Some(Operation::Read),
+            &backup_dir.group,
+        )?;
 
         let file_name = required_string_param(&param, "file-name")?.to_owned();
 
-        let backup_type = required_string_param(&param, "backup-type")?;
-        let backup_id = required_string_param(&param, "backup-id")?;
-        let backup_time = required_integer_param(&param, "backup-time")?;
-
-        let backup_dir = BackupDir::new(backup_type, backup_id, backup_time)?;
-
-        check_priv_or_backup_owner(&datastore, backup_dir.group(), &auth_id, PRIV_DATASTORE_READ)?;
+        println!(
+            "Download {} from {} ({}/{})",
+            file_name,
+            print_store_and_ns(&store, &backup_ns),
+            backup_dir,
+            file_name
+        );
 
-        println!("Download {} from {} ({}/{})", file_name, store, backup_dir, file_name);
+        let backup_dir = datastore.backup_dir(backup_ns, backup_dir)?;
 
         let mut path = datastore.base_path();
         path.push(backup_dir.relative_path());
@@ -1139,21 +1280,23 @@ pub fn download_file(
             .await
             .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
 
-        let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
-            .map_ok(|bytes| bytes.freeze())
-            .map_err(move |err| {
-                eprintln!("error during streaming of '{:?}' - {}", &path, err);
-                err
-            });
+        let payload =
+            tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
+                .map_ok(|bytes| bytes.freeze())
+                .map_err(move |err| {
+                    eprintln!("error during streaming of '{:?}' - {}", &path, err);
+                    err
+                });
         let body = Body::wrap_stream(payload);
 
         // fixme: set other headers ?
         Ok(Response::builder()
-           .status(StatusCode::OK)
-           .header(header::CONTENT_TYPE, "application/octet-stream")
-           .body(body)
-           .unwrap())
-    }.boxed()
+            .status(StatusCode::OK)
+            .header(header::CONTENT_TYPE, "application/octet-stream")
+            .body(body)
+            .unwrap())
+    }
+    .boxed()
 }
 
 #[sortable]
@@ -1163,16 +1306,20 @@ pub const API_METHOD_DOWNLOAD_FILE_DECODED: ApiMethod = ApiMethod::new(
         "Download single decoded file from backup snapshot. Only works if it's not encrypted.",
         &sorted!([
             ("store", false, &DATASTORE_SCHEMA),
+            ("ns", true, &BACKUP_NAMESPACE_SCHEMA),
             ("backup-type", false, &BACKUP_TYPE_SCHEMA),
-            ("backup-id", false,  &BACKUP_ID_SCHEMA),
+            ("backup-id", false, &BACKUP_ID_SCHEMA),
             ("backup-time", false, &BACKUP_TIME_SCHEMA),
             ("file-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA),
         ]),
-    )
-).access(None, &Permission::Privilege(
-    &["datastore", "{store}"],
-    PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP,
-    true)
+    ),
+)
+.access(
+    Some(
+        "Requires on /datastore/{store}[/{namespace}] either DATASTORE_READ for any or \
+        DATASTORE_BACKUP and being the owner of the group",
+    ),
+    &Permission::Anybody,
 );
 
 pub fn download_file_decoded(
@@ -1182,31 +1329,39 @@ pub fn download_file_decoded(
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
 ) -> ApiResponseFuture {
-
     async move {
+        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
         let store = required_string_param(&param, "store")?;
-        let datastore = DataStore::lookup_datastore(store)?;
+        let backup_ns = optional_ns_param(&param)?;
 
-        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+        let backup_dir_api: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
+        let datastore = check_privs_and_load_store(
+            &store,
+            &backup_ns,
+            &auth_id,
+            PRIV_DATASTORE_READ,
+            PRIV_DATASTORE_BACKUP,
+            Some(Operation::Read),
+            &backup_dir_api.group,
+        )?;
 
         let file_name = required_string_param(&param, "file-name")?.to_owned();
+        let backup_dir = datastore.backup_dir(backup_ns.clone(), backup_dir_api.clone())?;
 
-        let backup_type = required_string_param(&param, "backup-type")?;
-        let backup_id = required_string_param(&param, "backup-id")?;
-        let backup_time = required_integer_param(&param, "backup-time")?;
-
-        let backup_dir = BackupDir::new(backup_type, backup_id, backup_time)?;
-
-        check_priv_or_backup_owner(&datastore, backup_dir.group(), &auth_id, PRIV_DATASTORE_READ)?;
-
-        let (manifest, files) = read_backup_index(&datastore, &backup_dir)?;
+        let (manifest, files) = read_backup_index(&backup_dir)?;
         for file in files {
             if file.filename == file_name && file.crypt_mode == Some(CryptMode::Encrypt) {
                 bail!("cannot decode '{}' - is encrypted", file_name);
             }
         }
 
-        println!("Download {} from {} ({}/{})", file_name, store, backup_dir, file_name);
+        println!(
+            "Download {} from {} ({}/{})",
+            file_name,
+            print_store_and_ns(&store, &backup_ns),
+            backup_dir_api,
+            file_name
+        );
 
         let mut path = datastore.base_path();
         path.push(backup_dir.relative_path());
@@ -1216,34 +1371,38 @@ pub fn download_file_decoded(
 
         let body = match extension {
             "didx" => {
-                let index = DynamicIndexReader::open(&path)
-                    .map_err(|err| format_err!("unable to read dynamic index '{:?}' - {}", &path, err))?;
+                let index = DynamicIndexReader::open(&path).map_err(|err| {
+                    format_err!("unable to read dynamic index '{:?}' - {}", &path, err)
+                })?;
                 let (csum, size) = index.compute_csum();
                 manifest.verify_file(&file_name, &csum, size)?;
 
                 let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None);
                 let reader = CachedChunkReader::new(chunk_reader, index, 1).seekable();
-                Body::wrap_stream(AsyncReaderStream::new(reader)
-                    .map_err(move |err| {
-                        eprintln!("error during streaming of '{:?}' - {}", path, err);
-                        err
-                    }))
-            },
+                Body::wrap_stream(AsyncReaderStream::new(reader).map_err(move |err| {
+                    eprintln!("error during streaming of '{:?}' - {}", path, err);
+                    err
+                }))
+            }
             "fidx" => {
-                let index = FixedIndexReader::open(&path)
-                    .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", &path, err))?;
+                let index = FixedIndexReader::open(&path).map_err(|err| {
+                    format_err!("unable to read fixed index '{:?}' - {}", &path, err)
+                })?;
 
                 let (csum, size) = index.compute_csum();
                 manifest.verify_file(&file_name, &csum, size)?;
 
                 let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None);
                 let reader = CachedChunkReader::new(chunk_reader, index, 1).seekable();
-                Body::wrap_stream(AsyncReaderStream::with_buffer_size(reader, 4*1024*1024)
-                    .map_err(move |err| {
-                        eprintln!("error during streaming of '{:?}' - {}", path, err);
-                        err
-                    }))
-            },
+                Body::wrap_stream(
+                    AsyncReaderStream::with_buffer_size(reader, 4 * 1024 * 1024).map_err(
+                        move |err| {
+                            eprintln!("error during streaming of '{:?}' - {}", path, err);
+                            err
+                        },
+                    ),
+                )
+            }
             "blob" => {
                 let file = std::fs::File::open(&path)
                     .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
@@ -1251,25 +1410,27 @@ pub fn download_file_decoded(
                 // FIXME: load full blob to verify index checksum?
 
                 Body::wrap_stream(
-                    WrappedReaderStream::new(DataBlobReader::new(file, None)?)
-                        .map_err(move |err| {
+                    WrappedReaderStream::new(DataBlobReader::new(file, None)?).map_err(
+                        move |err| {
                             eprintln!("error during streaming of '{:?}' - {}", path, err);
                             err
-                        })
+                        },
+                    ),
                 )
-            },
+            }
             extension => {
                 bail!("cannot download '{}' files", extension);
-            },
+            }
         };
 
         // fixme: set other headers ?
         Ok(Response::builder()
-           .status(StatusCode::OK)
-           .header(header::CONTENT_TYPE, "application/octet-stream")
-           .body(body)
-           .unwrap())
-    }.boxed()
+            .status(StatusCode::OK)
+            .header(header::CONTENT_TYPE, "application/octet-stream")
+            .body(body)
+            .unwrap())
+    }
+    .boxed()
 }
 
 #[sortable]
@@ -1279,14 +1440,16 @@ pub const API_METHOD_UPLOAD_BACKUP_LOG: ApiMethod = ApiMethod::new(
         "Upload the client backup log file into a backup snapshot ('client.log.blob').",
         &sorted!([
             ("store", false, &DATASTORE_SCHEMA),
+            ("ns", true, &BACKUP_NAMESPACE_SCHEMA),
             ("backup-type", false, &BACKUP_TYPE_SCHEMA),
             ("backup-id", false, &BACKUP_ID_SCHEMA),
             ("backup-time", false, &BACKUP_TIME_SCHEMA),
         ]),
-    )
-).access(
+    ),
+)
+.access(
     Some("Only the backup creator/owner is allowed to do this."),
-    &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_BACKUP, false)
+    &Permission::Anybody,
 );
 
 pub fn upload_backup_log(
@@ -1296,33 +1459,37 @@ pub fn upload_backup_log(
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
 ) -> ApiResponseFuture {
-
     async move {
+        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
         let store = required_string_param(&param, "store")?;
-        let datastore = DataStore::lookup_datastore(store)?;
-
-        let file_name =  CLIENT_LOG_BLOB_NAME;
-
-        let backup_type = required_string_param(&param, "backup-type")?;
-        let backup_id = required_string_param(&param, "backup-id")?;
-        let backup_time = required_integer_param(&param, "backup-time")?;
+        let backup_ns = optional_ns_param(&param)?;
 
-        let backup_dir = BackupDir::new(backup_type, backup_id, backup_time)?;
-
-        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-        let owner = datastore.get_owner(backup_dir.group())?;
-        check_backup_owner(&owner, &auth_id)?;
+        let backup_dir_api: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
 
-        let mut path = datastore.base_path();
-        path.push(backup_dir.relative_path());
+        let datastore = check_privs_and_load_store(
+            &store,
+            &backup_ns,
+            &auth_id,
+            0,
+            PRIV_DATASTORE_BACKUP,
+            Some(Operation::Write),
+            &backup_dir_api.group,
+        )?;
+        let backup_dir = datastore.backup_dir(backup_ns.clone(), backup_dir_api.clone())?;
+
+        let file_name = CLIENT_LOG_BLOB_NAME;
+
+        let mut path = backup_dir.full_path();
         path.push(&file_name);
 
         if path.exists() {
             bail!("backup already contains a log.");
         }
 
-        println!("Upload backup log to {}/{}/{}/{}/{}", store,
-                 backup_type, backup_id, backup_dir.backup_time_string(), file_name);
+        println!(
+            "Upload backup log to {} {backup_dir_api}/{file_name}",
+            print_store_and_ns(&store, &backup_ns),
+        );
 
         let data = req_body
             .map_err(Error::from)
@@ -1339,23 +1506,21 @@ pub fn upload_backup_log(
 
         // fixme: use correct formatter
         Ok(formatter::JSON_FORMATTER.format_data(Value::Null, &*rpcenv))
-    }.boxed()
+    }
+    .boxed()
 }
 
 #[api(
     input: {
         properties: {
-            store: {
-                schema: DATASTORE_SCHEMA,
-            },
-            "backup-type": {
-                schema: BACKUP_TYPE_SCHEMA,
-            },
-            "backup-id": {
-                schema: BACKUP_ID_SCHEMA,
+            store: { schema: DATASTORE_SCHEMA },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
             },
-            "backup-time": {
-                schema: BACKUP_TIME_SCHEMA,
+            backup_dir: {
+                type: pbs_api_types::BackupDir,
+                flatten: true,
             },
             "filepath": {
                 description: "Base64 encoded path.",
@@ -1364,29 +1529,37 @@ pub fn upload_backup_log(
         },
     },
     access: {
-        permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP, true),
+        description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_READ for any or \
+            DATASTORE_BACKUP and being the owner of the group",
+        permission: &Permission::Anybody,
     },
 )]
 /// Get the entries of the given path of the catalog
 pub fn catalog(
     store: String,
-    backup_type: String,
-    backup_id: String,
-    backup_time: i64,
+    ns: Option<BackupNamespace>,
+    backup_dir: pbs_api_types::BackupDir,
     filepath: String,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Vec<ArchiveEntry>, Error> {
-    let datastore = DataStore::lookup_datastore(&store)?;
-
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let ns = ns.unwrap_or_default();
+
+    let datastore = check_privs_and_load_store(
+        &store,
+        &ns,
+        &auth_id,
+        PRIV_DATASTORE_READ,
+        PRIV_DATASTORE_BACKUP,
+        Some(Operation::Read),
+        &backup_dir.group,
+    )?;
 
-    let backup_dir = BackupDir::new(backup_type, backup_id, backup_time)?;
-
-    check_priv_or_backup_owner(&datastore, backup_dir.group(), &auth_id, PRIV_DATASTORE_READ)?;
+    let backup_dir = datastore.backup_dir(ns, backup_dir)?;
 
     let file_name = CATALOG_NAME;
 
-    let (manifest, files) = read_backup_index(&datastore, &backup_dir)?;
+    let (manifest, files) = read_backup_index(&backup_dir)?;
     for file in files {
         if file.filename == file_name && file.crypt_mode == Some(CryptMode::Encrypt) {
             bail!("cannot decode '{}' - is encrypted", file_name);
@@ -1401,7 +1574,7 @@ pub fn catalog(
         .map_err(|err| format_err!("unable to read dynamic index '{:?}' - {}", &path, err))?;
 
     let (csum, size) = index.compute_csum();
-    manifest.verify_file(&file_name, &csum, size)?;
+    manifest.verify_file(file_name, &csum, size)?;
 
     let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None);
     let reader = BufferedDynamicReader::new(index, chunk_reader);
@@ -1424,16 +1597,20 @@ pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new(
         "Download single file from pxar file of a backup snapshot. Only works if it's not encrypted.",
         &sorted!([
             ("store", false, &DATASTORE_SCHEMA),
+            ("ns", true, &BACKUP_NAMESPACE_SCHEMA),
             ("backup-type", false, &BACKUP_TYPE_SCHEMA),
             ("backup-id", false,  &BACKUP_ID_SCHEMA),
             ("backup-time", false, &BACKUP_TIME_SCHEMA),
             ("filepath", false, &StringSchema::new("Base64 encoded path").schema()),
+            ("tar", true, &BooleanSchema::new("Download as .tar.zst").schema()),
         ]),
     )
-).access(None, &Permission::Privilege(
-    &["datastore", "{store}"],
-    PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP,
-    true)
+).access(
+    Some(
+        "Requires on /datastore/{store}[/{namespace}] either DATASTORE_READ for any or \
+        DATASTORE_BACKUP and being the owner of the group",
+    ),
+    &Permission::Anybody,
 );
 
 pub fn pxar_file_download(
@@ -1443,22 +1620,27 @@ pub fn pxar_file_download(
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
 ) -> ApiResponseFuture {
-
     async move {
-        let store = required_string_param(&param, "store")?;
-        let datastore = DataStore::lookup_datastore(&store)?;
-
         let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+        let store = required_string_param(&param, "store")?;
+        let ns = optional_ns_param(&param)?;
 
-        let filepath = required_string_param(&param, "filepath")?.to_owned();
+        let backup_dir: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
+        let datastore = check_privs_and_load_store(
+            &store,
+            &ns,
+            &auth_id,
+            PRIV_DATASTORE_READ,
+            PRIV_DATASTORE_BACKUP,
+            Some(Operation::Read),
+            &backup_dir.group,
+        )?;
 
-        let backup_type = required_string_param(&param, "backup-type")?;
-        let backup_id = required_string_param(&param, "backup-id")?;
-        let backup_time = required_integer_param(&param, "backup-time")?;
+        let backup_dir = datastore.backup_dir(ns, backup_dir)?;
 
-        let backup_dir = BackupDir::new(backup_type, backup_id, backup_time)?;
+        let filepath = required_string_param(&param, "filepath")?.to_owned();
 
-        check_priv_or_backup_owner(&datastore, backup_dir.group(), &auth_id, PRIV_DATASTORE_READ)?;
+        let tar = param["tar"].as_bool().unwrap_or(false);
 
         let mut components = base64::decode(&filepath)?;
         if !components.is_empty() && components[0] == b'/' {
@@ -1468,7 +1650,7 @@ pub fn pxar_file_download(
         let mut split = components.splitn(2, |c| *c == b'/');
         let pxar_name = std::str::from_utf8(split.next().unwrap())?;
         let file_path = split.next().unwrap_or(b"/");
-        let (manifest, files) = read_backup_index(&datastore, &backup_dir)?;
+        let (manifest, files) = read_backup_index(&backup_dir)?;
         for file in files {
             if file.filename == pxar_name && file.crypt_mode == Some(CryptMode::Encrypt) {
                 bail!("cannot decode '{}' - is encrypted", pxar_name);
@@ -1483,7 +1665,7 @@ pub fn pxar_file_download(
             .map_err(|err| format_err!("unable to read dynamic index '{:?}' - {}", &path, err))?;
 
         let (csum, size) = index.compute_csum();
-        manifest.verify_file(&pxar_name, &csum, size)?;
+        manifest.verify_file(pxar_name, &csum, size)?;
 
         let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None);
         let reader = BufferedDynamicReader::new(index, chunk_reader);
@@ -1494,7 +1676,8 @@ pub fn pxar_file_download(
         let root = decoder.open_root().await?;
         let path = OsStr::from_bytes(file_path).to_os_string();
         let file = root
-            .lookup(&path).await?
+            .lookup(&path)
+            .await?
             .ok_or_else(|| format_err!("error opening '{:?}'", path))?;
 
         let body = match file.kind() {
@@ -1507,34 +1690,49 @@ pub fn pxar_file_download(
             EntryKind::Hardlink(_) => Body::wrap_stream(
                 AsyncReaderStream::new(decoder.follow_hardlink(&file).await?.contents().await?)
                     .map_err(move |err| {
-                        eprintln!(
-                            "error during streaming of hardlink '{:?}' - {}",
-                            path, err
-                        );
+                        eprintln!("error during streaming of hardlink '{:?}' - {}", path, err);
                         err
                     }),
             ),
             EntryKind::Directory => {
-                let (sender, receiver) = tokio::sync::mpsc::channel(100);
+                let (sender, receiver) = tokio::sync::mpsc::channel::<Result<_, Error>>(100);
                 let channelwriter = AsyncChannelWriter::new(sender, 1024 * 1024);
-                proxmox_rest_server::spawn_internal_task(
-                    create_zip(channelwriter, decoder, path.clone(), false)
-                );
-                Body::wrap_stream(ReceiverStream::new(receiver).map_err(move |err| {
-                    eprintln!("error during streaming of zip '{:?}' - {}", path, err);
-                    err
-                }))
+                if tar {
+                    proxmox_rest_server::spawn_internal_task(create_tar(
+                        channelwriter,
+                        decoder,
+                        path.clone(),
+                        false,
+                    ));
+                    let zstdstream = ZstdEncoder::new(ReceiverStream::new(receiver))?;
+                    Body::wrap_stream(zstdstream.map_err(move |err| {
+                        eprintln!("error during streaming of tar.zst '{:?}' - {}", path, err);
+                        err
+                    }))
+                } else {
+                    proxmox_rest_server::spawn_internal_task(create_zip(
+                        channelwriter,
+                        decoder,
+                        path.clone(),
+                        false,
+                    ));
+                    Body::wrap_stream(ReceiverStream::new(receiver).map_err(move |err| {
+                        eprintln!("error during streaming of zip '{:?}' - {}", path, err);
+                        err
+                    }))
+                }
             }
             other => bail!("cannot download file of type {:?}", other),
         };
 
         // fixme: set other headers ?
         Ok(Response::builder()
-           .status(StatusCode::OK)
-           .header(header::CONTENT_TYPE, "application/octet-stream")
-           .body(body)
-           .unwrap())
-    }.boxed()
+            .status(StatusCode::OK)
+            .header(header::CONTENT_TYPE, "application/octet-stream")
+            .body(body)
+            .unwrap())
+    }
+    .boxed()
 }
 
 #[api(
@@ -1552,7 +1750,8 @@ pub fn pxar_file_download(
         },
     },
     access: {
-        permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP, true),
+        permission: &Permission::Privilege(
+            &["datastore", "{store}"], PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP, true),
     },
 )]
 /// Read datastore stats
@@ -1562,18 +1761,25 @@ pub fn get_rrd_stats(
     cf: RRDMode,
     _param: Value,
 ) -> Result<Value, Error> {
+    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
+    let disk_manager = crate::tools::disks::DiskManage::new();
+
+    let mut rrd_fields = vec![
+        "total",
+        "used",
+        "read_ios",
+        "read_bytes",
+        "write_ios",
+        "write_bytes",
+    ];
+
+    // we do not have io_ticks for zpools, so don't include them
+    match disk_manager.find_mounted_device(&datastore.base_path()) {
+        Ok(Some((fs_type, _, _))) if fs_type.as_str() == "zfs" => {}
+        _ => rrd_fields.push("io_ticks"),
+    };
 
-    create_value_from_rrd(
-        &format!("datastore/{}", store),
-        &[
-            "total", "used",
-            "read_ios", "read_bytes",
-            "write_ios", "write_bytes",
-            "io_ticks",
-        ],
-        timeframe,
-        cf,
-    )
+    create_value_from_rrd(&format!("datastore/{}", store), &rrd_fields, timeframe, cf)
 }
 
 #[api(
@@ -1582,47 +1788,76 @@ pub fn get_rrd_stats(
             store: {
                 schema: DATASTORE_SCHEMA,
             },
-            "backup-type": {
-                schema: BACKUP_TYPE_SCHEMA,
+        },
+    },
+    access: {
+        permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_AUDIT, true),
+    },
+)]
+/// Read datastore stats
+pub fn get_active_operations(store: String, _param: Value) -> Result<Value, Error> {
+    let active_operations = task_tracking::get_active_operations(&store)?;
+    Ok(json!({
+        "read": active_operations.read,
+        "write": active_operations.write,
+    }))
+}
+
+#[api(
+    input: {
+        properties: {
+            store: { schema: DATASTORE_SCHEMA },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
             },
-            "backup-id": {
-                schema: BACKUP_ID_SCHEMA,
+            backup_group: {
+                type: pbs_api_types::BackupGroup,
+                flatten: true,
             },
         },
     },
     access: {
-        permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP, true),
+        permission: &Permission::Anybody,
+        description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_AUDIT for any \
+            or DATASTORE_BACKUP and being the owner of the group",
     },
 )]
 /// Get "notes" for a backup group
 pub fn get_group_notes(
     store: String,
-    backup_type: String,
-    backup_id: String,
+    ns: Option<BackupNamespace>,
+    backup_group: pbs_api_types::BackupGroup,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<String, Error> {
-    let datastore = DataStore::lookup_datastore(&store)?;
-
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let backup_group = BackupGroup::new(backup_type, backup_id);
-
-    check_priv_or_backup_owner(&datastore, &backup_group, &auth_id, PRIV_DATASTORE_AUDIT)?;
+    let ns = ns.unwrap_or_default();
+
+    let datastore = check_privs_and_load_store(
+        &store,
+        &ns,
+        &auth_id,
+        PRIV_DATASTORE_AUDIT,
+        PRIV_DATASTORE_BACKUP,
+        Some(Operation::Read),
+        &backup_group,
+    )?;
 
-    let note_path = get_group_note_path(&datastore, &backup_group);
+    let note_path = get_group_note_path(&datastore, &ns, &backup_group);
     Ok(file_read_optional_string(note_path)?.unwrap_or_else(|| "".to_owned()))
 }
 
 #[api(
     input: {
         properties: {
-            store: {
-                schema: DATASTORE_SCHEMA,
-            },
-            "backup-type": {
-                schema: BACKUP_TYPE_SCHEMA,
+            store: { schema: DATASTORE_SCHEMA },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
             },
-            "backup-id": {
-                schema: BACKUP_ID_SCHEMA,
+            backup_group: {
+                type: pbs_api_types::BackupGroup,
+                flatten: true,
             },
             notes: {
                 description: "A multiline text.",
@@ -1630,27 +1865,33 @@ pub fn get_group_notes(
         },
     },
     access: {
-        permission: &Permission::Privilege(&["datastore", "{store}"],
-                                           PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_BACKUP,
-                                           true),
+        permission: &Permission::Anybody,
+        description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_MODIFY for any \
+            or DATASTORE_BACKUP and being the owner of the group",
     },
 )]
 /// Set "notes" for a backup group
 pub fn set_group_notes(
     store: String,
-    backup_type: String,
-    backup_id: String,
+    ns: Option<BackupNamespace>,
+    backup_group: pbs_api_types::BackupGroup,
     notes: String,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<(), Error> {
-    let datastore = DataStore::lookup_datastore(&store)?;
-
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let backup_group = BackupGroup::new(backup_type, backup_id);
-
-    check_priv_or_backup_owner(&datastore, &backup_group, &auth_id, PRIV_DATASTORE_MODIFY)?;
+    let ns = ns.unwrap_or_default();
+
+    let datastore = check_privs_and_load_store(
+        &store,
+        &ns,
+        &auth_id,
+        PRIV_DATASTORE_MODIFY,
+        PRIV_DATASTORE_BACKUP,
+        Some(Operation::Write),
+        &backup_group,
+    )?;
 
-    let note_path = get_group_note_path(&datastore, &backup_group);
+    let note_path = get_group_note_path(&datastore, &ns, &backup_group);
     replace_file(note_path, notes.as_bytes(), CreateOptions::new(), false)?;
 
     Ok(())
@@ -1659,44 +1900,48 @@ pub fn set_group_notes(
 #[api(
     input: {
         properties: {
-            store: {
-                schema: DATASTORE_SCHEMA,
-            },
-            "backup-type": {
-                schema: BACKUP_TYPE_SCHEMA,
-            },
-            "backup-id": {
-                schema: BACKUP_ID_SCHEMA,
+            store: { schema: DATASTORE_SCHEMA },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
             },
-            "backup-time": {
-                schema: BACKUP_TIME_SCHEMA,
+            backup_dir: {
+                type: pbs_api_types::BackupDir,
+                flatten: true,
             },
         },
     },
     access: {
-        permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP, true),
+        permission: &Permission::Anybody,
+        description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_AUDIT for any \
+            or DATASTORE_BACKUP and being the owner of the group",
     },
 )]
 /// Get "notes" for a specific backup
 pub fn get_notes(
     store: String,
-    backup_type: String,
-    backup_id: String,
-    backup_time: i64,
+    ns: Option<BackupNamespace>,
+    backup_dir: pbs_api_types::BackupDir,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<String, Error> {
-    let datastore = DataStore::lookup_datastore(&store)?;
-
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let backup_dir = BackupDir::new(backup_type, backup_id, backup_time)?;
+    let ns = ns.unwrap_or_default();
+
+    let datastore = check_privs_and_load_store(
+        &store,
+        &ns,
+        &auth_id,
+        PRIV_DATASTORE_AUDIT,
+        PRIV_DATASTORE_BACKUP,
+        Some(Operation::Read),
+        &backup_dir.group,
+    )?;
 
-    check_priv_or_backup_owner(&datastore, backup_dir.group(), &auth_id, PRIV_DATASTORE_AUDIT)?;
+    let backup_dir = datastore.backup_dir(ns, backup_dir)?;
 
-    let (manifest, _) = datastore.load_manifest(&backup_dir)?;
+    let (manifest, _) = backup_dir.load_manifest()?;
 
-    let notes = manifest.unprotected["notes"]
-        .as_str()
-        .unwrap_or("");
+    let notes = manifest.unprotected["notes"].as_str().unwrap_or("");
 
     Ok(String::from(notes))
 }
@@ -1704,17 +1949,14 @@ pub fn get_notes(
 #[api(
     input: {
         properties: {
-            store: {
-                schema: DATASTORE_SCHEMA,
-            },
-            "backup-type": {
-                schema: BACKUP_TYPE_SCHEMA,
-            },
-            "backup-id": {
-                schema: BACKUP_ID_SCHEMA,
+            store: { schema: DATASTORE_SCHEMA },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
             },
-            "backup-time": {
-                schema: BACKUP_TIME_SCHEMA,
+            backup_dir: {
+                type: pbs_api_types::BackupDir,
+                flatten: true,
             },
             notes: {
                 description: "A multiline text.",
@@ -1722,30 +1964,39 @@ pub fn get_notes(
         },
     },
     access: {
-        permission: &Permission::Privilege(&["datastore", "{store}"],
-                                           PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_BACKUP,
-                                           true),
+        permission: &Permission::Anybody,
+        description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_MODIFY for any \
+            or DATASTORE_BACKUP and being the owner of the group",
     },
 )]
 /// Set "notes" for a specific backup
 pub fn set_notes(
     store: String,
-    backup_type: String,
-    backup_id: String,
-    backup_time: i64,
+    ns: Option<BackupNamespace>,
+    backup_dir: pbs_api_types::BackupDir,
     notes: String,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<(), Error> {
-    let datastore = DataStore::lookup_datastore(&store)?;
-
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let backup_dir = BackupDir::new(backup_type, backup_id, backup_time)?;
+    let ns = ns.unwrap_or_default();
+
+    let datastore = check_privs_and_load_store(
+        &store,
+        &ns,
+        &auth_id,
+        PRIV_DATASTORE_MODIFY,
+        PRIV_DATASTORE_BACKUP,
+        Some(Operation::Write),
+        &backup_dir.group,
+    )?;
 
-    check_priv_or_backup_owner(&datastore, backup_dir.group(), &auth_id, PRIV_DATASTORE_MODIFY)?;
+    let backup_dir = datastore.backup_dir(ns, backup_dir)?;
 
-    datastore.update_manifest(&backup_dir,|manifest| {
-        manifest.unprotected["notes"] = notes.into();
-    }).map_err(|err| format_err!("unable to update manifest blob - {}", err))?;
+    backup_dir
+        .update_manifest(|manifest| {
+            manifest.unprotected["notes"] = notes.into();
+        })
+        .map_err(|err| format_err!("unable to update manifest blob - {}", err))?;
 
     Ok(())
 }
@@ -1753,58 +2004,58 @@ pub fn set_notes(
 #[api(
     input: {
         properties: {
-            store: {
-                schema: DATASTORE_SCHEMA,
-            },
-            "backup-type": {
-                schema: BACKUP_TYPE_SCHEMA,
-            },
-            "backup-id": {
-                schema: BACKUP_ID_SCHEMA,
+            store: { schema: DATASTORE_SCHEMA },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
             },
-            "backup-time": {
-                schema: BACKUP_TIME_SCHEMA,
+            backup_dir: {
+                type: pbs_api_types::BackupDir,
+                flatten: true,
             },
         },
     },
     access: {
-        permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP, true),
+        permission: &Permission::Anybody,
+        description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_AUDIT for any \
+            or DATASTORE_BACKUP and being the owner of the group",
     },
 )]
 /// Query protection for a specific backup
 pub fn get_protection(
     store: String,
-    backup_type: String,
-    backup_id: String,
-    backup_time: i64,
+    ns: Option<BackupNamespace>,
+    backup_dir: pbs_api_types::BackupDir,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<bool, Error> {
-    let datastore = DataStore::lookup_datastore(&store)?;
-
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let backup_dir = BackupDir::new(backup_type, backup_id, backup_time)?;
-
-    check_priv_or_backup_owner(&datastore, backup_dir.group(), &auth_id, PRIV_DATASTORE_AUDIT)?;
+    let ns = ns.unwrap_or_default();
+    let datastore = check_privs_and_load_store(
+        &store,
+        &ns,
+        &auth_id,
+        PRIV_DATASTORE_AUDIT,
+        PRIV_DATASTORE_BACKUP,
+        Some(Operation::Read),
+        &backup_dir.group,
+    )?;
 
-    let protected_path = backup_dir.protected_file(datastore.base_path());
+    let backup_dir = datastore.backup_dir(ns, backup_dir)?;
 
-    Ok(protected_path.exists())
+    Ok(backup_dir.is_protected())
 }
 
 #[api(
     input: {
         properties: {
-            store: {
-                schema: DATASTORE_SCHEMA,
-            },
-            "backup-type": {
-                schema: BACKUP_TYPE_SCHEMA,
-            },
-            "backup-id": {
-                schema: BACKUP_ID_SCHEMA,
+            store: { schema: DATASTORE_SCHEMA },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
             },
-            "backup-time": {
-                schema: BACKUP_TIME_SCHEMA,
+            backup_dir: {
+                type: pbs_api_types::BackupDir,
+                flatten: true,
             },
             protected: {
                 description: "Enable/disable protection.",
@@ -1812,26 +2063,32 @@ pub fn get_protection(
         },
     },
     access: {
-        permission: &Permission::Privilege(&["datastore", "{store}"],
-                                           PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_BACKUP,
-                                           true),
+        permission: &Permission::Anybody,
+        description: "Requires on /datastore/{store}[/{namespace}] either DATASTORE_MODIFY for any \
+            or DATASTORE_BACKUP and being the owner of the group",
     },
 )]
 /// En- or disable protection for a specific backup
 pub fn set_protection(
     store: String,
-    backup_type: String,
-    backup_id: String,
-    backup_time: i64,
+    ns: Option<BackupNamespace>,
+    backup_dir: pbs_api_types::BackupDir,
     protected: bool,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<(), Error> {
-    let datastore = DataStore::lookup_datastore(&store)?;
-
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
-    let backup_dir = BackupDir::new(backup_type, backup_id, backup_time)?;
+    let ns = ns.unwrap_or_default();
+    let datastore = check_privs_and_load_store(
+        &store,
+        &ns,
+        &auth_id,
+        PRIV_DATASTORE_MODIFY,
+        PRIV_DATASTORE_BACKUP,
+        Some(Operation::Write),
+        &backup_dir.group,
+    )?;
 
-    check_priv_or_backup_owner(&datastore, backup_dir.group(), &auth_id, PRIV_DATASTORE_MODIFY)?;
+    let backup_dir = datastore.backup_dir(ns, backup_dir)?;
 
     datastore.update_protection(&backup_dir, protected)
 }
@@ -1839,14 +2096,14 @@ pub fn set_protection(
 #[api(
     input: {
         properties: {
-            store: {
-                schema: DATASTORE_SCHEMA,
-            },
-            "backup-type": {
-                schema: BACKUP_TYPE_SCHEMA,
+            store: { schema: DATASTORE_SCHEMA },
+            ns: {
+                type: BackupNamespace,
+                optional: true,
             },
-            "backup-id": {
-                schema: BACKUP_ID_SCHEMA,
+            backup_group: {
+                type: pbs_api_types::BackupGroup,
+                flatten: true,
             },
             "new-owner": {
                 type: Authid,
@@ -1855,80 +2112,82 @@ pub fn set_protection(
     },
     access: {
         permission: &Permission::Anybody,
-        description: "Datastore.Modify on whole datastore, or changing ownership between user and a user's token for owned backups with Datastore.Backup"
+        description: "Datastore.Modify on whole datastore, or changing ownership between user and \
+            a user's token for owned backups with Datastore.Backup"
     },
 )]
 /// Change owner of a backup group
 pub fn set_backup_owner(
     store: String,
-    backup_type: String,
-    backup_id: String,
+    ns: Option<BackupNamespace>,
+    backup_group: pbs_api_types::BackupGroup,
     new_owner: Authid,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<(), Error> {
-
-    let datastore = DataStore::lookup_datastore(&store)?;
-
-    let backup_group = BackupGroup::new(backup_type, backup_id);
-
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let ns = ns.unwrap_or_default();
+    let owner_check_required = check_ns_privs_full(
+        &store,
+        &ns,
+        &auth_id,
+        PRIV_DATASTORE_MODIFY,
+        PRIV_DATASTORE_BACKUP,
+    )?;
 
-    let user_info = CachedUserInfo::new()?;
+    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
 
-    let privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);
+    let backup_group = datastore.backup_group(ns, backup_group);
 
-    let allowed = if (privs & PRIV_DATASTORE_MODIFY) != 0 {
-        // High-privilege user/token
-        true
-    } else if (privs & PRIV_DATASTORE_BACKUP) != 0 {
-        let owner = datastore.get_owner(&backup_group)?;
+    if owner_check_required {
+        let owner = backup_group.get_owner()?;
 
-        match (owner.is_token(), new_owner.is_token()) {
+        let allowed = match (owner.is_token(), new_owner.is_token()) {
             (true, true) => {
                 // API token to API token, owned by same user
                 let owner = owner.user();
                 let new_owner = new_owner.user();
                 owner == new_owner && Authid::from(owner.clone()) == auth_id
-            },
+            }
             (true, false) => {
                 // API token to API token owner
-                Authid::from(owner.user().clone()) == auth_id
-                    && new_owner == auth_id
-            },
+                Authid::from(owner.user().clone()) == auth_id && new_owner == auth_id
+            }
             (false, true) => {
                 // API token owner to API token
-                owner == auth_id
-                    && Authid::from(new_owner.user().clone()) == auth_id
-            },
+                owner == auth_id && Authid::from(new_owner.user().clone()) == auth_id
+            }
             (false, false) => {
                 // User to User, not allowed for unprivileged users
                 false
-            },
-        }
-    } else {
-        false
-    };
+            }
+        };
 
-    if !allowed {
-        return Err(http_err!(UNAUTHORIZED,
-                  "{} does not have permission to change owner of backup group '{}' to {}",
-                  auth_id,
-                  backup_group,
-                  new_owner,
-        ));
+        if !allowed {
+            return Err(http_err!(
+                UNAUTHORIZED,
+                "{} does not have permission to change owner of backup group '{}' to {}",
+                auth_id,
+                backup_group.group(),
+                new_owner,
+            ));
+        }
     }
 
+    let user_info = CachedUserInfo::new()?;
+
     if !user_info.is_active_auth_id(&new_owner) {
-        bail!("{} '{}' is inactive or non-existent",
-              if new_owner.is_token() {
-                  "API token".to_string()
-              } else {
-                  "user".to_string()
-              },
-              new_owner);
+        bail!(
+            "{} '{}' is inactive or non-existent",
+            if new_owner.is_token() {
+                "API token".to_string()
+            } else {
+                "user".to_string()
+            },
+            new_owner
+        );
     }
 
-    datastore.set_owner(&backup_group, &new_owner, true)?;
+    backup_group.set_owner(&new_owner, true)?;
 
     Ok(())
 }
@@ -1936,108 +2195,86 @@ pub fn set_backup_owner(
 #[sortable]
 const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
     (
-        "catalog",
-        &Router::new()
-            .get(&API_METHOD_CATALOG)
+        "active-operations",
+        &Router::new().get(&API_METHOD_GET_ACTIVE_OPERATIONS),
     ),
+    ("catalog", &Router::new().get(&API_METHOD_CATALOG)),
     (
         "change-owner",
-        &Router::new()
-            .post(&API_METHOD_SET_BACKUP_OWNER)
+        &Router::new().post(&API_METHOD_SET_BACKUP_OWNER),
     ),
     (
         "download",
-        &Router::new()
-            .download(&API_METHOD_DOWNLOAD_FILE)
+        &Router::new().download(&API_METHOD_DOWNLOAD_FILE),
     ),
     (
         "download-decoded",
-        &Router::new()
-            .download(&API_METHOD_DOWNLOAD_FILE_DECODED)
-    ),
-    (
-        "files",
-        &Router::new()
-            .get(&API_METHOD_LIST_SNAPSHOT_FILES)
+        &Router::new().download(&API_METHOD_DOWNLOAD_FILE_DECODED),
     ),
+    ("files", &Router::new().get(&API_METHOD_LIST_SNAPSHOT_FILES)),
     (
         "gc",
         &Router::new()
             .get(&API_METHOD_GARBAGE_COLLECTION_STATUS)
-            .post(&API_METHOD_START_GARBAGE_COLLECTION)
+            .post(&API_METHOD_START_GARBAGE_COLLECTION),
     ),
     (
         "group-notes",
         &Router::new()
             .get(&API_METHOD_GET_GROUP_NOTES)
-            .put(&API_METHOD_SET_GROUP_NOTES)
+            .put(&API_METHOD_SET_GROUP_NOTES),
     ),
     (
         "groups",
         &Router::new()
             .get(&API_METHOD_LIST_GROUPS)
-            .delete(&API_METHOD_DELETE_GROUP)
+            .delete(&API_METHOD_DELETE_GROUP),
+    ),
+    (
+        "namespace",
+        // FIXME: move into datastore:: sub-module?!
+        &crate::api2::admin::namespace::ROUTER,
     ),
     (
         "notes",
         &Router::new()
             .get(&API_METHOD_GET_NOTES)
-            .put(&API_METHOD_SET_NOTES)
+            .put(&API_METHOD_SET_NOTES),
     ),
     (
         "protected",
         &Router::new()
             .get(&API_METHOD_GET_PROTECTION)
-            .put(&API_METHOD_SET_PROTECTION)
-    ),
-    (
-        "prune",
-        &Router::new()
-            .post(&API_METHOD_PRUNE)
+            .put(&API_METHOD_SET_PROTECTION),
     ),
+    ("prune", &Router::new().post(&API_METHOD_PRUNE)),
     (
         "prune-datastore",
-        &Router::new()
-            .post(&API_METHOD_PRUNE_DATASTORE)
+        &Router::new().post(&API_METHOD_PRUNE_DATASTORE),
     ),
     (
         "pxar-file-download",
-        &Router::new()
-            .download(&API_METHOD_PXAR_FILE_DOWNLOAD)
-    ),
-    (
-        "rrd",
-        &Router::new()
-            .get(&API_METHOD_GET_RRD_STATS)
+        &Router::new().download(&API_METHOD_PXAR_FILE_DOWNLOAD),
     ),
+    ("rrd", &Router::new().get(&API_METHOD_GET_RRD_STATS)),
     (
         "snapshots",
         &Router::new()
             .get(&API_METHOD_LIST_SNAPSHOTS)
-            .delete(&API_METHOD_DELETE_SNAPSHOT)
-    ),
-    (
-        "status",
-        &Router::new()
-            .get(&API_METHOD_STATUS)
+            .delete(&API_METHOD_DELETE_SNAPSHOT),
     ),
+    ("status", &Router::new().get(&API_METHOD_STATUS)),
     (
         "upload-backup-log",
-        &Router::new()
-            .upload(&API_METHOD_UPLOAD_BACKUP_LOG)
-    ),
-    (
-        "verify",
-        &Router::new()
-            .post(&API_METHOD_VERIFY)
+        &Router::new().upload(&API_METHOD_UPLOAD_BACKUP_LOG),
     ),
+    ("verify", &Router::new().post(&API_METHOD_VERIFY)),
 ];
 
 const DATASTORE_INFO_ROUTER: Router = Router::new()
     .get(&list_subdirs_api_method!(DATASTORE_INFO_SUBDIRS))
     .subdirs(DATASTORE_INFO_SUBDIRS);
 
-
 pub const ROUTER: Router = Router::new()
     .get(&API_METHOD_GET_DATASTORE_LIST)
     .match_all("store", &DATASTORE_INFO_ROUTER);