]> git.proxmox.com Git - pxar.git/blob - src/encoder/mod.rs
switch to using mod.rs
[pxar.git] / src / encoder / mod.rs
1 //! The `pxar` encoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 use std::io;
6 use std::mem::{forget, size_of, size_of_val, take};
7 use std::os::unix::ffi::OsStrExt;
8 use std::path::Path;
9 use std::pin::Pin;
10 use std::task::{Context, Poll};
11
12 use endian_trait::Endian;
13
14 use crate::binary_tree_array;
15 use crate::decoder::{self, SeqRead};
16 use crate::format::{self, GoodbyeItem};
17 use crate::poll_fn::poll_fn;
18 use crate::Metadata;
19
20 pub mod aio;
21 pub mod sync;
22
23 #[doc(inline)]
24 pub use sync::Encoder;
25
/// Sequential write interface driving the encoder's state machine.
///
/// This is the crate's internal writer trait. The synchronous wrapper provides it for
/// `std::io::Write` types, the asynchronous wrapper for both `tokio` and `futures` `AsyncWrite`
/// types.
pub trait SeqWrite {
    /// Poll-style write of `buf`; resolves to the number of bytes that were accepted, which may
    /// be fewer than `buf.len()`.
    fn poll_seq_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Poll-style flush of the output.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;

    /// Poll-style close of the output.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;

    /// Poll-style query of the current write position.
    ///
    /// While writing a pxar archive we need to remember how much data has been written in order
    /// to track offsets. In particular, items like the goodbye table must be able to compute
    /// offsets pointing further back in the archive.
    fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>>;

