]>
Commit | Line | Data |
---|---|---|
f4bf7dfc | 1 | use std::collections::HashMap; |
bdfa6370 | 2 | use std::future::Future; |
4d16badf | 3 | use std::pin::Pin; |
e9764238 | 4 | use std::sync::{Arc, Mutex}; |
3c0facc7 | 5 | |
14f6c9cb | 6 | use anyhow::{bail, Error}; |
7f99bf69 | 7 | |
9a1b24b6 DM |
8 | use proxmox_async::runtime::block_on; |
9 | ||
bbdda58b | 10 | use pbs_api_types::CryptMode; |
be3a0295 | 11 | use pbs_datastore::data_blob::DataBlob; |
72fbe9ff | 12 | use pbs_datastore::read_chunk::AsyncReadChunk; |
bdfa6370 TL |
13 | use pbs_datastore::read_chunk::ReadChunk; |
14 | use pbs_tools::crypt_config::CryptConfig; | |
7f99bf69 | 15 | |
be3a0295 | 16 | use super::BackupReader; |
be3a0295 | 17 | |
7f99bf69 | 18 | /// Read chunks from remote host using ``BackupReader`` |
7443a6e0 | 19 | #[derive(Clone)] |
7f99bf69 DM |
20 | pub struct RemoteChunkReader { |
21 | client: Arc<BackupReader>, | |
22 | crypt_config: Option<Arc<CryptConfig>>, | |
14f6c9cb | 23 | crypt_mode: CryptMode, |
7ecfde81 | 24 | cache_hint: Arc<HashMap<[u8; 32], usize>>, |
7443a6e0 | 25 | cache: Arc<Mutex<HashMap<[u8; 32], Vec<u8>>>>, |
7f99bf69 DM |
26 | } |
27 | ||
28 | impl RemoteChunkReader { | |
f4bf7dfc DM |
29 | /// Create a new instance. |
30 | /// | |
31 | /// Chunks listed in ``cache_hint`` are cached and kept in RAM. | |
32 | pub fn new( | |
33 | client: Arc<BackupReader>, | |
34 | crypt_config: Option<Arc<CryptConfig>>, | |
14f6c9cb | 35 | crypt_mode: CryptMode, |
f4bf7dfc DM |
36 | cache_hint: HashMap<[u8; 32], usize>, |
37 | ) -> Self { | |
a609cf21 WB |
38 | Self { |
39 | client, | |
40 | crypt_config, | |
14f6c9cb | 41 | crypt_mode, |
7ecfde81 | 42 | cache_hint: Arc::new(cache_hint), |
7443a6e0 | 43 | cache: Arc::new(Mutex::new(HashMap::new())), |
a609cf21 | 44 | } |
7f99bf69 | 45 | } |
4d16badf | 46 | |
9e496ff6 FG |
47 | /// Downloads raw chunk. This only verifies the (untrusted) CRC32, use |
48 | /// DataBlob::verify_unencrypted or DataBlob::decode before storing/processing further. | |
e9764238 | 49 | pub async fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> { |
4d16badf WB |
50 | let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024); |
51 | ||
bdfa6370 | 52 | self.client.download_chunk(digest, &mut chunk_data).await?; |
4d16badf | 53 | |
39f18b30 | 54 | let chunk = DataBlob::load_from_reader(&mut &chunk_data[..])?; |
4d16badf | 55 | |
14f6c9cb | 56 | match self.crypt_mode { |
bdfa6370 TL |
57 | CryptMode::Encrypt => match chunk.crypt_mode()? { |
58 | CryptMode::Encrypt => Ok(chunk), | |
59 | CryptMode::SignOnly | CryptMode::None => { | |
60 | bail!("Index and chunk CryptMode don't match.") | |
14f6c9cb FG |
61 | } |
62 | }, | |
bdfa6370 TL |
63 | CryptMode::SignOnly | CryptMode::None => match chunk.crypt_mode()? { |
64 | CryptMode::Encrypt => bail!("Index and chunk CryptMode don't match."), | |
65 | CryptMode::SignOnly | CryptMode::None => Ok(chunk), | |
14f6c9cb FG |
66 | }, |
67 | } | |
4d16badf | 68 | } |
7f99bf69 DM |
69 | } |
70 | ||
71 | impl ReadChunk for RemoteChunkReader { | |
e9764238 | 72 | fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> { |
16021f6a | 73 | block_on(Self::read_raw_chunk(self, digest)) |
3758b398 DM |
74 | } |
75 | ||
e9764238 DM |
76 | fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> { |
77 | if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) { | |
3758b398 DM |
78 | return Ok(raw_data.to_vec()); |
79 | } | |
4ee8f53d | 80 | |
4d16badf | 81 | let chunk = ReadChunk::read_raw_chunk(self, digest)?; |
7f99bf69 | 82 | |
8819d1f2 | 83 | let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?; |
3758b398 DM |
84 | |
85 | let use_cache = self.cache_hint.contains_key(digest); | |
f4bf7dfc | 86 | if use_cache { |
e9764238 | 87 | (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec()); |
f4bf7dfc DM |
88 | } |
89 | ||
7f99bf69 DM |
90 | Ok(raw_data) |
91 | } | |
92 | } | |
4d16badf WB |
93 | |
94 | impl AsyncReadChunk for RemoteChunkReader { | |
95 | fn read_raw_chunk<'a>( | |
e9764238 | 96 | &'a self, |
4d16badf WB |
97 | digest: &'a [u8; 32], |
98 | ) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> { | |
99 | Box::pin(Self::read_raw_chunk(self, digest)) | |
100 | } | |
101 | ||
102 | fn read_chunk<'a>( | |
e9764238 | 103 | &'a self, |
4d16badf WB |
104 | digest: &'a [u8; 32], |
105 | ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> { | |
106 | Box::pin(async move { | |
e9764238 | 107 | if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) { |
4d16badf WB |
108 | return Ok(raw_data.to_vec()); |
109 | } | |
110 | ||
111 | let chunk = Self::read_raw_chunk(self, digest).await?; | |
112 | ||
bdfa6370 TL |
113 | let raw_data = |
114 | chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?; | |
4d16badf WB |
115 | |
116 | let use_cache = self.cache_hint.contains_key(digest); | |
117 | if use_cache { | |
e9764238 | 118 | (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec()); |
4d16badf WB |
119 | } |
120 | ||
121 | Ok(raw_data) | |
122 | }) | |
123 | } | |
124 | } |