]>
Commit | Line | Data |
---|---|---|
4d16badf | 1 | use std::future::Future; |
f4bf7dfc | 2 | use std::collections::HashMap; |
4d16badf | 3 | use std::pin::Pin; |
e9764238 | 4 | use std::sync::{Arc, Mutex}; |
3c0facc7 | 5 | |
14f6c9cb | 6 | use anyhow::{bail, Error}; |
7f99bf69 DM |
7 | |
8 | use super::BackupReader; | |
14f6c9cb | 9 | use crate::backup::{AsyncReadChunk, CryptConfig, CryptMode, DataBlob, ReadChunk}; |
d973aa82 | 10 | use crate::tools::runtime::block_on; |
7f99bf69 DM |
11 | |
12 | /// Read chunks from remote host using ``BackupReader`` | |
7443a6e0 | 13 | #[derive(Clone)] |
7f99bf69 DM |
14 | pub struct RemoteChunkReader { |
15 | client: Arc<BackupReader>, | |
16 | crypt_config: Option<Arc<CryptConfig>>, | |
14f6c9cb | 17 | crypt_mode: CryptMode, |
7ecfde81 | 18 | cache_hint: Arc<HashMap<[u8; 32], usize>>, |
7443a6e0 | 19 | cache: Arc<Mutex<HashMap<[u8; 32], Vec<u8>>>>, |
7f99bf69 DM |
20 | } |
21 | ||
22 | impl RemoteChunkReader { | |
f4bf7dfc DM |
23 | /// Create a new instance. |
24 | /// | |
25 | /// Chunks listed in ``cache_hint`` are cached and kept in RAM. | |
26 | pub fn new( | |
27 | client: Arc<BackupReader>, | |
28 | crypt_config: Option<Arc<CryptConfig>>, | |
14f6c9cb | 29 | crypt_mode: CryptMode, |
f4bf7dfc DM |
30 | cache_hint: HashMap<[u8; 32], usize>, |
31 | ) -> Self { | |
a609cf21 WB |
32 | Self { |
33 | client, | |
34 | crypt_config, | |
14f6c9cb | 35 | crypt_mode, |
7ecfde81 | 36 | cache_hint: Arc::new(cache_hint), |
7443a6e0 | 37 | cache: Arc::new(Mutex::new(HashMap::new())), |
a609cf21 | 38 | } |
7f99bf69 | 39 | } |
4d16badf | 40 | |
9e496ff6 FG |
41 | /// Downloads raw chunk. This only verifies the (untrusted) CRC32, use |
42 | /// DataBlob::verify_unencrypted or DataBlob::decode before storing/processing further. | |
e9764238 | 43 | pub async fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> { |
4d16badf WB |
44 | let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024); |
45 | ||
46 | self.client | |
47 | .download_chunk(&digest, &mut chunk_data) | |
48 | .await?; | |
49 | ||
39f18b30 | 50 | let chunk = DataBlob::load_from_reader(&mut &chunk_data[..])?; |
4d16badf | 51 | |
14f6c9cb FG |
52 | match self.crypt_mode { |
53 | CryptMode::Encrypt => { | |
54 | match chunk.crypt_mode()? { | |
55 | CryptMode::Encrypt => Ok(chunk), | |
56 | CryptMode::SignOnly | CryptMode::None => bail!("Index and chunk CryptMode don't match."), | |
57 | } | |
58 | }, | |
59 | CryptMode::SignOnly | CryptMode::None => { | |
60 | match chunk.crypt_mode()? { | |
61 | CryptMode::Encrypt => bail!("Index and chunk CryptMode don't match."), | |
62 | CryptMode::SignOnly | CryptMode::None => Ok(chunk), | |
63 | } | |
64 | }, | |
65 | } | |
4d16badf | 66 | } |
7f99bf69 DM |
67 | } |
68 | ||
69 | impl ReadChunk for RemoteChunkReader { | |
e9764238 | 70 | fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> { |
16021f6a | 71 | block_on(Self::read_raw_chunk(self, digest)) |
3758b398 DM |
72 | } |
73 | ||
e9764238 DM |
74 | fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> { |
75 | if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) { | |
3758b398 DM |
76 | return Ok(raw_data.to_vec()); |
77 | } | |
4ee8f53d | 78 | |
4d16badf | 79 | let chunk = ReadChunk::read_raw_chunk(self, digest)?; |
7f99bf69 | 80 | |
8819d1f2 | 81 | let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?; |
3758b398 DM |
82 | |
83 | let use_cache = self.cache_hint.contains_key(digest); | |
f4bf7dfc | 84 | if use_cache { |
e9764238 | 85 | (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec()); |
f4bf7dfc DM |
86 | } |
87 | ||
7f99bf69 DM |
88 | Ok(raw_data) |
89 | } | |
90 | } | |
4d16badf WB |
91 | |
92 | impl AsyncReadChunk for RemoteChunkReader { | |
93 | fn read_raw_chunk<'a>( | |
e9764238 | 94 | &'a self, |
4d16badf WB |
95 | digest: &'a [u8; 32], |
96 | ) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> { | |
97 | Box::pin(Self::read_raw_chunk(self, digest)) | |
98 | } | |
99 | ||
100 | fn read_chunk<'a>( | |
e9764238 | 101 | &'a self, |
4d16badf WB |
102 | digest: &'a [u8; 32], |
103 | ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> { | |
104 | Box::pin(async move { | |
e9764238 | 105 | if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) { |
4d16badf WB |
106 | return Ok(raw_data.to_vec()); |
107 | } | |
108 | ||
109 | let chunk = Self::read_raw_chunk(self, digest).await?; | |
110 | ||
8819d1f2 | 111 | let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?; |
4d16badf WB |
112 | |
113 | let use_cache = self.cache_hint.contains_key(digest); | |
114 | if use_cache { | |
e9764238 | 115 | (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec()); |
4d16badf WB |
116 | } |
117 | ||
118 | Ok(raw_data) | |
119 | }) | |
120 | } | |
121 | } |