]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/reader/mod.rs
sync job: fix worker ID parsing
[proxmox-backup.git] / src / api2 / reader / mod.rs
1 //! Backup reader/restore protocol (HTTP2 upgrade)
2
3 use anyhow::{bail, format_err, Error};
4 use futures::*;
5 use hex::FromHex;
6 use hyper::header::{self, HeaderValue, UPGRADE};
7 use hyper::http::request::Parts;
8 use hyper::{Body, Request, Response, StatusCode};
9 use serde::Deserialize;
10 use serde_json::Value;
11
12 use proxmox_router::{
13 http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission,
14 Router, RpcEnvironment, SubdirMap,
15 };
16 use proxmox_schema::{BooleanSchema, ObjectSchema};
17 use proxmox_sys::sortable;
18
19 use pbs_api_types::{
20 Authid, Operation, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA,
21 BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CHUNK_DIGEST_SCHEMA, DATASTORE_SCHEMA,
22 PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
23 };
24 use pbs_config::CachedUserInfo;
25 use pbs_datastore::index::IndexFile;
26 use pbs_datastore::manifest::{archive_type, ArchiveType};
27 use pbs_datastore::{DataStore, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1};
28 use pbs_tools::json::required_string_param;
29 use proxmox_rest_server::{H2Service, WorkerTask};
30 use proxmox_sys::fs::lock_dir_noblock_shared;
31
32 use crate::api2::backup::optional_ns_param;
33 use crate::api2::helpers;
34
35 mod environment;
36 use environment::*;
37
/// Entry-point router for the reader API: only exposes the HTTP/2 upgrade call.
pub const ROUTER: Router = Router::new().upgrade(&API_METHOD_UPGRADE_BACKUP);
39
/// API method descriptor for the reader protocol upgrade.
///
/// Accepts the snapshot coordinates (`store`, optional `ns`, `backup-type`,
/// `backup-id`, `backup-time`) plus an optional `debug` flag; the actual
/// permission check happens inside the handler (see access note below).
#[sortable]
pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upgrade_to_backup_reader_protocol),
    &ObjectSchema::new(
        concat!(
            "Upgraded to backup protocol ('",
            PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(),
            "')."
        ),
        &sorted!([
            ("store", false, &DATASTORE_SCHEMA),
            ("ns", true, &BACKUP_NAMESPACE_SCHEMA),
            ("backup-type", false, &BACKUP_TYPE_SCHEMA),
            ("backup-id", false, &BACKUP_ID_SCHEMA),
            ("backup-time", false, &BACKUP_TIME_SCHEMA),
            (
                "debug",
                true,
                &BooleanSchema::new("Enable verbose debug logging.").schema()
            ),
        ]),
    ),
)
.access(
    // Note: parameter 'store' is no uri parameter, so we need to test inside function body
    Some("The user needs Datastore.Read privilege on /datastore/{store}."),
    &Permission::Anybody,
);
68
/// Handler for the reader protocol upgrade request.
///
/// Flow: check privileges (Datastore.Read, or Datastore.Backup together with
/// a snapshot-owner check), look up datastore and snapshot, validate the
/// `Upgrade` header and HTTP version, take a shared lock on the snapshot
/// directory, then spawn a "reader" worker task that serves
/// `READER_API_ROUTER` over the upgraded HTTP/2 connection. The immediate
/// response is `101 Switching Protocols`; all further traffic happens on the
/// upgraded connection handled by the worker.
fn upgrade_to_backup_reader_protocol(
    parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
    async move {
        let debug = param["debug"].as_bool().unwrap_or(false);

        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
        let store = required_string_param(&param, "store")?.to_owned();

        let user_info = CachedUserInfo::new()?;
        let privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);

        let priv_read = privs & PRIV_DATASTORE_READ != 0;
        let priv_backup = privs & PRIV_DATASTORE_BACKUP != 0;

        // priv_backup needs owner check further down below!
        if !priv_read && !priv_backup {
            bail!("no permissions on /datastore/{}", store);
        }

        let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;

        let backup_ns = optional_ns_param(&param)?;
        let backup_dir = pbs_api_types::BackupDir::deserialize(&param)?;

        // client must announce the expected protocol via the Upgrade header
        let protocols = parts
            .headers
            .get("UPGRADE")
            .ok_or_else(|| format_err!("missing Upgrade header"))?
            .to_str()?;

        if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
            bail!("invalid protocol name");
        }

        // the upgrade mechanism only works on HTTP/1.x connections
        if parts.version >= http::version::Version::HTTP_2 {
            bail!(
                "unexpected http version '{:?}' (expected version < 2)",
                parts.version
            );
        }

        let env_type = rpcenv.env_type();

        let backup_dir = datastore.backup_dir(backup_ns, backup_dir)?;
        // Datastore.Backup alone only grants access to the caller's own snapshots
        if !priv_read {
            let owner = backup_dir.get_owner()?;
            let correct_owner = owner == auth_id
                || (owner.is_token() && Authid::from(owner.user().clone()) == auth_id);
            if !correct_owner {
                bail!("backup owner check failed!");
            }
        }

        // shared lock: allows concurrent readers, but blocks exclusive
        // operations (e.g. pruning) on this snapshot while we read
        let _guard = lock_dir_noblock_shared(
            &backup_dir.full_path(),
            "snapshot",
            "locked by another operation",
        )?;

        let path = datastore.base_path();

        //let files = BackupInfo::list_files(&path, &backup_dir)?;

        // FIXME: include namespace here?
        let worker_id = format!(
            "{}:{}/{}/{:08X}",
            store,
            backup_dir.backup_type(),
            backup_dir.backup_id(),
            backup_dir.backup_time(),
        );

        WorkerTask::spawn(
            "reader",
            Some(worker_id),
            auth_id.to_string(),
            true,
            move |worker| async move {
                // keep the snapshot lock alive for the whole worker lifetime
                let _guard = _guard;

                let mut env = ReaderEnvironment::new(
                    env_type,
                    auth_id,
                    worker.clone(),
                    datastore,
                    backup_dir,
                );

                env.debug = debug;

                env.log(format!(
                    "starting new backup reader datastore '{}': {:?}",
                    store, path
                ));

                let service =
                    H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);

                let mut abort_future = worker
                    .abort_future()
                    .map(|_| Err(format_err!("task aborted")));

                let env2 = env.clone();
                let req_fut = async move {
                    // take over the raw connection once hyper completes the upgrade
                    let conn = hyper::upgrade::on(Request::from_parts(parts, req_body)).await?;
                    env2.debug("protocol upgrade done");

                    let mut http = hyper::server::conn::Http::new();
                    http.http2_only(true);
                    // increase window size: todo - find optimal size
                    let window_size = 32 * 1024 * 1024; // max = (1 << 31) - 2
                    http.http2_initial_stream_window_size(window_size);
                    http.http2_initial_connection_window_size(window_size);
                    http.http2_max_frame_size(4 * 1024 * 1024);

                    http.serve_connection(conn, service)
                        .map_err(Error::from)
                        .await
                };

                // serve requests until the connection closes or the task is aborted
                futures::select! {
                    req = req_fut.fuse() => req?,
                    abort = abort_future => abort?,
                };

                env.log("reader finished successfully");

                Ok(())
            },
        )?;

        // tell the client to switch; further traffic uses the reader protocol
        let response = Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .header(
                UPGRADE,
                HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()),
            )
            .body(Body::empty())?;

        Ok(response)
    }
    .boxed()
}
217
/// Endpoints available on the upgraded reader connection
/// (entries listed in alphabetical order).
const READER_API_SUBDIRS: SubdirMap = &[
    ("chunk", &Router::new().download(&API_METHOD_DOWNLOAD_CHUNK)),
    (
        "download",
        &Router::new().download(&API_METHOD_DOWNLOAD_FILE),
    ),
    ("speedtest", &Router::new().download(&API_METHOD_SPEEDTEST)),
];
226
/// Router served over the upgraded HTTP/2 connection (see `H2Service` usage above).
pub const READER_API_ROUTER: Router = Router::new()
    .get(&list_subdirs_api_method!(READER_API_SUBDIRS))
    .subdirs(READER_API_SUBDIRS);
