]> git.proxmox.com Git - proxmox-backup.git/blame - src/backup/fixed_index.rs
add and implement chunk_from_offset for IndexFile
[proxmox-backup.git] / src / backup / fixed_index.rs
CommitLineData
f7d4e4b5 1use anyhow::{bail, format_err, Error};
f569acc5 2use std::io::{Seek, SeekFrom};
606ce64b 3
7e336555 4use super::chunk_stat::*;
606ce64b 5use super::chunk_store::*;
fdaab0df 6use super::{IndexFile, ChunkReadInfo};
e693818a 7use crate::tools::{self, epoch_now_u64};
606ce64b 8
f569acc5 9use chrono::{Local, TimeZone};
10eea49d 10use std::fs::File;
f569acc5 11use std::io::Write;
606ce64b 12use std::os::unix::io::AsRawFd;
f569acc5
WB
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
afb4cd28 15
afb4cd28 16use super::read_chunk::*;
f569acc5 17use super::ChunkInfo;
606ce64b 18
4dc79bb1 19use proxmox::tools::io::ReadExt;
f569acc5 20use proxmox::tools::Uuid;
4dc79bb1 21
/// Header format definition for fixed index files (`.fidx`)
///
/// The layout is `#[repr(C)]` and padded with `reserved`, so the header
/// occupies exactly one page (4096 bytes). It is written at the start of the
/// file, directly followed by the array of 32 byte chunk digests. The integer
/// fields are stored in little endian byte order.
#[repr(C)]
pub struct FixedIndexHeader {
    /// Magic number identifying the file type.
    pub magic: [u8; 8],
    /// UUID generated when the index is created.
    pub uuid: [u8; 16],
    /// Creation time (value of `epoch_now_u64()` when the index was created).
    pub ctime: u64,
    /// Sha256 over the index ``SHA256(digest1||digest2||...)``
    pub index_csum: [u8; 32],
    /// Total size of the indexed data in bytes.
    pub size: u64,
    /// Size of a single chunk in bytes (only the last chunk may be smaller).
    pub chunk_size: u64,
    reserved: [u8; 4016], // overall size is one page (4096 bytes)
}
9ea4bce4 34proxmox::static_assert_size!(FixedIndexHeader, 4096);
606ce64b
DM
35
36// split image into fixed size chunks
37
/// Read access to a fixed index file.
///
/// The digest array that follows the header is mapped into memory; the
/// remaining fields are copies of the values stored in the header.
pub struct FixedIndexReader {
    /// The open index file; kept so that it stays open as long as the reader lives.
    _file: File,
    /// Size of a single chunk in bytes.
    pub chunk_size: usize,
    /// Total size of the indexed data in bytes.
    pub size: u64,
    /// Number of 32 byte digests stored in the index.
    index_length: usize,
    /// Start of the memory mapped digest array (null once unmapped).
    index: *mut u8,
    /// UUID from the file header.
    pub uuid: [u8; 16],
    /// Creation time from the file header.
    pub ctime: u64,
    /// Index checksum from the file header.
    pub index_csum: [u8; 32],
}
48
5be4065b
WB
49// `index` is mmap()ed which cannot be thread-local so should be sendable
50unsafe impl Send for FixedIndexReader {}
5c1130df 51unsafe impl Sync for FixedIndexReader {}
5be4065b 52
91a905b6 53impl Drop for FixedIndexReader {
4818c8b6
DM
54 fn drop(&mut self) {
55 if let Err(err) = self.unmap() {
a7c72ad9 56 eprintln!("Unable to unmap file - {}", err);
4818c8b6
DM
57 }
58 }
59}
60
91a905b6 61impl FixedIndexReader {
a7c72ad9 62 pub fn open(path: &Path) -> Result<Self, Error> {
a7c72ad9
DM
63 File::open(path)
64 .map_err(Error::from)
65 .and_then(|file| Self::new(file))
66 .map_err(|err| format_err!("Unable to open fixed index {:?} - {}", path, err))
67 }
4818c8b6 68
a7c72ad9 69 pub fn new(mut file: std::fs::File) -> Result<Self, Error> {
f569acc5
WB
70 if let Err(err) =
71 nix::fcntl::flock(file.as_raw_fd(), nix::fcntl::FlockArg::LockSharedNonblock)
72 {
a7c72ad9 73 bail!("unable to get shared lock - {}", err);
c597a92c
DM
74 }
75
afb4cd28
DM
76 file.seek(SeekFrom::Start(0))?;
77
91a905b6 78 let header_size = std::mem::size_of::<FixedIndexHeader>();
4dc79bb1 79 let header: Box<FixedIndexHeader> = unsafe { file.read_host_value_boxed()? };
4818c8b6 80
a7dd4830 81 if header.magic != super::FIXED_SIZED_CHUNK_INDEX_1_0 {
a7c72ad9 82 bail!("got unknown magic number");
a360f6fa
DM
83 }
84
b46c3fad 85 let size = u64::from_le(header.size);
48d0d356 86 let ctime = u64::from_le(header.ctime);
b46c3fad 87 let chunk_size = u64::from_le(header.chunk_size);
4818c8b6 88
f569acc5
WB
89 let index_length = ((size + chunk_size - 1) / chunk_size) as usize;
90 let index_size = index_length * 32;
4818c8b6 91
0b8e75ed
DM
92 let rawfd = file.as_raw_fd();
93
94 let stat = match nix::sys::stat::fstat(rawfd) {
95 Ok(stat) => stat,
a7c72ad9 96 Err(err) => bail!("fstat failed - {}", err),
0b8e75ed
DM
97 };
98
ddbdf80d 99 let expected_index_size = (stat.st_size as usize) - header_size;
0b8e75ed 100 if index_size != expected_index_size {
f569acc5
WB
101 bail!(
102 "got unexpected file size ({} != {})",
103 index_size,
104 expected_index_size
105 );
0b8e75ed 106 }
4818c8b6 107
f569acc5
WB
108 let data = unsafe {
109 nix::sys::mman::mmap(
110 std::ptr::null_mut(),
111 index_size,
112 nix::sys::mman::ProtFlags::PROT_READ,
113 nix::sys::mman::MapFlags::MAP_PRIVATE,
114 file.as_raw_fd(),
115 header_size as i64,
116 )
117 }? as *mut u8;
4818c8b6
DM
118
119 Ok(Self {
10eea49d 120 _file: file,
b46c3fad 121 chunk_size: chunk_size as usize,
4818c8b6 122 size,
e1225de4 123 index_length,
4818c8b6
DM
124 index: data,
125 ctime,
126 uuid: header.uuid,
9335d74e 127 index_csum: header.index_csum,
4818c8b6
DM
128 })
129 }
130
131 fn unmap(&mut self) -> Result<(), Error> {
f569acc5
WB
132 if self.index == std::ptr::null_mut() {
133 return Ok(());
134 }
4818c8b6 135
f569acc5 136 let index_size = self.index_length * 32;
4818c8b6 137
f569acc5
WB
138 if let Err(err) =
139 unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) }
140 {
a7c72ad9 141 bail!("unmap file failed - {}", err);
4818c8b6
DM
142 }
143
144 self.index = std::ptr::null_mut();
145
146 Ok(())
147 }
148
afb4cd28
DM
149 #[inline]
150 fn chunk_end(&self, pos: usize) -> u64 {
151 if pos >= self.index_length {
152 panic!("chunk index out of range");
153 }
154
f569acc5 155 let end = ((pos + 1) * self.chunk_size) as u64;
afb4cd28
DM
156 if end > self.size {
157 self.size
158 } else {
159 end
160 }
161 }
162
4818c8b6 163 pub fn print_info(&self) {
4818c8b6
DM
164 println!("Size: {}", self.size);
165 println!("ChunkSize: {}", self.chunk_size);
f569acc5
WB
166 println!(
167 "CTime: {}",
168 Local.timestamp(self.ctime as i64, 0).format("%c")
169 );
4818c8b6
DM
170 println!("UUID: {:?}", self.uuid);
171 }
172}
173
7bc1d727
WB
174impl IndexFile for FixedIndexReader {
175 fn index_count(&self) -> usize {
e1225de4 176 self.index_length
7bc1d727
WB
177 }
178
179 fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> {
e1225de4 180 if pos >= self.index_length {
7bc1d727
WB
181 None
182 } else {
f569acc5 183 Some(unsafe { std::mem::transmute(self.index.add(pos * 32)) })
7bc1d727
WB
184 }
185 }
a660978c
DM
186
187 fn index_bytes(&self) -> u64 {
b46c3fad 188 self.size
a660978c 189 }
fdaab0df
DM
190
191 fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo> {
192 if pos >= self.index_length {
193 return None;
194 }
195
196 let start = (pos * self.chunk_size) as u64;
197 let mut end = start + self.chunk_size as u64;
198
199 if end > self.size {
200 end = self.size;
201 }
202
203 let digest = self.index_digest(pos).unwrap();
204 Some(ChunkReadInfo {
205 range: start..end,
206 digest: *digest,
207 })
208 }
2e079b8b
DM
209
210 fn compute_csum(&self) -> ([u8; 32], u64) {
211 let mut csum = openssl::sha::Sha256::new();
212 let mut chunk_end = 0;
213 for pos in 0..self.index_count() {
214 let info = self.chunk_info(pos).unwrap();
215 chunk_end = info.range.end;
216 csum.update(&info.digest);
217 }
218 let csum = csum.finish();
219
220 (csum, chunk_end)
221 }
d0463b67
SR
222
223 fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
224 if offset >= self.size {
225 return None;
226 }
227
228 Some((
229 (offset / self.chunk_size as u64) as usize,
230 offset % self.chunk_size as u64
231 ))
232 }
7bc1d727
WB
233}
234
91a905b6 235pub struct FixedIndexWriter {
150f1bd8 236 store: Arc<ChunkStore>,
9335d74e 237 file: File,
43b13033 238 _lock: tools::ProcessLockSharedGuard,
4fbb72a8
DM
239 filename: PathBuf,
240 tmp_filename: PathBuf,
606ce64b
DM
241 chunk_size: usize,
242 size: usize,
e1225de4 243 index_length: usize,
606ce64b 244 index: *mut u8,
9f49fe1d
DM
245 pub uuid: [u8; 16],
246 pub ctime: u64,
606ce64b
DM
247}
248
c3bb97e5
WB
249// `index` is mmap()ed which cannot be thread-local so should be sendable
250unsafe impl Send for FixedIndexWriter {}
251
91a905b6 252impl Drop for FixedIndexWriter {
4fbb72a8
DM
253 fn drop(&mut self) {
254 let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
255 if let Err(err) = self.unmap() {
0cd9d420 256 eprintln!("Unable to unmap file {:?} - {}", self.tmp_filename, err);
4fbb72a8
DM
257 }
258 }
259}
260
91a905b6 261impl FixedIndexWriter {
9fe2f639 262 #[allow(clippy::cast_ptr_alignment)]
f569acc5
WB
263 pub fn create(
264 store: Arc<ChunkStore>,
265 path: &Path,
266 size: usize,
267 chunk_size: usize,
268 ) -> Result<Self, Error> {
43b13033
DM
269 let shared_lock = store.try_shared_lock()?;
270
606ce64b 271 let full_path = store.relative_path(path);
4fbb72a8 272 let mut tmp_path = full_path.clone();
91a905b6 273 tmp_path.set_extension("tmp_fidx");
606ce64b
DM
274
275 let mut file = std::fs::OpenOptions::new()
f569acc5
WB
276 .create(true)
277 .truncate(true)
606ce64b
DM
278 .read(true)
279 .write(true)
4fbb72a8 280 .open(&tmp_path)?;
606ce64b 281
91a905b6 282 let header_size = std::mem::size_of::<FixedIndexHeader>();
d13e3745
DM
283
284 // todo: use static assertion when available in rust
f569acc5
WB
285 if header_size != 4096 {
286 panic!("got unexpected header size");
287 }
d13e3745 288
e693818a 289 let ctime = epoch_now_u64()?;
d13e3745 290
f569acc5 291 let uuid = Uuid::generate();
d13e3745 292
0cd9d420 293 let buffer = vec![0u8; header_size];
f569acc5 294 let header = unsafe { &mut *(buffer.as_ptr() as *mut FixedIndexHeader) };
d13e3745 295
a7dd4830 296 header.magic = super::FIXED_SIZED_CHUNK_INDEX_1_0;
48d0d356
DM
297 header.ctime = u64::to_le(ctime);
298 header.size = u64::to_le(size as u64);
299 header.chunk_size = u64::to_le(chunk_size as u64);
d13e3745
DM
300 header.uuid = *uuid.as_bytes();
301
9335d74e
DM
302 header.index_csum = [0u8; 32];
303
5e5b7f1c 304 file.write_all(&buffer)?;
d13e3745 305
f569acc5
WB
306 let index_length = (size + chunk_size - 1) / chunk_size;
307 let index_size = index_length * 32;
d13e3745
DM
308 nix::unistd::ftruncate(file.as_raw_fd(), (header_size + index_size) as i64)?;
309
f569acc5
WB
310 let data = unsafe {
311 nix::sys::mman::mmap(
312 std::ptr::null_mut(),
313 index_size,
314 nix::sys::mman::ProtFlags::PROT_READ | nix::sys::mman::ProtFlags::PROT_WRITE,
315 nix::sys::mman::MapFlags::MAP_SHARED,
316 file.as_raw_fd(),
317 header_size as i64,
318 )
319 }? as *mut u8;
d13e3745 320
606ce64b
DM
321 Ok(Self {
322 store,
9335d74e 323 file,
43b13033 324 _lock: shared_lock,
4fbb72a8
DM
325 filename: full_path,
326 tmp_filename: tmp_path,
606ce64b
DM
327 chunk_size,
328 size,
e1225de4 329 index_length,
606ce64b 330 index: data,
d13e3745
DM
331 ctime,
332 uuid: *uuid.as_bytes(),
606ce64b
DM
333 })
334 }
335
006f3ff4
DM
336 pub fn index_length(&self) -> usize {
337 self.index_length
338 }
339
4fbb72a8 340 fn unmap(&mut self) -> Result<(), Error> {
f569acc5
WB
341 if self.index == std::ptr::null_mut() {
342 return Ok(());
343 }
4fbb72a8 344
f569acc5 345 let index_size = self.index_length * 32;
4fbb72a8 346
f569acc5
WB
347 if let Err(err) =
348 unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) }
349 {
0cd9d420 350 bail!("unmap file {:?} failed - {}", self.tmp_filename, err);
4fbb72a8
DM
351 }
352
353 self.index = std::ptr::null_mut();
354
355 Ok(())
356 }
357
f569acc5
WB
358 pub fn close(&mut self) -> Result<[u8; 32], Error> {
359 if self.index == std::ptr::null_mut() {
360 bail!("cannot close already closed index file.");
361 }
4fbb72a8 362
f569acc5 363 let index_size = self.index_length * 32;
9335d74e 364 let data = unsafe { std::slice::from_raw_parts(self.index, index_size) };
f569acc5 365 let index_csum = openssl::sha::sha256(data);
9335d74e 366
4fbb72a8
DM
367 self.unmap()?;
368
9ea4bce4 369 let csum_offset = proxmox::offsetof!(FixedIndexHeader, index_csum);
afb4cd28 370 self.file.seek(SeekFrom::Start(csum_offset as u64))?;
9335d74e
DM
371 self.file.write_all(&index_csum)?;
372 self.file.flush()?;
373
4fbb72a8
DM
374 if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
375 bail!("Atomic rename file {:?} failed - {}", self.filename, err);
376 }
377
9335d74e 378 Ok(index_csum)
4fbb72a8
DM
379 }
380
5e04ec70 381 pub fn check_chunk_alignment(&self, offset: usize, chunk_len: usize) -> Result<usize, Error> {
5e04ec70
DM
382 if offset < chunk_len {
383 bail!("got chunk with small offset ({} < {}", offset, chunk_len);
f98ac774
DM
384 }
385
5e04ec70 386 let pos = offset - chunk_len;
606ce64b 387
5e04ec70
DM
388 if offset > self.size {
389 bail!("chunk data exceeds size ({} >= {})", offset, self.size);
606ce64b
DM
390 }
391
392 // last chunk can be smaller
f569acc5
WB
393 if ((offset != self.size) && (chunk_len != self.chunk_size))
394 || (chunk_len > self.chunk_size)
395 || (chunk_len == 0)
396 {
397 bail!(
398 "chunk with unexpected length ({} != {}",
399 chunk_len,
400 self.chunk_size
401 );
402 }
606ce64b 403
f569acc5 404 if pos & (self.chunk_size - 1) != 0 {
5e04ec70 405 bail!("got unaligned chunk (pos = {})", pos);
f98ac774
DM
406 }
407
5e04ec70
DM
408 Ok(pos / self.chunk_size)
409 }
410
411 // Note: We want to add data out of order, so do not assume any order here.
412 pub fn add_chunk(&mut self, chunk_info: &ChunkInfo, stat: &mut ChunkStat) -> Result<(), Error> {
5e04ec70
DM
413 let chunk_len = chunk_info.chunk_len as usize;
414 let offset = chunk_info.offset as usize; // end of chunk
415
416 let idx = self.check_chunk_alignment(offset, chunk_len)?;
417
f569acc5
WB
418 let (is_duplicate, compressed_size) = self
419 .store
420 .insert_chunk(&chunk_info.chunk, &chunk_info.digest)?;
798f7fa0 421
cb0708dd
DM
422 stat.chunk_count += 1;
423 stat.compressed_size += compressed_size;
606ce64b 424
4ee8f53d 425 let digest = &chunk_info.digest;
f98ac774 426
f569acc5
WB
427 println!(
428 "ADD CHUNK {} {} {}% {} {}",
429 idx,
430 chunk_len,
431 (compressed_size * 100) / (chunk_len as u64),
432 is_duplicate,
433 proxmox::tools::digest_to_hex(digest)
434 );
798f7fa0
DM
435
436 if is_duplicate {
cb0708dd 437 stat.duplicate_chunks += 1;
798f7fa0 438 } else {
cb0708dd 439 stat.disk_size += compressed_size;
798f7fa0 440 }
606ce64b 441
5e04ec70 442 self.add_digest(idx, digest)
e3062f87
WB
443 }
444
445 pub fn add_digest(&mut self, index: usize, digest: &[u8; 32]) -> Result<(), Error> {
fc14b849 446 if index >= self.index_length {
f569acc5
WB
447 bail!(
448 "add digest failed - index out of range ({} >= {})",
449 index,
450 self.index_length
451 );
fc14b849
DM
452 }
453
f569acc5
WB
454 if self.index == std::ptr::null_mut() {
455 bail!("cannot write to closed index file.");
456 }
01af11f3 457
f569acc5 458 let index_pos = index * 32;
606ce64b
DM
459 unsafe {
460 let dst = self.index.add(index_pos);
461 dst.copy_from_nonoverlapping(digest.as_ptr(), 32);
462 }
463
464 Ok(())
465 }
facd9801
SR
466
467 pub fn clone_data_from(&mut self, reader: &FixedIndexReader) -> Result<(), Error> {
468 if self.index_length != reader.index_count() {
469 bail!("clone_data_from failed - index sizes not equal");
470 }
471
472 for i in 0..self.index_length {
473 self.add_digest(i, reader.index_digest(i).unwrap())?;
474 }
475
476 Ok(())
477 }
606ce64b 478}
afb4cd28
DM
479
480pub struct BufferedFixedReader<S> {
481 store: S,
482 index: FixedIndexReader,
483 archive_size: u64,
484 read_buffer: Vec<u8>,
485 buffered_chunk_idx: usize,
486 buffered_chunk_start: u64,
487 read_offset: u64,
488}
489
f569acc5 490impl<S: ReadChunk> BufferedFixedReader<S> {
afb4cd28 491 pub fn new(index: FixedIndexReader, store: S) -> Self {
afb4cd28
DM
492 let archive_size = index.size;
493 Self {
494 store,
653b1ca1
WB
495 index,
496 archive_size,
f569acc5 497 read_buffer: Vec::with_capacity(1024 * 1024),
afb4cd28
DM
498 buffered_chunk_idx: 0,
499 buffered_chunk_start: 0,
500 read_offset: 0,
501 }
502 }
503
f569acc5
WB
504 pub fn archive_size(&self) -> u64 {
505 self.archive_size
506 }
afb4cd28
DM
507
508 fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
afb4cd28 509 let index = &self.index;
fdaab0df
DM
510 let info = match index.chunk_info(idx) {
511 Some(info) => info,
512 None => bail!("chunk index out of range"),
513 };
afb4cd28
DM
514
515 // fixme: avoid copy
516
fdaab0df
DM
517 let data = self.store.read_chunk(&info.digest)?;
518 let size = info.range.end - info.range.start;
519 if size != data.len() as u64 {
520 bail!("read chunk with wrong size ({} != {}", size, data.len());
afb4cd28
DM
521 }
522
523 self.read_buffer.clear();
524 self.read_buffer.extend_from_slice(&data);
525
526 self.buffered_chunk_idx = idx;
527
fdaab0df 528 self.buffered_chunk_start = info.range.start as u64;
afb4cd28
DM
529 Ok(())
530 }
531}
532
f569acc5 533impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
afb4cd28 534 fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
f569acc5
WB
535 if offset == self.archive_size {
536 return Ok(&self.read_buffer[0..0]);
537 }
afb4cd28
DM
538
539 let buffer_len = self.read_buffer.len();
540 let index = &self.index;
541
542 // optimization for sequential read
f569acc5
WB
543 if buffer_len > 0
544 && ((self.buffered_chunk_idx + 1) < index.index_length)
545 && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
afb4cd28
DM
546 {
547 let next_idx = self.buffered_chunk_idx + 1;
548 let next_end = index.chunk_end(next_idx);
549 if offset < next_end {
550 self.buffer_chunk(next_idx)?;
551 let buffer_offset = (offset - self.buffered_chunk_start) as usize;
552 return Ok(&self.read_buffer[buffer_offset..]);
553 }
554 }
555
f569acc5
WB
556 if (buffer_len == 0)
557 || (offset < self.buffered_chunk_start)
558 || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
afb4cd28
DM
559 {
560 let idx = (offset / index.chunk_size as u64) as usize;
561 self.buffer_chunk(idx)?;
f569acc5 562 }
afb4cd28
DM
563
564 let buffer_offset = (offset - self.buffered_chunk_start) as usize;
565 Ok(&self.read_buffer[buffer_offset..])
566 }
567}
568
f569acc5 569impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
afb4cd28 570 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
afb4cd28 571 use crate::tools::BufferedRead;
f569acc5 572 use std::io::{Error, ErrorKind};
afb4cd28
DM
573
574 let data = match self.buffered_read(self.read_offset) {
575 Ok(v) => v,
576 Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
577 };
578
f569acc5
WB
579 let n = if data.len() > buf.len() {
580 buf.len()
581 } else {
582 data.len()
583 };
afb4cd28 584
f569acc5
WB
585 unsafe {
586 std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
587 }
afb4cd28
DM
588
589 self.read_offset += n as u64;
590
62ee2eb4 591 Ok(n)
afb4cd28
DM
592 }
593}
594
f569acc5 595impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
afb4cd28 596 fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
afb4cd28 597 let new_offset = match pos {
f569acc5
WB
598 SeekFrom::Start(start_offset) => start_offset as i64,
599 SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
afb4cd28
DM
600 SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
601 };
602
603 use std::io::{Error, ErrorKind};
604 if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
605 return Err(Error::new(
606 ErrorKind::Other,
f569acc5
WB
607 format!(
608 "seek is out of range {} ([0..{}])",
609 new_offset, self.archive_size
610 ),
611 ));
afb4cd28
DM
612 }
613 self.read_offset = new_offset as u64;
614
615 Ok(self.read_offset)
616 }
617}