1 //! The `pxar` decoder state machine.
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
7 use std
::ffi
::OsString
;
9 use std
::mem
::{self, size_of, size_of_val, MaybeUninit}
;
10 use std
::os
::unix
::ffi
::{OsStrExt, OsStringExt}
;
11 use std
::path
::{Path, PathBuf}
;
13 use std
::task
::{Context, Poll}
;
15 //use std::os::unix::fs::FileExt;
17 use endian_trait
::Endian
;
19 use crate::format
::{self, Header}
;
20 use crate::poll_fn
::poll_fn
;
21 use crate::util
::{self, io_err_other}
;
22 use crate::{Entry, EntryKind, Metadata}
;
28 pub use sync
::Decoder
;
/// Scratch space used to skip through non-seekable files.
///
/// The buffer is initialized (zero-filled) at compile time, so the slice handed out by
/// `scratch_buffer()` never exposes uninitialized bytes to a reader implementation.
static mut SCRATCH_BUFFER: MaybeUninit<[u8; 4096]> = MaybeUninit::new([0u8; 4096]);

/// Returns the process-wide scratch buffer as a mutable byte slice.
///
/// The contents are throw-away: whatever gets read into the buffer is only meant to be
/// discarded, so callers must not rely on the bytes they find in it.
fn scratch_buffer() -> &'static mut [u8] {
    // SAFETY: `SCRATCH_BUFFER` is fully initialized by its initializer and
    // `MaybeUninit<T>` has the same layout as `T`, so the cast pointer refers to a valid
    // `[u8; 4096]`. `addr_of_mut!` takes the address without creating an intermediate
    // reference to the `static mut`.
    // NOTE(review): every caller receives a `&mut` to the same memory; this still relies on
    // the data being write-only garbage that nobody reads back — confirm this is acceptable.
    unsafe {
        let buf: *mut [u8; 4096] = std::ptr::addr_of_mut!(SCRATCH_BUFFER).cast();
        &mut *buf
    }
}
37 /// Sequential read interface used by the decoder's state machine.
39 /// To simply iterate through a directory we just need the equivalent of `poll_read()`.
41 /// Currently we also have a `poll_position()` method which can be added for types supporting
42 /// `Seek` or `AsyncSeek`. In this case the starting position of each entry becomes available
43 /// (accessible via the `Entry::offset()`), to allow jumping between entries.
45 /// Mostly we want to read sequentially, so this is basically an `AsyncRead` equivalent.
50 ) -> Poll
<io
::Result
<usize>>;
52 /// While going through the data we may want to take notes about some offsets within the file
53 /// for later. If the reader does not support seeking or positional reading, this can just
55 fn poll_position(self: Pin
<&mut Self>, _cx
: &mut Context
) -> Poll
<Option
<io
::Result
<u64>>> {
60 /// Allow using trait objects for generics taking a `SeqRead`:
61 impl<'a
> SeqRead
for &mut (dyn SeqRead
+ 'a
) {
66 ) -> Poll
<io
::Result
<usize>> {
68 self.map_unchecked_mut(|this
| &mut **this
)
69 .poll_seq_read(cx
, buf
)
73 fn poll_position(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Option
<io
::Result
<u64>>> {
74 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) }
78 /// awaitable version of `poll_position`.
79 async
fn seq_read_position
<T
: SeqRead
+ ?Sized
>(input
: &mut T
) -> Option
<io
::Result
<u64>> {
80 poll_fn(|cx
| unsafe { Pin::new_unchecked(&mut *input).poll_position(cx) }
).await
83 /// awaitable version of `poll_seq_read`.
84 pub(crate) async
fn seq_read
<T
: SeqRead
+ ?Sized
>(
87 ) -> io
::Result
<usize> {
88 poll_fn(|cx
| unsafe { Pin::new_unchecked(&mut *input).poll_seq_read(cx, buf) }
).await
91 /// `read_exact` - since that's what we _actually_ want most of the time, but with EOF handling
92 async
fn seq_read_exact_or_eof
<T
>(input
: &mut T
, mut buf
: &mut [u8]) -> io
::Result
<Option
<()>>
96 let mut eof_ok
= true;
97 while !buf
.is_empty() {
98 match seq_read(&mut *input
, buf
).await?
{
99 0 if eof_ok
=> return Ok(None
),
100 0 => io_bail
!("unexpected EOF"),
101 got
=> buf
= &mut buf
[got
..],
108 /// `read_exact` - since that's what we _actually_ want most of the time.
109 async
fn seq_read_exact
<T
: SeqRead
+ ?Sized
>(input
: &mut T
, buf
: &mut [u8]) -> io
::Result
<()> {
110 match seq_read_exact_or_eof(input
, buf
).await?
{
112 None
=> io_bail
!("unexpected EOF"),
116 /// Helper to read into an allocated byte vector.
117 async
fn seq_read_exact_data
<T
>(input
: &mut T
, size
: usize) -> io
::Result
<Vec
<u8>>
121 let mut data
= unsafe { util::vec_new_uninitialized(size) }
;
122 seq_read_exact(input
, &mut data
[..]).await?
;
126 /// `seq_read_entry` with EOF handling
127 async
fn seq_read_entry_or_eof
<T
, E
>(input
: &mut T
) -> io
::Result
<Option
<E
>>
132 let mut data
= MaybeUninit
::<E
>::uninit();
134 unsafe { std::slice::from_raw_parts_mut(data.as_mut_ptr() as *mut u8, size_of::<E>()) }
;
135 if seq_read_exact_or_eof(input
, buf
).await?
.is_none() {
138 Ok(Some(unsafe { data.assume_init().from_le() }
))
141 /// Helper to read into an `Endian`-implementing `struct`.
142 async
fn seq_read_entry
<T
: SeqRead
+ ?Sized
, E
: Endian
>(input
: &mut T
) -> io
::Result
<E
> {
143 seq_read_entry_or_eof(input
)
145 .ok_or_else(|| io_format_err
!("unexpected EOF"))
148 /// The decoder state machine implementation.
150 /// We use `async fn` to implement the decoder state machine so that we can easily plug in both
151 /// synchronous or `async` I/O objects in as input.
152 pub(crate) struct DecoderImpl
<T
> {
154 current_header
: Header
,
156 path_lengths
: Vec
<usize>,
158 with_goodbye_tables
: bool
,
160 /// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for
161 /// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF.
162 eof_after_entry
: bool
,
172 /// file entries with no data (fifo, socket)
180 /// Control flow while parsing items.
182 /// When parsing an entry, we usually go through all of its attribute items. Once we reach the end
183 /// of the entry we stop.
184 /// Note that if we're in a directory, we stopped at the beginning of its contents.
185 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
186 pub(crate) enum ItemResult
{
187 /// We parsed an "attribute" item and should continue parsing.
190 /// We finished an entry (`SYMLINK`, `HARDLINK`, ...) or just entered the contents of a
191 /// directory (`FILENAME`, `GOODBYE`).
193 /// We stop moving forward at this point.
197 impl<I
: SeqRead
> DecoderImpl
<I
> {
198 pub async
fn new(input
: I
) -> io
::Result
<Self> {
199 Self::new_full(input
, "/".into(), false).await
202 pub(crate) fn input(&self) -> &I
{
206 pub(crate) async
fn new_full(
209 eof_after_entry
: bool
,
210 ) -> io
::Result
<Self> {
211 let this
= DecoderImpl
{
213 current_header
: unsafe { mem::zeroed() }
,
216 kind
: EntryKind
::GoodbyeTable
,
217 metadata
: Metadata
::default(),
219 path_lengths
: Vec
::new(),
221 with_goodbye_tables
: false,
225 // this.read_next_entry().await?;
230 /// Get the next file entry, recursing into directories.
231 pub async
fn next(&mut self) -> Option
<io
::Result
<Entry
>> {
232 self.next_do().await
.transpose()
235 async
fn next_do(&mut self) -> io
::Result
<Option
<Entry
>> {
238 State
::Eof
=> return Ok(None
),
239 State
::Begin
=> return self.read_next_entry().await
.map(Some
),
241 // we completely finished an entry, so now we're going "up" in the directory
242 // hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE:
243 self.read_next_item().await?
;
245 State
::InPayload { offset }
=> {
246 // We need to skip the current payload first.
247 self.skip_entry(offset
).await?
;
248 self.read_next_item().await?
;
250 State
::InGoodbyeTable
=> {
251 self.skip_entry(0).await?
;
252 if self.path_lengths
.pop().is_none() {
253 // The root directory has an entry containing '1'.
254 io_bail
!("unexpected EOF in goodbye table");
257 if self.path_lengths
.is_empty() {
258 // we are at the end of the archive now
259 self.state
= State
::Eof
;
263 // We left the directory, now keep going in our parent.
264 self.state
= State
::Default
;
267 State
::InSpecialFile
=> {
268 self.entry
.clear_data();
269 self.state
= State
::InDirectory
;
270 self.entry
.kind
= EntryKind
::Directory
;
272 State
::InDirectory
=> {
273 // We're at the next FILENAME or GOODBYE item.
277 match self.current_header
.htype
{
278 format
::PXAR_FILENAME
=> return self.handle_file_entry().await
,
279 format
::PXAR_GOODBYE
=> {
280 self.state
= State
::InGoodbyeTable
;
282 if self.with_goodbye_tables
{
283 self.entry
.clear_data();
284 return Ok(Some(Entry
{
285 path
: PathBuf
::new(),
286 metadata
: Metadata
::default(),
287 kind
: EntryKind
::GoodbyeTable
,
290 // go up to goodbye table handling
295 "expected filename or directory-goodbye pxar entry, got: {}",
302 pub fn content_size(&self) -> Option
<u64> {
303 if let State
::InPayload { .. }
= self.state
{
304 Some(self.current_header
.content_size())
310 pub fn content_reader(&mut self) -> Option
<Contents
<I
>> {
311 if let State
::InPayload { offset }
= &mut self.state
{
315 self.current_header
.content_size(),
322 async
fn handle_file_entry(&mut self) -> io
::Result
<Option
<Entry
>> {
323 let mut data
= self.read_entry_as_bytes().await?
;
325 // filenames are zero terminated!
326 if data
.pop() != Some(0) {
327 io_bail
!("illegal path found (missing terminating zero)");
330 crate::util
::validate_filename(&data
)?
;
332 let path
= PathBuf
::from(OsString
::from_vec(data
));
333 self.set_path(&path
)?
;
334 self.read_next_entry().await
.map(Some
)
337 fn reset_path(&mut self) -> io
::Result
<()> {
341 .ok_or_else(|| io_format_err
!("internal decoder error: path underrun"))?
;
342 let mut path
= mem
::replace(&mut self.entry
.path
, PathBuf
::new())
345 path
.truncate(path_len
);
346 self.entry
.path
= PathBuf
::from(OsString
::from_vec(path
));
350 fn set_path(&mut self, path
: &Path
) -> io
::Result
<()> {
352 self.entry
.path
.push(path
);
356 async
fn read_next_entry_or_eof(&mut self) -> io
::Result
<Option
<Entry
>> {
357 self.state
= State
::Default
;
358 self.entry
.clear_data();
360 let header
: Header
= match seq_read_entry_or_eof(&mut self.input
).await?
{
361 None
=> return Ok(None
),
362 Some(header
) => header
,
365 header
.check_header_size()?
;
367 if header
.htype
== format
::PXAR_HARDLINK
{
368 // The only "dangling" header without an 'Entry' in front of it because it does not
369 // carry its own metadata.
370 self.current_header
= header
;
372 // Hardlinks have no metadata and no additional items.
373 self.entry
.metadata
= Metadata
::default();
374 self.entry
.kind
= EntryKind
::Hardlink(self.read_hardlink().await?
);
376 Ok(Some(self.entry
.take()))
377 } else if header
.htype
== format
::PXAR_ENTRY
|| header
.htype
== format
::PXAR_ENTRY_V1
{
378 if header
.htype
== format
::PXAR_ENTRY
{
379 self.entry
.metadata
= Metadata
{
380 stat
: seq_read_entry(&mut self.input
).await?
,
383 } else if header
.htype
== format
::PXAR_ENTRY_V1
{
384 let stat
: format
::Stat_V1
= seq_read_entry(&mut self.input
).await?
;
386 self.entry
.metadata
= Metadata
{
394 self.current_header
= unsafe { mem::zeroed() }
;
397 match self.read_next_item_or_eof().await?
{
398 Some(ItemResult
::Entry
) => break,
399 Some(ItemResult
::Attribute
) => continue,
400 None
if self.eof_after_entry
=> {
401 // Single FIFOs and sockets (as received from the Accessor) won't reach a
402 // FILENAME/GOODBYE entry:
403 if self.entry
.metadata
.is_fifo() {
404 self.entry
.kind
= EntryKind
::Fifo
;
405 } else if self.entry
.metadata
.is_socket() {
406 self.entry
.kind
= EntryKind
::Socket
;
408 self.entry
.kind
= EntryKind
::Directory
;
412 None
=> io_bail
!("unexpected EOF in entry"),
416 if self.entry
.is_dir() {
418 .push(self.entry
.path
.as_os_str().as_bytes().len());
421 Ok(Some(self.entry
.take()))
423 io_bail
!("expected pxar entry of type 'Entry', got: {}", header
,);
427 async
fn read_next_entry(&mut self) -> io
::Result
<Entry
> {
428 self.read_next_entry_or_eof()
430 .ok_or_else(|| io_format_err
!("unexpected EOF"))
433 async
fn read_next_item(&mut self) -> io
::Result
<ItemResult
> {
434 match self.read_next_item_or_eof().await?
{
435 Some(item
) => Ok(item
),
436 None
=> io_bail
!("unexpected EOF"),
440 // NOTE: The random accessor will decode FIFOs and Sockets in a decoder instance with a ranged
441 // reader so there is no PAYLOAD or GOODBYE TABLE to "end" an entry.
// NOTE: This method's behavior is also recreated in the accessor's `get_decoder_at_filename`
444 // function! Keep in mind when changing!
445 async
fn read_next_item_or_eof(&mut self) -> io
::Result
<Option
<ItemResult
>> {
446 match self.read_next_header_or_eof().await?
{
447 Some(()) => self.read_current_item().await
.map(Some
),
452 async
fn read_next_header_or_eof(&mut self) -> io
::Result
<Option
<()>> {
454 std
::slice
::from_raw_parts_mut(
455 &mut self.current_header
as *mut Header
as *mut u8,
456 size_of_val(&self.current_header
),
460 match seq_read_exact_or_eof(&mut self.input
, dest
).await?
{
462 self.current_header
.check_header_size()?
;
469 /// Read the next item, the header is already loaded.
470 async
fn read_current_item(&mut self) -> io
::Result
<ItemResult
> {
471 match self.current_header
.htype
{
472 format
::PXAR_XATTR
=> {
473 let xattr
= self.read_xattr().await?
;
474 self.entry
.metadata
.xattrs
.push(xattr
);
476 format
::PXAR_ACL_USER
=> {
477 let entry
= self.read_acl_user().await?
;
478 self.entry
.metadata
.acl
.users
.push(entry
);
480 format
::PXAR_ACL_GROUP
=> {
481 let entry
= self.read_acl_group().await?
;
482 self.entry
.metadata
.acl
.groups
.push(entry
);
484 format
::PXAR_ACL_GROUP_OBJ
=> {
485 if self.entry
.metadata
.acl
.group_obj
.is_some() {
486 io_bail
!("multiple acl group object entries detected");
488 let entry
= self.read_acl_group_object().await?
;
489 self.entry
.metadata
.acl
.group_obj
= Some(entry
);
491 format
::PXAR_ACL_DEFAULT
=> {
492 if self.entry
.metadata
.acl
.default.is_some() {
493 io_bail
!("multiple acl default entries detected");
495 let entry
= self.read_acl_default().await?
;
496 self.entry
.metadata
.acl
.default = Some(entry
);
498 format
::PXAR_ACL_DEFAULT_USER
=> {
499 let entry
= self.read_acl_user().await?
;
500 self.entry
.metadata
.acl
.default_users
.push(entry
);
502 format
::PXAR_ACL_DEFAULT_GROUP
=> {
503 let entry
= self.read_acl_group().await?
;
504 self.entry
.metadata
.acl
.default_groups
.push(entry
);
506 format
::PXAR_FCAPS
=> {
507 if self.entry
.metadata
.fcaps
.is_some() {
508 io_bail
!("multiple file capability entries detected");
510 let entry
= self.read_fcaps().await?
;
511 self.entry
.metadata
.fcaps
= Some(entry
);
513 format
::PXAR_QUOTA_PROJID
=> {
514 if self.entry
.metadata
.quota_project_id
.is_some() {
515 io_bail
!("multiple quota project id entries detected");
517 let entry
= self.read_quota_project_id().await?
;
518 self.entry
.metadata
.quota_project_id
= Some(entry
);
520 format
::PXAR_SYMLINK
=> {
521 self.entry
.kind
= EntryKind
::Symlink(self.read_symlink().await?
);
522 return Ok(ItemResult
::Entry
);
524 format
::PXAR_HARDLINK
=> io_bail
!("encountered unexpected hardlink entry"),
525 format
::PXAR_DEVICE
=> {
526 self.entry
.kind
= EntryKind
::Device(self.read_device().await?
);
527 return Ok(ItemResult
::Entry
);
529 format
::PXAR_PAYLOAD
=> {
530 let offset
= seq_read_position(&mut self.input
).await
.transpose()?
;
531 self.entry
.kind
= EntryKind
::File
{
532 size
: self.current_header
.content_size(),
535 self.state
= State
::InPayload { offset: 0 }
;
536 return Ok(ItemResult
::Entry
);
538 format
::PXAR_FILENAME
| format
::PXAR_GOODBYE
=> {
539 if self.entry
.metadata
.is_fifo() {
540 self.state
= State
::InSpecialFile
;
541 self.entry
.kind
= EntryKind
::Fifo
;
542 } else if self.entry
.metadata
.is_socket() {
543 self.state
= State
::InSpecialFile
;
544 self.entry
.kind
= EntryKind
::Socket
;
546 // As a shortcut this is copy-pasted to `next_do`'s `InSpecialFile` case.
547 // Keep in mind when editing this!
548 self.state
= State
::InDirectory
;
549 self.entry
.kind
= EntryKind
::Directory
;
551 return Ok(ItemResult
::Entry
);
553 _
=> io_bail
!("unexpected entry type: {}", self.current_header
),
556 Ok(ItemResult
::Attribute
)
560 // Local read helpers.
562 // These utilize additional information and hence are not part of the `dyn SeqRead` impl.
565 async
fn skip_entry(&mut self, offset
: u64) -> io
::Result
<()> {
566 let mut len
= self.current_header
.content_size() - offset
;
567 let scratch
= scratch_buffer();
568 while len
>= (scratch
.len() as u64) {
569 seq_read_exact(&mut self.input
, scratch
).await?
;
570 len
-= scratch
.len() as u64;
572 let len
= len
as usize;
574 seq_read_exact(&mut self.input
, &mut scratch
[..len
]).await?
;
579 async
fn read_entry_as_bytes(&mut self) -> io
::Result
<Vec
<u8>> {
580 let size
= usize::try_from(self.current_header
.content_size()).map_err(io_err_other
)?
;
581 let data
= seq_read_exact_data(&mut self.input
, size
).await?
;
585 /// Helper to read a struct entry while checking its size.
586 async
fn read_simple_entry
<T
: Endian
+ '
static>(
590 if self.current_header
.content_size() != (size_of
::<T
>() as u64) {
592 "bad {} size: {} (expected {})",
594 self.current_header
.content_size(),
598 seq_read_entry(&mut self.input
).await
602 // Read functions for PXAR components.
605 async
fn read_xattr(&mut self) -> io
::Result
<format
::XAttr
> {
606 let data
= self.read_entry_as_bytes().await?
;
610 .position(|c
| *c
== 0)
611 .ok_or_else(|| io_format_err
!("missing value separator in xattr"))?
;
613 Ok(format
::XAttr { data, name_len }
)
616 async
fn read_symlink(&mut self) -> io
::Result
<format
::Symlink
> {
617 let data
= self.read_entry_as_bytes().await?
;
618 Ok(format
::Symlink { data }
)
621 async
fn read_hardlink(&mut self) -> io
::Result
<format
::Hardlink
> {
623 usize::try_from(self.current_header
.content_size()).map_err(io_err_other
)?
;
625 if content_size
<= size_of
::<u64>() {
626 io_bail
!("bad hardlink entry (too small)");
628 let data_size
= content_size
- size_of
::<u64>();
630 let offset
: u64 = seq_read_entry(&mut self.input
).await?
;
631 let data
= seq_read_exact_data(&mut self.input
, data_size
).await?
;
633 Ok(format
::Hardlink { offset, data }
)
636 async
fn read_device(&mut self) -> io
::Result
<format
::Device
> {
637 self.read_simple_entry("device").await
640 async
fn read_fcaps(&mut self) -> io
::Result
<format
::FCaps
> {
641 let data
= self.read_entry_as_bytes().await?
;
642 Ok(format
::FCaps { data }
)
645 async
fn read_acl_user(&mut self) -> io
::Result
<format
::acl
::User
> {
646 self.read_simple_entry("acl user").await
649 async
fn read_acl_group(&mut self) -> io
::Result
<format
::acl
::Group
> {
650 self.read_simple_entry("acl group").await
653 async
fn read_acl_group_object(&mut self) -> io
::Result
<format
::acl
::GroupObject
> {
654 self.read_simple_entry("acl group object").await
657 async
fn read_acl_default(&mut self) -> io
::Result
<format
::acl
::Default
> {
658 self.read_simple_entry("acl default").await
661 async
fn read_quota_project_id(&mut self) -> io
::Result
<format
::QuotaProjectId
> {
662 self.read_simple_entry("quota project id").await
666 /// Reader for file contents inside a pxar archive.
667 pub struct Contents
<'a
, T
: SeqRead
> {
673 impl<'a
, T
: SeqRead
> Contents
<'a
, T
> {
674 fn new(input
: &'a
mut T
, at
: &'a
mut u64, len
: u64) -> Self {
675 Self { input, at, len }
679 fn remaining(&self) -> u64 {
684 impl<'a
, T
: SeqRead
> SeqRead
for Contents
<'a
, T
> {
686 mut self: Pin
<&mut Self>,
689 ) -> Poll
<io
::Result
<usize>> {
690 let max_read
= (buf
.len() as u64).min(self.remaining()) as usize;
692 return Poll
::Ready(Ok(0));
695 let buf
= &mut buf
[..max_read
];
696 let got
= ready
!(unsafe { Pin::new_unchecked(&mut *self.input) }
.poll_seq_read(cx
, buf
))?
;
697 *self.at
+= got
as u64;
701 fn poll_position(mut self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Option
<io
::Result
<u64>>> {
702 unsafe { Pin::new_unchecked(&mut *self.input) }
.poll_position(cx
)