]>
Commit | Line | Data |
---|---|---|
07ad6470 DM |
1 | //! Sync datastore from remote server |
2 | ||
e2956c60 | 3 | use std::collections::{HashMap, HashSet}; |
07ad6470 | 4 | use std::convert::TryFrom; |
07ad6470 | 5 | use std::io::{Seek, SeekFrom}; |
54417086 | 6 | use std::sync::atomic::{AtomicUsize, Ordering}; |
e2956c60 FG |
7 | use std::sync::{Arc, Mutex}; |
8 | use std::time::SystemTime; | |
07ad6470 | 9 | |
c23192d3 | 10 | use anyhow::{bail, format_err, Error}; |
6ef1b649 | 11 | use http::StatusCode; |
c06c1b4b | 12 | use pbs_config::CachedUserInfo; |
ee0ea735 | 13 | use serde_json::json; |
c23192d3 | 14 | |
6ef1b649 | 15 | use proxmox_router::HttpError; |
d5790a9f | 16 | use proxmox_sys::task_log; |
c23192d3 | 17 | |
2d5287fb | 18 | use pbs_api_types::{ |
7d0dbaa0 FG |
19 | privs_to_priv_names, Authid, BackupNamespace, DatastoreWithNamespace, GroupFilter, |
20 | GroupListItem, NamespaceListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, | |
21 | MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, | |
2d5287fb | 22 | }; |
71e53463 | 23 | |
ee0ea735 TL |
24 | use pbs_client::{ |
25 | BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader, | |
26 | }; | |
ea584a75 WB |
27 | use pbs_datastore::data_blob::DataBlob; |
28 | use pbs_datastore::dynamic_index::DynamicIndexReader; | |
29 | use pbs_datastore::fixed_index::FixedIndexReader; | |
30 | use pbs_datastore::index::IndexFile; | |
31 | use pbs_datastore::manifest::{ | |
ee0ea735 | 32 | archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, |
ea584a75 | 33 | }; |
df768ebe | 34 | use pbs_datastore::{check_backup_owner, DataStore, StoreProgress}; |
ba0ccc59 | 35 | use pbs_tools::sha::sha256; |
b9700a9f | 36 | use proxmox_rest_server::WorkerTask; |
c23192d3 | 37 | |
260147bd | 38 | use crate::tools::parallel_handler::ParallelHandler; |
07ad6470 | 39 | |
/// Parameters for a pull operation.
pub struct PullParameters {
    /// Remote that is pulled from
    remote: Remote,
    /// Full specification of remote datastore
    source: BackupRepository,
    /// Local store that is pulled into
    store: Arc<DataStore>,
    /// Remote namespace
    remote_ns: BackupNamespace,
    /// Local namespace (anchor)
    ns: BackupNamespace,
    /// Owner of synced groups (needs to match local owner of pre-existing groups)
    owner: Authid,
    /// Whether to remove groups which exist locally, but not on the remote end
    remove_vanished: bool,
    /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
    max_depth: Option<usize>,
    /// Filters for reducing the pull scope
    group_filter: Option<Vec<GroupFilter>>,
    /// Rate limits for all transfers from `remote`
    limit: RateLimitConfig,
}
63 | ||
impl PullParameters {
    /// Creates a new instance of `PullParameters`.
    ///
    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
    /// [BackupRepository] with `remote_store`.
    pub fn new(
        store: &str,
        ns: BackupNamespace,
        remote: &str,
        remote_store: &str,
        remote_ns: BackupNamespace,
        owner: Authid,
        remove_vanished: Option<bool>,
        max_depth: Option<usize>,
        group_filter: Option<Vec<GroupFilter>>,
        limit: RateLimitConfig,
    ) -> Result<Self, Error> {
        // resolve the local datastore up-front; pulling writes into it
        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;

        // reject combinations where either anchor namespace is already too deep
        // for the requested recursion depth
        if let Some(max_depth) = max_depth {
            ns.check_max_depth(max_depth)?;
            remote_ns.check_max_depth(max_depth)?;
        }

        // dereference the remote name via the remote.cfg section config
        let (remote_config, _digest) = pbs_config::remote::config()?;
        let remote: Remote = remote_config.lookup("remote", remote)?;

        // default: keep local groups that vanished on the remote
        let remove_vanished = remove_vanished.unwrap_or(false);

        let source = BackupRepository::new(
            Some(remote.config.auth_id.clone()),
            Some(remote.config.host.clone()),
            remote.config.port,
            remote_store.to_string(),
        );

        Ok(Self {
            remote,
            remote_ns,
            ns,
            source,
            store,
            owner,
            remove_vanished,
            max_depth,
            group_filter,
            limit,
        })
    }

    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
    pub async fn client(&self) -> Result<HttpClient, Error> {
        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
    }

