]> git.proxmox.com Git - proxmox-backup.git/blob - src/client/merge_known_chunks.rs
move 'wait_for_local_worker' from client to server
[proxmox-backup.git] / src / client / merge_known_chunks.rs
1 use std::pin::Pin;
2 use std::task::{Context, Poll};
3
4 use anyhow::Error;
5 use futures::{ready, Stream};
6 use pin_project::pin_project;
7
8 use pbs_datastore::data_blob::ChunkInfo;
9
10 pub enum MergedChunkInfo {
11 Known(Vec<(u64, [u8; 32])>),
12 New(ChunkInfo),
13 }
14
15 pub trait MergeKnownChunks: Sized {
16 fn merge_known_chunks(self) -> MergeKnownChunksQueue<Self>;
17 }
18
19 #[pin_project]
20 pub struct MergeKnownChunksQueue<S> {
21 #[pin]
22 input: S,
23 buffer: Option<MergedChunkInfo>,
24 }
25
26 impl<S> MergeKnownChunks for S
27 where
28 S: Stream<Item = Result<MergedChunkInfo, Error>>,
29 {
30 fn merge_known_chunks(self) -> MergeKnownChunksQueue<Self> {
31 MergeKnownChunksQueue {
32 input: self,
33 buffer: None,
34 }
35 }
36 }
37
38 impl<S> Stream for MergeKnownChunksQueue<S>
39 where
40 S: Stream<Item = Result<MergedChunkInfo, Error>>,
41 {
42 type Item = Result<MergedChunkInfo, Error>;
43
44 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
45 let mut this = self.project();
46
47 loop {
48 match ready!(this.input.as_mut().poll_next(cx)) {
49 Some(Err(err)) => return Poll::Ready(Some(Err(err))),
50 None => {
51 if let Some(last) = this.buffer.take() {
52 return Poll::Ready(Some(Ok(last)));
53 } else {
54 return Poll::Ready(None);
55 }
56 }
57 Some(Ok(mergerd_chunk_info)) => {
58 match mergerd_chunk_info {
59 MergedChunkInfo::Known(list) => {
60 let last = this.buffer.take();
61
62 match last {
63 None => {
64 *this.buffer = Some(MergedChunkInfo::Known(list));
65 // continue
66 }
67 Some(MergedChunkInfo::Known(mut last_list)) => {
68 last_list.extend_from_slice(&list);
69 let len = last_list.len();
70 *this.buffer = Some(MergedChunkInfo::Known(last_list));
71
72 if len >= 64 {
73 return Poll::Ready(this.buffer.take().map(Ok));
74 }
75 // continue
76 }
77 Some(MergedChunkInfo::New(_)) => {
78 *this.buffer = Some(MergedChunkInfo::Known(list));
79 return Poll::Ready(last.map(Ok));
80 }
81 }
82 }
83 MergedChunkInfo::New(chunk_info) => {
84 let new = MergedChunkInfo::New(chunk_info);
85 if let Some(last) = this.buffer.take() {
86 *this.buffer = Some(new);
87 return Poll::Ready(Some(Ok(last)));
88 } else {
89 return Poll::Ready(Some(Ok(new)));
90 }
91 }
92 }
93 }
94 }
95 }
96 }
97 }