]> git.proxmox.com Git - pxar.git/blob - src/encoder/mod.rs
formatting fixup
[pxar.git] / src / encoder / mod.rs
1 //! The `pxar` encoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 use std::cell::RefCell;
6 use std::io;
7 use std::mem::{forget, size_of, size_of_val, take};
8 use std::os::unix::ffi::OsStrExt;
9 use std::path::Path;
10 use std::pin::Pin;
11 use std::rc::Rc;
12 use std::task::{Context, Poll};
13
14 use endian_trait::Endian;
15
16 use crate::binary_tree_array;
17 use crate::decoder::{self, SeqRead};
18 use crate::format::{self, GoodbyeItem};
19 use crate::poll_fn::poll_fn;
20 use crate::Metadata;
21
22 pub mod aio;
23 pub mod sync;
24
25 #[doc(inline)]
26 pub use sync::Encoder;
27
/// Reference to a file inside the archive, used as the target when creating hard links.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct LinkOffset(u64);

impl LinkOffset {
    /// Returns the wrapped offset as a plain integer.
    #[inline]
    pub fn raw(self) -> u64 {
        let Self(offset) = self;
        offset
    }
}
38
/// Sequential write interface used by the encoder's state machine.
///
/// This is the crate-internal writer abstraction. The synchronous wrapper provides it for
/// `std::io::Write` types, the asynchronous wrapper for both `tokio` and `futures` `AsyncWrite`
/// types.
pub trait SeqWrite {
    /// Try to write the contents of `buf`, yielding the number of bytes actually written.
    ///
    /// Callers in this module treat the returned count as possibly shorter than `buf.len()`.
    fn poll_seq_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Try to flush the writer.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;

