5 use hyper
::http
::request
::Parts
;
7 use serde_json
::{json, Value}
;
11 use crate::api_schema
::*;
12 use crate::api_schema
::router
::*;
13 use crate::api2
::types
::*;
15 use super::environment
::*;
17 pub struct UploadChunk
{
19 store
: Arc
<DataStore
>,
23 raw_data
: Option
<Vec
<u8>>,
28 pub fn new(stream
: Body
, store
: Arc
<DataStore
>, digest
: [u8; 32], size
: u32, encoded_size
: u32) -> Self {
29 Self { stream, store, size, encoded_size, raw_data: Some(vec![]), digest }
33 impl Future
for UploadChunk
{
34 type Item
= ([u8; 32], u32, u32, bool
);
35 type Error
= failure
::Error
;
37 fn poll(&mut self) -> Poll
<([u8; 32], u32, u32, bool
), failure
::Error
> {
39 match try_ready
!(self.stream
.poll()) {
41 if let Some(ref mut raw_data
) = self.raw_data
{
42 if (raw_data
.len() + input
.len()) > (self.encoded_size
as usize) {
43 bail
!("uploaded chunk is larger than announced.");
45 raw_data
.extend_from_slice(&input
);
47 bail
!("poll upload chunk stream failed - already finished.");
51 if let Some(raw_data
) = self.raw_data
.take() {
52 if raw_data
.len() != (self.encoded_size
as usize) {
53 bail
!("uploaded chunk has unexpected size.");
56 let chunk
= DataChunk
::from_raw(raw_data
, self.digest
)?
;
58 let (is_duplicate
, compressed_size
) = self.store
.insert_chunk(&chunk
)?
;
60 return Ok(Async
::Ready((self.digest
, self.size
, compressed_size
as u32, is_duplicate
)))
62 bail
!("poll upload chunk stream failed - already finished.");
70 pub fn api_method_upload_fixed_chunk() -> ApiAsyncMethod
{
73 ObjectSchema
::new("Upload a new chunk.")
74 .required("wid", IntegerSchema
::new("Fixed writer ID.")
78 .required("digest", CHUNK_DIGEST_SCHEMA
.clone())
79 .required("size", IntegerSchema
::new("Chunk size.")
81 .maximum(1024*1024*16)
83 .required("encoded-size", IntegerSchema
::new("Encoded chunk size.")
85 // fixme: .maximum(1024*1024*16+40)
90 fn upload_fixed_chunk(
94 _info
: &ApiAsyncMethod
,
95 rpcenv
: Box
<dyn RpcEnvironment
>,
96 ) -> Result
<BoxFut
, Error
> {
98 let wid
= tools
::required_integer_param(¶m
, "wid")?
as usize;
99 let size
= tools
::required_integer_param(¶m
, "size")?
as u32;
100 let encoded_size
= tools
::required_integer_param(¶m
, "encoded-size")?
as u32;
102 let digest_str
= tools
::required_string_param(¶m
, "digest")?
;
103 let digest
= crate::tools
::hex_to_digest(digest_str
)?
;
105 let env
: &BackupEnvironment
= rpcenv
.as_ref();
107 let upload
= UploadChunk
::new(req_body
, env
.datastore
.clone(), digest
, size
, encoded_size
);
110 .then(move |result
| {
111 let env
: &BackupEnvironment
= rpcenv
.as_ref();
113 let result
= result
.and_then(|(digest
, size
, compressed_size
, is_duplicate
)| {
114 env
.register_fixed_chunk(wid
, digest
, size
, compressed_size
, is_duplicate
)?
;
115 let digest_str
= tools
::digest_to_hex(&digest
);
116 env
.debug(format
!("upload_chunk done: {} bytes, {}", size
, digest_str
));
117 Ok(json
!(digest_str
))
120 Ok(env
.format_response(result
))
126 pub fn api_method_upload_dynamic_chunk() -> ApiAsyncMethod
{
128 upload_dynamic_chunk
,
129 ObjectSchema
::new("Upload a new chunk.")
130 .required("wid", IntegerSchema
::new("Dynamic writer ID.")
134 .required("digest", CHUNK_DIGEST_SCHEMA
.clone())
135 .required("size", IntegerSchema
::new("Chunk size.")
137 .maximum(1024*1024*16)
139 .required("encoded-size", IntegerSchema
::new("Encoded chunk size.")
141 // fixme: .maximum(1024*1024*16+40)
146 fn upload_dynamic_chunk(
150 _info
: &ApiAsyncMethod
,
151 rpcenv
: Box
<dyn RpcEnvironment
>,
152 ) -> Result
<BoxFut
, Error
> {
154 let wid
= tools
::required_integer_param(¶m
, "wid")?
as usize;
155 let size
= tools
::required_integer_param(¶m
, "size")?
as u32;
156 let encoded_size
= tools
::required_integer_param(¶m
, "encoded-size")?
as u32;
158 let digest_str
= tools
::required_string_param(¶m
, "digest")?
;
159 let digest
= crate::tools
::hex_to_digest(digest_str
)?
;
161 let env
: &BackupEnvironment
= rpcenv
.as_ref();
163 let upload
= UploadChunk
::new(req_body
, env
.datastore
.clone(), digest
, size
, encoded_size
);
166 .then(move |result
| {
167 let env
: &BackupEnvironment
= rpcenv
.as_ref();
169 let result
= result
.and_then(|(digest
, size
, compressed_size
, is_duplicate
)| {
170 env
.register_dynamic_chunk(wid
, digest
, size
, compressed_size
, is_duplicate
)?
;
171 let digest_str
= tools
::digest_to_hex(&digest
);
172 env
.debug(format
!("upload_chunk done: {} bytes, {}", size
, digest_str
));
173 Ok(json
!(digest_str
))
176 Ok(env
.format_response(result
))
182 pub fn api_method_upload_speedtest() -> ApiAsyncMethod
{
185 ObjectSchema
::new("Test uploadf speed.")
193 _info
: &ApiAsyncMethod
,
194 rpcenv
: Box
<dyn RpcEnvironment
>,
195 ) -> Result
<BoxFut
, Error
> {
198 .map_err(Error
::from
)
199 .fold(0, |size
: usize, chunk
| -> Result
<usize, Error
> {
200 let sum
= size
+ chunk
.len();
201 //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
204 .then(move |result
| {
207 println
!("UPLOAD END {} bytes", size
);
210 println
!("Upload error: {}", err
);
213 let env
: &BackupEnvironment
= rpcenv
.as_ref();
214 Ok(env
.format_response(Ok(Value
::Null
)))
220 pub fn api_method_upload_config() -> ApiAsyncMethod
{
223 ObjectSchema
::new("Upload configuration file.")
224 .required("file-name", crate::api2
::types
::BACKUP_ARCHIVE_NAME_SCHEMA
.clone())
225 .required("size", IntegerSchema
::new("File size.")
227 .maximum(1024*1024*16)
236 _info
: &ApiAsyncMethod
,
237 rpcenv
: Box
<dyn RpcEnvironment
>,
238 ) -> Result
<BoxFut
, Error
> {
240 let mut file_name
= tools
::required_string_param(¶m
, "file-name")?
.to_owned();
241 let size
= tools
::required_integer_param(¶m
, "size")?
as usize;
243 if !file_name
.ends_with(".conf") {
244 bail
!("wrong config file extension: '{}'", file_name
);
246 file_name
.push_str(".zstd");
249 let env
: &BackupEnvironment
= rpcenv
.as_ref();
251 let mut path
= env
.datastore
.base_path();
252 path
.push(env
.backup_dir
.relative_path());
253 path
.push(&file_name
);
255 let env2
= env
.clone();
256 let env3
= env
.clone();
259 .map_err(Error
::from
)
261 .and_then(move |data
| {
262 if size
!= data
.len() {
263 bail
!("got configuration file with unexpected length ({} != {})", size
, data
.len());
266 let data
= zstd
::block
::compress(&data
, 0)?
;
268 tools
::file_set_contents(&path
, &data
, None
)?
;
270 env2
.debug(format
!("upload config {:?} ({} bytes, comp: {})", path
, size
, data
.len()));
275 Ok(env3
.format_response(Ok(Value
::Null
)))