]>
Commit | Line | Data |
---|---|---|
1 | use failure::*; | |
2 | ||
3 | use std::collections::HashMap; | |
4 | use std::sync::Arc; | |
5 | ||
6 | use futures::*; | |
7 | use hyper::{Body, Request, Response, StatusCode}; | |
8 | ||
9 | use crate::tools; | |
10 | use crate::api_schema::router::*; | |
11 | use crate::server::formatter::*; | |
12 | use crate::server::WorkerTask; | |
13 | ||
14 | /// Hyper Service implementation to handle stateful H2 connections. | |
15 | /// | |
16 | /// We use this kind of service to handle backup protocol | |
17 | /// connections. State is stored inside the generic ``rpcenv``. Logs | |
18 | /// goes into the ``WorkerTask`` log. | |
19 | pub struct H2Service<E> { | |
20 | router: &'static Router, | |
21 | rpcenv: E, | |
22 | worker: Arc<WorkerTask>, | |
23 | debug: bool, | |
24 | } | |
25 | ||
26 | impl <E: RpcEnvironment + Clone> H2Service<E> { | |
27 | ||
28 | pub fn new(rpcenv: E, worker: Arc<WorkerTask>, router: &'static Router, debug: bool) -> Self { | |
29 | Self { rpcenv, worker, router, debug } | |
30 | } | |
31 | ||
32 | pub fn debug<S: AsRef<str>>(&self, msg: S) { | |
33 | if self.debug { self.worker.log(msg); } | |
34 | } | |
35 | ||
36 | fn handle_request(&self, req: Request<Body>) -> BoxFut { | |
37 | ||
38 | let (parts, body) = req.into_parts(); | |
39 | ||
40 | let method = parts.method.clone(); | |
41 | ||
42 | let (path, components) = match tools::normalize_uri_path(parts.uri.path()) { | |
43 | Ok((p,c)) => (p, c), | |
44 | Err(err) => return Box::new(future::err(http_err!(BAD_REQUEST, err.to_string()))), | |
45 | }; | |
46 | ||
47 | self.debug(format!("{} {}", method, path)); | |
48 | ||
49 | let mut uri_param = HashMap::new(); | |
50 | ||
51 | let formatter = &JSON_FORMATTER; | |
52 | ||
53 | match self.router.find_method(&components, method, &mut uri_param) { | |
54 | MethodDefinition::None => { | |
55 | let err = http_err!(NOT_FOUND, "Path not found.".to_string()); | |
56 | return Box::new(future::ok((formatter.format_error)(err))); | |
57 | } | |
58 | MethodDefinition::Simple(api_method) => { | |
59 | return crate::server::rest::handle_sync_api_request( | |
60 | self.rpcenv.clone(), api_method, formatter, parts, body, uri_param); | |
61 | } | |
62 | MethodDefinition::Async(async_method) => { | |
63 | return crate::server::rest::handle_async_api_request( | |
64 | self.rpcenv.clone(), async_method, formatter, parts, body, uri_param); | |
65 | } | |
66 | } | |
67 | } | |
68 | ||
69 | fn log_response(worker: Arc<WorkerTask>, method: hyper::Method, path: &str, resp: &Response<Body>) { | |
70 | ||
71 | let status = resp.status(); | |
72 | ||
73 | if !status.is_success() { | |
74 | let reason = status.canonical_reason().unwrap_or("unknown reason"); | |
75 | ||
76 | let mut message = "request failed"; | |
77 | if let Some(data) = resp.extensions().get::<ErrorMessageExtension>() { | |
78 | message = &data.0; | |
79 | } | |
80 | ||
81 | worker.log(format!("{} {}: {} {}: {}", method.as_str(), path, status.as_str(), reason, message)); | |
82 | } | |
83 | } | |
84 | } | |
85 | ||
86 | impl <E: RpcEnvironment + Clone> hyper::service::Service for H2Service<E> { | |
87 | type ReqBody = Body; | |
88 | type ResBody = Body; | |
89 | type Error = Error; | |
90 | type Future = Box<dyn Future<Item = Response<Body>, Error = Self::Error> + Send>; | |
91 | ||
92 | fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future { | |
93 | let path = req.uri().path().to_owned(); | |
94 | let method = req.method().clone(); | |
95 | let worker = self.worker.clone(); | |
96 | ||
97 | Box::new(self.handle_request(req).then(move |result| { | |
98 | match result { | |
99 | Ok(res) => { | |
100 | Self::log_response(worker, method, &path, &res); | |
101 | Ok::<_, Error>(res) | |
102 | } | |
103 | Err(err) => { | |
104 | if let Some(apierr) = err.downcast_ref::<HttpError>() { | |
105 | let mut resp = Response::new(Body::from(apierr.message.clone())); | |
106 | resp.extensions_mut().insert(ErrorMessageExtension(apierr.message.clone())); | |
107 | *resp.status_mut() = apierr.code; | |
108 | Self::log_response(worker, method, &path, &resp); | |
109 | Ok(resp) | |
110 | } else { | |
111 | let mut resp = Response::new(Body::from(err.to_string())); | |
112 | resp.extensions_mut().insert(ErrorMessageExtension(err.to_string())); | |
113 | *resp.status_mut() = StatusCode::BAD_REQUEST; | |
114 | Self::log_response(worker, method, &path, &resp); | |
115 | Ok(resp) | |
116 | } | |
117 | } | |
118 | } | |
119 | })) | |
120 | } | |
121 | } |