]> git.proxmox.com Git - proxmox-backup.git/blob - src/client/remote_chunk_reader.rs
blobs: attempt to verify on decode when possible
[proxmox-backup.git] / src / client / remote_chunk_reader.rs
1 use std::future::Future;
2 use std::collections::HashMap;
3 use std::pin::Pin;
4 use std::sync::{Arc, Mutex};
5
6 use anyhow::Error;
7
8 use super::BackupReader;
9 use crate::backup::{AsyncReadChunk, CryptConfig, DataBlob, ReadChunk};
10 use crate::tools::runtime::block_on;
11
12 /// Read chunks from remote host using ``BackupReader``
13 #[derive(Clone)]
14 pub struct RemoteChunkReader {
15 client: Arc<BackupReader>,
16 crypt_config: Option<Arc<CryptConfig>>,
17 cache_hint: HashMap<[u8; 32], usize>,
18 cache: Arc<Mutex<HashMap<[u8; 32], Vec<u8>>>>,
19 }
20
21 impl RemoteChunkReader {
22 /// Create a new instance.
23 ///
24 /// Chunks listed in ``cache_hint`` are cached and kept in RAM.
25 pub fn new(
26 client: Arc<BackupReader>,
27 crypt_config: Option<Arc<CryptConfig>>,
28 cache_hint: HashMap<[u8; 32], usize>,
29 ) -> Self {
30 Self {
31 client,
32 crypt_config,
33 cache_hint,
34 cache: Arc::new(Mutex::new(HashMap::new())),
35 }
36 }
37
38 pub async fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
39 let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024);
40
41 self.client
42 .download_chunk(&digest, &mut chunk_data)
43 .await?;
44
45 let chunk = DataBlob::load_from_reader(&mut &chunk_data[..])?;
46
47 // fixme: verify digest?
48
49 Ok(chunk)
50 }
51 }
52
53 impl ReadChunk for RemoteChunkReader {
54 fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
55 block_on(Self::read_raw_chunk(self, digest))
56 }
57
58 fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
59 if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
60 return Ok(raw_data.to_vec());
61 }
62
63 let chunk = ReadChunk::read_raw_chunk(self, digest)?;
64
65 let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
66
67 let use_cache = self.cache_hint.contains_key(digest);
68 if use_cache {
69 (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
70 }
71
72 Ok(raw_data)
73 }
74 }
75
76 impl AsyncReadChunk for RemoteChunkReader {
77 fn read_raw_chunk<'a>(
78 &'a self,
79 digest: &'a [u8; 32],
80 ) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
81 Box::pin(Self::read_raw_chunk(self, digest))
82 }
83
84 fn read_chunk<'a>(
85 &'a self,
86 digest: &'a [u8; 32],
87 ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> {
88 Box::pin(async move {
89 if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
90 return Ok(raw_data.to_vec());
91 }
92
93 let chunk = Self::read_raw_chunk(self, digest).await?;
94
95 let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
96
97 let use_cache = self.cache_hint.contains_key(digest);
98 if use_cache {
99 (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
100 }
101
102 Ok(raw_data)
103 })
104 }
105 }