]> git.proxmox.com Git - pxar.git/blob - src/encoder/mod.rs
3e1d7219621982f74b0d785ca40a12d7a18b1291
[pxar.git] / src / encoder / mod.rs
1 //! The `pxar` encoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 use std::cell::RefCell;
6 use std::io;
7 use std::mem::{forget, size_of, size_of_val, take};
8 use std::os::unix::ffi::OsStrExt;
9 use std::path::Path;
10 use std::pin::Pin;
11 use std::rc::Rc;
12 use std::task::{Context, Poll};
13
14 use endian_trait::Endian;
15
16 use crate::binary_tree_array;
17 use crate::decoder::{self, SeqRead};
18 use crate::format::{self, GoodbyeItem};
19 use crate::poll_fn::poll_fn;
20 use crate::Metadata;
21
22 pub mod aio;
23 pub mod sync;
24
25 #[doc(inline)]
26 pub use sync::Encoder;
27
/// Opaque handle to an already written file entry, used as the target of a hard link.
///
/// It wraps the archive offset at which that entry starts.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct LinkOffset(u64);

impl LinkOffset {
    /// Get the raw archive offset stored in this handle.
    #[inline]
    pub fn raw(self) -> u64 {
        let LinkOffset(offset) = self;
        offset
    }
}
38
/// Sequential write interface used by the encoder's state machine.
///
/// This is our internal writer trait. The synchronous wrapper makes it available for
/// `std::io::Write` types, the asynchronous wrapper for both `tokio` and `futures` `AsyncWrite`
/// types.
pub trait SeqWrite {
    /// Try to write bytes from `buf`; when ready, yields how many bytes were accepted.
    fn poll_seq_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Poll-style flush of the output.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;

    /// Poll-style close of the output.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;

    /// Query the current write position.
    ///
    /// While writing to a pxar archive we need to remember how much data we've written in order
    /// to track some offsets. Particularly items like the goodbye table need to be able to
    /// compute offsets pointing further back into the archive.
    fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>>;

