]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/fixed_index.rs
allow(clippy::cast_ptr_alignment)
[proxmox-backup.git] / src / backup / fixed_index.rs
1 use failure::*;
2 use std::io::{Seek, SeekFrom};
3 use std::convert::TryInto;
4
5 use crate::tools;
6 use super::IndexFile;
7 use super::chunk_stat::*;
8 use super::chunk_store::*;
9
10 use std::sync::Arc;
11 use std::io::Write;
12 use std::fs::File;
13 use std::path::{Path, PathBuf};
14 use std::os::unix::io::AsRawFd;
15 use uuid::Uuid;
16 use chrono::{Local, TimeZone};
17
18 use super::ChunkInfo;
19 use super::read_chunk::*;
20
21 use proxmox::tools::io::ReadExt;
22
/// Header format definition for fixed index files (`.fidx`)
///
/// The header occupies exactly one 4096-byte page at the start of the
/// file; the chunk digest table follows directly after it.
#[repr(C)]
pub struct FixedIndexHeader {
    /// Magic number identifying the file format/version
    pub magic: [u8; 8],
    /// UUID of this index
    pub uuid: [u8; 16],
    /// Creation time as epoch seconds (stored little-endian on disk)
    pub ctime: u64,
    /// Sha256 over the index ``SHA256(digest1||digest2||...)``
    pub index_csum: [u8; 32],
    /// Total image size in bytes (stored little-endian on disk)
    pub size: u64,
    /// Fixed chunk size in bytes (stored little-endian on disk)
    pub chunk_size: u64,
    reserved: [u8; 4016], // overall size is one page (4096 bytes)
}
proxmox::tools::static_assert_size!(FixedIndexHeader, 4096);
36
37 // split image into fixed size chunks
38
/// Read-only access to a fixed index file; the digest table is mmap()ed.
pub struct FixedIndexReader {
    // kept only to hold the fd (and its shared flock) open for the
    // lifetime of the mmap
    _file: File,
    /// Chunk size in bytes
    pub chunk_size: usize,
    /// Total image size in bytes
    pub size: u64,
    // number of 32-byte digest entries in `index`
    index_length: usize,
    // mmap()ed pointer to the digest table (index_length * 32 bytes)
    index: *mut u8,
    /// UUID taken from the file header
    pub uuid: [u8; 16],
    /// Creation time taken from the file header (epoch seconds)
    pub ctime: u64,
    /// Index checksum taken from the file header
    pub index_csum: [u8; 32],
}
49
// `index` is mmap()ed which cannot be thread-local so should be sendable
// NOTE(review): the mapping is created PROT_READ and only invalidated in
// unmap(), which takes `&mut self` — shared access looks sound; confirm
// no interior mutation path exists before relying on `Sync`.
unsafe impl Send for FixedIndexReader {}
unsafe impl Sync for FixedIndexReader {}
53
54 impl Drop for FixedIndexReader {
55
56 fn drop(&mut self) {
57 if let Err(err) = self.unmap() {
58 eprintln!("Unable to unmap file - {}", err);
59 }
60 }
61 }
62
63 impl FixedIndexReader {
64
65 pub fn open(path: &Path) -> Result<Self, Error> {
66
67 File::open(path)
68 .map_err(Error::from)
69 .and_then(|file| Self::new(file))
70 .map_err(|err| format_err!("Unable to open fixed index {:?} - {}", path, err))
71 }
72
73 pub fn new(mut file: std::fs::File) -> Result<Self, Error> {
74
75 if let Err(err) = nix::fcntl::flock(file.as_raw_fd(), nix::fcntl::FlockArg::LockSharedNonblock) {
76 bail!("unable to get shared lock - {}", err);
77 }
78
79 file.seek(SeekFrom::Start(0))?;
80
81 let header_size = std::mem::size_of::<FixedIndexHeader>();
82 let header: Box<FixedIndexHeader> = unsafe { file.read_host_value_boxed()? };
83
84 if header.magic != super::FIXED_SIZED_CHUNK_INDEX_1_0 {
85 bail!("got unknown magic number");
86 }
87
88 let size = u64::from_le(header.size);
89 let ctime = u64::from_le(header.ctime);
90 let chunk_size = u64::from_le(header.chunk_size);
91
92 let index_length = ((size + chunk_size - 1)/chunk_size) as usize;
93 let index_size = index_length*32;
94
95 let rawfd = file.as_raw_fd();
96
97 let stat = match nix::sys::stat::fstat(rawfd) {
98 Ok(stat) => stat,
99 Err(err) => bail!("fstat failed - {}", err),
100 };
101
102 let expected_index_size = (stat.st_size as usize) - header_size;
103 if index_size != expected_index_size {
104 bail!("got unexpected file size ({} != {})", index_size, expected_index_size);
105 }
106
107 let data = unsafe { nix::sys::mman::mmap(
108 std::ptr::null_mut(),
109 index_size,
110 nix::sys::mman::ProtFlags::PROT_READ,
111 nix::sys::mman::MapFlags::MAP_PRIVATE,
112 file.as_raw_fd(),
113 header_size as i64) }? as *mut u8;
114
115 Ok(Self {
116 _file: file,
117 chunk_size: chunk_size as usize,
118 size,
119 index_length,
120 index: data,
121 ctime,
122 uuid: header.uuid,
123 index_csum: header.index_csum,
124 })
125 }
126
127 fn unmap(&mut self) -> Result<(), Error> {
128
129 if self.index == std::ptr::null_mut() { return Ok(()); }
130
131 let index_size = self.index_length*32;
132
133 if let Err(err) = unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) } {
134 bail!("unmap file failed - {}", err);
135 }
136
137 self.index = std::ptr::null_mut();
138
139 Ok(())
140 }
141
142 pub fn chunk_info(&self, pos: usize) -> Result<(u64, u64, [u8; 32]), Error> {
143
144 if pos >= self.index_length {
145 bail!("chunk index out of range");
146 }
147 let start = (pos * self.chunk_size) as u64;
148 let mut end = start + self.chunk_size as u64;
149
150 if end > self.size {
151 end = self.size;
152 }
153
154 let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit();
155 unsafe {
156 std::ptr::copy_nonoverlapping(
157 self.index.add(pos*32),
158 (*digest.as_mut_ptr()).as_mut_ptr(),
159 32,
160 );
161 }
162
163 Ok((start, end, unsafe { digest.assume_init() }))
164 }
165
166 #[inline]
167 fn chunk_digest(&self, pos: usize) -> &[u8; 32] {
168 if pos >= self.index_length {
169 panic!("chunk index out of range");
170 }
171 let slice = unsafe { std::slice::from_raw_parts(self.index.add(pos*32), 32) };
172 slice.try_into().unwrap()
173 }
174
175 #[inline]
176 fn chunk_end(&self, pos: usize) -> u64 {
177 if pos >= self.index_length {
178 panic!("chunk index out of range");
179 }
180
181 let end = ((pos+1) * self.chunk_size) as u64;
182 if end > self.size {
183 self.size
184 } else {
185 end
186 }
187 }
188
189 /// Compute checksum and data size
190 pub fn compute_csum(&self) -> ([u8; 32], u64) {
191
192 let mut csum = openssl::sha::Sha256::new();
193 let mut chunk_end = 0;
194 for pos in 0..self.index_length {
195 chunk_end = ((pos+1) * self.chunk_size) as u64;
196 let digest = self.chunk_digest(pos);
197 csum.update(digest);
198 }
199 let csum = csum.finish();
200
201 (csum, chunk_end)
202 }
203
204 pub fn print_info(&self) {
205 println!("Size: {}", self.size);
206 println!("ChunkSize: {}", self.chunk_size);
207 println!("CTime: {}", Local.timestamp(self.ctime as i64, 0).format("%c"));
208 println!("UUID: {:?}", self.uuid);
209 }
210 }
211
212 impl IndexFile for FixedIndexReader {
213 fn index_count(&self) -> usize {
214 self.index_length
215 }
216
217 fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> {
218 if pos >= self.index_length {
219 None
220 } else {
221 Some(unsafe { std::mem::transmute(self.index.add(pos*32)) })
222 }
223 }
224
225 fn index_bytes(&self) -> u64 {
226 self.size
227 }
228 }
229
/// Writer for fixed index files (`.fidx`)
///
/// Digests are written through a mmap()ed region of a temporary file
/// which is atomically renamed into place by `close()`.
pub struct FixedIndexWriter {
    // chunk store the data chunks are inserted into
    store: Arc<ChunkStore>,
    // the open .tmp_fidx file
    file: File,
    // shared chunk-store lock, held for the writer's lifetime
    _lock: tools::ProcessLockSharedGuard,
    // final file name (rename target in close())
    filename: PathBuf,
    // temporary file name (removed again in drop())
    tmp_filename: PathBuf,
    // fixed chunk size in bytes
    chunk_size: usize,
    // total image size in bytes
    size: usize,
    // number of 32-byte digest entries in `index`
    index_length: usize,
    // mmap()ed writable pointer to the digest table; null after unmap()
    index: *mut u8,
    /// UUID written to the header
    pub uuid: [u8; 16],
    /// Creation time written to the header (epoch seconds)
    pub ctime: u64,
}
243
// `index` is mmap()ed which cannot be thread-local so should be sendable
// NOTE(review): writes go through `&mut self` only, so Send (without
// Sync) looks correct here.
unsafe impl Send for FixedIndexWriter {}
246
247 impl Drop for FixedIndexWriter {
248
249 fn drop(&mut self) {
250 let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
251 if let Err(err) = self.unmap() {
252 eprintln!("Unable to unmap file {:?} - {}", self.tmp_filename, err);
253 }
254 }
255 }
256
257 impl FixedIndexWriter {
258
259 #[allow(clippy::cast_ptr_alignment)]
260 pub fn create(store: Arc<ChunkStore>, path: &Path, size: usize, chunk_size: usize) -> Result<Self, Error> {
261
262 let shared_lock = store.try_shared_lock()?;
263
264 let full_path = store.relative_path(path);
265 let mut tmp_path = full_path.clone();
266 tmp_path.set_extension("tmp_fidx");
267
268 let mut file = std::fs::OpenOptions::new()
269 .create(true).truncate(true)
270 .read(true)
271 .write(true)
272 .open(&tmp_path)?;
273
274 let header_size = std::mem::size_of::<FixedIndexHeader>();
275
276 // todo: use static assertion when available in rust
277 if header_size != 4096 { panic!("got unexpected header size"); }
278
279 let ctime = std::time::SystemTime::now().duration_since(
280 std::time::SystemTime::UNIX_EPOCH)?.as_secs();
281
282 let uuid = Uuid::new_v4();
283
284 let buffer = vec![0u8; header_size];
285 let header = unsafe { &mut * (buffer.as_ptr() as *mut FixedIndexHeader) };
286
287 header.magic = super::FIXED_SIZED_CHUNK_INDEX_1_0;
288 header.ctime = u64::to_le(ctime);
289 header.size = u64::to_le(size as u64);
290 header.chunk_size = u64::to_le(chunk_size as u64);
291 header.uuid = *uuid.as_bytes();
292
293 header.index_csum = [0u8; 32];
294
295 file.write_all(&buffer)?;
296
297 let index_length = (size + chunk_size - 1)/chunk_size;
298 let index_size = index_length*32;
299 nix::unistd::ftruncate(file.as_raw_fd(), (header_size + index_size) as i64)?;
300
301 let data = unsafe { nix::sys::mman::mmap(
302 std::ptr::null_mut(),
303 index_size,
304 nix::sys::mman::ProtFlags::PROT_READ | nix::sys::mman::ProtFlags::PROT_WRITE,
305 nix::sys::mman::MapFlags::MAP_SHARED,
306 file.as_raw_fd(),
307 header_size as i64) }? as *mut u8;
308
309 Ok(Self {
310 store,
311 file,
312 _lock: shared_lock,
313 filename: full_path,
314 tmp_filename: tmp_path,
315 chunk_size,
316 size,
317 index_length,
318 index: data,
319 ctime,
320 uuid: *uuid.as_bytes(),
321 })
322 }
323
324 pub fn index_length(&self) -> usize {
325 self.index_length
326 }
327
328 fn unmap(&mut self) -> Result<(), Error> {
329
330 if self.index == std::ptr::null_mut() { return Ok(()); }
331
332 let index_size = self.index_length*32;
333
334 if let Err(err) = unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) } {
335 bail!("unmap file {:?} failed - {}", self.tmp_filename, err);
336 }
337
338 self.index = std::ptr::null_mut();
339
340 Ok(())
341 }
342
343 pub fn close(&mut self) -> Result<[u8; 32], Error> {
344
345 if self.index == std::ptr::null_mut() { bail!("cannot close already closed index file."); }
346
347 let index_size = self.index_length*32;
348 let data = unsafe { std::slice::from_raw_parts(self.index, index_size) };
349 let index_csum = openssl::sha::sha256(data);
350
351 self.unmap()?;
352
353 let csum_offset = proxmox::tools::offsetof!(FixedIndexHeader, index_csum);
354 self.file.seek(SeekFrom::Start(csum_offset as u64))?;
355 self.file.write_all(&index_csum)?;
356 self.file.flush()?;
357
358 if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
359 bail!("Atomic rename file {:?} failed - {}", self.filename, err);
360 }
361
362 Ok(index_csum)
363 }
364
365 pub fn check_chunk_alignment(&self, offset: usize, chunk_len: usize) -> Result<usize, Error> {
366
367 if offset < chunk_len {
368 bail!("got chunk with small offset ({} < {}", offset, chunk_len);
369 }
370
371 let pos = offset - chunk_len;
372
373 if offset > self.size {
374 bail!("chunk data exceeds size ({} >= {})", offset, self.size);
375 }
376
377 // last chunk can be smaller
378 if ((offset != self.size) && (chunk_len != self.chunk_size)) ||
379 (chunk_len > self.chunk_size) || (chunk_len == 0) {
380 bail!("chunk with unexpected length ({} != {}", chunk_len, self.chunk_size);
381 }
382
383 if pos & (self.chunk_size-1) != 0 {
384 bail!("got unaligned chunk (pos = {})", pos);
385 }
386
387 Ok(pos / self.chunk_size)
388 }
389
390 // Note: We want to add data out of order, so do not assume any order here.
391 pub fn add_chunk(&mut self, chunk_info: &ChunkInfo, stat: &mut ChunkStat) -> Result<(), Error> {
392
393 let chunk_len = chunk_info.chunk_len as usize;
394 let offset = chunk_info.offset as usize; // end of chunk
395
396 let idx = self.check_chunk_alignment(offset, chunk_len)?;
397
398 let (is_duplicate, compressed_size) =
399 self.store.insert_chunk(&chunk_info.chunk, &chunk_info.digest)?;
400
401 stat.chunk_count += 1;
402 stat.compressed_size += compressed_size;
403
404 let digest = &chunk_info.digest;
405
406 println!("ADD CHUNK {} {} {}% {} {}", idx, chunk_len,
407 (compressed_size*100)/(chunk_len as u64), is_duplicate, proxmox::tools::digest_to_hex(digest));
408
409 if is_duplicate {
410 stat.duplicate_chunks += 1;
411 } else {
412 stat.disk_size += compressed_size;
413 }
414
415 self.add_digest(idx, digest)
416 }
417
418 pub fn add_digest(&mut self, index: usize, digest: &[u8; 32]) -> Result<(), Error> {
419
420 if index >= self.index_length {
421 bail!("add digest failed - index out of range ({} >= {})", index, self.index_length);
422 }
423
424 if self.index == std::ptr::null_mut() { bail!("cannot write to closed index file."); }
425
426 let index_pos = index*32;
427 unsafe {
428 let dst = self.index.add(index_pos);
429 dst.copy_from_nonoverlapping(digest.as_ptr(), 32);
430 }
431
432 Ok(())
433 }
434 }
435
/// Buffered, seekable reader over a fixed index.
///
/// Fetches chunk data by digest from a `ReadChunk` store and caches one
/// chunk at a time in `read_buffer`.
pub struct BufferedFixedReader<S> {
    // backend used to fetch chunk data by digest
    store: S,
    // the underlying fixed index
    index: FixedIndexReader,
    // total archive size in bytes (copied from index.size)
    archive_size: u64,
    // data of the currently buffered chunk
    read_buffer: Vec<u8>,
    // index position of the buffered chunk
    buffered_chunk_idx: usize,
    // archive offset where the buffered chunk starts
    buffered_chunk_start: u64,
    // current position for the io::Read/Seek implementations
    read_offset: u64,
}
445
446 impl <S: ReadChunk> BufferedFixedReader<S> {
447
448 pub fn new(index: FixedIndexReader, store: S) -> Self {
449
450 let archive_size = index.size;
451 Self {
452 store,
453 index,
454 archive_size,
455 read_buffer: Vec::with_capacity(1024*1024),
456 buffered_chunk_idx: 0,
457 buffered_chunk_start: 0,
458 read_offset: 0,
459 }
460 }
461
462 pub fn archive_size(&self) -> u64 { self.archive_size }
463
464 fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
465
466 let index = &self.index;
467 let (start, end, digest) = index.chunk_info(idx)?;
468
469 // fixme: avoid copy
470
471 let data = self.store.read_chunk(&digest)?;
472
473 if (end - start) != data.len() as u64 {
474 bail!("read chunk with wrong size ({} != {}", (end - start), data.len());
475 }
476
477 self.read_buffer.clear();
478 self.read_buffer.extend_from_slice(&data);
479
480 self.buffered_chunk_idx = idx;
481
482 self.buffered_chunk_start = start as u64;
483 //println!("BUFFER {} {}", self.buffered_chunk_start, end);
484 Ok(())
485 }
486 }
487
impl <S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {

