use proxmox::api::router::SubdirMap;
use proxmox::api::schema::*;
-use crate::tools::{self, WrappedReaderStream};
+use crate::tools;
use crate::server::{WorkerTask, H2Service};
use crate::backup::*;
use crate::api2::types::*;
),
(
"dynamic_index", &Router::new()
- .download(&API_METHOD_DYNAMIC_CHUNK_INDEX)
.post(&API_METHOD_CREATE_DYNAMIC_INDEX)
.put(&API_METHOD_DYNAMIC_APPEND)
),
),
(
"fixed_index", &Router::new()
- .download(&API_METHOD_FIXED_CHUNK_INDEX)
.post(&API_METHOD_CREATE_FIXED_INDEX)
.put(&API_METHOD_FIXED_APPEND)
),
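+ // The "previous" endpoint replaces the per-type chunk-index downloads
+ // removed above: it serves raw archive files from the previous snapshot.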
+ (
+ "previous", &Router::new()
+ .download(&API_METHOD_DOWNLOAD_PREVIOUS)
+ ),
(
"speedtest", &Router::new()
.upload(&API_METHOD_UPLOAD_SPEEDTEST)
}
#[sortable]
-pub const API_METHOD_DYNAMIC_CHUNK_INDEX: ApiMethod = ApiMethod::new(
- &ApiHandler::AsyncHttp(&dynamic_chunk_index),
+pub const API_METHOD_DOWNLOAD_PREVIOUS: ApiMethod = ApiMethod::new(
+ &ApiHandler::AsyncHttp(&download_previous),
&ObjectSchema::new(
- r###"
-Download the dynamic chunk index from the previous backup.
-Simply returns an empty list if this is the first backup.
-"### ,
+ "Download archive from previous backup.",
&sorted!([
("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
]),
)
);
-fn dynamic_chunk_index(
+fn download_previous(
_parts: Parts,
_req_body: Body,
param: Value,
let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
- if !archive_name.ends_with(".didx") {
- bail!("wrong archive extension: '{}'", archive_name);
- }
-
- let empty_response = {
- Response::builder()
- .status(StatusCode::OK)
- .body(Body::empty())?
- };
-
let last_backup = match &env.last_backup {
Some(info) => info,
- None => return Ok(empty_response),
+ None => bail!("no previous backup"),
};
- let mut path = last_backup.backup_dir.relative_path();
- path.push(&archive_name);
-
- let index = match env.datastore.open_dynamic_reader(path) {
- Ok(index) => index,
- Err(_) => {
- env.log(format!("there is no last backup for archive '{}'", archive_name));
- return Ok(empty_response);
- }
- };
-
- env.log(format!("download last backup index for archive '{}'", archive_name));
-
- let count = index.index_count();
- for pos in 0..count {
- let info = index.chunk_info(pos)?;
- let size = info.size() as u32;
- env.register_chunk(info.digest, size)?;
- }
-
- let reader = DigestListEncoder::new(Box::new(index));
-
- let stream = WrappedReaderStream::new(reader);
-
- // fixme: set size, content type?
- let response = http::Response::builder()
- .status(200)
- .body(Body::wrap_stream(stream))?;
-
- Ok(response)
- }.boxed()
-}
+ env.log(format!("download '{}' from previous backup.", archive_name));
-#[sortable]
-pub const API_METHOD_FIXED_CHUNK_INDEX: ApiMethod = ApiMethod::new(
- &ApiHandler::AsyncHttp(&fixed_chunk_index),
- &ObjectSchema::new(
- r###"
-Download the fixed chunk index from the previous backup.
-Simply returns an empty list if this is the first backup.
-"### ,
- &sorted!([
- ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
- ]),
- )
-);
-
-fn fixed_chunk_index(
- _parts: Parts,
- _req_body: Body,
- param: Value,
- _info: &ApiMethod,
- rpcenv: Box<dyn RpcEnvironment>,
-) -> ApiResponseFuture {
-
- async move {
- let env: &BackupEnvironment = rpcenv.as_ref();
-
- let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
-
- if !archive_name.ends_with(".fidx") {
- bail!("wrong archive extension: '{}'", archive_name);
- }
-
- let empty_response = {
- Response::builder()
- .status(StatusCode::OK)
- .body(Body::empty())?
- };
-
- let last_backup = match &env.last_backup {
- Some(info) => info,
- None => return Ok(empty_response),
- };
-
- let mut path = last_backup.backup_dir.relative_path();
+ let mut path = env.datastore.snapshot_path(&last_backup.backup_dir);
path.push(&archive_name);
- let index = match env.datastore.open_fixed_reader(path) {
- Ok(index) => index,
- Err(_) => {
- env.log(format!("there is no last backup for archive '{}'", archive_name));
- return Ok(empty_response);
- }
- };
-
- env.log(format!("download last backup index for archive '{}'", archive_name));
-
- let count = index.index_count();
- let image_size = index.index_bytes();
- for pos in 0..count {
- let digest = index.index_digest(pos).unwrap();
- // Note: last chunk can be smaller
- let start = (pos*index.chunk_size) as u64;
- let mut end = start + index.chunk_size as u64;
- if end > image_size { end = image_size; }
- let size = (end - start) as u32;
- env.register_chunk(*digest, size)?;
- }
-
- let reader = DigestListEncoder::new(Box::new(index));
-
- let stream = WrappedReaderStream::new(reader);
-
- // fixme: set size, content type?
- let response = http::Response::builder()
- .status(200)
- .body(Body::wrap_stream(stream))?;
-
- Ok(response)
+ crate::api2::helpers::create_download_response(path).await
}.boxed()
}
use std::collections::HashMap;
use std::ops::Range;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-use bytes::{Bytes, BytesMut};
-use anyhow::{format_err, Error};
-use futures::*;
pub struct ChunkReadInfo {
pub range: Range<u64>,
map
}
}
-
-/// Encode digest list from an `IndexFile` into a binary stream
-///
-/// The reader simply returns a birary stream of 32 byte digest values.
-pub struct DigestListEncoder {
- index: Box<dyn IndexFile + Send + Sync>,
- pos: usize,
- count: usize,
-}
-
-impl DigestListEncoder {
-
- pub fn new(index: Box<dyn IndexFile + Send + Sync>) -> Self {
- let count = index.index_count();
- Self { index, pos: 0, count }
- }
-}
-
-impl std::io::Read for DigestListEncoder {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
- if buf.len() < 32 {
- panic!("read buffer too small");
- }
-
- if self.pos < self.count {
- let mut written = 0;
- loop {
- let digest = self.index.index_digest(self.pos).unwrap();
- buf[written..(written + 32)].copy_from_slice(digest);
- self.pos += 1;
- written += 32;
- if self.pos >= self.count {
- break;
- }
- if (written + 32) >= buf.len() {
- break;
- }
- }
- Ok(written)
- } else {
- Ok(0)
- }
- }
-}
-
-/// Decodes a Stream<Item=Bytes> into Stream<Item=<[u8;32]>
-///
-/// The reader simply returns a birary stream of 32 byte digest values.
-
-pub struct DigestListDecoder<S: Unpin> {
- input: S,
- buffer: BytesMut,
-}
-
-impl<S: Unpin> DigestListDecoder<S> {
- pub fn new(input: S) -> Self {
- Self { input, buffer: BytesMut::new() }
- }
-}
-
-impl<S: Unpin> Unpin for DigestListDecoder<S> {}
-
-impl<S: Unpin, E> Stream for DigestListDecoder<S>
-where
- S: Stream<Item=Result<Bytes, E>>,
- E: Into<Error>,
-{
- type Item = Result<[u8; 32], Error>;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
- let this = self.get_mut();
-
- loop {
- if this.buffer.len() >= 32 {
- let left = this.buffer.split_to(32);
-
- let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit();
- unsafe {
- (*digest.as_mut_ptr()).copy_from_slice(&left[..]);
- return Poll::Ready(Some(Ok(digest.assume_init())));
- }
- }
-
- match Pin::new(&mut this.input).poll_next(cx) {
- Poll::Pending => {
- return Poll::Pending;
- }
- Poll::Ready(Some(Err(err))) => {
- return Poll::Ready(Some(Err(err.into())));
- }
- Poll::Ready(Some(Ok(data))) => {
- this.buffer.extend_from_slice(&data);
- // continue
- }
- Poll::Ready(None) => {
- let rest = this.buffer.len();
- if rest == 0 {
- return Poll::Ready(None);
- }
- return Poll::Ready(Some(Err(format_err!(
- "got small digest ({} != 32).",
- rest,
- ))));
- }
- }
- }
- }
-}
Ok((group.backup_type().to_owned(), group.backup_id().to_owned(), backup_time))
}
-
async fn backup_directory<P: AsRef<Path>>(
client: &BackupWriter,
+ previous_manifest: Option<Arc<BackupManifest>>,
dir_path: P,
archive_name: &str,
chunk_size: Option<usize>,
device_set: Option<HashSet<u64>>,
verbose: bool,
skip_lost_and_found: bool,
- crypt_config: Option<Arc<CryptConfig>>,
catalog: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
exclude_pattern: Vec<MatchEntry>,
entries_max: usize,
});
let stats = client
- .upload_stream(archive_name, stream, "dynamic", None, crypt_config)
+ .upload_stream(previous_manifest, archive_name, stream, "dynamic", None)
.await?;
Ok(stats)
async fn backup_image<P: AsRef<Path>>(
client: &BackupWriter,
+ previous_manifest: Option<Arc<BackupManifest>>,
image_path: P,
archive_name: &str,
image_size: u64,
chunk_size: Option<usize>,
_verbose: bool,
- crypt_config: Option<Arc<CryptConfig>>,
) -> Result<BackupStats, Error> {
let path = image_path.as_ref().to_owned();
let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024));
let stats = client
- .upload_stream(archive_name, stream, "fixed", Some(image_size), crypt_config)
+ .upload_stream(previous_manifest, archive_name, stream, "fixed", Some(image_size))
.await?;
Ok(stats)
}
fn spawn_catalog_upload(
- client: Arc<BackupWriter>,
- crypt_config: Option<Arc<CryptConfig>>,
+ client: Arc<BackupWriter>
) -> Result<
(
Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
tokio::spawn(async move {
let catalog_upload_result = client
- .upload_stream(CATALOG_NAME, catalog_chunk_stream, "dynamic", None, crypt_config)
+ .upload_stream(None, CATALOG_NAME, catalog_chunk_stream, "dynamic", None)
.await;
if let Err(ref err) = catalog_upload_result {
let client = BackupWriter::start(
client,
+ crypt_config.clone(),
repo.store(),
backup_type,
&backup_id,
verbose,
).await?;
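+ // Fetch the manifest of the previous snapshot in this group, if any, so
+ // its chunks can be reused below; a missing manifest is not an error.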
+ let previous_manifest = if let Ok(previous_manifest) = client.download_previous_manifest().await {
+ Some(Arc::new(previous_manifest))
+ } else {
+ None
+ };
+
let snapshot = BackupDir::new(backup_type, backup_id, backup_time.timestamp());
let mut manifest = BackupManifest::new(snapshot);
BackupSpecificationType::CONFIG => {
println!("Upload config file '{}' to '{:?}' as {}", filename, repo, target);
let stats = client
- .upload_blob_from_file(&filename, &target, crypt_config.clone(), true)
+ .upload_blob_from_file(&filename, &target, true, Some(true))
.await?;
manifest.add_file(target, stats.size, stats.csum, is_encrypted)?;
}
BackupSpecificationType::LOGFILE => { // fixme: remove - not needed anymore ?
println!("Upload log file '{}' to '{:?}' as {}", filename, repo, target);
let stats = client
- .upload_blob_from_file(&filename, &target, crypt_config.clone(), true)
+ .upload_blob_from_file(&filename, &target, true, Some(true))
.await?;
manifest.add_file(target, stats.size, stats.csum, is_encrypted)?;
}
BackupSpecificationType::PXAR => {
// start catalog upload on first use
if catalog.is_none() {
- let (cat, res) = spawn_catalog_upload(client.clone(), crypt_config.clone())?;
+ let (cat, res) = spawn_catalog_upload(client.clone())?;
catalog = Some(cat);
catalog_result_tx = Some(res);
}
catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
let stats = backup_directory(
&client,
+ previous_manifest.clone(),
&filename,
&target,
chunk_size_opt,
devices.clone(),
verbose,
skip_lost_and_found,
- crypt_config.clone(),
catalog.clone(),
pattern_list.clone(),
entries_max as usize,
println!("Upload image '{}' to '{:?}' as {}", filename, repo, target);
let stats = backup_image(
&client,
- &filename,
+ previous_manifest.clone(),
+ &filename,
&target,
size,
chunk_size_opt,
verbose,
- crypt_config.clone(),
).await?;
manifest.add_file(target, stats.size, stats.csum, is_encrypted)?;
}
let target = "rsa-encrypted.key";
println!("Upload RSA encoded key to '{:?}' as {}", repo, target);
let stats = client
- .upload_blob_from_data(rsa_encrypted_key, target, None, false, false)
+ .upload_blob_from_data(rsa_encrypted_key, target, false, None)
.await?;
manifest.add_file(format!("{}.blob", target), stats.size, stats.csum, is_encrypted)?;
println!("Upload index.json to '{:?}'", repo);
let manifest = serde_json::to_string_pretty(&manifest)?.into();
client
- .upload_blob_from_data(manifest, MANIFEST_BLOB_NAME, crypt_config.clone(), true, true)
+ .upload_blob_from_data(manifest, MANIFEST_BLOB_NAME, true, Some(true))
.await?;
client.finish().await?;
use std::collections::HashSet;
+use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
h2: H2Client,
abort: AbortHandle,
verbose: bool,
+ crypt_config: Option<Arc<CryptConfig>>,
}
impl Drop for BackupWriter {
impl BackupWriter {
- fn new(h2: H2Client, abort: AbortHandle, verbose: bool) -> Arc<Self> {
- Arc::new(Self { h2, abort, verbose })
+ fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
+ Arc::new(Self { h2, abort, crypt_config, verbose })
}
pub async fn start(
client: HttpClient,
+ crypt_config: Option<Arc<CryptConfig>>,
datastore: &str,
backup_type: &str,
backup_id: &str,
let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
- Ok(BackupWriter::new(h2, abort, debug))
+ Ok(BackupWriter::new(h2, abort, crypt_config, debug))
}
pub async fn get(
&self,
data: Vec<u8>,
file_name: &str,
- crypt_config: Option<Arc<CryptConfig>>,
compress: bool,
- sign_only: bool,
+ crypt_or_sign: Option<bool>,
) -> Result<BackupStats, Error> {
- let blob = if let Some(ref crypt_config) = crypt_config {
- if sign_only {
- DataBlob::create_signed(&data, crypt_config, compress)?
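+ // Semantics of `crypt_or_sign` when a crypt_config is set:
+ // Some(true) => encrypt, Some(false) => sign only, None => store plain.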
+ let blob = if let Some(ref crypt_config) = self.crypt_config {
+ if let Some(encrypt) = crypt_or_sign {
+ if encrypt {
+ DataBlob::encode(&data, Some(crypt_config), compress)?
+ } else {
+ DataBlob::create_signed(&data, crypt_config, compress)?
+ }
} else {
- DataBlob::encode(&data, Some(crypt_config), compress)?
+ DataBlob::encode(&data, None, compress)?
}
} else {
DataBlob::encode(&data, None, compress)?
&self,
src_path: P,
file_name: &str,
- crypt_config: Option<Arc<CryptConfig>>,
compress: bool,
+ crypt_or_sign: Option<bool>,
) -> Result<BackupStats, Error> {
let src_path = src_path.as_ref();
.await
.map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;
- let blob = DataBlob::encode(&contents, crypt_config.as_ref().map(AsRef::as_ref), compress)?;
- let raw_data = blob.into_inner();
- let size = raw_data.len() as u64;
- let csum = openssl::sha::sha256(&raw_data);
- let param = json!({
- "encoded-size": size,
- "file-name": file_name,
- });
- self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
- Ok(BackupStats { size, csum })
+ self.upload_blob_from_data(contents, file_name, compress, crypt_or_sign).await
}
pub async fn upload_stream(
&self,
+ previous_manifest: Option<Arc<BackupManifest>>,
archive_name: &str,
stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
prefix: &str,
fixed_size: Option<u64>,
- crypt_config: Option<Arc<CryptConfig>>,
) -> Result<BackupStats, Error> {
let known_chunks = Arc::new(Mutex::new(HashSet::new()));
let index_path = format!("{}_index", prefix);
let close_path = format!("{}_close", prefix);
- self.download_chunk_list(&index_path, archive_name, known_chunks.clone()).await?;
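+ // Seed `known_chunks` from the previous snapshot's index so that
+ // unchanged chunks do not have to be uploaded again.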
+ if let Some(manifest) = previous_manifest {
+ // best effort: ignore errors (e.g. no previous backup of this archive)
+ match archive_type(archive_name) {
+ Ok(ArchiveType::FixedIndex) => {
+ let _ = self.download_previous_fixed_index(archive_name, &manifest, known_chunks.clone()).await;
+ }
+ Ok(ArchiveType::DynamicIndex) => {
+ let _ = self.download_previous_dynamic_index(archive_name, &manifest, known_chunks.clone()).await;
+ }
+ _ => { /* do nothing */ }
+ }
+ }
let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
stream,
&prefix,
known_chunks.clone(),
- crypt_config,
+ self.crypt_config.clone(),
self.verbose,
)
.await?;
(verify_queue_tx, verify_result_rx)
}
- pub async fn download_chunk_list(
+ pub async fn download_previous_fixed_index(
&self,
- path: &str,
archive_name: &str,
+ manifest: &BackupManifest,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- ) -> Result<(), Error> {
+ ) -> Result<FixedIndexReader, Error> {
- let param = json!({ "archive-name": archive_name });
- let request = H2Client::request_builder("localhost", "GET", path, Some(param), None).unwrap();
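+ // O_TMPFILE creates an anonymous file in /tmp which is removed
+ // automatically when the handle is dropped.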
+ let mut tmpfile = std::fs::OpenOptions::new()
+ .write(true)
+ .read(true)
+ .custom_flags(libc::O_TMPFILE)
+ .open("/tmp")?;
- let h2request = self.h2.send_request(request, None).await?;
- let resp = h2request.await?;
-
- let status = resp.status();
+ let param = json!({ "archive-name": archive_name });
+ self.h2.download("previous", Some(param), &mut tmpfile).await?;
+
+ let index = FixedIndexReader::new(tmpfile)
+ .map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
+ // Note: do not use values stored in the index (not trusted) - instead, compute them again
+ let (csum, size) = index.compute_csum();
+ manifest.verify_file(archive_name, &csum, size)?;
+
+ // add index chunks to known chunks
+ let mut known_chunks = known_chunks.lock().unwrap();
+ for i in 0..index.index_count() {
+ known_chunks.insert(*index.index_digest(i).unwrap());
+ }
- if !status.is_success() {
- H2Client::h2api_response(resp).await?; // raise error
- unreachable!();
+ if self.verbose {
+ println!("{}: known chunks list length is {}", archive_name, index.index_count());
}
- let mut body = resp.into_body();
- let mut flow_control = body.flow_control().clone();
+ Ok(index)
+ }
+
+ pub async fn download_previous_dynamic_index(
+ &self,
+ archive_name: &str,
+ manifest: &BackupManifest,
+ known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+ ) -> Result<DynamicIndexReader, Error> {
- let mut stream = DigestListDecoder::new(body.map_err(Error::from));
+ let mut tmpfile = std::fs::OpenOptions::new()
+ .write(true)
+ .read(true)
+ .custom_flags(libc::O_TMPFILE)
+ .open("/tmp")?;
- while let Some(chunk) = stream.try_next().await? {
- let _ = flow_control.release_capacity(chunk.len());
- known_chunks.lock().unwrap().insert(chunk);
+ let param = json!({ "archive-name": archive_name });
+ self.h2.download("previous", Some(param), &mut tmpfile).await?;
+
+ let index = DynamicIndexReader::new(tmpfile)
+ .map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
+ // Note: do not use values stored in the index (not trusted) - instead, compute them again
+ let (csum, size) = index.compute_csum();
+ manifest.verify_file(archive_name, &csum, size)?;
+
+ // add index chunks to known chunks
+ let mut known_chunks = known_chunks.lock().unwrap();
+ for i in 0..index.index_count() {
+ known_chunks.insert(*index.index_digest(i).unwrap());
}
if self.verbose {
- println!("{}: known chunks list length is {}", archive_name, known_chunks.lock().unwrap().len());
+ println!("{}: known chunks list length is {}", archive_name, index.index_count());
}
- Ok(())
+ Ok(index)
+ }
+
+ /// Download backup manifest (index.json) of last backup
+ pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
+
+ use std::convert::TryFrom;
+
+ let mut raw_data = Vec::with_capacity(64 * 1024);
+
+ let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
+ self.h2.download("previous", Some(param), &mut raw_data).await?;
+
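+ // Verify the CRC and decode the blob, decrypting when a crypt config is present.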
+ let blob = DataBlob::from_raw(raw_data)?;
+ blob.verify_crc()?;
+ let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
+ let json: Value = serde_json::from_slice(&data[..])?;
+ let manifest = BackupManifest::try_from(json)?;
+
+ Ok(manifest)
}
fn upload_chunk_info_stream(