]>
git.proxmox.com Git - proxmox-backup.git/blob - src/client/merge_known_chunks.rs
2 use std
::task
::{Context, Poll}
;
5 use futures
::{ready, Stream}
;
6 use pin_project
::pin_project
;
8 use pbs_datastore
::data_blob
::ChunkInfo
;
10 pub enum MergedChunkInfo
{
11 Known(Vec
<(u64, [u8; 32])>),
15 pub trait MergeKnownChunks
: Sized
{
16 fn merge_known_chunks(self) -> MergeKnownChunksQueue
<Self>;
20 pub struct MergeKnownChunksQueue
<S
> {
23 buffer
: Option
<MergedChunkInfo
>,
26 impl<S
> MergeKnownChunks
for S
28 S
: Stream
<Item
= Result
<MergedChunkInfo
, Error
>>,
30 fn merge_known_chunks(self) -> MergeKnownChunksQueue
<Self> {
31 MergeKnownChunksQueue
{
38 impl<S
> Stream
for MergeKnownChunksQueue
<S
>
40 S
: Stream
<Item
= Result
<MergedChunkInfo
, Error
>>,
42 type Item
= Result
<MergedChunkInfo
, Error
>;
44 fn poll_next(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Option
<Self::Item
>> {
45 let mut this
= self.project();
48 match ready
!(this
.input
.as_mut().poll_next(cx
)) {
49 Some(Err(err
)) => return Poll
::Ready(Some(Err(err
))),
51 if let Some(last
) = this
.buffer
.take() {
52 return Poll
::Ready(Some(Ok(last
)));
54 return Poll
::Ready(None
);
57 Some(Ok(mergerd_chunk_info
)) => {
58 match mergerd_chunk_info
{
59 MergedChunkInfo
::Known(list
) => {
60 let last
= this
.buffer
.take();
64 *this
.buffer
= Some(MergedChunkInfo
::Known(list
));
67 Some(MergedChunkInfo
::Known(mut last_list
)) => {
68 last_list
.extend_from_slice(&list
);
69 let len
= last_list
.len();
70 *this
.buffer
= Some(MergedChunkInfo
::Known(last_list
));
73 return Poll
::Ready(this
.buffer
.take().map(Ok
));
77 Some(MergedChunkInfo
::New(_
)) => {
78 *this
.buffer
= Some(MergedChunkInfo
::Known(list
));
79 return Poll
::Ready(last
.map(Ok
));
83 MergedChunkInfo
::New(chunk_info
) => {
84 let new
= MergedChunkInfo
::New(chunk_info
);
85 if let Some(last
) = this
.buffer
.take() {
86 *this
.buffer
= Some(new
);
87 return Poll
::Ready(Some(Ok(last
)));
89 return Poll
::Ready(Some(Ok(new
)));