// proxmox-backup: src/api2/backup/upload_chunk.rs
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use anyhow::{bail, format_err, Error};
use futures::*;
use hex::FromHex;
use hyper::http::request::Parts;
use hyper::Body;
use serde_json::{json, Value};

use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment};
use proxmox_schema::*;
use proxmox_sys::sortable;

use pbs_api_types::{BACKUP_ARCHIVE_NAME_SCHEMA, CHUNK_DIGEST_SCHEMA};
use pbs_datastore::file_formats::{DataBlobHeader, EncryptedDataBlobHeader};
use pbs_datastore::{DataBlob, DataStore};
use pbs_tools::json::{required_integer_param, required_string_param};

use super::environment::*;

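/// Future that streams a chunk upload from the HTTP request body, verifies
/// the announced digest and size, and inserts the resulting blob into the
/// datastore.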
pub struct UploadChunk {
    stream: Body,
    store: Arc<DataStore>,
    digest: [u8; 32],
    size: u32,
    encoded_size: u32,
    raw_data: Option<Vec<u8>>,
}

impl UploadChunk {
    pub fn new(
        stream: Body,
        store: Arc<DataStore>,
        digest: [u8; 32],
        size: u32,
        encoded_size: u32,
    ) -> Self {
        Self {
            stream,
            store,
            size,
            encoded_size,
            raw_data: Some(vec![]),
            digest,
        }
    }
}

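/// Resolves to `(digest, size, compressed_size, is_duplicate)` once the whole
/// chunk has been received, verified and stored.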
impl Future for UploadChunk {
    type Output = Result<([u8; 32], u32, u32, bool), Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = self.get_mut();

        let err: Error = loop {
            match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
                Some(Err(err)) => return Poll::Ready(Err(Error::from(err))),
                Some(Ok(input)) => {
                    // accumulate the body, but never beyond the announced encoded size
                    if let Some(ref mut raw_data) = this.raw_data {
                        if (raw_data.len() + input.len()) > (this.encoded_size as usize) {
                            break format_err!("uploaded chunk is larger than announced.");
                        }
                        raw_data.extend_from_slice(&input);
                    } else {
                        break format_err!("poll upload chunk stream failed - already finished.");
                    }
                }
                None => {
                    // stream finished - verify the chunk and insert it into the datastore
                    if let Some(raw_data) = this.raw_data.take() {
                        if raw_data.len() != (this.encoded_size as usize) {
                            break format_err!("uploaded chunk has unexpected size.");
                        }

                        let (is_duplicate, compressed_size) = match proxmox_lang::try_block! {
                            let mut chunk = DataBlob::from_raw(raw_data)?;

                            proxmox_async::runtime::block_in_place(|| {
                                chunk.verify_unencrypted(this.size as usize, &this.digest)?;

                                // always compute CRC at server side
                                chunk.set_crc(chunk.compute_crc());

                                this.store.insert_chunk(&chunk, &this.digest)
                            })
                        } {
                            Ok(res) => res,
                            Err(err) => break err,
                        };

                        return Poll::Ready(Ok((
                            this.digest,
                            this.size,
                            compressed_size as u32,
                            is_duplicate,
                        )));
                    } else {
                        break format_err!("poll upload chunk stream failed - already finished.");
                    }
                }
            }
        };
        Poll::Ready(Err(err))
    }
}

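/// Upload a chunk for a fixed index writer. The chunk data is the raw request
/// body; `wid`, `digest`, `size` and `encoded-size` are passed as parameters
/// (see the schema below).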
#[sortable]
pub const API_METHOD_UPLOAD_FIXED_CHUNK: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_fixed_chunk),
    &ObjectSchema::new(
        "Upload a new chunk.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            ("digest", false, &CHUNK_DIGEST_SCHEMA),
            (
                "size",
                false,
                &IntegerSchema::new("Chunk size.")
                    .minimum(1)
                    .maximum(1024 * 1024 * 16)
                    .schema()
            ),
            (
                "encoded-size",
                false,
                &IntegerSchema::new("Encoded chunk size.")
                    .minimum((std::mem::size_of::<DataBlobHeader>() as isize) + 1)
                    .maximum(
                        1024 * 1024 * 16
                            + (std::mem::size_of::<EncryptedDataBlobHeader>() as isize)
                    )
                    .schema()
            ),
        ]),
    ),
);

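/// Handler for `API_METHOD_UPLOAD_FIXED_CHUNK`: parses the parameters, awaits
/// the `UploadChunk` future and registers the result with the fixed writer.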
fn upload_fixed_chunk(
    _parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
    async move {
        let wid = required_integer_param(&param, "wid")? as usize;
        let size = required_integer_param(&param, "size")? as u32;
        let encoded_size = required_integer_param(&param, "encoded-size")? as u32;

        let digest_str = required_string_param(&param, "digest")?;
        let digest = <[u8; 32]>::from_hex(digest_str)?;

        let env: &BackupEnvironment = rpcenv.as_ref();

        let (digest, size, compressed_size, is_duplicate) =
            UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;

        env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
        let digest_str = hex::encode(digest);
        env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));

        let result = Ok(json!(digest_str));

        Ok(env.format_response(result))
    }
    .boxed()
}

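/// Upload a chunk for a dynamic index writer. Parameters match the fixed
/// chunk variant above.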
#[sortable]
pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_dynamic_chunk),
    &ObjectSchema::new(
        "Upload a new chunk.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            ("digest", false, &CHUNK_DIGEST_SCHEMA),
            (
                "size",
                false,
                &IntegerSchema::new("Chunk size.")
                    .minimum(1)
                    .maximum(1024 * 1024 * 16)
                    .schema()
            ),
            (
                "encoded-size",
                false,
                &IntegerSchema::new("Encoded chunk size.")
                    .minimum((std::mem::size_of::<DataBlobHeader>() as isize) + 1)
                    .maximum(
                        1024 * 1024 * 16
                            + (std::mem::size_of::<EncryptedDataBlobHeader>() as isize)
                    )
                    .schema()
            ),
        ]),
    ),
);

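/// Handler for `API_METHOD_UPLOAD_DYNAMIC_CHUNK`: like `upload_fixed_chunk`,
/// but registers the chunk with a dynamic writer.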
fn upload_dynamic_chunk(
    _parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
    async move {
        let wid = required_integer_param(&param, "wid")? as usize;
        let size = required_integer_param(&param, "size")? as u32;
        let encoded_size = required_integer_param(&param, "encoded-size")? as u32;

        let digest_str = required_string_param(&param, "digest")?;
        let digest = <[u8; 32]>::from_hex(digest_str)?;

        let env: &BackupEnvironment = rpcenv.as_ref();

        let (digest, size, compressed_size, is_duplicate) =
            UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;

        env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
        let digest_str = hex::encode(digest);
        env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));

        let result = Ok(json!(digest_str));
        Ok(env.format_response(result))
    }
    .boxed()
}

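/// Measure raw upload throughput: the request body is read and discarded,
/// nothing is stored (presumably the endpoint behind client-side benchmarking).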
pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_speedtest),
    &ObjectSchema::new("Test upload speed.", &[]),
);

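/// Handler for `API_METHOD_UPLOAD_SPEEDTEST`: folds over the request body,
/// counting bytes without storing anything.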
fn upload_speedtest(
    _parts: Parts,
    req_body: Body,
    _param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
    async move {
        let result = req_body
            .map_err(Error::from)
            .try_fold(0, |size: usize, chunk| {
                let sum = size + chunk.len();
                //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
                future::ok::<usize, Error>(sum)
            })
            .await;

        match result {
            Ok(size) => {
                println!("UPLOAD END {} bytes", size);
            }
            Err(err) => {
                println!("Upload error: {}", err);
            }
        }
        let env: &BackupEnvironment = rpcenv.as_ref();
        Ok(env.format_response(Ok(Value::Null)))
    }
    .boxed()
}

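/// Upload a binary blob file (e.g. the backup manifest). The blob is sent as
/// the request body; `file-name` and `encoded-size` are passed as parameters.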
#[sortable]
pub const API_METHOD_UPLOAD_BLOB: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_blob),
    &ObjectSchema::new(
        "Upload binary blob file.",
        &sorted!([
            ("file-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA),
            (
                "encoded-size",
                false,
                &IntegerSchema::new("Encoded blob size.")
                    .minimum(std::mem::size_of::<DataBlobHeader>() as isize)
                    .maximum(
                        1024 * 1024 * 16
                            + (std::mem::size_of::<EncryptedDataBlobHeader>() as isize)
                    )
                    .schema()
            )
        ]),
    ),
);

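/// Handler for `API_METHOD_UPLOAD_BLOB`: collects the whole body in memory,
/// checks the `.blob` extension and the announced size, then hands the data
/// to the backup environment.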
fn upload_blob(
    _parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
    async move {
        let file_name = required_string_param(&param, "file-name")?.to_owned();
        let encoded_size = required_integer_param(&param, "encoded-size")? as usize;

        let env: &BackupEnvironment = rpcenv.as_ref();

        if !file_name.ends_with(".blob") {
            bail!("wrong blob file extension: '{}'", file_name);
        }

        let data = req_body
            .map_err(Error::from)
            .try_fold(Vec::new(), |mut acc, chunk| {
                acc.extend_from_slice(&chunk);
                future::ok::<_, Error>(acc)
            })
            .await?;

        if encoded_size != data.len() {
            bail!(
                "got blob with unexpected length ({} != {})",
                encoded_size,
                data.len()
            );
        }

        env.add_blob(&file_name, data)?;

        Ok(env.format_response(Ok(Value::Null)))
    }
    .boxed()
}