]> git.proxmox.com Git - proxmox-backup.git/blame - src/client/merge_known_chunks.rs
src/client/http_client.rs: encode parameters inside body
[proxmox-backup.git] / src / client / merge_known_chunks.rs
CommitLineData
aa1b2e04
DM
1use failure::*;
2use futures::*;
ecb21b47 3use std::collections::HashSet;
aa1b2e04
DM
4use std::sync::{Arc, Mutex};
5
6pub struct ChunkInfo {
7 pub digest: [u8; 32],
8 pub data: bytes::BytesMut,
9 pub offset: u64,
10}
11
12pub enum MergedChunkInfo {
9bb675ec 13 Known(Vec<(u64,[u8;32])>),
aa1b2e04
DM
14 New(ChunkInfo),
15}
16
17pub trait MergeKnownChunks: Sized {
18 fn merge_known_chunks(self, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>) -> MergeKnownChunksQueue<Self>;
19}
20
21pub struct MergeKnownChunksQueue<S> {
22 input: S,
23 known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
ecb21b47 24 buffer: Option<MergedChunkInfo>,
aa1b2e04
DM
25}
26
27impl <S> MergeKnownChunks for S
28 where S: Stream<Item=ChunkInfo, Error=Error>,
29{
30 fn merge_known_chunks(self, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>) -> MergeKnownChunksQueue<Self> {
ecb21b47 31 MergeKnownChunksQueue { input: self, known_chunks, buffer: None }
aa1b2e04
DM
32 }
33}
34
35impl <S> Stream for MergeKnownChunksQueue<S>
36 where S: Stream<Item=ChunkInfo, Error=Error>,
37{
38 type Item = MergedChunkInfo;
39 type Error = Error;
40
41 fn poll(&mut self) -> Poll<Option<MergedChunkInfo>, Error> {
42 loop {
aa1b2e04
DM
43 match self.input.poll() {
44 Err(err) => {
45 return Err(err);
46 }
47 Ok(Async::NotReady) => {
48 return Ok(Async::NotReady);
49 }
50 Ok(Async::Ready(None)) => {
ecb21b47
DM
51 if let Some(last) = self.buffer.take() {
52 return Ok(Async::Ready(Some(last)));
aa1b2e04 53 } else {
ecb21b47 54 return Ok(Async::Ready(None));
aa1b2e04
DM
55 }
56 }
57 Ok(Async::Ready(Some(chunk_info))) => {
58
59 let mut known_chunks = self.known_chunks.lock().unwrap();
60 let chunk_is_known = known_chunks.contains(&chunk_info.digest);
61
62 if chunk_is_known {
63
ecb21b47
DM
64 let last = self.buffer.take();
65
66 match last {
67 None => {
9bb675ec 68 self.buffer = Some(MergedChunkInfo::Known(vec![(chunk_info.offset, chunk_info.digest)]));
ecb21b47
DM
69 // continue
70 }
71 Some(MergedChunkInfo::Known(mut list)) => {
9bb675ec 72 list.push((chunk_info.offset, chunk_info.digest));
ecb21b47
DM
73 let len = list.len();
74 self.buffer = Some(MergedChunkInfo::Known(list));
75
76 if len >= 64 {
77 return Ok(Async::Ready(self.buffer.take()));
78 }
79 // continue
80
81 }
82 Some(MergedChunkInfo::New(_)) => {
9bb675ec 83 self.buffer = Some(MergedChunkInfo::Known(vec![(chunk_info.offset, chunk_info.digest)]));
ecb21b47 84 return Ok(Async::Ready(last));
aa1b2e04 85 }
aa1b2e04 86 }
ecb21b47 87
aa1b2e04
DM
88 } else {
89 known_chunks.insert(chunk_info.digest);
ecb21b47
DM
90 let new = MergedChunkInfo::New(chunk_info);
91 if let Some(last) = self.buffer.take() {
92 self.buffer = Some(new);
93 return Ok(Async::Ready(Some(last)));
94 } else {
95 return Ok(Async::Ready(Some(new)));
96 }
aa1b2e04
DM
97 }
98 }
99 }
100 }
101 }
102}