1 use std
::collections
::HashMap
;
3 use std
::path
::{Path, PathBuf}
;
4 use std
::sync
::{Arc, Mutex}
;
7 use lazy_static
::lazy_static
;
9 use super::backup_info
::BackupDir
;
10 use super::chunk_store
::{ChunkStore, GarbageCollectionStatus}
;
11 use super::dynamic_index
::{DynamicIndexReader, DynamicIndexWriter}
;
12 use super::fixed_index
::{FixedIndexReader, FixedIndexWriter}
;
15 use crate::config
::datastore
;
16 use crate::server
::WorkerTask
;
20 static ref DATASTORE_MAP
: Mutex
<HashMap
<String
, Arc
<DataStore
>>> = Mutex
::new(HashMap
::new());
23 /// Datastore Management
25 /// A Datastore can store severals backups, and provides the
26 /// management interface for backup.
27 pub struct DataStore
{
28 chunk_store
: Arc
<ChunkStore
>,
29 gc_mutex
: Mutex
<bool
>,
30 last_gc_status
: Mutex
<GarbageCollectionStatus
>,
35 pub fn lookup_datastore(name
: &str) -> Result
<Arc
<DataStore
>, Error
> {
37 let config
= datastore
::config()?
;
38 let (_
, store_config
) = config
.sections
.get(name
)
39 .ok_or(format_err
!("no such datastore '{}'", name
))?
;
41 let path
= store_config
["path"].as_str().unwrap();
43 let mut map
= DATASTORE_MAP
.lock().unwrap();
45 if let Some(datastore
) = map
.get(name
) {
46 // Compare Config - if changed, create new Datastore object!
47 if datastore
.chunk_store
.base
== PathBuf
::from(path
) {
48 return Ok(datastore
.clone());
52 let datastore
= DataStore
::open(name
)?
;
54 let datastore
= Arc
::new(datastore
);
55 map
.insert(name
.to_string(), datastore
.clone());
60 pub fn open(store_name
: &str) -> Result
<Self, Error
> {
62 let config
= datastore
::config()?
;
63 let (_
, store_config
) = config
.sections
.get(store_name
)
64 .ok_or(format_err
!("no such datastore '{}'", store_name
))?
;
66 let path
= store_config
["path"].as_str().unwrap();
68 let chunk_store
= ChunkStore
::open(store_name
, path
)?
;
70 let gc_status
= GarbageCollectionStatus
::default();
73 chunk_store
: Arc
::new(chunk_store
),
74 gc_mutex
: Mutex
::new(false),
75 last_gc_status
: Mutex
::new(gc_status
),
79 pub fn get_chunk_iterator(
82 impl Iterator
<Item
= (Result
<tools
::fs
::ReadDirEntry
, Error
>, usize)>,
85 self.chunk_store
.get_chunk_iterator()
88 pub fn create_fixed_writer
<P
: AsRef
<Path
>>(&self, filename
: P
, size
: usize, chunk_size
: usize) -> Result
<FixedIndexWriter
, Error
> {
90 let index
= FixedIndexWriter
::create(self.chunk_store
.clone(), filename
.as_ref(), size
, chunk_size
)?
;
95 pub fn open_fixed_reader
<P
: AsRef
<Path
>>(&self, filename
: P
) -> Result
<FixedIndexReader
, Error
> {
97 let full_path
= self.chunk_store
.relative_path(filename
.as_ref());
99 let index
= FixedIndexReader
::open(&full_path
)?
;
104 pub fn create_dynamic_writer
<P
: AsRef
<Path
>>(
106 ) -> Result
<DynamicIndexWriter
, Error
> {
108 let index
= DynamicIndexWriter
::create(
109 self.chunk_store
.clone(), filename
.as_ref())?
;
114 pub fn open_dynamic_reader
<P
: AsRef
<Path
>>(&self, filename
: P
) -> Result
<DynamicIndexReader
, Error
> {
116 let full_path
= self.chunk_store
.relative_path(filename
.as_ref());
118 let index
= DynamicIndexReader
::open(&full_path
)?
;
123 pub fn open_index
<P
>(&self, filename
: P
) -> Result
<Box
<dyn IndexFile
+ Send
>, Error
>
127 let filename
= filename
.as_ref();
128 let out
: Box
<dyn IndexFile
+ Send
> =
129 match filename
.extension().and_then(|ext
| ext
.to_str()) {
130 Some("didx") => Box
::new(self.open_dynamic_reader(filename
)?
),
131 Some("fidx") => Box
::new(self.open_fixed_reader(filename
)?
),
132 _
=> bail
!("cannot open index file of unknown type: {:?}", filename
),
137 pub fn base_path(&self) -> PathBuf
{
138 self.chunk_store
.base_path()
141 /// Remove a backup directory including all content
142 pub fn remove_backup_dir(&self, backup_dir
: &BackupDir
,
143 ) -> Result
<(), io
::Error
> {
145 let relative_path
= backup_dir
.relative_path();
146 let mut full_path
= self.base_path();
147 full_path
.push(&relative_path
);
149 log
::info
!("removing backup {:?}", full_path
);
150 std
::fs
::remove_dir_all(full_path
)?
;
155 pub fn create_backup_dir(&self, backup_dir
: &BackupDir
) -> Result
<(PathBuf
, bool
), io
::Error
> {
157 // create intermediate path first:
158 let mut full_path
= self.base_path();
159 full_path
.push(backup_dir
.group().group_path());
160 std
::fs
::create_dir_all(&full_path
)?
;
162 let relative_path
= backup_dir
.relative_path();
163 let mut full_path
= self.base_path();
164 full_path
.push(&relative_path
);
166 // create the last component now
167 match std
::fs
::create_dir(&full_path
) {
168 Ok(_
) => Ok((relative_path
, true)),
169 Err(ref e
) if e
.kind() == io
::ErrorKind
::AlreadyExists
=> Ok((relative_path
, false)),
174 pub fn list_images(&self) -> Result
<Vec
<PathBuf
>, Error
> {
175 let base
= self.base_path();
177 let mut list
= vec
![];
179 use walkdir
::WalkDir
;
181 let walker
= WalkDir
::new(&base
).same_file_system(true).into_iter();
183 // make sure we skip .chunks (and other hidden files to keep it simple)
184 fn is_hidden(entry
: &walkdir
::DirEntry
) -> bool
{
187 .map(|s
| s
.starts_with("."))
191 for entry
in walker
.filter_entry(|e
| !is_hidden(e
)) {
192 let path
= entry?
.into_path();
193 if let Some(ext
) = path
.extension() {
194 if ext
== "fidx" || ext
== "didx"{
203 // mark chunks used by ``index`` as used
204 fn index_mark_used_chunks
<I
: IndexFile
>(
207 file_name
: &Path
, // only used for error reporting
208 status
: &mut GarbageCollectionStatus
,
209 ) -> Result
<(), Error
> {
211 status
.index_file_count
+= 1;
212 status
.index_data_bytes
+= index
.index_bytes();
214 for pos
in 0..index
.index_count() {
215 tools
::fail_on_shutdown()?
;
216 let digest
= index
.index_digest(pos
).unwrap();
217 if let Err(err
) = self.chunk_store
.touch_chunk(digest
) {
218 bail
!("unable to access chunk {}, required by {:?} - {}",
219 proxmox
::tools
::digest_to_hex(digest
), file_name
, err
);
225 fn mark_used_chunks(&self, status
: &mut GarbageCollectionStatus
) -> Result
<(), Error
> {
227 let image_list
= self.list_images()?
;
229 for path
in image_list
{
231 tools
::fail_on_shutdown()?
;
233 if let Some(ext
) = path
.extension() {
235 let index
= self.open_fixed_reader(&path
)?
;
236 self.index_mark_used_chunks(index
, &path
, status
)?
;
237 } else if ext
== "didx" {
238 let index
= self.open_dynamic_reader(&path
)?
;
239 self.index_mark_used_chunks(index
, &path
, status
)?
;
247 pub fn last_gc_status(&self) -> GarbageCollectionStatus
{
248 self.last_gc_status
.lock().unwrap().clone()
251 pub fn garbage_collection(&self, worker
: Arc
<WorkerTask
>) -> Result
<(), Error
> {
253 if let Ok(ref mut _mutex
) = self.gc_mutex
.try_lock() {
255 let _exclusive_lock
= self.chunk_store
.try_exclusive_lock()?
;
257 let oldest_writer
= self.chunk_store
.oldest_writer();
259 let mut gc_status
= GarbageCollectionStatus
::default();
260 gc_status
.upid
= Some(worker
.to_string());
262 worker
.log("Start GC phase1 (mark used chunks)");
264 self.mark_used_chunks(&mut gc_status
)?
;
266 worker
.log("Start GC phase2 (sweep unused chunks)");
267 self.chunk_store
.sweep_unused_chunks(oldest_writer
, &mut gc_status
, worker
.clone())?
;
269 worker
.log(&format
!("Removed bytes: {}", gc_status
.removed_bytes
));
270 worker
.log(&format
!("Removed chunks: {}", gc_status
.removed_chunks
));
271 worker
.log(&format
!("Original data bytes: {}", gc_status
.index_data_bytes
));
272 let comp_per
= (gc_status
.disk_bytes
*100)/gc_status
.index_data_bytes
;
273 worker
.log(&format
!("Disk bytes: {} ({} %)", gc_status
.disk_bytes
, comp_per
));
274 worker
.log(&format
!("Disk chunks: {}", gc_status
.disk_chunks
));
275 let avg_chunk
= gc_status
.disk_bytes
/(gc_status
.disk_chunks
as u64);
276 worker
.log(&format
!("Average chunk size: {}", avg_chunk
));
278 *self.last_gc_status
.lock().unwrap() = gc_status
;
281 bail
!("Start GC failed - (already running/locked)");
287 pub fn chunk_path(&self, digest
:&[u8; 32]) -> (PathBuf
, String
) {
288 self.chunk_store
.chunk_path(digest
)
295 ) -> Result
<(bool
, u64), Error
> {
296 self.chunk_store
.insert_chunk(chunk
, digest
)