]> git.proxmox.com Git - pxar.git/blob - src/encoder/mod.rs
encoder: flush after writing last entry
[pxar.git] / src / encoder / mod.rs
1 //! The `pxar` encoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 use std::io;
6 use std::mem::{forget, size_of, size_of_val, take};
7 use std::os::unix::ffi::OsStrExt;
8 use std::path::Path;
9 use std::pin::Pin;
10 use std::sync::{Arc, Mutex};
11 use std::task::{Context, Poll};
12
13 use endian_trait::Endian;
14
15 use crate::binary_tree_array;
16 use crate::decoder::{self, SeqRead};
17 use crate::format::{self, GoodbyeItem};
18 use crate::poll_fn::poll_fn;
19 use crate::Metadata;
20
21 pub mod aio;
22 pub mod sync;
23
24 #[doc(inline)]
25 pub use sync::Encoder;
26
/// Reference to a file entry, used to create hard links.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct LinkOffset(u64);

impl LinkOffset {
    /// Returns the wrapped offset as a plain `u64`.
    #[inline]
    pub fn raw(self) -> u64 {
        let LinkOffset(offset) = self;
        offset
    }
}
37
/// Sequential write interface used by the encoder's state machine.
///
/// This is our internal writer trait which is available for `std::io::Write` types in the
/// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
/// wrapper.
pub trait SeqWrite {
    /// Attempt to write bytes from `buf` to the output.
    ///
    /// A `Poll::Ready(Ok(n))` result reports how many bytes were accepted. Callers in this
    /// module advance by `n` and retry with the rest, so `n` may be smaller than `buf.len()`.
    fn poll_seq_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Attempt to flush the output.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
}
52
53 /// Allow using trait objects for generics taking a `SeqWrite`.
54 impl<S> SeqWrite for &mut S
55 where
56 S: SeqWrite + ?Sized,
57 {
58 fn poll_seq_write(
59 self: Pin<&mut Self>,
60 cx: &mut Context,
61 buf: &[u8],
62 ) -> Poll<io::Result<usize>> {
63 unsafe {
64 self.map_unchecked_mut(|this| &mut **this)
65 .poll_seq_write(cx, buf)
66 }
67 }
68
69 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
70 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
71 }
72 }
73
74 /// awaitable verison of `poll_seq_write`.
75 async fn seq_write<T: SeqWrite + ?Sized>(
76 output: &mut T,
77 buf: &[u8],
78 position: &mut u64,
79 ) -> io::Result<usize> {
80 let put =
81 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await?;
82 *position += put as u64;
83 Ok(put)
84 }
85
86 /// awaitable version of 'poll_flush'.
87 async fn flush<T: SeqWrite + ?Sized>(
88 output: &mut T,
89 ) -> io::Result<()> {
90 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_flush(cx) }).await
91 }
92
93 /// Write the entire contents of a buffer, handling short writes.
94 async fn seq_write_all<T: SeqWrite + ?Sized>(
95 output: &mut T,
96 mut buf: &[u8],
97 position: &mut u64,
98 ) -> io::Result<()> {
99 while !buf.is_empty() {
100 let got = seq_write(&mut *output, buf, &mut *position).await?;
101 buf = &buf[got..];
102 }
103 Ok(())
104 }
105
106 /// Write an endian-swappable struct.
107 async fn seq_write_struct<E: Endian, T>(
108 output: &mut T,
109 data: E,
110 position: &mut u64,
111 ) -> io::Result<()>
112 where
113 T: SeqWrite + ?Sized,
114 {
115 let data = data.to_le();
116 let buf =
117 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
118 seq_write_all(output, buf, position).await
119 }
120
121 /// Write a pxar entry.
122 async fn seq_write_pxar_entry<T>(
123 output: &mut T,
124 htype: u64,
125 data: &[u8],
126 position: &mut u64,
127 ) -> io::Result<()>
128 where
129 T: SeqWrite + ?Sized,
130 {
131 let header = format::Header::with_content_size(htype, data.len() as u64);
132 header.check_header_size()?;
133
134 seq_write_struct(&mut *output, header, &mut *position).await?;
135 seq_write_all(output, data, position).await
136 }
137
138 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
139 /// data buffer.
140 async fn seq_write_pxar_entry_zero<T>(
141 output: &mut T,
142 htype: u64,
143 data: &[u8],
144 position: &mut u64,
145 ) -> io::Result<()>
146 where
147 T: SeqWrite + ?Sized,
148 {
149 let header = format::Header::with_content_size(htype, 1 + data.len() as u64);
150 header.check_header_size()?;
151
152 seq_write_struct(&mut *output, header, &mut *position).await?;
153 seq_write_all(&mut *output, data, &mut *position).await?;
154 seq_write_all(output, &[0u8], position).await
155 }
156
157 /// Write a pxar entry consiting of an endian-swappable struct.
158 async fn seq_write_pxar_struct_entry<E, T>(
159 output: &mut T,
160 htype: u64,
161 data: E,
162 position: &mut u64,
163 ) -> io::Result<()>
164 where
165 T: SeqWrite + ?Sized,
166 E: Endian,
167 {
168 let data = data.to_le();
169 let buf =
170 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
171 seq_write_pxar_entry(output, htype, buf, position).await
172 }
173
/// Error conditions caused by wrong usage of this crate.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EncodeError {
    /// The user dropped a `File` without finishing writing all of its contents.
    ///
    /// This is required because the payload length is written out at the beginning and decoding
    /// requires the right amount of data to follow.
    IncompleteFile,

    /// The user dropped a directory without finalizing it.
    ///
    /// Finalizing is required to build the goodbye table at the end of a directory.
    IncompleteDirectory,
}
188
/// Bookkeeping for one directory while it is being encoded.
#[derive(Default)]
struct EncoderState {
    /// Goodbye items for this directory, excluding the tail.
    items: Vec<GoodbyeItem>,

