1 use std
::collections
::HashMap
;
2 use std
::future
::Future
;
3 use std
::hash
::BuildHasher
;
4 use std
::path
::{Path, PathBuf}
;
7 use std
::task
::{Context, Poll}
;
9 use anyhow
::{bail, format_err, Error}
;
10 use futures
::future
::{self, FutureExt, TryFutureExt}
;
11 use futures
::stream
::TryStreamExt
;
13 use hyper
::http
::request
::Parts
;
14 use hyper
::{Body, Request, Response, StatusCode}
;
15 use serde_json
::{json, Value}
;
17 use tokio
::time
::Instant
;
18 use url
::form_urlencoded
;
20 use proxmox
::http_err
;
30 use proxmox
::api
::schema
::{
32 parse_parameter_strings
,
37 use super::environment
::RestEnvironment
;
38 use super::formatter
::*;
41 use crate::auth_helpers
::*;
42 use crate::api2
::types
::Userid
;
44 use crate::tools
::ticket
::Ticket
;
45 use crate::config
::cached_user_info
::CachedUserInfo
;
47 extern "C" { fn tzset(); }
49 pub struct RestServer
{
50 pub api_config
: Arc
<ApiConfig
>,
55 pub fn new(api_config
: ApiConfig
) -> Self {
56 Self { api_config: Arc::new(api_config) }
60 impl tower_service
::Service
<&tokio_openssl
::SslStream
<tokio
::net
::TcpStream
>> for RestServer
{
61 type Response
= ApiService
;
63 type Future
= Pin
<Box
<dyn Future
<Output
= Result
<ApiService
, Error
>> + Send
>>;
65 fn poll_ready(&mut self, _cx
: &mut Context
) -> Poll
<Result
<(), Self::Error
>> {
69 fn call(&mut self, ctx
: &tokio_openssl
::SslStream
<tokio
::net
::TcpStream
>) -> Self::Future
{
70 match ctx
.get_ref().peer_addr() {
72 future
::err(format_err
!("unable to get peer address - {}", err
)).boxed()
75 future
::ok(ApiService { peer, api_config: self.api_config.clone() }
).boxed()
81 impl tower_service
::Service
<&tokio
::net
::TcpStream
> for RestServer
{
82 type Response
= ApiService
;
84 type Future
= Pin
<Box
<dyn Future
<Output
= Result
<ApiService
, Error
>> + Send
>>;
86 fn poll_ready(&mut self, _cx
: &mut Context
) -> Poll
<Result
<(), Self::Error
>> {
90 fn call(&mut self, ctx
: &tokio
::net
::TcpStream
) -> Self::Future
{
91 match ctx
.peer_addr() {
93 future
::err(format_err
!("unable to get peer address - {}", err
)).boxed()
96 future
::ok(ApiService { peer, api_config: self.api_config.clone() }
).boxed()
102 pub struct ApiService
{
103 pub peer
: std
::net
::SocketAddr
,
104 pub api_config
: Arc
<ApiConfig
>,
108 peer
: &std
::net
::SocketAddr
,
109 method
: hyper
::Method
,
111 resp
: &Response
<Body
>,
114 if resp
.extensions().get
::<NoLogExtension
>().is_some() { return; }
;
116 let status
= resp
.status();
118 if !(status
.is_success() || status
.is_informational()) {
119 let reason
= status
.canonical_reason().unwrap_or("unknown reason");
121 let mut message
= "request failed";
122 if let Some(data
) = resp
.extensions().get
::<ErrorMessageExtension
>() {
126 log
::error
!("{} {}: {} {}: [client {}] {}", method
.as_str(), path
, status
.as_str(), reason
, peer
, message
);
130 impl tower_service
::Service
<Request
<Body
>> for ApiService
{
131 type Response
= Response
<Body
>;
133 type Future
= Pin
<Box
<dyn Future
<Output
= Result
<Response
<Body
>, Self::Error
>> + Send
>>;
135 fn poll_ready(&mut self, _cx
: &mut Context
) -> Poll
<Result
<(), Self::Error
>> {
139 fn call(&mut self, req
: Request
<Body
>) -> Self::Future
{
140 let path
= req
.uri().path().to_owned();
141 let method
= req
.method().clone();
143 let config
= Arc
::clone(&self.api_config
);
144 let peer
= self.peer
;
146 match handle_request(config
, req
).await
{
148 log_response(&peer
, method
, &path
, &res
);
149 Ok
::<_
, Self::Error
>(res
)
152 if let Some(apierr
) = err
.downcast_ref
::<HttpError
>() {
153 let mut resp
= Response
::new(Body
::from(apierr
.message
.clone()));
154 *resp
.status_mut() = apierr
.code
;
155 log_response(&peer
, method
, &path
, &resp
);
158 let mut resp
= Response
::new(Body
::from(err
.to_string()));
159 *resp
.status_mut() = StatusCode
::BAD_REQUEST
;
160 log_response(&peer
, method
, &path
, &resp
);
170 fn parse_query_parameters
<S
: '
static + BuildHasher
+ Send
>(
171 param_schema
: &ObjectSchema
,
172 form
: &str, // x-www-form-urlencoded body data
174 uri_param
: &HashMap
<String
, String
, S
>,
175 ) -> Result
<Value
, Error
> {
177 let mut param_list
: Vec
<(String
, String
)> = vec
![];
179 if !form
.is_empty() {
180 for (k
, v
) in form_urlencoded
::parse(form
.as_bytes()).into_owned() {
181 param_list
.push((k
, v
));
185 if let Some(query_str
) = parts
.uri
.query() {
186 for (k
, v
) in form_urlencoded
::parse(query_str
.as_bytes()).into_owned() {
187 if k
== "_dc" { continue; }
// skip extjs "disable cache" parameter
188 param_list
.push((k
, v
));
192 for (k
, v
) in uri_param
{
193 param_list
.push((k
.clone(), v
.clone()));
196 let params
= parse_parameter_strings(¶m_list
, param_schema
, true)?
;
201 async
fn get_request_parameters
<S
: '
static + BuildHasher
+ Send
>(
202 param_schema
: &ObjectSchema
,
205 uri_param
: HashMap
<String
, String
, S
>,
206 ) -> Result
<Value
, Error
> {
208 let mut is_json
= false;
210 if let Some(value
) = parts
.headers
.get(header
::CONTENT_TYPE
) {
211 match value
.to_str().map(|v
| v
.split('
;'
).next()) {
212 Ok(Some("application/x-www-form-urlencoded")) => {
215 Ok(Some("application/json")) => {
218 _
=> bail
!("unsupported content type {:?}", value
.to_str()),
223 .map_err(|err
| http_err
!(BAD_REQUEST
, "Promlems reading request body: {}", err
))
224 .try_fold(Vec
::new(), |mut acc
, chunk
| async
move {
225 if acc
.len() + chunk
.len() < 64*1024 { //fimxe: max request body size?
226 acc
.extend_from_slice(&*chunk
);
229 Err(http_err
!(BAD_REQUEST
, "Request body too large"))
233 let utf8_data
= std
::str::from_utf8(&body
)
234 .map_err(|err
| format_err
!("Request body not uft8: {}", err
))?
;
237 let mut params
: Value
= serde_json
::from_str(utf8_data
)?
;
238 for (k
, v
) in uri_param
{
239 if let Some((_optional
, prop_schema
)) = param_schema
.lookup(&k
) {
240 params
[&k
] = parse_simple_value(&v
, prop_schema
)?
;
243 verify_json_object(¶ms
, param_schema
)?
;
246 parse_query_parameters(param_schema
, utf8_data
, &parts
, &uri_param
)
250 struct NoLogExtension();
252 async
fn proxy_protected_request(
253 info
: &'
static ApiMethod
,
256 ) -> Result
<Response
<Body
>, Error
> {
258 let mut uri_parts
= parts
.uri
.clone().into_parts();
260 uri_parts
.scheme
= Some(http
::uri
::Scheme
::HTTP
);
261 uri_parts
.authority
= Some(http
::uri
::Authority
::from_static("127.0.0.1:82"));
262 let new_uri
= http
::Uri
::from_parts(uri_parts
).unwrap();
266 let request
= Request
::from_parts(parts
, req_body
);
268 let reload_timezone
= info
.reload_timezone
;
270 let resp
= hyper
::client
::Client
::new()
272 .map_err(Error
::from
)
274 resp
.extensions_mut().insert(NoLogExtension());
279 if reload_timezone { unsafe { tzset(); }
}
284 pub async
fn handle_api_request
<Env
: RpcEnvironment
, S
: '
static + BuildHasher
+ Send
>(
286 info
: &'
static ApiMethod
,
287 formatter
: &'
static OutputFormatter
,
290 uri_param
: HashMap
<String
, String
, S
>,
291 ) -> Result
<Response
<Body
>, Error
> {
293 let delay_unauth_time
= std
::time
::Instant
::now() + std
::time
::Duration
::from_millis(3000);
295 let result
= match info
.handler
{
296 ApiHandler
::AsyncHttp(handler
) => {
297 let params
= parse_query_parameters(info
.parameters
, "", &parts
, &uri_param
)?
;
298 (handler
)(parts
, req_body
, params
, info
, Box
::new(rpcenv
)).await
300 ApiHandler
::Sync(handler
) => {
301 let params
= get_request_parameters(info
.parameters
, parts
, req_body
, uri_param
).await?
;
302 (handler
)(params
, info
, &mut rpcenv
)
303 .map(|data
| (formatter
.format_data
)(data
, &rpcenv
))
305 ApiHandler
::Async(handler
) => {
306 let params
= get_request_parameters(info
.parameters
, parts
, req_body
, uri_param
).await?
;
307 (handler
)(params
, info
, &mut rpcenv
)
309 .map(|data
| (formatter
.format_data
)(data
, &rpcenv
))
313 let resp
= match result
{
316 if let Some(httperr
) = err
.downcast_ref
::<HttpError
>() {
317 if httperr
.code
== StatusCode
::UNAUTHORIZED
{
318 tokio
::time
::delay_until(Instant
::from_std(delay_unauth_time
)).await
;
321 (formatter
.format_error
)(err
)
325 if info
.reload_timezone { unsafe { tzset(); }
}
331 userid
: Option
<Userid
>,
332 csrf_token
: Option
<String
>,
333 language
: Option
<String
>,
334 api
: &Arc
<ApiConfig
>,
336 ) -> Response
<Body
> {
338 let nodename
= proxmox
::tools
::nodename();
339 let user
= userid
.as_ref().map(|u
| u
.as_str()).unwrap_or("");
341 let csrf_token
= csrf_token
.unwrap_or_else(|| String
::from(""));
343 let mut debug
= false;
344 let mut template_file
= "index";
346 if let Some(query_str
) = parts
.uri
.query() {
347 for (k
, v
) in form_urlencoded
::parse(query_str
.as_bytes()).into_owned() {
348 if k
== "debug" && v
!= "0" && v
!= "false" {
350 } else if k
== "console" {
351 template_file
= "console";
356 let mut lang
= String
::from("");
357 if let Some(language
) = language
{
358 if Path
::new(&format
!("/usr/share/pbs-i18n/pbs-lang-{}.js", language
)).exists() {
364 "NodeName": nodename
,
366 "CSRFPreventionToken": csrf_token
,
371 let (ct
, index
) = match api
.render_template(template_file
, &data
) {
372 Ok(index
) => ("text/html", index
),
374 ("text/plain", format
!("Error rendering template: {}", err
))
379 .status(StatusCode
::OK
)
380 .header(header
::CONTENT_TYPE
, ct
)
385 fn extension_to_content_type(filename
: &Path
) -> (&'
static str, bool
) {
387 if let Some(ext
) = filename
.extension().and_then(|osstr
| osstr
.to_str()) {
389 "css" => ("text/css", false),
390 "html" => ("text/html", false),
391 "js" => ("application/javascript", false),
392 "json" => ("application/json", false),
393 "map" => ("application/json", false),
394 "png" => ("image/png", true),
395 "ico" => ("image/x-icon", true),
396 "gif" => ("image/gif", true),
397 "svg" => ("image/svg+xml", false),
398 "jar" => ("application/java-archive", true),
399 "woff" => ("application/font-woff", true),
400 "woff2" => ("application/font-woff2", true),
401 "ttf" => ("application/font-snft", true),
402 "pdf" => ("application/pdf", true),
403 "epub" => ("application/epub+zip", true),
404 "mp3" => ("audio/mpeg", true),
405 "oga" => ("audio/ogg", true),
406 "tgz" => ("application/x-compressed-tar", true),
407 _
=> ("application/octet-stream", false),
411 ("application/octet-stream", false)
414 async
fn simple_static_file_download(filename
: PathBuf
) -> Result
<Response
<Body
>, Error
> {
416 let (content_type
, _nocomp
) = extension_to_content_type(&filename
);
418 use tokio
::io
::AsyncReadExt
;
420 let mut file
= File
::open(filename
)
422 .map_err(|err
| http_err
!(BAD_REQUEST
, "File open failed: {}", err
))?
;
424 let mut data
: Vec
<u8> = Vec
::new();
425 file
.read_to_end(&mut data
)
427 .map_err(|err
| http_err
!(BAD_REQUEST
, "File read failed: {}", err
))?
;
429 let mut response
= Response
::new(data
.into());
430 response
.headers_mut().insert(
431 header
::CONTENT_TYPE
,
432 header
::HeaderValue
::from_static(content_type
));
436 async
fn chuncked_static_file_download(filename
: PathBuf
) -> Result
<Response
<Body
>, Error
> {
437 let (content_type
, _nocomp
) = extension_to_content_type(&filename
);
439 let file
= File
::open(filename
)
441 .map_err(|err
| http_err
!(BAD_REQUEST
, "File open failed: {}", err
))?
;
443 let payload
= tokio_util
::codec
::FramedRead
::new(file
, tokio_util
::codec
::BytesCodec
::new())
444 .map_ok(|bytes
| hyper
::body
::Bytes
::from(bytes
.freeze()));
445 let body
= Body
::wrap_stream(payload
);
447 // fixme: set other headers ?
448 Ok(Response
::builder()
449 .status(StatusCode
::OK
)
450 .header(header
::CONTENT_TYPE
, content_type
)
456 async
fn handle_static_file_download(filename
: PathBuf
) -> Result
<Response
<Body
>, Error
> {
458 let metadata
= tokio
::fs
::metadata(filename
.clone())
459 .map_err(|err
| http_err
!(BAD_REQUEST
, "File access problems: {}", err
))
462 if metadata
.len() < 1024*32 {
463 simple_static_file_download(filename
).await
465 chuncked_static_file_download(filename
).await
469 fn extract_auth_data(headers
: &http
::HeaderMap
) -> (Option
<String
>, Option
<String
>, Option
<String
>) {
471 let mut ticket
= None
;
472 let mut language
= None
;
473 if let Some(raw_cookie
) = headers
.get("COOKIE") {
474 if let Ok(cookie
) = raw_cookie
.to_str() {
475 ticket
= tools
::extract_cookie(cookie
, "PBSAuthCookie");
476 language
= tools
::extract_cookie(cookie
, "PBSLangCookie");
480 let token
= match headers
.get("CSRFPreventionToken").map(|v
| v
.to_str()) {
481 Some(Ok(v
)) => Some(v
.to_owned()),
485 (ticket
, token
, language
)
489 method
: &hyper
::Method
,
490 ticket
: &Option
<String
>,
491 token
: &Option
<String
>,
492 user_info
: &CachedUserInfo
,
493 ) -> Result
<Userid
, Error
> {
494 let ticket_lifetime
= tools
::ticket
::TICKET_LIFETIME
;
496 let ticket
= ticket
.as_ref().map(String
::as_str
);
497 let userid
: Userid
= Ticket
::parse(&ticket
.ok_or_else(|| format_err
!("missing ticket"))?
)?
498 .verify_with_time_frame(public_auth_key(), "PBS", None
, -300..ticket_lifetime
)?
;
500 if !user_info
.is_active_user(&userid
) {
501 bail
!("user account disabled or expired.");
504 if method
!= hyper
::Method
::GET
{
505 if let Some(token
) = token
{
506 verify_csrf_prevention_token(csrf_secret(), &userid
, &token
, -300, ticket_lifetime
)?
;
508 bail
!("missing CSRF prevention token");
515 async
fn handle_request(api
: Arc
<ApiConfig
>, req
: Request
<Body
>) -> Result
<Response
<Body
>, Error
> {
517 let (parts
, body
) = req
.into_parts();
519 let method
= parts
.method
.clone();
520 let (path
, components
) = tools
::normalize_uri_path(parts
.uri
.path())?
;
522 let comp_len
= components
.len();
524 //println!("REQUEST {} {}", method, path);
525 //println!("COMPO {:?}", components);
527 let env_type
= api
.env_type();
528 let mut rpcenv
= RestEnvironment
::new(env_type
);
530 let user_info
= CachedUserInfo
::new()?
;
532 let delay_unauth_time
= std
::time
::Instant
::now() + std
::time
::Duration
::from_millis(3000);
533 let access_forbidden_time
= std
::time
::Instant
::now() + std
::time
::Duration
::from_millis(500);
535 if comp_len
>= 1 && components
[0] == "api2" {
539 let format
= components
[1];
541 let formatter
= match format
{
542 "json" => &JSON_FORMATTER
,
543 "extjs" => &EXTJS_FORMATTER
,
544 _
=> bail
!("Unsupported output format '{}'.", format
),
547 let mut uri_param
= HashMap
::new();
548 let api_method
= api
.find_method(&components
[2..], method
.clone(), &mut uri_param
);
550 let mut auth_required
= true;
551 if let Some(api_method
) = api_method
{
552 if let Permission
::World
= *api_method
.access
.permission
{
553 auth_required
= false; // no auth for endpoints with World permission
558 let (ticket
, token
, _
) = extract_auth_data(&parts
.headers
);
559 match check_auth(&method
, &ticket
, &token
, &user_info
) {
560 Ok(userid
) => rpcenv
.set_user(Some(userid
.to_string())),
562 // always delay unauthorized calls by 3 seconds (from start of request)
563 let err
= http_err
!(UNAUTHORIZED
, "authentication failed - {}", err
);
564 tokio
::time
::delay_until(Instant
::from_std(delay_unauth_time
)).await
;
565 return Ok((formatter
.format_error
)(err
));
572 let err
= http_err
!(NOT_FOUND
, "Path '{}' not found.", path
);
573 return Ok((formatter
.format_error
)(err
));
575 Some(api_method
) => {
576 let user
= rpcenv
.get_user();
577 if !check_api_permission(api_method
.access
.permission
, user
.as_deref(), &uri_param
, user_info
.as_ref()) {
578 let err
= http_err
!(FORBIDDEN
, "permission check failed");
579 tokio
::time
::delay_until(Instant
::from_std(access_forbidden_time
)).await
;
580 return Ok((formatter
.format_error
)(err
));
583 let result
= if api_method
.protected
&& env_type
== RpcEnvironmentType
::PUBLIC
{
584 proxy_protected_request(api_method
, parts
, body
).await
586 handle_api_request(rpcenv
, api_method
, formatter
, parts
, body
, uri_param
).await
589 if let Err(err
) = result
{
590 return Ok((formatter
.format_error
)(err
));
598 // not Auth required for accessing files!
600 if method
!= hyper
::Method
::GET
{
601 bail
!("Unsupported HTTP method {}", method
);
605 let (ticket
, token
, language
) = extract_auth_data(&parts
.headers
);
607 match check_auth(&method
, &ticket
, &token
, &user_info
) {
609 let new_token
= assemble_csrf_prevention_token(csrf_secret(), &userid
);
610 return Ok(get_index(Some(userid
), Some(new_token
), language
, &api
, parts
));
613 tokio
::time
::delay_until(Instant
::from_std(delay_unauth_time
)).await
;
614 return Ok(get_index(None
, None
, language
, &api
, parts
));
618 return Ok(get_index(None
, None
, language
, &api
, parts
));
621 let filename
= api
.find_alias(&components
);
622 return handle_static_file_download(filename
).await
;
626 Err(http_err
!(NOT_FOUND
, "Path '{}' not found.", path
))