]> git.proxmox.com Git - pxar.git/blobdiff - src/encoder/mod.rs
don't hold temp buffer mutex across await point
[pxar.git] / src / encoder / mod.rs
index f100939b1650c575344e8d42748d233238689589..d14b2d04765bce11378af2de2007205675f64635 100644 (file)
@@ -2,13 +2,14 @@
 //!
 //! This is the implementation used by both the synchronous and async pxar wrappers.
 
-use std::cell::RefCell;
+#![deny(missing_docs)]
+
 use std::io;
 use std::mem::{forget, size_of, size_of_val, take};
 use std::os::unix::ffi::OsStrExt;
 use std::path::Path;
 use std::pin::Pin;
-use std::rc::Rc;
+use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
 
 use endian_trait::Endian;
@@ -30,6 +31,7 @@ pub use sync::Encoder;
 pub struct LinkOffset(u64);
 
 impl LinkOffset {
+    /// Get the raw byte offset of this link.
     #[inline]
     pub fn raw(self) -> u64 {
         self.0
@@ -42,34 +44,31 @@ impl LinkOffset {
 /// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
 /// wrapper.
 pub trait SeqWrite {
+    /// Attempt to perform a sequential write to the file. On success, the number of written bytes
+    /// is returned as `Poll::Ready(Ok(bytes))`.
+    ///
+    /// If writing is not yet possible, `Poll::Pending` is returned and the current task will be
+    /// notified via the `cx.waker()` when writing becomes possible.
     fn poll_seq_write(
         self: Pin<&mut Self>,
         cx: &mut Context,
         buf: &[u8],
     ) -> Poll<io::Result<usize>>;
 
+    /// Attempt to flush the output, ensuring that all buffered data reaches the destination.
+    ///
+    /// On success, returns `Poll::Ready(Ok(()))`.
+    ///
+    /// If flushing cannot complete immediately, `Poll::Pending` is returned and the current task
+    /// will be notified via `cx.waker()` when progress can be made.
     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
-
-    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
-
-    /// While writing to a pxar archive we need to remember how much dat we've written to track some
-    /// offsets. Particularly items like the goodbye table need to be able to compute offsets to
-    /// further back in the archive.
-    fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>>;
-
-    /// To avoid recursively borrowing each time we nest into a subdirectory we add this helper.
-    /// Otherwise starting a subdirectory will get a trait object pointing to `T`, nesting another
-    /// subdirectory in that would have a trait object pointing to the trait object, and so on.
-    fn as_trait_object(&mut self) -> &mut dyn SeqWrite
-    where
-        Self: Sized,
-    {
-        self as &mut dyn SeqWrite
-    }
 }
 
 /// Allow using trait objects for generics taking a `SeqWrite`.
-impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
+impl<S> SeqWrite for &mut S
+where
+    S: SeqWrite + ?Sized,
+{
     fn poll_seq_write(
         self: Pin<&mut Self>,
         cx: &mut Context,
@@ -84,91 +83,104 @@ impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
         unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
     }
-
-    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        unsafe { self.map_unchecked_mut(|this| &mut **this).poll_close(cx) }
-    }
-
-    fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
-        unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) }
-    }
-
-    fn as_trait_object(&mut self) -> &mut dyn SeqWrite
-    where
-        Self: Sized,
-    {
-        &mut **self
-    }
 }
 
-/// awaitable version of `poll_position`.
-async fn seq_write_position<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<u64> {
-    poll_fn(move |cx| unsafe { Pin::new_unchecked(&mut *output).poll_position(cx) }).await
+/// awaitable verison of `poll_seq_write`.
+async fn seq_write<T: SeqWrite + ?Sized>(
+    output: &mut T,
+    buf: &[u8],
+    position: &mut u64,
+) -> io::Result<usize> {
+    let put =
+        poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await?;
+    *position += put as u64;
+    Ok(put)
 }
 
-/// awaitable verison of `poll_seq_write`.
-async fn seq_write<T: SeqWrite + ?Sized>(output: &mut T, buf: &[u8]) -> io::Result<usize> {
-    poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await
+/// awaitable version of 'poll_flush'.
+async fn flush<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<()> {
+    poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_flush(cx) }).await
 }
 
 /// Write the entire contents of a buffer, handling short writes.
