]>
Commit | Line | Data |
---|---|---|
07ad6470 DM |
1 | //! Sync datastore from remote server |
2 | ||
e2956c60 | 3 | use std::collections::{HashMap, HashSet}; |
07ad6470 | 4 | use std::convert::TryFrom; |
07ad6470 | 5 | use std::io::{Seek, SeekFrom}; |
54417086 | 6 | use std::sync::atomic::{AtomicUsize, Ordering}; |
e2956c60 FG |
7 | use std::sync::{Arc, Mutex}; |
8 | use std::time::SystemTime; | |
07ad6470 | 9 | |
c23192d3 WB |
10 | use anyhow::{bail, format_err, Error}; |
11 | use serde_json::json; | |
6ef1b649 | 12 | use http::StatusCode; |
c23192d3 | 13 | |
6ef1b649 | 14 | use proxmox_router::HttpError; |
d5790a9f | 15 | use proxmox_sys::task_log; |
c23192d3 | 16 | |
71e53463 FG |
17 | use pbs_api_types::{Authid, GroupFilter, GroupListItem, Remote, SnapshotListItem}; |
18 | ||
19 | use pbs_datastore::{BackupDir, BackupInfo, BackupGroup, DataStore, StoreProgress}; | |
ea584a75 WB |
20 | use pbs_datastore::data_blob::DataBlob; |
21 | use pbs_datastore::dynamic_index::DynamicIndexReader; | |
22 | use pbs_datastore::fixed_index::FixedIndexReader; | |
23 | use pbs_datastore::index::IndexFile; | |
24 | use pbs_datastore::manifest::{ | |
25 | CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, ArchiveType, BackupManifest, FileInfo, archive_type | |
26 | }; | |
ba0ccc59 | 27 | use pbs_tools::sha::sha256; |
2b7f8dd5 | 28 | use pbs_client::{BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader}; |
b9700a9f | 29 | use proxmox_rest_server::WorkerTask; |
c23192d3 | 30 | |
6d5d305d | 31 | use crate::tools::ParallelHandler; |
07ad6470 DM |
32 | |
33 | // fixme: implement filters | |
34 | // fixme: delete vanished groups | |
35 | // Todo: correctly lock backup groups | |
36 | ||
6e9e6c7a FG |
37 | pub struct PullParameters { |
38 | remote: Remote, | |
39 | source: BackupRepository, | |
40 | store: Arc<DataStore>, | |
41 | owner: Authid, | |
42 | remove_vanished: bool, | |
71e53463 | 43 | group_filter: Option<Vec<GroupFilter>>, |
6e9e6c7a FG |
44 | } |
45 | ||
46 | impl PullParameters { | |
47 | pub fn new( | |
48 | store: &str, | |
49 | remote: &str, | |
50 | remote_store: &str, | |
51 | owner: Authid, | |
52 | remove_vanished: Option<bool>, | |
71e53463 | 53 | group_filter: Option<Vec<GroupFilter>>, |
6e9e6c7a FG |
54 | ) -> Result<Self, Error> { |
55 | let store = DataStore::lookup_datastore(store)?; | |
56 | ||
57 | let (remote_config, _digest) = pbs_config::remote::config()?; | |
58 | let remote: Remote = remote_config.lookup("remote", remote)?; | |
59 | ||
61ef4ae8 | 60 | let remove_vanished = remove_vanished.unwrap_or(false); |
6e9e6c7a FG |
61 | |
62 | let source = BackupRepository::new( | |
63 | Some(remote.config.auth_id.clone()), | |
64 | Some(remote.config.host.clone()), | |
65 | remote.config.port, | |
66 | remote_store.to_string(), | |
67 | ); | |
68 | ||
71e53463 | 69 | Ok(Self { remote, source, store, owner, remove_vanished, group_filter }) |
6e9e6c7a FG |
70 | } |
71 | ||
72 | pub async fn client(&self) -> Result<HttpClient, Error> { | |
73 | crate::api2::config::remote::remote_client(&self.remote).await | |
74 | } | |
75 | } | |
76 | ||
07ad6470 | 77 | async fn pull_index_chunks<I: IndexFile>( |
998db639 | 78 | worker: &WorkerTask, |
73b2cc49 | 79 | chunk_reader: RemoteChunkReader, |
07ad6470 DM |
80 | target: Arc<DataStore>, |
81 | index: I, | |
e2956c60 | 82 | downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, |
07ad6470 | 83 | ) -> Result<(), Error> { |
73b2cc49 | 84 | use futures::stream::{self, StreamExt, TryStreamExt}; |
07ad6470 | 85 | |
998db639 DM |
86 | let start_time = SystemTime::now(); |
87 | ||
ebbe4958 DM |
88 | let stream = stream::iter( |
89 | (0..index.index_count()) | |
90 | .map(|pos| index.chunk_info(pos).unwrap()) | |
91 | .filter(|info| { | |
92 | let mut guard = downloaded_chunks.lock().unwrap(); | |
93 | let done = guard.contains(&info.digest); | |
94 | if !done { | |
95 | // Note: We mark a chunk as downloaded before its actually downloaded | |
96 | // to avoid duplicate downloads. | |
97 | guard.insert(info.digest); | |
98 | } | |
99 | !done | |
e2956c60 | 100 | }), |
ebbe4958 | 101 | ); |
07ad6470 | 102 | |
a71bc08f | 103 | let target2 = target.clone(); |
54417086 | 104 | let verify_pool = ParallelHandler::new( |
e2956c60 FG |
105 | "sync chunk writer", |
106 | 4, | |
107 | move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| { | |
54417086 DM |
108 | // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest)); |
109 | chunk.verify_unencrypted(size as usize, &digest)?; | |
a71bc08f | 110 | target2.insert_chunk(&chunk, &digest)?; |
54417086 | 111 | Ok(()) |
e2956c60 | 112 | }, |
54417086 DM |
113 | ); |
114 | ||
115 | let verify_and_write_channel = verify_pool.channel(); | |
998db639 | 116 | |
54417086 DM |
117 | let bytes = Arc::new(AtomicUsize::new(0)); |
118 | ||
119 | stream | |
73b2cc49 | 120 | .map(|info| { |
73b2cc49 DM |
121 | let target = Arc::clone(&target); |
122 | let chunk_reader = chunk_reader.clone(); | |
54417086 DM |
123 | let bytes = Arc::clone(&bytes); |
124 | let verify_and_write_channel = verify_and_write_channel.clone(); | |
73b2cc49 DM |
125 | |
126 | Ok::<_, Error>(async move { | |
d420962f | 127 | let chunk_exists = pbs_runtime::block_in_place(|| { |
e2956c60 FG |
128 | target.cond_touch_chunk(&info.digest, false) |
129 | })?; | |
73b2cc49 | 130 | if chunk_exists { |
1ec0d70d | 131 | //task_log!(worker, "chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)); |
73b2cc49 DM |
132 | return Ok::<_, Error>(()); |
133 | } | |
1ec0d70d | 134 | //task_log!(worker, "sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)); |
73b2cc49 | 135 | let chunk = chunk_reader.read_raw_chunk(&info.digest).await?; |
54417086 | 136 | let raw_size = chunk.raw_size() as usize; |
73b2cc49 | 137 | |
998db639 | 138 | // decode, verify and write in a separate threads to maximize throughput |
d420962f | 139 | pbs_runtime::block_in_place(|| { |
e2956c60 FG |
140 | verify_and_write_channel.send((chunk, info.digest, info.size())) |
141 | })?; | |
54417086 DM |
142 | |
143 | bytes.fetch_add(raw_size, Ordering::SeqCst); | |
998db639 DM |
144 | |
145 | Ok(()) | |
e2956c60 | 146 | }) |
73b2cc49 DM |
147 | }) |
148 | .try_buffer_unordered(20) | |
149 | .try_for_each(|_res| futures::future::ok(())) | |
54417086 | 150 | .await?; |
998db639 | 151 | |
54417086 | 152 | drop(verify_and_write_channel); |
998db639 | 153 | |
54417086 | 154 | verify_pool.complete()?; |
998db639 DM |
155 | |
156 | let elapsed = start_time.elapsed()?.as_secs_f64(); | |
157 | ||
54417086 DM |
158 | let bytes = bytes.load(Ordering::SeqCst); |
159 | ||
1ec0d70d DM |
160 | task_log!( |
161 | worker, | |
e2956c60 FG |
162 | "downloaded {} bytes ({:.2} MiB/s)", |
163 | bytes, | |
164 | (bytes as f64) / (1024.0 * 1024.0 * elapsed) | |
1ec0d70d | 165 | ); |
07ad6470 DM |
166 | |
167 | Ok(()) | |
168 | } | |
169 | ||
170 | async fn download_manifest( | |
171 | reader: &BackupReader, | |
172 | filename: &std::path::Path, | |
173 | ) -> Result<std::fs::File, Error> { | |
3d571d55 | 174 | let mut tmp_manifest_file = std::fs::OpenOptions::new() |
07ad6470 DM |
175 | .write(true) |
176 | .create(true) | |
194da6f8 | 177 | .truncate(true) |
07ad6470 DM |
178 | .read(true) |
179 | .open(&filename)?; | |
180 | ||
e2956c60 FG |
181 | reader |
182 | .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file) | |
183 | .await?; | |
07ad6470 DM |
184 | |
185 | tmp_manifest_file.seek(SeekFrom::Start(0))?; | |
186 | ||
187 | Ok(tmp_manifest_file) | |
188 | } | |
189 | ||
e2956c60 | 190 | fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> { |
2ce15934 | 191 | if size != info.size { |
e2956c60 FG |
192 | bail!( |
193 | "wrong size for file '{}' ({} != {})", | |
194 | info.filename, | |
195 | info.size, | |
196 | size | |
197 | ); | |
2ce15934 FG |
198 | } |
199 | ||
200 | if csum != &info.csum { | |
201 | bail!("wrong checksum for file '{}'", info.filename); | |
202 | } | |
203 | ||
204 | Ok(()) | |
205 | } | |
206 | ||
07ad6470 DM |
207 | async fn pull_single_archive( |
208 | worker: &WorkerTask, | |
209 | reader: &BackupReader, | |
210 | chunk_reader: &mut RemoteChunkReader, | |
211 | tgt_store: Arc<DataStore>, | |
212 | snapshot: &BackupDir, | |
2ce15934 | 213 | archive_info: &FileInfo, |
e2956c60 | 214 | downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, |
07ad6470 | 215 | ) -> Result<(), Error> { |
2ce15934 | 216 | let archive_name = &archive_info.filename; |
07ad6470 DM |
217 | let mut path = tgt_store.base_path(); |
218 | path.push(snapshot.relative_path()); | |
219 | path.push(archive_name); | |
220 | ||
221 | let mut tmp_path = path.clone(); | |
222 | tmp_path.set_extension("tmp"); | |
223 | ||
1ec0d70d DM |
224 | task_log!(worker, "sync archive {}", archive_name); |
225 | ||
3d571d55 | 226 | let mut tmpfile = std::fs::OpenOptions::new() |
07ad6470 DM |
227 | .write(true) |
228 | .create(true) | |
229 | .read(true) | |
230 | .open(&tmp_path)?; | |
231 | ||
3d571d55 | 232 | reader.download(archive_name, &mut tmpfile).await?; |
07ad6470 DM |
233 | |
234 | match archive_type(archive_name)? { | |
235 | ArchiveType::DynamicIndex => { | |
e2956c60 FG |
236 | let index = DynamicIndexReader::new(tmpfile).map_err(|err| { |
237 | format_err!("unable to read dynamic index {:?} - {}", tmp_path, err) | |
238 | })?; | |
2ce15934 FG |
239 | let (csum, size) = index.compute_csum(); |
240 | verify_archive(archive_info, &csum, size)?; | |
07ad6470 | 241 | |
e2956c60 FG |
242 | pull_index_chunks( |
243 | worker, | |
244 | chunk_reader.clone(), | |
245 | tgt_store.clone(), | |
246 | index, | |
247 | downloaded_chunks, | |
248 | ) | |
249 | .await?; | |
07ad6470 DM |
250 | } |
251 | ArchiveType::FixedIndex => { | |
e2956c60 FG |
252 | let index = FixedIndexReader::new(tmpfile).map_err(|err| { |
253 | format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err) | |
254 | })?; | |
2ce15934 FG |
255 | let (csum, size) = index.compute_csum(); |
256 | verify_archive(archive_info, &csum, size)?; | |
07ad6470 | 257 | |
e2956c60 FG |
258 | pull_index_chunks( |
259 | worker, | |
260 | chunk_reader.clone(), | |
261 | tgt_store.clone(), | |
262 | index, | |
263 | downloaded_chunks, | |
264 | ) | |
265 | .await?; | |
07ad6470 | 266 | } |
2ce15934 | 267 | ArchiveType::Blob => { |
ba0ccc59 WB |
268 | tmpfile.seek(SeekFrom::Start(0))?; |
269 | let (csum, size) = sha256(&mut tmpfile)?; | |
2ce15934 FG |
270 | verify_archive(archive_info, &csum, size)?; |
271 | } | |
07ad6470 DM |
272 | } |
273 | if let Err(err) = std::fs::rename(&tmp_path, &path) { | |
274 | bail!("Atomic rename file {:?} failed - {}", path, err); | |
275 | } | |
276 | Ok(()) | |
277 | } | |
278 | ||
1610c45a DM |
279 | // Note: The client.log.blob is uploaded after the backup, so it is |
280 | // not mentioned in the manifest. | |
281 | async fn try_client_log_download( | |
282 | worker: &WorkerTask, | |
283 | reader: Arc<BackupReader>, | |
284 | path: &std::path::Path, | |
285 | ) -> Result<(), Error> { | |
1610c45a DM |
286 | let mut tmp_path = path.to_owned(); |
287 | tmp_path.set_extension("tmp"); | |
288 | ||
289 | let tmpfile = std::fs::OpenOptions::new() | |
290 | .write(true) | |
291 | .create(true) | |
292 | .read(true) | |
293 | .open(&tmp_path)?; | |
294 | ||
add5861e | 295 | // Note: be silent if there is no log - only log successful download |
3d571d55 | 296 | if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await { |
1610c45a DM |
297 | if let Err(err) = std::fs::rename(&tmp_path, &path) { |
298 | bail!("Atomic rename file {:?} failed - {}", path, err); | |
299 | } | |
1ec0d70d | 300 | task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME); |
1610c45a DM |
301 | } |
302 | ||
303 | Ok(()) | |
304 | } | |
305 | ||
07ad6470 DM |
306 | async fn pull_snapshot( |
307 | worker: &WorkerTask, | |
308 | reader: Arc<BackupReader>, | |
309 | tgt_store: Arc<DataStore>, | |
310 | snapshot: &BackupDir, | |
e2956c60 | 311 | downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, |
07ad6470 | 312 | ) -> Result<(), Error> { |
07ad6470 DM |
313 | let mut manifest_name = tgt_store.base_path(); |
314 | manifest_name.push(snapshot.relative_path()); | |
315 | manifest_name.push(MANIFEST_BLOB_NAME); | |
316 | ||
1610c45a DM |
317 | let mut client_log_name = tgt_store.base_path(); |
318 | client_log_name.push(snapshot.relative_path()); | |
319 | client_log_name.push(CLIENT_LOG_BLOB_NAME); | |
320 | ||
07ad6470 DM |
321 | let mut tmp_manifest_name = manifest_name.clone(); |
322 | tmp_manifest_name.set_extension("tmp"); | |
323 | ||
c1c4a18f FG |
324 | let download_res = download_manifest(&reader, &tmp_manifest_name).await; |
325 | let mut tmp_manifest_file = match download_res { | |
326 | Ok(manifest_file) => manifest_file, | |
327 | Err(err) => { | |
328 | match err.downcast_ref::<HttpError>() { | |
e2956c60 FG |
329 | Some(HttpError { code, message }) => match *code { |
330 | StatusCode::NOT_FOUND => { | |
1ec0d70d DM |
331 | task_log!( |
332 | worker, | |
e2956c60 FG |
333 | "skipping snapshot {} - vanished since start of sync", |
334 | snapshot | |
1ec0d70d | 335 | ); |
e2956c60 FG |
336 | return Ok(()); |
337 | } | |
338 | _ => { | |
339 | bail!("HTTP error {} - {}", code, message); | |
c1c4a18f FG |
340 | } |
341 | }, | |
342 | None => { | |
343 | return Err(err); | |
e2956c60 | 344 | } |
c1c4a18f | 345 | }; |
e2956c60 | 346 | } |
c1c4a18f | 347 | }; |
39f18b30 | 348 | let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?; |
07ad6470 DM |
349 | |
350 | if manifest_name.exists() { | |
6ef1b649 | 351 | let manifest_blob = proxmox_lang::try_block!({ |
e2956c60 FG |
352 | let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| { |
353 | format_err!( | |
354 | "unable to open local manifest {:?} - {}", | |
355 | manifest_name, | |
356 | err | |
357 | ) | |
358 | })?; | |
07ad6470 | 359 | |
39f18b30 | 360 | let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?; |
07ad6470 | 361 | Ok(manifest_blob) |
e2956c60 FG |
362 | }) |
363 | .map_err(|err: Error| { | |
364 | format_err!( | |
365 | "unable to read local manifest {:?} - {}", | |
366 | manifest_name, | |
367 | err | |
368 | ) | |
07ad6470 DM |
369 | })?; |
370 | ||
371 | if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { | |
1610c45a DM |
372 | if !client_log_name.exists() { |
373 | try_client_log_download(worker, reader, &client_log_name).await?; | |
374 | } | |
1ec0d70d | 375 | task_log!(worker, "no data changes"); |
e0085e66 | 376 | let _ = std::fs::remove_file(&tmp_manifest_name); |
07ad6470 DM |
377 | return Ok(()); // nothing changed |
378 | } | |
379 | } | |
380 | ||
381 | let manifest = BackupManifest::try_from(tmp_manifest_blob)?; | |
382 | ||
07ad6470 DM |
383 | for item in manifest.files() { |
384 | let mut path = tgt_store.base_path(); | |
385 | path.push(snapshot.relative_path()); | |
386 | path.push(&item.filename); | |
387 | ||
388 | if path.exists() { | |
389 | match archive_type(&item.filename)? { | |
390 | ArchiveType::DynamicIndex => { | |
391 | let index = DynamicIndexReader::open(&path)?; | |
392 | let (csum, size) = index.compute_csum(); | |
393 | match manifest.verify_file(&item.filename, &csum, size) { | |
394 | Ok(_) => continue, | |
395 | Err(err) => { | |
1ec0d70d | 396 | task_log!(worker, "detected changed file {:?} - {}", path, err); |
07ad6470 DM |
397 | } |
398 | } | |
399 | } | |
400 | ArchiveType::FixedIndex => { | |
401 | let index = FixedIndexReader::open(&path)?; | |
402 | let (csum, size) = index.compute_csum(); | |
403 | match manifest.verify_file(&item.filename, &csum, size) { | |
404 | Ok(_) => continue, | |
405 | Err(err) => { | |
1ec0d70d | 406 | task_log!(worker, "detected changed file {:?} - {}", path, err); |
07ad6470 DM |
407 | } |
408 | } | |
409 | } | |
410 | ArchiveType::Blob => { | |
411 | let mut tmpfile = std::fs::File::open(&path)?; | |
ba0ccc59 | 412 | let (csum, size) = sha256(&mut tmpfile)?; |
07ad6470 DM |
413 | match manifest.verify_file(&item.filename, &csum, size) { |
414 | Ok(_) => continue, | |
415 | Err(err) => { | |
1ec0d70d | 416 | task_log!(worker, "detected changed file {:?} - {}", path, err); |
07ad6470 DM |
417 | } |
418 | } | |
419 | } | |
420 | } | |
421 | } | |
422 | ||
e2956c60 FG |
423 | let mut chunk_reader = RemoteChunkReader::new( |
424 | reader.clone(), | |
425 | None, | |
426 | item.chunk_crypt_mode(), | |
427 | HashMap::new(), | |
428 | ); | |
14f6c9cb | 429 | |
07ad6470 DM |
430 | pull_single_archive( |
431 | worker, | |
432 | &reader, | |
433 | &mut chunk_reader, | |
434 | tgt_store.clone(), | |
435 | snapshot, | |
2ce15934 | 436 | &item, |
ebbe4958 | 437 | downloaded_chunks.clone(), |
e2956c60 FG |
438 | ) |
439 | .await?; | |
07ad6470 DM |
440 | } |
441 | ||
442 | if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) { | |
443 | bail!("Atomic rename file {:?} failed - {}", manifest_name, err); | |
444 | } | |
445 | ||
1610c45a DM |
446 | if !client_log_name.exists() { |
447 | try_client_log_download(worker, reader, &client_log_name).await?; | |
448 | } | |
449 | ||
07ad6470 DM |
450 | // cleanup - remove stale files |
451 | tgt_store.cleanup_backup_dir(snapshot, &manifest)?; | |
452 | ||
453 | Ok(()) | |
454 | } | |
455 | ||
456 | pub async fn pull_snapshot_from( | |
457 | worker: &WorkerTask, | |
458 | reader: Arc<BackupReader>, | |
459 | tgt_store: Arc<DataStore>, | |
460 | snapshot: &BackupDir, | |
e2956c60 | 461 | downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, |
07ad6470 | 462 | ) -> Result<(), Error> { |
f23f7543 | 463 | let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?; |
07ad6470 DM |
464 | |
465 | if is_new { | |
1ec0d70d | 466 | task_log!(worker, "sync snapshot {:?}", snapshot.relative_path()); |
07ad6470 | 467 | |
e2956c60 FG |
468 | if let Err(err) = pull_snapshot( |
469 | worker, | |
470 | reader, | |
471 | tgt_store.clone(), | |
472 | &snapshot, | |
473 | downloaded_chunks, | |
474 | ) | |
475 | .await | |
476 | { | |
c9756b40 | 477 | if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) { |
1ec0d70d | 478 | task_log!(worker, "cleanup error - {}", cleanup_err); |
07ad6470 DM |
479 | } |
480 | return Err(err); | |
481 | } | |
1ec0d70d | 482 | task_log!(worker, "sync snapshot {:?} done", snapshot.relative_path()); |
07ad6470 | 483 | } else { |
1ec0d70d | 484 | task_log!(worker, "re-sync snapshot {:?}", snapshot.relative_path()); |
e2956c60 FG |
485 | pull_snapshot( |
486 | worker, | |
487 | reader, | |
488 | tgt_store.clone(), | |
489 | &snapshot, | |
490 | downloaded_chunks, | |
491 | ) | |
492 | .await?; | |
1ec0d70d | 493 | task_log!(worker, "re-sync snapshot {:?} done", snapshot.relative_path()); |
07ad6470 DM |
494 | } |
495 | ||
496 | Ok(()) | |
497 | } | |
498 | ||
d2354a16 DC |
499 | struct SkipInfo { |
500 | oldest: i64, | |
501 | newest: i64, | |
502 | count: u64, | |
503 | } | |
504 | ||
505 | impl SkipInfo { | |
506 | fn update(&mut self, backup_time: i64) { | |
507 | self.count += 1; | |
508 | ||
509 | if backup_time < self.oldest { | |
510 | self.oldest = backup_time; | |
511 | } | |
512 | ||
513 | if backup_time > self.newest { | |
514 | self.newest = backup_time; | |
515 | } | |
516 | } | |
517 | ||
518 | fn affected(&self) -> Result<String, Error> { | |
519 | match self.count { | |
520 | 0 => Ok(String::new()), | |
6ef1b649 | 521 | 1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?), |
d2354a16 DC |
522 | _ => { |
523 | Ok(format!( | |
524 | "{} .. {}", | |
6ef1b649 WB |
525 | proxmox_time::epoch_to_rfc3339_utc(self.oldest)?, |
526 | proxmox_time::epoch_to_rfc3339_utc(self.newest)?, | |
d2354a16 DC |
527 | )) |
528 | } | |
529 | } | |
530 | } | |
531 | } | |
532 | ||
533 | impl std::fmt::Display for SkipInfo { | |
534 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
535 | write!( | |
536 | f, | |
537 | "skipped: {} snapshot(s) ({}) older than the newest local snapshot", | |
538 | self.count, | |
539 | self.affected().map_err(|_| std::fmt::Error)? | |
540 | ) | |
541 | } | |
542 | } | |
543 | ||
07ad6470 DM |
544 | pub async fn pull_group( |
545 | worker: &WorkerTask, | |
546 | client: &HttpClient, | |
6e9e6c7a | 547 | params: &PullParameters, |
07ad6470 | 548 | group: &BackupGroup, |
fc8920e3 | 549 | progress: &mut StoreProgress, |
07ad6470 | 550 | ) -> Result<(), Error> { |
6e9e6c7a | 551 | let path = format!("api2/json/admin/datastore/{}/snapshots", params.source.store()); |
07ad6470 DM |
552 | |
553 | let args = json!({ | |
554 | "backup-type": group.backup_type(), | |
555 | "backup-id": group.backup_id(), | |
556 | }); | |
557 | ||
558 | let mut result = client.get(&path, Some(args)).await?; | |
559 | let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?; | |
560 | ||
561 | list.sort_unstable_by(|a, b| a.backup_time.cmp(&b.backup_time)); | |
562 | ||
0081903f DM |
563 | client.login().await?; // make sure auth is complete |
564 | ||
07ad6470 DM |
565 | let fingerprint = client.fingerprint(); |
566 | ||
6e9e6c7a | 567 | let last_sync = params.store.last_successful_backup(group)?; |
07ad6470 DM |
568 | |
569 | let mut remote_snapshots = std::collections::HashSet::new(); | |
570 | ||
ebbe4958 | 571 | // start with 16384 chunks (up to 65GB) |
e2956c60 | 572 | let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64))); |
ebbe4958 | 573 | |
fc8920e3 | 574 | progress.group_snapshots = list.len() as u64; |
7b8aa893 | 575 | |
d2354a16 DC |
576 | let mut skip_info = SkipInfo { |
577 | oldest: i64::MAX, | |
578 | newest: i64::MIN, | |
579 | count: 0, | |
580 | }; | |
581 | ||
7b8aa893 | 582 | for (pos, item) in list.into_iter().enumerate() { |
e0e5b442 | 583 | let snapshot = BackupDir::new(item.backup_type, item.backup_id, item.backup_time)?; |
86f6f741 FG |
584 | |
585 | // in-progress backups can't be synced | |
54aec2fa | 586 | if item.size.is_none() { |
1ec0d70d | 587 | task_log!(worker, "skipping snapshot {} - in-progress backup", snapshot); |
86f6f741 FG |
588 | continue; |
589 | } | |
590 | ||
591 | let backup_time = snapshot.backup_time(); | |
592 | ||
07ad6470 DM |
593 | remote_snapshots.insert(backup_time); |
594 | ||
595 | if let Some(last_sync_time) = last_sync { | |
e2956c60 | 596 | if last_sync_time > backup_time { |
d2354a16 | 597 | skip_info.update(backup_time); |
e2956c60 FG |
598 | continue; |
599 | } | |
07ad6470 DM |
600 | } |
601 | ||
0081903f DM |
602 | // get updated auth_info (new tickets) |
603 | let auth_info = client.login().await?; | |
604 | ||
93e3581c | 605 | let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone()); |
07ad6470 | 606 | |
e2956c60 | 607 | let new_client = HttpClient::new( |
6e9e6c7a FG |
608 | params.source.host(), |
609 | params.source.port(), | |
610 | params.source.auth_id(), | |
e2956c60 FG |
611 | options, |
612 | )?; | |
07ad6470 DM |
613 | |
614 | let reader = BackupReader::start( | |
615 | new_client, | |
616 | None, | |
6e9e6c7a | 617 | params.source.store(), |
86f6f741 FG |
618 | snapshot.group().backup_type(), |
619 | snapshot.group().backup_id(), | |
07ad6470 DM |
620 | backup_time, |
621 | true, | |
e2956c60 FG |
622 | ) |
623 | .await?; | |
07ad6470 | 624 | |
e2956c60 FG |
625 | let result = pull_snapshot_from( |
626 | worker, | |
627 | reader, | |
6e9e6c7a | 628 | params.store.clone(), |
e2956c60 FG |
629 | &snapshot, |
630 | downloaded_chunks.clone(), | |
631 | ) | |
632 | .await; | |
7b8aa893 | 633 | |
fc8920e3 | 634 | progress.done_snapshots = pos as u64 + 1; |
1ec0d70d | 635 | task_log!(worker, "percentage done: {}", progress); |
7b8aa893 DM |
636 | |
637 | result?; // stop on error | |
07ad6470 DM |
638 | } |
639 | ||
6e9e6c7a FG |
640 | if params.remove_vanished { |
641 | let local_list = group.list_backups(¶ms.store.base_path())?; | |
07ad6470 DM |
642 | for info in local_list { |
643 | let backup_time = info.backup_dir.backup_time(); | |
e2956c60 FG |
644 | if remote_snapshots.contains(&backup_time) { |
645 | continue; | |
646 | } | |
6e9e6c7a | 647 | if info.backup_dir.is_protected(params.store.base_path()) { |
34339261 DC |
648 | task_log!( |
649 | worker, | |
650 | "don't delete vanished snapshot {:?} (protected)", | |
651 | info.backup_dir.relative_path() | |
652 | ); | |
653 | continue; | |
654 | } | |
1ec0d70d | 655 | task_log!(worker, "delete vanished snapshot {:?}", info.backup_dir.relative_path()); |
6e9e6c7a | 656 | params.store.remove_backup_dir(&info.backup_dir, false)?; |
07ad6470 DM |
657 | } |
658 | } | |
659 | ||
d2354a16 DC |
660 | if skip_info.count > 0 { |
661 | task_log!(worker, "{}", skip_info); | |
662 | } | |
663 | ||
07ad6470 DM |
664 | Ok(()) |
665 | } | |
666 | ||
667 | pub async fn pull_store( | |
668 | worker: &WorkerTask, | |
669 | client: &HttpClient, | |
6e9e6c7a | 670 | params: &PullParameters, |
07ad6470 | 671 | ) -> Result<(), Error> { |
07ad6470 | 672 | // explicit create shared lock to prevent GC on newly created chunks |
6e9e6c7a | 673 | let _shared_store_lock = params.store.try_shared_chunk_store_lock()?; |
07ad6470 | 674 | |
6e9e6c7a | 675 | let path = format!("api2/json/admin/datastore/{}/groups", params.source.store()); |
07ad6470 | 676 | |
44de5bcc FG |
677 | let mut result = client |
678 | .get(&path, None) | |
679 | .await | |
680 | .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?; | |
07ad6470 DM |
681 | |
682 | let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?; | |
683 | ||
71e53463 | 684 | let total_count = list.len(); |
07ad6470 DM |
685 | list.sort_unstable_by(|a, b| { |
686 | let type_order = a.backup_type.cmp(&b.backup_type); | |
687 | if type_order == std::cmp::Ordering::Equal { | |
688 | a.backup_id.cmp(&b.backup_id) | |
689 | } else { | |
690 | type_order | |
691 | } | |
692 | }); | |
693 | ||
71e53463 FG |
694 | let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool { |
695 | filters | |
696 | .iter() | |
697 | .any(|filter| group.matches(filter)) | |
698 | }; | |
699 | ||
e2e7560d FG |
700 | let list:Vec<BackupGroup> = list |
701 | .into_iter() | |
702 | .map(|item| BackupGroup::new(item.backup_type, item.backup_id)) | |
703 | .collect(); | |
704 | ||
71e53463 FG |
705 | let list = if let Some(ref group_filter) = ¶ms.group_filter { |
706 | let unfiltered_count = list.len(); | |
707 | let list:Vec<BackupGroup> = list | |
708 | .into_iter() | |
709 | .filter(|group| { | |
710 | apply_filters(&group, group_filter) | |
711 | }) | |
712 | .collect(); | |
713 | task_log!(worker, "found {} groups to sync (out of {} total)", list.len(), unfiltered_count); | |
714 | list | |
715 | } else { | |
716 | task_log!(worker, "found {} groups to sync", total_count); | |
717 | list | |
718 | }; | |
719 | ||
07ad6470 DM |
720 | let mut errors = false; |
721 | ||
722 | let mut new_groups = std::collections::HashSet::new(); | |
e2e7560d FG |
723 | for group in list.iter() { |
724 | new_groups.insert(group.clone()); | |
07ad6470 DM |
725 | } |
726 | ||
fc8920e3 FG |
727 | let mut progress = StoreProgress::new(list.len() as u64); |
728 | ||
e2e7560d | 729 | for (done, group) in list.into_iter().enumerate() { |
fc8920e3 FG |
730 | progress.done_groups = done as u64; |
731 | progress.done_snapshots = 0; | |
732 | progress.group_snapshots = 0; | |
7b8aa893 | 733 | |
6e9e6c7a | 734 | let (owner, _lock_guard) = match params.store.create_locked_backup_group(&group, ¶ms.owner) { |
30f73fa2 DM |
735 | Ok(result) => result, |
736 | Err(err) => { | |
1ec0d70d DM |
737 | task_log!( |
738 | worker, | |
e2e7560d FG |
739 | "sync group {} failed - group lock failed: {}", |
740 | &group, err | |
1ec0d70d | 741 | ); |
30f73fa2 DM |
742 | errors = true; // do not stop here, instead continue |
743 | continue; | |
744 | } | |
745 | }; | |
746 | ||
07ad6470 | 747 | // permission check |
6e9e6c7a | 748 | if params.owner != owner { |
e2956c60 | 749 | // only the owner is allowed to create additional snapshots |
1ec0d70d DM |
750 | task_log!( |
751 | worker, | |
e2e7560d | 752 | "sync group {} failed - owner check failed ({} != {})", |
6e9e6c7a | 753 | &group, params.owner, owner |
1ec0d70d | 754 | ); |
7b8aa893 | 755 | errors = true; // do not stop here, instead continue |
20813274 WB |
756 | } else if let Err(err) = pull_group( |
757 | worker, | |
758 | client, | |
6e9e6c7a | 759 | params, |
20813274 | 760 | &group, |
fc8920e3 | 761 | &mut progress, |
e2956c60 FG |
762 | ) |
763 | .await | |
764 | { | |
1ec0d70d DM |
765 | task_log!( |
766 | worker, | |
e2e7560d FG |
767 | "sync group {} failed - {}", |
768 | &group, err, | |
1ec0d70d | 769 | ); |
20813274 | 770 | errors = true; // do not stop here, instead continue |
07ad6470 DM |
771 | } |
772 | } | |
773 | ||
6e9e6c7a | 774 | if params.remove_vanished { |
6ef1b649 | 775 | let result: Result<(), Error> = proxmox_lang::try_block!({ |
6e9e6c7a | 776 | let local_groups = BackupInfo::list_backup_groups(¶ms.store.base_path())?; |
07ad6470 | 777 | for local_group in local_groups { |
e2956c60 FG |
778 | if new_groups.contains(&local_group) { |
779 | continue; | |
780 | } | |
71e53463 FG |
781 | if let Some(ref group_filter) = ¶ms.group_filter { |
782 | if !apply_filters(&local_group, group_filter) { | |
783 | continue; | |
784 | } | |
785 | } | |
1ec0d70d DM |
786 | task_log!( |
787 | worker, | |
e2956c60 FG |
788 | "delete vanished group '{}/{}'", |
789 | local_group.backup_type(), | |
790 | local_group.backup_id() | |
1ec0d70d | 791 | ); |
6e9e6c7a | 792 | match params.store.remove_backup_group(&local_group) { |
34339261 DC |
793 | Ok(true) => {}, |
794 | Ok(false) => { | |
795 | task_log!(worker, "kept some protected snapshots of group '{}'", local_group); | |
796 | }, | |
797 | Err(err) => { | |
798 | task_log!(worker, "{}", err); | |
799 | errors = true; | |
800 | } | |
07ad6470 DM |
801 | } |
802 | } | |
803 | Ok(()) | |
804 | }); | |
805 | if let Err(err) = result { | |
1ec0d70d | 806 | task_log!(worker, "error during cleanup: {}", err); |
07ad6470 DM |
807 | errors = true; |
808 | }; | |
809 | } | |
810 | ||
811 | if errors { | |
812 | bail!("sync failed with some errors."); | |
813 | } | |
814 | ||
815 | Ok(()) | |
816 | } |