]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/admin/datastore/backup.rs
src/backup/dynamic_index.rs: add chunk_info method
[proxmox-backup.git] / src / api2 / admin / datastore / backup.rs
CommitLineData
152764ec
DM
1use failure::*;
2
72375ce6 3use std::sync::Arc;
92ac375a
DM
4
5use futures::*;
152764ec 6use hyper::header::{HeaderValue, UPGRADE};
bd1507c4 7use hyper::{Body, Response, StatusCode};
152764ec 8use hyper::http::request::Parts;
0aadd40b 9use chrono::{Local, TimeZone};
152764ec 10
f9578f3c 11use serde_json::{json, Value};
152764ec 12
92ac375a 13use crate::tools;
d3611366 14use crate::tools::wrapped_reader_stream::*;
152764ec
DM
15use crate::api_schema::router::*;
16use crate::api_schema::*;
58c8d7d9 17use crate::server::WorkerTask;
21ee7912 18use crate::backup::*;
152764ec 19
d95ced64
DM
20mod environment;
21use environment::*;
22
bd1507c4
DM
23mod service;
24use service::*;
25
21ee7912
DM
26mod upload_chunk;
27use upload_chunk::*;
28
29
ca60c371 30pub fn api_method_upgrade_backup() -> ApiAsyncMethod {
152764ec 31 ApiAsyncMethod::new(
0aadd40b 32 upgrade_to_backup_protocol,
ca60c371 33 ObjectSchema::new("Upgraded to backup protocol.")
0aadd40b
DM
34 .required("store", StringSchema::new("Datastore name."))
35 .required("backup-type", StringSchema::new("Backup type.")
36 .format(Arc::new(ApiStringFormat::Enum(vec!["vm".into(), "ct".into(), "host".into()]))))
37 .required("backup-id", StringSchema::new("Backup ID."))
152764ec
DM
38 )
39}
40
0aadd40b 41fn upgrade_to_backup_protocol(
152764ec
DM
42 parts: Parts,
43 req_body: Body,
0aadd40b 44 param: Value,
152764ec 45 _info: &ApiAsyncMethod,
b4b63e52 46 rpcenv: Box<RpcEnvironment>,
152764ec 47) -> Result<BoxFut, Error> {
0aadd40b 48
bd1507c4
DM
49 static PROXMOX_BACKUP_PROTOCOL_ID: &str = "proxmox-backup-protocol-h2";
50
bb105f9d
DM
51 let store = tools::required_string_param(&param, "store")?.to_owned();
52 let datastore = DataStore::lookup_datastore(&store)?;
21ee7912 53
0aadd40b
DM
54 let backup_type = tools::required_string_param(&param, "backup-type")?;
55 let backup_id = tools::required_string_param(&param, "backup-id")?;
f9578f3c 56 let backup_time = Local.timestamp(Local::now().timestamp(), 0);
152764ec
DM
57
58 let protocols = parts
59 .headers
60 .get("UPGRADE")
61 .ok_or_else(|| format_err!("missing Upgrade header"))?
62 .to_str()?;
63
0aadd40b 64 if protocols != PROXMOX_BACKUP_PROTOCOL_ID {
152764ec
DM
65 bail!("invalid protocol name");
66 }
67
96e95fc1
DM
68 if parts.version >= http::version::Version::HTTP_2 {
69 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
70 }
71
0aadd40b 72 let worker_id = format!("{}_{}_{}", store, backup_type, backup_id);
d9bd06ea 73
58c8d7d9
DM
74 let username = rpcenv.get_user().unwrap();
75 let env_type = rpcenv.env_type();
92ac375a 76
51a4f63f 77 let backup_group = BackupGroup::new(backup_type, backup_id);
fbb798f6 78 let last_backup = BackupInfo::last_backup(&datastore.base_path(), &backup_group).unwrap_or(None);
51a4f63f 79 let backup_dir = BackupDir::new_with_group(backup_group, backup_time.timestamp());
f9578f3c 80
bb105f9d 81 let (path, is_new) = datastore.create_backup_dir(&backup_dir)?;
f9578f3c
DM
82 if !is_new { bail!("backup directorty already exists."); }
83
0aadd40b 84 WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| {
bb105f9d 85 let mut env = BackupEnvironment::new(
6b95c7df 86 env_type, username.clone(), worker.clone(), datastore, backup_dir);
b02a52e3 87
bb105f9d 88 env.last_backup = last_backup;
b02a52e3 89
bb105f9d
DM
90 env.log(format!("starting new backup on datastore '{}': {:?}", store, path));
91
372724af 92 let service = BackupService::new(env.clone(), worker.clone());
72375ce6 93
a66ab8ae
DM
94 let abort_future = worker.abort_future();
95
372724af 96 let env2 = env.clone();
bb105f9d 97
152764ec
DM
98 req_body
99 .on_upgrade()
92ac375a 100 .map_err(Error::from)
152764ec 101 .and_then(move |conn| {
d9bd06ea 102 worker.log("upgrade done");
92ac375a
DM
103
104 let mut http = hyper::server::conn::Http::new();
105 http.http2_only(true);
adec8ea2
DM
106 // increase window size: todo - find optiomal size
107 http.http2_initial_stream_window_size( (1 << 31) - 2);
108 http.http2_initial_connection_window_size( (1 << 31) - 2);
92ac375a 109
d9bd06ea
DM
110 http.serve_connection(conn, service)
111 .map_err(Error::from)
0aadd40b 112 })
a66ab8ae 113 .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); }))
372724af 114 .map_err(|(err, _)| err)
bb105f9d 115 .and_then(move |(_result, _)| {
372724af
DM
116 env.ensure_finished()?;
117 env.log("backup finished sucessfully");
bb105f9d
DM
118 Ok(())
119 })
372724af
DM
120 .then(move |result| {
121 if let Err(err) = result {
122 env2.log(format!("backup failed: {}", err));
123 env2.log("removing failed backup");
124 env2.remove_backup()?;
125 return Err(err);
126 }
127 Ok(())
bb105f9d 128 })
090ac9f7
DM
129 })?;
130
131 let response = Response::builder()
132 .status(StatusCode::SWITCHING_PROTOCOLS)
0aadd40b 133 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID))
090ac9f7
DM
134 .body(Body::empty())?;
135
136 Ok(Box::new(futures::future::ok(response)))
152764ec 137}
92ac375a
DM
138
139fn backup_api() -> Router {
140
141 let test1 = Router::new()
142 .get(
143 ApiMethod::new(
144 test1_get,
52cf506e
DM
145 ObjectSchema::new("Test sync callback.")
146 )
147 );
148
149 let test2 = Router::new()
150 .download(
151 ApiAsyncMethod::new(
152 test2_get,
153 ObjectSchema::new("Test async callback.")
92ac375a
DM
154 )
155 );
156
157 let router = Router::new()
f9578f3c
DM
158 .subdir(
159 "dynamic_chunk", Router::new()
160 .upload(api_method_upload_dynamic_chunk())
161 )
162 .subdir(
163 "dynamic_index", Router::new()
d3611366 164 .download(api_method_dynamic_chunk_index())
f9578f3c 165 .post(api_method_create_dynamic_index())
82ab7230 166 .put(api_method_dynamic_append())
f9578f3c 167 )
a2077252
DM
168 .subdir(
169 "dynamic_close", Router::new()
170 .post(api_method_close_dynamic_index())
171 )
372724af
DM
172 .subdir(
173 "finish", Router::new()
174 .get(
175 ApiMethod::new(
176 finish_backup,
177 ObjectSchema::new("Mark backup as finished.")
178 )
179 )
180 )
adec8ea2
DM
181 .subdir(
182 "speedtest", Router::new()
183 .upload(api_method_upload_speedtest())
184 )
92ac375a 185 .subdir("test1", test1)
52cf506e 186 .subdir("test2", test2)
92ac375a
DM
187 .list_subdirs();
188
189 router
190}
191
d3611366
DM
192pub fn api_method_dynamic_chunk_index() -> ApiAsyncMethod {
193 ApiAsyncMethod::new(
194 dynamic_chunk_index,
195 ObjectSchema::new(r###"
196Download the dynamic chunk index from the previous backup.
197Simply returns an empty list if this is the first backup.
198"###
199 )
4e93f8c1 200 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
d3611366
DM
201 )
202}
203
f9578f3c
DM
204pub fn api_method_create_dynamic_index() -> ApiMethod {
205 ApiMethod::new(
206 create_dynamic_index,
207 ObjectSchema::new("Create dynamic chunk index file.")
4e93f8c1 208 .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
f9578f3c
DM
209 )
210}
211
212fn create_dynamic_index(
213 param: Value,
214 _info: &ApiMethod,
215 rpcenv: &mut RpcEnvironment,
216) -> Result<Value, Error> {
217
218 let env: &BackupEnvironment = rpcenv.as_ref();
f9578f3c
DM
219
220 let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
221
222 if !archive_name.ends_with(".pxar") {
223 bail!("wrong archive extension");
224 } else {
225 archive_name.push_str(".didx");
226 }
227
6b95c7df 228 let mut path = env.backup_dir.relative_path();
f9578f3c
DM
229 path.push(archive_name);
230
231 let chunk_size = 4096*1024; // todo: ??
232
bb105f9d 233 let index = env.datastore.create_dynamic_writer(&path, chunk_size)?;
372724af 234 let wid = env.register_dynamic_writer(index)?;
f9578f3c 235
bb105f9d 236 env.log(format!("created new dynamic index {} ({:?})", wid, path));
f9578f3c 237
bb105f9d 238 Ok(json!(wid))
f9578f3c
DM
239}
240
82ab7230
DM
241pub fn api_method_dynamic_append() -> ApiMethod {
242 ApiMethod::new(
243 dynamic_append,
244 ObjectSchema::new("Append chunk to dynamic index writer.")
245 .required("digest", StringSchema::new("Chunk digest."))
246 .required("wid", IntegerSchema::new("Dynamic writer ID.")
247 .minimum(1)
248 .maximum(256)
249 )
250 )
251}
252
253fn dynamic_append (
254 param: Value,
255 _info: &ApiMethod,
256 rpcenv: &mut RpcEnvironment,
257) -> Result<Value, Error> {
258
259 let wid = tools::required_integer_param(&param, "wid")? as usize;
260 let digest_str = tools::required_string_param(&param, "digest")?;
261
262 let env: &BackupEnvironment = rpcenv.as_ref();
263
a09c0e38
DM
264 let digest = crate::tools::hex_to_digest(digest_str)?;
265 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk"))?;
82ab7230 266
a09c0e38 267 env.dynamic_writer_append_chunk(wid, size, &digest)?;
82ab7230
DM
268
269 env.log(format!("sucessfully added chunk {} to dynamic index {}", digest_str, wid));
270
271 Ok(Value::Null)
272}
273
a2077252
DM
274pub fn api_method_close_dynamic_index() -> ApiMethod {
275 ApiMethod::new(
276 close_dynamic_index,
277 ObjectSchema::new("Close dynamic index writer.")
278 .required("wid", IntegerSchema::new("Dynamic writer ID.")
279 .minimum(1)
280 .maximum(256)
281 )
282 )
283}
284
285fn close_dynamic_index (
286 param: Value,
287 _info: &ApiMethod,
288 rpcenv: &mut RpcEnvironment,
289) -> Result<Value, Error> {
290
291 let wid = tools::required_integer_param(&param, "wid")? as usize;
292
293 let env: &BackupEnvironment = rpcenv.as_ref();
294
295 env.dynamic_writer_close(wid)?;
296
bb105f9d
DM
297 env.log(format!("sucessfully closed dynamic index {}", wid));
298
a2077252
DM
299 Ok(Value::Null)
300}
301
302
372724af
DM
303fn finish_backup (
304 _param: Value,
305 _info: &ApiMethod,
306 rpcenv: &mut RpcEnvironment,
307) -> Result<Value, Error> {
308
309 let env: &BackupEnvironment = rpcenv.as_ref();
310
311 env.finish_backup()?;
312
313 Ok(Value::Null)
314}
a2077252 315
92ac375a
DM
316fn test1_get (
317 _param: Value,
318 _info: &ApiMethod,
58c8d7d9 319 rpcenv: &mut RpcEnvironment,
92ac375a
DM
320) -> Result<Value, Error> {
321
58c8d7d9 322
21ee7912 323 let env: &BackupEnvironment = rpcenv.as_ref();
58c8d7d9
DM
324
325 env.log("Inside test1_get()");
326
92ac375a
DM
327 Ok(Value::Null)
328}
52cf506e 329
d3611366
DM
330fn dynamic_chunk_index(
331 _parts: Parts,
332 _req_body: Body,
333 param: Value,
334 _info: &ApiAsyncMethod,
335 rpcenv: Box<RpcEnvironment>,
336) -> Result<BoxFut, Error> {
337
338 let env: &BackupEnvironment = rpcenv.as_ref();
d3611366
DM
339
340 let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
341
342 if !archive_name.ends_with(".pxar") {
343 bail!("wrong archive extension");
344 } else {
345 archive_name.push_str(".didx");
346 }
347
348 let last_backup = match &env.last_backup {
349 Some(info) => info,
350 None => {
351 let response = Response::builder()
352 .status(StatusCode::OK)
353 .body(Body::empty())?;
354 return Ok(Box::new(future::ok(response)));
355 }
356 };
357
358 let mut path = last_backup.backup_dir.relative_path();
359 path.push(archive_name);
360
361 let index = env.datastore.open_dynamic_reader(path)?;
362 // fixme: register index so that client can refer to it by ID
363
364 let reader = ChunkListReader::new(Box::new(index));
365
366 let stream = WrappedReaderStream::new(reader);
367
368 // fixme: set size, content type?
369 let response = http::Response::builder()
370 .status(200)
371 .body(Body::wrap_stream(stream))?;
372
373 Ok(Box::new(future::ok(response)))
374}
375
52cf506e 376fn test2_get(
d9bd06ea
DM
377 _parts: Parts,
378 _req_body: Body,
379 _param: Value,
52cf506e 380 _info: &ApiAsyncMethod,
b4b63e52 381 _rpcenv: Box<RpcEnvironment>,
52cf506e 382) -> Result<BoxFut, Error> {
52cf506e
DM
383
384 let fut = tokio::timer::Interval::new_interval(std::time::Duration::from_millis(300))
385 .map_err(|err| http_err!(INTERNAL_SERVER_ERROR, format!("tokio timer interval error: {}", err)))
a66ab8ae 386 .take(50)
52cf506e
DM
387 .for_each(|tv| {
388 println!("LOOP {:?}", tv);
389 Ok(())
390 })
391 .and_then(|_| {
392 println!("TASK DONE");
393 Ok(Response::builder()
394 .status(StatusCode::OK)
090ac9f7 395 .body(Body::empty())?)
52cf506e
DM
396 });
397
398 Ok(Box::new(fut))
399}