]> git.proxmox.com Git - pxar.git/blob - src/encoder/mod.rs
encoder: introduce LinkOffset
[pxar.git] / src / encoder / mod.rs
1 //! The `pxar` encoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 use std::io;
6 use std::mem::{forget, size_of, size_of_val, take};
7 use std::os::unix::ffi::OsStrExt;
8 use std::path::Path;
9 use std::pin::Pin;
10 use std::task::{Context, Poll};
11
12 use endian_trait::Endian;
13
14 use crate::binary_tree_array;
15 use crate::decoder::{self, SeqRead};
16 use crate::format::{self, GoodbyeItem};
17 use crate::poll_fn::poll_fn;
18 use crate::Metadata;
19
20 pub mod aio;
21 pub mod sync;
22
23 #[doc(inline)]
24 pub use sync::Encoder;
25
26 /// File reference used to create hard links.
27 #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
28 pub struct LinkOffset(u64);
29
30 impl LinkOffset {
31 #[inline]
32 pub fn raw(self) -> u64 {
33 self.0
34 }
35 }
36
37 /// Sequential write interface used by the encoder's state machine.
38 ///
39 /// This is our internal writer trait which is available for `std::io::Write` types in the
40 /// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
41 /// wrapper.
42 pub trait SeqWrite {
43 fn poll_seq_write(
44 self: Pin<&mut Self>,
45 cx: &mut Context,
46 buf: &[u8],
47 ) -> Poll<io::Result<usize>>;
48
49 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
50
51 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
52
53 /// While writing to a pxar archive we need to remember how much dat we've written to track some
54 /// offsets. Particularly items like the goodbye table need to be able to compute offsets to
55 /// further back in the archive.
56 fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>>;
57
58 /// To avoid recursively borrowing each time we nest into a subdirectory we add this helper.
59 /// Otherwise starting a subdirectory will get a trait object pointing to `T`, nesting another
60 /// subdirectory in that would have a trait object pointing to the trait object, and so on.
61 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
62 where
63 Self: Sized,
64 {
65 self as &mut dyn SeqWrite
66 }
67 }
68
69 /// Allow using trait objects for generics taking a `SeqWrite`.
70 impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
71 fn poll_seq_write(
72 self: Pin<&mut Self>,
73 cx: &mut Context,
74 buf: &[u8],
75 ) -> Poll<io::Result<usize>> {
76 unsafe {
77 self.map_unchecked_mut(|this| &mut **this)
78 .poll_seq_write(cx, buf)
79 }
80 }
81
82 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
83 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
84 }
85
86 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
87 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_close(cx) }
88 }
89
90 fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
91 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) }
92 }
93
94 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
95 where
96 Self: Sized,
97 {
98 &mut **self
99 }
100 }
101
102 /// awaitable version of `poll_position`.
103 async fn seq_write_position<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<u64> {
104 poll_fn(move |cx| unsafe { Pin::new_unchecked(&mut *output).poll_position(cx) }).await
105 }
106
107 /// awaitable verison of `poll_seq_write`.
108 async fn seq_write<T: SeqWrite + ?Sized>(output: &mut T, buf: &[u8]) -> io::Result<usize> {
109 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await
110 }
111
112 /// Write the entire contents of a buffer, handling short writes.
113 async fn seq_write_all<T: SeqWrite + ?Sized>(output: &mut T, mut buf: &[u8]) -> io::Result<()> {
114 while !buf.is_empty() {
115 let got = seq_write(&mut *output, buf).await?;
116 buf = &buf[got..];
117 }
118 Ok(())
119 }
120
121 /// Write an endian-swappable struct.
122 async fn seq_write_struct<E: Endian, T>(output: &mut T, data: E) -> io::Result<()>
123 where
124 T: SeqWrite + ?Sized,
125 {
126 let data = data.to_le();
127 seq_write_all(output, unsafe {
128 std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
129 })
130 .await
131 }
132
133 /// Write a pxar entry.
134 async fn seq_write_pxar_entry<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
135 where
136 T: SeqWrite + ?Sized,
137 {
138 seq_write_struct(
139 &mut *output,
140 format::Header::with_content_size(htype, data.len() as u64),
141 )
142 .await?;
143 seq_write_all(output, data).await
144 }
145
146 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
147 /// data buffer.
148 async fn seq_write_pxar_entry_zero<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
149 where
150 T: SeqWrite + ?Sized,
151 {
152 seq_write_struct(
153 &mut *output,
154 format::Header::with_content_size(htype, 1 + data.len() as u64),
155 )
156 .await?;
157 seq_write_all(&mut *output, data).await?;
158 seq_write_all(output, &[0u8]).await
159 }
160
161 /// Write a pxar entry consiting of an endian-swappable struct.
162 async fn seq_write_pxar_struct_entry<E, T>(output: &mut T, htype: u64, data: E) -> io::Result<()>
163 where
164 T: SeqWrite + ?Sized,
165 E: Endian,
166 {
167 let data = data.to_le();
168 seq_write_pxar_entry(output, htype, unsafe {
169 std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
170 })
171 .await
172 }
173
174 /// Error conditions caused by wrong usage of this crate.
175 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
176 pub enum EncodeError {
177 /// The user dropped a `File` without without finishing writing all of its contents.
178 ///
179 /// This is required because the payload lengths is written out at the begining and decoding
180 /// requires there to follow the right amount of data.
181 IncompleteFile,
182
183 /// The user dropped a directory without finalizing it.
184 ///
185 /// Finalizing is required to build the goodbye table at the end of a directory.
186 IncompleteDirectory,
187 }
188
189 #[derive(Default)]
190 struct EncoderState {
191 /// Goodbye items for this directory, excluding the tail.
192 items: Vec<GoodbyeItem>,
193
194 /// User caused error conditions.
195 encode_error: Option<EncodeError>,
196
197 /// Offset of this directory's ENTRY.
198 entry_offset: u64,
199
200 /// Offset to this directory's first FILENAME.
201 files_offset: u64,
202
203 /// If this is a subdirectory, this points to the this directory's FILENAME.
204 file_offset: Option<u64>,
205
206 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
207 file_hash: u64,
208 }
209
210 impl EncoderState {
211 fn merge_error(&mut self, error: Option<EncodeError>) {
212 // one error is enough:
213 if self.encode_error.is_none() {
214 self.encode_error = error;
215 }
216 }
217
218 fn add_error(&mut self, error: EncodeError) {
219 self.merge_error(Some(error));
220 }
221 }
222
223 /// The encoder state machine implementation for a directory.
224 ///
225 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
226 /// synchronous or `async` I/O objects in as output.
227 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
228 output: T,
229 state: EncoderState,
230 parent: Option<&'a mut EncoderState>,
231 finished: bool,
232 }
233
234 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
235 fn drop(&mut self) {
236 if let Some(ref mut parent) = self.parent {
237 // propagate errors:
238 parent.merge_error(self.state.encode_error);
239 if !self.finished {
240 parent.add_error(EncodeError::IncompleteDirectory);
241 }
242 } else if !self.finished {
243 // FIXME: how do we deal with this?
244 // eprintln!("Encoder dropped without finishing!");
245 }
246 }
247 }
248
249 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
250 pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
251 if !metadata.is_dir() {
252 io_bail!("directory metadata must contain the directory mode flag");
253 }
254 let mut this = Self {
255 output,
256 state: EncoderState::default(),
257 parent: None,
258 finished: false,
259 };
260
261 this.encode_metadata(metadata).await?;
262 this.state.files_offset = seq_write_position(&mut this.output).await?;
263
264 Ok(this)
265 }
266
267 fn check(&self) -> io::Result<()> {
268 match self.state.encode_error {
269 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
270 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
271 None => Ok(()),
272 }
273 }
274
275 pub async fn create_file<'b>(
276 &'b mut self,
277 metadata: &Metadata,
278 file_name: &Path,
279 file_size: u64,
280 ) -> io::Result<FileImpl<'b>>
281 where
282 'a: 'b,
283 {
284 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
285 .await
286 }
287
288 async fn create_file_do<'b>(
289 &'b mut self,
290 metadata: &Metadata,
291 file_name: &[u8],
292 file_size: u64,
293 ) -> io::Result<FileImpl<'b>>
294 where
295 'a: 'b,
296 {
297 self.check()?;
298
299 let file_offset = seq_write_position(&mut self.output).await?;
300 self.start_file_do(Some(metadata), file_name).await?;
301
302 seq_write_struct(
303 &mut self.output,
304 format::Header::with_content_size(format::PXAR_PAYLOAD, file_size),
305 )
306 .await?;
307
308 let payload_data_offset = seq_write_position(&mut self.output).await?;
309
310 let meta_size = payload_data_offset - file_offset;
311
312 Ok(FileImpl {
313 output: &mut self.output,
314 goodbye_item: GoodbyeItem {
315 hash: format::hash_filename(file_name),
316 offset: file_offset,
317 size: file_size + meta_size,
318 },
319 remaining_size: file_size,
320 parent: &mut self.state,
321 })
322 }
323
324 /// Return a file offset usable with `add_hardlink`.
325 pub async fn add_file(
326 &mut self,
327 metadata: &Metadata,
328 file_name: &Path,
329 file_size: u64,
330 content: &mut dyn SeqRead,
331 ) -> io::Result<LinkOffset> {
332 let mut file = self.create_file(metadata, file_name, file_size).await?;
333 let mut buf = crate::util::vec_new(4096);
334 loop {
335 let got = decoder::seq_read(&mut *content, &mut buf).await?;
336 if got == 0 {
337 break;
338 } else {
339 file.write_all(&buf[..got]).await?;
340 }
341 }
342 Ok(file.file_offset())
343 }
344
345 /// Return a file offset usable with `add_hardlink`.
346 pub async fn add_symlink(
347 &mut self,
348 metadata: &Metadata,
349 file_name: &Path,
350 target: &Path,
351 ) -> io::Result<LinkOffset> {
352 self.add_file_entry(
353 Some(metadata),
354 file_name,
355 Some((format::PXAR_SYMLINK, target.as_os_str().as_bytes())),
356 )
357 .await
358 }
359
360 /// Return a file offset usable with `add_hardlink`.
361 pub async fn add_hardlink(
362 &mut self,
363 file_name: &Path,
364 target: &Path,
365 offset: LinkOffset,
366 ) -> io::Result<()> {
367 let hardlink = format::Hardlink {
368 offset: offset.0,
369 data: target.as_os_str().as_bytes().to_vec(),
370 };
371 let hardlink = unsafe {
372 std::slice::from_raw_parts(
373 &hardlink as *const format::Hardlink as *const u8,
374 size_of::<format::Hardlink>(),
375 )
376 };
377 let _this_offset: LinkOffset = self.add_file_entry(
378 None,
379 file_name,
380 Some((format::PXAR_HARDLINK, hardlink)),
381 )
382 .await?;
383 Ok(())
384 }
385
386 /// Return a file offset usable with `add_hardlink`.
387 pub async fn add_device(
388 &mut self,
389 metadata: &Metadata,
390 file_name: &Path,
391 device: format::Device,
392 ) -> io::Result<LinkOffset> {
393 if !metadata.is_device() {
394 io_bail!("entry added via add_device must have a device mode in its metadata");
395 }
396
397 let device = device.to_le();
398 let device = unsafe {
399 std::slice::from_raw_parts(
400 &device as *const format::Device as *const u8,
401 size_of::<format::Device>(),
402 )
403 };
404 self.add_file_entry(
405 Some(metadata),
406 file_name,
407 Some((format::PXAR_DEVICE, device)),
408 )
409 .await
410 }
411
412 /// Return a file offset usable with `add_hardlink`.
413 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<LinkOffset> {
414 if !metadata.is_fifo() {
415 io_bail!("entry added via add_device must be of type fifo in its metadata");
416 }
417
418 self.add_file_entry(Some(metadata), file_name, None).await
419 }
420
421 /// Return a file offset usable with `add_hardlink`.
422 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<LinkOffset> {
423 if !metadata.is_socket() {
424 io_bail!("entry added via add_device must be of type socket in its metadata");
425 }
426
427 self.add_file_entry(Some(metadata), file_name, None).await
428 }
429
430 /// Return a file offset usable with `add_hardlink`.
431 async fn add_file_entry(
432 &mut self,
433 metadata: Option<&Metadata>,
434 file_name: &Path,
435 entry_htype_data: Option<(u64, &[u8])>,
436 ) -> io::Result<LinkOffset> {
437 self.check()?;
438
439 let file_offset = seq_write_position(&mut self.output).await?;
440
441 let file_name = file_name.as_os_str().as_bytes();
442
443 self.start_file_do(metadata, file_name).await?;
444 if let Some((htype, entry_data)) = entry_htype_data {
445 seq_write_pxar_entry_zero(&mut self.output, htype, entry_data).await?;
446 }
447
448 let end_offset = seq_write_position(&mut self.output).await?;
449
450 self.state.items.push(GoodbyeItem {
451 hash: format::hash_filename(file_name),
452 offset: file_offset,
453 size: end_offset - file_offset,
454 });
455
456 Ok(LinkOffset(file_offset))
457 }
458
459 /// Helper
460 #[inline]
461 async fn position(&mut self) -> io::Result<u64> {
462 seq_write_position(&mut self.output).await
463 }
464
465 pub async fn create_directory<'b>(
466 &'b mut self,
467 file_name: &Path,
468 metadata: &Metadata,
469 ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>>
470 where
471 'a: 'b,
472 {
473 self.check()?;
474
475 if !metadata.is_dir() {
476 io_bail!("directory metadata must contain the directory mode flag");
477 }
478
479 let file_name = file_name.as_os_str().as_bytes();
480 let file_hash = format::hash_filename(file_name);
481
482 let file_offset = self.position().await?;
483 self.encode_filename(file_name).await?;
484
485 let entry_offset = self.position().await?;
486 self.encode_metadata(&metadata).await?;
487
488 let files_offset = self.position().await?;
489
490 Ok(EncoderImpl {
491 output: self.output.as_trait_object(),
492 state: EncoderState {
493 entry_offset,
494 files_offset,
495 file_offset: Some(file_offset),
496 file_hash: file_hash,
497 ..Default::default()
498 },
499 parent: Some(&mut self.state),
500 finished: false,
501 })
502 }
503
504 async fn start_file_do(
505 &mut self,
506 metadata: Option<&Metadata>,
507 file_name: &[u8],
508 ) -> io::Result<()> {
509 self.encode_filename(file_name).await?;
510 if let Some(metadata) = metadata {
511 self.encode_metadata(&metadata).await?;
512 }
513 Ok(())
514 }
515
516 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
517 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ENTRY, metadata.stat.clone())
518 .await?;
519
520 for xattr in &metadata.xattrs {
521 self.write_xattr(xattr).await?;
522 }
523
524 self.write_acls(&metadata.acl).await?;
525
526 if let Some(fcaps) = &metadata.fcaps {
527 self.write_file_capabilities(fcaps).await?;
528 }
529
530 if let Some(qpid) = &metadata.quota_project_id {
531 self.write_quota_project_id(qpid).await?;
532 }
533
534 Ok(())
535 }
536
537 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
538 seq_write_pxar_entry(&mut self.output, format::PXAR_XATTR, &xattr.data).await
539 }
540
541 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
542 for acl in &acl.users {
543 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_USER, acl.clone())
544 .await?;
545 }
546
547 for acl in &acl.groups {
548 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP, acl.clone())
549 .await?;
550 }
551
552 if let Some(acl) = &acl.group_obj {
553 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP_OBJ, acl.clone())
554 .await?;
555 }
556
557 if let Some(acl) = &acl.default {
558 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_DEFAULT, acl.clone())
559 .await?;
560 }
561
562 for acl in &acl.default_users {
563 seq_write_pxar_struct_entry(
564 &mut self.output,
565 format::PXAR_ACL_DEFAULT_USER,
566 acl.clone(),
567 )
568 .await?;
569 }
570
571 for acl in &acl.default_groups {
572 seq_write_pxar_struct_entry(
573 &mut self.output,
574 format::PXAR_ACL_DEFAULT_GROUP,
575 acl.clone(),
576 )
577 .await?;
578 }
579
580 Ok(())
581 }
582
583 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
584 seq_write_pxar_entry(&mut self.output, format::PXAR_FCAPS, &fcaps.data).await
585 }
586
587 async fn write_quota_project_id(
588 &mut self,
589 quota_project_id: &format::QuotaProjectId,
590 ) -> io::Result<()> {
591 seq_write_pxar_struct_entry(
592 &mut self.output,
593 format::PXAR_QUOTA_PROJID,
594 quota_project_id.clone(),
595 )
596 .await
597 }
598
599 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
600 seq_write_pxar_entry_zero(&mut self.output, format::PXAR_FILENAME, file_name).await
601 }
602
603 pub async fn finish(mut self) -> io::Result<()> {
604 let tail_bytes = self.finish_goodbye_table().await?;
605 seq_write_pxar_entry(&mut self.output, format::PXAR_GOODBYE, &tail_bytes).await?;
606 if let Some(parent) = &mut self.parent {
607 let file_offset = self
608 .state
609 .file_offset
610 .expect("internal error: parent set but no file_offset?");
611
612 let end_offset = seq_write_position(&mut self.output).await?;
613
614 parent.items.push(GoodbyeItem {
615 hash: self.state.file_hash,
616 offset: file_offset,
617 size: end_offset - file_offset,
618 });
619 }
620 self.finished = true;
621 Ok(())
622 }
623
624 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
625 let goodbye_offset = seq_write_position(&mut self.output).await?;
626
627 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
628 let mut tail = take(&mut self.state.items);
629 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
630 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
631
632 // sort, then create a BST
633 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
634
635 let mut bst = Vec::with_capacity(tail.len() + 1);
636 unsafe {
637 bst.set_len(tail.len());
638 }
639 binary_tree_array::copy(tail.len(), |src, dest| {
640 let mut item = tail[src].clone();
641 // fixup the goodbye table offsets to be relative and with the right endianess
642 item.offset = goodbye_offset - item.offset;
643 unsafe {
644 std::ptr::write(&mut bst[dest], item.to_le());
645 }
646 });
647 drop(tail);
648
649 bst.push(
650 GoodbyeItem {
651 hash: format::PXAR_GOODBYE_TAIL_MARKER,
652 offset: goodbye_offset - self.state.entry_offset,
653 size: goodbye_size,
654 }
655 .to_le(),
656 );
657
658 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
659 // the items make sense:
660 let data = bst.as_mut_ptr() as *mut u8;
661 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
662 forget(bst);
663 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
664 }
665 }
666
667 /// Writer for a file object in a directory.
668 pub struct FileImpl<'a> {
669 output: &'a mut dyn SeqWrite,
670
671 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
672 /// directly instead of on Drop of FileImpl?
673 goodbye_item: GoodbyeItem,
674
675 /// While writing data to this file, this is how much space we still have left, this must reach
676 /// exactly zero.
677 remaining_size: u64,
678
679 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
680 /// to, and where we insert our `GoodbyeItem`.
681 parent: &'a mut EncoderState,
682 }
683
684 impl<'a> Drop for FileImpl<'a> {
685 fn drop(&mut self) {
686 if self.remaining_size != 0 {
687 self.parent.add_error(EncodeError::IncompleteFile);
688 }
689
690 self.parent.items.push(self.goodbye_item.clone());
691 }
692 }
693
694 impl<'a> FileImpl<'a> {
695 /// Get the file offset to be able to reference it with `add_hardlink`.
696 pub fn file_offset(&self) -> LinkOffset {
697 LinkOffset(self.goodbye_item.offset)
698 }
699
700 fn check_remaining(&self, size: usize) -> io::Result<()> {
701 if size as u64 > self.remaining_size {
702 io_bail!("attempted to write more than previously allocated");
703 } else {
704 Ok(())
705 }
706 }
707
708 /// Poll write interface to more easily connect to tokio/futures.
709 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
710 pub fn poll_write(
711 self: Pin<&mut Self>,
712 cx: &mut Context,
713 data: &[u8],
714 ) -> Poll<io::Result<usize>> {
715 let this = self.get_mut();
716 this.check_remaining(data.len())?;
717 let output = unsafe { Pin::new_unchecked(&mut *this.output) };
718 match output.poll_seq_write(cx, data) {
719 Poll::Ready(Ok(put)) => {
720 this.remaining_size -= put as u64;
721 Poll::Ready(Ok(put))
722 }
723 other => other,
724 }
725 }
726
727 /// Poll flush interface to more easily connect to tokio/futures.
728 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
729 pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
730 unsafe {
731 self.map_unchecked_mut(|this| &mut this.output)
732 .poll_flush(cx)
733 }
734 }
735
736 /// Poll close/shutdown interface to more easily connect to tokio/futures.
737 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
738 pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
739 unsafe {
740 self.map_unchecked_mut(|this| &mut this.output)
741 .poll_close(cx)
742 }
743 }
744
745 /// Write file data for the current file entry in a pxar archive.
746 ///
747 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
748 /// requested. Check the return value for how many. There's also a `write_all` method available
749 /// for convenience.
750 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
751 self.check_remaining(data.len())?;
752 let put = seq_write(&mut self.output, data).await?;
753 self.remaining_size -= put as u64;
754 Ok(put)
755 }
756
757 /// Completely write file data for the current file entry in a pxar archive.
758 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
759 self.check_remaining(data.len())?;
760 seq_write_all(&mut self.output, data).await?;
761 self.remaining_size -= data.len() as u64;
762 Ok(())
763 }
764 }
765
766 #[cfg(feature = "tokio-io")]
767 impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
768 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
769 FileImpl::poll_write(self, cx, buf)
770 }
771
772 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
773 FileImpl::poll_flush(self, cx)
774 }
775
776 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
777 FileImpl::poll_close(self, cx)
778 }
779 }
780
781 #[cfg(feature = "futures-io")]
782 impl<'a> futures::io::AsyncWrite for FileImpl<'a> {
783 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
784 FileImpl::poll_write(self, cx, buf)
785 }
786
787 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
788 FileImpl::poll_flush(self, cx)
789 }
790
791 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
792 FileImpl::poll_close(self, cx)
793 }
794 }