git.proxmox.com Git - proxmox-backup.git/blob - src/backup/chunk_store.rs
implement garbage collection for .aidx files
[proxmox-backup.git] / src / backup / chunk_store.rs
1 use failure::*;
2 use std::path::{Path, PathBuf};
3 use std::io::Write;
4 use std::time::Duration;
5
6 use openssl::sha;
7 use std::sync::Mutex;
8
9 use std::fs::File;
10 use std::os::unix::io::AsRawFd;
11
12 use crate::tools;
13
/// Counters collected while garbage-collecting a chunk store.
///
/// Every counter starts at zero (derived `Default`).
#[derive(Default)]
pub struct GarbageCollectionStatus {
    /// Bytes referenced by index files.
    /// NOTE(review): not written anywhere in this file - presumably filled
    /// in by the mark phase; confirm against the caller.
    pub used_bytes: usize,
    /// Number of referenced chunks.
    /// NOTE(review): not written anywhere in this file - see `used_bytes`.
    pub used_chunks: usize,
    /// Sum of `st_size` of all chunk files the sweep kept on disk.
    pub disk_bytes: usize,
    /// Number of chunk files the sweep kept on disk.
    pub disk_chunks: usize,
}
31
/// A directory based store for chunks, addressed by their digest.
pub struct ChunkStore {
    /// Store name; only used for error reporting.
    name: String,
    /// Base directory of the store (contains `.chunks` and `.lock`).
    pub(crate) base: PathBuf,
    /// The `.chunks` directory below `base` that holds the chunk files.
    chunk_dir: PathBuf,
    /// Taken by `insert_chunk` while it checks for and writes a chunk file.
    /// The `bool` payload is never read in this file.
    mutex: Mutex<bool>,
    /// Handle of the locked `.lock` file; stored only so that it stays open
    /// for as long as the store exists.
    _lockfile: File,
}
39
/// Maps a nibble (`0..=15`) to its lowercase ASCII hexadecimal digit.
const HEX_CHARS: &[u8; 16] = b"0123456789abcdef";

// TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ?

