]>
Commit | Line | Data |
---|---|---|
07ad6470 DM |
1 | //! Sync datastore from remote server |
2 | ||
e2956c60 | 3 | use std::collections::{HashMap, HashSet}; |
076f36ec HL |
4 | use std::io::{Seek, Write}; |
5 | use std::path::{Path, PathBuf}; | |
54417086 | 6 | use std::sync::atomic::{AtomicUsize, Ordering}; |
e2956c60 FG |
7 | use std::sync::{Arc, Mutex}; |
8 | use std::time::SystemTime; | |
07ad6470 | 9 | |
c23192d3 | 10 | use anyhow::{bail, format_err, Error}; |
6ef1b649 | 11 | use http::StatusCode; |
05a52d01 | 12 | use proxmox_rest_server::WorkerTask; |
6ef1b649 | 13 | use proxmox_router::HttpError; |
05a52d01 HL |
14 | use proxmox_sys::{task_log, task_warn}; |
15 | use serde_json::json; | |
c23192d3 | 16 | |
2d5287fb | 17 | use pbs_api_types::{ |
05a52d01 HL |
18 | print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter, |
19 | GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH, | |
7aeabff2 | 20 | PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ, |
2d5287fb | 21 | }; |
05a52d01 HL |
22 | use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader}; |
23 | use pbs_config::CachedUserInfo; | |
ea584a75 WB |
24 | use pbs_datastore::data_blob::DataBlob; |
25 | use pbs_datastore::dynamic_index::DynamicIndexReader; | |
26 | use pbs_datastore::fixed_index::FixedIndexReader; | |
27 | use pbs_datastore::index::IndexFile; | |
28 | use pbs_datastore::manifest::{ | |
ee0ea735 | 29 | archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, |
ea584a75 | 30 | }; |
05a52d01 | 31 | use pbs_datastore::read_chunk::AsyncReadChunk; |
076f36ec HL |
32 | use pbs_datastore::{ |
33 | check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress, | |
34 | }; | |
ba0ccc59 | 35 | use pbs_tools::sha::sha256; |
c23192d3 | 36 | |
076f36ec | 37 | use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups}; |
260147bd | 38 | use crate::tools::parallel_handler::ParallelHandler; |
07ad6470 | 39 | |
/// Reader state for pulling a single snapshot from a remote PBS instance.
struct RemoteReader {
    // HTTP/2 reader session attached to one remote snapshot
    backup_reader: Arc<BackupReader>,
    // the (remote) snapshot being read; used in log messages when it vanishes mid-sync
    dir: BackupDir,
}
44 | ||
/// Reader state for pulling a single snapshot from another local datastore.
struct LocalReader {
    // held (not read) to keep the shared snapshot lock alive for the reader's lifetime
    _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
    // absolute path of the source snapshot directory
    path: PathBuf,
    // source datastore the snapshot (and its chunks) live in
    datastore: Arc<DataStore>,
}
50 | ||
/// Local datastore + namespace that a pull operation writes into.
pub(crate) struct PullTarget {
    // target datastore (opened for writing)
    store: Arc<DataStore>,
    // target root namespace; source namespaces are mapped below this
    ns: BackupNamespace,
}
55 | ||
/// Pull source backed by a remote PBS instance, accessed over its HTTP API.
pub(crate) struct RemoteSource {
    // remote repository (auth id, host, port, store name)
    repo: BackupRepository,
    // source root namespace on the remote side
    ns: BackupNamespace,
    // authenticated API client for the remote
    client: HttpClient,
}
61 | ||
/// Pull source backed by another datastore on this host (local sync job).
pub(crate) struct LocalSource {
    // source datastore (opened for reading)
    store: Arc<DataStore>,
    // source root namespace
    ns: BackupNamespace,
}
66 | ||
#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
/// as well as retrieving a reader for reading data from the source
trait PullSource: Send + Sync {
    /// Lists namespaces from the source.
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error>;

    /// Lists groups within a specific namespace from the source.
    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error>;

    /// Lists backup directories for a specific group within a specific namespace from the source.
    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error>;

    /// Returns the source's root namespace for this sync.
    fn get_ns(&self) -> BackupNamespace;

    /// Returns the name of the source datastore.
    fn get_store(&self) -> &str;

