]> git.proxmox.com Git - proxmox-backup.git/blame - pbs-datastore/src/chunk_store.rs
datastore: implement sync-level tuning for datastores
[proxmox-backup.git] / pbs-datastore / src / chunk_store.rs
CommitLineData
43b13033 1use std::os::unix::io::AsRawFd;
c23192d3
WB
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex};
4
5use anyhow::{bail, format_err, Error};
128b37fe 6
647186dd 7use pbs_api_types::{DatastoreFSyncLevel, GarbageCollectionStatus};
6de1899b 8use proxmox_sys::fs::{create_dir, create_path, file_type_from_file_stat, CreateOptions};
42c2b5be
TL
9use proxmox_sys::process_locker::{
10 ProcessLockExclusiveGuard, ProcessLockSharedGuard, ProcessLocker,
11};
12use proxmox_sys::task_log;
13use proxmox_sys::WorkerTaskContext;
a92830dc 14
c23192d3 15use crate::DataBlob;
365bb90f 16
e5064ba6 17/// File system based chunk store
35cf5daa 18pub struct ChunkStore {
277fc5a3 19 name: String, // used for error reporting
42c2b5be 20 pub(crate) base: PathBuf,
35cf5daa 21 chunk_dir: PathBuf,
81b2a872 22 mutex: Mutex<()>,
6da20161 23 locker: Option<Arc<Mutex<ProcessLocker>>>,
647186dd 24 sync_level: DatastoreFSyncLevel,
128b37fe
DM
25}
26
176e4af9
DM
27// TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ?
28
36898ffc 29pub fn verify_chunk_size(size: usize) -> Result<(), Error> {
42c2b5be
TL
30 static SIZES: [usize; 7] = [
31 64 * 1024,
32 128 * 1024,
33 256 * 1024,
34 512 * 1024,
35 1024 * 1024,
36 2048 * 1024,
37 4096 * 1024,
38 ];
247cdbce
DM
39
40 if !SIZES.contains(&size) {
d4902506 41 bail!("Got unsupported chunk size '{size}'");
247cdbce
DM
42 }
43 Ok(())
44}
45
/// Returns the relative prefix directory (`"xxxx/"`) in which the chunk with `digest` lives.
///
/// The prefix is the first two digest bytes as four lowercase hex digits followed by a `/`,
/// i.e. the same `{:04x}` naming used for the subdirectories created in `ChunkStore::create`.
///
/// # Panics
///
/// Panics if `digest` is shorter than two bytes.
fn digest_to_prefix(digest: &[u8]) -> PathBuf {
    const HEX_CHARS: &[u8; 16] = b"0123456789abcdef";

    // four hex digits plus the trailing separator
    let mut prefix = String::with_capacity(2 + 2 + 1);
    for &byte in &digest[..2] {
        prefix.push(char::from(HEX_CHARS[usize::from(byte >> 4)]));
        prefix.push(char::from(HEX_CHARS[usize::from(byte & 0xf)]));
    }
    prefix.push('/');

    prefix.into()
}
61
62impl ChunkStore {
6da20161
WB
63 #[doc(hidden)]
64 pub unsafe fn panic_store() -> Self {
65 Self {
66 name: String::new(),
67 base: PathBuf::new(),
68 chunk_dir: PathBuf::new(),
69 mutex: Mutex::new(()),
70 locker: None,
647186dd 71 sync_level: Default::default(),
6da20161
WB
72 }
73 }
74
45773720 75 fn chunk_dir<P: AsRef<Path>>(path: P) -> PathBuf {
45773720
DM
76 let mut chunk_dir: PathBuf = PathBuf::from(path.as_ref());
77 chunk_dir.push(".chunks");
78
79 chunk_dir
80 }
81
c23192d3
WB
82 pub fn base(&self) -> &Path {
83 &self.base
84 }
85
42c2b5be
TL
86 pub fn create<P>(
87 name: &str,
88 path: P,
89 uid: nix::unistd::Uid,
90 gid: nix::unistd::Gid,
91 worker: Option<&dyn WorkerTaskContext>,
647186dd 92 sync_level: DatastoreFSyncLevel,
42c2b5be 93 ) -> Result<Self, Error>
afdcfb5b
WB
94 where
95 P: Into<PathBuf>,
96 {
45773720 97 let base: PathBuf = path.into();
68469eeb
DM
98
99 if !base.is_absolute() {
d4902506 100 bail!("expected absolute path - got {base:?}");
68469eeb
DM
101 }
102
45773720 103 let chunk_dir = Self::chunk_dir(&base);
35cf5daa 104
42c2b5be 105 let options = CreateOptions::new().owner(uid).group(gid);
e67770d4 106
0b97bc61
DM
107 let default_options = CreateOptions::new();
108
44288184 109 match create_path(&base, Some(default_options), Some(options.clone())) {
d4902506 110 Err(err) => bail!("unable to create chunk store '{name}' at {base:?} - {err}"),
42c2b5be
TL
111 Ok(res) => {
112 if !res {
113 nix::unistd::chown(&base, Some(uid), Some(gid))?
114 }
115 }
2989f6bf
DM
116 }
117
0b97bc61 118 if let Err(err) = create_dir(&chunk_dir, options.clone()) {
d4902506 119 bail!("unable to create chunk store '{name}' subdir {chunk_dir:?} - {err}");
2989f6bf 120 }
35cf5daa 121
7e210bd0
DM
122 // create lock file with correct owner/group
123 let lockfile_path = Self::lockfile_path(&base);
25877d05 124 proxmox_sys::fs::replace_file(lockfile_path, b"", options.clone(), false)?;
7e210bd0 125
bc616633 126 // create 64*1024 subdirs
e95950e4
DM
127 let mut last_percentage = 0;
128
42c2b5be 129 for i in 0..64 * 1024 {
af3e7d75 130 let mut l1path = chunk_dir.clone();
bc616633 131 l1path.push(format!("{:04x}", i));
0b97bc61 132 if let Err(err) = create_dir(&l1path, options.clone()) {
42c2b5be
TL
133 bail!(
134 "unable to create chunk store '{}' subdir {:?} - {}",
135 name,
136 l1path,
137 err
138 );
2989f6bf 139 }
42c2b5be 140 let percentage = (i * 100) / (64 * 1024);
bc616633 141 if percentage != last_percentage {
2de4dc3a
DC
142 if let Some(worker) = worker {
143 task_log!(worker, "Chunkstore create: {}%", percentage)
144 }
bc616633 145 last_percentage = percentage;
e95950e4 146 }
35cf5daa
DM
147 }
148
647186dd 149 Self::open(name, base, sync_level)
35cf5daa
DM
150 }
151
7e210bd0 152 fn lockfile_path<P: Into<PathBuf>>(base: P) -> PathBuf {
44288184 153 let mut lockfile_path: PathBuf = base.into();
7e210bd0 154 lockfile_path.push(".lock");
7e210bd0
DM
155 lockfile_path
156 }
157
2515ff35
TL
158 /// Opens the chunk store with a new process locker.
159 ///
160 /// Note that this must be used with care, as it's dangerous to create two instances on the
161 /// same base path, as closing the underlying ProcessLocker drops all locks from this process
162 /// on the lockfile (even if separate FDs)
647186dd
DC
163 pub(crate) fn open<P: Into<PathBuf>>(
164 name: &str,
165 base: P,
166 sync_level: DatastoreFSyncLevel,
167 ) -> Result<Self, Error> {
7e210bd0 168 let base: PathBuf = base.into();
68469eeb
DM
169
170 if !base.is_absolute() {
171 bail!("expected absolute path - got {:?}", base);
172 }
173
45773720
DM
174 let chunk_dir = Self::chunk_dir(&base);
175
ce55dbbc 176 if let Err(err) = std::fs::metadata(&chunk_dir) {
d4902506 177 bail!("unable to open chunk store '{name}' at {chunk_dir:?} - {err}");
ce55dbbc 178 }
45773720 179
7e210bd0 180 let lockfile_path = Self::lockfile_path(&base);
45773720 181
c23192d3 182 let locker = ProcessLocker::new(&lockfile_path)?;
35cf5daa 183
b8d4766a 184 Ok(ChunkStore {
277fc5a3 185 name: name.to_owned(),
b8d4766a
DM
186 base,
187 chunk_dir,
6da20161 188 locker: Some(locker),
42c2b5be 189 mutex: Mutex::new(()),
647186dd 190 sync_level,
b8d4766a 191 })
35cf5daa
DM
192 }
193
a660978c 194 pub fn touch_chunk(&self, digest: &[u8; 32]) -> Result<(), Error> {
6da20161
WB
195 // unwrap: only `None` in unit tests
196 assert!(self.locker.is_some());
197
2585a8a4
DM
198 self.cond_touch_chunk(digest, true)?;
199 Ok(())
200 }
201
b298e9f1 202 pub fn cond_touch_chunk(&self, digest: &[u8; 32], assert_exists: bool) -> Result<bool, Error> {
6da20161
WB
203 // unwrap: only `None` in unit tests
204 assert!(self.locker.is_some());
205
a660978c 206 let (chunk_path, _digest_str) = self.chunk_path(digest);
b298e9f1 207 self.cond_touch_path(&chunk_path, assert_exists)
fd192564 208 }
3d5c11e5 209
b298e9f1 210 pub fn cond_touch_path(&self, path: &Path, assert_exists: bool) -> Result<bool, Error> {
6da20161
WB
211 // unwrap: only `None` in unit tests
212 assert!(self.locker.is_some());
213
07ce44a6
DM
214 const UTIME_NOW: i64 = (1 << 30) - 1;
215 const UTIME_OMIT: i64 = (1 << 30) - 2;
7ee2aa1b 216
a198d74f 217 let times: [libc::timespec; 2] = [
42c2b5be
TL
218 libc::timespec {
219 tv_sec: 0,
220 tv_nsec: UTIME_NOW,
221 },
222 libc::timespec {
223 tv_sec: 0,
224 tv_nsec: UTIME_OMIT,
225 },
7ee2aa1b
DM
226 ];
227
228 use nix::NixPath;
229
fd192564 230 let res = path.with_nix_path(|cstr| unsafe {
2585a8a4
DM
231 let tmp = libc::utimensat(-1, cstr.as_ptr(), &times[0], libc::AT_SYMLINK_NOFOLLOW);
232 nix::errno::Errno::result(tmp)
7ee2aa1b
DM
233 })?;
234
2585a8a4 235 if let Err(err) = res {
11ca8343 236 if !assert_exists && err == nix::errno::Errno::ENOENT {
2585a8a4
DM
237 return Ok(false);
238 }
d4902506 239 bail!("update atime failed for chunk/file {path:?} - {err}");
7ee2aa1b
DM
240 }
241
2585a8a4 242 Ok(true)
3d5c11e5
DM
243 }
244
9739aca4
WB
245 pub fn get_chunk_iterator(
246 &self,
247 ) -> Result<
42c2b5be
TL
248 impl Iterator<Item = (Result<proxmox_sys::fs::ReadDirEntry, Error>, usize, bool)>
249 + std::iter::FusedIterator,
250 Error,
9739aca4 251 > {
6da20161
WB
252 // unwrap: only `None` in unit tests
253 assert!(self.locker.is_some());
254
9739aca4
WB
255 use nix::dir::Dir;
256 use nix::fcntl::OFlag;
257 use nix::sys::stat::Mode;
258
42c2b5be
TL
259 let base_handle =
260 Dir::open(&self.chunk_dir, OFlag::O_RDONLY, Mode::empty()).map_err(|err| {
a24e3993 261 format_err!(
d4902506 262 "unable to open store '{}' chunk dir {:?} - {err}",
a24e3993
DM
263 self.name,
264 self.chunk_dir,
a24e3993
DM
265 )
266 })?;
9739aca4 267
a3f3e91d 268 let mut done = false;
25877d05 269 let mut inner: Option<proxmox_sys::fs::ReadDir> = None;
a3f3e91d
WB
270 let mut at = 0;
271 let mut percentage = 0;
272 Ok(std::iter::from_fn(move || {
273 if done {
274 return None;
275 }
276
277 loop {
278 if let Some(ref mut inner) = inner {
279 match inner.next() {
280 Some(Ok(entry)) => {
281 // skip files if they're not a hash
282 let bytes = entry.file_name().to_bytes();
a9767cf7 283 if bytes.len() != 64 && bytes.len() != 64 + ".0.bad".len() {
a3f3e91d
WB
284 continue;
285 }
a9767cf7 286 if !bytes.iter().take(64).all(u8::is_ascii_hexdigit) {
a3f3e91d
WB
287 continue;
288 }
a9767cf7 289
d8d8af98 290 let bad = bytes.ends_with(b".bad");
a9767cf7 291 return Some((Ok(entry), percentage, bad));
a3f3e91d
WB
292 }
293 Some(Err(err)) => {
294 // stop after first error
295 done = true;
296 // and pass the error through:
a9767cf7 297 return Some((Err(err), percentage, false));
a3f3e91d
WB
298 }
299 None => (), // open next directory
c7f481b6 300 }
c7f481b6 301 }
a3f3e91d
WB
302
303 inner = None;
304
305 if at == 0x10000 {
306 done = true;
307 return None;
308 }
309
310 let subdir: &str = &format!("{:04x}", at);
311 percentage = (at * 100) / 0x10000;
312 at += 1;
25877d05 313 match proxmox_sys::fs::read_subdir(base_handle.as_raw_fd(), subdir) {
a3f3e91d
WB
314 Ok(dir) => {
315 inner = Some(dir);
316 // start reading:
317 continue;
318 }
11ca8343 319 Err(ref err) if err == &nix::errno::Errno::ENOENT => {
a3f3e91d
WB
320 // non-existing directories are okay, just keep going:
321 continue;
322 }
323 Err(err) => {
324 // other errors are fatal, so end our iteration
325 done = true;
326 // and pass the error through:
42c2b5be 327 return Some((
d4902506 328 Err(format_err!("unable to read subdir '{subdir}' - {err}")),
42c2b5be
TL
329 percentage,
330 false,
331 ));
a3f3e91d 332 }
62f2422f
WB
333 }
334 }
42c2b5be
TL
335 })
336 .fuse())
c7f481b6
WB
337 }
338
11861a48 339 pub fn oldest_writer(&self) -> Option<i64> {
6da20161
WB
340 // unwrap: only `None` in unit tests
341 ProcessLocker::oldest_shared_lock(self.locker.clone().unwrap())
11861a48
DM
342 }
343
0fd55b08 344 pub fn sweep_unused_chunks(
11861a48 345 &self,
cf459b19 346 oldest_writer: i64,
d3d566f7 347 phase1_start_time: i64,
a5736098 348 status: &mut GarbageCollectionStatus,
c8449217 349 worker: &dyn WorkerTaskContext,
11861a48 350 ) -> Result<(), Error> {
6da20161
WB
351 // unwrap: only `None` in unit tests
352 assert!(self.locker.is_some());
353
9349d2a1 354 use nix::sys::stat::fstatat;
a9767cf7 355 use nix::unistd::{unlinkat, UnlinkatFlags};
08481a0b 356
42c2b5be 357 let mut min_atime = phase1_start_time - 3600 * 24; // at least 24h (see mount option relatime)
11861a48 358
cf459b19
DM
359 if oldest_writer < min_atime {
360 min_atime = oldest_writer;
11861a48
DM
361 }
362
363 min_atime -= 300; // add 5 mins gap for safety
364
a5736098 365 let mut last_percentage = 0;
e4c2fbf1
DM
366 let mut chunk_count = 0;
367
a9767cf7 368 for (entry, percentage, bad) in self.get_chunk_iterator()? {
a5736098
DM
369 if last_percentage != percentage {
370 last_percentage = percentage;
42c2b5be 371 task_log!(worker, "processed {}% ({} chunks)", percentage, chunk_count,);
a5736098 372 }
92da93b2 373
f6b1d1cc 374 worker.check_abort()?;
0fd55b08 375 worker.fail_on_shutdown()?;
92da93b2 376
fdd71f52 377 let (dirfd, entry) = match entry {
c7f481b6 378 Ok(entry) => (entry.parent_fd(), entry),
42c2b5be 379 Err(err) => bail!(
d4902506 380 "chunk iterator on chunk store '{}' failed - {err}",
42c2b5be 381 self.name,
42c2b5be 382 ),
eae8aa3a 383 };
fdd71f52 384
d85987ae
DM
385 let filename = entry.file_name();
386
15a77c4c
DM
387 let lock = self.mutex.lock();
388
9349d2a1 389 if let Ok(stat) = fstatat(dirfd, filename, nix::fcntl::AtFlags::AT_SYMLINK_NOFOLLOW) {
6de1899b
DC
390 let file_type = file_type_from_file_stat(&stat);
391 if file_type != Some(nix::dir::Type::File) {
392 drop(lock);
393 continue;
394 }
395
396 chunk_count += 1;
397
fd192564 398 if stat.st_atime < min_atime {
cf459b19
DM
399 //let age = now - stat.st_atime;
400 //println!("UNLINK {} {:?}", age/(3600*24), filename);
a9767cf7 401 if let Err(err) = unlinkat(Some(dirfd), filename, UnlinkatFlags::NoRemoveDir) {
fd192564
SR
402 if bad {
403 status.still_bad += 1;
404 }
9349d2a1 405 bail!(
d4902506 406 "unlinking chunk {filename:?} failed on store '{}' - {err}",
9349d2a1 407 self.name,
9349d2a1 408 );
277fc5a3 409 }
fd192564
SR
410 if bad {
411 status.removed_bad += 1;
412 } else {
413 status.removed_chunks += 1;
414 }
a660978c 415 status.removed_bytes += stat.st_size as u64;
8db14689 416 } else if stat.st_atime < oldest_writer {
fd192564
SR
417 if bad {
418 status.still_bad += 1;
419 } else {
420 status.pending_chunks += 1;
421 }
8db14689 422 status.pending_bytes += stat.st_size as u64;
cf459b19 423 } else {
fd192564
SR
424 if !bad {
425 status.disk_chunks += 1;
426 }
8db14689 427 status.disk_bytes += stat.st_size as u64;
08481a0b 428 }
eae8aa3a 429 }
15a77c4c 430 drop(lock);
eae8aa3a 431 }
a5736098 432
277fc5a3 433 Ok(())
08481a0b
DM
434 }
435
42c2b5be 436 pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> {
6da20161
WB
437 // unwrap: only `None` in unit tests
438 assert!(self.locker.is_some());
439
25877d05 440 //println!("DIGEST {}", hex::encode(digest));
128b37fe 441
81a6ce6f 442 let (chunk_path, digest_str) = self.chunk_path(digest);
c5d82e5f
DM
443
444 let lock = self.mutex.lock();
445
dd612daa
DC
446 let raw_data = chunk.raw_data();
447 let encoded_size = raw_data.len() as u64;
448
d4902506
TL
449 let name = &self.name;
450
c5d82e5f 451 if let Ok(metadata) = std::fs::metadata(&chunk_path) {
dd612daa 452 if !metadata.is_file() {
d4902506 453 bail!("got unexpected file type on store '{name}' for chunk {digest_str}");
c5d82e5f 454 }
dd612daa
DC
455 let old_size = metadata.len();
456 if encoded_size == old_size {
457 self.touch_chunk(digest)?;
458 return Ok((true, old_size));
459 } else if old_size == 0 {
d4902506 460 log::warn!("found empty chunk '{digest_str}' in store {name}, overwriting");
dd612daa 461 } else {
2a088b99
TL
462 // other sizes can happen in legitimate and illegitimate ways:
463 // - illegitimate: encryped chunks and bad actor client
464 // - legitimate: same chunk but newer zstd version (or compression level) can
465 // compress it better (or worse) so the
466 // Ideally we could take the actual smaller chunk so that improved zstd tech gets
467 // leveraged, but we could only allow to do that for un-encrypted ones.
dd612daa 468 }
c5d82e5f 469 }
128b37fe 470
647186dd
DC
471 let chunk_dir_path = chunk_path
472 .parent()
473 .ok_or_else(|| format_err!("unable to get chunk dir"))?
474 .to_owned();
475
476 proxmox_sys::fs::replace_file(
477 chunk_path,
478 raw_data,
479 CreateOptions::new(),
480 self.sync_level == DatastoreFSyncLevel::File,
481 )
482 .map_err(|err| {
483 format_err!("inserting chunk on store '{name}' failed for {digest_str} - {err}")
484 })?;
485
486 if self.sync_level == DatastoreFSyncLevel::File {
487 // fsync dir handle to persist the tmp rename
488 let dir = std::fs::File::open(chunk_dir_path)?;
489 nix::unistd::fsync(dir.as_raw_fd())
490 .map_err(|err| format_err!("fsync failed: {err}"))?;
491 }
c5d82e5f 492
c5d82e5f
DM
493 drop(lock);
494
f98ac774 495 Ok((false, encoded_size))
128b37fe
DM
496 }
497
42c2b5be 498 pub fn chunk_path(&self, digest: &[u8; 32]) -> (PathBuf, String) {
6da20161
WB
499 // unwrap: only `None` in unit tests
500 assert!(self.locker.is_some());
501
81a6ce6f
DM
502 let mut chunk_path = self.chunk_dir.clone();
503 let prefix = digest_to_prefix(digest);
504 chunk_path.push(&prefix);
25877d05 505 let digest_str = hex::encode(digest);
81a6ce6f
DM
506 chunk_path.push(&digest_str);
507 (chunk_path, digest_str)
508 }
509
606ce64b 510 pub fn relative_path(&self, path: &Path) -> PathBuf {
6da20161
WB
511 // unwrap: only `None` in unit tests
512 assert!(self.locker.is_some());
513
606ce64b
DM
514 let mut full_path = self.base.clone();
515 full_path.push(path);
516 full_path
517 }
518
92c3fd2e
DM
519 pub fn name(&self) -> &str {
520 &self.name
521 }
522
3d5c11e5 523 pub fn base_path(&self) -> PathBuf {
6da20161
WB
524 // unwrap: only `None` in unit tests
525 assert!(self.locker.is_some());
526
3d5c11e5
DM
527 self.base.clone()
528 }
43b13033 529
d5790a9f 530 pub fn try_shared_lock(&self) -> Result<ProcessLockSharedGuard, Error> {
6da20161
WB
531 // unwrap: only `None` in unit tests
532 ProcessLocker::try_shared_lock(self.locker.clone().unwrap())
43b13033
DM
533 }
534
d5790a9f 535 pub fn try_exclusive_lock(&self) -> Result<ProcessLockExclusiveGuard, Error> {
6da20161
WB
536 // unwrap: only `None` in unit tests
537 ProcessLocker::try_exclusive_lock(self.locker.clone().unwrap())
43b13033 538 }
35cf5daa
DM
539}
540
35cf5daa
DM
541#[test]
542fn test_chunk_store1() {
d1d74c43 543 let mut path = std::fs::canonicalize(".").unwrap(); // we need absolute path
332dcc22
DM
544 path.push(".testdir");
545
35cf5daa
DM
546 if let Err(_e) = std::fs::remove_dir_all(".testdir") { /* ignore */ }
547
647186dd 548 let chunk_store = ChunkStore::open("test", &path, DatastoreFSyncLevel::None);
35cf5daa
DM
549 assert!(chunk_store.is_err());
550
42c2b5be
TL
551 let user = nix::unistd::User::from_uid(nix::unistd::Uid::current())
552 .unwrap()
553 .unwrap();
647186dd
DC
554 let chunk_store = ChunkStore::create(
555 "test",
556 &path,
557 user.uid,
558 user.gid,
559 None,
560 DatastoreFSyncLevel::None,
561 )
562 .unwrap();
f98ac774 563
42c2b5be
TL
564 let (chunk, digest) = crate::data_blob::DataChunkBuilder::new(&[0u8, 1u8])
565 .build()
566 .unwrap();
f98ac774 567
4ee8f53d 568 let (exists, _) = chunk_store.insert_chunk(&chunk, &digest).unwrap();
391a2e43
DM
569 assert!(!exists);
570
4ee8f53d 571 let (exists, _) = chunk_store.insert_chunk(&chunk, &digest).unwrap();
391a2e43 572 assert!(exists);
128b37fe 573
647186dd
DC
574 let chunk_store = ChunkStore::create(
575 "test",
576 &path,
577 user.uid,
578 user.gid,
579 None,
580 DatastoreFSyncLevel::None,
581 );
35cf5daa
DM
582 assert!(chunk_store.is_err());
583
e0a5d1ca 584 if let Err(_e) = std::fs::remove_dir_all(".testdir") { /* ignore */ }
35cf5daa 585}