]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/dynamic_index.rs
src/backup/backup_info.rs: also list .blob files
[proxmox-backup.git] / src / backup / dynamic_index.rs
1 use failure::*;
2 use std::convert::TryInto;
3
4 use crate::tools;
5 use super::IndexFile;
6 use super::chunk_stat::*;
7 use super::chunk_store::*;
8 use proxmox_protocol::Chunker;
9
10 use std::sync::Arc;
11 use std::io::{Write, BufWriter};
12 use std::fs::File;
13 use std::path::{Path, PathBuf};
14 use std::os::unix::io::AsRawFd;
15 use uuid::Uuid;
16 //use chrono::{Local, TimeZone};
17
18 use crate::tools::io::ops::*;
19 use crate::tools::vec;
20
21 use super::{DataChunk, DataChunkBuilder};
22
/// Header format definition for dynamic index files (`.didx`)
///
/// The header occupies exactly one page (4096 bytes); the rest of the
/// file is a sequence of 40 byte entries (u64 end offset + 32 byte
/// chunk digest), as checked in `DynamicIndexReader::open()`.
#[repr(C)]
pub struct DynamicIndexHeader {
    /// File format magic, compared against
    /// `super::DYNAMIC_SIZED_CHUNK_INDEX_1_0` when opening.
    pub magic: [u8; 8],
    /// Random UUID assigned when the index is created.
    pub uuid: [u8; 16],
    /// Creation time, seconds since the UNIX epoch (little endian on disk).
    pub ctime: u64,
    /// Sha256 over the index ``SHA256(offset1||digest1||offset2||digest2||...)``
    pub index_csum: [u8; 32],
    // 64 bytes of fields + 4030 reserved + 2 bytes tail padding
    // (u64 alignment) = 4096.
    reserved: [u8; 4030], // overall size is one page (4096 bytes)
}
34
35
/// Read access to a dynamic index file, backed by an `mmap()` of the
/// index area.
pub struct DynamicIndexReader {
    store: Arc<ChunkStore>,
    // Kept open to hold the shared flock and back the mapping.
    _file: File,
    /// Total file size in bytes (header + index area).
    pub size: usize,
    filename: PathBuf,
    // Start of the mmap()ed index area; null after unmap().
    index: *const u8,
    // Number of 40 byte entries in the mapped index area.
    index_entries: usize,
    pub uuid: [u8; 16],
    pub ctime: u64,
    pub index_csum: [u8; 32],
}
47
// `index` is mmap()ed which cannot be thread-local so should be sendable
// FIXME: Introduce an mmap wrapper type for this?
//
// SAFETY: the raw `index` pointer is the only non-Send field; it refers
// to a process-wide, read-only (PROT_READ) mapping, so moving the
// reader to another thread is sound.
unsafe impl Send for DynamicIndexReader {}
51
52 impl Drop for DynamicIndexReader {
53
54 fn drop(&mut self) {
55 if let Err(err) = self.unmap() {
56 eprintln!("Unable to unmap file {:?} - {}", self.filename, err);
57 }
58 }
59 }
60
61 impl DynamicIndexReader {
62
63 pub fn open(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> {
64
65 let full_path = store.relative_path(path);
66
67 let mut file = std::fs::File::open(&full_path)?;
68
69 if let Err(err) = nix::fcntl::flock(file.as_raw_fd(), nix::fcntl::FlockArg::LockSharedNonblock) {
70 bail!("unable to get shared lock on {:?} - {}", full_path, err);
71 }
72
73 let header_size = std::mem::size_of::<DynamicIndexHeader>();
74
75 // todo: use static assertion when available in rust
76 if header_size != 4096 { bail!("got unexpected header size for {:?}", path); }
77
78 let buffer = file.read_exact_allocated(header_size)?;
79
80 let header = unsafe { &* (buffer.as_ptr() as *const DynamicIndexHeader) };
81
82 if header.magic != super::DYNAMIC_SIZED_CHUNK_INDEX_1_0 {
83 bail!("got unknown magic number for {:?}", path);
84 }
85
86 let ctime = u64::from_le(header.ctime);
87
88 let rawfd = file.as_raw_fd();
89
90 let stat = match nix::sys::stat::fstat(rawfd) {
91 Ok(stat) => stat,
92 Err(err) => bail!("fstat {:?} failed - {}", path, err),
93 };
94
95 let size = stat.st_size as usize;
96
97 let index_size = size - header_size;
98 if (index_size % 40) != 0 {
99 bail!("got unexpected file size for {:?}", path);
100 }
101
102 let data = unsafe { nix::sys::mman::mmap(
103 std::ptr::null_mut(),
104 index_size,
105 nix::sys::mman::ProtFlags::PROT_READ,
106 nix::sys::mman::MapFlags::MAP_PRIVATE,
107 rawfd,
108 header_size as i64) }? as *const u8;
109
110 Ok(Self {
111 store,
112 filename: full_path,
113 _file: file,
114 size,
115 index: data,
116 index_entries: index_size/40,
117 ctime,
118 uuid: header.uuid,
119 index_csum: header.index_csum,
120 })
121 }
122
123 fn unmap(&mut self) -> Result<(), Error> {
124
125 if self.index == std::ptr::null_mut() { return Ok(()); }
126
127 if let Err(err) = unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, self.index_entries*40) } {
128 bail!("unmap file {:?} failed - {}", self.filename, err);
129 }
130
131 self.index = std::ptr::null_mut();
132
133 Ok(())
134 }
135
136 pub fn chunk_info(&self, pos: usize) -> Result<(u64, u64, [u8; 32]), Error> {
137 if pos >= self.index_entries {
138 bail!("chunk index out of range");
139 }
140 let start = if pos == 0 {
141 0
142 } else {
143 unsafe { *(self.index.add((pos-1)*40) as *const u64) }
144 };
145
146 let end = unsafe { *(self.index.add(pos*40) as *const u64) };
147 let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() };
148 unsafe { std::ptr::copy_nonoverlapping(self.index.add(pos*40+8), digest.as_mut_ptr(), 32); }
149
150 Ok((start, end, digest))
151 }
152
153 #[inline]
154 fn chunk_end(&self, pos: usize) -> u64 {
155 if pos >= self.index_entries {
156 panic!("chunk index out of range");
157 }
158 unsafe { *(self.index.add(pos*40) as *const u64) }
159 }
160
161 #[inline]
162 fn chunk_digest(&self, pos: usize) -> &[u8; 32] {
163 if pos >= self.index_entries {
164 panic!("chunk index out of range");
165 }
166 let slice = unsafe { std::slice::from_raw_parts(self.index.add(pos*40+8), 32) };
167 slice.try_into().unwrap()
168 }
169
170 pub fn mark_used_chunks(&self, _status: &mut GarbageCollectionStatus) -> Result<(), Error> {
171
172 for pos in 0..self.index_entries {
173
174 tools::fail_on_shutdown()?;
175
176 let digest = self.chunk_digest(pos);
177 if let Err(err) = self.store.touch_chunk(digest) {
178 bail!("unable to access chunk {}, required by {:?} - {}",
179 proxmox::tools::digest_to_hex(digest), self.filename, err);
180 }
181 }
182 Ok(())
183 }
184
185 pub fn dump_pxar(&self, mut writer: Box<dyn Write>) -> Result<(), Error> {
186
187 for pos in 0..self.index_entries {
188 let _end = self.chunk_end(pos);
189 let digest = self.chunk_digest(pos);
190 //println!("Dump {:08x}", end );
191 let chunk = self.store.read_chunk(digest)?;
192 // fimxe: handle encrypted chunks
193 let data = chunk.decode(None)?;
194 writer.write_all(&data)?;
195 }
196
197 Ok(())
198 }
199
200 fn binary_search(
201 &self,
202 start_idx: usize,
203 start: u64,
204 end_idx: usize,
205 end: u64,
206 offset: u64
207 ) -> Result<usize, Error> {
208
209 if (offset >= end) || (offset < start) {
210 bail!("offset out of range");
211 }
212
213 if end_idx == start_idx {
214 return Ok(start_idx); // found
215 }
216 let middle_idx = (start_idx + end_idx)/2;
217 let middle_end = self.chunk_end(middle_idx);
218
219 if offset < middle_end {
220 return self.binary_search(start_idx, start, middle_idx, middle_end, offset);
221 } else {
222 return self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset);
223 }
224 }
225 }
226
227 impl IndexFile for DynamicIndexReader {
228 fn index_count(&self) -> usize {
229 self.index_entries
230 }
231
232 fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> {
233 if pos >= self.index_entries {
234 None
235 } else {
236 Some(unsafe {
237 std::mem::transmute(self.chunk_digest(pos).as_ptr())
238 })
239 }
240 }
241 }
242
/// Buffered, seekable reader on top of a `DynamicIndexReader`.
pub struct BufferedDynamicReader {
    index: DynamicIndexReader,
    // Total archive size in bytes (end offset of the last chunk).
    archive_size: u64,
    // Decoded data of the currently buffered chunk.
    read_buffer: Vec<u8>,
    // Index of the chunk currently held in `read_buffer`.
    buffered_chunk_idx: usize,
    // Archive offset of the first byte in `read_buffer`.
    buffered_chunk_start: u64,
    // Current position for the `std::io::Read`/`Seek` implementations.
    read_offset: u64,
}
251
252 impl BufferedDynamicReader {
253
254 pub fn new(index: DynamicIndexReader) -> Self {
255
256 let archive_size = index.chunk_end(index.index_entries - 1);
257 Self {
258 index: index,
259 archive_size: archive_size,
260 read_buffer: Vec::with_capacity(1024*1024),
261 buffered_chunk_idx: 0,
262 buffered_chunk_start: 0,
263 read_offset: 0,
264 }
265 }
266
267 pub fn archive_size(&self) -> u64 { self.archive_size }
268
269 fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
270
271 let index = &self.index;
272 let end = index.chunk_end(idx);
273 let digest = index.chunk_digest(idx);
274
275 let chunk = index.store.read_chunk(digest)?;
276 // fimxe: handle encrypted chunks
277 // fixme: avoid copy
278 let data = chunk.decode(None)?;
279
280 self.read_buffer.clear();
281 self.read_buffer.extend_from_slice(&data);
282
283 self.buffered_chunk_idx = idx;
284 self.buffered_chunk_start = end - (self.read_buffer.len() as u64);
285 //println!("BUFFER {} {}", self.buffered_chunk_start, end);
286 Ok(())
287 }
288 }
289
impl crate::tools::BufferedRead for BufferedDynamicReader {

