]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/fixed_index.rs
add incremental backup support
[proxmox-backup.git] / src / backup / fixed_index.rs
1 use anyhow::{bail, format_err, Error};
2 use std::convert::TryInto;
3 use std::io::{Seek, SeekFrom};
4
5 use super::chunk_stat::*;
6 use super::chunk_store::*;
7 use super::IndexFile;
8 use crate::tools::{self, epoch_now_u64};
9
10 use chrono::{Local, TimeZone};
11 use std::fs::File;
12 use std::io::Write;
13 use std::os::unix::io::AsRawFd;
14 use std::path::{Path, PathBuf};
15 use std::sync::Arc;
16
17 use super::read_chunk::*;
18 use super::ChunkInfo;
19
20 use proxmox::tools::io::ReadExt;
21 use proxmox::tools::Uuid;
22
/// Header format definition for fixed index files (`.fidx`)
///
/// The header occupies exactly one 4096-byte page (enforced by the
/// `static_assert_size!` below); the chunk digest table follows directly
/// after it in the file.
#[repr(C)]
pub struct FixedIndexHeader {
    /// File type marker, must equal `FIXED_SIZED_CHUNK_INDEX_1_0`
    pub magic: [u8; 8],
    /// Unique identifier of this index
    pub uuid: [u8; 16],
    /// Creation time as Unix epoch (stored little-endian, see `new()`/`create()`)
    pub ctime: u64,
    /// Sha256 over the index ``SHA256(digest1||digest2||...)``
    pub index_csum: [u8; 32],
    /// Total image size in bytes (stored little-endian)
    pub size: u64,
    /// Fixed chunk size in bytes (stored little-endian)
    pub chunk_size: u64,
    reserved: [u8; 4016], // overall size is one page (4096 bytes)
}
proxmox::static_assert_size!(FixedIndexHeader, 4096);
36
// split image into fixed size chunks

