]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/reader.rs
src/client/http_client.rs - h2 download: implement flow control
[proxmox-backup.git] / src / api2 / reader.rs
CommitLineData
dd066d28
DM
1use failure::*;
2use lazy_static::lazy_static;
3
4use std::sync::Arc;
5
6use futures::*;
7use hyper::header::{self, HeaderValue, UPGRADE};
8use hyper::{Body, Response, StatusCode};
9use hyper::http::request::Parts;
10//use chrono::{Local, TimeZone};
11
12use serde_json::Value;
13
14use crate::tools;
15use crate::api_schema::router::*;
16use crate::api_schema::*;
17use crate::server::{WorkerTask, H2Service};
18use crate::backup::*;
19//use crate::api2::types::*;
20
21mod environment;
22use environment::*;
23
24pub fn router() -> Router {
25 Router::new()
26 .upgrade(api_method_upgrade_backup())
27}
28
29pub fn api_method_upgrade_backup() -> ApiAsyncMethod {
30 ApiAsyncMethod::new(
31 upgrade_to_backup_reader_protocol,
32 ObjectSchema::new(concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."))
33 .required("store", StringSchema::new("Datastore name."))
34 .required("backup-type", StringSchema::new("Backup type.")
35 .format(Arc::new(ApiStringFormat::Enum(&["vm", "ct", "host"]))))
36 .required("backup-id", StringSchema::new("Backup ID."))
37 .required("backup-time", IntegerSchema::new("Backup time (Unix epoch.)")
38 .minimum(1547797308))
39 .optional("debug", BooleanSchema::new("Enable verbose debug logging."))
40 )
41}
42
43fn upgrade_to_backup_reader_protocol(
44 parts: Parts,
45 req_body: Body,
46 param: Value,
47 _info: &ApiAsyncMethod,
48 rpcenv: Box<dyn RpcEnvironment>,
49) -> Result<BoxFut, Error> {
50
51 let debug = param["debug"].as_bool().unwrap_or(false);
52
53 let store = tools::required_string_param(&param, "store")?.to_owned();
54 let datastore = DataStore::lookup_datastore(&store)?;
55
56 let backup_type = tools::required_string_param(&param, "backup-type")?;
57 let backup_id = tools::required_string_param(&param, "backup-id")?;
58 let backup_time = tools::required_integer_param(&param, "backup-time")?;
59
60 let protocols = parts
61 .headers
62 .get("UPGRADE")
63 .ok_or_else(|| format_err!("missing Upgrade header"))?
64 .to_str()?;
65
66 if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
67 bail!("invalid protocol name");
68 }
69
70 if parts.version >= http::version::Version::HTTP_2 {
71 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
72 }
73
74 let username = rpcenv.get_user().unwrap();
75 let env_type = rpcenv.env_type();
76
77 let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
78 let path = datastore.base_path();
79
80 //let files = BackupInfo::list_files(&path, &backup_dir)?;
81
82 let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp());
83
84 WorkerTask::spawn("reader", Some(worker_id), &username.clone(), true, move |worker| {
85 let mut env = ReaderEnvironment::new(
86 env_type, username.clone(), worker.clone(), datastore, backup_dir);
87
88 env.debug = debug;
89
90 env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));
91
92 let service = H2Service::new(env.clone(), worker.clone(), &READER_ROUTER, debug);
93
94 let abort_future = worker.abort_future();
95
96 let env3 = env.clone();
97
98 req_body
99 .on_upgrade()
100 .map_err(Error::from)
101 .and_then(move |conn| {
102 env3.debug("protocol upgrade done");
103
104 let mut http = hyper::server::conn::Http::new();
105 http.http2_only(true);
106 // increase window size: todo - find optiomal size
107 let window_size = 32*1024*1024; // max = (1 << 31) - 2
108 http.http2_initial_stream_window_size(window_size);
109 http.http2_initial_connection_window_size(window_size);
110
111 http.serve_connection(conn, service)
112 .map_err(Error::from)
113 })
114 .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); }))
115 .map_err(|(err, _)| err)
116 .and_then(move |(_result, _)| {
117 env.log("reader finished sucessfully");
118 Ok(())
119 })
120 })?;
121
122 let response = Response::builder()
123 .status(StatusCode::SWITCHING_PROTOCOLS)
124 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
125 .body(Body::empty())?;
126
127 Ok(Box::new(futures::future::ok(response)))
128}
129
130lazy_static!{
131 static ref READER_ROUTER: Router = reader_api();
132}
133
134pub fn reader_api() -> Router {
135
136 let router = Router::new()
137 .subdir(
138 "download", Router::new()
139 .download(api_method_download_file())
140 );
141
142 router
143}
144
145pub fn api_method_download_file() -> ApiAsyncMethod {
146 ApiAsyncMethod::new(
147 download_file,
148 ObjectSchema::new("Download specified file.")
149 .required("file-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
150 )
151}
152
153fn download_file(
154 _parts: Parts,
155 _req_body: Body,
156 param: Value,
157 _info: &ApiAsyncMethod,
158 rpcenv: Box<dyn RpcEnvironment>,
159) -> Result<BoxFut, Error> {
160
161 let env: &ReaderEnvironment = rpcenv.as_ref();
162 let env2 = env.clone();
163
164 let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
165
166 let mut path = env.datastore.base_path();
167 path.push(env.backup_dir.relative_path());
168 path.push(&file_name);
169
170 let path2 = path.clone();
171 let path3 = path.clone();
172
173 let response_future = tokio::fs::File::open(path)
174 .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
175 .and_then(move |file| {
176 env2.log(format!("download {:?}", path3));
177 let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()).
178 map(|bytes| {
179 //sigh - howto avoid copy here? or the whole map() ??
180 hyper::Chunk::from(bytes.to_vec())
181 });
182 let body = Body::wrap_stream(payload);
183
184 // fixme: set other headers ?
185 Ok(Response::builder()
186 .status(StatusCode::OK)
187 .header(header::CONTENT_TYPE, "application/octet-stream")
188 .body(body)
189 .unwrap())
190 });
191
192 Ok(Box::new(response_future))
193}