From: Stefan Reiter Date: Thu, 25 Jun 2020 10:23:32 +0000 (+0200) Subject: add incremental backup support X-Git-Url: https://git.proxmox.com/?a=commitdiff_plain;h=cefb437a05d3043592718cf51470160b97b4da94;p=proxmox-backup-qemu.git add incremental backup support Tell the server we're doing an incremental backup if QEMU notifies us with "is_incremental" in register_image. We do this by using the 'reuse-csum' parameter when registering each archive's index, thus switching the server to incremental mode (where it only expects changed chunks and verifies the previous backup's checksum). We use the newly changed API download_previous_fixed_index() to replace known_chunks generation and also give us the verified index file to calculate the new checksum with. The manifest for verification is downloaded during "Connect". To initialize the session cache for checksums, lazy_static is used. Signed-off-by: Stefan Reiter --- diff --git a/Cargo.toml b/Cargo.toml index 8c84ecc..3e49926 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,8 +17,8 @@ cbindgen = "0.14.2" [dependencies] libc = "0.2" bytes = "0.5" -proxmox = { version = "0.1.38", features = [ "sortable-macro", "api-macro" ] } -proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v0.3.0" } +proxmox = { version = "0.1.41", features = [ "sortable-macro", "api-macro" ] } +proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v0.4.0" } #proxmox-backup = { path = "../proxmox-backup" } chrono = "0.4" # Date and time library for Rust anyhow = "1.0" @@ -27,4 +27,5 @@ serde_json = "1.0" tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] } openssl = "0.10" h2 = { version = "0.2", features = ["stream"] } +lazy_static = "1.4" diff --git a/src/capi_types.rs b/src/capi_types.rs index 069b947..6f0972d 100644 --- a/src/capi_types.rs +++ b/src/capi_types.rs @@ -64,6 +64,7 @@ pub(crate) enum BackupMessage 
{ RegisterImage { device_name: String, size: u64, + incremental: bool, callback_info: CallbackPointers, }, CloseImage { diff --git a/src/commands.rs b/src/commands.rs index c4d9a5d..c5a2346 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -1,5 +1,5 @@ use anyhow::{bail, format_err, Error}; -use std::collections::HashSet; +use std::collections::{HashSet, HashMap}; use std::sync::{Mutex, Arc}; use std::os::raw::c_int; @@ -13,6 +13,14 @@ use super::BackupSetup; use crate::capi_types::*; use crate::upload_queue::*; +use lazy_static::lazy_static; + +lazy_static!{ + static ref PREVIOUS_CSUMS: Mutex> = { + Mutex::new(HashMap::new()) + }; +} + struct ImageUploadInfo { wid: u64, device_name: String, @@ -82,7 +90,6 @@ async fn register_zero_chunk( pub(crate) async fn add_config( client: Arc, - crypt_config: Option>, registry: Arc>, name: String, data: DataPointer, @@ -95,7 +102,7 @@ pub(crate) async fn add_config( let data: &[u8] = unsafe { std::slice::from_raw_parts(data.0, size as usize) }; let data = data.to_vec(); - let stats = client.upload_blob_from_data(data, &blob_name, crypt_config, true, false).await?; + let stats = client.upload_blob_from_data(data, &blob_name, true, Some(false)).await?; let mut guard = registry.lock().unwrap(); guard.file_list.push(json!({ @@ -110,28 +117,75 @@ pub(crate) async fn add_config( pub(crate) async fn register_image( client: Arc, crypt_config: Option>, + manifest: Arc>>>, registry: Arc>, known_chunks: Arc>>, device_name: String, device_size: u64, chunk_size: u64, + incremental: bool, ) -> Result { - //println!("register image {} size {}", device_name, device_size); let archive_name = format!("{}.img.fidx", device_name); - client.download_chunk_list("fixed_index", &archive_name, known_chunks.clone()).await?; - //println!("register image download chunk list OK"); + let manifest = { + let guard = manifest.lock().unwrap(); + match &*guard { + Some(manifest) => Some(manifest.clone()), + None => None + } + }; + let index = match 
manifest { + Some(manifest) => { + Some(client.download_previous_fixed_index(&archive_name, &manifest, known_chunks.clone()).await?) + }, + None => None + }; + + let mut param = json!({ "archive-name": archive_name , "size": device_size }); + let mut initial_index = Arc::new(None); + + if incremental { + let csum = { + let map = PREVIOUS_CSUMS.lock().unwrap(); + match map.get(&device_name) { + Some(c) => Some(*c), + None => None + } + }; + + if let Some(csum) = csum { + param.as_object_mut().unwrap().insert("reuse-csum".to_owned(), json!(proxmox::tools::digest_to_hex(&csum))); + + match index { + Some(index) => { + let index_size = ((device_size + chunk_size -1)/chunk_size) as usize; + if index_size != index.index_count() { + bail!("previous backup has different size than current state, cannot do incremental backup (drive: {})", archive_name); + } + if index.compute_csum().0 != csum { + bail!("previous backup checksum doesn't match session cache, incremental backup would be out of sync (drive: {})", archive_name); + } + + initial_index = Arc::new(Some(index)); + }, + None => bail!("no previous backup found, cannot do incremental backup") + } + + } else { + bail!("no previous backups in this session, cannot do incremental one"); + } + } - let param = json!({ "archive-name": archive_name , "size": device_size}); let wid = client.post("fixed_index", Some(param)).await?.as_u64().unwrap(); let zero_chunk_digest = register_zero_chunk(client.clone(), crypt_config, chunk_size as usize, wid).await?; - let (upload_queue, upload_result) = create_upload_queue( + let (upload_queue, upload_result) = create_upload_queue( client.clone(), known_chunks.clone(), + initial_index.clone(), wid, device_size, chunk_size, @@ -179,20 +233,30 @@ pub(crate) async fn close_image( None => bail!("close_image: unknown error because upload result channel was already closed"), }; + let csum = proxmox::tools::digest_to_hex(&upload_result.csum); + let param = json!({ "wid": wid , "chunk-count": 
upload_result.chunk_count, "size": upload_result.bytes_written, - "csum": proxmox::tools::digest_to_hex(&upload_result.csum), + "csum": csum.clone(), }); let _value = client.post("fixed_close", Some(param)).await?; + { + let mut reg_guard = registry.lock().unwrap(); + let info = reg_guard.lookup(dev_id)?; + let mut prev_csum_guard = PREVIOUS_CSUMS.lock().unwrap(); + + prev_csum_guard.insert(info.device_name.clone(), proxmox::tools::hex_to_digest(&csum).unwrap()); + } + let mut guard = registry.lock().unwrap(); guard.file_list.push(json!({ "filename": format!("{}.img.fidx", device_name), "size": device_size, - "csum": proxmox::tools::digest_to_hex(&upload_result.csum), + "csum": csum.clone(), })); Ok(0) @@ -315,7 +379,6 @@ pub(crate) async fn write_data( pub(crate) async fn finish_backup( client: Arc, - crypt_config: Option>, registry: Arc>, setup: BackupSetup, ) -> Result { @@ -337,7 +400,7 @@ pub(crate) async fn finish_backup( }; client - .upload_blob_from_data(index_data, "index.json.blob", crypt_config, true, true) + .upload_blob_from_data(index_data, "index.json.blob", true, Some(true)) .await?; client.finish().await?; diff --git a/src/lib.rs b/src/lib.rs index 7e8852e..6e358b5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -285,6 +285,7 @@ pub extern "C" fn proxmox_backup_register_image( handle: *mut ProxmoxBackupHandle, device_name: *const c_char, // expect utf8 here size: u64, + is_incremental: i32, error: * mut * mut c_char, ) -> c_int { let task = unsafe { &mut *(handle as * mut BackupTask) }; @@ -301,7 +302,7 @@ pub extern "C" fn proxmox_backup_register_image( let device_name = unsafe { CStr::from_ptr(device_name).to_string_lossy().to_string() }; - let msg = BackupMessage::RegisterImage { device_name, size, callback_info }; + let msg = BackupMessage::RegisterImage { device_name, size, callback_info, incremental: is_incremental != 0 }; if let Err(_) = task.command_tx.send(msg) { raise_error_int!(error, format_err!("task already aborted (send command 
failed)")); @@ -325,6 +326,7 @@ pub extern "C" fn proxmox_backup_register_image_async( handle: *mut ProxmoxBackupHandle, device_name: *const c_char, // expect utf8 here size: u64, + is_incremental: i32, callback: extern "C" fn(*mut c_void), callback_data: *mut c_void, result: *mut c_int, @@ -341,7 +343,7 @@ pub extern "C" fn proxmox_backup_register_image_async( let device_name = unsafe { CStr::from_ptr(device_name).to_string_lossy().to_string() }; - let msg = BackupMessage::RegisterImage { device_name, size, callback_info }; + let msg = BackupMessage::RegisterImage { device_name, size, callback_info, incremental: is_incremental != 0 }; if let Err(_) = task.command_tx.send(msg) { callback_info2.send_result(Err(format_err!("task already aborted"))); diff --git a/src/upload_queue.rs b/src/upload_queue.rs index 8957cce..daf5dec 100644 --- a/src/upload_queue.rs +++ b/src/upload_queue.rs @@ -5,6 +5,7 @@ use std::sync::{Mutex, Arc}; use futures::future::Future; use serde_json::json; use tokio::sync::{mpsc, oneshot}; +use proxmox_backup::backup::*; use proxmox_backup::client::*; pub(crate) struct ChunkUploadInfo { @@ -28,6 +29,7 @@ type UploadResultSender = oneshot::Sender>; pub(crate) fn create_upload_queue( client: Arc, known_chunks: Arc>>, + initial_index: Arc>, wid: u64, device_size: u64, chunk_size: u64, @@ -39,6 +41,7 @@ pub(crate) fn create_upload_queue( upload_handler( client, known_chunks, + initial_index, wid, device_size, chunk_size, @@ -70,6 +73,7 @@ async fn upload_chunk_list( async fn upload_handler( client: Arc, known_chunks: Arc>>, + initial_index: Arc>, wid: u64, device_size: u64, chunk_size: u64, @@ -86,6 +90,14 @@ async fn upload_handler( let mut index = Vec::with_capacity(index_size); index.resize(index_size, [0u8; 32]); + // for incremental, initialize with data from previous backup + // caller ensures initial_index length is index_size + if let Some(init) = initial_index.as_ref() { + for i in 0..index_size { + index[i] = 
*init.index_digest(i).unwrap(); + } + } + while let Some(response_future) = upload_queue.recv().await { match response_future.await { Ok(ChunkUploadInfo { digest, offset, size, chunk_is_known }) => { diff --git a/src/worker_task.rs b/src/worker_task.rs index 94a9f48..0152b5b 100644 --- a/src/worker_task.rs +++ b/src/worker_task.rs @@ -120,6 +120,7 @@ fn backup_worker_task( let written_bytes2 = written_bytes.clone(); let known_chunks = Arc::new(Mutex::new(HashSet::new())); + let manifest = Arc::new(Mutex::new(None)); let chunk_size = setup.chunk_size; @@ -145,6 +146,8 @@ fn backup_worker_task( BackupMessage::Connect { callback_info } => { let setup = setup.clone(); let client = client.clone(); + let crypt_config = crypt_config.clone(); + let manifest = manifest.clone(); let command_future = async move { let options = HttpClientOptions::new() @@ -152,10 +155,17 @@ fn backup_worker_task( .password(setup.password.clone()); let http = HttpClient::new(&setup.host, &setup.user, options)?; - let writer = BackupWriter::start(http, &setup.store, "vm", &setup.backup_id, setup.backup_time, false).await?; + let writer = BackupWriter::start(http, crypt_config.clone(), &setup.store, "vm", &setup.backup_id, setup.backup_time, false).await?; - let mut guard = client.lock().unwrap(); - *guard = Some(writer); + let last_manifest = writer.download_previous_manifest().await; + let mut manifest_guard = manifest.lock().unwrap(); + *manifest_guard = match last_manifest { + Ok(last_manifest) => Some(Arc::new(last_manifest)), + Err(_) => None + }; + + let mut client_guard = client.lock().unwrap(); + *client_guard = Some(writer); Ok(0) }; @@ -176,7 +186,6 @@ fn backup_worker_task( Some(client) => { let command_future = add_config( client, - crypt_config.clone(), registry.clone(), name, data, @@ -189,18 +198,20 @@ fn backup_worker_task( } } } - BackupMessage::RegisterImage { device_name, size, callback_info} => { + BackupMessage::RegisterImage { device_name, size, incremental, callback_info } 
=> { let client = (*(client.lock().unwrap())).clone(); match client { Some(client) => { let command_future = register_image( client, crypt_config.clone(), + manifest.clone(), registry.clone(), known_chunks.clone(), device_name, size, chunk_size, + incremental, ); tokio::spawn(handle_async_command(command_future, abort.listen(), callback_info)); } @@ -250,7 +261,6 @@ fn backup_worker_task( Some(client) => { let command_future = finish_backup( client, - crypt_config.clone(), registry.clone(), setup.clone(), );