]> git.proxmox.com Git - proxmox-backup.git/blob - pbs-datastore/src/dynamic_index.rs
56d9b08015a5e43dd5af4946247e3f4c58928f5b
[proxmox-backup.git] / pbs-datastore / src / dynamic_index.rs
1 use std::fs::File;
2 use std::io::{BufWriter, Seek, SeekFrom, Write};
3 use std::ops::Range;
4 use std::os::unix::io::AsRawFd;
5 use std::path::{Path, PathBuf};
6 use std::pin::Pin;
7 use std::sync::{Arc, Mutex};
8 use std::task::Context;
9
10 use anyhow::{bail, format_err, Error};
11
12 use proxmox::tools::mmap::Mmap;
13 use proxmox_io::ReadExt;
14 use proxmox_uuid::Uuid;
15 use proxmox_sys::process_locker::ProcessLockSharedGuard;
16 use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
17
18 use pbs_tools::lru_cache::LruCache;
19
20 use crate::Chunker;
21 use crate::chunk_stat::ChunkStat;
22 use crate::chunk_store::ChunkStore;
23 use crate::data_blob::{DataBlob, DataChunkBuilder};
24 use crate::file_formats;
25 use crate::index::{IndexFile, ChunkReadInfo};
26 use crate::read_chunk::ReadChunk;
27
/// Header format definition for dynamic index files (`.didx`)
///
/// The header occupies exactly one page (4096 bytes) at the start of the
/// file; the packed chunk entry table follows directly after it.
#[repr(C)]
pub struct DynamicIndexHeader {
    /// File type magic; must be `DYNAMIC_SIZED_CHUNK_INDEX_1_0`.
    pub magic: [u8; 8],
    /// UUID identifying this particular index file.
    pub uuid: [u8; 16],
    /// Creation time (UNIX epoch, written little-endian by the writer).
    pub ctime: i64,
    /// Sha256 over the index ``SHA256(offset1||digest1||offset2||digest2||...)``
    pub index_csum: [u8; 32],
    // Padding so the overall size is one page (4096 bytes).
    reserved: [u8; 4032],
}
proxmox_lang::static_assert_size!(DynamicIndexHeader, 4096);
39 // TODO: Once non-Copy unions are stabilized, use:
40 // union DynamicIndexHeader {
41 // reserved: [u8; 4096],
42 // pub data: DynamicIndexHeaderData,
43 // }
44
45 impl DynamicIndexHeader {
46 /// Convenience method to allocate a zero-initialized header struct.
47 pub fn zeroed() -> Box<Self> {
48 unsafe {
49 Box::from_raw(std::alloc::alloc_zeroed(std::alloc::Layout::new::<Self>()) as *mut Self)
50 }
51 }
52
53 pub fn as_bytes(&self) -> &[u8] {
54 unsafe {
55 std::slice::from_raw_parts(
56 self as *const Self as *const u8,
57 std::mem::size_of::<Self>(),
58 )
59 }
60 }
61 }
62
/// One entry of the dynamic index: the exclusive end offset of a chunk in
/// the archive stream plus the digest used to look the chunk up in the
/// chunk store. `#[repr(C)]` mirrors the 40-byte on-disk layout so the
/// entry table can be memory-mapped directly.
#[derive(Clone, Debug)]
#[repr(C)]
pub struct DynamicEntry {
    // End offset, little-endian on disk; decode via `end()`.
    end_le: u64,
    // SHA-256 digest identifying the chunk in the chunk store.
    digest: [u8; 32],
}
69
70 impl DynamicEntry {
71 #[inline]
72 pub fn end(&self) -> u64 {
73 u64::from_le(self.end_le)
74 }
75 }
76
/// Reader for dynamic index (`.didx`) files.
///
/// The chunk entry table is memory-mapped (read-only) from the open file.
pub struct DynamicIndexReader {
    // Keeps the file descriptor backing the mapping alive.
    _file: File,
    /// Total size of the index file in bytes.
    pub size: usize,
    // Memory-mapped entry table (everything after the 4096-byte header).
    index: Mmap<DynamicEntry>,
    /// UUID copied from the file header.
    pub uuid: [u8; 16],
    /// Creation timestamp (UNIX epoch) reported by `index_ctime()`.
    pub ctime: i64,
    /// Index checksum copied from the file header.
    pub index_csum: [u8; 32],
}
85
86 impl DynamicIndexReader {
87 pub fn open(path: &Path) -> Result<Self, Error> {
88 File::open(path)
89 .map_err(Error::from)
90 .and_then(Self::new)
91 .map_err(|err| format_err!("Unable to open dynamic index {:?} - {}", path, err))
92 }
93
94 pub fn index(&self) -> &[DynamicEntry] {
95 &self.index
96 }
97
98 pub fn new(mut file: std::fs::File) -> Result<Self, Error> {
99 // FIXME: This is NOT OUR job! Check the callers of this method and remove this!
100 file.seek(SeekFrom::Start(0))?;
101
102 let header_size = std::mem::size_of::<DynamicIndexHeader>();
103
104 let rawfd = file.as_raw_fd();
105 let stat = match nix::sys::stat::fstat(rawfd) {
106 Ok(stat) => stat,
107 Err(err) => bail!("fstat failed - {}", err),
108 };
109
110 let size = stat.st_size as usize;
111
112 if size < header_size {
113 bail!("index too small ({})", stat.st_size);
114 }
115
116 let header: Box<DynamicIndexHeader> = unsafe { file.read_host_value_boxed()? };
117
118 if header.magic != file_formats::DYNAMIC_SIZED_CHUNK_INDEX_1_0 {
119 bail!("got unknown magic number");
120 }
121
122 let ctime = proxmox_time::epoch_i64();
123
124 let index_size = stat.st_size as usize - header_size;
125 let index_count = index_size / 40;
126 if index_count * 40 != index_size {
127 bail!("got unexpected file size");
128 }
129
130 let index = unsafe {
131 Mmap::map_fd(
132 rawfd,
133 header_size as u64,
134 index_count,
135 nix::sys::mman::ProtFlags::PROT_READ,
136 nix::sys::mman::MapFlags::MAP_PRIVATE,
137 )?
138 };
139
140 Ok(Self {
141 _file: file,
142 size,
143 index,
144 ctime,
145 uuid: header.uuid,
146 index_csum: header.index_csum,
147 })
148 }
149
150 #[inline]
151 #[allow(clippy::cast_ptr_alignment)]
152 pub fn chunk_end(&self, pos: usize) -> u64 {
153 if pos >= self.index.len() {
154 panic!("chunk index out of range");
155 }
156 self.index[pos].end()
157 }
158
159 #[inline]
160 fn chunk_digest(&self, pos: usize) -> &[u8; 32] {
161 if pos >= self.index.len() {
162 panic!("chunk index out of range");
163 }
164 &self.index[pos].digest
165 }
166
167 pub fn binary_search(
168 &self,
169 start_idx: usize,
170 start: u64,
171 end_idx: usize,
172 end: u64,
173 offset: u64,
174 ) -> Result<usize, Error> {
175 if (offset >= end) || (offset < start) {
176 bail!("offset out of range");
177 }
178
179 if end_idx == start_idx {
180 return Ok(start_idx); // found
181 }
182 let middle_idx = (start_idx + end_idx) / 2;
183 let middle_end = self.chunk_end(middle_idx);
184
185 if offset < middle_end {
186 self.binary_search(start_idx, start, middle_idx, middle_end, offset)
187 } else {
188 self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset)
189 }
190 }
191 }
192
193 impl IndexFile for DynamicIndexReader {
194 fn index_count(&self) -> usize {
195 self.index.len()
196 }
197
198 fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> {
199 if pos >= self.index.len() {
200 None
201 } else {
202 Some(unsafe { &*(self.chunk_digest(pos).as_ptr() as *const [u8; 32]) })
203 }
204 }
205
206 fn index_bytes(&self) -> u64 {
207 if self.index.is_empty() {
208 0
209 } else {
210 self.chunk_end(self.index.len() - 1)
211 }
212 }
213
214 fn compute_csum(&self) -> ([u8; 32], u64) {
215 let mut csum = openssl::sha::Sha256::new();
216 let mut chunk_end = 0;
217 for pos in 0..self.index_count() {
218 let info = self.chunk_info(pos).unwrap();
219 chunk_end = info.range.end;
220 csum.update(&chunk_end.to_le_bytes());
221 csum.update(&info.digest);
222 }
223 let csum = csum.finish();
224 (csum, chunk_end)
225 }
226
227 fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo> {
228 if pos >= self.index.len() {
229 return None;
230 }
231 let start = if pos == 0 { 0 } else { self.index[pos - 1].end() };
232
233 let end = self.index[pos].end();
234
235 Some(ChunkReadInfo {
236 range: start..end,
237 digest: self.index[pos].digest,
238 })
239 }
240
241 fn index_ctime(&self) -> i64 {
242 self.ctime
243 }
244
245 fn index_size(&self) -> usize {
246 self.size as usize
247 }
248
249 fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
250 let end_idx = self.index.len() - 1;
251 let end = self.chunk_end(end_idx);
252 let found_idx = self.binary_search(0, 0, end_idx, end, offset);
253 let found_idx = match found_idx {
254 Ok(i) => i,
255 Err(_) => return None
256 };
257
258 let found_start = if found_idx == 0 {
259 0
260 } else {
261 self.chunk_end(found_idx - 1)
262 };
263
264 Some((found_idx, offset - found_start))
265 }
266 }
267
/// Create dynamic index files (`.didx`)
///
/// Writes the header and a stream of (offset, digest) entries to a
/// temporary file; `close()` finalizes the checksum and atomically renames
/// the file into place.
pub struct DynamicIndexWriter {
    store: Arc<ChunkStore>,
    // Shared chunk store lock, held for the writer's whole lifetime.
    _lock: ProcessLockSharedGuard,
    writer: BufWriter<File>,
    // Set by `close()`; guards against further writes / double close.
    closed: bool,
    filename: PathBuf,
    tmp_filename: PathBuf,
    // Running checksum over the entry stream; taken out by `close()`.
    csum: Option<openssl::sha::Sha256>,
    pub uuid: [u8; 16],
    pub ctime: i64,
}
280
281 impl Drop for DynamicIndexWriter {
282 fn drop(&mut self) {
283 let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
284 }
285 }
286
287 impl DynamicIndexWriter {
288 pub fn create(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> {
289 let shared_lock = store.try_shared_lock()?;
290
291 let full_path = store.relative_path(path);
292 let mut tmp_path = full_path.clone();
293 tmp_path.set_extension("tmp_didx");
294
295 let file = std::fs::OpenOptions::new()
296 .create(true)
297 .truncate(true)
298 .read(true)
299 .write(true)
300 .open(&tmp_path)?;
301
302 let mut writer = BufWriter::with_capacity(1024 * 1024, file);
303
304 let ctime = proxmox_time::epoch_i64();
305
306 let uuid = Uuid::generate();
307
308 let mut header = DynamicIndexHeader::zeroed();
309 header.magic = file_formats::DYNAMIC_SIZED_CHUNK_INDEX_1_0;
310 header.ctime = i64::to_le(ctime);
311 header.uuid = *uuid.as_bytes();
312 // header.index_csum = [0u8; 32];
313 writer.write_all(header.as_bytes())?;
314
315 let csum = Some(openssl::sha::Sha256::new());
316
317 Ok(Self {
318 store,
319 _lock: shared_lock,
320 writer,
321 closed: false,
322 filename: full_path,
323 tmp_filename: tmp_path,
324 ctime,
325 uuid: *uuid.as_bytes(),
326 csum,
327 })
328 }
329
330 // fixme: use add_chunk instead?
331 pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> {
332 self.store.insert_chunk(chunk, digest)
333 }
334
335 pub fn close(&mut self) -> Result<[u8; 32], Error> {
336 if self.closed {
337 bail!(
338 "cannot close already closed archive index file {:?}",
339 self.filename
340 );
341 }
342
343 self.closed = true;
344
345 self.writer.flush()?;
346
347 let csum_offset = proxmox_lang::offsetof!(DynamicIndexHeader, index_csum);
348 self.writer.seek(SeekFrom::Start(csum_offset as u64))?;
349
350 let csum = self.csum.take().unwrap();
351 let index_csum = csum.finish();
352
353 self.writer.write_all(&index_csum)?;
354 self.writer.flush()?;
355
356 if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
357 bail!("Atomic rename file {:?} failed - {}", self.filename, err);
358 }
359
360 Ok(index_csum)
361 }
362
363 // fixme: rename to add_digest
364 pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), Error> {
365 if self.closed {
366 bail!(
367 "cannot write to closed dynamic index file {:?}",
368 self.filename
369 );
370 }
371
372 let offset_le: &[u8; 8] = unsafe { &std::mem::transmute::<u64, [u8; 8]>(offset.to_le()) };
373
374 if let Some(ref mut csum) = self.csum {
375 csum.update(offset_le);
376 csum.update(digest);
377 }
378
379 self.writer.write_all(offset_le)?;
380 self.writer.write_all(digest)?;
381 Ok(())
382 }
383 }
384
/// Writer which splits a binary stream into dynamic sized chunks
///
/// And store the resulting chunk list into the index file.
pub struct DynamicChunkWriter {
    index: DynamicIndexWriter,
    // Set once `close()` has run.
    closed: bool,
    // Content-defined chunk boundary scanner.
    chunker: Chunker,
    // Running statistics (chunk count, compressed/disk sizes, dedup info).
    stat: ChunkStat,
    // Absolute stream offset of the end of the buffered data.
    chunk_offset: usize,
    // Absolute stream offset where the currently buffered chunk started.
    last_chunk: usize,
    // Data of the chunk currently being accumulated.
    chunk_buffer: Vec<u8>,
}
397
398 impl DynamicChunkWriter {
399 pub fn new(index: DynamicIndexWriter, chunk_size: usize) -> Self {
400 Self {
401 index,
402 closed: false,
403 chunker: Chunker::new(chunk_size),
404 stat: ChunkStat::new(0),
405 chunk_offset: 0,
406 last_chunk: 0,
407 chunk_buffer: Vec::with_capacity(chunk_size * 4),
408 }
409 }
410
411 pub fn stat(&self) -> &ChunkStat {
412 &self.stat
413 }
414
415 pub fn close(&mut self) -> Result<(), Error> {
416 if self.closed {
417 return Ok(());
418 }
419
420 self.closed = true;
421
422 self.write_chunk_buffer()?;
423
424 self.index.close()?;
425
426 self.stat.size = self.chunk_offset as u64;
427
428 // add size of index file
429 self.stat.size +=
430 (self.stat.chunk_count * 40 + std::mem::size_of::<DynamicIndexHeader>()) as u64;
431
432 Ok(())
433 }
434
435 fn write_chunk_buffer(&mut self) -> Result<(), Error> {
436 let chunk_size = self.chunk_buffer.len();
437
438 if chunk_size == 0 {
439 return Ok(());
440 }
441
442 let expected_chunk_size = self.chunk_offset - self.last_chunk;
443 if expected_chunk_size != self.chunk_buffer.len() {
444 bail!("wrong chunk size {} != {}", expected_chunk_size, chunk_size);
445 }
446
447 self.stat.chunk_count += 1;
448
449 self.last_chunk = self.chunk_offset;
450
451 let (chunk, digest) = DataChunkBuilder::new(&self.chunk_buffer)
452 .compress(true)
453 .build()?;
454
455 match self.index.insert_chunk(&chunk, &digest) {
456 Ok((is_duplicate, compressed_size)) => {
457 self.stat.compressed_size += compressed_size;
458 if is_duplicate {
459 self.stat.duplicate_chunks += 1;
460 } else {
461 self.stat.disk_size += compressed_size;
462 }
463
464 println!(
465 "ADD CHUNK {:016x} {} {}% {} {}",
466 self.chunk_offset,
467 chunk_size,
468 (compressed_size * 100) / (chunk_size as u64),
469 is_duplicate,
470 proxmox::tools::digest_to_hex(&digest)
471 );
472 self.index.add_chunk(self.chunk_offset as u64, &digest)?;
473 self.chunk_buffer.truncate(0);
474 Ok(())
475 }
476 Err(err) => {
477 self.chunk_buffer.truncate(0);
478 Err(err)
479 }
480 }
481 }
482 }
483
484 impl Write for DynamicChunkWriter {
485 fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
486 let chunker = &mut self.chunker;
487
488 let pos = chunker.scan(data);
489
490 if pos > 0 {
491 self.chunk_buffer.extend_from_slice(&data[0..pos]);
492 self.chunk_offset += pos;
493
494 if let Err(err) = self.write_chunk_buffer() {
495 return Err(std::io::Error::new(
496 std::io::ErrorKind::Other,
497 err.to_string(),
498 ));
499 }
500 Ok(pos)
501 } else {
502 self.chunk_offset += data.len();
503 self.chunk_buffer.extend_from_slice(data);
504 Ok(data.len())
505 }
506 }
507
508 fn flush(&mut self) -> std::result::Result<(), std::io::Error> {
509 Err(std::io::Error::new(
510 std::io::ErrorKind::Other,
511 "please use close() instead of flush()",
512 ))
513 }
514 }
515
/// A decoded chunk held in the LRU cache together with the archive byte
/// range it covers.
struct CachedChunk {
    // Archive range `[start, end)` this chunk's data corresponds to.
    range: Range<u64>,
    // Decoded chunk data; its length equals `range.end - range.start`.
    data: Vec<u8>,
}
520
521 impl CachedChunk {
522 /// Perform sanity checks on the range and data size:
523 pub fn new(range: Range<u64>, data: Vec<u8>) -> Result<Self, Error> {
524 if data.len() as u64 != range.end - range.start {
525 bail!(
526 "read chunk with wrong size ({} != {})",
527 data.len(),
528 range.end - range.start,
529 );
530 }
531 Ok(Self { range, data })
532 }
533 }
534
/// Buffered, seekable reader over the chunked archive described by a
/// `DynamicIndexReader`, fetching chunk data through `store`.
pub struct BufferedDynamicReader<S> {
    store: S,
    index: DynamicIndexReader,
    // Total archive size, cached from `index.index_bytes()`.
    archive_size: u64,
    // Data of the currently buffered chunk.
    read_buffer: Vec<u8>,
    // Index of the chunk currently held in `read_buffer`.
    buffered_chunk_idx: usize,
    // Archive offset where the buffered chunk starts.
    buffered_chunk_start: u64,
    // Current position used by the `Read`/`Seek` implementations.
    read_offset: u64,
    // Cache of recently used chunks keyed by chunk index.
    lru_cache: LruCache<usize, CachedChunk>,
}
545
/// Adapter implementing the LRU cache's `Cacher` interface by reading
/// chunks from the store on cache misses.
struct ChunkCacher<'a, S> {
    store: &'a mut S,
    index: &'a DynamicIndexReader,
}
550
551 impl<'a, S: ReadChunk> pbs_tools::lru_cache::Cacher<usize, CachedChunk> for ChunkCacher<'a, S> {
552 fn fetch(&mut self, index: usize) -> Result<Option<CachedChunk>, Error> {
553 let info = match self.index.chunk_info(index) {
554 Some(info) => info,
555 None => bail!("chunk index out of range"),
556 };
557 let range = info.range;
558 let data = self.store.read_chunk(&info.digest)?;
559 CachedChunk::new(range, data).map(Some)
560 }
561 }
562
563 impl<S: ReadChunk> BufferedDynamicReader<S> {
564 pub fn new(index: DynamicIndexReader, store: S) -> Self {
565 let archive_size = index.index_bytes();
566 Self {
567 store,
568 index,
569 archive_size,
570 read_buffer: Vec::with_capacity(1024 * 1024),
571 buffered_chunk_idx: 0,
572 buffered_chunk_start: 0,
573 read_offset: 0,
574 lru_cache: LruCache::new(32),
575 }
576 }
577
578 pub fn archive_size(&self) -> u64 {
579 self.archive_size
580 }
581
582 fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
583 //let (start, end, data) = self.lru_cache.access(
584 let cached_chunk = self.lru_cache.access(
585 idx,
586 &mut ChunkCacher {
587 store: &mut self.store,
588 index: &self.index,
589 },
590 )?.ok_or_else(|| format_err!("chunk not found by cacher"))?;
591
592 // fixme: avoid copy
593 self.read_buffer.clear();
594 self.read_buffer.extend_from_slice(&cached_chunk.data);
595
596 self.buffered_chunk_idx = idx;
597
598 self.buffered_chunk_start = cached_chunk.range.start;
599 //println!("BUFFER {} {}", self.buffered_chunk_start, end);
600 Ok(())
601 }
602 }
603
impl<S: ReadChunk> pbs_tools::io::BufferedRead for BufferedDynamicReader<S> {
    /// Return a slice of buffered data starting at archive `offset` and
    /// reaching to the end of the chunk containing it. Returns an empty
    /// slice exactly at end-of-archive.
    fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
        if offset == self.archive_size {
            return Ok(&self.read_buffer[0..0]);
        }

