]> git.proxmox.com Git - pxar.git/blob - src/encoder/aio.rs
1df75791f87bfafa8b383958598ebad7d778f2a1
[pxar.git] / src / encoder / aio.rs
1 //! Asynchronous `pxar` format handling.
2
3 use std::io;
4 use std::path::Path;
5 use std::pin::Pin;
6 use std::task::{Context, Poll};
7
8 use crate::encoder::{self, SeqWrite};
9 use crate::format;
10 use crate::Metadata;
11
12 // #[cfg(feature = "futures-io")]
13 // use crate::decoder::aio::FuturesReader;
14 // #[cfg(feature = "tokio-io")]
15 // use crate::decoder::aio::TokioReader;
16
17 /// Asynchronous `pxar` encoder.
18 ///
19 /// This is the `async` version of the `pxar` encoder.
20 #[repr(transparent)]
21 pub struct Encoder<'a, T: SeqWrite + 'a> {
22 inner: encoder::EncoderImpl<'a, T>,
23 }
24
25 #[cfg(feature = "futures-io")]
26 impl<'a, T: futures::io::AsyncWrite + 'a> Encoder<'a, FuturesWriter<T>> {
27 /// Encode a `pxar` archive into a `futures::io::AsyncWrite` output.
28 #[inline]
29 pub async fn from_futures(
30 output: T,
31 metadata: &Metadata,
32 ) -> io::Result<Encoder<'a, FuturesWriter<T>>> {
33 Encoder::new(FuturesWriter::new(output), metadata).await
34 }
35 }
36
37 #[cfg(feature = "tokio-io")]
38 impl<'a, T: tokio::io::AsyncWrite + 'a> Encoder<'a, TokioWriter<T>> {
39 /// Encode a `pxar` archive into a `tokio::io::AsyncWrite` output.
40 #[inline]
41 pub async fn from_futures(
42 output: T,
43 metadata: &Metadata,
44 ) -> io::Result<Encoder<'a, TokioWriter<T>>> {
45 Encoder::new(TokioWriter::new(output), metadata).await
46 }
47 }
48
49 #[cfg(feature = "tokio-fs")]
50 impl<'a> Encoder<'a, TokioWriter<tokio::fs::File>> {
51 /// Convenience shortcut for `File::create` followed by `Encoder::from_tokio`.
52 pub async fn create<'b, P: AsRef<Path>>(
53 path: P,
54 metadata: &'b Metadata,
55 ) -> io::Result<Encoder<'a, TokioWriter<tokio::fs::File>>> {
56 Encoder::new(
57 TokioWriter::new(tokio::fs::File::create(path.as_ref()).await?),
58 metadata,
59 )
60 .await
61 }
62 }
63
64 impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
65 /// Create an asynchronous encoder for an output implementing our internal write interface.
66 pub async fn new(output: T, metadata: &Metadata) -> io::Result<Encoder<'a, T>> {
67 Ok(Self {
68 inner: encoder::EncoderImpl::new(output, metadata).await?,
69 })
70 }
71
72 /// Create a new regular file in the archive. This returns a `File` object to which the
73 /// contents have to be written out *completely*. Failing to do so will put the encoder into an
74 /// error state.
75 pub async fn create_file<'b, P: AsRef<Path>>(
76 &'b mut self,
77 metadata: &Metadata,
78 file_name: P,
79 file_size: u64,
80 ) -> io::Result<File<'b>>
81 where
82 'a: 'b,
83 {
84 Ok(File {
85 inner: self
86 .inner
87 .create_file(metadata, file_name.as_ref(), file_size)
88 .await?,
89 })
90 }
91
92 // /// Convenience shortcut to add a *regular* file by path including its contents to the archive.
93 // pub async fn add_file<P, F>(
94 // &mut self,
95 // metadata: &Metadata,
96 // file_name: P,
97 // file_size: u64,
98 // content: &mut dyn tokio::io::Read,
99 // ) -> io::Result<()>
100 // where
101 // P: AsRef<Path>,
102 // F: AsAsyncReader,
103 // {
104 // self.inner.add_file(
105 // metadata,
106 // file_name.as_ref(),
107 // file_size,
108 // content.as_async_reader(),
109 // ).await
110 // }
111
112 /// Create a new subdirectory. Note that the subdirectory has to be finished by calling the
113 /// `finish()` method, otherwise the entire archive will be in an error state.
114 pub async fn create_directory<'b, P: AsRef<Path>>(
115 &'b mut self,
116 file_name: P,
117 metadata: &Metadata,
118 ) -> io::Result<Encoder<'b, &'b mut dyn SeqWrite>>
119 where
120 'a: 'b,
121 {
122 Ok(Encoder {
123 inner: self
124 .inner
125 .create_directory(file_name.as_ref(), metadata)
126 .await?,
127 })
128 }
129
130 /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
131 pub async fn finish(self) -> io::Result<()> {
132 self.inner.finish().await
133 }
134
135 /// Add a symbolic link to the archive.
136 pub async fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
137 &mut self,
138 metadata: &Metadata,
139 file_name: PF,
140 target: PT,
141 ) -> io::Result<()> {
142 self.inner
143 .add_symlink(metadata, file_name.as_ref(), target.as_ref())
144 .await
145 }
146
147 /// Add a hard link to the archive.
148 pub async fn add_hardlink<PF: AsRef<Path>, PT: AsRef<Path>>(
149 &mut self,
150 file_name: PF,
151 target: PT,
152 offset: u64,
153 ) -> io::Result<()> {
154 self.inner
155 .add_hardlink(file_name.as_ref(), target.as_ref(), offset)
156 .await
157 }
158
159 /// Add a device node to the archive.
160 pub async fn add_device<P: AsRef<Path>>(
161 &mut self,
162 metadata: &Metadata,
163 file_name: P,
164 device: format::Device,
165 ) -> io::Result<()> {
166 self.inner
167 .add_device(metadata, file_name.as_ref(), device)
168 .await
169 }
170
171 /// Add a device node to the archive.
172 pub async fn add_fifo<P: AsRef<Path>>(
173 &mut self,
174 metadata: &Metadata,
175 file_name: P,
176 ) -> io::Result<()> {
177 self.inner.add_fifo(metadata, file_name.as_ref()).await
178 }
179
180 /// Add a device node to the archive.
181 pub async fn add_socket<P: AsRef<Path>>(
182 &mut self,
183 metadata: &Metadata,
184 file_name: P,
185 ) -> io::Result<()> {
186 self.inner.add_socket(metadata, file_name.as_ref()).await
187 }
188 }
189
190 #[repr(transparent)]
191 pub struct File<'a> {
192 inner: encoder::FileImpl<'a>,
193 }
194
195 #[cfg(feature = "futures-io")]
196 impl<'a> futures::io::AsyncWrite for File<'a> {
197 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, data: &[u8]) -> Poll<io::Result<usize>> {
198 unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll_write(cx, data)
199 }
200
201 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
202 unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll_flush(cx)
203 }
204
205 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
206 unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll_close(cx)
207 }
208 }
209
210 #[cfg(feature = "tokio-io")]
211 impl<'a> tokio::io::AsyncWrite for File<'a> {
212 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, data: &[u8]) -> Poll<io::Result<usize>> {
213 unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll_write(cx, data)
214 }
215
216 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
217 unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll_flush(cx)
218 }
219
220 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
221 unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll_close(cx)
222 }
223 }
224
225 /// Pxar encoder write adapter for `futures::io::AsyncWrite`.
226 #[cfg(feature = "futures-io")]
227 mod futures_writer {
228 use std::io;
229 use std::pin::Pin;
230 use std::task::{Context, Poll};
231
232 use crate::encoder::SeqWrite;
233
234 pub struct FuturesWriter<T> {
235 inner: Option<T>,
236 position: u64,
237 }
238
239 impl<T: futures::io::AsyncWrite> FuturesWriter<T> {
240 pub fn new(inner: T) -> Self {
241 Self {
242 inner: Some(inner),
243 position: 0,
244 }
245 }
246
247 fn inner_mut(self: &mut Self) -> io::Result<Pin<&mut T>> {
248 let inner = self
249 .inner
250 .as_mut()
251 .ok_or_else(|| io_format_err!("write after close"))?;
252 Ok(unsafe { Pin::new_unchecked(inner) })
253 }
254
255 fn inner(self: Pin<&mut Self>) -> io::Result<Pin<&mut T>> {
256 unsafe { self.get_unchecked_mut() }.inner_mut()
257 }
258 }
259
260 impl<T: futures::io::AsyncWrite> SeqWrite for FuturesWriter<T> {
261 fn poll_seq_write(
262 self: Pin<&mut Self>,
263 cx: &mut Context,
264 buf: &[u8],
265 ) -> Poll<io::Result<usize>> {
266 let this = unsafe { self.get_unchecked_mut() };
267 let got = ready!(this.inner_mut()?.poll_write(cx, buf))?;
268 this.position += got as u64;
269 Poll::Ready(Ok(got))
270 }
271
272 fn poll_position(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<u64>> {
273 Poll::Ready(Ok(self.as_ref().position))
274 }
275
276 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
277 self.inner()?.poll_flush(cx)
278 }
279
280 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
281 let this = unsafe { self.get_unchecked_mut() };
282 match this.inner.as_mut() {
283 None => return Poll::Ready(Ok(())),
284 Some(inner) => {
285 ready!(unsafe { Pin::new_unchecked(inner).poll_close(cx) })?;
286 this.inner = None;
287 Poll::Ready(Ok(()))
288 }
289 }
290 }
291 }
292 }
293
294 pub use futures_writer::FuturesWriter;
295
296 /// Pxar encoder write adapter for `tokio::io::AsyncWrite`.
297 #[cfg(feature = "tokio-io")]
298 mod tokio_writer {
299 use std::io;
300 use std::pin::Pin;
301 use std::task::{Context, Poll};
302
303 use crate::encoder::SeqWrite;
304
305 pub struct TokioWriter<T> {
306 inner: Option<T>,
307 position: u64,
308 }
309
310 impl<T: tokio::io::AsyncWrite> TokioWriter<T> {
311 pub fn new(inner: T) -> Self {
312 Self {
313 inner: Some(inner),
314 position: 0,
315 }
316 }
317
318 fn inner_mut(self: &mut Self) -> io::Result<Pin<&mut T>> {
319 let inner = self
320 .inner
321 .as_mut()
322 .ok_or_else(|| io_format_err!("write after close"))?;
323 Ok(unsafe { Pin::new_unchecked(inner) })
324 }
325
326 fn inner(self: Pin<&mut Self>) -> io::Result<Pin<&mut T>> {
327 unsafe { self.get_unchecked_mut() }.inner_mut()
328 }
329 }
330
331 impl<T: tokio::io::AsyncWrite> SeqWrite for TokioWriter<T> {
332 fn poll_seq_write(
333 self: Pin<&mut Self>,
334 cx: &mut Context,
335 buf: &[u8],
336 ) -> Poll<io::Result<usize>> {
337 let this = unsafe { self.get_unchecked_mut() };
338 let got = ready!(this.inner_mut()?.poll_write(cx, buf))?;
339 this.position += got as u64;
340 Poll::Ready(Ok(got))
341 }
342
343 fn poll_position(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<u64>> {
344 Poll::Ready(Ok(self.as_ref().position))
345 }
346
347 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
348 self.inner()?.poll_flush(cx)
349 }
350
351 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
352 let this = unsafe { self.get_unchecked_mut() };
353 match this.inner.as_mut() {
354 None => return Poll::Ready(Ok(())),
355 Some(inner) => {
356 ready!(unsafe { Pin::new_unchecked(inner).poll_shutdown(cx) })?;
357 this.inner = None;
358 Poll::Ready(Ok(()))
359 }
360 }
361 }
362 }
363 }
364
365 pub use tokio_writer::TokioWriter;