]> git.proxmox.com Git - proxmox-backup.git/blame - src/backup/dynamic_index.rs
src/backup/*_index.rs: used generated magic numbers
[proxmox-backup.git] / src / backup / dynamic_index.rs
CommitLineData
0433db19 1use failure::*;
f98ac774 2use std::convert::TryInto;
0433db19 3
22968600 4use crate::tools;
7bc1d727 5use super::IndexFile;
7e336555 6use super::chunk_stat::*;
0433db19 7use super::chunk_store::*;
06178f13 8use proxmox_protocol::Chunker;
0433db19 9
1629d2ad 10use std::sync::Arc;
976595e1 11use std::io::{Write, BufWriter};
0433db19
DM
12use std::fs::File;
13use std::path::{Path, PathBuf};
14use std::os::unix::io::AsRawFd;
15use uuid::Uuid;
ddbdf80d 16//use chrono::{Local, TimeZone};
0433db19 17
8ea3b1d1
WB
18use crate::tools::io::ops::*;
19use crate::tools::vec;
20
f98ac774
DM
21use super::{DataChunk, DataChunkBuilder};
22
/// Header format definition for dynamic index files (`.didx`)
///
/// With `#[repr(C)]` the fields are laid out in declaration order. The field
/// sizes add up to exactly one page (8 + 16 + 8 + 32 + 4032 = 4096 bytes), so
/// the struct contains no implicit padding bytes.
#[repr(C)]
pub struct DynamicIndexHeader {
    /// Magic number identifying the file format (8 bytes)
    pub magic: [u8; 8],
    /// Identifier of the index file, stored as raw UUID bytes
    pub uuid: [u8; 16],
    /// Creation time in seconds since the UNIX epoch, stored little endian
    pub ctime: u64,
    /// Sha256 over the index ``SHA256(offset1||digest1||offset2||digest2||...)``
    pub index_csum: [u8; 32],
    /// Unused space that pads the header to one page (4096 bytes)
    reserved: [u8; 4032],
}
34
77703d95 35
93d5d779 36pub struct DynamicIndexReader {
150f1bd8 37 store: Arc<ChunkStore>,
728797d0 38 _file: File,
9f49fe1d 39 pub size: usize,
77703d95
DM
40 filename: PathBuf,
41 index: *const u8,
42 index_entries: usize,
9f49fe1d
DM
43 pub uuid: [u8; 16],
44 pub ctime: u64,
16ff6b7c 45 pub index_csum: [u8; 32],
77703d95
DM
46}
47
5be4065b
WB
48// `index` is mmap()ed which cannot be thread-local so should be sendable
49// FIXME: Introduce an mmap wrapper type for this?
93d5d779 50unsafe impl Send for DynamicIndexReader {}
0b05fd58 51
93d5d779 52impl Drop for DynamicIndexReader {
77703d95
DM
53
54 fn drop(&mut self) {
55 if let Err(err) = self.unmap() {
56 eprintln!("Unable to unmap file {:?} - {}", self.filename, err);
57 }
58 }
59}
60
93d5d779 61impl DynamicIndexReader {
77703d95 62
150f1bd8 63 pub fn open(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> {
77703d95
DM
64
65 let full_path = store.relative_path(path);
66
67 let mut file = std::fs::File::open(&full_path)?;
68
c597a92c
DM
69 if let Err(err) = nix::fcntl::flock(file.as_raw_fd(), nix::fcntl::FlockArg::LockSharedNonblock) {
70 bail!("unable to get shared lock on {:?} - {}", full_path, err);
71 }
72
93d5d779 73 let header_size = std::mem::size_of::<DynamicIndexHeader>();
77703d95
DM
74
75 // todo: use static assertion when available in rust
76 if header_size != 4096 { bail!("got unexpected header size for {:?}", path); }
77
8ea3b1d1 78 let buffer = file.read_exact_allocated(header_size)?;
77703d95 79
8ea3b1d1 80 let header = unsafe { &* (buffer.as_ptr() as *const DynamicIndexHeader) };
77703d95 81
a7dd4830 82 if header.magic != super::DYNAMIC_SIZED_CHUNK_INDEX_1_0 {
77703d95
DM
83 bail!("got unknown magic number for {:?}", path);
84 }
85
77703d95
DM
86 let ctime = u64::from_le(header.ctime);
87
88 let rawfd = file.as_raw_fd();
89
90 let stat = match nix::sys::stat::fstat(rawfd) {
91 Ok(stat) => stat,
92 Err(err) => bail!("fstat {:?} failed - {}", path, err),
93 };
94
95 let size = stat.st_size as usize;
96
ddbdf80d 97 let index_size = size - header_size;
77703d95
DM
98 if (index_size % 40) != 0 {
99 bail!("got unexpected file size for {:?}", path);
100 }
101
102 let data = unsafe { nix::sys::mman::mmap(
103 std::ptr::null_mut(),
104 index_size,
105 nix::sys::mman::ProtFlags::PROT_READ,
106 nix::sys::mman::MapFlags::MAP_PRIVATE,
107 rawfd,
108 header_size as i64) }? as *const u8;
109
77703d95
DM
110 Ok(Self {
111 store,
112 filename: full_path,
728797d0 113 _file: file,
77703d95
DM
114 size,
115 index: data,
116 index_entries: index_size/40,
117 ctime,
118 uuid: header.uuid,
16ff6b7c 119 index_csum: header.index_csum,
77703d95
DM
120 })
121 }
122
123 fn unmap(&mut self) -> Result<(), Error> {
124
125 if self.index == std::ptr::null_mut() { return Ok(()); }
126
b663789b 127 if let Err(err) = unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, self.index_entries*40) } {
77703d95
DM
128 bail!("unmap file {:?} failed - {}", self.filename, err);
129 }
130
131 self.index = std::ptr::null_mut();
132
133 Ok(())
134 }
135
40f4e198
DM
136 pub fn chunk_info(&self, pos: usize) -> Result<(u64, u64, [u8; 32]), Error> {
137 if pos >= self.index_entries {
138 bail!("chunk index out of range");
139 }
140 let start = if pos == 0 {
141 0
142 } else {
143 unsafe { *(self.index.add((pos-1)*40) as *const u64) }
144 };
145
146 let end = unsafe { *(self.index.add(pos*40) as *const u64) };
147 let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() };
148 unsafe { std::ptr::copy_nonoverlapping(self.index.add(pos*40+8), digest.as_mut_ptr(), 32); }
149
150 Ok((start, end, digest))
151 }
152
39c6bd86
DM
153 #[inline]
154 fn chunk_end(&self, pos: usize) -> u64 {
155 if pos >= self.index_entries {
156 panic!("chunk index out of range");
157 }
158 unsafe { *(self.index.add(pos*40) as *const u64) }
159 }
160
161 #[inline]
f98ac774 162 fn chunk_digest(&self, pos: usize) -> &[u8; 32] {
39c6bd86
DM
163 if pos >= self.index_entries {
164 panic!("chunk index out of range");
165 }
f98ac774
DM
166 let slice = unsafe { std::slice::from_raw_parts(self.index.add(pos*40+8), 32) };
167 slice.try_into().unwrap()
39c6bd86
DM
168 }
169
ddbdf80d 170 pub fn mark_used_chunks(&self, _status: &mut GarbageCollectionStatus) -> Result<(), Error> {
77703d95
DM
171
172 for pos in 0..self.index_entries {
92da93b2
DM
173
174 tools::fail_on_shutdown()?;
175
39c6bd86 176 let digest = self.chunk_digest(pos);
77703d95
DM
177 if let Err(err) = self.store.touch_chunk(digest) {
178 bail!("unable to access chunk {}, required by {:?} - {}",
bffd40d6 179 proxmox::tools::digest_to_hex(digest), self.filename, err);
77703d95
DM
180 }
181 }
182 Ok(())
183 }
96df2fb4 184
dd5495d6 185 pub fn dump_pxar(&self, mut writer: Box<dyn Write>) -> Result<(), Error> {
96df2fb4
DM
186
187 for pos in 0..self.index_entries {
ddbdf80d 188 let _end = self.chunk_end(pos);
39c6bd86
DM
189 let digest = self.chunk_digest(pos);
190 //println!("Dump {:08x}", end );
f98ac774
DM
191 let chunk = self.store.read_chunk(digest)?;
192 // fimxe: handle encrypted chunks
193 let data = chunk.decode(None)?;
194 writer.write_all(&data)?;
96df2fb4
DM
195 }
196
197 Ok(())
198 }
39c6bd86
DM
199
200 fn binary_search(
201 &self,
202 start_idx: usize,
203 start: u64,
204 end_idx: usize,
205 end: u64,
206 offset: u64
207 ) -> Result<usize, Error> {
208
209 if (offset >= end) || (offset < start) {
210 bail!("offset out of range");
211 }
212
213 if end_idx == start_idx {
214 return Ok(start_idx); // found
215 }
216 let middle_idx = (start_idx + end_idx)/2;
217 let middle_end = self.chunk_end(middle_idx);
218
219 if offset < middle_end {
220 return self.binary_search(start_idx, start, middle_idx, middle_end, offset);
221 } else {
222 return self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset);
223 }
224 }
225}
226
7bc1d727
WB
227impl IndexFile for DynamicIndexReader {
228 fn index_count(&self) -> usize {
229 self.index_entries
230 }
231
232 fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> {
233 if pos >= self.index_entries {
234 None
235 } else {
236 Some(unsafe {
237 std::mem::transmute(self.chunk_digest(pos).as_ptr())
238 })
239 }
240 }
241}
242
93d5d779
DM
243pub struct BufferedDynamicReader {
244 index: DynamicIndexReader,
39c6bd86
DM
245 archive_size: u64,
246 read_buffer: Vec<u8>,
247 buffered_chunk_idx: usize,
248 buffered_chunk_start: u64,
249 read_offset: u64,
77703d95
DM
250}
251
93d5d779 252impl BufferedDynamicReader {
39c6bd86 253
93d5d779 254 pub fn new(index: DynamicIndexReader) -> Self {
39c6bd86
DM
255
256 let archive_size = index.chunk_end(index.index_entries - 1);
257 Self {
258 index: index,
259 archive_size: archive_size,
260 read_buffer: Vec::with_capacity(1024*1024),
261 buffered_chunk_idx: 0,
262 buffered_chunk_start: 0,
263 read_offset: 0,
264 }
265 }
266
267 pub fn archive_size(&self) -> u64 { self.archive_size }
268
0a72e267
DM
269 fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
270
0b05fd58 271 let index = &self.index;
0a72e267
DM
272 let end = index.chunk_end(idx);
273 let digest = index.chunk_digest(idx);
f98ac774
DM
274
275 let chunk = index.store.read_chunk(digest)?;
276 // fimxe: handle encrypted chunks
277 // fixme: avoid copy
278 let data = chunk.decode(None)?;
279
280 self.read_buffer.clear();
281 self.read_buffer.extend_from_slice(&data);
0a72e267
DM
282
283 self.buffered_chunk_idx = idx;
284 self.buffered_chunk_start = end - (self.read_buffer.len() as u64);
285 //println!("BUFFER {} {}", self.buffered_chunk_start, end);
286 Ok(())
287 }
288}
289
fded74d0 290impl crate::tools::BufferedRead for BufferedDynamicReader {
0a72e267
DM
291
292 fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
39c6bd86 293
318564ac
DM
294 if offset == self.archive_size { return Ok(&self.read_buffer[0..0]); }
295
39c6bd86 296 let buffer_len = self.read_buffer.len();
0b05fd58 297 let index = &self.index;
39c6bd86
DM
298
299 // optimization for sequential read
300 if buffer_len > 0 &&
301 ((self.buffered_chunk_idx + 1) < index.index_entries) &&
302 (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
303 {
304 let next_idx = self.buffered_chunk_idx + 1;
305 let next_end = index.chunk_end(next_idx);
306 if offset < next_end {
373ef4a5 307 self.buffer_chunk(next_idx)?;
39c6bd86
DM
308 let buffer_offset = (offset - self.buffered_chunk_start) as usize;
309 return Ok(&self.read_buffer[buffer_offset..]);
310 }
311 }
312
313 if (buffer_len == 0) ||
314 (offset < self.buffered_chunk_start) ||
315 (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
316 {
317 let end_idx = index.index_entries - 1;
318 let end = index.chunk_end(end_idx);
319 let idx = index.binary_search(0, 0, end_idx, end, offset)?;
373ef4a5 320 self.buffer_chunk(idx)?;
39c6bd86
DM
321 }
322
323 let buffer_offset = (offset - self.buffered_chunk_start) as usize;
324 Ok(&self.read_buffer[buffer_offset..])
325 }
326
39c6bd86 327}
77703d95 328
93d5d779 329impl std::io::Read for BufferedDynamicReader {
4624fe29
DM
330
331 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
332
333 use std::io::{Error, ErrorKind};
fded74d0 334 use crate::tools::BufferedRead;
4624fe29
DM
335
336 let data = match self.buffered_read(self.read_offset) {
337 Ok(v) => v,
338 Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
339 };
340
341 let n = if data.len() > buf.len() { buf.len() } else { data.len() };
342
343 unsafe { std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n); }
344
345 self.read_offset += n as u64;
346
347 return Ok(n);
348 }
349}
350
93d5d779 351impl std::io::Seek for BufferedDynamicReader {
34337050
DM
352
353 fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> {
354
ddbdf80d 355 use std::io::{SeekFrom};
34337050
DM
356
357 let new_offset = match pos {
358 SeekFrom::Start(start_offset) => start_offset as i64,
359 SeekFrom::End(end_offset) => (self.archive_size as i64)+ end_offset,
360 SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
361 };
362
ddbdf80d 363 use std::io::{Error, ErrorKind};
34337050
DM
364 if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
365 return Err(Error::new(
366 ErrorKind::Other,
367 format!("seek is out of range {} ([0..{}])", new_offset, self.archive_size)));
368 }
369 self.read_offset = new_offset as u64;
370
371 Ok(self.read_offset)
372 }
373}
374
976595e1 375/// Create dynamic index files (`.dixd`)
93d5d779 376pub struct DynamicIndexWriter {
1629d2ad 377 store: Arc<ChunkStore>,
43b13033 378 _lock: tools::ProcessLockSharedGuard,
5032b57b 379 writer: BufWriter<File>,
5e7a09be 380 closed: bool,
0433db19
DM
381 filename: PathBuf,
382 tmp_filename: PathBuf,
16ff6b7c 383 csum: Option<openssl::sha::Sha256>,
9f49fe1d
DM
384 pub uuid: [u8; 16],
385 pub ctime: u64,
0433db19
DM
386}
387
93d5d779 388impl Drop for DynamicIndexWriter {
1629d2ad
DM
389
390 fn drop(&mut self) {
391 let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
392 }
393}
394
93d5d779 395impl DynamicIndexWriter {
0433db19 396
976595e1 397 pub fn create(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> {
0433db19 398
43b13033
DM
399 let shared_lock = store.try_shared_lock()?;
400
0433db19
DM
401 let full_path = store.relative_path(path);
402 let mut tmp_path = full_path.clone();
93d5d779 403 tmp_path.set_extension("tmp_didx");
0433db19 404
ddbdf80d 405 let file = std::fs::OpenOptions::new()
0433db19
DM
406 .create(true).truncate(true)
407 .read(true)
408 .write(true)
409 .open(&tmp_path)?;
410
5032b57b
DM
411 let mut writer = BufWriter::with_capacity(1024*1024, file);
412
93d5d779 413 let header_size = std::mem::size_of::<DynamicIndexHeader>();
0433db19
DM
414
415 // todo: use static assertion when available in rust
416 if header_size != 4096 { panic!("got unexpected header size"); }
417
418 let ctime = std::time::SystemTime::now().duration_since(
419 std::time::SystemTime::UNIX_EPOCH)?.as_secs();
420
421 let uuid = Uuid::new_v4();
422
8ea3b1d1 423 let mut buffer = vec::undefined(header_size);
93d5d779 424 let header = crate::tools::map_struct_mut::<DynamicIndexHeader>(&mut buffer)?;
0433db19 425
a7dd4830 426 header.magic = super::DYNAMIC_SIZED_CHUNK_INDEX_1_0;
0433db19
DM
427 header.ctime = u64::to_le(ctime);
428 header.uuid = *uuid.as_bytes();
429
16ff6b7c
DM
430 header.index_csum = [0u8; 32];
431
5032b57b 432 writer.write_all(&buffer)?;
0433db19 433
16ff6b7c
DM
434 let csum = Some(openssl::sha::Sha256::new());
435
0433db19
DM
436 Ok(Self {
437 store,
43b13033 438 _lock: shared_lock,
5032b57b 439 writer: writer,
5e7a09be 440 closed: false,
0433db19
DM
441 filename: full_path,
442 tmp_filename: tmp_path,
443 ctime,
444 uuid: *uuid.as_bytes(),
16ff6b7c 445 csum,
0433db19
DM
446 })
447 }
5e7a09be 448
f98ac774
DM
449 // fixme: use add_chunk instead?
450 pub fn insert_chunk(&self, chunk: &DataChunk) -> Result<(bool, u64), Error> {
976595e1
DM
451 self.store.insert_chunk(chunk)
452 }
453
16ff6b7c 454 pub fn close(&mut self) -> Result<[u8; 32], Error> {
5e7a09be
DM
455
456 if self.closed {
457 bail!("cannot close already closed archive index file {:?}", self.filename);
458 }
459
460 self.closed = true;
461
5032b57b 462 self.writer.flush()?;
5e7a09be 463
16ff6b7c
DM
464 use std::io::Seek;
465
466 let csum_offset = proxmox::tools::offsetof!(DynamicIndexHeader, index_csum);
467 self.writer.seek(std::io::SeekFrom::Start(csum_offset as u64))?;
468
469 let csum = self.csum.take().unwrap();
470 let index_csum = csum.finish();
471
472 self.writer.write_all(&index_csum)?;
473 self.writer.flush()?;
474
475
5e7a09be
DM
476 if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
477 bail!("Atomic rename file {:?} failed - {}", self.filename, err);
478 }
479
16ff6b7c 480 Ok(index_csum)
5e7a09be
DM
481 }
482
976595e1
DM
483 // fixme: rename to add_digest
484 pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), Error> {
485 if self.closed {
486 bail!("cannot write to closed dynamic index file {:?}", self.filename);
487 }
16ff6b7c
DM
488
489 let offset_le: &[u8; 8] = unsafe { &std::mem::transmute::<u64, [u8;8]>(offset.to_le()) };
490
491 if let Some(ref mut csum) = self.csum {
492 csum.update(offset_le);
493 csum.update(digest);
494 }
495
496 self.writer.write(offset_le)?;
976595e1
DM
497 self.writer.write(digest)?;
498 Ok(())
499 }
500}
501
502/// Writer which splits a binary stream into dynamic sized chunks
503///
504/// And store the resulting chunk list into the index file.
505pub struct DynamicChunkWriter {
506 index: DynamicIndexWriter,
507 closed: bool,
508 chunker: Chunker,
509 stat: ChunkStat,
510 chunk_offset: usize,
511 last_chunk: usize,
512 chunk_buffer: Vec<u8>,
513}
514
515impl DynamicChunkWriter {
516
517 pub fn new(index: DynamicIndexWriter, chunk_size: usize) -> Self {
518 Self {
519 index,
520 closed: false,
521 chunker: Chunker::new(chunk_size),
522 stat: ChunkStat::new(0),
523 chunk_offset: 0,
524 last_chunk: 0,
525 chunk_buffer: Vec::with_capacity(chunk_size*4),
526 }
527 }
528
7e336555
DM
529 pub fn stat(&self) -> &ChunkStat {
530 &self.stat
531 }
532
976595e1 533 pub fn close(&mut self) -> Result<(), Error> {
5e7a09be 534
976595e1
DM
535 if self.closed {
536 return Ok(());
537 }
538
539 self.closed = true;
540
541 self.write_chunk_buffer()?;
542
543 self.index.close()?;
544
545 self.stat.size = self.chunk_offset as u64;
546
547 // add size of index file
548 self.stat.size += (self.stat.chunk_count*40 + std::mem::size_of::<DynamicIndexHeader>()) as u64;
549
550 Ok(())
551 }
552
553 fn write_chunk_buffer(&mut self) -> Result<(), Error> {
5e7a09be
DM
554
555 let chunk_size = self.chunk_buffer.len();
556
557 if chunk_size == 0 { return Ok(()); }
558
559 let expected_chunk_size = self.chunk_offset - self.last_chunk;
560 if expected_chunk_size != self.chunk_buffer.len() {
976595e1 561 bail!("wrong chunk size {} != {}", expected_chunk_size, chunk_size);
5e7a09be
DM
562 }
563
7e336555 564 self.stat.chunk_count += 1;
247cdbce 565
5e7a09be
DM
566 self.last_chunk = self.chunk_offset;
567
f98ac774
DM
568 let chunk = DataChunkBuilder::new(&self.chunk_buffer)
569 .compress(true)
570 .build()?;
571
572 let digest = chunk.digest();
573
574 match self.index.insert_chunk(&chunk) {
575 Ok((is_duplicate, compressed_size)) => {
798f7fa0 576
7e336555 577 self.stat.compressed_size += compressed_size;
798f7fa0 578 if is_duplicate {
7e336555 579 self.stat.duplicate_chunks += 1;
798f7fa0 580 } else {
7e336555 581 self.stat.disk_size += compressed_size;
798f7fa0
DM
582 }
583
584 println!("ADD CHUNK {:016x} {} {}% {} {}", self.chunk_offset, chunk_size,
bffd40d6 585 (compressed_size*100)/(chunk_size as u64), is_duplicate, proxmox::tools::digest_to_hex(digest));
976595e1 586 self.index.add_chunk(self.chunk_offset as u64, &digest)?;
5e7a09be
DM
587 self.chunk_buffer.truncate(0);
588 return Ok(());
589 }
590 Err(err) => {
591 self.chunk_buffer.truncate(0);
976595e1 592 return Err(err);
5e7a09be
DM
593 }
594 }
5e7a09be 595 }
0433db19
DM
596}
597
976595e1 598impl Write for DynamicChunkWriter {
0433db19
DM
599
600 fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
601
0433db19
DM
602 let chunker = &mut self.chunker;
603
604 let pos = chunker.scan(data);
605
606 if pos > 0 {
607 self.chunk_buffer.extend(&data[0..pos]);
608 self.chunk_offset += pos;
609
976595e1
DM
610 if let Err(err) = self.write_chunk_buffer() {
611 return Err(std::io::Error::new(std::io::ErrorKind::Other, err.to_string()));
612 }
5e7a09be 613 Ok(pos)
0433db19
DM
614
615 } else {
616 self.chunk_offset += data.len();
617 self.chunk_buffer.extend(data);
5e7a09be 618 Ok(data.len())
0433db19
DM
619 }
620 }
621
622 fn flush(&mut self) -> std::result::Result<(), std::io::Error> {
976595e1 623 Err(std::io::Error::new(std::io::ErrorKind::Other, "please use close() instead of flush()"))
0433db19
DM
624 }
625}