/// Read-only accessor for a fixed index (`.fidx`) file.
///
/// The digest table is mmap()ed read-only; `index` points at the mapping
/// and is released via `unmap()` on drop.
pub struct FixedIndexReader {
    // kept only so the fd (and its shared flock) stays alive while mapped
    _file: File,
    pub chunk_size: usize,
    pub size: u64,
    // number of 32-byte digest entries in the mapped table
    index_length: usize,
    // base of the mmap()ed digest table; null after unmap()
    index: *mut u8,
    pub uuid: [u8; 16],
    pub ctime: u64,
    pub index_csum: [u8; 32],
}
49
// `index` is mmap()ed which cannot be thread-local so should be sendable
// SAFETY: the mapping is read-only (PROT_READ) and never remapped while the
// reader is alive, so sharing the raw pointer across threads is sound.
unsafe impl Send for FixedIndexReader {}
unsafe impl Sync for FixedIndexReader {}
53
54 impl Drop for FixedIndexReader {
55 fn drop(&mut self) {
56 if let Err(err) = self.unmap() {
57 eprintln!("Unable to unmap file - {}", err);
58 }
59 }
60 }
61
62 impl FixedIndexReader {
63 pub fn open(path: &Path) -> Result<Self, Error> {
64 File::open(path)
65 .map_err(Error::from)
66 .and_then(|file| Self::new(file))
67 .map_err(|err| format_err!("Unable to open fixed index {:?} - {}", path, err))
68 }
69
70 pub fn new(mut file: std::fs::File) -> Result<Self, Error> {
71 if let Err(err) =
72 nix::fcntl::flock(file.as_raw_fd(), nix::fcntl::FlockArg::LockSharedNonblock)
73 {
74 bail!("unable to get shared lock - {}", err);
75 }
76
77 file.seek(SeekFrom::Start(0))?;
78
79 let header_size = std::mem::size_of::<FixedIndexHeader>();
80 let header: Box<FixedIndexHeader> = unsafe { file.read_host_value_boxed()? };
81
82 if header.magic != super::FIXED_SIZED_CHUNK_INDEX_1_0 {
83 bail!("got unknown magic number");
84 }
85
86 let size = u64::from_le(header.size);
87 let ctime = u64::from_le(header.ctime);
88 let chunk_size = u64::from_le(header.chunk_size);
89
90 let index_length = ((size + chunk_size - 1) / chunk_size) as usize;
91 let index_size = index_length * 32;
92
93 let rawfd = file.as_raw_fd();
94
95 let stat = match nix::sys::stat::fstat(rawfd) {
96 Ok(stat) => stat,
97 Err(err) => bail!("fstat failed - {}", err),
98 };
99
100 let expected_index_size = (stat.st_size as usize) - header_size;
101 if index_size != expected_index_size {
102 bail!(
103 "got unexpected file size ({} != {})",
104 index_size,
105 expected_index_size
106 );
107 }
108
109 let data = unsafe {
110 nix::sys::mman::mmap(
111 std::ptr::null_mut(),
112 index_size,
113 nix::sys::mman::ProtFlags::PROT_READ,
114 nix::sys::mman::MapFlags::MAP_PRIVATE,
115 file.as_raw_fd(),
116 header_size as i64,
117 )
118 }? as *mut u8;
119
120 Ok(Self {
121 _file: file,
122 chunk_size: chunk_size as usize,
123 size,
124 index_length,
125 index: data,
126 ctime,
127 uuid: header.uuid,
128 index_csum: header.index_csum,
129 })
130 }
131
132 fn unmap(&mut self) -> Result<(), Error> {
133 if self.index == std::ptr::null_mut() {
134 return Ok(());
135 }
136
137 let index_size = self.index_length * 32;
138
139 if let Err(err) =
140 unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) }
141 {
142 bail!("unmap file failed - {}", err);
143 }
144
145 self.index = std::ptr::null_mut();
146
147 Ok(())
148 }
149
150 pub fn chunk_info(&self, pos: usize) -> Result<(u64, u64, [u8; 32]), Error> {
151 if pos >= self.index_length {
152 bail!("chunk index out of range");
153 }
154 let start = (pos * self.chunk_size) as u64;
155 let mut end = start + self.chunk_size as u64;
156
157 if end > self.size {
158 end = self.size;
159 }
160
161 let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit();
162 unsafe {
163 std::ptr::copy_nonoverlapping(
164 self.index.add(pos * 32),
165 (*digest.as_mut_ptr()).as_mut_ptr(),
166 32,
167 );
168 }
169
170 Ok((start, end, unsafe { digest.assume_init() }))
171 }
172
173 #[inline]
174 fn chunk_digest(&self, pos: usize) -> &[u8; 32] {
175 if pos >= self.index_length {
176 panic!("chunk index out of range");
177 }
178 let slice = unsafe { std::slice::from_raw_parts(self.index.add(pos * 32), 32) };
179 slice.try_into().unwrap()
180 }
181
182 #[inline]
183 fn chunk_end(&self, pos: usize) -> u64 {
184 if pos >= self.index_length {
185 panic!("chunk index out of range");
186 }
187
188 let end = ((pos + 1) * self.chunk_size) as u64;
189 if end > self.size {
190 self.size
191 } else {
192 end
193 }
194 }
195
196 /// Compute checksum and data size
197 pub fn compute_csum(&self) -> ([u8; 32], u64) {
198 let mut csum = openssl::sha::Sha256::new();
199 let mut chunk_end = 0;
200 for pos in 0..self.index_length {
201 chunk_end = self.chunk_end(pos);
202 let digest = self.chunk_digest(pos);
203 csum.update(digest);
204 }
205 let csum = csum.finish();
206
207 (csum, chunk_end)
208 }
209
210 pub fn print_info(&self) {
211 println!("Size: {}", self.size);
212 println!("ChunkSize: {}", self.chunk_size);
213 println!(
214 "CTime: {}",
215 Local.timestamp(self.ctime as i64, 0).format("%c")
216 );
217 println!("UUID: {:?}", self.uuid);
218 }
219 }
220
221 impl IndexFile for FixedIndexReader {
222 fn index_count(&self) -> usize {
223 self.index_length
224 }
225
226 fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> {
227 if pos >= self.index_length {
228 None
229 } else {
230 Some(unsafe { std::mem::transmute(self.index.add(pos * 32)) })
231 }
232 }
233
234 fn index_bytes(&self) -> u64 {
235 self.size
236 }
237 }
238
/// Writer for fixed index (`.fidx`) files.
///
/// Data is written to a `*.tmp_fidx` temporary file whose digest table is
/// mmap()ed read-write so digests can be added out of order; `close()`
/// renames it into its final place.
pub struct FixedIndexWriter {
    store: Arc<ChunkStore>,
    file: File,
    // shared process lock on the chunk store, held for the writer's lifetime
    _lock: tools::ProcessLockSharedGuard,
    // final destination path
    filename: PathBuf,
    // temporary file written to until close()
    tmp_filename: PathBuf,
    chunk_size: usize,
    size: usize,
    // number of 32-byte digest entries in the mapped table
    index_length: usize,
    // base of the mmap()ed digest table; null after unmap()/close()
    index: *mut u8,
    pub uuid: [u8; 16],
    pub ctime: u64,
}
252
// `index` is mmap()ed which cannot be thread-local so should be sendable
// SAFETY: only Send (not Sync) - the writer mutates the mapping through
// `&mut self`, so exclusive access is already guaranteed by the borrow rules.
unsafe impl Send for FixedIndexWriter {}
255
256 impl Drop for FixedIndexWriter {
257 fn drop(&mut self) {
258 let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
259 if let Err(err) = self.unmap() {
260 eprintln!("Unable to unmap file {:?} - {}", self.tmp_filename, err);
261 }
262 }
263 }
264
265 impl FixedIndexWriter {
266 #[allow(clippy::cast_ptr_alignment)]
267 pub fn create(
268 store: Arc<ChunkStore>,
269 path: &Path,
270 size: usize,
271 chunk_size: usize,
272 ) -> Result<Self, Error> {
273 let shared_lock = store.try_shared_lock()?;
274
275 let full_path = store.relative_path(path);
276 let mut tmp_path = full_path.clone();
277 tmp_path.set_extension("tmp_fidx");
278
279 let mut file = std::fs::OpenOptions::new()
280 .create(true)
281 .truncate(true)
282 .read(true)
283 .write(true)
284 .open(&tmp_path)?;
285
286 let header_size = std::mem::size_of::<FixedIndexHeader>();
287
288 // todo: use static assertion when available in rust
289 if header_size != 4096 {
290 panic!("got unexpected header size");
291 }
292
293 let ctime = epoch_now_u64()?;
294
295 let uuid = Uuid::generate();
296
297 let buffer = vec![0u8; header_size];
298 let header = unsafe { &mut *(buffer.as_ptr() as *mut FixedIndexHeader) };
299
300 header.magic = super::FIXED_SIZED_CHUNK_INDEX_1_0;
301 header.ctime = u64::to_le(ctime);
302 header.size = u64::to_le(size as u64);
303 header.chunk_size = u64::to_le(chunk_size as u64);
304 header.uuid = *uuid.as_bytes();
305
306 header.index_csum = [0u8; 32];
307
308 file.write_all(&buffer)?;
309
310 let index_length = (size + chunk_size - 1) / chunk_size;
311 let index_size = index_length * 32;
312 nix::unistd::ftruncate(file.as_raw_fd(), (header_size + index_size) as i64)?;
313
314 let data = unsafe {
315 nix::sys::mman::mmap(
316 std::ptr::null_mut(),
317 index_size,
318 nix::sys::mman::ProtFlags::PROT_READ | nix::sys::mman::ProtFlags::PROT_WRITE,
319 nix::sys::mman::MapFlags::MAP_SHARED,
320 file.as_raw_fd(),
321 header_size as i64,
322 )
323 }? as *mut u8;
324
325 Ok(Self {
326 store,
327 file,
328 _lock: shared_lock,
329 filename: full_path,
330 tmp_filename: tmp_path,
331 chunk_size,
332 size,
333 index_length,
334 index: data,
335 ctime,
336 uuid: *uuid.as_bytes(),
337 })
338 }
339
340 pub fn index_length(&self) -> usize {
341 self.index_length
342 }
343
344 fn unmap(&mut self) -> Result<(), Error> {
345 if self.index == std::ptr::null_mut() {
346 return Ok(());
347 }
348
349 let index_size = self.index_length * 32;
350
351 if let Err(err) =
352 unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) }
353 {
354 bail!("unmap file {:?} failed - {}", self.tmp_filename, err);
355 }
356
357 self.index = std::ptr::null_mut();
358
359 Ok(())
360 }
361
362 pub fn close(&mut self) -> Result<[u8; 32], Error> {
363 if self.index == std::ptr::null_mut() {
364 bail!("cannot close already closed index file.");
365 }
366
367 let index_size = self.index_length * 32;
368 let data = unsafe { std::slice::from_raw_parts(self.index, index_size) };
369 let index_csum = openssl::sha::sha256(data);
370
371 self.unmap()?;
372
373 let csum_offset = proxmox::offsetof!(FixedIndexHeader, index_csum);
374 self.file.seek(SeekFrom::Start(csum_offset as u64))?;
375 self.file.write_all(&index_csum)?;
376 self.file.flush()?;
377
378 if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
379 bail!("Atomic rename file {:?} failed - {}", self.filename, err);
380 }
381
382 Ok(index_csum)
383 }
384
385 pub fn check_chunk_alignment(&self, offset: usize, chunk_len: usize) -> Result<usize, Error> {
386 if offset < chunk_len {
387 bail!("got chunk with small offset ({} < {}", offset, chunk_len);
388 }
389
390 let pos = offset - chunk_len;
391
392 if offset > self.size {
393 bail!("chunk data exceeds size ({} >= {})", offset, self.size);
394 }
395
396 // last chunk can be smaller
397 if ((offset != self.size) && (chunk_len != self.chunk_size))
398 || (chunk_len > self.chunk_size)
399 || (chunk_len == 0)
400 {
401 bail!(
402 "chunk with unexpected length ({} != {}",
403 chunk_len,
404 self.chunk_size
405 );
406 }
407
408 if pos & (self.chunk_size - 1) != 0 {
409 bail!("got unaligned chunk (pos = {})", pos);
410 }
411
412 Ok(pos / self.chunk_size)
413 }
414
415 // Note: We want to add data out of order, so do not assume any order here.
416 pub fn add_chunk(&mut self, chunk_info: &ChunkInfo, stat: &mut ChunkStat) -> Result<(), Error> {
417 let chunk_len = chunk_info.chunk_len as usize;
418 let offset = chunk_info.offset as usize; // end of chunk
419
420 let idx = self.check_chunk_alignment(offset, chunk_len)?;
421
422 let (is_duplicate, compressed_size) = self
423 .store
424 .insert_chunk(&chunk_info.chunk, &chunk_info.digest)?;
425
426 stat.chunk_count += 1;
427 stat.compressed_size += compressed_size;
428
429 let digest = &chunk_info.digest;
430
431 println!(
432 "ADD CHUNK {} {} {}% {} {}",
433 idx,
434 chunk_len,
435 (compressed_size * 100) / (chunk_len as u64),
436 is_duplicate,
437 proxmox::tools::digest_to_hex(digest)
438 );
439
440 if is_duplicate {
441 stat.duplicate_chunks += 1;
442 } else {
443 stat.disk_size += compressed_size;
444 }
445
446 self.add_digest(idx, digest)
447 }
448
449 pub fn add_digest(&mut self, index: usize, digest: &[u8; 32]) -> Result<(), Error> {
450 if index >= self.index_length {
451 bail!(
452 "add digest failed - index out of range ({} >= {})",
453 index,
454 self.index_length
455 );
456 }
457
458 if self.index == std::ptr::null_mut() {
459 bail!("cannot write to closed index file.");
460 }
461
462 let index_pos = index * 32;
463 unsafe {
464 let dst = self.index.add(index_pos);
465 dst.copy_from_nonoverlapping(digest.as_ptr(), 32);
466 }
467
468 Ok(())
469 }
470
471 pub fn clone_data_from(&mut self, reader: &FixedIndexReader) -> Result<(), Error> {
472 if self.index_length != reader.index_count() {
473 bail!("clone_data_from failed - index sizes not equal");
474 }
475
476 for i in 0..self.index_length {
477 self.add_digest(i, reader.index_digest(i).unwrap())?;
478 }
479
480 Ok(())
481 }
482 }
483
/// Buffered, seekable reader over a fixed index.
///
/// Keeps the most recently read chunk decoded in `read_buffer` so
/// sequential reads within one chunk avoid refetching it from `store`.
pub struct BufferedFixedReader<S> {
    // chunk source implementing ReadChunk
    store: S,
    index: FixedIndexReader,
    // total archive size in bytes (copied from index.size)
    archive_size: u64,
    // decoded data of the currently buffered chunk
    read_buffer: Vec<u8>,
    // index of the chunk currently held in read_buffer
    buffered_chunk_idx: usize,
    // archive offset where the buffered chunk starts
    buffered_chunk_start: u64,
    // current position for the std::io::Read implementation
    read_offset: u64,
}
493
494 impl<S: ReadChunk> BufferedFixedReader<S> {
495 pub fn new(index: FixedIndexReader, store: S) -> Self {
496 let archive_size = index.size;
497 Self {
498 store,
499 index,
500 archive_size,
501 read_buffer: Vec::with_capacity(1024 * 1024),
502 buffered_chunk_idx: 0,
503 buffered_chunk_start: 0,
504 read_offset: 0,
505 }
506 }
507
508 pub fn archive_size(&self) -> u64 {
509 self.archive_size
510 }
511
512 fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
513 let index = &self.index;
514 let (start, end, digest) = index.chunk_info(idx)?;
515
516 // fixme: avoid copy
517
518 let data = self.store.read_chunk(&digest)?;
519
520 if (end - start) != data.len() as u64 {
521 bail!(
522 "read chunk with wrong size ({} != {}",
523 (end - start),
524 data.len()
525 );
526 }
527
528 self.read_buffer.clear();
529 self.read_buffer.extend_from_slice(&data);
530
531 self.buffered_chunk_idx = idx;
532
533 self.buffered_chunk_start = start as u64;
534 //println!("BUFFER {} {}", self.buffered_chunk_start, end);
535 Ok(())
536 }
537 }
538
impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
    /// Return a slice of buffered data starting at archive `offset` and
    /// running to the end of the chunk containing it, (re)loading the
    /// chunk buffer as needed.
    fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
        // exactly at EOF: return an empty slice instead of erroring
        if offset == self.archive_size {
            return Ok(&self.read_buffer[0..0]);
        }

        let buffer_len = self.read_buffer.len();
        let index = &self.index;

        // optimization for sequential read
        // (offset just past the buffered chunk and the next chunk exists)
        if buffer_len > 0
            && ((self.buffered_chunk_idx + 1) < index.index_length)
            && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
        {
            let next_idx = self.buffered_chunk_idx + 1;
            let next_end = index.chunk_end(next_idx);
            if offset < next_end {
                self.buffer_chunk(next_idx)?;
                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
                return Ok(&self.read_buffer[buffer_offset..]);
            }
        }

        // random access: reload if nothing is buffered or the offset falls
        // outside the currently buffered chunk
        if (buffer_len == 0)
            || (offset < self.buffered_chunk_start)
            || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
        {
            // fixed chunk grid, so the chunk index is a plain division
            let idx = (offset / index.chunk_size as u64) as usize;
            self.buffer_chunk(idx)?;
        }

        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
        Ok(&self.read_buffer[buffer_offset..])
    }
}
574
575 impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
576 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
577 use crate::tools::BufferedRead;
578 use std::io::{Error, ErrorKind};
579
580 let data = match self.buffered_read(self.read_offset) {
581 Ok(v) => v,
582 Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
583 };
584
585 let n = if data.len() > buf.len() {
586 buf.len()
587 } else {
588 data.len()
589 };
590
591 unsafe {
592 std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
593 }
594
595 self.read_offset += n as u64;
596
597 Ok(n)
598 }
599 }
600
601 impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
602 fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
603 let new_offset = match pos {
604 SeekFrom::Start(start_offset) => start_offset as i64,
605 SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
606 SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
607 };
608
609 use std::io::{Error, ErrorKind};
610 if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
611 return Err(Error::new(
612 ErrorKind::Other,
613 format!(
614 "seek is out of range {} ([0..{}])",
615 new_offset, self.archive_size
616 ),
617 ));
618 }
619 self.read_offset = new_offset as u64;
620
621 Ok(self.read_offset)
622 }
623 }