X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=src%2Fencoder%2Fmod.rs;h=d14b2d04765bce11378af2de2007205675f64635;hb=ddc08ebb8b6540764aeabb3fa72695b6adfc3b47;hp=1004efaf9aba9e9cb3658c2250d5e2e386530652;hpb=737f75cf97e9163875e04c8a641e47117915c0ad;p=pxar.git diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs index 1004efa..d14b2d0 100644 --- a/src/encoder/mod.rs +++ b/src/encoder/mod.rs @@ -2,6 +2,8 @@ //! //! This is the implementation used by both the synchronous and async pxar wrappers. +#![deny(missing_docs)] + use std::io; use std::mem::{forget, size_of, size_of_val, take}; use std::os::unix::ffi::OsStrExt; @@ -29,6 +31,7 @@ pub use sync::Encoder; pub struct LinkOffset(u64); impl LinkOffset { + /// Get the raw byte offset of this link. #[inline] pub fn raw(self) -> u64 { self.0 @@ -41,12 +44,23 @@ impl LinkOffset { /// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous /// wrapper. pub trait SeqWrite { + /// Attempt to perform a sequential write to the file. On success, the number of written bytes + /// is returned as `Poll::Ready(Ok(bytes))`. + /// + /// If writing is not yet possible, `Poll::Pending` is returned and the current task will be + /// notified via the `cx.waker()` when writing becomes possible. fn poll_seq_write( self: Pin<&mut Self>, cx: &mut Context, buf: &[u8], ) -> Poll>; + /// Attempt to flush the output, ensuring that all buffered data reaches the destination. + /// + /// On success, returns `Poll::Ready(Ok(()))`. + /// + /// If flushing cannot complete immediately, `Poll::Pending` is returned and the current task + /// will be notified via `cx.waker()` when progress can be made. fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll>; } @@ -83,6 +97,11 @@ async fn seq_write( Ok(put) } +/// awaitable version of 'poll_flush'. +async fn flush(output: &mut T) -> io::Result<()> { + poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_flush(cx) }).await +} + /// Write the entire contents of a buffer, handling short writes. async fn seq_write_all( output: &mut T, @@ -190,6 +209,7 @@ struct EncoderState { /// Offset of this directory's ENTRY. entry_offset: u64, + #[allow(dead_code)] /// Offset to this directory's first FILENAME. files_offset: u64, @@ -223,7 +243,7 @@ pub(crate) enum EncoderOutput<'a, T> { impl<'a, T> EncoderOutput<'a, T> { #[inline] - fn to_borrowed<'s>(&'s mut self) -> EncoderOutput<'s, T> + fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T> where 'a: 's, { @@ -295,7 +315,9 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { state: EncoderState::default(), parent: None, finished: false, - file_copy_buffer: Arc::new(Mutex::new(crate::util::vec_new(1024 * 1024))), + file_copy_buffer: Arc::new(Mutex::new(unsafe { + crate::util::vec_new_uninitialized(1024 * 1024) + })), }; this.encode_metadata(metadata).await?; @@ -360,6 +382,31 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { }) } + fn take_file_copy_buffer(&self) -> Vec { + let buf: Vec<_> = take( + &mut self + .file_copy_buffer + .lock() + .expect("failed to lock temporary buffer mutex"), + ); + if buf.len() < 1024 * 1024 { + drop(buf); + unsafe { crate::util::vec_new_uninitialized(1024 * 1024) } + } else { + buf + } + } + + fn put_file_copy_buffer(&self, buf: Vec) { + let mut lock = self + .file_copy_buffer + .lock() + .expect("failed to lock temporary buffer mutex"); + if lock.len() < buf.len() { + *lock = buf; + } + } + /// Return a file offset usable with `add_hardlink`. pub async fn add_file( &mut self, @@ -368,9 +415,8 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { file_size: u64, content: &mut dyn SeqRead, ) -> io::Result { - let buf = Arc::clone(&self.file_copy_buffer); + let mut buf = self.take_file_copy_buffer(); let mut file = self.create_file(metadata, file_name, file_size).await?; - let mut buf = buf.lock().expect("failed to lock temporary buffer mutex"); loop { let got = decoder::seq_read(&mut *content, &mut buf[..]).await?; if got == 0 { @@ -379,7 +425,10 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { file.write_all(&buf[..got]).await?; } } - Ok(file.file_offset()) + let offset = file.file_offset(); + drop(file); + self.put_file_copy_buffer(buf); + Ok(offset) } /// Return a file offset usable with `add_hardlink`. @@ -533,7 +582,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { self.encode_filename(file_name).await?; let entry_offset = self.position(); - self.encode_metadata(&metadata).await?; + self.encode_metadata(metadata).await?; let files_offset = self.position(); @@ -544,7 +593,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { Ok(EncoderImpl { // always forward as Borrowed(), to avoid stacking references on nested calls - output: self.output.to_borrowed(), + output: self.output.to_borrowed_mut(), state: EncoderState { entry_offset, files_offset, @@ -566,7 +615,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { ) -> io::Result<()> { self.encode_filename(file_name).await?; if let Some(metadata) = metadata { - self.encode_metadata(&metadata).await?; + self.encode_metadata(metadata).await?; } Ok(()) } @@ -715,6 +764,10 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { ) .await?; + if let EncoderOutput::Owned(output) = &mut self.output { + flush(output).await?; + } + // done up here because of the self-borrow and to propagate let end_offset = self.position(); @@ -748,6 +801,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash)); let mut bst = Vec::with_capacity(tail.len() + 1); + #[allow(clippy::uninit_vec)] unsafe { bst.set_len(tail.len()); } @@ -780,7 +834,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { } /// Writer for a file object in a directory. -pub struct FileImpl<'a, S: SeqWrite> { +pub(crate) struct FileImpl<'a, S: SeqWrite> { output: &'a mut S, /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it