]> git.proxmox.com Git - pxar.git/blame - src/encoder/mod.rs
make the ReadAtImpl enum variants private
[pxar.git] / src / encoder / mod.rs
CommitLineData
70acf637
WB
1//! The `pxar` encoder state machine.
2//!
3//! This is the implementation used by both the synchronous and async pxar wrappers.
4
5use std::io;
6use std::mem::{forget, size_of, size_of_val, take};
7use std::os::unix::ffi::OsStrExt;
8use std::path::Path;
9use std::pin::Pin;
a08b84b7 10use std::sync::{Arc, Mutex};
70acf637
WB
11use std::task::{Context, Poll};
12
13use endian_trait::Endian;
14
749855a4 15use crate::binary_tree_array;
951620f1 16use crate::decoder::{self, SeqRead};
70acf637
WB
17use crate::format::{self, GoodbyeItem};
18use crate::poll_fn::poll_fn;
19use crate::Metadata;
20
54109840 21pub mod aio;
70acf637
WB
22pub mod sync;
23
24#[doc(inline)]
25pub use sync::Encoder;
26
d6748862
WB
/// File reference used to create hard links.
///
/// Wraps the archive offset at which a previously written entry starts.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct LinkOffset(u64);

impl LinkOffset {
    /// Returns the wrapped offset as a plain integer.
    #[inline]
    pub fn raw(self) -> u64 {
        let Self(offset) = self;
        offset
    }
}
37
70acf637
WB
/// Sequential write interface used by the encoder's state machine.
///
/// This is our internal writer trait which is available for `std::io::Write` types in the
/// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous
/// wrapper.
pub trait SeqWrite {
    /// Attempt to write bytes from `buf` to the output.
    ///
    /// On success the number of bytes accepted is returned; it may be smaller than `buf.len()`.
    fn poll_seq_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Attempt to flush the output.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}