    /// Return a slice of decoded archive data starting at `offset`.
    ///
    /// The slice extends at most to the end of the chunk containing
    /// `offset`, so callers must call again for further data. Exactly at
    /// end of archive an empty slice is returned.
    fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {

        if offset == self.archive_size { return Ok(&self.read_buffer[0..0]); }

        let buffer_len = self.read_buffer.len();
        let index = &self.index;

        // optimization for sequential read: if the requested offset lies
        // just past the buffered chunk, try its direct successor without
        // searching.
        if buffer_len > 0 &&
            ((self.buffered_chunk_idx + 1) < index.index_entries) &&
            (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
        {
            let next_idx = self.buffered_chunk_idx + 1;
            let next_end = index.chunk_end(next_idx);
            if offset < next_end {
                self.buffer_chunk(next_idx)?;
                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
                return Ok(&self.read_buffer[buffer_offset..]);
            }
        }

        // Random access (or first read): if the offset is outside the
        // buffered range, binary-search the covering chunk by its end
        // offset and load it.
        if (buffer_len == 0) ||
            (offset < self.buffered_chunk_start) ||
            (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
        {
            let end_idx = index.index_entries - 1;
            let end = index.chunk_end(end_idx);
            let idx = index.binary_search(0, 0, end_idx, end, offset)?;
            self.buffer_chunk(idx)?;
        }

        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
        Ok(&self.read_buffer[buffer_offset..])
    }

}
328
329 impl std::io::Read for BufferedDynamicReader {
330
331 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
332
333 use std::io::{Error, ErrorKind};
334 use crate::tools::BufferedRead;
335
336 let data = match self.buffered_read(self.read_offset) {
337 Ok(v) => v,
338 Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
339 };
340
341 let n = if data.len() > buf.len() { buf.len() } else { data.len() };
342
343 unsafe { std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n); }
344
345 self.read_offset += n as u64;
346
347 return Ok(n);
348 }
349 }
350
351 impl std::io::Seek for BufferedDynamicReader {
352
353 fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> {
354
355 use std::io::{SeekFrom};
356
357 let new_offset = match pos {
358 SeekFrom::Start(start_offset) => start_offset as i64,
359 SeekFrom::End(end_offset) => (self.archive_size as i64)+ end_offset,
360 SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
361 };
362
363 use std::io::{Error, ErrorKind};
364 if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
365 return Err(Error::new(
366 ErrorKind::Other,
367 format!("seek is out of range {} ([0..{}])", new_offset, self.archive_size)));
368 }
369 self.read_offset = new_offset as u64;
370
371 Ok(self.read_offset)
372 }
373 }
374
/// Create dynamic index files (`.didx`)
///
/// Data is written to a `*.tmp_didx` file first; `close()` patches the
/// checksum into the header and renames it to the final name.
pub struct DynamicIndexWriter {
    store: Arc<ChunkStore>,
    // Shared process lock on the chunk store, held for the writer's lifetime.
    _lock: tools::ProcessLockSharedGuard,
    writer: BufWriter<File>,
    // Set by close(); guards against double-close and late writes.
    closed: bool,
    filename: PathBuf,
    tmp_filename: PathBuf,
    // Running Sha256 over all index entries; taken (becomes None) by close().
    csum: Option<openssl::sha::Sha256>,
    pub uuid: [u8; 16],
    pub ctime: u64,
}
387
impl Drop for DynamicIndexWriter {