230
/// API method descriptor for downloading a whole backup file by archive name.
#[sortable]
pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&download_file),
    &ObjectSchema::new(
        "Download specified file.",
        &sorted!([("file-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA),]),
    ),
);
239
240 fn download_file(
241 _parts: Parts,
242 _req_body: Body,
243 param: Value,
244 _info: &ApiMethod,
245 rpcenv: Box<dyn RpcEnvironment>,
246 ) -> ApiResponseFuture {
247 async move {
248 let env: &ReaderEnvironment = rpcenv.as_ref();
249
250 let file_name = required_string_param(&param, "file-name")?.to_owned();
251
252 let mut path = env.datastore.base_path();
253 path.push(env.backup_dir.relative_path());
254 path.push(&file_name);
255
256 env.log(format!("download {:?}", path.clone()));
257
258 let index: Option<Box<dyn IndexFile + Send>> = match archive_type(&file_name)? {
259 ArchiveType::FixedIndex => {
260 let index = env.datastore.open_fixed_reader(&path)?;
261 Some(Box::new(index))
262 }
263 ArchiveType::DynamicIndex => {
264 let index = env.datastore.open_dynamic_reader(&path)?;
265 Some(Box::new(index))
266 }
267 _ => None,
268 };
269
270 if let Some(index) = index {
271 env.log(format!(
272 "register chunks in '{}' as downloadable.",
273 file_name
274 ));
275
276 for pos in 0..index.index_count() {
277 let info = index.chunk_info(pos).unwrap();
278 env.register_chunk(info.digest);
279 }
280 }
281
282 helpers::create_download_response(path).await
283 }
284 .boxed()
285 }
286
/// API method descriptor for downloading a single chunk by its hex digest.
#[sortable]
pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&download_chunk),
    &ObjectSchema::new(
        "Download specified chunk.",
        &sorted!([("digest", false, &CHUNK_DIGEST_SCHEMA),]),
    ),
);
295
/// Serve a single chunk from the datastore.
///
/// Only chunks previously registered via `download_file` (index download)
/// may be fetched; anything else is rejected with 401, which prevents
/// clients from reading arbitrary chunks of other backups.
fn download_chunk(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
    async move {
        let env: &ReaderEnvironment = rpcenv.as_ref();

        let digest_str = required_string_param(&param, "digest")?;
        let digest = <[u8; 32]>::from_hex(digest_str)?;

        // enforce the allow-list built during index download
        if !env.check_chunk_access(digest) {
            env.log(format!(
                "attempted to download chunk {} which is not in registered chunk list",
                digest_str
            ));
            return Err(http_err!(
                UNAUTHORIZED,
                "download chunk {} not allowed",
                digest_str
            ));
        }

        let (path, _) = env.datastore.chunk_path(&digest);
        // clone needed: `path` is moved into the closure below, but the
        // error handler still wants to report the file name
        let path2 = path.clone();

        env.debug(format!("download chunk {:?}", path));

        // blocking file read, moved off the async executor threads
        let data =
            proxmox_async::runtime::block_in_place(|| std::fs::read(path)).map_err(move |err| {
                http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err)
            })?;

        let body = Body::from(data);

        // fixme: set other headers ?
        Ok(Response::builder()
            .status(StatusCode::OK)
            .header(header::CONTENT_TYPE, "application/octet-stream")
            .body(body)
            .unwrap())
    }
    .boxed()
}
342
343 /* this is too slow
344 fn download_chunk_old(
345 _parts: Parts,
346 _req_body: Body,
347 param: Value,
348 _info: &ApiMethod,
349 rpcenv: Box<dyn RpcEnvironment>,
350 ) -> Result<ApiResponseFuture, Error> {
351
352 let env: &ReaderEnvironment = rpcenv.as_ref();
353 let env2 = env.clone();
354
355 let digest_str = required_string_param(&param, "digest")?;
356 let digest = <[u8; 32]>::from_hex(digest_str)?;
357
358 let (path, _) = env.datastore.chunk_path(&digest);
359
360 let path2 = path.clone();
361 let path3 = path.clone();
362
363 let response_future = tokio::fs::File::open(path)
364 .map_err(move |err| http_err!(BAD_REQUEST, "open file {:?} failed: {}", path2, err))
365 .and_then(move |file| {
366 env2.debug(format!("download chunk {:?}", path3));
367 let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
368 .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
369
370 let body = Body::wrap_stream(payload);
371
372 // fixme: set other headers ?
373 futures::future::ok(Response::builder()
374 .status(StatusCode::OK)
375 .header(header::CONTENT_TYPE, "application/octet-stream")
376 .body(body)
377 .unwrap())
378 });
379
380 Ok(Box::new(response_future))
381 }
382 */
383
/// API method descriptor for the download speed test endpoint.
pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&speedtest),
    &ObjectSchema::new("Test 1M block download speed.", &[]),
);
388
389 fn speedtest(
390 _parts: Parts,
391 _req_body: Body,
392 _param: Value,
393 _info: &ApiMethod,
394 _rpcenv: Box<dyn RpcEnvironment>,
395 ) -> ApiResponseFuture {
396 let buffer = vec![65u8; 1024 * 1024]; // nonsense [A,A,A...]
397
398 let body = Body::from(buffer);
399
400 let response = Response::builder()
401 .status(StatusCode::OK)
402 .header(header::CONTENT_TYPE, "application/octet-stream")
403 .body(body)
404 .unwrap();
405
406 future::ok(response).boxed()
407 }