1 use anyhow
::{format_err, Error}
;
2 use std
::io
::{Read, Write, Seek, SeekFrom}
;
5 use std
::os
::unix
::fs
::OpenOptionsExt
;
7 use chrono
::{DateTime, Utc}
;
8 use futures
::future
::AbortHandle
;
9 use serde_json
::{json, Value}
;
11 use proxmox
::tools
::digest_to_hex
;
15 use super::{HttpClient, H2Client}
;
/// Client handle for the backup reader protocol.
///
/// `new` builds it from an `H2Client`, an `AbortHandle` and an optional crypt config.
18 pub struct BackupReader
{
// Optional, shared (`Arc`) crypto configuration; `None` when no config was supplied.
21 crypt_config
: Option
<Arc
<CryptConfig
>>,
// Cleanup hook that runs when a `BackupReader` value is dropped.
24 impl Drop
for BackupReader
{
/// Wrap the connection parts in a reference-counted `BackupReader`.
///
/// `h2`, `abort` and `crypt_config` are stored unchanged; the new value is returned
/// inside an `Arc`.
33 fn new(h2
: H2Client
, abort
: AbortHandle
, crypt_config
: Option
<Arc
<CryptConfig
>>) -> Arc
<Self> {
34 Arc
::new(Self { h2, abort, crypt_config}
)
37 /// Create a new instance by upgrading the connection at '/api2/json/reader'
///
/// Builds a GET request for '/api2/json/reader' with `param`, starts an HTTP/2
/// connection on it using the reader protocol id, and wraps the resulting client and
/// abort handle (plus `crypt_config`) in a `BackupReader`.
///
/// Returns an error if starting the HTTP/2 connection fails.
40 crypt_config
: Option
<Arc
<CryptConfig
>>,
44 backup_time
: DateTime
<Utc
>,
46 ) -> Result
<Arc
<BackupReader
>, Error
> {
// Request parameters identifying the backup (presumably collected into `param` — confirm).
49 "backup-type": backup_type
,
50 "backup-id": backup_id
,
// The backup time is sent as the value of `DateTime::timestamp()`.
51 "backup-time": backup_time
.timestamp(),
// NOTE(review): `unwrap()` panics if the request cannot be built — confirm this is
// impossible for these fixed arguments.
55 let req
= HttpClient
::request_builder(client
.server(), "GET", "/api2/json/reader", Some(param
)).unwrap();
57 let (h2
, abort
) = client
.start_h2_connection(req
, String
::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1
!())).await?
;
59 Ok(BackupReader
::new(h2
, abort
, crypt_config
))
62 /// Execute a GET request
///
/// Forwards `path` and `param` to the HTTP/2 client's `get` and awaits the result.
67 ) -> Result
<Value
, Error
> {
68 self.h2
.get(path
, param
).await
71 /// Execute a PUT request
///
/// Forwards `path` and `param` to the HTTP/2 client's `put` and awaits the result.
76 ) -> Result
<Value
, Error
> {
77 self.h2
.put(path
, param
).await
80 /// Execute a POST request
///
/// Forwards `path` and `param` to the HTTP/2 client's `post` and awaits the result.
85 ) -> Result
<Value
, Error
> {
86 self.h2
.post(path
, param
).await
89 /// Execute a GET request and send output to a writer
///
/// Calls the HTTP/2 client's `download` on the "download" path with the parameter
/// `{ "file-name": file_name }`, handing it `output` as the destination.
///
/// On success returns the `W` produced by that call (presumably the writer that was
/// passed in — confirm against `H2Client::download`).
90 pub async
fn download
<W
: Write
+ Send
>(
94 ) -> Result
<W
, Error
> {
95 let path
= "download";
96 let param
= json
!({ "file-name": file_name }
);
97 self.h2
.download(path
, Some(param
), output
).await
100 /// Execute a special GET request and send output to a writer
102 /// This writes random data, and is only useful to test download speed.
///
/// Calls the HTTP/2 client's `download` on the "speedtest" path without parameters,
/// handing it `output` as the destination.
103 pub async
fn speedtest
<W
: Write
+ Send
>(
106 ) -> Result
<W
, Error
> {
107 self.h2
.download("speedtest", None
, output
).await
110 /// Download a specific chunk
///
/// The chunk is selected by `digest`, which is hex-encoded with `digest_to_hex` and
/// sent as the "digest" parameter. `output` is handed to the HTTP/2 client's `download`
/// as the destination.
111 pub async
fn download_chunk
<W
: Write
+ Send
>(
115 ) -> Result
<W
, Error
> {
117 let param
= json
!({ "digest": digest_to_hex(digest) }
);
118 self.h2
.download(path
, Some(param
), output
).await
/// Force-close the reader. Takes `self` by value, so the caller gives up this handle.
121 pub fn force_close(self) {
125 /// Download backup manifest (index.json)
///
/// Downloads `MANIFEST_BLOB_NAME` into memory, parses the bytes as a `DataBlob`, decodes
/// it with the optional crypt config, parses the decoded data as JSON and converts that
/// into a `BackupManifest`. An error from any of these steps is returned to the caller.
126 pub async
fn download_manifest(&self) -> Result
<BackupManifest
, Error
> {
128 use std
::convert
::TryFrom
;
// Download into an in-memory buffer pre-sized to 64 KiB.
130 let raw_data
= self.download(MANIFEST_BLOB_NAME
, Vec
::with_capacity(64*1024)).await?
;
131 let blob
= DataBlob
::from_raw(raw_data
)?
;
// Pass the crypt config as `Option<&CryptConfig>` (borrowed out of the `Arc`).
133 let data
= blob
.decode(self.crypt_config
.as_ref().map(Arc
::as_ref
))?
;
134 let json
: Value
= serde_json
::from_slice(&data
[..])?
;
136 BackupManifest
::try_from(json
)
139 /// Download a .blob file
141 /// This creates a temporary file in /tmp (using O_TMPFILE). The data is verified using
142 /// the provided manifest.
///
/// On success returns a `DataBlobReader` over the temporary file, positioned at the
/// start and given a clone of the crypt config.
143 pub async
fn download_blob(
145 manifest
: &BackupManifest
,
147 ) -> Result
<DataBlobReader
<File
>, Error
> {
149 let tmpfile
= std
::fs
::OpenOptions
::new()
152 .custom_flags(libc
::O_TMPFILE
)
155 let mut tmpfile
= self.download(name
, tmpfile
).await?
;
// Checksum and size are computed from the downloaded file itself, then checked
// against the manifest entry for `name`.
157 let (csum
, size
) = compute_file_csum(&mut tmpfile
)?
;
158 manifest
.verify_file(name
, &csum
, size
)?
;
// Rewind so the returned reader starts at the beginning of the file.
160 tmpfile
.seek(SeekFrom
::Start(0))?
;
162 DataBlobReader
::new(tmpfile
, self.crypt_config
.clone())
165 /// Download dynamic index file
167 /// This creates a temporary file in /tmp (using O_TMPFILE). The index is verified using
168 /// the provided manifest.
///
/// Fails if the download fails, if the file cannot be opened as a dynamic index, or if
/// the manifest verification fails.
169 pub async
fn download_dynamic_index(
171 manifest
: &BackupManifest
,
173 ) -> Result
<DynamicIndexReader
, Error
> {
175 let tmpfile
= std
::fs
::OpenOptions
::new()
178 .custom_flags(libc
::O_TMPFILE
)
181 let tmpfile
= self.download(name
, tmpfile
).await?
;
183 let index
= DynamicIndexReader
::new(tmpfile
)
184 .map_err(|err
| format_err
!("unable to read dynamic index '{}' - {}", name
, err
))?
;
186 // Note: do not use the values stored in the index (not trusted) - instead, compute them again
187 let (csum
, size
) = index
.compute_csum();
188 manifest
.verify_file(name
, &csum
, size
)?
;
193 /// Download fixed index file
195 /// This creates a temporary file in /tmp (using O_TMPFILE). The index is verified using
196 /// the provided manifest.
///
/// Fails if the download fails, if the file cannot be opened as a fixed index, or if
/// the manifest verification fails.
197 pub async
fn download_fixed_index(
199 manifest
: &BackupManifest
,
201 ) -> Result
<FixedIndexReader
, Error
> {
203 let tmpfile
= std
::fs
::OpenOptions
::new()
206 .custom_flags(libc
::O_TMPFILE
)
209 let tmpfile
= self.download(name
, tmpfile
).await?
;
211 let index
= FixedIndexReader
::new(tmpfile
)
212 .map_err(|err
| format_err
!("unable to read fixed index '{}' - {}", name
, err
))?
;
214 // Note: do not use the values stored in the index (not trusted) - instead, compute them again
215 let (csum
, size
) = index
.compute_csum();
216 manifest
.verify_file(name
, &csum
, size
)?
;
222 pub fn compute_file_csum(file
: &mut File
) -> Result
<([u8; 32], u64), Error
> {
224 file
.seek(SeekFrom
::Start(0))?
;
226 let mut hasher
= openssl
::sha
::Sha256
::new();
227 let mut buffer
= proxmox
::tools
::vec
::undefined(256*1024);
228 let mut size
: u64 = 0;
231 let count
= match file
.read(&mut buffer
) {
233 Err(ref err
) if err
.kind() == std
::io
::ErrorKind
::Interrupted
=> { continue; }
234 Err(err
) => return Err(err
.into()),
239 size
+= count
as u64;
240 hasher
.update(&buffer
[..count
]);
243 let csum
= hasher
.finish();