1 //! The `pxar` encoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
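//!
//! A hedged sketch of the overall flow, written against the crate-internal `EncoderImpl` API
//! defined in this module (the public `sync::Encoder` and `aio::Encoder` wrappers forward to
//! it). The `Metadata` values are assumed to be constructed elsewhere with the appropriate
//! directory/file mode flags:
//!
//! ```ignore
//! // root directory encoder; `output` is any `SeqWrite`, `dir_meta.is_dir()` must hold
//! let mut root = EncoderImpl::new(output, &dir_meta).await?;
//!
//! // add a regular file with a known size and write its payload
//! let mut file = root.create_file(&file_meta, Path::new("hello.txt"), 5).await?;
//! file.write_all(b"hello").await?;
//! drop(file);
//!
//! // nest a subdirectory, then finalize it to emit its goodbye table
//! let subdir = root.create_directory(Path::new("sub"), &dir_meta).await?;
//! subdir.finish().await?;
//!
//! // finalize the root directory as well
//! root.finish().await?;
//! ```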
4
5 use std::cell::RefCell;
6 use std::io;
7 use std::mem::{forget, size_of, size_of_val, take};
8 use std::os::unix::ffi::OsStrExt;
9 use std::path::Path;
10 use std::pin::Pin;
11 use std::rc::Rc;
12 use std::task::{Context, Poll};
13
14 use endian_trait::Endian;
15
16 use crate::binary_tree_array;
17 use crate::decoder::{self, SeqRead};
18 use crate::format::{self, GoodbyeItem};
19 use crate::poll_fn::poll_fn;
20 use crate::Metadata;
21
22 pub mod aio;
23 pub mod sync;
24
25 #[doc(inline)]
26 pub use sync::Encoder;
27
28 /// File reference used to create hard links.
29 #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
30 pub struct LinkOffset(u64);
31
32 impl LinkOffset {
33 #[inline]
34 pub fn raw(self) -> u64 {
35 self.0
36 }
37 }
38
39 /// Sequential write interface used by the encoder's state machine.
40 ///
41 /// This is our internal writer trait. It is implemented for `std::io::Write` types in the
42 /// synchronous wrapper and for both `tokio` and `futures` `AsyncWrite` types in the asynchronous
43 /// wrapper.
44 pub trait SeqWrite {
45 fn poll_seq_write(
46 self: Pin<&mut Self>,
47 cx: &mut Context,
48 buf: &[u8],
49 ) -> Poll<io::Result<usize>>;
50
51 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
52
53 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
54
55 /// While writing to a pxar archive we need to remember how much data we've written in order to
56 /// track offsets. In particular, items like the goodbye table need to be able to compute offsets
57 /// pointing further back in the archive.
58 fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>>;
59
60 /// Helper to avoid nesting trait objects each time we descend into a subdirectory.
61 /// Without it, starting a subdirectory would yield a trait object pointing to `T`, nesting
62 /// another subdirectory in that would yield a trait object pointing to a trait object, and so on.
63 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
64 where
65 Self: Sized,
66 {
67 self as &mut dyn SeqWrite
68 }
69 }
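// A minimal sketch of a blocking, in-memory `SeqWrite` implementation, for illustration only
// (the crate's real writers live in `sync.rs` and `aio.rs`). The `VecWriter` type is
// hypothetical and not part of this crate:
//
// ```ignore
// struct VecWriter {
//     data: Vec<u8>,
// }
//
// impl SeqWrite for VecWriter {
//     fn poll_seq_write(
//         self: Pin<&mut Self>,
//         _cx: &mut Context,
//         buf: &[u8],
//     ) -> Poll<io::Result<usize>> {
//         // an in-memory buffer can always accept the whole write at once
//         self.get_mut().data.extend_from_slice(buf);
//         Poll::Ready(Ok(buf.len()))
//     }
//
//     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
//         Poll::Ready(Ok(()))
//     }
//
//     fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
//         Poll::Ready(Ok(()))
//     }
//
//     fn poll_position(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<u64>> {
//         // the current archive position is simply how many bytes we have buffered
//         Poll::Ready(Ok(self.data.len() as u64))
//     }
// }
// ```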
70
71 /// Allow using trait objects for generics taking a `SeqWrite`.
72 impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
73 fn poll_seq_write(
74 self: Pin<&mut Self>,
75 cx: &mut Context,
76 buf: &[u8],
77 ) -> Poll<io::Result<usize>> {
78 unsafe {
79 self.map_unchecked_mut(|this| &mut **this)
80 .poll_seq_write(cx, buf)
81 }
82 }
83
84 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
85 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
86 }
87
88 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
89 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_close(cx) }
90 }
91
92 fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
93 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) }
94 }
95
96 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
97 where
98 Self: Sized,
99 {
100 &mut **self
101 }
102 }
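// Together with `as_trait_object` above, this impl keeps nesting flat: a subdirectory encoder's
// output is always a single `&mut dyn SeqWrite`, never a trait object wrapping another trait
// object. A short sketch of what that means for nested `create_directory` calls:
//
// ```ignore
// let mut sub = root.create_directory(Path::new("a"), &dir_meta).await?;
// // `sub.output` is `&mut dyn SeqWrite`
// let mut subsub = sub.create_directory(Path::new("b"), &dir_meta).await?;
// // still `&mut dyn SeqWrite` (via `&mut **self`), not `&mut &mut dyn SeqWrite`
// ```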
103
104 /// Awaitable version of `poll_position`.
105 async fn seq_write_position<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<u64> {
106 poll_fn(move |cx| unsafe { Pin::new_unchecked(&mut *output).poll_position(cx) }).await
107 }
108
109 /// Awaitable version of `poll_seq_write`.
110 async fn seq_write<T: SeqWrite + ?Sized>(output: &mut T, buf: &[u8]) -> io::Result<usize> {
111 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await
112 }
113
114 /// Write the entire contents of a buffer, handling short writes.
115 async fn seq_write_all<T: SeqWrite + ?Sized>(output: &mut T, mut buf: &[u8]) -> io::Result<()> {
116 while !buf.is_empty() {
117 let got = seq_write(&mut *output, buf).await?;
118 buf = &buf[got..];
119 }
120 Ok(())
121 }
122
123 /// Write an endian-swappable struct.
124 async fn seq_write_struct<E: Endian, T>(output: &mut T, data: E) -> io::Result<()>
125 where
126 T: SeqWrite + ?Sized,
127 {
128 let data = data.to_le();
129 seq_write_all(output, unsafe {
130 std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
131 })
132 .await
133 }
134
135 /// Write a pxar entry.
136 async fn seq_write_pxar_entry<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
137 where
138 T: SeqWrite + ?Sized,
139 {
140 let header = format::Header::with_content_size(htype, data.len() as u64);
141 header.check_header_size()?;
142
143 seq_write_struct(&mut *output, header).await?;
144 seq_write_all(output, data).await
145 }
146
147 /// Write a pxar entry terminated by an additional zero byte which is not contained in the
148 /// provided data buffer.
149 async fn seq_write_pxar_entry_zero<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
150 where
151 T: SeqWrite + ?Sized,
152 {
153 let header = format::Header::with_content_size(htype, 1 + data.len() as u64);
154 header.check_header_size()?;
155
156 seq_write_struct(&mut *output, header).await?;
157 seq_write_all(&mut *output, data).await?;
158 seq_write_all(output, &[0u8]).await
159 }
160
161 /// Write a pxar entry consisting of an endian-swappable struct.
162 async fn seq_write_pxar_struct_entry<E, T>(output: &mut T, htype: u64, data: E) -> io::Result<()>
163 where
164 T: SeqWrite + ?Sized,
165 E: Endian,
166 {
167 let data = data.to_le();
168 seq_write_pxar_entry(output, htype, unsafe {
169 std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
170 })
171 .await
172 }
173
174 /// Error conditions caused by wrong usage of this crate.
175 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
176 pub enum EncodeError {
177 /// The user dropped a `File` without finishing writing all of its contents.
178 ///
179 /// This is required because the payload length is written out at the beginning and decoding
180 /// requires exactly that amount of data to follow.
181 IncompleteFile,
182
183 /// The user dropped a directory without finalizing it.
184 ///
185 /// Finalizing is required to build the goodbye table at the end of a directory.
186 IncompleteDirectory,
187 }
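// A sketch of how these errors surface, using the internal `EncoderImpl`/`FileImpl` API below
// (`meta` is assumed to be suitable file metadata):
//
// ```ignore
// let mut file = encoder.create_file(&meta, Path::new("a.txt"), 8).await?;
// file.write_all(b"1234").await?; // only 4 of the promised 8 bytes
// drop(file);                     // records EncodeError::IncompleteFile in the parent state
//
// // every subsequent call runs `check()` first and now fails with "incomplete file"
// assert!(encoder.create_file(&meta, Path::new("b.txt"), 0).await.is_err());
// ```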
188
189 #[derive(Default)]
190 struct EncoderState {
191 /// Goodbye items for this directory, excluding the tail.
192 items: Vec<GoodbyeItem>,
193
194 /// User caused error conditions.
195 encode_error: Option<EncodeError>,
196
197 /// Offset of this directory's ENTRY.
198 entry_offset: u64,
199
200 /// Offset to this directory's first FILENAME.
201 files_offset: u64,
202
203 /// If this is a subdirectory, this points to this directory's FILENAME.
204 file_offset: Option<u64>,
205
206 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
207 file_hash: u64,
208 }
209
210 impl EncoderState {
211 fn merge_error(&mut self, error: Option<EncodeError>) {
212 // one error is enough:
213 if self.encode_error.is_none() {
214 self.encode_error = error;
215 }
216 }
217
218 fn add_error(&mut self, error: EncodeError) {
219 self.merge_error(Some(error));
220 }
221 }
222
223 /// The encoder state machine implementation for a directory.
224 ///
225 /// We use `async fn` to implement the encoder state machine so that we can easily plug in
226 /// either synchronous or `async` I/O objects as output.
227 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
228 output: T,
229 state: EncoderState,
230 parent: Option<&'a mut EncoderState>,
231 finished: bool,
232
233 /// Since only the "current" entry can be actively writing files, we share the file copy
234 /// buffer.
235 file_copy_buffer: Rc<RefCell<Vec<u8>>>,
236 }
237
238 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
239 fn drop(&mut self) {
240 if let Some(ref mut parent) = self.parent {
241 // propagate errors:
242 parent.merge_error(self.state.encode_error);
243 if !self.finished {
244 parent.add_error(EncodeError::IncompleteDirectory);
245 }
246 } else if !self.finished {
247 // FIXME: how do we deal with this?
248 // eprintln!("Encoder dropped without finishing!");
249 }
250 }
251 }
252
253 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
254 pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
255 if !metadata.is_dir() {
256 io_bail!("directory metadata must contain the directory mode flag");
257 }
258 let mut this = Self {
259 output,
260 state: EncoderState::default(),
261 parent: None,
262 finished: false,
263 file_copy_buffer: Rc::new(RefCell::new(crate::util::vec_new(1024 * 1024))),
264 };
265
266 this.encode_metadata(metadata).await?;
267 this.state.files_offset = seq_write_position(&mut this.output).await?;
268
269 Ok(this)
270 }
271
272 fn check(&self) -> io::Result<()> {
273 match self.state.encode_error {
274 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
275 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
276 None => Ok(()),
277 }
278 }
279
280 pub async fn create_file<'b>(
281 &'b mut self,
282 metadata: &Metadata,
283 file_name: &Path,
284 file_size: u64,
285 ) -> io::Result<FileImpl<'b>>
286 where
287 'a: 'b,
288 {
289 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
290 .await
291 }
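    // A usage sketch for `create_file` + `FileImpl` (internal API; `meta` is assumed to be
    // suitable file metadata). The declared `file_size` must be written out exactly, otherwise
    // dropping the `FileImpl` records `EncodeError::IncompleteFile`:
    //
    // ```ignore
    // let mut file = encoder
    //     .create_file(&meta, Path::new("data.bin"), payload.len() as u64)
    //     .await?;
    // file.write_all(&payload).await?;
    // let offset: LinkOffset = file.file_offset(); // can later be passed to `add_hardlink`
    // ```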
292
293 async fn create_file_do<'b>(
294 &'b mut self,
295 metadata: &Metadata,
296 file_name: &[u8],
297 file_size: u64,
298 ) -> io::Result<FileImpl<'b>>
299 where
300 'a: 'b,
301 {
302 self.check()?;
303
304 let file_offset = seq_write_position(&mut self.output).await?;
305 self.start_file_do(Some(metadata), file_name).await?;
306
307 let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
308 header.check_header_size()?;
309
310 seq_write_struct(&mut self.output, header).await?;
311
312 let payload_data_offset = seq_write_position(&mut self.output).await?;
313
314 let meta_size = payload_data_offset - file_offset;
315
316 Ok(FileImpl {
317 output: &mut self.output,
318 goodbye_item: GoodbyeItem {
319 hash: format::hash_filename(file_name),
320 offset: file_offset,
321 size: file_size + meta_size,
322 },
323 remaining_size: file_size,
324 parent: &mut self.state,
325 })
326 }
327
328 /// Add a regular file to the archive, returning a file offset usable with `add_hardlink`.
329 pub async fn add_file(
330 &mut self,
331 metadata: &Metadata,
332 file_name: &Path,
333 file_size: u64,
334 content: &mut dyn SeqRead,
335 ) -> io::Result<LinkOffset> {
336 let buf = Rc::clone(&self.file_copy_buffer);
337 let mut file = self.create_file(metadata, file_name, file_size).await?;
338 let mut buf = buf.borrow_mut();
339 loop {
340 let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
341 if got == 0 {
342 break;
343 } else {
344 file.write_all(&buf[..got]).await?;
345 }
346 }
347 Ok(file.file_offset())
348 }
349
350 /// Add a symlink to the archive.
351 pub async fn add_symlink(
352 &mut self,
353 metadata: &Metadata,
354 file_name: &Path,
355 target: &Path,
356 ) -> io::Result<()> {
357 let target = target.as_os_str().as_bytes();
358 let mut data = Vec::with_capacity(target.len() + 1);
359 data.extend(target);
360 data.push(0);
361 let _ofs: LinkOffset = self
362 .add_file_entry(
363 Some(metadata),
364 file_name,
365 Some((format::PXAR_SYMLINK, &data)),
366 )
367 .await?;
368 Ok(())
369 }
370
371 /// Add a hard link to the archive, pointing to a previously added file via its `LinkOffset`.
372 pub async fn add_hardlink(
373 &mut self,
374 file_name: &Path,
375 target: &Path,
376 target_offset: LinkOffset,
377 ) -> io::Result<()> {
378 let current_offset = seq_write_position(&mut self.output).await?;
379 if current_offset <= target_offset.0 {
380 io_bail!("invalid hardlink offset, can only point to prior files");
381 }
382
383 let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
384 let target_bytes = target.as_os_str().as_bytes();
385 let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len() + 1);
386 hardlink.extend(&offset_bytes);
387 hardlink.extend(target_bytes);
388 hardlink.push(0);
389 let _this_offset: LinkOffset = self
390 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
391 .await?;
392 Ok(())
393 }
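    // A sketch tying `add_file` and `add_hardlink` together (internal API; `meta` and `content`
    // are assumed to be suitable `Metadata` and `SeqRead` values):
    //
    // ```ignore
    // let link: LinkOffset = encoder
    //     .add_file(&meta, Path::new("original"), size, &mut content)
    //     .await?;
    // // the hard link must come after the file it points to:
    // encoder
    //     .add_hardlink(Path::new("copy"), Path::new("original"), link)
    //     .await?;
    // ```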
394
395 /// Add a device node to the archive.
396 pub async fn add_device(
397 &mut self,
398 metadata: &Metadata,
399 file_name: &Path,
400 device: format::Device,
401 ) -> io::Result<()> {
402 if !metadata.is_device() {
403 io_bail!("entry added via add_device must have a device mode in its metadata");
404 }
405
406 let device = device.to_le();
407 let device = unsafe {
408 std::slice::from_raw_parts(
409 &device as *const format::Device as *const u8,
410 size_of::<format::Device>(),
411 )
412 };
413 let _ofs: LinkOffset = self
414 .add_file_entry(
415 Some(metadata),
416 file_name,
417 Some((format::PXAR_DEVICE, device)),
418 )
419 .await?;
420 Ok(())
421 }
422
423 /// Add a FIFO node to the archive.
424 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
425 if !metadata.is_fifo() {
426 io_bail!("entry added via add_fifo must be of type fifo in its metadata");
427 }
428
429 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
430 Ok(())
431 }
432
433 /// Add a socket node to the archive.
434 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
435 if !metadata.is_socket() {
436 io_bail!("entry added via add_socket must be of type socket in its metadata");
437 }
438
439 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
440 Ok(())
441 }
442
443 /// Write a filename plus optional metadata/entry data and return its offset for `add_hardlink`.
444 async fn add_file_entry(
445 &mut self,
446 metadata: Option<&Metadata>,
447 file_name: &Path,
448 entry_htype_data: Option<(u64, &[u8])>,
449 ) -> io::Result<LinkOffset> {
450 self.check()?;
451
452 let file_offset = seq_write_position(&mut self.output).await?;
453
454 let file_name = file_name.as_os_str().as_bytes();
455
456 self.start_file_do(metadata, file_name).await?;
457 if let Some((htype, entry_data)) = entry_htype_data {
458 seq_write_pxar_entry(&mut self.output, htype, entry_data).await?;
459 }
460
461 let end_offset = seq_write_position(&mut self.output).await?;
462
463 self.state.items.push(GoodbyeItem {
464 hash: format::hash_filename(file_name),
465 offset: file_offset,
466 size: end_offset - file_offset,
467 });
468
469 Ok(LinkOffset(file_offset))
470 }
471
472 /// Helper returning the current byte offset in the output.
473 #[inline]
474 async fn position(&mut self) -> io::Result<u64> {
475 seq_write_position(&mut self.output).await
476 }
477
478 pub async fn create_directory<'b>(
479 &'b mut self,
480 file_name: &Path,
481 metadata: &Metadata,
482 ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>>
483 where
484 'a: 'b,
485 {
486 self.check()?;
487
488 if !metadata.is_dir() {
489 io_bail!("directory metadata must contain the directory mode flag");
490 }
491
492 let file_name = file_name.as_os_str().as_bytes();
493 let file_hash = format::hash_filename(file_name);
494
495 let file_offset = self.position().await?;
496 self.encode_filename(file_name).await?;
497
498 let entry_offset = self.position().await?;
499 self.encode_metadata(metadata).await?;
500
501 let files_offset = self.position().await?;
502
503 Ok(EncoderImpl {
504 output: self.output.as_trait_object(),
505 state: EncoderState {
506 entry_offset,
507 files_offset,
508 file_offset: Some(file_offset),
509 file_hash,
510 ..Default::default()
511 },
512 parent: Some(&mut self.state),
513 finished: false,
514 file_copy_buffer: Rc::clone(&self.file_copy_buffer),
515 })
516 }
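    // A sketch of directory nesting (internal API): each subdirectory encoder borrows its
    // parent and must be finalized with `finish()`, otherwise the parent records
    // `EncodeError::IncompleteDirectory` when the child is dropped:
    //
    // ```ignore
    // let mut sub = encoder.create_directory(Path::new("sub"), &dir_meta).await?;
    // let mut file = sub.create_file(&file_meta, Path::new("inner.txt"), 2).await?;
    // file.write_all(b"ok").await?;
    // drop(file);
    // sub.finish().await?; // writes the subdirectory's goodbye table
    // ```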
517
518 async fn start_file_do(
519 &mut self,
520 metadata: Option<&Metadata>,
521 file_name: &[u8],
522 ) -> io::Result<()> {
523 self.encode_filename(file_name).await?;
524 if let Some(metadata) = metadata {
525 self.encode_metadata(metadata).await?;
526 }
527 Ok(())
528 }
529
530 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
531 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ENTRY, metadata.stat.clone())
532 .await?;
533
534 for xattr in &metadata.xattrs {
535 self.write_xattr(xattr).await?;
536 }
537
538 self.write_acls(&metadata.acl).await?;
539
540 if let Some(fcaps) = &metadata.fcaps {
541 self.write_file_capabilities(fcaps).await?;
542 }
543
544 if let Some(qpid) = &metadata.quota_project_id {
545 self.write_quota_project_id(qpid).await?;
546 }
547
548 Ok(())
549 }
550
551 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
552 seq_write_pxar_entry(&mut self.output, format::PXAR_XATTR, &xattr.data).await
553 }
554
555 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
556 for acl in &acl.users {
557 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_USER, acl.clone())
558 .await?;
559 }
560
561 for acl in &acl.groups {
562 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP, acl.clone())
563 .await?;
564 }
565
566 if let Some(acl) = &acl.group_obj {
567 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP_OBJ, acl.clone())
568 .await?;
569 }
570
571 if let Some(acl) = &acl.default {
572 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_DEFAULT, acl.clone())
573 .await?;
574 }
575
576 for acl in &acl.default_users {
577 seq_write_pxar_struct_entry(
578 &mut self.output,
579 format::PXAR_ACL_DEFAULT_USER,
580 acl.clone(),
581 )
582 .await?;
583 }
584
585 for acl in &acl.default_groups {
586 seq_write_pxar_struct_entry(
587 &mut self.output,
588 format::PXAR_ACL_DEFAULT_GROUP,
589 acl.clone(),
590 )
591 .await?;
592 }
593
594 Ok(())
595 }
596
597 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
598 seq_write_pxar_entry(&mut self.output, format::PXAR_FCAPS, &fcaps.data).await
599 }
600
601 async fn write_quota_project_id(
602 &mut self,
603 quota_project_id: &format::QuotaProjectId,
604 ) -> io::Result<()> {
605 seq_write_pxar_struct_entry(
606 &mut self.output,
607 format::PXAR_QUOTA_PROJID,
608 *quota_project_id,
609 )
610 .await
611 }
612
613 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
614 crate::util::validate_filename(file_name)?;
615 seq_write_pxar_entry_zero(&mut self.output, format::PXAR_FILENAME, file_name).await
616 }
617
618 pub async fn finish(mut self) -> io::Result<()> {
619 let tail_bytes = self.finish_goodbye_table().await?;
620 seq_write_pxar_entry(&mut self.output, format::PXAR_GOODBYE, &tail_bytes).await?;
621 if let Some(parent) = &mut self.parent {
622 let file_offset = self
623 .state
624 .file_offset
625 .expect("internal error: parent set but no file_offset?");
626
627 let end_offset = seq_write_position(&mut self.output).await?;
628
629 parent.items.push(GoodbyeItem {
630 hash: self.state.file_hash,
631 offset: file_offset,
632 size: end_offset - file_offset,
633 });
634 }
635 self.finished = true;
636 Ok(())
637 }
638
639 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
640 let goodbye_offset = seq_write_position(&mut self.output).await?;
641
642 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
643 let mut tail = take(&mut self.state.items);
644 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
645 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
646
647 // sort, then create a BST
648 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
649
650 let mut bst = Vec::with_capacity(tail.len() + 1);
651 unsafe {
652 bst.set_len(tail.len());
653 }
654 binary_tree_array::copy(tail.len(), |src, dest| {
655 let mut item = tail[src].clone();
656 // fix up the goodbye table offsets to be relative and with the right endianness
657 item.offset = goodbye_offset - item.offset;
658 unsafe {
659 std::ptr::write(&mut bst[dest], item.to_le());
660 }
661 });
662 drop(tail);
663
664 bst.push(
665 GoodbyeItem {
666 hash: format::PXAR_GOODBYE_TAIL_MARKER,
667 offset: goodbye_offset - self.state.entry_offset,
668 size: goodbye_size,
669 }
670 .to_le(),
671 );
672
673 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
674 // the items make sense:
675 let data = bst.as_mut_ptr() as *mut u8;
676 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
677 forget(bst);
678 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
679 }
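    // Worked example for the offset fixup above: if a file's FILENAME entry starts at archive
    // offset 100 and the goodbye table starts at offset 4096, the stored goodbye offset is
    // 4096 - 100 = 3996, i.e. how far back from the goodbye table the entry begins. The tail
    // marker item uses the same scheme relative to this directory's ENTRY offset.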
680 }
681
682 /// Writer for a file object in a directory.
683 pub struct FileImpl<'a> {
684 output: &'a mut dyn SeqWrite,
685
686 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
687 /// directly instead of on Drop of FileImpl?
688 goodbye_item: GoodbyeItem,
689
690 /// While writing data to this file, this is how much space we still have left; it must reach
691 /// exactly zero.
692 remaining_size: u64,
693
694 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
695 /// to, and where we insert our `GoodbyeItem`.
696 parent: &'a mut EncoderState,
697 }
698
699 impl<'a> Drop for FileImpl<'a> {
700 fn drop(&mut self) {
701 if self.remaining_size != 0 {
702 self.parent.add_error(EncodeError::IncompleteFile);
703 }
704
705 self.parent.items.push(self.goodbye_item.clone());
706 }
707 }
708
709 impl<'a> FileImpl<'a> {
710 /// Get the file offset to be able to reference it with `add_hardlink`.
711 pub fn file_offset(&self) -> LinkOffset {
712 LinkOffset(self.goodbye_item.offset)
713 }
714
715 fn check_remaining(&self, size: usize) -> io::Result<()> {
716 if size as u64 > self.remaining_size {
717 io_bail!("attempted to write more than previously allocated");
718 } else {
719 Ok(())
720 }
721 }
722
723 /// Poll write interface to more easily connect to tokio/futures.
724 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
725 pub fn poll_write(
726 self: Pin<&mut Self>,
727 cx: &mut Context,
728 data: &[u8],
729 ) -> Poll<io::Result<usize>> {
730 let this = self.get_mut();
731 this.check_remaining(data.len())?;
732 let output = unsafe { Pin::new_unchecked(&mut *this.output) };
733 match output.poll_seq_write(cx, data) {
734 Poll::Ready(Ok(put)) => {
735 this.remaining_size -= put as u64;
736 Poll::Ready(Ok(put))
737 }
738 other => other,
739 }
740 }
741
742 /// Poll flush interface to more easily connect to tokio/futures.
743 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
744 pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
745 unsafe {
746 self.map_unchecked_mut(|this| &mut this.output)
747 .poll_flush(cx)
748 }
749 }
750
751 /// Poll close/shutdown interface to more easily connect to tokio/futures.
752 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
753 pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
754 unsafe {
755 self.map_unchecked_mut(|this| &mut this.output)
756 .poll_close(cx)
757 }
758 }
759
760 /// Write file data for the current file entry in a pxar archive.
761 ///
762 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
763 /// requested. Check the return value for how many. There's also a `write_all` method available
764 /// for convenience.
765 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
766 self.check_remaining(data.len())?;
767 let put = seq_write(&mut self.output, data).await?;
768 self.remaining_size -= put as u64;
769 Ok(put)
770 }
771
772 /// Completely write file data for the current file entry in a pxar archive.
773 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
774 self.check_remaining(data.len())?;
775 seq_write_all(&mut self.output, data).await?;
776 self.remaining_size -= data.len() as u64;
777 Ok(())
778 }
779 }
780
781 #[cfg(feature = "tokio-io")]
782 impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
783 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
784 FileImpl::poll_write(self, cx, buf)
785 }
786
787 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
788 FileImpl::poll_flush(self, cx)
789 }
790
791 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
792 FileImpl::poll_close(self, cx)
793 }
794 }
795
796 #[cfg(feature = "futures-io")]
797 impl<'a> futures::io::AsyncWrite for FileImpl<'a> {
798 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
799 FileImpl::poll_write(self, cx, buf)
800 }
801
802 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
803 FileImpl::poll_flush(self, cx)
804 }
805
806 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
807 FileImpl::poll_close(self, cx)
808 }
809 }