//! Sync datastore from remote server

use std::collections::{HashMap, HashSet};
use std::io::{Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use anyhow::{bail, format_err, Error};
use http::StatusCode;
use proxmox_human_byte::HumanByte;
use proxmox_rest_server::WorkerTask;
use proxmox_router::HttpError;
use proxmox_sys::{task_log, task_warn};
use serde_json::json;

use pbs_api_types::{
    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
    PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
    archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{
    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
};
use pbs_tools::sha::sha256;

use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;

struct RemoteReader {
    backup_reader: Arc<BackupReader>,
    dir: BackupDir,
}

struct LocalReader {
    _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
    path: PathBuf,
    datastore: Arc<DataStore>,
}

pub(crate) struct PullTarget {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

pub(crate) struct RemoteSource {
    repo: BackupRepository,
    ns: BackupNamespace,
    client: HttpClient,
}

pub(crate) struct LocalSource {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

#[derive(Default)]
pub(crate) struct PullStats {
    pub(crate) chunk_count: usize,
    pub(crate) bytes: usize,
    pub(crate) elapsed: Duration,
}

impl PullStats {
    fn add(&mut self, rhs: PullStats) {
        self.chunk_count += rhs.chunk_count;
        self.bytes += rhs.bytes;
        self.elapsed += rhs.elapsed;
    }
}
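
// Editor's sketch (not part of the original module): how per-archive statistics
// are folded into a running total, as done in `pull_snapshot` and `pull_store`
// below. `stats_from_one_archive` is a hypothetical value for illustration:
//
//     let mut total = PullStats::default();
//     total.add(stats_from_one_archive);
//     // the final log line derives throughput from the aggregate:
//     // total.bytes as f64 / total.elapsed.as_secs_f64()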

#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
/// as well as retrieving a reader for reading data from the source.
trait PullSource: Send + Sync {
    /// Lists namespaces from the source.
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error>;

    /// Lists groups within a specific namespace from the source.
    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error>;

    /// Lists backup directories for a specific group within a specific namespace from the source.
    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error>;
    fn get_ns(&self) -> BackupNamespace;
    fn get_store(&self) -> &str;

    /// Returns a reader for reading data from a specific backup directory.
    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error>;
}
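
// Editor's sketch (not part of the original module): how the pull code drives a
// `PullSource`, regardless of whether it is backed by a remote repository or a
// local datastore. All names below are illustrative:
//
//     let namespaces = source.list_namespaces(&mut max_depth, worker).await?;
//     for ns in namespaces {
//         for group in source.list_groups(&ns, &owner).await? {
//             for dir in source.list_backup_dirs(&ns, &group, worker).await? {
//                 let reader = source.reader(&ns, &dir).await?;
//                 // ... pull manifest and archives through the reader ...
//             }
//         }
//     }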

#[async_trait::async_trait]
impl PullSource for RemoteSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
            return Ok(vec![self.ns.clone()]);
        }

        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
        let mut data = json!({});
        if let Some(max_depth) = max_depth {
            data["max-depth"] = json!(max_depth);
        }

        if !self.ns.is_root() {
            data["parent"] = json!(self.ns);
        }
        self.client.login().await?;

        let mut result = match self.client.get(&path, Some(data)).await {
            Ok(res) => res,
            Err(err) => match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match code {
                    &StatusCode::NOT_FOUND => {
                        if self.ns.is_root() && max_depth.is_none() {
                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
                            max_depth.replace(0);
                        } else {
                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
                        }

                        return Ok(vec![self.ns.clone()]);
                    }
                    _ => {
                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
                    }
                },
                None => {
                    bail!("Querying namespaces failed - {err}");
                }
            },
        };

        let list: Vec<BackupNamespace> =
            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
                .into_iter()
                .map(|list_item| list_item.ns)
                .collect();

        Ok(list)
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        _owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());

        let args = if !namespace.is_root() {
            Some(json!({ "ns": namespace.clone() }))
        } else {
            None
        };

        self.client.login().await?;
        let mut result =
            self.client.get(&path, args).await.map_err(|err| {
                format_err!("Failed to retrieve backup groups from remote - {}", err)
            })?;

        Ok(
            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
                .map_err(Error::from)?
                .into_iter()
                .map(|item| item.backup)
                .collect::<Vec<BackupGroup>>(),
        )
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());

        let mut args = json!({
            "backup-type": group.ty,
            "backup-id": group.id,
        });

        if !namespace.is_root() {
            args["ns"] = serde_json::to_value(namespace)?;
        }

        self.client.login().await?;

        let mut result = self.client.get(&path, Some(args)).await?;
        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
        Ok(snapshot_list
            .into_iter()
            .filter_map(|item: SnapshotListItem| {
                let snapshot = item.backup;
                // in-progress backups can't be synced
                if item.size.is_none() {
                    task_log!(
                        worker,
                        "skipping snapshot {} - in-progress backup",
                        snapshot
                    );
                    return None;
                }

                Some(snapshot)
            })
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.repo.store()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let backup_reader =
            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
        Ok(Arc::new(RemoteReader {
            backup_reader,
            dir: dir.clone(),
        }))
    }
}

#[async_trait::async_trait]
impl PullSource for LocalSource {
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        ListNamespacesRecursive::new_max_depth(
            self.store.clone(),
            self.ns.clone(),
            max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
        )?
        .collect()
    }

    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        Ok(ListAccessibleBackupGroups::new_with_privs(
            &self.store,
            namespace.clone(),
            0,
            Some(PRIV_DATASTORE_READ),
            Some(PRIV_DATASTORE_BACKUP),
            Some(owner),
        )?
        .filter_map(Result::ok)
        .map(|backup_group| backup_group.group().clone())
        .collect::<Vec<pbs_api_types::BackupGroup>>())
    }

    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        Ok(self
            .store
            .backup_group(namespace.clone(), group.clone())
            .iter_snapshots()?
            .filter_map(Result::ok)
            .map(|snapshot| snapshot.dir().to_owned())
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.store.name()
    }

    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
        let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
            &dir.full_path(),
            "snapshot",
            "locked by another operation",
        )?;
        Ok(Arc::new(LocalReader {
            _dir_lock: Arc::new(Mutex::new(dir_lock)),
            path: dir.full_path(),
            datastore: dir.datastore().clone(),
        }))
    }
}

#[async_trait::async_trait]
/// `PullReader` is a trait that provides an interface for reading data from a source.
/// The trait includes methods for getting a chunk reader, loading a file, downloading the
/// client log, and checking whether the chunk sync should be skipped.
trait PullReader: Send + Sync {
    /// Returns a chunk reader with the specified encryption mode.
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;

    /// Asynchronously loads a file from the source into a local file.
    /// `filename` is the name of the file to load from the source.
    /// `into` is the path of the local file to load the source file into.
    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error>;

    /// Tries to download the client log from the source and save it into a local file.
    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error>;

    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
}
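
// Editor's sketch (not part of the original module): the reader contract as used
// by `pull_snapshot`/`pull_single_archive` below. Blobs are fetched with
// `load_file_into`, chunks go through the `AsyncReadChunk` returned by
// `chunk_reader`, and a local reader that already points at the target datastore
// short-circuits chunk transfer entirely:
//
//     if !reader.skip_chunk_sync(target_store_name) {
//         let chunk_reader = reader.chunk_reader(crypt_mode);
//         // ... download and verify the referenced chunks ...
//     }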

#[async_trait::async_trait]
impl PullReader for RemoteReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(RemoteChunkReader::new(
            self.backup_reader.clone(),
            None,
            crypt_mode,
            HashMap::new(),
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
        if let Err(err) = download_result {
            match err.downcast_ref::<HttpError>() {
                Some(HttpError { code, message }) => match *code {
                    StatusCode::NOT_FOUND => {
                        task_log!(
                            worker,
                            "skipping snapshot {} - vanished since start of sync",
                            &self.dir,
                        );
                        return Ok(None);
                    }
                    _ => {
                        bail!("HTTP error {code} - {message}");
                    }
                },
                None => {
                    return Err(err);
                }
            };
        };
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error> {
        let mut tmp_path = to_path.to_owned();
        tmp_path.set_extension("tmp");

        let tmpfile = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .read(true)
            .open(&tmp_path)?;

        // Note: be silent if there is no log - only log successful download
        if let Ok(()) = self
            .backup_reader
            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
            .await
        {
            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
                bail!("Atomic rename file {:?} failed - {}", to_path, err);
            }
            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
        }

        Ok(())
    }

    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
        false
    }
}

#[async_trait::async_trait]
impl PullReader for LocalReader {
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(LocalChunkReader::new(
            self.datastore.clone(),
            None,
            crypt_mode,
        ))
    }

    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        _worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let mut from_path = self.path.clone();
        from_path.push(filename);
        tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
        tmp_file.rewind()?;
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    async fn try_download_client_log(
        &self,
        _to_path: &Path,
        _worker: &WorkerTask,
    ) -> Result<(), Error> {
        Ok(())
    }

    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
        self.datastore.name() == target_store_name
    }
}

/// Parameters for a pull operation.
pub(crate) struct PullParameters {
    /// Where data is pulled from
    source: Arc<dyn PullSource>,
    /// Where data should be pulled into
    target: PullTarget,
    /// Owner of synced groups (needs to match local owner of pre-existing groups)
    owner: Authid,
    /// Whether to remove groups which exist locally, but not on the remote end
    remove_vanished: bool,
    /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
    max_depth: Option<usize>,
    /// Filters for reducing the pull scope
    group_filter: Vec<GroupFilter>,
    /// How many snapshots should be transferred at most (taking the newest N snapshots)
    transfer_last: Option<usize>,
}

impl PullParameters {
    /// Creates a new instance of `PullParameters`.
    pub(crate) fn new(
        store: &str,
        ns: BackupNamespace,
        remote: Option<&str>,
        remote_store: &str,
        remote_ns: BackupNamespace,
        owner: Authid,
        remove_vanished: Option<bool>,
        max_depth: Option<usize>,
        group_filter: Option<Vec<GroupFilter>>,
        limit: RateLimitConfig,
        transfer_last: Option<usize>,
    ) -> Result<Self, Error> {
        if let Some(max_depth) = max_depth {
            ns.check_max_depth(max_depth)?;
            remote_ns.check_max_depth(max_depth)?;
        };
        let remove_vanished = remove_vanished.unwrap_or(false);

        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
            let (remote_config, _digest) = pbs_config::remote::config()?;
            let remote: Remote = remote_config.lookup("remote", remote)?;

            let repo = BackupRepository::new(
                Some(remote.config.auth_id.clone()),
                Some(remote.config.host.clone()),
                remote.config.port,
                remote_store.to_string(),
            );
            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
            Arc::new(RemoteSource {
                repo,
                ns: remote_ns,
                client,
            })
        } else {
            Arc::new(LocalSource {
                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
                ns: remote_ns,
            })
        };
        let target = PullTarget {
            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
            ns,
        };

        let group_filter = group_filter.unwrap_or_default();

        Ok(Self {
            source,
            target,
            owner,
            remove_vanished,
            max_depth,
            group_filter,
            transfer_last,
        })
    }
}
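
// Editor's sketch (not part of the original module): constructing parameters for
// a pull from a configured remote. All concrete values are hypothetical; passing
// `None` for the remote instead selects a local-to-local sync:
//
//     let params = PullParameters::new(
//         "local-store",                  // target datastore
//         BackupNamespace::root(),        // target namespace anchor
//         Some("my-remote"),              // remote.cfg entry; None => local source
//         "remote-store",                 // source datastore (on the remote)
//         BackupNamespace::root(),        // source namespace anchor
//         Authid::root_auth_id().clone(), // owner of synced groups
//         Some(false),                    // remove_vanished
//         None,                           // max_depth (None => full recursion)
//         None,                           // group_filter
//         RateLimitConfig::default(),     // rate limit
//         None,                           // transfer_last
//     )?;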

async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: Arc<dyn AsyncReadChunk>,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before it's actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", hex::encode(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    let bytes = Arc::new(AtomicUsize::new(0));
    let chunk_count = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let chunk_count = Arc::clone(&chunk_count);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
                    return Ok::<_, Error>(());
                }
                //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in separate threads to maximize throughput
                proxmox_async::runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);
                chunk_count.fetch_add(1, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?;

    let bytes = bytes.load(Ordering::SeqCst);
    let chunk_count = chunk_count.load(Ordering::SeqCst);

    task_log!(
        worker,
        "downloaded {} ({}/s)",
        HumanByte::from(bytes),
        HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
    );

    Ok(PullStats {
        chunk_count,
        bytes,
        elapsed,
    })
}
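
// Editor's note (not part of the original module): chunk downloads are
// deduplicated across a whole group via the shared `downloaded_chunks` set,
// while up to 20 downloads run concurrently and a 4-thread `ParallelHandler`
// verifies and writes in the background. A minimal sketch of the same guard
// pattern, exploiting that `HashSet::insert` returns `true` only for new items:
//
//     let mut guard = downloaded_chunks.lock().unwrap();
//     if guard.insert(digest) {
//         // first sighting: this task owns the download of `digest`
//     } else {
//         // an earlier archive/snapshot already fetched (or is fetching) it
//     }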

fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
    if size != info.size {
        bail!(
            "wrong size for file '{}' ({} != {})",
            info.filename,
            info.size,
            size
        );
    }

    if csum != &info.csum {
        bail!("wrong checksum for file '{}'", info.filename);
    }

    Ok(())
}

/// Pulls a single file referenced by a manifest.
///
/// Pulling an archive consists of the following steps:
/// - Load the archive file into a tmp file:
/// -- Load the file into the tmp file
/// -- Verify the tmp file's checksum
/// - If the archive is an index, pull the referenced chunks
/// - Rename the tmp file into the real path
async fn pull_single_archive<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    archive_info: &'a FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    let archive_name = &archive_info.filename;
    let mut path = snapshot.full_path();
    path.push(archive_name);

    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    let mut pull_stats = PullStats::default();

    task_log!(worker, "sync archive {}", archive_name);

    reader
        .load_file_into(archive_name, &tmp_path, worker)
        .await?;

    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;

    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                let stats = pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
                pull_stats.add(stats);
            }
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                let stats = pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
                pull_stats.add(stats);
            }
        }
        ArchiveType::Blob => {
            tmpfile.rewind()?;
            let (csum, size) = sha256(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;
        }
    }
    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }
    Ok(pull_stats)
}
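
// Editor's note (not part of the original module): archives and manifests are
// always staged in a "<name>.tmp" file, verified, and only then renamed over the
// final path, so an aborted sync never leaves a truncated file under the real
// name. The recurring pattern, in sketch form:
//
//     let mut tmp_path = path.clone();
//     tmp_path.set_extension("tmp");
//     // ... write and verify tmp_path ...
//     std::fs::rename(&tmp_path, &path)?; // atomic within the same filesystem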

/// Actual implementation of pulling a snapshot.
///
/// Pulling a snapshot consists of the following steps:
/// - (Re)download the manifest
/// -- if it matches, only download the log and treat the snapshot as already synced
/// - Iterate over the referenced files
/// -- if the file already exists, verify its contents
/// -- if not, pull it from the remote
/// - Download the log if it does not already exist
async fn pull_snapshot<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    let mut pull_stats = PullStats::default();
    let mut manifest_name = snapshot.full_path();
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = snapshot.full_path();
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");
    let tmp_manifest_blob;
    if let Some(data) = reader
        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
        .await?
    {
        tmp_manifest_blob = data;
    } else {
        return Ok(pull_stats);
    }

    if manifest_name.exists() {
        let manifest_blob = proxmox_lang::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!("unable to open local manifest {manifest_name:?} - {err}")
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!("unable to read local manifest {manifest_name:?} - {err}")
        })?;

        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                reader
                    .try_download_client_log(&client_log_name, worker)
                    .await?;
            };
            task_log!(worker, "no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(pull_stats); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = snapshot.full_path();
        path.push(&item.filename);

        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = sha256(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
            }
        }

        let stats = pull_single_archive(
            worker,
            reader.clone(),
            snapshot,
            item,
            downloaded_chunks.clone(),
        )
        .await?;
        pull_stats.add(stats);
    }

    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        reader
            .try_download_client_log(&client_log_name, worker)
            .await?;
    };
    snapshot
        .cleanup_unreferenced_files(&manifest)
        .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;

    Ok(pull_stats)
}

/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
///
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
async fn pull_snapshot_from<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
    let (_path, is_new, _snap_lock) = snapshot
        .datastore()
        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;

    let pull_stats = if is_new {
        task_log!(worker, "sync snapshot {}", snapshot.dir());

        match pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
            Err(err) => {
                if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                    snapshot.backup_ns(),
                    snapshot.as_ref(),
                    true,
                ) {
                    task_log!(worker, "cleanup error - {}", cleanup_err);
                }
                return Err(err);
            }
            Ok(pull_stats) => {
                task_log!(worker, "sync snapshot {} done", snapshot.dir());
                pull_stats
            }
        }
    } else {
        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?
    };

    Ok(pull_stats)
}

#[derive(PartialEq, Eq)]
enum SkipReason {
    AlreadySynced,
    TransferLast,
}

impl std::fmt::Display for SkipReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}",
            match self {
                SkipReason::AlreadySynced => "older than the newest local snapshot",
                SkipReason::TransferLast => "due to transfer-last",
            }
        )
    }
}

struct SkipInfo {
    oldest: i64,
    newest: i64,
    count: u64,
    skip_reason: SkipReason,
}

impl SkipInfo {
    fn new(skip_reason: SkipReason) -> Self {
        SkipInfo {
            oldest: i64::MAX,
            newest: i64::MIN,
            count: 0,
            skip_reason,
        }
    }

    fn reset(&mut self) {
        self.count = 0;
        self.oldest = i64::MAX;
        self.newest = i64::MIN;
    }

    fn update(&mut self, backup_time: i64) {
        self.count += 1;

        if backup_time < self.oldest {
            self.oldest = backup_time;
        }

        if backup_time > self.newest {
            self.newest = backup_time;
        }
    }

    fn affected(&self) -> Result<String, Error> {
        match self.count {
            0 => Ok(String::new()),
            1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?),
            _ => Ok(format!(
                "{} .. {}",
                proxmox_time::epoch_to_rfc3339_utc(self.oldest)?,
                proxmox_time::epoch_to_rfc3339_utc(self.newest)?,
            )),
        }
    }
}

impl std::fmt::Display for SkipInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "skipped: {} snapshot(s) ({}) - {}",
            self.count,
            self.affected().map_err(|_| std::fmt::Error)?,
            self.skip_reason,
        )
    }
}
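
// Editor's note (not part of the original module): with the formatting above, a
// task log line for three skipped snapshots would look roughly like this
// (timestamps hypothetical):
//
//     skipped: 3 snapshot(s) (2023-11-01T08:00:00Z .. 2023-11-03T08:00:00Z) - older than the newest local snapshot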

/// Pulls a group according to `params`.
///
/// Pulling a group consists of the following steps:
/// - Query the list of snapshots available for this group in the source namespace on the remote
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the
/// remote when querying snapshots. This allows us to interact with old remotes that don't have
/// namespace support yet.
///
/// Permission checks:
/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
/// - local group owner is already checked by pull_store
async fn pull_group(
    worker: &WorkerTask,
    params: &PullParameters,
    source_namespace: &BackupNamespace,
    group: &BackupGroup,
    progress: &mut StoreProgress,
) -> Result<PullStats, Error> {
    let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
    let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);

    let mut raw_list: Vec<BackupDir> = params
        .source
        .list_backup_dirs(source_namespace, group, worker)
        .await?;
    raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));

    let total_amount = raw_list.len();

    let cutoff = params
        .transfer_last
        .map(|count| total_amount.saturating_sub(count))
        .unwrap_or_default();
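    // Editor's note (not part of the original module): a worked example of the
    // transfer-last cutoff. With 10 snapshots (positions 0..=9, oldest first)
    // and transfer_last = Some(2):
    //
    //     let cutoff = 10usize.saturating_sub(2); // == 8
    //
    // Positions 0..8 are skipped (unless one matches the last locally synced
    // snapshot time), so only the newest two snapshots are transferred.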
    let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    let mut source_snapshots = HashSet::new();
    let last_sync_time = params
        .target
        .store
        .last_successful_backup(&target_ns, group)?
        .unwrap_or(i64::MIN);

    let list: Vec<BackupDir> = raw_list
        .into_iter()
        .enumerate()
        .filter(|&(pos, ref dir)| {
            source_snapshots.insert(dir.time);
            if last_sync_time > dir.time {
                already_synced_skip_info.update(dir.time);
                return false;
            } else if already_synced_skip_info.count > 0 {
                task_log!(worker, "{}", already_synced_skip_info);
                already_synced_skip_info.reset();
                return true;
            }

            if pos < cutoff && last_sync_time != dir.time {
                transfer_last_skip_info.update(dir.time);
                return false;
            } else if transfer_last_skip_info.count > 0 {
                task_log!(worker, "{}", transfer_last_skip_info);
                transfer_last_skip_info.reset();
            }
            true
        })
        .map(|(_, dir)| dir)
        .collect();

    // start with 65536 chunks (up to 256 GiB)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    let mut pull_stats = PullStats::default();

    for (pos, from_snapshot) in list.into_iter().enumerate() {
        let to_snapshot = params
            .target
            .store
            .backup_dir(target_ns.clone(), from_snapshot.clone())?;

        let reader = params
            .source
            .reader(source_namespace, &from_snapshot)
            .await?;
        let result =
            pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;

        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        let stats = result?; // stop on error
        pull_stats.add(stats);
    }

    if params.remove_vanished {
        let group = params
            .target
            .store
            .backup_group(target_ns.clone(), group.clone());
        let local_list = group.list_backups()?;
        for info in local_list {
            let snapshot = info.backup_dir;
            if source_snapshots.contains(&snapshot.backup_time()) {
                continue;
            }
            if snapshot.is_protected() {
                task_log!(
                    worker,
                    "don't delete vanished snapshot {} (protected)",
                    snapshot.dir()
                );
                continue;
            }
            task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
            params
                .target
                .store
                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
        }
    }

    Ok(pull_stats)
}

fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
    let mut created = false;
    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);

    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
            .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;

        let name = match ns.components().last() {
            Some(name) => name.to_owned(),
            None => {
                bail!("Failed to determine last component of namespace.");
            }
        };

        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
            bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
        }
        created = true;
    }

    check_ns_privs(
        params.target.store.name(),
        ns,
        &params.owner,
        PRIV_DATASTORE_BACKUP,
    )
    .map_err(|err| format_err!("sync into {store_ns_str} not allowed - {err}"))?;

    Ok(created)
}

fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
        .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;

    params
        .target
        .store
        .remove_namespace_recursive(local_ns, true)
}

fn check_and_remove_vanished_ns(
    worker: &WorkerTask,
    params: &PullParameters,
    synced_ns: HashSet<BackupNamespace>,
) -> Result<bool, Error> {
    let mut errors = false;
    let user_info = CachedUserInfo::new()?;

    // clamp like remote does so that we don't list more than we can ever have synced.
    let max_depth = params
        .max_depth
        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());

    let mut local_ns_list: Vec<BackupNamespace> = params
        .target
        .store
        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
        .filter(|ns| {
            let user_privs =
                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
            user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
        })
        .collect();

    // children first!
    local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));

    for local_ns in local_ns_list {
        if local_ns == params.target.ns {
            continue;
        }

        if synced_ns.contains(&local_ns) {
            continue;
        }

        if local_ns.is_root() {
            continue;
        }
        match check_and_remove_ns(params, &local_ns) {
            Ok(true) => task_log!(worker, "Removed namespace {}", local_ns),
            Ok(false) => task_log!(
                worker,
                "Did not remove namespace {} - protected snapshots remain",
                local_ns
            ),
            Err(err) => {
                task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err);
                errors = true;
            }
        }
    }

    Ok(errors)
}

/// Pulls a store according to `params`.
///
/// Pulling a store consists of the following steps:
/// - Query list of namespaces on the remote
/// - Iterate list
/// -- create sub-NS if needed (and allowed)
/// -- attempt to pull each NS in turn
/// - (remove_vanished && max_depth > 0) remove sub-NS which are not or no longer available on the remote
///
/// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is
/// passed to the remote at all to allow pulling from remotes which have no notion of namespaces.
///
/// Permission checks:
/// - access to local datastore, namespace anchor and remote entry need to be checked at call site
/// - remote namespaces are filtered by remote
/// - creation and removal of sub-NS checked here
/// - access to sub-NS checked here
pub(crate) async fn pull_store(
    worker: &WorkerTask,
    mut params: PullParameters,
) -> Result<PullStats, Error> {
    // explicitly create a shared lock to prevent GC from removing newly created chunks
    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
    let mut errors = false;

    let old_max_depth = params.max_depth;
    let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
        vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
    } else {
        params
            .source
            .list_namespaces(&mut params.max_depth, worker)
            .await?
    };

    let ns_layers_to_be_pulled = namespaces
        .iter()
        .map(BackupNamespace::depth)
        .max()
        .map_or(0, |v| v - params.source.get_ns().depth());
    let target_depth = params.target.ns.depth();

    if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
        bail!(
            "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
            ns_layers_to_be_pulled,
            target_depth,
            MAX_NAMESPACE_DEPTH
        );
    }
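    // Editor's note (not part of the original module): a worked depth example,
    // assuming MAX_NAMESPACE_DEPTH is 7. Pulling source namespace `a` (depth 1)
    // that contains `a/b/c` (depth 3) gives ns_layers_to_be_pulled = 3 - 1 = 2;
    // with a target anchor of depth 6 the check fails, since 2 + 6 > 7.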
    errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
    namespaces.sort_unstable_by_key(|a| a.name_len());

    let (mut groups, mut snapshots) = (0, 0);
    let mut synced_ns = HashSet::with_capacity(namespaces.len());
    let mut pull_stats = PullStats::default();

    for namespace in namespaces {
        let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);

        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);

        task_log!(worker, "----");
        task_log!(
            worker,
            "Syncing {} into {}",
            source_store_ns_str,
            target_store_ns_str
        );

        synced_ns.insert(target_ns.clone());

        match check_and_create_ns(&params, &target_ns) {
            Ok(true) => task_log!(worker, "Created namespace {}", target_ns),
            Ok(false) => {}
            Err(err) => {
                task_log!(
                    worker,
                    "Cannot sync {} into {} - {}",
                    source_store_ns_str,
                    target_store_ns_str,
                    err,
                );
                errors = true;
                continue;
            }
        }

        match pull_ns(worker, &namespace, &mut params).await {
            Ok((ns_progress, ns_pull_stats, ns_errors)) => {
                errors |= ns_errors;

                pull_stats.add(ns_pull_stats);

                if params.max_depth != Some(0) {
                    groups += ns_progress.done_groups;
                    snapshots += ns_progress.done_snapshots;
                    task_log!(
                        worker,
                        "Finished syncing namespace {}, current progress: {} groups, {} snapshots",
                        namespace,
                        groups,
                        snapshots,
                    );
                }
            }
            Err(err) => {
                errors = true;
                task_log!(
                    worker,
                    "Encountered errors while syncing namespace {} - {}",
                    &namespace,
                    err,
                );
            }
        };
    }

    if params.remove_vanished {
        errors |= check_and_remove_vanished_ns(worker, &params, synced_ns)?;
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(pull_stats)
}

/// Pulls a namespace according to `params`.
///
/// Pulling a namespace consists of the following steps:
/// - Query list of groups on the remote (in `source_ns`)
/// - Filter list according to configured group filters
/// - Iterate list and attempt to pull each group in turn
/// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are
///   not or no longer available on the remote
///
/// Permission checks:
/// - remote namespaces are filtered by remote
/// - owner check for vanished groups done here
pub(crate) async fn pull_ns(
    worker: &WorkerTask,
    namespace: &BackupNamespace,
    params: &mut PullParameters,
) -> Result<(StoreProgress, PullStats, bool), Error> {
    let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;

    list.sort_unstable_by(|a, b| {
        let type_order = a.ty.cmp(&b.ty);
        if type_order == std::cmp::Ordering::Equal {
            a.id.cmp(&b.id)
        } else {
            type_order
        }
    });

    let unfiltered_count = list.len();
    let list: Vec<BackupGroup> = list
        .into_iter()
        .filter(|group| group.apply_filters(&params.group_filter))
        .collect();
    task_log!(
        worker,
        "found {} groups to sync (out of {} total)",
        list.len(),
        unfiltered_count
    );
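    // Editor's note (not part of the original module): `group_filter` entries
    // follow the usual PBS sync filter syntax; the concrete values below are
    // hypothetical examples only:
    //
    //     type:vm            -- all VM backup groups
    //     group:vm/100       -- one specific group
    //     regex:^ct/1[0-9]$  -- groups whose identifier matches the regex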
    let mut errors = false;

    let mut new_groups = HashSet::new();
    for group in list.iter() {
        new_groups.insert(group.clone());
    }

    let mut progress = StoreProgress::new(list.len() as u64);
    let mut pull_stats = PullStats::default();

    let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    for (done, group) in list.into_iter().enumerate() {
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let (owner, _lock_guard) =
            match params
                .target
                .store
                .create_locked_backup_group(&target_ns, &group, &params.owner)
            {
                Ok(result) => result,
                Err(err) => {
                    task_log!(
                        worker,
                        "sync group {} failed - group lock failed: {}",
                        &group,
                        err
                    );
                    errors = true;
                    // do not stop here, instead continue
                    task_log!(worker, "create_locked_backup_group failed");
                    continue;
                }
            };

        // permission check
        if params.owner != owner {
            // only the owner is allowed to create additional snapshots
            task_log!(
                worker,
                "sync group {} failed - owner check failed ({} != {})",
                &group,
                params.owner,
                owner
            );
            errors = true; // do not stop here, instead continue
        } else {
            match pull_group(worker, params, namespace, &group, &mut progress).await {
                Ok(stats) => pull_stats.add(stats),
                Err(err) => {
                    task_log!(worker, "sync group {} failed - {}", &group, err,);
                    errors = true; // do not stop here, instead continue
                }
            }
        }
    }

    if params.remove_vanished {
        let result: Result<(), Error> = proxmox_lang::try_block!({
            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
                let local_group = local_group?;
                let local_group = local_group.group();
                if new_groups.contains(local_group) {
                    continue;
                }
                let owner = params.target.store.get_owner(&target_ns, local_group)?;
                if check_backup_owner(&owner, &params.owner).is_err() {
                    continue;
                }
                if !local_group.apply_filters(&params.group_filter) {
                    continue;
                }
                task_log!(worker, "delete vanished group '{local_group}'",);
                let delete_stats_result = params
                    .target
                    .store
                    .remove_backup_group(&target_ns, local_group);

                match delete_stats_result {
                    Ok(stats) => {
                        if !stats.all_removed() {
                            task_log!(
                                worker,
                                "kept some protected snapshots of group '{local_group}'",
                            );
                        }
                    }
                    Err(err) => {
                        task_log!(worker, "{}", err);
                        errors = true;
                    }
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            task_log!(worker, "error during cleanup: {}", err);
            errors = true;
        };
    }

    Ok((progress, pull_stats, errors))
}