]> git.proxmox.com Git - pxar.git/blob - src/encoder/mod.rs
encoder: fix contents of hardlinks
[pxar.git] / src / encoder / mod.rs
1 //! The `pxar` encoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 use std::io;
6 use std::mem::{forget, size_of, size_of_val, take};
7 use std::os::unix::ffi::OsStrExt;
8 use std::path::Path;
9 use std::pin::Pin;
10 use std::task::{Context, Poll};
11
12 use endian_trait::Endian;
13
14 use crate::binary_tree_array;
15 use crate::decoder::{self, SeqRead};
16 use crate::format::{self, GoodbyeItem};
17 use crate::poll_fn::poll_fn;
18 use crate::Metadata;
19
20 pub mod aio;
21 pub mod sync;
22
23 #[doc(inline)]
24 pub use sync::Encoder;
25
/// File reference used as the target when creating hard links.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct LinkOffset(u64);

impl LinkOffset {
    /// Get the raw `u64` offset wrapped by this reference.
    #[inline]
    pub fn raw(self) -> u64 {
        let Self(offset) = self;
        offset
    }
}
36
/// Sequential write interface used by the encoder's state machine.
///
/// This is our internal writer trait which is available for `std::io::Write` types in the
/// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
/// wrapper.
pub trait SeqWrite {
    /// Poll-based write of (part of) `buf`; a ready result carries the number of bytes written.
    fn poll_seq_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Poll-based flush of the output.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;

    /// Poll-based close of the output.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;

    /// While writing to a pxar archive we need to remember how much data we've written to track
    /// some offsets. Particularly items like the goodbye table need to be able to compute offsets
    /// to further back in the archive.
    fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>>;

    /// View this writer as a `dyn SeqWrite`.
    ///
    /// This helper exists to avoid recursively borrowing each time we nest into a subdirectory.
    /// Without it, starting a subdirectory would get a trait object pointing to `T`, nesting
    /// another subdirectory in that would get a trait object pointing to the trait object, and so
    /// on.
    fn as_trait_object(&mut self) -> &mut dyn SeqWrite
    where
        Self: Sized,
    {
        // The unsizing coercion happens at the return site.
        self
    }
}
68
69 /// Allow using trait objects for generics taking a `SeqWrite`.
70 impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
71 fn poll_seq_write(
72 self: Pin<&mut Self>,
73 cx: &mut Context,
74 buf: &[u8],
75 ) -> Poll<io::Result<usize>> {
76 unsafe {
77 self.map_unchecked_mut(|this| &mut **this)
78 .poll_seq_write(cx, buf)
79 }
80 }
81
82 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
83 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
84 }
85
86 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
87 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_close(cx) }
88 }
89
90 fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
91 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) }
92 }
93
94 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
95 where
96 Self: Sized,
97 {
98 &mut **self
99 }
100 }
101
102 /// awaitable version of `poll_position`.
103 async fn seq_write_position<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<u64> {
104 poll_fn(move |cx| unsafe { Pin::new_unchecked(&mut *output).poll_position(cx) }).await
105 }
106
107 /// awaitable verison of `poll_seq_write`.
108 async fn seq_write<T: SeqWrite + ?Sized>(output: &mut T, buf: &[u8]) -> io::Result<usize> {
109 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await
110 }
111
112 /// Write the entire contents of a buffer, handling short writes.
113 async fn seq_write_all<T: SeqWrite + ?Sized>(output: &mut T, mut buf: &[u8]) -> io::Result<()> {
114 while !buf.is_empty() {
115 let got = seq_write(&mut *output, buf).await?;
116 buf = &buf[got..];
117 }
118 Ok(())
119 }
120
121 /// Write an endian-swappable struct.
122 async fn seq_write_struct<E: Endian, T>(output: &mut T, data: E) -> io::Result<()>
123 where
124 T: SeqWrite + ?Sized,
125 {
126 let data = data.to_le();
127 seq_write_all(output, unsafe {
128 std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
129 })
130 .await
131 }
132
133 /// Write a pxar entry.
134 async fn seq_write_pxar_entry<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
135 where
136 T: SeqWrite + ?Sized,
137 {
138 seq_write_struct(
139 &mut *output,
140 format::Header::with_content_size(htype, data.len() as u64),
141 )
142 .await?;
143 seq_write_all(output, data).await
144 }
145
146 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
147 /// data buffer.
148 async fn seq_write_pxar_entry_zero<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
149 where
150 T: SeqWrite + ?Sized,
151 {
152 seq_write_struct(
153 &mut *output,
154 format::Header::with_content_size(htype, 1 + data.len() as u64),
155 )
156 .await?;
157 seq_write_all(&mut *output, data).await?;
158 seq_write_all(output, &[0u8]).await
159 }
160
161 /// Write a pxar entry consiting of an endian-swappable struct.
162 async fn seq_write_pxar_struct_entry<E, T>(output: &mut T, htype: u64, data: E) -> io::Result<()>
163 where
164 T: SeqWrite + ?Sized,
165 E: Endian,
166 {
167 let data = data.to_le();
168 seq_write_pxar_entry(output, htype, unsafe {
169 std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
170 })
171 .await
172 }
173
/// Error conditions caused by wrong usage of this crate.
///
/// These are recorded in the encoder state when a file or directory handle is dropped
/// prematurely and are reported by the next operation that checks the state.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EncodeError {
    /// The user dropped a `File` without finishing writing all of its contents.
    ///
    /// This is required because the payload length is written out at the beginning and decoding
    /// requires the right amount of data to follow.
    IncompleteFile,

    /// The user dropped a directory without finalizing it.
    ///
    /// Finalizing is required to build the goodbye table at the end of a directory.
    IncompleteDirectory,
}
188
/// Bookkeeping kept for a single directory while it is being encoded.
#[derive(Default)]
struct EncoderState {
    /// Goodbye items for this directory, excluding the tail.
    items: Vec<GoodbyeItem>,

