]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/fixed_index.rs
partially revert commit 1f82f9b7b5d231da22a541432d5617cb303c0000
[proxmox-backup.git] / src / backup / fixed_index.rs
1 use anyhow::{bail, format_err, Error};
2 use std::io::{Seek, SeekFrom};
3
4 use super::chunk_stat::*;
5 use super::chunk_store::*;
6 use super::{IndexFile, ChunkReadInfo};
7 use crate::tools::{self, epoch_now_u64};
8
9 use chrono::{Local, TimeZone};
10 use std::fs::File;
11 use std::io::Write;
12 use std::os::unix::io::AsRawFd;
13 use std::path::{Path, PathBuf};
14 use std::sync::Arc;
15
16 use super::read_chunk::*;
17 use super::ChunkInfo;
18
19 use proxmox::tools::io::ReadExt;
20 use proxmox::tools::Uuid;
21
/// Header format definition for fixed index files (`.fidx`)
///
/// The header occupies exactly one 4096-byte page; `reserved` pads it
/// out to that size (verified by the static assertion below). The
/// multi-byte integer fields are stored little-endian on disk (readers
/// convert with `u64::from_le`).
#[repr(C)]
pub struct FixedIndexHeader {
    /// Magic number identifying the format version.
    pub magic: [u8; 8],
    /// Unique identifier of this index file.
    pub uuid: [u8; 16],
    /// Creation time (epoch seconds, little-endian on disk).
    pub ctime: u64,
    /// Sha256 over the index ``SHA256(digest1||digest2||...)``
    pub index_csum: [u8; 32],
    /// Total size of the indexed image in bytes (little-endian on disk).
    pub size: u64,
    /// Fixed chunk size in bytes (little-endian on disk).
    pub chunk_size: u64,
    reserved: [u8; 4016], // overall size is one page (4096 bytes)
}
proxmox::static_assert_size!(FixedIndexHeader, 4096);
35
36 // split image into fixed size chunks
37
/// Read-only accessor for a fixed index (`.fidx`) file.
///
/// The digest table is `mmap()`ed read-only; `index` points at
/// `index_length * 32` bytes of chunk digests.
pub struct FixedIndexReader {
    // Kept only to hold the fd (and its shared flock) open for the
    // lifetime of the mapping.
    _file: File,
    pub chunk_size: usize,
    pub size: u64,
    index_length: usize,
    index: *mut u8,
    pub uuid: [u8; 16],
    pub ctime: u64,
    pub index_csum: [u8; 32],
}

