]> git.proxmox.com Git - pxar.git/blame - src/encoder/mod.rs
clippy fixup
[pxar.git] / src / encoder / mod.rs
CommitLineData
70acf637
WB
1//! The `pxar` encoder state machine.
2//!
3//! This is the implementation used by both the synchronous and async pxar wrappers.
4
e5a2495e
WB
5#![deny(missing_docs)]
6
70acf637
WB
7use std::io;
8use std::mem::{forget, size_of, size_of_val, take};
9use std::os::unix::ffi::OsStrExt;
10use std::path::Path;
11use std::pin::Pin;
a08b84b7 12use std::sync::{Arc, Mutex};
70acf637
WB
13use std::task::{Context, Poll};
14
15use endian_trait::Endian;
16
749855a4 17use crate::binary_tree_array;
951620f1 18use crate::decoder::{self, SeqRead};
70acf637
WB
19use crate::format::{self, GoodbyeItem};
20use crate::poll_fn::poll_fn;
21use crate::Metadata;
22
54109840 23pub mod aio;
70acf637
WB
24pub mod sync;
25
26#[doc(inline)]
27pub use sync::Encoder;
28
d6748862
WB
/// File reference used to create hard links.
///
/// Wraps the byte offset at which a previously written entry starts.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct LinkOffset(u64);

