]> git.proxmox.com Git - pxar.git/blob - src/encoder/mod.rs
encoder: use relative hardlink offsets
[pxar.git] / src / encoder / mod.rs
1 //! The `pxar` encoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 use std::io;
6 use std::mem::{forget, size_of, size_of_val, take};
7 use std::os::unix::ffi::OsStrExt;
8 use std::path::Path;
9 use std::pin::Pin;
10 use std::task::{Context, Poll};
11
12 use endian_trait::Endian;
13
14 use crate::binary_tree_array;
15 use crate::decoder::{self, SeqRead};
16 use crate::format::{self, GoodbyeItem};
17 use crate::poll_fn::poll_fn;
18 use crate::Metadata;
19
20 pub mod aio;
21 pub mod sync;
22
23 #[doc(inline)]
24 pub use sync::Encoder;
25
26 /// File reference used to create hard links.
27 #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
28 pub struct LinkOffset(u64);
29
30 impl LinkOffset {
31 #[inline]
32 pub fn raw(self) -> u64 {
33 self.0
34 }
35 }
36
37 /// Sequential write interface used by the encoder's state machine.
38 ///
39 /// This is our internal writer trait which is available for `std::io::Write` types in the
40 /// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
41 /// wrapper.
42 pub trait SeqWrite {
43 fn poll_seq_write(
44 self: Pin<&mut Self>,
45 cx: &mut Context,
46 buf: &[u8],
47 ) -> Poll<io::Result<usize>>;
48
49 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
50
51 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
52
53 /// While writing to a pxar archive we need to remember how much dat we've written to track some
54 /// offsets. Particularly items like the goodbye table need to be able to compute offsets to
55 /// further back in the archive.
56 fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>>;
57
58 /// To avoid recursively borrowing each time we nest into a subdirectory we add this helper.
59 /// Otherwise starting a subdirectory will get a trait object pointing to `T`, nesting another
60 /// subdirectory in that would have a trait object pointing to the trait object, and so on.
61 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
62 where
63 Self: Sized,
64 {
65 self as &mut dyn SeqWrite
66 }
67 }
68
69 /// Allow using trait objects for generics taking a `SeqWrite`.
70 impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
71 fn poll_seq_write(
72 self: Pin<&mut Self>,
73 cx: &mut Context,
74 buf: &[u8],
75 ) -> Poll<io::Result<usize>> {
76 unsafe {
77 self.map_unchecked_mut(|this| &mut **this)
78 .poll_seq_write(cx, buf)
79 }
80 }
81
82 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
83 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
84 }
85
86 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
87 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_close(cx) }
88 }
89
90 fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
91 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) }
92 }
93
94 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
95 where
96 Self: Sized,
97 {
98 &mut **self
99 }
100 }
101
102 /// awaitable version of `poll_position`.
103 async fn seq_write_position<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<u64> {
104 poll_fn(move |cx| unsafe { Pin::new_unchecked(&mut *output).poll_position(cx) }).await
105 }
106
107 /// awaitable verison of `poll_seq_write`.
108 async fn seq_write<T: SeqWrite + ?Sized>(output: &mut T, buf: &[u8]) -> io::Result<usize> {
109 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await
110 }
111
112 /// Write the entire contents of a buffer, handling short writes.
113 async fn seq_write_all<T: SeqWrite + ?Sized>(output: &mut T, mut buf: &[u8]) -> io::Result<()> {
114 while !buf.is_empty() {
115 let got = seq_write(&mut *output, buf).await?;
116 buf = &buf[got..];
117 }
118 Ok(())
119 }
120
121 /// Write an endian-swappable struct.
122 async fn seq_write_struct<E: Endian, T>(output: &mut T, data: E) -> io::Result<()>
123 where
124 T: SeqWrite + ?Sized,
125 {
126 let data = data.to_le();
127 seq_write_all(output, unsafe {
128 std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
129 })
130 .await
131 }
132
133 /// Write a pxar entry.
134 async fn seq_write_pxar_entry<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
135 where
136 T: SeqWrite + ?Sized,
137 {
138 seq_write_struct(
139 &mut *output,
140 format::Header::with_content_size(htype, data.len() as u64),
141 )
142 .await?;
143 seq_write_all(output, data).await
144 }
145
146 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
147 /// data buffer.
148 async fn seq_write_pxar_entry_zero<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
149 where
150 T: SeqWrite + ?Sized,
151 {
152 seq_write_struct(
153 &mut *output,
154 format::Header::with_content_size(htype, 1 + data.len() as u64),
155 )
156 .await?;
157 seq_write_all(&mut *output, data).await?;
158 seq_write_all(output, &[0u8]).await
159 }
160
161 /// Write a pxar entry consiting of an endian-swappable struct.
162 async fn seq_write_pxar_struct_entry<E, T>(output: &mut T, htype: u64, data: E) -> io::Result<()>
163 where
164 T: SeqWrite + ?Sized,
165 E: Endian,
166 {
167 let data = data.to_le();
168 seq_write_pxar_entry(output, htype, unsafe {
169 std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
170 })
171 .await
172 }
173
174 /// Error conditions caused by wrong usage of this crate.
175 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
176 pub enum EncodeError {
177 /// The user dropped a `File` without without finishing writing all of its contents.
178 ///
179 /// This is required because the payload lengths is written out at the begining and decoding
180 /// requires there to follow the right amount of data.
181 IncompleteFile,
182
183 /// The user dropped a directory without finalizing it.
184 ///
185 /// Finalizing is required to build the goodbye table at the end of a directory.
186 IncompleteDirectory,
187 }
188
189 #[derive(Default)]
190 struct EncoderState {
191 /// Goodbye items for this directory, excluding the tail.
192 items: Vec<GoodbyeItem>,
193
194 /// User caused error conditions.
195 encode_error: Option<EncodeError>,
196
197 /// Offset of this directory's ENTRY.
198 entry_offset: u64,
199
200 /// Offset to this directory's first FILENAME.
201 files_offset: u64,
202
203 /// If this is a subdirectory, this points to the this directory's FILENAME.
204 file_offset: Option<u64>,
205
206 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
207 file_hash: u64,
208 }
209
210 impl EncoderState {
211 fn merge_error(&mut self, error: Option<EncodeError>) {
212 // one error is enough:
213 if self.encode_error.is_none() {
214 self.encode_error = error;
215 }
216 }
217
218 fn add_error(&mut self, error: EncodeError) {
219 self.merge_error(Some(error));
220 }
221 }
222
223 /// The encoder state machine implementation for a directory.
224 ///
225 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
226 /// synchronous or `async` I/O objects in as output.
227 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
228 output: T,
229 state: EncoderState,
230 parent: Option<&'a mut EncoderState>,
231 finished: bool,
232 }
233
234 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
235 fn drop(&mut self) {
236 if let Some(ref mut parent) = self.parent {
237 // propagate errors:
238 parent.merge_error(self.state.encode_error);
239 if !self.finished {
240 parent.add_error(EncodeError::IncompleteDirectory);
241 }
242 } else if !self.finished {
243 // FIXME: how do we deal with this?
244 // eprintln!("Encoder dropped without finishing!");
245 }
246 }
247 }
248
249 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
250 pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
251 if !metadata.is_dir() {
252 io_bail!("directory metadata must contain the directory mode flag");
253 }
254 let mut this = Self {
255 output,
256 state: EncoderState::default(),
257 parent: None,
258 finished: false,
259 };
260
261 this.encode_metadata(metadata).await?;
262 this.state.files_offset = seq_write_position(&mut this.output).await?;
263
264 Ok(this)
265 }
266
267 fn check(&self) -> io::Result<()> {
268 match self.state.encode_error {
269 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
270 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
271 None => Ok(()),
272 }
273 }
274
275 pub async fn create_file<'b>(
276 &'b mut self,
277 metadata: &Metadata,
278 file_name: &Path,
279 file_size: u64,
280 ) -> io::Result<FileImpl<'b>>
281 where
282 'a: 'b,
283 {
284 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
285 .await
286 }
287
288 async fn create_file_do<'b>(
289 &'b mut self,
290 metadata: &Metadata,
291 file_name: &[u8],
292 file_size: u64,
293 ) -> io::Result<FileImpl<'b>>
294 where
295 'a: 'b,
296 {
297 self.check()?;
298
299 let file_offset = seq_write_position(&mut self.output).await?;
300 self.start_file_do(Some(metadata), file_name).await?;
301
302 seq_write_struct(
303 &mut self.output,
304 format::Header::with_content_size(format::PXAR_PAYLOAD, file_size),
305 )
306 .await?;
307
308 let payload_data_offset = seq_write_position(&mut self.output).await?;
309
310 let meta_size = payload_data_offset - file_offset;
311
312 Ok(FileImpl {
313 output: &mut self.output,
314 goodbye_item: GoodbyeItem {
315 hash: format::hash_filename(file_name),
316 offset: file_offset,
317 size: file_size + meta_size,
318 },
319 remaining_size: file_size,
320 parent: &mut self.state,
321 })
322 }
323
324 /// Return a file offset usable with `add_hardlink`.
325 pub async fn add_file(
326 &mut self,
327 metadata: &Metadata,
328 file_name: &Path,
329 file_size: u64,
330 content: &mut dyn SeqRead,
331 ) -> io::Result<LinkOffset> {
332 let mut file = self.create_file(metadata, file_name, file_size).await?;
333 let mut buf = crate::util::vec_new(4096);
334 loop {
335 let got = decoder::seq_read(&mut *content, &mut buf).await?;
336 if got == 0 {
337 break;
338 } else {
339 file.write_all(&buf[..got]).await?;
340 }
341 }
342 Ok(file.file_offset())
343 }
344
345 /// Return a file offset usable with `add_hardlink`.
346 pub async fn add_symlink(
347 &mut self,
348 metadata: &Metadata,
349 file_name: &Path,
350 target: &Path,
351 ) -> io::Result<()> {
352 let _ofs: LinkOffset = self
353 .add_file_entry(
354 Some(metadata),
355 file_name,
356 Some((format::PXAR_SYMLINK, target.as_os_str().as_bytes())),
357 )
358 .await?;
359 Ok(())
360 }
361
362 /// Return a file offset usable with `add_hardlink`.
363 pub async fn add_hardlink(
364 &mut self,
365 file_name: &Path,
366 target: &Path,
367 target_offset: LinkOffset,
368 ) -> io::Result<()> {
369 let current_offset = seq_write_position(&mut self.output).await?;
370 if current_offset <= target_offset.0 {
371 io_bail!("invalid hardlink offset, can only point to prior files");
372 }
373
374 let hardlink = format::Hardlink {
375 offset: current_offset - target_offset.0,
376 data: target.as_os_str().as_bytes().to_vec(),
377 };
378 let hardlink = unsafe {
379 std::slice::from_raw_parts(
380 &hardlink as *const format::Hardlink as *const u8,
381 size_of::<format::Hardlink>(),
382 )
383 };
384 let _this_offset: LinkOffset = self
385 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, hardlink)))
386 .await?;
387 Ok(())
388 }
389
390 /// Return a file offset usable with `add_hardlink`.
391 pub async fn add_device(
392 &mut self,
393 metadata: &Metadata,
394 file_name: &Path,
395 device: format::Device,
396 ) -> io::Result<()> {
397 if !metadata.is_device() {
398 io_bail!("entry added via add_device must have a device mode in its metadata");
399 }
400
401 let device = device.to_le();
402 let device = unsafe {
403 std::slice::from_raw_parts(
404 &device as *const format::Device as *const u8,
405 size_of::<format::Device>(),
406 )
407 };
408 let _ofs: LinkOffset = self
409 .add_file_entry(
410 Some(metadata),
411 file_name,
412 Some((format::PXAR_DEVICE, device)),
413 )
414 .await?;
415 Ok(())
416 }
417
418 /// Return a file offset usable with `add_hardlink`.
419 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
420 if !metadata.is_fifo() {
421 io_bail!("entry added via add_device must be of type fifo in its metadata");
422 }
423
424 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
425 Ok(())
426 }
427
428 /// Return a file offset usable with `add_hardlink`.
429 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
430 if !metadata.is_socket() {
431 io_bail!("entry added via add_device must be of type socket in its metadata");
432 }
433
434 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
435 Ok(())
436 }
437
438 /// Return a file offset usable with `add_hardlink`.
439 async fn add_file_entry(
440 &mut self,
441 metadata: Option<&Metadata>,
442 file_name: &Path,
443 entry_htype_data: Option<(u64, &[u8])>,
444 ) -> io::Result<LinkOffset> {
445 self.check()?;
446
447 let file_offset = seq_write_position(&mut self.output).await?;
448
449 let file_name = file_name.as_os_str().as_bytes();
450
451 self.start_file_do(metadata, file_name).await?;
452 if let Some((htype, entry_data)) = entry_htype_data {
453 seq_write_pxar_entry_zero(&mut self.output, htype, entry_data).await?;
454 }
455
456 let end_offset = seq_write_position(&mut self.output).await?;
457
458 self.state.items.push(GoodbyeItem {
459 hash: format::hash_filename(file_name),
460 offset: file_offset,
461 size: end_offset - file_offset,
462 });
463
464 Ok(LinkOffset(file_offset))
465 }
466
467 /// Helper
468 #[inline]
469 async fn position(&mut self) -> io::Result<u64> {
470 seq_write_position(&mut self.output).await
471 }
472
473 pub async fn create_directory<'b>(
474 &'b mut self,
475 file_name: &Path,
476 metadata: &Metadata,
477 ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>>
478 where
479 'a: 'b,
480 {
481 self.check()?;
482
483 if !metadata.is_dir() {
484 io_bail!("directory metadata must contain the directory mode flag");
485 }
486
487 let file_name = file_name.as_os_str().as_bytes();
488 let file_hash = format::hash_filename(file_name);
489
490 let file_offset = self.position().await?;
491 self.encode_filename(file_name).await?;
492
493 let entry_offset = self.position().await?;
494 self.encode_metadata(&metadata).await?;
495
496 let files_offset = self.position().await?;
497
498 Ok(EncoderImpl {
499 output: self.output.as_trait_object(),
500 state: EncoderState {
501 entry_offset,
502 files_offset,
503 file_offset: Some(file_offset),
504 file_hash: file_hash,
505 ..Default::default()
506 },
507 parent: Some(&mut self.state),
508 finished: false,
509 })
510 }
511
512 async fn start_file_do(
513 &mut self,
514 metadata: Option<&Metadata>,
515 file_name: &[u8],
516 ) -> io::Result<()> {
517 self.encode_filename(file_name).await?;
518 if let Some(metadata) = metadata {
519 self.encode_metadata(&metadata).await?;
520 }
521 Ok(())
522 }
523
524 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
525 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ENTRY, metadata.stat.clone())
526 .await?;
527
528 for xattr in &metadata.xattrs {
529 self.write_xattr(xattr).await?;
530 }
531
532 self.write_acls(&metadata.acl).await?;
533
534 if let Some(fcaps) = &metadata.fcaps {
535 self.write_file_capabilities(fcaps).await?;
536 }
537
538 if let Some(qpid) = &metadata.quota_project_id {
539 self.write_quota_project_id(qpid).await?;
540 }
541
542 Ok(())
543 }
544
545 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
546 seq_write_pxar_entry(&mut self.output, format::PXAR_XATTR, &xattr.data).await
547 }
548
549 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
550 for acl in &acl.users {
551 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_USER, acl.clone())
552 .await?;
553 }
554
555 for acl in &acl.groups {
556 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP, acl.clone())
557 .await?;
558 }
559
560 if let Some(acl) = &acl.group_obj {
561 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP_OBJ, acl.clone())
562 .await?;
563 }
564
565 if let Some(acl) = &acl.default {
566 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_DEFAULT, acl.clone())
567 .await?;
568 }
569
570 for acl in &acl.default_users {
571 seq_write_pxar_struct_entry(
572 &mut self.output,
573 format::PXAR_ACL_DEFAULT_USER,
574 acl.clone(),
575 )
576 .await?;
577 }
578
579 for acl in &acl.default_groups {
580 seq_write_pxar_struct_entry(
581 &mut self.output,
582 format::PXAR_ACL_DEFAULT_GROUP,
583 acl.clone(),
584 )
585 .await?;
586 }
587
588 Ok(())
589 }
590
591 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
592 seq_write_pxar_entry(&mut self.output, format::PXAR_FCAPS, &fcaps.data).await
593 }
594
595 async fn write_quota_project_id(
596 &mut self,
597 quota_project_id: &format::QuotaProjectId,
598 ) -> io::Result<()> {
599 seq_write_pxar_struct_entry(
600 &mut self.output,
601 format::PXAR_QUOTA_PROJID,
602 quota_project_id.clone(),
603 )
604 .await
605 }
606
607 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
608 seq_write_pxar_entry_zero(&mut self.output, format::PXAR_FILENAME, file_name).await
609 }
610
611 pub async fn finish(mut self) -> io::Result<()> {
612 let tail_bytes = self.finish_goodbye_table().await?;
613 seq_write_pxar_entry(&mut self.output, format::PXAR_GOODBYE, &tail_bytes).await?;
614 if let Some(parent) = &mut self.parent {
615 let file_offset = self
616 .state
617 .file_offset
618 .expect("internal error: parent set but no file_offset?");
619
620 let end_offset = seq_write_position(&mut self.output).await?;
621
622 parent.items.push(GoodbyeItem {
623 hash: self.state.file_hash,
624 offset: file_offset,
625 size: end_offset - file_offset,
626 });
627 }
628 self.finished = true;
629 Ok(())
630 }
631
632 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
633 let goodbye_offset = seq_write_position(&mut self.output).await?;
634
635 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
636 let mut tail = take(&mut self.state.items);
637 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
638 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
639
640 // sort, then create a BST
641 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
642
643 let mut bst = Vec::with_capacity(tail.len() + 1);
644 unsafe {
645 bst.set_len(tail.len());
646 }
647 binary_tree_array::copy(tail.len(), |src, dest| {
648 let mut item = tail[src].clone();
649 // fixup the goodbye table offsets to be relative and with the right endianess
650 item.offset = goodbye_offset - item.offset;
651 unsafe {
652 std::ptr::write(&mut bst[dest], item.to_le());
653 }
654 });
655 drop(tail);
656
657 bst.push(
658 GoodbyeItem {
659 hash: format::PXAR_GOODBYE_TAIL_MARKER,
660 offset: goodbye_offset - self.state.entry_offset,
661 size: goodbye_size,
662 }
663 .to_le(),
664 );
665
666 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
667 // the items make sense:
668 let data = bst.as_mut_ptr() as *mut u8;
669 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
670 forget(bst);
671 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
672 }
673 }
674
675 /// Writer for a file object in a directory.
676 pub struct FileImpl<'a> {
677 output: &'a mut dyn SeqWrite,
678
679 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
680 /// directly instead of on Drop of FileImpl?
681 goodbye_item: GoodbyeItem,
682
683 /// While writing data to this file, this is how much space we still have left, this must reach
684 /// exactly zero.
685 remaining_size: u64,
686
687 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
688 /// to, and where we insert our `GoodbyeItem`.
689 parent: &'a mut EncoderState,
690 }
691
692 impl<'a> Drop for FileImpl<'a> {
693 fn drop(&mut self) {
694 if self.remaining_size != 0 {
695 self.parent.add_error(EncodeError::IncompleteFile);
696 }
697
698 self.parent.items.push(self.goodbye_item.clone());
699 }
700 }
701
702 impl<'a> FileImpl<'a> {
703 /// Get the file offset to be able to reference it with `add_hardlink`.
704 pub fn file_offset(&self) -> LinkOffset {
705 LinkOffset(self.goodbye_item.offset)
706 }
707
708 fn check_remaining(&self, size: usize) -> io::Result<()> {
709 if size as u64 > self.remaining_size {
710 io_bail!("attempted to write more than previously allocated");
711 } else {
712 Ok(())
713 }
714 }
715
716 /// Poll write interface to more easily connect to tokio/futures.
717 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
718 pub fn poll_write(
719 self: Pin<&mut Self>,
720 cx: &mut Context,
721 data: &[u8],
722 ) -> Poll<io::Result<usize>> {
723 let this = self.get_mut();
724 this.check_remaining(data.len())?;
725 let output = unsafe { Pin::new_unchecked(&mut *this.output) };
726 match output.poll_seq_write(cx, data) {
727 Poll::Ready(Ok(put)) => {
728 this.remaining_size -= put as u64;
729 Poll::Ready(Ok(put))
730 }
731 other => other,
732 }
733 }
734
735 /// Poll flush interface to more easily connect to tokio/futures.
736 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
737 pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
738 unsafe {
739 self.map_unchecked_mut(|this| &mut this.output)
740 .poll_flush(cx)
741 }
742 }
743
744 /// Poll close/shutdown interface to more easily connect to tokio/futures.
745 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
746 pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
747 unsafe {
748 self.map_unchecked_mut(|this| &mut this.output)
749 .poll_close(cx)
750 }
751 }
752
753 /// Write file data for the current file entry in a pxar archive.
754 ///
755 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
756 /// requested. Check the return value for how many. There's also a `write_all` method available
757 /// for convenience.
758 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
759 self.check_remaining(data.len())?;
760 let put = seq_write(&mut self.output, data).await?;
761 self.remaining_size -= put as u64;
762 Ok(put)
763 }
764
765 /// Completely write file data for the current file entry in a pxar archive.
766 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
767 self.check_remaining(data.len())?;
768 seq_write_all(&mut self.output, data).await?;
769 self.remaining_size -= data.len() as u64;
770 Ok(())
771 }
772 }
773
774 #[cfg(feature = "tokio-io")]
775 impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
776 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
777 FileImpl::poll_write(self, cx, buf)
778 }
779
780 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
781 FileImpl::poll_flush(self, cx)
782 }
783
784 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
785 FileImpl::poll_close(self, cx)
786 }
787 }
788
789 #[cfg(feature = "futures-io")]
790 impl<'a> futures::io::AsyncWrite for FileImpl<'a> {
791 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
792 FileImpl::poll_write(self, cx, buf)
793 }
794
795 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
796 FileImpl::poll_flush(self, cx)
797 }
798
799 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
800 FileImpl::poll_close(self, cx)
801 }
802 }