]> git.proxmox.com Git - pxar.git/blame - src/accessor/aio.rs
add more code documentation
[pxar.git] / src / accessor / aio.rs
CommitLineData
6cd4f635
WB
1//! Asynchronous `pxar` random-access handling.
2//!
3//! Currently neither tokio nor futures have an `AsyncFileExt` variant.
9de9cca9
WB
4//!
5//! TODO: Implement a locking version for AsyncSeek+AsyncRead files?
d3a83ee3 6
e72062a9 7use std::future::Future;
d3a83ee3 8use std::io;
e72062a9 9use std::mem;
d3a83ee3
WB
10use std::ops::Range;
11use std::os::unix::fs::FileExt;
12use std::path::Path;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16
e72062a9 17use crate::accessor::{self, cache::Cache, MaybeReady, ReadAt, ReadAtOperation};
afe05f3f 18use crate::decoder::aio::Decoder;
06070d26 19use crate::format::GoodbyeItem;
e72062a9 20use crate::util;
d3a83ee3
WB
21use crate::Entry;
22
23use super::sync::{FileReader, FileRefReader};
24
25/// Asynchronous `pxar` random-access decoder.
26///
27///
28/// This is the `async` version of the `pxar` accessor.
29#[repr(transparent)]
30pub struct Accessor<T> {
31 inner: accessor::AccessorImpl<T>,
32}
33
34impl<T: FileExt> Accessor<FileReader<T>> {
35 /// Decode a `pxar` archive from a standard file implementing `FileExt`.
36 ///
37 /// Note that "plain files" don't normally block on `read(2)` operations anyway, and tokio has
38 /// no support for asynchronous `read_at` operations, so we allow creating an accessor backed
39 /// by a blocking file.
40 #[inline]
41 pub async fn from_file_and_size(input: T, size: u64) -> io::Result<Self> {
42 Accessor::new(FileReader::new(input), size).await
43 }
44}
45
46impl Accessor<FileReader<std::fs::File>> {
47 /// Decode a `pxar` archive from a regular `std::io::File` input.
48 #[inline]
49 pub async fn from_file(input: std::fs::File) -> io::Result<Self> {
50 let size = input.metadata()?.len();
51 Accessor::from_file_and_size(input, size).await
52 }
53
54 /// Convenience shortcut for `File::open` followed by `Accessor::from_file`.
55 pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
56 Self::from_file(std::fs::File::open(path.as_ref())?).await
57 }
58}
59
60impl<T: Clone + std::ops::Deref<Target = std::fs::File>> Accessor<FileRefReader<T>> {
61 /// Open an `Arc` or `Rc` of `File`.
62 pub async fn from_file_ref(input: T) -> io::Result<Self> {
63 let size = input.deref().metadata()?.len();
64 Accessor::from_file_ref_and_size(input, size).await
65 }
66}
67
68impl<T> Accessor<FileRefReader<T>>
69where
70 T: Clone + std::ops::Deref,
71 T::Target: FileExt,
72{
73 /// Open an `Arc` or `Rc` of `File`.
74 pub async fn from_file_ref_and_size(
75 input: T,
76 size: u64,
77 ) -> io::Result<Accessor<FileRefReader<T>>> {
78 Accessor::new(FileRefReader::new(input), size).await
79 }
80}
81
82impl<T: ReadAt> Accessor<T> {
83 /// Create a *blocking* random-access decoder from an input implementing our internal read
84 /// interface.
85 ///
86 /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
87 /// not allowed to use the `Waker`, as this will cause a `panic!`.
88 pub async fn new(input: T, size: u64) -> io::Result<Self> {
89 Ok(Self {
90 inner: accessor::AccessorImpl::new(input, size).await?,
91 })
92 }
93
94 /// Open a directory handle to the root of the pxar archive.
1b25fc08 95 pub async fn open_root_ref(&self) -> io::Result<Directory<&dyn ReadAt>> {
d3a83ee3
WB
96 Ok(Directory::new(self.inner.open_root_ref().await?))
97 }
98
e5a2495e 99 /// Set a cache for the goodbye tables to reduce random disk access.
d3a83ee3
WB
100 pub fn set_goodbye_table_cache<C>(&mut self, cache: Option<C>)
101 where
102 C: Cache<u64, [GoodbyeItem]> + Send + Sync + 'static,
103 {
104 self.inner
105 .set_goodbye_table_cache(cache.map(|cache| Arc::new(cache) as _))
106 }
107
108 /// Get the full archive size we're allowed to access.
109 #[inline]
110 pub fn size(&self) -> u64 {
111 self.inner.size()
112 }
113}
114
115impl<T: Clone + ReadAt> Accessor<T> {
e5a2495e 116 /// Open the "root" directory entry of this pxar archive.
d3a83ee3
WB
117 pub async fn open_root(&self) -> io::Result<Directory<T>> {
118 Ok(Directory::new(self.inner.open_root().await?))
119 }
120
121 /// Allow opening a directory at a specified offset.
1b25fc08
WB
122 ///
123 /// # Safety
124 ///
125 /// This should only be used with offsets known to point to the end of a directory, otherwise
126 /// this usually fails, unless the data otherwise happens to look like a valid directory.
d3a83ee3
WB
127 pub async unsafe fn open_dir_at_end(&self, offset: u64) -> io::Result<Directory<T>> {
128 Ok(Directory::new(self.inner.open_dir_at_end(offset).await?))
129 }
130
131 /// Allow opening a regular file from a specified range.
1b25fc08
WB
132 ///
133 /// # Safety
134 ///
135 /// Should only be used with `entry_range_info`s originating from the same archive, otherwise
136 /// the result will be undefined and likely fail (or contain unexpected data).
06070d26
WB
137 pub async unsafe fn open_file_at_range(
138 &self,
139 entry_range_info: &accessor::EntryRangeInfo,
140 ) -> io::Result<FileEntry<T>> {
d3a83ee3 141 Ok(FileEntry {
06070d26 142 inner: self.inner.open_file_at_range(entry_range_info).await?,
d3a83ee3
WB
143 })
144 }
145
146 /// Allow opening arbitrary contents from a specific range.
1b25fc08
WB
147 ///
148 /// # Safety
149 ///
150 /// This will provide a reader over an arbitrary range of the archive file, so unless this
151 /// comes from a actual file entry data, the contents might not make much sense.
d3a83ee3
WB
152 pub unsafe fn open_contents_at_range(&self, range: Range<u64>) -> FileContents<T> {
153 FileContents {
154 inner: self.inner.open_contents_at_range(range),
155 at: 0,
e72062a9
WB
156 buffer: Vec::new(),
157 future: None,
d3a83ee3
WB
158 }
159 }
6bfadb8a
WB
160
161 /// Following a hardlink.
06070d26 162 pub async fn follow_hardlink(&self, entry: &FileEntry<T>) -> io::Result<FileEntry<T>> {
6bfadb8a 163 Ok(FileEntry {
06070d26 164 inner: self.inner.follow_hardlink(&entry.inner).await?,
6bfadb8a
WB
165 })
166 }
d3a83ee3
WB
167}
168
169/// A pxar directory entry. This provdies blocking access to the contents of a directory.
170#[repr(transparent)]
171pub struct Directory<T> {
172 inner: accessor::DirectoryImpl<T>,
173}
174
175impl<T: Clone + ReadAt> Directory<T> {
176 fn new(inner: accessor::DirectoryImpl<T>) -> Self {
177 Self { inner }
178 }
179
180 /// Get a decoder for the directory contents.
181 pub async fn decode_full(&self) -> io::Result<Decoder<accessor::SeqReadAtAdapter<T>>> {
182 Ok(Decoder::from_impl(self.inner.decode_full().await?))
183 }
184
185 /// Get a `FileEntry` referencing the directory itself.
186 ///
187 /// Helper function for our fuse implementation.
188 pub async fn lookup_self(&self) -> io::Result<FileEntry<T>> {
189 Ok(FileEntry {
190 inner: self.inner.lookup_self().await?,
191 })
192 }
193
4af15944
WB
194 /// Lookup an entry starting from this current directory.
195 ///
196 /// For convenience, this already resolves paths with multiple components.
d3a83ee3
WB
197 pub async fn lookup<P: AsRef<Path>>(&self, path: P) -> io::Result<Option<FileEntry<T>>> {
198 if let Some(file_entry) = self.inner.lookup(path.as_ref()).await? {
199 Ok(Some(FileEntry { inner: file_entry }))
200 } else {
201 Ok(None)
202 }
203 }
204
205 /// Get an iterator over the directory's contents.
1b25fc08 206 pub fn read_dir(&self) -> ReadDir<T> {
d3a83ee3
WB
207 ReadDir {
208 inner: self.inner.read_dir(),
209 }
210 }
211
212 /// Get the number of entries in this directory.
213 #[inline]
214 pub fn entry_count(&self) -> usize {
215 self.inner.entry_count()
216 }
217}
218
219/// A file entry in a direcotry, retrieved via the `lookup` method or from
220/// `DirEntry::decode_entry``.
93fa37fb 221#[derive(Clone)]
d3a83ee3
WB
222#[repr(transparent)]
223pub struct FileEntry<T: Clone + ReadAt> {
224 inner: accessor::FileEntryImpl<T>,
225}
226
227impl<T: Clone + ReadAt> FileEntry<T> {
228 /// Get a handle to the subdirectory this file entry points to, if it is in fact a directory.
229 pub async fn enter_directory(&self) -> io::Result<Directory<T>> {
230 Ok(Directory::new(self.inner.enter_directory().await?))
231 }
232
233 /// For use with unsafe accessor methods.
234 pub fn content_range(&self) -> io::Result<Option<Range<u64>>> {
235 self.inner.content_range()
236 }
237
e5a2495e 238 /// Get the file's contents.
d3a83ee3
WB
239 pub async fn contents(&self) -> io::Result<FileContents<T>> {
240 Ok(FileContents {
241 inner: self.inner.contents().await?,
242 at: 0,
e72062a9
WB
243 buffer: Vec::new(),
244 future: None,
d3a83ee3
WB
245 })
246 }
247
e5a2495e
WB
248 /// Convenience shortcut for when only the metadata contained in the [`Entry`] struct is of
249 /// interest.
d3a83ee3
WB
250 #[inline]
251 pub fn into_entry(self) -> Entry {
252 self.inner.into_entry()
253 }
254
e5a2495e 255 /// Access the contained [`Entry`] for metadata access.
d3a83ee3
WB
256 #[inline]
257 pub fn entry(&self) -> &Entry {
258 &self.inner.entry()
259 }
260
261 /// Exposed for raw by-offset access methods (use with `open_dir_at_end`).
262 #[inline]
06070d26
WB
263 pub fn entry_range_info(&self) -> &accessor::EntryRangeInfo {
264 self.inner.entry_range_info()
d3a83ee3
WB
265 }
266}
267
268impl<T: Clone + ReadAt> std::ops::Deref for FileEntry<T> {
269 type Target = Entry;
270
271 fn deref(&self) -> &Self::Target {
272 self.entry()
273 }
274}
275
276/// An iterator over the contents of a `Directory`.
277#[repr(transparent)]
278pub struct ReadDir<'a, T> {
279 inner: accessor::ReadDirImpl<'a, T>,
280}
281
282impl<'a, T: Clone + ReadAt> ReadDir<'a, T> {
283 /// Efficient alternative to `Iterator::skip`.
284 #[inline]
285 pub fn skip(self, n: usize) -> Self {
286 Self {
287 inner: self.inner.skip(n),
288 }
289 }
290
291 /// Efficient alternative to `Iterator::count`.
292 #[inline]
293 pub fn count(self) -> usize {
294 self.inner.count()
295 }
296
e5a2495e 297 /// Get the next directory entry.
d3a83ee3
WB
298 pub async fn next(&mut self) -> Option<io::Result<DirEntry<'a, T>>> {
299 match self.inner.next().await {
300 Ok(Some(inner)) => Some(Ok(DirEntry { inner })),
301 Ok(None) => None,
302 Err(err) => Some(Err(err)),
303 }
304 }
305}
306
307/// A directory entry. When iterating through the contents of a directory we first get access to
308/// the file name. The remaining information can be decoded afterwards.
309#[repr(transparent)]
310pub struct DirEntry<'a, T: Clone + ReadAt> {
311 inner: accessor::DirEntryImpl<'a, T>,
312}
313
314impl<'a, T: Clone + ReadAt> DirEntry<'a, T> {
315 /// Get the current file name.
316 pub fn file_name(&self) -> &Path {
317 self.inner.file_name()
318 }
319
320 /// Decode the entry.
321 ///
322 /// When iterating over a directory, the file name is read separately from the file attributes,
323 /// so only the file name is available here, while the attributes still need to be decoded.
324 pub async fn decode_entry(&self) -> io::Result<FileEntry<T>> {
325 self.inner
326 .decode_entry()
327 .await
328 .map(|inner| FileEntry { inner })
329 }
330
331 /// Exposed for raw by-offset access methods.
332 #[inline]
06070d26
WB
333 pub fn entry_range_info(&self) -> &accessor::EntryRangeInfo {
334 self.inner.entry_range_info()
d3a83ee3
WB
335 }
336}
337
1b25fc08
WB
/// Result of a single read future started by `FileContents`' `AsyncRead` implementation.
///
/// The buffer is handed back so it can be reused for the next read.
struct ReadResult {
    /// Number of bytes that were read into `buffer`.
    len: usize,
    /// The read buffer; only the first `len` bytes contain data from this read.
    buffer: Vec<u8>,
}
343
d3a83ee3
WB
344/// A reader for file contents.
345pub struct FileContents<T> {
346 inner: accessor::FileContentsImpl<T>,
347 at: u64,
e72062a9 348 buffer: Vec<u8>,
1b25fc08 349 future: Option<Pin<Box<dyn Future<Output = io::Result<ReadResult>> + 'static>>>,
e72062a9
WB
350}
351
352// We lose `Send` via the boxed trait object and don't want to force the trait object to
353// potentially be more strict than `T`, so we leave it as it is ans implement Send and Sync
354// depending on T.
355unsafe impl<T: Send> Send for FileContents<T> {}
356unsafe impl<T: Sync> Sync for FileContents<T> {}
357
7aee9c1f
FG
#[cfg(feature = "tokio-io")]
impl<T: Clone + ReadAt> tokio::io::AsyncRead for FileContents<T> {
    /// Sequentially read the file contents, starting at offset 0 and advancing by the number of
    /// bytes handed out to `dest`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        dest: &mut tokio::io::ReadBuf,
    ) -> Poll<io::Result<()>> {
        // SAFETY: `inner` is never moved out of `this` (it is only cloned below); the other
        // fields are not structurally pinned, and the stored future is pinned in its own box.
        let this = unsafe { Pin::into_inner_unchecked(self) };
        loop {
            match this.future.take() {
                None => {
                    let mut buffer = mem::take(&mut this.buffer);
                    util::scale_read_buffer(&mut buffer, dest.remaining());
                    let reader: accessor::FileContentsImpl<T> = this.inner.clone();
                    let at = this.at;
                    let future: Pin<Box<dyn Future<Output = io::Result<ReadResult>>>> =
                        Box::pin(async move {
                            let len = reader.read_at(&mut buffer, at).await?;
                            io::Result::Ok(ReadResult { len, buffer })
                        });
                    // SAFETY: This future has the lifetime from T. Self also has this lifetime
                    // and we store this in a pinned self. T may be a reference with a
                    // non-'static lifetime, but then it cannot be a self-reference into Self,
                    // so this should be valid in all cases.
                    this.future = Some(unsafe { mem::transmute(future) });
                }
                Some(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Pending => {
                        this.future = Some(fut);
                        return Poll::Pending;
                    }
                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                    Poll::Ready(Ok(ReadResult { len: got, buffer })) => {
                        this.buffer = buffer;
                        // The completed read may hold more bytes than `dest` can take now:
                        // `dest` may be a different (smaller) buffer than the one whose capacity
                        // sized the read, e.g. after a `Pending`. Only advance by what is
                        // actually handed out, so the surplus is re-read instead of skipped.
                        let len = got.min(dest.remaining());
                        dest.put_slice(&this.buffer[..len]);
                        this.at += len as u64;
                        return Poll::Ready(Ok(()));
                    }
                },
            }
        }
    }
}
402
d3a83ee3 403impl<T: Clone + ReadAt> ReadAt for FileContents<T> {
e72062a9
WB
404 fn start_read_at<'a>(
405 self: Pin<&'a Self>,
d3a83ee3 406 cx: &mut Context,
e72062a9 407 buf: &'a mut [u8],
d3a83ee3 408 offset: u64,
e72062a9
WB
409 ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
410 unsafe { self.map_unchecked(|this| &this.inner) }.start_read_at(cx, buf, offset)
411 }
412
413 fn poll_complete<'a>(
414 self: Pin<&'a Self>,
415 op: ReadAtOperation<'a>,
416 ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
417 unsafe { self.map_unchecked(|this| &this.inner) }.poll_complete(op)
d3a83ee3
WB
418 }
419}
c3bba169
WB
420
421impl<T: Clone + ReadAt> FileContents<T> {
422 /// Convenience helper for `read_at`:
423 pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
e72062a9 424 self.inner.read_at(buf, offset).await
c3bba169
WB
425 }
426}