2 use std
::io
::{self, BufWriter, Seek, SeekFrom, Write}
;
4 use std
::os
::unix
::io
::AsRawFd
;
5 use std
::path
::{Path, PathBuf}
;
6 use std
::sync
::{Arc, Mutex}
;
7 use std
::task
::Context
;
10 use anyhow
::{bail, format_err, Error}
;
12 use proxmox
::tools
::io
::ReadExt
;
13 use proxmox
::tools
::uuid
::Uuid
;
14 use proxmox
::tools
::mmap
::Mmap
;
15 use pxar
::accessor
::{MaybeReady, ReadAt, ReadAtOperation}
;
17 use super::chunk_stat
::ChunkStat
;
18 use super::chunk_store
::ChunkStore
;
19 use super::index
::ChunkReadInfo
;
20 use super::read_chunk
::ReadChunk
;
23 use super::{DataBlob, DataChunkBuilder}
;
26 /// Header format definition for dynamic index files (`.dixd`)
28 pub struct DynamicIndexHeader
{
32 /// Sha256 over the index ``SHA256(offset1||digest1||offset2||digest2||...)``
33 pub index_csum
: [u8; 32],
34 reserved
: [u8; 4032], // overall size is one page (4096 bytes)
36 proxmox
::static_assert_size
!(DynamicIndexHeader
, 4096);
37 // TODO: Once non-Copy unions are stabilized, use:
38 // union DynamicIndexHeader {
39 // reserved: [u8; 4096],
40 // pub data: DynamicIndexHeaderData,
43 impl DynamicIndexHeader
{
44 /// Convenience method to allocate a zero-initialized header struct.
45 pub fn zeroed() -> Box
<Self> {
47 Box
::from_raw(std
::alloc
::alloc_zeroed(std
::alloc
::Layout
::new
::<Self>()) as *mut Self)
51 pub fn as_bytes(&self) -> &[u8] {
53 std
::slice
::from_raw_parts(
54 self as *const Self as *const u8,
55 std
::mem
::size_of
::<Self>(),
/// One 40-byte index entry: little-endian end offset plus chunk digest.
/// (The reader divides the file tail by 40, which fixes this layout.)
#[derive(Clone, Debug)]
#[repr(C)]
pub struct DynamicEntry {
    // end offset of this chunk, stored little-endian on disk
    end_le: u64,
    // SHA-256 digest of the chunk data
    pub digest: [u8; 32],
}

impl DynamicEntry {
    /// Decode the on-disk little-endian end offset into host byte order.
    #[inline]
    pub fn end(&self) -> u64 {
        u64::from_le(self.end_le)
    }
}
75 pub struct DynamicIndexReader
{
78 index
: Mmap
<DynamicEntry
>,
81 pub index_csum
: [u8; 32],
84 impl DynamicIndexReader
{
85 pub fn open(path
: &Path
) -> Result
<Self, Error
> {
89 .map_err(|err
| format_err
!("Unable to open dynamic index {:?} - {}", path
, err
))
92 pub fn new(mut file
: std
::fs
::File
) -> Result
<Self, Error
> {
94 nix
::fcntl
::flock(file
.as_raw_fd(), nix
::fcntl
::FlockArg
::LockSharedNonblock
)
96 bail
!("unable to get shared lock - {}", err
);
99 // FIXME: This is NOT OUR job! Check the callers of this method and remove this!
100 file
.seek(SeekFrom
::Start(0))?
;
102 let header_size
= std
::mem
::size_of
::<DynamicIndexHeader
>();
104 let header
: Box
<DynamicIndexHeader
> = unsafe { file.read_host_value_boxed()? }
;
106 if header
.magic
!= super::DYNAMIC_SIZED_CHUNK_INDEX_1_0
{
107 bail
!("got unknown magic number");
110 let ctime
= proxmox
::tools
::time
::epoch_i64();
112 let rawfd
= file
.as_raw_fd();
114 let stat
= nix
::sys
::stat
::fstat(rawfd
)?
;
116 let size
= stat
.st_size
as usize;
118 let index_size
= size
- header_size
;
119 let index_count
= index_size
/ 40;
120 if index_count
* 40 != index_size
{
121 bail
!("got unexpected file size");
129 nix
::sys
::mman
::ProtFlags
::PROT_READ
,
130 nix
::sys
::mman
::MapFlags
::MAP_PRIVATE
,
140 index_csum
: header
.index_csum
,
145 #[allow(clippy::cast_ptr_alignment)]
146 fn chunk_end(&self, pos
: usize) -> u64 {
147 if pos
>= self.index
.len() {
148 panic
!("chunk index out of range");
150 self.index
[pos
].end()
154 fn chunk_digest(&self, pos
: usize) -> &[u8; 32] {
155 if pos
>= self.index
.len() {
156 panic
!("chunk index out of range");
158 &self.index
[pos
].digest
161 // TODO: can we use std::slice::binary_search with Mmap now?
169 ) -> Result
<usize, Error
> {
170 if (offset
>= end
) || (offset
< start
) {
171 bail
!("offset out of range");
174 if end_idx
== start_idx
{
175 return Ok(start_idx
); // found
177 let middle_idx
= (start_idx
+ end_idx
) / 2;
178 let middle_end
= self.chunk_end(middle_idx
);
180 if offset
< middle_end
{
181 self.binary_search(start_idx
, start
, middle_idx
, middle_end
, offset
)
183 self.binary_search(middle_idx
+ 1, middle_end
, end_idx
, end
, offset
)
188 impl IndexFile
for DynamicIndexReader
{
189 fn index_count(&self) -> usize {
193 fn index_digest(&self, pos
: usize) -> Option
<&[u8; 32]> {
194 if pos
>= self.index
.len() {
197 Some(unsafe { std::mem::transmute(self.chunk_digest(pos).as_ptr()) }
)
201 fn index_bytes(&self) -> u64 {
202 if self.index
.is_empty() {
205 self.chunk_end(self.index
.len() - 1)
209 fn compute_csum(&self) -> ([u8; 32], u64) {
210 let mut csum
= openssl
::sha
::Sha256
::new();
211 let mut chunk_end
= 0;
212 for pos
in 0..self.index_count() {
213 let info
= self.chunk_info(pos
).unwrap();
214 chunk_end
= info
.range
.end
;
215 csum
.update(&chunk_end
.to_le_bytes());
216 csum
.update(&info
.digest
);
218 let csum
= csum
.finish();
222 #[allow(clippy::cast_ptr_alignment)]
223 fn chunk_info(&self, pos
: usize) -> Option
<ChunkReadInfo
> {
224 if pos
>= self.index
.len() {
227 let start
= if pos
== 0 { 0 }
else { self.index[pos - 1].end() }
;
229 let end
= self.index
[pos
].end();
233 digest
: self.index
[pos
].digest
.clone(),
237 fn chunk_from_offset(&self, offset
: u64) -> Option
<(usize, u64)> {
238 let end_idx
= self.index
.len() - 1;
239 let end
= self.chunk_end(end_idx
);
240 let found_idx
= self.binary_search(0, 0, end_idx
, end
, offset
);
241 let found_idx
= match found_idx
{
243 Err(_
) => return None
246 let found_start
= if found_idx
== 0 {
249 self.chunk_end(found_idx
- 1)
252 Some((found_idx
, offset
- found_start
))
262 /// Perform sanity checks on the range and data size:
263 pub fn new(range
: Range
<u64>, data
: Vec
<u8>) -> Result
<Self, Error
> {
264 if data
.len() as u64 != range
.end
- range
.start
{
266 "read chunk with wrong size ({} != {})",
268 range
.end
- range
.start
,
271 Ok(Self { range, data }
)
275 pub struct BufferedDynamicReader
<S
> {
277 index
: DynamicIndexReader
,
279 read_buffer
: Vec
<u8>,
280 buffered_chunk_idx
: usize,
281 buffered_chunk_start
: u64,
283 lru_cache
: crate::tools
::lru_cache
::LruCache
<usize, CachedChunk
>,
286 struct ChunkCacher
<'a
, S
> {
288 index
: &'a DynamicIndexReader
,
291 impl<'a
, S
: ReadChunk
> crate::tools
::lru_cache
::Cacher
<usize, CachedChunk
> for ChunkCacher
<'a
, S
> {
292 fn fetch(&mut self, index
: usize) -> Result
<Option
<CachedChunk
>, Error
> {
293 let info
= match self.index
.chunk_info(index
) {
295 None
=> bail
!("chunk index out of range"),
297 let range
= info
.range
;
298 let data
= self.store
.read_chunk(&info
.digest
)?
;
299 CachedChunk
::new(range
, data
).map(Some
)
303 impl<S
: ReadChunk
> BufferedDynamicReader
<S
> {
304 pub fn new(index
: DynamicIndexReader
, store
: S
) -> Self {
305 let archive_size
= index
.index_bytes();
310 read_buffer
: Vec
::with_capacity(1024 * 1024),
311 buffered_chunk_idx
: 0,
312 buffered_chunk_start
: 0,
314 lru_cache
: crate::tools
::lru_cache
::LruCache
::new(32),
318 pub fn archive_size(&self) -> u64 {
322 fn buffer_chunk(&mut self, idx
: usize) -> Result
<(), Error
> {
323 //let (start, end, data) = self.lru_cache.access(
324 let cached_chunk
= self.lru_cache
.access(
327 store
: &mut self.store
,
330 )?
.ok_or_else(|| format_err
!("chunk not found by cacher"))?
;
333 self.read_buffer
.clear();
334 self.read_buffer
.extend_from_slice(&cached_chunk
.data
);
336 self.buffered_chunk_idx
= idx
;
338 self.buffered_chunk_start
= cached_chunk
.range
.start
;
339 //println!("BUFFER {} {}", self.buffered_chunk_start, end);
344 impl<S
: ReadChunk
> crate::tools
::BufferedRead
for BufferedDynamicReader
<S
> {
345 fn buffered_read(&mut self, offset
: u64) -> Result
<&[u8], Error
> {
346 if offset
== self.archive_size
{
347 return Ok(&self.read_buffer
[0..0]);
350 let buffer_len
= self.read_buffer
.len();
351 let index
= &self.index
;
353 // optimization for sequential read
355 && ((self.buffered_chunk_idx
+ 1) < index
.index
.len())
356 && (offset
>= (self.buffered_chunk_start
+ (self.read_buffer
.len() as u64)))
358 let next_idx
= self.buffered_chunk_idx
+ 1;
359 let next_end
= index
.chunk_end(next_idx
);
360 if offset
< next_end
{
361 self.buffer_chunk(next_idx
)?
;
362 let buffer_offset
= (offset
- self.buffered_chunk_start
) as usize;
363 return Ok(&self.read_buffer
[buffer_offset
..]);
368 || (offset
< self.buffered_chunk_start
)
369 || (offset
>= (self.buffered_chunk_start
+ (self.read_buffer
.len() as u64)))
371 let end_idx
= index
.index
.len() - 1;
372 let end
= index
.chunk_end(end_idx
);
373 let idx
= index
.binary_search(0, 0, end_idx
, end
, offset
)?
;
374 self.buffer_chunk(idx
)?
;
377 let buffer_offset
= (offset
- self.buffered_chunk_start
) as usize;
378 Ok(&self.read_buffer
[buffer_offset
..])
382 impl<S
: ReadChunk
> std
::io
::Read
for BufferedDynamicReader
<S
> {
383 fn read(&mut self, buf
: &mut [u8]) -> Result
<usize, std
::io
::Error
> {
384 use crate::tools
::BufferedRead
;
385 use std
::io
::{Error, ErrorKind}
;
387 let data
= match self.buffered_read(self.read_offset
) {
389 Err(err
) => return Err(Error
::new(ErrorKind
::Other
, err
.to_string())),
392 let n
= if data
.len() > buf
.len() {
398 buf
[0..n
].copy_from_slice(&data
[0..n
]);
400 self.read_offset
+= n
as u64;
406 impl<S
: ReadChunk
> std
::io
::Seek
for BufferedDynamicReader
<S
> {
407 fn seek(&mut self, pos
: SeekFrom
) -> Result
<u64, std
::io
::Error
> {
408 let new_offset
= match pos
{
409 SeekFrom
::Start(start_offset
) => start_offset
as i64,
410 SeekFrom
::End(end_offset
) => (self.archive_size
as i64) + end_offset
,
411 SeekFrom
::Current(offset
) => (self.read_offset
as i64) + offset
,
414 use std
::io
::{Error, ErrorKind}
;
415 if (new_offset
< 0) || (new_offset
> (self.archive_size
as i64)) {
416 return Err(Error
::new(
419 "seek is out of range {} ([0..{}])",
420 new_offset
, self.archive_size
424 self.read_offset
= new_offset
as u64;
430 /// This is a workaround until we have cleaned up the chunk/reader/... infrastructure for better
433 /// Ideally BufferedDynamicReader gets replaced so the LruCache maps to `BroadcastFuture<Chunk>`,
434 /// so that we can properly access it from multiple threads simultaneously while not issuing
435 /// duplicate simultaneous reads over http.
437 pub struct LocalDynamicReadAt
<R
: ReadChunk
> {
438 inner
: Arc
<Mutex
<BufferedDynamicReader
<R
>>>,
441 impl<R
: ReadChunk
> LocalDynamicReadAt
<R
> {
442 pub fn new(inner
: BufferedDynamicReader
<R
>) -> Self {
444 inner
: Arc
::new(Mutex
::new(inner
)),
449 impl<R
: ReadChunk
> ReadAt
for LocalDynamicReadAt
<R
> {
450 fn start_read_at
<'a
>(
455 ) -> MaybeReady
<io
::Result
<usize>, ReadAtOperation
<'a
>> {
457 MaybeReady
::Ready(tokio
::task
::block_in_place(move || {
458 let mut reader
= self.inner
.lock().unwrap();
459 reader
.seek(SeekFrom
::Start(offset
))?
;
460 Ok(reader
.read(buf
)?
)
464 fn poll_complete
<'a
>(
466 _op
: ReadAtOperation
<'a
>,
467 ) -> MaybeReady
<io
::Result
<usize>, ReadAtOperation
<'a
>> {
468 panic
!("LocalDynamicReadAt::start_read_at returned Pending");
473 /// Create dynamic index files (`.dixd`)
474 pub struct DynamicIndexWriter
{
475 store
: Arc
<ChunkStore
>,
476 _lock
: tools
::ProcessLockSharedGuard
,
477 writer
: BufWriter
<File
>,
480 tmp_filename
: PathBuf
,
481 csum
: Option
<openssl
::sha
::Sha256
>,
486 impl Drop
for DynamicIndexWriter
{
488 let _
= std
::fs
::remove_file(&self.tmp_filename
); // ignore errors
492 impl DynamicIndexWriter
{
493 pub fn create(store
: Arc
<ChunkStore
>, path
: &Path
) -> Result
<Self, Error
> {
494 let shared_lock
= store
.try_shared_lock()?
;
496 let full_path
= store
.relative_path(path
);
497 let mut tmp_path
= full_path
.clone();
498 tmp_path
.set_extension("tmp_didx");
500 let file
= std
::fs
::OpenOptions
::new()
507 let mut writer
= BufWriter
::with_capacity(1024 * 1024, file
);
509 let ctime
= proxmox
::tools
::time
::epoch_i64();
511 let uuid
= Uuid
::generate();
513 let mut header
= DynamicIndexHeader
::zeroed();
514 header
.magic
= super::DYNAMIC_SIZED_CHUNK_INDEX_1_0
;
515 header
.ctime
= i64::to_le(ctime
);
516 header
.uuid
= *uuid
.as_bytes();
517 // header.index_csum = [0u8; 32];
518 writer
.write_all(header
.as_bytes())?
;
520 let csum
= Some(openssl
::sha
::Sha256
::new());
528 tmp_filename
: tmp_path
,
530 uuid
: *uuid
.as_bytes(),
535 // fixme: use add_chunk instead?
536 pub fn insert_chunk(&self, chunk
: &DataBlob
, digest
: &[u8; 32]) -> Result
<(bool
, u64), Error
> {
537 self.store
.insert_chunk(chunk
, digest
)
540 pub fn close(&mut self) -> Result
<[u8; 32], Error
> {
543 "cannot close already closed archive index file {:?}",
550 self.writer
.flush()?
;
552 let csum_offset
= proxmox
::offsetof!(DynamicIndexHeader
, index_csum
);
553 self.writer
.seek(SeekFrom
::Start(csum_offset
as u64))?
;
555 let csum
= self.csum
.take().unwrap();
556 let index_csum
= csum
.finish();
558 self.writer
.write_all(&index_csum
)?
;
559 self.writer
.flush()?
;
561 if let Err(err
) = std
::fs
::rename(&self.tmp_filename
, &self.filename
) {
562 bail
!("Atomic rename file {:?} failed - {}", self.filename
, err
);
568 // fixme: rename to add_digest
569 pub fn add_chunk(&mut self, offset
: u64, digest
: &[u8; 32]) -> Result
<(), Error
> {
572 "cannot write to closed dynamic index file {:?}",
577 let offset_le
: &[u8; 8] = unsafe { &std::mem::transmute::<u64, [u8; 8]>(offset.to_le()) }
;
579 if let Some(ref mut csum
) = self.csum
{
580 csum
.update(offset_le
);
584 self.writer
.write_all(offset_le
)?
;
585 self.writer
.write_all(digest
)?
;
590 /// Writer which splits a binary stream into dynamic sized chunks
592 /// And store the resulting chunk list into the index file.
593 pub struct DynamicChunkWriter
{
594 index
: DynamicIndexWriter
,
600 chunk_buffer
: Vec
<u8>,
603 impl DynamicChunkWriter
{
604 pub fn new(index
: DynamicIndexWriter
, chunk_size
: usize) -> Self {
608 chunker
: Chunker
::new(chunk_size
),
609 stat
: ChunkStat
::new(0),
612 chunk_buffer
: Vec
::with_capacity(chunk_size
* 4),
616 pub fn stat(&self) -> &ChunkStat
{
620 pub fn close(&mut self) -> Result
<(), Error
> {
627 self.write_chunk_buffer()?
;
631 self.stat
.size
= self.chunk_offset
as u64;
633 // add size of index file
635 (self.stat
.chunk_count
* 40 + std
::mem
::size_of
::<DynamicIndexHeader
>()) as u64;
640 fn write_chunk_buffer(&mut self) -> Result
<(), Error
> {
641 let chunk_size
= self.chunk_buffer
.len();
647 let expected_chunk_size
= self.chunk_offset
- self.last_chunk
;
648 if expected_chunk_size
!= self.chunk_buffer
.len() {
649 bail
!("wrong chunk size {} != {}", expected_chunk_size
, chunk_size
);
652 self.stat
.chunk_count
+= 1;
654 self.last_chunk
= self.chunk_offset
;
656 let (chunk
, digest
) = DataChunkBuilder
::new(&self.chunk_buffer
)
660 match self.index
.insert_chunk(&chunk
, &digest
) {
661 Ok((is_duplicate
, compressed_size
)) => {
662 self.stat
.compressed_size
+= compressed_size
;
664 self.stat
.duplicate_chunks
+= 1;
666 self.stat
.disk_size
+= compressed_size
;
670 "ADD CHUNK {:016x} {} {}% {} {}",
673 (compressed_size
* 100) / (chunk_size
as u64),
675 proxmox
::tools
::digest_to_hex(&digest
)
677 self.index
.add_chunk(self.chunk_offset
as u64, &digest
)?
;
678 self.chunk_buffer
.truncate(0);
682 self.chunk_buffer
.truncate(0);
689 impl Write
for DynamicChunkWriter
{
690 fn write(&mut self, data
: &[u8]) -> std
::result
::Result
<usize, std
::io
::Error
> {
691 let chunker
= &mut self.chunker
;
693 let pos
= chunker
.scan(data
);
696 self.chunk_buffer
.extend_from_slice(&data
[0..pos
]);
697 self.chunk_offset
+= pos
;
699 if let Err(err
) = self.write_chunk_buffer() {
700 return Err(std
::io
::Error
::new(
701 std
::io
::ErrorKind
::Other
,
707 self.chunk_offset
+= data
.len();
708 self.chunk_buffer
.extend_from_slice(data
);
713 fn flush(&mut self) -> std
::result
::Result
<(), std
::io
::Error
> {
714 Err(std
::io
::Error
::new(
715 std
::io
::ErrorKind
::Other
,
716 "please use close() instead of flush()",