]>
Commit | Line | Data |
---|---|---|
0433db19 DM |
1 | use failure::*; |
2 | ||
22968600 | 3 | use crate::tools; |
0433db19 DM |
4 | use super::chunk_store::*; |
5 | use super::chunker::*; | |
6 | ||
1629d2ad | 7 | use std::sync::Arc; |
5032b57b | 8 | use std::io::{Read, Write, BufWriter}; |
0433db19 DM |
9 | use std::fs::File; |
10 | use std::path::{Path, PathBuf}; | |
11 | use std::os::unix::io::AsRawFd; | |
12 | use uuid::Uuid; | |
ddbdf80d | 13 | //use chrono::{Local, TimeZone}; |
0433db19 DM |
14 | |
/// On-disk header at the start of an archive index file.
///
/// The struct is `repr(C)` so that its in-memory layout is the file layout;
/// its total size must be exactly 4096 bytes, which both the reader and the
/// writer check at runtime. Integer fields are stored little-endian.
#[repr(C)]
pub struct ArchiveIndexHeader {
    /// File type magic, `b"PROXMOX-AIDX"`.
    pub magic: [u8; 12],
    /// Format version (little-endian); the reader only accepts version 1.
    pub version: u32,
    /// Random (v4) UUID generated when the index file is created.
    pub uuid: [u8; 16],
    /// Creation time in seconds since the Unix epoch (little-endian).
    pub ctime: u64,
    reserved: [u8; 4056], // overall size is one page (4096 bytes)
}
24 | ||
77703d95 | 25 | |
150f1bd8 DM |
/// Read-only view of an archive index file.
///
/// The entry table following the 4096 byte header is memory-mapped. Each
/// entry is 40 bytes: an 8 byte chunk end offset followed by a 32 byte
/// chunk digest.
pub struct ArchiveIndexReader {
    /// Chunk store the index refers to (used to read and touch chunks).
    store: Arc<ChunkStore>,
    /// Open handle of the index file, kept for the lifetime of the reader.
    _file: File,
    /// Total file size in bytes, as reported by `fstat`.
    pub size: usize,
    /// Full path of the index file (used in error messages).
    filename: PathBuf,
    /// Start of the mapped entry table; null when nothing is mapped.
    index: *const u8,
    /// Number of 40 byte entries in the mapped table.
    index_entries: usize,
    /// UUID copied from the file header.
    pub uuid: [u8; 16],
    /// Creation time copied from the file header (seconds since the epoch).
    pub ctime: u64,
}
36 | ||
0b05fd58 DM |
// SAFETY (NOTE(review): to be confirmed): the only field that prevents an
// automatic `Send` is the raw `index` pointer. It points into a read-only,
// private mapping that this struct exclusively owns and releases in `drop`.
// Such a mapping is not tied to the creating thread, so moving the reader to
// another thread looks sound.
unsafe impl Send for ArchiveIndexReader {}
39 | ||
150f1bd8 | 40 | impl Drop for ArchiveIndexReader { |
77703d95 DM |
41 | |
42 | fn drop(&mut self) { | |
43 | if let Err(err) = self.unmap() { | |
44 | eprintln!("Unable to unmap file {:?} - {}", self.filename, err); | |
45 | } | |
46 | } | |
47 | } | |
48 | ||
150f1bd8 | 49 | impl ArchiveIndexReader { |
77703d95 | 50 | |
150f1bd8 | 51 | pub fn open(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> { |
77703d95 DM |
52 | |
53 | let full_path = store.relative_path(path); | |
54 | ||
55 | let mut file = std::fs::File::open(&full_path)?; | |
56 | ||
57 | let header_size = std::mem::size_of::<ArchiveIndexHeader>(); | |
58 | ||
59 | // todo: use static assertion when available in rust | |
60 | if header_size != 4096 { bail!("got unexpected header size for {:?}", path); } | |
61 | ||
62 | let mut buffer = vec![0u8; header_size]; | |
63 | file.read_exact(&mut buffer)?; | |
64 | ||
65 | let header = unsafe { &mut * (buffer.as_ptr() as *mut ArchiveIndexHeader) }; | |
66 | ||
67 | if header.magic != *b"PROXMOX-AIDX" { | |
68 | bail!("got unknown magic number for {:?}", path); | |
69 | } | |
70 | ||
71 | let version = u32::from_le(header.version); | |
72 | if version != 1 { | |
73 | bail!("got unsupported version number ({}) for {:?}", version, path); | |
74 | } | |
75 | ||
76 | let ctime = u64::from_le(header.ctime); | |
77 | ||
78 | let rawfd = file.as_raw_fd(); | |
79 | ||
80 | let stat = match nix::sys::stat::fstat(rawfd) { | |
81 | Ok(stat) => stat, | |
82 | Err(err) => bail!("fstat {:?} failed - {}", path, err), | |
83 | }; | |
84 | ||
85 | let size = stat.st_size as usize; | |
86 | ||
ddbdf80d | 87 | let index_size = size - header_size; |
77703d95 DM |
88 | if (index_size % 40) != 0 { |
89 | bail!("got unexpected file size for {:?}", path); | |
90 | } | |
91 | ||
92 | let data = unsafe { nix::sys::mman::mmap( | |
93 | std::ptr::null_mut(), | |
94 | index_size, | |
95 | nix::sys::mman::ProtFlags::PROT_READ, | |
96 | nix::sys::mman::MapFlags::MAP_PRIVATE, | |
97 | rawfd, | |
98 | header_size as i64) }? as *const u8; | |
99 | ||
77703d95 DM |
100 | Ok(Self { |
101 | store, | |
102 | filename: full_path, | |
728797d0 | 103 | _file: file, |
77703d95 DM |
104 | size, |
105 | index: data, | |
106 | index_entries: index_size/40, | |
107 | ctime, | |
108 | uuid: header.uuid, | |
109 | }) | |
110 | } | |
111 | ||
112 | fn unmap(&mut self) -> Result<(), Error> { | |
113 | ||
114 | if self.index == std::ptr::null_mut() { return Ok(()); } | |
115 | ||
b663789b | 116 | if let Err(err) = unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, self.index_entries*40) } { |
77703d95 DM |
117 | bail!("unmap file {:?} failed - {}", self.filename, err); |
118 | } | |
119 | ||
120 | self.index = std::ptr::null_mut(); | |
121 | ||
122 | Ok(()) | |
123 | } | |
124 | ||
39c6bd86 DM |
125 | #[inline] |
126 | fn chunk_end(&self, pos: usize) -> u64 { | |
127 | if pos >= self.index_entries { | |
128 | panic!("chunk index out of range"); | |
129 | } | |
130 | unsafe { *(self.index.add(pos*40) as *const u64) } | |
131 | } | |
132 | ||
133 | #[inline] | |
134 | fn chunk_digest(&self, pos: usize) -> &[u8] { | |
135 | if pos >= self.index_entries { | |
136 | panic!("chunk index out of range"); | |
137 | } | |
138 | unsafe { std::slice::from_raw_parts(self.index.add(pos*40+8), 32) } | |
139 | } | |
140 | ||
ddbdf80d | 141 | pub fn mark_used_chunks(&self, _status: &mut GarbageCollectionStatus) -> Result<(), Error> { |
77703d95 DM |
142 | |
143 | for pos in 0..self.index_entries { | |
39c6bd86 | 144 | let digest = self.chunk_digest(pos); |
77703d95 DM |
145 | if let Err(err) = self.store.touch_chunk(digest) { |
146 | bail!("unable to access chunk {}, required by {:?} - {}", | |
22968600 | 147 | tools::digest_to_hex(digest), self.filename, err); |
77703d95 DM |
148 | } |
149 | } | |
150 | Ok(()) | |
151 | } | |
96df2fb4 DM |
152 | |
153 | pub fn dump_catar(&self, mut writer: Box<Write>) -> Result<(), Error> { | |
154 | ||
df9973e8 DM |
155 | let mut buffer = Vec::with_capacity(1024*1024); |
156 | ||
96df2fb4 | 157 | for pos in 0..self.index_entries { |
ddbdf80d | 158 | let _end = self.chunk_end(pos); |
39c6bd86 DM |
159 | let digest = self.chunk_digest(pos); |
160 | //println!("Dump {:08x}", end ); | |
df9973e8 | 161 | self.store.read_chunk(digest, &mut buffer)?; |
df9973e8 | 162 | writer.write_all(&buffer)?; |
96df2fb4 DM |
163 | |
164 | } | |
165 | ||
166 | Ok(()) | |
167 | } | |
39c6bd86 DM |
168 | |
169 | fn binary_search( | |
170 | &self, | |
171 | start_idx: usize, | |
172 | start: u64, | |
173 | end_idx: usize, | |
174 | end: u64, | |
175 | offset: u64 | |
176 | ) -> Result<usize, Error> { | |
177 | ||
178 | if (offset >= end) || (offset < start) { | |
179 | bail!("offset out of range"); | |
180 | } | |
181 | ||
182 | if end_idx == start_idx { | |
183 | return Ok(start_idx); // found | |
184 | } | |
185 | let middle_idx = (start_idx + end_idx)/2; | |
186 | let middle_end = self.chunk_end(middle_idx); | |
187 | ||
188 | if offset < middle_end { | |
189 | return self.binary_search(start_idx, start, middle_idx, middle_end, offset); | |
190 | } else { | |
191 | return self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset); | |
192 | } | |
193 | } | |
194 | } | |
195 | ||
0b05fd58 DM |
/// Random-access reader over an archive described by an `ArchiveIndexReader`.
///
/// The data of one chunk is kept in memory at a time.
pub struct BufferedArchiveReader {
    /// Index describing the chunks of the archive.
    index: ArchiveIndexReader,
    /// Total archive size: the end offset of the last chunk.
    archive_size: u64,
    /// Data of the currently buffered chunk (empty until the first load).
    read_buffer: Vec<u8>,
    /// Index entry of the chunk held in `read_buffer`.
    buffered_chunk_idx: usize,
    /// Archive offset of the first byte in `read_buffer`.
    buffered_chunk_start: u64,
    /// Current position used by the `Read` and `Seek` implementations.
    read_offset: u64,
}
204 | ||
0b05fd58 | 205 | impl BufferedArchiveReader { |
39c6bd86 | 206 | |
0b05fd58 | 207 | pub fn new(index: ArchiveIndexReader) -> Self { |
39c6bd86 DM |
208 | |
209 | let archive_size = index.chunk_end(index.index_entries - 1); | |
210 | Self { | |
211 | index: index, | |
212 | archive_size: archive_size, | |
213 | read_buffer: Vec::with_capacity(1024*1024), | |
214 | buffered_chunk_idx: 0, | |
215 | buffered_chunk_start: 0, | |
216 | read_offset: 0, | |
217 | } | |
218 | } | |
219 | ||
220 | pub fn archive_size(&self) -> u64 { self.archive_size } | |
221 | ||
0a72e267 DM |
222 | fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> { |
223 | ||
0b05fd58 | 224 | let index = &self.index; |
0a72e267 DM |
225 | let end = index.chunk_end(idx); |
226 | let digest = index.chunk_digest(idx); | |
227 | index.store.read_chunk(digest, &mut self.read_buffer)?; | |
228 | ||
229 | self.buffered_chunk_idx = idx; | |
230 | self.buffered_chunk_start = end - (self.read_buffer.len() as u64); | |
231 | //println!("BUFFER {} {}", self.buffered_chunk_start, end); | |
232 | Ok(()) | |
233 | } | |
234 | } | |
235 | ||
0b05fd58 | 236 | impl crate::tools::BufferedReader for BufferedArchiveReader { |
0a72e267 DM |
237 | |
238 | fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> { | |
39c6bd86 | 239 | |
318564ac DM |
240 | if offset == self.archive_size { return Ok(&self.read_buffer[0..0]); } |
241 | ||
39c6bd86 | 242 | let buffer_len = self.read_buffer.len(); |
0b05fd58 | 243 | let index = &self.index; |
39c6bd86 DM |
244 | |
245 | // optimization for sequential read | |
246 | if buffer_len > 0 && | |
247 | ((self.buffered_chunk_idx + 1) < index.index_entries) && | |
248 | (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) | |
249 | { | |
250 | let next_idx = self.buffered_chunk_idx + 1; | |
251 | let next_end = index.chunk_end(next_idx); | |
252 | if offset < next_end { | |
373ef4a5 | 253 | self.buffer_chunk(next_idx)?; |
39c6bd86 DM |
254 | let buffer_offset = (offset - self.buffered_chunk_start) as usize; |
255 | return Ok(&self.read_buffer[buffer_offset..]); | |
256 | } | |
257 | } | |
258 | ||
259 | if (buffer_len == 0) || | |
260 | (offset < self.buffered_chunk_start) || | |
261 | (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) | |
262 | { | |
263 | let end_idx = index.index_entries - 1; | |
264 | let end = index.chunk_end(end_idx); | |
265 | let idx = index.binary_search(0, 0, end_idx, end, offset)?; | |
373ef4a5 | 266 | self.buffer_chunk(idx)?; |
39c6bd86 DM |
267 | } |
268 | ||
269 | let buffer_offset = (offset - self.buffered_chunk_start) as usize; | |
270 | Ok(&self.read_buffer[buffer_offset..]) | |
271 | } | |
272 | ||
39c6bd86 | 273 | } |
77703d95 | 274 | |
0b05fd58 | 275 | impl std::io::Read for BufferedArchiveReader { |
4624fe29 DM |
276 | |
277 | fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { | |
278 | ||
279 | use std::io::{Error, ErrorKind}; | |
280 | use crate::tools::BufferedReader; | |
281 | ||
282 | let data = match self.buffered_read(self.read_offset) { | |
283 | Ok(v) => v, | |
284 | Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())), | |
285 | }; | |
286 | ||
287 | let n = if data.len() > buf.len() { buf.len() } else { data.len() }; | |
288 | ||
289 | unsafe { std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n); } | |
290 | ||
291 | self.read_offset += n as u64; | |
292 | ||
293 | return Ok(n); | |
294 | } | |
295 | } | |
296 | ||
0b05fd58 | 297 | impl std::io::Seek for BufferedArchiveReader { |
34337050 DM |
298 | |
299 | fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> { | |
300 | ||
ddbdf80d | 301 | use std::io::{SeekFrom}; |
34337050 DM |
302 | |
303 | let new_offset = match pos { | |
304 | SeekFrom::Start(start_offset) => start_offset as i64, | |
305 | SeekFrom::End(end_offset) => (self.archive_size as i64)+ end_offset, | |
306 | SeekFrom::Current(offset) => (self.read_offset as i64) + offset, | |
307 | }; | |
308 | ||
ddbdf80d | 309 | use std::io::{Error, ErrorKind}; |
34337050 DM |
310 | if (new_offset < 0) || (new_offset > (self.archive_size as i64)) { |
311 | return Err(Error::new( | |
312 | ErrorKind::Other, | |
313 | format!("seek is out of range {} ([0..{}])", new_offset, self.archive_size))); | |
314 | } | |
315 | self.read_offset = new_offset as u64; | |
316 | ||
317 | Ok(self.read_offset) | |
318 | } | |
319 | } | |
320 | ||
1629d2ad DM |
/// Writer that splits a data stream into chunks, stores them in a
/// `ChunkStore` and records them in an archive index file.
///
/// Implements `Write`. Call `close()` to finish; `flush()` is rejected.
pub struct ArchiveIndexWriter {
    /// Chunk store receiving the chunk data.
    store: Arc<ChunkStore>,
    /// Decides where chunk boundaries fall in the incoming data.
    chunker: Chunker,
    /// Buffered writer on the temporary index file.
    writer: BufWriter<File>,
    /// Set once `close()` has been called.
    closed: bool,
    /// Final path of the index file.
    filename: PathBuf,
    /// Temporary file written to; renamed to `filename` by `close()`.
    tmp_filename: PathBuf,
    /// UUID stored in the file header.
    pub uuid: [u8; 16],
    /// Creation time stored in the header (seconds since the Unix epoch).
    pub ctime: u64,

