]>
Commit | Line | Data |
---|---|---|
70acf637 WB |
1 | //! The `pxar` encoder state machine. |
2 | //! | |
3 | //! This is the implementation used by both the synchronous and async pxar wrappers. | |
4 | ||
5 | use std::io; | |
6 | use std::mem::{forget, size_of, size_of_val, take}; | |
7 | use std::os::unix::ffi::OsStrExt; | |
8 | use std::path::Path; | |
9 | use std::pin::Pin; | |
10 | use std::task::{Context, Poll}; | |
11 | ||
12 | use endian_trait::Endian; | |
13 | ||
749855a4 | 14 | use crate::binary_tree_array; |
951620f1 | 15 | use crate::decoder::{self, SeqRead}; |
70acf637 WB |
16 | use crate::format::{self, GoodbyeItem}; |
17 | use crate::poll_fn::poll_fn; | |
18 | use crate::Metadata; | |
19 | ||
54109840 | 20 | pub mod aio; |
70acf637 WB |
21 | pub mod sync; |
22 | ||
23 | #[doc(inline)] | |
24 | pub use sync::Encoder; | |
25 | ||
/// Sequential write interface used by the encoder's state machine.
///
/// This is the crate-internal writer abstraction. The synchronous wrapper provides it for
/// `std::io::Write` types, the asynchronous wrapper for both `tokio` and `futures` `AsyncWrite`
/// types.
pub trait SeqWrite {
    /// Poll-style write of `buf`; a ready result carries the number of bytes accepted, which
    /// callers in this module treat as possibly shorter than `buf.len()`.
    fn poll_seq_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Poll-style flush hook of the underlying writer.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;

    /// Poll-style close/shutdown hook of the underlying writer.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;

