]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/backup/index.rs
switch to external pxar and fuse crates
[proxmox-backup.git] / src / backup / index.rs
index e58636bf83c5712fb769ca4f71a3ffbae0e4f3b6..5f37717c1170ff6ad2bd2f171c4ebe6b07675427 100644 (file)
@@ -1,9 +1,24 @@
 use std::collections::HashMap;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
 
 use bytes::{Bytes, BytesMut};
-use failure::*;
+use anyhow::{format_err, Error};
 use futures::*;
 
+pub struct ChunkReadInfo {
+    pub range: Range<u64>,
+    pub digest: [u8; 32],
+}
+
+impl ChunkReadInfo {
+    #[inline]
+    pub fn size(&self) -> u64 {
+        self.range.end - self.range.start
+    }
+}
+
 /// Trait to get digest list from index files
 ///
 /// To allow easy iteration over all used chunks.
@@ -82,9 +97,9 @@ impl std::io::Read for DigestListEncoder {
                     break;
                 }
             }
-            return Ok(written);
+            Ok(written)
         } else {
-            return Ok(0);
+            Ok(0)
         }
     }
 }
@@ -93,55 +108,61 @@ impl std::io::Read for DigestListEncoder {
 ///
 /// The reader simply returns a birary stream of 32 byte digest values.
 
-pub struct DigestListDecoder<S> {
+pub struct DigestListDecoder<S: Unpin> {
     input: S,
     buffer: BytesMut,
 }
 
-impl <S> DigestListDecoder<S> {
-
+impl<S: Unpin> DigestListDecoder<S> {
     pub fn new(input: S) -> Self {
         Self { input, buffer: BytesMut::new() }
     }
 }
 
-impl <S> Stream for DigestListDecoder<S>
-    where S: Stream<Item=Bytes>,
-          S::Error: Into<Error>,
-{
-    type Item = [u8; 32];
-    type Error = Error;
+impl<S: Unpin> Unpin for DigestListDecoder<S> {}
 
-    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
-        loop {
+impl<S: Unpin, E> Stream for DigestListDecoder<S>
+where
+    S: Stream<Item=Result<Bytes, E>>,
+    E: Into<Error>,
+{
+    type Item = Result<[u8; 32], Error>;
 
-            if self.buffer.len() >= 32 {
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
 
-                let left = self.buffer.split_to(32);
+        loop {
+            if this.buffer.len() >= 32 {
+                let left = this.buffer.split_to(32);
 
                 let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit();
                 unsafe {
                     (*digest.as_mut_ptr()).copy_from_slice(&left[..]);
-                    return Ok(Async::Ready(Some(digest.assume_init())));
+                    return Poll::Ready(Some(Ok(digest.assume_init())));
                 }
             }
 
-            match self.input.poll() {
-                Err(err) => {
-                    return Err(err.into());
-                }
-                Ok(Async::NotReady) => {
-                    return Ok(Async::NotReady);
+            match Pin::new(&mut this.input).poll_next(cx) {
+                Poll::Pending => {
+                    return Poll::Pending;
                 }
-                Ok(Async::Ready(None)) => {
-                    let rest = self.buffer.len();
-                    if rest == 0 { return Ok(Async::Ready(None)); }
-                    return Err(format_err!("got small digest ({} != 32).", rest));
+                Poll::Ready(Some(Err(err))) => {
+                    return Poll::Ready(Some(Err(err.into())));
                 }
-                Ok(Async::Ready(Some(data))) => {
-                    self.buffer.extend_from_slice(&data);
+                Poll::Ready(Some(Ok(data))) => {
+                    this.buffer.extend_from_slice(&data);
                     // continue
                 }
+                Poll::Ready(None) => {
+                    let rest = this.buffer.len();
+                    if rest == 0 {
+                        return Poll::Ready(None);
+                    }
+                    return Poll::Ready(Some(Err(format_err!(
+                        "got small digest ({} != 32).",
+                        rest,
+                    ))));
+                }
             }
         }
     }