use anyhow::{Error};
use futures::*;
+use pin_project::pin_project;
use crate::backup::ChunkInfo;
fn merge_known_chunks(self) -> MergeKnownChunksQueue<Self>;
}
+#[pin_project]
pub struct MergeKnownChunksQueue<S> {
+ #[pin]
input: S,
buffer: Option<MergedChunkInfo>,
}
type Item = Result<MergedChunkInfo, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
- let this = unsafe { self.get_unchecked_mut() };
+ let mut this = self.project();
loop {
- match ready!(unsafe { Pin::new_unchecked(&mut this.input) }.poll_next(cx)) {
+ match ready!(this.input.as_mut().poll_next(cx)) {
Some(Err(err)) => return Poll::Ready(Some(Err(err))),
None => {
if let Some(last) = this.buffer.take() {
match last {
None => {
- this.buffer = Some(MergedChunkInfo::Known(list));
+ *this.buffer = Some(MergedChunkInfo::Known(list));
// continue
}
Some(MergedChunkInfo::Known(mut last_list)) => {
last_list.extend_from_slice(&list);
let len = last_list.len();
- this.buffer = Some(MergedChunkInfo::Known(last_list));
+ *this.buffer = Some(MergedChunkInfo::Known(last_list));
if len >= 64 {
return Poll::Ready(this.buffer.take().map(Ok));
// continue
}
Some(MergedChunkInfo::New(_)) => {
- this.buffer = Some(MergedChunkInfo::Known(list));
+ *this.buffer = Some(MergedChunkInfo::Known(list));
return Poll::Ready(last.map(Ok));
}
}
MergedChunkInfo::New(chunk_info) => {
let new = MergedChunkInfo::New(chunk_info);
if let Some(last) = this.buffer.take() {
- this.buffer = Some(new);
+ *this.buffer = Some(new);
return Poll::Ready(Some(Ok(last)));
} else {
return Poll::Ready(Some(Ok(new)));