// `index` is mmap()ed which cannot be thread-local so should be sendable
unsafe impl Send for FixedIndexReader {}
unsafe impl Sync for FixedIndexReader {}
52
53 impl Drop for FixedIndexReader {
54 fn drop(&mut self) {
55 if let Err(err) = self.unmap() {
56 eprintln!("Unable to unmap file - {}", err);
57 }
58 }
59 }
60
61 impl FixedIndexReader {
62 pub fn open(path: &Path) -> Result<Self, Error> {
63 File::open(path)
64 .map_err(Error::from)
65 .and_then(|file| Self::new(file))
66 .map_err(|err| format_err!("Unable to open fixed index {:?} - {}", path, err))
67 }
68
69 pub fn new(mut file: std::fs::File) -> Result<Self, Error> {
70 if let Err(err) =
71 nix::fcntl::flock(file.as_raw_fd(), nix::fcntl::FlockArg::LockSharedNonblock)
72 {
73 bail!("unable to get shared lock - {}", err);
74 }
75
76 file.seek(SeekFrom::Start(0))?;
77
78 let header_size = std::mem::size_of::<FixedIndexHeader>();
79 let header: Box<FixedIndexHeader> = unsafe { file.read_host_value_boxed()? };
80
81 if header.magic != super::FIXED_SIZED_CHUNK_INDEX_1_0 {
82 bail!("got unknown magic number");
83 }
84
85 let size = u64::from_le(header.size);
86 let ctime = u64::from_le(header.ctime);
87 let chunk_size = u64::from_le(header.chunk_size);
88
89 let index_length = ((size + chunk_size - 1) / chunk_size) as usize;
90 let index_size = index_length * 32;
91
92 let rawfd = file.as_raw_fd();
93
94 let stat = match nix::sys::stat::fstat(rawfd) {
95 Ok(stat) => stat,
96 Err(err) => bail!("fstat failed - {}", err),
97 };
98
99 let expected_index_size = (stat.st_size as usize) - header_size;
100 if index_size != expected_index_size {
101 bail!(
102 "got unexpected file size ({} != {})",
103 index_size,
104 expected_index_size
105 );
106 }
107
108 let data = unsafe {
109 nix::sys::mman::mmap(
110 std::ptr::null_mut(),
111 index_size,
112 nix::sys::mman::ProtFlags::PROT_READ,
113 nix::sys::mman::MapFlags::MAP_PRIVATE,
114 file.as_raw_fd(),
115 header_size as i64,
116 )
117 }? as *mut u8;
118
119 Ok(Self {
120 _file: file,
121 chunk_size: chunk_size as usize,
122 size,
123 index_length,
124 index: data,
125 ctime,
126 uuid: header.uuid,
127 index_csum: header.index_csum,
128 })
129 }
130
131 fn unmap(&mut self) -> Result<(), Error> {
132 if self.index == std::ptr::null_mut() {
133 return Ok(());
134 }
135
136 let index_size = self.index_length * 32;
137
138 if let Err(err) =
139 unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) }
140 {
141 bail!("unmap file failed - {}", err);
142 }
143
144 self.index = std::ptr::null_mut();
145
146 Ok(())
147 }
148
149 #[inline]
150 fn chunk_end(&self, pos: usize) -> u64 {
151 if pos >= self.index_length {
152 panic!("chunk index out of range");
153 }
154
155 let end = ((pos + 1) * self.chunk_size) as u64;
156 if end > self.size {
157 self.size
158 } else {
159 end
160 }
161 }
162
163 pub fn print_info(&self) {
164 println!("Size: {}", self.size);
165 println!("ChunkSize: {}", self.chunk_size);
166 println!(
167 "CTime: {}",
168 Local.timestamp(self.ctime as i64, 0).format("%c")
169 );
170 println!("UUID: {:?}", self.uuid);
171 }
172 }
173
impl IndexFile for FixedIndexReader {
    /// Number of chunks in the index.
    fn index_count(&self) -> usize {
        self.index_length
    }

    /// Borrow the 32 byte digest of chunk `pos`; `None` if out of range.
    fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> {
        if pos >= self.index_length {
            None
        } else {
            // Reinterpret 32 bytes of the mmap()ed digest table as a
            // `&[u8; 32]`; `pos` was bounds-checked above.
            Some(unsafe { std::mem::transmute(self.index.add(pos * 32)) })
        }
    }

    /// Total number of indexed bytes (the image size).
    fn index_bytes(&self) -> u64 {
        self.size
    }

    /// Byte range and digest of chunk `pos`; `None` if out of range.
    fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo> {
        if pos >= self.index_length {
            return None;
        }

        let start = (pos * self.chunk_size) as u64;
        let mut end = start + self.chunk_size as u64;

        // The last chunk may be truncated to the image size.
        if end > self.size {
            end = self.size;
        }

        let digest = self.index_digest(pos).unwrap();
        Some(ChunkReadInfo {
            range: start..end,
            digest: *digest,
        })
    }

