]> git.proxmox.com Git - proxmox-backup.git/blame - pbs-client/src/remote_chunk_reader.rs
use new proxmox-async crate
[proxmox-backup.git] / pbs-client / src / remote_chunk_reader.rs
CommitLineData
4d16badf 1use std::future::Future;
f4bf7dfc 2use std::collections::HashMap;
4d16badf 3use std::pin::Pin;
e9764238 4use std::sync::{Arc, Mutex};
3c0facc7 5
14f6c9cb 6use anyhow::{bail, Error};
7f99bf69 7
9a1b24b6
DM
8use proxmox_async::runtime::block_on;
9
bbdda58b
DM
10use pbs_tools::crypt_config::CryptConfig;
11use pbs_api_types::CryptMode;
be3a0295
WB
12use pbs_datastore::data_blob::DataBlob;
13use pbs_datastore::read_chunk::ReadChunk;
72fbe9ff 14use pbs_datastore::read_chunk::AsyncReadChunk;
7f99bf69 15
be3a0295 16use super::BackupReader;
be3a0295 17
7f99bf69 18/// Read chunks from remote host using ``BackupReader``
7443a6e0 19#[derive(Clone)]
7f99bf69
DM
20pub struct RemoteChunkReader {
21 client: Arc<BackupReader>,
22 crypt_config: Option<Arc<CryptConfig>>,
14f6c9cb 23 crypt_mode: CryptMode,
7ecfde81 24 cache_hint: Arc<HashMap<[u8; 32], usize>>,
7443a6e0 25 cache: Arc<Mutex<HashMap<[u8; 32], Vec<u8>>>>,
7f99bf69
DM
26}
27
28impl RemoteChunkReader {
f4bf7dfc
DM
29 /// Create a new instance.
30 ///
31 /// Chunks listed in ``cache_hint`` are cached and kept in RAM.
32 pub fn new(
33 client: Arc<BackupReader>,
34 crypt_config: Option<Arc<CryptConfig>>,
14f6c9cb 35 crypt_mode: CryptMode,
f4bf7dfc
DM
36 cache_hint: HashMap<[u8; 32], usize>,
37 ) -> Self {
a609cf21
WB
38 Self {
39 client,
40 crypt_config,
14f6c9cb 41 crypt_mode,
7ecfde81 42 cache_hint: Arc::new(cache_hint),
7443a6e0 43 cache: Arc::new(Mutex::new(HashMap::new())),
a609cf21 44 }
7f99bf69 45 }
4d16badf 46
9e496ff6
FG
47 /// Downloads raw chunk. This only verifies the (untrusted) CRC32, use
48 /// DataBlob::verify_unencrypted or DataBlob::decode before storing/processing further.
e9764238 49 pub async fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
4d16badf
WB
50 let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024);
51
52 self.client
53 .download_chunk(&digest, &mut chunk_data)
54 .await?;
55
39f18b30 56 let chunk = DataBlob::load_from_reader(&mut &chunk_data[..])?;
4d16badf 57
14f6c9cb
FG
58 match self.crypt_mode {
59 CryptMode::Encrypt => {
60 match chunk.crypt_mode()? {
61 CryptMode::Encrypt => Ok(chunk),
62 CryptMode::SignOnly | CryptMode::None => bail!("Index and chunk CryptMode don't match."),
63 }
64 },
65 CryptMode::SignOnly | CryptMode::None => {
66 match chunk.crypt_mode()? {
67 CryptMode::Encrypt => bail!("Index and chunk CryptMode don't match."),
68 CryptMode::SignOnly | CryptMode::None => Ok(chunk),
69 }
70 },
71 }
4d16badf 72 }
7f99bf69
DM
73}
74
75impl ReadChunk for RemoteChunkReader {
e9764238 76 fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
16021f6a 77 block_on(Self::read_raw_chunk(self, digest))
3758b398
DM
78 }
79
e9764238
DM
80 fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
81 if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
3758b398
DM
82 return Ok(raw_data.to_vec());
83 }
4ee8f53d 84
4d16badf 85 let chunk = ReadChunk::read_raw_chunk(self, digest)?;
7f99bf69 86
8819d1f2 87 let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
3758b398
DM
88
89 let use_cache = self.cache_hint.contains_key(digest);
f4bf7dfc 90 if use_cache {
e9764238 91 (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
f4bf7dfc
DM
92 }
93
7f99bf69
DM
94 Ok(raw_data)
95 }
96}
4d16badf
WB
97
98impl AsyncReadChunk for RemoteChunkReader {
99 fn read_raw_chunk<'a>(
e9764238 100 &'a self,
4d16badf
WB
101 digest: &'a [u8; 32],
102 ) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
103 Box::pin(Self::read_raw_chunk(self, digest))
104 }
105
106 fn read_chunk<'a>(
e9764238 107 &'a self,
4d16badf
WB
108 digest: &'a [u8; 32],
109 ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> {
110 Box::pin(async move {
e9764238 111 if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
4d16badf
WB
112 return Ok(raw_data.to_vec());
113 }
114
115 let chunk = Self::read_raw_chunk(self, digest).await?;
116
8819d1f2 117 let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
4d16badf
WB
118
119 let use_cache = self.cache_hint.contains_key(digest);
120 if use_cache {
e9764238 121 (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
4d16badf
WB
122 }
123
124 Ok(raw_data)
125 })
126 }
127}