]> git.proxmox.com Git - proxmox-backup.git/blame - src/client/merge_known_chunks.rs
move 'wait_for_local_worker' from client to server
[proxmox-backup.git] / src / client / merge_known_chunks.rs
CommitLineData
5b391199
WB
1use std::pin::Pin;
2use std::task::{Context, Poll};
3
be3a0295
WB
4use anyhow::Error;
5use futures::{ready, Stream};
5d1d0f5d 6use pin_project::pin_project;
aa1b2e04 7
be3a0295 8use pbs_datastore::data_blob::ChunkInfo;
aa1b2e04
DM
9
10pub enum MergedChunkInfo {
7a57cb77 11 Known(Vec<(u64, [u8; 32])>),
aa1b2e04
DM
12 New(ChunkInfo),
13}
14
15pub trait MergeKnownChunks: Sized {
62436222 16 fn merge_known_chunks(self) -> MergeKnownChunksQueue<Self>;
aa1b2e04
DM
17}
18
5d1d0f5d 19#[pin_project]
aa1b2e04 20pub struct MergeKnownChunksQueue<S> {
5d1d0f5d 21 #[pin]
aa1b2e04 22 input: S,
ecb21b47 23 buffer: Option<MergedChunkInfo>,
aa1b2e04
DM
24}
25
7a57cb77
WB
26impl<S> MergeKnownChunks for S
27where
5b391199 28 S: Stream<Item = Result<MergedChunkInfo, Error>>,
aa1b2e04 29{
62436222 30 fn merge_known_chunks(self) -> MergeKnownChunksQueue<Self> {
7a57cb77
WB
31 MergeKnownChunksQueue {
32 input: self,
33 buffer: None,
34 }
aa1b2e04
DM
35 }
36}
37
7a57cb77
WB
38impl<S> Stream for MergeKnownChunksQueue<S>
39where
5b391199 40 S: Stream<Item = Result<MergedChunkInfo, Error>>,
aa1b2e04 41{
5b391199
WB
42 type Item = Result<MergedChunkInfo, Error>;
43
44 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
5d1d0f5d 45 let mut this = self.project();
aa1b2e04 46
aa1b2e04 47 loop {
5d1d0f5d 48 match ready!(this.input.as_mut().poll_next(cx)) {
5b391199
WB
49 Some(Err(err)) => return Poll::Ready(Some(Err(err))),
50 None => {
51 if let Some(last) = this.buffer.take() {
52 return Poll::Ready(Some(Ok(last)));
aa1b2e04 53 } else {
5b391199 54 return Poll::Ready(None);
aa1b2e04
DM
55 }
56 }
5b391199 57 Some(Ok(mergerd_chunk_info)) => {
62436222
DM
58 match mergerd_chunk_info {
59 MergedChunkInfo::Known(list) => {
5b391199 60 let last = this.buffer.take();
aa1b2e04 61
62436222
DM
62 match last {
63 None => {
5d1d0f5d 64 *this.buffer = Some(MergedChunkInfo::Known(list));
62436222 65 // continue
ecb21b47 66 }
62436222
DM
67 Some(MergedChunkInfo::Known(mut last_list)) => {
68 last_list.extend_from_slice(&list);
69 let len = last_list.len();
5d1d0f5d 70 *this.buffer = Some(MergedChunkInfo::Known(last_list));
ecb21b47 71
62436222 72 if len >= 64 {
5b391199 73 return Poll::Ready(this.buffer.take().map(Ok));
62436222
DM
74 }
75 // continue
76 }
77 Some(MergedChunkInfo::New(_)) => {
5d1d0f5d 78 *this.buffer = Some(MergedChunkInfo::Known(list));
5b391199 79 return Poll::Ready(last.map(Ok));
62436222 80 }
aa1b2e04 81 }
aa1b2e04 82 }
62436222
DM
83 MergedChunkInfo::New(chunk_info) => {
84 let new = MergedChunkInfo::New(chunk_info);
5b391199 85 if let Some(last) = this.buffer.take() {
5d1d0f5d 86 *this.buffer = Some(new);
5b391199 87 return Poll::Ready(Some(Ok(last)));
62436222 88 } else {
5b391199 89 return Poll::Ready(Some(Ok(new)));
62436222 90 }
ecb21b47 91 }
aa1b2e04
DM
92 }
93 }
94 }
95 }
96 }
97}