3 use std
::collections
::HashMap
;
5 use std
::task
::{Context, Poll}
;
8 use hyper
::{Body, Request, Response, StatusCode}
;
10 use proxmox
::api
::{http_err, ApiFuture, ApiHandler, HttpError, Router, RpcEnvironment}
;
13 use crate::server
::formatter
::*;
14 use crate::server
::WorkerTask
;
16 /// Hyper Service implementation to handle stateful H2 connections.
18 /// We use this kind of service to handle backup protocol
19 /// connections. State is stored inside the generic ``rpcenv``. Logs
20 /// go into the ``WorkerTask`` log.
21 pub struct H2Service
<E
> {
// NOTE(review): `new` also initializes `rpcenv` and `debug`; their field
// declarations are not visible in this view of the file.
/// API router that `handle_request` queries via `find_method`.
22 router
: &'
static Router
,
/// Shared worker task; `debug` and `log_response` write to its log.
24 worker
: Arc
<WorkerTask
>,
// Inherent methods: construction, debug logging, request routing and
// response logging.
28 impl <E
: RpcEnvironment
+ Clone
> H2Service
<E
> {
/// Builds a service from an RPC environment, a worker task, a static
/// router and a debug flag; all four are stored unchanged.
30 pub fn new(rpcenv
: E
, worker
: Arc
<WorkerTask
>, router
: &'
static Router
, debug
: bool
) -> Self {
31 Self { rpcenv, worker, router, debug }
/// Writes `msg` to the worker task log, but only when the `debug` flag is set.
34 pub fn debug
<S
: AsRef
<str>>(&self, msg
: S
) {
35 if self.debug { self.worker.log(msg); }
/// Routes one request to its API handler and returns the handler future.
///
/// The request is split into head and body, the URI path is normalized,
/// and the router is asked for a method matching the path components.
/// A path normalization error is returned early as a BAD_REQUEST error
/// future.
38 fn handle_request(&self, req
: Request
<Body
>) -> ApiFuture
{
40 let (parts
, body
) = req
.into_parts();
// Cloned because `parts` is handed to the API handler further down.
42 let method
= parts
.method
.clone();
// NOTE(review): only the Err arm of this match is visible in this view;
// the success arm presumably yields the (path, components) pair.
44 let (path
, components
) = match tools
::normalize_uri_path(parts
.uri
.path()) {
46 Err(err
) => return Box
::new(future
::err(http_err
!(BAD_REQUEST
, err
.to_string()))),
// Request line trace; `debug` drops it unless the debug flag is set.
49 self.debug(format
!("{} {}", method
, path
));
// Passed mutably to `find_method`, then forwarded to the handler.
51 let mut uri_param
= HashMap
::new();
53 let formatter
= &JSON_FORMATTER
;
55 match self.router
.find_method(&components
, method
, &mut uri_param
) {
// NOTE(review): the arm patterns of this match are not visible in this
// view. This branch builds a NOT_FOUND error and returns it through
// `future::ok` after `format_error`, so the future resolves to a
// formatted response rather than an error. Presumably this is the
// no-match arm -- confirm.
57 let err
= http_err
!(NOT_FOUND
, "Path not found.".to_string());
58 Box
::new(future
::ok((formatter
.format_error
)(err
)))
// Dispatch on the handler kind. Both variants get a clone of the RPC
// environment plus the request head, body and collected URI parameters.
61 match api_method
.handler
{
62 ApiHandler
::Sync(_
) => {
63 crate::server
::rest
::handle_sync_api_request(
64 self.rpcenv
.clone(), api_method
, formatter
, parts
, body
, uri_param
)
66 ApiHandler
::Async(_
) => {
67 crate::server
::rest
::handle_async_api_request(
68 self.rpcenv
.clone(), api_method
, formatter
, parts
, body
, uri_param
)
/// Writes a line to the worker task log for a response whose status is not
/// a success; the log call sits after the `is_success` check.
///
/// The line holds the method, the path, the numeric status, the canonical
/// reason (or a fixed fallback text) and a message.
75 fn log_response(worker
: Arc
<WorkerTask
>, method
: hyper
::Method
, path
: &str, resp
: &Response
<Body
>) {
77 let status
= resp
.status();
79 if !status
.is_success() {
80 let reason
= status
.canonical_reason().unwrap_or("unknown reason");
// Default message text.
// NOTE(review): the body of the `if let` below is not visible in this
// view; presumably it replaces `message` with the text carried by the
// `ErrorMessageExtension` -- confirm.
82 let mut message
= "request failed";
83 if let Some(data
) = resp
.extensions().get
::<ErrorMessageExtension
>() {
87 worker
.log(format
!("{} {}: {} {}: {}", method
.as_str(), path
, status
.as_str(), reason
, message
));
// `tower_service::Service` implementation: each request goes through
// `handle_request`, and the resulting response is passed to `log_response`.
92 impl <E
: RpcEnvironment
+ Clone
> tower_service
::Service
<Request
<Body
>> for H2Service
<E
> {
93 type Response
= Response
<Body
>;
// NOTE(review): the associated-type header for the boxed future type below
// is not visible in this view (presumably `type Future =`); the declaration
// of `Self::Error` is not visible either.
96 std
::pin
::Pin
<Box
<dyn Future
<Output
= Result
<Response
<Body
>, Self::Error
>> + Send
>>;
/// Readiness hook of the `Service` trait; the context argument is unused.
// NOTE(review): the body of this function is not visible in this view.
98 fn poll_ready(&mut self, _cx
: &mut Context
) -> Poll
<Result
<(), Self::Error
>> {
/// Handles one request and logs the outcome.
///
/// Two error-handling paths are visible: an error that downcasts to
/// `HttpError` becomes a response with that error message as body and
/// its `code` as status; the other path builds a BAD_REQUEST response
/// from the error text. Both store the message in an
/// `ErrorMessageExtension` on the response.
102 fn call(&mut self, req
: Request
<Body
>) -> Self::Future
{
// Take owned copies for the `move` closure below, since `req` itself is
// passed by value to `handle_request`.
103 let path
= req
.uri().path().to_owned();
104 let method
= req
.method().clone();
105 let worker
= self.worker
.clone();
107 std
::pin
::Pin
::from(self.handle_request(req
))
// NOTE(review): the arm patterns of this match are not visible in this
// view; the first branch uses `res` and the later ones use `err`, so they
// are presumably the Ok and Err arms.
108 .map(move |result
| match result
{
110 Self::log_response(worker
, method
, &path
, &res
);
// Known API error: reuse its message as body and its code as status.
114 if let Some(apierr
) = err
.downcast_ref
::<HttpError
>() {
115 let mut resp
= Response
::new(Body
::from(apierr
.message
.clone()));
// `log_response` looks this extension up on the response.
116 resp
.extensions_mut().insert(ErrorMessageExtension(apierr
.message
.clone()));
117 *resp
.status_mut() = apierr
.code
;
118 Self::log_response(worker
, method
, &path
, &resp
);
// NOTE(review): presumably the fallback for errors that are not an
// `HttpError` (the `else` line is not visible in this view): the error
// text becomes the body and the status is BAD_REQUEST.
121 let mut resp
= Response
::new(Body
::from(err
.to_string()));
122 resp
.extensions_mut().insert(ErrorMessageExtension(err
.to_string()));
123 *resp
.status_mut() = StatusCode
::BAD_REQUEST
;
124 Self::log_response(worker
, method
, &path
, &resp
);