]>
Commit | Line | Data |
---|---|---|
f98ac774 | 1 | use std::convert::TryInto; |
0433db19 | 2 | use std::fs::File; |
367f002e | 3 | use std::io::{BufWriter, Seek, SeekFrom, Write}; |
0433db19 | 4 | use std::os::unix::io::AsRawFd; |
367f002e WB |
5 | use std::path::{Path, PathBuf}; |
6 | use std::sync::Arc; | |
7 | ||
8 | use failure::*; | |
0433db19 | 9 | |
9b2b627f | 10 | use proxmox::tools::io::ReadExt; |
f569acc5 | 11 | use proxmox::tools::uuid::Uuid; |
f35197f4 | 12 | use proxmox::tools::vec; |
8ea3b1d1 | 13 | |
367f002e WB |
14 | use super::chunk_stat::ChunkStat; |
15 | use super::chunk_store::ChunkStore; | |
16 | use super::read_chunk::ReadChunk; | |
f569acc5 WB |
17 | use super::Chunker; |
18 | use super::IndexFile; | |
4ee8f53d | 19 | use super::{DataBlob, DataChunkBuilder}; |
367f002e | 20 | use crate::tools; |
f98ac774 | 21 | |
/// Header format definition for dynamic index files (`.didx`)
///
/// The header occupies exactly one page (checked at compile time below); the
/// index entries are stored directly after it.
#[repr(C)]
pub struct DynamicIndexHeader {
    /// Magic number identifying the format (`DYNAMIC_SIZED_CHUNK_INDEX_1_0`).
    pub magic: [u8; 8],
    /// UUID generated when the index file is created.
    pub uuid: [u8; 16],
    /// Creation time in seconds since the UNIX epoch, stored little-endian.
    pub ctime: u64,
    /// Sha256 over the index ``SHA256(offset1||digest1||offset2||digest2||...)``
    pub index_csum: [u8; 32],
    reserved: [u8; 4032], // overall size is one page (4096 bytes)
}
proxmox::static_assert_size!(DynamicIndexHeader, 4096);
// TODO: Once non-Copy unions are stabilized, use:
// union DynamicIndexHeader {
//     reserved: [u8; 4096],
//     pub data: DynamicIndexHeaderData,
// }
77703d95 | 38 | |
/// Read-only view of a dynamic index file.
///
/// The entry table following the header is memory-mapped. Each entry is 40
/// bytes: an 8 byte chunk end offset followed by a 32 byte chunk digest.
pub struct DynamicIndexReader {
    // Keeps the file (and with it the shared `flock` taken in `new`) open for
    // the lifetime of the reader.
    _file: File,
    /// Size of the whole index file in bytes (header included).
    pub size: usize,
    // Start of the mmap()ed entry table; null once unmapped.
    index: *const u8,
    // Number of 40 byte entries in the table.
    index_entries: usize,
    /// UUID copied from the file header.
    pub uuid: [u8; 16],
    /// Creation time from the header (seconds since the UNIX epoch).
    pub ctime: u64,
    /// Entry checksum as stored in the header.
    pub index_csum: [u8; 32],
}
48 | ||
// `index` is mmap()ed which cannot be thread-local so should be sendable
// FIXME: Introduce an mmap wrapper type for this?
//
// SAFETY: the mapping is created with PROT_READ and is only ever read through
// `index`; it is released only via `&mut self` (`unmap`, called from `drop`),
// so no shared reference can observe it going away.
unsafe impl Send for DynamicIndexReader {}
unsafe impl Sync for DynamicIndexReader {}
0b05fd58 | 53 | |
93d5d779 | 54 | impl Drop for DynamicIndexReader { |
77703d95 DM |
55 | fn drop(&mut self) { |
56 | if let Err(err) = self.unmap() { | |
d48a9955 | 57 | eprintln!("Unable to unmap dynamic index - {}", err); |
77703d95 DM |
58 | } |
59 | } | |
60 | } | |
61 | ||
93d5d779 | 62 | impl DynamicIndexReader { |
d48a9955 | 63 | pub fn open(path: &Path) -> Result<Self, Error> { |
0f0a35b3 DM |
64 | File::open(path) |
65 | .map_err(Error::from) | |
66 | .and_then(|file| Self::new(file)) | |
67 | .map_err(|err| format_err!("Unable to open dynamic index {:?} - {}", path, err)) | |
d48a9955 DM |
68 | } |
69 | ||
70 | pub fn new(mut file: std::fs::File) -> Result<Self, Error> { | |
f569acc5 WB |
71 | if let Err(err) = |
72 | nix::fcntl::flock(file.as_raw_fd(), nix::fcntl::FlockArg::LockSharedNonblock) | |
73 | { | |
0f0a35b3 DM |
74 | bail!("unable to get shared lock - {}", err); |
75 | } | |
76 | ||
d48a9955 DM |
77 | file.seek(SeekFrom::Start(0))?; |
78 | ||
93d5d779 | 79 | let header_size = std::mem::size_of::<DynamicIndexHeader>(); |
77703d95 | 80 | |
ba5e6747 | 81 | let header: Box<DynamicIndexHeader> = unsafe { file.read_host_value_boxed()? }; |
77703d95 | 82 | |
a7dd4830 | 83 | if header.magic != super::DYNAMIC_SIZED_CHUNK_INDEX_1_0 { |
d48a9955 | 84 | bail!("got unknown magic number"); |
77703d95 DM |
85 | } |
86 | ||
77703d95 DM |
87 | let ctime = u64::from_le(header.ctime); |
88 | ||
89 | let rawfd = file.as_raw_fd(); | |
90 | ||
d48a9955 | 91 | let stat = nix::sys::stat::fstat(rawfd)?; |
77703d95 DM |
92 | |
93 | let size = stat.st_size as usize; | |
94 | ||
ddbdf80d | 95 | let index_size = size - header_size; |
77703d95 | 96 | if (index_size % 40) != 0 { |
d48a9955 | 97 | bail!("got unexpected file size"); |
77703d95 DM |
98 | } |
99 | ||
f569acc5 WB |
100 | let data = unsafe { |
101 | nix::sys::mman::mmap( | |
102 | std::ptr::null_mut(), | |
103 | index_size, | |
104 | nix::sys::mman::ProtFlags::PROT_READ, | |
105 | nix::sys::mman::MapFlags::MAP_PRIVATE, | |
106 | rawfd, | |
107 | header_size as i64, | |
108 | ) | |
109 | }? as *const u8; | |
77703d95 | 110 | |
77703d95 | 111 | Ok(Self { |
728797d0 | 112 | _file: file, |
77703d95 DM |
113 | size, |
114 | index: data, | |
f569acc5 | 115 | index_entries: index_size / 40, |
77703d95 DM |
116 | ctime, |
117 | uuid: header.uuid, | |
16ff6b7c | 118 | index_csum: header.index_csum, |
77703d95 DM |
119 | }) |
120 | } | |
121 | ||
122 | fn unmap(&mut self) -> Result<(), Error> { | |
f569acc5 WB |
123 | if self.index == std::ptr::null_mut() { |
124 | return Ok(()); | |
125 | } | |
77703d95 | 126 | |
f569acc5 WB |
127 | if let Err(err) = unsafe { |
128 | nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, self.index_entries * 40) | |
129 | } { | |
d48a9955 | 130 | bail!("unmap dynamic index failed - {}", err); |
77703d95 DM |
131 | } |
132 | ||
133 | self.index = std::ptr::null_mut(); | |
134 | ||
135 | Ok(()) | |
136 | } | |
137 | ||
9fe2f639 | 138 | #[allow(clippy::cast_ptr_alignment)] |
40f4e198 DM |
139 | pub fn chunk_info(&self, pos: usize) -> Result<(u64, u64, [u8; 32]), Error> { |
140 | if pos >= self.index_entries { | |
141 | bail!("chunk index out of range"); | |
142 | } | |
143 | let start = if pos == 0 { | |
144 | 0 | |
145 | } else { | |
f569acc5 | 146 | unsafe { *(self.index.add((pos - 1) * 40) as *const u64) } |
40f4e198 DM |
147 | }; |
148 | ||
f569acc5 | 149 | let end = unsafe { *(self.index.add(pos * 40) as *const u64) }; |
40f4e198 | 150 | |
5e58e1bb WB |
151 | let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit(); |
152 | unsafe { | |
153 | std::ptr::copy_nonoverlapping( | |
f569acc5 | 154 | self.index.add(pos * 40 + 8), |
5e58e1bb WB |
155 | (*digest.as_mut_ptr()).as_mut_ptr(), |
156 | 32, | |
157 | ); | |
158 | } | |
159 | ||
160 | Ok((start, end, unsafe { digest.assume_init() })) | |
40f4e198 DM |
161 | } |
162 | ||
39c6bd86 | 163 | #[inline] |
9fe2f639 | 164 | #[allow(clippy::cast_ptr_alignment)] |
39c6bd86 DM |
165 | fn chunk_end(&self, pos: usize) -> u64 { |
166 | if pos >= self.index_entries { | |
167 | panic!("chunk index out of range"); | |
168 | } | |
f569acc5 | 169 | unsafe { *(self.index.add(pos * 40) as *const u64) } |
39c6bd86 DM |
170 | } |
171 | ||
172 | #[inline] | |
f98ac774 | 173 | fn chunk_digest(&self, pos: usize) -> &[u8; 32] { |
39c6bd86 DM |
174 | if pos >= self.index_entries { |
175 | panic!("chunk index out of range"); | |
176 | } | |
f569acc5 | 177 | let slice = unsafe { std::slice::from_raw_parts(self.index.add(pos * 40 + 8), 32) }; |
f98ac774 | 178 | slice.try_into().unwrap() |
39c6bd86 DM |
179 | } |
180 | ||
82c85a21 DM |
181 | /// Compute checksum and data size |
182 | pub fn compute_csum(&self) -> ([u8; 32], u64) { | |
82c85a21 DM |
183 | let mut csum = openssl::sha::Sha256::new(); |
184 | let mut chunk_end = 0; | |
185 | for pos in 0..self.index_entries { | |
186 | chunk_end = self.chunk_end(pos); | |
187 | let digest = self.chunk_digest(pos); | |
188 | csum.update(&chunk_end.to_le_bytes()); | |
189 | csum.update(digest); | |
190 | } | |
191 | let csum = csum.finish(); | |
192 | ||
193 | (csum, chunk_end) | |
194 | } | |
195 | ||
d48a9955 | 196 | /* |
dd5495d6 | 197 | pub fn dump_pxar(&self, mut writer: Box<dyn Write>) -> Result<(), Error> { |
96df2fb4 DM |
198 | |
199 | for pos in 0..self.index_entries { | |
ddbdf80d | 200 | let _end = self.chunk_end(pos); |
39c6bd86 DM |
201 | let digest = self.chunk_digest(pos); |
202 | //println!("Dump {:08x}", end ); | |
f98ac774 DM |
203 | let chunk = self.store.read_chunk(digest)?; |
204 | // fimxe: handle encrypted chunks | |
205 | let data = chunk.decode(None)?; | |
206 | writer.write_all(&data)?; | |
96df2fb4 DM |
207 | } |
208 | ||
209 | Ok(()) | |
210 | } | |
a660978c | 211 | */ |
39c6bd86 DM |
212 | |
213 | fn binary_search( | |
214 | &self, | |
215 | start_idx: usize, | |
216 | start: u64, | |
217 | end_idx: usize, | |
218 | end: u64, | |
f569acc5 | 219 | offset: u64, |
39c6bd86 | 220 | ) -> Result<usize, Error> { |
39c6bd86 DM |
221 | if (offset >= end) || (offset < start) { |
222 | bail!("offset out of range"); | |
223 | } | |
224 | ||
225 | if end_idx == start_idx { | |
226 | return Ok(start_idx); // found | |
227 | } | |
f569acc5 | 228 | let middle_idx = (start_idx + end_idx) / 2; |
39c6bd86 DM |
229 | let middle_end = self.chunk_end(middle_idx); |
230 | ||
231 | if offset < middle_end { | |
62ee2eb4 | 232 | self.binary_search(start_idx, start, middle_idx, middle_end, offset) |
39c6bd86 | 233 | } else { |
62ee2eb4 | 234 | self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset) |
39c6bd86 DM |
235 | } |
236 | } | |
237 | } | |
238 | ||
7bc1d727 WB |
239 | impl IndexFile for DynamicIndexReader { |
240 | fn index_count(&self) -> usize { | |
241 | self.index_entries | |
242 | } | |
243 | ||
244 | fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> { | |
245 | if pos >= self.index_entries { | |
246 | None | |
247 | } else { | |
f569acc5 | 248 | Some(unsafe { std::mem::transmute(self.chunk_digest(pos).as_ptr()) }) |
7bc1d727 WB |
249 | } |
250 | } | |
a660978c DM |
251 | |
252 | fn index_bytes(&self) -> u64 { | |
253 | if self.index_entries == 0 { | |
254 | 0 | |
255 | } else { | |
256 | self.chunk_end((self.index_entries - 1) as usize) | |
257 | } | |
258 | } | |
7bc1d727 WB |
259 | } |
260 | ||
/// Buffered reader over the archive described by a dynamic index.
///
/// Chunk data is fetched from `store` on demand; one chunk at a time is held
/// in `read_buffer`.
pub struct BufferedDynamicReader<S> {
    store: S,
    index: DynamicIndexReader,
    // total archive size (end offset of the last chunk)
    archive_size: u64,
    // data of the currently buffered chunk
    read_buffer: Vec<u8>,
    // index and archive start offset of the chunk held in `read_buffer`
    buffered_chunk_idx: usize,
    buffered_chunk_start: u64,
    // current position used by the std::io::Read / Seek implementations
    read_offset: u64,
}
270 | ||
f569acc5 | 271 | impl<S: ReadChunk> BufferedDynamicReader<S> { |
d48a9955 | 272 | pub fn new(index: DynamicIndexReader, store: S) -> Self { |
39c6bd86 DM |
273 | let archive_size = index.chunk_end(index.index_entries - 1); |
274 | Self { | |
d48a9955 | 275 | store, |
653b1ca1 WB |
276 | index, |
277 | archive_size, | |
f569acc5 | 278 | read_buffer: Vec::with_capacity(1024 * 1024), |
39c6bd86 DM |
279 | buffered_chunk_idx: 0, |
280 | buffered_chunk_start: 0, | |
281 | read_offset: 0, | |
282 | } | |
283 | } | |
284 | ||
f569acc5 WB |
285 | pub fn archive_size(&self) -> u64 { |
286 | self.archive_size | |
287 | } | |
39c6bd86 | 288 | |
0a72e267 | 289 | fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> { |
0b05fd58 | 290 | let index = &self.index; |
d48a9955 | 291 | let (start, end, digest) = index.chunk_info(idx)?; |
f98ac774 | 292 | |
f98ac774 | 293 | // fixme: avoid copy |
d48a9955 DM |
294 | |
295 | let data = self.store.read_chunk(&digest)?; | |
296 | ||
f569acc5 WB |
297 | if (end - start) != data.len() as u64 { |
298 | bail!( | |
299 | "read chunk with wrong size ({} != {}", | |
300 | (end - start), | |
301 | data.len() | |
302 | ); | |
d48a9955 | 303 | } |
f98ac774 DM |
304 | |
305 | self.read_buffer.clear(); | |
306 | self.read_buffer.extend_from_slice(&data); | |
0a72e267 DM |
307 | |
308 | self.buffered_chunk_idx = idx; | |
d48a9955 DM |
309 | |
310 | self.buffered_chunk_start = start as u64; | |
0a72e267 DM |
311 | //println!("BUFFER {} {}", self.buffered_chunk_start, end); |
312 | Ok(()) | |
313 | } | |
314 | } | |
315 | ||
f569acc5 | 316 | impl<S: ReadChunk> crate::tools::BufferedRead for BufferedDynamicReader<S> { |
0a72e267 | 317 | fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> { |
f569acc5 WB |
318 | if offset == self.archive_size { |
319 | return Ok(&self.read_buffer[0..0]); | |
320 | } | |
318564ac | 321 | |
39c6bd86 | 322 | let buffer_len = self.read_buffer.len(); |
0b05fd58 | 323 | let index = &self.index; |
39c6bd86 DM |
324 | |
325 | // optimization for sequential read | |
f569acc5 WB |
326 | if buffer_len > 0 |
327 | && ((self.buffered_chunk_idx + 1) < index.index_entries) | |
328 | && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) | |
39c6bd86 DM |
329 | { |
330 | let next_idx = self.buffered_chunk_idx + 1; | |
331 | let next_end = index.chunk_end(next_idx); | |
332 | if offset < next_end { | |
373ef4a5 | 333 | self.buffer_chunk(next_idx)?; |
39c6bd86 DM |
334 | let buffer_offset = (offset - self.buffered_chunk_start) as usize; |
335 | return Ok(&self.read_buffer[buffer_offset..]); | |
336 | } | |
337 | } | |
338 | ||
f569acc5 WB |
339 | if (buffer_len == 0) |
340 | || (offset < self.buffered_chunk_start) | |
341 | || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) | |
39c6bd86 DM |
342 | { |
343 | let end_idx = index.index_entries - 1; | |
344 | let end = index.chunk_end(end_idx); | |
345 | let idx = index.binary_search(0, 0, end_idx, end, offset)?; | |
373ef4a5 | 346 | self.buffer_chunk(idx)?; |
f569acc5 | 347 | } |
39c6bd86 DM |
348 | |
349 | let buffer_offset = (offset - self.buffered_chunk_start) as usize; | |
350 | Ok(&self.read_buffer[buffer_offset..]) | |
351 | } | |
39c6bd86 | 352 | } |
77703d95 | 353 | |
f569acc5 | 354 | impl<S: ReadChunk> std::io::Read for BufferedDynamicReader<S> { |
4624fe29 | 355 | fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { |
fded74d0 | 356 | use crate::tools::BufferedRead; |
f569acc5 | 357 | use std::io::{Error, ErrorKind}; |
4624fe29 DM |
358 | |
359 | let data = match self.buffered_read(self.read_offset) { | |
360 | Ok(v) => v, | |
361 | Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())), | |
362 | }; | |
363 | ||
f569acc5 WB |
364 | let n = if data.len() > buf.len() { |
365 | buf.len() | |
366 | } else { | |
367 | data.len() | |
368 | }; | |
4624fe29 | 369 | |
f569acc5 WB |
370 | unsafe { |
371 | std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n); | |
372 | } | |
4624fe29 DM |
373 | |
374 | self.read_offset += n as u64; | |
375 | ||
62ee2eb4 | 376 | Ok(n) |
4624fe29 DM |
377 | } |
378 | } | |
379 | ||
f569acc5 | 380 | impl<S: ReadChunk> std::io::Seek for BufferedDynamicReader<S> { |
d48a9955 | 381 | fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> { |
34337050 | 382 | let new_offset = match pos { |
f569acc5 WB |
383 | SeekFrom::Start(start_offset) => start_offset as i64, |
384 | SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset, | |
34337050 DM |
385 | SeekFrom::Current(offset) => (self.read_offset as i64) + offset, |
386 | }; | |
387 | ||
ddbdf80d | 388 | use std::io::{Error, ErrorKind}; |
34337050 DM |
389 | if (new_offset < 0) || (new_offset > (self.archive_size as i64)) { |
390 | return Err(Error::new( | |
391 | ErrorKind::Other, | |
f569acc5 WB |
392 | format!( |
393 | "seek is out of range {} ([0..{}])", | |
394 | new_offset, self.archive_size | |
395 | ), | |
396 | )); | |
34337050 DM |
397 | } |
398 | self.read_offset = new_offset as u64; | |
399 | ||
400 | Ok(self.read_offset) | |
401 | } | |
402 | } | |
403 | ||
/// Create dynamic index files (`.didx`)
///
/// Entries are written to a temporary file, which `close()` renames to the
/// final file name.
pub struct DynamicIndexWriter {
    store: Arc<ChunkStore>,
    // shared lock on the chunk store, held for the writer's lifetime
    _lock: tools::ProcessLockSharedGuard,
    writer: BufWriter<File>,
    closed: bool,
    // final path inside the store, and the temporary file written until close()
    filename: PathBuf,
    tmp_filename: PathBuf,
    // running checksum over all entries; taken (left as None) by close()
    csum: Option<openssl::sha::Sha256>,
    /// UUID written into the header.
    pub uuid: [u8; 16],
    /// Creation time written into the header (seconds since the UNIX epoch).
    pub ctime: u64,
}
416 | ||
93d5d779 | 417 | impl Drop for DynamicIndexWriter { |
1629d2ad DM |
418 | fn drop(&mut self) { |
419 | let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors | |
420 | } | |
421 | } | |
422 | ||
93d5d779 | 423 | impl DynamicIndexWriter { |
976595e1 | 424 | pub fn create(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> { |
43b13033 DM |
425 | let shared_lock = store.try_shared_lock()?; |
426 | ||
0433db19 DM |
427 | let full_path = store.relative_path(path); |
428 | let mut tmp_path = full_path.clone(); | |
93d5d779 | 429 | tmp_path.set_extension("tmp_didx"); |
0433db19 | 430 | |
ddbdf80d | 431 | let file = std::fs::OpenOptions::new() |
f569acc5 WB |
432 | .create(true) |
433 | .truncate(true) | |
0433db19 DM |
434 | .read(true) |
435 | .write(true) | |
436 | .open(&tmp_path)?; | |
437 | ||
f569acc5 | 438 | let mut writer = BufWriter::with_capacity(1024 * 1024, file); |
5032b57b | 439 | |
93d5d779 | 440 | let header_size = std::mem::size_of::<DynamicIndexHeader>(); |
0433db19 DM |
441 | |
442 | // todo: use static assertion when available in rust | |
f569acc5 WB |
443 | if header_size != 4096 { |
444 | panic!("got unexpected header size"); | |
445 | } | |
0433db19 | 446 | |
f569acc5 WB |
447 | let ctime = std::time::SystemTime::now() |
448 | .duration_since(std::time::SystemTime::UNIX_EPOCH)? | |
449 | .as_secs(); | |
0433db19 | 450 | |
f569acc5 | 451 | let uuid = Uuid::generate(); |
0433db19 | 452 | |
d21f8a5b | 453 | let mut buffer = vec::zeroed(header_size); |
93d5d779 | 454 | let header = crate::tools::map_struct_mut::<DynamicIndexHeader>(&mut buffer)?; |
0433db19 | 455 | |
a7dd4830 | 456 | header.magic = super::DYNAMIC_SIZED_CHUNK_INDEX_1_0; |
0433db19 DM |
457 | header.ctime = u64::to_le(ctime); |
458 | header.uuid = *uuid.as_bytes(); | |
459 | ||
16ff6b7c DM |
460 | header.index_csum = [0u8; 32]; |
461 | ||
5032b57b | 462 | writer.write_all(&buffer)?; |
0433db19 | 463 | |
16ff6b7c DM |
464 | let csum = Some(openssl::sha::Sha256::new()); |
465 | ||
0433db19 DM |
466 | Ok(Self { |
467 | store, | |
43b13033 | 468 | _lock: shared_lock, |
653b1ca1 | 469 | writer, |
5e7a09be | 470 | closed: false, |
0433db19 DM |
471 | filename: full_path, |
472 | tmp_filename: tmp_path, | |
473 | ctime, | |
474 | uuid: *uuid.as_bytes(), | |
16ff6b7c | 475 | csum, |
0433db19 DM |
476 | }) |
477 | } | |
5e7a09be | 478 | |
f98ac774 | 479 | // fixme: use add_chunk instead? |
4ee8f53d DM |
480 | pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> { |
481 | self.store.insert_chunk(chunk, digest) | |
976595e1 DM |
482 | } |
483 | ||
f569acc5 | 484 | pub fn close(&mut self) -> Result<[u8; 32], Error> { |
5e7a09be | 485 | if self.closed { |
f569acc5 WB |
486 | bail!( |
487 | "cannot close already closed archive index file {:?}", | |
488 | self.filename | |
489 | ); | |
5e7a09be DM |
490 | } |
491 | ||
492 | self.closed = true; | |
493 | ||
5032b57b | 494 | self.writer.flush()?; |
5e7a09be | 495 | |
9ea4bce4 | 496 | let csum_offset = proxmox::offsetof!(DynamicIndexHeader, index_csum); |
d48a9955 | 497 | self.writer.seek(SeekFrom::Start(csum_offset as u64))?; |
16ff6b7c DM |
498 | |
499 | let csum = self.csum.take().unwrap(); | |
500 | let index_csum = csum.finish(); | |
501 | ||
502 | self.writer.write_all(&index_csum)?; | |
503 | self.writer.flush()?; | |
504 | ||
5e7a09be DM |
505 | if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) { |
506 | bail!("Atomic rename file {:?} failed - {}", self.filename, err); | |
507 | } | |
508 | ||
16ff6b7c | 509 | Ok(index_csum) |
5e7a09be DM |
510 | } |
511 | ||
976595e1 DM |
512 | // fixme: rename to add_digest |
513 | pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), Error> { | |
514 | if self.closed { | |
f569acc5 WB |
515 | bail!( |
516 | "cannot write to closed dynamic index file {:?}", | |
517 | self.filename | |
518 | ); | |
976595e1 | 519 | } |
16ff6b7c | 520 | |
f569acc5 | 521 | let offset_le: &[u8; 8] = unsafe { &std::mem::transmute::<u64, [u8; 8]>(offset.to_le()) }; |
16ff6b7c DM |
522 | |
523 | if let Some(ref mut csum) = self.csum { | |
524 | csum.update(offset_le); | |
525 | csum.update(digest); | |
526 | } | |
527 | ||
cd69d36b DM |
528 | self.writer.write_all(offset_le)?; |
529 | self.writer.write_all(digest)?; | |
976595e1 DM |
530 | Ok(()) |
531 | } | |
532 | } | |
533 | ||
/// Writer which splits a binary stream into dynamic sized chunks
///
/// Each chunk is inserted into the chunk store, and the resulting chunk list
/// is stored in the index file.
pub struct DynamicChunkWriter {
    index: DynamicIndexWriter,
    closed: bool,
    chunker: Chunker,
    // statistics exposed through `stat()`
    stat: ChunkStat,
    // total number of bytes accepted so far (end offset of the buffered data)
    chunk_offset: usize,
    // end offset of the last chunk handed to the index
    last_chunk: usize,
    // data of the chunk currently being assembled
    chunk_buffer: Vec<u8>,
}
546 | ||
547 | impl DynamicChunkWriter { | |
976595e1 DM |
548 | pub fn new(index: DynamicIndexWriter, chunk_size: usize) -> Self { |
549 | Self { | |
550 | index, | |
551 | closed: false, | |
552 | chunker: Chunker::new(chunk_size), | |
553 | stat: ChunkStat::new(0), | |
554 | chunk_offset: 0, | |
555 | last_chunk: 0, | |
f569acc5 | 556 | chunk_buffer: Vec::with_capacity(chunk_size * 4), |
976595e1 DM |
557 | } |
558 | } | |
559 | ||
7e336555 DM |
560 | pub fn stat(&self) -> &ChunkStat { |
561 | &self.stat | |
562 | } | |
563 | ||
f569acc5 | 564 | pub fn close(&mut self) -> Result<(), Error> { |
976595e1 DM |
565 | if self.closed { |
566 | return Ok(()); | |
567 | } | |
568 | ||
569 | self.closed = true; | |
570 | ||
571 | self.write_chunk_buffer()?; | |
572 | ||
573 | self.index.close()?; | |
574 | ||
575 | self.stat.size = self.chunk_offset as u64; | |
576 | ||
577 | // add size of index file | |
f569acc5 WB |
578 | self.stat.size += |
579 | (self.stat.chunk_count * 40 + std::mem::size_of::<DynamicIndexHeader>()) as u64; | |
976595e1 DM |
580 | |
581 | Ok(()) | |
582 | } | |
583 | ||
584 | fn write_chunk_buffer(&mut self) -> Result<(), Error> { | |
5e7a09be DM |
585 | let chunk_size = self.chunk_buffer.len(); |
586 | ||
f569acc5 WB |
587 | if chunk_size == 0 { |
588 | return Ok(()); | |
589 | } | |
5e7a09be DM |
590 | |
591 | let expected_chunk_size = self.chunk_offset - self.last_chunk; | |
592 | if expected_chunk_size != self.chunk_buffer.len() { | |
976595e1 | 593 | bail!("wrong chunk size {} != {}", expected_chunk_size, chunk_size); |
5e7a09be DM |
594 | } |
595 | ||
7e336555 | 596 | self.stat.chunk_count += 1; |
247cdbce | 597 | |
5e7a09be DM |
598 | self.last_chunk = self.chunk_offset; |
599 | ||
4ee8f53d | 600 | let (chunk, digest) = DataChunkBuilder::new(&self.chunk_buffer) |
f98ac774 DM |
601 | .compress(true) |
602 | .build()?; | |
603 | ||
4ee8f53d | 604 | match self.index.insert_chunk(&chunk, &digest) { |
f98ac774 | 605 | Ok((is_duplicate, compressed_size)) => { |
7e336555 | 606 | self.stat.compressed_size += compressed_size; |
798f7fa0 | 607 | if is_duplicate { |
7e336555 | 608 | self.stat.duplicate_chunks += 1; |
798f7fa0 | 609 | } else { |
7e336555 | 610 | self.stat.disk_size += compressed_size; |
798f7fa0 DM |
611 | } |
612 | ||
f569acc5 WB |
613 | println!( |
614 | "ADD CHUNK {:016x} {} {}% {} {}", | |
615 | self.chunk_offset, | |
616 | chunk_size, | |
617 | (compressed_size * 100) / (chunk_size as u64), | |
618 | is_duplicate, | |
619 | proxmox::tools::digest_to_hex(&digest) | |
620 | ); | |
976595e1 | 621 | self.index.add_chunk(self.chunk_offset as u64, &digest)?; |
5e7a09be | 622 | self.chunk_buffer.truncate(0); |
62ee2eb4 | 623 | Ok(()) |
5e7a09be DM |
624 | } |
625 | Err(err) => { | |
626 | self.chunk_buffer.truncate(0); | |
62ee2eb4 | 627 | Err(err) |
5e7a09be DM |
628 | } |
629 | } | |
5e7a09be | 630 | } |
0433db19 DM |
631 | } |
632 | ||
976595e1 | 633 | impl Write for DynamicChunkWriter { |
0433db19 | 634 | fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> { |
0433db19 DM |
635 | let chunker = &mut self.chunker; |
636 | ||
637 | let pos = chunker.scan(data); | |
638 | ||
639 | if pos > 0 { | |
4ee8f53d | 640 | self.chunk_buffer.extend_from_slice(&data[0..pos]); |
0433db19 DM |
641 | self.chunk_offset += pos; |
642 | ||
976595e1 | 643 | if let Err(err) = self.write_chunk_buffer() { |
f569acc5 WB |
644 | return Err(std::io::Error::new( |
645 | std::io::ErrorKind::Other, | |
646 | err.to_string(), | |
647 | )); | |
976595e1 | 648 | } |
5e7a09be | 649 | Ok(pos) |
0433db19 DM |
650 | } else { |
651 | self.chunk_offset += data.len(); | |
4ee8f53d | 652 | self.chunk_buffer.extend_from_slice(data); |
5e7a09be | 653 | Ok(data.len()) |
0433db19 DM |
654 | } |
655 | } | |
656 | ||
657 | fn flush(&mut self) -> std::result::Result<(), std::io::Error> { | |
f569acc5 WB |
658 | Err(std::io::Error::new( |
659 | std::io::ErrorKind::Other, | |
660 | "please use close() instead of flush()", | |
661 | )) | |
0433db19 DM |
662 | } |
663 | } |