    /// Sha256 over the concatenated digests, plus the total number of
    /// bytes covered (end offset of the last chunk).
    fn compute_csum(&self) -> ([u8; 32], u64) {
        let mut csum = openssl::sha::Sha256::new();
        let mut chunk_end = 0;
        for pos in 0..self.index_count() {
            let info = self.chunk_info(pos).unwrap();
            chunk_end = info.range.end;
            csum.update(&info.digest);
        }
        let csum = csum.finish();

        (csum, chunk_end)
    }
}
223
/// Writer for fixed index (`.fidx`) files.
///
/// Digests are written through a read/write MAP_SHARED mmap of a
/// temporary `*.tmp_fidx` file; `close()` finalizes the checksum and
/// atomically renames it to `filename`. Dropping an unclosed writer
/// removes the temporary file.
pub struct FixedIndexWriter {
    store: Arc<ChunkStore>,
    file: File,
    // Shared process lock on the chunk store, held for the writer's lifetime.
    _lock: tools::ProcessLockSharedGuard,
    filename: PathBuf,
    tmp_filename: PathBuf,
    chunk_size: usize,
    size: usize,
    index_length: usize,
    // mmap()ed digest table (index_length * 32 bytes); null after unmap().
    index: *mut u8,
    pub uuid: [u8; 16],
    pub ctime: u64,
}

