git.proxmox.com Git - proxmox-backup.git/blob - src/api2/backup/upload_chunk.rs
commit subject: "api: BoxFut -> ApiFuture"
[proxmox-backup.git] / src / api2 / backup / upload_chunk.rs
1 use std::pin::Pin;
2 use std::sync::Arc;
3 use std::task::{Context, Poll};
4
5 use failure::*;
6 use futures::*;
7 use hyper::Body;
8 use hyper::http::request::Parts;
9 use serde_json::{json, Value};
10
11 use proxmox::{sortable, identity};
12
13 use crate::api2::types::*;
14 use crate::api_schema::*;
15 use crate::api_schema::router::*;
16 use crate::backup::*;
17 use crate::tools;
18
19 use super::environment::*;
20
/// Future that collects an uploaded chunk from an HTTP request body,
/// verifies it against the announced digest/size and inserts it into
/// the datastore (see the `Future` impl below).
pub struct UploadChunk {
    /// Incoming HTTP request body carrying the encoded chunk data.
    stream: Body,
    /// Datastore the verified chunk is inserted into.
    store: Arc<DataStore>,
    /// Client-announced chunk digest; checked via `verify_unencrypted`.
    digest: [u8; 32],
    /// Announced size of the (unencrypted) chunk data.
    size: u32,
    /// Announced size of the encoded blob; the body may not exceed this.
    encoded_size: u32,
    /// Accumulated body bytes; taken (set to `None`) once the stream ends.
    raw_data: Option<Vec<u8>>,
}
29
30 impl UploadChunk {
31 pub fn new(stream: Body, store: Arc<DataStore>, digest: [u8; 32], size: u32, encoded_size: u32) -> Self {
32 Self { stream, store, size, encoded_size, raw_data: Some(vec![]), digest }
33 }
34 }
35
impl Future for UploadChunk {
    /// Resolves to `(digest, size, compressed_size, is_duplicate)` on success.
    type Output = Result<([u8; 32], u32, u32, bool), Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // UploadChunk is structurally Unpin here; get_mut gives plain access.
        let this = self.get_mut();