    /// Returns a reader for reading data from a specific backup directory.
    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error>;
}
103 | ||
104 | #[async_trait::async_trait] | |
105 | impl PullSource for RemoteSource { | |
106 | async fn list_namespaces( | |
107 | &self, | |
108 | max_depth: &mut Option<usize>, | |
109 | worker: &WorkerTask, | |
110 | ) -> Result<Vec<BackupNamespace>, Error> { | |
111 | if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) { | |
112 | return Ok(vec![self.ns.clone()]); | |
113 | } | |
114 | ||
115 | let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store()); | |
116 | let mut data = json!({}); | |
117 | if let Some(max_depth) = max_depth { | |
118 | data["max-depth"] = json!(max_depth); | |
119 | } | |
120 | ||
121 | if !self.ns.is_root() { | |
122 | data["parent"] = json!(self.ns); | |
123 | } | |
124 | self.client.login().await?; | |
125 | ||
126 | let mut result = match self.client.get(&path, Some(data)).await { | |
127 | Ok(res) => res, | |
128 | Err(err) => match err.downcast_ref::<HttpError>() { | |
129 | Some(HttpError { code, message }) => match code { | |
130 | &StatusCode::NOT_FOUND => { | |
131 | if self.ns.is_root() && max_depth.is_none() { | |
132 | task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode"); | |
133 | task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system."); | |
134 | max_depth.replace(0); | |
135 | } else { | |
136 | bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.") | |
137 | } | |
138 | ||
139 | return Ok(vec![self.ns.clone()]); | |
140 | } | |
141 | _ => { | |
142 | bail!("Querying namespaces failed - HTTP error {code} - {message}"); | |
143 | } | |
144 | }, | |
145 | None => { | |
146 | bail!("Querying namespaces failed - {err}"); | |
147 | } | |
148 | }, | |
149 | }; | |
150 | ||
151 | let list: Vec<BackupNamespace> = | |
152 | serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())? | |
153 | .into_iter() | |
154 | .map(|list_item| list_item.ns) | |
155 | .collect(); | |
156 | ||
157 | Ok(list) | |
158 | } | |
159 | ||
160 | async fn list_groups( | |
161 | &self, | |
162 | namespace: &BackupNamespace, | |
163 | _owner: &Authid, | |
164 | ) -> Result<Vec<BackupGroup>, Error> { | |
165 | let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store()); | |
166 | ||
167 | let args = if !namespace.is_root() { | |
168 | Some(json!({ "ns": namespace.clone() })) | |
169 | } else { | |
170 | None | |
171 | }; | |
172 | ||
173 | self.client.login().await?; | |
174 | let mut result = | |
175 | self.client.get(&path, args).await.map_err(|err| { | |
176 | format_err!("Failed to retrieve backup groups from remote - {}", err) | |
177 | })?; | |
178 | ||
179 | Ok( | |
180 | serde_json::from_value::<Vec<GroupListItem>>(result["data"].take()) | |
181 | .map_err(Error::from)? | |
182 | .into_iter() | |
183 | .map(|item| item.backup) | |
184 | .collect::<Vec<BackupGroup>>(), | |
185 | ) | |
186 | } | |
187 | ||
188 | async fn list_backup_dirs( | |
189 | &self, | |
b14e5dcb | 190 | namespace: &BackupNamespace, |
05a52d01 HL |
191 | group: &BackupGroup, |
192 | worker: &WorkerTask, | |
193 | ) -> Result<Vec<BackupDir>, Error> { | |
194 | let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store()); | |
195 | ||
196 | let mut args = json!({ | |
197 | "backup-type": group.ty, | |
198 | "backup-id": group.id, | |
199 | }); | |
200 | ||
b14e5dcb FG |
201 | if !namespace.is_root() { |
202 | args["ns"] = serde_json::to_value(&namespace)?; | |
05a52d01 HL |
203 | } |
204 | ||
205 | self.client.login().await?; | |
206 | ||
207 | let mut result = self.client.get(&path, Some(args)).await?; | |
208 | let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?; | |
209 | Ok(snapshot_list | |
210 | .into_iter() | |
211 | .filter_map(|item: SnapshotListItem| { | |
212 | let snapshot = item.backup; | |
213 | // in-progress backups can't be synced | |
214 | if item.size.is_none() { | |
215 | task_log!( | |
216 | worker, | |
217 | "skipping snapshot {} - in-progress backup", | |
218 | snapshot | |
219 | ); | |
220 | return None; | |
221 | } | |
222 | ||
223 | Some(snapshot) | |
224 | }) | |
225 | .collect::<Vec<BackupDir>>()) | |
226 | } | |
227 | ||
228 | fn get_ns(&self) -> BackupNamespace { | |
229 | self.ns.clone() | |
230 | } | |
231 | ||
4cc4ea64 FG |
232 | fn get_store(&self) -> &str { |
233 | &self.repo.store() | |
05a52d01 HL |
234 | } |
235 | ||
236 | async fn reader( | |
237 | &self, | |
238 | ns: &BackupNamespace, | |
239 | dir: &BackupDir, | |
240 | ) -> Result<Arc<dyn PullReader>, Error> { | |
241 | let backup_reader = | |
242 | BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?; | |
243 | Ok(Arc::new(RemoteReader { | |
244 | backup_reader, | |
245 | dir: dir.clone(), | |
246 | })) | |
247 | } | |
248 | } | |
249 | ||
#[async_trait::async_trait]
impl PullSource for LocalSource {
    /// Lists namespaces by walking the local datastore's namespace hierarchy.
    /// `max_depth` is read-only here (no backwards-compat fallback is needed locally).
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        ListNamespacesRecursive::new_max_depth(
            self.store.clone(),
            self.ns.clone(),
            max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
        )?
        .collect()
    }

    /// Lists groups in `namespace` that `owner` may access.
    ///
    /// Requires DATASTORE_READ, or DATASTORE_BACKUP combined with group
    /// ownership; groups failing either check are silently filtered out.
    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        Ok(ListAccessibleBackupGroups::new_with_privs(
            &self.store,
            namespace.clone(),
            0, // no recursion: namespaces are iterated separately via list_namespaces()
            Some(PRIV_DATASTORE_READ),
            Some(PRIV_DATASTORE_BACKUP),
            Some(owner),
        )?
        .filter_map(Result::ok)
        .map(|backup_group| backup_group.group().clone())
        .collect::<Vec<pbs_api_types::BackupGroup>>())
    }

    /// Lists snapshots of `group` by scanning the group directory; unreadable
    /// entries are skipped rather than failing the whole listing.
    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        Ok(self
            .store
            .backup_group(namespace.clone(), group.clone())
            .iter_snapshots()?
            .filter_map(Result::ok)
            .map(|snapshot| snapshot.dir().to_owned())
            .collect::<Vec<BackupDir>>())
    }

    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    fn get_store(&self) -> &str {
        self.store.name()
    }

    /// Opens the snapshot directory with a shared lock and returns a local reader.
    /// The lock is held (inside the reader) until the reader is dropped.
    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
        let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
            &dir.full_path(),
            "snapshot",
            "locked by another operation",
        )?;
        Ok(Arc::new(LocalReader {
            _dir_lock: Arc::new(Mutex::new(dir_lock)),
            path: dir.full_path(),
            datastore: dir.datastore().clone(),
        }))
    }
}
324 | ||
#[async_trait::async_trait]
/// `PullReader` is a trait that provides an interface for reading data from a source.
/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
trait PullReader: Send + Sync {
    /// Returns a chunk reader with the specified encryption mode.
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;

    /// Asynchronously loads a file from the source into a local file.
    /// `filename` is the name of the file to load from the source.
    /// `into` is the path of the local file to load the source file into.
    /// Returns `Ok(None)` if the file could not be loaded (e.g. snapshot vanished).
    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error>;

    /// Tries to download the client log from the source and save it into a local file.
    /// A missing log is not an error.
    async fn try_download_client_log(
        &self,
        to_path: &Path,
        worker: &WorkerTask,
    ) -> Result<(), Error>;

