2 use lazy_static
::lazy_static
;
7 use hyper
::header
::{self, HeaderValue, UPGRADE}
;
8 use hyper
::{Body, Response, StatusCode}
;
9 use hyper
::http
::request
::Parts
;
10 //use chrono::{Local, TimeZone};
12 use serde_json
::Value
;
15 use crate::api_schema
::router
::*;
16 use crate::api_schema
::*;
17 use crate::server
::{WorkerTask, H2Service}
;
19 use crate::api2
::types
::*;
// Builds the top-level Router for the reader API entry point.
// NOTE(review): fragmentary excerpt — the Router construction lines between
// the signature and the `.upgrade(...)` call are missing from this view, so
// only the upgrade registration is documented here.
24 pub fn router() -> Router
{
// Register the protocol-upgrade handler so GET on this path can switch the
// connection to the backup reader protocol.
26 .upgrade(api_method_upgrade_backup())
// Declares the async API method that upgrades an HTTP connection to the
// backup reader protocol, together with its parameter schema.
// NOTE(review): fragmentary excerpt — the ApiAsyncMethod constructor call
// wrapping these arguments is partially missing from this view.
29 pub fn api_method_upgrade_backup() -> ApiAsyncMethod
{
// The handler function invoked when this method is called.
31 upgrade_to_backup_reader_protocol
,
32 ObjectSchema
::new(concat
!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1
!(), "')."))
// Required parameters identifying the snapshot to read.
33 .required("store", StringSchema
::new("Datastore name."))
34 .required("backup-type", StringSchema
::new("Backup type.")
// Only these three backup types are accepted.
35 .format(Arc
::new(ApiStringFormat
::Enum(&["vm", "ct", "host"]))))
36 .required("backup-id", StringSchema
::new("Backup ID."))
37 .required("backup-time", IntegerSchema
::new("Backup time (Unix epoch.)")
// Minimum timestamp rejects obviously bogus values; presumably an early
// project date — TODO confirm the rationale for this constant.
38 .minimum(1_547_797_308))
// Optional flag enabling verbose logging in the spawned reader task.
39 .optional("debug", BooleanSchema
::new("Enable verbose debug logging."))
// Handler that upgrades an HTTP/1.x connection to the backup reader
// protocol: it validates the request (Upgrade header, protocol id, HTTP
// version), resolves the datastore and snapshot, then spawns a worker task
// that serves the reader API over HTTP/2 on the upgraded connection, and
// immediately replies `101 Switching Protocols`.
//
// NOTE(review): fragmentary excerpt — several original lines are missing
// (e.g. the `parts`/`req_body`/`param` parameters, the expression the
// Upgrade-header check chains onto, and various closing braces). Comments
// below describe only the visible statements.
43 fn upgrade_to_backup_reader_protocol(
47 _info
: &ApiAsyncMethod
,
48 rpcenv
: Box
<dyn RpcEnvironment
>,
49 ) -> Result
<BoxFut
, Error
> {
// "debug" is optional and defaults to false.
51 let debug
= param
["debug"].as_bool().unwrap_or(false);
53 let store
= tools
::required_string_param(¶m
, "store")?
.to_owned();
54 let datastore
= DataStore
::lookup_datastore(&store
)?
;
56 let backup_type
= tools
::required_string_param(¶m
, "backup-type")?
;
57 let backup_id
= tools
::required_string_param(¶m
, "backup-id")?
;
58 let backup_time
= tools
::required_integer_param(¶m
, "backup-time")?
;
// Reject requests that carry no Upgrade header. (The header-lookup
// expression this chains onto is missing from this excerpt.)
63 .ok_or_else(|| format_err
!("missing Upgrade header"))?
// The advertised protocol must match the reader protocol v1 id exactly.
66 if protocols
!= PROXMOX_BACKUP_READER_PROTOCOL_ID_V1
!() {
67 bail
!("invalid protocol name");
// The upgrade handshake must arrive over HTTP < 2; HTTP/2 is only spoken
// on the connection *after* the upgrade succeeds.
70 if parts
.version
>= http
::version
::Version
::HTTP_2
{
71 bail
!("unexpected http version '{:?}' (expected version < 2)", parts
.version
);
74 let username
= rpcenv
.get_user().unwrap();
75 let env_type
= rpcenv
.env_type();
77 let backup_dir
= BackupDir
::new(backup_type
, backup_id
, backup_time
);
78 let path
= datastore
.base_path();
80 //let files = BackupInfo::list_files(&path, &backup_dir)?;
// Worker id encodes store/type/id plus the hex backup time, so the task
// is identifiable in the task list.
82 let worker_id
= format
!("{}_{}_{}_{:08X}", store
, backup_type
, backup_id
, backup_dir
.backup_time().timestamp());
// Run the reader session as a tracked WorkerTask so it shows up in the
// task list and can be aborted.
84 WorkerTask
::spawn("reader", Some(worker_id
), &username
.clone(), true, move |worker
| {
85 let mut env
= ReaderEnvironment
::new(
86 env_type
, username
.clone(), worker
.clone(), datastore
, backup_dir
);
90 env
.log(format
!("starting new backup reader datastore '{}': {:?}", store
, path
));
// The H2Service dispatches requests on the upgraded connection through
// READER_ROUTER.
92 let service
= H2Service
::new(env
.clone(), worker
.clone(), &READER_ROUTER
, debug
);
94 let abort_future
= worker
.abort_future();
// Await the hyper upgrade, then serve the raw connection with our HTTP/2
// service. (Intermediate combinator lines are missing from this excerpt.)
96 let req_fut
= req_body
100 let env
= env
.clone();
102 env
.debug("protocol upgrade done");
104 let mut http
= hyper
::server
::conn
::Http
::new();
// Speak only HTTP/2 on the upgraded connection.
105 http
.http2_only(true);
106 // Increase the HTTP/2 flow-control windows; TODO: find the optimal size.
107 let window_size
= 32*1024*1024; // max = (1 << 31) - 2
108 http
.http2_initial_stream_window_size(window_size
);
109 http
.http2_initial_connection_window_size(window_size
);
111 http
.serve_connection(conn
, service
)
112 .map_err(Error
::from
)
// Turn a worker abort into an error so the select() below ends the session.
115 let abort_future
= abort_future
116 .map(|_
| Err(format_err
!("task aborted")));
118 use futures
::future
::Either
;
// Race the connection future against abortion, then flatten the Either
// back into a plain Result regardless of which side finished first.
119 futures
::future
::select(req_fut
, abort_future
)
120 .map(|res
| match res
{
121 Either
::Left((Ok(res
), _
)) => Ok(res
),
122 Either
::Left((Err(err
), _
)) => Err(err
),
123 Either
::Right((Ok(res
), _
)) => Ok(res
),
124 Either
::Right((Err(err
), _
)) => Err(err
),
126 .map_ok(move |_
| env
.log("reader finished sucessfully"))
// Reply 101 Switching Protocols right away; the spawned task takes over
// the raw connection once hyper completes the upgrade.
129 let response
= Response
::builder()
130 .status(StatusCode
::SWITCHING_PROTOCOLS
)
131 .header(UPGRADE
, HeaderValue
::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1
!()))
132 .body(Body
::empty())?
;
134 Ok(Box
::new(futures
::future
::ok(response
)))
// Lazily-initialized shared Router for the reader API, referenced by the
// H2Service created in the upgrade handler. NOTE(review): this sits inside
// a `lazy_static!` block whose surrounding braces are outside this excerpt.
138 static ref READER_ROUTER
: Router
= reader_api();
// Assembles the reader API router with its three sub-paths: "chunk",
// "download" and "speedtest", each exposing a single download method.
// NOTE(review): fragmentary excerpt — the Router construction / subdir
// registration calls wrapping these path entries are missing from this view.
141 pub fn reader_api() -> Router
{
// Download a single chunk by digest.
144 "chunk", Router
::new()
145 .download(api_method_download_chunk())
// Download a named file from the snapshot.
148 "download", Router
::new()
149 .download(api_method_download_file())
// Bandwidth self-test endpoint.
152 "speedtest", Router
::new()
153 .download(api_method_speedtest())
// Declares the async API method for downloading a named archive file from
// the opened snapshot. NOTE(review): fragmentary excerpt — the
// ApiAsyncMethod constructor and handler reference are missing from this
// view; only the schema fragment is visible.
157 pub fn api_method_download_file() -> ApiAsyncMethod
{
160 ObjectSchema
::new("Download specified file.")
// Archive name is validated against the shared schema from api2::types.
161 .required("file-name", crate::api2
::types
::BACKUP_ARCHIVE_NAME_SCHEMA
.clone())
// Async handler that streams a file belonging to the opened snapshot back
// to the client as application/octet-stream.
// NOTE(review): fragmentary excerpt — the `fn` signature line (and the
// `parts`/`req_body`/`param` parameters) plus the response `.body(...)` /
// closing lines are missing from this view; presumably this is the
// download-file handler registered by api_method_download_file — confirm
// against the full source.
169 _info
: &ApiAsyncMethod
,
170 rpcenv
: Box
<dyn RpcEnvironment
>,
171 ) -> Result
<BoxFut
, Error
> {
// The RPC environment is the ReaderEnvironment created during the upgrade.
173 let env
: &ReaderEnvironment
= rpcenv
.as_ref();
174 let env2
= env
.clone();
176 let file_name
= tools
::required_string_param(¶m
, "file-name")?
.to_owned();
// Build the on-disk path: <datastore base>/<snapshot dir>/<file-name>.
178 let mut path
= env
.datastore
.base_path();
179 path
.push(env
.backup_dir
.relative_path());
180 path
.push(&file_name
);
// Clones for the two `move` closures below (error message and log line).
182 let path2
= path
.clone();
183 let path3
= path
.clone();
185 let response_future
= tokio
::fs
::File
::open(path
)
// Open failures are reported to the client as 400 Bad Request.
186 .map_err(move |err
| http_err
!(BAD_REQUEST
, format
!("open file {:?} failed: {}", path2
, err
)))
187 .and_then(move |file
| {
188 env2
.log(format
!("download {:?}", path3
));
// Stream the file in chunks via FramedRead/BytesCodec instead of reading
// it fully into memory.
189 let payload
= tokio
::codec
::FramedRead
::new(file
, tokio
::codec
::BytesCodec
::new())
190 .map_ok(|bytes
| hyper
::Chunk
::from(bytes
.freeze()));
192 let body
= Body
::wrap_stream(payload
);
194 // fixme: set other headers ?
195 futures
::future
::ok(Response
::builder()
196 .status(StatusCode
::OK
)
197 .header(header
::CONTENT_TYPE
, "application/octet-stream")
202 Ok(Box
::new(response_future
))
// Declares the async API method for downloading a single chunk by its
// digest. NOTE(review): fragmentary excerpt — the ApiAsyncMethod
// constructor and handler reference are missing from this view.
205 pub fn api_method_download_chunk() -> ApiAsyncMethod
{
208 ObjectSchema
::new("Download specified chunk.")
// The digest parameter is validated against the shared chunk-digest schema.
209 .required("digest", CHUNK_DIGEST_SCHEMA
.clone())
// Async handler that returns the raw on-disk chunk identified by the hex
// digest parameter, read fully into memory with tokio::fs::read.
// NOTE(review): fragmentary excerpt — the `fn` signature line and the
// Response builder start / `.body(...)` / closing lines are missing from
// this view; presumably this is the handler registered by
// api_method_download_chunk — confirm against the full source.
217 _info
: &ApiAsyncMethod
,
218 rpcenv
: Box
<dyn RpcEnvironment
>,
219 ) -> Result
<BoxFut
, Error
> {
221 let env
: &ReaderEnvironment
= rpcenv
.as_ref();
// Parse the hex digest string into a binary digest.
223 let digest_str
= tools
::required_string_param(¶m
, "digest")?
;
224 let digest
= proxmox
::tools
::hex_to_digest(digest_str
)?
;
// Resolve the chunk's path inside the datastore's chunk store.
226 let (path
, _
) = env
.datastore
.chunk_path(&digest
);
// Clone for the error closure below.
227 let path2
= path
.clone();
229 env
.debug(format
!("download chunk {:?}", path
));
// Read the whole chunk file into memory (chunks are bounded in size).
231 let response_future
= tokio
::fs
::read(path
)
// Read failures are reported to the client as 400 Bad Request.
232 .map_err(move |err
| http_err
!(BAD_REQUEST
, format
!("reading file {:?} failed: {}", path2
, err
)))
233 .and_then(move |data
| {
234 let body
= Body
::from(data
);
236 // fixme: set other headers ?
239 .status(StatusCode
::OK
)
240 .header(header
::CONTENT_TYPE
, "application/octet-stream")
245 Ok(Box
::new(response_future
))
// Older variant of the chunk download handler: streams the chunk file with
// FramedRead/BytesCodec instead of reading it fully into memory.
// NOTE(review): appears to be retained for reference only (not registered
// in reader_api in this excerpt) — confirm before removing. Fragmentary
// excerpt: the remaining parameters and the response `.body(...)` /
// closing lines are missing from this view.
249 fn download_chunk_old(
253 _info: &ApiAsyncMethod,
254 rpcenv: Box<dyn RpcEnvironment>,
255 ) -> Result<BoxFut, Error> {
257 let env: &ReaderEnvironment = rpcenv.as_ref();
258 let env2 = env.clone();
// Parse the hex digest and resolve the chunk's on-disk path.
260 let digest_str = tools::required_string_param(¶m, "digest")?;
261 let digest = proxmox::tools::hex_to_digest(digest_str)?;
263 let (path, _) = env.datastore.chunk_path(&digest);
// Clones for the two `move` closures below (error message and log line).
265 let path2 = path.clone();
266 let path3 = path.clone();
268 let response_future = tokio::fs::File::open(path)
// Open failures are reported to the client as 400 Bad Request.
269 .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
270 .and_then(move |file| {
271 env2.debug(format!("download chunk {:?}", path3));
// Stream the chunk in pieces rather than buffering it whole.
272 let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
273 .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
275 let body = Body::wrap_stream(payload);
277 // fixme: set other headers ?
278 futures::future::ok(Response::builder()
279 .status(StatusCode::OK)
280 .header(header::CONTENT_TYPE, "application/octet-stream")
285 Ok(Box::new(response_future))
// Declares the async API method for the download speed self-test.
// NOTE(review): the schema text says "4M block" but the visible handler
// allocates a 1 MiB buffer — one of the two looks stale; confirm against
// the full source. Fragmentary excerpt: the ApiAsyncMethod constructor and
// handler reference are missing from this view.
289 pub fn api_method_speedtest() -> ApiAsyncMethod
{
292 ObjectSchema
::new("Test 4M block download speed.")
// Async handler for the speedtest endpoint: returns a buffer of filler
// bytes so the client can measure raw download throughput.
// NOTE(review): fragmentary excerpt — the `fn` signature line and the
// response `.body(...)` / closing lines are missing from this view;
// presumably this is the handler registered by api_method_speedtest.
300 _info
: &ApiAsyncMethod
,
301 _rpcenv
: Box
<dyn RpcEnvironment
>,
302 ) -> Result
<BoxFut
, Error
> {
// 1 MiB of 'A' bytes — content is meaningless, only size matters.
// NOTE(review): api_method_speedtest's description says "4M"; confirm
// which value is intended.
304 let buffer
= vec
![65u8; 1024*1024]; // nonsense [A,A,A...]
306 let body
= Body
::from(buffer
);
308 let response
= Response
::builder()
309 .status(StatusCode
::OK
)
310 .header(header
::CONTENT_TYPE
, "application/octet-stream")
// Respond immediately with an already-resolved future.
314 Ok(Box
::new(future
::ok(response
)))