]> git.proxmox.com Git - pxar.git/blame - src/decoder/aio.rs
update to tokio 1.0
[pxar.git] / src / decoder / aio.rs
CommitLineData
6cd4f635
WB
1//! Asynchronous `pxar` format handling.
2
3use std::io;
4
cf12da39
WB
5#[cfg(feature = "tokio-fs")]
6use std::path::Path;
7
6cd4f635
WB
8use crate::decoder::{self, SeqRead};
9use crate::Entry;
10
11/// Asynchronous `pxar` decoder.
12///
13/// This is the `async` version of the `pxar` decoder.
14#[repr(transparent)]
15pub struct Decoder<T> {
16 inner: decoder::DecoderImpl<T>,
17}
18
#[cfg(feature = "futures-io")]
impl<T: futures::io::AsyncRead> Decoder<FuturesReader<T>> {
    /// Create a decoder reading a `pxar` archive from a `futures::io::AsyncRead` input.
    ///
    /// The input is wrapped in a `FuturesReader` adapter and then passed to `Decoder::new`.
    #[inline]
    pub async fn from_futures(input: T) -> io::Result<Self> {
        let reader = FuturesReader::new(input);
        Self::new(reader).await
    }
}
27
#[cfg(feature = "tokio-io")]
impl<T: tokio::io::AsyncRead> Decoder<TokioReader<T>> {
    /// Create a decoder reading a `pxar` archive from a `tokio::io::AsyncRead` input.
    ///
    /// The input is wrapped in a `TokioReader` adapter and then passed to `Decoder::new`.
    #[inline]
    pub async fn from_tokio(input: T) -> io::Result<Self> {
        let reader = TokioReader::new(input);
        Self::new(reader).await
    }
}
36
#[cfg(feature = "tokio-fs")]
impl Decoder<TokioReader<tokio::fs::File>> {
    /// Open the file at `path` via `tokio::fs::File` and decode it as a `pxar` archive.
    ///
    /// # Errors
    ///
    /// Returns the error from opening the file, or the one reported by `Decoder::from_tokio`
    /// when setting up the decoder on the opened file.
    #[inline]
    pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
        let file = tokio::fs::File::open(path.as_ref()).await?;
        Decoder::from_tokio(file).await
    }
}
45
46impl<T: SeqRead> Decoder<T> {
47 /// Create an async decoder from an input implementing our internal read interface.
48 pub async fn new(input: T) -> io::Result<Self> {
49 Ok(Self {
50 inner: decoder::DecoderImpl::new(input).await?,
51 })
52 }
53
afe05f3f
WB
54 /// Internal helper for `Accessor`. In this case we have the low-level state machine, and the
55 /// layer "above" the `Accessor` propagates the actual type (sync vs async).
56 pub(crate) fn from_impl(inner: decoder::DecoderImpl<T>) -> Self {
57 Self { inner }
58 }
59
1b25fc08
WB
60 // I would normally agree with clippy, but this is async and we can at most implement Stream,
61 // which we do with feature flags...
62 #[allow(clippy::should_implement_trait)]
6cd4f635
WB
63 /// If this is a directory entry, get the next item inside the directory.
64 pub async fn next(&mut self) -> Option<io::Result<Entry>> {
65 self.inner.next_do().await.transpose()
66 }
67
27631c16
WB
68 /// Include goodbye tables in iteration.
69 pub fn enable_goodbye_entries(&mut self, on: bool) {
70 self.inner.with_goodbye_tables = on;
71 }
72
6cd4f635
WB
73 /// Turn this decoder into a `Stream`.
74 #[cfg(feature = "futures-io")]
75 pub fn into_stream(self) -> DecoderStream<T> {
76 DecoderStream::new(self)
77 }
78}
79
#[cfg(feature = "futures-io")]
mod stream {
    use std::future::Future;
    use std::io;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use super::{Entry, SeqRead};

    /// A wrapper for the async decoder implementing `futures::stream::Stream`.
    ///
    /// As long as streams are poll-based this wrapper is required to turn `async fn next()` into
    /// `Stream`'s `poll_next()` interface.
    ///
    /// Internally this is self-referential: while an entry is being decoded, `future` holds a
    /// lifetime-erased mutable borrow of `inner`.
    ///
    /// NOTE(review): unless `Decoder<T>` is `!Unpin`, this type is `Unpin` as well, so safe code
    /// could move the stream while such a future is pending. Confirm whether callers can hit this
    /// and whether the type should opt out of `Unpin`.
    #[allow(clippy::type_complexity)] // the boxed future type is only spelled out here
    pub struct DecoderStream<T> {
        // Field order matters: fields are dropped in declaration order and `future` may borrow
        // from `inner`, so the future has to be declared (and therefore dropped) first.
        future: Option<Pin<Box<dyn Future<Output = Option<io::Result<Entry>>>>>>,
        inner: super::Decoder<T>,
    }

