2 use std
::convert
::TryInto
;
3 use std
::io
::{Seek, SeekFrom}
;
5 use super::chunk_stat
::*;
6 use super::chunk_store
::*;
10 use chrono
::{Local, TimeZone}
;
13 use std
::os
::unix
::io
::AsRawFd
;
14 use std
::path
::{Path, PathBuf}
;
17 use super::read_chunk
::*;
20 use proxmox
::tools
::io
::ReadExt
;
21 use proxmox
::tools
::Uuid
;
// NOTE(review): only the `index_csum` and `reserved` fields are visible in this view.
// Other code in this file also accesses `magic`, `uuid`, `ctime`, `size` and
// `chunk_size` on the header, so those declarations appear to be missing here.
23 /// Header format definition for fixed index files (`.fidx`)
25 pub struct FixedIndexHeader
{
29 /// Sha256 over the index ``SHA256(digest1||digest2||...)``
30 pub index_csum
: [u8; 32],
// Padding; per the trailing comment it brings the whole header to 4096 bytes.
33 reserved
: [u8; 4016], // overall size is one page (4096 bytes)
35 proxmox
::tools
::static_assert_size
!(FixedIndexHeader
, 4096);
// NOTE(review): only `chunk_size` and `index_csum` are visible in this view; the
// methods of this type also use `self.index`, `self.index_length`, `self.size`,
// `self.ctime` and `self.uuid`, so those field declarations appear to be missing here.
37 // split image into fixed size chunks
39 pub struct FixedIndexReader
{
// Size of one chunk in bytes (used as `pos * chunk_size` to compute byte offsets).
41 pub chunk_size
: usize,
// Copied from the header's `index_csum` when the reader is constructed.
47 pub index_csum
: [u8; 32],
50 // `index` is mmap()ed which cannot be thread-local so should be sendable
51 unsafe impl Send
for FixedIndexReader {}
52 unsafe impl Sync
for FixedIndexReader {}
54 impl Drop
for FixedIndexReader
{
56 if let Err(err
) = self.unmap() {
57 eprintln
!("Unable to unmap file - {}", err
);
62 impl FixedIndexReader
{
/// Opens the fixed index at `path` and builds a reader from it via `Self::new`.
///
/// # Errors
/// Any failure is wrapped as `Unable to open fixed index {path:?} - {err}`.
// NOTE(review): the expression that produces the `file` passed to `Self::new` is
// not visible in this view.
63 pub fn open(path
: &Path
) -> Result
<Self, Error
> {
66 .and_then(|file
| Self::new(file
))
67 .map_err(|err
| format_err
!("Unable to open fixed index {:?} - {}", path
, err
))
/// Builds a reader from an already opened index `file`.
///
/// Visible steps: request a non-blocking shared `flock` on the file, seek to
/// offset 0, read a boxed `FixedIndexHeader`, reject any magic other than
/// `FIXED_SIZED_CHUNK_INDEX_1_0`, decode `size`, `ctime` and `chunk_size` from
/// little-endian, compare the `fstat` file size with the size implied by the
/// header, and `mmap` the index with `PROT_READ` / `MAP_PRIVATE`.
///
/// # Errors
/// The visible error paths are: shared lock not obtained, seek/read failure,
/// unknown magic number, `fstat` failure, and (judging by the message literal)
/// an unexpected file size.
// NOTE(review): several statements of this function are not visible in this view;
// the comments below only describe what is shown.
70 pub fn new(mut file
: std
::fs
::File
) -> Result
<Self, Error
> {
72 nix
::fcntl
::flock(file
.as_raw_fd(), nix
::fcntl
::FlockArg
::LockSharedNonblock
)
74 bail
!("unable to get shared lock - {}", err
);
77 file
.seek(SeekFrom
::Start(0))?
;
79 let header_size
= std
::mem
::size_of
::<FixedIndexHeader
>();
80 let header
: Box
<FixedIndexHeader
> = unsafe { file.read_host_value_boxed()? }
;
82 if header
.magic
!= super::FIXED_SIZED_CHUNK_INDEX_1_0
{
83 bail
!("got unknown magic number");
// Header integers are stored little-endian on disk.
86 let size
= u64::from_le(header
.size
);
87 let ctime
= u64::from_le(header
.ctime
);
88 let chunk_size
= u64::from_le(header
.chunk_size
);
// Number of chunks = ceil(size / chunk_size); each index entry is a 32-byte digest.
// NOTE(review): `chunk_size` comes straight from the file and no zero check is
// visible here, so this division could panic on a corrupt header; confirm.
90 let index_length
= ((size
+ chunk_size
- 1) / chunk_size
) as usize;
91 let index_size
= index_length
* 32;
93 let rawfd
= file
.as_raw_fd();
95 let stat
= match nix
::sys
::stat
::fstat(rawfd
) {
97 Err(err
) => bail
!("fstat failed - {}", err
),
// NOTE(review): plain subtraction; it would underflow if `st_size < header_size`.
// Presumably the successful header read above rules that out; verify.
100 let expected_index_size
= (stat
.st_size
as usize) - header_size
;
101 if index_size
!= expected_index_size
{
103 "got unexpected file size ({} != {})",
110 nix
::sys
::mman
::mmap(
111 std
::ptr
::null_mut(),
113 nix
::sys
::mman
::ProtFlags
::PROT_READ
,
114 nix
::sys
::mman
::MapFlags
::MAP_PRIVATE
,
122 chunk_size
: chunk_size
as usize,
128 index_csum
: header
.index_csum
,
/// Unmaps the index mapping (`index_length * 32` bytes) and resets `self.index`
/// to the null pointer.
///
/// # Errors
/// A failing `munmap` is reported as `unmap file failed - {err}`.
// NOTE(review): the body of the null-pointer branch is not visible in this view;
// presumably it returns early so the function can be called twice. Confirm.
132 fn unmap(&mut self) -> Result
<(), Error
> {
133 if self.index
== std
::ptr
::null_mut() {
137 let index_size
= self.index_length
* 32;
140 unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) }
142 bail
!("unmap file failed - {}", err
);
145 self.index
= std
::ptr
::null_mut();
/// Returns `(start, end, digest)` for the chunk at index `pos`.
///
/// `start` is `pos * chunk_size`; `end` starts out as `start + chunk_size`.
/// The digest is copied out of the mapped index at byte offset `pos * 32`.
///
/// # Errors
/// Fails with `chunk index out of range` when `pos >= index_length`.
// NOTE(review): `end` is declared `mut`, but the statement that adjusts it is not
// visible in this view (presumably it is clamped to the image size; confirm).
150 pub fn chunk_info(&self, pos
: usize) -> Result
<(u64, u64, [u8; 32]), Error
> {
151 if pos
>= self.index_length
{
152 bail
!("chunk index out of range");
154 let start
= (pos
* self.chunk_size
) as u64;
155 let mut end
= start
+ self.chunk_size
as u64;
// The 32-byte digest is copied into uninitialised storage and then assumed
// initialised once the copy below has filled it.
161 let mut digest
= std
::mem
::MaybeUninit
::<[u8; 32]>::uninit();
163 std
::ptr
::copy_nonoverlapping(
164 self.index
.add(pos
* 32),
165 (*digest
.as_mut_ptr()).as_mut_ptr(),
170 Ok((start
, end
, unsafe { digest.assume_init() }
))
174 fn chunk_digest(&self, pos
: usize) -> &[u8; 32] {
175 if pos
>= self.index_length
{
176 panic
!("chunk index out of range");
178 let slice
= unsafe { std::slice::from_raw_parts(self.index.add(pos * 32), 32) }
;
179 slice
.try_into().unwrap()
/// Computes the end offset of the chunk at index `pos`, starting from
/// `(pos + 1) * chunk_size`.
///
/// # Panics
/// Panics with `chunk index out of range` when `pos >= self.index_length`.
// NOTE(review): the statements after `let end` are not visible in this view, so
// the final return value (for instance any clamping for the last chunk) cannot be
// confirmed here.
183 fn chunk_end(&self, pos
: usize) -> u64 {
184 if pos
>= self.index_length
{
185 panic
!("chunk index out of range");
188 let end
= ((pos
+ 1) * self.chunk_size
) as u64;
// Walks every index position, tracking `(pos + 1) * chunk_size` in `chunk_end`
// and fetching each digest, then finishes a SHA-256 hasher.
// NOTE(review): the statement that feeds each digest into the hasher and the
// final return expression are not visible in this view.
196 /// Compute checksum and data size
197 pub fn compute_csum(&self) -> ([u8; 32], u64) {
198 let mut csum
= openssl
::sha
::Sha256
::new();
199 let mut chunk_end
= 0;
200 for pos
in 0..self.index_length
{
201 chunk_end
= ((pos
+ 1) * self.chunk_size
) as u64;
202 let digest
= self.chunk_digest(pos
);
205 let csum
= csum
.finish();
/// Prints a human-readable summary of the index to stdout: `Size`, `ChunkSize`,
/// the creation time rendered in the local time zone with the `%c` format, and
/// the `UUID` in debug form.
// NOTE(review): the `println!` that wraps the formatted creation time is not
// visible in this view.
210 pub fn print_info(&self) {
211 println
!("Size: {}", self.size
);
212 println
!("ChunkSize: {}", self.chunk_size
);
// `ctime` is interpreted as seconds since the Unix epoch (nanoseconds = 0).
215 Local
.timestamp(self.ctime
as i64, 0).format("%c")
217 println
!("UUID: {:?}", self.uuid
);
// `IndexFile` implementation backed by the mapped digest table.
// NOTE(review): the bodies of `index_count` and `index_bytes`, and the
// out-of-range branch of `index_digest` (presumably `None`), are not visible in
// this view.
221 impl IndexFile
for FixedIndexReader
{
222 fn index_count(&self) -> usize {
// Returns a reference to the 32-byte digest at `pos`, located at byte offset
// `pos * 32` in the mapping.
226 fn index_digest(&self, pos
: usize) -> Option
<&[u8; 32]> {
227 if pos
>= self.index_length
{
// NOTE(review): this transmutes a raw pointer into a reference;
// `&*(ptr as *const [u8; 32])` would state the same intent more explicitly.
230 Some(unsafe { std::mem::transmute(self.index.add(pos * 32)) }
)
234 fn index_bytes(&self) -> u64 {
// Writer for a fixed index file.
// NOTE(review): only `store`, `_lock` and `tmp_filename` are visible in this view;
// the methods also use `file`, `filename`, `index`, `index_length`, `size`,
// `chunk_size` and `uuid`, so those declarations appear to be missing here.
239 pub struct FixedIndexWriter
{
240 store
: Arc
<ChunkStore
>,
// Guard value stored only to keep it alive; the leading underscore marks it as
// never read.
242 _lock
: tools
::ProcessLockSharedGuard
,
// Path of the temporary file; `close` renames it to `filename` and `Drop`
// attempts to remove it.
244 tmp_filename
: PathBuf
,
253 // `index` is mmap()ed which cannot be thread-local so should be sendable
254 unsafe impl Send
for FixedIndexWriter {}
256 impl Drop
for FixedIndexWriter
{
258 let _
= std
::fs
::remove_file(&self.tmp_filename
); // ignore errors
259 if let Err(err
) = self.unmap() {
260 eprintln
!("Unable to unmap file {:?} - {}", self.tmp_filename
, err
);
265 impl FixedIndexWriter
{
/// Constructor for `FixedIndexWriter`.
///
/// Visible steps: take a shared lock on the chunk store, derive the target path
/// and a temporary path with the `tmp_fidx` extension, open the file, build and
/// write a zero-initialised 4096-byte header (magic, ctime, size, chunk size,
/// uuid, zeroed checksum), grow the file to header plus `index_length * 32`
/// bytes with `ftruncate`, and `mmap` the index area read/write with
/// `MAP_SHARED`.
// NOTE(review): the `fn` signature line and several statements are not visible
// in this view; the parameters `path`, `size` and `chunk_size` are inferred from
// their uses below.
266 #[allow(clippy::cast_ptr_alignment)]
268 store
: Arc
<ChunkStore
>,
272 ) -> Result
<Self, Error
> {
273 let shared_lock
= store
.try_shared_lock()?
;
275 let full_path
= store
.relative_path(path
);
276 let mut tmp_path
= full_path
.clone();
277 tmp_path
.set_extension("tmp_fidx");
279 let mut file
= std
::fs
::OpenOptions
::new()
286 let header_size
= std
::mem
::size_of
::<FixedIndexHeader
>();
// NOTE(review): this file also invokes `static_assert_size!(FixedIndexHeader, 4096)`,
// so this runtime check and its todo look redundant.
288 // todo: use static assertion when available in rust
289 if header_size
!= 4096 {
290 panic
!("got unexpected header size");
293 let ctime
= std
::time
::SystemTime
::now()
294 .duration_since(std
::time
::SystemTime
::UNIX_EPOCH
)?
297 let uuid
= Uuid
::generate();
// NOTE(review): `buffer` is not `mut` and the pointer comes from `as_ptr()`, yet
// the header fields are written through it below. `let mut buffer` with
// `as_mut_ptr()` would avoid mutating through a shared pointer. The cast also
// relies on the Vec<u8> allocation being suitably aligned for `FixedIndexHeader`
// (see the `cast_ptr_alignment` allow above); confirm.
299 let buffer
= vec
![0u8; header_size
];
300 let header
= unsafe { &mut *(buffer.as_ptr() as *mut FixedIndexHeader) }
;
// Integer header fields are stored little-endian.
302 header
.magic
= super::FIXED_SIZED_CHUNK_INDEX_1_0
;
303 header
.ctime
= u64::to_le(ctime
);
304 header
.size
= u64::to_le(size
as u64);
305 header
.chunk_size
= u64::to_le(chunk_size
as u64);
306 header
.uuid
= *uuid
.as_bytes();
// The checksum is left zeroed here; `close` overwrites it at the `index_csum` offset.
308 header
.index_csum
= [0u8; 32];
310 file
.write_all(&buffer
)?
;
// Number of chunks = ceil(size / chunk_size); each entry is a 32-byte digest.
// NOTE(review): no check for `chunk_size == 0` is visible before this division.
312 let index_length
= (size
+ chunk_size
- 1) / chunk_size
;
313 let index_size
= index_length
* 32;
314 nix
::unistd
::ftruncate(file
.as_raw_fd(), (header_size
+ index_size
) as i64)?
;
317 nix
::sys
::mman
::mmap(
318 std
::ptr
::null_mut(),
320 nix
::sys
::mman
::ProtFlags
::PROT_READ
| nix
::sys
::mman
::ProtFlags
::PROT_WRITE
,
321 nix
::sys
::mman
::MapFlags
::MAP_SHARED
,
332 tmp_filename
: tmp_path
,
338 uuid
: *uuid
.as_bytes(),
// NOTE(review): body not visible in this view; presumably returns `self.index_length`.
342 pub fn index_length(&self) -> usize {
/// Unmaps the index mapping (`index_length * 32` bytes) and resets `self.index`
/// to the null pointer.
///
/// # Errors
/// A failing `munmap` is reported as `unmap file {tmp_filename:?} failed - {err}`.
// NOTE(review): the body of the null-pointer branch is not visible in this view;
// presumably it returns early, since both `close` and `Drop` may reach this. Confirm.
346 fn unmap(&mut self) -> Result
<(), Error
> {
347 if self.index
== std
::ptr
::null_mut() {
351 let index_size
= self.index_length
* 32;
354 unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) }
356 bail
!("unmap file {:?} failed - {}", self.tmp_filename
, err
);
359 self.index
= std
::ptr
::null_mut();
/// Finalises the index file.
///
/// Visible steps: refuse to run on an already closed writer (null `index`),
/// compute the SHA-256 over the `index_length * 32` mapped digest bytes, write
/// that checksum into the header at the offset of `index_csum`, and rename the
/// temporary file to its final name.
///
/// # Errors
/// Fails with `cannot close already closed index file.` on a closed writer, on
/// seek/write errors, and with `Atomic rename file {filename:?} failed - {err}`
/// when the rename fails.
// NOTE(review): the statements between the checksum computation and the header
// update, and the final return expression (presumably `Ok(index_csum)`), are not
// visible in this view.
364 pub fn close(&mut self) -> Result
<[u8; 32], Error
> {
365 if self.index
== std
::ptr
::null_mut() {
366 bail
!("cannot close already closed index file.");
369 let index_size
= self.index_length
* 32;
370 let data
= unsafe { std::slice::from_raw_parts(self.index, index_size) }
;
371 let index_csum
= openssl
::sha
::sha256(data
);
375 let csum_offset
= proxmox
::tools
::offsetof!(FixedIndexHeader
, index_csum
);
376 self.file
.seek(SeekFrom
::Start(csum_offset
as u64))?
;
377 self.file
.write_all(&index_csum
)?
;
380 if let Err(err
) = std
::fs
::rename(&self.tmp_filename
, &self.filename
) {
381 bail
!("Atomic rename file {:?} failed - {}", self.filename
, err
);
/// Validates a chunk described by its END offset `offset` and its length
/// `chunk_len`, and returns the index slot it belongs to (`pos / chunk_size`,
/// where `pos = offset - chunk_len` is the chunk's start).
///
/// # Errors
/// Visible checks: `offset < chunk_len`, `offset > self.size`, a length that
/// differs from `chunk_size` for a chunk not ending at `self.size`, a length
/// greater than `chunk_size`, and a start position that is not chunk-aligned.
// NOTE(review): part of the length condition and its `bail!` are not visible in
// this view.
// NOTE(review): two message literals have an unbalanced "(" ("got chunk with
// small offset ({} < {}" and "chunk with unexpected length ({} != {}"), and the
// "exceeds size" message prints ">=" while the check is ">".
387 pub fn check_chunk_alignment(&self, offset
: usize, chunk_len
: usize) -> Result
<usize, Error
> {
388 if offset
< chunk_len
{
389 bail
!("got chunk with small offset ({} < {}", offset
, chunk_len
);
392 let pos
= offset
- chunk_len
;
394 if offset
> self.size
{
395 bail
!("chunk data exceeds size ({} >= {})", offset
, self.size
);
398 // last chunk can be smaller
399 if ((offset
!= self.size
) && (chunk_len
!= self.chunk_size
))
400 || (chunk_len
> self.chunk_size
)
404 "chunk with unexpected length ({} != {}",
// NOTE(review): the mask test is only a correct alignment check when `chunk_size`
// is a power of two; confirm that this is enforced where the writer is created.
410 if pos
& (self.chunk_size
- 1) != 0 {
411 bail
!("got unaligned chunk (pos = {})", pos
);
414 Ok(pos
/ self.chunk_size
)
// Validates the chunk position with `check_chunk_alignment`, inserts the chunk
// data via `insert_chunk`, updates the counters in `stat`, and records the
// digest at the computed index slot with `add_digest`.
// NOTE(review): the receiver of `.insert_chunk(..)` and the branch structure
// around the duplicate / disk-size accounting are not visible in this view.
417 // Note: We want to add data out of order, so do not assume any order here.
418 pub fn add_chunk(&mut self, chunk_info
: &ChunkInfo
, stat
: &mut ChunkStat
) -> Result
<(), Error
> {
419 let chunk_len
= chunk_info
.chunk_len
as usize;
420 let offset
= chunk_info
.offset
as usize; // end of chunk
422 let idx
= self.check_chunk_alignment(offset
, chunk_len
)?
;
424 let (is_duplicate
, compressed_size
) = self
426 .insert_chunk(&chunk_info
.chunk
, &chunk_info
.digest
)?
;
428 stat
.chunk_count
+= 1;
429 stat
.compressed_size
+= compressed_size
;
431 let digest
= &chunk_info
.digest
;
434 "ADD CHUNK {} {} {}% {} {}",
// NOTE(review): this percentage divides by `chunk_len`; confirm that a
// zero-length chunk cannot get past `check_chunk_alignment`.
437 (compressed_size
* 100) / (chunk_len
as u64),
439 proxmox
::tools
::digest_to_hex(digest
)
443 stat
.duplicate_chunks
+= 1;
445 stat
.disk_size
+= compressed_size
;
448 self.add_digest(idx
, digest
)
/// Stores `digest` in index slot `index` by copying its 32 bytes to byte offset
/// `index * 32` of the mapping.
///
/// # Errors
/// Fails when `index >= index_length` (message `add digest failed - index out
/// of range (..)`) and, with `cannot write to closed index file.`, when the
/// mapping pointer is null.
// NOTE(review): the `bail!`/`unsafe` wrappers and the final return expression
// are not visible in this view.
451 pub fn add_digest(&mut self, index
: usize, digest
: &[u8; 32]) -> Result
<(), Error
> {
452 if index
>= self.index_length
{
454 "add digest failed - index out of range ({} >= {})",
460 if self.index
== std
::ptr
::null_mut() {
461 bail
!("cannot write to closed index file.");
464 let index_pos
= index
* 32;
466 let dst
= self.index
.add(index_pos
);
467 dst
.copy_from_nonoverlapping(digest
.as_ptr(), 32);
// Reader over a fixed index that keeps one chunk buffered at a time.
// NOTE(review): only some fields are visible in this view; the methods also use
// `store`, `archive_size` and `read_offset`.
474 pub struct BufferedFixedReader
<S
> {
476 index
: FixedIndexReader
,
// Holds the data of the currently buffered chunk.
478 read_buffer
: Vec
<u8>,
// Index slot and start offset of the chunk currently held in `read_buffer`.
479 buffered_chunk_idx
: usize,
480 buffered_chunk_start
: u64,
// Inherent methods of `BufferedFixedReader`.
// NOTE(review): several lines of this impl (struct-literal fields, the body of
// `archive_size`, the `bail!` wrapper in `buffer_chunk`) are not visible in this
// view.
484 impl<S
: ReadChunk
> BufferedFixedReader
<S
> {
// Creates a reader over `index` that fetches chunk data from `store`. The
// archive size is taken from `index.size`, and the read buffer starts empty
// with 1 MiB of reserved capacity.
485 pub fn new(index
: FixedIndexReader
, store
: S
) -> Self {
486 let archive_size
= index
.size
;
491 read_buffer
: Vec
::with_capacity(1024 * 1024),
492 buffered_chunk_idx
: 0,
493 buffered_chunk_start
: 0,
498 pub fn archive_size(&self) -> u64 {
// Loads chunk `idx` into `read_buffer`: looks up `(start, end, digest)` with
// `chunk_info`, reads the chunk from the store, compares the data length
// against `end - start`, then replaces the buffer content and records which
// chunk is buffered.
// NOTE(review): the message literal "read chunk with wrong size ({} != {}" has
// an unbalanced "(".
502 fn buffer_chunk(&mut self, idx
: usize) -> Result
<(), Error
> {
503 let index
= &self.index
;
504 let (start
, end
, digest
) = index
.chunk_info(idx
)?
;
508 let data
= self.store
.read_chunk(&digest
)?
;
510 if (end
- start
) != data
.len() as u64 {
512 "read chunk with wrong size ({} != {}",
518 self.read_buffer
.clear();
519 self.read_buffer
.extend_from_slice(&data
);
521 self.buffered_chunk_idx
= idx
;
523 self.buffered_chunk_start
= start
as u64;
524 //println!("BUFFER {} {}", self.buffered_chunk_start, end);
// `BufferedRead` implementation: returns the buffered bytes from `offset` up to
// the end of the chunk that contains it.
// NOTE(review): the leading parts of both `if` conditions are not visible in
// this view (presumably they test `buffer_len`; confirm).
529 impl<S
: ReadChunk
> crate::tools
::BufferedRead
for BufferedFixedReader
<S
> {
530 fn buffered_read(&mut self, offset
: u64) -> Result
<&[u8], Error
> {
// Reading exactly at the end of the archive yields an empty slice.
531 if offset
== self.archive_size
{
532 return Ok(&self.read_buffer
[0..0]);
535 let buffer_len
= self.read_buffer
.len();
536 let index
= &self.index
;
// Fast path: `offset` lies past the buffered chunk and a next chunk exists, so
// try that chunk directly instead of computing the slot by division.
538 // optimization for sequential read
540 && ((self.buffered_chunk_idx
+ 1) < index
.index_length
)
541 && (offset
>= (self.buffered_chunk_start
+ (self.read_buffer
.len() as u64)))
543 let next_idx
= self.buffered_chunk_idx
+ 1;
544 let next_end
= index
.chunk_end(next_idx
);
545 if offset
< next_end
{
546 self.buffer_chunk(next_idx
)?
;
547 let buffer_offset
= (offset
- self.buffered_chunk_start
) as usize;
548 return Ok(&self.read_buffer
[buffer_offset
..]);
// General path: when `offset` is outside the buffered range, load the chunk
// that contains it (`offset / chunk_size`).
553 || (offset
< self.buffered_chunk_start
)
554 || (offset
>= (self.buffered_chunk_start
+ (self.read_buffer
.len() as u64)))
556 let idx
= (offset
/ index
.chunk_size
as u64) as usize;
557 self.buffer_chunk(idx
)?
;
560 let buffer_offset
= (offset
- self.buffered_chunk_start
) as usize;
561 Ok(&self.read_buffer
[buffer_offset
..])
// `std::io::Read` implementation on top of `buffered_read`: fetches the
// buffered bytes at `self.read_offset`, copies `n` of them into `buf`, and
// advances `read_offset` by `n`. Errors from `buffered_read` are converted to
// `std::io::Error` with `ErrorKind::Other`, carrying the error's string form.
// NOTE(review): the branches that choose `n` (presumably the smaller of
// `data.len()` and `buf.len()`), the `unsafe` wrapper around the copy, and the
// return expression are not visible in this view.
565 impl<S
: ReadChunk
> std
::io
::Read
for BufferedFixedReader
<S
> {
566 fn read(&mut self, buf
: &mut [u8]) -> Result
<usize, std
::io
::Error
> {
567 use crate::tools
::BufferedRead
;
568 use std
::io
::{Error, ErrorKind}
;
570 let data
= match self.buffered_read(self.read_offset
) {
572 Err(err
) => return Err(Error
::new(ErrorKind
::Other
, err
.to_string())),
575 let n
= if data
.len() > buf
.len() {
582 std
::ptr
::copy_nonoverlapping(data
.as_ptr(), buf
.as_mut_ptr(), n
);
585 self.read_offset
+= n
as u64;
591 impl<S
: ReadChunk
> Seek
for BufferedFixedReader
<S
> {
592 fn seek(&mut self, pos
: SeekFrom
) -> Result
<u64, std
::io
::Error
> {
593 let new_offset
= match pos
{
594 SeekFrom
::Start(start_offset
) => start_offset
as i64,
595 SeekFrom
::End(end_offset
) => (self.archive_size
as i64) + end_offset
,
596 SeekFrom
::Current(offset
) => (self.read_offset
as i64) + offset
,
599 use std
::io
::{Error, ErrorKind}
;
600 if (new_offset
< 0) || (new_offset
> (self.archive_size
as i64)) {
601 return Err(Error
::new(
604 "seek is out of range {} ([0..{}])",
605 new_offset
, self.archive_size
609 self.read_offset
= new_offset
as u64;