3 use std
::task
::{Context, Poll}
;
5 use anyhow
::{bail, format_err, Error}
;
8 use hyper
::http
::request
::Parts
;
9 use serde_json
::{json, Value}
;
11 use proxmox
::{sortable, identity}
;
12 use proxmox
::api
::{ApiResponseFuture, ApiHandler, ApiMethod, RpcEnvironment}
;
13 use proxmox
::api
::schema
::*;
15 use crate::api2
::types
::*;
19 use super::environment
::*;
21 pub struct UploadChunk
{
23 store
: Arc
<DataStore
>,
27 raw_data
: Option
<Vec
<u8>>,
31 pub fn new(stream
: Body
, store
: Arc
<DataStore
>, digest
: [u8; 32], size
: u32, encoded_size
: u32) -> Self {
32 Self { stream, store, size, encoded_size, raw_data: Some(vec![]), digest }
36 impl Future
for UploadChunk
{
37 type Output
= Result
<([u8; 32], u32, u32, bool
), Error
>;
39 fn poll(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Self::Output
> {
40 let this
= self.get_mut();
42 let err
: Error
= loop {
43 match ready
!(Pin
::new(&mut this
.stream
).poll_next(cx
)) {
44 Some(Err(err
)) => return Poll
::Ready(Err(Error
::from(err
))),
46 if let Some(ref mut raw_data
) = this
.raw_data
{
47 if (raw_data
.len() + input
.len()) > (this
.encoded_size
as usize) {
48 break format_err
!("uploaded chunk is larger than announced.");
50 raw_data
.extend_from_slice(&input
);
52 break format_err
!("poll upload chunk stream failed - already finished.");
56 if let Some(raw_data
) = this
.raw_data
.take() {
57 if raw_data
.len() != (this
.encoded_size
as usize) {
58 break format_err
!("uploaded chunk has unexpected size.");
61 let (is_duplicate
, compressed_size
) = match proxmox
::try_block
! {
62 let mut chunk
= DataBlob
::from_raw(raw_data
)?
;
64 chunk
.verify_unencrypted(this
.size
as usize, &this
.digest
)?
;
66 // always comput CRC at server side
67 chunk
.set_crc(chunk
.compute_crc());
69 this
.store
.insert_chunk(&chunk
, &this
.digest
)
72 Err(err
) => break err
,
75 return Poll
::Ready(Ok((this
.digest
, this
.size
, compressed_size
as u32, is_duplicate
)))
77 break format_err
!("poll upload chunk stream failed - already finished.");
// API method definition for uploading one chunk to a fixed-index writer,
// handled asynchronously by `upload_fixed_chunk` below.
// NOTE(review): this extract is garbled - the enclosing `ObjectSchema` wrapper,
// the parameter-list scaffolding (upstream uses `#[sortable]`/`sorted!`, cf. the
// `proxmox::{sortable, identity}` import), the numeric bounds of "wid"/"size"
// and the trailing `.schema()` calls are missing here; only comments are added.
87 pub const API_METHOD_UPLOAD_FIXED_CHUNK
: ApiMethod
= ApiMethod
::new(
// Async HTTP handler backing this method.
88 &ApiHandler
::AsyncHttp(&upload_fixed_chunk
),
90 "Upload a new chunk.",
// "wid": ID of the fixed writer this chunk is registered with (required).
92 ("wid", false, &IntegerSchema
::new("Fixed writer ID.")
// "digest": chunk digest, schema shared via CHUNK_DIGEST_SCHEMA (required).
97 ("digest", false, &CHUNK_DIGEST_SCHEMA
),
// "size": decoded chunk size in bytes, capped at 16 MiB (required).
98 ("size", false, &IntegerSchema
::new("Chunk size.")
100 .maximum(1024*1024*16)
// "encoded-size": on-the-wire size (required); must exceed a plain
// DataBlobHeader by at least one byte, and fit 16 MiB of payload plus
// an EncryptedDataBlobHeader.
103 ("encoded-size", false, &IntegerSchema
::new("Encoded chunk size.")
104 .minimum((std
::mem
::size_of
::<DataBlobHeader
>() as isize)+1)
105 .maximum(1024*1024*16+(std
::mem
::size_of
::<EncryptedDataBlobHeader
>() as isize))
112 fn upload_fixed_chunk(
117 rpcenv
: Box
<dyn RpcEnvironment
>,
118 ) -> ApiResponseFuture
{
121 let wid
= tools
::required_integer_param(¶m
, "wid")?
as usize;
122 let size
= tools
::required_integer_param(¶m
, "size")?
as u32;
123 let encoded_size
= tools
::required_integer_param(¶m
, "encoded-size")?
as u32;
125 let digest_str
= tools
::required_string_param(¶m
, "digest")?
;
126 let digest
= proxmox
::tools
::hex_to_digest(digest_str
)?
;
128 let env
: &BackupEnvironment
= rpcenv
.as_ref();
130 let (digest
, size
, compressed_size
, is_duplicate
) =
131 UploadChunk
::new(req_body
, env
.datastore
.clone(), digest
, size
, encoded_size
).await?
;
133 env
.register_fixed_chunk(wid
, digest
, size
, compressed_size
, is_duplicate
)?
;
134 let digest_str
= proxmox
::tools
::digest_to_hex(&digest
);
135 env
.debug(format
!("upload_chunk done: {} bytes, {}", size
, digest_str
));
137 let result
= Ok(json
!(digest_str
));
139 Ok(env
.format_response(result
))
// API method definition for uploading one chunk to a dynamic-index writer,
// handled asynchronously by `upload_dynamic_chunk` below.
// NOTE(review): this extract is garbled - the enclosing `ObjectSchema` wrapper,
// the parameter-list scaffolding (upstream uses `#[sortable]`/`sorted!`, cf. the
// `proxmox::{sortable, identity}` import), the numeric bounds of "wid"/"size"
// and the trailing `.schema()` calls are missing here; only comments are added.
145 pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK
: ApiMethod
= ApiMethod
::new(
// Async HTTP handler backing this method.
146 &ApiHandler
::AsyncHttp(&upload_dynamic_chunk
),
148 "Upload a new chunk.",
// "wid": ID of the dynamic writer this chunk is registered with (required).
150 ("wid", false, &IntegerSchema
::new("Dynamic writer ID.")
// "digest": chunk digest, schema shared via CHUNK_DIGEST_SCHEMA (required).
155 ("digest", false, &CHUNK_DIGEST_SCHEMA
),
// "size": decoded chunk size in bytes, capped at 16 MiB (required).
156 ("size", false, &IntegerSchema
::new("Chunk size.")
158 .maximum(1024*1024*16)
// "encoded-size": on-the-wire size (required); must exceed a plain
// DataBlobHeader by at least one byte, and fit 16 MiB of payload plus
// an EncryptedDataBlobHeader.
161 ("encoded-size", false, &IntegerSchema
::new("Encoded chunk size.")
162 .minimum((std
::mem
::size_of
::<DataBlobHeader
>() as isize) +1)
163 .maximum(1024*1024*16+(std
::mem
::size_of
::<EncryptedDataBlobHeader
>() as isize))
170 fn upload_dynamic_chunk(
175 rpcenv
: Box
<dyn RpcEnvironment
>,
176 ) -> ApiResponseFuture
{
179 let wid
= tools
::required_integer_param(¶m
, "wid")?
as usize;
180 let size
= tools
::required_integer_param(¶m
, "size")?
as u32;
181 let encoded_size
= tools
::required_integer_param(¶m
, "encoded-size")?
as u32;
183 let digest_str
= tools
::required_string_param(¶m
, "digest")?
;
184 let digest
= proxmox
::tools
::hex_to_digest(digest_str
)?
;
186 let env
: &BackupEnvironment
= rpcenv
.as_ref();
188 let (digest
, size
, compressed_size
, is_duplicate
) =
189 UploadChunk
::new(req_body
, env
.datastore
.clone(), digest
, size
, encoded_size
)
192 env
.register_dynamic_chunk(wid
, digest
, size
, compressed_size
, is_duplicate
)?
;
193 let digest_str
= proxmox
::tools
::digest_to_hex(&digest
);
194 env
.debug(format
!("upload_chunk done: {} bytes, {}", size
, digest_str
));
196 let result
= Ok(json
!(digest_str
));
197 Ok(env
.format_response(result
))
201 pub const API_METHOD_UPLOAD_SPEEDTEST
: ApiMethod
= ApiMethod
::new(
202 &ApiHandler
::AsyncHttp(&upload_speedtest
),
203 &ObjectSchema
::new("Test upload speed.", &[])
211 rpcenv
: Box
<dyn RpcEnvironment
>,
212 ) -> ApiResponseFuture
{
216 let result
= req_body
217 .map_err(Error
::from
)
218 .try_fold(0, |size
: usize, chunk
| {
219 let sum
= size
+ chunk
.len();
220 //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
221 future
::ok
::<usize, Error
>(sum
)
227 println
!("UPLOAD END {} bytes", size
);
230 println
!("Upload error: {}", err
);
233 let env
: &BackupEnvironment
= rpcenv
.as_ref();
234 Ok(env
.format_response(Ok(Value
::Null
)))
239 pub const API_METHOD_UPLOAD_BLOB
: ApiMethod
= ApiMethod
::new(
240 &ApiHandler
::AsyncHttp(&upload_blob
),
242 "Upload binary blob file.",
244 ("file-name", false, &crate::api2
::types
::BACKUP_ARCHIVE_NAME_SCHEMA
),
245 ("encoded-size", false, &IntegerSchema
::new("Encoded blob size.")
246 .minimum((std
::mem
::size_of
::<DataBlobHeader
>() as isize) +1)
247 .maximum(1024*1024*16+(std
::mem
::size_of
::<EncryptedDataBlobHeader
>() as isize))
259 rpcenv
: Box
<dyn RpcEnvironment
>,
260 ) -> ApiResponseFuture
{
263 let file_name
= tools
::required_string_param(¶m
, "file-name")?
.to_owned();
264 let encoded_size
= tools
::required_integer_param(¶m
, "encoded-size")?
as usize;
266 let env
: &BackupEnvironment
= rpcenv
.as_ref();
268 if !file_name
.ends_with(".blob") {
269 bail
!("wrong blob file extension: '{}'", file_name
);
273 .map_err(Error
::from
)
274 .try_fold(Vec
::new(), |mut acc
, chunk
| {
275 acc
.extend_from_slice(&*chunk
);
276 future
::ok
::<_
, Error
>(acc
)
280 if encoded_size
!= data
.len() {
281 bail
!("got blob with unexpected length ({} != {})", encoded_size
, data
.len());
284 env
.add_blob(&file_name
, data
)?
;
286 Ok(env
.format_response(Ok(Value
::Null
)))