]>
Commit | Line | Data |
---|---|---|
152764ec DM |
1 | use failure::*; |
2 | ||
72375ce6 | 3 | use std::sync::Arc; |
92ac375a DM |
4 | |
5 | use futures::*; | |
152764ec | 6 | use hyper::header::{HeaderValue, UPGRADE}; |
bd1507c4 | 7 | use hyper::{Body, Response, StatusCode}; |
152764ec | 8 | use hyper::http::request::Parts; |
0aadd40b | 9 | use chrono::{Local, TimeZone}; |
152764ec | 10 | |
f9578f3c | 11 | use serde_json::{json, Value}; |
152764ec | 12 | |
92ac375a | 13 | use crate::tools; |
d3611366 | 14 | use crate::tools::wrapped_reader_stream::*; |
152764ec DM |
15 | use crate::api_schema::router::*; |
16 | use crate::api_schema::*; | |
58c8d7d9 | 17 | use crate::server::WorkerTask; |
21ee7912 | 18 | use crate::backup::*; |
152764ec | 19 | |
d95ced64 DM |
20 | mod environment; |
21 | use environment::*; | |
22 | ||
bd1507c4 DM |
23 | mod service; |
24 | use service::*; | |
25 | ||
21ee7912 DM |
26 | mod upload_chunk; |
27 | use upload_chunk::*; | |
28 | ||
29 | ||
ca60c371 | 30 | pub fn api_method_upgrade_backup() -> ApiAsyncMethod { |
152764ec | 31 | ApiAsyncMethod::new( |
0aadd40b | 32 | upgrade_to_backup_protocol, |
ca60c371 | 33 | ObjectSchema::new("Upgraded to backup protocol.") |
0aadd40b DM |
34 | .required("store", StringSchema::new("Datastore name.")) |
35 | .required("backup-type", StringSchema::new("Backup type.") | |
36 | .format(Arc::new(ApiStringFormat::Enum(vec!["vm".into(), "ct".into(), "host".into()])))) | |
37 | .required("backup-id", StringSchema::new("Backup ID.")) | |
152764ec DM |
38 | ) |
39 | } | |
40 | ||
0aadd40b | 41 | fn upgrade_to_backup_protocol( |
152764ec DM |
42 | parts: Parts, |
43 | req_body: Body, | |
0aadd40b | 44 | param: Value, |
152764ec | 45 | _info: &ApiAsyncMethod, |
b4b63e52 | 46 | rpcenv: Box<RpcEnvironment>, |
152764ec | 47 | ) -> Result<BoxFut, Error> { |
0aadd40b | 48 | |
bd1507c4 DM |
49 | static PROXMOX_BACKUP_PROTOCOL_ID: &str = "proxmox-backup-protocol-h2"; |
50 | ||
bb105f9d DM |
51 | let store = tools::required_string_param(¶m, "store")?.to_owned(); |
52 | let datastore = DataStore::lookup_datastore(&store)?; | |
21ee7912 | 53 | |
0aadd40b DM |
54 | let backup_type = tools::required_string_param(¶m, "backup-type")?; |
55 | let backup_id = tools::required_string_param(¶m, "backup-id")?; | |
f9578f3c | 56 | let backup_time = Local.timestamp(Local::now().timestamp(), 0); |
152764ec DM |
57 | |
58 | let protocols = parts | |
59 | .headers | |
60 | .get("UPGRADE") | |
61 | .ok_or_else(|| format_err!("missing Upgrade header"))? | |
62 | .to_str()?; | |
63 | ||
0aadd40b | 64 | if protocols != PROXMOX_BACKUP_PROTOCOL_ID { |
152764ec DM |
65 | bail!("invalid protocol name"); |
66 | } | |
67 | ||
96e95fc1 DM |
68 | if parts.version >= http::version::Version::HTTP_2 { |
69 | bail!("unexpected http version '{:?}' (expected version < 2)", parts.version); | |
70 | } | |
71 | ||
0aadd40b | 72 | let worker_id = format!("{}_{}_{}", store, backup_type, backup_id); |
d9bd06ea | 73 | |
58c8d7d9 DM |
74 | let username = rpcenv.get_user().unwrap(); |
75 | let env_type = rpcenv.env_type(); | |
92ac375a | 76 | |
51a4f63f | 77 | let backup_group = BackupGroup::new(backup_type, backup_id); |
fbb798f6 | 78 | let last_backup = BackupInfo::last_backup(&datastore.base_path(), &backup_group).unwrap_or(None); |
51a4f63f | 79 | let backup_dir = BackupDir::new_with_group(backup_group, backup_time.timestamp()); |
f9578f3c | 80 | |
bb105f9d | 81 | let (path, is_new) = datastore.create_backup_dir(&backup_dir)?; |
f9578f3c DM |
82 | if !is_new { bail!("backup directorty already exists."); } |
83 | ||
0aadd40b | 84 | WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| { |
bb105f9d | 85 | let mut env = BackupEnvironment::new( |
6b95c7df | 86 | env_type, username.clone(), worker.clone(), datastore, backup_dir); |
b02a52e3 | 87 | |
bb105f9d | 88 | env.last_backup = last_backup; |
b02a52e3 | 89 | |
bb105f9d DM |
90 | env.log(format!("starting new backup on datastore '{}': {:?}", store, path)); |
91 | ||
372724af | 92 | let service = BackupService::new(env.clone(), worker.clone()); |
72375ce6 | 93 | |
a66ab8ae DM |
94 | let abort_future = worker.abort_future(); |
95 | ||
372724af | 96 | let env2 = env.clone(); |
bb105f9d | 97 | |
152764ec DM |
98 | req_body |
99 | .on_upgrade() | |
92ac375a | 100 | .map_err(Error::from) |
152764ec | 101 | .and_then(move |conn| { |
d9bd06ea | 102 | worker.log("upgrade done"); |
92ac375a DM |
103 | |
104 | let mut http = hyper::server::conn::Http::new(); | |
105 | http.http2_only(true); | |
adec8ea2 DM |
106 | // increase window size: todo - find optiomal size |
107 | http.http2_initial_stream_window_size( (1 << 31) - 2); | |
108 | http.http2_initial_connection_window_size( (1 << 31) - 2); | |
92ac375a | 109 | |
d9bd06ea DM |
110 | http.serve_connection(conn, service) |
111 | .map_err(Error::from) | |
0aadd40b | 112 | }) |
a66ab8ae | 113 | .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); })) |
372724af | 114 | .map_err(|(err, _)| err) |
bb105f9d | 115 | .and_then(move |(_result, _)| { |
372724af DM |
116 | env.ensure_finished()?; |
117 | env.log("backup finished sucessfully"); | |
bb105f9d DM |
118 | Ok(()) |
119 | }) | |
372724af DM |
120 | .then(move |result| { |
121 | if let Err(err) = result { | |
122 | env2.log(format!("backup failed: {}", err)); | |
123 | env2.log("removing failed backup"); | |
124 | env2.remove_backup()?; | |
125 | return Err(err); | |
126 | } | |
127 | Ok(()) | |
bb105f9d | 128 | }) |
090ac9f7 DM |
129 | })?; |
130 | ||
131 | let response = Response::builder() | |
132 | .status(StatusCode::SWITCHING_PROTOCOLS) | |
0aadd40b | 133 | .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID)) |
090ac9f7 DM |
134 | .body(Body::empty())?; |
135 | ||
136 | Ok(Box::new(futures::future::ok(response))) | |
152764ec | 137 | } |
92ac375a DM |
138 | |
139 | fn backup_api() -> Router { | |
140 | ||
141 | let test1 = Router::new() | |
142 | .get( | |
143 | ApiMethod::new( | |
144 | test1_get, | |
52cf506e DM |
145 | ObjectSchema::new("Test sync callback.") |
146 | ) | |
147 | ); | |
148 | ||
149 | let test2 = Router::new() | |
150 | .download( | |
151 | ApiAsyncMethod::new( | |
152 | test2_get, | |
153 | ObjectSchema::new("Test async callback.") | |
92ac375a DM |
154 | ) |
155 | ); | |
156 | ||
157 | let router = Router::new() | |
f9578f3c DM |
158 | .subdir( |
159 | "dynamic_chunk", Router::new() | |
160 | .upload(api_method_upload_dynamic_chunk()) | |
161 | ) | |
162 | .subdir( | |
163 | "dynamic_index", Router::new() | |
d3611366 | 164 | .download(api_method_dynamic_chunk_index()) |
f9578f3c | 165 | .post(api_method_create_dynamic_index()) |
82ab7230 | 166 | .put(api_method_dynamic_append()) |
f9578f3c | 167 | ) |
a2077252 DM |
168 | .subdir( |
169 | "dynamic_close", Router::new() | |
170 | .post(api_method_close_dynamic_index()) | |
171 | ) | |
372724af DM |
172 | .subdir( |
173 | "finish", Router::new() | |
174 | .get( | |
175 | ApiMethod::new( | |
176 | finish_backup, | |
177 | ObjectSchema::new("Mark backup as finished.") | |
178 | ) | |
179 | ) | |
180 | ) | |
adec8ea2 DM |
181 | .subdir( |
182 | "speedtest", Router::new() | |
183 | .upload(api_method_upload_speedtest()) | |
184 | ) | |
92ac375a | 185 | .subdir("test1", test1) |
52cf506e | 186 | .subdir("test2", test2) |
92ac375a DM |
187 | .list_subdirs(); |
188 | ||
189 | router | |
190 | } | |
191 | ||
d3611366 DM |
192 | pub fn api_method_dynamic_chunk_index() -> ApiAsyncMethod { |
193 | ApiAsyncMethod::new( | |
194 | dynamic_chunk_index, | |
195 | ObjectSchema::new(r###" | |
196 | Download the dynamic chunk index from the previous backup. | |
197 | Simply returns an empty list if this is the first backup. | |
198 | "### | |
199 | ) | |
4e93f8c1 | 200 | .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone()) |
d3611366 DM |
201 | ) |
202 | } | |
203 | ||
f9578f3c DM |
204 | pub fn api_method_create_dynamic_index() -> ApiMethod { |
205 | ApiMethod::new( | |
206 | create_dynamic_index, | |
207 | ObjectSchema::new("Create dynamic chunk index file.") | |
4e93f8c1 | 208 | .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone()) |
f9578f3c DM |
209 | ) |
210 | } | |
211 | ||
212 | fn create_dynamic_index( | |
213 | param: Value, | |
214 | _info: &ApiMethod, | |
215 | rpcenv: &mut RpcEnvironment, | |
216 | ) -> Result<Value, Error> { | |
217 | ||
218 | let env: &BackupEnvironment = rpcenv.as_ref(); | |
f9578f3c DM |
219 | |
220 | let mut archive_name = tools::required_string_param(¶m, "archive-name")?.to_owned(); | |
221 | ||
222 | if !archive_name.ends_with(".pxar") { | |
223 | bail!("wrong archive extension"); | |
224 | } else { | |
225 | archive_name.push_str(".didx"); | |
226 | } | |
227 | ||
6b95c7df | 228 | let mut path = env.backup_dir.relative_path(); |
f9578f3c DM |
229 | path.push(archive_name); |
230 | ||
231 | let chunk_size = 4096*1024; // todo: ?? | |
232 | ||
bb105f9d | 233 | let index = env.datastore.create_dynamic_writer(&path, chunk_size)?; |
372724af | 234 | let wid = env.register_dynamic_writer(index)?; |
f9578f3c | 235 | |
bb105f9d | 236 | env.log(format!("created new dynamic index {} ({:?})", wid, path)); |
f9578f3c | 237 | |
bb105f9d | 238 | Ok(json!(wid)) |
f9578f3c DM |
239 | } |
240 | ||
82ab7230 DM |
241 | pub fn api_method_dynamic_append() -> ApiMethod { |
242 | ApiMethod::new( | |
243 | dynamic_append, | |
244 | ObjectSchema::new("Append chunk to dynamic index writer.") | |
245 | .required("digest", StringSchema::new("Chunk digest.")) | |
246 | .required("wid", IntegerSchema::new("Dynamic writer ID.") | |
247 | .minimum(1) | |
248 | .maximum(256) | |
249 | ) | |
250 | ) | |
251 | } | |
252 | ||
253 | fn dynamic_append ( | |
254 | param: Value, | |
255 | _info: &ApiMethod, | |
256 | rpcenv: &mut RpcEnvironment, | |
257 | ) -> Result<Value, Error> { | |
258 | ||
259 | let wid = tools::required_integer_param(¶m, "wid")? as usize; | |
260 | let digest_str = tools::required_string_param(¶m, "digest")?; | |
261 | ||
262 | let env: &BackupEnvironment = rpcenv.as_ref(); | |
263 | ||
a09c0e38 DM |
264 | let digest = crate::tools::hex_to_digest(digest_str)?; |
265 | let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk"))?; | |
82ab7230 | 266 | |
a09c0e38 | 267 | env.dynamic_writer_append_chunk(wid, size, &digest)?; |
82ab7230 DM |
268 | |
269 | env.log(format!("sucessfully added chunk {} to dynamic index {}", digest_str, wid)); | |
270 | ||
271 | Ok(Value::Null) | |
272 | } | |
273 | ||
a2077252 DM |
274 | pub fn api_method_close_dynamic_index() -> ApiMethod { |
275 | ApiMethod::new( | |
276 | close_dynamic_index, | |
277 | ObjectSchema::new("Close dynamic index writer.") | |
278 | .required("wid", IntegerSchema::new("Dynamic writer ID.") | |
279 | .minimum(1) | |
280 | .maximum(256) | |
281 | ) | |
282 | ) | |
283 | } | |
284 | ||
285 | fn close_dynamic_index ( | |
286 | param: Value, | |
287 | _info: &ApiMethod, | |
288 | rpcenv: &mut RpcEnvironment, | |
289 | ) -> Result<Value, Error> { | |
290 | ||
291 | let wid = tools::required_integer_param(¶m, "wid")? as usize; | |
292 | ||
293 | let env: &BackupEnvironment = rpcenv.as_ref(); | |
294 | ||
295 | env.dynamic_writer_close(wid)?; | |
296 | ||
bb105f9d DM |
297 | env.log(format!("sucessfully closed dynamic index {}", wid)); |
298 | ||
a2077252 DM |
299 | Ok(Value::Null) |
300 | } | |
301 | ||
302 | ||
372724af DM |
303 | fn finish_backup ( |
304 | _param: Value, | |
305 | _info: &ApiMethod, | |
306 | rpcenv: &mut RpcEnvironment, | |
307 | ) -> Result<Value, Error> { | |
308 | ||
309 | let env: &BackupEnvironment = rpcenv.as_ref(); | |
310 | ||
311 | env.finish_backup()?; | |
312 | ||
313 | Ok(Value::Null) | |
314 | } | |
a2077252 | 315 | |
92ac375a DM |
316 | fn test1_get ( |
317 | _param: Value, | |
318 | _info: &ApiMethod, | |
58c8d7d9 | 319 | rpcenv: &mut RpcEnvironment, |
92ac375a DM |
320 | ) -> Result<Value, Error> { |
321 | ||
58c8d7d9 | 322 | |
21ee7912 | 323 | let env: &BackupEnvironment = rpcenv.as_ref(); |
58c8d7d9 DM |
324 | |
325 | env.log("Inside test1_get()"); | |
326 | ||
92ac375a DM |
327 | Ok(Value::Null) |
328 | } | |
52cf506e | 329 | |
d3611366 DM |
330 | fn dynamic_chunk_index( |
331 | _parts: Parts, | |
332 | _req_body: Body, | |
333 | param: Value, | |
334 | _info: &ApiAsyncMethod, | |
335 | rpcenv: Box<RpcEnvironment>, | |
336 | ) -> Result<BoxFut, Error> { | |
337 | ||
338 | let env: &BackupEnvironment = rpcenv.as_ref(); | |
d3611366 DM |
339 | |
340 | let mut archive_name = tools::required_string_param(¶m, "archive-name")?.to_owned(); | |
341 | ||
342 | if !archive_name.ends_with(".pxar") { | |
343 | bail!("wrong archive extension"); | |
344 | } else { | |
345 | archive_name.push_str(".didx"); | |
346 | } | |
347 | ||
348 | let last_backup = match &env.last_backup { | |
349 | Some(info) => info, | |
350 | None => { | |
351 | let response = Response::builder() | |
352 | .status(StatusCode::OK) | |
353 | .body(Body::empty())?; | |
354 | return Ok(Box::new(future::ok(response))); | |
355 | } | |
356 | }; | |
357 | ||
358 | let mut path = last_backup.backup_dir.relative_path(); | |
359 | path.push(archive_name); | |
360 | ||
361 | let index = env.datastore.open_dynamic_reader(path)?; | |
362 | // fixme: register index so that client can refer to it by ID | |
363 | ||
364 | let reader = ChunkListReader::new(Box::new(index)); | |
365 | ||
366 | let stream = WrappedReaderStream::new(reader); | |
367 | ||
368 | // fixme: set size, content type? | |
369 | let response = http::Response::builder() | |
370 | .status(200) | |
371 | .body(Body::wrap_stream(stream))?; | |
372 | ||
373 | Ok(Box::new(future::ok(response))) | |
374 | } | |
375 | ||
52cf506e | 376 | fn test2_get( |
d9bd06ea DM |
377 | _parts: Parts, |
378 | _req_body: Body, | |
379 | _param: Value, | |
52cf506e | 380 | _info: &ApiAsyncMethod, |
b4b63e52 | 381 | _rpcenv: Box<RpcEnvironment>, |
52cf506e | 382 | ) -> Result<BoxFut, Error> { |
52cf506e DM |
383 | |
384 | let fut = tokio::timer::Interval::new_interval(std::time::Duration::from_millis(300)) | |
385 | .map_err(|err| http_err!(INTERNAL_SERVER_ERROR, format!("tokio timer interval error: {}", err))) | |
a66ab8ae | 386 | .take(50) |
52cf506e DM |
387 | .for_each(|tv| { |
388 | println!("LOOP {:?}", tv); | |
389 | Ok(()) | |
390 | }) | |
391 | .and_then(|_| { | |
392 | println!("TASK DONE"); | |
393 | Ok(Response::builder() | |
394 | .status(StatusCode::OK) | |
090ac9f7 | 395 | .body(Body::empty())?) |
52cf506e DM |
396 | }); |
397 | ||
398 | Ok(Box::new(fut)) | |
399 | } |