]> git.proxmox.com Git - pxar.git/blob - src/decoder/mod.rs
b848f52e56b45cca028f798e66a3376e4d39f53a
[pxar.git] / src / decoder / mod.rs
1 //! The `pxar` decoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 #![deny(missing_docs)]
6
7 use std::ffi::OsString;
8 use std::io;
9 use std::mem::{self, size_of, size_of_val, MaybeUninit};
10 use std::os::unix::ffi::{OsStrExt, OsStringExt};
11 use std::path::{Path, PathBuf};
12 use std::pin::Pin;
13 use std::task::{Context, Poll};
14
15 //use std::os::unix::fs::FileExt;
16
17 use endian_trait::Endian;
18
19 use crate::format::{self, Header};
20 use crate::poll_fn::poll_fn;
21 use crate::util::{self, io_err_other};
22 use crate::{Entry, EntryKind, Metadata};
23
24 pub mod aio;
25 pub mod sync;
26
27 #[doc(inline)]
28 pub use sync::Decoder;
29
/// Scratch space used to skip through non-seekable files.
///
/// The bytes read into it are thrown away, so the contents are irrelevant. It is
/// zero-initialized (rather than left uninitialized) so that handing it out as a `&mut [u8]`
/// never exposes uninitialized memory to a reader implementation.
static mut SCRATCH_BUFFER: [u8; 4096] = [0u8; 4096];

/// Get the shared scratch buffer as a byte slice.
///
/// NOTE(review): every caller receives a reference to the *same* static buffer. This is only
/// tolerable because the data written to it is never looked at again; a per-decoder buffer
/// would avoid handing out aliasing mutable references altogether.
fn scratch_buffer() -> &'static mut [u8] {
    // SAFETY: the pointer is derived from a live, fully initialized static, so it is valid and
    // aligned for the whole array. Going through `addr_of_mut!` avoids taking a reference to
    // the `static mut` itself.
    let buffer: &'static mut [u8; 4096] = unsafe { &mut *std::ptr::addr_of_mut!(SCRATCH_BUFFER) };
    buffer
}
36
/// Sequential read interface used by the decoder's state machine.
///
/// To simply iterate through a directory we just need the equivalent of `poll_read()`.
///
/// In addition there is a `poll_position()` method which can be implemented for types supporting
/// `Seek` or `AsyncSeek`. In this case the starting position of each entry becomes available
/// (accessible via `Entry::offset()`), to allow jumping between entries.
pub trait SeqRead {
    /// Mostly we want to read sequentially, so this is basically an `AsyncRead` equivalent.
    ///
    /// Reads into `buf` and yields the number of bytes read. The helpers in this module treat a
    /// result of `0` as end of file.
    fn poll_seq_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;