-async fn seq_write_all<T: SeqWrite + ?Sized>(output: &mut T, mut buf: &[u8]) -> io::Result<()> {
+async fn seq_write_all<T: SeqWrite + ?Sized>(
+    output: &mut T,
+    mut buf: &[u8],
+    position: &mut u64,
+) -> io::Result<()> {
     while !buf.is_empty() {
-        let got = seq_write(&mut *output, buf).await?;
+        let got = seq_write(&mut *output, buf, &mut *position).await?;
         buf = &buf[got..];
     }
     Ok(())
 }
 
 /// Write an endian-swappable struct.
-async fn seq_write_struct<E: Endian, T>(output: &mut T, data: E) -> io::Result<()>
+async fn seq_write_struct<E: Endian, T>(
+    output: &mut T,
+    data: E,
+    position: &mut u64,
+) -> io::Result<()>
 where
     T: SeqWrite + ?Sized,
 {
     let data = data.to_le();
-    seq_write_all(output, unsafe {
-        std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
-    })
-    .await
+    let buf =
+        unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
+    seq_write_all(output, buf, position).await
 }
 
 /// Write a pxar entry.
-async fn seq_write_pxar_entry<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
+async fn seq_write_pxar_entry<T>(
+    output: &mut T,
+    htype: u64,
+    data: &[u8],
+    position: &mut u64,
+) -> io::Result<()>
 where
     T: SeqWrite + ?Sized,
 {
     let header = format::Header::with_content_size(htype, data.len() as u64);
     header.check_header_size()?;
 
-    seq_write_struct(&mut *output, header).await?;
-    seq_write_all(output, data).await
+    seq_write_struct(&mut *output, header, &mut *position).await?;
+    seq_write_all(output, data, position).await
 }
 
 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
 /// data buffer.
-async fn seq_write_pxar_entry_zero<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
+async fn seq_write_pxar_entry_zero<T>(
+    output: &mut T,
+    htype: u64,
+    data: &[u8],
+    position: &mut u64,
+) -> io::Result<()>
 where
     T: SeqWrite + ?Sized,
 {
     let header = format::Header::with_content_size(htype, 1 + data.len() as u64);
     header.check_header_size()?;
 
-    seq_write_struct(&mut *output, header).await?;
-    seq_write_all(&mut *output, data).await?;
-    seq_write_all(output, &[0u8]).await
+    seq_write_struct(&mut *output, header, &mut *position).await?;
+    seq_write_all(&mut *output, data, &mut *position).await?;
+    seq_write_all(output, &[0u8], position).await
 }
 
 /// Write a pxar entry consiting of an endian-swappable struct.
-async fn seq_write_pxar_struct_entry<E, T>(output: &mut T, htype: u64, data: E) -> io::Result<()>
+async fn seq_write_pxar_struct_entry<E, T>(
+    output: &mut T,
+    htype: u64,
+    data: E,
+    position: &mut u64,
+) -> io::Result<()>
 where
     T: SeqWrite + ?Sized,
     E: Endian,
 {
     let data = data.to_le();
-    seq_write_pxar_entry(output, htype, unsafe {
-        std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
-    })
-    .await
+    let buf =
+        unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
+    seq_write_pxar_entry(output, htype, buf, position).await
 }
 
 /// Error conditions caused by wrong usage of this crate.
@@ -197,6 +209,7 @@ struct EncoderState {
     /// Offset of this directory's ENTRY.
     entry_offset: u64,
 
+    #[allow(dead_code)]
     /// Offset to this directory's first FILENAME.
     files_offset: u64,
 
@@ -205,6 +218,9 @@ struct EncoderState {
 
     /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
     file_hash: u64,
+
+    /// We need to keep track how much we have written to get offsets.
+    write_position: u64,
 }
 
 impl EncoderState {
@@ -220,19 +236,55 @@ impl EncoderState {
     }
 }
 
+pub(crate) enum EncoderOutput<'a, T> {
+    Owned(T),
+    Borrowed(&'a mut T),
+}
+
+impl<'a, T> EncoderOutput<'a, T> {
+    #[inline]
+    fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
+    where
+        'a: 's,
+    {
+        EncoderOutput::Borrowed(self.as_mut())
+    }
+}
+
+impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
+    fn as_mut(&mut self) -> &mut T {
+        match self {
+            EncoderOutput::Owned(o) => o,
+            EncoderOutput::Borrowed(b) => b,
+        }
+    }
+}
+
+impl<'a, T> std::convert::From<T> for EncoderOutput<'a, T> {
+    fn from(t: T) -> Self {
+        EncoderOutput::Owned(t)
+    }
+}
+
+impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
+    fn from(t: &'a mut T) -> Self {
+        EncoderOutput::Borrowed(t)
+    }
+}
+
 /// The encoder state machine implementation for a directory.
 ///
 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
 /// synchronous or `async` I/O objects in as output.
 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
