From f98ac774eed03221f19e094f24da53763e92129d Mon Sep 17 00:00:00 2001
From: Dietmar Maurer
Date: Thu, 13 Jun 2019 11:47:23 +0200
Subject: [PATCH] backup: Add support for client side encryption

first try ...
---
 src/api2/backup/upload_chunk.rs  | 59 ++++++++++++++++++++++--------
 src/backup/chunk_store.rs        | 61 +++++++++++++-------------------
 src/backup/datastore.rs          | 12 +++----
 src/backup/dynamic_index.rs      | 41 ++++++++++++++-------
 src/backup/fixed_index.rs        | 32 ++++++++++++-----
 src/bin/proxmox-backup-client.rs | 28 ++++++++++++---
 src/client/http_client.rs       | 60 ++++++++++++++++++-------------
 7 files changed, 185 insertions(+), 108 deletions(-)

diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 0bfe2a40..3cd85890 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -10,20 +10,23 @@ use crate::tools;
 use crate::backup::*;
 use crate::api_schema::*;
 use crate::api_schema::router::*;
+use crate::api2::types::*;
 
 use super::environment::*;
 
 pub struct UploadChunk {
     stream: Body,
     store: Arc<DataStore>,
+    digest: [u8; 32],
     size: u32,
-    chunk: Vec<u8>,
+    encoded_size: u32,
+    raw_data: Option<Vec<u8>>,
 }
 
 impl UploadChunk {
 
-    pub fn new(stream: Body, store: Arc<DataStore>, size: u32) -> Self {
-        Self { stream, store, size, chunk: vec![] }
+    pub fn new(stream: Body, store: Arc<DataStore>, digest: [u8; 32], size: u32, encoded_size: u32) -> Self {
+        Self { stream, store, size, encoded_size, raw_data: Some(vec![]), digest }
     }
 }
 
@@ -34,20 +37,30 @@ impl Future for UploadChunk {
     fn poll(&mut self) -> Poll<([u8; 32], u32, u32, bool), failure::Error> {
         loop {
             match try_ready!(self.stream.poll()) {
-                Some(chunk) => {
-                    if (self.chunk.len() + chunk.len()) > (self.size as usize) {
-                        bail!("uploaded chunk is larger than announced.");
+                Some(input) => {
+                    if let Some(ref mut raw_data) = self.raw_data {
+                        if (raw_data.len() + input.len()) > (self.encoded_size as usize) {
+                            bail!("uploaded chunk is larger than announced.");
+                        }
+                        raw_data.extend_from_slice(&input);
+                    } else {
+                        bail!("poll upload chunk stream failed - already finished.");
                     }
-                    self.chunk.extend_from_slice(&chunk);
                 }
                 None => {
-                    if self.chunk.len() != (self.size as usize) {
-                        bail!("uploaded chunk has unexpected size.");
-                    }
+                    if let Some(raw_data) = self.raw_data.take() {
+                        if raw_data.len() != (self.encoded_size as usize) {
+                            bail!("uploaded chunk has unexpected size.");
+                        }
+
+                        let chunk = DataChunk::from_raw(raw_data, self.digest)?;
 
-                    let (is_duplicate, digest, compressed_size) = self.store.insert_chunk(&self.chunk)?;
+                        let (is_duplicate, compressed_size) = self.store.insert_chunk(&chunk)?;
 
-                    return Ok(Async::Ready((digest, self.size, compressed_size as u32, is_duplicate)))
+                        return Ok(Async::Ready((self.digest, self.size, compressed_size as u32, is_duplicate)))
+                    } else {
+                        bail!("poll upload chunk stream failed - already finished.");
+                    }
                 }
             }
         }
@@ -62,10 +75,15 @@ pub fn api_method_upload_fixed_chunk() -> ApiAsyncMethod {
            .minimum(1)
            .maximum(256)
        )
+        .required("digest", CHUNK_DIGEST_SCHEMA.clone())
        .required("size", IntegerSchema::new("Chunk size.")
            .minimum(1)
            .maximum(1024*1024*16)
        )
+        .required("encoded-size", IntegerSchema::new("Encoded chunk size.")
+            .minimum(9)
+            // fixme: .maximum(1024*1024*16+40)
+        )
     )
 }
 
@@ -79,10 +97,14 @@ fn upload_fixed_chunk(
 
     let wid = tools::required_integer_param(&param, "wid")? as usize;
     let size = tools::required_integer_param(&param, "size")? as u32;
+    let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;
+
+    let digest_str = tools::required_string_param(&param, "digest")?;
+    let digest = crate::tools::hex_to_digest(digest_str)?;
 
     let env: &BackupEnvironment = rpcenv.as_ref();
 
-    let upload = UploadChunk::new(req_body, env.datastore.clone(), size);
+    let upload = UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size);
 
     let resp = upload
         .then(move |result| {
@@ -109,10 +131,15 @@ pub fn api_method_upload_dynamic_chunk() -> ApiAsyncMethod {
            .minimum(1)
            .maximum(256)
        )
+        .required("digest", CHUNK_DIGEST_SCHEMA.clone())
        .required("size", IntegerSchema::new("Chunk size.")
            .minimum(1)
            .maximum(1024*1024*16)
        )
+        .required("encoded-size", IntegerSchema::new("Encoded chunk size.")
+            .minimum(9)
+            // fixme: .maximum(1024*1024*16+40)
+        )
     )
 }
 
@@ -126,10 +153,14 @@ fn upload_dynamic_chunk(
 
     let wid = tools::required_integer_param(&param, "wid")? as usize;
     let size = tools::required_integer_param(&param, "size")? as u32;
+    let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;
+
+    let digest_str = tools::required_string_param(&param, "digest")?;
+    let digest = crate::tools::hex_to_digest(digest_str)?;
 
     let env: &BackupEnvironment = rpcenv.as_ref();
 
-    let upload = UploadChunk::new(req_body, env.datastore.clone(), size);
+    let upload = UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size);
 
     let resp = upload
         .then(move |result| {
diff --git a/src/backup/chunk_store.rs b/src/backup/chunk_store.rs
index b0945143..c5b18d1d 100644
--- a/src/backup/chunk_store.rs
+++ b/src/backup/chunk_store.rs
@@ -1,14 +1,13 @@
 use failure::*;
 use std::path::{Path, PathBuf};
-use std::io::{Read, Write};
+use std::io::Write;
 use std::sync::{Arc, Mutex};
 use std::os::unix::io::AsRawFd;
 
 use serde_derive::Serialize;
 
-use openssl::sha;
-
 use crate::tools;
+use super::DataChunk;
 
 #[derive(Clone, Serialize)]
 pub struct GarbageCollectionStatus {
@@ -173,21 +172,19 @@ impl ChunkStore {
         Ok(())
     }
 
-    pub fn read_chunk(&self, digest:&[u8], buffer: &mut Vec<u8>) -> Result<(), Error> {
+    pub fn read_chunk(&self, digest:&[u8; 32]) -> Result<DataChunk, Error> {
 
         let mut chunk_path = self.chunk_dir.clone();
-        let prefix = digest_to_prefix(&digest);
+        let prefix = digest_to_prefix(digest);
         chunk_path.push(&prefix);
-        let digest_str = tools::digest_to_hex(&digest);
+        let digest_str = tools::digest_to_hex(digest);
         chunk_path.push(&digest_str);
 
-        buffer.clear();
-        let f = std::fs::File::open(&chunk_path)?;
-        let mut decoder = zstd::stream::Decoder::new(f)?;
-
-        decoder.read_to_end(buffer)?;
+        let mut file = std::fs::File::open(&chunk_path)
+            .map_err(|err| format_err!(
+                "store '{}', unable to read chunk '{}' - {}", self.name, digest_str, err))?;
 
-        Ok(())
+        DataChunk::load(&mut file, *digest)
     }
 
     pub fn get_chunk_iterator(
@@ -320,21 +317,14 @@ impl ChunkStore {
         Ok(())
     }
 
-    pub fn insert_chunk(&self, chunk: &[u8]) -> Result<(bool, [u8; 32], u64), Error> {
-
-        // fixme: use Sha512/256 when available
-        let digest = sha::sha256(chunk);
-        let (new, csize) = self.insert_chunk_noverify(&digest, chunk)?;
-        Ok((new, digest, csize))
-    }
-
-    pub fn insert_chunk_noverify(
+    pub fn insert_chunk(
         &self,
-        digest: &[u8; 32],
-        chunk: &[u8],
+        chunk: &DataChunk,
     ) -> Result<(bool, u64), Error> {
 
-        //println!("DIGEST {}", tools::digest_to_hex(&digest));
+        let digest = chunk.digest();
+
+        //println!("DIGEST {}", tools::digest_to_hex(digest));
 
         let mut chunk_path = self.chunk_dir.clone();
         let prefix = digest_to_prefix(digest);
@@ -355,12 +345,12 @@ impl ChunkStore {
         let mut tmp_path = chunk_path.clone();
         tmp_path.set_extension("tmp");
 
-        let f = std::fs::File::create(&tmp_path)?;
+        let mut file = std::fs::File::create(&tmp_path)?;
 
-        let mut encoder = zstd::stream::Encoder::new(f, 1)?;
+        let raw_data = chunk.raw_data();
+        let encoded_size = raw_data.len() as u64;
 
-        encoder.write_all(chunk)?;
-        let f = encoder.finish()?;
+        file.write_all(raw_data)?;
 
         if let Err(err) = std::fs::rename(&tmp_path, &chunk_path) {
             if let Err(_) = std::fs::remove_file(&tmp_path) { /* ignore */ }
             bail!(
@@ -372,15 +362,9 @@ impl ChunkStore {
             );
         }
 
-        // fixme: is there a better way to get the compressed size?
-        let stat = nix::sys::stat::fstat(f.as_raw_fd())?;
-        let compressed_size = stat.st_size as u64;
-
-        //println!("PATH {:?}", chunk_path);
-
         drop(lock);
 
-        Ok((false, compressed_size))
+        Ok((false, encoded_size))
     }
 
     pub fn relative_path(&self, path: &Path) -> PathBuf {
@@ -416,10 +400,13 @@ fn test_chunk_store1() {
     assert!(chunk_store.is_err());
 
     let chunk_store = ChunkStore::create("test", &path).unwrap();
-    let (exists, _, _) = chunk_store.insert_chunk(&[0u8, 1u8]).unwrap();
+
+    let chunk = super::DataChunkBuilder::new(&[0u8, 1u8]).build().unwrap();
+
+    let (exists, _) = chunk_store.insert_chunk(&chunk).unwrap();
     assert!(!exists);
 
-    let (exists, _, _) = chunk_store.insert_chunk(&[0u8, 1u8]).unwrap();
+    let (exists, _) = chunk_store.insert_chunk(&chunk).unwrap();
     assert!(exists);
 
diff --git a/src/backup/datastore.rs b/src/backup/datastore.rs
index f0c092b6..211f9328 100644
--- a/src/backup/datastore.rs
+++ b/src/backup/datastore.rs
@@ -13,6 +13,7 @@ use super::fixed_index::*;
 use super::dynamic_index::*;
 use super::index::*;
 use super::backup_info::*;
+use super::DataChunk;
 use crate::server::WorkerTask;
 
 lazy_static!{
@@ -256,15 +257,10 @@ impl DataStore {
         Ok(())
     }
 
-    pub fn insert_chunk(&self, chunk: &[u8]) -> Result<(bool, [u8; 32], u64), Error> {
-        self.chunk_store.insert_chunk(chunk)
-    }
-
-    pub fn insert_chunk_noverify(
+    pub fn insert_chunk(
         &self,
-        digest: &[u8; 32],
-        chunk: &[u8],
+        chunk: &DataChunk,
     ) -> Result<(bool, u64), Error> {
-        self.chunk_store.insert_chunk_noverify(digest, chunk)
+        self.chunk_store.insert_chunk(chunk)
     }
 }
diff --git a/src/backup/dynamic_index.rs b/src/backup/dynamic_index.rs
index c93aae11..9af4e0e5 100644
--- a/src/backup/dynamic_index.rs
+++ b/src/backup/dynamic_index.rs
@@ -1,4 +1,5 @@
 use failure::*;
+use std::convert::TryInto;
 
 use crate::tools;
 use super::IndexFile;
@@ -17,6 +18,8 @@ use uuid::Uuid;
 use crate::tools::io::ops::*;
 use crate::tools::vec;
 
+use super::{DataChunk, DataChunkBuilder};
+
 /// Header format definition for dynamic index files (`.dixd`)
 #[repr(C)]
 pub struct DynamicIndexHeader {
@@ -158,11 +161,12 @@ impl DynamicIndexReader {
     }
 
     #[inline]
-    fn chunk_digest(&self, pos: usize) -> &[u8] {
+    fn chunk_digest(&self, pos: usize) -> &[u8; 32] {
         if pos >= self.index_entries {
             panic!("chunk index out of range");
         }
-        unsafe { std::slice::from_raw_parts(self.index.add(pos*40+8), 32) }
+        let slice = unsafe { std::slice::from_raw_parts(self.index.add(pos*40+8), 32) };
+        slice.try_into().unwrap()
     }
 
     pub fn mark_used_chunks(&self, _status: &mut GarbageCollectionStatus) -> Result<(), Error> {
@@ -182,15 +186,14 @@ impl DynamicIndexReader {
 
     pub fn dump_pxar(&self, mut writer: Box<dyn Write>) -> Result<(), Error> {
 
-        let mut buffer = Vec::with_capacity(1024*1024);
-
         for pos in 0..self.index_entries {
             let _end = self.chunk_end(pos);
             let digest = self.chunk_digest(pos);
             //println!("Dump {:08x}", end );
-            self.store.read_chunk(digest, &mut buffer)?;
-            writer.write_all(&buffer)?;
-
+            let chunk = self.store.read_chunk(digest)?;
+            // fixme: handle encrypted chunks
+            let data = chunk.decode(None)?;
+            writer.write_all(&data)?;
         }
 
         Ok(())
@@ -270,7 +273,14 @@ impl BufferedDynamicReader {
         let index = &self.index;
         let end = index.chunk_end(idx);
         let digest = index.chunk_digest(idx);
-        index.store.read_chunk(digest, &mut self.read_buffer)?;
+
+        let chunk = index.store.read_chunk(digest)?;
+        // fixme: handle encrypted chunks
+        // fixme: avoid copy
+        let data = chunk.decode(None)?;
+
+        self.read_buffer.clear();
+        self.read_buffer.extend_from_slice(&data);
 
         self.buffered_chunk_idx = idx;
         self.buffered_chunk_start = end - (self.read_buffer.len() as u64);
@@ -433,7 +443,8 @@ impl DynamicIndexWriter {
         })
     }
 
-    pub fn insert_chunk(&self, chunk: &[u8]) -> Result<(bool, [u8; 32], u64), Error> {
+    // fixme: use add_chunk instead?
+    pub fn insert_chunk(&self, chunk: &DataChunk) -> Result<(bool, u64), Error> {
         self.store.insert_chunk(chunk)
     }
 
@@ -531,8 +542,14 @@ impl DynamicChunkWriter {
 
         self.last_chunk = self.chunk_offset;
 
-        match self.index.insert_chunk(&self.chunk_buffer) {
-            Ok((is_duplicate, digest, compressed_size)) => {
+        let chunk = DataChunkBuilder::new(&self.chunk_buffer)
+            .compress(true)
+            .build()?;
+
+        let digest = chunk.digest();
+
+        match self.index.insert_chunk(&chunk) {
+            Ok((is_duplicate, compressed_size)) => {
 
                 self.stat.compressed_size += compressed_size;
 
                 if is_duplicate {
@@ -542,7 +559,7 @@
                 }
 
                 println!("ADD CHUNK {:016x} {} {}% {} {}", self.chunk_offset, chunk_size,
-                         (compressed_size*100)/(chunk_size as u64), is_duplicate, tools::digest_to_hex(&digest));
+                         (compressed_size*100)/(chunk_size as u64), is_duplicate, tools::digest_to_hex(digest));
                 self.index.add_chunk(self.chunk_offset as u64, &digest)?;
                 self.chunk_buffer.truncate(0);
                 return Ok(());
diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
index 571dc7c5..b54e338b 100644
--- a/src/backup/fixed_index.rs
+++ b/src/backup/fixed_index.rs
@@ -12,6 +12,7 @@ use std::path::{Path, PathBuf};
 use std::os::unix::io::AsRawFd;
 use uuid::Uuid;
 use chrono::{Local, TimeZone};
+use super::ChunkInfo;
 
 /// Header format definition for fixed index files (`.fidx`)
 #[repr(C)]
@@ -307,29 +308,42 @@ impl FixedIndexWriter {
     }
 
     // Note: We want to add data out of order, so do not assume any order here.
-    pub fn add_chunk(&mut self, pos: usize, chunk: &[u8], stat: &mut ChunkStat) -> Result<(), Error> {
+    pub fn add_chunk(&mut self, chunk_info: &ChunkInfo, stat: &mut ChunkStat) -> Result<(), Error> {
 
-        let end = pos + chunk.len();
+        let chunk_len = chunk_info.chunk_len as usize;
+        let end = chunk_info.offset as usize;
+
+        if end < chunk_len {
+            bail!("got chunk with small offset ({} < {}", end, chunk_len);
+        }
+
+        let pos = end - chunk_len;
 
         if end > self.size {
             bail!("write chunk data exceeds size ({} >= {})", end, self.size);
         }
 
         // last chunk can be smaller
-        if ((end != self.size) && (chunk.len() != self.chunk_size)) ||
-            (chunk.len() > self.chunk_size) || (chunk.len() == 0) {
-            bail!("got chunk with wrong length ({} != {}", chunk.len(), self.chunk_size);
+        if ((end != self.size) && (chunk_len != self.chunk_size)) ||
+            (chunk_len > self.chunk_size) || (chunk_len == 0) {
+            bail!("got chunk with wrong length ({} != {}", chunk_len, self.chunk_size);
         }
 
         if pos & (self.chunk_size-1) != 0 {
             bail!("add unaligned chunk (pos = {})", pos);
         }
 
-        let (is_duplicate, digest, compressed_size) = self.store.insert_chunk(chunk)?;
+        if (end as u64) != chunk_info.offset {
+            bail!("got chunk with wrong offset ({} != {}", end, chunk_info.offset);
+        }
+
+        let (is_duplicate, compressed_size) = self.store.insert_chunk(&chunk_info.chunk)?;
 
         stat.chunk_count += 1;
         stat.compressed_size += compressed_size;
 
-        println!("ADD CHUNK {} {} {}% {} {}", pos, chunk.len(),
-                 (compressed_size*100)/(chunk.len() as u64), is_duplicate, tools::digest_to_hex(&digest));
+        let digest = chunk_info.chunk.digest();
+
+        println!("ADD CHUNK {} {} {}% {} {}", pos, chunk_len,
+                 (compressed_size*100)/(chunk_len as u64), is_duplicate, tools::digest_to_hex(digest));
 
         if is_duplicate {
             stat.duplicate_chunks += 1;
@@ -337,7 +351,7 @@ impl FixedIndexWriter {
             stat.disk_size += compressed_size;
         }
 
-        self.add_digest(pos / self.chunk_size, &digest)
+        self.add_digest(pos / self.chunk_size, digest)
     }
 
     pub fn add_digest(&mut self, index: usize, digest: &[u8; 32]) -> Result<(), Error> {
diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs
index 440c5753..df30b970 100644
--- a/src/bin/proxmox-backup-client.rs
+++ b/src/bin/proxmox-backup-client.rs
@@ -113,6 +113,7 @@ fn backup_directory<P: AsRef<Path>>(
     chunk_size: Option<usize>,
     all_file_systems: bool,
     verbose: bool,
+    crypt_config: Option<Arc<CryptConfig>>,
 ) -> Result<(), Error> {
 
     let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), all_file_systems, verbose)?;
@@ -130,7 +131,7 @@ fn backup_directory<P: AsRef<Path>>(
             .map_err(|_| {}).map(|_| ())
     );
 
-    client.upload_stream(archive_name, stream, "dynamic", None).wait()?;
+    client.upload_stream(archive_name, stream, "dynamic", None, crypt_config).wait()?;
 
     Ok(())
 }
@@ -142,6 +143,7 @@ fn backup_image<P: AsRef<Path>>(
     image_size: u64,
     chunk_size: Option<usize>,
     _verbose: bool,
+    crypt_config: Option<Arc<CryptConfig>>,
 ) -> Result<(), Error> {
 
     let path = image_path.as_ref().to_owned();
@@ -153,7 +155,7 @@ fn backup_image<P: AsRef<Path>>(
 
     let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024));
 
-    client.upload_stream(archive_name, stream, "fixed", Some(image_size)).wait()?;
+    client.upload_stream(archive_name, stream, "fixed", Some(image_size), crypt_config).wait()?;
 
     Ok(())
 }
@@ -456,6 +458,8 @@ fn create_backup(
     println!("Client name: {}", tools::nodename());
     println!("Start Time: {}", backup_time.to_rfc3339());
 
+    let crypt_config = None;
+
     let client = client.start_backup(repo.store(), "host", &backup_id, verbose).wait()?;
 
     for (backup_type, filename, target, size) in upload_list {
@@ -466,11 +470,27 @@ fn create_backup(
             }
             BackupType::PXAR => {
                 println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target);
-                backup_directory(&client, &filename, &target, chunk_size_opt, all_file_systems, verbose)?;
+                backup_directory(
+                    &client,
+                    &filename,
+                    &target,
+                    chunk_size_opt,
+                    all_file_systems,
+                    verbose,
+                    crypt_config.clone(),
+                )?;
             }
             BackupType::IMAGE => {
                 println!("Upload image '{}' to '{:?}' as {}", filename, repo, target);
-                backup_image(&client, &filename, &target, size, chunk_size_opt, verbose)?;
+                backup_image(
+                    &client,
+                    &filename,
+                    &target,
+                    size,
+                    chunk_size_opt,
+                    verbose,
+                    crypt_config.clone(),
+                )?;
             }
         }
     }
diff --git a/src/client/http_client.rs b/src/client/http_client.rs
index 25bf33a3..329536d4 100644
--- a/src/client/http_client.rs
+++ b/src/client/http_client.rs
@@ -484,20 +484,11 @@ impl BackupClient {
         stream: impl Stream,
         prefix: &str,
         fixed_size: Option<u64>,
+        crypt_config: Option<Arc<CryptConfig>>,
     ) -> impl Future {
 
         let known_chunks = Arc::new(Mutex::new(HashSet::new()));
 
-        let mut stream_len = 0u64;
-
-        let stream = stream.
-            map(move |data| {
-                let digest = openssl::sha::sha256(&data);
-                let offset = stream_len;
-                stream_len += data.len() as u64;
-                ChunkInfo { data, digest, offset }
-            });
-
         let h2 = self.h2.clone();
         let h2_2 = self.h2.clone();
         let h2_3 = self.h2.clone();
@@ -519,7 +510,7 @@ impl BackupClient {
             })
             .and_then(move |res| {
                 let wid = res.as_u64().unwrap();
-                Self::upload_chunk_info_stream(h2_3, wid, stream, &prefix, known_chunks.clone())
+                Self::upload_chunk_info_stream(h2_3, wid, stream, &prefix, known_chunks.clone(), crypt_config)
                     .and_then(move |(chunk_count, size, _speed)| {
                         let param = json!({
                             "wid": wid ,
@@ -671,9 +662,10 @@ impl BackupClient {
     fn upload_chunk_info_stream(
         h2: H2Client,
         wid: u64,
-        stream: impl Stream,
+        stream: impl Stream,
         prefix: &str,
         known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+        crypt_config: Option<Arc<CryptConfig>>,
     ) -> impl Future {
 
         let repeat = std::sync::Arc::new(AtomicUsize::new(0));
@@ -690,17 +682,29 @@ impl BackupClient {
         let start_time = std::time::Instant::now();
 
         stream
-            .map(move |chunk_info| {
+            .and_then(move |data| {
+
+                let chunk_len = data.len();
+
                 repeat.fetch_add(1, Ordering::SeqCst);
-                stream_len.fetch_add(chunk_info.data.len(), Ordering::SeqCst);
+                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
+
+                let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
+                    .compress(true);
+
+                if let Some(ref crypt_config) = crypt_config {
+                    chunk_builder = chunk_builder.crypt_config(crypt_config);
+                }
 
                 let mut known_chunks = known_chunks.lock().unwrap();
-                let chunk_is_known = known_chunks.contains(&chunk_info.digest);
+                let digest = chunk_builder.digest();
+                let chunk_is_known = known_chunks.contains(digest);
+
                 if chunk_is_known {
-                    MergedChunkInfo::Known(vec![(chunk_info.offset, chunk_info.digest)])
+                    Ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
                 } else {
-                    known_chunks.insert(chunk_info.digest);
-                    MergedChunkInfo::New(chunk_info)
+                    known_chunks.insert(*digest);
+                    let chunk = chunk_builder.build()?;
+                    Ok(MergedChunkInfo::New(ChunkInfo { chunk, chunk_len: chunk_len as u64, offset }))
                 }
             })
             .merge_known_chunks()
@@ -708,15 +712,23 @@ impl BackupClient {
                 if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
                     let offset = chunk_info.offset;
-                    let digest = chunk_info.digest;
+                    let digest = *chunk_info.chunk.digest();
+                    let digest_str = tools::digest_to_hex(&digest);
                     let upload_queue = upload_queue.clone();
 
-                    println!("upload new chunk {} ({} bytes, offset {})", tools::digest_to_hex(&digest),
-                        chunk_info.data.len(), offset);
println!("upload new chunk {} ({} bytes, offset {})", digest_str, + chunk_info.chunk_len, offset); + + let chunk_data = chunk_info.chunk.raw_data(); + let param = json!({ + "wid": wid, + "digest": digest_str, + "size": chunk_info.chunk_len, + "encoded-size": chunk_data.len(), + }); - let param = json!({ "wid": wid, "size" : chunk_info.data.len() }); let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param)).unwrap(); - let upload_data = Some(chunk_info.data.freeze()); + let upload_data = Some(bytes::Bytes::from(chunk_data)); let new_info = MergedChunkInfo::Known(vec![(offset, digest)]); @@ -883,7 +895,7 @@ impl H2Client { .and_then(move |mut send_request| { if let Some(data) = data { let (response, stream) = send_request.send_request(request, false).unwrap(); - future::Either::A(PipeToSendStream::new(bytes::Bytes::from(data), stream) + future::Either::A(PipeToSendStream::new(data, stream) .and_then(move |_| { future::ok(response) })) -- 2.39.2