]>
Commit | Line | Data |
---|---|---|
fa17b1ce | 1 | use std::collections::HashMap; |
c443f58b | 2 | use std::ops::Range; |
745e652a WB |
3 | use std::pin::Pin; |
4 | use std::task::{Context, Poll}; | |
fa17b1ce WB |
5 | |
6 | use bytes::{Bytes, BytesMut}; | |
f7d4e4b5 | 7 | use anyhow::{format_err, Error}; |
ee53955f | 8 | use futures::*; |
ee53955f | 9 | |
c443f58b WB |
/// Describes a single chunk: the `range` it covers and its 32 byte `digest`.
///
/// NOTE(review): `range` presumably holds byte offsets into the indexed
/// data — confirm against the code that constructs these values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkReadInfo {
    /// Half-open range (`start..end`) covered by the chunk.
    pub range: Range<u64>,
    /// Digest identifying the chunk.
    pub digest: [u8; 32],
}

impl ChunkReadInfo {
    /// Returns the length of `range` (`end - start`).
    ///
    /// A range whose `end` is not greater than its `start` is empty (the same
    /// rule `Range::is_empty` uses), so it reports a size of `0` instead of
    /// overflowing the subtraction.
    #[inline]
    pub fn size(&self) -> u64 {
        self.range.end.saturating_sub(self.range.start)
    }
}
21 | ||
a660978c DM |
/// Trait to get digest list from index files
///
/// To allow easy iteration over all used chunks.
pub trait IndexFile {
    /// Number of entries in the index; valid positions are `0..index_count()`.
    fn index_count(&self) -> usize;

    /// Digest stored at position `pos`.
    ///
    /// NOTE(review): presumably `None` means `pos` is out of range — confirm
    /// against the implementors.
    fn index_digest(&self, pos: usize) -> Option<&[u8; 32]>;

    /// NOTE(review): presumably the total number of bytes described by the
    /// index — confirm against the implementors.
    fn index_bytes(&self) -> u64;

