]> git.proxmox.com Git - pxar.git/blob - src/encoder/mod.rs
encoder: let finish return writer, add into_writer
[pxar.git] / src / encoder / mod.rs
1 //! The `pxar` encoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 use std::cell::RefCell;
6 use std::io;
7 use std::mem::{forget, size_of, size_of_val, take};
8 use std::os::unix::ffi::OsStrExt;
9 use std::path::Path;
10 use std::pin::Pin;
11 use std::rc::Rc;
12 use std::task::{Context, Poll};
13
14 use endian_trait::Endian;
15
16 use crate::binary_tree_array;
17 use crate::decoder::{self, SeqRead};
18 use crate::format::{self, GoodbyeItem};
19 use crate::poll_fn::poll_fn;
20 use crate::Metadata;
21
22 pub mod aio;
23 pub mod sync;
24
25 #[doc(inline)]
26 pub use sync::Encoder;
27
28 /// File reference used to create hard links.
29 #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
30 pub struct LinkOffset(u64);
31
32 impl LinkOffset {
33 #[inline]
34 pub fn raw(self) -> u64 {
35 self.0
36 }
37 }
38
39 /// Sequential write interface used by the encoder's state machine.
40 ///
41 /// This is our internal writer trait which is available for `std::io::Write` types in the
42 /// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
43 /// wrapper.
44 pub trait SeqWrite {
45 fn poll_seq_write(
46 self: Pin<&mut Self>,
47 cx: &mut Context,
48 buf: &[u8],
49 ) -> Poll<io::Result<usize>>;
50
51 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
52
53 /// To avoid recursively borrowing each time we nest into a subdirectory we add this helper.
54 /// Otherwise starting a subdirectory will get a trait object pointing to `T`, nesting another
55 /// subdirectory in that would have a trait object pointing to the trait object, and so on.
56 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
57 where
58 Self: Sized,
59 {
60 self as &mut dyn SeqWrite
61 }
62 }
63
64 /// Allow using trait objects for generics taking a `SeqWrite`.
65 impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
66 fn poll_seq_write(
67 self: Pin<&mut Self>,
68 cx: &mut Context,
69 buf: &[u8],
70 ) -> Poll<io::Result<usize>> {
71 unsafe {
72 self.map_unchecked_mut(|this| &mut **this)
73 .poll_seq_write(cx, buf)
74 }
75 }
76
77 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
78 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
79 }
80
81 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
82 where
83 Self: Sized,
84 {
85 &mut **self
86 }
87 }
88
89 /// awaitable verison of `poll_seq_write`.
90 async fn seq_write<T: SeqWrite + ?Sized>(
91 output: &mut T,
92 buf: &[u8],
93 position: &mut u64,
94 ) -> io::Result<usize> {
95 let put =
96 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await?;
97 *position += put as u64;
98 Ok(put)
99 }
100
101 /// Write the entire contents of a buffer, handling short writes.
102 async fn seq_write_all<T: SeqWrite + ?Sized>(
103 output: &mut T,
104 mut buf: &[u8],
105 position: &mut u64,
106 ) -> io::Result<()> {
107 while !buf.is_empty() {
108 let got = seq_write(&mut *output, buf, &mut *position).await?;
109 buf = &buf[got..];
110 }
111 Ok(())
112 }
113
114 /// Write an endian-swappable struct.
115 async fn seq_write_struct<E: Endian, T>(
116 output: &mut T,
117 data: E,
118 position: &mut u64,
119 ) -> io::Result<()>
120 where
121 T: SeqWrite + ?Sized,
122 {
123 let data = data.to_le();
124 seq_write_all(
125 output,
126 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) },
127 position,
128 )
129 .await
130 }
131
132 /// Write a pxar entry.
133 async fn seq_write_pxar_entry<T>(
134 output: &mut T,
135 htype: u64,
136 data: &[u8],
137 position: &mut u64,
138 ) -> io::Result<()>
139 where
140 T: SeqWrite + ?Sized,
141 {
142 let header = format::Header::with_content_size(htype, data.len() as u64);
143 header.check_header_size()?;
144
145 seq_write_struct(&mut *output, header, &mut *position).await?;
146 seq_write_all(output, data, position).await
147 }
148
149 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
150 /// data buffer.
151 async fn seq_write_pxar_entry_zero<T>(
152 output: &mut T,
153 htype: u64,
154 data: &[u8],
155 position: &mut u64,
156 ) -> io::Result<()>
157 where
158 T: SeqWrite + ?Sized,
159 {
160 let header = format::Header::with_content_size(htype, 1 + data.len() as u64);
161 header.check_header_size()?;
162
163 seq_write_struct(&mut *output, header, &mut *position).await?;
164 seq_write_all(&mut *output, data, &mut *position).await?;
165 seq_write_all(output, &[0u8], position).await
166 }
167
168 /// Write a pxar entry consiting of an endian-swappable struct.
169 async fn seq_write_pxar_struct_entry<E, T>(
170 output: &mut T,
171 htype: u64,
172 data: E,
173 position: &mut u64,
174 ) -> io::Result<()>
175 where
176 T: SeqWrite + ?Sized,
177 E: Endian,
178 {
179 let data = data.to_le();
180 seq_write_pxar_entry(
181 output,
182 htype,
183 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) },
184 position,
185 )
186 .await
187 }
188
189 /// Error conditions caused by wrong usage of this crate.
190 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
191 pub enum EncodeError {
192 /// The user dropped a `File` without without finishing writing all of its contents.
193 ///
194 /// This is required because the payload lengths is written out at the begining and decoding
195 /// requires there to follow the right amount of data.
196 IncompleteFile,
197
198 /// The user dropped a directory without finalizing it.
199 ///
200 /// Finalizing is required to build the goodbye table at the end of a directory.
201 IncompleteDirectory,
202 }
203
204 #[derive(Default)]
205 struct EncoderState {
206 /// Goodbye items for this directory, excluding the tail.
207 items: Vec<GoodbyeItem>,
208
209 /// User caused error conditions.
210 encode_error: Option<EncodeError>,
211
212 /// Offset of this directory's ENTRY.
213 entry_offset: u64,
214
215 /// Offset to this directory's first FILENAME.
216 files_offset: u64,
217
218 /// If this is a subdirectory, this points to the this directory's FILENAME.
219 file_offset: Option<u64>,
220
221 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
222 file_hash: u64,
223
224 /// We need to keep track how much we have written to get offsets.
225 write_position: u64,
226 }
227
228 impl EncoderState {
229 fn merge_error(&mut self, error: Option<EncodeError>) {
230 // one error is enough:
231 if self.encode_error.is_none() {
232 self.encode_error = error;
233 }
234 }
235
236 fn add_error(&mut self, error: EncodeError) {
237 self.merge_error(Some(error));
238 }
239 }
240
241 /// The encoder state machine implementation for a directory.
242 ///
243 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
244 /// synchronous or `async` I/O objects in as output.
245 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
246 output: Option<T>,
247 state: EncoderState,
248 parent: Option<&'a mut EncoderState>,
249 finished: bool,
250
251 /// Since only the "current" entry can be actively writing files, we share the file copy
252 /// buffer.
253 file_copy_buffer: Rc<RefCell<Vec<u8>>>,
254 }
255
256 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
257 fn drop(&mut self) {
258 if let Some(ref mut parent) = self.parent {
259 // propagate errors:
260 parent.merge_error(self.state.encode_error);
261 if !self.finished {
262 parent.add_error(EncodeError::IncompleteDirectory);
263 }
264 } else if !self.finished {
265 // FIXME: how do we deal with this?
266 // eprintln!("Encoder dropped without finishing!");
267 }
268 }
269 }
270
271 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
272 pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
273 if !metadata.is_dir() {
274 io_bail!("directory metadata must contain the directory mode flag");
275 }
276 let mut this = Self {
277 output: Some(output),
278 state: EncoderState::default(),
279 parent: None,
280 finished: false,
281 file_copy_buffer: Rc::new(RefCell::new(crate::util::vec_new(1024 * 1024))),
282 };
283
284 this.encode_metadata(metadata).await?;
285 this.state.files_offset = this.position();
286
287 Ok(this)
288 }
289
290 fn check(&self) -> io::Result<()> {
291 match self.state.encode_error {
292 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
293 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
294 None => Ok(()),
295 }
296 }
297
298 pub async fn create_file<'b>(
299 &'b mut self,
300 metadata: &Metadata,
301 file_name: &Path,
302 file_size: u64,
303 ) -> io::Result<FileImpl<'b>>
304 where
305 'a: 'b,
306 {
307 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
308 .await
309 }
310
311 async fn create_file_do<'b>(
312 &'b mut self,
313 metadata: &Metadata,
314 file_name: &[u8],
315 file_size: u64,
316 ) -> io::Result<FileImpl<'b>>
317 where
318 'a: 'b,
319 {
320 self.check()?;
321
322 let file_offset = self.position();
323 self.start_file_do(Some(metadata), file_name).await?;
324
325 let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
326 header.check_header_size()?;
327
328 seq_write_struct(self.output.as_mut().unwrap(), header, &mut self.state.write_position).await?;
329
330 let payload_data_offset = self.position();
331
332 let meta_size = payload_data_offset - file_offset;
333
334 Ok(FileImpl {
335 output: self.output.as_mut().unwrap(),
336 goodbye_item: GoodbyeItem {
337 hash: format::hash_filename(file_name),
338 offset: file_offset,
339 size: file_size + meta_size,
340 },
341 remaining_size: file_size,
342 parent: &mut self.state,
343 })
344 }
345
346 /// Return a file offset usable with `add_hardlink`.
347 pub async fn add_file(
348 &mut self,
349 metadata: &Metadata,
350 file_name: &Path,
351 file_size: u64,
352 content: &mut dyn SeqRead,
353 ) -> io::Result<LinkOffset> {
354 let buf = Rc::clone(&self.file_copy_buffer);
355 let mut file = self.create_file(metadata, file_name, file_size).await?;
356 let mut buf = buf.borrow_mut();
357 loop {
358 let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
359 if got == 0 {
360 break;
361 } else {
362 file.write_all(&buf[..got]).await?;
363 }
364 }
365 Ok(file.file_offset())
366 }
367
368 /// Return a file offset usable with `add_hardlink`.
369 pub async fn add_symlink(
370 &mut self,
371 metadata: &Metadata,
372 file_name: &Path,
373 target: &Path,
374 ) -> io::Result<()> {
375 let target = target.as_os_str().as_bytes();
376 let mut data = Vec::with_capacity(target.len() + 1);
377 data.extend(target);
378 data.push(0);
379 let _ofs: LinkOffset = self
380 .add_file_entry(
381 Some(metadata),
382 file_name,
383 Some((format::PXAR_SYMLINK, &data)),
384 )
385 .await?;
386 Ok(())
387 }
388
389 /// Return a file offset usable with `add_hardlink`.
390 pub async fn add_hardlink(
391 &mut self,
392 file_name: &Path,
393 target: &Path,
394 target_offset: LinkOffset,
395 ) -> io::Result<()> {
396 let current_offset = self.position();
397 if current_offset <= target_offset.0 {
398 io_bail!("invalid hardlink offset, can only point to prior files");
399 }
400
401 let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
402 let target_bytes = target.as_os_str().as_bytes();
403 let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len() + 1);
404 hardlink.extend(&offset_bytes);
405 hardlink.extend(target_bytes);
406 hardlink.push(0);
407 let _this_offset: LinkOffset = self
408 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
409 .await?;
410 Ok(())
411 }
412
413 /// Return a file offset usable with `add_hardlink`.
414 pub async fn add_device(
415 &mut self,
416 metadata: &Metadata,
417 file_name: &Path,
418 device: format::Device,
419 ) -> io::Result<()> {
420 if !metadata.is_device() {
421 io_bail!("entry added via add_device must have a device mode in its metadata");
422 }
423
424 let device = device.to_le();
425 let device = unsafe {
426 std::slice::from_raw_parts(
427 &device as *const format::Device as *const u8,
428 size_of::<format::Device>(),
429 )
430 };
431 let _ofs: LinkOffset = self
432 .add_file_entry(
433 Some(metadata),
434 file_name,
435 Some((format::PXAR_DEVICE, device)),
436 )
437 .await?;
438 Ok(())
439 }
440
441 /// Return a file offset usable with `add_hardlink`.
442 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
443 if !metadata.is_fifo() {
444 io_bail!("entry added via add_device must be of type fifo in its metadata");
445 }
446
447 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
448 Ok(())
449 }
450
451 /// Return a file offset usable with `add_hardlink`.
452 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
453 if !metadata.is_socket() {
454 io_bail!("entry added via add_device must be of type socket in its metadata");
455 }
456
457 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
458 Ok(())
459 }
460
461 /// Return a file offset usable with `add_hardlink`.
462 async fn add_file_entry(
463 &mut self,
464 metadata: Option<&Metadata>,
465 file_name: &Path,
466 entry_htype_data: Option<(u64, &[u8])>,
467 ) -> io::Result<LinkOffset> {
468 self.check()?;
469
470 let file_offset = self.position();
471
472 let file_name = file_name.as_os_str().as_bytes();
473
474 self.start_file_do(metadata, file_name).await?;
475 if let Some((htype, entry_data)) = entry_htype_data {
476 seq_write_pxar_entry(
477 self.output.as_mut().unwrap(),
478 htype,
479 entry_data,
480 &mut self.state.write_position,
481 )
482 .await?;
483 }
484
485 let end_offset = self.position();
486
487 self.state.items.push(GoodbyeItem {
488 hash: format::hash_filename(file_name),
489 offset: file_offset,
490 size: end_offset - file_offset,
491 });
492
493 Ok(LinkOffset(file_offset))
494 }
495
496 #[inline]
497 fn position(&mut self) -> u64 {
498 self.state.write_position
499 }
500
501 pub async fn create_directory<'b>(
502 &'b mut self,
503 file_name: &Path,
504 metadata: &Metadata,
505 ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>>
506 where
507 'a: 'b,
508 {
509 self.check()?;
510
511 if !metadata.is_dir() {
512 io_bail!("directory metadata must contain the directory mode flag");
513 }
514
515 let file_name = file_name.as_os_str().as_bytes();
516 let file_hash = format::hash_filename(file_name);
517
518 let file_offset = self.position();
519 self.encode_filename(file_name).await?;
520
521 let entry_offset = self.position();
522 self.encode_metadata(&metadata).await?;
523
524 let files_offset = self.position();
525
526 // the child will write to OUR state now:
527 let write_position = self.position();
528
529 Ok(EncoderImpl {
530 output: self.output.as_mut().map(SeqWrite::as_trait_object),
531 state: EncoderState {
532 entry_offset,
533 files_offset,
534 file_offset: Some(file_offset),
535 file_hash,
536 write_position,
537 ..Default::default()
538 },
539 parent: Some(&mut self.state),
540 finished: false,
541 file_copy_buffer: Rc::clone(&self.file_copy_buffer),
542 })
543 }
544
545 async fn start_file_do(
546 &mut self,
547 metadata: Option<&Metadata>,
548 file_name: &[u8],
549 ) -> io::Result<()> {
550 self.encode_filename(file_name).await?;
551 if let Some(metadata) = metadata {
552 self.encode_metadata(&metadata).await?;
553 }
554 Ok(())
555 }
556
557 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
558 seq_write_pxar_struct_entry(
559 self.output.as_mut().unwrap(),
560 format::PXAR_ENTRY,
561 metadata.stat.clone(),
562 &mut self.state.write_position,
563 )
564 .await?;
565
566 for xattr in &metadata.xattrs {
567 self.write_xattr(xattr).await?;
568 }
569
570 self.write_acls(&metadata.acl).await?;
571
572 if let Some(fcaps) = &metadata.fcaps {
573 self.write_file_capabilities(fcaps).await?;
574 }
575
576 if let Some(qpid) = &metadata.quota_project_id {
577 self.write_quota_project_id(qpid).await?;
578 }
579
580 Ok(())
581 }
582
583 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
584 seq_write_pxar_entry(
585 self.output.as_mut().unwrap(),
586 format::PXAR_XATTR,
587 &xattr.data,
588 &mut self.state.write_position,
589 )
590 .await
591 }
592
593 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
594 for acl in &acl.users {
595 seq_write_pxar_struct_entry(
596 self.output.as_mut().unwrap(),
597 format::PXAR_ACL_USER,
598 acl.clone(),
599 &mut self.state.write_position,
600 )
601 .await?;
602 }
603
604 for acl in &acl.groups {
605 seq_write_pxar_struct_entry(
606 self.output.as_mut().unwrap(),
607 format::PXAR_ACL_GROUP,
608 acl.clone(),
609 &mut self.state.write_position,
610 )
611 .await?;
612 }
613
614 if let Some(acl) = &acl.group_obj {
615 seq_write_pxar_struct_entry(
616 self.output.as_mut().unwrap(),
617 format::PXAR_ACL_GROUP_OBJ,
618 acl.clone(),
619 &mut self.state.write_position,
620 )
621 .await?;
622 }
623
624 if let Some(acl) = &acl.default {
625 seq_write_pxar_struct_entry(
626 self.output.as_mut().unwrap(),
627 format::PXAR_ACL_DEFAULT,
628 acl.clone(),
629 &mut self.state.write_position,
630 )
631 .await?;
632 }
633
634 for acl in &acl.default_users {
635 seq_write_pxar_struct_entry(
636 self.output.as_mut().unwrap(),
637 format::PXAR_ACL_DEFAULT_USER,
638 acl.clone(),
639 &mut self.state.write_position,
640 )
641 .await?;
642 }
643
644 for acl in &acl.default_groups {
645 seq_write_pxar_struct_entry(
646 self.output.as_mut().unwrap(),
647 format::PXAR_ACL_DEFAULT_GROUP,
648 acl.clone(),
649 &mut self.state.write_position,
650 )
651 .await?;
652 }
653
654 Ok(())
655 }
656
657 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
658 seq_write_pxar_entry(
659 self.output.as_mut().unwrap(),
660 format::PXAR_FCAPS,
661 &fcaps.data,
662 &mut self.state.write_position,
663 )
664 .await
665 }
666
667 async fn write_quota_project_id(
668 &mut self,
669 quota_project_id: &format::QuotaProjectId,
670 ) -> io::Result<()> {
671 seq_write_pxar_struct_entry(
672 self.output.as_mut().unwrap(),
673 format::PXAR_QUOTA_PROJID,
674 *quota_project_id,
675 &mut self.state.write_position,
676 )
677 .await
678 }
679
680 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
681 crate::util::validate_filename(file_name)?;
682 seq_write_pxar_entry_zero(
683 self.output.as_mut().unwrap(),
684 format::PXAR_FILENAME,
685 file_name,
686 &mut self.state.write_position,
687 )
688 .await
689 }
690
691 pub async fn finish(mut self) -> io::Result<T> {
692 let tail_bytes = self.finish_goodbye_table().await?;
693 seq_write_pxar_entry(
694 self.output.as_mut().unwrap(),
695 format::PXAR_GOODBYE,
696 &tail_bytes,
697 &mut self.state.write_position,
698 )
699 .await?;
700
701 // done up here because of the self-borrow and to propagate
702 let end_offset = self.position();
703
704 if let Some(parent) = &mut self.parent {
705 parent.write_position = end_offset;
706
707 let file_offset = self
708 .state
709 .file_offset
710 .expect("internal error: parent set but no file_offset?");
711
712 parent.items.push(GoodbyeItem {
713 hash: self.state.file_hash,
714 offset: file_offset,
715 size: end_offset - file_offset,
716 });
717 }
718 self.finished = true;
719 Ok(self.output.take().unwrap())
720 }
721
722 pub fn into_writer(mut self) -> T {
723 self.output.take().unwrap()
724 }
725
726 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
727 let goodbye_offset = self.position();
728
729 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
730 let mut tail = take(&mut self.state.items);
731 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
732 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
733
734 // sort, then create a BST
735 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
736
737 let mut bst = Vec::with_capacity(tail.len() + 1);
738 unsafe {
739 bst.set_len(tail.len());
740 }
741 binary_tree_array::copy(tail.len(), |src, dest| {
742 let mut item = tail[src].clone();
743 // fixup the goodbye table offsets to be relative and with the right endianess
744 item.offset = goodbye_offset - item.offset;
745 unsafe {
746 std::ptr::write(&mut bst[dest], item.to_le());
747 }
748 });
749 drop(tail);
750
751 bst.push(
752 GoodbyeItem {
753 hash: format::PXAR_GOODBYE_TAIL_MARKER,
754 offset: goodbye_offset - self.state.entry_offset,
755 size: goodbye_size,
756 }
757 .to_le(),
758 );
759
760 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
761 // the items make sense:
762 let data = bst.as_mut_ptr() as *mut u8;
763 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
764 forget(bst);
765 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
766 }
767 }
768
769 /// Writer for a file object in a directory.
770 pub struct FileImpl<'a> {
771 output: &'a mut dyn SeqWrite,
772
773 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
774 /// directly instead of on Drop of FileImpl?
775 goodbye_item: GoodbyeItem,
776
777 /// While writing data to this file, this is how much space we still have left, this must reach
778 /// exactly zero.
779 remaining_size: u64,
780
781 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
782 /// to, and where we insert our `GoodbyeItem`.
783 parent: &'a mut EncoderState,
784 }
785
786 impl<'a> Drop for FileImpl<'a> {
787 fn drop(&mut self) {
788 if self.remaining_size != 0 {
789 self.parent.add_error(EncodeError::IncompleteFile);
790 }
791
792 self.parent.items.push(self.goodbye_item.clone());
793 }
794 }
795
796 impl<'a> FileImpl<'a> {
797 /// Get the file offset to be able to reference it with `add_hardlink`.
798 pub fn file_offset(&self) -> LinkOffset {
799 LinkOffset(self.goodbye_item.offset)
800 }
801
802 fn check_remaining(&self, size: usize) -> io::Result<()> {
803 if size as u64 > self.remaining_size {
804 io_bail!("attempted to write more than previously allocated");
805 } else {
806 Ok(())
807 }
808 }
809
810 /// Poll write interface to more easily connect to tokio/futures.
811 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
812 pub fn poll_write(
813 self: Pin<&mut Self>,
814 cx: &mut Context,
815 data: &[u8],
816 ) -> Poll<io::Result<usize>> {
817 let this = self.get_mut();
818 this.check_remaining(data.len())?;
819 let output = unsafe { Pin::new_unchecked(&mut *this.output) };
820 match output.poll_seq_write(cx, data) {
821 Poll::Ready(Ok(put)) => {
822 this.remaining_size -= put as u64;
823 this.parent.write_position += put as u64;
824 Poll::Ready(Ok(put))
825 }
826 other => other,
827 }
828 }
829
830 /// Poll flush interface to more easily connect to tokio/futures.
831 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
832 pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
833 unsafe {
834 self.map_unchecked_mut(|this| &mut this.output)
835 .poll_flush(cx)
836 }
837 }
838
839 /// Poll close/shutdown interface to more easily connect to tokio/futures.
840 ///
841 /// This just calls flush, though, since we're just a virtual writer writing to the file
842 /// provided by our encoder.
843 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
844 pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
845 unsafe {
846 self.map_unchecked_mut(|this| &mut this.output)
847 .poll_flush(cx)
848 }
849 }
850
851 /// Write file data for the current file entry in a pxar archive.
852 ///
853 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
854 /// requested. Check the return value for how many. There's also a `write_all` method available
855 /// for convenience.
856 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
857 self.check_remaining(data.len())?;
858 let put =
859 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
860 .await?;
861 //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
862 self.remaining_size -= put as u64;
863 self.parent.write_position += put as u64;
864 Ok(put)
865 }
866
867 /// Completely write file data for the current file entry in a pxar archive.
868 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
869 self.check_remaining(data.len())?;
870 seq_write_all(&mut self.output, data, &mut self.parent.write_position).await?;
871 self.remaining_size -= data.len() as u64;
872 Ok(())
873 }
874 }
875
876 #[cfg(feature = "tokio-io")]
877 impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
878 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
879 FileImpl::poll_write(self, cx, buf)
880 }
881
882 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
883 FileImpl::poll_flush(self, cx)
884 }
885
886 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
887 FileImpl::poll_close(self, cx)
888 }
889 }
890
891 #[cfg(feature = "futures-io")]
892 impl<'a> futures::io::AsyncWrite for FileImpl<'a> {
893 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
894 FileImpl::poll_write(self, cx, buf)
895 }
896
897 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
898 FileImpl::poll_flush(self, cx)
899 }
900
901 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
902 FileImpl::poll_close(self, cx)
903 }
904 }