]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/reader/mod.rs
2b11d1b1b006f22cfa48e01725c2a9a41b9bb161
[proxmox-backup.git] / src / api2 / reader / mod.rs
1 //! Backup reader/restore protocol (HTTP2 upgrade)
2
3 use anyhow::{bail, format_err, Error};
4 use futures::*;
5 use hyper::header::{self, HeaderValue, UPGRADE};
6 use hyper::http::request::Parts;
7 use hyper::{Body, Response, Request, StatusCode};
8 use serde_json::Value;
9 use hex::FromHex;
10
11 use proxmox_sys::sortable;
12 use proxmox_router::{
13 http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission,
14 Router, RpcEnvironment, SubdirMap,
15 };
16 use proxmox_schema::{BooleanSchema, ObjectSchema};
17
18 use pbs_api_types::{
19 Authid, DATASTORE_SCHEMA, BACKUP_TYPE_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_ID_SCHEMA,
20 CHUNK_DIGEST_SCHEMA, PRIV_DATASTORE_READ, PRIV_DATASTORE_BACKUP,
21 BACKUP_ARCHIVE_NAME_SCHEMA,
22 };
23 use proxmox_sys::fs::lock_dir_noblock_shared;
24 use pbs_tools::json::{required_integer_param, required_string_param};
25 use pbs_datastore::{DataStore, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1};
26 use pbs_datastore::backup_info::BackupDir;
27 use pbs_datastore::index::IndexFile;
28 use pbs_datastore::manifest::{archive_type, ArchiveType};
29 use pbs_config::CachedUserInfo;
30 use proxmox_rest_server::{WorkerTask, H2Service};
31
32 use crate::api2::helpers;
33
34 mod environment;
35 use environment::*;
36
/// Reader protocol entry point — only supports the HTTP/2 upgrade handshake.
pub const ROUTER: Router = Router::new()
    .upgrade(&API_METHOD_UPGRADE_BACKUP);
