]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/reader.rs
extract create_download_response API helper
[proxmox-backup.git] / src / api2 / reader.rs
CommitLineData
cad540e9 1//use chrono::{Local, TimeZone};
f7d4e4b5 2use anyhow::{bail, format_err, Error};
dd066d28
DM
3use futures::*;
4use hyper::header::{self, HeaderValue, UPGRADE};
dd066d28 5use hyper::http::request::Parts;
cad540e9 6use hyper::{Body, Response, StatusCode};
dd066d28
DM
7use serde_json::Value;
8
552c2259 9use proxmox::{sortable, identity};
73ce1d11 10use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironment, Permission};
cad540e9 11use proxmox::api::schema::*;
9ea4bce4 12use proxmox::http_err;
552c2259 13
09d7dc50 14use crate::api2::types::*;
cad540e9
WB
15use crate::backup::*;
16use crate::server::{WorkerTask, H2Service};
17use crate::tools;
d00e1a21 18use crate::config::acl::PRIV_DATASTORE_READ;
365f0f72 19use crate::config::cached_user_info::CachedUserInfo;
e22f4882 20use crate::api2::helpers;
dd066d28
DM
21
22mod environment;
23use environment::*;
24
255f378a
DM
25pub const ROUTER: Router = Router::new()
26 .upgrade(&API_METHOD_UPGRADE_BACKUP);
27
552c2259 28#[sortable]
255f378a 29pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
329d40b5 30 &ApiHandler::AsyncHttp(&upgrade_to_backup_reader_protocol),
255f378a
DM
31 &ObjectSchema::new(
32 concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."),
552c2259 33 &sorted!([
66c49c21 34 ("store", false, &DATASTORE_SCHEMA),
bc0d0388
DM
35 ("backup-type", false, &BACKUP_TYPE_SCHEMA),
36 ("backup-id", false, &BACKUP_ID_SCHEMA),
37 ("backup-time", false, &BACKUP_TIME_SCHEMA),
255f378a 38 ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
552c2259 39 ]),
dd066d28 40 )
365f0f72
DM
41).access(
42 // Note: parameter 'store' is no uri parameter, so we need to test inside function body
d00e1a21 43 Some("The user needs Datastore.Read privilege on /datastore/{store}."),
365f0f72
DM
44 &Permission::Anybody
45);
dd066d28
DM
46
47fn upgrade_to_backup_reader_protocol(
48 parts: Parts,
49 req_body: Body,
50 param: Value,
255f378a 51 _info: &ApiMethod,
dd066d28 52 rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 53) -> ApiResponseFuture {
dd066d28 54
ad51d02a
DM
55 async move {
56 let debug = param["debug"].as_bool().unwrap_or(false);
dd066d28 57
365f0f72 58 let username = rpcenv.get_user().unwrap();
ad51d02a 59 let store = tools::required_string_param(&param, "store")?.to_owned();
365f0f72
DM
60
61 let user_info = CachedUserInfo::new()?;
d00e1a21 62 user_info.check_privs(&username, &["datastore", &store], PRIV_DATASTORE_READ, false)?;
365f0f72 63
ad51d02a 64 let datastore = DataStore::lookup_datastore(&store)?;
dd066d28 65
ad51d02a
DM
66 let backup_type = tools::required_string_param(&param, "backup-type")?;
67 let backup_id = tools::required_string_param(&param, "backup-id")?;
68 let backup_time = tools::required_integer_param(&param, "backup-time")?;
dd066d28 69
ad51d02a
DM
70 let protocols = parts
71 .headers
72 .get("UPGRADE")
73 .ok_or_else(|| format_err!("missing Upgrade header"))?
dd066d28
DM
74 .to_str()?;
75
ad51d02a
DM
76 if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
77 bail!("invalid protocol name");
78 }
dd066d28 79
ad51d02a
DM
80 if parts.version >= http::version::Version::HTTP_2 {
81 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
82 }
dd066d28 83
ad51d02a 84 let env_type = rpcenv.env_type();
dd066d28 85
ad51d02a
DM
86 let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
87 let path = datastore.base_path();
dd066d28 88
ad51d02a 89 //let files = BackupInfo::list_files(&path, &backup_dir)?;
dd066d28 90
ad51d02a 91 let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp());
dd066d28 92
ad51d02a
DM
93 WorkerTask::spawn("reader", Some(worker_id), &username.clone(), true, move |worker| {
94 let mut env = ReaderEnvironment::new(
95 env_type, username.clone(), worker.clone(), datastore, backup_dir);
dd066d28 96
ad51d02a 97 env.debug = debug;
dd066d28 98
ad51d02a 99 env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));
dd066d28 100
ad51d02a 101 let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
dd066d28 102
ad51d02a 103 let abort_future = worker.abort_future();
dd066d28 104
ad51d02a
DM
105 let req_fut = req_body
106 .on_upgrade()
107 .map_err(Error::from)
108 .and_then({
109 let env = env.clone();
110 move |conn| {
111 env.debug("protocol upgrade done");
ffb64344 112
ad51d02a
DM
113 let mut http = hyper::server::conn::Http::new();
114 http.http2_only(true);
115 // increase window size: todo - find optiomal size
116 let window_size = 32*1024*1024; // max = (1 << 31) - 2
117 http.http2_initial_stream_window_size(window_size);
118 http.http2_initial_connection_window_size(window_size);
ffb64344 119
ad51d02a
DM
120 http.serve_connection(conn, service)
121 .map_err(Error::from)
122 }
123 });
124 let abort_future = abort_future
125 .map(|_| Err(format_err!("task aborted")));
ffb64344 126
ad51d02a
DM
127 use futures::future::Either;
128 futures::future::select(req_fut, abort_future)
129 .map(|res| match res {
130 Either::Left((Ok(res), _)) => Ok(res),
131 Either::Left((Err(err), _)) => Err(err),
132 Either::Right((Ok(res), _)) => Ok(res),
133 Either::Right((Err(err), _)) => Err(err),
134 })
add5861e 135 .map_ok(move |_| env.log("reader finished successfully"))
ad51d02a 136 })?;
dd066d28 137
ad51d02a
DM
138 let response = Response::builder()
139 .status(StatusCode::SWITCHING_PROTOCOLS)
140 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
141 .body(Body::empty())?;
dd066d28 142
ad51d02a
DM
143 Ok(response)
144 }.boxed()
dd066d28
DM
145}
146
255f378a
DM
147pub const READER_API_ROUTER: Router = Router::new()
148 .subdirs(&[
149 (
150 "chunk", &Router::new()
151 .download(&API_METHOD_DOWNLOAD_CHUNK)
152 ),
153 (
154 "download", &Router::new()
155 .download(&API_METHOD_DOWNLOAD_FILE)
156 ),
157 (
158 "speedtest", &Router::new()
159 .download(&API_METHOD_SPEEDTEST)
160 ),
161 ]);
162
552c2259 163#[sortable]
255f378a 164pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
329d40b5 165 &ApiHandler::AsyncHttp(&download_file),
255f378a
DM
166 &ObjectSchema::new(
167 "Download specified file.",
552c2259
DM
168 &sorted!([
169 ("file-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
170 ]),
dd066d28 171 )
255f378a 172);
dd066d28
DM
173
174fn download_file(
175 _parts: Parts,
176 _req_body: Body,
177 param: Value,
255f378a 178 _info: &ApiMethod,
dd066d28 179 rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 180) -> ApiResponseFuture {
dd066d28 181
ad51d02a
DM
182 async move {
183 let env: &ReaderEnvironment = rpcenv.as_ref();
dd066d28 184
ad51d02a 185 let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
dd066d28 186
ad51d02a
DM
187 let mut path = env.datastore.base_path();
188 path.push(env.backup_dir.relative_path());
189 path.push(&file_name);
dd066d28 190
e22f4882 191 env.log(format!("download {:?}", path.clone()));
dd066d28 192
e22f4882 193 helpers::create_download_response(path).await
ad51d02a 194 }.boxed()
dd066d28 195}
09d7dc50 196
552c2259 197#[sortable]
255f378a 198pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
329d40b5 199 &ApiHandler::AsyncHttp(&download_chunk),
255f378a
DM
200 &ObjectSchema::new(
201 "Download specified chunk.",
552c2259
DM
202 &sorted!([
203 ("digest", false, &CHUNK_DIGEST_SCHEMA),
204 ]),
09d7dc50 205 )
255f378a 206);
09d7dc50
DM
207
208fn download_chunk(
209 _parts: Parts,
210 _req_body: Body,
211 param: Value,
255f378a 212 _info: &ApiMethod,
09d7dc50 213 rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 214) -> ApiResponseFuture {
09d7dc50 215
ad51d02a
DM
216 async move {
217 let env: &ReaderEnvironment = rpcenv.as_ref();
c0b1b14c 218
ad51d02a
DM
219 let digest_str = tools::required_string_param(&param, "digest")?;
220 let digest = proxmox::tools::hex_to_digest(digest_str)?;
c0b1b14c 221
ad51d02a
DM
222 let (path, _) = env.datastore.chunk_path(&digest);
223 let path2 = path.clone();
c0b1b14c 224
ad51d02a 225 env.debug(format!("download chunk {:?}", path));
c0b1b14c 226
ad51d02a
DM
227 let data = tokio::fs::read(path)
228 .map_err(move |err| http_err!(BAD_REQUEST, format!("reading file {:?} failed: {}", path2, err)))
229 .await?;
f7aa6f15 230
ad51d02a 231 let body = Body::from(data);
f7aa6f15 232
ad51d02a
DM
233 // fixme: set other headers ?
234 Ok(Response::builder()
235 .status(StatusCode::OK)
236 .header(header::CONTENT_TYPE, "application/octet-stream")
237 .body(body)
238 .unwrap())
239 }.boxed()
c0b1b14c
DM
240}
241
242/* this is too slow
243fn download_chunk_old(
244 _parts: Parts,
245 _req_body: Body,
246 param: Value,
255f378a 247 _info: &ApiMethod,
c0b1b14c 248 rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 249) -> Result<ApiResponseFuture, Error> {
c0b1b14c 250
09d7dc50
DM
251 let env: &ReaderEnvironment = rpcenv.as_ref();
252 let env2 = env.clone();
253
254 let digest_str = tools::required_string_param(&param, "digest")?;
255 let digest = proxmox::tools::hex_to_digest(digest_str)?;
256
257 let (path, _) = env.datastore.chunk_path(&digest);
258
259 let path2 = path.clone();
260 let path3 = path.clone();
261
262 let response_future = tokio::fs::File::open(path)
263 .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
264 .and_then(move |file| {
265 env2.debug(format!("download chunk {:?}", path3));
db0cb9ce
WB
266 let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
267 .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
09d7dc50
DM
268
269 let body = Body::wrap_stream(payload);
270
271 // fixme: set other headers ?
ffb64344 272 futures::future::ok(Response::builder()
09d7dc50
DM
273 .status(StatusCode::OK)
274 .header(header::CONTENT_TYPE, "application/octet-stream")
275 .body(body)
276 .unwrap())
277 });
278
279 Ok(Box::new(response_future))
280}
c0b1b14c 281*/
09d7dc50 282
255f378a 283pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
329d40b5 284 &ApiHandler::AsyncHttp(&speedtest),
255f378a
DM
285 &ObjectSchema::new("Test 4M block download speed.", &[])
286);
09d7dc50
DM
287
288fn speedtest(
289 _parts: Parts,
290 _req_body: Body,
291 _param: Value,
255f378a 292 _info: &ApiMethod,
09d7dc50 293 _rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 294) -> ApiResponseFuture {
09d7dc50 295
17243003 296 let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...]
09d7dc50
DM
297
298 let body = Body::from(buffer);
299
300 let response = Response::builder()
301 .status(StatusCode::OK)
302 .header(header::CONTENT_TYPE, "application/octet-stream")
303 .body(body)
304 .unwrap();
305
ad51d02a 306 future::ok(response).boxed()
09d7dc50 307}