]> git.proxmox.com Git - proxmox-backup.git/blame - src/backup/index.rs
switch to external pxar and fuse crates
[proxmox-backup.git] / src / backup / index.rs
CommitLineData
fa17b1ce 1use std::collections::HashMap;
c443f58b 2use std::ops::Range;
745e652a
WB
3use std::pin::Pin;
4use std::task::{Context, Poll};
fa17b1ce
WB
5
6use bytes::{Bytes, BytesMut};
f7d4e4b5 7use anyhow::{format_err, Error};
ee53955f 8use futures::*;
ee53955f 9
c443f58b
WB
/// Location and identity of one chunk.
pub struct ChunkReadInfo {
    /// Half-open `start..end` range covered by the chunk.
    pub range: Range<u64>,
    /// 32 byte digest identifying the chunk.
    pub digest: [u8; 32],
}

impl ChunkReadInfo {
    /// Length of `range`, computed as `end - start`.
    #[inline]
    pub fn size(&self) -> u64 {
        let Range { start, end } = &self.range;
        end - start
    }
}
21
a660978c
DM
/// Trait to get digest list from index files
///
/// To allow easy iteration over all used chunks.
pub trait IndexFile {
    /// Number of digest entries in the index.
    fn index_count(&self) -> usize;

    /// Digest stored at position `pos`.
    ///
    /// NOTE(review): presumably `None` means `pos` is out of range; the
    /// provided methods here only call it with `pos < index_count()`.
    fn index_digest(&self, pos: usize) -> Option<&[u8; 32]>;

    /// NOTE(review): presumably the total number of bytes described by the
    /// index - confirm against the implementors.
    fn index_bytes(&self) -> u64;

    /// Returns most often used chunks
    ///
    /// Counts how often every digest occurs in the index and returns at most
    /// `max` of the digests that occur more than once, mapped to their
    /// occurrence count. Digests with the highest counts are preferred; ties
    /// are broken by digest value so the result is deterministic.
    ///
    /// # Panics
    ///
    /// Panics if `index_digest` returns `None` for a position below
    /// `index_count()`.
    fn find_most_used_chunks(&self, max: usize) -> HashMap<[u8; 32], usize> {
        let mut usage: HashMap<[u8; 32], usize> = HashMap::new();

        for pos in 0..self.index_count() {
            let digest = self
                .index_digest(pos)
                .expect("index_digest must return a digest for pos < index_count");
            *usage.entry(*digest).or_insert(0) += 1;
        }

        // Only chunks referenced more than once are interesting.
        let mut repeated: Vec<([u8; 32], usize)> = usage
            .into_iter()
            .filter(|&(_, count)| count > 1)
            .collect();

        // Highest count first, so that truncating drops the least used ones.
        repeated.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
        repeated.truncate(max);

        repeated.into_iter().collect()
    }
}
457531e7 62
7f3d2ffa 63/// Encode digest list from an `IndexFile` into a binary stream
457531e7
DM
64///
65/// The reader simply returns a birary stream of 32 byte digest values.
7f3d2ffa 66pub struct DigestListEncoder {
5c1130df 67 index: Box<dyn IndexFile + Send + Sync>,
457531e7
DM
68 pos: usize,
69 count: usize,
70}
71
7f3d2ffa 72impl DigestListEncoder {
457531e7 73
5c1130df 74 pub fn new(index: Box<dyn IndexFile + Send + Sync>) -> Self {
457531e7
DM
75 let count = index.index_count();
76 Self { index, pos: 0, count }
77 }
78}
79
7f3d2ffa 80impl std::io::Read for DigestListEncoder {
457531e7 81 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
f93b55b0
WB
82 if buf.len() < 32 {
83 panic!("read buffer too small");
84 }
85
457531e7
DM
86 if self.pos < self.count {
87 let mut written = 0;
88 loop {
89 let digest = self.index.index_digest(self.pos).unwrap();
f93b55b0 90 buf[written..(written + 32)].copy_from_slice(digest);
457531e7
DM
91 self.pos += 1;
92 written += 32;
f93b55b0
WB
93 if self.pos >= self.count {
94 break;
95 }
96 if (written + 32) >= buf.len() {
97 break;
98 }
457531e7 99 }
62ee2eb4 100 Ok(written)
457531e7 101 } else {
62ee2eb4 102 Ok(0)
457531e7
DM
103 }
104 }
105}
ee53955f
DM
106
107/// Decodes a Stream<Item=Bytes> into Stream<Item=<[u8;32]>
108///
109/// The reader simply returns a birary stream of 32 byte digest values.
110
745e652a 111pub struct DigestListDecoder<S: Unpin> {
ee53955f
DM
112 input: S,
113 buffer: BytesMut,
114}
115
745e652a 116impl<S: Unpin> DigestListDecoder<S> {
ee53955f
DM
117 pub fn new(input: S) -> Self {
118 Self { input, buffer: BytesMut::new() }
119 }
120}
121
745e652a 122impl<S: Unpin> Unpin for DigestListDecoder<S> {}
ee53955f 123
745e652a
WB
124impl<S: Unpin, E> Stream for DigestListDecoder<S>
125where
126 S: Stream<Item=Result<Bytes, E>>,
127 E: Into<Error>,
128{
129 type Item = Result<[u8; 32], Error>;
ee53955f 130
745e652a
WB
131 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
132 let this = self.get_mut();
ee53955f 133
745e652a
WB
134 loop {
135 if this.buffer.len() >= 32 {
136 let left = this.buffer.split_to(32);
ee53955f 137
fa17b1ce
WB
138 let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit();
139 unsafe {
140 (*digest.as_mut_ptr()).copy_from_slice(&left[..]);
745e652a 141 return Poll::Ready(Some(Ok(digest.assume_init())));
fa17b1ce 142 }
ee53955f
DM
143 }
144
745e652a
WB
145 match Pin::new(&mut this.input).poll_next(cx) {
146 Poll::Pending => {
147 return Poll::Pending;
ee53955f 148 }
745e652a
WB
149 Poll::Ready(Some(Err(err))) => {
150 return Poll::Ready(Some(Err(err.into())));
ee53955f 151 }
745e652a
WB
152 Poll::Ready(Some(Ok(data))) => {
153 this.buffer.extend_from_slice(&data);
ee53955f
DM
154 // continue
155 }
745e652a
WB
156 Poll::Ready(None) => {
157 let rest = this.buffer.len();
158 if rest == 0 {
159 return Poll::Ready(None);
160 }
161 return Poll::Ready(Some(Err(format_err!(
162 "got small digest ({} != 32).",
163 rest,
164 ))));
165 }
ee53955f
DM
166 }
167 }
168 }
169}