]>
Commit | Line | Data |
---|---|---|
6cd4f635 WB |
1 | //! Asynchronous `pxar` format handling. |
2 | ||
3 | use std::io; | |
4 | ||
cf12da39 WB |
5 | #[cfg(feature = "tokio-fs")] |
6 | use std::path::Path; | |
7 | ||
6cd4f635 WB |
8 | use crate::decoder::{self, SeqRead}; |
9 | use crate::Entry; | |
10 | ||
11 | /// Asynchronous `pxar` decoder. | |
12 | /// | |
13 | /// This is the `async` version of the `pxar` decoder. | |
14 | #[repr(transparent)] | |
15 | pub struct Decoder<T> { | |
16 | inner: decoder::DecoderImpl<T>, | |
17 | } | |
18 | ||
#[cfg(feature = "futures-io")]
impl<T: futures::io::AsyncRead> Decoder<FuturesReader<T>> {
    /// Decode a `pxar` archive from a `futures::io::AsyncRead` input.
    ///
    /// The input is wrapped in a `FuturesReader` adapter and handed to [`Decoder::new`]; any
    /// error reported while setting up the decoder is returned to the caller.
    #[inline]
    pub async fn from_futures(input: T) -> io::Result<Self> {
        let reader = FuturesReader::new(input);
        Self::new(reader).await
    }
}
27 | ||
#[cfg(feature = "tokio-io")]
impl<T: tokio::io::AsyncRead> Decoder<TokioReader<T>> {
    /// Decode a `pxar` archive from a `tokio::io::AsyncRead` input.
    ///
    /// The input is wrapped in a `TokioReader` adapter and handed to [`Decoder::new`]; any error
    /// reported while setting up the decoder is returned to the caller.
    #[inline]
    pub async fn from_tokio(input: T) -> io::Result<Self> {
        let reader = TokioReader::new(input);
        Self::new(reader).await
    }
}
36 | ||
#[cfg(feature = "tokio-fs")]
impl Decoder<TokioReader<tokio::fs::File>> {
    /// Open the file at `path` with `tokio::fs::File` and decode it as a `pxar` archive.
    ///
    /// This is a convenience shortcut for opening the file and passing it to
    /// `Decoder::from_tokio`.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened, or if `Decoder::from_tokio` fails on the opened file.
    #[inline]
    pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
        let file = tokio::fs::File::open(path.as_ref()).await?;
        Self::from_tokio(file).await
    }
}
45 | ||
46 | impl<T: SeqRead> Decoder<T> { | |
47 | /// Create an async decoder from an input implementing our internal read interface. | |
48 | pub async fn new(input: T) -> io::Result<Self> { | |
49 | Ok(Self { | |
50 | inner: decoder::DecoderImpl::new(input).await?, | |
51 | }) | |
52 | } | |
53 | ||
afe05f3f WB |
54 | /// Internal helper for `Accessor`. In this case we have the low-level state machine, and the |
55 | /// layer "above" the `Accessor` propagates the actual type (sync vs async). | |
56 | pub(crate) fn from_impl(inner: decoder::DecoderImpl<T>) -> Self { | |
57 | Self { inner } | |
58 | } | |
59 | ||
1b25fc08 WB |
60 | // I would normally agree with clippy, but this is async and we can at most implement Stream, |
61 | // which we do with feature flags... | |
62 | #[allow(clippy::should_implement_trait)] | |
6cd4f635 WB |
63 | /// If this is a directory entry, get the next item inside the directory. |
64 | pub async fn next(&mut self) -> Option<io::Result<Entry>> { | |
65 | self.inner.next_do().await.transpose() | |
66 | } | |
67 | ||
27631c16 WB |
68 | /// Include goodbye tables in iteration. |
69 | pub fn enable_goodbye_entries(&mut self, on: bool) { | |
70 | self.inner.with_goodbye_tables = on; | |
71 | } | |
72 | ||
6cd4f635 WB |
73 | /// Turn this decoder into a `Stream`. |
74 | #[cfg(feature = "futures-io")] | |
75 | pub fn into_stream(self) -> DecoderStream<T> { | |
76 | DecoderStream::new(self) | |
77 | } | |
78 | } | |
79 | ||
#[cfg(feature = "futures-io")]
mod stream {
    use std::future::Future;
    use std::io;
    use std::pin::Pin;
    use std::ptr::NonNull;
    use std::task::{Context, Poll};

    use super::{Entry, SeqRead};

    /// The boxed future returned by `Decoder::next()`, with its borrow of the decoder erased.
    type NextFuture = Pin<Box<dyn Future<Output = Option<io::Result<Entry>>>>>;

    /// A wrapper for the async decoder implementing `futures::stream::Stream`.
    ///
    /// As long as streams are poll-based this wrapper is required to turn `async fn next()` into
    /// `Stream`'s `poll_next()` interface.
    ///
    /// The in-flight `next()` future mutably borrows the decoder, which makes this type
    /// self-referential. To keep that borrow valid when the stream itself is moved, the decoder
    /// lives in its own heap allocation which is owned through a raw pointer and released in
    /// `Drop`.
    pub struct DecoderStream<T> {
        /// The `next()` call currently in flight, if any. It points into `*inner`.
        future: Option<NextFuture>,
        /// Owning pointer to the heap-allocated decoder (created via `Box`, freed in `Drop`).
        inner: NonNull<super::Decoder<T>>,
    }

