]> git.proxmox.com Git - pxar.git/blob - src/encoder/mod.rs
don't hold temp buffer mutex across await point
[pxar.git] / src / encoder / mod.rs
1 //! The `pxar` encoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 #![deny(missing_docs)]
6
7 use std::io;
8 use std::mem::{forget, size_of, size_of_val, take};
9 use std::os::unix::ffi::OsStrExt;
10 use std::path::Path;
11 use std::pin::Pin;
12 use std::sync::{Arc, Mutex};
13 use std::task::{Context, Poll};
14
15 use endian_trait::Endian;
16
17 use crate::binary_tree_array;
18 use crate::decoder::{self, SeqRead};
19 use crate::format::{self, GoodbyeItem};
20 use crate::poll_fn::poll_fn;
21 use crate::Metadata;
22
23 pub mod aio;
24 pub mod sync;
25
26 #[doc(inline)]
27 pub use sync::Encoder;
28
29 /// File reference used to create hard links.
30 #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
31 pub struct LinkOffset(u64);
32
33 impl LinkOffset {
34 /// Get the raw byte offset of this link.
35 #[inline]
36 pub fn raw(self) -> u64 {
37 self.0
38 }
39 }
40
41 /// Sequential write interface used by the encoder's state machine.
42 ///
43 /// This is our internal writer trait which is available for `std::io::Write` types in the
44 /// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
45 /// wrapper.
46 pub trait SeqWrite {
47 /// Attempt to perform a sequential write to the file. On success, the number of written bytes
48 /// is returned as `Poll::Ready(Ok(bytes))`.
49 ///
50 /// If writing is not yet possible, `Poll::Pending` is returned and the current task will be
51 /// notified via the `cx.waker()` when writing becomes possible.
52 fn poll_seq_write(
53 self: Pin<&mut Self>,
54 cx: &mut Context,
55 buf: &[u8],
56 ) -> Poll<io::Result<usize>>;
57
58 /// Attempt to flush the output, ensuring that all buffered data reaches the destination.
59 ///
60 /// On success, returns `Poll::Ready(Ok(()))`.
61 ///
62 /// If flushing cannot complete immediately, `Poll::Pending` is returned and the current task
63 /// will be notified via `cx.waker()` when progress can be made.
64 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
65 }
66
67 /// Allow using trait objects for generics taking a `SeqWrite`.
68 impl<S> SeqWrite for &mut S
69 where
70 S: SeqWrite + ?Sized,
71 {
72 fn poll_seq_write(
73 self: Pin<&mut Self>,
74 cx: &mut Context,
75 buf: &[u8],
76 ) -> Poll<io::Result<usize>> {
77 unsafe {
78 self.map_unchecked_mut(|this| &mut **this)
79 .poll_seq_write(cx, buf)
80 }
81 }
82
83 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
84 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
85 }
86 }
87
88 /// awaitable verison of `poll_seq_write`.
89 async fn seq_write<T: SeqWrite + ?Sized>(
90 output: &mut T,
91 buf: &[u8],
92 position: &mut u64,
93 ) -> io::Result<usize> {
94 let put =
95 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await?;
96 *position += put as u64;
97 Ok(put)
98 }
99
100 /// awaitable version of 'poll_flush'.
101 async fn flush<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<()> {
102 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_flush(cx) }).await
103 }
104
105 /// Write the entire contents of a buffer, handling short writes.
106 async fn seq_write_all<T: SeqWrite + ?Sized>(
107 output: &mut T,
108 mut buf: &[u8],
109 position: &mut u64,
110 ) -> io::Result<()> {
111 while !buf.is_empty() {
112 let got = seq_write(&mut *output, buf, &mut *position).await?;
113 buf = &buf[got..];
114 }
115 Ok(())
116 }
117
118 /// Write an endian-swappable struct.
119 async fn seq_write_struct<E: Endian, T>(
120 output: &mut T,
121 data: E,
122 position: &mut u64,
123 ) -> io::Result<()>
124 where
125 T: SeqWrite + ?Sized,
126 {
127 let data = data.to_le();
128 let buf =
129 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
130 seq_write_all(output, buf, position).await
131 }
132
133 /// Write a pxar entry.
134 async fn seq_write_pxar_entry<T>(
135 output: &mut T,
136 htype: u64,
137 data: &[u8],
138 position: &mut u64,
139 ) -> io::Result<()>
140 where
141 T: SeqWrite + ?Sized,
142 {
143 let header = format::Header::with_content_size(htype, data.len() as u64);
144 header.check_header_size()?;
145
146 seq_write_struct(&mut *output, header, &mut *position).await?;
147 seq_write_all(output, data, position).await
148 }
149
150 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
151 /// data buffer.
152 async fn seq_write_pxar_entry_zero<T>(
153 output: &mut T,
154 htype: u64,
155 data: &[u8],
156 position: &mut u64,
157 ) -> io::Result<()>
158 where
159 T: SeqWrite + ?Sized,
160 {
161 let header = format::Header::with_content_size(htype, 1 + data.len() as u64);
162 header.check_header_size()?;
163
164 seq_write_struct(&mut *output, header, &mut *position).await?;
165 seq_write_all(&mut *output, data, &mut *position).await?;
166 seq_write_all(output, &[0u8], position).await
167 }
168
169 /// Write a pxar entry consiting of an endian-swappable struct.
170 async fn seq_write_pxar_struct_entry<E, T>(
171 output: &mut T,
172 htype: u64,
173 data: E,
174 position: &mut u64,
175 ) -> io::Result<()>
176 where
177 T: SeqWrite + ?Sized,
178 E: Endian,
179 {
180 let data = data.to_le();
181 let buf =
182 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
183 seq_write_pxar_entry(output, htype, buf, position).await
184 }
185
186 /// Error conditions caused by wrong usage of this crate.
187 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
188 pub enum EncodeError {
189 /// The user dropped a `File` without without finishing writing all of its contents.
190 ///
191 /// This is required because the payload lengths is written out at the begining and decoding
192 /// requires there to follow the right amount of data.
193 IncompleteFile,
194
195 /// The user dropped a directory without finalizing it.
196 ///
197 /// Finalizing is required to build the goodbye table at the end of a directory.
198 IncompleteDirectory,
199 }
200
201 #[derive(Default)]
202 struct EncoderState {
203 /// Goodbye items for this directory, excluding the tail.
204 items: Vec<GoodbyeItem>,
205
206 /// User caused error conditions.
207 encode_error: Option<EncodeError>,
208
209 /// Offset of this directory's ENTRY.
210 entry_offset: u64,
211
212 #[allow(dead_code)]
213 /// Offset to this directory's first FILENAME.
214 files_offset: u64,
215
216 /// If this is a subdirectory, this points to the this directory's FILENAME.
217 file_offset: Option<u64>,
218
219 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
220 file_hash: u64,
221
222 /// We need to keep track how much we have written to get offsets.
223 write_position: u64,
224 }
225
226 impl EncoderState {
227 fn merge_error(&mut self, error: Option<EncodeError>) {
228 // one error is enough:
229 if self.encode_error.is_none() {
230 self.encode_error = error;
231 }
232 }
233
234 fn add_error(&mut self, error: EncodeError) {
235 self.merge_error(Some(error));
236 }
237 }
238
239 pub(crate) enum EncoderOutput<'a, T> {
240 Owned(T),
241 Borrowed(&'a mut T),
242 }
243
244 impl<'a, T> EncoderOutput<'a, T> {
245 #[inline]
246 fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
247 where
248 'a: 's,
249 {
250 EncoderOutput::Borrowed(self.as_mut())
251 }
252 }
253
254 impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
255 fn as_mut(&mut self) -> &mut T {
256 match self {
257 EncoderOutput::Owned(o) => o,
258 EncoderOutput::Borrowed(b) => b,
259 }
260 }
261 }
262
263 impl<'a, T> std::convert::From<T> for EncoderOutput<'a, T> {
264 fn from(t: T) -> Self {
265 EncoderOutput::Owned(t)
266 }
267 }
268
269 impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
270 fn from(t: &'a mut T) -> Self {
271 EncoderOutput::Borrowed(t)
272 }
273 }
274
275 /// The encoder state machine implementation for a directory.
276 ///
277 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
278 /// synchronous or `async` I/O objects in as output.
279 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
280 output: EncoderOutput<'a, T>,
281 state: EncoderState,
282 parent: Option<&'a mut EncoderState>,
283 finished: bool,
284
285 /// Since only the "current" entry can be actively writing files, we share the file copy
286 /// buffer.
287 file_copy_buffer: Arc<Mutex<Vec<u8>>>,
288 }
289
290 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
291 fn drop(&mut self) {
292 if let Some(ref mut parent) = self.parent {
293 // propagate errors:
294 parent.merge_error(self.state.encode_error);
295 if !self.finished {
296 parent.add_error(EncodeError::IncompleteDirectory);
297 }
298 } else if !self.finished {
299 // FIXME: how do we deal with this?
300 // eprintln!("Encoder dropped without finishing!");
301 }
302 }
303 }
304
305 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
306 pub async fn new(
307 output: EncoderOutput<'a, T>,
308 metadata: &Metadata,
309 ) -> io::Result<EncoderImpl<'a, T>> {
310 if !metadata.is_dir() {
311 io_bail!("directory metadata must contain the directory mode flag");
312 }
313 let mut this = Self {
314 output,
315 state: EncoderState::default(),
316 parent: None,
317 finished: false,
318 file_copy_buffer: Arc::new(Mutex::new(unsafe {
319 crate::util::vec_new_uninitialized(1024 * 1024)
320 })),
321 };
322
323 this.encode_metadata(metadata).await?;
324 this.state.files_offset = this.position();
325
326 Ok(this)
327 }
328
329 fn check(&self) -> io::Result<()> {
330 match self.state.encode_error {
331 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
332 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
333 None => Ok(()),
334 }
335 }
336
337 pub async fn create_file<'b>(
338 &'b mut self,
339 metadata: &Metadata,
340 file_name: &Path,
341 file_size: u64,
342 ) -> io::Result<FileImpl<'b, T>>
343 where
344 'a: 'b,
345 {
346 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
347 .await
348 }
349
350 async fn create_file_do<'b>(
351 &'b mut self,
352 metadata: &Metadata,
353 file_name: &[u8],
354 file_size: u64,
355 ) -> io::Result<FileImpl<'b, T>>
356 where
357 'a: 'b,
358 {
359 self.check()?;
360
361 let file_offset = self.position();
362 self.start_file_do(Some(metadata), file_name).await?;
363
364 let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
365 header.check_header_size()?;
366
367 seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
368
369 let payload_data_offset = self.position();
370
371 let meta_size = payload_data_offset - file_offset;
372
373 Ok(FileImpl {
374 output: self.output.as_mut(),
375 goodbye_item: GoodbyeItem {
376 hash: format::hash_filename(file_name),
377 offset: file_offset,
378 size: file_size + meta_size,
379 },
380 remaining_size: file_size,
381 parent: &mut self.state,
382 })
383 }
384
385 fn take_file_copy_buffer(&self) -> Vec<u8> {
386 let buf: Vec<_> = take(
387 &mut self
388 .file_copy_buffer
389 .lock()
390 .expect("failed to lock temporary buffer mutex"),
391 );
392 if buf.len() < 1024 * 1024 {
393 drop(buf);
394 unsafe { crate::util::vec_new_uninitialized(1024 * 1024) }
395 } else {
396 buf
397 }
398 }
399
400 fn put_file_copy_buffer(&self, buf: Vec<u8>) {
401 let mut lock = self
402 .file_copy_buffer
403 .lock()
404 .expect("failed to lock temporary buffer mutex");
405 if lock.len() < buf.len() {
406 *lock = buf;
407 }
408 }
409
410 /// Return a file offset usable with `add_hardlink`.
411 pub async fn add_file(
412 &mut self,
413 metadata: &Metadata,
414 file_name: &Path,
415 file_size: u64,
416 content: &mut dyn SeqRead,
417 ) -> io::Result<LinkOffset> {
418 let mut buf = self.take_file_copy_buffer();
419 let mut file = self.create_file(metadata, file_name, file_size).await?;
420 loop {
421 let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
422 if got == 0 {
423 break;
424 } else {
425 file.write_all(&buf[..got]).await?;
426 }
427 }
428 let offset = file.file_offset();
429 drop(file);
430 self.put_file_copy_buffer(buf);
431 Ok(offset)
432 }
433
434 /// Return a file offset usable with `add_hardlink`.
435 pub async fn add_symlink(
436 &mut self,
437 metadata: &Metadata,
438 file_name: &Path,
439 target: &Path,
440 ) -> io::Result<()> {
441 let target = target.as_os_str().as_bytes();
442 let mut data = Vec::with_capacity(target.len() + 1);
443 data.extend(target);
444 data.push(0);
445 let _ofs: LinkOffset = self
446 .add_file_entry(
447 Some(metadata),
448 file_name,
449 Some((format::PXAR_SYMLINK, &data)),
450 )
451 .await?;
452 Ok(())
453 }
454
455 /// Return a file offset usable with `add_hardlink`.
456 pub async fn add_hardlink(
457 &mut self,
458 file_name: &Path,
459 target: &Path,
460 target_offset: LinkOffset,
461 ) -> io::Result<()> {
462 let current_offset = self.position();
463 if current_offset <= target_offset.0 {
464 io_bail!("invalid hardlink offset, can only point to prior files");
465 }
466
467 let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
468 let target_bytes = target.as_os_str().as_bytes();
469 let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len() + 1);
470 hardlink.extend(&offset_bytes);
471 hardlink.extend(target_bytes);
472 hardlink.push(0);
473 let _this_offset: LinkOffset = self
474 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
475 .await?;
476 Ok(())
477 }
478
479 /// Return a file offset usable with `add_hardlink`.
480 pub async fn add_device(
481 &mut self,
482 metadata: &Metadata,
483 file_name: &Path,
484 device: format::Device,
485 ) -> io::Result<()> {
486 if !metadata.is_device() {
487 io_bail!("entry added via add_device must have a device mode in its metadata");
488 }
489
490 let device = device.to_le();
491 let device = unsafe {
492 std::slice::from_raw_parts(
493 &device as *const format::Device as *const u8,
494 size_of::<format::Device>(),
495 )
496 };
497 let _ofs: LinkOffset = self
498 .add_file_entry(
499 Some(metadata),
500 file_name,
501 Some((format::PXAR_DEVICE, device)),
502 )
503 .await?;
504 Ok(())
505 }
506
507 /// Return a file offset usable with `add_hardlink`.
508 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
509 if !metadata.is_fifo() {
510 io_bail!("entry added via add_device must be of type fifo in its metadata");
511 }
512
513 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
514 Ok(())
515 }
516
517 /// Return a file offset usable with `add_hardlink`.
518 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
519 if !metadata.is_socket() {
520 io_bail!("entry added via add_device must be of type socket in its metadata");
521 }
522
523 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
524 Ok(())
525 }
526
527 /// Return a file offset usable with `add_hardlink`.
528 async fn add_file_entry(
529 &mut self,
530 metadata: Option<&Metadata>,
531 file_name: &Path,
532 entry_htype_data: Option<(u64, &[u8])>,
533 ) -> io::Result<LinkOffset> {
534 self.check()?;
535
536 let file_offset = self.position();
537
538 let file_name = file_name.as_os_str().as_bytes();
539
540 self.start_file_do(metadata, file_name).await?;
541 if let Some((htype, entry_data)) = entry_htype_data {
542 seq_write_pxar_entry(
543 self.output.as_mut(),
544 htype,
545 entry_data,
546 &mut self.state.write_position,
547 )
548 .await?;
549 }
550
551 let end_offset = self.position();
552
553 self.state.items.push(GoodbyeItem {
554 hash: format::hash_filename(file_name),
555 offset: file_offset,
556 size: end_offset - file_offset,
557 });
558
559 Ok(LinkOffset(file_offset))
560 }
561
562 #[inline]
563 fn position(&mut self) -> u64 {
564 self.state.write_position
565 }
566
567 pub async fn create_directory(
568 &mut self,
569 file_name: &Path,
570 metadata: &Metadata,
571 ) -> io::Result<EncoderImpl<'_, T>> {
572 self.check()?;
573
574 if !metadata.is_dir() {
575 io_bail!("directory metadata must contain the directory mode flag");
576 }
577
578 let file_name = file_name.as_os_str().as_bytes();
579 let file_hash = format::hash_filename(file_name);
580
581 let file_offset = self.position();
582 self.encode_filename(file_name).await?;
583
584 let entry_offset = self.position();
585 self.encode_metadata(metadata).await?;
586
587 let files_offset = self.position();
588
589 // the child will write to OUR state now:
590 let write_position = self.position();
591
592 let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
593
594 Ok(EncoderImpl {
595 // always forward as Borrowed(), to avoid stacking references on nested calls
596 output: self.output.to_borrowed_mut(),
597 state: EncoderState {
598 entry_offset,
599 files_offset,
600 file_offset: Some(file_offset),
601 file_hash,
602 write_position,
603 ..Default::default()
604 },
605 parent: Some(&mut self.state),
606 finished: false,
607 file_copy_buffer,
608 })
609 }
610
611 async fn start_file_do(
612 &mut self,
613 metadata: Option<&Metadata>,
614 file_name: &[u8],
615 ) -> io::Result<()> {
616 self.encode_filename(file_name).await?;
617 if let Some(metadata) = metadata {
618 self.encode_metadata(metadata).await?;
619 }
620 Ok(())
621 }
622
623 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
624 seq_write_pxar_struct_entry(
625 self.output.as_mut(),
626 format::PXAR_ENTRY,
627 metadata.stat.clone(),
628 &mut self.state.write_position,
629 )
630 .await?;
631
632 for xattr in &metadata.xattrs {
633 self.write_xattr(xattr).await?;
634 }
635
636 self.write_acls(&metadata.acl).await?;
637
638 if let Some(fcaps) = &metadata.fcaps {
639 self.write_file_capabilities(fcaps).await?;
640 }
641
642 if let Some(qpid) = &metadata.quota_project_id {
643 self.write_quota_project_id(qpid).await?;
644 }
645
646 Ok(())
647 }
648
649 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
650 seq_write_pxar_entry(
651 self.output.as_mut(),
652 format::PXAR_XATTR,
653 &xattr.data,
654 &mut self.state.write_position,
655 )
656 .await
657 }
658
659 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
660 for acl in &acl.users {
661 seq_write_pxar_struct_entry(
662 self.output.as_mut(),
663 format::PXAR_ACL_USER,
664 acl.clone(),
665 &mut self.state.write_position,
666 )
667 .await?;
668 }
669
670 for acl in &acl.groups {
671 seq_write_pxar_struct_entry(
672 self.output.as_mut(),
673 format::PXAR_ACL_GROUP,
674 acl.clone(),
675 &mut self.state.write_position,
676 )
677 .await?;
678 }
679
680 if let Some(acl) = &acl.group_obj {
681 seq_write_pxar_struct_entry(
682 self.output.as_mut(),
683 format::PXAR_ACL_GROUP_OBJ,
684 acl.clone(),
685 &mut self.state.write_position,
686 )
687 .await?;
688 }
689
690 if let Some(acl) = &acl.default {
691 seq_write_pxar_struct_entry(
692 self.output.as_mut(),
693 format::PXAR_ACL_DEFAULT,
694 acl.clone(),
695 &mut self.state.write_position,
696 )
697 .await?;
698 }
699
700 for acl in &acl.default_users {
701 seq_write_pxar_struct_entry(
702 self.output.as_mut(),
703 format::PXAR_ACL_DEFAULT_USER,
704 acl.clone(),
705 &mut self.state.write_position,
706 )
707 .await?;
708 }
709
710 for acl in &acl.default_groups {
711 seq_write_pxar_struct_entry(
712 self.output.as_mut(),
713 format::PXAR_ACL_DEFAULT_GROUP,
714 acl.clone(),
715 &mut self.state.write_position,
716 )
717 .await?;
718 }
719
720 Ok(())
721 }
722
723 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
724 seq_write_pxar_entry(
725 self.output.as_mut(),
726 format::PXAR_FCAPS,
727 &fcaps.data,
728 &mut self.state.write_position,
729 )
730 .await
731 }
732
733 async fn write_quota_project_id(
734 &mut self,
735 quota_project_id: &format::QuotaProjectId,
736 ) -> io::Result<()> {
737 seq_write_pxar_struct_entry(
738 self.output.as_mut(),
739 format::PXAR_QUOTA_PROJID,
740 *quota_project_id,
741 &mut self.state.write_position,
742 )
743 .await
744 }
745
746 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
747 crate::util::validate_filename(file_name)?;
748 seq_write_pxar_entry_zero(
749 self.output.as_mut(),
750 format::PXAR_FILENAME,
751 file_name,
752 &mut self.state.write_position,
753 )
754 .await
755 }
756
757 pub async fn finish(mut self) -> io::Result<()> {
758 let tail_bytes = self.finish_goodbye_table().await?;
759 seq_write_pxar_entry(
760 self.output.as_mut(),
761 format::PXAR_GOODBYE,
762 &tail_bytes,
763 &mut self.state.write_position,
764 )
765 .await?;
766
767 if let EncoderOutput::Owned(output) = &mut self.output {
768 flush(output).await?;
769 }
770
771 // done up here because of the self-borrow and to propagate
772 let end_offset = self.position();
773
774 if let Some(parent) = &mut self.parent {
775 parent.write_position = end_offset;
776
777 let file_offset = self
778 .state
779 .file_offset
780 .expect("internal error: parent set but no file_offset?");
781
782 parent.items.push(GoodbyeItem {
783 hash: self.state.file_hash,
784 offset: file_offset,
785 size: end_offset - file_offset,
786 });
787 }
788 self.finished = true;
789 Ok(())
790 }
791
792 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
793 let goodbye_offset = self.position();
794
795 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
796 let mut tail = take(&mut self.state.items);
797 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
798 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
799
800 // sort, then create a BST
801 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
802
803 let mut bst = Vec::with_capacity(tail.len() + 1);
804 #[allow(clippy::uninit_vec)]
805 unsafe {
806 bst.set_len(tail.len());
807 }
808 binary_tree_array::copy(tail.len(), |src, dest| {
809 let mut item = tail[src].clone();
810 // fixup the goodbye table offsets to be relative and with the right endianess
811 item.offset = goodbye_offset - item.offset;
812 unsafe {
813 std::ptr::write(&mut bst[dest], item.to_le());
814 }
815 });
816 drop(tail);
817
818 bst.push(
819 GoodbyeItem {
820 hash: format::PXAR_GOODBYE_TAIL_MARKER,
821 offset: goodbye_offset - self.state.entry_offset,
822 size: goodbye_size,
823 }
824 .to_le(),
825 );
826
827 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
828 // the items make sense:
829 let data = bst.as_mut_ptr() as *mut u8;
830 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
831 forget(bst);
832 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
833 }
834 }
835
836 /// Writer for a file object in a directory.
837 pub(crate) struct FileImpl<'a, S: SeqWrite> {
838 output: &'a mut S,
839
840 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
841 /// directly instead of on Drop of FileImpl?
842 goodbye_item: GoodbyeItem,
843
844 /// While writing data to this file, this is how much space we still have left, this must reach
845 /// exactly zero.
846 remaining_size: u64,
847
848 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
849 /// to, and where we insert our `GoodbyeItem`.
850 parent: &'a mut EncoderState,
851 }
852
853 impl<'a, S: SeqWrite> Drop for FileImpl<'a, S> {
854 fn drop(&mut self) {
855 if self.remaining_size != 0 {
856 self.parent.add_error(EncodeError::IncompleteFile);
857 }
858
859 self.parent.items.push(self.goodbye_item.clone());
860 }
861 }
862
863 impl<'a, S: SeqWrite> FileImpl<'a, S> {
864 /// Get the file offset to be able to reference it with `add_hardlink`.
865 pub fn file_offset(&self) -> LinkOffset {
866 LinkOffset(self.goodbye_item.offset)
867 }
868
869 fn check_remaining(&self, size: usize) -> io::Result<()> {
870 if size as u64 > self.remaining_size {
871 io_bail!("attempted to write more than previously allocated");
872 } else {
873 Ok(())
874 }
875 }
876
877 /// Poll write interface to more easily connect to tokio/futures.
878 #[cfg(feature = "tokio-io")]
879 pub fn poll_write(
880 self: Pin<&mut Self>,
881 cx: &mut Context,
882 data: &[u8],
883 ) -> Poll<io::Result<usize>> {
884 let this = self.get_mut();
885 this.check_remaining(data.len())?;
886 let output = unsafe { Pin::new_unchecked(&mut *this.output) };
887 match output.poll_seq_write(cx, data) {
888 Poll::Ready(Ok(put)) => {
889 this.remaining_size -= put as u64;
890 this.parent.write_position += put as u64;
891 Poll::Ready(Ok(put))
892 }
893 other => other,
894 }
895 }
896
897 /// Poll flush interface to more easily connect to tokio/futures.
898 #[cfg(feature = "tokio-io")]
899 pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
900 unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
901 }
902
903 /// Poll close/shutdown interface to more easily connect to tokio/futures.
904 ///
905 /// This just calls flush, though, since we're just a virtual writer writing to the file
906 /// provided by our encoder.
907 #[cfg(feature = "tokio-io")]
908 pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
909 unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
910 }
911
912 /// Write file data for the current file entry in a pxar archive.
913 ///
914 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
915 /// requested. Check the return value for how many. There's also a `write_all` method available
916 /// for convenience.
917 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
918 self.check_remaining(data.len())?;
919 let put =
920 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
921 .await?;
922 //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
923 self.remaining_size -= put as u64;
924 self.parent.write_position += put as u64;
925 Ok(put)
926 }
927
928 /// Completely write file data for the current file entry in a pxar archive.
929 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
930 self.check_remaining(data.len())?;
931 seq_write_all(self.output, data, &mut self.parent.write_position).await?;
932 self.remaining_size -= data.len() as u64;
933 Ok(())
934 }
935 }
936
937 #[cfg(feature = "tokio-io")]
938 impl<'a, S: SeqWrite> tokio::io::AsyncWrite for FileImpl<'a, S> {
939 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
940 FileImpl::poll_write(self, cx, buf)
941 }
942
943 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
944 FileImpl::poll_flush(self, cx)
945 }
946
947 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
948 FileImpl::poll_close(self, cx)
949 }
950 }