]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/index.rs
bc20557481a26da720c6fbadfdb62d93cafe82ab
[proxmox-backup.git] / src / backup / index.rs
1 use failure::*;
2 use futures::*;
3 use bytes::{Bytes, BytesMut};
4
/// Positional access to the chunk digests referenced by an index file.
///
/// Implementors expose their digests by position, so that callers can walk
/// over every chunk an index uses without knowing the concrete index format.
pub trait IndexFile: Send {
    /// Returns how many digests the index holds.
    fn index_count(&self) -> usize;

    /// Returns the 32 byte digest stored at position `pos`.
    ///
    /// NOTE(review): presumably `None` means `pos` is out of range
    /// (`pos >= index_count()`) - confirm against the implementors.
    fn index_digest(&self, pos: usize) -> Option<&[u8; 32]>;

    /// Returns a byte count for the index.
    ///
    /// NOTE(review): presumably the total size of the data the index
    /// describes - confirm against the implementors.
    fn index_bytes(&self) -> u64;
}
13
14 /// Encode digest list from an `IndexFile` into a binary stream
15 ///
16 /// The reader simply returns a birary stream of 32 byte digest values.
17 pub struct DigestListEncoder {
18 index: Box<dyn IndexFile>,
19 pos: usize,
20 count: usize,
21 }
22
23 impl DigestListEncoder {
24
25 pub fn new(index: Box<dyn IndexFile>) -> Self {
26 let count = index.index_count();
27 Self { index, pos: 0, count }
28 }
29 }
30
31 impl std::io::Read for DigestListEncoder {
32
33 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
34 if buf.len() < 32 { panic!("read buffer too small"); }
35 if self.pos < self.count {
36 let mut written = 0;
37 loop {
38 let digest = self.index.index_digest(self.pos).unwrap();
39 unsafe { std::ptr::copy_nonoverlapping(digest.as_ptr(), buf.as_mut_ptr().add(written), 32); }
40 self.pos += 1;
41 written += 32;
42 if self.pos >= self.count { break; }
43 if (written + 32) >= buf.len() { break; }
44 }
45 return Ok(written);
46 } else {
47 return Ok(0);
48 }
49 }
50 }
51
52 /// Decodes a Stream<Item=Bytes> into Stream<Item=<[u8;32]>
53 ///
54 /// The reader simply returns a birary stream of 32 byte digest values.
55
56 pub struct DigestListDecoder<S> {
57 input: S,
58 buffer: BytesMut,
59 }
60
61 impl <S> DigestListDecoder<S> {
62
63 pub fn new(input: S) -> Self {
64 Self { input, buffer: BytesMut::new() }
65 }
66 }
67
68 impl <S> Stream for DigestListDecoder<S>
69 where S: Stream<Item=Bytes>,
70 S::Error: Into<Error>,
71 {
72 type Item = [u8; 32];
73 type Error = Error;
74
75 fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
76 loop {
77
78 if self.buffer.len() >= 32 {
79
80 let left = self.buffer.split_to(32);
81
82 let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() };
83 unsafe { std::ptr::copy_nonoverlapping(left.as_ptr(), digest.as_mut_ptr(), 32); }
84
85 return Ok(Async::Ready(Some(digest)));
86 }
87
88 match self.input.poll() {
89 Err(err) => {
90 return Err(err.into());
91 }
92 Ok(Async::NotReady) => {
93 return Ok(Async::NotReady);
94 }
95 Ok(Async::Ready(None)) => {
96 let rest = self.buffer.len();
97 if rest == 0 { return Ok(Async::Ready(None)); }
98 return Err(format_err!("got small digest ({} != 32).", rest));
99 }
100 Ok(Async::Ready(Some(data))) => {
101 self.buffer.extend_from_slice(&data);
102 // continue
103 }
104 }
105 }
106 }
107 }