]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/client/remote_chunk_reader.rs
add tools::http for generic HTTP GET and move HttpsConnector there
[proxmox-backup.git] / src / client / remote_chunk_reader.rs
index be57f0f52deb7227fce753df5bf2f3ab6a521ac5..06f693a2b64b369455130a058254c1285e341365 100644 (file)
@@ -1,20 +1,22 @@
 use std::future::Future;
 use std::collections::HashMap;
 use std::pin::Pin;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
-use anyhow::Error;
+use anyhow::{bail, Error};
 
 use super::BackupReader;
-use crate::backup::{AsyncReadChunk, CryptConfig, DataBlob, ReadChunk};
+use crate::backup::{AsyncReadChunk, CryptConfig, CryptMode, DataBlob, ReadChunk};
 use crate::tools::runtime::block_on;
 
 /// Read chunks from remote host using ``BackupReader``
+#[derive(Clone)]
 pub struct RemoteChunkReader {
     client: Arc<BackupReader>,
     crypt_config: Option<Arc<CryptConfig>>,
-    cache_hint: HashMap<[u8; 32], usize>,
-    cache: HashMap<[u8; 32], Vec<u8>>,
+    crypt_mode: CryptMode,
+    cache_hint: Arc<HashMap<[u8; 32], usize>>,
+    cache: Arc<Mutex<HashMap<[u8; 32], Vec<u8>>>>,
 }
 
 impl RemoteChunkReader {
@@ -24,63 +26,63 @@ impl RemoteChunkReader {
     pub fn new(
         client: Arc<BackupReader>,
         crypt_config: Option<Arc<CryptConfig>>,
+        crypt_mode: CryptMode,
         cache_hint: HashMap<[u8; 32], usize>,
     ) -> Self {
         Self {
             client,
             crypt_config,
-            cache_hint,
-            cache: HashMap::new(),
+            crypt_mode,
+            cache_hint: Arc::new(cache_hint),
+            cache: Arc::new(Mutex::new(HashMap::new())),
         }
     }
 
-    pub async fn read_raw_chunk(&mut self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
+    /// Downloads raw chunk. This only verifies the (untrusted) CRC32, use
+    /// DataBlob::verify_unencrypted or DataBlob::decode before storing/processing further.
+    pub async fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
         let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024);
 
         self.client
             .download_chunk(&digest, &mut chunk_data)
             .await?;
 
-        let chunk = DataBlob::from_raw(chunk_data)?;
-        chunk.verify_crc()?;
-
-        Ok(chunk)
+        let chunk = DataBlob::load_from_reader(&mut &chunk_data[..])?;
+
+        match self.crypt_mode {
+            CryptMode::Encrypt => {
+                match chunk.crypt_mode()? {
+                    CryptMode::Encrypt => Ok(chunk),
+                    CryptMode::SignOnly | CryptMode::None => bail!("Index and chunk CryptMode don't match."),
+                }
+            },
+            CryptMode::SignOnly | CryptMode::None => {
+                match chunk.crypt_mode()? {
+                    CryptMode::Encrypt => bail!("Index and chunk CryptMode don't match."),
+                    CryptMode::SignOnly | CryptMode::None => Ok(chunk),
+                }
+            },
+        }
     }
 }
 
 impl ReadChunk for RemoteChunkReader {
-    fn read_raw_chunk(&mut self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
-        let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024);
-
-        //tokio::task::block_in_place(|| futures::executor::block_on(self.client.download_chunk(&digest, &mut chunk_data)))?;
-        block_on(async {
-            // download_chunk returns the writer back to us, but we need to return a 'static value
-            self.client
-                .download_chunk(&digest, &mut chunk_data)
-                .await
-                .map(drop)
-        })?;
-
-        let chunk = DataBlob::from_raw(chunk_data)?;
-        chunk.verify_crc()?;
-
-        Ok(chunk)
+    fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
+        block_on(Self::read_raw_chunk(self, digest))
     }
 
-    fn read_chunk(&mut self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
-        if let Some(raw_data) = self.cache.get(digest) {
+    fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
+        if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
             return Ok(raw_data.to_vec());
         }
 
         let chunk = ReadChunk::read_raw_chunk(self, digest)?;
 
-        let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
-
-        // fixme: verify digest?
+        let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
 
         let use_cache = self.cache_hint.contains_key(digest);
         if use_cache {
-            self.cache.insert(*digest, raw_data.to_vec());
+            (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
         }
 
         Ok(raw_data)
@@ -89,30 +91,28 @@ impl ReadChunk for RemoteChunkReader {
 
 impl AsyncReadChunk for RemoteChunkReader {
     fn read_raw_chunk<'a>(
-        &'a mut self,
+        &'a self,
         digest: &'a [u8; 32],
     ) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
         Box::pin(Self::read_raw_chunk(self, digest))
     }
 
     fn read_chunk<'a>(
-        &'a mut self,
+        &'a self,
         digest: &'a [u8; 32],
     ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> {
         Box::pin(async move {
-            if let Some(raw_data) = self.cache.get(digest) {
+            if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
                 return Ok(raw_data.to_vec());
             }
 
             let chunk = Self::read_raw_chunk(self, digest).await?;
 
-            let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
-
-            // fixme: verify digest?
+            let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
 
             let use_cache = self.cache_hint.contains_key(digest);
             if use_cache {
-                self.cache.insert(*digest, raw_data.to_vec());
+                (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
             }
 
             Ok(raw_data)