// proxmox-backup: src/backup/datastore.rs
use std::collections::{HashSet, HashMap};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::convert::TryFrom;
use std::time::Duration;
use std::fs::File;

use anyhow::{bail, format_err, Error};
use lazy_static::lazy_static;

use proxmox::tools::fs::{replace_file, CreateOptions, open_file_locked};

use super::backup_info::{BackupGroup, BackupDir};
use super::chunk_store::ChunkStore;
use super::dynamic_index::{DynamicIndexReader, DynamicIndexWriter};
use super::fixed_index::{FixedIndexReader, FixedIndexWriter};
use super::manifest::{MANIFEST_BLOB_NAME, MANIFEST_LOCK_NAME, CLIENT_LOG_BLOB_NAME, BackupManifest};
use super::index::*;
use super::{DataBlob, ArchiveType, archive_type};
use crate::config::datastore::{self, DataStoreConfig};
use crate::task::TaskState;
use crate::tools;
use crate::tools::format::HumanByte;
use crate::tools::fs::{lock_dir_noblock, DirLockGuard};
use crate::api2::types::{GarbageCollectionStatus, Userid};
use crate::server::UPID;

lazy_static! {
    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
}

/// Datastore Management
///
/// A datastore can store several backups, and provides the
/// management interface for backups.
pub struct DataStore {
    chunk_store: Arc<ChunkStore>,
    gc_mutex: Mutex<bool>,
    last_gc_status: Mutex<GarbageCollectionStatus>,
}

impl DataStore {

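    /// Look up a datastore by name, using the process-wide cache.
    ///
    /// A minimal usage sketch (ignored in doctests; assumes a datastore
    /// named "store1" is configured):
    ///
    /// ```ignore
    /// let store = DataStore::lookup_datastore("store1")?;
    /// println!("base path: {:?}", store.base_path());
    /// ```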
    pub fn lookup_datastore(name: &str) -> Result<Arc<DataStore>, Error> {

        let (config, _digest) = datastore::config()?;
        let config: datastore::DataStoreConfig = config.lookup("datastore", name)?;
        let path = PathBuf::from(&config.path);

        let mut map = DATASTORE_MAP.lock().unwrap();

        if let Some(datastore) = map.get(name) {
            // compare config - if it changed, create a new Datastore object!
            if datastore.chunk_store.base == path {
                return Ok(datastore.clone());
            }
        }

        let datastore = DataStore::open_with_path(name, &path, config)?;

        let datastore = Arc::new(datastore);
        map.insert(name.to_string(), datastore.clone());

        Ok(datastore)
    }

    fn open_with_path(store_name: &str, path: &Path, _config: DataStoreConfig) -> Result<Self, Error> {
        let chunk_store = ChunkStore::open(store_name, path)?;

        let gc_status = GarbageCollectionStatus::default();

        Ok(Self {
            chunk_store: Arc::new(chunk_store),
            gc_mutex: Mutex::new(false),
            last_gc_status: Mutex::new(gc_status),
        })
    }

    pub fn get_chunk_iterator(
        &self,
    ) -> Result<
        impl Iterator<Item = (Result<tools::fs::ReadDirEntry, Error>, usize, bool)>,
        Error
    > {
        self.chunk_store.get_chunk_iterator()
    }

    pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {

        let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;

        Ok(index)
    }

    pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {

        let full_path = self.chunk_store.relative_path(filename.as_ref());

        let index = FixedIndexReader::open(&full_path)?;

        Ok(index)
    }

    pub fn create_dynamic_writer<P: AsRef<Path>>(
        &self, filename: P,
    ) -> Result<DynamicIndexWriter, Error> {

        let index = DynamicIndexWriter::create(
            self.chunk_store.clone(), filename.as_ref())?;

        Ok(index)
    }

    pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {

        let full_path = self.chunk_store.relative_path(filename.as_ref());

        let index = DynamicIndexReader::open(&full_path)?;

        Ok(index)
    }

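    /// Open either a fixed or dynamic index file as a trait object.
    ///
    /// A minimal sketch (ignored in doctests; "vm-100-disk-0.fidx" is a
    /// hypothetical archive name inside a snapshot directory):
    ///
    /// ```ignore
    /// let index = store.open_index("vm-100-disk-0.fidx")?;
    /// println!("chunks referenced: {}", index.index_count());
    /// ```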
    pub fn open_index<P>(&self, filename: P) -> Result<Box<dyn IndexFile + Send>, Error>
    where
        P: AsRef<Path>,
    {
        let filename = filename.as_ref();
        let out: Box<dyn IndexFile + Send> =
            match archive_type(filename)? {
                ArchiveType::DynamicIndex => Box::new(self.open_dynamic_reader(filename)?),
                ArchiveType::FixedIndex => Box::new(self.open_fixed_reader(filename)?),
                _ => bail!("cannot open index file of unknown type: {:?}", filename),
            };
        Ok(out)
    }

    pub fn name(&self) -> &str {
        self.chunk_store.name()
    }

    pub fn base_path(&self) -> PathBuf {
        self.chunk_store.base_path()
    }

    /// Cleanup a backup directory
    ///
    /// Removes all files not mentioned in the manifest.
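    ///
    /// A minimal sketch (ignored in doctests; `snapshot` is a hypothetical
    /// `BackupDir` for an existing snapshot):
    ///
    /// ```ignore
    /// let (manifest, _raw_size) = store.load_manifest(&snapshot)?;
    /// store.cleanup_backup_dir(&snapshot, &manifest)?;
    /// ```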
    pub fn cleanup_backup_dir(
        &self,
        backup_dir: &BackupDir,
        manifest: &BackupManifest,
    ) -> Result<(), Error> {

        let mut full_path = self.base_path();
        full_path.push(backup_dir.relative_path());

        let mut wanted_files = HashSet::new();
        wanted_files.insert(MANIFEST_BLOB_NAME.to_string());
        wanted_files.insert(CLIENT_LOG_BLOB_NAME.to_string());
        manifest.files().iter().for_each(|item| { wanted_files.insert(item.filename.clone()); });

        for item in tools::fs::read_subdir(libc::AT_FDCWD, &full_path)? {
            if let Ok(item) = item {
                if let Some(file_type) = item.file_type() {
                    if file_type != nix::dir::Type::File { continue; }
                }
                let file_name = item.file_name().to_bytes();
                if file_name == b"." || file_name == b".." { continue; };

                if let Ok(name) = std::str::from_utf8(file_name) {
                    if wanted_files.contains(name) { continue; }
                }
                println!("remove unused file {:?}", item.file_name());
                let dirfd = item.parent_fd();
                let _res = unsafe { libc::unlinkat(dirfd, item.file_name().as_ptr(), 0) };
            }
        }

        Ok(())
    }

    /// Returns the absolute path for a backup_group
    pub fn group_path(&self, backup_group: &BackupGroup) -> PathBuf {
        let mut full_path = self.base_path();
        full_path.push(backup_group.group_path());
        full_path
    }

    /// Returns the absolute path for backup_dir
    pub fn snapshot_path(&self, backup_dir: &BackupDir) -> PathBuf {
        let mut full_path = self.base_path();
        full_path.push(backup_dir.relative_path());
        full_path
    }

    /// Remove a complete backup group including all snapshots
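    ///
    /// A minimal sketch (ignored in doctests; `BackupGroup::new("vm", "100")`
    /// is an assumed constructor call for an existing group):
    ///
    /// ```ignore
    /// let group = BackupGroup::new("vm", "100");
    /// store.remove_backup_group(&group)?;
    /// ```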
    pub fn remove_backup_group(&self, backup_group: &BackupGroup) -> Result<(), Error> {

        let full_path = self.group_path(backup_group);

        let _guard = tools::fs::lock_dir_noblock(&full_path, "backup group", "possible running backup")?;

        log::info!("removing backup group {:?}", full_path);

        // remove all individual backup dirs first to ensure nothing is using them
        for snap in backup_group.list_backups(&self.base_path())? {
            self.remove_backup_dir(&snap.backup_dir, false)?;
        }

        // no snapshots left, we can now safely remove the empty folder
        std::fs::remove_dir_all(&full_path)
            .map_err(|err| {
                format_err!(
                    "removing backup group directory {:?} failed - {}",
                    full_path,
                    err,
                )
            })?;

        Ok(())
    }

    /// Remove a backup directory including all content
    pub fn remove_backup_dir(&self, backup_dir: &BackupDir, force: bool) -> Result<(), Error> {

        let full_path = self.snapshot_path(backup_dir);

        let (_guard, _manifest_guard);
        if !force {
            _guard = lock_dir_noblock(&full_path, "snapshot", "possibly running or in use")?;
            _manifest_guard = self.lock_manifest(backup_dir);
        }

        log::info!("removing backup snapshot {:?}", full_path);
        std::fs::remove_dir_all(&full_path)
            .map_err(|err| {
                format_err!(
                    "removing backup snapshot {:?} failed - {}",
                    full_path,
                    err,
                )
            })?;

        Ok(())
    }

    /// Returns the time of the last successful backup
    ///
    /// Or None if there is no backup in the group (or the group dir does not exist).
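    ///
    /// A minimal sketch (ignored in doctests; `group` is a hypothetical
    /// `BackupGroup`):
    ///
    /// ```ignore
    /// if let Some(epoch) = store.last_successful_backup(&group)? {
    ///     println!("last successful backup at unix epoch {}", epoch);
    /// }
    /// ```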
    pub fn last_successful_backup(&self, backup_group: &BackupGroup) -> Result<Option<i64>, Error> {
        let base_path = self.base_path();
        let mut group_path = base_path.clone();
        group_path.push(backup_group.group_path());

        if group_path.exists() {
            backup_group.last_successful_backup(&base_path)
        } else {
            Ok(None)
        }
    }

    /// Returns the backup owner.
    ///
    /// The backup owner is the user who first created the backup group.
    pub fn get_owner(&self, backup_group: &BackupGroup) -> Result<Userid, Error> {
        let mut full_path = self.base_path();
        full_path.push(backup_group.group_path());
        full_path.push("owner");
        let owner = proxmox::tools::fs::file_read_firstline(full_path)?;
        Ok(owner.trim_end().parse()?) // remove trailing newline
    }

    /// Set the backup owner.
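    ///
    /// A minimal sketch (ignored in doctests; assumes a user "backup@pbs"
    /// exists and `group` is a hypothetical `BackupGroup`):
    ///
    /// ```ignore
    /// let userid: Userid = "backup@pbs".parse()?;
    /// store.set_owner(&group, &userid, false)?;
    /// assert_eq!(store.get_owner(&group)?, userid);
    /// ```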
    pub fn set_owner(
        &self,
        backup_group: &BackupGroup,
        userid: &Userid,
        force: bool,
    ) -> Result<(), Error> {
        let mut path = self.base_path();
        path.push(backup_group.group_path());
        path.push("owner");

        let mut open_options = std::fs::OpenOptions::new();
        open_options.write(true);
        open_options.truncate(true);

        if force {
            open_options.create(true);
        } else {
            open_options.create_new(true);
        }

        let mut file = open_options.open(&path)
            .map_err(|err| format_err!("unable to create owner file {:?} - {}", path, err))?;

        writeln!(file, "{}", userid)
            .map_err(|err| format_err!("unable to write owner file {:?} - {}", path, err))?;

        Ok(())
    }

    /// Create (if it does not already exist) and lock a backup group
    ///
    /// And set the owner to 'userid'. If the group already exists, it returns the
    /// current owner (instead of setting the owner).
    ///
    /// This also acquires an exclusive lock on the directory and returns the lock guard.
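    ///
    /// A minimal sketch (ignored in doctests; `group` and `userid` are
    /// hypothetical values as above):
    ///
    /// ```ignore
    /// let (owner, _guard) = store.create_locked_backup_group(&group, &userid)?;
    /// // the group directory stays locked until `_guard` is dropped
    /// ```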
    pub fn create_locked_backup_group(
        &self,
        backup_group: &BackupGroup,
        userid: &Userid,
    ) -> Result<(Userid, DirLockGuard), Error> {
        // create intermediate path first:
        let base_path = self.base_path();

        let mut full_path = base_path.clone();
        full_path.push(backup_group.backup_type());
        std::fs::create_dir_all(&full_path)?;

        full_path.push(backup_group.backup_id());

        // create the last component now
        match std::fs::create_dir(&full_path) {
            Ok(_) => {
                let guard = lock_dir_noblock(&full_path, "backup group", "another backup is already running")?;
                self.set_owner(backup_group, userid, false)?;
                let owner = self.get_owner(backup_group)?; // just to be sure
                Ok((owner, guard))
            }
            Err(ref err) if err.kind() == io::ErrorKind::AlreadyExists => {
                let guard = lock_dir_noblock(&full_path, "backup group", "another backup is already running")?;
                let owner = self.get_owner(backup_group)?; // just to be sure
                Ok((owner, guard))
            }
            Err(err) => bail!("unable to create backup group {:?} - {}", full_path, err),
        }
    }

    /// Creates a new backup snapshot inside a BackupGroup
    ///
    /// The BackupGroup directory needs to exist.
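    ///
    /// Returns the snapshot's relative path, whether it was newly created,
    /// and the lock guard. A minimal sketch (ignored in doctests; `snapshot`
    /// is a hypothetical `BackupDir`):
    ///
    /// ```ignore
    /// let (_relative_path, is_new, _guard) = store.create_locked_backup_dir(&snapshot)?;
    /// if !is_new {
    ///     // the snapshot directory already existed
    /// }
    /// ```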
    pub fn create_locked_backup_dir(&self, backup_dir: &BackupDir)
        -> Result<(PathBuf, bool, DirLockGuard), Error>
    {
        let relative_path = backup_dir.relative_path();
        let mut full_path = self.base_path();
        full_path.push(&relative_path);

        let lock = ||
            lock_dir_noblock(&full_path, "snapshot", "internal error - tried creating snapshot that's already in use");

        match std::fs::create_dir(&full_path) {
            Ok(_) => Ok((relative_path, true, lock()?)),
            Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => Ok((relative_path, false, lock()?)),
            Err(e) => Err(e.into())
        }
    }

    pub fn list_images(&self) -> Result<Vec<PathBuf>, Error> {
        let base = self.base_path();

        let mut list = vec![];

        use walkdir::WalkDir;

        let walker = WalkDir::new(&base).same_file_system(true).into_iter();

        // make sure we skip .chunks (and other hidden files, to keep it simple)
        fn is_hidden(entry: &walkdir::DirEntry) -> bool {
            entry.file_name()
                .to_str()
                .map(|s| s.starts_with("."))
                .unwrap_or(false)
        }
        let handle_entry_err = |err: walkdir::Error| {
            if let Some(inner) = err.io_error() {
                let path = err.path().unwrap_or(Path::new(""));
                match inner.kind() {
                    io::ErrorKind::PermissionDenied => {
                        // only allow skipping the ext4 fsck directory; abort GC if, for example,
                        // a user got file permissions wrong on a datastore rsync to a new server
                        if err.depth() > 1 || !path.ends_with("lost+found") {
                            bail!("cannot continue garbage-collection safely, permission denied on: {}", path.display())
                        }
                    },
                    _ => bail!("unexpected error on datastore traversal: {} - {}", inner, path.display()),
                }
            }
            Ok(())
        };
        for entry in walker.filter_entry(|e| !is_hidden(e)) {
            let path = match entry {
                Ok(entry) => entry.into_path(),
                Err(err) => {
                    handle_entry_err(err)?;
                    continue
                },
            };
            if let Ok(archive_type) = archive_type(&path) {
                if archive_type == ArchiveType::FixedIndex || archive_type == ArchiveType::DynamicIndex {
                    list.push(path);
                }
            }
        }

        Ok(list)
    }

    // mark chunks used by `index` as used
    fn index_mark_used_chunks<I: IndexFile>(
        &self,
        index: I,
        file_name: &Path, // only used for error reporting
        status: &mut GarbageCollectionStatus,
        worker: &dyn TaskState,
    ) -> Result<(), Error> {

        status.index_file_count += 1;
        status.index_data_bytes += index.index_bytes();

        for pos in 0..index.index_count() {
            worker.check_abort()?;
            tools::fail_on_shutdown()?;
            let digest = index.index_digest(pos).unwrap();
            if let Err(err) = self.chunk_store.touch_chunk(digest) {
                crate::task_warn!(
                    worker,
                    "warning: unable to access chunk {}, required by {:?} - {}",
                    proxmox::tools::digest_to_hex(digest),
                    file_name,
                    err,
                );
            }
        }
        Ok(())
    }

    fn mark_used_chunks(
        &self,
        status: &mut GarbageCollectionStatus,
        worker: &dyn TaskState,
    ) -> Result<(), Error> {

        let image_list = self.list_images()?;

        let image_count = image_list.len();

        let mut done = 0;

        let mut last_percentage: usize = 0;

        for path in image_list {

            worker.check_abort()?;
            tools::fail_on_shutdown()?;

            let full_path = self.chunk_store.relative_path(&path);
            match std::fs::File::open(&full_path) {
                Ok(file) => {
                    if let Ok(archive_type) = archive_type(&path) {
                        if archive_type == ArchiveType::FixedIndex {
                            let index = FixedIndexReader::new(file)?;
                            self.index_mark_used_chunks(index, &path, status, worker)?;
                        } else if archive_type == ArchiveType::DynamicIndex {
                            let index = DynamicIndexReader::new(file)?;
                            self.index_mark_used_chunks(index, &path, status, worker)?;
                        }
                    }
                }
                Err(err) => {
                    if err.kind() == std::io::ErrorKind::NotFound {
                        // simply ignore vanished files
                    } else {
                        return Err(err.into());
                    }
                }
            }
            done += 1;

            let percentage = done * 100 / image_count;
            if percentage > last_percentage {
                crate::task_log!(
                    worker,
                    "percentage done: phase1 {}% ({} of {} index files)",
                    percentage,
                    done,
                    image_count,
                );
                last_percentage = percentage;
            }
        }

        Ok(())
    }

    pub fn last_gc_status(&self) -> GarbageCollectionStatus {
        self.last_gc_status.lock().unwrap().clone()
    }

    pub fn garbage_collection_running(&self) -> bool {
        self.gc_mutex.try_lock().is_err()
    }

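    /// Run garbage collection on this datastore.
    ///
    /// Phase 1 marks (touches) every chunk still referenced by an index
    /// file; phase 2 sweeps chunks whose atime lies before the cutoff
    /// derived from the oldest running writer. Bails out if another GC
    /// run already holds the lock.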
    pub fn garbage_collection(&self, worker: &dyn TaskState, upid: &UPID) -> Result<(), Error> {

        if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {

            // avoid running GC if an old daemon process still has a
            // running backup writer, which is not safe as we have no "oldest
            // writer" information and thus no safe atime cutoff
            let _exclusive_lock = self.chunk_store.try_exclusive_lock()?;

            let phase1_start_time = proxmox::tools::time::epoch_i64();
            let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time);

            let mut gc_status = GarbageCollectionStatus::default();
            gc_status.upid = Some(upid.to_string());

            crate::task_log!(worker, "Start GC phase1 (mark used chunks)");

            self.mark_used_chunks(&mut gc_status, worker)?;

            crate::task_log!(worker, "Start GC phase2 (sweep unused chunks)");
            self.chunk_store.sweep_unused_chunks(
                oldest_writer,
                phase1_start_time,
                &mut gc_status,
                worker,
            )?;

            crate::task_log!(
                worker,
                "Removed garbage: {}",
                HumanByte::from(gc_status.removed_bytes),
            );
            crate::task_log!(worker, "Removed chunks: {}", gc_status.removed_chunks);
            if gc_status.pending_bytes > 0 {
                crate::task_log!(
                    worker,
                    "Pending removals: {} (in {} chunks)",
                    HumanByte::from(gc_status.pending_bytes),
                    gc_status.pending_chunks,
                );
            }
            if gc_status.removed_bad > 0 {
                crate::task_log!(worker, "Removed bad files: {}", gc_status.removed_bad);
            }

            crate::task_log!(
                worker,
                "Original data usage: {}",
                HumanByte::from(gc_status.index_data_bytes),
            );

            if gc_status.index_data_bytes > 0 {
                let comp_per = (gc_status.disk_bytes as f64 * 100.) / gc_status.index_data_bytes as f64;
                crate::task_log!(
                    worker,
                    "On-Disk usage: {} ({:.2}%)",
                    HumanByte::from(gc_status.disk_bytes),
                    comp_per,
                );
            }

            crate::task_log!(worker, "On-Disk chunks: {}", gc_status.disk_chunks);

            if gc_status.disk_chunks > 0 {
                let avg_chunk = gc_status.disk_bytes / (gc_status.disk_chunks as u64);
                crate::task_log!(worker, "Average chunk size: {}", HumanByte::from(avg_chunk));
            }

            *self.last_gc_status.lock().unwrap() = gc_status;

        } else {
            bail!("Start GC failed - (already running/locked)");
        }

        Ok(())
    }

    pub fn try_shared_chunk_store_lock(&self) -> Result<tools::ProcessLockSharedGuard, Error> {
        self.chunk_store.try_shared_lock()
    }

    pub fn chunk_path(&self, digest: &[u8; 32]) -> (PathBuf, String) {
        self.chunk_store.chunk_path(digest)
    }

    pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> {
        self.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
    }

    pub fn insert_chunk(
        &self,
        chunk: &DataBlob,
        digest: &[u8; 32],
    ) -> Result<(bool, u64), Error> {
        self.chunk_store.insert_chunk(chunk, digest)
    }

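    /// Load a blob (e.g. the manifest or client log) from a snapshot directory.
    ///
    /// A minimal sketch (ignored in doctests; `snapshot` is a hypothetical
    /// `BackupDir`), mirroring what `load_manifest` does internally:
    ///
    /// ```ignore
    /// let blob = store.load_blob(&snapshot, MANIFEST_BLOB_NAME)?;
    /// println!("raw size: {}", blob.raw_size());
    /// ```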
    pub fn load_blob(&self, backup_dir: &BackupDir, filename: &str) -> Result<DataBlob, Error> {
        let mut path = self.base_path();
        path.push(backup_dir.relative_path());
        path.push(filename);

        proxmox::try_block!({
            let mut file = std::fs::File::open(&path)?;
            DataBlob::load_from_reader(&mut file)
        }).map_err(|err| format_err!("unable to load blob '{:?}' - {}", path, err))
    }


    pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {

        let (chunk_path, digest_str) = self.chunk_store.chunk_path(digest);

        proxmox::try_block!({
            let mut file = std::fs::File::open(&chunk_path)?;
            DataBlob::load_from_reader(&mut file)
        }).map_err(|err| format_err!(
            "store '{}', unable to load chunk '{}' - {}",
            self.name(),
            digest_str,
            err,
        ))
    }

    fn lock_manifest(
        &self,
        backup_dir: &BackupDir,
    ) -> Result<File, Error> {
        let mut path = self.base_path();
        path.push(backup_dir.relative_path());
        path.push(&MANIFEST_LOCK_NAME);

        // update_manifest should never take a long time, so if someone else has
        // the lock we can simply block a bit and should get it soon
        open_file_locked(&path, Duration::from_secs(5), true)
            .map_err(|err| {
                format_err!(
                    "unable to acquire manifest lock {:?} - {}", &path, err
                )
            })
    }

    /// Load the manifest without a lock. Must not be written back.
    pub fn load_manifest(
        &self,
        backup_dir: &BackupDir,
    ) -> Result<(BackupManifest, u64), Error> {
        let blob = self.load_blob(backup_dir, MANIFEST_BLOB_NAME)?;
        let raw_size = blob.raw_size();
        let manifest = BackupManifest::try_from(blob)?;
        Ok((manifest, raw_size))
    }

    /// Update the manifest of the specified snapshot. Never write a manifest directly,
    /// only use this method - anything else may break locking guarantees.
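    ///
    /// A minimal sketch (ignored in doctests; `snapshot` is a hypothetical
    /// `BackupDir`, and "note" an assumed key in the manifest's unprotected
    /// section):
    ///
    /// ```ignore
    /// store.update_manifest(&snapshot, |manifest| {
    ///     manifest.unprotected["note"] = "verified".into();
    /// })?;
    /// ```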
    pub fn update_manifest(
        &self,
        backup_dir: &BackupDir,
        update_fn: impl FnOnce(&mut BackupManifest),
    ) -> Result<(), Error> {

        let _guard = self.lock_manifest(backup_dir)?;
        let (mut manifest, _) = self.load_manifest(&backup_dir)?;

        update_fn(&mut manifest);

        let manifest = serde_json::to_value(manifest)?;
        let manifest = serde_json::to_string_pretty(&manifest)?;
        let blob = DataBlob::encode(manifest.as_bytes(), None, true)?;
        let raw_data = blob.raw_data();

        let mut path = self.base_path();
        path.push(backup_dir.relative_path());
        path.push(MANIFEST_BLOB_NAME);

        // atomic replace invalidates flock - no other writes past this point!
        replace_file(&path, raw_data, CreateOptions::new())?;

        Ok(())
    }
}