]>
Commit | Line | Data |
---|---|---|
0433db19 DM |
1 | use failure::*; |
2 | ||
22968600 | 3 | use crate::tools; |
7e336555 | 4 | use super::chunk_stat::*; |
0433db19 DM |
5 | use super::chunk_store::*; |
6 | use super::chunker::*; | |
7 | ||
1629d2ad | 8 | use std::sync::Arc; |
5032b57b | 9 | use std::io::{Read, Write, BufWriter}; |
0433db19 DM |
10 | use std::fs::File; |
11 | use std::path::{Path, PathBuf}; | |
12 | use std::os::unix::io::AsRawFd; | |
13 | use uuid::Uuid; | |
ddbdf80d | 14 | //use chrono::{Local, TimeZone}; |
0433db19 | 15 | |
/// Header format definition for dynamic index files (`.didx`)
///
/// The header fills exactly one page (4096 bytes) at the start of the file;
/// the table of chunk index entries follows directly after it. The integer
/// fields are stored little-endian.
#[repr(C)]
pub struct DynamicIndexHeader {
    /// The string `PROXMOX-DIDX`
    pub magic: [u8; 12],
    /// Format version, little-endian (only `1` is accepted by the reader).
    pub version: u32,
    /// UUID of the index file (a random v4 UUID when created by the writer).
    pub uuid: [u8; 16],
    /// Creation time in seconds since the UNIX epoch, little-endian.
    pub ctime: u64,
    reserved: [u8; 4056], // overall size is one page (4096 bytes)
}
26 | ||
77703d95 | 27 | |
/// Reader for a dynamic index file (`.didx`) whose entry table is mmap()ed.
pub struct DynamicIndexReader {
    /// Chunk store the index belongs to; used to resolve the path and to read/touch chunks.
    store: Arc<ChunkStore>,
    /// The opened index file; held for the reader's lifetime but not accessed after `open`.
    _file: File,
    /// Total size of the index file in bytes (header + entries), as reported by fstat.
    pub size: usize,
    /// Full path of the index file, used in error messages.
    filename: PathBuf,
    /// Start of the mmap()ed entry table; null once unmapped.
    index: *const u8,
    /// Number of 40-byte entries (8-byte end offset followed by a 32-byte digest).
    index_entries: usize,
    /// UUID copied from the file header.
    pub uuid: [u8; 16],
    /// Creation time copied from the file header.
    pub ctime: u64,
}
38 | ||
5be4065b WB |
// `index` is mmap()ed which cannot be thread-local so should be sendable
// FIXME: Introduce an mmap wrapper type for this?
//
// SAFETY (review note): raw pointers are not `Send`, so the `index` field
// suppresses the automatic impl. The mapping is created read-only
// (PROT_READ, MAP_PRIVATE) and is only unmapped through `&mut self`, so moving
// the owner to another thread is presumably sound. NOTE(review): this impl
// also asserts `Send` for `store: Arc<ChunkStore>`, which relies on
// `ChunkStore` being `Send + Sync` — confirm.
unsafe impl Send for DynamicIndexReader {}
0b05fd58 | 42 | |
93d5d779 | 43 | impl Drop for DynamicIndexReader { |
77703d95 DM |
44 | |
45 | fn drop(&mut self) { | |
46 | if let Err(err) = self.unmap() { | |
47 | eprintln!("Unable to unmap file {:?} - {}", self.filename, err); | |
48 | } | |
49 | } | |
50 | } | |
51 | ||
93d5d779 | 52 | impl DynamicIndexReader { |
77703d95 | 53 | |
150f1bd8 | 54 | pub fn open(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> { |
77703d95 DM |
55 | |
56 | let full_path = store.relative_path(path); | |
57 | ||
58 | let mut file = std::fs::File::open(&full_path)?; | |
59 | ||
93d5d779 | 60 | let header_size = std::mem::size_of::<DynamicIndexHeader>(); |
77703d95 DM |
61 | |
62 | // todo: use static assertion when available in rust | |
63 | if header_size != 4096 { bail!("got unexpected header size for {:?}", path); } | |
64 | ||
65 | let mut buffer = vec![0u8; header_size]; | |
66 | file.read_exact(&mut buffer)?; | |
67 | ||
93d5d779 | 68 | let header = unsafe { &mut * (buffer.as_ptr() as *mut DynamicIndexHeader) }; |
77703d95 | 69 | |
93d5d779 | 70 | if header.magic != *b"PROXMOX-DIDX" { |
77703d95 DM |
71 | bail!("got unknown magic number for {:?}", path); |
72 | } | |
73 | ||
74 | let version = u32::from_le(header.version); | |
75 | if version != 1 { | |
76 | bail!("got unsupported version number ({}) for {:?}", version, path); | |
77 | } | |
78 | ||
79 | let ctime = u64::from_le(header.ctime); | |
80 | ||
81 | let rawfd = file.as_raw_fd(); | |
82 | ||
83 | let stat = match nix::sys::stat::fstat(rawfd) { | |
84 | Ok(stat) => stat, | |
85 | Err(err) => bail!("fstat {:?} failed - {}", path, err), | |
86 | }; | |
87 | ||
88 | let size = stat.st_size as usize; | |
89 | ||
ddbdf80d | 90 | let index_size = size - header_size; |
77703d95 DM |
91 | if (index_size % 40) != 0 { |
92 | bail!("got unexpected file size for {:?}", path); | |
93 | } | |
94 | ||
95 | let data = unsafe { nix::sys::mman::mmap( | |
96 | std::ptr::null_mut(), | |
97 | index_size, | |
98 | nix::sys::mman::ProtFlags::PROT_READ, | |
99 | nix::sys::mman::MapFlags::MAP_PRIVATE, | |
100 | rawfd, | |
101 | header_size as i64) }? as *const u8; | |
102 | ||
77703d95 DM |
103 | Ok(Self { |
104 | store, | |
105 | filename: full_path, | |
728797d0 | 106 | _file: file, |
77703d95 DM |
107 | size, |
108 | index: data, | |
109 | index_entries: index_size/40, | |
110 | ctime, | |
111 | uuid: header.uuid, | |
112 | }) | |
113 | } | |
114 | ||
115 | fn unmap(&mut self) -> Result<(), Error> { | |
116 | ||
117 | if self.index == std::ptr::null_mut() { return Ok(()); } | |
118 | ||
b663789b | 119 | if let Err(err) = unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, self.index_entries*40) } { |
77703d95 DM |
120 | bail!("unmap file {:?} failed - {}", self.filename, err); |
121 | } | |
122 | ||
123 | self.index = std::ptr::null_mut(); | |
124 | ||
125 | Ok(()) | |
126 | } | |
127 | ||
39c6bd86 DM |
128 | #[inline] |
129 | fn chunk_end(&self, pos: usize) -> u64 { | |
130 | if pos >= self.index_entries { | |
131 | panic!("chunk index out of range"); | |
132 | } | |
133 | unsafe { *(self.index.add(pos*40) as *const u64) } | |
134 | } | |
135 | ||
136 | #[inline] | |
137 | fn chunk_digest(&self, pos: usize) -> &[u8] { | |
138 | if pos >= self.index_entries { | |
139 | panic!("chunk index out of range"); | |
140 | } | |
141 | unsafe { std::slice::from_raw_parts(self.index.add(pos*40+8), 32) } | |
142 | } | |
143 | ||
ddbdf80d | 144 | pub fn mark_used_chunks(&self, _status: &mut GarbageCollectionStatus) -> Result<(), Error> { |
77703d95 DM |
145 | |
146 | for pos in 0..self.index_entries { | |
39c6bd86 | 147 | let digest = self.chunk_digest(pos); |
77703d95 DM |
148 | if let Err(err) = self.store.touch_chunk(digest) { |
149 | bail!("unable to access chunk {}, required by {:?} - {}", | |
22968600 | 150 | tools::digest_to_hex(digest), self.filename, err); |
77703d95 DM |
151 | } |
152 | } | |
153 | Ok(()) | |
154 | } | |
96df2fb4 DM |
155 | |
156 | pub fn dump_catar(&self, mut writer: Box<Write>) -> Result<(), Error> { | |
157 | ||
df9973e8 DM |
158 | let mut buffer = Vec::with_capacity(1024*1024); |
159 | ||
96df2fb4 | 160 | for pos in 0..self.index_entries { |
ddbdf80d | 161 | let _end = self.chunk_end(pos); |
39c6bd86 DM |
162 | let digest = self.chunk_digest(pos); |
163 | //println!("Dump {:08x}", end ); | |
df9973e8 | 164 | self.store.read_chunk(digest, &mut buffer)?; |
df9973e8 | 165 | writer.write_all(&buffer)?; |
96df2fb4 DM |
166 | |
167 | } | |
168 | ||
169 | Ok(()) | |
170 | } | |
39c6bd86 DM |
171 | |
172 | fn binary_search( | |
173 | &self, | |
174 | start_idx: usize, | |
175 | start: u64, | |
176 | end_idx: usize, | |
177 | end: u64, | |
178 | offset: u64 | |
179 | ) -> Result<usize, Error> { | |
180 | ||
181 | if (offset >= end) || (offset < start) { | |
182 | bail!("offset out of range"); | |
183 | } | |
184 | ||
185 | if end_idx == start_idx { | |
186 | return Ok(start_idx); // found | |
187 | } | |
188 | let middle_idx = (start_idx + end_idx)/2; | |
189 | let middle_end = self.chunk_end(middle_idx); | |
190 | ||
191 | if offset < middle_end { | |
192 | return self.binary_search(start_idx, start, middle_idx, middle_end, offset); | |
193 | } else { | |
194 | return self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset); | |
195 | } | |
196 | } | |
197 | } | |
198 | ||
93d5d779 DM |
/// Reader over the archive data referenced by a `DynamicIndexReader`. It keeps
/// the data of one chunk at a time in `read_buffer`.
pub struct BufferedDynamicReader {
    /// The underlying index.
    index: DynamicIndexReader,
    /// Total archive size in bytes (end offset of the last chunk).
    archive_size: u64,
    /// Data of the currently buffered chunk (empty if none is loaded).
    read_buffer: Vec<u8>,
    /// Index entry of the chunk held in `read_buffer`.
    buffered_chunk_idx: usize,
    /// Archive offset at which the buffered chunk begins.
    buffered_chunk_start: u64,
    /// Current position used by the `Read`/`Seek` implementations.
    read_offset: u64,
}
207 | ||
93d5d779 | 208 | impl BufferedDynamicReader { |
39c6bd86 | 209 | |
93d5d779 | 210 | pub fn new(index: DynamicIndexReader) -> Self { |
39c6bd86 DM |
211 | |
212 | let archive_size = index.chunk_end(index.index_entries - 1); | |
213 | Self { | |
214 | index: index, | |
215 | archive_size: archive_size, | |
216 | read_buffer: Vec::with_capacity(1024*1024), | |
217 | buffered_chunk_idx: 0, | |
218 | buffered_chunk_start: 0, | |
219 | read_offset: 0, | |
220 | } | |
221 | } | |
222 | ||
223 | pub fn archive_size(&self) -> u64 { self.archive_size } | |
224 | ||
0a72e267 DM |
225 | fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> { |
226 | ||
0b05fd58 | 227 | let index = &self.index; |
0a72e267 DM |
228 | let end = index.chunk_end(idx); |
229 | let digest = index.chunk_digest(idx); | |
230 | index.store.read_chunk(digest, &mut self.read_buffer)?; | |
231 | ||
232 | self.buffered_chunk_idx = idx; | |
233 | self.buffered_chunk_start = end - (self.read_buffer.len() as u64); | |
234 | //println!("BUFFER {} {}", self.buffered_chunk_start, end); | |
235 | Ok(()) | |
236 | } | |
237 | } | |
238 | ||
fded74d0 | 239 | impl crate::tools::BufferedRead for BufferedDynamicReader { |
0a72e267 DM |
240 | |
241 | fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> { | |
39c6bd86 | 242 | |
318564ac DM |
243 | if offset == self.archive_size { return Ok(&self.read_buffer[0..0]); } |
244 | ||
39c6bd86 | 245 | let buffer_len = self.read_buffer.len(); |
0b05fd58 | 246 | let index = &self.index; |
39c6bd86 DM |
247 | |
248 | // optimization for sequential read | |
249 | if buffer_len > 0 && | |
250 | ((self.buffered_chunk_idx + 1) < index.index_entries) && | |
251 | (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) | |
252 | { | |
253 | let next_idx = self.buffered_chunk_idx + 1; | |
254 | let next_end = index.chunk_end(next_idx); | |
255 | if offset < next_end { | |
373ef4a5 | 256 | self.buffer_chunk(next_idx)?; |
39c6bd86 DM |
257 | let buffer_offset = (offset - self.buffered_chunk_start) as usize; |
258 | return Ok(&self.read_buffer[buffer_offset..]); | |
259 | } | |
260 | } | |
261 | ||
262 | if (buffer_len == 0) || | |
263 | (offset < self.buffered_chunk_start) || | |
264 | (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) | |
265 | { | |
266 | let end_idx = index.index_entries - 1; | |
267 | let end = index.chunk_end(end_idx); | |
268 | let idx = index.binary_search(0, 0, end_idx, end, offset)?; | |
373ef4a5 | 269 | self.buffer_chunk(idx)?; |
39c6bd86 DM |
270 | } |
271 | ||
272 | let buffer_offset = (offset - self.buffered_chunk_start) as usize; | |
273 | Ok(&self.read_buffer[buffer_offset..]) | |
274 | } | |
275 | ||
39c6bd86 | 276 | } |
77703d95 | 277 | |
93d5d779 | 278 | impl std::io::Read for BufferedDynamicReader { |
4624fe29 DM |
279 | |
280 | fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { | |
281 | ||
282 | use std::io::{Error, ErrorKind}; | |
fded74d0 | 283 | use crate::tools::BufferedRead; |
4624fe29 DM |
284 | |
285 | let data = match self.buffered_read(self.read_offset) { | |
286 | Ok(v) => v, | |
287 | Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())), | |
288 | }; | |
289 | ||
290 | let n = if data.len() > buf.len() { buf.len() } else { data.len() }; | |
291 | ||
292 | unsafe { std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n); } | |
293 | ||
294 | self.read_offset += n as u64; | |
295 | ||
296 | return Ok(n); | |
297 | } | |
298 | } | |
299 | ||
93d5d779 | 300 | impl std::io::Seek for BufferedDynamicReader { |
34337050 DM |
301 | |
302 | fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> { | |
303 | ||
ddbdf80d | 304 | use std::io::{SeekFrom}; |
34337050 DM |
305 | |
306 | let new_offset = match pos { | |
307 | SeekFrom::Start(start_offset) => start_offset as i64, | |
308 | SeekFrom::End(end_offset) => (self.archive_size as i64)+ end_offset, | |
309 | SeekFrom::Current(offset) => (self.read_offset as i64) + offset, | |
310 | }; | |
311 | ||
ddbdf80d | 312 | use std::io::{Error, ErrorKind}; |
34337050 DM |
313 | if (new_offset < 0) || (new_offset > (self.archive_size as i64)) { |
314 | return Err(Error::new( | |
315 | ErrorKind::Other, | |
316 | format!("seek is out of range {} ([0..{}])", new_offset, self.archive_size))); | |
317 | } | |
318 | self.read_offset = new_offset as u64; | |
319 | ||
320 | Ok(self.read_offset) | |
321 | } | |
322 | } | |
323 | ||
/// Writer that chunks a data stream into the chunk store and records the
/// chunks in a dynamic index file (`.didx`).
pub struct DynamicIndexWriter {
    /// Chunk store that receives the chunk data.
    store: Arc<ChunkStore>,
    /// Finds chunk boundaries in the incoming data.
    chunker: Chunker,
    /// Buffered writer for the temporary index file (header + entries).
    writer: BufWriter<File>,
    /// Set once `close()` has been called.
    closed: bool,
    /// Final path of the index file.
    filename: PathBuf,
    /// Temporary path that `close()` renames to `filename` and `Drop` removes.
    tmp_filename: PathBuf,
    /// UUID written into the header.
    pub uuid: [u8; 16],
    /// Creation time written into the header (seconds since the UNIX epoch).
    pub ctime: u64,