    /// While writing to a pxar archive we need to remember how much data we've written to track
    /// some offsets. Particularly items like the goodbye table need to be able to compute
    /// offsets to further back in the archive.
    fn poll_position(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>;

    /// Borrow this writer as a `dyn SeqWrite` trait object.
    ///
    /// This helper exists to avoid recursively borrowing each time we nest into a subdirectory:
    /// without it, a subdirectory would hold a trait object pointing to `T`, a subdirectory
    /// nested in that would hold a trait object pointing to the trait object, and so on.
    fn as_trait_object(&mut self) -> &mut dyn SeqWrite
    where
        Self: Sized,
    {
        // Unsized coercion from `&mut Self` to `&mut dyn SeqWrite`.
        self
    }
}
57 | ||
58 | /// Allow using trait objects for generics taking a `SeqWrite`. | |
59 | impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) { | |
60 | fn poll_seq_write( | |
61 | self: Pin<&mut Self>, | |
62 | cx: &mut Context, | |
63 | buf: &[u8], | |
64 | ) -> Poll<io::Result<usize>> { | |
65 | unsafe { | |
66 | self.map_unchecked_mut(|this| &mut **this) | |
67 | .poll_seq_write(cx, buf) | |
68 | } | |
69 | } | |
70 | ||
05356884 WB |
71 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { |
72 | unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) } | |
73 | } | |
74 | ||
75 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { | |
76 | unsafe { self.map_unchecked_mut(|this| &mut **this).poll_close(cx) } | |
77 | } | |
78 | ||
70acf637 WB |
79 | fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> { |
80 | unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) } | |
81 | } | |
82 | ||
83 | fn as_trait_object(&mut self) -> &mut dyn SeqWrite | |
84 | where | |
85 | Self: Sized, | |
86 | { | |
87 | &mut **self | |
88 | } | |
89 | } | |
90 | ||
fc9c8c9d WB |
91 | /// awaitable version of `poll_position`. |
92 | async fn seq_write_position<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<u64> { | |
93 | poll_fn(move |cx| unsafe { Pin::new_unchecked(&mut *output).poll_position(cx) }).await | |
94 | } | |
70acf637 | 95 | |
fc9c8c9d WB |
96 | /// awaitable verison of `poll_seq_write`. |
97 | async fn seq_write<T: SeqWrite + ?Sized>(output: &mut T, buf: &[u8]) -> io::Result<usize> { | |
98 | poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await | |
99 | } | |
70acf637 | 100 | |
fc9c8c9d WB |
101 | /// Write the entire contents of a buffer, handling short writes. |
102 | async fn seq_write_all<T: SeqWrite + ?Sized>(output: &mut T, mut buf: &[u8]) -> io::Result<()> { | |
103 | while !buf.is_empty() { | |
104 | let got = seq_write(&mut *output, buf).await?; | |
105 | buf = &buf[got..]; | |
70acf637 | 106 | } |
fc9c8c9d WB |
107 | Ok(()) |
108 | } | |
70acf637 | 109 | |
fc9c8c9d WB |
110 | /// Write an endian-swappable struct. |
111 | async fn seq_write_struct<E: Endian, T>(output: &mut T, data: E) -> io::Result<()> | |
112 | where | |
113 | T: SeqWrite + ?Sized, | |
114 | { | |
115 | let data = data.to_le(); | |
116 | seq_write_all(output, unsafe { | |
117 | std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) | |
118 | }) | |
119 | .await | |
120 | } | |
70acf637 | 121 | |
fc9c8c9d WB |
122 | /// Write a pxar entry. |
123 | async fn seq_write_pxar_entry<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()> | |
124 | where | |
125 | T: SeqWrite + ?Sized, | |
126 | { | |
127 | seq_write_struct( | |
128 | &mut *output, | |
129 | format::Header::with_content_size(htype, data.len() as u64), | |
130 | ) | |
131 | .await?; | |
132 | seq_write_all(output, data).await | |
133 | } | |
70acf637 | 134 | |
fc9c8c9d WB |
135 | /// Write a pxar entry terminated by an additional zero which is not contained in the provided |
136 | /// data buffer. | |
137 | async fn seq_write_pxar_entry_zero<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()> | |
138 | where | |
139 | T: SeqWrite + ?Sized, | |
140 | { | |
141 | seq_write_struct( | |
142 | &mut *output, | |
143 | format::Header::with_content_size(htype, 1 + data.len() as u64), | |
144 | ) | |
145 | .await?; | |
146 | seq_write_all(&mut *output, data).await?; | |
147 | seq_write_all(output, &[0u8]).await | |
148 | } | |
70acf637 | 149 | |
fc9c8c9d WB |
150 | /// Write a pxar entry consiting of an endian-swappable struct. |
151 | async fn seq_write_pxar_struct_entry<E, T>(output: &mut T, htype: u64, data: E) -> io::Result<()> | |
152 | where | |
153 | T: SeqWrite + ?Sized, | |
154 | E: Endian, | |
155 | { | |
156 | let data = data.to_le(); | |
157 | seq_write_pxar_entry(output, htype, unsafe { | |
158 | std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) | |
159 | }) | |
160 | .await | |
70acf637 WB |
161 | } |
162 | ||
59e6f679 WB |
/// Error conditions caused by wrong usage of this crate.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EncodeError {
    /// The user dropped a `File` without finishing writing all of its contents.
    ///
    /// This is required because the payload length is written out at the beginning and decoding
    /// requires the right amount of data to follow.
    IncompleteFile,

    /// The user dropped a directory without finalizing it.
    ///
    /// Finalizing is required to build the goodbye table at the end of a directory.
    IncompleteDirectory,
}
177 | ||
70acf637 WB |
178 | #[derive(Default)] |
179 | struct EncoderState { | |
180 | /// Goodbye items for this directory, excluding the tail. | |
181 | items: Vec<GoodbyeItem>, | |
182 | ||
59e6f679 WB |
183 | /// User caused error conditions. |
184 | encode_error: Option<EncodeError>, | |
70acf637 WB |
185 | |
186 | /// Offset of this directory's ENTRY. | |
187 | entry_offset: u64, | |
188 | ||
189 | /// Offset to this directory's first FILENAME. | |
190 | files_offset: u64, | |
191 | ||
192 | /// If this is a subdirectory, this points to the this directory's FILENAME. | |
193 | file_offset: Option<u64>, | |
749855a4 WB |
194 | |
195 | /// If this is a subdirectory, this contains this directory's hash for the goodbye item. | |
196 | file_hash: u64, | |
70acf637 WB |
197 | } |
198 | ||
59e6f679 WB |
199 | impl EncoderState { |
200 | fn merge_error(&mut self, error: Option<EncodeError>) { | |
201 | // one error is enough: | |
202 | if self.encode_error.is_none() { | |
203 | self.encode_error = error; | |
204 | } | |
205 | } | |
206 | ||
207 | fn add_error(&mut self, error: EncodeError) { | |
208 | self.merge_error(Some(error)); | |
209 | } | |
210 | } | |
211 | ||
70acf637 WB |
212 | /// The encoder state machine implementation for a directory. |
213 | /// | |
214 | /// We use `async fn` to implement the encoder state machine so that we can easily plug in both | |
215 | /// synchronous or `async` I/O objects in as output. | |
216 | pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> { | |
217 | output: T, | |
218 | state: EncoderState, | |
219 | parent: Option<&'a mut EncoderState>, | |
220 | finished: bool, | |
221 | } | |
222 | ||
223 | impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> { | |
224 | fn drop(&mut self) { | |
225 | if let Some(ref mut parent) = self.parent { | |
226 | // propagate errors: | |
59e6f679 WB |
227 | parent.merge_error(self.state.encode_error); |
228 | if !self.finished { | |
229 | parent.add_error(EncodeError::IncompleteDirectory); | |
70acf637 WB |
230 | } |
231 | } else if !self.finished { | |
232 | // FIXME: how do we deal with this? | |
2f56c76c | 233 | // eprintln!("Encoder dropped without finishing!"); |
70acf637 WB |
234 | } |
235 | } | |
236 | } | |
237 | ||
238 | impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { | |
239 | pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> { | |
240 | if !metadata.is_dir() { | |
241 | io_bail!("directory metadata must contain the directory mode flag"); | |
242 | } | |
243 | let mut this = Self { | |
244 | output, | |
245 | state: EncoderState::default(), | |
246 | parent: None, | |
247 | finished: false, | |
248 | }; | |
249 | ||
250 | this.encode_metadata(metadata).await?; | |
fc9c8c9d | 251 | this.state.files_offset = seq_write_position(&mut this.output).await?; |
70acf637 WB |
252 | |
253 | Ok(this) | |
254 | } | |
255 | ||
256 | fn check(&self) -> io::Result<()> { | |
59e6f679 WB |
257 | match self.state.encode_error { |
258 | Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"), | |
259 | Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"), | |
a7149b09 | 260 | None => Ok(()), |
70acf637 WB |
261 | } |
262 | } | |
263 | ||
264 | pub async fn create_file<'b>( | |
265 | &'b mut self, | |
266 | metadata: &Metadata, | |
267 | file_name: &Path, | |
268 | file_size: u64, | |
269 | ) -> io::Result<FileImpl<'b>> | |
270 | where | |
271 | 'a: 'b, | |
272 | { | |
273 | self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size) | |
274 | .await | |
275 | } | |
276 | ||
277 | async fn create_file_do<'b>( | |
278 | &'b mut self, | |
279 | metadata: &Metadata, | |
280 | file_name: &[u8], | |
281 | file_size: u64, | |
282 | ) -> io::Result<FileImpl<'b>> | |
283 | where | |
284 | 'a: 'b, | |
285 | { | |
286 | self.check()?; | |
287 | ||
fc9c8c9d | 288 | let file_offset = seq_write_position(&mut self.output).await?; |
2ab25a17 | 289 | self.start_file_do(Some(metadata), file_name).await?; |
70acf637 | 290 | |
fc9c8c9d WB |
291 | seq_write_struct( |
292 | &mut self.output, | |
293 | format::Header::with_content_size(format::PXAR_PAYLOAD, file_size), | |
294 | ) | |
295 | .await?; | |
70acf637 | 296 | |
fc9c8c9d | 297 | let payload_data_offset = seq_write_position(&mut self.output).await?; |
70acf637 WB |
298 | |
299 | let meta_size = payload_data_offset - file_offset; | |
300 | ||
301 | Ok(FileImpl { | |
302 | output: &mut self.output, | |
303 | goodbye_item: GoodbyeItem { | |
304 | hash: format::hash_filename(file_name), | |
305 | offset: file_offset, | |
306 | size: file_size + meta_size, | |
307 | }, | |
308 | remaining_size: file_size, | |
309 | parent: &mut self.state, | |
310 | }) | |
311 | } | |
312 | ||
313 | pub async fn add_file( | |
314 | &mut self, | |
315 | metadata: &Metadata, | |
316 | file_name: &Path, | |
317 | file_size: u64, | |
318 | content: &mut dyn SeqRead, | |
319 | ) -> io::Result<()> { | |
320 | let mut file = self.create_file(metadata, file_name, file_size).await?; | |
321 | let mut buf = crate::util::vec_new(4096); | |
322 | loop { | |
951620f1 | 323 | let got = decoder::seq_read(&mut *content, &mut buf).await?; |
70acf637 WB |
324 | if got == 0 { |
325 | break; | |
326 | } else { | |
327 | file.write_all(&buf[..got]).await?; | |
328 | } | |
329 | } | |
330 | Ok(()) | |
331 | } | |
332 | ||
2fb73e7b WB |
333 | pub async fn add_symlink( |
334 | &mut self, | |
335 | metadata: &Metadata, | |
336 | file_name: &Path, | |
337 | target: &Path, | |
0abc4121 | 338 | ) -> io::Result<()> { |
2ab25a17 | 339 | self.add_link(Some(metadata), file_name, target, format::PXAR_SYMLINK) |
0abc4121 WB |
340 | .await |
341 | } | |
342 | ||
2ab25a17 WB |
343 | pub async fn add_hardlink(&mut self, file_name: &Path, target: &Path) -> io::Result<()> { |
344 | self.add_link(None, file_name, target, format::PXAR_HARDLINK) | |
0abc4121 WB |
345 | .await |
346 | } | |
347 | ||
348 | async fn add_link( | |
349 | &mut self, | |
2ab25a17 | 350 | metadata: Option<&Metadata>, |
0abc4121 WB |
351 | file_name: &Path, |
352 | target: &Path, | |
353 | htype: u64, | |
3fe26522 | 354 | ) -> io::Result<()> { |
a7149b09 WB |
355 | self.add_file_entry( |
356 | metadata, | |
357 | file_name, | |
358 | Some((htype, target.as_os_str().as_bytes())), | |
359 | ) | |
360 | .await | |
3fe26522 WB |
361 | } |
362 | ||
363 | pub async fn add_device( | |
364 | &mut self, | |
365 | metadata: &Metadata, | |
366 | file_name: &Path, | |
367 | device: format::Device, | |
368 | ) -> io::Result<()> { | |
a7149b09 WB |
369 | if !metadata.is_device() { |
370 | io_bail!("entry added via add_device must have a device mode in its metadata"); | |
371 | } | |
372 | ||
3fe26522 WB |
373 | let device = device.to_le(); |
374 | let device = unsafe { | |
375 | std::slice::from_raw_parts( | |
376 | &device as *const format::Device as *const u8, | |
377 | size_of::<format::Device>(), | |
378 | ) | |
379 | }; | |
2ab25a17 WB |
380 | self.add_file_entry( |
381 | Some(metadata), | |
382 | file_name, | |
383 | Some((format::PXAR_DEVICE, device)), | |
384 | ) | |
385 | .await | |
3fe26522 WB |
386 | } |
387 | ||
a7149b09 WB |
388 | pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> { |
389 | if !metadata.is_fifo() { | |
390 | io_bail!("entry added via add_device must be of type fifo in its metadata"); | |
391 | } | |
392 | ||
2ab25a17 | 393 | self.add_file_entry(Some(metadata), file_name, None).await |
a7149b09 WB |
394 | } |
395 | ||
396 | pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> { | |
397 | if !metadata.is_socket() { | |
398 | io_bail!("entry added via add_device must be of type socket in its metadata"); | |
399 | } | |
400 | ||
2ab25a17 | 401 | self.add_file_entry(Some(metadata), file_name, None).await |
a7149b09 WB |
402 | } |
403 | ||
3fe26522 WB |
404 | async fn add_file_entry( |
405 | &mut self, | |
2ab25a17 | 406 | metadata: Option<&Metadata>, |
3fe26522 | 407 | file_name: &Path, |
a7149b09 | 408 | entry_htype_data: Option<(u64, &[u8])>, |
2fb73e7b | 409 | ) -> io::Result<()> { |
05356884 | 410 | self.check()?; |
2fb73e7b | 411 | |
fc9c8c9d | 412 | let file_offset = seq_write_position(&mut self.output).await?; |
2fb73e7b WB |
413 | |
414 | let file_name = file_name.as_os_str().as_bytes(); | |
2fb73e7b WB |
415 | |
416 | self.start_file_do(metadata, file_name).await?; | |
a7149b09 | 417 | if let Some((htype, entry_data)) = entry_htype_data { |
fc9c8c9d | 418 | seq_write_pxar_entry_zero(&mut self.output, htype, entry_data).await?; |
a7149b09 | 419 | } |
2fb73e7b | 420 | |
fc9c8c9d | 421 | let end_offset = seq_write_position(&mut self.output).await?; |
2fb73e7b WB |
422 | |
423 | self.state.items.push(GoodbyeItem { | |
424 | hash: format::hash_filename(file_name), | |
425 | offset: file_offset, | |
426 | size: end_offset - file_offset, | |
427 | }); | |
428 | ||
429 | Ok(()) | |
430 | } | |
431 | ||
70acf637 WB |
432 | /// Helper |
433 | #[inline] | |
434 | async fn position(&mut self) -> io::Result<u64> { | |
fc9c8c9d | 435 | seq_write_position(&mut self.output).await |
70acf637 WB |
436 | } |
437 | ||
438 | pub async fn create_directory<'b>( | |
439 | &'b mut self, | |
440 | file_name: &Path, | |
441 | metadata: &Metadata, | |
442 | ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>> | |
443 | where | |
444 | 'a: 'b, | |
445 | { | |
446 | self.check()?; | |
447 | ||
448 | if !metadata.is_dir() { | |
449 | io_bail!("directory metadata must contain the directory mode flag"); | |
450 | } | |
451 | ||
452 | let file_name = file_name.as_os_str().as_bytes(); | |
749855a4 | 453 | let file_hash = format::hash_filename(file_name); |
70acf637 WB |
454 | |
455 | let file_offset = self.position().await?; | |
456 | self.encode_filename(file_name).await?; | |
457 | ||
458 | let entry_offset = self.position().await?; | |
459 | self.encode_metadata(&metadata).await?; | |
460 | ||
461 | let files_offset = self.position().await?; | |
462 | ||
463 | Ok(EncoderImpl { | |
464 | output: self.output.as_trait_object(), | |
465 | state: EncoderState { | |
466 | entry_offset, | |
467 | files_offset, | |
468 | file_offset: Some(file_offset), | |
749855a4 | 469 | file_hash: file_hash, |
70acf637 WB |
470 | ..Default::default() |
471 | }, | |
472 | parent: Some(&mut self.state), | |
473 | finished: false, | |
474 | }) | |
475 | } | |
476 | ||
2ab25a17 WB |
477 | async fn start_file_do( |
478 | &mut self, | |
479 | metadata: Option<&Metadata>, | |
480 | file_name: &[u8], | |
481 | ) -> io::Result<()> { | |
70acf637 | 482 | self.encode_filename(file_name).await?; |
2ab25a17 WB |
483 | if let Some(metadata) = metadata { |
484 | self.encode_metadata(&metadata).await?; | |
485 | } | |
70acf637 WB |
486 | Ok(()) |
487 | } | |
488 | ||
489 | async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> { | |
fc9c8c9d | 490 | seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ENTRY, metadata.stat.clone()) |
70acf637 | 491 | .await?; |
b5a471a3 WB |
492 | |
493 | for xattr in &metadata.xattrs { | |
494 | self.write_xattr(xattr).await?; | |
495 | } | |
496 | ||
497 | self.write_acls(&metadata.acl).await?; | |
498 | ||
499 | if let Some(fcaps) = &metadata.fcaps { | |
500 | self.write_file_capabilities(fcaps).await?; | |
501 | } | |
502 | ||
503 | if let Some(qpid) = &metadata.quota_project_id { | |
504 | self.write_quota_project_id(qpid).await?; | |
505 | } | |
506 | ||
507 | Ok(()) | |
508 | } | |
509 | ||
510 | async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> { | |
fc9c8c9d | 511 | seq_write_pxar_entry(&mut self.output, format::PXAR_XATTR, &xattr.data).await |
b5a471a3 WB |
512 | } |
513 | ||
514 | async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> { | |
515 | for acl in &acl.users { | |
fc9c8c9d | 516 | seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_USER, acl.clone()) |
b5a471a3 WB |
517 | .await?; |
518 | } | |
519 | ||
520 | for acl in &acl.groups { | |
fc9c8c9d | 521 | seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP, acl.clone()) |
b5a471a3 WB |
522 | .await?; |
523 | } | |
524 | ||
525 | if let Some(acl) = &acl.group_obj { | |
fc9c8c9d | 526 | seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP_OBJ, acl.clone()) |
b5a471a3 WB |
527 | .await?; |
528 | } | |
529 | ||
530 | if let Some(acl) = &acl.default { | |
fc9c8c9d | 531 | seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_DEFAULT, acl.clone()) |
b5a471a3 WB |
532 | .await?; |
533 | } | |
534 | ||
535 | for acl in &acl.default_users { | |
fc9c8c9d WB |
536 | seq_write_pxar_struct_entry( |
537 | &mut self.output, | |
538 | format::PXAR_ACL_DEFAULT_USER, | |
539 | acl.clone(), | |
540 | ) | |
541 | .await?; | |
b5a471a3 WB |
542 | } |
543 | ||
544 | for acl in &acl.default_groups { | |
fc9c8c9d WB |
545 | seq_write_pxar_struct_entry( |
546 | &mut self.output, | |
547 | format::PXAR_ACL_DEFAULT_GROUP, | |
548 | acl.clone(), | |
549 | ) | |
550 | .await?; | |
b5a471a3 WB |
551 | } |
552 | ||
70acf637 WB |
553 | Ok(()) |
554 | } | |
555 | ||
b5a471a3 | 556 | async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> { |
fc9c8c9d | 557 | seq_write_pxar_entry(&mut self.output, format::PXAR_FCAPS, &fcaps.data).await |
b5a471a3 WB |
558 | } |
559 | ||
560 | async fn write_quota_project_id( | |
561 | &mut self, | |
562 | quota_project_id: &format::QuotaProjectId, | |
563 | ) -> io::Result<()> { | |
fc9c8c9d WB |
564 | seq_write_pxar_struct_entry( |
565 | &mut self.output, | |
566 | format::PXAR_QUOTA_PROJID, | |
567 | quota_project_id.clone(), | |
568 | ) | |
569 | .await | |
b5a471a3 WB |
570 | } |
571 | ||
70acf637 | 572 | async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> { |
fc9c8c9d | 573 | seq_write_pxar_entry_zero(&mut self.output, format::PXAR_FILENAME, file_name).await |
70acf637 WB |
574 | } |
575 | ||
576 | pub async fn finish(mut self) -> io::Result<()> { | |
577 | let tail_bytes = self.finish_goodbye_table().await?; | |
fc9c8c9d | 578 | seq_write_pxar_entry(&mut self.output, format::PXAR_GOODBYE, &tail_bytes).await?; |
749855a4 WB |
579 | if let Some(parent) = &mut self.parent { |
580 | let file_offset = self | |
581 | .state | |
582 | .file_offset | |
583 | .expect("internal error: parent set but no file_offset?"); | |
584 | ||
fc9c8c9d | 585 | let end_offset = seq_write_position(&mut self.output).await?; |
749855a4 WB |
586 | |
587 | parent.items.push(GoodbyeItem { | |
588 | hash: self.state.file_hash, | |
589 | offset: file_offset, | |
590 | size: end_offset - file_offset, | |
591 | }); | |
592 | } | |
70acf637 WB |
593 | self.finished = true; |
594 | Ok(()) | |
595 | } | |
596 | ||
597 | async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> { | |
fc9c8c9d | 598 | let goodbye_offset = seq_write_position(&mut self.output).await?; |
70acf637 WB |
599 | |
600 | // "take" out the tail (to not leave an array of endian-swapped structs in `self`) | |
601 | let mut tail = take(&mut self.state.items); | |
602 | let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>(); | |
603 | let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64; | |
604 | ||
749855a4 WB |
605 | // sort, then create a BST |
606 | tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash)); | |
70acf637 | 607 | |
749855a4 WB |
608 | let mut bst = Vec::with_capacity(tail.len() + 1); |
609 | unsafe { | |
610 | bst.set_len(tail.len()); | |
70acf637 | 611 | } |
749855a4 WB |
612 | binary_tree_array::copy(tail.len(), |src, dest| { |
613 | let mut item = tail[src].clone(); | |
614 | // fixup the goodbye table offsets to be relative and with the right endianess | |
615 | item.offset = goodbye_offset - item.offset; | |
616 | unsafe { | |
617 | std::ptr::write(&mut bst[dest], item.to_le()); | |
618 | } | |
619 | }); | |
620 | drop(tail); | |
621 | ||
622 | bst.push( | |
623 | GoodbyeItem { | |
624 | hash: format::PXAR_GOODBYE_TAIL_MARKER, | |
625 | offset: goodbye_offset - self.state.entry_offset, | |
626 | size: goodbye_size, | |
627 | } | |
628 | .to_le(), | |
629 | ); | |
70acf637 WB |
630 | |
631 | // turn this into a byte vector since after endian-swapping we can no longer guarantee that | |
632 | // the items make sense: | |
749855a4 WB |
633 | let data = bst.as_mut_ptr() as *mut u8; |
634 | let capacity = bst.capacity() * size_of::<GoodbyeItem>(); | |
635 | forget(bst); | |
70acf637 WB |
636 | Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) }) |
637 | } | |
638 | } | |
639 | ||
640 | /// Writer for a file object in a directory. | |
641 | pub struct FileImpl<'a> { | |
642 | output: &'a mut dyn SeqWrite, | |
643 | ||
644 | /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it | |
645 | /// directly instead of on Drop of FileImpl? | |
646 | goodbye_item: GoodbyeItem, | |
647 | ||
648 | /// While writing data to this file, this is how much space we still have left, this must reach | |
649 | /// exactly zero. | |
650 | remaining_size: u64, | |
651 | ||
59e6f679 | 652 | /// The directory containing this file. This is where we propagate the `IncompleteFile` error |
70acf637 WB |
653 | /// to, and where we insert our `GoodbyeItem`. |
654 | parent: &'a mut EncoderState, | |
655 | } | |
656 | ||
657 | impl<'a> Drop for FileImpl<'a> { | |
658 | fn drop(&mut self) { | |
659 | if self.remaining_size != 0 { | |
59e6f679 | 660 | self.parent.add_error(EncodeError::IncompleteFile); |
70acf637 WB |
661 | } |
662 | ||
663 | self.parent.items.push(self.goodbye_item.clone()); | |
664 | } | |
665 | } | |
666 | ||
667 | impl<'a> FileImpl<'a> { | |
668 | fn check_remaining(&self, size: usize) -> io::Result<()> { | |
669 | if size as u64 > self.remaining_size { | |
670 | io_bail!("attempted to write more than previously allocated"); | |
671 | } else { | |
672 | Ok(()) | |
673 | } | |
674 | } | |
675 | ||
05356884 WB |
/// Poll write interface to more easily connect to tokio/futures.
///
/// Rejects `data` that is larger than the remaining payload size; on a successful write the
/// remaining size is reduced by the number of bytes written.
#[cfg(any(feature = "tokio-io", feature = "futures-io"))]
pub fn poll_write(
    self: Pin<&mut Self>,
    cx: &mut Context,
    data: &[u8],
) -> Poll<io::Result<usize>> {
    let this = self.get_mut();
    this.check_remaining(data.len())?;

    // NOTE(review): re-pins the borrowed output for this poll only; relies on the output not
    // being moved between polls — confirm for all implementors.
    let output = unsafe { Pin::new_unchecked(&mut *this.output) };
    let result = output.poll_seq_write(cx, data);

    if let Poll::Ready(Ok(put)) = &result {
        this.remaining_size -= *put as u64;
    }
    result
}
694 | ||
/// Poll flush interface to more easily connect to tokio/futures.
///
/// Forwards to the output's `poll_flush`.
#[cfg(any(feature = "tokio-io", feature = "futures-io"))]
pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
    let this = self.get_mut();
    // The `output` field is a mutable reference, which is always `Unpin`, so it can be pinned
    // without `unsafe`.
    Pin::new(&mut this.output).poll_flush(cx)
}
703 | ||
/// Poll close/shutdown interface to more easily connect to tokio/futures.
///
/// Forwards to the output's `poll_close`.
#[cfg(any(feature = "tokio-io", feature = "futures-io"))]
pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
    let this = self.get_mut();
    // The `output` field is a mutable reference, which is always `Unpin`, so it can be pinned
    // without `unsafe`.
    Pin::new(&mut this.output).poll_close(cx)
}
712 | ||
70acf637 WB |
713 | /// Write file data for the current file entry in a pxar archive. |
714 | /// | |
715 | /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than | |
716 | /// requested. Check the return value for how many. There's also a `write_all` method available | |
717 | /// for convenience. | |
718 | pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> { | |
719 | self.check_remaining(data.len())?; | |
fc9c8c9d | 720 | let put = seq_write(&mut self.output, data).await?; |
70acf637 WB |
721 | self.remaining_size -= put as u64; |
722 | Ok(put) | |
723 | } | |
724 | ||
725 | /// Completely write file data for the current file entry in a pxar archive. | |
726 | pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> { | |
727 | self.check_remaining(data.len())?; | |
fc9c8c9d | 728 | seq_write_all(&mut self.output, data).await?; |
70acf637 WB |
729 | self.remaining_size -= data.len() as u64; |
730 | Ok(()) | |
731 | } | |
732 | } | |
05356884 WB |
733 | |
/// `tokio` adapter: every method delegates to the inherent poll method on `FileImpl`.
#[cfg(feature = "tokio-io")]
impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        FileImpl::poll_write(self, cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_flush(self, cx)
    }

    // tokio calls closing "shutdown".
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_close(self, cx)
    }
}
748 | ||
/// `futures` adapter: every method delegates to the inherent poll method on `FileImpl`.
#[cfg(feature = "futures-io")]
impl<'a> futures::io::AsyncWrite for FileImpl<'a> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        FileImpl::poll_write(self, cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_flush(self, cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_close(self, cx)
    }
}