]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/reader.rs
src/api2/pull.rs: add access permission
[proxmox-backup.git] / src / api2 / reader.rs
CommitLineData
cad540e9 1//use chrono::{Local, TimeZone};
dd066d28 2use failure::*;
dd066d28
DM
3use futures::*;
4use hyper::header::{self, HeaderValue, UPGRADE};
dd066d28 5use hyper::http::request::Parts;
cad540e9 6use hyper::{Body, Response, StatusCode};
dd066d28
DM
7use serde_json::Value;
8
552c2259 9use proxmox::{sortable, identity};
73ce1d11 10use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironment, Permission};
cad540e9 11use proxmox::api::schema::*;
9ea4bce4 12use proxmox::http_err;
552c2259 13
09d7dc50 14use crate::api2::types::*;
cad540e9
WB
15use crate::backup::*;
16use crate::server::{WorkerTask, H2Service};
17use crate::tools;
73ce1d11 18use crate::config::acl::PRIV_DATASTORE_ALLOCATE_SPACE;
dd066d28
DM
19
20mod environment;
21use environment::*;
22
255f378a
DM
23pub const ROUTER: Router = Router::new()
24 .upgrade(&API_METHOD_UPGRADE_BACKUP);
25
552c2259 26#[sortable]
255f378a 27pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
329d40b5 28 &ApiHandler::AsyncHttp(&upgrade_to_backup_reader_protocol),
255f378a
DM
29 &ObjectSchema::new(
30 concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."),
552c2259 31 &sorted!([
66c49c21 32 ("store", false, &DATASTORE_SCHEMA),
255f378a
DM
33 ("backup-type", false, &StringSchema::new("Backup type.")
34 .format(&ApiStringFormat::Enum(&["vm", "ct", "host"]))
35 .schema()
36 ),
37 ("backup-id", false, &StringSchema::new("Backup ID.").schema()),
38 ("backup-time", false, &IntegerSchema::new("Backup time (Unix epoch.)")
39 .minimum(1_547_797_308)
40 .schema()
41 ),
42 ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
552c2259 43 ]),
dd066d28 44 )
73ce1d11 45).access(None, &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_ALLOCATE_SPACE, false));
dd066d28
DM
46
47fn upgrade_to_backup_reader_protocol(
48 parts: Parts,
49 req_body: Body,
50 param: Value,
255f378a 51 _info: &ApiMethod,
dd066d28 52 rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 53) -> ApiResponseFuture {
dd066d28 54
ad51d02a
DM
55 async move {
56 let debug = param["debug"].as_bool().unwrap_or(false);
dd066d28 57
ad51d02a
DM
58 let store = tools::required_string_param(&param, "store")?.to_owned();
59 let datastore = DataStore::lookup_datastore(&store)?;
dd066d28 60
ad51d02a
DM
61 let backup_type = tools::required_string_param(&param, "backup-type")?;
62 let backup_id = tools::required_string_param(&param, "backup-id")?;
63 let backup_time = tools::required_integer_param(&param, "backup-time")?;
dd066d28 64
ad51d02a
DM
65 let protocols = parts
66 .headers
67 .get("UPGRADE")
68 .ok_or_else(|| format_err!("missing Upgrade header"))?
dd066d28
DM
69 .to_str()?;
70
ad51d02a
DM
71 if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
72 bail!("invalid protocol name");
73 }
dd066d28 74
ad51d02a
DM
75 if parts.version >= http::version::Version::HTTP_2 {
76 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
77 }
dd066d28 78
ad51d02a
DM
79 let username = rpcenv.get_user().unwrap();
80 let env_type = rpcenv.env_type();
dd066d28 81
ad51d02a
DM
82 let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
83 let path = datastore.base_path();
dd066d28 84
ad51d02a 85 //let files = BackupInfo::list_files(&path, &backup_dir)?;
dd066d28 86
ad51d02a 87 let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp());
dd066d28 88
ad51d02a
DM
89 WorkerTask::spawn("reader", Some(worker_id), &username.clone(), true, move |worker| {
90 let mut env = ReaderEnvironment::new(
91 env_type, username.clone(), worker.clone(), datastore, backup_dir);
dd066d28 92
ad51d02a 93 env.debug = debug;
dd066d28 94
ad51d02a 95 env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));
dd066d28 96
ad51d02a 97 let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
dd066d28 98
ad51d02a 99 let abort_future = worker.abort_future();
dd066d28 100
ad51d02a
DM
101 let req_fut = req_body
102 .on_upgrade()
103 .map_err(Error::from)
104 .and_then({
105 let env = env.clone();
106 move |conn| {
107 env.debug("protocol upgrade done");
ffb64344 108
ad51d02a
DM
109 let mut http = hyper::server::conn::Http::new();
110 http.http2_only(true);
111 // increase window size: todo - find optiomal size
112 let window_size = 32*1024*1024; // max = (1 << 31) - 2
113 http.http2_initial_stream_window_size(window_size);
114 http.http2_initial_connection_window_size(window_size);
ffb64344 115
ad51d02a
DM
116 http.serve_connection(conn, service)
117 .map_err(Error::from)
118 }
119 });
120 let abort_future = abort_future
121 .map(|_| Err(format_err!("task aborted")));
ffb64344 122
ad51d02a
DM
123 use futures::future::Either;
124 futures::future::select(req_fut, abort_future)
125 .map(|res| match res {
126 Either::Left((Ok(res), _)) => Ok(res),
127 Either::Left((Err(err), _)) => Err(err),
128 Either::Right((Ok(res), _)) => Ok(res),
129 Either::Right((Err(err), _)) => Err(err),
130 })
131 .map_ok(move |_| env.log("reader finished sucessfully"))
132 })?;
dd066d28 133
ad51d02a
DM
134 let response = Response::builder()
135 .status(StatusCode::SWITCHING_PROTOCOLS)
136 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
137 .body(Body::empty())?;
dd066d28 138
ad51d02a
DM
139 Ok(response)
140 }.boxed()
dd066d28
DM
141}
142
255f378a
DM
143pub const READER_API_ROUTER: Router = Router::new()
144 .subdirs(&[
145 (
146 "chunk", &Router::new()
147 .download(&API_METHOD_DOWNLOAD_CHUNK)
148 ),
149 (
150 "download", &Router::new()
151 .download(&API_METHOD_DOWNLOAD_FILE)
152 ),
153 (
154 "speedtest", &Router::new()
155 .download(&API_METHOD_SPEEDTEST)
156 ),
157 ]);
158
552c2259 159#[sortable]
255f378a 160pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
329d40b5 161 &ApiHandler::AsyncHttp(&download_file),
255f378a
DM
162 &ObjectSchema::new(
163 "Download specified file.",
552c2259
DM
164 &sorted!([
165 ("file-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
166 ]),
dd066d28 167 )
255f378a 168);
dd066d28
DM
169
170fn download_file(
171 _parts: Parts,
172 _req_body: Body,
173 param: Value,
255f378a 174 _info: &ApiMethod,
dd066d28 175 rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 176) -> ApiResponseFuture {
dd066d28 177
ad51d02a
DM
178 async move {
179 let env: &ReaderEnvironment = rpcenv.as_ref();
dd066d28 180
ad51d02a 181 let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
dd066d28 182
ad51d02a
DM
183 let mut path = env.datastore.base_path();
184 path.push(env.backup_dir.relative_path());
185 path.push(&file_name);
dd066d28 186
ad51d02a
DM
187 let path2 = path.clone();
188 let path3 = path.clone();
dd066d28 189
ad51d02a
DM
190 let file = tokio::fs::File::open(path)
191 .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
192 .await?;
fcfb84fe 193
ad51d02a 194 env.log(format!("download {:?}", path3));
dd066d28 195
db0cb9ce
WB
196 let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
197 .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
dd066d28 198
ad51d02a
DM
199 let body = Body::wrap_stream(payload);
200
201 // fixme: set other headers ?
202 Ok(Response::builder()
203 .status(StatusCode::OK)
204 .header(header::CONTENT_TYPE, "application/octet-stream")
205 .body(body)
206 .unwrap())
207 }.boxed()
dd066d28 208}
09d7dc50 209
552c2259 210#[sortable]
255f378a 211pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
329d40b5 212 &ApiHandler::AsyncHttp(&download_chunk),
255f378a
DM
213 &ObjectSchema::new(
214 "Download specified chunk.",
552c2259
DM
215 &sorted!([
216 ("digest", false, &CHUNK_DIGEST_SCHEMA),
217 ]),
09d7dc50 218 )
255f378a 219);
09d7dc50
DM
220
221fn download_chunk(
222 _parts: Parts,
223 _req_body: Body,
224 param: Value,
255f378a 225 _info: &ApiMethod,
09d7dc50 226 rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 227) -> ApiResponseFuture {
09d7dc50 228
ad51d02a
DM
229 async move {
230 let env: &ReaderEnvironment = rpcenv.as_ref();
c0b1b14c 231
ad51d02a
DM
232 let digest_str = tools::required_string_param(&param, "digest")?;
233 let digest = proxmox::tools::hex_to_digest(digest_str)?;
c0b1b14c 234
ad51d02a
DM
235 let (path, _) = env.datastore.chunk_path(&digest);
236 let path2 = path.clone();
c0b1b14c 237
ad51d02a 238 env.debug(format!("download chunk {:?}", path));
c0b1b14c 239
ad51d02a
DM
240 let data = tokio::fs::read(path)
241 .map_err(move |err| http_err!(BAD_REQUEST, format!("reading file {:?} failed: {}", path2, err)))
242 .await?;
f7aa6f15 243
ad51d02a 244 let body = Body::from(data);
f7aa6f15 245
ad51d02a
DM
246 // fixme: set other headers ?
247 Ok(Response::builder()
248 .status(StatusCode::OK)
249 .header(header::CONTENT_TYPE, "application/octet-stream")
250 .body(body)
251 .unwrap())
252 }.boxed()
c0b1b14c
DM
253}
254
255/* this is too slow
256fn download_chunk_old(
257 _parts: Parts,
258 _req_body: Body,
259 param: Value,
255f378a 260 _info: &ApiMethod,
c0b1b14c 261 rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 262) -> Result<ApiResponseFuture, Error> {
c0b1b14c 263
09d7dc50
DM
264 let env: &ReaderEnvironment = rpcenv.as_ref();
265 let env2 = env.clone();
266
267 let digest_str = tools::required_string_param(&param, "digest")?;
268 let digest = proxmox::tools::hex_to_digest(digest_str)?;
269
270 let (path, _) = env.datastore.chunk_path(&digest);
271
272 let path2 = path.clone();
273 let path3 = path.clone();
274
275 let response_future = tokio::fs::File::open(path)
276 .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
277 .and_then(move |file| {
278 env2.debug(format!("download chunk {:?}", path3));
db0cb9ce
WB
279 let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
280 .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
09d7dc50
DM
281
282 let body = Body::wrap_stream(payload);
283
284 // fixme: set other headers ?
ffb64344 285 futures::future::ok(Response::builder()
09d7dc50
DM
286 .status(StatusCode::OK)
287 .header(header::CONTENT_TYPE, "application/octet-stream")
288 .body(body)
289 .unwrap())
290 });
291
292 Ok(Box::new(response_future))
293}
c0b1b14c 294*/
09d7dc50 295
255f378a 296pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
329d40b5 297 &ApiHandler::AsyncHttp(&speedtest),
255f378a
DM
298 &ObjectSchema::new("Test 4M block download speed.", &[])
299);
09d7dc50
DM
300
301fn speedtest(
302 _parts: Parts,
303 _req_body: Body,
304 _param: Value,
255f378a 305 _info: &ApiMethod,
09d7dc50 306 _rpcenv: Box<dyn RpcEnvironment>,
bb084b9c 307) -> ApiResponseFuture {
09d7dc50 308
17243003 309 let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...]
09d7dc50
DM
310
311 let body = Body::from(buffer);
312
313 let response = Response::builder()
314 .status(StatusCode::OK)
315 .header(header::CONTENT_TYPE, "application/octet-stream")
316 .body(body)
317 .unwrap();
318
ad51d02a 319 future::ok(response).boxed()
09d7dc50 320}