    /// Returns most often used chunks
    ///
    /// Counts how often every digest occurs in the index and returns at most
    /// `max` of them, mapped to their occurrence count. Only digests that
    /// occur more than once are considered. When more than `max` digests
    /// qualify, the ones with the highest counts are kept; ties are broken by
    /// digest value so the result is deterministic.
    ///
    /// # Panics
    ///
    /// Panics if `index_digest` returns `None` for a position below
    /// `index_count()`.
    fn find_most_used_chunks(&self, max: usize) -> HashMap<[u8; 32], usize> {
        let mut usage: HashMap<[u8; 32], usize> = HashMap::new();

        for pos in 0..self.index_count() {
            let digest = self
                .index_digest(pos)
                .expect("index_digest returned None for a position below index_count");
            *usage.entry(*digest).or_insert(0) += 1;
        }

        // Chunks referenced only once are never candidates.
        let mut repeated: Vec<([u8; 32], usize)> = usage
            .into_iter()
            .filter(|&(_, count)| count > 1)
            .collect();

        // Highest count first, so truncating drops the least used entries.
        repeated.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
        repeated.truncate(max);

        repeated.into_iter().collect()
    }
}
457531e7 | 62 | |
7f3d2ffa | 63 | /// Encode digest list from an `IndexFile` into a binary stream |
457531e7 DM |
64 | /// |
65 | /// The reader simply returns a birary stream of 32 byte digest values. | |
7f3d2ffa | 66 | pub struct DigestListEncoder { |
5c1130df | 67 | index: Box<dyn IndexFile + Send + Sync>, |
457531e7 DM |
68 | pos: usize, |
69 | count: usize, | |
70 | } | |
71 | ||
7f3d2ffa | 72 | impl DigestListEncoder { |
457531e7 | 73 | |
5c1130df | 74 | pub fn new(index: Box<dyn IndexFile + Send + Sync>) -> Self { |
457531e7 DM |
75 | let count = index.index_count(); |
76 | Self { index, pos: 0, count } | |
77 | } | |
78 | } | |
79 | ||
7f3d2ffa | 80 | impl std::io::Read for DigestListEncoder { |
457531e7 | 81 | fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { |
f93b55b0 WB |
82 | if buf.len() < 32 { |
83 | panic!("read buffer too small"); | |
84 | } | |
85 | ||
457531e7 DM |
86 | if self.pos < self.count { |
87 | let mut written = 0; | |
88 | loop { | |
89 | let digest = self.index.index_digest(self.pos).unwrap(); | |
f93b55b0 | 90 | buf[written..(written + 32)].copy_from_slice(digest); |
457531e7 DM |
91 | self.pos += 1; |
92 | written += 32; | |
f93b55b0 WB |
93 | if self.pos >= self.count { |
94 | break; | |
95 | } | |
96 | if (written + 32) >= buf.len() { | |
97 | break; | |
98 | } | |
457531e7 | 99 | } |
62ee2eb4 | 100 | Ok(written) |
457531e7 | 101 | } else { |
62ee2eb4 | 102 | Ok(0) |
457531e7 DM |
103 | } |
104 | } | |
105 | } | |
ee53955f DM |
106 | |
107 | /// Decodes a Stream<Item=Bytes> into Stream<Item=<[u8;32]> | |
108 | /// | |
109 | /// The reader simply returns a birary stream of 32 byte digest values. | |
110 | ||
745e652a | 111 | pub struct DigestListDecoder<S: Unpin> { |
ee53955f DM |
112 | input: S, |
113 | buffer: BytesMut, | |
114 | } | |
115 | ||
745e652a | 116 | impl<S: Unpin> DigestListDecoder<S> { |
ee53955f DM |
117 | pub fn new(input: S) -> Self { |
118 | Self { input, buffer: BytesMut::new() } | |
119 | } | |
120 | } | |
121 | ||
745e652a | 122 | impl<S: Unpin> Unpin for DigestListDecoder<S> {} |
ee53955f | 123 | |
745e652a WB |
124 | impl<S: Unpin, E> Stream for DigestListDecoder<S> |
125 | where | |
126 | S: Stream<Item=Result<Bytes, E>>, | |
127 | E: Into<Error>, | |
128 | { | |
129 | type Item = Result<[u8; 32], Error>; | |
ee53955f | 130 | |
745e652a WB |
131 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { |
132 | let this = self.get_mut(); | |
ee53955f | 133 | |
745e652a WB |
134 | loop { |
135 | if this.buffer.len() >= 32 { | |
136 | let left = this.buffer.split_to(32); | |
ee53955f | 137 | |
fa17b1ce WB |
138 | let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit(); |
139 | unsafe { | |
140 | (*digest.as_mut_ptr()).copy_from_slice(&left[..]); | |
745e652a | 141 | return Poll::Ready(Some(Ok(digest.assume_init()))); |
fa17b1ce | 142 | } |
ee53955f DM |
143 | } |
144 | ||
745e652a WB |
145 | match Pin::new(&mut this.input).poll_next(cx) { |
146 | Poll::Pending => { | |
147 | return Poll::Pending; | |
ee53955f | 148 | } |
745e652a WB |
149 | Poll::Ready(Some(Err(err))) => { |
150 | return Poll::Ready(Some(Err(err.into()))); | |
ee53955f | 151 | } |
745e652a WB |
152 | Poll::Ready(Some(Ok(data))) => { |
153 | this.buffer.extend_from_slice(&data); | |
ee53955f DM |
154 | // continue |
155 | } | |
745e652a WB |
156 | Poll::Ready(None) => { |
157 | let rest = this.buffer.len(); | |
158 | if rest == 0 { | |
159 | return Poll::Ready(None); | |
160 | } | |
161 | return Poll::Ready(Some(Err(format_err!( | |
162 | "got small digest ({} != 32).", | |
163 | rest, | |
164 | )))); | |
165 | } | |
ee53955f DM |
166 | } |
167 | } | |
168 | } | |
169 | } |