]> git.proxmox.com Git - pxar.git/blob - src/encoder/mod.rs
fix device encoding
[pxar.git] / src / encoder / mod.rs
1 //! The `pxar` encoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 use std::cell::RefCell;
6 use std::io;
7 use std::mem::{forget, size_of, size_of_val, take};
8 use std::os::unix::ffi::OsStrExt;
9 use std::path::Path;
10 use std::pin::Pin;
11 use std::rc::Rc;
12 use std::task::{Context, Poll};
13
14 use endian_trait::Endian;
15
16 use crate::binary_tree_array;
17 use crate::decoder::{self, SeqRead};
18 use crate::format::{self, GoodbyeItem};
19 use crate::poll_fn::poll_fn;
20 use crate::Metadata;
21
22 pub mod aio;
23 pub mod sync;
24
25 #[doc(inline)]
26 pub use sync::Encoder;
27
28 /// File reference used to create hard links.
29 #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
30 pub struct LinkOffset(u64);
31
32 impl LinkOffset {
33 #[inline]
34 pub fn raw(self) -> u64 {
35 self.0
36 }
37 }
38
39 /// Sequential write interface used by the encoder's state machine.
40 ///
41 /// This is our internal writer trait which is available for `std::io::Write` types in the
42 /// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
43 /// wrapper.
44 pub trait SeqWrite {
45 fn poll_seq_write(
46 self: Pin<&mut Self>,
47 cx: &mut Context,
48 buf: &[u8],
49 ) -> Poll<io::Result<usize>>;
50
51 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
52
53 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
54
55 /// While writing to a pxar archive we need to remember how much dat we've written to track some
56 /// offsets. Particularly items like the goodbye table need to be able to compute offsets to
57 /// further back in the archive.
58 fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>>;
59
60 /// To avoid recursively borrowing each time we nest into a subdirectory we add this helper.
61 /// Otherwise starting a subdirectory will get a trait object pointing to `T`, nesting another
62 /// subdirectory in that would have a trait object pointing to the trait object, and so on.
63 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
64 where
65 Self: Sized,
66 {
67 self as &mut dyn SeqWrite
68 }
69 }
70
71 /// Allow using trait objects for generics taking a `SeqWrite`.
72 impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) {
73 fn poll_seq_write(
74 self: Pin<&mut Self>,
75 cx: &mut Context,
76 buf: &[u8],
77 ) -> Poll<io::Result<usize>> {
78 unsafe {
79 self.map_unchecked_mut(|this| &mut **this)
80 .poll_seq_write(cx, buf)
81 }
82 }
83
84 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
85 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
86 }
87
88 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
89 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_close(cx) }
90 }
91
92 fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
93 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) }
94 }
95
96 fn as_trait_object(&mut self) -> &mut dyn SeqWrite
97 where
98 Self: Sized,
99 {
100 &mut **self
101 }
102 }
103
104 /// awaitable version of `poll_position`.
105 async fn seq_write_position<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<u64> {
106 poll_fn(move |cx| unsafe { Pin::new_unchecked(&mut *output).poll_position(cx) }).await
107 }
108
109 /// awaitable verison of `poll_seq_write`.
110 async fn seq_write<T: SeqWrite + ?Sized>(output: &mut T, buf: &[u8]) -> io::Result<usize> {
111 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await
112 }
113
114 /// Write the entire contents of a buffer, handling short writes.
115 async fn seq_write_all<T: SeqWrite + ?Sized>(output: &mut T, mut buf: &[u8]) -> io::Result<()> {
116 while !buf.is_empty() {
117 let got = seq_write(&mut *output, buf).await?;
118 buf = &buf[got..];
119 }
120 Ok(())
121 }
122
123 /// Write an endian-swappable struct.
124 async fn seq_write_struct<E: Endian, T>(output: &mut T, data: E) -> io::Result<()>
125 where
126 T: SeqWrite + ?Sized,
127 {
128 let data = data.to_le();
129 seq_write_all(output, unsafe {
130 std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
131 })
132 .await
133 }
134
135 /// Write a pxar entry.
136 async fn seq_write_pxar_entry<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
137 where
138 T: SeqWrite + ?Sized,
139 {
140 let header = format::Header::with_content_size(htype, data.len() as u64);
141 header.check_header_size()?;
142
143 seq_write_struct(
144 &mut *output,
145 header,
146 )
147 .await?;
148 seq_write_all(output, data).await
149 }
150
151 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
152 /// data buffer.
153 async fn seq_write_pxar_entry_zero<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()>
154 where
155 T: SeqWrite + ?Sized,
156 {
157 let header = format::Header::with_content_size(htype, 1 + data.len() as u64);
158 header.check_header_size()?;
159
160 seq_write_struct(
161 &mut *output,
162 header,
163 )
164 .await?;
165 seq_write_all(&mut *output, data).await?;
166 seq_write_all(output, &[0u8]).await
167 }
168
169 /// Write a pxar entry consiting of an endian-swappable struct.
170 async fn seq_write_pxar_struct_entry<E, T>(output: &mut T, htype: u64, data: E) -> io::Result<()>
171 where
172 T: SeqWrite + ?Sized,
173 E: Endian,
174 {
175 let data = data.to_le();
176 seq_write_pxar_entry(output, htype, unsafe {
177 std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data))
178 })
179 .await
180 }
181
182 /// Error conditions caused by wrong usage of this crate.
183 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
184 pub enum EncodeError {
185 /// The user dropped a `File` without without finishing writing all of its contents.
186 ///
187 /// This is required because the payload lengths is written out at the begining and decoding
188 /// requires there to follow the right amount of data.
189 IncompleteFile,
190
191 /// The user dropped a directory without finalizing it.
192 ///
193 /// Finalizing is required to build the goodbye table at the end of a directory.
194 IncompleteDirectory,
195 }
196
197 #[derive(Default)]
198 struct EncoderState {
199 /// Goodbye items for this directory, excluding the tail.
200 items: Vec<GoodbyeItem>,
201
202 /// User caused error conditions.
203 encode_error: Option<EncodeError>,
204
205 /// Offset of this directory's ENTRY.
206 entry_offset: u64,
207
208 /// Offset to this directory's first FILENAME.
209 files_offset: u64,
210
211 /// If this is a subdirectory, this points to the this directory's FILENAME.
212 file_offset: Option<u64>,
213
214 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
215 file_hash: u64,
216 }
217
218 impl EncoderState {
219 fn merge_error(&mut self, error: Option<EncodeError>) {
220 // one error is enough:
221 if self.encode_error.is_none() {
222 self.encode_error = error;
223 }
224 }
225
226 fn add_error(&mut self, error: EncodeError) {
227 self.merge_error(Some(error));
228 }
229 }
230
231 /// The encoder state machine implementation for a directory.
232 ///
233 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
234 /// synchronous or `async` I/O objects in as output.
235 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
236 output: T,
237 state: EncoderState,
238 parent: Option<&'a mut EncoderState>,
239 finished: bool,
240
241 /// Since only the "current" entry can be actively writing files, we share the file copy
242 /// buffer.
243 file_copy_buffer: Rc<RefCell<Vec<u8>>>,
244 }
245
246 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
247 fn drop(&mut self) {
248 if let Some(ref mut parent) = self.parent {
249 // propagate errors:
250 parent.merge_error(self.state.encode_error);
251 if !self.finished {
252 parent.add_error(EncodeError::IncompleteDirectory);
253 }
254 } else if !self.finished {
255 // FIXME: how do we deal with this?
256 // eprintln!("Encoder dropped without finishing!");
257 }
258 }
259 }
260
261 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
262 pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> {
263 if !metadata.is_dir() {
264 io_bail!("directory metadata must contain the directory mode flag");
265 }
266 let mut this = Self {
267 output,
268 state: EncoderState::default(),
269 parent: None,
270 finished: false,
271 file_copy_buffer: Rc::new(RefCell::new(crate::util::vec_new(1024 * 1024))),
272 };
273
274 this.encode_metadata(metadata).await?;
275 this.state.files_offset = seq_write_position(&mut this.output).await?;
276
277 Ok(this)
278 }
279
280 fn check(&self) -> io::Result<()> {
281 match self.state.encode_error {
282 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
283 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
284 None => Ok(()),
285 }
286 }
287
288 pub async fn create_file<'b>(
289 &'b mut self,
290 metadata: &Metadata,
291 file_name: &Path,
292 file_size: u64,
293 ) -> io::Result<FileImpl<'b>>
294 where
295 'a: 'b,
296 {
297 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
298 .await
299 }
300
301 async fn create_file_do<'b>(
302 &'b mut self,
303 metadata: &Metadata,
304 file_name: &[u8],
305 file_size: u64,
306 ) -> io::Result<FileImpl<'b>>
307 where
308 'a: 'b,
309 {
310 self.check()?;
311
312 let file_offset = seq_write_position(&mut self.output).await?;
313 self.start_file_do(Some(metadata), file_name).await?;
314
315 let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
316 header.check_header_size()?;
317
318 seq_write_struct(
319 &mut self.output,
320 header,
321 )
322 .await?;
323
324 let payload_data_offset = seq_write_position(&mut self.output).await?;
325
326 let meta_size = payload_data_offset - file_offset;
327
328 Ok(FileImpl {
329 output: &mut self.output,
330 goodbye_item: GoodbyeItem {
331 hash: format::hash_filename(file_name),
332 offset: file_offset,
333 size: file_size + meta_size,
334 },
335 remaining_size: file_size,
336 parent: &mut self.state,
337 })
338 }
339
340 /// Return a file offset usable with `add_hardlink`.
341 pub async fn add_file(
342 &mut self,
343 metadata: &Metadata,
344 file_name: &Path,
345 file_size: u64,
346 content: &mut dyn SeqRead,
347 ) -> io::Result<LinkOffset> {
348 let buf = Rc::clone(&self.file_copy_buffer);
349 let mut file = self.create_file(metadata, file_name, file_size).await?;
350 let mut buf = buf.borrow_mut();
351 loop {
352 let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
353 if got == 0 {
354 break;
355 } else {
356 file.write_all(&buf[..got]).await?;
357 }
358 }
359 Ok(file.file_offset())
360 }
361
362 /// Return a file offset usable with `add_hardlink`.
363 pub async fn add_symlink(
364 &mut self,
365 metadata: &Metadata,
366 file_name: &Path,
367 target: &Path,
368 ) -> io::Result<()> {
369 let target = target.as_os_str().as_bytes();
370 let mut data = Vec::with_capacity(target.len() + 1);
371 data.extend(target);
372 data.push(0);
373 let _ofs: LinkOffset = self
374 .add_file_entry(
375 Some(metadata),
376 file_name,
377 Some((format::PXAR_SYMLINK, &data)),
378 )
379 .await?;
380 Ok(())
381 }
382
383 /// Return a file offset usable with `add_hardlink`.
384 pub async fn add_hardlink(
385 &mut self,
386 file_name: &Path,
387 target: &Path,
388 target_offset: LinkOffset,
389 ) -> io::Result<()> {
390 let current_offset = seq_write_position(&mut self.output).await?;
391 if current_offset <= target_offset.0 {
392 io_bail!("invalid hardlink offset, can only point to prior files");
393 }
394
395 let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
396 let target_bytes = target.as_os_str().as_bytes();
397 let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len() + 1);
398 hardlink.extend(&offset_bytes);
399 hardlink.extend(target_bytes);
400 hardlink.push(0);
401 let _this_offset: LinkOffset = self
402 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
403 .await?;
404 Ok(())
405 }
406
407 /// Return a file offset usable with `add_hardlink`.
408 pub async fn add_device(
409 &mut self,
410 metadata: &Metadata,
411 file_name: &Path,
412 device: format::Device,
413 ) -> io::Result<()> {
414 if !metadata.is_device() {
415 io_bail!("entry added via add_device must have a device mode in its metadata");
416 }
417
418 let device = device.to_le();
419 let device = unsafe {
420 std::slice::from_raw_parts(
421 &device as *const format::Device as *const u8,
422 size_of::<format::Device>(),
423 )
424 };
425 let _ofs: LinkOffset = self
426 .add_file_entry(
427 Some(metadata),
428 file_name,
429 Some((format::PXAR_DEVICE, device)),
430 )
431 .await?;
432 Ok(())
433 }
434
435 /// Return a file offset usable with `add_hardlink`.
436 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
437 if !metadata.is_fifo() {
438 io_bail!("entry added via add_device must be of type fifo in its metadata");
439 }
440
441 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
442 Ok(())
443 }
444
445 /// Return a file offset usable with `add_hardlink`.
446 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
447 if !metadata.is_socket() {
448 io_bail!("entry added via add_device must be of type socket in its metadata");
449 }
450
451 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
452 Ok(())
453 }
454
455 /// Return a file offset usable with `add_hardlink`.
456 async fn add_file_entry(
457 &mut self,
458 metadata: Option<&Metadata>,
459 file_name: &Path,
460 entry_htype_data: Option<(u64, &[u8])>,
461 ) -> io::Result<LinkOffset> {
462 self.check()?;
463
464 let file_offset = seq_write_position(&mut self.output).await?;
465
466 let file_name = file_name.as_os_str().as_bytes();
467
468 self.start_file_do(metadata, file_name).await?;
469 if let Some((htype, entry_data)) = entry_htype_data {
470 seq_write_pxar_entry(&mut self.output, htype, entry_data).await?;
471 }
472
473 let end_offset = seq_write_position(&mut self.output).await?;
474
475 self.state.items.push(GoodbyeItem {
476 hash: format::hash_filename(file_name),
477 offset: file_offset,
478 size: end_offset - file_offset,
479 });
480
481 Ok(LinkOffset(file_offset))
482 }
483
484 /// Helper
485 #[inline]
486 async fn position(&mut self) -> io::Result<u64> {
487 seq_write_position(&mut self.output).await
488 }
489
490 pub async fn create_directory<'b>(
491 &'b mut self,
492 file_name: &Path,
493 metadata: &Metadata,
494 ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>>
495 where
496 'a: 'b,
497 {
498 self.check()?;
499
500 if !metadata.is_dir() {
501 io_bail!("directory metadata must contain the directory mode flag");
502 }
503
504 let file_name = file_name.as_os_str().as_bytes();
505 let file_hash = format::hash_filename(file_name);
506
507 let file_offset = self.position().await?;
508 self.encode_filename(file_name).await?;
509
510 let entry_offset = self.position().await?;
511 self.encode_metadata(&metadata).await?;
512
513 let files_offset = self.position().await?;
514
515 Ok(EncoderImpl {
516 output: self.output.as_trait_object(),
517 state: EncoderState {
518 entry_offset,
519 files_offset,
520 file_offset: Some(file_offset),
521 file_hash: file_hash,
522 ..Default::default()
523 },
524 parent: Some(&mut self.state),
525 finished: false,
526 file_copy_buffer: Rc::clone(&self.file_copy_buffer),
527 })
528 }
529
530 async fn start_file_do(
531 &mut self,
532 metadata: Option<&Metadata>,
533 file_name: &[u8],
534 ) -> io::Result<()> {
535 self.encode_filename(file_name).await?;
536 if let Some(metadata) = metadata {
537 self.encode_metadata(&metadata).await?;
538 }
539 Ok(())
540 }
541
542 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
543 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ENTRY, metadata.stat.clone())
544 .await?;
545
546 for xattr in &metadata.xattrs {
547 self.write_xattr(xattr).await?;
548 }
549
550 self.write_acls(&metadata.acl).await?;
551
552 if let Some(fcaps) = &metadata.fcaps {
553 self.write_file_capabilities(fcaps).await?;
554 }
555
556 if let Some(qpid) = &metadata.quota_project_id {
557 self.write_quota_project_id(qpid).await?;
558 }
559
560 Ok(())
561 }
562
563 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
564 seq_write_pxar_entry(&mut self.output, format::PXAR_XATTR, &xattr.data).await
565 }
566
567 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
568 for acl in &acl.users {
569 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_USER, acl.clone())
570 .await?;
571 }
572
573 for acl in &acl.groups {
574 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP, acl.clone())
575 .await?;
576 }
577
578 if let Some(acl) = &acl.group_obj {
579 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP_OBJ, acl.clone())
580 .await?;
581 }
582
583 if let Some(acl) = &acl.default {
584 seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_DEFAULT, acl.clone())
585 .await?;
586 }
587
588 for acl in &acl.default_users {
589 seq_write_pxar_struct_entry(
590 &mut self.output,
591 format::PXAR_ACL_DEFAULT_USER,
592 acl.clone(),
593 )
594 .await?;
595 }
596
597 for acl in &acl.default_groups {
598 seq_write_pxar_struct_entry(
599 &mut self.output,
600 format::PXAR_ACL_DEFAULT_GROUP,
601 acl.clone(),
602 )
603 .await?;
604 }
605
606 Ok(())
607 }
608
609 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
610 seq_write_pxar_entry(&mut self.output, format::PXAR_FCAPS, &fcaps.data).await
611 }
612
613 async fn write_quota_project_id(
614 &mut self,
615 quota_project_id: &format::QuotaProjectId,
616 ) -> io::Result<()> {
617 seq_write_pxar_struct_entry(
618 &mut self.output,
619 format::PXAR_QUOTA_PROJID,
620 quota_project_id.clone(),
621 )
622 .await
623 }
624
625 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
626 crate::util::validate_filename(file_name)?;
627 seq_write_pxar_entry_zero(&mut self.output, format::PXAR_FILENAME, file_name).await
628 }
629
630 pub async fn finish(mut self) -> io::Result<()> {
631 let tail_bytes = self.finish_goodbye_table().await?;
632 seq_write_pxar_entry(&mut self.output, format::PXAR_GOODBYE, &tail_bytes).await?;
633 if let Some(parent) = &mut self.parent {
634 let file_offset = self
635 .state
636 .file_offset
637 .expect("internal error: parent set but no file_offset?");
638
639 let end_offset = seq_write_position(&mut self.output).await?;
640
641 parent.items.push(GoodbyeItem {
642 hash: self.state.file_hash,
643 offset: file_offset,
644 size: end_offset - file_offset,
645 });
646 }
647 self.finished = true;
648 Ok(())
649 }
650
651 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
652 let goodbye_offset = seq_write_position(&mut self.output).await?;
653
654 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
655 let mut tail = take(&mut self.state.items);
656 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
657 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
658
659 // sort, then create a BST
660 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
661
662 let mut bst = Vec::with_capacity(tail.len() + 1);
663 unsafe {
664 bst.set_len(tail.len());
665 }
666 binary_tree_array::copy(tail.len(), |src, dest| {
667 let mut item = tail[src].clone();
668 // fixup the goodbye table offsets to be relative and with the right endianess
669 item.offset = goodbye_offset - item.offset;
670 unsafe {
671 std::ptr::write(&mut bst[dest], item.to_le());
672 }
673 });
674 drop(tail);
675
676 bst.push(
677 GoodbyeItem {
678 hash: format::PXAR_GOODBYE_TAIL_MARKER,
679 offset: goodbye_offset - self.state.entry_offset,
680 size: goodbye_size,
681 }
682 .to_le(),
683 );
684
685 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
686 // the items make sense:
687 let data = bst.as_mut_ptr() as *mut u8;
688 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
689 forget(bst);
690 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
691 }
692 }
693
694 /// Writer for a file object in a directory.
695 pub struct FileImpl<'a> {
696 output: &'a mut dyn SeqWrite,
697
698 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
699 /// directly instead of on Drop of FileImpl?
700 goodbye_item: GoodbyeItem,
701
702 /// While writing data to this file, this is how much space we still have left, this must reach
703 /// exactly zero.
704 remaining_size: u64,
705
706 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
707 /// to, and where we insert our `GoodbyeItem`.
708 parent: &'a mut EncoderState,
709 }
710
711 impl<'a> Drop for FileImpl<'a> {
712 fn drop(&mut self) {
713 if self.remaining_size != 0 {
714 self.parent.add_error(EncodeError::IncompleteFile);
715 }
716
717 self.parent.items.push(self.goodbye_item.clone());
718 }
719 }
720
721 impl<'a> FileImpl<'a> {
722 /// Get the file offset to be able to reference it with `add_hardlink`.
723 pub fn file_offset(&self) -> LinkOffset {
724 LinkOffset(self.goodbye_item.offset)
725 }
726
727 fn check_remaining(&self, size: usize) -> io::Result<()> {
728 if size as u64 > self.remaining_size {
729 io_bail!("attempted to write more than previously allocated");
730 } else {
731 Ok(())
732 }
733 }
734
735 /// Poll write interface to more easily connect to tokio/futures.
736 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
737 pub fn poll_write(
738 self: Pin<&mut Self>,
739 cx: &mut Context,
740 data: &[u8],
741 ) -> Poll<io::Result<usize>> {
742 let this = self.get_mut();
743 this.check_remaining(data.len())?;
744 let output = unsafe { Pin::new_unchecked(&mut *this.output) };
745 match output.poll_seq_write(cx, data) {
746 Poll::Ready(Ok(put)) => {
747 this.remaining_size -= put as u64;
748 Poll::Ready(Ok(put))
749 }
750 other => other,
751 }
752 }
753
754 /// Poll flush interface to more easily connect to tokio/futures.
755 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
756 pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
757 unsafe {
758 self.map_unchecked_mut(|this| &mut this.output)
759 .poll_flush(cx)
760 }
761 }
762
763 /// Poll close/shutdown interface to more easily connect to tokio/futures.
764 #[cfg(any(feature = "tokio-io", feature = "futures-io"))]
765 pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
766 unsafe {
767 self.map_unchecked_mut(|this| &mut this.output)
768 .poll_close(cx)
769 }
770 }
771
772 /// Write file data for the current file entry in a pxar archive.
773 ///
774 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
775 /// requested. Check the return value for how many. There's also a `write_all` method available
776 /// for convenience.
777 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
778 self.check_remaining(data.len())?;
779 let put = seq_write(&mut self.output, data).await?;
780 self.remaining_size -= put as u64;
781 Ok(put)
782 }
783
784 /// Completely write file data for the current file entry in a pxar archive.
785 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
786 self.check_remaining(data.len())?;
787 seq_write_all(&mut self.output, data).await?;
788 self.remaining_size -= data.len() as u64;
789 Ok(())
790 }
791 }
792
793 #[cfg(feature = "tokio-io")]
794 impl<'a> tokio::io::AsyncWrite for FileImpl<'a> {
795 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
796 FileImpl::poll_write(self, cx, buf)
797 }
798
799 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
800 FileImpl::poll_flush(self, cx)
801 }
802
803 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
804 FileImpl::poll_close(self, cx)
805 }
806 }
807
808 #[cfg(feature = "futures-io")]
809 impl<'a> futures::io::AsyncWrite for FileImpl<'a> {
810 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
811 FileImpl::poll_write(self, cx, buf)
812 }
813
814 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
815 FileImpl::poll_flush(self, cx)
816 }
817
818 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
819 FileImpl::poll_close(self, cx)
820 }
821 }