]> git.proxmox.com Git - pxar.git/blob - src/encoder/mod.rs
add more code documentation
[pxar.git] / src / encoder / mod.rs
1 //! The `pxar` encoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 #![deny(missing_docs)]
6
7 use std::io;
8 use std::mem::{forget, size_of, size_of_val, take};
9 use std::os::unix::ffi::OsStrExt;
10 use std::path::Path;
11 use std::pin::Pin;
12 use std::sync::{Arc, Mutex};
13 use std::task::{Context, Poll};
14
15 use endian_trait::Endian;
16
17 use crate::binary_tree_array;
18 use crate::decoder::{self, SeqRead};
19 use crate::format::{self, GoodbyeItem};
20 use crate::poll_fn::poll_fn;
21 use crate::Metadata;
22
23 pub mod aio;
24 pub mod sync;
25
26 #[doc(inline)]
27 pub use sync::Encoder;
28
/// Reference to a previously encoded file entry, needed to create hard links to it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct LinkOffset(u64);

impl LinkOffset {
    /// Returns the wrapped raw byte offset.
    #[inline]
    pub fn raw(self) -> u64 {
        let LinkOffset(offset) = self;
        offset
    }
}
40
/// Sequential write interface driving the encoder's state machine.
///
/// This is the crate-internal writer abstraction. The synchronous wrapper provides it for
/// `std::io::Write` types, the asynchronous wrapper for both `tokio` and `futures` `AsyncWrite`
/// types.
pub trait SeqWrite {
    /// Try to write `buf` sequentially to the output.
    ///
    /// A successful write yields `Poll::Ready(Ok(bytes))` with the number of bytes written.
    ///
    /// When the output cannot accept data yet, `Poll::Pending` is returned and the current task
    /// is woken through `cx.waker()` once writing becomes possible.
    fn poll_seq_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Try to flush the output so that all buffered data reaches its destination.
    ///
    /// Yields `Poll::Ready(Ok(()))` on success.
    ///
    /// When flushing cannot finish right away, `Poll::Pending` is returned and the current task
    /// is woken through `cx.waker()` once progress can be made.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
}

