// src/backup/datastore.rs

use failure::*;

use std::io;
use std::path::{PathBuf, Path};
use std::collections::HashMap;
use lazy_static::lazy_static;
use std::sync::{Mutex, Arc};

use crate::tools;
use crate::config::datastore;
use super::chunk_store::*;
use super::fixed_index::*;
use super::dynamic_index::*;
use super::index::*;
use super::backup_info::*;
use super::DataChunk;
use crate::server::WorkerTask;

lazy_static! {
    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
}

/// Datastore Management
///
/// A datastore can store several backups, and provides the
/// management interface for them.
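///
/// A minimal usage sketch (doctest marked `no_run`; the datastore name
/// "store1" and the `proxmox_backup::backup` re-export path are
/// illustrative assumptions):
///
/// ```no_run
/// # fn main() -> Result<(), failure::Error> {
/// use proxmox_backup::backup::DataStore;
///
/// // look up (and cache) the configured datastore named "store1"
/// let store = DataStore::lookup_datastore("store1")?;
/// println!("datastore base: {:?}", store.base_path());
/// # Ok(())
/// # }
/// ```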
pub struct DataStore {
    chunk_store: Arc<ChunkStore>,
    gc_mutex: Mutex<bool>,
    last_gc_status: Mutex<GarbageCollectionStatus>,
}

impl DataStore {

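    /// Look up a datastore by name.
    ///
    /// Datastore instances are cached in a global map, so repeated
    /// lookups return the same `Arc<DataStore>` as long as the
    /// configured path is unchanged.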
    pub fn lookup_datastore(name: &str) -> Result<Arc<DataStore>, Error> {

        let config = datastore::config()?;
        let (_, store_config) = config.sections.get(name)
            .ok_or_else(|| format_err!("no such datastore '{}'", name))?;

        let path = store_config["path"].as_str().unwrap();

        let mut map = DATASTORE_MAP.lock().unwrap();

        if let Some(datastore) = map.get(name) {
            // compare the config - if the path changed, create a new datastore object
            if datastore.chunk_store.base == PathBuf::from(path) {
                return Ok(datastore.clone());
            }
        }

        let datastore = DataStore::open(name)?;

        let datastore = Arc::new(datastore);
        map.insert(name.to_string(), datastore.clone());

        Ok(datastore)
    }

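    /// Open the datastore named `store_name` using the path from the
    /// datastore config. Unlike `lookup_datastore`, this always creates
    /// a new instance and does not consult the cache.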
    pub fn open(store_name: &str) -> Result<Self, Error> {

        let config = datastore::config()?;
        let (_, store_config) = config.sections.get(store_name)
            .ok_or_else(|| format_err!("no such datastore '{}'", store_name))?;

        let path = store_config["path"].as_str().unwrap();

        let chunk_store = ChunkStore::open(store_name, path)?;

        let gc_status = GarbageCollectionStatus::default();

        Ok(Self {
            chunk_store: Arc::new(chunk_store),
            gc_mutex: Mutex::new(false),
            last_gc_status: Mutex::new(gc_status),
        })
    }

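    /// Return an iterator over all chunk files in the underlying chunk store.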
    pub fn get_chunk_iterator(
        &self,
        print_percentage: bool,
    ) -> Result<
        impl Iterator<Item = Result<tools::fs::ReadDirEntry, Error>>,
        Error
    > {
        self.chunk_store.get_chunk_iterator(print_percentage)
    }

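    /// Create a writer for a fixed index (".fidx") file covering an
    /// image of `size` bytes, split into chunks of `chunk_size` bytes.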
    pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {

        let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;

        Ok(index)
    }

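    /// Open a fixed index (".fidx") file for reading.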
    pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {

        let index = FixedIndexReader::open(self.chunk_store.clone(), filename.as_ref())?;

        Ok(index)
    }

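    /// Create a writer for a dynamic index (".didx") file.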
    pub fn create_dynamic_writer<P: AsRef<Path>>(
        &self, filename: P,
    ) -> Result<DynamicIndexWriter, Error> {

        let index = DynamicIndexWriter::create(
            self.chunk_store.clone(), filename.as_ref())?;

        Ok(index)
    }

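    /// Open a dynamic index (".didx") file for reading.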
    pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {

        let full_path = self.chunk_store.relative_path(filename.as_ref());

        let index = DynamicIndexReader::open(&full_path)?;

        Ok(index)
    }

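    /// Open an index file of either type, dispatching on the file name
    /// extension ("fidx" or "didx").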
    pub fn open_index<P>(&self, filename: P) -> Result<Box<dyn IndexFile + Send>, Error>
    where
        P: AsRef<Path>,
    {
        let filename = filename.as_ref();
        let out: Box<dyn IndexFile + Send> =
            match filename.extension().and_then(|ext| ext.to_str()) {
                Some("didx") => Box::new(self.open_dynamic_reader(filename)?),
                Some("fidx") => Box::new(self.open_fixed_reader(filename)?),
                _ => bail!("cannot open index file of unknown type: {:?}", filename),
            };
        Ok(out)
    }

    pub fn base_path(&self) -> PathBuf {
        self.chunk_store.base_path()
    }

    /// Remove a backup directory including all content
    pub fn remove_backup_dir(&self, backup_dir: &BackupDir) -> Result<(), io::Error> {

        let relative_path = backup_dir.relative_path();
        let mut full_path = self.base_path();
        full_path.push(&relative_path);

        log::info!("removing backup {:?}", full_path);
        std::fs::remove_dir_all(full_path)?;

        Ok(())
    }

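    /// Create the directory for `backup_dir`, including any missing
    /// intermediate group directories.
    ///
    /// Returns the relative path of the directory, together with `true`
    /// if the final component was newly created or `false` if it
    /// already existed.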
    pub fn create_backup_dir(&self, backup_dir: &BackupDir) -> Result<(PathBuf, bool), io::Error> {

        // create intermediate path first:
        let mut full_path = self.base_path();
        full_path.push(backup_dir.group().group_path());
        std::fs::create_dir_all(&full_path)?;

        let relative_path = backup_dir.relative_path();
        let mut full_path = self.base_path();
        full_path.push(&relative_path);

        // create the last component now
        match std::fs::create_dir(&full_path) {
            Ok(_) => Ok((relative_path, true)),
            Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => Ok((relative_path, false)),
            Err(e) => Err(e),
        }
    }

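    /// Recursively list all index files (".fidx" and ".didx") below the
    /// datastore base path, skipping hidden entries such as the
    /// ".chunks" directory.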
    pub fn list_images(&self) -> Result<Vec<PathBuf>, Error> {
        let base = self.base_path();

        let mut list = vec![];

        use walkdir::WalkDir;

        let walker = WalkDir::new(&base).same_file_system(true).into_iter();

        // make sure we skip .chunks (and other hidden files, to keep it simple)
        fn is_hidden(entry: &walkdir::DirEntry) -> bool {
            entry.file_name()
                .to_str()
                .map(|s| s.starts_with("."))
                .unwrap_or(false)
        }

        for entry in walker.filter_entry(|e| !is_hidden(e)) {
            let path = entry?.into_path();
            if let Some(ext) = path.extension() {
                if ext == "fidx" || ext == "didx" {
                    list.push(path);
                }
            }
        }

        Ok(list)
    }

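    /// GC phase 1: open every index file and mark all chunks it
    /// references as used in `status`.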
    fn mark_used_chunks(&self, status: &mut GarbageCollectionStatus) -> Result<(), Error> {

        let image_list = self.list_images()?;

        for path in image_list {

            tools::fail_on_shutdown()?;

            if let Some(ext) = path.extension() {
                if ext == "fidx" {
                    let index = self.open_fixed_reader(&path)?;
                    index.mark_used_chunks(status)?;
                } else if ext == "didx" {
                    let index = self.open_dynamic_reader(&path)?;
                    index.mark_used_chunks(status)?;
                }
            }
        }

        Ok(())
    }

    pub fn last_gc_status(&self) -> GarbageCollectionStatus {
        self.last_gc_status.lock().unwrap().clone()
    }

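    /// Run garbage collection: phase 1 marks all chunks referenced by
    /// any index file, phase 2 sweeps unreferenced chunks from the
    /// chunk store. Fails if another garbage collection is already
    /// running.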
    pub fn garbage_collection(&self, worker: Arc<WorkerTask>) -> Result<(), Error> {

        if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {

            let _exclusive_lock = self.chunk_store.try_exclusive_lock()?;

            let oldest_writer = self.chunk_store.oldest_writer();

            let mut gc_status = GarbageCollectionStatus::default();
            gc_status.upid = Some(worker.to_string());

            worker.log("Start GC phase1 (mark chunks)");

            self.mark_used_chunks(&mut gc_status)?;

            worker.log("Start GC phase2 (sweep unused chunks)");
            self.chunk_store.sweep_unused_chunks(oldest_writer, &mut gc_status)?;

            worker.log(&format!("Used bytes: {}", gc_status.used_bytes));
            worker.log(&format!("Used chunks: {}", gc_status.used_chunks));
            worker.log(&format!("Disk bytes: {}", gc_status.disk_bytes));
            worker.log(&format!("Disk chunks: {}", gc_status.disk_chunks));

            *self.last_gc_status.lock().unwrap() = gc_status;

        } else {
            bail!("unable to start GC - another garbage collection is already running");
        }

        Ok(())
    }

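    /// Return the chunk file path and the digest as a printable string
    /// (delegated to the underlying chunk store).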
    pub fn chunk_path(&self, digest: &[u8; 32]) -> (PathBuf, String) {
        self.chunk_store.chunk_path(digest)
    }

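    /// Insert a chunk into the chunk store.
    ///
    /// The result mirrors `ChunkStore::insert_chunk`: a flag telling
    /// whether the chunk was already present, and its size on disk.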
    pub fn insert_chunk(
        &self,
        chunk: &DataChunk,
    ) -> Result<(bool, u64), Error> {
        self.chunk_store.insert_chunk(chunk)
    }
}