]> git.proxmox.com Git - proxmox-backup.git/commitdiff
move src/server/rest.rs to proxmox-rest-server crate
authorDietmar Maurer <dietmar@proxmox.com>
Tue, 21 Sep 2021 05:58:51 +0000 (07:58 +0200)
committerThomas Lamprecht <t.lamprecht@proxmox.com>
Tue, 21 Sep 2021 06:46:41 +0000 (08:46 +0200)
Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
proxmox-rest-server/Cargo.toml
proxmox-rest-server/src/lib.rs
proxmox-rest-server/src/rest.rs [new file with mode: 0644]
src/bin/proxmox-backup-api.rs
src/bin/proxmox-backup-proxy.rs
src/bin/proxmox-restore-daemon.rs
src/server/h2service.rs
src/server/mod.rs
src/server/rest.rs [deleted file]

index c19dfb0b69cee3124de84371f7095318766723ae..2f740e673b985c73b8a695c4bcb72573cd54c57d 100644 (file)
@@ -16,9 +16,13 @@ libc = "0.2"
 log = "0.4"
 nix = "0.19.1"
 percent-encoding = "2.1"
+regex = "1.2"
 serde = { version = "1.0", features = [] }
 serde_json = "1.0"
 tokio = { version = "1.6", features = ["signal", "process"] }
+tokio-openssl = "0.6.1"
+tower-service = "0.3.0"
+url = "2.1"
 
 proxmox = { version = "0.13.3", features = [ "router"] }
 
index 55a10ca605b63caaa775c23c097e390f1fa21af5..2f29f9cd72135b4662e919bc5be8740fb243d017 100644 (file)
@@ -26,6 +26,9 @@ pub use file_logger::{FileLogger, FileLogOptions};
 mod api_config;
 pub use api_config::ApiConfig;
 
