]> git.proxmox.com Git - pxar.git/blob - src/accessor.rs
fix hardlink format
[pxar.git] / src / accessor.rs
1 //! Random access for PXAR files.
2
3 use std::ffi::{OsStr, OsString};
4 use std::io;
5 use std::mem::{self, size_of, size_of_val, MaybeUninit};
6 use std::ops::Range;
7 use std::os::unix::ffi::{OsStrExt, OsStringExt};
8 use std::path::{Path, PathBuf};
9 use std::pin::Pin;
10 use std::sync::Arc;
11 use std::task::{Context, Poll};
12
13 use endian_trait::Endian;
14
15 use crate::binary_tree_array;
16 use crate::decoder::{self, DecoderImpl};
17 use crate::format::{self, GoodbyeItem};
18 use crate::poll_fn::poll_fn;
19 use crate::util;
20 use crate::{Entry, EntryKind};
21
22 pub mod aio;
23 pub mod cache;
24 pub mod sync;
25
26 #[doc(inline)]
27 pub use sync::{Accessor, DirEntry, Directory, FileEntry, ReadDir};
28
29 use cache::Cache;
30
/// Random access read implementation.
///
/// This is the only capability the accessor types in this module require from their input:
/// reading bytes at an absolute position through a shared (pinned) reference.
pub trait ReadAt {
    /// Attempt to read into `buf` starting at the absolute position `offset` of the input.
    ///
    /// On completion this yields the number of bytes placed into `buf`. The helpers in this
    /// module treat a result of `0` for a non-empty buffer as end of file.
    fn poll_read_at(
        self: Pin<&Self>,
        cx: &mut Context,
        buf: &mut [u8],
        offset: u64,
    ) -> Poll<io::Result<usize>>;
}
40
41 /// awaitable version of `poll_read_at`.
42 async fn read_at<T>(input: &T, buf: &mut [u8], offset: u64) -> io::Result<usize>
43 where
44 T: ReadAt + ?Sized,
45 {
46 poll_fn(|cx| unsafe { Pin::new_unchecked(input).poll_read_at(cx, buf, offset) }).await
47 }
48
49 /// `read_exact_at` - since that's what we _actually_ want most of the time.
50 async fn read_exact_at<T>(input: &T, mut buf: &mut [u8], mut offset: u64) -> io::Result<()>
51 where
52 T: ReadAt + ?Sized,
53 {
54 while !buf.is_empty() {
55 match read_at(input, buf, offset).await? {
56 0 => io_bail!("unexpected EOF"),
57 got => {
58 buf = &mut buf[got..];
59 offset += got as u64;
60 }
61 }
62 }
63 Ok(())
64 }
65
66 /// Helper to read into an `Endian`-implementing `struct`.
67 async fn read_entry_at<T, E: Endian>(input: &T, offset: u64) -> io::Result<E>
68 where
69 T: ReadAt + ?Sized,
70 {
71 let mut data = MaybeUninit::<E>::uninit();
72 let buf =
73 unsafe { std::slice::from_raw_parts_mut(data.as_mut_ptr() as *mut u8, size_of::<E>()) };
74 read_exact_at(input, buf, offset).await?;
75 Ok(unsafe { data.assume_init().from_le() })
76 }
77
78 /// Helper to read into an allocated byte vector.
79 async fn read_exact_data_at<T>(input: &T, size: usize, offset: u64) -> io::Result<Vec<u8>>
80 where
81 T: ReadAt + ?Sized,
82 {
83 let mut data = util::vec_new(size);
84 read_exact_at(input, &mut data[..], offset).await?;
85 Ok(data)
86 }
87
88 /// Allow using trait objects for `T: ReadAt`
89 impl<'a> ReadAt for &(dyn ReadAt + 'a) {
90 fn poll_read_at(
91 self: Pin<&Self>,
92 cx: &mut Context,
93 buf: &mut [u8],
94 offset: u64,
95 ) -> Poll<io::Result<usize>> {
96 unsafe { Pin::new_unchecked(&**self).poll_read_at(cx, buf, offset) }
97 }
98 }
99
100 /// Convenience impl for `Arc<dyn ReadAt + Send + Sync + 'static>`. Since `ReadAt` only requires
101 /// immutable `&self`, this adds some convenience by allowing to just `Arc` any `'static` type that
102 /// implemments `ReadAt` for type monomorphization.
103 impl ReadAt for Arc<dyn ReadAt + Send + Sync + 'static> {
104 fn poll_read_at(
105 self: Pin<&Self>,
106 cx: &mut Context,
107 buf: &mut [u8],
108 offset: u64,
109 ) -> Poll<io::Result<usize>> {
110 unsafe { Pin::new_unchecked(&**self).poll_read_at(cx, buf, offset) }
111 }
112 }
113
114 #[derive(Clone)]
115 struct Caches {
116 /// The goodbye table cache maps goodbye table offsets to cache entries.
117 gbt_cache: Option<Arc<dyn Cache<u64, [GoodbyeItem]> + Send + Sync>>,
118 }
119
120 impl Default for Caches {
121 fn default() -> Self {
122 Self { gbt_cache: None }
123 }
124 }
125
126 /// The random access state machine implementation.
127 pub(crate) struct AccessorImpl<T> {
128 input: T,
129 size: u64,
130 caches: Arc<Caches>,
131 }
132
133 impl<T: ReadAt> AccessorImpl<T> {
134 pub async fn new(input: T, size: u64) -> io::Result<Self> {
135 if size < (size_of::<GoodbyeItem>() as u64) {
136 io_bail!("too small to contain a pxar archive");
137 }
138
139 Ok(Self {
140 input,
141 size,
142 caches: Arc::new(Caches::default()),
143 })
144 }
145
146 pub fn size(&self) -> u64 {
147 self.size
148 }
149
150 pub async fn open_root_ref<'a>(&'a self) -> io::Result<DirectoryImpl<&'a dyn ReadAt>> {
151 DirectoryImpl::open_at_end(
152 &self.input as &dyn ReadAt,
153 self.size,
154 "/".into(),
155 Arc::clone(&self.caches),
156 )
157 .await
158 }
159
160 pub fn set_goodbye_table_cache(
161 &mut self,
162 cache: Option<Arc<dyn Cache<u64, [GoodbyeItem]> + Send + Sync>>,
163 ) {
164 let new_caches = Arc::new(Caches {
165 gbt_cache: cache,
166 ..*self.caches
167 });
168 self.caches = new_caches;
169 }
170 }
171
172 async fn get_decoder<T: ReadAt>(
173 input: T,
174 entry_range: Range<u64>,
175 path: PathBuf,
176 ) -> io::Result<DecoderImpl<SeqReadAtAdapter<T>>> {
177 Ok(DecoderImpl::new_full(SeqReadAtAdapter::new(input, entry_range), path).await?)
178 }
179
180 impl<T: Clone + ReadAt> AccessorImpl<T> {
181 pub async fn open_root(&self) -> io::Result<DirectoryImpl<T>> {
182 DirectoryImpl::open_at_end(
183 self.input.clone(),
184 self.size,
185 "/".into(),
186 Arc::clone(&self.caches),
187 )
188 .await
189 }
190
191 /// Allow opening a directory at a specified offset.
192 pub async unsafe fn open_dir_at_end(&self, offset: u64) -> io::Result<DirectoryImpl<T>> {
193 DirectoryImpl::open_at_end(
194 self.input.clone(),
195 offset,
196 "/".into(),
197 Arc::clone(&self.caches),
198 )
199 .await
200 }
201
202 /// Allow opening a regular file from a specified range.
203 pub async unsafe fn open_file_at_range(
204 &self,
205 range: Range<u64>,
206 ) -> io::Result<FileEntryImpl<T>> {
207 let mut decoder = get_decoder(self.input.clone(), range.clone(), PathBuf::new()).await?;
208 let entry = decoder
209 .next()
210 .await
211 .ok_or_else(|| io_format_err!("unexpected EOF while decoding file entry"))??;
212 Ok(FileEntryImpl {
213 input: self.input.clone(),
214 entry,
215 entry_range: range,
216 caches: Arc::clone(&self.caches),
217 })
218 }
219
220 /// Allow opening arbitrary contents from a specific range.
221 pub unsafe fn open_contents_at_range(&self, range: Range<u64>) -> FileContentsImpl<T> {
222 FileContentsImpl::new(self.input.clone(), range)
223 }
224 }
225
/// The directory random-access state machine implementation.
///
/// Describes one directory inside the archive by the position of its entry, the position of
/// its goodbye table and the (loaded or cached) contents of that table.
pub(crate) struct DirectoryImpl<T> {
    /// The input the directory is read from.
    input: T,
    /// Absolute offset at which the directory's own entry starts.
    entry_ofs: u64,
    /// Absolute offset of the goodbye table, including its header.
    goodbye_ofs: u64,
    /// Number of bytes from `entry_ofs` up to the end of the directory.
    size: u64,
    /// The goodbye table items, either loaded from the input or taken from the cache.
    table: Arc<[GoodbyeItem]>,
    /// Path of this directory; file names found in it get joined onto it.
    path: PathBuf,
    /// Caches shared with the accessor this directory was opened from.
    caches: Arc<Caches>,
}
236
237 impl<T: Clone + ReadAt> DirectoryImpl<T> {
238 /// Open a directory ending at the specified position.
239 async fn open_at_end(
240 input: T,
241 end_offset: u64,
242 path: PathBuf,
243 caches: Arc<Caches>,
244 ) -> io::Result<DirectoryImpl<T>> {
245 let tail = Self::read_tail_entry(&input, end_offset).await?;
246
247 if end_offset < tail.size {
248 io_bail!("goodbye tail size out of range");
249 }
250
251 let goodbye_ofs = end_offset - tail.size;
252
253 if goodbye_ofs < tail.offset {
254 io_bail!("goodbye offset out of range");
255 }
256
257 let entry_ofs = goodbye_ofs - tail.offset;
258 let size = end_offset - entry_ofs;
259
260 let table: Option<Arc<[GoodbyeItem]>> = caches
261 .gbt_cache
262 .as_ref()
263 .and_then(|cache| cache.fetch(goodbye_ofs));
264
265 let mut this = Self {
266 input,
267 entry_ofs,
268 goodbye_ofs,
269 size,
270 table: table.as_ref().map_or_else(|| Arc::new([]), Arc::clone),
271 path,
272 caches,
273 };
274
275 // sanity check:
276 if this.table_size() % (size_of::<GoodbyeItem>() as u64) != 0 {
277 io_bail!("invalid goodbye table size: {}", this.table_size());
278 }
279
280 if table.is_none() {
281 this.table = this.load_table().await?;
282 if let Some(ref cache) = this.caches.gbt_cache {
283 cache.insert(goodbye_ofs, Arc::clone(&this.table));
284 }
285 }
286
287 Ok(this)
288 }
289
290 /// Load the entire goodbye table:
291 async fn load_table(&self) -> io::Result<Arc<[GoodbyeItem]>> {
292 let len = self.len();
293 let mut data = Vec::with_capacity(self.len());
294 unsafe {
295 data.set_len(len);
296 let slice = std::slice::from_raw_parts_mut(
297 data.as_mut_ptr() as *mut u8,
298 len * size_of::<GoodbyeItem>(),
299 );
300 read_exact_at(&self.input, slice, self.table_offset()).await?;
301 drop(slice);
302 }
303 Ok(Arc::from(data))
304 }
305
306 #[inline]
307 fn end_offset(&self) -> u64 {
308 self.entry_ofs + self.size
309 }
310
311 #[inline]
312 fn entry_range(&self) -> Range<u64> {
313 self.entry_ofs..self.end_offset()
314 }
315
316 #[inline]
317 fn table_size(&self) -> u64 {
318 (self.end_offset() - self.goodbye_ofs) - (size_of::<format::Header>() as u64)
319 }
320
321 #[inline]
322 fn table_offset(&self) -> u64 {
323 self.goodbye_ofs + (size_of::<format::Header>() as u64)
324 }
325
326 /// Length *excluding* the tail marker!
327 #[inline]
328 fn len(&self) -> usize {
329 (self.table_size() / (size_of::<GoodbyeItem>() as u64)) as usize - 1
330 }
331
332 /// Read the goodbye tail and perform some sanity checks.
333 async fn read_tail_entry(input: &T, end_offset: u64) -> io::Result<GoodbyeItem> {
334 if end_offset < (size_of::<GoodbyeItem>() as u64) {
335 io_bail!("goodbye tail does not fit");
336 }
337
338 let tail_offset = end_offset - (size_of::<GoodbyeItem>() as u64);
339 let tail: GoodbyeItem = read_entry_at(input, tail_offset).await?;
340
341 if tail.hash != format::PXAR_GOODBYE_TAIL_MARKER {
342 io_bail!("no goodbye tail marker found");
343 }
344
345 Ok(tail)
346 }
347
348 /// Get a decoder for the directory contents.
349 pub(crate) async fn decode_full(&self) -> io::Result<DecoderImpl<SeqReadAtAdapter<T>>> {
350 let (dir, decoder) = self.decode_one_entry(self.entry_range(), None).await?;
351 if !dir.is_dir() {
352 io_bail!("directory does not seem to be a directory");
353 }
354 Ok(decoder)
355 }
356
357 async fn get_decoder(
358 &self,
359 entry_range: Range<u64>,
360 file_name: Option<&Path>,
361 ) -> io::Result<DecoderImpl<SeqReadAtAdapter<T>>> {
362 get_decoder(
363 self.input.clone(),
364 entry_range,
365 match file_name {
366 None => self.path.clone(),
367 Some(file) => self.path.join(file),
368 },
369 )
370 .await
371 }
372
373 async fn decode_one_entry(
374 &self,
375 entry_range: Range<u64>,
376 file_name: Option<&Path>,
377 ) -> io::Result<(Entry, DecoderImpl<SeqReadAtAdapter<T>>)> {
378 let mut decoder = self.get_decoder(entry_range, file_name).await?;
379 let entry = decoder
380 .next()
381 .await
382 .ok_or_else(|| io_format_err!("unexpected EOF while decoding directory entry"))??;
383 Ok((entry, decoder))
384 }
385
386 fn lookup_hash_position(&self, hash: u64, start: usize, skip: usize) -> Option<usize> {
387 binary_tree_array::search_by(&self.table, start, skip, |i| hash.cmp(&i.hash))
388 }
389
390 pub async fn lookup_self(&self) -> io::Result<FileEntryImpl<T>> {
391 let (entry, _decoder) = self.decode_one_entry(self.entry_range(), None).await?;
392 Ok(FileEntryImpl {
393 input: self.input.clone(),
394 entry,
395 entry_range: self.entry_range(),
396 caches: Arc::clone(&self.caches),
397 })
398 }
399
400 /// Lookup a directory entry.
401 pub async fn lookup(&self, path: &Path) -> io::Result<Option<FileEntryImpl<T>>> {
402 let mut cur: Option<FileEntryImpl<T>> = None;
403
404 let mut first = true;
405 for component in path.components() {
406 use std::path::Component;
407
408 let first = mem::replace(&mut first, false);
409
410 let component = match component {
411 Component::Normal(path) => path,
412 Component::ParentDir => io_bail!("cannot enter parent directory in archive"),
413 Component::RootDir | Component::CurDir if first => {
414 cur = Some(self.lookup_self().await?);
415 continue;
416 }
417 Component::CurDir => continue,
418 _ => io_bail!("invalid component in path"),
419 };
420
421 let next = match cur {
422 Some(entry) => {
423 entry
424 .enter_directory()
425 .await?
426 .lookup_component(component)
427 .await?
428 }
429 None => self.lookup_component(component).await?,
430 };
431
432 if next.is_none() {
433 return Ok(None);
434 }
435
436 cur = next;
437 }
438
439 Ok(cur)
440 }
441
442 /// Lookup a single directory entry component (does not handle multiple components in path)
443 pub async fn lookup_component(&self, path: &OsStr) -> io::Result<Option<FileEntryImpl<T>>> {
444 let hash = format::hash_filename(path.as_bytes());
445 let first_index = match self.lookup_hash_position(hash, 0, 0) {
446 Some(index) => index,
447 None => return Ok(None),
448 };
449
450 // Lookup FILENAME, if the hash matches but the filename doesn't, check for a duplicate
451 // hash once found, use the GoodbyeItem's offset+size as well as the file's Entry to return
452 // a DirEntry::Dir or Dir::Entry.
453 //
454 let mut dup = 0;
455 loop {
456 let index = match self.lookup_hash_position(hash, first_index, dup) {
457 Some(index) => index,
458 None => return Ok(None),
459 };
460
461 let cursor = self.get_cursor(index).await?;
462 if cursor.file_name == path {
463 return Ok(Some(cursor.decode_entry().await?));
464 }
465
466 dup += 1;
467 }
468 }
469
470 async fn get_cursor<'a>(&'a self, index: usize) -> io::Result<DirEntryImpl<'a, T>> {
471 let entry = &self.table[index];
472 let file_goodbye_ofs = entry.offset;
473 if self.goodbye_ofs < file_goodbye_ofs {
474 io_bail!("invalid file offset");
475 }
476
477 let file_ofs = self.goodbye_ofs - file_goodbye_ofs;
478 let (file_name, entry_ofs) = self.read_filename_entry(file_ofs).await?;
479
480 let entry_range = Range {
481 start: entry_ofs,
482 end: file_ofs + entry.size,
483 };
484 if entry_range.end < entry_range.start {
485 io_bail!(
486 "bad file: invalid entry ranges for {:?}: \
487 start=0x{:x}, file_ofs=0x{:x}, size=0x{:x}",
488 file_name,
489 entry_ofs,
490 file_ofs,
491 entry.size,
492 );
493 }
494
495 Ok(DirEntryImpl {
496 dir: self,
497 file_name,
498 entry_range,
499 caches: Arc::clone(&self.caches),
500 })
501 }
502
503 async fn read_filename_entry(&self, file_ofs: u64) -> io::Result<(PathBuf, u64)> {
504 let head: format::Header = read_entry_at(&self.input, file_ofs).await?;
505 if head.htype != format::PXAR_FILENAME {
506 io_bail!("expected PXAR_FILENAME header, found: {:x}", head.htype);
507 }
508
509 let mut path = read_exact_data_at(
510 &self.input,
511 head.content_size() as usize,
512 file_ofs + (size_of_val(&head) as u64),
513 )
514 .await?;
515
516 if path.pop() != Some(0) {
517 io_bail!("invalid file name (missing terminating zero)");
518 }
519
520 if path.is_empty() {
521 io_bail!("invalid empty file name");
522 }
523
524 let file_name = PathBuf::from(OsString::from_vec(path));
525 format::check_file_name(&file_name)?;
526
527 Ok((file_name, file_ofs + head.full_size()))
528 }
529
/// Get an iterator-like reader over this directory's entries, positioned at the first
/// goodbye table item.
pub fn read_dir(&self) -> ReadDirImpl<T> {
    ReadDirImpl::new(self, 0)
}
533
/// Number of items in this directory's goodbye table.
pub fn entry_count(&self) -> usize {
    self.table.len()
}
537 }
538
539 /// A file entry retrieved from a Directory.
540 #[derive(Clone)]
541 pub(crate) struct FileEntryImpl<T: Clone + ReadAt> {
542 input: T,
543 entry: Entry,
544 entry_range: Range<u64>,
545 caches: Arc<Caches>,
546 }
547
548 impl<T: Clone + ReadAt> FileEntryImpl<T> {
549 pub async fn enter_directory(&self) -> io::Result<DirectoryImpl<T>> {
550 if !self.entry.is_dir() {
551 io_bail!("enter_directory() on a non-directory");
552 }
553
554 DirectoryImpl::open_at_end(
555 self.input.clone(),
556 self.entry_range.end,
557 self.entry.path.clone(),
558 Arc::clone(&self.caches),
559 )
560 .await
561 }
562
563 /// For use with unsafe accessor methods.
564 pub fn content_range(&self) -> io::Result<Option<Range<u64>>> {
565 match self.entry.kind {
566 EntryKind::File { offset: None, .. } => {
567 io_bail!("cannot open file, reader provided no offset")
568 }
569 EntryKind::File {
570 size,
571 offset: Some(offset),
572 } => Ok(Some(offset..(offset + size))),
573 _ => Ok(None),
574 }
575 }
576
577 pub async fn contents(&self) -> io::Result<FileContentsImpl<T>> {
578 match self.content_range()? {
579 Some(range) => Ok(FileContentsImpl::new(self.input.clone(), range)),
580 None => io_bail!("not a file"),
581 }
582 }
583
584 #[inline]
585 pub fn into_entry(self) -> Entry {
586 self.entry
587 }
588
589 #[inline]
590 pub fn entry(&self) -> &Entry {
591 &self.entry
592 }
593
594 /// Exposed for raw by-offset access methods (use with `open_dir_at_end`).
595 #[inline]
596 pub fn entry_range(&self) -> Range<u64> {
597 self.entry_range.clone()
598 }
599 }
600
601 /// An iterator over the contents of a directory.
602 pub(crate) struct ReadDirImpl<'a, T> {
603 dir: &'a DirectoryImpl<T>,
604 at: usize,
605 }
606
607 impl<'a, T: Clone + ReadAt> ReadDirImpl<'a, T> {
608 fn new(dir: &'a DirectoryImpl<T>, at: usize) -> Self {
609 Self { dir, at }
610 }
611
612 /// Get the next entry.
613 pub async fn next(&mut self) -> io::Result<Option<DirEntryImpl<'a, T>>> {
614 if self.at == self.dir.table.len() {
615 Ok(None)
616 } else {
617 let cursor = self.dir.get_cursor(self.at).await?;
618 self.at += 1;
619 Ok(Some(cursor))
620 }
621 }
622
623 /// Efficient alternative to `Iterator::skip`.
624 #[inline]
625 pub fn skip(self, n: usize) -> Self {
626 Self {
627 at: (self.at + n).min(self.dir.table.len()),
628 dir: self.dir,
629 }
630 }
631
632 /// Efficient alternative to `Iterator::count`.
633 #[inline]
634 pub fn count(self) -> usize {
635 self.dir.table.len()
636 }
637 }
638
639 /// A cursor pointing to a file in a directory.
640 ///
641 /// At this point only the file name has been read and we remembered the position for finding the
642 /// actual data. This can be upgraded into a FileEntryImpl.
643 pub(crate) struct DirEntryImpl<'a, T: Clone + ReadAt> {
644 dir: &'a DirectoryImpl<T>,
645 file_name: PathBuf,
646 entry_range: Range<u64>,
647 caches: Arc<Caches>,
648 }
649
650 impl<'a, T: Clone + ReadAt> DirEntryImpl<'a, T> {
651 pub fn file_name(&self) -> &Path {
652 &self.file_name
653 }
654
655 async fn decode_entry(&self) -> io::Result<FileEntryImpl<T>> {
656 let (entry, _decoder) = self
657 .dir
658 .decode_one_entry(self.entry_range.clone(), Some(&self.file_name))
659 .await?;
660
661 Ok(FileEntryImpl {
662 input: self.dir.input.clone(),
663 entry,
664 entry_range: self.entry_range(),
665 caches: Arc::clone(&self.caches),
666 })
667 }
668
669 /// Exposed for raw by-offset access methods.
670 #[inline]
671 pub fn entry_range(&self) -> Range<u64> {
672 self.entry_range.clone()
673 }
674 }
675
676 /// A reader for file contents.
677 pub(crate) struct FileContentsImpl<T> {
678 input: T,
679
680 /// Absolute offset inside the `input`.
681 range: Range<u64>,
682 }
683
684 impl<T: Clone + ReadAt> FileContentsImpl<T> {
685 pub fn new(input: T, range: Range<u64>) -> Self {
686 Self { input, range }
687 }
688
689 #[inline]
690 pub fn file_size(&self) -> u64 {
691 self.range.end - self.range.start
692 }
693
694 async fn read_at(&self, mut buf: &mut [u8], offset: u64) -> io::Result<usize> {
695 let size = self.file_size();
696 if offset >= size {
697 return Ok(0);
698 }
699 let remaining = size - offset;
700
701 if remaining < buf.len() as u64 {
702 buf = &mut buf[..(remaining as usize)];
703 }
704
705 read_at(&self.input, buf, self.range.start + offset).await
706 }
707 }
708
709 impl<T: Clone + ReadAt> ReadAt for FileContentsImpl<T> {
710 fn poll_read_at(
711 self: Pin<&Self>,
712 cx: &mut Context,
713 mut buf: &mut [u8],
714 offset: u64,
715 ) -> Poll<io::Result<usize>> {
716 let size = self.file_size();
717 if offset >= size {
718 return Poll::Ready(Ok(0));
719 }
720 let remaining = size - offset;
721
722 if remaining < buf.len() as u64 {
723 buf = &mut buf[..(remaining as usize)];
724 }
725
726 let offset = self.range.start + offset;
727 unsafe { self.map_unchecked(|this| &this.input) }.poll_read_at(cx, buf, offset)
728 }
729 }
730
731 #[doc(hidden)]
732 pub struct SeqReadAtAdapter<T> {
733 input: T,
734 range: Range<u64>,
735 }
736
737 impl<T: ReadAt> SeqReadAtAdapter<T> {
738 pub fn new(input: T, range: Range<u64>) -> Self {
739 if range.end < range.start {
740 panic!("BAD SEQ READ AT ADAPTER");
741 }
742 Self { input, range }
743 }
744
745 #[inline]
746 fn remaining(&self) -> usize {
747 (self.range.end - self.range.start) as usize
748 }
749 }
750
751 impl<T: ReadAt> decoder::SeqRead for SeqReadAtAdapter<T> {
752 fn poll_seq_read(
753 self: Pin<&mut Self>,
754 cx: &mut Context,
755 buf: &mut [u8],
756 ) -> Poll<io::Result<usize>> {
757 let len = buf.len().min(self.remaining());
758 let buf = &mut buf[..len];
759
760 let this = unsafe { self.get_unchecked_mut() };
761
762 let got = ready!(unsafe {
763 Pin::new_unchecked(&this.input).poll_read_at(cx, buf, this.range.start)
764 })?;
765 this.range.start += got as u64;
766 Poll::Ready(Ok(got))
767 }
768
769 fn poll_position(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<io::Result<u64>>> {
770 Poll::Ready(Some(Ok(self.range.start)))
771 }
772 }