[dependencies]
libc = "0.2"
bytes = "0.5"
-proxmox = { version = "0.1.38", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v0.3.0" }
+proxmox = { version = "0.1.41", features = [ "sortable-macro", "api-macro" ] }
+proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v0.4.0" }
#proxmox-backup = { path = "../proxmox-backup" }
chrono = "0.4" # Date and time library for Rust
anyhow = "1.0"
tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
openssl = "0.10"
h2 = { version = "0.2", features = ["stream"] }
+lazy_static = "1.4"
RegisterImage {
device_name: String,
size: u64,
+ incremental: bool,
callback_info: CallbackPointers,
},
CloseImage {
use anyhow::{bail, format_err, Error};
-use std::collections::HashSet;
+use std::collections::{HashSet, HashMap};
use std::sync::{Mutex, Arc};
use std::os::raw::c_int;
use crate::capi_types::*;
use crate::upload_queue::*;
+use lazy_static::lazy_static;
+
+lazy_static!{
+ static ref PREVIOUS_CSUMS: Mutex<HashMap<String, [u8;32]>> = {
+ Mutex::new(HashMap::new())
+ };
+}
+
struct ImageUploadInfo {
wid: u64,
device_name: String,
pub(crate) async fn add_config(
client: Arc<BackupWriter>,
- crypt_config: Option<Arc<CryptConfig>>,
registry: Arc<Mutex<ImageRegistry>>,
name: String,
data: DataPointer,
let data: &[u8] = unsafe { std::slice::from_raw_parts(data.0, size as usize) };
let data = data.to_vec();
- let stats = client.upload_blob_from_data(data, &blob_name, crypt_config, true, false).await?;
+ let stats = client.upload_blob_from_data(data, &blob_name, true, Some(false)).await?;
let mut guard = registry.lock().unwrap();
guard.file_list.push(json!({
pub(crate) async fn register_image(
client: Arc<BackupWriter>,
crypt_config: Option<Arc<CryptConfig>>,
+ manifest: Arc<Mutex<Option<Arc<BackupManifest>>>>,
registry: Arc<Mutex<ImageRegistry>>,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
device_name: String,
device_size: u64,
chunk_size: u64,
+ incremental: bool,
) -> Result<c_int, Error> {
- //println!("register image {} size {}", device_name, device_size);
let archive_name = format!("{}.img.fidx", device_name);
- client.download_chunk_list("fixed_index", &archive_name, known_chunks.clone()).await?;
- //println!("register image download chunk list OK");
+ let manifest = {
+ let guard = manifest.lock().unwrap();
+ match &*guard {
+ Some(manifest) => Some(manifest.clone()),
+ None => None
+ }
+ };
+ let index = match manifest {
+ Some(manifest) => {
+ Some(client.download_previous_fixed_index(&archive_name, &manifest, known_chunks.clone()).await?)
+ },
+ None => None
+ };
+
+ let mut param = json!({ "archive-name": archive_name , "size": device_size });
+ let mut initial_index = Arc::new(None);
+
+ if incremental {
+ let csum = {
+ let map = PREVIOUS_CSUMS.lock().unwrap();
+ match map.get(&device_name) {
+ Some(c) => Some(*c),
+ None => None
+ }
+ };
+
+ if let Some(csum) = csum {
+ param.as_object_mut().unwrap().insert("reuse-csum".to_owned(), json!(proxmox::tools::digest_to_hex(&csum)));
+
+ match index {
+ Some(index) => {
+ let index_size = ((device_size + chunk_size -1)/chunk_size) as usize;
+ if index_size != index.index_count() {
+ bail!("previous backup has different size than current state, cannot do incremental backup (drive: {})", archive_name);
+ }
+ if index.compute_csum().0 != csum {
+ bail!("previous backup checksum doesn't match session cache, incremental backup would be out of sync (drive: {})", archive_name);
+ }
+
+ initial_index = Arc::new(Some(index));
+ },
+ None => bail!("no previous backup found, cannot do incremental backup")
+ }
+
+ } else {
+ bail!("no previous backups in this session, cannot do incremental one");
+ }
+ }
- let param = json!({ "archive-name": archive_name , "size": device_size});
let wid = client.post("fixed_index", Some(param)).await?.as_u64().unwrap();
let zero_chunk_digest =
register_zero_chunk(client.clone(), crypt_config, chunk_size as usize, wid).await?;
- let (upload_queue, upload_result) = create_upload_queue(
+ let (upload_queue, upload_result) = create_upload_queue(
client.clone(),
known_chunks.clone(),
+ initial_index.clone(),
wid,
device_size,
chunk_size,
None => bail!("close_image: unknown error because upload result channel was already closed"),
};
+ let csum = proxmox::tools::digest_to_hex(&upload_result.csum);
+
let param = json!({
"wid": wid ,
"chunk-count": upload_result.chunk_count,
"size": upload_result.bytes_written,
- "csum": proxmox::tools::digest_to_hex(&upload_result.csum),
+ "csum": csum.clone(),
});
let _value = client.post("fixed_close", Some(param)).await?;
+ {
+ let mut reg_guard = registry.lock().unwrap();
+ let info = reg_guard.lookup(dev_id)?;
+ let mut prev_csum_guard = PREVIOUS_CSUMS.lock().unwrap();
+
+ prev_csum_guard.insert(info.device_name.clone(), proxmox::tools::hex_to_digest(&csum).unwrap());
+ }
+
let mut guard = registry.lock().unwrap();
guard.file_list.push(json!({
"filename": format!("{}.img.fidx", device_name),
"size": device_size,
- "csum": proxmox::tools::digest_to_hex(&upload_result.csum),
+ "csum": csum.clone(),
}));
Ok(0)
pub(crate) async fn finish_backup(
client: Arc<BackupWriter>,
- crypt_config: Option<Arc<CryptConfig>>,
registry: Arc<Mutex<ImageRegistry>>,
setup: BackupSetup,
) -> Result<c_int, Error> {
};
client
- .upload_blob_from_data(index_data, "index.json.blob", crypt_config, true, true)
+ .upload_blob_from_data(index_data, "index.json.blob", true, Some(true))
.await?;
client.finish().await?;
handle: *mut ProxmoxBackupHandle,
device_name: *const c_char, // expect utf8 here
size: u64,
+ is_incremental: i32,
error: * mut * mut c_char,
) -> c_int {
let task = unsafe { &mut *(handle as * mut BackupTask) };
let device_name = unsafe { CStr::from_ptr(device_name).to_string_lossy().to_string() };
- let msg = BackupMessage::RegisterImage { device_name, size, callback_info };
+ let msg = BackupMessage::RegisterImage { device_name, size, callback_info, incremental: is_incremental != 0 };
if let Err(_) = task.command_tx.send(msg) {
raise_error_int!(error, format_err!("task already aborted (send command failed)"));
handle: *mut ProxmoxBackupHandle,
device_name: *const c_char, // expect utf8 here
size: u64,
+ is_incremental: i32,
callback: extern "C" fn(*mut c_void),
callback_data: *mut c_void,
result: *mut c_int,
let device_name = unsafe { CStr::from_ptr(device_name).to_string_lossy().to_string() };
- let msg = BackupMessage::RegisterImage { device_name, size, callback_info };
+ let msg = BackupMessage::RegisterImage { device_name, size, callback_info, incremental: is_incremental != 0 };
if let Err(_) = task.command_tx.send(msg) {
callback_info2.send_result(Err(format_err!("task already aborted")));
use futures::future::Future;
use serde_json::json;
use tokio::sync::{mpsc, oneshot};
+use proxmox_backup::backup::*;
use proxmox_backup::client::*;
pub(crate) struct ChunkUploadInfo {
pub(crate) fn create_upload_queue(
client: Arc<BackupWriter>,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+ initial_index: Arc<Option<FixedIndexReader>>,
wid: u64,
device_size: u64,
chunk_size: u64,
upload_handler(
client,
known_chunks,
+ initial_index,
wid,
device_size,
chunk_size,
async fn upload_handler(
client: Arc<BackupWriter>,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+ initial_index: Arc<Option<FixedIndexReader>>,
wid: u64,
device_size: u64,
chunk_size: u64,
let mut index = Vec::with_capacity(index_size);
index.resize(index_size, [0u8; 32]);
+ // for incremental, initialize with data from previous backup
+ // caller ensures initial_index length is index_size
+ if let Some(init) = initial_index.as_ref() {
+ for i in 0..index_size {
+ index[i] = *init.index_digest(i).unwrap();
+ }
+ }
+
while let Some(response_future) = upload_queue.recv().await {
match response_future.await {
Ok(ChunkUploadInfo { digest, offset, size, chunk_is_known }) => {
let written_bytes2 = written_bytes.clone();
let known_chunks = Arc::new(Mutex::new(HashSet::new()));
+ let manifest = Arc::new(Mutex::new(None));
let chunk_size = setup.chunk_size;
BackupMessage::Connect { callback_info } => {
let setup = setup.clone();
let client = client.clone();
+ let crypt_config = crypt_config.clone();
+ let manifest = manifest.clone();
let command_future = async move {
let options = HttpClientOptions::new()
.password(setup.password.clone());
let http = HttpClient::new(&setup.host, &setup.user, options)?;
- let writer = BackupWriter::start(http, &setup.store, "vm", &setup.backup_id, setup.backup_time, false).await?;
+ let writer = BackupWriter::start(http, crypt_config.clone(), &setup.store, "vm", &setup.backup_id, setup.backup_time, false).await?;
- let mut guard = client.lock().unwrap();
- *guard = Some(writer);
+ let last_manifest = writer.download_previous_manifest().await;
+ let mut manifest_guard = manifest.lock().unwrap();
+ *manifest_guard = match last_manifest {
+ Ok(last_manifest) => Some(Arc::new(last_manifest)),
+ Err(_) => None
+ };
+
+ let mut client_guard = client.lock().unwrap();
+ *client_guard = Some(writer);
Ok(0)
};
Some(client) => {
let command_future = add_config(
client,
- crypt_config.clone(),
registry.clone(),
name,
data,
}
}
}
- BackupMessage::RegisterImage { device_name, size, callback_info} => {
+ BackupMessage::RegisterImage { device_name, size, incremental, callback_info } => {
let client = (*(client.lock().unwrap())).clone();
match client {
Some(client) => {
let command_future = register_image(
client,
crypt_config.clone(),
+ manifest.clone(),
registry.clone(),
known_chunks.clone(),
device_name,
size,
chunk_size,
+ incremental,
);
tokio::spawn(handle_async_command(command_future, abort.listen(), callback_info));
}
Some(client) => {
let command_future = finish_backup(
client,
- crypt_config.clone(),
registry.clone(),
setup.clone(),
);