]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/datastore.rs
src/api2/backup.rs: aquire backup lock earlier in create_locked_backup_group()
[proxmox-backup.git] / src / backup / datastore.rs
1 use std::collections::{HashSet, HashMap};
2 use std::io::{self, Write};
3 use std::path::{Path, PathBuf};
4 use std::sync::{Arc, Mutex};
5 use std::convert::TryFrom;
6
7 use anyhow::{bail, format_err, Error};
8 use lazy_static::lazy_static;
9 use chrono::{DateTime, Utc};
10
11 use super::backup_info::{BackupGroup, BackupGroupGuard, BackupDir, BackupInfo};
12 use super::chunk_store::ChunkStore;
13 use super::dynamic_index::{DynamicIndexReader, DynamicIndexWriter};
14 use super::fixed_index::{FixedIndexReader, FixedIndexWriter};
15 use super::manifest::{MANIFEST_BLOB_NAME, CLIENT_LOG_BLOB_NAME, BackupManifest};
16 use super::index::*;
17 use super::{DataBlob, ArchiveType, archive_type};
18 use crate::backup::CryptMode;
19 use crate::config::datastore;
20 use crate::server::WorkerTask;
21 use crate::tools;
22 use crate::api2::types::GarbageCollectionStatus;
23
lazy_static! {
    // Process-wide cache of opened datastores, keyed by datastore name.
    // `DataStore::lookup_datastore()` reuses a cached entry as long as the
    // configured path is unchanged, and replaces it otherwise.
    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
}
27
/// Datastore Management
///
/// A Datastore can store several backups, and provides the
/// management interface for backup.
pub struct DataStore {
    // Shared handle to the underlying content-addressed chunk storage.
    chunk_store: Arc<ChunkStore>,
    // Acquired via try_lock() while garbage collection runs, so at most one
    // GC can be active per DataStore instance (boolean payload is unused).
    gc_mutex: Mutex<bool>,
    // Result of the most recent garbage collection run (returned by
    // last_gc_status()).
    last_gc_status: Mutex<GarbageCollectionStatus>,
}
37
38 impl DataStore {
39
40 pub fn lookup_datastore(name: &str) -> Result<Arc<DataStore>, Error> {
41
42 let (config, _digest) = datastore::config()?;
43 let config: datastore::DataStoreConfig = config.lookup("datastore", name)?;
44
45 let mut map = DATASTORE_MAP.lock().unwrap();
46
47 if let Some(datastore) = map.get(name) {
48 // Compare Config - if changed, create new Datastore object!
49 if datastore.chunk_store.base == PathBuf::from(&config.path) {
50 return Ok(datastore.clone());
51 }
52 }
53
54 let datastore = DataStore::open(name)?;
55
56 let datastore = Arc::new(datastore);
57 map.insert(name.to_string(), datastore.clone());
58
59 Ok(datastore)
60 }
61
62 pub fn open(store_name: &str) -> Result<Self, Error> {
63
64 let (config, _digest) = datastore::config()?;
65 let (_, store_config) = config.sections.get(store_name)
66 .ok_or(format_err!("no such datastore '{}'", store_name))?;
67
68 let path = store_config["path"].as_str().unwrap();
69
70 let chunk_store = ChunkStore::open(store_name, path)?;
71
72 let gc_status = GarbageCollectionStatus::default();
73
74 Ok(Self {
75 chunk_store: Arc::new(chunk_store),
76 gc_mutex: Mutex::new(false),
77 last_gc_status: Mutex::new(gc_status),
78 })
79 }
80
81 pub fn get_chunk_iterator(
82 &self,
83 ) -> Result<
84 impl Iterator<Item = (Result<tools::fs::ReadDirEntry, Error>, usize)>,
85 Error
86 > {
87 self.chunk_store.get_chunk_iterator()
88 }
89
90 pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {
91
92 let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
93
94 Ok(index)
95 }
96
97 pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {
98
99 let full_path = self.chunk_store.relative_path(filename.as_ref());
100
101 let index = FixedIndexReader::open(&full_path)?;
102
103 Ok(index)
104 }
105
106 pub fn create_dynamic_writer<P: AsRef<Path>>(
107 &self, filename: P,
108 ) -> Result<DynamicIndexWriter, Error> {
109
110 let index = DynamicIndexWriter::create(
111 self.chunk_store.clone(), filename.as_ref())?;
112
113 Ok(index)
114 }
115
116 pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {
117
118 let full_path = self.chunk_store.relative_path(filename.as_ref());
119
120 let index = DynamicIndexReader::open(&full_path)?;
121
122 Ok(index)
123 }
124
125 pub fn open_index<P>(&self, filename: P) -> Result<Box<dyn IndexFile + Send>, Error>
126 where
127 P: AsRef<Path>,
128 {
129 let filename = filename.as_ref();
130 let out: Box<dyn IndexFile + Send> =
131 match archive_type(filename)? {
132 ArchiveType::DynamicIndex => Box::new(self.open_dynamic_reader(filename)?),
133 ArchiveType::FixedIndex => Box::new(self.open_fixed_reader(filename)?),
134 _ => bail!("cannot open index file of unknown type: {:?}", filename),
135 };
136 Ok(out)
137 }
138
139 pub fn name(&self) -> &str {
140 self.chunk_store.name()
141 }
142
143 pub fn base_path(&self) -> PathBuf {
144 self.chunk_store.base_path()
145 }
146
147 /// Cleanup a backup directory
148 ///
149 /// Removes all files not mentioned in the manifest.
150 pub fn cleanup_backup_dir(&self, backup_dir: &BackupDir, manifest: &BackupManifest
151 ) -> Result<(), Error> {
152
153 let mut full_path = self.base_path();
154 full_path.push(backup_dir.relative_path());
155
156 let mut wanted_files = HashSet::new();
157 wanted_files.insert(MANIFEST_BLOB_NAME.to_string());
158 wanted_files.insert(CLIENT_LOG_BLOB_NAME.to_string());
159 manifest.files().iter().for_each(|item| { wanted_files.insert(item.filename.clone()); });
160
161 for item in tools::fs::read_subdir(libc::AT_FDCWD, &full_path)? {
162 if let Ok(item) = item {
163 if let Some(file_type) = item.file_type() {
164 if file_type != nix::dir::Type::File { continue; }
165 }
166 let file_name = item.file_name().to_bytes();
167 if file_name == b"." || file_name == b".." { continue; };
168
169 if let Ok(name) = std::str::from_utf8(file_name) {
170 if wanted_files.contains(name) { continue; }
171 }
172 println!("remove unused file {:?}", item.file_name());
173 let dirfd = item.parent_fd();
174 let _res = unsafe { libc::unlinkat(dirfd, item.file_name().as_ptr(), 0) };
175 }
176 }
177
178 Ok(())
179 }
180
181 /// Returns the absolute path for a backup_group
182 pub fn group_path(&self, backup_group: &BackupGroup) -> PathBuf {
183 let mut full_path = self.base_path();
184 full_path.push(backup_group.group_path());
185 full_path
186 }
187
188 /// Returns the absolute path for backup_dir
189 pub fn snapshot_path(&self, backup_dir: &BackupDir) -> PathBuf {
190 let mut full_path = self.base_path();
191 full_path.push(backup_dir.relative_path());
192 full_path
193 }
194
195 /// Remove a complete backup group including all snapshots
196 pub fn remove_backup_group(&self, backup_group: &BackupGroup) -> Result<(), Error> {
197
198 let full_path = self.group_path(backup_group);
199
200 let mut snap_list = backup_group.list_backups(&self.base_path())?;
201 BackupInfo::sort_list(&mut snap_list, false);
202 for snap in snap_list {
203 if snap.is_finished() {
204 break;
205 } else {
206 bail!(
207 "cannot remove backup group {:?}, contains potentially running backup: {}",
208 full_path,
209 snap.backup_dir
210 );
211 }
212 }
213
214 log::info!("removing backup group {:?}", full_path);
215 std::fs::remove_dir_all(&full_path)
216 .map_err(|err| {
217 format_err!(
218 "removing backup group {:?} failed - {}",
219 full_path,
220 err,
221 )
222 })?;
223
224 Ok(())
225 }
226
227 /// Remove a backup directory including all content
228 pub fn remove_backup_dir(&self, backup_dir: &BackupDir, force: bool) -> Result<(), Error> {
229
230 let full_path = self.snapshot_path(backup_dir);
231
232 if !force {
233 let mut snap_list = backup_dir.group().list_backups(&self.base_path())?;
234 BackupInfo::sort_list(&mut snap_list, false);
235 let mut prev_snap_finished = true;
236 for snap in snap_list {
237 let cur_snap_finished = snap.is_finished();
238 if &snap.backup_dir == backup_dir {
239 if !cur_snap_finished {
240 bail!(
241 "cannot remove currently running snapshot: {:?}",
242 backup_dir
243 );
244 }
245 if !prev_snap_finished {
246 bail!(
247 "cannot remove snapshot {:?}, successor is currently running and potentially based on it",
248 backup_dir
249 );
250 }
251 break;
252 }
253 prev_snap_finished = cur_snap_finished;
254 }
255 }
256
257 log::info!("removing backup snapshot {:?}", full_path);
258 std::fs::remove_dir_all(&full_path)
259 .map_err(|err| {
260 format_err!(
261 "removing backup snapshot {:?} failed - {}",
262 full_path,
263 err,
264 )
265 })?;
266
267 Ok(())
268 }
269
270 /// Returns the time of the last successful backup
271 ///
272 /// Or None if there is no backup in the group (or the group dir does not exist).
273 pub fn last_successful_backup(&self, backup_group: &BackupGroup) -> Result<Option<DateTime<Utc>>, Error> {
274 let base_path = self.base_path();
275 let mut group_path = base_path.clone();
276 group_path.push(backup_group.group_path());
277
278 if group_path.exists() {
279 backup_group.last_successful_backup(&base_path)
280 } else {
281 Ok(None)
282 }
283 }
284
285 /// Returns the backup owner.
286 ///
287 /// The backup owner is the user who first created the backup group.
288 pub fn get_owner(&self, backup_group: &BackupGroup) -> Result<String, Error> {
289 let mut full_path = self.base_path();
290 full_path.push(backup_group.group_path());
291 full_path.push("owner");
292 let owner = proxmox::tools::fs::file_read_firstline(full_path)?;
293 Ok(owner.trim_end().to_string()) // remove trailing newline
294 }
295
296 /// Set the backup owner.
297 pub fn set_owner(&self, backup_group: &BackupGroup, userid: &str, force: bool) -> Result<(), Error> {
298 let mut path = self.base_path();
299 path.push(backup_group.group_path());
300 path.push("owner");
301
302 let mut open_options = std::fs::OpenOptions::new();
303 open_options.write(true);
304 open_options.truncate(true);
305
306 if force {
307 open_options.create(true);
308 } else {
309 open_options.create_new(true);
310 }
311
312 let mut file = open_options.open(&path)
313 .map_err(|err| format_err!("unable to create owner file {:?} - {}", path, err))?;
314
315 write!(file, "{}\n", userid)
316 .map_err(|err| format_err!("unable to write owner file {:?} - {}", path, err))?;
317
318 Ok(())
319 }
320
321 /// Create (if it does not already exists) and lock a backup group
322 ///
323 /// And set the owner to 'userid'. If the group already exists, it returns the
324 /// current owner (instead of setting the owner).
325 ///
326 /// This also aquires an exclusive lock on the directory and returns the lock guard.
327 pub fn create_locked_backup_group(&self, backup_group: &BackupGroup, userid: &str) -> Result<(String, BackupGroupGuard), Error> {
328
329 // create intermediate path first:
330 let base_path = self.base_path();
331
332 let mut full_path = base_path.clone();
333 full_path.push(backup_group.backup_type());
334 std::fs::create_dir_all(&full_path)?;
335
336 full_path.push(backup_group.backup_id());
337
338 // create the last component now
339 match std::fs::create_dir(&full_path) {
340 Ok(_) => {
341 let guard = backup_group.lock(&base_path)?;
342 self.set_owner(backup_group, userid, false)?;
343 let owner = self.get_owner(backup_group)?; // just to be sure
344 Ok((owner, guard))
345 }
346 Err(ref err) if err.kind() == io::ErrorKind::AlreadyExists => {
347 let guard = backup_group.lock(&base_path)?;
348 let owner = self.get_owner(backup_group)?; // just to be sure
349 Ok((owner, guard))
350 }
351 Err(err) => bail!("unable to create backup group {:?} - {}", full_path, err),
352 }
353 }
354
355 /// Creates a new backup snapshot inside a BackupGroup
356 ///
357 /// The BackupGroup directory needs to exist.
358 pub fn create_backup_dir(&self, backup_dir: &BackupDir) -> Result<(PathBuf, bool), io::Error> {
359 let relative_path = backup_dir.relative_path();
360 let mut full_path = self.base_path();
361 full_path.push(&relative_path);
362
363 match std::fs::create_dir(&full_path) {
364 Ok(_) => Ok((relative_path, true)),
365 Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => Ok((relative_path, false)),
366 Err(e) => Err(e)
367 }
368 }
369
370 pub fn list_images(&self) -> Result<Vec<PathBuf>, Error> {
371 let base = self.base_path();
372
373 let mut list = vec![];
374
375 use walkdir::WalkDir;
376
377 let walker = WalkDir::new(&base).same_file_system(true).into_iter();
378
379 // make sure we skip .chunks (and other hidden files to keep it simple)
380 fn is_hidden(entry: &walkdir::DirEntry) -> bool {
381 entry.file_name()
382 .to_str()
383 .map(|s| s.starts_with("."))
384 .unwrap_or(false)
385 }
386 let handle_entry_err = |err: walkdir::Error| {
387 if let Some(inner) = err.io_error() {
388 let path = err.path().unwrap_or(Path::new(""));
389 match inner.kind() {
390 io::ErrorKind::PermissionDenied => {
391 // only allow to skip ext4 fsck directory, avoid GC if, for example,
392 // a user got file permissions wrong on datastore rsync to new server
393 if err.depth() > 1 || !path.ends_with("lost+found") {
394 bail!("cannot continue garbage-collection safely, permission denied on: {}", path.display())
395 }
396 },
397 _ => bail!("unexpected error on datastore traversal: {} - {}", inner, path.display()),
398 }
399 }
400 Ok(())
401 };
402 for entry in walker.filter_entry(|e| !is_hidden(e)) {
403 let path = match entry {
404 Ok(entry) => entry.into_path(),
405 Err(err) => {
406 handle_entry_err(err)?;
407 continue
408 },
409 };
410 if let Ok(archive_type) = archive_type(&path) {
411 if archive_type == ArchiveType::FixedIndex || archive_type == ArchiveType::DynamicIndex {
412 list.push(path);
413 }
414 }
415 }
416
417 Ok(list)
418 }
419
420 // mark chunks used by ``index`` as used
421 fn index_mark_used_chunks<I: IndexFile>(
422 &self,
423 index: I,
424 file_name: &Path, // only used for error reporting
425 status: &mut GarbageCollectionStatus,
426 worker: &WorkerTask,
427 ) -> Result<(), Error> {
428
429 status.index_file_count += 1;
430 status.index_data_bytes += index.index_bytes();
431
432 for pos in 0..index.index_count() {
433 worker.fail_on_abort()?;
434 tools::fail_on_shutdown()?;
435 let digest = index.index_digest(pos).unwrap();
436 if let Err(err) = self.chunk_store.touch_chunk(digest) {
437 bail!("unable to access chunk {}, required by {:?} - {}",
438 proxmox::tools::digest_to_hex(digest), file_name, err);
439 }
440 }
441 Ok(())
442 }
443
444 fn mark_used_chunks(&self, status: &mut GarbageCollectionStatus, worker: &WorkerTask) -> Result<(), Error> {
445
446 let image_list = self.list_images()?;
447
448 for path in image_list {
449
450 worker.fail_on_abort()?;
451 tools::fail_on_shutdown()?;
452
453 if let Ok(archive_type) = archive_type(&path) {
454 if archive_type == ArchiveType::FixedIndex {
455 let index = self.open_fixed_reader(&path)?;
456 self.index_mark_used_chunks(index, &path, status, worker)?;
457 } else if archive_type == ArchiveType::DynamicIndex {
458 let index = self.open_dynamic_reader(&path)?;
459 self.index_mark_used_chunks(index, &path, status, worker)?;
460 }
461 }
462 }
463
464 Ok(())
465 }
466
467 pub fn last_gc_status(&self) -> GarbageCollectionStatus {
468 self.last_gc_status.lock().unwrap().clone()
469 }
470
471 pub fn garbage_collection_running(&self) -> bool {
472 if let Ok(_) = self.gc_mutex.try_lock() { false } else { true }
473 }
474
475 pub fn garbage_collection(&self, worker: &WorkerTask) -> Result<(), Error> {
476
477 if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
478
479 let _exclusive_lock = self.chunk_store.try_exclusive_lock()?;
480
481 let now = unsafe { libc::time(std::ptr::null_mut()) };
482
483 let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(now);
484
485 let mut gc_status = GarbageCollectionStatus::default();
486 gc_status.upid = Some(worker.to_string());
487
488 worker.log("Start GC phase1 (mark used chunks)");
489
490 self.mark_used_chunks(&mut gc_status, &worker)?;
491
492 worker.log("Start GC phase2 (sweep unused chunks)");
493 self.chunk_store.sweep_unused_chunks(oldest_writer, &mut gc_status, &worker)?;
494
495 worker.log(&format!("Removed bytes: {}", gc_status.removed_bytes));
496 worker.log(&format!("Removed chunks: {}", gc_status.removed_chunks));
497 if gc_status.pending_bytes > 0 {
498 worker.log(&format!("Pending removals: {} bytes ({} chunks)", gc_status.pending_bytes, gc_status.pending_chunks));
499 }
500
501 worker.log(&format!("Original data bytes: {}", gc_status.index_data_bytes));
502
503 if gc_status.index_data_bytes > 0 {
504 let comp_per = (gc_status.disk_bytes*100)/gc_status.index_data_bytes;
505 worker.log(&format!("Disk bytes: {} ({} %)", gc_status.disk_bytes, comp_per));
506 }
507
508 worker.log(&format!("Disk chunks: {}", gc_status.disk_chunks));
509
510 if gc_status.disk_chunks > 0 {
511 let avg_chunk = gc_status.disk_bytes/(gc_status.disk_chunks as u64);
512 worker.log(&format!("Average chunk size: {}", avg_chunk));
513 }
514
515 *self.last_gc_status.lock().unwrap() = gc_status;
516
517 } else {
518 bail!("Start GC failed - (already running/locked)");
519 }
520
521 Ok(())
522 }
523
524 pub fn try_shared_chunk_store_lock(&self) -> Result<tools::ProcessLockSharedGuard, Error> {
525 self.chunk_store.try_shared_lock()
526 }
527
528 pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) {
529 self.chunk_store.chunk_path(digest)
530 }
531
532 pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> {
533 self.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
534 }
535
536 pub fn insert_chunk(
537 &self,
538 chunk: &DataBlob,
539 digest: &[u8; 32],
540 ) -> Result<(bool, u64), Error> {
541 self.chunk_store.insert_chunk(chunk, digest)
542 }
543
544 pub fn verify_stored_chunk(&self, digest: &[u8; 32], expected_chunk_size: u64) -> Result<(), Error> {
545 let blob = self.load_chunk(digest)?;
546 blob.verify_unencrypted(expected_chunk_size as usize, digest)?;
547 Ok(())
548 }
549
550 pub fn load_blob(&self, backup_dir: &BackupDir, filename: &str) -> Result<DataBlob, Error> {
551 let mut path = self.base_path();
552 path.push(backup_dir.relative_path());
553 path.push(filename);
554
555 proxmox::try_block!({
556 let mut file = std::fs::File::open(&path)?;
557 DataBlob::load_from_reader(&mut file)
558 }).map_err(|err| format_err!("unable to load blob '{:?}' - {}", path, err))
559 }
560
561 pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
562
563 let (chunk_path, digest_str) = self.chunk_store.chunk_path(digest);
564
565 proxmox::try_block!({
566 let mut file = std::fs::File::open(&chunk_path)?;
567 DataBlob::load_from_reader(&mut file)
568 }).map_err(|err| format_err!(
569 "store '{}', unable to load chunk '{}' - {}",
570 self.name(),
571 digest_str,
572 err,
573 ))
574 }
575
576 pub fn load_manifest(
577 &self,
578 backup_dir: &BackupDir,
579 ) -> Result<(BackupManifest, CryptMode, u64), Error> {
580 let blob = self.load_blob(backup_dir, MANIFEST_BLOB_NAME)?;
581 let raw_size = blob.raw_size();
582 let crypt_mode = blob.crypt_mode()?;
583 let manifest = BackupManifest::try_from(blob)?;
584 Ok((manifest, crypt_mode, raw_size))
585 }
586 }