]>
Commit | Line | Data |
---|---|---|
dd066d28 DM |
1 | use failure::*; |
2 | use lazy_static::lazy_static; | |
3 | ||
4 | use std::sync::Arc; | |
5 | ||
6 | use futures::*; | |
7 | use hyper::header::{self, HeaderValue, UPGRADE}; | |
8 | use hyper::{Body, Response, StatusCode}; | |
9 | use hyper::http::request::Parts; | |
10 | //use chrono::{Local, TimeZone}; | |
11 | ||
12 | use serde_json::Value; | |
13 | ||
14 | use crate::tools; | |
15 | use crate::api_schema::router::*; | |
16 | use crate::api_schema::*; | |
17 | use crate::server::{WorkerTask, H2Service}; | |
18 | use crate::backup::*; | |
09d7dc50 | 19 | use crate::api2::types::*; |
dd066d28 DM |
20 | |
21 | mod environment; | |
22 | use environment::*; | |
23 | ||
24 | pub fn router() -> Router { | |
25 | Router::new() | |
26 | .upgrade(api_method_upgrade_backup()) | |
27 | } | |
28 | ||
29 | pub fn api_method_upgrade_backup() -> ApiAsyncMethod { | |
30 | ApiAsyncMethod::new( | |
31 | upgrade_to_backup_reader_protocol, | |
32 | ObjectSchema::new(concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "').")) | |
33 | .required("store", StringSchema::new("Datastore name.")) | |
34 | .required("backup-type", StringSchema::new("Backup type.") | |
35 | .format(Arc::new(ApiStringFormat::Enum(&["vm", "ct", "host"])))) | |
36 | .required("backup-id", StringSchema::new("Backup ID.")) | |
37 | .required("backup-time", IntegerSchema::new("Backup time (Unix epoch.)") | |
38 | .minimum(1547797308)) | |
39 | .optional("debug", BooleanSchema::new("Enable verbose debug logging.")) | |
40 | ) | |
41 | } | |
42 | ||
43 | fn upgrade_to_backup_reader_protocol( | |
44 | parts: Parts, | |
45 | req_body: Body, | |
46 | param: Value, | |
47 | _info: &ApiAsyncMethod, | |
48 | rpcenv: Box<dyn RpcEnvironment>, | |
49 | ) -> Result<BoxFut, Error> { | |
50 | ||
51 | let debug = param["debug"].as_bool().unwrap_or(false); | |
52 | ||
53 | let store = tools::required_string_param(¶m, "store")?.to_owned(); | |
54 | let datastore = DataStore::lookup_datastore(&store)?; | |
55 | ||
56 | let backup_type = tools::required_string_param(¶m, "backup-type")?; | |
57 | let backup_id = tools::required_string_param(¶m, "backup-id")?; | |
58 | let backup_time = tools::required_integer_param(¶m, "backup-time")?; | |
59 | ||
60 | let protocols = parts | |
61 | .headers | |
62 | .get("UPGRADE") | |
63 | .ok_or_else(|| format_err!("missing Upgrade header"))? | |
64 | .to_str()?; | |
65 | ||
66 | if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() { | |
67 | bail!("invalid protocol name"); | |
68 | } | |
69 | ||
70 | if parts.version >= http::version::Version::HTTP_2 { | |
71 | bail!("unexpected http version '{:?}' (expected version < 2)", parts.version); | |
72 | } | |
73 | ||
74 | let username = rpcenv.get_user().unwrap(); | |
75 | let env_type = rpcenv.env_type(); | |
76 | ||
77 | let backup_dir = BackupDir::new(backup_type, backup_id, backup_time); | |
78 | let path = datastore.base_path(); | |
79 | ||
80 | //let files = BackupInfo::list_files(&path, &backup_dir)?; | |
81 | ||
82 | let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp()); | |
83 | ||
84 | WorkerTask::spawn("reader", Some(worker_id), &username.clone(), true, move |worker| { | |
85 | let mut env = ReaderEnvironment::new( | |
86 | env_type, username.clone(), worker.clone(), datastore, backup_dir); | |
87 | ||
88 | env.debug = debug; | |
89 | ||
90 | env.log(format!("starting new backup reader datastore '{}': {:?}", store, path)); | |
91 | ||
92 | let service = H2Service::new(env.clone(), worker.clone(), &READER_ROUTER, debug); | |
93 | ||
94 | let abort_future = worker.abort_future(); | |
95 | ||
96 | let env3 = env.clone(); | |
97 | ||
98 | req_body | |
99 | .on_upgrade() | |
100 | .map_err(Error::from) | |
101 | .and_then(move |conn| { | |
102 | env3.debug("protocol upgrade done"); | |
103 | ||
104 | let mut http = hyper::server::conn::Http::new(); | |
105 | http.http2_only(true); | |
106 | // increase window size: todo - find optiomal size | |
107 | let window_size = 32*1024*1024; // max = (1 << 31) - 2 | |
108 | http.http2_initial_stream_window_size(window_size); | |
109 | http.http2_initial_connection_window_size(window_size); | |
110 | ||
111 | http.serve_connection(conn, service) | |
112 | .map_err(Error::from) | |
113 | }) | |
114 | .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); })) | |
115 | .map_err(|(err, _)| err) | |
116 | .and_then(move |(_result, _)| { | |
117 | env.log("reader finished sucessfully"); | |
118 | Ok(()) | |
119 | }) | |
120 | })?; | |
121 | ||
122 | let response = Response::builder() | |
123 | .status(StatusCode::SWITCHING_PROTOCOLS) | |
124 | .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())) | |
125 | .body(Body::empty())?; | |
126 | ||
127 | Ok(Box::new(futures::future::ok(response))) | |
128 | } | |
129 | ||
lazy_static!{
    // Router served over the upgraded HTTP/2 connection; built once on
    // first access.
    static ref READER_ROUTER: Router = reader_api();
}
133 | ||
134 | pub fn reader_api() -> Router { | |
135 | ||
136 | let router = Router::new() | |
09d7dc50 DM |
137 | .subdir( |
138 | "chunk", Router::new() | |
139 | .download(api_method_download_chunk()) | |
140 | ) | |
dd066d28 DM |
141 | .subdir( |
142 | "download", Router::new() | |
143 | .download(api_method_download_file()) | |
09d7dc50 DM |
144 | ) |
145 | .subdir( | |
146 | "speedtest", Router::new() | |
147 | .download(api_method_speedtest()) | |
dd066d28 DM |
148 | ); |
149 | ||
150 | router | |
151 | } | |
152 | ||
153 | pub fn api_method_download_file() -> ApiAsyncMethod { | |
154 | ApiAsyncMethod::new( | |
155 | download_file, | |
156 | ObjectSchema::new("Download specified file.") | |
157 | .required("file-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone()) | |
158 | ) | |
159 | } | |
160 | ||
161 | fn download_file( | |
162 | _parts: Parts, | |
163 | _req_body: Body, | |
164 | param: Value, | |
165 | _info: &ApiAsyncMethod, | |
166 | rpcenv: Box<dyn RpcEnvironment>, | |
167 | ) -> Result<BoxFut, Error> { | |
168 | ||
169 | let env: &ReaderEnvironment = rpcenv.as_ref(); | |
170 | let env2 = env.clone(); | |
171 | ||
172 | let file_name = tools::required_string_param(¶m, "file-name")?.to_owned(); | |
173 | ||
174 | let mut path = env.datastore.base_path(); | |
175 | path.push(env.backup_dir.relative_path()); | |
176 | path.push(&file_name); | |
177 | ||
178 | let path2 = path.clone(); | |
179 | let path3 = path.clone(); | |
180 | ||
181 | let response_future = tokio::fs::File::open(path) | |
182 | .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err))) | |
183 | .and_then(move |file| { | |
184 | env2.log(format!("download {:?}", path3)); | |
185 | let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()). | |
fcfb84fe DM |
186 | map(|bytes| hyper::Chunk::from(bytes.freeze())); |
187 | ||
dd066d28 DM |
188 | let body = Body::wrap_stream(payload); |
189 | ||
190 | // fixme: set other headers ? | |
191 | Ok(Response::builder() | |
192 | .status(StatusCode::OK) | |
193 | .header(header::CONTENT_TYPE, "application/octet-stream") | |
194 | .body(body) | |
195 | .unwrap()) | |
196 | }); | |
197 | ||
198 | Ok(Box::new(response_future)) | |
199 | } | |
09d7dc50 DM |
200 | |
201 | pub fn api_method_download_chunk() -> ApiAsyncMethod { | |
202 | ApiAsyncMethod::new( | |
203 | download_chunk, | |
204 | ObjectSchema::new("Download specified chunk.") | |
205 | .required("digest", CHUNK_DIGEST_SCHEMA.clone()) | |
206 | ) | |
207 | } | |
208 | ||
209 | fn download_chunk( | |
210 | _parts: Parts, | |
211 | _req_body: Body, | |
212 | param: Value, | |
213 | _info: &ApiAsyncMethod, | |
214 | rpcenv: Box<dyn RpcEnvironment>, | |
215 | ) -> Result<BoxFut, Error> { | |
216 | ||
217 | let env: &ReaderEnvironment = rpcenv.as_ref(); | |
218 | let env2 = env.clone(); | |
219 | ||
220 | let digest_str = tools::required_string_param(¶m, "digest")?; | |
221 | let digest = proxmox::tools::hex_to_digest(digest_str)?; | |
222 | ||
223 | let (path, _) = env.datastore.chunk_path(&digest); | |
224 | ||
225 | let path2 = path.clone(); | |
226 | let path3 = path.clone(); | |
227 | ||
228 | let response_future = tokio::fs::File::open(path) | |
229 | .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err))) | |
230 | .and_then(move |file| { | |
231 | env2.debug(format!("download chunk {:?}", path3)); | |
232 | let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()). | |
233 | map(|bytes| hyper::Chunk::from(bytes.freeze())); | |
234 | ||
235 | let body = Body::wrap_stream(payload); | |
236 | ||
237 | // fixme: set other headers ? | |
238 | Ok(Response::builder() | |
239 | .status(StatusCode::OK) | |
240 | .header(header::CONTENT_TYPE, "application/octet-stream") | |
241 | .body(body) | |
242 | .unwrap()) | |
243 | }); | |
244 | ||
245 | Ok(Box::new(response_future)) | |
246 | } | |
247 | ||
248 | pub fn api_method_speedtest() -> ApiAsyncMethod { | |
249 | ApiAsyncMethod::new( | |
250 | speedtest, | |
251 | ObjectSchema::new("Test 4M block download speed.") | |
252 | ) | |
253 | } | |
254 | ||
255 | fn speedtest( | |
256 | _parts: Parts, | |
257 | _req_body: Body, | |
258 | _param: Value, | |
259 | _info: &ApiAsyncMethod, | |
260 | _rpcenv: Box<dyn RpcEnvironment>, | |
261 | ) -> Result<BoxFut, Error> { | |
262 | ||
17243003 | 263 | let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...] |
09d7dc50 DM |
264 | |
265 | let body = Body::from(buffer); | |
266 | ||
267 | let response = Response::builder() | |
268 | .status(StatusCode::OK) | |
269 | .header(header::CONTENT_TYPE, "application/octet-stream") | |
270 | .body(body) | |
271 | .unwrap(); | |
272 | ||
273 | Ok(Box::new(future::ok(response))) | |
274 | } |