52
53/// Allow using trait objects for generics taking a `SeqWrite`.
d8402433
SR
54impl<S> SeqWrite for &mut S
55where
56 S: SeqWrite + ?Sized,
57{
70acf637
WB
58 fn poll_seq_write(
59 self: Pin<&mut Self>,
60 cx: &mut Context,
61 buf: &[u8],
62 ) -> Poll<io::Result<usize>> {
63 unsafe {
64 self.map_unchecked_mut(|this| &mut **this)
65 .poll_seq_write(cx, buf)
66 }
67 }
68
05356884
WB
69 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
70 unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) }
71 }
70acf637
WB
72}
73
fc9c8c9d 74/// awaitable verison of `poll_seq_write`.
ec4a53ed
WB
75async fn seq_write<T: SeqWrite + ?Sized>(
76 output: &mut T,
77 buf: &[u8],
78 position: &mut u64,
79) -> io::Result<usize> {
80 let put =
81 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await?;
82 *position += put as u64;
83 Ok(put)
fc9c8c9d 84}
70acf637 85
d995b531 86/// awaitable version of 'poll_flush'.
eae6dc06 87async fn flush<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<()> {
d995b531
DC
88 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_flush(cx) }).await
89}
90
fc9c8c9d 91/// Write the entire contents of a buffer, handling short writes.
ec4a53ed
WB
92async fn seq_write_all<T: SeqWrite + ?Sized>(
93 output: &mut T,
94 mut buf: &[u8],
95 position: &mut u64,
96) -> io::Result<()> {
fc9c8c9d 97 while !buf.is_empty() {
ec4a53ed 98 let got = seq_write(&mut *output, buf, &mut *position).await?;
fc9c8c9d 99 buf = &buf[got..];
70acf637 100 }
fc9c8c9d
WB
101 Ok(())
102}
70acf637 103
fc9c8c9d 104/// Write an endian-swappable struct.
ec4a53ed
WB
105async fn seq_write_struct<E: Endian, T>(
106 output: &mut T,
107 data: E,
108 position: &mut u64,
109) -> io::Result<()>
fc9c8c9d
WB
110where
111 T: SeqWrite + ?Sized,
112{
113 let data = data.to_le();
a08b84b7
WB
114 let buf =
115 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
8bff2f7c 116 seq_write_all(output, buf, position).await
fc9c8c9d 117}
70acf637 118
fc9c8c9d 119/// Write a pxar entry.
ec4a53ed
WB
120async fn seq_write_pxar_entry<T>(
121 output: &mut T,
122 htype: u64,
123 data: &[u8],
124 position: &mut u64,
125) -> io::Result<()>
fc9c8c9d
WB
126where
127 T: SeqWrite + ?Sized,
128{
ec0761f9
FG
129 let header = format::Header::with_content_size(htype, data.len() as u64);
130 header.check_header_size()?;
131
ec4a53ed
WB
132 seq_write_struct(&mut *output, header, &mut *position).await?;
133 seq_write_all(output, data, position).await
fc9c8c9d 134}
70acf637 135
fc9c8c9d
WB
136/// Write a pxar entry terminated by an additional zero which is not contained in the provided
137/// data buffer.
ec4a53ed
WB
138async fn seq_write_pxar_entry_zero<T>(
139 output: &mut T,
140 htype: u64,
141 data: &[u8],
142 position: &mut u64,
143) -> io::Result<()>
fc9c8c9d
WB
144where
145 T: SeqWrite + ?Sized,
146{
ec0761f9
FG
147 let header = format::Header::with_content_size(htype, 1 + data.len() as u64);
148 header.check_header_size()?;
149
ec4a53ed
WB
150 seq_write_struct(&mut *output, header, &mut *position).await?;
151 seq_write_all(&mut *output, data, &mut *position).await?;
152 seq_write_all(output, &[0u8], position).await
fc9c8c9d 153}
70acf637 154
fc9c8c9d 155/// Write a pxar entry consiting of an endian-swappable struct.
ec4a53ed
WB
156async fn seq_write_pxar_struct_entry<E, T>(
157 output: &mut T,
158 htype: u64,
159 data: E,
160 position: &mut u64,
161) -> io::Result<()>
fc9c8c9d
WB
162where
163 T: SeqWrite + ?Sized,
164 E: Endian,
165{
166 let data = data.to_le();
a08b84b7
WB
167 let buf =
168 unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) };
8bff2f7c 169 seq_write_pxar_entry(output, htype, buf, position).await
70acf637
WB
170}
171
59e6f679
WB
/// Error conditions caused by wrong usage of this crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncodeError {
    /// The user dropped a `File` without finishing writing all of its contents.
    ///
    /// This is an error because the payload length is written out at the beginning and decoding
    /// requires the right amount of data to follow.
    IncompleteFile,

    /// The user dropped a directory without finalizing it.
    ///
    /// Finalizing is required to build the goodbye table at the end of a directory.
    IncompleteDirectory,
}
186
70acf637
WB
187#[derive(Default)]
188struct EncoderState {
189 /// Goodbye items for this directory, excluding the tail.
190 items: Vec<GoodbyeItem>,
191
59e6f679
WB
192 /// User caused error conditions.
193 encode_error: Option<EncodeError>,
70acf637
WB
194
195 /// Offset of this directory's ENTRY.
196 entry_offset: u64,
197
d4a04d53 198 #[allow(dead_code)]
70acf637
WB
199 /// Offset to this directory's first FILENAME.
200 files_offset: u64,
201
202 /// If this is a subdirectory, this points to the this directory's FILENAME.
203 file_offset: Option<u64>,
749855a4
WB
204
205 /// If this is a subdirectory, this contains this directory's hash for the goodbye item.
206 file_hash: u64,
ec4a53ed
WB
207
208 /// We need to keep track how much we have written to get offsets.
209 write_position: u64,
70acf637
WB
210}
211
59e6f679
WB
212impl EncoderState {
213 fn merge_error(&mut self, error: Option<EncodeError>) {
214 // one error is enough:
215 if self.encode_error.is_none() {
216 self.encode_error = error;
217 }
218 }
219
220 fn add_error(&mut self, error: EncodeError) {
221 self.merge_error(Some(error));
222 }
223}
224
d8402433
SR
/// The output an encoder writes to: either owned by the encoder or borrowed from elsewhere.
pub(crate) enum EncoderOutput<'a, T> {
    /// The encoder owns the output.
    Owned(T),
    /// The encoder only holds a mutable borrow of the output.
    Borrowed(&'a mut T),
}

