//!
//! This is the implementation used by both the synchronous and async pxar wrappers.
+#![deny(missing_docs)]
+
use std::io;
use std::mem::{forget, size_of, size_of_val, take};
use std::os::unix::ffi::OsStrExt;
use std::path::Path;
use std::pin::Pin;
+use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use endian_trait::Endian;
#[doc(inline)]
pub use sync::Encoder;
+/// File reference used to create hard links.
+#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
+pub struct LinkOffset(u64);
+
+impl LinkOffset {
+ /// Get the raw byte offset of this link.
+ #[inline]
+ pub fn raw(self) -> u64 {
+ self.0
+ }
+}
+
/// Sequential write interface used by the encoder's state machine.
///
/// This is our internal writer trait which is available for `std::io::Write` types in the
/// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
/// wrapper.
pub trait SeqWrite {
+ /// Attempt to perform a sequential write to the file. On success, the number of written bytes
+ /// is returned as `Poll::Ready(Ok(bytes))`.
+ ///
+ /// If writing is not yet possible, `Poll::Pending` is returned and the current task will be
+ /// notified via the `cx.waker()` when writing becomes possible.
fn poll_seq_write(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<io::Result<usize>>;
+ /// Attempt to flush the output, ensuring that all buffered data reaches the destination.
+ ///
+ /// On success, returns `Poll::Ready(Ok(()))`.
+ ///
+ /// If flushing cannot complete immediately, `Poll::Pending` is returned and the current task
+ /// will be notified via `cx.waker()` when progress can be made.
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
-
- fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
-
- /// While writing to a pxar archive we need to remember how much dat we've written to track some
- /// offsets. Particularly items like the goodbye table need to be able to compute offsets to
- /// further back in the archive.
- fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>>;
-
- /// To avoid recursively borrowing each time we nest into a subdirectory we add this helper.
- /// Otherwise starting a subdirectory will get a trait object pointing to `T`, nesting another
- /// subdirectory in that would have a trait object pointing to the trait object, and so on.
- fn as_trait_object(&mut self) -> &mut dyn SeqWrite
- where
- Self: Sized,
- {
- self as &mut dyn SeqWrite
- }
}
/// Allow using trait objects for generics taking a `SeqWrite`.
-impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
+impl<S> SeqWrite for &mut S
+where
+ S: SeqWrite + ?Sized,
+{
fn poll_seq_write(
self: Pin<&mut Self>,
cx: &mut Context,
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
}
-
- fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
- unsafe { self.map_unchecked_mut(|this| &mut **this).poll_close(cx) }
- }
-
- fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
- unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) }
- }
-
- fn as_trait_object(&mut self) -> &mut dyn SeqWrite
- where
- Self: Sized,
- {
- &mut **self
- }
}
-/// awaitable version of `poll_position`.
-async fn seq_write_position<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<u64> {
- poll_fn(move |cx| unsafe { Pin::new_unchecked(&mut *output).poll_position(cx) }).await
+/// awaitable verison of `poll_seq_write`.
+async fn seq_write<T: SeqWrite + ?Sized>(
+ output: &mut T,
+ buf: &[u8],
+ position: &mut u64,
+) -> io::Result<usize> {
+ let put =
+ poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await?;
+ *position += put as u64;
+ Ok(put)
}
-/// awaitable verison of `poll_seq_write`.
-async fn seq_write<T: SeqWrite + ?Sized>(output: &mut T, buf: &[u8]) -> io::Result<usize> {
- poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await
+/// awaitable version of 'poll_flush'.
+async fn flush<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<()> {
+ poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_flush(cx) }).await
}
/// Write the entire contents of a buffer, handling short writes.
-async fn seq_write_all<T: SeqWrite + ?Sized>(output: &mut T, mut buf: &[u8]) -> io::Result<()> {
+async fn seq_write_all<T: SeqWrite + ?Sized>(
+ output: &mut T,
+ mut buf: &[u8],
+ position: &mut u64,
+) -> io::Result<()> {
while !buf.is_empty() {
- let got = seq_write(&mut *output, buf).await?;
+ let got = seq_write(&mut *output, buf, &mut *position).await?;
buf = &buf[got..];
}
Ok(())
}
/// Write an endian-swappable struct.
-async fn seq_write_struct<E: Endian, T>(output: &mut T, data: E) -> io::Result<()>
+async fn seq_write_struct<E: Endian, T>(
+ output: &mut T,
+ data: E,
+ position: &mut u64,
+) -> io::Result<()>
where
T: SeqWrite + ?Sized,
{
let data = data.to_le();
- seq_write_all(output, unsafe {
- std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
- })
- .await
+ let buf =
+ unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
+ seq_write_all(output, buf, position).await
}
/// Write a pxar entry.
-async fn seq_write_pxar_entry<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
+async fn seq_write_pxar_entry<T>(
+ output: &mut T,
+ htype: u64,
+ data: &[u8],
+ position: &mut u64,
+) -> io::Result<()>
where
T: SeqWrite + ?Sized,
{
- seq_write_struct(
- &mut *output,
- format::Header::with_content_size(htype, data.len() as u64),
- )
- .await?;
- seq_write_all(output, data).await
+ let header = format::Header::with_content_size(htype, data.len() as u64);
+ header.check_header_size()?;
+
+ seq_write_struct(&mut *output, header, &mut *position).await?;
+ seq_write_all(output, data, position).await
}
/// Write a pxar entry terminated by an additional zero which is not contained in the provided
/// data buffer.
-async fn seq_write_pxar_entry_zero<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
+async fn seq_write_pxar_entry_zero<T>(
+ output: &mut T,
+ htype: u64,
+ data: &[u8],
+ position: &mut u64,
+) -> io::Result<()>
where
T: SeqWrite + ?Sized,
{
- seq_write_struct(
- &mut *output,
- format::Header::with_content_size(htype, 1 + data.len() as u64),
- )
- .await?;
- seq_write_all(&mut *output, data).await?;
- seq_write_all(output, &[0u8]).await
+ let header = format::Header::with_content_size(htype, 1 + data.len() as u64);
+ header.check_header_size()?;
+
+ seq_write_struct(&mut *output, header, &mut *position).await?;
+ seq_write_all(&mut *output, data, &mut *position).await?;
+ seq_write_all(output, &[0u8], position).await
}
/// Write a pxar entry consiting of an endian-swappable struct.
-async fn seq_write_pxar_struct_entry<E, T>(output: &mut T, htype: u64, data: E) -> io::Result<()>
+async fn seq_write_pxar_struct_entry<E, T>(
+ output: &mut T,
+ htype: u64,
+ data: E,
+ position: &mut u64,
+) -> io::Result<()>
where
T: SeqWrite + ?Sized,
E: Endian,
{
let data = data.to_le();
- seq_write_pxar_entry(output, htype, unsafe {
- std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
- })
- .await
+ let buf =
+ unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
+ seq_write_pxar_entry(output, htype, buf, position).await
}
/// Error conditions caused by wrong usage of this crate.
/// Offset of this directory's ENTRY.
entry_offset: u64,
+ #[allow(dead_code)]
/// Offset to this directory's first FILENAME.
files_offset: u64,
/// If this is a subdirectory, this contains this directory's hash for the goodbye item.
file_hash: u64,
+
+ /// We need to keep track how much we have written to get offsets.
+ write_position: u64,
}
impl EncoderState {
}
}
+pub(crate) enum EncoderOutput<'a, T> {
+ Owned(T),
+ Borrowed(&'a mut T),
+}
+
+impl<'a, T> EncoderOutput<'a, T> {
+ #[inline]
+ fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
+ where
+ 'a: 's,
+ {
+ EncoderOutput::Borrowed(self.as_mut())
+ }
+}
+
+impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
+ fn as_mut(&mut self) -> &mut T {
+ match self {
+ EncoderOutput::Owned(o) => o,
+ EncoderOutput::Borrowed(b) => b,
+ }
+ }
+}
+
+impl<'a, T> std::convert::From<T> for EncoderOutput<'a, T> {
+ fn from(t: T) -> Self {
+ EncoderOutput::Owned(t)
+ }
+}
+
+impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
+ fn from(t: &'a mut T) -> Self {
+ EncoderOutput::Borrowed(t)
+ }
+}
+
/// The encoder state machine implementation for a directory.
///
/// We use `async fn` to implement the encoder state machine so that we can easily plug in both
/// synchronous or `async` I/O objects in as output.
pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
- output: T,
+ output: EncoderOutput<'a, T>,
state: EncoderState,
parent: Option<&'a mut EncoderState>,
finished: bool,
+
+ /// Since only the "current" entry can be actively writing files, we share the file copy
+ /// buffer.
+ file_copy_buffer: Arc<Mutex<Vec<u8>>>,
}
impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
}
impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
- pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
+ pub async fn new(
+ output: EncoderOutput<'a, T>,
+ metadata: &Metadata,
+ ) -> io::Result<EncoderImpl<'a, T>> {
if !metadata.is_dir() {
io_bail!("directory metadata must contain the directory mode flag");
}
state: EncoderState::default(),
parent: None,
finished: false,
+ file_copy_buffer: Arc::new(Mutex::new(unsafe {
+ crate::util::vec_new_uninitialized(1024 * 1024)
+ })),
};
this.encode_metadata(metadata).await?;
- this.state.files_offset = seq_write_position(&mut this.output).await?;
+ this.state.files_offset = this.position();
Ok(this)
}
metadata: &Metadata,
file_name: &Path,
file_size: u64,
- ) -> io::Result<FileImpl<'b>>
+ ) -> io::Result<FileImpl<'b, T>>
where
'a: 'b,
{
metadata: &Metadata,
file_name: &[u8],
file_size: u64,
- ) -> io::Result<FileImpl<'b>>
+ ) -> io::Result<FileImpl<'b, T>>
where
'a: 'b,
{
self.check()?;
- let file_offset = seq_write_position(&mut self.output).await?;
+ let file_offset = self.position();
self.start_file_do(Some(metadata), file_name).await?;
- seq_write_struct(
- &mut self.output,
- format::Header::with_content_size(format::PXAR_PAYLOAD, file_size),
- )
- .await?;
+ let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
+ header.check_header_size()?;
+
+ seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
- let payload_data_offset = seq_write_position(&mut self.output).await?;
+ let payload_data_offset = self.position();
let meta_size = payload_data_offset - file_offset;
Ok(FileImpl {
- output: &mut self.output,
+ output: self.output.as_mut(),
goodbye_item: GoodbyeItem {
hash: format::hash_filename(file_name),
offset: file_offset,
})
}
+ fn take_file_copy_buffer(&self) -> Vec<u8> {
+ let buf: Vec<_> = take(
+ &mut self
+ .file_copy_buffer
+ .lock()
+ .expect("failed to lock temporary buffer mutex"),
+ );
+ if buf.len() < 1024 * 1024 {
+ drop(buf);
+ unsafe { crate::util::vec_new_uninitialized(1024 * 1024) }
+ } else {
+ buf
+ }
+ }
+
+ fn put_file_copy_buffer(&self, buf: Vec<u8>) {
+ let mut lock = self
+ .file_copy_buffer
+ .lock()
+ .expect("failed to lock temporary buffer mutex");
+ if lock.len() < buf.len() {
+ *lock = buf;
+ }
+ }
+
+ /// Return a file offset usable with `add_hardlink`.
pub async fn add_file(
&mut self,
metadata: &Metadata,
file_name: &Path,
file_size: u64,
content: &mut dyn SeqRead,
- ) -> io::Result<()> {
+ ) -> io::Result<LinkOffset> {
+ let mut buf = self.take_file_copy_buffer();
let mut file = self.create_file(metadata, file_name, file_size).await?;
- let mut buf = crate::util::vec_new(4096);
loop {
- let got = decoder::seq_read(&mut *content, &mut buf).await?;
+ let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
if got == 0 {
break;
} else {
file.write_all(&buf[..got]).await?;
}
}
- Ok(())
+ let offset = file.file_offset();
+ drop(file);
+ self.put_file_copy_buffer(buf);
+ Ok(offset)
}
+ /// Return a file offset usable with `add_hardlink`.
pub async fn add_symlink(
&mut self,
metadata: &Metadata,
file_name: &Path,
target: &Path,
) -> io::Result<()> {
- self.add_file_entry(
- Some(metadata),
- file_name,
- Some((format::PXAR_SYMLINK, target.as_os_str().as_bytes())),
- )
- .await
+ let target = target.as_os_str().as_bytes();
+ let mut data = Vec::with_capacity(target.len() + 1);
+ data.extend(target);
+ data.push(0);
+ let _ofs: LinkOffset = self
+ .add_file_entry(
+ Some(metadata),
+ file_name,
+ Some((format::PXAR_SYMLINK, &data)),
+ )
+ .await?;
+ Ok(())
}
+ /// Return a file offset usable with `add_hardlink`.
pub async fn add_hardlink(
&mut self,
file_name: &Path,
target: &Path,
- offset: u64,
+ target_offset: LinkOffset,
) -> io::Result<()> {
- let hardlink = format::Hardlink {
- offset,
- data: target.as_os_str().as_bytes().to_vec(),
- };
- let hardlink = unsafe {
- std::slice::from_raw_parts(
- &hardlink as *const format::Hardlink as *const u8,
- size_of::<format::Hardlink>(),
- )
- };
- self.add_file_entry(
- None,
- file_name,
- Some((format::PXAR_HARDLINK, hardlink)),
- )
- .await
+ let current_offset = self.position();
+ if current_offset <= target_offset.0 {
+ io_bail!("invalid hardlink offset, can only point to prior files");
+ }
+
+ let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
+ let target_bytes = target.as_os_str().as_bytes();
+ let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len() + 1);
+ hardlink.extend(&offset_bytes);
+ hardlink.extend(target_bytes);
+ hardlink.push(0);
+ let _this_offset: LinkOffset = self
+ .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
+ .await?;
+ Ok(())
}
+ /// Return a file offset usable with `add_hardlink`.
pub async fn add_device(
&mut self,
metadata: &Metadata,
size_of::<format::Device>(),
)
};
- self.add_file_entry(
- Some(metadata),
- file_name,
- Some((format::PXAR_DEVICE, device)),
- )
- .await
+ let _ofs: LinkOffset = self
+ .add_file_entry(
+ Some(metadata),
+ file_name,
+ Some((format::PXAR_DEVICE, device)),
+ )
+ .await?;
+ Ok(())
}
+ /// Return a file offset usable with `add_hardlink`.
pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
if !metadata.is_fifo() {
io_bail!("entry added via add_device must be of type fifo in its metadata");
}
- self.add_file_entry(Some(metadata), file_name, None).await
+ let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
+ Ok(())
}
+ /// Return a file offset usable with `add_hardlink`.
pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
if !metadata.is_socket() {
io_bail!("entry added via add_device must be of type socket in its metadata");
}
- self.add_file_entry(Some(metadata), file_name, None).await
+ let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
+ Ok(())
}
+ /// Return a file offset usable with `add_hardlink`.
async fn add_file_entry(
&mut self,
metadata: Option<&Metadata>,
file_name: &Path,
entry_htype_data: Option<(u64, &[u8])>,
- ) -> io::Result<()> {
+ ) -> io::Result<LinkOffset> {
self.check()?;
- let file_offset = seq_write_position(&mut self.output).await?;
+ let file_offset = self.position();
let file_name = file_name.as_os_str().as_bytes();
self.start_file_do(metadata, file_name).await?;
if let Some((htype, entry_data)) = entry_htype_data {
- seq_write_pxar_entry_zero(&mut self.output, htype, entry_data).await?;
+ seq_write_pxar_entry(
+ self.output.as_mut(),
+ htype,
+ entry_data,
+ &mut self.state.write_position,
+ )
+ .await?;
}
- let end_offset = seq_write_position(&mut self.output).await?;
+ let end_offset = self.position();
self.state.items.push(GoodbyeItem {
hash: format::hash_filename(file_name),
size: end_offset - file_offset,
});
- Ok(())
+ Ok(LinkOffset(file_offset))
}
- /// Helper
#[inline]
- async fn position(&mut self) -> io::Result<u64> {
- seq_write_position(&mut self.output).await
+ fn position(&mut self) -> u64 {
+ self.state.write_position
}
- pub async fn create_directory<'b>(
- &'b mut self,
+ pub async fn create_directory(
+ &mut self,
file_name: &Path,
metadata: &Metadata,
- ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>>
- where
- 'a: 'b,
- {
+ ) -> io::Result<EncoderImpl<'_, T>> {
self.check()?;
if !metadata.is_dir() {
let file_name = file_name.as_os_str().as_bytes();
let file_hash = format::hash_filename(file_name);
- let file_offset = self.position().await?;
+ let file_offset = self.position();
self.encode_filename(file_name).await?;
- let entry_offset = self.position().await?;
- self.encode_metadata(&metadata).await?;
+ let entry_offset = self.position();
+ self.encode_metadata(metadata).await?;
+
+ let files_offset = self.position();
+
+ // the child will write to OUR state now:
+ let write_position = self.position();
- let files_offset = self.position().await?;
+ let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
Ok(EncoderImpl {
- output: self.output.as_trait_object(),
+ // always forward as Borrowed(), to avoid stacking references on nested calls
+ output: self.output.to_borrowed_mut(),
state: EncoderState {
entry_offset,
files_offset,
file_offset: Some(file_offset),
- file_hash: file_hash,
+ file_hash,
+ write_position,
..Default::default()
},
parent: Some(&mut self.state),
finished: false,
+ file_copy_buffer,
})
}
) -> io::Result<()> {
self.encode_filename(file_name).await?;
if let Some(metadata) = metadata {
- self.encode_metadata(&metadata).await?;
+ self.encode_metadata(metadata).await?;
}
Ok(())
}
async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
- seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ENTRY, metadata.stat.clone())
- .await?;
+ seq_write_pxar_struct_entry(
+ self.output.as_mut(),
+ format::PXAR_ENTRY,
+ metadata.stat.clone(),
+ &mut self.state.write_position,
+ )
+ .await?;
for xattr in &metadata.xattrs {
self.write_xattr(xattr).await?;
}
async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
- seq_write_pxar_entry(&mut self.output, format::PXAR_XATTR, &xattr.data).await
+ seq_write_pxar_entry(
+ self.output.as_mut(),
+ format::PXAR_XATTR,
+ &xattr.data,
+ &mut self.state.write_position,
+ )
+ .await
}
async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
for acl in &acl.users {
- seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_USER, acl.clone())
- .await?;
+ seq_write_pxar_struct_entry(
+ self.output.as_mut(),
+ format::PXAR_ACL_USER,
+ acl.clone(),
+ &mut self.state.write_position,
+ )
+ .await?;
}
for acl in &acl.groups {
- seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP, acl.clone())
- .await?;
+ seq_write_pxar_struct_entry(
+ self.output.as_mut(),
+ format::PXAR_ACL_GROUP,
+ acl.clone(),
+ &mut self.state.write_position,
+ )
+ .await?;
}
if let Some(acl) = &acl.group_obj {
- seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP_OBJ, acl.clone())
- .await?;
+ seq_write_pxar_struct_entry(
+ self.output.as_mut(),
+ format::PXAR_ACL_GROUP_OBJ,
+ acl.clone(),
+ &mut self.state.write_position,
+ )
+ .await?;
}
if let Some(acl) = &acl.default {
- seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_DEFAULT, acl.clone())
- .await?;
+ seq_write_pxar_struct_entry(
+ self.output.as_mut(),
+ format::PXAR_ACL_DEFAULT,
+ acl.clone(),
+ &mut self.state.write_position,
+ )
+ .await?;
}
for acl in &acl.default_users {
seq_write_pxar_struct_entry(
- &mut self.output,
+ self.output.as_mut(),
format::PXAR_ACL_DEFAULT_USER,
acl.clone(),
+ &mut self.state.write_position,
)
.await?;
}
for acl in &acl.default_groups {
seq_write_pxar_struct_entry(
- &mut self.output,
+ self.output.as_mut(),
format::PXAR_ACL_DEFAULT_GROUP,
acl.clone(),
+ &mut self.state.write_position,
)
.await?;
}
}
async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
- seq_write_pxar_entry(&mut self.output, format::PXAR_FCAPS, &fcaps.data).await
+ seq_write_pxar_entry(
+ self.output.as_mut(),
+ format::PXAR_FCAPS,
+ &fcaps.data,
+ &mut self.state.write_position,
+ )
+ .await
}
async fn write_quota_project_id(
quota_project_id: &format::QuotaProjectId,
) -> io::Result<()> {
seq_write_pxar_struct_entry(
- &mut self.output,
+ self.output.as_mut(),
format::PXAR_QUOTA_PROJID,
- quota_project_id.clone(),
+ *quota_project_id,
+ &mut self.state.write_position,
)
.await
}
async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
- seq_write_pxar_entry_zero(&mut self.output, format::PXAR_FILENAME, file_name).await
+ crate::util::validate_filename(file_name)?;
+ seq_write_pxar_entry_zero(
+ self.output.as_mut(),
+ format::PXAR_FILENAME,
+ file_name,
+ &mut self.state.write_position,
+ )
+ .await
}
pub async fn finish(mut self) -> io::Result<()> {
let tail_bytes = self.finish_goodbye_table().await?;
- seq_write_pxar_entry(&mut self.output, format::PXAR_GOODBYE, &tail_bytes).await?;
+ seq_write_pxar_entry(
+ self.output.as_mut(),
+ format::PXAR_GOODBYE,
+ &tail_bytes,
+ &mut self.state.write_position,
+ )
+ .await?;
+
+ if let EncoderOutput::Owned(output) = &mut self.output {
+ flush(output).await?;
+ }
+
+ // done up here because of the self-borrow and to propagate
+ let end_offset = self.position();
+
if let Some(parent) = &mut self.parent {
+ parent.write_position = end_offset;
+
let file_offset = self
.state
.file_offset
.expect("internal error: parent set but no file_offset?");
- let end_offset = seq_write_position(&mut self.output).await?;
-
parent.items.push(GoodbyeItem {
hash: self.state.file_hash,
offset: file_offset,
}
async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
- let goodbye_offset = seq_write_position(&mut self.output).await?;
+ let goodbye_offset = self.position();
// "take" out the tail (to not leave an array of endian-swapped structs in `self`)
let mut tail = take(&mut self.state.items);
tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
let mut bst = Vec::with_capacity(tail.len() + 1);
+ #[allow(clippy::uninit_vec)]
unsafe {
bst.set_len(tail.len());
}
}
/// Writer for a file object in a directory.
-pub struct FileImpl<'a> {
- output: &'a mut dyn SeqWrite,
+pub(crate) struct FileImpl<'a, S: SeqWrite> {
+ output: &'a mut S,
/// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
/// directly instead of on Drop of FileImpl?
parent: &'a mut EncoderState,
}
-impl<'a> Drop for FileImpl<'a> {
+impl<'a, S: SeqWrite> Drop for FileImpl<'a, S> {
fn drop(&mut self) {
if self.remaining_size != 0 {
self.parent.add_error(EncodeError::IncompleteFile);
}
}
-impl<'a> FileImpl<'a> {
+impl<'a, S: SeqWrite> FileImpl<'a, S> {
+ /// Get the file offset to be able to reference it with `add_hardlink`.
+ pub fn file_offset(&self) -> LinkOffset {
+ LinkOffset(self.goodbye_item.offset)
+ }
+
fn check_remaining(&self, size: usize) -> io::Result<()> {
if size as u64 > self.remaining_size {
io_bail!("attempted to write more than previously allocated");
}
/// Poll write interface to more easily connect to tokio/futures.
- #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
+ #[cfg(feature = "tokio-io")]
pub fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context,
match output.poll_seq_write(cx, data) {
Poll::Ready(Ok(put)) => {
this.remaining_size -= put as u64;
+ this.parent.write_position += put as u64;
Poll::Ready(Ok(put))
}
other => other,
}
/// Poll flush interface to more easily connect to tokio/futures.
- #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
+ #[cfg(feature = "tokio-io")]
pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
- unsafe {
- self.map_unchecked_mut(|this| &mut this.output)
- .poll_flush(cx)
- }
+ unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
}
/// Poll close/shutdown interface to more easily connect to tokio/futures.
- #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
+ ///
+ /// This just calls flush, though, since we're just a virtual writer writing to the file
+ /// provided by our encoder.
+ #[cfg(feature = "tokio-io")]
pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
- unsafe {
- self.map_unchecked_mut(|this| &mut this.output)
- .poll_close(cx)
- }
+ unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
}
/// Write file data for the current file entry in a pxar archive.
/// for convenience.
pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
self.check_remaining(data.len())?;
- let put = seq_write(&mut self.output, data).await?;
+ let put =
+ poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
+ .await?;
+ //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
self.remaining_size -= put as u64;
+ self.parent.write_position += put as u64;
Ok(put)
}
/// Completely write file data for the current file entry in a pxar archive.
pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
self.check_remaining(data.len())?;
- seq_write_all(&mut self.output, data).await?;
+ seq_write_all(self.output, data, &mut self.parent.write_position).await?;
self.remaining_size -= data.len() as u64;
Ok(())
}
}
#[cfg(feature = "tokio-io")]
-impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
+impl<'a, S: SeqWrite> tokio::io::AsyncWrite for FileImpl<'a, S> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
FileImpl::poll_write(self, cx, buf)
}
FileImpl::poll_close(self, cx)
}
}
-
-#[cfg(feature = "futures-io")]
-impl<'a> futures::io::AsyncWrite for FileImpl<'a> {
- fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
- FileImpl::poll_write(self, cx, buf)
- }
-
- fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
- FileImpl::poll_flush(self, cx)
- }
-
- fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
- FileImpl::poll_close(self, cx)
- }
-}