use super::index::*;
use super::{DataBlob, ArchiveType, archive_type};
use crate::config::datastore;
-use crate::server::WorkerTask;
+use crate::task::TaskState;
use crate::tools;
use crate::tools::format::HumanByte;
use crate::tools::fs::{lock_dir_noblock, DirLockGuard};
use crate::api2::types::{GarbageCollectionStatus, Userid};
+use crate::server::UPID;
lazy_static! {
static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
index: I,
file_name: &Path, // only used for error reporting
status: &mut GarbageCollectionStatus,
- worker: &WorkerTask,
+ worker: &dyn TaskState,
) -> Result<(), Error> {
status.index_file_count += 1;
status.index_data_bytes += index.index_bytes();
for pos in 0..index.index_count() {
- worker.fail_on_abort()?;
+ worker.check_abort()?;
tools::fail_on_shutdown()?;
let digest = index.index_digest(pos).unwrap();
if let Err(err) = self.chunk_store.touch_chunk(digest) {
- worker.warn(&format!("warning: unable to access chunk {}, required by {:?} - {}",
- proxmox::tools::digest_to_hex(digest), file_name, err));
+ crate::task_warn!(
+ worker,
+ "warning: unable to access chunk {}, required by {:?} - {}",
+ proxmox::tools::digest_to_hex(digest),
+ file_name,
+ err,
+ );
}
}
Ok(())
}
- fn mark_used_chunks(&self, status: &mut GarbageCollectionStatus, worker: &WorkerTask) -> Result<(), Error> {
+ fn mark_used_chunks(
+ &self,
+ status: &mut GarbageCollectionStatus,
+ worker: &dyn TaskState,
+ ) -> Result<(), Error> {
let image_list = self.list_images()?;
for path in image_list {
- worker.fail_on_abort()?;
+ worker.check_abort()?;
tools::fail_on_shutdown()?;
if let Ok(archive_type) = archive_type(&path) {
let percentage = done*100/image_count;
if percentage > last_percentage {
- worker.log(format!("percentage done: phase1 {}% ({} of {} index files)",
- percentage, done, image_count));
+ crate::task_log!(
+ worker,
+ "percentage done: phase1 {}% ({} of {} index files)",
+ percentage,
+ done,
+ image_count,
+ );
last_percentage = percentage;
}
}
if let Ok(_) = self.gc_mutex.try_lock() { false } else { true }
}
- pub fn garbage_collection(&self, worker: &WorkerTask) -> Result<(), Error> {
+ pub fn garbage_collection(
+ &self,
+ worker: &dyn TaskState,
+ upid: &UPID,
+ ) -> Result<(), Error> {
if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
let mut gc_status = GarbageCollectionStatus::default();
- gc_status.upid = Some(worker.to_string());
-
- worker.log("Start GC phase1 (mark used chunks)");
-
- self.mark_used_chunks(&mut gc_status, &worker)?;
-
- worker.log("Start GC phase2 (sweep unused chunks)");
- self.chunk_store.sweep_unused_chunks(oldest_writer, phase1_start_time, &mut gc_status, &worker)?;
-
- worker.log(&format!("Removed garbage: {}", HumanByte::from(gc_status.removed_bytes)));
- worker.log(&format!("Removed chunks: {}", gc_status.removed_chunks));
+ gc_status.upid = Some(upid.to_string());
+
+ crate::task_log!(worker, "Start GC phase1 (mark used chunks)");
+
+ self.mark_used_chunks(&mut gc_status, worker)?;
+
+ crate::task_log!(worker, "Start GC phase2 (sweep unused chunks)");
+ self.chunk_store.sweep_unused_chunks(
+ oldest_writer,
+ phase1_start_time,
+ &mut gc_status,
+ worker,
+ )?;
+
+ crate::task_log!(
+ worker,
+ "Removed garbage: {}",
+ HumanByte::from(gc_status.removed_bytes),
+ );
+ crate::task_log!(worker, "Removed chunks: {}", gc_status.removed_chunks);
if gc_status.pending_bytes > 0 {
- worker.log(&format!("Pending removals: {} (in {} chunks)", HumanByte::from(gc_status.pending_bytes), gc_status.pending_chunks));
+ crate::task_log!(
+ worker,
+ "Pending removals: {} (in {} chunks)",
+ HumanByte::from(gc_status.pending_bytes),
+ gc_status.pending_chunks,
+ );
}
if gc_status.removed_bad > 0 {
- worker.log(&format!("Removed bad files: {}", gc_status.removed_bad));
+ crate::task_log!(worker, "Removed bad files: {}", gc_status.removed_bad);
}
- worker.log(&format!("Original data usage: {}", HumanByte::from(gc_status.index_data_bytes)));
+ crate::task_log!(
+ worker,
+ "Original data usage: {}",
+ HumanByte::from(gc_status.index_data_bytes),
+ );
if gc_status.index_data_bytes > 0 {
let comp_per = (gc_status.disk_bytes as f64 * 100.)/gc_status.index_data_bytes as f64;
- worker.log(&format!("On-Disk usage: {} ({:.2}%)", HumanByte::from(gc_status.disk_bytes), comp_per));
+ crate::task_log!(
+ worker,
+ "On-Disk usage: {} ({:.2}%)",
+ HumanByte::from(gc_status.disk_bytes),
+ comp_per,
+ );
}
- worker.log(&format!("On-Disk chunks: {}", gc_status.disk_chunks));
+ crate::task_log!(worker, "On-Disk chunks: {}", gc_status.disk_chunks);
if gc_status.disk_chunks > 0 {
let avg_chunk = gc_status.disk_bytes/(gc_status.disk_chunks as u64);
- worker.log(&format!("Average chunk size: {}", HumanByte::from(avg_chunk)));
+ crate::task_log!(worker, "Average chunk size: {}", HumanByte::from(avg_chunk));
}
*self.last_gc_status.lock().unwrap() = gc_status;
use anyhow::{bail, format_err, Error};
use crate::{
- server::WorkerTask,
api2::types::*,
- tools::ParallelHandler,
backup::{
DataStore,
DataBlob,
ArchiveType,
archive_type,
},
+ server::UPID,
+ task::TaskState,
+ task_log,
+ tools::ParallelHandler,
};
fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
fn rename_corrupted_chunk(
datastore: Arc<DataStore>,
digest: &[u8;32],
- worker: Arc<WorkerTask>,
+ worker: &dyn TaskState,
) {
let (path, digest_str) = datastore.chunk_path(digest);
match std::fs::rename(&path, &new_path) {
Ok(_) => {
- worker.log(format!("corrupted chunk renamed to {:?}", &new_path));
+ task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
},
Err(err) => {
match err.kind() {
std::io::ErrorKind::NotFound => { /* ignored */ },
- _ => worker.log(format!("could not rename corrupted chunk {:?} - {}", &path, err))
+ _ => task_log!(
+ worker,
+ "could not rename corrupted chunk {:?} - {}",
+ &path,
+ err,
+ )
}
}
};
verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
crypt_mode: CryptMode,
- worker: Arc<WorkerTask>,
+ worker: Arc<dyn TaskState + Send + Sync>,
) -> Result<(), Error> {
let errors = Arc::new(AtomicUsize::new(0));
let chunk_crypt_mode = match chunk.crypt_mode() {
Err(err) => {
corrupt_chunks2.lock().unwrap().insert(digest);
- worker2.log(format!("can't verify chunk, unknown CryptMode - {}", err));
+ task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
errors2.fetch_add(1, Ordering::SeqCst);
return Ok(());
},
};
if chunk_crypt_mode != crypt_mode {
- worker2.log(format!(
+ task_log!(
+ worker2,
"chunk CryptMode {:?} does not match index CryptMode {:?}",
chunk_crypt_mode,
crypt_mode
- ));
+ );
errors2.fetch_add(1, Ordering::SeqCst);
}
if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
corrupt_chunks2.lock().unwrap().insert(digest);
- worker2.log(format!("{}", err));
+ task_log!(worker2, "{}", err);
errors2.fetch_add(1, Ordering::SeqCst);
- rename_corrupted_chunk(datastore2.clone(), &digest, worker2.clone());
+ rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
} else {
verified_chunks2.lock().unwrap().insert(digest);
}
for pos in 0..index.index_count() {
- worker.fail_on_abort()?;
+ worker.check_abort()?;
crate::tools::fail_on_shutdown()?;
let info = index.chunk_info(pos).unwrap();
if corrupt_chunks.lock().unwrap().contains(&info.digest) {
let digest_str = proxmox::tools::digest_to_hex(&info.digest);
- worker.log(format!("chunk {} was marked as corrupt", digest_str));
+ task_log!(worker, "chunk {} was marked as corrupt", digest_str);
errors.fetch_add(1, Ordering::SeqCst);
continue;
}
match datastore.load_chunk(&info.digest) {
Err(err) => {
corrupt_chunks.lock().unwrap().insert(info.digest);
- worker.log(format!("can't verify chunk, load failed - {}", err));
+ task_log!(worker, "can't verify chunk, load failed - {}", err);
errors.fetch_add(1, Ordering::SeqCst);
- rename_corrupted_chunk(datastore.clone(), &info.digest, worker.clone());
+ rename_corrupted_chunk(datastore.clone(), &info.digest, &worker);
continue;
}
Ok(chunk) => {
let error_count = errors.load(Ordering::SeqCst);
- worker.log(format!(" verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
- read_bytes_mib, decoded_bytes_mib, elapsed, read_speed, decode_speed, error_count));
+ task_log!(
+ worker,
+ " verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
+ read_bytes_mib,
+ decoded_bytes_mib,
+ elapsed,
+ read_speed,
+ decode_speed,
+ error_count,
+ );
if errors.load(Ordering::SeqCst) > 0 {
bail!("chunks could not be verified");
info: &FileInfo,
verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- worker: Arc<WorkerTask>,
+ worker: Arc<dyn TaskState + Send + Sync>,
) -> Result<(), Error> {
let mut path = backup_dir.relative_path();
bail!("wrong index checksum");
}
- verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
+ verify_index_chunks(
+ datastore,
+ Box::new(index),
+ verified_chunks,
+ corrupt_chunks,
+ info.chunk_crypt_mode(),
+ worker,
+ )
}
fn verify_dynamic_index(
info: &FileInfo,
verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- worker: Arc<WorkerTask>,
+ worker: Arc<dyn TaskState + Send + Sync>,
) -> Result<(), Error> {
let mut path = backup_dir.relative_path();
bail!("wrong index checksum");
}
- verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
+ verify_index_chunks(
+ datastore,
+ Box::new(index),
+ verified_chunks,
+ corrupt_chunks,
+ info.chunk_crypt_mode(),
+ worker,
+ )
}
/// Verify a single backup snapshot
backup_dir: &BackupDir,
verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- worker: Arc<WorkerTask>
+ worker: Arc<dyn TaskState + Send + Sync>,
+ upid: UPID,
) -> Result<bool, Error> {
let mut manifest = match datastore.load_manifest(&backup_dir) {
Ok((manifest, _)) => manifest,
Err(err) => {
- worker.log(format!("verify {}:{} - manifest load error: {}", datastore.name(), backup_dir, err));
+ task_log!(
+ worker,
+ "verify {}:{} - manifest load error: {}",
+ datastore.name(),
+ backup_dir,
+ err,
+ );
return Ok(false);
}
};
- worker.log(format!("verify {}:{}", datastore.name(), backup_dir));
+ task_log!(worker, "verify {}:{}", datastore.name(), backup_dir);
let mut error_count = 0;
let mut verify_result = VerifyState::Ok;
for info in manifest.files() {
let result = proxmox::try_block!({
- worker.log(format!(" check {}", info.filename));
+ task_log!(worker, " check {}", info.filename);
match archive_type(&info.filename)? {
ArchiveType::FixedIndex =>
verify_fixed_index(
}
});
- worker.fail_on_abort()?;
+ worker.check_abort()?;
crate::tools::fail_on_shutdown()?;
if let Err(err) = result {
- worker.log(format!("verify {}:{}/{} failed: {}", datastore.name(), backup_dir, info.filename, err));
+ task_log!(
+ worker,
+ "verify {}:{}/{} failed: {}",
+ datastore.name(),
+ backup_dir,
+ info.filename,
+ err,
+ );
error_count += 1;
verify_result = VerifyState::Failed;
}
let verify_state = SnapshotVerifyState {
state: verify_result,
- upid: worker.upid().clone(),
+ upid,
};
manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
datastore.store_manifest(&backup_dir, serde_json::to_value(manifest)?)
verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
progress: Option<(usize, usize)>, // (done, snapshot_count)
- worker: Arc<WorkerTask>,
+ worker: Arc<dyn TaskState + Send + Sync>,
+ upid: &UPID,
) -> Result<(usize, Vec<String>), Error> {
let mut errors = Vec::new();
let mut list = match group.list_backups(&datastore.base_path()) {
Ok(list) => list,
Err(err) => {
- worker.log(format!("verify group {}:{} - unable to list backups: {}", datastore.name(), group, err));
+ task_log!(
+ worker,
+ "verify group {}:{} - unable to list backups: {}",
+ datastore.name(),
+ group,
+ err,
+ );
return Ok((0, errors));
}
};
- worker.log(format!("verify group {}:{}", datastore.name(), group));
+ task_log!(worker, "verify group {}:{}", datastore.name(), group);
let (done, snapshot_count) = progress.unwrap_or((0, list.len()));
BackupInfo::sort_list(&mut list, false); // newest first
for info in list {
count += 1;
- if !verify_backup_dir(datastore.clone(), &info.backup_dir, verified_chunks.clone(), corrupt_chunks.clone(), worker.clone())?{
+ if !verify_backup_dir(
+ datastore.clone(),
+ &info.backup_dir,
+ verified_chunks.clone(),
+ corrupt_chunks.clone(),
+ worker.clone(),
+ upid.clone(),
+ )? {
errors.push(info.backup_dir.to_string());
}
if snapshot_count != 0 {
let pos = done + count;
let percentage = ((pos as f64) * 100.0)/(snapshot_count as f64);
- worker.log(format!("percentage done: {:.2}% ({} of {} snapshots)", percentage, pos, snapshot_count));
+ task_log!(
+ worker,
+ "percentage done: {:.2}% ({} of {} snapshots)",
+ percentage,
+ pos,
+ snapshot_count,
+ );
}
}
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
-pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) -> Result<Vec<String>, Error> {
-
+pub fn verify_all_backups(
+ datastore: Arc<DataStore>,
+ worker: Arc<dyn TaskState + Send + Sync>,
+ upid: &UPID,
+) -> Result<Vec<String>, Error> {
let mut errors = Vec::new();
let mut list = match BackupGroup::list_groups(&datastore.base_path()) {
.filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
.collect::<Vec<BackupGroup>>(),
Err(err) => {
- worker.log(format!("verify datastore {} - unable to list backups: {}", datastore.name(), err));
+ task_log!(
+ worker,
+ "verify datastore {} - unable to list backups: {}",
+ datastore.name(),
+ err,
+ );
return Ok(errors);
}
};
// start with 64 chunks since we assume there are few corrupt ones
let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
- worker.log(format!("verify datastore {} ({} snapshots)", datastore.name(), snapshot_count));
+ task_log!(
+ worker,
+ "verify datastore {} ({} snapshots)",
+ datastore.name(),
+ snapshot_count,
+ );
let mut done = 0;
for group in list {
corrupt_chunks.clone(),
Some((done, snapshot_count)),
worker.clone(),
+ upid,
)?;
errors.append(&mut group_errors);