]> git.proxmox.com Git - proxmox-backup-qemu.git/commitdiff
add incremental backup support
authorStefan Reiter <s.reiter@proxmox.com>
Thu, 25 Jun 2020 10:23:32 +0000 (12:23 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Fri, 26 Jun 2020 09:03:05 +0000 (11:03 +0200)
Tell the server we're doing an incremental backup if QEMU notifies us
with "is_incremental" in register_image. We do this by using the
'reuse-csum' parameter when registering each archive's index, thus
switching the server to incremental mode (where it only expects changed
chunks and verifies the previous backup's checksum).

We use the newly changed API download_previous_fixed_index() to replace
known_chunks generation and also give us the verified index file to
calculate the new checksum with. The manifest for verfication is
downloaded during "Connect".

To initialize the session cache for checksums, lazy_static is used.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
Cargo.toml
src/capi_types.rs
src/commands.rs
src/lib.rs
src/upload_queue.rs
src/worker_task.rs

index 8c84ecc768787fa81ee74851e60bc130742a95d5..3e49926001b8601975015c0022c3efd8e8a3e7fc 100644 (file)
@@ -17,8 +17,8 @@ cbindgen = "0.14.2"
 [dependencies]
 libc = "0.2"
 bytes = "0.5"
-proxmox = { version = "0.1.38", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v0.3.0" }
+proxmox = { version = "0.1.41", features = [ "sortable-macro", "api-macro" ] }
+proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v0.4.0" }
 #proxmox-backup = { path = "../proxmox-backup" }
 chrono = "0.4" # Date and time library for Rust
 anyhow = "1.0"
@@ -27,4 +27,5 @@ serde_json = "1.0"
 tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
 openssl = "0.10"
 h2 = { version = "0.2", features = ["stream"] }
+lazy_static = "1.4"
 
index 069b947e4440b8781ffa1c418d504857941cda69..6f0972dbfd36fa3ac8bd09e91ce775d2978b077e 100644 (file)
@@ -64,6 +64,7 @@ pub(crate) enum BackupMessage {
     RegisterImage {
         device_name: String,
         size: u64,
+        incremental: bool,
         callback_info: CallbackPointers,
     },
     CloseImage {
index c4d9a5d08e62ad9c14422b2a2b7646317858a7eb..c5a23467848601758525c0d814b21ad07be2dfba 100644 (file)
@@ -1,5 +1,5 @@
 use anyhow::{bail, format_err, Error};
-use std::collections::HashSet;
+use std::collections::{HashSet, HashMap};
 use std::sync::{Mutex, Arc};
 use std::os::raw::c_int;
 
@@ -13,6 +13,14 @@ use super::BackupSetup;
 use crate::capi_types::*;
 use crate::upload_queue::*;
 
+use lazy_static::lazy_static;
+
+lazy_static!{
+    static ref PREVIOUS_CSUMS: Mutex<HashMap<String, [u8;32]>> = {
+        Mutex::new(HashMap::new())
+    };
+}
+
 struct ImageUploadInfo {
     wid: u64,
     device_name: String,
@@ -82,7 +90,6 @@ async fn register_zero_chunk(
 
 pub(crate) async fn add_config(
     client: Arc<BackupWriter>,
-    crypt_config: Option<Arc<CryptConfig>>,
     registry: Arc<Mutex<ImageRegistry>>,
     name: String,
     data: DataPointer,
@@ -95,7 +102,7 @@ pub(crate) async fn add_config(
     let data: &[u8] = unsafe { std::slice::from_raw_parts(data.0, size as usize) };
     let data = data.to_vec();
 
-    let stats = client.upload_blob_from_data(data, &blob_name, crypt_config, true, false).await?;
+    let stats = client.upload_blob_from_data(data, &blob_name, true, Some(false)).await?;
 
     let mut guard = registry.lock().unwrap();
     guard.file_list.push(json!({
@@ -110,28 +117,75 @@ pub(crate) async fn add_config(
 pub(crate) async fn register_image(
     client: Arc<BackupWriter>,
     crypt_config: Option<Arc<CryptConfig>>,
+    manifest: Arc<Mutex<Option<Arc<BackupManifest>>>>,
     registry: Arc<Mutex<ImageRegistry>>,
     known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
     device_name: String,
     device_size: u64,
     chunk_size: u64,
+    incremental: bool,
 ) -> Result<c_int, Error> {
-    //println!("register image {} size {}", device_name, device_size);
 
     let archive_name = format!("{}.img.fidx", device_name);
 
-    client.download_chunk_list("fixed_index", &archive_name, known_chunks.clone()).await?;
-    //println!("register image download chunk list OK");
+    let manifest = {
+        let guard = manifest.lock().unwrap();
+        match &*guard {
+            Some(manifest) => Some(manifest.clone()),
+            None => None
+        }
+    };
+    let index = match manifest {
+        Some(manifest) => {
+            Some(client.download_previous_fixed_index(&archive_name, &manifest, known_chunks.clone()).await?)
+        },
+        None => None
+    };
+
+    let mut param = json!({ "archive-name": archive_name , "size": device_size });
+    let mut initial_index = Arc::new(None);
+
+    if incremental {
+        let csum = {
+            let map = PREVIOUS_CSUMS.lock().unwrap();
+            match map.get(&device_name) {
+                Some(c) => Some(*c),
+                None => None
+            }
+        };
+
+        if let Some(csum) = csum {
+            param.as_object_mut().unwrap().insert("reuse-csum".to_owned(), json!(proxmox::tools::digest_to_hex(&csum)));
+
+            match index {
+                Some(index) => {
+                    let index_size = ((device_size + chunk_size -1)/chunk_size) as usize;
+                    if index_size != index.index_count() {
+                        bail!("previous backup has different size than current state, cannot do incremental backup (drive: {})", archive_name);
+                    }
+                    if index.compute_csum().0 != csum {
+                        bail!("previous backup checksum doesn't match session cache, incremental backup would be out of sync (drive: {})", archive_name);
+                    }
+
+                    initial_index = Arc::new(Some(index));
+                },
+                None => bail!("no previous backup found, cannot do incremental backup")
+            }
+
+        } else {
+            bail!("no previous backups in this session, cannot do incremental one");
+        }
+    }
 
-    let param = json!({ "archive-name": archive_name , "size": device_size});
     let wid = client.post("fixed_index", Some(param)).await?.as_u64().unwrap();
 
     let zero_chunk_digest =
         register_zero_chunk(client.clone(), crypt_config, chunk_size as usize, wid).await?;
 
-    let (upload_queue,  upload_result) = create_upload_queue(
+    let (upload_queue, upload_result) = create_upload_queue(
         client.clone(),
         known_chunks.clone(),
+        initial_index.clone(),
         wid,
         device_size,
         chunk_size,
@@ -179,20 +233,30 @@ pub(crate) async fn close_image(
         None => bail!("close_image: unknown error because upload result channel was already closed"),
     };
 
+    let csum = proxmox::tools::digest_to_hex(&upload_result.csum);
+
     let param = json!({
         "wid": wid ,
         "chunk-count": upload_result.chunk_count,
         "size": upload_result.bytes_written,
-        "csum": proxmox::tools::digest_to_hex(&upload_result.csum),
+        "csum": csum.clone(),
     });
 
     let _value = client.post("fixed_close", Some(param)).await?;
 
+    {
+        let mut reg_guard = registry.lock().unwrap();
+        let info = reg_guard.lookup(dev_id)?;
+        let mut prev_csum_guard = PREVIOUS_CSUMS.lock().unwrap();
+
+        prev_csum_guard.insert(info.device_name.clone(), proxmox::tools::hex_to_digest(&csum).unwrap());
+    }
+
     let mut guard = registry.lock().unwrap();
     guard.file_list.push(json!({
         "filename": format!("{}.img.fidx", device_name),
         "size": device_size,
-        "csum": proxmox::tools::digest_to_hex(&upload_result.csum),
+        "csum": csum.clone(),
     }));
 
     Ok(0)
@@ -315,7 +379,6 @@ pub(crate) async fn write_data(
 
 pub(crate) async fn finish_backup(
     client: Arc<BackupWriter>,
-    crypt_config: Option<Arc<CryptConfig>>,
     registry: Arc<Mutex<ImageRegistry>>,
     setup: BackupSetup,
 ) -> Result<c_int, Error> {
@@ -337,7 +400,7 @@ pub(crate) async fn finish_backup(
     };
 
     client
-        .upload_blob_from_data(index_data, "index.json.blob", crypt_config, true, true)
+        .upload_blob_from_data(index_data, "index.json.blob", true, Some(true))
         .await?;
 
     client.finish().await?;
index 7e8852eb85ac95042d6727472bfcc9b244942b4b..6e358b55a47b6ab058de28f3a6fdeef59763d099 100644 (file)
@@ -285,6 +285,7 @@ pub extern "C" fn proxmox_backup_register_image(
     handle: *mut ProxmoxBackupHandle,
     device_name: *const c_char, // expect utf8 here
     size: u64,
+    is_incremental: i32,
     error: * mut * mut c_char,
 ) -> c_int {
     let task = unsafe { &mut *(handle as * mut BackupTask) };
@@ -301,7 +302,7 @@ pub extern "C" fn proxmox_backup_register_image(
 
     let device_name = unsafe { CStr::from_ptr(device_name).to_string_lossy().to_string() };
 
-    let msg = BackupMessage::RegisterImage { device_name, size, callback_info };
+    let msg = BackupMessage::RegisterImage { device_name, size, callback_info, incremental: is_incremental != 0 };
 
     if let Err(_) = task.command_tx.send(msg) {
         raise_error_int!(error, format_err!("task already aborted (send command failed)"));
@@ -325,6 +326,7 @@ pub extern "C" fn proxmox_backup_register_image_async(
     handle: *mut ProxmoxBackupHandle,
     device_name: *const c_char, // expect utf8 here
     size: u64,
+    is_incremental: i32,
     callback: extern "C" fn(*mut c_void),
     callback_data: *mut c_void,
     result: *mut c_int,
@@ -341,7 +343,7 @@ pub extern "C" fn proxmox_backup_register_image_async(
 
     let device_name = unsafe { CStr::from_ptr(device_name).to_string_lossy().to_string() };
 
-    let msg = BackupMessage::RegisterImage { device_name, size, callback_info };
+    let msg = BackupMessage::RegisterImage { device_name, size, callback_info, incremental: is_incremental != 0 };
 
     if let Err(_) = task.command_tx.send(msg) {
         callback_info2.send_result(Err(format_err!("task already aborted")));
index 8957ccefe75c1250c24a6a85a204b30ffc3294af..daf5dec0b980397ccf31b8ae5ac4bd16f81fecc3 100644 (file)
@@ -5,6 +5,7 @@ use std::sync::{Mutex, Arc};
 use futures::future::Future;
 use serde_json::json;
 use tokio::sync::{mpsc, oneshot};
+use proxmox_backup::backup::*;
 use proxmox_backup::client::*;
 
 pub(crate) struct ChunkUploadInfo {
@@ -28,6 +29,7 @@ type UploadResultSender = oneshot::Sender<Result<UploadResult, Error>>;
 pub(crate) fn create_upload_queue(
     client: Arc<BackupWriter>,
     known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    initial_index: Arc<Option<FixedIndexReader>>,
     wid: u64,
     device_size: u64,
     chunk_size: u64,
@@ -39,6 +41,7 @@ pub(crate) fn create_upload_queue(
         upload_handler(
             client,
             known_chunks,
+            initial_index,
             wid,
             device_size,
             chunk_size,
@@ -70,6 +73,7 @@ async fn upload_chunk_list(
 async fn upload_handler(
     client: Arc<BackupWriter>,
     known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    initial_index: Arc<Option<FixedIndexReader>>,
     wid: u64,
     device_size: u64,
     chunk_size: u64,
@@ -86,6 +90,14 @@ async fn upload_handler(
     let mut index = Vec::with_capacity(index_size);
     index.resize(index_size, [0u8; 32]);
 
+    // for incremental, initialize with data from previous backup
+    // caller ensures initial_index length is index_size
+    if let Some(init) = initial_index.as_ref() {
+        for i in 0..index_size {
+            index[i] = *init.index_digest(i).unwrap();
+        }
+    }
+
     while let Some(response_future) = upload_queue.recv().await {
         match response_future.await {
             Ok(ChunkUploadInfo { digest, offset, size, chunk_is_known }) => {
index 94a9f48055ebf873bc1fb3acb05770760f7bb135..0152b5b74ad0b4bce277b4d18beee147e7069108 100644 (file)
@@ -120,6 +120,7 @@ fn backup_worker_task(
     let written_bytes2 = written_bytes.clone();
 
     let known_chunks = Arc::new(Mutex::new(HashSet::new()));
+    let manifest = Arc::new(Mutex::new(None));
 
     let chunk_size = setup.chunk_size;
 
@@ -145,6 +146,8 @@ fn backup_worker_task(
                 BackupMessage::Connect { callback_info } => {
                     let setup = setup.clone();
                     let client = client.clone();
+                    let crypt_config = crypt_config.clone();
+                    let manifest = manifest.clone();
 
                     let command_future = async move {
                         let options = HttpClientOptions::new()
@@ -152,10 +155,17 @@ fn backup_worker_task(
                             .password(setup.password.clone());
 
                         let http = HttpClient::new(&setup.host, &setup.user, options)?;
-                        let writer = BackupWriter::start(http, &setup.store, "vm", &setup.backup_id, setup.backup_time, false).await?;
+                        let writer = BackupWriter::start(http, crypt_config.clone(), &setup.store, "vm", &setup.backup_id, setup.backup_time, false).await?;
 
-                        let mut guard = client.lock().unwrap();
-                        *guard = Some(writer);
+                        let last_manifest = writer.download_previous_manifest().await;
+                        let mut manifest_guard = manifest.lock().unwrap();
+                        *manifest_guard = match last_manifest {
+                            Ok(last_manifest) => Some(Arc::new(last_manifest)),
+                            Err(_) => None
+                        };
+
+                        let mut client_guard = client.lock().unwrap();
+                        *client_guard = Some(writer);
                         Ok(0)
                     };
 
@@ -176,7 +186,6 @@ fn backup_worker_task(
                         Some(client) => {
                             let command_future = add_config(
                                 client,
-                                crypt_config.clone(),
                                 registry.clone(),
                                 name,
                                 data,
@@ -189,18 +198,20 @@ fn backup_worker_task(
                         }
                     }
                 }
-                BackupMessage::RegisterImage { device_name, size, callback_info} => {
+                BackupMessage::RegisterImage { device_name, size, incremental, callback_info } => {
                     let client = (*(client.lock().unwrap())).clone();
                     match client {
                         Some(client) => {
                             let command_future = register_image(
                                 client,
                                 crypt_config.clone(),
+                                manifest.clone(),
                                 registry.clone(),
                                 known_chunks.clone(),
                                 device_name,
                                 size,
                                 chunk_size,
+                                incremental,
                             );
                             tokio::spawn(handle_async_command(command_future, abort.listen(), callback_info));
                         }
@@ -250,7 +261,6 @@ fn backup_worker_task(
                         Some(client) => {
                             let command_future = finish_backup(
                                 client,
-                                crypt_config.clone(),
                                 registry.clone(),
                                 setup.clone(),
                             );