]>
Commit | Line | Data |
---|---|---|
5b391199 WB |
1 | use std::pin::Pin; |
2 | use std::task::{Context, Poll}; | |
3 | ||
be3a0295 WB |
4 | use anyhow::Error; |
5 | use futures::{ready, Stream}; | |
5d1d0f5d | 6 | use pin_project::pin_project; |
aa1b2e04 | 7 | |
be3a0295 | 8 | use pbs_datastore::data_blob::ChunkInfo; |
aa1b2e04 DM |
9 | |
10 | pub enum MergedChunkInfo { | |
7a57cb77 | 11 | Known(Vec<(u64, [u8; 32])>), |
aa1b2e04 DM |
12 | New(ChunkInfo), |
13 | } | |
14 | ||
15 | pub trait MergeKnownChunks: Sized { | |
62436222 | 16 | fn merge_known_chunks(self) -> MergeKnownChunksQueue<Self>; |
aa1b2e04 DM |
17 | } |
18 | ||
5d1d0f5d | 19 | #[pin_project] |
aa1b2e04 | 20 | pub struct MergeKnownChunksQueue<S> { |
5d1d0f5d | 21 | #[pin] |
aa1b2e04 | 22 | input: S, |
ecb21b47 | 23 | buffer: Option<MergedChunkInfo>, |
aa1b2e04 DM |
24 | } |
25 | ||
7a57cb77 WB |
26 | impl<S> MergeKnownChunks for S |
27 | where | |
5b391199 | 28 | S: Stream<Item = Result<MergedChunkInfo, Error>>, |
aa1b2e04 | 29 | { |
62436222 | 30 | fn merge_known_chunks(self) -> MergeKnownChunksQueue<Self> { |
7a57cb77 WB |
31 | MergeKnownChunksQueue { |
32 | input: self, | |
33 | buffer: None, | |
34 | } | |
aa1b2e04 DM |
35 | } |
36 | } | |
37 | ||
7a57cb77 WB |
38 | impl<S> Stream for MergeKnownChunksQueue<S> |
39 | where | |
5b391199 | 40 | S: Stream<Item = Result<MergedChunkInfo, Error>>, |
aa1b2e04 | 41 | { |
5b391199 WB |
42 | type Item = Result<MergedChunkInfo, Error>; |
43 | ||
44 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { | |
5d1d0f5d | 45 | let mut this = self.project(); |
aa1b2e04 | 46 | |
aa1b2e04 | 47 | loop { |
5d1d0f5d | 48 | match ready!(this.input.as_mut().poll_next(cx)) { |
5b391199 WB |
49 | Some(Err(err)) => return Poll::Ready(Some(Err(err))), |
50 | None => { | |
51 | if let Some(last) = this.buffer.take() { | |
52 | return Poll::Ready(Some(Ok(last))); | |
aa1b2e04 | 53 | } else { |
5b391199 | 54 | return Poll::Ready(None); |
aa1b2e04 DM |
55 | } |
56 | } | |
5b391199 | 57 | Some(Ok(mergerd_chunk_info)) => { |
62436222 DM |
58 | match mergerd_chunk_info { |
59 | MergedChunkInfo::Known(list) => { | |
5b391199 | 60 | let last = this.buffer.take(); |
aa1b2e04 | 61 | |
62436222 DM |
62 | match last { |
63 | None => { | |
5d1d0f5d | 64 | *this.buffer = Some(MergedChunkInfo::Known(list)); |
62436222 | 65 | // continue |
ecb21b47 | 66 | } |
62436222 DM |
67 | Some(MergedChunkInfo::Known(mut last_list)) => { |
68 | last_list.extend_from_slice(&list); | |
69 | let len = last_list.len(); | |
5d1d0f5d | 70 | *this.buffer = Some(MergedChunkInfo::Known(last_list)); |
ecb21b47 | 71 | |
62436222 | 72 | if len >= 64 { |
5b391199 | 73 | return Poll::Ready(this.buffer.take().map(Ok)); |
62436222 DM |
74 | } |
75 | // continue | |
76 | } | |
77 | Some(MergedChunkInfo::New(_)) => { | |
5d1d0f5d | 78 | *this.buffer = Some(MergedChunkInfo::Known(list)); |
5b391199 | 79 | return Poll::Ready(last.map(Ok)); |
62436222 | 80 | } |
aa1b2e04 | 81 | } |
aa1b2e04 | 82 | } |
62436222 DM |
83 | MergedChunkInfo::New(chunk_info) => { |
84 | let new = MergedChunkInfo::New(chunk_info); | |
5b391199 | 85 | if let Some(last) = this.buffer.take() { |
5d1d0f5d | 86 | *this.buffer = Some(new); |
5b391199 | 87 | return Poll::Ready(Some(Ok(last))); |
62436222 | 88 | } else { |
5b391199 | 89 | return Poll::Ready(Some(Ok(new))); |
62436222 | 90 | } |
ecb21b47 | 91 | } |
aa1b2e04 DM |
92 | } |
93 | } | |
94 | } | |
95 | } | |
96 | } | |
97 | } |