]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/reader/mod.rs
use new proxmox-async crate
[proxmox-backup.git] / src / api2 / reader / mod.rs
CommitLineData
bf78f708
DM
1//! Backup reader/restore protocol (HTTP2 upgrade)
2
f7d4e4b5 3use anyhow::{bail, format_err, Error};
dd066d28
DM
4use futures::*;
5use hyper::header::{self, HeaderValue, UPGRADE};
dd066d28 6use hyper::http::request::Parts;
89e9134a 7use hyper::{Body, Response, Request, StatusCode};
dd066d28
DM
8use serde_json::Value;
9
6ef1b649
WB
10use proxmox::{identity, sortable};
11use proxmox_router::{
12 http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission,
13 Router, RpcEnvironment, SubdirMap,
5bc8e80a 14};
6ef1b649 15use proxmox_schema::{BooleanSchema, ObjectSchema};
5bc8e80a 16
8cc3760e
DM
17use pbs_api_types::{
18 Authid, DATASTORE_SCHEMA, BACKUP_TYPE_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_ID_SCHEMA,
19 CHUNK_DIGEST_SCHEMA, PRIV_DATASTORE_READ, PRIV_DATASTORE_BACKUP,
6227654a 20 BACKUP_ARCHIVE_NAME_SCHEMA,
8cc3760e 21};
770a36e5 22use pbs_tools::fs::lock_dir_noblock_shared;
3c8c2827 23use pbs_tools::json::{required_integer_param, required_string_param};
6d5d305d 24use pbs_datastore::{DataStore, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1};
b2065dc7
WB
25use pbs_datastore::backup_info::BackupDir;
26use pbs_datastore::index::IndexFile;
27use pbs_datastore::manifest::{archive_type, ArchiveType};
ba3d7e19 28use pbs_config::CachedUserInfo;
f7348a23 29use proxmox_rest_server::{WorkerTask, H2Service};
770a36e5 30
6d5d305d 31use crate::api2::helpers;
dd066d28
DM
32
33mod environment;
34use environment::*;
35
255f378a
DM
36pub const ROUTER: Router = Router::new()
37 .upgrade(&API_METHOD_UPGRADE_BACKUP);
38
552c2259 39#[sortable]
255f378a 40pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
329d40b5 41 &ApiHandler::AsyncHttp(&upgrade_to_backup_reader_protocol),
255f378a
DM
42 &ObjectSchema::new(
43 concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."),
552c2259 44 &sorted!([
66c49c21 45 ("store", false, &DATASTORE_SCHEMA),
bc0d0388
DM
46 ("backup-type", false, &BACKUP_TYPE_SCHEMA),
47 ("backup-id", false, &BACKUP_ID_SCHEMA),
48 ("backup-time", false, &BACKUP_TIME_SCHEMA),
255f378a 49 ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
552c2259 50 ]),
dd066d28 51 )
365f0f72
DM
52).access(
53 // Note: parameter 'store' is no uri parameter, so we need to test inside function body
d00e1a21 54 Some("The user needs Datastore.Read privilege on /datastore/{store}."),
365f0f72
DM
55 &Permission::Anybody
56);
dd066d28
DM
57
58fn upgrade_to_backup_reader_protocol(
59 parts: Parts,
60 req_body: Body,
61 param: Value,
255f378a 62 _info: &ApiMethod,
dd066d28 63 rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 64) -> ApiResponseFuture {
dd066d28 65
ad51d02a
DM
66 async move {
67 let debug = param["debug"].as_bool().unwrap_or(false);
dd066d28 68
e6dc35ac 69 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
3c8c2827 70 let store = required_string_param(&param, "store")?.to_owned();
365f0f72
DM
71
72 let user_info = CachedUserInfo::new()?;
e6dc35ac 73 let privs = user_info.lookup_privs(&auth_id, &["datastore", &store]);
19ca962b
FG
74
75 let priv_read = privs & PRIV_DATASTORE_READ != 0;
76 let priv_backup = privs & PRIV_DATASTORE_BACKUP != 0;
77
78 // priv_backup needs owner check further down below!
79 if !priv_read && !priv_backup {
80 bail!("no permissions on /datastore/{}", store);
81 }
365f0f72 82
ad51d02a 83 let datastore = DataStore::lookup_datastore(&store)?;
dd066d28 84
3c8c2827
WB
85 let backup_type = required_string_param(&param, "backup-type")?;
86 let backup_id = required_string_param(&param, "backup-id")?;
87 let backup_time = required_integer_param(&param, "backup-time")?;
dd066d28 88
ad51d02a
DM
89 let protocols = parts
90 .headers
91 .get("UPGRADE")
92 .ok_or_else(|| format_err!("missing Upgrade header"))?
dd066d28
DM
93 .to_str()?;
94
ad51d02a
DM
95 if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
96 bail!("invalid protocol name");
97 }
dd066d28 98
ad51d02a
DM
99 if parts.version >= http::version::Version::HTTP_2 {
100 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
101 }
dd066d28 102
ad51d02a 103 let env_type = rpcenv.env_type();
dd066d28 104
e0e5b442 105 let backup_dir = BackupDir::new(backup_type, backup_id, backup_time)?;
19ca962b
FG
106 if !priv_read {
107 let owner = datastore.get_owner(backup_dir.group())?;
bff85572
FG
108 let correct_owner = owner == auth_id
109 || (owner.is_token()
110 && Authid::from(owner.user().clone()) == auth_id);
111 if !correct_owner {
19ca962b
FG
112 bail!("backup owner check failed!");
113 }
114 }
115
238a872d
SR
116 let _guard = lock_dir_noblock_shared(
117 &datastore.snapshot_path(&backup_dir),
118 "snapshot",
119 "locked by another operation")?;
120
ad51d02a 121 let path = datastore.base_path();
dd066d28 122
ad51d02a 123 //let files = BackupInfo::list_files(&path, &backup_dir)?;
dd066d28 124
4ebda996 125 let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time());
dd066d28 126
049a22a3 127 WorkerTask::spawn("reader", Some(worker_id), auth_id.to_string(), true, move |worker| async move {
bdb6e6b8
DC
128 let _guard = _guard;
129
ad51d02a 130 let mut env = ReaderEnvironment::new(
e7cb4dc5 131 env_type,
e6dc35ac 132 auth_id,
e7cb4dc5
WB
133 worker.clone(),
134 datastore,
135 backup_dir,
136 );
dd066d28 137
ad51d02a 138 env.debug = debug;
dd066d28 139
ad51d02a 140 env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));
dd066d28 141
ad51d02a 142 let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
dd066d28 143
bdb6e6b8
DC
144 let mut abort_future = worker.abort_future()
145 .map(|_| Err(format_err!("task aborted")));
146
147 let env2 = env.clone();
148 let req_fut = async move {
149 let conn = hyper::upgrade::on(Request::from_parts(parts, req_body)).await?;
150 env2.debug("protocol upgrade done");
151
152 let mut http = hyper::server::conn::Http::new();
153 http.http2_only(true);
154 // increase window size: todo - find optiomal size
155 let window_size = 32*1024*1024; // max = (1 << 31) - 2
156 http.http2_initial_stream_window_size(window_size);
157 http.http2_initial_connection_window_size(window_size);
158 http.http2_max_frame_size(4*1024*1024);
159
160 http.serve_connection(conn, service)
161 .map_err(Error::from).await
162 };
163
164 futures::select!{
165 req = req_fut.fuse() => req?,
166 abort = abort_future => abort?,
167 };
168
169 env.log("reader finished successfully");
170
171 Ok(())
ad51d02a 172 })?;
dd066d28 173
ad51d02a
DM
174 let response = Response::builder()
175 .status(StatusCode::SWITCHING_PROTOCOLS)
176 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
177 .body(Body::empty())?;
dd066d28 178
ad51d02a
DM
179 Ok(response)
180 }.boxed()
dd066d28
DM
181}
182
5bc8e80a
DM
183const READER_API_SUBDIRS: SubdirMap = &[
184 (
185 "chunk", &Router::new()
186 .download(&API_METHOD_DOWNLOAD_CHUNK)
187 ),
188 (
189 "download", &Router::new()
190 .download(&API_METHOD_DOWNLOAD_FILE)
191 ),
192 (
193 "speedtest", &Router::new()
194 .download(&API_METHOD_SPEEDTEST)
195 ),
196];
197
255f378a 198pub const READER_API_ROUTER: Router = Router::new()
5bc8e80a
DM
199 .get(&list_subdirs_api_method!(READER_API_SUBDIRS))
200 .subdirs(READER_API_SUBDIRS);
255f378a 201
552c2259 202#[sortable]
255f378a 203pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
329d40b5 204 &ApiHandler::AsyncHttp(&download_file),
255f378a
DM
205 &ObjectSchema::new(
206 "Download specified file.",
552c2259 207 &sorted!([
6227654a 208 ("file-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA),
552c2259 209 ]),
dd066d28 210 )
255f378a 211);
dd066d28
DM
212
213fn download_file(
214 _parts: Parts,
215 _req_body: Body,
216 param: Value,
255f378a 217 _info: &ApiMethod,
dd066d28 218 rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 219) -> ApiResponseFuture {
dd066d28 220
ad51d02a
DM
221 async move {
222 let env: &ReaderEnvironment = rpcenv.as_ref();
dd066d28 223
3c8c2827 224 let file_name = required_string_param(&param, "file-name")?.to_owned();
dd066d28 225
ad51d02a
DM
226 let mut path = env.datastore.base_path();
227 path.push(env.backup_dir.relative_path());
228 path.push(&file_name);
dd066d28 229
e22f4882 230 env.log(format!("download {:?}", path.clone()));
5bc8e80a 231
d479f0c8
FG
232 let index: Option<Box<dyn IndexFile + Send>> = match archive_type(&file_name)? {
233 ArchiveType::FixedIndex => {
234 let index = env.datastore.open_fixed_reader(&path)?;
235 Some(Box::new(index))
236 }
237 ArchiveType::DynamicIndex => {
238 let index = env.datastore.open_dynamic_reader(&path)?;
239 Some(Box::new(index))
240 }
241 _ => { None }
242 };
243
244 if let Some(index) = index {
245 env.log(format!("register chunks in '{}' as downloadable.", file_name));
246
247 for pos in 0..index.index_count() {
248 let info = index.chunk_info(pos).unwrap();
249 env.register_chunk(info.digest);
250 }
251 }
dd066d28 252
e22f4882 253 helpers::create_download_response(path).await
ad51d02a 254 }.boxed()
dd066d28 255}
09d7dc50 256
552c2259 257#[sortable]
255f378a 258pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
329d40b5 259 &ApiHandler::AsyncHttp(&download_chunk),
255f378a
DM
260 &ObjectSchema::new(
261 "Download specified chunk.",
552c2259
DM
262 &sorted!([
263 ("digest", false, &CHUNK_DIGEST_SCHEMA),
264 ]),
09d7dc50 265 )
255f378a 266);
09d7dc50
DM
267
268fn download_chunk(
269 _parts: Parts,
270 _req_body: Body,
271 param: Value,
255f378a 272 _info: &ApiMethod,
09d7dc50 273 rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 274) -> ApiResponseFuture {
09d7dc50 275
ad51d02a
DM
276 async move {
277 let env: &ReaderEnvironment = rpcenv.as_ref();
c0b1b14c 278
3c8c2827 279 let digest_str = required_string_param(&param, "digest")?;
ad51d02a 280 let digest = proxmox::tools::hex_to_digest(digest_str)?;
c0b1b14c 281
d479f0c8
FG
282 if !env.check_chunk_access(digest) {
283 env.log(format!("attempted to download chunk {} which is not in registered chunk list", digest_str));
284 return Err(http_err!(UNAUTHORIZED, "download chunk {} not allowed", digest_str));
285 }
286
ad51d02a
DM
287 let (path, _) = env.datastore.chunk_path(&digest);
288 let path2 = path.clone();
c0b1b14c 289
ad51d02a 290 env.debug(format!("download chunk {:?}", path));
c0b1b14c 291
9a1b24b6 292 let data = proxmox_async::runtime::block_in_place(|| std::fs::read(path))
8aa67ee7 293 .map_err(move |err| http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err))?;
f7aa6f15 294
ad51d02a 295 let body = Body::from(data);
f7aa6f15 296
ad51d02a
DM
297 // fixme: set other headers ?
298 Ok(Response::builder()
299 .status(StatusCode::OK)
300 .header(header::CONTENT_TYPE, "application/octet-stream")
301 .body(body)
302 .unwrap())
303 }.boxed()
c0b1b14c
DM
304}
305
306/* this is too slow
307fn download_chunk_old(
308 _parts: Parts,
309 _req_body: Body,
310 param: Value,
255f378a 311 _info: &ApiMethod,
c0b1b14c 312 rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 313) -> Result<ApiResponseFuture, Error> {
c0b1b14c 314
09d7dc50
DM
315 let env: &ReaderEnvironment = rpcenv.as_ref();
316 let env2 = env.clone();
317
3c8c2827 318 let digest_str = required_string_param(&param, "digest")?;
09d7dc50
DM
319 let digest = proxmox::tools::hex_to_digest(digest_str)?;
320
321 let (path, _) = env.datastore.chunk_path(&digest);
322
323 let path2 = path.clone();
324 let path3 = path.clone();
325
326 let response_future = tokio::fs::File::open(path)
8aa67ee7 327 .map_err(move |err| http_err!(BAD_REQUEST, "open file {:?} failed: {}", path2, err))
09d7dc50
DM
328 .and_then(move |file| {
329 env2.debug(format!("download chunk {:?}", path3));
db0cb9ce
WB
330 let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
331 .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
09d7dc50
DM
332
333 let body = Body::wrap_stream(payload);
334
335 // fixme: set other headers ?
ffb64344 336 futures::future::ok(Response::builder()
09d7dc50
DM
337 .status(StatusCode::OK)
338 .header(header::CONTENT_TYPE, "application/octet-stream")
339 .body(body)
340 .unwrap())
341 });
342
343 Ok(Box::new(response_future))
344}
c0b1b14c 345*/
09d7dc50 346
255f378a 347pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
329d40b5 348 &ApiHandler::AsyncHttp(&speedtest),
4c7f100d 349 &ObjectSchema::new("Test 1M block download speed.", &[])
255f378a 350);
09d7dc50
DM
351
352fn speedtest(
353 _parts: Parts,
354 _req_body: Body,
355 _param: Value,
255f378a 356 _info: &ApiMethod,
09d7dc50 357 _rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 358) -> ApiResponseFuture {
09d7dc50 359
17243003 360 let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...]
09d7dc50
DM
361
362 let body = Body::from(buffer);
363
364 let response = Response::builder()
365 .status(StatusCode::OK)
366 .header(header::CONTENT_TYPE, "application/octet-stream")
367 .body(body)
368 .unwrap();
369
ad51d02a 370 future::ok(response).boxed()
09d7dc50 371}