git.proxmox.com Git - proxmox-backup.git/blob - pbs-client/src/remote_chunk_reader.rs
tree-wide: fix needless borrows
[proxmox-backup.git] / pbs-client / src / remote_chunk_reader.rs
1 use std::future::Future;
2 use std::collections::HashMap;
3 use std::pin::Pin;
4 use std::sync::{Arc, Mutex};
5
6 use anyhow::{bail, Error};
7
8 use proxmox_async::runtime::block_on;
9
10 use pbs_tools::crypt_config::CryptConfig;
11 use pbs_api_types::CryptMode;
12 use pbs_datastore::data_blob::DataBlob;
13 use pbs_datastore::read_chunk::ReadChunk;
14 use pbs_datastore::read_chunk::AsyncReadChunk;
15
16 use super::BackupReader;
17
18 /// Read chunks from remote host using ``BackupReader``
19 #[derive(Clone)]
20 pub struct RemoteChunkReader {
21 client: Arc<BackupReader>,
22 crypt_config: Option<Arc<CryptConfig>>,
23 crypt_mode: CryptMode,
24 cache_hint: Arc<HashMap<[u8; 32], usize>>,
25 cache: Arc<Mutex<HashMap<[u8; 32], Vec<u8>>>>,
26 }
27
28 impl RemoteChunkReader {
29 /// Create a new instance.
30 ///
31 /// Chunks listed in ``cache_hint`` are cached and kept in RAM.
32 pub fn new(
33 client: Arc<BackupReader>,
34 crypt_config: Option<Arc<CryptConfig>>,
35 crypt_mode: CryptMode,
36 cache_hint: HashMap<[u8; 32], usize>,
37 ) -> Self {
38 Self {
39 client,
40 crypt_config,
41 crypt_mode,
42 cache_hint: Arc::new(cache_hint),
43 cache: Arc::new(Mutex::new(HashMap::new())),
44 }
45 }
46
47 /// Downloads raw chunk. This only verifies the (untrusted) CRC32, use
48 /// DataBlob::verify_unencrypted or DataBlob::decode before storing/processing further.
49 pub async fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
50 let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024);
51
52 self.client
53 .download_chunk(digest, &mut chunk_data)
54 .await?;
55
56 let chunk = DataBlob::load_from_reader(&mut &chunk_data[..])?;
57
58 match self.crypt_mode {
59 CryptMode::Encrypt => {
60 match chunk.crypt_mode()? {
61 CryptMode::Encrypt => Ok(chunk),
62 CryptMode::SignOnly | CryptMode::None => bail!("Index and chunk CryptMode don't match."),
63 }
64 },
65 CryptMode::SignOnly | CryptMode::None => {
66 match chunk.crypt_mode()? {
67 CryptMode::Encrypt => bail!("Index and chunk CryptMode don't match."),
68 CryptMode::SignOnly | CryptMode::None => Ok(chunk),
69 }
70 },
71 }
72 }
73 }
74
75 impl ReadChunk for RemoteChunkReader {
76 fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
77 block_on(Self::read_raw_chunk(self, digest))
78 }
79
80 fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
81 if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
82 return Ok(raw_data.to_vec());
83 }
84
85 let chunk = ReadChunk::read_raw_chunk(self, digest)?;
86
87 let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
88
89 let use_cache = self.cache_hint.contains_key(digest);
90 if use_cache {
91 (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
92 }
93
94 Ok(raw_data)
95 }
96 }
97
98 impl AsyncReadChunk for RemoteChunkReader {
99 fn read_raw_chunk<'a>(
100 &'a self,
101 digest: &'a [u8; 32],
102 ) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
103 Box::pin(Self::read_raw_chunk(self, digest))
104 }
105
106 fn read_chunk<'a>(
107 &'a self,
108 digest: &'a [u8; 32],
109 ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> {
110 Box::pin(async move {
111 if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
112 return Ok(raw_data.to_vec());
113 }
114
115 let chunk = Self::read_raw_chunk(self, digest).await?;
116
117 let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
118
119 let use_cache = self.cache_hint.contains_key(digest);
120 if use_cache {
121 (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
122 }
123
124 Ok(raw_data)
125 })
126 }
127 }