]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/catalog.rs
src/backup/catalog.rs: new parse() helper
[proxmox-backup.git] / src / backup / catalog.rs
1 use failure::*;
2 use std::ffi::{CStr, CString};
3 use std::os::unix::ffi::OsStringExt;
4 use std::io::{Read, Write, Seek, SeekFrom};
5 use std::convert::TryFrom;
6
7 use chrono::offset::{TimeZone, Local};
8
9 use proxmox::tools::io::ReadExt;
10
11 use crate::pxar::catalog::{BackupCatalogWriter, CatalogEntryType};
12
/// A single entry collected for a directory before its table is encoded.
///
/// Every variant carries the raw entry name as bytes (no trailing NUL).
enum DirEntry {
    /// Sub-directory; `start` is the writer position at which the
    /// sub-directory's own table was written.
    Directory { name: Vec<u8>, start: u64 },
    /// Regular file with the size and mtime handed to `add_file`.
    File { name: Vec<u8>, size: u64, mtime: u64 },
    /// Symbolic link.
    Symlink { name: Vec<u8> },
    /// Hard link.
    Hardlink { name: Vec<u8> },
    /// Block device node.
    BlockDevice { name: Vec<u8> },
    /// Character device node.
    CharDevice { name: Vec<u8> },
    /// Named pipe.
    Fifo { name: Vec<u8> },
    /// Unix socket.
    Socket { name: Vec<u8> },
}
23
24 struct DirInfo {
25 name: CString,
26 entries: Vec<DirEntry>,
27 }
28
29 impl DirInfo {
30
31 fn new(name: CString) -> Self {
32 DirInfo { name, entries: Vec::new() }
33 }
34
35 fn new_rootdir() -> Self {
36 DirInfo::new(CString::new(b"/".to_vec()).unwrap())
37 }
38
39 fn encode_entry<W: Write>(
40 writer: &mut W,
41 entry: &DirEntry,
42 pos: u64,
43 ) -> Result<(), Error> {
44 match entry {
45 DirEntry::Directory { name, start } => {
46 writer.write_all(&[CatalogEntryType::Directory as u8])?;
47 catalog_encode_u64(writer, name.len() as u64)?;
48 writer.write_all(name)?;
49 catalog_encode_u64(writer, pos - start)?;
50 }
51 DirEntry::File { name, size, mtime } => {
52 writer.write_all(&[CatalogEntryType::File as u8])?;
53 catalog_encode_u64(writer, name.len() as u64)?;
54 writer.write_all(name)?;
55 catalog_encode_u64(writer, *size)?;
56 catalog_encode_u64(writer, *mtime)?;
57 }
58 DirEntry::Symlink { name } => {
59 writer.write_all(&[CatalogEntryType::Symlink as u8])?;
60 catalog_encode_u64(writer, name.len() as u64)?;
61 writer.write_all(name)?;
62 }
63 DirEntry::Hardlink { name } => {
64 writer.write_all(&[CatalogEntryType::Hardlink as u8])?;
65 catalog_encode_u64(writer, name.len() as u64)?;
66 writer.write_all(name)?;
67 }
68 DirEntry::BlockDevice { name } => {
69 writer.write_all(&[CatalogEntryType::BlockDevice as u8])?;
70 catalog_encode_u64(writer, name.len() as u64)?;
71 writer.write_all(name)?;
72 }
73 DirEntry::CharDevice { name } => {
74 writer.write_all(&[CatalogEntryType::CharDevice as u8])?;
75 catalog_encode_u64(writer, name.len() as u64)?;
76 writer.write_all(name)?;
77 }
78 DirEntry::Fifo { name } => {
79 writer.write_all(&[CatalogEntryType::Fifo as u8])?;
80 catalog_encode_u64(writer, name.len() as u64)?;
81 writer.write_all(name)?;
82 }
83 DirEntry::Socket { name } => {
84 writer.write_all(&[CatalogEntryType::Socket as u8])?;
85 catalog_encode_u64(writer, name.len() as u64)?;
86 writer.write_all(name)?;
87 }
88 }
89 Ok(())
90 }
91
92 fn encode(self, start: u64) -> Result<(CString, Vec<u8>), Error> {
93 let mut table = Vec::new();
94 catalog_encode_u64(&mut table, self.entries.len() as u64)?;
95 for entry in self.entries {
96 Self::encode_entry(&mut table, &entry, start)?;
97 }
98
99 let mut data = Vec::new();
100 catalog_encode_u64(&mut data, table.len() as u64)?;
101 data.extend_from_slice(&table);
102
103 Ok((self.name, data))
104 }
105
106 fn parse<C: FnMut(CatalogEntryType, Vec<u8>, u64, u64, u64) -> Result<(), Error>>(
107 data: &[u8],
108 mut callback: C,
109 ) -> Result<(), Error> {
110
111 let mut cursor = data;
112
113 let entries = catalog_decode_u64(&mut cursor)?;
114
115 for _ in 0..entries {
116
117 let mut buf = [ 0u8 ];
118 cursor.read_exact(&mut buf)?;
119 let etype = CatalogEntryType::try_from(buf[0])?;
120
121 let name_len = catalog_decode_u64(&mut cursor)?;
122 let name = cursor.read_exact_allocated(name_len as usize)?;
123
124 match etype {
125 CatalogEntryType::Directory => {
126 let offset = catalog_decode_u64(&mut cursor)?;
127 callback(etype, name, offset, 0, 0)?;
128 }
129 CatalogEntryType::File => {
130 let size = catalog_decode_u64(&mut cursor)?;
131 let mtime = catalog_decode_u64(&mut cursor)?;
132 callback(etype, name, 0, size, mtime)?;
133 }
134 _ => {
135 callback(etype, name, 0, 0, 0)?;
136 }
137 }
138 }
139
140 if !cursor.is_empty() {
141 bail!("unable to parse whole catalog data block");
142 }
143
144 Ok(())
145 }
146 }
147
148 pub struct CatalogWriter<W> {
149 writer: W,
150 dirstack: Vec<DirInfo>,
151 pos: u64,
152 }
153
154 impl <W: Write> CatalogWriter<W> {
155
156 pub fn new(writer: W) -> Result<Self, Error> {
157 Ok(Self { writer, dirstack: vec![ DirInfo::new_rootdir() ], pos: 0 })
158 }
159
160 pub fn finish(&mut self) -> Result<(), Error> {
161 if self.dirstack.len() != 1 {
162 bail!("unable to finish catalog at level {}", self.dirstack.len());
163 }
164
165 let dir = self.dirstack.pop().unwrap();
166
167 let start = self.pos;
168 let (_, data) = dir.encode(start)?;
169 self.write_all(&data)?;
170
171 self.write_all(&start.to_le_bytes())?;
172
173 self.writer.flush()?;
174
175 Ok(())
176 }
177 }
178
179 impl <W: Write> BackupCatalogWriter for CatalogWriter<W> {
180
181 fn start_directory(&mut self, name: &CStr) -> Result<(), Error> {
182 let new = DirInfo::new(name.to_owned());
183 self.dirstack.push(new);
184 Ok(())
185 }
186
187 fn end_directory(&mut self) -> Result<(), Error> {
188 let (start, name) = match self.dirstack.pop() {
189 Some(dir) => {
190 let start = self.pos;
191 let (name, data) = dir.encode(start)?;
192 self.write_all(&data)?;
193 (start, name)
194 }
195 None => {
196 bail!("got unexpected end_directory level 0");
197 }
198 };
199
200 let current = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
201 let name = name.to_bytes().to_vec();
202 current.entries.push(DirEntry::Directory { name, start });
203
204 Ok(())
205 }
206
207 fn add_file(&mut self, name: &CStr, size: u64, mtime: u64) -> Result<(), Error> {
208 let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
209 let name = name.to_bytes().to_vec();
210 dir.entries.push(DirEntry::File { name, size, mtime });
211 Ok(())
212 }
213
214 fn add_symlink(&mut self, name: &CStr) -> Result<(), Error> {
215 let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
216 let name = name.to_bytes().to_vec();
217 dir.entries.push(DirEntry::Symlink { name });
218 Ok(())
219 }
220
221 fn add_hardlink(&mut self, name: &CStr) -> Result<(), Error> {
222 let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
223 let name = name.to_bytes().to_vec();
224 dir.entries.push(DirEntry::Hardlink { name });
225 Ok(())
226 }
227
228 fn add_block_device(&mut self, name: &CStr) -> Result<(), Error> {
229 let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
230 let name = name.to_bytes().to_vec();
231 dir.entries.push(DirEntry::BlockDevice { name });
232 Ok(())
233 }
234
235 fn add_char_device(&mut self, name: &CStr) -> Result<(), Error> {
236 let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
237 let name = name.to_bytes().to_vec();
238 dir.entries.push(DirEntry::CharDevice { name });
239 Ok(())
240 }
241
242 fn add_fifo(&mut self, name: &CStr) -> Result<(), Error> {
243 let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
244 let name = name.to_bytes().to_vec();
245 dir.entries.push(DirEntry::Fifo { name });
246 Ok(())
247 }
248
249 fn add_socket(&mut self, name: &CStr) -> Result<(), Error> {
250 let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
251 let name = name.to_bytes().to_vec();
252 dir.entries.push(DirEntry::Socket { name });
253 Ok(())
254 }
255 }
256
257 impl<W: Write> CatalogWriter<W> {
258 fn write_all(&mut self, data: &[u8]) -> Result<(), Error> {
259 self.writer.write_all(data)?;
260 self.pos += u64::try_from(data.len())?;
261 Ok(())
262 }
263 }
264
265 // fixme: move to somehere else?
266 /// Implement Write to tokio mpsc channel Sender
267 pub struct SenderWriter(tokio::sync::mpsc::Sender<Result<Vec<u8>, Error>>);
268
269 impl SenderWriter {
270 pub fn new(sender: tokio::sync::mpsc::Sender<Result<Vec<u8>, Error>>) -> Self {
271 Self(sender)
272 }
273 }
274
275 impl Write for SenderWriter {
276 fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
277 futures::executor::block_on(async move {
278 self.0.send(Ok(buf.to_vec())).await
279 .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))?;
280 Ok(buf.len())
281 })
282 }
283
284 fn flush(&mut self) -> Result<(), std::io::Error> {
285 Ok(())
286 }
287 }
288
/// Reads a catalog from a seekable reader.
pub struct CatalogReader<R> {
    /// Source of the encoded catalog data.
    reader: R,
}
292
293 impl <R: Read + Seek> CatalogReader<R> {
294
295 pub fn new(reader: R) -> Self {
296 Self { reader }
297 }
298
299 fn next_byte<C: Read>(reader: &mut C) -> Result<u8, std::io::Error> {
300 let mut buf = [0u8; 1];
301 reader.read_exact(&mut buf)?;
302 Ok(buf[0])
303 }
304
305 pub fn dump(&mut self) -> Result<(), Error> {
306
307 self.reader.seek(SeekFrom::End(-8))?;
308
309 let start = unsafe { self.reader.read_le_value::<u64>()? };
310
311 self.dump_dir(std::path::Path::new("./"), start)
312 }
313
314 pub fn dump_dir(&mut self, prefix: &std::path::Path, start: u64) -> Result<(), Error> {
315
316 self.reader.seek(SeekFrom::Start(start))?;
317
318 let size = catalog_decode_u64(&mut self.reader)?;
319
320 if size < 1 { bail!("got small directory size {}", size) };
321
322 let data = self.reader.read_exact_allocated(size as usize)?;
323
324 DirInfo::parse(&data, |etype, name, offset, size, mtime| {
325
326 let mut path = std::path::PathBuf::from(prefix);
327 path.push(std::ffi::OsString::from_vec(name));
328
329 match etype {
330 CatalogEntryType::Directory => {
331 println!("{} {:?}", char::from(etype as u8), path);
332 if offset > start {
333 bail!("got wrong directory offset ({} > {})", offset, start);
334 }
335 let pos = start - offset;
336 self.dump_dir(&path, pos)?;
337 }
338 CatalogEntryType::File => {
339 let dt = Local.timestamp(mtime as i64, 0);
340
341 println!(
342 "{} {:?} {} {}",
343 char::from(etype as u8),
344 path,
345 size,
346 dt.to_rfc3339_opts(chrono::SecondsFormat::Secs, false),
347 );
348 }
349 _ => {
350 println!("{} {:?}", char::from(etype as u8), path);
351 }
352 }
353
354 Ok(())
355 })
356 }
357 }
358
359 /// Serialize u64 as short, variable length byte sequence
360 ///
361 /// Stores 7 bits per byte, Bit 8 indicates the end of the sequence (when not set).
362 /// We limit values to a maximum of 2^63.
363 pub fn catalog_encode_u64<W: Write>(writer: &mut W, v: u64) -> Result<(), Error> {
364 let mut enc = Vec::new();
365
366 if (v & (1<<63)) != 0 { bail!("catalog_encode_u64 failed - value >= 2^63"); }
367 let mut d = v;
368 loop {
369 if d < 128 {
370 enc.push(d as u8);
371 break;
372 }
373 enc.push((128 | (d & 127)) as u8);
374 d = d >> 7;
375 }
376 writer.write_all(&enc)?;
377
378 Ok(())
379 }
380
381 /// Deserialize u64 from variable length byte sequence
382 ///
383 /// We currently read maximal 9 bytes, which give a maximum of 63 bits.
384 pub fn catalog_decode_u64<R: Read>(reader: &mut R) -> Result<u64, Error> {
385
386 let mut v: u64 = 0;
387 let mut buf = [0u8];
388
389 for i in 0..9 { // only allow 9 bytes (63 bits)
390 if buf.is_empty() {
391 bail!("decode_u64 failed - unexpected EOB");
392 }
393 reader.read_exact(&mut buf)?;
394 let t = buf[0];
395 if t < 128 {
396 v |= (t as u64) << (i*7);
397 return Ok(v);
398 } else {
399 v |= ((t & 127) as u64) << (i*7);
400 }
401 }
402
403 bail!("decode_u64 failed - missing end marker");
404 }
405
/// Checks the variable length `u64` encoding: round trips (including the
/// boundaries of the one byte range and of the value range), the exact byte
/// layout of a few values, and the error paths of encoder and decoder.
#[test]
fn test_catalog_u64_encoder() {

    fn encode(value: u64) -> Vec<u8> {
        let mut data = Vec::new();
        catalog_encode_u64(&mut data, value).unwrap();
        data
    }

    fn test_encode_decode(value: u64) {
        let data = encode(value);

        let slice = &mut &data[..];
        let decoded = catalog_decode_u64(slice).unwrap();

        assert_eq!(decoded, value);
        // The decoder must consume exactly what the encoder produced.
        assert!(slice.is_empty());
    }

    test_encode_decode(0);
    test_encode_decode(126);
    test_encode_decode(127);
    test_encode_decode(128);
    test_encode_decode((1<<12)-1);
    test_encode_decode((1<<20)-1);
    test_encode_decode((1<<50)-1);
    test_encode_decode((1<<63)-1);

    // Byte layout: 7 bits per byte, low group first, high bit = "more follows".
    assert_eq!(encode(0), vec![0x00]);
    assert_eq!(encode(127), vec![0x7f]);
    assert_eq!(encode(128), vec![0x80, 0x01]);
    assert_eq!(encode(300), vec![0xac, 0x02]);
    assert_eq!(encode((1<<63)-1).len(), 9);

    // Values with bit 63 set are rejected and nothing is written.
    let mut data = Vec::new();
    assert!(catalog_encode_u64(&mut data, 1<<63).is_err());
    assert!(data.is_empty());

    // Truncated input: continuation bit set, but no further byte.
    assert!(catalog_decode_u64(&mut &[0x80u8][..]).is_err());
    // Empty input.
    assert!(catalog_decode_u64(&mut &[0u8; 0][..]).is_err());
    // No end marker within the 9 allowed bytes.
    assert!(catalog_decode_u64(&mut &[0xffu8; 10][..]).is_err());
}