]>
Commit | Line | Data |
---|---|---|
cad540e9 | 1 | //use chrono::{Local, TimeZone}; |
f7d4e4b5 | 2 | use anyhow::{bail, format_err, Error}; |
dd066d28 DM |
3 | use futures::*; |
4 | use hyper::header::{self, HeaderValue, UPGRADE}; | |
dd066d28 | 5 | use hyper::http::request::Parts; |
cad540e9 | 6 | use hyper::{Body, Response, StatusCode}; |
dd066d28 DM |
7 | use serde_json::Value; |
8 | ||
552c2259 | 9 | use proxmox::{sortable, identity}; |
73ce1d11 | 10 | use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironment, Permission}; |
cad540e9 | 11 | use proxmox::api::schema::*; |
9ea4bce4 | 12 | use proxmox::http_err; |
552c2259 | 13 | |
09d7dc50 | 14 | use crate::api2::types::*; |
cad540e9 WB |
15 | use crate::backup::*; |
16 | use crate::server::{WorkerTask, H2Service}; | |
17 | use crate::tools; | |
d00e1a21 | 18 | use crate::config::acl::PRIV_DATASTORE_READ; |
365f0f72 | 19 | use crate::config::cached_user_info::CachedUserInfo; |
dd066d28 DM |
20 | |
21 | mod environment; | |
22 | use environment::*; | |
23 | ||
255f378a DM |
24 | pub const ROUTER: Router = Router::new() |
25 | .upgrade(&API_METHOD_UPGRADE_BACKUP); | |
26 | ||
552c2259 | 27 | #[sortable] |
255f378a | 28 | pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new( |
329d40b5 | 29 | &ApiHandler::AsyncHttp(&upgrade_to_backup_reader_protocol), |
255f378a DM |
30 | &ObjectSchema::new( |
31 | concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."), | |
552c2259 | 32 | &sorted!([ |
66c49c21 | 33 | ("store", false, &DATASTORE_SCHEMA), |
255f378a DM |
34 | ("backup-type", false, &StringSchema::new("Backup type.") |
35 | .format(&ApiStringFormat::Enum(&["vm", "ct", "host"])) | |
36 | .schema() | |
37 | ), | |
38 | ("backup-id", false, &StringSchema::new("Backup ID.").schema()), | |
39 | ("backup-time", false, &IntegerSchema::new("Backup time (Unix epoch.)") | |
40 | .minimum(1_547_797_308) | |
41 | .schema() | |
42 | ), | |
43 | ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()), | |
552c2259 | 44 | ]), |
dd066d28 | 45 | ) |
365f0f72 DM |
46 | ).access( |
47 | // Note: parameter 'store' is no uri parameter, so we need to test inside function body | |
d00e1a21 | 48 | Some("The user needs Datastore.Read privilege on /datastore/{store}."), |
365f0f72 DM |
49 | &Permission::Anybody |
50 | ); | |
dd066d28 DM |
51 | |
52 | fn upgrade_to_backup_reader_protocol( | |
53 | parts: Parts, | |
54 | req_body: Body, | |
55 | param: Value, | |
255f378a | 56 | _info: &ApiMethod, |
dd066d28 | 57 | rpcenv: Box<dyn RpcEnvironment>, |
bb084b9c | 58 | ) -> ApiResponseFuture { |
dd066d28 | 59 | |
ad51d02a DM |
60 | async move { |
61 | let debug = param["debug"].as_bool().unwrap_or(false); | |
dd066d28 | 62 | |
365f0f72 | 63 | let username = rpcenv.get_user().unwrap(); |
ad51d02a | 64 | let store = tools::required_string_param(¶m, "store")?.to_owned(); |
365f0f72 DM |
65 | |
66 | let user_info = CachedUserInfo::new()?; | |
d00e1a21 | 67 | user_info.check_privs(&username, &["datastore", &store], PRIV_DATASTORE_READ, false)?; |
365f0f72 | 68 | |
ad51d02a | 69 | let datastore = DataStore::lookup_datastore(&store)?; |
dd066d28 | 70 | |
ad51d02a DM |
71 | let backup_type = tools::required_string_param(¶m, "backup-type")?; |
72 | let backup_id = tools::required_string_param(¶m, "backup-id")?; | |
73 | let backup_time = tools::required_integer_param(¶m, "backup-time")?; | |
dd066d28 | 74 | |
ad51d02a DM |
75 | let protocols = parts |
76 | .headers | |
77 | .get("UPGRADE") | |
78 | .ok_or_else(|| format_err!("missing Upgrade header"))? | |
dd066d28 DM |
79 | .to_str()?; |
80 | ||
ad51d02a DM |
81 | if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() { |
82 | bail!("invalid protocol name"); | |
83 | } | |
dd066d28 | 84 | |
ad51d02a DM |
85 | if parts.version >= http::version::Version::HTTP_2 { |
86 | bail!("unexpected http version '{:?}' (expected version < 2)", parts.version); | |
87 | } | |
dd066d28 | 88 | |
ad51d02a | 89 | let env_type = rpcenv.env_type(); |
dd066d28 | 90 | |
ad51d02a DM |
91 | let backup_dir = BackupDir::new(backup_type, backup_id, backup_time); |
92 | let path = datastore.base_path(); | |
dd066d28 | 93 | |
ad51d02a | 94 | //let files = BackupInfo::list_files(&path, &backup_dir)?; |
dd066d28 | 95 | |
ad51d02a | 96 | let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp()); |
dd066d28 | 97 | |
ad51d02a DM |
98 | WorkerTask::spawn("reader", Some(worker_id), &username.clone(), true, move |worker| { |
99 | let mut env = ReaderEnvironment::new( | |
100 | env_type, username.clone(), worker.clone(), datastore, backup_dir); | |
dd066d28 | 101 | |
ad51d02a | 102 | env.debug = debug; |
dd066d28 | 103 | |
ad51d02a | 104 | env.log(format!("starting new backup reader datastore '{}': {:?}", store, path)); |
dd066d28 | 105 | |
ad51d02a | 106 | let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug); |
dd066d28 | 107 | |
ad51d02a | 108 | let abort_future = worker.abort_future(); |
dd066d28 | 109 | |
ad51d02a DM |
110 | let req_fut = req_body |
111 | .on_upgrade() | |
112 | .map_err(Error::from) | |
113 | .and_then({ | |
114 | let env = env.clone(); | |
115 | move |conn| { | |
116 | env.debug("protocol upgrade done"); | |
ffb64344 | 117 | |
ad51d02a DM |
118 | let mut http = hyper::server::conn::Http::new(); |
119 | http.http2_only(true); | |
120 | // increase window size: todo - find optiomal size | |
121 | let window_size = 32*1024*1024; // max = (1 << 31) - 2 | |
122 | http.http2_initial_stream_window_size(window_size); | |
123 | http.http2_initial_connection_window_size(window_size); | |
ffb64344 | 124 | |
ad51d02a DM |
125 | http.serve_connection(conn, service) |
126 | .map_err(Error::from) | |
127 | } | |
128 | }); | |
129 | let abort_future = abort_future | |
130 | .map(|_| Err(format_err!("task aborted"))); | |
ffb64344 | 131 | |
ad51d02a DM |
132 | use futures::future::Either; |
133 | futures::future::select(req_fut, abort_future) | |
134 | .map(|res| match res { | |
135 | Either::Left((Ok(res), _)) => Ok(res), | |
136 | Either::Left((Err(err), _)) => Err(err), | |
137 | Either::Right((Ok(res), _)) => Ok(res), | |
138 | Either::Right((Err(err), _)) => Err(err), | |
139 | }) | |
140 | .map_ok(move |_| env.log("reader finished sucessfully")) | |
141 | })?; | |
dd066d28 | 142 | |
ad51d02a DM |
143 | let response = Response::builder() |
144 | .status(StatusCode::SWITCHING_PROTOCOLS) | |
145 | .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())) | |
146 | .body(Body::empty())?; | |
dd066d28 | 147 | |
ad51d02a DM |
148 | Ok(response) |
149 | }.boxed() | |
dd066d28 DM |
150 | } |
151 | ||
255f378a DM |
152 | pub const READER_API_ROUTER: Router = Router::new() |
153 | .subdirs(&[ | |
154 | ( | |
155 | "chunk", &Router::new() | |
156 | .download(&API_METHOD_DOWNLOAD_CHUNK) | |
157 | ), | |
158 | ( | |
159 | "download", &Router::new() | |
160 | .download(&API_METHOD_DOWNLOAD_FILE) | |
161 | ), | |
162 | ( | |
163 | "speedtest", &Router::new() | |
164 | .download(&API_METHOD_SPEEDTEST) | |
165 | ), | |
166 | ]); | |
167 | ||
552c2259 | 168 | #[sortable] |
255f378a | 169 | pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new( |
329d40b5 | 170 | &ApiHandler::AsyncHttp(&download_file), |
255f378a DM |
171 | &ObjectSchema::new( |
172 | "Download specified file.", | |
552c2259 DM |
173 | &sorted!([ |
174 | ("file-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA), | |
175 | ]), | |
dd066d28 | 176 | ) |
255f378a | 177 | ); |
dd066d28 DM |
178 | |
179 | fn download_file( | |
180 | _parts: Parts, | |
181 | _req_body: Body, | |
182 | param: Value, | |
255f378a | 183 | _info: &ApiMethod, |
dd066d28 | 184 | rpcenv: Box<dyn RpcEnvironment>, |
bb084b9c | 185 | ) -> ApiResponseFuture { |
dd066d28 | 186 | |
ad51d02a DM |
187 | async move { |
188 | let env: &ReaderEnvironment = rpcenv.as_ref(); | |
dd066d28 | 189 | |
ad51d02a | 190 | let file_name = tools::required_string_param(¶m, "file-name")?.to_owned(); |
dd066d28 | 191 | |
ad51d02a DM |
192 | let mut path = env.datastore.base_path(); |
193 | path.push(env.backup_dir.relative_path()); | |
194 | path.push(&file_name); | |
dd066d28 | 195 | |
ad51d02a DM |
196 | let path2 = path.clone(); |
197 | let path3 = path.clone(); | |
dd066d28 | 198 | |
ad51d02a DM |
199 | let file = tokio::fs::File::open(path) |
200 | .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err))) | |
201 | .await?; | |
fcfb84fe | 202 | |
ad51d02a | 203 | env.log(format!("download {:?}", path3)); |
dd066d28 | 204 | |
db0cb9ce WB |
205 | let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new()) |
206 | .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze())); | |
dd066d28 | 207 | |
ad51d02a DM |
208 | let body = Body::wrap_stream(payload); |
209 | ||
210 | // fixme: set other headers ? | |
211 | Ok(Response::builder() | |
212 | .status(StatusCode::OK) | |
213 | .header(header::CONTENT_TYPE, "application/octet-stream") | |
214 | .body(body) | |
215 | .unwrap()) | |
216 | }.boxed() | |
dd066d28 | 217 | } |
09d7dc50 | 218 | |
552c2259 | 219 | #[sortable] |
255f378a | 220 | pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new( |
329d40b5 | 221 | &ApiHandler::AsyncHttp(&download_chunk), |
255f378a DM |
222 | &ObjectSchema::new( |
223 | "Download specified chunk.", | |
552c2259 DM |
224 | &sorted!([ |
225 | ("digest", false, &CHUNK_DIGEST_SCHEMA), | |
226 | ]), | |
09d7dc50 | 227 | ) |
255f378a | 228 | ); |
09d7dc50 DM |
229 | |
230 | fn download_chunk( | |
231 | _parts: Parts, | |
232 | _req_body: Body, | |
233 | param: Value, | |
255f378a | 234 | _info: &ApiMethod, |
09d7dc50 | 235 | rpcenv: Box<dyn RpcEnvironment>, |
bb084b9c | 236 | ) -> ApiResponseFuture { |
09d7dc50 | 237 | |
ad51d02a DM |
238 | async move { |
239 | let env: &ReaderEnvironment = rpcenv.as_ref(); | |
c0b1b14c | 240 | |
ad51d02a DM |
241 | let digest_str = tools::required_string_param(¶m, "digest")?; |
242 | let digest = proxmox::tools::hex_to_digest(digest_str)?; | |
c0b1b14c | 243 | |
ad51d02a DM |
244 | let (path, _) = env.datastore.chunk_path(&digest); |
245 | let path2 = path.clone(); | |
c0b1b14c | 246 | |
ad51d02a | 247 | env.debug(format!("download chunk {:?}", path)); |
c0b1b14c | 248 | |
ad51d02a DM |
249 | let data = tokio::fs::read(path) |
250 | .map_err(move |err| http_err!(BAD_REQUEST, format!("reading file {:?} failed: {}", path2, err))) | |
251 | .await?; | |
f7aa6f15 | 252 | |
ad51d02a | 253 | let body = Body::from(data); |
f7aa6f15 | 254 | |
ad51d02a DM |
255 | // fixme: set other headers ? |
256 | Ok(Response::builder() | |
257 | .status(StatusCode::OK) | |
258 | .header(header::CONTENT_TYPE, "application/octet-stream") | |
259 | .body(body) | |
260 | .unwrap()) | |
261 | }.boxed() | |
c0b1b14c DM |
262 | } |
263 | ||
264 | /* this is too slow | |
265 | fn download_chunk_old( | |
266 | _parts: Parts, | |
267 | _req_body: Body, | |
268 | param: Value, | |
255f378a | 269 | _info: &ApiMethod, |
c0b1b14c | 270 | rpcenv: Box<dyn RpcEnvironment>, |
bb084b9c | 271 | ) -> Result<ApiResponseFuture, Error> { |
c0b1b14c | 272 | |
09d7dc50 DM |
273 | let env: &ReaderEnvironment = rpcenv.as_ref(); |
274 | let env2 = env.clone(); | |
275 | ||
276 | let digest_str = tools::required_string_param(¶m, "digest")?; | |
277 | let digest = proxmox::tools::hex_to_digest(digest_str)?; | |
278 | ||
279 | let (path, _) = env.datastore.chunk_path(&digest); | |
280 | ||
281 | let path2 = path.clone(); | |
282 | let path3 = path.clone(); | |
283 | ||
284 | let response_future = tokio::fs::File::open(path) | |
285 | .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err))) | |
286 | .and_then(move |file| { | |
287 | env2.debug(format!("download chunk {:?}", path3)); | |
db0cb9ce WB |
288 | let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new()) |
289 | .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze())); | |
09d7dc50 DM |
290 | |
291 | let body = Body::wrap_stream(payload); | |
292 | ||
293 | // fixme: set other headers ? | |
ffb64344 | 294 | futures::future::ok(Response::builder() |
09d7dc50 DM |
295 | .status(StatusCode::OK) |
296 | .header(header::CONTENT_TYPE, "application/octet-stream") | |
297 | .body(body) | |
298 | .unwrap()) | |
299 | }); | |
300 | ||
301 | Ok(Box::new(response_future)) | |
302 | } | |
c0b1b14c | 303 | */ |
09d7dc50 | 304 | |
255f378a | 305 | pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new( |
329d40b5 | 306 | &ApiHandler::AsyncHttp(&speedtest), |
255f378a DM |
307 | &ObjectSchema::new("Test 4M block download speed.", &[]) |
308 | ); | |
09d7dc50 DM |
309 | |
310 | fn speedtest( | |
311 | _parts: Parts, | |
312 | _req_body: Body, | |
313 | _param: Value, | |
255f378a | 314 | _info: &ApiMethod, |
09d7dc50 | 315 | _rpcenv: Box<dyn RpcEnvironment>, |
bb084b9c | 316 | ) -> ApiResponseFuture { |
09d7dc50 | 317 | |
17243003 | 318 | let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...] |
09d7dc50 DM |
319 | |
320 | let body = Body::from(buffer); | |
321 | ||
322 | let response = Response::builder() | |
323 | .status(StatusCode::OK) | |
324 | .header(header::CONTENT_TYPE, "application/octet-stream") | |
325 | .body(body) | |
326 | .unwrap(); | |
327 | ||
ad51d02a | 328 | future::ok(response).boxed() |
09d7dc50 | 329 | } |