]>
Commit | Line | Data |
---|---|---|
6cd4f635 WB |
1 | //! Asynchronous `pxar` random-access handling. |
2 | //! | |
3 | //! Currently neither tokio nor futures have an `AsyncFileExt` variant. | |
9de9cca9 WB |
4 | //! |
5 | //! TODO: Implement a locking version for AsyncSeek+AsyncRead files? | |
d3a83ee3 | 6 | |
e72062a9 | 7 | use std::future::Future; |
d3a83ee3 | 8 | use std::io; |
e72062a9 | 9 | use std::mem; |
d3a83ee3 WB |
10 | use std::ops::Range; |
11 | use std::os::unix::fs::FileExt; | |
12 | use std::path::Path; | |
13 | use std::pin::Pin; | |
14 | use std::sync::Arc; | |
15 | use std::task::{Context, Poll}; | |
16 | ||
e72062a9 | 17 | use crate::accessor::{self, cache::Cache, MaybeReady, ReadAt, ReadAtOperation}; |
afe05f3f | 18 | use crate::decoder::aio::Decoder; |
06070d26 | 19 | use crate::format::GoodbyeItem; |
e72062a9 | 20 | use crate::util; |
d3a83ee3 WB |
21 | use crate::Entry; |
22 | ||
23 | use super::sync::{FileReader, FileRefReader}; | |
24 | ||
25 | /// Asynchronous `pxar` random-access decoder. | |
26 | /// | |
27 | /// This is the `async` version of the `pxar` accessor; it wraps an | |
28 | /// `accessor::AccessorImpl<T>`, to which its methods forward. | |
29 | #[repr(transparent)] | |
30 | pub struct Accessor<T> { | |
31 | inner: accessor::AccessorImpl<T>, | |
32 | } | |
33 | ||
34 | impl<T: FileExt> Accessor<FileReader<T>> { | |
35 | /// Decode a `pxar` archive from a standard file implementing `FileExt`. | |
36 | /// | |
37 | /// Note that "plain files" don't normally block on `read(2)` operations anyway, and tokio has | |
38 | /// no support for asynchronous `read_at` operations, so we allow creating an accessor backed | |
39 | /// by a blocking file. | |
40 | #[inline] | |
41 | pub async fn from_file_and_size(input: T, size: u64) -> io::Result<Self> { | |
42 | Accessor::new(FileReader::new(input), size).await | |
43 | } | |
44 | } | |
45 | ||
46 | impl Accessor<FileReader<std::fs::File>> { | |
47 | /// Decode a `pxar` archive from a regular `std::io::File` input. | |
48 | #[inline] | |
49 | pub async fn from_file(input: std::fs::File) -> io::Result<Self> { | |
50 | let size = input.metadata()?.len(); | |
51 | Accessor::from_file_and_size(input, size).await | |
52 | } | |
53 | ||
54 | /// Convenience shortcut for `File::open` followed by `Accessor::from_file`. | |
55 | pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> { | |
56 | Self::from_file(std::fs::File::open(path.as_ref())?).await | |
57 | } | |
58 | } | |
59 | ||
60 | impl<T: Clone + std::ops::Deref<Target = std::fs::File>> Accessor<FileRefReader<T>> { | |
61 | /// Open an `Arc` or `Rc` of `File`. | |
62 | pub async fn from_file_ref(input: T) -> io::Result<Self> { | |
63 | let size = input.deref().metadata()?.len(); | |
64 | Accessor::from_file_ref_and_size(input, size).await | |
65 | } | |
66 | } | |
67 | ||
68 | impl<T> Accessor<FileRefReader<T>> | |
69 | where | |
70 | T: Clone + std::ops::Deref, | |
71 | T::Target: FileExt, | |
72 | { | |
73 | /// Open an `Arc` or `Rc` of `File`. | |
74 | pub async fn from_file_ref_and_size( | |
75 | input: T, | |
76 | size: u64, | |
77 | ) -> io::Result<Accessor<FileRefReader<T>>> { | |
78 | Accessor::new(FileRefReader::new(input), size).await | |
79 | } | |
80 | } | |
81 | ||
82 | impl<T: ReadAt> Accessor<T> { | |
83 | /// Create a *blocking* random-access decoder from an input implementing our internal read | |
84 | /// interface. | |
85 | /// | |
86 | /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is | |
87 | /// not allowed to use the `Waker`, as this will cause a `panic!`. | |
88 | pub async fn new(input: T, size: u64) -> io::Result<Self> { | |
89 | Ok(Self { | |
90 | inner: accessor::AccessorImpl::new(input, size).await?, | |
91 | }) | |
92 | } | |
93 | ||
94 | /// Open a directory handle to the root of the pxar archive. | |
1b25fc08 | 95 | pub async fn open_root_ref(&self) -> io::Result<Directory<&dyn ReadAt>> { |
d3a83ee3 WB |
96 | Ok(Directory::new(self.inner.open_root_ref().await?)) |
97 | } | |
98 | ||
99 | pub fn set_goodbye_table_cache<C>(&mut self, cache: Option<C>) | |
100 | where | |
101 | C: Cache<u64, [GoodbyeItem]> + Send + Sync + 'static, | |
102 | { | |
103 | self.inner | |
104 | .set_goodbye_table_cache(cache.map(|cache| Arc::new(cache) as _)) | |
105 | } | |
106 | ||
107 | /// Get the full archive size we're allowed to access. | |
108 | #[inline] | |
109 | pub fn size(&self) -> u64 { | |
110 | self.inner.size() | |
111 | } | |
112 | } | |
113 | ||
114 | impl<T: Clone + ReadAt> Accessor<T> { | |
115 | pub async fn open_root(&self) -> io::Result<Directory<T>> { | |
116 | Ok(Directory::new(self.inner.open_root().await?)) | |
117 | } | |
118 | ||
119 | /// Allow opening a directory at a specified offset. | |
1b25fc08 WB |
120 | /// |
121 | /// # Safety | |
122 | /// | |
123 | /// This should only be used with offsets known to point to the end of a directory, otherwise | |
124 | /// this usually fails, unless the data otherwise happens to look like a valid directory. | |
d3a83ee3 WB |
125 | pub async unsafe fn open_dir_at_end(&self, offset: u64) -> io::Result<Directory<T>> { |
126 | Ok(Directory::new(self.inner.open_dir_at_end(offset).await?)) | |
127 | } | |
128 | ||
129 | /// Allow opening a regular file from a specified range. | |
1b25fc08 WB |
130 | /// |
131 | /// # Safety | |
132 | /// | |
133 | /// Should only be used with `entry_range_info`s originating from the same archive, otherwise | |
134 | /// the result will be undefined and likely fail (or contain unexpected data). | |
06070d26 WB |
135 | pub async unsafe fn open_file_at_range( |
136 | &self, | |
137 | entry_range_info: &accessor::EntryRangeInfo, | |
138 | ) -> io::Result<FileEntry<T>> { | |
d3a83ee3 | 139 | Ok(FileEntry { |
06070d26 | 140 | inner: self.inner.open_file_at_range(entry_range_info).await?, |
d3a83ee3 WB |
141 | }) |
142 | } | |
143 | ||
144 | /// Allow opening arbitrary contents from a specific range. | |
1b25fc08 WB |
145 | /// |
146 | /// # Safety | |
147 | /// | |
148 | /// This will provide a reader over an arbitrary range of the archive file, so unless this | |
149 | /// comes from a actual file entry data, the contents might not make much sense. | |
d3a83ee3 WB |
150 | pub unsafe fn open_contents_at_range(&self, range: Range<u64>) -> FileContents<T> { |
151 | FileContents { | |
152 | inner: self.inner.open_contents_at_range(range), | |
153 | at: 0, | |
e72062a9 WB |
154 | buffer: Vec::new(), |
155 | future: None, | |
d3a83ee3 WB |
156 | } |
157 | } | |
6bfadb8a WB |
158 | |
159 | /// Following a hardlink. | |
06070d26 | 160 | pub async fn follow_hardlink(&self, entry: &FileEntry<T>) -> io::Result<FileEntry<T>> { |
6bfadb8a | 161 | Ok(FileEntry { |
06070d26 | 162 | inner: self.inner.follow_hardlink(&entry.inner).await?, |
6bfadb8a WB |
163 | }) |
164 | } | |
d3a83ee3 WB |
165 | } |
166 | ||
167 | /// A pxar directory entry. This provides asynchronous access to the contents of a directory. | |
168 | #[repr(transparent)] | |
169 | pub struct Directory<T> { | |
170 | inner: accessor::DirectoryImpl<T>, | |
171 | } | |
172 | ||
173 | impl<T: Clone + ReadAt> Directory<T> { | |
174 | fn new(inner: accessor::DirectoryImpl<T>) -> Self { | |
175 | Self { inner } | |
176 | } | |
177 | ||
178 | /// Get a decoder for the directory contents. | |
179 | pub async fn decode_full(&self) -> io::Result<Decoder<accessor::SeqReadAtAdapter<T>>> { | |
180 | Ok(Decoder::from_impl(self.inner.decode_full().await?)) | |
181 | } | |
182 | ||
183 | /// Get a `FileEntry` referencing the directory itself. | |
184 | /// | |
185 | /// Helper function for our fuse implementation. | |
186 | pub async fn lookup_self(&self) -> io::Result<FileEntry<T>> { | |
187 | Ok(FileEntry { | |
188 | inner: self.inner.lookup_self().await?, | |
189 | }) | |
190 | } | |
191 | ||
4af15944 WB |
192 | /// Lookup an entry starting from this current directory. |
193 | /// | |
194 | /// For convenience, this already resolves paths with multiple components. | |
d3a83ee3 WB |
195 | pub async fn lookup<P: AsRef<Path>>(&self, path: P) -> io::Result<Option<FileEntry<T>>> { |
196 | if let Some(file_entry) = self.inner.lookup(path.as_ref()).await? { | |
197 | Ok(Some(FileEntry { inner: file_entry })) | |
198 | } else { | |
199 | Ok(None) | |
200 | } | |
201 | } | |
202 | ||
203 | /// Get an iterator over the directory's contents. | |
1b25fc08 | 204 | pub fn read_dir(&self) -> ReadDir<T> { |
d3a83ee3 WB |
205 | ReadDir { |
206 | inner: self.inner.read_dir(), | |
207 | } | |
208 | } | |
209 | ||
210 | /// Get the number of entries in this directory. | |
211 | #[inline] | |
212 | pub fn entry_count(&self) -> usize { | |
213 | self.inner.entry_count() | |
214 | } | |
215 | } | |
216 | ||
217 | /// A file entry in a directory, retrieved via the `lookup` method or from | |
218 | /// `DirEntry::decode_entry`. | |
93fa37fb | 219 | #[derive(Clone)] |
d3a83ee3 WB |
220 | #[repr(transparent)] |
221 | pub struct FileEntry<T: Clone + ReadAt> { | |
222 | inner: accessor::FileEntryImpl<T>, | |
223 | } | |
224 | ||
225 | impl<T: Clone + ReadAt> FileEntry<T> { | |
226 | /// Get a handle to the subdirectory this file entry points to, if it is in fact a directory. | |
227 | pub async fn enter_directory(&self) -> io::Result<Directory<T>> { | |
228 | Ok(Directory::new(self.inner.enter_directory().await?)) | |
229 | } | |
230 | ||
231 | /// For use with unsafe accessor methods. | |
232 | pub fn content_range(&self) -> io::Result<Option<Range<u64>>> { | |
233 | self.inner.content_range() | |
234 | } | |
235 | ||
236 | pub async fn contents(&self) -> io::Result<FileContents<T>> { | |
237 | Ok(FileContents { | |
238 | inner: self.inner.contents().await?, | |
239 | at: 0, | |
e72062a9 WB |
240 | buffer: Vec::new(), |
241 | future: None, | |
d3a83ee3 WB |
242 | }) |
243 | } | |
244 | ||
245 | #[inline] | |
246 | pub fn into_entry(self) -> Entry { | |
247 | self.inner.into_entry() | |
248 | } | |
249 | ||
250 | #[inline] | |
251 | pub fn entry(&self) -> &Entry { | |
252 | &self.inner.entry() | |
253 | } | |
254 | ||
255 | /// Exposed for raw by-offset access methods (use with `open_dir_at_end`). | |
256 | #[inline] | |
06070d26 WB |
257 | pub fn entry_range_info(&self) -> &accessor::EntryRangeInfo { |
258 | self.inner.entry_range_info() | |
d3a83ee3 WB |
259 | } |
260 | } | |
261 | ||
262 | impl<T: Clone + ReadAt> std::ops::Deref for FileEntry<T> { | |
263 | type Target = Entry; | |
264 | ||
265 | fn deref(&self) -> &Self::Target { | |
266 | self.entry() | |
267 | } | |
268 | } | |
269 | ||
270 | /// An iterator over the contents of a `Directory`; entries are fetched with the async `next` method. | |
271 | #[repr(transparent)] | |
272 | pub struct ReadDir<'a, T> { | |
273 | inner: accessor::ReadDirImpl<'a, T>, | |
274 | } | |
275 | ||
276 | impl<'a, T: Clone + ReadAt> ReadDir<'a, T> { | |
277 | /// Efficient alternative to `Iterator::skip`. | |
278 | #[inline] | |
279 | pub fn skip(self, n: usize) -> Self { | |
280 | Self { | |
281 | inner: self.inner.skip(n), | |
282 | } | |
283 | } | |
284 | ||
285 | /// Efficient alternative to `Iterator::count`. | |
286 | #[inline] | |
287 | pub fn count(self) -> usize { | |
288 | self.inner.count() | |
289 | } | |
290 | ||
291 | pub async fn next(&mut self) -> Option<io::Result<DirEntry<'a, T>>> { | |
292 | match self.inner.next().await { | |
293 | Ok(Some(inner)) => Some(Ok(DirEntry { inner })), | |
294 | Ok(None) => None, | |
295 | Err(err) => Some(Err(err)), | |
296 | } | |
297 | } | |
298 | } | |
299 | ||
300 | /// A directory entry. When iterating through the contents of a directory we first get access to | |
301 | /// the file name. The remaining information can be decoded afterwards with `decode_entry`. | |
302 | #[repr(transparent)] | |
303 | pub struct DirEntry<'a, T: Clone + ReadAt> { | |
304 | inner: accessor::DirEntryImpl<'a, T>, | |
305 | } | |
306 | ||
307 | impl<'a, T: Clone + ReadAt> DirEntry<'a, T> { | |
308 | /// Get the current file name. | |
309 | pub fn file_name(&self) -> &Path { | |
310 | self.inner.file_name() | |
311 | } | |
312 | ||
313 | /// Decode the entry. | |
314 | /// | |
315 | /// When iterating over a directory, the file name is read separately from the file attributes, | |
316 | /// so only the file name is available here, while the attributes still need to be decoded. | |
317 | pub async fn decode_entry(&self) -> io::Result<FileEntry<T>> { | |
318 | self.inner | |
319 | .decode_entry() | |
320 | .await | |
321 | .map(|inner| FileEntry { inner }) | |
322 | } | |
323 | ||
324 | /// Exposed for raw by-offset access methods. | |
325 | #[inline] | |
06070d26 WB |
326 | pub fn entry_range_info(&self) -> &accessor::EntryRangeInfo { |
327 | self.inner.entry_range_info() | |
d3a83ee3 WB |
328 | } |
329 | } | |
330 | ||
1b25fc08 WB |
331 | /// Output of the boxed read future stored in `FileContents`: the number of bytes read (`len`) and the `buffer` they were read into, handed back for reuse. |
332 | struct ReadResult { | |
333 | len: usize, | |
334 | buffer: Vec<u8>, | |
335 | } | |
336 | ||
d3a83ee3 WB |
337 | /// A reader for file contents. It tracks the sequential read position (`at`), a reusable scratch `buffer`, and the boxed in-flight read `future` used by the `AsyncRead` implementations. |
338 | pub struct FileContents<T> { | |
339 | inner: accessor::FileContentsImpl<T>, | |
340 | at: u64, | |
e72062a9 | 341 | buffer: Vec<u8>, |
1b25fc08 | 342 | future: Option<Pin<Box<dyn Future<Output = io::Result<ReadResult>> + 'static>>>, |
e72062a9 WB |
343 | } |
344 | ||
345 | // We lose `Send` via the boxed trait object and don't want to force the trait object to | |
346 | // potentially be more strict than `T`, so we leave it as it is and implement `Send` and `Sync` | |
347 | // depending on `T`. | |
348 | unsafe impl<T: Send> Send for FileContents<T> {} | |
349 | unsafe impl<T: Sync> Sync for FileContents<T> {} | |
350 | ||
#[cfg(any(feature = "futures-io", feature = "tokio-io"))]
impl<T: Clone + ReadAt> FileContents<T> {
    /// Shared `poll_read` backend for the `futures` and `tokio` `AsyncRead` implementations.
    ///
    /// When no read is in flight, a boxed future is created which reads from the current
    /// position `at` into the scratch buffer (sized via `util::scale_read_buffer` for
    /// `dest.len()`). The future is then polled; while it is pending it is stored back in
    /// `self.future` so the next call resumes it. On completion the data is copied into `dest`
    /// and the number of bytes delivered is returned.
    ///
    /// The read position only advances by the number of bytes actually handed to the caller.
    /// If a pending read is completed by a call with a smaller `dest` than the one that
    /// started it, the surplus is read again by the next call instead of being skipped.
    ///
    /// Similar implementation exists for SeqReadAtAdapter in mod.rs
    fn do_poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        dest: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // SAFETY: nothing below depends on `Self` staying at a fixed address: the stored
        // future is separately boxed and pinned, and it is built from a clone of `inner`, the
        // moved-out buffer and a copy of `at` rather than from references into `self`.
        let this = unsafe { Pin::into_inner_unchecked(self) };
        loop {
            match this.future.take() {
                None => {
                    // Reuse the previous allocation for the next read.
                    let mut buffer = mem::take(&mut this.buffer);
                    util::scale_read_buffer(&mut buffer, dest.len());
                    let reader: accessor::FileContentsImpl<T> = this.inner.clone();
                    let at = this.at;
                    let future: Pin<Box<dyn Future<Output = io::Result<ReadResult>>>> =
                        Box::pin(async move {
                            let len = reader.read_at(&mut buffer, at).await?;
                            io::Result::Ok(ReadResult { len, buffer })
                        });
                    // This future has the lifetime from T. Self also has this lifetime and we
                    // store this in a pinned self. T may be a reference with a non-'static
                    // lifetime, but then it cannot be a self-reference into Self, so this should
                    // be valid in all cases:
                    this.future = Some(unsafe { mem::transmute(future) });
                }
                Some(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Pending => {
                        // Keep the in-flight read around for the next poll.
                        this.future = Some(fut);
                        return Poll::Pending;
                    }
                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                    Poll::Ready(Ok(ReadResult { len: got, buffer })) => {
                        this.buffer = buffer;
                        let len = got.min(dest.len());
                        dest[..len].copy_from_slice(&this.buffer[..len]);
                        // Advance only by what was delivered, so bytes that did not fit into
                        // `dest` are not dropped from the stream.
                        this.at += len as u64;
                        return Poll::Ready(Ok(len));
                    }
                },
            }
        }
    }
}
396 | ||
#[cfg(feature = "futures-io")]
impl<T: Clone + ReadAt> futures::io::AsyncRead for FileContents<T> {
    /// Forward to the shared `do_poll_read` implementation.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        self.do_poll_read(cx, buf)
    }
}
407 | ||
#[cfg(feature = "tokio-io")]
impl<T: Clone + ReadAt> tokio::io::AsyncRead for FileContents<T> {
    /// Read into the unfilled part of `buf` via the shared `do_poll_read` implementation.
    ///
    /// On success the filled region of `buf` is *advanced* by the number of bytes read, so
    /// data already present in `buf` is kept. (`set_filled` would overwrite the filled length
    /// with the byte count of this read alone.)
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut tokio::io::ReadBuf,
    ) -> Poll<io::Result<()>> {
        let read = Self::do_poll_read(self, cx, buf.initialize_unfilled());
        // `initialize_unfilled` initialized the whole unfilled region and `do_poll_read`
        // returns at most its length, so advancing stays within the initialized region.
        read.map_ok(|bytes| buf.advance(bytes))
    }
}
419 | ||
420 | impl<T: Clone + ReadAt> ReadAt for FileContents<T> { | |
e72062a9 WB |
421 | fn start_read_at<'a>( |
422 | self: Pin<&'a Self>, | |
d3a83ee3 | 423 | cx: &mut Context, |
e72062a9 | 424 | buf: &'a mut [u8], |
d3a83ee3 | 425 | offset: u64, |
e72062a9 WB |
426 | ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> { |
427 | unsafe { self.map_unchecked(|this| &this.inner) }.start_read_at(cx, buf, offset) | |
428 | } | |
429 | ||
430 | fn poll_complete<'a>( | |
431 | self: Pin<&'a Self>, | |
432 | op: ReadAtOperation<'a>, | |
433 | ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> { | |
434 | unsafe { self.map_unchecked(|this| &this.inner) }.poll_complete(op) | |
d3a83ee3 WB |
435 | } |
436 | } | |
c3bba169 WB |
437 | |
438 | impl<T: Clone + ReadAt> FileContents<T> { | |
439 | /// Convenience helper for `read_at`: | |
440 | pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> { | |
e72062a9 | 441 | self.inner.read_at(buf, offset).await |
c3bba169 WB |
442 | } |
443 | } |