]>
Commit | Line | Data |
---|---|---|
152764ec | 1 | use failure::*; |
92ac375a | 2 | use lazy_static::lazy_static; |
152764ec | 3 | |
92ac375a | 4 | use std::collections::HashMap; |
72375ce6 | 5 | use std::sync::Arc; |
92ac375a DM |
6 | |
7 | use futures::*; | |
152764ec | 8 | use hyper::header::{HeaderValue, UPGRADE}; |
92ac375a | 9 | use hyper::{Body, Request, Response, StatusCode}; |
152764ec | 10 | use hyper::http::request::Parts; |
152764ec DM |
11 | |
12 | use serde_json::Value; | |
13 | ||
92ac375a | 14 | use crate::tools; |
152764ec DM |
15 | use crate::api_schema::router::*; |
16 | use crate::api_schema::*; | |
92ac375a | 17 | use crate::server::formatter::*; |
d9bd06ea | 18 | use crate::server::{WorkerTask, RestEnvironment}; |
152764ec DM |
19 | |
20 | pub fn api_method_upgrade_h2upload() -> ApiAsyncMethod { | |
21 | ApiAsyncMethod::new( | |
22 | upgrade_h2upload, | |
23 | ObjectSchema::new("Experimental h2 server") | |
24 | .required("store", StringSchema::new("Datastore name.")), | |
25 | ) | |
26 | } | |
27 | ||
92ac375a DM |
28 | lazy_static!{ |
29 | static ref BACKUP_ROUTER: Router = backup_api(); | |
30 | } | |
31 | ||
32 | pub struct BackupService { | |
33 | rpcenv: RestEnvironment, | |
72375ce6 | 34 | worker: Arc<WorkerTask>, |
92ac375a DM |
35 | } |
36 | ||
37 | impl BackupService { | |
38 | ||
72375ce6 DM |
39 | fn new(rpcenv: RestEnvironment, worker: Arc<WorkerTask>) -> Self { |
40 | Self { rpcenv, worker } | |
92ac375a DM |
41 | } |
42 | ||
43 | fn handle_request(&self, req: Request<Body>) -> BoxFut { | |
44 | ||
45 | let (parts, body) = req.into_parts(); | |
46 | ||
47 | let method = parts.method.clone(); | |
48 | ||
49 | let (path, components) = match tools::normalize_uri_path(parts.uri.path()) { | |
50 | Ok((p,c)) => (p, c), | |
51 | Err(err) => return Box::new(future::err(http_err!(BAD_REQUEST, err.to_string()))), | |
52 | }; | |
53 | ||
54 | let formatter = &JSON_FORMATTER; | |
55 | ||
72375ce6 DM |
56 | self.worker.log(format!("H2 REQUEST {} {}", method, path)); |
57 | self.worker.log(format!("H2 COMPO {:?}", components)); | |
92ac375a DM |
58 | |
59 | let mut uri_param = HashMap::new(); | |
60 | ||
61 | match BACKUP_ROUTER.find_method(&components, method, &mut uri_param) { | |
62 | MethodDefinition::None => { | |
63 | let err = http_err!(NOT_FOUND, "Path not found.".to_string()); | |
64 | return Box::new(future::ok((formatter.format_error)(err))); | |
65 | } | |
66 | MethodDefinition::Simple(api_method) => { | |
67 | return crate::server::rest::handle_sync_api_request(self.rpcenv.clone(), api_method, formatter, parts, body, uri_param); | |
68 | } | |
69 | MethodDefinition::Async(async_method) => { | |
70 | return crate::server::rest::handle_async_api_request(self.rpcenv.clone(), async_method, formatter, parts, body, uri_param); | |
71 | } | |
72 | } | |
73 | } | |
72375ce6 DM |
74 | |
75 | fn log_response(worker: Arc<WorkerTask>, method: hyper::Method, path: &str, resp: &Response<Body>) { | |
76 | ||
77 | let status = resp.status(); | |
78 | ||
79 | if !status.is_success() { | |
80 | let reason = status.canonical_reason().unwrap_or("unknown reason"); | |
72375ce6 DM |
81 | |
82 | let mut message = "request failed"; | |
83 | if let Some(data) = resp.extensions().get::<ErrorMessageExtension>() { | |
84 | message = &data.0; | |
85 | } | |
86 | ||
4e3da4b3 | 87 | worker.log(format!("{} {}: {} {}: {}", method.as_str(), path, status.as_str(), reason, message)); |
72375ce6 DM |
88 | } |
89 | } | |
92ac375a DM |
90 | } |
91 | ||
d9bd06ea DM |
92 | impl Drop for BackupService { |
93 | fn drop(&mut self) { | |
94 | println!("SERVER DROP"); | |
95 | } | |
96 | } | |
97 | ||
92ac375a DM |
98 | impl hyper::service::Service for BackupService { |
99 | type ReqBody = Body; | |
100 | type ResBody = Body; | |
101 | type Error = hyper::Error; | |
102 | type Future = Box<Future<Item = Response<Body>, Error = Self::Error> + Send>; | |
103 | ||
104 | fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future { | |
72375ce6 DM |
105 | let path = req.uri().path().to_owned(); |
106 | let method = req.method().clone(); | |
107 | let worker = self.worker.clone(); | |
92ac375a DM |
108 | |
109 | Box::new(self.handle_request(req).then(move |result| { | |
110 | match result { | |
111 | Ok(res) => { | |
72375ce6 | 112 | Self::log_response(worker, method, &path, &res); |
92ac375a DM |
113 | Ok::<_, hyper::Error>(res) |
114 | } | |
115 | Err(err) => { | |
116 | if let Some(apierr) = err.downcast_ref::<HttpError>() { | |
117 | let mut resp = Response::new(Body::from(apierr.message.clone())); | |
118 | *resp.status_mut() = apierr.code; | |
72375ce6 | 119 | Self::log_response(worker, method, &path, &resp); |
92ac375a DM |
120 | Ok(resp) |
121 | } else { | |
122 | let mut resp = Response::new(Body::from(err.to_string())); | |
123 | *resp.status_mut() = StatusCode::BAD_REQUEST; | |
72375ce6 | 124 | Self::log_response(worker, method, &path, &resp); |
92ac375a DM |
125 | Ok(resp) |
126 | } | |
127 | } | |
128 | } | |
129 | })) | |
130 | } | |
131 | } | |
132 | ||
152764ec DM |
133 | fn upgrade_h2upload( |
134 | parts: Parts, | |
135 | req_body: Body, | |
92ac375a | 136 | _param: Value, |
152764ec | 137 | _info: &ApiAsyncMethod, |
92ac375a | 138 | rpcenv: &mut RpcEnvironment, |
152764ec DM |
139 | ) -> Result<BoxFut, Error> { |
140 | let expected_protocol: &'static str = "proxmox-backup-protocol-h2"; | |
141 | ||
142 | let protocols = parts | |
143 | .headers | |
144 | .get("UPGRADE") | |
145 | .ok_or_else(|| format_err!("missing Upgrade header"))? | |
146 | .to_str()?; | |
147 | ||
148 | if protocols != expected_protocol { | |
149 | bail!("invalid protocol name"); | |
150 | } | |
151 | ||
96e95fc1 DM |
152 | if parts.version >= http::version::Version::HTTP_2 { |
153 | bail!("unexpected http version '{:?}' (expected version < 2)", parts.version); | |
154 | } | |
155 | ||
d9bd06ea DM |
156 | let worker_id = String::from("test2workerid"); |
157 | ||
92ac375a | 158 | |
72375ce6 DM |
159 | let mut rpcenv1 = RestEnvironment::new(rpcenv.env_type()); |
160 | rpcenv1.set_user(rpcenv.get_user()); | |
161 | ||
162 | WorkerTask::spawn("test2_download", Some(worker_id), &rpcenv.get_user().unwrap(), true, move |worker| { | |
163 | let service = BackupService::new(rpcenv1, worker.clone()); | |
164 | ||
a66ab8ae DM |
165 | let abort_future = worker.abort_future(); |
166 | ||
152764ec DM |
167 | req_body |
168 | .on_upgrade() | |
92ac375a | 169 | .map_err(Error::from) |
152764ec | 170 | .and_then(move |conn| { |
d9bd06ea | 171 | worker.log("upgrade done"); |
92ac375a DM |
172 | |
173 | let mut http = hyper::server::conn::Http::new(); | |
174 | http.http2_only(true); | |
175 | ||
d9bd06ea DM |
176 | http.serve_connection(conn, service) |
177 | .map_err(Error::from) | |
178 | .then(|x| { | |
179 | println!("H2 END"); | |
180 | x | |
181 | }) | |
152764ec | 182 | }) |
a66ab8ae | 183 | .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); })) |
e3a44552 DM |
184 | .and_then(|(result, _)| Ok(result)) |
185 | .map_err(|(err, _)| err) | |
d9bd06ea | 186 | }).unwrap(); |
152764ec DM |
187 | |
188 | Ok(Box::new(futures::future::ok( | |
189 | Response::builder() | |
190 | .status(StatusCode::SWITCHING_PROTOCOLS) | |
191 | .header(UPGRADE, HeaderValue::from_static(expected_protocol)) | |
192 | .body(Body::empty()) | |
193 | .unwrap() | |
194 | ))) | |
195 | } | |
92ac375a DM |
196 | |
197 | fn backup_api() -> Router { | |
198 | ||
199 | let test1 = Router::new() | |
200 | .get( | |
201 | ApiMethod::new( | |
202 | test1_get, | |
52cf506e DM |
203 | ObjectSchema::new("Test sync callback.") |
204 | ) | |
205 | ); | |
206 | ||
207 | let test2 = Router::new() | |
208 | .download( | |
209 | ApiAsyncMethod::new( | |
210 | test2_get, | |
211 | ObjectSchema::new("Test async callback.") | |
92ac375a DM |
212 | ) |
213 | ); | |
214 | ||
215 | let router = Router::new() | |
216 | .subdir("test1", test1) | |
52cf506e | 217 | .subdir("test2", test2) |
92ac375a DM |
218 | .list_subdirs(); |
219 | ||
220 | router | |
221 | } | |
222 | ||
223 | fn test1_get ( | |
224 | _param: Value, | |
225 | _info: &ApiMethod, | |
226 | _rpcenv: &mut RpcEnvironment, | |
227 | ) -> Result<Value, Error> { | |
228 | ||
92ac375a DM |
229 | Ok(Value::Null) |
230 | } | |
52cf506e DM |
231 | |
232 | fn test2_get( | |
d9bd06ea DM |
233 | _parts: Parts, |
234 | _req_body: Body, | |
235 | _param: Value, | |
52cf506e | 236 | _info: &ApiAsyncMethod, |
d9bd06ea | 237 | _rpcenv: &mut RpcEnvironment, |
52cf506e | 238 | ) -> Result<BoxFut, Error> { |
52cf506e DM |
239 | |
240 | let fut = tokio::timer::Interval::new_interval(std::time::Duration::from_millis(300)) | |
241 | .map_err(|err| http_err!(INTERNAL_SERVER_ERROR, format!("tokio timer interval error: {}", err))) | |
a66ab8ae | 242 | .take(50) |
52cf506e DM |
243 | .for_each(|tv| { |
244 | println!("LOOP {:?}", tv); | |
245 | Ok(()) | |
246 | }) | |
247 | .and_then(|_| { | |
248 | println!("TASK DONE"); | |
249 | Ok(Response::builder() | |
250 | .status(StatusCode::OK) | |
251 | .body(Body::empty()) | |
252 | .unwrap()) | |
253 | }); | |
254 | ||
255 | Ok(Box::new(fut)) | |
256 | } |