]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/index.rs
4398161c8b44300675613e352991afa26d959d05
[proxmox-backup.git] / src / backup / index.rs
1 use std::collections::HashMap;
2 use std::pin::Pin;
3 use std::task::{Context, Poll};
4
5 use bytes::{Bytes, BytesMut};
6 use anyhow::{format_err, Error};
7 use futures::*;
8
/// Trait to get the digest list from index files
///
/// Allows easy iteration over all chunks referenced by an index.
pub trait IndexFile {
    /// Number of digest entries in the index.
    fn index_count(&self) -> usize;

    /// Digest stored at position `pos`, or `None` if there is no such entry.
    fn index_digest(&self, pos: usize) -> Option<&[u8; 32]>;

    /// NOTE(review): presumably the total size in bytes of the data described
    /// by the index - confirm against the implementors.
    fn index_bytes(&self) -> u64;

    /// Returns the most often used chunks
    ///
    /// Counts how often every digest occurs in the index and returns at most
    /// `max` digests together with their usage count. Digests that occur only
    /// once are never returned. If more than `max` digests qualify, the ones
    /// with the highest count are kept; ties are resolved in favour of the
    /// smaller digest so that the result is deterministic.
    ///
    /// # Panics
    ///
    /// Panics if `index_digest` returns `None` for a position below
    /// `index_count()`.
    fn find_most_used_chunks(&self, max: usize) -> HashMap<[u8; 32], usize> {
        let mut usage: HashMap<[u8; 32], usize> = HashMap::new();

        for pos in 0..self.index_count() {
            let digest = self
                .index_digest(pos)
                .expect("index_digest returned None for a position below index_count");
            *usage.entry(*digest).or_insert(0) += 1;
        }

        // Only chunks referenced more than once are candidates.
        let mut repeated: Vec<([u8; 32], usize)> = usage
            .into_iter()
            .filter(|&(_, count)| count > 1)
            .collect();

        // Highest count first, so that truncating drops the least used
        // entries (the previous implementation dropped the most used ones).
        repeated.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
        repeated.truncate(max);

        repeated.into_iter().collect()
    }
}
49
50 /// Encode digest list from an `IndexFile` into a binary stream
51 ///
52 /// The reader simply returns a birary stream of 32 byte digest values.
53 pub struct DigestListEncoder {
54 index: Box<dyn IndexFile + Send + Sync>,
55 pos: usize,
56 count: usize,
57 }
58
59 impl DigestListEncoder {
60
61 pub fn new(index: Box<dyn IndexFile + Send + Sync>) -> Self {
62 let count = index.index_count();
63 Self { index, pos: 0, count }
64 }
65 }
66
67 impl std::io::Read for DigestListEncoder {
68 fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
69 if buf.len() < 32 {
70 panic!("read buffer too small");
71 }
72
73 if self.pos < self.count {
74 let mut written = 0;
75 loop {
76 let digest = self.index.index_digest(self.pos).unwrap();
77 buf[written..(written + 32)].copy_from_slice(digest);
78 self.pos += 1;
79 written += 32;
80 if self.pos >= self.count {
81 break;
82 }
83 if (written + 32) >= buf.len() {
84 break;
85 }
86 }
87 Ok(written)
88 } else {
89 Ok(0)
90 }
91 }
92 }
93
94 /// Decodes a Stream<Item=Bytes> into Stream<Item=<[u8;32]>
95 ///
96 /// The reader simply returns a birary stream of 32 byte digest values.
97
98 pub struct DigestListDecoder<S: Unpin> {
99 input: S,
100 buffer: BytesMut,
101 }
102
103 impl<S: Unpin> DigestListDecoder<S> {
104 pub fn new(input: S) -> Self {
105 Self { input, buffer: BytesMut::new() }
106 }
107 }
108
109 impl<S: Unpin> Unpin for DigestListDecoder<S> {}
110
111 impl<S: Unpin, E> Stream for DigestListDecoder<S>
112 where
113 S: Stream<Item=Result<Bytes, E>>,
114 E: Into<Error>,
115 {
116 type Item = Result<[u8; 32], Error>;
117
118 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
119 let this = self.get_mut();
120
121 loop {
122 if this.buffer.len() >= 32 {
123 let left = this.buffer.split_to(32);
124
125 let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit();
126 unsafe {
127 (*digest.as_mut_ptr()).copy_from_slice(&left[..]);
128 return Poll::Ready(Some(Ok(digest.assume_init())));
129 }
130 }
131
132 match Pin::new(&mut this.input).poll_next(cx) {
133 Poll::Pending => {
134 return Poll::Pending;
135 }
136 Poll::Ready(Some(Err(err))) => {
137 return Poll::Ready(Some(Err(err.into())));
138 }
139 Poll::Ready(Some(Ok(data))) => {
140 this.buffer.extend_from_slice(&data);
141 // continue
142 }
143 Poll::Ready(None) => {
144 let rest = this.buffer.len();
145 if rest == 0 {
146 return Poll::Ready(None);
147 }
148 return Poll::Ready(Some(Err(format_err!(
149 "got small digest ({} != 32).",
150 rest,
151 ))));
152 }
153 }
154 }
155 }
156 }