//!
//! This is the implementation used by both the synchronous and async pxar wrappers.
+#![deny(missing_docs)]
+
use std::io;
use std::mem::{forget, size_of, size_of_val, take};
use std::os::unix::ffi::OsStrExt;
pub struct LinkOffset(u64);
impl LinkOffset {
+ /// Get the raw byte offset of this link.
#[inline]
pub fn raw(self) -> u64 {
self.0
/// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
/// wrapper.
pub trait SeqWrite {
+ /// Attempt to perform a sequential write to the file. On success, the number of written bytes
+ /// is returned as `Poll::Ready(Ok(bytes))`.
+ ///
+ /// If writing is not yet possible, `Poll::Pending` is returned and the current task will be
+ /// notified via the `cx.waker()` when writing becomes possible.
fn poll_seq_write(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<io::Result<usize>>;
+ /// Attempt to flush the output, ensuring that all buffered data reaches the destination.
+ ///
+ /// On success, returns `Poll::Ready(Ok(()))`.
+ ///
+ /// If flushing cannot complete immediately, `Poll::Pending` is returned and the current task
+ /// will be notified via `cx.waker()` when progress can be made.
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
}
Ok(put)
}
+/// awaitable version of 'poll_flush'.
+async fn flush<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<()> {
+ poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_flush(cx) }).await
+}
+
/// Write the entire contents of a buffer, handling short writes.
async fn seq_write_all<T: SeqWrite + ?Sized>(
output: &mut T,
/// Offset of this directory's ENTRY.
entry_offset: u64,
+ #[allow(dead_code)]
/// Offset to this directory's first FILENAME.
files_offset: u64,
Borrowed(&'a mut T),
}
+impl<'a, T> EncoderOutput<'a, T> {
+ #[inline]
+ fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
+ where
+ 'a: 's,
+ {
+ EncoderOutput::Borrowed(self.as_mut())
+ }
+}
+
impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
fn as_mut(&mut self) -> &mut T {
match self {
- EncoderOutput::Owned(ref mut o) => o,
+ EncoderOutput::Owned(o) => o,
EncoderOutput::Borrowed(b) => b,
}
}
}
impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
- pub(crate) async fn new(output: EncoderOutput<'a, T>, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
+ pub async fn new(
+ output: EncoderOutput<'a, T>,
+ metadata: &Metadata,
+ ) -> io::Result<EncoderImpl<'a, T>> {
if !metadata.is_dir() {
io_bail!("directory metadata must contain the directory mode flag");
}
state: EncoderState::default(),
parent: None,
finished: false,
- file_copy_buffer: Arc::new(Mutex::new(crate::util::vec_new(1024 * 1024))),
+ file_copy_buffer: Arc::new(Mutex::new(unsafe {
+ crate::util::vec_new_uninitialized(1024 * 1024)
+ })),
};
this.encode_metadata(metadata).await?;
let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
header.check_header_size()?;
- seq_write_struct(
- self.output.as_mut(),
- header,
- &mut self.state.write_position,
- )
- .await?;
+ seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
let payload_data_offset = self.position();
})
}
+ fn take_file_copy_buffer(&self) -> Vec<u8> {
+ let buf: Vec<_> = take(
+ &mut self
+ .file_copy_buffer
+ .lock()
+ .expect("failed to lock temporary buffer mutex"),
+ );
+ if buf.len() < 1024 * 1024 {
+ drop(buf);
+ unsafe { crate::util::vec_new_uninitialized(1024 * 1024) }
+ } else {
+ buf
+ }
+ }
+
+ fn put_file_copy_buffer(&self, buf: Vec<u8>) {
+ let mut lock = self
+ .file_copy_buffer
+ .lock()
+ .expect("failed to lock temporary buffer mutex");
+ if lock.len() < buf.len() {
+ *lock = buf;
+ }
+ }
+
/// Return a file offset usable with `add_hardlink`.
pub async fn add_file(
&mut self,
file_size: u64,
content: &mut dyn SeqRead,
) -> io::Result<LinkOffset> {
- let buf = Arc::clone(&self.file_copy_buffer);
+ let mut buf = self.take_file_copy_buffer();
let mut file = self.create_file(metadata, file_name, file_size).await?;
- let mut buf = buf.lock().expect("failed to lock temporary buffer mutex");
loop {
let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
if got == 0 {
file.write_all(&buf[..got]).await?;
}
}
- Ok(file.file_offset())
+ let offset = file.file_offset();
+ drop(file);
+ self.put_file_copy_buffer(buf);
+ Ok(offset)
}
/// Return a file offset usable with `add_hardlink`.
self.encode_filename(file_name).await?;
let entry_offset = self.position();
- self.encode_metadata(&metadata).await?;
+ self.encode_metadata(metadata).await?;
let files_offset = self.position();
Ok(EncoderImpl {
// always forward as Borrowed(), to avoid stacking references on nested calls
- output: self.output.as_mut().into(),
+ output: self.output.to_borrowed_mut(),
state: EncoderState {
entry_offset,
files_offset,
) -> io::Result<()> {
self.encode_filename(file_name).await?;
if let Some(metadata) = metadata {
- self.encode_metadata(&metadata).await?;
+ self.encode_metadata(metadata).await?;
}
Ok(())
}
)
.await?;
+ if let EncoderOutput::Owned(output) = &mut self.output {
+ flush(output).await?;
+ }
+
// done up here because of the self-borrow and to propagate
let end_offset = self.position();
tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
let mut bst = Vec::with_capacity(tail.len() + 1);
+ #[allow(clippy::uninit_vec)]
unsafe {
bst.set_len(tail.len());
}
}
/// Writer for a file object in a directory.
-pub struct FileImpl<'a, S: SeqWrite> {
+pub(crate) struct FileImpl<'a, S: SeqWrite> {
output: &'a mut S,
/// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
/// Poll flush interface to more easily connect to tokio/futures.
#[cfg(feature = "tokio-io")]
pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
- unsafe {
- self.map_unchecked_mut(|this| this.output)
- .poll_flush(cx)
- }
+ unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
}
/// Poll close/shutdown interface to more easily connect to tokio/futures.
/// provided by our encoder.
#[cfg(feature = "tokio-io")]
pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
- unsafe {
- self.map_unchecked_mut(|this| this.output)
- .poll_flush(cx)
- }
+ unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
}
/// Write file data for the current file entry in a pxar archive.