]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/admin/datastore/backup.rs
src/backup/backup_info.rs: implement list_backup for BackupGroup, cleanups
[proxmox-backup.git] / src / api2 / admin / datastore / backup.rs
CommitLineData
152764ec
DM
1use failure::*;
2
72375ce6 3use std::sync::Arc;
92ac375a
DM
4
5use futures::*;
152764ec 6use hyper::header::{HeaderValue, UPGRADE};
bd1507c4 7use hyper::{Body, Response, StatusCode};
152764ec 8use hyper::http::request::Parts;
0aadd40b 9use chrono::{Local, TimeZone};
152764ec 10
f9578f3c 11use serde_json::{json, Value};
152764ec 12
92ac375a 13use crate::tools;
152764ec
DM
14use crate::api_schema::router::*;
15use crate::api_schema::*;
58c8d7d9 16use crate::server::WorkerTask;
21ee7912 17use crate::backup::*;
152764ec 18
d95ced64
DM
19mod environment;
20use environment::*;
21
bd1507c4
DM
22mod service;
23use service::*;
24
21ee7912
DM
25mod upload_chunk;
26use upload_chunk::*;
27
28
ca60c371 29pub fn api_method_upgrade_backup() -> ApiAsyncMethod {
152764ec 30 ApiAsyncMethod::new(
0aadd40b 31 upgrade_to_backup_protocol,
ca60c371 32 ObjectSchema::new("Upgraded to backup protocol.")
0aadd40b
DM
33 .required("store", StringSchema::new("Datastore name."))
34 .required("backup-type", StringSchema::new("Backup type.")
35 .format(Arc::new(ApiStringFormat::Enum(vec!["vm".into(), "ct".into(), "host".into()]))))
36 .required("backup-id", StringSchema::new("Backup ID."))
152764ec
DM
37 )
38}
39
0aadd40b 40fn upgrade_to_backup_protocol(
152764ec
DM
41 parts: Parts,
42 req_body: Body,
0aadd40b 43 param: Value,
152764ec 44 _info: &ApiAsyncMethod,
b4b63e52 45 rpcenv: Box<RpcEnvironment>,
152764ec 46) -> Result<BoxFut, Error> {
0aadd40b 47
bd1507c4
DM
48 static PROXMOX_BACKUP_PROTOCOL_ID: &str = "proxmox-backup-protocol-h2";
49
0aadd40b 50 let store = tools::required_string_param(&param, "store")?;
21ee7912
DM
51 let datastore = DataStore::lookup_datastore(store)?;
52
0aadd40b
DM
53 let backup_type = tools::required_string_param(&param, "backup-type")?;
54 let backup_id = tools::required_string_param(&param, "backup-id")?;
f9578f3c 55 let backup_time = Local.timestamp(Local::now().timestamp(), 0);
152764ec
DM
56
57 let protocols = parts
58 .headers
59 .get("UPGRADE")
60 .ok_or_else(|| format_err!("missing Upgrade header"))?
61 .to_str()?;
62
0aadd40b 63 if protocols != PROXMOX_BACKUP_PROTOCOL_ID {
152764ec
DM
64 bail!("invalid protocol name");
65 }
66
96e95fc1
DM
67 if parts.version >= http::version::Version::HTTP_2 {
68 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
69 }
70
0aadd40b 71 let worker_id = format!("{}_{}_{}", store, backup_type, backup_id);
d9bd06ea 72
58c8d7d9
DM
73 let username = rpcenv.get_user().unwrap();
74 let env_type = rpcenv.env_type();
92ac375a 75
f9578f3c
DM
76 let backup_dir = BackupDir::new(backup_type, backup_id, backup_time.timestamp());
77
78 let (path, is_new) = datastore.create_backup_dir(&backup_dir)?;
79 if !is_new { bail!("backup directorty already exists."); }
80
0aadd40b 81 WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| {
f9578f3c
DM
82 let backup_env = BackupEnvironment::new(
83 env_type, username.clone(), worker.clone(), datastore, backup_dir, path);
58c8d7d9 84 let service = BackupService::new(backup_env, worker.clone());
72375ce6 85
a66ab8ae
DM
86 let abort_future = worker.abort_future();
87
152764ec
DM
88 req_body
89 .on_upgrade()
92ac375a 90 .map_err(Error::from)
152764ec 91 .and_then(move |conn| {
d9bd06ea 92 worker.log("upgrade done");
92ac375a
DM
93
94 let mut http = hyper::server::conn::Http::new();
95 http.http2_only(true);
96
d9bd06ea
DM
97 http.serve_connection(conn, service)
98 .map_err(Error::from)
0aadd40b 99 })
a66ab8ae 100 .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); }))
e3a44552
DM
101 .and_then(|(result, _)| Ok(result))
102 .map_err(|(err, _)| err)
090ac9f7
DM
103 })?;
104
105 let response = Response::builder()
106 .status(StatusCode::SWITCHING_PROTOCOLS)
0aadd40b 107 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID))
090ac9f7
DM
108 .body(Body::empty())?;
109
110 Ok(Box::new(futures::future::ok(response)))
152764ec 111}
92ac375a
DM
112
113fn backup_api() -> Router {
114
115 let test1 = Router::new()
116 .get(
117 ApiMethod::new(
118 test1_get,
52cf506e
DM
119 ObjectSchema::new("Test sync callback.")
120 )
121 );
122
123 let test2 = Router::new()
124 .download(
125 ApiAsyncMethod::new(
126 test2_get,
127 ObjectSchema::new("Test async callback.")
92ac375a
DM
128 )
129 );
130
131 let router = Router::new()
f9578f3c
DM
132 .subdir(
133 "dynamic_chunk", Router::new()
134 .upload(api_method_upload_dynamic_chunk())
135 )
136 .subdir(
137 "dynamic_index", Router::new()
138 .post(api_method_create_dynamic_index())
139 )
92ac375a 140 .subdir("test1", test1)
52cf506e 141 .subdir("test2", test2)
92ac375a
DM
142 .list_subdirs();
143
144 router
145}
146
f9578f3c
DM
147pub fn api_method_create_dynamic_index() -> ApiMethod {
148 ApiMethod::new(
149 create_dynamic_index,
150 ObjectSchema::new("Create dynamic chunk index file.")
151 )
152}
153
154fn create_dynamic_index(
155 param: Value,
156 _info: &ApiMethod,
157 rpcenv: &mut RpcEnvironment,
158) -> Result<Value, Error> {
159
160 let env: &BackupEnvironment = rpcenv.as_ref();
161 env.log("Inside create_dynamic_index");
162
163 let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
164
165 if !archive_name.ends_with(".pxar") {
166 bail!("wrong archive extension");
167 } else {
168 archive_name.push_str(".didx");
169 }
170
171 let mut path = env.path.clone();
172 path.push(archive_name);
173
174 let chunk_size = 4096*1024; // todo: ??
175
176 let index = env.datastore.create_dynamic_writer(path, chunk_size)?;
177 let uid = env.register_dynamic_writer(index);
178
179
180 Ok(json!(uid))
181}
182
92ac375a
DM
183fn test1_get (
184 _param: Value,
185 _info: &ApiMethod,
58c8d7d9 186 rpcenv: &mut RpcEnvironment,
92ac375a
DM
187) -> Result<Value, Error> {
188
58c8d7d9
DM
189 println!("TYPEID {:?}", (*rpcenv).type_id());
190
21ee7912 191 let env: &BackupEnvironment = rpcenv.as_ref();
58c8d7d9
DM
192
193 env.log("Inside test1_get()");
194
92ac375a
DM
195 Ok(Value::Null)
196}
52cf506e
DM
197
198fn test2_get(
d9bd06ea
DM
199 _parts: Parts,
200 _req_body: Body,
201 _param: Value,
52cf506e 202 _info: &ApiAsyncMethod,
b4b63e52 203 _rpcenv: Box<RpcEnvironment>,
52cf506e 204) -> Result<BoxFut, Error> {
52cf506e
DM
205
206 let fut = tokio::timer::Interval::new_interval(std::time::Duration::from_millis(300))
207 .map_err(|err| http_err!(INTERNAL_SERVER_ERROR, format!("tokio timer interval error: {}", err)))
a66ab8ae 208 .take(50)
52cf506e
DM
209 .for_each(|tv| {
210 println!("LOOP {:?}", tv);
211 Ok(())
212 })
213 .and_then(|_| {
214 println!("TASK DONE");
215 Ok(Response::builder()
216 .status(StatusCode::OK)
090ac9f7 217 .body(Body::empty())?)
52cf506e
DM
218 });
219
220 Ok(Box::new(fut))
221}