]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/chunk_store.rs
backup/chunk_store.rs: fix test case - use absolute path
[proxmox-backup.git] / src / backup / chunk_store.rs
1 use failure::*;
2 use std::path::{Path, PathBuf};
3 use std::io::Write;
4 use std::time::Duration;
5
6 use openssl::sha;
7 use std::sync::Mutex;
8
9 use std::fs::File;
10 use std::os::unix::io::AsRawFd;
11
12 use crate::tools;
13
14 pub struct GarbageCollectionStatus {
15 pub used_bytes: usize,
16 pub used_chunks: usize,
17 pub disk_bytes: usize,
18 pub disk_chunks: usize,
19 }
20
21 impl Default for GarbageCollectionStatus {
22 fn default() -> Self {
23 GarbageCollectionStatus {
24 used_bytes: 0,
25 used_chunks: 0,
26 disk_bytes: 0,
27 disk_chunks: 0,
28 }
29 }
30 }
31
32 pub struct ChunkStore {
33 name: String, // used for error reporting
34 pub (crate) base: PathBuf,
35 chunk_dir: PathBuf,
36 mutex: Mutex<bool>,
37 _lockfile: File,
38 }
39
40 const HEX_CHARS: &'static [u8; 16] = b"0123456789abcdef";
41
42 // TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ?
43
44 pub fn digest_to_hex(digest: &[u8]) -> String {
45
46 let mut buf = Vec::<u8>::with_capacity(digest.len()*2);
47
48 for i in 0..digest.len() {
49 buf.push(HEX_CHARS[(digest[i] >> 4) as usize]);
50 buf.push(HEX_CHARS[(digest[i] & 0xf) as usize]);
51 }
52
53 unsafe { String::from_utf8_unchecked(buf) }
54 }
55
56 fn digest_to_prefix(digest: &[u8]) -> PathBuf {
57
58 let mut buf = Vec::<u8>::with_capacity(2+1+2+1);
59
60 buf.push(HEX_CHARS[(digest[0] as usize) >> 4]);
61 buf.push(HEX_CHARS[(digest[0] as usize) &0xf]);
62 buf.push(HEX_CHARS[(digest[1] as usize) >> 4]);
63 buf.push(HEX_CHARS[(digest[1] as usize) & 0xf]);
64 buf.push('/' as u8);
65
66 let path = unsafe { String::from_utf8_unchecked(buf)};
67
68 path.into()
69 }
70
71
72 impl ChunkStore {
73
74 fn chunk_dir<P: AsRef<Path>>(path: P) -> PathBuf {
75
76 let mut chunk_dir: PathBuf = PathBuf::from(path.as_ref());
77 chunk_dir.push(".chunks");
78
79 chunk_dir
80 }
81
82 pub fn create<P: Into<PathBuf>>(name: &str, path: P) -> Result<Self, Error> {
83
84 let base: PathBuf = path.into();
85
86 if !base.is_absolute() {
87 bail!("expected absolute path - got {:?}", base);
88 }
89
90 let chunk_dir = Self::chunk_dir(&base);
91
92 if let Err(err) = std::fs::create_dir(&base) {
93 bail!("unable to create chunk store '{}' at {:?} - {}", name, base, err);
94 }
95
96 if let Err(err) = std::fs::create_dir(&chunk_dir) {
97 bail!("unable to create chunk store '{}' subdir {:?} - {}", name, chunk_dir, err);
98 }
99
100 // create 64*1024 subdirs
101 let mut last_percentage = 0;
102
103 for i in 0..64*1024 {
104 let mut l1path = chunk_dir.clone();
105 l1path.push(format!("{:04x}", i));
106 if let Err(err) = std::fs::create_dir(&l1path) {
107 bail!("unable to create chunk store '{}' subdir {:?} - {}", name, l1path, err);
108 }
109 let percentage = (i*100)/(64*1024);
110 if percentage != last_percentage {
111 eprintln!("Percentage done: {}", percentage);
112 last_percentage = percentage;
113 }
114 }
115
116 Self::open(name, base)
117 }
118
119 pub fn open<P: Into<PathBuf>>(name: &str, path: P) -> Result<Self, Error> {
120
121 let base: PathBuf = path.into();
122
123 if !base.is_absolute() {
124 bail!("expected absolute path - got {:?}", base);
125 }
126
127 let chunk_dir = Self::chunk_dir(&base);
128
129 if let Err(err) = std::fs::metadata(&chunk_dir) {
130 bail!("unable to open chunk store '{}' at {:?} - {}", name, chunk_dir, err);
131 }
132
133 let mut lockfile_path = base.clone();
134 lockfile_path.push(".lock");
135
136 // make sure only one process/thread/task can use it
137 let lockfile = tools::open_file_locked(
138 lockfile_path, Duration::from_secs(10))?;
139
140 Ok(ChunkStore {
141 name: name.to_owned(),
142 base,
143 chunk_dir,
144 _lockfile: lockfile,
145 mutex: Mutex::new(false)
146 })
147 }
148
149 pub fn touch_chunk(&self, digest:&[u8]) -> Result<(), Error> {
150
151 let mut chunk_path = self.chunk_dir.clone();
152 let prefix = digest_to_prefix(&digest);
153 chunk_path.push(&prefix);
154 let digest_str = digest_to_hex(&digest);
155 chunk_path.push(&digest_str);
156
157 const UTIME_NOW: i64 = ((1 << 30) - 1);
158 const UTIME_OMIT: i64 = ((1 << 30) - 2);
159
160 let times: [libc::timespec; 2] = [
161 libc::timespec { tv_sec: 0, tv_nsec: UTIME_NOW },
162 libc::timespec { tv_sec: 0, tv_nsec: UTIME_OMIT }
163 ];
164
165 use nix::NixPath;
166
167 let res = chunk_path.with_nix_path(|cstr| unsafe {
168 libc::utimensat(-1, cstr.as_ptr(), &times[0], libc::AT_SYMLINK_NOFOLLOW)
169 })?;
170
171 if let Err(err) = nix::errno::Errno::result(res) {
172 bail!("updata atime failed for chunk {:?} - {}", chunk_path, err);
173 }
174
175 Ok(())
176 }
177
178 pub fn read_chunk(&self, digest:&[u8], buffer: &mut Vec<u8>) -> Result<(), Error> {
179
180 let mut chunk_path = self.chunk_dir.clone();
181 let prefix = digest_to_prefix(&digest);
182 chunk_path.push(&prefix);
183 let digest_str = digest_to_hex(&digest);
184 chunk_path.push(&digest_str);
185
186 let mut f = std::fs::File::open(&chunk_path)?;
187
188 let stat = nix::sys::stat::fstat(f.as_raw_fd())?;
189 let size = stat.st_size as usize;
190
191 if buffer.capacity() < size {
192 let mut newsize = buffer.capacity();
193 while newsize < size { newsize = newsize << 1; }
194 let additional = newsize - buffer.len();
195 buffer.reserve_exact(additional);
196 }
197 unsafe { buffer.set_len(size); }
198
199 use std::io::Read;
200
201 f.read_exact(buffer.as_mut_slice())?;
202
203 Ok(())
204 }
205
206 fn sweep_old_files(&self, handle: &mut nix::dir::Dir, status: &mut GarbageCollectionStatus) -> Result<(), Error> {
207
208 let rawfd = handle.as_raw_fd();
209
210 let now = unsafe { libc::time(std::ptr::null_mut()) };
211
212 for entry in handle.iter() {
213 let entry = match entry {
214 Ok(entry) => entry,
215 Err(_) => continue /* ignore */,
216 };
217 let file_type = match entry.file_type() {
218 Some(file_type) => file_type,
219 None => bail!("unsupported file system type on chunk store '{}'", self.name),
220 };
221 if file_type != nix::dir::Type::File { continue; }
222
223 let filename = entry.file_name();
224 if let Ok(stat) = nix::sys::stat::fstatat(rawfd, filename, nix::fcntl::AtFlags::AT_SYMLINK_NOFOLLOW) {
225 let age = now - stat.st_atime;
226 //println!("FOUND {} {:?}", age/(3600*24), filename);
227 if age/(3600*24) >= 2 {
228 println!("UNLINK {} {:?}", age/(3600*24), filename);
229 let res = unsafe { libc::unlinkat(rawfd, filename.as_ptr(), 0) };
230 if res != 0 {
231 let err = nix::Error::last();
232 bail!("unlink chunk {:?} failed on store '{}' - {}", filename, self.name, err);
233 }
234 } else {
235 status.disk_chunks += 1;
236 status.disk_bytes += stat.st_size as usize;
237
238 }
239 }
240 }
241 Ok(())
242 }
243
244 pub fn sweep_unused_chunks(&self, status: &mut GarbageCollectionStatus) -> Result<(), Error> {
245
246 use nix::fcntl::OFlag;
247 use nix::sys::stat::Mode;
248 use nix::dir::Dir;
249
250 let base_handle = match Dir::open(
251 &self.chunk_dir, OFlag::O_RDONLY, Mode::empty()) {
252 Ok(h) => h,
253 Err(err) => bail!("unable to open store '{}' chunk dir {:?} - {}",
254 self.name, self.chunk_dir, err),
255 };
256
257 let base_fd = base_handle.as_raw_fd();
258
259 let mut last_percentage = 0;
260
261 for i in 0..64*1024 {
262
263 let percentage = (i*100)/(64*1024);
264 if percentage != last_percentage {
265 eprintln!("Percentage done: {}", percentage);
266 last_percentage = percentage;
267 }
268
269 let l1name = PathBuf::from(format!("{:04x}", i));
270 match nix::dir::Dir::openat(base_fd, &l1name, OFlag::O_RDONLY, Mode::empty()) {
271 Ok(mut h) => {
272 //println!("SCAN {:?} {:?}", l1name);
273 self.sweep_old_files(&mut h, status)?;
274 }
275 Err(err) => bail!("unable to open store '{}' dir {:?}/{:?} - {}",
276 self.name, self.chunk_dir, l1name, err),
277 };
278 }
279 Ok(())
280 }
281
282 pub fn insert_chunk(&self, chunk: &[u8]) -> Result<(bool, [u8; 32]), Error> {
283
284 // fixme: use Sha512/256 when available
285 let mut hasher = sha::Sha256::new();
286 hasher.update(chunk);
287
288 let digest = hasher.finish();
289
290 //println!("DIGEST {}", digest_to_hex(&digest));
291
292 let mut chunk_path = self.chunk_dir.clone();
293 let prefix = digest_to_prefix(&digest);
294 chunk_path.push(&prefix);
295 let digest_str = digest_to_hex(&digest);
296 chunk_path.push(&digest_str);
297
298 let lock = self.mutex.lock();
299
300 if let Ok(metadata) = std::fs::metadata(&chunk_path) {
301 if metadata.is_file() {
302 return Ok((true, digest));
303 } else {
304 bail!("Got unexpected file type on store '{}' for chunk {}", self.name, digest_str);
305 }
306 }
307
308 let mut tmp_path = chunk_path.clone();
309 tmp_path.set_extension("tmp");
310 let mut f = std::fs::File::create(&tmp_path)?;
311 f.write_all(chunk)?;
312
313 if let Err(err) = std::fs::rename(&tmp_path, &chunk_path) {
314 if let Err(_) = std::fs::remove_file(&tmp_path) { /* ignore */ }
315 bail!("Atomic rename on store '{}' failed for chunk {} - {}", self.name, digest_str, err);
316 }
317
318 //println!("PATH {:?}", chunk_path);
319
320 drop(lock);
321
322 Ok((false, digest))
323 }
324
325 pub fn relative_path(&self, path: &Path) -> PathBuf {
326
327 let mut full_path = self.base.clone();
328 full_path.push(path);
329 full_path
330 }
331
332 pub fn base_path(&self) -> PathBuf {
333 self.base.clone()
334 }
335 }
336
337
338 #[test]
339 fn test_chunk_store1() {
340
341 let mut path = std::fs::canonicalize(".").unwrap(); // we need absulute path
342 path.push(".testdir");
343
344 if let Err(_e) = std::fs::remove_dir_all(".testdir") { /* ignore */ }
345
346 let chunk_store = ChunkStore::open("test", &path);
347 assert!(chunk_store.is_err());
348
349 let chunk_store = ChunkStore::create("test", &path).unwrap();
350 let (exists, _) = chunk_store.insert_chunk(&[0u8, 1u8]).unwrap();
351 assert!(!exists);
352
353 let (exists, _) = chunk_store.insert_chunk(&[0u8, 1u8]).unwrap();
354 assert!(exists);
355
356
357 let chunk_store = ChunkStore::create("test", &path);
358 assert!(chunk_store.is_err());
359
360
361 }