use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use anyhow::{bail, format_err, Error};
use futures::*;
use hex::FromHex;
use hyper::http::request::Parts;
use hyper::Body;
use serde_json::{json, Value};

use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment};
use proxmox_schema::*;
use proxmox_sys::{identity, sortable};

use pbs_api_types::{BACKUP_ARCHIVE_NAME_SCHEMA, CHUNK_DIGEST_SCHEMA};
use pbs_datastore::file_formats::{DataBlobHeader, EncryptedDataBlobHeader};
use pbs_datastore::{DataBlob, DataStore};
use pbs_tools::json::{required_integer_param, required_string_param};

use super::environment::*;
23 pub struct UploadChunk
{
25 store
: Arc
<DataStore
>,
29 raw_data
: Option
<Vec
<u8>>,
33 pub fn new(stream
: Body
, store
: Arc
<DataStore
>, digest
: [u8; 32], size
: u32, encoded_size
: u32) -> Self {
34 Self { stream, store, size, encoded_size, raw_data: Some(vec![]), digest }
38 impl Future
for UploadChunk
{
39 type Output
= Result
<([u8; 32], u32, u32, bool
), Error
>;
41 fn poll(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Self::Output
> {
42 let this
= self.get_mut();
44 let err
: Error
= loop {
45 match ready
!(Pin
::new(&mut this
.stream
).poll_next(cx
)) {
46 Some(Err(err
)) => return Poll
::Ready(Err(Error
::from(err
))),
48 if let Some(ref mut raw_data
) = this
.raw_data
{
49 if (raw_data
.len() + input
.len()) > (this
.encoded_size
as usize) {
50 break format_err
!("uploaded chunk is larger than announced.");
52 raw_data
.extend_from_slice(&input
);
54 break format_err
!("poll upload chunk stream failed - already finished.");
58 if let Some(raw_data
) = this
.raw_data
.take() {
59 if raw_data
.len() != (this
.encoded_size
as usize) {
60 break format_err
!("uploaded chunk has unexpected size.");
63 let (is_duplicate
, compressed_size
) = match proxmox_lang
::try_block
! {
64 let mut chunk
= DataBlob
::from_raw(raw_data
)?
;
66 proxmox_async
::runtime
::block_in_place(|| {
67 chunk
.verify_unencrypted(this
.size
as usize, &this
.digest
)?
;
69 // always comput CRC at server side
70 chunk
.set_crc(chunk
.compute_crc());
72 this
.store
.insert_chunk(&chunk
, &this
.digest
)
77 Err(err
) => break err
,
80 return Poll
::Ready(Ok((this
.digest
, this
.size
, compressed_size
as u32, is_duplicate
)))
82 break format_err
!("poll upload chunk stream failed - already finished.");
92 pub const API_METHOD_UPLOAD_FIXED_CHUNK
: ApiMethod
= ApiMethod
::new(
93 &ApiHandler
::AsyncHttp(&upload_fixed_chunk
),
95 "Upload a new chunk.",
97 ("wid", false, &IntegerSchema
::new("Fixed writer ID.")
102 ("digest", false, &CHUNK_DIGEST_SCHEMA
),
103 ("size", false, &IntegerSchema
::new("Chunk size.")
105 .maximum(1024*1024*16)
108 ("encoded-size", false, &IntegerSchema
::new("Encoded chunk size.")
109 .minimum((std
::mem
::size_of
::<DataBlobHeader
>() as isize)+1)
110 .maximum(1024*1024*16+(std
::mem
::size_of
::<EncryptedDataBlobHeader
>() as isize))
117 fn upload_fixed_chunk(
122 rpcenv
: Box
<dyn RpcEnvironment
>,
123 ) -> ApiResponseFuture
{
126 let wid
= required_integer_param(¶m
, "wid")?
as usize;
127 let size
= required_integer_param(¶m
, "size")?
as u32;
128 let encoded_size
= required_integer_param(¶m
, "encoded-size")?
as u32;
130 let digest_str
= required_string_param(¶m
, "digest")?
;
131 let digest
= <[u8; 32]>::from_hex(digest_str
)?
;
133 let env
: &BackupEnvironment
= rpcenv
.as_ref();
135 let (digest
, size
, compressed_size
, is_duplicate
) =
136 UploadChunk
::new(req_body
, env
.datastore
.clone(), digest
, size
, encoded_size
).await?
;
138 env
.register_fixed_chunk(wid
, digest
, size
, compressed_size
, is_duplicate
)?
;
139 let digest_str
= hex
::encode(&digest
);
140 env
.debug(format
!("upload_chunk done: {} bytes, {}", size
, digest_str
));
142 let result
= Ok(json
!(digest_str
));
144 Ok(env
.format_response(result
))
150 pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK
: ApiMethod
= ApiMethod
::new(
151 &ApiHandler
::AsyncHttp(&upload_dynamic_chunk
),
153 "Upload a new chunk.",
155 ("wid", false, &IntegerSchema
::new("Dynamic writer ID.")
160 ("digest", false, &CHUNK_DIGEST_SCHEMA
),
161 ("size", false, &IntegerSchema
::new("Chunk size.")
163 .maximum(1024*1024*16)
166 ("encoded-size", false, &IntegerSchema
::new("Encoded chunk size.")
167 .minimum((std
::mem
::size_of
::<DataBlobHeader
>() as isize) +1)
168 .maximum(1024*1024*16+(std
::mem
::size_of
::<EncryptedDataBlobHeader
>() as isize))
175 fn upload_dynamic_chunk(
180 rpcenv
: Box
<dyn RpcEnvironment
>,
181 ) -> ApiResponseFuture
{
184 let wid
= required_integer_param(¶m
, "wid")?
as usize;
185 let size
= required_integer_param(¶m
, "size")?
as u32;
186 let encoded_size
= required_integer_param(¶m
, "encoded-size")?
as u32;
188 let digest_str
= required_string_param(¶m
, "digest")?
;
189 let digest
= <[u8; 32]>::from_hex(digest_str
)?
;
191 let env
: &BackupEnvironment
= rpcenv
.as_ref();
193 let (digest
, size
, compressed_size
, is_duplicate
) =
194 UploadChunk
::new(req_body
, env
.datastore
.clone(), digest
, size
, encoded_size
)
197 env
.register_dynamic_chunk(wid
, digest
, size
, compressed_size
, is_duplicate
)?
;
198 let digest_str
= hex
::encode(&digest
);
199 env
.debug(format
!("upload_chunk done: {} bytes, {}", size
, digest_str
));
201 let result
= Ok(json
!(digest_str
));
202 Ok(env
.format_response(result
))
206 pub const API_METHOD_UPLOAD_SPEEDTEST
: ApiMethod
= ApiMethod
::new(
207 &ApiHandler
::AsyncHttp(&upload_speedtest
),
208 &ObjectSchema
::new("Test upload speed.", &[])
216 rpcenv
: Box
<dyn RpcEnvironment
>,
217 ) -> ApiResponseFuture
{
221 let result
= req_body
222 .map_err(Error
::from
)
223 .try_fold(0, |size
: usize, chunk
| {
224 let sum
= size
+ chunk
.len();
225 //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
226 future
::ok
::<usize, Error
>(sum
)
232 println
!("UPLOAD END {} bytes", size
);
235 println
!("Upload error: {}", err
);
238 let env
: &BackupEnvironment
= rpcenv
.as_ref();
239 Ok(env
.format_response(Ok(Value
::Null
)))
244 pub const API_METHOD_UPLOAD_BLOB
: ApiMethod
= ApiMethod
::new(
245 &ApiHandler
::AsyncHttp(&upload_blob
),
247 "Upload binary blob file.",
249 ("file-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA
),
250 ("encoded-size", false, &IntegerSchema
::new("Encoded blob size.")
251 .minimum(std
::mem
::size_of
::<DataBlobHeader
>() as isize)
252 .maximum(1024*1024*16+(std
::mem
::size_of
::<EncryptedDataBlobHeader
>() as isize))
264 rpcenv
: Box
<dyn RpcEnvironment
>,
265 ) -> ApiResponseFuture
{
268 let file_name
= required_string_param(¶m
, "file-name")?
.to_owned();
269 let encoded_size
= required_integer_param(¶m
, "encoded-size")?
as usize;
271 let env
: &BackupEnvironment
= rpcenv
.as_ref();
273 if !file_name
.ends_with(".blob") {
274 bail
!("wrong blob file extension: '{}'", file_name
);
278 .map_err(Error
::from
)
279 .try_fold(Vec
::new(), |mut acc
, chunk
| {
280 acc
.extend_from_slice(&*chunk
);
281 future
::ok
::<_
, Error
>(acc
)
285 if encoded_size
!= data
.len() {
286 bail
!("got blob with unexpected length ({} != {})", encoded_size
, data
.len());
289 env
.add_blob(&file_name
, data
)?
;
291 Ok(env
.format_response(Ok(Value
::Null
)))