]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/backup/upload_chunk.rs
update to proxmox-sys 0.2 crate
[proxmox-backup.git] / src / api2 / backup / upload_chunk.rs
1 use std::pin::Pin;
2 use std::sync::Arc;
3 use std::task::{Context, Poll};
4
5 use anyhow::{bail, format_err, Error};
6 use futures::*;
7 use hyper::Body;
8 use hyper::http::request::Parts;
9 use serde_json::{json, Value};
10 use hex::FromHex;
11
12 use proxmox_sys::{sortable, identity};
13 use proxmox_router::{ApiResponseFuture, ApiHandler, ApiMethod, RpcEnvironment};
14 use proxmox_schema::*;
15
16 use pbs_datastore::{DataStore, DataBlob};
17 use pbs_datastore::file_formats::{DataBlobHeader, EncryptedDataBlobHeader};
18 use pbs_tools::json::{required_integer_param, required_string_param};
19 use pbs_api_types::{CHUNK_DIGEST_SCHEMA, BACKUP_ARCHIVE_NAME_SCHEMA};
20
21 use super::environment::*;
22
/// Future that drains an HTTP request body into a buffer and, once the
/// stream ends, verifies the chunk and inserts it into the datastore
/// (see the `Future` impl for this type).
pub struct UploadChunk {
    /// Incoming HTTP request body carrying the encoded chunk data.
    stream: Body,
    /// Datastore the verified chunk gets inserted into.
    store: Arc<DataStore>,
    /// Expected digest of the chunk (checked via `verify_unencrypted`).
    digest: [u8; 32],
    /// Announced size of the raw chunk data in bytes.
    size: u32,
    /// Announced size of the encoded (blob) data in bytes.
    encoded_size: u32,
    /// Accumulated upload data; `None` once the upload has finished.
    raw_data: Option<Vec<u8>>,
}
31
32 impl UploadChunk {
33 pub fn new(stream: Body, store: Arc<DataStore>, digest: [u8; 32], size: u32, encoded_size: u32) -> Self {
34 Self { stream, store, size, encoded_size, raw_data: Some(vec![]), digest }
35 }
36 }
37
impl Future for UploadChunk {
    /// Resolves to `(digest, size, compressed_size, is_duplicate)` on success.
    type Output = Result<([u8; 32], u32, u32, bool), Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Drive the body stream; any `break` aborts the upload with an error.
        let err: Error = loop {
            match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
                Some(Err(err)) => return Poll::Ready(Err(Error::from(err))),
                Some(Ok(input)) => {
                    if let Some(ref mut raw_data) = this.raw_data {
                        // Reject data beyond the announced encoded size.
                        if (raw_data.len() + input.len()) > (this.encoded_size as usize) {
                            break format_err!("uploaded chunk is larger than announced.");
                        }
                        raw_data.extend_from_slice(&input);
                    } else {
                        // raw_data already taken - future polled after completion
                        break format_err!("poll upload chunk stream failed - already finished.");
                    }
                }
                None => {
                    // Stream ended - verify the buffered data and store the chunk.
                    if let Some(raw_data) = this.raw_data.take() {
                        if raw_data.len() != (this.encoded_size as usize) {
                            break format_err!("uploaded chunk has unexpected size.");
                        }

                        let (is_duplicate, compressed_size) = match proxmox_lang::try_block! {
                            let mut chunk = DataBlob::from_raw(raw_data)?;

                            // insert_chunk does blocking I/O, so leave the async context
                            proxmox_async::runtime::block_in_place(|| {
                                // check digest and raw size against the announced values
                                chunk.verify_unencrypted(this.size as usize, &this.digest)?;

                                // always compute CRC at server side
                                chunk.set_crc(chunk.compute_crc());

                                this.store.insert_chunk(&chunk, &this.digest)
                            })

                        } {
                            Ok(res) => res,
                            Err(err) => break err,
                        };

                        return Poll::Ready(Ok((this.digest, this.size, compressed_size as u32, is_duplicate)))
                    } else {
                        break format_err!("poll upload chunk stream failed - already finished.");
                    }
                }
            }
        };
        Poll::Ready(Err(err))
    }
}
90
/// API method definition: upload a chunk for a fixed-index writer,
/// handled by [`upload_fixed_chunk`].
#[sortable]
pub const API_METHOD_UPLOAD_FIXED_CHUNK: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_fixed_chunk),
    &ObjectSchema::new(
        "Upload a new chunk.",
        &sorted!([
            ("wid", false, &IntegerSchema::new("Fixed writer ID.")
                .minimum(1)
                .maximum(256)
                .schema()
            ),
            ("digest", false, &CHUNK_DIGEST_SCHEMA),
            ("size", false, &IntegerSchema::new("Chunk size.")
                .minimum(1)
                .maximum(1024*1024*16)
                .schema()
            ),
            // encoded size must at least cover a blob header plus one byte of data
            ("encoded-size", false, &IntegerSchema::new("Encoded chunk size.")
                .minimum((std::mem::size_of::<DataBlobHeader>() as isize)+1)
                .maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataBlobHeader>() as isize))
                .schema()
            ),
        ]),
    )
);
116
117 fn upload_fixed_chunk(
118 _parts: Parts,
119 req_body: Body,
120 param: Value,
121 _info: &ApiMethod,
122 rpcenv: Box<dyn RpcEnvironment>,
123 ) -> ApiResponseFuture {
124
125 async move {
126 let wid = required_integer_param(&param, "wid")? as usize;
127 let size = required_integer_param(&param, "size")? as u32;
128 let encoded_size = required_integer_param(&param, "encoded-size")? as u32;
129
130 let digest_str = required_string_param(&param, "digest")?;
131 let digest = <[u8; 32]>::from_hex(digest_str)?;
132
133 let env: &BackupEnvironment = rpcenv.as_ref();
134
135 let (digest, size, compressed_size, is_duplicate) =
136 UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
137
138 env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
139 let digest_str = hex::encode(&digest);
140 env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
141
142 let result = Ok(json!(digest_str));
143
144 Ok(env.format_response(result))
145 }
146 .boxed()
147 }
148
/// API method definition: upload a chunk for a dynamic-index writer,
/// handled by [`upload_dynamic_chunk`].
#[sortable]
pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_dynamic_chunk),
    &ObjectSchema::new(
        "Upload a new chunk.",
        &sorted!([
            ("wid", false, &IntegerSchema::new("Dynamic writer ID.")
                .minimum(1)
                .maximum(256)
                .schema()
            ),
            ("digest", false, &CHUNK_DIGEST_SCHEMA),
            ("size", false, &IntegerSchema::new("Chunk size.")
                .minimum(1)
                .maximum(1024*1024*16)
                .schema()
            ),
            // encoded size must at least cover a blob header plus one byte of data
            ("encoded-size", false, &IntegerSchema::new("Encoded chunk size.")
                .minimum((std::mem::size_of::<DataBlobHeader>() as isize) +1)
                .maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataBlobHeader>() as isize))
                .schema()
            ),
        ]),
    )
);
174
175 fn upload_dynamic_chunk(
176 _parts: Parts,
177 req_body: Body,
178 param: Value,
179 _info: &ApiMethod,
180 rpcenv: Box<dyn RpcEnvironment>,
181 ) -> ApiResponseFuture {
182
183 async move {
184 let wid = required_integer_param(&param, "wid")? as usize;
185 let size = required_integer_param(&param, "size")? as u32;
186 let encoded_size = required_integer_param(&param, "encoded-size")? as u32;
187
188 let digest_str = required_string_param(&param, "digest")?;
189 let digest = <[u8; 32]>::from_hex(digest_str)?;
190
191 let env: &BackupEnvironment = rpcenv.as_ref();
192
193 let (digest, size, compressed_size, is_duplicate) =
194 UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size)
195 .await?;
196
197 env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
198 let digest_str = hex::encode(&digest);
199 env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
200
201 let result = Ok(json!(digest_str));
202 Ok(env.format_response(result))
203 }.boxed()
204 }
205
/// API method definition: measure upload throughput, handled by
/// [`upload_speedtest`]. Takes no parameters; the body is discarded.
pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_speedtest),
    &ObjectSchema::new("Test upload speed.", &[])
);
210
211 fn upload_speedtest(
212 _parts: Parts,
213 req_body: Body,
214 _param: Value,
215 _info: &ApiMethod,
216 rpcenv: Box<dyn RpcEnvironment>,
217 ) -> ApiResponseFuture {
218
219 async move {
220
221 let result = req_body
222 .map_err(Error::from)
223 .try_fold(0, |size: usize, chunk| {
224 let sum = size + chunk.len();
225 //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
226 future::ok::<usize, Error>(sum)
227 })
228 .await;
229
230 match result {
231 Ok(size) => {
232 println!("UPLOAD END {} bytes", size);
233 }
234 Err(err) => {
235 println!("Upload error: {}", err);
236 }
237 }
238 let env: &BackupEnvironment = rpcenv.as_ref();
239 Ok(env.format_response(Ok(Value::Null)))
240 }.boxed()
241 }
242
/// API method definition: upload a binary blob file (e.g. an index or
/// manifest blob), handled by [`upload_blob`].
#[sortable]
pub const API_METHOD_UPLOAD_BLOB: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upload_blob),
    &ObjectSchema::new(
        "Upload binary blob file.",
        &sorted!([
            ("file-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA),
            // encoded size must at least cover a blob header
            ("encoded-size", false, &IntegerSchema::new("Encoded blob size.")
                .minimum(std::mem::size_of::<DataBlobHeader>() as isize)
                .maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataBlobHeader>() as isize))
                .schema()
            )
        ]),
    )
);
258
259 fn upload_blob(
260 _parts: Parts,
261 req_body: Body,
262 param: Value,
263 _info: &ApiMethod,
264 rpcenv: Box<dyn RpcEnvironment>,
265 ) -> ApiResponseFuture {
266
267 async move {
268 let file_name = required_string_param(&param, "file-name")?.to_owned();
269 let encoded_size = required_integer_param(&param, "encoded-size")? as usize;
270
271 let env: &BackupEnvironment = rpcenv.as_ref();
272
273 if !file_name.ends_with(".blob") {
274 bail!("wrong blob file extension: '{}'", file_name);
275 }
276
277 let data = req_body
278 .map_err(Error::from)
279 .try_fold(Vec::new(), |mut acc, chunk| {
280 acc.extend_from_slice(&*chunk);
281 future::ok::<_, Error>(acc)
282 })
283 .await?;
284
285 if encoded_size != data.len() {
286 bail!("got blob with unexpected length ({} != {})", encoded_size, data.len());
287 }
288
289 env.add_blob(&file_name, data)?;
290
291 Ok(env.format_response(Ok(Value::Null)))
292 }.boxed()
293 }