3 use std
::collections
::HashMap
;
6 use std
::task
::{Context, Poll}
;
9 use hyper
::{Body, Request, Response, StatusCode}
;
11 use proxmox
::api
::{ApiResponseFuture, HttpError, Router, RpcEnvironment}
;
12 use proxmox
::http_err
;
14 use proxmox_rest_server
::normalize_uri_path
;
15 use proxmox_rest_server
::formatter
::*;
17 use crate::server
::WorkerTask
;
19 /// Hyper Service implementation to handle stateful H2 connections.
21 /// We use this kind of service to handle backup protocol
22 /// connections. State is stored inside the generic ``rpcenv``. Logs
23 /// goes into the ``WorkerTask`` log.
24 pub struct H2Service
<E
> {
25 router
: &'
static Router
,
27 worker
: Arc
<WorkerTask
>,
31 impl <E
: RpcEnvironment
+ Clone
> H2Service
<E
> {
33 pub fn new(rpcenv
: E
, worker
: Arc
<WorkerTask
>, router
: &'
static Router
, debug
: bool
) -> Self {
34 Self { rpcenv, worker, router, debug }
37 pub fn debug
<S
: AsRef
<str>>(&self, msg
: S
) {
38 if self.debug { self.worker.log(msg); }
41 fn handle_request(&self, req
: Request
<Body
>) -> ApiResponseFuture
{
43 let (parts
, body
) = req
.into_parts();
45 let method
= parts
.method
.clone();
47 let (path
, components
) = match normalize_uri_path(parts
.uri
.path()) {
49 Err(err
) => return future
::err(http_err
!(BAD_REQUEST
, "{}", err
)).boxed(),
52 self.debug(format
!("{} {}", method
, path
));
54 let mut uri_param
= HashMap
::new();
56 let formatter
= &JSON_FORMATTER
;
58 match self.router
.find_method(&components
, method
, &mut uri_param
) {
60 let err
= http_err
!(NOT_FOUND
, "Path '{}' not found.", path
);
61 future
::ok((formatter
.format_error
)(err
)).boxed()
64 proxmox_rest_server
::handle_api_request(
65 self.rpcenv
.clone(), api_method
, formatter
, parts
, body
, uri_param
).boxed()
70 fn log_response(worker
: Arc
<WorkerTask
>, method
: hyper
::Method
, path
: &str, resp
: &Response
<Body
>) {
72 let status
= resp
.status();
74 if !status
.is_success() {
75 let reason
= status
.canonical_reason().unwrap_or("unknown reason");
77 let mut message
= "request failed";
78 if let Some(data
) = resp
.extensions().get
::<ErrorMessageExtension
>() {
82 worker
.log(format
!("{} {}: {} {}: {}", method
.as_str(), path
, status
.as_str(), reason
, message
));
87 impl <E
: RpcEnvironment
+ Clone
> tower_service
::Service
<Request
<Body
>> for H2Service
<E
> {
88 type Response
= Response
<Body
>;
90 #[allow(clippy::type_complexity)]
91 type Future
= Pin
<Box
<dyn Future
<Output
= Result
<Self::Response
, Self::Error
>> + Send
>>;
93 fn poll_ready(&mut self, _cx
: &mut Context
) -> Poll
<Result
<(), Self::Error
>> {
97 fn call(&mut self, req
: Request
<Body
>) -> Self::Future
{
98 let path
= req
.uri().path().to_owned();
99 let method
= req
.method().clone();
100 let worker
= self.worker
.clone();
102 self.handle_request(req
)
103 .map(move |result
| match result
{
105 Self::log_response(worker
, method
, &path
, &res
);
109 if let Some(apierr
) = err
.downcast_ref
::<HttpError
>() {
110 let mut resp
= Response
::new(Body
::from(apierr
.message
.clone()));
111 resp
.extensions_mut().insert(ErrorMessageExtension(apierr
.message
.clone()));
112 *resp
.status_mut() = apierr
.code
;
113 Self::log_response(worker
, method
, &path
, &resp
);
116 let mut resp
= Response
::new(Body
::from(err
.to_string()));
117 resp
.extensions_mut().insert(ErrorMessageExtension(err
.to_string()));
118 *resp
.status_mut() = StatusCode
::BAD_REQUEST
;
119 Self::log_response(worker
, method
, &path
, &resp
);