    /// Return a slice of buffered data starting at archive `offset`,
    /// running to the end of the containing chunk. An empty slice signals
    /// end of archive.
    fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {

        if offset == self.archive_size { return Ok(&self.read_buffer[0..0]); }

        let buffer_len = self.read_buffer.len();
        let index = &self.index;

        // optimization for sequential read: if the offset falls just past
        // the buffered chunk and inside the next one, load the next chunk
        // directly instead of recomputing the index position
        if buffer_len > 0 &&
            ((self.buffered_chunk_idx + 1) < index.index_length) &&
            (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
        {
            let next_idx = self.buffered_chunk_idx + 1;
            let next_end = index.chunk_end(next_idx);
            if offset < next_end {
                self.buffer_chunk(next_idx)?;
                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
                return Ok(&self.read_buffer[buffer_offset..]);
            }
        }

        // random access (or nothing buffered yet): load the chunk that
        // contains `offset` unless it is already the buffered one
        if (buffer_len == 0) ||
            (offset < self.buffered_chunk_start) ||
            (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
        {
            // integer division maps the offset to its chunk index
            let idx = (offset / index.chunk_size as u64) as usize;
            self.buffer_chunk(idx)?;
        }

        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
        Ok(&self.read_buffer[buffer_offset..])
    }
}
523
524 impl <S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
525
526 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
527
528 use std::io::{Error, ErrorKind};
529 use crate::tools::BufferedRead;
530
531 let data = match self.buffered_read(self.read_offset) {
532 Ok(v) => v,
533 Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
534 };
535
536 let n = if data.len() > buf.len() { buf.len() } else { data.len() };
537
538 unsafe { std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n); }
539
540 self.read_offset += n as u64;
541
542 return Ok(n);
543 }
544 }
545
546 impl <S: ReadChunk> Seek for BufferedFixedReader<S> {
547
548 fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
549
550 let new_offset = match pos {
551 SeekFrom::Start(start_offset) => start_offset as i64,
552 SeekFrom::End(end_offset) => (self.archive_size as i64)+ end_offset,
553 SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
554 };
555
556 use std::io::{Error, ErrorKind};
557 if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
558 return Err(Error::new(
559 ErrorKind::Other,
560 format!("seek is out of range {} ([0..{}])", new_offset, self.archive_size)));
561 }
562 self.read_offset = new_offset as u64;
563
564 Ok(self.read_offset)
565 }
566 }