3 use std
::task
::{Context, Poll}
;
5 use anyhow
::{bail, format_err, Error}
;
8 use hyper
::http
::request
::Parts
;
9 use serde_json
::{json, Value}
;
11 use proxmox
::{sortable, identity}
;
12 use proxmox
::api
::{ApiResponseFuture, ApiHandler, ApiMethod, RpcEnvironment}
;
13 use proxmox
::api
::schema
::*;
15 use pbs_tools
::json
::{required_integer_param, required_string_param}
;
17 use crate::api2
::types
::*;
20 use super::environment
::*;
// Future that reads an HTTP request body and accumulates the encoded chunk
// bytes before verifying and inserting them into the datastore (see the
// `Future` impl below).
// NOTE(review): this extract is fragmentary — several struct fields (e.g. the
// `stream`, `digest`, `size`, `encoded_size` fields referenced by `new` and
// `poll`) and the `impl UploadChunk` header are not visible here; confirm
// against the original file before restoring.
22 pub struct UploadChunk
{
// Target datastore the finished chunk is inserted into.
24 store
: Arc
<DataStore
>,
// Buffer accumulating the raw encoded bytes while the body streams in;
// `take()`n (set to `None`) once the upload finishes (see `poll`).
28 raw_data
: Option
<Vec
<u8>>,
// Construct the upload future. `size` is the unencoded chunk size (checked
// via `verify_unencrypted` in `poll`); `encoded_size` is the announced
// on-the-wire size used to bound `raw_data`. Buffer starts empty.
32 pub fn new(stream
: Body
, store
: Arc
<DataStore
>, digest
: [u8; 32], size
: u32, encoded_size
: u32) -> Self {
33 Self { stream, store, size, encoded_size, raw_data: Some(vec![]), digest }
// Drives the chunk upload: polls the body stream, buffers bytes into
// `raw_data`, then (once the stream ends) verifies the chunk and inserts it
// into the datastore.
// NOTE(review): fragmentary extract — the `Some(Ok(input))` and `None` match
// arms, the `Ok(..)` arm of the `try_block!` match, and several closing
// braces are missing from the visible text; restore from the original.
37 impl Future
for UploadChunk
{
// Resolves to (digest, unencoded size, compressed size, is_duplicate).
38 type Output
= Result
<([u8; 32], u32, u32, bool
), Error
>;
40 fn poll(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Self::Output
> {
41 let this
= self.get_mut();
// Loop until the stream ends; a `break` with an Error falls through to the
// error return path (not visible in this extract).
43 let err
: Error
= loop {
44 match ready
!(Pin
::new(&mut this
.stream
).poll_next(cx
)) {
// Transport error from the body stream: fail the future immediately.
45 Some(Err(err
)) => return Poll
::Ready(Err(Error
::from(err
))),
// Data frame: append to the buffer, but never beyond the announced
// encoded size.
47 if let Some(ref mut raw_data
) = this
.raw_data
{
48 if (raw_data
.len() + input
.len()) > (this
.encoded_size
as usize) {
49 break format_err
!("uploaded chunk is larger than announced.");
51 raw_data
.extend_from_slice(&input
);
// Buffer already taken: the future was polled after completion.
53 break format_err
!("poll upload chunk stream failed - already finished.");
// Stream ended: take the buffer and require an exact size match.
57 if let Some(raw_data
) = this
.raw_data
.take() {
58 if raw_data
.len() != (this
.encoded_size
as usize) {
59 break format_err
!("uploaded chunk has unexpected size.");
62 let (is_duplicate
, compressed_size
) = match proxmox
::try_block
! {
// Parse the encoded bytes into a DataBlob.
63 let mut chunk
= DataBlob
::from_raw(raw_data
)?
;
// Verification + insertion may block, so run it via block_in_place.
65 pbs_runtime
::block_in_place(|| {
// Check the unencoded size and digest match what the client announced.
66 chunk
.verify_unencrypted(this
.size
as usize, &this
.digest
)?
;
68 // always compute CRC at server side
69 chunk
.set_crc(chunk
.compute_crc());
71 this
.store
.insert_chunk(&chunk
, &this
.digest
)
// NOTE(review): the `Ok(..)` arm of this match is missing from the extract.
76 Err(err
) => break err
,
79 return Poll
::Ready(Ok((this
.digest
, this
.size
, compressed_size
as u32, is_duplicate
)))
81 break format_err
!("poll upload chunk stream failed - already finished.");
// API method descriptor: upload one chunk for a fixed-index writer.
// Parameter bounds: `size` <= 16 MiB; `encoded-size` must exceed the plain
// DataBlobHeader and fit within 16 MiB plus the encrypted header.
// NOTE(review): fragmentary — the enclosing ObjectSchema/parameter-list
// wrapper lines and closing delimiters are missing from this extract.
91 pub const API_METHOD_UPLOAD_FIXED_CHUNK
: ApiMethod
= ApiMethod
::new(
92 &ApiHandler
::AsyncHttp(&upload_fixed_chunk
),
94 "Upload a new chunk.",
96 ("wid", false, &IntegerSchema
::new("Fixed writer ID.")
101 ("digest", false, &CHUNK_DIGEST_SCHEMA
),
102 ("size", false, &IntegerSchema
::new("Chunk size.")
104 .maximum(1024*1024*16)
107 ("encoded-size", false, &IntegerSchema
::new("Encoded chunk size.")
108 .minimum((std
::mem
::size_of
::<DataBlobHeader
>() as isize)+1)
109 .maximum(1024*1024*16+(std
::mem
::size_of
::<EncryptedDataBlobHeader
>() as isize))
// Handler for API_METHOD_UPLOAD_FIXED_CHUNK: reads parameters, runs the
// UploadChunk future on the request body, registers the result with the
// fixed-index writer `wid`, and responds with the chunk digest as JSON.
// NOTE(review): fragmentary — other parameters (`param`, `req_body`) and the
// async-block wrapper are missing from this extract. The `¶m` tokens
// below look like HTML-entity mangling of `&param`; repair on restoration.
116 fn upload_fixed_chunk(
121 rpcenv
: Box
<dyn RpcEnvironment
>,
122 ) -> ApiResponseFuture
{
125 let wid
= required_integer_param(¶m
, "wid")?
as usize;
126 let size
= required_integer_param(¶m
, "size")?
as u32;
127 let encoded_size
= required_integer_param(¶m
, "encoded-size")?
as u32;
129 let digest_str
= required_string_param(¶m
, "digest")?
;
// Parse the hex digest string into a [u8; 32].
130 let digest
= proxmox
::tools
::hex_to_digest(digest_str
)?
;
132 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// Stream, verify and store the chunk; yields (digest, size,
// compressed_size, is_duplicate).
134 let (digest
, size
, compressed_size
, is_duplicate
) =
135 UploadChunk
::new(req_body
, env
.datastore
.clone(), digest
, size
, encoded_size
).await?
;
// Record the chunk with the fixed-index writer session.
137 env
.register_fixed_chunk(wid
, digest
, size
, compressed_size
, is_duplicate
)?
;
138 let digest_str
= proxmox
::tools
::digest_to_hex(&digest
);
139 env
.debug(format
!("upload_chunk done: {} bytes, {}", size
, digest_str
));
141 let result
= Ok(json
!(digest_str
));
143 Ok(env
.format_response(result
))
// API method descriptor: upload one chunk for a dynamic-index writer.
// Mirrors API_METHOD_UPLOAD_FIXED_CHUNK except for the writer-ID description.
// NOTE(review): fragmentary — the enclosing ObjectSchema/parameter-list
// wrapper lines and closing delimiters are missing from this extract.
149 pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK
: ApiMethod
= ApiMethod
::new(
150 &ApiHandler
::AsyncHttp(&upload_dynamic_chunk
),
152 "Upload a new chunk.",
154 ("wid", false, &IntegerSchema
::new("Dynamic writer ID.")
159 ("digest", false, &CHUNK_DIGEST_SCHEMA
),
160 ("size", false, &IntegerSchema
::new("Chunk size.")
162 .maximum(1024*1024*16)
165 ("encoded-size", false, &IntegerSchema
::new("Encoded chunk size.")
166 .minimum((std
::mem
::size_of
::<DataBlobHeader
>() as isize) +1)
167 .maximum(1024*1024*16+(std
::mem
::size_of
::<EncryptedDataBlobHeader
>() as isize))
// Handler for API_METHOD_UPLOAD_DYNAMIC_CHUNK: identical flow to
// upload_fixed_chunk but registers the chunk with the dynamic-index writer.
// NOTE(review): fragmentary — other parameters (`param`, `req_body`) and the
// async-block wrapper are missing; `¶m` is HTML-entity mangling of
// `&param`. Also, the `.await?` visible in the fixed-chunk variant is absent
// after the UploadChunk::new(..) call below — almost certainly lost in
// extraction; verify against the original.
174 fn upload_dynamic_chunk(
179 rpcenv
: Box
<dyn RpcEnvironment
>,
180 ) -> ApiResponseFuture
{
183 let wid
= required_integer_param(¶m
, "wid")?
as usize;
184 let size
= required_integer_param(¶m
, "size")?
as u32;
185 let encoded_size
= required_integer_param(¶m
, "encoded-size")?
as u32;
187 let digest_str
= required_string_param(¶m
, "digest")?
;
// Parse the hex digest string into a [u8; 32].
188 let digest
= proxmox
::tools
::hex_to_digest(digest_str
)?
;
190 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// Stream, verify and store the chunk; yields (digest, size,
// compressed_size, is_duplicate).
192 let (digest
, size
, compressed_size
, is_duplicate
) =
193 UploadChunk
::new(req_body
, env
.datastore
.clone(), digest
, size
, encoded_size
)
// Record the chunk with the dynamic-index writer session.
196 env
.register_dynamic_chunk(wid
, digest
, size
, compressed_size
, is_duplicate
)?
;
197 let digest_str
= proxmox
::tools
::digest_to_hex(&digest
);
198 env
.debug(format
!("upload_chunk done: {} bytes, {}", size
, digest_str
));
200 let result
= Ok(json
!(digest_str
));
201 Ok(env
.format_response(result
))
// API method descriptor + handler for the upload speed test: consumes the
// request body, counting bytes via try_fold, prints the total, and returns
// a null JSON response. Takes no API parameters (empty ObjectSchema).
// NOTE(review): fragmentary — the `fn upload_speedtest(` header line, the
// match/branch structure around the printlns, and the async-block wrapper
// are missing from this extract.
205 pub const API_METHOD_UPLOAD_SPEEDTEST
: ApiMethod
= ApiMethod
::new(
206 &ApiHandler
::AsyncHttp(&upload_speedtest
),
207 &ObjectSchema
::new("Test upload speed.", &[])
215 rpcenv
: Box
<dyn RpcEnvironment
>,
216 ) -> ApiResponseFuture
{
// Fold over the body stream, accumulating the total byte count.
220 let result
= req_body
221 .map_err(Error
::from
)
222 .try_fold(0, |size
: usize, chunk
| {
223 let sum
= size
+ chunk
.len();
224 //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
225 future
::ok
::<usize, Error
>(sum
)
// Success branch: report the total uploaded size.
231 println
!("UPLOAD END {} bytes", size
);
// Error branch: report the upload error.
234 println
!("Upload error: {}", err
);
237 let env
: &BackupEnvironment
= rpcenv
.as_ref();
238 Ok(env
.format_response(Ok(Value
::Null
)))
// API method descriptor: upload a binary blob file (e.g. "*.blob" archives).
// `encoded-size` must cover at least a plain DataBlobHeader and at most
// 16 MiB plus the encrypted header.
// NOTE(review): fragmentary — the enclosing ObjectSchema/parameter-list
// wrapper lines and closing delimiters are missing from this extract.
243 pub const API_METHOD_UPLOAD_BLOB
: ApiMethod
= ApiMethod
::new(
244 &ApiHandler
::AsyncHttp(&upload_blob
),
246 "Upload binary blob file.",
248 ("file-name", false, &crate::api2
::types
::BACKUP_ARCHIVE_NAME_SCHEMA
),
249 ("encoded-size", false, &IntegerSchema
::new("Encoded blob size.")
250 .minimum(std
::mem
::size_of
::<DataBlobHeader
>() as isize)
251 .maximum(1024*1024*16+(std
::mem
::size_of
::<EncryptedDataBlobHeader
>() as isize))
// Handler for API_METHOD_UPLOAD_BLOB: validates the ".blob" file extension,
// collects the whole request body into a Vec, checks the byte count against
// the announced `encoded-size`, stores it via env.add_blob, and returns null.
// NOTE(review): fragmentary — the `fn upload_blob(` header line, other
// parameters (`param`, `req_body`), the start of the body-collecting chain,
// and the async-block wrapper are missing; `¶m` is HTML-entity mangling
// of `&param`. Repair on restoration.
263 rpcenv
: Box
<dyn RpcEnvironment
>,
264 ) -> ApiResponseFuture
{
267 let file_name
= required_string_param(¶m
, "file-name")?
.to_owned();
268 let encoded_size
= required_integer_param(¶m
, "encoded-size")?
as usize;
270 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// Reject anything that is not a blob archive by extension.
272 if !file_name
.ends_with(".blob") {
273 bail
!("wrong blob file extension: '{}'", file_name
);
// Collect the streamed body into a single contiguous buffer.
277 .map_err(Error
::from
)
278 .try_fold(Vec
::new(), |mut acc
, chunk
| {
279 acc
.extend_from_slice(&*chunk
);
280 future
::ok
::<_
, Error
>(acc
)
// The received length must match the announced encoded size exactly.
284 if encoded_size
!= data
.len() {
285 bail
!("got blob with unexpected length ({} != {})", encoded_size
, data
.len());
// Hand the blob to the backup environment for storage.
288 env
.add_blob(&file_name
, data
)?
;
290 Ok(env
.format_response(Ok(Value
::Null
)))