]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/reader.rs
use proxmox 0.1.25, use new EnumEntry feature
[proxmox-backup.git] / src / api2 / reader.rs
1 //use chrono::{Local, TimeZone};
2 use anyhow::{bail, format_err, Error};
3 use futures::*;
4 use hyper::header::{self, HeaderValue, UPGRADE};
5 use hyper::http::request::Parts;
6 use hyper::{Body, Response, StatusCode};
7 use serde_json::Value;
8
9 use proxmox::{sortable, identity};
10 use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironment, Permission};
11 use proxmox::api::schema::*;
12 use proxmox::http_err;
13
14 use crate::api2::types::*;
15 use crate::backup::*;
16 use crate::server::{WorkerTask, H2Service};
17 use crate::tools;
18 use crate::config::acl::PRIV_DATASTORE_READ;
19 use crate::config::cached_user_info::CachedUserInfo;
20
21 mod environment;
22 use environment::*;
23
24 pub const ROUTER: Router = Router::new()
25 .upgrade(&API_METHOD_UPGRADE_BACKUP);
26
27 #[sortable]
28 pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
29 &ApiHandler::AsyncHttp(&upgrade_to_backup_reader_protocol),
30 &ObjectSchema::new(
31 concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."),
32 &sorted!([
33 ("store", false, &DATASTORE_SCHEMA),
34 ("backup-type", false, &BACKUP_TYPE_SCHEMA),
35 ("backup-id", false, &BACKUP_ID_SCHEMA),
36 ("backup-time", false, &BACKUP_TIME_SCHEMA),
37 ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
38 ]),
39 )
40 ).access(
41 // Note: parameter 'store' is no uri parameter, so we need to test inside function body
42 Some("The user needs Datastore.Read privilege on /datastore/{store}."),
43 &Permission::Anybody
44 );
45
46 fn upgrade_to_backup_reader_protocol(
47 parts: Parts,
48 req_body: Body,
49 param: Value,
50 _info: &ApiMethod,
51 rpcenv: Box<dyn RpcEnvironment>,
52 ) -> ApiResponseFuture {
53
54 async move {
55 let debug = param["debug"].as_bool().unwrap_or(false);
56
57 let username = rpcenv.get_user().unwrap();
58 let store = tools::required_string_param(&param, "store")?.to_owned();
59
60 let user_info = CachedUserInfo::new()?;
61 user_info.check_privs(&username, &["datastore", &store], PRIV_DATASTORE_READ, false)?;
62
63 let datastore = DataStore::lookup_datastore(&store)?;
64
65 let backup_type = tools::required_string_param(&param, "backup-type")?;
66 let backup_id = tools::required_string_param(&param, "backup-id")?;
67 let backup_time = tools::required_integer_param(&param, "backup-time")?;
68
69 let protocols = parts
70 .headers
71 .get("UPGRADE")
72 .ok_or_else(|| format_err!("missing Upgrade header"))?
73 .to_str()?;
74
75 if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
76 bail!("invalid protocol name");
77 }
78
79 if parts.version >= http::version::Version::HTTP_2 {
80 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
81 }
82
83 let env_type = rpcenv.env_type();
84
85 let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
86 let path = datastore.base_path();
87
88 //let files = BackupInfo::list_files(&path, &backup_dir)?;
89
90 let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp());
91
92 WorkerTask::spawn("reader", Some(worker_id), &username.clone(), true, move |worker| {
93 let mut env = ReaderEnvironment::new(
94 env_type, username.clone(), worker.clone(), datastore, backup_dir);
95
96 env.debug = debug;
97
98 env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));
99
100 let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
101
102 let abort_future = worker.abort_future();
103
104 let req_fut = req_body
105 .on_upgrade()
106 .map_err(Error::from)
107 .and_then({
108 let env = env.clone();
109 move |conn| {
110 env.debug("protocol upgrade done");
111
112 let mut http = hyper::server::conn::Http::new();
113 http.http2_only(true);
114 // increase window size: todo - find optiomal size
115 let window_size = 32*1024*1024; // max = (1 << 31) - 2
116 http.http2_initial_stream_window_size(window_size);
117 http.http2_initial_connection_window_size(window_size);
118
119 http.serve_connection(conn, service)
120 .map_err(Error::from)
121 }
122 });
123 let abort_future = abort_future
124 .map(|_| Err(format_err!("task aborted")));
125
126 use futures::future::Either;
127 futures::future::select(req_fut, abort_future)
128 .map(|res| match res {
129 Either::Left((Ok(res), _)) => Ok(res),
130 Either::Left((Err(err), _)) => Err(err),
131 Either::Right((Ok(res), _)) => Ok(res),
132 Either::Right((Err(err), _)) => Err(err),
133 })
134 .map_ok(move |_| env.log("reader finished sucessfully"))
135 })?;
136
137 let response = Response::builder()
138 .status(StatusCode::SWITCHING_PROTOCOLS)
139 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
140 .body(Body::empty())?;
141
142 Ok(response)
143 }.boxed()
144 }
145
146 pub const READER_API_ROUTER: Router = Router::new()
147 .subdirs(&[
148 (
149 "chunk", &Router::new()
150 .download(&API_METHOD_DOWNLOAD_CHUNK)
151 ),
152 (
153 "download", &Router::new()
154 .download(&API_METHOD_DOWNLOAD_FILE)
155 ),
156 (
157 "speedtest", &Router::new()
158 .download(&API_METHOD_SPEEDTEST)
159 ),
160 ]);
161
162 #[sortable]
163 pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
164 &ApiHandler::AsyncHttp(&download_file),
165 &ObjectSchema::new(
166 "Download specified file.",
167 &sorted!([
168 ("file-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
169 ]),
170 )
171 );
172
173 fn download_file(
174 _parts: Parts,
175 _req_body: Body,
176 param: Value,
177 _info: &ApiMethod,
178 rpcenv: Box<dyn RpcEnvironment>,
179 ) -> ApiResponseFuture {
180
181 async move {
182 let env: &ReaderEnvironment = rpcenv.as_ref();
183
184 let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
185
186 let mut path = env.datastore.base_path();
187 path.push(env.backup_dir.relative_path());
188 path.push(&file_name);
189
190 let path2 = path.clone();
191 let path3 = path.clone();
192
193 let file = tokio::fs::File::open(path)
194 .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
195 .await?;
196
197 env.log(format!("download {:?}", path3));
198
199 let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
200 .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
201
202 let body = Body::wrap_stream(payload);
203
204 // fixme: set other headers ?
205 Ok(Response::builder()
206 .status(StatusCode::OK)
207 .header(header::CONTENT_TYPE, "application/octet-stream")
208 .body(body)
209 .unwrap())
210 }.boxed()
211 }
212
213 #[sortable]
214 pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
215 &ApiHandler::AsyncHttp(&download_chunk),
216 &ObjectSchema::new(
217 "Download specified chunk.",
218 &sorted!([
219 ("digest", false, &CHUNK_DIGEST_SCHEMA),
220 ]),
221 )
222 );
223
224 fn download_chunk(
225 _parts: Parts,
226 _req_body: Body,
227 param: Value,
228 _info: &ApiMethod,
229 rpcenv: Box<dyn RpcEnvironment>,
230 ) -> ApiResponseFuture {
231
232 async move {
233 let env: &ReaderEnvironment = rpcenv.as_ref();
234
235 let digest_str = tools::required_string_param(&param, "digest")?;
236 let digest = proxmox::tools::hex_to_digest(digest_str)?;
237
238 let (path, _) = env.datastore.chunk_path(&digest);
239 let path2 = path.clone();
240
241 env.debug(format!("download chunk {:?}", path));
242
243 let data = tokio::fs::read(path)
244 .map_err(move |err| http_err!(BAD_REQUEST, format!("reading file {:?} failed: {}", path2, err)))
245 .await?;
246
247 let body = Body::from(data);
248
249 // fixme: set other headers ?
250 Ok(Response::builder()
251 .status(StatusCode::OK)
252 .header(header::CONTENT_TYPE, "application/octet-stream")
253 .body(body)
254 .unwrap())
255 }.boxed()
256 }
257
258 /* this is too slow
259 fn download_chunk_old(
260 _parts: Parts,
261 _req_body: Body,
262 param: Value,
263 _info: &ApiMethod,
264 rpcenv: Box<dyn RpcEnvironment>,
265 ) -> Result<ApiResponseFuture, Error> {
266
267 let env: &ReaderEnvironment = rpcenv.as_ref();
268 let env2 = env.clone();
269
270 let digest_str = tools::required_string_param(&param, "digest")?;
271 let digest = proxmox::tools::hex_to_digest(digest_str)?;
272
273 let (path, _) = env.datastore.chunk_path(&digest);
274
275 let path2 = path.clone();
276 let path3 = path.clone();
277
278 let response_future = tokio::fs::File::open(path)
279 .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
280 .and_then(move |file| {
281 env2.debug(format!("download chunk {:?}", path3));
282 let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
283 .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
284
285 let body = Body::wrap_stream(payload);
286
287 // fixme: set other headers ?
288 futures::future::ok(Response::builder()
289 .status(StatusCode::OK)
290 .header(header::CONTENT_TYPE, "application/octet-stream")
291 .body(body)
292 .unwrap())
293 });
294
295 Ok(Box::new(response_future))
296 }
297 */
298
299 pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
300 &ApiHandler::AsyncHttp(&speedtest),
301 &ObjectSchema::new("Test 4M block download speed.", &[])
302 );
303
304 fn speedtest(
305 _parts: Parts,
306 _req_body: Body,
307 _param: Value,
308 _info: &ApiMethod,
309 _rpcenv: Box<dyn RpcEnvironment>,
310 ) -> ApiResponseFuture {
311
312 let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...]
313
314 let body = Body::from(buffer);
315
316 let response = Response::builder()
317 .status(StatusCode::OK)
318 .header(header::CONTENT_TYPE, "application/octet-stream")
319 .body(body)
320 .unwrap();
321
322 future::ok(response).boxed()
323 }