1 use std
::collections
::{HashSet, HashMap}
;
2 use std
::io
::{self, Write}
;
3 use std
::path
::{Path, PathBuf}
;
4 use std
::sync
::{Arc, Mutex}
;
6 use anyhow
::{bail, format_err, Error}
;
7 use lazy_static
::lazy_static
;
8 use chrono
::{DateTime, Utc}
;
10 use super::backup_info
::{BackupGroup, BackupDir}
;
11 use super::chunk_store
::ChunkStore
;
12 use super::dynamic_index
::{DynamicIndexReader, DynamicIndexWriter}
;
13 use super::fixed_index
::{FixedIndexReader, FixedIndexWriter}
;
14 use super::manifest
::{MANIFEST_BLOB_NAME, BackupManifest}
;
16 use super::{DataBlob, ArchiveType, archive_type}
;
17 use crate::config
::datastore
;
18 use crate::server
::WorkerTask
;
20 use crate::api2
::types
::GarbageCollectionStatus
;
// Process-wide cache of open datastores, keyed by datastore name.
// NOTE(review): declared via a `lazy_static!` invocation — the macro header
// is not visible in this extraction; confirm against the full file.
23 static ref DATASTORE_MAP
: Mutex
<HashMap
<String
, Arc
<DataStore
>>> = Mutex
::new(HashMap
::new());
26 /// Datastore Management
28 /// A Datastore can store severals backups, and provides the
29 /// management interface for backup.
30 pub struct DataStore
{
31 chunk_store
: Arc
<ChunkStore
>,
32 gc_mutex
: Mutex
<bool
>,
33 last_gc_status
: Mutex
<GarbageCollectionStatus
>,
38 pub fn lookup_datastore(name
: &str) -> Result
<Arc
<DataStore
>, Error
> {
40 let (config
, _digest
) = datastore
::config()?
;
41 let config
: datastore
::DataStoreConfig
= config
.lookup("datastore", name
)?
;
43 let mut map
= DATASTORE_MAP
.lock().unwrap();
45 if let Some(datastore
) = map
.get(name
) {
46 // Compare Config - if changed, create new Datastore object!
47 if datastore
.chunk_store
.base
== PathBuf
::from(&config
.path
) {
48 return Ok(datastore
.clone());
52 let datastore
= DataStore
::open(name
)?
;
54 let datastore
= Arc
::new(datastore
);
55 map
.insert(name
.to_string(), datastore
.clone());
60 pub fn open(store_name
: &str) -> Result
<Self, Error
> {
62 let (config
, _digest
) = datastore
::config()?
;
63 let (_
, store_config
) = config
.sections
.get(store_name
)
64 .ok_or(format_err
!("no such datastore '{}'", store_name
))?
;
66 let path
= store_config
["path"].as_str().unwrap();
68 let chunk_store
= ChunkStore
::open(store_name
, path
)?
;
70 let gc_status
= GarbageCollectionStatus
::default();
73 chunk_store
: Arc
::new(chunk_store
),
74 gc_mutex
: Mutex
::new(false),
75 last_gc_status
: Mutex
::new(gc_status
),
79 pub fn get_chunk_iterator(
82 impl Iterator
<Item
= (Result
<tools
::fs
::ReadDirEntry
, Error
>, usize)>,
85 self.chunk_store
.get_chunk_iterator()
88 pub fn create_fixed_writer
<P
: AsRef
<Path
>>(&self, filename
: P
, size
: usize, chunk_size
: usize) -> Result
<FixedIndexWriter
, Error
> {
90 let index
= FixedIndexWriter
::create(self.chunk_store
.clone(), filename
.as_ref(), size
, chunk_size
)?
;
95 pub fn open_fixed_reader
<P
: AsRef
<Path
>>(&self, filename
: P
) -> Result
<FixedIndexReader
, Error
> {
97 let full_path
= self.chunk_store
.relative_path(filename
.as_ref());
99 let index
= FixedIndexReader
::open(&full_path
)?
;
104 pub fn create_dynamic_writer
<P
: AsRef
<Path
>>(
106 ) -> Result
<DynamicIndexWriter
, Error
> {
108 let index
= DynamicIndexWriter
::create(
109 self.chunk_store
.clone(), filename
.as_ref())?
;
114 pub fn open_dynamic_reader
<P
: AsRef
<Path
>>(&self, filename
: P
) -> Result
<DynamicIndexReader
, Error
> {
116 let full_path
= self.chunk_store
.relative_path(filename
.as_ref());
118 let index
= DynamicIndexReader
::open(&full_path
)?
;
123 pub fn open_index
<P
>(&self, filename
: P
) -> Result
<Box
<dyn IndexFile
+ Send
>, Error
>
127 let filename
= filename
.as_ref();
128 let out
: Box
<dyn IndexFile
+ Send
> =
129 match archive_type(filename
)?
{
130 ArchiveType
::DynamicIndex
=> Box
::new(self.open_dynamic_reader(filename
)?
),
131 ArchiveType
::FixedIndex
=> Box
::new(self.open_fixed_reader(filename
)?
),
132 _
=> bail
!("cannot open index file of unknown type: {:?}", filename
),
137 pub fn base_path(&self) -> PathBuf
{
138 self.chunk_store
.base_path()
141 /// Clenaup a backup directory
143 /// Removes all files not mentioned in the manifest.
144 pub fn cleanup_backup_dir(&self, backup_dir
: &BackupDir
, manifest
: &BackupManifest
145 ) -> Result
<(), Error
> {
147 let mut full_path
= self.base_path();
148 full_path
.push(backup_dir
.relative_path());
150 let mut wanted_files
= HashSet
::new();
151 wanted_files
.insert(MANIFEST_BLOB_NAME
.to_string());
152 manifest
.files().iter().for_each(|item
| { wanted_files.insert(item.filename.clone()); }
);
154 for item
in tools
::fs
::read_subdir(libc
::AT_FDCWD
, &full_path
)?
{
155 if let Ok(item
) = item
{
156 if let Some(file_type
) = item
.file_type() {
157 if file_type
!= nix
::dir
::Type
::File { continue; }
159 let file_name
= item
.file_name().to_bytes();
160 if file_name
== b
"." || file_name
== b
".." { continue; }
;
162 if let Ok(name
) = std
::str::from_utf8(file_name
) {
163 if wanted_files
.contains(name
) { continue; }
165 println
!("remove unused file {:?}", item
.file_name());
166 let dirfd
= item
.parent_fd();
167 let _res
= unsafe { libc::unlinkat(dirfd, item.file_name().as_ptr(), 0) }
;
174 /// Returns the absolute path for a backup_group
175 pub fn group_path(&self, backup_group
: &BackupGroup
) -> PathBuf
{
176 let mut full_path
= self.base_path();
177 full_path
.push(backup_group
.group_path());
181 /// Returns the absolute path for backup_dir
182 pub fn snapshot_path(&self, backup_dir
: &BackupDir
) -> PathBuf
{
183 let mut full_path
= self.base_path();
184 full_path
.push(backup_dir
.relative_path());
188 /// Remove a complete backup group including all snapshots
189 pub fn remove_backup_group(&self, backup_group
: &BackupGroup
) -> Result
<(), Error
> {
191 let full_path
= self.group_path(backup_group
);
193 log
::info
!("removing backup group {:?}", full_path
);
194 std
::fs
::remove_dir_all(&full_path
)
197 "removing backup group {:?} failed - {}",
206 /// Remove a backup directory including all content
207 pub fn remove_backup_dir(&self, backup_dir
: &BackupDir
) -> Result
<(), Error
> {
209 let full_path
= self.snapshot_path(backup_dir
);
211 log
::info
!("removing backup snapshot {:?}", full_path
);
212 std
::fs
::remove_dir_all(&full_path
)
215 "removing backup snapshot {:?} failed - {}",
224 /// Returns the time of the last successful backup
226 /// Or None if there is no backup in the group (or the group dir does not exist).
227 pub fn last_successful_backup(&self, backup_group
: &BackupGroup
) -> Result
<Option
<DateTime
<Utc
>>, Error
> {
228 let base_path
= self.base_path();
229 let mut group_path
= base_path
.clone();
230 group_path
.push(backup_group
.group_path());
232 if group_path
.exists() {
233 backup_group
.last_successful_backup(&base_path
)
239 /// Returns the backup owner.
241 /// The backup owner is the user who first created the backup group.
242 pub fn get_owner(&self, backup_group
: &BackupGroup
) -> Result
<String
, Error
> {
243 let mut full_path
= self.base_path();
244 full_path
.push(backup_group
.group_path());
245 full_path
.push("owner");
246 let owner
= proxmox
::tools
::fs
::file_read_firstline(full_path
)?
;
247 Ok(owner
.trim_end().to_string()) // remove trailing newline
250 /// Set the backup owner.
251 pub fn set_owner(&self, backup_group
: &BackupGroup
, userid
: &str, force
: bool
) -> Result
<(), Error
> {
252 let mut path
= self.base_path();
253 path
.push(backup_group
.group_path());
256 let mut open_options
= std
::fs
::OpenOptions
::new();
257 open_options
.write(true);
258 open_options
.truncate(true);
261 open_options
.create(true);
263 open_options
.create_new(true);
266 let mut file
= open_options
.open(&path
)
267 .map_err(|err
| format_err
!("unable to create owner file {:?} - {}", path
, err
))?
;
269 write
!(file
, "{}\n", userid
)
270 .map_err(|err
| format_err
!("unable to write owner file {:?} - {}", path
, err
))?
;
275 /// Create a backup group if it does not already exists.
277 /// And set the owner to 'userid'. If the group already exists, it returns the
278 /// current owner (instead of setting the owner).
279 pub fn create_backup_group(&self, backup_group
: &BackupGroup
, userid
: &str) -> Result
<String
, Error
> {
281 // create intermediate path first:
282 let base_path
= self.base_path();
284 let mut full_path
= base_path
.clone();
285 full_path
.push(backup_group
.backup_type());
286 std
::fs
::create_dir_all(&full_path
)?
;
288 full_path
.push(backup_group
.backup_id());
290 // create the last component now
291 match std
::fs
::create_dir(&full_path
) {
293 self.set_owner(backup_group
, userid
, false)?
;
294 let owner
= self.get_owner(backup_group
)?
; // just to be sure
297 Err(ref err
) if err
.kind() == io
::ErrorKind
::AlreadyExists
=> {
298 let owner
= self.get_owner(backup_group
)?
; // just to be sure
301 Err(err
) => bail
!("unable to create backup group {:?} - {}", full_path
, err
),
305 /// Creates a new backup snapshot inside a BackupGroup
307 /// The BackupGroup directory needs to exist.
308 pub fn create_backup_dir(&self, backup_dir
: &BackupDir
) -> Result
<(PathBuf
, bool
), io
::Error
> {
309 let relative_path
= backup_dir
.relative_path();
310 let mut full_path
= self.base_path();
311 full_path
.push(&relative_path
);
313 match std
::fs
::create_dir(&full_path
) {
314 Ok(_
) => Ok((relative_path
, true)),
315 Err(ref e
) if e
.kind() == io
::ErrorKind
::AlreadyExists
=> Ok((relative_path
, false)),
320 pub fn list_images(&self) -> Result
<Vec
<PathBuf
>, Error
> {
321 let base
= self.base_path();
323 let mut list
= vec
![];
325 use walkdir
::WalkDir
;
327 let walker
= WalkDir
::new(&base
).same_file_system(true).into_iter();
329 // make sure we skip .chunks (and other hidden files to keep it simple)
330 fn is_hidden(entry
: &walkdir
::DirEntry
) -> bool
{
333 .map(|s
| s
.starts_with("."))
337 for entry
in walker
.filter_entry(|e
| !is_hidden(e
)) {
338 let path
= entry?
.into_path();
339 if let Ok(archive_type
) = archive_type(&path
) {
340 if archive_type
== ArchiveType
::FixedIndex
|| archive_type
== ArchiveType
::DynamicIndex
{
349 // mark chunks used by ``index`` as used
350 fn index_mark_used_chunks
<I
: IndexFile
>(
353 file_name
: &Path
, // only used for error reporting
354 status
: &mut GarbageCollectionStatus
,
355 ) -> Result
<(), Error
> {
357 status
.index_file_count
+= 1;
358 status
.index_data_bytes
+= index
.index_bytes();
360 for pos
in 0..index
.index_count() {
361 tools
::fail_on_shutdown()?
;
362 let digest
= index
.index_digest(pos
).unwrap();
363 if let Err(err
) = self.chunk_store
.touch_chunk(digest
) {
364 bail
!("unable to access chunk {}, required by {:?} - {}",
365 proxmox
::tools
::digest_to_hex(digest
), file_name
, err
);
371 fn mark_used_chunks(&self, status
: &mut GarbageCollectionStatus
) -> Result
<(), Error
> {
373 let image_list
= self.list_images()?
;
375 for path
in image_list
{
377 tools
::fail_on_shutdown()?
;
379 if let Ok(archive_type
) = archive_type(&path
) {
380 if archive_type
== ArchiveType
::FixedIndex
{
381 let index
= self.open_fixed_reader(&path
)?
;
382 self.index_mark_used_chunks(index
, &path
, status
)?
;
383 } else if archive_type
== ArchiveType
::DynamicIndex
{
384 let index
= self.open_dynamic_reader(&path
)?
;
385 self.index_mark_used_chunks(index
, &path
, status
)?
;
393 pub fn last_gc_status(&self) -> GarbageCollectionStatus
{
394 self.last_gc_status
.lock().unwrap().clone()
397 pub fn garbage_collection(&self, worker
: Arc
<WorkerTask
>) -> Result
<(), Error
> {
399 if let Ok(ref mut _mutex
) = self.gc_mutex
.try_lock() {
401 let _exclusive_lock
= self.chunk_store
.try_exclusive_lock()?
;
403 let now
= unsafe { libc::time(std::ptr::null_mut()) }
;
405 let oldest_writer
= self.chunk_store
.oldest_writer().unwrap_or(now
);
407 let mut gc_status
= GarbageCollectionStatus
::default();
408 gc_status
.upid
= Some(worker
.to_string());
410 worker
.log("Start GC phase1 (mark used chunks)");
412 self.mark_used_chunks(&mut gc_status
)?
;
414 worker
.log("Start GC phase2 (sweep unused chunks)");
415 self.chunk_store
.sweep_unused_chunks(oldest_writer
, &mut gc_status
, worker
.clone())?
;
417 worker
.log(&format
!("Removed bytes: {}", gc_status
.removed_bytes
));
418 worker
.log(&format
!("Removed chunks: {}", gc_status
.removed_chunks
));
419 if gc_status
.pending_bytes
> 0 {
420 worker
.log(&format
!("Pending removals: {} bytes ({} chunks)", gc_status
.pending_bytes
, gc_status
.pending_chunks
));
423 worker
.log(&format
!("Original data bytes: {}", gc_status
.index_data_bytes
));
425 if gc_status
.index_data_bytes
> 0 {
426 let comp_per
= (gc_status
.disk_bytes
*100)/gc_status
.index_data_bytes
;
427 worker
.log(&format
!("Disk bytes: {} ({} %)", gc_status
.disk_bytes
, comp_per
));
430 worker
.log(&format
!("Disk chunks: {}", gc_status
.disk_chunks
));
432 if gc_status
.disk_chunks
> 0 {
433 let avg_chunk
= gc_status
.disk_bytes
/(gc_status
.disk_chunks
as u64);
434 worker
.log(&format
!("Average chunk size: {}", avg_chunk
));
437 *self.last_gc_status
.lock().unwrap() = gc_status
;
440 bail
!("Start GC failed - (already running/locked)");
446 pub fn try_shared_chunk_store_lock(&self) -> Result
<tools
::ProcessLockSharedGuard
, Error
> {
447 self.chunk_store
.try_shared_lock()
450 pub fn chunk_path(&self, digest
:&[u8; 32]) -> (PathBuf
, String
) {
451 self.chunk_store
.chunk_path(digest
)
454 pub fn cond_touch_chunk(&self, digest
: &[u8; 32], fail_if_not_exist
: bool
) -> Result
<bool
, Error
> {
455 self.chunk_store
.cond_touch_chunk(digest
, fail_if_not_exist
)
// NOTE(review): the signature start (fn name and parameters) is missing from
// this extraction. Delegates to ChunkStore::insert_chunk with `chunk` and
// `digest`; the (bool, u64) result is presumably (already existed?, size) —
// confirm against the full file.
462 ) -> Result
<(bool
, u64), Error
> {
463 self.chunk_store
.insert_chunk(chunk
, digest
)