3 use std
::task
::{Context, Poll}
;
8 use hyper
::http
::request
::Parts
;
9 use serde_json
::{json, Value}
;
11 use proxmox
::{sortable, identity}
;
13 use crate::api2
::types
::*;
14 use crate::api_schema
::*;
15 use crate::api_schema
::router
::*;
19 use super::environment
::*;
21 pub struct UploadChunk
{
23 store
: Arc
<DataStore
>,
27 raw_data
: Option
<Vec
<u8>>,
31 pub fn new(stream
: Body
, store
: Arc
<DataStore
>, digest
: [u8; 32], size
: u32, encoded_size
: u32) -> Self {
32 Self { stream, store, size, encoded_size, raw_data: Some(vec![]), digest }
36 impl Future
for UploadChunk
{
37 type Output
= Result
<([u8; 32], u32, u32, bool
), Error
>;
39 fn poll(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Self::Output
> {
40 let this
= self.get_mut();
42 let err
: Error
= loop {
43 match ready
!(Pin
::new(&mut this
.stream
).poll_next(cx
)) {
44 Some(Err(err
)) => return Poll
::Ready(Err(Error
::from(err
))),
46 if let Some(ref mut raw_data
) = this
.raw_data
{
47 if (raw_data
.len() + input
.len()) > (this
.encoded_size
as usize) {
48 break format_err
!("uploaded chunk is larger than announced.");
50 raw_data
.extend_from_slice(&input
);
52 break format_err
!("poll upload chunk stream failed - already finished.");
56 if let Some(raw_data
) = this
.raw_data
.take() {
57 if raw_data
.len() != (this
.encoded_size
as usize) {
58 break format_err
!("uploaded chunk has unexpected size.");
61 let (is_duplicate
, compressed_size
) = match proxmox
::tools
::try_block
! {
62 let mut chunk
= DataBlob
::from_raw(raw_data
)?
;
64 chunk
.verify_unencrypted(this
.size
as usize, &this
.digest
)?
;
66 // always comput CRC at server side
67 chunk
.set_crc(chunk
.compute_crc());
69 this
.store
.insert_chunk(&chunk
, &this
.digest
)
72 Err(err
) => break err
,
75 return Poll
::Ready(Ok((this
.digest
, this
.size
, compressed_size
as u32, is_duplicate
)))
77 break format_err
!("poll upload chunk stream failed - already finished.");
// API registration for the fixed-index chunk upload endpoint, dispatching to
// `upload_fixed_chunk`. Visible parameters: "wid" (fixed writer ID), "digest"
// (CHUNK_DIGEST_SCHEMA), "size" (chunk size, max 16 MiB) and "encoded-size"
// (bounded between the plain and encrypted DataBlob header sizes).
// NOTE(review): the extraction dropped interior lines (embedded numbering
// jumps 92 -> 97 -> 98 -> 100 -> 103): the ObjectSchema wrapper, the
// "wid"/"size" minimum bounds and the closing delimiters are missing here —
// restore them from the upstream file; they are not inferable from this view.
87 pub const API_METHOD_UPLOAD_FIXED_CHUNK
: ApiMethod
= ApiMethod
::new(
88 &ApiHandler
::Async(&upload_fixed_chunk
),
90 "Upload a new chunk.",
92 ("wid", false, &IntegerSchema
::new("Fixed writer ID.")
97 ("digest", false, &CHUNK_DIGEST_SCHEMA
),
98 ("size", false, &IntegerSchema
::new("Chunk size.")
100 .maximum(1024*1024*16)
103 ("encoded-size", false, &IntegerSchema
::new("Encoded chunk size.")
104 .minimum((std
::mem
::size_of
::<DataBlobHeader
>() as isize)+1)
105 .maximum(1024*1024*16+(std
::mem
::size_of
::<EncryptedDataBlobHeader
>() as isize))
112 fn upload_fixed_chunk(
117 rpcenv
: Box
<dyn RpcEnvironment
>,
118 ) -> Result
<ApiFuture
, Error
> {
120 let wid
= tools
::required_integer_param(¶m
, "wid")?
as usize;
121 let size
= tools
::required_integer_param(¶m
, "size")?
as u32;
122 let encoded_size
= tools
::required_integer_param(¶m
, "encoded-size")?
as u32;
124 let digest_str
= tools
::required_string_param(¶m
, "digest")?
;
125 let digest
= proxmox
::tools
::hex_to_digest(digest_str
)?
;
127 let env
: &BackupEnvironment
= rpcenv
.as_ref();
129 let upload
= UploadChunk
::new(req_body
, env
.datastore
.clone(), digest
, size
, encoded_size
);
132 .then(move |result
| {
133 let env
: &BackupEnvironment
= rpcenv
.as_ref();
135 let result
= result
.and_then(|(digest
, size
, compressed_size
, is_duplicate
)| {
136 env
.register_fixed_chunk(wid
, digest
, size
, compressed_size
, is_duplicate
)?
;
137 let digest_str
= proxmox
::tools
::digest_to_hex(&digest
);
138 env
.debug(format
!("upload_chunk done: {} bytes, {}", size
, digest_str
));
139 Ok(json
!(digest_str
))
142 future
::ok(env
.format_response(result
))
// API registration for the dynamic-index chunk upload endpoint, dispatching
// to `upload_dynamic_chunk`. Mirrors API_METHOD_UPLOAD_FIXED_CHUNK except the
// writer ID refers to a dynamic-index writer session.
// NOTE(review): as with the fixed variant, the extraction dropped the
// ObjectSchema wrapper, the "wid"/"size" minimum bounds and the closing
// delimiters (numbering jumps 154 -> 159 -> 160 -> 162 -> 165) — restore
// them from the upstream file.
149 pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK
: ApiMethod
= ApiMethod
::new(
150 &ApiHandler
::Async(&upload_dynamic_chunk
),
152 "Upload a new chunk.",
154 ("wid", false, &IntegerSchema
::new("Dynamic writer ID.")
159 ("digest", false, &CHUNK_DIGEST_SCHEMA
),
160 ("size", false, &IntegerSchema
::new("Chunk size.")
162 .maximum(1024*1024*16)
165 ("encoded-size", false, &IntegerSchema
::new("Encoded chunk size.")
166 .minimum((std
::mem
::size_of
::<DataBlobHeader
>() as isize) +1)
167 .maximum(1024*1024*16+(std
::mem
::size_of
::<EncryptedDataBlobHeader
>() as isize))
174 fn upload_dynamic_chunk(
179 rpcenv
: Box
<dyn RpcEnvironment
>,
180 ) -> Result
<ApiFuture
, Error
> {
182 let wid
= tools
::required_integer_param(¶m
, "wid")?
as usize;
183 let size
= tools
::required_integer_param(¶m
, "size")?
as u32;
184 let encoded_size
= tools
::required_integer_param(¶m
, "encoded-size")?
as u32;
186 let digest_str
= tools
::required_string_param(¶m
, "digest")?
;
187 let digest
= proxmox
::tools
::hex_to_digest(digest_str
)?
;
189 let env
: &BackupEnvironment
= rpcenv
.as_ref();
191 let upload
= UploadChunk
::new(req_body
, env
.datastore
.clone(), digest
, size
, encoded_size
);
194 .then(move |result
| {
195 let env
: &BackupEnvironment
= rpcenv
.as_ref();
197 let result
= result
.and_then(|(digest
, size
, compressed_size
, is_duplicate
)| {
198 env
.register_dynamic_chunk(wid
, digest
, size
, compressed_size
, is_duplicate
)?
;
199 let digest_str
= proxmox
::tools
::digest_to_hex(&digest
);
200 env
.debug(format
!("upload_chunk done: {} bytes, {}", size
, digest_str
));
201 Ok(json
!(digest_str
))
204 future
::ok(env
.format_response(result
))
210 pub const API_METHOD_UPLOAD_SPEEDTEST
: ApiMethod
= ApiMethod
::new(
211 &ApiHandler
::Async(&upload_speedtest
),
212 &ObjectSchema
::new("Test upload speed.", &[])
220 rpcenv
: Box
<dyn RpcEnvironment
>,
221 ) -> Result
<ApiFuture
, Error
> {
224 .map_err(Error
::from
)
225 .try_fold(0, |size
: usize, chunk
| {
226 let sum
= size
+ chunk
.len();
227 //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
228 future
::ok
::<usize, Error
>(sum
)
230 .then(move |result
| {
233 println
!("UPLOAD END {} bytes", size
);
236 println
!("Upload error: {}", err
);
239 let env
: &BackupEnvironment
= rpcenv
.as_ref();
240 future
::ok(env
.format_response(Ok(Value
::Null
)))
// API registration for the blob upload endpoint, dispatching to `upload_blob`.
// Visible parameters: "file-name" (BACKUP_ARCHIVE_NAME_SCHEMA) and
// "encoded-size" (bounded between the plain and encrypted DataBlob header
// sizes, max 16 MiB plus header).
// NOTE(review): the extraction dropped the ObjectSchema wrapper line and the
// closing delimiters (numbering jumps 248 -> 250 -> 252) — restore them from
// the upstream file.
247 pub const API_METHOD_UPLOAD_BLOB
: ApiMethod
= ApiMethod
::new(
248 &ApiHandler
::Async(&upload_blob
),
250 "Upload binary blob file.",
252 ("file-name", false, &crate::api2
::types
::BACKUP_ARCHIVE_NAME_SCHEMA
),
253 ("encoded-size", false, &IntegerSchema
::new("Encoded blob size.")
254 .minimum((std
::mem
::size_of
::<DataBlobHeader
>() as isize) +1)
255 .maximum(1024*1024*16+(std
::mem
::size_of
::<EncryptedDataBlobHeader
>() as isize))
// Async API handler: collect the request body into a Vec, check it against
// the announced "encoded-size", and add it as a ".blob" file to the backup
// via `env.add_blob`; answers with a `null` result on success.
// NOTE(review): this definition is cut at BOTH edges of the visible extract —
// the `fn upload_blob(` signature head lies in a dropped gap before the
// `rpcenv` parameter, and the tail (after the `format_response` call, original
// line 299) runs past the end of the view. Code left byte-identical; do not
// rebuild this function without consulting the upstream file.
267 rpcenv
: Box
<dyn RpcEnvironment
>,
268 ) -> Result
<ApiFuture
, Error
> {
// `file-name` and `encoded-size` are required request parameters.
// (The `¶m` below is mojibake for `&param` — extraction damage.)
270 let file_name
= tools
::required_string_param(¶m
, "file-name")?
.to_owned();
271 let encoded_size
= tools
::required_integer_param(¶m
, "encoded-size")?
as usize;
274 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// Only ".blob" files may be uploaded through this endpoint.
276 if !file_name
.ends_with(".blob") {
277 bail
!("wrong blob file extension: '{}'", file_name
);
// Two clones of the environment: one moved into each closure below.
280 let env2
= env
.clone();
281 let env3
= env
.clone();
// Collect the whole request body into one Vec<u8>.
284 .map_err(Error
::from
)
285 .try_fold(Vec
::new(), |mut acc
, chunk
| {
286 acc
.extend_from_slice(&*chunk
);
287 future
::ok
::<_
, Error
>(acc
)
// Verify the collected length matches the announced encoded size,
// then hand the blob data to the backup environment.
289 .and_then(move |data
| async
move {
290 if encoded_size
!= data
.len() {
291 bail
!("got blob with unexpected length ({} != {})", encoded_size
, data
.len());
294 env2
.add_blob(&file_name
, data
)?
;
// Format the final (null) response for the client.
299 future
::ok(env3
.format_response(Ok(Value
::Null
)))