]> git.proxmox.com Git - pxar.git/blame - src/encoder/mod.rs
add more code documentation
[pxar.git] / src / encoder / mod.rs
CommitLineData
70acf637
WB
1//! The `pxar` encoder state machine.
2//!
3//! This is the implementation used by both the synchronous and async pxar wrappers.
4
e5a2495e
WB
5#![deny(missing_docs)]
6
70acf637
WB
7use std::io;
8use std::mem::{forget, size_of, size_of_val, take};
9use std::os::unix::ffi::OsStrExt;
10use std::path::Path;
11use std::pin::Pin;
a08b84b7 12use std::sync::{Arc, Mutex};
70acf637
WB
13use std::task::{Context, Poll};
14
15use endian_trait::Endian;
16
749855a4 17use crate::binary_tree_array;
951620f1 18use crate::decoder::{self, SeqRead};
70acf637
WB
19use crate::format::{self, GoodbyeItem};
20use crate::poll_fn::poll_fn;
21use crate::Metadata;
22
54109840 23pub mod aio;
70acf637
WB
24pub mod sync;
25
26#[doc(inline)]
27pub use sync::Encoder;
28
d6748862
WB
/// Reference to a file entry, used to create hard links.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct LinkOffset(u64);

impl LinkOffset {
    /// Return the raw byte offset stored in this link reference.
    #[inline]
    pub fn raw(self) -> u64 {
        let LinkOffset(offset) = self;
        offset
    }
}
40
70acf637
WB
/// Sequential write interface used by the encoder's state machine.
///
/// This is the crate-internal writer abstraction. It is provided for `std::io::Write` types by
/// the synchronous wrapper and for both `tokio` and `futures` `AsyncWrite` types by the
/// asynchronous wrapper.
pub trait SeqWrite {
    /// Try to write the next bytes of `buf` to the output.
    ///
    /// On success the number of bytes written is returned as `Poll::Ready(Ok(bytes))`; this may
    /// be less than `buf.len()`.
    ///
    /// If writing is not possible yet, `Poll::Pending` is returned and the current task is
    /// notified through `cx.waker()` once writing becomes possible.
    fn poll_seq_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Try to flush the output so that all buffered data reaches its destination.
    ///
    /// Returns `Poll::Ready(Ok(()))` on success.
    ///
    /// If the flush cannot complete immediately, `Poll::Pending` is returned and the current task
    /// is notified through `cx.waker()` once progress can be made.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
}
66
67/// Allow using trait objects for generics taking a `SeqWrite`.
d8402433
SR
68impl<S> SeqWrite for &mut S
69where
70 S: SeqWrite + ?Sized,
71{
70acf637
WB
72 fn poll_seq_write(
73 self: Pin<&mut Self>,
74 cx: &mut Context,
75 buf: &[u8],
76 ) -> Poll<io::Result<usize>> {
77 unsafe {
78 self.map_unchecked_mut(|this| &mut **this)
79 .poll_seq_write(cx, buf)
80 }
81 }
82
05356884
WB
83 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
84 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
85 }
70acf637
WB
86}
87
fc9c8c9d 88/// awaitable verison of `poll_seq_write`.
ec4a53ed
WB
89async fn seq_write<T: SeqWrite + ?Sized>(
90 output: &mut T,
91 buf: &[u8],
92 position: &mut u64,
93) -> io::Result<usize> {
94 let put =
95 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await?;
96 *position += put as u64;
97 Ok(put)
fc9c8c9d 98}
70acf637 99
d995b531 100/// awaitable version of 'poll_flush'.
eae6dc06 101async fn flush<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<()> {
d995b531
DC
102 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_flush(cx) }).await
103}
104
fc9c8c9d 105/// Write the entire contents of a buffer, handling short writes.
ec4a53ed
WB
106async fn seq_write_all<T: SeqWrite + ?Sized>(
107 output: &mut T,
108 mut buf: &[u8],
109 position: &mut u64,
110) -> io::Result<()> {
fc9c8c9d 111 while !buf.is_empty() {
ec4a53ed 112 let got = seq_write(&mut *output, buf, &mut *position).await?;
fc9c8c9d 113 buf = &buf[got..];
70acf637 114 }
fc9c8c9d
WB
115 Ok(())
116}
70acf637 117
fc9c8c9d 118/// Write an endian-swappable struct.
ec4a53ed
WB
119async fn seq_write_struct<E: Endian, T>(
120 output: &mut T,
121 data: E,
122 position: &mut u64,
123) -> io::Result<()>
fc9c8c9d
WB
124where
125 T: SeqWrite + ?Sized,
126{
127 let data = data.to_le();
a08b84b7
WB
128 let buf =
129 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
8bff2f7c 130 seq_write_all(output, buf, position).await
fc9c8c9d 131}
70acf637 132
fc9c8c9d 133/// Write a pxar entry.
ec4a53ed
WB
134async fn seq_write_pxar_entry<T>(
135 output: &mut T,
136 htype: u64,
137 data: &[u8],
138 position: &mut u64,
139) -> io::Result<()>
fc9c8c9d
WB
140where
141 T: SeqWrite + ?Sized,
142{
ec0761f9
FG
143 let header = format::Header::with_content_size(htype, data.len() as u64);
144 header.check_header_size()?;
145
ec4a53ed
WB
146 seq_write_struct(&mut *output, header, &mut *position).await?;
147 seq_write_all(output, data, position).await
fc9c8c9d 148}
70acf637 149
fc9c8c9d
WB
150/// Write a pxar entry terminated by an additional zero which is not contained in the provided
151/// data buffer.
ec4a53ed
WB
152async fn seq_write_pxar_entry_zero<T>(
153 output: &mut T,
154 htype: u64,
155 data: &[u8],
156 position: &mut u64,
157) -> io::Result<()>
fc9c8c9d
WB
158where
159 T: SeqWrite + ?Sized,
160{
ec0761f9
FG
161 let header = format::Header::with_content_size(htype, 1 + data.len() as u64);
162 header.check_header_size()?;
163
ec4a53ed
WB
164 seq_write_struct(&mut *output, header, &mut *position).await?;
165 seq_write_all(&mut *output, data, &mut *position).await?;
166 seq_write_all(output, &[0u8], position).await
fc9c8c9d 167}
70acf637 168
fc9c8c9d 169/// Write a pxar entry consiting of an endian-swappable struct.
ec4a53ed
WB
170async fn seq_write_pxar_struct_entry<E, T>(
171 output: &mut T,
172 htype: u64,
173 data: E,
174 position: &mut u64,
175) -> io::Result<()>
fc9c8c9d
WB
176where
177 T: SeqWrite + ?Sized,
178 E: Endian,
179{
180 let data = data.to_le();
a08b84b7
WB
181 let buf =
182 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
8bff2f7c 183 seq_write_pxar_entry(output, htype, buf, position).await
70acf637
WB
184}
185
59e6f679
WB
/// Error conditions caused by wrong usage of this crate.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EncodeError {
    /// The user dropped a `File` without finishing writing all of its contents.
    ///
    /// This is an error because the payload length is written out at the beginning, and decoding
    /// requires exactly that amount of data to follow.
    IncompleteFile,

    /// The user dropped a directory without finalizing it.
    ///
    /// Finalizing is required to build the goodbye table at the end of a directory.
    IncompleteDirectory,
}
200
70acf637
WB
201#[derive(Default)]
202struct EncoderState {
203 /// Goodbye items for this directory, excluding the tail.
204 items: Vec<GoodbyeItem>,
205
59e6f679
WB
206 /// User caused error conditions.
207 encode_error: Option<EncodeError>,
70acf637
WB
208
209 /// Offset of this directory's ENTRY.
210 entry_offset: u64,
211
d4a04d53 212 #[allow(dead_code)]
70acf637
WB
213 /// Offset to this directory's first FILENAME.
214 files_offset: u64,
215
216 /// If this is a subdirectory, this points to the this directory's FILENAME.
217 file_offset: Option<u64>,
749855a4
WB
218
219 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
220 file_hash: u64,
ec4a53ed
WB
221
222 /// We need to keep track how much we have written to get offsets.
223 write_position: u64,
70acf637
WB
224}
225
59e6f679
WB
226impl EncoderState {
227 fn merge_error(&mut self, error: Option<EncodeError>) {
228 // one error is enough:
229 if self.encode_error.is_none() {
230 self.encode_error = error;
231 }
232 }
233
234 fn add_error(&mut self, error: EncodeError) {
235 self.merge_error(Some(error));
236 }
237}
238
d8402433
SR
/// The output an encoder writes to: either owned by the encoder or mutably borrowed.
pub(crate) enum EncoderOutput<'a, T> {
    /// The encoder owns the output.
    Owned(T),
    /// The encoder borrows the output.
    Borrowed(&'a mut T),
}

