1 use anyhow
::{format_err, Error}
;
2 use std
::io
::{Write, Seek, SeekFrom}
;
5 use std
::os
::unix
::fs
::OpenOptionsExt
;
7 use futures
::future
::AbortHandle
;
8 use serde_json
::{json, Value}
;
10 use proxmox
::tools
::digest_to_hex
;
12 use pbs_tools
::crypt_config
::CryptConfig
;
13 use pbs_tools
::sha
::sha256
;
14 use pbs_datastore
::{PROXMOX_BACKUP_READER_PROTOCOL_ID_V1, BackupManifest}
;
15 use pbs_datastore
::data_blob
::DataBlob
;
16 use pbs_datastore
::data_blob_reader
::DataBlobReader
;
17 use pbs_datastore
::dynamic_index
::DynamicIndexReader
;
18 use pbs_datastore
::fixed_index
::FixedIndexReader
;
19 use pbs_datastore
::index
::IndexFile
;
20 use pbs_datastore
::manifest
::MANIFEST_BLOB_NAME
;
22 use super::{HttpClient, H2Client}
;
25 pub struct BackupReader
{
28 crypt_config
: Option
<Arc
<CryptConfig
>>,
31 impl Drop
for BackupReader
{
40 fn new(h2
: H2Client
, abort
: AbortHandle
, crypt_config
: Option
<Arc
<CryptConfig
>>) -> Arc
<Self> {
41 Arc
::new(Self { h2, abort, crypt_config}
)
44 /// Create a new instance by upgrading the connection at '/api2/json/reader'
47 crypt_config
: Option
<Arc
<CryptConfig
>>,
53 ) -> Result
<Arc
<BackupReader
>, Error
> {
56 "backup-type": backup_type
,
57 "backup-id": backup_id
,
58 "backup-time": backup_time
,
62 let req
= HttpClient
::request_builder(client
.server(), client
.port(), "GET", "/api2/json/reader", Some(param
)).unwrap();
64 let (h2
, abort
) = client
.start_h2_connection(req
, String
::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1
!())).await?
;
66 Ok(BackupReader
::new(h2
, abort
, crypt_config
))
69 /// Execute a GET request
74 ) -> Result
<Value
, Error
> {
75 self.h2
.get(path
, param
).await
78 /// Execute a PUT request
83 ) -> Result
<Value
, Error
> {
84 self.h2
.put(path
, param
).await
87 /// Execute a POST request
92 ) -> Result
<Value
, Error
> {
93 self.h2
.post(path
, param
).await
96 /// Execute a GET request and send output to a writer
97 pub async
fn download
<W
: Write
+ Send
>(
101 ) -> Result
<(), Error
> {
102 let path
= "download";
103 let param
= json
!({ "file-name": file_name }
);
104 self.h2
.download(path
, Some(param
), output
).await
107 /// Execute a special GET request and send output to a writer
109 /// This writes random data, and is only useful to test download speed.
110 pub async
fn speedtest
<W
: Write
+ Send
>(
113 ) -> Result
<(), Error
> {
114 self.h2
.download("speedtest", None
, output
).await
117 /// Download a specific chunk
118 pub async
fn download_chunk
<W
: Write
+ Send
>(
122 ) -> Result
<(), Error
> {
124 let param
= json
!({ "digest": digest_to_hex(digest) }
);
125 self.h2
.download(path
, Some(param
), output
).await
128 pub fn force_close(self) {
132 /// Download backup manifest (index.json)
134 /// The manifest signature is verified if we have a crypt_config.
135 pub async
fn download_manifest(&self) -> Result
<(BackupManifest
, Vec
<u8>), Error
> {
137 let mut raw_data
= Vec
::with_capacity(64 * 1024);
138 self.download(MANIFEST_BLOB_NAME
, &mut raw_data
).await?
;
139 let blob
= DataBlob
::load_from_reader(&mut &raw_data
[..])?
;
140 // no expected digest available
141 let data
= blob
.decode(None
, None
)?
;
143 let manifest
= BackupManifest
::from_data(&data
[..], self.crypt_config
.as_ref().map(Arc
::as_ref
))?
;
148 /// Download a .blob file
150 /// This creates a temporary file in /tmp (using O_TMPFILE). The data is verified using
151 /// the provided manifest.
152 pub async
fn download_blob(
154 manifest
: &BackupManifest
,
156 ) -> Result
<DataBlobReader
<'_
, File
>, Error
> {
158 let mut tmpfile
= std
::fs
::OpenOptions
::new()
161 .custom_flags(libc
::O_TMPFILE
)
164 self.download(name
, &mut tmpfile
).await?
;
166 tmpfile
.seek(SeekFrom
::Start(0))?
;
167 let (csum
, size
) = sha256(&mut tmpfile
)?
;
168 manifest
.verify_file(name
, &csum
, size
)?
;
170 tmpfile
.seek(SeekFrom
::Start(0))?
;
172 DataBlobReader
::new(tmpfile
, self.crypt_config
.clone())
175 /// Download dynamic index file
177 /// This creates a temporary file in /tmp (using O_TMPFILE). The index is verified using
178 /// the provided manifest.
179 pub async
fn download_dynamic_index(
181 manifest
: &BackupManifest
,
183 ) -> Result
<DynamicIndexReader
, Error
> {
185 let mut tmpfile
= std
::fs
::OpenOptions
::new()
188 .custom_flags(libc
::O_TMPFILE
)
191 self.download(name
, &mut tmpfile
).await?
;
193 let index
= DynamicIndexReader
::new(tmpfile
)
194 .map_err(|err
| format_err
!("unable to read dynamic index '{}' - {}", name
, err
))?
;
196 // Note: do not use values stored in index (not trusted) - instead, computed them again
197 let (csum
, size
) = index
.compute_csum();
198 manifest
.verify_file(name
, &csum
, size
)?
;
203 /// Download fixed index file
205 /// This creates a temporary file in /tmp (using O_TMPFILE). The index is verified using
206 /// the provided manifest.
207 pub async
fn download_fixed_index(
209 manifest
: &BackupManifest
,
211 ) -> Result
<FixedIndexReader
, Error
> {
213 let mut tmpfile
= std
::fs
::OpenOptions
::new()
216 .custom_flags(libc
::O_TMPFILE
)
219 self.download(name
, &mut tmpfile
).await?
;
221 let index
= FixedIndexReader
::new(tmpfile
)
222 .map_err(|err
| format_err
!("unable to read fixed index '{}' - {}", name
, err
))?
;
224 // Note: do not use values stored in index (not trusted) - instead, computed them again
225 let (csum
, size
) = index
.compute_csum();
226 manifest
.verify_file(name
, &csum
, size
)?
;