impl<'a, T> EncoderOutput<'a, T> {
    /// Produce a `Borrowed` output pointing at the same underlying `T`, regardless of whether
    /// `self` is owned or borrowed.
    #[inline]
    fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
    where
        'a: 's,
    {
        match self {
            EncoderOutput::Owned(owned) => EncoderOutput::Borrowed(owned),
            EncoderOutput::Borrowed(borrowed) => EncoderOutput::Borrowed(&mut **borrowed),
        }
    }
}

impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
    /// Access the underlying output, independent of the variant.
    fn as_mut(&mut self) -> &mut T {
        match self {
            EncoderOutput::Borrowed(borrowed) => &mut **borrowed,
            EncoderOutput::Owned(owned) => owned,
        }
    }
}

impl<'a, T> std::convert::From<T> for EncoderOutput<'a, T> {
    /// Wrap an owned output.
    fn from(owned: T) -> Self {
        Self::Owned(owned)
    }
}

impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
    /// Wrap a borrowed output.
    fn from(borrowed: &'a mut T) -> Self {
        Self::Borrowed(borrowed)
    }
}
260
70acf637
WB
261/// The encoder state machine implementation for a directory.
262///
263/// We use `async fn` to implement the encoder state machine so that we can easily plug in both
264/// synchronous or `async` I/O objects in as output.
265pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
d8402433 266 output: EncoderOutput<'a, T>,
70acf637
WB
267 state: EncoderState,
268 parent: Option<&'a mut EncoderState>,
269 finished: bool,
79e3f621
WB
270
271 /// Since only the "current" entry can be actively writing files, we share the file copy
272 /// buffer.
a08b84b7 273 file_copy_buffer: Arc<Mutex<Vec<u8>>>,
70acf637
WB
274}
275
276impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
277 fn drop(&mut self) {
278 if let Some(ref mut parent) = self.parent {
279 // propagate errors:
59e6f679
WB
280 parent.merge_error(self.state.encode_error);
281 if !self.finished {
282 parent.add_error(EncodeError::IncompleteDirectory);
70acf637
WB
283 }
284 } else if !self.finished {
285 // FIXME: how do we deal with this?
2f56c76c 286 // eprintln!("Encoder dropped without finishing!");
70acf637
WB
287 }
288 }
289}
290
291impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
737f75cf
WB
292 pub async fn new(
293 output: EncoderOutput<'a, T>,
294 metadata: &Metadata,
295 ) -> io::Result<EncoderImpl<'a, T>> {
70acf637
WB
296 if !metadata.is_dir() {
297 io_bail!("directory metadata must contain the directory mode flag");
298 }
299 let mut this = Self {
d8402433 300 output,
70acf637
WB
301 state: EncoderState::default(),
302 parent: None,
303 finished: false,
a08b84b7 304 file_copy_buffer: Arc::new(Mutex::new(crate::util::vec_new(1024 * 1024))),
70acf637
WB
305 };
306
307 this.encode_metadata(metadata).await?;
ec4a53ed 308 this.state.files_offset = this.position();
70acf637
WB
309
310 Ok(this)
311 }
312
313 fn check(&self) -> io::Result<()> {
59e6f679
WB
314 match self.state.encode_error {
315 Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
316 Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
a7149b09 317 None => Ok(()),
70acf637
WB
318 }
319 }
320
321 pub async fn create_file<'b>(
322 &'b mut self,
323 metadata: &Metadata,
324 file_name: &Path,
325 file_size: u64,
d8402433 326 ) -> io::Result<FileImpl<'b, T>>
70acf637
WB
327 where
328 'a: 'b,
329 {
330 self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size)
331 .await
332 }
333
334 async fn create_file_do<'b>(
335 &'b mut self,
336 metadata: &Metadata,
337 file_name: &[u8],
338 file_size: u64,
d8402433 339 ) -> io::Result<FileImpl<'b, T>>
70acf637
WB
340 where
341 'a: 'b,
342 {
343 self.check()?;
344
ec4a53ed 345 let file_offset = self.position();
2ab25a17 346 self.start_file_do(Some(metadata), file_name).await?;
70acf637 347
ec0761f9
FG
348 let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
349 header.check_header_size()?;
350
737f75cf 351 seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
70acf637 352
ec4a53ed 353 let payload_data_offset = self.position();
70acf637
WB
354
355 let meta_size = payload_data_offset - file_offset;
356
357 Ok(FileImpl {
d8402433 358 output: self.output.as_mut(),
70acf637
WB
359 goodbye_item: GoodbyeItem {
360 hash: format::hash_filename(file_name),
361 offset: file_offset,
362 size: file_size + meta_size,
363 },
364 remaining_size: file_size,
365 parent: &mut self.state,
366 })
367 }
368
d6748862 369 /// Return a file offset usable with `add_hardlink`.
70acf637
WB
370 pub async fn add_file(
371 &mut self,
372 metadata: &Metadata,
373 file_name: &Path,
374 file_size: u64,
375 content: &mut dyn SeqRead,
d6748862 376 ) -> io::Result<LinkOffset> {
a08b84b7 377 let buf = Arc::clone(&self.file_copy_buffer);
70acf637 378 let mut file = self.create_file(metadata, file_name, file_size).await?;
a08b84b7 379 let mut buf = buf.lock().expect("failed to lock temporary buffer mutex");
70acf637 380 loop {
79e3f621 381 let got = decoder::seq_read(&mut *content, &mut buf[..]).await?;
70acf637
WB
382 if got == 0 {
383 break;
384 } else {
385 file.write_all(&buf[..got]).await?;
386 }
387 }
d6748862 388 Ok(file.file_offset())
70acf637
WB
389 }
390
d6748862 391 /// Return a file offset usable with `add_hardlink`.
2fb73e7b
WB
392 pub async fn add_symlink(
393 &mut self,
394 metadata: &Metadata,
395 file_name: &Path,
396 target: &Path,
501ea220 397 ) -> io::Result<()> {
16f0464d
WB
398 let target = target.as_os_str().as_bytes();
399 let mut data = Vec::with_capacity(target.len() + 1);
400 data.extend(target);
401 data.push(0);
b0487d4f
WB
402 let _ofs: LinkOffset = self
403 .add_file_entry(
404 Some(metadata),
405 file_name,
16f0464d 406 Some((format::PXAR_SYMLINK, &data)),
b0487d4f
WB
407 )
408 .await?;
501ea220 409 Ok(())
0abc4121
WB
410 }
411
d6748862 412 /// Return a file offset usable with `add_hardlink`.
b0752929 413 pub async fn add_hardlink(
0abc4121 414 &mut self,
0abc4121
WB
415 file_name: &Path,
416 target: &Path,
63f2296f 417 target_offset: LinkOffset,
3fe26522 418 ) -> io::Result<()> {
ec4a53ed 419 let current_offset = self.position();
63f2296f
WB
420 if current_offset <= target_offset.0 {
421 io_bail!("invalid hardlink offset, can only point to prior files");
422 }
423
f3e50baa
WB
424 let offset_bytes = (current_offset - target_offset.0).to_le_bytes();
425 let target_bytes = target.as_os_str().as_bytes();
16f0464d 426 let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len() + 1);
f3e50baa
WB
427 hardlink.extend(&offset_bytes);
428 hardlink.extend(target_bytes);
16f0464d 429 hardlink.push(0);
b0487d4f 430 let _this_offset: LinkOffset = self
f3e50baa 431 .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink)))
b0487d4f 432 .await?;
d6748862 433 Ok(())
3fe26522
WB
434 }
435
d6748862 436 /// Return a file offset usable with `add_hardlink`.
3fe26522
WB
437 pub async fn add_device(
438 &mut self,
439 metadata: &Metadata,
440 file_name: &Path,
441 device: format::Device,
501ea220 442 ) -> io::Result<()> {
a7149b09
WB
443 if !metadata.is_device() {
444 io_bail!("entry added via add_device must have a device mode in its metadata");
445 }
446
3fe26522
WB
447 let device = device.to_le();
448 let device = unsafe {
449 std::slice::from_raw_parts(
450 &device as *const format::Device as *const u8,
451 size_of::<format::Device>(),
452 )
453 };
b0487d4f
WB
454 let _ofs: LinkOffset = self
455 .add_file_entry(
456 Some(metadata),
457 file_name,
458 Some((format::PXAR_DEVICE, device)),
459 )
460 .await?;
501ea220 461 Ok(())
3fe26522
WB
462 }
463
d6748862 464 /// Return a file offset usable with `add_hardlink`.
501ea220 465 pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
a7149b09
WB
466 if !metadata.is_fifo() {
467 io_bail!("entry added via add_device must be of type fifo in its metadata");
468 }
469
501ea220
WB
470 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
471 Ok(())
a7149b09
WB
472 }
473
d6748862 474 /// Return a file offset usable with `add_hardlink`.
501ea220 475 pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> {
a7149b09
WB
476 if !metadata.is_socket() {
477 io_bail!("entry added via add_device must be of type socket in its metadata");
478 }
479
501ea220
WB
480 let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?;
481 Ok(())
a7149b09
WB
482 }
483
d6748862 484 /// Return a file offset usable with `add_hardlink`.
3fe26522
WB
485 async fn add_file_entry(
486 &mut self,
2ab25a17 487 metadata: Option<&Metadata>,
3fe26522 488 file_name: &Path,
a7149b09 489 entry_htype_data: Option<(u64, &[u8])>,
d6748862 490 ) -> io::Result<LinkOffset> {
05356884 491 self.check()?;
2fb73e7b 492
ec4a53ed 493 let file_offset = self.position();
2fb73e7b
WB
494
495 let file_name = file_name.as_os_str().as_bytes();
2fb73e7b
WB
496
497 self.start_file_do(metadata, file_name).await?;
a7149b09 498 if let Some((htype, entry_data)) = entry_htype_data {
ec4a53ed 499 seq_write_pxar_entry(
d8402433 500 self.output.as_mut(),
ec4a53ed
WB
501 htype,
502 entry_data,
503 &mut self.state.write_position,
504 )
505 .await?;
a7149b09 506 }
2fb73e7b 507
ec4a53ed 508 let end_offset = self.position();
2fb73e7b
WB
509
510 self.state.items.push(GoodbyeItem {
511 hash: format::hash_filename(file_name),
512 offset: file_offset,
513 size: end_offset - file_offset,
514 });
515
d6748862 516 Ok(LinkOffset(file_offset))
2fb73e7b
WB
517 }
518
70acf637 519 #[inline]
ec4a53ed
WB
520 fn position(&mut self) -> u64 {
521 self.state.write_position
70acf637
WB
522 }
523
d8402433
SR
524 pub async fn create_directory(
525 &mut self,
70acf637
WB
526 file_name: &Path,
527 metadata: &Metadata,
d8402433 528 ) -> io::Result<EncoderImpl<'_, T>> {
70acf637
WB
529 self.check()?;
530
531 if !metadata.is_dir() {
532 io_bail!("directory metadata must contain the directory mode flag");
533 }
534
535 let file_name = file_name.as_os_str().as_bytes();
749855a4 536 let file_hash = format::hash_filename(file_name);
70acf637 537
ec4a53ed 538 let file_offset = self.position();
70acf637
WB
539 self.encode_filename(file_name).await?;
540
ec4a53ed 541 let entry_offset = self.position();
70acf637
WB
542 self.encode_metadata(&metadata).await?;
543
ec4a53ed
WB
544 let files_offset = self.position();
545
546 // the child will write to OUR state now:
547 let write_position = self.position();
70acf637 548
d8402433
SR
549 let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
550
70acf637 551 Ok(EncoderImpl {
d8402433 552 // always forward as Borrowed(), to avoid stacking references on nested calls
d4a04d53 553 output: self.output.to_borrowed_mut(),
70acf637
WB
554 state: EncoderState {
555 entry_offset,
556 files_offset,
557 file_offset: Some(file_offset),
1b25fc08 558 file_hash,
ec4a53ed 559 write_position,
70acf637
WB
560 ..Default::default()
561 },
562 parent: Some(&mut self.state),
563 finished: false,
d8402433 564 file_copy_buffer,
70acf637
WB
565 })
566 }
567
2ab25a17
WB
568 async fn start_file_do(
569 &mut self,
570 metadata: Option<&Metadata>,
571 file_name: &[u8],
572 ) -> io::Result<()> {
70acf637 573 self.encode_filename(file_name).await?;
2ab25a17
WB
574 if let Some(metadata) = metadata {
575 self.encode_metadata(&metadata).await?;
576 }
70acf637
WB
577 Ok(())
578 }
579
580 async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
ec4a53ed 581 seq_write_pxar_struct_entry(
d8402433 582 self.output.as_mut(),
ec4a53ed
WB
583 format::PXAR_ENTRY,
584 metadata.stat.clone(),
585 &mut self.state.write_position,
586 )
587 .await?;
b5a471a3
WB
588
589 for xattr in &metadata.xattrs {
590 self.write_xattr(xattr).await?;
591 }
592
593 self.write_acls(&metadata.acl).await?;
594
595 if let Some(fcaps) = &metadata.fcaps {
596 self.write_file_capabilities(fcaps).await?;
597 }
598
599 if let Some(qpid) = &metadata.quota_project_id {
600 self.write_quota_project_id(qpid).await?;
601 }
602
603 Ok(())
604 }
605
606 async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
ec4a53ed 607 seq_write_pxar_entry(
d8402433 608 self.output.as_mut(),
ec4a53ed
WB
609 format::PXAR_XATTR,
610 &xattr.data,
611 &mut self.state.write_position,
612 )
613 .await
b5a471a3
WB
614 }
615
616 async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
617 for acl in &acl.users {
ec4a53ed 618 seq_write_pxar_struct_entry(
d8402433 619 self.output.as_mut(),
ec4a53ed
WB
620 format::PXAR_ACL_USER,
621 acl.clone(),
622 &mut self.state.write_position,
623 )
624 .await?;
b5a471a3
WB
625 }
626
627 for acl in &acl.groups {
ec4a53ed 628 seq_write_pxar_struct_entry(
d8402433 629 self.output.as_mut(),
ec4a53ed
WB
630 format::PXAR_ACL_GROUP,
631 acl.clone(),
632 &mut self.state.write_position,
633 )
634 .await?;
b5a471a3
WB
635 }
636
637 if let Some(acl) = &acl.group_obj {
ec4a53ed 638 seq_write_pxar_struct_entry(
d8402433 639 self.output.as_mut(),
ec4a53ed
WB
640 format::PXAR_ACL_GROUP_OBJ,
641 acl.clone(),
642 &mut self.state.write_position,
643 )
644 .await?;
b5a471a3
WB
645 }
646
647 if let Some(acl) = &acl.default {
ec4a53ed 648 seq_write_pxar_struct_entry(
d8402433 649 self.output.as_mut(),
ec4a53ed
WB
650 format::PXAR_ACL_DEFAULT,
651 acl.clone(),
652 &mut self.state.write_position,
653 )
654 .await?;
b5a471a3
WB
655 }
656
657 for acl in &acl.default_users {
fc9c8c9d 658 seq_write_pxar_struct_entry(
d8402433 659 self.output.as_mut(),
fc9c8c9d
WB
660 format::PXAR_ACL_DEFAULT_USER,
661 acl.clone(),
ec4a53ed 662 &mut self.state.write_position,
fc9c8c9d
WB
663 )
664 .await?;
b5a471a3
WB
665 }
666
667 for acl in &acl.default_groups {
fc9c8c9d 668 seq_write_pxar_struct_entry(
d8402433 669 self.output.as_mut(),
fc9c8c9d
WB
670 format::PXAR_ACL_DEFAULT_GROUP,
671 acl.clone(),
ec4a53ed 672 &mut self.state.write_position,
fc9c8c9d
WB
673 )
674 .await?;
b5a471a3
WB
675 }
676
70acf637
WB
677 Ok(())
678 }
679
b5a471a3 680 async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
ec4a53ed 681 seq_write_pxar_entry(
d8402433 682 self.output.as_mut(),
ec4a53ed
WB
683 format::PXAR_FCAPS,
684 &fcaps.data,
685 &mut self.state.write_position,
686 )
687 .await
b5a471a3
WB
688 }
689
690 async fn write_quota_project_id(
691 &mut self,
692 quota_project_id: &format::QuotaProjectId,
693 ) -> io::Result<()> {
fc9c8c9d 694 seq_write_pxar_struct_entry(
d8402433 695 self.output.as_mut(),
fc9c8c9d 696 format::PXAR_QUOTA_PROJID,
1b25fc08 697 *quota_project_id,
ec4a53ed 698 &mut self.state.write_position,
fc9c8c9d
WB
699 )
700 .await
b5a471a3
WB
701 }
702
70acf637 703 async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
f3ac1c51 704 crate::util::validate_filename(file_name)?;
ec4a53ed 705 seq_write_pxar_entry_zero(
d8402433 706 self.output.as_mut(),
ec4a53ed
WB
707 format::PXAR_FILENAME,
708 file_name,
709 &mut self.state.write_position,
710 )
711 .await
70acf637
WB
712 }
713
d8402433 714 pub async fn finish(mut self) -> io::Result<()> {
70acf637 715 let tail_bytes = self.finish_goodbye_table().await?;
ec4a53ed 716 seq_write_pxar_entry(
d8402433 717 self.output.as_mut(),
ec4a53ed
WB
718 format::PXAR_GOODBYE,
719 &tail_bytes,
720 &mut self.state.write_position,
721 )
722 .await?;
723
e827b69a
WB
724 if let EncoderOutput::Owned(output) = &mut self.output {
725 flush(output).await?;
726 }
d995b531 727
ec4a53ed
WB
728 // done up here because of the self-borrow and to propagate
729 let end_offset = self.position();
730
749855a4 731 if let Some(parent) = &mut self.parent {
ec4a53ed
WB
732 parent.write_position = end_offset;
733
749855a4
WB
734 let file_offset = self
735 .state
736 .file_offset
737 .expect("internal error: parent set but no file_offset?");
738
749855a4
WB
739 parent.items.push(GoodbyeItem {
740 hash: self.state.file_hash,
741 offset: file_offset,
742 size: end_offset - file_offset,
743 });
744 }
70acf637 745 self.finished = true;
d8402433 746 Ok(())
70acf637
WB
747 }
748
749 async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
ec4a53ed 750 let goodbye_offset = self.position();
70acf637
WB
751
752 // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
753 let mut tail = take(&mut self.state.items);
754 let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
755 let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
756
749855a4
WB
757 // sort, then create a BST
758 tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash));
70acf637 759
749855a4
WB
760 let mut bst = Vec::with_capacity(tail.len() + 1);
761 unsafe {
762 bst.set_len(tail.len());
70acf637 763 }
749855a4
WB
764 binary_tree_array::copy(tail.len(), |src, dest| {
765 let mut item = tail[src].clone();
766 // fixup the goodbye table offsets to be relative and with the right endianess
767 item.offset = goodbye_offset - item.offset;
768 unsafe {
769 std::ptr::write(&mut bst[dest], item.to_le());
770 }
771 });
772 drop(tail);
773
774 bst.push(
775 GoodbyeItem {
776 hash: format::PXAR_GOODBYE_TAIL_MARKER,
777 offset: goodbye_offset - self.state.entry_offset,
778 size: goodbye_size,
779 }
780 .to_le(),
781 );
70acf637
WB
782
783 // turn this into a byte vector since after endian-swapping we can no longer guarantee that
784 // the items make sense:
749855a4
WB
785 let data = bst.as_mut_ptr() as *mut u8;
786 let capacity = bst.capacity() * size_of::<GoodbyeItem>();
787 forget(bst);
70acf637
WB
788 Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) })
789 }
790}
791
792/// Writer for a file object in a directory.
d8402433
SR
793pub struct FileImpl<'a, S: SeqWrite> {
794 output: &'a mut S,
70acf637
WB
795
796 /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
797 /// directly instead of on Drop of FileImpl?
798 goodbye_item: GoodbyeItem,
799
800 /// While writing data to this file, this is how much space we still have left, this must reach
801 /// exactly zero.
802 remaining_size: u64,
803
59e6f679 804 /// The directory containing this file. This is where we propagate the `IncompleteFile` error
70acf637
WB
805 /// to, and where we insert our `GoodbyeItem`.
806 parent: &'a mut EncoderState,
807}
808
d8402433 809impl<'a, S: SeqWrite> Drop for FileImpl<'a, S> {
70acf637
WB
810 fn drop(&mut self) {
811 if self.remaining_size != 0 {
59e6f679 812 self.parent.add_error(EncodeError::IncompleteFile);
70acf637
WB
813 }
814
815 self.parent.items.push(self.goodbye_item.clone());
816 }
817}
818
d8402433 819impl<'a, S: SeqWrite> FileImpl<'a, S> {
d6748862
WB
820 /// Get the file offset to be able to reference it with `add_hardlink`.
821 pub fn file_offset(&self) -> LinkOffset {
822 LinkOffset(self.goodbye_item.offset)
823 }
824
70acf637
WB
825 fn check_remaining(&self, size: usize) -> io::Result<()> {
826 if size as u64 > self.remaining_size {
827 io_bail!("attempted to write more than previously allocated");
828 } else {
829 Ok(())
830 }
831 }
832
05356884 833 /// Poll write interface to more easily connect to tokio/futures.
7aee9c1f 834 #[cfg(feature = "tokio-io")]
05356884
WB
835 pub fn poll_write(
836 self: Pin<&mut Self>,
837 cx: &mut Context,
838 data: &[u8],
839 ) -> Poll<io::Result<usize>> {
840 let this = self.get_mut();
841 this.check_remaining(data.len())?;
842 let output = unsafe { Pin::new_unchecked(&mut *this.output) };
843 match output.poll_seq_write(cx, data) {
844 Poll::Ready(Ok(put)) => {
845 this.remaining_size -= put as u64;
ec4a53ed 846 this.parent.write_position += put as u64;
05356884
WB
847 Poll::Ready(Ok(put))
848 }
849 other => other,
850 }
851 }
852
853 /// Poll flush interface to more easily connect to tokio/futures.
7aee9c1f 854 #[cfg(feature = "tokio-io")]
05356884 855 pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
737f75cf 856 unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
05356884
WB
857 }
858
859 /// Poll close/shutdown interface to more easily connect to tokio/futures.
ec4a53ed
WB
860 ///
861 /// This just calls flush, though, since we're just a virtual writer writing to the file
862 /// provided by our encoder.
7aee9c1f 863 #[cfg(feature = "tokio-io")]
05356884 864 pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
737f75cf 865 unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) }
05356884
WB
866 }
867
70acf637
WB
868 /// Write file data for the current file entry in a pxar archive.
869 ///
870 /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than
871 /// requested. Check the return value for how many. There's also a `write_all` method available
872 /// for convenience.
873 pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
874 self.check_remaining(data.len())?;
ec4a53ed
WB
875 let put =
876 poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
877 .await?;
84a1926f 878 //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
70acf637 879 self.remaining_size -= put as u64;
ec4a53ed 880 self.parent.write_position += put as u64;
70acf637
WB
881 Ok(put)
882 }
883
884 /// Completely write file data for the current file entry in a pxar archive.
885 pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
886 self.check_remaining(data.len())?;
d8402433 887 seq_write_all(self.output, data, &mut self.parent.write_position).await?;
70acf637
WB
888 self.remaining_size -= data.len() as u64;
889 Ok(())
890 }
891}
05356884
WB
892
/// `tokio` integration: every method forwards to the inherent `FileImpl` method.
#[cfg(feature = "tokio-io")]
impl<'a, S: SeqWrite> tokio::io::AsyncWrite for FileImpl<'a, S> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        <FileImpl<'a, S>>::poll_write(self, cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        <FileImpl<'a, S>>::poll_flush(self, cx)
    }

    /// Shutting down is mapped to `poll_close`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        <FileImpl<'a, S>>::poll_close(self, cx)
    }
}
906}