1 use std
::convert
::TryInto
;
3 use std
::io
::{BufWriter, Seek, SeekFrom, Write}
;
4 use std
::os
::unix
::io
::AsRawFd
;
5 use std
::path
::{Path, PathBuf}
;
10 use proxmox
::tools
::io
::ReadExt
;
11 use proxmox
::tools
::uuid
::Uuid
;
12 use proxmox
::tools
::vec
;
14 use super::chunk_stat
::ChunkStat
;
15 use super::chunk_store
::ChunkStore
;
16 use super::read_chunk
::ReadChunk
;
19 use super::{DataBlob, DataChunkBuilder}
;
22 /// Header format definition for dynamic index files (`.didx`)
24 pub struct DynamicIndexHeader
{
28 /// Sha256 over the index ``SHA256(offset1||digest1||offset2||digest2||...)``
29 pub index_csum
: [u8; 32],
30 reserved
: [u8; 4032], // overall size is one page (4096 bytes)
32 proxmox
::tools
::static_assert_size
!(DynamicIndexHeader
, 4096);
33 // TODO: Once non-Copy unions are stabilized, use:
34 // union DynamicIndexHeader {
35 // reserved: [u8; 4096],
36 // pub data: DynamicIndexHeaderData,
39 pub struct DynamicIndexReader
{
46 pub index_csum
: [u8; 32],
49 // `index` is mmap()ed which cannot be thread-local so should be sendable
50 // FIXME: Introduce an mmap wrapper type for this?
// NOTE(review): these marker impls are written by hand because the reader keeps a
// raw pointer (`index`), which is neither `Send` nor `Sync` automatically.
// The mapping appears to be created with PROT_READ / MAP_PRIVATE in `new`, and the
// accessors only read through `index` - confirm nothing else writes through it
// before relying on `Sync`.
51 unsafe impl Send
for DynamicIndexReader {}
52 unsafe impl Sync
for DynamicIndexReader {}
54 impl Drop
for DynamicIndexReader
{
56 if let Err(err
) = self.unmap() {
57 eprintln
!("Unable to unmap dynamic index - {}", err
);
62 impl DynamicIndexReader
{
63 pub fn open(path
: &Path
) -> Result
<Self, Error
> {
66 .and_then(|file
| Self::new(file
))
67 .map_err(|err
| format_err
!("Unable to open dynamic index {:?} - {}", path
, err
))
70 pub fn new(mut file
: std
::fs
::File
) -> Result
<Self, Error
> {
72 nix
::fcntl
::flock(file
.as_raw_fd(), nix
::fcntl
::FlockArg
::LockSharedNonblock
)
74 bail
!("unable to get shared lock - {}", err
);
77 file
.seek(SeekFrom
::Start(0))?
;
79 let header_size
= std
::mem
::size_of
::<DynamicIndexHeader
>();
81 let header
: Box
<DynamicIndexHeader
> = unsafe { file.read_host_value_boxed()? }
;
83 if header
.magic
!= super::DYNAMIC_SIZED_CHUNK_INDEX_1_0
{
84 bail
!("got unknown magic number");
87 let ctime
= u64::from_le(header
.ctime
);
89 let rawfd
= file
.as_raw_fd();
91 let stat
= nix
::sys
::stat
::fstat(rawfd
)?
;
93 let size
= stat
.st_size
as usize;
95 let index_size
= size
- header_size
;
96 if (index_size
% 40) != 0 {
97 bail
!("got unexpected file size");
101 nix
::sys
::mman
::mmap(
102 std
::ptr
::null_mut(),
104 nix
::sys
::mman
::ProtFlags
::PROT_READ
,
105 nix
::sys
::mman
::MapFlags
::MAP_PRIVATE
,
115 index_entries
: index_size
/ 40,
118 index_csum
: header
.index_csum
,
122 fn unmap(&mut self) -> Result
<(), Error
> {
123 if self.index
== std
::ptr
::null_mut() {
127 if let Err(err
) = unsafe {
128 nix
::sys
::mman
::munmap(self.index
as *mut std
::ffi
::c_void
, self.index_entries
* 40)
130 bail
!("unmap dynamic index failed - {}", err
);
133 self.index
= std
::ptr
::null_mut();
138 #[allow(clippy::cast_ptr_alignment)]
139 pub fn chunk_info(&self, pos
: usize) -> Result
<(u64, u64, [u8; 32]), Error
> {
140 if pos
>= self.index_entries
{
141 bail
!("chunk index out of range");
143 let start
= if pos
== 0 {
146 unsafe { *(self.index.add((pos - 1) * 40) as *const u64) }
149 let end
= unsafe { *(self.index.add(pos * 40) as *const u64) }
;
151 let mut digest
= std
::mem
::MaybeUninit
::<[u8; 32]>::uninit();
153 std
::ptr
::copy_nonoverlapping(
154 self.index
.add(pos
* 40 + 8),
155 (*digest
.as_mut_ptr()).as_mut_ptr(),
160 Ok((start
, end
, unsafe { digest.assume_init() }
))
164 #[allow(clippy::cast_ptr_alignment)]
165 fn chunk_end(&self, pos
: usize) -> u64 {
166 if pos
>= self.index_entries
{
167 panic
!("chunk index out of range");
169 unsafe { *(self.index.add(pos * 40) as *const u64) }
173 fn chunk_digest(&self, pos
: usize) -> &[u8; 32] {
174 if pos
>= self.index_entries
{
175 panic
!("chunk index out of range");
177 let slice
= unsafe { std::slice::from_raw_parts(self.index.add(pos * 40 + 8), 32) }
;
178 slice
.try_into().unwrap()
181 /// Compute checksum and data size
182 pub fn compute_csum(&self) -> ([u8; 32], u64) {
183 let mut csum
= openssl
::sha
::Sha256
::new();
184 let mut chunk_end
= 0;
185 for pos
in 0..self.index_entries
{
186 chunk_end
= self.chunk_end(pos
);
187 let digest
= self.chunk_digest(pos
);
188 csum
.update(&chunk_end
.to_le_bytes());
191 let csum
= csum
.finish();
197 pub fn dump_pxar(&self, mut writer: Box<dyn Write>) -> Result<(), Error> {
199 for pos in 0..self.index_entries {
200 let _end = self.chunk_end(pos);
201 let digest = self.chunk_digest(pos);
202 //println!("Dump {:08x}", end );
203 let chunk = self.store.read_chunk(digest)?;
204 // fixme: handle encrypted chunks
205 let data = chunk.decode(None)?;
206 writer.write_all(&data)?;
220 ) -> Result
<usize, Error
> {
221 if (offset
>= end
) || (offset
< start
) {
222 bail
!("offset out of range");
225 if end_idx
== start_idx
{
226 return Ok(start_idx
); // found
228 let middle_idx
= (start_idx
+ end_idx
) / 2;
229 let middle_end
= self.chunk_end(middle_idx
);
231 if offset
< middle_end
{
232 self.binary_search(start_idx
, start
, middle_idx
, middle_end
, offset
)
234 self.binary_search(middle_idx
+ 1, middle_end
, end_idx
, end
, offset
)
239 impl IndexFile
for DynamicIndexReader
{
240 fn index_count(&self) -> usize {
244 fn index_digest(&self, pos
: usize) -> Option
<&[u8; 32]> {
245 if pos
>= self.index_entries
{
248 Some(unsafe { std::mem::transmute(self.chunk_digest(pos).as_ptr()) }
)
252 fn index_bytes(&self) -> u64 {
253 if self.index_entries
== 0 {
256 self.chunk_end((self.index_entries
- 1) as usize)
261 pub struct BufferedDynamicReader
<S
> {
263 index
: DynamicIndexReader
,
265 read_buffer
: Vec
<u8>,
266 buffered_chunk_idx
: usize,
267 buffered_chunk_start
: u64,
271 impl<S
: ReadChunk
> BufferedDynamicReader
<S
> {
272 pub fn new(index
: DynamicIndexReader
, store
: S
) -> Self {
273 let archive_size
= index
.chunk_end(index
.index_entries
- 1);
278 read_buffer
: Vec
::with_capacity(1024 * 1024),
279 buffered_chunk_idx
: 0,
280 buffered_chunk_start
: 0,
285 pub fn archive_size(&self) -> u64 {
289 fn buffer_chunk(&mut self, idx
: usize) -> Result
<(), Error
> {
290 let index
= &self.index
;
291 let (start
, end
, digest
) = index
.chunk_info(idx
)?
;
295 let data
= self.store
.read_chunk(&digest
)?
;
297 if (end
- start
) != data
.len() as u64 {
299 "read chunk with wrong size ({} != {}",
305 self.read_buffer
.clear();
306 self.read_buffer
.extend_from_slice(&data
);
308 self.buffered_chunk_idx
= idx
;
310 self.buffered_chunk_start
= start
as u64;
311 //println!("BUFFER {} {}", self.buffered_chunk_start, end);
316 impl<S
: ReadChunk
> crate::tools
::BufferedRead
for BufferedDynamicReader
<S
> {
317 fn buffered_read(&mut self, offset
: u64) -> Result
<&[u8], Error
> {
318 if offset
== self.archive_size
{
319 return Ok(&self.read_buffer
[0..0]);
322 let buffer_len
= self.read_buffer
.len();
323 let index
= &self.index
;
325 // optimization for sequential read
327 && ((self.buffered_chunk_idx
+ 1) < index
.index_entries
)
328 && (offset
>= (self.buffered_chunk_start
+ (self.read_buffer
.len() as u64)))
330 let next_idx
= self.buffered_chunk_idx
+ 1;
331 let next_end
= index
.chunk_end(next_idx
);
332 if offset
< next_end
{
333 self.buffer_chunk(next_idx
)?
;
334 let buffer_offset
= (offset
- self.buffered_chunk_start
) as usize;
335 return Ok(&self.read_buffer
[buffer_offset
..]);
340 || (offset
< self.buffered_chunk_start
)
341 || (offset
>= (self.buffered_chunk_start
+ (self.read_buffer
.len() as u64)))
343 let end_idx
= index
.index_entries
- 1;
344 let end
= index
.chunk_end(end_idx
);
345 let idx
= index
.binary_search(0, 0, end_idx
, end
, offset
)?
;
346 self.buffer_chunk(idx
)?
;
349 let buffer_offset
= (offset
- self.buffered_chunk_start
) as usize;
350 Ok(&self.read_buffer
[buffer_offset
..])
354 impl<S
: ReadChunk
> std
::io
::Read
for BufferedDynamicReader
<S
> {
355 fn read(&mut self, buf
: &mut [u8]) -> Result
<usize, std
::io
::Error
> {
356 use crate::tools
::BufferedRead
;
357 use std
::io
::{Error, ErrorKind}
;
359 let data
= match self.buffered_read(self.read_offset
) {
361 Err(err
) => return Err(Error
::new(ErrorKind
::Other
, err
.to_string())),
364 let n
= if data
.len() > buf
.len() {
371 std
::ptr
::copy_nonoverlapping(data
.as_ptr(), buf
.as_mut_ptr(), n
);
374 self.read_offset
+= n
as u64;
380 impl<S
: ReadChunk
> std
::io
::Seek
for BufferedDynamicReader
<S
> {
381 fn seek(&mut self, pos
: SeekFrom
) -> Result
<u64, std
::io
::Error
> {
382 let new_offset
= match pos
{
383 SeekFrom
::Start(start_offset
) => start_offset
as i64,
384 SeekFrom
::End(end_offset
) => (self.archive_size
as i64) + end_offset
,
385 SeekFrom
::Current(offset
) => (self.read_offset
as i64) + offset
,
388 use std
::io
::{Error, ErrorKind}
;
389 if (new_offset
< 0) || (new_offset
> (self.archive_size
as i64)) {
390 return Err(Error
::new(
393 "seek is out of range {} ([0..{}])",
394 new_offset
, self.archive_size
398 self.read_offset
= new_offset
as u64;
404 /// Create dynamic index files (`.didx`)
405 pub struct DynamicIndexWriter
{
406 store
: Arc
<ChunkStore
>,
407 _lock
: tools
::ProcessLockSharedGuard
,
408 writer
: BufWriter
<File
>,
411 tmp_filename
: PathBuf
,
412 csum
: Option
<openssl
::sha
::Sha256
>,
417 impl Drop
for DynamicIndexWriter
{
419 let _
= std
::fs
::remove_file(&self.tmp_filename
); // ignore errors
423 impl DynamicIndexWriter
{
424 pub fn create(store
: Arc
<ChunkStore
>, path
: &Path
) -> Result
<Self, Error
> {
425 let shared_lock
= store
.try_shared_lock()?
;
427 let full_path
= store
.relative_path(path
);
428 let mut tmp_path
= full_path
.clone();
429 tmp_path
.set_extension("tmp_didx");
431 let file
= std
::fs
::OpenOptions
::new()
438 let mut writer
= BufWriter
::with_capacity(1024 * 1024, file
);
440 let header_size
= std
::mem
::size_of
::<DynamicIndexHeader
>();
442 // todo: use static assertion when available in rust
443 if header_size
!= 4096 {
444 panic
!("got unexpected header size");
447 let ctime
= std
::time
::SystemTime
::now()
448 .duration_since(std
::time
::SystemTime
::UNIX_EPOCH
)?
451 let uuid
= Uuid
::generate();
453 let mut buffer
= vec
::zeroed(header_size
);
454 let header
= crate::tools
::map_struct_mut
::<DynamicIndexHeader
>(&mut buffer
)?
;
456 header
.magic
= super::DYNAMIC_SIZED_CHUNK_INDEX_1_0
;
457 header
.ctime
= u64::to_le(ctime
);
458 header
.uuid
= *uuid
.as_bytes();
460 header
.index_csum
= [0u8; 32];
462 writer
.write_all(&buffer
)?
;
464 let csum
= Some(openssl
::sha
::Sha256
::new());
472 tmp_filename
: tmp_path
,
474 uuid
: *uuid
.as_bytes(),
479 // fixme: use add_chunk instead?
480 pub fn insert_chunk(&self, chunk
: &DataBlob
, digest
: &[u8; 32]) -> Result
<(bool
, u64), Error
> {
481 self.store
.insert_chunk(chunk
, digest
)
484 pub fn close(&mut self) -> Result
<[u8; 32], Error
> {
487 "cannot close already closed archive index file {:?}",
494 self.writer
.flush()?
;
496 let csum_offset
= proxmox
::tools
::offsetof!(DynamicIndexHeader
, index_csum
);
497 self.writer
.seek(SeekFrom
::Start(csum_offset
as u64))?
;
499 let csum
= self.csum
.take().unwrap();
500 let index_csum
= csum
.finish();
502 self.writer
.write_all(&index_csum
)?
;
503 self.writer
.flush()?
;
505 if let Err(err
) = std
::fs
::rename(&self.tmp_filename
, &self.filename
) {
506 bail
!("Atomic rename file {:?} failed - {}", self.filename
, err
);
512 // fixme: rename to add_digest
513 pub fn add_chunk(&mut self, offset
: u64, digest
: &[u8; 32]) -> Result
<(), Error
> {
516 "cannot write to closed dynamic index file {:?}",
521 let offset_le
: &[u8; 8] = unsafe { &std::mem::transmute::<u64, [u8; 8]>(offset.to_le()) }
;
523 if let Some(ref mut csum
) = self.csum
{
524 csum
.update(offset_le
);
528 self.writer
.write_all(offset_le
)?
;
529 self.writer
.write_all(digest
)?
;
534 /// Writer which splits a binary stream into dynamic sized chunks
536 /// And store the resulting chunk list into the index file.
537 pub struct DynamicChunkWriter
{
538 index
: DynamicIndexWriter
,
544 chunk_buffer
: Vec
<u8>,
547 impl DynamicChunkWriter
{
548 pub fn new(index
: DynamicIndexWriter
, chunk_size
: usize) -> Self {
552 chunker
: Chunker
::new(chunk_size
),
553 stat
: ChunkStat
::new(0),
556 chunk_buffer
: Vec
::with_capacity(chunk_size
* 4),
560 pub fn stat(&self) -> &ChunkStat
{
564 pub fn close(&mut self) -> Result
<(), Error
> {
571 self.write_chunk_buffer()?
;
575 self.stat
.size
= self.chunk_offset
as u64;
577 // add size of index file
579 (self.stat
.chunk_count
* 40 + std
::mem
::size_of
::<DynamicIndexHeader
>()) as u64;
584 fn write_chunk_buffer(&mut self) -> Result
<(), Error
> {
585 let chunk_size
= self.chunk_buffer
.len();
591 let expected_chunk_size
= self.chunk_offset
- self.last_chunk
;
592 if expected_chunk_size
!= self.chunk_buffer
.len() {
593 bail
!("wrong chunk size {} != {}", expected_chunk_size
, chunk_size
);
596 self.stat
.chunk_count
+= 1;
598 self.last_chunk
= self.chunk_offset
;
600 let (chunk
, digest
) = DataChunkBuilder
::new(&self.chunk_buffer
)
604 match self.index
.insert_chunk(&chunk
, &digest
) {
605 Ok((is_duplicate
, compressed_size
)) => {
606 self.stat
.compressed_size
+= compressed_size
;
608 self.stat
.duplicate_chunks
+= 1;
610 self.stat
.disk_size
+= compressed_size
;
614 "ADD CHUNK {:016x} {} {}% {} {}",
617 (compressed_size
* 100) / (chunk_size
as u64),
619 proxmox
::tools
::digest_to_hex(&digest
)
621 self.index
.add_chunk(self.chunk_offset
as u64, &digest
)?
;
622 self.chunk_buffer
.truncate(0);
626 self.chunk_buffer
.truncate(0);
633 impl Write
for DynamicChunkWriter
{
634 fn write(&mut self, data
: &[u8]) -> std
::result
::Result
<usize, std
::io
::Error
> {
635 let chunker
= &mut self.chunker
;
637 let pos
= chunker
.scan(data
);
640 self.chunk_buffer
.extend_from_slice(&data
[0..pos
]);
641 self.chunk_offset
+= pos
;
643 if let Err(err
) = self.write_chunk_buffer() {
644 return Err(std
::io
::Error
::new(
645 std
::io
::ErrorKind
::Other
,
651 self.chunk_offset
+= data
.len();
652 self.chunk_buffer
.extend_from_slice(data
);
657 fn flush(&mut self) -> std
::result
::Result
<(), std
::io
::Error
> {
658 Err(std
::io
::Error
::new(
659 std
::io
::ErrorKind
::Other
,
660 "please use close() instead of flush()",