]> git.proxmox.com Git - proxmox-backup.git/blame - src/backup/dynamic_index.rs
make FixedIndexReader Send and update comments
[proxmox-backup.git] / src / backup / dynamic_index.rs
CommitLineData
0433db19
DM
1use failure::*;
2
22968600 3use crate::tools;
7e336555 4use super::chunk_stat::*;
0433db19
DM
5use super::chunk_store::*;
6use super::chunker::*;
7
1629d2ad 8use std::sync::Arc;
5032b57b 9use std::io::{Read, Write, BufWriter};
0433db19
DM
10use std::fs::File;
11use std::path::{Path, PathBuf};
12use std::os::unix::io::AsRawFd;
13use uuid::Uuid;
ddbdf80d 14//use chrono::{Local, TimeZone};
0433db19 15
/// Header format definition for dynamic index files (`.didx`)
///
/// The header occupies the first 4096 bytes of an index file and is
/// followed by the chunk table. `version` and `ctime` are stored in little
/// endian byte order (reader and writer convert them with
/// `from_le`/`to_le`).
#[repr(C)]
pub struct DynamicIndexHeader {
    /// The string `PROXMOX-DIDX`
    pub magic: [u8; 12],
    /// Format version; the reader only accepts version 1.
    pub version: u32,
    /// UUID of the index; the writer generates a random (v4) one.
    pub uuid: [u8; 16],
    /// Creation time in seconds since the Unix epoch.
    pub ctime: u64,
    reserved: [u8; 4056], // overall size is one page (4096 bytes)
}
26
77703d95 27
93d5d779 28pub struct DynamicIndexReader {
150f1bd8 29 store: Arc<ChunkStore>,
728797d0 30 _file: File,
9f49fe1d 31 pub size: usize,
77703d95
DM
32 filename: PathBuf,
33 index: *const u8,
34 index_entries: usize,
9f49fe1d
DM
35 pub uuid: [u8; 16],
36 pub ctime: u64,
77703d95
DM
37}
38
5be4065b
WB
39// `index` is mmap()ed which cannot be thread-local so should be sendable
40// FIXME: Introduce an mmap wrapper type for this?
93d5d779 41unsafe impl Send for DynamicIndexReader {}
0b05fd58 42
93d5d779 43impl Drop for DynamicIndexReader {
77703d95
DM
44
45 fn drop(&mut self) {
46 if let Err(err) = self.unmap() {
47 eprintln!("Unable to unmap file {:?} - {}", self.filename, err);
48 }
49 }
50}
51
93d5d779 52impl DynamicIndexReader {
77703d95 53
150f1bd8 54 pub fn open(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> {
77703d95
DM
55
56 let full_path = store.relative_path(path);
57
58 let mut file = std::fs::File::open(&full_path)?;
59
93d5d779 60 let header_size = std::mem::size_of::<DynamicIndexHeader>();
77703d95
DM
61
62 // todo: use static assertion when available in rust
63 if header_size != 4096 { bail!("got unexpected header size for {:?}", path); }
64
65 let mut buffer = vec![0u8; header_size];
66 file.read_exact(&mut buffer)?;
67
93d5d779 68 let header = unsafe { &mut * (buffer.as_ptr() as *mut DynamicIndexHeader) };
77703d95 69
93d5d779 70 if header.magic != *b"PROXMOX-DIDX" {
77703d95
DM
71 bail!("got unknown magic number for {:?}", path);
72 }
73
74 let version = u32::from_le(header.version);
75 if version != 1 {
76 bail!("got unsupported version number ({}) for {:?}", version, path);
77 }
78
79 let ctime = u64::from_le(header.ctime);
80
81 let rawfd = file.as_raw_fd();
82
83 let stat = match nix::sys::stat::fstat(rawfd) {
84 Ok(stat) => stat,
85 Err(err) => bail!("fstat {:?} failed - {}", path, err),
86 };
87
88 let size = stat.st_size as usize;
89
ddbdf80d 90 let index_size = size - header_size;
77703d95
DM
91 if (index_size % 40) != 0 {
92 bail!("got unexpected file size for {:?}", path);
93 }
94
95 let data = unsafe { nix::sys::mman::mmap(
96 std::ptr::null_mut(),
97 index_size,
98 nix::sys::mman::ProtFlags::PROT_READ,
99 nix::sys::mman::MapFlags::MAP_PRIVATE,
100 rawfd,
101 header_size as i64) }? as *const u8;
102
77703d95
DM
103 Ok(Self {
104 store,
105 filename: full_path,
728797d0 106 _file: file,
77703d95
DM
107 size,
108 index: data,
109 index_entries: index_size/40,
110 ctime,
111 uuid: header.uuid,
112 })
113 }
114
115 fn unmap(&mut self) -> Result<(), Error> {
116
117 if self.index == std::ptr::null_mut() { return Ok(()); }
118
b663789b 119 if let Err(err) = unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, self.index_entries*40) } {
77703d95
DM
120 bail!("unmap file {:?} failed - {}", self.filename, err);
121 }
122
123 self.index = std::ptr::null_mut();
124
125 Ok(())
126 }
127
39c6bd86
DM
128 #[inline]
129 fn chunk_end(&self, pos: usize) -> u64 {
130 if pos >= self.index_entries {
131 panic!("chunk index out of range");
132 }
133 unsafe { *(self.index.add(pos*40) as *const u64) }
134 }
135
136 #[inline]
137 fn chunk_digest(&self, pos: usize) -> &[u8] {
138 if pos >= self.index_entries {
139 panic!("chunk index out of range");
140 }
141 unsafe { std::slice::from_raw_parts(self.index.add(pos*40+8), 32) }
142 }
143
ddbdf80d 144 pub fn mark_used_chunks(&self, _status: &mut GarbageCollectionStatus) -> Result<(), Error> {
77703d95
DM
145
146 for pos in 0..self.index_entries {
39c6bd86 147 let digest = self.chunk_digest(pos);
77703d95
DM
148 if let Err(err) = self.store.touch_chunk(digest) {
149 bail!("unable to access chunk {}, required by {:?} - {}",
22968600 150 tools::digest_to_hex(digest), self.filename, err);
77703d95
DM
151 }
152 }
153 Ok(())
154 }
96df2fb4
DM
155
156 pub fn dump_catar(&self, mut writer: Box<Write>) -> Result<(), Error> {
157
df9973e8
DM
158 let mut buffer = Vec::with_capacity(1024*1024);
159
96df2fb4 160 for pos in 0..self.index_entries {
ddbdf80d 161 let _end = self.chunk_end(pos);
39c6bd86
DM
162 let digest = self.chunk_digest(pos);
163 //println!("Dump {:08x}", end );
df9973e8 164 self.store.read_chunk(digest, &mut buffer)?;
df9973e8 165 writer.write_all(&buffer)?;
96df2fb4
DM
166
167 }
168
169 Ok(())
170 }
39c6bd86
DM
171
172 fn binary_search(
173 &self,
174 start_idx: usize,
175 start: u64,
176 end_idx: usize,
177 end: u64,
178 offset: u64
179 ) -> Result<usize, Error> {
180
181 if (offset >= end) || (offset < start) {
182 bail!("offset out of range");
183 }
184
185 if end_idx == start_idx {
186 return Ok(start_idx); // found
187 }
188 let middle_idx = (start_idx + end_idx)/2;
189 let middle_end = self.chunk_end(middle_idx);
190
191 if offset < middle_end {
192 return self.binary_search(start_idx, start, middle_idx, middle_end, offset);
193 } else {
194 return self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset);
195 }
196 }
197}
198
93d5d779
DM
199pub struct BufferedDynamicReader {
200 index: DynamicIndexReader,
39c6bd86
DM
201 archive_size: u64,
202 read_buffer: Vec<u8>,
203 buffered_chunk_idx: usize,
204 buffered_chunk_start: u64,
205 read_offset: u64,
77703d95
DM
206}
207
93d5d779 208impl BufferedDynamicReader {
39c6bd86 209
93d5d779 210 pub fn new(index: DynamicIndexReader) -> Self {
39c6bd86
DM
211
212 let archive_size = index.chunk_end(index.index_entries - 1);
213 Self {
214 index: index,
215 archive_size: archive_size,
216 read_buffer: Vec::with_capacity(1024*1024),
217 buffered_chunk_idx: 0,
218 buffered_chunk_start: 0,
219 read_offset: 0,
220 }
221 }
222
223 pub fn archive_size(&self) -> u64 { self.archive_size }
224
0a72e267
DM
225 fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
226
0b05fd58 227 let index = &self.index;
0a72e267
DM
228 let end = index.chunk_end(idx);
229 let digest = index.chunk_digest(idx);
230 index.store.read_chunk(digest, &mut self.read_buffer)?;
231
232 self.buffered_chunk_idx = idx;
233 self.buffered_chunk_start = end - (self.read_buffer.len() as u64);
234 //println!("BUFFER {} {}", self.buffered_chunk_start, end);
235 Ok(())
236 }
237}
238
fded74d0 239impl crate::tools::BufferedRead for BufferedDynamicReader {
0a72e267
DM
240
241 fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
39c6bd86 242
318564ac
DM
243 if offset == self.archive_size { return Ok(&self.read_buffer[0..0]); }
244
39c6bd86 245 let buffer_len = self.read_buffer.len();
0b05fd58 246 let index = &self.index;
39c6bd86
DM
247
248 // optimization for sequential read
249 if buffer_len > 0 &&
250 ((self.buffered_chunk_idx + 1) < index.index_entries) &&
251 (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
252 {
253 let next_idx = self.buffered_chunk_idx + 1;
254 let next_end = index.chunk_end(next_idx);
255 if offset < next_end {
373ef4a5 256 self.buffer_chunk(next_idx)?;
39c6bd86
DM
257 let buffer_offset = (offset - self.buffered_chunk_start) as usize;
258 return Ok(&self.read_buffer[buffer_offset..]);
259 }
260 }
261
262 if (buffer_len == 0) ||
263 (offset < self.buffered_chunk_start) ||
264 (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
265 {
266 let end_idx = index.index_entries - 1;
267 let end = index.chunk_end(end_idx);
268 let idx = index.binary_search(0, 0, end_idx, end, offset)?;
373ef4a5 269 self.buffer_chunk(idx)?;
39c6bd86
DM
270 }
271
272 let buffer_offset = (offset - self.buffered_chunk_start) as usize;
273 Ok(&self.read_buffer[buffer_offset..])
274 }
275
39c6bd86 276}
77703d95 277
93d5d779 278impl std::io::Read for BufferedDynamicReader {
4624fe29
DM
279
280 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
281
282 use std::io::{Error, ErrorKind};
fded74d0 283 use crate::tools::BufferedRead;
4624fe29
DM
284
285 let data = match self.buffered_read(self.read_offset) {
286 Ok(v) => v,
287 Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
288 };
289
290 let n = if data.len() > buf.len() { buf.len() } else { data.len() };
291
292 unsafe { std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n); }
293
294 self.read_offset += n as u64;
295
296 return Ok(n);
297 }
298}
299
93d5d779 300impl std::io::Seek for BufferedDynamicReader {
34337050
DM
301
302 fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> {
303
ddbdf80d 304 use std::io::{SeekFrom};
34337050
DM
305
306 let new_offset = match pos {
307 SeekFrom::Start(start_offset) => start_offset as i64,
308 SeekFrom::End(end_offset) => (self.archive_size as i64)+ end_offset,
309 SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
310 };
311
ddbdf80d 312 use std::io::{Error, ErrorKind};
34337050
DM
313 if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
314 return Err(Error::new(
315 ErrorKind::Other,
316 format!("seek is out of range {} ([0..{}])", new_offset, self.archive_size)));
317 }
318 self.read_offset = new_offset as u64;
319
320 Ok(self.read_offset)
321 }
322}
323
93d5d779 324pub struct DynamicIndexWriter {
1629d2ad 325 store: Arc<ChunkStore>,
0433db19 326 chunker: Chunker,
5032b57b 327 writer: BufWriter<File>,
5e7a09be 328 closed: bool,
0433db19
DM
329 filename: PathBuf,
330 tmp_filename: PathBuf,
9f49fe1d
DM
331 pub uuid: [u8; 16],
332 pub ctime: u64,
0433db19 333
7e336555
DM
334 stat: ChunkStat,
335
0433db19
DM
336 chunk_offset: usize,
337 last_chunk: usize,
338 chunk_buffer: Vec<u8>,
339}
340
93d5d779 341impl Drop for DynamicIndexWriter {
1629d2ad
DM
342
343 fn drop(&mut self) {
344 let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
345 }
346}
347
93d5d779 348impl DynamicIndexWriter {
0433db19 349
1629d2ad 350 pub fn create(store: Arc<ChunkStore>, path: &Path, chunk_size: usize) -> Result<Self, Error> {
0433db19
DM
351
352 let full_path = store.relative_path(path);
353 let mut tmp_path = full_path.clone();
93d5d779 354 tmp_path.set_extension("tmp_didx");
0433db19 355
ddbdf80d 356 let file = std::fs::OpenOptions::new()
0433db19
DM
357 .create(true).truncate(true)
358 .read(true)
359 .write(true)
360 .open(&tmp_path)?;
361
5032b57b
DM
362 let mut writer = BufWriter::with_capacity(1024*1024, file);
363
93d5d779 364 let header_size = std::mem::size_of::<DynamicIndexHeader>();
0433db19
DM
365
366 // todo: use static assertion when available in rust
367 if header_size != 4096 { panic!("got unexpected header size"); }
368
369 let ctime = std::time::SystemTime::now().duration_since(
370 std::time::SystemTime::UNIX_EPOCH)?.as_secs();
371
372 let uuid = Uuid::new_v4();
373
374 let mut buffer = vec![0u8; header_size];
93d5d779 375 let header = crate::tools::map_struct_mut::<DynamicIndexHeader>(&mut buffer)?;
0433db19 376
93d5d779 377 header.magic = *b"PROXMOX-DIDX";
0433db19
DM
378 header.version = u32::to_le(1);
379 header.ctime = u64::to_le(ctime);
380 header.uuid = *uuid.as_bytes();
381
5032b57b 382 writer.write_all(&buffer)?;
0433db19
DM
383
384 Ok(Self {
385 store,
386 chunker: Chunker::new(chunk_size),
5032b57b 387 writer: writer,
5e7a09be 388 closed: false,
0433db19
DM
389 filename: full_path,
390 tmp_filename: tmp_path,
391 ctime,
392 uuid: *uuid.as_bytes(),
393
7e336555
DM
394 stat: ChunkStat::new(0),
395
0433db19
DM
396 chunk_offset: 0,
397 last_chunk: 0,
398 chunk_buffer: Vec::with_capacity(chunk_size*4),
399 })
400 }
5e7a09be
DM
401
402 pub fn close(&mut self) -> Result<(), Error> {
403
404 if self.closed {
405 bail!("cannot close already closed archive index file {:?}", self.filename);
406 }
407
408 self.closed = true;
409
410 self.write_chunk_buffer()?;
411
5032b57b 412 self.writer.flush()?;
5e7a09be 413
7e336555
DM
414 self.stat.size = self.chunk_offset as u64;
415
416 // add size of index file
417 self.stat.size += (self.stat.chunk_count*40 + std::mem::size_of::<DynamicIndexHeader>()) as u64;
418
419 println!("STAT: {:?}", self.stat);
420
5e7a09be
DM
421 // fixme:
422
423 if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
424 bail!("Atomic rename file {:?} failed - {}", self.filename, err);
425 }
426
427 Ok(())
428 }
429
7e336555
DM
430 pub fn stat(&self) -> &ChunkStat {
431 &self.stat
432 }
433
5e7a09be
DM
434 fn write_chunk_buffer(&mut self) -> Result<(), std::io::Error> {
435
436 use std::io::{Error, ErrorKind};
437
438 let chunk_size = self.chunk_buffer.len();
439
440 if chunk_size == 0 { return Ok(()); }
441
442 let expected_chunk_size = self.chunk_offset - self.last_chunk;
443 if expected_chunk_size != self.chunk_buffer.len() {
444 return Err(Error::new(
445 ErrorKind::Other,
446 format!("wrong chunk size {} != {}", expected_chunk_size, chunk_size)));
447 }
448
7e336555 449 self.stat.chunk_count += 1;
247cdbce 450
5e7a09be
DM
451 self.last_chunk = self.chunk_offset;
452
453 match self.store.insert_chunk(&self.chunk_buffer) {
798f7fa0
DM
454 Ok((is_duplicate, digest, compressed_size)) => {
455
7e336555 456 self.stat.compressed_size += compressed_size;
798f7fa0 457 if is_duplicate {
7e336555 458 self.stat.duplicate_chunks += 1;
798f7fa0 459 } else {
7e336555 460 self.stat.disk_size += compressed_size;
798f7fa0
DM
461 }
462
463 println!("ADD CHUNK {:016x} {} {}% {} {}", self.chunk_offset, chunk_size,
464 (compressed_size*100)/(chunk_size as u64), is_duplicate, tools::digest_to_hex(&digest));
f661f374 465 self.add_chunk(self.chunk_offset as u64, &digest)?;
5e7a09be
DM
466 self.chunk_buffer.truncate(0);
467 return Ok(());
468 }
469 Err(err) => {
470 self.chunk_buffer.truncate(0);
471 return Err(Error::new(ErrorKind::Other, err.to_string()));
472 }
473 }
5e7a09be 474 }
f661f374
WB
475
476 pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), std::io::Error> {
477 self.writer.write(unsafe { &std::mem::transmute::<u64, [u8;8]>(offset.to_le()) })?;
478 self.writer.write(digest)?;
479 Ok(())
480 }
0433db19
DM
481}
482
93d5d779 483impl Write for DynamicIndexWriter {
0433db19
DM
484
485 fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
486
0433db19
DM
487 let chunker = &mut self.chunker;
488
489 let pos = chunker.scan(data);
490
491 if pos > 0 {
492 self.chunk_buffer.extend(&data[0..pos]);
493 self.chunk_offset += pos;
494
5e7a09be
DM
495 self.write_chunk_buffer()?;
496 Ok(pos)
0433db19
DM
497
498 } else {
499 self.chunk_offset += data.len();
500 self.chunk_buffer.extend(data);
5e7a09be 501 Ok(data.len())
0433db19
DM
502 }
503 }
504
505 fn flush(&mut self) -> std::result::Result<(), std::io::Error> {
506
94a882e9
DM
507 use std::io::{Error, ErrorKind};
508
5e7a09be 509 Err(Error::new(ErrorKind::Other, "please use close() instead of flush()"))
0433db19
DM
510 }
511}