]> git.proxmox.com Git - pxar.git/blob - src/decoder.rs
import
[pxar.git] / src / decoder.rs
1 //! The `pxar` decoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 use std::convert::TryFrom;
6 use std::ffi::OsString;
7 use std::io;
8 use std::mem::{self, size_of, size_of_val, MaybeUninit};
9 use std::os::unix::ffi::{OsStrExt, OsStringExt};
10 use std::path::{Path, PathBuf};
11 use std::pin::Pin;
12 use std::task::{Context, Poll};
13
14 //use std::os::unix::fs::FileExt;
15
16 use endian_trait::Endian;
17
18 use crate::format::{self, Header};
19 use crate::poll_fn::poll_fn;
20 use crate::util::{self, io_err_other};
21 use crate::{Entry, EntryKind, Metadata};
22
23 pub mod aio;
24 pub mod sync;
25
26 #[doc(inline)]
27 pub use sync::Decoder;
28
/// Scratch space used to skip through non-seekable files.
///
/// The contents are never interpreted: data that should be skipped is read into this buffer and
/// then discarded. The buffer is zero-initialized so that handing it out as a `&mut [u8]` never
/// exposes uninitialized memory to a reader implementation.
static mut SCRATCH_BUFFER: [u8; 4096] = [0u8; 4096];

/// Get a mutable view of the whole global scratch buffer.
///
/// NOTE(review): every caller receives a reference to the same static memory. Since the bytes
/// are only ever written and then ignored this is not a problem for the resulting data, but
/// callers must not rely on the contents in any way.
fn scratch_buffer() -> &'static mut [u8] {
    // SAFETY: the static is fully initialized (zeroed) and lives for the whole program; the
    // contents are treated as write-only garbage by all users.
    unsafe { &mut SCRATCH_BUFFER[..] }
}
35
/// Sequential read interface driving the decoder's state machine.
///
/// Walking through an archive front to back only requires something equivalent to
/// `poll_read()`, which is what `poll_seq_read()` provides.
///
/// In addition there is `poll_position()`, which types supporting `Seek` or `AsyncSeek` can
/// implement. When it is available, the starting position of each entry gets recorded
/// (accessible via `Entry::offset()`), which allows jumping between entries.
pub trait SeqRead {
    /// Read the next bytes of the stream into `buf`.
    ///
    /// Since reading is mostly sequential, this is basically an `AsyncRead` equivalent.
    fn poll_seq_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;

