3 use std
::collections
::HashMap
;
6 use std
::task
::{Context, Poll}
;
9 use hyper
::{Body, Request, Response, StatusCode}
;
11 use proxmox_router
::{ApiResponseFuture, HttpError, Router, RpcEnvironment}
;
12 use proxmox_router
::http_err
;
14 use crate::{normalize_uri_path, WorkerTask}
;
15 use crate::formatter
::*;
17 /// Hyper Service implementation to handle stateful H2 connections.
19 /// We use this kind of service to handle backup protocol
20 /// connections. State is stored inside the generic ``rpcenv``. Logs
21 /// go into the ``WorkerTask`` log.
22 pub struct H2Service
<E
> {
/// Router queried with `find_method` to resolve the path of each request.
23 router
: &'
static Router
,
/// Worker task whose `log_message` receives debug output and failed-request reports.
25 worker
: Arc
<WorkerTask
>,
29 impl <E
: RpcEnvironment
+ Clone
> H2Service
<E
> {
31 pub fn new(rpcenv
: E
, worker
: Arc
<WorkerTask
>, router
: &'
static Router
, debug
: bool
) -> Self {
32 Self { rpcenv, worker, router, debug }
35 pub fn debug
<S
: AsRef
<str>>(&self, msg
: S
) {
36 if self.debug { self.worker.log_message(msg); }
/// Dispatches one request to the API method that the router resolves for it.
///
/// The request is split into parts and body, the URI path is normalized, and
/// the router is asked for a method matching the path components and the
/// HTTP method. The returned future is one of:
/// - `future::err` with a `BAD_REQUEST` error when path normalization fails;
/// - `future::ok` with the `NOT_FOUND` error passed through
///   `formatter.format_error` in the first lookup branch;
/// - the boxed future of `crate::rest::handle_api_request` in the branch
///   that has an `api_method`.
///
/// NOTE(review): the arm patterns of both `match` expressions are not
/// visible in this excerpt; the branch descriptions are read from the arm
/// bodies only and should be confirmed against the full file.
39 fn handle_request(&self, req
: Request
<Body
>) -> ApiResponseFuture
{
41 let (parts
, body
) = req
.into_parts();
// Cloned because `parts` itself is handed to the API handler further down.
43 let method
= parts
.method
.clone();
// A path that cannot be normalized ends the request early with BAD_REQUEST.
45 let (path
, components
) = match normalize_uri_path(parts
.uri
.path()) {
47 Err(err
) => return future
::err(http_err
!(BAD_REQUEST
, "{}", err
)).boxed(),
// Reaches the worker log only when the debug flag of the service is set.
50 self.debug(format
!("{} {}", method
, path
));
// Passed mutably to the router lookup and forwarded to the API handler afterwards.
52 let mut uri_param
= HashMap
::new();
// Fixed formatter choice: JSON_FORMATTER is used for every request handled here.
54 let formatter
= JSON_FORMATTER
;
56 match self.router
.find_method(&components
, method
, &mut uri_param
) {
// Branch without a resolved method: the future resolves to a formatted NOT_FOUND error.
58 let err
= http_err
!(NOT_FOUND
, "Path '{}' not found.", path
);
59 future
::ok(formatter
.format_error(err
)).boxed()
// Branch with a resolved api_method: hand the request to crate::rest::handle_api_request.
62 crate::rest
::handle_api_request(
63 self.rpcenv
.clone(), api_method
, formatter
, parts
, body
, uri_param
).boxed()
/// Reports a response with a non-success status through the worker task log.
///
/// * `worker` - worker task whose `log_message` is called
/// * `method`, `path` - request method and path supplied by the caller
/// * `resp` - response whose status and extensions are inspected
///
/// Responses with a success status produce no log output.
///
/// NOTE(review): the body of the `if let` and the arguments of the `format!`
/// call are not visible in this excerpt, so the layout of the log line and
/// the way `data` is used cannot be documented here - confirm against the
/// full file.
68 fn log_response(worker
: Arc
<WorkerTask
>, method
: hyper
::Method
, path
: &str, resp
: &Response
<Body
>) {
70 let status
= resp
.status();
// Everything below applies to non-success statuses only.
72 if !status
.is_success() {
// Status codes without a canonical reason phrase fall back to a placeholder text.
73 let reason
= status
.canonical_reason().unwrap_or("unknown reason");
// Default message; declared mutable so that a later assignment can replace it.
75 let mut message
= "request failed";
// Looks for an ErrorMessageExtension attached to the response.
76 if let Some(data
) = resp
.extensions().get
::<ErrorMessageExtension
>() {
// Emit the failure report to the worker task log.
80 worker
.log_message(format
!(
92 impl <E
: RpcEnvironment
+ Clone
> tower_service
::Service
<Request
<Body
>> for H2Service
<E
> {
93 type Response
= Response
<Body
>;
95 #[allow(clippy::type_complexity)]
96 type Future
= Pin
<Box
<dyn Future
<Output
= Result
<Self::Response
, Self::Error
>> + Send
>>;
98 fn poll_ready(&mut self, _cx
: &mut Context
) -> Poll
<Result
<(), Self::Error
>> {
102 fn call(&mut self, req
: Request
<Body
>) -> Self::Future
{
103 let path
= req
.uri().path().to_owned();
104 let method
= req
.method().clone();
105 let worker
= self.worker
.clone();
107 self.handle_request(req
)
108 .map(move |result
| match result
{
110 Self::log_response(worker
, method
, &path
, &res
);
114 if let Some(apierr
) = err
.downcast_ref
::<HttpError
>() {
115 let mut resp
= Response
::new(Body
::from(apierr
.message
.clone()));
116 resp
.extensions_mut().insert(ErrorMessageExtension(apierr
.message
.clone()));
117 *resp
.status_mut() = apierr
.code
;
118 Self::log_response(worker
, method
, &path
, &resp
);
121 let mut resp
= Response
::new(Body
::from(err
.to_string()));
122 resp
.extensions_mut().insert(ErrorMessageExtension(err
.to_string()));
123 *resp
.status_mut() = StatusCode
::BAD_REQUEST
;
124 Self::log_response(worker
, method
, &path
, &resp
);