]> git.proxmox.com Git - proxmox-backup.git/blame - src/backup/chunk_store.rs
backup: add optional progress callback to ChunkIterator
[proxmox-backup.git] / src / backup / chunk_store.rs
CommitLineData
35cf5daa
DM
1use failure::*;
2use std::path::{Path, PathBuf};
128b37fe 3use std::io::Write;
1628a4c7 4use std::time::Duration;
128b37fe 5
7b2b40a8 6use openssl::sha;
c5d82e5f
DM
7use std::sync::Mutex;
8
365bb90f 9use std::fs::File;
4070096c 10use std::os::unix::io::{AsRawFd, RawFd};
35cf5daa 11
365bb90f 12use crate::tools;
4070096c 13use crate::tools::borrow::Tied;
365bb90f 14
64e53b28
DM
15pub struct GarbageCollectionStatus {
16 pub used_bytes: usize,
17 pub used_chunks: usize,
18 pub disk_bytes: usize,
19 pub disk_chunks: usize,
20}
21
22impl Default for GarbageCollectionStatus {
23 fn default() -> Self {
24 GarbageCollectionStatus {
25 used_bytes: 0,
26 used_chunks: 0,
27 disk_bytes: 0,
28 disk_chunks: 0,
29 }
30 }
31}
32
35cf5daa 33pub struct ChunkStore {
277fc5a3 34 name: String, // used for error reporting
2c32fdde 35 pub (crate) base: PathBuf,
35cf5daa 36 chunk_dir: PathBuf,
c5d82e5f 37 mutex: Mutex<bool>,
eae8aa3a 38 _lockfile: File,
128b37fe
DM
39}
40
176e4af9
DM
41// TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ?
42
08481a0b 43fn digest_to_prefix(digest: &[u8]) -> PathBuf {
128b37fe 44
e95950e4 45 let mut buf = Vec::<u8>::with_capacity(2+1+2+1);
128b37fe 46
22968600
DM
47 const HEX_CHARS: &'static [u8; 16] = b"0123456789abcdef";
48
128b37fe
DM
49 buf.push(HEX_CHARS[(digest[0] as usize) >> 4]);
50 buf.push(HEX_CHARS[(digest[0] as usize) &0xf]);
e95950e4 51 buf.push(HEX_CHARS[(digest[1] as usize) >> 4]);
128b37fe 52 buf.push(HEX_CHARS[(digest[1] as usize) & 0xf]);
128b37fe
DM
53 buf.push('/' as u8);
54
55 let path = unsafe { String::from_utf8_unchecked(buf)};
56
57 path.into()
35cf5daa
DM
58}
59
4070096c
WB
60// This is one thing which would actually get nicer with futures & tokio-fs...
61pub struct ChunkIterator {
62 base_dir: nix::dir::Dir,
63 index: usize,
64 subdir: Option<
65 Tied<nix::dir::Dir, Iterator<Item = nix::Result<nix::dir::Entry>>>
66 >,
67 subdir_fd: RawFd,
c9e7f4dc 68 progress: Option<fn(u8)>,
4070096c
WB
69}
70
71impl ChunkIterator {
72 fn new(base_dir: nix::dir::Dir) -> Self {
73 ChunkIterator {
74 base_dir,
75 index: 0,
76 subdir: None,
77 subdir_fd: 0,
c9e7f4dc 78 progress: None,
4070096c
WB
79 }
80 }
81
c9e7f4dc
WB
82 fn with_progress(base_dir: nix::dir::Dir, progress: fn(u8)) -> Self {
83 let mut me = Self::new(base_dir);
84 me.progress = Some(progress);
85 me
86 }
87
4070096c
WB
88 fn next_subdir(&mut self) -> Result<bool, Error> {
89 if self.index == 0x10000 {
90 return Ok(false);
91 }
92
93 let l1name = PathBuf::from(format!("{:04x}", self.index));
94 self.index += 1;
c9e7f4dc
WB
95 if let Some(cb) = self.progress {
96 let prev = ((self.index-1) * 100) / 0x10000;
97 let now = (self.index * 100) / 0x10000;
98 if prev != now {
99 cb(now as u8);
100 }
101 }
4070096c
WB
102
103 use nix::dir::{Dir, Entry};
104 use nix::fcntl::OFlag;
105 use nix::sys::stat::Mode;
106 match Dir::openat(self.base_dir.as_raw_fd(), &l1name, OFlag::O_RDONLY, Mode::empty()) {
107 Ok(dir) => {
108 self.subdir_fd = dir.as_raw_fd();
109 self.subdir = Some(Tied::new(dir, |dir| {
110 Box::new(unsafe { (*dir).iter() })
111 as Box<Iterator<Item = nix::Result<Entry>>>
112 }));
113 return Ok(true);
114 }
115 Err(err) => {
116 self.index = 0x10000;
117 bail!("unable to open chunk dir {:?}: {}", l1name, err);
118 }
119 }
120 }
121}
122
123impl Iterator for ChunkIterator {
124 type Item = Result<(RawFd, nix::dir::Entry), Error>;
125
126 fn next(&mut self) -> Option<Self::Item> {
127 loop {
128 match self.subdir {
129 None => {
130 match self.next_subdir() {
131 Ok(true) => continue, // Enter the Some case
132 Ok(false) => return None,
133 Err(e) => return Some(Err(e)),
134 }
135 }
136 Some(ref mut dir) => {
137 let dir = dir.as_mut();
138 match dir.next() {
139 Some(Ok(entry)) => return Some(Ok((self.subdir_fd, entry))),
140 Some(Err(e)) => return Some(Err(e.into())),
141 None => {
142 // Go to the next directory
143 self.subdir = None;
144 continue;
145 }
146 }
147 }
148 }
149 }
150 }
151}
12bb93b3 152
35cf5daa
DM
153impl ChunkStore {
154
45773720
DM
155 fn chunk_dir<P: AsRef<Path>>(path: P) -> PathBuf {
156
157 let mut chunk_dir: PathBuf = PathBuf::from(path.as_ref());
158 chunk_dir.push(".chunks");
159
160 chunk_dir
161 }
162
277fc5a3 163 pub fn create<P: Into<PathBuf>>(name: &str, path: P) -> Result<Self, Error> {
35cf5daa 164
45773720 165 let base: PathBuf = path.into();
68469eeb
DM
166
167 if !base.is_absolute() {
168 bail!("expected absolute path - got {:?}", base);
169 }
170
45773720 171 let chunk_dir = Self::chunk_dir(&base);
35cf5daa 172
2989f6bf 173 if let Err(err) = std::fs::create_dir(&base) {
277fc5a3 174 bail!("unable to create chunk store '{}' at {:?} - {}", name, base, err);
2989f6bf
DM
175 }
176
177 if let Err(err) = std::fs::create_dir(&chunk_dir) {
277fc5a3 178 bail!("unable to create chunk store '{}' subdir {:?} - {}", name, chunk_dir, err);
2989f6bf 179 }
35cf5daa 180
bc616633 181 // create 64*1024 subdirs
e95950e4
DM
182 let mut last_percentage = 0;
183
bc616633 184 for i in 0..64*1024 {
af3e7d75 185 let mut l1path = chunk_dir.clone();
bc616633 186 l1path.push(format!("{:04x}", i));
2989f6bf 187 if let Err(err) = std::fs::create_dir(&l1path) {
277fc5a3 188 bail!("unable to create chunk store '{}' subdir {:?} - {}", name, l1path, err);
2989f6bf 189 }
bc616633
DM
190 let percentage = (i*100)/(64*1024);
191 if percentage != last_percentage {
192 eprintln!("Percentage done: {}", percentage);
193 last_percentage = percentage;
e95950e4 194 }
35cf5daa
DM
195 }
196
277fc5a3 197 Self::open(name, base)
35cf5daa
DM
198 }
199
277fc5a3 200 pub fn open<P: Into<PathBuf>>(name: &str, path: P) -> Result<Self, Error> {
35cf5daa 201
45773720 202 let base: PathBuf = path.into();
68469eeb
DM
203
204 if !base.is_absolute() {
205 bail!("expected absolute path - got {:?}", base);
206 }
207
45773720
DM
208 let chunk_dir = Self::chunk_dir(&base);
209
ce55dbbc 210 if let Err(err) = std::fs::metadata(&chunk_dir) {
277fc5a3 211 bail!("unable to open chunk store '{}' at {:?} - {}", name, chunk_dir, err);
ce55dbbc 212 }
45773720
DM
213
214 let mut lockfile_path = base.clone();
215 lockfile_path.push(".lock");
216
5ba69689 217 // make sure only one process/thread/task can use it
1628a4c7
WB
218 let lockfile = tools::open_file_locked(
219 lockfile_path, Duration::from_secs(10))?;
35cf5daa 220
b8d4766a 221 Ok(ChunkStore {
277fc5a3 222 name: name.to_owned(),
b8d4766a
DM
223 base,
224 chunk_dir,
bc616633 225 _lockfile: lockfile,
b8d4766a
DM
226 mutex: Mutex::new(false)
227 })
35cf5daa
DM
228 }
229
7394ca3e 230 pub fn touch_chunk(&self, digest:&[u8]) -> Result<(), Error> {
3d5c11e5 231
a198d74f 232 let mut chunk_path = self.chunk_dir.clone();
08481a0b 233 let prefix = digest_to_prefix(&digest);
3d5c11e5 234 chunk_path.push(&prefix);
22968600 235 let digest_str = tools::digest_to_hex(&digest);
3d5c11e5
DM
236 chunk_path.push(&digest_str);
237
7ee2aa1b
DM
238 const UTIME_NOW: i64 = ((1 << 30) - 1);
239 const UTIME_OMIT: i64 = ((1 << 30) - 2);
240
a198d74f 241 let times: [libc::timespec; 2] = [
7ee2aa1b
DM
242 libc::timespec { tv_sec: 0, tv_nsec: UTIME_NOW },
243 libc::timespec { tv_sec: 0, tv_nsec: UTIME_OMIT }
244 ];
245
246 use nix::NixPath;
247
248 let res = chunk_path.with_nix_path(|cstr| unsafe {
249 libc::utimensat(-1, cstr.as_ptr(), &times[0], libc::AT_SYMLINK_NOFOLLOW)
250 })?;
251
252 if let Err(err) = nix::errno::Errno::result(res) {
253 bail!("updata atime failed for chunk {:?} - {}", chunk_path, err);
254 }
255
3d5c11e5
DM
256 Ok(())
257 }
258
df9973e8 259 pub fn read_chunk(&self, digest:&[u8], buffer: &mut Vec<u8>) -> Result<(), Error> {
96df2fb4
DM
260
261 let mut chunk_path = self.chunk_dir.clone();
262 let prefix = digest_to_prefix(&digest);
263 chunk_path.push(&prefix);
22968600 264 let digest_str = tools::digest_to_hex(&digest);
96df2fb4
DM
265 chunk_path.push(&digest_str);
266
267 let mut f = std::fs::File::open(&chunk_path)?;
df9973e8
DM
268
269 let stat = nix::sys::stat::fstat(f.as_raw_fd())?;
270 let size = stat.st_size as usize;
271
060c4811
DM
272 if buffer.capacity() < size {
273 let mut newsize = buffer.capacity();
274 while newsize < size { newsize = newsize << 1; }
275 let additional = newsize - buffer.len();
276 buffer.reserve_exact(additional);
df9973e8
DM
277 }
278 unsafe { buffer.set_len(size); }
96df2fb4
DM
279
280 use std::io::Read;
281
df9973e8 282 f.read_exact(buffer.as_mut_slice())?;
96df2fb4 283
df9973e8 284 Ok(())
96df2fb4
DM
285 }
286
64e53b28 287 fn sweep_old_files(&self, handle: &mut nix::dir::Dir, status: &mut GarbageCollectionStatus) -> Result<(), Error> {
08481a0b
DM
288
289 let rawfd = handle.as_raw_fd();
290
291 let now = unsafe { libc::time(std::ptr::null_mut()) };
292
293 for entry in handle.iter() {
eae8aa3a
DM
294 let entry = match entry {
295 Ok(entry) => entry,
296 Err(_) => continue /* ignore */,
297 };
298 let file_type = match entry.file_type() {
299 Some(file_type) => file_type,
277fc5a3 300 None => bail!("unsupported file system type on chunk store '{}'", self.name),
eae8aa3a
DM
301 };
302 if file_type != nix::dir::Type::File { continue; }
303
304 let filename = entry.file_name();
305 if let Ok(stat) = nix::sys::stat::fstatat(rawfd, filename, nix::fcntl::AtFlags::AT_SYMLINK_NOFOLLOW) {
306 let age = now - stat.st_atime;
e95950e4 307 //println!("FOUND {} {:?}", age/(3600*24), filename);
eae8aa3a
DM
308 if age/(3600*24) >= 2 {
309 println!("UNLINK {} {:?}", age/(3600*24), filename);
277fc5a3
DM
310 let res = unsafe { libc::unlinkat(rawfd, filename.as_ptr(), 0) };
311 if res != 0 {
312 let err = nix::Error::last();
313 bail!("unlink chunk {:?} failed on store '{}' - {}", filename, self.name, err);
314 }
64e53b28
DM
315 } else {
316 status.disk_chunks += 1;
317 status.disk_bytes += stat.st_size as usize;
318
08481a0b 319 }
eae8aa3a
DM
320 }
321 }
277fc5a3 322 Ok(())
08481a0b
DM
323 }
324
77703d95 325 pub fn sweep_unused_chunks(&self, status: &mut GarbageCollectionStatus) -> Result<(), Error> {
6ea3a0b7 326
1c43c56b
DM
327 use nix::fcntl::OFlag;
328 use nix::sys::stat::Mode;
329 use nix::dir::Dir;
330
eae8aa3a 331 let base_handle = match Dir::open(
1c43c56b 332 &self.chunk_dir, OFlag::O_RDONLY, Mode::empty()) {
2bf5f6b2 333 Ok(h) => h,
277fc5a3
DM
334 Err(err) => bail!("unable to open store '{}' chunk dir {:?} - {}",
335 self.name, self.chunk_dir, err),
2bf5f6b2
DM
336 };
337
338 let base_fd = base_handle.as_raw_fd();
339
176e4af9
DM
340 let mut last_percentage = 0;
341
bc616633 342 for i in 0..64*1024 {
2bf5f6b2 343
bc616633
DM
344 let percentage = (i*100)/(64*1024);
345 if percentage != last_percentage {
346 eprintln!("Percentage done: {}", percentage);
347 last_percentage = percentage;
348 }
6c20a13d 349
bc616633
DM
350 let l1name = PathBuf::from(format!("{:04x}", i));
351 match nix::dir::Dir::openat(base_fd, &l1name, OFlag::O_RDONLY, Mode::empty()) {
352 Ok(mut h) => {
353 //println!("SCAN {:?} {:?}", l1name);
354 self.sweep_old_files(&mut h, status)?;
176e4af9 355 }
bc616633
DM
356 Err(err) => bail!("unable to open store '{}' dir {:?}/{:?} - {}",
357 self.name, self.chunk_dir, l1name, err),
358 };
08481a0b 359 }
6ea3a0b7
DM
360 Ok(())
361 }
362
03e4753d 363 pub fn insert_chunk(&self, chunk: &[u8]) -> Result<(bool, [u8; 32]), Error> {
128b37fe 364
7b2b40a8
DM
365 // fixme: use Sha512/256 when available
366 let mut hasher = sha::Sha256::new();
367 hasher.update(chunk);
368
369 let digest = hasher.finish();
370
22968600 371 //println!("DIGEST {}", tools::digest_to_hex(&digest));
128b37fe 372
af3e7d75 373 let mut chunk_path = self.chunk_dir.clone();
08481a0b 374 let prefix = digest_to_prefix(&digest);
c5d82e5f 375 chunk_path.push(&prefix);
22968600 376 let digest_str = tools::digest_to_hex(&digest);
c5d82e5f
DM
377 chunk_path.push(&digest_str);
378
379 let lock = self.mutex.lock();
380
381 if let Ok(metadata) = std::fs::metadata(&chunk_path) {
382 if metadata.is_file() {
391a2e43 383 return Ok((true, digest));
c5d82e5f 384 } else {
f7dd683b 385 bail!("Got unexpected file type on store '{}' for chunk {}", self.name, digest_str);
c5d82e5f
DM
386 }
387 }
128b37fe 388
c5d82e5f
DM
389 let mut tmp_path = chunk_path.clone();
390 tmp_path.set_extension("tmp");
391 let mut f = std::fs::File::create(&tmp_path)?;
128b37fe
DM
392 f.write_all(chunk)?;
393
c5d82e5f
DM
394 if let Err(err) = std::fs::rename(&tmp_path, &chunk_path) {
395 if let Err(_) = std::fs::remove_file(&tmp_path) { /* ignore */ }
f7dd683b 396 bail!("Atomic rename on store '{}' failed for chunk {} - {}", self.name, digest_str, err);
c5d82e5f
DM
397 }
398
77703d95 399 //println!("PATH {:?}", chunk_path);
128b37fe 400
c5d82e5f
DM
401 drop(lock);
402
391a2e43 403 Ok((false, digest))
128b37fe
DM
404 }
405
606ce64b
DM
406 pub fn relative_path(&self, path: &Path) -> PathBuf {
407
408 let mut full_path = self.base.clone();
409 full_path.push(path);
410 full_path
411 }
412
3d5c11e5
DM
413 pub fn base_path(&self) -> PathBuf {
414 self.base.clone()
415 }
35cf5daa
DM
416}
417
418
419#[test]
420fn test_chunk_store1() {
421
332dcc22
DM
422 let mut path = std::fs::canonicalize(".").unwrap(); // we need absulute path
423 path.push(".testdir");
424
35cf5daa
DM
425 if let Err(_e) = std::fs::remove_dir_all(".testdir") { /* ignore */ }
426
332dcc22 427 let chunk_store = ChunkStore::open("test", &path);
35cf5daa
DM
428 assert!(chunk_store.is_err());
429
332dcc22 430 let chunk_store = ChunkStore::create("test", &path).unwrap();
391a2e43
DM
431 let (exists, _) = chunk_store.insert_chunk(&[0u8, 1u8]).unwrap();
432 assert!(!exists);
433
434 let (exists, _) = chunk_store.insert_chunk(&[0u8, 1u8]).unwrap();
435 assert!(exists);
128b37fe 436
35cf5daa 437
332dcc22 438 let chunk_store = ChunkStore::create("test", &path);
35cf5daa
DM
439 assert!(chunk_store.is_err());
440
e0a5d1ca 441 if let Err(_e) = std::fs::remove_dir_all(".testdir") { /* ignore */ }
35cf5daa 442}