// [proxmox-backup.git] src/backup/datastore.rs
use failure::*;

use std::io;
use std::path::{PathBuf, Path};
use std::collections::HashMap;
use lazy_static::lazy_static;
use std::sync::{Mutex, Arc};

use crate::tools;
use crate::config::datastore;
use super::chunk_store::*;
use super::fixed_index::*;
use super::dynamic_index::*;
use super::index::*;
use super::backup_info::*;
use crate::server::WorkerTask;

lazy_static!{
    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
}

/// Datastore Management
///
/// A DataStore can store several backups, and provides the
/// management interface for backups.
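///
/// A minimal usage sketch (not part of the original source); it assumes a
/// datastore named "store1" exists in the datastore config and that
/// `DataStore` is reachable via `crate::backup`:
///
/// ```ignore
/// use failure::Error;
/// use crate::backup::DataStore;
///
/// fn example() -> Result<(), Error> {
///     // look up (and cache) the datastore by its configured name
///     let store = DataStore::lookup_datastore("store1")?;
///     println!("datastore base path: {:?}", store.base_path());
///     Ok(())
/// }
/// ```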
pub struct DataStore {
    chunk_store: Arc<ChunkStore>,
    gc_mutex: Mutex<bool>,
    last_gc_status: Mutex<GarbageCollectionStatus>,
}

impl DataStore {

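    /// Look up a datastore by name.
    ///
    /// Returns the cached `Arc<DataStore>` from the global `DATASTORE_MAP` if
    /// the configured path is unchanged; otherwise the store is reopened and
    /// the cache entry replaced.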
    pub fn lookup_datastore(name: &str) -> Result<Arc<DataStore>, Error> {

        let config = datastore::config()?;
        let (_, store_config) = config.sections.get(name)
            .ok_or(format_err!("no such datastore '{}'", name))?;

        let path = store_config["path"].as_str().unwrap();

        let mut map = DATASTORE_MAP.lock().unwrap();

        if let Some(datastore) = map.get(name) {
            // Compare Config - if changed, create new Datastore object!
            if datastore.chunk_store.base == PathBuf::from(path) {
                return Ok(datastore.clone());
            }
        }

        let datastore = DataStore::open(name)?;

        let datastore = Arc::new(datastore);
        map.insert(name.to_string(), datastore.clone());

        Ok(datastore)
    }

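    /// Open the datastore named `store_name` directly from its configuration,
    /// bypassing the `DATASTORE_MAP` cache used by `lookup_datastore`.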
    pub fn open(store_name: &str) -> Result<Self, Error> {

        let config = datastore::config()?;
        let (_, store_config) = config.sections.get(store_name)
            .ok_or(format_err!("no such datastore '{}'", store_name))?;

        let path = store_config["path"].as_str().unwrap();

        let chunk_store = ChunkStore::open(store_name, path)?;

        let gc_status = GarbageCollectionStatus::default();

        Ok(Self {
            chunk_store: Arc::new(chunk_store),
            gc_mutex: Mutex::new(false),
            last_gc_status: Mutex::new(gc_status),
        })
    }

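    /// Iterate over all chunk files in the underlying chunk store
    /// (simply delegates to `ChunkStore::get_chunk_iterator`).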
    pub fn get_chunk_iterator(
        &self,
        print_percentage: bool,
    ) -> Result<
        impl Iterator<Item = Result<tools::fs::ReadDirEntry, Error>>,
        Error
    > {
        self.chunk_store.get_chunk_iterator(print_percentage)
    }

    pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {

        let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;

        Ok(index)
    }

    pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {

        let index = FixedIndexReader::open(self.chunk_store.clone(), filename.as_ref())?;

        Ok(index)
    }

    pub fn create_dynamic_writer<P: AsRef<Path>>(
        &self, filename: P,
    ) -> Result<DynamicIndexWriter, Error> {

        let index = DynamicIndexWriter::create(
            self.chunk_store.clone(), filename.as_ref())?;

        Ok(index)
    }

    pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {

        let index = DynamicIndexReader::open(self.chunk_store.clone(), filename.as_ref())?;

        Ok(index)
    }

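    /// Open an index file as a generic `IndexFile`, choosing the reader by
    /// file extension: `.didx` for dynamic and `.fidx` for fixed indexes.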
    pub fn open_index<P>(&self, filename: P) -> Result<Box<dyn IndexFile + Send>, Error>
    where
        P: AsRef<Path>,
    {
        let filename = filename.as_ref();
        let out: Box<dyn IndexFile + Send> =
            match filename.extension().and_then(|ext| ext.to_str()) {
                Some("didx") => Box::new(self.open_dynamic_reader(filename)?),
                Some("fidx") => Box::new(self.open_fixed_reader(filename)?),
                _ => bail!("cannot open index file of unknown type: {:?}", filename),
            };
        Ok(out)
    }

    pub fn base_path(&self) -> PathBuf {
        self.chunk_store.base_path()
    }

    /// Remove a backup directory including all content
    pub fn remove_backup_dir(&self, backup_dir: &BackupDir,
    ) -> Result<(), io::Error> {

        let relative_path = backup_dir.relative_path();
        let mut full_path = self.base_path();
        full_path.push(&relative_path);

        log::info!("removing backup {:?}", full_path);
        std::fs::remove_dir_all(full_path)?;

        Ok(())
    }

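    /// Create the directory for a backup snapshot.
    ///
    /// Returns the relative path of the snapshot directory and `true` if it
    /// was newly created, or `false` if it already existed.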
    pub fn create_backup_dir(&self, backup_dir: &BackupDir) -> Result<(PathBuf, bool), io::Error> {

        // create intermediate path first:
        let mut full_path = self.base_path();
        full_path.push(backup_dir.group().group_path());
        std::fs::create_dir_all(&full_path)?;

        let relative_path = backup_dir.relative_path();
        let mut full_path = self.base_path();
        full_path.push(&relative_path);

        // create the last component now
        match std::fs::create_dir(&full_path) {
            Ok(_) => Ok((relative_path, true)),
            Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => Ok((relative_path, false)),
            Err(e) => Err(e)
        }
    }

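    /// List all index files (`.fidx` and `.didx`) below the datastore base
    /// directory, skipping hidden entries such as the `.chunks` directory.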
    pub fn list_images(&self) -> Result<Vec<PathBuf>, Error> {
        let base = self.base_path();

        let mut list = vec![];

        use walkdir::WalkDir;

        let walker = WalkDir::new(&base).same_file_system(true).into_iter();

        // make sure we skip .chunks (and other hidden files to keep it simple)
        fn is_hidden(entry: &walkdir::DirEntry) -> bool {
            entry.file_name()
                .to_str()
                .map(|s| s.starts_with("."))
                .unwrap_or(false)
        }

        for entry in walker.filter_entry(|e| !is_hidden(e)) {
            let path = entry?.into_path();
            if let Some(ext) = path.extension() {
                if ext == "fidx" {
                    list.push(path);
                } else if ext == "didx" {
                    list.push(path);
                }
            }
        }

        Ok(list)
    }

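    /// Garbage collection phase 1: open every index file and mark the chunks
    /// it references as used, updating `status` along the way.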
    fn mark_used_chunks(&self, status: &mut GarbageCollectionStatus) -> Result<(), Error> {

        let image_list = self.list_images()?;

        for path in image_list {

            tools::fail_on_shutdown()?;

            if let Some(ext) = path.extension() {
                if ext == "fidx" {
                    let index = self.open_fixed_reader(&path)?;
                    index.mark_used_chunks(status)?;
                } else if ext == "didx" {
                    let index = self.open_dynamic_reader(&path)?;
                    index.mark_used_chunks(status)?;
                }
            }
        }

        Ok(())
    }

    pub fn last_gc_status(&self) -> GarbageCollectionStatus {
        self.last_gc_status.lock().unwrap().clone()
    }

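    /// Run garbage collection in two phases: first mark all chunks referenced
    /// by index files, then sweep unused chunks from the chunk store.
    ///
    /// Fails immediately if another garbage collection is already running
    /// (`gc_mutex` is held) or if the chunk store cannot be locked
    /// exclusively. Progress is logged through the given `WorkerTask`.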
    pub fn garbage_collection(&self, worker: Arc<WorkerTask>) -> Result<(), Error> {

        if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {

            let _exclusive_lock = self.chunk_store.try_exclusive_lock()?;

            let oldest_writer = self.chunk_store.oldest_writer();

            let mut gc_status = GarbageCollectionStatus::default();
            gc_status.upid = Some(worker.to_string());

            worker.log("Start GC phase1 (mark chunks)");

            self.mark_used_chunks(&mut gc_status)?;

            worker.log("Start GC phase2 (sweep unused chunks)");
            self.chunk_store.sweep_unused_chunks(oldest_writer, &mut gc_status)?;

            worker.log(&format!("Used bytes: {}", gc_status.used_bytes));
            worker.log(&format!("Used chunks: {}", gc_status.used_chunks));
            worker.log(&format!("Disk bytes: {}", gc_status.disk_bytes));
            worker.log(&format!("Disk chunks: {}", gc_status.disk_chunks));

            *self.last_gc_status.lock().unwrap() = gc_status;

        } else {
            bail!("Start GC failed - (already running/locked)");
        }

        Ok(())
    }

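    /// Insert a chunk into the chunk store (thin wrapper around
    /// `ChunkStore::insert_chunk`).
    ///
    /// A minimal caller sketch (not from the original source); it assumes the
    /// returned tuple is (duplicate flag, chunk digest, stored size), which is
    /// how the result is interpreted here:
    ///
    /// ```ignore
    /// let data = vec![0u8; 4096];
    /// let (is_duplicate, digest, size) = store.insert_chunk(&data)?;
    /// println!("stored {} bytes, digest {:?}, duplicate: {}", size, digest, is_duplicate);
    /// ```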
    pub fn insert_chunk(&self, chunk: &[u8]) -> Result<(bool, [u8; 32], u64), Error> {
        self.chunk_store.insert_chunk(chunk)
    }

    pub fn insert_chunk_noverify(
        &self,
        digest: &[u8; 32],
        chunk: &[u8],
    ) -> Result<(bool, u64), Error> {
        self.chunk_store.insert_chunk_noverify(digest, chunk)
    }
}