]>
Commit | Line | Data |
---|---|---|
ee53955f DM |
1 | use failure::*; |
2 | use futures::*; | |
3 | use bytes::{Bytes, BytesMut}; | |
f4bf7dfc | 4 | use std::collections::HashMap; |
ee53955f | 5 | |
/// Trait to get digest list from index files
///
/// To allow easy iteration over all used chunks.
pub trait IndexFile: Send {
    /// Number of entries (chunk digest positions) in the index.
    fn index_count(&self) -> usize;

    /// Digest stored at entry `pos`.
    ///
    /// Implementations must return `Some` for every `pos < index_count()`;
    /// the provided methods of this trait rely on that.
    fn index_digest(&self, pos: usize) -> Option<&[u8; 32]>;

    /// Presumably the total size in bytes of the data described by the
    /// index — NOTE(review): confirm against the implementors.
    fn index_bytes(&self) -> u64;

    /// Returns most often used chunks
    ///
    /// Counts how often each digest occurs in the index and returns up to
    /// `max` digests with the highest occurrence counts, mapped to their
    /// count. Digests that occur only once are never included. If several
    /// digests tie at the cut-off count, which of them are kept is
    /// unspecified.
    ///
    /// # Panics
    ///
    /// Panics if `index_digest` returns `None` for a position below
    /// `index_count()`.
    fn find_most_used_chunks(&self, max: usize) -> HashMap<[u8; 32], usize> {
        let mut counts: HashMap<[u8; 32], usize> = HashMap::new();

        for pos in 0..self.index_count() {
            let digest = self
                .index_digest(pos)
                .expect("index_digest must return Some for every pos < index_count()");
            *counts.entry(*digest).or_insert(0) += 1;
        }

        // Only chunks referenced more than once are of interest.
        let mut most_used: Vec<([u8; 32], usize)> = counts
            .into_iter()
            .filter(|&(_, count)| count > 1)
            .collect();

        // Descending by count, so truncating drops the least used entries.
        most_used.sort_unstable_by(|a, b| b.1.cmp(&a.1));
        most_used.truncate(max);

        most_used.into_iter().collect()
    }
}
457531e7 | 46 | |
7f3d2ffa | 47 | /// Encode digest list from an `IndexFile` into a binary stream |
457531e7 DM |
48 | /// |
49 | /// The reader simply returns a birary stream of 32 byte digest values. | |
7f3d2ffa | 50 | pub struct DigestListEncoder { |
457531e7 DM |
51 | index: Box<dyn IndexFile>, |
52 | pos: usize, | |
53 | count: usize, | |
54 | } | |
55 | ||
7f3d2ffa | 56 | impl DigestListEncoder { |
457531e7 DM |
57 | |
58 | pub fn new(index: Box<dyn IndexFile>) -> Self { | |
59 | let count = index.index_count(); | |
60 | Self { index, pos: 0, count } | |
61 | } | |
62 | } | |
63 | ||
7f3d2ffa | 64 | impl std::io::Read for DigestListEncoder { |
457531e7 DM |
65 | |
66 | fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { | |
67 | if buf.len() < 32 { panic!("read buffer too small"); } | |
68 | if self.pos < self.count { | |
69 | let mut written = 0; | |
70 | loop { | |
71 | let digest = self.index.index_digest(self.pos).unwrap(); | |
72 | unsafe { std::ptr::copy_nonoverlapping(digest.as_ptr(), buf.as_mut_ptr().add(written), 32); } | |
73 | self.pos += 1; | |
74 | written += 32; | |
75 | if self.pos >= self.count { break; } | |
76 | if (written + 32) >= buf.len() { break; } | |
77 | } | |
78 | return Ok(written); | |
79 | } else { | |
80 | return Ok(0); | |
81 | } | |
82 | } | |
83 | } | |
ee53955f DM |
84 | |
85 | /// Decodes a Stream<Item=Bytes> into Stream<Item=<[u8;32]> | |
86 | /// | |
87 | /// The reader simply returns a birary stream of 32 byte digest values. | |
88 | ||
89 | pub struct DigestListDecoder<S> { | |
90 | input: S, | |
91 | buffer: BytesMut, | |
92 | } | |
93 | ||
94 | impl <S> DigestListDecoder<S> { | |
95 | ||
96 | pub fn new(input: S) -> Self { | |
97 | Self { input, buffer: BytesMut::new() } | |
98 | } | |
99 | } | |
100 | ||
101 | impl <S> Stream for DigestListDecoder<S> | |
102 | where S: Stream<Item=Bytes>, | |
103 | S::Error: Into<Error>, | |
104 | { | |
105 | type Item = [u8; 32]; | |
106 | type Error = Error; | |
107 | ||
108 | fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> { | |
109 | loop { | |
110 | ||
111 | if self.buffer.len() >= 32 { | |
112 | ||
113 | let left = self.buffer.split_to(32); | |
114 | ||
115 | let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() }; | |
116 | unsafe { std::ptr::copy_nonoverlapping(left.as_ptr(), digest.as_mut_ptr(), 32); } | |
117 | ||
118 | return Ok(Async::Ready(Some(digest))); | |
119 | } | |
120 | ||
121 | match self.input.poll() { | |
122 | Err(err) => { | |
123 | return Err(err.into()); | |
124 | } | |
125 | Ok(Async::NotReady) => { | |
126 | return Ok(Async::NotReady); | |
127 | } | |
128 | Ok(Async::Ready(None)) => { | |
129 | let rest = self.buffer.len(); | |
130 | if rest == 0 { return Ok(Async::Ready(None)); } | |
131 | return Err(format_err!("got small digest ({} != 32).", rest)); | |
132 | } | |
133 | Ok(Async::Ready(Some(data))) => { | |
134 | self.buffer.extend_from_slice(&data); | |
135 | // continue | |
136 | } | |
137 | } | |
138 | } | |
139 | } | |
140 | } |