/// Forwarding implementation, so that mutable references (including trait objects) can be passed
/// to generics expecting a `SeqWrite`.
impl<S> SeqWrite for &mut S
where
    S: SeqWrite + ?Sized,
{
    fn poll_seq_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // SAFETY: we only project through the outer reference to re-pin the referenced writer;
        // nothing is moved out of it here.
        // NOTE(review): soundness relies on the referenced `S` being treated as pinned by
        // whoever handed out the `&mut S` - confirm for `!Unpin` writers.
        let inner: Pin<&mut S> = unsafe { self.map_unchecked_mut(|outer| &mut **outer) };
        inner.poll_seq_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        // SAFETY: same projection as in `poll_seq_write` above.
        let inner: Pin<&mut S> = unsafe { self.map_unchecked_mut(|outer| &mut **outer) };
        inner.poll_flush(cx)
    }
}
87
88 /// awaitable verison of `poll_seq_write`.
89 async fn seq_write<T: SeqWrite + ?Sized>(
90 output: &mut T,
91 buf: &[u8],
92 position: &mut u64,
93 ) -> io::Result<usize> {
94 let put =
95 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await?;
96 *position += put as u64;
97 Ok(put)
98 }
99
100 /// awaitable version of 'poll_flush'.
101 async fn flush<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<()> {
102 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_flush(cx) }).await
103 }
104
105 /// Write the entire contents of a buffer, handling short writes.
106 async fn seq_write_all<T: SeqWrite + ?Sized>(
107 output: &mut T,
108 mut buf: &[u8],
109 position: &mut u64,
110 ) -> io::Result<()> {
111 while !buf.is_empty() {
112 let got = seq_write(&mut *output, buf, &mut *position).await?;
113 buf = &buf[got..];
114 }
115 Ok(())
116 }
117
118 /// Write an endian-swappable struct.
119 async fn seq_write_struct<E: Endian, T>(
120 output: &mut T,
121 data: E,
122 position: &mut u64,
123 ) -> io::Result<()>
124 where
125 T: SeqWrite + ?Sized,
126 {
127 let data = data.to_le();
128 let buf =
129 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
130 seq_write_all(output, buf, position).await
131 }
132
133 /// Write a pxar entry.
134 async fn seq_write_pxar_entry<T>(
135 output: &mut T,
136 htype: u64,
137 data: &[u8],
138 position: &mut u64,
139 ) -> io::Result<()>
140 where
141 T: SeqWrite + ?Sized,
142 {
143 let header = format::Header::with_content_size(htype, data.len() as u64);
144 header.check_header_size()?;
145
146 seq_write_struct(&mut *output, header, &mut *position).await?;
147 seq_write_all(output, data, position).await
148 }
149
150 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
151 /// data buffer.
152 async fn seq_write_pxar_entry_zero<T>(
153 output: &mut T,
154 htype: u64,
155 data: &[u8],
156 position: &mut u64,
157 ) -> io::Result<()>
158 where
159 T: SeqWrite + ?Sized,
160 {
161 let header = format::Header::with_content_size(htype, 1 + data.len() as u64);
162 header.check_header_size()?;
163
164 seq_write_struct(&mut *output, header, &mut *position).await?;
165 seq_write_all(&mut *output, data, &mut *position).await?;
166 seq_write_all(output, &[0u8], position).await
167 }
168
169 /// Write a pxar entry consiting of an endian-swappable struct.
170 async fn seq_write_pxar_struct_entry<E, T>(
171 output: &mut T,
172 htype: u64,
173 data: E,
174 position: &mut u64,
175 ) -> io::Result<()>
176 where
177 T: SeqWrite + ?Sized,
178 E: Endian,
179 {
180 let data = data.to_le();
181 let buf =
182 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
183 seq_write_pxar_entry(output, htype, buf, position).await
184 }
185
/// Error conditions caused by incorrect usage of this crate.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EncodeError {
    /// The user dropped a `File` without finishing writing all of its contents.
    ///
    /// Completing the file is required because the payload length is written out at the
    /// beginning, and decoding expects exactly that amount of data to follow.
    IncompleteFile,

    /// The user dropped a directory without finalizing it.
    ///
    /// Finalizing is required to build the goodbye table at the end of a directory.
    IncompleteDirectory,
}
200
201 #[derive(Default)]
202 struct EncoderState {
203 /// Goodbye items for this directory, excluding the tail.
204 items: Vec<GoodbyeItem>,
205
206 /// User caused error conditions.
207 encode_error: Option<EncodeError>,
208
209 /// Offset of this directory's ENTRY.
210 entry_offset: u64,
211
212 #[allow(dead_code)]
213 /// Offset to this directory's first FILENAME.
214 files_offset: u64,
215
216 /// If this is a subdirectory, this points to the this directory's FILENAME.
217 file_offset: Option<u64>,
218
219 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
220 file_hash: u64,
221
222 /// We need to keep track how much we have written to get offsets.
223 write_position: u64,
224 }
225
226 impl EncoderState {
227 fn merge_error(&mut self, error: Option<EncodeError>) {
228 // one error is enough:
229 if self.encode_error.is_none() {
230 self.encode_error = error;
231 }
232 }
233
234 fn add_error(&mut self, error: EncodeError) {
235 self.merge_error(Some(error));
236 }
237 }
238
/// Output handle of an encoder: either the output itself or a mutable borrow of it.
pub(crate) enum EncoderOutput<'a, T> {
    /// The encoder owns its output.
    Owned(T),
    /// The encoder writes to an output owned by someone else.
    Borrowed(&'a mut T),
}