    /// Borrow this writer as a `SeqWrite` trait object.
    ///
    /// This helper avoids recursive borrowing when nesting into subdirectories: without it,
    /// starting a subdirectory yields a trait object pointing to `T`, nesting another
    /// subdirectory inside yields a trait object pointing to that trait object, and so on.
    fn as_trait_object(&mut self) -> &mut dyn SeqWrite
    where
        Self: Sized,
    {
        // The unsizing coercion happens implicitly at the return site.
        self
    }
}
57
58 /// Allow using trait objects for generics taking a `SeqWrite`.
59 impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
60 fn poll_seq_write(
61 self: Pin<&mut Self>,
62 cx: &mut Context,
63 buf: &[u8],
64 ) -> Poll<io::Result<usize>> {
65 unsafe {
66 self.map_unchecked_mut(|this| &mut **this)
67 .poll_seq_write(cx, buf)
68 }
69 }
70
71 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
72 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
73 }
74
75 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
76 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_close(cx) }
77 }
78
79 fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
80 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) }
81 }
82
83 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
84 where
85 Self: Sized,
86 {
87 &mut **self
88 }
89 }
90
91 /// awaitable version of `poll_position`.
92 async fn seq_write_position<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<u64> {
93 poll_fn(move |cx| unsafe { Pin::new_unchecked(&mut *output).poll_position(cx) }).await
94 }
95
96 /// awaitable verison of `poll_seq_write`.
97 async fn seq_write<T: SeqWrite + ?Sized>(output: &mut T, buf: &[u8]) -> io::Result<usize> {
98 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await
99 }
100
101 /// Write the entire contents of a buffer, handling short writes.
102 async fn seq_write_all<T: SeqWrite + ?Sized>(output: &mut T, mut buf: &[u8]) -> io::Result<()> {
103 while !buf.is_empty() {
104 let got = seq_write(&mut *output, buf).await?;
105 buf = &buf[got..];
106 }
107 Ok(())
108 }
109
110 /// Write an endian-swappable struct.
111 async fn seq_write_struct<E: Endian, T>(output: &mut T, data: E) -> io::Result<()>
112 where
113 T: SeqWrite + ?Sized,
114 {
115 let data = data.to_le();
116 seq_write_all(output, unsafe {
117 std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
118 })
119 .await
120 }
121
122 /// Write a pxar entry.
123 async fn seq_write_pxar_entry<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
124 where
125 T: SeqWrite + ?Sized,
126 {
127 seq_write_struct(
128 &mut *output,
129 format::Header::with_content_size(htype, data.len() as u64),
130 )
131 .await?;
132 seq_write_all(output, data).await
133 }
134
135 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
136 /// data buffer.
137 async fn seq_write_pxar_entry_zero<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
138 where
139 T: SeqWrite + ?Sized,
140 {
141 seq_write_struct(
142 &mut *output,
143 format::Header::with_content_size(htype, 1 + data.len() as u64),
144 )
145 .await?;
146 seq_write_all(&mut *output, data).await?;
147 seq_write_all(output, &[0u8]).await
148 }
149
150 /// Write a pxar entry consiting of an endian-swappable struct.
151 async fn seq_write_pxar_struct_entry<E, T>(output: &mut T, htype: u64, data: E) -> io::Result<()>
152 where
153 T: SeqWrite + ?Sized,
154 E: Endian,
155 {
156 let data = data.to_le();
157 seq_write_pxar_entry(output, htype, unsafe {
158 std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
159 })
160 .await
161 }
162
/// Error conditions caused by incorrect usage of this crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncodeError {
    /// The user dropped a `File` without finishing writing all of its contents.
    ///
    /// The payload length is written out at the beginning, and decoding requires exactly that
    /// amount of data to follow.
    IncompleteFile,

    /// The user dropped a directory without finalizing it.
    ///
    /// Finalizing is required to build the goodbye table at the end of a directory.
    IncompleteDirectory,
}
177
178 #[derive(Default)]
179 struct EncoderState {
180 /// Goodbye items for this directory, excluding the tail.
181 items: Vec<GoodbyeItem>,
182
183 /// User caused error conditions.
184 encode_error: Option<EncodeError>,
185
186 /// Offset of this directory's ENTRY.
187 entry_offset: u64,
188
189 /// Offset to this directory's first FILENAME.
190 files_offset: u64,
191
192 /// If this is a subdirectory, this points to the this directory's FILENAME.
193 file_offset: Option<u64>,
194
195 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
196 file_hash: u64,
197 }
198
199 impl EncoderState {
200 fn merge_error(&mut self, error: Option<EncodeError>) {
201 // one error is enough:
202 if self.encode_error.is_none() {
203 self.encode_error = error;
204 }
205 }
206
207 fn add_error(&mut self, error: EncodeError) {
208 self.merge_error(Some(error));
209 }
210 }
211
212 /// The encoder state machine implementation for a directory.
213 ///
214 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
215 /// synchronous or `async` I/O objects in as output.
216 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
217 output: T,
218 state: EncoderState,
219 parent: Option<&'a mut EncoderState>,
220 finished: bool,
221 }
222
223 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
224 fn drop(&mut self) {
225 if let Some(ref mut parent) = self.parent {
226 // propagate errors:
227 parent.merge_error(self.state.encode_error);
228 if !self.finished {
229 parent.add_error(EncodeError::IncompleteDirectory);
230 }
231 } else if !self.finished {
232 // FIXME: how do we deal with this?
233 // eprintln!("Encoder dropped without finishing!");
234 }
235 }
236 }
237
238 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
239 pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
240 if !metadata.is_dir() {
241 io_bail!("directory metadata must contain the directory mode flag");
242 }
243 let mut this = Self {
244 output,
245 state: EncoderState::default(),
246 parent: None,
247 finished: false,
248 };
249
250 this.encode_metadata(metadata).await?;
251 this.state.files_offset = seq_write_position(&mut this.output).await?;
252
253 Ok(this)
254 }
255
256 fn check(&self) -> io::Result<()> {
257 match self.state.encode_error {
258 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
259 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
260 None => Ok(()),
261 }
262 }
263
264 pub async fn create_file<'b>(
265 &'b mut self,
266 metadata: &Metadata,
267 file_name: &Path,
268 file_size: u64,
269 ) -> io::Result<FileImpl<'b>>
270 where
271 'a: 'b,
272 {
273 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
274 .await
275 }
276
277 async fn create_file_do<'b>(
278 &'b mut self,
279 metadata: &Metadata,
280 file_name: &[u8],
281 file_size: u64,
282 ) -> io::Result<FileImpl<'b>>
283 where
284 'a: 'b,
285 {
286 self.check()?;
287
288 let file_offset = seq_write_position(&mut self.output).await?;
289 self.start_file_do(Some(metadata), file_name).await?;
290
291 seq_write_struct(
292 &mut self.output,
293 format::Header::with_content_size(format::PXAR_PAYLOAD, file_size),
294 )
295 .await?;
296
297 let payload_data_offset = seq_write_position(&mut self.output).await?;
298
299 let meta_size = payload_data_offset - file_offset;
300
301 Ok(FileImpl {
302 output: &mut self.output,
303 goodbye_item: GoodbyeItem {
304 hash: format::hash_filename(file_name),
305 offset: file_offset,
306 size: file_size + meta_size,
307 },
308 remaining_size: file_size,
309 parent: &mut self.state,
310 })
311 }
312
313 pub async fn add_file(
314 &mut self,
315 metadata: &Metadata,
316 file_name: &Path,
317 file_size: u64,
318 content: &mut dyn SeqRead,
319 ) -> io::Result<()> {
320 let mut file = self.create_file(metadata, file_name, file_size).await?;
321 let mut buf = crate::util::vec_new(4096);
322 loop {
323 let got = decoder::seq_read(&mut *content, &mut buf).await?;
324 if got == 0 {
325 break;
326 } else {
327 file.write_all(&buf[..got]).await?;
328 }
329 }
330 Ok(())
331 }
332
333 pub async fn add_symlink(
334 &mut self,
335 metadata: &Metadata,
336 file_name: &Path,
337 target: &Path,
338 ) -> io::Result<()> {
339 self.add_file_entry(
340 Some(metadata),
341 file_name,
342 Some((format::PXAR_SYMLINK, target.as_os_str().as_bytes())),
343 )
344 .await
345 }
346
347 pub async fn add_hardlink(
348 &mut self,
349 file_name: &Path,
350 target: &Path,
351 offset: u64,
352 ) -> io::Result<()> {
353 let hardlink = format::Hardlink {
354 offset,
355 data: target.as_os_str().as_bytes().to_vec(),
356 };
357 let hardlink = unsafe {
358 std::slice::from_raw_parts(
359 &hardlink as *const format::Hardlink as *const u8,
360 size_of::<format::Hardlink>(),
361 )
362 };
363 self.add_file_entry(
364 None,
365 file_name,
366 Some((format::PXAR_HARDLINK, hardlink)),
367 )
368 .await
369 }
370
371 pub async fn add_device(
372 &mut self,
373 metadata: &Metadata,
374 file_name: &Path,
375 device: format::Device,
376 ) -> io::Result<()> {
377 if !metadata.is_device() {
378 io_bail!("entry added via add_device must have a device mode in its metadata");
379 }
380
381 let device = device.to_le();
382 let device = unsafe {
383 std::slice::from_raw_parts(
384 &device as *const format::Device as *const u8,
385 size_of::<format::Device>(),
386 )
387 };
388 self.add_file_entry(
389 Some(metadata),
390 file_name,
391 Some((format::PXAR_DEVICE, device)),
392 )
393 .await
394 }
395
396 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
397 if !metadata.is_fifo() {
398 io_bail!("entry added via add_device must be of type fifo in its metadata");
399 }
400
401 self.add_file_entry(Some(metadata), file_name, None).await
402 }
403
404 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
405 if !metadata.is_socket() {
406 io_bail!("entry added via add_device must be of type socket in its metadata");
407 }
408
409 self.add_file_entry(Some(metadata), file_name, None).await
410 }
411
412 async fn add_file_entry(
413 &mut self,
414 metadata: Option<&Metadata>,
415 file_name: &Path,
416 entry_htype_data: Option<(u64, &[u8])>,
417 ) -> io::Result<()> {
418 self.check()?;
419
420 let file_offset = seq_write_position(&mut self.output).await?;
421
422 let file_name = file_name.as_os_str().as_bytes();
423
424 self.start_file_do(metadata, file_name).await?;
425 if let Some((htype, entry_data)) = entry_htype_data {
426 seq_write_pxar_entry_zero(&mut self.output, htype, entry_data).await?;
427 }
428
429 let end_offset = seq_write_position(&mut self.output).await?;
430
431 self.state.items.push(GoodbyeItem {
432 hash: format::hash_filename(file_name),
433 offset: file_offset,
434 size: end_offset - file_offset,
435 });
436
437 Ok(())
438 }
439
440 /// Helper
441 #[inline]
442 async fn position(&mut self) -> io::Result<u64> {
443 seq_write_position(&mut self.output).await
444 }
445
446 pub async fn create_directory<'b>(
447 &'b mut self,
448 file_name: &Path,
449 metadata: &Metadata,
450 ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>>
451 where
452 'a: 'b,
453 {
454 self.check()?;
455
456 if !metadata.is_dir() {
457 io_bail!("directory metadata must contain the directory mode flag");
458 }
459
460 let file_name = file_name.as_os_str().as_bytes();
461 let file_hash = format::hash_filename(file_name);
462
463 let file_offset = self.position().await?;
464 self.encode_filename(file_name).await?;
465
466 let entry_offset = self.position().await?;
467 self.encode_metadata(&metadata).await?;
468
469 let files_offset = self.position().await?;
470
471 Ok(EncoderImpl {
472 output: self.output.as_trait_object(),
473 state: EncoderState {
474 entry_offset,
475 files_offset,
476 file_offset: Some(file_offset),
477 file_hash: file_hash,
478 ..Default::default()
479 },
480 parent: Some(&mut self.state),
481 finished: false,
482 })
483 }
484
485 async fn start_file_do(
486 &mut self,
487 metadata: Option<&Metadata>,
488 file_name: &[u8],
489 ) -> io::Result<()> {
490 self.encode_filename(file_name).await?;
491 if let Some(metadata) = metadata {
492 self.encode_metadata(&metadata).await?;
493 }
494 Ok(())
495 }
496
497 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
498 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ENTRY, metadata.stat.clone())
499 .await?;
500
501 for xattr in &metadata.xattrs {
502 self.write_xattr(xattr).await?;
503 }
504
505 self.write_acls(&metadata.acl).await?;
506
507 if let Some(fcaps) = &metadata.fcaps {
508 self.write_file_capabilities(fcaps).await?;
509 }
510
511 if let Some(qpid) = &metadata.quota_project_id {
512 self.write_quota_project_id(qpid).await?;
513 }
514
515 Ok(())
516 }
517
518 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
519 seq_write_pxar_entry(&mut self.output, format::PXAR_XATTR, &xattr.data).await
520 }
521
522 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
523 for acl in &acl.users {
524 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_USER, acl.clone())
525 .await?;
526 }
527
528 for acl in &acl.groups {
529 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP, acl.clone())
530 .await?;
531 }
532
533 if let Some(acl) = &acl.group_obj {
534 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP_OBJ, acl.clone())
535 .await?;
536 }
537
538 if let Some(acl) = &acl.default {
539 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_DEFAULT, acl.clone())
540 .await?;
541 }
542
543 for acl in &acl.default_users {
544 seq_write_pxar_struct_entry(
545 &mut self.output,
546 format::PXAR_ACL_DEFAULT_USER,
547 acl.clone(),
548 )
549 .await?;
550 }
551
552 for acl in &acl.default_groups {
553 seq_write_pxar_struct_entry(
554 &mut self.output,
555 format::PXAR_ACL_DEFAULT_GROUP,
556 acl.clone(),
557 )
558 .await?;
559 }
560
561 Ok(())
562 }
563
564 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
565 seq_write_pxar_entry(&mut self.output, format::PXAR_FCAPS, &fcaps.data).await
566 }
567
568 async fn write_quota_project_id(
569 &mut self,
570 quota_project_id: &format::QuotaProjectId,
571 ) -> io::Result<()> {
572 seq_write_pxar_struct_entry(
573 &mut self.output,
574 format::PXAR_QUOTA_PROJID,
575 quota_project_id.clone(),
576 )
577 .await
578 }
579
580 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
581 seq_write_pxar_entry_zero(&mut self.output, format::PXAR_FILENAME, file_name).await
582 }
583
584 pub async fn finish(mut self) -> io::Result<()> {
585 let tail_bytes = self.finish_goodbye_table().await?;
586 seq_write_pxar_entry(&mut self.output, format::PXAR_GOODBYE, &tail_bytes).await?;
587 if let Some(parent) = &mut self.parent {
588 let file_offset = self
589 .state
590 .file_offset
591 .expect("internal error: parent set but no file_offset?");
592
593 let end_offset = seq_write_position(&mut self.output).await?;
594
595 parent.items.push(GoodbyeItem {
596 hash: self.state.file_hash,
597 offset: file_offset,
598 size: end_offset - file_offset,
599 });
600 }
601 self.finished = true;
602 Ok(())
603 }
604
605 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
606 let goodbye_offset = seq_write_position(&mut self.output).await?;
607
608 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
609 let mut tail = take(&mut self.state.items);
610 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
611 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
612
613 // sort, then create a BST
614 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
615
616 let mut bst = Vec::with_capacity(tail.len() + 1);
617 unsafe {
618 bst.set_len(tail.len());
619 }
620 binary_tree_array::copy(tail.len(), |src, dest| {
621 let mut item = tail[src].clone();
622 // fixup the goodbye table offsets to be relative and with the right endianess
623 item.offset = goodbye_offset - item.offset;
624 unsafe {
625 std::ptr::write(&mut bst[dest], item.to_le());
626 }
627 });
628 drop(tail);
629
630 bst.push(
631 GoodbyeItem {
632 hash: format::PXAR_GOODBYE_TAIL_MARKER,
633 offset: goodbye_offset - self.state.entry_offset,
634 size: goodbye_size,
635 }
636 .to_le(),
637 );
638
639 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
640 // the items make sense:
641 let data = bst.as_mut_ptr() as *mut u8;
642 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
643 forget(bst);
644 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
645 }
646 }
647
648 /// Writer for a file object in a directory.
649 pub struct FileImpl<'a> {
650 output: &'a mut dyn SeqWrite,
651
652 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
653 /// directly instead of on Drop of FileImpl?
654 goodbye_item: GoodbyeItem,
655
656 /// While writing data to this file, this is how much space we still have left, this must reach
657 /// exactly zero.
658 remaining_size: u64,
659
660 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
661 /// to, and where we insert our `GoodbyeItem`.
662 parent: &'a mut EncoderState,
663 }
664
665 impl<'a> Drop for FileImpl<'a> {
666 fn drop(&mut self) {
667 if self.remaining_size != 0 {
668 self.parent.add_error(EncodeError::IncompleteFile);
669 }
670
671 self.parent.items.push(self.goodbye_item.clone());
672 }
673 }
674
675 impl<'a> FileImpl<'a> {
676 fn check_remaining(&self, size: usize) -> io::Result<()> {
677 if size as u64 > self.remaining_size {
678 io_bail!("attempted to write more than previously allocated");
679 } else {
680 Ok(())
681 }
682 }
683
/// Poll write interface to more easily connect to tokio/futures.
///
/// Rejects `data` which is larger than the remaining announced payload. On a successful write
/// the number of bytes written is subtracted from the remaining size.
#[cfg(any(feature = "tokio-io", feature = "futures-io"))]
pub fn poll_write(
    self: Pin<&mut Self>,
    cx: &mut Context,
    data: &[u8],
) -> Poll<io::Result<usize>> {
    let this = self.get_mut();
    this.check_remaining(data.len())?;

    let output = unsafe { Pin::new_unchecked(&mut *this.output) };
    let result = output.poll_seq_write(cx, data);
    if let Poll::Ready(Ok(put)) = &result {
        this.remaining_size -= *put as u64;
    }
    result
}
702
/// Poll flush interface to more easily connect to tokio/futures.
///
/// Forwards to the `poll_flush` of the underlying output.
#[cfg(any(feature = "tokio-io", feature = "futures-io"))]
pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
    let output = unsafe { self.map_unchecked_mut(|this| &mut this.output) };
    output.poll_flush(cx)
}
711
/// Poll close/shutdown interface to more easily connect to tokio/futures.
///
/// Forwards to the `poll_close` of the underlying output.
#[cfg(any(feature = "tokio-io", feature = "futures-io"))]
pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
    let output = unsafe { self.map_unchecked_mut(|this| &mut this.output) };
    output.poll_close(cx)
}
720
721 /// Write file data for the current file entry in a pxar archive.
722 ///
723 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
724 /// requested. Check the return value for how many. There's also a `write_all` method available
725 /// for convenience.
726 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
727 self.check_remaining(data.len())?;
728 let put = seq_write(&mut self.output, data).await?;
729 self.remaining_size -= put as u64;
730 Ok(put)
731 }
732
733 /// Completely write file data for the current file entry in a pxar archive.
734 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
735 self.check_remaining(data.len())?;
736 seq_write_all(&mut self.output, data).await?;
737 self.remaining_size -= data.len() as u64;
738 Ok(())
739 }
740 }
741
/// `tokio` `AsyncWrite` support, forwarding to `FileImpl`'s inherent poll methods.
#[cfg(feature = "tokio-io")]
impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
    fn poll_write(
        self: Pin<&mut Self>,
        ctx: &mut Context,
        data: &[u8],
    ) -> Poll<io::Result<usize>> {
        FileImpl::poll_write(self, ctx, data)
    }

    fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_flush(self, ctx)
    }

    /// Shutting down maps to closing the underlying output.
    fn poll_shutdown(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_close(self, ctx)
    }
}
756
/// `futures` `AsyncWrite` support, forwarding to `FileImpl`'s inherent poll methods.
#[cfg(feature = "futures-io")]
impl<'a> futures::io::AsyncWrite for FileImpl<'a> {
    fn poll_write(
        self: Pin<&mut Self>,
        ctx: &mut Context,
        data: &[u8],
    ) -> Poll<io::Result<usize>> {
        FileImpl::poll_write(self, ctx, data)
    }

    fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_flush(self, ctx)
    }

    fn poll_close(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_close(self, ctx)
    }
}