//!
//! This is the implementation used by both the synchronous and async pxar wrappers.
+#![deny(missing_docs)]
+
use std::io;
use std::mem::{forget, size_of, size_of_val, take};
use std::os::unix::ffi::OsStrExt;
pub struct LinkOffset(u64);
impl LinkOffset {
+ /// Get the raw byte offset of this link.
#[inline]
pub fn raw(self) -> u64 {
self.0
/// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
/// wrapper.
pub trait SeqWrite {
+ /// Attempt to perform a sequential write to the file. On success, the number of written bytes
+ /// is returned as `Poll::Ready(Ok(bytes))`.
+ ///
+ /// If writing is not yet possible, `Poll::Pending` is returned and the current task will be
+ /// notified via the `cx.waker()` when writing becomes possible.
fn poll_seq_write(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<io::Result<usize>>;
+ /// Attempt to flush the output, ensuring that all buffered data reaches the destination.
+ ///
+ /// On success, returns `Poll::Ready(Ok(()))`.
+ ///
+ /// If flushing cannot complete immediately, `Poll::Pending` is returned and the current task
+ /// will be notified via `cx.waker()` when progress can be made.
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
}
state: EncoderState::default(),
parent: None,
finished: false,
- file_copy_buffer: Arc::new(Mutex::new(crate::util::vec_new(1024 * 1024))),
+ file_copy_buffer: Arc::new(Mutex::new(unsafe {
+ crate::util::vec_new_uninitialized(1024 * 1024)
+ })),
};
this.encode_metadata(metadata).await?;
})
}
+ fn take_file_copy_buffer(&self) -> Vec<u8> {
+ let buf: Vec<_> = take(
+ &mut self
+ .file_copy_buffer
+ .lock()
+ .expect("failed to lock temporary buffer mutex"),
+ );
+ if buf.len() < 1024 * 1024 {
+ drop(buf);
+ unsafe { crate::util::vec_new_uninitialized(1024 * 1024) }
+ } else {
+ buf
+ }
+ }
+
+ fn put_file_copy_buffer(&self, buf: Vec<u8>) {
+ let mut lock = self
+ .file_copy_buffer
+ .lock()
+ .expect("failed to lock temporary buffer mutex");
+ if lock.len() < buf.len() {
+ *lock = buf;
+ }
+ }
+
/// Return a file offset usable with `add_hardlink`.
pub async fn add_file(
&mut self,
file_size: u64,
content: &mut dyn SeqRead,
) -> io::Result<LinkOffset> {
- let buf = Arc::clone(&self.file_copy_buffer);
+ let mut buf = self.take_file_copy_buffer();
let mut file = self.create_file(metadata, file_name, file_size).await?;
- let mut buf = buf.lock().expect("failed to lock temporary buffer mutex");
loop {
let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
if got == 0 {
file.write_all(&buf[..got]).await?;
}
}
- Ok(file.file_offset())
+ let offset = file.file_offset();
+ drop(file);
+ self.put_file_copy_buffer(buf);
+ Ok(offset)
}
/// Return a file offset usable with `add_hardlink`.
self.encode_filename(file_name).await?;
let entry_offset = self.position();
- self.encode_metadata(&metadata).await?;
+ self.encode_metadata(metadata).await?;
let files_offset = self.position();
) -> io::Result<()> {
self.encode_filename(file_name).await?;
if let Some(metadata) = metadata {
- self.encode_metadata(&metadata).await?;
+ self.encode_metadata(metadata).await?;
}
Ok(())
}
tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
let mut bst = Vec::with_capacity(tail.len() + 1);
+ #[allow(clippy::uninit_vec)]
unsafe {
bst.set_len(tail.len());
}
}
/// Writer for a file object in a directory.
-pub struct FileImpl<'a, S: SeqWrite> {
+pub(crate) struct FileImpl<'a, S: SeqWrite> {
output: &'a mut S,
/// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it