]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/dynamic_index.rs
af848e93b0958e35c8f4b57a3757a0af68963290
[proxmox-backup.git] / src / backup / dynamic_index.rs
1 use std::convert::TryInto;
2 use std::fs::File;
3 use std::io::{BufWriter, Seek, SeekFrom, Write};
4 use std::os::unix::io::AsRawFd;
5 use std::path::{Path, PathBuf};
6 use std::sync::Arc;
7
8 use failure::*;
9
10 use proxmox::tools::io::ReadExt;
11 use proxmox::tools::uuid::Uuid;
12 use proxmox::tools::vec;
13
14 use super::chunk_stat::ChunkStat;
15 use super::chunk_store::ChunkStore;
16 use super::read_chunk::ReadChunk;
17 use super::Chunker;
18 use super::IndexFile;
19 use super::{DataBlob, DataChunkBuilder};
20 use crate::tools;
21
/// Header format definition for dynamic index files (`.dixd`)
#[repr(C)]
pub struct DynamicIndexHeader {
    /// File type magic (compared against `DYNAMIC_SIZED_CHUNK_INDEX_1_0`)
    pub magic: [u8; 8],
    /// UUID identifying this index file
    pub uuid: [u8; 16],
    /// Creation time, stored little-endian on disk (see `u64::to_le` in the writer)
    pub ctime: u64,
    /// Sha256 over the index ``SHA256(offset1||digest1||offset2||digest2||...)``
    pub index_csum: [u8; 32],
    reserved: [u8; 4032], // padding: overall size is one page (4096 bytes)
}
32 proxmox::tools::static_assert_size!(DynamicIndexHeader, 4096);
33 // TODO: Once non-Copy unions are stabilized, use:
34 // union DynamicIndexHeader {
35 // reserved: [u8; 4096],
36 // pub data: DynamicIndexHeaderData,
37 // }
38
/// Read-only accessor for a dynamic index file.
///
/// The index entries (everything after the 4096 byte header) are
/// mmap()ed read-only; `index` points at the mapping and `_file` keeps
/// the file descriptor (and its shared flock) alive for the lifetime
/// of the mapping.
pub struct DynamicIndexReader {
    // kept open so the mmap and the shared flock stay valid
    _file: File,
    // total file size in bytes (header + entries)
    pub size: usize,
    // base of the mmap()ed entry region; 40 bytes per entry
    // (u64 end offset + 32 byte digest)
    index: *const u8,
    index_entries: usize,
    pub uuid: [u8; 16],
    pub ctime: u64,
    pub index_csum: [u8; 32],
}