39
/// API method: upgrade the connection to the backup reader protocol.
///
/// Parameters identify the snapshot to read (`store`, `backup-type`,
/// `backup-id`, `backup-time`); `debug` enables verbose task logging.
#[sortable]
pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upgrade_to_backup_reader_protocol),
    &ObjectSchema::new(
        concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."),
        &sorted!([
            ("store", false, &DATASTORE_SCHEMA),
            ("backup-type", false, &BACKUP_TYPE_SCHEMA),
            ("backup-id", false, &BACKUP_ID_SCHEMA),
            ("backup-time", false, &BACKUP_TIME_SCHEMA),
            ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
        ]),
    )
).access(
    // Note: parameter 'store' is no uri parameter, so we need to test inside function body
    Some("The user needs Datastore.Read privilege on /datastore/{store}."),
    &Permission::Anybody
);
58
/// Handler for [`API_METHOD_UPGRADE_BACKUP`]: switch an HTTP/1 connection
/// over to the HTTP/2 based backup reader protocol.
///
/// Flow: check privileges (Datastore.Read, or Datastore.Backup plus snapshot
/// ownership), validate the `Upgrade` header and HTTP version, take a shared
/// lock on the snapshot directory, spawn a "reader" worker task that serves
/// [`READER_API_ROUTER`] over the upgraded connection, and immediately reply
/// with `101 Switching Protocols`.
fn upgrade_to_backup_reader_protocol(
    parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {

    async move {
        let debug = param["debug"].as_bool().unwrap_or(false);

        let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
        let store = required_string_param(&param, "store")?.to_owned();

        let user_info = CachedUserInfo::new()?;
        let privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);

        let priv_read = privs & PRIV_DATASTORE_READ != 0;
        let priv_backup = privs & PRIV_DATASTORE_BACKUP != 0;

        // priv_backup needs owner check further down below!
        if !priv_read && !priv_backup {
            bail!("no permissions on /datastore/{}", store);
        }

        let datastore = DataStore::lookup_datastore(&store)?;

        let backup_type = required_string_param(&param, "backup-type")?;
        let backup_id = required_string_param(&param, "backup-id")?;
        let backup_time = required_integer_param(&param, "backup-time")?;

        // The client must announce the reader protocol in the Upgrade header.
        let protocols = parts
            .headers
            .get("UPGRADE")
            .ok_or_else(|| format_err!("missing Upgrade header"))?
            .to_str()?;

        if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
            bail!("invalid protocol name");
        }

        // The upgrade handshake only works on HTTP/1 connections; the
        // connection is switched to HTTP/2 *after* the 101 response below.
        if parts.version >= http::version::Version::HTTP_2 {
            bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
        }

        let env_type = rpcenv.env_type();

        let backup_dir = BackupDir::new(backup_type, backup_id, backup_time)?;
        // With only Datastore.Backup privilege, access is limited to the
        // caller's own snapshots (or those of the token's owning user).
        if !priv_read {
            let owner = datastore.get_owner(backup_dir.group())?;
            let correct_owner = owner == auth_id
                || (owner.is_token()
                    && Authid::from(owner.user().clone()) == auth_id);
            if !correct_owner {
                bail!("backup owner check failed!");
            }
        }

        // Shared (non-blocking) lock: readers may coexist, but this fails
        // fast if another operation holds an exclusive lock on the snapshot.
        let _guard = lock_dir_noblock_shared(
            &datastore.snapshot_path(&backup_dir),
            "snapshot",
            "locked by another operation")?;

        let path = datastore.base_path();

        //let files = BackupInfo::list_files(&path, &backup_dir)?;

        let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time());

        WorkerTask::spawn("reader", Some(worker_id), auth_id.to_string(), true, move |worker| async move {
            // Move the snapshot lock into the worker so it lives as long as
            // the reader session, not just this handler.
            let _guard = _guard;

            let mut env = ReaderEnvironment::new(
                env_type,
                auth_id,
                worker.clone(),
                datastore,
                backup_dir,
            );

            env.debug = debug;

            env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));

            let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);

            let mut abort_future = worker.abort_future()
                .map(|_| Err(format_err!("task aborted")));

            let env2 = env.clone();
            let req_fut = async move {
                // Wait for the client to complete the protocol switch, then
                // serve the reader API over HTTP/2 on the raw connection.
                let conn = hyper::upgrade::on(Request::from_parts(parts, req_body)).await?;
                env2.debug("protocol upgrade done");

                let mut http = hyper::server::conn::Http::new();
                http.http2_only(true);
                // increase window size: todo - find optimal size
                let window_size = 32*1024*1024; // max = (1 << 31) - 2
                http.http2_initial_stream_window_size(window_size);
                http.http2_initial_connection_window_size(window_size);
                http.http2_max_frame_size(4*1024*1024);

                http.serve_connection(conn, service)
                    .map_err(Error::from).await
            };

            // Race the connection against task abortion; either error aborts
            // the worker task.
            futures::select!{
                req = req_fut.fuse() => req?,
                abort = abort_future => abort?,
            };

            env.log("reader finished successfully");

            Ok(())
        })?;

        let response = Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
            .body(Body::empty())?;

        Ok(response)
    }.boxed()
}
183
/// Sub-endpoints available on an upgraded reader connection.
///
/// NOTE(review): entries appear to be kept sorted by name — presumably
/// required by `list_subdirs_api_method!`; confirm before reordering.
const READER_API_SUBDIRS: SubdirMap = &[
    (
        "chunk", &Router::new()
            .download(&API_METHOD_DOWNLOAD_CHUNK)
    ),
    (
        "download", &Router::new()
            .download(&API_METHOD_DOWNLOAD_FILE)
    ),
    (
        "speedtest", &Router::new()
            .download(&API_METHOD_SPEEDTEST)
    ),
];
198
/// Router served over the upgraded HTTP/2 connection (see [`H2Service`] use
/// in the upgrade handler); GET on the root lists the subdirectories.
pub const READER_API_ROUTER: Router = Router::new()
    .get(&list_subdirs_api_method!(READER_API_SUBDIRS))
    .subdirs(READER_API_SUBDIRS);