impl<'a, T> EncoderOutput<'a, T> {
    /// Create a `Borrowed` output referring to the same underlying `T`, for as long as `self`
    /// is mutably borrowed.
    #[inline]
    fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
    where
        'a: 's,
    {
        let inner: &'s mut T = self.as_mut();
        EncoderOutput::Borrowed(inner)
    }
}

impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
    fn as_mut(&mut self) -> &mut T {
        match *self {
            EncoderOutput::Owned(ref mut owned) => owned,
            EncoderOutput::Borrowed(ref mut borrowed) => &mut **borrowed,
        }
    }
}

impl<'a, T> std::convert::From<T> for EncoderOutput<'a, T> {
    fn from(owned: T) -> Self {
        Self::Owned(owned)
    }
}

impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
    fn from(borrowed: &'a mut T) -> Self {
        Self::Borrowed(borrowed)
    }
}
274
70acf637
WB
275/// The encoder state machine implementation for a directory.
276///
277/// We use `async fn` to implement the encoder state machine so that we can easily plug in both
278/// synchronous or `async` I/O objects in as output.
279pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
d8402433 280 output: EncoderOutput<'a, T>,
70acf637
WB
281 state: EncoderState,
282 parent: Option<&'a mut EncoderState>,
283 finished: bool,
79e3f621
WB
284
285 /// Since only the "current" entry can be actively writing files, we share the file copy
286 /// buffer.
a08b84b7 287 file_copy_buffer: Arc<Mutex<Vec<u8>>>,
70acf637
WB
288}
289
290impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
291 fn drop(&mut self) {
292 if let Some(ref mut parent) = self.parent {
293 // propagate errors:
59e6f679
WB
294 parent.merge_error(self.state.encode_error);
295 if !self.finished {
296 parent.add_error(EncodeError::IncompleteDirectory);
70acf637
WB
297 }
298 } else if !self.finished {
299 // FIXME: how do we deal with this?
2f56c76c 300 // eprintln!("Encoder dropped without finishing!");
70acf637
WB
301 }
302 }
303}
304
305impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
737f75cf
WB
306 pub async fn new(
307 output: EncoderOutput<'a, T>,
308 metadata: &Metadata,
309 ) -> io::Result<EncoderImpl<'a, T>> {
70acf637
WB
310 if !metadata.is_dir() {
311 io_bail!("directory metadata must contain the directory mode flag");
312 }
313 let mut this = Self {
d8402433 314 output,
70acf637
WB
315 state: EncoderState::default(),
316 parent: None,
317 finished: false,
a08b84b7 318 file_copy_buffer: Arc::new(Mutex::new(crate::util::vec_new(1024 * 1024))),
70acf637
WB
319 };
320
321 this.encode_metadata(metadata).await?;
ec4a53ed 322 this.state.files_offset = this.position();
70acf637
WB
323
324 Ok(this)
325 }
326
327 fn check(&self) -> io::Result<()> {
59e6f679
WB
328 match self.state.encode_error {
329 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
330 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
a7149b09 331 None => Ok(()),
70acf637
WB
332 }
333 }
334
335 pub async fn create_file<'b>(
336 &'b mut self,
337 metadata: &Metadata,
338 file_name: &Path,
339 file_size: u64,
d8402433 340 ) -> io::Result<FileImpl<'b, T>>
70acf637
WB
341 where
342 'a: 'b,
343 {
344 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
345 .await
346 }
347
348 async fn create_file_do<'b>(
349 &'b mut self,
350 metadata: &Metadata,
351 file_name: &[u8],
352 file_size: u64,
d8402433 353 ) -> io::Result<FileImpl<'b, T>>
70acf637
WB
354 where
355 'a: 'b,
356 {
357 self.check()?;
358
ec4a53ed 359 let file_offset = self.position();
2ab25a17 360 self.start_file_do(Some(metadata), file_name).await?;
70acf637 361
ec0761f9
FG
362 let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
363 header.check_header_size()?;
364
737f75cf 365 seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
70acf637 366
ec4a53ed 367 let payload_data_offset = self.position();
70acf637
WB
368
369 let meta_size = payload_data_offset - file_offset;
370
371 Ok(FileImpl {
d8402433 372 output: self.output.as_mut(),
70acf637
WB
373 goodbye_item: GoodbyeItem {
374 hash: format::hash_filename(file_name),
375 offset: file_offset,
376 size: file_size + meta_size,
377 },
378 remaining_size: file_size,
379 parent: &mut self.state,
380 })
381 }
382
d6748862 383 /// Return a file offset usable with `add_hardlink`.
70acf637
WB
384 pub async fn add_file(
385 &mut self,
386 metadata: &Metadata,
387 file_name: &Path,
388 file_size: u64,
389 content: &mut dyn SeqRead,
d6748862 390 ) -> io::Result<LinkOffset> {
a08b84b7 391 let buf = Arc::clone(&self.file_copy_buffer);
70acf637 392 let mut file = self.create_file(metadata, file_name, file_size).await?;
a08b84b7 393 let mut buf = buf.lock().expect("failed to lock temporary buffer mutex");
70acf637 394 loop {
79e3f621 395 let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
70acf637
WB
396 if got == 0 {
397 break;
398 } else {
399 file.write_all(&buf[..got]).await?;
400 }
401 }
d6748862 402 Ok(file.file_offset())
70acf637
WB
403 }
404
d6748862 405 /// Return a file offset usable with `add_hardlink`.
2fb73e7b
WB
406 pub async fn add_symlink(
407 &mut self,
408 metadata: &Metadata,
409 file_name: &Path,
410 target: &Path,
501ea220 411 ) -> io::Result<()> {
16f0464d
WB
412 let target = target.as_os_str().as_bytes();
413 let mut data = Vec::with_capacity(target.len() + 1);
414 data.extend(target);
415 data.push(0);
b0487d4f
WB
416 let _ofs: LinkOffset = self
417 .add_file_entry(
418 Some(metadata),
419 file_name,
16f0464d 420 Some((format::PXAR_SYMLINK, &data)),
b0487d4f
WB
421 )
422 .await?;
501ea220 423 Ok(())
0abc4121
WB
424 }
425
d6748862 426 /// Return a file offset usable with `add_hardlink`.
b0752929 427 pub async fn add_hardlink(
0abc4121 428 &mut self,
0abc4121
WB
429 file_name: &Path,
430 target: &Path,
63f2296f 431 target_offset: LinkOffset,
3fe26522 432 ) -> io::Result<()> {
ec4a53ed 433 let current_offset = self.position();
63f2296f
WB
434 if current_offset <= target_offset.0 {
435 io_bail!("invalid hardlink offset, can only point to prior files");
436 }
437
f3e50baa
WB
438 let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
439 let target_bytes = target.as_os_str().as_bytes();
16f0464d 440 let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len() + 1);
f3e50baa
WB
441 hardlink.extend(&offset_bytes);
442 hardlink.extend(target_bytes);
16f0464d 443 hardlink.push(0);
b0487d4f 444 let _this_offset: LinkOffset = self
f3e50baa 445 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
b0487d4f 446 .await?;
d6748862 447 Ok(())
3fe26522
WB
448 }
449
d6748862 450 /// Return a file offset usable with `add_hardlink`.
3fe26522
WB
451 pub async fn add_device(
452 &mut self,
453 metadata: &Metadata,
454 file_name: &Path,
455 device: format::Device,
501ea220 456 ) -> io::Result<()> {
a7149b09
WB
457 if !metadata.is_device() {
458 io_bail!("entry added via add_device must have a device mode in its metadata");
459 }
460
3fe26522
WB
461 let device = device.to_le();
462 let device = unsafe {
463 std::slice::from_raw_parts(
464 &device as *const format::Device as *const u8,
465 size_of::<format::Device>(),
466 )
467 };
b0487d4f
WB
468 let _ofs: LinkOffset = self
469 .add_file_entry(
470 Some(metadata),
471 file_name,
472 Some((format::PXAR_DEVICE, device)),
473 )
474 .await?;
501ea220 475 Ok(())
3fe26522
WB
476 }
477
d6748862 478 /// Return a file offset usable with `add_hardlink`.
501ea220 479 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
a7149b09
WB
480 if !metadata.is_fifo() {
481 io_bail!("entry added via add_device must be of type fifo in its metadata");
482 }
483
501ea220
WB
484 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
485 Ok(())
a7149b09
WB
486 }
487
d6748862 488 /// Return a file offset usable with `add_hardlink`.
501ea220 489 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
a7149b09
WB
490 if !metadata.is_socket() {
491 io_bail!("entry added via add_device must be of type socket in its metadata");
492 }
493
501ea220
WB
494 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
495 Ok(())
a7149b09
WB
496 }
497
d6748862 498 /// Return a file offset usable with `add_hardlink`.
3fe26522
WB
499 async fn add_file_entry(
500 &mut self,
2ab25a17 501 metadata: Option<&Metadata>,
3fe26522 502 file_name: &Path,
a7149b09 503 entry_htype_data: Option<(u64, &[u8])>,
d6748862 504 ) -> io::Result<LinkOffset> {
05356884 505 self.check()?;
2fb73e7b 506
ec4a53ed 507 let file_offset = self.position();
2fb73e7b
WB
508
509 let file_name = file_name.as_os_str().as_bytes();
2fb73e7b
WB
510
511 self.start_file_do(metadata, file_name).await?;
a7149b09 512 if let Some((htype, entry_data)) = entry_htype_data {
ec4a53ed 513 seq_write_pxar_entry(
d8402433 514 self.output.as_mut(),
ec4a53ed
WB
515 htype,
516 entry_data,
517 &mut self.state.write_position,
518 )
519 .await?;
a7149b09 520 }
2fb73e7b 521
ec4a53ed 522 let end_offset = self.position();
2fb73e7b
WB
523
524 self.state.items.push(GoodbyeItem {
525 hash: format::hash_filename(file_name),
526 offset: file_offset,
527 size: end_offset - file_offset,
528 });
529
d6748862 530 Ok(LinkOffset(file_offset))
2fb73e7b
WB
531 }
532
70acf637 533 #[inline]
ec4a53ed
WB
534 fn position(&mut self) -> u64 {
535 self.state.write_position
70acf637
WB
536 }
537
d8402433
SR
538 pub async fn create_directory(
539 &mut self,
70acf637
WB
540 file_name: &Path,
541 metadata: &Metadata,
d8402433 542 ) -> io::Result<EncoderImpl<'_, T>> {
70acf637
WB
543 self.check()?;
544
545 if !metadata.is_dir() {
546 io_bail!("directory metadata must contain the directory mode flag");
547 }
548
549 let file_name = file_name.as_os_str().as_bytes();
749855a4 550 let file_hash = format::hash_filename(file_name);
70acf637 551
ec4a53ed 552 let file_offset = self.position();
70acf637
WB
553 self.encode_filename(file_name).await?;
554
ec4a53ed 555 let entry_offset = self.position();
70acf637
WB
556 self.encode_metadata(&metadata).await?;
557
ec4a53ed
WB
558 let files_offset = self.position();
559
560 // the child will write to OUR state now:
561 let write_position = self.position();
70acf637 562
d8402433
SR
563 let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
564
70acf637 565 Ok(EncoderImpl {
d8402433 566 // always forward as Borrowed(), to avoid stacking references on nested calls
d4a04d53 567 output: self.output.to_borrowed_mut(),
70acf637
WB
568 state: EncoderState {
569 entry_offset,
570 files_offset,
571 file_offset: Some(file_offset),
1b25fc08 572 file_hash,
ec4a53ed 573 write_position,
70acf637
WB
574 ..Default::default()
575 },
576 parent: Some(&mut self.state),
577 finished: false,
d8402433 578 file_copy_buffer,
70acf637
WB
579 })
580 }
581
2ab25a17
WB
582 async fn start_file_do(
583 &mut self,
584 metadata: Option<&Metadata>,
585 file_name: &[u8],
586 ) -> io::Result<()> {
70acf637 587 self.encode_filename(file_name).await?;
2ab25a17
WB
588 if let Some(metadata) = metadata {
589 self.encode_metadata(&metadata).await?;
590 }
70acf637
WB
591 Ok(())
592 }
593
594 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
ec4a53ed 595 seq_write_pxar_struct_entry(
d8402433 596 self.output.as_mut(),
ec4a53ed
WB
597 format::PXAR_ENTRY,
598 metadata.stat.clone(),
599 &mut self.state.write_position,
600 )
601 .await?;
b5a471a3
WB
602
603 for xattr in &metadata.xattrs {
604 self.write_xattr(xattr).await?;
605 }
606
607 self.write_acls(&metadata.acl).await?;
608
609 if let Some(fcaps) = &metadata.fcaps {
610 self.write_file_capabilities(fcaps).await?;
611 }
612
613 if let Some(qpid) = &metadata.quota_project_id {
614 self.write_quota_project_id(qpid).await?;
615 }
616
617 Ok(())
618 }
619
620 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
ec4a53ed 621 seq_write_pxar_entry(
d8402433 622 self.output.as_mut(),
ec4a53ed
WB
623 format::PXAR_XATTR,
624 &xattr.data,
625 &mut self.state.write_position,
626 )
627 .await
b5a471a3
WB
628 }
629
630 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
631 for acl in &acl.users {
ec4a53ed 632 seq_write_pxar_struct_entry(
d8402433 633 self.output.as_mut(),
ec4a53ed
WB
634 format::PXAR_ACL_USER,
635 acl.clone(),
636 &mut self.state.write_position,
637 )
638 .await?;
b5a471a3
WB
639 }
640
641 for acl in &acl.groups {
ec4a53ed 642 seq_write_pxar_struct_entry(
d8402433 643 self.output.as_mut(),
ec4a53ed
WB
644 format::PXAR_ACL_GROUP,
645 acl.clone(),
646 &mut self.state.write_position,
647 )
648 .await?;
b5a471a3
WB
649 }
650
651 if let Some(acl) = &acl.group_obj {
ec4a53ed 652 seq_write_pxar_struct_entry(
d8402433 653 self.output.as_mut(),
ec4a53ed
WB
654 format::PXAR_ACL_GROUP_OBJ,
655 acl.clone(),
656 &mut self.state.write_position,
657 )
658 .await?;
b5a471a3
WB
659 }
660
661 if let Some(acl) = &acl.default {
ec4a53ed 662 seq_write_pxar_struct_entry(
d8402433 663 self.output.as_mut(),
ec4a53ed
WB
664 format::PXAR_ACL_DEFAULT,
665 acl.clone(),
666 &mut self.state.write_position,
667 )
668 .await?;
b5a471a3
WB
669 }
670
671 for acl in &acl.default_users {
fc9c8c9d 672 seq_write_pxar_struct_entry(
d8402433 673 self.output.as_mut(),
fc9c8c9d
WB
674 format::PXAR_ACL_DEFAULT_USER,
675 acl.clone(),
ec4a53ed 676 &mut self.state.write_position,
fc9c8c9d
WB
677 )
678 .await?;
b5a471a3
WB
679 }
680
681 for acl in &acl.default_groups {
fc9c8c9d 682 seq_write_pxar_struct_entry(
d8402433 683 self.output.as_mut(),
fc9c8c9d
WB
684 format::PXAR_ACL_DEFAULT_GROUP,
685 acl.clone(),
ec4a53ed 686 &mut self.state.write_position,
fc9c8c9d
WB
687 )
688 .await?;
b5a471a3
WB
689 }
690
70acf637
WB
691 Ok(())
692 }
693
b5a471a3 694 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
ec4a53ed 695 seq_write_pxar_entry(
d8402433 696 self.output.as_mut(),
ec4a53ed
WB
697 format::PXAR_FCAPS,
698 &fcaps.data,
699 &mut self.state.write_position,
700 )
701 .await
b5a471a3
WB
702 }
703
704 async fn write_quota_project_id(
705 &mut self,
706 quota_project_id: &format::QuotaProjectId,
707 ) -> io::Result<()> {
fc9c8c9d 708 seq_write_pxar_struct_entry(
d8402433 709 self.output.as_mut(),
fc9c8c9d 710 format::PXAR_QUOTA_PROJID,
1b25fc08 711 *quota_project_id,
ec4a53ed 712 &mut self.state.write_position,
fc9c8c9d
WB
713 )
714 .await
b5a471a3
WB
715 }
716
70acf637 717 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
f3ac1c51 718 crate::util::validate_filename(file_name)?;
ec4a53ed 719 seq_write_pxar_entry_zero(
d8402433 720 self.output.as_mut(),
ec4a53ed
WB
721 format::PXAR_FILENAME,
722 file_name,
723 &mut self.state.write_position,
724 )
725 .await
70acf637
WB
726 }
727
d8402433 728 pub async fn finish(mut self) -> io::Result<()> {
70acf637 729 let tail_bytes = self.finish_goodbye_table().await?;
ec4a53ed 730 seq_write_pxar_entry(
d8402433 731 self.output.as_mut(),
ec4a53ed
WB
732 format::PXAR_GOODBYE,
733 &tail_bytes,
734 &mut self.state.write_position,
735 )
736 .await?;
737
e827b69a
WB
738 if let EncoderOutput::Owned(output) = &mut self.output {
739 flush(output).await?;
740 }
d995b531 741
ec4a53ed
WB
742 // done up here because of the self-borrow and to propagate
743 let end_offset = self.position();
744
749855a4 745 if let Some(parent) = &mut self.parent {
ec4a53ed
WB
746 parent.write_position = end_offset;
747
749855a4
WB
748 let file_offset = self
749 .state
750 .file_offset
751 .expect("internal error: parent set but no file_offset?");
752
749855a4
WB
753 parent.items.push(GoodbyeItem {
754 hash: self.state.file_hash,
755 offset: file_offset,
756 size: end_offset - file_offset,
757 });
758 }
70acf637 759 self.finished = true;
d8402433 760 Ok(())
70acf637
WB
761 }
762
763 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
ec4a53ed 764 let goodbye_offset = self.position();
70acf637
WB
765
766 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
767 let mut tail = take(&mut self.state.items);
768 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
769 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
770
749855a4
WB
771 // sort, then create a BST
772 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
70acf637 773
749855a4
WB
774 let mut bst = Vec::with_capacity(tail.len() + 1);
775 unsafe {
776 bst.set_len(tail.len());
70acf637 777 }
749855a4
WB
778 binary_tree_array::copy(tail.len(), |src, dest| {
779 let mut item = tail[src].clone();
780 // fixup the goodbye table offsets to be relative and with the right endianess
781 item.offset = goodbye_offset - item.offset;
782 unsafe {
783 std::ptr::write(&mut bst[dest], item.to_le());
784 }
785 });
786 drop(tail);
787
788 bst.push(
789 GoodbyeItem {
790 hash: format::PXAR_GOODBYE_TAIL_MARKER,
791 offset: goodbye_offset - self.state.entry_offset,
792 size: goodbye_size,
793 }
794 .to_le(),
795 );
70acf637
WB
796
797 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
798 // the items make sense:
749855a4
WB
799 let data = bst.as_mut_ptr() as *mut u8;
800 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
801 forget(bst);
70acf637
WB
802 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
803 }
804}
805
806/// Writer for a file object in a directory.
e5a2495e 807pub(crate) struct FileImpl<'a, S: SeqWrite> {
d8402433 808 output: &'a mut S,
70acf637
WB
809
810 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
811 /// directly instead of on Drop of FileImpl?
812 goodbye_item: GoodbyeItem,
813
814 /// While writing data to this file, this is how much space we still have left, this must reach
815 /// exactly zero.
816 remaining_size: u64,
817
59e6f679 818 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
70acf637
WB
819 /// to, and where we insert our `GoodbyeItem`.
820 parent: &'a mut EncoderState,
821}
822
d8402433 823impl<'a, S: SeqWrite> Drop for FileImpl<'a, S> {
70acf637
WB
824 fn drop(&mut self) {
825 if self.remaining_size != 0 {
59e6f679 826 self.parent.add_error(EncodeError::IncompleteFile);
70acf637
WB
827 }
828
829 self.parent.items.push(self.goodbye_item.clone());
830 }
831}
832
d8402433 833impl<'a, S: SeqWrite> FileImpl<'a, S> {
d6748862
WB
834 /// Get the file offset to be able to reference it with `add_hardlink`.
835 pub fn file_offset(&self) -> LinkOffset {
836 LinkOffset(self.goodbye_item.offset)
837 }
838
70acf637
WB
839 fn check_remaining(&self, size: usize) -> io::Result<()> {
840 if size as u64 > self.remaining_size {
841 io_bail!("attempted to write more than previously allocated");
842 } else {
843 Ok(())
844 }
845 }
846
/// Poll write interface to more easily connect to tokio/futures.
///
/// On a successful write, the remaining payload size and the parent's write position are
/// updated by the number of bytes written.
#[cfg(feature = "tokio-io")]
pub fn poll_write(
    self: Pin<&mut Self>,
    cx: &mut Context,
    data: &[u8],
) -> Poll<io::Result<usize>> {
    let this = self.get_mut();
    this.check_remaining(data.len())?;

    // NOTE(review): the output is only reborrowed and never moved out of; this relies on it
    // being treated as pinned while it is polled.
    let output = unsafe { Pin::new_unchecked(&mut *this.output) };
    let result = output.poll_seq_write(cx, data);

    if let Poll::Ready(Ok(put)) = &result {
        let put = *put as u64;
        this.remaining_size -= put;
        this.parent.write_position += put;
    }

    result
}
866
/// Poll flush interface to more easily connect to tokio/futures.
#[cfg(feature = "tokio-io")]
pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
    // NOTE(review): the projection only reborrows the output and never moves it.
    let output = unsafe { self.map_unchecked_mut(|this| &mut *this.output) };
    output.poll_flush(cx)
}
872
/// Poll close/shutdown interface to more easily connect to tokio/futures.
///
/// This only flushes the output, since we're just a virtual writer writing to the file
/// provided by our encoder.
#[cfg(feature = "tokio-io")]
pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
    // NOTE(review): the projection only reborrows the output and never moves it.
    let output = unsafe { self.map_unchecked_mut(|this| &mut *this.output) };
    output.poll_flush(cx)
}
881
70acf637
WB
882 /// Write file data for the current file entry in a pxar archive.
883 ///
884 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
885 /// requested. Check the return value for how many. There's also a `write_all` method available
886 /// for convenience.
887 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
888 self.check_remaining(data.len())?;
ec4a53ed
WB
889 let put =
890 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
891 .await?;
84a1926f 892 //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
70acf637 893 self.remaining_size -= put as u64;
ec4a53ed 894 self.parent.write_position += put as u64;
70acf637
WB
895 Ok(put)
896 }
897
898 /// Completely write file data for the current file entry in a pxar archive.
899 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
900 self.check_remaining(data.len())?;
d8402433 901 seq_write_all(self.output, data, &mut self.parent.write_position).await?;
70acf637
WB
902 self.remaining_size -= data.len() as u64;
903 Ok(())
904 }
905}
05356884
WB
906
/// `tokio` `AsyncWrite` support, forwarding to the inherent poll methods of `FileImpl`.
#[cfg(feature = "tokio-io")]
impl<'a, S: SeqWrite> tokio::io::AsyncWrite for FileImpl<'a, S> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
        <FileImpl<'a, S>>::poll_write(self, cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        <FileImpl<'a, S>>::poll_flush(self, cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        <FileImpl<'a, S>>::poll_close(self, cx)
    }
}