]> git.proxmox.com Git - pxar.git/commitdiff
add AsyncWrite interface to encoder::FileImpl
authorWolfgang Bumiller <w.bumiller@proxmox.com>
Tue, 10 Mar 2020 11:58:59 +0000 (12:58 +0100)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Wed, 18 Mar 2020 14:08:56 +0000 (15:08 +0100)
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
src/encoder.rs
src/encoder/sync.rs

index 3ade5685481490731af9bda035f5263554efc81c..9474e7142888b87c2b00996d3715fac1bd841ac0 100644 (file)
@@ -34,6 +34,16 @@ pub trait SeqWrite {
         buf: &[u8],
     ) -> Poll<io::Result<usize>>;
 
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        cx: &mut Context,
+    ) -> Poll<io::Result<()>>;
+
+    fn poll_close(
+        self: Pin<&mut Self>,
+        cx: &mut Context,
+    ) -> Poll<io::Result<()>>;
+
     /// While writing to a pxar archive we need to remember how much dat we've written to track some
     /// offsets. Particularly items like the goodbye table need to be able to compute offsets to
     /// further back in the archive.
@@ -63,6 +73,14 @@ impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
         }
     }
 
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
+        unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
+    }
+
+    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
+        unsafe { self.map_unchecked_mut(|this| &mut **this).poll_close(cx) }
+    }
+
     fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
         unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) }
     }
@@ -88,6 +106,16 @@ impl PxarStruct for format::QuotaProjectId {
 }
 
 impl<'a> dyn SeqWrite + 'a {
+    /// awaitable version of `poll_flush`.
+    async fn flush(&mut self) -> io::Result<()> {
+        poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *self).poll_flush(cx) }).await
+    }
+
+    /// awaitable version of `poll_close`.
+    async fn close(&mut self) -> io::Result<()> {
+        poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *self).poll_close(cx) }).await
+    }
+
     /// awaitable version of `poll_position`.
     async fn position(&mut self) -> io::Result<u64> {
         poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *self).poll_position(cx) }).await
@@ -358,7 +386,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         htype: u64,
         entry_data: &[u8],
     ) -> io::Result<()> {
-        self.check();
+        self.check()?;
 
         let file_offset = (&mut self.output as &mut dyn SeqWrite).position().await?;
 
@@ -622,6 +650,41 @@ impl<'a> FileImpl<'a> {
         }
     }
 
+    /// Poll write interface to more easily connect to tokio/futures.
+    #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
+    pub fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context,
+        data: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        let this = self.get_mut();
+        this.check_remaining(data.len())?;
+        let output = unsafe { Pin::new_unchecked(&mut *this.output) };
+        match output.poll_seq_write(cx, data) {
+            Poll::Ready(Ok(put)) => {
+                this.remaining_size -= put as u64;
+                Poll::Ready(Ok(put))
+            }
+            other => other,
+        }
+    }
+
+    /// Poll flush interface to more easily connect to tokio/futures.
+    #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
+    pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
+        unsafe {
+            self.map_unchecked_mut(|this| &mut this.output).poll_flush(cx)
+        }
+    }
+
+    /// Poll close/shutdown interface to more easily connect to tokio/futures.
+    #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
+    pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
+        unsafe {
+            self.map_unchecked_mut(|this| &mut this.output).poll_close(cx)
+        }
+    }
+
     /// Write file data for the current file entry in a pxar archive.
     ///
     /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
@@ -642,3 +705,41 @@ impl<'a> FileImpl<'a> {
         Ok(())
     }
 }
+
+#[cfg(feature = "tokio-io")]
+impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        FileImpl::poll_write(self, cx, buf)
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
+        FileImpl::poll_flush(self, cx)
+    }
+
+    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
+        FileImpl::poll_close(self, cx)
+    }
+}
+
+#[cfg(feature = "futures-io")]
+impl<'a> futures::io::AsyncWrite for FileImpl<'a> {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        FileImpl::poll_write(self, cx, buf)
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
+        FileImpl::poll_flush(self, cx)
+    }
+
+    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
+        FileImpl::poll_close(self, cx)
+    }
+}
index e90ddb13c0cb9dd508adb0807a9d4bf0fd4a1ee3..7374c0620bdc2958882f96b8458051b19520b2f8 100644 (file)
@@ -45,7 +45,7 @@ impl<'a> Encoder<'a, StandardWriter<std::fs::File>> {
 }
 
 impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
-    /// Create a *blocking* encoder from an output implementing our internal write interface.
+    /// Create a *blocking* encoder for an output implementing our internal write interface.
     ///
     /// Note that the `output`'s `SeqWrite` implementation must always return `Poll::Ready` and is
     /// not allowed to use the `Waker`, as this will cause a `panic!`.
@@ -130,13 +130,23 @@ impl<'a> io::Write for File<'a> {
 
 /// Pxar encoder write adapter for `std::io::Write`.
 pub struct StandardWriter<T> {
-    inner: T,
+    inner: Option<T>,
     position: u64,
 }
 
 impl<T: io::Write> StandardWriter<T> {
     pub fn new(inner: T) -> Self {
-        Self { inner, position: 0 }
+        Self { inner: Some(inner), position: 0 }
+    }
+
+    fn inner(&mut self) -> io::Result<&mut T> {
+        self.inner
+            .as_mut()
+            .ok_or_else(|| io_format_err!("write after close"))
+    }
+
+    fn pin_to_inner(self: Pin<&mut Self>) -> io::Result<&mut T> {
+        unsafe { self.get_unchecked_mut() }.inner()
     }
 }
 
@@ -147,7 +157,7 @@ impl<T: io::Write> SeqWrite for StandardWriter<T> {
         buf: &[u8],
     ) -> Poll<io::Result<usize>> {
         let this = unsafe { self.get_unchecked_mut() };
-        Poll::Ready(match this.inner.write(buf) {
+        Poll::Ready(match this.inner()?.write(buf) {
             Ok(got) => {
                 this.position += got as u64;
                 Ok(got)
@@ -159,4 +169,16 @@ impl<T: io::Write> SeqWrite for StandardWriter<T> {
     fn poll_position(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<u64>> {
         Poll::Ready(Ok(self.as_ref().position))
     }
+
+    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
+        Poll::Ready(self.pin_to_inner().and_then(|inner| inner.flush()))
+    }
+
+    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
+        let this = unsafe { self.get_unchecked_mut() };
+        Poll::Ready(match this.inner.as_mut() {
+            None => Ok(()),
+            Some(inner) => inner.flush(),
+        })
+    }
 }