-    output: T,
+    output: EncoderOutput<'a, T>,
     state: EncoderState,
     parent: Option<&'a mut EncoderState>,
     finished: bool,
 
     /// Since only the "current" entry can be actively writing files, we share the file copy
     /// buffer.
-    file_copy_buffer: Rc<RefCell<Vec<u8>>>,
+    file_copy_buffer: Arc<Mutex<Vec<u8>>>,
 }
 
 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
@@ -251,7 +303,10 @@ impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
 }
 
 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
-    pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
+    pub async fn new(
+        output: EncoderOutput<'a, T>,
+        metadata: &Metadata,
+    ) -> io::Result<EncoderImpl<'a, T>> {
         if !metadata.is_dir() {
             io_bail!("directory metadata must contain the directory mode flag");
         }
@@ -260,11 +315,13 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
             state: EncoderState::default(),
             parent: None,
             finished: false,
-            file_copy_buffer: Rc::new(RefCell::new(crate::util::vec_new(1024 * 1024))),
+            file_copy_buffer: Arc::new(Mutex::new(unsafe {
+                crate::util::vec_new_uninitialized(1024 * 1024)
+            })),
         };
 
         this.encode_metadata(metadata).await?;
-        this.state.files_offset = seq_write_position(&mut this.output).await?;
+        this.state.files_offset = this.position();
 
         Ok(this)
     }
@@ -282,7 +339,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         metadata: &Metadata,
         file_name: &Path,
         file_size: u64,