    /// While going through the data we may want to take notes about some offsets within the file
    /// for later. If the reader does not support seeking or positional reading, this can just
    /// return `None`.
    ///
    /// The default implementation always yields `None`.
    fn poll_position(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<io::Result<u64>>> {
        Poll::Ready(None)
    }
}
59
60 /// Allow using trait objects for generics taking a `SeqRead`:
61 impl<'a> SeqRead for &mut (dyn SeqRead + 'a) {
62 fn poll_seq_read(
63 self: Pin<&mut Self>,
64 cx: &mut Context,
65 buf: &mut [u8],
66 ) -> Poll<io::Result<usize>> {
67 unsafe {
68 self.map_unchecked_mut(|this| &mut **this)
69 .poll_seq_read(cx, buf)
70 }
71 }
72
73 fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<io::Result<u64>>> {
74 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) }
75 }
76 }
77
78 /// awaitable version of `poll_position`.
79 async fn seq_read_position<T: SeqRead + ?Sized>(input: &mut T) -> Option<io::Result<u64>> {
80 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *input).poll_position(cx) }).await
81 }
82
83 /// awaitable version of `poll_seq_read`.
84 pub(crate) async fn seq_read<T: SeqRead + ?Sized>(
85 input: &mut T,
86 buf: &mut [u8],
87 ) -> io::Result<usize> {
88 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *input).poll_seq_read(cx, buf) }).await
89 }
90
91 /// `read_exact` - since that's what we _actually_ want most of the time, but with EOF handling
92 async fn seq_read_exact_or_eof<T>(input: &mut T, mut buf: &mut [u8]) -> io::Result<Option<()>>
93 where
94 T: SeqRead + ?Sized,
95 {
96 let mut eof_ok = true;
97 while !buf.is_empty() {
98 match seq_read(&mut *input, buf).await? {
99 0 if eof_ok => return Ok(None),
100 0 => io_bail!("unexpected EOF"),
101 got => buf = &mut buf[got..],
102 }
103 eof_ok = false;
104 }
105 Ok(Some(()))
106 }
107
108 /// `read_exact` - since that's what we _actually_ want most of the time.
109 async fn seq_read_exact<T: SeqRead + ?Sized>(input: &mut T, buf: &mut [u8]) -> io::Result<()> {
110 match seq_read_exact_or_eof(input, buf).await? {
111 Some(()) => Ok(()),
112 None => io_bail!("unexpected EOF"),
113 }
114 }
115
116 /// Helper to read into an allocated byte vector.
117 async fn seq_read_exact_data<T>(input: &mut T, size: usize) -> io::Result<Vec<u8>>
118 where
119 T: SeqRead + ?Sized,
120 {
121 let mut data = unsafe { util::vec_new_uninitialized(size) };
122 seq_read_exact(input, &mut data[..]).await?;
123 Ok(data)
124 }
125
126 /// `seq_read_entry` with EOF handling
127 async fn seq_read_entry_or_eof<T, E>(input: &mut T) -> io::Result<Option<E>>
128 where
129 T: SeqRead + ?Sized,
130 E: Endian,
131 {
132 let mut data = MaybeUninit::<E>::uninit();
133 let buf =
134 unsafe { std::slice::from_raw_parts_mut(data.as_mut_ptr() as *mut u8, size_of::<E>()) };
135 if seq_read_exact_or_eof(input, buf).await?.is_none() {
136 return Ok(None);
137 }
138 Ok(Some(unsafe { data.assume_init().from_le() }))
139 }
140
141 /// Helper to read into an `Endian`-implementing `struct`.
142 async fn seq_read_entry<T: SeqRead + ?Sized, E: Endian>(input: &mut T) -> io::Result<E> {
143 seq_read_entry_or_eof(input)
144 .await?
145 .ok_or_else(|| io_format_err!("unexpected EOF"))
146 }
147
/// The decoder state machine implementation.
///
/// We use `async fn` to implement the decoder state machine so that we can easily plug in
/// either synchronous or `async` I/O objects as input.
pub(crate) struct DecoderImpl<T> {
    /// The reader the archive is decoded from.
    pub(crate) input: T,

    /// Header of the item which was read most recently (zeroed while none is loaded).
    current_header: Header,

    /// The entry currently being assembled; it is handed out to the caller once complete.
    entry: Entry,

    /// Byte lengths of the paths of the directories we are currently inside of. The last element
    /// is used to truncate `entry.path` back to the current directory.
    path_lengths: Vec<usize>,

    /// Where in the archive structure we currently are.
    state: State,

    /// Whether goodbye tables are returned as entries instead of being skipped silently.
    with_goodbye_tables: bool,

    /// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for
    /// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF.
    eof_after_entry: bool,
}
164
/// Position of the decoder within the archive structure.
enum State {
    /// Nothing has been read yet, the next step is reading the first entry.
    Begin,

    /// The previous entry was consumed completely, the next item header still has to be read.
    Default,

    /// We are inside a `PAYLOAD` item; `offset` counts the payload bytes consumed so far.
    InPayload {
        offset: u64,
    },

    /// file entries with no data (fifo, socket)
    InSpecialFile,

    /// The current header is a `GOODBYE` item whose contents have not been skipped yet.
    InGoodbyeTable,

    /// The current header is the next `FILENAME` or `GOODBYE` item of a directory.
    InDirectory,

