]>
Commit | Line | Data |
---|---|---|
1 | use failure::*; | |
2 | use std::convert::TryInto; | |
3 | use std::io::{Seek, SeekFrom}; | |
4 | ||
5 | use super::chunk_stat::*; | |
6 | use super::chunk_store::*; | |
7 | use super::IndexFile; | |
8 | use crate::tools; | |
9 | ||
10 | use chrono::{Local, TimeZone}; | |
11 | use std::fs::File; | |
12 | use std::io::Write; | |
13 | use std::os::unix::io::AsRawFd; | |
14 | use std::path::{Path, PathBuf}; | |
15 | use std::sync::Arc; | |
16 | ||
17 | use super::read_chunk::*; | |
18 | use super::ChunkInfo; | |
19 | ||
20 | use proxmox::tools::io::ReadExt; | |
21 | use proxmox::tools::Uuid; | |
22 | ||
/// Header format definition for fixed index files (`.fidx`)
///
/// The header fills the first 4096 bytes of the file and is followed directly
/// by the digest table, which holds one 32-byte digest per chunk. All numeric
/// fields are stored little-endian.
#[repr(C)]
pub struct FixedIndexHeader {
    /// File type magic; readers reject anything other than
    /// `FIXED_SIZED_CHUNK_INDEX_1_0`.
    pub magic: [u8; 8],
    /// UUID generated when the index is created.
    pub uuid: [u8; 16],
    /// Creation time in seconds since the UNIX epoch (little-endian).
    pub ctime: u64,
    /// Sha256 over the index ``SHA256(digest1||digest2||...)``
    ///
    /// Zero while the index is being written; filled in when the writer is closed.
    pub index_csum: [u8; 32],
    /// Total size of the indexed data in bytes (little-endian).
    pub size: u64,
    /// Chunk size in bytes (little-endian); only the last chunk may be shorter.
    pub chunk_size: u64,
    reserved: [u8; 4016], // overall size is one page (4096 bytes)
}
// Compile-time guarantee that the on-disk header is exactly one page.
proxmox::static_assert_size!(FixedIndexHeader, 4096);
36 | ||
// split image into fixed size chunks