impl<'a, T> EncoderOutput<'a, T> {
    /// Create a `Borrowed` handle pointing at the same output, regardless of whether `self` is
    /// owned or already borrowed.
    #[inline]
    fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
    where
        'a: 's,
    {
        let target: &'s mut T = match self {
            EncoderOutput::Owned(owned) => owned,
            EncoderOutput::Borrowed(borrowed) => &mut **borrowed,
        };
        EncoderOutput::Borrowed(target)
    }
}

impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
    /// Access the underlying output, no matter how it is held.
    fn as_mut(&mut self) -> &mut T {
        match *self {
            EncoderOutput::Owned(ref mut owned) => owned,
            EncoderOutput::Borrowed(ref mut borrowed) => &mut **borrowed,
        }
    }
}

impl<'a, T> std::convert::From<T> for EncoderOutput<'a, T> {
    /// Wrap an output by value.
    fn from(output: T) -> Self {
        Self::Owned(output)
    }
}

impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
    /// Wrap a mutable reference to an output.
    fn from(output: &'a mut T) -> Self {
        Self::Borrowed(output)
    }
}
274
275 /// The encoder state machine implementation for a directory.
276 ///
277 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
278 /// synchronous or `async` I/O objects in as output.
279 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
280 output: EncoderOutput<'a, T>,
281 state: EncoderState,
282 parent: Option<&'a mut EncoderState>,
283 finished: bool,
284
285 /// Since only the "current" entry can be actively writing files, we share the file copy
286 /// buffer.
287 file_copy_buffer: Arc<Mutex<Vec<u8>>>,
288 }
289
290 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
291 fn drop(&mut self) {
292 if let Some(ref mut parent) = self.parent {
293 // propagate errors:
294 parent.merge_error(self.state.encode_error);
295 if !self.finished {
296 parent.add_error(EncodeError::IncompleteDirectory);
297 }
298 } else if !self.finished {
299 // FIXME: how do we deal with this?
300 // eprintln!("Encoder dropped without finishing!");
301 }
302 }
303 }
304
305 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
306 pub async fn new(
307 output: EncoderOutput<'a, T>,
308 metadata: &Metadata,
309 ) -> io::Result<EncoderImpl<'a, T>> {
310 if !metadata.is_dir() {
311 io_bail!("directory metadata must contain the directory mode flag");
312 }
313 let mut this = Self {
314 output,
315 state: EncoderState::default(),
316 parent: None,
317 finished: false,
318 file_copy_buffer: Arc::new(Mutex::new(crate::util::vec_new(1024 * 1024))),
319 };
320
321 this.encode_metadata(metadata).await?;
322 this.state.files_offset = this.position();
323
324 Ok(this)
325 }
326
327 fn check(&self) -> io::Result<()> {
328 match self.state.encode_error {
329 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
330 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
331 None => Ok(()),
332 }
333 }
334
335 pub async fn create_file<'b>(
336 &'b mut self,
337 metadata: &Metadata,
338 file_name: &Path,
339 file_size: u64,
340 ) -> io::Result<FileImpl<'b, T>>
341 where
342 'a: 'b,
343 {
344 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
345 .await
346 }
347
348 async fn create_file_do<'b>(
349 &'b mut self,
350 metadata: &Metadata,
351 file_name: &[u8],
352 file_size: u64,
353 ) -> io::Result<FileImpl<'b, T>>
354 where
355 'a: 'b,
356 {
357 self.check()?;
358
359 let file_offset = self.position();
360 self.start_file_do(Some(metadata), file_name).await?;
361
362 let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
363 header.check_header_size()?;
364
365 seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
366
367 let payload_data_offset = self.position();
368
369 let meta_size = payload_data_offset - file_offset;
370
371 Ok(FileImpl {
372 output: self.output.as_mut(),
373 goodbye_item: GoodbyeItem {
374 hash: format::hash_filename(file_name),
375 offset: file_offset,
376 size: file_size + meta_size,
377 },
378 remaining_size: file_size,
379 parent: &mut self.state,
380 })
381 }
382
383 /// Return a file offset usable with `add_hardlink`.
384 pub async fn add_file(
385 &mut self,
386 metadata: &Metadata,
387 file_name: &Path,
388 file_size: u64,
389 content: &mut dyn SeqRead,
390 ) -> io::Result<LinkOffset> {
391 let buf = Arc::clone(&self.file_copy_buffer);
392 let mut file = self.create_file(metadata, file_name, file_size).await?;
393 let mut buf = buf.lock().expect("failed to lock temporary buffer mutex");
394 loop {
395 let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
396 if got == 0 {
397 break;
398 } else {
399 file.write_all(&buf[..got]).await?;
400 }
401 }
402 Ok(file.file_offset())
403 }
404
405 /// Return a file offset usable with `add_hardlink`.
406 pub async fn add_symlink(
407 &mut self,
408 metadata: &Metadata,
409 file_name: &Path,
410 target: &Path,
411 ) -> io::Result<()> {
412 let target = target.as_os_str().as_bytes();
413 let mut data = Vec::with_capacity(target.len() + 1);
414 data.extend(target);
415 data.push(0);
416 let _ofs: LinkOffset = self
417 .add_file_entry(
418 Some(metadata),
419 file_name,
420 Some((format::PXAR_SYMLINK, &data)),
421 )
422 .await?;
423 Ok(())
424 }
425
426 /// Return a file offset usable with `add_hardlink`.
427 pub async fn add_hardlink(
428 &mut self,
429 file_name: &Path,
430 target: &Path,
431 target_offset: LinkOffset,
432 ) -> io::Result<()> {
433 let current_offset = self.position();
434 if current_offset <= target_offset.0 {
435 io_bail!("invalid hardlink offset, can only point to prior files");
436 }
437
438 let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
439 let target_bytes = target.as_os_str().as_bytes();
440 let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len() + 1);
441 hardlink.extend(&offset_bytes);
442 hardlink.extend(target_bytes);
443 hardlink.push(0);
444 let _this_offset: LinkOffset = self
445 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
446 .await?;
447 Ok(())
448 }
449
450 /// Return a file offset usable with `add_hardlink`.
451 pub async fn add_device(
452 &mut self,
453 metadata: &Metadata,
454 file_name: &Path,
455 device: format::Device,
456 ) -> io::Result<()> {
457 if !metadata.is_device() {
458 io_bail!("entry added via add_device must have a device mode in its metadata");
459 }
460
461 let device = device.to_le();
462 let device = unsafe {
463 std::slice::from_raw_parts(
464 &device as *const format::Device as *const u8,
465 size_of::<format::Device>(),
466 )
467 };
468 let _ofs: LinkOffset = self
469 .add_file_entry(
470 Some(metadata),
471 file_name,
472 Some((format::PXAR_DEVICE, device)),
473 )
474 .await?;
475 Ok(())
476 }
477
478 /// Return a file offset usable with `add_hardlink`.
479 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
480 if !metadata.is_fifo() {
481 io_bail!("entry added via add_device must be of type fifo in its metadata");
482 }
483
484 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
485 Ok(())
486 }
487
488 /// Return a file offset usable with `add_hardlink`.
489 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
490 if !metadata.is_socket() {
491 io_bail!("entry added via add_device must be of type socket in its metadata");
492 }
493
494 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
495 Ok(())
496 }
497
498 /// Return a file offset usable with `add_hardlink`.
499 async fn add_file_entry(
500 &mut self,
501 metadata: Option<&Metadata>,
502 file_name: &Path,
503 entry_htype_data: Option<(u64, &[u8])>,
504 ) -> io::Result<LinkOffset> {
505 self.check()?;
506
507 let file_offset = self.position();
508
509 let file_name = file_name.as_os_str().as_bytes();
510
511 self.start_file_do(metadata, file_name).await?;
512 if let Some((htype, entry_data)) = entry_htype_data {
513 seq_write_pxar_entry(
514 self.output.as_mut(),
515 htype,
516 entry_data,
517 &mut self.state.write_position,
518 )
519 .await?;
520 }
521
522 let end_offset = self.position();
523
524 self.state.items.push(GoodbyeItem {
525 hash: format::hash_filename(file_name),
526 offset: file_offset,
527 size: end_offset - file_offset,
528 });
529
530 Ok(LinkOffset(file_offset))
531 }
532
533 #[inline]
534 fn position(&mut self) -> u64 {
535 self.state.write_position
536 }
537
538 pub async fn create_directory(
539 &mut self,
540 file_name: &Path,
541 metadata: &Metadata,
542 ) -> io::Result<EncoderImpl<'_, T>> {
543 self.check()?;
544
545 if !metadata.is_dir() {
546 io_bail!("directory metadata must contain the directory mode flag");
547 }
548
549 let file_name = file_name.as_os_str().as_bytes();
550 let file_hash = format::hash_filename(file_name);
551
552 let file_offset = self.position();
553 self.encode_filename(file_name).await?;
554
555 let entry_offset = self.position();
556 self.encode_metadata(&metadata).await?;
557
558 let files_offset = self.position();
559
560 // the child will write to OUR state now:
561 let write_position = self.position();
562
563 let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
564
565 Ok(EncoderImpl {
566 // always forward as Borrowed(), to avoid stacking references on nested calls
567 output: self.output.to_borrowed_mut(),
568 state: EncoderState {
569 entry_offset,
570 files_offset,
571 file_offset: Some(file_offset),
572 file_hash,
573 write_position,
574 ..Default::default()
575 },
576 parent: Some(&mut self.state),
577 finished: false,
578 file_copy_buffer,
579 })
580 }
581
582 async fn start_file_do(
583 &mut self,
584 metadata: Option<&Metadata>,
585 file_name: &[u8],
586 ) -> io::Result<()> {
587 self.encode_filename(file_name).await?;
588 if let Some(metadata) = metadata {
589 self.encode_metadata(&metadata).await?;
590 }
591 Ok(())
592 }
593
594 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
595 seq_write_pxar_struct_entry(
596 self.output.as_mut(),
597 format::PXAR_ENTRY,
598 metadata.stat.clone(),
599 &mut self.state.write_position,
600 )
601 .await?;
602
603 for xattr in &metadata.xattrs {
604 self.write_xattr(xattr).await?;
605 }
606
607 self.write_acls(&metadata.acl).await?;
608
609 if let Some(fcaps) = &metadata.fcaps {
610 self.write_file_capabilities(fcaps).await?;
611 }
612
613 if let Some(qpid) = &metadata.quota_project_id {
614 self.write_quota_project_id(qpid).await?;
615 }
616
617 Ok(())
618 }
619
620 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
621 seq_write_pxar_entry(
622 self.output.as_mut(),
623 format::PXAR_XATTR,
624 &xattr.data,
625 &mut self.state.write_position,
626 )
627 .await
628 }
629
630 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
631 for acl in &acl.users {
632 seq_write_pxar_struct_entry(
633 self.output.as_mut(),
634 format::PXAR_ACL_USER,
635 acl.clone(),
636 &mut self.state.write_position,
637 )
638 .await?;
639 }
640
641 for acl in &acl.groups {
642 seq_write_pxar_struct_entry(
643 self.output.as_mut(),
644 format::PXAR_ACL_GROUP,
645 acl.clone(),
646 &mut self.state.write_position,
647 )
648 .await?;
649 }
650
651 if let Some(acl) = &acl.group_obj {
652 seq_write_pxar_struct_entry(
653 self.output.as_mut(),
654 format::PXAR_ACL_GROUP_OBJ,
655 acl.clone(),
656 &mut self.state.write_position,
657 )
658 .await?;
659 }
660
661 if let Some(acl) = &acl.default {
662 seq_write_pxar_struct_entry(
663 self.output.as_mut(),
664 format::PXAR_ACL_DEFAULT,
665 acl.clone(),
666 &mut self.state.write_position,
667 )
668 .await?;
669 }
670
671 for acl in &acl.default_users {
672 seq_write_pxar_struct_entry(
673 self.output.as_mut(),
674 format::PXAR_ACL_DEFAULT_USER,
675 acl.clone(),
676 &mut self.state.write_position,
677 )
678 .await?;
679 }
680
681 for acl in &acl.default_groups {
682 seq_write_pxar_struct_entry(
683 self.output.as_mut(),
684 format::PXAR_ACL_DEFAULT_GROUP,
685 acl.clone(),
686 &mut self.state.write_position,
687 )
688 .await?;
689 }
690
691 Ok(())
692 }
693
694 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
695 seq_write_pxar_entry(
696 self.output.as_mut(),
697 format::PXAR_FCAPS,
698 &fcaps.data,
699 &mut self.state.write_position,
700 )
701 .await
702 }
703
704 async fn write_quota_project_id(
705 &mut self,
706 quota_project_id: &format::QuotaProjectId,
707 ) -> io::Result<()> {
708 seq_write_pxar_struct_entry(
709 self.output.as_mut(),
710 format::PXAR_QUOTA_PROJID,
711 *quota_project_id,
712 &mut self.state.write_position,
713 )
714 .await
715 }
716
717 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
718 crate::util::validate_filename(file_name)?;
719 seq_write_pxar_entry_zero(
720 self.output.as_mut(),
721 format::PXAR_FILENAME,
722 file_name,
723 &mut self.state.write_position,
724 )
725 .await
726 }
727
728 pub async fn finish(mut self) -> io::Result<()> {
729 let tail_bytes = self.finish_goodbye_table().await?;
730 seq_write_pxar_entry(
731 self.output.as_mut(),
732 format::PXAR_GOODBYE,
733 &tail_bytes,
734 &mut self.state.write_position,
735 )
736 .await?;
737
738 if let EncoderOutput::Owned(output) = &mut self.output {
739 flush(output).await?;
740 }
741
742 // done up here because of the self-borrow and to propagate
743 let end_offset = self.position();
744
745 if let Some(parent) = &mut self.parent {
746 parent.write_position = end_offset;
747
748 let file_offset = self
749 .state
750 .file_offset
751 .expect("internal error: parent set but no file_offset?");
752
753 parent.items.push(GoodbyeItem {
754 hash: self.state.file_hash,
755 offset: file_offset,
756 size: end_offset - file_offset,
757 });
758 }
759 self.finished = true;
760 Ok(())
761 }
762
763 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
764 let goodbye_offset = self.position();
765
766 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
767 let mut tail = take(&mut self.state.items);
768 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
769 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
770
771 // sort, then create a BST
772 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
773
774 let mut bst = Vec::with_capacity(tail.len() + 1);
775 unsafe {
776 bst.set_len(tail.len());
777 }
778 binary_tree_array::copy(tail.len(), |src, dest| {
779 let mut item = tail[src].clone();
780 // fixup the goodbye table offsets to be relative and with the right endianess
781 item.offset = goodbye_offset - item.offset;
782 unsafe {
783 std::ptr::write(&mut bst[dest], item.to_le());
784 }
785 });
786 drop(tail);
787
788 bst.push(
789 GoodbyeItem {
790 hash: format::PXAR_GOODBYE_TAIL_MARKER,
791 offset: goodbye_offset - self.state.entry_offset,
792 size: goodbye_size,
793 }
794 .to_le(),
795 );
796
797 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
798 // the items make sense:
799 let data = bst.as_mut_ptr() as *mut u8;
800 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
801 forget(bst);
802 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
803 }
804 }
805
806 /// Writer for a file object in a directory.
807 pub(crate) struct FileImpl<'a, S: SeqWrite> {
808 output: &'a mut S,
809
810 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
811 /// directly instead of on Drop of FileImpl?
812 goodbye_item: GoodbyeItem,
813
814 /// While writing data to this file, this is how much space we still have left, this must reach
815 /// exactly zero.
816 remaining_size: u64,
817
818 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
819 /// to, and where we insert our `GoodbyeItem`.
820 parent: &'a mut EncoderState,
821 }
822
823 impl<'a, S: SeqWrite> Drop for FileImpl<'a, S> {
824 fn drop(&mut self) {
825 if self.remaining_size != 0 {
826 self.parent.add_error(EncodeError::IncompleteFile);
827 }
828
829 self.parent.items.push(self.goodbye_item.clone());
830 }
831 }
832
833 impl<'a, S: SeqWrite> FileImpl<'a, S> {
834 /// Get the file offset to be able to reference it with `add_hardlink`.
835 pub fn file_offset(&self) -> LinkOffset {
836 LinkOffset(self.goodbye_item.offset)
837 }
838
839 fn check_remaining(&self, size: usize) -> io::Result<()> {
840 if size as u64 > self.remaining_size {
841 io_bail!("attempted to write more than previously allocated");
842 } else {
843 Ok(())
844 }
845 }
846
847 /// Poll write interface to more easily connect to tokio/futures.
848 #[cfg(feature = "tokio-io")]
849 pub fn poll_write(
850 self: Pin<&mut Self>,
851 cx: &mut Context,
852 data: &[u8],
853 ) -> Poll<io::Result<usize>> {
854 let this = self.get_mut();
855 this.check_remaining(data.len())?;
856 let output = unsafe { Pin::new_unchecked(&mut *this.output) };
857 match output.poll_seq_write(cx, data) {
858 Poll::Ready(Ok(put)) => {
859 this.remaining_size -= put as u64;
860 this.parent.write_position += put as u64;
861 Poll::Ready(Ok(put))
862 }
863 other => other,
864 }
865 }
866
867 /// Poll flush interface to more easily connect to tokio/futures.
868 #[cfg(feature = "tokio-io")]
869 pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
870 unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
871 }
872
873 /// Poll close/shutdown interface to more easily connect to tokio/futures.
874 ///
875 /// This just calls flush, though, since we're just a virtual writer writing to the file
876 /// provided by our encoder.
877 #[cfg(feature = "tokio-io")]
878 pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
879 unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
880 }
881
882 /// Write file data for the current file entry in a pxar archive.
883 ///
884 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
885 /// requested. Check the return value for how many. There's also a `write_all` method available
886 /// for convenience.
887 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
888 self.check_remaining(data.len())?;
889 let put =
890 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
891 .await?;
892 //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
893 self.remaining_size -= put as u64;
894 self.parent.write_position += put as u64;
895 Ok(put)
896 }
897
898 /// Completely write file data for the current file entry in a pxar archive.
899 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
900 self.check_remaining(data.len())?;
901 seq_write_all(self.output, data, &mut self.parent.write_position).await?;
902 self.remaining_size -= data.len() as u64;
903 Ok(())
904 }
905 }
906
/// `tokio` integration: every method forwards to the inherent `FileImpl` poll method.
#[cfg(feature = "tokio-io")]
impl<'a, S: SeqWrite> tokio::io::AsyncWrite for FileImpl<'a, S> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        data: &[u8],
    ) -> Poll<io::Result<usize>> {
        FileImpl::poll_write(self, cx, data)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_flush(self, cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        // shutting down is the same as closing, which in turn only flushes
        FileImpl::poll_close(self, cx)
    }
}
920 }