]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/reader/mod.rs
move required_X_param to pbs_tools::json
[proxmox-backup.git] / src / api2 / reader / mod.rs
1 //! Backup reader/restore protocol (HTTP2 upgrade)
2
3 use anyhow::{bail, format_err, Error};
4 use futures::*;
5 use hyper::header::{self, HeaderValue, UPGRADE};
6 use hyper::http::request::Parts;
7 use hyper::{Body, Response, Request, StatusCode};
8 use serde_json::Value;
9
10 use proxmox::{
11 http_err,
12 sortable,
13 identity,
14 list_subdirs_api_method,
15 api::{
16 ApiResponseFuture,
17 ApiHandler,
18 ApiMethod,
19 Router,
20 RpcEnvironment,
21 Permission,
22 router::SubdirMap,
23 schema::{
24 ObjectSchema,
25 BooleanSchema,
26 },
27 },
28 };
29
30 use pbs_tools::fs::lock_dir_noblock_shared;
31 use pbs_tools::json::{required_integer_param, required_string_param};
32 use pbs_datastore::PROXMOX_BACKUP_READER_PROTOCOL_ID_V1;
33
34 use crate::{
35 api2::{
36 helpers,
37 types::{
38 DATASTORE_SCHEMA,
39 BACKUP_TYPE_SCHEMA,
40 BACKUP_TIME_SCHEMA,
41 BACKUP_ID_SCHEMA,
42 CHUNK_DIGEST_SCHEMA,
43 Authid,
44 },
45 },
46 backup::{
47 DataStore,
48 ArchiveType,
49 BackupDir,
50 IndexFile,
51 archive_type,
52 },
53 server::{
54 WorkerTask,
55 H2Service,
56 },
57 config::{
58 acl::{
59 PRIV_DATASTORE_READ,
60 PRIV_DATASTORE_BACKUP,
61 },
62 cached_user_info::CachedUserInfo,
63 },
64 };
65
66 mod environment;
67 use environment::*;
68
69 pub const ROUTER: Router = Router::new()
70 .upgrade(&API_METHOD_UPGRADE_BACKUP);
71
72 #[sortable]
73 pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
74 &ApiHandler::AsyncHttp(&upgrade_to_backup_reader_protocol),
75 &ObjectSchema::new(
76 concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."),
77 &sorted!([
78 ("store", false, &DATASTORE_SCHEMA),
79 ("backup-type", false, &BACKUP_TYPE_SCHEMA),
80 ("backup-id", false, &BACKUP_ID_SCHEMA),
81 ("backup-time", false, &BACKUP_TIME_SCHEMA),
82 ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
83 ]),
84 )
85 ).access(
86 // Note: parameter 'store' is no uri parameter, so we need to test inside function body
87 Some("The user needs Datastore.Read privilege on /datastore/{store}."),
88 &Permission::Anybody
89 );
90
91 fn upgrade_to_backup_reader_protocol(
92 parts: Parts,
93 req_body: Body,
94 param: Value,
95 _info: &ApiMethod,
96 rpcenv: Box<dyn RpcEnvironment>,
97 ) -> ApiResponseFuture {
98
99 async move {
100 let debug = param["debug"].as_bool().unwrap_or(false);
101
102 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
103 let store = required_string_param(&param, "store")?.to_owned();
104
105 let user_info = CachedUserInfo::new()?;
106 let privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);
107
108 let priv_read = privs & PRIV_DATASTORE_READ != 0;
109 let priv_backup = privs & PRIV_DATASTORE_BACKUP != 0;
110
111 // priv_backup needs owner check further down below!
112 if !priv_read && !priv_backup {
113 bail!("no permissions on /datastore/{}", store);
114 }
115
116 let datastore = DataStore::lookup_datastore(&store)?;
117
118 let backup_type = required_string_param(&param, "backup-type")?;
119 let backup_id = required_string_param(&param, "backup-id")?;
120 let backup_time = required_integer_param(&param, "backup-time")?;
121
122 let protocols = parts
123 .headers
124 .get("UPGRADE")
125 .ok_or_else(|| format_err!("missing Upgrade header"))?
126 .to_str()?;
127
128 if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
129 bail!("invalid protocol name");
130 }
131
132 if parts.version >= http::version::Version::HTTP_2 {
133 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
134 }
135
136 let env_type = rpcenv.env_type();
137
138 let backup_dir = BackupDir::new(backup_type, backup_id, backup_time)?;
139 if !priv_read {
140 let owner = datastore.get_owner(backup_dir.group())?;
141 let correct_owner = owner == auth_id
142 || (owner.is_token()
143 && Authid::from(owner.user().clone()) == auth_id);
144 if !correct_owner {
145 bail!("backup owner check failed!");
146 }
147 }
148
149 let _guard = lock_dir_noblock_shared(
150 &datastore.snapshot_path(&backup_dir),
151 "snapshot",
152 "locked by another operation")?;
153
154 let path = datastore.base_path();
155
156 //let files = BackupInfo::list_files(&path, &backup_dir)?;
157
158 let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time());
159
160 WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| async move {
161 let _guard = _guard;
162
163 let mut env = ReaderEnvironment::new(
164 env_type,
165 auth_id,
166 worker.clone(),
167 datastore,
168 backup_dir,
169 );
170
171 env.debug = debug;
172
173 env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));
174
175 let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
176
177 let mut abort_future = worker.abort_future()
178 .map(|_| Err(format_err!("task aborted")));
179
180 let env2 = env.clone();
181 let req_fut = async move {
182 let conn = hyper::upgrade::on(Request::from_parts(parts, req_body)).await?;
183 env2.debug("protocol upgrade done");
184
185 let mut http = hyper::server::conn::Http::new();
186 http.http2_only(true);
187 // increase window size: todo - find optiomal size
188 let window_size = 32*1024*1024; // max = (1 << 31) - 2
189 http.http2_initial_stream_window_size(window_size);
190 http.http2_initial_connection_window_size(window_size);
191 http.http2_max_frame_size(4*1024*1024);
192
193 http.serve_connection(conn, service)
194 .map_err(Error::from).await
195 };
196
197 futures::select!{
198 req = req_fut.fuse() => req?,
199 abort = abort_future => abort?,
200 };
201
202 env.log("reader finished successfully");
203
204 Ok(())
205 })?;
206
207 let response = Response::builder()
208 .status(StatusCode::SWITCHING_PROTOCOLS)
209 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
210 .body(Body::empty())?;
211
212 Ok(response)
213 }.boxed()
214 }
215
216 const READER_API_SUBDIRS: SubdirMap = &[
217 (
218 "chunk", &Router::new()
219 .download(&API_METHOD_DOWNLOAD_CHUNK)
220 ),
221 (
222 "download", &Router::new()
223 .download(&API_METHOD_DOWNLOAD_FILE)
224 ),
225 (
226 "speedtest", &Router::new()
227 .download(&API_METHOD_SPEEDTEST)
228 ),
229 ];
230
231 pub const READER_API_ROUTER: Router = Router::new()
232 .get(&list_subdirs_api_method!(READER_API_SUBDIRS))
233 .subdirs(READER_API_SUBDIRS);
234
235 #[sortable]
236 pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
237 &ApiHandler::AsyncHttp(&download_file),
238 &ObjectSchema::new(
239 "Download specified file.",
240 &sorted!([
241 ("file-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
242 ]),
243 )
244 );
245
246 fn download_file(
247 _parts: Parts,
248 _req_body: Body,
249 param: Value,
250 _info: &ApiMethod,
251 rpcenv: Box<dyn RpcEnvironment>,
252 ) -> ApiResponseFuture {
253
254 async move {
255 let env: &ReaderEnvironment = rpcenv.as_ref();
256
257 let file_name = required_string_param(&param, "file-name")?.to_owned();
258
259 let mut path = env.datastore.base_path();
260 path.push(env.backup_dir.relative_path());
261 path.push(&file_name);
262
263 env.log(format!("download {:?}", path.clone()));
264
265 let index: Option<Box<dyn IndexFile + Send>> = match archive_type(&file_name)? {
266 ArchiveType::FixedIndex => {
267 let index = env.datastore.open_fixed_reader(&path)?;
268 Some(Box::new(index))
269 }
270 ArchiveType::DynamicIndex => {
271 let index = env.datastore.open_dynamic_reader(&path)?;
272 Some(Box::new(index))
273 }
274 _ => { None }
275 };
276
277 if let Some(index) = index {
278 env.log(format!("register chunks in '{}' as downloadable.", file_name));
279
280 for pos in 0..index.index_count() {
281 let info = index.chunk_info(pos).unwrap();
282 env.register_chunk(info.digest);
283 }
284 }
285
286 helpers::create_download_response(path).await
287 }.boxed()
288 }
289
290 #[sortable]
291 pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
292 &ApiHandler::AsyncHttp(&download_chunk),
293 &ObjectSchema::new(
294 "Download specified chunk.",
295 &sorted!([
296 ("digest", false, &CHUNK_DIGEST_SCHEMA),
297 ]),
298 )
299 );
300
301 fn download_chunk(
302 _parts: Parts,
303 _req_body: Body,
304 param: Value,
305 _info: &ApiMethod,
306 rpcenv: Box<dyn RpcEnvironment>,
307 ) -> ApiResponseFuture {
308
309 async move {
310 let env: &ReaderEnvironment = rpcenv.as_ref();
311
312 let digest_str = required_string_param(&param, "digest")?;
313 let digest = proxmox::tools::hex_to_digest(digest_str)?;
314
315 if !env.check_chunk_access(digest) {
316 env.log(format!("attempted to download chunk {} which is not in registered chunk list", digest_str));
317 return Err(http_err!(UNAUTHORIZED, "download chunk {} not allowed", digest_str));
318 }
319
320 let (path, _) = env.datastore.chunk_path(&digest);
321 let path2 = path.clone();
322
323 env.debug(format!("download chunk {:?}", path));
324
325 let data = pbs_runtime::block_in_place(|| std::fs::read(path))
326 .map_err(move |err| http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err))?;
327
328 let body = Body::from(data);
329
330 // fixme: set other headers ?
331 Ok(Response::builder()
332 .status(StatusCode::OK)
333 .header(header::CONTENT_TYPE, "application/octet-stream")
334 .body(body)
335 .unwrap())
336 }.boxed()
337 }
338
339 /* this is too slow
340 fn download_chunk_old(
341 _parts: Parts,
342 _req_body: Body,
343 param: Value,
344 _info: &ApiMethod,
345 rpcenv: Box<dyn RpcEnvironment>,
346 ) -> Result<ApiResponseFuture, Error> {
347
348 let env: &ReaderEnvironment = rpcenv.as_ref();
349 let env2 = env.clone();
350
351 let digest_str = required_string_param(&param, "digest")?;
352 let digest = proxmox::tools::hex_to_digest(digest_str)?;
353
354 let (path, _) = env.datastore.chunk_path(&digest);
355
356 let path2 = path.clone();
357 let path3 = path.clone();
358
359 let response_future = tokio::fs::File::open(path)
360 .map_err(move |err| http_err!(BAD_REQUEST, "open file {:?} failed: {}", path2, err))
361 .and_then(move |file| {
362 env2.debug(format!("download chunk {:?}", path3));
363 let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
364 .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
365
366 let body = Body::wrap_stream(payload);
367
368 // fixme: set other headers ?
369 futures::future::ok(Response::builder()
370 .status(StatusCode::OK)
371 .header(header::CONTENT_TYPE, "application/octet-stream")
372 .body(body)
373 .unwrap())
374 });
375
376 Ok(Box::new(response_future))
377 }
378 */
379
380 pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
381 &ApiHandler::AsyncHttp(&speedtest),
382 &ObjectSchema::new("Test 1M block download speed.", &[])
383 );
384
385 fn speedtest(
386 _parts: Parts,
387 _req_body: Body,
388 _param: Value,
389 _info: &ApiMethod,
390 _rpcenv: Box<dyn RpcEnvironment>,
391 ) -> ApiResponseFuture {
392
393 let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...]
394
395 let body = Body::from(buffer);
396
397 let response = Response::builder()
398 .status(StatusCode::OK)
399 .header(header::CONTENT_TYPE, "application/octet-stream")
400 .body(body)
401 .unwrap();
402
403 future::ok(response).boxed()
404 }