3 use std
::collections
::HashMap
;
5 use std
::task
::{Context, Poll}
;
8 use hyper
::{Body, Request, Response, StatusCode}
;
10 use proxmox
::api
::{ApiResponseFuture, HttpError, Router, RpcEnvironment}
;
11 use proxmox
::http_err
;
14 use crate::server
::formatter
::*;
15 use crate::server
::WorkerTask
;
17 /// Hyper Service implementation to handle stateful H2 connections.
19 /// We use this kind of service to handle backup protocol
20 /// connections. State is stored inside the generic ``rpcenv``. Logs
21 /// goes into the ``WorkerTask`` log.
22 pub struct H2Service
<E
> {
23 router
: &'
static Router
,
25 worker
: Arc
<WorkerTask
>,
29 impl <E
: RpcEnvironment
+ Clone
> H2Service
<E
> {
31 pub fn new(rpcenv
: E
, worker
: Arc
<WorkerTask
>, router
: &'
static Router
, debug
: bool
) -> Self {
32 Self { rpcenv, worker, router, debug }
35 pub fn debug
<S
: AsRef
<str>>(&self, msg
: S
) {
36 if self.debug { self.worker.log(msg); }
39 fn handle_request(&self, req
: Request
<Body
>) -> ApiResponseFuture
{
41 let (parts
, body
) = req
.into_parts();
43 let method
= parts
.method
.clone();
45 let (path
, components
) = match tools
::normalize_uri_path(parts
.uri
.path()) {
47 Err(err
) => return future
::err(http_err
!(BAD_REQUEST
, err
.to_string())).boxed(),
50 self.debug(format
!("{} {}", method
, path
));
52 let mut uri_param
= HashMap
::new();
54 let formatter
= &JSON_FORMATTER
;
56 match self.router
.find_method(&components
, method
, &mut uri_param
) {
58 let err
= http_err
!(NOT_FOUND
, "Path not found.".to_string());
59 future
::ok((formatter
.format_error
)(err
)).boxed()
62 crate::server
::rest
::handle_api_request(
63 self.rpcenv
.clone(), api_method
, formatter
, parts
, body
, uri_param
).boxed()
68 fn log_response(worker
: Arc
<WorkerTask
>, method
: hyper
::Method
, path
: &str, resp
: &Response
<Body
>) {
70 let status
= resp
.status();
72 if !status
.is_success() {
73 let reason
= status
.canonical_reason().unwrap_or("unknown reason");
75 let mut message
= "request failed";
76 if let Some(data
) = resp
.extensions().get
::<ErrorMessageExtension
>() {
80 worker
.log(format
!("{} {}: {} {}: {}", method
.as_str(), path
, status
.as_str(), reason
, message
));
85 impl <E
: RpcEnvironment
+ Clone
> tower_service
::Service
<Request
<Body
>> for H2Service
<E
> {
86 type Response
= Response
<Body
>;
89 std
::pin
::Pin
<Box
<dyn Future
<Output
= Result
<Response
<Body
>, Self::Error
>> + Send
>>;
91 fn poll_ready(&mut self, _cx
: &mut Context
) -> Poll
<Result
<(), Self::Error
>> {
95 fn call(&mut self, req
: Request
<Body
>) -> Self::Future
{
96 let path
= req
.uri().path().to_owned();
97 let method
= req
.method().clone();
98 let worker
= self.worker
.clone();
100 std
::pin
::Pin
::from(self.handle_request(req
))
101 .map(move |result
| match result
{
103 Self::log_response(worker
, method
, &path
, &res
);
107 if let Some(apierr
) = err
.downcast_ref
::<HttpError
>() {
108 let mut resp
= Response
::new(Body
::from(apierr
.message
.clone()));
109 resp
.extensions_mut().insert(ErrorMessageExtension(apierr
.message
.clone()));
110 *resp
.status_mut() = apierr
.code
;
111 Self::log_response(worker
, method
, &path
, &resp
);
114 let mut resp
= Response
::new(Body
::from(err
.to_string()));
115 resp
.extensions_mut().insert(ErrorMessageExtension(err
.to_string()));
116 *resp
.status_mut() = StatusCode
::BAD_REQUEST
;
117 Self::log_response(worker
, method
, &path
, &resp
);