]> git.proxmox.com Git - pxar.git/blob - src/encoder/aio.rs
revert Send requirement from previous patch
[pxar.git] / src / encoder / aio.rs
1 //! Asynchronous `pxar` format handling.
2
3 use std::io;
4 use std::path::Path;
5 use std::pin::Pin;
6 use std::task::{Context, Poll};
7
8 use crate::encoder::{self, LinkOffset, SeqWrite};
9 use crate::format;
10 use crate::Metadata;
11
12 /// Asynchronous `pxar` encoder.
13 ///
14 /// This is the `async` version of the `pxar` encoder.
15 #[repr(transparent)]
16 pub struct Encoder<'a, T: SeqWrite + 'a> {
17 inner: encoder::EncoderImpl<'a, T>,
18 }
19
20 #[cfg(feature = "tokio-io")]
21 impl<'a, T: tokio::io::AsyncWrite + 'a> Encoder<'a, TokioWriter<T>> {
22 /// Encode a `pxar` archive into a `tokio::io::AsyncWrite` output.
23 #[inline]
24 pub async fn from_tokio(
25 output: T,
26 metadata: &Metadata,
27 ) -> io::Result<Encoder<'a, TokioWriter<T>>> {
28 Encoder::new(TokioWriter::new(output), metadata).await
29 }
30 }
31
32 #[cfg(feature = "tokio-fs")]
33 impl<'a> Encoder<'a, TokioWriter<tokio::fs::File>> {
34 /// Convenience shortcut for `File::create` followed by `Encoder::from_tokio`.
35 pub async fn create<'b, P: AsRef<Path>>(
36 path: P,
37 metadata: &'b Metadata,
38 ) -> io::Result<Encoder<'a, TokioWriter<tokio::fs::File>>> {
39 Encoder::new(
40 TokioWriter::new(tokio::fs::File::create(path.as_ref()).await?),
41 metadata,
42 )
43 .await
44 }
45 }
46
47 impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
48 /// Create an asynchronous encoder for an output implementing our internal write interface.
49 pub async fn new(output: T, metadata: &Metadata) -> io::Result<Encoder<'a, T>> {
50 Ok(Self {
51 inner: encoder::EncoderImpl::new(output.into(), metadata).await?,
52 })
53 }
54
55 /// Create a new regular file in the archive. This returns a `File` object to which the
56 /// contents have to be written out *completely*. Failing to do so will put the encoder into an
57 /// error state.
58 pub async fn create_file<'b, P: AsRef<Path>>(
59 &'b mut self,
60 metadata: &Metadata,
61 file_name: P,
62 file_size: u64,
63 ) -> io::Result<File<'b, T>>
64 where
65 'a: 'b,
66 {
67 Ok(File {
68 inner: self
69 .inner
70 .create_file(metadata, file_name.as_ref(), file_size)
71 .await?,
72 })
73 }
74
75 // /// Convenience shortcut to add a *regular* file by path including its contents to the archive.
76 // pub async fn add_file<P, F>(
77 // &mut self,
78 // metadata: &Metadata,
79 // file_name: P,
80 // file_size: u64,
81 // content: &mut dyn tokio::io::Read,
82 // ) -> io::Result<()>
83 // where
84 // P: AsRef<Path>,
85 // F: AsAsyncReader,
86 // {
87 // self.inner.add_file(
88 // metadata,
89 // file_name.as_ref(),
90 // file_size,
91 // content.as_async_reader(),
92 // ).await
93 // }
94
95 /// Create a new subdirectory. Note that the subdirectory has to be finished by calling the
96 /// `finish()` method, otherwise the entire archive will be in an error state.
97 pub async fn create_directory<P: AsRef<Path>>(
98 &mut self,
99 file_name: P,
100 metadata: &Metadata,
101 ) -> io::Result<Encoder<'_, T>> {
102 Ok(Encoder {
103 inner: self
104 .inner
105 .create_directory(file_name.as_ref(), metadata)
106 .await?,
107 })
108 }
109
110 /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
111 pub async fn finish(self) -> io::Result<()> {
112 self.inner.finish().await
113 }
114
115 /// Add a symbolic link to the archive.
116 pub async fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
117 &mut self,
118 metadata: &Metadata,
119 file_name: PF,
120 target: PT,
121 ) -> io::Result<()> {
122 self.inner
123 .add_symlink(metadata, file_name.as_ref(), target.as_ref())
124 .await
125 }
126
127 /// Add a hard link to the archive.
128 pub async fn add_hardlink<PF: AsRef<Path>, PT: AsRef<Path>>(
129 &mut self,
130 file_name: PF,
131 target: PT,
132 offset: LinkOffset,
133 ) -> io::Result<()> {
134 self.inner
135 .add_hardlink(file_name.as_ref(), target.as_ref(), offset)
136 .await
137 }
138
139 /// Add a device node to the archive.
140 pub async fn add_device<P: AsRef<Path>>(
141 &mut self,
142 metadata: &Metadata,
143 file_name: P,
144 device: format::Device,
145 ) -> io::Result<()> {
146 self.inner
147 .add_device(metadata, file_name.as_ref(), device)
148 .await
149 }
150
151 /// Add a device node to the archive.
152 pub async fn add_fifo<P: AsRef<Path>>(
153 &mut self,
154 metadata: &Metadata,
155 file_name: P,
156 ) -> io::Result<()> {
157 self.inner.add_fifo(metadata, file_name.as_ref()).await
158 }
159
160 /// Add a device node to the archive.
161 pub async fn add_socket<P: AsRef<Path>>(
162 &mut self,
163 metadata: &Metadata,
164 file_name: P,
165 ) -> io::Result<()> {
166 self.inner.add_socket(metadata, file_name.as_ref()).await
167 }
168 }
169
170 #[repr(transparent)]
171 pub struct File<'a, S: SeqWrite> {
172 inner: encoder::FileImpl<'a, S>,
173 }
174
175 impl<'a, S: SeqWrite> File<'a, S> {
176 /// Get the file offset to be able to reference it with `add_hardlink`.
177 pub fn file_offset(&self) -> LinkOffset {
178 self.inner.file_offset()
179 }
180
181 /// Write file data for the current file entry in a pxar archive.
182 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
183 self.inner.write(data).await
184 }
185
186 /// Completely write file data for the current file entry in a pxar archive.
187 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
188 self.inner.write_all(data).await
189 }
190 }
191
192 #[cfg(feature = "tokio-io")]
193 impl<'a, S: SeqWrite> tokio::io::AsyncWrite for File<'a, S> {
194 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, data: &[u8]) -> Poll<io::Result<usize>> {
195 unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll_write(cx, data)
196 }
197
198 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
199 unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll_flush(cx)
200 }
201
202 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
203 unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll_close(cx)
204 }
205 }
206
207 /// Pxar encoder write adapter for `tokio::io::AsyncWrite`.
208 #[cfg(feature = "tokio-io")]
209 mod tokio_writer {
210 use std::io;
211 use std::pin::Pin;
212 use std::task::{Context, Poll};
213
214 use crate::encoder::SeqWrite;
215
216 pub struct TokioWriter<T> {
217 inner: Option<T>,
218 }
219
220 impl<T: tokio::io::AsyncWrite> TokioWriter<T> {
221 pub fn new(inner: T) -> Self {
222 Self { inner: Some(inner) }
223 }
224
225 fn inner_mut(&mut self) -> io::Result<Pin<&mut T>> {
226 let inner = self
227 .inner
228 .as_mut()
229 .ok_or_else(|| io_format_err!("write after close"))?;
230 Ok(unsafe { Pin::new_unchecked(inner) })
231 }
232
233 fn inner(self: Pin<&mut Self>) -> io::Result<Pin<&mut T>> {
234 unsafe { self.get_unchecked_mut() }.inner_mut()
235 }
236 }
237
238 impl<T: tokio::io::AsyncWrite> SeqWrite for TokioWriter<T> {
239 fn poll_seq_write(
240 self: Pin<&mut Self>,
241 cx: &mut Context,
242 buf: &[u8],
243 ) -> Poll<io::Result<usize>> {
244 let this = unsafe { self.get_unchecked_mut() };
245 this.inner_mut()?.poll_write(cx, buf)
246 }
247
248 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
249 self.inner()?.poll_flush(cx)
250 }
251 }
252 }
253
254 #[cfg(feature = "tokio-io")]
255 pub use tokio_writer::TokioWriter;
256
257 #[cfg(test)]
258 mod test {
259 use std::io;
260 use std::pin::Pin;
261 use std::task::{Context, Poll};
262
263 use super::Encoder;
264 use crate::Metadata;
265
266 struct DummyOutput;
267
268 impl super::SeqWrite for DummyOutput {
269 fn poll_seq_write(
270 self: Pin<&mut Self>,
271 _cx: &mut Context,
272 _buf: &[u8],
273 ) -> Poll<io::Result<usize>> {
274 unreachable!();
275 }
276
277 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
278 unreachable!();
279 }
280 }
281
282 #[test]
283 /// Assert that `Encoder` is `Send`
284 fn send_test() {
285 let test = async {
286 let mut encoder = Encoder::new(DummyOutput, &Metadata::dir_builder(0o700).build())
287 .await
288 .unwrap();
289 {
290 let mut dir = encoder
291 .create_directory("baba", &Metadata::dir_builder(0o700).build())
292 .await
293 .unwrap();
294 dir.create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
295 .await
296 .unwrap();
297 }
298 encoder.finish().await.unwrap();
299 };
300
301 fn test_send<T: Send>(_: T) {}
302 test_send(test);
303 }
304 }