1 //! The `pxar` encoder state machine.
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
5 use std
::cell
::RefCell
;
7 use std
::mem
::{forget, size_of, size_of_val, take}
;
8 use std
::os
::unix
::ffi
::OsStrExt
;
12 use std
::task
::{Context, Poll}
;
14 use endian_trait
::Endian
;
16 use crate::binary_tree_array
;
17 use crate::decoder
::{self, SeqRead}
;
18 use crate::format
::{self, GoodbyeItem}
;
19 use crate::poll_fn
::poll_fn
;
26 pub use sync
::Encoder
;
/// File reference used to create hard links.
///
/// Wraps the archive byte offset of a previously written entry, as handed out by
/// `EncoderImpl::add_file` and `FileImpl::file_offset`, so it can later be passed to
/// `add_hardlink`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct LinkOffset(u64);

impl LinkOffset {
    /// Get the raw byte offset this link refers to.
    pub fn raw(self) -> u64 {
        let LinkOffset(offset) = self;
        offset
    }
}
/// Sequential write interface used by the encoder's state machine.
///
/// This is our internal writer trait which is available for `std::io::Write` types in the
/// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
/// wrapper.
pub trait SeqWrite {
    /// Attempt to write some bytes from `buf`, returning how many bytes were accepted.
    ///
    /// May accept fewer bytes than `buf.len()`; callers handle short writes.
    fn poll_seq_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Attempt to flush buffered data to the underlying sink.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;

    /// To avoid recursively borrowing each time we nest into a subdirectory we add this helper.
    /// Otherwise starting a subdirectory will get a trait object pointing to `T`, nesting another
    /// subdirectory in that would have a trait object pointing to the trait object, and so on.
    fn as_trait_object(&mut self) -> &mut dyn SeqWrite
    where
        Self: Sized,
    {
        self as &mut dyn SeqWrite
    }
}

