]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/admin/datastore/backup.rs
src/api2/admin/datastore/backup/upload_chunk.rs: rename upload_dynamic_chunk into...
[proxmox-backup.git] / src / api2 / admin / datastore / backup.rs
CommitLineData
152764ec
DM
1use failure::*;
2
72375ce6 3use std::sync::Arc;
92ac375a
DM
4
5use futures::*;
152764ec 6use hyper::header::{HeaderValue, UPGRADE};
bd1507c4 7use hyper::{Body, Response, StatusCode};
152764ec 8use hyper::http::request::Parts;
0aadd40b 9use chrono::{Local, TimeZone};
152764ec 10
f9578f3c 11use serde_json::{json, Value};
152764ec 12
92ac375a 13use crate::tools;
d3611366 14use crate::tools::wrapped_reader_stream::*;
152764ec
DM
15use crate::api_schema::router::*;
16use crate::api_schema::*;
58c8d7d9 17use crate::server::WorkerTask;
21ee7912 18use crate::backup::*;
152764ec 19
d95ced64
DM
20mod environment;
21use environment::*;
22
bd1507c4
DM
23mod service;
24use service::*;
25
21ee7912
DM
26mod upload_chunk;
27use upload_chunk::*;
28
29
ca60c371 30pub fn api_method_upgrade_backup() -> ApiAsyncMethod {
152764ec 31 ApiAsyncMethod::new(
0aadd40b 32 upgrade_to_backup_protocol,
ca60c371 33 ObjectSchema::new("Upgraded to backup protocol.")
0aadd40b
DM
34 .required("store", StringSchema::new("Datastore name."))
35 .required("backup-type", StringSchema::new("Backup type.")
cc84a830 36 .format(Arc::new(ApiStringFormat::Enum(&["vm", "ct", "host"]))))
0aadd40b 37 .required("backup-id", StringSchema::new("Backup ID."))
152764ec
DM
38 )
39}
40
0aadd40b 41fn upgrade_to_backup_protocol(
152764ec
DM
42 parts: Parts,
43 req_body: Body,
0aadd40b 44 param: Value,
152764ec 45 _info: &ApiAsyncMethod,
b4b63e52 46 rpcenv: Box<RpcEnvironment>,
152764ec 47) -> Result<BoxFut, Error> {
0aadd40b 48
bd1507c4
DM
49 static PROXMOX_BACKUP_PROTOCOL_ID: &str = "proxmox-backup-protocol-h2";
50
bb105f9d
DM
51 let store = tools::required_string_param(&param, "store")?.to_owned();
52 let datastore = DataStore::lookup_datastore(&store)?;
21ee7912 53
0aadd40b
DM
54 let backup_type = tools::required_string_param(&param, "backup-type")?;
55 let backup_id = tools::required_string_param(&param, "backup-id")?;
f9578f3c 56 let backup_time = Local.timestamp(Local::now().timestamp(), 0);
152764ec
DM
57
58 let protocols = parts
59 .headers
60 .get("UPGRADE")
61 .ok_or_else(|| format_err!("missing Upgrade header"))?
62 .to_str()?;
63
0aadd40b 64 if protocols != PROXMOX_BACKUP_PROTOCOL_ID {
152764ec
DM
65 bail!("invalid protocol name");
66 }
67
96e95fc1
DM
68 if parts.version >= http::version::Version::HTTP_2 {
69 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
70 }
71
0aadd40b 72 let worker_id = format!("{}_{}_{}", store, backup_type, backup_id);
d9bd06ea 73
58c8d7d9
DM
74 let username = rpcenv.get_user().unwrap();
75 let env_type = rpcenv.env_type();
92ac375a 76
51a4f63f 77 let backup_group = BackupGroup::new(backup_type, backup_id);
fbb798f6 78 let last_backup = BackupInfo::last_backup(&datastore.base_path(), &backup_group).unwrap_or(None);
51a4f63f 79 let backup_dir = BackupDir::new_with_group(backup_group, backup_time.timestamp());
f9578f3c 80
bb105f9d 81 let (path, is_new) = datastore.create_backup_dir(&backup_dir)?;
f9578f3c
DM
82 if !is_new { bail!("backup directorty already exists."); }
83
0aadd40b 84 WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| {
bb105f9d 85 let mut env = BackupEnvironment::new(
6b95c7df 86 env_type, username.clone(), worker.clone(), datastore, backup_dir);
b02a52e3 87
bb105f9d 88 env.last_backup = last_backup;
b02a52e3 89
bb105f9d
DM
90 env.log(format!("starting new backup on datastore '{}': {:?}", store, path));
91
372724af 92 let service = BackupService::new(env.clone(), worker.clone());
72375ce6 93
a66ab8ae
DM
94 let abort_future = worker.abort_future();
95
372724af 96 let env2 = env.clone();
bb105f9d 97
152764ec
DM
98 req_body
99 .on_upgrade()
92ac375a 100 .map_err(Error::from)
152764ec 101 .and_then(move |conn| {
d9bd06ea 102 worker.log("upgrade done");
92ac375a
DM
103
104 let mut http = hyper::server::conn::Http::new();
105 http.http2_only(true);
adec8ea2 106 // increase window size: todo - find optiomal size
771953f9
DM
107 let window_size = 32*1024*1024; // max = (1 << 31) - 2
108 http.http2_initial_stream_window_size(window_size);
109 http.http2_initial_connection_window_size(window_size);
92ac375a 110
d9bd06ea
DM
111 http.serve_connection(conn, service)
112 .map_err(Error::from)
0aadd40b 113 })
a66ab8ae 114 .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); }))
372724af 115 .map_err(|(err, _)| err)
bb105f9d 116 .and_then(move |(_result, _)| {
372724af
DM
117 env.ensure_finished()?;
118 env.log("backup finished sucessfully");
bb105f9d
DM
119 Ok(())
120 })
372724af
DM
121 .then(move |result| {
122 if let Err(err) = result {
a9584932
DM
123 match env2.ensure_finished() {
124 Ok(()) => {}, // ignorte error after finish
125 _ => {
126 env2.log(format!("backup failed: {}", err));
127 env2.log("removing failed backup");
128 env2.remove_backup()?;
129 return Err(err);
130 }
131 }
372724af
DM
132 }
133 Ok(())
bb105f9d 134 })
090ac9f7
DM
135 })?;
136
137 let response = Response::builder()
138 .status(StatusCode::SWITCHING_PROTOCOLS)
0aadd40b 139 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID))
090ac9f7
DM
140 .body(Body::empty())?;
141
142 Ok(Box::new(futures::future::ok(response)))
152764ec 143}
92ac375a
DM
144
145fn backup_api() -> Router {
146
92ac375a 147 let router = Router::new()
f9578f3c 148 .subdir(
a1e7cff3
DM
149 "upload_chunk", Router::new()
150 .upload(api_method_upload_chunk())
f9578f3c
DM
151 )
152 .subdir(
153 "dynamic_index", Router::new()
d3611366 154 .download(api_method_dynamic_chunk_index())
f9578f3c 155 .post(api_method_create_dynamic_index())
82ab7230 156 .put(api_method_dynamic_append())
f9578f3c 157 )
a2077252
DM
158 .subdir(
159 "dynamic_close", Router::new()
160 .post(api_method_close_dynamic_index())
161 )
372724af
DM
162 .subdir(
163 "finish", Router::new()
a55fcd74 164 .post(
372724af
DM
165 ApiMethod::new(
166 finish_backup,
167 ObjectSchema::new("Mark backup as finished.")
168 )
169 )
170 )
adec8ea2
DM
171 .subdir(
172 "speedtest", Router::new()
173 .upload(api_method_upload_speedtest())
174 )
92ac375a
DM
175 .list_subdirs();
176
177 router
178}
179
d3611366
DM
/// Builds the async API method descriptor for downloading the dynamic
/// chunk index of the previous backup (handler: `dynamic_chunk_index`).
pub fn api_method_dynamic_chunk_index() -> ApiAsyncMethod {
    ApiAsyncMethod::new(
        dynamic_chunk_index,
        ObjectSchema::new(r###"
Download the dynamic chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"###
        )
        .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
    )
}
191
f9578f3c
DM
192pub fn api_method_create_dynamic_index() -> ApiMethod {
193 ApiMethod::new(
194 create_dynamic_index,
195 ObjectSchema::new("Create dynamic chunk index file.")
4e93f8c1 196 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
f9578f3c
DM
197 )
198}
199
200fn create_dynamic_index(
201 param: Value,
202 _info: &ApiMethod,
203 rpcenv: &mut RpcEnvironment,
204) -> Result<Value, Error> {
205
206 let env: &BackupEnvironment = rpcenv.as_ref();
f9578f3c 207
8bea85b4 208 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
f9578f3c 209
8bea85b4 210 let mut archive_name = name.clone();
f9578f3c
DM
211 if !archive_name.ends_with(".pxar") {
212 bail!("wrong archive extension");
213 } else {
214 archive_name.push_str(".didx");
215 }
216
6b95c7df 217 let mut path = env.backup_dir.relative_path();
f9578f3c
DM
218 path.push(archive_name);
219
220 let chunk_size = 4096*1024; // todo: ??
221
bb105f9d 222 let index = env.datastore.create_dynamic_writer(&path, chunk_size)?;
8bea85b4 223 let wid = env.register_dynamic_writer(index, name)?;
f9578f3c 224
bb105f9d 225 env.log(format!("created new dynamic index {} ({:?})", wid, path));
f9578f3c 226
bb105f9d 227 Ok(json!(wid))
f9578f3c
DM
228}
229
82ab7230
DM
230pub fn api_method_dynamic_append() -> ApiMethod {
231 ApiMethod::new(
232 dynamic_append,
233 ObjectSchema::new("Append chunk to dynamic index writer.")
417cb073
DM
234 .required("wid", IntegerSchema::new("Dynamic writer ID.")
235 .minimum(1)
236 .maximum(256)
237 )
aa1b2e04
DM
238 .required("digest-list", ArraySchema::new(
239 "Chunk digest list.",
240 StringSchema::new("Chunk digest.").into())
241 )
417cb073
DM
242 .required("offset-list", ArraySchema::new(
243 "Chunk offset list.",
244 IntegerSchema::new("Corresponding chunk offsets.")
245 .minimum(0)
246 .into())
82ab7230
DM
247 )
248 )
249}
250
251fn dynamic_append (
252 param: Value,
253 _info: &ApiMethod,
254 rpcenv: &mut RpcEnvironment,
255) -> Result<Value, Error> {
256
257 let wid = tools::required_integer_param(&param, "wid")? as usize;
aa1b2e04 258 let digest_list = tools::required_array_param(&param, "digest-list")?;
417cb073 259 let offset_list = tools::required_array_param(&param, "offset-list")?;
aa1b2e04
DM
260
261 println!("DIGEST LIST LEN {}", digest_list.len());
82ab7230 262
417cb073
DM
263 if offset_list.len() != digest_list.len() {
264 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
265 }
266
82ab7230
DM
267 let env: &BackupEnvironment = rpcenv.as_ref();
268
417cb073 269 for (i, item) in digest_list.iter().enumerate() {
aa1b2e04
DM
270 let digest_str = item.as_str().unwrap();
271 let digest = crate::tools::hex_to_digest(digest_str)?;
417cb073 272 let offset = offset_list[i].as_u64().unwrap();
aa1b2e04 273 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
417cb073 274 env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;
82ab7230 275
aa1b2e04
DM
276 env.log(format!("sucessfully added chunk {} to dynamic index {}", digest_str, wid));
277 }
82ab7230
DM
278
279 Ok(Value::Null)
280}
281
a2077252
DM
282pub fn api_method_close_dynamic_index() -> ApiMethod {
283 ApiMethod::new(
284 close_dynamic_index,
285 ObjectSchema::new("Close dynamic index writer.")
286 .required("wid", IntegerSchema::new("Dynamic writer ID.")
287 .minimum(1)
288 .maximum(256)
289 )
8bea85b4
DM
290 .required("chunk-count", IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
291 .minimum(1)
292 )
293 .required("size", IntegerSchema::new("File size. This is used to verify that the server got all data.")
294 .minimum(1)
295 )
a2077252
DM
296 )
297}
298
299fn close_dynamic_index (
300 param: Value,
301 _info: &ApiMethod,
302 rpcenv: &mut RpcEnvironment,
303) -> Result<Value, Error> {
304
305 let wid = tools::required_integer_param(&param, "wid")? as usize;
8bea85b4
DM
306 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
307 let size = tools::required_integer_param(&param, "size")? as u64;
a2077252
DM
308
309 let env: &BackupEnvironment = rpcenv.as_ref();
310
8bea85b4 311 env.dynamic_writer_close(wid, chunk_count, size)?;
a2077252 312
bb105f9d
DM
313 env.log(format!("sucessfully closed dynamic index {}", wid));
314
a2077252
DM
315 Ok(Value::Null)
316}
317
318
372724af
DM
/// Marks the current backup session as finished in the backup environment.
///
/// Takes no parameters; delegates entirely to
/// `BackupEnvironment::finish_backup`.
fn finish_backup (
    _param: Value,
    _info: &ApiMethod,
    rpcenv: &mut RpcEnvironment,
) -> Result<Value, Error> {

    let env: &BackupEnvironment = rpcenv.as_ref();

    env.finish_backup()?;

    Ok(Value::Null)
}
a2077252 331
d3611366
DM
332fn dynamic_chunk_index(
333 _parts: Parts,
334 _req_body: Body,
335 param: Value,
336 _info: &ApiAsyncMethod,
337 rpcenv: Box<RpcEnvironment>,
338) -> Result<BoxFut, Error> {
339
340 let env: &BackupEnvironment = rpcenv.as_ref();
d3611366 341
a9584932
DM
342 println!("TEST CHUNK DOWNLOAD");
343
d3611366
DM
344 let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
345
346 if !archive_name.ends_with(".pxar") {
347 bail!("wrong archive extension");
348 } else {
349 archive_name.push_str(".didx");
350 }
351
a9584932
DM
352 let empty_response = {
353 Response::builder()
354 .status(StatusCode::OK)
355 .body(Body::empty())?
356 };
357
d3611366
DM
358 let last_backup = match &env.last_backup {
359 Some(info) => info,
a9584932 360 None => return Ok(Box::new(future::ok(empty_response))),
d3611366
DM
361 };
362
363 let mut path = last_backup.backup_dir.relative_path();
a9584932 364 path.push(&archive_name);
d3611366 365
a9584932
DM
366 let index = match env.datastore.open_dynamic_reader(path) {
367 Ok(index) => index,
368 Err(_) => {
369 env.log(format!("there is no last backup for archive '{}'", archive_name));
370 return Ok(Box::new(future::ok(empty_response)));
371 }
372 };
373
374 let count = index.index_count();
375 for pos in 0..count {
376 let (start, end, digest) = index.chunk_info(pos)?;
377 let size = (end - start) as u32;
378 env.register_chunk(digest, size)?;
379 }
d3611366 380
7f3d2ffa 381 let reader = DigestListEncoder::new(Box::new(index));
d3611366
DM
382
383 let stream = WrappedReaderStream::new(reader);
384
385 // fixme: set size, content type?
386 let response = http::Response::builder()
387 .status(200)
388 .body(Body::wrap_stream(stream))?;
389
390 Ok(Box::new(future::ok(response)))
391}