5 use hyper
::http
::request
::Parts
;
7 use serde_json
::{json, Value}
;
11 use crate::api_schema
::*;
12 use crate::api_schema
::router
::*;
14 use super::environment
::*;
/// State for receiving one uploaded chunk and handing it to a datastore.
///
/// Only the `store` field is visible in this excerpt; the constructor also
/// initializes `stream`, `size` and `chunk`, so those fields are presumably
/// declared on lines not shown here.
16 pub struct UploadChunk
{
// Shared handle to the datastore; `poll` calls `insert_chunk` on it.
18 store
: Arc
<DataStore
>,
/// Creates an `UploadChunk` that reads from `stream`, stores into `store`,
/// and expects the announced chunk length `size` (compared against the
/// received byte count in `poll`).
25 pub fn new(stream
: Body
, store
: Arc
<DataStore
>, size
: u32) -> Self {
// The receive buffer starts out empty.
26 Self { stream, store, size, chunk: vec![] }
// Future implementation: drives the body stream and, on completion, inserts
// the buffered data into the datastore.
30 impl Future
for UploadChunk
{
// Resolved value, in the order returned below:
// (digest, announced size, compressed size as u32, is_duplicate).
31 type Item
= ([u8; 32], u32, u32, bool
);
32 type Error
= failure
::Error
;
// Polls the underlying stream; `try_ready!` returns early while the stream
// is not ready or has failed.
34 fn poll(&mut self) -> Poll
<([u8; 32], u32, u32, bool
), failure
::Error
> {
36 match try_ready
!(self.stream
.poll()) {
// Reject the upload as soon as the buffered bytes plus the new piece would
// exceed the announced size. (The match arm binding `chunk` is not visible
// in this excerpt.)
38 if (self.chunk
.len() + chunk
.len()) > (self.size
as usize) {
39 bail
!("uploaded chunk is larger than announced.");
// Append the received piece to the buffer.
41 self.chunk
.extend_from_slice(&chunk
);
// NOTE(review): presumably the end-of-stream arm (arm header not visible) —
// the total received length must match the announced size exactly.
44 if self.chunk
.len() != (self.size
as usize) {
45 bail
!("uploaded chunk has unexpected size.");
// Hand the complete buffer to the datastore; errors propagate via `?`.
48 let (is_duplicate
, digest
, compressed_size
) = self.store
.insert_chunk(&self.chunk
)?
;
// `compressed_size` is narrowed to u32 with an `as` cast.
50 return Ok(Async
::Ready((digest
, self.size
, compressed_size
as u32, is_duplicate
)))
/// Builds the `ApiAsyncMethod` description for the fixed-chunk upload call.
///
/// The visible schema requires integer parameters "wid" and "size".
57 pub fn api_method_upload_fixed_chunk() -> ApiAsyncMethod
{
60 ObjectSchema
::new("Upload a new chunk.")
61 .required("wid", IntegerSchema
::new("Fixed writer ID.")
65 .required("size", IntegerSchema
::new("Chunk size.")
// Upper bound of 1024*1024*16 (16 MiB); appears to apply to the "size"
// schema above (intermediate builder lines are not visible).
67 .maximum(1024*1024*16)
// Handler for the fixed-chunk upload call: reads "wid" and "size", receives
// the chunk through `UploadChunk`, registers it with the backup environment
// and returns a formatted response.
// Only `_info` and `rpcenv` are visible in the parameter list; `param` and
// `req_body` used below are presumably declared on lines not shown here.
72 fn upload_fixed_chunk(
76 _info
: &ApiAsyncMethod
,
77 rpcenv
: Box
<RpcEnvironment
>,
78 ) -> Result
<BoxFut
, Error
> {
// Required integer parameters; a missing/invalid value propagates via `?`.
80 let wid
= tools
::required_integer_param(¶m
, "wid")?
as usize;
// NOTE(review): `as u32` narrows silently; relies on the schema bound for
// "size" — confirm.
81 let size
= tools
::required_integer_param(¶m
, "size")?
as u32;
// View the generic RPC environment as the backup environment.
83 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// Future that receives the body and inserts it into the datastore.
85 let upload
= UploadChunk
::new(req_body
, env
.datastore
.clone(), size
);
// NOTE(review): presumably inside the completion closure (its header is not
// visible here); `env` is re-derived from `rpcenv` there.
89 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// On success, unpack the tuple produced by `UploadChunk`.
91 let result
= result
.and_then(|(digest
, size
, compressed_size
, is_duplicate
)| {
// Register the chunk with fixed writer `wid`; failure aborts via `?`.
92 env
.register_fixed_chunk(wid
, digest
, size
, compressed_size
, is_duplicate
)?
;
93 let digest_str
= tools
::digest_to_hex(&digest
);
94 env
.debug(format
!("upload_chunk done: {} bytes, {}", size
, digest_str
));
// Convert the result (success or error) into the response.
98 Ok(env
.format_response(result
))
/// Builds the `ApiAsyncMethod` description for the dynamic-chunk upload call,
/// referencing `upload_dynamic_chunk` as its handler.
///
/// The visible schema requires integer parameters "wid" and "size".
104 pub fn api_method_upload_dynamic_chunk() -> ApiAsyncMethod
{
106 upload_dynamic_chunk
,
107 ObjectSchema
::new("Upload a new chunk.")
108 .required("wid", IntegerSchema
::new("Dynamic writer ID.")
112 .required("size", IntegerSchema
::new("Chunk size.")
// Upper bound of 1024*1024*16 (16 MiB); appears to apply to the "size"
// schema above (intermediate builder lines are not visible).
114 .maximum(1024*1024*16)
// Handler for the dynamic-chunk upload call: reads "wid" and "size", receives
// the chunk through `UploadChunk`, registers it with the backup environment
// and responds with the chunk digest as a hex string.
// Only `_info` and `rpcenv` are visible in the parameter list; `param` and
// `req_body` used below are presumably declared on lines not shown here.
119 fn upload_dynamic_chunk(
123 _info
: &ApiAsyncMethod
,
124 rpcenv
: Box
<RpcEnvironment
>,
125 ) -> Result
<BoxFut
, Error
> {
// Required integer parameters; a missing/invalid value propagates via `?`.
127 let wid
= tools
::required_integer_param(¶m
, "wid")?
as usize;
// NOTE(review): `as u32` narrows silently; relies on the schema bound for
// "size" — confirm.
128 let size
= tools
::required_integer_param(¶m
, "size")?
as u32;
// View the generic RPC environment as the backup environment.
130 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// Future that receives the body and inserts it into the datastore.
132 let upload
= UploadChunk
::new(req_body
, env
.datastore
.clone(), size
);
// Runs for both success and failure of the upload; `rpcenv` is moved into
// the closure.
135 .then(move |result
| {
136 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// On success, unpack the tuple produced by `UploadChunk`.
138 let result
= result
.and_then(|(digest
, size
, compressed_size
, is_duplicate
)| {
// Register the chunk with dynamic writer `wid`; failure aborts via `?`.
139 env
.register_dynamic_chunk(wid
, digest
, size
, compressed_size
, is_duplicate
)?
;
140 let digest_str
= tools
::digest_to_hex(&digest
);
141 env
.debug(format
!("upload_chunk done: {} bytes, {}", size
, digest_str
));
// The hex digest becomes the JSON payload of the response.
142 Ok(json
!(digest_str
))
// Convert the result (success or error) into the response.
145 Ok(env
.format_response(result
))
/// Builds the `ApiAsyncMethod` description for the upload speed test call.
151 pub fn api_method_upload_speedtest() -> ApiAsyncMethod
{
// Schema description text (typo "uploadf" corrected to "upload").
154 ObjectSchema
::new("Test upload speed.")
// NOTE(review): parameter list and body of the speed-test handler; its `fn`
// header line is not visible in this excerpt.
// The visible code sums the sizes of the received body pieces, prints the
// outcome to stdout and responds with a JSON null.
162 _info
: &ApiAsyncMethod
,
163 rpcenv
: Box
<RpcEnvironment
>,
164 ) -> Result
<BoxFut
, Error
> {
// Convert stream errors into the handler's `Error` type.
167 .map_err(Error
::from
)
// Accumulate the total number of bytes received, starting from 0.
168 .fold(0, |size
: usize, chunk
| -> Result
<usize, Error
> {
169 let sum
= size
+ chunk
.len();
170 //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
// Completion closure; `rpcenv` is moved into it.
173 .then(move |result
| {
// Reports the total byte count (presumably the success branch).
176 println
!("UPLOAD END {} bytes", size
);
// Reports a failed upload (presumably the error branch).
179 println
!("Upload error: {}", err
);
182 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// The visible response is `Value::Null`; the upload error does not appear to
// be forwarded to the client — confirm against the full source.
183 Ok(env
.format_response(Ok(Value
::Null
)))
/// Builds the `ApiAsyncMethod` description for the configuration file upload
/// call.
///
/// The visible schema requires "file-name" (validated by
/// `BACKUP_ARCHIVE_NAME_SCHEMA`) and an integer "size".
189 pub fn api_method_upload_config() -> ApiAsyncMethod
{
192 ObjectSchema
::new("Upload configuration file.")
193 .required("file-name", crate::api2
::types
::BACKUP_ARCHIVE_NAME_SCHEMA
.clone())
194 .required("size", IntegerSchema
::new("File size.")
// Upper bound of 1024*1024*16 (16 MiB); appears to apply to the "size"
// schema above (intermediate builder lines are not visible).
196 .maximum(1024*1024*16)
// NOTE(review): parameter list and body of the configuration upload handler;
// its `fn` header line is not visible in this excerpt.
// The visible code validates the file name and size, compresses the received
// data with zstd, writes it below the backup directory and responds with a
// JSON null.
205 _info
: &ApiAsyncMethod
,
206 rpcenv
: Box
<RpcEnvironment
>,
207 ) -> Result
<BoxFut
, Error
> {
// Owned copy, because the name is extended with a suffix below.
209 let mut file_name
= tools
::required_string_param(¶m
, "file-name")?
.to_owned();
210 let size
= tools
::required_integer_param(¶m
, "size")?
as usize;
// Only names ending in ".conf" are accepted.
212 if !file_name
.ends_with(".conf") {
213 bail
!("wrong config file extension: '{}'", file_name
);
// The stored file gets a ".zstd" suffix, matching the compression step below.
215 file_name
.push_str(".zstd");
218 let env
: &BackupEnvironment
= rpcenv
.as_ref();
// Target path: datastore base path + backup dir relative path + file name.
220 let mut path
= env
.datastore
.base_path();
221 path
.push(env
.backup_dir
.relative_path());
222 path
.push(&file_name
);
// Separate clones of the environment, used as `env2` and `env3` further down.
224 let env2
= env
.clone();
225 let env3
= env
.clone();
// Convert stream errors into the handler's `Error` type.
228 .map_err(Error
::from
)
// Runs with the received data; `path`, `size` and `env2` are moved in.
230 .and_then(move |data
| {
// The received length must equal the announced "size" parameter.
231 if size
!= data
.len() {
232 bail
!("got configuration file with unexpected length ({} != {})", size
, data
.len());
// Compress the whole buffer in one call (level argument 0); `data` is
// shadowed by the compressed bytes.
235 let data
= zstd
::block
::compress(&data
, 0)?
;
// Write the compressed bytes to the target path.
237 tools
::file_set_contents(&path
, &data
, None
)?
;
// Logs the uncompressed size and the compressed length.
239 env2
.debug(format
!("upload config {:?} ({} bytes, comp: {})", path
, size
, data
.len()));
// Respond with JSON null.
244 Ok(env3
.format_response(Ok(Value
::Null
)))