2 use std
::convert
::TryInto
;
6 use super::chunk_stat
::*;
7 use super::chunk_store
::*;
8 use proxmox_protocol
::Chunker
;
11 use std
::io
::{Write, BufWriter}
;
13 use std
::path
::{Path, PathBuf}
;
14 use std
::os
::unix
::io
::AsRawFd
;
16 //use chrono::{Local, TimeZone};
18 use crate::tools
::io
::ops
::*;
19 use crate::tools
::vec
;
21 use super::{DataChunk, DataChunkBuilder}
;
23 /// Header format definition for dynamic index files (`.dixd`)
// NOTE(review): this extract is garbled — header fields referenced elsewhere
// in this file (`magic`, `ctime`, `uuid`) are missing between the visible
// lines; the stray leading integers are extraction artifacts.
25 pub struct DynamicIndexHeader
{
26 /// The string `PROXMOX-DIDX`
30 /// Sha256 over the index ``SHA256(offset1||digest1||offset2||digest2||...)``
31 pub index_csum
: [u8; 32],
// Padding so the whole header occupies exactly one 4096 byte page
// (checked at runtime in open()/create() below).
32 reserved
: [u8; 4030], // overall size is one page (4096 bytes)
// Read-only view of a dynamic index file; index entries are accessed via
// an mmap()ed region (see `open()` below).
36 pub struct DynamicIndexReader
{
// Backing chunk store; chunk data is resolved through it by digest.
37 store
: Arc
<ChunkStore
>,
// NOTE(review): fields used by the methods below (`index` raw pointer,
// `index_entries`, `filename`, among others) are missing from this extract.
45 pub index_csum
: [u8; 32],
48 // `index` is mmap()ed which cannot be thread-local so should be sendable
49 // FIXME: Introduce an mmap wrapper type for this?
// SAFETY: the raw `index` pointer refers to a PROT_READ mmap()ed region
// that is valid process-wide, so moving the reader across threads is sound.
50 unsafe impl Send
for DynamicIndexReader {}
52 impl Drop
for DynamicIndexReader
{
// NOTE(review): the `fn drop(&mut self)` header line is missing from this
// extract; the body below unmaps the index when the reader is dropped.
55 if let Err(err
) = self.unmap() {
// Drop must not fail, so an unmap error is only logged to stderr.
56 eprintln
!("Unable to unmap file {:?} - {}", self.filename
, err
);
61 impl DynamicIndexReader
{
/// Open a dynamic index file (resolved relative to `store`'s base
/// directory), validate its header and mmap() the index entries that
/// follow the 4096 byte header.
63 pub fn open(store
: Arc
<ChunkStore
>, path
: &Path
) -> Result
<Self, Error
> {
// Resolve `path` against the chunk store's base directory.
65 let full_path
= store
.relative_path(path
);
67 let mut file
= std
::fs
::File
::open(&full_path
)?
;
// Take a shared (read) advisory lock; fail immediately instead of blocking.
69 if let Err(err
) = nix
::fcntl
::flock(file
.as_raw_fd(), nix
::fcntl
::FlockArg
::LockSharedNonblock
) {
70 bail
!("unable to get shared lock on {:?} - {}", full_path
, err
);
73 let header_size
= std
::mem
::size_of
::<DynamicIndexHeader
>();
75 // todo: use static assertion when available in rust
// The on-disk layout requires the header to be exactly one page.
76 if header_size
!= 4096 { bail!("got unexpected header size for {:?}
", path); }
// Read the whole header into a freshly allocated buffer.
78 let buffer = file.read_exact_allocated(header_size)?;
// Reinterpret the raw buffer as a DynamicIndexHeader — assumes a C-like
// repr on the struct; TODO confirm #[repr(C)] in the full source.
80 let header = unsafe { &* (buffer.as_ptr() as *const DynamicIndexHeader) };
82 if header.magic != super::DYNAMIC_SIZED_CHUNK_INDEX_1_0 {
83 bail!("got unknown magic number
for {:?}
", path);
// Stored ctime is little endian on disk.
86 let ctime = u64::from_le(header.ctime);
88 let rawfd = file.as_raw_fd();
// NOTE(review): the `Ok(stat) => stat,` match arm is missing from this
// extract.
90 let stat = match nix::sys::stat::fstat(rawfd) {
92 Err(err) => bail!("fstat {:?} failed
- {}
", path, err),
95 let size = stat.st_size as usize;
// Everything after the header must be whole 40 byte entries
// (8 byte end offset + 32 byte digest each).
97 let index_size = size - header_size;
98 if (index_size % 40) != 0 {
99 bail!("got unexpected file size
for {:?}
", path);
// Map only the index region (file offset = header_size), read-only and
// private. NOTE(review): the length and fd arguments of mmap() are
// missing from this extract.
102 let data = unsafe { nix::sys::mman::mmap(
103 std::ptr::null_mut(),
105 nix::sys::mman::ProtFlags::PROT_READ,
106 nix::sys::mman::MapFlags::MAP_PRIVATE,
108 header_size as i64) }? as *const u8;
// NOTE(review): most of the returned struct literal (and its Ok(...)
// wrapper) is missing from this extract.
116 index_entries: index_size/40,
119 index_csum: header.index_csum,
/// Unmap the index region; safe to call more than once (no-op when
/// already unmapped).
123 fn unmap(&mut self) -> Result<(), Error> {
// Already unmapped (or never mapped) — nothing to do.
125 if self.index == std::ptr::null_mut() { return Ok(()); }
// Length must match what open() mapped: index_entries * 40 bytes.
127 if let Err(err) = unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, self.index_entries*40) } {
128 bail!("unmap file {:?} failed
- {}
", self.filename, err);
// Mark as unmapped so a second call (e.g. from Drop) becomes a no-op.
131 self.index = std::ptr::null_mut();
136 pub fn chunk_info(&self, pos: usize) -> Result<(u64, u64, [u8; 32]), Error> {
137 if pos >= self.index_entries {
138 bail!("chunk index out of range
");
140 let start = if pos == 0 {
143 unsafe { *(self.index.add((pos-1)*40) as *const u64) }
146 let end = unsafe { *(self.index.add(pos*40) as *const u64) };
147 let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() };
148 unsafe { std::ptr::copy_nonoverlapping(self.index.add(pos*40+8), digest.as_mut_ptr(), 32); }
150 Ok((start, end, digest))
154 fn chunk_end(&self, pos: usize) -> u64 {
155 if pos >= self.index_entries {
156 panic!("chunk index out of range
");
158 unsafe { *(self.index.add(pos*40) as *const u64) }
162 fn chunk_digest(&self, pos: usize) -> &[u8; 32] {
163 if pos >= self.index_entries {
164 panic!("chunk index out of range
");
166 let slice = unsafe { std::slice::from_raw_parts(self.index.add(pos*40+8), 32) };
167 slice.try_into().unwrap()
/// Touch every chunk referenced by this index in the chunk store —
/// presumably so a concurrent garbage collection keeps them; confirm
/// against ChunkStore::touch_chunk.
170 pub fn mark_used_chunks(&self, _status: &mut GarbageCollectionStatus) -> Result<(), Error> {
172 for pos in 0..self.index_entries {
// Abort early when a service shutdown was requested.
174 tools::fail_on_shutdown()?;
176 let digest = self.chunk_digest(pos);
// A chunk that cannot be touched means the backup is incomplete — abort.
177 if let Err(err) = self.store.touch_chunk(digest) {
178 bail!("unable to access chunk {}
, required by {:?}
- {}
",
179 proxmox::tools::digest_to_hex(digest), self.filename, err);
/// Decode every chunk in index order and stream the concatenated content
/// to `writer`.
185 pub fn dump_pxar(&self, mut writer: Box<dyn Write>) -> Result<(), Error> {
187 for pos in 0..self.index_entries {
188 let _end = self.chunk_end(pos);
189 let digest = self.chunk_digest(pos);
190 //println!("Dump {:08x}
", end );
191 let chunk = self.store.read_chunk(digest)?;
192 // fimxe: handle encrypted chunks
// decode(None): no decryption configuration is supplied.
193 let data = chunk.decode(None)?;
194 writer.write_all(&data)?;
// NOTE(review): the parameter list of this private helper is missing from
// this extract; from the recursive calls below it is
// (start_idx: usize, start: u64, end_idx: usize, end: u64, offset: u64).
// Recursive binary search over chunk end offsets: returns the index of
// the chunk whose [start, end) byte range contains `offset`.
207 ) -> Result<usize, Error> {
209 if (offset >= end) || (offset < start) {
210 bail!("offset out of range
");
213 if end_idx == start_idx {
214 return Ok(start_idx); // found
216 let middle_idx = (start_idx + end_idx)/2;
217 let middle_end = self.chunk_end(middle_idx);
// Descend into whichever half covers `offset`.
219 if offset < middle_end {
220 return self.binary_search(start_idx, start, middle_idx, middle_end, offset);
222 return self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset);
227 impl IndexFile for DynamicIndexReader {
// Total number of index entries.
// NOTE(review): the body line is missing from this extract.
228 fn index_count(&self) -> usize {
// Returns None for out-of-range positions; otherwise borrows the digest.
// NOTE(review): several body lines are missing; the transmute below
// apparently adjusts the borrow's lifetime — verify against full source.
232 fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> {
233 if pos >= self.index_entries {
237 std::mem::transmute(self.chunk_digest(pos).as_ptr())
// Offset-addressed, buffered read access on top of a DynamicIndexReader;
// caches one decoded chunk at a time (see buffer_chunk below).
243 pub struct BufferedDynamicReader {
244 index: DynamicIndexReader,
// Decoded content of the currently buffered chunk.
246 read_buffer: Vec<u8>,
// Index position of the buffered chunk…
247 buffered_chunk_idx: usize,
// …and the archive offset where that chunk starts.
248 buffered_chunk_start: u64,
// NOTE(review): further fields used by the impls below (`archive_size`,
// `read_offset`) are missing from this extract.
252 impl BufferedDynamicReader {
/// Wrap `index`; the archive size is the end offset of the last chunk.
/// NOTE(review): `index_entries - 1` underflows for an empty index —
/// confirm callers guarantee at least one entry.
254 pub fn new(index: DynamicIndexReader) -> Self {
256 let archive_size = index.chunk_end(index.index_entries - 1);
// NOTE(review): the `Self {` opener and some initializers are missing
// from this extract.
259 archive_size: archive_size,
260 read_buffer: Vec::with_capacity(1024*1024),
261 buffered_chunk_idx: 0,
262 buffered_chunk_start: 0,
267 pub fn archive_size(&self) -> u64 { self.archive_size }
/// Load chunk `idx`: read it from the store, decode it, and make it the
/// currently buffered chunk.
269 fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
271 let index = &self.index;
272 let end = index.chunk_end(idx);
273 let digest = index.chunk_digest(idx);
275 let chunk = index.store.read_chunk(digest)?;
276 // fimxe: handle encrypted chunks
// decode(None): no decryption configuration is supplied.
278 let data = chunk.decode(None)?;
// Replace the buffer content; its capacity is reused across chunks.
280 self.read_buffer.clear();
281 self.read_buffer.extend_from_slice(&data);
283 self.buffered_chunk_idx = idx;
// The chunk covers archive bytes [end - len, end).
284 self.buffered_chunk_start = end - (self.read_buffer.len() as u64);
285 //println!("BUFFER {} {}
", self.buffered_chunk_start, end);
290 impl crate::tools::BufferedRead for BufferedDynamicReader {
/// Return a slice of decoded archive data starting at `offset` (up to the
/// end of the chunk containing it), buffering that chunk when necessary.
292 fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
// Reading exactly at EOF yields an empty slice.
294 if offset == self.archive_size { return Ok(&self.read_buffer[0..0]); }
296 let buffer_len = self.read_buffer.len();
297 let index = &self.index;
299 // optimization for sequential read
// NOTE(review): the `if` keyword opening this condition is missing from
// this extract.
301 ((self.buffered_chunk_idx + 1) < index.index_entries) &&
302 (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
// Fast path: `offset` falls into the chunk right after the buffered one.
304 let next_idx = self.buffered_chunk_idx + 1;
305 let next_end = index.chunk_end(next_idx);
306 if offset < next_end {
307 self.buffer_chunk(next_idx)?;
308 let buffer_offset = (offset - self.buffered_chunk_start) as usize;
309 return Ok(&self.read_buffer[buffer_offset..]);
// Slow path: binary-search the chunk containing `offset` when the
// current buffer does not cover it.
313 if (buffer_len == 0) ||
314 (offset < self.buffered_chunk_start) ||
315 (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
317 let end_idx = index.index_entries - 1;
318 let end = index.chunk_end(end_idx);
319 let idx = index.binary_search(0, 0, end_idx, end, offset)?;
320 self.buffer_chunk(idx)?;
323 let buffer_offset = (offset - self.buffered_chunk_start) as usize;
324 Ok(&self.read_buffer[buffer_offset..])
329 impl std::io::Read for BufferedDynamicReader {
// Standard Read on top of buffered_read(): copies at most the remainder
// of the buffered chunk per call and advances `read_offset`.
331 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
333 use std::io::{Error, ErrorKind};
334 use crate::tools::BufferedRead;
// NOTE(review): the `Ok(...)` match arm is missing from this extract.
336 let data = match self.buffered_read(self.read_offset) {
// Map the internal error type onto std::io::Error for the Read contract.
338 Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
// Copy whichever is smaller: the available data or the caller's buffer.
341 let n = if data.len() > buf.len() { buf.len() } else { data.len() };
// NOTE(review): both slices are at least `n` long by construction above.
343 unsafe { std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n); }
345 self.read_offset += n as u64;
351 impl std::io::Seek for BufferedDynamicReader {
// Seek within [0, archive_size]; only adjusts `read_offset`, no I/O.
353 fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> {
355 use std::io::{SeekFrom};
// Compute the absolute target as i64 so negative results are detectable.
357 let new_offset = match pos {
358 SeekFrom::Start(start_offset) => start_offset as i64,
359 SeekFrom::End(end_offset) => (self.archive_size as i64)+ end_offset,
360 SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
363 use std::io::{Error, ErrorKind};
// Reject seeks outside the archive. NOTE(review): the ErrorKind argument
// of Error::new is missing from this extract.
364 if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
365 return Err(Error::new(
367 format!("seek is out of range {}
([0..{}
])", new_offset, self.archive_size)));
369 self.read_offset = new_offset as u64;
375 /// Create dynamic index files (`.dixd`)
376 pub struct DynamicIndexWriter {
377 store: Arc<ChunkStore>,
// Shared process lock on the store, held for the writer's lifetime.
378 _lock: tools::ProcessLockSharedGuard,
379 writer: BufWriter<File>,
// Data is written to a `.tmp_didx` file first, renamed in close().
382 tmp_filename: PathBuf,
// Running SHA256 over the index entries; taken (set to None) by close().
383 csum: Option<openssl::sha::Sha256>,
// NOTE(review): further fields used by the impls below (`filename`,
// `uuid`, and presumably a closed flag) are missing from this extract.
388 impl Drop for DynamicIndexWriter {
// NOTE(review): the `fn drop(&mut self)` header is missing from this
// extract; on drop the temporary file is removed, errors ignored (a
// successful close() already renamed it away).
391 let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
395 impl DynamicIndexWriter {
/// Create a new index writer for `path` (relative to the store's base):
/// takes a shared store lock, creates a `.tmp_didx` file, and writes an
/// initial 4096 byte header with a zeroed checksum field.
397 pub fn create(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> {
399 let shared_lock = store.try_shared_lock()?;
401 let full_path = store.relative_path(path);
402 let mut tmp_path = full_path.clone();
403 tmp_path.set_extension("tmp_didx
");
// NOTE(review): further OpenOptions calls (and the final open()) are
// missing from this extract.
405 let file = std::fs::OpenOptions::new()
406 .create(true).truncate(true)
// 1 MiB write buffer — index entries are tiny, so batch them.
411 let mut writer = BufWriter::with_capacity(1024*1024, file);
413 let header_size = std::mem::size_of::<DynamicIndexHeader>();
415 // todo: use static assertion when available in rust
416 if header_size != 4096 { panic!("got unexpected header size"); }
418 let ctime = std::time::SystemTime::now().duration_since(
419 std::time::SystemTime::UNIX_EPOCH)?.as_secs();
421 let uuid = Uuid::new_v4();
// Build the header in a page-sized buffer and overlay a struct view on it.
423 let mut buffer = vec::undefined(header_size);
424 let header = crate::tools::map_struct_mut::<DynamicIndexHeader>(&mut buffer)?;
426 header.magic = super::DYNAMIC_SIZED_CHUNK_INDEX_1_0;
427 header.ctime = u64::to_le(ctime);
428 header.uuid = *uuid.as_bytes();
// Checksum is zero for now; the real value is patched in by close().
430 header.index_csum = [0u8; 32];
432 writer.write_all(&buffer)?;
// Start the running checksum over the index entries.
434 let csum = Some(openssl::sha::Sha256::new());
// NOTE(review): most of the returned struct literal is missing from this
// extract.
442 tmp_filename: tmp_path,
444 uuid: *uuid.as_bytes(),
449 // fixme: use add_chunk instead?
/// Store `chunk` in the underlying chunk store; returns (is_duplicate,
/// compressed_size) as consumed by DynamicChunkWriter::write_chunk_buffer.
450 pub fn insert_chunk(&self, chunk: &DataChunk) -> Result<(bool, u64), Error> {
451 self.store.insert_chunk(chunk)
/// Finish the index: flush all entries, patch the computed index checksum
/// into the header, and atomically rename the tmp file into place.
/// Returns the final index checksum.
454 pub fn close(&mut self) -> Result<[u8; 32], Error> {
// NOTE(review): the guard condition (and closing brace) around this
// bail! is missing from this extract — presumably a closed-flag check.
457 bail!("cannot close already closed archive index file {:?}
", self.filename);
462 self.writer.flush()?;
// Seek back into the header to the index_csum field…
466 let csum_offset = proxmox::tools::offsetof!(DynamicIndexHeader, index_csum);
467 self.writer.seek(std::io::SeekFrom::Start(csum_offset as u64))?;
// …and overwrite the zeroed placeholder with the final digest.
469 let csum = self.csum.take().unwrap();
470 let index_csum = csum.finish();
472 self.writer.write_all(&index_csum)?;
473 self.writer.flush()?;
// Atomic publish: rename the tmp file onto the final name.
476 if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
477 bail!("Atomic rename file {:?} failed
- {}
", self.filename, err);
483 // fixme: rename to add_digest
484 pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), Error> {
486 bail!("cannot write to closed dynamic index file {:?}
", self.filename);
489 let offset_le: &[u8; 8] = unsafe { &std::mem::transmute::<u64, [u8;8]>(offset.to_le()) };
491 if let Some(ref mut csum) = self.csum {
492 csum.update(offset_le);
496 self.writer.write(offset_le)?;
497 self.writer.write(digest)?;
502 /// Writer which splits a binary stream into dynamic sized chunks
504 /// And store the resulting chunk list into the index file.
505 pub struct DynamicChunkWriter {
506 index: DynamicIndexWriter,
// Bytes accumulated for the chunk currently in progress.
512 chunk_buffer: Vec<u8>,
// NOTE(review): further fields used below (`chunker`, `stat`,
// `chunk_offset`, `last_chunk`) are missing from this extract.
515 impl DynamicChunkWriter {
/// Wrap `index` and split incoming writes into dynamic sized chunks
/// driven by a Chunker configured with `chunk_size`.
517 pub fn new(index: DynamicIndexWriter, chunk_size: usize) -> Self {
// NOTE(review): the `Self {` opener and some field initializers are
// missing from this extract.
521 chunker: Chunker::new(chunk_size),
522 stat: ChunkStat::new(0),
// Pre-size the buffer generously (4x) to avoid reallocation.
525 chunk_buffer: Vec::with_capacity(chunk_size*4),
/// Access the accumulated chunker statistics.
/// NOTE(review): the body line is missing from this extract.
529 pub fn stat(&self) -> &ChunkStat {
/// Flush the remaining partial chunk and finalize statistics.
/// NOTE(review): several lines of this method (including its final
/// return) are missing from this extract.
533 pub fn close(&mut self) -> Result<(), Error> {
541 self.write_chunk_buffer()?;
// Logical (uncompressed) stream size…
545 self.stat.size = self.chunk_offset as u64;
547 // add size of index file
// …plus the on-disk index: 40 bytes per entry plus the 4096 byte header.
548 self.stat.size += (self.stat.chunk_count*40 + std::mem::size_of::<DynamicIndexHeader>()) as u64;
/// Turn the buffered bytes into one chunk: verify the size invariant,
/// insert the chunk into the store, update statistics and append its
/// digest to the index.
553 fn write_chunk_buffer(&mut self) -> Result<(), Error> {
555 let chunk_size = self.chunk_buffer.len();
// Nothing buffered — nothing to write.
557 if chunk_size == 0 { return Ok(()); }
// Sanity check: buffered byte count must equal the offset delta since
// the previous chunk boundary.
559 let expected_chunk_size = self.chunk_offset - self.last_chunk;
560 if expected_chunk_size != self.chunk_buffer.len() {
561 bail!("wrong chunk size {}
!= {}
", expected_chunk_size, chunk_size);
564 self.stat.chunk_count += 1;
566 self.last_chunk = self.chunk_offset;
// NOTE(review): the builder calls between new() and digest() are missing
// from this extract.
568 let chunk = DataChunkBuilder::new(&self.chunk_buffer)
572 let digest = chunk.digest();
574 match self.index.insert_chunk(&chunk) {
575 Ok((is_duplicate, compressed_size)) => {
577 self.stat.compressed_size += compressed_size;
// NOTE(review): the `if is_duplicate { ... } else { ... }` branch lines
// are missing from this extract.
579 self.stat.duplicate_chunks += 1;
581 self.stat.disk_size += compressed_size;
584 println!("ADD CHUNK {:016x} {} {}
% {} {}
", self.chunk_offset, chunk_size,
585 (compressed_size*100)/(chunk_size as u64), is_duplicate, proxmox::tools::digest_to_hex(digest));
586 self.index.add_chunk(self.chunk_offset as u64, &digest)?;
// Start collecting the next chunk.
587 self.chunk_buffer.truncate(0);
// NOTE(review): presumably inside the Err arm — the buffer is cleared
// before the error propagates; confirm against the full source.
591 self.chunk_buffer.truncate(0);
598 impl Write for DynamicChunkWriter {
// Feed data through the chunker: when a boundary is found inside `data`,
// flush the buffered chunk; otherwise just accumulate the bytes.
600 fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
602 let chunker = &mut self.chunker;
// `pos` apparently marks a chunk boundary `pos` bytes into `data`
// (0 = no boundary) — confirm against Chunker::scan.
604 let pos = chunker.scan(data);
// NOTE(review): the `if pos != 0 { ... } else { ... }` structure and the
// Ok(...) return values of this method are missing from this extract.
607 self.chunk_buffer.extend(&data[0..pos]);
608 self.chunk_offset += pos;
610 if let Err(err) = self.write_chunk_buffer() {
// Adapt the internal error type to the std::io::Write contract.
611 return Err(std::io::Error::new(std::io::ErrorKind::Other, err.to_string()));
616 self.chunk_offset += data.len();
617 self.chunk_buffer.extend(data);
// Deliberately unsupported: callers must use close(), which also writes
// the final partial chunk and the index.
622 fn flush(&mut self) -> std::result::Result<(), std::io::Error> {
623 Err(std::io::Error::new(std::io::ErrorKind::Other, "please
use close() instead of
flush()"))