1 use std
::collections
::{HashSet, HashMap}
;
2 use std
::io
::{self, Write}
;
3 use std
::path
::{Path, PathBuf}
;
4 use std
::sync
::{Arc, Mutex}
;
5 use std
::convert
::TryFrom
;
6 use std
::time
::Duration
;
9 use anyhow
::{bail, format_err, Error}
;
10 use lazy_static
::lazy_static
;
12 use proxmox
::tools
::fs
::{replace_file, file_read_optional_string, CreateOptions, open_file_locked}
;
14 use super::backup_info
::{BackupGroup, BackupDir}
;
15 use super::chunk_store
::ChunkStore
;
16 use super::dynamic_index
::{DynamicIndexReader, DynamicIndexWriter}
;
17 use super::fixed_index
::{FixedIndexReader, FixedIndexWriter}
;
18 use super::manifest
::{MANIFEST_BLOB_NAME, MANIFEST_LOCK_NAME, CLIENT_LOG_BLOB_NAME, BackupManifest}
;
20 use super::{DataBlob, ArchiveType, archive_type}
;
21 use crate::config
::datastore
::{self, DataStoreConfig}
;
22 use crate::task
::TaskState
;
24 use crate::tools
::format
::HumanByte
;
25 use crate::tools
::fs
::{lock_dir_noblock, DirLockGuard}
;
26 use crate::api2
::types
::{GarbageCollectionStatus, Userid}
;
27 use crate::server
::UPID
;
30 static ref DATASTORE_MAP
: Mutex
<HashMap
<String
, Arc
<DataStore
>>> = Mutex
::new(HashMap
::new());
33 /// Datastore Management
35 /// A Datastore can store several backups, and provides the
36 /// management interface for backup.
37 pub struct DataStore
{
38 chunk_store
: Arc
<ChunkStore
>,
39 gc_mutex
: Mutex
<bool
>,
40 last_gc_status
: Mutex
<GarbageCollectionStatus
>,
46 pub fn lookup_datastore(name
: &str) -> Result
<Arc
<DataStore
>, Error
> {
48 let (config
, _digest
) = datastore
::config()?
;
49 let config
: datastore
::DataStoreConfig
= config
.lookup("datastore", name
)?
;
50 let path
= PathBuf
::from(&config
.path
);
52 let mut map
= DATASTORE_MAP
.lock().unwrap();
54 if let Some(datastore
) = map
.get(name
) {
55 // Compare Config - if changed, create new Datastore object!
56 if datastore
.chunk_store
.base
== path
&&
57 datastore
.verify_new
== config
.verify_new
.unwrap_or(false)
59 return Ok(datastore
.clone());
63 let datastore
= DataStore
::open_with_path(name
, &path
, config
)?
;
65 let datastore
= Arc
::new(datastore
);
66 map
.insert(name
.to_string(), datastore
.clone());
71 fn open_with_path(store_name
: &str, path
: &Path
, config
: DataStoreConfig
) -> Result
<Self, Error
> {
72 let chunk_store
= ChunkStore
::open(store_name
, path
)?
;
74 let mut gc_status_path
= chunk_store
.base_path();
75 gc_status_path
.push(".gc-status");
77 let gc_status
= if let Some(state
) = file_read_optional_string(gc_status_path
)?
{
78 match serde_json
::from_str(&state
) {
81 eprintln
!("error reading gc-status: {}", err
);
82 GarbageCollectionStatus
::default()
86 GarbageCollectionStatus
::default()
90 chunk_store
: Arc
::new(chunk_store
),
91 gc_mutex
: Mutex
::new(false),
92 last_gc_status
: Mutex
::new(gc_status
),
93 verify_new
: config
.verify_new
.unwrap_or(false),
97 pub fn get_chunk_iterator(
100 impl Iterator
<Item
= (Result
<tools
::fs
::ReadDirEntry
, Error
>, usize, bool
)>,
103 self.chunk_store
.get_chunk_iterator()
106 pub fn create_fixed_writer
<P
: AsRef
<Path
>>(&self, filename
: P
, size
: usize, chunk_size
: usize) -> Result
<FixedIndexWriter
, Error
> {
108 let index
= FixedIndexWriter
::create(self.chunk_store
.clone(), filename
.as_ref(), size
, chunk_size
)?
;
113 pub fn open_fixed_reader
<P
: AsRef
<Path
>>(&self, filename
: P
) -> Result
<FixedIndexReader
, Error
> {
115 let full_path
= self.chunk_store
.relative_path(filename
.as_ref());
117 let index
= FixedIndexReader
::open(&full_path
)?
;
122 pub fn create_dynamic_writer
<P
: AsRef
<Path
>>(
124 ) -> Result
<DynamicIndexWriter
, Error
> {
126 let index
= DynamicIndexWriter
::create(
127 self.chunk_store
.clone(), filename
.as_ref())?
;
132 pub fn open_dynamic_reader
<P
: AsRef
<Path
>>(&self, filename
: P
) -> Result
<DynamicIndexReader
, Error
> {
134 let full_path
= self.chunk_store
.relative_path(filename
.as_ref());
136 let index
= DynamicIndexReader
::open(&full_path
)?
;
141 pub fn open_index
<P
>(&self, filename
: P
) -> Result
<Box
<dyn IndexFile
+ Send
>, Error
>
145 let filename
= filename
.as_ref();
146 let out
: Box
<dyn IndexFile
+ Send
> =
147 match archive_type(filename
)?
{
148 ArchiveType
::DynamicIndex
=> Box
::new(self.open_dynamic_reader(filename
)?
),
149 ArchiveType
::FixedIndex
=> Box
::new(self.open_fixed_reader(filename
)?
),
150 _
=> bail
!("cannot open index file of unknown type: {:?}", filename
),
155 pub fn name(&self) -> &str {
156 self.chunk_store
.name()
159 pub fn base_path(&self) -> PathBuf
{
160 self.chunk_store
.base_path()
163 /// Cleanup a backup directory
165 /// Removes all files not mentioned in the manifest.
166 pub fn cleanup_backup_dir(&self, backup_dir
: &BackupDir
, manifest
: &BackupManifest
167 ) -> Result
<(), Error
> {
169 let mut full_path
= self.base_path();
170 full_path
.push(backup_dir
.relative_path());
172 let mut wanted_files
= HashSet
::new();
173 wanted_files
.insert(MANIFEST_BLOB_NAME
.to_string());
174 wanted_files
.insert(CLIENT_LOG_BLOB_NAME
.to_string());
175 manifest
.files().iter().for_each(|item
| { wanted_files.insert(item.filename.clone()); }
);
177 for item
in tools
::fs
::read_subdir(libc
::AT_FDCWD
, &full_path
)?
{
178 if let Ok(item
) = item
{
179 if let Some(file_type
) = item
.file_type() {
180 if file_type
!= nix
::dir
::Type
::File { continue; }
182 let file_name
= item
.file_name().to_bytes();
183 if file_name
== b
"." || file_name
== b
".." { continue; }
;
185 if let Ok(name
) = std
::str::from_utf8(file_name
) {
186 if wanted_files
.contains(name
) { continue; }
188 println
!("remove unused file {:?}", item
.file_name());
189 let dirfd
= item
.parent_fd();
190 let _res
= unsafe { libc::unlinkat(dirfd, item.file_name().as_ptr(), 0) }
;
197 /// Returns the absolute path for a backup_group
198 pub fn group_path(&self, backup_group
: &BackupGroup
) -> PathBuf
{
199 let mut full_path
= self.base_path();
200 full_path
.push(backup_group
.group_path());
204 /// Returns the absolute path for backup_dir
205 pub fn snapshot_path(&self, backup_dir
: &BackupDir
) -> PathBuf
{
206 let mut full_path
= self.base_path();
207 full_path
.push(backup_dir
.relative_path());
211 /// Remove a complete backup group including all snapshots
212 pub fn remove_backup_group(&self, backup_group
: &BackupGroup
) -> Result
<(), Error
> {
214 let full_path
= self.group_path(backup_group
);
216 let _guard
= tools
::fs
::lock_dir_noblock(&full_path
, "backup group", "possible running backup")?
;
218 log
::info
!("removing backup group {:?}", full_path
);
220 // remove all individual backup dirs first to ensure nothing is using them
221 for snap
in backup_group
.list_backups(&self.base_path())?
{
222 self.remove_backup_dir(&snap
.backup_dir
, false)?
;
225 // no snapshots left, we can now safely remove the empty folder
226 std
::fs
::remove_dir_all(&full_path
)
229 "removing backup group directory {:?} failed - {}",
238 /// Remove a backup directory including all content
239 pub fn remove_backup_dir(&self, backup_dir
: &BackupDir
, force
: bool
) -> Result
<(), Error
> {
241 let full_path
= self.snapshot_path(backup_dir
);
243 let (_guard
, _manifest_guard
);
245 _guard
= lock_dir_noblock(&full_path
, "snapshot", "possibly running or in use")?
;
246 _manifest_guard
= self.lock_manifest(backup_dir
);
249 log
::info
!("removing backup snapshot {:?}", full_path
);
250 std
::fs
::remove_dir_all(&full_path
)
253 "removing backup snapshot {:?} failed - {}",
262 /// Returns the time of the last successful backup
264 /// Or None if there is no backup in the group (or the group dir does not exist).
265 pub fn last_successful_backup(&self, backup_group
: &BackupGroup
) -> Result
<Option
<i64>, Error
> {
266 let base_path
= self.base_path();
267 let mut group_path
= base_path
.clone();
268 group_path
.push(backup_group
.group_path());
270 if group_path
.exists() {
271 backup_group
.last_successful_backup(&base_path
)
277 /// Returns the backup owner.
279 /// The backup owner is the user who first created the backup group.
280 pub fn get_owner(&self, backup_group
: &BackupGroup
) -> Result
<Userid
, Error
> {
281 let mut full_path
= self.base_path();
282 full_path
.push(backup_group
.group_path());
283 full_path
.push("owner");
284 let owner
= proxmox
::tools
::fs
::file_read_firstline(full_path
)?
;
285 Ok(owner
.trim_end().parse()?
) // remove trailing newline
288 /// Set the backup owner.
291 backup_group
: &BackupGroup
,
294 ) -> Result
<(), Error
> {
295 let mut path
= self.base_path();
296 path
.push(backup_group
.group_path());
299 let mut open_options
= std
::fs
::OpenOptions
::new();
300 open_options
.write(true);
301 open_options
.truncate(true);
304 open_options
.create(true);
306 open_options
.create_new(true);
309 let mut file
= open_options
.open(&path
)
310 .map_err(|err
| format_err
!("unable to create owner file {:?} - {}", path
, err
))?
;
312 writeln
!(file
, "{}", userid
)
313 .map_err(|err
| format_err
!("unable to write owner file {:?} - {}", path
, err
))?
;
318 /// Create (if it does not already exist) and lock a backup group
320 /// And set the owner to 'userid'. If the group already exists, it returns the
321 /// current owner (instead of setting the owner).
323 /// This also acquires an exclusive lock on the directory and returns the lock guard.
324 pub fn create_locked_backup_group(
326 backup_group
: &BackupGroup
,
328 ) -> Result
<(Userid
, DirLockGuard
), Error
> {
329 // create intermediate path first:
330 let base_path
= self.base_path();
332 let mut full_path
= base_path
.clone();
333 full_path
.push(backup_group
.backup_type());
334 std
::fs
::create_dir_all(&full_path
)?
;
336 full_path
.push(backup_group
.backup_id());
338 // create the last component now
339 match std
::fs
::create_dir(&full_path
) {
341 let guard
= lock_dir_noblock(&full_path
, "backup group", "another backup is already running")?
;
342 self.set_owner(backup_group
, userid
, false)?
;
343 let owner
= self.get_owner(backup_group
)?
; // just to be sure
346 Err(ref err
) if err
.kind() == io
::ErrorKind
::AlreadyExists
=> {
347 let guard
= lock_dir_noblock(&full_path
, "backup group", "another backup is already running")?
;
348 let owner
= self.get_owner(backup_group
)?
; // just to be sure
351 Err(err
) => bail
!("unable to create backup group {:?} - {}", full_path
, err
),
355 /// Creates a new backup snapshot inside a BackupGroup
357 /// The BackupGroup directory needs to exist.
358 pub fn create_locked_backup_dir(&self, backup_dir
: &BackupDir
)
359 -> Result
<(PathBuf
, bool
, DirLockGuard
), Error
>
361 let relative_path
= backup_dir
.relative_path();
362 let mut full_path
= self.base_path();
363 full_path
.push(&relative_path
);
366 lock_dir_noblock(&full_path
, "snapshot", "internal error - tried creating snapshot that's already in use");
368 match std
::fs
::create_dir(&full_path
) {
369 Ok(_
) => Ok((relative_path
, true, lock()?
)),
370 Err(ref e
) if e
.kind() == io
::ErrorKind
::AlreadyExists
=> Ok((relative_path
, false, lock()?
)),
371 Err(e
) => Err(e
.into())
375 pub fn list_images(&self) -> Result
<Vec
<PathBuf
>, Error
> {
376 let base
= self.base_path();
378 let mut list
= vec
![];
380 use walkdir
::WalkDir
;
382 let walker
= WalkDir
::new(&base
).same_file_system(true).into_iter();
384 // make sure we skip .chunks (and other hidden files to keep it simple)
385 fn is_hidden(entry
: &walkdir
::DirEntry
) -> bool
{
388 .map(|s
| s
.starts_with("."))
391 let handle_entry_err
= |err
: walkdir
::Error
| {
392 if let Some(inner
) = err
.io_error() {
393 let path
= err
.path().unwrap_or(Path
::new(""));
395 io
::ErrorKind
::PermissionDenied
=> {
396 // only allow to skip ext4 fsck directory, avoid GC if, for example,
397 // a user got file permissions wrong on datastore rsync to new server
398 if err
.depth() > 1 || !path
.ends_with("lost+found") {
399 bail
!("cannot continue garbage-collection safely, permission denied on: {}", path
.display())
402 _
=> bail
!("unexpected error on datastore traversal: {} - {}", inner
, path
.display()),
407 for entry
in walker
.filter_entry(|e
| !is_hidden(e
)) {
408 let path
= match entry
{
409 Ok(entry
) => entry
.into_path(),
411 handle_entry_err(err
)?
;
415 if let Ok(archive_type
) = archive_type(&path
) {
416 if archive_type
== ArchiveType
::FixedIndex
|| archive_type
== ArchiveType
::DynamicIndex
{
425 // mark chunks used by ``index`` as used
426 fn index_mark_used_chunks
<I
: IndexFile
>(
429 file_name
: &Path
, // only used for error reporting
430 status
: &mut GarbageCollectionStatus
,
431 worker
: &dyn TaskState
,
432 ) -> Result
<(), Error
> {
434 status
.index_file_count
+= 1;
435 status
.index_data_bytes
+= index
.index_bytes();
437 for pos
in 0..index
.index_count() {
438 worker
.check_abort()?
;
439 tools
::fail_on_shutdown()?
;
440 let digest
= index
.index_digest(pos
).unwrap();
441 if let Err(err
) = self.chunk_store
.touch_chunk(digest
) {
444 "warning: unable to access chunk {}, required by {:?} - {}",
445 proxmox
::tools
::digest_to_hex(digest
),
456 status
: &mut GarbageCollectionStatus
,
457 worker
: &dyn TaskState
,
458 ) -> Result
<(), Error
> {
460 let image_list
= self.list_images()?
;
462 let image_count
= image_list
.len();
466 let mut last_percentage
: usize = 0;
468 for path
in image_list
{
470 worker
.check_abort()?
;
471 tools
::fail_on_shutdown()?
;
473 let full_path
= self.chunk_store
.relative_path(&path
);
474 match std
::fs
::File
::open(&full_path
) {
476 if let Ok(archive_type
) = archive_type(&path
) {
477 if archive_type
== ArchiveType
::FixedIndex
{
478 let index
= FixedIndexReader
::new(file
)?
;
479 self.index_mark_used_chunks(index
, &path
, status
, worker
)?
;
480 } else if archive_type
== ArchiveType
::DynamicIndex
{
481 let index
= DynamicIndexReader
::new(file
)?
;
482 self.index_mark_used_chunks(index
, &path
, status
, worker
)?
;
487 if err
.kind() == std
::io
::ErrorKind
::NotFound
{
488 // simply ignore vanished files
490 return Err(err
.into());
496 let percentage
= done
*100/image_count
;
497 if percentage
> last_percentage
{
500 "percentage done: phase1 {}% ({} of {} index files)",
505 last_percentage
= percentage
;
512 pub fn last_gc_status(&self) -> GarbageCollectionStatus
{
513 self.last_gc_status
.lock().unwrap().clone()
516 pub fn garbage_collection_running(&self) -> bool
{
517 if let Ok(_
) = self.gc_mutex
.try_lock() { false }
else { true }
520 pub fn garbage_collection(&self, worker
: &dyn TaskState
, upid
: &UPID
) -> Result
<(), Error
> {
522 if let Ok(ref mut _mutex
) = self.gc_mutex
.try_lock() {
524 // avoids that we run GC if an old daemon process has still a
525 // running backup writer, which is not save as we have no "oldest
526 // writer" information and thus no safe atime cutoff
527 let _exclusive_lock
= self.chunk_store
.try_exclusive_lock()?
;
529 let phase1_start_time
= proxmox
::tools
::time
::epoch_i64();
530 let oldest_writer
= self.chunk_store
.oldest_writer().unwrap_or(phase1_start_time
);
532 let mut gc_status
= GarbageCollectionStatus
::default();
533 gc_status
.upid
= Some(upid
.to_string());
535 crate::task_log
!(worker
, "Start GC phase1 (mark used chunks)");
537 self.mark_used_chunks(&mut gc_status
, worker
)?
;
539 crate::task_log
!(worker
, "Start GC phase2 (sweep unused chunks)");
540 self.chunk_store
.sweep_unused_chunks(
549 "Removed garbage: {}",
550 HumanByte
::from(gc_status
.removed_bytes
),
552 crate::task_log
!(worker
, "Removed chunks: {}", gc_status
.removed_chunks
);
553 if gc_status
.pending_bytes
> 0 {
556 "Pending removals: {} (in {} chunks)",
557 HumanByte
::from(gc_status
.pending_bytes
),
558 gc_status
.pending_chunks
,
561 if gc_status
.removed_bad
> 0 {
562 crate::task_log
!(worker
, "Removed bad files: {}", gc_status
.removed_bad
);
565 if gc_status
.still_bad
> 0 {
566 crate::task_log
!(worker
, "Bad chunks: {}", gc_status
.removed_bad
);
571 "Original data usage: {}",
572 HumanByte
::from(gc_status
.index_data_bytes
),
575 if gc_status
.index_data_bytes
> 0 {
576 let comp_per
= (gc_status
.disk_bytes
as f64 * 100.)/gc_status
.index_data_bytes
as f64;
579 "On-Disk usage: {} ({:.2}%)",
580 HumanByte
::from(gc_status
.disk_bytes
),
585 crate::task_log
!(worker
, "On-Disk chunks: {}", gc_status
.disk_chunks
);
587 if gc_status
.disk_chunks
> 0 {
588 let avg_chunk
= gc_status
.disk_bytes
/(gc_status
.disk_chunks
as u64);
589 crate::task_log
!(worker
, "Average chunk size: {}", HumanByte
::from(avg_chunk
));
592 if let Ok(serialized
) = serde_json
::to_string(&gc_status
) {
593 let mut path
= self.base_path();
594 path
.push(".gc-status");
596 let backup_user
= crate::backup
::backup_user()?
;
597 let mode
= nix
::sys
::stat
::Mode
::from_bits_truncate(0o0644);
598 // set the correct owner/group/permissions while saving file
599 // owner(rw) = backup, group(r)= backup
600 let options
= CreateOptions
::new()
602 .owner(backup_user
.uid
)
603 .group(backup_user
.gid
);
606 let _
= replace_file(path
, serialized
.as_bytes(), options
);
609 *self.last_gc_status
.lock().unwrap() = gc_status
;
612 bail
!("Start GC failed - (already running/locked)");
618 pub fn try_shared_chunk_store_lock(&self) -> Result
<tools
::ProcessLockSharedGuard
, Error
> {
619 self.chunk_store
.try_shared_lock()
622 pub fn chunk_path(&self, digest
:&[u8; 32]) -> (PathBuf
, String
) {
623 self.chunk_store
.chunk_path(digest
)
626 pub fn cond_touch_chunk(&self, digest
: &[u8; 32], fail_if_not_exist
: bool
) -> Result
<bool
, Error
> {
627 self.chunk_store
.cond_touch_chunk(digest
, fail_if_not_exist
)
634 ) -> Result
<(bool
, u64), Error
> {
635 self.chunk_store
.insert_chunk(chunk
, digest
)
638 pub fn load_blob(&self, backup_dir
: &BackupDir
, filename
: &str) -> Result
<DataBlob
, Error
> {
639 let mut path
= self.base_path();
640 path
.push(backup_dir
.relative_path());
643 proxmox
::try_block
!({
644 let mut file
= std
::fs
::File
::open(&path
)?
;
645 DataBlob
::load_from_reader(&mut file
)
646 }).map_err(|err
| format_err
!("unable to load blob '{:?}' - {}", path
, err
))
650 pub fn load_chunk(&self, digest
: &[u8; 32]) -> Result
<DataBlob
, Error
> {
652 let (chunk_path
, digest_str
) = self.chunk_store
.chunk_path(digest
);
654 proxmox
::try_block
!({
655 let mut file
= std
::fs
::File
::open(&chunk_path
)?
;
656 DataBlob
::load_from_reader(&mut file
)
657 }).map_err(|err
| format_err
!(
658 "store '{}', unable to load chunk '{}' - {}",
667 backup_dir
: &BackupDir
,
668 ) -> Result
<File
, Error
> {
669 let mut path
= self.base_path();
670 path
.push(backup_dir
.relative_path());
671 path
.push(&MANIFEST_LOCK_NAME
);
673 // update_manifest should never take a long time, so if someone else has
674 // the lock we can simply block a bit and should get it soon
675 open_file_locked(&path
, Duration
::from_secs(5), true)
678 "unable to acquire manifest lock {:?} - {}", &path
, err
683 /// Load the manifest without a lock. Must not be written back.
684 pub fn load_manifest(
686 backup_dir
: &BackupDir
,
687 ) -> Result
<(BackupManifest
, u64), Error
> {
688 let blob
= self.load_blob(backup_dir
, MANIFEST_BLOB_NAME
)?
;
689 let raw_size
= blob
.raw_size();
690 let manifest
= BackupManifest
::try_from(blob
)?
;
691 Ok((manifest
, raw_size
))
694 /// Update the manifest of the specified snapshot. Never write a manifest directly,
695 /// only use this method - anything else may break locking guarantees.
696 pub fn update_manifest(
698 backup_dir
: &BackupDir
,
699 update_fn
: impl FnOnce(&mut BackupManifest
),
700 ) -> Result
<(), Error
> {
702 let _guard
= self.lock_manifest(backup_dir
)?
;
703 let (mut manifest
, _
) = self.load_manifest(&backup_dir
)?
;
705 update_fn(&mut manifest
);
707 let manifest
= serde_json
::to_value(manifest
)?
;
708 let manifest
= serde_json
::to_string_pretty(&manifest
)?
;
709 let blob
= DataBlob
::encode(manifest
.as_bytes(), None
, true)?
;
710 let raw_data
= blob
.raw_data();
712 let mut path
= self.base_path();
713 path
.push(backup_dir
.relative_path());
714 path
.push(MANIFEST_BLOB_NAME
);
716 // atomic replace invalidates flock - no other writes past this point!
717 replace_file(&path
, raw_data
, CreateOptions
::new())?
;
722 pub fn verify_new(&self) -> bool
{