// proxmox-backup: src/api2/backup/upload_chunk.rs
// (history note: switched error handling from `failure` to `anyhow`)
1 use std::pin::Pin;
2 use std::sync::Arc;
3 use std::task::{Context, Poll};
4
5 use anyhow::{bail, format_err, Error};
6 use futures::*;
7 use hyper::Body;
8 use hyper::http::request::Parts;
9 use serde_json::{json, Value};
10
11 use proxmox::{sortable, identity};
12 use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, RpcEnvironment};
13 use proxmox::api::schema::*;
14
15 use crate::api2::types::*;
16 use crate::backup::*;
17 use crate::tools;
18
19 use super::environment::*;
20
/// Future that reads an HTTP request body and stores it as one chunk
/// in the datastore (see the `Future` impl below for the actual work).
pub struct UploadChunk {
    // incoming HTTP request body carrying the encoded chunk data
    stream: Body,
    // target datastore the chunk is inserted into
    store: Arc<DataStore>,
    // expected digest of the (unencrypted) chunk, announced by the client
    digest: [u8; 32],
    // announced unencoded chunk size in bytes
    size: u32,
    // announced encoded (on-the-wire) size; upload must match this exactly
    encoded_size: u32,
    // accumulation buffer; taken (set to None) once the stream ends
    raw_data: Option<Vec<u8>>,
}
29
30 impl UploadChunk {
31 pub fn new(stream: Body, store: Arc<DataStore>, digest: [u8; 32], size: u32, encoded_size: u32) -> Self {
32 Self { stream, store, size, encoded_size, raw_data: Some(vec![]), digest }
33 }
34 }
35
impl Future for UploadChunk {
    /// On success: (digest, unencoded size, compressed size, is_duplicate).
    type Output = Result<([u8; 32], u32, u32, bool), Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = self.get_mut();