// `index` is mmap()ed which cannot be thread-local so should be sendable
unsafe impl Send for FixedIndexWriter {}
240
241 impl Drop for FixedIndexWriter {
242 fn drop(&mut self) {
243 let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
244 if let Err(err) = self.unmap() {
245 eprintln!("Unable to unmap file {:?} - {}", self.tmp_filename, err);
246 }
247 }
248 }
249
250 impl FixedIndexWriter {
251 #[allow(clippy::cast_ptr_alignment)]
252 pub fn create(
253 store: Arc<ChunkStore>,
254 path: &Path,
255 size: usize,
256 chunk_size: usize,
257 ) -> Result<Self, Error> {
258 let shared_lock = store.try_shared_lock()?;
259
260 let full_path = store.relative_path(path);
261 let mut tmp_path = full_path.clone();
262 tmp_path.set_extension("tmp_fidx");
263
264 let mut file = std::fs::OpenOptions::new()
265 .create(true)
266 .truncate(true)
267 .read(true)
268 .write(true)
269 .open(&tmp_path)?;
270
271 let header_size = std::mem::size_of::<FixedIndexHeader>();
272
273 // todo: use static assertion when available in rust
274 if header_size != 4096 {
275 panic!("got unexpected header size");
276 }
277
278 let ctime = epoch_now_u64()?;
279
280 let uuid = Uuid::generate();
281
282 let buffer = vec![0u8; header_size];
283 let header = unsafe { &mut *(buffer.as_ptr() as *mut FixedIndexHeader) };
284
285 header.magic = super::FIXED_SIZED_CHUNK_INDEX_1_0;
286 header.ctime = u64::to_le(ctime);
287 header.size = u64::to_le(size as u64);
288 header.chunk_size = u64::to_le(chunk_size as u64);
289 header.uuid = *uuid.as_bytes();
290
291 header.index_csum = [0u8; 32];
292
293 file.write_all(&buffer)?;
294
295 let index_length = (size + chunk_size - 1) / chunk_size;
296 let index_size = index_length * 32;
297 nix::unistd::ftruncate(file.as_raw_fd(), (header_size + index_size) as i64)?;
298
299 let data = unsafe {
300 nix::sys::mman::mmap(
301 std::ptr::null_mut(),
302 index_size,
303 nix::sys::mman::ProtFlags::PROT_READ | nix::sys::mman::ProtFlags::PROT_WRITE,
304 nix::sys::mman::MapFlags::MAP_SHARED,
305 file.as_raw_fd(),
306 header_size as i64,
307 )
308 }? as *mut u8;
309
310 Ok(Self {
311 store,
312 file,
313 _lock: shared_lock,
314 filename: full_path,
315 tmp_filename: tmp_path,
316 chunk_size,
317 size,
318 index_length,
319 index: data,
320 ctime,
321 uuid: *uuid.as_bytes(),
322 })
323 }
324
325 pub fn index_length(&self) -> usize {
326 self.index_length
327 }
328
329 fn unmap(&mut self) -> Result<(), Error> {
330 if self.index == std::ptr::null_mut() {
331 return Ok(());
332 }
333
334 let index_size = self.index_length * 32;
335
336 if let Err(err) =
337 unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) }
338 {
339 bail!("unmap file {:?} failed - {}", self.tmp_filename, err);
340 }
341
342 self.index = std::ptr::null_mut();
343
344 Ok(())
345 }
346
347 pub fn close(&mut self) -> Result<[u8; 32], Error> {
348 if self.index == std::ptr::null_mut() {
349 bail!("cannot close already closed index file.");
350 }
351
352 let index_size = self.index_length * 32;
353 let data = unsafe { std::slice::from_raw_parts(self.index, index_size) };
354 let index_csum = openssl::sha::sha256(data);
355
356 self.unmap()?;
357
358 let csum_offset = proxmox::offsetof!(FixedIndexHeader, index_csum);
359 self.file.seek(SeekFrom::Start(csum_offset as u64))?;
360 self.file.write_all(&index_csum)?;
361 self.file.flush()?;
362
363 if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
364 bail!("Atomic rename file {:?} failed - {}", self.filename, err);
365 }
366
367 Ok(index_csum)
368 }
369
370 pub fn check_chunk_alignment(&self, offset: usize, chunk_len: usize) -> Result<usize, Error> {
371 if offset < chunk_len {
372 bail!("got chunk with small offset ({} < {}", offset, chunk_len);
373 }
374
375 let pos = offset - chunk_len;
376
377 if offset > self.size {
378 bail!("chunk data exceeds size ({} >= {})", offset, self.size);
379 }
380
381 // last chunk can be smaller
382 if ((offset != self.size) && (chunk_len != self.chunk_size))
383 || (chunk_len > self.chunk_size)
384 || (chunk_len == 0)
385 {
386 bail!(
387 "chunk with unexpected length ({} != {}",
388 chunk_len,
389 self.chunk_size
390 );
391 }
392
393 if pos & (self.chunk_size - 1) != 0 {
394 bail!("got unaligned chunk (pos = {})", pos);
395 }
396
397 Ok(pos / self.chunk_size)
398 }
399
400 // Note: We want to add data out of order, so do not assume any order here.
401 pub fn add_chunk(&mut self, chunk_info: &ChunkInfo, stat: &mut ChunkStat) -> Result<(), Error> {
402 let chunk_len = chunk_info.chunk_len as usize;
403 let offset = chunk_info.offset as usize; // end of chunk
404
405 let idx = self.check_chunk_alignment(offset, chunk_len)?;
406
407 let (is_duplicate, compressed_size) = self
408 .store
409 .insert_chunk(&chunk_info.chunk, &chunk_info.digest)?;
410
411 stat.chunk_count += 1;
412 stat.compressed_size += compressed_size;
413
414 let digest = &chunk_info.digest;
415
416 println!(
417 "ADD CHUNK {} {} {}% {} {}",
418 idx,
419 chunk_len,
420 (compressed_size * 100) / (chunk_len as u64),
421 is_duplicate,
422 proxmox::tools::digest_to_hex(digest)
423 );
424
425 if is_duplicate {
426 stat.duplicate_chunks += 1;
427 } else {
428 stat.disk_size += compressed_size;
429 }
430
431 self.add_digest(idx, digest)
432 }
433
434 pub fn add_digest(&mut self, index: usize, digest: &[u8; 32]) -> Result<(), Error> {
435 if index >= self.index_length {
436 bail!(
437 "add digest failed - index out of range ({} >= {})",
438 index,
439 self.index_length
440 );
441 }
442
443 if self.index == std::ptr::null_mut() {
444 bail!("cannot write to closed index file.");
445 }
446
447 let index_pos = index * 32;
448 unsafe {
449 let dst = self.index.add(index_pos);
450 dst.copy_from_nonoverlapping(digest.as_ptr(), 32);
451 }
452
453 Ok(())
454 }
455
456 pub fn clone_data_from(&mut self, reader: &FixedIndexReader) -> Result<(), Error> {
457 if self.index_length != reader.index_count() {
458 bail!("clone_data_from failed - index sizes not equal");
459 }
460
461 for i in 0..self.index_length {
462 self.add_digest(i, reader.index_digest(i).unwrap())?;
463 }
464
465 Ok(())
466 }
467 }
468
/// Buffered, seekable reader over a fixed index.
///
/// Reads whole chunks through `S: ReadChunk` and caches the most
/// recently read chunk in `read_buffer`.
pub struct BufferedFixedReader<S> {
    store: S,
    index: FixedIndexReader,
    archive_size: u64,
    read_buffer: Vec<u8>,
    // Index of the chunk currently held in read_buffer.
    buffered_chunk_idx: usize,
    // Archive offset at which read_buffer starts.
    buffered_chunk_start: u64,
    // Current position for the io::Read / io::Seek implementations.
    read_offset: u64,
}
478
479 impl<S: ReadChunk> BufferedFixedReader<S> {
480 pub fn new(index: FixedIndexReader, store: S) -> Self {
481 let archive_size = index.size;
482 Self {
483 store,
484 index,
485 archive_size,
486 read_buffer: Vec::with_capacity(1024 * 1024),
487 buffered_chunk_idx: 0,
488 buffered_chunk_start: 0,
489 read_offset: 0,
490 }
491 }
492
493 pub fn archive_size(&self) -> u64 {
494 self.archive_size
495 }
496
497 fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
498 let index = &self.index;
499 let info = match index.chunk_info(idx) {
500 Some(info) => info,
501 None => bail!("chunk index out of range"),
502 };
503
504 // fixme: avoid copy
505
506 let data = self.store.read_chunk(&info.digest)?;
507 let size = info.range.end - info.range.start;
508 if size != data.len() as u64 {
509 bail!("read chunk with wrong size ({} != {}", size, data.len());
510 }
511
512 self.read_buffer.clear();
513 self.read_buffer.extend_from_slice(&data);
514
515 self.buffered_chunk_idx = idx;
516
517 self.buffered_chunk_start = info.range.start as u64;
518 Ok(())
519 }
520 }
521
impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
    /// Return a slice of buffered data starting at archive `offset`,
    /// loading the covering chunk first if necessary. At exactly
    /// end-of-archive an empty slice is returned.
    fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
        if offset == self.archive_size {
            return Ok(&self.read_buffer[0..0]);
        }

        let buffer_len = self.read_buffer.len();
        let index = &self.index;

        // optimization for sequential read
        //
        // If the requested offset lies just past the buffered chunk and
        // still inside the next chunk, load the next chunk directly
        // without recomputing the chunk index from the offset.
        if buffer_len > 0
            && ((self.buffered_chunk_idx + 1) < index.index_length)
            && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
        {
            let next_idx = self.buffered_chunk_idx + 1;
            let next_end = index.chunk_end(next_idx);
            if offset < next_end {
                self.buffer_chunk(next_idx)?;
                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
                return Ok(&self.read_buffer[buffer_offset..]);
            }
        }

        // Random access fallback: (re)load the chunk containing `offset`
        // unless the current buffer already covers it.
        if (buffer_len == 0)
            || (offset < self.buffered_chunk_start)
            || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
        {
            let idx = (offset / index.chunk_size as u64) as usize;
            self.buffer_chunk(idx)?;
        }

        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
        Ok(&self.read_buffer[buffer_offset..])
    }
}
557
558 impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
559 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
560 use crate::tools::BufferedRead;
561 use std::io::{Error, ErrorKind};
562
563 let data = match self.buffered_read(self.read_offset) {
564 Ok(v) => v,
565 Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
566 };
567
568 let n = if data.len() > buf.len() {
569 buf.len()
570 } else {
571 data.len()
572 };
573
574 unsafe {
575 std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
576 }
577
578 self.read_offset += n as u64;
579
580 Ok(n)
581 }
582 }
583
584 impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
585 fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
586 let new_offset = match pos {
587 SeekFrom::Start(start_offset) => start_offset as i64,
588 SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
589 SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
590 };
591
592 use std::io::{Error, ErrorKind};
593 if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
594 return Err(Error::new(
595 ErrorKind::Other,
596 format!(
597 "seek is out of range {} ([0..{}])",
598 new_offset, self.archive_size
599 ),
600 ));
601 }
602 self.read_offset = new_offset as u64;
603
604 Ok(self.read_offset)
605 }
606 }