impl LinkOffset {
    /// Get the raw byte offset of this link.
    #[inline]
    pub fn raw(self) -> u64 {
        let LinkOffset(offset) = self;
        offset
    }
}
40
70acf637
WB
/// Sequential write interface used by the encoder's state machine.
///
/// This is the crate-internal writer abstraction. The synchronous wrapper provides it for
/// `std::io::Write` types, and the asynchronous wrapper for both `tokio` and `futures`
/// `AsyncWrite` types.
pub trait SeqWrite {
    /// Attempt a sequential write of `buf` to the underlying output.
    ///
    /// On success the number of bytes actually written is returned as
    /// `Poll::Ready(Ok(bytes))`; this may be fewer than `buf.len()`.
    ///
    /// When the output cannot accept data yet, `Poll::Pending` is returned and the current
    /// task is woken through `cx.waker()` once writing becomes possible.
    fn poll_seq_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Attempt to flush the output so that all buffered data reaches its destination.
    ///
    /// Returns `Poll::Ready(Ok(()))` on success. When flushing cannot finish immediately,
    /// `Poll::Pending` is returned and the current task is woken through `cx.waker()` once
    /// progress can be made.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}
66
67/// Allow using trait objects for generics taking a `SeqWrite`.
d8402433
SR
68impl<S> SeqWrite for &mut S
69where
70 S: SeqWrite + ?Sized,
71{
70acf637
WB
72 fn poll_seq_write(
73 self: Pin<&mut Self>,
74 cx: &mut Context,
75 buf: &[u8],
76 ) -> Poll<io::Result<usize>> {
77 unsafe {
78 self.map_unchecked_mut(|this| &mut **this)
79 .poll_seq_write(cx, buf)
80 }
81 }
82
05356884
WB
83 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
84 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
85 }
70acf637
WB
86}
87
fc9c8c9d 88/// awaitable verison of `poll_seq_write`.
ec4a53ed
WB
89async fn seq_write<T: SeqWrite + ?Sized>(
90 output: &mut T,
91 buf: &[u8],
92 position: &mut u64,
93) -> io::Result<usize> {
94 let put =
95 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await?;
96 *position += put as u64;
97 Ok(put)
fc9c8c9d 98}
70acf637 99
d995b531 100/// awaitable version of 'poll_flush'.
eae6dc06 101async fn flush<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<()> {
d995b531
DC
102 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_flush(cx) }).await
103}
104
fc9c8c9d 105/// Write the entire contents of a buffer, handling short writes.
ec4a53ed
WB
106async fn seq_write_all<T: SeqWrite + ?Sized>(
107 output: &mut T,
108 mut buf: &[u8],
109 position: &mut u64,
110) -> io::Result<()> {
fc9c8c9d 111 while !buf.is_empty() {
ec4a53ed 112 let got = seq_write(&mut *output, buf, &mut *position).await?;
fc9c8c9d 113 buf = &buf[got..];
70acf637 114 }
fc9c8c9d
WB
115 Ok(())
116}
70acf637 117
fc9c8c9d 118/// Write an endian-swappable struct.
ec4a53ed
WB
119async fn seq_write_struct<E: Endian, T>(
120 output: &mut T,
121 data: E,
122 position: &mut u64,
123) -> io::Result<()>
fc9c8c9d
WB
124where
125 T: SeqWrite + ?Sized,
126{
127 let data = data.to_le();
a08b84b7
WB
128 let buf =
129 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
8bff2f7c 130 seq_write_all(output, buf, position).await
fc9c8c9d 131}
70acf637 132
fc9c8c9d 133/// Write a pxar entry.
ec4a53ed
WB
134async fn seq_write_pxar_entry<T>(
135 output: &mut T,
136 htype: u64,
137 data: &[u8],
138 position: &mut u64,
139) -> io::Result<()>
fc9c8c9d
WB
140where
141 T: SeqWrite + ?Sized,
142{
ec0761f9
FG
143 let header = format::Header::with_content_size(htype, data.len() as u64);
144 header.check_header_size()?;
145
ec4a53ed
WB
146 seq_write_struct(&mut *output, header, &mut *position).await?;
147 seq_write_all(output, data, position).await
fc9c8c9d 148}
70acf637 149
fc9c8c9d
WB
150/// Write a pxar entry terminated by an additional zero which is not contained in the provided
151/// data buffer.
ec4a53ed
WB
152async fn seq_write_pxar_entry_zero<T>(
153 output: &mut T,
154 htype: u64,
155 data: &[u8],
156 position: &mut u64,
157) -> io::Result<()>
fc9c8c9d
WB
158where
159 T: SeqWrite + ?Sized,
160{
ec0761f9
FG
161 let header = format::Header::with_content_size(htype, 1 + data.len() as u64);
162 header.check_header_size()?;
163
ec4a53ed
WB
164 seq_write_struct(&mut *output, header, &mut *position).await?;
165 seq_write_all(&mut *output, data, &mut *position).await?;
166 seq_write_all(output, &[0u8], position).await
fc9c8c9d 167}
70acf637 168
fc9c8c9d 169/// Write a pxar entry consiting of an endian-swappable struct.
ec4a53ed
WB
170async fn seq_write_pxar_struct_entry<E, T>(
171 output: &mut T,
172 htype: u64,
173 data: E,
174 position: &mut u64,
175) -> io::Result<()>
fc9c8c9d
WB
176where
177 T: SeqWrite + ?Sized,
178 E: Endian,
179{
180 let data = data.to_le();
a08b84b7
WB
181 let buf =
182 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
8bff2f7c 183 seq_write_pxar_entry(output, htype, buf, position).await
70acf637
WB
184}
185
59e6f679
WB
/// Error conditions caused by wrong usage of this crate.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EncodeError {
    /// The user dropped a `File` without finishing writing all of its contents.
    ///
    /// This is an error because the payload length is written out at the beginning, and
    /// decoding requires exactly that amount of data to follow.
    IncompleteFile,

    /// The user dropped a directory without finalizing it.
    ///
    /// Finalizing is required to build the goodbye table at the end of a directory.
    IncompleteDirectory,
}
200
70acf637
WB
201#[derive(Default)]
202struct EncoderState {
203 /// Goodbye items for this directory, excluding the tail.
204 items: Vec<GoodbyeItem>,
205
59e6f679
WB
206 /// User caused error conditions.
207 encode_error: Option<EncodeError>,
70acf637
WB
208
209 /// Offset of this directory's ENTRY.
210 entry_offset: u64,
211
d4a04d53 212 #[allow(dead_code)]
70acf637
WB
213 /// Offset to this directory's first FILENAME.
214 files_offset: u64,
215
216 /// If this is a subdirectory, this points to the this directory's FILENAME.
217 file_offset: Option<u64>,
749855a4
WB
218
219 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
220 file_hash: u64,
ec4a53ed
WB
221
222 /// We need to keep track how much we have written to get offsets.
223 write_position: u64,
70acf637
WB
224}
225
59e6f679
WB
226impl EncoderState {
227 fn merge_error(&mut self, error: Option<EncodeError>) {
228 // one error is enough:
229 if self.encode_error.is_none() {
230 self.encode_error = error;
231 }
232 }
233
234 fn add_error(&mut self, error: EncodeError) {
235 self.merge_error(Some(error));
236 }
237}
238
d8402433
SR
/// Destination of an encoder: either owned by it, or borrowed from elsewhere.
pub(crate) enum EncoderOutput<'a, T> {
    /// The encoder owns the output.
    Owned(T),
    /// The output is borrowed, e.g. from the encoder of an enclosing directory.
    Borrowed(&'a mut T),
}
243
b8d79e5f
WB
244impl<'a, T> EncoderOutput<'a, T> {
245 #[inline]
d4a04d53 246 fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
b8d79e5f
WB
247 where
248 'a: 's,
249 {
250 EncoderOutput::Borrowed(self.as_mut())
251 }
252}
253
d8402433
SR
254impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
255 fn as_mut(&mut self) -> &mut T {
256 match self {
b8d79e5f 257 EncoderOutput::Owned(o) => o,
d8402433
SR
258 EncoderOutput::Borrowed(b) => b,
259 }
260 }
261}
262
263impl<'a, T> std::convert::From<T> for EncoderOutput<'a, T> {
264 fn from(t: T) -> Self {
265 EncoderOutput::Owned(t)
266 }
267}
268
269impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
270 fn from(t: &'a mut T) -> Self {
271 EncoderOutput::Borrowed(t)
272 }
273}
274
70acf637
WB
275/// The encoder state machine implementation for a directory.
276///
277/// We use `async fn` to implement the encoder state machine so that we can easily plug in both
278/// synchronous or `async` I/O objects in as output.
279pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
d8402433 280 output: EncoderOutput<'a, T>,
70acf637
WB
281 state: EncoderState,
282 parent: Option<&'a mut EncoderState>,
283 finished: bool,
79e3f621
WB
284
285 /// Since only the "current" entry can be actively writing files, we share the file copy
286 /// buffer.
a08b84b7 287 file_copy_buffer: Arc<Mutex<Vec<u8>>>,
70acf637
WB
288}
289
290impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
291 fn drop(&mut self) {
292 if let Some(ref mut parent) = self.parent {
293 // propagate errors:
59e6f679
WB
294 parent.merge_error(self.state.encode_error);
295 if !self.finished {
296 parent.add_error(EncodeError::IncompleteDirectory);
70acf637
WB
297 }
298 } else if !self.finished {
299 // FIXME: how do we deal with this?
2f56c76c 300 // eprintln!("Encoder dropped without finishing!");
70acf637
WB
301 }
302 }
303}
304
305impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
737f75cf
WB
306 pub async fn new(
307 output: EncoderOutput<'a, T>,
308 metadata: &Metadata,
309 ) -> io::Result<EncoderImpl<'a, T>> {
70acf637
WB
310 if !metadata.is_dir() {
311 io_bail!("directory metadata must contain the directory mode flag");
312 }
313 let mut this = Self {
d8402433 314 output,
70acf637
WB
315 state: EncoderState::default(),
316 parent: None,
317 finished: false,
81d50029
WB
318 file_copy_buffer: Arc::new(Mutex::new(unsafe {
319 crate::util::vec_new_uninitialized(1024 * 1024)
320 })),
70acf637
WB
321 };
322
323 this.encode_metadata(metadata).await?;
ec4a53ed 324 this.state.files_offset = this.position();
70acf637
WB
325
326 Ok(this)
327 }
328
329 fn check(&self) -> io::Result<()> {
59e6f679
WB
330 match self.state.encode_error {
331 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
332 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
a7149b09 333 None => Ok(()),
70acf637
WB
334 }
335 }
336
337 pub async fn create_file<'b>(
338 &'b mut self,
339 metadata: &Metadata,
340 file_name: &Path,
341 file_size: u64,
d8402433 342 ) -> io::Result<FileImpl<'b, T>>
70acf637
WB
343 where
344 'a: 'b,
345 {
346 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
347 .await
348 }
349
350 async fn create_file_do<'b>(
351 &'b mut self,
352 metadata: &Metadata,
353 file_name: &[u8],
354 file_size: u64,
d8402433 355 ) -> io::Result<FileImpl<'b, T>>
70acf637
WB
356 where
357 'a: 'b,
358 {
359 self.check()?;
360
ec4a53ed 361 let file_offset = self.position();
2ab25a17 362 self.start_file_do(Some(metadata), file_name).await?;
70acf637 363
ec0761f9
FG
364 let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
365 header.check_header_size()?;
366
737f75cf 367 seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
70acf637 368
ec4a53ed 369 let payload_data_offset = self.position();
70acf637
WB
370
371 let meta_size = payload_data_offset - file_offset;
372
373 Ok(FileImpl {
d8402433 374 output: self.output.as_mut(),
70acf637
WB
375 goodbye_item: GoodbyeItem {
376 hash: format::hash_filename(file_name),
377 offset: file_offset,
378 size: file_size + meta_size,
379 },
380 remaining_size: file_size,
381 parent: &mut self.state,
382 })
383 }
384
ddc08ebb
WB
385 fn take_file_copy_buffer(&self) -> Vec<u8> {
386 let buf: Vec<_> = take(
387 &mut self
388 .file_copy_buffer
389 .lock()
390 .expect("failed to lock temporary buffer mutex"),
391 );
392 if buf.len() < 1024 * 1024 {
393 drop(buf);
394 unsafe { crate::util::vec_new_uninitialized(1024 * 1024) }
395 } else {
396 buf
397 }
398 }
399
400 fn put_file_copy_buffer(&self, buf: Vec<u8>) {
401 let mut lock = self
402 .file_copy_buffer
403 .lock()
404 .expect("failed to lock temporary buffer mutex");
405 if lock.len() < buf.len() {
406 *lock = buf;
407 }
408 }
409
d6748862 410 /// Return a file offset usable with `add_hardlink`.
70acf637
WB
411 pub async fn add_file(
412 &mut self,
413 metadata: &Metadata,
414 file_name: &Path,
415 file_size: u64,
416 content: &mut dyn SeqRead,
d6748862 417 ) -> io::Result<LinkOffset> {
ddc08ebb 418 let mut buf = self.take_file_copy_buffer();
70acf637 419 let mut file = self.create_file(metadata, file_name, file_size).await?;
70acf637 420 loop {
79e3f621 421 let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
70acf637
WB
422 if got == 0 {
423 break;
424 } else {
425 file.write_all(&buf[..got]).await?;
426 }
427 }
ddc08ebb
WB
428 let offset = file.file_offset();
429 drop(file);
430 self.put_file_copy_buffer(buf);
431 Ok(offset)
70acf637
WB
432 }
433
d6748862 434 /// Return a file offset usable with `add_hardlink`.
2fb73e7b
WB
435 pub async fn add_symlink(
436 &mut self,
437 metadata: &Metadata,
438 file_name: &Path,
439 target: &Path,
501ea220 440 ) -> io::Result<()> {
16f0464d
WB
441 let target = target.as_os_str().as_bytes();
442 let mut data = Vec::with_capacity(target.len() + 1);
443 data.extend(target);
444 data.push(0);
b0487d4f
WB
445 let _ofs: LinkOffset = self
446 .add_file_entry(
447 Some(metadata),
448 file_name,
16f0464d 449 Some((format::PXAR_SYMLINK, &data)),
b0487d4f
WB
450 )
451 .await?;
501ea220 452 Ok(())
0abc4121
WB
453 }
454
d6748862 455 /// Return a file offset usable with `add_hardlink`.
b0752929 456 pub async fn add_hardlink(
0abc4121 457 &mut self,
0abc4121
WB
458 file_name: &Path,
459 target: &Path,
63f2296f 460 target_offset: LinkOffset,
3fe26522 461 ) -> io::Result<()> {
ec4a53ed 462 let current_offset = self.position();
63f2296f
WB
463 if current_offset <= target_offset.0 {
464 io_bail!("invalid hardlink offset, can only point to prior files");
465 }
466
f3e50baa
WB
467 let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
468 let target_bytes = target.as_os_str().as_bytes();
16f0464d 469 let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len() + 1);
165dcc33 470 hardlink.extend(offset_bytes);
f3e50baa 471 hardlink.extend(target_bytes);
16f0464d 472 hardlink.push(0);
b0487d4f 473 let _this_offset: LinkOffset = self
f3e50baa 474 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
b0487d4f 475 .await?;
d6748862 476 Ok(())
3fe26522
WB
477 }
478
d6748862 479 /// Return a file offset usable with `add_hardlink`.
3fe26522
WB
480 pub async fn add_device(
481 &mut self,
482 metadata: &Metadata,
483 file_name: &Path,
484 device: format::Device,
501ea220 485 ) -> io::Result<()> {
a7149b09
WB
486 if !metadata.is_device() {
487 io_bail!("entry added via add_device must have a device mode in its metadata");
488 }
489
3fe26522
WB
490 let device = device.to_le();
491 let device = unsafe {
492 std::slice::from_raw_parts(
493 &device as *const format::Device as *const u8,
494 size_of::<format::Device>(),
495 )
496 };
b0487d4f
WB
497 let _ofs: LinkOffset = self
498 .add_file_entry(
499 Some(metadata),
500 file_name,
501 Some((format::PXAR_DEVICE, device)),
502 )
503 .await?;
501ea220 504 Ok(())
3fe26522
WB
505 }
506
d6748862 507 /// Return a file offset usable with `add_hardlink`.
501ea220 508 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
a7149b09
WB
509 if !metadata.is_fifo() {
510 io_bail!("entry added via add_device must be of type fifo in its metadata");
511 }
512
501ea220
WB
513 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
514 Ok(())
a7149b09
WB
515 }
516
d6748862 517 /// Return a file offset usable with `add_hardlink`.
501ea220 518 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
a7149b09
WB
519 if !metadata.is_socket() {
520 io_bail!("entry added via add_device must be of type socket in its metadata");
521 }
522
501ea220
WB
523 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
524 Ok(())
a7149b09
WB
525 }
526
d6748862 527 /// Return a file offset usable with `add_hardlink`.
3fe26522
WB
528 async fn add_file_entry(
529 &mut self,
2ab25a17 530 metadata: Option<&Metadata>,
3fe26522 531 file_name: &Path,
a7149b09 532 entry_htype_data: Option<(u64, &[u8])>,
d6748862 533 ) -> io::Result<LinkOffset> {
05356884 534 self.check()?;
2fb73e7b 535
ec4a53ed 536 let file_offset = self.position();
2fb73e7b
WB
537
538 let file_name = file_name.as_os_str().as_bytes();
2fb73e7b
WB
539
540 self.start_file_do(metadata, file_name).await?;
a7149b09 541 if let Some((htype, entry_data)) = entry_htype_data {
ec4a53ed 542 seq_write_pxar_entry(
d8402433 543 self.output.as_mut(),
ec4a53ed
WB
544 htype,
545 entry_data,
546 &mut self.state.write_position,
547 )
548 .await?;
a7149b09 549 }
2fb73e7b 550
ec4a53ed 551 let end_offset = self.position();
2fb73e7b
WB
552
553 self.state.items.push(GoodbyeItem {
554 hash: format::hash_filename(file_name),
555 offset: file_offset,
556 size: end_offset - file_offset,
557 });
558
d6748862 559 Ok(LinkOffset(file_offset))
2fb73e7b
WB
560 }
561
70acf637 562 #[inline]
ec4a53ed
WB
563 fn position(&mut self) -> u64 {
564 self.state.write_position
70acf637
WB
565 }
566
d8402433
SR
567 pub async fn create_directory(
568 &mut self,
70acf637
WB
569 file_name: &Path,
570 metadata: &Metadata,
d8402433 571 ) -> io::Result<EncoderImpl<'_, T>> {
70acf637
WB
572 self.check()?;
573
574 if !metadata.is_dir() {
575 io_bail!("directory metadata must contain the directory mode flag");
576 }
577
578 let file_name = file_name.as_os_str().as_bytes();
749855a4 579 let file_hash = format::hash_filename(file_name);
70acf637 580
ec4a53ed 581 let file_offset = self.position();
70acf637
WB
582 self.encode_filename(file_name).await?;
583
ec4a53ed 584 let entry_offset = self.position();
ceb4a546 585 self.encode_metadata(metadata).await?;
70acf637 586
ec4a53ed
WB
587 let files_offset = self.position();
588
589 // the child will write to OUR state now:
590 let write_position = self.position();
70acf637 591
d8402433
SR
592 let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
593
70acf637 594 Ok(EncoderImpl {
d8402433 595 // always forward as Borrowed(), to avoid stacking references on nested calls
d4a04d53 596 output: self.output.to_borrowed_mut(),
70acf637
WB
597 state: EncoderState {
598 entry_offset,
599 files_offset,
600 file_offset: Some(file_offset),
1b25fc08 601 file_hash,
ec4a53ed 602 write_position,
70acf637
WB
603 ..Default::default()
604 },
605 parent: Some(&mut self.state),
606 finished: false,
d8402433 607 file_copy_buffer,
70acf637
WB
608 })
609 }
610
2ab25a17
WB
611 async fn start_file_do(
612 &mut self,
613 metadata: Option<&Metadata>,
614 file_name: &[u8],
615 ) -> io::Result<()> {
70acf637 616 self.encode_filename(file_name).await?;
2ab25a17 617 if let Some(metadata) = metadata {
ceb4a546 618 self.encode_metadata(metadata).await?;
2ab25a17 619 }
70acf637
WB
620 Ok(())
621 }
622
623 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
ec4a53ed 624 seq_write_pxar_struct_entry(
d8402433 625 self.output.as_mut(),
ec4a53ed
WB
626 format::PXAR_ENTRY,
627 metadata.stat.clone(),
628 &mut self.state.write_position,
629 )
630 .await?;
b5a471a3
WB
631
632 for xattr in &metadata.xattrs {
633 self.write_xattr(xattr).await?;
634 }
635
636 self.write_acls(&metadata.acl).await?;
637
638 if let Some(fcaps) = &metadata.fcaps {
639 self.write_file_capabilities(fcaps).await?;
640 }
641
642 if let Some(qpid) = &metadata.quota_project_id {
643 self.write_quota_project_id(qpid).await?;
644 }
645
646 Ok(())
647 }
648
649 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
ec4a53ed 650 seq_write_pxar_entry(
d8402433 651 self.output.as_mut(),
ec4a53ed
WB
652 format::PXAR_XATTR,
653 &xattr.data,
654 &mut self.state.write_position,
655 )
656 .await
b5a471a3
WB
657 }
658
659 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
660 for acl in &acl.users {
ec4a53ed 661 seq_write_pxar_struct_entry(
d8402433 662 self.output.as_mut(),
ec4a53ed
WB
663 format::PXAR_ACL_USER,
664 acl.clone(),
665 &mut self.state.write_position,
666 )
667 .await?;
b5a471a3
WB
668 }
669
670 for acl in &acl.groups {
ec4a53ed 671 seq_write_pxar_struct_entry(
d8402433 672 self.output.as_mut(),
ec4a53ed
WB
673 format::PXAR_ACL_GROUP,
674 acl.clone(),
675 &mut self.state.write_position,
676 )
677 .await?;
b5a471a3
WB
678 }
679
680 if let Some(acl) = &acl.group_obj {
ec4a53ed 681 seq_write_pxar_struct_entry(
d8402433 682 self.output.as_mut(),
ec4a53ed
WB
683 format::PXAR_ACL_GROUP_OBJ,
684 acl.clone(),
685 &mut self.state.write_position,
686 )
687 .await?;
b5a471a3
WB
688 }
689
690 if let Some(acl) = &acl.default {
ec4a53ed 691 seq_write_pxar_struct_entry(
d8402433 692 self.output.as_mut(),
ec4a53ed
WB
693 format::PXAR_ACL_DEFAULT,
694 acl.clone(),
695 &mut self.state.write_position,
696 )
697 .await?;
b5a471a3
WB
698 }
699
700 for acl in &acl.default_users {
fc9c8c9d 701 seq_write_pxar_struct_entry(
d8402433 702 self.output.as_mut(),
fc9c8c9d
WB
703 format::PXAR_ACL_DEFAULT_USER,
704 acl.clone(),
ec4a53ed 705 &mut self.state.write_position,
fc9c8c9d
WB
706 )
707 .await?;
b5a471a3
WB
708 }
709
710 for acl in &acl.default_groups {
fc9c8c9d 711 seq_write_pxar_struct_entry(
d8402433 712 self.output.as_mut(),
fc9c8c9d
WB
713 format::PXAR_ACL_DEFAULT_GROUP,
714 acl.clone(),
ec4a53ed 715 &mut self.state.write_position,
fc9c8c9d
WB
716 )
717 .await?;
b5a471a3
WB
718 }
719
70acf637
WB
720 Ok(())
721 }
722
b5a471a3 723 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
ec4a53ed 724 seq_write_pxar_entry(
d8402433 725 self.output.as_mut(),
ec4a53ed
WB
726 format::PXAR_FCAPS,
727 &fcaps.data,
728 &mut self.state.write_position,
729 )
730 .await
b5a471a3
WB
731 }
732
733 async fn write_quota_project_id(
734 &mut self,
735 quota_project_id: &format::QuotaProjectId,
736 ) -> io::Result<()> {
fc9c8c9d 737 seq_write_pxar_struct_entry(
d8402433 738 self.output.as_mut(),
fc9c8c9d 739 format::PXAR_QUOTA_PROJID,
1b25fc08 740 *quota_project_id,
ec4a53ed 741 &mut self.state.write_position,
fc9c8c9d
WB
742 )
743 .await
b5a471a3
WB
744 }
745
70acf637 746 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
f3ac1c51 747 crate::util::validate_filename(file_name)?;
ec4a53ed 748 seq_write_pxar_entry_zero(
d8402433 749 self.output.as_mut(),
ec4a53ed
WB
750 format::PXAR_FILENAME,
751 file_name,
752 &mut self.state.write_position,
753 )
754 .await
70acf637
WB
755 }
756
d8402433 757 pub async fn finish(mut self) -> io::Result<()> {
70acf637 758 let tail_bytes = self.finish_goodbye_table().await?;
ec4a53ed 759 seq_write_pxar_entry(
d8402433 760 self.output.as_mut(),
ec4a53ed
WB
761 format::PXAR_GOODBYE,
762 &tail_bytes,
763 &mut self.state.write_position,
764 )
765 .await?;
766
e827b69a
WB
767 if let EncoderOutput::Owned(output) = &mut self.output {
768 flush(output).await?;
769 }
d995b531 770
ec4a53ed
WB
771 // done up here because of the self-borrow and to propagate
772 let end_offset = self.position();
773
749855a4 774 if let Some(parent) = &mut self.parent {
ec4a53ed
WB
775 parent.write_position = end_offset;
776
749855a4
WB
777 let file_offset = self
778 .state
779 .file_offset
780 .expect("internal error: parent set but no file_offset?");
781
749855a4
WB
782 parent.items.push(GoodbyeItem {
783 hash: self.state.file_hash,
784 offset: file_offset,
785 size: end_offset - file_offset,
786 });
787 }
70acf637 788 self.finished = true;
d8402433 789 Ok(())
70acf637
WB
790 }
791
792 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
ec4a53ed 793 let goodbye_offset = self.position();
70acf637
WB
794
795 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
796 let mut tail = take(&mut self.state.items);
797 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
798 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
799
749855a4
WB
800 // sort, then create a BST
801 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
70acf637 802
749855a4 803 let mut bst = Vec::with_capacity(tail.len() + 1);
c40ababa 804 #[allow(clippy::uninit_vec)]
749855a4
WB
805 unsafe {
806 bst.set_len(tail.len());
70acf637 807 }
749855a4
WB
808 binary_tree_array::copy(tail.len(), |src, dest| {
809 let mut item = tail[src].clone();
810 // fixup the goodbye table offsets to be relative and with the right endianess
811 item.offset = goodbye_offset - item.offset;
812 unsafe {
813 std::ptr::write(&mut bst[dest], item.to_le());
814 }
815 });
816 drop(tail);
817
818 bst.push(
819 GoodbyeItem {
820 hash: format::PXAR_GOODBYE_TAIL_MARKER,
821 offset: goodbye_offset - self.state.entry_offset,
822 size: goodbye_size,
823 }
824 .to_le(),
825 );
70acf637
WB
826
827 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
828 // the items make sense:
749855a4
WB
829 let data = bst.as_mut_ptr() as *mut u8;
830 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
831 forget(bst);
70acf637
WB
832 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
833 }
834}
835
836/// Writer for a file object in a directory.
e5a2495e 837pub(crate) struct FileImpl<'a, S: SeqWrite> {
d8402433 838 output: &'a mut S,
70acf637
WB
839
840 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
841 /// directly instead of on Drop of FileImpl?
842 goodbye_item: GoodbyeItem,
843
844 /// While writing data to this file, this is how much space we still have left, this must reach
845 /// exactly zero.
846 remaining_size: u64,
847
59e6f679 848 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
70acf637
WB
849 /// to, and where we insert our `GoodbyeItem`.
850 parent: &'a mut EncoderState,
851}
852
d8402433 853impl<'a, S: SeqWrite> Drop for FileImpl<'a, S> {
70acf637
WB
854 fn drop(&mut self) {
855 if self.remaining_size != 0 {
59e6f679 856 self.parent.add_error(EncodeError::IncompleteFile);
70acf637
WB
857 }
858
859 self.parent.items.push(self.goodbye_item.clone());
860 }
861}
862
d8402433 863impl<'a, S: SeqWrite> FileImpl<'a, S> {
d6748862
WB
864 /// Get the file offset to be able to reference it with `add_hardlink`.
865 pub fn file_offset(&self) -> LinkOffset {
866 LinkOffset(self.goodbye_item.offset)
867 }
868
70acf637
WB
869 fn check_remaining(&self, size: usize) -> io::Result<()> {
870 if size as u64 > self.remaining_size {
871 io_bail!("attempted to write more than previously allocated");
872 } else {
873 Ok(())
874 }
875 }
876
05356884 877 /// Poll write interface to more easily connect to tokio/futures.
7aee9c1f 878 #[cfg(feature = "tokio-io")]
05356884
WB
879 pub fn poll_write(
880 self: Pin<&mut Self>,
881 cx: &mut Context,
882 data: &[u8],
883 ) -> Poll<io::Result<usize>> {
884 let this = self.get_mut();
885 this.check_remaining(data.len())?;
886 let output = unsafe { Pin::new_unchecked(&mut *this.output) };
887 match output.poll_seq_write(cx, data) {
888 Poll::Ready(Ok(put)) => {
889 this.remaining_size -= put as u64;
ec4a53ed 890 this.parent.write_position += put as u64;
05356884
WB
891 Poll::Ready(Ok(put))
892 }
893 other => other,
894 }
895 }
896
897 /// Poll flush interface to more easily connect to tokio/futures.
7aee9c1f 898 #[cfg(feature = "tokio-io")]
05356884 899 pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
737f75cf 900 unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
05356884
WB
901 }
902
903 /// Poll close/shutdown interface to more easily connect to tokio/futures.
ec4a53ed
WB
904 ///
905 /// This just calls flush, though, since we're just a virtual writer writing to the file
906 /// provided by our encoder.
7aee9c1f 907 #[cfg(feature = "tokio-io")]
05356884 908 pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
737f75cf 909 unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
05356884
WB
910 }
911
70acf637
WB
912 /// Write file data for the current file entry in a pxar archive.
913 ///
914 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
915 /// requested. Check the return value for how many. There's also a `write_all` method available
916 /// for convenience.
917 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
918 self.check_remaining(data.len())?;
ec4a53ed
WB
919 let put =
920 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
921 .await?;
84a1926f 922 //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
70acf637 923 self.remaining_size -= put as u64;
ec4a53ed 924 self.parent.write_position += put as u64;
70acf637
WB
925 Ok(put)
926 }
927
928 /// Completely write file data for the current file entry in a pxar archive.
929 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
930 self.check_remaining(data.len())?;
d8402433 931 seq_write_all(self.output, data, &mut self.parent.write_position).await?;
70acf637
WB
932 self.remaining_size -= data.len() as u64;
933 Ok(())
934 }
935}
05356884
WB
936
/// Expose a file being written as a `tokio` `AsyncWrite`, by delegating to the inherent
/// poll methods of `FileImpl`.
#[cfg(feature = "tokio-io")]
impl<'a, S: SeqWrite> tokio::io::AsyncWrite for FileImpl<'a, S> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        FileImpl::<'a, S>::poll_write(self, cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        FileImpl::<'a, S>::poll_flush(self, cx)
    }

    /// Shutting down only flushes, see `FileImpl::poll_close`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        FileImpl::<'a, S>::poll_close(self, cx)
    }
}