]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/reader.rs
src/config/acl.rs: introduce more/better datastore privileges
[proxmox-backup.git] / src / api2 / reader.rs
CommitLineData
cad540e9 1//use chrono::{Local, TimeZone};
f7d4e4b5 2use anyhow::{bail, format_err, Error};
dd066d28
DM
3use futures::*;
4use hyper::header::{self, HeaderValue, UPGRADE};
dd066d28 5use hyper::http::request::Parts;
cad540e9 6use hyper::{Body, Response, StatusCode};
dd066d28
DM
7use serde_json::Value;
8
552c2259 9use proxmox::{sortable, identity};
73ce1d11 10use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironment, Permission};
cad540e9 11use proxmox::api::schema::*;
9ea4bce4 12use proxmox::http_err;
552c2259 13
09d7dc50 14use crate::api2::types::*;
cad540e9
WB
15use crate::backup::*;
16use crate::server::{WorkerTask, H2Service};
17use crate::tools;
d00e1a21 18use crate::config::acl::PRIV_DATASTORE_READ;
365f0f72 19use crate::config::cached_user_info::CachedUserInfo;
dd066d28
DM
20
21mod environment;
22use environment::*;
23
255f378a
DM
24pub const ROUTER: Router = Router::new()
25 .upgrade(&API_METHOD_UPGRADE_BACKUP);
26
552c2259 27#[sortable]
255f378a 28pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
329d40b5 29 &ApiHandler::AsyncHttp(&upgrade_to_backup_reader_protocol),
255f378a
DM
30 &ObjectSchema::new(
31 concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."),
552c2259 32 &sorted!([
66c49c21 33 ("store", false, &DATASTORE_SCHEMA),
255f378a
DM
34 ("backup-type", false, &StringSchema::new("Backup type.")
35 .format(&ApiStringFormat::Enum(&["vm", "ct", "host"]))
36 .schema()
37 ),
38 ("backup-id", false, &StringSchema::new("Backup ID.").schema()),
39 ("backup-time", false, &IntegerSchema::new("Backup time (Unix epoch.)")
40 .minimum(1_547_797_308)
41 .schema()
42 ),
43 ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
552c2259 44 ]),
dd066d28 45 )
365f0f72
DM
46).access(
47 // Note: parameter 'store' is no uri parameter, so we need to test inside function body
d00e1a21 48 Some("The user needs Datastore.Read privilege on /datastore/{store}."),
365f0f72
DM
49 &Permission::Anybody
50);
dd066d28
DM
51
52fn upgrade_to_backup_reader_protocol(
53 parts: Parts,
54 req_body: Body,
55 param: Value,
255f378a 56 _info: &ApiMethod,
dd066d28 57 rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 58) -> ApiResponseFuture {
dd066d28 59
ad51d02a
DM
60 async move {
61 let debug = param["debug"].as_bool().unwrap_or(false);
dd066d28 62
365f0f72 63 let username = rpcenv.get_user().unwrap();
ad51d02a 64 let store = tools::required_string_param(&param, "store")?.to_owned();
365f0f72
DM
65
66 let user_info = CachedUserInfo::new()?;
d00e1a21 67 user_info.check_privs(&username, &["datastore", &store], PRIV_DATASTORE_READ, false)?;
365f0f72 68
ad51d02a 69 let datastore = DataStore::lookup_datastore(&store)?;
dd066d28 70
ad51d02a
DM
71 let backup_type = tools::required_string_param(&param, "backup-type")?;
72 let backup_id = tools::required_string_param(&param, "backup-id")?;
73 let backup_time = tools::required_integer_param(&param, "backup-time")?;
dd066d28 74
ad51d02a
DM
75 let protocols = parts
76 .headers
77 .get("UPGRADE")
78 .ok_or_else(|| format_err!("missing Upgrade header"))?
dd066d28
DM
79 .to_str()?;
80
ad51d02a
DM
81 if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
82 bail!("invalid protocol name");
83 }
dd066d28 84
ad51d02a
DM
85 if parts.version >= http::version::Version::HTTP_2 {
86 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
87 }
dd066d28 88
ad51d02a 89 let env_type = rpcenv.env_type();
dd066d28 90
ad51d02a
DM
91 let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
92 let path = datastore.base_path();
dd066d28 93
ad51d02a 94 //let files = BackupInfo::list_files(&path, &backup_dir)?;
dd066d28 95
ad51d02a 96 let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp());
dd066d28 97
ad51d02a
DM
98 WorkerTask::spawn("reader", Some(worker_id), &username.clone(), true, move |worker| {
99 let mut env = ReaderEnvironment::new(
100 env_type, username.clone(), worker.clone(), datastore, backup_dir);
dd066d28 101
ad51d02a 102 env.debug = debug;
dd066d28 103
ad51d02a 104 env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));
dd066d28 105
ad51d02a 106 let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
dd066d28 107
ad51d02a 108 let abort_future = worker.abort_future();
dd066d28 109
ad51d02a
DM
110 let req_fut = req_body
111 .on_upgrade()
112 .map_err(Error::from)
113 .and_then({
114 let env = env.clone();
115 move |conn| {
116 env.debug("protocol upgrade done");
ffb64344 117
ad51d02a
DM
118 let mut http = hyper::server::conn::Http::new();
119 http.http2_only(true);
120 // increase window size: todo - find optiomal size
121 let window_size = 32*1024*1024; // max = (1 << 31) - 2
122 http.http2_initial_stream_window_size(window_size);
123 http.http2_initial_connection_window_size(window_size);
ffb64344 124
ad51d02a
DM
125 http.serve_connection(conn, service)
126 .map_err(Error::from)
127 }
128 });
129 let abort_future = abort_future
130 .map(|_| Err(format_err!("task aborted")));
ffb64344 131
ad51d02a
DM
132 use futures::future::Either;
133 futures::future::select(req_fut, abort_future)
134 .map(|res| match res {
135 Either::Left((Ok(res), _)) => Ok(res),
136 Either::Left((Err(err), _)) => Err(err),
137 Either::Right((Ok(res), _)) => Ok(res),
138 Either::Right((Err(err), _)) => Err(err),
139 })
140 .map_ok(move |_| env.log("reader finished sucessfully"))
141 })?;
dd066d28 142
ad51d02a
DM
143 let response = Response::builder()
144 .status(StatusCode::SWITCHING_PROTOCOLS)
145 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
146 .body(Body::empty())?;
dd066d28 147
ad51d02a
DM
148 Ok(response)
149 }.boxed()
dd066d28
DM
150}
151
255f378a
DM
152pub const READER_API_ROUTER: Router = Router::new()
153 .subdirs(&[
154 (
155 "chunk", &Router::new()
156 .download(&API_METHOD_DOWNLOAD_CHUNK)
157 ),
158 (
159 "download", &Router::new()
160 .download(&API_METHOD_DOWNLOAD_FILE)
161 ),
162 (
163 "speedtest", &Router::new()
164 .download(&API_METHOD_SPEEDTEST)
165 ),
166 ]);
167
552c2259 168#[sortable]
255f378a 169pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
329d40b5 170 &ApiHandler::AsyncHttp(&download_file),
255f378a
DM
171 &ObjectSchema::new(
172 "Download specified file.",
552c2259
DM
173 &sorted!([
174 ("file-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
175 ]),
dd066d28 176 )
255f378a 177);
dd066d28
DM
178
179fn download_file(
180 _parts: Parts,
181 _req_body: Body,
182 param: Value,
255f378a 183 _info: &ApiMethod,
dd066d28 184 rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 185) -> ApiResponseFuture {
dd066d28 186
ad51d02a
DM
187 async move {
188 let env: &ReaderEnvironment = rpcenv.as_ref();
dd066d28 189
ad51d02a 190 let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
dd066d28 191
ad51d02a
DM
192 let mut path = env.datastore.base_path();
193 path.push(env.backup_dir.relative_path());
194 path.push(&file_name);
dd066d28 195
ad51d02a
DM
196 let path2 = path.clone();
197 let path3 = path.clone();
dd066d28 198
ad51d02a
DM
199 let file = tokio::fs::File::open(path)
200 .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
201 .await?;
fcfb84fe 202
ad51d02a 203 env.log(format!("download {:?}", path3));
dd066d28 204
db0cb9ce
WB
205 let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
206 .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
dd066d28 207
ad51d02a
DM
208 let body = Body::wrap_stream(payload);
209
210 // fixme: set other headers ?
211 Ok(Response::builder()
212 .status(StatusCode::OK)
213 .header(header::CONTENT_TYPE, "application/octet-stream")
214 .body(body)
215 .unwrap())
216 }.boxed()
dd066d28 217}
09d7dc50 218
552c2259 219#[sortable]
255f378a 220pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
329d40b5 221 &ApiHandler::AsyncHttp(&download_chunk),
255f378a
DM
222 &ObjectSchema::new(
223 "Download specified chunk.",
552c2259
DM
224 &sorted!([
225 ("digest", false, &CHUNK_DIGEST_SCHEMA),
226 ]),
09d7dc50 227 )
255f378a 228);
09d7dc50
DM
229
230fn download_chunk(
231 _parts: Parts,
232 _req_body: Body,
233 param: Value,
255f378a 234 _info: &ApiMethod,
09d7dc50 235 rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 236) -> ApiResponseFuture {
09d7dc50 237
ad51d02a
DM
238 async move {
239 let env: &ReaderEnvironment = rpcenv.as_ref();
c0b1b14c 240
ad51d02a
DM
241 let digest_str = tools::required_string_param(&param, "digest")?;
242 let digest = proxmox::tools::hex_to_digest(digest_str)?;
c0b1b14c 243
ad51d02a
DM
244 let (path, _) = env.datastore.chunk_path(&digest);
245 let path2 = path.clone();
c0b1b14c 246
ad51d02a 247 env.debug(format!("download chunk {:?}", path));
c0b1b14c 248
ad51d02a
DM
249 let data = tokio::fs::read(path)
250 .map_err(move |err| http_err!(BAD_REQUEST, format!("reading file {:?} failed: {}", path2, err)))
251 .await?;
f7aa6f15 252
ad51d02a 253 let body = Body::from(data);
f7aa6f15 254
ad51d02a
DM
255 // fixme: set other headers ?
256 Ok(Response::builder()
257 .status(StatusCode::OK)
258 .header(header::CONTENT_TYPE, "application/octet-stream")
259 .body(body)
260 .unwrap())
261 }.boxed()
c0b1b14c
DM
262}
263
264/* this is too slow
265fn download_chunk_old(
266 _parts: Parts,
267 _req_body: Body,
268 param: Value,
255f378a 269 _info: &ApiMethod,
c0b1b14c 270 rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 271) -> Result<ApiResponseFuture, Error> {
c0b1b14c 272
09d7dc50
DM
273 let env: &ReaderEnvironment = rpcenv.as_ref();
274 let env2 = env.clone();
275
276 let digest_str = tools::required_string_param(&param, "digest")?;
277 let digest = proxmox::tools::hex_to_digest(digest_str)?;
278
279 let (path, _) = env.datastore.chunk_path(&digest);
280
281 let path2 = path.clone();
282 let path3 = path.clone();
283
284 let response_future = tokio::fs::File::open(path)
285 .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
286 .and_then(move |file| {
287 env2.debug(format!("download chunk {:?}", path3));
db0cb9ce
WB
288 let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
289 .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
09d7dc50
DM
290
291 let body = Body::wrap_stream(payload);
292
293 // fixme: set other headers ?
ffb64344 294 futures::future::ok(Response::builder()
09d7dc50
DM
295 .status(StatusCode::OK)
296 .header(header::CONTENT_TYPE, "application/octet-stream")
297 .body(body)
298 .unwrap())
299 });
300
301 Ok(Box::new(response_future))
302}
c0b1b14c 303*/
09d7dc50 304
255f378a 305pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
329d40b5 306 &ApiHandler::AsyncHttp(&speedtest),
255f378a
DM
307 &ObjectSchema::new("Test 4M block download speed.", &[])
308);
09d7dc50
DM
309
310fn speedtest(
311 _parts: Parts,
312 _req_body: Body,
313 _param: Value,
255f378a 314 _info: &ApiMethod,
09d7dc50 315 _rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 316) -> ApiResponseFuture {
09d7dc50 317
17243003 318 let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...]
09d7dc50
DM
319
320 let body = Body::from(buffer);
321
322 let response = Response::builder()
323 .status(StatusCode::OK)
324 .header(header::CONTENT_TYPE, "application/octet-stream")
325 .body(body)
326 .unwrap();
327
ad51d02a 328 future::ok(response).boxed()
09d7dc50 329}