use std::collections::{HashSet, HashMap};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::convert::TryFrom;

use anyhow::{bail, format_err, Error};
use lazy_static::lazy_static;
use chrono::{DateTime, Utc};

use super::backup_info::{BackupGroup, BackupGroupGuard, BackupDir, BackupInfo};
use super::chunk_store::ChunkStore;
use super::dynamic_index::{DynamicIndexReader, DynamicIndexWriter};
use super::fixed_index::{FixedIndexReader, FixedIndexWriter};
use super::manifest::{MANIFEST_BLOB_NAME, CLIENT_LOG_BLOB_NAME, BackupManifest};
use super::index::*;
use super::{DataBlob, ArchiveType, archive_type};
use crate::backup::CryptMode;
use crate::config::datastore;
use crate::server::WorkerTask;
use crate::tools;
use crate::api2::types::GarbageCollectionStatus;

lazy_static! {
    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
}

/// Datastore Management
///
/// A Datastore can store several backups, and provides the
/// management interface for backups.
pub struct DataStore {
    chunk_store: Arc<ChunkStore>,
    gc_mutex: Mutex<bool>,
    last_gc_status: Mutex<GarbageCollectionStatus>,
}

impl DataStore {

    pub fn lookup_datastore(name: &str) -> Result<Arc<DataStore>, Error> {

        let (config, _digest) = datastore::config()?;
        let config: datastore::DataStoreConfig = config.lookup("datastore", name)?;

        let mut map = DATASTORE_MAP.lock().unwrap();

        if let Some(datastore) = map.get(name) {
            // Compare Config - if changed, create new Datastore object!
            if datastore.chunk_store.base == PathBuf::from(&config.path) {
                return Ok(datastore.clone());
            }
        }

        let datastore = DataStore::open(name)?;

        let datastore = Arc::new(datastore);
        map.insert(name.to_string(), datastore.clone());

        Ok(datastore)
    }
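
    // Usage sketch (hypothetical caller, assuming a datastore named "store1"
    // exists in the configuration). Repeated lookups return the same cached
    // Arc as long as the configured path has not changed:
    //
    //     let store = DataStore::lookup_datastore("store1")?;
    //     let same = DataStore::lookup_datastore("store1")?;
    //     assert!(Arc::ptr_eq(&store, &same));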

    pub fn open(store_name: &str) -> Result<Self, Error> {

        let (config, _digest) = datastore::config()?;
        let (_, store_config) = config.sections.get(store_name)
            .ok_or(format_err!("no such datastore '{}'", store_name))?;

        let path = store_config["path"].as_str().unwrap();

        let chunk_store = ChunkStore::open(store_name, path)?;

        let gc_status = GarbageCollectionStatus::default();

        Ok(Self {
            chunk_store: Arc::new(chunk_store),
            gc_mutex: Mutex::new(false),
            last_gc_status: Mutex::new(gc_status),
        })
    }

    pub fn get_chunk_iterator(
        &self,
    ) -> Result<
        impl Iterator<Item = (Result<tools::fs::ReadDirEntry, Error>, usize)>,
        Error
    > {
        self.chunk_store.get_chunk_iterator()
    }

    pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {

        let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;

        Ok(index)
    }

    pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {

        let full_path = self.chunk_store.relative_path(filename.as_ref());

        let index = FixedIndexReader::open(&full_path)?;

        Ok(index)
    }

    pub fn create_dynamic_writer<P: AsRef<Path>>(
        &self, filename: P,
    ) -> Result<DynamicIndexWriter, Error> {

        let index = DynamicIndexWriter::create(
            self.chunk_store.clone(), filename.as_ref())?;

        Ok(index)
    }

    pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {

        let full_path = self.chunk_store.relative_path(filename.as_ref());

        let index = DynamicIndexReader::open(&full_path)?;

        Ok(index)
    }

    pub fn open_index<P>(&self, filename: P) -> Result<Box<dyn IndexFile + Send>, Error>
    where
        P: AsRef<Path>,
    {
        let filename = filename.as_ref();
        let out: Box<dyn IndexFile + Send> =
            match archive_type(filename)? {
                ArchiveType::DynamicIndex => Box::new(self.open_dynamic_reader(filename)?),
                ArchiveType::FixedIndex => Box::new(self.open_fixed_reader(filename)?),
                _ => bail!("cannot open index file of unknown type: {:?}", filename),
            };
        Ok(out)
    }
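
    // Usage sketch (hypothetical file name): open_index() lets callers read
    // either index flavor through the common IndexFile trait, dispatching on
    // the file extension via archive_type():
    //
    //     let index = datastore.open_index("drive-scsi0.img.fidx")?;
    //     println!("chunks referenced: {}", index.index_count());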

    pub fn name(&self) -> &str {
        self.chunk_store.name()
    }

    pub fn base_path(&self) -> PathBuf {
        self.chunk_store.base_path()
    }

    /// Cleanup a backup directory
    ///
    /// Removes all files not mentioned in the manifest.
    pub fn cleanup_backup_dir(&self, backup_dir: &BackupDir, manifest: &BackupManifest
    ) -> Result<(), Error> {

        let mut full_path = self.base_path();
        full_path.push(backup_dir.relative_path());

        let mut wanted_files = HashSet::new();
        wanted_files.insert(MANIFEST_BLOB_NAME.to_string());
        wanted_files.insert(CLIENT_LOG_BLOB_NAME.to_string());
        manifest.files().iter().for_each(|item| { wanted_files.insert(item.filename.clone()); });

        for item in tools::fs::read_subdir(libc::AT_FDCWD, &full_path)? {
            if let Ok(item) = item {
                if let Some(file_type) = item.file_type() {
                    if file_type != nix::dir::Type::File { continue; }
                }
                let file_name = item.file_name().to_bytes();
                if file_name == b"." || file_name == b".." { continue; }

                if let Ok(name) = std::str::from_utf8(file_name) {
                    if wanted_files.contains(name) { continue; }
                }
                println!("remove unused file {:?}", item.file_name());
                let dirfd = item.parent_fd();
                let _res = unsafe { libc::unlinkat(dirfd, item.file_name().as_ptr(), 0) };
            }
        }

        Ok(())
    }

    /// Returns the absolute path for a backup_group
    pub fn group_path(&self, backup_group: &BackupGroup) -> PathBuf {
        let mut full_path = self.base_path();
        full_path.push(backup_group.group_path());
        full_path
    }

    /// Returns the absolute path for a backup_dir
    pub fn snapshot_path(&self, backup_dir: &BackupDir) -> PathBuf {
        let mut full_path = self.base_path();
        full_path.push(backup_dir.relative_path());
        full_path
    }

    /// Remove a complete backup group including all snapshots
    pub fn remove_backup_group(&self, backup_group: &BackupGroup) -> Result<(), Error> {

        let full_path = self.group_path(backup_group);

        let mut snap_list = backup_group.list_backups(&self.base_path())?;
        BackupInfo::sort_list(&mut snap_list, false);
        for snap in snap_list {
            if snap.is_finished() {
                break;
            } else {
                bail!(
                    "cannot remove backup group {:?}, contains potentially running backup: {}",
                    full_path,
                    snap.backup_dir,
                );
            }
        }

        log::info!("removing backup group {:?}", full_path);
        std::fs::remove_dir_all(&full_path)
            .map_err(|err| format_err!(
                "removing backup group {:?} failed - {}",
                full_path,
                err,
            ))?;

        Ok(())
    }

    /// Remove a backup directory including all content
    pub fn remove_backup_dir(&self, backup_dir: &BackupDir, force: bool) -> Result<(), Error> {

        let full_path = self.snapshot_path(backup_dir);

        if !force {
            let mut snap_list = backup_dir.group().list_backups(&self.base_path())?;
            BackupInfo::sort_list(&mut snap_list, false);
            let mut prev_snap_finished = true;
            for snap in snap_list {
                let cur_snap_finished = snap.is_finished();
                if &snap.backup_dir == backup_dir {
                    if !cur_snap_finished {
                        bail!(
                            "cannot remove currently running snapshot: {:?}",
                            backup_dir,
                        );
                    }
                    if !prev_snap_finished {
                        bail!(
                            "cannot remove snapshot {:?}, successor is currently running and potentially based on it",
                            backup_dir,
                        );
                    }
                    break;
                }
                prev_snap_finished = cur_snap_finished;
            }
        }

        log::info!("removing backup snapshot {:?}", full_path);
        std::fs::remove_dir_all(&full_path)
            .map_err(|err| format_err!(
                "removing backup snapshot {:?} failed - {}",
                full_path,
                err,
            ))?;

        Ok(())
    }

    /// Returns the time of the last successful backup
    ///
    /// Or None if there is no backup in the group (or the group dir does not exist).
    pub fn last_successful_backup(&self, backup_group: &BackupGroup) -> Result<Option<DateTime<Utc>>, Error> {
        let base_path = self.base_path();
        let mut group_path = base_path.clone();
        group_path.push(backup_group.group_path());

        if group_path.exists() {
            backup_group.last_successful_backup(&base_path)
        } else {
            Ok(None)
        }
    }

    /// Returns the backup owner.
    ///
    /// The backup owner is the user who first created the backup group.
    pub fn get_owner(&self, backup_group: &BackupGroup) -> Result<String, Error> {
        let mut full_path = self.base_path();
        full_path.push(backup_group.group_path());
        full_path.push("owner");
        let owner = proxmox::tools::fs::file_read_firstline(full_path)?;
        Ok(owner.trim_end().to_string()) // remove trailing newline
    }

    /// Set the backup owner.
    pub fn set_owner(&self, backup_group: &BackupGroup, userid: &str, force: bool) -> Result<(), Error> {
        let mut path = self.base_path();
        path.push(backup_group.group_path());
        path.push("owner");

        let mut open_options = std::fs::OpenOptions::new();
        open_options.write(true);
        open_options.truncate(true);

        if force {
            open_options.create(true);
        } else {
            open_options.create_new(true);
        }

        let mut file = open_options.open(&path)
            .map_err(|err| format_err!("unable to create owner file {:?} - {}", path, err))?;

        writeln!(file, "{}", userid)
            .map_err(|err| format_err!("unable to write owner file {:?} - {}", path, err))?;

        Ok(())
    }
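
    // The owner is persisted as a single line of plain text in the "owner"
    // file inside the group directory; get_owner() above reads that line back
    // and strips the trailing newline. Sketch (hypothetical group/userid):
    //
    //     datastore.set_owner(&group, "backup@pam", false)?;
    //     assert_eq!(datastore.get_owner(&group)?, "backup@pam");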

    /// Create (if it does not already exist) and lock a backup group
    ///
    /// And set the owner to 'userid'. If the group already exists, it returns the
    /// current owner (instead of setting the owner).
    ///
    /// This also acquires an exclusive lock on the directory and returns the lock guard.
    pub fn create_locked_backup_group(&self, backup_group: &BackupGroup, userid: &str) -> Result<(String, BackupGroupGuard), Error> {

        // create intermediate path first:
        let base_path = self.base_path();

        let mut full_path = base_path.clone();
        full_path.push(backup_group.backup_type());
        std::fs::create_dir_all(&full_path)?;

        full_path.push(backup_group.backup_id());

        // create the last component now
        match std::fs::create_dir(&full_path) {
            Ok(_) => {
                let guard = backup_group.lock(&base_path)?;
                self.set_owner(backup_group, userid, false)?;
                let owner = self.get_owner(backup_group)?; // just to be sure
                Ok((owner, guard))
            }
            Err(ref err) if err.kind() == io::ErrorKind::AlreadyExists => {
                let guard = backup_group.lock(&base_path)?;
                let owner = self.get_owner(backup_group)?; // just to be sure
                Ok((owner, guard))
            }
            Err(err) => bail!("unable to create backup group {:?} - {}", full_path, err),
        }
    }
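
    // Usage sketch (hypothetical group): calling this twice is fine - the
    // second call takes the AlreadyExists branch and simply returns the
    // existing owner together with a fresh lock guard:
    //
    //     let (owner, _guard) = datastore.create_locked_backup_group(&group, "backup@pam")?;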

    /// Creates a new backup snapshot inside a BackupGroup
    ///
    /// The BackupGroup directory needs to exist.
    pub fn create_backup_dir(&self, backup_dir: &BackupDir) -> Result<(PathBuf, bool), io::Error> {
        let relative_path = backup_dir.relative_path();
        let mut full_path = self.base_path();
        full_path.push(&relative_path);

        match std::fs::create_dir(&full_path) {
            Ok(_) => Ok((relative_path, true)),
            Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => Ok((relative_path, false)),
            Err(e) => Err(e),
        }
    }
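
    // Usage sketch (hypothetical backup_dir): the returned bool tells the
    // caller whether the snapshot directory was freshly created (true) or
    // already existed (false):
    //
    //     let (relative_path, is_new) = datastore.create_backup_dir(&backup_dir)?;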

    pub fn list_images(&self) -> Result<Vec<PathBuf>, Error> {
        let base = self.base_path();

        let mut list = vec![];

        use walkdir::WalkDir;

        let walker = WalkDir::new(&base).same_file_system(true).into_iter();

        // make sure we skip .chunks (and other hidden files to keep it simple)
        fn is_hidden(entry: &walkdir::DirEntry) -> bool {
            entry.file_name()
                .to_str()
                .map(|s| s.starts_with("."))
                .unwrap_or(false)
        }
        let handle_entry_err = |err: walkdir::Error| {
            if let Some(inner) = err.io_error() {
                let path = err.path().unwrap_or(Path::new(""));
                match inner.kind() {
                    io::ErrorKind::PermissionDenied => {
                        // only allow to skip ext4 fsck directory, avoid GC if, for example,
                        // a user got file permissions wrong on datastore rsync to new server
                        if err.depth() > 1 || !path.ends_with("lost+found") {
                            bail!("cannot continue garbage-collection safely, permission denied on: {}", path.display())
                        }
                    },
                    _ => bail!("unexpected error on datastore traversal: {} - {}", inner, path.display()),
                }
            }
            Ok(())
        };
        for entry in walker.filter_entry(|e| !is_hidden(e)) {
            let path = match entry {
                Ok(entry) => entry.into_path(),
                Err(err) => {
                    handle_entry_err(err)?;
                    continue
                },
            };
            if let Ok(archive_type) = archive_type(&path) {
                if archive_type == ArchiveType::FixedIndex || archive_type == ArchiveType::DynamicIndex {
                    list.push(path);
                }
            }
        }

        Ok(list)
    }

    // mark chunks used by ``index`` as used
    fn index_mark_used_chunks<I: IndexFile>(
        &self,
        index: I,
        file_name: &Path, // only used for error reporting
        status: &mut GarbageCollectionStatus,
        worker: &WorkerTask,
    ) -> Result<(), Error> {

        status.index_file_count += 1;
        status.index_data_bytes += index.index_bytes();

        for pos in 0..index.index_count() {
            worker.fail_on_abort()?;
            tools::fail_on_shutdown()?;
            let digest = index.index_digest(pos).unwrap();
            if let Err(err) = self.chunk_store.touch_chunk(digest) {
                bail!("unable to access chunk {}, required by {:?} - {}",
                      proxmox::tools::digest_to_hex(digest), file_name, err);
            }
        }

        Ok(())
    }

    fn mark_used_chunks(&self, status: &mut GarbageCollectionStatus, worker: &WorkerTask) -> Result<(), Error> {

        let image_list = self.list_images()?;

        for path in image_list {

            worker.fail_on_abort()?;
            tools::fail_on_shutdown()?;

            if let Ok(archive_type) = archive_type(&path) {
                if archive_type == ArchiveType::FixedIndex {
                    let index = self.open_fixed_reader(&path)?;
                    self.index_mark_used_chunks(index, &path, status, worker)?;
                } else if archive_type == ArchiveType::DynamicIndex {
                    let index = self.open_dynamic_reader(&path)?;
                    self.index_mark_used_chunks(index, &path, status, worker)?;
                }
            }
        }

        Ok(())
    }
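
    // GC overview: phase 1 (mark_used_chunks, above) walks every index file
    // found by list_images() and touches the atime of each referenced chunk;
    // phase 2 (sweep_unused_chunks, called below) can then drop chunks that
    // were not touched and are older than the oldest running writer.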

    pub fn last_gc_status(&self) -> GarbageCollectionStatus {
        self.last_gc_status.lock().unwrap().clone()
    }

    pub fn garbage_collection_running(&self) -> bool {
        self.gc_mutex.try_lock().is_err()
    }

    pub fn garbage_collection(&self, worker: &WorkerTask) -> Result<(), Error> {

        if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {

            let _exclusive_lock = self.chunk_store.try_exclusive_lock()?;

            let now = unsafe { libc::time(std::ptr::null_mut()) };

            let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(now);

            let mut gc_status = GarbageCollectionStatus::default();
            gc_status.upid = Some(worker.to_string());

            worker.log("Start GC phase1 (mark used chunks)");

            self.mark_used_chunks(&mut gc_status, &worker)?;

            worker.log("Start GC phase2 (sweep unused chunks)");
            self.chunk_store.sweep_unused_chunks(oldest_writer, &mut gc_status, &worker)?;

            worker.log(&format!("Removed bytes: {}", gc_status.removed_bytes));
            worker.log(&format!("Removed chunks: {}", gc_status.removed_chunks));
            if gc_status.pending_bytes > 0 {
                worker.log(&format!("Pending removals: {} bytes ({} chunks)", gc_status.pending_bytes, gc_status.pending_chunks));
            }

            worker.log(&format!("Original data bytes: {}", gc_status.index_data_bytes));

            if gc_status.index_data_bytes > 0 {
                let comp_per = (gc_status.disk_bytes * 100) / gc_status.index_data_bytes;
                worker.log(&format!("Disk bytes: {} ({} %)", gc_status.disk_bytes, comp_per));
            }

            worker.log(&format!("Disk chunks: {}", gc_status.disk_chunks));

            if gc_status.disk_chunks > 0 {
                let avg_chunk = gc_status.disk_bytes / (gc_status.disk_chunks as u64);
                worker.log(&format!("Average chunk size: {}", avg_chunk));
            }

            *self.last_gc_status.lock().unwrap() = gc_status;

        } else {
            bail!("Start GC failed - (already running/locked)");
        }

        Ok(())
    }

    pub fn try_shared_chunk_store_lock(&self) -> Result<tools::ProcessLockSharedGuard, Error> {
        self.chunk_store.try_shared_lock()
    }

    pub fn chunk_path(&self, digest: &[u8; 32]) -> (PathBuf, String) {
        self.chunk_store.chunk_path(digest)
    }

    pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> {
        self.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
    }

    pub fn insert_chunk(
        &self,
        chunk: &DataBlob,
        digest: &[u8; 32],
    ) -> Result<(bool, u64), Error> {
        self.chunk_store.insert_chunk(chunk, digest)
    }

    pub fn verify_stored_chunk(&self, digest: &[u8; 32], expected_chunk_size: u64) -> Result<(), Error> {
        let blob = self.load_chunk(digest)?;
        blob.verify_unencrypted(expected_chunk_size as usize, digest)?;
        Ok(())
    }

    pub fn load_blob(&self, backup_dir: &BackupDir, filename: &str) -> Result<DataBlob, Error> {
        let mut path = self.base_path();
        path.push(backup_dir.relative_path());
        path.push(filename);

        proxmox::try_block!({
            let mut file = std::fs::File::open(&path)?;
            DataBlob::load_from_reader(&mut file)
        }).map_err(|err| format_err!("unable to load blob '{:?}' - {}", path, err))
    }

    pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {

        let (chunk_path, digest_str) = self.chunk_store.chunk_path(digest);

        proxmox::try_block!({
            let mut file = std::fs::File::open(&chunk_path)?;
            DataBlob::load_from_reader(&mut file)
        }).map_err(|err| format_err!(
            "store '{}', unable to load chunk '{}' - {}",
            self.name(),
            digest_str,
            err,
        ))
    }

    pub fn load_manifest(
        &self,
        backup_dir: &BackupDir,
    ) -> Result<(BackupManifest, CryptMode, u64), Error> {
        let blob = self.load_blob(backup_dir, MANIFEST_BLOB_NAME)?;
        let raw_size = blob.raw_size();
        let crypt_mode = blob.crypt_mode()?;
        let manifest = BackupManifest::try_from(blob)?;
        Ok((manifest, crypt_mode, raw_size))
    }
}
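
// Usage sketch (hypothetical datastore and snapshot): loading a manifest and
// listing the files it references:
//
//     let datastore = DataStore::lookup_datastore("store1")?;
//     let (manifest, _crypt_mode, _raw_size) = datastore.load_manifest(&backup_dir)?;
//     for item in manifest.files() {
//         println!("{}", item.filename);
//     }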