//! Datastore management (src/backup/datastore.rs)
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

use lazy_static::lazy_static;

use crate::config::datastore;
use crate::server::WorkerTask;

use super::backup_info::*;
use super::chunk_store::*;
use super::dynamic_index::*;
use super::fixed_index::*;
19 static ref DATASTORE_MAP
: Mutex
<HashMap
<String
, Arc
<DataStore
>>> = Mutex
::new(HashMap
::new());
22 /// Datastore Management
24 /// A Datastore can store severals backups, and provides the
25 /// management interface for backup.
26 pub struct DataStore
{
27 chunk_store
: Arc
<ChunkStore
>,
28 gc_mutex
: Mutex
<bool
>,
29 last_gc_status
: Mutex
<GarbageCollectionStatus
>,
34 pub fn lookup_datastore(name
: &str) -> Result
<Arc
<DataStore
>, Error
> {
36 let config
= datastore
::config()?
;
37 let (_
, store_config
) = config
.sections
.get(name
)
38 .ok_or(format_err
!("no such datastore '{}'", name
))?
;
40 let path
= store_config
["path"].as_str().unwrap();
42 let mut map
= DATASTORE_MAP
.lock().unwrap();
44 if let Some(datastore
) = map
.get(name
) {
45 // Compare Config - if changed, create new Datastore object!
46 if datastore
.chunk_store
.base
== PathBuf
::from(path
) {
47 return Ok(datastore
.clone());
51 let datastore
= DataStore
::open(name
)?
;
53 let datastore
= Arc
::new(datastore
);
54 map
.insert(name
.to_string(), datastore
.clone());
59 pub fn open(store_name
: &str) -> Result
<Self, Error
> {
61 let config
= datastore
::config()?
;
62 let (_
, store_config
) = config
.sections
.get(store_name
)
63 .ok_or(format_err
!("no such datastore '{}'", store_name
))?
;
65 let path
= store_config
["path"].as_str().unwrap();
67 let chunk_store
= ChunkStore
::open(store_name
, path
)?
;
69 let gc_status
= GarbageCollectionStatus
::default();
72 chunk_store
: Arc
::new(chunk_store
),
73 gc_mutex
: Mutex
::new(false),
74 last_gc_status
: Mutex
::new(gc_status
),
78 pub fn get_chunk_iterator(
80 print_percentage
: bool
,
82 impl Iterator
<Item
= Result
<tools
::fs
::ReadDirEntry
, Error
>>,
85 self.chunk_store
.get_chunk_iterator(print_percentage
)
88 pub fn create_fixed_writer
<P
: AsRef
<Path
>>(&self, filename
: P
, size
: usize, chunk_size
: usize) -> Result
<FixedIndexWriter
, Error
> {
90 let index
= FixedIndexWriter
::create(self.chunk_store
.clone(), filename
.as_ref(), size
, chunk_size
)?
;
95 pub fn open_fixed_reader
<P
: AsRef
<Path
>>(&self, filename
: P
) -> Result
<FixedIndexReader
, Error
> {
97 let index
= FixedIndexReader
::open(self.chunk_store
.clone(), filename
.as_ref())?
;
102 pub fn create_dynamic_writer
<P
: AsRef
<Path
>>(
104 ) -> Result
<DynamicIndexWriter
, Error
> {
106 let index
= DynamicIndexWriter
::create(
107 self.chunk_store
.clone(), filename
.as_ref())?
;
112 pub fn open_dynamic_reader
<P
: AsRef
<Path
>>(&self, filename
: P
) -> Result
<DynamicIndexReader
, Error
> {
114 let index
= DynamicIndexReader
::open(self.chunk_store
.clone(), filename
.as_ref())?
;
119 pub fn open_index
<P
>(&self, filename
: P
) -> Result
<Box
<dyn IndexFile
+ Send
>, Error
>
123 let filename
= filename
.as_ref();
124 let out
: Box
<dyn IndexFile
+ Send
> =
125 match filename
.extension().and_then(|ext
| ext
.to_str()) {
126 Some("didx") => Box
::new(self.open_dynamic_reader(filename
)?
),
127 Some("fidx") => Box
::new(self.open_fixed_reader(filename
)?
),
128 _
=> bail
!("cannot open index file of unknown type: {:?}", filename
),
133 pub fn base_path(&self) -> PathBuf
{
134 self.chunk_store
.base_path()
137 /// Remove a backup directory including all content
138 pub fn remove_backup_dir(&self, backup_dir
: &BackupDir
,
139 ) -> Result
<(), io
::Error
> {
141 let relative_path
= backup_dir
.relative_path();
142 let mut full_path
= self.base_path();
143 full_path
.push(&relative_path
);
145 log
::info
!("removing backup {:?}", full_path
);
146 std
::fs
::remove_dir_all(full_path
)?
;
151 pub fn create_backup_dir(&self, backup_dir
: &BackupDir
) -> Result
<(PathBuf
, bool
), io
::Error
> {
153 // create intermediate path first:
154 let mut full_path
= self.base_path();
155 full_path
.push(backup_dir
.group().group_path());
156 std
::fs
::create_dir_all(&full_path
)?
;
158 let relative_path
= backup_dir
.relative_path();
159 let mut full_path
= self.base_path();
160 full_path
.push(&relative_path
);
162 // create the last component now
163 match std
::fs
::create_dir(&full_path
) {
164 Ok(_
) => Ok((relative_path
, true)),
165 Err(ref e
) if e
.kind() == io
::ErrorKind
::AlreadyExists
=> Ok((relative_path
, false)),
170 pub fn list_images(&self) -> Result
<Vec
<PathBuf
>, Error
> {
171 let base
= self.base_path();
173 let mut list
= vec
![];
175 use walkdir
::WalkDir
;
177 let walker
= WalkDir
::new(&base
).same_file_system(true).into_iter();
179 // make sure we skip .chunks (and other hidden files to keep it simple)
180 fn is_hidden(entry
: &walkdir
::DirEntry
) -> bool
{
183 .map(|s
| s
.starts_with("."))
187 for entry
in walker
.filter_entry(|e
| !is_hidden(e
)) {
188 let path
= entry?
.into_path();
189 if let Some(ext
) = path
.extension() {
192 } else if ext
== "didx" {
201 fn mark_used_chunks(&self, status
: &mut GarbageCollectionStatus
) -> Result
<(), Error
> {
203 let image_list
= self.list_images()?
;
205 for path
in image_list
{
207 tools
::fail_on_shutdown()?
;
209 if let Some(ext
) = path
.extension() {
211 let index
= self.open_fixed_reader(&path
)?
;
212 index
.mark_used_chunks(status
)?
;
213 } else if ext
== "didx" {
214 let index
= self.open_dynamic_reader(&path
)?
;
215 index
.mark_used_chunks(status
)?
;
223 pub fn last_gc_status(&self) -> GarbageCollectionStatus
{
224 self.last_gc_status
.lock().unwrap().clone()
227 pub fn garbage_collection(&self, worker
: Arc
<WorkerTask
>) -> Result
<(), Error
> {
229 if let Ok(ref mut _mutex
) = self.gc_mutex
.try_lock() {
231 let _exclusive_lock
= self.chunk_store
.try_exclusive_lock()?
;
233 let oldest_writer
= self.chunk_store
.oldest_writer();
235 let mut gc_status
= GarbageCollectionStatus
::default();
236 gc_status
.upid
= Some(worker
.to_string());
238 worker
.log("Start GC phase1 (mark chunks)");
240 self.mark_used_chunks(&mut gc_status
)?
;
242 worker
.log("Start GC phase2 (sweep unused chunks)");
243 self.chunk_store
.sweep_unused_chunks(oldest_writer
, &mut gc_status
)?
;
245 worker
.log(&format
!("Used bytes: {}", gc_status
.used_bytes
));
246 worker
.log(&format
!("Used chunks: {}", gc_status
.used_chunks
));
247 worker
.log(&format
!("Disk bytes: {}", gc_status
.disk_bytes
));
248 worker
.log(&format
!("Disk chunks: {}", gc_status
.disk_chunks
));
250 *self.last_gc_status
.lock().unwrap() = gc_status
;
253 bail
!("Start GC failed - (already running/locked)");
259 pub fn insert_chunk(&self, chunk
: &[u8]) -> Result
<(bool
, [u8; 32], u64), Error
> {
260 self.chunk_store
.insert_chunk(chunk
)
263 pub fn insert_chunk_noverify(
267 ) -> Result
<(bool
, u64), Error
> {
268 self.chunk_store
.insert_chunk_noverify(digest
, chunk
)