3 use std
::task
::{Context, Poll}
;
5 use anyhow
::{bail, format_err, Error}
;
8 use hyper
::http
::request
::Parts
;
10 use serde_json
::{json, Value}
;
12 use proxmox_router
::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment}
;
13 use proxmox_schema
::*;
14 use proxmox_sys
::sortable
;
16 use pbs_api_types
::{BACKUP_ARCHIVE_NAME_SCHEMA, CHUNK_DIGEST_SCHEMA}
;
17 use pbs_datastore
::file_formats
::{DataBlobHeader, EncryptedDataBlobHeader}
;
18 use pbs_datastore
::{DataBlob, DataStore}
;
19 use pbs_tools
::json
::{required_integer_param, required_string_param}
;
21 use super::environment
::*;
// `UploadChunk`: a future that drains an HTTP request-body stream into an
// in-memory buffer and (per the `impl Future` below) inserts the finished
// chunk into the datastore.
// NOTE(review): this paste is truncated and mangled — stale line numbers are
// fused into the content and interior lines are missing. The field list and
// what appears to be the constructor are interleaved; recover the full
// definition from version control before editing.
23 pub struct UploadChunk
{
// Target datastore the finished chunk is inserted into.
25 store
: Arc
<DataStore
>,
// Accumulates the raw (encoded) chunk bytes while the body stream is polled;
// `take()`n once the stream ends (see `poll` below).
29 raw_data
: Option
<Vec
<u8>>,
// NOTE(review): the fragment below looks like part of a constructor
// (initializing `store` and `raw_data: Some(vec![])`) — confirm against VCS.
35 store
: Arc
<DataStore
>,
45 raw_data
: Some(vec
![]),
// Drive the upload: repeatedly poll the body stream, append data into
// `raw_data` (bounding it by the announced `encoded_size`), and on
// end-of-stream verify + insert the chunk into the datastore.
// Output tuple appears to be (digest, size, compressed_size, is_duplicate).
// NOTE(review): truncated paste — several match arms and closing braces are
// missing from this view; do not rewrite without the full original.
51 impl Future
for UploadChunk
{
52 type Output
= Result
<([u8; 32], u32, u32, bool
), Error
>;
54 fn poll(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Self::Output
> {
55 let this
= self.get_mut();
// `break <err>` out of this loop routes every failure to the single
// Poll::Ready(Err(..)) at the bottom.
57 let err
: Error
= loop {
58 match ready
!(Pin
::new(&mut this
.stream
).poll_next(cx
)) {
// Stream-level errors are returned immediately, not accumulated.
59 Some(Err(err
)) => return Poll
::Ready(Err(Error
::from(err
))),
// Data frame: append to the buffer, rejecting uploads that exceed the
// size the client announced via `encoded_size`.
61 if let Some(ref mut raw_data
) = this
.raw_data
{
62 if (raw_data
.len() + input
.len()) > (this
.encoded_size
as usize) {
63 break format_err
!("uploaded chunk is larger than announced.");
65 raw_data
.extend_from_slice(&input
);
// `raw_data` already taken ⇒ the future was polled after completion.
67 break format_err
!("poll upload chunk stream failed - already finished.");
// End of stream: take the buffer and check it matches the announced size
// exactly (partial uploads are rejected).
71 if let Some(raw_data
) = this
.raw_data
.take() {
72 if raw_data
.len() != (this
.encoded_size
as usize) {
73 break format_err
!("uploaded chunk has unexpected size.");
76 let (is_duplicate
, compressed_size
) = match proxmox_lang
::try_block
! {
77 let mut chunk
= DataBlob
::from_raw(raw_data
)?
;
// Verification + insertion are synchronous/blocking work, so they are
// wrapped in block_in_place to avoid stalling the async executor.
79 proxmox_async
::runtime
::block_in_place(|| {
80 chunk
.verify_unencrypted(this
.size
as usize, &this
.digest
)?
;
82 // always compute CRC at server side
83 chunk
.set_crc(chunk
.compute_crc());
85 this
.store
.insert_chunk(&chunk
, &this
.digest
)
90 Err(err
) => break err
,
93 return Poll
::Ready(Ok((
96 compressed_size
as u32,
100 break format_err
!("poll upload chunk stream failed - already finished.");
// Single error exit for every `break` above.
105 Poll
::Ready(Err(err
))
// API method descriptor for `upload_fixed_chunk`: declares the async HTTP
// handler plus the parameter schemas (wid, digest, size, encoded-size).
// NOTE(review): truncated paste — parameter tuple wrappers, minimum bounds
// and closing delimiters are missing from this view.
110 pub const API_METHOD_UPLOAD_FIXED_CHUNK
: ApiMethod
= ApiMethod
::new(
111 &ApiHandler
::AsyncHttp(&upload_fixed_chunk
),
113 "Upload a new chunk.",
118 &IntegerSchema
::new("Fixed writer ID.")
123 ("digest", false, &CHUNK_DIGEST_SCHEMA
),
// Raw (unencoded) chunk size, capped at 16 MiB.
127 &IntegerSchema
::new("Chunk size.")
129 .maximum(1024 * 1024 * 16)
// Encoded size must be at least one byte larger than a bare blob header;
// the `+ (size_of::<EncryptedDataBlobHeader>())` term below presumably
// belongs to the corresponding `.maximum(..)` bound — confirm against VCS.
135 &IntegerSchema
::new("Encoded chunk size.")
136 .minimum((std
::mem
::size_of
::<DataBlobHeader
>() as isize) + 1)
139 + (std
::mem
::size_of
::<EncryptedDataBlobHeader
>() as isize)
// Async HTTP handler: parse the chunk parameters, run an `UploadChunk`
// future over the request body, register the result with the fixed-index
// writer `wid`, and respond with the chunk digest as a JSON string.
// NOTE(review): "¶m" throughout is mojibake for "&param" (HTML entity
// `&para;` swallowed the text "&para" of "&param") — fix the encoding when
// restoring this file. The fn signature is also truncated in this view.
147 fn upload_fixed_chunk(
152 rpcenv
: Box
<dyn RpcEnvironment
>,
153 ) -> ApiResponseFuture
{
155 let wid
= required_integer_param(¶m
, "wid")?
as usize;
156 let size
= required_integer_param(¶m
, "size")?
as u32;
157 let encoded_size
= required_integer_param(¶m
, "encoded-size")?
as u32;
// Digest arrives as a hex string and is decoded to a fixed 32-byte array.
159 let digest_str
= required_string_param(¶m
, "digest")?
;
160 let digest
= <[u8; 32]>::from_hex(digest_str
)?
;
162 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// Drain + verify + store the chunk (see `impl Future for UploadChunk`).
164 let (digest
, size
, compressed_size
, is_duplicate
) =
165 UploadChunk
::new(req_body
, env
.datastore
.clone(), digest
, size
, encoded_size
).await?
;
// Record the chunk with the fixed-index writer session.
167 env
.register_fixed_chunk(wid
, digest
, size
, compressed_size
, is_duplicate
)?
;
168 let digest_str
= hex
::encode(digest
);
169 env
.debug(format
!("upload_chunk done: {} bytes, {}", size
, digest_str
));
171 let result
= Ok(json
!(digest_str
));
173 Ok(env
.format_response(result
))
// API method descriptor for `upload_dynamic_chunk`: mirrors the fixed-chunk
// descriptor above, but for the dynamic-index writer.
// NOTE(review): truncated paste — parameter tuple wrappers, minimum bounds
// and closing delimiters are missing from this view.
179 pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK
: ApiMethod
= ApiMethod
::new(
180 &ApiHandler
::AsyncHttp(&upload_dynamic_chunk
),
182 "Upload a new chunk.",
187 &IntegerSchema
::new("Dynamic writer ID.")
192 ("digest", false, &CHUNK_DIGEST_SCHEMA
),
// Raw (unencoded) chunk size, capped at 16 MiB.
196 &IntegerSchema
::new("Chunk size.")
198 .maximum(1024 * 1024 * 16)
// Encoded size bounded below by a bare blob header + 1 byte; the trailing
// `+ size_of::<EncryptedDataBlobHeader>()` term presumably belongs to the
// matching `.maximum(..)` bound — confirm against VCS.
204 &IntegerSchema
::new("Encoded chunk size.")
205 .minimum((std
::mem
::size_of
::<DataBlobHeader
>() as isize) + 1)
208 + (std
::mem
::size_of
::<EncryptedDataBlobHeader
>() as isize)
// Async HTTP handler: identical flow to `upload_fixed_chunk`, but registers
// the stored chunk with the dynamic-index writer (`register_dynamic_chunk`).
// NOTE(review): "¶m" throughout is mojibake for "&param" — fix the
// encoding when restoring this file. The fn signature is truncated in this
// view.
216 fn upload_dynamic_chunk(
221 rpcenv
: Box
<dyn RpcEnvironment
>,
222 ) -> ApiResponseFuture
{
224 let wid
= required_integer_param(¶m
, "wid")?
as usize;
225 let size
= required_integer_param(¶m
, "size")?
as u32;
226 let encoded_size
= required_integer_param(¶m
, "encoded-size")?
as u32;
// Digest arrives as a hex string and is decoded to a fixed 32-byte array.
228 let digest_str
= required_string_param(¶m
, "digest")?
;
229 let digest
= <[u8; 32]>::from_hex(digest_str
)?
;
231 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// Drain + verify + store the chunk (see `impl Future for UploadChunk`).
233 let (digest
, size
, compressed_size
, is_duplicate
) =
234 UploadChunk
::new(req_body
, env
.datastore
.clone(), digest
, size
, encoded_size
).await?
;
// Record the chunk with the dynamic-index writer session.
236 env
.register_dynamic_chunk(wid
, digest
, size
, compressed_size
, is_duplicate
)?
;
237 let digest_str
= hex
::encode(digest
);
238 env
.debug(format
!("upload_chunk done: {} bytes, {}", size
, digest_str
));
240 let result
= Ok(json
!(digest_str
));
241 Ok(env
.format_response(result
))
// API method descriptor for `upload_speedtest`: no parameters (empty
// ObjectSchema); the handler simply measures how fast the body arrives.
246 pub const API_METHOD_UPLOAD_SPEEDTEST
: ApiMethod
= ApiMethod
::new(
247 &ApiHandler
::AsyncHttp(&upload_speedtest
),
248 &ObjectSchema
::new("Test upload speed.", &[]),
// Handler body for `upload_speedtest` (fn header is outside this view):
// fold over the request body counting bytes, log the total (or the error)
// to stdout, and always respond with a null-valued success envelope.
// NOTE(review): truncated paste — the match on `result` around the two
// println! arms is missing from this view.
256 rpcenv
: Box
<dyn RpcEnvironment
>,
257 ) -> ApiResponseFuture
{
// Sum the length of every body chunk; `try_fold` short-circuits on a
// stream error.
259 let result
= req_body
260 .map_err(Error
::from
)
261 .try_fold(0, |size
: usize, chunk
| {
262 let sum
= size
+ chunk
.len();
263 //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
264 future
::ok
::<usize, Error
>(sum
)
270 println
!("UPLOAD END {} bytes", size
);
273 println
!("Upload error: {}", err
);
276 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// Errors are only printed above; the response is success either way.
277 Ok(env
.format_response(Ok(Value
::Null
)))
// API method descriptor for `upload_blob`: takes a blob file name and the
// encoded size (bounded below by the bare blob header size).
// NOTE(review): truncated paste — schema wrappers and the bound the
// trailing `+ size_of::<EncryptedDataBlobHeader>()` term belongs to are
// missing from this view; confirm against VCS.
283 pub const API_METHOD_UPLOAD_BLOB
: ApiMethod
= ApiMethod
::new(
284 &ApiHandler
::AsyncHttp(&upload_blob
),
286 "Upload binary blob file.",
288 ("file-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA
),
292 &IntegerSchema
::new("Encoded blob size.")
293 .minimum(std
::mem
::size_of
::<DataBlobHeader
>() as isize)
296 + (std
::mem
::size_of
::<EncryptedDataBlobHeader
>() as isize)
// Handler body for `upload_blob` (fn header is outside this view): validate
// the ".blob" file extension, collect the whole request body into a Vec,
// check it matches the announced encoded size, and hand it to the backup
// environment via `add_blob`.
// NOTE(review): "¶m" is mojibake for "&param"; the bail! for the length
// mismatch and the stream source feeding `.map_err` are truncated out of
// this view — recover from VCS.
309 rpcenv
: Box
<dyn RpcEnvironment
>,
310 ) -> ApiResponseFuture
{
312 let file_name
= required_string_param(¶m
, "file-name")?
.to_owned();
313 let encoded_size
= required_integer_param(¶m
, "encoded-size")?
as usize;
315 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// Only ".blob" targets are accepted by this endpoint.
317 if !file_name
.ends_with(".blob") {
318 bail
!("wrong blob file extension: '{}'", file_name
);
// Accumulate the full body in memory (blobs are size-bounded by the schema).
322 .map_err(Error
::from
)
323 .try_fold(Vec
::new(), |mut acc
, chunk
| {
324 acc
.extend_from_slice(&chunk
);
325 future
::ok
::<_
, Error
>(acc
)
// Reject bodies whose length differs from the announced encoded size.
329 if encoded_size
!= data
.len() {
331 "got blob with unexpected length ({} != {})",
337 env
.add_blob(&file_name
, data
)?
;
339 Ok(env
.format_response(Ok(Value
::Null
)))