+mod rest;
+pub use rest::{RestServer, handle_api_request};
+
 pub enum AuthError {
     Generic(Error),
     NoData,
diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
new file mode 100644 (file)
index 0000000..dde47b5
--- /dev/null
@@ -0,0 +1,739 @@
+use std::collections::HashMap;
+use std::future::Future;
+use std::hash::BuildHasher;
+use std::path::{Path, PathBuf};
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+
+use anyhow::{bail, format_err, Error};
+use futures::future::{self, FutureExt, TryFutureExt};
+use futures::stream::TryStreamExt;
+use hyper::body::HttpBody;
+use hyper::header::{self, HeaderMap};
+use hyper::http::request::Parts;
+use hyper::{Body, Request, Response, StatusCode};
+use lazy_static::lazy_static;
+use regex::Regex;
+use serde_json::Value;
+use tokio::fs::File;
+use tokio::time::Instant;
+use url::form_urlencoded;
+
+use proxmox::api::schema::{
+    parse_parameter_strings, parse_simple_value, verify_json_object, ObjectSchemaType,
+    ParameterSchema,
+};
+use proxmox::api::{
+    check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
+    RpcEnvironmentType, UserInformation,
+};
+use proxmox::http_err;
+
+use pbs_tools::compression::{DeflateEncoder, Level};
+use pbs_tools::stream::AsyncReaderStream;
+
+use crate::{
+    ApiConfig, FileLogger, AuthError, RestEnvironment, CompressionMethod,
+    extract_cookie, normalize_uri_path, formatter::*,
+};
+
+extern "C" {
+    fn tzset();
+}
+
+struct AuthStringExtension(String);
+
+struct EmptyUserInformation {}
+
+impl UserInformation for EmptyUserInformation {
+    fn is_superuser(&self, _userid: &str) -> bool { false }
+    fn is_group_member(&self, _userid: &str, _group: &str) -> bool { false }
+    fn lookup_privs(&self, _userid: &str, _path: &[&str]) -> u64 { 0 }
+}
+
+pub struct RestServer {
+    pub api_config: Arc<ApiConfig>,
+}
+
+const MAX_URI_QUERY_LENGTH: usize = 3072;
+const CHUNK_SIZE_LIMIT: u64 = 32 * 1024;
+
+impl RestServer {
+    pub fn new(api_config: ApiConfig) -> Self {
+        Self {
+            api_config: Arc::new(api_config),
+        }
+    }
+}
+
+impl tower_service::Service<&Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>>
+    for RestServer
+{
+    type Response = ApiService;
+    type Error = Error;
+    type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
+
+    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(
+        &mut self,
+        ctx: &Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>,
+    ) -> Self::Future {
+        match ctx.get_ref().peer_addr() {
+            Err(err) => future::err(format_err!("unable to get peer address - {}", err)).boxed(),
+            Ok(peer) => future::ok(ApiService {
+                peer,
+                api_config: self.api_config.clone(),
+            })
+            .boxed(),
+        }
+    }
+}
+
+impl tower_service::Service<&tokio::net::TcpStream> for RestServer {
+    type Response = ApiService;
+    type Error = Error;
+    type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
+
+    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, ctx: &tokio::net::TcpStream) -> Self::Future {
+        match ctx.peer_addr() {
+            Err(err) => future::err(format_err!("unable to get peer address - {}", err)).boxed(),
+            Ok(peer) => future::ok(ApiService {
+                peer,
+                api_config: self.api_config.clone(),
+            })
+            .boxed(),
+        }
+    }
+}
+
+impl tower_service::Service<&tokio::net::UnixStream> for RestServer {
+    type Response = ApiService;
+    type Error = Error;
+    type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
+
+    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, _ctx: &tokio::net::UnixStream) -> Self::Future {
+        // TODO: Find a way to actually represent the vsock peer in the ApiService struct - for now
+        // it doesn't really matter, so just use a fake IP address
+        let fake_peer = "0.0.0.0:807".parse().unwrap();
+        future::ok(ApiService {
+            peer: fake_peer,
+            api_config: self.api_config.clone(),
+        })
+        .boxed()
+    }
+}
+
+pub struct ApiService {
+    pub peer: std::net::SocketAddr,
+    pub api_config: Arc<ApiConfig>,
+}
+
+fn log_response(
+    logfile: Option<&Arc<Mutex<FileLogger>>>,
+    peer: &std::net::SocketAddr,
+    method: hyper::Method,
+    path_query: &str,
+    resp: &Response<Body>,
+    user_agent: Option<String>,
+) {
+    if resp.extensions().get::<NoLogExtension>().is_some() {
+        return;
+    };
+
+    // we also log URL-to-long requests, so avoid message bigger than PIPE_BUF (4k on Linux)
+    // to profit from atomicty guarantees for O_APPEND opened logfiles
+    let path = &path_query[..MAX_URI_QUERY_LENGTH.min(path_query.len())];
+
+    let status = resp.status();
+    if !(status.is_success() || status.is_informational()) {
+        let reason = status.canonical_reason().unwrap_or("unknown reason");
+
+        let message = match resp.extensions().get::<ErrorMessageExtension>() {
+            Some(data) => &data.0,
+            None => "request failed",
+        };
+
+        log::error!(
+            "{} {}: {} {}: [client {}] {}",
+            method.as_str(),
+            path,
+            status.as_str(),
+            reason,
+            peer,
+            message
+        );
+    }
+    if let Some(logfile) = logfile {
+        let auth_id = match resp.extensions().get::<AuthStringExtension>() {
+            Some(AuthStringExtension(auth_id)) => auth_id.clone(),
+            None => "-".to_string(),
+        };
+        let now = proxmox::tools::time::epoch_i64();
+        // time format which apache/nginx use (by default), copied from pve-http-server
+        let datetime = proxmox::tools::time::strftime_local("%d/%m/%Y:%H:%M:%S %z", now)
+            .unwrap_or_else(|_| "-".to_string());
+
+        logfile.lock().unwrap().log(format!(
+            "{} - {} [{}] \"{} {}\" {} {} {}",
+            peer.ip(),
+            auth_id,
+            datetime,
+            method.as_str(),
+            path,
+            status.as_str(),
+            resp.body().size_hint().lower(),
+            user_agent.unwrap_or_else(|| "-".to_string()),
+        ));
+    }
+}
+
+fn get_proxied_peer(headers: &HeaderMap) -> Option<std::net::SocketAddr> {
+    lazy_static! {
+        static ref RE: Regex = Regex::new(r#"for="([^"]+)""#).unwrap();
+    }
+    let forwarded = headers.get(header::FORWARDED)?.to_str().ok()?;
+    let capture = RE.captures(&forwarded)?;
+    let rhost = capture.get(1)?.as_str();
+
+    rhost.parse().ok()
+}
+
+fn get_user_agent(headers: &HeaderMap) -> Option<String> {
+    let agent = headers.get(header::USER_AGENT)?.to_str();
+    agent
+        .map(|s| {
+            let mut s = s.to_owned();
+            s.truncate(128);
+            s
+        })
+        .ok()
+}
+
+impl tower_service::Service<Request<Body>> for ApiService {
+    type Response = Response<Body>;
+    type Error = Error;
+    #[allow(clippy::type_complexity)]
+    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
+
+    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, req: Request<Body>) -> Self::Future {
+        let path = req.uri().path_and_query().unwrap().as_str().to_owned();
+        let method = req.method().clone();
+        let user_agent = get_user_agent(req.headers());
+
+        let config = Arc::clone(&self.api_config);
+        let peer = match get_proxied_peer(req.headers()) {
+            Some(proxied_peer) => proxied_peer,
+            None => self.peer,
+        };
+        async move {
+            let response = match handle_request(Arc::clone(&config), req, &peer).await {
+                Ok(response) => response,
+                Err(err) => {
+                    let (err, code) = match err.downcast_ref::<HttpError>() {
+                        Some(apierr) => (apierr.message.clone(), apierr.code),
+                        _ => (err.to_string(), StatusCode::BAD_REQUEST),
+                    };
+                    Response::builder()
+                        .status(code)
+                        .extension(ErrorMessageExtension(err.to_string()))
+                        .body(err.into())?
+                }
+            };
+            let logger = config.get_access_log();
+            log_response(logger, &peer, method, &path, &response, user_agent);
+            Ok(response)
+        }
+        .boxed()
+    }
+}
+
+fn parse_query_parameters<S: 'static + BuildHasher + Send>(
+    param_schema: ParameterSchema,
+    form: &str, // x-www-form-urlencoded body data
+    parts: &Parts,
+    uri_param: &HashMap<String, String, S>,
+) -> Result<Value, Error> {
+    let mut param_list: Vec<(String, String)> = vec![];
+
+    if !form.is_empty() {
+        for (k, v) in form_urlencoded::parse(form.as_bytes()).into_owned() {
+            param_list.push((k, v));
+        }
+    }
+
+    if let Some(query_str) = parts.uri.query() {
+        for (k, v) in form_urlencoded::parse(query_str.as_bytes()).into_owned() {
+            if k == "_dc" {
+                continue;
+            } // skip extjs "disable cache" parameter
+            param_list.push((k, v));
+        }
+    }
+
+    for (k, v) in uri_param {
+        param_list.push((k.clone(), v.clone()));
+    }
+
+    let params = parse_parameter_strings(&param_list, param_schema, true)?;
+
+    Ok(params)
+}
+
+async fn get_request_parameters<S: 'static + BuildHasher + Send>(
+    param_schema: ParameterSchema,
+    parts: Parts,
+    req_body: Body,
+    uri_param: HashMap<String, String, S>,
+) -> Result<Value, Error> {
+    let mut is_json = false;
+
+    if let Some(value) = parts.headers.get(header::CONTENT_TYPE) {
+        match value.to_str().map(|v| v.split(';').next()) {
+            Ok(Some("application/x-www-form-urlencoded")) => {
+                is_json = false;
+            }
+            Ok(Some("application/json")) => {
+                is_json = true;
+            }
+            _ => bail!("unsupported content type {:?}", value.to_str()),
+        }
+    }
+
+    let body = TryStreamExt::map_err(req_body, |err| {
+        http_err!(BAD_REQUEST, "Problems reading request body: {}", err)
+    })
+    .try_fold(Vec::new(), |mut acc, chunk| async move {
+        // FIXME: max request body size?
+        if acc.len() + chunk.len() < 64 * 1024 {
+            acc.extend_from_slice(&*chunk);
+            Ok(acc)
+        } else {
+            Err(http_err!(BAD_REQUEST, "Request body too large"))
+        }
+    })
+    .await?;
+
+    let utf8_data =
+        std::str::from_utf8(&body).map_err(|err| format_err!("Request body not uft8: {}", err))?;
+
+    if is_json {
+        let mut params: Value = serde_json::from_str(utf8_data)?;
+        for (k, v) in uri_param {
+            if let Some((_optional, prop_schema)) = param_schema.lookup(&k) {
+                params[&k] = parse_simple_value(&v, prop_schema)?;
+            }
+        }
+        verify_json_object(&params, &param_schema)?;
+        return Ok(params);
+    } else {
+        parse_query_parameters(param_schema, utf8_data, &parts, &uri_param)
+    }
+}
+
+struct NoLogExtension();
+
+async fn proxy_protected_request(
+    info: &'static ApiMethod,
+    mut parts: Parts,
+    req_body: Body,
+    peer: &std::net::SocketAddr,
+) -> Result<Response<Body>, Error> {
+    let mut uri_parts = parts.uri.clone().into_parts();
+
+    uri_parts.scheme = Some(http::uri::Scheme::HTTP);
+    uri_parts.authority = Some(http::uri::Authority::from_static("127.0.0.1:82"));
+    let new_uri = http::Uri::from_parts(uri_parts).unwrap();
+
+    parts.uri = new_uri;
+
+    let mut request = Request::from_parts(parts, req_body);
+    request.headers_mut().insert(
+        header::FORWARDED,
+        format!("for=\"{}\";", peer).parse().unwrap(),
+    );
+
+    let reload_timezone = info.reload_timezone;
+
+    let resp = hyper::client::Client::new()
+        .request(request)
+        .map_err(Error::from)
+        .map_ok(|mut resp| {
+            resp.extensions_mut().insert(NoLogExtension());
+            resp
+        })
+        .await?;
+
+    if reload_timezone {
+        unsafe {
+            tzset();
+        }
+    }
+
+    Ok(resp)
+}
+
+pub async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher + Send>(
+    mut rpcenv: Env,
+    info: &'static ApiMethod,
+    formatter: &'static OutputFormatter,
+    parts: Parts,
+    req_body: Body,
+    uri_param: HashMap<String, String, S>,
+) -> Result<Response<Body>, Error> {
+    let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
+    let compression = extract_compression_method(&parts.headers);
+
+    let result = match info.handler {
+        ApiHandler::AsyncHttp(handler) => {
+            let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?;
+            (handler)(parts, req_body, params, info, Box::new(rpcenv)).await
+        }
+        ApiHandler::Sync(handler) => {
+            let params =
+                get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
+            (handler)(params, info, &mut rpcenv).map(|data| (formatter.format_data)(data, &rpcenv))
+        }
+        ApiHandler::Async(handler) => {
+            let params =
+                get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
+            (handler)(params, info, &mut rpcenv)
+                .await
+                .map(|data| (formatter.format_data)(data, &rpcenv))
+        }
+    };
+
+    let mut resp = match result {
+        Ok(resp) => resp,
+        Err(err) => {
+            if let Some(httperr) = err.downcast_ref::<HttpError>() {
+                if httperr.code == StatusCode::UNAUTHORIZED {
+                    tokio::time::sleep_until(Instant::from_std(delay_unauth_time)).await;
+                }
+            }
+            (formatter.format_error)(err)
+        }
+    };
+
+    let resp = match compression {
+        Some(CompressionMethod::Deflate) => {
+            resp.headers_mut().insert(
+                header::CONTENT_ENCODING,
+                CompressionMethod::Deflate.content_encoding(),
+            );
+            resp.map(|body| {
+                Body::wrap_stream(DeflateEncoder::with_quality(
+                    TryStreamExt::map_err(body, |err| {
+                        proxmox::io_format_err!("error during compression: {}", err)
+                    }),
+                    Level::Default,
+                ))
+            })
+        }
+        None => resp,
+    };
+
+    if info.reload_timezone {
+        unsafe {
+            tzset();
+        }
+    }
+
+    Ok(resp)
+}
+
+
+fn extension_to_content_type(filename: &Path) -> (&'static str, bool) {
+    if let Some(ext) = filename.extension().and_then(|osstr| osstr.to_str()) {
+        return match ext {
+            "css" => ("text/css", false),
+            "html" => ("text/html", false),
+            "js" => ("application/javascript", false),
+            "json" => ("application/json", false),
+            "map" => ("application/json", false),
+            "png" => ("image/png", true),
+            "ico" => ("image/x-icon", true),
+            "gif" => ("image/gif", true),
+            "svg" => ("image/svg+xml", false),
+            "jar" => ("application/java-archive", true),
+            "woff" => ("application/font-woff", true),
+            "woff2" => ("application/font-woff2", true),
+            "ttf" => ("application/font-snft", true),
+            "pdf" => ("application/pdf", true),
+            "epub" => ("application/epub+zip", true),
+            "mp3" => ("audio/mpeg", true),
+            "oga" => ("audio/ogg", true),
+            "tgz" => ("application/x-compressed-tar", true),
+            _ => ("application/octet-stream", false),
+        };
+    }
+
+    ("application/octet-stream", false)
+}
+
+async fn simple_static_file_download(
+    filename: PathBuf,
+    content_type: &'static str,
+    compression: Option<CompressionMethod>,
+) -> Result<Response<Body>, Error> {
+    use tokio::io::AsyncReadExt;
+
+    let mut file = File::open(filename)
+        .await
+        .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
+
+    let mut data: Vec<u8> = Vec::new();
+
+    let mut response = match compression {
+        Some(CompressionMethod::Deflate) => {
+            let mut enc = DeflateEncoder::with_quality(data, Level::Default);
+            enc.compress_vec(&mut file, CHUNK_SIZE_LIMIT as usize)
+                .await?;
+            let mut response = Response::new(enc.into_inner().into());
+            response.headers_mut().insert(
+                header::CONTENT_ENCODING,
+                CompressionMethod::Deflate.content_encoding(),
+            );
+            response
+        }
+        None => {
+            file.read_to_end(&mut data)
+                .await
+                .map_err(|err| http_err!(BAD_REQUEST, "File read failed: {}", err))?;
+            Response::new(data.into())
+        }
+    };
+
+    response.headers_mut().insert(
+        header::CONTENT_TYPE,
+        header::HeaderValue::from_static(content_type),
+    );
+
+    Ok(response)
+}
+
+async fn chuncked_static_file_download(
+    filename: PathBuf,
+    content_type: &'static str,
+    compression: Option<CompressionMethod>,
+) -> Result<Response<Body>, Error> {
+    let mut resp = Response::builder()
+        .status(StatusCode::OK)
+        .header(header::CONTENT_TYPE, content_type);
+
+    let file = File::open(filename)
+        .await
+        .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
+
+    let body = match compression {
+        Some(CompressionMethod::Deflate) => {
+            resp = resp.header(
+                header::CONTENT_ENCODING,
+                CompressionMethod::Deflate.content_encoding(),
+            );
+            Body::wrap_stream(DeflateEncoder::with_quality(
+                AsyncReaderStream::new(file),
+                Level::Default,
+            ))
+        }
+        None => Body::wrap_stream(AsyncReaderStream::new(file)),
+    };
+
+    Ok(resp.body(body).unwrap())
+}
+
+async fn handle_static_file_download(
+    filename: PathBuf,
+    compression: Option<CompressionMethod>,
+) -> Result<Response<Body>, Error> {
+    let metadata = tokio::fs::metadata(filename.clone())
+        .map_err(|err| http_err!(BAD_REQUEST, "File access problems: {}", err))
+        .await?;
+
+    let (content_type, nocomp) = extension_to_content_type(&filename);
+    let compression = if nocomp { None } else { compression };
+
+    if metadata.len() < CHUNK_SIZE_LIMIT {
+        simple_static_file_download(filename, content_type, compression).await
+    } else {
+        chuncked_static_file_download(filename, content_type, compression).await
+    }
+}
+
+fn extract_lang_header(headers: &http::HeaderMap) -> Option<String> {
+    if let Some(Ok(cookie)) = headers.get("COOKIE").map(|v| v.to_str()) {
+        return extract_cookie(cookie, "PBSLangCookie");
+    }
+    None
+}
+
+// FIXME: support handling multiple compression methods
+fn extract_compression_method(headers: &http::HeaderMap) -> Option<CompressionMethod> {
+    if let Some(Ok(encodings)) = headers.get(header::ACCEPT_ENCODING).map(|v| v.to_str()) {
+        for encoding in encodings.split(&[',', ' '][..]) {
+            if let Ok(method) = encoding.parse() {
+                return Some(method);
+            }
+        }
+    }
+    None
+}
+
+async fn handle_request(
+    api: Arc<ApiConfig>,
+    req: Request<Body>,
+    peer: &std::net::SocketAddr,
+) -> Result<Response<Body>, Error> {
+    let (parts, body) = req.into_parts();
+    let method = parts.method.clone();
+    let (path, components) = normalize_uri_path(parts.uri.path())?;
+
+    let comp_len = components.len();
+
+    let query = parts.uri.query().unwrap_or_default();
+    if path.len() + query.len() > MAX_URI_QUERY_LENGTH {
+        return Ok(Response::builder()
+            .status(StatusCode::URI_TOO_LONG)
+            .body("".into())
+            .unwrap());
+    }
+
+    let env_type = api.env_type();
+    let mut rpcenv = RestEnvironment::new(env_type, Arc::clone(&api));
+
+    rpcenv.set_client_ip(Some(*peer));
+
+    let auth = &api.api_auth;
+
+    let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
+    let access_forbidden_time = std::time::Instant::now() + std::time::Duration::from_millis(500);
+
+    if comp_len >= 1 && components[0] == "api2" {
+        if comp_len >= 2 {
+            let format = components[1];
+
+            let formatter = match format {
+                "json" => &JSON_FORMATTER,
+                "extjs" => &EXTJS_FORMATTER,
+                _ => bail!("Unsupported output format '{}'.", format),
+            };
+
+            let mut uri_param = HashMap::new();
+            let api_method = api.find_method(&components[2..], method.clone(), &mut uri_param);
+
+            let mut auth_required = true;
+            if let Some(api_method) = api_method {
+                if let Permission::World = *api_method.access.permission {
+                    auth_required = false; // no auth for endpoints with World permission
+                }
+            }
+
+            let mut user_info: Box<dyn UserInformation + Send + Sync> = Box::new(EmptyUserInformation {});
+
+            if auth_required {
+                match auth.check_auth(&parts.headers, &method) {
+                    Ok((authid, info)) => {
+                        rpcenv.set_auth_id(Some(authid));
+                        user_info = info;
+                    }
+                    Err(auth_err) => {
+                        let err = match auth_err {
+                            AuthError::Generic(err) => err,
+                            AuthError::NoData => {
+                                format_err!("no authentication credentials provided.")
+                            }
+                        };
+                        // fixme: log Username??
+                        rpcenv.log_failed_auth(None, &err.to_string());
+
+                        // always delay unauthorized calls by 3 seconds (from start of request)
+                        let err = http_err!(UNAUTHORIZED, "authentication failed - {}", err);
+                        tokio::time::sleep_until(Instant::from_std(delay_unauth_time)).await;
+                        return Ok((formatter.format_error)(err));
+                    }
+                }
+            }
+
+            match api_method {
+                None => {
+                    let err = http_err!(NOT_FOUND, "Path '{}' not found.", path);
+                    return Ok((formatter.format_error)(err));
+                }
+                Some(api_method) => {
+                    let auth_id = rpcenv.get_auth_id();
+                    let user_info = user_info;
+
+                    if !check_api_permission(
+                        api_method.access.permission,
+                        auth_id.as_deref(),
+                        &uri_param,
+                        user_info.as_ref(),
+                    ) {
+                        let err = http_err!(FORBIDDEN, "permission check failed");
+                        tokio::time::sleep_until(Instant::from_std(access_forbidden_time)).await;
+                        return Ok((formatter.format_error)(err));
+                    }
+
+                    let result = if api_method.protected && env_type == RpcEnvironmentType::PUBLIC {
+                        proxy_protected_request(api_method, parts, body, peer).await
+                    } else {
+                        handle_api_request(rpcenv, api_method, formatter, parts, body, uri_param)
+                            .await
+                    };
+
+                    let mut response = match result {
+                        Ok(resp) => resp,
+                        Err(err) => (formatter.format_error)(err),
+                    };
+
+                    if let Some(auth_id) = auth_id {
+                        response.extensions_mut().insert(AuthStringExtension(auth_id));
+                    }
+
+                    return Ok(response);
+                }
+            }
+        }
+    } else {
+        // not Auth required for accessing files!
+
+        if method != hyper::Method::GET {
+            bail!("Unsupported HTTP method {}", method);
+        }
+
+        if comp_len == 0 {
+            let language = extract_lang_header(&parts.headers);
+            match auth.check_auth(&parts.headers, &method) {
+                Ok((auth_id, _user_info)) => {
+                    return Ok(api.get_index(Some(auth_id), language, parts));
+                }
+                Err(AuthError::Generic(_)) => {
+                    tokio::time::sleep_until(Instant::from_std(delay_unauth_time)).await;
+                }
+                Err(AuthError::NoData) => {}
+            }
+            return Ok(api.get_index(None, language, parts));
+        } else {
+            let filename = api.find_alias(&components);
+            let compression = extract_compression_method(&parts.headers);
+            return handle_static_file_download(filename, compression).await;
+        }
+    }
+
+    Err(http_err!(NOT_FOUND, "Path '{}' not found.", path))
+}
index 7653f41256df1a489f7e3cabef73e7db0ca49c1e..9ad10260ec8299e57a45c13a39b491abf1c71370 100644 (file)
@@ -10,12 +10,11 @@ use proxmox::api::RpcEnvironmentType;
 use proxmox::tools::fs::CreateOptions;
 
 use pbs_tools::auth::private_auth_key;
