// git.proxmox.com Git - pxar.git/blob - src/encoder/aio.rs
// doc fixup
// [pxar.git] / src / encoder / aio.rs
1 //! Asynchronous `pxar` format handling.
2
3 use std::io;
4 use std::path::Path;
5 use std::pin::Pin;
6 use std::task::{Context, Poll};
7
8 use crate::encoder::{self, LinkOffset, PayloadOffset, SeqWrite};
9 use crate::format;
10 use crate::{Metadata, PxarVariant};
11
12 /// Asynchronous `pxar` encoder.
13 ///
14 /// This is the `async` version of the `pxar` encoder.
15 #[repr(transparent)]
16 pub struct Encoder<'a, T: SeqWrite + 'a> {
17 inner: encoder::EncoderImpl<'a, T>,
18 }
19
#[cfg(feature = "tokio-io")]
impl<'a, T: tokio::io::AsyncWrite + 'a> Encoder<'a, TokioWriter<T>> {
    /// Encode a `pxar` archive into a `tokio::io::AsyncWrite` output.
    ///
    /// Each writer held by `output` is wrapped in a [`TokioWriter`] before the encoder is
    /// created via [`Encoder::new`].
    #[inline]
    pub async fn from_tokio(
        output: PxarVariant<T, T>,
        metadata: &Metadata,
        prelude: Option<&[u8]>,
    ) -> io::Result<Self> {
        let wrapped = output.wrap(|writer| TokioWriter::new(writer));
        Self::new(wrapped, metadata, prelude).await
    }
}
37
#[cfg(feature = "tokio-fs")]
impl<'a> Encoder<'a, TokioWriter<tokio::fs::File>> {
    /// Convenience shortcut for `File::create` followed by `Encoder::from_tokio`.
    ///
    /// The file at `path` is created first; it then serves as the single (unified) output of
    /// the new encoder. No prelude is passed.
    pub async fn create<'b, P: AsRef<Path>>(
        path: P,
        metadata: &'b Metadata,
    ) -> io::Result<Self> {
        let file = tokio::fs::File::create(path.as_ref()).await?;
        let output = PxarVariant::Unified(TokioWriter::new(file));
        Self::new(output, metadata, None).await
    }
}
55
56 impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
57 /// Create an asynchronous encoder for an output implementing our internal write interface.
58 pub async fn new(
59 output: PxarVariant<T, T>,
60 metadata: &Metadata,
61 prelude: Option<&[u8]>,
62 ) -> io::Result<Encoder<'a, T>> {
63 let output = output.wrap_multi(|output| output.into(), |payload_output| payload_output);
64 Ok(Self {
65 inner: encoder::EncoderImpl::new(output, metadata, prelude).await?,
66 })
67 }
68
69 /// Create a new regular file in the archive. This returns a `File` object to which the
70 /// contents have to be written out *completely*. Failing to do so will put the encoder into an
71 /// error state.
72 pub async fn create_file<'b, P: AsRef<Path>>(
73 &'b mut self,
74 metadata: &Metadata,
75 file_name: P,
76 file_size: u64,
77 ) -> io::Result<File<'b, T>>
78 where
79 'a: 'b,
80 {
81 Ok(File {
82 inner: self
83 .inner
84 .create_file(metadata, file_name.as_ref(), file_size)
85 .await?,
86 })
87 }
88
89 /// Get current position for payload stream
90 pub fn payload_position(&self) -> io::Result<PayloadOffset> {
91 self.inner.payload_position()
92 }
93
94 // /// Convenience shortcut to add a *regular* file by path including its contents to the archive.
95 // pub async fn add_file<P, F>(
96 // &mut self,
97 // metadata: &Metadata,
98 // file_name: P,
99 // file_size: u64,
100 // content: &mut dyn tokio::io::Read,
101 // ) -> io::Result<()>
102 // where
103 // P: AsRef<Path>,
104 // F: AsAsyncReader,
105 // {
106 // self.inner.add_file(
107 // metadata,
108 // file_name.as_ref(),
109 // file_size,
110 // content.as_async_reader(),
111 // ).await
112 // }
113
114 /// Encode a payload reference pointing to given offset in the separate payload output
115 ///
116 /// Returns with error if the encoder instance has no separate payload output or encoding
117 /// failed.
118 pub async fn add_payload_ref(
119 &mut self,
120 metadata: &Metadata,
121 file_name: &Path,
122 file_size: u64,
123 payload_offset: PayloadOffset,
124 ) -> io::Result<LinkOffset> {
125 self.inner
126 .add_payload_ref(metadata, file_name.as_ref(), file_size, payload_offset)
127 .await
128 }
129
130 /// Add size to payload stream
131 pub fn advance(&mut self, size: PayloadOffset) -> io::Result<()> {
132 self.inner.advance(size)
133 }
134
135 /// Create a new subdirectory. Note that the subdirectory has to be finished by calling the
136 /// `finish()` method, otherwise the entire archive will be in an error state.
137 pub async fn create_directory<P: AsRef<Path>>(
138 &mut self,
139 file_name: P,
140 metadata: &Metadata,
141 ) -> io::Result<()> {
142 self.inner
143 .create_directory(file_name.as_ref(), metadata)
144 .await
145 }
146
147 /// Finish this directory. This is mandatory, encodes the end for the current directory.
148 pub async fn finish(&mut self) -> io::Result<()> {
149 self.inner.finish().await
150 }
151
152 /// Close the encoder instance. This is mandatory, encodes the end for the optional payload
153 /// output stream, if some is given
154 pub async fn close(self) -> io::Result<()> {
155 self.inner.close().await
156 }
157
158 /// Add a symbolic link to the archive.
159 pub async fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
160 &mut self,
161 metadata: &Metadata,
162 file_name: PF,
163 target: PT,
164 ) -> io::Result<()> {
165 self.inner
166 .add_symlink(metadata, file_name.as_ref(), target.as_ref())
167 .await
168 }
169
170 /// Add a hard link to the archive.
171 pub async fn add_hardlink<PF: AsRef<Path>, PT: AsRef<Path>>(
172 &mut self,
173 file_name: PF,
174 target: PT,
175 offset: LinkOffset,
176 ) -> io::Result<()> {
177 self.inner
178 .add_hardlink(file_name.as_ref(), target.as_ref(), offset)
179 .await
180 }
181
182 /// Add a device node to the archive.
183 pub async fn add_device<P: AsRef<Path>>(
184 &mut self,
185 metadata: &Metadata,
186 file_name: P,
187 device: format::Device,
188 ) -> io::Result<()> {
189 self.inner
190 .add_device(metadata, file_name.as_ref(), device)
191 .await
192 }
193
194 /// Add a device node to the archive.
195 pub async fn add_fifo<P: AsRef<Path>>(
196 &mut self,
197 metadata: &Metadata,
198 file_name: P,
199 ) -> io::Result<()> {
200 self.inner.add_fifo(metadata, file_name.as_ref()).await
201 }
202
203 /// Add a device node to the archive.
204 pub async fn add_socket<P: AsRef<Path>>(
205 &mut self,
206 metadata: &Metadata,
207 file_name: P,
208 ) -> io::Result<()> {
209 self.inner.add_socket(metadata, file_name.as_ref()).await
210 }
211 }
212
213 /// This is a "file" inside a pxar archive, to which the initially declared amount of data should
214 /// be written.
215 ///
216 /// Writing more or less than the designated amount is an error and will cause the produced archive
217 /// to be broken.
218 #[repr(transparent)]
219 pub struct File<'a, S: SeqWrite> {
220 inner: encoder::FileImpl<'a, S>,
221 }
222
223 impl<'a, S: SeqWrite> File<'a, S> {
224 /// Get the file offset to be able to reference it with `add_hardlink`.
225 pub fn file_offset(&self) -> LinkOffset {
226 self.inner.file_offset()
227 }
228
229 /// Write file data for the current file entry in a pxar archive.
230 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
231 self.inner.write(data).await
232 }
233
234 /// Completely write file data for the current file entry in a pxar archive.
235 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
236 self.inner.write_all(data).await
237 }
238 }
239
/// Lets a [`File`] be used wherever a `tokio::io::AsyncWrite` is expected by forwarding every
/// poll to the wrapped `FileImpl`.
#[cfg(feature = "tokio-io")]
impl<'a, S: SeqWrite> tokio::io::AsyncWrite for File<'a, S> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, data: &[u8]) -> Poll<io::Result<usize>> {
        // SAFETY: pin projection onto the only field; nothing visible in this file moves
        // `inner` out of a pinned `File`. NOTE(review): confirm `FileImpl` upholds this too.
        let inner = unsafe { self.map_unchecked_mut(|file| &mut file.inner) };
        inner.poll_write(cx, data)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        // SAFETY: same pin projection onto `inner` as in `poll_write`.
        let inner = unsafe { self.map_unchecked_mut(|file| &mut file.inner) };
        inner.poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        // SAFETY: same pin projection onto `inner` as in `poll_write`.
        let inner = unsafe { self.map_unchecked_mut(|file| &mut file.inner) };
        // Shutdown is mapped onto the inner implementation's `poll_close`.
        inner.poll_close(cx)
    }
}
254
/// Pxar encoder write adapter for `tokio::io::AsyncWrite`.
#[cfg(feature = "tokio-io")]
mod tokio_writer {
    use std::io;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use crate::encoder::SeqWrite;

