]> git.proxmox.com Git - proxmox-backup.git/blame - src/backup/dynamic_index.rs
src/api2/backup/environment.rs: log duplicate chunks
[proxmox-backup.git] / src / backup / dynamic_index.rs
CommitLineData
0433db19 1use failure::*;
f98ac774 2use std::convert::TryInto;
0433db19 3
22968600 4use crate::tools;
7bc1d727 5use super::IndexFile;
7e336555 6use super::chunk_stat::*;
0433db19 7use super::chunk_store::*;
06178f13 8use proxmox_protocol::Chunker;
0433db19 9
1629d2ad 10use std::sync::Arc;
976595e1 11use std::io::{Write, BufWriter};
0433db19
DM
12use std::fs::File;
13use std::path::{Path, PathBuf};
14use std::os::unix::io::AsRawFd;
15use uuid::Uuid;
ddbdf80d 16//use chrono::{Local, TimeZone};
0433db19 17
8ea3b1d1
WB
18use crate::tools::io::ops::*;
19use crate::tools::vec;
20
f98ac774
DM
21use super::{DataChunk, DataChunkBuilder};
22
/// Header format definition for dynamic index files (`.didx`)
///
/// The header occupies exactly one page (4096 bytes) at the start of the file
/// and is followed by the chunk table. Integer fields are stored little-endian.
#[repr(C)]
pub struct DynamicIndexHeader {
    /// The string `PROXMOX-DIDX`
    pub magic: [u8; 12],
    /// Format version (little-endian); only version `1` is accepted by the reader
    pub version: u32,
    /// UUID identifying this index file
    pub uuid: [u8; 16],
    /// Creation time in seconds since the UNIX epoch (little-endian)
    pub ctime: u64,
    reserved: [u8; 4056], // overall size is one page (4096 bytes)
}
33
77703d95 34
93d5d779 35pub struct DynamicIndexReader {
150f1bd8 36 store: Arc<ChunkStore>,
728797d0 37 _file: File,
9f49fe1d 38 pub size: usize,
77703d95
DM
39 filename: PathBuf,
40 index: *const u8,
41 index_entries: usize,
9f49fe1d
DM
42 pub uuid: [u8; 16],
43 pub ctime: u64,
77703d95
DM
44}
45
5be4065b
WB
46// `index` is mmap()ed which cannot be thread-local so should be sendable
47// FIXME: Introduce an mmap wrapper type for this?
93d5d779 48unsafe impl Send for DynamicIndexReader {}
0b05fd58 49
93d5d779 50impl Drop for DynamicIndexReader {
77703d95
DM
51
52 fn drop(&mut self) {
53 if let Err(err) = self.unmap() {
54 eprintln!("Unable to unmap file {:?} - {}", self.filename, err);
55 }
56 }
57}
58
93d5d779 59impl DynamicIndexReader {
77703d95 60
150f1bd8 61 pub fn open(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> {
77703d95
DM
62
63 let full_path = store.relative_path(path);
64
65 let mut file = std::fs::File::open(&full_path)?;
66
c597a92c
DM
67 if let Err(err) = nix::fcntl::flock(file.as_raw_fd(), nix::fcntl::FlockArg::LockSharedNonblock) {
68 bail!("unable to get shared lock on {:?} - {}", full_path, err);
69 }
70
93d5d779 71 let header_size = std::mem::size_of::<DynamicIndexHeader>();
77703d95
DM
72
73 // todo: use static assertion when available in rust
74 if header_size != 4096 { bail!("got unexpected header size for {:?}", path); }
75
8ea3b1d1 76 let buffer = file.read_exact_allocated(header_size)?;
77703d95 77
8ea3b1d1 78 let header = unsafe { &* (buffer.as_ptr() as *const DynamicIndexHeader) };
77703d95 79
93d5d779 80 if header.magic != *b"PROXMOX-DIDX" {
77703d95
DM
81 bail!("got unknown magic number for {:?}", path);
82 }
83
84 let version = u32::from_le(header.version);
85 if version != 1 {
86 bail!("got unsupported version number ({}) for {:?}", version, path);
87 }
88
89 let ctime = u64::from_le(header.ctime);
90
91 let rawfd = file.as_raw_fd();
92
93 let stat = match nix::sys::stat::fstat(rawfd) {
94 Ok(stat) => stat,
95 Err(err) => bail!("fstat {:?} failed - {}", path, err),
96 };
97
98 let size = stat.st_size as usize;
99
ddbdf80d 100 let index_size = size - header_size;
77703d95
DM
101 if (index_size % 40) != 0 {
102 bail!("got unexpected file size for {:?}", path);
103 }
104
105 let data = unsafe { nix::sys::mman::mmap(
106 std::ptr::null_mut(),
107 index_size,
108 nix::sys::mman::ProtFlags::PROT_READ,
109 nix::sys::mman::MapFlags::MAP_PRIVATE,
110 rawfd,
111 header_size as i64) }? as *const u8;
112
77703d95
DM
113 Ok(Self {
114 store,
115 filename: full_path,
728797d0 116 _file: file,
77703d95
DM
117 size,
118 index: data,
119 index_entries: index_size/40,
120 ctime,
121 uuid: header.uuid,
122 })
123 }
124
125 fn unmap(&mut self) -> Result<(), Error> {
126
127 if self.index == std::ptr::null_mut() { return Ok(()); }
128
b663789b 129 if let Err(err) = unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, self.index_entries*40) } {
77703d95
DM
130 bail!("unmap file {:?} failed - {}", self.filename, err);
131 }
132
133 self.index = std::ptr::null_mut();
134
135 Ok(())
136 }
137
40f4e198
DM
138 pub fn chunk_info(&self, pos: usize) -> Result<(u64, u64, [u8; 32]), Error> {
139 if pos >= self.index_entries {
140 bail!("chunk index out of range");
141 }
142 let start = if pos == 0 {
143 0
144 } else {
145 unsafe { *(self.index.add((pos-1)*40) as *const u64) }
146 };
147
148 let end = unsafe { *(self.index.add(pos*40) as *const u64) };
149 let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() };
150 unsafe { std::ptr::copy_nonoverlapping(self.index.add(pos*40+8), digest.as_mut_ptr(), 32); }
151
152 Ok((start, end, digest))
153 }
154
39c6bd86
DM
155 #[inline]
156 fn chunk_end(&self, pos: usize) -> u64 {
157 if pos >= self.index_entries {
158 panic!("chunk index out of range");
159 }
160 unsafe { *(self.index.add(pos*40) as *const u64) }
161 }
162
163 #[inline]
f98ac774 164 fn chunk_digest(&self, pos: usize) -> &[u8; 32] {
39c6bd86
DM
165 if pos >= self.index_entries {
166 panic!("chunk index out of range");
167 }
f98ac774
DM
168 let slice = unsafe { std::slice::from_raw_parts(self.index.add(pos*40+8), 32) };
169 slice.try_into().unwrap()
39c6bd86
DM
170 }
171
ddbdf80d 172 pub fn mark_used_chunks(&self, _status: &mut GarbageCollectionStatus) -> Result<(), Error> {
77703d95
DM
173
174 for pos in 0..self.index_entries {
92da93b2
DM
175
176 tools::fail_on_shutdown()?;
177
39c6bd86 178 let digest = self.chunk_digest(pos);
77703d95
DM
179 if let Err(err) = self.store.touch_chunk(digest) {
180 bail!("unable to access chunk {}, required by {:?} - {}",
22968600 181 tools::digest_to_hex(digest), self.filename, err);
77703d95
DM
182 }
183 }
184 Ok(())
185 }
96df2fb4 186
dd5495d6 187 pub fn dump_pxar(&self, mut writer: Box<dyn Write>) -> Result<(), Error> {
96df2fb4
DM
188
189 for pos in 0..self.index_entries {
ddbdf80d 190 let _end = self.chunk_end(pos);
39c6bd86
DM
191 let digest = self.chunk_digest(pos);
192 //println!("Dump {:08x}", end );
f98ac774
DM
193 let chunk = self.store.read_chunk(digest)?;
194 // fimxe: handle encrypted chunks
195 let data = chunk.decode(None)?;
196 writer.write_all(&data)?;
96df2fb4
DM
197 }
198
199 Ok(())
200 }
39c6bd86
DM
201
202 fn binary_search(
203 &self,
204 start_idx: usize,
205 start: u64,
206 end_idx: usize,
207 end: u64,
208 offset: u64
209 ) -> Result<usize, Error> {
210
211 if (offset >= end) || (offset < start) {
212 bail!("offset out of range");
213 }
214
215 if end_idx == start_idx {
216 return Ok(start_idx); // found
217 }
218 let middle_idx = (start_idx + end_idx)/2;
219 let middle_end = self.chunk_end(middle_idx);
220
221 if offset < middle_end {
222 return self.binary_search(start_idx, start, middle_idx, middle_end, offset);
223 } else {
224 return self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset);
225 }
226 }
227}
228
7bc1d727
WB
229impl IndexFile for DynamicIndexReader {
230 fn index_count(&self) -> usize {
231 self.index_entries
232 }
233
234 fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> {
235 if pos >= self.index_entries {
236 None
237 } else {
238 Some(unsafe {
239 std::mem::transmute(self.chunk_digest(pos).as_ptr())
240 })
241 }
242 }
243}
244
93d5d779
DM
245pub struct BufferedDynamicReader {
246 index: DynamicIndexReader,
39c6bd86
DM
247 archive_size: u64,
248 read_buffer: Vec<u8>,
249 buffered_chunk_idx: usize,
250 buffered_chunk_start: u64,
251 read_offset: u64,
77703d95
DM
252}
253
93d5d779 254impl BufferedDynamicReader {
39c6bd86 255
93d5d779 256 pub fn new(index: DynamicIndexReader) -> Self {
39c6bd86
DM
257
258 let archive_size = index.chunk_end(index.index_entries - 1);
259 Self {
260 index: index,
261 archive_size: archive_size,
262 read_buffer: Vec::with_capacity(1024*1024),
263 buffered_chunk_idx: 0,
264 buffered_chunk_start: 0,
265 read_offset: 0,
266 }
267 }
268
269 pub fn archive_size(&self) -> u64 { self.archive_size }
270
0a72e267
DM
271 fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
272
0b05fd58 273 let index = &self.index;
0a72e267
DM
274 let end = index.chunk_end(idx);
275 let digest = index.chunk_digest(idx);
f98ac774
DM
276
277 let chunk = index.store.read_chunk(digest)?;
278 // fimxe: handle encrypted chunks
279 // fixme: avoid copy
280 let data = chunk.decode(None)?;
281
282 self.read_buffer.clear();
283 self.read_buffer.extend_from_slice(&data);
0a72e267
DM
284
285 self.buffered_chunk_idx = idx;
286 self.buffered_chunk_start = end - (self.read_buffer.len() as u64);
287 //println!("BUFFER {} {}", self.buffered_chunk_start, end);
288 Ok(())
289 }
290}
291
fded74d0 292impl crate::tools::BufferedRead for BufferedDynamicReader {
0a72e267
DM
293
294 fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
39c6bd86 295
318564ac
DM
296 if offset == self.archive_size { return Ok(&self.read_buffer[0..0]); }
297
39c6bd86 298 let buffer_len = self.read_buffer.len();
0b05fd58 299 let index = &self.index;
39c6bd86
DM
300
301 // optimization for sequential read
302 if buffer_len > 0 &&
303 ((self.buffered_chunk_idx + 1) < index.index_entries) &&
304 (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
305 {
306 let next_idx = self.buffered_chunk_idx + 1;
307 let next_end = index.chunk_end(next_idx);
308 if offset < next_end {
373ef4a5 309 self.buffer_chunk(next_idx)?;
39c6bd86
DM
310 let buffer_offset = (offset - self.buffered_chunk_start) as usize;
311 return Ok(&self.read_buffer[buffer_offset..]);
312 }
313 }
314
315 if (buffer_len == 0) ||
316 (offset < self.buffered_chunk_start) ||
317 (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
318 {
319 let end_idx = index.index_entries - 1;
320 let end = index.chunk_end(end_idx);
321 let idx = index.binary_search(0, 0, end_idx, end, offset)?;
373ef4a5 322 self.buffer_chunk(idx)?;
39c6bd86
DM
323 }
324
325 let buffer_offset = (offset - self.buffered_chunk_start) as usize;
326 Ok(&self.read_buffer[buffer_offset..])
327 }
328
39c6bd86 329}
77703d95 330
93d5d779 331impl std::io::Read for BufferedDynamicReader {
4624fe29
DM
332
333 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
334
335 use std::io::{Error, ErrorKind};
fded74d0 336 use crate::tools::BufferedRead;
4624fe29
DM
337
338 let data = match self.buffered_read(self.read_offset) {
339 Ok(v) => v,
340 Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
341 };
342
343 let n = if data.len() > buf.len() { buf.len() } else { data.len() };
344
345 unsafe { std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n); }
346
347 self.read_offset += n as u64;
348
349 return Ok(n);
350 }
351}
352
93d5d779 353impl std::io::Seek for BufferedDynamicReader {
34337050
DM
354
355 fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> {
356
ddbdf80d 357 use std::io::{SeekFrom};
34337050
DM
358
359 let new_offset = match pos {
360 SeekFrom::Start(start_offset) => start_offset as i64,
361 SeekFrom::End(end_offset) => (self.archive_size as i64)+ end_offset,
362 SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
363 };
364
ddbdf80d 365 use std::io::{Error, ErrorKind};
34337050
DM
366 if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
367 return Err(Error::new(
368 ErrorKind::Other,
369 format!("seek is out of range {} ([0..{}])", new_offset, self.archive_size)));
370 }
371 self.read_offset = new_offset as u64;
372
373 Ok(self.read_offset)
374 }
375}
376
976595e1 377/// Create dynamic index files (`.dixd`)
93d5d779 378pub struct DynamicIndexWriter {
1629d2ad 379 store: Arc<ChunkStore>,
43b13033 380 _lock: tools::ProcessLockSharedGuard,
5032b57b 381 writer: BufWriter<File>,
5e7a09be 382 closed: bool,
0433db19
DM
383 filename: PathBuf,
384 tmp_filename: PathBuf,
9f49fe1d
DM
385 pub uuid: [u8; 16],
386 pub ctime: u64,
0433db19
DM
387}
388
93d5d779 389impl Drop for DynamicIndexWriter {
1629d2ad
DM
390
391 fn drop(&mut self) {
392 let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
393 }
394}
395
93d5d779 396impl DynamicIndexWriter {
0433db19 397
976595e1 398 pub fn create(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> {
0433db19 399
43b13033
DM
400 let shared_lock = store.try_shared_lock()?;
401
0433db19
DM
402 let full_path = store.relative_path(path);
403 let mut tmp_path = full_path.clone();
93d5d779 404 tmp_path.set_extension("tmp_didx");
0433db19 405
ddbdf80d 406 let file = std::fs::OpenOptions::new()
0433db19
DM
407 .create(true).truncate(true)
408 .read(true)
409 .write(true)
410 .open(&tmp_path)?;
411
5032b57b
DM
412 let mut writer = BufWriter::with_capacity(1024*1024, file);
413
93d5d779 414 let header_size = std::mem::size_of::<DynamicIndexHeader>();
0433db19
DM
415
416 // todo: use static assertion when available in rust
417 if header_size != 4096 { panic!("got unexpected header size"); }
418
419 let ctime = std::time::SystemTime::now().duration_since(
420 std::time::SystemTime::UNIX_EPOCH)?.as_secs();
421
422 let uuid = Uuid::new_v4();
423
8ea3b1d1 424 let mut buffer = vec::undefined(header_size);
93d5d779 425 let header = crate::tools::map_struct_mut::<DynamicIndexHeader>(&mut buffer)?;
0433db19 426
93d5d779 427 header.magic = *b"PROXMOX-DIDX";
0433db19
DM
428 header.version = u32::to_le(1);
429 header.ctime = u64::to_le(ctime);
430 header.uuid = *uuid.as_bytes();
431
5032b57b 432 writer.write_all(&buffer)?;
0433db19
DM
433
434 Ok(Self {
435 store,
43b13033 436 _lock: shared_lock,
5032b57b 437 writer: writer,
5e7a09be 438 closed: false,
0433db19
DM
439 filename: full_path,
440 tmp_filename: tmp_path,
441 ctime,
442 uuid: *uuid.as_bytes(),
0433db19
DM
443 })
444 }
5e7a09be 445
f98ac774
DM
446 // fixme: use add_chunk instead?
447 pub fn insert_chunk(&self, chunk: &DataChunk) -> Result<(bool, u64), Error> {
976595e1
DM
448 self.store.insert_chunk(chunk)
449 }
450
5e7a09be
DM
451 pub fn close(&mut self) -> Result<(), Error> {
452
453 if self.closed {
454 bail!("cannot close already closed archive index file {:?}", self.filename);
455 }
456
457 self.closed = true;
458
5032b57b 459 self.writer.flush()?;
5e7a09be 460
5e7a09be
DM
461 if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
462 bail!("Atomic rename file {:?} failed - {}", self.filename, err);
463 }
464
465 Ok(())
466 }
467
976595e1
DM
468 // fixme: rename to add_digest
469 pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), Error> {
470 if self.closed {
471 bail!("cannot write to closed dynamic index file {:?}", self.filename);
472 }
473 self.writer.write(unsafe { &std::mem::transmute::<u64, [u8;8]>(offset.to_le()) })?;
474 self.writer.write(digest)?;
475 Ok(())
476 }
477}
478
479/// Writer which splits a binary stream into dynamic sized chunks
480///
481/// And store the resulting chunk list into the index file.
482pub struct DynamicChunkWriter {
483 index: DynamicIndexWriter,
484 closed: bool,
485 chunker: Chunker,
486 stat: ChunkStat,
487 chunk_offset: usize,
488 last_chunk: usize,
489 chunk_buffer: Vec<u8>,
490}
491
492impl DynamicChunkWriter {
493
494 pub fn new(index: DynamicIndexWriter, chunk_size: usize) -> Self {
495 Self {
496 index,
497 closed: false,
498 chunker: Chunker::new(chunk_size),
499 stat: ChunkStat::new(0),
500 chunk_offset: 0,
501 last_chunk: 0,
502 chunk_buffer: Vec::with_capacity(chunk_size*4),
503 }
504 }
505
7e336555
DM
506 pub fn stat(&self) -> &ChunkStat {
507 &self.stat
508 }
509
976595e1 510 pub fn close(&mut self) -> Result<(), Error> {
5e7a09be 511
976595e1
DM
512 if self.closed {
513 return Ok(());
514 }
515
516 self.closed = true;
517
518 self.write_chunk_buffer()?;
519
520 self.index.close()?;
521
522 self.stat.size = self.chunk_offset as u64;
523
524 // add size of index file
525 self.stat.size += (self.stat.chunk_count*40 + std::mem::size_of::<DynamicIndexHeader>()) as u64;
526
527 Ok(())
528 }
529
530 fn write_chunk_buffer(&mut self) -> Result<(), Error> {
5e7a09be
DM
531
532 let chunk_size = self.chunk_buffer.len();
533
534 if chunk_size == 0 { return Ok(()); }
535
536 let expected_chunk_size = self.chunk_offset - self.last_chunk;
537 if expected_chunk_size != self.chunk_buffer.len() {
976595e1 538 bail!("wrong chunk size {} != {}", expected_chunk_size, chunk_size);
5e7a09be
DM
539 }
540
7e336555 541 self.stat.chunk_count += 1;
247cdbce 542
5e7a09be
DM
543 self.last_chunk = self.chunk_offset;
544
f98ac774
DM
545 let chunk = DataChunkBuilder::new(&self.chunk_buffer)
546 .compress(true)
547 .build()?;
548
549 let digest = chunk.digest();
550
551 match self.index.insert_chunk(&chunk) {
552 Ok((is_duplicate, compressed_size)) => {
798f7fa0 553
7e336555 554 self.stat.compressed_size += compressed_size;
798f7fa0 555 if is_duplicate {
7e336555 556 self.stat.duplicate_chunks += 1;
798f7fa0 557 } else {
7e336555 558 self.stat.disk_size += compressed_size;
798f7fa0
DM
559 }
560
561 println!("ADD CHUNK {:016x} {} {}% {} {}", self.chunk_offset, chunk_size,
f98ac774 562 (compressed_size*100)/(chunk_size as u64), is_duplicate, tools::digest_to_hex(digest));
976595e1 563 self.index.add_chunk(self.chunk_offset as u64, &digest)?;
5e7a09be
DM
564 self.chunk_buffer.truncate(0);
565 return Ok(());
566 }
567 Err(err) => {
568 self.chunk_buffer.truncate(0);
976595e1 569 return Err(err);
5e7a09be
DM
570 }
571 }
5e7a09be 572 }
0433db19
DM
573}
574
976595e1 575impl Write for DynamicChunkWriter {
0433db19
DM
576
577 fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
578
0433db19
DM
579 let chunker = &mut self.chunker;
580
581 let pos = chunker.scan(data);
582
583 if pos > 0 {
584 self.chunk_buffer.extend(&data[0..pos]);
585 self.chunk_offset += pos;
586
976595e1
DM
587 if let Err(err) = self.write_chunk_buffer() {
588 return Err(std::io::Error::new(std::io::ErrorKind::Other, err.to_string()));
589 }
5e7a09be 590 Ok(pos)
0433db19
DM
591
592 } else {
593 self.chunk_offset += data.len();
594 self.chunk_buffer.extend(data);
5e7a09be 595 Ok(data.len())
0433db19
DM
596 }
597 }
598
599 fn flush(&mut self) -> std::result::Result<(), std::io::Error> {
976595e1 600 Err(std::io::Error::new(std::io::ErrorKind::Other, "please use close() instead of flush()"))
0433db19
DM
601 }
602}