// proxmox-backup — src/backup/datastore.rs (listing recovered from git.proxmox.com gitweb)
4 use std
::path
::{PathBuf, Path}
;
5 use std
::collections
::HashMap
;
6 use lazy_static
::lazy_static
;
7 use std
::sync
::{Mutex, Arc}
;
10 use crate::config
::datastore
;
11 use super::chunk_store
::*;
12 use super::fixed_index
::*;
13 use super::dynamic_index
::*;
15 use super::backup_info
::*;
17 use crate::server
::WorkerTask
;
20 static ref DATASTORE_MAP
: Mutex
<HashMap
<String
, Arc
<DataStore
>>> = Mutex
::new(HashMap
::new());
23 /// Datastore Management
25 /// A Datastore can store severals backups, and provides the
26 /// management interface for backup.
27 pub struct DataStore
{
28 chunk_store
: Arc
<ChunkStore
>,
29 gc_mutex
: Mutex
<bool
>,
30 last_gc_status
: Mutex
<GarbageCollectionStatus
>,
35 pub fn lookup_datastore(name
: &str) -> Result
<Arc
<DataStore
>, Error
> {
37 let config
= datastore
::config()?
;
38 let (_
, store_config
) = config
.sections
.get(name
)
39 .ok_or(format_err
!("no such datastore '{}'", name
))?
;
41 let path
= store_config
["path"].as_str().unwrap();
43 let mut map
= DATASTORE_MAP
.lock().unwrap();
45 if let Some(datastore
) = map
.get(name
) {
46 // Compare Config - if changed, create new Datastore object!
47 if datastore
.chunk_store
.base
== PathBuf
::from(path
) {
48 return Ok(datastore
.clone());
52 let datastore
= DataStore
::open(name
)?
;
54 let datastore
= Arc
::new(datastore
);
55 map
.insert(name
.to_string(), datastore
.clone());
60 pub fn open(store_name
: &str) -> Result
<Self, Error
> {
62 let config
= datastore
::config()?
;
63 let (_
, store_config
) = config
.sections
.get(store_name
)
64 .ok_or(format_err
!("no such datastore '{}'", store_name
))?
;
66 let path
= store_config
["path"].as_str().unwrap();
68 let chunk_store
= ChunkStore
::open(store_name
, path
)?
;
70 let gc_status
= GarbageCollectionStatus
::default();
73 chunk_store
: Arc
::new(chunk_store
),
74 gc_mutex
: Mutex
::new(false),
75 last_gc_status
: Mutex
::new(gc_status
),
79 pub fn get_chunk_iterator(
81 print_percentage
: bool
,
83 impl Iterator
<Item
= Result
<tools
::fs
::ReadDirEntry
, Error
>>,
86 self.chunk_store
.get_chunk_iterator(print_percentage
)
89 pub fn create_fixed_writer
<P
: AsRef
<Path
>>(&self, filename
: P
, size
: usize, chunk_size
: usize) -> Result
<FixedIndexWriter
, Error
> {
91 let index
= FixedIndexWriter
::create(self.chunk_store
.clone(), filename
.as_ref(), size
, chunk_size
)?
;
96 pub fn open_fixed_reader
<P
: AsRef
<Path
>>(&self, filename
: P
) -> Result
<FixedIndexReader
, Error
> {
98 let index
= FixedIndexReader
::open(self.chunk_store
.clone(), filename
.as_ref())?
;
103 pub fn create_dynamic_writer
<P
: AsRef
<Path
>>(
105 ) -> Result
<DynamicIndexWriter
, Error
> {
107 let index
= DynamicIndexWriter
::create(
108 self.chunk_store
.clone(), filename
.as_ref())?
;
113 pub fn open_dynamic_reader
<P
: AsRef
<Path
>>(&self, filename
: P
) -> Result
<DynamicIndexReader
, Error
> {
115 let full_path
= self.chunk_store
.relative_path(filename
.as_ref());
117 let index
= DynamicIndexReader
::open(&full_path
)?
;
122 pub fn open_index
<P
>(&self, filename
: P
) -> Result
<Box
<dyn IndexFile
+ Send
>, Error
>
126 let filename
= filename
.as_ref();
127 let out
: Box
<dyn IndexFile
+ Send
> =
128 match filename
.extension().and_then(|ext
| ext
.to_str()) {
129 Some("didx") => Box
::new(self.open_dynamic_reader(filename
)?
),
130 Some("fidx") => Box
::new(self.open_fixed_reader(filename
)?
),
131 _
=> bail
!("cannot open index file of unknown type: {:?}", filename
),
136 pub fn base_path(&self) -> PathBuf
{
137 self.chunk_store
.base_path()
140 /// Remove a backup directory including all content
141 pub fn remove_backup_dir(&self, backup_dir
: &BackupDir
,
142 ) -> Result
<(), io
::Error
> {
144 let relative_path
= backup_dir
.relative_path();
145 let mut full_path
= self.base_path();
146 full_path
.push(&relative_path
);
148 log
::info
!("removing backup {:?}", full_path
);
149 std
::fs
::remove_dir_all(full_path
)?
;
154 pub fn create_backup_dir(&self, backup_dir
: &BackupDir
) -> Result
<(PathBuf
, bool
), io
::Error
> {
156 // create intermediate path first:
157 let mut full_path
= self.base_path();
158 full_path
.push(backup_dir
.group().group_path());
159 std
::fs
::create_dir_all(&full_path
)?
;
161 let relative_path
= backup_dir
.relative_path();
162 let mut full_path
= self.base_path();
163 full_path
.push(&relative_path
);
165 // create the last component now
166 match std
::fs
::create_dir(&full_path
) {
167 Ok(_
) => Ok((relative_path
, true)),
168 Err(ref e
) if e
.kind() == io
::ErrorKind
::AlreadyExists
=> Ok((relative_path
, false)),
173 pub fn list_images(&self) -> Result
<Vec
<PathBuf
>, Error
> {
174 let base
= self.base_path();
176 let mut list
= vec
![];
178 use walkdir
::WalkDir
;
180 let walker
= WalkDir
::new(&base
).same_file_system(true).into_iter();
182 // make sure we skip .chunks (and other hidden files to keep it simple)
183 fn is_hidden(entry
: &walkdir
::DirEntry
) -> bool
{
186 .map(|s
| s
.starts_with("."))
190 for entry
in walker
.filter_entry(|e
| !is_hidden(e
)) {
191 let path
= entry?
.into_path();
192 if let Some(ext
) = path
.extension() {
195 } else if ext
== "didx" {
204 fn mark_used_chunks(&self, status
: &mut GarbageCollectionStatus
) -> Result
<(), Error
> {
206 let image_list
= self.list_images()?
;
208 for path
in image_list
{
210 tools
::fail_on_shutdown()?
;
212 if let Some(ext
) = path
.extension() {
214 let index
= self.open_fixed_reader(&path
)?
;
215 index
.mark_used_chunks(status
)?
;
216 } else if ext
== "didx" {
217 let index
= self.open_dynamic_reader(&path
)?
;
218 index
.mark_used_chunks(status
)?
;
226 pub fn last_gc_status(&self) -> GarbageCollectionStatus
{
227 self.last_gc_status
.lock().unwrap().clone()
230 pub fn garbage_collection(&self, worker
: Arc
<WorkerTask
>) -> Result
<(), Error
> {
232 if let Ok(ref mut _mutex
) = self.gc_mutex
.try_lock() {
234 let _exclusive_lock
= self.chunk_store
.try_exclusive_lock()?
;
236 let oldest_writer
= self.chunk_store
.oldest_writer();
238 let mut gc_status
= GarbageCollectionStatus
::default();
239 gc_status
.upid
= Some(worker
.to_string());
241 worker
.log("Start GC phase1 (mark chunks)");
243 self.mark_used_chunks(&mut gc_status
)?
;
245 worker
.log("Start GC phase2 (sweep unused chunks)");
246 self.chunk_store
.sweep_unused_chunks(oldest_writer
, &mut gc_status
)?
;
248 worker
.log(&format
!("Used bytes: {}", gc_status
.used_bytes
));
249 worker
.log(&format
!("Used chunks: {}", gc_status
.used_chunks
));
250 worker
.log(&format
!("Disk bytes: {}", gc_status
.disk_bytes
));
251 worker
.log(&format
!("Disk chunks: {}", gc_status
.disk_chunks
));
253 *self.last_gc_status
.lock().unwrap() = gc_status
;
256 bail
!("Start GC failed - (already running/locked)");
262 pub fn chunk_path(&self, digest
:&[u8; 32]) -> (PathBuf
, String
) {
263 self.chunk_store
.chunk_path(digest
)
269 ) -> Result
<(bool
, u64), Error
> {
270 self.chunk_store
.insert_chunk(chunk
)