]> git.proxmox.com Git - pxar.git/blob - src/encoder/mod.rs
rustfmt
[pxar.git] / src / encoder / mod.rs
1 //! The `pxar` encoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 use std::io;
6 use std::mem::{forget, size_of, size_of_val, take};
7 use std::os::unix::ffi::OsStrExt;
8 use std::path::Path;
9 use std::pin::Pin;
10 use std::sync::{Arc, Mutex};
11 use std::task::{Context, Poll};
12
13 use endian_trait::Endian;
14
15 use crate::binary_tree_array;
16 use crate::decoder::{self, SeqRead};
17 use crate::format::{self, GoodbyeItem};
18 use crate::poll_fn::poll_fn;
19 use crate::Metadata;
20
21 pub mod aio;
22 pub mod sync;
23
24 #[doc(inline)]
25 pub use sync::Encoder;
26
27 /// File reference used to create hard links.
28 #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
29 pub struct LinkOffset(u64);
30
31 impl LinkOffset {
32 #[inline]
33 pub fn raw(self) -> u64 {
34 self.0
35 }
36 }
37
38 /// Sequential write interface used by the encoder's state machine.
39 ///
40 /// This is our internal writer trait which is available for `std::io::Write` types in the
41 /// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
42 /// wrapper.
43 pub trait SeqWrite {
44 fn poll_seq_write(
45 self: Pin<&mut Self>,
46 cx: &mut Context,
47 buf: &[u8],
48 ) -> Poll<io::Result<usize>>;
49
50 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
51 }
52
53 /// Allow using trait objects for generics taking a `SeqWrite`.
54 impl<S> SeqWrite for &mut S
55 where
56 S: SeqWrite + ?Sized,
57 {
58 fn poll_seq_write(
59 self: Pin<&mut Self>,
60 cx: &mut Context,
61 buf: &[u8],
62 ) -> Poll<io::Result<usize>> {
63 unsafe {
64 self.map_unchecked_mut(|this| &mut **this)
65 .poll_seq_write(cx, buf)
66 }
67 }
68
69 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
70 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
71 }
72 }
73
74 /// awaitable verison of `poll_seq_write`.
75 async fn seq_write<T: SeqWrite + ?Sized>(
76 output: &mut T,
77 buf: &[u8],
78 position: &mut u64,
79 ) -> io::Result<usize> {
80 let put =
81 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await?;
82 *position += put as u64;
83 Ok(put)
84 }
85
86 /// Write the entire contents of a buffer, handling short writes.
87 async fn seq_write_all<T: SeqWrite + ?Sized>(
88 output: &mut T,
89 mut buf: &[u8],
90 position: &mut u64,
91 ) -> io::Result<()> {
92 while !buf.is_empty() {
93 let got = seq_write(&mut *output, buf, &mut *position).await?;
94 buf = &buf[got..];
95 }
96 Ok(())
97 }
98
99 /// Write an endian-swappable struct.
100 async fn seq_write_struct<E: Endian, T>(
101 output: &mut T,
102 data: E,
103 position: &mut u64,
104 ) -> io::Result<()>
105 where
106 T: SeqWrite + ?Sized,
107 {
108 let data = data.to_le();
109 let buf =
110 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
111 seq_write_all(output, buf, position).await
112 }
113
114 /// Write a pxar entry.
115 async fn seq_write_pxar_entry<T>(
116 output: &mut T,
117 htype: u64,
118 data: &[u8],
119 position: &mut u64,
120 ) -> io::Result<()>
121 where
122 T: SeqWrite + ?Sized,
123 {
124 let header = format::Header::with_content_size(htype, data.len() as u64);
125 header.check_header_size()?;
126
127 seq_write_struct(&mut *output, header, &mut *position).await?;
128 seq_write_all(output, data, position).await
129 }
130
131 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
132 /// data buffer.
133 async fn seq_write_pxar_entry_zero<T>(
134 output: &mut T,
135 htype: u64,
136 data: &[u8],
137 position: &mut u64,
138 ) -> io::Result<()>
139 where
140 T: SeqWrite + ?Sized,
141 {
142 let header = format::Header::with_content_size(htype, 1 + data.len() as u64);
143 header.check_header_size()?;
144
145 seq_write_struct(&mut *output, header, &mut *position).await?;
146 seq_write_all(&mut *output, data, &mut *position).await?;
147 seq_write_all(output, &[0u8], position).await
148 }
149
150 /// Write a pxar entry consiting of an endian-swappable struct.
151 async fn seq_write_pxar_struct_entry<E, T>(
152 output: &mut T,
153 htype: u64,
154 data: E,
155 position: &mut u64,
156 ) -> io::Result<()>
157 where
158 T: SeqWrite + ?Sized,
159 E: Endian,
160 {
161 let data = data.to_le();
162 let buf =
163 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
164 seq_write_pxar_entry(output, htype, buf, position).await
165 }
166
167 /// Error conditions caused by wrong usage of this crate.
168 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
169 pub enum EncodeError {
170 /// The user dropped a `File` without without finishing writing all of its contents.
171 ///
172 /// This is required because the payload lengths is written out at the begining and decoding
173 /// requires there to follow the right amount of data.
174 IncompleteFile,
175
176 /// The user dropped a directory without finalizing it.
177 ///
178 /// Finalizing is required to build the goodbye table at the end of a directory.
179 IncompleteDirectory,
180 }
181
182 #[derive(Default)]
183 struct EncoderState {
184 /// Goodbye items for this directory, excluding the tail.
185 items: Vec<GoodbyeItem>,
186
187 /// User caused error conditions.
188 encode_error: Option<EncodeError>,
189
190 /// Offset of this directory's ENTRY.
191 entry_offset: u64,
192
193 /// Offset to this directory's first FILENAME.
194 files_offset: u64,
195
196 /// If this is a subdirectory, this points to the this directory's FILENAME.
197 file_offset: Option<u64>,
198
199 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
200 file_hash: u64,
201
202 /// We need to keep track how much we have written to get offsets.
203 write_position: u64,
204 }
205
206 impl EncoderState {
207 fn merge_error(&mut self, error: Option<EncodeError>) {
208 // one error is enough:
209 if self.encode_error.is_none() {
210 self.encode_error = error;
211 }
212 }
213
214 fn add_error(&mut self, error: EncodeError) {
215 self.merge_error(Some(error));
216 }
217 }
218
219 pub(crate) enum EncoderOutput<'a, T> {
220 Owned(T),
221 Borrowed(&'a mut T),
222 }
223
224 impl<'a, T> EncoderOutput<'a, T> {
225 #[inline]
226 fn to_borrowed<'s>(&'s mut self) -> EncoderOutput<'s, T>
227 where
228 'a: 's,
229 {
230 EncoderOutput::Borrowed(self.as_mut())
231 }
232 }
233
234 impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
235 fn as_mut(&mut self) -> &mut T {
236 match self {
237 EncoderOutput::Owned(o) => o,
238 EncoderOutput::Borrowed(b) => b,
239 }
240 }
241 }
242
243 impl<'a, T> std::convert::From<T> for EncoderOutput<'a, T> {
244 fn from(t: T) -> Self {
245 EncoderOutput::Owned(t)
246 }
247 }
248
249 impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
250 fn from(t: &'a mut T) -> Self {
251 EncoderOutput::Borrowed(t)
252 }
253 }
254
255 /// The encoder state machine implementation for a directory.
256 ///
257 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
258 /// synchronous or `async` I/O objects in as output.
259 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
260 output: EncoderOutput<'a, T>,
261 state: EncoderState,
262 parent: Option<&'a mut EncoderState>,
263 finished: bool,
264
265 /// Since only the "current" entry can be actively writing files, we share the file copy
266 /// buffer.
267 file_copy_buffer: Arc<Mutex<Vec<u8>>>,
268 }
269
270 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
271 fn drop(&mut self) {
272 if let Some(ref mut parent) = self.parent {
273 // propagate errors:
274 parent.merge_error(self.state.encode_error);
275 if !self.finished {
276 parent.add_error(EncodeError::IncompleteDirectory);
277 }
278 } else if !self.finished {
279 // FIXME: how do we deal with this?
280 // eprintln!("Encoder dropped without finishing!");
281 }
282 }
283 }
284
285 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
286 pub async fn new(
287 output: EncoderOutput<'a, T>,
288 metadata: &Metadata,
289 ) -> io::Result<EncoderImpl<'a, T>> {
290 if !metadata.is_dir() {
291 io_bail!("directory metadata must contain the directory mode flag");
292 }
293 let mut this = Self {
294 output,
295 state: EncoderState::default(),
296 parent: None,
297 finished: false,
298 file_copy_buffer: Arc::new(Mutex::new(crate::util::vec_new(1024 * 1024))),
299 };
300
301 this.encode_metadata(metadata).await?;
302 this.state.files_offset = this.position();
303
304 Ok(this)
305 }
306
307 fn check(&self) -> io::Result<()> {
308 match self.state.encode_error {
309 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
310 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
311 None => Ok(()),
312 }
313 }
314
315 pub async fn create_file<'b>(
316 &'b mut self,
317 metadata: &Metadata,
318 file_name: &Path,
319 file_size: u64,
320 ) -> io::Result<FileImpl<'b, T>>
321 where
322 'a: 'b,
323 {
324 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
325 .await
326 }
327
328 async fn create_file_do<'b>(
329 &'b mut self,
330 metadata: &Metadata,
331 file_name: &[u8],
332 file_size: u64,
333 ) -> io::Result<FileImpl<'b, T>>
334 where
335 'a: 'b,
336 {
337 self.check()?;
338
339 let file_offset = self.position();
340 self.start_file_do(Some(metadata), file_name).await?;
341
342 let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
343 header.check_header_size()?;
344
345 seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
346
347 let payload_data_offset = self.position();
348
349 let meta_size = payload_data_offset - file_offset;
350
351 Ok(FileImpl {
352 output: self.output.as_mut(),
353 goodbye_item: GoodbyeItem {
354 hash: format::hash_filename(file_name),
355 offset: file_offset,
356 size: file_size + meta_size,
357 },
358 remaining_size: file_size,
359 parent: &mut self.state,
360 })
361 }
362
363 /// Return a file offset usable with `add_hardlink`.
364 pub async fn add_file(
365 &mut self,
366 metadata: &Metadata,
367 file_name: &Path,
368 file_size: u64,
369 content: &mut dyn SeqRead,
370 ) -> io::Result<LinkOffset> {
371 let buf = Arc::clone(&self.file_copy_buffer);
372 let mut file = self.create_file(metadata, file_name, file_size).await?;
373 let mut buf = buf.lock().expect("failed to lock temporary buffer mutex");
374 loop {
375 let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
376 if got == 0 {
377 break;
378 } else {
379 file.write_all(&buf[..got]).await?;
380 }
381 }
382 Ok(file.file_offset())
383 }
384
385 /// Return a file offset usable with `add_hardlink`.
386 pub async fn add_symlink(
387 &mut self,
388 metadata: &Metadata,
389 file_name: &Path,
390 target: &Path,
391 ) -> io::Result<()> {
392 let target = target.as_os_str().as_bytes();
393 let mut data = Vec::with_capacity(target.len() + 1);
394 data.extend(target);
395 data.push(0);
396 let _ofs: LinkOffset = self
397 .add_file_entry(
398 Some(metadata),
399 file_name,
400 Some((format::PXAR_SYMLINK, &data)),
401 )
402 .await?;
403 Ok(())
404 }
405
406 /// Return a file offset usable with `add_hardlink`.
407 pub async fn add_hardlink(
408 &mut self,
409 file_name: &Path,
410 target: &Path,
411 target_offset: LinkOffset,
412 ) -> io::Result<()> {
413 let current_offset = self.position();
414 if current_offset <= target_offset.0 {
415 io_bail!("invalid hardlink offset, can only point to prior files");
416 }
417
418 let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
419 let target_bytes = target.as_os_str().as_bytes();
420 let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len() + 1);
421 hardlink.extend(&offset_bytes);
422 hardlink.extend(target_bytes);
423 hardlink.push(0);
424 let _this_offset: LinkOffset = self
425 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
426 .await?;
427 Ok(())
428 }
429
430 /// Return a file offset usable with `add_hardlink`.
431 pub async fn add_device(
432 &mut self,
433 metadata: &Metadata,
434 file_name: &Path,
435 device: format::Device,
436 ) -> io::Result<()> {
437 if !metadata.is_device() {
438 io_bail!("entry added via add_device must have a device mode in its metadata");
439 }
440
441 let device = device.to_le();
442 let device = unsafe {
443 std::slice::from_raw_parts(
444 &device as *const format::Device as *const u8,
445 size_of::<format::Device>(),
446 )
447 };
448 let _ofs: LinkOffset = self
449 .add_file_entry(
450 Some(metadata),
451 file_name,
452 Some((format::PXAR_DEVICE, device)),
453 )
454 .await?;
455 Ok(())
456 }
457
458 /// Return a file offset usable with `add_hardlink`.
459 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
460 if !metadata.is_fifo() {
461 io_bail!("entry added via add_device must be of type fifo in its metadata");
462 }
463
464 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
465 Ok(())
466 }
467
468 /// Return a file offset usable with `add_hardlink`.
469 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
470 if !metadata.is_socket() {
471 io_bail!("entry added via add_device must be of type socket in its metadata");
472 }
473
474 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
475 Ok(())
476 }
477
478 /// Return a file offset usable with `add_hardlink`.
479 async fn add_file_entry(
480 &mut self,
481 metadata: Option<&Metadata>,
482 file_name: &Path,
483 entry_htype_data: Option<(u64, &[u8])>,
484 ) -> io::Result<LinkOffset> {
485 self.check()?;
486
487 let file_offset = self.position();
488
489 let file_name = file_name.as_os_str().as_bytes();
490
491 self.start_file_do(metadata, file_name).await?;
492 if let Some((htype, entry_data)) = entry_htype_data {
493 seq_write_pxar_entry(
494 self.output.as_mut(),
495 htype,
496 entry_data,
497 &mut self.state.write_position,
498 )
499 .await?;
500 }
501
502 let end_offset = self.position();
503
504 self.state.items.push(GoodbyeItem {
505 hash: format::hash_filename(file_name),
506 offset: file_offset,
507 size: end_offset - file_offset,
508 });
509
510 Ok(LinkOffset(file_offset))
511 }
512
513 #[inline]
514 fn position(&mut self) -> u64 {
515 self.state.write_position
516 }
517
518 pub async fn create_directory(
519 &mut self,
520 file_name: &Path,
521 metadata: &Metadata,
522 ) -> io::Result<EncoderImpl<'_, T>> {
523 self.check()?;
524
525 if !metadata.is_dir() {
526 io_bail!("directory metadata must contain the directory mode flag");
527 }
528
529 let file_name = file_name.as_os_str().as_bytes();
530 let file_hash = format::hash_filename(file_name);
531
532 let file_offset = self.position();
533 self.encode_filename(file_name).await?;
534
535 let entry_offset = self.position();
536 self.encode_metadata(&metadata).await?;
537
538 let files_offset = self.position();
539
540 // the child will write to OUR state now:
541 let write_position = self.position();
542
543 let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
544
545 Ok(EncoderImpl {
546 // always forward as Borrowed(), to avoid stacking references on nested calls
547 output: self.output.to_borrowed(),
548 state: EncoderState {
549 entry_offset,
550 files_offset,
551 file_offset: Some(file_offset),
552 file_hash,
553 write_position,
554 ..Default::default()
555 },
556 parent: Some(&mut self.state),
557 finished: false,
558 file_copy_buffer,
559 })
560 }
561
562 async fn start_file_do(
563 &mut self,
564 metadata: Option<&Metadata>,
565 file_name: &[u8],
566 ) -> io::Result<()> {
567 self.encode_filename(file_name).await?;
568 if let Some(metadata) = metadata {
569 self.encode_metadata(&metadata).await?;
570 }
571 Ok(())
572 }
573
574 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
575 seq_write_pxar_struct_entry(
576 self.output.as_mut(),
577 format::PXAR_ENTRY,
578 metadata.stat.clone(),
579 &mut self.state.write_position,
580 )
581 .await?;
582
583 for xattr in &metadata.xattrs {
584 self.write_xattr(xattr).await?;
585 }
586
587 self.write_acls(&metadata.acl).await?;
588
589 if let Some(fcaps) = &metadata.fcaps {
590 self.write_file_capabilities(fcaps).await?;
591 }
592
593 if let Some(qpid) = &metadata.quota_project_id {
594 self.write_quota_project_id(qpid).await?;
595 }
596
597 Ok(())
598 }
599
600 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
601 seq_write_pxar_entry(
602 self.output.as_mut(),
603 format::PXAR_XATTR,
604 &xattr.data,
605 &mut self.state.write_position,
606 )
607 .await
608 }
609
610 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
611 for acl in &acl.users {
612 seq_write_pxar_struct_entry(
613 self.output.as_mut(),
614 format::PXAR_ACL_USER,
615 acl.clone(),
616 &mut self.state.write_position,
617 )
618 .await?;
619 }
620
621 for acl in &acl.groups {
622 seq_write_pxar_struct_entry(
623 self.output.as_mut(),
624 format::PXAR_ACL_GROUP,
625 acl.clone(),
626 &mut self.state.write_position,
627 )
628 .await?;
629 }
630
631 if let Some(acl) = &acl.group_obj {
632 seq_write_pxar_struct_entry(
633 self.output.as_mut(),
634 format::PXAR_ACL_GROUP_OBJ,
635 acl.clone(),
636 &mut self.state.write_position,
637 )
638 .await?;
639 }
640
641 if let Some(acl) = &acl.default {
642 seq_write_pxar_struct_entry(
643 self.output.as_mut(),
644 format::PXAR_ACL_DEFAULT,
645 acl.clone(),
646 &mut self.state.write_position,
647 )
648 .await?;
649 }
650
651 for acl in &acl.default_users {
652 seq_write_pxar_struct_entry(
653 self.output.as_mut(),
654 format::PXAR_ACL_DEFAULT_USER,
655 acl.clone(),
656 &mut self.state.write_position,
657 )
658 .await?;
659 }
660
661 for acl in &acl.default_groups {
662 seq_write_pxar_struct_entry(
663 self.output.as_mut(),
664 format::PXAR_ACL_DEFAULT_GROUP,
665 acl.clone(),
666 &mut self.state.write_position,
667 )
668 .await?;
669 }
670
671 Ok(())
672 }
673
674 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
675 seq_write_pxar_entry(
676 self.output.as_mut(),
677 format::PXAR_FCAPS,
678 &fcaps.data,
679 &mut self.state.write_position,
680 )
681 .await
682 }
683
684 async fn write_quota_project_id(
685 &mut self,
686 quota_project_id: &format::QuotaProjectId,
687 ) -> io::Result<()> {
688 seq_write_pxar_struct_entry(
689 self.output.as_mut(),
690 format::PXAR_QUOTA_PROJID,
691 *quota_project_id,
692 &mut self.state.write_position,
693 )
694 .await
695 }
696
697 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
698 crate::util::validate_filename(file_name)?;
699 seq_write_pxar_entry_zero(
700 self.output.as_mut(),
701 format::PXAR_FILENAME,
702 file_name,
703 &mut self.state.write_position,
704 )
705 .await
706 }
707
708 pub async fn finish(mut self) -> io::Result<()> {
709 let tail_bytes = self.finish_goodbye_table().await?;
710 seq_write_pxar_entry(
711 self.output.as_mut(),
712 format::PXAR_GOODBYE,
713 &tail_bytes,
714 &mut self.state.write_position,
715 )
716 .await?;
717
718 // done up here because of the self-borrow and to propagate
719 let end_offset = self.position();
720
721 if let Some(parent) = &mut self.parent {
722 parent.write_position = end_offset;
723
724 let file_offset = self
725 .state
726 .file_offset
727 .expect("internal error: parent set but no file_offset?");
728
729 parent.items.push(GoodbyeItem {
730 hash: self.state.file_hash,
731 offset: file_offset,
732 size: end_offset - file_offset,
733 });
734 }
735 self.finished = true;
736 Ok(())
737 }
738
739 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
740 let goodbye_offset = self.position();
741
742 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
743 let mut tail = take(&mut self.state.items);
744 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
745 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
746
747 // sort, then create a BST
748 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
749
750 let mut bst = Vec::with_capacity(tail.len() + 1);
751 unsafe {
752 bst.set_len(tail.len());
753 }
754 binary_tree_array::copy(tail.len(), |src, dest| {
755 let mut item = tail[src].clone();
756 // fixup the goodbye table offsets to be relative and with the right endianess
757 item.offset = goodbye_offset - item.offset;
758 unsafe {
759 std::ptr::write(&mut bst[dest], item.to_le());
760 }
761 });
762 drop(tail);
763
764 bst.push(
765 GoodbyeItem {
766 hash: format::PXAR_GOODBYE_TAIL_MARKER,
767 offset: goodbye_offset - self.state.entry_offset,
768 size: goodbye_size,
769 }
770 .to_le(),
771 );
772
773 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
774 // the items make sense:
775 let data = bst.as_mut_ptr() as *mut u8;
776 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
777 forget(bst);
778 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
779 }
780 }
781
782 /// Writer for a file object in a directory.
783 pub struct FileImpl<'a, S: SeqWrite> {
784 output: &'a mut S,
785
786 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
787 /// directly instead of on Drop of FileImpl?
788 goodbye_item: GoodbyeItem,
789
790 /// While writing data to this file, this is how much space we still have left, this must reach
791 /// exactly zero.
792 remaining_size: u64,
793
794 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
795 /// to, and where we insert our `GoodbyeItem`.
796 parent: &'a mut EncoderState,
797 }
798
799 impl<'a, S: SeqWrite> Drop for FileImpl<'a, S> {
800 fn drop(&mut self) {
801 if self.remaining_size != 0 {
802 self.parent.add_error(EncodeError::IncompleteFile);
803 }
804
805 self.parent.items.push(self.goodbye_item.clone());
806 }
807 }
808
809 impl<'a, S: SeqWrite> FileImpl<'a, S> {
810 /// Get the file offset to be able to reference it with `add_hardlink`.
811 pub fn file_offset(&self) -> LinkOffset {
812 LinkOffset(self.goodbye_item.offset)
813 }
814
815 fn check_remaining(&self, size: usize) -> io::Result<()> {
816 if size as u64 > self.remaining_size {
817 io_bail!("attempted to write more than previously allocated");
818 } else {
819 Ok(())
820 }
821 }
822
823 /// Poll write interface to more easily connect to tokio/futures.
824 #[cfg(feature = "tokio-io")]
825 pub fn poll_write(
826 self: Pin<&mut Self>,
827 cx: &mut Context,
828 data: &[u8],
829 ) -> Poll<io::Result<usize>> {
830 let this = self.get_mut();
831 this.check_remaining(data.len())?;
832 let output = unsafe { Pin::new_unchecked(&mut *this.output) };
833 match output.poll_seq_write(cx, data) {
834 Poll::Ready(Ok(put)) => {
835 this.remaining_size -= put as u64;
836 this.parent.write_position += put as u64;
837 Poll::Ready(Ok(put))
838 }
839 other => other,
840 }
841 }
842
843 /// Poll flush interface to more easily connect to tokio/futures.
844 #[cfg(feature = "tokio-io")]
845 pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
846 unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
847 }
848
849 /// Poll close/shutdown interface to more easily connect to tokio/futures.
850 ///
851 /// This just calls flush, though, since we're just a virtual writer writing to the file
852 /// provided by our encoder.
853 #[cfg(feature = "tokio-io")]
854 pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
855 unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
856 }
857
858 /// Write file data for the current file entry in a pxar archive.
859 ///
860 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
861 /// requested. Check the return value for how many. There's also a `write_all` method available
862 /// for convenience.
863 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
864 self.check_remaining(data.len())?;
865 let put =
866 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
867 .await?;
868 //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
869 self.remaining_size -= put as u64;
870 self.parent.write_position += put as u64;
871 Ok(put)
872 }
873
874 /// Completely write file data for the current file entry in a pxar archive.
875 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
876 self.check_remaining(data.len())?;
877 seq_write_all(self.output, data, &mut self.parent.write_position).await?;
878 self.remaining_size -= data.len() as u64;
879 Ok(())
880 }
881 }
882
883 #[cfg(feature = "tokio-io")]
884 impl<'a, S: SeqWrite> tokio::io::AsyncWrite for FileImpl<'a, S> {
885 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
886 FileImpl::poll_write(self, cx, buf)
887 }
888
889 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
890 FileImpl::poll_flush(self, cx)
891 }
892
893 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
894 FileImpl::poll_close(self, cx)
895 }
896 }