// proxmox-backup.git: src/api2/backup/upload_chunk.rs
// commit: backup: Add support for client side encryption
use failure::*;
use futures::*;
use std::sync::Arc;

use hyper::http::request::Parts;
use hyper::Body;
use serde_json::{json, Value};

use crate::tools;
use crate::backup::*;
use crate::api_schema::*;
use crate::api_schema::router::*;
use crate::api2::types::*;

use super::environment::*;

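/// Future that buffers an uploaded chunk from the request body and, once the
/// body is complete, inserts the chunk into the datastore.
///
/// Resolves to `(digest, size, compressed_size, is_duplicate)`.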
pub struct UploadChunk {
    stream: Body,
    store: Arc<DataStore>,
    digest: [u8; 32],
    size: u32,
    encoded_size: u32,
    raw_data: Option<Vec<u8>>,
}

impl UploadChunk {

    pub fn new(stream: Body, store: Arc<DataStore>, digest: [u8; 32], size: u32, encoded_size: u32) -> Self {
        Self { stream, store, size, encoded_size, raw_data: Some(vec![]), digest }
    }
}

impl Future for UploadChunk {
    type Item = ([u8; 32], u32, u32, bool);
    type Error = failure::Error;

    fn poll(&mut self) -> Poll<([u8; 32], u32, u32, bool), failure::Error> {
        loop {
            match try_ready!(self.stream.poll()) {
                Some(input) => {
                    // accumulate body data, rejecting uploads that exceed
                    // the announced encoded size
                    if let Some(ref mut raw_data) = self.raw_data {
                        if (raw_data.len() + input.len()) > (self.encoded_size as usize) {
                            bail!("uploaded chunk is larger than announced.");
                        }
                        raw_data.extend_from_slice(&input);
                    } else {
                        bail!("poll upload chunk stream failed - already finished.");
                    }
                }
                None => {
                    // end of stream: verify the announced size, then store the chunk
                    if let Some(raw_data) = self.raw_data.take() {
                        if raw_data.len() != (self.encoded_size as usize) {
                            bail!("uploaded chunk has unexpected size.");
                        }

                        let chunk = DataChunk::from_raw(raw_data, self.digest)?;

                        let (is_duplicate, compressed_size) = self.store.insert_chunk(&chunk)?;

                        return Ok(Async::Ready((self.digest, self.size, compressed_size as u32, is_duplicate)))
                    } else {
                        bail!("poll upload chunk stream failed - already finished.");
                    }
                }
            }
        }
    }
}
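
// A minimal sketch of consuming this future with the futures 0.1 combinators
// used throughout this file (the handlers below do the same via `then`;
// `body`, `store` etc. are placeholder bindings):
//
//     let upload = UploadChunk::new(body, store, digest, size, encoded_size)
//         .map(|(digest, size, compressed_size, is_duplicate)| {
//             println!("stored {} bytes (compressed: {}, duplicate: {})",
//                 size, compressed_size, is_duplicate);
//             digest
//         });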

pub fn api_method_upload_fixed_chunk() -> ApiAsyncMethod {
    ApiAsyncMethod::new(
        upload_fixed_chunk,
        ObjectSchema::new("Upload a new chunk.")
            .required("wid", IntegerSchema::new("Fixed writer ID.")
                .minimum(1)
                .maximum(256)
            )
            .required("digest", CHUNK_DIGEST_SCHEMA.clone())
            .required("size", IntegerSchema::new("Chunk size.")
                .minimum(1)
                .maximum(1024*1024*16)
            )
            .required("encoded-size", IntegerSchema::new("Encoded chunk size.")
                .minimum(9)
                // fixme: .maximum(1024*1024*16+40)
            )
    )
}
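
// Hypothetical example of the parameters a client might send to this method
// (names and limits taken from the schema above; that they travel in the
// query string, with the chunk data as request body, is an assumption):
//
//     wid=1&size=<chunk size>&encoded-size=<on-wire size>&digest=<64 hex digits>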

fn upload_fixed_chunk(
    _parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiAsyncMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> Result<BoxFut, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let size = tools::required_integer_param(&param, "size")? as u32;
    let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;

    let digest_str = tools::required_string_param(&param, "digest")?;
    let digest = crate::tools::hex_to_digest(digest_str)?;

    let env: &BackupEnvironment = rpcenv.as_ref();

    let upload = UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size);

    let resp = upload
        .then(move |result| {
            let env: &BackupEnvironment = rpcenv.as_ref();

            let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
                env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
                let digest_str = tools::digest_to_hex(&digest);
                env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
                Ok(json!(digest_str))
            });

            Ok(env.format_response(result))
        });

    Ok(Box::new(resp))
}

pub fn api_method_upload_dynamic_chunk() -> ApiAsyncMethod {
    ApiAsyncMethod::new(
        upload_dynamic_chunk,
        ObjectSchema::new("Upload a new chunk.")
            .required("wid", IntegerSchema::new("Dynamic writer ID.")
                .minimum(1)
                .maximum(256)
            )
            .required("digest", CHUNK_DIGEST_SCHEMA.clone())
            .required("size", IntegerSchema::new("Chunk size.")
                .minimum(1)
                .maximum(1024*1024*16)
            )
            .required("encoded-size", IntegerSchema::new("Encoded chunk size.")
                .minimum(9)
                // fixme: .maximum(1024*1024*16+40)
            )
    )
}

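// Identical to upload_fixed_chunk above, except that the completed chunk is
// registered with the dynamic index writer via register_dynamic_chunk.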
fn upload_dynamic_chunk(
    _parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiAsyncMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> Result<BoxFut, Error> {

    let wid = tools::required_integer_param(&param, "wid")? as usize;
    let size = tools::required_integer_param(&param, "size")? as u32;
    let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;

    let digest_str = tools::required_string_param(&param, "digest")?;
    let digest = crate::tools::hex_to_digest(digest_str)?;

    let env: &BackupEnvironment = rpcenv.as_ref();

    let upload = UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size);

    let resp = upload
        .then(move |result| {
            let env: &BackupEnvironment = rpcenv.as_ref();

            let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
                env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
                let digest_str = tools::digest_to_hex(&digest);
                env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
                Ok(json!(digest_str))
            });

            Ok(env.format_response(result))
        });

    Ok(Box::new(resp))
}

pub fn api_method_upload_speedtest() -> ApiAsyncMethod {
    ApiAsyncMethod::new(
        upload_speedtest,
        ObjectSchema::new("Test upload speed.")
    )
}

fn upload_speedtest(
    _parts: Parts,
    req_body: Body,
    _param: Value,
    _info: &ApiAsyncMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> Result<BoxFut, Error> {

    let resp = req_body
        .map_err(Error::from)
        .fold(0, |size: usize, chunk| -> Result<usize, Error> {
            let sum = size + chunk.len();
            //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
            Ok(sum)
        })
        .then(move |result| {
            match result {
                Ok(size) => {
                    println!("UPLOAD END {} bytes", size);
                }
                Err(err) => {
                    println!("Upload error: {}", err);
                }
            }
            let env: &BackupEnvironment = rpcenv.as_ref();
            Ok(env.format_response(Ok(Value::Null)))
        });

    Ok(Box::new(resp))
}

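// A self-contained sketch of the same fold-based byte counting, assuming the
// futures 0.1 API used throughout this file:
//
//     use futures::{stream, Future, Stream};
//     let total = stream::iter_ok::<_, ()>(vec![vec![0u8; 4], vec![0u8; 8]])
//         .fold(0usize, |acc, chunk| Ok::<_, ()>(acc + chunk.len()))
//         .wait()
//         .unwrap();
//     assert_eq!(total, 12);
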
pub fn api_method_upload_config() -> ApiAsyncMethod {
    ApiAsyncMethod::new(
        upload_config,
        ObjectSchema::new("Upload configuration file.")
            .required("file-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
            .required("size", IntegerSchema::new("File size.")
                .minimum(1)
                .maximum(1024*1024*16)
            )
    )
}

fn upload_config(
    _parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiAsyncMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> Result<BoxFut, Error> {

    let mut file_name = tools::required_string_param(&param, "file-name")?.to_owned();
    let size = tools::required_integer_param(&param, "size")? as usize;

    if !file_name.ends_with(".conf") {
        bail!("wrong config file extension: '{}'", file_name);
    } else {
        // the file is stored zstd-compressed, so append a matching suffix
        file_name.push_str(".zstd");
    }

    let env: &BackupEnvironment = rpcenv.as_ref();

    let mut path = env.datastore.base_path();
    path.push(env.backup_dir.relative_path());
    path.push(&file_name);

    let env2 = env.clone();
    let env3 = env.clone();

    let resp = req_body
        .map_err(Error::from)
        .concat2()
        .and_then(move |data| {
            if size != data.len() {
                bail!("got configuration file with unexpected length ({} != {})", size, data.len());
            }

            // compress the config before writing it below the backup directory
            let data = zstd::block::compress(&data, 0)?;

            tools::file_set_contents(&path, &data, None)?;

            env2.debug(format!("upload config {:?} ({} bytes, comp: {})", path, size, data.len()));

            Ok(())
        })
        .and_then(move |_| {
            Ok(env3.format_response(Ok(Value::Null)))
        });

    Ok(Box::new(resp))
}
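
// A minimal sketch of the zstd round trip used above (level 0 selects the
// zstd crate's default compression level):
//
//     let raw = b"some config contents";
//     let compressed = zstd::block::compress(&raw[..], 0)?;
//     let restored = zstd::block::decompress(&compressed, raw.len())?;
//     assert_eq!(&restored[..], &raw[..]);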