]> git.proxmox.com Git - pxar.git/blame - src/accessor/aio.rs
update to tokio 1.0
[pxar.git] / src / accessor / aio.rs
CommitLineData
6cd4f635
WB
1//! Asynchronous `pxar` random-access handling.
2//!
3//! Currently neither tokio nor futures have an `AsyncFileExt` variant.
9de9cca9
WB
4//!
5//! TODO: Implement a locking version for AsyncSeek+AsyncRead files?
d3a83ee3 6
e72062a9 7use std::future::Future;
d3a83ee3 8use std::io;
e72062a9 9use std::mem;
d3a83ee3
WB
10use std::ops::Range;
11use std::os::unix::fs::FileExt;
12use std::path::Path;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16
e72062a9 17use crate::accessor::{self, cache::Cache, MaybeReady, ReadAt, ReadAtOperation};
afe05f3f 18use crate::decoder::aio::Decoder;
06070d26 19use crate::format::GoodbyeItem;
e72062a9 20use crate::util;
d3a83ee3
WB
21use crate::Entry;
22
23use super::sync::{FileReader, FileRefReader};
24
25/// Asynchronous `pxar` random-access decoder.
26///
27///
28/// This is the `async` version of the `pxar` accessor.
29#[repr(transparent)]
30pub struct Accessor<T> {
31 inner: accessor::AccessorImpl<T>,
32}
33
34impl<T: FileExt> Accessor<FileReader<T>> {
35 /// Decode a `pxar` archive from a standard file implementing `FileExt`.
36 ///
37 /// Note that "plain files" don't normally block on `read(2)` operations anyway, and tokio has
38 /// no support for asynchronous `read_at` operations, so we allow creating an accessor backed
39 /// by a blocking file.
40 #[inline]
41 pub async fn from_file_and_size(input: T, size: u64) -> io::Result<Self> {
42 Accessor::new(FileReader::new(input), size).await
43 }
44}
45
46impl Accessor<FileReader<std::fs::File>> {
47 /// Decode a `pxar` archive from a regular `std::io::File` input.
48 #[inline]
49 pub async fn from_file(input: std::fs::File) -> io::Result<Self> {
50 let size = input.metadata()?.len();
51 Accessor::from_file_and_size(input, size).await
52 }
53
54 /// Convenience shortcut for `File::open` followed by `Accessor::from_file`.
55 pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
56 Self::from_file(std::fs::File::open(path.as_ref())?).await
57 }
58}
59
60impl<T: Clone + std::ops::Deref<Target = std::fs::File>> Accessor<FileRefReader<T>> {
61 /// Open an `Arc` or `Rc` of `File`.
62 pub async fn from_file_ref(input: T) -> io::Result<Self> {
63 let size = input.deref().metadata()?.len();
64 Accessor::from_file_ref_and_size(input, size).await
65 }
66}
67
68impl<T> Accessor<FileRefReader<T>>
69where
70 T: Clone + std::ops::Deref,
71 T::Target: FileExt,
72{
73 /// Open an `Arc` or `Rc` of `File`.
74 pub async fn from_file_ref_and_size(
75 input: T,
76 size: u64,
77 ) -> io::Result<Accessor<FileRefReader<T>>> {
78 Accessor::new(FileRefReader::new(input), size).await
79 }
80}
81
82impl<T: ReadAt> Accessor<T> {
83 /// Create a *blocking* random-access decoder from an input implementing our internal read
84 /// interface.
85 ///
86 /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
87 /// not allowed to use the `Waker`, as this will cause a `panic!`.
88 pub async fn new(input: T, size: u64) -> io::Result<Self> {
89 Ok(Self {
90 inner: accessor::AccessorImpl::new(input, size).await?,
91 })
92 }
93
94 /// Open a directory handle to the root of the pxar archive.
1b25fc08 95 pub async fn open_root_ref(&self) -> io::Result<Directory<&dyn ReadAt>> {
d3a83ee3
WB
96 Ok(Directory::new(self.inner.open_root_ref().await?))
97 }
98
99 pub fn set_goodbye_table_cache<C>(&mut self, cache: Option<C>)
100 where
101 C: Cache<u64, [GoodbyeItem]> + Send + Sync + 'static,
102 {
103 self.inner
104 .set_goodbye_table_cache(cache.map(|cache| Arc::new(cache) as _))
105 }
106
107 /// Get the full archive size we're allowed to access.
108 #[inline]
109 pub fn size(&self) -> u64 {
110 self.inner.size()
111 }
112}
113
114impl<T: Clone + ReadAt> Accessor<T> {
115 pub async fn open_root(&self) -> io::Result<Directory<T>> {
116 Ok(Directory::new(self.inner.open_root().await?))
117 }
118
119 /// Allow opening a directory at a specified offset.
1b25fc08
WB
120 ///
121 /// # Safety
122 ///
123 /// This should only be used with offsets known to point to the end of a directory, otherwise
124 /// this usually fails, unless the data otherwise happens to look like a valid directory.
d3a83ee3
WB
125 pub async unsafe fn open_dir_at_end(&self, offset: u64) -> io::Result<Directory<T>> {
126 Ok(Directory::new(self.inner.open_dir_at_end(offset).await?))
127 }
128
129 /// Allow opening a regular file from a specified range.
1b25fc08
WB
130 ///
131 /// # Safety
132 ///
133 /// Should only be used with `entry_range_info`s originating from the same archive, otherwise
134 /// the result will be undefined and likely fail (or contain unexpected data).
06070d26
WB
135 pub async unsafe fn open_file_at_range(
136 &self,
137 entry_range_info: &accessor::EntryRangeInfo,
138 ) -> io::Result<FileEntry<T>> {
d3a83ee3 139 Ok(FileEntry {
06070d26 140 inner: self.inner.open_file_at_range(entry_range_info).await?,
d3a83ee3
WB
141 })
142 }
143
144 /// Allow opening arbitrary contents from a specific range.
1b25fc08
WB
145 ///
146 /// # Safety
147 ///
148 /// This will provide a reader over an arbitrary range of the archive file, so unless this
149 /// comes from a actual file entry data, the contents might not make much sense.
d3a83ee3
WB
150 pub unsafe fn open_contents_at_range(&self, range: Range<u64>) -> FileContents<T> {
151 FileContents {
152 inner: self.inner.open_contents_at_range(range),
153 at: 0,
e72062a9
WB
154 buffer: Vec::new(),
155 future: None,
d3a83ee3
WB
156 }
157 }
6bfadb8a
WB
158
159 /// Following a hardlink.
06070d26 160 pub async fn follow_hardlink(&self, entry: &FileEntry<T>) -> io::Result<FileEntry<T>> {
6bfadb8a 161 Ok(FileEntry {
06070d26 162 inner: self.inner.follow_hardlink(&entry.inner).await?,
6bfadb8a
WB
163 })
164 }
d3a83ee3
WB
165}
166
167/// A pxar directory entry. This provdies blocking access to the contents of a directory.
168#[repr(transparent)]
169pub struct Directory<T> {
170 inner: accessor::DirectoryImpl<T>,
171}
172
173impl<T: Clone + ReadAt> Directory<T> {
174 fn new(inner: accessor::DirectoryImpl<T>) -> Self {
175 Self { inner }
176 }
177
178 /// Get a decoder for the directory contents.
179 pub async fn decode_full(&self) -> io::Result<Decoder<accessor::SeqReadAtAdapter<T>>> {
180 Ok(Decoder::from_impl(self.inner.decode_full().await?))
181 }
182
183 /// Get a `FileEntry` referencing the directory itself.
184 ///
185 /// Helper function for our fuse implementation.
186 pub async fn lookup_self(&self) -> io::Result<FileEntry<T>> {
187 Ok(FileEntry {
188 inner: self.inner.lookup_self().await?,
189 })
190 }
191
4af15944
WB
192 /// Lookup an entry starting from this current directory.
193 ///
194 /// For convenience, this already resolves paths with multiple components.
d3a83ee3
WB
195 pub async fn lookup<P: AsRef<Path>>(&self, path: P) -> io::Result<Option<FileEntry<T>>> {
196 if let Some(file_entry) = self.inner.lookup(path.as_ref()).await? {
197 Ok(Some(FileEntry { inner: file_entry }))
198 } else {
199 Ok(None)
200 }
201 }
202
203 /// Get an iterator over the directory's contents.
1b25fc08 204 pub fn read_dir(&self) -> ReadDir<T> {
d3a83ee3
WB
205 ReadDir {
206 inner: self.inner.read_dir(),
207 }
208 }
209
210 /// Get the number of entries in this directory.
211 #[inline]
212 pub fn entry_count(&self) -> usize {
213 self.inner.entry_count()
214 }
215}
216
217/// A file entry in a direcotry, retrieved via the `lookup` method or from
218/// `DirEntry::decode_entry``.
93fa37fb 219#[derive(Clone)]
d3a83ee3
WB
220#[repr(transparent)]
221pub struct FileEntry<T: Clone + ReadAt> {
222 inner: accessor::FileEntryImpl<T>,
223}
224
225impl<T: Clone + ReadAt> FileEntry<T> {
226 /// Get a handle to the subdirectory this file entry points to, if it is in fact a directory.
227 pub async fn enter_directory(&self) -> io::Result<Directory<T>> {
228 Ok(Directory::new(self.inner.enter_directory().await?))
229 }
230
231 /// For use with unsafe accessor methods.
232 pub fn content_range(&self) -> io::Result<Option<Range<u64>>> {
233 self.inner.content_range()
234 }
235
236 pub async fn contents(&self) -> io::Result<FileContents<T>> {
237 Ok(FileContents {
238 inner: self.inner.contents().await?,
239 at: 0,
e72062a9
WB
240 buffer: Vec::new(),
241 future: None,
d3a83ee3
WB
242 })
243 }
244
245 #[inline]
246 pub fn into_entry(self) -> Entry {
247 self.inner.into_entry()
248 }
249
250 #[inline]
251 pub fn entry(&self) -> &Entry {
252 &self.inner.entry()
253 }
254
255 /// Exposed for raw by-offset access methods (use with `open_dir_at_end`).
256 #[inline]
06070d26
WB
257 pub fn entry_range_info(&self) -> &accessor::EntryRangeInfo {
258 self.inner.entry_range_info()
d3a83ee3
WB
259 }
260}
261
262impl<T: Clone + ReadAt> std::ops::Deref for FileEntry<T> {
263 type Target = Entry;
264
265 fn deref(&self) -> &Self::Target {
266 self.entry()
267 }
268}
269
270/// An iterator over the contents of a `Directory`.
271#[repr(transparent)]
272pub struct ReadDir<'a, T> {
273 inner: accessor::ReadDirImpl<'a, T>,
274}
275
276impl<'a, T: Clone + ReadAt> ReadDir<'a, T> {
277 /// Efficient alternative to `Iterator::skip`.
278 #[inline]
279 pub fn skip(self, n: usize) -> Self {
280 Self {
281 inner: self.inner.skip(n),
282 }
283 }
284
285 /// Efficient alternative to `Iterator::count`.
286 #[inline]
287 pub fn count(self) -> usize {
288 self.inner.count()
289 }
290
291 pub async fn next(&mut self) -> Option<io::Result<DirEntry<'a, T>>> {
292 match self.inner.next().await {
293 Ok(Some(inner)) => Some(Ok(DirEntry { inner })),
294 Ok(None) => None,
295 Err(err) => Some(Err(err)),
296 }
297 }
298}
299
300/// A directory entry. When iterating through the contents of a directory we first get access to
301/// the file name. The remaining information can be decoded afterwards.
302#[repr(transparent)]
303pub struct DirEntry<'a, T: Clone + ReadAt> {
304 inner: accessor::DirEntryImpl<'a, T>,
305}
306
307impl<'a, T: Clone + ReadAt> DirEntry<'a, T> {
308 /// Get the current file name.
309 pub fn file_name(&self) -> &Path {
310 self.inner.file_name()
311 }
312
313 /// Decode the entry.
314 ///
315 /// When iterating over a directory, the file name is read separately from the file attributes,
316 /// so only the file name is available here, while the attributes still need to be decoded.
317 pub async fn decode_entry(&self) -> io::Result<FileEntry<T>> {
318 self.inner
319 .decode_entry()
320 .await
321 .map(|inner| FileEntry { inner })
322 }
323
324 /// Exposed for raw by-offset access methods.
325 #[inline]
06070d26
WB
326 pub fn entry_range_info(&self) -> &accessor::EntryRangeInfo {
327 self.inner.entry_range_info()
d3a83ee3
WB
328 }
329}
330
1b25fc08
WB
331/// File content read future result.
332struct ReadResult {
333 len: usize,
334 buffer: Vec<u8>,
335}
336
d3a83ee3
WB
337/// A reader for file contents.
338pub struct FileContents<T> {
339 inner: accessor::FileContentsImpl<T>,
340 at: u64,
e72062a9 341 buffer: Vec<u8>,
1b25fc08 342 future: Option<Pin<Box<dyn Future<Output = io::Result<ReadResult>> + 'static>>>,
e72062a9
WB
343}
344
345// We lose `Send` via the boxed trait object and don't want to force the trait object to
346// potentially be more strict than `T`, so we leave it as it is ans implement Send and Sync
347// depending on T.
348unsafe impl<T: Send> Send for FileContents<T> {}
349unsafe impl<T: Sync> Sync for FileContents<T> {}
350
351#[cfg(any(feature = "futures-io", feature = "tokio-io"))]
352impl<T: Clone + ReadAt> FileContents<T> {
353 /// Similar implementation exists for SeqReadAtAdapter in mod.rs
354 fn do_poll_read(
355 self: Pin<&mut Self>,
356 cx: &mut Context,
357 dest: &mut [u8],
358 ) -> Poll<io::Result<usize>> {
359 let this = unsafe { Pin::into_inner_unchecked(self) };
360 loop {
361 match this.future.take() {
362 None => {
363 let mut buffer = mem::take(&mut this.buffer);
364 util::scale_read_buffer(&mut buffer, dest.len());
365 let reader: accessor::FileContentsImpl<T> = this.inner.clone();
366 let at = this.at;
1b25fc08 367 let future: Pin<Box<dyn Future<Output = io::Result<ReadResult>>>> =
e72062a9 368 Box::pin(async move {
1b25fc08
WB
369 let len = reader.read_at(&mut buffer, at).await?;
370 io::Result::Ok(ReadResult { len, buffer })
e72062a9
WB
371 });
372 // This future has the lifetime from T. Self also has this lifetime and we
373 // store this in a pinned self. T maybe a reference with a non-'static life
374 // time, but then it cannot be a self-reference into Self, so this should be
375 // valid in all cases:
376 this.future = Some(unsafe { mem::transmute(future) });
377 }
378 Some(mut fut) => match fut.as_mut().poll(cx) {
379 Poll::Pending => {
380 this.future = Some(fut);
381 return Poll::Pending;
382 }
383 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
1b25fc08 384 Poll::Ready(Ok(ReadResult { len: got, buffer })) => {
e72062a9
WB
385 this.buffer = buffer;
386 this.at += got as u64;
387 let len = got.min(dest.len());
388 dest[..len].copy_from_slice(&this.buffer[..len]);
389 return Poll::Ready(Ok(len));
390 }
391 },
392 }
393 }
394 }
d3a83ee3
WB
395}
396
397#[cfg(feature = "futures-io")]
398impl<T: Clone + ReadAt> futures::io::AsyncRead for FileContents<T> {
399 fn poll_read(
400 self: Pin<&mut Self>,
401 cx: &mut Context,
402 buf: &mut [u8],
403 ) -> Poll<io::Result<usize>> {
e72062a9 404 Self::do_poll_read(self, cx, buf)
d3a83ee3
WB
405 }
406}
407
408#[cfg(feature = "tokio-io")]
409impl<T: Clone + ReadAt> tokio::io::AsyncRead for FileContents<T> {
410 fn poll_read(
411 self: Pin<&mut Self>,
412 cx: &mut Context,
6e02c122
FG
413 buf: &mut tokio::io::ReadBuf,
414 ) -> Poll<io::Result<()>> {
415 Self::do_poll_read(self, cx, &mut buf.initialize_unfilled())
416 .map_ok(|bytes| { buf.set_filled(bytes); () })
d3a83ee3
WB
417 }
418}
419
420impl<T: Clone + ReadAt> ReadAt for FileContents<T> {
e72062a9
WB
421 fn start_read_at<'a>(
422 self: Pin<&'a Self>,
d3a83ee3 423 cx: &mut Context,
e72062a9 424 buf: &'a mut [u8],
d3a83ee3 425 offset: u64,
e72062a9
WB
426 ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
427 unsafe { self.map_unchecked(|this| &this.inner) }.start_read_at(cx, buf, offset)
428 }
429
430 fn poll_complete<'a>(
431 self: Pin<&'a Self>,
432 op: ReadAtOperation<'a>,
433 ) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
434 unsafe { self.map_unchecked(|this| &this.inner) }.poll_complete(op)
d3a83ee3
WB
435 }
436}
c3bba169
WB
437
438impl<T: Clone + ReadAt> FileContents<T> {
439 /// Convenience helper for `read_at`:
440 pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
e72062a9 441 self.inner.read_at(buf, offset).await
c3bba169
WB
442 }
443}