]> git.proxmox.com Git - pxar.git/blob - src/encoder/mod.rs
remove futures-io feature
[pxar.git] / src / encoder / mod.rs
1 //! The `pxar` encoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 use std::cell::RefCell;
6 use std::io;
7 use std::mem::{forget, size_of, size_of_val, take};
8 use std::os::unix::ffi::OsStrExt;
9 use std::path::Path;
10 use std::pin::Pin;
11 use std::rc::Rc;
12 use std::task::{Context, Poll};
13
14 use endian_trait::Endian;
15
16 use crate::binary_tree_array;
17 use crate::decoder::{self, SeqRead};
18 use crate::format::{self, GoodbyeItem};
19 use crate::poll_fn::poll_fn;
20 use crate::Metadata;
21
22 pub mod aio;
23 pub mod sync;
24
25 #[doc(inline)]
26 pub use sync::Encoder;
27
/// File reference used to create hard links.
///
/// This is a thin wrapper around a `u64` archive offset. It is `Copy`, ordered and hashable, so
/// it can be stored in maps or sets keyed by the referenced file.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Ord, PartialOrd)]
pub struct LinkOffset(u64);

impl LinkOffset {
    /// Get the raw offset value wrapped by this reference.
    #[inline]
    pub fn raw(self) -> u64 {
        self.0
    }
}
38
/// Sequential write interface used by the encoder's state machine.
///
/// This is the crate-internal writer abstraction. The synchronous wrapper provides it for
/// `std::io::Write` types and the asynchronous wrapper provides it for `AsyncWrite` types.
pub trait SeqWrite {
    /// Try to write some bytes of `buf`, returning how many bytes were accepted.
    fn poll_seq_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Try to flush the underlying output.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;

    /// Turn a concrete writer into a `dyn SeqWrite` reference.
    ///
    /// This helper avoids recursive borrowing when nesting into subdirectories: without it a
    /// subdirectory would get a trait object pointing to `T`, a nested one would get a trait
    /// object pointing to that trait object, and so on.
    fn as_trait_object(&mut self) -> &mut dyn SeqWrite
    where
        Self: Sized,
    {
        let object: &mut dyn SeqWrite = self;
        object
    }
}
63
64 /// Allow using trait objects for generics taking a `SeqWrite`.
65 impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
66 fn poll_seq_write(
67 self: Pin<&mut Self>,
68 cx: &mut Context,
69 buf: &[u8],
70 ) -> Poll<io::Result<usize>> {
71 unsafe {
72 self.map_unchecked_mut(|this| &mut **this)
73 .poll_seq_write(cx, buf)
74 }
75 }
76
77 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
78 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
79 }
80
81 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
82 where
83 Self: Sized,
84 {
85 &mut **self
86 }
87 }
88
89 /// awaitable verison of `poll_seq_write`.
90 async fn seq_write<T: SeqWrite + ?Sized>(
91 output: &mut T,
92 buf: &[u8],
93 position: &mut u64,
94 ) -> io::Result<usize> {
95 let put =
96 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await?;
97 *position += put as u64;
98 Ok(put)
99 }
100
101 /// Write the entire contents of a buffer, handling short writes.
102 async fn seq_write_all<T: SeqWrite + ?Sized>(
103 output: &mut T,
104 mut buf: &[u8],
105 position: &mut u64,
106 ) -> io::Result<()> {
107 while !buf.is_empty() {
108 let got = seq_write(&mut *output, buf, &mut *position).await?;
109 buf = &buf[got..];
110 }
111 Ok(())
112 }
113
114 /// Write an endian-swappable struct.
115 async fn seq_write_struct<E: Endian, T>(
116 output: &mut T,
117 data: E,
118 position: &mut u64,
119 ) -> io::Result<()>
120 where
121 T: SeqWrite + ?Sized,
122 {
123 let data = data.to_le();
124 seq_write_all(
125 output,
126 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) },
127 position,
128 )
129 .await
130 }
131
132 /// Write a pxar entry.
133 async fn seq_write_pxar_entry<T>(
134 output: &mut T,
135 htype: u64,
136 data: &[u8],
137 position: &mut u64,
138 ) -> io::Result<()>
139 where
140 T: SeqWrite + ?Sized,
141 {
142 let header = format::Header::with_content_size(htype, data.len() as u64);
143 header.check_header_size()?;
144
145 seq_write_struct(&mut *output, header, &mut *position).await?;
146 seq_write_all(output, data, position).await
147 }
148
149 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
150 /// data buffer.
151 async fn seq_write_pxar_entry_zero<T>(
152 output: &mut T,
153 htype: u64,
154 data: &[u8],
155 position: &mut u64,
156 ) -> io::Result<()>
157 where
158 T: SeqWrite + ?Sized,
159 {
160 let header = format::Header::with_content_size(htype, 1 + data.len() as u64);
161 header.check_header_size()?;
162
163 seq_write_struct(&mut *output, header, &mut *position).await?;
164 seq_write_all(&mut *output, data, &mut *position).await?;
165 seq_write_all(output, &[0u8], position).await
166 }
167
168 /// Write a pxar entry consiting of an endian-swappable struct.
169 async fn seq_write_pxar_struct_entry<E, T>(
170 output: &mut T,
171 htype: u64,
172 data: E,
173 position: &mut u64,
174 ) -> io::Result<()>
175 where
176 T: SeqWrite + ?Sized,
177 E: Endian,
178 {
179 let data = data.to_le();
180 seq_write_pxar_entry(
181 output,
182 htype,
183 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) },
184 position,
185 )
186 .await
187 }
188
/// Error conditions caused by wrong usage of this crate.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be reported or wrapped
/// like any other error value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EncodeError {
    /// The user dropped a `File` without finishing writing all of its contents.
    ///
    /// This is an error because the payload length is written out at the beginning, and decoding
    /// requires the right amount of data to follow.
    IncompleteFile,

    /// The user dropped a directory without finalizing it.
    ///
    /// Finalizing is required to build the goodbye table at the end of a directory.
    IncompleteDirectory,
}

