]>
Commit | Line | Data |
---|---|---|
76220055 | 1 | use std::pin::Pin; |
21ee7912 | 2 | use std::sync::Arc; |
76220055 | 3 | use std::task::{Context, Poll}; |
21ee7912 | 4 | |
f7d4e4b5 | 5 | use anyhow::{bail, format_err, Error}; |
76220055 | 6 | use futures::*; |
21ee7912 | 7 | use hyper::Body; |
76220055 | 8 | use hyper::http::request::Parts; |
21ee7912 DM |
9 | use serde_json::{json, Value}; |
10 | ||
552c2259 | 11 | use proxmox::{sortable, identity}; |
bb084b9c | 12 | use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, RpcEnvironment}; |
a2479cfa | 13 | use proxmox::api::schema::*; |
552c2259 | 14 | |
76220055 | 15 | use crate::api2::types::*; |
76220055 WB |
16 | use crate::backup::*; |
17 | use crate::tools; | |
21ee7912 DM |
18 | |
19 | use super::environment::*; | |
20 | ||
21 | pub struct UploadChunk { | |
22 | stream: Body, | |
23 | store: Arc<DataStore>, | |
f98ac774 | 24 | digest: [u8; 32], |
a09c0e38 | 25 | size: u32, |
f98ac774 DM |
26 | encoded_size: u32, |
27 | raw_data: Option<Vec<u8>>, | |
21ee7912 DM |
28 | } |
29 | ||
30 | impl UploadChunk { | |
f98ac774 DM |
31 | pub fn new(stream: Body, store: Arc<DataStore>, digest: [u8; 32], size: u32, encoded_size: u32) -> Self { |
32 | Self { stream, store, size, encoded_size, raw_data: Some(vec![]), digest } | |
21ee7912 DM |
33 | } |
34 | } | |
35 | ||
36 | impl Future for UploadChunk { | |
76220055 WB |
37 | type Output = Result<([u8; 32], u32, u32, bool), Error>; |
38 | ||
39 | fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { | |
40 | let this = self.get_mut(); | |
41 | ||
42 | let err: Error = loop { | |
43 | match ready!(Pin::new(&mut this.stream).poll_next(cx)) { | |
44 | Some(Err(err)) => return Poll::Ready(Err(Error::from(err))), | |
45 | Some(Ok(input)) => { | |
46 | if let Some(ref mut raw_data) = this.raw_data { | |
47 | if (raw_data.len() + input.len()) > (this.encoded_size as usize) { | |
48 | break format_err!("uploaded chunk is larger than announced."); | |
f98ac774 DM |
49 | } |
50 | raw_data.extend_from_slice(&input); | |
51 | } else { | |
76220055 | 52 | break format_err!("poll upload chunk stream failed - already finished."); |
21ee7912 | 53 | } |
21ee7912 DM |
54 | } |
55 | None => { | |
76220055 WB |
56 | if let Some(raw_data) = this.raw_data.take() { |
57 | if raw_data.len() != (this.encoded_size as usize) { | |
58 | break format_err!("uploaded chunk has unexpected size."); | |
f98ac774 DM |
59 | } |
60 | ||
9ea4bce4 | 61 | let (is_duplicate, compressed_size) = match proxmox::try_block! { |
4ee8f53d | 62 | let mut chunk = DataBlob::from_raw(raw_data)?; |
21ee7912 | 63 | |
4ee8f53d | 64 | chunk.verify_unencrypted(this.size as usize, &this.digest)?; |
fa148dbd | 65 | |
76220055 WB |
66 | // always comput CRC at server side |
67 | chunk.set_crc(chunk.compute_crc()); | |
6f083b7a | 68 | |
4ee8f53d | 69 | this.store.insert_chunk(&chunk, &this.digest) |
76220055 WB |
70 | } { |
71 | Ok(res) => res, | |
72 | Err(err) => break err, | |
73 | }; | |
21ee7912 | 74 | |
76220055 | 75 | return Poll::Ready(Ok((this.digest, this.size, compressed_size as u32, is_duplicate))) |
f98ac774 | 76 | } else { |
76220055 | 77 | break format_err!("poll upload chunk stream failed - already finished."); |
f98ac774 | 78 | } |
21ee7912 DM |
79 | } |
80 | } | |
76220055 WB |
81 | }; |
82 | Poll::Ready(Err(err)) | |
21ee7912 DM |
83 | } |
84 | } | |
85 | ||
552c2259 | 86 | #[sortable] |
255f378a | 87 | pub const API_METHOD_UPLOAD_FIXED_CHUNK: ApiMethod = ApiMethod::new( |
329d40b5 | 88 | &ApiHandler::AsyncHttp(&upload_fixed_chunk), |
255f378a DM |
89 | &ObjectSchema::new( |
90 | "Upload a new chunk.", | |
552c2259 | 91 | &sorted!([ |
255f378a DM |
92 | ("wid", false, &IntegerSchema::new("Fixed writer ID.") |
93 | .minimum(1) | |
94 | .maximum(256) | |
95 | .schema() | |
96 | ), | |
97 | ("digest", false, &CHUNK_DIGEST_SCHEMA), | |
98 | ("size", false, &IntegerSchema::new("Chunk size.") | |
99 | .minimum(1) | |
100 | .maximum(1024*1024*16) | |
101 | .schema() | |
102 | ), | |
103 | ("encoded-size", false, &IntegerSchema::new("Encoded chunk size.") | |
104 | .minimum((std::mem::size_of::<DataBlobHeader>() as isize)+1) | |
105 | .maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataBlobHeader>() as isize)) | |
106 | .schema() | |
107 | ), | |
552c2259 | 108 | ]), |
642322b4 | 109 | ) |
255f378a | 110 | ); |
642322b4 DM |
111 | |
112 | fn upload_fixed_chunk( | |
113 | _parts: Parts, | |
114 | req_body: Body, | |
115 | param: Value, | |
255f378a | 116 | _info: &ApiMethod, |
dd5495d6 | 117 | rpcenv: Box<dyn RpcEnvironment>, |
bb084b9c | 118 | ) -> ApiResponseFuture { |
642322b4 | 119 | |
ad51d02a DM |
120 | async move { |
121 | let wid = tools::required_integer_param(¶m, "wid")? as usize; | |
122 | let size = tools::required_integer_param(¶m, "size")? as u32; | |
123 | let encoded_size = tools::required_integer_param(¶m, "encoded-size")? as u32; | |
f98ac774 | 124 | |
ad51d02a DM |
125 | let digest_str = tools::required_string_param(¶m, "digest")?; |
126 | let digest = proxmox::tools::hex_to_digest(digest_str)?; | |
642322b4 | 127 | |
ad51d02a | 128 | let env: &BackupEnvironment = rpcenv.as_ref(); |
642322b4 | 129 | |
ad51d02a DM |
130 | let (digest, size, compressed_size, is_duplicate) = |
131 | UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?; | |
642322b4 | 132 | |
ad51d02a DM |
133 | env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?; |
134 | let digest_str = proxmox::tools::digest_to_hex(&digest); | |
135 | env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str)); | |
642322b4 | 136 | |
ad51d02a | 137 | let result = Ok(json!(digest_str)); |
642322b4 | 138 | |
ad51d02a DM |
139 | Ok(env.format_response(result)) |
140 | } | |
141 | .boxed() | |
642322b4 DM |
142 | } |
143 | ||
552c2259 | 144 | #[sortable] |
255f378a | 145 | pub const API_METHOD_UPLOAD_DYNAMIC_CHUNK: ApiMethod = ApiMethod::new( |
329d40b5 | 146 | &ApiHandler::AsyncHttp(&upload_dynamic_chunk), |
255f378a DM |
147 | &ObjectSchema::new( |
148 | "Upload a new chunk.", | |
552c2259 | 149 | &sorted!([ |
255f378a DM |
150 | ("wid", false, &IntegerSchema::new("Dynamic writer ID.") |
151 | .minimum(1) | |
152 | .maximum(256) | |
153 | .schema() | |
154 | ), | |
155 | ("digest", false, &CHUNK_DIGEST_SCHEMA), | |
156 | ("size", false, &IntegerSchema::new("Chunk size.") | |
157 | .minimum(1) | |
158 | .maximum(1024*1024*16) | |
159 | .schema() | |
160 | ), | |
161 | ("encoded-size", false, &IntegerSchema::new("Encoded chunk size.") | |
162 | .minimum((std::mem::size_of::<DataBlobHeader>() as isize) +1) | |
163 | .maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataBlobHeader>() as isize)) | |
164 | .schema() | |
165 | ), | |
552c2259 | 166 | ]), |
21ee7912 | 167 | ) |
255f378a | 168 | ); |
21ee7912 | 169 | |
642322b4 | 170 | fn upload_dynamic_chunk( |
21ee7912 DM |
171 | _parts: Parts, |
172 | req_body: Body, | |
173 | param: Value, | |
255f378a | 174 | _info: &ApiMethod, |
dd5495d6 | 175 | rpcenv: Box<dyn RpcEnvironment>, |
bb084b9c | 176 | ) -> ApiResponseFuture { |
f98ac774 | 177 | |
ad51d02a DM |
178 | async move { |
179 | let wid = tools::required_integer_param(¶m, "wid")? as usize; | |
180 | let size = tools::required_integer_param(¶m, "size")? as u32; | |
181 | let encoded_size = tools::required_integer_param(¶m, "encoded-size")? as u32; | |
f9578f3c | 182 | |
ad51d02a DM |
183 | let digest_str = tools::required_string_param(¶m, "digest")?; |
184 | let digest = proxmox::tools::hex_to_digest(digest_str)?; | |
21ee7912 | 185 | |
ad51d02a | 186 | let env: &BackupEnvironment = rpcenv.as_ref(); |
21ee7912 | 187 | |
ad51d02a DM |
188 | let (digest, size, compressed_size, is_duplicate) = |
189 | UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size) | |
190 | .await?; | |
f9578f3c | 191 | |
ad51d02a DM |
192 | env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?; |
193 | let digest_str = proxmox::tools::digest_to_hex(&digest); | |
194 | env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str)); | |
f9578f3c | 195 | |
ad51d02a DM |
196 | let result = Ok(json!(digest_str)); |
197 | Ok(env.format_response(result)) | |
198 | }.boxed() | |
21ee7912 | 199 | } |
adec8ea2 | 200 | |
255f378a | 201 | pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new( |
329d40b5 | 202 | &ApiHandler::AsyncHttp(&upload_speedtest), |
255f378a DM |
203 | &ObjectSchema::new("Test upload speed.", &[]) |
204 | ); | |
adec8ea2 DM |
205 | |
206 | fn upload_speedtest( | |
207 | _parts: Parts, | |
208 | req_body: Body, | |
82ab7230 | 209 | _param: Value, |
255f378a | 210 | _info: &ApiMethod, |
dd5495d6 | 211 | rpcenv: Box<dyn RpcEnvironment>, |
bb084b9c | 212 | ) -> ApiResponseFuture { |
ad51d02a DM |
213 | |
214 | async move { | |
215 | ||
216 | let result = req_body | |
217 | .map_err(Error::from) | |
218 | .try_fold(0, |size: usize, chunk| { | |
219 | let sum = size + chunk.len(); | |
220 | //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum); | |
221 | future::ok::<usize, Error>(sum) | |
222 | }) | |
223 | .await; | |
224 | ||
225 | match result { | |
226 | Ok(size) => { | |
227 | println!("UPLOAD END {} bytes", size); | |
adec8ea2 | 228 | } |
ad51d02a DM |
229 | Err(err) => { |
230 | println!("Upload error: {}", err); | |
231 | } | |
232 | } | |
233 | let env: &BackupEnvironment = rpcenv.as_ref(); | |
234 | Ok(env.format_response(Ok(Value::Null))) | |
235 | }.boxed() | |
adec8ea2 | 236 | } |
39d6846e | 237 | |
552c2259 | 238 | #[sortable] |
255f378a | 239 | pub const API_METHOD_UPLOAD_BLOB: ApiMethod = ApiMethod::new( |
329d40b5 | 240 | &ApiHandler::AsyncHttp(&upload_blob), |
255f378a DM |
241 | &ObjectSchema::new( |
242 | "Upload binary blob file.", | |
552c2259 | 243 | &sorted!([ |
255f378a DM |
244 | ("file-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA), |
245 | ("encoded-size", false, &IntegerSchema::new("Encoded blob size.") | |
246 | .minimum((std::mem::size_of::<DataBlobHeader>() as isize) +1) | |
247 | .maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataBlobHeader>() as isize)) | |
248 | .schema() | |
39d6846e | 249 | ) |
552c2259 | 250 | ]), |
39d6846e | 251 | ) |
255f378a | 252 | ); |
39d6846e | 253 | |
cb08ac3e | 254 | fn upload_blob( |
39d6846e DM |
255 | _parts: Parts, |
256 | req_body: Body, | |
257 | param: Value, | |
255f378a | 258 | _info: &ApiMethod, |
dd5495d6 | 259 | rpcenv: Box<dyn RpcEnvironment>, |
bb084b9c | 260 | ) -> ApiResponseFuture { |
39d6846e | 261 | |
ad51d02a DM |
262 | async move { |
263 | let file_name = tools::required_string_param(¶m, "file-name")?.to_owned(); | |
264 | let encoded_size = tools::required_integer_param(¶m, "encoded-size")? as usize; | |
39d6846e | 265 | |
ad51d02a | 266 | let env: &BackupEnvironment = rpcenv.as_ref(); |
39d6846e | 267 | |
ad51d02a DM |
268 | if !file_name.ends_with(".blob") { |
269 | bail!("wrong blob file extension: '{}'", file_name); | |
270 | } | |
39d6846e | 271 | |
ad51d02a DM |
272 | let data = req_body |
273 | .map_err(Error::from) | |
274 | .try_fold(Vec::new(), |mut acc, chunk| { | |
275 | acc.extend_from_slice(&*chunk); | |
276 | future::ok::<_, Error>(acc) | |
277 | }) | |
278 | .await?; | |
39d6846e | 279 | |
ad51d02a DM |
280 | if encoded_size != data.len() { |
281 | bail!("got blob with unexpected length ({} != {})", encoded_size, data.len()); | |
282 | } | |
39d6846e | 283 | |
ad51d02a | 284 | env.add_blob(&file_name, data)?; |
39d6846e | 285 | |
ad51d02a DM |
286 | Ok(env.format_response(Ok(Value::Null))) |
287 | }.boxed() | |
39d6846e | 288 | } |