    /// User caused error conditions. Only the first error is kept.
    encode_error: Option<EncodeError>,

    /// Offset of this directory's ENTRY.
    entry_offset: u64,

    /// Offset to this directory's first FILENAME.
    files_offset: u64,

    /// If this is a subdirectory, this points to this directory's FILENAME.
    file_offset: Option<u64>,

    /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
    file_hash: u64,

    /// We need to keep track of how much we have written to get offsets.
    write_position: u64,
}
212
213 impl EncoderState {
214 fn merge_error(&mut self, error: Option<EncodeError>) {
215 // one error is enough:
216 if self.encode_error.is_none() {
217 self.encode_error = error;
218 }
219 }
220
221 fn add_error(&mut self, error: EncodeError) {
222 self.merge_error(Some(error));
223 }
224 }
225
/// The output of an encoder: either owned by it or borrowed from somewhere else.
pub(crate) enum EncoderOutput<'a, T> {
    Owned(T),
    Borrowed(&'a mut T),
}

impl<'a, T> EncoderOutput<'a, T> {
    /// Produce a `Borrowed` handle to the same underlying output, valid for as long as `self`
    /// is mutably borrowed.
    #[inline]
    fn to_borrowed<'s>(&'s mut self) -> EncoderOutput<'s, T>
    where
        'a: 's,
    {
        match self {
            EncoderOutput::Owned(inner) => EncoderOutput::Borrowed(inner),
            EncoderOutput::Borrowed(inner) => EncoderOutput::Borrowed(&mut **inner),
        }
    }
}

impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
    /// Access the underlying output regardless of how it is held.
    fn as_mut(&mut self) -> &mut T {
        match self {
            Self::Owned(inner) => inner,
            Self::Borrowed(inner) => &mut **inner,
        }
    }
}

impl<'a, T> std::convert::From<T> for EncoderOutput<'a, T> {
    /// Wrap an owned output.
    fn from(owned: T) -> Self {
        Self::Owned(owned)
    }
}

impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
    /// Wrap a borrowed output.
    fn from(borrowed: &'a mut T) -> Self {
        Self::Borrowed(borrowed)
    }
}
261
/// The encoder state machine implementation for a directory.
///
/// We use `async fn` to implement the encoder state machine so that we can easily plug in both
/// synchronous or `async` I/O objects in as output.
pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
    /// Where the archive data is written to.
    output: EncoderOutput<'a, T>,

    /// Bookkeeping for the directory this encoder represents.
    state: EncoderState,

    /// State of the directory containing this one; `None` for an encoder created via `new`.
    parent: Option<&'a mut EncoderState>,

    /// Set once `finish` has completed successfully; checked on drop.
    finished: bool,

    /// Since only the "current" entry can be actively writing files, we share the file copy
    /// buffer.
    file_copy_buffer: Arc<Mutex<Vec<u8>>>,
}
276
277 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
278 fn drop(&mut self) {
279 if let Some(ref mut parent) = self.parent {
280 // propagate errors:
281 parent.merge_error(self.state.encode_error);
282 if !self.finished {
283 parent.add_error(EncodeError::IncompleteDirectory);
284 }
285 } else if !self.finished {
286 // FIXME: how do we deal with this?
287 // eprintln!("Encoder dropped without finishing!");
288 }
289 }
290 }
291
292 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
293 pub async fn new(
294 output: EncoderOutput<'a, T>,
295 metadata: &Metadata,
296 ) -> io::Result<EncoderImpl<'a, T>> {
297 if !metadata.is_dir() {
298 io_bail!("directory metadata must contain the directory mode flag");
299 }
300 let mut this = Self {
301 output,
302 state: EncoderState::default(),
303 parent: None,
304 finished: false,
305 file_copy_buffer: Arc::new(Mutex::new(crate::util::vec_new(1024 * 1024))),
306 };
307
308 this.encode_metadata(metadata).await?;
309 this.state.files_offset = this.position();
310
311 Ok(this)
312 }
313
314 fn check(&self) -> io::Result<()> {
315 match self.state.encode_error {
316 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
317 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
318 None => Ok(()),
319 }
320 }
321
322 pub async fn create_file<'b>(
323 &'b mut self,
324 metadata: &Metadata,
325 file_name: &Path,
326 file_size: u64,
327 ) -> io::Result<FileImpl<'b, T>>
328 where
329 'a: 'b,
330 {
331 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
332 .await
333 }
334
335 async fn create_file_do<'b>(
336 &'b mut self,
337 metadata: &Metadata,
338 file_name: &[u8],
339 file_size: u64,
340 ) -> io::Result<FileImpl<'b, T>>
341 where
342 'a: 'b,
343 {
344 self.check()?;
345
346 let file_offset = self.position();
347 self.start_file_do(Some(metadata), file_name).await?;
348
349 let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
350 header.check_header_size()?;
351
352 seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
353
354 let payload_data_offset = self.position();
355
356 let meta_size = payload_data_offset - file_offset;
357
358 Ok(FileImpl {
359 output: self.output.as_mut(),
360 goodbye_item: GoodbyeItem {
361 hash: format::hash_filename(file_name),
362 offset: file_offset,
363 size: file_size + meta_size,
364 },
365 remaining_size: file_size,
366 parent: &mut self.state,
367 })
368 }
369
370 /// Return a file offset usable with `add_hardlink`.
371 pub async fn add_file(
372 &mut self,
373 metadata: &Metadata,
374 file_name: &Path,
375 file_size: u64,
376 content: &mut dyn SeqRead,
377 ) -> io::Result<LinkOffset> {
378 let buf = Arc::clone(&self.file_copy_buffer);
379 let mut file = self.create_file(metadata, file_name, file_size).await?;
380 let mut buf = buf.lock().expect("failed to lock temporary buffer mutex");
381 loop {
382 let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
383 if got == 0 {
384 break;
385 } else {
386 file.write_all(&buf[..got]).await?;
387 }
388 }
389 Ok(file.file_offset())
390 }
391
392 /// Return a file offset usable with `add_hardlink`.
393 pub async fn add_symlink(
394 &mut self,
395 metadata: &Metadata,
396 file_name: &Path,
397 target: &Path,
398 ) -> io::Result<()> {
399 let target = target.as_os_str().as_bytes();
400 let mut data = Vec::with_capacity(target.len() + 1);
401 data.extend(target);
402 data.push(0);
403 let _ofs: LinkOffset = self
404 .add_file_entry(
405 Some(metadata),
406 file_name,
407 Some((format::PXAR_SYMLINK, &data)),
408 )
409 .await?;
410 Ok(())
411 }
412
413 /// Return a file offset usable with `add_hardlink`.
414 pub async fn add_hardlink(
415 &mut self,
416 file_name: &Path,
417 target: &Path,
418 target_offset: LinkOffset,
419 ) -> io::Result<()> {
420 let current_offset = self.position();
421 if current_offset <= target_offset.0 {
422 io_bail!("invalid hardlink offset, can only point to prior files");
423 }
424
425 let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
426 let target_bytes = target.as_os_str().as_bytes();
427 let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len() + 1);
428 hardlink.extend(&offset_bytes);
429 hardlink.extend(target_bytes);
430 hardlink.push(0);
431 let _this_offset: LinkOffset = self
432 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
433 .await?;
434 Ok(())
435 }
436
437 /// Return a file offset usable with `add_hardlink`.
438 pub async fn add_device(
439 &mut self,
440 metadata: &Metadata,
441 file_name: &Path,
442 device: format::Device,
443 ) -> io::Result<()> {
444 if !metadata.is_device() {
445 io_bail!("entry added via add_device must have a device mode in its metadata");
446 }
447
448 let device = device.to_le();
449 let device = unsafe {
450 std::slice::from_raw_parts(
451 &device as *const format::Device as *const u8,
452 size_of::<format::Device>(),
453 )
454 };
455 let _ofs: LinkOffset = self
456 .add_file_entry(
457 Some(metadata),
458 file_name,
459 Some((format::PXAR_DEVICE, device)),
460 )
461 .await?;
462 Ok(())
463 }
464
465 /// Return a file offset usable with `add_hardlink`.
466 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
467 if !metadata.is_fifo() {
468 io_bail!("entry added via add_device must be of type fifo in its metadata");
469 }
470
471 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
472 Ok(())
473 }
474
475 /// Return a file offset usable with `add_hardlink`.
476 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
477 if !metadata.is_socket() {
478 io_bail!("entry added via add_device must be of type socket in its metadata");
479 }
480
481 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
482 Ok(())
483 }
484
485 /// Return a file offset usable with `add_hardlink`.
486 async fn add_file_entry(
487 &mut self,
488 metadata: Option<&Metadata>,
489 file_name: &Path,
490 entry_htype_data: Option<(u64, &[u8])>,
491 ) -> io::Result<LinkOffset> {
492 self.check()?;
493
494 let file_offset = self.position();
495
496 let file_name = file_name.as_os_str().as_bytes();
497
498 self.start_file_do(metadata, file_name).await?;
499 if let Some((htype, entry_data)) = entry_htype_data {
500 seq_write_pxar_entry(
501 self.output.as_mut(),
502 htype,
503 entry_data,
504 &mut self.state.write_position,
505 )
506 .await?;
507 }
508
509 let end_offset = self.position();
510
511 self.state.items.push(GoodbyeItem {
512 hash: format::hash_filename(file_name),
513 offset: file_offset,
514 size: end_offset - file_offset,
515 });
516
517 Ok(LinkOffset(file_offset))
518 }
519
520 #[inline]
521 fn position(&mut self) -> u64 {
522 self.state.write_position
523 }
524
525 pub async fn create_directory(
526 &mut self,
527 file_name: &Path,
528 metadata: &Metadata,
529 ) -> io::Result<EncoderImpl<'_, T>> {
530 self.check()?;
531
532 if !metadata.is_dir() {
533 io_bail!("directory metadata must contain the directory mode flag");
534 }
535
536 let file_name = file_name.as_os_str().as_bytes();
537 let file_hash = format::hash_filename(file_name);
538
539 let file_offset = self.position();
540 self.encode_filename(file_name).await?;
541
542 let entry_offset = self.position();
543 self.encode_metadata(&metadata).await?;
544
545 let files_offset = self.position();
546
547 // the child will write to OUR state now:
548 let write_position = self.position();
549
550 let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
551
552 Ok(EncoderImpl {
553 // always forward as Borrowed(), to avoid stacking references on nested calls
554 output: self.output.to_borrowed(),
555 state: EncoderState {
556 entry_offset,
557 files_offset,
558 file_offset: Some(file_offset),
559 file_hash,
560 write_position,
561 ..Default::default()
562 },
563 parent: Some(&mut self.state),
564 finished: false,
565 file_copy_buffer,
566 })
567 }
568
569 async fn start_file_do(
570 &mut self,
571 metadata: Option<&Metadata>,
572 file_name: &[u8],
573 ) -> io::Result<()> {
574 self.encode_filename(file_name).await?;
575 if let Some(metadata) = metadata {
576 self.encode_metadata(&metadata).await?;
577 }
578 Ok(())
579 }
580
581 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
582 seq_write_pxar_struct_entry(
583 self.output.as_mut(),
584 format::PXAR_ENTRY,
585 metadata.stat.clone(),
586 &mut self.state.write_position,
587 )
588 .await?;
589
590 for xattr in &metadata.xattrs {
591 self.write_xattr(xattr).await?;
592 }
593
594 self.write_acls(&metadata.acl).await?;
595
596 if let Some(fcaps) = &metadata.fcaps {
597 self.write_file_capabilities(fcaps).await?;
598 }
599
600 if let Some(qpid) = &metadata.quota_project_id {
601 self.write_quota_project_id(qpid).await?;
602 }
603
604 Ok(())
605 }
606
607 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
608 seq_write_pxar_entry(
609 self.output.as_mut(),
610 format::PXAR_XATTR,
611 &xattr.data,
612 &mut self.state.write_position,
613 )
614 .await
615 }
616
617 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
618 for acl in &acl.users {
619 seq_write_pxar_struct_entry(
620 self.output.as_mut(),
621 format::PXAR_ACL_USER,
622 acl.clone(),
623 &mut self.state.write_position,
624 )
625 .await?;
626 }
627
628 for acl in &acl.groups {
629 seq_write_pxar_struct_entry(
630 self.output.as_mut(),
631 format::PXAR_ACL_GROUP,
632 acl.clone(),
633 &mut self.state.write_position,
634 )
635 .await?;
636 }
637
638 if let Some(acl) = &acl.group_obj {
639 seq_write_pxar_struct_entry(
640 self.output.as_mut(),
641 format::PXAR_ACL_GROUP_OBJ,
642 acl.clone(),
643 &mut self.state.write_position,
644 )
645 .await?;
646 }
647
648 if let Some(acl) = &acl.default {
649 seq_write_pxar_struct_entry(
650 self.output.as_mut(),
651 format::PXAR_ACL_DEFAULT,
652 acl.clone(),
653 &mut self.state.write_position,
654 )
655 .await?;
656 }
657
658 for acl in &acl.default_users {
659 seq_write_pxar_struct_entry(
660 self.output.as_mut(),
661 format::PXAR_ACL_DEFAULT_USER,
662 acl.clone(),
663 &mut self.state.write_position,
664 )
665 .await?;
666 }
667
668 for acl in &acl.default_groups {
669 seq_write_pxar_struct_entry(
670 self.output.as_mut(),
671 format::PXAR_ACL_DEFAULT_GROUP,
672 acl.clone(),
673 &mut self.state.write_position,
674 )
675 .await?;
676 }
677
678 Ok(())
679 }
680
681 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
682 seq_write_pxar_entry(
683 self.output.as_mut(),
684 format::PXAR_FCAPS,
685 &fcaps.data,
686 &mut self.state.write_position,
687 )
688 .await
689 }
690
691 async fn write_quota_project_id(
692 &mut self,
693 quota_project_id: &format::QuotaProjectId,
694 ) -> io::Result<()> {
695 seq_write_pxar_struct_entry(
696 self.output.as_mut(),
697 format::PXAR_QUOTA_PROJID,
698 *quota_project_id,
699 &mut self.state.write_position,
700 )
701 .await
702 }
703
704 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
705 crate::util::validate_filename(file_name)?;
706 seq_write_pxar_entry_zero(
707 self.output.as_mut(),
708 format::PXAR_FILENAME,
709 file_name,
710 &mut self.state.write_position,
711 )
712 .await
713 }
714
715 pub async fn finish(mut self) -> io::Result<()> {
716 let tail_bytes = self.finish_goodbye_table().await?;
717 seq_write_pxar_entry(
718 self.output.as_mut(),
719 format::PXAR_GOODBYE,
720 &tail_bytes,
721 &mut self.state.write_position,
722 )
723 .await?;
724
725 flush(self.output.as_mut()).await?;
726
727 // done up here because of the self-borrow and to propagate
728 let end_offset = self.position();
729
730 if let Some(parent) = &mut self.parent {
731 parent.write_position = end_offset;
732
733 let file_offset = self
734 .state
735 .file_offset
736 .expect("internal error: parent set but no file_offset?");
737
738 parent.items.push(GoodbyeItem {
739 hash: self.state.file_hash,
740 offset: file_offset,
741 size: end_offset - file_offset,
742 });
743 }
744 self.finished = true;
745 Ok(())
746 }
747
748 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
749 let goodbye_offset = self.position();
750
751 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
752 let mut tail = take(&mut self.state.items);
753 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
754 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
755
756 // sort, then create a BST
757 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
758
759 let mut bst = Vec::with_capacity(tail.len() + 1);
760 unsafe {
761 bst.set_len(tail.len());
762 }
763 binary_tree_array::copy(tail.len(), |src, dest| {
764 let mut item = tail[src].clone();
765 // fixup the goodbye table offsets to be relative and with the right endianess
766 item.offset = goodbye_offset - item.offset;
767 unsafe {
768 std::ptr::write(&mut bst[dest], item.to_le());
769 }
770 });
771 drop(tail);
772
773 bst.push(
774 GoodbyeItem {
775 hash: format::PXAR_GOODBYE_TAIL_MARKER,
776 offset: goodbye_offset - self.state.entry_offset,
777 size: goodbye_size,
778 }
779 .to_le(),
780 );
781
782 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
783 // the items make sense:
784 let data = bst.as_mut_ptr() as *mut u8;
785 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
786 forget(bst);
787 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
788 }
789 }
790
/// Writer for a file object in a directory.
///
/// When dropped, the file's `GoodbyeItem` is added to the containing directory's state, and an
/// `IncompleteFile` error is recorded there if not all announced bytes were written.
pub struct FileImpl<'a, S: SeqWrite> {
    /// The output the file's payload is written to.
    output: &'a mut S,

    /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
    /// directly instead of on Drop of FileImpl?
    goodbye_item: GoodbyeItem,

    /// While writing data to this file, this is how much space we still have left, this must reach
    /// exactly zero.
    remaining_size: u64,

    /// The directory containing this file. This is where we propagate the `IncompleteFile` error
    /// to, and where we insert our `GoodbyeItem`.
    parent: &'a mut EncoderState,
}
807
808 impl<'a, S: SeqWrite> Drop for FileImpl<'a, S> {
809 fn drop(&mut self) {
810 if self.remaining_size != 0 {
811 self.parent.add_error(EncodeError::IncompleteFile);
812 }
813
814 self.parent.items.push(self.goodbye_item.clone());
815 }
816 }
817
818 impl<'a, S: SeqWrite> FileImpl<'a, S> {
819 /// Get the file offset to be able to reference it with `add_hardlink`.
820 pub fn file_offset(&self) -> LinkOffset {
821 LinkOffset(self.goodbye_item.offset)
822 }
823
824 fn check_remaining(&self, size: usize) -> io::Result<()> {
825 if size as u64 > self.remaining_size {
826 io_bail!("attempted to write more than previously allocated");
827 } else {
828 Ok(())
829 }
830 }
831
/// Poll write interface to more easily connect to tokio/futures.
///
/// Fails if `data` is larger than the remaining announced size. On a successful write the
/// remaining size and the parent's write position are updated by the number of bytes written.
#[cfg(feature = "tokio-io")]
pub fn poll_write(
    self: Pin<&mut Self>,
    cx: &mut Context,
    data: &[u8],
) -> Poll<io::Result<usize>> {
    let this = self.get_mut();
    this.check_remaining(data.len())?;

    // NOTE(review): the output is pinned in place only for the duration of this call.
    let output = unsafe { Pin::new_unchecked(&mut *this.output) };
    let result = output.poll_seq_write(cx, data);

    if let Poll::Ready(Ok(put)) = &result {
        let put = *put as u64;
        this.remaining_size -= put;
        this.parent.write_position += put;
    }
    result
}
851
/// Poll flush interface to more easily connect to tokio/futures.
#[cfg(feature = "tokio-io")]
pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
    // Project the pin onto the underlying output; the closure only reborrows it.
    let output = unsafe { self.map_unchecked_mut(|this| &mut *this.output) };
    output.poll_flush(cx)
}
857
/// Poll close/shutdown interface to more easily connect to tokio/futures.
///
/// This just flushes the underlying output, since we're just a virtual writer writing to the
/// file provided by our encoder.
#[cfg(feature = "tokio-io")]
pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
    // Project the pin onto the underlying output; the closure only reborrows it.
    let output = unsafe { self.map_unchecked_mut(|this| &mut *this.output) };
    output.poll_flush(cx)
}
866
867 /// Write file data for the current file entry in a pxar archive.
868 ///
869 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
870 /// requested. Check the return value for how many. There's also a `write_all` method available
871 /// for convenience.
872 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
873 self.check_remaining(data.len())?;
874 let put =
875 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
876 .await?;
877 //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
878 self.remaining_size -= put as u64;
879 self.parent.write_position += put as u64;
880 Ok(put)
881 }
882
883 /// Completely write file data for the current file entry in a pxar archive.
884 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
885 self.check_remaining(data.len())?;
886 seq_write_all(self.output, data, &mut self.parent.write_position).await?;
887 self.remaining_size -= data.len() as u64;
888 Ok(())
889 }
890 }
891
/// `tokio::io::AsyncWrite` adapter; every method forwards to the inherent `FileImpl` method of
/// the corresponding purpose.
#[cfg(feature = "tokio-io")]
impl<'a, S: SeqWrite> tokio::io::AsyncWrite for FileImpl<'a, S> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
        FileImpl::poll_write(self, cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_flush(self, cx)
    }

    // Shutdown maps to `poll_close`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_close(self, cx)
    }
}