]> git.proxmox.com Git - pxar.git/blob - src/encoder/mod.rs
cargo fmt
[pxar.git] / src / encoder / mod.rs
1 //! The `pxar` encoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 use std::io;
6 use std::mem::{forget, size_of, size_of_val, take};
7 use std::os::unix::ffi::OsStrExt;
8 use std::path::Path;
9 use std::pin::Pin;
10 use std::sync::{Arc, Mutex};
11 use std::task::{Context, Poll};
12
13 use endian_trait::Endian;
14
15 use crate::binary_tree_array;
16 use crate::decoder::{self, SeqRead};
17 use crate::format::{self, GoodbyeItem};
18 use crate::poll_fn::poll_fn;
19 use crate::Metadata;
20
21 pub mod aio;
22 pub mod sync;
23
24 #[doc(inline)]
25 pub use sync::Encoder;
26
/// Reference to a previously written file entry, used to create hard links.
///
/// Wraps the raw `u64` offset value.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct LinkOffset(u64);

impl LinkOffset {
    /// Return the wrapped raw offset value.
    #[inline]
    pub fn raw(self) -> u64 {
        let LinkOffset(offset) = self;
        offset
    }
}
37
/// Sequential write interface used by the encoder's state machine.
///
/// This is the crate-internal writer abstraction. The synchronous wrapper provides it for
/// `std::io::Write` types, the asynchronous wrapper for `tokio` and `futures` `AsyncWrite` types.
pub trait SeqWrite {
    /// Attempt to write `buf`; a ready result carries the number of bytes that were written.
    fn poll_seq_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Attempt to flush the writer.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;

    /// Turn `self` into a `SeqWrite` trait object.
    ///
    /// This helper avoids recursive borrowing each time we nest into a subdirectory. Without it,
    /// starting a subdirectory would get a trait object pointing to `T`, nesting another
    /// subdirectory in that would get a trait object pointing to the trait object, and so on.
    fn as_trait_object(&mut self) -> &mut dyn SeqWrite
    where
        Self: Sized,
    {
        let object: &mut dyn SeqWrite = self;
        object
    }
}
62
63 /// Allow using trait objects for generics taking a `SeqWrite`.
64 impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
65 fn poll_seq_write(
66 self: Pin<&mut Self>,
67 cx: &mut Context,
68 buf: &[u8],
69 ) -> Poll<io::Result<usize>> {
70 unsafe {
71 self.map_unchecked_mut(|this| &mut **this)
72 .poll_seq_write(cx, buf)
73 }
74 }
75
76 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
77 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
78 }
79
80 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
81 where
82 Self: Sized,
83 {
84 &mut **self
85 }
86 }
87
88 /// awaitable verison of `poll_seq_write`.
89 async fn seq_write<T: SeqWrite + ?Sized>(
90 output: &mut T,
91 buf: &[u8],
92 position: &mut u64,
93 ) -> io::Result<usize> {
94 let put =
95 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await?;
96 *position += put as u64;
97 Ok(put)
98 }
99
100 /// Write the entire contents of a buffer, handling short writes.
101 async fn seq_write_all<T: SeqWrite + ?Sized>(
102 output: &mut T,
103 mut buf: &[u8],
104 position: &mut u64,
105 ) -> io::Result<()> {
106 while !buf.is_empty() {
107 let got = seq_write(&mut *output, buf, &mut *position).await?;
108 buf = &buf[got..];
109 }
110 Ok(())
111 }
112
113 /// Write an endian-swappable struct.
114 async fn seq_write_struct<E: Endian, T>(
115 output: &mut T,
116 data: E,
117 position: &mut u64,
118 ) -> io::Result<()>
119 where
120 T: SeqWrite + ?Sized,
121 {
122 let data = data.to_le();
123 let buf =
124 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
125 seq_write_all(output, buf, position).await
126 }
127
128 /// Write a pxar entry.
129 async fn seq_write_pxar_entry<T>(
130 output: &mut T,
131 htype: u64,
132 data: &[u8],
133 position: &mut u64,
134 ) -> io::Result<()>
135 where
136 T: SeqWrite + ?Sized,
137 {
138 let header = format::Header::with_content_size(htype, data.len() as u64);
139 header.check_header_size()?;
140
141 seq_write_struct(&mut *output, header, &mut *position).await?;
142 seq_write_all(output, data, position).await
143 }
144
145 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
146 /// data buffer.
147 async fn seq_write_pxar_entry_zero<T>(
148 output: &mut T,
149 htype: u64,
150 data: &[u8],
151 position: &mut u64,
152 ) -> io::Result<()>
153 where
154 T: SeqWrite + ?Sized,
155 {
156 let header = format::Header::with_content_size(htype, 1 + data.len() as u64);
157 header.check_header_size()?;
158
159 seq_write_struct(&mut *output, header, &mut *position).await?;
160 seq_write_all(&mut *output, data, &mut *position).await?;
161 seq_write_all(output, &[0u8], position).await
162 }
163
164 /// Write a pxar entry consiting of an endian-swappable struct.
165 async fn seq_write_pxar_struct_entry<E, T>(
166 output: &mut T,
167 htype: u64,
168 data: E,
169 position: &mut u64,
170 ) -> io::Result<()>
171 where
172 T: SeqWrite + ?Sized,
173 E: Endian,
174 {
175 let data = data.to_le();
176 let buf =
177 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
178 seq_write_pxar_entry(output, htype, buf, position).await
179 }
180
/// Error conditions caused by wrong usage of this crate.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EncodeError {
    /// The user dropped a `File` without finishing writing all of its contents.
    ///
    /// This is required because the payload length is written out at the beginning and decoding
    /// requires the right amount of data to follow.
    IncompleteFile,

    /// The user dropped a directory without finalizing it.
    ///
    /// Finalizing is required to build the goodbye table at the end of a directory.
    IncompleteDirectory,
}
195
/// Bookkeeping for one directory which is currently being encoded.
#[derive(Default)]
struct EncoderState {
    /// Goodbye items for this directory, excluding the tail.
    items: Vec<GoodbyeItem>,