impl std::fmt::Display for EncodeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same wording as the I/O errors the encoder produces for these conditions.
        f.write_str(match self {
            EncodeError::IncompleteFile => "incomplete file",
            EncodeError::IncompleteDirectory => "directory not finalized",
        })
    }
}

impl std::error::Error for EncodeError {}
203
204 #[derive(Default)]
205 struct EncoderState {
206 /// Goodbye items for this directory, excluding the tail.
207 items: Vec<GoodbyeItem>,
208
209 /// User caused error conditions.
210 encode_error: Option<EncodeError>,
211
212 /// Offset of this directory's ENTRY.
213 entry_offset: u64,
214
215 /// Offset to this directory's first FILENAME.
216 files_offset: u64,
217
218 /// If this is a subdirectory, this points to the this directory's FILENAME.
219 file_offset: Option<u64>,
220
221 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
222 file_hash: u64,
223
224 /// We need to keep track how much we have written to get offsets.
225 write_position: u64,
226 }
227
228 impl EncoderState {
229 fn merge_error(&mut self, error: Option<EncodeError>) {
230 // one error is enough:
231 if self.encode_error.is_none() {
232 self.encode_error = error;
233 }
234 }
235
236 fn add_error(&mut self, error: EncodeError) {
237 self.merge_error(Some(error));
238 }
239 }
240
241 /// The encoder state machine implementation for a directory.
242 ///
243 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
244 /// synchronous or `async` I/O objects in as output.
245 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
246 output: Option<T>,
247 state: EncoderState,
248 parent: Option<&'a mut EncoderState>,
249 finished: bool,
250
251 /// Since only the "current" entry can be actively writing files, we share the file copy
252 /// buffer.
253 file_copy_buffer: Rc<RefCell<Vec<u8>>>,
254 }
255
256 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
257 fn drop(&mut self) {
258 if let Some(ref mut parent) = self.parent {
259 // propagate errors:
260 parent.merge_error(self.state.encode_error);
261 if !self.finished {
262 parent.add_error(EncodeError::IncompleteDirectory);
263 }
264 } else if !self.finished {
265 // FIXME: how do we deal with this?
266 // eprintln!("Encoder dropped without finishing!");
267 }
268 }
269 }
270
271 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
272 pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
273 if !metadata.is_dir() {
274 io_bail!("directory metadata must contain the directory mode flag");
275 }
276 let mut this = Self {
277 output: Some(output),
278 state: EncoderState::default(),
279 parent: None,
280 finished: false,
281 file_copy_buffer: Rc::new(RefCell::new(crate::util::vec_new(1024 * 1024))),
282 };
283
284 this.encode_metadata(metadata).await?;
285 this.state.files_offset = this.position();
286
287 Ok(this)
288 }
289
290 fn check(&self) -> io::Result<()> {
291 match self.state.encode_error {
292 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
293 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
294 None => Ok(()),
295 }
296 }
297
298 pub async fn create_file<'b>(
299 &'b mut self,
300 metadata: &Metadata,
301 file_name: &Path,
302 file_size: u64,
303 ) -> io::Result<FileImpl<'b>>
304 where
305 'a: 'b,
306 {
307 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
308 .await
309 }
310
311 async fn create_file_do<'b>(
312 &'b mut self,
313 metadata: &Metadata,
314 file_name: &[u8],
315 file_size: u64,
316 ) -> io::Result<FileImpl<'b>>
317 where
318 'a: 'b,
319 {
320 self.check()?;
321
322 let file_offset = self.position();
323 self.start_file_do(Some(metadata), file_name).await?;
324
325 let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
326 header.check_header_size()?;
327
328 seq_write_struct(
329 self.output.as_mut().unwrap(),
330 header,
331 &mut self.state.write_position,
332 )
333 .await?;
334
335 let payload_data_offset = self.position();
336
337 let meta_size = payload_data_offset - file_offset;
338
339 Ok(FileImpl {
340 output: self.output.as_mut().unwrap(),
341 goodbye_item: GoodbyeItem {
342 hash: format::hash_filename(file_name),
343 offset: file_offset,
344 size: file_size + meta_size,
345 },
346 remaining_size: file_size,
347 parent: &mut self.state,
348 })
349 }
350
351 /// Return a file offset usable with `add_hardlink`.
352 // This is an `&mut self`, the Rc to the `file_copy_buffer` is handed down in
353 // `create_directory` which is also an `&mut self` method and returns an encoder holding a
354 // mutable borrow on `self`, opening a nested entry in the archive.
355 // So this should be fine.
356 #[allow(clippy::await_holding_refcell_ref)]
357 pub async fn add_file(
358 &mut self,
359 metadata: &Metadata,
360 file_name: &Path,
361 file_size: u64,
362 content: &mut dyn SeqRead,
363 ) -> io::Result<LinkOffset> {
364 let buf = Rc::clone(&self.file_copy_buffer);
365 let mut file = self.create_file(metadata, file_name, file_size).await?;
366 let mut buf = buf.borrow_mut();
367 loop {
368 let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
369 if got == 0 {
370 break;
371 } else {
372 file.write_all(&buf[..got]).await?;
373 }
374 }
375 Ok(file.file_offset())
376 }
377
378 /// Return a file offset usable with `add_hardlink`.
379 pub async fn add_symlink(
380 &mut self,
381 metadata: &Metadata,
382 file_name: &Path,
383 target: &Path,
384 ) -> io::Result<()> {
385 let target = target.as_os_str().as_bytes();
386 let mut data = Vec::with_capacity(target.len() + 1);
387 data.extend(target);
388 data.push(0);
389 let _ofs: LinkOffset = self
390 .add_file_entry(
391 Some(metadata),
392 file_name,
393 Some((format::PXAR_SYMLINK, &data)),
394 )
395 .await?;
396 Ok(())
397 }
398
399 /// Return a file offset usable with `add_hardlink`.
400 pub async fn add_hardlink(
401 &mut self,
402 file_name: &Path,
403 target: &Path,
404 target_offset: LinkOffset,
405 ) -> io::Result<()> {
406 let current_offset = self.position();
407 if current_offset <= target_offset.0 {
408 io_bail!("invalid hardlink offset, can only point to prior files");
409 }
410
411 let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
412 let target_bytes = target.as_os_str().as_bytes();
413 let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len() + 1);
414 hardlink.extend(&offset_bytes);
415 hardlink.extend(target_bytes);
416 hardlink.push(0);
417 let _this_offset: LinkOffset = self
418 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
419 .await?;
420 Ok(())
421 }
422
423 /// Return a file offset usable with `add_hardlink`.
424 pub async fn add_device(
425 &mut self,
426 metadata: &Metadata,
427 file_name: &Path,
428 device: format::Device,
429 ) -> io::Result<()> {
430 if !metadata.is_device() {
431 io_bail!("entry added via add_device must have a device mode in its metadata");
432 }
433
434 let device = device.to_le();
435 let device = unsafe {
436 std::slice::from_raw_parts(
437 &device as *const format::Device as *const u8,
438 size_of::<format::Device>(),
439 )
440 };
441 let _ofs: LinkOffset = self
442 .add_file_entry(
443 Some(metadata),
444 file_name,
445 Some((format::PXAR_DEVICE, device)),
446 )
447 .await?;
448 Ok(())
449 }
450
451 /// Return a file offset usable with `add_hardlink`.
452 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
453 if !metadata.is_fifo() {
454 io_bail!("entry added via add_device must be of type fifo in its metadata");
455 }
456
457 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
458 Ok(())
459 }
460
461 /// Return a file offset usable with `add_hardlink`.
462 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
463 if !metadata.is_socket() {
464 io_bail!("entry added via add_device must be of type socket in its metadata");
465 }
466
467 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
468 Ok(())
469 }
470
471 /// Return a file offset usable with `add_hardlink`.
472 async fn add_file_entry(
473 &mut self,
474 metadata: Option<&Metadata>,
475 file_name: &Path,
476 entry_htype_data: Option<(u64, &[u8])>,
477 ) -> io::Result<LinkOffset> {
478 self.check()?;
479
480 let file_offset = self.position();
481
482 let file_name = file_name.as_os_str().as_bytes();
483
484 self.start_file_do(metadata, file_name).await?;
485 if let Some((htype, entry_data)) = entry_htype_data {
486 seq_write_pxar_entry(
487 self.output.as_mut().unwrap(),
488 htype,
489 entry_data,
490 &mut self.state.write_position,
491 )
492 .await?;
493 }
494
495 let end_offset = self.position();
496
497 self.state.items.push(GoodbyeItem {
498 hash: format::hash_filename(file_name),
499 offset: file_offset,
500 size: end_offset - file_offset,
501 });
502
503 Ok(LinkOffset(file_offset))
504 }
505
506 #[inline]
507 fn position(&mut self) -> u64 {
508 self.state.write_position
509 }
510
511 pub async fn create_directory<'b>(
512 &'b mut self,
513 file_name: &Path,
514 metadata: &Metadata,
515 ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>>
516 where
517 'a: 'b,
518 {
519 self.check()?;
520
521 if !metadata.is_dir() {
522 io_bail!("directory metadata must contain the directory mode flag");
523 }
524
525 let file_name = file_name.as_os_str().as_bytes();
526 let file_hash = format::hash_filename(file_name);
527
528 let file_offset = self.position();
529 self.encode_filename(file_name).await?;
530
531 let entry_offset = self.position();
532 self.encode_metadata(&metadata).await?;
533
534 let files_offset = self.position();
535
536 // the child will write to OUR state now:
537 let write_position = self.position();
538
539 Ok(EncoderImpl {
540 output: self.output.as_mut().map(SeqWrite::as_trait_object),
541 state: EncoderState {
542 entry_offset,
543 files_offset,
544 file_offset: Some(file_offset),
545 file_hash,
546 write_position,
547 ..Default::default()
548 },
549 parent: Some(&mut self.state),
550 finished: false,
551 file_copy_buffer: Rc::clone(&self.file_copy_buffer),
552 })
553 }
554
555 async fn start_file_do(
556 &mut self,
557 metadata: Option<&Metadata>,
558 file_name: &[u8],
559 ) -> io::Result<()> {
560 self.encode_filename(file_name).await?;
561 if let Some(metadata) = metadata {
562 self.encode_metadata(&metadata).await?;
563 }
564 Ok(())
565 }
566
567 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
568 seq_write_pxar_struct_entry(
569 self.output.as_mut().unwrap(),
570 format::PXAR_ENTRY,
571 metadata.stat.clone(),
572 &mut self.state.write_position,
573 )
574 .await?;
575
576 for xattr in &metadata.xattrs {
577 self.write_xattr(xattr).await?;
578 }
579
580 self.write_acls(&metadata.acl).await?;
581
582 if let Some(fcaps) = &metadata.fcaps {
583 self.write_file_capabilities(fcaps).await?;
584 }
585
586 if let Some(qpid) = &metadata.quota_project_id {
587 self.write_quota_project_id(qpid).await?;
588 }
589
590 Ok(())
591 }
592
593 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
594 seq_write_pxar_entry(
595 self.output.as_mut().unwrap(),
596 format::PXAR_XATTR,
597 &xattr.data,
598 &mut self.state.write_position,
599 )
600 .await
601 }
602
603 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
604 for acl in &acl.users {
605 seq_write_pxar_struct_entry(
606 self.output.as_mut().unwrap(),
607 format::PXAR_ACL_USER,
608 acl.clone(),
609 &mut self.state.write_position,
610 )
611 .await?;
612 }
613
614 for acl in &acl.groups {
615 seq_write_pxar_struct_entry(
616 self.output.as_mut().unwrap(),
617 format::PXAR_ACL_GROUP,
618 acl.clone(),
619 &mut self.state.write_position,
620 )
621 .await?;
622 }
623
624 if let Some(acl) = &acl.group_obj {
625 seq_write_pxar_struct_entry(
626 self.output.as_mut().unwrap(),
627 format::PXAR_ACL_GROUP_OBJ,
628 acl.clone(),
629 &mut self.state.write_position,
630 )
631 .await?;
632 }
633
634 if let Some(acl) = &acl.default {
635 seq_write_pxar_struct_entry(
636 self.output.as_mut().unwrap(),
637 format::PXAR_ACL_DEFAULT,
638 acl.clone(),
639 &mut self.state.write_position,
640 )
641 .await?;
642 }
643
644 for acl in &acl.default_users {
645 seq_write_pxar_struct_entry(
646 self.output.as_mut().unwrap(),
647 format::PXAR_ACL_DEFAULT_USER,
648 acl.clone(),
649 &mut self.state.write_position,
650 )
651 .await?;
652 }
653
654 for acl in &acl.default_groups {
655 seq_write_pxar_struct_entry(
656 self.output.as_mut().unwrap(),
657 format::PXAR_ACL_DEFAULT_GROUP,
658 acl.clone(),
659 &mut self.state.write_position,
660 )
661 .await?;
662 }
663
664 Ok(())
665 }
666
667 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
668 seq_write_pxar_entry(
669 self.output.as_mut().unwrap(),
670 format::PXAR_FCAPS,
671 &fcaps.data,
672 &mut self.state.write_position,
673 )
674 .await
675 }
676
677 async fn write_quota_project_id(
678 &mut self,
679 quota_project_id: &format::QuotaProjectId,
680 ) -> io::Result<()> {
681 seq_write_pxar_struct_entry(
682 self.output.as_mut().unwrap(),
683 format::PXAR_QUOTA_PROJID,
684 *quota_project_id,
685 &mut self.state.write_position,
686 )
687 .await
688 }
689
690 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
691 crate::util::validate_filename(file_name)?;
692 seq_write_pxar_entry_zero(
693 self.output.as_mut().unwrap(),
694 format::PXAR_FILENAME,
695 file_name,
696 &mut self.state.write_position,
697 )
698 .await
699 }
700
701 pub async fn finish(mut self) -> io::Result<T> {
702 let tail_bytes = self.finish_goodbye_table().await?;
703 seq_write_pxar_entry(
704 self.output.as_mut().unwrap(),
705 format::PXAR_GOODBYE,
706 &tail_bytes,
707 &mut self.state.write_position,
708 )
709 .await?;
710
711 // done up here because of the self-borrow and to propagate
712 let end_offset = self.position();
713
714 if let Some(parent) = &mut self.parent {
715 parent.write_position = end_offset;
716
717 let file_offset = self
718 .state
719 .file_offset
720 .expect("internal error: parent set but no file_offset?");
721
722 parent.items.push(GoodbyeItem {
723 hash: self.state.file_hash,
724 offset: file_offset,
725 size: end_offset - file_offset,
726 });
727 }
728 self.finished = true;
729 Ok(self.output.take().unwrap())
730 }
731
732 pub fn into_writer(mut self) -> T {
733 self.output.take().unwrap()
734 }
735
736 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
737 let goodbye_offset = self.position();
738
739 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
740 let mut tail = take(&mut self.state.items);
741 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
742 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
743
744 // sort, then create a BST
745 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
746
747 let mut bst = Vec::with_capacity(tail.len() + 1);
748 unsafe {
749 bst.set_len(tail.len());
750 }
751 binary_tree_array::copy(tail.len(), |src, dest| {
752 let mut item = tail[src].clone();
753 // fixup the goodbye table offsets to be relative and with the right endianess
754 item.offset = goodbye_offset - item.offset;
755 unsafe {
756 std::ptr::write(&mut bst[dest], item.to_le());
757 }
758 });
759 drop(tail);
760
761 bst.push(
762 GoodbyeItem {
763 hash: format::PXAR_GOODBYE_TAIL_MARKER,
764 offset: goodbye_offset - self.state.entry_offset,
765 size: goodbye_size,
766 }
767 .to_le(),
768 );
769
770 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
771 // the items make sense:
772 let data = bst.as_mut_ptr() as *mut u8;
773 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
774 forget(bst);
775 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
776 }
777 }
778
779 /// Writer for a file object in a directory.
780 pub struct FileImpl<'a> {
781 output: &'a mut dyn SeqWrite,
782
783 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
784 /// directly instead of on Drop of FileImpl?
785 goodbye_item: GoodbyeItem,
786
787 /// While writing data to this file, this is how much space we still have left, this must reach
788 /// exactly zero.
789 remaining_size: u64,
790
791 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
792 /// to, and where we insert our `GoodbyeItem`.
793 parent: &'a mut EncoderState,
794 }
795
796 impl<'a> Drop for FileImpl<'a> {
797 fn drop(&mut self) {
798 if self.remaining_size != 0 {
799 self.parent.add_error(EncodeError::IncompleteFile);
800 }
801
802 self.parent.items.push(self.goodbye_item.clone());
803 }
804 }
805
806 impl<'a> FileImpl<'a> {
807 /// Get the file offset to be able to reference it with `add_hardlink`.
808 pub fn file_offset(&self) -> LinkOffset {
809 LinkOffset(self.goodbye_item.offset)
810 }
811
812 fn check_remaining(&self, size: usize) -> io::Result<()> {
813 if size as u64 > self.remaining_size {
814 io_bail!("attempted to write more than previously allocated");
815 } else {
816 Ok(())
817 }
818 }
819
/// Poll based write, to make connecting to tokio/futures easier.
///
/// On a successful write, the remaining size and the directory's write position are updated by
/// the number of bytes written.
#[cfg(feature = "tokio-io")]
pub fn poll_write(
    self: Pin<&mut Self>,
    cx: &mut Context,
    data: &[u8],
) -> Poll<io::Result<usize>> {
    let this = self.get_mut();
    this.check_remaining(data.len())?;

    let output = unsafe { Pin::new_unchecked(&mut *this.output) };
    let result = output.poll_seq_write(cx, data);
    if let Poll::Ready(Ok(put)) = &result {
        let put = *put as u64;
        this.remaining_size -= put;
        this.parent.write_position += put;
    }
    result
}
839
/// Poll based flush, to make connecting to tokio/futures easier.
#[cfg(feature = "tokio-io")]
pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
    // project the pin onto the output reference and forward the flush
    let output = unsafe { self.map_unchecked_mut(|this| &mut this.output) };
    output.poll_flush(cx)
}
848
/// Poll based close/shutdown, to make connecting to tokio/futures easier.
///
/// We are only a virtual writer on top of the output provided by our encoder, so all this
/// does is flush that output.
#[cfg(feature = "tokio-io")]
pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
    let output = unsafe { self.map_unchecked_mut(|this| &mut this.output) };
    output.poll_flush(cx)
}
860
861 /// Write file data for the current file entry in a pxar archive.
862 ///
863 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
864 /// requested. Check the return value for how many. There's also a `write_all` method available
865 /// for convenience.
866 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
867 self.check_remaining(data.len())?;
868 let put =
869 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
870 .await?;
871 //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
872 self.remaining_size -= put as u64;
873 self.parent.write_position += put as u64;
874 Ok(put)
875 }
876
877 /// Completely write file data for the current file entry in a pxar archive.
878 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
879 self.check_remaining(data.len())?;
880 seq_write_all(&mut self.output, data, &mut self.parent.write_position).await?;
881 self.remaining_size -= data.len() as u64;
882 Ok(())
883 }
884 }
885
/// `tokio` adapter which forwards to the inherent poll methods of `FileImpl`.
#[cfg(feature = "tokio-io")]
impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        data: &[u8],
    ) -> Poll<io::Result<usize>> {
        FileImpl::poll_write(self, cx, data)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_flush(self, cx)
    }

    /// Shutdown maps to `poll_close`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_close(self, cx)
    }
}