use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
-use std::time::SystemTime;
+use std::time::{Duration, SystemTime};
use anyhow::{bail, format_err, Error};
use http::StatusCode;
ns: BackupNamespace,
}
+#[derive(Default)]
+pub(crate) struct PullStats {
+ pub(crate) chunk_count: usize,
+ pub(crate) bytes: usize,
+ pub(crate) elapsed: Duration,
+}
+
+impl PullStats {
+ fn add(&mut self, rhs: PullStats) {
+ self.chunk_count += rhs.chunk_count;
+ self.bytes += rhs.bytes;
+ self.elapsed += rhs.elapsed;
+ }
+}
+
#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
target: Arc<DataStore>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
use futures::stream::{self, StreamExt, TryStreamExt};
let start_time = SystemTime::now();
let verify_and_write_channel = verify_pool.channel();
let bytes = Arc::new(AtomicUsize::new(0));
+ let chunk_count = Arc::new(AtomicUsize::new(0));
stream
.map(|info| {
let target = Arc::clone(&target);
let chunk_reader = chunk_reader.clone();
let bytes = Arc::clone(&bytes);
+ let chunk_count = Arc::clone(&chunk_count);
let verify_and_write_channel = verify_and_write_channel.clone();
Ok::<_, Error>(async move {
})?;
bytes.fetch_add(raw_size, Ordering::SeqCst);
+ chunk_count.fetch_add(1, Ordering::SeqCst);
Ok(())
})
verify_pool.complete()?;
- let elapsed = start_time.elapsed()?.as_secs_f64();
+ let elapsed = start_time.elapsed()?;
let bytes = bytes.load(Ordering::SeqCst);
+ let chunk_count = chunk_count.load(Ordering::SeqCst);
task_log!(
worker,
"downloaded {} bytes ({:.2} MiB/s)",
bytes,
- (bytes as f64) / (1024.0 * 1024.0 * elapsed)
+ (bytes as f64) / (1024.0 * 1024.0 * elapsed.as_secs_f64())
);
- Ok(())
+ Ok(PullStats {
+ chunk_count,
+ bytes,
+ elapsed,
+ })
}
fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
snapshot: &'a pbs_datastore::BackupDir,
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
let archive_name = &archive_info.filename;
let mut path = snapshot.full_path();
path.push(archive_name);
let mut tmp_path = path.clone();
tmp_path.set_extension("tmp");
+ let mut pull_stats = PullStats::default();
+
task_log!(worker, "sync archive {}", archive_name);
reader
if reader.skip_chunk_sync(snapshot.datastore().name()) {
task_log!(worker, "skipping chunk sync for same datastore");
} else {
- pull_index_chunks(
+ let stats = pull_index_chunks(
worker,
reader.chunk_reader(archive_info.crypt_mode),
snapshot.datastore().clone(),
downloaded_chunks,
)
.await?;
+ pull_stats.add(stats);
}
}
ArchiveType::FixedIndex => {
if reader.skip_chunk_sync(snapshot.datastore().name()) {
task_log!(worker, "skipping chunk sync for same datastore");
} else {
- pull_index_chunks(
+ let stats = pull_index_chunks(
worker,
reader.chunk_reader(archive_info.crypt_mode),
snapshot.datastore().clone(),
downloaded_chunks,
)
.await?;
+ pull_stats.add(stats);
}
}
ArchiveType::Blob => {
if let Err(err) = std::fs::rename(&tmp_path, &path) {
bail!("Atomic rename file {:?} failed - {}", path, err);
}
- Ok(())
+ Ok(pull_stats)
}
/// Actual implementation of pulling a snapshot.
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
+ let mut pull_stats = PullStats::default();
let mut manifest_name = snapshot.full_path();
manifest_name.push(MANIFEST_BLOB_NAME);
{
tmp_manifest_blob = data;
} else {
- return Ok(());
+ return Ok(pull_stats);
}
if manifest_name.exists() {
};
task_log!(worker, "no data changes");
let _ = std::fs::remove_file(&tmp_manifest_name);
- return Ok(()); // nothing changed
+ return Ok(pull_stats); // nothing changed
}
}
}
}
- pull_single_archive(
+ let stats = pull_single_archive(
worker,
reader.clone(),
snapshot,
downloaded_chunks.clone(),
)
.await?;
+ pull_stats.add(stats);
}
if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
.cleanup_unreferenced_files(&manifest)
.map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
- Ok(())
+ Ok(pull_stats)
}
/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
let (_path, is_new, _snap_lock) = snapshot
.datastore()
.create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
- if is_new {
+ let pull_stats = if is_new {
task_log!(worker, "sync snapshot {}", snapshot.dir());
- if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
- if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
- snapshot.backup_ns(),
- snapshot.as_ref(),
- true,
- ) {
- task_log!(worker, "cleanup error - {}", cleanup_err);
+ match pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
+ Err(err) => {
+ if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
+ snapshot.backup_ns(),
+ snapshot.as_ref(),
+ true,
+ ) {
+ task_log!(worker, "cleanup error - {}", cleanup_err);
+ }
+ return Err(err);
+ }
+ Ok(pull_stats) => {
+ task_log!(worker, "sync snapshot {} done", snapshot.dir());
+ pull_stats
}
- return Err(err);
}
- task_log!(worker, "sync snapshot {} done", snapshot.dir());
} else {
task_log!(worker, "re-sync snapshot {}", snapshot.dir());
- pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
- }
+ pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?
+ };
- Ok(())
+ Ok(pull_stats)
}
#[derive(PartialEq, Eq)]
source_namespace: &BackupNamespace,
group: &BackupGroup,
progress: &mut StoreProgress,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
progress.group_snapshots = list.len() as u64;
+ let mut pull_stats = PullStats::default();
+
for (pos, from_snapshot) in list.into_iter().enumerate() {
let to_snapshot = params
.target
progress.done_snapshots = pos as u64 + 1;
task_log!(worker, "percentage done: {}", progress);
- result?; // stop on error
+ let stats = result?; // stop on error
+ pull_stats.add(stats);
}
if params.remove_vanished {
}
}
- Ok(())
+ Ok(pull_stats)
}
fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
pub(crate) async fn pull_store(
worker: &WorkerTask,
mut params: PullParameters,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
// explicit create shared lock to prevent GC on newly created chunks
let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
let mut errors = false;
let (mut groups, mut snapshots) = (0, 0);
let mut synced_ns = HashSet::with_capacity(namespaces.len());
+ let mut pull_stats = PullStats::default();
for namespace in namespaces {
let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);
}
match pull_ns(worker, &namespace, &mut params).await {
- Ok((ns_progress, ns_errors)) => {
+ Ok((ns_progress, ns_pull_stats, ns_errors)) => {
errors |= ns_errors;
+ pull_stats.add(ns_pull_stats);
+
if params.max_depth != Some(0) {
groups += ns_progress.done_groups;
snapshots += ns_progress.done_snapshots;
bail!("sync failed with some errors.");
}
- Ok(())
+ Ok(pull_stats)
}
/// Pulls a namespace according to `params`.
worker: &WorkerTask,
namespace: &BackupNamespace,
params: &mut PullParameters,
-) -> Result<(StoreProgress, bool), Error> {
+) -> Result<(StoreProgress, PullStats, bool), Error> {
let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, ¶ms.owner).await?;
list.sort_unstable_by(|a, b| {
}
let mut progress = StoreProgress::new(list.len() as u64);
+ let mut pull_stats = PullStats::default();
let target_ns = namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?;
owner
);
errors = true; // do not stop here, instead continue
- } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
- {
- task_log!(worker, "sync group {} failed - {}", &group, err,);
- errors = true; // do not stop here, instead continue
+ } else {
+ match pull_group(worker, params, namespace, &group, &mut progress).await {
+ Ok(stats) => pull_stats.add(stats),
+ Err(err) => {
+ task_log!(worker, "sync group {} failed - {}", &group, err,);
+ errors = true; // do not stop here, instead continue
+ }
+ }
}
}
};
}
- Ok((progress, errors))
+ Ok((progress, pull_stats, errors))
}