        // The loop only ever exits via `return` (success / stream error) or
        // `break err` (validation failure); a break lands on the final
        // `Poll::Ready(Err(err))` below.
        let err: Error = loop {
            match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
                Some(Err(err)) => return Poll::Ready(Err(Error::from(err))),
                Some(Ok(input)) => {
                    if let Some(ref mut raw_data) = this.raw_data {
                        // Reject early if the body exceeds the announced encoded size.
                        if (raw_data.len() + input.len()) > (this.encoded_size as usize) {
                            break format_err!("uploaded chunk is larger than announced.");
                        }
                        raw_data.extend_from_slice(&input);
                    } else {
                        // raw_data already taken: future polled after completion
                        break format_err!("poll upload chunk stream failed - already finished.");
                    }
                }
                None => {
                    // Stream finished - take the buffer so repeated polls fail cleanly.
                    if let Some(raw_data) = this.raw_data.take() {
                        if raw_data.len() != (this.encoded_size as usize) {
                            break format_err!("uploaded chunk has unexpected size.");
                        }

                        let (is_duplicate, compressed_size) = match proxmox::try_block! {
                            let mut chunk = DataBlob::from_raw(raw_data)?;

                            // verify announced size and digest against the received data
                            chunk.verify_unencrypted(this.size as usize, &this.digest)?;

                            // always compute CRC at server side
                            chunk.set_crc(chunk.compute_crc());

                            this.store.insert_chunk(&chunk, &this.digest)
                        } {
                            Ok(res) => res,
                            Err(err) => break err,
                        };

                        return Poll::Ready(Ok((this.digest, this.size, compressed_size as u32, is_duplicate)))
                    } else {
                        break format_err!("poll upload chunk stream failed - already finished.");
                    }
                }
            }
        };
        Poll::Ready(Err(err))
    }
}
85
#[sortable]
/// API schema for uploading one chunk of a fixed-index (block device) backup.
pub const API_METHOD_UPLOAD_FIXED_CHUNK: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_fixed_chunk),
    &ObjectSchema::new(
        "Upload a new chunk.",
        &sorted!([
            ("wid", false, &IntegerSchema::new("Fixed writer ID.")
                .minimum(1)
                .maximum(256)
                .schema()
            ),
            ("digest", false, &CHUNK_DIGEST_SCHEMA),
            ("size", false, &IntegerSchema::new("Chunk size.")
                .minimum(1)
                .maximum(1024*1024*16)
                .schema()
            ),
            // encoded size must at least cover a blob header, and at most the
            // maximum chunk size plus the (larger) encrypted header
            ("encoded-size", false, &IntegerSchema::new("Encoded chunk size.")
                .minimum((std::mem::size_of::<DataBlobHeader>() as isize)+1)
                .maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataBlobHeader>() as isize))
                .schema()
            ),
        ]),
    )
);
111
112 fn upload_fixed_chunk(
113 _parts: Parts,
114 req_body: Body,
115 param: Value,
116 _info: &ApiMethod,
117 rpcenv: Box<dyn RpcEnvironment>,
118 ) -> ApiResponseFuture {
119
120 async move {
121 let wid = tools::required_integer_param(&param, "wid")? as usize;
122 let size = tools::required_integer_param(&param, "size")? as u32;
123 let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;
124
125 let digest_str = tools::required_string_param(&param, "digest")?;
126 let digest = proxmox::tools::hex_to_digest(digest_str)?;
127
128 let env: &BackupEnvironment = rpcenv.as_ref();
129
130 let (digest, size, compressed_size, is_duplicate) =
131 UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
132
133 env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
134 let digest_str = proxmox::tools::digest_to_hex(&digest);
135 env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
136
137 let result = Ok(json!(digest_str));
138
139 Ok(env.format_response(result))
140 }
141 .boxed()
142 }
143
#[sortable]
/// API schema for uploading one chunk of a dynamic-index (file archive) backup.
pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_dynamic_chunk),
    &ObjectSchema::new(
        "Upload a new chunk.",
        &sorted!([
            ("wid", false, &IntegerSchema::new("Dynamic writer ID.")
                .minimum(1)
                .maximum(256)
                .schema()
            ),
            ("digest", false, &CHUNK_DIGEST_SCHEMA),
            ("size", false, &IntegerSchema::new("Chunk size.")
                .minimum(1)
                .maximum(1024*1024*16)
                .schema()
            ),
            // encoded size must at least cover a blob header, and at most the
            // maximum chunk size plus the (larger) encrypted header
            ("encoded-size", false, &IntegerSchema::new("Encoded chunk size.")
                .minimum((std::mem::size_of::<DataBlobHeader>() as isize) +1)
                .maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataBlobHeader>() as isize))
                .schema()
            ),
        ]),
    )
);
169
170 fn upload_dynamic_chunk(
171 _parts: Parts,
172 req_body: Body,
173 param: Value,
174 _info: &ApiMethod,
175 rpcenv: Box<dyn RpcEnvironment>,
176 ) -> ApiResponseFuture {
177
178 async move {
179 let wid = tools::required_integer_param(&param, "wid")? as usize;
180 let size = tools::required_integer_param(&param, "size")? as u32;
181 let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;
182
183 let digest_str = tools::required_string_param(&param, "digest")?;
184 let digest = proxmox::tools::hex_to_digest(digest_str)?;
185
186 let env: &BackupEnvironment = rpcenv.as_ref();
187
188 let (digest, size, compressed_size, is_duplicate) =
189 UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size)
190 .await?;
191
192 env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
193 let digest_str = proxmox::tools::digest_to_hex(&digest);
194 env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
195
196 let result = Ok(json!(digest_str));
197 Ok(env.format_response(result))
198 }.boxed()
199 }
200
/// API schema for the upload speed test endpoint (no parameters).
pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_speedtest),
    &ObjectSchema::new("Test upload speed.", &[])
);
205
206 fn upload_speedtest(
207 _parts: Parts,
208 req_body: Body,
209 _param: Value,
210 _info: &ApiMethod,
211 rpcenv: Box<dyn RpcEnvironment>,
212 ) -> ApiResponseFuture {
213
214 async move {
215
216 let result = req_body
217 .map_err(Error::from)
218 .try_fold(0, |size: usize, chunk| {
219 let sum = size + chunk.len();
220 //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
221 future::ok::<usize, Error>(sum)
222 })
223 .await;
224
225 match result {
226 Ok(size) => {
227 println!("UPLOAD END {} bytes", size);
228 }
229 Err(err) => {
230 println!("Upload error: {}", err);
231 }
232 }
233 let env: &BackupEnvironment = rpcenv.as_ref();
234 Ok(env.format_response(Ok(Value::Null)))
235 }.boxed()
236 }
237
#[sortable]
/// API schema for uploading a complete blob file (e.g. index or catalog)
/// belonging to the current backup.
pub const API_METHOD_UPLOAD_BLOB: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_blob),
    &ObjectSchema::new(
        "Upload binary blob file.",
        &sorted!([
            ("file-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
            // encoded size must at least cover a blob header, and at most the
            // maximum chunk size plus the (larger) encrypted header
            ("encoded-size", false, &IntegerSchema::new("Encoded blob size.")
                .minimum((std::mem::size_of::<DataBlobHeader>() as isize) +1)
                .maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataBlobHeader>() as isize))
                .schema()
            )
        ]),
    )
);
253
254 fn upload_blob(
255 _parts: Parts,
256 req_body: Body,
257 param: Value,
258 _info: &ApiMethod,
259 rpcenv: Box<dyn RpcEnvironment>,
260 ) -> ApiResponseFuture {
261
262 async move {
263 let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
264 let encoded_size = tools::required_integer_param(&param, "encoded-size")? as usize;
265
266 let env: &BackupEnvironment = rpcenv.as_ref();
267
268 if !file_name.ends_with(".blob") {
269 bail!("wrong blob file extension: '{}'", file_name);
270 }
271
272 let data = req_body
273 .map_err(Error::from)
274 .try_fold(Vec::new(), |mut acc, chunk| {
275 acc.extend_from_slice(&*chunk);
276 future::ok::<_, Error>(acc)
277 })
278 .await?;
279
280 if encoded_size != data.len() {
281 bail!("got blob with unexpected length ({} != {})", encoded_size, data.len());
282 }
283
284 env.add_blob(&file_name, data)?;
285
286 Ok(env.format_response(Ok(Value::Null)))
287 }.boxed()
288 }