2 use std
::io
::{Seek, SeekFrom}
;
6 use super::chunk_stat
::*;
7 use super::chunk_store
::*;
10 use std
::io
::{Read, Write}
;
12 use std
::path
::{Path, PathBuf}
;
13 use std
::os
::unix
::io
::AsRawFd
;
15 use chrono
::{Local, TimeZone}
;
18 use super::read_chunk
::*;
20 /// Header format definition for fixed index files (`.fidx`)
///
/// Reader and writer in this file reinterpret a 4096-byte buffer as this
/// struct and refuse to continue when `size_of::<FixedIndexHeader>()` is not
/// 4096. The integer fields they touch are converted with `u64::from_le` /
/// `u64::to_le`.
///
/// NOTE(review): the `magic`, `uuid`, `ctime`, `size` and `chunk_size` fields
/// that other code in this file accesses are not visible in this excerpt.
22 pub struct FixedIndexHeader
{
26 /// Sha256 over the index ``SHA256(digest1||digest2||...)``
27 pub index_csum
: [u8; 32],
// Padding that brings the struct up to the expected 4096 bytes.
30 reserved
: [u8; 4016], // overall size is one page (4096 bytes)
33 // split image into fixed size chunks
// Read-only view of a `.fidx` file. `FixedIndexReader::new` fills the fields
// from the on-disk header.
// NOTE(review): further fields (`index`, `index_length`, `size`, `ctime`,
// `uuid`, ...) are used by the methods but are not visible in this excerpt.
35 pub struct FixedIndexReader
{
// Chunk size from the header, converted to `usize`.
37 pub chunk_size
: usize,
// Copied unchanged from the header's `index_csum`.
43 pub index_csum
: [u8; 32],
46 // `index` is mmap()ed which cannot be thread-local so should be sendable
47 unsafe impl Send
for FixedIndexReader {}
49 impl Drop
for FixedIndexReader
{
52 if let Err(err
) = self.unmap() {
53 eprintln
!("Unable to unmap file - {}", err
);
58 impl FixedIndexReader
{
/// Opens the fixed index at `path` and builds a reader from it via
/// [`Self::new`]. Any failure is wrapped into an error message that names
/// `path`.
///
/// NOTE(review): the expression that produces `file` (original lines 61-63)
/// is not visible in this excerpt.
60 pub fn open(path
: &Path
) -> Result
<Self, Error
> {
64 .and_then(|file
| Self::new(file
))
65 .map_err(|err
| format_err
!("Unable to open fixed index {:?} - {}", path
, err
))
/// Builds a reader from an already opened index file.
///
/// Steps visible here: take a shared, non-blocking `flock`; rewind and read
/// the 4096-byte header; verify the magic number; derive the number of
/// 32-byte digest entries from `size` and `chunk_size`; compare that with the
/// size reported by `fstat`; finally `mmap` the digest area read-only
/// (`MAP_PRIVATE`), starting right after the header.
///
/// # Errors
/// Fails when the lock cannot be taken, on I/O errors, on an unknown magic
/// number, or when the file size does not match the header.
68 pub fn new(mut file
: std
::fs
::File
) -> Result
<Self, Error
> {
70 if let Err(err
) = nix
::fcntl
::flock(file
.as_raw_fd(), nix
::fcntl
::FlockArg
::LockSharedNonblock
) {
71 bail
!("unable to get shared lock - {}", err
);
// The caller may have read from the file already; start at the header.
74 file
.seek(SeekFrom
::Start(0))?
;
76 let header_size
= std
::mem
::size_of
::<FixedIndexHeader
>();
78 // todo: use static assertion when available in rust
79 if header_size
!= 4096 { bail!("got unexpected header size"); }
81 let mut buffer
= vec
![0u8; header_size
];
82 file
.read_exact(&mut buffer
)?
;
// NOTE(review): `buffer` is a `Vec<u8>`; reinterpreting its storage as
// `FixedIndexHeader` assumes the allocation is suitably aligned for the
// header's integer fields and that the struct layout is fixed — confirm.
// The `&mut` obtained through `as_ptr()` is only read from below.
84 let header
= unsafe { &mut * (buffer.as_ptr() as *mut FixedIndexHeader) }
;
86 if header
.magic
!= super::FIXED_SIZED_CHUNK_INDEX_1_0
{
87 bail
!("got unknown magic number");
// Header integers are stored little-endian.
90 let size
= u64::from_le(header
.size
);
91 let ctime
= u64::from_le(header
.ctime
);
92 let chunk_size
= u64::from_le(header
.chunk_size
);
// Ceiling division: one index entry per (possibly partial) chunk.
// NOTE(review): a `chunk_size` of 0 read from the file would panic on the
// division below — no validation is visible in this excerpt.
94 let index_length
= ((size
+ chunk_size
- 1)/chunk_size
) as usize;
// Each entry is a 32-byte digest.
95 let index_size
= index_length
*32;
97 let rawfd
= file
.as_raw_fd();
99 let stat
= match nix
::sys
::stat
::fstat(rawfd
) {
101 Err(err
) => bail
!("fstat failed - {}", err
),
// Everything after the header must be exactly the digest table.
104 let expected_index_size
= (stat
.st_size
as usize) - header_size
;
105 if index_size
!= expected_index_size
{
106 bail
!("got unexpected file size ({} != {})", index_size
, expected_index_size
);
// NOTE(review): the length and file-descriptor arguments of this mmap call
// are not visible in this excerpt; the offset skips the header.
109 let data
= unsafe { nix
::sys
::mman
::mmap(
110 std
::ptr
::null_mut(),
112 nix
::sys
::mman
::ProtFlags
::PROT_READ
,
113 nix
::sys
::mman
::MapFlags
::MAP_PRIVATE
,
115 header_size
as i64) }?
as *mut u8;
119 chunk_size
: chunk_size
as usize,
125 index_csum
: header
.index_csum
,
129 fn unmap(&mut self) -> Result
<(), Error
> {
131 if self.index
== std
::ptr
::null_mut() { return Ok(()); }
133 let index_size
= self.index_length
*32;
135 if let Err(err
) = unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) }
{
136 bail
!("unmap file failed - {}", err
);
139 self.index
= std
::ptr
::null_mut();
/// Returns `(start, end, digest)` for the chunk at index `pos`.
///
/// `start` is `pos * chunk_size`; the digest is copied out of the mapped
/// index (32 bytes at offset `pos * 32`).
///
/// # Errors
/// Fails with "chunk index out of range" when `pos >= index_length`.
///
/// NOTE(review): `end` is declared `mut`, and original lines 151-155 are not
/// visible in this excerpt — presumably `end` is adjusted for the last chunk;
/// confirm against the full file.
144 pub fn chunk_info(&self, pos
: usize) -> Result
<(u64, u64, [u8; 32]), Error
> {
146 if pos
>= self.index_length
{
147 bail
!("chunk index out of range");
149 let start
= (pos
* self.chunk_size
) as u64;
150 let mut end
= start
+ self.chunk_size
as u64;
// NOTE(review): `std::mem::uninitialized` is deprecated; since all 32 bytes
// are overwritten on the next line, `[0u8; 32]` would do the same job
// without the `unsafe` block.
156 let mut digest
: [u8; 32] = unsafe { std::mem::uninitialized() }
;
// The range check above keeps `pos*32` inside the `index_length*32` bytes of
// the index (assuming the mapping has that length — see `unmap`).
157 unsafe { std::ptr::copy_nonoverlapping(self.index.add(pos*32), digest.as_mut_ptr(), 32); }
159 Ok((start
, end
, digest
))
/// Computes the end offset of the chunk at index `pos` as
/// `(pos + 1) * chunk_size`.
///
/// # Panics
/// Panics with "chunk index out of range" when `pos >= index_length`.
///
/// NOTE(review): the rest of the body (after `end` is computed) is not
/// visible in this excerpt, so the exact return value cannot be confirmed.
163 fn chunk_end(&self, pos
: usize) -> u64 {
164 if pos
>= self.index_length
{
165 panic
!("chunk index out of range");
168 let end
= ((pos
+1) * self.chunk_size
) as u64;
176 pub fn print_info(&self) {
177 println
!("Size: {}", self.size
);
178 println
!("ChunkSize: {}", self.chunk_size
);
179 println
!("CTime: {}", Local
.timestamp(self.ctime
as i64, 0).format("%c"));
180 println
!("UUID: {:?}", self.uuid
);
// `IndexFile` implementation backed by the mapped digest table.
// NOTE(review): the bodies of `index_count` and `index_bytes` and parts of
// `index_digest` are not visible in this excerpt.
184 impl IndexFile
for FixedIndexReader
{
185 fn index_count(&self) -> usize {
// Returns a reference to the 32-byte digest at `pos`; positions at or beyond
// `index_length` take the other branch (not visible here, presumably `None`).
189 fn index_digest(&self, pos
: usize) -> Option
<&[u8; 32]> {
190 if pos
>= self.index_length
{
// NOTE(review): this transmutes a `*mut u8` into `&[u8; 32]`; a pointer cast
// (`&*(ptr as *const [u8; 32])`) would state the intent more directly.
193 Some(unsafe { std::mem::transmute(self.index.add(pos*32)) }
)
197 fn index_bytes(&self) -> u64 {
// Writer for a new fixed index file; built by `FixedIndexWriter::create`.
// NOTE(review): further fields (`file`, `filename`, `index`, `index_length`,
// `size`, `chunk_size`, `uuid`, ...) are used by the methods but are not
// visible in this excerpt.
202 pub struct FixedIndexWriter
{
// Chunk store that `add_chunk` inserts chunk data into.
203 store
: Arc
<ChunkStore
>,
// Guard type from `tools`; never read (leading underscore), so it exists
// only to be held for the writer's lifetime.
205 _lock
: tools
::ProcessLockSharedGuard
,
// Temporary `*.tmp_fidx` path; renamed to the final name in `close`.
207 tmp_filename
: PathBuf
,
216 // `index` is mmap()ed which cannot be thread-local so should be sendable
217 unsafe impl Send
for FixedIndexWriter {}
219 impl Drop
for FixedIndexWriter
{
222 let _
= std
::fs
::remove_file(&self.tmp_filename
); // ignore errors
223 if let Err(err
) = self.unmap() {
224 eprintln
!("Unable to unmap file {:?} - {}", self.tmp_filename
, err
);
229 impl FixedIndexWriter
{
/// Creates a new fixed index for an image of `size` bytes split into chunks
/// of `chunk_size` bytes.
///
/// Steps visible here: take a shared lock on the chunk store; derive a
/// temporary `*.tmp_fidx` path next to the final one; write a header (magic,
/// creation time, size, chunk size, fresh v4 UUID, zeroed checksum); extend
/// the file to hold one 32-byte digest per chunk; and `mmap` the digest area
/// read/write with `MAP_SHARED`.
///
/// # Errors
/// Fails when the store lock cannot be taken or on any I/O / mmap error.
///
/// # Panics
/// Panics if `size_of::<FixedIndexHeader>()` is not 4096. (The reader reports
/// the same condition with `bail!` instead.)
231 pub fn create(store
: Arc
<ChunkStore
>, path
: &Path
, size
: usize, chunk_size
: usize) -> Result
<Self, Error
> {
233 let shared_lock
= store
.try_shared_lock()?
;
235 let full_path
= store
.relative_path(path
);
236 let mut tmp_path
= full_path
.clone();
237 tmp_path
.set_extension("tmp_fidx");
// NOTE(review): the remaining open options and the path passed to `open`
// are not visible in this excerpt.
239 let mut file
= std
::fs
::OpenOptions
::new()
240 .create(true).truncate(true)
245 let header_size
= std
::mem
::size_of
::<FixedIndexHeader
>();
247 // todo: use static assertion when available in rust
248 if header_size
!= 4096 { panic!("got unexpected header size"); }
// Creation time in seconds since the Unix epoch.
250 let ctime
= std
::time
::SystemTime
::now().duration_since(
251 std
::time
::SystemTime
::UNIX_EPOCH
)?
.as_secs();
253 let uuid
= Uuid
::new_v4();
// NOTE(review): `buffer` is not declared `mut`, yet the header is written
// through a `*mut` obtained from `as_ptr()`. Consider `let mut buffer` with
// `as_mut_ptr()`. The cast also assumes suitable alignment — confirm.
255 let buffer
= vec
![0u8; header_size
];
256 let header
= unsafe { &mut * (buffer.as_ptr() as *mut FixedIndexHeader) }
;
258 header
.magic
= super::FIXED_SIZED_CHUNK_INDEX_1_0
;
// Integer fields are stored little-endian.
259 header
.ctime
= u64::to_le(ctime
);
260 header
.size
= u64::to_le(size
as u64);
261 header
.chunk_size
= u64::to_le(chunk_size
as u64);
262 header
.uuid
= *uuid
.as_bytes();
// Placeholder; `close` overwrites it with the real checksum.
264 header
.index_csum
= [0u8; 32];
266 file
.write_all(&buffer
)?
;
// Ceiling division: one index entry per (possibly partial) chunk.
// NOTE(review): `chunk_size == 0` would panic on the division below — no
// validation is visible in this excerpt.
268 let index_length
= (size
+ chunk_size
- 1)/chunk_size
;
269 let index_size
= index_length
*32;
// Grow the file so the whole digest table can be mapped.
270 nix
::unistd
::ftruncate(file
.as_raw_fd(), (header_size
+ index_size
) as i64)?
;
// NOTE(review): the length and file-descriptor arguments of this mmap call
// are not visible in this excerpt; the offset skips the header.
272 let data
= unsafe { nix
::sys
::mman
::mmap(
273 std
::ptr
::null_mut(),
275 nix
::sys
::mman
::ProtFlags
::PROT_READ
| nix
::sys
::mman
::ProtFlags
::PROT_WRITE
,
276 nix
::sys
::mman
::MapFlags
::MAP_SHARED
,
278 header_size
as i64) }?
as *mut u8;
285 tmp_filename
: tmp_path
,
291 uuid
: *uuid
.as_bytes(),
/// NOTE(review): the body is not visible in this excerpt; presumably returns
/// the number of 32-byte digest slots (`self.index_length`) — confirm.
295 pub fn index_length(&self) -> usize {
299 fn unmap(&mut self) -> Result
<(), Error
> {
301 if self.index
== std
::ptr
::null_mut() { return Ok(()); }
303 let index_size
= self.index_length
*32;
305 if let Err(err
) = unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) }
{
306 bail
!("unmap file {:?} failed - {}", self.tmp_filename
, err
);
309 self.index
= std
::ptr
::null_mut();
/// Finalizes the index file.
///
/// Steps visible here: compute the SHA-256 over the whole digest table,
/// write it into the header at the offset of `index_csum`, then rename the
/// temporary file to its final name.
///
/// # Errors
/// Fails when the index was already closed (null `index`), on I/O errors, or
/// when the rename fails.
///
/// NOTE(review): original lines 321-323 and the end of the function are not
/// visible in this excerpt; presumably the mapping is released in between and
/// the checksum is returned — confirm against the full file.
314 pub fn close(&mut self) -> Result
<[u8; 32], Error
> {
316 if self.index
== std
::ptr
::null_mut() { bail!("cannot close already closed index file."); }
318 let index_size
= self.index_length
*32;
// View the mapped digest table as a byte slice for hashing.
319 let data
= unsafe { std::slice::from_raw_parts(self.index, index_size) }
;
320 let index_csum
= openssl
::sha
::sha256(data
);
// Patch the checksum into the header that was written with zeros in `create`.
324 let csum_offset
= proxmox
::tools
::offsetof!(FixedIndexHeader
, index_csum
);
325 self.file
.seek(SeekFrom
::Start(csum_offset
as u64))?
;
326 self.file
.write_all(&index_csum
)?
;
329 if let Err(err
) = std
::fs
::rename(&self.tmp_filename
, &self.filename
) {
330 bail
!("Atomic rename file {:?} failed - {}", self.filename
, err
);
336 pub fn check_chunk_alignment(&self, offset
: usize, chunk_len
: usize) -> Result
<usize, Error
> {
338 if offset
< chunk_len
{
339 bail
!("got chunk with small offset ({} < {}", offset
, chunk_len
);
342 let pos
= offset
- chunk_len
;
344 if offset
> self.size
{
345 bail
!("chunk data exceeds size ({} >= {})", offset
, self.size
);
348 // last chunk can be smaller
349 if ((offset
!= self.size
) && (chunk_len
!= self.chunk_size
)) ||
350 (chunk_len
> self.chunk_size
) || (chunk_len
== 0) {
351 bail
!("chunk with unexpected length ({} != {}", chunk_len
, self.chunk_size
);
354 if pos
& (self.chunk_size
-1) != 0 {
355 bail
!("got unaligned chunk (pos = {})", pos
);
358 Ok(pos
/ self.chunk_size
)
361 // Note: We want to add data out of order, so do not assume any order here.
// Stores one chunk in the chunk store and records its digest in the index.
//
// `chunk_info.offset` is the END offset of the chunk. The index slot is
// derived and validated by `check_chunk_alignment`; `stat` is updated with
// chunk count and size counters.
//
// NOTE(review): the branch structure around the `duplicate_chunks` /
// `disk_size` updates is not visible in this excerpt — presumably they are
// the two arms of an `if is_duplicate`; confirm against the full file.
362 pub fn add_chunk(&mut self, chunk_info
: &ChunkInfo
, stat
: &mut ChunkStat
) -> Result
<(), Error
> {
364 let chunk_len
= chunk_info
.chunk_len
as usize;
365 let offset
= chunk_info
.offset
as usize; // end of chunk
367 let idx
= self.check_chunk_alignment(offset
, chunk_len
)?
;
369 let (is_duplicate
, compressed_size
) = self.store
.insert_chunk(&chunk_info
.chunk
)?
;
371 stat
.chunk_count
+= 1;
372 stat
.compressed_size
+= compressed_size
;
374 let digest
= chunk_info
.chunk
.digest();
// Progress output on stdout. `chunk_len` is non-zero here because
// `check_chunk_alignment` rejects empty chunks, so the ratio cannot divide
// by zero.
376 println
!("ADD CHUNK {} {} {}% {} {}", idx
, chunk_len
,
377 (compressed_size
*100)/(chunk_len
as u64), is_duplicate
, proxmox
::tools
::digest_to_hex(digest
));
380 stat
.duplicate_chunks
+= 1;
382 stat
.disk_size
+= compressed_size
;
385 self.add_digest(idx
, digest
)
388 pub fn add_digest(&mut self, index
: usize, digest
: &[u8; 32]) -> Result
<(), Error
> {
390 if index
>= self.index_length
{
391 bail
!("add digest failed - index out of range ({} >= {})", index
, self.index_length
);
394 if self.index
== std
::ptr
::null_mut() { bail!("cannot write to closed index file."); }
396 let index_pos
= index
*32;
398 let dst
= self.index
.add(index_pos
);
399 dst
.copy_from_nonoverlapping(digest
.as_ptr(), 32);
// Reader that serves byte ranges of the indexed archive by loading one
// chunk at a time from a chunk source `S` into `read_buffer`.
// NOTE(review): further fields (`store`, `archive_size`, `read_offset`, ...)
// are used by the methods but are not visible in this excerpt.
406 pub struct BufferedFixedReader
<S
> {
408 index
: FixedIndexReader
,
// Data of the chunk that is currently buffered.
410 read_buffer
: Vec
<u8>,
// Index and archive start offset of the buffered chunk, set by `buffer_chunk`.
411 buffered_chunk_idx
: usize,
412 buffered_chunk_start
: u64,
416 impl <S
: ReadChunk
> BufferedFixedReader
<S
> {
/// Creates a buffered reader over `index` that fetches chunk data from
/// `store`.
///
/// The archive size is taken from `index.size`. The read buffer starts empty
/// with a capacity of 1 MiB, and the buffered-chunk bookkeeping starts at 0.
///
/// NOTE(review): the remaining field initializers are not visible in this
/// excerpt.
418 pub fn new(index
: FixedIndexReader
, store
: S
) -> Self {
420 let archive_size
= index
.size
;
424 archive_size
: archive_size
,
425 read_buffer
: Vec
::with_capacity(1024*1024),
426 buffered_chunk_idx
: 0,
427 buffered_chunk_start
: 0,
432 pub fn archive_size(&self) -> u64 { self.archive_size }
/// Loads the chunk at index `idx` into `read_buffer`.
///
/// Looks up the chunk's range and digest in the index, reads the chunk via
/// `store.read_chunk`, checks that the data length equals `end - start`, and
/// then replaces the buffer contents and updates the buffered-chunk
/// bookkeeping.
///
/// # Errors
/// Fails when `idx` is out of range, when the chunk cannot be read, or when
/// the chunk has an unexpected size.
434 fn buffer_chunk(&mut self, idx
: usize) -> Result
<(), Error
> {
436 let index
= &self.index
;
437 let (start
, end
, digest
) = index
.chunk_info(idx
)?
;
441 let data
= self.store
.read_chunk(&digest
)?
;
// NOTE(review): the message below has an unbalanced "(" — cosmetic only.
443 if (end
- start
) != data
.len() as u64 {
444 bail
!("read chunk with wrong size ({} != {}", (end
- start
), data
.len());
// Reuse the existing allocation instead of creating a new Vec per chunk.
447 self.read_buffer
.clear();
448 self.read_buffer
.extend_from_slice(&data
);
450 self.buffered_chunk_idx
= idx
;
452 self.buffered_chunk_start
= start
as u64;
453 //println!("BUFFER {} {}", self.buffered_chunk_start, end);
458 impl <S
: ReadChunk
> crate::tools
::BufferedRead
for BufferedFixedReader
<S
> {
/// Returns the buffered data starting at archive offset `offset`.
///
/// An `offset` equal to the archive size yields an empty slice. Otherwise
/// the chunk containing `offset` is loaded if it is not the one currently
/// buffered, and the slice from `offset` to the end of that chunk is
/// returned.
///
/// NOTE(review): the heads of the two conditions below (original lines 468
/// and 484) are not fully visible in this excerpt.
460 fn buffered_read(&mut self, offset
: u64) -> Result
<&[u8], Error
> {
462 if offset
== self.archive_size { return Ok(&self.read_buffer[0..0]); }
464 let buffer_len
= self.read_buffer
.len();
465 let index
= &self.index
;
467 // optimization for sequential read
// Visible conditions: a next chunk exists and `offset` lies at or beyond
// the end of the buffered chunk.
469 ((self.buffered_chunk_idx
+ 1) < index
.index_length
) &&
470 (offset
>= (self.buffered_chunk_start
+ (self.read_buffer
.len() as u64)))
472 let next_idx
= self.buffered_chunk_idx
+ 1;
473 let next_end
= index
.chunk_end(next_idx
);
// Only take the shortcut when `offset` falls inside the next chunk.
474 if offset
< next_end
{
475 self.buffer_chunk(next_idx
)?
;
476 let buffer_offset
= (offset
- self.buffered_chunk_start
) as usize;
477 return Ok(&self.read_buffer
[buffer_offset
..]);
// General case: nothing buffered yet, or `offset` lies outside the
// buffered chunk — locate the chunk by dividing by the chunk size.
481 if (buffer_len
== 0) ||
482 (offset
< self.buffered_chunk_start
) ||
483 (offset
>= (self.buffered_chunk_start
+ (self.read_buffer
.len() as u64)))
485 let idx
= (offset
/ index
.chunk_size
as u64) as usize;
486 self.buffer_chunk(idx
)?
;
489 let buffer_offset
= (offset
- self.buffered_chunk_start
) as usize;
490 Ok(&self.read_buffer
[buffer_offset
..])
494 impl <S
: ReadChunk
> std
::io
::Read
for BufferedFixedReader
<S
> {
/// Copies up to `buf.len()` bytes from the current read offset into `buf`
/// and advances the read offset by the number of bytes copied.
///
/// Errors from `buffered_read` are converted to `std::io::Error` with
/// `ErrorKind::Other`, carrying the original message as text.
///
/// NOTE(review): the success arm of the match and the final return value
/// are not visible in this excerpt.
496 fn read(&mut self, buf
: &mut [u8]) -> Result
<usize, std
::io
::Error
> {
498 use std
::io
::{Error, ErrorKind}
;
499 use crate::tools
::BufferedRead
;
501 let data
= match self.buffered_read(self.read_offset
) {
503 Err(err
) => return Err(Error
::new(ErrorKind
::Other
, err
.to_string())),
// n = min(data.len(), buf.len())
506 let n
= if data
.len() > buf
.len() { buf.len() }
else { data.len() }
;
// NOTE(review): `buf[..n].copy_from_slice(&data[..n])` would perform the
// same copy without `unsafe`.
508 unsafe { std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n); }
510 self.read_offset
+= n
as u64;
516 impl <S
: ReadChunk
> Seek
for BufferedFixedReader
<S
> {
518 fn seek(&mut self, pos
: SeekFrom
) -> Result
<u64, std
::io
::Error
> {
520 let new_offset
= match pos
{
521 SeekFrom
::Start(start_offset
) => start_offset
as i64,
522 SeekFrom
::End(end_offset
) => (self.archive_size
as i64)+ end_offset
,
523 SeekFrom
::Current(offset
) => (self.read_offset
as i64) + offset
,
526 use std
::io
::{Error, ErrorKind}
;
527 if (new_offset
< 0) || (new_offset
> (self.archive_size
as i64)) {
528 return Err(Error
::new(
530 format
!("seek is out of range {} ([0..{}])", new_offset
, self.archive_size
)));
532 self.read_offset
= new_offset
as u64;