/// Encodes `digest` as a lowercase hexadecimal string.
///
/// Each input byte yields two characters (high nibble first), so the result
/// has exactly `2 * digest.len()` characters; an empty digest gives `""`.
pub fn digest_to_hex(digest: &[u8]) -> String {
    let mut hex = String::with_capacity(digest.len() * 2);

    for &byte in digest {
        // Table entries are ASCII, so the byte -> char conversion is lossless.
        hex.push(char::from(HEX_CHARS[usize::from(byte >> 4)]));
        hex.push(char::from(HEX_CHARS[usize::from(byte & 0xf)]));
    }

    hex
}
55
/// Builds the relative two-level directory prefix for a chunk digest.
///
/// The first two digest bytes are rendered as two-digit lowercase hex
/// directory names, e.g. `[0xab, 0x01, ..]` becomes `ab/01/`. This is the
/// same naming scheme `ChunkStore::create` uses for the sub directories.
///
/// # Panics
///
/// Panics if `digest` contains fewer than two bytes.
fn digest_to_prefix(digest: &[u8]) -> PathBuf {
    PathBuf::from(format!("{:02x}/{:02x}/", digest[0], digest[1]))
}
72
73
74 impl ChunkStore {
75
/// Returns the path of the `.chunks` directory located below `path`.
fn chunk_dir<P: AsRef<Path>>(path: P) -> PathBuf {
    path.as_ref().join(".chunks")
}
83
84 pub fn create<P: Into<PathBuf>>(name: &str, path: P) -> Result<Self, Error> {
85
86 let base: PathBuf = path.into();
87 let chunk_dir = Self::chunk_dir(&base);
88
89 if let Err(err) = std::fs::create_dir(&base) {
90 bail!("unable to create chunk store '{}' at {:?} - {}", name, base, err);
91 }
92
93 if let Err(err) = std::fs::create_dir(&chunk_dir) {
94 bail!("unable to create chunk store '{}' subdir {:?} - {}", name, chunk_dir, err);
95 }
96
97 // create 256*256 subdirs
98 let mut last_percentage = 0;
99
100 for i in 0..256 {
101 let mut l1path = chunk_dir.clone();
102 l1path.push(format!("{:02x}",i));
103 if let Err(err) = std::fs::create_dir(&l1path) {
104 bail!("unable to create chunk store '{}' subdir {:?} - {}", name, l1path, err);
105 }
106 for j in 0..256 {
107 let mut l2path = l1path.clone();
108 l2path.push(format!("{:02x}",j));
109 if let Err(err) = std::fs::create_dir(&l2path) {
110 bail!("unable to create chunk store '{}' subdir {:?} - {}", name, l2path, err);
111 }
112 let percentage = ((i*256+j)*100)/(256*256);
113 if percentage != last_percentage {
114 eprintln!("Percentage done: {}", percentage);
115 last_percentage = percentage;
116 }
117 }
118 }
119
120 Self::open(name, base)
121 }
122
123 pub fn open<P: Into<PathBuf>>(name: &str, path: P) -> Result<Self, Error> {
124
125 let base: PathBuf = path.into();
126 let chunk_dir = Self::chunk_dir(&base);
127
128 if let Err(err) = std::fs::metadata(&chunk_dir) {
129 bail!("unable to open chunk store '{}' at {:?} - {}", name, chunk_dir, err);
130 }
131
132 let mut lockfile_path = base.clone();
133 lockfile_path.push(".lock");
134
135 // make sure only one process/thread/task can use it
136 let lockfile = tools::open_file_locked(
137 lockfile_path, Duration::from_secs(10))?;
138
139 Ok(ChunkStore {
140 name: name.to_owned(),
141 base,
142 chunk_dir,
143 _lockfile: lockfile,
144 mutex: Mutex::new(false)
145 })
146 }
147
148 pub fn touch_chunk(&self, digest:&[u8]) -> Result<(), Error> {
149
150 let mut chunk_path = self.chunk_dir.clone();
151 let prefix = digest_to_prefix(&digest);
152 chunk_path.push(&prefix);
153 let digest_str = digest_to_hex(&digest);
154 chunk_path.push(&digest_str);
155
156 const UTIME_NOW: i64 = ((1 << 30) - 1);
157 const UTIME_OMIT: i64 = ((1 << 30) - 2);
158
159 let times: [libc::timespec; 2] = [
160 libc::timespec { tv_sec: 0, tv_nsec: UTIME_NOW },
161 libc::timespec { tv_sec: 0, tv_nsec: UTIME_OMIT }
162 ];
163
164 use nix::NixPath;
165
166 let res = chunk_path.with_nix_path(|cstr| unsafe {
167 libc::utimensat(-1, cstr.as_ptr(), &times[0], libc::AT_SYMLINK_NOFOLLOW)
168 })?;
169
170 if let Err(err) = nix::errno::Errno::result(res) {
171 bail!("updata atime failed for chunk {:?} - {}", chunk_path, err);
172 }
173
174 Ok(())
175 }
176
177 fn sweep_old_files(&self, handle: &mut nix::dir::Dir, status: &mut GarbageCollectionStatus) -> Result<(), Error> {
178
179 let rawfd = handle.as_raw_fd();
180
181 let now = unsafe { libc::time(std::ptr::null_mut()) };
182
183 for entry in handle.iter() {
184 let entry = match entry {
185 Ok(entry) => entry,
186 Err(_) => continue /* ignore */,
187 };
188 let file_type = match entry.file_type() {
189 Some(file_type) => file_type,
190 None => bail!("unsupported file system type on chunk store '{}'", self.name),
191 };
192 if file_type != nix::dir::Type::File { continue; }
193
194 let filename = entry.file_name();
195 if let Ok(stat) = nix::sys::stat::fstatat(rawfd, filename, nix::fcntl::AtFlags::AT_SYMLINK_NOFOLLOW) {
196 let age = now - stat.st_atime;
197 //println!("FOUND {} {:?}", age/(3600*24), filename);
198 if age/(3600*24) >= 2 {
199 println!("UNLINK {} {:?}", age/(3600*24), filename);
200 let res = unsafe { libc::unlinkat(rawfd, filename.as_ptr(), 0) };
201 if res != 0 {
202 let err = nix::Error::last();
203 bail!("unlink chunk {:?} failed on store '{}' - {}", filename, self.name, err);
204 }
205 } else {
206 status.disk_chunks += 1;
207 status.disk_bytes += stat.st_size as usize;
208
209 }
210 }
211 }
212 Ok(())
213 }
214
215 pub fn sweep_unused_chunks(&self, status: &mut GarbageCollectionStatus) -> Result<(), Error> {
216
217 use nix::fcntl::OFlag;
218 use nix::sys::stat::Mode;
219 use nix::dir::Dir;
220
221 let base_handle = match Dir::open(
222 &self.chunk_dir, OFlag::O_RDONLY, Mode::empty()) {
223 Ok(h) => h,
224 Err(err) => bail!("unable to open store '{}' chunk dir {:?} - {}",
225 self.name, self.chunk_dir, err),
226 };
227
228 let base_fd = base_handle.as_raw_fd();
229
230 let mut last_percentage = 0;
231
232 for i in 0..256 {
233 let l1name = PathBuf::from(format!("{:02x}", i));
234 let l1_handle = match nix::dir::Dir::openat(
235 base_fd, &l1name, OFlag::O_RDONLY, Mode::empty()) {
236 Ok(h) => h,
237 Err(err) => bail!("unable to open store '{}' dir {:?}/{:?} - {}",
238 self.name, self.chunk_dir, l1name, err),
239 };
240
241 let l1_fd = l1_handle.as_raw_fd();
242
243 for j in 0..256 {
244 let l2name = PathBuf::from(format!("{:02x}", j));
245
246 let percentage = ((i*256+j)*100)/(256*256);
247 if percentage != last_percentage {
248 eprintln!("Percentage done: {}", percentage);
249 last_percentage = percentage;
250 }
251 //println!("SCAN {:?} {:?}", l1name, l2name);
252
253 let mut l2_handle = match Dir::openat(
254 l1_fd, &l2name, OFlag::O_RDONLY, Mode::empty()) {
255 Ok(h) => h,
256 Err(err) => bail!(
257 "unable to open store '{}' dir {:?}/{:?}/{:?} - {}",
258 self.name, self.chunk_dir, l1name, l2name, err),
259 };
260 self.sweep_old_files(&mut l2_handle, status)?;
261 }
262 }
263 Ok(())
264 }
265
266 pub fn insert_chunk(&self, chunk: &[u8]) -> Result<(bool, [u8; 32]), Error> {
267
268 // fixme: use Sha512/256 when available
269 let mut hasher = sha::Sha256::new();
270 hasher.update(chunk);
271
272 let digest = hasher.finish();
273
274 //println!("DIGEST {}", digest_to_hex(&digest));
275
276 let mut chunk_path = self.chunk_dir.clone();
277 let prefix = digest_to_prefix(&digest);
278 chunk_path.push(&prefix);
279 let digest_str = digest_to_hex(&digest);
280 chunk_path.push(&digest_str);
281
282 let lock = self.mutex.lock();
283
284 if let Ok(metadata) = std::fs::metadata(&chunk_path) {
285 if metadata.is_file() {
286 return Ok((true, digest));
287 } else {
288 bail!("Got unexpected file type on store '{}' for chunk {}", self.name, digest_str);
289 }
290 }
291
292 let mut tmp_path = chunk_path.clone();
293 tmp_path.set_extension("tmp");
294 let mut f = std::fs::File::create(&tmp_path)?;
295 f.write_all(chunk)?;
296
297 if let Err(err) = std::fs::rename(&tmp_path, &chunk_path) {
298 if let Err(_) = std::fs::remove_file(&tmp_path) { /* ignore */ }
299 bail!("Atomic rename on store '{}' failed for chunk {} - {}", self.name, digest_str, err);
300 }
301
302 //println!("PATH {:?}", chunk_path);
303
304 drop(lock);
305
306 Ok((false, digest))
307 }
308
309 pub fn relative_path(&self, path: &Path) -> PathBuf {
310
311 let mut full_path = self.base.clone();
312 full_path.push(path);
313 full_path
314 }
315
316 pub fn base_path(&self) -> PathBuf {
317 self.base.clone()
318 }
319 }
320
321
/// Exercises open/create/insert on a scratch store in `.testdir`.
#[test]
fn test_chunk_store1() {
    // Start from a clean slate; a missing directory is fine.
    let _ = std::fs::remove_dir_all(".testdir");

    // Opening a store that does not exist must fail.
    assert!(ChunkStore::open("test", ".testdir").is_err());

    let store = ChunkStore::create("test", ".testdir").unwrap();

    // First insert writes the chunk ...
    let (exists, _) = store.insert_chunk(&[0u8, 1u8]).unwrap();
    assert!(!exists);

    // ... the second insert of the same data finds it already present.
    let (exists, _) = store.insert_chunk(&[0u8, 1u8]).unwrap();
    assert!(exists);

    // Creating a store over an existing directory must fail.
    assert!(ChunkStore::create("test", ".testdir").is_err());
}
340
341
342 }