src/backup/datastore.rs
use failure::*;

use std::io;
use std::path::{PathBuf, Path};
use std::collections::HashMap;
use lazy_static::lazy_static;
use std::sync::{Mutex, Arc};

use crate::tools;
use crate::config::datastore;
use super::chunk_store::*;
use super::fixed_index::*;
use super::dynamic_index::*;
use super::index::*;
use super::backup_info::*;
use super::DataChunk;
use crate::server::WorkerTask;

lazy_static! {
    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
}

/// Datastore Management
///
/// A datastore can store several backups, and provides the
/// management interface for backups.
pub struct DataStore {
    chunk_store: Arc<ChunkStore>,  // shared handle to the underlying chunk store
    gc_mutex: Mutex<bool>,         // serializes garbage collection runs
    last_gc_status: Mutex<GarbageCollectionStatus>,
}

impl DataStore {

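    /// Look up a datastore by name.
    ///
    /// Open datastores are cached in a global map; a cached instance is
    /// reused as long as its configured path has not changed.
    ///
    /// Usage sketch ("store1" is a placeholder for a configured datastore):
    ///
    /// ```ignore
    /// let store = DataStore::lookup_datastore("store1")?;
    /// println!("base path: {:?}", store.base_path());
    /// ```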
    pub fn lookup_datastore(name: &str) -> Result<Arc<DataStore>, Error> {

        let config = datastore::config()?;
        let (_, store_config) = config.sections.get(name)
            .ok_or_else(|| format_err!("no such datastore '{}'", name))?;

        let path = store_config["path"].as_str().unwrap();

        let mut map = DATASTORE_MAP.lock().unwrap();

        if let Some(datastore) = map.get(name) {
            // compare config - if changed, create a new datastore object!
            if datastore.chunk_store.base == PathBuf::from(path) {
                return Ok(datastore.clone());
            }
        }

        let datastore = DataStore::open(name)?;

        let datastore = Arc::new(datastore);
        map.insert(name.to_string(), datastore.clone());

        Ok(datastore)
    }

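    /// Open the datastore with the given name, using the path from the
    /// datastore configuration.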
    pub fn open(store_name: &str) -> Result<Self, Error> {

        let config = datastore::config()?;
        let (_, store_config) = config.sections.get(store_name)
            .ok_or_else(|| format_err!("no such datastore '{}'", store_name))?;

        let path = store_config["path"].as_str().unwrap();

        let chunk_store = ChunkStore::open(store_name, path)?;

        let gc_status = GarbageCollectionStatus::default();

        Ok(Self {
            chunk_store: Arc::new(chunk_store),
            gc_mutex: Mutex::new(false),
            last_gc_status: Mutex::new(gc_status),
        })
    }

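    /// Iterate over all chunk files in the underlying chunk store.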
    pub fn get_chunk_iterator(
        &self,
        print_percentage: bool,
    ) -> Result<
        impl Iterator<Item = Result<tools::fs::ReadDirEntry, Error>>,
        Error
    > {
        self.chunk_store.get_chunk_iterator(print_percentage)
    }

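    /// Create a writer for a new fixed-size chunk index (`.fidx`),
    /// for data of known size split into fixed-size chunks.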
    pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {

        let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;

        Ok(index)
    }

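    /// Open an existing fixed index (`.fidx`) for reading.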
    pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {

        let index = FixedIndexReader::open(self.chunk_store.clone(), filename.as_ref())?;

        Ok(index)
    }

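    /// Create a writer for a new dynamic chunk index (`.didx`),
    /// for variable-size (content-defined) chunks.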
    pub fn create_dynamic_writer<P: AsRef<Path>>(
        &self, filename: P,
    ) -> Result<DynamicIndexWriter, Error> {

        let index = DynamicIndexWriter::create(
            self.chunk_store.clone(), filename.as_ref())?;

        Ok(index)
    }

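    /// Open an existing dynamic index (`.didx`) for reading.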
    pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {

        let index = DynamicIndexReader::open(self.chunk_store.clone(), filename.as_ref())?;

        Ok(index)
    }

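    /// Open a fixed or dynamic index file, dispatching on the file
    /// extension (`.fidx` or `.didx`).
    ///
    /// Usage sketch ("data.didx" is a placeholder file name):
    ///
    /// ```ignore
    /// let index = store.open_index("data.didx")?;
    /// ```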
    pub fn open_index<P>(&self, filename: P) -> Result<Box<dyn IndexFile + Send>, Error>
    where
        P: AsRef<Path>,
    {
        let filename = filename.as_ref();
        let out: Box<dyn IndexFile + Send> =
            match filename.extension().and_then(|ext| ext.to_str()) {
                Some("didx") => Box::new(self.open_dynamic_reader(filename)?),
                Some("fidx") => Box::new(self.open_fixed_reader(filename)?),
                _ => bail!("cannot open index file of unknown type: {:?}", filename),
            };
        Ok(out)
    }

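    /// Return the filesystem base path of this datastore.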
    pub fn base_path(&self) -> PathBuf {
        self.chunk_store.base_path()
    }

    /// Remove a backup directory including all content
    pub fn remove_backup_dir(&self, backup_dir: &BackupDir) -> Result<(), io::Error> {

        let relative_path = backup_dir.relative_path();
        let mut full_path = self.base_path();
        full_path.push(&relative_path);

        log::info!("removing backup {:?}", full_path);
        std::fs::remove_dir_all(full_path)?;

        Ok(())
    }

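    /// Create a new backup directory, creating missing intermediate
    /// (group) directories as needed.
    ///
    /// Returns the relative path of the snapshot directory and `true` if
    /// the final component was newly created, or `false` if it already
    /// existed.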
    pub fn create_backup_dir(&self, backup_dir: &BackupDir) -> Result<(PathBuf, bool), io::Error> {

        // create intermediate path first:
        let mut full_path = self.base_path();
        full_path.push(backup_dir.group().group_path());
        std::fs::create_dir_all(&full_path)?;

        let relative_path = backup_dir.relative_path();
        let mut full_path = self.base_path();
        full_path.push(&relative_path);

        // create the last component now
        match std::fs::create_dir(&full_path) {
            Ok(_) => Ok((relative_path, true)),
            Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => Ok((relative_path, false)),
            Err(e) => Err(e),
        }
    }

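    /// List the paths of all index files (`.fidx` and `.didx`) below the
    /// datastore base directory, skipping hidden entries such as `.chunks`.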
    pub fn list_images(&self) -> Result<Vec<PathBuf>, Error> {
        let base = self.base_path();

        let mut list = vec![];

        use walkdir::WalkDir;

        let walker = WalkDir::new(&base).same_file_system(true).into_iter();

        // make sure we skip .chunks (and other hidden files, to keep it simple)
        fn is_hidden(entry: &walkdir::DirEntry) -> bool {
            entry.file_name()
                .to_str()
                .map(|s| s.starts_with("."))
                .unwrap_or(false)
        }

        for entry in walker.filter_entry(|e| !is_hidden(e)) {
            let path = entry?.into_path();
            if let Some(ext) = path.extension() {
                if ext == "fidx" || ext == "didx" {
                    list.push(path);
                }
            }
        }

        Ok(list)
    }

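    /// Garbage collection phase 1: open every index file and mark all
    /// referenced chunks as used.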
    fn mark_used_chunks(&self, status: &mut GarbageCollectionStatus) -> Result<(), Error> {

        let image_list = self.list_images()?;

        for path in image_list {

            tools::fail_on_shutdown()?;

            if let Some(ext) = path.extension() {
                if ext == "fidx" {
                    let index = self.open_fixed_reader(&path)?;
                    index.mark_used_chunks(status)?;
                } else if ext == "didx" {
                    let index = self.open_dynamic_reader(&path)?;
                    index.mark_used_chunks(status)?;
                }
            }
        }

        Ok(())
    }

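    /// Return a copy of the status of the last garbage collection run.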
    pub fn last_gc_status(&self) -> GarbageCollectionStatus {
        self.last_gc_status.lock().unwrap().clone()
    }

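    /// Run garbage collection: phase 1 marks all chunks referenced by any
    /// index file, phase 2 sweeps unused chunks from the chunk store.
    /// Fails if another garbage collection is already running.
    ///
    /// Usage sketch (`worker` stands in for a real WorkerTask handle):
    ///
    /// ```ignore
    /// let store = DataStore::lookup_datastore("store1")?;
    /// store.garbage_collection(worker)?;
    /// let status = store.last_gc_status();
    /// ```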
    pub fn garbage_collection(&self, worker: Arc<WorkerTask>) -> Result<(), Error> {

        if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {

            let _exclusive_lock = self.chunk_store.try_exclusive_lock()?;

            let oldest_writer = self.chunk_store.oldest_writer();

            let mut gc_status = GarbageCollectionStatus::default();
            gc_status.upid = Some(worker.to_string());

            worker.log("Start GC phase1 (mark chunks)");

            self.mark_used_chunks(&mut gc_status)?;

            worker.log("Start GC phase2 (sweep unused chunks)");
            self.chunk_store.sweep_unused_chunks(oldest_writer, &mut gc_status)?;

            worker.log(&format!("Used bytes: {}", gc_status.used_bytes));
            worker.log(&format!("Used chunks: {}", gc_status.used_chunks));
            worker.log(&format!("Disk bytes: {}", gc_status.disk_bytes));
            worker.log(&format!("Disk chunks: {}", gc_status.disk_chunks));

            *self.last_gc_status.lock().unwrap() = gc_status;

        } else {
            bail!("Start GC failed - (already running/locked)");
        }

        Ok(())
    }

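    /// Insert a chunk into the underlying chunk store; see
    /// `ChunkStore::insert_chunk` for the meaning of the returned tuple.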
    pub fn insert_chunk(
        &self,
        chunk: &DataChunk,
    ) -> Result<(bool, u64), Error> {
        self.chunk_store.insert_chunk(chunk)
    }
}