From 05356884d69b316434d1aafc6a8cb3ecf653221c Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Tue, 10 Mar 2020 12:58:59 +0100 Subject: [PATCH] add AsyncWrite interface to encoder::FileImpl Signed-off-by: Wolfgang Bumiller --- src/encoder.rs | 103 +++++++++++++++++++++++++++++++++++++++++++- src/encoder/sync.rs | 30 +++++++++++-- 2 files changed, 128 insertions(+), 5 deletions(-) diff --git a/src/encoder.rs b/src/encoder.rs index 3ade568..9474e71 100644 --- a/src/encoder.rs +++ b/src/encoder.rs @@ -34,6 +34,16 @@ pub trait SeqWrite { buf: &[u8], ) -> Poll>; + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll>; + + fn poll_close( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll>; + /// While writing to a pxar archive we need to remember how much dat we've written to track some /// offsets. Particularly items like the goodbye table need to be able to compute offsets to /// further back in the archive. @@ -63,6 +73,14 @@ impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) { } } + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + unsafe { self.map_unchecked_mut(|this| &mut **this).poll_close(cx) } + } + fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) } } @@ -88,6 +106,16 @@ impl PxarStruct for format::QuotaProjectId { } impl<'a> dyn SeqWrite + 'a { + /// awaitable version of `poll_flush`. + async fn flush(&mut self) -> io::Result<()> { + poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *self).poll_flush(cx) }).await + } + + /// awaitable version of `poll_close`. + async fn close(&mut self) -> io::Result<()> { + poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *self).poll_close(cx) }).await + } + /// awaitable version of `poll_position`. async fn position(&mut self) -> io::Result { poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *self).poll_position(cx) }).await @@ -358,7 +386,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { htype: u64, entry_data: &[u8], ) -> io::Result<()> { - self.check(); + self.check()?; let file_offset = (&mut self.output as &mut dyn SeqWrite).position().await?; @@ -622,6 +650,41 @@ impl<'a> FileImpl<'a> { } } + /// Poll write interface to more easily connect to tokio/futures. + #[cfg(any(feature = "tokio-io", feature = "futures-io"))] + pub fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + data: &[u8], + ) -> Poll> { + let this = self.get_mut(); + this.check_remaining(data.len())?; + let output = unsafe { Pin::new_unchecked(&mut *this.output) }; + match output.poll_seq_write(cx, data) { + Poll::Ready(Ok(put)) => { + this.remaining_size -= put as u64; + Poll::Ready(Ok(put)) + } + other => other, + } + } + + /// Poll flush interface to more easily connect to tokio/futures. + #[cfg(any(feature = "tokio-io", feature = "futures-io"))] + pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + unsafe { + self.map_unchecked_mut(|this| &mut this.output).poll_flush(cx) + } + } + + /// Poll close/shutdown interface to more easily connect to tokio/futures. + #[cfg(any(feature = "tokio-io", feature = "futures-io"))] + pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + unsafe { + self.map_unchecked_mut(|this| &mut this.output).poll_close(cx) + } + } + /// Write file data for the current file entry in a pxar archive. /// /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than @@ -642,3 +705,41 @@ impl<'a> FileImpl<'a> { Ok(()) } } + +#[cfg(feature = "tokio-io")] +impl<'a> tokio::io::AsyncWrite for FileImpl<'a> { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + FileImpl::poll_write(self, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + FileImpl::poll_flush(self, cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + FileImpl::poll_close(self, cx) + } +} + +#[cfg(feature = "futures-io")] +impl<'a> futures::io::AsyncWrite for FileImpl<'a> { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + FileImpl::poll_write(self, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + FileImpl::poll_flush(self, cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + FileImpl::poll_close(self, cx) + } +} diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs index e90ddb1..7374c06 100644 --- a/src/encoder/sync.rs +++ b/src/encoder/sync.rs @@ -45,7 +45,7 @@ impl<'a> Encoder<'a, StandardWriter> { } impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { - /// Create a *blocking* encoder from an output implementing our internal write interface. + /// Create a *blocking* encoder for an output implementing our internal write interface. /// /// Note that the `output`'s `SeqWrite` implementation must always return `Poll::Ready` and is /// not allowed to use the `Waker`, as this will cause a `panic!`. @@ -130,13 +130,23 @@ impl<'a> io::Write for File<'a> { /// Pxar encoder write adapter for `std::io::Write`. pub struct StandardWriter { - inner: T, + inner: Option, position: u64, } impl StandardWriter { pub fn new(inner: T) -> Self { - Self { inner, position: 0 } + Self { inner: Some(inner), position: 0 } + } + + fn inner(&mut self) -> io::Result<&mut T> { + self.inner + .as_mut() + .ok_or_else(|| io_format_err!("write after close")) + } + + fn pin_to_inner(self: Pin<&mut Self>) -> io::Result<&mut T> { + unsafe { self.get_unchecked_mut() }.inner() } } @@ -147,7 +157,7 @@ impl SeqWrite for StandardWriter { buf: &[u8], ) -> Poll> { let this = unsafe { self.get_unchecked_mut() }; - Poll::Ready(match this.inner.write(buf) { + Poll::Ready(match this.inner()?.write(buf) { Ok(got) => { this.position += got as u64; Ok(got) @@ -159,4 +169,16 @@ impl SeqWrite for StandardWriter { fn poll_position(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { Poll::Ready(Ok(self.as_ref().position)) } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + Poll::Ready(self.pin_to_inner().and_then(|inner| inner.flush())) + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + let this = unsafe { self.get_unchecked_mut() }; + Poll::Ready(match this.inner.as_mut() { + None => Ok(()), + Some(inner) => inner.flush(), + }) + } } -- 2.39.5