        // Drain the body stream; `break <err>` exits the loop with an error
        // which is returned at the bottom.
        let err: Error = loop {
            match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
                Some(Err(err)) => return Poll::Ready(Err(Error::from(err))),
                Some(Ok(input)) => {
                    if let Some(ref mut raw_data) = this.raw_data {
                        // Reject bodies exceeding the announced encoded size
                        // before buffering more data.
                        if (raw_data.len() + input.len()) > (this.encoded_size as usize) {
                            break format_err!("uploaded chunk is larger than announced.");
                        }
                        raw_data.extend_from_slice(&input);
                    } else {
                        // raw_data was already taken: future polled after completion.
                        break format_err!("poll upload chunk stream failed - already finished.");
                    }
                }
                None => {
                    // Stream finished - take ownership of the buffered data.
                    if let Some(raw_data) = this.raw_data.take() {
                        if raw_data.len() != (this.encoded_size as usize) {
                            break format_err!("uploaded chunk has unexpected size.");
                        }

                        // Decode, verify and store the chunk; any error inside
                        // the try_block breaks out of the loop.
                        let (is_duplicate, compressed_size) = match proxmox::tools::try_block! {
                            let mut chunk = DataBlob::from_raw(raw_data)?;

                            chunk.verify_unencrypted(this.size as usize, &this.digest)?;

                            // always compute CRC at server side
                            chunk.set_crc(chunk.compute_crc());

                            this.store.insert_chunk(&chunk, &this.digest)
                        } {
                            Ok(res) => res,
                            Err(err) => break err,
                        };

                        return Poll::Ready(Ok((this.digest, this.size, compressed_size as u32, is_duplicate)))
                    } else {
                        break format_err!("poll upload chunk stream failed - already finished.");
                    }
                }
            }
        };
        Poll::Ready(Err(err))
    }
}
85
/// API method schema: upload a chunk for a fixed-index writer.
/// Handled asynchronously by [`upload_fixed_chunk`].
#[sortable]
pub const API_METHOD_UPLOAD_FIXED_CHUNK: ApiMethod = ApiMethod::new(
    &ApiHandler::Async(&upload_fixed_chunk),
    &ObjectSchema::new(
        "Upload a new chunk.",
        &sorted!([
            ("wid", false, &IntegerSchema::new("Fixed writer ID.")
                .minimum(1)
                .maximum(256)
                .schema()
            ),
            ("digest", false, &CHUNK_DIGEST_SCHEMA),
            ("size", false, &IntegerSchema::new("Chunk size.")
                .minimum(1)
                .maximum(1024*1024*16)
                .schema()
            ),
            // Encoded size must at least cover the blob header plus one
            // payload byte, and is capped at 16 MiB plus the (larger)
            // encrypted header.
            ("encoded-size", false, &IntegerSchema::new("Encoded chunk size.")
                .minimum((std::mem::size_of::<DataBlobHeader>() as isize)+1)
                .maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataBlobHeader>() as isize))
                .schema()
            ),
        ]),
    )
);
111
112 fn upload_fixed_chunk(
113 _parts: Parts,
114 req_body: Body,
115 param: Value,
116 _info: &ApiMethod,
117 rpcenv: Box<dyn RpcEnvironment>,
118 ) -> Result<ApiFuture, Error> {
119
120 let wid = tools::required_integer_param(&param, "wid")? as usize;
121 let size = tools::required_integer_param(&param, "size")? as u32;
122 let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;
123
124 let digest_str = tools::required_string_param(&param, "digest")?;
125 let digest = proxmox::tools::hex_to_digest(digest_str)?;
126
127 let env: &BackupEnvironment = rpcenv.as_ref();
128
129 let upload = UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size);
130
131 let resp = upload
132 .then(move |result| {
133 let env: &BackupEnvironment = rpcenv.as_ref();
134
135 let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
136 env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
137 let digest_str = proxmox::tools::digest_to_hex(&digest);
138 env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
139 Ok(json!(digest_str))
140 });
141
142 future::ok(env.format_response(result))
143 });
144
145 Ok(Box::new(resp))
146 }
147
/// API method schema: upload a chunk for a dynamic-index writer.
/// Handled asynchronously by [`upload_dynamic_chunk`].
#[sortable]
pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK: ApiMethod = ApiMethod::new(
    &ApiHandler::Async(&upload_dynamic_chunk),
    &ObjectSchema::new(
        "Upload a new chunk.",
        &sorted!([
            ("wid", false, &IntegerSchema::new("Dynamic writer ID.")
                .minimum(1)
                .maximum(256)
                .schema()
            ),
            ("digest", false, &CHUNK_DIGEST_SCHEMA),
            ("size", false, &IntegerSchema::new("Chunk size.")
                .minimum(1)
                .maximum(1024*1024*16)
                .schema()
            ),
            // Same encoded-size bounds as the fixed-chunk method: blob
            // header plus one byte up to 16 MiB plus encrypted header.
            ("encoded-size", false, &IntegerSchema::new("Encoded chunk size.")
                .minimum((std::mem::size_of::<DataBlobHeader>() as isize) +1)
                .maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataBlobHeader>() as isize))
                .schema()
            ),
        ]),
    )
);
173
174 fn upload_dynamic_chunk(
175 _parts: Parts,
176 req_body: Body,
177 param: Value,
178 _info: &ApiMethod,
179 rpcenv: Box<dyn RpcEnvironment>,
180 ) -> Result<ApiFuture, Error> {
181
182 let wid = tools::required_integer_param(&param, "wid")? as usize;
183 let size = tools::required_integer_param(&param, "size")? as u32;
184 let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;
185
186 let digest_str = tools::required_string_param(&param, "digest")?;
187 let digest = proxmox::tools::hex_to_digest(digest_str)?;
188
189 let env: &BackupEnvironment = rpcenv.as_ref();
190
191 let upload = UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size);
192
193 let resp = upload
194 .then(move |result| {
195 let env: &BackupEnvironment = rpcenv.as_ref();
196
197 let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
198 env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
199 let digest_str = proxmox::tools::digest_to_hex(&digest);
200 env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
201 Ok(json!(digest_str))
202 });
203
204 future::ok(env.format_response(result))
205 });
206
207 Ok(Box::new(resp))
208 }
209
/// API method schema: discard-and-count upload used to measure transfer
/// speed. No parameters; handled asynchronously by [`upload_speedtest`].
pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
    &ApiHandler::Async(&upload_speedtest),
    &ObjectSchema::new("Test upload speed.", &[])
);
214
215 fn upload_speedtest(
216 _parts: Parts,
217 req_body: Body,
218 _param: Value,
219 _info: &ApiMethod,
220 rpcenv: Box<dyn RpcEnvironment>,
221 ) -> Result<ApiFuture, Error> {
222
223 let resp = req_body
224 .map_err(Error::from)
225 .try_fold(0, |size: usize, chunk| {
226 let sum = size + chunk.len();
227 //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
228 future::ok::<usize, Error>(sum)
229 })
230 .then(move |result| {
231 match result {
232 Ok(size) => {
233 println!("UPLOAD END {} bytes", size);
234 }
235 Err(err) => {
236 println!("Upload error: {}", err);
237 }
238 }
239 let env: &BackupEnvironment = rpcenv.as_ref();
240 future::ok(env.format_response(Ok(Value::Null)))
241 });
242
243 Ok(Box::new(resp))
244 }
245
/// API method schema: upload an arbitrary binary blob file (must carry a
/// `.blob` extension, enforced in the handler). Handled asynchronously by
/// [`upload_blob`].
#[sortable]
pub const API_METHOD_UPLOAD_BLOB: ApiMethod = ApiMethod::new(
    &ApiHandler::Async(&upload_blob),
    &ObjectSchema::new(
        "Upload binary blob file.",
        &sorted!([
            ("file-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
            // Blob header plus one payload byte up to 16 MiB plus the
            // encrypted header size.
            ("encoded-size", false, &IntegerSchema::new("Encoded blob size.")
                .minimum((std::mem::size_of::<DataBlobHeader>() as isize) +1)
                .maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataBlobHeader>() as isize))
                .schema()
            )
        ]),
    )
);
261
262 fn upload_blob(
263 _parts: Parts,
264 req_body: Body,
265 param: Value,
266 _info: &ApiMethod,
267 rpcenv: Box<dyn RpcEnvironment>,
268 ) -> Result<ApiFuture, Error> {
269
270 let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
271 let encoded_size = tools::required_integer_param(&param, "encoded-size")? as usize;
272
273
274 let env: &BackupEnvironment = rpcenv.as_ref();
275
276 if !file_name.ends_with(".blob") {
277 bail!("wrong blob file extension: '{}'", file_name);
278 }
279
280 let env2 = env.clone();
281 let env3 = env.clone();
282
283 let resp = req_body
284 .map_err(Error::from)
285 .try_fold(Vec::new(), |mut acc, chunk| {
286 acc.extend_from_slice(&*chunk);
287 future::ok::<_, Error>(acc)
288 })
289 .and_then(move |data| async move {
290 if encoded_size != data.len() {
291 bail!("got blob with unexpected length ({} != {})", encoded_size, data.len());
292 }
293
294 env2.add_blob(&file_name, data)?;
295
296 Ok(())
297 })
298 .and_then(move |_| {
299 future::ok(env3.format_response(Ok(Value::Null)))
300 })
301 ;
302
303 Ok(Box::new(resp))
304 }