]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/backup/upload_chunk.rs
move required_X_param to pbs_tools::json
[proxmox-backup.git] / src / api2 / backup / upload_chunk.rs
1 use std::pin::Pin;
2 use std::sync::Arc;
3 use std::task::{Context, Poll};
4
5 use anyhow::{bail, format_err, Error};
6 use futures::*;
7 use hyper::Body;
8 use hyper::http::request::Parts;
9 use serde_json::{json, Value};
10
11 use proxmox::{sortable, identity};
12 use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, RpcEnvironment};
13 use proxmox::api::schema::*;
14
15 use pbs_tools::json::{required_integer_param, required_string_param};
16
17 use crate::api2::types::*;
18 use crate::backup::*;
19
20 use super::environment::*;
21
/// Future that drains a chunk upload from an HTTP request body and,
/// once complete, verifies and inserts the chunk into the datastore.
pub struct UploadChunk {
    // Incoming HTTP request body carrying the encoded chunk data.
    stream: Body,
    // Target datastore the finished chunk is inserted into.
    store: Arc<DataStore>,
    // Chunk digest announced by the client; verified against the payload.
    digest: [u8; 32],
    // Unencoded (raw) chunk size announced by the client.
    size: u32,
    // Encoded (on-the-wire) size announced by the client.
    encoded_size: u32,
    // Accumulation buffer; `None` after the upload has finished
    // (used to reject polling a completed future).
    raw_data: Option<Vec<u8>>,
}
30
31 impl UploadChunk {
32 pub fn new(stream: Body, store: Arc<DataStore>, digest: [u8; 32], size: u32, encoded_size: u32) -> Self {
33 Self { stream, store, size, encoded_size, raw_data: Some(vec![]), digest }
34 }
35 }
36
impl Future for UploadChunk {
    /// Resolves to `(digest, raw_size, compressed_size, is_duplicate)` on success.
    type Output = Result<([u8; 32], u32, u32, bool), Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Drain the body stream chunk by chunk; any `break` aborts the
        // upload with the given error.
        let err: Error = loop {
            match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
                Some(Err(err)) => return Poll::Ready(Err(Error::from(err))),
                Some(Ok(input)) => {
                    if let Some(ref mut raw_data) = this.raw_data {
                        // Reject uploads exceeding the announced encoded size
                        // early instead of buffering unbounded data.
                        if (raw_data.len() + input.len()) > (this.encoded_size as usize) {
                            break format_err!("uploaded chunk is larger than announced.");
                        }
                        raw_data.extend_from_slice(&input);
                    } else {
                        // Future polled again after completion.
                        break format_err!("poll upload chunk stream failed - already finished.");
                    }
                }
                None => {
                    // Stream ended: take the buffer so a later poll fails
                    // instead of inserting the chunk twice.
                    if let Some(raw_data) = this.raw_data.take() {
                        if raw_data.len() != (this.encoded_size as usize) {
                            break format_err!("uploaded chunk has unexpected size.");
                        }

                        let (is_duplicate, compressed_size) = match proxmox::try_block! {
                            let mut chunk = DataBlob::from_raw(raw_data)?;

                            // Verification and insertion are blocking work, so
                            // run them via block_in_place to avoid stalling
                            // the async executor.
                            pbs_runtime::block_in_place(|| {
                                chunk.verify_unencrypted(this.size as usize, &this.digest)?;

                                // always compute CRC at server side
                                chunk.set_crc(chunk.compute_crc());

                                this.store.insert_chunk(&chunk, &this.digest)
                            })

                        } {
                            Ok(res) => res,
                            Err(err) => break err,
                        };

                        return Poll::Ready(Ok((this.digest, this.size, compressed_size as u32, is_duplicate)))
                    } else {
                        break format_err!("poll upload chunk stream failed - already finished.");
                    }
                }
            }
        };
        Poll::Ready(Err(err))
    }
}
89
#[sortable]
/// API method schema for uploading a chunk to a fixed-index writer.
pub const API_METHOD_UPLOAD_FIXED_CHUNK: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_fixed_chunk),
    &ObjectSchema::new(
        "Upload a new chunk.",
        &sorted!([
            ("wid", false, &IntegerSchema::new("Fixed writer ID.")
                .minimum(1)
                .maximum(256)
                .schema()
            ),
            ("digest", false, &CHUNK_DIGEST_SCHEMA),
            // Raw (unencoded) chunk size, capped at 16 MiB.
            ("size", false, &IntegerSchema::new("Chunk size.")
                .minimum(1)
                .maximum(1024*1024*16)
                .schema()
            ),
            // Encoded size must at least cover the blob header plus one
            // payload byte, and at most 16 MiB plus the encrypted header.
            ("encoded-size", false, &IntegerSchema::new("Encoded chunk size.")
                .minimum((std::mem::size_of::<DataBlobHeader>() as isize)+1)
                .maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataBlobHeader>() as isize))
                .schema()
            ),
        ]),
    )
);
115
116 fn upload_fixed_chunk(
117 _parts: Parts,
118 req_body: Body,
119 param: Value,
120 _info: &ApiMethod,
121 rpcenv: Box<dyn RpcEnvironment>,
122 ) -> ApiResponseFuture {
123
124 async move {
125 let wid = required_integer_param(&param, "wid")? as usize;
126 let size = required_integer_param(&param, "size")? as u32;
127 let encoded_size = required_integer_param(&param, "encoded-size")? as u32;
128
129 let digest_str = required_string_param(&param, "digest")?;
130 let digest = proxmox::tools::hex_to_digest(digest_str)?;
131
132 let env: &BackupEnvironment = rpcenv.as_ref();
133
134 let (digest, size, compressed_size, is_duplicate) =
135 UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
136
137 env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
138 let digest_str = proxmox::tools::digest_to_hex(&digest);
139 env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
140
141 let result = Ok(json!(digest_str));
142
143 Ok(env.format_response(result))
144 }
145 .boxed()
146 }
147
#[sortable]
/// API method schema for uploading a chunk to a dynamic-index writer.
pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_dynamic_chunk),
    &ObjectSchema::new(
        "Upload a new chunk.",
        &sorted!([
            ("wid", false, &IntegerSchema::new("Dynamic writer ID.")
                .minimum(1)
                .maximum(256)
                .schema()
            ),
            ("digest", false, &CHUNK_DIGEST_SCHEMA),
            // Raw (unencoded) chunk size, capped at 16 MiB.
            ("size", false, &IntegerSchema::new("Chunk size.")
                .minimum(1)
                .maximum(1024*1024*16)
                .schema()
            ),
            // Encoded size must at least cover the blob header plus one
            // payload byte, and at most 16 MiB plus the encrypted header.
            ("encoded-size", false, &IntegerSchema::new("Encoded chunk size.")
                .minimum((std::mem::size_of::<DataBlobHeader>() as isize) +1)
                .maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataBlobHeader>() as isize))
                .schema()
            ),
        ]),
    )
);
173
174 fn upload_dynamic_chunk(
175 _parts: Parts,
176 req_body: Body,
177 param: Value,
178 _info: &ApiMethod,
179 rpcenv: Box<dyn RpcEnvironment>,
180 ) -> ApiResponseFuture {
181
182 async move {
183 let wid = required_integer_param(&param, "wid")? as usize;
184 let size = required_integer_param(&param, "size")? as u32;
185 let encoded_size = required_integer_param(&param, "encoded-size")? as u32;
186
187 let digest_str = required_string_param(&param, "digest")?;
188 let digest = proxmox::tools::hex_to_digest(digest_str)?;
189
190 let env: &BackupEnvironment = rpcenv.as_ref();
191
192 let (digest, size, compressed_size, is_duplicate) =
193 UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size)
194 .await?;
195
196 env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
197 let digest_str = proxmox::tools::digest_to_hex(&digest);
198 env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
199
200 let result = Ok(json!(digest_str));
201 Ok(env.format_response(result))
202 }.boxed()
203 }
204
/// API method schema for the upload speed test endpoint (no parameters).
pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_speedtest),
    &ObjectSchema::new("Test upload speed.", &[])
);
209
210 fn upload_speedtest(
211 _parts: Parts,
212 req_body: Body,
213 _param: Value,
214 _info: &ApiMethod,
215 rpcenv: Box<dyn RpcEnvironment>,
216 ) -> ApiResponseFuture {
217
218 async move {
219
220 let result = req_body
221 .map_err(Error::from)
222 .try_fold(0, |size: usize, chunk| {
223 let sum = size + chunk.len();
224 //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
225 future::ok::<usize, Error>(sum)
226 })
227 .await;
228
229 match result {
230 Ok(size) => {
231 println!("UPLOAD END {} bytes", size);
232 }
233 Err(err) => {
234 println!("Upload error: {}", err);
235 }
236 }
237 let env: &BackupEnvironment = rpcenv.as_ref();
238 Ok(env.format_response(Ok(Value::Null)))
239 }.boxed()
240 }
241
#[sortable]
/// API method schema for uploading a complete binary blob file
/// (e.g. manifest or catalog) into the backup directory.
pub const API_METHOD_UPLOAD_BLOB: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_blob),
    &ObjectSchema::new(
        "Upload binary blob file.",
        &sorted!([
            ("file-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
            // Encoded size must at least cover the blob header, and at most
            // 16 MiB plus the encrypted header.
            ("encoded-size", false, &IntegerSchema::new("Encoded blob size.")
                .minimum(std::mem::size_of::<DataBlobHeader>() as isize)
                .maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataBlobHeader>() as isize))
                .schema()
            )
        ]),
    )
);
257
258 fn upload_blob(
259 _parts: Parts,
260 req_body: Body,
261 param: Value,
262 _info: &ApiMethod,
263 rpcenv: Box<dyn RpcEnvironment>,
264 ) -> ApiResponseFuture {
265
266 async move {
267 let file_name = required_string_param(&param, "file-name")?.to_owned();
268 let encoded_size = required_integer_param(&param, "encoded-size")? as usize;
269
270 let env: &BackupEnvironment = rpcenv.as_ref();
271
272 if !file_name.ends_with(".blob") {
273 bail!("wrong blob file extension: '{}'", file_name);
274 }
275
276 let data = req_body
277 .map_err(Error::from)
278 .try_fold(Vec::new(), |mut acc, chunk| {
279 acc.extend_from_slice(&*chunk);
280 future::ok::<_, Error>(acc)
281 })
282 .await?;
283
284 if encoded_size != data.len() {
285 bail!("got blob with unexpected length ({} != {})", encoded_size, data.len());
286 }
287
288 env.add_blob(&file_name, data)?;
289
290 Ok(env.format_response(Ok(Value::Null)))
291 }.boxed()
292 }