// proxmox-backup — src/backup/datastore.rs
use std::collections::HashMap;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

use failure::*;
use lazy_static::lazy_static;

use super::backup_info::*;
use super::chunk_store::*;
use super::dynamic_index::*;
use super::fixed_index::*;

use crate::config::datastore;
use crate::server::WorkerTask;
use crate::tools;
20 static ref DATASTORE_MAP
: Mutex
<HashMap
<String
, Arc
<DataStore
>>> = Mutex
::new(HashMap
::new());
23 /// Datastore Management
25 /// A Datastore can store severals backups, and provides the
26 /// management interface for backup.
27 pub struct DataStore
{
28 chunk_store
: Arc
<ChunkStore
>,
29 gc_mutex
: Mutex
<bool
>,
30 last_gc_status
: Mutex
<GarbageCollectionStatus
>,
35 pub fn lookup_datastore(name
: &str) -> Result
<Arc
<DataStore
>, Error
> {
37 let config
= datastore
::config()?
;
38 let (_
, store_config
) = config
.sections
.get(name
)
39 .ok_or(format_err
!("no such datastore '{}'", name
))?
;
41 let path
= store_config
["path"].as_str().unwrap();
43 let mut map
= DATASTORE_MAP
.lock().unwrap();
45 if let Some(datastore
) = map
.get(name
) {
46 // Compare Config - if changed, create new Datastore object!
47 if datastore
.chunk_store
.base
== PathBuf
::from(path
) {
48 return Ok(datastore
.clone());
52 let datastore
= DataStore
::open(name
)?
;
54 let datastore
= Arc
::new(datastore
);
55 map
.insert(name
.to_string(), datastore
.clone());
60 pub fn open(store_name
: &str) -> Result
<Self, Error
> {
62 let config
= datastore
::config()?
;
63 let (_
, store_config
) = config
.sections
.get(store_name
)
64 .ok_or(format_err
!("no such datastore '{}'", store_name
))?
;
66 let path
= store_config
["path"].as_str().unwrap();
68 let chunk_store
= ChunkStore
::open(store_name
, path
)?
;
70 let gc_status
= GarbageCollectionStatus
::default();
73 chunk_store
: Arc
::new(chunk_store
),
74 gc_mutex
: Mutex
::new(false),
75 last_gc_status
: Mutex
::new(gc_status
),
79 pub fn get_chunk_iterator(
81 print_percentage
: bool
,
83 impl Iterator
<Item
= Result
<tools
::fs
::ReadDirEntry
, Error
>>,
86 self.chunk_store
.get_chunk_iterator(print_percentage
)
89 pub fn create_fixed_writer
<P
: AsRef
<Path
>>(&self, filename
: P
, size
: usize, chunk_size
: usize) -> Result
<FixedIndexWriter
, Error
> {
91 let index
= FixedIndexWriter
::create(self.chunk_store
.clone(), filename
.as_ref(), size
, chunk_size
)?
;
96 pub fn open_fixed_reader
<P
: AsRef
<Path
>>(&self, filename
: P
) -> Result
<FixedIndexReader
, Error
> {
98 let index
= FixedIndexReader
::open(self.chunk_store
.clone(), filename
.as_ref())?
;
103 pub fn create_dynamic_writer
<P
: AsRef
<Path
>>(
105 ) -> Result
<DynamicIndexWriter
, Error
> {
107 let index
= DynamicIndexWriter
::create(
108 self.chunk_store
.clone(), filename
.as_ref())?
;
113 pub fn open_dynamic_reader
<P
: AsRef
<Path
>>(&self, filename
: P
) -> Result
<DynamicIndexReader
, Error
> {
115 let index
= DynamicIndexReader
::open(self.chunk_store
.clone(), filename
.as_ref())?
;
120 pub fn open_index
<P
>(&self, filename
: P
) -> Result
<Box
<dyn IndexFile
+ Send
>, Error
>
124 let filename
= filename
.as_ref();
125 let out
: Box
<dyn IndexFile
+ Send
> =
126 match filename
.extension().and_then(|ext
| ext
.to_str()) {
127 Some("didx") => Box
::new(self.open_dynamic_reader(filename
)?
),
128 Some("fidx") => Box
::new(self.open_fixed_reader(filename
)?
),
129 _
=> bail
!("cannot open index file of unknown type: {:?}", filename
),
134 pub fn base_path(&self) -> PathBuf
{
135 self.chunk_store
.base_path()
138 /// Remove a backup directory including all content
139 pub fn remove_backup_dir(&self, backup_dir
: &BackupDir
,
140 ) -> Result
<(), io
::Error
> {
142 let relative_path
= backup_dir
.relative_path();
143 let mut full_path
= self.base_path();
144 full_path
.push(&relative_path
);
146 log
::info
!("removing backup {:?}", full_path
);
147 std
::fs
::remove_dir_all(full_path
)?
;
152 pub fn create_backup_dir(&self, backup_dir
: &BackupDir
) -> Result
<(PathBuf
, bool
), io
::Error
> {
154 // create intermediate path first:
155 let mut full_path
= self.base_path();
156 full_path
.push(backup_dir
.group().group_path());
157 std
::fs
::create_dir_all(&full_path
)?
;
159 let relative_path
= backup_dir
.relative_path();
160 let mut full_path
= self.base_path();
161 full_path
.push(&relative_path
);
163 // create the last component now
164 match std
::fs
::create_dir(&full_path
) {
165 Ok(_
) => Ok((relative_path
, true)),
166 Err(ref e
) if e
.kind() == io
::ErrorKind
::AlreadyExists
=> Ok((relative_path
, false)),
171 pub fn list_images(&self) -> Result
<Vec
<PathBuf
>, Error
> {
172 let base
= self.base_path();
174 let mut list
= vec
![];
176 use walkdir
::WalkDir
;
178 let walker
= WalkDir
::new(&base
).same_file_system(true).into_iter();
180 // make sure we skip .chunks (and other hidden files to keep it simple)
181 fn is_hidden(entry
: &walkdir
::DirEntry
) -> bool
{
184 .map(|s
| s
.starts_with("."))
188 for entry
in walker
.filter_entry(|e
| !is_hidden(e
)) {
189 let path
= entry?
.into_path();
190 if let Some(ext
) = path
.extension() {
193 } else if ext
== "didx" {
202 fn mark_used_chunks(&self, status
: &mut GarbageCollectionStatus
) -> Result
<(), Error
> {
204 let image_list
= self.list_images()?
;
206 for path
in image_list
{
208 tools
::fail_on_shutdown()?
;
210 if let Some(ext
) = path
.extension() {
212 let index
= self.open_fixed_reader(&path
)?
;
213 index
.mark_used_chunks(status
)?
;
214 } else if ext
== "didx" {
215 let index
= self.open_dynamic_reader(&path
)?
;
216 index
.mark_used_chunks(status
)?
;
224 pub fn last_gc_status(&self) -> GarbageCollectionStatus
{
225 self.last_gc_status
.lock().unwrap().clone()
228 pub fn garbage_collection(&self, worker
: Arc
<WorkerTask
>) -> Result
<(), Error
> {
230 if let Ok(ref mut _mutex
) = self.gc_mutex
.try_lock() {
232 let _exclusive_lock
= self.chunk_store
.try_exclusive_lock()?
;
234 let oldest_writer
= self.chunk_store
.oldest_writer();
236 let mut gc_status
= GarbageCollectionStatus
::default();
237 gc_status
.upid
= Some(worker
.to_string());
239 worker
.log("Start GC phase1 (mark chunks)");
241 self.mark_used_chunks(&mut gc_status
)?
;
243 worker
.log("Start GC phase2 (sweep unused chunks)");
244 self.chunk_store
.sweep_unused_chunks(oldest_writer
, &mut gc_status
)?
;
246 worker
.log(&format
!("Used bytes: {}", gc_status
.used_bytes
));
247 worker
.log(&format
!("Used chunks: {}", gc_status
.used_chunks
));
248 worker
.log(&format
!("Disk bytes: {}", gc_status
.disk_bytes
));
249 worker
.log(&format
!("Disk chunks: {}", gc_status
.disk_chunks
));
251 *self.last_gc_status
.lock().unwrap() = gc_status
;
254 bail
!("Start GC failed - (already running/locked)");
263 ) -> Result
<(bool
, u64), Error
> {
264 self.chunk_store
.insert_chunk(chunk
)