/// Read-only view of a fixed index file.
///
/// The header is parsed when the reader is created. The digest table
/// (32 bytes per chunk) is memory-mapped read-only and accessed in place.
pub struct FixedIndexReader {
    /// The open index file; it carries the shared `flock` taken in `new`.
    _file: File,
    /// Chunk size in bytes, taken from the header.
    pub chunk_size: usize,
    /// Total size of the indexed data in bytes, taken from the header.
    pub size: u64,
    /// Number of chunks, i.e. `size / chunk_size` rounded up.
    index_length: usize,
    /// Base of the mapped digest table (`index_length * 32` bytes); null once unmapped.
    index: *mut u8,
    /// UUID taken from the header.
    pub uuid: [u8; 16],
    /// Creation time taken from the header (seconds since the UNIX epoch).
    pub ctime: u64,
    /// Digest-table checksum as stored in the header (not re-verified on open).
    pub index_csum: [u8; 32],
}
49 | ||
// `index` is mmap()ed which cannot be thread-local so should be sendable
//
// SAFETY: the only field that is not automatically thread-safe is the raw
// `index` pointer. It points into a private, read-only mapping (PROT_READ,
// MAP_PRIVATE) owned by this reader. Methods taking `&self` only read through
// it. The mapping is released only by `unmap`, which requires `&mut self` and
// is called from `Drop`.
unsafe impl Send for FixedIndexReader {}
unsafe impl Sync for FixedIndexReader {}
53 | ||
54 | impl Drop for FixedIndexReader { | |
55 | fn drop(&mut self) { | |
56 | if let Err(err) = self.unmap() { | |
57 | eprintln!("Unable to unmap file - {}", err); | |
58 | } | |
59 | } | |
60 | } | |
61 | ||
62 | impl FixedIndexReader { | |
63 | pub fn open(path: &Path) -> Result<Self, Error> { | |
64 | File::open(path) | |
65 | .map_err(Error::from) | |
66 | .and_then(|file| Self::new(file)) | |
67 | .map_err(|err| format_err!("Unable to open fixed index {:?} - {}", path, err)) | |
68 | } | |
69 | ||
70 | pub fn new(mut file: std::fs::File) -> Result<Self, Error> { | |
71 | if let Err(err) = | |
72 | nix::fcntl::flock(file.as_raw_fd(), nix::fcntl::FlockArg::LockSharedNonblock) | |
73 | { | |
74 | bail!("unable to get shared lock - {}", err); | |
75 | } | |
76 | ||
77 | file.seek(SeekFrom::Start(0))?; | |
78 | ||
79 | let header_size = std::mem::size_of::<FixedIndexHeader>(); | |
80 | let header: Box<FixedIndexHeader> = unsafe { file.read_host_value_boxed()? }; | |
81 | ||
82 | if header.magic != super::FIXED_SIZED_CHUNK_INDEX_1_0 { | |
83 | bail!("got unknown magic number"); | |
84 | } | |
85 | ||
86 | let size = u64::from_le(header.size); | |
87 | let ctime = u64::from_le(header.ctime); | |
88 | let chunk_size = u64::from_le(header.chunk_size); | |
89 | ||
90 | let index_length = ((size + chunk_size - 1) / chunk_size) as usize; | |
91 | let index_size = index_length * 32; | |
92 | ||
93 | let rawfd = file.as_raw_fd(); | |
94 | ||
95 | let stat = match nix::sys::stat::fstat(rawfd) { | |
96 | Ok(stat) => stat, | |
97 | Err(err) => bail!("fstat failed - {}", err), | |
98 | }; | |
99 | ||
100 | let expected_index_size = (stat.st_size as usize) - header_size; | |
101 | if index_size != expected_index_size { | |
102 | bail!( | |
103 | "got unexpected file size ({} != {})", | |
104 | index_size, | |
105 | expected_index_size | |
106 | ); | |
107 | } | |
108 | ||
109 | let data = unsafe { | |
110 | nix::sys::mman::mmap( | |
111 | std::ptr::null_mut(), | |
112 | index_size, | |
113 | nix::sys::mman::ProtFlags::PROT_READ, | |
114 | nix::sys::mman::MapFlags::MAP_PRIVATE, | |
115 | file.as_raw_fd(), | |
116 | header_size as i64, | |
117 | ) | |
118 | }? as *mut u8; | |
119 | ||
120 | Ok(Self { | |
121 | _file: file, | |
122 | chunk_size: chunk_size as usize, | |
123 | size, | |
124 | index_length, | |
125 | index: data, | |
126 | ctime, | |
127 | uuid: header.uuid, | |
128 | index_csum: header.index_csum, | |
129 | }) | |
130 | } | |
131 | ||
132 | fn unmap(&mut self) -> Result<(), Error> { | |
133 | if self.index == std::ptr::null_mut() { | |
134 | return Ok(()); | |
135 | } | |
136 | ||
137 | let index_size = self.index_length * 32; | |
138 | ||
139 | if let Err(err) = | |
140 | unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) } | |
141 | { | |
142 | bail!("unmap file failed - {}", err); | |
143 | } | |
144 | ||
145 | self.index = std::ptr::null_mut(); | |
146 | ||
147 | Ok(()) | |
148 | } | |
149 | ||
150 | pub fn chunk_info(&self, pos: usize) -> Result<(u64, u64, [u8; 32]), Error> { | |
151 | if pos >= self.index_length { | |
152 | bail!("chunk index out of range"); | |
153 | } | |
154 | let start = (pos * self.chunk_size) as u64; | |
155 | let mut end = start + self.chunk_size as u64; | |
156 | ||
157 | if end > self.size { | |
158 | end = self.size; | |
159 | } | |
160 | ||
161 | let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit(); | |
162 | unsafe { | |
163 | std::ptr::copy_nonoverlapping( | |
164 | self.index.add(pos * 32), | |
165 | (*digest.as_mut_ptr()).as_mut_ptr(), | |
166 | 32, | |
167 | ); | |
168 | } | |
169 | ||
170 | Ok((start, end, unsafe { digest.assume_init() })) | |
171 | } | |
172 | ||
173 | #[inline] | |
174 | fn chunk_digest(&self, pos: usize) -> &[u8; 32] { | |
175 | if pos >= self.index_length { | |
176 | panic!("chunk index out of range"); | |
177 | } | |
178 | let slice = unsafe { std::slice::from_raw_parts(self.index.add(pos * 32), 32) }; | |
179 | slice.try_into().unwrap() | |
180 | } | |
181 | ||
182 | #[inline] | |
183 | fn chunk_end(&self, pos: usize) -> u64 { | |
184 | if pos >= self.index_length { | |
185 | panic!("chunk index out of range"); | |
186 | } | |
187 | ||
188 | let end = ((pos + 1) * self.chunk_size) as u64; | |
189 | if end > self.size { | |
190 | self.size | |
191 | } else { | |
192 | end | |
193 | } | |
194 | } | |
195 | ||
196 | /// Compute checksum and data size | |
197 | pub fn compute_csum(&self) -> ([u8; 32], u64) { | |
198 | let mut csum = openssl::sha::Sha256::new(); | |
199 | let mut chunk_end = 0; | |
200 | for pos in 0..self.index_length { | |
201 | chunk_end = ((pos + 1) * self.chunk_size) as u64; | |
202 | let digest = self.chunk_digest(pos); | |
203 | csum.update(digest); | |
204 | } | |
205 | let csum = csum.finish(); | |
206 | ||
207 | (csum, chunk_end) | |
208 | } | |
209 | ||
210 | pub fn print_info(&self) { | |
211 | println!("Size: {}", self.size); | |
212 | println!("ChunkSize: {}", self.chunk_size); | |
213 | println!( | |
214 | "CTime: {}", | |
215 | Local.timestamp(self.ctime as i64, 0).format("%c") | |
216 | ); | |
217 | println!("UUID: {:?}", self.uuid); | |
218 | } | |
219 | } | |
220 | ||
221 | impl IndexFile for FixedIndexReader { | |
222 | fn index_count(&self) -> usize { | |
223 | self.index_length | |
224 | } | |
225 | ||
226 | fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> { | |
227 | if pos >= self.index_length { | |
228 | None | |
229 | } else { | |
230 | Some(unsafe { std::mem::transmute(self.index.add(pos * 32)) }) | |
231 | } | |
232 | } | |
233 | ||
234 | fn index_bytes(&self) -> u64 { | |
235 | self.size | |
236 | } | |
237 | } | |
238 | ||
/// Writer for fixed index files.
///
/// Digests are written into a read-write shared mapping of a temporary file.
/// `close` checksums the digest table, stores the checksum in the header and
/// renames the temporary file to its final name.
pub struct FixedIndexWriter {
    /// Chunk store that receives chunk data; it also resolves the index path.
    store: Arc<ChunkStore>,
    /// The temporary index file.
    file: File,
    /// Guard from `store.try_shared_lock()`, kept for the writer's lifetime
    /// (presumably releases the lock on drop).
    _lock: tools::ProcessLockSharedGuard,
    /// Final path of the index file (target of the rename in `close`).
    filename: PathBuf,
    /// Temporary path the index is written to (extension `tmp_fidx`).
    tmp_filename: PathBuf,
    /// Chunk size in bytes.
    chunk_size: usize,
    /// Total size of the indexed data in bytes.
    size: usize,
    /// Number of digest slots in the table.
    index_length: usize,
    /// Base of the mapped digest table (`index_length * 32` bytes); null once unmapped.
    index: *mut u8,
    /// UUID stored in the header.
    pub uuid: [u8; 16],
    /// Creation time stored in the header (seconds since the UNIX epoch).
    pub ctime: u64,
}
252 | ||
// `index` is mmap()ed which cannot be thread-local so should be sendable
//
// SAFETY: the raw `index` pointer refers to a mapping owned exclusively by this
// writer. Every access through it (`add_digest`, `close`, `unmap`) requires
// `&mut self`. `Sync` is not implemented.
unsafe impl Send for FixedIndexWriter {}
255 | ||
256 | impl Drop for FixedIndexWriter { | |
257 | fn drop(&mut self) { | |
258 | let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors | |
259 | if let Err(err) = self.unmap() { | |
260 | eprintln!("Unable to unmap file {:?} - {}", self.tmp_filename, err); | |
261 | } | |
262 | } | |
263 | } | |
264 | ||
265 | impl FixedIndexWriter { | |
266 | #[allow(clippy::cast_ptr_alignment)] | |
267 | pub fn create( | |
268 | store: Arc<ChunkStore>, | |
269 | path: &Path, | |
270 | size: usize, | |
271 | chunk_size: usize, | |
272 | ) -> Result<Self, Error> { | |
273 | let shared_lock = store.try_shared_lock()?; | |
274 | ||
275 | let full_path = store.relative_path(path); | |
276 | let mut tmp_path = full_path.clone(); | |
277 | tmp_path.set_extension("tmp_fidx"); | |
278 | ||
279 | let mut file = std::fs::OpenOptions::new() | |
280 | .create(true) | |
281 | .truncate(true) | |
282 | .read(true) | |
283 | .write(true) | |
284 | .open(&tmp_path)?; | |
285 | ||
286 | let header_size = std::mem::size_of::<FixedIndexHeader>(); | |
287 | ||
288 | // todo: use static assertion when available in rust | |
289 | if header_size != 4096 { | |
290 | panic!("got unexpected header size"); | |
291 | } | |
292 | ||
293 | let ctime = std::time::SystemTime::now() | |
294 | .duration_since(std::time::SystemTime::UNIX_EPOCH)? | |
295 | .as_secs(); | |
296 | ||
297 | let uuid = Uuid::generate(); | |
298 | ||
299 | let buffer = vec![0u8; header_size]; | |
300 | let header = unsafe { &mut *(buffer.as_ptr() as *mut FixedIndexHeader) }; | |
301 | ||
302 | header.magic = super::FIXED_SIZED_CHUNK_INDEX_1_0; | |
303 | header.ctime = u64::to_le(ctime); | |
304 | header.size = u64::to_le(size as u64); | |
305 | header.chunk_size = u64::to_le(chunk_size as u64); | |
306 | header.uuid = *uuid.as_bytes(); | |
307 | ||
308 | header.index_csum = [0u8; 32]; | |
309 | ||
310 | file.write_all(&buffer)?; | |
311 | ||
312 | let index_length = (size + chunk_size - 1) / chunk_size; | |
313 | let index_size = index_length * 32; | |
314 | nix::unistd::ftruncate(file.as_raw_fd(), (header_size + index_size) as i64)?; | |
315 | ||
316 | let data = unsafe { | |
317 | nix::sys::mman::mmap( | |
318 | std::ptr::null_mut(), | |
319 | index_size, | |
320 | nix::sys::mman::ProtFlags::PROT_READ | nix::sys::mman::ProtFlags::PROT_WRITE, | |
321 | nix::sys::mman::MapFlags::MAP_SHARED, | |
322 | file.as_raw_fd(), | |
323 | header_size as i64, | |
324 | ) | |
325 | }? as *mut u8; | |
326 | ||
327 | Ok(Self { | |
328 | store, | |
329 | file, | |
330 | _lock: shared_lock, | |
331 | filename: full_path, | |
332 | tmp_filename: tmp_path, | |
333 | chunk_size, | |
334 | size, | |
335 | index_length, | |
336 | index: data, | |
337 | ctime, | |
338 | uuid: *uuid.as_bytes(), | |
339 | }) | |
340 | } | |
341 | ||
342 | pub fn index_length(&self) -> usize { | |
343 | self.index_length | |
344 | } | |
345 | ||
346 | fn unmap(&mut self) -> Result<(), Error> { | |
347 | if self.index == std::ptr::null_mut() { | |
348 | return Ok(()); | |
349 | } | |
350 | ||
351 | let index_size = self.index_length * 32; | |
352 | ||
353 | if let Err(err) = | |
354 | unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) } | |
355 | { | |
356 | bail!("unmap file {:?} failed - {}", self.tmp_filename, err); | |
357 | } | |
358 | ||
359 | self.index = std::ptr::null_mut(); | |
360 | ||
361 | Ok(()) | |
362 | } | |
363 | ||
364 | pub fn close(&mut self) -> Result<[u8; 32], Error> { | |
365 | if self.index == std::ptr::null_mut() { | |
366 | bail!("cannot close already closed index file."); | |
367 | } | |
368 | ||
369 | let index_size = self.index_length * 32; | |
370 | let data = unsafe { std::slice::from_raw_parts(self.index, index_size) }; | |
371 | let index_csum = openssl::sha::sha256(data); | |
372 | ||
373 | self.unmap()?; | |
374 | ||
375 | let csum_offset = proxmox::offsetof!(FixedIndexHeader, index_csum); | |
376 | self.file.seek(SeekFrom::Start(csum_offset as u64))?; | |
377 | self.file.write_all(&index_csum)?; | |
378 | self.file.flush()?; | |
379 | ||
380 | if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) { | |
381 | bail!("Atomic rename file {:?} failed - {}", self.filename, err); | |
382 | } | |
383 | ||
384 | Ok(index_csum) | |
385 | } | |
386 | ||
387 | pub fn check_chunk_alignment(&self, offset: usize, chunk_len: usize) -> Result<usize, Error> { | |
388 | if offset < chunk_len { | |
389 | bail!("got chunk with small offset ({} < {}", offset, chunk_len); | |
390 | } | |
391 | ||
392 | let pos = offset - chunk_len; | |
393 | ||
394 | if offset > self.size { | |
395 | bail!("chunk data exceeds size ({} >= {})", offset, self.size); | |
396 | } | |
397 | ||
398 | // last chunk can be smaller | |
399 | if ((offset != self.size) && (chunk_len != self.chunk_size)) | |
400 | || (chunk_len > self.chunk_size) | |
401 | || (chunk_len == 0) | |
402 | { | |
403 | bail!( | |
404 | "chunk with unexpected length ({} != {}", | |
405 | chunk_len, | |
406 | self.chunk_size | |
407 | ); | |
408 | } | |
409 | ||
410 | if pos & (self.chunk_size - 1) != 0 { | |
411 | bail!("got unaligned chunk (pos = {})", pos); | |
412 | } | |
413 | ||
414 | Ok(pos / self.chunk_size) | |
415 | } | |
416 | ||
417 | // Note: We want to add data out of order, so do not assume any order here. | |
418 | pub fn add_chunk(&mut self, chunk_info: &ChunkInfo, stat: &mut ChunkStat) -> Result<(), Error> { | |
419 | let chunk_len = chunk_info.chunk_len as usize; | |
420 | let offset = chunk_info.offset as usize; // end of chunk | |
421 | ||
422 | let idx = self.check_chunk_alignment(offset, chunk_len)?; | |
423 | ||
424 | let (is_duplicate, compressed_size) = self | |
425 | .store | |
426 | .insert_chunk(&chunk_info.chunk, &chunk_info.digest)?; | |
427 | ||
428 | stat.chunk_count += 1; | |
429 | stat.compressed_size += compressed_size; | |
430 | ||
431 | let digest = &chunk_info.digest; | |
432 | ||
433 | println!( | |
434 | "ADD CHUNK {} {} {}% {} {}", | |
435 | idx, | |
436 | chunk_len, | |
437 | (compressed_size * 100) / (chunk_len as u64), | |
438 | is_duplicate, | |
439 | proxmox::tools::digest_to_hex(digest) | |
440 | ); | |
441 | ||
442 | if is_duplicate { | |
443 | stat.duplicate_chunks += 1; | |
444 | } else { | |
445 | stat.disk_size += compressed_size; | |
446 | } | |
447 | ||
448 | self.add_digest(idx, digest) | |
449 | } | |
450 | ||
451 | pub fn add_digest(&mut self, index: usize, digest: &[u8; 32]) -> Result<(), Error> { | |
452 | if index >= self.index_length { | |
453 | bail!( | |
454 | "add digest failed - index out of range ({} >= {})", | |
455 | index, | |
456 | self.index_length | |
457 | ); | |
458 | } | |
459 | ||
460 | if self.index == std::ptr::null_mut() { | |
461 | bail!("cannot write to closed index file."); | |
462 | } | |
463 | ||
464 | let index_pos = index * 32; | |
465 | unsafe { | |
466 | let dst = self.index.add(index_pos); | |
467 | dst.copy_from_nonoverlapping(digest.as_ptr(), 32); | |
468 | } | |
469 | ||
470 | Ok(()) | |
471 | } | |
472 | } | |
473 | ||
/// Sequential and random-access reader over the data described by a fixed index.
///
/// Data is read one chunk at a time from `store` into an internal buffer.
pub struct BufferedFixedReader<S> {
    /// Source of chunk data, looked up by digest.
    store: S,
    /// The fixed index describing the data.
    index: FixedIndexReader,
    /// Total data size in bytes (copied from `index.size`).
    archive_size: u64,
    /// Contents of the currently buffered chunk (empty before the first read).
    read_buffer: Vec<u8>,
    /// Index of the chunk held in `read_buffer`.
    buffered_chunk_idx: usize,
    /// Data offset at which the buffered chunk starts.
    buffered_chunk_start: u64,
    /// Current position used by the `Read` and `Seek` implementations.
    read_offset: u64,
}
483 | ||
484 | impl<S: ReadChunk> BufferedFixedReader<S> { | |
485 | pub fn new(index: FixedIndexReader, store: S) -> Self { | |
486 | let archive_size = index.size; | |
487 | Self { | |
488 | store, | |
489 | index, | |
490 | archive_size, | |
491 | read_buffer: Vec::with_capacity(1024 * 1024), | |
492 | buffered_chunk_idx: 0, | |
493 | buffered_chunk_start: 0, | |
494 | read_offset: 0, | |
495 | } | |
496 | } | |
497 | ||
498 | pub fn archive_size(&self) -> u64 { | |
499 | self.archive_size | |
500 | } | |
501 | ||
502 | fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> { | |
503 | let index = &self.index; | |
504 | let (start, end, digest) = index.chunk_info(idx)?; | |
505 | ||
506 | // fixme: avoid copy | |
507 | ||
508 | let data = self.store.read_chunk(&digest)?; | |
509 | ||
510 | if (end - start) != data.len() as u64 { | |
511 | bail!( | |
512 | "read chunk with wrong size ({} != {}", | |
513 | (end - start), | |
514 | data.len() | |
515 | ); | |
516 | } | |
517 | ||
518 | self.read_buffer.clear(); | |
519 | self.read_buffer.extend_from_slice(&data); | |
520 | ||
521 | self.buffered_chunk_idx = idx; | |
522 | ||
523 | self.buffered_chunk_start = start as u64; | |
524 | //println!("BUFFER {} {}", self.buffered_chunk_start, end); | |
525 | Ok(()) | |
526 | } | |
527 | } | |
528 | ||
529 | impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> { | |
530 | fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> { | |
531 | if offset == self.archive_size { | |
532 | return Ok(&self.read_buffer[0..0]); | |
533 | } | |
534 | ||
535 | let buffer_len = self.read_buffer.len(); | |
536 | let index = &self.index; | |
537 | ||
538 | // optimization for sequential read | |
539 | if buffer_len > 0 | |
540 | && ((self.buffered_chunk_idx + 1) < index.index_length) | |
541 | && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) | |
542 | { | |
543 | let next_idx = self.buffered_chunk_idx + 1; | |
544 | let next_end = index.chunk_end(next_idx); | |
545 | if offset < next_end { | |
546 | self.buffer_chunk(next_idx)?; | |
547 | let buffer_offset = (offset - self.buffered_chunk_start) as usize; | |
548 | return Ok(&self.read_buffer[buffer_offset..]); | |
549 | } | |
550 | } | |
551 | ||
552 | if (buffer_len == 0) | |
553 | || (offset < self.buffered_chunk_start) | |
554 | || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) | |
555 | { | |
556 | let idx = (offset / index.chunk_size as u64) as usize; | |
557 | self.buffer_chunk(idx)?; | |
558 | } | |
559 | ||
560 | let buffer_offset = (offset - self.buffered_chunk_start) as usize; | |
561 | Ok(&self.read_buffer[buffer_offset..]) | |
562 | } | |
563 | } | |
564 | ||
565 | impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> { | |
566 | fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { | |
567 | use crate::tools::BufferedRead; | |
568 | use std::io::{Error, ErrorKind}; | |
569 | ||
570 | let data = match self.buffered_read(self.read_offset) { | |
571 | Ok(v) => v, | |
572 | Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())), | |
573 | }; | |
574 | ||
575 | let n = if data.len() > buf.len() { | |
576 | buf.len() | |
577 | } else { | |
578 | data.len() | |
579 | }; | |
580 | ||
581 | unsafe { | |
582 | std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n); | |
583 | } | |
584 | ||
585 | self.read_offset += n as u64; | |
586 | ||
587 | Ok(n) | |
588 | } | |
589 | } | |
590 | ||
591 | impl<S: ReadChunk> Seek for BufferedFixedReader<S> { | |
592 | fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> { | |
593 | let new_offset = match pos { | |
594 | SeekFrom::Start(start_offset) => start_offset as i64, | |
595 | SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset, | |
596 | SeekFrom::Current(offset) => (self.read_offset as i64) + offset, | |
597 | }; | |
598 | ||
599 | use std::io::{Error, ErrorKind}; | |
600 | if (new_offset < 0) || (new_offset > (self.archive_size as i64)) { | |
601 | return Err(Error::new( | |
602 | ErrorKind::Other, | |
603 | format!( | |
604 | "seek is out of range {} ([0..{}])", | |
605 | new_offset, self.archive_size | |
606 | ), | |
607 | )); | |
608 | } | |
609 | self.read_offset = new_offset as u64; | |
610 | ||
611 | Ok(self.read_offset) | |
612 | } | |
613 | } |