-use proxmox_rest_server::ApiConfig;
+use proxmox_rest_server::{ApiConfig, RestServer};
 
 use proxmox_backup::server::{
     self,
     auth::default_api_auth,
-    rest::*,
 };
 use proxmox_rest_server::daemon;
 
index 5679de43de0c2a26f2abdbbcbbbff15d63beac88..6a8736e28b6f19e75f47d82280953f2a64fdaa0b 100644 (file)
@@ -19,14 +19,13 @@ use proxmox::api::RpcEnvironmentType;
 use proxmox::sys::linux::socket::set_tcp_keepalive;
 use proxmox::tools::fs::CreateOptions;
 
-use proxmox_rest_server::ApiConfig;
+use proxmox_rest_server::{ApiConfig, RestServer};
 
 use proxmox_backup::{
     backup::DataStore,
     server::{
         auth::default_api_auth,
         WorkerTask,
-        rest::*,
         jobstate::{
             self,
             Job,
index d9a8eff081d970ee63a03930465023ac3261c57d..9cf7f75574771d7835c0d1551c26687ceef81e35 100644 (file)
@@ -21,9 +21,7 @@ use hyper::header;
 use proxmox::api::RpcEnvironmentType;
 
 use pbs_client::DEFAULT_VSOCK_PORT;
-use proxmox_rest_server::ApiConfig;
-
-use proxmox_backup::server::rest::*;
+use proxmox_rest_server::{ApiConfig, RestServer};
 
 mod proxmox_restore_daemon;
 use proxmox_restore_daemon::*;
index 4e3024aa151e55ab34c95d6d6ffc9d364b13abfb..41d628be713c741cccf68b8fe202c8796cbd0626 100644 (file)
@@ -61,7 +61,7 @@ impl <E: RpcEnvironment + Clone> H2Service<E> {
                 future::ok((formatter.format_error)(err)).boxed()
             }
             Some(api_method) => {
-                crate::server::rest::handle_api_request(
+                proxmox_rest_server::handle_api_request(
                     self.rpcenv.clone(), api_method, formatter, parts, body, uri_param).boxed()
             }
         }
index 1fc575758c48678207856a26e57af04307399685..a7dcee6760863e9fa75af115c6628db4adbfec70 100644 (file)
@@ -55,9 +55,6 @@ pub use worker_task::*;
 mod h2service;
 pub use h2service::*;
 
-#[macro_use]
-pub mod rest;
-
 pub mod jobstate;
 
 mod verify_job;
diff --git a/src/server/rest.rs b/src/server/rest.rs
deleted file mode 100644 (file)
index 59fbb39..0000000
+++ /dev/null
@@ -1,739 +0,0 @@
-use std::collections::HashMap;
-use std::future::Future;
-use std::hash::BuildHasher;
-use std::path::{Path, PathBuf};
-use std::pin::Pin;
-use std::sync::{Arc, Mutex};
-use std::task::{Context, Poll};
-
-use anyhow::{bail, format_err, Error};
-use futures::future::{self, FutureExt, TryFutureExt};
-use futures::stream::TryStreamExt;
-use hyper::body::HttpBody;
-use hyper::header::{self, HeaderMap};
-use hyper::http::request::Parts;
-use hyper::{Body, Request, Response, StatusCode};
-use lazy_static::lazy_static;
-use regex::Regex;
-use serde_json::Value;
-use tokio::fs::File;
-use tokio::time::Instant;
-use url::form_urlencoded;
-
-use proxmox::api::schema::{
-    parse_parameter_strings, parse_simple_value, verify_json_object, ObjectSchemaType,
-    ParameterSchema,
-};
-use proxmox::api::{
-    check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
-    RpcEnvironmentType, UserInformation,
-};
-use proxmox::http_err;
-
-use pbs_tools::compression::{DeflateEncoder, Level};
-use pbs_tools::stream::AsyncReaderStream;
-use proxmox_rest_server::{
-    ApiConfig, FileLogger, AuthError, RestEnvironment, CompressionMethod,
-    extract_cookie, normalize_uri_path,
-};
-use proxmox_rest_server::formatter::*;
-
-extern "C" {
-    fn tzset();
-}
-
-struct AuthStringExtension(String);
-
-struct EmptyUserInformation {}
-
-impl UserInformation for EmptyUserInformation {
-    fn is_superuser(&self, _userid: &str) -> bool { false }
-    fn is_group_member(&self, _userid: &str, _group: &str) -> bool { false }
-    fn lookup_privs(&self, _userid: &str, _path: &[&str]) -> u64 { 0 }
-}
-
-pub struct RestServer {
-    pub api_config: Arc<ApiConfig>,
-}
-
-const MAX_URI_QUERY_LENGTH: usize = 3072;
-const CHUNK_SIZE_LIMIT: u64 = 32 * 1024;
-
-impl RestServer {
-    pub fn new(api_config: ApiConfig) -> Self {
-        Self {
-            api_config: Arc::new(api_config),
-        }
-    }
-}
-
-impl tower_service::Service<&Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>>
-    for RestServer
-{
-    type Response = ApiService;
-    type Error = Error;
-    type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
-
-    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
-        Poll::Ready(Ok(()))
-    }
-
-    fn call(
-        &mut self,
-        ctx: &Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>,
-    ) -> Self::Future {
-        match ctx.get_ref().peer_addr() {
-            Err(err) => future::err(format_err!("unable to get peer address - {}", err)).boxed(),
-            Ok(peer) => future::ok(ApiService {
-                peer,
-                api_config: self.api_config.clone(),
-            })
-            .boxed(),
-        }
-    }
-}
-
-impl tower_service::Service<&tokio::net::TcpStream> for RestServer {
-    type Response = ApiService;
-    type Error = Error;
-    type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
-
-    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
-        Poll::Ready(Ok(()))
-    }
-
-    fn call(&mut self, ctx: &tokio::net::TcpStream) -> Self::Future {
-        match ctx.peer_addr() {
-            Err(err) => future::err(format_err!("unable to get peer address - {}", err)).boxed(),
-            Ok(peer) => future::ok(ApiService {
-                peer,
-                api_config: self.api_config.clone(),
-            })
-            .boxed(),
-        }
-    }
-}
-
-impl tower_service::Service<&tokio::net::UnixStream> for RestServer {
-    type Response = ApiService;
-    type Error = Error;
-    type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
-
-    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
-        Poll::Ready(Ok(()))
-    }
-
-    fn call(&mut self, _ctx: &tokio::net::UnixStream) -> Self::Future {
-        // TODO: Find a way to actually represent the vsock peer in the ApiService struct - for now
-        // it doesn't really matter, so just use a fake IP address
-        let fake_peer = "0.0.0.0:807".parse().unwrap();
-        future::ok(ApiService {
-            peer: fake_peer,
-            api_config: self.api_config.clone(),
-        })
-        .boxed()
-    }
-}
-
-pub struct ApiService {
-    pub peer: std::net::SocketAddr,
-    pub api_config: Arc<ApiConfig>,
-}
-
-fn log_response(
-    logfile: Option<&Arc<Mutex<FileLogger>>>,
-    peer: &std::net::SocketAddr,
-    method: hyper::Method,
-    path_query: &str,
-    resp: &Response<Body>,
-    user_agent: Option<String>,
-) {
-    if resp.extensions().get::<NoLogExtension>().is_some() {
-        return;
-    };
-
-    // we also log URL-to-long requests, so avoid message bigger than PIPE_BUF (4k on Linux)
-    // to profit from atomicty guarantees for O_APPEND opened logfiles
-    let path = &path_query[..MAX_URI_QUERY_LENGTH.min(path_query.len())];
-
-    let status = resp.status();
-    if !(status.is_success() || status.is_informational()) {
-        let reason = status.canonical_reason().unwrap_or("unknown reason");
-
-        let message = match resp.extensions().get::<ErrorMessageExtension>() {
-            Some(data) => &data.0,
-            None => "request failed",
-        };
-
-        log::error!(
-            "{} {}: {} {}: [client {}] {}",
-            method.as_str(),
-            path,
-            status.as_str(),
-            reason,
-            peer,
-            message
-        );
-    }
-    if let Some(logfile) = logfile {
-        let auth_id = match resp.extensions().get::<AuthStringExtension>() {
-            Some(AuthStringExtension(auth_id)) => auth_id.clone(),
-            None => "-".to_string(),
-        };
-        let now = proxmox::tools::time::epoch_i64();
-        // time format which apache/nginx use (by default), copied from pve-http-server
-        let datetime = proxmox::tools::time::strftime_local("%d/%m/%Y:%H:%M:%S %z", now)
-            .unwrap_or_else(|_| "-".to_string());
-
-        logfile.lock().unwrap().log(format!(
-            "{} - {} [{}] \"{} {}\" {} {} {}",
-            peer.ip(),
-            auth_id,
-            datetime,
-            method.as_str(),
-            path,
-            status.as_str(),
-            resp.body().size_hint().lower(),
-            user_agent.unwrap_or_else(|| "-".to_string()),
-        ));
-    }
-}
-
-fn get_proxied_peer(headers: &HeaderMap) -> Option<std::net::SocketAddr> {
-    lazy_static! {
-        static ref RE: Regex = Regex::new(r#"for="([^"]+)""#).unwrap();
-    }
-    let forwarded = headers.get(header::FORWARDED)?.to_str().ok()?;
-    let capture = RE.captures(&forwarded)?;
-    let rhost = capture.get(1)?.as_str();
-
-    rhost.parse().ok()
-}
-
-fn get_user_agent(headers: &HeaderMap) -> Option<String> {
-    let agent = headers.get(header::USER_AGENT)?.to_str();
-    agent
-        .map(|s| {
-            let mut s = s.to_owned();
-            s.truncate(128);
-            s
-        })
-        .ok()
-}
-
-impl tower_service::Service<Request<Body>> for ApiService {
-    type Response = Response<Body>;
-    type Error = Error;
-    #[allow(clippy::type_complexity)]
-    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
-
-    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
-        Poll::Ready(Ok(()))
-    }
-
-    fn call(&mut self, req: Request<Body>) -> Self::Future {
-        let path = req.uri().path_and_query().unwrap().as_str().to_owned();
-        let method = req.method().clone();
-        let user_agent = get_user_agent(req.headers());
-
-        let config = Arc::clone(&self.api_config);
-        let peer = match get_proxied_peer(req.headers()) {
-            Some(proxied_peer) => proxied_peer,
-            None => self.peer,
-        };
-        async move {
-            let response = match handle_request(Arc::clone(&config), req, &peer).await {
-                Ok(response) => response,
-                Err(err) => {
-                    let (err, code) = match err.downcast_ref::<HttpError>() {
-                        Some(apierr) => (apierr.message.clone(), apierr.code),
-                        _ => (err.to_string(), StatusCode::BAD_REQUEST),
-                    };
-                    Response::builder()
-                        .status(code)
-                        .extension(ErrorMessageExtension(err.to_string()))
-                        .body(err.into())?
-                }
-            };
-            let logger = config.get_access_log();
-            log_response(logger, &peer, method, &path, &response, user_agent);
-            Ok(response)
-        }
-        .boxed()
-    }
-}
-
-fn parse_query_parameters<S: 'static + BuildHasher + Send>(
-    param_schema: ParameterSchema,
-    form: &str, // x-www-form-urlencoded body data
-    parts: &Parts,
-    uri_param: &HashMap<String, String, S>,
-) -> Result<Value, Error> {
-    let mut param_list: Vec<(String, String)> = vec![];
-
-    if !form.is_empty() {
-        for (k, v) in form_urlencoded::parse(form.as_bytes()).into_owned() {
-            param_list.push((k, v));
-        }
-    }
-
-    if let Some(query_str) = parts.uri.query() {
-        for (k, v) in form_urlencoded::parse(query_str.as_bytes()).into_owned() {
-            if k == "_dc" {
-                continue;
-            } // skip extjs "disable cache" parameter
-            param_list.push((k, v));
-        }
-    }
-
-    for (k, v) in uri_param {
-        param_list.push((k.clone(), v.clone()));
-    }
-
-    let params = parse_parameter_strings(&param_list, param_schema, true)?;
-
-    Ok(params)
-}
-
-async fn get_request_parameters<S: 'static + BuildHasher + Send>(
-    param_schema: ParameterSchema,
-    parts: Parts,
-    req_body: Body,
-    uri_param: HashMap<String, String, S>,
-) -> Result<Value, Error> {
-    let mut is_json = false;
-
-    if let Some(value) = parts.headers.get(header::CONTENT_TYPE) {
-        match value.to_str().map(|v| v.split(';').next()) {
-            Ok(Some("application/x-www-form-urlencoded")) => {
-                is_json = false;
-            }
-            Ok(Some("application/json")) => {
-                is_json = true;
-            }
-            _ => bail!("unsupported content type {:?}", value.to_str()),
-        }
-    }
-
-    let body = TryStreamExt::map_err(req_body, |err| {
-        http_err!(BAD_REQUEST, "Problems reading request body: {}", err)
-    })
-    .try_fold(Vec::new(), |mut acc, chunk| async move {
-        // FIXME: max request body size?
-        if acc.len() + chunk.len() < 64 * 1024 {
-            acc.extend_from_slice(&*chunk);
-            Ok(acc)
-        } else {
-            Err(http_err!(BAD_REQUEST, "Request body too large"))
-        }
-    })
-    .await?;
-
-    let utf8_data =
-        std::str::from_utf8(&body).map_err(|err| format_err!("Request body not uft8: {}", err))?;
-
-    if is_json {
-        let mut params: Value = serde_json::from_str(utf8_data)?;
-        for (k, v) in uri_param {
-            if let Some((_optional, prop_schema)) = param_schema.lookup(&k) {
-                params[&k] = parse_simple_value(&v, prop_schema)?;
-            }
-        }
-        verify_json_object(&params, &param_schema)?;
-        return Ok(params);
-    } else {
-        parse_query_parameters(param_schema, utf8_data, &parts, &uri_param)
-    }
-}
-
-struct NoLogExtension();
-
-async fn proxy_protected_request(
-    info: &'static ApiMethod,
-    mut parts: Parts,
-    req_body: Body,
-    peer: &std::net::SocketAddr,
-) -> Result<Response<Body>, Error> {
-    let mut uri_parts = parts.uri.clone().into_parts();
-
-    uri_parts.scheme = Some(http::uri::Scheme::HTTP);
-    uri_parts.authority = Some(http::uri::Authority::from_static("127.0.0.1:82"));
-    let new_uri = http::Uri::from_parts(uri_parts).unwrap();
-
-    parts.uri = new_uri;
-
-    let mut request = Request::from_parts(parts, req_body);
-    request.headers_mut().insert(
-        header::FORWARDED,
-        format!("for=\"{}\";", peer).parse().unwrap(),
-    );
-
-    let reload_timezone = info.reload_timezone;
-
-    let resp = hyper::client::Client::new()
-        .request(request)
-        .map_err(Error::from)
-        .map_ok(|mut resp| {
-            resp.extensions_mut().insert(NoLogExtension());
-            resp
-        })
-        .await?;
-
-    if reload_timezone {
-        unsafe {
-            tzset();
-        }
-    }
-
-    Ok(resp)
-}
-
-pub async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher + Send>(
-    mut rpcenv: Env,
-    info: &'static ApiMethod,
-    formatter: &'static OutputFormatter,
-    parts: Parts,
-    req_body: Body,
-    uri_param: HashMap<String, String, S>,
-) -> Result<Response<Body>, Error> {
-    let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
-    let compression = extract_compression_method(&parts.headers);
-
-    let result = match info.handler {
-        ApiHandler::AsyncHttp(handler) => {
-            let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?;
-            (handler)(parts, req_body, params, info, Box::new(rpcenv)).await
-        }
-        ApiHandler::Sync(handler) => {
-            let params =
-                get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
-            (handler)(params, info, &mut rpcenv).map(|data| (formatter.format_data)(data, &rpcenv))
-        }
-        ApiHandler::Async(handler) => {
-            let params =
-                get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
-            (handler)(params, info, &mut rpcenv)
-                .await
-                .map(|data| (formatter.format_data)(data, &rpcenv))
-        }
-    };
-
-    let mut resp = match result {
-        Ok(resp) => resp,
-        Err(err) => {
-            if let Some(httperr) = err.downcast_ref::<HttpError>() {
-                if httperr.code == StatusCode::UNAUTHORIZED {
-                    tokio::time::sleep_until(Instant::from_std(delay_unauth_time)).await;
-                }
-            }
-            (formatter.format_error)(err)
-        }
-    };
-
-    let resp = match compression {
-        Some(CompressionMethod::Deflate) => {
-            resp.headers_mut().insert(
-                header::CONTENT_ENCODING,
-                CompressionMethod::Deflate.content_encoding(),
-            );
-            resp.map(|body| {
-                Body::wrap_stream(DeflateEncoder::with_quality(
-                    TryStreamExt::map_err(body, |err| {
-                        proxmox::io_format_err!("error during compression: {}", err)
-                    }),
-                    Level::Default,
-                ))
-            })
-        }
-        None => resp,
-    };
-
-    if info.reload_timezone {
-        unsafe {
-            tzset();
-        }
-    }
-
-    Ok(resp)
-}
-
-
-fn extension_to_content_type(filename: &Path) -> (&'static str, bool) {
-    if let Some(ext) = filename.extension().and_then(|osstr| osstr.to_str()) {
-        return match ext {
-            "css" => ("text/css", false),
-            "html" => ("text/html", false),
-            "js" => ("application/javascript", false),
-            "json" => ("application/json", false),
-            "map" => ("application/json", false),
-            "png" => ("image/png", true),
-            "ico" => ("image/x-icon", true),
-            "gif" => ("image/gif", true),
-            "svg" => ("image/svg+xml", false),
-            "jar" => ("application/java-archive", true),
-            "woff" => ("application/font-woff", true),
-            "woff2" => ("application/font-woff2", true),
-            "ttf" => ("application/font-snft", true),
-            "pdf" => ("application/pdf", true),
-            "epub" => ("application/epub+zip", true),
-            "mp3" => ("audio/mpeg", true),
-            "oga" => ("audio/ogg", true),
-            "tgz" => ("application/x-compressed-tar", true),
-            _ => ("application/octet-stream", false),
-        };
-    }
-
-    ("application/octet-stream", false)
-}
-
-async fn simple_static_file_download(
-    filename: PathBuf,
-    content_type: &'static str,
-    compression: Option<CompressionMethod>,
-) -> Result<Response<Body>, Error> {
-    use tokio::io::AsyncReadExt;
-
-    let mut file = File::open(filename)
-        .await
-        .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
-
-    let mut data: Vec<u8> = Vec::new();
-
-    let mut response = match compression {
-        Some(CompressionMethod::Deflate) => {
-            let mut enc = DeflateEncoder::with_quality(data, Level::Default);
-            enc.compress_vec(&mut file, CHUNK_SIZE_LIMIT as usize)
-                .await?;
-            let mut response = Response::new(enc.into_inner().into());
-            response.headers_mut().insert(
-                header::CONTENT_ENCODING,
-                CompressionMethod::Deflate.content_encoding(),
-            );
-            response
-        }
-        None => {
-            file.read_to_end(&mut data)
-                .await
-                .map_err(|err| http_err!(BAD_REQUEST, "File read failed: {}", err))?;
-            Response::new(data.into())
-        }
-    };
-
-    response.headers_mut().insert(
-        header::CONTENT_TYPE,
-        header::HeaderValue::from_static(content_type),
-    );
-
-    Ok(response)
-}
-
-async fn chuncked_static_file_download(
-    filename: PathBuf,
-    content_type: &'static str,
-    compression: Option<CompressionMethod>,
-) -> Result<Response<Body>, Error> {
-    let mut resp = Response::builder()
-        .status(StatusCode::OK)
-        .header(header::CONTENT_TYPE, content_type);
-
-    let file = File::open(filename)
-        .await
-        .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
-
-    let body = match compression {
-        Some(CompressionMethod::Deflate) => {
-            resp = resp.header(
-                header::CONTENT_ENCODING,
-                CompressionMethod::Deflate.content_encoding(),
-            );
-            Body::wrap_stream(DeflateEncoder::with_quality(
-                AsyncReaderStream::new(file),
-                Level::Default,
-            ))
-        }
-        None => Body::wrap_stream(AsyncReaderStream::new(file)),
-    };
-
-    Ok(resp.body(body).unwrap())
-}
-
-async fn handle_static_file_download(
-    filename: PathBuf,
-    compression: Option<CompressionMethod>,
-) -> Result<Response<Body>, Error> {
-    let metadata = tokio::fs::metadata(filename.clone())
-        .map_err(|err| http_err!(BAD_REQUEST, "File access problems: {}", err))
-        .await?;
-
-    let (content_type, nocomp) = extension_to_content_type(&filename);
-    let compression = if nocomp { None } else { compression };
-
-    if metadata.len() < CHUNK_SIZE_LIMIT {
-        simple_static_file_download(filename, content_type, compression).await
-    } else {
-        chuncked_static_file_download(filename, content_type, compression).await
-    }
-}
-
-fn extract_lang_header(headers: &http::HeaderMap) -> Option<String> {
-    if let Some(Ok(cookie)) = headers.get("COOKIE").map(|v| v.to_str()) {
-        return extract_cookie(cookie, "PBSLangCookie");
-    }
-    None
-}
-
-// FIXME: support handling multiple compression methods
-fn extract_compression_method(headers: &http::HeaderMap) -> Option<CompressionMethod> {
-    if let Some(Ok(encodings)) = headers.get(header::ACCEPT_ENCODING).map(|v| v.to_str()) {
-        for encoding in encodings.split(&[',', ' '][..]) {
-            if let Ok(method) = encoding.parse() {
-                return Some(method);
-            }
-        }
-    }
-    None
-}
-
-async fn handle_request(
-    api: Arc<ApiConfig>,
-    req: Request<Body>,
-    peer: &std::net::SocketAddr,
-) -> Result<Response<Body>, Error> {
-    let (parts, body) = req.into_parts();
-    let method = parts.method.clone();
-    let (path, components) = normalize_uri_path(parts.uri.path())?;
-
-    let comp_len = components.len();
-
-    let query = parts.uri.query().unwrap_or_default();
-    if path.len() + query.len() > MAX_URI_QUERY_LENGTH {
-        return Ok(Response::builder()
-            .status(StatusCode::URI_TOO_LONG)
-            .body("".into())
-            .unwrap());
-    }
-
-    let env_type = api.env_type();
-    let mut rpcenv = RestEnvironment::new(env_type, Arc::clone(&api));
-
-    rpcenv.set_client_ip(Some(*peer));
-
-    let auth = &api.api_auth;
-
-    let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
-    let access_forbidden_time = std::time::Instant::now() + std::time::Duration::from_millis(500);
-
-    if comp_len >= 1 && components[0] == "api2" {
-        if comp_len >= 2 {
-            let format = components[1];
-
-            let formatter = match format {
-                "json" => &JSON_FORMATTER,
-                "extjs" => &EXTJS_FORMATTER,
-                _ => bail!("Unsupported output format '{}'.", format),
-            };
-
-            let mut uri_param = HashMap::new();
-            let api_method = api.find_method(&components[2..], method.clone(), &mut uri_param);
-
-            let mut auth_required = true;
-            if let Some(api_method) = api_method {
-                if let Permission::World = *api_method.access.permission {
-                    auth_required = false; // no auth for endpoints with World permission
-                }
-            }
-
-            let mut user_info: Box<dyn UserInformation + Send + Sync> = Box::new(EmptyUserInformation {});
-
-            if auth_required {
-                match auth.check_auth(&parts.headers, &method) {
-                    Ok((authid, info)) => {
-                        rpcenv.set_auth_id(Some(authid));
-                        user_info = info;
-                    }
-                    Err(auth_err) => {
-                        let err = match auth_err {
-                            AuthError::Generic(err) => err,
-                            AuthError::NoData => {
-                                format_err!("no authentication credentials provided.")
-                            }
-                        };
-                        // fixme: log Username??
-                        rpcenv.log_failed_auth(None, &err.to_string());
-
-                        // always delay unauthorized calls by 3 seconds (from start of request)
-                        let err = http_err!(UNAUTHORIZED, "authentication failed - {}", err);
-                        tokio::time::sleep_until(Instant::from_std(delay_unauth_time)).await;
-                        return Ok((formatter.format_error)(err));
-                    }
-                }
-            }
-
-            match api_method {
-                None => {
-                    let err = http_err!(NOT_FOUND, "Path '{}' not found.", path);
-                    return Ok((formatter.format_error)(err));
-                }
-                Some(api_method) => {
-                    let auth_id = rpcenv.get_auth_id();
-                    let user_info = user_info;
-
-                    if !check_api_permission(
-                        api_method.access.permission,
-                        auth_id.as_deref(),
-                        &uri_param,
-                        user_info.as_ref(),
-                    ) {
-                        let err = http_err!(FORBIDDEN, "permission check failed");
-                        tokio::time::sleep_until(Instant::from_std(access_forbidden_time)).await;
-                        return Ok((formatter.format_error)(err));
-                    }
-
-                    let result = if api_method.protected && env_type == RpcEnvironmentType::PUBLIC {
-                        proxy_protected_request(api_method, parts, body, peer).await
-                    } else {
-                        handle_api_request(rpcenv, api_method, formatter, parts, body, uri_param)
-                            .await
-                    };
-
-                    let mut response = match result {
-                        Ok(resp) => resp,
-                        Err(err) => (formatter.format_error)(err),
-                    };
-
-                    if let Some(auth_id) = auth_id {
-                        response.extensions_mut().insert(AuthStringExtension(auth_id));
-                    }
-
-                    return Ok(response);
-                }
-            }
-        }
-    } else {
-        // not Auth required for accessing files!
-
-        if method != hyper::Method::GET {
-            bail!("Unsupported HTTP method {}", method);
-        }
-
-        if comp_len == 0 {
-            let language = extract_lang_header(&parts.headers);
-            match auth.check_auth(&parts.headers, &method) {
-                Ok((auth_id, _user_info)) => {
-                    return Ok(api.get_index(Some(auth_id), language, parts));
-                }
-                Err(AuthError::Generic(_)) => {
-                    tokio::time::sleep_until(Instant::from_std(delay_unauth_time)).await;
-                }
-                Err(AuthError::NoData) => {}
-            }
-            return Ok(api.get_index(None, language, parts));
-        } else {
-            let filename = api.find_alias(&components);
-            let compression = extract_compression_method(&parts.headers);
-            return handle_static_file_download(filename, compression).await;
-        }
-    }
-
-    Err(http_err!(NOT_FOUND, "Path '{}' not found.", path))
-}