    /// Total number of data bytes accepted so far.
    chunk_offset: usize,
    /// Value of `chunk_offset` at the end of the last emitted chunk.
    last_chunk: usize,
    /// Data of the chunk currently being collected.
    chunk_buffer: Vec<u8>,
}
335 | ||
1629d2ad DM |
336 | impl Drop for ArchiveIndexWriter { |
337 | ||
338 | fn drop(&mut self) { | |
339 | let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors | |
340 | } | |
341 | } | |
342 | ||
343 | impl ArchiveIndexWriter { | |
0433db19 | 344 | |
1629d2ad | 345 | pub fn create(store: Arc<ChunkStore>, path: &Path, chunk_size: usize) -> Result<Self, Error> { |
0433db19 DM |
346 | |
347 | let full_path = store.relative_path(path); | |
348 | let mut tmp_path = full_path.clone(); | |
349 | tmp_path.set_extension("tmp_aidx"); | |
350 | ||
ddbdf80d | 351 | let file = std::fs::OpenOptions::new() |
0433db19 DM |
352 | .create(true).truncate(true) |
353 | .read(true) | |
354 | .write(true) | |
355 | .open(&tmp_path)?; | |
356 | ||
5032b57b DM |
357 | let mut writer = BufWriter::with_capacity(1024*1024, file); |
358 | ||
0433db19 DM |
359 | let header_size = std::mem::size_of::<ArchiveIndexHeader>(); |
360 | ||
361 | // todo: use static assertion when available in rust | |
362 | if header_size != 4096 { panic!("got unexpected header size"); } | |
363 | ||
364 | let ctime = std::time::SystemTime::now().duration_since( | |
365 | std::time::SystemTime::UNIX_EPOCH)?.as_secs(); | |
366 | ||
367 | let uuid = Uuid::new_v4(); | |
368 | ||
369 | let mut buffer = vec![0u8; header_size]; | |
370 | let header = crate::tools::map_struct_mut::<ArchiveIndexHeader>(&mut buffer)?; | |
371 | ||
372 | header.magic = *b"PROXMOX-AIDX"; | |
373 | header.version = u32::to_le(1); | |
374 | header.ctime = u64::to_le(ctime); | |
375 | header.uuid = *uuid.as_bytes(); | |
376 | ||
5032b57b | 377 | writer.write_all(&buffer)?; |
0433db19 DM |
378 | |
379 | Ok(Self { | |
380 | store, | |
381 | chunker: Chunker::new(chunk_size), | |
5032b57b | 382 | writer: writer, |
5e7a09be | 383 | closed: false, |
0433db19 DM |
384 | filename: full_path, |
385 | tmp_filename: tmp_path, | |
386 | ctime, | |
387 | uuid: *uuid.as_bytes(), | |
388 | ||
389 | chunk_offset: 0, | |
390 | last_chunk: 0, | |
391 | chunk_buffer: Vec::with_capacity(chunk_size*4), | |
392 | }) | |
393 | } | |
5e7a09be DM |
394 | |
395 | pub fn close(&mut self) -> Result<(), Error> { | |
396 | ||
397 | if self.closed { | |
398 | bail!("cannot close already closed archive index file {:?}", self.filename); | |
399 | } | |
400 | ||
401 | self.closed = true; | |
402 | ||
403 | self.write_chunk_buffer()?; | |
404 | ||
5032b57b | 405 | self.writer.flush()?; |
5e7a09be DM |
406 | |
407 | // fixme: | |
408 | ||
409 | if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) { | |
410 | bail!("Atomic rename file {:?} failed - {}", self.filename, err); | |
411 | } | |
412 | ||
413 | Ok(()) | |
414 | } | |
415 | ||
416 | fn write_chunk_buffer(&mut self) -> Result<(), std::io::Error> { | |
417 | ||
418 | use std::io::{Error, ErrorKind}; | |
419 | ||
420 | let chunk_size = self.chunk_buffer.len(); | |
421 | ||
422 | if chunk_size == 0 { return Ok(()); } | |
423 | ||
424 | let expected_chunk_size = self.chunk_offset - self.last_chunk; | |
425 | if expected_chunk_size != self.chunk_buffer.len() { | |
426 | return Err(Error::new( | |
427 | ErrorKind::Other, | |
428 | format!("wrong chunk size {} != {}", expected_chunk_size, chunk_size))); | |
429 | } | |
430 | ||
431 | self.last_chunk = self.chunk_offset; | |
432 | ||
433 | match self.store.insert_chunk(&self.chunk_buffer) { | |
434 | Ok((is_duplicate, digest)) => { | |
22968600 | 435 | println!("ADD CHUNK {:016x} {} {} {}", self.chunk_offset, chunk_size, is_duplicate, tools::digest_to_hex(&digest)); |
5032b57b DM |
436 | self.writer.write(unsafe { &std::mem::transmute::<u64, [u8;8]>(self.chunk_offset as u64) })?; |
437 | self.writer.write(&digest)?; | |
5e7a09be DM |
438 | self.chunk_buffer.truncate(0); |
439 | return Ok(()); | |
440 | } | |
441 | Err(err) => { | |
442 | self.chunk_buffer.truncate(0); | |
443 | return Err(Error::new(ErrorKind::Other, err.to_string())); | |
444 | } | |
445 | } | |
5e7a09be | 446 | } |
0433db19 DM |
447 | } |
448 | ||
1629d2ad | 449 | impl Write for ArchiveIndexWriter { |
0433db19 DM |
450 | |
451 | fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> { | |
452 | ||
0433db19 DM |
453 | let chunker = &mut self.chunker; |
454 | ||
455 | let pos = chunker.scan(data); | |
456 | ||
457 | if pos > 0 { | |
458 | self.chunk_buffer.extend(&data[0..pos]); | |
459 | self.chunk_offset += pos; | |
460 | ||
5e7a09be DM |
461 | self.write_chunk_buffer()?; |
462 | Ok(pos) | |
0433db19 DM |
463 | |
464 | } else { | |
465 | self.chunk_offset += data.len(); | |
466 | self.chunk_buffer.extend(data); | |
5e7a09be | 467 | Ok(data.len()) |
0433db19 DM |
468 | } |
469 | } | |
470 | ||
471 | fn flush(&mut self) -> std::result::Result<(), std::io::Error> { | |
472 | ||
94a882e9 DM |
473 | use std::io::{Error, ErrorKind}; |
474 | ||
5e7a09be | 475 | Err(Error::new(ErrorKind::Other, "please use close() instead of flush()")) |
0433db19 DM |
476 | } |
477 | } |