202
/// API method: download a file (e.g. an archive or index) from the snapshot.
#[sortable]
pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&download_file),
    &ObjectSchema::new(
        "Download specified file.",
        &sorted!([
            ("file-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA),
        ]),
    )
);
213
214 fn download_file(
215 _parts: Parts,
216 _req_body: Body,
217 param: Value,
218 _info: &ApiMethod,
219 rpcenv: Box<dyn RpcEnvironment>,
220 ) -> ApiResponseFuture {
221
222 async move {
223 let env: &ReaderEnvironment = rpcenv.as_ref();
224
225 let file_name = required_string_param(&param, "file-name")?.to_owned();
226
227 let mut path = env.datastore.base_path();
228 path.push(env.backup_dir.relative_path());
229 path.push(&file_name);
230
231 env.log(format!("download {:?}", path.clone()));
232
233 let index: Option<Box<dyn IndexFile + Send>> = match archive_type(&file_name)? {
234 ArchiveType::FixedIndex => {
235 let index = env.datastore.open_fixed_reader(&path)?;
236 Some(Box::new(index))
237 }
238 ArchiveType::DynamicIndex => {
239 let index = env.datastore.open_dynamic_reader(&path)?;
240 Some(Box::new(index))
241 }
242 _ => { None }
243 };
244
245 if let Some(index) = index {
246 env.log(format!("register chunks in '{}' as downloadable.", file_name));
247
248 for pos in 0..index.index_count() {
249 let info = index.chunk_info(pos).unwrap();
250 env.register_chunk(info.digest);
251 }
252 }
253
254 helpers::create_download_response(path).await
255 }.boxed()
256 }
257
/// API method: download a single chunk, addressed by its hex digest.
#[sortable]
pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&download_chunk),
    &ObjectSchema::new(
        "Download specified chunk.",
        &sorted!([
            ("digest", false, &CHUNK_DIGEST_SCHEMA),
        ]),
    )
);
268
269 fn download_chunk(
270 _parts: Parts,
271 _req_body: Body,
272 param: Value,
273 _info: &ApiMethod,
274 rpcenv: Box<dyn RpcEnvironment>,
275 ) -> ApiResponseFuture {
276
277 async move {
278 let env: &ReaderEnvironment = rpcenv.as_ref();
279
280 let digest_str = required_string_param(&param, "digest")?;
281 let digest = <[u8; 32]>::from_hex(digest_str)?;
282
283 if !env.check_chunk_access(digest) {
284 env.log(format!("attempted to download chunk {} which is not in registered chunk list", digest_str));
285 return Err(http_err!(UNAUTHORIZED, "download chunk {} not allowed", digest_str));
286 }
287
288 let (path, _) = env.datastore.chunk_path(&digest);
289 let path2 = path.clone();
290
291 env.debug(format!("download chunk {:?}", path));
292
293 let data = proxmox_async::runtime::block_in_place(|| std::fs::read(path))
294 .map_err(move |err| http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err))?;
295
296 let body = Body::from(data);
297
298 // fixme: set other headers ?
299 Ok(Response::builder()
300 .status(StatusCode::OK)
301 .header(header::CONTENT_TYPE, "application/octet-stream")
302 .body(body)
303 .unwrap())
304 }.boxed()
305 }
306
307 /* this is too slow
308 fn download_chunk_old(
309 _parts: Parts,
310 _req_body: Body,
311 param: Value,
312 _info: &ApiMethod,
313 rpcenv: Box<dyn RpcEnvironment>,
314 ) -> Result<ApiResponseFuture, Error> {
315
316 let env: &ReaderEnvironment = rpcenv.as_ref();
317 let env2 = env.clone();
318
319 let digest_str = required_string_param(&param, "digest")?;
320 let digest = <[u8; 32]>::from_hex(digest_str)?;
321
322 let (path, _) = env.datastore.chunk_path(&digest);
323
324 let path2 = path.clone();
325 let path3 = path.clone();
326
327 let response_future = tokio::fs::File::open(path)
328 .map_err(move |err| http_err!(BAD_REQUEST, "open file {:?} failed: {}", path2, err))
329 .and_then(move |file| {
330 env2.debug(format!("download chunk {:?}", path3));
331 let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
332 .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
333
334 let body = Body::wrap_stream(payload);
335
336 // fixme: set other headers ?
337 futures::future::ok(Response::builder()
338 .status(StatusCode::OK)
339 .header(header::CONTENT_TYPE, "application/octet-stream")
340 .body(body)
341 .unwrap())
342 });
343
344 Ok(Box::new(response_future))
345 }
346 */
347
/// API method: download 1 MiB of dummy data for throughput measurement.
pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&speedtest),
    &ObjectSchema::new("Test 1M block download speed.", &[])
);
352
353 fn speedtest(
354 _parts: Parts,
355 _req_body: Body,
356 _param: Value,
357 _info: &ApiMethod,
358 _rpcenv: Box<dyn RpcEnvironment>,
359 ) -> ApiResponseFuture {
360
361 let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...]
362
363 let body = Body::from(buffer);
364
365 let response = Response::builder()
366 .status(StatusCode::OK)
367 .header(header::CONTENT_TYPE, "application/octet-stream")
368 .body(body)
369 .unwrap();
370
371 future::ok(response).boxed()
372 }