    /// User caused error conditions.
    encode_error: Option<EncodeError>,

    /// Offset of this directory's ENTRY.
    entry_offset: u64,

    /// Offset to this directory's first FILENAME.
    files_offset: u64,

    /// If this is a subdirectory, this points to this directory's FILENAME.
    file_offset: Option<u64>,

    /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
    file_hash: u64,
}
209
210 impl EncoderState {
211 fn merge_error(&mut self, error: Option<EncodeError>) {
212 // one error is enough:
213 if self.encode_error.is_none() {
214 self.encode_error = error;
215 }
216 }
217
218 fn add_error(&mut self, error: EncodeError) {
219 self.merge_error(Some(error));
220 }
221 }
222
/// The encoder state machine implementation for a directory.
///
/// We use `async fn` to implement the encoder state machine so that we can easily plug in both
/// synchronous or `async` I/O objects in as output.
pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
    /// The writer all archive data is sent to.
    output: T,
    /// State of the directory currently being encoded.
    state: EncoderState,
    /// State of the containing directory; `None` for an encoder created via `new`.
    parent: Option<&'a mut EncoderState>,
    /// Set once `finish` has completed; checked when the encoder is dropped.
    finished: bool,
}
233
234 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
235 fn drop(&mut self) {
236 if let Some(ref mut parent) = self.parent {
237 // propagate errors:
238 parent.merge_error(self.state.encode_error);
239 if !self.finished {
240 parent.add_error(EncodeError::IncompleteDirectory);
241 }
242 } else if !self.finished {
243 // FIXME: how do we deal with this?
244 // eprintln!("Encoder dropped without finishing!");
245 }
246 }
247 }
248
249 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
250 pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
251 if !metadata.is_dir() {
252 io_bail!("directory metadata must contain the directory mode flag");
253 }
254 let mut this = Self {
255 output,
256 state: EncoderState::default(),
257 parent: None,
258 finished: false,
259 };
260
261 this.encode_metadata(metadata).await?;
262 this.state.files_offset = seq_write_position(&mut this.output).await?;
263
264 Ok(this)
265 }
266
267 fn check(&self) -> io::Result<()> {
268 match self.state.encode_error {
269 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
270 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
271 None => Ok(()),
272 }
273 }
274
275 pub async fn create_file<'b>(
276 &'b mut self,
277 metadata: &Metadata,
278 file_name: &Path,
279 file_size: u64,
280 ) -> io::Result<FileImpl<'b>>
281 where
282 'a: 'b,
283 {
284 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
285 .await
286 }
287
288 async fn create_file_do<'b>(
289 &'b mut self,
290 metadata: &Metadata,
291 file_name: &[u8],
292 file_size: u64,
293 ) -> io::Result<FileImpl<'b>>
294 where
295 'a: 'b,
296 {
297 self.check()?;
298
299 let file_offset = seq_write_position(&mut self.output).await?;
300 self.start_file_do(Some(metadata), file_name).await?;
301
302 seq_write_struct(
303 &mut self.output,
304 format::Header::with_content_size(format::PXAR_PAYLOAD, file_size),
305 )
306 .await?;
307
308 let payload_data_offset = seq_write_position(&mut self.output).await?;
309
310 let meta_size = payload_data_offset - file_offset;
311
312 Ok(FileImpl {
313 output: &mut self.output,
314 goodbye_item: GoodbyeItem {
315 hash: format::hash_filename(file_name),
316 offset: file_offset,
317 size: file_size + meta_size,
318 },
319 remaining_size: file_size,
320 parent: &mut self.state,
321 })
322 }
323
324 /// Return a file offset usable with `add_hardlink`.
325 pub async fn add_file(
326 &mut self,
327 metadata: &Metadata,
328 file_name: &Path,
329 file_size: u64,
330 content: &mut dyn SeqRead,
331 ) -> io::Result<LinkOffset> {
332 let mut file = self.create_file(metadata, file_name, file_size).await?;
333 let mut buf = crate::util::vec_new(4096);
334 loop {
335 let got = decoder::seq_read(&mut *content, &mut buf).await?;
336 if got == 0 {
337 break;
338 } else {
339 file.write_all(&buf[..got]).await?;
340 }
341 }
342 Ok(file.file_offset())
343 }
344
345 /// Return a file offset usable with `add_hardlink`.
346 pub async fn add_symlink(
347 &mut self,
348 metadata: &Metadata,
349 file_name: &Path,
350 target: &Path,
351 ) -> io::Result<()> {
352 let _ofs: LinkOffset = self
353 .add_file_entry(
354 Some(metadata),
355 file_name,
356 Some((format::PXAR_SYMLINK, target.as_os_str().as_bytes())),
357 )
358 .await?;
359 Ok(())
360 }
361
362 /// Return a file offset usable with `add_hardlink`.
363 pub async fn add_hardlink(
364 &mut self,
365 file_name: &Path,
366 target: &Path,
367 target_offset: LinkOffset,
368 ) -> io::Result<()> {
369 let current_offset = seq_write_position(&mut self.output).await?;
370 if current_offset <= target_offset.0 {
371 io_bail!("invalid hardlink offset, can only point to prior files");
372 }
373
374 let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
375 let target_bytes = target.as_os_str().as_bytes();
376 let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len());
377 hardlink.extend(&offset_bytes);
378 hardlink.extend(target_bytes);
379 let _this_offset: LinkOffset = self
380 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
381 .await?;
382 Ok(())
383 }
384
385 /// Return a file offset usable with `add_hardlink`.
386 pub async fn add_device(
387 &mut self,
388 metadata: &Metadata,
389 file_name: &Path,
390 device: format::Device,
391 ) -> io::Result<()> {
392 if !metadata.is_device() {
393 io_bail!("entry added via add_device must have a device mode in its metadata");
394 }
395
396 let device = device.to_le();
397 let device = unsafe {
398 std::slice::from_raw_parts(
399 &device as *const format::Device as *const u8,
400 size_of::<format::Device>(),
401 )
402 };
403 let _ofs: LinkOffset = self
404 .add_file_entry(
405 Some(metadata),
406 file_name,
407 Some((format::PXAR_DEVICE, device)),
408 )
409 .await?;
410 Ok(())
411 }
412
413 /// Return a file offset usable with `add_hardlink`.
414 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
415 if !metadata.is_fifo() {
416 io_bail!("entry added via add_device must be of type fifo in its metadata");
417 }
418
419 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
420 Ok(())
421 }
422
423 /// Return a file offset usable with `add_hardlink`.
424 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
425 if !metadata.is_socket() {
426 io_bail!("entry added via add_device must be of type socket in its metadata");
427 }
428
429 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
430 Ok(())
431 }
432
433 /// Return a file offset usable with `add_hardlink`.
434 async fn add_file_entry(
435 &mut self,
436 metadata: Option<&Metadata>,
437 file_name: &Path,
438 entry_htype_data: Option<(u64, &[u8])>,
439 ) -> io::Result<LinkOffset> {
440 self.check()?;
441
442 let file_offset = seq_write_position(&mut self.output).await?;
443
444 let file_name = file_name.as_os_str().as_bytes();
445
446 self.start_file_do(metadata, file_name).await?;
447 if let Some((htype, entry_data)) = entry_htype_data {
448 seq_write_pxar_entry_zero(&mut self.output, htype, entry_data).await?;
449 }
450
451 let end_offset = seq_write_position(&mut self.output).await?;
452
453 self.state.items.push(GoodbyeItem {
454 hash: format::hash_filename(file_name),
455 offset: file_offset,
456 size: end_offset - file_offset,
457 });
458
459 Ok(LinkOffset(file_offset))
460 }
461
462 /// Helper
463 #[inline]
464 async fn position(&mut self) -> io::Result<u64> {
465 seq_write_position(&mut self.output).await
466 }
467
468 pub async fn create_directory<'b>(
469 &'b mut self,
470 file_name: &Path,
471 metadata: &Metadata,
472 ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>>
473 where
474 'a: 'b,
475 {
476 self.check()?;
477
478 if !metadata.is_dir() {
479 io_bail!("directory metadata must contain the directory mode flag");
480 }
481
482 let file_name = file_name.as_os_str().as_bytes();
483 let file_hash = format::hash_filename(file_name);
484
485 let file_offset = self.position().await?;
486 self.encode_filename(file_name).await?;
487
488 let entry_offset = self.position().await?;
489 self.encode_metadata(&metadata).await?;
490
491 let files_offset = self.position().await?;
492
493 Ok(EncoderImpl {
494 output: self.output.as_trait_object(),
495 state: EncoderState {
496 entry_offset,
497 files_offset,
498 file_offset: Some(file_offset),
499 file_hash: file_hash,
500 ..Default::default()
501 },
502 parent: Some(&mut self.state),
503 finished: false,
504 })
505 }
506
507 async fn start_file_do(
508 &mut self,
509 metadata: Option<&Metadata>,
510 file_name: &[u8],
511 ) -> io::Result<()> {
512 self.encode_filename(file_name).await?;
513 if let Some(metadata) = metadata {
514 self.encode_metadata(&metadata).await?;
515 }
516 Ok(())
517 }
518
519 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
520 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ENTRY, metadata.stat.clone())
521 .await?;
522
523 for xattr in &metadata.xattrs {
524 self.write_xattr(xattr).await?;
525 }
526
527 self.write_acls(&metadata.acl).await?;
528
529 if let Some(fcaps) = &metadata.fcaps {
530 self.write_file_capabilities(fcaps).await?;
531 }
532
533 if let Some(qpid) = &metadata.quota_project_id {
534 self.write_quota_project_id(qpid).await?;
535 }
536
537 Ok(())
538 }
539
540 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
541 seq_write_pxar_entry(&mut self.output, format::PXAR_XATTR, &xattr.data).await
542 }
543
544 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
545 for acl in &acl.users {
546 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_USER, acl.clone())
547 .await?;
548 }
549
550 for acl in &acl.groups {
551 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP, acl.clone())
552 .await?;
553 }
554
555 if let Some(acl) = &acl.group_obj {
556 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP_OBJ, acl.clone())
557 .await?;
558 }
559
560 if let Some(acl) = &acl.default {
561 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_DEFAULT, acl.clone())
562 .await?;
563 }
564
565 for acl in &acl.default_users {
566 seq_write_pxar_struct_entry(
567 &mut self.output,
568 format::PXAR_ACL_DEFAULT_USER,
569 acl.clone(),
570 )
571 .await?;
572 }
573
574 for acl in &acl.default_groups {
575 seq_write_pxar_struct_entry(
576 &mut self.output,
577 format::PXAR_ACL_DEFAULT_GROUP,
578 acl.clone(),
579 )
580 .await?;
581 }
582
583 Ok(())
584 }
585
586 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
587 seq_write_pxar_entry(&mut self.output, format::PXAR_FCAPS, &fcaps.data).await
588 }
589
590 async fn write_quota_project_id(
591 &mut self,
592 quota_project_id: &format::QuotaProjectId,
593 ) -> io::Result<()> {
594 seq_write_pxar_struct_entry(
595 &mut self.output,
596 format::PXAR_QUOTA_PROJID,
597 quota_project_id.clone(),
598 )
599 .await
600 }
601
602 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
603 seq_write_pxar_entry_zero(&mut self.output, format::PXAR_FILENAME, file_name).await
604 }
605
606 pub async fn finish(mut self) -> io::Result<()> {
607 let tail_bytes = self.finish_goodbye_table().await?;
608 seq_write_pxar_entry(&mut self.output, format::PXAR_GOODBYE, &tail_bytes).await?;
609 if let Some(parent) = &mut self.parent {
610 let file_offset = self
611 .state
612 .file_offset
613 .expect("internal error: parent set but no file_offset?");
614
615 let end_offset = seq_write_position(&mut self.output).await?;
616
617 parent.items.push(GoodbyeItem {
618 hash: self.state.file_hash,
619 offset: file_offset,
620 size: end_offset - file_offset,
621 });
622 }
623 self.finished = true;
624 Ok(())
625 }
626
627 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
628 let goodbye_offset = seq_write_position(&mut self.output).await?;
629
630 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
631 let mut tail = take(&mut self.state.items);
632 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
633 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
634
635 // sort, then create a BST
636 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
637
638 let mut bst = Vec::with_capacity(tail.len() + 1);
639 unsafe {
640 bst.set_len(tail.len());
641 }
642 binary_tree_array::copy(tail.len(), |src, dest| {
643 let mut item = tail[src].clone();
644 // fixup the goodbye table offsets to be relative and with the right endianess
645 item.offset = goodbye_offset - item.offset;
646 unsafe {
647 std::ptr::write(&mut bst[dest], item.to_le());
648 }
649 });
650 drop(tail);
651
652 bst.push(
653 GoodbyeItem {
654 hash: format::PXAR_GOODBYE_TAIL_MARKER,
655 offset: goodbye_offset - self.state.entry_offset,
656 size: goodbye_size,
657 }
658 .to_le(),
659 );
660
661 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
662 // the items make sense:
663 let data = bst.as_mut_ptr() as *mut u8;
664 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
665 forget(bst);
666 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
667 }
668 }
669
/// Writer for a file object in a directory.
///
/// Dropping it registers the file's goodbye item with the parent directory and flags an
/// `IncompleteFile` error there if not all of the announced data has been written.
pub struct FileImpl<'a> {
    /// The output the file's payload data is written to.
    output: &'a mut dyn SeqWrite,

    /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
    /// directly instead of on Drop of FileImpl?
    goodbye_item: GoodbyeItem,

    /// While writing data to this file, this is how much space we still have left, this must reach
    /// exactly zero.
    remaining_size: u64,

    /// The directory containing this file. This is where we propagate the `IncompleteFile` error
    /// to, and where we insert our `GoodbyeItem`.
    parent: &'a mut EncoderState,
}
686
687 impl<'a> Drop for FileImpl<'a> {
688 fn drop(&mut self) {
689 if self.remaining_size != 0 {
690 self.parent.add_error(EncodeError::IncompleteFile);
691 }
692
693 self.parent.items.push(self.goodbye_item.clone());
694 }
695 }
696
697 impl<'a> FileImpl<'a> {
698 /// Get the file offset to be able to reference it with `add_hardlink`.
699 pub fn file_offset(&self) -> LinkOffset {
700 LinkOffset(self.goodbye_item.offset)
701 }
702
703 fn check_remaining(&self, size: usize) -> io::Result<()> {
704 if size as u64 > self.remaining_size {
705 io_bail!("attempted to write more than previously allocated");
706 } else {
707 Ok(())
708 }
709 }
710
711 /// Poll write interface to more easily connect to tokio/futures.
712 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
713 pub fn poll_write(
714 self: Pin<&mut Self>,
715 cx: &mut Context,
716 data: &[u8],
717 ) -> Poll<io::Result<usize>> {
718 let this = self.get_mut();
719 this.check_remaining(data.len())?;
720 let output = unsafe { Pin::new_unchecked(&mut *this.output) };
721 match output.poll_seq_write(cx, data) {
722 Poll::Ready(Ok(put)) => {
723 this.remaining_size -= put as u64;
724 Poll::Ready(Ok(put))
725 }
726 other => other,
727 }
728 }
729
730 /// Poll flush interface to more easily connect to tokio/futures.
731 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
732 pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
733 unsafe {
734 self.map_unchecked_mut(|this| &mut this.output)
735 .poll_flush(cx)
736 }
737 }
738
739 /// Poll close/shutdown interface to more easily connect to tokio/futures.
740 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
741 pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
742 unsafe {
743 self.map_unchecked_mut(|this| &mut this.output)
744 .poll_close(cx)
745 }
746 }
747
748 /// Write file data for the current file entry in a pxar archive.
749 ///
750 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
751 /// requested. Check the return value for how many. There's also a `write_all` method available
752 /// for convenience.
753 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
754 self.check_remaining(data.len())?;
755 let put = seq_write(&mut self.output, data).await?;
756 self.remaining_size -= put as u64;
757 Ok(put)
758 }
759
760 /// Completely write file data for the current file entry in a pxar archive.
761 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
762 self.check_remaining(data.len())?;
763 seq_write_all(&mut self.output, data).await?;
764 self.remaining_size -= data.len() as u64;
765 Ok(())
766 }
767 }
768
/// `tokio` `AsyncWrite` support, only available with the `tokio-io` feature.
///
/// All methods forward to the inherent poll methods of `FileImpl`.
#[cfg(feature = "tokio-io")]
impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
        FileImpl::poll_write(self, cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_flush(self, cx)
    }

    // tokio's shutdown maps to our close
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_close(self, cx)
    }
}
783
/// `futures` `AsyncWrite` support, only available with the `futures-io` feature.
///
/// All methods forward to the inherent poll methods of `FileImpl`.
#[cfg(feature = "futures-io")]
impl<'a> futures::io::AsyncWrite for FileImpl<'a> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
        FileImpl::poll_write(self, cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_flush(self, cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_close(self, cx)
    }
}