]>
Commit | Line | Data |
---|---|---|
fa17b1ce WB |
1 | use std::collections::HashMap; |
2 | ||
3 | use bytes::{Bytes, BytesMut}; | |
ee53955f DM |
4 | use failure::*; |
5 | use futures::*; | |
ee53955f | 6 | |
a660978c DM |
/// Trait to get the digest list from index files.
///
/// Allows easy iteration over all used chunks.
pub trait IndexFile: Send {
    /// Number of digest entries in the index.
    fn index_count(&self) -> usize;

    /// Digest stored at position `pos`.
    ///
    /// Implementors are expected to return `Some` for every `pos < index_count()`;
    /// the default method below relies on that.
    fn index_digest(&self, pos: usize) -> Option<&[u8; 32]>;

    /// Byte size reported by the index.
    ///
    /// NOTE(review): meaning is defined by implementors (presumably the total size
    /// of the indexed data) — confirm against the implementations.
    fn index_bytes(&self) -> u64;

    /// Returns the (at most) `max` most often referenced chunks.
    ///
    /// Counts how often each digest occurs in the index and returns a map from
    /// digest to occurrence count for the digests with the highest counts.
    /// Digests that occur only once are never included. If several digests share
    /// the count at the cut-off, which of them are kept is unspecified.
    ///
    /// # Panics
    ///
    /// Panics if `index_digest` returns `None` for a position below `index_count()`.
    fn find_most_used_chunks(&self, max: usize) -> HashMap<[u8; 32], usize> {
        let mut counts: HashMap<[u8; 32], usize> = HashMap::new();

        for pos in 0..self.index_count() {
            let digest = self
                .index_digest(pos)
                .expect("index_digest must return Some for pos < index_count()");
            *counts.entry(*digest).or_insert(0) += 1;
        }

        // Only chunks referenced more than once are interesting.
        let mut most_used: Vec<([u8; 32], usize)> = counts
            .into_iter()
            .filter(|&(_, count)| count > 1)
            .collect();

        // Highest counts first, so `truncate` discards the least used entries.
        most_used.sort_unstable_by(|a, b| b.1.cmp(&a.1));
        most_used.truncate(max);

        most_used.into_iter().collect()
    }
}
457531e7 | 47 | |
7f3d2ffa | 48 | /// Encode digest list from an `IndexFile` into a binary stream |
457531e7 DM |
49 | /// |
50 | /// The reader simply returns a birary stream of 32 byte digest values. | |
7f3d2ffa | 51 | pub struct DigestListEncoder { |
457531e7 DM |
52 | index: Box<dyn IndexFile>, |
53 | pos: usize, | |
54 | count: usize, | |
55 | } | |
56 | ||
7f3d2ffa | 57 | impl DigestListEncoder { |
457531e7 DM |
58 | |
59 | pub fn new(index: Box<dyn IndexFile>) -> Self { | |
60 | let count = index.index_count(); | |
61 | Self { index, pos: 0, count } | |
62 | } | |
63 | } | |
64 | ||
7f3d2ffa | 65 | impl std::io::Read for DigestListEncoder { |
457531e7 | 66 | fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { |
f93b55b0 WB |
67 | if buf.len() < 32 { |
68 | panic!("read buffer too small"); | |
69 | } | |
70 | ||
457531e7 DM |
71 | if self.pos < self.count { |
72 | let mut written = 0; | |
73 | loop { | |
74 | let digest = self.index.index_digest(self.pos).unwrap(); | |
f93b55b0 | 75 | buf[written..(written + 32)].copy_from_slice(digest); |
457531e7 DM |
76 | self.pos += 1; |
77 | written += 32; | |
f93b55b0 WB |
78 | if self.pos >= self.count { |
79 | break; | |
80 | } | |
81 | if (written + 32) >= buf.len() { | |
82 | break; | |
83 | } | |
457531e7 DM |
84 | } |
85 | return Ok(written); | |
86 | } else { | |
87 | return Ok(0); | |
88 | } | |
89 | } | |
90 | } | |
ee53955f DM |
91 | |
92 | /// Decodes a Stream<Item=Bytes> into Stream<Item=<[u8;32]> | |
93 | /// | |
94 | /// The reader simply returns a birary stream of 32 byte digest values. | |
95 | ||
96 | pub struct DigestListDecoder<S> { | |
97 | input: S, | |
98 | buffer: BytesMut, | |
99 | } | |
100 | ||
101 | impl <S> DigestListDecoder<S> { | |
102 | ||
103 | pub fn new(input: S) -> Self { | |
104 | Self { input, buffer: BytesMut::new() } | |
105 | } | |
106 | } | |
107 | ||
108 | impl <S> Stream for DigestListDecoder<S> | |
109 | where S: Stream<Item=Bytes>, | |
110 | S::Error: Into<Error>, | |
111 | { | |
112 | type Item = [u8; 32]; | |
113 | type Error = Error; | |
114 | ||
115 | fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> { | |
116 | loop { | |
117 | ||
118 | if self.buffer.len() >= 32 { | |
119 | ||
120 | let left = self.buffer.split_to(32); | |
121 | ||
fa17b1ce WB |
122 | let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit(); |
123 | unsafe { | |
124 | (*digest.as_mut_ptr()).copy_from_slice(&left[..]); | |
125 | return Ok(Async::Ready(Some(digest.assume_init()))); | |
126 | } | |
ee53955f DM |
127 | } |
128 | ||
129 | match self.input.poll() { | |
130 | Err(err) => { | |
131 | return Err(err.into()); | |
132 | } | |
133 | Ok(Async::NotReady) => { | |
134 | return Ok(Async::NotReady); | |
135 | } | |
136 | Ok(Async::Ready(None)) => { | |
137 | let rest = self.buffer.len(); | |
138 | if rest == 0 { return Ok(Async::Ready(None)); } | |
139 | return Err(format_err!("got small digest ({} != 32).", rest)); | |
140 | } | |
141 | Ok(Async::Ready(Some(data))) => { | |
142 | self.buffer.extend_from_slice(&data); | |
143 | // continue | |
144 | } | |
145 | } | |
146 | } | |
147 | } | |
148 | } |