1 use std
::collections
::HashMap
;
3 use std
::task
::{Context, Poll}
;
5 use bytes
::{Bytes, BytesMut}
;
6 use anyhow
::{format_err, Error}
;
/// Trait to get the digest list from index files.
///
/// Allows easy iteration over all chunks referenced by an index.
pub trait IndexFile {
    /// Returns the number of digest entries in the index.
    fn index_count(&self) -> usize;

    /// Returns the digest stored at position `pos`.
    ///
    /// Expected to return `Some` for every `pos < index_count()`; the
    /// default methods of this trait rely on that.
    fn index_digest(&self, pos: usize) -> Option<&[u8; 32]>;

    /// NOTE(review): presumably the total number of bytes covered by the
    /// index — confirm against the implementors.
    fn index_bytes(&self) -> u64;

    /// Returns the most often used chunks.
    ///
    /// Counts how often each digest occurs in the index and returns at most
    /// `max` digests that occur more than once, mapped to their occurrence
    /// count. The digests with the highest counts are kept; ties are broken
    /// by digest value, so the result does not depend on hash iteration order.
    ///
    /// # Panics
    ///
    /// Panics if `index_digest` returns `None` for a position below
    /// `index_count()`.
    fn find_most_used_chunks(&self, max: usize) -> HashMap<[u8; 32], usize> {
        let mut counts: HashMap<[u8; 32], usize> = HashMap::new();

        for pos in 0..self.index_count() {
            let digest = self
                .index_digest(pos)
                .expect("index_digest must return Some for every pos < index_count()");
            *counts.entry(*digest).or_insert(0) += 1;
        }

        // Chunks referenced only once are never interesting here.
        let mut most_used: Vec<([u8; 32], usize)> = counts
            .into_iter()
            .filter(|&(_, count)| count > 1)
            .collect();

        // Highest count first, so truncating drops the *least* used entries.
        most_used.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
        most_used.truncate(max);

        most_used.into_iter().collect()
    }
}
50 /// Encode digest list from an `IndexFile` into a binary stream
52 /// The reader simply returns a birary stream of 32 byte digest values.
53 pub struct DigestListEncoder
{
54 index
: Box
<dyn IndexFile
+ Send
+ Sync
>,
59 impl DigestListEncoder
{
61 pub fn new(index
: Box
<dyn IndexFile
+ Send
+ Sync
>) -> Self {
62 let count
= index
.index_count();
63 Self { index, pos: 0, count }
67 impl std
::io
::Read
for DigestListEncoder
{
68 fn read(&mut self, buf
: &mut [u8]) -> Result
<usize, std
::io
::Error
> {
70 panic
!("read buffer too small");
73 if self.pos
< self.count
{
76 let digest
= self.index
.index_digest(self.pos
).unwrap();
77 buf
[written
..(written
+ 32)].copy_from_slice(digest
);
80 if self.pos
>= self.count
{
83 if (written
+ 32) >= buf
.len() {
94 /// Decodes a Stream<Item=Bytes> into Stream<Item=<[u8;32]>
96 /// The reader simply returns a birary stream of 32 byte digest values.
98 pub struct DigestListDecoder
<S
: Unpin
> {
103 impl<S
: Unpin
> DigestListDecoder
<S
> {
104 pub fn new(input
: S
) -> Self {
105 Self { input, buffer: BytesMut::new() }
109 impl<S
: Unpin
> Unpin
for DigestListDecoder
<S
> {}
111 impl<S
: Unpin
, E
> Stream
for DigestListDecoder
<S
>
113 S
: Stream
<Item
=Result
<Bytes
, E
>>,
116 type Item
= Result
<[u8; 32], Error
>;
118 fn poll_next(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Option
<Self::Item
>> {
119 let this
= self.get_mut();
122 if this
.buffer
.len() >= 32 {
123 let left
= this
.buffer
.split_to(32);
125 let mut digest
= std
::mem
::MaybeUninit
::<[u8; 32]>::uninit();
127 (*digest
.as_mut_ptr()).copy_from_slice(&left
[..]);
128 return Poll
::Ready(Some(Ok(digest
.assume_init())));
132 match Pin
::new(&mut this
.input
).poll_next(cx
) {
134 return Poll
::Pending
;
136 Poll
::Ready(Some(Err(err
))) => {
137 return Poll
::Ready(Some(Err(err
.into())));
139 Poll
::Ready(Some(Ok(data
))) => {
140 this
.buffer
.extend_from_slice(&data
);
143 Poll
::Ready(None
) => {
144 let rest
= this
.buffer
.len();
146 return Poll
::Ready(None
);
148 return Poll
::Ready(Some(Err(format_err
!(
149 "got small digest ({} != 32).",