    /// Returns whether chunk syncing can be skipped (source and target share a chunk store).
    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
}
351 | ||
352 | #[async_trait::async_trait] | |
353 | impl PullReader for RemoteReader { | |
354 | fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> { | |
355 | Arc::new(RemoteChunkReader::new( | |
356 | self.backup_reader.clone(), | |
357 | None, | |
358 | crypt_mode, | |
359 | HashMap::new(), | |
360 | )) | |
361 | } | |
362 | ||
363 | async fn load_file_into( | |
364 | &self, | |
365 | filename: &str, | |
366 | into: &Path, | |
367 | worker: &WorkerTask, | |
368 | ) -> Result<Option<DataBlob>, Error> { | |
369 | let mut tmp_file = std::fs::OpenOptions::new() | |
370 | .write(true) | |
371 | .create(true) | |
372 | .truncate(true) | |
373 | .read(true) | |
374 | .open(into)?; | |
375 | let download_result = self.backup_reader.download(filename, &mut tmp_file).await; | |
376 | if let Err(err) = download_result { | |
377 | match err.downcast_ref::<HttpError>() { | |
378 | Some(HttpError { code, message }) => match *code { | |
379 | StatusCode::NOT_FOUND => { | |
380 | task_log!( | |
381 | worker, | |
382 | "skipping snapshot {} - vanished since start of sync", | |
383 | &self.dir, | |
384 | ); | |
385 | return Ok(None); | |
386 | } | |
387 | _ => { | |
388 | bail!("HTTP error {code} - {message}"); | |
389 | } | |
390 | }, | |
391 | None => { | |
392 | return Err(err); | |
393 | } | |
394 | }; | |
395 | }; | |
396 | tmp_file.rewind()?; | |
397 | Ok(DataBlob::load_from_reader(&mut tmp_file).ok()) | |
398 | } | |
399 | ||
400 | async fn try_download_client_log( | |
401 | &self, | |
402 | to_path: &Path, | |
403 | worker: &WorkerTask, | |
404 | ) -> Result<(), Error> { | |
405 | let mut tmp_path = to_path.to_owned(); | |
406 | tmp_path.set_extension("tmp"); | |
407 | ||
408 | let tmpfile = std::fs::OpenOptions::new() | |
409 | .write(true) | |
410 | .create(true) | |
411 | .read(true) | |
412 | .open(&tmp_path)?; | |
413 | ||
414 | // Note: be silent if there is no log - only log successful download | |
415 | if let Ok(()) = self | |
416 | .backup_reader | |
417 | .download(CLIENT_LOG_BLOB_NAME, tmpfile) | |
418 | .await | |
419 | { | |
420 | if let Err(err) = std::fs::rename(&tmp_path, to_path) { | |
421 | bail!("Atomic rename file {:?} failed - {}", to_path, err); | |
422 | } | |
423 | task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME); | |
424 | } | |
425 | ||
426 | Ok(()) | |
427 | } | |
428 | ||
429 | fn skip_chunk_sync(&self, _target_store_name: &str) -> bool { | |
430 | false | |
431 | } | |
432 | } | |
433 | ||
076f36ec HL |
434 | #[async_trait::async_trait] |
435 | impl PullReader for LocalReader { | |
436 | fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> { | |
437 | Arc::new(LocalChunkReader::new( | |
438 | self.datastore.clone(), | |
439 | None, | |
440 | crypt_mode, | |
441 | )) | |
442 | } | |
443 | ||
444 | async fn load_file_into( | |
445 | &self, | |
446 | filename: &str, | |
447 | into: &Path, | |
448 | _worker: &WorkerTask, | |
449 | ) -> Result<Option<DataBlob>, Error> { | |
450 | let mut tmp_file = std::fs::OpenOptions::new() | |
451 | .write(true) | |
452 | .create(true) | |
453 | .truncate(true) | |
454 | .read(true) | |
455 | .open(into)?; | |
456 | let mut from_path = self.path.clone(); | |
457 | from_path.push(filename); | |
458 | tmp_file.write_all(std::fs::read(from_path)?.as_slice())?; | |
459 | tmp_file.rewind()?; | |
460 | Ok(DataBlob::load_from_reader(&mut tmp_file).ok()) | |
461 | } | |
462 | ||
463 | async fn try_download_client_log( | |
464 | &self, | |
465 | _to_path: &Path, | |
466 | _worker: &WorkerTask, | |
467 | ) -> Result<(), Error> { | |
468 | Ok(()) | |
469 | } | |
470 | ||
471 | fn skip_chunk_sync(&self, target_store_name: &str) -> bool { | |
472 | self.datastore.name() == target_store_name | |
473 | } | |
474 | } | |
475 | ||
/// Parameters for a pull operation.
pub(crate) struct PullParameters {
    /// Where data is pulled from (remote PBS instance or another local datastore)
    source: Arc<dyn PullSource>,
    /// Where data should be pulled into
    target: PullTarget,
    /// Owner of synced groups (needs to match local owner of pre-existing groups)
    owner: Authid,
    /// Whether to remove groups which exist locally, but not on the remote end
    remove_vanished: bool,
    /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
    max_depth: Option<usize>,
    /// Filters for reducing the pull scope
    group_filter: Option<Vec<GroupFilter>>,
    /// How many snapshots should be transferred at most (taking the newest N snapshots)
    transfer_last: Option<usize>,
}
493 | ||
494 | impl PullParameters { | |
29c56859 | 495 | /// Creates a new instance of `PullParameters`. |
5b6cb51d | 496 | pub(crate) fn new( |
6e9e6c7a | 497 | store: &str, |
c06c1b4b | 498 | ns: BackupNamespace, |
05a52d01 | 499 | remote: Option<&str>, |
6e9e6c7a | 500 | remote_store: &str, |
c06c1b4b | 501 | remote_ns: BackupNamespace, |
6e9e6c7a FG |
502 | owner: Authid, |
503 | remove_vanished: Option<bool>, | |
b9310489 | 504 | max_depth: Option<usize>, |
71e53463 | 505 | group_filter: Option<Vec<GroupFilter>>, |
2d5287fb | 506 | limit: RateLimitConfig, |
9b67352a | 507 | transfer_last: Option<usize>, |
6e9e6c7a | 508 | ) -> Result<Self, Error> { |
66abc4cb FG |
509 | if let Some(max_depth) = max_depth { |
510 | ns.check_max_depth(max_depth)?; | |
511 | remote_ns.check_max_depth(max_depth)?; | |
05a52d01 | 512 | }; |
61ef4ae8 | 513 | let remove_vanished = remove_vanished.unwrap_or(false); |
6e9e6c7a | 514 | |
05a52d01 HL |
515 | let source: Arc<dyn PullSource> = if let Some(remote) = remote { |
516 | let (remote_config, _digest) = pbs_config::remote::config()?; | |
517 | let remote: Remote = remote_config.lookup("remote", remote)?; | |
6e9e6c7a | 518 | |
05a52d01 HL |
519 | let repo = BackupRepository::new( |
520 | Some(remote.config.auth_id.clone()), | |
521 | Some(remote.config.host.clone()), | |
522 | remote.config.port, | |
523 | remote_store.to_string(), | |
524 | ); | |
525 | let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?; | |
526 | Arc::new(RemoteSource { | |
527 | repo, | |
528 | ns: remote_ns, | |
529 | client, | |
530 | }) | |
531 | } else { | |
076f36ec HL |
532 | Arc::new(LocalSource { |
533 | store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?, | |
534 | ns: remote_ns, | |
535 | }) | |
05a52d01 HL |
536 | }; |
537 | let target = PullTarget { | |
538 | store: DataStore::lookup_datastore(store, Some(Operation::Write))?, | |
c06c1b4b | 539 | ns, |
05a52d01 HL |
540 | }; |
541 | ||
542 | Ok(Self { | |
ee0ea735 | 543 | source, |
05a52d01 | 544 | target, |
ee0ea735 TL |
545 | owner, |
546 | remove_vanished, | |
c06c1b4b | 547 | max_depth, |
ee0ea735 | 548 | group_filter, |
9b67352a | 549 | transfer_last, |
ee0ea735 | 550 | }) |
6e9e6c7a | 551 | } |
6e9e6c7a FG |
552 | } |
553 | ||
/// Pulls all chunks referenced by `index` into `target`, skipping chunks
/// already present or already fetched in this sync run.
///
/// Up to 20 chunks are fetched concurrently; a 4-thread [`ParallelHandler`]
/// pool verifies and writes them. `downloaded_chunks` is shared across
/// snapshots so a chunk is transferred at most once per pull job.
async fn pull_index_chunks<I: IndexFile>(
    worker: &WorkerTask,
    chunk_reader: Arc<dyn AsyncReadChunk>,
    target: Arc<DataStore>,
    index: I,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    use futures::stream::{self, StreamExt, TryStreamExt};

    let start_time = SystemTime::now();

    // stream of chunk infos not yet handled during this sync run
    let stream = stream::iter(
        (0..index.index_count())
            .map(|pos| index.chunk_info(pos).unwrap())
            .filter(|info| {
                let mut guard = downloaded_chunks.lock().unwrap();
                let done = guard.contains(&info.digest);
                if !done {
                    // Note: We mark a chunk as downloaded before its actually downloaded
                    // to avoid duplicate downloads.
                    guard.insert(info.digest);
                }
                !done
            }),
    );

    let target2 = target.clone();
    // verification + datastore insert happen on worker threads to keep the
    // async download path saturated
    let verify_pool = ParallelHandler::new(
        "sync chunk writer",
        4,
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // println!("verify and write {}", hex::encode(&digest));
            chunk.verify_unencrypted(size as usize, &digest)?;
            target2.insert_chunk(&chunk, &digest)?;
            Ok(())
        },
    );

    let verify_and_write_channel = verify_pool.channel();

    // raw bytes actually transferred (for the throughput log line)
    let bytes = Arc::new(AtomicUsize::new(0));

    stream
        .map(|info| {
            let target = Arc::clone(&target);
            let chunk_reader = chunk_reader.clone();
            let bytes = Arc::clone(&bytes);
            let verify_and_write_channel = verify_and_write_channel.clone();

            Ok::<_, Error>(async move {
                // cond_touch_chunk also updates atime so GC won't remove the chunk
                let chunk_exists = proxmox_async::runtime::block_in_place(|| {
                    target.cond_touch_chunk(&info.digest, false)
                })?;
                if chunk_exists {
                    //task_log!(worker, "chunk {} exists {}", pos, hex::encode(digest));
                    return Ok::<_, Error>(());
                }
                //task_log!(worker, "sync {} chunk {}", pos, hex::encode(digest));
                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
                let raw_size = chunk.raw_size() as usize;

                // decode, verify and write in a separate threads to maximize throughput
                proxmox_async::runtime::block_in_place(|| {
                    verify_and_write_channel.send((chunk, info.digest, info.size()))
                })?;

                bytes.fetch_add(raw_size, Ordering::SeqCst);

                Ok(())
            })
        })
        .try_buffer_unordered(20)
        .try_for_each(|_res| futures::future::ok(()))
        .await?;

    // close the channel so the pool can drain and finish
    drop(verify_and_write_channel);

    verify_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(Ordering::SeqCst);

    task_log!(
        worker,
        "downloaded {} bytes ({:.2} MiB/s)",
        bytes,
        (bytes as f64) / (1024.0 * 1024.0 * elapsed)
    );

    Ok(())
}
646 | ||
e2956c60 | 647 | fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> { |
2ce15934 | 648 | if size != info.size { |
e2956c60 FG |
649 | bail!( |
650 | "wrong size for file '{}' ({} != {})", | |
651 | info.filename, | |
652 | info.size, | |
653 | size | |
654 | ); | |
2ce15934 FG |
655 | } |
656 | ||
657 | if csum != &info.csum { | |
658 | bail!("wrong checksum for file '{}'", info.filename); | |
659 | } | |
660 | ||
661 | Ok(()) | |
662 | } | |
663 | ||
/// Pulls a single file referenced by a manifest.
///
/// Pulling an archive consists of the following steps:
/// - Load archive file into tmp file
/// -- Load file into tmp file
/// -- Verify tmp file checksum
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
async fn pull_single_archive<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    archive_info: &'a FileInfo,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let archive_name = &archive_info.filename;
    let mut path = snapshot.full_path();
    path.push(archive_name);

    // write into "<name>.tmp" first; only rename on success (atomic install)
    let mut tmp_path = path.clone();
    tmp_path.set_extension("tmp");

    task_log!(worker, "sync archive {}", archive_name);

    reader
        .load_file_into(archive_name, &tmp_path, worker)
        .await?;

    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;

    match archive_type(archive_name)? {
        ArchiveType::DynamicIndex => {
            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            // same underlying chunk store -> chunks are already present
            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
            }
        }
        ArchiveType::FixedIndex => {
            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
            })?;
            let (csum, size) = index.compute_csum();
            verify_archive(archive_info, &csum, size)?;

            if reader.skip_chunk_sync(snapshot.datastore().name()) {
                task_log!(worker, "skipping chunk sync for same datastore");
            } else {
                pull_index_chunks(
                    worker,
                    reader.chunk_reader(archive_info.crypt_mode),
                    snapshot.datastore().clone(),
                    index,
                    downloaded_chunks,
                )
                .await?;
            }
        }
        ArchiveType::Blob => {
            // blobs are self-contained; just verify the download itself
            tmpfile.rewind()?;
            let (csum, size) = sha256(&mut tmpfile)?;
            verify_archive(archive_info, &csum, size)?;
        }
    }
    if let Err(err) = std::fs::rename(&tmp_path, &path) {
        bail!("Atomic rename file {:?} failed - {}", path, err);
    }
    Ok(())
}
746 | ||
/// Actual implementation of pulling a snapshot.
///
/// Pulling a snapshot consists of the following steps:
/// - (Re)download the manifest
/// -- if it matches, only download log and treat snapshot as already synced
/// - Iterate over referenced files
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
async fn pull_snapshot<'a>(
    worker: &'a WorkerTask,
    reader: Arc<dyn PullReader + 'a>,
    snapshot: &'a pbs_datastore::BackupDir,
    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
    let mut manifest_name = snapshot.full_path();
    manifest_name.push(MANIFEST_BLOB_NAME);

    let mut client_log_name = snapshot.full_path();
    client_log_name.push(CLIENT_LOG_BLOB_NAME);

    let mut tmp_manifest_name = manifest_name.clone();
    tmp_manifest_name.set_extension("tmp");
    let tmp_manifest_blob;
    if let Some(data) = reader
        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
        .await?
    {
        tmp_manifest_blob = data;
    } else {
        // snapshot vanished on the source side - nothing to do
        return Ok(());
    }

    if manifest_name.exists() {
        let manifest_blob = proxmox_lang::try_block!({
            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
                format_err!("unable to open local manifest {manifest_name:?} - {err}")
            })?;

            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
            Ok(manifest_blob)
        })
        .map_err(|err: Error| {
            format_err!("unable to read local manifest {manifest_name:?} - {err}")
        })?;

        // identical manifest -> snapshot already synced; at most fetch the log
        if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
            if !client_log_name.exists() {
                reader
                    .try_download_client_log(&client_log_name, worker)
                    .await?;
            };
            task_log!(worker, "no data changes");
            let _ = std::fs::remove_file(&tmp_manifest_name);
            return Ok(()); // nothing changed
        }
    }

    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;

    for item in manifest.files() {
        let mut path = snapshot.full_path();
        path.push(&item.filename);

        // re-verify any pre-existing file; only re-pull it when it no longer
        // matches the (new) manifest
        if path.exists() {
            match archive_type(&item.filename)? {
                ArchiveType::DynamicIndex => {
                    let index = DynamicIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::FixedIndex => {
                    let index = FixedIndexReader::open(&path)?;
                    let (csum, size) = index.compute_csum();
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
                ArchiveType::Blob => {
                    let mut tmpfile = std::fs::File::open(&path)?;
                    let (csum, size) = sha256(&mut tmpfile)?;
                    match manifest.verify_file(&item.filename, &csum, size) {
                        Ok(_) => continue,
                        Err(err) => {
                            task_log!(worker, "detected changed file {:?} - {}", path, err);
                        }
                    }
                }
            }
        }

        pull_single_archive(
            worker,
            reader.clone(),
            snapshot,
            item,
            downloaded_chunks.clone(),
        )
        .await?;
    }

    // install the new manifest only after all referenced files are in place
    if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
    }

    if !client_log_name.exists() {
        reader
            .try_download_client_log(&client_log_name, worker)
            .await?;
    };
    // drop files no longer referenced by the (new) manifest
    snapshot
        .cleanup_unreferenced_files(&manifest)
        .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;

    Ok(())
}
871 | ||
c06c1b4b FG |
872 | /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case. |
873 | /// | |
05a52d01 HL |
874 | /// The `reader` is configured to read from the source backup directory, while the |
875 | /// `snapshot` is pointing to the local datastore and target namespace. | |
876 | async fn pull_snapshot_from<'a>( | |
877 | worker: &'a WorkerTask, | |
878 | reader: Arc<dyn PullReader + 'a>, | |
879 | snapshot: &'a pbs_datastore::BackupDir, | |
e2956c60 | 880 | downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, |
07ad6470 | 881 | ) -> Result<(), Error> { |
c06c1b4b FG |
882 | let (_path, is_new, _snap_lock) = snapshot |
883 | .datastore() | |
1afce610 | 884 | .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?; |
07ad6470 DM |
885 | |
886 | if is_new { | |
1afce610 | 887 | task_log!(worker, "sync snapshot {}", snapshot.dir()); |
07ad6470 | 888 | |
c06c1b4b | 889 | if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await { |
1afce610 FG |
890 | if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir( |
891 | snapshot.backup_ns(), | |
892 | snapshot.as_ref(), | |
893 | true, | |
894 | ) { | |
1ec0d70d | 895 | task_log!(worker, "cleanup error - {}", cleanup_err); |
07ad6470 DM |
896 | } |
897 | return Err(err); | |
898 | } | |
1afce610 | 899 | task_log!(worker, "sync snapshot {} done", snapshot.dir()); |
07ad6470 | 900 | } else { |
1afce610 | 901 | task_log!(worker, "re-sync snapshot {}", snapshot.dir()); |
c06c1b4b | 902 | pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?; |
07ad6470 DM |
903 | } |
904 | ||
905 | Ok(()) | |
906 | } | |
907 | ||
/// Why a remote snapshot was not pulled during a sync run.
#[derive(PartialEq, Eq)]
enum SkipReason {
    /// The snapshot is not newer than the newest snapshot already synced locally.
    AlreadySynced,
    /// The snapshot was cut off by the configured `transfer-last` limit.
    TransferLast,
}

