]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/reader.rs
6aa5504c141788dcf41ada76cdaba82dc30d135d
[proxmox-backup.git] / src / api2 / reader.rs
1 use failure::*;
2 use lazy_static::lazy_static;
3
4 use std::sync::Arc;
5
6 use futures::*;
7 use hyper::header::{self, HeaderValue, UPGRADE};
8 use hyper::{Body, Response, StatusCode};
9 use hyper::http::request::Parts;
10 //use chrono::{Local, TimeZone};
11
12 use serde_json::Value;
13
14 use crate::tools;
15 use crate::api_schema::router::*;
16 use crate::api_schema::*;
17 use crate::server::{WorkerTask, H2Service};
18 use crate::backup::*;
19 use crate::api2::types::*;
20
21 mod environment;
22 use environment::*;
23
24 pub fn router() -> Router {
25 Router::new()
26 .upgrade(api_method_upgrade_backup())
27 }
28
29 pub fn api_method_upgrade_backup() -> ApiAsyncMethod {
30 ApiAsyncMethod::new(
31 upgrade_to_backup_reader_protocol,
32 ObjectSchema::new(concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."))
33 .required("store", StringSchema::new("Datastore name."))
34 .required("backup-type", StringSchema::new("Backup type.")
35 .format(Arc::new(ApiStringFormat::Enum(&["vm", "ct", "host"]))))
36 .required("backup-id", StringSchema::new("Backup ID."))
37 .required("backup-time", IntegerSchema::new("Backup time (Unix epoch.)")
38 .minimum(1_547_797_308))
39 .optional("debug", BooleanSchema::new("Enable verbose debug logging."))
40 )
41 }
42
43 fn upgrade_to_backup_reader_protocol(
44 parts: Parts,
45 req_body: Body,
46 param: Value,
47 _info: &ApiAsyncMethod,
48 rpcenv: Box<dyn RpcEnvironment>,
49 ) -> Result<BoxFut, Error> {
50
51 let debug = param["debug"].as_bool().unwrap_or(false);
52
53 let store = tools::required_string_param(&param, "store")?.to_owned();
54 let datastore = DataStore::lookup_datastore(&store)?;
55
56 let backup_type = tools::required_string_param(&param, "backup-type")?;
57 let backup_id = tools::required_string_param(&param, "backup-id")?;
58 let backup_time = tools::required_integer_param(&param, "backup-time")?;
59
60 let protocols = parts
61 .headers
62 .get("UPGRADE")
63 .ok_or_else(|| format_err!("missing Upgrade header"))?
64 .to_str()?;
65
66 if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
67 bail!("invalid protocol name");
68 }
69
70 if parts.version >= http::version::Version::HTTP_2 {
71 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
72 }
73
74 let username = rpcenv.get_user().unwrap();
75 let env_type = rpcenv.env_type();
76
77 let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
78 let path = datastore.base_path();
79
80 //let files = BackupInfo::list_files(&path, &backup_dir)?;
81
82 let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp());
83
84 WorkerTask::spawn("reader", Some(worker_id), &username.clone(), true, move |worker| {
85 let mut env = ReaderEnvironment::new(
86 env_type, username.clone(), worker.clone(), datastore, backup_dir);
87
88 env.debug = debug;
89
90 env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));
91
92 let service = H2Service::new(env.clone(), worker.clone(), &READER_ROUTER, debug);
93
94 let abort_future = worker.abort_future();
95
96 let req_fut = req_body
97 .on_upgrade()
98 .map_err(Error::from)
99 .and_then({
100 let env = env.clone();
101 move |conn| {
102 env.debug("protocol upgrade done");
103
104 let mut http = hyper::server::conn::Http::new();
105 http.http2_only(true);
106 // increase window size: todo - find optiomal size
107 let window_size = 32*1024*1024; // max = (1 << 31) - 2
108 http.http2_initial_stream_window_size(window_size);
109 http.http2_initial_connection_window_size(window_size);
110
111 http.serve_connection(conn, service)
112 .map_err(Error::from)
113 }
114 });
115 let abort_future = abort_future
116 .map(|_| Err(format_err!("task aborted")));
117
118 use futures::future::Either;
119 futures::future::select(req_fut, abort_future)
120 .map(|res| match res {
121 Either::Left((Ok(res), _)) => Ok(res),
122 Either::Left((Err(err), _)) => Err(err),
123 Either::Right((Ok(res), _)) => Ok(res),
124 Either::Right((Err(err), _)) => Err(err),
125 })
126 .map_ok(move |_| env.log("reader finished sucessfully"))
127 })?;
128
129 let response = Response::builder()
130 .status(StatusCode::SWITCHING_PROTOCOLS)
131 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
132 .body(Body::empty())?;
133
134 Ok(Box::new(futures::future::ok(response)))
135 }
136
137 lazy_static!{
138 static ref READER_ROUTER: Router = reader_api();
139 }
140
141 pub fn reader_api() -> Router {
142 Router::new()
143 .subdir(
144 "chunk", Router::new()
145 .download(api_method_download_chunk())
146 )
147 .subdir(
148 "download", Router::new()
149 .download(api_method_download_file())
150 )
151 .subdir(
152 "speedtest", Router::new()
153 .download(api_method_speedtest())
154 )
155 }
156
157 pub fn api_method_download_file() -> ApiAsyncMethod {
158 ApiAsyncMethod::new(
159 download_file,
160 ObjectSchema::new("Download specified file.")
161 .required("file-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
162 )
163 }
164
165 fn download_file(
166 _parts: Parts,
167 _req_body: Body,
168 param: Value,
169 _info: &ApiAsyncMethod,
170 rpcenv: Box<dyn RpcEnvironment>,
171 ) -> Result<BoxFut, Error> {
172
173 let env: &ReaderEnvironment = rpcenv.as_ref();
174 let env2 = env.clone();
175
176 let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
177
178 let mut path = env.datastore.base_path();
179 path.push(env.backup_dir.relative_path());
180 path.push(&file_name);
181
182 let path2 = path.clone();
183 let path3 = path.clone();
184
185 let response_future = tokio::fs::File::open(path)
186 .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
187 .and_then(move |file| {
188 env2.log(format!("download {:?}", path3));
189 let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
190 .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
191
192 let body = Body::wrap_stream(payload);
193
194 // fixme: set other headers ?
195 futures::future::ok(Response::builder()
196 .status(StatusCode::OK)
197 .header(header::CONTENT_TYPE, "application/octet-stream")
198 .body(body)
199 .unwrap())
200 });
201
202 Ok(Box::new(response_future))
203 }
204
205 pub fn api_method_download_chunk() -> ApiAsyncMethod {
206 ApiAsyncMethod::new(
207 download_chunk,
208 ObjectSchema::new("Download specified chunk.")
209 .required("digest", CHUNK_DIGEST_SCHEMA.clone())
210 )
211 }
212
213 fn download_chunk(
214 _parts: Parts,
215 _req_body: Body,
216 param: Value,
217 _info: &ApiAsyncMethod,
218 rpcenv: Box<dyn RpcEnvironment>,
219 ) -> Result<BoxFut, Error> {
220
221 let env: &ReaderEnvironment = rpcenv.as_ref();
222
223 let digest_str = tools::required_string_param(&param, "digest")?;
224 let digest = proxmox::tools::hex_to_digest(digest_str)?;
225
226 let (path, _) = env.datastore.chunk_path(&digest);
227 let path2 = path.clone();
228
229 env.debug(format!("download chunk {:?}", path));
230
231 let response_future = tokio::fs::read(path)
232 .map_err(move |err| http_err!(BAD_REQUEST, format!("reading file {:?} failed: {}", path2, err)))
233 .and_then(move |data| {
234 let body = Body::from(data);
235
236 // fixme: set other headers ?
237 futures::future::ok(
238 Response::builder()
239 .status(StatusCode::OK)
240 .header(header::CONTENT_TYPE, "application/octet-stream")
241 .body(body)
242 .unwrap())
243 });
244
245 Ok(Box::new(response_future))
246 }
247
248 /* this is too slow
249 fn download_chunk_old(
250 _parts: Parts,
251 _req_body: Body,
252 param: Value,
253 _info: &ApiAsyncMethod,
254 rpcenv: Box<dyn RpcEnvironment>,
255 ) -> Result<BoxFut, Error> {
256
257 let env: &ReaderEnvironment = rpcenv.as_ref();
258 let env2 = env.clone();
259
260 let digest_str = tools::required_string_param(&param, "digest")?;
261 let digest = proxmox::tools::hex_to_digest(digest_str)?;
262
263 let (path, _) = env.datastore.chunk_path(&digest);
264
265 let path2 = path.clone();
266 let path3 = path.clone();
267
268 let response_future = tokio::fs::File::open(path)
269 .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
270 .and_then(move |file| {
271 env2.debug(format!("download chunk {:?}", path3));
272 let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
273 .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
274
275 let body = Body::wrap_stream(payload);
276
277 // fixme: set other headers ?
278 futures::future::ok(Response::builder()
279 .status(StatusCode::OK)
280 .header(header::CONTENT_TYPE, "application/octet-stream")
281 .body(body)
282 .unwrap())
283 });
284
285 Ok(Box::new(response_future))
286 }
287 */
288
289 pub fn api_method_speedtest() -> ApiAsyncMethod {
290 ApiAsyncMethod::new(
291 speedtest,
292 ObjectSchema::new("Test 4M block download speed.")
293 )
294 }
295
296 fn speedtest(
297 _parts: Parts,
298 _req_body: Body,
299 _param: Value,
300 _info: &ApiAsyncMethod,
301 _rpcenv: Box<dyn RpcEnvironment>,
302 ) -> Result<BoxFut, Error> {
303
304 let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...]
305
306 let body = Body::from(buffer);
307
308 let response = Response::builder()
309 .status(StatusCode::OK)
310 .header(header::CONTENT_TYPE, "application/octet-stream")
311 .body(body)
312 .unwrap();
313
314 Ok(Box::new(future::ok(response)))
315 }