    impl<T> DecoderStream<T> {
        /// Wrap `inner` so that it can be polled as a `Stream`.
        pub fn new(inner: super::Decoder<T>) -> Self {
            Self {
                future: None,
                // Leak the box to obtain a stable address; ownership is reclaimed in `Drop`.
                inner: NonNull::from(Box::leak(Box::new(inner))),
            }
        }
    }

    impl<T: SeqRead> DecoderStream<T> {
        /// Start a new `next()` call on the decoder and return its lifetime-erased future.
        ///
        /// Must only be called while `self.future` is `None`, otherwise two mutable borrows of
        /// the decoder would exist at the same time.
        fn start_next(&mut self) -> NextFuture {
            // SAFETY: `inner` comes from a leaked `Box` and is only freed in `Drop`, so it is
            // valid here. The only other user of the decoder is a pending future, and the caller
            // guarantees there is none.
            let decoder: &mut super::Decoder<T> = unsafe { &mut *self.inner.as_ptr() };
            let fut: Box<dyn Future<Output = _>> = Box::new(decoder.next());
            // SAFETY: This only discards the borrow's lifetime, the pointer layout is unchanged.
            // The future never outlives the decoder: it is stored in `self.future` only and
            // `Drop` destroys it before the decoder is freed.
            unsafe {
                let fut: *mut (dyn Future<Output = Option<io::Result<Entry>>> + 'static) =
                    core::mem::transmute(Box::into_raw(fut));
                Pin::from(Box::from_raw(fut))
            }
        }
    }

    impl<T> Drop for DecoderStream<T> {
        fn drop(&mut self) {
            // A pending future still borrows the decoder, so it has to go first.
            self.future = None;
            // SAFETY: `inner` was created from a `Box` in `new()` and is released exactly once,
            // here.
            unsafe { drop(Box::from_raw(self.inner.as_ptr())) };
        }
    }

    impl<T: SeqRead> futures::stream::Stream for DecoderStream<T> {
        type Item = io::Result<Entry>;

        /// Poll the in-flight `next()` call, starting a new one if none is pending.
        fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
            // Both fields are plain pointers, so `Self` is `Unpin` and no `unsafe` is needed.
            let this = self.get_mut();

            // Take the future out while polling; it is only put back if it is still pending.
            let mut fut = match this.future.take() {
                Some(fut) => fut,
                None => this.start_next(),
            };

            match fut.as_mut().poll(cx) {
                Poll::Ready(res) => Poll::Ready(res),
                Poll::Pending => {
                    this.future = Some(fut);
                    Poll::Pending
                }
            }
        }
    }
}
135 | ||
136 | #[cfg(feature = "futures-io")] | |
137 | pub use stream::DecoderStream; | |
138 | ||
6e02c122 FG |
#[cfg(feature = "futures-io")]
mod fut {
    use std::io;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// Read adapter for `futures::io::AsyncRead`
    ///
    /// Wraps a reader so that it can be used through the crate's `SeqRead` interface.
    pub struct FuturesReader<T> {
        /// The wrapped reader.
        inner: T,
    }

    impl<T: futures::io::AsyncRead> FuturesReader<T> {
        /// Wrap `inner` in the adapter.
        pub fn new(inner: T) -> Self {
            FuturesReader { inner }
        }
    }

    impl<T: futures::io::AsyncRead> crate::decoder::SeqRead for FuturesReader<T> {
        /// Forward the read request to the wrapped reader's `poll_read()`.
        fn poll_seq_read(
            self: Pin<&mut Self>,
            cx: &mut Context,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            // SAFETY: This is a plain pin projection onto the `inner` field, which is never moved
            // out of the pinned adapter by this code.
            let reader = unsafe { self.map_unchecked_mut(|this| &mut this.inner) };
            reader.poll_read(cx, buf)
        }
    }
}
169 | ||
6e02c122 FG |
170 | #[cfg(feature = "futures-io")] |
171 | use fut::FuturesReader; | |
172 | ||
#[cfg(feature = "tokio-io")]
mod tok {
    use std::io;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// Read adapter for `tokio::io::AsyncRead`
    ///
    /// Wraps a reader so that it can be used through the crate's `SeqRead` interface.
    pub struct TokioReader<T> {
        /// The wrapped reader.
        inner: T,
    }

    impl<T: tokio::io::AsyncRead> TokioReader<T> {
        /// Wrap `inner` in the adapter.
        pub fn new(inner: T) -> Self {
            Self { inner }
        }
    }

    impl<T: tokio::io::AsyncRead> crate::decoder::SeqRead for TokioReader<T> {
        /// Forward the read request to the wrapped reader's `poll_read()`.
        ///
        /// `tokio` reports progress through a `ReadBuf` rather than a byte count, so `buf` is
        /// wrapped in a fresh `ReadBuf` and the length of its filled part is returned on success.
        fn poll_seq_read(
            self: Pin<&mut Self>,
            cx: &mut Context,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            let mut read_buf = tokio::io::ReadBuf::new(buf);
            // SAFETY: This is a plain pin projection onto the `inner` field, which is never moved
            // out of the pinned adapter by this code.
            let reader = unsafe { self.map_unchecked_mut(|this| &mut this.inner) };
            reader
                .poll_read(cx, &mut read_buf)
                .map_ok(|_| read_buf.filled().len())
        }
    }
}
6e02c122 FG |
205 | |
206 | #[cfg(feature = "tokio-io")] | |
207 | use tok::TokioReader; |