-    ) -> io::Result<FileImpl<'b>>
+    ) -> io::Result<FileImpl<'b, T>>
     where
         'a: 'b,
     {
@@ -295,26 +352,26 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         metadata: &Metadata,
         file_name: &[u8],
         file_size: u64,
-    ) -> io::Result<FileImpl<'b>>
+    ) -> io::Result<FileImpl<'b, T>>
     where
         'a: 'b,
     {
         self.check()?;
 
-        let file_offset = seq_write_position(&mut self.output).await?;
+        let file_offset = self.position();
         self.start_file_do(Some(metadata), file_name).await?;
 
         let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
         header.check_header_size()?;
 
-        seq_write_struct(&mut self.output, header).await?;
+        seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
 
-        let payload_data_offset = seq_write_position(&mut self.output).await?;
+        let payload_data_offset = self.position();
 
         let meta_size = payload_data_offset - file_offset;
 
         Ok(FileImpl {
-            output: &mut self.output,
+            output: self.output.as_mut(),
             goodbye_item: GoodbyeItem {
                 hash: format::hash_filename(file_name),
                 offset: file_offset,
@@ -325,6 +382,31 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         })
     }
 
+    fn take_file_copy_buffer(&self) -> Vec<u8> {
+        let buf: Vec<_> = take(
+            &mut self
+                .file_copy_buffer
+                .lock()
+                .expect("failed to lock temporary buffer mutex"),
+        );
+        if buf.len() < 1024 * 1024 {
+            drop(buf);
+            unsafe { crate::util::vec_new_uninitialized(1024 * 1024) }
+        } else {
+            buf
+        }
+    }
+
+    fn put_file_copy_buffer(&self, buf: Vec<u8>) {
+        let mut lock = self
+            .file_copy_buffer
+            .lock()
+            .expect("failed to lock temporary buffer mutex");
+        if lock.len() < buf.len() {
+            *lock = buf;
+        }
+    }
+
     /// Return a file offset usable with `add_hardlink`.
     pub async fn add_file(
         &mut self,
@@ -333,9 +415,8 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         file_size: u64,
         content: &mut dyn SeqRead,
     ) -> io::Result<LinkOffset> {
-        let buf = Rc::clone(&self.file_copy_buffer);
+        let mut buf = self.take_file_copy_buffer();
         let mut file = self.create_file(metadata, file_name, file_size).await?;
-        let mut buf = buf.borrow_mut();
         loop {
             let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
             if got == 0 {
@@ -344,7 +425,10 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
                 file.write_all(&buf[..got]).await?;
             }
         }
-        Ok(file.file_offset())
+        let offset = file.file_offset();
+        drop(file);
+        self.put_file_copy_buffer(buf);
+        Ok(offset)
     }
 
     /// Return a file offset usable with `add_hardlink`.
@@ -375,7 +459,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         target: &Path,
         target_offset: LinkOffset,
     ) -> io::Result<()> {
-        let current_offset = seq_write_position(&mut self.output).await?;
+        let current_offset = self.position();
         if current_offset <= target_offset.0 {
             io_bail!("invalid hardlink offset, can only point to prior files");
         }
@@ -449,16 +533,22 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     ) -> io::Result<LinkOffset> {
         self.check()?;
 
-        let file_offset = seq_write_position(&mut self.output).await?;
+        let file_offset = self.position();
 
         let file_name = file_name.as_os_str().as_bytes();
 
         self.start_file_do(metadata, file_name).await?;
         if let Some((htype, entry_data)) = entry_htype_data {
-            seq_write_pxar_entry(&mut self.output, htype, entry_data).await?;
+            seq_write_pxar_entry(
+                self.output.as_mut(),
+                htype,
+                entry_data,
+                &mut self.state.write_position,
+            )
+            .await?;
         }
 
-        let end_offset = seq_write_position(&mut self.output).await?;
+        let end_offset = self.position();
 
         self.state.items.push(GoodbyeItem {
             hash: format::hash_filename(file_name),
@@ -469,20 +559,16 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         Ok(LinkOffset(file_offset))
     }
 
-    /// Helper
     #[inline]
-    async fn position(&mut self) -> io::Result<u64> {
-        seq_write_position(&mut self.output).await
+    fn position(&mut self) -> u64 {
+        self.state.write_position
     }
 
-    pub async fn create_directory<'b>(
-        &'b mut self,
+    pub async fn create_directory(
+        &mut self,
         file_name: &Path,
         metadata: &Metadata,
-    ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>>
-    where
-        'a: 'b,
-    {
+    ) -> io::Result<EncoderImpl<'_, T>> {
         self.check()?;
 
         if !metadata.is_dir() {
@@ -492,26 +578,33 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         let file_name = file_name.as_os_str().as_bytes();
         let file_hash = format::hash_filename(file_name);
 
-        let file_offset = self.position().await?;
+        let file_offset = self.position();
         self.encode_filename(file_name).await?;
 
-        let entry_offset = self.position().await?;
-        self.encode_metadata(&metadata).await?;
+        let entry_offset = self.position();
+        self.encode_metadata(metadata).await?;
 
-        let files_offset = self.position().await?;
+        let files_offset = self.position();
+
+        // the child will write to OUR state now:
+        let write_position = self.position();
+
+        let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
 
         Ok(EncoderImpl {
-            output: self.output.as_trait_object(),
+            // always forward as Borrowed(), to avoid stacking references on nested calls
+            output: self.output.to_borrowed_mut(),
             state: EncoderState {
                 entry_offset,
                 files_offset,
                 file_offset: Some(file_offset),
-                file_hash: file_hash,
+                file_hash,
+                write_position,
                 ..Default::default()
             },
             parent: Some(&mut self.state),
             finished: false,
-            file_copy_buffer: Rc::clone(&self.file_copy_buffer),
+            file_copy_buffer,
         })
     }
 
@@ -522,14 +615,19 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     ) -> io::Result<()> {
         self.encode_filename(file_name).await?;
         if let Some(metadata) = metadata {
-            self.encode_metadata(&metadata).await?;
+            self.encode_metadata(metadata).await?;
         }
         Ok(())
     }
 
     async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
-        seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ENTRY, metadata.stat.clone())
-            .await?;
+        seq_write_pxar_struct_entry(
+            self.output.as_mut(),
+            format::PXAR_ENTRY,
+            metadata.stat.clone(),
+            &mut self.state.write_position,
+        )
+        .await?;
 
         for xattr in &metadata.xattrs {
             self.write_xattr(xattr).await?;
@@ -549,44 +647,72 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
-        seq_write_pxar_entry(&mut self.output, format::PXAR_XATTR, &xattr.data).await
+        seq_write_pxar_entry(
+            self.output.as_mut(),
+            format::PXAR_XATTR,
+            &xattr.data,
+            &mut self.state.write_position,
+        )
+        .await
     }
 
     async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
         for acl in &acl.users {
-            seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_USER, acl.clone())
-                .await?;
+            seq_write_pxar_struct_entry(
+                self.output.as_mut(),
+                format::PXAR_ACL_USER,
+                acl.clone(),
+                &mut self.state.write_position,
+            )
+            .await?;
         }
 
         for acl in &acl.groups {
-            seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP, acl.clone())
-                .await?;
+            seq_write_pxar_struct_entry(
+                self.output.as_mut(),
+                format::PXAR_ACL_GROUP,
+                acl.clone(),
+                &mut self.state.write_position,
+            )
+            .await?;
         }
 
         if let Some(acl) = &acl.group_obj {
-            seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP_OBJ, acl.clone())
-                .await?;
+            seq_write_pxar_struct_entry(
+                self.output.as_mut(),
+                format::PXAR_ACL_GROUP_OBJ,
+                acl.clone(),
+                &mut self.state.write_position,
+            )
+            .await?;
         }
 
         if let Some(acl) = &acl.default {
-            seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_DEFAULT, acl.clone())
-                .await?;
+            seq_write_pxar_struct_entry(
+                self.output.as_mut(),
+                format::PXAR_ACL_DEFAULT,
+                acl.clone(),
+                &mut self.state.write_position,
+            )
+            .await?;
         }
 
         for acl in &acl.default_users {
             seq_write_pxar_struct_entry(
-                &mut self.output,
+                self.output.as_mut(),
                 format::PXAR_ACL_DEFAULT_USER,
                 acl.clone(),
+                &mut self.state.write_position,
             )
             .await?;
         }
 
         for acl in &acl.default_groups {
             seq_write_pxar_struct_entry(
-                &mut self.output,
+                self.output.as_mut(),
                 format::PXAR_ACL_DEFAULT_GROUP,
                 acl.clone(),
+                &mut self.state.write_position,
             )
             .await?;
         }
