]>
git.proxmox.com Git - proxmox-backup.git/blob - pbs-client/src/remote_chunk_reader.rs
1 use std
::future
::Future
;
2 use std
::collections
::HashMap
;
4 use std
::sync
::{Arc, Mutex}
;
6 use anyhow
::{bail, Error}
;
8 use proxmox_async
::runtime
::block_on
;
10 use pbs_tools
::crypt_config
::CryptConfig
;
11 use pbs_api_types
::CryptMode
;
12 use pbs_datastore
::data_blob
::DataBlob
;
13 use pbs_datastore
::read_chunk
::ReadChunk
;
14 use pbs_datastore
::read_chunk
::AsyncReadChunk
;
16 use super::BackupReader
;
18 /// Read chunks from remote host using ``BackupReader``
20 pub struct RemoteChunkReader
{
21 client
: Arc
<BackupReader
>,
22 crypt_config
: Option
<Arc
<CryptConfig
>>,
23 crypt_mode
: CryptMode
,
24 cache_hint
: Arc
<HashMap
<[u8; 32], usize>>,
25 cache
: Arc
<Mutex
<HashMap
<[u8; 32], Vec
<u8>>>>,
28 impl RemoteChunkReader
{
29 /// Create a new instance.
31 /// Chunks listed in ``cache_hint`` are cached and kept in RAM.
33 client
: Arc
<BackupReader
>,
34 crypt_config
: Option
<Arc
<CryptConfig
>>,
35 crypt_mode
: CryptMode
,
36 cache_hint
: HashMap
<[u8; 32], usize>,
42 cache_hint
: Arc
::new(cache_hint
),
43 cache
: Arc
::new(Mutex
::new(HashMap
::new())),
47 /// Downloads raw chunk. This only verifies the (untrusted) CRC32, use
48 /// DataBlob::verify_unencrypted or DataBlob::decode before storing/processing further.
49 pub async
fn read_raw_chunk(&self, digest
: &[u8; 32]) -> Result
<DataBlob
, Error
> {
50 let mut chunk_data
= Vec
::with_capacity(4 * 1024 * 1024);
53 .download_chunk(digest
, &mut chunk_data
)
56 let chunk
= DataBlob
::load_from_reader(&mut &chunk_data
[..])?
;
58 match self.crypt_mode
{
59 CryptMode
::Encrypt
=> {
60 match chunk
.crypt_mode()?
{
61 CryptMode
::Encrypt
=> Ok(chunk
),
62 CryptMode
::SignOnly
| CryptMode
::None
=> bail
!("Index and chunk CryptMode don't match."),
65 CryptMode
::SignOnly
| CryptMode
::None
=> {
66 match chunk
.crypt_mode()?
{
67 CryptMode
::Encrypt
=> bail
!("Index and chunk CryptMode don't match."),
68 CryptMode
::SignOnly
| CryptMode
::None
=> Ok(chunk
),
75 impl ReadChunk
for RemoteChunkReader
{
76 fn read_raw_chunk(&self, digest
: &[u8; 32]) -> Result
<DataBlob
, Error
> {
77 block_on(Self::read_raw_chunk(self, digest
))
80 fn read_chunk(&self, digest
: &[u8; 32]) -> Result
<Vec
<u8>, Error
> {
81 if let Some(raw_data
) = (*self.cache
.lock().unwrap()).get(digest
) {
82 return Ok(raw_data
.to_vec());
85 let chunk
= ReadChunk
::read_raw_chunk(self, digest
)?
;
87 let raw_data
= chunk
.decode(self.crypt_config
.as_ref().map(Arc
::as_ref
), Some(digest
))?
;
89 let use_cache
= self.cache_hint
.contains_key(digest
);
91 (*self.cache
.lock().unwrap()).insert(*digest
, raw_data
.to_vec());
98 impl AsyncReadChunk
for RemoteChunkReader
{
99 fn read_raw_chunk
<'a
>(
101 digest
: &'a
[u8; 32],
102 ) -> Pin
<Box
<dyn Future
<Output
= Result
<DataBlob
, Error
>> + Send
+ 'a
>> {
103 Box
::pin(Self::read_raw_chunk(self, digest
))
108 digest
: &'a
[u8; 32],
109 ) -> Pin
<Box
<dyn Future
<Output
= Result
<Vec
<u8>, Error
>> + Send
+ 'a
>> {
110 Box
::pin(async
move {
111 if let Some(raw_data
) = (*self.cache
.lock().unwrap()).get(digest
) {
112 return Ok(raw_data
.to_vec());
115 let chunk
= Self::read_raw_chunk(self, digest
).await?
;
117 let raw_data
= chunk
.decode(self.crypt_config
.as_ref().map(Arc
::as_ref
), Some(digest
))?
;
119 let use_cache
= self.cache_hint
.contains_key(digest
);
121 (*self.cache
.lock().unwrap()).insert(*digest
, raw_data
.to_vec());