// SAFETY: `index` is mmap()ed which cannot be thread-local so should be
// sendable; the mapping is read-only (PROT_READ), so shared references
// across threads cannot race.
// FIXME: Introduce an mmap wrapper type for this?
unsafe impl Send for DynamicIndexReader {}
unsafe impl Sync for DynamicIndexReader {}
53
54 impl Drop for DynamicIndexReader {
55 fn drop(&mut self) {
56 if let Err(err) = self.unmap() {
57 eprintln!("Unable to unmap dynamic index - {}", err);
58 }
59 }
60 }
61
62 impl DynamicIndexReader {
63 pub fn open(path: &Path) -> Result<Self, Error> {
64 File::open(path)
65 .map_err(Error::from)
66 .and_then(|file| Self::new(file))
67 .map_err(|err| format_err!("Unable to open dynamic index {:?} - {}", path, err))
68 }
69
70 pub fn new(mut file: std::fs::File) -> Result<Self, Error> {
71 if let Err(err) =
72 nix::fcntl::flock(file.as_raw_fd(), nix::fcntl::FlockArg::LockSharedNonblock)
73 {
74 bail!("unable to get shared lock - {}", err);
75 }
76
77 file.seek(SeekFrom::Start(0))?;
78
79 let header_size = std::mem::size_of::<DynamicIndexHeader>();
80
81 let header: Box<DynamicIndexHeader> = unsafe { file.read_host_value_boxed()? };
82
83 if header.magic != super::DYNAMIC_SIZED_CHUNK_INDEX_1_0 {
84 bail!("got unknown magic number");
85 }
86
87 let ctime = u64::from_le(header.ctime);
88
89 let rawfd = file.as_raw_fd();
90
91 let stat = nix::sys::stat::fstat(rawfd)?;
92
93 let size = stat.st_size as usize;
94
95 let index_size = size - header_size;
96 if (index_size % 40) != 0 {
97 bail!("got unexpected file size");
98 }
99
100 let data = unsafe {
101 nix::sys::mman::mmap(
102 std::ptr::null_mut(),
103 index_size,
104 nix::sys::mman::ProtFlags::PROT_READ,
105 nix::sys::mman::MapFlags::MAP_PRIVATE,
106 rawfd,
107 header_size as i64,
108 )
109 }? as *const u8;
110
111 Ok(Self {
112 _file: file,
113 size,
114 index: data,
115 index_entries: index_size / 40,
116 ctime,
117 uuid: header.uuid,
118 index_csum: header.index_csum,
119 })
120 }
121
122 fn unmap(&mut self) -> Result<(), Error> {
123 if self.index == std::ptr::null_mut() {
124 return Ok(());
125 }
126
127 if let Err(err) = unsafe {
128 nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, self.index_entries * 40)
129 } {
130 bail!("unmap dynamic index failed - {}", err);
131 }
132
133 self.index = std::ptr::null_mut();
134
135 Ok(())
136 }
137
138 #[allow(clippy::cast_ptr_alignment)]
139 pub fn chunk_info(&self, pos: usize) -> Result<(u64, u64, [u8; 32]), Error> {
140 if pos >= self.index_entries {
141 bail!("chunk index out of range");
142 }
143 let start = if pos == 0 {
144 0
145 } else {
146 unsafe { *(self.index.add((pos - 1) * 40) as *const u64) }
147 };
148
149 let end = unsafe { *(self.index.add(pos * 40) as *const u64) };
150
151 let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit();
152 unsafe {
153 std::ptr::copy_nonoverlapping(
154 self.index.add(pos * 40 + 8),
155 (*digest.as_mut_ptr()).as_mut_ptr(),
156 32,
157 );
158 }
159
160 Ok((start, end, unsafe { digest.assume_init() }))
161 }
162
163 #[inline]
164 #[allow(clippy::cast_ptr_alignment)]
165 fn chunk_end(&self, pos: usize) -> u64 {
166 if pos >= self.index_entries {
167 panic!("chunk index out of range");
168 }
169 unsafe { *(self.index.add(pos * 40) as *const u64) }
170 }
171
172 #[inline]
173 fn chunk_digest(&self, pos: usize) -> &[u8; 32] {
174 if pos >= self.index_entries {
175 panic!("chunk index out of range");
176 }
177 let slice = unsafe { std::slice::from_raw_parts(self.index.add(pos * 40 + 8), 32) };
178 slice.try_into().unwrap()
179 }
180
181 /// Compute checksum and data size
182 pub fn compute_csum(&self) -> ([u8; 32], u64) {
183 let mut csum = openssl::sha::Sha256::new();
184 let mut chunk_end = 0;
185 for pos in 0..self.index_entries {
186 chunk_end = self.chunk_end(pos);
187 let digest = self.chunk_digest(pos);
188 csum.update(&chunk_end.to_le_bytes());
189 csum.update(digest);
190 }
191 let csum = csum.finish();
192
193 (csum, chunk_end)
194 }
195
196 /*
197 pub fn dump_pxar(&self, mut writer: Box<dyn Write>) -> Result<(), Error> {
198
199 for pos in 0..self.index_entries {
200 let _end = self.chunk_end(pos);
201 let digest = self.chunk_digest(pos);
202 //println!("Dump {:08x}", end );
203 let chunk = self.store.read_chunk(digest)?;
204 // fimxe: handle encrypted chunks
205 let data = chunk.decode(None)?;
206 writer.write_all(&data)?;
207 }
208
209 Ok(())
210 }
211 */
212
213 fn binary_search(
214 &self,
215 start_idx: usize,
216 start: u64,
217 end_idx: usize,
218 end: u64,
219 offset: u64,
220 ) -> Result<usize, Error> {
221 if (offset >= end) || (offset < start) {
222 bail!("offset out of range");
223 }
224
225 if end_idx == start_idx {
226 return Ok(start_idx); // found
227 }
228 let middle_idx = (start_idx + end_idx) / 2;
229 let middle_end = self.chunk_end(middle_idx);
230
231 if offset < middle_end {
232 self.binary_search(start_idx, start, middle_idx, middle_end, offset)
233 } else {
234 self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset)
235 }
236 }
237 }
238
239 impl IndexFile for DynamicIndexReader {
240 fn index_count(&self) -> usize {
241 self.index_entries
242 }
243
244 fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> {
245 if pos >= self.index_entries {
246 None
247 } else {
248 Some(unsafe { std::mem::transmute(self.chunk_digest(pos).as_ptr()) })
249 }
250 }
251
252 fn index_bytes(&self) -> u64 {
253 if self.index_entries == 0 {
254 0
255 } else {
256 self.chunk_end((self.index_entries - 1) as usize)
257 }
258 }
259 }
260
/// Buffered, seekable reader over the data addressed by a dynamic index.
///
/// Keeps exactly one decoded chunk in `read_buffer` at a time and
/// fetches chunks on demand through the `ReadChunk` store `S`.
pub struct BufferedDynamicReader<S> {
    store: S,
    index: DynamicIndexReader,
    // total logical size (end offset of the last chunk)
    archive_size: u64,
    // currently buffered chunk data
    read_buffer: Vec<u8>,
    // which chunk is buffered, and its starting byte offset
    buffered_chunk_idx: usize,
    buffered_chunk_start: u64,
    // current position for the io::Read / io::Seek impls
    read_offset: u64,
}
270
271 impl<S: ReadChunk> BufferedDynamicReader<S> {
272 pub fn new(index: DynamicIndexReader, store: S) -> Self {
273 let archive_size = index.chunk_end(index.index_entries - 1);
274 Self {
275 store,
276 index,
277 archive_size,
278 read_buffer: Vec::with_capacity(1024 * 1024),
279 buffered_chunk_idx: 0,
280 buffered_chunk_start: 0,
281 read_offset: 0,
282 }
283 }
284
285 pub fn archive_size(&self) -> u64 {
286 self.archive_size
287 }
288
289 fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
290 let index = &self.index;
291 let (start, end, digest) = index.chunk_info(idx)?;
292
293 // fixme: avoid copy
294
295 let data = self.store.read_chunk(&digest)?;
296
297 if (end - start) != data.len() as u64 {
298 bail!(
299 "read chunk with wrong size ({} != {}",
300 (end - start),
301 data.len()
302 );
303 }
304
305 self.read_buffer.clear();
306 self.read_buffer.extend_from_slice(&data);
307
308 self.buffered_chunk_idx = idx;
309
310 self.buffered_chunk_start = start as u64;
311 //println!("BUFFER {} {}", self.buffered_chunk_start, end);
312 Ok(())
313 }
314 }
315
impl<S: ReadChunk> crate::tools::BufferedRead for BufferedDynamicReader<S> {
    /// Return a slice of buffered data starting at archive `offset`.
    ///
    /// The slice extends to the end of the chunk containing `offset`;
    /// callers advance by the returned length and call again. Reading at
    /// exactly `archive_size` yields an empty slice (EOF).
    fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
        if offset == self.archive_size {
            return Ok(&self.read_buffer[0..0]);
        }

