git.proxmox.com Git - proxmox-backup.git/blob - src/api2/reader.rs
update a chunk of stuff to the hyper release
//use chrono::{Local, TimeZone};
use failure::*;
use futures::*;
use hyper::header::{self, HeaderValue, UPGRADE};
use hyper::http::request::Parts;
use hyper::{Body, Response, StatusCode};
use serde_json::Value;

use proxmox::{sortable, identity};
use proxmox::api::http_err;
use proxmox::api::{ApiFuture, ApiHandler, ApiMethod, Router, RpcEnvironment};
use proxmox::api::schema::*;

use crate::api2::types::*;
use crate::backup::*;
use crate::server::{WorkerTask, H2Service};
use crate::tools;

mod environment;
use environment::*;

pub const ROUTER: Router = Router::new()
    .upgrade(&API_METHOD_UPGRADE_BACKUP);

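/// API method: upgrade the HTTP/1.1 connection to the backup reader protocol
/// (`PROXMOX_BACKUP_READER_PROTOCOL_ID_V1`). Expects 'store', 'backup-type',
/// 'backup-id' and 'backup-time' parameters; 'debug' is optional.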
#[sortable]
pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upgrade_to_backup_reader_protocol),
    &ObjectSchema::new(
        concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."),
        &sorted!([
            ("store", false, &DATASTORE_SCHEMA),
            ("backup-type", false, &StringSchema::new("Backup type.")
             .format(&ApiStringFormat::Enum(&["vm", "ct", "host"]))
             .schema()
            ),
            ("backup-id", false, &StringSchema::new("Backup ID.").schema()),
            ("backup-time", false, &IntegerSchema::new("Backup time (Unix epoch).")
             .minimum(1_547_797_308)
             .schema()
            ),
            ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
        ]),
    )
);

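/// Handler for `API_METHOD_UPGRADE_BACKUP`.
///
/// Validates the 'Upgrade' header and HTTP version, looks up the datastore and
/// backup snapshot, then spawns a "reader" worker task that serves
/// `READER_API_ROUTER` over HTTP/2 on the upgraded connection. The initial
/// request is answered with '101 Switching Protocols'.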
fn upgrade_to_backup_reader_protocol(
    parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiFuture {

    async move {
        let debug = param["debug"].as_bool().unwrap_or(false);

        let store = tools::required_string_param(&param, "store")?.to_owned();
        let datastore = DataStore::lookup_datastore(&store)?;

        let backup_type = tools::required_string_param(&param, "backup-type")?;
        let backup_id = tools::required_string_param(&param, "backup-id")?;
        let backup_time = tools::required_integer_param(&param, "backup-time")?;

        let protocols = parts
            .headers
            .get("UPGRADE")
            .ok_or_else(|| format_err!("missing Upgrade header"))?
            .to_str()?;

        if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
            bail!("invalid protocol name");
        }

        if parts.version >= http::version::Version::HTTP_2 {
            bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
        }

        let username = rpcenv.get_user().unwrap();
        let env_type = rpcenv.env_type();

        let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
        let path = datastore.base_path();

        //let files = BackupInfo::list_files(&path, &backup_dir)?;

        let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp());

        WorkerTask::spawn("reader", Some(worker_id), &username.clone(), true, move |worker| {
            let mut env = ReaderEnvironment::new(
                env_type, username.clone(), worker.clone(), datastore, backup_dir);

            env.debug = debug;

            env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));

            let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);

            let abort_future = worker.abort_future();

            let req_fut = req_body
                .on_upgrade()
                .map_err(Error::from)
                .and_then({
                    let env = env.clone();
                    move |conn| {
                        env.debug("protocol upgrade done");

                        let mut http = hyper::server::conn::Http::new();
                        http.http2_only(true);
                        // increase window size: todo - find optimal size
                        let window_size = 32*1024*1024; // max = (1 << 31) - 2
                        http.http2_initial_stream_window_size(window_size);
                        http.http2_initial_connection_window_size(window_size);

                        http.serve_connection(conn, service)
                            .map_err(Error::from)
                    }
                });
            let abort_future = abort_future
                .map(|_| Err(format_err!("task aborted")));

            use futures::future::Either;
            futures::future::select(req_fut, abort_future)
                .map(|res| match res {
                    Either::Left((Ok(res), _)) => Ok(res),
                    Either::Left((Err(err), _)) => Err(err),
                    Either::Right((Ok(res), _)) => Ok(res),
                    Either::Right((Err(err), _)) => Err(err),
                })
                .map_ok(move |_| env.log("reader finished successfully"))
        })?;

        let response = Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
            .body(Body::empty())?;

        Ok(response)
    }.boxed()
}

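/// Router served over the upgraded HTTP/2 connection: 'chunk' and 'download'
/// stream raw data from the datastore, 'speedtest' returns a test buffer.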
pub const READER_API_ROUTER: Router = Router::new()
    .subdirs(&[
        (
            "chunk", &Router::new()
                .download(&API_METHOD_DOWNLOAD_CHUNK)
        ),
        (
            "download", &Router::new()
                .download(&API_METHOD_DOWNLOAD_FILE)
        ),
        (
            "speedtest", &Router::new()
                .download(&API_METHOD_SPEEDTEST)
        ),
    ]);

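/// API method: download an archive file from the selected backup snapshot,
/// identified by 'file-name'.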
#[sortable]
pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&download_file),
    &ObjectSchema::new(
        "Download specified file.",
        &sorted!([
            ("file-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
        ]),
    )
);

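/// Handler for `API_METHOD_DOWNLOAD_FILE`.
///
/// Resolves 'file-name' relative to the snapshot directory and streams the
/// file as 'application/octet-stream' via a framed body.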
fn download_file(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiFuture {

    async move {
        let env: &ReaderEnvironment = rpcenv.as_ref();

        let file_name = tools::required_string_param(&param, "file-name")?.to_owned();

        let mut path = env.datastore.base_path();
        path.push(env.backup_dir.relative_path());
        path.push(&file_name);

        let path2 = path.clone();
        let path3 = path.clone();

        let file = tokio::fs::File::open(path)
            .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
            .await?;

        env.log(format!("download {:?}", path3));

        let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
            .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));

        let body = Body::wrap_stream(payload);

        // fixme: set other headers ?
        Ok(Response::builder()
            .status(StatusCode::OK)
            .header(header::CONTENT_TYPE, "application/octet-stream")
            .body(body)
            .unwrap())
    }.boxed()
}

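/// API method: download a single chunk, addressed by its hex 'digest'.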
#[sortable]
pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&download_chunk),
    &ObjectSchema::new(
        "Download specified chunk.",
        &sorted!([
            ("digest", false, &CHUNK_DIGEST_SCHEMA),
        ]),
    )
);

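/// Handler for `API_METHOD_DOWNLOAD_CHUNK`.
///
/// Maps the digest to its path in the chunk store, reads the whole chunk into
/// memory and returns it as 'application/octet-stream'.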
fn download_chunk(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiFuture {

    async move {
        let env: &ReaderEnvironment = rpcenv.as_ref();

        let digest_str = tools::required_string_param(&param, "digest")?;
        let digest = proxmox::tools::hex_to_digest(digest_str)?;

        let (path, _) = env.datastore.chunk_path(&digest);
        let path2 = path.clone();

        env.debug(format!("download chunk {:?}", path));

        let data = tokio::fs::read(path)
            .map_err(move |err| http_err!(BAD_REQUEST, format!("reading file {:?} failed: {}", path2, err)))
            .await?;

        let body = Body::from(data);

        // fixme: set other headers ?
        Ok(Response::builder()
            .status(StatusCode::OK)
            .header(header::CONTENT_TYPE, "application/octet-stream")
            .body(body)
            .unwrap())
    }.boxed()
}

/* this is too slow
fn download_chunk_old(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> Result<ApiFuture, Error> {

    let env: &ReaderEnvironment = rpcenv.as_ref();
    let env2 = env.clone();

    let digest_str = tools::required_string_param(&param, "digest")?;
    let digest = proxmox::tools::hex_to_digest(digest_str)?;

    let (path, _) = env.datastore.chunk_path(&digest);

    let path2 = path.clone();
    let path3 = path.clone();

    let response_future = tokio::fs::File::open(path)
        .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
        .and_then(move |file| {
            env2.debug(format!("download chunk {:?}", path3));
            let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
                .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));

            let body = Body::wrap_stream(payload);

            // fixme: set other headers ?
            futures::future::ok(Response::builder()
                .status(StatusCode::OK)
                .header(header::CONTENT_TYPE, "application/octet-stream")
                .body(body)
                .unwrap())
        });

    Ok(Box::new(response_future))
}
*/

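/// API method: download a test buffer to measure raw transfer speed.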
pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&speedtest),
    &ObjectSchema::new("Test 4M block download speed.", &[])
);

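/// Handler for `API_METHOD_SPEEDTEST`.
///
/// Returns a static 1 MiB buffer of 'A' bytes so clients can benchmark
/// download throughput without touching the datastore.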
fn speedtest(
    _parts: Parts,
    _req_body: Body,
    _param: Value,
    _info: &ApiMethod,
    _rpcenv: Box<dyn RpcEnvironment>,
) -> ApiFuture {

    let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...]

    let body = Body::from(buffer);

    let response = Response::builder()
        .status(StatusCode::OK)
        .header(header::CONTENT_TYPE, "application/octet-stream")
        .body(body)
        .unwrap();

    future::ok(response).boxed()
}