    fn drop(&mut self) {
        // Best-effort cleanup of the temporary file; errors are ignored,
        // e.g. when close() already renamed it to its final name.
        let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
    }
}
394
395 impl DynamicIndexWriter {
396
397 pub fn create(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> {
398
399 let shared_lock = store.try_shared_lock()?;
400
401 let full_path = store.relative_path(path);
402 let mut tmp_path = full_path.clone();
403 tmp_path.set_extension("tmp_didx");
404
405 let file = std::fs::OpenOptions::new()
406 .create(true).truncate(true)
407 .read(true)
408 .write(true)
409 .open(&tmp_path)?;
410
411 let mut writer = BufWriter::with_capacity(1024*1024, file);
412
413 let header_size = std::mem::size_of::<DynamicIndexHeader>();
414
415 // todo: use static assertion when available in rust
416 if header_size != 4096 { panic!("got unexpected header size"); }
417
418 let ctime = std::time::SystemTime::now().duration_since(
419 std::time::SystemTime::UNIX_EPOCH)?.as_secs();
420
421 let uuid = Uuid::new_v4();
422
423 let mut buffer = vec::undefined(header_size);
424 let header = crate::tools::map_struct_mut::<DynamicIndexHeader>(&mut buffer)?;
425
426 header.magic = super::DYNAMIC_SIZED_CHUNK_INDEX_1_0;
427 header.ctime = u64::to_le(ctime);
428 header.uuid = *uuid.as_bytes();
429
430 header.index_csum = [0u8; 32];
431
432 writer.write_all(&buffer)?;
433
434 let csum = Some(openssl::sha::Sha256::new());
435
436 Ok(Self {
437 store,
438 _lock: shared_lock,
439 writer: writer,
440 closed: false,
441 filename: full_path,
442 tmp_filename: tmp_path,
443 ctime,
444 uuid: *uuid.as_bytes(),
445 csum,
446 })
447 }
448
449 // fixme: use add_chunk instead?
450 pub fn insert_chunk(&self, chunk: &DataChunk) -> Result<(bool, u64), Error> {
451 self.store.insert_chunk(chunk)
452 }
453
454 pub fn close(&mut self) -> Result<[u8; 32], Error> {
455
456 if self.closed {
457 bail!("cannot close already closed archive index file {:?}", self.filename);
458 }
459
460 self.closed = true;
461
462 self.writer.flush()?;
463
464 use std::io::Seek;
465
466 let csum_offset = proxmox::tools::offsetof!(DynamicIndexHeader, index_csum);
467 self.writer.seek(std::io::SeekFrom::Start(csum_offset as u64))?;
468
469 let csum = self.csum.take().unwrap();
470 let index_csum = csum.finish();
471
472 self.writer.write_all(&index_csum)?;
473 self.writer.flush()?;
474
475
476 if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
477 bail!("Atomic rename file {:?} failed - {}", self.filename, err);
478 }
479
480 Ok(index_csum)
481 }
482
483 // fixme: rename to add_digest
484 pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), Error> {
485 if self.closed {
486 bail!("cannot write to closed dynamic index file {:?}", self.filename);
487 }
488
489 let offset_le: &[u8; 8] = unsafe { &std::mem::transmute::<u64, [u8;8]>(offset.to_le()) };
490
491 if let Some(ref mut csum) = self.csum {
492 csum.update(offset_le);
493 csum.update(digest);
494 }
495
496 self.writer.write(offset_le)?;
497 self.writer.write(digest)?;
498 Ok(())
499 }
500 }
501
/// Writer which splits a binary stream into dynamic sized chunks
///
/// The resulting chunk list is stored in the underlying index file.
pub struct DynamicChunkWriter {
    index: DynamicIndexWriter,
    // Set by close(); further close() calls become no-ops.
    closed: bool,
    // Content-defined chunk boundary scanner.
    chunker: Chunker,
    // Accumulated statistics (chunk count, sizes, duplicates).
    stat: ChunkStat,
    // Total number of input bytes consumed so far.
    chunk_offset: usize,
    // Value of chunk_offset at which the previous chunk ended.
    last_chunk: usize,
    // Data accumulated for the chunk currently being built.
    chunk_buffer: Vec<u8>,
}
514
515 impl DynamicChunkWriter {
516
517 pub fn new(index: DynamicIndexWriter, chunk_size: usize) -> Self {
518 Self {
519 index,
520 closed: false,
521 chunker: Chunker::new(chunk_size),
522 stat: ChunkStat::new(0),
523 chunk_offset: 0,
524 last_chunk: 0,
525 chunk_buffer: Vec::with_capacity(chunk_size*4),
526 }
527 }
528
529 pub fn stat(&self) -> &ChunkStat {
530 &self.stat
531 }
532
533 pub fn close(&mut self) -> Result<(), Error> {
534
535 if self.closed {
536 return Ok(());
537 }
538
539 self.closed = true;
540
541 self.write_chunk_buffer()?;
542
543 self.index.close()?;
544
545 self.stat.size = self.chunk_offset as u64;
546
547 // add size of index file
548 self.stat.size += (self.stat.chunk_count*40 + std::mem::size_of::<DynamicIndexHeader>()) as u64;
549
550 Ok(())
551 }
552
553 fn write_chunk_buffer(&mut self) -> Result<(), Error> {
554
555 let chunk_size = self.chunk_buffer.len();
556
557 if chunk_size == 0 { return Ok(()); }
558
559 let expected_chunk_size = self.chunk_offset - self.last_chunk;
560 if expected_chunk_size != self.chunk_buffer.len() {
561 bail!("wrong chunk size {} != {}", expected_chunk_size, chunk_size);
562 }
563
564 self.stat.chunk_count += 1;
565
566 self.last_chunk = self.chunk_offset;
567
568 let chunk = DataChunkBuilder::new(&self.chunk_buffer)
569 .compress(true)
570 .build()?;
571
572 let digest = chunk.digest();
573
574 match self.index.insert_chunk(&chunk) {
575 Ok((is_duplicate, compressed_size)) => {
576
577 self.stat.compressed_size += compressed_size;
578 if is_duplicate {
579 self.stat.duplicate_chunks += 1;
580 } else {
581 self.stat.disk_size += compressed_size;
582 }
583
584 println!("ADD CHUNK {:016x} {} {}% {} {}", self.chunk_offset, chunk_size,
585 (compressed_size*100)/(chunk_size as u64), is_duplicate, proxmox::tools::digest_to_hex(digest));
586 self.index.add_chunk(self.chunk_offset as u64, &digest)?;
587 self.chunk_buffer.truncate(0);
588 return Ok(());
589 }
590 Err(err) => {
591 self.chunk_buffer.truncate(0);
592 return Err(err);
593 }
594 }
595 }
596 }
597
598 impl Write for DynamicChunkWriter {
599
600 fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
601
602 let chunker = &mut self.chunker;
603
604 let pos = chunker.scan(data);
605
606 if pos > 0 {
607 self.chunk_buffer.extend(&data[0..pos]);
608 self.chunk_offset += pos;
609
610 if let Err(err) = self.write_chunk_buffer() {
611 return Err(std::io::Error::new(std::io::ErrorKind::Other, err.to_string()));
612 }
613 Ok(pos)
614
615 } else {
616 self.chunk_offset += data.len();
617 self.chunk_buffer.extend(data);
618 Ok(data.len())
619 }
620 }
621
622 fn flush(&mut self) -> std::result::Result<(), std::io::Error> {
623 Err(std::io::Error::new(std::io::ErrorKind::Other, "please use close() instead of flush()"))
624 }
625 }