]>
Commit | Line | Data |
---|---|---|
de8ec041 DM |
1 | use failure::*; |
2 | use serde_json::json; | |
3 | use std::convert::TryFrom; | |
4 | use std::sync::Arc; | |
5 | use std::collections::HashMap; | |
6 | use std::io::{Seek, SeekFrom}; | |
7 | use chrono::{Utc, TimeZone}; | |
8 | ||
9 | use proxmox::api::api; | |
10 | use proxmox::api::{ApiMethod, Router, RpcEnvironment}; | |
11 | ||
12 | use crate::server::{WorkerTask}; | |
13 | use crate::backup::*; | |
14 | use crate::client::*; | |
15 | use crate::api2::types::*; | |
16 | ||
17 | // fixme: implement filters | |
18 | // Todo: correctly lock backup groups | |
19 | ||
20 | async fn sync_index_chunks<I: IndexFile>( | |
21 | _worker: &WorkerTask, | |
22 | chunk_reader: &mut RemoteChunkReader, | |
23 | target: Arc<DataStore>, | |
24 | index: I, | |
25 | ) -> Result<(), Error> { | |
26 | ||
27 | ||
28 | for pos in 0..index.index_count() { | |
29 | let digest = index.index_digest(pos).unwrap(); | |
30 | let chunk_exists = target.cond_touch_chunk(digest, false)?; | |
31 | if chunk_exists { | |
32 | //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest))); | |
33 | continue; | |
34 | } | |
35 | //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest))); | |
36 | let chunk = chunk_reader.read_raw_chunk(&digest)?; | |
37 | ||
38 | target.insert_chunk(&chunk, &digest)?; | |
39 | } | |
40 | ||
41 | Ok(()) | |
42 | } | |
43 | ||
44 | async fn download_manifest( | |
45 | reader: &BackupReader, | |
46 | filename: &std::path::Path, | |
47 | ) -> Result<std::fs::File, Error> { | |
48 | ||
49 | let tmp_manifest_file = std::fs::OpenOptions::new() | |
50 | .write(true) | |
51 | .create(true) | |
52 | .read(true) | |
53 | .open(&filename)?; | |
54 | ||
55 | let mut tmp_manifest_file = reader.download(MANIFEST_BLOB_NAME, tmp_manifest_file).await?; | |
56 | ||
57 | tmp_manifest_file.seek(SeekFrom::Start(0))?; | |
58 | ||
59 | Ok(tmp_manifest_file) | |
60 | } | |
61 | ||
62 | async fn sync_single_archive( | |
63 | worker: &WorkerTask, | |
64 | reader: &BackupReader, | |
65 | chunk_reader: &mut RemoteChunkReader, | |
66 | tgt_store: Arc<DataStore>, | |
67 | snapshot: &BackupDir, | |
68 | archive_name: &str, | |
69 | ) -> Result<(), Error> { | |
70 | ||
71 | let mut path = tgt_store.base_path(); | |
72 | path.push(snapshot.relative_path()); | |
73 | path.push(archive_name); | |
74 | ||
75 | let mut tmp_path = path.clone(); | |
76 | tmp_path.set_extension("tmp"); | |
77 | ||
78 | worker.log(format!("sync archive {}", archive_name)); | |
79 | let tmpfile = std::fs::OpenOptions::new() | |
80 | .write(true) | |
81 | .create(true) | |
82 | .read(true) | |
83 | .open(&tmp_path)?; | |
84 | ||
85 | let tmpfile = reader.download(archive_name, tmpfile).await?; | |
86 | ||
87 | match archive_type(archive_name)? { | |
88 | ArchiveType::DynamicIndex => { | |
89 | let index = DynamicIndexReader::new(tmpfile) | |
90 | .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?; | |
91 | ||
92 | sync_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?; | |
93 | } | |
94 | ArchiveType::FixedIndex => { | |
95 | let index = FixedIndexReader::new(tmpfile) | |
96 | .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?; | |
97 | ||
98 | sync_index_chunks(worker, chunk_reader, tgt_store.clone(), index).await?; | |
99 | } | |
100 | ArchiveType::Blob => { /* nothing to do */ } | |
101 | } | |
102 | if let Err(err) = std::fs::rename(&tmp_path, &path) { | |
103 | bail!("Atomic rename file {:?} failed - {}", path, err); | |
104 | } | |
105 | Ok(()) | |
106 | } | |
107 | ||
108 | async fn sync_snapshot( | |
109 | worker: &WorkerTask, | |
110 | reader: Arc<BackupReader>, | |
111 | tgt_store: Arc<DataStore>, | |
112 | snapshot: &BackupDir, | |
113 | ) -> Result<(), Error> { | |
114 | ||
115 | let mut manifest_name = tgt_store.base_path(); | |
116 | manifest_name.push(snapshot.relative_path()); | |
117 | manifest_name.push(MANIFEST_BLOB_NAME); | |
118 | ||
119 | let mut tmp_manifest_name = manifest_name.clone(); | |
120 | tmp_manifest_name.set_extension("tmp"); | |
121 | ||
122 | let mut tmp_manifest_file = download_manifest(&reader, &tmp_manifest_name).await?; | |
123 | let tmp_manifest_blob = DataBlob::load(&mut tmp_manifest_file)?; | |
124 | tmp_manifest_blob.verify_crc()?; | |
125 | ||
126 | if manifest_name.exists() { | |
127 | let manifest_blob = proxmox::tools::try_block!({ | |
128 | let mut manifest_file = std::fs::File::open(&manifest_name) | |
129 | .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?; | |
130 | ||
131 | let manifest_blob = DataBlob::load(&mut manifest_file)?; | |
132 | manifest_blob.verify_crc()?; | |
133 | Ok(manifest_blob) | |
134 | }).map_err(|err: Error| { | |
135 | format_err!("unable to read local manifest {:?} - {}", manifest_name, err) | |
136 | })?; | |
137 | ||
138 | if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { | |
139 | return Ok(()); // nothing changed | |
140 | } | |
141 | } | |
142 | ||
143 | let manifest = BackupManifest::try_from(tmp_manifest_blob)?; | |
144 | ||
145 | let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, HashMap::new()); | |
146 | ||
147 | for item in manifest.files() { | |
148 | let mut path = tgt_store.base_path(); | |
149 | path.push(snapshot.relative_path()); | |
150 | path.push(&item.filename); | |
151 | ||
152 | if path.exists() { | |
153 | match archive_type(&item.filename)? { | |
154 | ArchiveType::DynamicIndex => { | |
155 | let index = DynamicIndexReader::open(&path)?; | |
156 | let (csum, size) = index.compute_csum(); | |
157 | match manifest.verify_file(&item.filename, &csum, size) { | |
158 | Ok(_) => continue, | |
159 | Err(err) => { | |
160 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
161 | } | |
162 | } | |
163 | } | |
164 | ArchiveType::FixedIndex => { | |
165 | let index = FixedIndexReader::open(&path)?; | |
166 | let (csum, size) = index.compute_csum(); | |
167 | match manifest.verify_file(&item.filename, &csum, size) { | |
168 | Ok(_) => continue, | |
169 | Err(err) => { | |
170 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
171 | } | |
172 | } | |
173 | } | |
174 | ArchiveType::Blob => { | |
175 | let mut tmpfile = std::fs::File::open(&path)?; | |
176 | let (csum, size) = compute_file_csum(&mut tmpfile)?; | |
177 | match manifest.verify_file(&item.filename, &csum, size) { | |
178 | Ok(_) => continue, | |
179 | Err(err) => { | |
180 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
181 | } | |
182 | } | |
183 | } | |
184 | } | |
185 | } | |
186 | ||
187 | sync_single_archive( | |
188 | worker, | |
189 | &reader, | |
190 | &mut chunk_reader, | |
191 | tgt_store.clone(), | |
192 | snapshot, | |
193 | &item.filename, | |
194 | ).await?; | |
195 | } | |
196 | ||
197 | if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) { | |
198 | bail!("Atomic rename file {:?} failed - {}", manifest_name, err); | |
199 | } | |
200 | ||
201 | // cleanup - remove stale files | |
202 | tgt_store.cleanup_backup_dir(snapshot, &manifest)?; | |
203 | ||
204 | Ok(()) | |
205 | } | |
206 | ||
207 | pub async fn sync_snapshot_from( | |
208 | worker: &WorkerTask, | |
209 | reader: Arc<BackupReader>, | |
210 | tgt_store: Arc<DataStore>, | |
211 | snapshot: &BackupDir, | |
212 | ) -> Result<(), Error> { | |
213 | ||
214 | let (_path, is_new) = tgt_store.create_backup_dir(&snapshot)?; | |
215 | ||
216 | if is_new { | |
217 | worker.log(format!("sync snapshot {:?}", snapshot.relative_path())); | |
218 | ||
219 | if let Err(err) = sync_snapshot(worker, reader, tgt_store.clone(), &snapshot).await { | |
220 | if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot) { | |
221 | worker.log(format!("cleanup error - {}", cleanup_err)); | |
222 | } | |
223 | return Err(err); | |
224 | } | |
225 | } else { | |
226 | worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path())); | |
227 | sync_snapshot(worker, reader, tgt_store.clone(), &snapshot).await? | |
228 | } | |
229 | ||
230 | Ok(()) | |
231 | } | |
232 | ||
233 | pub async fn sync_group( | |
234 | worker: &WorkerTask, | |
235 | client: &HttpClient, | |
236 | src_repo: &BackupRepository, | |
237 | tgt_store: Arc<DataStore>, | |
238 | group: &BackupGroup, | |
239 | ) -> Result<(), Error> { | |
240 | ||
241 | let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store()); | |
242 | ||
243 | let args = json!({ | |
244 | "backup-type": group.backup_type(), | |
245 | "backup-id": group.backup_id(), | |
246 | }); | |
247 | ||
248 | let mut result = client.get(&path, Some(args)).await?; | |
249 | let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?; | |
250 | ||
251 | list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time)); | |
252 | ||
253 | let auth_info = client.login().await?; | |
254 | ||
255 | let last_sync = group.last_successful_backup(&tgt_store.base_path())?; | |
256 | ||
257 | for item in list { | |
258 | let backup_time = Utc.timestamp(item.backup_time, 0); | |
259 | if let Some(last_sync_time) = last_sync { | |
260 | if last_sync_time > backup_time { continue; } | |
261 | } | |
262 | ||
263 | let new_client = HttpClient::new( | |
264 | src_repo.host(), | |
265 | src_repo.user(), | |
266 | Some(auth_info.ticket.clone()) | |
267 | )?; | |
268 | ||
269 | let reader = BackupReader::start( | |
270 | new_client, | |
271 | None, | |
272 | src_repo.store(), | |
273 | &item.backup_type, | |
274 | &item.backup_id, | |
275 | backup_time, | |
276 | true, | |
277 | ).await?; | |
278 | ||
279 | let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time); | |
280 | ||
281 | sync_snapshot_from(worker, reader, tgt_store.clone(), &snapshot).await?; | |
282 | } | |
283 | ||
284 | Ok(()) | |
285 | } | |
286 | ||
287 | pub async fn sync_store( | |
288 | worker: &WorkerTask, | |
289 | client: &HttpClient, | |
290 | src_repo: &BackupRepository, | |
291 | tgt_store: Arc<DataStore>, | |
292 | ) -> Result<(), Error> { | |
293 | ||
294 | let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store()); | |
295 | ||
296 | let mut result = client.get(&path, None).await?; | |
297 | ||
298 | let list = result["data"].as_array_mut().unwrap(); | |
299 | ||
300 | list.sort_unstable_by(|a, b| { | |
301 | let a_id = a["backup-id"].as_str().unwrap(); | |
302 | let a_backup_type = a["backup-type"].as_str().unwrap(); | |
303 | let b_id = b["backup-id"].as_str().unwrap(); | |
304 | let b_backup_type = b["backup-type"].as_str().unwrap(); | |
305 | ||
306 | let type_order = a_backup_type.cmp(b_backup_type); | |
307 | if type_order == std::cmp::Ordering::Equal { | |
308 | a_id.cmp(b_id) | |
309 | } else { | |
310 | type_order | |
311 | } | |
312 | }); | |
313 | ||
314 | let mut errors = false; | |
315 | ||
316 | for item in list { | |
317 | ||
318 | let id = item["backup-id"].as_str().unwrap(); | |
319 | let btype = item["backup-type"].as_str().unwrap(); | |
320 | ||
321 | let group = BackupGroup::new(btype, id); | |
322 | if let Err(err) = sync_group(worker, client, src_repo, tgt_store.clone(), &group).await { | |
323 | worker.log(format!("sync group {}/{} failed - {}", btype, id, err)); | |
324 | errors = true; | |
325 | // continue | |
326 | } | |
327 | } | |
328 | ||
329 | if errors { | |
330 | bail!("sync failed with some errors."); | |
331 | } | |
332 | ||
333 | Ok(()) | |
334 | } | |
335 | ||
336 | #[api( | |
337 | input: { | |
338 | properties: { | |
339 | store: { | |
340 | schema: DATASTORE_SCHEMA, | |
341 | }, | |
342 | "remote-host": { | |
343 | description: "Remote host", // TODO: use predefined type: host or IP | |
344 | type: String, | |
345 | }, | |
346 | "remote-store": { | |
347 | schema: DATASTORE_SCHEMA, | |
348 | }, | |
349 | "remote-user": { | |
350 | description: "Remote user name.", // TODO: use predefined typed | |
351 | type: String, | |
352 | }, | |
353 | "remote-password": { | |
354 | description: "Remote passsword.", | |
355 | type: String, | |
356 | }, | |
357 | }, | |
358 | }, | |
359 | )] | |
360 | /// Sync store from otherrepository | |
361 | async fn sync_from ( | |
362 | store: String, | |
363 | remote_host: String, | |
364 | remote_store: String, | |
365 | remote_user: String, | |
366 | remote_password: String, | |
367 | _info: &ApiMethod, | |
368 | rpcenv: &mut dyn RpcEnvironment, | |
369 | ) -> Result<String, Error> { | |
370 | ||
371 | let username = rpcenv.get_user().unwrap(); | |
372 | ||
373 | let tgt_store = DataStore::lookup_datastore(&store)?; | |
374 | ||
375 | let client = HttpClient::new(&remote_host, &remote_user, Some(remote_password))?; | |
376 | let _auth_info = client.login() // make sure we can auth | |
377 | .await | |
378 | .map_err(|err| format_err!("remote connection to '{}' failed - {}", remote_host, err))?; | |
379 | ||
380 | let src_repo = BackupRepository::new(Some(remote_user), Some(remote_host), remote_store); | |
381 | ||
382 | // fixme: set to_stdout to false? | |
383 | let upid_str = WorkerTask::spawn("sync", Some(store.clone()), &username.clone(), true, move |worker| async move { | |
384 | ||
385 | worker.log(format!("sync datastore '{}' start", store)); | |
386 | ||
387 | // explicit create shared lock to prevent GC on newly created chunks | |
388 | let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?; | |
389 | ||
390 | sync_store(&worker, &client, &src_repo, tgt_store.clone()).await?; | |
391 | ||
392 | worker.log(format!("sync datastore '{}' end", store)); | |
393 | ||
394 | Ok(()) | |
395 | })?; | |
396 | ||
397 | Ok(upid_str) | |
398 | } | |
399 | ||
400 | pub const ROUTER: Router = Router::new() | |
401 | .post(&API_METHOD_SYNC_FROM); |