]>
Commit | Line | Data |
---|---|---|
07ad6470 DM |
1 | //! Sync datastore from remote server |
2 | ||
e2956c60 | 3 | use std::collections::{HashMap, HashSet}; |
07ad6470 | 4 | use std::convert::TryFrom; |
07ad6470 | 5 | use std::io::{Seek, SeekFrom}; |
54417086 | 6 | use std::sync::atomic::{AtomicUsize, Ordering}; |
e2956c60 FG |
7 | use std::sync::{Arc, Mutex}; |
8 | use std::time::SystemTime; | |
07ad6470 | 9 | |
c23192d3 WB |
10 | use anyhow::{bail, format_err, Error}; |
11 | use serde_json::json; | |
12 | ||
13 | use proxmox::api::error::{HttpError, StatusCode}; | |
14 | ||
15 | use pbs_datastore::task_log; | |
16 | ||
1bc1d81a | 17 | use crate::{ |
1bc1d81a | 18 | api2::types::*, |
e2956c60 | 19 | backup::*, |
1bc1d81a | 20 | client::*, |
e2956c60 FG |
21 | server::WorkerTask, |
22 | tools::{compute_file_csum, ParallelHandler}, | |
1bc1d81a | 23 | }; |
07ad6470 DM |
24 | |
25 | // fixme: implement filters | |
26 | // fixme: delete vanished groups | |
27 | // Todo: correctly lock backup groups | |
28 | ||
29 | async fn pull_index_chunks<I: IndexFile>( | |
998db639 | 30 | worker: &WorkerTask, |
73b2cc49 | 31 | chunk_reader: RemoteChunkReader, |
07ad6470 DM |
32 | target: Arc<DataStore>, |
33 | index: I, | |
e2956c60 | 34 | downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, |
07ad6470 | 35 | ) -> Result<(), Error> { |
73b2cc49 | 36 | use futures::stream::{self, StreamExt, TryStreamExt}; |
07ad6470 | 37 | |
998db639 DM |
38 | let start_time = SystemTime::now(); |
39 | ||
ebbe4958 DM |
40 | let stream = stream::iter( |
41 | (0..index.index_count()) | |
42 | .map(|pos| index.chunk_info(pos).unwrap()) | |
43 | .filter(|info| { | |
44 | let mut guard = downloaded_chunks.lock().unwrap(); | |
45 | let done = guard.contains(&info.digest); | |
46 | if !done { | |
47 | // Note: We mark a chunk as downloaded before its actually downloaded | |
48 | // to avoid duplicate downloads. | |
49 | guard.insert(info.digest); | |
50 | } | |
51 | !done | |
e2956c60 | 52 | }), |
ebbe4958 | 53 | ); |
07ad6470 | 54 | |
a71bc08f | 55 | let target2 = target.clone(); |
54417086 | 56 | let verify_pool = ParallelHandler::new( |
e2956c60 FG |
57 | "sync chunk writer", |
58 | 4, | |
59 | move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| { | |
54417086 DM |
60 | // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest)); |
61 | chunk.verify_unencrypted(size as usize, &digest)?; | |
a71bc08f | 62 | target2.insert_chunk(&chunk, &digest)?; |
54417086 | 63 | Ok(()) |
e2956c60 | 64 | }, |
54417086 DM |
65 | ); |
66 | ||
67 | let verify_and_write_channel = verify_pool.channel(); | |
998db639 | 68 | |
54417086 DM |
69 | let bytes = Arc::new(AtomicUsize::new(0)); |
70 | ||
71 | stream | |
73b2cc49 | 72 | .map(|info| { |
73b2cc49 DM |
73 | let target = Arc::clone(&target); |
74 | let chunk_reader = chunk_reader.clone(); | |
54417086 DM |
75 | let bytes = Arc::clone(&bytes); |
76 | let verify_and_write_channel = verify_and_write_channel.clone(); | |
73b2cc49 DM |
77 | |
78 | Ok::<_, Error>(async move { | |
d420962f | 79 | let chunk_exists = pbs_runtime::block_in_place(|| { |
e2956c60 FG |
80 | target.cond_touch_chunk(&info.digest, false) |
81 | })?; | |
73b2cc49 DM |
82 | if chunk_exists { |
83 | //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest))); | |
84 | return Ok::<_, Error>(()); | |
85 | } | |
86 | //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest))); | |
87 | let chunk = chunk_reader.read_raw_chunk(&info.digest).await?; | |
54417086 | 88 | let raw_size = chunk.raw_size() as usize; |
73b2cc49 | 89 | |
998db639 | 90 | // decode, verify and write in a separate threads to maximize throughput |
d420962f | 91 | pbs_runtime::block_in_place(|| { |
e2956c60 FG |
92 | verify_and_write_channel.send((chunk, info.digest, info.size())) |
93 | })?; | |
54417086 DM |
94 | |
95 | bytes.fetch_add(raw_size, Ordering::SeqCst); | |
998db639 DM |
96 | |
97 | Ok(()) | |
e2956c60 | 98 | }) |
73b2cc49 DM |
99 | }) |
100 | .try_buffer_unordered(20) | |
101 | .try_for_each(|_res| futures::future::ok(())) | |
54417086 | 102 | .await?; |
998db639 | 103 | |
54417086 | 104 | drop(verify_and_write_channel); |
998db639 | 105 | |
54417086 | 106 | verify_pool.complete()?; |
998db639 DM |
107 | |
108 | let elapsed = start_time.elapsed()?.as_secs_f64(); | |
109 | ||
54417086 DM |
110 | let bytes = bytes.load(Ordering::SeqCst); |
111 | ||
e2956c60 FG |
112 | worker.log(format!( |
113 | "downloaded {} bytes ({:.2} MiB/s)", | |
114 | bytes, | |
115 | (bytes as f64) / (1024.0 * 1024.0 * elapsed) | |
116 | )); | |
07ad6470 DM |
117 | |
118 | Ok(()) | |
119 | } | |
120 | ||
121 | async fn download_manifest( | |
122 | reader: &BackupReader, | |
123 | filename: &std::path::Path, | |
124 | ) -> Result<std::fs::File, Error> { | |
3d571d55 | 125 | let mut tmp_manifest_file = std::fs::OpenOptions::new() |
07ad6470 DM |
126 | .write(true) |
127 | .create(true) | |
194da6f8 | 128 | .truncate(true) |
07ad6470 DM |
129 | .read(true) |
130 | .open(&filename)?; | |
131 | ||
e2956c60 FG |
132 | reader |
133 | .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file) | |
134 | .await?; | |
07ad6470 DM |
135 | |
136 | tmp_manifest_file.seek(SeekFrom::Start(0))?; | |
137 | ||
138 | Ok(tmp_manifest_file) | |
139 | } | |
140 | ||
e2956c60 | 141 | fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> { |
2ce15934 | 142 | if size != info.size { |
e2956c60 FG |
143 | bail!( |
144 | "wrong size for file '{}' ({} != {})", | |
145 | info.filename, | |
146 | info.size, | |
147 | size | |
148 | ); | |
2ce15934 FG |
149 | } |
150 | ||
151 | if csum != &info.csum { | |
152 | bail!("wrong checksum for file '{}'", info.filename); | |
153 | } | |
154 | ||
155 | Ok(()) | |
156 | } | |
157 | ||
07ad6470 DM |
158 | async fn pull_single_archive( |
159 | worker: &WorkerTask, | |
160 | reader: &BackupReader, | |
161 | chunk_reader: &mut RemoteChunkReader, | |
162 | tgt_store: Arc<DataStore>, | |
163 | snapshot: &BackupDir, | |
2ce15934 | 164 | archive_info: &FileInfo, |
e2956c60 | 165 | downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, |
07ad6470 | 166 | ) -> Result<(), Error> { |
2ce15934 | 167 | let archive_name = &archive_info.filename; |
07ad6470 DM |
168 | let mut path = tgt_store.base_path(); |
169 | path.push(snapshot.relative_path()); | |
170 | path.push(archive_name); | |
171 | ||
172 | let mut tmp_path = path.clone(); | |
173 | tmp_path.set_extension("tmp"); | |
174 | ||
175 | worker.log(format!("sync archive {}", archive_name)); | |
3d571d55 | 176 | let mut tmpfile = std::fs::OpenOptions::new() |
07ad6470 DM |
177 | .write(true) |
178 | .create(true) | |
179 | .read(true) | |
180 | .open(&tmp_path)?; | |
181 | ||
3d571d55 | 182 | reader.download(archive_name, &mut tmpfile).await?; |
07ad6470 DM |
183 | |
184 | match archive_type(archive_name)? { | |
185 | ArchiveType::DynamicIndex => { | |
e2956c60 FG |
186 | let index = DynamicIndexReader::new(tmpfile).map_err(|err| { |
187 | format_err!("unable to read dynamic index {:?} - {}", tmp_path, err) | |
188 | })?; | |
2ce15934 FG |
189 | let (csum, size) = index.compute_csum(); |
190 | verify_archive(archive_info, &csum, size)?; | |
07ad6470 | 191 | |
e2956c60 FG |
192 | pull_index_chunks( |
193 | worker, | |
194 | chunk_reader.clone(), | |
195 | tgt_store.clone(), | |
196 | index, | |
197 | downloaded_chunks, | |
198 | ) | |
199 | .await?; | |
07ad6470 DM |
200 | } |
201 | ArchiveType::FixedIndex => { | |
e2956c60 FG |
202 | let index = FixedIndexReader::new(tmpfile).map_err(|err| { |
203 | format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err) | |
204 | })?; | |
2ce15934 FG |
205 | let (csum, size) = index.compute_csum(); |
206 | verify_archive(archive_info, &csum, size)?; | |
07ad6470 | 207 | |
e2956c60 FG |
208 | pull_index_chunks( |
209 | worker, | |
210 | chunk_reader.clone(), | |
211 | tgt_store.clone(), | |
212 | index, | |
213 | downloaded_chunks, | |
214 | ) | |
215 | .await?; | |
07ad6470 | 216 | } |
2ce15934 FG |
217 | ArchiveType::Blob => { |
218 | let (csum, size) = compute_file_csum(&mut tmpfile)?; | |
219 | verify_archive(archive_info, &csum, size)?; | |
220 | } | |
07ad6470 DM |
221 | } |
222 | if let Err(err) = std::fs::rename(&tmp_path, &path) { | |
223 | bail!("Atomic rename file {:?} failed - {}", path, err); | |
224 | } | |
225 | Ok(()) | |
226 | } | |
227 | ||
1610c45a DM |
228 | // Note: The client.log.blob is uploaded after the backup, so it is |
229 | // not mentioned in the manifest. | |
230 | async fn try_client_log_download( | |
231 | worker: &WorkerTask, | |
232 | reader: Arc<BackupReader>, | |
233 | path: &std::path::Path, | |
234 | ) -> Result<(), Error> { | |
1610c45a DM |
235 | let mut tmp_path = path.to_owned(); |
236 | tmp_path.set_extension("tmp"); | |
237 | ||
238 | let tmpfile = std::fs::OpenOptions::new() | |
239 | .write(true) | |
240 | .create(true) | |
241 | .read(true) | |
242 | .open(&tmp_path)?; | |
243 | ||
add5861e | 244 | // Note: be silent if there is no log - only log successful download |
3d571d55 | 245 | if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await { |
1610c45a DM |
246 | if let Err(err) = std::fs::rename(&tmp_path, &path) { |
247 | bail!("Atomic rename file {:?} failed - {}", path, err); | |
248 | } | |
add5861e | 249 | worker.log(format!("got backup log file {:?}", CLIENT_LOG_BLOB_NAME)); |
1610c45a DM |
250 | } |
251 | ||
252 | Ok(()) | |
253 | } | |
254 | ||
07ad6470 DM |
255 | async fn pull_snapshot( |
256 | worker: &WorkerTask, | |
257 | reader: Arc<BackupReader>, | |
258 | tgt_store: Arc<DataStore>, | |
259 | snapshot: &BackupDir, | |
e2956c60 | 260 | downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, |
07ad6470 | 261 | ) -> Result<(), Error> { |
07ad6470 DM |
262 | let mut manifest_name = tgt_store.base_path(); |
263 | manifest_name.push(snapshot.relative_path()); | |
264 | manifest_name.push(MANIFEST_BLOB_NAME); | |
265 | ||
1610c45a DM |
266 | let mut client_log_name = tgt_store.base_path(); |
267 | client_log_name.push(snapshot.relative_path()); | |
268 | client_log_name.push(CLIENT_LOG_BLOB_NAME); | |
269 | ||
07ad6470 DM |
270 | let mut tmp_manifest_name = manifest_name.clone(); |
271 | tmp_manifest_name.set_extension("tmp"); | |
272 | ||
c1c4a18f FG |
273 | let download_res = download_manifest(&reader, &tmp_manifest_name).await; |
274 | let mut tmp_manifest_file = match download_res { | |
275 | Ok(manifest_file) => manifest_file, | |
276 | Err(err) => { | |
277 | match err.downcast_ref::<HttpError>() { | |
e2956c60 FG |
278 | Some(HttpError { code, message }) => match *code { |
279 | StatusCode::NOT_FOUND => { | |
280 | worker.log(format!( | |
281 | "skipping snapshot {} - vanished since start of sync", | |
282 | snapshot | |
283 | )); | |
284 | return Ok(()); | |
285 | } | |
286 | _ => { | |
287 | bail!("HTTP error {} - {}", code, message); | |
c1c4a18f FG |
288 | } |
289 | }, | |
290 | None => { | |
291 | return Err(err); | |
e2956c60 | 292 | } |
c1c4a18f | 293 | }; |
e2956c60 | 294 | } |
c1c4a18f | 295 | }; |
39f18b30 | 296 | let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?; |
07ad6470 DM |
297 | |
298 | if manifest_name.exists() { | |
299 | let manifest_blob = proxmox::try_block!({ | |
e2956c60 FG |
300 | let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| { |
301 | format_err!( | |
302 | "unable to open local manifest {:?} - {}", | |
303 | manifest_name, | |
304 | err | |
305 | ) | |
306 | })?; | |
07ad6470 | 307 | |
39f18b30 | 308 | let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?; |
07ad6470 | 309 | Ok(manifest_blob) |
e2956c60 FG |
310 | }) |
311 | .map_err(|err: Error| { | |
312 | format_err!( | |
313 | "unable to read local manifest {:?} - {}", | |
314 | manifest_name, | |
315 | err | |
316 | ) | |
07ad6470 DM |
317 | })?; |
318 | ||
319 | if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { | |
1610c45a DM |
320 | if !client_log_name.exists() { |
321 | try_client_log_download(worker, reader, &client_log_name).await?; | |
322 | } | |
323 | worker.log("no data changes"); | |
e0085e66 | 324 | let _ = std::fs::remove_file(&tmp_manifest_name); |
07ad6470 DM |
325 | return Ok(()); // nothing changed |
326 | } | |
327 | } | |
328 | ||
329 | let manifest = BackupManifest::try_from(tmp_manifest_blob)?; | |
330 | ||
07ad6470 DM |
331 | for item in manifest.files() { |
332 | let mut path = tgt_store.base_path(); | |
333 | path.push(snapshot.relative_path()); | |
334 | path.push(&item.filename); | |
335 | ||
336 | if path.exists() { | |
337 | match archive_type(&item.filename)? { | |
338 | ArchiveType::DynamicIndex => { | |
339 | let index = DynamicIndexReader::open(&path)?; | |
340 | let (csum, size) = index.compute_csum(); | |
341 | match manifest.verify_file(&item.filename, &csum, size) { | |
342 | Ok(_) => continue, | |
343 | Err(err) => { | |
344 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
345 | } | |
346 | } | |
347 | } | |
348 | ArchiveType::FixedIndex => { | |
349 | let index = FixedIndexReader::open(&path)?; | |
350 | let (csum, size) = index.compute_csum(); | |
351 | match manifest.verify_file(&item.filename, &csum, size) { | |
352 | Ok(_) => continue, | |
353 | Err(err) => { | |
354 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
355 | } | |
356 | } | |
357 | } | |
358 | ArchiveType::Blob => { | |
359 | let mut tmpfile = std::fs::File::open(&path)?; | |
360 | let (csum, size) = compute_file_csum(&mut tmpfile)?; | |
361 | match manifest.verify_file(&item.filename, &csum, size) { | |
362 | Ok(_) => continue, | |
363 | Err(err) => { | |
364 | worker.log(format!("detected changed file {:?} - {}", path, err)); | |
365 | } | |
366 | } | |
367 | } | |
368 | } | |
369 | } | |
370 | ||
e2956c60 FG |
371 | let mut chunk_reader = RemoteChunkReader::new( |
372 | reader.clone(), | |
373 | None, | |
374 | item.chunk_crypt_mode(), | |
375 | HashMap::new(), | |
376 | ); | |
14f6c9cb | 377 | |
07ad6470 DM |
378 | pull_single_archive( |
379 | worker, | |
380 | &reader, | |
381 | &mut chunk_reader, | |
382 | tgt_store.clone(), | |
383 | snapshot, | |
2ce15934 | 384 | &item, |
ebbe4958 | 385 | downloaded_chunks.clone(), |
e2956c60 FG |
386 | ) |
387 | .await?; | |
07ad6470 DM |
388 | } |
389 | ||
390 | if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) { | |
391 | bail!("Atomic rename file {:?} failed - {}", manifest_name, err); | |
392 | } | |
393 | ||
1610c45a DM |
394 | if !client_log_name.exists() { |
395 | try_client_log_download(worker, reader, &client_log_name).await?; | |
396 | } | |
397 | ||
07ad6470 DM |
398 | // cleanup - remove stale files |
399 | tgt_store.cleanup_backup_dir(snapshot, &manifest)?; | |
400 | ||
401 | Ok(()) | |
402 | } | |
403 | ||
404 | pub async fn pull_snapshot_from( | |
405 | worker: &WorkerTask, | |
406 | reader: Arc<BackupReader>, | |
407 | tgt_store: Arc<DataStore>, | |
408 | snapshot: &BackupDir, | |
e2956c60 | 409 | downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, |
07ad6470 | 410 | ) -> Result<(), Error> { |
f23f7543 | 411 | let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?; |
07ad6470 DM |
412 | |
413 | if is_new { | |
414 | worker.log(format!("sync snapshot {:?}", snapshot.relative_path())); | |
415 | ||
e2956c60 FG |
416 | if let Err(err) = pull_snapshot( |
417 | worker, | |
418 | reader, | |
419 | tgt_store.clone(), | |
420 | &snapshot, | |
421 | downloaded_chunks, | |
422 | ) | |
423 | .await | |
424 | { | |
c9756b40 | 425 | if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) { |
07ad6470 DM |
426 | worker.log(format!("cleanup error - {}", cleanup_err)); |
427 | } | |
428 | return Err(err); | |
429 | } | |
4856a218 | 430 | worker.log(format!("sync snapshot {:?} done", snapshot.relative_path())); |
07ad6470 DM |
431 | } else { |
432 | worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path())); | |
e2956c60 FG |
433 | pull_snapshot( |
434 | worker, | |
435 | reader, | |
436 | tgt_store.clone(), | |
437 | &snapshot, | |
438 | downloaded_chunks, | |
439 | ) | |
440 | .await?; | |
441 | worker.log(format!( | |
442 | "re-sync snapshot {:?} done", | |
443 | snapshot.relative_path() | |
444 | )); | |
07ad6470 DM |
445 | } |
446 | ||
447 | Ok(()) | |
448 | } | |
449 | ||
d2354a16 DC |
450 | struct SkipInfo { |
451 | oldest: i64, | |
452 | newest: i64, | |
453 | count: u64, | |
454 | } | |
455 | ||
456 | impl SkipInfo { | |
457 | fn update(&mut self, backup_time: i64) { | |
458 | self.count += 1; | |
459 | ||
460 | if backup_time < self.oldest { | |
461 | self.oldest = backup_time; | |
462 | } | |
463 | ||
464 | if backup_time > self.newest { | |
465 | self.newest = backup_time; | |
466 | } | |
467 | } | |
468 | ||
469 | fn affected(&self) -> Result<String, Error> { | |
470 | match self.count { | |
471 | 0 => Ok(String::new()), | |
472 | 1 => proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest), | |
473 | _ => { | |
474 | Ok(format!( | |
475 | "{} .. {}", | |
476 | proxmox::tools::time::epoch_to_rfc3339_utc(self.oldest)?, | |
477 | proxmox::tools::time::epoch_to_rfc3339_utc(self.newest)?, | |
478 | )) | |
479 | } | |
480 | } | |
481 | } | |
482 | } | |
483 | ||
484 | impl std::fmt::Display for SkipInfo { | |
485 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
486 | write!( | |
487 | f, | |
488 | "skipped: {} snapshot(s) ({}) older than the newest local snapshot", | |
489 | self.count, | |
490 | self.affected().map_err(|_| std::fmt::Error)? | |
491 | ) | |
492 | } | |
493 | } | |
494 | ||
07ad6470 DM |
495 | pub async fn pull_group( |
496 | worker: &WorkerTask, | |
497 | client: &HttpClient, | |
498 | src_repo: &BackupRepository, | |
499 | tgt_store: Arc<DataStore>, | |
500 | group: &BackupGroup, | |
501 | delete: bool, | |
fc8920e3 | 502 | progress: &mut StoreProgress, |
07ad6470 | 503 | ) -> Result<(), Error> { |
07ad6470 DM |
504 | let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store()); |
505 | ||
506 | let args = json!({ | |
507 | "backup-type": group.backup_type(), | |
508 | "backup-id": group.backup_id(), | |
509 | }); | |
510 | ||
511 | let mut result = client.get(&path, Some(args)).await?; | |
512 | let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?; | |
513 | ||
514 | list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time)); | |
515 | ||
0081903f DM |
516 | client.login().await?; // make sure auth is complete |
517 | ||
07ad6470 DM |
518 | let fingerprint = client.fingerprint(); |
519 | ||
520 | let last_sync = tgt_store.last_successful_backup(group)?; | |
521 | ||
522 | let mut remote_snapshots = std::collections::HashSet::new(); | |
523 | ||
ebbe4958 | 524 | // start with 16384 chunks (up to 65GB) |
e2956c60 | 525 | let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64))); |
ebbe4958 | 526 | |
fc8920e3 | 527 | progress.group_snapshots = list.len() as u64; |
7b8aa893 | 528 | |
d2354a16 DC |
529 | let mut skip_info = SkipInfo { |
530 | oldest: i64::MAX, | |
531 | newest: i64::MIN, | |
532 | count: 0, | |
533 | }; | |
534 | ||
7b8aa893 | 535 | for (pos, item) in list.into_iter().enumerate() { |
e0e5b442 | 536 | let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?; |
86f6f741 FG |
537 | |
538 | // in-progress backups can't be synced | |
54aec2fa | 539 | if item.size.is_none() { |
e2956c60 FG |
540 | worker.log(format!( |
541 | "skipping snapshot {} - in-progress backup", | |
542 | snapshot | |
543 | )); | |
86f6f741 FG |
544 | continue; |
545 | } | |
546 | ||
547 | let backup_time = snapshot.backup_time(); | |
548 | ||
07ad6470 DM |
549 | remote_snapshots.insert(backup_time); |
550 | ||
551 | if let Some(last_sync_time) = last_sync { | |
e2956c60 | 552 | if last_sync_time > backup_time { |
d2354a16 | 553 | skip_info.update(backup_time); |
e2956c60 FG |
554 | continue; |
555 | } | |
07ad6470 DM |
556 | } |
557 | ||
0081903f DM |
558 | // get updated auth_info (new tickets) |
559 | let auth_info = client.login().await?; | |
560 | ||
93e3581c | 561 | let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone()); |
07ad6470 | 562 | |
e2956c60 FG |
563 | let new_client = HttpClient::new( |
564 | src_repo.host(), | |
565 | src_repo.port(), | |
566 | src_repo.auth_id(), | |
567 | options, | |
568 | )?; | |
07ad6470 DM |
569 | |
570 | let reader = BackupReader::start( | |
571 | new_client, | |
572 | None, | |
573 | src_repo.store(), | |
86f6f741 FG |
574 | snapshot.group().backup_type(), |
575 | snapshot.group().backup_id(), | |
07ad6470 DM |
576 | backup_time, |
577 | true, | |
e2956c60 FG |
578 | ) |
579 | .await?; | |
07ad6470 | 580 | |
e2956c60 FG |
581 | let result = pull_snapshot_from( |
582 | worker, | |
583 | reader, | |
584 | tgt_store.clone(), | |
585 | &snapshot, | |
586 | downloaded_chunks.clone(), | |
587 | ) | |
588 | .await; | |
7b8aa893 | 589 | |
fc8920e3 | 590 | progress.done_snapshots = pos as u64 + 1; |
6eff8dec | 591 | worker.log(format!("percentage done: {}", progress)); |
7b8aa893 DM |
592 | |
593 | result?; // stop on error | |
07ad6470 DM |
594 | } |
595 | ||
596 | if delete { | |
597 | let local_list = group.list_backups(&tgt_store.base_path())?; | |
598 | for info in local_list { | |
599 | let backup_time = info.backup_dir.backup_time(); | |
e2956c60 FG |
600 | if remote_snapshots.contains(&backup_time) { |
601 | continue; | |
602 | } | |
603 | worker.log(format!( | |
604 | "delete vanished snapshot {:?}", | |
605 | info.backup_dir.relative_path() | |
606 | )); | |
c9756b40 | 607 | tgt_store.remove_backup_dir(&info.backup_dir, false)?; |
07ad6470 DM |
608 | } |
609 | } | |
610 | ||
d2354a16 DC |
611 | if skip_info.count > 0 { |
612 | task_log!(worker, "{}", skip_info); | |
613 | } | |
614 | ||
07ad6470 DM |
615 | Ok(()) |
616 | } | |
617 | ||
618 | pub async fn pull_store( | |
619 | worker: &WorkerTask, | |
620 | client: &HttpClient, | |
621 | src_repo: &BackupRepository, | |
622 | tgt_store: Arc<DataStore>, | |
623 | delete: bool, | |
e6dc35ac | 624 | auth_id: Authid, |
07ad6470 | 625 | ) -> Result<(), Error> { |
07ad6470 DM |
626 | // explicit create shared lock to prevent GC on newly created chunks |
627 | let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?; | |
628 | ||
629 | let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store()); | |
630 | ||
44de5bcc FG |
631 | let mut result = client |
632 | .get(&path, None) | |
633 | .await | |
634 | .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?; | |
07ad6470 DM |
635 | |
636 | let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?; | |
637 | ||
f6e28f4e DC |
638 | worker.log(format!("found {} groups to sync", list.len())); |
639 | ||
07ad6470 DM |
640 | list.sort_unstable_by(|a, b| { |
641 | let type_order = a.backup_type.cmp(&b.backup_type); | |
642 | if type_order == std::cmp::Ordering::Equal { | |
643 | a.backup_id.cmp(&b.backup_id) | |
644 | } else { | |
645 | type_order | |
646 | } | |
647 | }); | |
648 | ||
649 | let mut errors = false; | |
650 | ||
651 | let mut new_groups = std::collections::HashSet::new(); | |
652 | for item in list.iter() { | |
653 | new_groups.insert(BackupGroup::new(&item.backup_type, &item.backup_id)); | |
654 | } | |
655 | ||
fc8920e3 FG |
656 | let mut progress = StoreProgress::new(list.len() as u64); |
657 | ||
658 | for (done, item) in list.into_iter().enumerate() { | |
659 | progress.done_groups = done as u64; | |
660 | progress.done_snapshots = 0; | |
661 | progress.group_snapshots = 0; | |
7b8aa893 | 662 | |
07ad6470 DM |
663 | let group = BackupGroup::new(&item.backup_type, &item.backup_id); |
664 | ||
30f73fa2 DM |
665 | let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) { |
666 | Ok(result) => result, | |
667 | Err(err) => { | |
e2956c60 FG |
668 | worker.log(format!( |
669 | "sync group {}/{} failed - group lock failed: {}", | |
670 | item.backup_type, item.backup_id, err | |
671 | )); | |
30f73fa2 DM |
672 | errors = true; // do not stop here, instead continue |
673 | continue; | |
674 | } | |
675 | }; | |
676 | ||
07ad6470 | 677 | // permission check |
e2956c60 FG |
678 | if auth_id != owner { |
679 | // only the owner is allowed to create additional snapshots | |
680 | worker.log(format!( | |
681 | "sync group {}/{} failed - owner check failed ({} != {})", | |
682 | item.backup_type, item.backup_id, auth_id, owner | |
683 | )); | |
7b8aa893 | 684 | errors = true; // do not stop here, instead continue |
20813274 WB |
685 | } else if let Err(err) = pull_group( |
686 | worker, | |
687 | client, | |
688 | src_repo, | |
689 | tgt_store.clone(), | |
690 | &group, | |
691 | delete, | |
fc8920e3 | 692 | &mut progress, |
e2956c60 FG |
693 | ) |
694 | .await | |
695 | { | |
20813274 WB |
696 | worker.log(format!( |
697 | "sync group {}/{} failed - {}", | |
e2956c60 | 698 | item.backup_type, item.backup_id, err, |
20813274 WB |
699 | )); |
700 | errors = true; // do not stop here, instead continue | |
07ad6470 DM |
701 | } |
702 | } | |
703 | ||
704 | if delete { | |
705 | let result: Result<(), Error> = proxmox::try_block!({ | |
7f3b0f67 | 706 | let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?; |
07ad6470 | 707 | for local_group in local_groups { |
e2956c60 FG |
708 | if new_groups.contains(&local_group) { |
709 | continue; | |
710 | } | |
711 | worker.log(format!( | |
712 | "delete vanished group '{}/{}'", | |
713 | local_group.backup_type(), | |
714 | local_group.backup_id() | |
715 | )); | |
07ad6470 DM |
716 | if let Err(err) = tgt_store.remove_backup_group(&local_group) { |
717 | worker.log(err.to_string()); | |
718 | errors = true; | |
719 | } | |
720 | } | |
721 | Ok(()) | |
722 | }); | |
723 | if let Err(err) = result { | |
724 | worker.log(format!("error during cleanup: {}", err)); | |
725 | errors = true; | |
726 | }; | |
727 | } | |
728 | ||
729 | if errors { | |
730 | bail!("sync failed with some errors."); | |
731 | } | |
732 | ||
733 | Ok(()) | |
734 | } |