src/backup/datastore.rs - garbage-collection: fix average chunk size
use std::collections::HashMap;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

use failure::*;
use lazy_static::lazy_static;

use super::backup_info::BackupDir;
use super::chunk_store::{ChunkStore, GarbageCollectionStatus};
use super::dynamic_index::{DynamicIndexReader, DynamicIndexWriter};
use super::fixed_index::{FixedIndexReader, FixedIndexWriter};
use super::index::*;
use super::DataBlob;
use crate::config::datastore;
use crate::server::WorkerTask;
use crate::tools;

lazy_static! {
    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
}

/// Datastore Management
///
/// A Datastore can store several backups and provides the
/// management interface for backups.
pub struct DataStore {
    chunk_store: Arc<ChunkStore>,
    gc_mutex: Mutex<bool>,
    last_gc_status: Mutex<GarbageCollectionStatus>,
}

impl DataStore {

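    /// Look up a datastore by its configured name.
    ///
    /// Instances are cached in the global `DATASTORE_MAP`, so repeated
    /// lookups for the same name return the same `Arc<DataStore>`. If the
    /// configured path changed since the instance was cached, a fresh
    /// instance is created and the cache entry is replaced.
    ///
    /// A minimal usage sketch (the store name "store1" is a placeholder):
    ///
    /// ```ignore
    /// let store = DataStore::lookup_datastore("store1")?;
    /// println!("base path: {:?}", store.base_path());
    /// ```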
    pub fn lookup_datastore(name: &str) -> Result<Arc<DataStore>, Error> {

        let config = datastore::config()?;
        let (_, store_config) = config.sections.get(name)
            .ok_or_else(|| format_err!("no such datastore '{}'", name))?;

        let path = store_config["path"].as_str().unwrap();

        let mut map = DATASTORE_MAP.lock().unwrap();

        if let Some(datastore) = map.get(name) {
            // Compare config - if the path changed, create a new DataStore object!
            if datastore.chunk_store.base == PathBuf::from(path) {
                return Ok(datastore.clone());
            }
        }

        let datastore = DataStore::open(name)?;

        let datastore = Arc::new(datastore);
        map.insert(name.to_string(), datastore.clone());

        Ok(datastore)
    }

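    /// Open a datastore by name, bypassing the instance cache.
    ///
    /// Reads the datastore configuration and opens the underlying
    /// chunk store at the configured path.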
    pub fn open(store_name: &str) -> Result<Self, Error> {

        let config = datastore::config()?;
        let (_, store_config) = config.sections.get(store_name)
            .ok_or_else(|| format_err!("no such datastore '{}'", store_name))?;

        let path = store_config["path"].as_str().unwrap();

        let chunk_store = ChunkStore::open(store_name, path)?;

        let gc_status = GarbageCollectionStatus::default();

        Ok(Self {
            chunk_store: Arc::new(chunk_store),
            gc_mutex: Mutex::new(false),
            last_gc_status: Mutex::new(gc_status),
        })
    }

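    /// Iterate over all chunks in the underlying chunk store.
    ///
    /// Items pair each directory entry with a progress value
    /// (`usize`) supplied by the chunk store.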
    pub fn get_chunk_iterator(
        &self,
    ) -> Result<
        impl Iterator<Item = (Result<tools::fs::ReadDirEntry, Error>, usize)>,
        Error
    > {
        self.chunk_store.get_chunk_iterator()
    }

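    /// Create a writer for a fixed-size chunk index (`.fidx`),
    /// covering `size` bytes split into chunks of `chunk_size` bytes.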
    pub fn create_fixed_writer<P: AsRef<Path>>(
        &self,
        filename: P,
        size: usize,
        chunk_size: usize,
    ) -> Result<FixedIndexWriter, Error> {

        let index = FixedIndexWriter::create(
            self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;

        Ok(index)
    }

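    /// Open a reader for an existing fixed index file (`.fidx`).
    ///
    /// Note: `ChunkStore::relative_path` resolves `filename` to its full
    /// path below the store's base directory, despite its name.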
    pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {

        let full_path = self.chunk_store.relative_path(filename.as_ref());

        let index = FixedIndexReader::open(&full_path)?;

        Ok(index)
    }

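    /// Create a writer for a dynamic (variable chunk size) index (`.didx`).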
    pub fn create_dynamic_writer<P: AsRef<Path>>(
        &self, filename: P,
    ) -> Result<DynamicIndexWriter, Error> {

        let index = DynamicIndexWriter::create(
            self.chunk_store.clone(), filename.as_ref())?;

        Ok(index)
    }

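    /// Open a reader for an existing dynamic index file (`.didx`).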
    pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {

        let full_path = self.chunk_store.relative_path(filename.as_ref());

        let index = DynamicIndexReader::open(&full_path)?;

        Ok(index)
    }

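    /// Open an index file, dispatching on its extension: `.didx` files
    /// are opened as dynamic indexes, `.fidx` files as fixed indexes.
    /// Any other extension is rejected with an error.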
    pub fn open_index<P>(&self, filename: P) -> Result<Box<dyn IndexFile + Send>, Error>
    where
        P: AsRef<Path>,
    {
        let filename = filename.as_ref();
        let out: Box<dyn IndexFile + Send> =
            match filename.extension().and_then(|ext| ext.to_str()) {
                Some("didx") => Box::new(self.open_dynamic_reader(filename)?),
                Some("fidx") => Box::new(self.open_fixed_reader(filename)?),
                _ => bail!("cannot open index file of unknown type: {:?}", filename),
            };
        Ok(out)
    }

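    /// Return the base directory of this datastore.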
    pub fn base_path(&self) -> PathBuf {
        self.chunk_store.base_path()
    }

    /// Remove a backup directory including all content
    pub fn remove_backup_dir(&self, backup_dir: &BackupDir) -> Result<(), io::Error> {

        let relative_path = backup_dir.relative_path();
        let mut full_path = self.base_path();
        full_path.push(&relative_path);

        log::info!("removing backup {:?}", full_path);
        std::fs::remove_dir_all(full_path)?;

        Ok(())
    }

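    /// Create the directory for a backup snapshot.
    ///
    /// Returns the snapshot's path relative to the datastore base and a
    /// flag indicating whether the directory was newly created (`true`)
    /// or already existed (`false`).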
    pub fn create_backup_dir(&self, backup_dir: &BackupDir) -> Result<(PathBuf, bool), io::Error> {

        // create intermediate path first:
        let mut full_path = self.base_path();
        full_path.push(backup_dir.group().group_path());
        std::fs::create_dir_all(&full_path)?;

        let relative_path = backup_dir.relative_path();
        let mut full_path = self.base_path();
        full_path.push(&relative_path);

        // create the last component now
        match std::fs::create_dir(&full_path) {
            Ok(_) => Ok((relative_path, true)),
            Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => Ok((relative_path, false)),
            Err(e) => Err(e),
        }
    }

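    /// Walk the datastore and collect the paths of all index files
    /// (`.fidx` and `.didx`), skipping hidden entries such as `.chunks`.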
    pub fn list_images(&self) -> Result<Vec<PathBuf>, Error> {
        let base = self.base_path();

        let mut list = vec![];

        use walkdir::WalkDir;

        let walker = WalkDir::new(&base).same_file_system(true).into_iter();

        // make sure we skip .chunks (and other hidden files to keep it simple)
        fn is_hidden(entry: &walkdir::DirEntry) -> bool {
            entry.file_name()
                .to_str()
                .map(|s| s.starts_with("."))
                .unwrap_or(false)
        }

        for entry in walker.filter_entry(|e| !is_hidden(e)) {
            let path = entry?.into_path();
            if let Some(ext) = path.extension() {
                if ext == "fidx" || ext == "didx" {
                    list.push(path);
                }
            }
        }

        Ok(list)
    }

    // mark chunks referenced by `index` as used
    fn index_mark_used_chunks<I: IndexFile>(
        &self,
        index: I,
        file_name: &Path, // only used for error reporting
        status: &mut GarbageCollectionStatus,
    ) -> Result<(), Error> {

        status.index_file_count += 1;
        status.index_data_bytes += index.index_bytes();

        for pos in 0..index.index_count() {
            tools::fail_on_shutdown()?;
            let digest = index.index_digest(pos).unwrap();
            if let Err(err) = self.chunk_store.touch_chunk(digest) {
                bail!("unable to access chunk {}, required by {:?} - {}",
                      proxmox::tools::digest_to_hex(digest), file_name, err);
            }
        }
        Ok(())
    }

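    // GC phase 1: walk all index files and mark every chunk they
    // reference as used by touching it in the chunk store.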
    fn mark_used_chunks(&self, status: &mut GarbageCollectionStatus) -> Result<(), Error> {

        let image_list = self.list_images()?;

        for path in image_list {

            tools::fail_on_shutdown()?;

            if let Some(ext) = path.extension() {
                if ext == "fidx" {
                    let index = self.open_fixed_reader(&path)?;
                    self.index_mark_used_chunks(index, &path, status)?;
                } else if ext == "didx" {
                    let index = self.open_dynamic_reader(&path)?;
                    self.index_mark_used_chunks(index, &path, status)?;
                }
            }
        }

        Ok(())
    }

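    /// Return a copy of the status recorded by the last garbage collection run.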
    pub fn last_gc_status(&self) -> GarbageCollectionStatus {
        self.last_gc_status.lock().unwrap().clone()
    }

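    /// Run garbage collection on this datastore.
    ///
    /// This is a classic mark-and-sweep: phase 1 marks all chunks
    /// referenced by any index file as used, phase 2 sweeps the chunk
    /// store and removes chunks that are no longer referenced (taking
    /// the oldest active writer into account). Only one GC per
    /// datastore may run at a time; a second invocation fails
    /// immediately instead of blocking.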
    pub fn garbage_collection(&self, worker: Arc<WorkerTask>) -> Result<(), Error> {

        if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {

            let _exclusive_lock = self.chunk_store.try_exclusive_lock()?;

            let oldest_writer = self.chunk_store.oldest_writer();

            let mut gc_status = GarbageCollectionStatus::default();
            gc_status.upid = Some(worker.to_string());

            worker.log("Start GC phase1 (mark used chunks)");

            self.mark_used_chunks(&mut gc_status)?;

            worker.log("Start GC phase2 (sweep unused chunks)");
            self.chunk_store.sweep_unused_chunks(oldest_writer, &mut gc_status, worker.clone())?;

            worker.log(&format!("Removed bytes: {}", gc_status.removed_bytes));
            worker.log(&format!("Removed chunks: {}", gc_status.removed_chunks));
            worker.log(&format!("Original data bytes: {}", gc_status.index_data_bytes));

            // guard the ratio and average against division by zero on an
            // empty datastore
            if gc_status.index_data_bytes > 0 {
                let comp_per = (gc_status.disk_bytes * 100) / gc_status.index_data_bytes;
                worker.log(&format!("Disk bytes: {} ({} %)", gc_status.disk_bytes, comp_per));
            } else {
                worker.log(&format!("Disk bytes: {}", gc_status.disk_bytes));
            }
            worker.log(&format!("Disk chunks: {}", gc_status.disk_chunks));
            if gc_status.disk_chunks > 0 {
                let avg_chunk = gc_status.disk_bytes / (gc_status.disk_chunks as u64);
                worker.log(&format!("Average chunk size: {}", avg_chunk));
            }

            *self.last_gc_status.lock().unwrap() = gc_status;

        } else {
            bail!("Start GC failed - (already running/locked)");
        }

        Ok(())
    }

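    /// Return the on-disk path for a chunk, along with its digest string.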
    pub fn chunk_path(&self, digest: &[u8; 32]) -> (PathBuf, String) {
        self.chunk_store.chunk_path(digest)
    }

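    /// Insert a chunk into the chunk store, returning a flag indicating
    /// whether the chunk was already present together with its size on
    /// disk in bytes.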
    pub fn insert_chunk(
        &self,
        chunk: &DataBlob,
        digest: &[u8; 32],
    ) -> Result<(bool, u64), Error> {
        self.chunk_store.insert_chunk(chunk, digest)
    }
}