/// Allow using trait objects for generics taking a `SeqWrite`.
impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
    fn poll_seq_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // SAFETY: we only project through the reference to the referenced writer and never move
        // that writer out of it.
        // NOTE(review): `Pin<&mut &mut T>` does not by itself prove that `T` is pinned; this
        // relies on callers never moving a `!Unpin` writer after handing it out as a trait
        // object.
        unsafe { self.map_unchecked_mut(|this| &mut **this).poll_seq_write(cx, buf) }
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        // SAFETY: same projection and caveat as in `poll_seq_write`.
        unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
    }

    /// Re-borrow the inner trait object instead of wrapping it in another layer of indirection.
    fn as_trait_object(&mut self) -> &mut dyn SeqWrite
    where
        Self: Sized,
    {
        &mut **self
    }
}
89 /// awaitable verison of `poll_seq_write`.
90 async
fn seq_write
<T
: SeqWrite
+ ?Sized
>(
94 ) -> io
::Result
<usize> {
96 poll_fn(|cx
| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }
).await?
;
97 *position
+= put
as u64;
101 /// Write the entire contents of a buffer, handling short writes.
102 async
fn seq_write_all
<T
: SeqWrite
+ ?Sized
>(
106 ) -> io
::Result
<()> {
107 while !buf
.is_empty() {
108 let got
= seq_write(&mut *output
, buf
, &mut *position
).await?
;
114 /// Write an endian-swappable struct.
115 async
fn seq_write_struct
<E
: Endian
, T
>(
121 T
: SeqWrite
+ ?Sized
,
123 let data
= data
.to_le();
126 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) }
,
132 /// Write a pxar entry.
133 async
fn seq_write_pxar_entry
<T
>(
140 T
: SeqWrite
+ ?Sized
,
142 let header
= format
::Header
::with_content_size(htype
, data
.len() as u64);
143 header
.check_header_size()?
;
145 seq_write_struct(&mut *output
, header
, &mut *position
).await?
;
146 seq_write_all(output
, data
, position
).await
149 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
151 async
fn seq_write_pxar_entry_zero
<T
>(
158 T
: SeqWrite
+ ?Sized
,
160 let header
= format
::Header
::with_content_size(htype
, 1 + data
.len() as u64);
161 header
.check_header_size()?
;
163 seq_write_struct(&mut *output
, header
, &mut *position
).await?
;
164 seq_write_all(&mut *output
, data
, &mut *position
).await?
;
165 seq_write_all(output
, &[0u8], position
).await
168 /// Write a pxar entry consiting of an endian-swappable struct.
169 async
fn seq_write_pxar_struct_entry
<E
, T
>(
176 T
: SeqWrite
+ ?Sized
,
179 let data
= data
.to_le();
180 seq_write_pxar_entry(
183 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) }
,
/// Error conditions caused by wrong usage of this crate.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EncodeError {
    /// The user dropped a `File` without finishing writing all of its contents.
    ///
    /// This is required because the payload length is written out at the beginning, and decoding
    /// requires exactly that amount of data to follow.
    IncompleteFile,

    /// The user dropped a directory without finalizing it.
    ///
    /// Finalizing is required to build the goodbye table at the end of a directory.
    IncompleteDirectory,
}
205 struct EncoderState
{
206 /// Goodbye items for this directory, excluding the tail.
207 items
: Vec
<GoodbyeItem
>,
209 /// User caused error conditions.
210 encode_error
: Option
<EncodeError
>,
212 /// Offset of this directory's ENTRY.
215 /// Offset to this directory's first FILENAME.
218 /// If this is a subdirectory, this points to the this directory's FILENAME.
219 file_offset
: Option
<u64>,
221 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
224 /// We need to keep track how much we have written to get offsets.
229 fn merge_error(&mut self, error
: Option
<EncodeError
>) {
230 // one error is enough:
231 if self.encode_error
.is_none() {
232 self.encode_error
= error
;
236 fn add_error(&mut self, error
: EncodeError
) {
237 self.merge_error(Some(error
));
241 /// The encoder state machine implementation for a directory.
243 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
244 /// synchronous or `async` I/O objects in as output.
245 pub(crate) struct EncoderImpl
<'a
, T
: SeqWrite
+ 'a
> {
248 parent
: Option
<&'a
mut EncoderState
>,
251 /// Since only the "current" entry can be actively writing files, we share the file copy
253 file_copy_buffer
: Rc
<RefCell
<Vec
<u8>>>,
256 impl<'a
, T
: SeqWrite
+ 'a
> Drop
for EncoderImpl
<'a
, T
> {
258 if let Some(ref mut parent
) = self.parent
{
260 parent
.merge_error(self.state
.encode_error
);
262 parent
.add_error(EncodeError
::IncompleteDirectory
);
264 } else if !self.finished
{
265 // FIXME: how do we deal with this?
266 // eprintln!("Encoder dropped without finishing!");
271 impl<'a
, T
: SeqWrite
+ 'a
> EncoderImpl
<'a
, T
> {
272 pub async
fn new(output
: T
, metadata
: &Metadata
) -> io
::Result
<EncoderImpl
<'a
, T
>> {
273 if !metadata
.is_dir() {
274 io_bail
!("directory metadata must contain the directory mode flag");
276 let mut this
= Self {
277 output
: Some(output
),
278 state
: EncoderState
::default(),
281 file_copy_buffer
: Rc
::new(RefCell
::new(crate::util
::vec_new(1024 * 1024))),
284 this
.encode_metadata(metadata
).await?
;
285 this
.state
.files_offset
= this
.position();
290 fn check(&self) -> io
::Result
<()> {
291 match self.state
.encode_error
{
292 Some(EncodeError
::IncompleteFile
) => io_bail
!("incomplete file"),
293 Some(EncodeError
::IncompleteDirectory
) => io_bail
!("directory not finalized"),
298 pub async
fn create_file
<'b
>(
303 ) -> io
::Result
<FileImpl
<'b
>>
307 self.create_file_do(metadata
, file_name
.as_os_str().as_bytes(), file_size
)
311 async
fn create_file_do
<'b
>(
316 ) -> io
::Result
<FileImpl
<'b
>>
322 let file_offset
= self.position();
323 self.start_file_do(Some(metadata
), file_name
).await?
;
325 let header
= format
::Header
::with_content_size(format
::PXAR_PAYLOAD
, file_size
);
326 header
.check_header_size()?
;
328 seq_write_struct(self.output
.as_mut().unwrap(), header
, &mut self.state
.write_position
).await?
;
330 let payload_data_offset
= self.position();
332 let meta_size
= payload_data_offset
- file_offset
;
335 output
: self.output
.as_mut().unwrap(),
336 goodbye_item
: GoodbyeItem
{
337 hash
: format
::hash_filename(file_name
),
339 size
: file_size
+ meta_size
,
341 remaining_size
: file_size
,
342 parent
: &mut self.state
,
346 /// Return a file offset usable with `add_hardlink`.
347 pub async
fn add_file(
352 content
: &mut dyn SeqRead
,
353 ) -> io
::Result
<LinkOffset
> {
354 let buf
= Rc
::clone(&self.file_copy_buffer
);
355 let mut file
= self.create_file(metadata
, file_name
, file_size
).await?
;
356 let mut buf
= buf
.borrow_mut();
358 let got
= decoder
::seq_read(&mut *content
, &mut buf
[..]).await?
;
362 file
.write_all(&buf
[..got
]).await?
;
365 Ok(file
.file_offset())
368 /// Return a file offset usable with `add_hardlink`.
369 pub async
fn add_symlink(
374 ) -> io
::Result
<()> {
375 let target
= target
.as_os_str().as_bytes();
376 let mut data
= Vec
::with_capacity(target
.len() + 1);
379 let _ofs
: LinkOffset
= self
383 Some((format
::PXAR_SYMLINK
, &data
)),
389 /// Return a file offset usable with `add_hardlink`.
390 pub async
fn add_hardlink(
394 target_offset
: LinkOffset
,
395 ) -> io
::Result
<()> {
396 let current_offset
= self.position();
397 if current_offset
<= target_offset
.0 {
398 io_bail
!("invalid hardlink offset, can only point to prior files");
401 let offset_bytes
= (current_offset
- target_offset
.0).to_le_bytes();
402 let target_bytes
= target
.as_os_str().as_bytes();
403 let mut hardlink
= Vec
::with_capacity(offset_bytes
.len() + target_bytes
.len() + 1);
404 hardlink
.extend(&offset_bytes
);
405 hardlink
.extend(target_bytes
);
407 let _this_offset
: LinkOffset
= self
408 .add_file_entry(None
, file_name
, Some((format
::PXAR_HARDLINK
, &hardlink
)))
413 /// Return a file offset usable with `add_hardlink`.
414 pub async
fn add_device(
418 device
: format
::Device
,
419 ) -> io
::Result
<()> {
420 if !metadata
.is_device() {
421 io_bail
!("entry added via add_device must have a device mode in its metadata");
424 let device
= device
.to_le();
425 let device
= unsafe {
426 std
::slice
::from_raw_parts(
427 &device
as *const format
::Device
as *const u8,
428 size_of
::<format
::Device
>(),
431 let _ofs
: LinkOffset
= self
435 Some((format
::PXAR_DEVICE
, device
)),
441 /// Return a file offset usable with `add_hardlink`.
442 pub async
fn add_fifo(&mut self, metadata
: &Metadata
, file_name
: &Path
) -> io
::Result
<()> {
443 if !metadata
.is_fifo() {
444 io_bail
!("entry added via add_device must be of type fifo in its metadata");
447 let _ofs
: LinkOffset
= self.add_file_entry(Some(metadata
), file_name
, None
).await?
;
451 /// Return a file offset usable with `add_hardlink`.
452 pub async
fn add_socket(&mut self, metadata
: &Metadata
, file_name
: &Path
) -> io
::Result
<()> {
453 if !metadata
.is_socket() {
454 io_bail
!("entry added via add_device must be of type socket in its metadata");
457 let _ofs
: LinkOffset
= self.add_file_entry(Some(metadata
), file_name
, None
).await?
;
461 /// Return a file offset usable with `add_hardlink`.
462 async
fn add_file_entry(
464 metadata
: Option
<&Metadata
>,
466 entry_htype_data
: Option
<(u64, &[u8])>,
467 ) -> io
::Result
<LinkOffset
> {
470 let file_offset
= self.position();
472 let file_name
= file_name
.as_os_str().as_bytes();
474 self.start_file_do(metadata
, file_name
).await?
;
475 if let Some((htype
, entry_data
)) = entry_htype_data
{
476 seq_write_pxar_entry(
477 self.output
.as_mut().unwrap(),
480 &mut self.state
.write_position
,
485 let end_offset
= self.position();
487 self.state
.items
.push(GoodbyeItem
{
488 hash
: format
::hash_filename(file_name
),
490 size
: end_offset
- file_offset
,
493 Ok(LinkOffset(file_offset
))
497 fn position(&mut self) -> u64 {
498 self.state
.write_position
501 pub async
fn create_directory
<'b
>(
505 ) -> io
::Result
<EncoderImpl
<'b
, &'b
mut dyn SeqWrite
>>
511 if !metadata
.is_dir() {
512 io_bail
!("directory metadata must contain the directory mode flag");
515 let file_name
= file_name
.as_os_str().as_bytes();
516 let file_hash
= format
::hash_filename(file_name
);
518 let file_offset
= self.position();
519 self.encode_filename(file_name
).await?
;
521 let entry_offset
= self.position();
522 self.encode_metadata(&metadata
).await?
;
524 let files_offset
= self.position();
526 // the child will write to OUR state now:
527 let write_position
= self.position();
530 output
: self.output
.as_mut().map(SeqWrite
::as_trait_object
),
531 state
: EncoderState
{
534 file_offset
: Some(file_offset
),
539 parent
: Some(&mut self.state
),
541 file_copy_buffer
: Rc
::clone(&self.file_copy_buffer
),
545 async
fn start_file_do(
547 metadata
: Option
<&Metadata
>,
549 ) -> io
::Result
<()> {
550 self.encode_filename(file_name
).await?
;
551 if let Some(metadata
) = metadata
{
552 self.encode_metadata(&metadata
).await?
;
557 async
fn encode_metadata(&mut self, metadata
: &Metadata
) -> io
::Result
<()> {
558 seq_write_pxar_struct_entry(
559 self.output
.as_mut().unwrap(),
561 metadata
.stat
.clone(),
562 &mut self.state
.write_position
,
566 for xattr
in &metadata
.xattrs
{
567 self.write_xattr(xattr
).await?
;
570 self.write_acls(&metadata
.acl
).await?
;
572 if let Some(fcaps
) = &metadata
.fcaps
{
573 self.write_file_capabilities(fcaps
).await?
;
576 if let Some(qpid
) = &metadata
.quota_project_id
{
577 self.write_quota_project_id(qpid
).await?
;
583 async
fn write_xattr(&mut self, xattr
: &format
::XAttr
) -> io
::Result
<()> {
584 seq_write_pxar_entry(
585 self.output
.as_mut().unwrap(),
588 &mut self.state
.write_position
,
593 async
fn write_acls(&mut self, acl
: &crate::Acl
) -> io
::Result
<()> {
594 for acl
in &acl
.users
{
595 seq_write_pxar_struct_entry(
596 self.output
.as_mut().unwrap(),
597 format
::PXAR_ACL_USER
,
599 &mut self.state
.write_position
,
604 for acl
in &acl
.groups
{
605 seq_write_pxar_struct_entry(
606 self.output
.as_mut().unwrap(),
607 format
::PXAR_ACL_GROUP
,
609 &mut self.state
.write_position
,
614 if let Some(acl
) = &acl
.group_obj
{
615 seq_write_pxar_struct_entry(
616 self.output
.as_mut().unwrap(),
617 format
::PXAR_ACL_GROUP_OBJ
,
619 &mut self.state
.write_position
,
624 if let Some(acl
) = &acl
.default {
625 seq_write_pxar_struct_entry(
626 self.output
.as_mut().unwrap(),
627 format
::PXAR_ACL_DEFAULT
,
629 &mut self.state
.write_position
,
634 for acl
in &acl
.default_users
{
635 seq_write_pxar_struct_entry(
636 self.output
.as_mut().unwrap(),
637 format
::PXAR_ACL_DEFAULT_USER
,
639 &mut self.state
.write_position
,
644 for acl
in &acl
.default_groups
{
645 seq_write_pxar_struct_entry(
646 self.output
.as_mut().unwrap(),
647 format
::PXAR_ACL_DEFAULT_GROUP
,
649 &mut self.state
.write_position
,
657 async
fn write_file_capabilities(&mut self, fcaps
: &format
::FCaps
) -> io
::Result
<()> {
658 seq_write_pxar_entry(
659 self.output
.as_mut().unwrap(),
662 &mut self.state
.write_position
,
667 async
fn write_quota_project_id(
669 quota_project_id
: &format
::QuotaProjectId
,
670 ) -> io
::Result
<()> {
671 seq_write_pxar_struct_entry(
672 self.output
.as_mut().unwrap(),
673 format
::PXAR_QUOTA_PROJID
,
675 &mut self.state
.write_position
,
680 async
fn encode_filename(&mut self, file_name
: &[u8]) -> io
::Result
<()> {
681 crate::util
::validate_filename(file_name
)?
;
682 seq_write_pxar_entry_zero(
683 self.output
.as_mut().unwrap(),
684 format
::PXAR_FILENAME
,
686 &mut self.state
.write_position
,
691 pub async
fn finish(mut self) -> io
::Result
<T
> {
692 let tail_bytes
= self.finish_goodbye_table().await?
;
693 seq_write_pxar_entry(
694 self.output
.as_mut().unwrap(),
695 format
::PXAR_GOODBYE
,
697 &mut self.state
.write_position
,
701 // done up here because of the self-borrow and to propagate
702 let end_offset
= self.position();
704 if let Some(parent
) = &mut self.parent
{
705 parent
.write_position
= end_offset
;
707 let file_offset
= self
710 .expect("internal error: parent set but no file_offset?");
712 parent
.items
.push(GoodbyeItem
{
713 hash
: self.state
.file_hash
,
715 size
: end_offset
- file_offset
,
718 self.finished
= true;
719 Ok(self.output
.take().unwrap())
722 pub fn into_writer(mut self) -> T
{
723 self.output
.take().unwrap()
726 async
fn finish_goodbye_table(&mut self) -> io
::Result
<Vec
<u8>> {
727 let goodbye_offset
= self.position();
729 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
730 let mut tail
= take(&mut self.state
.items
);
731 let tail_size
= (tail
.len() + 1) * size_of
::<GoodbyeItem
>();
732 let goodbye_size
= tail_size
as u64 + size_of
::<format
::Header
>() as u64;
734 // sort, then create a BST
735 tail
.sort_unstable_by(|a
, b
| a
.hash
.cmp(&b
.hash
));
737 let mut bst
= Vec
::with_capacity(tail
.len() + 1);
739 bst
.set_len(tail
.len());
741 binary_tree_array
::copy(tail
.len(), |src
, dest
| {
742 let mut item
= tail
[src
].clone();
743 // fixup the goodbye table offsets to be relative and with the right endianess
744 item
.offset
= goodbye_offset
- item
.offset
;
746 std
::ptr
::write(&mut bst
[dest
], item
.to_le());
753 hash
: format
::PXAR_GOODBYE_TAIL_MARKER
,
754 offset
: goodbye_offset
- self.state
.entry_offset
,
760 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
761 // the items make sense:
762 let data
= bst
.as_mut_ptr() as *mut u8;
763 let capacity
= bst
.capacity() * size_of
::<GoodbyeItem
>();
765 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) }
)
769 /// Writer for a file object in a directory.
770 pub struct FileImpl
<'a
> {
771 output
: &'a
mut dyn SeqWrite
,
773 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
774 /// directly instead of on Drop of FileImpl?
775 goodbye_item
: GoodbyeItem
,
777 /// While writing data to this file, this is how much space we still have left, this must reach
781 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
782 /// to, and where we insert our `GoodbyeItem`.
783 parent
: &'a
mut EncoderState
,
786 impl<'a
> Drop
for FileImpl
<'a
> {
788 if self.remaining_size
!= 0 {
789 self.parent
.add_error(EncodeError
::IncompleteFile
);
792 self.parent
.items
.push(self.goodbye_item
.clone());
796 impl<'a
> FileImpl
<'a
> {
797 /// Get the file offset to be able to reference it with `add_hardlink`.
798 pub fn file_offset(&self) -> LinkOffset
{
799 LinkOffset(self.goodbye_item
.offset
)
802 fn check_remaining(&self, size
: usize) -> io
::Result
<()> {
803 if size
as u64 > self.remaining_size
{
804 io_bail
!("attempted to write more than previously allocated");
810 /// Poll write interface to more easily connect to tokio/futures.
811 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
813 self: Pin
<&mut Self>,
816 ) -> Poll
<io
::Result
<usize>> {
817 let this
= self.get_mut();
818 this
.check_remaining(data
.len())?
;
819 let output
= unsafe { Pin::new_unchecked(&mut *this.output) }
;
820 match output
.poll_seq_write(cx
, data
) {
821 Poll
::Ready(Ok(put
)) => {
822 this
.remaining_size
-= put
as u64;
823 this
.parent
.write_position
+= put
as u64;
830 /// Poll flush interface to more easily connect to tokio/futures.
831 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
832 pub fn poll_flush(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>> {
834 self.map_unchecked_mut(|this
| &mut this
.output
)
839 /// Poll close/shutdown interface to more easily connect to tokio/futures.
841 /// This just calls flush, though, since we're just a virtual writer writing to the file
842 /// provided by our encoder.
843 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
844 pub fn poll_close(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>> {
846 self.map_unchecked_mut(|this
| &mut this
.output
)
851 /// Write file data for the current file entry in a pxar archive.
853 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
854 /// requested. Check the return value for how many. There's also a `write_all` method available
856 pub async
fn write(&mut self, data
: &[u8]) -> io
::Result
<usize> {
857 self.check_remaining(data
.len())?
;
859 poll_fn(|cx
| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) }
)
861 //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
862 self.remaining_size
-= put
as u64;
863 self.parent
.write_position
+= put
as u64;
867 /// Completely write file data for the current file entry in a pxar archive.
868 pub async
fn write_all(&mut self, data
: &[u8]) -> io
::Result
<()> {
869 self.check_remaining(data
.len())?
;
870 seq_write_all(&mut self.output
, data
, &mut self.parent
.write_position
).await?
;
871 self.remaining_size
-= data
.len() as u64;
/// Expose `FileImpl` as a `tokio` writer by forwarding to its inherent poll methods.
#[cfg(feature = "tokio-io")]
impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
    /// Forwards to `FileImpl::poll_write`, which enforces the announced payload size.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
        FileImpl::poll_write(self, cx, buf)
    }

    /// Forwards to `FileImpl::poll_flush`.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_flush(self, cx)
    }

    /// Shutdown maps to `FileImpl::poll_close`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_close(self, cx)
    }
}
891 #[cfg(feature = "futures-io")]
892 impl<'a
> futures
::io
::AsyncWrite
for FileImpl
<'a
> {
893 fn poll_write(self: Pin
<&mut Self>, cx
: &mut Context
, buf
: &[u8]) -> Poll
<io
::Result
<usize>> {
894 FileImpl
::poll_write(self, cx
, buf
)
897 fn poll_flush(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>> {
898 FileImpl
::poll_flush(self, cx
)
901 fn poll_close(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>> {
902 FileImpl
::poll_close(self, cx
)