]>
Commit | Line | Data |
---|---|---|
4d16badf | 1 | use std::future::Future; |
f4bf7dfc | 2 | use std::collections::HashMap; |
4d16badf | 3 | use std::pin::Pin; |
3c0facc7 WB |
4 | use std::sync::Arc; |
5 | ||
a609cf21 | 6 | use anyhow::Error; |
7f99bf69 DM |
7 | |
8 | use super::BackupReader; | |
4d16badf | 9 | use crate::backup::{AsyncReadChunk, CryptConfig, DataBlob, ReadChunk}; |
d973aa82 | 10 | use crate::tools::runtime::block_on; |
7f99bf69 DM |
11 | |
12 | /// Read chunks from remote host using ``BackupReader`` | |
13 | pub struct RemoteChunkReader { | |
14 | client: Arc<BackupReader>, | |
15 | crypt_config: Option<Arc<CryptConfig>>, | |
f4bf7dfc | 16 | cache_hint: HashMap<[u8; 32], usize>, |
a609cf21 | 17 | cache: HashMap<[u8; 32], Vec<u8>>, |
7f99bf69 DM |
18 | } |
19 | ||
20 | impl RemoteChunkReader { | |
f4bf7dfc DM |
21 | /// Create a new instance. |
22 | /// | |
23 | /// Chunks listed in ``cache_hint`` are cached and kept in RAM. | |
24 | pub fn new( | |
25 | client: Arc<BackupReader>, | |
26 | crypt_config: Option<Arc<CryptConfig>>, | |
27 | cache_hint: HashMap<[u8; 32], usize>, | |
28 | ) -> Self { | |
a609cf21 WB |
29 | Self { |
30 | client, | |
31 | crypt_config, | |
32 | cache_hint, | |
33 | cache: HashMap::new(), | |
34 | } | |
7f99bf69 | 35 | } |
4d16badf WB |
36 | |
37 | pub async fn read_raw_chunk(&mut self, digest: &[u8; 32]) -> Result<DataBlob, Error> { | |
38 | let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024); | |
39 | ||
40 | self.client | |
41 | .download_chunk(&digest, &mut chunk_data) | |
42 | .await?; | |
43 | ||
44 | let chunk = DataBlob::from_raw(chunk_data)?; | |
45 | chunk.verify_crc()?; | |
46 | ||
47 | Ok(chunk) | |
48 | } | |
7f99bf69 DM |
49 | } |
50 | ||
51 | impl ReadChunk for RemoteChunkReader { | |
a609cf21 WB |
52 | fn read_raw_chunk(&mut self, digest: &[u8; 32]) -> Result<DataBlob, Error> { |
53 | let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024); | |
7f99bf69 | 54 | |
d973aa82 WB |
55 | //tokio::task::block_in_place(|| futures::executor::block_on(self.client.download_chunk(&digest, &mut chunk_data)))?; |
56 | block_on(async { | |
57 | // download_chunk returns the writer back to us, but we need to return a 'static value | |
58 | self.client | |
59 | .download_chunk(&digest, &mut chunk_data) | |
60 | .await | |
61 | .map(drop) | |
62 | })?; | |
7f99bf69 | 63 | |
4ee8f53d | 64 | let chunk = DataBlob::from_raw(chunk_data)?; |
7f99bf69 DM |
65 | chunk.verify_crc()?; |
66 | ||
3758b398 DM |
67 | Ok(chunk) |
68 | } | |
69 | ||
a609cf21 | 70 | fn read_chunk(&mut self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> { |
3758b398 DM |
71 | if let Some(raw_data) = self.cache.get(digest) { |
72 | return Ok(raw_data.to_vec()); | |
73 | } | |
4ee8f53d | 74 | |
4d16badf | 75 | let chunk = ReadChunk::read_raw_chunk(self, digest)?; |
7f99bf69 | 76 | |
a609cf21 | 77 | let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref))?; |
3758b398 DM |
78 | |
79 | // fixme: verify digest? | |
80 | ||
81 | let use_cache = self.cache_hint.contains_key(digest); | |
f4bf7dfc DM |
82 | if use_cache { |
83 | self.cache.insert(*digest, raw_data.to_vec()); | |
84 | } | |
85 | ||
7f99bf69 DM |
86 | Ok(raw_data) |
87 | } | |
88 | } | |
4d16badf WB |
89 | |
90 | impl AsyncReadChunk for RemoteChunkReader { | |
91 | fn read_raw_chunk<'a>( | |
92 | &'a mut self, | |
93 | digest: &'a [u8; 32], | |
94 | ) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> { | |
95 | Box::pin(Self::read_raw_chunk(self, digest)) | |
96 | } | |
97 | ||
98 | fn read_chunk<'a>( | |
99 | &'a mut self, | |
100 | digest: &'a [u8; 32], | |
101 | ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> { | |
102 | Box::pin(async move { | |
103 | if let Some(raw_data) = self.cache.get(digest) { | |
104 | return Ok(raw_data.to_vec()); | |
105 | } | |
106 | ||
107 | let chunk = Self::read_raw_chunk(self, digest).await?; | |
108 | ||
109 | let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref))?; | |
110 | ||
111 | // fixme: verify digest? | |
112 | ||
113 | let use_cache = self.cache_hint.contains_key(digest); | |
114 | if use_cache { | |
115 | self.cache.insert(*digest, raw_data.to_vec()); | |
116 | } | |
117 | ||
118 | Ok(raw_data) | |
119 | }) | |
120 | } | |
121 | } |