    /// Pxar encoder write adapter for [`AsyncWrite`](tokio::io::AsyncWrite).
    ///
    /// The wrapped writer is kept in an `Option`; once it is `None`, every access fails with a
    /// "write after close" error.
    pub struct TokioWriter<T> {
        inner: Option<T>,
    }

    impl<T: tokio::io::AsyncWrite> TokioWriter<T> {
        /// Make a new [`SeqWrite`] wrapper for an object implementing
        /// [`AsyncWrite`](tokio::io::AsyncWrite).
        pub fn new(inner: T) -> Self {
            Self { inner: Some(inner) }
        }

        /// Pinned access to the wrapped writer, or an error if it is no longer present.
        fn inner_mut(&mut self) -> io::Result<Pin<&mut T>> {
            let writer = self
                .inner
                .as_mut()
                .ok_or_else(|| io_format_err!("write after close"))?;
            // SAFETY: NOTE(review): relies on this helper only being reached through a pinned
            // `TokioWriter` (as is the case for all callers in this module) and on the writer
            // never being moved out of the `Option` while pinned — confirm.
            let pinned = unsafe { Pin::new_unchecked(writer) };
            Ok(pinned)
        }

        /// Pin projection from the adapter onto the wrapped writer.
        fn inner(self: Pin<&mut Self>) -> io::Result<Pin<&mut T>> {
            // SAFETY: the reference is only used to re-pin the wrapped writer in `inner_mut`;
            // nothing is moved out of `self` here.
            let this = unsafe { self.get_unchecked_mut() };
            this.inner_mut()
        }
    }