    /// Statistics gathered while writing.
    stat: ChunkStat,

    /// Total number of data bytes accepted so far.
    chunk_offset: usize,
    /// Offset at which the pending chunk in `chunk_buffer` starts.
    last_chunk: usize,
    /// Data of the chunk that has not been stored yet.
    chunk_buffer: Vec<u8>,
}
340 | ||
93d5d779 | 341 | impl Drop for DynamicIndexWriter { |
1629d2ad DM |
342 | |
343 | fn drop(&mut self) { | |
344 | let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors | |
345 | } | |
346 | } | |
347 | ||
93d5d779 | 348 | impl DynamicIndexWriter { |
0433db19 | 349 | |
1629d2ad | 350 | pub fn create(store: Arc<ChunkStore>, path: &Path, chunk_size: usize) -> Result<Self, Error> { |
0433db19 DM |
351 | |
352 | let full_path = store.relative_path(path); | |
353 | let mut tmp_path = full_path.clone(); | |
93d5d779 | 354 | tmp_path.set_extension("tmp_didx"); |
0433db19 | 355 | |
ddbdf80d | 356 | let file = std::fs::OpenOptions::new() |
0433db19 DM |
357 | .create(true).truncate(true) |
358 | .read(true) | |
359 | .write(true) | |
360 | .open(&tmp_path)?; | |
361 | ||
5032b57b DM |
362 | let mut writer = BufWriter::with_capacity(1024*1024, file); |
363 | ||
93d5d779 | 364 | let header_size = std::mem::size_of::<DynamicIndexHeader>(); |
0433db19 DM |
365 | |
366 | // todo: use static assertion when available in rust | |
367 | if header_size != 4096 { panic!("got unexpected header size"); } | |
368 | ||
369 | let ctime = std::time::SystemTime::now().duration_since( | |
370 | std::time::SystemTime::UNIX_EPOCH)?.as_secs(); | |
371 | ||
372 | let uuid = Uuid::new_v4(); | |
373 | ||
374 | let mut buffer = vec![0u8; header_size]; | |
93d5d779 | 375 | let header = crate::tools::map_struct_mut::<DynamicIndexHeader>(&mut buffer)?; |
0433db19 | 376 | |
93d5d779 | 377 | header.magic = *b"PROXMOX-DIDX"; |
0433db19 DM |
378 | header.version = u32::to_le(1); |
379 | header.ctime = u64::to_le(ctime); | |
380 | header.uuid = *uuid.as_bytes(); | |
381 | ||
5032b57b | 382 | writer.write_all(&buffer)?; |
0433db19 DM |
383 | |
384 | Ok(Self { | |
385 | store, | |
386 | chunker: Chunker::new(chunk_size), | |
5032b57b | 387 | writer: writer, |
5e7a09be | 388 | closed: false, |
0433db19 DM |
389 | filename: full_path, |
390 | tmp_filename: tmp_path, | |
391 | ctime, | |
392 | uuid: *uuid.as_bytes(), | |
393 | ||
7e336555 DM |
394 | stat: ChunkStat::new(0), |
395 | ||
0433db19 DM |
396 | chunk_offset: 0, |
397 | last_chunk: 0, | |
398 | chunk_buffer: Vec::with_capacity(chunk_size*4), | |
399 | }) | |
400 | } | |
5e7a09be DM |
401 | |
402 | pub fn close(&mut self) -> Result<(), Error> { | |
403 | ||
404 | if self.closed { | |
405 | bail!("cannot close already closed archive index file {:?}", self.filename); | |
406 | } | |
407 | ||
408 | self.closed = true; | |
409 | ||
410 | self.write_chunk_buffer()?; | |
411 | ||
5032b57b | 412 | self.writer.flush()?; |
5e7a09be | 413 | |
7e336555 DM |
414 | self.stat.size = self.chunk_offset as u64; |
415 | ||
416 | // add size of index file | |
417 | self.stat.size += (self.stat.chunk_count*40 + std::mem::size_of::<DynamicIndexHeader>()) as u64; | |
418 | ||
419 | println!("STAT: {:?}", self.stat); | |
420 | ||
5e7a09be DM |
421 | // fixme: |
422 | ||
423 | if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) { | |
424 | bail!("Atomic rename file {:?} failed - {}", self.filename, err); | |
425 | } | |
426 | ||
427 | Ok(()) | |
428 | } | |
429 | ||
7e336555 DM |
430 | pub fn stat(&self) -> &ChunkStat { |
431 | &self.stat | |
432 | } | |
433 | ||
5e7a09be DM |
434 | fn write_chunk_buffer(&mut self) -> Result<(), std::io::Error> { |
435 | ||
436 | use std::io::{Error, ErrorKind}; | |
437 | ||
438 | let chunk_size = self.chunk_buffer.len(); | |
439 | ||
440 | if chunk_size == 0 { return Ok(()); } | |
441 | ||
442 | let expected_chunk_size = self.chunk_offset - self.last_chunk; | |
443 | if expected_chunk_size != self.chunk_buffer.len() { | |
444 | return Err(Error::new( | |
445 | ErrorKind::Other, | |
446 | format!("wrong chunk size {} != {}", expected_chunk_size, chunk_size))); | |
447 | } | |
448 | ||
7e336555 | 449 | self.stat.chunk_count += 1; |
247cdbce | 450 | |
5e7a09be DM |
451 | self.last_chunk = self.chunk_offset; |
452 | ||
453 | match self.store.insert_chunk(&self.chunk_buffer) { | |
798f7fa0 DM |
454 | Ok((is_duplicate, digest, compressed_size)) => { |
455 | ||
7e336555 | 456 | self.stat.compressed_size += compressed_size; |
798f7fa0 | 457 | if is_duplicate { |
7e336555 | 458 | self.stat.duplicate_chunks += 1; |
798f7fa0 | 459 | } else { |
7e336555 | 460 | self.stat.disk_size += compressed_size; |
798f7fa0 DM |
461 | } |
462 | ||
463 | println!("ADD CHUNK {:016x} {} {}% {} {}", self.chunk_offset, chunk_size, | |
464 | (compressed_size*100)/(chunk_size as u64), is_duplicate, tools::digest_to_hex(&digest)); | |
f661f374 | 465 | self.add_chunk(self.chunk_offset as u64, &digest)?; |
5e7a09be DM |
466 | self.chunk_buffer.truncate(0); |
467 | return Ok(()); | |
468 | } | |
469 | Err(err) => { | |
470 | self.chunk_buffer.truncate(0); | |
471 | return Err(Error::new(ErrorKind::Other, err.to_string())); | |
472 | } | |
473 | } | |
5e7a09be | 474 | } |
f661f374 WB |
475 | |
476 | pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), std::io::Error> { | |
477 | self.writer.write(unsafe { &std::mem::transmute::<u64, [u8;8]>(offset.to_le()) })?; | |
478 | self.writer.write(digest)?; | |
479 | Ok(()) | |
480 | } | |
0433db19 DM |
481 | } |
482 | ||
93d5d779 | 483 | impl Write for DynamicIndexWriter { |
0433db19 DM |
484 | |
485 | fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> { | |
486 | ||
0433db19 DM |
487 | let chunker = &mut self.chunker; |
488 | ||
489 | let pos = chunker.scan(data); | |
490 | ||
491 | if pos > 0 { | |
492 | self.chunk_buffer.extend(&data[0..pos]); | |
493 | self.chunk_offset += pos; | |
494 | ||
5e7a09be DM |
495 | self.write_chunk_buffer()?; |
496 | Ok(pos) | |
0433db19 DM |
497 | |
498 | } else { | |
499 | self.chunk_offset += data.len(); | |
500 | self.chunk_buffer.extend(data); | |
5e7a09be | 501 | Ok(data.len()) |
0433db19 DM |
502 | } |
503 | } | |
504 | ||
505 | fn flush(&mut self) -> std::result::Result<(), std::io::Error> { | |
506 | ||
94a882e9 DM |
507 | use std::io::{Error, ErrorKind}; |
508 | ||
5e7a09be | 509 | Err(Error::new(ErrorKind::Other, "please use close() instead of flush()")) |
0433db19 DM |
510 | } |
511 | } |