]>
Commit | Line | Data |
---|---|---|
bf78f708 DM |
1 | //! Backup reader/restore protocol (HTTP2 upgrade) |
2 | ||
f7d4e4b5 | 3 | use anyhow::{bail, format_err, Error}; |
dd066d28 DM |
4 | use futures::*; |
5 | use hyper::header::{self, HeaderValue, UPGRADE}; | |
dd066d28 | 6 | use hyper::http::request::Parts; |
89e9134a | 7 | use hyper::{Body, Response, Request, StatusCode}; |
dd066d28 DM |
8 | use serde_json::Value; |
9 | ||
6ef1b649 WB |
10 | use proxmox::{identity, sortable}; |
11 | use proxmox_router::{ | |
12 | http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission, | |
13 | Router, RpcEnvironment, SubdirMap, | |
5bc8e80a | 14 | }; |
6ef1b649 | 15 | use proxmox_schema::{BooleanSchema, ObjectSchema}; |
5bc8e80a | 16 | |
8cc3760e DM |
17 | use pbs_api_types::{ |
18 | Authid, DATASTORE_SCHEMA, BACKUP_TYPE_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_ID_SCHEMA, | |
19 | CHUNK_DIGEST_SCHEMA, PRIV_DATASTORE_READ, PRIV_DATASTORE_BACKUP, | |
6227654a | 20 | BACKUP_ARCHIVE_NAME_SCHEMA, |
8cc3760e | 21 | }; |
770a36e5 | 22 | use pbs_tools::fs::lock_dir_noblock_shared; |
3c8c2827 | 23 | use pbs_tools::json::{required_integer_param, required_string_param}; |
6d5d305d | 24 | use pbs_datastore::{DataStore, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1}; |
b2065dc7 WB |
25 | use pbs_datastore::backup_info::BackupDir; |
26 | use pbs_datastore::index::IndexFile; | |
27 | use pbs_datastore::manifest::{archive_type, ArchiveType}; | |
ba3d7e19 | 28 | use pbs_config::CachedUserInfo; |
f7348a23 | 29 | use proxmox_rest_server::{WorkerTask, H2Service}; |
770a36e5 | 30 | |
6d5d305d | 31 | use crate::api2::helpers; |
dd066d28 DM |
32 | |
33 | mod environment; | |
34 | use environment::*; | |
35 | ||
255f378a DM |
36 | pub const ROUTER: Router = Router::new() |
37 | .upgrade(&API_METHOD_UPGRADE_BACKUP); | |
38 | ||
552c2259 | 39 | #[sortable] |
255f378a | 40 | pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new( |
329d40b5 | 41 | &ApiHandler::AsyncHttp(&upgrade_to_backup_reader_protocol), |
255f378a DM |
42 | &ObjectSchema::new( |
43 | concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."), | |
552c2259 | 44 | &sorted!([ |
66c49c21 | 45 | ("store", false, &DATASTORE_SCHEMA), |
bc0d0388 DM |
46 | ("backup-type", false, &BACKUP_TYPE_SCHEMA), |
47 | ("backup-id", false, &BACKUP_ID_SCHEMA), | |
48 | ("backup-time", false, &BACKUP_TIME_SCHEMA), | |
255f378a | 49 | ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()), |
552c2259 | 50 | ]), |
dd066d28 | 51 | ) |
365f0f72 DM |
52 | ).access( |
53 | // Note: parameter 'store' is no uri parameter, so we need to test inside function body | |
d00e1a21 | 54 | Some("The user needs Datastore.Read privilege on /datastore/{store}."), |
365f0f72 DM |
55 | &Permission::Anybody |
56 | ); | |
dd066d28 DM |
57 | |
58 | fn upgrade_to_backup_reader_protocol( | |
59 | parts: Parts, | |
60 | req_body: Body, | |
61 | param: Value, | |
255f378a | 62 | _info: &ApiMethod, |
dd066d28 | 63 | rpcenv: Box<dyn RpcEnvironment>, |
bb084b9c | 64 | ) -> ApiResponseFuture { |
dd066d28 | 65 | |
ad51d02a DM |
66 | async move { |
67 | let debug = param["debug"].as_bool().unwrap_or(false); | |
dd066d28 | 68 | |
e6dc35ac | 69 | let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; |
3c8c2827 | 70 | let store = required_string_param(¶m, "store")?.to_owned(); |
365f0f72 DM |
71 | |
72 | let user_info = CachedUserInfo::new()?; | |
e6dc35ac | 73 | let privs = user_info.lookup_privs(&auth_id, &["datastore", &store]); |
19ca962b FG |
74 | |
75 | let priv_read = privs & PRIV_DATASTORE_READ != 0; | |
76 | let priv_backup = privs & PRIV_DATASTORE_BACKUP != 0; | |
77 | ||
78 | // priv_backup needs owner check further down below! | |
79 | if !priv_read && !priv_backup { | |
80 | bail!("no permissions on /datastore/{}", store); | |
81 | } | |
365f0f72 | 82 | |
ad51d02a | 83 | let datastore = DataStore::lookup_datastore(&store)?; |
dd066d28 | 84 | |
3c8c2827 WB |
85 | let backup_type = required_string_param(¶m, "backup-type")?; |
86 | let backup_id = required_string_param(¶m, "backup-id")?; | |
87 | let backup_time = required_integer_param(¶m, "backup-time")?; | |
dd066d28 | 88 | |
ad51d02a DM |
89 | let protocols = parts |
90 | .headers | |
91 | .get("UPGRADE") | |
92 | .ok_or_else(|| format_err!("missing Upgrade header"))? | |
dd066d28 DM |
93 | .to_str()?; |
94 | ||
ad51d02a DM |
95 | if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() { |
96 | bail!("invalid protocol name"); | |
97 | } | |
dd066d28 | 98 | |
ad51d02a DM |
99 | if parts.version >= http::version::Version::HTTP_2 { |
100 | bail!("unexpected http version '{:?}' (expected version < 2)", parts.version); | |
101 | } | |
dd066d28 | 102 | |
ad51d02a | 103 | let env_type = rpcenv.env_type(); |
dd066d28 | 104 | |
e0e5b442 | 105 | let backup_dir = BackupDir::new(backup_type, backup_id, backup_time)?; |
19ca962b FG |
106 | if !priv_read { |
107 | let owner = datastore.get_owner(backup_dir.group())?; | |
bff85572 FG |
108 | let correct_owner = owner == auth_id |
109 | || (owner.is_token() | |
110 | && Authid::from(owner.user().clone()) == auth_id); | |
111 | if !correct_owner { | |
19ca962b FG |
112 | bail!("backup owner check failed!"); |
113 | } | |
114 | } | |
115 | ||
238a872d SR |
116 | let _guard = lock_dir_noblock_shared( |
117 | &datastore.snapshot_path(&backup_dir), | |
118 | "snapshot", | |
119 | "locked by another operation")?; | |
120 | ||
ad51d02a | 121 | let path = datastore.base_path(); |
dd066d28 | 122 | |
ad51d02a | 123 | //let files = BackupInfo::list_files(&path, &backup_dir)?; |
dd066d28 | 124 | |
4ebda996 | 125 | let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time()); |
dd066d28 | 126 | |
049a22a3 | 127 | WorkerTask::spawn("reader", Some(worker_id), auth_id.to_string(), true, move |worker| async move { |
bdb6e6b8 DC |
128 | let _guard = _guard; |
129 | ||
ad51d02a | 130 | let mut env = ReaderEnvironment::new( |
e7cb4dc5 | 131 | env_type, |
e6dc35ac | 132 | auth_id, |
e7cb4dc5 WB |
133 | worker.clone(), |
134 | datastore, | |
135 | backup_dir, | |
136 | ); | |
dd066d28 | 137 | |
ad51d02a | 138 | env.debug = debug; |
dd066d28 | 139 | |
ad51d02a | 140 | env.log(format!("starting new backup reader datastore '{}': {:?}", store, path)); |
dd066d28 | 141 | |
ad51d02a | 142 | let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug); |
dd066d28 | 143 | |
bdb6e6b8 DC |
144 | let mut abort_future = worker.abort_future() |
145 | .map(|_| Err(format_err!("task aborted"))); | |
146 | ||
147 | let env2 = env.clone(); | |
148 | let req_fut = async move { | |
149 | let conn = hyper::upgrade::on(Request::from_parts(parts, req_body)).await?; | |
150 | env2.debug("protocol upgrade done"); | |
151 | ||
152 | let mut http = hyper::server::conn::Http::new(); | |
153 | http.http2_only(true); | |
154 | // increase window size: todo - find optiomal size | |
155 | let window_size = 32*1024*1024; // max = (1 << 31) - 2 | |
156 | http.http2_initial_stream_window_size(window_size); | |
157 | http.http2_initial_connection_window_size(window_size); | |
158 | http.http2_max_frame_size(4*1024*1024); | |
159 | ||
160 | http.serve_connection(conn, service) | |
161 | .map_err(Error::from).await | |
162 | }; | |
163 | ||
164 | futures::select!{ | |
165 | req = req_fut.fuse() => req?, | |
166 | abort = abort_future => abort?, | |
167 | }; | |
168 | ||
169 | env.log("reader finished successfully"); | |
170 | ||
171 | Ok(()) | |
ad51d02a | 172 | })?; |
dd066d28 | 173 | |
ad51d02a DM |
174 | let response = Response::builder() |
175 | .status(StatusCode::SWITCHING_PROTOCOLS) | |
176 | .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())) | |
177 | .body(Body::empty())?; | |
dd066d28 | 178 | |
ad51d02a DM |
179 | Ok(response) |
180 | }.boxed() | |
dd066d28 DM |
181 | } |
182 | ||
5bc8e80a DM |
183 | const READER_API_SUBDIRS: SubdirMap = &[ |
184 | ( | |
185 | "chunk", &Router::new() | |
186 | .download(&API_METHOD_DOWNLOAD_CHUNK) | |
187 | ), | |
188 | ( | |
189 | "download", &Router::new() | |
190 | .download(&API_METHOD_DOWNLOAD_FILE) | |
191 | ), | |
192 | ( | |
193 | "speedtest", &Router::new() | |
194 | .download(&API_METHOD_SPEEDTEST) | |
195 | ), | |
196 | ]; | |
197 | ||
255f378a | 198 | pub const READER_API_ROUTER: Router = Router::new() |
5bc8e80a DM |
199 | .get(&list_subdirs_api_method!(READER_API_SUBDIRS)) |
200 | .subdirs(READER_API_SUBDIRS); | |
255f378a | 201 | |
552c2259 | 202 | #[sortable] |
255f378a | 203 | pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new( |
329d40b5 | 204 | &ApiHandler::AsyncHttp(&download_file), |
255f378a DM |
205 | &ObjectSchema::new( |
206 | "Download specified file.", | |
552c2259 | 207 | &sorted!([ |
6227654a | 208 | ("file-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA), |
552c2259 | 209 | ]), |
dd066d28 | 210 | ) |
255f378a | 211 | ); |
dd066d28 DM |
212 | |
213 | fn download_file( | |
214 | _parts: Parts, | |
215 | _req_body: Body, | |
216 | param: Value, | |
255f378a | 217 | _info: &ApiMethod, |
dd066d28 | 218 | rpcenv: Box<dyn RpcEnvironment>, |
bb084b9c | 219 | ) -> ApiResponseFuture { |
dd066d28 | 220 | |
ad51d02a DM |
221 | async move { |
222 | let env: &ReaderEnvironment = rpcenv.as_ref(); | |
dd066d28 | 223 | |
3c8c2827 | 224 | let file_name = required_string_param(¶m, "file-name")?.to_owned(); |
dd066d28 | 225 | |
ad51d02a DM |
226 | let mut path = env.datastore.base_path(); |
227 | path.push(env.backup_dir.relative_path()); | |
228 | path.push(&file_name); | |
dd066d28 | 229 | |
e22f4882 | 230 | env.log(format!("download {:?}", path.clone())); |
5bc8e80a | 231 | |
d479f0c8 FG |
232 | let index: Option<Box<dyn IndexFile + Send>> = match archive_type(&file_name)? { |
233 | ArchiveType::FixedIndex => { | |
234 | let index = env.datastore.open_fixed_reader(&path)?; | |
235 | Some(Box::new(index)) | |
236 | } | |
237 | ArchiveType::DynamicIndex => { | |
238 | let index = env.datastore.open_dynamic_reader(&path)?; | |
239 | Some(Box::new(index)) | |
240 | } | |
241 | _ => { None } | |
242 | }; | |
243 | ||
244 | if let Some(index) = index { | |
245 | env.log(format!("register chunks in '{}' as downloadable.", file_name)); | |
246 | ||
247 | for pos in 0..index.index_count() { | |
248 | let info = index.chunk_info(pos).unwrap(); | |
249 | env.register_chunk(info.digest); | |
250 | } | |
251 | } | |
dd066d28 | 252 | |
e22f4882 | 253 | helpers::create_download_response(path).await |
ad51d02a | 254 | }.boxed() |
dd066d28 | 255 | } |
09d7dc50 | 256 | |
552c2259 | 257 | #[sortable] |
255f378a | 258 | pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new( |
329d40b5 | 259 | &ApiHandler::AsyncHttp(&download_chunk), |
255f378a DM |
260 | &ObjectSchema::new( |
261 | "Download specified chunk.", | |
552c2259 DM |
262 | &sorted!([ |
263 | ("digest", false, &CHUNK_DIGEST_SCHEMA), | |
264 | ]), | |
09d7dc50 | 265 | ) |
255f378a | 266 | ); |
09d7dc50 DM |
267 | |
268 | fn download_chunk( | |
269 | _parts: Parts, | |
270 | _req_body: Body, | |
271 | param: Value, | |
255f378a | 272 | _info: &ApiMethod, |
09d7dc50 | 273 | rpcenv: Box<dyn RpcEnvironment>, |
bb084b9c | 274 | ) -> ApiResponseFuture { |
09d7dc50 | 275 | |
ad51d02a DM |
276 | async move { |
277 | let env: &ReaderEnvironment = rpcenv.as_ref(); | |
c0b1b14c | 278 | |
3c8c2827 | 279 | let digest_str = required_string_param(¶m, "digest")?; |
ad51d02a | 280 | let digest = proxmox::tools::hex_to_digest(digest_str)?; |
c0b1b14c | 281 | |
d479f0c8 FG |
282 | if !env.check_chunk_access(digest) { |
283 | env.log(format!("attempted to download chunk {} which is not in registered chunk list", digest_str)); | |
284 | return Err(http_err!(UNAUTHORIZED, "download chunk {} not allowed", digest_str)); | |
285 | } | |
286 | ||
ad51d02a DM |
287 | let (path, _) = env.datastore.chunk_path(&digest); |
288 | let path2 = path.clone(); | |
c0b1b14c | 289 | |
ad51d02a | 290 | env.debug(format!("download chunk {:?}", path)); |
c0b1b14c | 291 | |
9a1b24b6 | 292 | let data = proxmox_async::runtime::block_in_place(|| std::fs::read(path)) |
8aa67ee7 | 293 | .map_err(move |err| http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err))?; |
f7aa6f15 | 294 | |
ad51d02a | 295 | let body = Body::from(data); |
f7aa6f15 | 296 | |
ad51d02a DM |
297 | // fixme: set other headers ? |
298 | Ok(Response::builder() | |
299 | .status(StatusCode::OK) | |
300 | .header(header::CONTENT_TYPE, "application/octet-stream") | |
301 | .body(body) | |
302 | .unwrap()) | |
303 | }.boxed() | |
c0b1b14c DM |
304 | } |
305 | ||
306 | /* this is too slow | |
307 | fn download_chunk_old( | |
308 | _parts: Parts, | |
309 | _req_body: Body, | |
310 | param: Value, | |
255f378a | 311 | _info: &ApiMethod, |
c0b1b14c | 312 | rpcenv: Box<dyn RpcEnvironment>, |
bb084b9c | 313 | ) -> Result<ApiResponseFuture, Error> { |
c0b1b14c | 314 | |
09d7dc50 DM |
315 | let env: &ReaderEnvironment = rpcenv.as_ref(); |
316 | let env2 = env.clone(); | |
317 | ||
3c8c2827 | 318 | let digest_str = required_string_param(&param, "digest")?; |
09d7dc50 DM |
319 | let digest = proxmox::tools::hex_to_digest(digest_str)?; |
320 | ||
321 | let (path, _) = env.datastore.chunk_path(&digest); | |
322 | ||
323 | let path2 = path.clone(); | |
324 | let path3 = path.clone(); | |
325 | ||
326 | let response_future = tokio::fs::File::open(path) | |
8aa67ee7 | 327 | .map_err(move |err| http_err!(BAD_REQUEST, "open file {:?} failed: {}", path2, err)) |
09d7dc50 DM |
328 | .and_then(move |file| { |
329 | env2.debug(format!("download chunk {:?}", path3)); | |
db0cb9ce WB |
330 | let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new()) |
331 | .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze())); | |
09d7dc50 DM |
332 | |
333 | let body = Body::wrap_stream(payload); | |
334 | ||
335 | // fixme: set other headers ? | |
ffb64344 | 336 | futures::future::ok(Response::builder() |
09d7dc50 DM |
337 | .status(StatusCode::OK) |
338 | .header(header::CONTENT_TYPE, "application/octet-stream") | |
339 | .body(body) | |
340 | .unwrap()) | |
341 | }); | |
342 | ||
343 | Ok(Box::new(response_future)) | |
344 | } | |
c0b1b14c | 345 | */ |
09d7dc50 | 346 | |
255f378a | 347 | pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new( |
329d40b5 | 348 | &ApiHandler::AsyncHttp(&speedtest), |
4c7f100d | 349 | &ObjectSchema::new("Test 1M block download speed.", &[]) |
255f378a | 350 | ); |
09d7dc50 DM |
351 | |
352 | fn speedtest( | |
353 | _parts: Parts, | |
354 | _req_body: Body, | |
355 | _param: Value, | |
255f378a | 356 | _info: &ApiMethod, |
09d7dc50 | 357 | _rpcenv: Box<dyn RpcEnvironment>, |
bb084b9c | 358 | ) -> ApiResponseFuture { |
09d7dc50 | 359 | |
17243003 | 360 | let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...] |
09d7dc50 DM |
361 | |
362 | let body = Body::from(buffer); | |
363 | ||
364 | let response = Response::builder() | |
365 | .status(StatusCode::OK) | |
366 | .header(header::CONTENT_TYPE, "application/octet-stream") | |
367 | .body(body) | |
368 | .unwrap(); | |
369 | ||
ad51d02a | 370 | future::ok(response).boxed() |
09d7dc50 | 371 | } |