    impl<T: tokio::io::AsyncWrite> SeqWrite for TokioWriter<T> {
        fn poll_seq_write(
            self: Pin<&mut Self>,
            cx: &mut Context,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let writer = self.inner()?;
            writer.poll_write(cx, buf)
        }

        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
            let writer = self.inner()?;
            writer.poll_flush(cx)
        }
    }
}
304
305 #[cfg(feature = "tokio-io")]
306 pub use tokio_writer::TokioWriter;
307
#[cfg(test)]
mod test {
    use std::io;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use super::Encoder;
    use crate::Metadata;

    /// Output stub whose methods must never be called: the test below only type-checks a
    /// future and never polls it.
    struct DummyOutput;

    impl super::SeqWrite for DummyOutput {
        fn poll_seq_write(
            self: Pin<&mut Self>,
            _cx: &mut Context,
            _buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            unreachable!()
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
            unreachable!()
        }
    }

    /// Assert that `Encoder` is `Send`
    #[test]
    fn send_test() {
        // Compiles only if the argument is `Send`; the value itself is dropped unused.
        fn assert_send<F: Send>(_: F) {}

        // The future is built but never polled, so `DummyOutput` is never actually invoked.
        let encode = async {
            let mut encoder = Encoder::new(
                crate::PxarVariant::Unified(DummyOutput),
                &Metadata::dir_builder(0o700).build(),
                None,
            )
            .await
            .unwrap();
            {
                encoder
                    .create_directory("baba", &Metadata::dir_builder(0o700).build())
                    .await
                    .unwrap();
                encoder
                    .create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
                    .await
                    .unwrap();
            }
            encoder.finish().await.unwrap();
        };

        assert_send(encode);
    }
}