    impl<T> DecoderStream<T> {
        /// Wrap `inner`; no future is created until the stream is first polled.
        pub fn new(inner: super::Decoder<T>) -> Self {
            Self {
                future: None,
                inner,
            }
        }
    }

    impl<T: SeqRead> futures::stream::Stream for DecoderStream<T> {
        type Item = io::Result<Entry>;

        /// Poll the pending `Decoder::next()` future, creating one first if none is stored.
        fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
            // SAFETY: nothing is moved out of `this` below; `inner` is only borrowed in place and
            // `future` is an `Option` of an already pinned box which may be taken and put back.
            let this = unsafe { self.get_unchecked_mut() };
            loop {
                if let Some(mut fut) = this.future.take() {
                    match fut.as_mut().poll(cx) {
                        // The future is finished and dropped here, releasing its borrow of
                        // `inner`; the next poll creates a fresh one.
                        Poll::Ready(res) => return Poll::Ready(res),
                        Poll::Pending => {
                            this.future = Some(fut);
                            return Poll::Pending;
                        }
                    }
                }
                // SAFETY: the future borrows `this.inner` mutably. Its lifetime is erased so it
                // can be stored next to `inner`. At most one such future exists at a time,
                // `inner` is not accessed by anything else while one is stored, and `future` is
                // declared before `inner` so it is dropped first. The box is never moved out of
                // its heap allocation, which makes `Pin::new_unchecked` fine.
                unsafe {
                    let fut: Box<dyn Future<Output = _>> = Box::new(this.inner.next());
                    // Discard the lifetime:
                    let fut: *mut (dyn Future<Output = Option<io::Result<Entry>>> + 'static) =
                        core::mem::transmute(Box::into_raw(fut));
                    let fut = Box::from_raw(fut);
                    this.future = Some(Pin::new_unchecked(fut));
                }
            }
        }
    }
}
135
136#[cfg(feature = "futures-io")]
137pub use stream::DecoderStream;
138
6e02c122
FG
#[cfg(feature = "futures-io")]
mod fut {
    use std::io;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// Adapter implementing the crate's `SeqRead` on top of a `futures::io::AsyncRead` type.
    pub struct FuturesReader<T> {
        inner: T,
    }

    impl<T: futures::io::AsyncRead> FuturesReader<T> {
        /// Wrap `inner` in the adapter.
        pub fn new(inner: T) -> Self {
            FuturesReader { inner }
        }
    }

    impl<T: futures::io::AsyncRead> crate::decoder::SeqRead for FuturesReader<T> {
        /// Forward the read to the wrapped reader's `poll_read`.
        fn poll_seq_read(
            self: Pin<&mut Self>,
            cx: &mut Context,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            // SAFETY: plain pin projection onto the `inner` field; nothing in this module moves
            // `inner` out of a pinned `FuturesReader`.
            let reader = unsafe { self.map_unchecked_mut(|this| &mut this.inner) };
            reader.poll_read(cx, buf)
        }
    }
}
169
6e02c122
FG
170#[cfg(feature = "futures-io")]
171use fut::FuturesReader;
172
#[cfg(feature = "tokio-io")]
mod tok {
    use std::io;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// Read adapter for `tokio::io::AsyncRead`
    ///
    /// Implements the crate's `SeqRead` on top of a tokio reader.
    pub struct TokioReader<T> {
        inner: T,
    }

    impl<T: tokio::io::AsyncRead> TokioReader<T> {
        /// Wrap `inner` in the adapter.
        pub fn new(inner: T) -> Self {
            Self { inner }
        }
    }

    impl<T: tokio::io::AsyncRead> crate::decoder::SeqRead for TokioReader<T> {
        /// Forward the read to the wrapped reader's `poll_read` and report the number of bytes
        /// it filled into `buf`.
        fn poll_seq_read(
            self: Pin<&mut Self>,
            cx: &mut Context,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            // tokio's `poll_read` writes into a `ReadBuf` and returns `()`, so the byte count has
            // to be taken from the filled part of the buffer afterwards.
            let mut read_buf = tokio::io::ReadBuf::new(buf);
            // SAFETY: plain pin projection onto the `inner` field; nothing in this module moves
            // `inner` out of a pinned `TokioReader`.
            let reader = unsafe { self.map_unchecked_mut(|this| &mut this.inner) };
            reader
                .poll_read(cx, &mut read_buf)
                .map_ok(|_| read_buf.filled().len())
        }
    }
}
6e02c122
FG
205
206#[cfg(feature = "tokio-io")]
207use tok::TokioReader;