use proxmox::api::api;
use proxmox::api::{ApiMethod, Router, RpcEnvironment, Permission};
-use crate::server::{WorkerTask, jobstate::Job};
+use crate::server::{WorkerTask, jobstate::Job, pull::pull_store};
use crate::backup::DataStore;
-use crate::client::{HttpClient, BackupRepository, pull::pull_store};
-use crate::api2::types::*;
+use crate::client::{HttpClient, BackupRepository};
+use crate::api2::types::{
+ DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, Authid,
+};
use crate::config::{
remote,
sync::SyncJobConfig,
worker.log(format!("Sync datastore '{}' from '{}/{}'",
sync_job.store, sync_job.remote, sync_job.remote_store));
- crate::client::pull::pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, sync_owner).await?;
+ pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, sync_owner).await?;
worker.log(format!("sync job '{}' end", &job_id));
mod backup_specification;
pub use backup_specification::*;
-pub mod pull;
-
/// Connect to localhost:8007 as root@pam
///
/// This automatically creates a ticket if run as 'root' user.
+++ /dev/null
-//! Sync datastore from remote server
-
-use std::collections::{HashMap, HashSet};
-use std::convert::TryFrom;
-use std::io::{Seek, SeekFrom};
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex};
-use std::time::SystemTime;
-
-use anyhow::{bail, format_err, Error};
-use serde_json::json;
-
-use proxmox::api::error::{HttpError, StatusCode};
-
-use pbs_api_types::{Authid, SnapshotListItem, GroupListItem};
-use pbs_datastore::{task_log, BackupInfo, BackupDir, BackupGroup, StoreProgress};
-use pbs_datastore::data_blob::DataBlob;
-use pbs_datastore::dynamic_index::DynamicIndexReader;
-use pbs_datastore::fixed_index::FixedIndexReader;
-use pbs_datastore::index::IndexFile;
-use pbs_datastore::manifest::{
- CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
-};
-use pbs_tools::sha::sha256;
-
-use crate::{
- backup::DataStore,
- client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader},
- server::WorkerTask,
- tools::ParallelHandler,
-};
-
-// fixme: implement filters
-// fixme: delete vanished groups
-// Todo: correctly lock backup groups
-
-async fn pull_index_chunks<I: IndexFile>(
- worker: &WorkerTask,
- chunk_reader: RemoteChunkReader,
- target: Arc<DataStore>,
- index: I,
- downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
- use futures::stream::{self, StreamExt, TryStreamExt};
-
- let start_time = SystemTime::now();
-
- let stream = stream::iter(
- (0..index.index_count())
- .map(|pos| index.chunk_info(pos).unwrap())
- .filter(|info| {
- let mut guard = downloaded_chunks.lock().unwrap();
- let done = guard.contains(&info.digest);
- if !done {
- // Note: We mark a chunk as downloaded before its actually downloaded
- // to avoid duplicate downloads.
- guard.insert(info.digest);
- }
- !done
- }),
- );
-
- let target2 = target.clone();
- let verify_pool = ParallelHandler::new(
- "sync chunk writer",
- 4,
- move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
- // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
- chunk.verify_unencrypted(size as usize, &digest)?;
- target2.insert_chunk(&chunk, &digest)?;
- Ok(())
- },
- );
-
- let verify_and_write_channel = verify_pool.channel();
-
- let bytes = Arc::new(AtomicUsize::new(0));
-
- stream
- .map(|info| {
- let target = Arc::clone(&target);
- let chunk_reader = chunk_reader.clone();
- let bytes = Arc::clone(&bytes);
- let verify_and_write_channel = verify_and_write_channel.clone();
-
- Ok::<_, Error>(async move {
- let chunk_exists = pbs_runtime::block_in_place(|| {
- target.cond_touch_chunk(&info.digest, false)
- })?;
- if chunk_exists {
- //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
- return Ok::<_, Error>(());
- }
- //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
- let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
- let raw_size = chunk.raw_size() as usize;
-
- // decode, verify and write in a separate threads to maximize throughput
- pbs_runtime::block_in_place(|| {
- verify_and_write_channel.send((chunk, info.digest, info.size()))
- })?;
-
- bytes.fetch_add(raw_size, Ordering::SeqCst);
-
- Ok(())
- })
- })
- .try_buffer_unordered(20)
- .try_for_each(|_res| futures::future::ok(()))
- .await?;
-
- drop(verify_and_write_channel);
-
- verify_pool.complete()?;
-
- let elapsed = start_time.elapsed()?.as_secs_f64();
-
- let bytes = bytes.load(Ordering::SeqCst);
-
- worker.log(format!(
- "downloaded {} bytes ({:.2} MiB/s)",
- bytes,
- (bytes as f64) / (1024.0 * 1024.0 * elapsed)
- ));
-
- Ok(())
-}
-
-async fn download_manifest(
- reader: &BackupReader,
- filename: &std::path::Path,
-) -> Result<std::fs::File, Error> {
- let mut tmp_manifest_file = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .read(true)
- .open(&filename)?;
-
- reader
- .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
- .await?;
-
- tmp_manifest_file.seek(SeekFrom::Start(0))?;
-
- Ok(tmp_manifest_file)
-}
-
-fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
- if size != info.size {
- bail!(
- "wrong size for file '{}' ({} != {})",
- info.filename,
- info.size,
- size
- );
- }
-
- if csum != &info.csum {
- bail!("wrong checksum for file '{}'", info.filename);
- }
-
- Ok(())
-}
-
-async fn pull_single_archive(
- worker: &WorkerTask,
- reader: &BackupReader,
- chunk_reader: &mut RemoteChunkReader,
- tgt_store: Arc<DataStore>,
- snapshot: &BackupDir,
- archive_info: &FileInfo,
- downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
- let archive_name = &archive_info.filename;
- let mut path = tgt_store.base_path();
- path.push(snapshot.relative_path());
- path.push(archive_name);
-
- let mut tmp_path = path.clone();
- tmp_path.set_extension("tmp");
-
- worker.log(format!("sync archive {}", archive_name));
- let mut tmpfile = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .read(true)
- .open(&tmp_path)?;
-
- reader.download(archive_name, &mut tmpfile).await?;
-
- match archive_type(archive_name)? {
- ArchiveType::DynamicIndex => {
- let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
- format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
- })?;
- let (csum, size) = index.compute_csum();
- verify_archive(archive_info, &csum, size)?;
-
- pull_index_chunks(
- worker,
- chunk_reader.clone(),
- tgt_store.clone(),
- index,
- downloaded_chunks,
- )
- .await?;
- }
- ArchiveType::FixedIndex => {
- let index = FixedIndexReader::new(tmpfile).map_err(|err| {
- format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
- })?;
- let (csum, size) = index.compute_csum();
- verify_archive(archive_info, &csum, size)?;
-
- pull_index_chunks(
- worker,
- chunk_reader.clone(),
- tgt_store.clone(),
- index,
- downloaded_chunks,
- )
- .await?;
- }
- ArchiveType::Blob => {
- tmpfile.seek(SeekFrom::Start(0))?;
- let (csum, size) = sha256(&mut tmpfile)?;
- verify_archive(archive_info, &csum, size)?;
- }
- }
- if let Err(err) = std::fs::rename(&tmp_path, &path) {
- bail!("Atomic rename file {:?} failed - {}", path, err);
- }
- Ok(())
-}
-
-// Note: The client.log.blob is uploaded after the backup, so it is
-// not mentioned in the manifest.
-async fn try_client_log_download(
- worker: &WorkerTask,
- reader: Arc<BackupReader>,
- path: &std::path::Path,
-) -> Result<(), Error> {
- let mut tmp_path = path.to_owned();
- tmp_path.set_extension("tmp");
-
- let tmpfile = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .read(true)
- .open(&tmp_path)?;
-
- // Note: be silent if there is no log - only log successful download
- if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
- if let Err(err) = std::fs::rename(&tmp_path, &path) {
- bail!("Atomic rename file {:?} failed - {}", path, err);
- }
- worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME));
- }
-
- Ok(())
-}
-
-async fn pull_snapshot(
- worker: &WorkerTask,
- reader: Arc<BackupReader>,
- tgt_store: Arc<DataStore>,
- snapshot: &BackupDir,
- downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
- let mut manifest_name = tgt_store.base_path();
- manifest_name.push(snapshot.relative_path());
- manifest_name.push(MANIFEST_BLOB_NAME);
-
- let mut client_log_name = tgt_store.base_path();
- client_log_name.push(snapshot.relative_path());
- client_log_name.push(CLIENT_LOG_BLOB_NAME);
-
- let mut tmp_manifest_name = manifest_name.clone();
- tmp_manifest_name.set_extension("tmp");
-
- let download_res = download_manifest(&reader, &tmp_manifest_name).await;
- let mut tmp_manifest_file = match download_res {
- Ok(manifest_file) => manifest_file,
- Err(err) => {
- match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match *code {
- StatusCode::NOT_FOUND => {
- worker.log(format!(
- "skipping snapshot {} - vanished since start of sync",
- snapshot
- ));
- return Ok(());
- }
- _ => {
- bail!("HTTP error {} - {}", code, message);
- }
- },
- None => {
- return Err(err);
- }
- };
- }
- };
- let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
-
- if manifest_name.exists() {
- let manifest_blob = proxmox::try_block!({
- let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
- format_err!(
- "unable to open local manifest {:?} - {}",
- manifest_name,
- err
- )
- })?;
-
- let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
- Ok(manifest_blob)
- })
- .map_err(|err: Error| {
- format_err!(
- "unable to read local manifest {:?} - {}",
- manifest_name,
- err
- )
- })?;
-
- if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
- if !client_log_name.exists() {
- try_client_log_download(worker, reader, &client_log_name).await?;
- }
- worker.log("no data changes");
- let _ = std::fs::remove_file(&tmp_manifest_name);
- return Ok(()); // nothing changed
- }
- }
-
- let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
-
- for item in manifest.files() {
- let mut path = tgt_store.base_path();
- path.push(snapshot.relative_path());
- path.push(&item.filename);
-
- if path.exists() {
- match archive_type(&item.filename)? {
- ArchiveType::DynamicIndex => {
- let index = DynamicIndexReader::open(&path)?;
- let (csum, size) = index.compute_csum();
- match manifest.verify_file(&item.filename, &csum, size) {
- Ok(_) => continue,
- Err(err) => {
- worker.log(format!("detected changed file {:?} - {}", path, err));
- }
- }
- }
- ArchiveType::FixedIndex => {
- let index = FixedIndexReader::open(&path)?;
- let (csum, size) = index.compute_csum();
- match manifest.verify_file(&item.filename, &csum, size) {
- Ok(_) => continue,
- Err(err) => {
- worker.log(format!("detected changed file {:?} - {}", path, err));
- }
- }
- }
- ArchiveType::Blob => {
- let mut tmpfile = std::fs::File::open(&path)?;
- let (csum, size) = sha256(&mut tmpfile)?;
- match manifest.verify_file(&item.filename, &csum, size) {
- Ok(_) => continue,
- Err(err) => {
- worker.log(format!("detected changed file {:?} - {}", path, err));
- }
- }
- }
- }
- }
-
- let mut chunk_reader = RemoteChunkReader::new(
- reader.clone(),
- None,
- item.chunk_crypt_mode(),
- HashMap::new(),
- );
-
- pull_single_archive(
- worker,
- &reader,
- &mut chunk_reader,
- tgt_store.clone(),
- snapshot,
- &item,
- downloaded_chunks.clone(),
- )
- .await?;
- }
-
- if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
- bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
- }
-
- if !client_log_name.exists() {
- try_client_log_download(worker, reader, &client_log_name).await?;
- }
-
- // cleanup - remove stale files
- tgt_store.cleanup_backup_dir(snapshot, &manifest)?;
-
- Ok(())
-}
-
-pub async fn pull_snapshot_from(
- worker: &WorkerTask,
- reader: Arc<BackupReader>,
- tgt_store: Arc<DataStore>,
- snapshot: &BackupDir,
- downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
- let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
-
- if is_new {
- worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
-
- if let Err(err) = pull_snapshot(
- worker,
- reader,
- tgt_store.clone(),
- &snapshot,
- downloaded_chunks,
- )
- .await
- {
- if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
- worker.log(format!("cleanup error - {}", cleanup_err));
- }
- return Err(err);
- }
- worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
- } else {
- worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
- pull_snapshot(
- worker,
- reader,
- tgt_store.clone(),
- &snapshot,
- downloaded_chunks,
- )
- .await?;
- worker.log(format!(
- "re-sync snapshot {:?} done",
- snapshot.relative_path()
- ));
- }
-
- Ok(())
-}
-
-struct SkipInfo {
- oldest: i64,
- newest: i64,
- count: u64,
-}
-
-impl SkipInfo {
- fn update(&mut self, backup_time: i64) {
- self.count += 1;
-
- if backup_time < self.oldest {
- self.oldest = backup_time;
- }
-
- if backup_time > self.newest {
- self.newest = backup_time;
- }
- }
-
- fn affected(&self) -> Result<String, Error> {
- match self.count {
- 0 => Ok(String::new()),
- 1 => proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest),
- _ => {
- Ok(format!(
- "{} .. {}",
- proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest)?,
- proxmox::tools::time::epoch_to_rfc3339_utc(self.newest)?,
- ))
- }
- }
- }
-}
-
-impl std::fmt::Display for SkipInfo {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(
- f,
- "skipped: {} snapshot(s) ({}) older than the newest local snapshot",
- self.count,
- self.affected().map_err(|_| std::fmt::Error)?
- )
- }
-}
-
-pub async fn pull_group(
- worker: &WorkerTask,
- client: &HttpClient,
- src_repo: &BackupRepository,
- tgt_store: Arc<DataStore>,
- group: &BackupGroup,
- delete: bool,
- progress: &mut StoreProgress,
-) -> Result<(), Error> {
- let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
-
- let args = json!({
- "backup-type": group.backup_type(),
- "backup-id": group.backup_id(),
- });
-
- let mut result = client.get(&path, Some(args)).await?;
- let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
-
- list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
-
- client.login().await?; // make sure auth is complete
-
- let fingerprint = client.fingerprint();
-
- let last_sync = tgt_store.last_successful_backup(group)?;
-
- let mut remote_snapshots = std::collections::HashSet::new();
-
- // start with 16384 chunks (up to 65GB)
- let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
-
- progress.group_snapshots = list.len() as u64;
-
- let mut skip_info = SkipInfo {
- oldest: i64::MAX,
- newest: i64::MIN,
- count: 0,
- };
-
- for (pos, item) in list.into_iter().enumerate() {
- let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;
-
- // in-progress backups can't be synced
- if item.size.is_none() {
- worker.log(format!(
- "skipping snapshot {} - in-progress backup",
- snapshot
- ));
- continue;
- }
-
- let backup_time = snapshot.backup_time();
-
- remote_snapshots.insert(backup_time);
-
- if let Some(last_sync_time) = last_sync {
- if last_sync_time > backup_time {
- skip_info.update(backup_time);
- continue;
- }
- }
-
- // get updated auth_info (new tickets)
- let auth_info = client.login().await?;
-
- let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());
-
- let new_client = HttpClient::new(
- src_repo.host(),
- src_repo.port(),
- src_repo.auth_id(),
- options,
- )?;
-
- let reader = BackupReader::start(
- new_client,
- None,
- src_repo.store(),
- snapshot.group().backup_type(),
- snapshot.group().backup_id(),
- backup_time,
- true,
- )
- .await?;
-
- let result = pull_snapshot_from(
- worker,
- reader,
- tgt_store.clone(),
- &snapshot,
- downloaded_chunks.clone(),
- )
- .await;
-
- progress.done_snapshots = pos as u64 + 1;
- worker.log(format!("percentage done: {}", progress));
-
- result?; // stop on error
- }
-
- if delete {
- let local_list = group.list_backups(&tgt_store.base_path())?;
- for info in local_list {
- let backup_time = info.backup_dir.backup_time();
- if remote_snapshots.contains(&backup_time) {
- continue;
- }
- worker.log(format!(
- "delete vanished snapshot {:?}",
- info.backup_dir.relative_path()
- ));
- tgt_store.remove_backup_dir(&info.backup_dir, false)?;
- }
- }
-
- if skip_info.count > 0 {
- task_log!(worker, "{}", skip_info);
- }
-
- Ok(())
-}
-
-pub async fn pull_store(
- worker: &WorkerTask,
- client: &HttpClient,
- src_repo: &BackupRepository,
- tgt_store: Arc<DataStore>,
- delete: bool,
- auth_id: Authid,
-) -> Result<(), Error> {
- // explicit create shared lock to prevent GC on newly created chunks
- let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
-
- let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
-
- let mut result = client
- .get(&path, None)
- .await
- .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
-
- let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
-
- worker.log(format!("found {} groups to sync", list.len()));
-
- list.sort_unstable_by(|a, b| {
- let type_order = a.backup_type.cmp(&b.backup_type);
- if type_order == std::cmp::Ordering::Equal {
- a.backup_id.cmp(&b.backup_id)
- } else {
- type_order
- }
- });
-
- let mut errors = false;
-
- let mut new_groups = std::collections::HashSet::new();
- for item in list.iter() {
- new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));
- }
-
- let mut progress = StoreProgress::new(list.len() as u64);
-
- for (done, item) in list.into_iter().enumerate() {
- progress.done_groups = done as u64;
- progress.done_snapshots = 0;
- progress.group_snapshots = 0;
-
- let group = BackupGroup::new(&item.backup_type, &item.backup_id);
-
- let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
- Ok(result) => result,
- Err(err) => {
- worker.log(format!(
- "sync group {}/{} failed - group lock failed: {}",
- item.backup_type, item.backup_id, err
- ));
- errors = true; // do not stop here, instead continue
- continue;
- }
- };
-
- // permission check
- if auth_id != owner {
- // only the owner is allowed to create additional snapshots
- worker.log(format!(
- "sync group {}/{} failed - owner check failed ({} != {})",
- item.backup_type, item.backup_id, auth_id, owner
- ));
- errors = true; // do not stop here, instead continue
- } else if let Err(err) = pull_group(
- worker,
- client,
- src_repo,
- tgt_store.clone(),
- &group,
- delete,
- &mut progress,
- )
- .await
- {
- worker.log(format!(
- "sync group {}/{} failed - {}",
- item.backup_type, item.backup_id, err,
- ));
- errors = true; // do not stop here, instead continue
- }
- }
-
- if delete {
- let result: Result<(), Error> = proxmox::try_block!({
- let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
- for local_group in local_groups {
- if new_groups.contains(&local_group) {
- continue;
- }
- worker.log(format!(
- "delete vanished group '{}/{}'",
- local_group.backup_type(),
- local_group.backup_id()
- ));
- if let Err(err) = tgt_store.remove_backup_group(&local_group) {
- worker.log(err.to_string());
- errors = true;
- }
- }
- Ok(())
- });
- if let Err(err) = result {
- worker.log(format!("error during cleanup: {}", err));
- errors = true;
- };
- }
-
- if errors {
- bail!("sync failed with some errors.");
- }
-
- Ok(())
-}
pub mod auth;
+pub mod pull;
+
pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
let proxy_pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
let sock = crate::server::ctrl_sock_from_pid(proxy_pid);
--- /dev/null
+//! Sync datastore from remote server
+
+use std::collections::{HashMap, HashSet};
+use std::convert::TryFrom;
+use std::io::{Seek, SeekFrom};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::SystemTime;
+
+use anyhow::{bail, format_err, Error};
+use serde_json::json;
+
+use proxmox::api::error::{HttpError, StatusCode};
+
+use pbs_api_types::{Authid, SnapshotListItem, GroupListItem};
+use pbs_datastore::{task_log, BackupInfo, BackupDir, BackupGroup, StoreProgress};
+use pbs_datastore::data_blob::DataBlob;
+use pbs_datastore::dynamic_index::DynamicIndexReader;
+use pbs_datastore::fixed_index::FixedIndexReader;
+use pbs_datastore::index::IndexFile;
+use pbs_datastore::manifest::{
+ CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type
+};
+use pbs_tools::sha::sha256;
+
+use crate::{
+ backup::DataStore,
+ client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader},
+ server::WorkerTask,
+ tools::ParallelHandler,
+};
+
+// fixme: implement filters
+// fixme: delete vanished groups
+// Todo: correctly lock backup groups
+
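+/// Pull all chunks referenced by `index` into the `target` datastore.
+///
+/// `downloaded_chunks` is shared across the snapshots of a group, so every
+/// chunk is fetched at most once per sync run.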
+async fn pull_index_chunks<I: IndexFile>(
+ worker: &WorkerTask,
+ chunk_reader: RemoteChunkReader,
+ target: Arc<DataStore>,
+ index: I,
+ downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+) -> Result<(), Error> {
+ use futures::stream::{self, StreamExt, TryStreamExt};
+
+ let start_time = SystemTime::now();
+
+ let stream = stream::iter(
+ (0..index.index_count())
+ .map(|pos| index.chunk_info(pos).unwrap())
+ .filter(|info| {
+ let mut guard = downloaded_chunks.lock().unwrap();
+ let done = guard.contains(&info.digest);
+ if !done {
+ // Note: we mark a chunk as downloaded before it is actually
+ // downloaded, to avoid duplicate downloads.
+ guard.insert(info.digest);
+ }
+ !done
+ }),
+ );
+
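+ // Verification and chunk-store writes run on a small thread pool, so the
+ // CPU-bound work overlaps with the network downloads below.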
+ let target2 = target.clone();
+ let verify_pool = ParallelHandler::new(
+ "sync chunk writer",
+ 4,
+ move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
+ // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
+ chunk.verify_unencrypted(size as usize, &digest)?;
+ target2.insert_chunk(&chunk, &digest)?;
+ Ok(())
+ },
+ );
+
+ let verify_and_write_channel = verify_pool.channel();
+
+ let bytes = Arc::new(AtomicUsize::new(0));
+
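+ // Keep up to 20 chunk downloads in flight; chunks that already exist in
+ // the target store are only cond-touched, never downloaded again.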
+ stream
+ .map(|info| {
+ let target = Arc::clone(&target);
+ let chunk_reader = chunk_reader.clone();
+ let bytes = Arc::clone(&bytes);
+ let verify_and_write_channel = verify_and_write_channel.clone();
+
+ Ok::<_, Error>(async move {
+ let chunk_exists = pbs_runtime::block_in_place(|| {
+ target.cond_touch_chunk(&info.digest, false)
+ })?;
+ if chunk_exists {
+ //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
+ return Ok::<_, Error>(());
+ }
+ //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
+ let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
+ let raw_size = chunk.raw_size() as usize;
+
+ // decode, verify and write in separate threads to maximize throughput
+ pbs_runtime::block_in_place(|| {
+ verify_and_write_channel.send((chunk, info.digest, info.size()))
+ })?;
+
+ bytes.fetch_add(raw_size, Ordering::SeqCst);
+
+ Ok(())
+ })
+ })
+ .try_buffer_unordered(20)
+ .try_for_each(|_res| futures::future::ok(()))
+ .await?;
+
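+ // Dropping our channel handle closes it, so the verify workers can drain
+ // their queue and exit before we collect the statistics.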
+ drop(verify_and_write_channel);
+
+ verify_pool.complete()?;
+
+ let elapsed = start_time.elapsed()?.as_secs_f64();
+
+ let bytes = bytes.load(Ordering::SeqCst);
+
+ worker.log(format!(
+ "downloaded {} bytes ({:.2} MiB/s)",
+ bytes,
+ (bytes as f64) / (1024.0 * 1024.0 * elapsed)
+ ));
+
+ Ok(())
+}
+
+async fn download_manifest(
+ reader: &BackupReader,
+ filename: &std::path::Path,
+) -> Result<std::fs::File, Error> {
+ let mut tmp_manifest_file = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .read(true)
+ .open(&filename)?;
+
+ reader
+ .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
+ .await?;
+
+ tmp_manifest_file.seek(SeekFrom::Start(0))?;
+
+ Ok(tmp_manifest_file)
+}
+
+fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
+ if size != info.size {
+ bail!(
+ "wrong size for file '{}' ({} != {})",
+ info.filename,
+ info.size,
+ size
+ );
+ }
+
+ if csum != &info.csum {
+ bail!("wrong checksum for file '{}'", info.filename);
+ }
+
+ Ok(())
+}
+
+async fn pull_single_archive(
+ worker: &WorkerTask,
+ reader: &BackupReader,
+ chunk_reader: &mut RemoteChunkReader,
+ tgt_store: Arc<DataStore>,
+ snapshot: &BackupDir,
+ archive_info: &FileInfo,
+ downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+) -> Result<(), Error> {
+ let archive_name = &archive_info.filename;
+ let mut path = tgt_store.base_path();
+ path.push(snapshot.relative_path());
+ path.push(archive_name);
+
+ let mut tmp_path = path.clone();
+ tmp_path.set_extension("tmp");
+
+ worker.log(format!("sync archive {}", archive_name));
+ let mut tmpfile = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .read(true)
+ .open(&tmp_path)?;
+
+ reader.download(archive_name, &mut tmpfile).await?;
+
+ match archive_type(archive_name)? {
+ ArchiveType::DynamicIndex => {
+ let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
+ format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
+ })?;
+ let (csum, size) = index.compute_csum();
+ verify_archive(archive_info, &csum, size)?;
+
+ pull_index_chunks(
+ worker,
+ chunk_reader.clone(),
+ tgt_store.clone(),
+ index,
+ downloaded_chunks,
+ )
+ .await?;
+ }
+ ArchiveType::FixedIndex => {
+ let index = FixedIndexReader::new(tmpfile).map_err(|err| {
+ format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
+ })?;
+ let (csum, size) = index.compute_csum();
+ verify_archive(archive_info, &csum, size)?;
+
+ pull_index_chunks(
+ worker,
+ chunk_reader.clone(),
+ tgt_store.clone(),
+ index,
+ downloaded_chunks,
+ )
+ .await?;
+ }
+ ArchiveType::Blob => {
+ tmpfile.seek(SeekFrom::Start(0))?;
+ let (csum, size) = sha256(&mut tmpfile)?;
+ verify_archive(archive_info, &csum, size)?;
+ }
+ }
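+ // the archive is fully downloaded and verified - move it into place
+ // atomically so readers never observe a partial file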
+ if let Err(err) = std::fs::rename(&tmp_path, &path) {
+ bail!("Atomic rename file {:?} failed - {}", path, err);
+ }
+ Ok(())
+}
+
+// Note: The client.log.blob is uploaded after the backup, so it is
+// not mentioned in the manifest.
+async fn try_client_log_download(
+ worker: &WorkerTask,
+ reader: Arc<BackupReader>,
+ path: &std::path::Path,
+) -> Result<(), Error> {
+ let mut tmp_path = path.to_owned();
+ tmp_path.set_extension("tmp");
+
+ let tmpfile = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .read(true)
+ .open(&tmp_path)?;
+
+ // Note: be silent if there is no log - only log successful download
+ if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
+ if let Err(err) = std::fs::rename(&tmp_path, &path) {
+ bail!("Atomic rename file {:?} failed - {}", path, err);
+ }
+ worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME));
+ }
+
+ Ok(())
+}
+
+async fn pull_snapshot(
+ worker: &WorkerTask,
+ reader: Arc<BackupReader>,
+ tgt_store: Arc<DataStore>,
+ snapshot: &BackupDir,
+ downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+) -> Result<(), Error> {
+ let mut manifest_name = tgt_store.base_path();
+ manifest_name.push(snapshot.relative_path());
+ manifest_name.push(MANIFEST_BLOB_NAME);
+
+ let mut client_log_name = tgt_store.base_path();
+ client_log_name.push(snapshot.relative_path());
+ client_log_name.push(CLIENT_LOG_BLOB_NAME);
+
+ let mut tmp_manifest_name = manifest_name.clone();
+ tmp_manifest_name.set_extension("tmp");
+
+ let download_res = download_manifest(&reader, &tmp_manifest_name).await;
+ let mut tmp_manifest_file = match download_res {
+ Ok(manifest_file) => manifest_file,
+ Err(err) => {
+ match err.downcast_ref::<HttpError>() {
+ Some(HttpError { code, message }) => match *code {
+ StatusCode::NOT_FOUND => {
+ worker.log(format!(
+ "skipping snapshot {} - vanished since start of sync",
+ snapshot
+ ));
+ return Ok(());
+ }
+ _ => {
+ bail!("HTTP error {} - {}", code, message);
+ }
+ },
+ None => {
+ return Err(err);
+ }
+ };
+ }
+ };
+ let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
+
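+ // If a local manifest exists and matches the downloaded one byte for byte,
+ // the snapshot is already in sync - only a missing client log gets fetched.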
+ if manifest_name.exists() {
+ let manifest_blob = proxmox::try_block!({
+ let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
+ format_err!(
+ "unable to open local manifest {:?} - {}",
+ manifest_name,
+ err
+ )
+ })?;
+
+ let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
+ Ok(manifest_blob)
+ })
+ .map_err(|err: Error| {
+ format_err!(
+ "unable to read local manifest {:?} - {}",
+ manifest_name,
+ err
+ )
+ })?;
+
+ if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
+ if !client_log_name.exists() {
+ try_client_log_download(worker, reader, &client_log_name).await?;
+ }
+ worker.log("no data changes");
+ let _ = std::fs::remove_file(&tmp_manifest_name);
+ return Ok(()); // nothing changed
+ }
+ }
+
+ let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
+
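+ // Re-verify archives that already exist locally; only missing or changed
+ // files are downloaded again.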
+ for item in manifest.files() {
+ let mut path = tgt_store.base_path();
+ path.push(snapshot.relative_path());
+ path.push(&item.filename);
+
+ if path.exists() {
+ match archive_type(&item.filename)? {
+ ArchiveType::DynamicIndex => {
+ let index = DynamicIndexReader::open(&path)?;
+ let (csum, size) = index.compute_csum();
+ match manifest.verify_file(&item.filename, &csum, size) {
+ Ok(_) => continue,
+ Err(err) => {
+ worker.log(format!("detected changed file {:?} - {}", path, err));
+ }
+ }
+ }
+ ArchiveType::FixedIndex => {
+ let index = FixedIndexReader::open(&path)?;
+ let (csum, size) = index.compute_csum();
+ match manifest.verify_file(&item.filename, &csum, size) {
+ Ok(_) => continue,
+ Err(err) => {
+ worker.log(format!("detected changed file {:?} - {}", path, err));
+ }
+ }
+ }
+ ArchiveType::Blob => {
+ let mut tmpfile = std::fs::File::open(&path)?;
+ let (csum, size) = sha256(&mut tmpfile)?;
+ match manifest.verify_file(&item.filename, &csum, size) {
+ Ok(_) => continue,
+ Err(err) => {
+ worker.log(format!("detected changed file {:?} - {}", path, err));
+ }
+ }
+ }
+ }
+ }
+
+ let mut chunk_reader = RemoteChunkReader::new(
+ reader.clone(),
+ None,
+ item.chunk_crypt_mode(),
+ HashMap::new(),
+ );
+
+ pull_single_archive(
+ worker,
+ &reader,
+ &mut chunk_reader,
+ tgt_store.clone(),
+ snapshot,
+ &item,
+ downloaded_chunks.clone(),
+ )
+ .await?;
+ }
+
+ if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
+ bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
+ }
+
+ if !client_log_name.exists() {
+ try_client_log_download(worker, reader, &client_log_name).await?;
+ }
+
+ // cleanup - remove stale files
+ tgt_store.cleanup_backup_dir(snapshot, &manifest)?;
+
+ Ok(())
+}
+
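+/// Pull a single snapshot into `tgt_store`, creating its directory if needed.
+///
+/// A newly created snapshot directory is removed again if the pull fails,
+/// while an existing one is re-synced in place.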
+pub async fn pull_snapshot_from(
+ worker: &WorkerTask,
+ reader: Arc<BackupReader>,
+ tgt_store: Arc<DataStore>,
+ snapshot: &BackupDir,
+ downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+) -> Result<(), Error> {
+ let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
+
+ if is_new {
+ worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
+
+ if let Err(err) = pull_snapshot(
+ worker,
+ reader,
+ tgt_store.clone(),
+ &snapshot,
+ downloaded_chunks,
+ )
+ .await
+ {
+ if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
+ worker.log(format!("cleanup error - {}", cleanup_err));
+ }
+ return Err(err);
+ }
+ worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
+ } else {
+ worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
+ pull_snapshot(
+ worker,
+ reader,
+ tgt_store.clone(),
+ &snapshot,
+ downloaded_chunks,
+ )
+ .await?;
+ worker.log(format!(
+ "re-sync snapshot {:?} done",
+ snapshot.relative_path()
+ ));
+ }
+
+ Ok(())
+}
+
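+/// Collects snapshots skipped because they are older than the newest local
+/// snapshot, so they can be reported as a single summary line.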
+struct SkipInfo {
+ oldest: i64,
+ newest: i64,
+ count: u64,
+}
+
+impl SkipInfo {
+ fn update(&mut self, backup_time: i64) {
+ self.count += 1;
+
+ if backup_time < self.oldest {
+ self.oldest = backup_time;
+ }
+
+ if backup_time > self.newest {
+ self.newest = backup_time;
+ }
+ }
+
+ fn affected(&self) -> Result<String, Error> {
+ match self.count {
+ 0 => Ok(String::new()),
+ 1 => proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest),
+ _ => {
+ Ok(format!(
+ "{} .. {}",
+ proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest)?,
+ proxmox::tools::time::epoch_to_rfc3339_utc(self.newest)?,
+ ))
+ }
+ }
+ }
+}
+
+impl std::fmt::Display for SkipInfo {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "skipped: {} snapshot(s) ({}) older than the newest local snapshot",
+ self.count,
+ self.affected().map_err(|_| std::fmt::Error)?
+ )
+ }
+}
+
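+/// Pull all snapshots of a single backup group.
+///
+/// Snapshots older than the last successful local backup are skipped. With
+/// `delete` set, local snapshots that vanished on the remote are removed.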
+pub async fn pull_group(
+ worker: &WorkerTask,
+ client: &HttpClient,
+ src_repo: &BackupRepository,
+ tgt_store: Arc<DataStore>,
+ group: &BackupGroup,
+ delete: bool,
+ progress: &mut StoreProgress,
+) -> Result<(), Error> {
+ let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
+
+ let args = json!({
+ "backup-type": group.backup_type(),
+ "backup-id": group.backup_id(),
+ });
+
+ let mut result = client.get(&path, Some(args)).await?;
+ let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
+
+ list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time));
+
+ client.login().await?; // make sure auth is complete
+
+ let fingerprint = client.fingerprint();
+
+ let last_sync = tgt_store.last_successful_backup(group)?;
+
+ let mut remote_snapshots = std::collections::HashSet::new();
+
+ // pre-allocate for 65536 chunk digests (roughly 256 GiB at 4 MiB per chunk)
+ let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
+
+ progress.group_snapshots = list.len() as u64;
+
+ let mut skip_info = SkipInfo {
+ oldest: i64::MAX,
+ newest: i64::MIN,
+ count: 0,
+ };
+
+ for (pos, item) in list.into_iter().enumerate() {
+ let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?;
+
+ // in-progress backups can't be synced
+ if item.size.is_none() {
+ worker.log(format!(
+ "skipping snapshot {} - in-progress backup",
+ snapshot
+ ));
+ continue;
+ }
+
+ let backup_time = snapshot.backup_time();
+
+ remote_snapshots.insert(backup_time);
+
+ if let Some(last_sync_time) = last_sync {
+ if last_sync_time > backup_time {
+ skip_info.update(backup_time);
+ continue;
+ }
+ }
+
+ // get updated auth_info (new tickets)
+ let auth_info = client.login().await?;
+
+ let options = HttpClientOptions::new_non_interactive(
+ auth_info.ticket.clone(),
+ fingerprint.clone(),
+ );
+
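+ // BackupReader::start takes ownership of its client, so build a fresh
+ // one per snapshot from the just-refreshed ticket.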
+ let new_client = HttpClient::new(
+ src_repo.host(),
+ src_repo.port(),
+ src_repo.auth_id(),
+ options,
+ )?;
+
+ let reader = BackupReader::start(
+ new_client,
+ None,
+ src_repo.store(),
+ snapshot.group().backup_type(),
+ snapshot.group().backup_id(),
+ backup_time,
+ true,
+ )
+ .await?;
+
+ let result = pull_snapshot_from(
+ worker,
+ reader,
+ tgt_store.clone(),
+ &snapshot,
+ downloaded_chunks.clone(),
+ )
+ .await;
+
+ progress.done_snapshots = pos as u64 + 1;
+ worker.log(format!("percentage done: {}", progress));
+
+ result?; // stop on error
+ }
+
+ if delete {
+ let local_list = group.list_backups(&tgt_store.base_path())?;
+ for info in local_list {
+ let backup_time = info.backup_dir.backup_time();
+ if remote_snapshots.contains(&backup_time) {
+ continue;
+ }
+ worker.log(format!(
+ "delete vanished snapshot {:?}",
+ info.backup_dir.relative_path()
+ ));
+ tgt_store.remove_backup_dir(&info.backup_dir, false)?;
+ }
+ }
+
+ if skip_info.count > 0 {
+ task_log!(worker, "{}", skip_info);
+ }
+
+ Ok(())
+}
+
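+/// Pull all backup groups from `src_repo` into `tgt_store`.
+///
+/// Groups are synced one after the other; per-group errors are logged and
+/// the run continues. With `delete` set, local groups that vanished on the
+/// remote are removed at the end.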
+pub async fn pull_store(
+ worker: &WorkerTask,
+ client: &HttpClient,
+ src_repo: &BackupRepository,
+ tgt_store: Arc<DataStore>,
+ delete: bool,
+ auth_id: Authid,
+) -> Result<(), Error> {
+ // explicitly create a shared lock to prevent GC from removing newly created chunks
+ let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
+
+ let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
+
+ let mut result = client
+ .get(&path, None)
+ .await
+ .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
+
+ let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
+
+ worker.log(format!("found {} groups to sync", list.len()));
+
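+ // sort for a deterministic order: by backup type, then by id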
+ list.sort_unstable_by(|a, b| {
+ let type_order = a.backup_type.cmp(&b.backup_type);
+ if type_order == std::cmp::Ordering::Equal {
+ a.backup_id.cmp(&b.backup_id)
+ } else {
+ type_order
+ }
+ });
+
+ let mut errors = false;
+
+ let mut new_groups = std::collections::HashSet::new();
+ for item in list.iter() {
+ new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id));
+ }
+
+ let mut progress = StoreProgress::new(list.len() as u64);
+
+ for (done, item) in list.into_iter().enumerate() {
+ progress.done_groups = done as u64;
+ progress.done_snapshots = 0;
+ progress.group_snapshots = 0;
+
+ let group = BackupGroup::new(&item.backup_type, &item.backup_id);
+
+ let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
+ Ok(result) => result,
+ Err(err) => {
+ worker.log(format!(
+ "sync group {}/{} failed - group lock failed: {}",
+ item.backup_type, item.backup_id, err
+ ));
+ errors = true; // do not stop here, instead continue
+ continue;
+ }
+ };
+
+ // permission check
+ if auth_id != owner {
+ // only the owner is allowed to create additional snapshots
+ worker.log(format!(
+ "sync group {}/{} failed - owner check failed ({} != {})",
+ item.backup_type, item.backup_id, auth_id, owner
+ ));
+ errors = true; // do not stop here, instead continue
+ } else if let Err(err) = pull_group(
+ worker,
+ client,
+ src_repo,
+ tgt_store.clone(),
+ &group,
+ delete,
+ &mut progress,
+ )
+ .await
+ {
+ worker.log(format!(
+ "sync group {}/{} failed - {}",
+ item.backup_type, item.backup_id, err,
+ ));
+ errors = true; // do not stop here, instead continue
+ }
+ }
+
+ if delete {
+ let result: Result<(), Error> = proxmox::try_block!({
+ let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
+ for local_group in local_groups {
+ if new_groups.contains(&local_group) {
+ continue;
+ }
+ worker.log(format!(
+ "delete vanished group '{}/{}'",
+ local_group.backup_type(),
+ local_group.backup_id()
+ ));
+ if let Err(err) = tgt_store.remove_backup_group(&local_group) {
+ worker.log(err.to_string());
+ errors = true;
+ }
+ }
+ Ok(())
+ });
+ if let Err(err) = result {
+ worker.log(format!("error during cleanup: {}", err));
+ errors = true;
+ };
+ }
+
+ if errors {
+ bail!("sync failed with some errors.");
+ }
+
+ Ok(())
+}