]>
Commit | Line | Data |
---|---|---|
07ad6470 DM |
1 | //! Sync datastore from remote server |
2 | ||
3 | use anyhow::{bail, format_err, Error}; | |
4 | use serde_json::json; | |
5 | use std::convert::TryFrom; | |
ebbe4958 DM |
6 | use std::sync::{Arc, Mutex}; |
7 | use std::collections::{HashSet, HashMap}; | |
07ad6470 | 8 | use std::io::{Seek, SeekFrom}; |
998db639 | 9 | use std::time::SystemTime; |
54417086 | 10 | use std::sync::atomic::{AtomicUsize, Ordering}; |
07ad6470 | 11 | |
c1c4a18f | 12 | use proxmox::api::error::{StatusCode, HttpError}; |
1bc1d81a | 13 | use crate::{ |
54417086 | 14 | tools::{ParallelHandler, compute_file_csum}, |
1bc1d81a DM |
15 | server::WorkerTask, |
16 | backup::*, | |
17 | api2::types::*, | |
18 | client::*, | |
19 | }; | |
07ad6470 DM |
20 | |
21 | ||
22 | // fixme: implement filters | |
23 | // fixme: delete vanished groups | |
24 | // Todo: correctly lock backup groups | |
25 | ||
26 | async fn pull_index_chunks<I: IndexFile>( | |
998db639 | 27 | worker: &WorkerTask, |
73b2cc49 | 28 | chunk_reader: RemoteChunkReader, |
07ad6470 DM |
29 | target: Arc<DataStore>, |
30 | index: I, | |
ebbe4958 | 31 | downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>, |
07ad6470 DM |
32 | ) -> Result<(), Error> { |
33 | ||
73b2cc49 | 34 | use futures::stream::{self, StreamExt, TryStreamExt}; |
07ad6470 | 35 | |
998db639 DM |
36 | let start_time = SystemTime::now(); |
37 | ||
ebbe4958 DM |
38 | let stream = stream::iter( |
39 | (0..index.index_count()) | |
40 | .map(|pos| index.chunk_info(pos).unwrap()) | |
41 | .filter(|info| { | |
42 | let mut guard = downloaded_chunks.lock().unwrap(); | |
43 | let done = guard.contains(&info.digest); | |
44 | if !done { | |
45 | // Note: We mark a chunk as downloaded before its actually downloaded | |
46 | // to avoid duplicate downloads. | |
47 | guard.insert(info.digest); | |
48 | } | |
49 | !done | |
50 | }) | |
51 | ); | |
07ad6470 | 52 | |
a71bc08f | 53 | let target2 = target.clone(); |
54417086 DM |
54 | let verify_pool = ParallelHandler::new( |
55 | "sync chunk writer", 4, | |
a71bc08f | 56 | move |(chunk, digest, size): (DataBlob, [u8;32], u64)| { |
54417086 DM |
57 | // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest)); |
58 | chunk.verify_unencrypted(size as usize, &digest)?; | |
a71bc08f | 59 | target2.insert_chunk(&chunk, &digest)?; |
54417086 DM |
60 | Ok(()) |
61 | } | |
62 | ); | |
63 | ||
64 | let verify_and_write_channel = verify_pool.channel(); | |
998db639 | 65 | |
54417086 DM |
66 | let bytes = Arc::new(AtomicUsize::new(0)); |
67 | ||
68 | stream | |
73b2cc49 | 69 | .map(|info| { |
9e496ff6 | 70 | |
73b2cc49 DM |
71 | let target = Arc::clone(&target); |
72 | let chunk_reader = chunk_reader.clone(); | |
54417086 DM |
73 | let bytes = Arc::clone(&bytes); |
74 | let verify_and_write_channel = verify_and_write_channel.clone(); | |
73b2cc49 DM |
75 | |
76 | Ok::<_, Error>(async move { | |
77 | let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?; | |
78 | if chunk_exists { | |
79 | //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest))); | |
80 | return Ok::<_, Error>(()); | |
81 | } | |
82 | //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest))); | |
83 | let chunk = chunk_reader.read_raw_chunk(&info.digest).await?; | |
54417086 | 84 | let raw_size = chunk.raw_size() as usize; |
73b2cc49 | 85 | |
998db639 | 86 | // decode, verify and write in a separate threads to maximize throughput |
54417086 DM |
87 | crate::tools::runtime::block_in_place(|| verify_and_write_channel.send((chunk, info.digest, info.size())))?; |
88 | ||
89 | bytes.fetch_add(raw_size, Ordering::SeqCst); | |
998db639 DM |
90 | |
91 | Ok(()) | |
92 | }) | |
73b2cc49 DM |
93 | }) |
94 | .try_buffer_unordered(20) | |
95 | .try_for_each(|_res| futures::future::ok(())) | |
54417086 | 96 | .await?; |
998db639 | 97 | |
54417086 | 98 | drop(verify_and_write_channel); |
998db639 | 99 | |
54417086 | 100 | verify_pool.complete()?; |
998db639 DM |
101 | |
102 | let elapsed = start_time.elapsed()?.as_secs_f64(); | |
103 | ||
54417086 DM |
104 | let bytes = bytes.load(Ordering::SeqCst); |
105 | ||
42ca9e91 | 106 | worker.log(format!("downloaded {} bytes ({:.2} MiB/s)", bytes, (bytes as f64)/(1024.0*1024.0*elapsed))); |
07ad6470 DM |
107 | |
108 | Ok(()) | |
109 | } | |
110 | ||
111 | async fn download_manifest( | |
112 | reader: &BackupReader, | |
113 | filename: &std::path::Path, | |
114 | ) -> Result<std::fs::File, Error> { | |
115 | ||
3d571d55 | 116 | let mut tmp_manifest_file = std::fs::OpenOptions::new() |
07ad6470 DM |
117 | .write(true) |
118 | .create(true) | |
194da6f8 | 119 | .truncate(true) |
07ad6470 DM |
120 | .read(true) |
121 | .open(&filename)?; | |
122 | ||
3d571d55 | 123 | reader.download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file).await?; |
07ad6470 DM |
124 | |
125 | tmp_manifest_file.seek(SeekFrom::Start(0))?; | |
126 | ||
127 | Ok(tmp_manifest_file) | |
128 | } | |
129 | ||
2ce15934 FG |
130 | fn verify_archive( |
131 | info: &FileInfo, | |
132 | csum: &[u8; 32], | |
133 | size: u64, | |
134 | ) -> Result<(), Error> { | |
135 | if size != info.size { | |
136 | bail!("wrong size for file '{}' ({} != {})", info.filename, info.size, size); | |
137 | } | |
138 | ||
139 | if csum != &info.csum { | |
140 | bail!("wrong checksum for file '{}'", info.filename); | |
141 | } | |
142 | ||
143 | Ok(()) | |
144 | } | |
145 | ||
07ad6470 DM |
146 | async fn pull_single_archive( |
147 | worker: &WorkerTask, | |
148 | reader: &BackupReader, | |
149 | chunk_reader: &mut RemoteChunkReader, | |
150 | tgt_store: Arc<DataStore>, | |
151 | snapshot: &BackupDir, | |
2ce15934 | 152 | archive_info: &FileInfo, |
ebbe4958 | 153 | downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>, |
07ad6470 DM |
154 | ) -> Result<(), Error> { |
155 | ||
2ce15934 | 156 | let archive_name = &archive_info.filename; |
07ad6470 DM |
157 | let mut path = tgt_store.base_path(); |
158 | path.push(snapshot.relative_path()); | |
159 | path.push(archive_name); | |
160 | ||
161 | let mut tmp_path = path.clone(); | |
162 | tmp_path.set_extension("tmp"); | |
163 | ||
164 | worker.log(format!("sync archive {}", archive_name)); | |
3d571d55 | 165 | let mut tmpfile = std::fs::OpenOptions::new() |
07ad6470 DM |
166 | .write(true) |
167 | .create(true) | |
168 | .read(true) | |
169 | .open(&tmp_path)?; | |
170 | ||
3d571d55 | 171 | reader.download(archive_name, &mut tmpfile).await?; |
07ad6470 DM |
172 | |
173 | match archive_type(archive_name)? { | |
174 | ArchiveType::DynamicIndex => { | |
175 | let index = DynamicIndexReader::new(tmpfile) | |
176 | .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?; | |
2ce15934 FG |
177 | let (csum, size) = index.compute_csum(); |
178 | verify_archive(archive_info, &csum, size)?; | |
07ad6470 | 179 | |
ebbe4958 | 180 | pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?; |
07ad6470 DM |
181 | } |
182 | ArchiveType::FixedIndex => { | |
183 | let index = FixedIndexReader::new(tmpfile) | |
184 | .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?; | |
2ce15934 FG |
185 | let (csum, size) = index.compute_csum(); |
186 | verify_archive(archive_info, &csum, size)?; | |
07ad6470 | 187 | |
ebbe4958 | 188 | pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?; |
07ad6470 | 189 | } |
2ce15934 FG |
190 | ArchiveType::Blob => { |
191 | let (csum, size) = compute_file_csum(&mut tmpfile)?; | |
192 | verify_archive(archive_info, &csum, size)?; | |
193 | } | |
07ad6470 DM |
194 | } |
195 | if let Err(err) = std::fs::rename(&tmp_path, &path) { | |
196 | bail!("Atomic rename file {:?} failed - {}", path, err); | |
197 | } | |
198 | Ok(()) | |
199 | } | |
200 | ||
1610c45a DM |
201 | // Note: The client.log.blob is uploaded after the backup, so it is |
202 | // not mentioned in the manifest. | |
203 | async fn try_client_log_download( | |
204 | worker: &WorkerTask, | |
205 | reader: Arc<BackupReader>, | |
206 | path: &std::path::Path, | |
207 | ) -> Result<(), Error> { | |
208 | ||
209 | let mut tmp_path = path.to_owned(); | |
210 | tmp_path.set_extension("tmp"); | |
211 | ||
212 | let tmpfile = std::fs::OpenOptions::new() | |
213 | .write(true) | |
214 | .create(true) | |
215 | .read(true) | |
216 | .open(&tmp_path)?; | |
217 | ||
add5861e | 218 | // Note: be silent if there is no log - only log successful download |
3d571d55 | 219 | if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await { |
1610c45a DM |
220 | if let Err(err) = std::fs::rename(&tmp_path, &path) { |
221 | bail!("Atomic rename file {:?} failed - {}", path, err); | |
222 | } | |
add5861e | 223 | worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME)); |
1610c45a DM |
224 | } |
225 | ||
226 | Ok(()) | |
227 | } | |
228 | ||
07ad6470 DM |
229 | async fn pull_snapshot( |
230 | worker: &WorkerTask, | |
231 | reader: Arc<BackupReader>, | |
232 | tgt_store: Arc<DataStore>, | |
233 | snapshot: &BackupDir, | |
ebbe4958 | 234 | downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>, |
07ad6470 DM |
235 | ) -> Result<(), Error> { |
236 | ||
237 | let mut manifest_name = tgt_store.base_path(); | |
238 | manifest_name.push(snapshot.relative_path()); | |
239 | manifest_name.push(MANIFEST_BLOB_NAME); | |
240 | ||
1610c45a DM |
241 | let mut client_log_name = tgt_store.base_path(); |
242 | client_log_name.push(snapshot.relative_path()); | |
243 | client_log_name.push(CLIENT_LOG_BLOB_NAME); | |
244 | ||
07ad6470 DM |
245 | let mut tmp_manifest_name = manifest_name.clone(); |
246 | tmp_manifest_name.set_extension("tmp"); | |
247 | ||
c1c4a18f FG |
248 | let download_res = download_manifest(&reader, &tmp_manifest_name).await; |
249 | let mut tmp_manifest_file = match download_res { | |
250 | Ok(manifest_file) => manifest_file, | |
251 | Err(err) => { | |
252 | match err.downcast_ref::<HttpError>() { | |
253 | Some(HttpError { code, message }) => { | |
54aec2fa WB |
254 | match *code { |
255 | StatusCode::NOT_FOUND => { | |
c1c4a18f FG |
256 | worker.log(format!("skipping snapshot {} - vanished since start of sync", snapshot)); |
257 | return Ok(()); | |
258 | }, | |
259 | _ => { | |
260 | bail!("HTTP error {} - {}", code, message); | |
261 | }, | |
262 | } | |
263 | }, | |
264 | None => { | |
265 | return Err(err); | |
266 | }, | |
267 | }; | |
268 | }, | |
269 | }; | |
39f18b30 | 270 | let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?; |
07ad6470 DM |
271 | |
272 | if manifest_name.exists() { | |
273 | let manifest_blob = proxmox::try_block!({ | |
274 | let mut manifest_file = std::fs::File::open(&manifest_name) | |
275 | .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?; | |
276 | ||
39f18b30 | 277 | let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?; |
07ad6470 DM |
278 | Ok(manifest_blob) |
279 | }).map_err(|err: Error| { | |
280 | format_err!("unable to read local manifest {:?} - {}", manifest_name, err) | |
281 | })?; | |
282 | ||
283 | if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { | |
1610c45a DM |
284 | if !client_log_name.exists() { |
285 | try_client_log_download(worker, reader, &client_log_name).await?; | |
286 | } | |
287 | worker.log("no data changes"); | |
e0085e66 | 288 | let _ = std::fs::remove_file(&tmp_manifest_name); |
07ad6470 DM |
289 | return Ok(()); // nothing changed |
290 | } | |
291 | } | |
292 | ||
293 | let manifest = BackupManifest::try_from(tmp_manifest_blob)?; | |
294 | ||
07ad6470 DM |
295 | for item in manifest.files() { |
296 | let mut path = tgt_store.base_path(); | |
297 | path.push(snapshot.relative_path()); | |
298 | path.push(&item.filename); | |
299 | ||
300 | if path.exists() { | |
301 | match archive_type(&item.filename)? { | |
302 | ArchiveType::DynamicIndex => { | |
303 | let index = DynamicIndexReader::open(&path)?; | |
304 | let (csum, size) = index.compute_csum(); | |
305 | match manifest.verify_file(&item.filename, &csum, size) { | |
306 | Ok(_) => continue, | |
307 | Err(err) => { | |
308 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
309 | } | |
310 | } | |
311 | } | |
312 | ArchiveType::FixedIndex => { | |
313 | let index = FixedIndexReader::open(&path)?; | |
314 | let (csum, size) = index.compute_csum(); | |
315 | match manifest.verify_file(&item.filename, &csum, size) { | |
316 | Ok(_) => continue, | |
317 | Err(err) => { | |
318 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
319 | } | |
320 | } | |
321 | } | |
322 | ArchiveType::Blob => { | |
323 | let mut tmpfile = std::fs::File::open(&path)?; | |
324 | let (csum, size) = compute_file_csum(&mut tmpfile)?; | |
325 | match manifest.verify_file(&item.filename, &csum, size) { | |
326 | Ok(_) => continue, | |
327 | Err(err) => { | |
328 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
329 | } | |
330 | } | |
331 | } | |
332 | } | |
333 | } | |
334 | ||
14f6c9cb FG |
335 | let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, item.chunk_crypt_mode(), HashMap::new()); |
336 | ||
07ad6470 DM |
337 | pull_single_archive( |
338 | worker, | |
339 | &reader, | |
340 | &mut chunk_reader, | |
341 | tgt_store.clone(), | |
342 | snapshot, | |
2ce15934 | 343 | &item, |
ebbe4958 | 344 | downloaded_chunks.clone(), |
07ad6470 DM |
345 | ).await?; |
346 | } | |
347 | ||
348 | if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) { | |
349 | bail!("Atomic rename file {:?} failed - {}", manifest_name, err); | |
350 | } | |
351 | ||
1610c45a DM |
352 | if !client_log_name.exists() { |
353 | try_client_log_download(worker, reader, &client_log_name).await?; | |
354 | } | |
355 | ||
07ad6470 DM |
356 | // cleanup - remove stale files |
357 | tgt_store.cleanup_backup_dir(snapshot, &manifest)?; | |
358 | ||
359 | Ok(()) | |
360 | } | |
361 | ||
362 | pub async fn pull_snapshot_from( | |
363 | worker: &WorkerTask, | |
364 | reader: Arc<BackupReader>, | |
365 | tgt_store: Arc<DataStore>, | |
366 | snapshot: &BackupDir, | |
ebbe4958 | 367 | downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>, |
07ad6470 DM |
368 | ) -> Result<(), Error> { |
369 | ||
f23f7543 | 370 | let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?; |
07ad6470 DM |
371 | |
372 | if is_new { | |
373 | worker.log(format!("sync snapshot {:?}", snapshot.relative_path())); | |
374 | ||
ebbe4958 | 375 | if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await { |
c9756b40 | 376 | if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) { |
07ad6470 DM |
377 | worker.log(format!("cleanup error - {}", cleanup_err)); |
378 | } | |
379 | return Err(err); | |
380 | } | |
4856a218 | 381 | worker.log(format!("sync snapshot {:?} done", snapshot.relative_path())); |
07ad6470 DM |
382 | } else { |
383 | worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path())); | |
ebbe4958 | 384 | pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await?; |
4856a218 | 385 | worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path())); |
07ad6470 DM |
386 | } |
387 | ||
388 | Ok(()) | |
389 | } | |
390 | ||
391 | pub async fn pull_group( | |
392 | worker: &WorkerTask, | |
393 | client: &HttpClient, | |
394 | src_repo: &BackupRepository, | |
395 | tgt_store: Arc<DataStore>, | |
396 | group: &BackupGroup, | |
397 | delete: bool, | |
7b8aa893 | 398 | progress: Option<(usize, usize)>, // (groups_done, group_count) |
07ad6470 DM |
399 | ) -> Result<(), Error> { |
400 | ||
401 | let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store()); | |
402 | ||
403 | let args = json!({ | |
404 | "backup-type": group.backup_type(), | |
405 | "backup-id": group.backup_id(), | |
406 | }); | |
407 | ||
408 | let mut result = client.get(&path, Some(args)).await?; | |
409 | let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?; | |
410 | ||
411 | list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time)); | |
412 | ||
0081903f DM |
413 | client.login().await?; // make sure auth is complete |
414 | ||
07ad6470 DM |
415 | let fingerprint = client.fingerprint(); |
416 | ||
417 | let last_sync = tgt_store.last_successful_backup(group)?; | |
418 | ||
419 | let mut remote_snapshots = std::collections::HashSet::new(); | |
420 | ||
7b8aa893 DM |
421 | let (per_start, per_group) = if let Some((groups_done, group_count)) = progress { |
422 | let per_start = (groups_done as f64)/(group_count as f64); | |
423 | let per_group = 1.0/(group_count as f64); | |
424 | (per_start, per_group) | |
425 | } else { | |
426 | (0.0, 1.0) | |
427 | }; | |
428 | ||
ebbe4958 DM |
429 | // start with 16384 chunks (up to 65GB) |
430 | let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*64))); | |
431 | ||
7b8aa893 DM |
432 | let snapshot_count = list.len(); |
433 | ||
434 | for (pos, item) in list.into_iter().enumerate() { | |
e0e5b442 | 435 | let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?; |
86f6f741 FG |
436 | |
437 | // in-progress backups can't be synced | |
54aec2fa | 438 | if item.size.is_none() { |
86f6f741 FG |
439 | worker.log(format!("skipping snapshot {} - in-progress backup", snapshot)); |
440 | continue; | |
441 | } | |
442 | ||
443 | let backup_time = snapshot.backup_time(); | |
444 | ||
07ad6470 DM |
445 | remote_snapshots.insert(backup_time); |
446 | ||
447 | if let Some(last_sync_time) = last_sync { | |
448 | if last_sync_time > backup_time { continue; } | |
449 | } | |
450 | ||
0081903f DM |
451 | // get updated auth_info (new tickets) |
452 | let auth_info = client.login().await?; | |
453 | ||
07ad6470 DM |
454 | let options = HttpClientOptions::new() |
455 | .password(Some(auth_info.ticket.clone())) | |
456 | .fingerprint(fingerprint.clone()); | |
457 | ||
34aa8e13 | 458 | let new_client = HttpClient::new(src_repo.host(), src_repo.port(), src_repo.auth_id(), options)?; |
07ad6470 DM |
459 | |
460 | let reader = BackupReader::start( | |
461 | new_client, | |
462 | None, | |
463 | src_repo.store(), | |
86f6f741 FG |
464 | snapshot.group().backup_type(), |
465 | snapshot.group().backup_id(), | |
07ad6470 DM |
466 | backup_time, |
467 | true, | |
468 | ).await?; | |
469 | ||
7b8aa893 DM |
470 | let result = pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks.clone()).await; |
471 | ||
472 | let percentage = (pos as f64)/(snapshot_count as f64); | |
473 | let percentage = per_start + percentage*per_group; | |
474 | worker.log(format!("percentage done: {:.2}%", percentage*100.0)); | |
475 | ||
476 | result?; // stop on error | |
07ad6470 DM |
477 | } |
478 | ||
479 | if delete { | |
480 | let local_list = group.list_backups(&tgt_store.base_path())?; | |
481 | for info in local_list { | |
482 | let backup_time = info.backup_dir.backup_time(); | |
483 | if remote_snapshots.contains(&backup_time) { continue; } | |
484 | worker.log(format!("delete vanished snapshot {:?}", info.backup_dir.relative_path())); | |
c9756b40 | 485 | tgt_store.remove_backup_dir(&info.backup_dir, false)?; |
07ad6470 DM |
486 | } |
487 | } | |
488 | ||
489 | Ok(()) | |
490 | } | |
491 | ||
492 | pub async fn pull_store( | |
493 | worker: &WorkerTask, | |
494 | client: &HttpClient, | |
495 | src_repo: &BackupRepository, | |
496 | tgt_store: Arc<DataStore>, | |
497 | delete: bool, | |
e6dc35ac | 498 | auth_id: Authid, |
07ad6470 DM |
499 | ) -> Result<(), Error> { |
500 | ||
501 | // explicit create shared lock to prevent GC on newly created chunks | |
502 | let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?; | |
503 | ||
504 | let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store()); | |
505 | ||
506 | let mut result = client.get(&path, None).await?; | |
507 | ||
508 | let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?; | |
509 | ||
510 | list.sort_unstable_by(|a, b| { | |
511 | let type_order = a.backup_type.cmp(&b.backup_type); | |
512 | if type_order == std::cmp::Ordering::Equal { | |
513 | a.backup_id.cmp(&b.backup_id) | |
514 | } else { | |
515 | type_order | |
516 | } | |
517 | }); | |
518 | ||
519 | let mut errors = false; | |
520 | ||
521 | let mut new_groups = std::collections::HashSet::new(); | |
522 | for item in list.iter() { | |
523 | new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id)); | |
524 | } | |
525 | ||
7b8aa893 | 526 | let group_count = list.len(); |
7b8aa893 | 527 | |
54aec2fa | 528 | for (groups_done, item) in list.into_iter().enumerate() { |
07ad6470 DM |
529 | let group = BackupGroup::new(&item.backup_type, &item.backup_id); |
530 | ||
30f73fa2 DM |
531 | let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) { |
532 | Ok(result) => result, | |
533 | Err(err) => { | |
534 | worker.log(format!("sync group {}/{} failed - group lock failed: {}", | |
535 | item.backup_type, item.backup_id, err)); | |
536 | errors = true; // do not stop here, instead continue | |
537 | continue; | |
538 | } | |
539 | }; | |
540 | ||
07ad6470 | 541 | // permission check |
e6dc35ac | 542 | if auth_id != owner { // only the owner is allowed to create additional snapshots |
07ad6470 | 543 | worker.log(format!("sync group {}/{} failed - owner check failed ({} != {})", |
e6dc35ac | 544 | item.backup_type, item.backup_id, auth_id, owner)); |
7b8aa893 | 545 | errors = true; // do not stop here, instead continue |
07ad6470 | 546 | |
20813274 WB |
547 | } else if let Err(err) = pull_group( |
548 | worker, | |
549 | client, | |
550 | src_repo, | |
551 | tgt_store.clone(), | |
552 | &group, | |
553 | delete, | |
554 | Some((groups_done, group_count)), | |
555 | ).await { | |
556 | worker.log(format!( | |
557 | "sync group {}/{} failed - {}", | |
558 | item.backup_type, | |
559 | item.backup_id, | |
560 | err, | |
561 | )); | |
562 | errors = true; // do not stop here, instead continue | |
07ad6470 DM |
563 | } |
564 | } | |
565 | ||
566 | if delete { | |
567 | let result: Result<(), Error> = proxmox::try_block!({ | |
568 | let local_groups = BackupGroup::list_groups(&tgt_store.base_path())?; | |
569 | for local_group in local_groups { | |
570 | if new_groups.contains(&local_group) { continue; } | |
571 | worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id())); | |
572 | if let Err(err) = tgt_store.remove_backup_group(&local_group) { | |
573 | worker.log(err.to_string()); | |
574 | errors = true; | |
575 | } | |
576 | } | |
577 | Ok(()) | |
578 | }); | |
579 | if let Err(err) = result { | |
580 | worker.log(format!("error during cleanup: {}", err)); | |
581 | errors = true; | |
582 | }; | |
583 | } | |
584 | ||
585 | if errors { | |
586 | bail!("sync failed with some errors."); | |
587 | } | |
588 | ||
589 | Ok(()) | |
590 | } |