    /// The end of the archive has been reached.
    Eof,
}
179
/// Control flow while parsing items.
///
/// When parsing an entry, we usually go through all of its attribute items. Once we reach the end
/// of the entry we stop.
/// Note that if we're in a directory, we stop at the beginning of its contents.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum ItemResult {
    /// We parsed an "attribute" item and should continue parsing.
    Attribute,

    /// We finished an entry (`SYMLINK`, `HARDLINK`, ...) or just entered the contents of a
    /// directory (`FILENAME`, `GOODBYE`).
    ///
    /// We stop moving forward at this point.
    Entry,
}
196
197 impl<I: SeqRead> DecoderImpl<I> {
198 pub async fn new(input: I) -> io::Result<Self> {
199 Self::new_full(input, "/".into(), false).await
200 }
201
/// Borrow the reader this decoder reads from.
pub(crate) fn input(&self) -> &I {
    &self.input
}
205
206 pub(crate) async fn new_full(
207 input: I,
208 path: PathBuf,
209 eof_after_entry: bool,
210 ) -> io::Result<Self> {
211 let this = DecoderImpl {
212 input,
213 current_header: unsafe { mem::zeroed() },
214 entry: Entry {
215 path,
216 kind: EntryKind::GoodbyeTable,
217 metadata: Metadata::default(),
218 },
219 path_lengths: Vec::new(),
220 state: State::Begin,
221 with_goodbye_tables: false,
222 eof_after_entry,
223 };
224
225 // this.read_next_entry().await?;
226
227 Ok(this)
228 }
229
230 /// Get the next file entry, recursing into directories.
231 pub async fn next(&mut self) -> Option<io::Result<Entry>> {
232 self.next_do().await.transpose()
233 }
234
235 async fn next_do(&mut self) -> io::Result<Option<Entry>> {
236 loop {
237 match self.state {
238 State::Eof => return Ok(None),
239 State::Begin => return self.read_next_entry().await.map(Some),
240 State::Default => {
241 // we completely finished an entry, so now we're going "up" in the directory
242 // hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE:
243 self.read_next_item().await?;
244 }
245 State::InPayload { offset } => {
246 // We need to skip the current payload first.
247 self.skip_entry(offset).await?;
248 self.read_next_item().await?;
249 }
250 State::InGoodbyeTable => {
251 self.skip_entry(0).await?;
252 if self.path_lengths.pop().is_none() {
253 // The root directory has an entry containing '1'.
254 io_bail!("unexpected EOF in goodbye table");
255 }
256
257 if self.path_lengths.is_empty() {
258 // we are at the end of the archive now
259 self.state = State::Eof;
260 return Ok(None);
261 }
262
263 // We left the directory, now keep going in our parent.
264 self.state = State::Default;
265 continue;
266 }
267 State::InSpecialFile => {
268 self.entry.clear_data();
269 self.state = State::InDirectory;
270 self.entry.kind = EntryKind::Directory;
271 }
272 State::InDirectory => {
273 // We're at the next FILENAME or GOODBYE item.
274 }
275 }
276
277 match self.current_header.htype {
278 format::PXAR_FILENAME => return self.handle_file_entry().await,
279 format::PXAR_GOODBYE => {
280 self.state = State::InGoodbyeTable;
281
282 if self.with_goodbye_tables {
283 self.entry.clear_data();
284 return Ok(Some(Entry {
285 path: PathBuf::new(),
286 metadata: Metadata::default(),
287 kind: EntryKind::GoodbyeTable,
288 }));
289 } else {
290 // go up to goodbye table handling
291 continue;
292 }
293 }
294 _ => io_bail!(
295 "expected filename or directory-goodbye pxar entry, got: {}",
296 self.current_header,
297 ),
298 }
299 }
300 }
301
302 pub fn content_size(&self) -> Option<u64> {
303 if let State::InPayload { .. } = self.state {
304 Some(self.current_header.content_size())
305 } else {
306 None
307 }
308 }
309
310 pub fn content_reader(&mut self) -> Option<Contents<I>> {
311 if let State::InPayload { offset } = &mut self.state {
312 Some(Contents::new(
313 &mut self.input,
314 offset,
315 self.current_header.content_size(),
316 ))
317 } else {
318 None
319 }
320 }
321
322 async fn handle_file_entry(&mut self) -> io::Result<Option<Entry>> {
323 let mut data = self.read_entry_as_bytes().await?;
324
325 // filenames are zero terminated!
326 if data.pop() != Some(0) {
327 io_bail!("illegal path found (missing terminating zero)");
328 }
329
330 crate::util::validate_filename(&data)?;
331
332 let path = PathBuf::from(OsString::from_vec(data));
333 self.set_path(&path)?;
334 self.read_next_entry().await.map(Some)
335 }
336
337 fn reset_path(&mut self) -> io::Result<()> {
338 let path_len = *self
339 .path_lengths
340 .last()
341 .ok_or_else(|| io_format_err!("internal decoder error: path underrun"))?;
342 let mut path = mem::replace(&mut self.entry.path, PathBuf::new())
343 .into_os_string()
344 .into_vec();
345 path.truncate(path_len);
346 self.entry.path = PathBuf::from(OsString::from_vec(path));
347 Ok(())
348 }
349
350 fn set_path(&mut self, path: &Path) -> io::Result<()> {
351 self.reset_path()?;
352 self.entry.path.push(path);
353 Ok(())
354 }
355
356 async fn read_next_entry_or_eof(&mut self) -> io::Result<Option<Entry>> {
357 self.state = State::Default;
358 self.entry.clear_data();
359
360 let header: Header = match seq_read_entry_or_eof(&mut self.input).await? {
361 None => return Ok(None),
362 Some(header) => header,
363 };
364
365 header.check_header_size()?;
366
367 if header.htype == format::PXAR_HARDLINK {
368 // The only "dangling" header without an 'Entry' in front of it because it does not
369 // carry its own metadata.
370 self.current_header = header;
371
372 // Hardlinks have no metadata and no additional items.
373 self.entry.metadata = Metadata::default();
374 self.entry.kind = EntryKind::Hardlink(self.read_hardlink().await?);
375
376 Ok(Some(self.entry.take()))
377 } else if header.htype == format::PXAR_ENTRY || header.htype == format::PXAR_ENTRY_V1 {
378 if header.htype == format::PXAR_ENTRY {
379 self.entry.metadata = Metadata {
380 stat: seq_read_entry(&mut self.input).await?,
381 ..Default::default()
382 };
383 } else if header.htype == format::PXAR_ENTRY_V1 {
384 let stat: format::Stat_V1 = seq_read_entry(&mut self.input).await?;
385
386 self.entry.metadata = Metadata {
387 stat: stat.into(),
388 ..Default::default()
389 };
390 } else {
391 unreachable!();
392 }
393
394 self.current_header = unsafe { mem::zeroed() };
395
396 loop {
397 match self.read_next_item_or_eof().await? {
398 Some(ItemResult::Entry) => break,
399 Some(ItemResult::Attribute) => continue,
400 None if self.eof_after_entry => break,
401 None => io_bail!("unexpected EOF in entry"),
402 }
403 }
404
405 if self.entry.is_dir() {
406 self.path_lengths
407 .push(self.entry.path.as_os_str().as_bytes().len());
408 }
409
410 Ok(Some(self.entry.take()))
411 } else {
412 io_bail!("expected pxar entry of type 'Entry', got: {}", header,);
413 }
414 }
415
416 async fn read_next_entry(&mut self) -> io::Result<Entry> {
417 self.read_next_entry_or_eof()
418 .await?
419 .ok_or_else(|| io_format_err!("unexpected EOF"))
420 }
421
422 async fn read_next_item(&mut self) -> io::Result<ItemResult> {
423 match self.read_next_item_or_eof().await? {
424 Some(item) => Ok(item),
425 None => io_bail!("unexpected EOF"),
426 }
427 }
428
429 // NOTE: The random accessor will decode FIFOs and Sockets in a decoder instance with a ranged
430 // reader so there is no PAYLOAD or GOODBYE TABLE to "end" an entry.
431 //
432 // NOTE: This behavior method is also recreated in the accessor's `get_decoder_at_filename`
433 // function! Keep in mind when changing!
434 async fn read_next_item_or_eof(&mut self) -> io::Result<Option<ItemResult>> {
435 match self.read_next_header_or_eof().await? {
436 Some(()) => self.read_current_item().await.map(Some),
437 None => Ok(None),
438 }
439 }
440
441 async fn read_next_header_or_eof(&mut self) -> io::Result<Option<()>> {
442 let dest = unsafe {
443 std::slice::from_raw_parts_mut(
444 &mut self.current_header as *mut Header as *mut u8,
445 size_of_val(&self.current_header),
446 )
447 };
448
449 match seq_read_exact_or_eof(&mut self.input, dest).await? {
450 Some(()) => {
451 self.current_header.check_header_size()?;
452 Ok(Some(()))
453 }
454 None => Ok(None),
455 }
456 }
457
458 /// Read the next item, the header is already loaded.
459 async fn read_current_item(&mut self) -> io::Result<ItemResult> {
460 match self.current_header.htype {
461 format::PXAR_XATTR => {
462 let xattr = self.read_xattr().await?;
463 self.entry.metadata.xattrs.push(xattr);
464 }
465 format::PXAR_ACL_USER => {
466 let entry = self.read_acl_user().await?;
467 self.entry.metadata.acl.users.push(entry);
468 }
469 format::PXAR_ACL_GROUP => {
470 let entry = self.read_acl_group().await?;
471 self.entry.metadata.acl.groups.push(entry);
472 }
473 format::PXAR_ACL_GROUP_OBJ => {
474 if self.entry.metadata.acl.group_obj.is_some() {
475 io_bail!("multiple acl group object entries detected");
476 }
477 let entry = self.read_acl_group_object().await?;
478 self.entry.metadata.acl.group_obj = Some(entry);
479 }
480 format::PXAR_ACL_DEFAULT => {
481 if self.entry.metadata.acl.default.is_some() {
482 io_bail!("multiple acl default entries detected");
483 }
484 let entry = self.read_acl_default().await?;
485 self.entry.metadata.acl.default = Some(entry);
486 }
487 format::PXAR_ACL_DEFAULT_USER => {
488 let entry = self.read_acl_user().await?;
489 self.entry.metadata.acl.default_users.push(entry);
490 }
491 format::PXAR_ACL_DEFAULT_GROUP => {
492 let entry = self.read_acl_group().await?;
493 self.entry.metadata.acl.default_groups.push(entry);
494 }
495 format::PXAR_FCAPS => {
496 if self.entry.metadata.fcaps.is_some() {
497 io_bail!("multiple file capability entries detected");
498 }
499 let entry = self.read_fcaps().await?;
500 self.entry.metadata.fcaps = Some(entry);
501 }
502 format::PXAR_QUOTA_PROJID => {
503 if self.entry.metadata.quota_project_id.is_some() {
504 io_bail!("multiple quota project id entries detected");
505 }
506 let entry = self.read_quota_project_id().await?;
507 self.entry.metadata.quota_project_id = Some(entry);
508 }
509 format::PXAR_SYMLINK => {
510 self.entry.kind = EntryKind::Symlink(self.read_symlink().await?);
511 return Ok(ItemResult::Entry);
512 }
513 format::PXAR_HARDLINK => io_bail!("encountered unexpected hardlink entry"),
514 format::PXAR_DEVICE => {
515 self.entry.kind = EntryKind::Device(self.read_device().await?);
516 return Ok(ItemResult::Entry);
517 }
518 format::PXAR_PAYLOAD => {
519 let offset = seq_read_position(&mut self.input).await.transpose()?;
520 self.entry.kind = EntryKind::File {
521 size: self.current_header.content_size(),
522 offset,
523 };
524 self.state = State::InPayload { offset: 0 };
525 return Ok(ItemResult::Entry);
526 }
527 format::PXAR_FILENAME | format::PXAR_GOODBYE => {
528 if self.entry.metadata.is_fifo() {
529 self.state = State::InSpecialFile;
530 self.entry.kind = EntryKind::Fifo;
531 } else if self.entry.metadata.is_socket() {
532 self.state = State::InSpecialFile;
533 self.entry.kind = EntryKind::Socket;
534 } else {
535 // As a shortcut this is copy-pasted to `next_do`'s `InSpecialFile` case.
536 // Keep in mind when editing this!
537 self.state = State::InDirectory;
538 self.entry.kind = EntryKind::Directory;
539 }
540 return Ok(ItemResult::Entry);
541 }
542 _ => io_bail!("unexpected entry type: {}", self.current_header),
543 }
544
545 Ok(ItemResult::Attribute)
546 }
547
548 //
549 // Local read helpers.
550 //
551 // These utilize additional information and hence are not part of the `dyn SeqRead` impl.
552 //
553
554 async fn skip_entry(&mut self, offset: u64) -> io::Result<()> {
555 let mut len = self.current_header.content_size() - offset;
556 let scratch = scratch_buffer();
557 while len >= (scratch.len() as u64) {
558 seq_read_exact(&mut self.input, scratch).await?;
559 len -= scratch.len() as u64;
560 }
561 let len = len as usize;
562 if len > 0 {
563 seq_read_exact(&mut self.input, &mut scratch[..len]).await?;
564 }
565 Ok(())
566 }
567
568 async fn read_entry_as_bytes(&mut self) -> io::Result<Vec<u8>> {
569 let size = usize::try_from(self.current_header.content_size()).map_err(io_err_other)?;
570 let data = seq_read_exact_data(&mut self.input, size).await?;
571 Ok(data)
572 }
573
574 /// Helper to read a struct entry while checking its size.
575 async fn read_simple_entry<T: Endian + 'static>(
576 &mut self,
577 what: &'static str,
578 ) -> io::Result<T> {
579 if self.current_header.content_size() != (size_of::<T>() as u64) {
580 io_bail!(
581 "bad {} size: {} (expected {})",
582 what,
583 self.current_header.content_size(),
584 size_of::<T>(),
585 );
586 }
587 seq_read_entry(&mut self.input).await
588 }
589
590 //
591 // Read functions for PXAR components.
592 //
593
594 async fn read_xattr(&mut self) -> io::Result<format::XAttr> {
595 let data = self.read_entry_as_bytes().await?;
596
597 let name_len = data
598 .iter()
599 .position(|c| *c == 0)
600 .ok_or_else(|| io_format_err!("missing value separator in xattr"))?;
601
602 Ok(format::XAttr { data, name_len })
603 }
604
605 async fn read_symlink(&mut self) -> io::Result<format::Symlink> {
606 let data = self.read_entry_as_bytes().await?;
607 Ok(format::Symlink { data })
608 }
609
610 async fn read_hardlink(&mut self) -> io::Result<format::Hardlink> {
611 let content_size =
612 usize::try_from(self.current_header.content_size()).map_err(io_err_other)?;
613
614 if content_size <= size_of::<u64>() {
615 io_bail!("bad hardlink entry (too small)");
616 }
617 let data_size = content_size - size_of::<u64>();
618
619 let offset: u64 = seq_read_entry(&mut self.input).await?;
620 let data = seq_read_exact_data(&mut self.input, data_size).await?;
621
622 Ok(format::Hardlink { offset, data })
623 }
624
625 async fn read_device(&mut self) -> io::Result<format::Device> {
626 self.read_simple_entry("device").await
627 }
628
629 async fn read_fcaps(&mut self) -> io::Result<format::FCaps> {
630 let data = self.read_entry_as_bytes().await?;
631 Ok(format::FCaps { data })
632 }
633
634 async fn read_acl_user(&mut self) -> io::Result<format::acl::User> {
635 self.read_simple_entry("acl user").await
636 }
637
638 async fn read_acl_group(&mut self) -> io::Result<format::acl::Group> {
639 self.read_simple_entry("acl group").await
640 }
641
642 async fn read_acl_group_object(&mut self) -> io::Result<format::acl::GroupObject> {
643 self.read_simple_entry("acl group object").await
644 }
645
646 async fn read_acl_default(&mut self) -> io::Result<format::acl::Default> {
647 self.read_simple_entry("acl default").await
648 }
649
650 async fn read_quota_project_id(&mut self) -> io::Result<format::QuotaProjectId> {
651 self.read_simple_entry("quota project id").await
652 }
653 }
654
/// Reader for file contents inside a pxar archive.
///
/// Reads are limited to the `len - at` bytes which are left of the contents.
pub struct Contents<'a, T: SeqRead> {
    /// The reader the contents are read from.
    input: &'a mut T,
    /// Number of content bytes consumed so far; advanced by every successful read.
    at: &'a mut u64,
    /// Total length of the contents in bytes.
    len: u64,
}
661
662 impl<'a, T: SeqRead> Contents<'a, T> {
663 fn new(input: &'a mut T, at: &'a mut u64, len: u64) -> Self {
664 Self { input, at, len }
665 }
666
667 #[inline]
668 fn remaining(&self) -> u64 {
669 self.len - *self.at
670 }
671 }
672
673 impl<'a, T: SeqRead> SeqRead for Contents<'a, T> {
674 fn poll_seq_read(
675 mut self: Pin<&mut Self>,
676 cx: &mut Context,
677 buf: &mut [u8],
678 ) -> Poll<io::Result<usize>> {
679 let max_read = (buf.len() as u64).min(self.remaining()) as usize;
680 if max_read == 0 {
681 return Poll::Ready(Ok(0));
682 }
683
684 let buf = &mut buf[..max_read];
685 let got = ready!(unsafe { Pin::new_unchecked(&mut *self.input) }.poll_seq_read(cx, buf))?;
686 *self.at += got as u64;
687 Poll::Ready(Ok(got))
688 }
689
690 fn poll_position(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<io::Result<u64>>> {
691 unsafe { Pin::new_unchecked(&mut *self.input) }.poll_position(cx)
692 }
693 }