    /// Borrow this writer as a `SeqWrite` trait object.
    ///
    /// This helper exists to avoid stacking borrows each time we nest into a subdirectory:
    /// without it, a subdirectory would hold a trait object pointing to `T`, a subdirectory of
    /// that would hold a trait object pointing to that trait object, and so on.
    fn as_trait_object(&mut self) -> &mut dyn SeqWrite
    where
        Self: Sized,
    {
        // unsize coercion from `&mut Self` to the trait object
        self
    }
}
63
64 /// Allow using trait objects for generics taking a `SeqWrite`.
65 impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
66 fn poll_seq_write(
67 self: Pin<&mut Self>,
68 cx: &mut Context,
69 buf: &[u8],
70 ) -> Poll<io::Result<usize>> {
71 unsafe {
72 self.map_unchecked_mut(|this| &mut **this)
73 .poll_seq_write(cx, buf)
74 }
75 }
76
77 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
78 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
79 }
80
81 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
82 where
83 Self: Sized,
84 {
85 &mut **self
86 }
87 }
88
89 /// awaitable verison of `poll_seq_write`.
90 async fn seq_write<T: SeqWrite + ?Sized>(
91 output: &mut T,
92 buf: &[u8],
93 position: &mut u64,
94 ) -> io::Result<usize> {
95 let put =
96 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await?;
97 *position += put as u64;
98 Ok(put)
99 }
100
101 /// Write the entire contents of a buffer, handling short writes.
102 async fn seq_write_all<T: SeqWrite + ?Sized>(
103 output: &mut T,
104 mut buf: &[u8],
105 position: &mut u64,
106 ) -> io::Result<()> {
107 while !buf.is_empty() {
108 let got = seq_write(&mut *output, buf, &mut *position).await?;
109 buf = &buf[got..];
110 }
111 Ok(())
112 }
113
114 /// Write an endian-swappable struct.
115 async fn seq_write_struct<E: Endian, T>(
116 output: &mut T,
117 data: E,
118 position: &mut u64,
119 ) -> io::Result<()>
120 where
121 T: SeqWrite + ?Sized,
122 {
123 let data = data.to_le();
124 seq_write_all(
125 output,
126 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) },
127 position,
128 )
129 .await
130 }
131
132 /// Write a pxar entry.
133 async fn seq_write_pxar_entry<T>(
134 output: &mut T,
135 htype: u64,
136 data: &[u8],
137 position: &mut u64,
138 ) -> io::Result<()>
139 where
140 T: SeqWrite + ?Sized,
141 {
142 let header = format::Header::with_content_size(htype, data.len() as u64);
143 header.check_header_size()?;
144
145 seq_write_struct(&mut *output, header, &mut *position).await?;
146 seq_write_all(output, data, position).await
147 }
148
149 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
150 /// data buffer.
151 async fn seq_write_pxar_entry_zero<T>(
152 output: &mut T,
153 htype: u64,
154 data: &[u8],
155 position: &mut u64,
156 ) -> io::Result<()>
157 where
158 T: SeqWrite + ?Sized,
159 {
160 let header = format::Header::with_content_size(htype, 1 + data.len() as u64);
161 header.check_header_size()?;
162
163 seq_write_struct(&mut *output, header, &mut *position).await?;
164 seq_write_all(&mut *output, data, &mut *position).await?;
165 seq_write_all(output, &[0u8], position).await
166 }
167
168 /// Write a pxar entry consiting of an endian-swappable struct.
169 async fn seq_write_pxar_struct_entry<E, T>(
170 output: &mut T,
171 htype: u64,
172 data: E,
173 position: &mut u64,
174 ) -> io::Result<()>
175 where
176 T: SeqWrite + ?Sized,
177 E: Endian,
178 {
179 let data = data.to_le();
180 seq_write_pxar_entry(
181 output,
182 htype,
183 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) },
184 position,
185 )
186 .await
187 }
188
/// Error conditions caused by wrong usage of this crate.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EncodeError {
    /// The user dropped a `File` without finishing writing all of its contents.
    ///
    /// This is an error because the payload length is written out at the beginning, and
    /// decoding requires exactly that amount of data to follow.
    IncompleteFile,

    /// The user dropped a directory without finalizing it.
    ///
    /// Finalizing is required to build the goodbye table at the end of a directory.
    IncompleteDirectory,
}
203
204 #[derive(Default)]
205 struct EncoderState {
206 /// Goodbye items for this directory, excluding the tail.
207 items: Vec<GoodbyeItem>,
208
209 /// User caused error conditions.
210 encode_error: Option<EncodeError>,
211
212 /// Offset of this directory's ENTRY.
213 entry_offset: u64,
214
215 /// Offset to this directory's first FILENAME.
216 files_offset: u64,
217
218 /// If this is a subdirectory, this points to the this directory's FILENAME.
219 file_offset: Option<u64>,
220
221 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
222 file_hash: u64,
223
224 /// We need to keep track how much we have written to get offsets.
225 write_position: u64,
226 }
227
228 impl EncoderState {
229 fn merge_error(&mut self, error: Option<EncodeError>) {
230 // one error is enough:
231 if self.encode_error.is_none() {
232 self.encode_error = error;
233 }
234 }
235
236 fn add_error(&mut self, error: EncodeError) {
237 self.merge_error(Some(error));
238 }
239 }
240
241 /// The encoder state machine implementation for a directory.
242 ///
243 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
244 /// synchronous or `async` I/O objects in as output.
245 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
246 output: Option<T>,
247 state: EncoderState,
248 parent: Option<&'a mut EncoderState>,
249 finished: bool,
250
251 /// Since only the "current" entry can be actively writing files, we share the file copy
252 /// buffer.
253 file_copy_buffer: Rc<RefCell<Vec<u8>>>,
254 }
255
256 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
257 fn drop(&mut self) {
258 if let Some(ref mut parent) = self.parent {
259 // propagate errors:
260 parent.merge_error(self.state.encode_error);
261 if !self.finished {
262 parent.add_error(EncodeError::IncompleteDirectory);
263 }
264 } else if !self.finished {
265 // FIXME: how do we deal with this?
266 // eprintln!("Encoder dropped without finishing!");
267 }
268 }
269 }
270
271 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
272 pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
273 if !metadata.is_dir() {
274 io_bail!("directory metadata must contain the directory mode flag");
275 }
276 let mut this = Self {
277 output: Some(output),
278 state: EncoderState::default(),
279 parent: None,
280 finished: false,
281 file_copy_buffer: Rc::new(RefCell::new(crate::util::vec_new(1024 * 1024))),
282 };
283
284 this.encode_metadata(metadata).await?;
285 this.state.files_offset = this.position();
286
287 Ok(this)
288 }
289
290 fn check(&self) -> io::Result<()> {
291 match self.state.encode_error {
292 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
293 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
294 None => Ok(()),
295 }
296 }
297
298 pub async fn create_file<'b>(
299 &'b mut self,
300 metadata: &Metadata,
301 file_name: &Path,
302 file_size: u64,
303 ) -> io::Result<FileImpl<'b>>
304 where
305 'a: 'b,
306 {
307 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
308 .await
309 }
310
311 async fn create_file_do<'b>(
312 &'b mut self,
313 metadata: &Metadata,
314 file_name: &[u8],
315 file_size: u64,
316 ) -> io::Result<FileImpl<'b>>
317 where
318 'a: 'b,
319 {
320 self.check()?;
321
322 let file_offset = self.position();
323 self.start_file_do(Some(metadata), file_name).await?;
324
325 let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
326 header.check_header_size()?;
327
328 seq_write_struct(
329 self.output.as_mut().unwrap(),
330 header,
331 &mut self.state.write_position,
332 )
333 .await?;
334
335 let payload_data_offset = self.position();
336
337 let meta_size = payload_data_offset - file_offset;
338
339 Ok(FileImpl {
340 output: self.output.as_mut().unwrap(),
341 goodbye_item: GoodbyeItem {
342 hash: format::hash_filename(file_name),
343 offset: file_offset,
344 size: file_size + meta_size,
345 },
346 remaining_size: file_size,
347 parent: &mut self.state,
348 })
349 }
350
351 /// Return a file offset usable with `add_hardlink`.
352 pub async fn add_file(
353 &mut self,
354 metadata: &Metadata,
355 file_name: &Path,
356 file_size: u64,
357 content: &mut dyn SeqRead,
358 ) -> io::Result<LinkOffset> {
359 let buf = Rc::clone(&self.file_copy_buffer);
360 let mut file = self.create_file(metadata, file_name, file_size).await?;
361 let mut buf = buf.borrow_mut();
362 loop {
363 let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
364 if got == 0 {
365 break;
366 } else {
367 file.write_all(&buf[..got]).await?;
368 }
369 }
370 Ok(file.file_offset())
371 }
372
373 /// Return a file offset usable with `add_hardlink`.
374 pub async fn add_symlink(
375 &mut self,
376 metadata: &Metadata,
377 file_name: &Path,
378 target: &Path,
379 ) -> io::Result<()> {
380 let target = target.as_os_str().as_bytes();
381 let mut data = Vec::with_capacity(target.len() + 1);
382 data.extend(target);
383 data.push(0);
384 let _ofs: LinkOffset = self
385 .add_file_entry(
386 Some(metadata),
387 file_name,
388 Some((format::PXAR_SYMLINK, &data)),
389 )
390 .await?;
391 Ok(())
392 }
393
394 /// Return a file offset usable with `add_hardlink`.
395 pub async fn add_hardlink(
396 &mut self,
397 file_name: &Path,
398 target: &Path,
399 target_offset: LinkOffset,
400 ) -> io::Result<()> {
401 let current_offset = self.position();
402 if current_offset <= target_offset.0 {
403 io_bail!("invalid hardlink offset, can only point to prior files");
404 }
405
406 let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
407 let target_bytes = target.as_os_str().as_bytes();
408 let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len() + 1);
409 hardlink.extend(&offset_bytes);
410 hardlink.extend(target_bytes);
411 hardlink.push(0);
412 let _this_offset: LinkOffset = self
413 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
414 .await?;
415 Ok(())
416 }
417
418 /// Return a file offset usable with `add_hardlink`.
419 pub async fn add_device(
420 &mut self,
421 metadata: &Metadata,
422 file_name: &Path,
423 device: format::Device,
424 ) -> io::Result<()> {
425 if !metadata.is_device() {
426 io_bail!("entry added via add_device must have a device mode in its metadata");
427 }
428
429 let device = device.to_le();
430 let device = unsafe {
431 std::slice::from_raw_parts(
432 &device as *const format::Device as *const u8,
433 size_of::<format::Device>(),
434 )
435 };
436 let _ofs: LinkOffset = self
437 .add_file_entry(
438 Some(metadata),
439 file_name,
440 Some((format::PXAR_DEVICE, device)),
441 )
442 .await?;
443 Ok(())
444 }
445
446 /// Return a file offset usable with `add_hardlink`.
447 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
448 if !metadata.is_fifo() {
449 io_bail!("entry added via add_device must be of type fifo in its metadata");
450 }
451
452 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
453 Ok(())
454 }
455
456 /// Return a file offset usable with `add_hardlink`.
457 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
458 if !metadata.is_socket() {
459 io_bail!("entry added via add_device must be of type socket in its metadata");
460 }
461
462 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
463 Ok(())
464 }
465
466 /// Return a file offset usable with `add_hardlink`.
467 async fn add_file_entry(
468 &mut self,
469 metadata: Option<&Metadata>,
470 file_name: &Path,
471 entry_htype_data: Option<(u64, &[u8])>,
472 ) -> io::Result<LinkOffset> {
473 self.check()?;
474
475 let file_offset = self.position();
476
477 let file_name = file_name.as_os_str().as_bytes();
478
479 self.start_file_do(metadata, file_name).await?;
480 if let Some((htype, entry_data)) = entry_htype_data {
481 seq_write_pxar_entry(
482 self.output.as_mut().unwrap(),
483 htype,
484 entry_data,
485 &mut self.state.write_position,
486 )
487 .await?;
488 }
489
490 let end_offset = self.position();
491
492 self.state.items.push(GoodbyeItem {
493 hash: format::hash_filename(file_name),
494 offset: file_offset,
495 size: end_offset - file_offset,
496 });
497
498 Ok(LinkOffset(file_offset))
499 }
500
501 #[inline]
502 fn position(&mut self) -> u64 {
503 self.state.write_position
504 }
505
506 pub async fn create_directory<'b>(
507 &'b mut self,
508 file_name: &Path,
509 metadata: &Metadata,
510 ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>>
511 where
512 'a: 'b,
513 {
514 self.check()?;
515
516 if !metadata.is_dir() {
517 io_bail!("directory metadata must contain the directory mode flag");
518 }
519
520 let file_name = file_name.as_os_str().as_bytes();
521 let file_hash = format::hash_filename(file_name);
522
523 let file_offset = self.position();
524 self.encode_filename(file_name).await?;
525
526 let entry_offset = self.position();
527 self.encode_metadata(&metadata).await?;
528
529 let files_offset = self.position();
530
531 // the child will write to OUR state now:
532 let write_position = self.position();
533
534 Ok(EncoderImpl {
535 output: self.output.as_mut().map(SeqWrite::as_trait_object),
536 state: EncoderState {
537 entry_offset,
538 files_offset,
539 file_offset: Some(file_offset),
540 file_hash,
541 write_position,
542 ..Default::default()
543 },
544 parent: Some(&mut self.state),
545 finished: false,
546 file_copy_buffer: Rc::clone(&self.file_copy_buffer),
547 })
548 }
549
550 async fn start_file_do(
551 &mut self,
552 metadata: Option<&Metadata>,
553 file_name: &[u8],
554 ) -> io::Result<()> {
555 self.encode_filename(file_name).await?;
556 if let Some(metadata) = metadata {
557 self.encode_metadata(&metadata).await?;
558 }
559 Ok(())
560 }
561
562 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
563 seq_write_pxar_struct_entry(
564 self.output.as_mut().unwrap(),
565 format::PXAR_ENTRY,
566 metadata.stat.clone(),
567 &mut self.state.write_position,
568 )
569 .await?;
570
571 for xattr in &metadata.xattrs {
572 self.write_xattr(xattr).await?;
573 }
574
575 self.write_acls(&metadata.acl).await?;
576
577 if let Some(fcaps) = &metadata.fcaps {
578 self.write_file_capabilities(fcaps).await?;
579 }
580
581 if let Some(qpid) = &metadata.quota_project_id {
582 self.write_quota_project_id(qpid).await?;
583 }
584
585 Ok(())
586 }
587
588 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
589 seq_write_pxar_entry(
590 self.output.as_mut().unwrap(),
591 format::PXAR_XATTR,
592 &xattr.data,
593 &mut self.state.write_position,
594 )
595 .await
596 }
597
598 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
599 for acl in &acl.users {
600 seq_write_pxar_struct_entry(
601 self.output.as_mut().unwrap(),
602 format::PXAR_ACL_USER,
603 acl.clone(),
604 &mut self.state.write_position,
605 )
606 .await?;
607 }
608
609 for acl in &acl.groups {
610 seq_write_pxar_struct_entry(
611 self.output.as_mut().unwrap(),
612 format::PXAR_ACL_GROUP,
613 acl.clone(),
614 &mut self.state.write_position,
615 )
616 .await?;
617 }
618
619 if let Some(acl) = &acl.group_obj {
620 seq_write_pxar_struct_entry(
621 self.output.as_mut().unwrap(),
622 format::PXAR_ACL_GROUP_OBJ,
623 acl.clone(),
624 &mut self.state.write_position,
625 )
626 .await?;
627 }
628
629 if let Some(acl) = &acl.default {
630 seq_write_pxar_struct_entry(
631 self.output.as_mut().unwrap(),
632 format::PXAR_ACL_DEFAULT,
633 acl.clone(),
634 &mut self.state.write_position,
635 )
636 .await?;
637 }
638
639 for acl in &acl.default_users {
640 seq_write_pxar_struct_entry(
641 self.output.as_mut().unwrap(),
642 format::PXAR_ACL_DEFAULT_USER,
643 acl.clone(),
644 &mut self.state.write_position,
645 )
646 .await?;
647 }
648
649 for acl in &acl.default_groups {
650 seq_write_pxar_struct_entry(
651 self.output.as_mut().unwrap(),
652 format::PXAR_ACL_DEFAULT_GROUP,
653 acl.clone(),
654 &mut self.state.write_position,
655 )
656 .await?;
657 }
658
659 Ok(())
660 }
661
662 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
663 seq_write_pxar_entry(
664 self.output.as_mut().unwrap(),
665 format::PXAR_FCAPS,
666 &fcaps.data,
667 &mut self.state.write_position,
668 )
669 .await
670 }
671
672 async fn write_quota_project_id(
673 &mut self,
674 quota_project_id: &format::QuotaProjectId,
675 ) -> io::Result<()> {
676 seq_write_pxar_struct_entry(
677 self.output.as_mut().unwrap(),
678 format::PXAR_QUOTA_PROJID,
679 *quota_project_id,
680 &mut self.state.write_position,
681 )
682 .await
683 }
684
685 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
686 crate::util::validate_filename(file_name)?;
687 seq_write_pxar_entry_zero(
688 self.output.as_mut().unwrap(),
689 format::PXAR_FILENAME,
690 file_name,
691 &mut self.state.write_position,
692 )
693 .await
694 }
695
696 pub async fn finish(mut self) -> io::Result<T> {
697 let tail_bytes = self.finish_goodbye_table().await?;
698 seq_write_pxar_entry(
699 self.output.as_mut().unwrap(),
700 format::PXAR_GOODBYE,
701 &tail_bytes,
702 &mut self.state.write_position,
703 )
704 .await?;
705
706 // done up here because of the self-borrow and to propagate
707 let end_offset = self.position();
708
709 if let Some(parent) = &mut self.parent {
710 parent.write_position = end_offset;
711
712 let file_offset = self
713 .state
714 .file_offset
715 .expect("internal error: parent set but no file_offset?");
716
717 parent.items.push(GoodbyeItem {
718 hash: self.state.file_hash,
719 offset: file_offset,
720 size: end_offset - file_offset,
721 });
722 }
723 self.finished = true;
724 Ok(self.output.take().unwrap())
725 }
726
727 pub fn into_writer(mut self) -> T {
728 self.output.take().unwrap()
729 }
730
731 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
732 let goodbye_offset = self.position();
733
734 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
735 let mut tail = take(&mut self.state.items);
736 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
737 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
738
739 // sort, then create a BST
740 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
741
742 let mut bst = Vec::with_capacity(tail.len() + 1);
743 unsafe {
744 bst.set_len(tail.len());
745 }
746 binary_tree_array::copy(tail.len(), |src, dest| {
747 let mut item = tail[src].clone();
748 // fixup the goodbye table offsets to be relative and with the right endianess
749 item.offset = goodbye_offset - item.offset;
750 unsafe {
751 std::ptr::write(&mut bst[dest], item.to_le());
752 }
753 });
754 drop(tail);
755
756 bst.push(
757 GoodbyeItem {
758 hash: format::PXAR_GOODBYE_TAIL_MARKER,
759 offset: goodbye_offset - self.state.entry_offset,
760 size: goodbye_size,
761 }
762 .to_le(),
763 );
764
765 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
766 // the items make sense:
767 let data = bst.as_mut_ptr() as *mut u8;
768 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
769 forget(bst);
770 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
771 }
772 }
773
774 /// Writer for a file object in a directory.
775 pub struct FileImpl<'a> {
776 output: &'a mut dyn SeqWrite,
777
778 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
779 /// directly instead of on Drop of FileImpl?
780 goodbye_item: GoodbyeItem,
781
782 /// While writing data to this file, this is how much space we still have left, this must reach
783 /// exactly zero.
784 remaining_size: u64,
785
786 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
787 /// to, and where we insert our `GoodbyeItem`.
788 parent: &'a mut EncoderState,
789 }
790
791 impl<'a> Drop for FileImpl<'a> {
792 fn drop(&mut self) {
793 if self.remaining_size != 0 {
794 self.parent.add_error(EncodeError::IncompleteFile);
795 }
796
797 self.parent.items.push(self.goodbye_item.clone());
798 }
799 }
800
801 impl<'a> FileImpl<'a> {
802 /// Get the file offset to be able to reference it with `add_hardlink`.
803 pub fn file_offset(&self) -> LinkOffset {
804 LinkOffset(self.goodbye_item.offset)
805 }
806
807 fn check_remaining(&self, size: usize) -> io::Result<()> {
808 if size as u64 > self.remaining_size {
809 io_bail!("attempted to write more than previously allocated");
810 } else {
811 Ok(())
812 }
813 }
814
/// Poll write interface to more easily connect to tokio/futures.
///
/// Fails if `data` is larger than what is left of the announced file size. On a successful
/// write both the remaining size and the parent's write position are updated.
#[cfg(any(feature = "tokio-io", feature = "futures-io"))]
pub fn poll_write(
    self: Pin<&mut Self>,
    cx: &mut Context,
    data: &[u8],
) -> Poll<io::Result<usize>> {
    let this = self.get_mut();
    this.check_remaining(data.len())?;

    // SAFETY: the output is only reborrowed for this call, nothing here moves it.
    let output = unsafe { Pin::new_unchecked(&mut *this.output) };
    let result = output.poll_seq_write(cx, data);
    if let Poll::Ready(Ok(put)) = &result {
        let put = *put as u64;
        this.remaining_size -= put;
        this.parent.write_position += put;
    }
    result
}
834
/// Poll flush interface to more easily connect to tokio/futures.
///
/// Flushes the underlying output.
#[cfg(any(feature = "tokio-io", feature = "futures-io"))]
pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
    // SAFETY: this only projects to the `output` field, nothing is moved.
    let output = unsafe { self.map_unchecked_mut(|this| &mut this.output) };
    output.poll_flush(cx)
}
843
/// Poll close/shutdown interface to more easily connect to tokio/futures.
///
/// There is nothing to close here: this is a virtual writer into the output provided by the
/// encoder, so closing just flushes that output.
#[cfg(any(feature = "tokio-io", feature = "futures-io"))]
pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
    // SAFETY: this only projects to the `output` field, nothing is moved.
    let output = unsafe { self.map_unchecked_mut(|this| &mut this.output) };
    output.poll_flush(cx)
}
855
856 /// Write file data for the current file entry in a pxar archive.
857 ///
858 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
859 /// requested. Check the return value for how many. There's also a `write_all` method available
860 /// for convenience.
861 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
862 self.check_remaining(data.len())?;
863 let put =
864 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
865 .await?;
866 //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
867 self.remaining_size -= put as u64;
868 self.parent.write_position += put as u64;
869 Ok(put)
870 }
871
872 /// Completely write file data for the current file entry in a pxar archive.
873 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
874 self.check_remaining(data.len())?;
875 seq_write_all(&mut self.output, data, &mut self.parent.write_position).await?;
876 self.remaining_size -= data.len() as u64;
877 Ok(())
878 }
879 }
880
/// `tokio` flavored `AsyncWrite`, forwarding to the inherent poll methods of `FileImpl`.
#[cfg(feature = "tokio-io")]
impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
        // paths through `Self` pick the inherent method, not this trait method
        Self::poll_write(self, cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Self::poll_flush(self, cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Self::poll_close(self, cx)
    }
}
895
/// `futures` flavored `AsyncWrite`, forwarding to the inherent poll methods of `FileImpl`.
#[cfg(feature = "futures-io")]
impl<'a> futures::io::AsyncWrite for FileImpl<'a> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
        // paths through `Self` pick the inherent method, not this trait method
        Self::poll_write(self, cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Self::poll_flush(self, cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Self::poll_close(self, cx)
    }
}