]> git.proxmox.com Git - proxmox-backup.git/blob - pbs-datastore/src/fixed_index.rs
update to proxmox-sys 0.2 crate
[proxmox-backup.git] / pbs-datastore / src / fixed_index.rs
1 use std::fs::File;
2 use std::io::Write;
3 use std::os::unix::io::AsRawFd;
4 use std::path::{Path, PathBuf};
5 use std::sync::Arc;
6 use std::io::{Seek, SeekFrom};
7
8 use anyhow::{bail, format_err, Error};
9
10 use proxmox_sys::process_locker::ProcessLockSharedGuard;
11 use proxmox_io::ReadExt;
12 use proxmox_uuid::Uuid;
13
14 use crate::chunk_stat::ChunkStat;
15 use crate::chunk_store::ChunkStore;
16 use crate::data_blob::ChunkInfo;
17 use crate::file_formats;
18 use crate::index::{ChunkReadInfo, IndexFile};
19
/// Header format definition for fixed index files (`.fidx`)
///
/// The header occupies the first 4096 bytes of the file; the digest table (32 bytes per
/// chunk) follows directly after it. The reader and writer in this file convert the
/// integer fields with `from_le()` / `to_le()`, i.e. they are stored little-endian.
#[repr(C)]
pub struct FixedIndexHeader {
    /// File type marker; the reader compares it against
    /// `file_formats::FIXED_SIZED_CHUNK_INDEX_1_0`.
    pub magic: [u8; 8],
    /// UUID of this index, generated by the writer when the file is created.
    pub uuid: [u8; 16],
    /// Creation time as returned by `proxmox_time::epoch_i64()` (little-endian).
    pub ctime: i64,
    /// Sha256 over the index ``SHA256(digest1||digest2||...)``
    pub index_csum: [u8; 32],
    /// Total size in bytes of the data described by the index (little-endian).
    pub size: u64,
    /// Size in bytes of one chunk (little-endian); the last chunk may be shorter.
    pub chunk_size: u64,
    reserved: [u8; 4016], // overall size is one page (4096 bytes)
}
// Compile-time guard: the header must stay exactly one page in size.
proxmox_lang::static_assert_size!(FixedIndexHeader, 4096);
33
34 // split image into fixed size chunks
35
/// Read-only view onto a fixed index file (`.fidx`).
///
/// The digest table that follows the header is mapped into memory with `mmap()`;
/// `index` points at the first digest and the mapping is released again on drop.
pub struct FixedIndexReader {
    // File the mapping was created from; held for as long as the reader exists.
    _file: File,
    /// Size of one chunk in bytes, taken from the file header.
    pub chunk_size: usize,
    /// Total size in bytes described by the index, taken from the file header.
    pub size: u64,
    // Number of 32 byte digests in the mapped table.
    index_length: usize,
    // Start of the mmap()ed digest table; null once the table has been unmapped.
    index: *mut u8,
    /// UUID stored in the file header.
    pub uuid: [u8; 16],
    /// Creation time stored in the file header.
    pub ctime: i64,
    /// Checksum over the digest table as stored in the file header.
    pub index_csum: [u8; 32],
}
46
// `index` is mmap()ed which cannot be thread-local so should be sendable
//
// The raw pointer field is the only reason these impls are not derived automatically.
// The mapping is created with PROT_READ / MAP_PRIVATE and the `&self` accessors only
// read through the pointer, so shared access from several threads performs no writes.
unsafe impl Send for FixedIndexReader {}
unsafe impl Sync for FixedIndexReader {}
50
51 impl Drop for FixedIndexReader {
52 fn drop(&mut self) {
53 if let Err(err) = self.unmap() {
54 eprintln!("Unable to unmap file - {}", err);
55 }
56 }
57 }
58
59 impl FixedIndexReader {
60 pub fn open(path: &Path) -> Result<Self, Error> {
61 File::open(path)
62 .map_err(Error::from)
63 .and_then(Self::new)
64 .map_err(|err| format_err!("Unable to open fixed index {:?} - {}", path, err))
65 }
66
67 pub fn new(mut file: std::fs::File) -> Result<Self, Error> {
68 file.seek(SeekFrom::Start(0))?;
69
70 let header_size = std::mem::size_of::<FixedIndexHeader>();
71
72 let stat = match nix::sys::stat::fstat(file.as_raw_fd()) {
73 Ok(stat) => stat,
74 Err(err) => bail!("fstat failed - {}", err),
75 };
76
77 let size = stat.st_size as usize;
78
79 if size < header_size {
80 bail!("index too small ({})", stat.st_size);
81 }
82
83 let header: Box<FixedIndexHeader> = unsafe { file.read_host_value_boxed()? };
84
85 if header.magic != file_formats::FIXED_SIZED_CHUNK_INDEX_1_0 {
86 bail!("got unknown magic number");
87 }
88
89 let size = u64::from_le(header.size);
90 let ctime = i64::from_le(header.ctime);
91 let chunk_size = u64::from_le(header.chunk_size);
92
93 let index_length = ((size + chunk_size - 1) / chunk_size) as usize;
94 let index_size = index_length * 32;
95
96 let expected_index_size = (stat.st_size as usize) - header_size;
97 if index_size != expected_index_size {
98 bail!(
99 "got unexpected file size ({} != {})",
100 index_size,
101 expected_index_size
102 );
103 }
104
105 let data = unsafe {
106 nix::sys::mman::mmap(
107 std::ptr::null_mut(),
108 index_size,
109 nix::sys::mman::ProtFlags::PROT_READ,
110 nix::sys::mman::MapFlags::MAP_PRIVATE,
111 file.as_raw_fd(),
112 header_size as i64,
113 )
114 }? as *mut u8;
115
116 Ok(Self {
117 _file: file,
118 chunk_size: chunk_size as usize,
119 size,
120 index_length,
121 index: data,
122 ctime,
123 uuid: header.uuid,
124 index_csum: header.index_csum,
125 })
126 }
127
128 fn unmap(&mut self) -> Result<(), Error> {
129 if self.index.is_null() {
130 return Ok(());
131 }
132
133 let index_size = self.index_length * 32;
134
135 if let Err(err) =
136 unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) }
137 {
138 bail!("unmap file failed - {}", err);
139 }
140
141 self.index = std::ptr::null_mut();
142
143 Ok(())
144 }
145
146 pub fn print_info(&self) {
147 println!("Size: {}", self.size);
148 println!("ChunkSize: {}", self.chunk_size);
149
150 let mut ctime_str = self.ctime.to_string();
151 if let Ok(s) = proxmox_time::strftime_local("%c", self.ctime) {
152 ctime_str = s;
153 }
154
155 println!("CTime: {}", ctime_str);
156 println!("UUID: {:?}", self.uuid);
157 }
158 }
159
160 impl IndexFile for FixedIndexReader {
161 fn index_count(&self) -> usize {
162 self.index_length
163 }
164
165 fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> {
166 if pos >= self.index_length {
167 None
168 } else {
169 Some(unsafe { &*(self.index.add(pos * 32) as *const [u8; 32]) })
170 }
171 }
172
173 fn index_bytes(&self) -> u64 {
174 self.size
175 }
176
177 fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo> {
178 if pos >= self.index_length {
179 return None;
180 }
181
182 let start = (pos * self.chunk_size) as u64;
183 let mut end = start + self.chunk_size as u64;
184
185 if end > self.size {
186 end = self.size;
187 }
188
189 let digest = self.index_digest(pos).unwrap();
190 Some(ChunkReadInfo {
191 range: start..end,
192 digest: *digest,
193 })
194 }
195
196 fn index_ctime(&self) -> i64 {
197 self.ctime
198 }
199
200 fn index_size(&self) -> usize {
201 self.size as usize
202 }
203
204 fn compute_csum(&self) -> ([u8; 32], u64) {
205 let mut csum = openssl::sha::Sha256::new();
206 let mut chunk_end = 0;
207 for pos in 0..self.index_count() {
208 let info = self.chunk_info(pos).unwrap();
209 chunk_end = info.range.end;
210 csum.update(&info.digest);
211 }
212 let csum = csum.finish();
213
214 (csum, chunk_end)
215 }
216
217 fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
218 if offset >= self.size {
219 return None;
220 }
221
222 Some((
223 (offset / self.chunk_size as u64) as usize,
224 offset & (self.chunk_size - 1) as u64, // fast modulo, valid for 2^x chunk_size
225 ))
226 }
227 }
228
/// Writer for fixed index files (`.fidx`).
///
/// The index is built in a temporary file whose digest table is mapped into memory;
/// `close()` writes the checksum into the header and renames the temporary file to its
/// final name. Dropping the writer removes the temporary file.
pub struct FixedIndexWriter {
    // Chunk store that `add_chunk` inserts chunk data into.
    store: Arc<ChunkStore>,
    // Handle of the temporary index file.
    file: File,
    // Shared lock obtained from the chunk store; held as long as the writer exists.
    _lock: ProcessLockSharedGuard,
    // Final path of the index file (target of the rename in `close`).
    filename: PathBuf,
    // Path of the temporary file the index is written to.
    tmp_filename: PathBuf,
    // Size of one chunk in bytes.
    chunk_size: usize,
    // Total size in bytes of the data described by the index.
    size: usize,
    // Number of 32 byte digest slots in the mapped table.
    index_length: usize,
    // Start of the mmap()ed digest table; null once the table has been unmapped.
    index: *mut u8,
    /// UUID written to the file header.
    pub uuid: [u8; 16],
    /// Creation time written to the file header.
    pub ctime: i64,
}
242
// `index` is mmap()ed which cannot be thread-local so should be sendable
//
// The raw pointer field is the only reason `Send` is not derived automatically. Writes
// through the pointer happen in `add_digest`, which takes `&mut self`.
unsafe impl Send for FixedIndexWriter {}
245
246 impl Drop for FixedIndexWriter {
247 fn drop(&mut self) {
248 let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
249 if let Err(err) = self.unmap() {
250 eprintln!("Unable to unmap file {:?} - {}", self.tmp_filename, err);
251 }
252 }
253 }
254
255 impl FixedIndexWriter {
256 #[allow(clippy::cast_ptr_alignment)]
257 pub fn create(
258 store: Arc<ChunkStore>,
259 path: &Path,
260 size: usize,
261 chunk_size: usize,
262 ) -> Result<Self, Error> {
263 let shared_lock = store.try_shared_lock()?;
264
265 let full_path = store.relative_path(path);
266 let mut tmp_path = full_path.clone();
267 tmp_path.set_extension("tmp_fidx");
268
269 let mut file = std::fs::OpenOptions::new()
270 .create(true)
271 .truncate(true)
272 .read(true)
273 .write(true)
274 .open(&tmp_path)?;
275
276 let header_size = std::mem::size_of::<FixedIndexHeader>();
277
278 // todo: use static assertion when available in rust
279 if header_size != 4096 {
280 panic!("got unexpected header size");
281 }
282
283 let ctime = proxmox_time::epoch_i64();
284
285 let uuid = Uuid::generate();
286
287 let buffer = vec![0u8; header_size];
288 let header = unsafe { &mut *(buffer.as_ptr() as *mut FixedIndexHeader) };
289
290 header.magic = file_formats::FIXED_SIZED_CHUNK_INDEX_1_0;
291 header.ctime = i64::to_le(ctime);
292 header.size = u64::to_le(size as u64);
293 header.chunk_size = u64::to_le(chunk_size as u64);
294 header.uuid = *uuid.as_bytes();
295
296 header.index_csum = [0u8; 32];
297
298 file.write_all(&buffer)?;
299
300 let index_length = (size + chunk_size - 1) / chunk_size;
301 let index_size = index_length * 32;
302 nix::unistd::ftruncate(file.as_raw_fd(), (header_size + index_size) as i64)?;
303
304 let data = unsafe {
305 nix::sys::mman::mmap(
306 std::ptr::null_mut(),
307 index_size,
308 nix::sys::mman::ProtFlags::PROT_READ | nix::sys::mman::ProtFlags::PROT_WRITE,
309 nix::sys::mman::MapFlags::MAP_SHARED,
310 file.as_raw_fd(),
311 header_size as i64,
312 )
313 }? as *mut u8;
314
315 Ok(Self {
316 store,
317 file,
318 _lock: shared_lock,
319 filename: full_path,
320 tmp_filename: tmp_path,
321 chunk_size,
322 size,
323 index_length,
324 index: data,
325 ctime,
326 uuid: *uuid.as_bytes(),
327 })
328 }
329
330 pub fn index_length(&self) -> usize {
331 self.index_length
332 }
333
334 fn unmap(&mut self) -> Result<(), Error> {
335 if self.index.is_null() {
336 return Ok(());
337 }
338
339 let index_size = self.index_length * 32;
340
341 if let Err(err) =
342 unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) }
343 {
344 bail!("unmap file {:?} failed - {}", self.tmp_filename, err);
345 }
346
347 self.index = std::ptr::null_mut();
348
349 Ok(())
350 }
351
352 pub fn close(&mut self) -> Result<[u8; 32], Error> {
353 if self.index.is_null() {
354 bail!("cannot close already closed index file.");
355 }
356
357 let index_size = self.index_length * 32;
358 let data = unsafe { std::slice::from_raw_parts(self.index, index_size) };
359 let index_csum = openssl::sha::sha256(data);
360
361 self.unmap()?;
362
363 let csum_offset = proxmox_lang::offsetof!(FixedIndexHeader, index_csum);
364 self.file.seek(SeekFrom::Start(csum_offset as u64))?;
365 self.file.write_all(&index_csum)?;
366 self.file.flush()?;
367
368 if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
369 bail!("Atomic rename file {:?} failed - {}", self.filename, err);
370 }
371
372 Ok(index_csum)
373 }
374
375 pub fn check_chunk_alignment(&self, offset: usize, chunk_len: usize) -> Result<usize, Error> {
376 if offset < chunk_len {
377 bail!("got chunk with small offset ({} < {}", offset, chunk_len);
378 }
379
380 let pos = offset - chunk_len;
381
382 if offset > self.size {
383 bail!("chunk data exceeds size ({} >= {})", offset, self.size);
384 }
385
386 // last chunk can be smaller
387 if ((offset != self.size) && (chunk_len != self.chunk_size))
388 || (chunk_len > self.chunk_size)
389 || (chunk_len == 0)
390 {
391 bail!(
392 "chunk with unexpected length ({} != {}",
393 chunk_len,
394 self.chunk_size
395 );
396 }
397
398 if pos & (self.chunk_size - 1) != 0 {
399 bail!("got unaligned chunk (pos = {})", pos);
400 }
401
402 Ok(pos / self.chunk_size)
403 }
404
405 // Note: We want to add data out of order, so do not assume any order here.
406 pub fn add_chunk(&mut self, chunk_info: &ChunkInfo, stat: &mut ChunkStat) -> Result<(), Error> {
407 let chunk_len = chunk_info.chunk_len as usize;
408 let offset = chunk_info.offset as usize; // end of chunk
409
410 let idx = self.check_chunk_alignment(offset, chunk_len)?;
411
412 let (is_duplicate, compressed_size) = self
413 .store
414 .insert_chunk(&chunk_info.chunk, &chunk_info.digest)?;
415
416 stat.chunk_count += 1;
417 stat.compressed_size += compressed_size;
418
419 let digest = &chunk_info.digest;
420
421 println!(
422 "ADD CHUNK {} {} {}% {} {}",
423 idx,
424 chunk_len,
425 (compressed_size * 100) / (chunk_len as u64),
426 is_duplicate,
427 hex::encode(digest)
428 );
429
430 if is_duplicate {
431 stat.duplicate_chunks += 1;
432 } else {
433 stat.disk_size += compressed_size;
434 }
435
436 self.add_digest(idx, digest)
437 }
438
439 pub fn add_digest(&mut self, index: usize, digest: &[u8; 32]) -> Result<(), Error> {
440 if index >= self.index_length {
441 bail!(
442 "add digest failed - index out of range ({} >= {})",
443 index,
444 self.index_length
445 );
446 }
447
448 if self.index.is_null() {
449 bail!("cannot write to closed index file.");
450 }
451
452 let index_pos = index * 32;
453 unsafe {
454 let dst = self.index.add(index_pos);
455 dst.copy_from_nonoverlapping(digest.as_ptr(), 32);
456 }
457
458 Ok(())
459 }
460
461 pub fn clone_data_from(&mut self, reader: &FixedIndexReader) -> Result<(), Error> {
462 if self.index_length != reader.index_count() {
463 bail!("clone_data_from failed - index sizes not equal");
464 }
465
466 for i in 0..self.index_length {
467 self.add_digest(i, reader.index_digest(i).unwrap())?;
468 }
469
470 Ok(())
471 }
472 }