//! Random access for PXAR files.
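//!
//! Rather than decoding the archive sequentially, this module uses the archive's goodbye tables
//! to jump directly to directory entries and file contents. The sketch below shows the rough
//! access flow using the crate-internal types defined in this file; the public wrappers in
//! `sync` and `aio` are thin layers over these. It is marked `ignore` because it glosses over
//! the async runtime, the concrete input type, and error handling.
//!
//! ```ignore
//! // `data` holds a complete pxar archive in memory; `&[u8]` implements `ReadAt` below.
//! let accessor = AccessorImpl::new(&data[..], data.len() as u64).await?;
//! let root = accessor.open_root_ref().await?;
//! if let Some(file) = root.lookup(Path::new("some/archived/file")).await? {
//!     let contents = file.contents().await?;
//!     let mut buf = vec![0u8; contents.file_size() as usize];
//!     read_exact_at(&contents, &mut buf, 0).await?;
//! }
//! ```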

use std::ffi::{OsStr, OsString};
use std::future::Future;
use std::io;
use std::mem::{self, size_of, size_of_val, MaybeUninit};
use std::ops::Range;
use std::os::unix::ffi::{OsStrExt, OsStringExt};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use endian_trait::Endian;

use crate::binary_tree_array;
use crate::decoder::{self, DecoderImpl};
use crate::format::{self, GoodbyeItem};
use crate::util;
use crate::{Entry, EntryKind};

pub mod aio;
pub mod cache;
pub mod sync;

pub mod read_at;

#[doc(inline)]
pub use sync::{Accessor, DirEntry, Directory, FileEntry, ReadDir};

#[doc(inline)]
pub use read_at::{MaybeReady, ReadAt, ReadAtExt, ReadAtOperation};

use cache::Cache;

/// Range information used for unsafe raw random access:
#[derive(Clone, Debug)]
pub struct EntryRangeInfo {
    pub filename_header_offset: Option<u64>,
    pub entry_range: Range<u64>,
}

impl EntryRangeInfo {
    pub fn toplevel(entry_range: Range<u64>) -> Self {
        Self {
            filename_header_offset: None,
            entry_range,
        }
    }
}

/// Awaitable version of `ReadAt`.
async fn read_at<T>(input: &T, buf: &mut [u8], offset: u64) -> io::Result<usize>
where
    T: ReadAtExt,
{
    input.read_at(buf, offset).await
}

/// `read_exact_at` - since that's what we _actually_ want most of the time.
async fn read_exact_at<T>(input: &T, mut buf: &mut [u8], mut offset: u64) -> io::Result<()>
where
    T: ReadAt,
{
    while !buf.is_empty() {
        match read_at(input, buf, offset).await? {
            0 => io_bail!("unexpected EOF"),
            got => {
                buf = &mut buf[got..];
                offset += got as u64;
            }
        }
    }
    Ok(())
}

/// Helper to read into an `Endian`-implementing `struct`.
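///
/// A typical call reads one of the fixed-size `format` structs straight out of the archive
/// (sketch, mirroring the calls further down in this file):
///
/// ```ignore
/// let header: format::Header = read_entry_at(&input, offset).await?;
/// let tail: GoodbyeItem = read_entry_at(&input, tail_offset).await?;
/// ```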
async fn read_entry_at<T, E: Endian>(input: &T, offset: u64) -> io::Result<E>
where
    T: ReadAt,
{
    let mut data = MaybeUninit::<E>::uninit();
    let buf =
        unsafe { std::slice::from_raw_parts_mut(data.as_mut_ptr() as *mut u8, size_of::<E>()) };
    read_exact_at(input, buf, offset).await?;
    Ok(unsafe { data.assume_init().from_le() })
}

/// Helper to read into an allocated byte vector.
async fn read_exact_data_at<T>(input: &T, size: usize, offset: u64) -> io::Result<Vec<u8>>
where
    T: ReadAt,
{
    let mut data = util::vec_new(size);
    read_exact_at(input, &mut data[..], offset).await?;
    Ok(data)
}

/// Allow using trait objects for `T: ReadAt`
impl<'d> ReadAt for &(dyn ReadAt + 'd) {
    fn start_read_at<'a>(
        self: Pin<&'a Self>,
        cx: &mut Context,
        buf: &'a mut [u8],
        offset: u64,
    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
        unsafe { Pin::new_unchecked(&**self).start_read_at(cx, buf, offset) }
    }

    fn poll_complete<'a>(
        self: Pin<&'a Self>,
        op: ReadAtOperation<'a>,
    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
        unsafe { Pin::new_unchecked(&**self).poll_complete(op) }
    }
}

/// Convenience impl for `Arc<dyn ReadAt + Send + Sync + 'static>`. Since `ReadAt` only requires
/// immutable `&self`, this adds some convenience by allowing one to just `Arc` any `'static` type
/// that implements `ReadAt` for type monomorphization.
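///
/// A sketch of the intended use; `MyReader` and `size` are placeholders, not part of this crate:
///
/// ```ignore
/// let reader: Arc<dyn ReadAt + Send + Sync + 'static> = Arc::new(MyReader::new()?);
/// // Generic code such as `AccessorImpl::new` is now instantiated once for the
/// // `Arc<dyn ...>` type instead of once per concrete reader type.
/// let accessor = AccessorImpl::new(reader, size).await?;
/// ```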
impl ReadAt for Arc<dyn ReadAt + Send + Sync + 'static> {
    fn start_read_at<'a>(
        self: Pin<&'a Self>,
        cx: &mut Context,
        buf: &'a mut [u8],
        offset: u64,
    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
        unsafe {
            self.map_unchecked(|this| &**this)
                .start_read_at(cx, buf, offset)
        }
    }

    fn poll_complete<'a>(
        self: Pin<&'a Self>,
        op: ReadAtOperation<'a>,
    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
        unsafe { self.map_unchecked(|this| &**this).poll_complete(op) }
    }
}

/// Convenience impl for in-memory byte slices.
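///
/// This lets a buffer holding a complete archive be used directly as accessor input, e.g.
/// (sketch; the path is illustrative):
///
/// ```ignore
/// let data: Vec<u8> = std::fs::read("archive.pxar")?;
/// let accessor = AccessorImpl::new(&data[..], data.len() as u64).await?;
/// ```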
impl ReadAt for &'_ [u8] {
    fn start_read_at<'a>(
        self: Pin<&'a Self>,
        _cx: &mut Context,
        buf: &'a mut [u8],
        offset: u64,
    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
        if offset >= self.len() as u64 {
            return MaybeReady::Ready(Ok(0));
        }

        let offset = offset as usize;
        let end = (offset + buf.len()).min(self.len());
        let size = end - offset;
        buf[..size].copy_from_slice(&self[offset..end]);
        MaybeReady::Ready(Ok(size))
    }

    fn poll_complete<'a>(
        self: Pin<&'a Self>,
        _op: ReadAtOperation<'a>,
    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
        panic!("start_read_at on byte slice returned Pending");
    }
}

#[derive(Clone)]
struct Caches {
    /// The goodbye table cache maps goodbye table offsets to cache entries.
    gbt_cache: Option<Arc<dyn Cache<u64, [GoodbyeItem]> + Send + Sync>>,
}

impl Default for Caches {
    fn default() -> Self {
        Self { gbt_cache: None }
    }
}

/// The random access state machine implementation.
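///
/// Construction and root access, as a rough sketch (`input` and `size` stand in for whatever the
/// caller has at hand, e.g. a byte slice and its length):
///
/// ```ignore
/// let accessor = AccessorImpl::new(input, size).await?;
/// // Borrow the input as a trait object; available for any `T: ReadAt`.
/// let root = accessor.open_root_ref().await?;
/// // Or clone the input into the directory; requires `T: Clone + ReadAt`
/// // (see the second `impl` block further down).
/// let root = accessor.open_root().await?;
/// ```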
pub(crate) struct AccessorImpl<T> {
    input: T,
    size: u64,
    caches: Arc<Caches>,
}

impl<T: ReadAt> AccessorImpl<T> {
    pub async fn new(input: T, size: u64) -> io::Result<Self> {
        if size < (size_of::<GoodbyeItem>() as u64) {
            io_bail!("too small to contain a pxar archive");
        }

        Ok(Self {
            input,
            size,
            caches: Arc::new(Caches::default()),
        })
    }

    pub fn size(&self) -> u64 {
        self.size
    }

    pub async fn open_root_ref<'a>(&'a self) -> io::Result<DirectoryImpl<&'a dyn ReadAt>> {
        DirectoryImpl::open_at_end(
            &self.input as &dyn ReadAt,
            self.size,
            "/".into(),
            Arc::clone(&self.caches),
        )
        .await
    }

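    /// Install (or clear) a cache for decoded goodbye tables, keyed by the table's offset inside
    /// the archive. Any `Cache<u64, [GoodbyeItem]>` implementation can be plugged in; in this
    /// sketch `MyLruCache` is a placeholder type, not something provided by this crate:
    ///
    /// ```ignore
    /// let cache: Arc<dyn Cache<u64, [GoodbyeItem]> + Send + Sync> = Arc::new(MyLruCache::new(64));
    /// accessor.set_goodbye_table_cache(Some(cache));
    /// ```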
    pub fn set_goodbye_table_cache(
        &mut self,
        cache: Option<Arc<dyn Cache<u64, [GoodbyeItem]> + Send + Sync>>,
    ) {
        let new_caches = Arc::new(Caches {
            gbt_cache: cache,
            ..*self.caches
        });
        self.caches = new_caches;
    }
}

async fn get_decoder<T: ReadAt>(
    input: T,
    entry_range: Range<u64>,
    path: PathBuf,
) -> io::Result<DecoderImpl<SeqReadAtAdapter<T>>> {
    Ok(DecoderImpl::new_full(SeqReadAtAdapter::new(input, entry_range), path).await?)
}

// NOTE: This performs the Decoder::read_next_item() behavior! Keep in mind when changing!
async fn get_decoder_at_filename<T: ReadAt>(
    input: T,
    entry_range: Range<u64>,
    path: PathBuf,
) -> io::Result<(DecoderImpl<SeqReadAtAdapter<T>>, u64)> {
    // Read the header, it should be a FILENAME, then skip over it and its length:
    let header: format::Header = read_entry_at(&input, entry_range.start).await?;
    header.check_header_size()?;

    if header.htype != format::PXAR_FILENAME {
        io_bail!("expected filename entry, got {:?}", header);
    }

    let entry_offset = entry_range.start + header.full_size();
    if entry_offset >= entry_range.end {
        io_bail!("filename exceeds current file range");
    }

    Ok((
        get_decoder(input, entry_offset..entry_range.end, path).await?,
        entry_offset,
    ))
}

impl<T: Clone + ReadAt> AccessorImpl<T> {
    pub async fn open_root(&self) -> io::Result<DirectoryImpl<T>> {
        DirectoryImpl::open_at_end(
            self.input.clone(),
            self.size,
            "/".into(),
            Arc::clone(&self.caches),
        )
        .await
    }

    /// Allow opening a directory at a specified offset.
    pub async unsafe fn open_dir_at_end(&self, offset: u64) -> io::Result<DirectoryImpl<T>> {
        DirectoryImpl::open_at_end(
            self.input.clone(),
            offset,
            "/".into(),
            Arc::clone(&self.caches),
        )
        .await
    }

    /// Allow opening a regular file from a specified range.
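    ///
    /// The range information is normally taken from an entry visited earlier; `some_file_entry`
    /// below is a placeholder for such an entry (sketch only):
    ///
    /// ```ignore
    /// let range_info = some_file_entry.entry_range_info().clone();
    /// // Later, possibly through a new accessor over the same archive:
    /// let entry = unsafe { accessor.open_file_at_range(&range_info).await? };
    /// ```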
    pub async unsafe fn open_file_at_range(
        &self,
        entry_range_info: &EntryRangeInfo,
    ) -> io::Result<FileEntryImpl<T>> {
        let mut decoder = get_decoder(
            self.input.clone(),
            entry_range_info.entry_range.clone(),
            PathBuf::new(),
        )
        .await?;
        let entry = decoder
            .next()
            .await
            .ok_or_else(|| io_format_err!("unexpected EOF while decoding file entry"))??;
        Ok(FileEntryImpl {
            input: self.input.clone(),
            entry,
            entry_range_info: entry_range_info.clone(),
            caches: Arc::clone(&self.caches),
        })
    }

    /// Allow opening arbitrary contents from a specific range.
    pub unsafe fn open_contents_at_range(&self, range: Range<u64>) -> FileContentsImpl<T> {
        FileContentsImpl::new(self.input.clone(), range)
    }

    /// Following a hardlink breaks a couple of conventions we otherwise have; in particular, we
    /// never know the actual length of the target entry until we're done decoding it, so this
    /// needs to happen at the accessor level rather than in a "sub-entry-reader".
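    ///
    /// The hardlink stores its target as a distance *back* from this entry's filename header:
    /// with the current filename header at archive offset 10_000 and a stored hardlink offset of
    /// 4_000 (illustrative numbers), the target entry is decoded starting at offset 6_000.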
    pub async fn follow_hardlink(&self, entry: &FileEntryImpl<T>) -> io::Result<FileEntryImpl<T>> {
        let link_offset = match entry.entry.kind() {
            EntryKind::Hardlink(link) => link.offset,
            _ => io_bail!("cannot resolve a non-hardlink"),
        };

        let entry_file_offset = entry
            .entry_range_info
            .filename_header_offset
            .ok_or_else(|| io_format_err!("cannot follow hardlink without a file entry header"))?;

        if link_offset > entry_file_offset {
            io_bail!("invalid offset in hardlink");
        }

        let link_offset = entry_file_offset - link_offset;

        let (mut decoder, entry_offset) =
            get_decoder_at_filename(self.input.clone(), link_offset..self.size, PathBuf::new())
                .await?;

        let entry = decoder
            .next()
            .await
            .ok_or_else(|| io_format_err!("unexpected EOF while following a hardlink"))??;

        match entry.kind() {
            EntryKind::File { offset: None, .. } => {
                io_bail!("failed to follow hardlink, reader provided no offsets");
            }
            EntryKind::File {
                offset: Some(offset),
                size,
            } => {
                let meta_size = offset - link_offset;
                let entry_end = link_offset + meta_size + size;
                Ok(FileEntryImpl {
                    input: self.input.clone(),
                    entry,
                    entry_range_info: EntryRangeInfo {
                        filename_header_offset: Some(link_offset),
                        entry_range: entry_offset..entry_end,
                    },
                    caches: Arc::clone(&self.caches),
                })
            }
            _ => io_bail!("hardlink does not point to a regular file"),
        }
    }
}

/// The directory random-access state machine implementation.
pub(crate) struct DirectoryImpl<T> {
    input: T,
    entry_ofs: u64,
    goodbye_ofs: u64,
    size: u64,
    table: Arc<[GoodbyeItem]>,
    path: PathBuf,
    caches: Arc<Caches>,
}

impl<T: Clone + ReadAt> DirectoryImpl<T> {
    /// Open a directory ending at the specified position.
    async fn open_at_end(
        input: T,
        end_offset: u64,
        path: PathBuf,
        caches: Arc<Caches>,
    ) -> io::Result<DirectoryImpl<T>> {
        let tail = Self::read_tail_entry(&input, end_offset).await?;

        if end_offset < tail.size {
            io_bail!("goodbye tail size out of range");
        }

        let goodbye_ofs = end_offset - tail.size;

        if goodbye_ofs < tail.offset {
            io_bail!("goodbye offset out of range");
        }

        let entry_ofs = goodbye_ofs - tail.offset;
        let size = end_offset - entry_ofs;

        let table: Option<Arc<[GoodbyeItem]>> = caches
            .gbt_cache
            .as_ref()
            .and_then(|cache| cache.fetch(goodbye_ofs));

        let mut this = Self {
            input,
            entry_ofs,
            goodbye_ofs,
            size,
            table: table.as_ref().map_or_else(|| Arc::new([]), Arc::clone),
            path,
            caches,
        };

        // sanity check:
        if this.table_size() % (size_of::<GoodbyeItem>() as u64) != 0 {
            io_bail!("invalid goodbye table size: {}", this.table_size());
        }

        if table.is_none() {
            this.table = this.load_table().await?;
            if let Some(ref cache) = this.caches.gbt_cache {
                cache.insert(goodbye_ofs, Arc::clone(&this.table));
            }
        }

        Ok(this)
    }

    /// Load the entire goodbye table:
    async fn load_table(&self) -> io::Result<Arc<[GoodbyeItem]>> {
        let len = self.len();
        let mut data = Vec::with_capacity(self.len());
        unsafe {
            data.set_len(len);
            let slice = std::slice::from_raw_parts_mut(
                data.as_mut_ptr() as *mut u8,
                len * size_of::<GoodbyeItem>(),
            );
            read_exact_at(&self.input, slice, self.table_offset()).await?;
            drop(slice);
        }
        Ok(Arc::from(data))
    }

    #[inline]
    fn end_offset(&self) -> u64 {
        self.entry_ofs + self.size
    }

    #[inline]
    fn entry_range(&self) -> Range<u64> {
        self.entry_ofs..self.end_offset()
    }

    #[inline]
    fn table_size(&self) -> u64 {
        (self.end_offset() - self.goodbye_ofs) - (size_of::<format::Header>() as u64)
    }

    #[inline]
    fn table_offset(&self) -> u64 {
        self.goodbye_ofs + (size_of::<format::Header>() as u64)
    }

    /// Length *excluding* the tail marker!
    #[inline]
    fn len(&self) -> usize {
        (self.table_size() / (size_of::<GoodbyeItem>() as u64)) as usize - 1
    }

    /// Read the goodbye tail and perform some sanity checks.
    async fn read_tail_entry(input: &T, end_offset: u64) -> io::Result<GoodbyeItem> {
        if end_offset < (size_of::<GoodbyeItem>() as u64) {
            io_bail!("goodbye tail does not fit");
        }

        let tail_offset = end_offset - (size_of::<GoodbyeItem>() as u64);
        let tail: GoodbyeItem = read_entry_at(input, tail_offset).await?;

        if tail.hash != format::PXAR_GOODBYE_TAIL_MARKER {
            io_bail!("no goodbye tail marker found");
        }

        Ok(tail)
    }

    /// Get a decoder for the directory contents.
    pub(crate) async fn decode_full(&self) -> io::Result<DecoderImpl<SeqReadAtAdapter<T>>> {
        let (dir, decoder) = self.decode_one_entry(self.entry_range(), None).await?;
        if !dir.is_dir() {
            io_bail!("directory does not seem to be a directory");
        }
        Ok(decoder)
    }

    async fn get_decoder(
        &self,
        entry_range: Range<u64>,
        file_name: Option<&Path>,
    ) -> io::Result<DecoderImpl<SeqReadAtAdapter<T>>> {
        get_decoder(
            self.input.clone(),
            entry_range,
            match file_name {
                None => self.path.clone(),
                Some(file) => self.path.join(file),
            },
        )
        .await
    }

    async fn decode_one_entry(
        &self,
        entry_range: Range<u64>,
        file_name: Option<&Path>,
    ) -> io::Result<(Entry, DecoderImpl<SeqReadAtAdapter<T>>)> {
        let mut decoder = self.get_decoder(entry_range, file_name).await?;
        let entry = decoder
            .next()
            .await
            .ok_or_else(|| io_format_err!("unexpected EOF while decoding directory entry"))??;
        Ok((entry, decoder))
    }

    fn lookup_hash_position(&self, hash: u64, start: usize, skip: usize) -> Option<usize> {
        binary_tree_array::search_by(&self.table, start, skip, |i| hash.cmp(&i.hash))
    }

    pub async fn lookup_self(&self) -> io::Result<FileEntryImpl<T>> {
        let (entry, _decoder) = self.decode_one_entry(self.entry_range(), None).await?;
        Ok(FileEntryImpl {
            input: self.input.clone(),
            entry,
            entry_range_info: EntryRangeInfo {
                filename_header_offset: None,
                entry_range: self.entry_range(),
            },
            caches: Arc::clone(&self.caches),
        })
    }

    /// Look up a directory entry.
    pub async fn lookup(&self, path: &Path) -> io::Result<Option<FileEntryImpl<T>>> {
        let mut cur: Option<FileEntryImpl<T>> = None;

        let mut first = true;
        for component in path.components() {
            use std::path::Component;

            let first = mem::replace(&mut first, false);

            let component = match component {
                Component::Normal(path) => path,
                Component::ParentDir => io_bail!("cannot enter parent directory in archive"),
                Component::RootDir | Component::CurDir if first => {
                    cur = Some(self.lookup_self().await?);
                    continue;
                }
                Component::CurDir => continue,
                _ => io_bail!("invalid component in path"),
            };

            let next = match cur {
                Some(entry) => {
                    entry
                        .enter_directory()
                        .await?
                        .lookup_component(component)
                        .await?
                }
                None => self.lookup_component(component).await?,
            };

            if next.is_none() {
                return Ok(None);
            }

            cur = next;
        }

        Ok(cur)
    }

    /// Look up a single directory entry component (does not handle multiple components in a path).
    pub async fn lookup_component(&self, path: &OsStr) -> io::Result<Option<FileEntryImpl<T>>> {
        let hash = format::hash_filename(path.as_bytes());
        let first_index = match self.lookup_hash_position(hash, 0, 0) {
            Some(index) => index,
            None => return Ok(None),
        };

        // Look up the FILENAME entry. If the hash matches but the file name doesn't, keep
        // checking for entries with a duplicate hash. Once found, use the GoodbyeItem's
        // offset and size together with the file's Entry to build the resulting directory
        // or file entry.
        let mut dup = 0;
        loop {
            let index = match self.lookup_hash_position(hash, first_index, dup) {
                Some(index) => index,
                None => return Ok(None),
            };

            let cursor = self.get_cursor(index).await?;
            if cursor.file_name == path {
                return Ok(Some(cursor.decode_entry().await?));
            }

            dup += 1;
        }
    }

    async fn get_cursor<'a>(&'a self, index: usize) -> io::Result<DirEntryImpl<'a, T>> {
        let entry = &self.table[index];
        let file_goodbye_ofs = entry.offset;
        if self.goodbye_ofs < file_goodbye_ofs {
            io_bail!("invalid file offset");
        }

        let file_ofs = self.goodbye_ofs - file_goodbye_ofs;
        let (file_name, entry_ofs) = self.read_filename_entry(file_ofs).await?;

        let entry_range = Range {
            start: entry_ofs,
            end: file_ofs + entry.size,
        };
        if entry_range.end < entry_range.start {
            io_bail!(
                "bad file: invalid entry ranges for {:?}: \
                 start=0x{:x}, file_ofs=0x{:x}, size=0x{:x}",
                file_name,
                entry_ofs,
                file_ofs,
                entry.size,
            );
        }

        Ok(DirEntryImpl {
            dir: self,
            file_name,
            entry_range_info: EntryRangeInfo {
                filename_header_offset: Some(file_ofs),
                entry_range,
            },
            caches: Arc::clone(&self.caches),
        })
    }

    async fn read_filename_entry(&self, file_ofs: u64) -> io::Result<(PathBuf, u64)> {
        let head: format::Header = read_entry_at(&self.input, file_ofs).await?;
        if head.htype != format::PXAR_FILENAME {
            io_bail!("expected PXAR_FILENAME header, found: {}", head);
        }

        let mut path = read_exact_data_at(
            &self.input,
            head.content_size() as usize,
            file_ofs + (size_of_val(&head) as u64),
        )
        .await?;

        if path.pop() != Some(0) {
            io_bail!("invalid file name (missing terminating zero)");
        }

        crate::util::validate_filename(&path)?;

        let file_name = PathBuf::from(OsString::from_vec(path));
        format::check_file_name(&file_name)?;

        Ok((file_name, file_ofs + head.full_size()))
    }

    pub fn read_dir(&self) -> ReadDirImpl<T> {
        ReadDirImpl::new(self, 0)
    }

    pub fn entry_count(&self) -> usize {
        self.table.len()
    }
}

/// A file entry retrieved from a Directory.
#[derive(Clone)]
pub(crate) struct FileEntryImpl<T: Clone + ReadAt> {
    input: T,
    entry: Entry,
    entry_range_info: EntryRangeInfo,
    caches: Arc<Caches>,
}

impl<T: Clone + ReadAt> FileEntryImpl<T> {
    pub async fn enter_directory(&self) -> io::Result<DirectoryImpl<T>> {
        if !self.entry.is_dir() {
            io_bail!("enter_directory() on a non-directory");
        }

        DirectoryImpl::open_at_end(
            self.input.clone(),
            self.entry_range_info.entry_range.end,
            self.entry.path.clone(),
            Arc::clone(&self.caches),
        )
        .await
    }

    /// For use with unsafe accessor methods.
    pub fn content_range(&self) -> io::Result<Option<Range<u64>>> {
        match self.entry.kind {
            EntryKind::File { offset: None, .. } => {
                io_bail!("cannot open file, reader provided no offset")
            }
            EntryKind::File {
                size,
                offset: Some(offset),
            } => Ok(Some(offset..(offset + size))),
            _ => Ok(None),
        }
    }

    pub async fn contents(&self) -> io::Result<FileContentsImpl<T>> {
        match self.content_range()? {
            Some(range) => Ok(FileContentsImpl::new(self.input.clone(), range)),
            None => io_bail!("not a file"),
        }
    }

    #[inline]
    pub fn into_entry(self) -> Entry {
        self.entry
    }

    #[inline]
    pub fn entry(&self) -> &Entry {
        &self.entry
    }

    /// Exposed for raw by-offset access methods (use with `open_dir_at_end`).
    #[inline]
    pub fn entry_range_info(&self) -> &EntryRangeInfo {
        &self.entry_range_info
    }
}

/// An iterator over the contents of a directory.
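///
/// Entries are yielded in goodbye-table order. A typical loop, as a sketch:
///
/// ```ignore
/// let mut entries = dir.read_dir();
/// while let Some(entry) = entries.next().await? {
///     println!("{:?}", entry.file_name());
/// }
/// ```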
pub(crate) struct ReadDirImpl<'a, T> {
    dir: &'a DirectoryImpl<T>,
    at: usize,
}

impl<'a, T: Clone + ReadAt> ReadDirImpl<'a, T> {
    fn new(dir: &'a DirectoryImpl<T>, at: usize) -> Self {
        Self { dir, at }
    }

    /// Get the next entry.
    pub async fn next(&mut self) -> io::Result<Option<DirEntryImpl<'a, T>>> {
        if self.at == self.dir.table.len() {
            Ok(None)
        } else {
            let cursor = self.dir.get_cursor(self.at).await?;
            self.at += 1;
            Ok(Some(cursor))
        }
    }

    /// Efficient alternative to `Iterator::skip`.
    #[inline]
    pub fn skip(self, n: usize) -> Self {
        Self {
            at: (self.at + n).min(self.dir.table.len()),
            dir: self.dir,
        }
    }

    /// Efficient alternative to `Iterator::count`.
    #[inline]
    pub fn count(self) -> usize {
        self.dir.table.len()
    }
}

/// A cursor pointing to a file in a directory.
///
/// At this point only the file name has been read and we remembered the position for finding the
/// actual data. This can be upgraded into a FileEntryImpl.
pub(crate) struct DirEntryImpl<'a, T: Clone + ReadAt> {
    dir: &'a DirectoryImpl<T>,
    file_name: PathBuf,
    entry_range_info: EntryRangeInfo,
    caches: Arc<Caches>,
}

impl<'a, T: Clone + ReadAt> DirEntryImpl<'a, T> {
    pub fn file_name(&self) -> &Path {
        &self.file_name
    }

    async fn decode_entry(&self) -> io::Result<FileEntryImpl<T>> {
        let (entry, _decoder) = self
            .dir
            .decode_one_entry(
                self.entry_range_info.entry_range.clone(),
                Some(&self.file_name),
            )
            .await?;

        Ok(FileEntryImpl {
            input: self.dir.input.clone(),
            entry,
            entry_range_info: self.entry_range_info.clone(),
            caches: Arc::clone(&self.caches),
        })
    }

    /// Exposed for raw by-offset access methods.
    #[inline]
    pub fn entry_range_info(&self) -> &EntryRangeInfo {
        &self.entry_range_info
    }
}

/// A reader for file contents.
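///
/// Offsets passed to `read_at` are relative to the start of the file's contents, not to the
/// archive as a whole. A sketch of reading a whole file into memory:
///
/// ```ignore
/// let contents = file_entry.contents().await?;
/// let mut buf = vec![0u8; contents.file_size() as usize];
/// read_exact_at(&contents, &mut buf, 0).await?;
/// ```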
#[derive(Clone)]
pub(crate) struct FileContentsImpl<T> {
    input: T,

    /// Absolute offset inside the `input`.
    range: Range<u64>,
}

impl<T: Clone + ReadAt> FileContentsImpl<T> {
    pub fn new(input: T, range: Range<u64>) -> Self {
        Self { input, range }
    }

    #[inline]
    pub fn file_size(&self) -> u64 {
        self.range.end - self.range.start
    }

    async fn read_at(&self, mut buf: &mut [u8], offset: u64) -> io::Result<usize> {
        let size = self.file_size();
        if offset >= size {
            return Ok(0);
        }
        let remaining = size - offset;

        if remaining < buf.len() as u64 {
            buf = &mut buf[..(remaining as usize)];
        }

        read_at(&self.input, buf, self.range.start + offset).await
    }
}

impl<T: Clone + ReadAt> ReadAt for FileContentsImpl<T> {
    fn start_read_at<'a>(
        self: Pin<&'a Self>,
        cx: &mut Context,
        mut buf: &'a mut [u8],
        offset: u64,
    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
        let size = self.file_size();
        if offset >= size {
            return MaybeReady::Ready(Ok(0));
        }
        let remaining = size - offset;

        if remaining < buf.len() as u64 {
            buf = &mut buf[..(remaining as usize)];
        }

        let offset = self.range.start + offset;
        unsafe { self.map_unchecked(|this| &this.input) }.start_read_at(cx, buf, offset)
    }

    fn poll_complete<'a>(
        self: Pin<&'a Self>,
        op: ReadAtOperation<'a>,
    ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
        unsafe { self.map_unchecked(|this| &this.input) }.poll_complete(op)
    }
}

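/// Adapter exposing a `ReadAt` input restricted to `range` as a sequential `decoder::SeqRead`
/// stream, so the random-access code can reuse `DecoderImpl`. This mirrors how `get_decoder()`
/// above uses it (sketch):
///
/// ```ignore
/// let decoder = DecoderImpl::new_full(SeqReadAtAdapter::new(input, entry_range), path).await?;
/// ```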
#[doc(hidden)]
pub struct SeqReadAtAdapter<T> {
    input: T,
    range: Range<u64>,
    buffer: Vec<u8>,
    future: Option<Pin<Box<dyn Future<Output = io::Result<(usize, Vec<u8>)>> + 'static>>>,
}

// We lose `Send` via the boxed trait object and don't want to force the trait object to
// potentially be more strict than `T`, so we leave it as it is and implement Send and Sync
// depending on T.
unsafe impl<T: Send> Send for SeqReadAtAdapter<T> {}
unsafe impl<T: Sync> Sync for SeqReadAtAdapter<T> {}

impl<T> Drop for SeqReadAtAdapter<T> {
    fn drop(&mut self) {
        // drop order: the future borrows `self.input`, so it must be dropped before `input`.
        self.future = None;
    }
}

impl<T: ReadAt> SeqReadAtAdapter<T> {
    pub fn new(input: T, range: Range<u64>) -> Self {
        if range.end < range.start {
            panic!("BAD SEQ READ AT ADAPTER");
        }
        Self {
            input,
            range,
            buffer: Vec::new(),
            future: None,
        }
    }

    #[inline]
    fn remaining(&self) -> usize {
        (self.range.end - self.range.start) as usize
    }
}

impl<T: ReadAt> decoder::SeqRead for SeqReadAtAdapter<T> {
    fn poll_seq_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        dest: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let len = dest.len().min(self.remaining());
        let dest = &mut dest[..len];

        let this = unsafe { self.get_unchecked_mut() };
        loop {
            match this.future.take() {
                None => {
                    let mut buffer = mem::take(&mut this.buffer);
                    util::scale_read_buffer(&mut buffer, dest.len());

                    // Note that we're pinned and we have a drop-handler which forces self.future
                    // to be dropped before `input`, so putting a reference to self.input into the
                    // future should be ok!
                    let reader = &this.input;

                    let at = this.range.start;
                    let future: Pin<Box<dyn Future<Output = io::Result<(usize, Vec<u8>)>>>> =
                        Box::pin(async move {
                            let got = reader.read_at(&mut buffer, at).await?;
                            io::Result::Ok((got, buffer))
                        });
                    // Ditch the self-reference life-time now:
                    this.future = Some(unsafe { mem::transmute(future) });
                }
                Some(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Pending => {
                        this.future = Some(fut);
                        return Poll::Pending;
                    }
                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                    Poll::Ready(Ok((got, buffer))) => {
                        this.buffer = buffer;
                        this.range.start += got as u64;
                        let len = got.min(dest.len());
                        dest[..len].copy_from_slice(&this.buffer[..len]);
                        return Poll::Ready(Ok(len));
                    }
                },
            }
        }
    }

    fn poll_position(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<io::Result<u64>>> {
        Poll::Ready(Some(Ok(self.range.start)))
    }
}