]>
Commit | Line | Data |
---|---|---|
aa1b2e04 DM |
1 | use failure::*; |
2 | use futures::*; | |
ecb21b47 | 3 | use std::collections::HashSet; |
aa1b2e04 DM |
4 | use std::sync::{Arc, Mutex}; |
5 | ||
6 | pub struct ChunkInfo { | |
7 | pub digest: [u8; 32], | |
8 | pub data: bytes::BytesMut, | |
9 | pub offset: u64, | |
10 | } | |
11 | ||
12 | pub enum MergedChunkInfo { | |
9bb675ec | 13 | Known(Vec<(u64,[u8;32])>), |
aa1b2e04 DM |
14 | New(ChunkInfo), |
15 | } | |
16 | ||
17 | pub trait MergeKnownChunks: Sized { | |
18 | fn merge_known_chunks(self, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>) -> MergeKnownChunksQueue<Self>; | |
19 | } | |
20 | ||
21 | pub struct MergeKnownChunksQueue<S> { | |
22 | input: S, | |
23 | known_chunks: Arc<Mutex<HashSet<[u8;32]>>>, | |
ecb21b47 | 24 | buffer: Option<MergedChunkInfo>, |
aa1b2e04 DM |
25 | } |
26 | ||
27 | impl <S> MergeKnownChunks for S | |
28 | where S: Stream<Item=ChunkInfo, Error=Error>, | |
29 | { | |
30 | fn merge_known_chunks(self, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>) -> MergeKnownChunksQueue<Self> { | |
ecb21b47 | 31 | MergeKnownChunksQueue { input: self, known_chunks, buffer: None } |
aa1b2e04 DM |
32 | } |
33 | } | |
34 | ||
35 | impl <S> Stream for MergeKnownChunksQueue<S> | |
36 | where S: Stream<Item=ChunkInfo, Error=Error>, | |
37 | { | |
38 | type Item = MergedChunkInfo; | |
39 | type Error = Error; | |
40 | ||
41 | fn poll(&mut self) -> Poll<Option<MergedChunkInfo>, Error> { | |
42 | loop { | |
aa1b2e04 DM |
43 | match self.input.poll() { |
44 | Err(err) => { | |
45 | return Err(err); | |
46 | } | |
47 | Ok(Async::NotReady) => { | |
48 | return Ok(Async::NotReady); | |
49 | } | |
50 | Ok(Async::Ready(None)) => { | |
ecb21b47 DM |
51 | if let Some(last) = self.buffer.take() { |
52 | return Ok(Async::Ready(Some(last))); | |
aa1b2e04 | 53 | } else { |
ecb21b47 | 54 | return Ok(Async::Ready(None)); |
aa1b2e04 DM |
55 | } |
56 | } | |
57 | Ok(Async::Ready(Some(chunk_info))) => { | |
58 | ||
59 | let mut known_chunks = self.known_chunks.lock().unwrap(); | |
60 | let chunk_is_known = known_chunks.contains(&chunk_info.digest); | |
61 | ||
62 | if chunk_is_known { | |
63 | ||
ecb21b47 DM |
64 | let last = self.buffer.take(); |
65 | ||
66 | match last { | |
67 | None => { | |
9bb675ec | 68 | self.buffer = Some(MergedChunkInfo::Known(vec![(chunk_info.offset, chunk_info.digest)])); |
ecb21b47 DM |
69 | // continue |
70 | } | |
71 | Some(MergedChunkInfo::Known(mut list)) => { | |
9bb675ec | 72 | list.push((chunk_info.offset, chunk_info.digest)); |
ecb21b47 DM |
73 | let len = list.len(); |
74 | self.buffer = Some(MergedChunkInfo::Known(list)); | |
75 | ||
76 | if len >= 64 { | |
77 | return Ok(Async::Ready(self.buffer.take())); | |
78 | } | |
79 | // continue | |
80 | ||
81 | } | |
82 | Some(MergedChunkInfo::New(_)) => { | |
9bb675ec | 83 | self.buffer = Some(MergedChunkInfo::Known(vec![(chunk_info.offset, chunk_info.digest)])); |
ecb21b47 | 84 | return Ok(Async::Ready(last)); |
aa1b2e04 | 85 | } |
aa1b2e04 | 86 | } |
ecb21b47 | 87 | |
aa1b2e04 DM |
88 | } else { |
89 | known_chunks.insert(chunk_info.digest); | |
ecb21b47 DM |
90 | let new = MergedChunkInfo::New(chunk_info); |
91 | if let Some(last) = self.buffer.take() { | |
92 | self.buffer = Some(new); | |
93 | return Ok(Async::Ready(Some(last))); | |
94 | } else { | |
95 | return Ok(Async::Ready(Some(new))); | |
96 | } | |
aa1b2e04 DM |
97 | } |
98 | } | |
99 | } | |
100 | } | |
101 | } | |
102 | } |