        let buffer_len = self.read_buffer.len();
        let index = &self.index;

        // optimization for sequential read: if `offset` lies just past the
        // buffered chunk, probe the next chunk directly and skip the binary
        // search below
        if buffer_len > 0
            && ((self.buffered_chunk_idx + 1) < index.index_entries)
            && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
        {
            let next_idx = self.buffered_chunk_idx + 1;
            let next_end = index.chunk_end(next_idx);
            if offset < next_end {
                self.buffer_chunk(next_idx)?;
                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
                return Ok(&self.read_buffer[buffer_offset..]);
            }
        }

        // random access: binary-search the chunk containing `offset` and
        // load it, unless the buffered chunk already covers it
        if (buffer_len == 0)
            || (offset < self.buffered_chunk_start)
            || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
        {
            let end_idx = index.index_entries - 1;
            let end = index.chunk_end(end_idx);
            let idx = index.binary_search(0, 0, end_idx, end, offset)?;
            self.buffer_chunk(idx)?;
        }

        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
        Ok(&self.read_buffer[buffer_offset..])
    }
}
353
354 impl<S: ReadChunk> std::io::Read for BufferedDynamicReader<S> {
355 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
356 use crate::tools::BufferedRead;
357 use std::io::{Error, ErrorKind};
358
359 let data = match self.buffered_read(self.read_offset) {
360 Ok(v) => v,
361 Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
362 };
363
364 let n = if data.len() > buf.len() {
365 buf.len()
366 } else {
367 data.len()
368 };
369
370 unsafe {
371 std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
372 }
373
374 self.read_offset += n as u64;
375
376 Ok(n)
377 }
378 }
379
380 impl<S: ReadChunk> std::io::Seek for BufferedDynamicReader<S> {
381 fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
382 let new_offset = match pos {
383 SeekFrom::Start(start_offset) => start_offset as i64,
384 SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
385 SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
386 };
387
388 use std::io::{Error, ErrorKind};
389 if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
390 return Err(Error::new(
391 ErrorKind::Other,
392 format!(
393 "seek is out of range {} ([0..{}])",
394 new_offset, self.archive_size
395 ),
396 ));
397 }
398 self.read_offset = new_offset as u64;
399
400 Ok(self.read_offset)
401 }
402 }
403
/// Create dynamic index files (`.dixd`)
pub struct DynamicIndexWriter {
    store: Arc<ChunkStore>,
    // shared lock on the chunk store, held for the writer's lifetime
    _lock: tools::ProcessLockSharedGuard,
    writer: BufWriter<File>,
    // set by close(); guards against double-close and writes-after-close
    closed: bool,
    // final path; data is written to tmp_filename and renamed on close()
    filename: PathBuf,
    tmp_filename: PathBuf,
    // running SHA256 over the index entries; taken (None) by close()
    csum: Option<openssl::sha::Sha256>,
    pub uuid: [u8; 16],
    pub ctime: u64,
}
416
417 impl Drop for DynamicIndexWriter {
418 fn drop(&mut self) {
419 let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
420 }
421 }
422
423 impl DynamicIndexWriter {
424 pub fn create(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> {
425 let shared_lock = store.try_shared_lock()?;
426
427 let full_path = store.relative_path(path);
428 let mut tmp_path = full_path.clone();
429 tmp_path.set_extension("tmp_didx");
430
431 let file = std::fs::OpenOptions::new()
432 .create(true)
433 .truncate(true)
434 .read(true)
435 .write(true)
436 .open(&tmp_path)?;
437
438 let mut writer = BufWriter::with_capacity(1024 * 1024, file);
439
440 let header_size = std::mem::size_of::<DynamicIndexHeader>();
441
442 // todo: use static assertion when available in rust
443 if header_size != 4096 {
444 panic!("got unexpected header size");
445 }
446
447 let ctime = std::time::SystemTime::now()
448 .duration_since(std::time::SystemTime::UNIX_EPOCH)?
449 .as_secs();
450
451 let uuid = Uuid::generate();
452
453 let mut buffer = vec::zeroed(header_size);
454 let header = crate::tools::map_struct_mut::<DynamicIndexHeader>(&mut buffer)?;
455
456 header.magic = super::DYNAMIC_SIZED_CHUNK_INDEX_1_0;
457 header.ctime = u64::to_le(ctime);
458 header.uuid = *uuid.as_bytes();
459
460 header.index_csum = [0u8; 32];
461
462 writer.write_all(&buffer)?;
463
464 let csum = Some(openssl::sha::Sha256::new());
465
466 Ok(Self {
467 store,
468 _lock: shared_lock,
469 writer,
470 closed: false,
471 filename: full_path,
472 tmp_filename: tmp_path,
473 ctime,
474 uuid: *uuid.as_bytes(),
475 csum,
476 })
477 }
478
479 // fixme: use add_chunk instead?
480 pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> {
481 self.store.insert_chunk(chunk, digest)
482 }
483
484 pub fn close(&mut self) -> Result<[u8; 32], Error> {
485 if self.closed {
486 bail!(
487 "cannot close already closed archive index file {:?}",
488 self.filename
489 );
490 }
491
492 self.closed = true;
493
494 self.writer.flush()?;
495
496 let csum_offset = proxmox::tools::offsetof!(DynamicIndexHeader, index_csum);
497 self.writer.seek(SeekFrom::Start(csum_offset as u64))?;
498
499 let csum = self.csum.take().unwrap();
500 let index_csum = csum.finish();
501
502 self.writer.write_all(&index_csum)?;
503 self.writer.flush()?;
504
505 if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
506 bail!("Atomic rename file {:?} failed - {}", self.filename, err);
507 }
508
509 Ok(index_csum)
510 }
511
512 // fixme: rename to add_digest
513 pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), Error> {
514 if self.closed {
515 bail!(
516 "cannot write to closed dynamic index file {:?}",
517 self.filename
518 );
519 }
520
521 let offset_le: &[u8; 8] = unsafe { &std::mem::transmute::<u64, [u8; 8]>(offset.to_le()) };
522
523 if let Some(ref mut csum) = self.csum {
524 csum.update(offset_le);
525 csum.update(digest);
526 }
527
528 self.writer.write_all(offset_le)?;
529 self.writer.write_all(digest)?;
530 Ok(())
531 }
532 }
533
/// Writer which splits a binary stream into dynamic sized chunks
///
/// And store the resulting chunk list into the index file.
pub struct DynamicChunkWriter {
    index: DynamicIndexWriter,
    // set by close(); further close() calls are no-ops
    closed: bool,
    // boundary detector deciding where chunks are cut
    chunker: Chunker,
    // running statistics (chunk count, sizes, duplicates)
    stat: ChunkStat,
    // total bytes consumed so far (== end offset of the next chunk)
    chunk_offset: usize,
    // chunk_offset value at the last completed chunk
    last_chunk: usize,
    // bytes accumulated for the chunk currently being built
    chunk_buffer: Vec<u8>,
}
546
547 impl DynamicChunkWriter {
548 pub fn new(index: DynamicIndexWriter, chunk_size: usize) -> Self {
549 Self {
550 index,
551 closed: false,
552 chunker: Chunker::new(chunk_size),
553 stat: ChunkStat::new(0),
554 chunk_offset: 0,
555 last_chunk: 0,
556 chunk_buffer: Vec::with_capacity(chunk_size * 4),
557 }
558 }
559
560 pub fn stat(&self) -> &ChunkStat {
561 &self.stat
562 }
563
564 pub fn close(&mut self) -> Result<(), Error> {
565 if self.closed {
566 return Ok(());
567 }
568
569 self.closed = true;
570
571 self.write_chunk_buffer()?;
572
573 self.index.close()?;
574
575 self.stat.size = self.chunk_offset as u64;
576
577 // add size of index file
578 self.stat.size +=
579 (self.stat.chunk_count * 40 + std::mem::size_of::<DynamicIndexHeader>()) as u64;
580
581 Ok(())
582 }
583
584 fn write_chunk_buffer(&mut self) -> Result<(), Error> {
585 let chunk_size = self.chunk_buffer.len();
586
587 if chunk_size == 0 {
588 return Ok(());
589 }
590
591 let expected_chunk_size = self.chunk_offset - self.last_chunk;
592 if expected_chunk_size != self.chunk_buffer.len() {
593 bail!("wrong chunk size {} != {}", expected_chunk_size, chunk_size);
594 }
595
596 self.stat.chunk_count += 1;
597
598 self.last_chunk = self.chunk_offset;
599
600 let (chunk, digest) = DataChunkBuilder::new(&self.chunk_buffer)
601 .compress(true)
602 .build()?;
603
604 match self.index.insert_chunk(&chunk, &digest) {
605 Ok((is_duplicate, compressed_size)) => {
606 self.stat.compressed_size += compressed_size;
607 if is_duplicate {
608 self.stat.duplicate_chunks += 1;
609 } else {
610 self.stat.disk_size += compressed_size;
611 }
612
613 println!(
614 "ADD CHUNK {:016x} {} {}% {} {}",
615 self.chunk_offset,
616 chunk_size,
617 (compressed_size * 100) / (chunk_size as u64),
618 is_duplicate,
619 proxmox::tools::digest_to_hex(&digest)
620 );
621 self.index.add_chunk(self.chunk_offset as u64, &digest)?;
622 self.chunk_buffer.truncate(0);
623 Ok(())
624 }
625 Err(err) => {
626 self.chunk_buffer.truncate(0);
627 Err(err)
628 }
629 }
630 }
631 }
632
633 impl Write for DynamicChunkWriter {
634 fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
635 let chunker = &mut self.chunker;
636
637 let pos = chunker.scan(data);
638
639 if pos > 0 {
640 self.chunk_buffer.extend_from_slice(&data[0..pos]);
641 self.chunk_offset += pos;
642
643 if let Err(err) = self.write_chunk_buffer() {
644 return Err(std::io::Error::new(
645 std::io::ErrorKind::Other,
646 err.to_string(),
647 ));
648 }
649 Ok(pos)
650 } else {
651 self.chunk_offset += data.len();
652 self.chunk_buffer.extend_from_slice(data);
653 Ok(data.len())
654 }
655 }
656
657 fn flush(&mut self) -> std::result::Result<(), std::io::Error> {
658 Err(std::io::Error::new(
659 std::io::ErrorKind::Other,
660 "please use close() instead of flush()",
661 ))
662 }
663 }