From: Wolfgang Bumiller
Date: Mon, 12 Oct 2020 09:46:34 +0000 (+0200)
Subject: don't require WorkerTask in backup/
X-Git-Tag: v0.9.1~12
X-Git-Url: https://git.proxmox.com/?a=commitdiff_plain;h=f6b1d1cc66707de5a3ed42358f2c4125ee74d034;p=proxmox-backup.git

don't require WorkerTask in backup/

This untangles the server code from the actual backup implementation.
Ideally, the whole backup/ dir could eventually become its own crate
with minimal dependencies, certainly without depending on the actual
API server. Such a crate could then also be reused to build forensic
tools for all the data file types we have in the backup repositories.

Signed-off-by: Wolfgang Bumiller
---

diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index af3af0ad..c260b62d 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -518,7 +518,14 @@ pub fn verify(
     let failed_dirs = if let Some(backup_dir) = backup_dir {
         let mut res = Vec::new();
-        if !verify_backup_dir(datastore, &backup_dir, verified_chunks, corrupt_chunks, worker.clone())? {
+        if !verify_backup_dir(
+            datastore,
+            &backup_dir,
+            verified_chunks,
+            corrupt_chunks,
+            worker.clone(),
+            worker.upid().clone(),
+        )? {
             res.push(backup_dir.to_string());
         }
         res
@@ -530,10 +537,11 @@ pub fn verify(
             corrupt_chunks,
             None,
             worker.clone(),
+            worker.upid(),
         )?;
         failed_dirs
     } else {
-        verify_all_backups(datastore, worker.clone())?
+        verify_all_backups(datastore, worker.clone(), worker.upid())?
     };
     if failed_dirs.len() > 0 {
         worker.log("Failed to verify following snapshots:");
@@ -770,7 +778,7 @@ fn start_garbage_collection(
         to_stdout,
         move |worker| {
             worker.log(format!("starting garbage collection on store {}", store));
-            datastore.garbage_collection(&worker)
+            datastore.garbage_collection(&*worker, worker.upid())
         },
     )?;
diff --git a/src/backup/chunk_store.rs b/src/backup/chunk_store.rs
index 9c81ff27..4397648b 100644
--- a/src/backup/chunk_store.rs
+++ b/src/backup/chunk_store.rs
@@ -11,7 +11,7 @@ use crate::tools;
 use crate::api2::types::GarbageCollectionStatus;
 
 use super::DataBlob;
-use crate::server::WorkerTask;
+use crate::task::TaskState;
 
 /// File system based chunk store
 pub struct ChunkStore {
@@ -278,7 +278,7 @@ impl ChunkStore {
         oldest_writer: i64,
         phase1_start_time: i64,
         status: &mut GarbageCollectionStatus,
-        worker: &WorkerTask,
+        worker: &dyn TaskState,
     ) -> Result<(), Error> {
         use nix::sys::stat::fstatat;
         use nix::unistd::{unlinkat, UnlinkatFlags};
@@ -297,10 +297,15 @@ impl ChunkStore {
         for (entry, percentage, bad) in self.get_chunk_iterator()? {
             if last_percentage != percentage {
                 last_percentage = percentage;
-                worker.log(format!("percentage done: phase2 {}% (processed {} chunks)", percentage, chunk_count));
+                crate::task_log!(
+                    worker,
+                    "percentage done: phase2 {}% (processed {} chunks)",
+                    percentage,
+                    chunk_count,
+                );
             }
 
-            worker.fail_on_abort()?;
+            worker.check_abort()?;
             tools::fail_on_shutdown()?;
 
             let (dirfd, entry) = match entry {
@@ -334,12 +339,13 @@ impl ChunkStore {
                     Ok(_) => {
                         match unlinkat(Some(dirfd), filename, UnlinkatFlags::NoRemoveDir) {
                             Err(err) =>
-                                worker.warn(format!(
+                                crate::task_warn!(
+                                    worker,
                                     "unlinking corrupt chunk {:?} failed on store '{}' - {}",
                                     filename,
                                     self.name,
                                     err,
-                                )),
+                                ),
                             Ok(_) => {
                                 status.removed_bad += 1;
                                 status.removed_bytes += stat.st_size as u64;
@@ -351,11 +357,12 @@ impl ChunkStore {
                     },
                     Err(err) => {
                         // some other error, warn user and keep .bad file around too
-                        worker.warn(format!(
+                        crate::task_warn!(
+                            worker,
                             "error during stat on '{:?}' - {}",
                             orig_filename,
                             err,
-                        ));
+                        );
                     }
                 }
             } else if stat.st_atime < min_atime {
diff --git a/src/backup/datastore.rs b/src/backup/datastore.rs
index 1b5f7f8a..1f708293 100644
--- a/src/backup/datastore.rs
+++ b/src/backup/datastore.rs
@@ -18,11 +18,12 @@ use super::manifest::{MANIFEST_BLOB_NAME, CLIENT_LOG_BLOB_NAME, BackupManifest};
 use super::index::*;
 use super::{DataBlob, ArchiveType, archive_type};
 use crate::config::datastore;
-use crate::server::WorkerTask;
+use crate::task::TaskState;
 use crate::tools;
 use crate::tools::format::HumanByte;
 use crate::tools::fs::{lock_dir_noblock, DirLockGuard};
 use crate::api2::types::{GarbageCollectionStatus, Userid};
+use crate::server::UPID;
 
 lazy_static! {
     static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
@@ -411,25 +412,34 @@ impl DataStore {
         index: I,
         file_name: &Path, // only used for error reporting
         status: &mut GarbageCollectionStatus,
-        worker: &WorkerTask,
+        worker: &dyn TaskState,
     ) -> Result<(), Error> {
 
         status.index_file_count += 1;
         status.index_data_bytes += index.index_bytes();
 
         for pos in 0..index.index_count() {
-            worker.fail_on_abort()?;
+            worker.check_abort()?;
             tools::fail_on_shutdown()?;
             let digest = index.index_digest(pos).unwrap();
             if let Err(err) = self.chunk_store.touch_chunk(digest) {
-                worker.warn(&format!("warning: unable to access chunk {}, required by {:?} - {}",
-                                     proxmox::tools::digest_to_hex(digest), file_name, err));
+                crate::task_warn!(
+                    worker,
+                    "warning: unable to access chunk {}, required by {:?} - {}",
+                    proxmox::tools::digest_to_hex(digest),
+                    file_name,
+                    err,
+                );
             }
         }
         Ok(())
     }
 
-    fn mark_used_chunks(&self, status: &mut GarbageCollectionStatus, worker: &WorkerTask) -> Result<(), Error> {
+    fn mark_used_chunks(
+        &self,
+        status: &mut GarbageCollectionStatus,
+        worker: &dyn TaskState,
+    ) -> Result<(), Error> {
 
         let image_list = self.list_images()?;
@@ -441,7 +451,7 @@ impl DataStore {
 
         for path in image_list {
 
-            worker.fail_on_abort()?;
+            worker.check_abort()?;
             tools::fail_on_shutdown()?;
 
             if let Ok(archive_type) = archive_type(&path) {
@@ -457,8 +467,13 @@ impl DataStore {
 
             let percentage = done*100/image_count;
             if percentage > last_percentage {
-                worker.log(format!("percentage done: phase1 {}% ({} of {} index files)",
-                                   percentage, done, image_count));
+                crate::task_log!(
+                    worker,
+                    "percentage done: phase1 {}% ({} of {} index files)",
+                    percentage,
+                    done,
+                    image_count,
+                );
                 last_percentage = percentage;
             }
         }
@@ -474,7 +489,7 @@ impl DataStore {
         if let Ok(_) = self.gc_mutex.try_lock() { false } else { true }
     }
 
-    pub fn garbage_collection(&self, worker: &WorkerTask) -> Result<(), Error> {
+    pub fn garbage_collection(&self, worker: &dyn TaskState, upid: &UPID) -> Result<(), Error> {
 
         if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
@@ -487,36 +502,59 @@ impl DataStore {
             let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
 
             let mut gc_status = GarbageCollectionStatus::default();
-            gc_status.upid = Some(worker.to_string());
-
-            worker.log("Start GC phase1 (mark used chunks)");
-
-            self.mark_used_chunks(&mut gc_status, &worker)?;
-
-            worker.log("Start GC phase2 (sweep unused chunks)");
-            self.chunk_store.sweep_unused_chunks(oldest_writer, phase1_start_time, &mut gc_status, &worker)?;
-
-            worker.log(&format!("Removed garbage: {}", HumanByte::from(gc_status.removed_bytes)));
-            worker.log(&format!("Removed chunks: {}", gc_status.removed_chunks));
+            gc_status.upid = Some(upid.to_string());
+
+            crate::task_log!(worker, "Start GC phase1 (mark used chunks)");
+
+            self.mark_used_chunks(&mut gc_status, worker)?;
+
+            crate::task_log!(worker, "Start GC phase2 (sweep unused chunks)");
+            self.chunk_store.sweep_unused_chunks(
+                oldest_writer,
+                phase1_start_time,
+                &mut gc_status,
+                worker,
+            )?;
+
+            crate::task_log!(
+                worker,
+                "Removed garbage: {}",
+                HumanByte::from(gc_status.removed_bytes),
+            );
+            crate::task_log!(worker, "Removed chunks: {}", gc_status.removed_chunks);
 
             if gc_status.pending_bytes > 0 {
-                worker.log(&format!("Pending removals: {} (in {} chunks)", HumanByte::from(gc_status.pending_bytes), gc_status.pending_chunks));
+                crate::task_log!(
+                    worker,
+                    "Pending removals: {} (in {} chunks)",
+                    HumanByte::from(gc_status.pending_bytes),
+                    gc_status.pending_chunks,
+                );
             }
             if gc_status.removed_bad > 0 {
-                worker.log(&format!("Removed bad files: {}", gc_status.removed_bad));
+                crate::task_log!(worker, "Removed bad files: {}", gc_status.removed_bad);
             }
 
-            worker.log(&format!("Original data usage: {}", HumanByte::from(gc_status.index_data_bytes)));
+            crate::task_log!(
+                worker,
+                "Original data usage: {}",
+                HumanByte::from(gc_status.index_data_bytes),
+            );
 
             if gc_status.index_data_bytes > 0 {
                 let comp_per = (gc_status.disk_bytes as f64 * 100.)/gc_status.index_data_bytes as f64;
-                worker.log(&format!("On-Disk usage: {} ({:.2}%)", HumanByte::from(gc_status.disk_bytes), comp_per));
+                crate::task_log!(
+                    worker,
+                    "On-Disk usage: {} ({:.2}%)",
+                    HumanByte::from(gc_status.disk_bytes),
+                    comp_per,
+                );
             }
 
-            worker.log(&format!("On-Disk chunks: {}", gc_status.disk_chunks));
+            crate::task_log!(worker, "On-Disk chunks: {}", gc_status.disk_chunks);
 
             if gc_status.disk_chunks > 0 {
                 let avg_chunk = gc_status.disk_bytes/(gc_status.disk_chunks as u64);
-                worker.log(&format!("Average chunk size: {}", HumanByte::from(avg_chunk)));
+                crate::task_log!(worker, "Average chunk size: {}", HumanByte::from(avg_chunk));
             }
 
             *self.last_gc_status.lock().unwrap() = gc_status;
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 0c55305f..f2c38eae 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -6,9 +6,7 @@ use std::time::Instant;
 use anyhow::{bail, format_err, Error};
 
 use crate::{
-    server::WorkerTask,
     api2::types::*,
-    tools::ParallelHandler,
     backup::{
         DataStore,
         DataBlob,
@@ -21,6 +19,10 @@ use crate::{
         ArchiveType,
         archive_type,
     },
+    server::UPID,
+    task::TaskState,
+    task_log,
+    tools::ParallelHandler,
 };
 
 fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
@@ -51,7 +53,7 @@ fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInf
 
 fn rename_corrupted_chunk(
     datastore: Arc<DataStore>,
     digest: &[u8;32],
-    worker: Arc<WorkerTask>,
+    worker: &dyn TaskState,
 ) {
     let (path, digest_str) = datastore.chunk_path(digest);
@@ -64,12 +66,12 @@
     match std::fs::rename(&path, &new_path) {
         Ok(_) => {
-            worker.log(format!("corrupted chunk renamed to {:?}", &new_path));
+            task_log!(worker, "corrupted chunk renamed to {:?}", &new_path);
         },
         Err(err) => {
             match err.kind() {
                 std::io::ErrorKind::NotFound => { /* ignored */ },
-                _ => worker.log(format!("could not rename corrupted chunk {:?} - {}", &path, err))
+                _ => task_log!(worker, "could not rename corrupted chunk {:?} - {}", &path, err)
             }
         }
     };
@@ -81,7 +83,7 @@ fn verify_index_chunks(
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     crypt_mode: CryptMode,
-    worker: Arc<WorkerTask>,
+    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
 
     let errors = Arc::new(AtomicUsize::new(0));
@@ -103,7 +105,7 @@
             let chunk_crypt_mode = match chunk.crypt_mode() {
                 Err(err) => {
                     corrupt_chunks2.lock().unwrap().insert(digest);
-                    worker2.log(format!("can't verify chunk, unknown CryptMode - {}", err));
+                    task_log!(worker2, "can't verify chunk, unknown CryptMode - {}", err);
                     errors2.fetch_add(1, Ordering::SeqCst);
                     return Ok(());
                 },
@@ -111,19 +113,20 @@
             };
 
             if chunk_crypt_mode != crypt_mode {
-                worker2.log(format!(
+                task_log!(
+                    worker2,
                     "chunk CryptMode {:?} does not match index CryptMode {:?}",
                     chunk_crypt_mode,
                     crypt_mode
-                ));
+                );
                 errors2.fetch_add(1, Ordering::SeqCst);
             }
 
             if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
                 corrupt_chunks2.lock().unwrap().insert(digest);
-                worker2.log(format!("{}", err));
+                task_log!(worker2, "{}", err);
                 errors2.fetch_add(1, Ordering::SeqCst);
-                rename_corrupted_chunk(datastore2.clone(), &digest, worker2.clone());
+                rename_corrupted_chunk(datastore2.clone(), &digest, &worker2);
             } else {
                 verified_chunks2.lock().unwrap().insert(digest);
             }
@@ -134,7 +137,7 @@
 
     for pos in 0..index.index_count() {
-        worker.fail_on_abort()?;
+        worker.check_abort()?;
         crate::tools::fail_on_shutdown()?;
 
         let info = index.chunk_info(pos).unwrap();
@@ -146,7 +149,7 @@
 
         if corrupt_chunks.lock().unwrap().contains(&info.digest) {
             let digest_str = proxmox::tools::digest_to_hex(&info.digest);
-            worker.log(format!("chunk {} was marked as corrupt", digest_str));
+            task_log!(worker, "chunk {} was marked as corrupt", digest_str);
             errors.fetch_add(1, Ordering::SeqCst);
             continue;
         }
@@ -154,9 +157,9 @@
         match datastore.load_chunk(&info.digest) {
             Err(err) => {
                 corrupt_chunks.lock().unwrap().insert(info.digest);
-                worker.log(format!("can't verify chunk, load failed - {}", err));
+                task_log!(worker, "can't verify chunk, load failed - {}", err);
                 errors.fetch_add(1, Ordering::SeqCst);
-                rename_corrupted_chunk(datastore.clone(), &info.digest, worker.clone());
+                rename_corrupted_chunk(datastore.clone(), &info.digest, &worker);
                 continue;
             }
             Ok(chunk) => {
@@ -179,8 +182,16 @@
 
     let error_count = errors.load(Ordering::SeqCst);
 
-    worker.log(format!("  verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
-        read_bytes_mib, decoded_bytes_mib, elapsed, read_speed, decode_speed, error_count));
+    task_log!(
+        worker,
+        "  verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
+        read_bytes_mib,
+        decoded_bytes_mib,
+        elapsed,
+        read_speed,
+        decode_speed,
+        error_count,
+    );
 
     if errors.load(Ordering::SeqCst) > 0 {
         bail!("chunks could not be verified");
@@ -195,7 +206,7 @@ fn verify_fixed_index(
     info: &FileInfo,
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<WorkerTask>,
+    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
 
     let mut path = backup_dir.relative_path();
@@ -212,7 +223,14 @@ fn verify_fixed_index(
         bail!("wrong index checksum");
     }
 
-    verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
+    verify_index_chunks(
+        datastore,
+        Box::new(index),
+        verified_chunks,
+        corrupt_chunks,
+        info.chunk_crypt_mode(),
+        worker,
+    )
 }
 
 fn verify_dynamic_index(
@@ -221,7 +239,7 @@ fn verify_dynamic_index(
     info: &FileInfo,
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<WorkerTask>,
+    worker: Arc<dyn TaskState + Send + Sync>,
 ) -> Result<(), Error> {
 
     let mut path = backup_dir.relative_path();
@@ -238,7 +256,14 @@ fn verify_dynamic_index(
         bail!("wrong index checksum");
     }
 
-    verify_index_chunks(datastore, Box::new(index), verified_chunks, corrupt_chunks, info.chunk_crypt_mode(), worker)
+    verify_index_chunks(
+        datastore,
+        Box::new(index),
+        verified_chunks,
+        corrupt_chunks,
+        info.chunk_crypt_mode(),
+        worker,
+    )
 }
 
 /// Verify a single backup snapshot
@@ -255,25 +280,32 @@ pub fn verify_backup_dir(
     backup_dir: &BackupDir,
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    worker: Arc<WorkerTask>
+    worker: Arc<dyn TaskState + Send + Sync>,
+    upid: UPID,
 ) -> Result<bool, Error> {
 
     let mut manifest = match datastore.load_manifest(&backup_dir) {
         Ok((manifest, _)) => manifest,
         Err(err) => {
-            worker.log(format!("verify {}:{} - manifest load error: {}", datastore.name(), backup_dir, err));
+            task_log!(
+                worker,
+                "verify {}:{} - manifest load error: {}",
+                datastore.name(),
+                backup_dir,
+                err,
+            );
             return Ok(false);
         }
     };
 
-    worker.log(format!("verify {}:{}", datastore.name(), backup_dir));
+    task_log!(worker, "verify {}:{}", datastore.name(), backup_dir);
 
     let mut error_count = 0;
 
     let mut verify_result = VerifyState::Ok;
     for info in manifest.files() {
         let result = proxmox::try_block!({
-            worker.log(format!("  check {}", info.filename));
+            task_log!(worker, "  check {}", info.filename);
             match archive_type(&info.filename)? {
                 ArchiveType::FixedIndex => verify_fixed_index(
@@ -297,11 +329,18 @@ pub fn verify_backup_dir(
             }
         });
 
-        worker.fail_on_abort()?;
+        worker.check_abort()?;
         crate::tools::fail_on_shutdown()?;
 
         if let Err(err) = result {
-            worker.log(format!("verify {}:{}/{} failed: {}", datastore.name(), backup_dir, info.filename, err));
+            task_log!(
+                worker,
+                "verify {}:{}/{} failed: {}",
+                datastore.name(),
+                backup_dir,
+                info.filename,
+                err,
+            );
             error_count += 1;
             verify_result = VerifyState::Failed;
         }
@@ -310,7 +349,7 @@ pub fn verify_backup_dir(
 
     let verify_state = SnapshotVerifyState {
         state: verify_result,
-        upid: worker.upid().clone(),
+        upid,
     };
     manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
     datastore.store_manifest(&backup_dir, serde_json::to_value(manifest)?)
@@ -332,19 +371,26 @@ pub fn verify_backup_group(
     verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     progress: Option<(usize, usize)>, // (done, snapshot_count)
-    worker: Arc<WorkerTask>,
+    worker: Arc<dyn TaskState + Send + Sync>,
+    upid: &UPID,
 ) -> Result<(usize, Vec<String>), Error> {
 
     let mut errors = Vec::new();
     let mut list = match group.list_backups(&datastore.base_path()) {
         Ok(list) => list,
         Err(err) => {
-            worker.log(format!("verify group {}:{} - unable to list backups: {}", datastore.name(), group, err));
+            task_log!(
+                worker,
+                "verify group {}:{} - unable to list backups: {}",
+                datastore.name(),
+                group,
+                err,
+            );
             return Ok((0, errors));
         }
     };
 
-    worker.log(format!("verify group {}:{}", datastore.name(), group));
+    task_log!(worker, "verify group {}:{}", datastore.name(), group);
 
     let (done, snapshot_count) = progress.unwrap_or((0, list.len()));
 
@@ -352,13 +398,26 @@ pub fn verify_backup_group(
     BackupInfo::sort_list(&mut list, false); // newest first
     for info in list {
         count += 1;
-        if !verify_backup_dir(datastore.clone(), &info.backup_dir, verified_chunks.clone(), corrupt_chunks.clone(), worker.clone())?{
+        if !verify_backup_dir(
+            datastore.clone(),
+            &info.backup_dir,
+            verified_chunks.clone(),
+            corrupt_chunks.clone(),
+            worker.clone(),
+            upid.clone(),
+        )? {
             errors.push(info.backup_dir.to_string());
         }
         if snapshot_count != 0 {
             let pos = done + count;
             let percentage = ((pos as f64) * 100.0)/(snapshot_count as f64);
-            worker.log(format!("percentage done: {:.2}% ({} of {} snapshots)", percentage, pos, snapshot_count));
+            task_log!(
+                worker,
+                "percentage done: {:.2}% ({} of {} snapshots)",
+                percentage,
+                pos,
+                snapshot_count,
+            );
         }
     }
 
@@ -372,8 +431,11 @@ pub fn verify_backup_group(
 /// Returns
 /// - Ok(failed_dirs) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
-pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) -> Result<Vec<String>, Error> {
-
+pub fn verify_all_backups(
+    datastore: Arc<DataStore>,
+    worker: Arc<dyn TaskState + Send + Sync>,
+    upid: &UPID,
+) -> Result<Vec<String>, Error> {
     let mut errors = Vec::new();
 
     let mut list = match BackupGroup::list_groups(&datastore.base_path()) {
@@ -382,7 +444,12 @@ pub fn verify_all_backups(datastore: Arc<DataStore>, worker: Arc<WorkerTask>) ->
             .filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
             .collect::<Vec<BackupGroup>>(),
         Err(err) => {
-            worker.log(format!("verify datastore {} - unable to list backups: {}", datastore.name(), err));
+            task_log!(
+                worker,
+                "verify datastore {} - unable to list backups: {}",
+                datastore.name(),
+                err,
+            );
             return Ok(errors);
         }
     };
@@ -400,7 +467,7 @@
     // start with 64 chunks since we assume there are few corrupt ones
     let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
 
-    worker.log(format!("verify datastore {} ({} snapshots)", datastore.name(), snapshot_count));
+    task_log!(worker, "verify datastore {} ({} snapshots)", datastore.name(), snapshot_count);
 
     let mut done = 0;
     for group in list {
@@ -411,6 +478,7 @@
             corrupt_chunks.clone(),
             Some((done, snapshot_count)),
             worker.clone(),
+            upid,
         )?;
         errors.append(&mut group_errors);
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 67fbc541..b28ac035 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -306,7 +306,7 @@ async fn schedule_datastore_garbage_collection() {
                 worker.log(format!("starting garbage collection on store {}", store));
                 worker.log(format!("task triggered by schedule '{}'", event_str));
 
-                let result = datastore.garbage_collection(&worker);
+                let result = datastore.garbage_collection(&*worker, worker.upid());
 
                 let status = worker.create_state(&result);
@@ -557,7 +557,8 @@ async fn schedule_datastore_verification() {
                 worker.log(format!("starting verification on store {}", store2));
                 worker.log(format!("task triggered by schedule '{}'", event_str));
                 let result = try_block!({
-                    let failed_dirs = verify_all_backups(datastore, worker.clone())?;
+                    let failed_dirs =
+                        verify_all_backups(datastore, worker.clone(), worker.upid())?;
                     if failed_dirs.len() > 0 {
                         worker.log("Failed to verify following snapshots:");
                         for dir in failed_dirs {
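
The hunks above call into `crate::task::TaskState`, `crate::task_log!` and
`crate::task_warn!`, whose definitions are not part of this diff. As a reading
aid, here is a minimal sketch of the shape such a trait and macros could take,
inferred purely from the call sites above (`check_abort()`, format-style
logging, and `&dyn TaskState` / `Arc<dyn TaskState + Send + Sync>` receivers).
It is an assumption for illustration, not a copy of the actual src/task.rs:

    use anyhow::Error;

    /// Object-safe task interface: the backup/ code only needs abort
    /// checks and a log sink, not the full server-side WorkerTask.
    /// (Assumes the `log` crate is available for the level type.)
    pub trait TaskState {
        /// Returns Err(_) once the task has been asked to abort.
        fn check_abort(&self) -> Result<(), Error>;

        /// Sink for task log output.
        fn log(&self, level: log::Level, message: &std::fmt::Arguments);
    }

    #[macro_export]
    macro_rules! task_log {
        ($task:expr, $($fmt:tt)+) => {
            $crate::task::TaskState::log(&*$task, ::log::Level::Info, &format_args!($($fmt)+))
        };
    }

    #[macro_export]
    macro_rules! task_warn {
        ($task:expr, $($fmt:tt)+) => {
            $crate::task::TaskState::log(&*$task, ::log::Level::Warn, &format_args!($($fmt)+))
        };
    }

The `&*$task` in the macro bodies is what lets the same call sites work with
both `worker: &dyn TaskState` and `worker: Arc<dyn TaskState + Send + Sync>`.
With a trait like this, a standalone forensic tool (the use case named in the
commit message) could drive garbage_collection() or verify_all_backups()
through a trivial implementation, for example a hypothetical one that prints
to stderr and never aborts:

    struct StderrTask;

    impl TaskState for StderrTask {
        fn check_abort(&self) -> Result<(), Error> {
            Ok(())
        }

        fn log(&self, level: log::Level, message: &std::fmt::Arguments) {
            eprintln!("{}: {}", level, message);
        }
    }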