impl std::fmt::Display for SkipReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let reason = match self {
            SkipReason::AlreadySynced => "older than the newest local snapshot",
            SkipReason::TransferLast => "due to transfer-last",
        };
        f.write_str(reason)
    }
}
926 | ||
d2354a16 DC |
927 | struct SkipInfo { |
928 | oldest: i64, | |
929 | newest: i64, | |
930 | count: u64, | |
71db1615 | 931 | skip_reason: SkipReason, |
d2354a16 DC |
932 | } |
933 | ||
934 | impl SkipInfo { | |
71db1615 SH |
935 | fn new(skip_reason: SkipReason) -> Self { |
936 | SkipInfo { | |
937 | oldest: i64::MAX, | |
938 | newest: i64::MIN, | |
939 | count: 0, | |
940 | skip_reason, | |
941 | } | |
942 | } | |
943 | ||
944 | fn reset(&mut self) { | |
945 | self.count = 0; | |
946 | self.oldest = i64::MAX; | |
947 | self.newest = i64::MIN; | |
948 | } | |
949 | ||
d2354a16 DC |
950 | fn update(&mut self, backup_time: i64) { |
951 | self.count += 1; | |
952 | ||
953 | if backup_time < self.oldest { | |
954 | self.oldest = backup_time; | |
955 | } | |
956 | ||
957 | if backup_time > self.newest { | |
958 | self.newest = backup_time; | |
959 | } | |
960 | } | |
961 | ||
962 | fn affected(&self) -> Result<String, Error> { | |
963 | match self.count { | |
964 | 0 => Ok(String::new()), | |
6ef1b649 | 965 | 1 => Ok(proxmox_time::epoch_to_rfc3339_utc(self.oldest)?), |
ee0ea735 TL |
966 | _ => Ok(format!( |
967 | "{} .. {}", | |
968 | proxmox_time::epoch_to_rfc3339_utc(self.oldest)?, | |
969 | proxmox_time::epoch_to_rfc3339_utc(self.newest)?, | |
970 | )), | |
d2354a16 DC |
971 | } |
972 | } | |
973 | } | |
974 | ||
975 | impl std::fmt::Display for SkipInfo { | |
976 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
977 | write!( | |
978 | f, | |
71db1615 | 979 | "skipped: {} snapshot(s) ({}) - {}", |
d2354a16 | 980 | self.count, |
71db1615 | 981 | self.affected().map_err(|_| std::fmt::Error)?, |
40a57cfa | 982 | self.skip_reason, |
d2354a16 DC |
983 | ) |
984 | } | |
985 | } | |
986 | ||
/// Pulls a group according to `params`.
///
/// Pulling a group consists of the following steps:
/// - Query the list of snapshots available for this group in the source namespace on the remote
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the
/// remote when querying snapshots. This allows us to interact with old remotes that don't have
/// namespace support yet.
///
/// Permission checks:
/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
/// - local group owner is already checked by pull_store
async fn pull_group(
    worker: &WorkerTask,
    params: &PullParameters,
    source_namespace: &BackupNamespace,
    group: &BackupGroup,
    progress: &mut StoreProgress,
) -> Result<(), Error> {
    // Counters that batch per-snapshot skip messages into one summary log line each.
    let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
    let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);

    let mut raw_list: Vec<BackupDir> = params
        .source
        .list_backup_dirs(source_namespace, group, worker)
        .await?;
    // Oldest first, so the `transfer-last` cutoff below keeps the newest snapshots.
    raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));

    let total_amount = raw_list.len();

    // Index of the first snapshot `transfer-last` still allows; everything before it
    // is skipped (except the snapshot matching the last local sync, see filter below).
    let cutoff = params
        .transfer_last
        .map(|count| total_amount.saturating_sub(count))
        .unwrap_or_default();

    let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    // Times of ALL remote snapshots (inserted before any skip decision) - needed later
    // by the remove_vanished pass to decide which local snapshots disappeared remotely.
    let mut source_snapshots = HashSet::new();
    let last_sync_time = params
        .target
        .store
        .last_successful_backup(&target_ns, group)?
        .unwrap_or(i64::MIN);

    let list: Vec<BackupDir> = raw_list
        .into_iter()
        .enumerate()
        .filter(|&(pos, ref dir)| {
            source_snapshots.insert(dir.time);
            // Skip snapshots strictly older than the newest local one; the snapshot
            // equal to `last_sync_time` is kept so it gets re-synced.
            if last_sync_time > dir.time {
                already_synced_skip_info.update(dir.time);
                return false;
            } else if already_synced_skip_info.count > 0 {
                // First kept snapshot after a skipped run: flush the summary line.
                task_log!(worker, "{}", already_synced_skip_info);
                already_synced_skip_info.reset();
                return true;
            }

            if pos < cutoff && last_sync_time != dir.time {
                transfer_last_skip_info.update(dir.time);
                return false;
            } else if transfer_last_skip_info.count > 0 {
                task_log!(worker, "{}", transfer_last_skip_info);
                transfer_last_skip_info.reset();
            }
            true
        })
        .map(|(_, dir)| dir)
        .collect();

    // start with 65536 chunks (up to 256 GiB)
    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));

    progress.group_snapshots = list.len() as u64;

    for (pos, from_snapshot) in list.into_iter().enumerate() {
        let to_snapshot = params
            .target
            .store
            .backup_dir(target_ns.clone(), from_snapshot.clone())?;

        let reader = params
            .source
            .reader(source_namespace, &from_snapshot)
            .await?;
        let result =
            pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;

        // Update and log progress even for a failed snapshot before bailing out.
        progress.done_snapshots = pos as u64 + 1;
        task_log!(worker, "percentage done: {}", progress);

        result?; // stop on error
    }

    if params.remove_vanished {
        let group = params
            .target
            .store
            .backup_group(target_ns.clone(), group.clone());
        let local_list = group.list_backups()?;
        for info in local_list {
            let snapshot = info.backup_dir;
            // Still present on the source - keep it.
            if source_snapshots.contains(&snapshot.backup_time()) {
                continue;
            }
            // Protected snapshots are never removed, only reported.
            if snapshot.is_protected() {
                task_log!(
                    worker,
                    "don't delete vanished snapshot {} (protected)",
                    snapshot.dir()
                );
                continue;
            }
            task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
            params
                .target
                .store
                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
        }
    }

    Ok(())
}
1115 | ||
abd82485 | 1116 | fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> { |
c06c1b4b | 1117 | let mut created = false; |
05a52d01 | 1118 | let store_ns_str = print_store_and_ns(params.target.store.name(), ns); |
c06c1b4b | 1119 | |
05a52d01 HL |
1120 | if !ns.is_root() && !params.target.store.namespace_path(ns).exists() { |
1121 | check_ns_modification_privs(params.target.store.name(), ns, ¶ms.owner) | |
77bd14f6 FG |
1122 | .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?; |
1123 | ||
ea2e91e5 FG |
1124 | let name = match ns.components().last() { |
1125 | Some(name) => name.to_owned(), | |
1126 | None => { | |
1127 | bail!("Failed to determine last component of namespace."); | |
c06c1b4b | 1128 | } |
ea2e91e5 FG |
1129 | }; |
1130 | ||
05a52d01 | 1131 | if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) { |
abd82485 | 1132 | bail!("sync into {store_ns_str} failed - namespace creation failed: {err}"); |
c06c1b4b | 1133 | } |
ea2e91e5 | 1134 | created = true; |
c06c1b4b FG |
1135 | } |
1136 | ||
abd82485 | 1137 | check_ns_privs( |
05a52d01 | 1138 | params.target.store.name(), |
abd82485 FG |
1139 | ns, |
1140 | ¶ms.owner, | |
1141 | PRIV_DATASTORE_BACKUP, | |
1142 | ) | |
1143 | .map_err(|err| format_err!("sync into {store_ns_str} not allowed - {err}"))?; | |
c06c1b4b FG |
1144 | |
1145 | Ok(created) | |
1146 | } | |
1147 | ||
1148 | fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> { | |
05a52d01 | 1149 | check_ns_modification_privs(params.target.store.name(), local_ns, ¶ms.owner) |
77bd14f6 | 1150 | .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?; |
ea2e91e5 | 1151 | |
05a52d01 HL |
1152 | params |
1153 | .target | |
1154 | .store | |
1155 | .remove_namespace_recursive(local_ns, true) | |
c06c1b4b FG |
1156 | } |
1157 | ||
/// Removes local namespaces below the sync target that were not part of this sync run.
///
/// Only namespaces visible to the sync owner (backup or audit privilege) are considered,
/// and removal itself is re-checked per namespace by [check_and_remove_ns]. Returns
/// `true` if any removal failed (caller accumulates this into the job error state).
fn check_and_remove_vanished_ns(
    worker: &WorkerTask,
    params: &PullParameters,
    synced_ns: HashSet<BackupNamespace>,
) -> Result<bool, Error> {
    let mut errors = false;
    let user_info = CachedUserInfo::new()?;

    // clamp like remote does so that we don't list more than we can ever have synced.
    let max_depth = params
        .max_depth
        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());

    let mut local_ns_list: Vec<BackupNamespace> = params
        .target
        .store
        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
        .filter(|ns| {
            // Only act on namespaces the owner can at least see (backup or audit).
            let user_privs =
                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
            user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
        })
        .collect();

    // children first!
    local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));

    for local_ns in local_ns_list {
        // Never remove the sync anchor itself.
        if local_ns == params.target.ns {
            continue;
        }

        // Namespace was part of this run - keep it.
        if synced_ns.contains(&local_ns) {
            continue;
        }

        // The root namespace cannot be removed.
        if local_ns.is_root() {
            continue;
        }
        match check_and_remove_ns(params, &local_ns) {
            Ok(true) => task_log!(worker, "Removed namespace {}", local_ns),
            Ok(false) => task_log!(
                worker,
                "Did not remove namespace {} - protected snapshots remain",
                local_ns
            ),
            Err(err) => {
                // Log and continue - a single failed removal must not abort the cleanup.
                task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err);
                errors = true;
            }
        }
    }

    Ok(errors)
}
1213 | ||
/// Pulls a store according to `params`.
///
/// Pulling a store consists of the following steps:
/// - Query list of namespaces on the remote
/// - Iterate list
/// -- create sub-NS if needed (and allowed)
/// -- attempt to pull each NS in turn
/// - (remove_vanished && max_depth > 0) remove sub-NS which are not or no longer available on the remote
///
/// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is
/// passed to the remote at all to allow pulling from remotes which have no notion of namespaces.
///
/// Permission checks:
/// - access to local datastore, namespace anchor and remote entry need to be checked at call site
/// - remote namespaces are filtered by remote
/// - creation and removal of sub-NS checked here
/// - access to sub-NS checked here
pub(crate) async fn pull_store(
    worker: &WorkerTask,
    mut params: PullParameters,
) -> Result<(), Error> {
    // explicit create shared lock to prevent GC on newly created chunks
    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
    let mut errors = false;

    let old_max_depth = params.max_depth;
    let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
        vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
    } else {
        // NOTE: may adjust params.max_depth (it is passed as &mut).
        params
            .source
            .list_namespaces(&mut params.max_depth, worker)
            .await?
    };

    // Deepest namespace level below the source root that this run would sync.
    let ns_layers_to_be_pulled = namespaces
        .iter()
        .map(BackupNamespace::depth)
        .max()
        .map_or(0, |v| v - params.source.get_ns().depth());
    let target_depth = params.target.ns.depth();

    if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
        bail!(
            "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
            ns_layers_to_be_pulled,
            target_depth,
            MAX_NAMESPACE_DEPTH
        );
    }

    errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
    // Shallower namespaces first, so parents exist before their children are created.
    namespaces.sort_unstable_by_key(|a| a.name_len());

    let (mut groups, mut snapshots) = (0, 0);
    let mut synced_ns = HashSet::with_capacity(namespaces.len());

    for namespace in namespaces {
        let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);

        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);

        task_log!(worker, "----");
        task_log!(
            worker,
            "Syncing {} into {}",
            source_store_ns_str,
            target_store_ns_str
        );

        // Record as synced even if the pull below fails, so remove_vanished keeps it.
        synced_ns.insert(target_ns.clone());

        match check_and_create_ns(&params, &target_ns) {
            Ok(true) => task_log!(worker, "Created namespace {}", target_ns),
            Ok(false) => {}
            Err(err) => {
                // Namespace unusable: log, mark the job failed, and move on to the next one.
                task_log!(
                    worker,
                    "Cannot sync {} into {} - {}",
                    source_store_ns_str,
                    target_store_ns_str,
                    err,
                );
                errors = true;
                continue;
            }
        }

        match pull_ns(worker, &namespace, &mut params).await {
            Ok((ns_progress, ns_errors)) => {
                errors |= ns_errors;

                if params.max_depth != Some(0) {
                    groups += ns_progress.done_groups;
                    snapshots += ns_progress.done_snapshots;
                    task_log!(
                        worker,
                        "Finished syncing namespace {}, current progress: {} groups, {} snapshots",
                        namespace,
                        groups,
                        snapshots,
                    );
                }
            }
            Err(err) => {
                errors = true;
                task_log!(
                    worker,
                    "Encountered errors while syncing namespace {} - {}",
                    &namespace,
                    err,
                );
            }
        };
    }

    if params.remove_vanished {
        errors |= check_and_remove_vanished_ns(worker, &params, synced_ns)?;
    }

    if errors {
        bail!("sync failed with some errors.");
    }

    Ok(())
}
1341 | ||
/// Pulls a namespace according to `params`.
///
/// Pulling a namespace consists of the following steps:
/// - Query list of groups on the remote (in `source_ns`)
/// - Filter list according to configured group filters
/// - Iterate list and attempt to pull each group in turn
/// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are
///   not or no longer available on the remote
///
/// Permission checks:
/// - remote namespaces are filtered by remote
/// - owner check for vanished groups done here
pub(crate) async fn pull_ns(
    worker: &WorkerTask,
    namespace: &BackupNamespace,
    params: &mut PullParameters,
) -> Result<(StoreProgress, bool), Error> {
    let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;

    let total_count = list.len();
    // Deterministic order: by group type first, then by group id.
    list.sort_unstable_by(|a, b| {
        let type_order = a.ty.cmp(&b.ty);
        if type_order == std::cmp::Ordering::Equal {
            a.id.cmp(&b.id)
        } else {
            type_order
        }
    });

    // A group is kept if it matches ANY of the configured filters.
    let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
        filters.iter().any(|filter| group.matches(filter))
    };

    let list = if let Some(ref group_filter) = &params.group_filter {
        let unfiltered_count = list.len();
        let list: Vec<BackupGroup> = list
            .into_iter()
            .filter(|group| apply_filters(group, group_filter))
            .collect();
        task_log!(
            worker,
            "found {} groups to sync (out of {} total)",
            list.len(),
            unfiltered_count
        );
        list
    } else {
        task_log!(worker, "found {} groups to sync", total_count);
        list
    };

    let mut errors = false;

    // Remember every group we intend to sync - used by remove_vanished below.
    let mut new_groups = HashSet::new();
    for group in list.iter() {
        new_groups.insert(group.clone());
    }

    let mut progress = StoreProgress::new(list.len() as u64);

    let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;

    for (done, group) in list.into_iter().enumerate() {
        // Reset per-group progress; pull_group fills in the snapshot counts.
        progress.done_groups = done as u64;
        progress.done_snapshots = 0;
        progress.group_snapshots = 0;

        let (owner, _lock_guard) =
            match params
                .target
                .store
                .create_locked_backup_group(&target_ns, &group, &params.owner)
            {
                Ok(result) => result,
                Err(err) => {
                    task_log!(
                        worker,
                        "sync group {} failed - group lock failed: {}",
                        &group,
                        err
                    );
                    errors = true;
                    // do not stop here, instead continue
                    task_log!(worker, "create_locked_backup_group failed");
                    continue;
                }
            };

        // permission check
        if params.owner != owner {
            // only the owner is allowed to create additional snapshots
            task_log!(
                worker,
                "sync group {} failed - owner check failed ({} != {})",
                &group,
                params.owner,
                owner
            );
            errors = true; // do not stop here, instead continue
        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
        {
            task_log!(worker, "sync group {} failed - {}", &group, err,);
            errors = true; // do not stop here, instead continue
        }
    }

    if params.remove_vanished {
        let result: Result<(), Error> = proxmox_lang::try_block!({
            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
                let local_group = local_group?;
                let local_group = local_group.group();
                // Still present on the source - keep it.
                if new_groups.contains(local_group) {
                    continue;
                }
                // Only delete groups owned by the sync owner.
                let owner = params.target.store.get_owner(&target_ns, local_group)?;
                if check_backup_owner(&owner, &params.owner).is_err() {
                    continue;
                }
                // Groups outside the configured filters were never synced - leave them alone.
                if let Some(ref group_filter) = &params.group_filter {
                    if !apply_filters(local_group, group_filter) {
                        continue;
                    }
                }
                task_log!(worker, "delete vanished group '{local_group}'",);
                match params
                    .target
                    .store
                    .remove_backup_group(&target_ns, local_group)
                {
                    Ok(true) => {}
                    Ok(false) => {
                        task_log!(
                            worker,
                            "kept some protected snapshots of group '{}'",
                            local_group
                        );
                    }
                    Err(err) => {
                        task_log!(worker, "{}", err);
                        errors = true;
                    }
                }
            }
            Ok(())
        });
        if let Err(err) = result {
            task_log!(worker, "error during cleanup: {}", err);
            errors = true;
        };
    }

    Ok((progress, errors))
}