]> git.proxmox.com Git - proxmox-backup.git/blob - src/client/remote_chunk_reader.rs
remote_chunk_reader.rs: use Arc for cache_hint to make clone faster
[proxmox-backup.git] / src / client / remote_chunk_reader.rs
1 use std::future::Future;
2 use std::collections::HashMap;
3 use std::pin::Pin;
4 use std::sync::{Arc, Mutex};
5
6 use anyhow::{bail, Error};
7
8 use super::BackupReader;
9 use crate::backup::{AsyncReadChunk, CryptConfig, CryptMode, DataBlob, ReadChunk};
10 use crate::tools::runtime::block_on;
11
12 /// Read chunks from remote host using ``BackupReader``
13 #[derive(Clone)]
14 pub struct RemoteChunkReader {
15 client: Arc<BackupReader>,
16 crypt_config: Option<Arc<CryptConfig>>,
17 crypt_mode: CryptMode,
18 cache_hint: Arc<HashMap<[u8; 32], usize>>,
19 cache: Arc<Mutex<HashMap<[u8; 32], Vec<u8>>>>,
20 }
21
22 impl RemoteChunkReader {
23 /// Create a new instance.
24 ///
25 /// Chunks listed in ``cache_hint`` are cached and kept in RAM.
26 pub fn new(
27 client: Arc<BackupReader>,
28 crypt_config: Option<Arc<CryptConfig>>,
29 crypt_mode: CryptMode,
30 cache_hint: HashMap<[u8; 32], usize>,
31 ) -> Self {
32 Self {
33 client,
34 crypt_config,
35 crypt_mode,
36 cache_hint: Arc::new(cache_hint),
37 cache: Arc::new(Mutex::new(HashMap::new())),
38 }
39 }
40
41 /// Downloads raw chunk. This only verifies the (untrusted) CRC32, use
42 /// DataBlob::verify_unencrypted or DataBlob::decode before storing/processing further.
43 pub async fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
44 let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024);
45
46 self.client
47 .download_chunk(&digest, &mut chunk_data)
48 .await?;
49
50 let chunk = DataBlob::load_from_reader(&mut &chunk_data[..])?;
51
52 match self.crypt_mode {
53 CryptMode::Encrypt => {
54 match chunk.crypt_mode()? {
55 CryptMode::Encrypt => Ok(chunk),
56 CryptMode::SignOnly | CryptMode::None => bail!("Index and chunk CryptMode don't match."),
57 }
58 },
59 CryptMode::SignOnly | CryptMode::None => {
60 match chunk.crypt_mode()? {
61 CryptMode::Encrypt => bail!("Index and chunk CryptMode don't match."),
62 CryptMode::SignOnly | CryptMode::None => Ok(chunk),
63 }
64 },
65 }
66 }
67 }
68
69 impl ReadChunk for RemoteChunkReader {
70 fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
71 block_on(Self::read_raw_chunk(self, digest))
72 }
73
74 fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
75 if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
76 return Ok(raw_data.to_vec());
77 }
78
79 let chunk = ReadChunk::read_raw_chunk(self, digest)?;
80
81 let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
82
83 let use_cache = self.cache_hint.contains_key(digest);
84 if use_cache {
85 (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
86 }
87
88 Ok(raw_data)
89 }
90 }
91
92 impl AsyncReadChunk for RemoteChunkReader {
93 fn read_raw_chunk<'a>(
94 &'a self,
95 digest: &'a [u8; 32],
96 ) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
97 Box::pin(Self::read_raw_chunk(self, digest))
98 }
99
100 fn read_chunk<'a>(
101 &'a self,
102 digest: &'a [u8; 32],
103 ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> {
104 Box::pin(async move {
105 if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
106 return Ok(raw_data.to_vec());
107 }
108
109 let chunk = Self::read_raw_chunk(self, digest).await?;
110
111 let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
112
113 let use_cache = self.cache_hint.contains_key(digest);
114 if use_cache {
115 (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
116 }
117
118 Ok(raw_data)
119 })
120 }
121 }