1 use anyhow
::{bail, format_err, Error}
;
2 use std
::io
::{Seek, SeekFrom}
;
4 use super::chunk_stat
::*;
5 use super::chunk_store
::*;
6 use super::{IndexFile, ChunkReadInfo}
;
7 use crate::tools
::{self, epoch_now_u64}
;
9 use chrono
::{Local, TimeZone}
;
12 use std
::os
::unix
::io
::AsRawFd
;
13 use std
::path
::{Path, PathBuf}
;
16 use super::read_chunk
::*;
19 use proxmox
::tools
::io
::ReadExt
;
20 use proxmox
::tools
::Uuid
;
// NOTE(review): extraction artifact — the leading integers on these lines are
// the original file's line numbers fused into the text, and several original
// lines are missing (other header fields such as `magic`, `uuid`, `ctime`,
// `size`, `chunk_size`, which the reader below accesses via `header.magic`,
// `header.size`, etc.). This fragment is not compilable as-is.
// On-disk header of a `.fidx` fixed index file; `reserved` pads the struct to
// exactly one 4096-byte page, which `static_assert_size!` enforces at compile
// time.
22 /// Header format definition for fixed index files (`.fidx`)
24 pub struct FixedIndexHeader
{
28 /// Sha256 over the index ``SHA256(digest1||digest2||...)``
29 pub index_csum
: [u8; 32],
// Pads the header to one page; the `4016` value presumably equals
// 4096 minus the sum of the (partly missing) preceding fields — TODO confirm.
32 reserved
: [u8; 4016], // overall size is one page (4096 bytes)
34 proxmox
::static_assert_size
!(FixedIndexHeader
, 4096);
// NOTE(review): extraction fragment (original line numbers fused in, interior
// lines missing). Fields other than `chunk_size` and `index_csum` — e.g. the
// `index` pointer, `index_length`, `size`, `ctime`, `uuid` referenced by the
// impls later in the file — are absent from this view.
36 // split image into fixed size chunks
38 pub struct FixedIndexReader
{
40 pub chunk_size
: usize,
46 pub index_csum
: [u8; 32],
// SAFETY (author's rationale, per the comment below): the mmap()ed index is
// read-only shared memory, so the raw pointer does not prevent Send/Sync.
// TODO(review): confirm none of the missing fields add interior mutability.
49 // `index` is mmap()ed which cannot be thread-local so should be sendable
50 unsafe impl Send
for FixedIndexReader {}
51 unsafe impl Sync
for FixedIndexReader {}
// NOTE(review): extraction fragment — the `fn drop(&mut self)` signature line
// (original line 54) is missing. Visible logic: best-effort unmap of the
// mmap()ed index; the error is only logged because Drop cannot return a
// Result.
53 impl Drop
for FixedIndexReader
{
55 if let Err(err
) = self.unmap() {
56 eprintln
!("Unable to unmap file - {}", err
);
// NOTE(review): extraction fragment of `impl FixedIndexReader` — original
// line numbers are fused into the text and many interior lines (e.g. the
// `File::open` call in `open`, closing braces, the struct-literal return of
// `new`) are missing. Comments below describe only what is visible.
61 impl FixedIndexReader
{
// Opens the index at `path` and delegates to `Self::new`, wrapping any error
// with the file path for context.
62 pub fn open(path
: &Path
) -> Result
<Self, Error
> {
65 .and_then(|file
| Self::new(file
))
66 .map_err(|err
| format_err
!("Unable to open fixed index {:?} - {}", path
, err
))
// Parses the 4096-byte header, validates magic and file size, then mmaps the
// digest table read-only. Takes a non-blocking shared flock so concurrent
// writers are detected immediately instead of blocking.
69 pub fn new(mut file
: std
::fs
::File
) -> Result
<Self, Error
> {
71 nix
::fcntl
::flock(file
.as_raw_fd(), nix
::fcntl
::FlockArg
::LockSharedNonblock
)
73 bail
!("unable to get shared lock - {}", err
);
76 file
.seek(SeekFrom
::Start(0))?
;
78 let header_size
= std
::mem
::size_of
::<FixedIndexHeader
>();
// SAFETY relies on `FixedIndexHeader` being plain-old-data matching the
// on-disk layout; fields are stored little-endian and converted below.
79 let header
: Box
<FixedIndexHeader
> = unsafe { file.read_host_value_boxed()? }
;
81 if header
.magic
!= super::FIXED_SIZED_CHUNK_INDEX_1_0
{
82 bail
!("got unknown magic number");
85 let size
= u64::from_le(header
.size
);
86 let ctime
= u64::from_le(header
.ctime
);
87 let chunk_size
= u64::from_le(header
.chunk_size
);
// Ceiling division: number of chunks covering `size` bytes; 32 bytes
// (one SHA-256 digest) per chunk.
89 let index_length
= ((size
+ chunk_size
- 1) / chunk_size
) as usize;
90 let index_size
= index_length
* 32;
92 let rawfd
= file
.as_raw_fd();
94 let stat
= match nix
::sys
::stat
::fstat(rawfd
) {
96 Err(err
) => bail
!("fstat failed - {}", err
),
// NOTE(review): this subtraction underflows (debug panic / release wrap) if
// the file is shorter than the 4096-byte header — TODO confirm an earlier,
// not-visible check rules that out.
99 let expected_index_size
= (stat
.st_size
as usize) - header_size
;
100 if index_size
!= expected_index_size
{
102 "got unexpected file size ({} != {})",
// Read-only private mapping of the digest table that follows the header.
109 nix
::sys
::mman
::mmap(
110 std
::ptr
::null_mut(),
112 nix
::sys
::mman
::ProtFlags
::PROT_READ
,
113 nix
::sys
::mman
::MapFlags
::MAP_PRIVATE
,
121 chunk_size
: chunk_size
as usize,
127 index_csum
: header
.index_csum
,
// Unmaps the digest table; idempotent via the null-pointer check, and resets
// `self.index` to null afterwards so Drop will not double-unmap.
131 fn unmap(&mut self) -> Result
<(), Error
> {
132 if self.index
== std
::ptr
::null_mut() {
136 let index_size
= self.index_length
* 32;
139 unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) }
141 bail
!("unmap file failed - {}", err
);
144 self.index
= std
::ptr
::null_mut();
// Returns the exclusive end offset of chunk `pos`; panics (rather than
// erroring) on an out-of-range index — callers must pre-validate.
150 fn chunk_end(&self, pos
: usize) -> u64 {
151 if pos
>= self.index_length
{
152 panic
!("chunk index out of range");
155 let end
= ((pos
+ 1) * self.chunk_size
) as u64;
// Human-readable dump of the index metadata for debugging/CLI use.
163 pub fn print_info(&self) {
164 println
!("Size: {}", self.size
);
165 println
!("ChunkSize: {}", self.chunk_size
);
// Formats `ctime` as local time; `Local.timestamp` interprets it as a Unix
// epoch — presumably stored as UTC seconds, TODO confirm.
168 Local
.timestamp(self.ctime
as i64, 0).format("%c")
170 println
!("UUID: {:?}", self.uuid
);
// NOTE(review): extraction fragment of the `IndexFile` trait impl — original
// line numbers fused in, interior lines (method bodies' returns, closing
// braces) missing.
174 impl IndexFile
for FixedIndexReader
{
175 fn index_count(&self) -> usize {
// Returns the 32-byte digest of chunk `pos`, borrowed straight out of the
// mmap. SAFETY: the transmute turns `*const u8` into `&[u8; 32]`; soundness
// depends on the mapping outliving the borrow (tied to &self) and on
// `pos * 32` staying inside the mapped `index_length * 32` bytes, which the
// visible bounds check enforces.
179 fn index_digest(&self, pos
: usize) -> Option
<&[u8; 32]> {
180 if pos
>= self.index_length
{
183 Some(unsafe { std::mem::transmute(self.index.add(pos * 32)) }
)
187 fn index_bytes(&self) -> u64 {
// Byte range + digest for chunk `pos`; `end` is mutable because the last
// chunk is presumably clamped to `size` on a line missing from this view —
// TODO confirm.
191 fn chunk_info(&self, pos
: usize) -> Option
<ChunkReadInfo
> {
192 if pos
>= self.index_length
{
196 let start
= (pos
* self.chunk_size
) as u64;
197 let mut end
= start
+ self.chunk_size
as u64;
203 let digest
= self.index_digest(pos
).unwrap();
// Computes SHA256(digest1||digest2||...) over all chunk digests plus the
// covered byte count; matches the `index_csum` documented on the header.
210 fn compute_csum(&self) -> ([u8; 32], u64) {
211 let mut csum
= openssl
::sha
::Sha256
::new();
212 let mut chunk_end
= 0;
213 for pos
in 0..self.index_count() {
// unwrap is safe here: `pos` < index_count, so chunk_info cannot be None.
214 let info
= self.chunk_info(pos
).unwrap();
215 chunk_end
= info
.range
.end
;
216 csum
.update(&info
.digest
);
218 let csum
= csum
.finish();
// NOTE(review): extraction fragment — fields such as `file`, `filename`,
// `index`, `index_length`, `size`, `chunk_size`, `uuid` (used by the impl
// below) are missing from this view.
224 pub struct FixedIndexWriter
{
225 store
: Arc
<ChunkStore
>,
// Held for the writer's lifetime; presumably keeps the chunk store's process
// lock shared while chunks are being inserted — TODO confirm.
227 _lock
: tools
::ProcessLockSharedGuard
,
// The index is written to `<name>.tmp_fidx` and atomically renamed on close.
229 tmp_filename
: PathBuf
,
238 // `index` is mmap()ed which cannot be thread-local so should be sendable
239 unsafe impl Send
for FixedIndexWriter {}
// Drop removes the temp file (best effort) and unmaps; errors are only
// logged since Drop cannot propagate them.
241 impl Drop
for FixedIndexWriter
{
243 let _
= std
::fs
::remove_file(&self.tmp_filename
); // ignore errors
244 if let Err(err
) = self.unmap() {
245 eprintln
!("Unable to unmap file {:?} - {}", self.tmp_filename
, err
);
// NOTE(review): extraction fragment of `impl FixedIndexWriter` — original
// line numbers fused in; the constructor's name/parameter list, struct-literal
// returns, and many closing braces are missing. Comments describe only the
// visible logic; genuine visible defects are flagged inline.
250 impl FixedIndexWriter
{
251 #[allow(clippy::cast_ptr_alignment)]
253 store
: Arc
<ChunkStore
>,
257 ) -> Result
<Self, Error
> {
258 let shared_lock
= store
.try_shared_lock()?
;
260 let full_path
= store
.relative_path(path
);
261 let mut tmp_path
= full_path
.clone();
// Write to a `.tmp_fidx` sibling; `close()` renames it into place atomically.
262 tmp_path
.set_extension("tmp_fidx");
264 let mut file
= std
::fs
::OpenOptions
::new()
271 let header_size
= std
::mem
::size_of
::<FixedIndexHeader
>();
273 // todo: use static assertion when available in rust
274 if header_size
!= 4096 {
275 panic
!("got unexpected header size");
278 let ctime
= epoch_now_u64()?
;
280 let uuid
= Uuid
::generate();
// Build the header in a zeroed page-sized buffer and reinterpret it as a
// FixedIndexHeader (hence the clippy allow above); fields are stored
// little-endian.
282 let buffer
= vec
![0u8; header_size
];
283 let header
= unsafe { &mut *(buffer.as_ptr() as *mut FixedIndexHeader) }
;
285 header
.magic
= super::FIXED_SIZED_CHUNK_INDEX_1_0
;
286 header
.ctime
= u64::to_le(ctime
);
287 header
.size
= u64::to_le(size
as u64);
288 header
.chunk_size
= u64::to_le(chunk_size
as u64);
289 header
.uuid
= *uuid
.as_bytes();
// Checksum is zeroed now and patched in by `close()` once all digests exist.
291 header
.index_csum
= [0u8; 32];
293 file
.write_all(&buffer
)?
;
// Pre-size the file to header + digest table, then map it read/write shared
// so `add_digest` can write digests in place.
295 let index_length
= (size
+ chunk_size
- 1) / chunk_size
;
296 let index_size
= index_length
* 32;
297 nix
::unistd
::ftruncate(file
.as_raw_fd(), (header_size
+ index_size
) as i64)?
;
300 nix
::sys
::mman
::mmap(
301 std
::ptr
::null_mut(),
303 nix
::sys
::mman
::ProtFlags
::PROT_READ
| nix
::sys
::mman
::ProtFlags
::PROT_WRITE
,
304 nix
::sys
::mman
::MapFlags
::MAP_SHARED
,
315 tmp_filename
: tmp_path
,
321 uuid
: *uuid
.as_bytes(),
325 pub fn index_length(&self) -> usize {
// Same idempotent unmap pattern as the reader: null check, munmap, reset to
// null so Drop cannot double-unmap.
329 fn unmap(&mut self) -> Result
<(), Error
> {
330 if self.index
== std
::ptr
::null_mut() {
334 let index_size
= self.index_length
* 32;
337 unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) }
339 bail
!("unmap file {:?} failed - {}", self.tmp_filename
, err
);
342 self.index
= std
::ptr
::null_mut();
// Finalizes the index: hashes the digest table, patches `index_csum` into
// the header in place, then atomically renames the temp file to its final
// name. Returns the checksum.
347 pub fn close(&mut self) -> Result
<[u8; 32], Error
> {
348 if self.index
== std
::ptr
::null_mut() {
349 bail
!("cannot close already closed index file.");
352 let index_size
= self.index_length
* 32;
// SAFETY: `index` is non-null (checked above) and maps exactly `index_size`
// bytes per the mmap/ftruncate in the constructor.
353 let data
= unsafe { std::slice::from_raw_parts(self.index, index_size) }
;
354 let index_csum
= openssl
::sha
::sha256(data
);
358 let csum_offset
= proxmox
::offsetof!(FixedIndexHeader
, index_csum
);
359 self.file
.seek(SeekFrom
::Start(csum_offset
as u64))?
;
360 self.file
.write_all(&index_csum
)?
;
363 if let Err(err
) = std
::fs
::rename(&self.tmp_filename
, &self.filename
) {
364 bail
!("Atomic rename file {:?} failed - {}", self.filename
, err
);
// Validates that a chunk ending at byte `offset` with length `chunk_len`
// lands on a chunk boundary; returns the chunk index.
370 pub fn check_chunk_alignment(&self, offset
: usize, chunk_len
: usize) -> Result
<usize, Error
> {
371 if offset
< chunk_len
{
// BUG(review): format string is missing its closing paren — "({} < {}"
// should read "({} < {})". Left untouched here (doc-only edit).
372 bail
!("got chunk with small offset ({} < {}", offset
, chunk_len
);
375 let pos
= offset
- chunk_len
;
377 if offset
> self.size
{
// NOTE(review): message says ">=" but the check above is ">" — one of the
// two is presumably wrong; TODO reconcile.
378 bail
!("chunk data exceeds size ({} >= {})", offset
, self.size
);
381 // last chunk can be smaller
382 if ((offset
!= self.size
) && (chunk_len
!= self.chunk_size
))
383 || (chunk_len
> self.chunk_size
)
// BUG(review): this format string is also missing its closing paren.
387 "chunk with unexpected length ({} != {}",
// NOTE(review): the bitmask test assumes `chunk_size` is a power of two —
// TODO confirm that invariant is enforced at construction (not visible here).
393 if pos
& (self.chunk_size
- 1) != 0 {
394 bail
!("got unaligned chunk (pos = {})", pos
);
397 Ok(pos
/ self.chunk_size
)
400 // Note: We want to add data out of order, so do not assume any order here.
// Inserts the chunk into the store (deduplicating by digest), updates the
// statistics, and records the digest at the chunk's slot in the index.
401 pub fn add_chunk(&mut self, chunk_info
: &ChunkInfo
, stat
: &mut ChunkStat
) -> Result
<(), Error
> {
402 let chunk_len
= chunk_info
.chunk_len
as usize;
403 let offset
= chunk_info
.offset
as usize; // end of chunk
405 let idx
= self.check_chunk_alignment(offset
, chunk_len
)?
;
407 let (is_duplicate
, compressed_size
) = self
409 .insert_chunk(&chunk_info
.chunk
, &chunk_info
.digest
)?
;
411 stat
.chunk_count
+= 1;
412 stat
.compressed_size
+= compressed_size
;
414 let digest
= &chunk_info
.digest
;
417 "ADD CHUNK {} {} {}% {} {}",
// NOTE(review): division by `chunk_len` — zero-length chunks would panic;
// presumably excluded by check_chunk_alignment's `offset < chunk_len` guard
// only when offset > 0, TODO confirm.
420 (compressed_size
* 100) / (chunk_len
as u64),
422 proxmox
::tools
::digest_to_hex(digest
)
426 stat
.duplicate_chunks
+= 1;
428 stat
.disk_size
+= compressed_size
;
431 self.add_digest(idx
, digest
)
// Writes `digest` into slot `index` of the mmap()ed table; bounds- and
// closed-state-checked.
434 pub fn add_digest(&mut self, index
: usize, digest
: &[u8; 32]) -> Result
<(), Error
> {
435 if index
>= self.index_length
{
437 "add digest failed - index out of range ({} >= {})",
443 if self.index
== std
::ptr
::null_mut() {
444 bail
!("cannot write to closed index file.");
447 let index_pos
= index
* 32;
449 let dst
= self.index
.add(index_pos
);
450 dst
.copy_from_nonoverlapping(digest
.as_ptr(), 32);
// Copies every digest from an existing reader; both indexes must have the
// same length.
456 pub fn clone_data_from(&mut self, reader
: &FixedIndexReader
) -> Result
<(), Error
> {
457 if self.index_length
!= reader
.index_count() {
458 bail
!("clone_data_from failed - index sizes not equal");
461 for i
in 0..self.index_length
{
462 self.add_digest(i
, reader
.index_digest(i
).unwrap())?
;
// NOTE(review): extraction fragment — the `store: S`, `archive_size`, and
// `read_offset` fields referenced by the impls below are missing from this
// view. Caches one decoded chunk at a time to serve sequential reads.
469 pub struct BufferedFixedReader
<S
> {
471 index
: FixedIndexReader
,
// Decoded bytes of the currently buffered chunk.
473 read_buffer
: Vec
<u8>,
474 buffered_chunk_idx
: usize,
// Archive offset of the first byte in `read_buffer`.
475 buffered_chunk_start
: u64,
// NOTE(review): extraction fragment (original line numbers fused in, interior
// lines missing, e.g. the struct-literal return of `new`).
479 impl<S
: ReadChunk
> BufferedFixedReader
<S
> {
// Wraps an index + chunk store; starts with an empty 1 MiB buffer and no
// chunk cached.
480 pub fn new(index
: FixedIndexReader
, store
: S
) -> Self {
481 let archive_size
= index
.size
;
486 read_buffer
: Vec
::with_capacity(1024 * 1024),
487 buffered_chunk_idx
: 0,
488 buffered_chunk_start
: 0,
493 pub fn archive_size(&self) -> u64 {
// Loads chunk `idx` from the store into `read_buffer`, verifying its size
// against the index's byte range, and records which chunk is cached.
497 fn buffer_chunk(&mut self, idx
: usize) -> Result
<(), Error
> {
498 let index
= &self.index
;
499 let info
= match index
.chunk_info(idx
) {
501 None
=> bail
!("chunk index out of range"),
506 let data
= self.store
.read_chunk(&info
.digest
)?
;
507 let size
= info
.range
.end
- info
.range
.start
;
508 if size
!= data
.len() as u64 {
// BUG(review): format string is missing its closing paren — "({} != {}"
// should read "({} != {})". Left untouched here (doc-only edit).
509 bail
!("read chunk with wrong size ({} != {}", size
, data
.len());
512 self.read_buffer
.clear();
513 self.read_buffer
.extend_from_slice(&data
);
515 self.buffered_chunk_idx
= idx
;
517 self.buffered_chunk_start
= info
.range
.start
as u64;
// NOTE(review): extraction fragment — parts of the sequential-read condition
// (original line 532) and some closing braces are missing from this view.
522 impl<S
: ReadChunk
> crate::tools
::BufferedRead
for BufferedFixedReader
<S
> {
// Returns a slice of the archive starting at `offset`, ending at the end of
// the chunk containing it; empty slice exactly at end-of-archive.
523 fn buffered_read(&mut self, offset
: u64) -> Result
<&[u8], Error
> {
524 if offset
== self.archive_size
{
525 return Ok(&self.read_buffer
[0..0]);
528 let buffer_len
= self.read_buffer
.len();
529 let index
= &self.index
;
// Fast path: if the requested offset falls just past the cached chunk,
// buffer the next chunk directly instead of recomputing the index.
531 // optimization for sequential read
533 && ((self.buffered_chunk_idx
+ 1) < index
.index_length
)
534 && (offset
>= (self.buffered_chunk_start
+ (self.read_buffer
.len() as u64)))
536 let next_idx
= self.buffered_chunk_idx
+ 1;
537 let next_end
= index
.chunk_end(next_idx
);
538 if offset
< next_end
{
539 self.buffer_chunk(next_idx
)?
;
540 let buffer_offset
= (offset
- self.buffered_chunk_start
) as usize;
541 return Ok(&self.read_buffer
[buffer_offset
..]);
// Slow path: offset is outside the cached chunk, so locate its chunk by
// integer division and load it.
546 || (offset
< self.buffered_chunk_start
)
547 || (offset
>= (self.buffered_chunk_start
+ (self.read_buffer
.len() as u64)))
549 let idx
= (offset
/ index
.chunk_size
as u64) as usize;
550 self.buffer_chunk(idx
)?
;
553 let buffer_offset
= (offset
- self.buffered_chunk_start
) as usize;
554 Ok(&self.read_buffer
[buffer_offset
..])
// NOTE(review): extraction fragment — the Ok arm of the match, the else
// branch computing `n`, and the final `Ok(n)` are missing from this view.
558 impl<S
: ReadChunk
> std
::io
::Read
for BufferedFixedReader
<S
> {
// std::io::Read adapter over `buffered_read`: copies at most one buffered
// chunk's worth of bytes per call and advances `read_offset`.
559 fn read(&mut self, buf
: &mut [u8]) -> Result
<usize, std
::io
::Error
> {
560 use crate::tools
::BufferedRead
;
561 use std
::io
::{Error, ErrorKind}
;
563 let data
= match self.buffered_read(self.read_offset
) {
// anyhow::Error is flattened into io::Error via its string form.
565 Err(err
) => return Err(Error
::new(ErrorKind
::Other
, err
.to_string())),
// Copy min(data.len(), buf.len()) bytes — `n` is clamped to the caller's
// buffer here (the else branch is missing from this fragment).
568 let n
= if data
.len() > buf
.len() {
575 std
::ptr
::copy_nonoverlapping(data
.as_ptr(), buf
.as_mut_ptr(), n
);
578 self.read_offset
+= n
as u64;
// NOTE(review): extraction fragment — the ErrorKind passed to Error::new and
// the final `Ok(...)` return are missing from this view.
584 impl<S
: ReadChunk
> Seek
for BufferedFixedReader
<S
> {
// Seek support: resolves Start/End/Current to an absolute signed offset,
// rejects anything outside [0, archive_size], then stores it.
585 fn seek(&mut self, pos
: SeekFrom
) -> Result
<u64, std
::io
::Error
> {
586 let new_offset
= match pos
{
587 SeekFrom
::Start(start_offset
) => start_offset
as i64,
588 SeekFrom
::End(end_offset
) => (self.archive_size
as i64) + end_offset
,
589 SeekFrom
::Current(offset
) => (self.read_offset
as i64) + offset
,
592 use std
::io
::{Error, ErrorKind}
;
// NOTE(review): `archive_size as i64` would misbehave for archives larger
// than i64::MAX bytes — practically unreachable, but worth a comment.
593 if (new_offset
< 0) || (new_offset
> (self.archive_size
as i64)) {
594 return Err(Error
::new(
597 "seek is out of range {} ([0..{}])",
598 new_offset
, self.archive_size
602 self.read_offset
= new_offset
as u64;