]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/backup/upload_chunk.rs
src/api2.rs: move backup api to /backup
[proxmox-backup.git] / src / api2 / backup / upload_chunk.rs
1 use failure::*;
2 use futures::*;
3 use std::sync::Arc;
4
5 use hyper::http::request::Parts;
6 use hyper::Body;
7 use serde_json::{json, Value};
8
9 use crate::tools;
10 use crate::backup::*;
11 use crate::api_schema::*;
12 use crate::api_schema::router::*;
13
14 use super::environment::*;
15
16 pub struct UploadChunk {
17 stream: Body,
18 store: Arc<DataStore>,
19 size: u32,
20 chunk: Vec<u8>,
21 }
22
23 impl UploadChunk {
24
25 pub fn new(stream: Body, store: Arc<DataStore>, size: u32) -> Self {
26 Self { stream, store, size, chunk: vec![] }
27 }
28 }
29
30 impl Future for UploadChunk {
31 type Item = ([u8; 32], u32, u32, bool);
32 type Error = failure::Error;
33
34 fn poll(&mut self) -> Poll<([u8; 32], u32, u32, bool), failure::Error> {
35 loop {
36 match try_ready!(self.stream.poll()) {
37 Some(chunk) => {
38 if (self.chunk.len() + chunk.len()) > (self.size as usize) {
39 bail!("uploaded chunk is larger than announced.");
40 }
41 self.chunk.extend_from_slice(&chunk);
42 }
43 None => {
44 if self.chunk.len() != (self.size as usize) {
45 bail!("uploaded chunk has unexpected size.");
46 }
47
48 let (is_duplicate, digest, compressed_size) = self.store.insert_chunk(&self.chunk)?;
49
50 return Ok(Async::Ready((digest, self.size, compressed_size as u32, is_duplicate)))
51 }
52 }
53 }
54 }
55 }
56
57 pub fn api_method_upload_fixed_chunk() -> ApiAsyncMethod {
58 ApiAsyncMethod::new(
59 upload_fixed_chunk,
60 ObjectSchema::new("Upload a new chunk.")
61 .required("wid", IntegerSchema::new("Fixed writer ID.")
62 .minimum(1)
63 .maximum(256)
64 )
65 .required("size", IntegerSchema::new("Chunk size.")
66 .minimum(1)
67 .maximum(1024*1024*16)
68 )
69 )
70 }
71
72 fn upload_fixed_chunk(
73 _parts: Parts,
74 req_body: Body,
75 param: Value,
76 _info: &ApiAsyncMethod,
77 rpcenv: Box<RpcEnvironment>,
78 ) -> Result<BoxFut, Error> {
79
80 let wid = tools::required_integer_param(&param, "wid")? as usize;
81 let size = tools::required_integer_param(&param, "size")? as u32;
82
83 let env: &BackupEnvironment = rpcenv.as_ref();
84
85 let upload = UploadChunk::new(req_body, env.datastore.clone(), size);
86
87 let resp = upload
88 .then(move |result| {
89 let env: &BackupEnvironment = rpcenv.as_ref();
90
91 let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
92 env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
93 let digest_str = tools::digest_to_hex(&digest);
94 env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
95 Ok(json!(digest_str))
96 });
97
98 Ok(env.format_response(result))
99 });
100
101 Ok(Box::new(resp))
102 }
103
104 pub fn api_method_upload_dynamic_chunk() -> ApiAsyncMethod {
105 ApiAsyncMethod::new(
106 upload_dynamic_chunk,
107 ObjectSchema::new("Upload a new chunk.")
108 .required("wid", IntegerSchema::new("Dynamic writer ID.")
109 .minimum(1)
110 .maximum(256)
111 )
112 .required("size", IntegerSchema::new("Chunk size.")
113 .minimum(1)
114 .maximum(1024*1024*16)
115 )
116 )
117 }
118
119 fn upload_dynamic_chunk(
120 _parts: Parts,
121 req_body: Body,
122 param: Value,
123 _info: &ApiAsyncMethod,
124 rpcenv: Box<RpcEnvironment>,
125 ) -> Result<BoxFut, Error> {
126
127 let wid = tools::required_integer_param(&param, "wid")? as usize;
128 let size = tools::required_integer_param(&param, "size")? as u32;
129
130 let env: &BackupEnvironment = rpcenv.as_ref();
131
132 let upload = UploadChunk::new(req_body, env.datastore.clone(), size);
133
134 let resp = upload
135 .then(move |result| {
136 let env: &BackupEnvironment = rpcenv.as_ref();
137
138 let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
139 env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
140 let digest_str = tools::digest_to_hex(&digest);
141 env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
142 Ok(json!(digest_str))
143 });
144
145 Ok(env.format_response(result))
146 });
147
148 Ok(Box::new(resp))
149 }
150
151 pub fn api_method_upload_speedtest() -> ApiAsyncMethod {
152 ApiAsyncMethod::new(
153 upload_speedtest,
154 ObjectSchema::new("Test uploadf speed.")
155 )
156 }
157
158 fn upload_speedtest(
159 _parts: Parts,
160 req_body: Body,
161 _param: Value,
162 _info: &ApiAsyncMethod,
163 rpcenv: Box<RpcEnvironment>,
164 ) -> Result<BoxFut, Error> {
165
166 let resp = req_body
167 .map_err(Error::from)
168 .fold(0, |size: usize, chunk| -> Result<usize, Error> {
169 let sum = size + chunk.len();
170 //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
171 Ok(sum)
172 })
173 .then(move |result| {
174 match result {
175 Ok(size) => {
176 println!("UPLOAD END {} bytes", size);
177 }
178 Err(err) => {
179 println!("Upload error: {}", err);
180 }
181 }
182 let env: &BackupEnvironment = rpcenv.as_ref();
183 Ok(env.format_response(Ok(Value::Null)))
184 });
185
186 Ok(Box::new(resp))
187 }
188
189 pub fn api_method_upload_config() -> ApiAsyncMethod {
190 ApiAsyncMethod::new(
191 upload_config,
192 ObjectSchema::new("Upload configuration file.")
193 .required("file-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
194 .required("size", IntegerSchema::new("File size.")
195 .minimum(1)
196 .maximum(1024*1024*16)
197 )
198 )
199 }
200
201 fn upload_config(
202 _parts: Parts,
203 req_body: Body,
204 param: Value,
205 _info: &ApiAsyncMethod,
206 rpcenv: Box<RpcEnvironment>,
207 ) -> Result<BoxFut, Error> {
208
209 let mut file_name = tools::required_string_param(&param, "file-name")?.to_owned();
210 let size = tools::required_integer_param(&param, "size")? as usize;
211
212 if !file_name.ends_with(".conf") {
213 bail!("wrong config file extension: '{}'", file_name);
214 } else {
215 file_name.push_str(".zstd");
216 }
217
218 let env: &BackupEnvironment = rpcenv.as_ref();
219
220 let mut path = env.datastore.base_path();
221 path.push(env.backup_dir.relative_path());
222 path.push(&file_name);
223
224 let env2 = env.clone();
225 let env3 = env.clone();
226
227 let resp = req_body
228 .map_err(Error::from)
229 .concat2()
230 .and_then(move |data| {
231 if size != data.len() {
232 bail!("got configuration file with unexpected length ({} != {})", size, data.len());
233 }
234
235 let data = zstd::block::compress(&data, 0)?;
236
237 tools::file_set_contents(&path, &data, None)?;
238
239 env2.debug(format!("upload config {:?} ({} bytes, comp: {})", path, size, data.len()));
240
241 Ok(())
242 })
243 .and_then(move |_| {
244 Ok(env3.format_response(Ok(Value::Null)))
245 })
246 ;
247
248 Ok(Box::new(resp))
249 }