]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/datastore.rs
datastore: fix typo
[proxmox-backup.git] / src / backup / datastore.rs
1 use std::collections::{HashSet, HashMap};
2 use std::io::{self, Write};
3 use std::path::{Path, PathBuf};
4 use std::sync::{Arc, Mutex};
5 use std::convert::TryFrom;
6
7 use anyhow::{bail, format_err, Error};
8 use lazy_static::lazy_static;
9 use chrono::{DateTime, Utc};
10
11 use super::backup_info::{BackupGroup, BackupDir};
12 use super::chunk_store::ChunkStore;
13 use super::dynamic_index::{DynamicIndexReader, DynamicIndexWriter};
14 use super::fixed_index::{FixedIndexReader, FixedIndexWriter};
15 use super::manifest::{MANIFEST_BLOB_NAME, CLIENT_LOG_BLOB_NAME, BackupManifest};
16 use super::index::*;
17 use super::{DataBlob, ArchiveType, archive_type};
18 use crate::backup::CryptMode;
19 use crate::config::datastore;
20 use crate::server::WorkerTask;
21 use crate::tools;
22 use crate::api2::types::GarbageCollectionStatus;
23
24 lazy_static! {
25 static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
26 }
27
28 /// Datastore Management
29 ///
30 /// A Datastore can store severals backups, and provides the
31 /// management interface for backup.
32 pub struct DataStore {
33 chunk_store: Arc<ChunkStore>,
34 gc_mutex: Mutex<bool>,
35 last_gc_status: Mutex<GarbageCollectionStatus>,
36 }
37
38 impl DataStore {
39
40 pub fn lookup_datastore(name: &str) -> Result<Arc<DataStore>, Error> {
41
42 let (config, _digest) = datastore::config()?;
43 let config: datastore::DataStoreConfig = config.lookup("datastore", name)?;
44
45 let mut map = DATASTORE_MAP.lock().unwrap();
46
47 if let Some(datastore) = map.get(name) {
48 // Compare Config - if changed, create new Datastore object!
49 if datastore.chunk_store.base == PathBuf::from(&config.path) {
50 return Ok(datastore.clone());
51 }
52 }
53
54 let datastore = DataStore::open(name)?;
55
56 let datastore = Arc::new(datastore);
57 map.insert(name.to_string(), datastore.clone());
58
59 Ok(datastore)
60 }
61
62 pub fn open(store_name: &str) -> Result<Self, Error> {
63
64 let (config, _digest) = datastore::config()?;
65 let (_, store_config) = config.sections.get(store_name)
66 .ok_or(format_err!("no such datastore '{}'", store_name))?;
67
68 let path = store_config["path"].as_str().unwrap();
69
70 let chunk_store = ChunkStore::open(store_name, path)?;
71
72 let gc_status = GarbageCollectionStatus::default();
73
74 Ok(Self {
75 chunk_store: Arc::new(chunk_store),
76 gc_mutex: Mutex::new(false),
77 last_gc_status: Mutex::new(gc_status),
78 })
79 }
80
81 pub fn get_chunk_iterator(
82 &self,
83 ) -> Result<
84 impl Iterator<Item = (Result<tools::fs::ReadDirEntry, Error>, usize)>,
85 Error
86 > {
87 self.chunk_store.get_chunk_iterator()
88 }
89
90 pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {
91
92 let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
93
94 Ok(index)
95 }
96
97 pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {
98
99 let full_path = self.chunk_store.relative_path(filename.as_ref());
100
101 let index = FixedIndexReader::open(&full_path)?;
102
103 Ok(index)
104 }
105
106 pub fn create_dynamic_writer<P: AsRef<Path>>(
107 &self, filename: P,
108 ) -> Result<DynamicIndexWriter, Error> {
109
110 let index = DynamicIndexWriter::create(
111 self.chunk_store.clone(), filename.as_ref())?;
112
113 Ok(index)
114 }
115
116 pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {
117
118 let full_path = self.chunk_store.relative_path(filename.as_ref());
119
120 let index = DynamicIndexReader::open(&full_path)?;
121
122 Ok(index)
123 }
124
125 pub fn open_index<P>(&self, filename: P) -> Result<Box<dyn IndexFile + Send>, Error>
126 where
127 P: AsRef<Path>,
128 {
129 let filename = filename.as_ref();
130 let out: Box<dyn IndexFile + Send> =
131 match archive_type(filename)? {
132 ArchiveType::DynamicIndex => Box::new(self.open_dynamic_reader(filename)?),
133 ArchiveType::FixedIndex => Box::new(self.open_fixed_reader(filename)?),
134 _ => bail!("cannot open index file of unknown type: {:?}", filename),
135 };
136 Ok(out)
137 }
138
139 pub fn name(&self) -> &str {
140 self.chunk_store.name()
141 }
142
143 pub fn base_path(&self) -> PathBuf {
144 self.chunk_store.base_path()
145 }
146
147 /// Cleanup a backup directory
148 ///
149 /// Removes all files not mentioned in the manifest.
150 pub fn cleanup_backup_dir(&self, backup_dir: &BackupDir, manifest: &BackupManifest
151 ) -> Result<(), Error> {
152
153 let mut full_path = self.base_path();
154 full_path.push(backup_dir.relative_path());
155
156 let mut wanted_files = HashSet::new();
157 wanted_files.insert(MANIFEST_BLOB_NAME.to_string());
158 wanted_files.insert(CLIENT_LOG_BLOB_NAME.to_string());
159 manifest.files().iter().for_each(|item| { wanted_files.insert(item.filename.clone()); });
160
161 for item in tools::fs::read_subdir(libc::AT_FDCWD, &full_path)? {
162 if let Ok(item) = item {
163 if let Some(file_type) = item.file_type() {
164 if file_type != nix::dir::Type::File { continue; }
165 }
166 let file_name = item.file_name().to_bytes();
167 if file_name == b"." || file_name == b".." { continue; };
168
169 if let Ok(name) = std::str::from_utf8(file_name) {
170 if wanted_files.contains(name) { continue; }
171 }
172 println!("remove unused file {:?}", item.file_name());
173 let dirfd = item.parent_fd();
174 let _res = unsafe { libc::unlinkat(dirfd, item.file_name().as_ptr(), 0) };
175 }
176 }
177
178 Ok(())
179 }
180
181 /// Returns the absolute path for a backup_group
182 pub fn group_path(&self, backup_group: &BackupGroup) -> PathBuf {
183 let mut full_path = self.base_path();
184 full_path.push(backup_group.group_path());
185 full_path
186 }
187
188 /// Returns the absolute path for backup_dir
189 pub fn snapshot_path(&self, backup_dir: &BackupDir) -> PathBuf {
190 let mut full_path = self.base_path();
191 full_path.push(backup_dir.relative_path());
192 full_path
193 }
194
195 /// Remove a complete backup group including all snapshots
196 pub fn remove_backup_group(&self, backup_group: &BackupGroup) -> Result<(), Error> {
197
198 let full_path = self.group_path(backup_group);
199
200 log::info!("removing backup group {:?}", full_path);
201 std::fs::remove_dir_all(&full_path)
202 .map_err(|err| {
203 format_err!(
204 "removing backup group {:?} failed - {}",
205 full_path,
206 err,
207 )
208 })?;
209
210 Ok(())
211 }
212
213 /// Remove a backup directory including all content
214 pub fn remove_backup_dir(&self, backup_dir: &BackupDir) -> Result<(), Error> {
215
216 let full_path = self.snapshot_path(backup_dir);
217
218 log::info!("removing backup snapshot {:?}", full_path);
219 std::fs::remove_dir_all(&full_path)
220 .map_err(|err| {
221 format_err!(
222 "removing backup snapshot {:?} failed - {}",
223 full_path,
224 err,
225 )
226 })?;
227
228 Ok(())
229 }
230
231 /// Returns the time of the last successful backup
232 ///
233 /// Or None if there is no backup in the group (or the group dir does not exist).
234 pub fn last_successful_backup(&self, backup_group: &BackupGroup) -> Result<Option<DateTime<Utc>>, Error> {
235 let base_path = self.base_path();
236 let mut group_path = base_path.clone();
237 group_path.push(backup_group.group_path());
238
239 if group_path.exists() {
240 backup_group.last_successful_backup(&base_path)
241 } else {
242 Ok(None)
243 }
244 }
245
246 /// Returns the backup owner.
247 ///
248 /// The backup owner is the user who first created the backup group.
249 pub fn get_owner(&self, backup_group: &BackupGroup) -> Result<String, Error> {
250 let mut full_path = self.base_path();
251 full_path.push(backup_group.group_path());
252 full_path.push("owner");
253 let owner = proxmox::tools::fs::file_read_firstline(full_path)?;
254 Ok(owner.trim_end().to_string()) // remove trailing newline
255 }
256
257 /// Set the backup owner.
258 pub fn set_owner(&self, backup_group: &BackupGroup, userid: &str, force: bool) -> Result<(), Error> {
259 let mut path = self.base_path();
260 path.push(backup_group.group_path());
261 path.push("owner");
262
263 let mut open_options = std::fs::OpenOptions::new();
264 open_options.write(true);
265 open_options.truncate(true);
266
267 if force {
268 open_options.create(true);
269 } else {
270 open_options.create_new(true);
271 }
272
273 let mut file = open_options.open(&path)
274 .map_err(|err| format_err!("unable to create owner file {:?} - {}", path, err))?;
275
276 write!(file, "{}\n", userid)
277 .map_err(|err| format_err!("unable to write owner file {:?} - {}", path, err))?;
278
279 Ok(())
280 }
281
282 /// Create a backup group if it does not already exists.
283 ///
284 /// And set the owner to 'userid'. If the group already exists, it returns the
285 /// current owner (instead of setting the owner).
286 pub fn create_backup_group(&self, backup_group: &BackupGroup, userid: &str) -> Result<String, Error> {
287
288 // create intermediate path first:
289 let base_path = self.base_path();
290
291 let mut full_path = base_path.clone();
292 full_path.push(backup_group.backup_type());
293 std::fs::create_dir_all(&full_path)?;
294
295 full_path.push(backup_group.backup_id());
296
297 // create the last component now
298 match std::fs::create_dir(&full_path) {
299 Ok(_) => {
300 self.set_owner(backup_group, userid, false)?;
301 let owner = self.get_owner(backup_group)?; // just to be sure
302 Ok(owner)
303 }
304 Err(ref err) if err.kind() == io::ErrorKind::AlreadyExists => {
305 let owner = self.get_owner(backup_group)?; // just to be sure
306 Ok(owner)
307 }
308 Err(err) => bail!("unable to create backup group {:?} - {}", full_path, err),
309 }
310 }
311
312 /// Creates a new backup snapshot inside a BackupGroup
313 ///
314 /// The BackupGroup directory needs to exist.
315 pub fn create_backup_dir(&self, backup_dir: &BackupDir) -> Result<(PathBuf, bool), io::Error> {
316 let relative_path = backup_dir.relative_path();
317 let mut full_path = self.base_path();
318 full_path.push(&relative_path);
319
320 match std::fs::create_dir(&full_path) {
321 Ok(_) => Ok((relative_path, true)),
322 Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => Ok((relative_path, false)),
323 Err(e) => Err(e)
324 }
325 }
326
327 pub fn list_images(&self) -> Result<Vec<PathBuf>, Error> {
328 let base = self.base_path();
329
330 let mut list = vec![];
331
332 use walkdir::WalkDir;
333
334 let walker = WalkDir::new(&base).same_file_system(true).into_iter();
335
336 // make sure we skip .chunks (and other hidden files to keep it simple)
337 fn is_hidden(entry: &walkdir::DirEntry) -> bool {
338 entry.file_name()
339 .to_str()
340 .map(|s| s.starts_with("."))
341 .unwrap_or(false)
342 }
343
344 for entry in walker.filter_entry(|e| !is_hidden(e)) {
345 let path = entry?.into_path();
346 if let Ok(archive_type) = archive_type(&path) {
347 if archive_type == ArchiveType::FixedIndex || archive_type == ArchiveType::DynamicIndex {
348 list.push(path);
349 }
350 }
351 }
352
353 Ok(list)
354 }
355
356 // mark chunks used by ``index`` as used
357 fn index_mark_used_chunks<I: IndexFile>(
358 &self,
359 index: I,
360 file_name: &Path, // only used for error reporting
361 status: &mut GarbageCollectionStatus,
362 worker: &WorkerTask,
363 ) -> Result<(), Error> {
364
365 status.index_file_count += 1;
366 status.index_data_bytes += index.index_bytes();
367
368 for pos in 0..index.index_count() {
369 worker.fail_on_abort()?;
370 tools::fail_on_shutdown()?;
371 let digest = index.index_digest(pos).unwrap();
372 if let Err(err) = self.chunk_store.touch_chunk(digest) {
373 bail!("unable to access chunk {}, required by {:?} - {}",
374 proxmox::tools::digest_to_hex(digest), file_name, err);
375 }
376 }
377 Ok(())
378 }
379
380 fn mark_used_chunks(&self, status: &mut GarbageCollectionStatus, worker: &WorkerTask) -> Result<(), Error> {
381
382 let image_list = self.list_images()?;
383
384 for path in image_list {
385
386 worker.fail_on_abort()?;
387 tools::fail_on_shutdown()?;
388
389 if let Ok(archive_type) = archive_type(&path) {
390 if archive_type == ArchiveType::FixedIndex {
391 let index = self.open_fixed_reader(&path)?;
392 self.index_mark_used_chunks(index, &path, status, worker)?;
393 } else if archive_type == ArchiveType::DynamicIndex {
394 let index = self.open_dynamic_reader(&path)?;
395 self.index_mark_used_chunks(index, &path, status, worker)?;
396 }
397 }
398 }
399
400 Ok(())
401 }
402
403 pub fn last_gc_status(&self) -> GarbageCollectionStatus {
404 self.last_gc_status.lock().unwrap().clone()
405 }
406
407 pub fn garbage_collection_running(&self) -> bool {
408 if let Ok(_) = self.gc_mutex.try_lock() { false } else { true }
409 }
410
411 pub fn garbage_collection(&self, worker: &WorkerTask) -> Result<(), Error> {
412
413 if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
414
415 let _exclusive_lock = self.chunk_store.try_exclusive_lock()?;
416
417 let now = unsafe { libc::time(std::ptr::null_mut()) };
418
419 let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(now);
420
421 let mut gc_status = GarbageCollectionStatus::default();
422 gc_status.upid = Some(worker.to_string());
423
424 worker.log("Start GC phase1 (mark used chunks)");
425
426 self.mark_used_chunks(&mut gc_status, &worker)?;
427
428 worker.log("Start GC phase2 (sweep unused chunks)");
429 self.chunk_store.sweep_unused_chunks(oldest_writer, &mut gc_status, &worker)?;
430
431 worker.log(&format!("Removed bytes: {}", gc_status.removed_bytes));
432 worker.log(&format!("Removed chunks: {}", gc_status.removed_chunks));
433 if gc_status.pending_bytes > 0 {
434 worker.log(&format!("Pending removals: {} bytes ({} chunks)", gc_status.pending_bytes, gc_status.pending_chunks));
435 }
436
437 worker.log(&format!("Original data bytes: {}", gc_status.index_data_bytes));
438
439 if gc_status.index_data_bytes > 0 {
440 let comp_per = (gc_status.disk_bytes*100)/gc_status.index_data_bytes;
441 worker.log(&format!("Disk bytes: {} ({} %)", gc_status.disk_bytes, comp_per));
442 }
443
444 worker.log(&format!("Disk chunks: {}", gc_status.disk_chunks));
445
446 if gc_status.disk_chunks > 0 {
447 let avg_chunk = gc_status.disk_bytes/(gc_status.disk_chunks as u64);
448 worker.log(&format!("Average chunk size: {}", avg_chunk));
449 }
450
451 *self.last_gc_status.lock().unwrap() = gc_status;
452
453 } else {
454 bail!("Start GC failed - (already running/locked)");
455 }
456
457 Ok(())
458 }
459
460 pub fn try_shared_chunk_store_lock(&self) -> Result<tools::ProcessLockSharedGuard, Error> {
461 self.chunk_store.try_shared_lock()
462 }
463
464 pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) {
465 self.chunk_store.chunk_path(digest)
466 }
467
468 pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> {
469 self.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
470 }
471
472 pub fn insert_chunk(
473 &self,
474 chunk: &DataBlob,
475 digest: &[u8; 32],
476 ) -> Result<(bool, u64), Error> {
477 self.chunk_store.insert_chunk(chunk, digest)
478 }
479
480 pub fn verify_stored_chunk(&self, digest: &[u8; 32], expected_chunk_size: u64) -> Result<(), Error> {
481 let blob = self.chunk_store.read_chunk(digest)?;
482 blob.verify_crc()?;
483 blob.verify_unencrypted(expected_chunk_size as usize, digest)?;
484 Ok(())
485 }
486
487 pub fn load_blob(&self, backup_dir: &BackupDir, filename: &str) -> Result<(DataBlob, u64), Error> {
488 let mut path = self.base_path();
489 path.push(backup_dir.relative_path());
490 path.push(filename);
491
492 let raw_data = proxmox::tools::fs::file_get_contents(&path)?;
493 let raw_size = raw_data.len() as u64;
494 let blob = DataBlob::from_raw(raw_data)?;
495 Ok((blob, raw_size))
496 }
497
498 pub fn load_manifest(
499 &self,
500 backup_dir: &BackupDir,
501 ) -> Result<(BackupManifest, CryptMode, u64), Error> {
502 let (blob, raw_size) = self.load_blob(backup_dir, MANIFEST_BLOB_NAME)?;
503 let crypt_mode = blob.crypt_mode()?;
504 let manifest = BackupManifest::try_from(blob)?;
505 Ok((manifest, crypt_mode, raw_size))
506 }
507 }