]>
Commit | Line | Data |
---|---|---|
16b48b81 DM |
1 | use crate::json_schema::*; |
2 | use crate::api_info::*; | |
3 | use crate::api_config::*; | |
4 | ||
1a53be14 DM |
5 | use std::collections::HashMap; |
6 | use std::path::{PathBuf}; | |
9bc17e8d DM |
7 | use std::sync::Arc; |
8 | ||
9 | use failure::*; | |
10 | use serde_json::{json, Value}; | |
11 | ||
1a53be14 | 12 | |
1a53be14 | 13 | |
9bc17e8d DM |
14 | use futures::future::{self, Either}; |
15 | //use tokio::prelude::*; | |
16 | //use tokio::timer::Delay; | |
17 | use tokio::fs::File; | |
18 | use tokio_codec; | |
19 | //use bytes::{BytesMut, BufMut}; | |
20 | ||
21 | //use hyper::body::Payload; | |
22 | use hyper::http::request::Parts; | |
23 | use hyper::{Body, Request, Method, Response, StatusCode}; | |
24 | use hyper::service::{Service, NewService}; | |
25 | use hyper::rt::{Future, Stream}; | |
26 | use hyper::header; | |
27 | ||
9bc17e8d DM |
28 | type BoxFut = Box<Future<Item = Response<Body>, Error = failure::Error> + Send>; |
29 | ||
30 | macro_rules! error_response { | |
31 | ($status:ident, $msg:expr) => {{ | |
32 | let mut resp = Response::new(Body::from($msg)); | |
33 | *resp.status_mut() = StatusCode::$status; | |
34 | resp | |
35 | }} | |
36 | } | |
37 | ||
38 | macro_rules! http_error_future { | |
39 | ($status:ident, $msg:expr) => {{ | |
40 | let resp = error_response!($status, $msg); | |
41 | return Box::new(future::ok(resp)); | |
42 | }} | |
43 | } | |
44 | ||
45 | fn get_request_parameters_async<'a>( | |
46 | info: &'a ApiMethod, | |
47 | parts: Parts, | |
48 | req_body: Body, | |
49 | ) -> Box<Future<Item = Value, Error = failure::Error> + Send + 'a> | |
50 | { | |
51 | let resp = req_body | |
52 | .map_err(|err| Error::from(ApiError::new(StatusCode::BAD_REQUEST, format!("Promlems reading request body: {}", err)))) | |
53 | .fold(Vec::new(), |mut acc, chunk| { | |
54 | if acc.len() + chunk.len() < 64*1024 { //fimxe: max request body size? | |
55 | acc.extend_from_slice(&*chunk); | |
56 | Ok(acc) | |
57 | } | |
58 | else { Err(Error::from(ApiError::new(StatusCode::BAD_REQUEST, format!("Request body too large")))) } | |
59 | }) | |
60 | .and_then(move |body| { | |
61 | ||
62 | let bytes = String::from_utf8(body.to_vec())?; // why copy?? | |
63 | ||
64 | println!("GOT BODY {:?}", bytes); | |
65 | ||
66 | let mut test_required = true; | |
67 | ||
68 | let mut params = json!({}); | |
69 | ||
70 | if bytes.len() > 0 { | |
71 | params = parse_query_string(&bytes, &info.parameters, true)?; | |
72 | test_required = false; | |
73 | } | |
74 | ||
75 | if let Some(query_str) = parts.uri.query() { | |
76 | let query_params = parse_query_string(query_str, &info.parameters, test_required)?; | |
77 | ||
78 | for (k, v) in query_params.as_object().unwrap() { | |
79 | params[k] = v.clone(); // fixme: why clone()?? | |
80 | } | |
81 | } | |
82 | ||
83 | println!("GOT PARAMS {}", params); | |
84 | Ok(params) | |
85 | }); | |
86 | ||
87 | Box::new(resp) | |
88 | } | |
89 | ||
90 | fn handle_sync_api_request<'a>( | |
91 | info: &'a ApiMethod, | |
92 | parts: Parts, | |
93 | req_body: Body, | |
94 | ) -> Box<Future<Item = Response<Body>, Error = failure::Error> + Send + 'a> | |
95 | { | |
96 | let params = get_request_parameters_async(info, parts, req_body); | |
97 | ||
98 | let resp = params | |
99 | .and_then(move |params| { | |
100 | ||
101 | println!("GOT PARAMS {}", params); | |
102 | ||
103 | /* | |
104 | let when = Instant::now() + Duration::from_millis(3000); | |
105 | let task = Delay::new(when).then(|_| { | |
106 | println!("A LAZY TASK"); | |
107 | ok(()) | |
108 | }); | |
109 | ||
110 | tokio::spawn(task); | |
111 | */ | |
112 | ||
113 | let res = (info.handler)(params, info)?; | |
114 | ||
115 | Ok(res) | |
116 | ||
117 | }).then(|result| { | |
118 | match result { | |
119 | Ok(ref value) => { | |
120 | let json_str = value.to_string(); | |
121 | ||
122 | Ok(Response::builder() | |
123 | .status(StatusCode::OK) | |
124 | .header(header::CONTENT_TYPE, "application/json") | |
125 | .body(Body::from(json_str))?) | |
126 | } | |
127 | Err(err) => Ok(error_response!(BAD_REQUEST, err.to_string())) | |
128 | } | |
129 | }); | |
130 | ||
131 | Box::new(resp) | |
132 | } | |
133 | ||
134 | fn simple_static_file_download(filename: PathBuf) -> BoxFut { | |
135 | ||
136 | Box::new(File::open(filename) | |
137 | .map_err(|err| Error::from(ApiError::new(StatusCode::BAD_REQUEST, format!("File open failed: {}", err)))) | |
138 | .and_then(|file| { | |
139 | let buf: Vec<u8> = Vec::new(); | |
140 | tokio::io::read_to_end(file, buf) | |
141 | .map_err(|err| Error::from(ApiError::new(StatusCode::BAD_REQUEST, format!("File read failed: {}", err)))) | |
142 | .and_then(|data| Ok(Response::new(data.1.into()))) | |
143 | })) | |
144 | } | |
145 | ||
146 | fn chuncked_static_file_download(filename: PathBuf) -> BoxFut { | |
147 | ||
148 | Box::new(File::open(filename) | |
149 | .map_err(|err| Error::from(ApiError::new(StatusCode::BAD_REQUEST, format!("File open failed: {}", err)))) | |
150 | .and_then(|file| { | |
151 | let payload = tokio_codec::FramedRead::new(file, tokio_codec::BytesCodec::new()). | |
152 | map(|bytes| { | |
153 | //sigh - howto avoid copy here? or the whole map() ?? | |
154 | hyper::Chunk::from(bytes.to_vec()) | |
155 | }); | |
156 | let body = Body::wrap_stream(payload); | |
157 | // fixme: set content type and other headers | |
158 | Ok(Response::builder() | |
159 | .status(StatusCode::OK) | |
160 | .body(body) | |
161 | .unwrap()) | |
162 | })) | |
163 | } | |
164 | ||
165 | fn handle_static_file_download(filename: PathBuf) -> BoxFut { | |
166 | ||
167 | let response = tokio::fs::metadata(filename.clone()) | |
168 | .map_err(|err| Error::from(ApiError::new(StatusCode::BAD_REQUEST, format!("File access problems: {}", err)))) | |
169 | .and_then(|metadata| { | |
170 | if metadata.len() < 1024*32 { | |
171 | Either::A(simple_static_file_download(filename)) | |
172 | } else { | |
173 | Either::B(chuncked_static_file_download(filename)) | |
174 | } | |
175 | }); | |
176 | ||
177 | return Box::new(response); | |
178 | } | |
179 | ||
180 | pub fn handle_request(api: Arc<ApiConfig>, req: Request<Body>) -> BoxFut { | |
181 | ||
182 | let (parts, body) = req.into_parts(); | |
183 | ||
184 | let method = parts.method.clone(); | |
185 | let path = parts.uri.path(); | |
186 | ||
187 | // normalize path | |
188 | // do not allow ".", "..", or hidden files ".XXXX" | |
189 | // also remove empty path components | |
190 | ||
191 | let items = path.split('/'); | |
192 | let mut path = String::new(); | |
193 | let mut components = vec![]; | |
194 | ||
195 | for name in items { | |
196 | if name.is_empty() { continue; } | |
197 | if name.starts_with(".") { | |
198 | http_error_future!(BAD_REQUEST, "Path contains illegal components.\n"); | |
199 | } | |
200 | path.push('/'); | |
201 | path.push_str(name); | |
202 | components.push(name); | |
203 | } | |
204 | ||
205 | let comp_len = components.len(); | |
206 | ||
207 | println!("REQUEST {} {}", method, path); | |
208 | println!("COMPO {:?}", components); | |
209 | ||
210 | if comp_len >= 1 && components[0] == "api3" { | |
211 | println!("GOT API REQUEST"); | |
212 | if comp_len >= 2 { | |
213 | let format = components[1]; | |
214 | if format != "json" { | |
215 | http_error_future!(BAD_REQUEST, format!("Unsupported output format '{}'.", format)) | |
216 | } | |
217 | ||
218 | if let Some(api_method) = api.find_method(&components[2..], method) { | |
219 | // fixme: handle auth | |
220 | return handle_sync_api_request(api_method, parts, body); | |
221 | } | |
222 | } | |
223 | } else { | |
224 | // not Auth for accessing files! | |
225 | ||
226 | let filename = api.find_alias(&components); | |
227 | return handle_static_file_download(filename); | |
228 | } | |
229 | ||
230 | http_error_future!(NOT_FOUND, "Path not found.") | |
231 | //Box::new(ok(Response::new(Body::from("RETURN WEB GUI\n")))) | |
232 | } | |
233 | ||
234 | pub struct RestServer { | |
235 | pub api_config: Arc<ApiConfig>, | |
236 | } | |
237 | ||
238 | impl RestServer { | |
239 | ||
240 | pub fn new(api_config: ApiConfig) -> Self { | |
241 | Self { api_config: Arc::new(api_config) } | |
242 | } | |
243 | } | |
244 | ||
245 | impl NewService for RestServer | |
246 | { | |
247 | type ReqBody = Body; | |
248 | type ResBody = Body; | |
249 | type Error = hyper::Error; | |
250 | type InitError = hyper::Error; | |
251 | type Service = ApiService; | |
252 | type Future = Box<Future<Item = Self::Service, Error = Self::InitError> + Send>; | |
253 | fn new_service(&self) -> Self::Future { | |
254 | Box::new(future::ok(ApiService { api_config: self.api_config.clone() })) | |
255 | } | |
256 | } | |
257 | ||
258 | pub struct ApiService { | |
259 | pub api_config: Arc<ApiConfig>, | |
260 | } | |
261 | ||
262 | impl Service for ApiService { | |
263 | type ReqBody = Body; | |
264 | type ResBody = Body; | |
265 | type Error = hyper::Error; | |
266 | type Future = Box<Future<Item = Response<Body>, Error = Self::Error> + Send>; | |
267 | ||
268 | fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future { | |
269 | ||
270 | Box::new(handle_request(self.api_config.clone(), req).then(|result| { | |
271 | match result { | |
272 | Ok(res) => Ok::<_, hyper::Error>(res), | |
273 | Err(err) => { | |
274 | if let Some(apierr) = err.downcast_ref::<ApiError>() { | |
275 | let mut resp = Response::new(Body::from(apierr.message.clone())); | |
276 | *resp.status_mut() = apierr.code; | |
277 | Ok(resp) | |
278 | } else { | |
279 | let mut resp = Response::new(Body::from(err.to_string())); | |
280 | *resp.status_mut() = StatusCode::BAD_REQUEST; | |
281 | Ok(resp) | |
282 | } | |
283 | } | |
284 | } | |
285 | })) | |
286 | } | |
287 | } |