]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/reader.rs
83ae616ac9e4cbc091bc7628c3a64f76206cd9f0
[proxmox-backup.git] / src / api2 / reader.rs
1 //use chrono::{Local, TimeZone};
2 use anyhow::{bail, format_err, Error};
3 use futures::*;
4 use hyper::header::{self, HeaderValue, UPGRADE};
5 use hyper::http::request::Parts;
6 use hyper::{Body, Response, StatusCode};
7 use serde_json::Value;
8
9 use proxmox::{sortable, identity};
10 use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironment, Permission};
11 use proxmox::api::schema::*;
12 use proxmox::http_err;
13
14 use crate::api2::types::*;
15 use crate::backup::*;
16 use crate::server::{WorkerTask, H2Service};
17 use crate::tools;
18 use crate::config::acl::PRIV_DATASTORE_READ;
19 use crate::config::cached_user_info::CachedUserInfo;
20 use crate::api2::helpers;
21
22 mod environment;
23 use environment::*;
24
25 pub const ROUTER: Router = Router::new()
26 .upgrade(&API_METHOD_UPGRADE_BACKUP);
27
28 #[sortable]
29 pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
30 &ApiHandler::AsyncHttp(&upgrade_to_backup_reader_protocol),
31 &ObjectSchema::new(
32 concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."),
33 &sorted!([
34 ("store", false, &DATASTORE_SCHEMA),
35 ("backup-type", false, &BACKUP_TYPE_SCHEMA),
36 ("backup-id", false, &BACKUP_ID_SCHEMA),
37 ("backup-time", false, &BACKUP_TIME_SCHEMA),
38 ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
39 ]),
40 )
41 ).access(
42 // Note: parameter 'store' is no uri parameter, so we need to test inside function body
43 Some("The user needs Datastore.Read privilege on /datastore/{store}."),
44 &Permission::Anybody
45 );
46
47 fn upgrade_to_backup_reader_protocol(
48 parts: Parts,
49 req_body: Body,
50 param: Value,
51 _info: &ApiMethod,
52 rpcenv: Box<dyn RpcEnvironment>,
53 ) -> ApiResponseFuture {
54
55 async move {
56 let debug = param["debug"].as_bool().unwrap_or(false);
57
58 let userid: Userid = rpcenv.get_user().unwrap().parse()?;
59 let store = tools::required_string_param(&param, "store")?.to_owned();
60
61 let user_info = CachedUserInfo::new()?;
62 user_info.check_privs(&userid, &["datastore", &store], PRIV_DATASTORE_READ, false)?;
63
64 let datastore = DataStore::lookup_datastore(&store)?;
65
66 let backup_type = tools::required_string_param(&param, "backup-type")?;
67 let backup_id = tools::required_string_param(&param, "backup-id")?;
68 let backup_time = tools::required_integer_param(&param, "backup-time")?;
69
70 let protocols = parts
71 .headers
72 .get("UPGRADE")
73 .ok_or_else(|| format_err!("missing Upgrade header"))?
74 .to_str()?;
75
76 if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
77 bail!("invalid protocol name");
78 }
79
80 if parts.version >= http::version::Version::HTTP_2 {
81 bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
82 }
83
84 let env_type = rpcenv.env_type();
85
86 let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
87 let path = datastore.base_path();
88
89 //let files = BackupInfo::list_files(&path, &backup_dir)?;
90
91 let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp());
92
93 WorkerTask::spawn("reader", Some(worker_id), userid.clone(), true, move |worker| {
94 let mut env = ReaderEnvironment::new(
95 env_type,
96 userid,
97 worker.clone(),
98 datastore,
99 backup_dir,
100 );
101
102 env.debug = debug;
103
104 env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));
105
106 let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
107
108 let abort_future = worker.abort_future();
109
110 let req_fut = req_body
111 .on_upgrade()
112 .map_err(Error::from)
113 .and_then({
114 let env = env.clone();
115 move |conn| {
116 env.debug("protocol upgrade done");
117
118 let mut http = hyper::server::conn::Http::new();
119 http.http2_only(true);
120 // increase window size: todo - find optiomal size
121 let window_size = 32*1024*1024; // max = (1 << 31) - 2
122 http.http2_initial_stream_window_size(window_size);
123 http.http2_initial_connection_window_size(window_size);
124
125 http.serve_connection(conn, service)
126 .map_err(Error::from)
127 }
128 });
129 let abort_future = abort_future
130 .map(|_| Err(format_err!("task aborted")));
131
132 use futures::future::Either;
133 futures::future::select(req_fut, abort_future)
134 .map(|res| match res {
135 Either::Left((Ok(res), _)) => Ok(res),
136 Either::Left((Err(err), _)) => Err(err),
137 Either::Right((Ok(res), _)) => Ok(res),
138 Either::Right((Err(err), _)) => Err(err),
139 })
140 .map_ok(move |_| env.log("reader finished successfully"))
141 })?;
142
143 let response = Response::builder()
144 .status(StatusCode::SWITCHING_PROTOCOLS)
145 .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
146 .body(Body::empty())?;
147
148 Ok(response)
149 }.boxed()
150 }
151
152 pub const READER_API_ROUTER: Router = Router::new()
153 .subdirs(&[
154 (
155 "chunk", &Router::new()
156 .download(&API_METHOD_DOWNLOAD_CHUNK)
157 ),
158 (
159 "download", &Router::new()
160 .download(&API_METHOD_DOWNLOAD_FILE)
161 ),
162 (
163 "speedtest", &Router::new()
164 .download(&API_METHOD_SPEEDTEST)
165 ),
166 ]);
167
168 #[sortable]
169 pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
170 &ApiHandler::AsyncHttp(&download_file),
171 &ObjectSchema::new(
172 "Download specified file.",
173 &sorted!([
174 ("file-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
175 ]),
176 )
177 );
178
179 fn download_file(
180 _parts: Parts,
181 _req_body: Body,
182 param: Value,
183 _info: &ApiMethod,
184 rpcenv: Box<dyn RpcEnvironment>,
185 ) -> ApiResponseFuture {
186
187 async move {
188 let env: &ReaderEnvironment = rpcenv.as_ref();
189
190 let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
191
192 let mut path = env.datastore.base_path();
193 path.push(env.backup_dir.relative_path());
194 path.push(&file_name);
195
196 env.log(format!("download {:?}", path.clone()));
197
198 helpers::create_download_response(path).await
199 }.boxed()
200 }
201
202 #[sortable]
203 pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
204 &ApiHandler::AsyncHttp(&download_chunk),
205 &ObjectSchema::new(
206 "Download specified chunk.",
207 &sorted!([
208 ("digest", false, &CHUNK_DIGEST_SCHEMA),
209 ]),
210 )
211 );
212
213 fn download_chunk(
214 _parts: Parts,
215 _req_body: Body,
216 param: Value,
217 _info: &ApiMethod,
218 rpcenv: Box<dyn RpcEnvironment>,
219 ) -> ApiResponseFuture {
220
221 async move {
222 let env: &ReaderEnvironment = rpcenv.as_ref();
223
224 let digest_str = tools::required_string_param(&param, "digest")?;
225 let digest = proxmox::tools::hex_to_digest(digest_str)?;
226
227 let (path, _) = env.datastore.chunk_path(&digest);
228 let path2 = path.clone();
229
230 env.debug(format!("download chunk {:?}", path));
231
232 let data = tokio::fs::read(path)
233 .await
234 .map_err(move |err| http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err))?;
235
236 let body = Body::from(data);
237
238 // fixme: set other headers ?
239 Ok(Response::builder()
240 .status(StatusCode::OK)
241 .header(header::CONTENT_TYPE, "application/octet-stream")
242 .body(body)
243 .unwrap())
244 }.boxed()
245 }
246
247 /* this is too slow
248 fn download_chunk_old(
249 _parts: Parts,
250 _req_body: Body,
251 param: Value,
252 _info: &ApiMethod,
253 rpcenv: Box<dyn RpcEnvironment>,
254 ) -> Result<ApiResponseFuture, Error> {
255
256 let env: &ReaderEnvironment = rpcenv.as_ref();
257 let env2 = env.clone();
258
259 let digest_str = tools::required_string_param(&param, "digest")?;
260 let digest = proxmox::tools::hex_to_digest(digest_str)?;
261
262 let (path, _) = env.datastore.chunk_path(&digest);
263
264 let path2 = path.clone();
265 let path3 = path.clone();
266
267 let response_future = tokio::fs::File::open(path)
268 .map_err(move |err| http_err!(BAD_REQUEST, "open file {:?} failed: {}", path2, err))
269 .and_then(move |file| {
270 env2.debug(format!("download chunk {:?}", path3));
271 let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
272 .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
273
274 let body = Body::wrap_stream(payload);
275
276 // fixme: set other headers ?
277 futures::future::ok(Response::builder()
278 .status(StatusCode::OK)
279 .header(header::CONTENT_TYPE, "application/octet-stream")
280 .body(body)
281 .unwrap())
282 });
283
284 Ok(Box::new(response_future))
285 }
286 */
287
288 pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
289 &ApiHandler::AsyncHttp(&speedtest),
290 &ObjectSchema::new("Test 4M block download speed.", &[])
291 );
292
293 fn speedtest(
294 _parts: Parts,
295 _req_body: Body,
296 _param: Value,
297 _info: &ApiMethod,
298 _rpcenv: Box<dyn RpcEnvironment>,
299 ) -> ApiResponseFuture {
300
301 let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...]
302
303 let body = Body::from(buffer);
304
305 let response = Response::builder()
306 .status(StatusCode::OK)
307 .header(header::CONTENT_TYPE, "application/octet-stream")
308 .body(body)
309 .unwrap();
310
311 future::ok(response).boxed()
312 }