    /// User caused error conditions.
    encode_error: Option<EncodeError>,

    /// Offset of this directory's ENTRY.
    entry_offset: u64,

    /// Offset to this directory's first FILENAME.
    files_offset: u64,

    /// If this is a subdirectory, this points to this directory's FILENAME.
    file_offset: Option<u64>,

    /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
    file_hash: u64,

    /// We need to keep track of how much we have written to get offsets.
    write_position: u64,
}
219
220 impl EncoderState {
221 fn merge_error(&mut self, error: Option<EncodeError>) {
222 // one error is enough:
223 if self.encode_error.is_none() {
224 self.encode_error = error;
225 }
226 }
227
228 fn add_error(&mut self, error: EncodeError) {
229 self.merge_error(Some(error));
230 }
231 }
232
/// The encoder state machine implementation for a directory.
///
/// We use `async fn` to implement the encoder state machine so that we can easily plug in both
/// synchronous or `async` I/O objects as output.
pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
    /// The writer; taken out (leaving `None`) by `finish` and `into_writer`.
    output: Option<T>,
    /// Bookkeeping for the directory this encoder writes.
    state: EncoderState,
    /// State of the parent directory's encoder, `None` for encoders created via `new`.
    parent: Option<&'a mut EncoderState>,
    /// Set by `finish`; checked on drop to detect directories which were not finalized.
    finished: bool,

    /// Since only the "current" entry can be actively writing files, we share the file copy
    /// buffer.
    file_copy_buffer: Arc<Mutex<Vec<u8>>>,
}
247
248 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
249 fn drop(&mut self) {
250 if let Some(ref mut parent) = self.parent {
251 // propagate errors:
252 parent.merge_error(self.state.encode_error);
253 if !self.finished {
254 parent.add_error(EncodeError::IncompleteDirectory);
255 }
256 } else if !self.finished {
257 // FIXME: how do we deal with this?
258 // eprintln!("Encoder dropped without finishing!");
259 }
260 }
261 }
262
263 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
264 pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
265 if !metadata.is_dir() {
266 io_bail!("directory metadata must contain the directory mode flag");
267 }
268 let mut this = Self {
269 output: Some(output),
270 state: EncoderState::default(),
271 parent: None,
272 finished: false,
273 file_copy_buffer: Arc::new(Mutex::new(crate::util::vec_new(1024 * 1024))),
274 };
275
276 this.encode_metadata(metadata).await?;
277 this.state.files_offset = this.position();
278
279 Ok(this)
280 }
281
282 fn check(&self) -> io::Result<()> {
283 match self.state.encode_error {
284 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
285 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
286 None => Ok(()),
287 }
288 }
289
290 pub async fn create_file<'b>(
291 &'b mut self,
292 metadata: &Metadata,
293 file_name: &Path,
294 file_size: u64,
295 ) -> io::Result<FileImpl<'b>>
296 where
297 'a: 'b,
298 {
299 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
300 .await
301 }
302
303 async fn create_file_do<'b>(
304 &'b mut self,
305 metadata: &Metadata,
306 file_name: &[u8],
307 file_size: u64,
308 ) -> io::Result<FileImpl<'b>>
309 where
310 'a: 'b,
311 {
312 self.check()?;
313
314 let file_offset = self.position();
315 self.start_file_do(Some(metadata), file_name).await?;
316
317 let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
318 header.check_header_size()?;
319
320 seq_write_struct(
321 self.output.as_mut().unwrap(),
322 header,
323 &mut self.state.write_position,
324 )
325 .await?;
326
327 let payload_data_offset = self.position();
328
329 let meta_size = payload_data_offset - file_offset;
330
331 Ok(FileImpl {
332 output: self.output.as_mut().unwrap(),
333 goodbye_item: GoodbyeItem {
334 hash: format::hash_filename(file_name),
335 offset: file_offset,
336 size: file_size + meta_size,
337 },
338 remaining_size: file_size,
339 parent: &mut self.state,
340 })
341 }
342
343 /// Return a file offset usable with `add_hardlink`.
344 pub async fn add_file(
345 &mut self,
346 metadata: &Metadata,
347 file_name: &Path,
348 file_size: u64,
349 content: &mut dyn SeqRead,
350 ) -> io::Result<LinkOffset> {
351 let buf = Arc::clone(&self.file_copy_buffer);
352 let mut file = self.create_file(metadata, file_name, file_size).await?;
353 let mut buf = buf.lock().expect("failed to lock temporary buffer mutex");
354 loop {
355 let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
356 if got == 0 {
357 break;
358 } else {
359 file.write_all(&buf[..got]).await?;
360 }
361 }
362 Ok(file.file_offset())
363 }
364
365 /// Return a file offset usable with `add_hardlink`.
366 pub async fn add_symlink(
367 &mut self,
368 metadata: &Metadata,
369 file_name: &Path,
370 target: &Path,
371 ) -> io::Result<()> {
372 let target = target.as_os_str().as_bytes();
373 let mut data = Vec::with_capacity(target.len() + 1);
374 data.extend(target);
375 data.push(0);
376 let _ofs: LinkOffset = self
377 .add_file_entry(
378 Some(metadata),
379 file_name,
380 Some((format::PXAR_SYMLINK, &data)),
381 )
382 .await?;
383 Ok(())
384 }
385
386 /// Return a file offset usable with `add_hardlink`.
387 pub async fn add_hardlink(
388 &mut self,
389 file_name: &Path,
390 target: &Path,
391 target_offset: LinkOffset,
392 ) -> io::Result<()> {
393 let current_offset = self.position();
394 if current_offset <= target_offset.0 {
395 io_bail!("invalid hardlink offset, can only point to prior files");
396 }
397
398 let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
399 let target_bytes = target.as_os_str().as_bytes();
400 let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len() + 1);
401 hardlink.extend(&offset_bytes);
402 hardlink.extend(target_bytes);
403 hardlink.push(0);
404 let _this_offset: LinkOffset = self
405 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
406 .await?;
407 Ok(())
408 }
409
410 /// Return a file offset usable with `add_hardlink`.
411 pub async fn add_device(
412 &mut self,
413 metadata: &Metadata,
414 file_name: &Path,
415 device: format::Device,
416 ) -> io::Result<()> {
417 if !metadata.is_device() {
418 io_bail!("entry added via add_device must have a device mode in its metadata");
419 }
420
421 let device = device.to_le();
422 let device = unsafe {
423 std::slice::from_raw_parts(
424 &device as *const format::Device as *const u8,
425 size_of::<format::Device>(),
426 )
427 };
428 let _ofs: LinkOffset = self
429 .add_file_entry(
430 Some(metadata),
431 file_name,
432 Some((format::PXAR_DEVICE, device)),
433 )
434 .await?;
435 Ok(())
436 }
437
438 /// Return a file offset usable with `add_hardlink`.
439 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
440 if !metadata.is_fifo() {
441 io_bail!("entry added via add_device must be of type fifo in its metadata");
442 }
443
444 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
445 Ok(())
446 }
447
448 /// Return a file offset usable with `add_hardlink`.
449 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
450 if !metadata.is_socket() {
451 io_bail!("entry added via add_device must be of type socket in its metadata");
452 }
453
454 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
455 Ok(())
456 }
457
458 /// Return a file offset usable with `add_hardlink`.
459 async fn add_file_entry(
460 &mut self,
461 metadata: Option<&Metadata>,
462 file_name: &Path,
463 entry_htype_data: Option<(u64, &[u8])>,
464 ) -> io::Result<LinkOffset> {
465 self.check()?;
466
467 let file_offset = self.position();
468
469 let file_name = file_name.as_os_str().as_bytes();
470
471 self.start_file_do(metadata, file_name).await?;
472 if let Some((htype, entry_data)) = entry_htype_data {
473 seq_write_pxar_entry(
474 self.output.as_mut().unwrap(),
475 htype,
476 entry_data,
477 &mut self.state.write_position,
478 )
479 .await?;
480 }
481
482 let end_offset = self.position();
483
484 self.state.items.push(GoodbyeItem {
485 hash: format::hash_filename(file_name),
486 offset: file_offset,
487 size: end_offset - file_offset,
488 });
489
490 Ok(LinkOffset(file_offset))
491 }
492
493 #[inline]
494 fn position(&mut self) -> u64 {
495 self.state.write_position
496 }
497
498 pub async fn create_directory<'b>(
499 &'b mut self,
500 file_name: &Path,
501 metadata: &Metadata,
502 ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>>
503 where
504 'a: 'b,
505 {
506 self.check()?;
507
508 if !metadata.is_dir() {
509 io_bail!("directory metadata must contain the directory mode flag");
510 }
511
512 let file_name = file_name.as_os_str().as_bytes();
513 let file_hash = format::hash_filename(file_name);
514
515 let file_offset = self.position();
516 self.encode_filename(file_name).await?;
517
518 let entry_offset = self.position();
519 self.encode_metadata(&metadata).await?;
520
521 let files_offset = self.position();
522
523 // the child will write to OUR state now:
524 let write_position = self.position();
525
526 Ok(EncoderImpl {
527 output: self.output.as_mut().map(SeqWrite::as_trait_object),
528 state: EncoderState {
529 entry_offset,
530 files_offset,
531 file_offset: Some(file_offset),
532 file_hash,
533 write_position,
534 ..Default::default()
535 },
536 parent: Some(&mut self.state),
537 finished: false,
538 file_copy_buffer: Arc::clone(&self.file_copy_buffer),
539 })
540 }
541
542 async fn start_file_do(
543 &mut self,
544 metadata: Option<&Metadata>,
545 file_name: &[u8],
546 ) -> io::Result<()> {
547 self.encode_filename(file_name).await?;
548 if let Some(metadata) = metadata {
549 self.encode_metadata(&metadata).await?;
550 }
551 Ok(())
552 }
553
554 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
555 seq_write_pxar_struct_entry(
556 self.output.as_mut().unwrap(),
557 format::PXAR_ENTRY,
558 metadata.stat.clone(),
559 &mut self.state.write_position,
560 )
561 .await?;
562
563 for xattr in &metadata.xattrs {
564 self.write_xattr(xattr).await?;
565 }
566
567 self.write_acls(&metadata.acl).await?;
568
569 if let Some(fcaps) = &metadata.fcaps {
570 self.write_file_capabilities(fcaps).await?;
571 }
572
573 if let Some(qpid) = &metadata.quota_project_id {
574 self.write_quota_project_id(qpid).await?;
575 }
576
577 Ok(())
578 }
579
580 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
581 seq_write_pxar_entry(
582 self.output.as_mut().unwrap(),
583 format::PXAR_XATTR,
584 &xattr.data,
585 &mut self.state.write_position,
586 )
587 .await
588 }
589
590 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
591 for acl in &acl.users {
592 seq_write_pxar_struct_entry(
593 self.output.as_mut().unwrap(),
594 format::PXAR_ACL_USER,
595 acl.clone(),
596 &mut self.state.write_position,
597 )
598 .await?;
599 }
600
601 for acl in &acl.groups {
602 seq_write_pxar_struct_entry(
603 self.output.as_mut().unwrap(),
604 format::PXAR_ACL_GROUP,
605 acl.clone(),
606 &mut self.state.write_position,
607 )
608 .await?;
609 }
610
611 if let Some(acl) = &acl.group_obj {
612 seq_write_pxar_struct_entry(
613 self.output.as_mut().unwrap(),
614 format::PXAR_ACL_GROUP_OBJ,
615 acl.clone(),
616 &mut self.state.write_position,
617 )
618 .await?;
619 }
620
621 if let Some(acl) = &acl.default {
622 seq_write_pxar_struct_entry(
623 self.output.as_mut().unwrap(),
624 format::PXAR_ACL_DEFAULT,
625 acl.clone(),
626 &mut self.state.write_position,
627 )
628 .await?;
629 }
630
631 for acl in &acl.default_users {
632 seq_write_pxar_struct_entry(
633 self.output.as_mut().unwrap(),
634 format::PXAR_ACL_DEFAULT_USER,
635 acl.clone(),
636 &mut self.state.write_position,
637 )
638 .await?;
639 }
640
641 for acl in &acl.default_groups {
642 seq_write_pxar_struct_entry(
643 self.output.as_mut().unwrap(),
644 format::PXAR_ACL_DEFAULT_GROUP,
645 acl.clone(),
646 &mut self.state.write_position,
647 )
648 .await?;
649 }
650
651 Ok(())
652 }
653
654 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
655 seq_write_pxar_entry(
656 self.output.as_mut().unwrap(),
657 format::PXAR_FCAPS,
658 &fcaps.data,
659 &mut self.state.write_position,
660 )
661 .await
662 }
663
664 async fn write_quota_project_id(
665 &mut self,
666 quota_project_id: &format::QuotaProjectId,
667 ) -> io::Result<()> {
668 seq_write_pxar_struct_entry(
669 self.output.as_mut().unwrap(),
670 format::PXAR_QUOTA_PROJID,
671 *quota_project_id,
672 &mut self.state.write_position,
673 )
674 .await
675 }
676
677 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
678 crate::util::validate_filename(file_name)?;
679 seq_write_pxar_entry_zero(
680 self.output.as_mut().unwrap(),
681 format::PXAR_FILENAME,
682 file_name,
683 &mut self.state.write_position,
684 )
685 .await
686 }
687
688 pub async fn finish(mut self) -> io::Result<T> {
689 let tail_bytes = self.finish_goodbye_table().await?;
690 seq_write_pxar_entry(
691 self.output.as_mut().unwrap(),
692 format::PXAR_GOODBYE,
693 &tail_bytes,
694 &mut self.state.write_position,
695 )
696 .await?;
697
698 // done up here because of the self-borrow and to propagate
699 let end_offset = self.position();
700
701 if let Some(parent) = &mut self.parent {
702 parent.write_position = end_offset;
703
704 let file_offset = self
705 .state
706 .file_offset
707 .expect("internal error: parent set but no file_offset?");
708
709 parent.items.push(GoodbyeItem {
710 hash: self.state.file_hash,
711 offset: file_offset,
712 size: end_offset - file_offset,
713 });
714 }
715 self.finished = true;
716 Ok(self.output.take().unwrap())
717 }
718
719 pub fn into_writer(mut self) -> T {
720 self.output.take().unwrap()
721 }
722
723 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
724 let goodbye_offset = self.position();
725
726 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
727 let mut tail = take(&mut self.state.items);
728 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
729 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
730
731 // sort, then create a BST
732 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
733
734 let mut bst = Vec::with_capacity(tail.len() + 1);
735 unsafe {
736 bst.set_len(tail.len());
737 }
738 binary_tree_array::copy(tail.len(), |src, dest| {
739 let mut item = tail[src].clone();
740 // fixup the goodbye table offsets to be relative and with the right endianess
741 item.offset = goodbye_offset - item.offset;
742 unsafe {
743 std::ptr::write(&mut bst[dest], item.to_le());
744 }
745 });
746 drop(tail);
747
748 bst.push(
749 GoodbyeItem {
750 hash: format::PXAR_GOODBYE_TAIL_MARKER,
751 offset: goodbye_offset - self.state.entry_offset,
752 size: goodbye_size,
753 }
754 .to_le(),
755 );
756
757 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
758 // the items make sense:
759 let data = bst.as_mut_ptr() as *mut u8;
760 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
761 forget(bst);
762 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
763 }
764 }
765
/// Writer for a file object in a directory.
pub struct FileImpl<'a> {
    /// The encoder's output, which the file contents are written to.
    output: &'a mut dyn SeqWrite,

    /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
    /// directly instead of on Drop of FileImpl?
    goodbye_item: GoodbyeItem,

    /// While writing data to this file, this is how much space we still have left, this must reach
    /// exactly zero.
    remaining_size: u64,

    /// The directory containing this file. This is where we propagate the `IncompleteFile` error
    /// to, and where we insert our `GoodbyeItem`.
    parent: &'a mut EncoderState,
}
782
783 impl<'a> Drop for FileImpl<'a> {
784 fn drop(&mut self) {
785 if self.remaining_size != 0 {
786 self.parent.add_error(EncodeError::IncompleteFile);
787 }
788
789 self.parent.items.push(self.goodbye_item.clone());
790 }
791 }
792
793 impl<'a> FileImpl<'a> {
794 /// Get the file offset to be able to reference it with `add_hardlink`.
795 pub fn file_offset(&self) -> LinkOffset {
796 LinkOffset(self.goodbye_item.offset)
797 }
798
799 fn check_remaining(&self, size: usize) -> io::Result<()> {
800 if size as u64 > self.remaining_size {
801 io_bail!("attempted to write more than previously allocated");
802 } else {
803 Ok(())
804 }
805 }
806
/// Poll write interface to more easily connect to tokio/futures.
///
/// Rejects `data` which is larger than the remaining space of this file. On a successful write
/// both the remaining size and the parent's write position are updated.
#[cfg(feature = "tokio-io")]
pub fn poll_write(
    self: Pin<&mut Self>,
    cx: &mut Context,
    data: &[u8],
) -> Poll<io::Result<usize>> {
    let this = self.get_mut();
    this.check_remaining(data.len())?;

    let output = unsafe { Pin::new_unchecked(&mut *this.output) };
    let result = output.poll_seq_write(cx, data);

    if let Poll::Ready(Ok(put)) = &result {
        let put = *put as u64;
        this.remaining_size -= put;
        this.parent.write_position += put;
    }

    result
}
826
/// Poll flush interface to more easily connect to tokio/futures.
///
/// Forwards to the output's `poll_flush`.
#[cfg(feature = "tokio-io")]
pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
    // the pinned value is the `&mut` reference itself, so no unsafe projection is needed
    let this = self.get_mut();
    Pin::new(&mut this.output).poll_flush(cx)
}
835
/// Poll close/shutdown interface to more easily connect to tokio/futures.
///
/// We are only a virtual writer on top of the output provided by our encoder, so this merely
/// flushes that output.
#[cfg(feature = "tokio-io")]
pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
    // the pinned value is the `&mut` reference itself, so no unsafe projection is needed
    let this = self.get_mut();
    Pin::new(&mut this.output).poll_flush(cx)
}
847
848 /// Write file data for the current file entry in a pxar archive.
849 ///
850 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
851 /// requested. Check the return value for how many. There's also a `write_all` method available
852 /// for convenience.
853 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
854 self.check_remaining(data.len())?;
855 let put =
856 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
857 .await?;
858 //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
859 self.remaining_size -= put as u64;
860 self.parent.write_position += put as u64;
861 Ok(put)
862 }
863
864 /// Completely write file data for the current file entry in a pxar archive.
865 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
866 self.check_remaining(data.len())?;
867 seq_write_all(&mut self.output, data, &mut self.parent.write_position).await?;
868 self.remaining_size -= data.len() as u64;
869 Ok(())
870 }
871 }
872
/// `tokio::io::AsyncWrite` for `FileImpl`, available with the `tokio-io` feature.
///
/// Every method delegates to the poll method of `FileImpl` named in its body.
#[cfg(feature = "tokio-io")]
impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
        FileImpl::poll_write(self, cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_flush(self, cx)
    }

    // Shutting down maps to `poll_close`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_close(self, cx)
    }
}