]>
Commit | Line | Data |
---|---|---|
07ad6470 DM |
1 | //! Sync datastore from remote server |
2 | ||
3 | use anyhow::{bail, format_err, Error}; | |
4 | use serde_json::json; | |
e2956c60 | 5 | use std::collections::{HashMap, HashSet}; |
07ad6470 | 6 | use std::convert::TryFrom; |
07ad6470 | 7 | use std::io::{Seek, SeekFrom}; |
54417086 | 8 | use std::sync::atomic::{AtomicUsize, Ordering}; |
e2956c60 FG |
9 | use std::sync::{Arc, Mutex}; |
10 | use std::time::SystemTime; | |
07ad6470 | 11 | |
1bc1d81a | 12 | use crate::{ |
1bc1d81a | 13 | api2::types::*, |
e2956c60 | 14 | backup::*, |
1bc1d81a | 15 | client::*, |
e2956c60 | 16 | server::WorkerTask, |
d2354a16 | 17 | task_log, |
e2956c60 | 18 | tools::{compute_file_csum, ParallelHandler}, |
1bc1d81a | 19 | }; |
e2956c60 | 20 | use proxmox::api::error::{HttpError, StatusCode}; |
07ad6470 DM |
21 | |
22 | // fixme: implement filters | |
23 | // fixme: delete vanished groups | |
24 | // Todo: correctly lock backup groups | |
25 | ||
26 | async fn pull_index_chunks<I: IndexFile>( | |
998db639 | 27 | worker: &WorkerTask, |
73b2cc49 | 28 | chunk_reader: RemoteChunkReader, |
07ad6470 DM |
29 | target: Arc<DataStore>, |
30 | index: I, | |
e2956c60 | 31 | downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, |
07ad6470 | 32 | ) -> Result<(), Error> { |
73b2cc49 | 33 | use futures::stream::{self, StreamExt, TryStreamExt}; |
07ad6470 | 34 | |
998db639 DM |
35 | let start_time = SystemTime::now(); |
36 | ||
ebbe4958 DM |
37 | let stream = stream::iter( |
38 | (0..index.index_count()) | |
39 | .map(|pos| index.chunk_info(pos).unwrap()) | |
40 | .filter(|info| { | |
41 | let mut guard = downloaded_chunks.lock().unwrap(); | |
42 | let done = guard.contains(&info.digest); | |
43 | if !done { | |
44 | // Note: We mark a chunk as downloaded before its actually downloaded | |
45 | // to avoid duplicate downloads. | |
46 | guard.insert(info.digest); | |
47 | } | |
48 | !done | |
e2956c60 | 49 | }), |
ebbe4958 | 50 | ); |
07ad6470 | 51 | |
a71bc08f | 52 | let target2 = target.clone(); |
54417086 | 53 | let verify_pool = ParallelHandler::new( |
e2956c60 FG |
54 | "sync chunk writer", |
55 | 4, | |
56 | move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| { | |
54417086 DM |
57 | // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest)); |
58 | chunk.verify_unencrypted(size as usize, &digest)?; | |
a71bc08f | 59 | target2.insert_chunk(&chunk, &digest)?; |
54417086 | 60 | Ok(()) |
e2956c60 | 61 | }, |
54417086 DM |
62 | ); |
63 | ||
64 | let verify_and_write_channel = verify_pool.channel(); | |
998db639 | 65 | |
54417086 DM |
66 | let bytes = Arc::new(AtomicUsize::new(0)); |
67 | ||
68 | stream | |
73b2cc49 | 69 | .map(|info| { |
73b2cc49 DM |
70 | let target = Arc::clone(&target); |
71 | let chunk_reader = chunk_reader.clone(); | |
54417086 DM |
72 | let bytes = Arc::clone(&bytes); |
73 | let verify_and_write_channel = verify_and_write_channel.clone(); | |
73b2cc49 DM |
74 | |
75 | Ok::<_, Error>(async move { | |
d420962f | 76 | let chunk_exists = pbs_runtime::block_in_place(|| { |
e2956c60 FG |
77 | target.cond_touch_chunk(&info.digest, false) |
78 | })?; | |
73b2cc49 DM |
79 | if chunk_exists { |
80 | //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest))); | |
81 | return Ok::<_, Error>(()); | |
82 | } | |
83 | //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest))); | |
84 | let chunk = chunk_reader.read_raw_chunk(&info.digest).await?; | |
54417086 | 85 | let raw_size = chunk.raw_size() as usize; |
73b2cc49 | 86 | |
998db639 | 87 | // decode, verify and write in a separate threads to maximize throughput |
d420962f | 88 | pbs_runtime::block_in_place(|| { |
e2956c60 FG |
89 | verify_and_write_channel.send((chunk, info.digest, info.size())) |
90 | })?; | |
54417086 DM |
91 | |
92 | bytes.fetch_add(raw_size, Ordering::SeqCst); | |
998db639 DM |
93 | |
94 | Ok(()) | |
e2956c60 | 95 | }) |
73b2cc49 DM |
96 | }) |
97 | .try_buffer_unordered(20) | |
98 | .try_for_each(|_res| futures::future::ok(())) | |
54417086 | 99 | .await?; |
998db639 | 100 | |
54417086 | 101 | drop(verify_and_write_channel); |
998db639 | 102 | |
54417086 | 103 | verify_pool.complete()?; |
998db639 DM |
104 | |
105 | let elapsed = start_time.elapsed()?.as_secs_f64(); | |
106 | ||
54417086 DM |
107 | let bytes = bytes.load(Ordering::SeqCst); |
108 | ||
e2956c60 FG |
109 | worker.log(format!( |
110 | "downloaded {} bytes ({:.2} MiB/s)", | |
111 | bytes, | |
112 | (bytes as f64) / (1024.0 * 1024.0 * elapsed) | |
113 | )); | |
07ad6470 DM |
114 | |
115 | Ok(()) | |
116 | } | |
117 | ||
118 | async fn download_manifest( | |
119 | reader: &BackupReader, | |
120 | filename: &std::path::Path, | |
121 | ) -> Result<std::fs::File, Error> { | |
3d571d55 | 122 | let mut tmp_manifest_file = std::fs::OpenOptions::new() |
07ad6470 DM |
123 | .write(true) |
124 | .create(true) | |
194da6f8 | 125 | .truncate(true) |
07ad6470 DM |
126 | .read(true) |
127 | .open(&filename)?; | |
128 | ||
e2956c60 FG |
129 | reader |
130 | .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file) | |
131 | .await?; | |
07ad6470 DM |
132 | |
133 | tmp_manifest_file.seek(SeekFrom::Start(0))?; | |
134 | ||
135 | Ok(tmp_manifest_file) | |
136 | } | |
137 | ||
e2956c60 | 138 | fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> { |
2ce15934 | 139 | if size != info.size { |
e2956c60 FG |
140 | bail!( |
141 | "wrong size for file '{}' ({} != {})", | |
142 | info.filename, | |
143 | info.size, | |
144 | size | |
145 | ); | |
2ce15934 FG |
146 | } |
147 | ||
148 | if csum != &info.csum { | |
149 | bail!("wrong checksum for file '{}'", info.filename); | |
150 | } | |
151 | ||
152 | Ok(()) | |
153 | } | |
154 | ||
07ad6470 DM |
155 | async fn pull_single_archive( |
156 | worker: &WorkerTask, | |
157 | reader: &BackupReader, | |
158 | chunk_reader: &mut RemoteChunkReader, | |
159 | tgt_store: Arc<DataStore>, | |
160 | snapshot: &BackupDir, | |
2ce15934 | 161 | archive_info: &FileInfo, |
e2956c60 | 162 | downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, |
07ad6470 | 163 | ) -> Result<(), Error> { |
2ce15934 | 164 | let archive_name = &archive_info.filename; |
07ad6470 DM |
165 | let mut path = tgt_store.base_path(); |
166 | path.push(snapshot.relative_path()); | |
167 | path.push(archive_name); | |
168 | ||
169 | let mut tmp_path = path.clone(); | |
170 | tmp_path.set_extension("tmp"); | |
171 | ||
172 | worker.log(format!("sync archive {}", archive_name)); | |
3d571d55 | 173 | let mut tmpfile = std::fs::OpenOptions::new() |
07ad6470 DM |
174 | .write(true) |
175 | .create(true) | |
176 | .read(true) | |
177 | .open(&tmp_path)?; | |
178 | ||
3d571d55 | 179 | reader.download(archive_name, &mut tmpfile).await?; |
07ad6470 DM |
180 | |
181 | match archive_type(archive_name)? { | |
182 | ArchiveType::DynamicIndex => { | |
e2956c60 FG |
183 | let index = DynamicIndexReader::new(tmpfile).map_err(|err| { |
184 | format_err!("unable to read dynamic index {:?} - {}", tmp_path, err) | |
185 | })?; | |
2ce15934 FG |
186 | let (csum, size) = index.compute_csum(); |
187 | verify_archive(archive_info, &csum, size)?; | |
07ad6470 | 188 | |
e2956c60 FG |
189 | pull_index_chunks( |
190 | worker, | |
191 | chunk_reader.clone(), | |
192 | tgt_store.clone(), | |
193 | index, | |
194 | downloaded_chunks, | |
195 | ) | |
196 | .await?; | |
07ad6470 DM |
197 | } |
198 | ArchiveType::FixedIndex => { | |
e2956c60 FG |
199 | let index = FixedIndexReader::new(tmpfile).map_err(|err| { |
200 | format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err) | |
201 | })?; | |
2ce15934 FG |
202 | let (csum, size) = index.compute_csum(); |
203 | verify_archive(archive_info, &csum, size)?; | |
07ad6470 | 204 | |
e2956c60 FG |
205 | pull_index_chunks( |
206 | worker, | |
207 | chunk_reader.clone(), | |
208 | tgt_store.clone(), | |
209 | index, | |
210 | downloaded_chunks, | |
211 | ) | |
212 | .await?; | |
07ad6470 | 213 | } |
2ce15934 FG |
214 | ArchiveType::Blob => { |
215 | let (csum, size) = compute_file_csum(&mut tmpfile)?; | |
216 | verify_archive(archive_info, &csum, size)?; | |
217 | } | |
07ad6470 DM |
218 | } |
219 | if let Err(err) = std::fs::rename(&tmp_path, &path) { | |
220 | bail!("Atomic rename file {:?} failed - {}", path, err); | |
221 | } | |
222 | Ok(()) | |
223 | } | |
224 | ||
1610c45a DM |
225 | // Note: The client.log.blob is uploaded after the backup, so it is |
226 | // not mentioned in the manifest. | |
227 | async fn try_client_log_download( | |
228 | worker: &WorkerTask, | |
229 | reader: Arc<BackupReader>, | |
230 | path: &std::path::Path, | |
231 | ) -> Result<(), Error> { | |
1610c45a DM |
232 | let mut tmp_path = path.to_owned(); |
233 | tmp_path.set_extension("tmp"); | |
234 | ||
235 | let tmpfile = std::fs::OpenOptions::new() | |
236 | .write(true) | |
237 | .create(true) | |
238 | .read(true) | |
239 | .open(&tmp_path)?; | |
240 | ||
add5861e | 241 | // Note: be silent if there is no log - only log successful download |
3d571d55 | 242 | if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await { |
1610c45a DM |
243 | if let Err(err) = std::fs::rename(&tmp_path, &path) { |
244 | bail!("Atomic rename file {:?} failed - {}", path, err); | |
245 | } | |
add5861e | 246 | worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME)); |
1610c45a DM |
247 | } |
248 | ||
249 | Ok(()) | |
250 | } | |
251 | ||
07ad6470 DM |
252 | async fn pull_snapshot( |
253 | worker: &WorkerTask, | |
254 | reader: Arc<BackupReader>, | |
255 | tgt_store: Arc<DataStore>, | |
256 | snapshot: &BackupDir, | |
e2956c60 | 257 | downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, |
07ad6470 | 258 | ) -> Result<(), Error> { |
07ad6470 DM |
259 | let mut manifest_name = tgt_store.base_path(); |
260 | manifest_name.push(snapshot.relative_path()); | |
261 | manifest_name.push(MANIFEST_BLOB_NAME); | |
262 | ||
1610c45a DM |
263 | let mut client_log_name = tgt_store.base_path(); |
264 | client_log_name.push(snapshot.relative_path()); | |
265 | client_log_name.push(CLIENT_LOG_BLOB_NAME); | |
266 | ||
07ad6470 DM |
267 | let mut tmp_manifest_name = manifest_name.clone(); |
268 | tmp_manifest_name.set_extension("tmp"); | |
269 | ||
c1c4a18f FG |
270 | let download_res = download_manifest(&reader, &tmp_manifest_name).await; |
271 | let mut tmp_manifest_file = match download_res { | |
272 | Ok(manifest_file) => manifest_file, | |
273 | Err(err) => { | |
274 | match err.downcast_ref::<HttpError>() { | |
e2956c60 FG |
275 | Some(HttpError { code, message }) => match *code { |
276 | StatusCode::NOT_FOUND => { | |
277 | worker.log(format!( | |
278 | "skipping snapshot {} - vanished since start of sync", | |
279 | snapshot | |
280 | )); | |
281 | return Ok(()); | |
282 | } | |
283 | _ => { | |
284 | bail!("HTTP error {} - {}", code, message); | |
c1c4a18f FG |
285 | } |
286 | }, | |
287 | None => { | |
288 | return Err(err); | |
e2956c60 | 289 | } |
c1c4a18f | 290 | }; |
e2956c60 | 291 | } |
c1c4a18f | 292 | }; |
39f18b30 | 293 | let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?; |
07ad6470 DM |
294 | |
295 | if manifest_name.exists() { | |
296 | let manifest_blob = proxmox::try_block!({ | |
e2956c60 FG |
297 | let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| { |
298 | format_err!( | |
299 | "unable to open local manifest {:?} - {}", | |
300 | manifest_name, | |
301 | err | |
302 | ) | |
303 | })?; | |
07ad6470 | 304 | |
39f18b30 | 305 | let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?; |
07ad6470 | 306 | Ok(manifest_blob) |
e2956c60 FG |
307 | }) |
308 | .map_err(|err: Error| { | |
309 | format_err!( | |
310 | "unable to read local manifest {:?} - {}", | |
311 | manifest_name, | |
312 | err | |
313 | ) | |
07ad6470 DM |
314 | })?; |
315 | ||
316 | if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { | |
1610c45a DM |
317 | if !client_log_name.exists() { |
318 | try_client_log_download(worker, reader, &client_log_name).await?; | |
319 | } | |
320 | worker.log("no data changes"); | |
e0085e66 | 321 | let _ = std::fs::remove_file(&tmp_manifest_name); |
07ad6470 DM |
322 | return Ok(()); // nothing changed |
323 | } | |
324 | } | |
325 | ||
326 | let manifest = BackupManifest::try_from(tmp_manifest_blob)?; | |
327 | ||
07ad6470 DM |
328 | for item in manifest.files() { |
329 | let mut path = tgt_store.base_path(); | |
330 | path.push(snapshot.relative_path()); | |
331 | path.push(&item.filename); | |
332 | ||
333 | if path.exists() { | |
334 | match archive_type(&item.filename)? { | |
335 | ArchiveType::DynamicIndex => { | |
336 | let index = DynamicIndexReader::open(&path)?; | |
337 | let (csum, size) = index.compute_csum(); | |
338 | match manifest.verify_file(&item.filename, &csum, size) { | |
339 | Ok(_) => continue, | |
340 | Err(err) => { | |
341 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
342 | } | |
343 | } | |
344 | } | |
345 | ArchiveType::FixedIndex => { | |
346 | let index = FixedIndexReader::open(&path)?; | |
347 | let (csum, size) = index.compute_csum(); | |
348 | match manifest.verify_file(&item.filename, &csum, size) { | |
349 | Ok(_) => continue, | |
350 | Err(err) => { | |
351 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
352 | } | |
353 | } | |
354 | } | |
355 | ArchiveType::Blob => { | |
356 | let mut tmpfile = std::fs::File::open(&path)?; | |
357 | let (csum, size) = compute_file_csum(&mut tmpfile)?; | |
358 | match manifest.verify_file(&item.filename, &csum, size) { | |
359 | Ok(_) => continue, | |
360 | Err(err) => { | |
361 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
362 | } | |
363 | } | |
364 | } | |
365 | } | |
366 | } | |
367 | ||
e2956c60 FG |
368 | let mut chunk_reader = RemoteChunkReader::new( |
369 | reader.clone(), | |
370 | None, | |
371 | item.chunk_crypt_mode(), | |
372 | HashMap::new(), | |
373 | ); | |
14f6c9cb | 374 | |
07ad6470 DM |
375 | pull_single_archive( |
376 | worker, | |
377 | &reader, | |
378 | &mut chunk_reader, | |
379 | tgt_store.clone(), | |
380 | snapshot, | |
2ce15934 | 381 | &item, |
ebbe4958 | 382 | downloaded_chunks.clone(), |
e2956c60 FG |
383 | ) |
384 | .await?; | |
07ad6470 DM |
385 | } |
386 | ||
387 | if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) { | |
388 | bail!("Atomic rename file {:?} failed - {}", manifest_name, err); | |
389 | } | |
390 | ||
1610c45a DM |
391 | if !client_log_name.exists() { |
392 | try_client_log_download(worker, reader, &client_log_name).await?; | |
393 | } | |
394 | ||
07ad6470 DM |
395 | // cleanup - remove stale files |
396 | tgt_store.cleanup_backup_dir(snapshot, &manifest)?; | |
397 | ||
398 | Ok(()) | |
399 | } | |
400 | ||
401 | pub async fn pull_snapshot_from( | |
402 | worker: &WorkerTask, | |
403 | reader: Arc<BackupReader>, | |
404 | tgt_store: Arc<DataStore>, | |
405 | snapshot: &BackupDir, | |
e2956c60 | 406 | downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, |
07ad6470 | 407 | ) -> Result<(), Error> { |
f23f7543 | 408 | let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?; |
07ad6470 DM |
409 | |
410 | if is_new { | |
411 | worker.log(format!("sync snapshot {:?}", snapshot.relative_path())); | |
412 | ||
e2956c60 FG |
413 | if let Err(err) = pull_snapshot( |
414 | worker, | |
415 | reader, | |
416 | tgt_store.clone(), | |
417 | &snapshot, | |
418 | downloaded_chunks, | |
419 | ) | |
420 | .await | |
421 | { | |
c9756b40 | 422 | if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) { |
07ad6470 DM |
423 | worker.log(format!("cleanup error - {}", cleanup_err)); |
424 | } | |
425 | return Err(err); | |
426 | } | |
4856a218 | 427 | worker.log(format!("sync snapshot {:?} done", snapshot.relative_path())); |
07ad6470 DM |
428 | } else { |
429 | worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path())); | |
e2956c60 FG |
430 | pull_snapshot( |
431 | worker, | |
432 | reader, | |
433 | tgt_store.clone(), | |
434 | &snapshot, | |
435 | downloaded_chunks, | |
436 | ) | |
437 | .await?; | |
438 | worker.log(format!( | |
439 | "re-sync snapshot {:?} done", | |
440 | snapshot.relative_path() | |
441 | )); | |
07ad6470 DM |
442 | } |
443 | ||
444 | Ok(()) | |
445 | } | |
446 | ||
d2354a16 DC |
447 | struct SkipInfo { |
448 | oldest: i64, | |
449 | newest: i64, | |
450 | count: u64, | |
451 | } | |
452 | ||
453 | impl SkipInfo { | |
454 | fn update(&mut self, backup_time: i64) { | |
455 | self.count += 1; | |
456 | ||
457 | if backup_time < self.oldest { | |
458 | self.oldest = backup_time; | |
459 | } | |
460 | ||
461 | if backup_time > self.newest { | |
462 | self.newest = backup_time; | |
463 | } | |
464 | } | |
465 | ||
466 | fn affected(&self) -> Result<String, Error> { | |
467 | match self.count { | |
468 | 0 => Ok(String::new()), | |
469 | 1 => proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest), | |
470 | _ => { | |
471 | Ok(format!( | |
472 | "{} .. {}", | |
473 | proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest)?, | |
474 | proxmox::tools::time::epoch_to_rfc3339_utc(self.newest)?, | |
475 | )) | |
476 | } | |
477 | } | |
478 | } | |
479 | } | |
480 | ||
481 | impl std::fmt::Display for SkipInfo { | |
482 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
483 | write!( | |
484 | f, | |
485 | "skipped: {} snapshot(s) ({}) older than the newest local snapshot", | |
486 | self.count, | |
487 | self.affected().map_err(|_| std::fmt::Error)? | |
488 | ) | |
489 | } | |
490 | } | |
491 | ||
07ad6470 DM |
492 | pub async fn pull_group( |
493 | worker: &WorkerTask, | |
494 | client: &HttpClient, | |
495 | src_repo: &BackupRepository, | |
496 | tgt_store: Arc<DataStore>, | |
497 | group: &BackupGroup, | |
498 | delete: bool, | |
fc8920e3 | 499 | progress: &mut StoreProgress, |
07ad6470 | 500 | ) -> Result<(), Error> { |
07ad6470 DM |
501 | let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store()); |
502 | ||
503 | let args = json!({ | |
504 | "backup-type": group.backup_type(), | |
505 | "backup-id": group.backup_id(), | |
506 | }); | |
507 | ||
508 | let mut result = client.get(&path, Some(args)).await?; | |
509 | let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?; | |
510 | ||
511 | list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time)); | |
512 | ||
0081903f DM |
513 | client.login().await?; // make sure auth is complete |
514 | ||
07ad6470 DM |
515 | let fingerprint = client.fingerprint(); |
516 | ||
517 | let last_sync = tgt_store.last_successful_backup(group)?; | |
518 | ||
519 | let mut remote_snapshots = std::collections::HashSet::new(); | |
520 | ||
ebbe4958 | 521 | // start with 16384 chunks (up to 65GB) |
e2956c60 | 522 | let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64))); |
ebbe4958 | 523 | |
fc8920e3 | 524 | progress.group_snapshots = list.len() as u64; |
7b8aa893 | 525 | |
d2354a16 DC |
526 | let mut skip_info = SkipInfo { |
527 | oldest: i64::MAX, | |
528 | newest: i64::MIN, | |
529 | count: 0, | |
530 | }; | |
531 | ||
7b8aa893 | 532 | for (pos, item) in list.into_iter().enumerate() { |
e0e5b442 | 533 | let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?; |
86f6f741 FG |
534 | |
535 | // in-progress backups can't be synced | |
54aec2fa | 536 | if item.size.is_none() { |
e2956c60 FG |
537 | worker.log(format!( |
538 | "skipping snapshot {} - in-progress backup", | |
539 | snapshot | |
540 | )); | |
86f6f741 FG |
541 | continue; |
542 | } | |
543 | ||
544 | let backup_time = snapshot.backup_time(); | |
545 | ||
07ad6470 DM |
546 | remote_snapshots.insert(backup_time); |
547 | ||
548 | if let Some(last_sync_time) = last_sync { | |
e2956c60 | 549 | if last_sync_time > backup_time { |
d2354a16 | 550 | skip_info.update(backup_time); |
e2956c60 FG |
551 | continue; |
552 | } | |
07ad6470 DM |
553 | } |
554 | ||
0081903f DM |
555 | // get updated auth_info (new tickets) |
556 | let auth_info = client.login().await?; | |
557 | ||
93e3581c | 558 | let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone()); |
07ad6470 | 559 | |
e2956c60 FG |
560 | let new_client = HttpClient::new( |
561 | src_repo.host(), | |
562 | src_repo.port(), | |
563 | src_repo.auth_id(), | |
564 | options, | |
565 | )?; | |
07ad6470 DM |
566 | |
567 | let reader = BackupReader::start( | |
568 | new_client, | |
569 | None, | |
570 | src_repo.store(), | |
86f6f741 FG |
571 | snapshot.group().backup_type(), |
572 | snapshot.group().backup_id(), | |
07ad6470 DM |
573 | backup_time, |
574 | true, | |
e2956c60 FG |
575 | ) |
576 | .await?; | |
07ad6470 | 577 | |
e2956c60 FG |
578 | let result = pull_snapshot_from( |
579 | worker, | |
580 | reader, | |
581 | tgt_store.clone(), | |
582 | &snapshot, | |
583 | downloaded_chunks.clone(), | |
584 | ) | |
585 | .await; | |
7b8aa893 | 586 | |
fc8920e3 | 587 | progress.done_snapshots = pos as u64 + 1; |
6eff8dec | 588 | worker.log(format!("percentage done: {}", progress)); |
7b8aa893 DM |
589 | |
590 | result?; // stop on error | |
07ad6470 DM |
591 | } |
592 | ||
593 | if delete { | |
594 | let local_list = group.list_backups(&tgt_store.base_path())?; | |
595 | for info in local_list { | |
596 | let backup_time = info.backup_dir.backup_time(); | |
e2956c60 FG |
597 | if remote_snapshots.contains(&backup_time) { |
598 | continue; | |
599 | } | |
600 | worker.log(format!( | |
601 | "delete vanished snapshot {:?}", | |
602 | info.backup_dir.relative_path() | |
603 | )); | |
c9756b40 | 604 | tgt_store.remove_backup_dir(&info.backup_dir, false)?; |
07ad6470 DM |
605 | } |
606 | } | |
607 | ||
d2354a16 DC |
608 | if skip_info.count > 0 { |
609 | task_log!(worker, "{}", skip_info); | |
610 | } | |
611 | ||
07ad6470 DM |
612 | Ok(()) |
613 | } | |
614 | ||
615 | pub async fn pull_store( | |
616 | worker: &WorkerTask, | |
617 | client: &HttpClient, | |
618 | src_repo: &BackupRepository, | |
619 | tgt_store: Arc<DataStore>, | |
620 | delete: bool, | |
e6dc35ac | 621 | auth_id: Authid, |
07ad6470 | 622 | ) -> Result<(), Error> { |
07ad6470 DM |
623 | // explicit create shared lock to prevent GC on newly created chunks |
624 | let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?; | |
625 | ||
626 | let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store()); | |
627 | ||
44de5bcc FG |
628 | let mut result = client |
629 | .get(&path, None) | |
630 | .await | |
631 | .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?; | |
07ad6470 DM |
632 | |
633 | let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?; | |
634 | ||
f6e28f4e DC |
635 | worker.log(format!("found {} groups to sync", list.len())); |
636 | ||
07ad6470 DM |
637 | list.sort_unstable_by(|a, b| { |
638 | let type_order = a.backup_type.cmp(&b.backup_type); | |
639 | if type_order == std::cmp::Ordering::Equal { | |
640 | a.backup_id.cmp(&b.backup_id) | |
641 | } else { | |
642 | type_order | |
643 | } | |
644 | }); | |
645 | ||
646 | let mut errors = false; | |
647 | ||
648 | let mut new_groups = std::collections::HashSet::new(); | |
649 | for item in list.iter() { | |
650 | new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id)); | |
651 | } | |
652 | ||
fc8920e3 FG |
653 | let mut progress = StoreProgress::new(list.len() as u64); |
654 | ||
655 | for (done, item) in list.into_iter().enumerate() { | |
656 | progress.done_groups = done as u64; | |
657 | progress.done_snapshots = 0; | |
658 | progress.group_snapshots = 0; | |
7b8aa893 | 659 | |
07ad6470 DM |
660 | let group = BackupGroup::new(&item.backup_type, &item.backup_id); |
661 | ||
30f73fa2 DM |
662 | let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) { |
663 | Ok(result) => result, | |
664 | Err(err) => { | |
e2956c60 FG |
665 | worker.log(format!( |
666 | "sync group {}/{} failed - group lock failed: {}", | |
667 | item.backup_type, item.backup_id, err | |
668 | )); | |
30f73fa2 DM |
669 | errors = true; // do not stop here, instead continue |
670 | continue; | |
671 | } | |
672 | }; | |
673 | ||
07ad6470 | 674 | // permission check |
e2956c60 FG |
675 | if auth_id != owner { |
676 | // only the owner is allowed to create additional snapshots | |
677 | worker.log(format!( | |
678 | "sync group {}/{} failed - owner check failed ({} != {})", | |
679 | item.backup_type, item.backup_id, auth_id, owner | |
680 | )); | |
7b8aa893 | 681 | errors = true; // do not stop here, instead continue |
20813274 WB |
682 | } else if let Err(err) = pull_group( |
683 | worker, | |
684 | client, | |
685 | src_repo, | |
686 | tgt_store.clone(), | |
687 | &group, | |
688 | delete, | |
fc8920e3 | 689 | &mut progress, |
e2956c60 FG |
690 | ) |
691 | .await | |
692 | { | |
20813274 WB |
693 | worker.log(format!( |
694 | "sync group {}/{} failed - {}", | |
e2956c60 | 695 | item.backup_type, item.backup_id, err, |
20813274 WB |
696 | )); |
697 | errors = true; // do not stop here, instead continue | |
07ad6470 DM |
698 | } |
699 | } | |
700 | ||
701 | if delete { | |
702 | let result: Result<(), Error> = proxmox::try_block!({ | |
7f3b0f67 | 703 | let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?; |
07ad6470 | 704 | for local_group in local_groups { |
e2956c60 FG |
705 | if new_groups.contains(&local_group) { |
706 | continue; | |
707 | } | |
708 | worker.log(format!( | |
709 | "delete vanished group '{}/{}'", | |
710 | local_group.backup_type(), | |
711 | local_group.backup_id() | |
712 | )); | |
07ad6470 DM |
713 | if let Err(err) = tgt_store.remove_backup_group(&local_group) { |
714 | worker.log(err.to_string()); | |
715 | errors = true; | |
716 | } | |
717 | } | |
718 | Ok(()) | |
719 | }); | |
720 | if let Err(err) = result { | |
721 | worker.log(format!("error during cleanup: {}", err)); | |
722 | errors = true; | |
723 | }; | |
724 | } | |
725 | ||
726 | if errors { | |
727 | bail!("sync failed with some errors."); | |
728 | } | |
729 | ||
730 | Ok(()) | |
731 | } |