]> git.proxmox.com Git - pxar.git/blob - src/encoder/mod.rs
some clippy fixes
[pxar.git] / src / encoder / mod.rs
1 //! The `pxar` encoder state machine.
2 //!
3 //! This is the implementation used by both the synchronous and async pxar wrappers.
4
5 use std::io;
6 use std::mem::{forget, size_of, size_of_val, take};
7 use std::os::unix::ffi::OsStrExt;
8 use std::path::Path;
9 use std::pin::Pin;
10 use std::sync::{Arc, Mutex};
11 use std::task::{Context, Poll};
12
13 use endian_trait::Endian;
14
15 use crate::binary_tree_array;
16 use crate::decoder::{self, SeqRead};
17 use crate::format::{self, GoodbyeItem};
18 use crate::poll_fn::poll_fn;
19 use crate::Metadata;
20
21 pub mod aio;
22 pub mod sync;
23
24 #[doc(inline)]
25 pub use sync::Encoder;
26
27 /// File reference used to create hard links.
28 #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
29 pub struct LinkOffset(u64);
30
31 impl LinkOffset {
32 #[inline]
33 pub fn raw(self) -> u64 {
34 self.0
35 }
36 }
37
38 /// Sequential write interface used by the encoder's state machine.
39 ///
40 /// This is our internal writer trait which is available for `std::io::Write` types in the
41 /// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
42 /// wrapper.
43 pub trait SeqWrite {
44 fn poll_seq_write(
45 self: Pin<&mut Self>,
46 cx: &mut Context,
47 buf: &[u8],
48 ) -> Poll<io::Result<usize>>;
49
50 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
51 }
52
53 /// Allow using trait objects for generics taking a `SeqWrite`.
54 impl<S> SeqWrite for &mut S
55 where
56 S: SeqWrite + ?Sized,
57 {
58 fn poll_seq_write(
59 self: Pin<&mut Self>,
60 cx: &mut Context,
61 buf: &[u8],
62 ) -> Poll<io::Result<usize>> {
63 unsafe {
64 self.map_unchecked_mut(|this| &mut **this)
65 .poll_seq_write(cx, buf)
66 }
67 }
68
69 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
70 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
71 }
72 }
73
74 /// awaitable verison of `poll_seq_write`.
75 async fn seq_write<T: SeqWrite + ?Sized>(
76 output: &mut T,
77 buf: &[u8],
78 position: &mut u64,
79 ) -> io::Result<usize> {
80 let put =
81 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await?;
82 *position += put as u64;
83 Ok(put)
84 }
85
86 /// awaitable version of 'poll_flush'.
87 async fn flush<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<()> {
88 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_flush(cx) }).await
89 }
90
91 /// Write the entire contents of a buffer, handling short writes.
92 async fn seq_write_all<T: SeqWrite + ?Sized>(
93 output: &mut T,
94 mut buf: &[u8],
95 position: &mut u64,
96 ) -> io::Result<()> {
97 while !buf.is_empty() {
98 let got = seq_write(&mut *output, buf, &mut *position).await?;
99 buf = &buf[got..];
100 }
101 Ok(())
102 }
103
104 /// Write an endian-swappable struct.
105 async fn seq_write_struct<E: Endian, T>(
106 output: &mut T,
107 data: E,
108 position: &mut u64,
109 ) -> io::Result<()>
110 where
111 T: SeqWrite + ?Sized,
112 {
113 let data = data.to_le();
114 let buf =
115 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
116 seq_write_all(output, buf, position).await
117 }
118
119 /// Write a pxar entry.
120 async fn seq_write_pxar_entry<T>(
121 output: &mut T,
122 htype: u64,
123 data: &[u8],
124 position: &mut u64,
125 ) -> io::Result<()>
126 where
127 T: SeqWrite + ?Sized,
128 {
129 let header = format::Header::with_content_size(htype, data.len() as u64);
130 header.check_header_size()?;
131
132 seq_write_struct(&mut *output, header, &mut *position).await?;
133 seq_write_all(output, data, position).await
134 }
135
136 /// Write a pxar entry terminated by an additional zero which is not contained in the provided
137 /// data buffer.
138 async fn seq_write_pxar_entry_zero<T>(
139 output: &mut T,
140 htype: u64,
141 data: &[u8],
142 position: &mut u64,
143 ) -> io::Result<()>
144 where
145 T: SeqWrite + ?Sized,
146 {
147 let header = format::Header::with_content_size(htype, 1 + data.len() as u64);
148 header.check_header_size()?;
149
150 seq_write_struct(&mut *output, header, &mut *position).await?;
151 seq_write_all(&mut *output, data, &mut *position).await?;
152 seq_write_all(output, &[0u8], position).await
153 }
154
155 /// Write a pxar entry consiting of an endian-swappable struct.
156 async fn seq_write_pxar_struct_entry<E, T>(
157 output: &mut T,
158 htype: u64,
159 data: E,
160 position: &mut u64,
161 ) -> io::Result<()>
162 where
163 T: SeqWrite + ?Sized,
164 E: Endian,
165 {
166 let data = data.to_le();
167 let buf =
168 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
169 seq_write_pxar_entry(output, htype, buf, position).await
170 }
171
172 /// Error conditions caused by wrong usage of this crate.
173 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
174 pub enum EncodeError {
175 /// The user dropped a `File` without without finishing writing all of its contents.
176 ///
177 /// This is required because the payload lengths is written out at the begining and decoding
178 /// requires there to follow the right amount of data.
179 IncompleteFile,
180
181 /// The user dropped a directory without finalizing it.
182 ///
183 /// Finalizing is required to build the goodbye table at the end of a directory.
184 IncompleteDirectory,
185 }
186
187 #[derive(Default)]
188 struct EncoderState {
189 /// Goodbye items for this directory, excluding the tail.
190 items: Vec<GoodbyeItem>,
191
192 /// User caused error conditions.
193 encode_error: Option<EncodeError>,
194
195 /// Offset of this directory's ENTRY.
196 entry_offset: u64,
197
198 #[allow(dead_code)]
199 /// Offset to this directory's first FILENAME.
200 files_offset: u64,
201
202 /// If this is a subdirectory, this points to the this directory's FILENAME.
203 file_offset: Option<u64>,
204
205 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
206 file_hash: u64,
207
208 /// We need to keep track how much we have written to get offsets.
209 write_position: u64,
210 }
211
212 impl EncoderState {
213 fn merge_error(&mut self, error: Option<EncodeError>) {
214 // one error is enough:
215 if self.encode_error.is_none() {
216 self.encode_error = error;
217 }
218 }
219
220 fn add_error(&mut self, error: EncodeError) {
221 self.merge_error(Some(error));
222 }
223 }
224
225 pub(crate) enum EncoderOutput<'a, T> {
226 Owned(T),
227 Borrowed(&'a mut T),
228 }
229
230 impl<'a, T> EncoderOutput<'a, T> {
231 #[inline]
232 fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
233 where
234 'a: 's,
235 {
236 EncoderOutput::Borrowed(self.as_mut())
237 }
238 }
239
240 impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
241 fn as_mut(&mut self) -> &mut T {
242 match self {
243 EncoderOutput::Owned(o) => o,
244 EncoderOutput::Borrowed(b) => b,
245 }
246 }
247 }
248
249 impl<'a, T> std::convert::From<T> for EncoderOutput<'a, T> {
250 fn from(t: T) -> Self {
251 EncoderOutput::Owned(t)
252 }
253 }
254
255 impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
256 fn from(t: &'a mut T) -> Self {
257 EncoderOutput::Borrowed(t)
258 }
259 }
260
261 /// The encoder state machine implementation for a directory.
262 ///
263 /// We use `async fn` to implement the encoder state machine so that we can easily plug in both
264 /// synchronous or `async` I/O objects in as output.
265 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
266 output: EncoderOutput<'a, T>,
267 state: EncoderState,
268 parent: Option<&'a mut EncoderState>,
269 finished: bool,
270
271 /// Since only the "current" entry can be actively writing files, we share the file copy
272 /// buffer.
273 file_copy_buffer: Arc<Mutex<Vec<u8>>>,
274 }
275
276 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
277 fn drop(&mut self) {
278 if let Some(ref mut parent) = self.parent {
279 // propagate errors:
280 parent.merge_error(self.state.encode_error);
281 if !self.finished {
282 parent.add_error(EncodeError::IncompleteDirectory);
283 }
284 } else if !self.finished {
285 // FIXME: how do we deal with this?
286 // eprintln!("Encoder dropped without finishing!");
287 }
288 }
289 }
290
291 impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
292 pub async fn new(
293 output: EncoderOutput<'a, T>,
294 metadata: &Metadata,
295 ) -> io::Result<EncoderImpl<'a, T>> {
296 if !metadata.is_dir() {
297 io_bail!("directory metadata must contain the directory mode flag");
298 }
299 let mut this = Self {
300 output,
301 state: EncoderState::default(),
302 parent: None,
303 finished: false,
304 file_copy_buffer: Arc::new(Mutex::new(crate::util::vec_new(1024 * 1024))),
305 };
306
307 this.encode_metadata(metadata).await?;
308 this.state.files_offset = this.position();
309
310 Ok(this)
311 }
312
313 fn check(&self) -> io::Result<()> {
314 match self.state.encode_error {
315 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
316 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
317 None => Ok(()),
318 }
319 }
320
321 pub async fn create_file<'b>(
322 &'b mut self,
323 metadata: &Metadata,
324 file_name: &Path,
325 file_size: u64,
326 ) -> io::Result<FileImpl<'b, T>>
327 where
328 'a: 'b,
329 {
330 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
331 .await
332 }
333
334 async fn create_file_do<'b>(
335 &'b mut self,
336 metadata: &Metadata,
337 file_name: &[u8],
338 file_size: u64,
339 ) -> io::Result<FileImpl<'b, T>>
340 where
341 'a: 'b,
342 {
343 self.check()?;
344
345 let file_offset = self.position();
346 self.start_file_do(Some(metadata), file_name).await?;
347
348 let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
349 header.check_header_size()?;
350
351 seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
352
353 let payload_data_offset = self.position();
354
355 let meta_size = payload_data_offset - file_offset;
356
357 Ok(FileImpl {
358 output: self.output.as_mut(),
359 goodbye_item: GoodbyeItem {
360 hash: format::hash_filename(file_name),
361 offset: file_offset,
362 size: file_size + meta_size,
363 },
364 remaining_size: file_size,
365 parent: &mut self.state,
366 })
367 }
368
369 /// Return a file offset usable with `add_hardlink`.
370 pub async fn add_file(
371 &mut self,
372 metadata: &Metadata,
373 file_name: &Path,
374 file_size: u64,
375 content: &mut dyn SeqRead,
376 ) -> io::Result<LinkOffset> {
377 let buf = Arc::clone(&self.file_copy_buffer);
378 let mut file = self.create_file(metadata, file_name, file_size).await?;
379 let mut buf = buf.lock().expect("failed to lock temporary buffer mutex");
380 loop {
381 let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
382 if got == 0 {
383 break;
384 } else {
385 file.write_all(&buf[..got]).await?;
386 }
387 }
388 Ok(file.file_offset())
389 }
390
391 /// Return a file offset usable with `add_hardlink`.
392 pub async fn add_symlink(
393 &mut self,
394 metadata: &Metadata,
395 file_name: &Path,
396 target: &Path,
397 ) -> io::Result<()> {
398 let target = target.as_os_str().as_bytes();
399 let mut data = Vec::with_capacity(target.len() + 1);
400 data.extend(target);
401 data.push(0);
402 let _ofs: LinkOffset = self
403 .add_file_entry(
404 Some(metadata),
405 file_name,
406 Some((format::PXAR_SYMLINK, &data)),
407 )
408 .await?;
409 Ok(())
410 }
411
412 /// Return a file offset usable with `add_hardlink`.
413 pub async fn add_hardlink(
414 &mut self,
415 file_name: &Path,
416 target: &Path,
417 target_offset: LinkOffset,
418 ) -> io::Result<()> {
419 let current_offset = self.position();
420 if current_offset <= target_offset.0 {
421 io_bail!("invalid hardlink offset, can only point to prior files");
422 }
423
424 let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
425 let target_bytes = target.as_os_str().as_bytes();
426 let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len() + 1);
427 hardlink.extend(&offset_bytes);
428 hardlink.extend(target_bytes);
429 hardlink.push(0);
430 let _this_offset: LinkOffset = self
431 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
432 .await?;
433 Ok(())
434 }
435
436 /// Return a file offset usable with `add_hardlink`.
437 pub async fn add_device(
438 &mut self,
439 metadata: &Metadata,
440 file_name: &Path,
441 device: format::Device,
442 ) -> io::Result<()> {
443 if !metadata.is_device() {
444 io_bail!("entry added via add_device must have a device mode in its metadata");
445 }
446
447 let device = device.to_le();
448 let device = unsafe {
449 std::slice::from_raw_parts(
450 &device as *const format::Device as *const u8,
451 size_of::<format::Device>(),
452 )
453 };
454 let _ofs: LinkOffset = self
455 .add_file_entry(
456 Some(metadata),
457 file_name,
458 Some((format::PXAR_DEVICE, device)),
459 )
460 .await?;
461 Ok(())
462 }
463
464 /// Return a file offset usable with `add_hardlink`.
465 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
466 if !metadata.is_fifo() {
467 io_bail!("entry added via add_device must be of type fifo in its metadata");
468 }
469
470 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
471 Ok(())
472 }
473
474 /// Return a file offset usable with `add_hardlink`.
475 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
476 if !metadata.is_socket() {
477 io_bail!("entry added via add_device must be of type socket in its metadata");
478 }
479
480 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
481 Ok(())
482 }
483
484 /// Return a file offset usable with `add_hardlink`.
485 async fn add_file_entry(
486 &mut self,
487 metadata: Option<&Metadata>,
488 file_name: &Path,
489 entry_htype_data: Option<(u64, &[u8])>,
490 ) -> io::Result<LinkOffset> {
491 self.check()?;
492
493 let file_offset = self.position();
494
495 let file_name = file_name.as_os_str().as_bytes();
496
497 self.start_file_do(metadata, file_name).await?;
498 if let Some((htype, entry_data)) = entry_htype_data {
499 seq_write_pxar_entry(
500 self.output.as_mut(),
501 htype,
502 entry_data,
503 &mut self.state.write_position,
504 )
505 .await?;
506 }
507
508 let end_offset = self.position();
509
510 self.state.items.push(GoodbyeItem {
511 hash: format::hash_filename(file_name),
512 offset: file_offset,
513 size: end_offset - file_offset,
514 });
515
516 Ok(LinkOffset(file_offset))
517 }
518
519 #[inline]
520 fn position(&mut self) -> u64 {
521 self.state.write_position
522 }
523
524 pub async fn create_directory(
525 &mut self,
526 file_name: &Path,
527 metadata: &Metadata,
528 ) -> io::Result<EncoderImpl<'_, T>> {
529 self.check()?;
530
531 if !metadata.is_dir() {
532 io_bail!("directory metadata must contain the directory mode flag");
533 }
534
535 let file_name = file_name.as_os_str().as_bytes();
536 let file_hash = format::hash_filename(file_name);
537
538 let file_offset = self.position();
539 self.encode_filename(file_name).await?;
540
541 let entry_offset = self.position();
542 self.encode_metadata(&metadata).await?;
543
544 let files_offset = self.position();
545
546 // the child will write to OUR state now:
547 let write_position = self.position();
548
549 let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
550
551 Ok(EncoderImpl {
552 // always forward as Borrowed(), to avoid stacking references on nested calls
553 output: self.output.to_borrowed_mut(),
554 state: EncoderState {
555 entry_offset,
556 files_offset,
557 file_offset: Some(file_offset),
558 file_hash,
559 write_position,
560 ..Default::default()
561 },
562 parent: Some(&mut self.state),
563 finished: false,
564 file_copy_buffer,
565 })
566 }
567
568 async fn start_file_do(
569 &mut self,
570 metadata: Option<&Metadata>,
571 file_name: &[u8],
572 ) -> io::Result<()> {
573 self.encode_filename(file_name).await?;
574 if let Some(metadata) = metadata {
575 self.encode_metadata(&metadata).await?;
576 }
577 Ok(())
578 }
579
580 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
581 seq_write_pxar_struct_entry(
582 self.output.as_mut(),
583 format::PXAR_ENTRY,
584 metadata.stat.clone(),
585 &mut self.state.write_position,
586 )
587 .await?;
588
589 for xattr in &metadata.xattrs {
590 self.write_xattr(xattr).await?;
591 }
592
593 self.write_acls(&metadata.acl).await?;
594
595 if let Some(fcaps) = &metadata.fcaps {
596 self.write_file_capabilities(fcaps).await?;
597 }
598
599 if let Some(qpid) = &metadata.quota_project_id {
600 self.write_quota_project_id(qpid).await?;
601 }
602
603 Ok(())
604 }
605
606 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
607 seq_write_pxar_entry(
608 self.output.as_mut(),
609 format::PXAR_XATTR,
610 &xattr.data,
611 &mut self.state.write_position,
612 )
613 .await
614 }
615
616 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
617 for acl in &acl.users {
618 seq_write_pxar_struct_entry(
619 self.output.as_mut(),
620 format::PXAR_ACL_USER,
621 acl.clone(),
622 &mut self.state.write_position,
623 )
624 .await?;
625 }
626
627 for acl in &acl.groups {
628 seq_write_pxar_struct_entry(
629 self.output.as_mut(),
630 format::PXAR_ACL_GROUP,
631 acl.clone(),
632 &mut self.state.write_position,
633 )
634 .await?;
635 }
636
637 if let Some(acl) = &acl.group_obj {
638 seq_write_pxar_struct_entry(
639 self.output.as_mut(),
640 format::PXAR_ACL_GROUP_OBJ,
641 acl.clone(),
642 &mut self.state.write_position,
643 )
644 .await?;
645 }
646
647 if let Some(acl) = &acl.default {
648 seq_write_pxar_struct_entry(
649 self.output.as_mut(),
650 format::PXAR_ACL_DEFAULT,
651 acl.clone(),
652 &mut self.state.write_position,
653 )
654 .await?;
655 }
656
657 for acl in &acl.default_users {
658 seq_write_pxar_struct_entry(
659 self.output.as_mut(),
660 format::PXAR_ACL_DEFAULT_USER,
661 acl.clone(),
662 &mut self.state.write_position,
663 )
664 .await?;
665 }
666
667 for acl in &acl.default_groups {
668 seq_write_pxar_struct_entry(
669 self.output.as_mut(),
670 format::PXAR_ACL_DEFAULT_GROUP,
671 acl.clone(),
672 &mut self.state.write_position,
673 )
674 .await?;
675 }
676
677 Ok(())
678 }
679
680 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
681 seq_write_pxar_entry(
682 self.output.as_mut(),
683 format::PXAR_FCAPS,
684 &fcaps.data,
685 &mut self.state.write_position,
686 )
687 .await
688 }
689
690 async fn write_quota_project_id(
691 &mut self,
692 quota_project_id: &format::QuotaProjectId,
693 ) -> io::Result<()> {
694 seq_write_pxar_struct_entry(
695 self.output.as_mut(),
696 format::PXAR_QUOTA_PROJID,
697 *quota_project_id,
698 &mut self.state.write_position,
699 )
700 .await
701 }
702
703 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
704 crate::util::validate_filename(file_name)?;
705 seq_write_pxar_entry_zero(
706 self.output.as_mut(),
707 format::PXAR_FILENAME,
708 file_name,
709 &mut self.state.write_position,
710 )
711 .await
712 }
713
714 pub async fn finish(mut self) -> io::Result<()> {
715 let tail_bytes = self.finish_goodbye_table().await?;
716 seq_write_pxar_entry(
717 self.output.as_mut(),
718 format::PXAR_GOODBYE,
719 &tail_bytes,
720 &mut self.state.write_position,
721 )
722 .await?;
723
724 if let EncoderOutput::Owned(output) = &mut self.output {
725 flush(output).await?;
726 }
727
728 // done up here because of the self-borrow and to propagate
729 let end_offset = self.position();
730
731 if let Some(parent) = &mut self.parent {
732 parent.write_position = end_offset;
733
734 let file_offset = self
735 .state
736 .file_offset
737 .expect("internal error: parent set but no file_offset?");
738
739 parent.items.push(GoodbyeItem {
740 hash: self.state.file_hash,
741 offset: file_offset,
742 size: end_offset - file_offset,
743 });
744 }
745 self.finished = true;
746 Ok(())
747 }
748
749 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
750 let goodbye_offset = self.position();
751
752 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
753 let mut tail = take(&mut self.state.items);
754 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
755 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
756
757 // sort, then create a BST
758 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
759
760 let mut bst = Vec::with_capacity(tail.len() + 1);
761 unsafe {
762 bst.set_len(tail.len());
763 }
764 binary_tree_array::copy(tail.len(), |src, dest| {
765 let mut item = tail[src].clone();
766 // fixup the goodbye table offsets to be relative and with the right endianess
767 item.offset = goodbye_offset - item.offset;
768 unsafe {
769 std::ptr::write(&mut bst[dest], item.to_le());
770 }
771 });
772 drop(tail);
773
774 bst.push(
775 GoodbyeItem {
776 hash: format::PXAR_GOODBYE_TAIL_MARKER,
777 offset: goodbye_offset - self.state.entry_offset,
778 size: goodbye_size,
779 }
780 .to_le(),
781 );
782
783 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
784 // the items make sense:
785 let data = bst.as_mut_ptr() as *mut u8;
786 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
787 forget(bst);
788 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
789 }
790 }
791
792 /// Writer for a file object in a directory.
793 pub struct FileImpl<'a, S: SeqWrite> {
794 output: &'a mut S,
795
796 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
797 /// directly instead of on Drop of FileImpl?
798 goodbye_item: GoodbyeItem,
799
800 /// While writing data to this file, this is how much space we still have left, this must reach
801 /// exactly zero.
802 remaining_size: u64,
803
804 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
805 /// to, and where we insert our `GoodbyeItem`.
806 parent: &'a mut EncoderState,
807 }
808
809 impl<'a, S: SeqWrite> Drop for FileImpl<'a, S> {
810 fn drop(&mut self) {
811 if self.remaining_size != 0 {
812 self.parent.add_error(EncodeError::IncompleteFile);
813 }
814
815 self.parent.items.push(self.goodbye_item.clone());
816 }
817 }
818
819 impl<'a, S: SeqWrite> FileImpl<'a, S> {
820 /// Get the file offset to be able to reference it with `add_hardlink`.
821 pub fn file_offset(&self) -> LinkOffset {
822 LinkOffset(self.goodbye_item.offset)
823 }
824
825 fn check_remaining(&self, size: usize) -> io::Result<()> {
826 if size as u64 > self.remaining_size {
827 io_bail!("attempted to write more than previously allocated");
828 } else {
829 Ok(())
830 }
831 }
832
833 /// Poll write interface to more easily connect to tokio/futures.
834 #[cfg(feature = "tokio-io")]
835 pub fn poll_write(
836 self: Pin<&mut Self>,
837 cx: &mut Context,
838 data: &[u8],
839 ) -> Poll<io::Result<usize>> {
840 let this = self.get_mut();
841 this.check_remaining(data.len())?;
842 let output = unsafe { Pin::new_unchecked(&mut *this.output) };
843 match output.poll_seq_write(cx, data) {
844 Poll::Ready(Ok(put)) => {
845 this.remaining_size -= put as u64;
846 this.parent.write_position += put as u64;
847 Poll::Ready(Ok(put))
848 }
849 other => other,
850 }
851 }
852
853 /// Poll flush interface to more easily connect to tokio/futures.
854 #[cfg(feature = "tokio-io")]
855 pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
856 unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
857 }
858
859 /// Poll close/shutdown interface to more easily connect to tokio/futures.
860 ///
861 /// This just calls flush, though, since we're just a virtual writer writing to the file
862 /// provided by our encoder.
863 #[cfg(feature = "tokio-io")]
864 pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
865 unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
866 }
867
868 /// Write file data for the current file entry in a pxar archive.
869 ///
870 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
871 /// requested. Check the return value for how many. There's also a `write_all` method available
872 /// for convenience.
873 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
874 self.check_remaining(data.len())?;
875 let put =
876 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
877 .await?;
878 //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
879 self.remaining_size -= put as u64;
880 self.parent.write_position += put as u64;
881 Ok(put)
882 }
883
884 /// Completely write file data for the current file entry in a pxar archive.
885 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
886 self.check_remaining(data.len())?;
887 seq_write_all(self.output, data, &mut self.parent.write_position).await?;
888 self.remaining_size -= data.len() as u64;
889 Ok(())
890 }
891 }
892
893 #[cfg(feature = "tokio-io")]
894 impl<'a, S: SeqWrite> tokio::io::AsyncWrite for FileImpl<'a, S> {
895 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
896 FileImpl::poll_write(self, cx, buf)
897 }
898
899 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
900 FileImpl::poll_flush(self, cx)
901 }
902
903 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
904 FileImpl::poll_close(self, cx)
905 }
906 }