// src/api2/backup/upload_chunk.rs
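//! Chunk and blob upload handlers for the backup API.
//!
//! All handlers below run inside a backup session and record their results
//! in the session's [`BackupEnvironment`] (see `super::environment`).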
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use anyhow::{bail, format_err, Error};
use futures::*;
use hyper::Body;
use hyper::http::request::Parts;
use serde_json::{json, Value};

use proxmox::{sortable, identity};
use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, RpcEnvironment};
use proxmox::api::schema::*;

use crate::api2::types::*;
use crate::backup::*;
use crate::tools;

use super::environment::*;

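/// Future that reads a chunk upload body into memory.
///
/// While collecting the body it enforces the announced `encoded_size`; once
/// the stream ends it decodes the data as a [`DataBlob`], verifies the
/// unencrypted size and digest, recomputes the CRC on the server side and
/// inserts the chunk into the datastore. Resolves to
/// `(digest, size, compressed_size, is_duplicate)`.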
pub struct UploadChunk {
    stream: Body,
    store: Arc<DataStore>,
    digest: [u8; 32],
    size: u32,
    encoded_size: u32,
    raw_data: Option<Vec<u8>>,
}

impl UploadChunk {
    pub fn new(stream: Body, store: Arc<DataStore>, digest: [u8; 32], size: u32, encoded_size: u32) -> Self {
        Self { stream, store, size, encoded_size, raw_data: Some(vec![]), digest }
    }
}

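// Drive the upload: poll the body stream until it ends, then hand the
// collected data to the datastore inside a `try_block!`; any failure breaks
// out of the loop with an error.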
impl Future for UploadChunk {
    type Output = Result<([u8; 32], u32, u32, bool), Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = self.get_mut();

        let err: Error = loop {
            match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
                Some(Err(err)) => return Poll::Ready(Err(Error::from(err))),
                Some(Ok(input)) => {
                    if let Some(ref mut raw_data) = this.raw_data {
                        if (raw_data.len() + input.len()) > (this.encoded_size as usize) {
                            break format_err!("uploaded chunk is larger than announced.");
                        }
                        raw_data.extend_from_slice(&input);
                    } else {
                        break format_err!("poll upload chunk stream failed - already finished.");
                    }
                }
                None => {
                    if let Some(raw_data) = this.raw_data.take() {
                        if raw_data.len() != (this.encoded_size as usize) {
                            break format_err!("uploaded chunk has unexpected size.");
                        }

                        let (is_duplicate, compressed_size) = match proxmox::try_block! {
                            let mut chunk = DataBlob::from_raw(raw_data)?;

                            chunk.verify_unencrypted(this.size as usize, &this.digest)?;

                            // always compute CRC at server side
                            chunk.set_crc(chunk.compute_crc());

                            this.store.insert_chunk(&chunk, &this.digest)
                        } {
                            Ok(res) => res,
                            Err(err) => break err,
                        };

                        return Poll::Ready(Ok((this.digest, this.size, compressed_size as u32, is_duplicate)))
                    } else {
                        break format_err!("poll upload chunk stream failed - already finished.");
                    }
                }
            }
        };
        Poll::Ready(Err(err))
    }
}

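/// Upload a chunk for a fixed writer (parameters: `wid`, `digest`, `size`,
/// `encoded-size`).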
#[sortable]
pub const API_METHOD_UPLOAD_FIXED_CHUNK: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_fixed_chunk),
    &ObjectSchema::new(
        "Upload a new chunk.",
        &sorted!([
            ("wid", false, &IntegerSchema::new("Fixed writer ID.")
                .minimum(1)
                .maximum(256)
                .schema()
            ),
            ("digest", false, &CHUNK_DIGEST_SCHEMA),
            ("size", false, &IntegerSchema::new("Chunk size.")
                .minimum(1)
                .maximum(1024*1024*16)
                .schema()
            ),
            ("encoded-size", false, &IntegerSchema::new("Encoded chunk size.")
                .minimum((std::mem::size_of::<DataBlobHeader>() as isize) + 1)
                .maximum(1024*1024*16 + (std::mem::size_of::<EncryptedDataBlobHeader>() as isize))
                .schema()
            ),
        ]),
    )
);

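/// Handler: collects the chunk via [`UploadChunk`] and registers the result
/// with the fixed writer identified by `wid`.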
fn upload_fixed_chunk(
    _parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {

    async move {
        let wid = tools::required_integer_param(&param, "wid")? as usize;
        let size = tools::required_integer_param(&param, "size")? as u32;
        let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;

        let digest_str = tools::required_string_param(&param, "digest")?;
        let digest = proxmox::tools::hex_to_digest(digest_str)?;

        let env: &BackupEnvironment = rpcenv.as_ref();

        let (digest, size, compressed_size, is_duplicate) =
            UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;

        env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
        let digest_str = proxmox::tools::digest_to_hex(&digest);
        env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));

        let result = Ok(json!(digest_str));

        Ok(env.format_response(result))
    }
    .boxed()
}

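/// Upload a chunk for a dynamic writer (same parameters as the fixed variant).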
#[sortable]
pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_dynamic_chunk),
    &ObjectSchema::new(
        "Upload a new chunk.",
        &sorted!([
            ("wid", false, &IntegerSchema::new("Dynamic writer ID.")
                .minimum(1)
                .maximum(256)
                .schema()
            ),
            ("digest", false, &CHUNK_DIGEST_SCHEMA),
            ("size", false, &IntegerSchema::new("Chunk size.")
                .minimum(1)
                .maximum(1024*1024*16)
                .schema()
            ),
            ("encoded-size", false, &IntegerSchema::new("Encoded chunk size.")
                .minimum((std::mem::size_of::<DataBlobHeader>() as isize) + 1)
                .maximum(1024*1024*16 + (std::mem::size_of::<EncryptedDataBlobHeader>() as isize))
                .schema()
            ),
        ]),
    )
);

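/// Handler: identical to `upload_fixed_chunk`, except that the result is
/// registered with a dynamic writer.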
fn upload_dynamic_chunk(
    _parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {

    async move {
        let wid = tools::required_integer_param(&param, "wid")? as usize;
        let size = tools::required_integer_param(&param, "size")? as u32;
        let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;

        let digest_str = tools::required_string_param(&param, "digest")?;
        let digest = proxmox::tools::hex_to_digest(digest_str)?;

        let env: &BackupEnvironment = rpcenv.as_ref();

        let (digest, size, compressed_size, is_duplicate) =
            UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size)
                .await?;

        env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
        let digest_str = proxmox::tools::digest_to_hex(&digest);
        env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));

        let result = Ok(json!(digest_str));
        Ok(env.format_response(result))
    }.boxed()
}

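/// Test upload speed: the handler just counts and discards the uploaded bytes.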
pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_speedtest),
    &ObjectSchema::new("Test upload speed.", &[])
);

fn upload_speedtest(
    _parts: Parts,
    req_body: Body,
    _param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {

    async move {

        let result = req_body
            .map_err(Error::from)
            .try_fold(0, |size: usize, chunk| {
                let sum = size + chunk.len();
                //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
                future::ok::<usize, Error>(sum)
            })
            .await;

        match result {
            Ok(size) => {
                println!("UPLOAD END {} bytes", size);
            }
            Err(err) => {
                println!("Upload error: {}", err);
            }
        }
        let env: &BackupEnvironment = rpcenv.as_ref();
        Ok(env.format_response(Ok(Value::Null)))
    }.boxed()
}

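/// Upload a binary blob file (parameters: `file-name`, `encoded-size`).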
#[sortable]
pub const API_METHOD_UPLOAD_BLOB: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_blob),
    &ObjectSchema::new(
        "Upload binary blob file.",
        &sorted!([
            ("file-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
            ("encoded-size", false, &IntegerSchema::new("Encoded blob size.")
                .minimum((std::mem::size_of::<DataBlobHeader>() as isize) + 1)
                .maximum(1024*1024*16 + (std::mem::size_of::<EncryptedDataBlobHeader>() as isize))
                .schema()
            )
        ]),
    )
);

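/// Handler: reads the whole blob into memory, checks the `.blob` extension and
/// the announced `encoded-size`, then stores it via the backup environment.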
fn upload_blob(
    _parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {

    async move {
        let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
        let encoded_size = tools::required_integer_param(&param, "encoded-size")? as usize;

        let env: &BackupEnvironment = rpcenv.as_ref();

        if !file_name.ends_with(".blob") {
            bail!("wrong blob file extension: '{}'", file_name);
        }

        let data = req_body
            .map_err(Error::from)
            .try_fold(Vec::new(), |mut acc, chunk| {
                acc.extend_from_slice(&*chunk);
                future::ok::<_, Error>(acc)
            })
            .await?;

        if encoded_size != data.len() {
            bail!("got blob with unexpected length ({} != {})", encoded_size, data.len());
        }

        env.add_blob(&file_name, data)?;

        Ok(env.format_response(Ok(Value::Null)))
    }.boxed()
}