@@ -595,7 +721,13 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
-        seq_write_pxar_entry(&mut self.output, format::PXAR_FCAPS, &fcaps.data).await
+        seq_write_pxar_entry(
+            self.output.as_mut(),
+            format::PXAR_FCAPS,
+            &fcaps.data,
+            &mut self.state.write_position,
+        )
+        .await
     }
 
     async fn write_quota_project_id(
@@ -603,29 +735,50 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         quota_project_id: &format::QuotaProjectId,
     ) -> io::Result<()> {
         seq_write_pxar_struct_entry(
-            &mut self.output,
+            self.output.as_mut(),
             format::PXAR_QUOTA_PROJID,
-            quota_project_id.clone(),
+            *quota_project_id,
+            &mut self.state.write_position,
         )
         .await
     }
 
     async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
         crate::util::validate_filename(file_name)?;
-        seq_write_pxar_entry_zero(&mut self.output, format::PXAR_FILENAME, file_name).await
+        seq_write_pxar_entry_zero(
+            self.output.as_mut(),
+            format::PXAR_FILENAME,
+            file_name,
+            &mut self.state.write_position,
+        )
+        .await
     }
 
     pub async fn finish(mut self) -> io::Result<()> {
         let tail_bytes = self.finish_goodbye_table().await?;
-        seq_write_pxar_entry(&mut self.output, format::PXAR_GOODBYE, &tail_bytes).await?;
+        seq_write_pxar_entry(
+            self.output.as_mut(),
+            format::PXAR_GOODBYE,
+            &tail_bytes,
+            &mut self.state.write_position,
+        )
+        .await?;
+
+        if let EncoderOutput::Owned(output) = &mut self.output {
+            flush(output).await?;
+        }
+
+        // done up here because of the self-borrow and to propagate
+        let end_offset = self.position();
+
         if let Some(parent) = &mut self.parent {
+            parent.write_position = end_offset;
+
             let file_offset = self
                 .state
                 .file_offset
                 .expect("internal error: parent set but no file_offset?");
 
-            let end_offset = seq_write_position(&mut self.output).await?;
-
             parent.items.push(GoodbyeItem {
                 hash: self.state.file_hash,
                 offset: file_offset,
@@ -637,7 +790,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
-        let goodbye_offset = seq_write_position(&mut self.output).await?;
+        let goodbye_offset = self.position();
 
         // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
         let mut tail = take(&mut self.state.items);
@@ -648,6 +801,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
 
         let mut bst = Vec::with_capacity(tail.len() + 1);
+        #[allow(clippy::uninit_vec)]
         unsafe {
             bst.set_len(tail.len());
         }
@@ -680,8 +834,8 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 }
 
 /// Writer for a file object in a directory.
-pub struct FileImpl<'a> {
-    output: &'a mut dyn SeqWrite,
+pub(crate) struct FileImpl<'a, S: SeqWrite> {
+    output: &'a mut S,
 
     /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
     /// directly instead of on Drop of FileImpl?
@@ -696,7 +850,7 @@ pub struct FileImpl<'a> {
     parent: &'a mut EncoderState,
 }
 
-impl<'a> Drop for FileImpl<'a> {
+impl<'a, S: SeqWrite> Drop for FileImpl<'a, S> {
     fn drop(&mut self) {
         if self.remaining_size != 0 {
             self.parent.add_error(EncodeError::IncompleteFile);
@@ -706,7 +860,7 @@ impl<'a> Drop for FileImpl<'a> {
     }
 }
 
-impl<'a> FileImpl<'a> {
+impl<'a, S: SeqWrite> FileImpl<'a, S> {
     /// Get the file offset to be able to reference it with `add_hardlink`.
     pub fn file_offset(&self) -> LinkOffset {
         LinkOffset(self.goodbye_item.offset)
@@ -721,7 +875,7 @@ impl<'a> FileImpl<'a> {
     }
 
     /// Poll write interface to more easily connect to tokio/futures.
-    #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
+    #[cfg(feature = "tokio-io")]
     pub fn poll_write(
         self: Pin<&mut Self>,
         cx: &mut Context,
@@ -733,6 +887,7 @@ impl<'a> FileImpl<'a> {
         match output.poll_seq_write(cx, data) {
             Poll::Ready(Ok(put)) => {
                 this.remaining_size -= put as u64;
+                this.parent.write_position += put as u64;
                 Poll::Ready(Ok(put))
             }
             other => other,
@@ -740,21 +895,18 @@ impl<'a> FileImpl<'a> {
     }
 
     /// Poll flush interface to more easily connect to tokio/futures.
-    #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
+    #[cfg(feature = "tokio-io")]
     pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        unsafe {
-            self.map_unchecked_mut(|this| &mut this.output)
-                .poll_flush(cx)
-        }
+        unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
     }
 
     /// Poll close/shutdown interface to more easily connect to tokio/futures.
-    #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
+    ///
+    /// This just calls flush, though, since we're just a virtual writer writing to the file
+    /// provided by our encoder.
+    #[cfg(feature = "tokio-io")]
     pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        unsafe {
-            self.map_unchecked_mut(|this| &mut this.output)
-                .poll_close(cx)
-        }
+        unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
     }
 
     /// Write file data for the current file entry in a pxar archive.
@@ -764,22 +916,26 @@ impl<'a> FileImpl<'a> {
     /// for convenience.
     pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
         self.check_remaining(data.len())?;
-        let put = seq_write(&mut self.output, data).await?;
+        let put =
+            poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
+                .await?;
+        //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
         self.remaining_size -= put as u64;
+        self.parent.write_position += put as u64;
         Ok(put)
     }
 
     /// Completely write file data for the current file entry in a pxar archive.
     pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
         self.check_remaining(data.len())?;
-        seq_write_all(&mut self.output, data).await?;
+        seq_write_all(self.output, data, &mut self.parent.write_position).await?;
         self.remaining_size -= data.len() as u64;
         Ok(())
     }
 }
 
 #[cfg(feature = "tokio-io")]
-impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
+impl<'a, S: SeqWrite> tokio::io::AsyncWrite for FileImpl<'a, S> {
     fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
         FileImpl::poll_write(self, cx, buf)
     }
@@ -792,18 +948,3 @@ impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
         FileImpl::poll_close(self, cx)
     }
 }
-
-#[cfg(feature = "futures-io")]
-impl<'a> futures::io::AsyncWrite for FileImpl<'a> {
-    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
-        FileImpl::poll_write(self, cx, buf)
-    }
-
-    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        FileImpl::poll_flush(self, cx)
-    }
-
-    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        FileImpl::poll_close(self, cx)
-    }
-}