    /// Returns DatastoreWithNamespace with namespace (or local namespace anchor).
    pub fn store_with_ns(&self, ns: BackupNamespace) -> DatastoreWithNamespace {
        DatastoreWithNamespace {
            store: self.store.name().to_string(),
            ns,
        }
    }
}
127 | ||
/// Downloads all chunks referenced by `index` from the remote and inserts them into `target`.
///
/// `downloaded_chunks` is a digest set shared across the whole sync; digests already in it
/// (or chunks already present in the target store) are skipped. Up to 20 chunks are fetched
/// concurrently, while verification and writing happen on a 4-thread [ParallelHandler] so
/// download and CPU-bound work overlap.
async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: RemoteChunkReader,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    // stream of chunk infos, deduplicated against the shared digest set
    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before its actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", hex::encode(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    // total raw bytes actually downloaded (skipped chunks don't count)
    let bytes = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                // cond_touch_chunk does blocking FS work, keep it off the async executor
                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
                    return Ok::<_, Error>(());
                }
                //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in a separate threads to maximize throughput
                proxmox_async::runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    // close the channel so the worker threads can drain and exit ...
    drop(verify_and_write_channel);

    // ... then join them, propagating any verify/write error
    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(Ordering::SeqCst);

    task_log!(
        worker,
        "downloaded {} bytes ({:.2} MiB/s)",
        bytes,
        (bytes as f64) / (1024.0 * 1024.0 * elapsed)
    );

    Ok(())
}
220 | ||
221 | async fn download_manifest( | |
222 | reader: &BackupReader, | |
223 | filename: &std::path::Path, | |
224 | ) -> Result<std::fs::File, Error> { | |
3d571d55 | 225 | let mut tmp_manifest_file = std::fs::OpenOptions::new() |
07ad6470 DM |
226 | .write(true) |
227 | .create(true) | |
194da6f8 | 228 | .truncate(true) |
07ad6470 DM |
229 | .read(true) |
230 | .open(&filename)?; | |
231 | ||
e2956c60 FG |
232 | reader |
233 | .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file) | |
234 | .await?; | |
07ad6470 DM |
235 | |
236 | tmp_manifest_file.seek(SeekFrom::Start(0))?; | |
237 | ||
238 | Ok(tmp_manifest_file) | |
239 | } | |
240 | ||
e2956c60 | 241 | fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> { |
2ce15934 | 242 | if size != info.size { |
e2956c60 FG |
243 | bail!( |
244 | "wrong size for file '{}' ({} != {})", | |
245 | info.filename, | |
246 | info.size, | |
247 | size | |
248 | ); | |
2ce15934 FG |
249 | } |
250 | ||
251 | if csum != &info.csum { | |
252 | bail!("wrong checksum for file '{}'", info.filename); | |
253 | } | |
254 | ||
255 | Ok(()) | |
256 | } | |
257 | ||
29c56859 FG |
258 | /// Pulls a single file referenced by a manifest. |
259 | /// | |
260 | /// Pulling an archive consists of the following steps: | |
261 | /// - Create tmp file for archive | |
262 | /// - Download archive file into tmp file | |
263 | /// - Verify tmp file checksum | |
264 | /// - if archive is an index, pull referenced chunks | |
265 | /// - Rename tmp file into real path | |
07ad6470 DM |
266 | async fn pull_single_archive( |
267 | worker: &WorkerTask, | |
268 | reader: &BackupReader, | |
269 | chunk_reader: &mut RemoteChunkReader, | |
c06c1b4b | 270 | snapshot: &pbs_datastore::BackupDir, |
2ce15934 | 271 | archive_info: &FileInfo, |
e2956c60 | 272 | downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, |
07ad6470 | 273 | ) -> Result<(), Error> { |
2ce15934 | 274 | let archive_name = &archive_info.filename; |
c06c1b4b | 275 | let mut path = snapshot.full_path(); |
07ad6470 DM |
276 | path.push(archive_name); |
277 | ||
278 | let mut tmp_path = path.clone(); | |
279 | tmp_path.set_extension("tmp"); | |
280 | ||
1ec0d70d DM |
281 | task_log!(worker, "sync archive {}", archive_name); |
282 | ||
3d571d55 | 283 | let mut tmpfile = std::fs::OpenOptions::new() |
07ad6470 DM |
284 | .write(true) |
285 | .create(true) | |
286 | .read(true) | |
287 | .open(&tmp_path)?; | |
288 | ||
3d571d55 | 289 | reader.download(archive_name, &mut tmpfile).await?; |
07ad6470 DM |
290 | |
291 | match archive_type(archive_name)? { | |
292 | ArchiveType::DynamicIndex => { | |
e2956c60 FG |
293 | let index = DynamicIndexReader::new(tmpfile).map_err(|err| { |
294 | format_err!("unable to read dynamic index {:?} - {}", tmp_path, err) | |
295 | })?; | |
2ce15934 FG |
296 | let (csum, size) = index.compute_csum(); |
297 | verify_archive(archive_info, &csum, size)?; | |
07ad6470 | 298 | |
e2956c60 FG |
299 | pull_index_chunks( |
300 | worker, | |
301 | chunk_reader.clone(), | |
c06c1b4b | 302 | snapshot.datastore().clone(), |
e2956c60 FG |
303 | index, |
304 | downloaded_chunks, | |
305 | ) | |
306 | .await?; | |
07ad6470 DM |
307 | } |
308 | ArchiveType::FixedIndex => { | |
e2956c60 FG |
309 | let index = FixedIndexReader::new(tmpfile).map_err(|err| { |
310 | format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err) | |
311 | })?; | |
2ce15934 FG |
312 | let (csum, size) = index.compute_csum(); |
313 | verify_archive(archive_info, &csum, size)?; | |
07ad6470 | 314 | |
e2956c60 FG |
315 | pull_index_chunks( |
316 | worker, | |
317 | chunk_reader.clone(), | |
c06c1b4b | 318 | snapshot.datastore().clone(), |
e2956c60 FG |
319 | index, |
320 | downloaded_chunks, | |
321 | ) | |
322 | .await?; | |
07ad6470 | 323 | } |
2ce15934 | 324 | ArchiveType::Blob => { |
ba0ccc59 WB |
325 | tmpfile.seek(SeekFrom::Start(0))?; |
326 | let (csum, size) = sha256(&mut tmpfile)?; | |
2ce15934 FG |
327 | verify_archive(archive_info, &csum, size)?; |
328 | } | |
07ad6470 DM |
329 | } |
330 | if let Err(err) = std::fs::rename(&tmp_path, &path) { | |
331 | bail!("Atomic rename file {:?} failed - {}", path, err); | |
332 | } | |
333 | Ok(()) | |
334 | } | |
335 | ||
1610c45a DM |
336 | // Note: The client.log.blob is uploaded after the backup, so it is |
337 | // not mentioned in the manifest. | |
338 | async fn try_client_log_download( | |
339 | worker: &WorkerTask, | |
340 | reader: Arc<BackupReader>, | |
341 | path: &std::path::Path, | |
342 | ) -> Result<(), Error> { | |
1610c45a DM |
343 | let mut tmp_path = path.to_owned(); |
344 | tmp_path.set_extension("tmp"); | |
345 | ||
346 | let tmpfile = std::fs::OpenOptions::new() | |
347 | .write(true) | |
348 | .create(true) | |
349 | .read(true) | |
350 | .open(&tmp_path)?; | |
351 | ||
add5861e | 352 | // Note: be silent if there is no log - only log successful download |
3d571d55 | 353 | if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await { |
1610c45a DM |
354 | if let Err(err) = std::fs::rename(&tmp_path, &path) { |
355 | bail!("Atomic rename file {:?} failed - {}", path, err); | |
356 | } | |
1ec0d70d | 357 | task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME); |
1610c45a DM |
358 | } |
359 | ||
360 | Ok(()) | |
361 | } | |
362 | ||
/// Actual implementation of pulling a snapshot.
///
/// Pulling a snapshot consists of the following steps:
/// - (Re)download the manifest
/// -- if it matches, only download log and treat snapshot as already synced
/// - Iterate over referenced files
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
async fn pull_snapshot(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    snapshot: &pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let mut manifest_name = snapshot.full_path();
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = snapshot.full_path();
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");

    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
    let mut tmp_manifest_file = match download_res {
        Ok(manifest_file) => manifest_file,
        Err(err) => {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    // 404 means the snapshot was pruned on the remote after we listed it -
                    // not an error, just skip it
                    StatusCode::NOT_FOUND => {
                        task_log!(
                            worker,
                            "skipping snapshot {} - vanished since start of sync",
                            snapshot.dir(),
                        );
                        return Ok(());
                    }
                    _ => {
                        bail!("HTTP error {code} - {message}");
                    }
                },
                None => {
                    return Err(err);
                }
            };
        }
    };
    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;

    // re-sync case: compare against the already-synced manifest to detect "no change"
    if manifest_name.exists() {
        let manifest_blob = proxmox_lang::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!("unable to open local manifest {manifest_name:?} - {err}")
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!("unable to read local manifest {manifest_name:?} - {err}")
        })?;

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            // manifest unchanged - at most the client log may still be missing
            if !client_log_name.exists() {
                try_client_log_download(worker, reader, &client_log_name).await?;
            }
            task_log!(worker, "no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(()); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = snapshot.full_path();
        path.push(&item.filename);

        // skip files that already exist locally AND still match the manifest checksum
        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = sha256(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
            }
        }

        // fresh chunk reader per archive, with the crypt mode recorded in the manifest
        let mut chunk_reader = RemoteChunkReader::new(
            reader.clone(),
            None,
            item.chunk_crypt_mode(),
            HashMap::new(),
        );

        pull_single_archive(
            worker,
            &reader,
            &mut chunk_reader,
            snapshot,
            item,
            downloaded_chunks.clone(),
        )
        .await?;
    }

    // all referenced files are in place - atomically activate the new manifest
    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        try_client_log_download(worker, reader, &client_log_name).await?;
    }

    // drop files that the new manifest no longer references
    snapshot
        .cleanup_unreferenced_files(&manifest)
        .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;

    Ok(())
}
509 | ||
/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
///
/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
/// pointing to the local datastore and target namespace.
async fn pull_snapshot_from(
    worker: &WorkerTask,
    reader: Arc<BackupReader>,
    snapshot: &pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    // lock the snapshot dir for the duration of the pull; `is_new` tells us whether
    // we created it (and therefore own cleanup on failure)
    let (_path, is_new, _snap_lock) = snapshot
        .datastore()
        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;

    if is_new {
        task_log!(worker, "sync snapshot {}", snapshot.dir());

        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
            // we created this dir above - remove the partial snapshot again,
            // but report the original pull error, not the cleanup error
            if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                snapshot.backup_ns(),
                snapshot.as_ref(),
                true,
            ) {
                task_log!(worker, "cleanup error - {}", cleanup_err);
            }
            return Err(err);
        }
        task_log!(worker, "sync snapshot {} done", snapshot.dir());
    } else {
        // pre-existing snapshot: re-sync in place, never remove it on error
        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
        task_log!(worker, "re-sync snapshot {} done", snapshot.dir());
    }

    Ok(())
}
546 | ||
d2354a16 DC |
547 | struct SkipInfo { |
548 | oldest: i64, | |
549 | newest: i64, | |
550 | count: u64, | |
551 | } | |
552 | ||
553 | impl SkipInfo { | |
554 | fn update(&mut self, backup_time: i64) { | |
555 | self.count += 1; | |
556 | ||
557 | if backup_time < self.oldest { | |
558 | self.oldest = backup_time; | |
559 | } | |
560 | ||
561 | if backup_time > self.newest { | |
562 | self.newest = backup_time; | |
563 | } | |
564 | } | |
565 | ||
566 | fn affected(&self) -> Result<String, Error> { | |
567 | match self.count { | |
568 | 0 => Ok(String::new()), | |
6ef1b649 | 569 | 1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?), |
ee0ea735 TL |
570 | _ => Ok(format!( |
571 | "{} .. {}", | |
572 | proxmox_time::epoch_to_rfc3339_utc(self.oldest)?, | |
573 | proxmox_time::epoch_to_rfc3339_utc(self.newest)?, | |
574 | )), | |
d2354a16 DC |
575 | } |
576 | } | |
577 | } | |
578 | ||
579 | impl std::fmt::Display for SkipInfo { | |
580 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
581 | write!( | |
582 | f, | |
583 | "skipped: {} snapshot(s) ({}) older than the newest local snapshot", | |
584 | self.count, | |
585 | self.affected().map_err(|_| std::fmt::Error)? | |
586 | ) | |
587 | } | |
588 | } | |
589 | ||
/// Pulls a group according to `params`.
///
/// Pulling a group consists of the following steps:
/// - Query the list of snapshots available for this group in the source namespace on the remote
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
/// -- Recreate client/BackupReader
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
/// Backwards-compat: if `source_ns` is [None], only the group type and ID will be sent to the
/// remote when querying snapshots. This allows us to interact with old remotes that don't have
/// namespace support yet.
///
/// Permission checks:
/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
/// - local group owner is already checked by pull_store
async fn pull_group(
    worker: &WorkerTask,
    client: &HttpClient,
    params: &PullParameters,
    group: &pbs_api_types::BackupGroup,
    remote_ns: BackupNamespace,
    progress: &mut StoreProgress,
) -> Result<(), Error> {
    let path = format!(
        "api2/json/admin/datastore/{}/snapshots",
        params.source.store()
    );

    let mut args = json!({
        "backup-type": group.ty,
        "backup-id": group.id,
    });

    // only send "ns" to namespace-aware remotes (see backwards-compat note above)
    if !remote_ns.is_root() {
        args["ns"] = serde_json::to_value(&remote_ns)?;
    }

    // translate the remote namespace into the corresponding local target namespace
    let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;

    let mut result = client.get(&path, Some(args)).await?;
    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;

    // oldest first, so a partial sync still leaves a consistent "last synced" point
    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));

    client.login().await?; // make sure auth is complete

    let fingerprint = client.fingerprint();

    let last_sync = params.store.last_successful_backup(&target_ns, group)?;

    let mut remote_snapshots = std::collections::HashSet::new();

    // start with 65536 chunks (up to 256 GiB)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    let mut skip_info = SkipInfo {
        oldest: i64::MAX,
        newest: i64::MIN,
        count: 0,
    };

    for (pos, item) in list.into_iter().enumerate() {
        let snapshot = item.backup;

        // in-progress backups can't be synced
        if item.size.is_none() {
            task_log!(
                worker,
                "skipping snapshot {} - in-progress backup",
                snapshot
            );
            continue;
        }

        remote_snapshots.insert(snapshot.time);

        // skip snapshots older than the newest local one, but count them for the summary
        if let Some(last_sync_time) = last_sync {
            if last_sync_time > snapshot.time {
                skip_info.update(snapshot.time);
                continue;
            }
        }

        // get updated auth_info (new tickets)
        let auth_info = client.login().await?;

        let options =
            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
                .rate_limit(params.limit.clone());

        // fresh client per snapshot, since the BackupReader consumes it
        let new_client = HttpClient::new(
            params.source.host(),
            params.source.port(),
            params.source.auth_id(),
            options,
        )?;

        let reader = BackupReader::start(
            new_client,
            None,
            params.source.store(),
            &remote_ns,
            &snapshot,
            true,
        )
        .await?;

        // re-anchor the snapshot spec in the local store / target namespace
        let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;

        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;

        // update progress before bailing out, so the log reflects partial completion
        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        result?; // stop on error
    }

    if params.remove_vanished {
        let group = params.store.backup_group(target_ns.clone(), group.clone());
        let local_list = group.list_backups()?;
        for info in local_list {
            let snapshot = info.backup_dir;
            if remote_snapshots.contains(&snapshot.backup_time()) {
                continue;
            }
            // never remove protected snapshots, even if vanished on the remote
            if snapshot.is_protected() {
                task_log!(
                    worker,
                    "don't delete vanished snapshot {} (protected)",
                    snapshot.dir()
                );
                continue;
            }
            task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
            params
                .store
                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
        }
    }

    if skip_info.count > 0 {
        task_log!(worker, "{}", skip_info);
    }

    Ok(())
}
741 | ||
// will modify params if switching to backwards mode for lack of NS support on remote end
/// Queries the list of namespaces below `params.remote_ns` on the remote (parents first).
///
/// A 404 from an old, namespace-unaware remote is tolerated only for a root-namespace,
/// unlimited-depth sync; in that case `params.max_depth` is forced to 0 and just the
/// remote anchor namespace is returned.
async fn query_namespaces(
    worker: &WorkerTask,
    client: &HttpClient,
    params: &mut PullParameters,
) -> Result<Vec<BackupNamespace>, Error> {
    let path = format!(
        "api2/json/admin/datastore/{}/namespace",
        params.source.store()
    );
    let mut data = json!({});
    if let Some(max_depth) = params.max_depth {
        data["max-depth"] = json!(max_depth);
    }

    if !params.remote_ns.is_root() {
        data["parent"] = json!(params.remote_ns);
    }

    let mut result = match client.get(&path, Some(data)).await {
        Ok(res) => res,
        Err(err) => match err.downcast_ref::<HttpError>() {
            Some(HttpError { code, message }) => match *code {
                // remote has no namespace API endpoint -> pre-namespace remote
                StatusCode::NOT_FOUND => {
                    if params.remote_ns.is_root() && params.max_depth.is_none() {
                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
                        params.max_depth = Some(0);
                    } else {
                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
                    }

                    return Ok(vec![params.remote_ns.clone()]);
                }
                _ => {
                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
                }
            },
            None => {
                bail!("Querying namespaces failed - {err}");
            }
        },
    };

    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;

    // parents first
    list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));

    Ok(list.iter().map(|item| item.ns.clone()).collect())
}
793 | ||
794 | fn check_ns_privs( | |
795 | store_with_ns: &DatastoreWithNamespace, | |
796 | owner: &Authid, | |
797 | privs: u64, | |
798 | ) -> Result<(), Error> { | |
799 | let user_info = CachedUserInfo::new()?; | |
800 | ||
801 | // TODO re-sync with API, maybe find common place? | |
802 | ||
7d0dbaa0 FG |
803 | let path = &store_with_ns.acl_path(); |
804 | let user_privs = user_info.lookup_privs(owner, path); | |
c06c1b4b FG |
805 | |
806 | if (user_privs & privs) == 0 { | |
7d0dbaa0 FG |
807 | let priv_names = privs_to_priv_names(privs).join("|"); |
808 | let path = path.join("/"); | |
809 | bail!("privilege(s) {priv_names} missing on /{path}"); | |
c06c1b4b FG |
810 | } |
811 | Ok(()) | |
812 | } | |
813 | ||
814 | fn check_and_create_ns( | |
815 | params: &PullParameters, | |
816 | store_with_ns: &DatastoreWithNamespace, | |
817 | ) -> Result<bool, Error> { | |
818 | let ns = &store_with_ns.ns; | |
819 | let mut created = false; | |
820 | ||
821 | if !ns.is_root() && !params.store.namespace_path(&ns).exists() { | |
822 | let mut parent = ns.clone(); | |
823 | let name = parent.pop(); | |
824 | ||
825 | let parent = params.store_with_ns(parent); | |
826 | ||
827 | if let Err(err) = check_ns_privs(&parent, ¶ms.owner, PRIV_DATASTORE_MODIFY) { | |
828 | bail!( | |
829 | "Not allowed to create namespace {} - {}", | |
830 | store_with_ns, | |
831 | err, | |
832 | ); | |
833 | } | |
834 | if let Some(name) = name { | |
835 | if let Err(err) = params.store.create_namespace(&parent.ns, name) { | |
836 | bail!( | |
837 | "sync namespace {} failed - namespace creation failed: {}", | |
838 | &store_with_ns, | |
839 | err | |
840 | ); | |
841 | } | |
842 | created = true; | |
843 | } else { | |
844 | bail!( | |
845 | "sync namespace {} failed - namespace creation failed - couldn't determine parent namespace", | |
846 | &store_with_ns, | |
847 | ); | |
848 | } | |
849 | } | |
850 | ||
851 | // TODO re-sync with API, maybe find common place? | |
852 | if let Err(err) = check_ns_privs(&store_with_ns, ¶ms.owner, PRIV_DATASTORE_BACKUP) { | |
853 | bail!("sync namespace {} failed - {}", &store_with_ns, err); | |
854 | } | |
855 | ||
856 | Ok(created) | |
857 | } | |
858 | ||
859 | fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> { | |
860 | let parent = local_ns.clone().parent(); | |
861 | check_ns_privs( | |
862 | ¶ms.store_with_ns(parent), | |
863 | ¶ms.owner, | |
864 | PRIV_DATASTORE_MODIFY, | |
865 | )?; | |
d1f9ccea | 866 | params.store.remove_namespace_recursive(local_ns, true) |
c06c1b4b FG |
867 | } |
868 | ||
869 | fn check_and_remove_vanished_ns( | |
870 | worker: &WorkerTask, | |
871 | params: &PullParameters, | |
872 | synced_ns: HashSet<BackupNamespace>, | |
873 | ) -> Result<bool, Error> { | |
874 | let mut errors = false; | |
875 | let user_info = CachedUserInfo::new()?; | |
876 | ||
87be232d FG |
877 | // clamp like remote does so that we don't list more than we can ever have synced. |
878 | let max_depth = params | |
879 | .max_depth | |
880 | .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth()); | |
881 | ||
c06c1b4b FG |
882 | let mut local_ns_list: Vec<BackupNamespace> = params |
883 | .store | |
87be232d | 884 | .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))? |
c06c1b4b FG |
885 | .filter(|ns| { |
886 | let store_with_ns = params.store_with_ns(ns.clone()); | |
887 | let user_privs = user_info.lookup_privs(¶ms.owner, &store_with_ns.acl_path()); | |
888 | user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0 | |
889 | }) | |
890 | .collect(); | |
891 | ||
892 | // children first! | |
893 | local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len())); | |
894 | ||
895 | for local_ns in local_ns_list { | |
896 | if local_ns == params.ns { | |
897 | continue; | |
898 | } | |
899 | ||
900 | if synced_ns.contains(&local_ns) { | |
901 | continue; | |
902 | } | |
903 | ||
904 | if local_ns.is_root() { | |
905 | continue; | |
906 | } | |
907 | match check_and_remove_ns(params, &local_ns) { | |
908 | Ok(true) => task_log!(worker, "Removed namespace {}", local_ns), | |
909 | Ok(false) => task_log!( | |
910 | worker, | |
911 | "Did not remove namespace {} - protected snapshots remain", | |
912 | local_ns | |
913 | ), | |
914 | Err(err) => { | |
915 | task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err); | |
916 | errors = true; | |
917 | } | |
918 | } | |
919 | } | |
920 | ||
921 | Ok(errors) | |
922 | } | |
923 | ||
29c56859 FG |
/// Pulls a store according to `params`.
///
/// Pulling a store consists of the following steps:
/// - Query list of namespaces on the remote
/// - Iterate list
/// -- create sub-NS if needed (and allowed)
/// -- attempt to pull each NS in turn
/// - (remove_vanished && max_depth > 0) remove sub-NS which are not or no longer available on the remote
///
/// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is
/// passed to the remote at all to allow pulling from remotes which have no notion of namespaces.
///
/// Permission checks:
/// - access to local datastore, namespace anchor and remote entry need to be checked at call site
/// - remote namespaces are filtered by remote
/// - creation and removal of sub-NS checked here
/// - access to sub-NS checked here
pub async fn pull_store(
    worker: &WorkerTask,
    client: &HttpClient,
    mut params: PullParameters,
) -> Result<(), Error> {
    // explicit create shared lock to prevent GC on newly created chunks
    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
    let mut errors = false;

    // query_namespaces() may flip params.max_depth to Some(0) as a
    // backwards-compat fallback for remotes without namespace support -
    // remember the old value so this switch can be flagged as an error below.
    let old_max_depth = params.max_depth;
    let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
        vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
    } else {
        query_namespaces(worker, client, &mut params).await?
    };
    errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode

    // running totals over all namespaces (used for progress logging only)
    let (mut groups, mut snapshots) = (0, 0);
    // target namespaces touched by this run; consumed by remove_vanished below
    let mut synced_ns = HashSet::with_capacity(namespaces.len());

    for namespace in namespaces {
        let source_store_ns = DatastoreWithNamespace {
            store: params.source.store().to_owned(),
            ns: namespace.clone(),
        };
        // re-anchor the remote namespace below the local target namespace
        let target_ns = namespace.map_prefix(&params.remote_ns, &params.ns)?;
        let target_store_ns = params.store_with_ns(target_ns.clone());

        task_log!(worker, "----");
        task_log!(
            worker,
            "Syncing {} into {}",
            source_store_ns,
            target_store_ns
        );

        synced_ns.insert(target_ns.clone());

        match check_and_create_ns(&params, &target_store_ns) {
            Ok(true) => task_log!(worker, "Created namespace {}", target_ns),
            Ok(false) => {}
            Err(err) => {
                // cannot create or access the target namespace - skip it, but
                // keep syncing the remaining namespaces
                task_log!(
                    worker,
                    "Cannot sync {} into {} - {}",
                    source_store_ns,
                    target_store_ns,
                    err,
                );
                errors = true;
                continue;
            }
        }

        match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
            Ok((ns_progress, ns_errors)) => {
                errors |= ns_errors;

                if params.max_depth != Some(0) {
                    groups += ns_progress.done_groups;
                    snapshots += ns_progress.done_snapshots;
                    task_log!(
                        worker,
                        "Finished syncing namespace {}, current progress: {} groups, {} snapshots",
                        namespace,
                        groups,
                        snapshots,
                    );
                }
            }
            Err(err) => {
                // a failed namespace should not abort the remaining ones
                errors = true;
                task_log!(
                    worker,
                    "Encountered errors while syncing namespace {} - {}",
                    namespace,
                    err,
                );
            }
        };
    }

    if params.remove_vanished {
        errors |= check_and_remove_vanished_ns(worker, &params, synced_ns)?;
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(())
}
1033 | ||
1034 | /// Pulls a namespace according to `params`. | |
1035 | /// | |
1036 | /// Pulling a namespace consists of the following steps: | |
1037 | /// - Query list of groups on the remote (in `source_ns`) | |
1038 | /// - Filter list according to configured group filters | |
1039 | /// - Iterate list and attempt to pull each group in turn | |
1040 | /// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are | |
1041 | /// not or no longer available on the remote | |
1042 | /// | |
1043 | /// Permission checks: | |
1044 | /// - remote namespaces are filtered by remote | |
1045 | /// - owner check for vanished groups done here | |
1046 | pub async fn pull_ns( | |
1047 | worker: &WorkerTask, | |
1048 | client: &HttpClient, | |
1049 | params: &PullParameters, | |
1050 | source_ns: BackupNamespace, | |
1051 | target_ns: BackupNamespace, | |
1052 | ) -> Result<(StoreProgress, bool), Error> { | |
6e9e6c7a | 1053 | let path = format!("api2/json/admin/datastore/{}/groups", params.source.store()); |
07ad6470 | 1054 | |
c06c1b4b FG |
1055 | let args = if !source_ns.is_root() { |
1056 | Some(json!({ | |
bc21ade2 | 1057 | "ns": source_ns, |
c06c1b4b FG |
1058 | })) |
1059 | } else { | |
1060 | None | |
1061 | }; | |
1062 | ||
44de5bcc | 1063 | let mut result = client |
c06c1b4b | 1064 | .get(&path, args) |
44de5bcc FG |
1065 | .await |
1066 | .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?; | |
07ad6470 DM |
1067 | |
1068 | let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?; | |
1069 | ||
71e53463 | 1070 | let total_count = list.len(); |
07ad6470 | 1071 | list.sort_unstable_by(|a, b| { |
988d575d | 1072 | let type_order = a.backup.ty.cmp(&b.backup.ty); |
07ad6470 | 1073 | if type_order == std::cmp::Ordering::Equal { |
988d575d | 1074 | a.backup.id.cmp(&b.backup.id) |
07ad6470 DM |
1075 | } else { |
1076 | type_order | |
1077 | } | |
1078 | }); | |
1079 | ||
db87d93e | 1080 | let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool { |
ee0ea735 | 1081 | filters.iter().any(|filter| group.matches(filter)) |
71e53463 FG |
1082 | }; |
1083 | ||
c06c1b4b | 1084 | // Get groups with target NS set |
db87d93e | 1085 | let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect(); |
e2e7560d | 1086 | |
71e53463 FG |
1087 | let list = if let Some(ref group_filter) = ¶ms.group_filter { |
1088 | let unfiltered_count = list.len(); | |
db87d93e | 1089 | let list: Vec<pbs_api_types::BackupGroup> = list |
71e53463 | 1090 | .into_iter() |
ee0ea735 | 1091 | .filter(|group| apply_filters(group, group_filter)) |
71e53463 | 1092 | .collect(); |
ee0ea735 TL |
1093 | task_log!( |
1094 | worker, | |
1095 | "found {} groups to sync (out of {} total)", | |
1096 | list.len(), | |
1097 | unfiltered_count | |
1098 | ); | |
71e53463 FG |
1099 | list |
1100 | } else { | |
1101 | task_log!(worker, "found {} groups to sync", total_count); | |
1102 | list | |
1103 | }; | |
1104 | ||
07ad6470 DM |
1105 | let mut errors = false; |
1106 | ||
1107 | let mut new_groups = std::collections::HashSet::new(); | |
e2e7560d FG |
1108 | for group in list.iter() { |
1109 | new_groups.insert(group.clone()); | |
07ad6470 DM |
1110 | } |
1111 | ||
fc8920e3 FG |
1112 | let mut progress = StoreProgress::new(list.len() as u64); |
1113 | ||
e2e7560d | 1114 | for (done, group) in list.into_iter().enumerate() { |
fc8920e3 FG |
1115 | progress.done_groups = done as u64; |
1116 | progress.done_snapshots = 0; | |
1117 | progress.group_snapshots = 0; | |
7b8aa893 | 1118 | |
133d718f WB |
1119 | let (owner, _lock_guard) = |
1120 | match params | |
1121 | .store | |
c06c1b4b | 1122 | .create_locked_backup_group(&target_ns, &group, ¶ms.owner) |
133d718f WB |
1123 | { |
1124 | Ok(result) => result, | |
1125 | Err(err) => { | |
1126 | task_log!( | |
1127 | worker, | |
1128 | "sync group {} failed - group lock failed: {}", | |
1129 | &group, | |
1130 | err | |
1131 | ); | |
1132 | errors = true; // do not stop here, instead continue | |
1133 | continue; | |
1134 | } | |
1135 | }; | |
30f73fa2 | 1136 | |
07ad6470 | 1137 | // permission check |
6e9e6c7a | 1138 | if params.owner != owner { |
e2956c60 | 1139 | // only the owner is allowed to create additional snapshots |
1ec0d70d DM |
1140 | task_log!( |
1141 | worker, | |
e2e7560d | 1142 | "sync group {} failed - owner check failed ({} != {})", |
ee0ea735 TL |
1143 | &group, |
1144 | params.owner, | |
1145 | owner | |
1ec0d70d | 1146 | ); |
7b8aa893 | 1147 | errors = true; // do not stop here, instead continue |
c06c1b4b FG |
1148 | } else if let Err(err) = pull_group( |
1149 | worker, | |
1150 | client, | |
1151 | params, | |
1152 | &group, | |
1153 | source_ns.clone(), | |
1154 | &mut progress, | |
1155 | ) | |
1156 | .await | |
1157 | { | |
ee0ea735 | 1158 | task_log!(worker, "sync group {} failed - {}", &group, err,); |
20813274 | 1159 | errors = true; // do not stop here, instead continue |
07ad6470 DM |
1160 | } |
1161 | } | |
1162 | ||
6e9e6c7a | 1163 | if params.remove_vanished { |
6ef1b649 | 1164 | let result: Result<(), Error> = proxmox_lang::try_block!({ |
c06c1b4b | 1165 | for local_group in params.store.iter_backup_groups(target_ns.clone())? { |
249dde8b | 1166 | let local_group = local_group?; |
e13303fc FG |
1167 | let local_group = local_group.group(); |
1168 | if new_groups.contains(local_group) { | |
e2956c60 FG |
1169 | continue; |
1170 | } | |
e13303fc | 1171 | let owner = params.store.get_owner(&target_ns, local_group)?; |
df768ebe FG |
1172 | if check_backup_owner(&owner, ¶ms.owner).is_err() { |
1173 | continue; | |
1174 | } | |
71e53463 | 1175 | if let Some(ref group_filter) = ¶ms.group_filter { |
e13303fc | 1176 | if !apply_filters(local_group, group_filter) { |
71e53463 FG |
1177 | continue; |
1178 | } | |
1179 | } | |
e13303fc FG |
1180 | task_log!(worker, "delete vanished group '{local_group}'",); |
1181 | match params.store.remove_backup_group(&target_ns, local_group) { | |
ee0ea735 | 1182 | Ok(true) => {} |
34339261 | 1183 | Ok(false) => { |
ee0ea735 TL |
1184 | task_log!( |
1185 | worker, | |
1186 | "kept some protected snapshots of group '{}'", | |
1187 | local_group | |
1188 | ); | |
1189 | } | |
34339261 DC |
1190 | Err(err) => { |
1191 | task_log!(worker, "{}", err); | |
1192 | errors = true; | |
1193 | } | |
07ad6470 DM |
1194 | } |
1195 | } | |
1196 | Ok(()) | |
1197 | }); | |
1198 | if let Err(err) = result { | |
1ec0d70d | 1199 | task_log!(worker, "error during cleanup: {}", err); |
07ad6470 DM |
1200 | errors = true; |
1201 | }; | |
1202 | } | |
1203 | ||
c06c1b4b | 1204 | Ok((progress, errors)) |
07ad6470 | 1205 | } |