    /// Borrow this writer as a trait object.
    ///
    /// This helper avoids recursively borrowing each time we nest into a subdirectory. Without
    /// it, starting a subdirectory would get a trait object pointing to `T`, nesting another
    /// subdirectory in that would yield a trait object pointing to the trait object, and so on.
    fn as_trait_object(&mut self) -> &mut dyn SeqWrite
    where
        Self: Sized,
    {
        // The unsized coercion to `dyn SeqWrite` happens implicitly at the return site.
        self
    }
}
70
71 /// Allow using trait objects for generics taking a `SeqWrite`.
72 impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
73 fn poll_seq_write(
74 self: Pin<&mut Self>,
75 cx: &mut Context,
76 buf: &[u8],
77 ) -> Poll<io::Result<usize>> {
78 unsafe {
79 self.map_unchecked_mut(|this| &mut **this)
80 .poll_seq_write(cx, buf)
81 }
82 }
83
84 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
85 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
86 }
87
88 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
89 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_close(cx) }
90 }
91
92 fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
93 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) }
94 }
95
96 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
97 where
98 Self: Sized,
99 {
100 &mut **self
101 }
102 }
103
104 /// awaitable version of `poll_position`.
105 async fn seq_write_position<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<u64> {
106 poll_fn(move |cx| unsafe { Pin::new_unchecked(&mut *output).poll_position(cx) }).await
107 }
108
109 /// awaitable verison of `poll_seq_write`.
110 async fn seq_write<T: SeqWrite + ?Sized>(output: &mut T, buf: &[u8]) -> io::Result<usize> {
111 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await
112 }
113
114 /// Write the entire contents of a buffer, handling short writes.
115 async fn seq_write_all<T: SeqWrite + ?Sized>(output: &mut T, mut buf: &[u8]) -> io::Result<()> {
116 while !buf.is_empty() {
117 let got = seq_write(&mut *output, buf).await?;
118 buf = &buf[got..];
119 }
120 Ok(())
121 }
122
123 /// Write an endian-swappable struct.
124 async fn seq_write_struct<E: Endian, T>(output: &mut T, data: E) -> io::Result<()>
125 where
126 T: SeqWrite + ?Sized,
127 {
128 let data = data.to_le();
129 seq_write_all(output, unsafe {
130 std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
131 })
132 .await
133 }
134
135 /// Write a pxar entry.
136 async fn seq_write_pxar_entry<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
137 where
138 T: SeqWrite + ?Sized,
139 {
140 seq_write_struct(
141 &mut *output,
142 format::Header::with_content_size(htype, data.len() as u64),
143 )
144 .await?;
145 seq_write_all(output, data).await
146 }
147
148 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
149 /// data buffer.
150 async fn seq_write_pxar_entry_zero<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
151 where
152 T: SeqWrite + ?Sized,
153 {
154 seq_write_struct(
155 &mut *output,
156 format::Header::with_content_size(htype, 1 + data.len() as u64),
157 )
158 .await?;
159 seq_write_all(&mut *output, data).await?;
160 seq_write_all(output, &[0u8]).await
161 }
162
163 /// Write a pxar entry consiting of an endian-swappable struct.
164 async fn seq_write_pxar_struct_entry<E, T>(output: &mut T, htype: u64, data: E) -> io::Result<()>
165 where
166 T: SeqWrite + ?Sized,
167 E: Endian,
168 {
169 let data = data.to_le();
170 seq_write_pxar_entry(output, htype, unsafe {
171 std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
172 })
173 .await
174 }
175
/// Error conditions caused by wrong usage of this crate.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EncodeError {
    /// The user dropped a `File` without finishing writing all of its contents.
    ///
    /// This matters because the payload length is written out at the beginning, and decoding
    /// requires exactly that amount of data to follow.
    IncompleteFile,

    /// The user dropped a directory without finalizing it.
    ///
    /// Finalizing is required to build the goodbye table at the end of a directory.
    IncompleteDirectory,
}
190
191 #[derive(Default)]
192 struct EncoderState {
193 /// Goodbye items for this directory, excluding the tail.
194 items: Vec<GoodbyeItem>,
195
196 /// User caused error conditions.
197 encode_error: Option<EncodeError>,
198
199 /// Offset of this directory's ENTRY.
200 entry_offset: u64,
201
202 /// Offset to this directory's first FILENAME.
203 files_offset: u64,
204
205 /// If this is a subdirectory, this points to the this directory's FILENAME.
206 file_offset: Option<u64>,
207
208 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
209 file_hash: u64,
210 }
211
212 impl EncoderState {
213 fn merge_error(&mut self, error: Option<EncodeError>) {
214 // one error is enough:
215 if self.encode_error.is_none() {
216 self.encode_error = error;
217 }
218 }
219
220 fn add_error(&mut self, error: EncodeError) {
221 self.merge_error(Some(error));
222 }
223 }
224
225 /// The encoder state machine implementation for a directory.
226 ///
227 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
228 /// synchronous or `async` I/O objects in as output.
229 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
230 output: T,
231 state: EncoderState,
232 parent: Option<&'a mut EncoderState>,
233 finished: bool,
234
235 /// Since only the "current" entry can be actively writing files, we share the file copy
236 /// buffer.
237 file_copy_buffer: Rc<RefCell<Vec<u8>>>,
238 }
239
240 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
241 fn drop(&mut self) {
242 if let Some(ref mut parent) = self.parent {
243 // propagate errors:
244 parent.merge_error(self.state.encode_error);
245 if !self.finished {
246 parent.add_error(EncodeError::IncompleteDirectory);
247 }
248 } else if !self.finished {
249 // FIXME: how do we deal with this?
250 // eprintln!("Encoder dropped without finishing!");
251 }
252 }
253 }
254
255 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
256 pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
257 if !metadata.is_dir() {
258 io_bail!("directory metadata must contain the directory mode flag");
259 }
260 let mut this = Self {
261 output,
262 state: EncoderState::default(),
263 parent: None,
264 finished: false,
265 file_copy_buffer: Rc::new(RefCell::new(crate::util::vec_new(1024 * 1024))),
266 };
267
268 this.encode_metadata(metadata).await?;
269 this.state.files_offset = seq_write_position(&mut this.output).await?;
270
271 Ok(this)
272 }
273
274 fn check(&self) -> io::Result<()> {
275 match self.state.encode_error {
276 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
277 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
278 None => Ok(()),
279 }
280 }
281
282 pub async fn create_file<'b>(
283 &'b mut self,
284 metadata: &Metadata,
285 file_name: &Path,
286 file_size: u64,
287 ) -> io::Result<FileImpl<'b>>
288 where
289 'a: 'b,
290 {
291 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
292 .await
293 }
294
295 async fn create_file_do<'b>(
296 &'b mut self,
297 metadata: &Metadata,
298 file_name: &[u8],
299 file_size: u64,
300 ) -> io::Result<FileImpl<'b>>
301 where
302 'a: 'b,
303 {
304 self.check()?;
305
306 let file_offset = seq_write_position(&mut self.output).await?;
307 self.start_file_do(Some(metadata), file_name).await?;
308
309 seq_write_struct(
310 &mut self.output,
311 format::Header::with_content_size(format::PXAR_PAYLOAD, file_size),
312 )
313 .await?;
314
315 let payload_data_offset = seq_write_position(&mut self.output).await?;
316
317 let meta_size = payload_data_offset - file_offset;
318
319 Ok(FileImpl {
320 output: &mut self.output,
321 goodbye_item: GoodbyeItem {
322 hash: format::hash_filename(file_name),
323 offset: file_offset,
324 size: file_size + meta_size,
325 },
326 remaining_size: file_size,
327 parent: &mut self.state,
328 })
329 }
330
331 /// Return a file offset usable with `add_hardlink`.
332 pub async fn add_file(
333 &mut self,
334 metadata: &Metadata,
335 file_name: &Path,
336 file_size: u64,
337 content: &mut dyn SeqRead,
338 ) -> io::Result<LinkOffset> {
339 let buf = Rc::clone(&self.file_copy_buffer);
340 let mut file = self.create_file(metadata, file_name, file_size).await?;
341 let mut buf = buf.borrow_mut();
342 loop {
343 let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
344 if got == 0 {
345 break;
346 } else {
347 file.write_all(&buf[..got]).await?;
348 }
349 }
350 Ok(file.file_offset())
351 }
352
353 /// Return a file offset usable with `add_hardlink`.
354 pub async fn add_symlink(
355 &mut self,
356 metadata: &Metadata,
357 file_name: &Path,
358 target: &Path,
359 ) -> io::Result<()> {
360 let _ofs: LinkOffset = self
361 .add_file_entry(
362 Some(metadata),
363 file_name,
364 Some((format::PXAR_SYMLINK, target.as_os_str().as_bytes())),
365 )
366 .await?;
367 Ok(())
368 }
369
370 /// Return a file offset usable with `add_hardlink`.
371 pub async fn add_hardlink(
372 &mut self,
373 file_name: &Path,
374 target: &Path,
375 target_offset: LinkOffset,
376 ) -> io::Result<()> {
377 let current_offset = seq_write_position(&mut self.output).await?;
378 if current_offset <= target_offset.0 {
379 io_bail!("invalid hardlink offset, can only point to prior files");
380 }
381
382 let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
383 let target_bytes = target.as_os_str().as_bytes();
384 let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len());
385 hardlink.extend(&offset_bytes);
386 hardlink.extend(target_bytes);
387 let _this_offset: LinkOffset = self
388 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
389 .await?;
390 Ok(())
391 }
392
393 /// Return a file offset usable with `add_hardlink`.
394 pub async fn add_device(
395 &mut self,
396 metadata: &Metadata,
397 file_name: &Path,
398 device: format::Device,
399 ) -> io::Result<()> {
400 if !metadata.is_device() {
401 io_bail!("entry added via add_device must have a device mode in its metadata");
402 }
403
404 let device = device.to_le();
405 let device = unsafe {
406 std::slice::from_raw_parts(
407 &device as *const format::Device as *const u8,
408 size_of::<format::Device>(),
409 )
410 };
411 let _ofs: LinkOffset = self
412 .add_file_entry(
413 Some(metadata),
414 file_name,
415 Some((format::PXAR_DEVICE, device)),
416 )
417 .await?;
418 Ok(())
419 }
420
421 /// Return a file offset usable with `add_hardlink`.
422 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
423 if !metadata.is_fifo() {
424 io_bail!("entry added via add_device must be of type fifo in its metadata");
425 }
426
427 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
428 Ok(())
429 }
430
431 /// Return a file offset usable with `add_hardlink`.
432 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
433 if !metadata.is_socket() {
434 io_bail!("entry added via add_device must be of type socket in its metadata");
435 }
436
437 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
438 Ok(())
439 }
440
441 /// Return a file offset usable with `add_hardlink`.
442 async fn add_file_entry(
443 &mut self,
444 metadata: Option<&Metadata>,
445 file_name: &Path,
446 entry_htype_data: Option<(u64, &[u8])>,
447 ) -> io::Result<LinkOffset> {
448 self.check()?;
449
450 let file_offset = seq_write_position(&mut self.output).await?;
451
452 let file_name = file_name.as_os_str().as_bytes();
453
454 self.start_file_do(metadata, file_name).await?;
455 if let Some((htype, entry_data)) = entry_htype_data {
456 seq_write_pxar_entry_zero(&mut self.output, htype, entry_data).await?;
457 }
458
459 let end_offset = seq_write_position(&mut self.output).await?;
460
461 self.state.items.push(GoodbyeItem {
462 hash: format::hash_filename(file_name),
463 offset: file_offset,
464 size: end_offset - file_offset,
465 });
466
467 Ok(LinkOffset(file_offset))
468 }
469
470 /// Helper
471 #[inline]
472 async fn position(&mut self) -> io::Result<u64> {
473 seq_write_position(&mut self.output).await
474 }
475
476 pub async fn create_directory<'b>(
477 &'b mut self,
478 file_name: &Path,
479 metadata: &Metadata,
480 ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>>
481 where
482 'a: 'b,
483 {
484 self.check()?;
485
486 if !metadata.is_dir() {
487 io_bail!("directory metadata must contain the directory mode flag");
488 }
489
490 let file_name = file_name.as_os_str().as_bytes();
491 let file_hash = format::hash_filename(file_name);
492
493 let file_offset = self.position().await?;
494 self.encode_filename(file_name).await?;
495
496 let entry_offset = self.position().await?;
497 self.encode_metadata(&metadata).await?;
498
499 let files_offset = self.position().await?;
500
501 Ok(EncoderImpl {
502 output: self.output.as_trait_object(),
503 state: EncoderState {
504 entry_offset,
505 files_offset,
506 file_offset: Some(file_offset),
507 file_hash: file_hash,
508 ..Default::default()
509 },
510 parent: Some(&mut self.state),
511 finished: false,
512 file_copy_buffer: Rc::clone(&self.file_copy_buffer),
513 })
514 }
515
516 async fn start_file_do(
517 &mut self,
518 metadata: Option<&Metadata>,
519 file_name: &[u8],
520 ) -> io::Result<()> {
521 self.encode_filename(file_name).await?;
522 if let Some(metadata) = metadata {
523 self.encode_metadata(&metadata).await?;
524 }
525 Ok(())
526 }
527
528 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
529 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ENTRY, metadata.stat.clone())
530 .await?;
531
532 for xattr in &metadata.xattrs {
533 self.write_xattr(xattr).await?;
534 }
535
536 self.write_acls(&metadata.acl).await?;
537
538 if let Some(fcaps) = &metadata.fcaps {
539 self.write_file_capabilities(fcaps).await?;
540 }
541
542 if let Some(qpid) = &metadata.quota_project_id {
543 self.write_quota_project_id(qpid).await?;
544 }
545
546 Ok(())
547 }
548
549 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
550 seq_write_pxar_entry(&mut self.output, format::PXAR_XATTR, &xattr.data).await
551 }
552
553 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
554 for acl in &acl.users {
555 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_USER, acl.clone())
556 .await?;
557 }
558
559 for acl in &acl.groups {
560 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP, acl.clone())
561 .await?;
562 }
563
564 if let Some(acl) = &acl.group_obj {
565 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP_OBJ, acl.clone())
566 .await?;
567 }
568
569 if let Some(acl) = &acl.default {
570 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_DEFAULT, acl.clone())
571 .await?;
572 }
573
574 for acl in &acl.default_users {
575 seq_write_pxar_struct_entry(
576 &mut self.output,
577 format::PXAR_ACL_DEFAULT_USER,
578 acl.clone(),
579 )
580 .await?;
581 }
582
583 for acl in &acl.default_groups {
584 seq_write_pxar_struct_entry(
585 &mut self.output,
586 format::PXAR_ACL_DEFAULT_GROUP,
587 acl.clone(),
588 )
589 .await?;
590 }
591
592 Ok(())
593 }
594
595 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
596 seq_write_pxar_entry(&mut self.output, format::PXAR_FCAPS, &fcaps.data).await
597 }
598
599 async fn write_quota_project_id(
600 &mut self,
601 quota_project_id: &format::QuotaProjectId,
602 ) -> io::Result<()> {
603 seq_write_pxar_struct_entry(
604 &mut self.output,
605 format::PXAR_QUOTA_PROJID,
606 quota_project_id.clone(),
607 )
608 .await
609 }
610
611 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
612 crate::util::validate_filename(file_name)?;
613 seq_write_pxar_entry_zero(&mut self.output, format::PXAR_FILENAME, file_name).await
614 }
615
616 pub async fn finish(mut self) -> io::Result<()> {
617 let tail_bytes = self.finish_goodbye_table().await?;
618 seq_write_pxar_entry(&mut self.output, format::PXAR_GOODBYE, &tail_bytes).await?;
619 if let Some(parent) = &mut self.parent {
620 let file_offset = self
621 .state
622 .file_offset
623 .expect("internal error: parent set but no file_offset?");
624
625 let end_offset = seq_write_position(&mut self.output).await?;
626
627 parent.items.push(GoodbyeItem {
628 hash: self.state.file_hash,
629 offset: file_offset,
630 size: end_offset - file_offset,
631 });
632 }
633 self.finished = true;
634 Ok(())
635 }
636
637 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
638 let goodbye_offset = seq_write_position(&mut self.output).await?;
639
640 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
641 let mut tail = take(&mut self.state.items);
642 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
643 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
644
645 // sort, then create a BST
646 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
647
648 let mut bst = Vec::with_capacity(tail.len() + 1);
649 unsafe {
650 bst.set_len(tail.len());
651 }
652 binary_tree_array::copy(tail.len(), |src, dest| {
653 let mut item = tail[src].clone();
654 // fixup the goodbye table offsets to be relative and with the right endianess
655 item.offset = goodbye_offset - item.offset;
656 unsafe {
657 std::ptr::write(&mut bst[dest], item.to_le());
658 }
659 });
660 drop(tail);
661
662 bst.push(
663 GoodbyeItem {
664 hash: format::PXAR_GOODBYE_TAIL_MARKER,
665 offset: goodbye_offset - self.state.entry_offset,
666 size: goodbye_size,
667 }
668 .to_le(),
669 );
670
671 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
672 // the items make sense:
673 let data = bst.as_mut_ptr() as *mut u8;
674 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
675 forget(bst);
676 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
677 }
678 }
679
680 /// Writer for a file object in a directory.
681 pub struct FileImpl<'a> {
682 output: &'a mut dyn SeqWrite,
683
684 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
685 /// directly instead of on Drop of FileImpl?
686 goodbye_item: GoodbyeItem,
687
688 /// While writing data to this file, this is how much space we still have left, this must reach
689 /// exactly zero.
690 remaining_size: u64,
691
692 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
693 /// to, and where we insert our `GoodbyeItem`.
694 parent: &'a mut EncoderState,
695 }
696
697 impl<'a> Drop for FileImpl<'a> {
698 fn drop(&mut self) {
699 if self.remaining_size != 0 {
700 self.parent.add_error(EncodeError::IncompleteFile);
701 }
702
703 self.parent.items.push(self.goodbye_item.clone());
704 }
705 }
706
707 impl<'a> FileImpl<'a> {
708 /// Get the file offset to be able to reference it with `add_hardlink`.
709 pub fn file_offset(&self) -> LinkOffset {
710 LinkOffset(self.goodbye_item.offset)
711 }
712
713 fn check_remaining(&self, size: usize) -> io::Result<()> {
714 if size as u64 > self.remaining_size {
715 io_bail!("attempted to write more than previously allocated");
716 } else {
717 Ok(())
718 }
719 }
720
721 /// Poll write interface to more easily connect to tokio/futures.
722 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
723 pub fn poll_write(
724 self: Pin<&mut Self>,
725 cx: &mut Context,
726 data: &[u8],
727 ) -> Poll<io::Result<usize>> {
728 let this = self.get_mut();
729 this.check_remaining(data.len())?;
730 let output = unsafe { Pin::new_unchecked(&mut *this.output) };
731 match output.poll_seq_write(cx, data) {
732 Poll::Ready(Ok(put)) => {
733 this.remaining_size -= put as u64;
734 Poll::Ready(Ok(put))
735 }
736 other => other,
737 }
738 }
739
740 /// Poll flush interface to more easily connect to tokio/futures.
741 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
742 pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
743 unsafe {
744 self.map_unchecked_mut(|this| &mut this.output)
745 .poll_flush(cx)
746 }
747 }
748
749 /// Poll close/shutdown interface to more easily connect to tokio/futures.
750 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
751 pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
752 unsafe {
753 self.map_unchecked_mut(|this| &mut this.output)
754 .poll_close(cx)
755 }
756 }
757
758 /// Write file data for the current file entry in a pxar archive.
759 ///
760 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
761 /// requested. Check the return value for how many. There's also a `write_all` method available
762 /// for convenience.
763 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
764 self.check_remaining(data.len())?;
765 let put = seq_write(&mut self.output, data).await?;
766 self.remaining_size -= put as u64;
767 Ok(put)
768 }
769
770 /// Completely write file data for the current file entry in a pxar archive.
771 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
772 self.check_remaining(data.len())?;
773 seq_write_all(&mut self.output, data).await?;
774 self.remaining_size -= data.len() as u64;
775 Ok(())
776 }
777 }
778
/// `tokio` adapter: every method delegates to the inherent poll method of `FileImpl`.
#[cfg(feature = "tokio-io")]
impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, data: &[u8]) -> Poll<io::Result<usize>> {
        // Path lookup prefers inherent methods, so this is not a recursive call.
        Self::poll_write(self, cx, data)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Self::poll_flush(self, cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Self::poll_close(self, cx)
    }
}
793
/// `futures` adapter: every method delegates to the inherent poll method of `FileImpl`.
#[cfg(feature = "futures-io")]
impl<'a> futures::io::AsyncWrite for FileImpl<'a> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, data: &[u8]) -> Poll<io::Result<usize>> {
        // Path lookup prefers inherent methods, so this is not a recursive call.
        Self::poll_write(self, cx, data)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Self::poll_flush(self, cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        Self::poll_close(self, cx)
    }
}