]>
Commit | Line | Data |
---|---|---|
152764ec DM |
1 | use failure::*; |
2 | ||
72375ce6 | 3 | use std::sync::Arc; |
92ac375a DM |
4 | |
5 | use futures::*; | |
152764ec | 6 | use hyper::header::{HeaderValue, UPGRADE}; |
bd1507c4 | 7 | use hyper::{Body, Response, StatusCode}; |
152764ec | 8 | use hyper::http::request::Parts; |
0aadd40b | 9 | use chrono::{Local, TimeZone}; |
152764ec | 10 | |
f9578f3c | 11 | use serde_json::{json, Value}; |
152764ec | 12 | |
92ac375a | 13 | use crate::tools; |
152764ec DM |
14 | use crate::api_schema::router::*; |
15 | use crate::api_schema::*; | |
58c8d7d9 | 16 | use crate::server::WorkerTask; |
21ee7912 | 17 | use crate::backup::*; |
152764ec | 18 | |
d95ced64 DM |
19 | mod environment; |
20 | use environment::*; | |
21 | ||
bd1507c4 DM |
22 | mod service; |
23 | use service::*; | |
24 | ||
21ee7912 DM |
25 | mod upload_chunk; |
26 | use upload_chunk::*; | |
27 | ||
28 | ||
ca60c371 | 29 | pub fn api_method_upgrade_backup() -> ApiAsyncMethod { |
152764ec | 30 | ApiAsyncMethod::new( |
0aadd40b | 31 | upgrade_to_backup_protocol, |
ca60c371 | 32 | ObjectSchema::new("Upgraded to backup protocol.") |
0aadd40b DM |
33 | .required("store", StringSchema::new("Datastore name.")) |
34 | .required("backup-type", StringSchema::new("Backup type.") | |
35 | .format(Arc::new(ApiStringFormat::Enum(vec!["vm".into(), "ct".into(), "host".into()])))) | |
36 | .required("backup-id", StringSchema::new("Backup ID.")) | |
152764ec DM |
37 | ) |
38 | } | |
39 | ||
0aadd40b | 40 | fn upgrade_to_backup_protocol( |
152764ec DM |
41 | parts: Parts, |
42 | req_body: Body, | |
0aadd40b | 43 | param: Value, |
152764ec | 44 | _info: &ApiAsyncMethod, |
b4b63e52 | 45 | rpcenv: Box<RpcEnvironment>, |
152764ec | 46 | ) -> Result<BoxFut, Error> { |
0aadd40b | 47 | |
bd1507c4 DM |
48 | static PROXMOX_BACKUP_PROTOCOL_ID: &str = "proxmox-backup-protocol-h2"; |
49 | ||
0aadd40b | 50 | let store = tools::required_string_param(¶m, "store")?; |
21ee7912 DM |
51 | let datastore = DataStore::lookup_datastore(store)?; |
52 | ||
0aadd40b DM |
53 | let backup_type = tools::required_string_param(¶m, "backup-type")?; |
54 | let backup_id = tools::required_string_param(¶m, "backup-id")?; | |
f9578f3c | 55 | let backup_time = Local.timestamp(Local::now().timestamp(), 0); |
152764ec DM |
56 | |
57 | let protocols = parts | |
58 | .headers | |
59 | .get("UPGRADE") | |
60 | .ok_or_else(|| format_err!("missing Upgrade header"))? | |
61 | .to_str()?; | |
62 | ||
0aadd40b | 63 | if protocols != PROXMOX_BACKUP_PROTOCOL_ID { |
152764ec DM |
64 | bail!("invalid protocol name"); |
65 | } | |
66 | ||
96e95fc1 DM |
67 | if parts.version >= http::version::Version::HTTP_2 { |
68 | bail!("unexpected http version '{:?}' (expected version < 2)", parts.version); | |
69 | } | |
70 | ||
0aadd40b | 71 | let worker_id = format!("{}_{}_{}", store, backup_type, backup_id); |
d9bd06ea | 72 | |
58c8d7d9 DM |
73 | let username = rpcenv.get_user().unwrap(); |
74 | let env_type = rpcenv.env_type(); | |
92ac375a | 75 | |
f9578f3c DM |
76 | let backup_dir = BackupDir::new(backup_type, backup_id, backup_time.timestamp()); |
77 | ||
78 | let (path, is_new) = datastore.create_backup_dir(&backup_dir)?; | |
79 | if !is_new { bail!("backup directorty already exists."); } | |
80 | ||
0aadd40b | 81 | WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| { |
f9578f3c DM |
82 | let backup_env = BackupEnvironment::new( |
83 | env_type, username.clone(), worker.clone(), datastore, backup_dir, path); | |
58c8d7d9 | 84 | let service = BackupService::new(backup_env, worker.clone()); |
72375ce6 | 85 | |
a66ab8ae DM |
86 | let abort_future = worker.abort_future(); |
87 | ||
152764ec DM |
88 | req_body |
89 | .on_upgrade() | |
92ac375a | 90 | .map_err(Error::from) |
152764ec | 91 | .and_then(move |conn| { |
d9bd06ea | 92 | worker.log("upgrade done"); |
92ac375a DM |
93 | |
94 | let mut http = hyper::server::conn::Http::new(); | |
95 | http.http2_only(true); | |
96 | ||
d9bd06ea DM |
97 | http.serve_connection(conn, service) |
98 | .map_err(Error::from) | |
0aadd40b | 99 | }) |
a66ab8ae | 100 | .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); })) |
e3a44552 DM |
101 | .and_then(|(result, _)| Ok(result)) |
102 | .map_err(|(err, _)| err) | |
090ac9f7 DM |
103 | })?; |
104 | ||
105 | let response = Response::builder() | |
106 | .status(StatusCode::SWITCHING_PROTOCOLS) | |
0aadd40b | 107 | .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID)) |
090ac9f7 DM |
108 | .body(Body::empty())?; |
109 | ||
110 | Ok(Box::new(futures::future::ok(response))) | |
152764ec | 111 | } |
92ac375a DM |
112 | |
113 | fn backup_api() -> Router { | |
114 | ||
115 | let test1 = Router::new() | |
116 | .get( | |
117 | ApiMethod::new( | |
118 | test1_get, | |
52cf506e DM |
119 | ObjectSchema::new("Test sync callback.") |
120 | ) | |
121 | ); | |
122 | ||
123 | let test2 = Router::new() | |
124 | .download( | |
125 | ApiAsyncMethod::new( | |
126 | test2_get, | |
127 | ObjectSchema::new("Test async callback.") | |
92ac375a DM |
128 | ) |
129 | ); | |
130 | ||
131 | let router = Router::new() | |
f9578f3c DM |
132 | .subdir( |
133 | "dynamic_chunk", Router::new() | |
134 | .upload(api_method_upload_dynamic_chunk()) | |
135 | ) | |
136 | .subdir( | |
137 | "dynamic_index", Router::new() | |
138 | .post(api_method_create_dynamic_index()) | |
139 | ) | |
92ac375a | 140 | .subdir("test1", test1) |
52cf506e | 141 | .subdir("test2", test2) |
92ac375a DM |
142 | .list_subdirs(); |
143 | ||
144 | router | |
145 | } | |
146 | ||
f9578f3c DM |
147 | pub fn api_method_create_dynamic_index() -> ApiMethod { |
148 | ApiMethod::new( | |
149 | create_dynamic_index, | |
150 | ObjectSchema::new("Create dynamic chunk index file.") | |
151 | ) | |
152 | } | |
153 | ||
154 | fn create_dynamic_index( | |
155 | param: Value, | |
156 | _info: &ApiMethod, | |
157 | rpcenv: &mut RpcEnvironment, | |
158 | ) -> Result<Value, Error> { | |
159 | ||
160 | let env: &BackupEnvironment = rpcenv.as_ref(); | |
161 | env.log("Inside create_dynamic_index"); | |
162 | ||
163 | let mut archive_name = tools::required_string_param(¶m, "archive-name")?.to_owned(); | |
164 | ||
165 | if !archive_name.ends_with(".pxar") { | |
166 | bail!("wrong archive extension"); | |
167 | } else { | |
168 | archive_name.push_str(".didx"); | |
169 | } | |
170 | ||
171 | let mut path = env.path.clone(); | |
172 | path.push(archive_name); | |
173 | ||
174 | let chunk_size = 4096*1024; // todo: ?? | |
175 | ||
176 | let index = env.datastore.create_dynamic_writer(path, chunk_size)?; | |
177 | let uid = env.register_dynamic_writer(index); | |
178 | ||
179 | ||
180 | Ok(json!(uid)) | |
181 | } | |
182 | ||
92ac375a DM |
183 | fn test1_get ( |
184 | _param: Value, | |
185 | _info: &ApiMethod, | |
58c8d7d9 | 186 | rpcenv: &mut RpcEnvironment, |
92ac375a DM |
187 | ) -> Result<Value, Error> { |
188 | ||
58c8d7d9 DM |
189 | println!("TYPEID {:?}", (*rpcenv).type_id()); |
190 | ||
21ee7912 | 191 | let env: &BackupEnvironment = rpcenv.as_ref(); |
58c8d7d9 DM |
192 | |
193 | env.log("Inside test1_get()"); | |
194 | ||
92ac375a DM |
195 | Ok(Value::Null) |
196 | } | |
52cf506e DM |
197 | |
198 | fn test2_get( | |
d9bd06ea DM |
199 | _parts: Parts, |
200 | _req_body: Body, | |
201 | _param: Value, | |
52cf506e | 202 | _info: &ApiAsyncMethod, |
b4b63e52 | 203 | _rpcenv: Box<RpcEnvironment>, |
52cf506e | 204 | ) -> Result<BoxFut, Error> { |
52cf506e DM |
205 | |
206 | let fut = tokio::timer::Interval::new_interval(std::time::Duration::from_millis(300)) | |
207 | .map_err(|err| http_err!(INTERNAL_SERVER_ERROR, format!("tokio timer interval error: {}", err))) | |
a66ab8ae | 208 | .take(50) |
52cf506e DM |
209 | .for_each(|tv| { |
210 | println!("LOOP {:?}", tv); | |
211 | Ok(()) | |
212 | }) | |
213 | .and_then(|_| { | |
214 | println!("TASK DONE"); | |
215 | Ok(Response::builder() | |
216 | .status(StatusCode::OK) | |
090ac9f7 | 217 | .body(Body::empty())?) |
52cf506e DM |
218 | }); |
219 | ||
220 | Ok(Box::new(fut)) | |
221 | } |