]>
Commit | Line | Data |
---|---|---|
6cd4f635 WB |
1 | //! Asynchronous `pxar` random-access handling. |
2 | //! | |
3 | //! Currently neither tokio nor futures have an `AsyncFileExt` variant. | |
9de9cca9 WB |
4 | //! |
5 | //! TODO: Implement a locking version for AsyncSeek+AsyncRead files? | |
d3a83ee3 | 6 | |
e72062a9 | 7 | use std::future::Future; |
d3a83ee3 | 8 | use std::io; |
e72062a9 | 9 | use std::mem; |
d3a83ee3 WB |
10 | use std::ops::Range; |
11 | use std::os::unix::fs::FileExt; | |
12 | use std::path::Path; | |
13 | use std::pin::Pin; | |
14 | use std::sync::Arc; | |
15 | use std::task::{Context, Poll}; | |
16 | ||
e72062a9 | 17 | use crate::accessor::{self, cache::Cache, MaybeReady, ReadAt, ReadAtOperation}; |
afe05f3f | 18 | use crate::decoder::aio::Decoder; |
06070d26 | 19 | use crate::format::GoodbyeItem; |
e72062a9 | 20 | use crate::util; |
d3a83ee3 WB |
21 | use crate::Entry; |
22 | ||
23 | use super::sync::{FileReader, FileRefReader}; | |
24 | ||
25 | /// Asynchronous `pxar` random-access decoder. | |
26 | /// | |
27 | /// | |
28 | /// This is the `async` version of the `pxar` accessor. | |
29 | #[repr(transparent)] | |
30 | pub struct Accessor<T> { | |
31 | inner: accessor::AccessorImpl<T>, | |
32 | } | |
33 | ||
34 | impl<T: FileExt> Accessor<FileReader<T>> { | |
35 | /// Decode a `pxar` archive from a standard file implementing `FileExt`. | |
36 | /// | |
37 | /// Note that "plain files" don't normally block on `read(2)` operations anyway, and tokio has | |
38 | /// no support for asynchronous `read_at` operations, so we allow creating an accessor backed | |
39 | /// by a blocking file. | |
40 | #[inline] | |
41 | pub async fn from_file_and_size(input: T, size: u64) -> io::Result<Self> { | |
42 | Accessor::new(FileReader::new(input), size).await | |
43 | } | |
44 | } | |
45 | ||
46 | impl Accessor<FileReader<std::fs::File>> { | |
47 | /// Decode a `pxar` archive from a regular `std::io::File` input. | |
48 | #[inline] | |
49 | pub async fn from_file(input: std::fs::File) -> io::Result<Self> { | |
50 | let size = input.metadata()?.len(); | |
51 | Accessor::from_file_and_size(input, size).await | |
52 | } | |
53 | ||
54 | /// Convenience shortcut for `File::open` followed by `Accessor::from_file`. | |
55 | pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> { | |
56 | Self::from_file(std::fs::File::open(path.as_ref())?).await | |
57 | } | |
58 | } | |
59 | ||
60 | impl<T: Clone + std::ops::Deref<Target = std::fs::File>> Accessor<FileRefReader<T>> { | |
61 | /// Open an `Arc` or `Rc` of `File`. | |
62 | pub async fn from_file_ref(input: T) -> io::Result<Self> { | |
63 | let size = input.deref().metadata()?.len(); | |
64 | Accessor::from_file_ref_and_size(input, size).await | |
65 | } | |
66 | } | |
67 | ||
68 | impl<T> Accessor<FileRefReader<T>> | |
69 | where | |
70 | T: Clone + std::ops::Deref, | |
71 | T::Target: FileExt, | |
72 | { | |
73 | /// Open an `Arc` or `Rc` of `File`. | |
74 | pub async fn from_file_ref_and_size( | |
75 | input: T, | |
76 | size: u64, | |
77 | ) -> io::Result<Accessor<FileRefReader<T>>> { | |
78 | Accessor::new(FileRefReader::new(input), size).await | |
79 | } | |
80 | } | |
81 | ||
82 | impl<T: ReadAt> Accessor<T> { | |
83 | /// Create a *blocking* random-access decoder from an input implementing our internal read | |
84 | /// interface. | |
85 | /// | |
86 | /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is | |
87 | /// not allowed to use the `Waker`, as this will cause a `panic!`. | |
88 | pub async fn new(input: T, size: u64) -> io::Result<Self> { | |
89 | Ok(Self { | |
90 | inner: accessor::AccessorImpl::new(input, size).await?, | |
91 | }) | |
92 | } | |
93 | ||
94 | /// Open a directory handle to the root of the pxar archive. | |
1b25fc08 | 95 | pub async fn open_root_ref(&self) -> io::Result<Directory<&dyn ReadAt>> { |
d3a83ee3 WB |
96 | Ok(Directory::new(self.inner.open_root_ref().await?)) |
97 | } | |
98 | ||
e5a2495e | 99 | /// Set a cache for the goodbye tables to reduce random disk access. |
d3a83ee3 WB |
100 | pub fn set_goodbye_table_cache<C>(&mut self, cache: Option<C>) |
101 | where | |
102 | C: Cache<u64, [GoodbyeItem]> + Send + Sync + 'static, | |
103 | { | |
104 | self.inner | |
105 | .set_goodbye_table_cache(cache.map(|cache| Arc::new(cache) as _)) | |
106 | } | |
107 | ||
108 | /// Get the full archive size we're allowed to access. | |
109 | #[inline] | |
110 | pub fn size(&self) -> u64 { | |
111 | self.inner.size() | |
112 | } | |
113 | } | |
114 | ||
115 | impl<T: Clone + ReadAt> Accessor<T> { | |
e5a2495e | 116 | /// Open the "root" directory entry of this pxar archive. |
d3a83ee3 WB |
117 | pub async fn open_root(&self) -> io::Result<Directory<T>> { |
118 | Ok(Directory::new(self.inner.open_root().await?)) | |
119 | } | |
120 | ||
121 | /// Allow opening a directory at a specified offset. | |
1b25fc08 WB |
122 | /// |
123 | /// # Safety | |
124 | /// | |
125 | /// This should only be used with offsets known to point to the end of a directory, otherwise | |
126 | /// this usually fails, unless the data otherwise happens to look like a valid directory. | |
d3a83ee3 WB |
127 | pub async unsafe fn open_dir_at_end(&self, offset: u64) -> io::Result<Directory<T>> { |
128 | Ok(Directory::new(self.inner.open_dir_at_end(offset).await?)) | |
129 | } | |
130 | ||
131 | /// Allow opening a regular file from a specified range. | |
1b25fc08 WB |
132 | /// |
133 | /// # Safety | |
134 | /// | |
135 | /// Should only be used with `entry_range_info`s originating from the same archive, otherwise | |
136 | /// the result will be undefined and likely fail (or contain unexpected data). | |
06070d26 WB |
137 | pub async unsafe fn open_file_at_range( |
138 | &self, | |
139 | entry_range_info: &accessor::EntryRangeInfo, | |
140 | ) -> io::Result<FileEntry<T>> { | |
d3a83ee3 | 141 | Ok(FileEntry { |
06070d26 | 142 | inner: self.inner.open_file_at_range(entry_range_info).await?, |
d3a83ee3 WB |
143 | }) |
144 | } | |
145 | ||
146 | /// Allow opening arbitrary contents from a specific range. | |
1b25fc08 WB |
147 | /// |
148 | /// # Safety | |
149 | /// | |
150 | /// This will provide a reader over an arbitrary range of the archive file, so unless this | |
151 | /// comes from a actual file entry data, the contents might not make much sense. | |
d3a83ee3 WB |
152 | pub unsafe fn open_contents_at_range(&self, range: Range<u64>) -> FileContents<T> { |
153 | FileContents { | |
154 | inner: self.inner.open_contents_at_range(range), | |
155 | at: 0, | |
e72062a9 WB |
156 | buffer: Vec::new(), |
157 | future: None, | |
d3a83ee3 WB |
158 | } |
159 | } | |
6bfadb8a WB |
160 | |
161 | /// Following a hardlink. | |
06070d26 | 162 | pub async fn follow_hardlink(&self, entry: &FileEntry<T>) -> io::Result<FileEntry<T>> { |
6bfadb8a | 163 | Ok(FileEntry { |
06070d26 | 164 | inner: self.inner.follow_hardlink(&entry.inner).await?, |
6bfadb8a WB |
165 | }) |
166 | } | |
d3a83ee3 WB |
167 | } |
168 | ||
169 | /// A pxar directory entry. This provdies blocking access to the contents of a directory. | |
170 | #[repr(transparent)] | |
171 | pub struct Directory<T> { | |
172 | inner: accessor::DirectoryImpl<T>, | |
173 | } | |
174 | ||
175 | impl<T: Clone + ReadAt> Directory<T> { | |
176 | fn new(inner: accessor::DirectoryImpl<T>) -> Self { | |
177 | Self { inner } | |
178 | } | |
179 | ||
180 | /// Get a decoder for the directory contents. | |
181 | pub async fn decode_full(&self) -> io::Result<Decoder<accessor::SeqReadAtAdapter<T>>> { | |
182 | Ok(Decoder::from_impl(self.inner.decode_full().await?)) | |
183 | } | |
184 | ||
185 | /// Get a `FileEntry` referencing the directory itself. | |
186 | /// | |
187 | /// Helper function for our fuse implementation. | |
188 | pub async fn lookup_self(&self) -> io::Result<FileEntry<T>> { | |
189 | Ok(FileEntry { | |
190 | inner: self.inner.lookup_self().await?, | |
191 | }) | |
192 | } | |
193 | ||
4af15944 WB |
194 | /// Lookup an entry starting from this current directory. |
195 | /// | |
196 | /// For convenience, this already resolves paths with multiple components. | |
d3a83ee3 WB |
197 | pub async fn lookup<P: AsRef<Path>>(&self, path: P) -> io::Result<Option<FileEntry<T>>> { |
198 | if let Some(file_entry) = self.inner.lookup(path.as_ref()).await? { | |
199 | Ok(Some(FileEntry { inner: file_entry })) | |
200 | } else { | |
201 | Ok(None) | |
202 | } | |
203 | } | |
204 | ||
205 | /// Get an iterator over the directory's contents. | |
1b25fc08 | 206 | pub fn read_dir(&self) -> ReadDir<T> { |
d3a83ee3 WB |
207 | ReadDir { |
208 | inner: self.inner.read_dir(), | |
209 | } | |
210 | } | |
211 | ||
212 | /// Get the number of entries in this directory. | |
213 | #[inline] | |
214 | pub fn entry_count(&self) -> usize { | |
215 | self.inner.entry_count() | |
216 | } | |
217 | } | |
218 | ||
219 | /// A file entry in a direcotry, retrieved via the `lookup` method or from | |
220 | /// `DirEntry::decode_entry``. | |
93fa37fb | 221 | #[derive(Clone)] |
d3a83ee3 WB |
222 | #[repr(transparent)] |
223 | pub struct FileEntry<T: Clone + ReadAt> { | |
224 | inner: accessor::FileEntryImpl<T>, | |
225 | } | |
226 | ||
227 | impl<T: Clone + ReadAt> FileEntry<T> { | |
228 | /// Get a handle to the subdirectory this file entry points to, if it is in fact a directory. | |
229 | pub async fn enter_directory(&self) -> io::Result<Directory<T>> { | |
230 | Ok(Directory::new(self.inner.enter_directory().await?)) | |
231 | } | |
232 | ||
233 | /// For use with unsafe accessor methods. | |
234 | pub fn content_range(&self) -> io::Result<Option<Range<u64>>> { | |
235 | self.inner.content_range() | |
236 | } | |
237 | ||
e5a2495e | 238 | /// Get the file's contents. |
d3a83ee3 WB |
239 | pub async fn contents(&self) -> io::Result<FileContents<T>> { |
240 | Ok(FileContents { | |
241 | inner: self.inner.contents().await?, | |
242 | at: 0, | |
e72062a9 WB |
243 | buffer: Vec::new(), |
244 | future: None, | |
d3a83ee3 WB |
245 | }) |
246 | } | |
247 | ||
e5a2495e WB |
248 | /// Convenience shortcut for when only the metadata contained in the [`Entry`] struct is of |
249 | /// interest. | |
d3a83ee3 WB |
250 | #[inline] |
251 | pub fn into_entry(self) -> Entry { | |
252 | self.inner.into_entry() | |
253 | } | |
254 | ||
e5a2495e | 255 | /// Access the contained [`Entry`] for metadata access. |
d3a83ee3 WB |
256 | #[inline] |
257 | pub fn entry(&self) -> &Entry { | |
258 | &self.inner.entry() | |
259 | } | |
260 | ||
261 | /// Exposed for raw by-offset access methods (use with `open_dir_at_end`). | |
262 | #[inline] | |
06070d26 WB |
263 | pub fn entry_range_info(&self) -> &accessor::EntryRangeInfo { |
264 | self.inner.entry_range_info() | |
d3a83ee3 WB |
265 | } |
266 | } | |
267 | ||
268 | impl<T: Clone + ReadAt> std::ops::Deref for FileEntry<T> { | |
269 | type Target = Entry; | |
270 | ||
271 | fn deref(&self) -> &Self::Target { | |
272 | self.entry() | |
273 | } | |
274 | } | |
275 | ||
276 | /// An iterator over the contents of a `Directory`. | |
277 | #[repr(transparent)] | |
278 | pub struct ReadDir<'a, T> { | |
279 | inner: accessor::ReadDirImpl<'a, T>, | |
280 | } | |
281 | ||
282 | impl<'a, T: Clone + ReadAt> ReadDir<'a, T> { | |
283 | /// Efficient alternative to `Iterator::skip`. | |
284 | #[inline] | |
285 | pub fn skip(self, n: usize) -> Self { | |
286 | Self { | |
287 | inner: self.inner.skip(n), | |
288 | } | |
289 | } | |
290 | ||
291 | /// Efficient alternative to `Iterator::count`. | |
292 | #[inline] | |
293 | pub fn count(self) -> usize { | |
294 | self.inner.count() | |
295 | } | |
296 | ||
e5a2495e | 297 | /// Get the next directory entry. |
d3a83ee3 WB |
298 | pub async fn next(&mut self) -> Option<io::Result<DirEntry<'a, T>>> { |
299 | match self.inner.next().await { | |
300 | Ok(Some(inner)) => Some(Ok(DirEntry { inner })), | |
301 | Ok(None) => None, | |
302 | Err(err) => Some(Err(err)), | |
303 | } | |
304 | } | |
305 | } | |
306 | ||
307 | /// A directory entry. When iterating through the contents of a directory we first get access to | |
308 | /// the file name. The remaining information can be decoded afterwards. | |
309 | #[repr(transparent)] | |
310 | pub struct DirEntry<'a, T: Clone + ReadAt> { | |
311 | inner: accessor::DirEntryImpl<'a, T>, | |
312 | } | |
313 | ||
314 | impl<'a, T: Clone + ReadAt> DirEntry<'a, T> { | |
315 | /// Get the current file name. | |
316 | pub fn file_name(&self) -> &Path { | |
317 | self.inner.file_name() | |
318 | } | |
319 | ||
320 | /// Decode the entry. | |
321 | /// | |
322 | /// When iterating over a directory, the file name is read separately from the file attributes, | |
323 | /// so only the file name is available here, while the attributes still need to be decoded. | |
324 | pub async fn decode_entry(&self) -> io::Result<FileEntry<T>> { | |
325 | self.inner | |
326 | .decode_entry() | |
327 | .await | |
328 | .map(|inner| FileEntry { inner }) | |
329 | } | |
330 | ||
331 | /// Exposed for raw by-offset access methods. | |
332 | #[inline] | |
06070d26 WB |
333 | pub fn entry_range_info(&self) -> &accessor::EntryRangeInfo { |
334 | self.inner.entry_range_info() | |
d3a83ee3 WB |
335 | } |
336 | } | |
337 | ||
1b25fc08 WB |
/// Output of a file-content read future: the number of bytes read, plus the buffer they were
/// read into (handed back so it can be reused for the next read).
struct ReadResult {
    len: usize,
    buffer: Vec<u8>,
}
343 | ||
d3a83ee3 WB |
344 | /// A reader for file contents. |
345 | pub struct FileContents<T> { | |
346 | inner: accessor::FileContentsImpl<T>, | |
347 | at: u64, | |
e72062a9 | 348 | buffer: Vec<u8>, |
1b25fc08 | 349 | future: Option<Pin<Box<dyn Future<Output = io::Result<ReadResult>> + 'static>>>, |
e72062a9 WB |
350 | } |
351 | ||
352 | // We lose `Send` via the boxed trait object and don't want to force the trait object to | |
353 | // potentially be more strict than `T`, so we leave it as it is ans implement Send and Sync | |
354 | // depending on T. | |
355 | unsafe impl<T: Send> Send for FileContents<T> {} | |
356 | unsafe impl<T: Sync> Sync for FileContents<T> {} | |
357 | ||
7aee9c1f FG |
#[cfg(feature = "tokio-io")]
impl<T: Clone + ReadAt> tokio::io::AsyncRead for FileContents<T> {
    /// Read file contents at the current position into `dest`.
    ///
    /// Each read runs as a boxed future over a clone of the contents reader. The future is kept
    /// in `self.future` across `Poll::Pending` results. The read position only advances by the
    /// number of bytes actually copied into `dest`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        dest: &mut tokio::io::ReadBuf,
    ) -> Poll<io::Result<()>> {
        // SAFETY: only the `Unpin` fields `buffer`, `future` and `at` are moved or modified.
        // `inner` is only borrowed in order to clone it, so nothing pinned is moved.
        let this = unsafe { Pin::into_inner_unchecked(self) };
        loop {
            match this.future.take() {
                None => {
                    let mut buffer = mem::take(&mut this.buffer);
                    util::scale_read_buffer(&mut buffer, dest.remaining());
                    let reader: accessor::FileContentsImpl<T> = this.inner.clone();
                    let at = this.at;
                    let future: Pin<Box<dyn Future<Output = io::Result<ReadResult>>>> =
                        Box::pin(async move {
                            let len = reader.read_at(&mut buffer, at).await?;
                            io::Result::Ok(ReadResult { len, buffer })
                        });
                    // SAFETY: this future has the lifetime of `T`. `Self` shares that lifetime,
                    // and the future is stored inside the pinned `self`. `T` may be a reference
                    // with a non-'static lifetime, but then it cannot be a self-reference into
                    // `Self`, so extending the lifetime here is valid in all cases.
                    this.future = Some(unsafe { mem::transmute(future) });
                }
                Some(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Pending => {
                        this.future = Some(fut);
                        return Poll::Pending;
                    }
                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                    Poll::Ready(Ok(ReadResult { len: got, buffer })) => {
                        this.buffer = buffer;
                        // `dest` may be smaller than when this read was started, because a
                        // caller may pass a different `ReadBuf` after `Poll::Pending`. Only
                        // advance by what was actually delivered; any surplus is read again on
                        // the next call instead of being silently skipped.
                        let len = got.min(dest.remaining());
                        dest.put_slice(&this.buffer[..len]);
                        this.at += len as u64;
                        return Poll::Ready(Ok(()));
                    }
                },
            }
        }
    }
}
402 | ||
d3a83ee3 | 403 | impl<T: Clone + ReadAt> ReadAt for FileContents<T> { |
e72062a9 WB |
404 | fn start_read_at<'a>( |
405 | self: Pin<&'a Self>, | |
d3a83ee3 | 406 | cx: &mut Context, |
e72062a9 | 407 | buf: &'a mut [u8], |
d3a83ee3 | 408 | offset: u64, |
e72062a9 WB |
409 | ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> { |
410 | unsafe { self.map_unchecked(|this| &this.inner) }.start_read_at(cx, buf, offset) | |
411 | } | |
412 | ||
413 | fn poll_complete<'a>( | |
414 | self: Pin<&'a Self>, | |
415 | op: ReadAtOperation<'a>, | |
416 | ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> { | |
417 | unsafe { self.map_unchecked(|this| &this.inner) }.poll_complete(op) | |
d3a83ee3 WB |
418 | } |
419 | } | |
c3bba169 WB |
420 | |
421 | impl<T: Clone + ReadAt> FileContents<T> { | |
422 | /// Convenience helper for `read_at`: | |
423 | pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> { | |
e72062a9 | 424 | self.inner.read_at(buf, offset).await |
c3bba169 WB |
425 | } |
426 | } |