        let buffer_len = self.read_buffer.len();
        let index = &self.index;

        // optimization for sequential read
        if buffer_len > 0
            && ((self.buffered_chunk_idx + 1) < index.index().len())
            && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
        {
            let next_idx = self.buffered_chunk_idx + 1;
            let next_end = index.chunk_end(next_idx);
            if offset < next_end {
                // `offset` falls into the chunk right after the buffered one:
                // load it directly, skipping the binary search below.
                self.buffer_chunk(next_idx)?;
                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
                return Ok(&self.read_buffer[buffer_offset..]);
            }
        }

        // Random access: when `offset` lies outside the currently buffered
        // chunk (or nothing is buffered yet), locate the chunk by binary
        // search over the whole index and buffer it.
        if (buffer_len == 0)
            || (offset < self.buffered_chunk_start)
            || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
        {
            let end_idx = index.index().len() - 1;
            let end = index.chunk_end(end_idx);
            let idx = index.binary_search(0, 0, end_idx, end, offset)?;
            self.buffer_chunk(idx)?;
        }

        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
        Ok(&self.read_buffer[buffer_offset..])
    }
}
641
642 impl<S: ReadChunk> std::io::Read for BufferedDynamicReader<S> {
643 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
644 use pbs_tools::io::BufferedRead;
645 use std::io::{Error, ErrorKind};
646
647 let data = match self.buffered_read(self.read_offset) {
648 Ok(v) => v,
649 Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
650 };
651
652 let n = if data.len() > buf.len() {
653 buf.len()
654 } else {
655 data.len()
656 };
657
658 buf[0..n].copy_from_slice(&data[0..n]);
659
660 self.read_offset += n as u64;
661
662 Ok(n)
663 }
664 }
665
666 impl<S: ReadChunk> std::io::Seek for BufferedDynamicReader<S> {
667 fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
668 let new_offset = match pos {
669 SeekFrom::Start(start_offset) => start_offset as i64,
670 SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
671 SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
672 };
673
674 use std::io::{Error, ErrorKind};
675 if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
676 return Err(Error::new(
677 ErrorKind::Other,
678 format!(
679 "seek is out of range {} ([0..{}])",
680 new_offset, self.archive_size
681 ),
682 ));
683 }
684 self.read_offset = new_offset as u64;
685
686 Ok(self.read_offset)
687 }
688 }
689
/// This is a workaround until we have cleaned up the chunk/reader/... infrastructure for better
/// async use!
///
/// Ideally BufferedDynamicReader gets replaced so the LruCache maps to `BroadcastFuture<Chunk>`,
/// so that we can properly access it from multiple threads simultaneously while not issuing
/// duplicate simultaneous reads over http.
#[derive(Clone)]
pub struct LocalDynamicReadAt<R: ReadChunk> {
    // Shared, mutex-protected reader; clones of this handle share state.
    inner: Arc<Mutex<BufferedDynamicReader<R>>>,
}
700
701 impl<R: ReadChunk> LocalDynamicReadAt<R> {
702 pub fn new(inner: BufferedDynamicReader<R>) -> Self {
703 Self {
704 inner: Arc::new(Mutex::new(inner)),
705 }
706 }
707 }
708
impl<R: ReadChunk> ReadAt for LocalDynamicReadAt<R> {
    /// Serve the read synchronously by seeking + reading on the shared
    /// buffered reader; always completes immediately (`MaybeReady::Ready`).
    fn start_read_at<'a>(
        self: Pin<&'a Self>,
        _cx: &mut Context,
        buf: &'a mut [u8],
        offset: u64,
    ) -> MaybeReady<std::io::Result<usize>, ReadAtOperation<'a>> {
        use std::io::Read;
        // block_in_place: the inner reader performs blocking I/O, so tell the
        // tokio runtime this thread may block for a while.
        MaybeReady::Ready(tokio::task::block_in_place(move || {
            let mut reader = self.inner.lock().unwrap();
            reader.seek(SeekFrom::Start(offset))?;
            Ok(reader.read(buf)?)
        }))
    }

    /// Never reached: `start_read_at` never returns `MaybeReady::Pending`.
    fn poll_complete<'a>(
        self: Pin<&'a Self>,
        _op: ReadAtOperation<'a>,
    ) -> MaybeReady<std::io::Result<usize>, ReadAtOperation<'a>> {
        panic!("LocalDynamicReadAt::start_read_at returned Pending");
    }
}