]>
Commit | Line | Data |
---|---|---|
4d16badf | 1 | use std::future::Future; |
f4bf7dfc | 2 | use std::collections::HashMap; |
4d16badf | 3 | use std::pin::Pin; |
e9764238 | 4 | use std::sync::{Arc, Mutex}; |
3c0facc7 | 5 | |
14f6c9cb | 6 | use anyhow::{bail, Error}; |
7f99bf69 | 7 | |
9a1b24b6 DM |
8 | use proxmox_async::runtime::block_on; |
9 | ||
bbdda58b DM |
10 | use pbs_tools::crypt_config::CryptConfig; |
11 | use pbs_api_types::CryptMode; | |
be3a0295 WB |
12 | use pbs_datastore::data_blob::DataBlob; |
13 | use pbs_datastore::read_chunk::ReadChunk; | |
72fbe9ff | 14 | use pbs_datastore::read_chunk::AsyncReadChunk; |
7f99bf69 | 15 | |
be3a0295 | 16 | use super::BackupReader; |
be3a0295 | 17 | |
7f99bf69 | 18 | /// Read chunks from remote host using ``BackupReader`` |
7443a6e0 | 19 | #[derive(Clone)] |
7f99bf69 DM |
20 | pub struct RemoteChunkReader { |
21 | client: Arc<BackupReader>, | |
22 | crypt_config: Option<Arc<CryptConfig>>, | |
14f6c9cb | 23 | crypt_mode: CryptMode, |
7ecfde81 | 24 | cache_hint: Arc<HashMap<[u8; 32], usize>>, |
7443a6e0 | 25 | cache: Arc<Mutex<HashMap<[u8; 32], Vec<u8>>>>, |
7f99bf69 DM |
26 | } |
27 | ||
28 | impl RemoteChunkReader { | |
f4bf7dfc DM |
29 | /// Create a new instance. |
30 | /// | |
31 | /// Chunks listed in ``cache_hint`` are cached and kept in RAM. | |
32 | pub fn new( | |
33 | client: Arc<BackupReader>, | |
34 | crypt_config: Option<Arc<CryptConfig>>, | |
14f6c9cb | 35 | crypt_mode: CryptMode, |
f4bf7dfc DM |
36 | cache_hint: HashMap<[u8; 32], usize>, |
37 | ) -> Self { | |
a609cf21 WB |
38 | Self { |
39 | client, | |
40 | crypt_config, | |
14f6c9cb | 41 | crypt_mode, |
7ecfde81 | 42 | cache_hint: Arc::new(cache_hint), |
7443a6e0 | 43 | cache: Arc::new(Mutex::new(HashMap::new())), |
a609cf21 | 44 | } |
7f99bf69 | 45 | } |
4d16badf | 46 | |
9e496ff6 FG |
47 | /// Downloads raw chunk. This only verifies the (untrusted) CRC32, use |
48 | /// DataBlob::verify_unencrypted or DataBlob::decode before storing/processing further. | |
e9764238 | 49 | pub async fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> { |
4d16badf WB |
50 | let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024); |
51 | ||
52 | self.client | |
9a37bd6c | 53 | .download_chunk(digest, &mut chunk_data) |
4d16badf WB |
54 | .await?; |
55 | ||
39f18b30 | 56 | let chunk = DataBlob::load_from_reader(&mut &chunk_data[..])?; |
4d16badf | 57 | |
14f6c9cb FG |
58 | match self.crypt_mode { |
59 | CryptMode::Encrypt => { | |
60 | match chunk.crypt_mode()? { | |
61 | CryptMode::Encrypt => Ok(chunk), | |
62 | CryptMode::SignOnly | CryptMode::None => bail!("Index and chunk CryptMode don't match."), | |
63 | } | |
64 | }, | |
65 | CryptMode::SignOnly | CryptMode::None => { | |
66 | match chunk.crypt_mode()? { | |
67 | CryptMode::Encrypt => bail!("Index and chunk CryptMode don't match."), | |
68 | CryptMode::SignOnly | CryptMode::None => Ok(chunk), | |
69 | } | |
70 | }, | |
71 | } | |
4d16badf | 72 | } |
7f99bf69 DM |
73 | } |
74 | ||
75 | impl ReadChunk for RemoteChunkReader { | |
e9764238 | 76 | fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> { |
16021f6a | 77 | block_on(Self::read_raw_chunk(self, digest)) |
3758b398 DM |
78 | } |
79 | ||
e9764238 DM |
80 | fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> { |
81 | if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) { | |
3758b398 DM |
82 | return Ok(raw_data.to_vec()); |
83 | } | |
4ee8f53d | 84 | |
4d16badf | 85 | let chunk = ReadChunk::read_raw_chunk(self, digest)?; |
7f99bf69 | 86 | |
8819d1f2 | 87 | let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?; |
3758b398 DM |
88 | |
89 | let use_cache = self.cache_hint.contains_key(digest); | |
f4bf7dfc | 90 | if use_cache { |
e9764238 | 91 | (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec()); |
f4bf7dfc DM |
92 | } |
93 | ||
7f99bf69 DM |
94 | Ok(raw_data) |
95 | } | |
96 | } | |
4d16badf WB |
97 | |
98 | impl AsyncReadChunk for RemoteChunkReader { | |
99 | fn read_raw_chunk<'a>( | |
e9764238 | 100 | &'a self, |
4d16badf WB |
101 | digest: &'a [u8; 32], |
102 | ) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> { | |
103 | Box::pin(Self::read_raw_chunk(self, digest)) | |
104 | } | |
105 | ||
106 | fn read_chunk<'a>( | |
e9764238 | 107 | &'a self, |
4d16badf WB |
108 | digest: &'a [u8; 32], |
109 | ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> { | |
110 | Box::pin(async move { | |
e9764238 | 111 | if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) { |
4d16badf WB |
112 | return Ok(raw_data.to_vec()); |
113 | } | |
114 | ||
115 | let chunk = Self::read_raw_chunk(self, digest).await?; | |
116 | ||
8819d1f2 | 117 | let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?; |
4d16badf WB |
118 | |
119 | let use_cache = self.cache_hint.contains_key(digest); | |
120 | if use_cache { | |
e9764238 | 121 | (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec()); |
4d16badf WB |
122 | } |
123 | ||
124 | Ok(raw_data) | |
125 | }) | |
126 | } | |
127 | } |