    /// Query the current position within the stream.
    ///
    /// While going through the data the decoder may want to remember some offsets within the
    /// file for later use. Readers which support neither seeking nor positional reads can keep
    /// this default implementation, which always yields `None`.
    fn poll_position(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<io::Result<u64>>> {
        Poll::Ready(None)
    }
}
58
59 /// Allow using trait objects for generics taking a `SeqRead`:
60 impl<'a> SeqRead for &mut (dyn SeqRead + 'a) {
61 fn poll_seq_read(
62 self: Pin<&mut Self>,
63 cx: &mut Context,
64 buf: &mut [u8],
65 ) -> Poll<io::Result<usize>> {
66 unsafe {
67 self.map_unchecked_mut(|this| &mut **this)
68 .poll_seq_read(cx, buf)
69 }
70 }
71
72 fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<io::Result<u64>>> {
73 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) }
74 }
75 }
76
77 /// We do not want to bother with actual polling, so we implement `async fn` variants of the above
78 /// on `dyn SeqRead`.
79 ///
80 /// The reason why this is not an internal `SeqReadExt` trait like `AsyncReadExt` is simply that
81 /// we'd then need to define all the `Future` types they return manually and explicitly. Since we
82 /// have no use for them, all we want is the ability to use `async fn`...
83 ///
84 /// The downside is that we need some `(&mut self.input as &mut dyn SeqRead)` casts in the
85 /// decoder's code, but that's fine.
86 impl<'a> dyn SeqRead + 'a {
87 /// awaitable version of `poll_position`.
88 async fn position(&mut self) -> Option<io::Result<u64>> {
89 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *self).poll_position(cx) }).await
90 }
91
92 /// awaitable version of `poll_seq_read`.
93 async fn seq_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
94 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *self).poll_seq_read(cx, buf) }).await
95 }
96
97 /// `read_exact` - since that's what we _actually_ want most of the time, but with EOF handling
98 async fn seq_read_exact_or_eof(&mut self, mut buf: &mut [u8]) -> io::Result<Option<()>> {
99 let mut eof_ok = true;
100 while !buf.is_empty() {
101 match self.seq_read(buf).await? {
102 0 if eof_ok => break,
103 0 => io_bail!("unexpected EOF"),
104 got => buf = &mut buf[got..],
105 }
106 eof_ok = false;
107 }
108 Ok(Some(()))
109 }
110
111 /// `read_exact` - since that's what we _actually_ want most of the time.
112 async fn seq_read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
113 match self.seq_read_exact_or_eof(buf).await? {
114 Some(()) => Ok(()),
115 None => io_bail!("unexpected eof"),
116 }
117 }
118
119 /// Helper to read into an allocated byte vector.
120 async fn seq_read_exact_data(&mut self, size: usize) -> io::Result<Vec<u8>> {
121 let mut data = util::vec_new(size);
122 self.seq_read_exact(&mut data[..]).await?;
123 Ok(data)
124 }
125
126 /// `seq_read_entry` with EOF handling
127 async fn seq_read_entry_or_eof<T: Endian>(&mut self) -> io::Result<Option<T>> {
128 let mut data = MaybeUninit::<T>::uninit();
129 let buf =
130 unsafe { std::slice::from_raw_parts_mut(data.as_mut_ptr() as *mut u8, size_of::<T>()) };
131 if self.seq_read_exact_or_eof(buf).await?.is_none() {
132 return Ok(None);
133 }
134 Ok(Some(unsafe { data.assume_init().from_le() }))
135 }
136
137 /// Helper to read into an `Endian`-implementing `struct`.
138 async fn seq_read_entry<T: Endian>(&mut self) -> io::Result<T> {
139 self.seq_read_entry_or_eof()
140 .await?
141 .ok_or_else(|| io_format_err!("unexepcted EOF"))
142 }
143 }
144
145 /// The decoder state machine implementation.
146 ///
147 /// We use `async fn` to implement the decoder state machine so that we can easily plug in both
148 /// synchronous or `async` I/O objects in as input.
149 pub struct DecoderImpl<T> {
150 input: T,
151 current_header: Header,
152 entry: Entry,
153 path_lengths: Vec<usize>,
154 state: State,
155 with_goodbye_tables: bool,
156 }
157
158 enum State {
159 Begin,
160 Default,
161 InPayload,
162 InDirectory,
163 Eof,
164 }
165
166 /// Control flow while parsing items.
167 ///
168 /// When parsing an entry, we usually go through all of its attribute items. Once we reach the end
169 /// of the entry we stop.
170 /// Note that if we're in a directory, we stopped at the beginning of its contents.
171 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
172 enum ItemResult {
173 /// We parsed an "attribute" item and should continue parsing.
174 Attribute,
175
176 /// We finished an entry (`SYMLINK`, `HARDLINK`, ...) or just entered the contents of a
177 /// directory (`FILENAME`, `GOODBYE`).
178 ///
179 /// We stop moving forward at this point.
180 Entry,
181 }
182
183 impl<T: SeqRead> DecoderImpl<T> {
184 pub async fn new(input: T) -> io::Result<Self> {
185 Self::new_full(input, "/".into()).await
186 }
187
188 pub(crate) async fn new_full(mut input: T, path: PathBuf) -> io::Result<Self> {
189 let offset = (&mut input as &mut dyn SeqRead)
190 .position()
191 .await
192 .transpose()?;
193 let this = DecoderImpl {
194 input,
195 current_header: unsafe { mem::zeroed() },
196 entry: Entry {
197 path,
198 kind: EntryKind::EndOfDirectory,
199 metadata: Metadata::default(),
200 offset,
201 },
202 path_lengths: Vec::new(),
203 state: State::Begin,
204 with_goodbye_tables: false,
205 };
206
207 // this.read_next_entry().await?;
208
209 Ok(this)
210 }
211
212 /// Get the next file entry, recursing into directories.
213 pub async fn next(&mut self) -> Option<io::Result<Entry>> {
214 self.next_do().await.transpose()
215 }
216
217 pub(crate) async fn next_do(&mut self) -> io::Result<Option<Entry>> {
218 loop {
219 match self.state {
220 State::Eof => return Ok(None),
221 State::Begin => return self.read_next_entry().await.map(Some),
222 State::Default => {
223 // we completely finished an entry, so now we're going "up" in the directory
224 // hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE:
225 self.read_next_item().await?;
226 }
227 State::InPayload => {
228 // We need to skip the current payload first.
229 self.skip_entry().await?;
230 self.read_next_item().await?;
231 }
232 State::InDirectory => {
233 // We're at the next FILENAME or GOODBYE item.
234 }
235 }
236
237 match self.current_header.htype {
238 format::PXAR_FILENAME => return self.handle_file_entry().await,
239 format::PXAR_GOODBYE => {
240 if self.with_goodbye_tables {
241 self.entry.kind = EntryKind::EndOfDirectory;
242 let offset = (&mut self.input as &mut dyn SeqRead)
243 .position()
244 .await
245 .transpose()?;
246 self.entry.offset = offset;
247 self.state = State::InPayload;
248 return Ok(Some(self.entry.take()));
249 }
250
251 self.skip_entry().await?;
252 if self.path_lengths.pop().is_some() {
253 self.state = State::Default;
254 // and move on:
255 continue;
256 } else {
257 self.state = State::Eof;
258 // early out:
259 return Ok(None);
260 }
261 }
262 h => io_bail!(
263 "expected filename or directory-goodbye pxar entry, got: {:x}",
264 h
265 ),
266 }
267 }
268 }
269
270 async fn handle_file_entry(&mut self) -> io::Result<Option<Entry>> {
271 let mut data = self.read_entry_as_bytes().await?;
272
273 // filenames are zero terminated!
274 if data.pop() != Some(0) {
275 io_bail!("illegal path found (missing terminating zero)");
276 }
277 if data.is_empty() {
278 io_bail!("illegal path found (empty)");
279 }
280
281 let path = PathBuf::from(OsString::from_vec(data));
282 self.set_path(&path)?;
283 self.read_next_entry().await.map(Some)
284 }
285
286 fn reset_path(&mut self) -> io::Result<()> {
287 let path_len = *self
288 .path_lengths
289 .last()
290 .ok_or_else(|| io_format_err!("internal decoder error: path underrun"))?;
291 let mut path = mem::replace(&mut self.entry.path, PathBuf::new())
292 .into_os_string()
293 .into_vec();
294 path.truncate(path_len);
295 self.entry.path = PathBuf::from(OsString::from_vec(path));
296 Ok(())
297 }
298
299 fn set_path(&mut self, path: &Path) -> io::Result<()> {
300 self.reset_path()?;
301 self.entry.path.push(path);
302 Ok(())
303 }
304
305 async fn read_next_entry_or_eof(&mut self) -> io::Result<Option<Entry>> {
306 self.state = State::Default;
307 self.entry.clear_data();
308
309 #[derive(Endian)]
310 #[repr(C)]
311 struct WithHeader<U: Endian> {
312 header: Header,
313 data: U,
314 }
315
316 let entry: WithHeader<format::Entry> = {
317 let input: &mut dyn SeqRead = &mut self.input;
318 match input.seq_read_entry_or_eof().await? {
319 None => return Ok(None),
320 Some(entry) => entry,
321 }
322 };
323
324 if entry.header.htype != format::PXAR_ENTRY {
325 io_bail!(
326 "expected pxar entry of type 'Entry', got: {:x}",
327 entry.header.htype
328 );
329 }
330
331 self.current_header = unsafe { mem::zeroed() };
332 self.entry.metadata = Metadata {
333 stat: entry.data,
334 ..Default::default()
335 };
336
337 while self.read_next_item().await? != ItemResult::Entry {}
338
339 if self.entry.is_dir() {
340 self.path_lengths
341 .push(self.entry.path.as_os_str().as_bytes().len());
342 }
343
344 Ok(Some(self.entry.take()))
345 }
346
347 async fn read_next_entry(&mut self) -> io::Result<Entry> {
348 self.read_next_entry_or_eof()
349 .await?
350 .ok_or_else(|| io_format_err!("unexpected EOF"))
351 }
352
353 async fn read_next_item(&mut self) -> io::Result<ItemResult> {
354 self.read_next_header().await?;
355 self.read_current_item().await
356 }
357
358 async fn read_next_header(&mut self) -> io::Result<()> {
359 let dest = unsafe {
360 std::slice::from_raw_parts_mut(
361 &mut self.current_header as *mut Header as *mut u8,
362 size_of_val(&self.current_header),
363 )
364 };
365 (&mut self.input as &mut dyn SeqRead)
366 .seq_read_exact(dest)
367 .await?;
368 Ok(())
369 }
370
371 /// Read the next item, the header is already loaded.
372 async fn read_current_item(&mut self) -> io::Result<ItemResult> {
373 match self.current_header.htype {
374 format::PXAR_XATTR => {
375 let xattr = self.read_xattr().await?;
376 self.entry.metadata.xattrs.push(xattr);
377 }
378 format::PXAR_ACL_USER => {
379 let entry = self.read_acl_user().await?;
380 self.entry.metadata.acl.users.push(entry);
381 }
382 format::PXAR_ACL_GROUP => {
383 let entry = self.read_acl_group().await?;
384 self.entry.metadata.acl.groups.push(entry);
385 }
386 format::PXAR_ACL_GROUP_OBJ => {
387 if self.entry.metadata.acl.group_obj.is_some() {
388 io_bail!("multiple acl group object entries detected");
389 }
390 let entry = self.read_acl_group_object().await?;
391 self.entry.metadata.acl.group_obj = Some(entry);
392 }
393 format::PXAR_ACL_DEFAULT => {
394 if self.entry.metadata.acl.default.is_some() {
395 io_bail!("multiple acl default entries detected");
396 }
397 let entry = self.read_acl_default().await?;
398 self.entry.metadata.acl.default = Some(entry);
399 }
400 format::PXAR_ACL_DEFAULT_USER => {
401 let entry = self.read_acl_user().await?;
402 self.entry.metadata.acl.default_users.push(entry);
403 }
404 format::PXAR_ACL_DEFAULT_GROUP => {
405 let entry = self.read_acl_group().await?;
406 self.entry.metadata.acl.default_groups.push(entry);
407 }
408 format::PXAR_FCAPS => {
409 if self.entry.metadata.fcaps.is_some() {
410 io_bail!("multiple file capability entries detected");
411 }
412 let entry = self.read_fcaps().await?;
413 self.entry.metadata.fcaps = Some(entry);
414 }
415 format::PXAR_QUOTA_PROJID => {
416 if self.entry.metadata.quota_project_id.is_some() {
417 io_bail!("multiple quota project id entries detected");
418 }
419 let entry = self.read_quota_project_id().await?;
420 self.entry.metadata.quota_project_id = Some(entry);
421 }
422 format::PXAR_SYMLINK => {
423 self.entry.kind = EntryKind::Symlink(self.read_symlink().await?);
424 return Ok(ItemResult::Entry);
425 }
426 format::PXAR_HARDLINK => {
427 self.entry.kind = EntryKind::Hardlink(self.read_hardlink().await?);
428 return Ok(ItemResult::Entry);
429 }
430 format::PXAR_DEVICE => {
431 self.entry.kind = EntryKind::Device(self.read_device().await?);
432 return Ok(ItemResult::Entry);
433 }
434 format::PXAR_PAYLOAD => {
435 self.entry.kind = EntryKind::File {
436 size: self.current_header.content_size(),
437 };
438 self.state = State::InPayload;
439 return Ok(ItemResult::Entry);
440 }
441 format::PXAR_FILENAME | format::PXAR_GOODBYE => {
442 self.state = State::InDirectory;
443 self.entry.kind = EntryKind::Directory;
444 return Ok(ItemResult::Entry);
445 }
446 _ => io_bail!("unexpected entry type: {:x}", self.current_header.htype),
447 }
448
449 Ok(ItemResult::Attribute)
450 }
451
452 //
453 // Local read helpers.
454 //
455 // These utilize additional information and hence are not part of the `dyn SeqRead` impl.
456 //
457
458 async fn skip_entry(&mut self) -> io::Result<()> {
459 let mut len = self.current_header.content_size();
460 let scratch = scratch_buffer();
461 while len >= (scratch.len() as u64) {
462 (&mut self.input as &mut dyn SeqRead)
463 .seq_read_exact(scratch)
464 .await?;
465 len -= scratch.len() as u64;
466 }
467 let len = len as usize;
468 if len > 0 {
469 (&mut self.input as &mut dyn SeqRead)
470 .seq_read_exact(&mut scratch[..len])
471 .await?;
472 }
473 Ok(())
474 }
475
476 async fn read_entry_as_bytes(&mut self) -> io::Result<Vec<u8>> {
477 let size = usize::try_from(self.current_header.content_size()).map_err(io_err_other)?;
478 let data = (&mut self.input as &mut dyn SeqRead)
479 .seq_read_exact_data(size)
480 .await?;
481 Ok(data)
482 }
483
484 /// Helper to read a struct entry while checking its size.
485 async fn read_simple_entry<U: Endian + 'static>(
486 &mut self,
487 what: &'static str,
488 ) -> io::Result<U> {
489 if self.current_header.content_size() != (size_of::<T>() as u64) {
490 io_bail!(
491 "bad {} size: {} (expected {})",
492 what,
493 self.current_header.content_size(),
494 size_of::<T>(),
495 );
496 }
497 (&mut self.input as &mut dyn SeqRead).seq_read_entry().await
498 }
499
500 //
501 // Read functions for PXAR components.
502 //
503
504 async fn read_xattr(&mut self) -> io::Result<format::XAttr> {
505 let data = self.read_entry_as_bytes().await?;
506
507 let name_len = data
508 .iter()
509 .position(|c| *c == 0)
510 .ok_or_else(|| io_format_err!("missing value separator in xattr"))?;
511
512 Ok(format::XAttr { data, name_len })
513 }
514
515 async fn read_symlink(&mut self) -> io::Result<format::Symlink> {
516 let data = self.read_entry_as_bytes().await?;
517 Ok(format::Symlink { data })
518 }
519
520 async fn read_hardlink(&mut self) -> io::Result<format::Hardlink> {
521 let data = self.read_entry_as_bytes().await?;
522 Ok(format::Hardlink { data })
523 }
524
525 async fn read_device(&mut self) -> io::Result<format::Device> {
526 self.read_simple_entry("device").await
527 }
528
529 async fn read_fcaps(&mut self) -> io::Result<format::FCaps> {
530 let data = self.read_entry_as_bytes().await?;
531 Ok(format::FCaps { data })
532 }
533
534 async fn read_acl_user(&mut self) -> io::Result<format::acl::User> {
535 self.read_simple_entry("acl user").await
536 }
537
538 async fn read_acl_group(&mut self) -> io::Result<format::acl::Group> {
539 self.read_simple_entry("acl group").await
540 }
541
542 async fn read_acl_group_object(&mut self) -> io::Result<format::acl::GroupObject> {
543 self.read_simple_entry("acl group object").await
544 }
545
546 async fn read_acl_default(&mut self) -> io::Result<format::acl::Default> {
547 self.read_simple_entry("acl default").await
548 }
549
550 async fn read_quota_project_id(&mut self) -> io::Result<format::QuotaProjectId> {
551 self.read_simple_entry("quota project id").await
552 }
553 }