1 //! The `pxar` encoder state machine.
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
6 use std
::mem
::{forget, size_of, size_of_val, take}
;
7 use std
::os
::unix
::ffi
::OsStrExt
;
10 use std
::task
::{Context, Poll}
;
12 use endian_trait
::Endian
;
14 use crate::binary_tree_array
;
15 use crate::decoder
::{self, SeqRead}
;
16 use crate::format
::{self, GoodbyeItem}
;
17 use crate::poll_fn
::poll_fn
;
24 pub use sync
::Encoder
;
26 /// File reference used to create hard links.
///
/// This is a newtype around a `u64`. Within this file it is constructed from the output
/// position recorded right before an entry's FILENAME is written (see `add_file_entry`) and
/// from a file's goodbye item offset (see `FileImpl::file_offset`).
27 #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
28 pub struct LinkOffset(u64);
32 pub fn raw(self) -> u64 {
37 /// Sequential write interface used by the encoder's state machine.
39 /// This is our internal writer trait which is available for `std::io::Write` types in the
40 /// synchronous wrapper and for both `tokio` and `futures` `AsyncWrite` types in the asynchronous
47 ) -> Poll
<io
::Result
<usize>>;
49 fn poll_flush(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>>;
51 fn poll_close(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>>;
53 /// While writing to a pxar archive we need to remember how much data we've written to track some
54 /// offsets. Particularly items like the goodbye table need to be able to compute offsets to
55 /// further back in the archive.
56 fn poll_position(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<u64>>;
58 /// To avoid recursively borrowing each time we nest into a subdirectory we add this helper.
59 /// Otherwise starting a subdirectory will get a trait object pointing to `T`, nesting another
60 /// subdirectory in that would have a trait object pointing to the trait object, and so on.
61 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
65 self as &mut dyn SeqWrite
69 /// Allow using trait objects for generics taking a `SeqWrite`.
70 impl<'a
> SeqWrite
for &mut (dyn SeqWrite
+ 'a
) {
75 ) -> Poll
<io
::Result
<usize>> {
77 self.map_unchecked_mut(|this
| &mut **this
)
78 .poll_seq_write(cx
, buf
)
82 fn poll_flush(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>> {
83 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
86 fn poll_close(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>> {
87 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_close(cx) }
90 fn poll_position(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<u64>> {
91 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) }
94 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
102 /// awaitable version of `poll_position`.
103 async
fn seq_write_position
<T
: SeqWrite
+ ?Sized
>(output
: &mut T
) -> io
::Result
<u64> {
104 poll_fn(move |cx
| unsafe { Pin::new_unchecked(&mut *output).poll_position(cx) }
).await
107 /// awaitable version of `poll_seq_write`.
108 async
fn seq_write
<T
: SeqWrite
+ ?Sized
>(output
: &mut T
, buf
: &[u8]) -> io
::Result
<usize> {
109 poll_fn(|cx
| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }
).await
112 /// Write the entire contents of a buffer, handling short writes.
113 async
fn seq_write_all
<T
: SeqWrite
+ ?Sized
>(output
: &mut T
, mut buf
: &[u8]) -> io
::Result
<()> {
114 while !buf
.is_empty() {
115 let got
= seq_write(&mut *output
, buf
).await?
;
121 /// Write an endian-swappable struct.
122 async
fn seq_write_struct
<E
: Endian
, T
>(output
: &mut T
, data
: E
) -> io
::Result
<()>
124 T
: SeqWrite
+ ?Sized
,
126 let data
= data
.to_le();
127 seq_write_all(output
, unsafe {
128 std
::slice
::from_raw_parts(&data
as *const E
as *const u8, size_of_val(&data
))
133 /// Write a pxar entry.
134 async
fn seq_write_pxar_entry
<T
>(output
: &mut T
, htype
: u64, data
: &[u8]) -> io
::Result
<()>
136 T
: SeqWrite
+ ?Sized
,
140 format
::Header
::with_content_size(htype
, data
.len() as u64),
143 seq_write_all(output
, data
).await
146 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
148 async
fn seq_write_pxar_entry_zero
<T
>(output
: &mut T
, htype
: u64, data
: &[u8]) -> io
::Result
<()>
150 T
: SeqWrite
+ ?Sized
,
154 format
::Header
::with_content_size(htype
, 1 + data
.len() as u64),
157 seq_write_all(&mut *output
, data
).await?
;
158 seq_write_all(output
, &[0u8]).await
161 /// Write a pxar entry consisting of an endian-swappable struct.
162 async
fn seq_write_pxar_struct_entry
<E
, T
>(output
: &mut T
, htype
: u64, data
: E
) -> io
::Result
<()>
164 T
: SeqWrite
+ ?Sized
,
167 let data
= data
.to_le();
168 seq_write_pxar_entry(output
, htype
, unsafe {
169 std
::slice
::from_raw_parts(&data
as *const E
as *const u8, size_of_val(&data
))
174 /// Error conditions caused by wrong usage of this crate.
175 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
176 pub enum EncodeError
{
177 /// The user dropped a `File` without finishing writing all of its contents.
179 /// This is required because the payload length is written out at the beginning and decoding
180 /// requires the right amount of data to follow.
183 /// The user dropped a directory without finalizing it.
185 /// Finalizing is required to build the goodbye table at the end of a directory.
190 struct EncoderState
{
191 /// Goodbye items for this directory, excluding the tail.
192 items
: Vec
<GoodbyeItem
>,
194 /// User caused error conditions.
195 encode_error
: Option
<EncodeError
>,
197 /// Offset of this directory's ENTRY.
200 /// Offset to this directory's first FILENAME.
203 /// If this is a subdirectory, this points to this directory's FILENAME.
204 file_offset
: Option
<u64>,
206 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
211 fn merge_error(&mut self, error
: Option
<EncodeError
>) {
212 // one error is enough:
213 if self.encode_error
.is_none() {
214 self.encode_error
= error
;
218 fn add_error(&mut self, error
: EncodeError
) {
219 self.merge_error(Some(error
));
223 /// The encoder state machine implementation for a directory.
225 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
226 /// synchronous and `async` I/O objects as output.
227 pub(crate) struct EncoderImpl
<'a
, T
: SeqWrite
+ 'a
> {
230 parent
: Option
<&'a
mut EncoderState
>,
234 impl<'a
, T
: SeqWrite
+ 'a
> Drop
for EncoderImpl
<'a
, T
> {
236 if let Some(ref mut parent
) = self.parent
{
238 parent
.merge_error(self.state
.encode_error
);
240 parent
.add_error(EncodeError
::IncompleteDirectory
);
242 } else if !self.finished
{
243 // FIXME: how do we deal with this?
244 // eprintln!("Encoder dropped without finishing!");
249 impl<'a
, T
: SeqWrite
+ 'a
> EncoderImpl
<'a
, T
> {
250 pub async
fn new(output
: T
, metadata
: &Metadata
) -> io
::Result
<EncoderImpl
<'a
, T
>> {
251 if !metadata
.is_dir() {
252 io_bail
!("directory metadata must contain the directory mode flag");
254 let mut this
= Self {
256 state
: EncoderState
::default(),
261 this
.encode_metadata(metadata
).await?
;
262 this
.state
.files_offset
= seq_write_position(&mut this
.output
).await?
;
267 fn check(&self) -> io
::Result
<()> {
268 match self.state
.encode_error
{
269 Some(EncodeError
::IncompleteFile
) => io_bail
!("incomplete file"),
270 Some(EncodeError
::IncompleteDirectory
) => io_bail
!("directory not finalized"),
275 pub async
fn create_file
<'b
>(
280 ) -> io
::Result
<FileImpl
<'b
>>
284 self.create_file_do(metadata
, file_name
.as_os_str().as_bytes(), file_size
)
288 async
fn create_file_do
<'b
>(
293 ) -> io
::Result
<FileImpl
<'b
>>
299 let file_offset
= seq_write_position(&mut self.output
).await?
;
300 self.start_file_do(Some(metadata
), file_name
).await?
;
304 format
::Header
::with_content_size(format
::PXAR_PAYLOAD
, file_size
),
308 let payload_data_offset
= seq_write_position(&mut self.output
).await?
;
310 let meta_size
= payload_data_offset
- file_offset
;
313 output
: &mut self.output
,
314 goodbye_item
: GoodbyeItem
{
315 hash
: format
::hash_filename(file_name
),
317 size
: file_size
+ meta_size
,
319 remaining_size
: file_size
,
320 parent
: &mut self.state
,
324 /// Return a file offset usable with `add_hardlink`.
325 pub async
fn add_file(
330 content
: &mut dyn SeqRead
,
331 ) -> io
::Result
<LinkOffset
> {
332 let mut file
= self.create_file(metadata
, file_name
, file_size
).await?
;
333 let mut buf
= crate::util
::vec_new(4096);
335 let got
= decoder
::seq_read(&mut *content
, &mut buf
).await?
;
339 file
.write_all(&buf
[..got
]).await?
;
342 Ok(file
.file_offset())
345 /// Add a symbolic link entry. Returns `()` on success; the entry's `LinkOffset` is discarded.
346 pub async
fn add_symlink(
351 ) -> io
::Result
<()> {
352 let _ofs
: LinkOffset
= self.add_file_entry(
355 Some((format
::PXAR_SYMLINK
, target
.as_os_str().as_bytes())),
361 /// Add a hard link entry. Returns `()` on success; the entry's `LinkOffset` is discarded.
362 pub async
fn add_hardlink(
367 ) -> io
::Result
<()> {
368 let hardlink
= format
::Hardlink
{
370 data
: target
.as_os_str().as_bytes().to_vec(),
372 let hardlink
= unsafe {
373 std
::slice
::from_raw_parts(
374 &hardlink
as *const format
::Hardlink
as *const u8,
375 size_of
::<format
::Hardlink
>(),
378 let _this_offset
: LinkOffset
= self.add_file_entry(
381 Some((format
::PXAR_HARDLINK
, hardlink
)),
387 /// Add a device node entry. Returns `()` on success; the entry's `LinkOffset` is discarded.
388 pub async
fn add_device(
392 device
: format
::Device
,
393 ) -> io
::Result
<()> {
394 if !metadata
.is_device() {
395 io_bail
!("entry added via add_device must have a device mode in its metadata");
398 let device
= device
.to_le();
399 let device
= unsafe {
400 std
::slice
::from_raw_parts(
401 &device
as *const format
::Device
as *const u8,
402 size_of
::<format
::Device
>(),
405 let _ofs
: LinkOffset
= self.add_file_entry(
408 Some((format
::PXAR_DEVICE
, device
)),
414 /// Add a FIFO entry. Returns `()` on success; the entry's `LinkOffset` is discarded.
415 pub async
fn add_fifo(&mut self, metadata
: &Metadata
, file_name
: &Path
) -> io
::Result
<()> {
416 if !metadata
.is_fifo() {
417 io_bail
!("entry added via add_device must be of type fifo in its metadata");
420 let _ofs
: LinkOffset
= self.add_file_entry(Some(metadata
), file_name
, None
).await?
;
424 /// Add a socket entry. Returns `()` on success; the entry's `LinkOffset` is discarded.
425 pub async
fn add_socket(&mut self, metadata
: &Metadata
, file_name
: &Path
) -> io
::Result
<()> {
426 if !metadata
.is_socket() {
427 io_bail
!("entry added via add_device must be of type socket in its metadata");
430 let _ofs
: LinkOffset
= self.add_file_entry(Some(metadata
), file_name
, None
).await?
;
434 /// Return a file offset usable with `add_hardlink`.
435 async
fn add_file_entry(
437 metadata
: Option
<&Metadata
>,
439 entry_htype_data
: Option
<(u64, &[u8])>,
440 ) -> io
::Result
<LinkOffset
> {
443 let file_offset
= seq_write_position(&mut self.output
).await?
;
445 let file_name
= file_name
.as_os_str().as_bytes();
447 self.start_file_do(metadata
, file_name
).await?
;
448 if let Some((htype
, entry_data
)) = entry_htype_data
{
449 seq_write_pxar_entry_zero(&mut self.output
, htype
, entry_data
).await?
;
452 let end_offset
= seq_write_position(&mut self.output
).await?
;
454 self.state
.items
.push(GoodbyeItem
{
455 hash
: format
::hash_filename(file_name
),
457 size
: end_offset
- file_offset
,
460 Ok(LinkOffset(file_offset
))
465 async
fn position(&mut self) -> io
::Result
<u64> {
466 seq_write_position(&mut self.output
).await
469 pub async
fn create_directory
<'b
>(
473 ) -> io
::Result
<EncoderImpl
<'b
, &'b
mut dyn SeqWrite
>>
479 if !metadata
.is_dir() {
480 io_bail
!("directory metadata must contain the directory mode flag");
483 let file_name
= file_name
.as_os_str().as_bytes();
484 let file_hash
= format
::hash_filename(file_name
);
486 let file_offset
= self.position().await?
;
487 self.encode_filename(file_name
).await?
;
489 let entry_offset
= self.position().await?
;
490 self.encode_metadata(&metadata
).await?
;
492 let files_offset
= self.position().await?
;
495 output
: self.output
.as_trait_object(),
496 state
: EncoderState
{
499 file_offset
: Some(file_offset
),
500 file_hash
: file_hash
,
503 parent
: Some(&mut self.state
),
508 async
fn start_file_do(
510 metadata
: Option
<&Metadata
>,
512 ) -> io
::Result
<()> {
513 self.encode_filename(file_name
).await?
;
514 if let Some(metadata
) = metadata
{
515 self.encode_metadata(&metadata
).await?
;
520 async
fn encode_metadata(&mut self, metadata
: &Metadata
) -> io
::Result
<()> {
521 seq_write_pxar_struct_entry(&mut self.output
, format
::PXAR_ENTRY
, metadata
.stat
.clone())
524 for xattr
in &metadata
.xattrs
{
525 self.write_xattr(xattr
).await?
;
528 self.write_acls(&metadata
.acl
).await?
;
530 if let Some(fcaps
) = &metadata
.fcaps
{
531 self.write_file_capabilities(fcaps
).await?
;
534 if let Some(qpid
) = &metadata
.quota_project_id
{
535 self.write_quota_project_id(qpid
).await?
;
541 async
fn write_xattr(&mut self, xattr
: &format
::XAttr
) -> io
::Result
<()> {
542 seq_write_pxar_entry(&mut self.output
, format
::PXAR_XATTR
, &xattr
.data
).await
545 async
fn write_acls(&mut self, acl
: &crate::Acl
) -> io
::Result
<()> {
546 for acl
in &acl
.users
{
547 seq_write_pxar_struct_entry(&mut self.output
, format
::PXAR_ACL_USER
, acl
.clone())
551 for acl
in &acl
.groups
{
552 seq_write_pxar_struct_entry(&mut self.output
, format
::PXAR_ACL_GROUP
, acl
.clone())
556 if let Some(acl
) = &acl
.group_obj
{
557 seq_write_pxar_struct_entry(&mut self.output
, format
::PXAR_ACL_GROUP_OBJ
, acl
.clone())
561 if let Some(acl
) = &acl
.default {
562 seq_write_pxar_struct_entry(&mut self.output
, format
::PXAR_ACL_DEFAULT
, acl
.clone())
566 for acl
in &acl
.default_users
{
567 seq_write_pxar_struct_entry(
569 format
::PXAR_ACL_DEFAULT_USER
,
575 for acl
in &acl
.default_groups
{
576 seq_write_pxar_struct_entry(
578 format
::PXAR_ACL_DEFAULT_GROUP
,
587 async
fn write_file_capabilities(&mut self, fcaps
: &format
::FCaps
) -> io
::Result
<()> {
588 seq_write_pxar_entry(&mut self.output
, format
::PXAR_FCAPS
, &fcaps
.data
).await
591 async
fn write_quota_project_id(
593 quota_project_id
: &format
::QuotaProjectId
,
594 ) -> io
::Result
<()> {
595 seq_write_pxar_struct_entry(
597 format
::PXAR_QUOTA_PROJID
,
598 quota_project_id
.clone(),
603 async
fn encode_filename(&mut self, file_name
: &[u8]) -> io
::Result
<()> {
604 seq_write_pxar_entry_zero(&mut self.output
, format
::PXAR_FILENAME
, file_name
).await
607 pub async
fn finish(mut self) -> io
::Result
<()> {
608 let tail_bytes
= self.finish_goodbye_table().await?
;
609 seq_write_pxar_entry(&mut self.output
, format
::PXAR_GOODBYE
, &tail_bytes
).await?
;
610 if let Some(parent
) = &mut self.parent
{
611 let file_offset
= self
614 .expect("internal error: parent set but no file_offset?");
616 let end_offset
= seq_write_position(&mut self.output
).await?
;
618 parent
.items
.push(GoodbyeItem
{
619 hash
: self.state
.file_hash
,
621 size
: end_offset
- file_offset
,
624 self.finished
= true;
628 async
fn finish_goodbye_table(&mut self) -> io
::Result
<Vec
<u8>> {
629 let goodbye_offset
= seq_write_position(&mut self.output
).await?
;
631 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
632 let mut tail
= take(&mut self.state
.items
);
633 let tail_size
= (tail
.len() + 1) * size_of
::<GoodbyeItem
>();
634 let goodbye_size
= tail_size
as u64 + size_of
::<format
::Header
>() as u64;
636 // sort, then create a BST
637 tail
.sort_unstable_by(|a
, b
| a
.hash
.cmp(&b
.hash
));
639 let mut bst
= Vec
::with_capacity(tail
.len() + 1);
641 bst
.set_len(tail
.len());
643 binary_tree_array
::copy(tail
.len(), |src
, dest
| {
644 let mut item
= tail
[src
].clone();
645 // fix up the goodbye table offsets to be relative and with the right endianness
646 item
.offset
= goodbye_offset
- item
.offset
;
648 std
::ptr
::write(&mut bst
[dest
], item
.to_le());
655 hash
: format
::PXAR_GOODBYE_TAIL_MARKER
,
656 offset
: goodbye_offset
- self.state
.entry_offset
,
662 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
663 // the items make sense:
664 let data
= bst
.as_mut_ptr() as *mut u8;
665 let capacity
= bst
.capacity() * size_of
::<GoodbyeItem
>();
667 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) }
)
671 /// Writer for a file object in a directory.
672 pub struct FileImpl
<'a
> {
673 output
: &'a
mut dyn SeqWrite
,
675 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
676 /// directly instead of on Drop of FileImpl?
677 goodbye_item
: GoodbyeItem
,
679 /// While writing data to this file, this is how much space we still have left, this must reach
683 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
684 /// to, and where we insert our `GoodbyeItem`.
685 parent
: &'a
mut EncoderState
,
688 impl<'a
> Drop
for FileImpl
<'a
> {
690 if self.remaining_size
!= 0 {
691 self.parent
.add_error(EncodeError
::IncompleteFile
);
694 self.parent
.items
.push(self.goodbye_item
.clone());
698 impl<'a
> FileImpl
<'a
> {
699 /// Get the file offset to be able to reference it with `add_hardlink`.
700 pub fn file_offset(&self) -> LinkOffset
{
701 LinkOffset(self.goodbye_item
.offset
)
704 fn check_remaining(&self, size
: usize) -> io
::Result
<()> {
705 if size
as u64 > self.remaining_size
{
706 io_bail
!("attempted to write more than previously allocated");
712 /// Poll write interface to more easily connect to tokio/futures.
713 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
715 self: Pin
<&mut Self>,
718 ) -> Poll
<io
::Result
<usize>> {
719 let this
= self.get_mut();
720 this
.check_remaining(data
.len())?
;
721 let output
= unsafe { Pin::new_unchecked(&mut *this.output) }
;
722 match output
.poll_seq_write(cx
, data
) {
723 Poll
::Ready(Ok(put
)) => {
724 this
.remaining_size
-= put
as u64;
731 /// Poll flush interface to more easily connect to tokio/futures.
732 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
733 pub fn poll_flush(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>> {
735 self.map_unchecked_mut(|this
| &mut this
.output
)
740 /// Poll close/shutdown interface to more easily connect to tokio/futures.
741 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
742 pub fn poll_close(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>> {
744 self.map_unchecked_mut(|this
| &mut this
.output
)
749 /// Write file data for the current file entry in a pxar archive.
751 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
752 /// requested. Check the return value for how many. There's also a `write_all` method available
754 pub async
fn write(&mut self, data
: &[u8]) -> io
::Result
<usize> {
755 self.check_remaining(data
.len())?
;
756 let put
= seq_write(&mut self.output
, data
).await?
;
757 self.remaining_size
-= put
as u64;
761 /// Completely write file data for the current file entry in a pxar archive.
762 pub async
fn write_all(&mut self, data
: &[u8]) -> io
::Result
<()> {
763 self.check_remaining(data
.len())?
;
764 seq_write_all(&mut self.output
, data
).await?
;
765 self.remaining_size
-= data
.len() as u64;
770 #[cfg(feature = "tokio-io")]
771 impl<'a
> tokio
::io
::AsyncWrite
for FileImpl
<'a
> {
772 fn poll_write(self: Pin
<&mut Self>, cx
: &mut Context
, buf
: &[u8]) -> Poll
<io
::Result
<usize>> {
773 FileImpl
::poll_write(self, cx
, buf
)
776 fn poll_flush(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>> {
777 FileImpl
::poll_flush(self, cx
)
780 fn poll_shutdown(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>> {
781 FileImpl
::poll_close(self, cx
)
785 #[cfg(feature = "futures-io")]
786 impl<'a
> futures
::io
::AsyncWrite
for FileImpl
<'a
> {
787 fn poll_write(self: Pin
<&mut Self>, cx
: &mut Context
, buf
: &[u8]) -> Poll
<io
::Result
<usize>> {
788 FileImpl
::poll_write(self, cx
, buf
)
791 fn poll_flush(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>> {
792 FileImpl
::poll_flush(self, cx
)
795 fn poll_close(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>> {
796 FileImpl
::poll_close(self, cx
)