]>
Commit | Line | Data |
---|---|---|
1 | //! The `pxar` encoder state machine. | |
2 | //! | |
3 | //! This is the implementation used by both the synchronous and async pxar wrappers. | |
4 | ||
5 | #![deny(missing_docs)] | |
6 | ||
7 | use std::io; | |
8 | use std::mem::{forget, size_of, size_of_val, take}; | |
9 | use std::os::unix::ffi::OsStrExt; | |
10 | use std::path::Path; | |
11 | use std::pin::Pin; | |
12 | use std::sync::{Arc, Mutex}; | |
13 | use std::task::{Context, Poll}; | |
14 | ||
15 | use endian_trait::Endian; | |
16 | ||
17 | use crate::binary_tree_array; | |
18 | use crate::decoder::{self, SeqRead}; | |
19 | use crate::format::{self, GoodbyeItem}; | |
20 | use crate::poll_fn::poll_fn; | |
21 | use crate::Metadata; | |
22 | ||
23 | pub mod aio; | |
24 | pub mod sync; | |
25 | ||
26 | #[doc(inline)] | |
27 | pub use sync::Encoder; | |
28 | ||
/// Handle to an entry already written to the archive, used as the target of a hard link.
///
/// Obtained from `add_file` or from a file writer's `file_offset`, and consumed by
/// `add_hardlink`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct LinkOffset(u64);

impl LinkOffset {
    /// Returns the raw archive byte offset this handle refers to.
    #[inline]
    pub fn raw(self) -> u64 {
        let LinkOffset(byte_offset) = self;
        byte_offset
    }
}
40 | ||
/// Sequential output sink driven by the encoder's state machine.
///
/// This internal writer abstraction is implemented for `std::io::Write` types by the
/// synchronous wrapper and for both `tokio` and `futures` `AsyncWrite` types by the
/// asynchronous wrapper.
pub trait SeqWrite {
    /// Try to write (part of) `buf` at the current end of the output.
    ///
    /// Yields `Poll::Ready(Ok(n))` with the number of bytes actually written on success. When
    /// the output is not ready yet, yields `Poll::Pending` and arranges for `cx.waker()` to be
    /// woken once writing may succeed.
    fn poll_seq_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>>;

    /// Try to push any buffered data through to its final destination.
    ///
    /// Yields `Poll::Ready(Ok(()))` once flushing is done. Yields `Poll::Pending` if it cannot
    /// finish right away, in which case `cx.waker()` is woken when progress is possible.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
}
66 | ||
67 | /// Allow using trait objects for generics taking a `SeqWrite`. | |
68 | impl<S> SeqWrite for &mut S | |
69 | where | |
70 | S: SeqWrite + ?Sized, | |
71 | { | |
72 | fn poll_seq_write( | |
73 | self: Pin<&mut Self>, | |
74 | cx: &mut Context, | |
75 | buf: &[u8], | |
76 | ) -> Poll<io::Result<usize>> { | |
77 | unsafe { | |
78 | self.map_unchecked_mut(|this| &mut **this) | |
79 | .poll_seq_write(cx, buf) | |
80 | } | |
81 | } | |
82 | ||
83 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { | |
84 | unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) } | |
85 | } | |
86 | } | |
87 | ||
88 | /// awaitable verison of `poll_seq_write`. | |
89 | async fn seq_write<T: SeqWrite + ?Sized>( | |
90 | output: &mut T, | |
91 | buf: &[u8], | |
92 | position: &mut u64, | |
93 | ) -> io::Result<usize> { | |
94 | let put = | |
95 | poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await?; | |
96 | *position += put as u64; | |
97 | Ok(put) | |
98 | } | |
99 | ||
100 | /// awaitable version of 'poll_flush'. | |
101 | async fn flush<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<()> { | |
102 | poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_flush(cx) }).await | |
103 | } | |
104 | ||
105 | /// Write the entire contents of a buffer, handling short writes. | |
106 | async fn seq_write_all<T: SeqWrite + ?Sized>( | |
107 | output: &mut T, | |
108 | mut buf: &[u8], | |
109 | position: &mut u64, | |
110 | ) -> io::Result<()> { | |
111 | while !buf.is_empty() { | |
112 | let got = seq_write(&mut *output, buf, &mut *position).await?; | |
113 | buf = &buf[got..]; | |
114 | } | |
115 | Ok(()) | |
116 | } | |
117 | ||
118 | /// Write an endian-swappable struct. | |
119 | async fn seq_write_struct<E: Endian, T>( | |
120 | output: &mut T, | |
121 | data: E, | |
122 | position: &mut u64, | |
123 | ) -> io::Result<()> | |
124 | where | |
125 | T: SeqWrite + ?Sized, | |
126 | { | |
127 | let data = data.to_le(); | |
128 | let buf = | |
129 | unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) }; | |
130 | seq_write_all(output, buf, position).await | |
131 | } | |
132 | ||
133 | /// Write a pxar entry. | |
134 | async fn seq_write_pxar_entry<T>( | |
135 | output: &mut T, | |
136 | htype: u64, | |
137 | data: &[u8], | |
138 | position: &mut u64, | |
139 | ) -> io::Result<()> | |
140 | where | |
141 | T: SeqWrite + ?Sized, | |
142 | { | |
143 | let header = format::Header::with_content_size(htype, data.len() as u64); | |
144 | header.check_header_size()?; | |
145 | ||
146 | seq_write_struct(&mut *output, header, &mut *position).await?; | |
147 | seq_write_all(output, data, position).await | |
148 | } | |
149 | ||
150 | /// Write a pxar entry terminated by an additional zero which is not contained in the provided | |
151 | /// data buffer. | |
152 | async fn seq_write_pxar_entry_zero<T>( | |
153 | output: &mut T, | |
154 | htype: u64, | |
155 | data: &[u8], | |
156 | position: &mut u64, | |
157 | ) -> io::Result<()> | |
158 | where | |
159 | T: SeqWrite + ?Sized, | |
160 | { | |
161 | let header = format::Header::with_content_size(htype, 1 + data.len() as u64); | |
162 | header.check_header_size()?; | |
163 | ||
164 | seq_write_struct(&mut *output, header, &mut *position).await?; | |
165 | seq_write_all(&mut *output, data, &mut *position).await?; | |
166 | seq_write_all(output, &[0u8], position).await | |
167 | } | |
168 | ||
169 | /// Write a pxar entry consiting of an endian-swappable struct. | |
170 | async fn seq_write_pxar_struct_entry<E, T>( | |
171 | output: &mut T, | |
172 | htype: u64, | |
173 | data: E, | |
174 | position: &mut u64, | |
175 | ) -> io::Result<()> | |
176 | where | |
177 | T: SeqWrite + ?Sized, | |
178 | E: Endian, | |
179 | { | |
180 | let data = data.to_le(); | |
181 | let buf = | |
182 | unsafe { std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) }; | |
183 | seq_write_pxar_entry(output, htype, buf, position).await | |
184 | } | |
185 | ||
/// Error conditions caused by wrong usage of this crate.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EncodeError {
    /// The user dropped a `File` without finishing writing all of its contents.
    ///
    /// This matters because the payload length is written out at the beginning, and decoding
    /// requires exactly that amount of data to follow.
    IncompleteFile,

    /// The user dropped a directory without finalizing it.
    ///
    /// Finalizing is required to build the goodbye table at the end of a directory.
    IncompleteDirectory,
}
200 | ||
201 | #[derive(Default)] | |
202 | struct EncoderState { | |
203 | /// Goodbye items for this directory, excluding the tail. | |
204 | items: Vec<GoodbyeItem>, | |
205 | ||
206 | /// User caused error conditions. | |
207 | encode_error: Option<EncodeError>, | |
208 | ||
209 | /// Offset of this directory's ENTRY. | |
210 | entry_offset: u64, | |
211 | ||
212 | #[allow(dead_code)] | |
213 | /// Offset to this directory's first FILENAME. | |
214 | files_offset: u64, | |
215 | ||
216 | /// If this is a subdirectory, this points to the this directory's FILENAME. | |
217 | file_offset: Option<u64>, | |
218 | ||
219 | /// If this is a subdirectory, this contains this directory's hash for the goodbye item. | |
220 | file_hash: u64, | |
221 | ||
222 | /// We need to keep track how much we have written to get offsets. | |
223 | write_position: u64, | |
224 | } | |
225 | ||
226 | impl EncoderState { | |
227 | fn merge_error(&mut self, error: Option<EncodeError>) { | |
228 | // one error is enough: | |
229 | if self.encode_error.is_none() { | |
230 | self.encode_error = error; | |
231 | } | |
232 | } | |
233 | ||
234 | fn add_error(&mut self, error: EncodeError) { | |
235 | self.merge_error(Some(error)); | |
236 | } | |
237 | } | |
238 | ||
/// The output an encoder writes to, either owned by it or borrowed from elsewhere.
pub(crate) enum EncoderOutput<'a, T> {
    /// The output value is owned here.
    Owned(T),
    /// The output is borrowed.
    Borrowed(&'a mut T),
}

impl<'a, T> EncoderOutput<'a, T> {
    /// Produce a `Borrowed` view of this output with a shorter lifetime.
    ///
    /// Always returning `Borrowed(&mut T)` avoids stacking `&mut &mut T` layers when encoders
    /// are nested.
    #[inline]
    fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
    where
        'a: 's,
    {
        let inner: &'s mut T = self.as_mut();
        EncoderOutput::Borrowed(inner)
    }
}

impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
    fn as_mut(&mut self) -> &mut T {
        match self {
            Self::Owned(owned) => owned,
            Self::Borrowed(borrowed) => &mut **borrowed,
        }
    }
}

impl<'a, T> std::convert::From<T> for EncoderOutput<'a, T> {
    fn from(value: T) -> Self {
        Self::Owned(value)
    }
}

impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
    fn from(reference: &'a mut T) -> Self {
        Self::Borrowed(reference)
    }
}
274 | ||
275 | /// The encoder state machine implementation for a directory. | |
276 | /// | |
277 | /// We use `async fn` to implement the encoder state machine so that we can easily plug in both | |
278 | /// synchronous or `async` I/O objects in as output. | |
279 | pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> { | |
280 | output: EncoderOutput<'a, T>, | |
281 | state: EncoderState, | |
282 | parent: Option<&'a mut EncoderState>, | |
283 | finished: bool, | |
284 | ||
285 | /// Since only the "current" entry can be actively writing files, we share the file copy | |
286 | /// buffer. | |
287 | file_copy_buffer: Arc<Mutex<Vec<u8>>>, | |
288 | } | |
289 | ||
290 | impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> { | |
291 | fn drop(&mut self) { | |
292 | if let Some(ref mut parent) = self.parent { | |
293 | // propagate errors: | |
294 | parent.merge_error(self.state.encode_error); | |
295 | if !self.finished { | |
296 | parent.add_error(EncodeError::IncompleteDirectory); | |
297 | } | |
298 | } else if !self.finished { | |
299 | // FIXME: how do we deal with this? | |
300 | // eprintln!("Encoder dropped without finishing!"); | |
301 | } | |
302 | } | |
303 | } | |
304 | ||
305 | impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { | |
306 | pub async fn new( | |
307 | output: EncoderOutput<'a, T>, | |
308 | metadata: &Metadata, | |
309 | ) -> io::Result<EncoderImpl<'a, T>> { | |
310 | if !metadata.is_dir() { | |
311 | io_bail!("directory metadata must contain the directory mode flag"); | |
312 | } | |
313 | let mut this = Self { | |
314 | output, | |
315 | state: EncoderState::default(), | |
316 | parent: None, | |
317 | finished: false, | |
318 | file_copy_buffer: Arc::new(Mutex::new(crate::util::vec_new(1024 * 1024))), | |
319 | }; | |
320 | ||
321 | this.encode_metadata(metadata).await?; | |
322 | this.state.files_offset = this.position(); | |
323 | ||
324 | Ok(this) | |
325 | } | |
326 | ||
327 | fn check(&self) -> io::Result<()> { | |
328 | match self.state.encode_error { | |
329 | Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"), | |
330 | Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"), | |
331 | None => Ok(()), | |
332 | } | |
333 | } | |
334 | ||
335 | pub async fn create_file<'b>( | |
336 | &'b mut self, | |
337 | metadata: &Metadata, | |
338 | file_name: &Path, | |
339 | file_size: u64, | |
340 | ) -> io::Result<FileImpl<'b, T>> | |
341 | where | |
342 | 'a: 'b, | |
343 | { | |
344 | self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size) | |
345 | .await | |
346 | } | |
347 | ||
348 | async fn create_file_do<'b>( | |
349 | &'b mut self, | |
350 | metadata: &Metadata, | |
351 | file_name: &[u8], | |
352 | file_size: u64, | |
353 | ) -> io::Result<FileImpl<'b, T>> | |
354 | where | |
355 | 'a: 'b, | |
356 | { | |
357 | self.check()?; | |
358 | ||
359 | let file_offset = self.position(); | |
360 | self.start_file_do(Some(metadata), file_name).await?; | |
361 | ||
362 | let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size); | |
363 | header.check_header_size()?; | |
364 | ||
365 | seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?; | |
366 | ||
367 | let payload_data_offset = self.position(); | |
368 | ||
369 | let meta_size = payload_data_offset - file_offset; | |
370 | ||
371 | Ok(FileImpl { | |
372 | output: self.output.as_mut(), | |
373 | goodbye_item: GoodbyeItem { | |
374 | hash: format::hash_filename(file_name), | |
375 | offset: file_offset, | |
376 | size: file_size + meta_size, | |
377 | }, | |
378 | remaining_size: file_size, | |
379 | parent: &mut self.state, | |
380 | }) | |
381 | } | |
382 | ||
383 | /// Return a file offset usable with `add_hardlink`. | |
384 | pub async fn add_file( | |
385 | &mut self, | |
386 | metadata: &Metadata, | |
387 | file_name: &Path, | |
388 | file_size: u64, | |
389 | content: &mut dyn SeqRead, | |
390 | ) -> io::Result<LinkOffset> { | |
391 | let buf = Arc::clone(&self.file_copy_buffer); | |
392 | let mut file = self.create_file(metadata, file_name, file_size).await?; | |
393 | let mut buf = buf.lock().expect("failed to lock temporary buffer mutex"); | |
394 | loop { | |
395 | let got = decoder::seq_read(&mut *content, &mut buf[..]).await?; | |
396 | if got == 0 { | |
397 | break; | |
398 | } else { | |
399 | file.write_all(&buf[..got]).await?; | |
400 | } | |
401 | } | |
402 | Ok(file.file_offset()) | |
403 | } | |
404 | ||
405 | /// Return a file offset usable with `add_hardlink`. | |
406 | pub async fn add_symlink( | |
407 | &mut self, | |
408 | metadata: &Metadata, | |
409 | file_name: &Path, | |
410 | target: &Path, | |
411 | ) -> io::Result<()> { | |
412 | let target = target.as_os_str().as_bytes(); | |
413 | let mut data = Vec::with_capacity(target.len() + 1); | |
414 | data.extend(target); | |
415 | data.push(0); | |
416 | let _ofs: LinkOffset = self | |
417 | .add_file_entry( | |
418 | Some(metadata), | |
419 | file_name, | |
420 | Some((format::PXAR_SYMLINK, &data)), | |
421 | ) | |
422 | .await?; | |
423 | Ok(()) | |
424 | } | |
425 | ||
426 | /// Return a file offset usable with `add_hardlink`. | |
427 | pub async fn add_hardlink( | |
428 | &mut self, | |
429 | file_name: &Path, | |
430 | target: &Path, | |
431 | target_offset: LinkOffset, | |
432 | ) -> io::Result<()> { | |
433 | let current_offset = self.position(); | |
434 | if current_offset <= target_offset.0 { | |
435 | io_bail!("invalid hardlink offset, can only point to prior files"); | |
436 | } | |
437 | ||
438 | let offset_bytes = (current_offset - target_offset.0).to_le_bytes(); | |
439 | let target_bytes = target.as_os_str().as_bytes(); | |
440 | let mut hardlink = Vec::with_capacity(offset_bytes.len() + target_bytes.len() + 1); | |
441 | hardlink.extend(&offset_bytes); | |
442 | hardlink.extend(target_bytes); | |
443 | hardlink.push(0); | |
444 | let _this_offset: LinkOffset = self | |
445 | .add_file_entry(None, file_name, Some((format::PXAR_HARDLINK, &hardlink))) | |
446 | .await?; | |
447 | Ok(()) | |
448 | } | |
449 | ||
450 | /// Return a file offset usable with `add_hardlink`. | |
451 | pub async fn add_device( | |
452 | &mut self, | |
453 | metadata: &Metadata, | |
454 | file_name: &Path, | |
455 | device: format::Device, | |
456 | ) -> io::Result<()> { | |
457 | if !metadata.is_device() { | |
458 | io_bail!("entry added via add_device must have a device mode in its metadata"); | |
459 | } | |
460 | ||
461 | let device = device.to_le(); | |
462 | let device = unsafe { | |
463 | std::slice::from_raw_parts( | |
464 | &device as *const format::Device as *const u8, | |
465 | size_of::<format::Device>(), | |
466 | ) | |
467 | }; | |
468 | let _ofs: LinkOffset = self | |
469 | .add_file_entry( | |
470 | Some(metadata), | |
471 | file_name, | |
472 | Some((format::PXAR_DEVICE, device)), | |
473 | ) | |
474 | .await?; | |
475 | Ok(()) | |
476 | } | |
477 | ||
478 | /// Return a file offset usable with `add_hardlink`. | |
479 | pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> { | |
480 | if !metadata.is_fifo() { | |
481 | io_bail!("entry added via add_device must be of type fifo in its metadata"); | |
482 | } | |
483 | ||
484 | let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?; | |
485 | Ok(()) | |
486 | } | |
487 | ||
488 | /// Return a file offset usable with `add_hardlink`. | |
489 | pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> { | |
490 | if !metadata.is_socket() { | |
491 | io_bail!("entry added via add_device must be of type socket in its metadata"); | |
492 | } | |
493 | ||
494 | let _ofs: LinkOffset = self.add_file_entry(Some(metadata), file_name, None).await?; | |
495 | Ok(()) | |
496 | } | |
497 | ||
498 | /// Return a file offset usable with `add_hardlink`. | |
499 | async fn add_file_entry( | |
500 | &mut self, | |
501 | metadata: Option<&Metadata>, | |
502 | file_name: &Path, | |
503 | entry_htype_data: Option<(u64, &[u8])>, | |
504 | ) -> io::Result<LinkOffset> { | |
505 | self.check()?; | |
506 | ||
507 | let file_offset = self.position(); | |
508 | ||
509 | let file_name = file_name.as_os_str().as_bytes(); | |
510 | ||
511 | self.start_file_do(metadata, file_name).await?; | |
512 | if let Some((htype, entry_data)) = entry_htype_data { | |
513 | seq_write_pxar_entry( | |
514 | self.output.as_mut(), | |
515 | htype, | |
516 | entry_data, | |
517 | &mut self.state.write_position, | |
518 | ) | |
519 | .await?; | |
520 | } | |
521 | ||
522 | let end_offset = self.position(); | |
523 | ||
524 | self.state.items.push(GoodbyeItem { | |
525 | hash: format::hash_filename(file_name), | |
526 | offset: file_offset, | |
527 | size: end_offset - file_offset, | |
528 | }); | |
529 | ||
530 | Ok(LinkOffset(file_offset)) | |
531 | } | |
532 | ||
533 | #[inline] | |
534 | fn position(&mut self) -> u64 { | |
535 | self.state.write_position | |
536 | } | |
537 | ||
538 | pub async fn create_directory( | |
539 | &mut self, | |
540 | file_name: &Path, | |
541 | metadata: &Metadata, | |
542 | ) -> io::Result<EncoderImpl<'_, T>> { | |
543 | self.check()?; | |
544 | ||
545 | if !metadata.is_dir() { | |
546 | io_bail!("directory metadata must contain the directory mode flag"); | |
547 | } | |
548 | ||
549 | let file_name = file_name.as_os_str().as_bytes(); | |
550 | let file_hash = format::hash_filename(file_name); | |
551 | ||
552 | let file_offset = self.position(); | |
553 | self.encode_filename(file_name).await?; | |
554 | ||
555 | let entry_offset = self.position(); | |
556 | self.encode_metadata(&metadata).await?; | |
557 | ||
558 | let files_offset = self.position(); | |
559 | ||
560 | // the child will write to OUR state now: | |
561 | let write_position = self.position(); | |
562 | ||
563 | let file_copy_buffer = Arc::clone(&self.file_copy_buffer); | |
564 | ||
565 | Ok(EncoderImpl { | |
566 | // always forward as Borrowed(), to avoid stacking references on nested calls | |
567 | output: self.output.to_borrowed_mut(), | |
568 | state: EncoderState { | |
569 | entry_offset, | |
570 | files_offset, | |
571 | file_offset: Some(file_offset), | |
572 | file_hash, | |
573 | write_position, | |
574 | ..Default::default() | |
575 | }, | |
576 | parent: Some(&mut self.state), | |
577 | finished: false, | |
578 | file_copy_buffer, | |
579 | }) | |
580 | } | |
581 | ||
582 | async fn start_file_do( | |
583 | &mut self, | |
584 | metadata: Option<&Metadata>, | |
585 | file_name: &[u8], | |
586 | ) -> io::Result<()> { | |
587 | self.encode_filename(file_name).await?; | |
588 | if let Some(metadata) = metadata { | |
589 | self.encode_metadata(&metadata).await?; | |
590 | } | |
591 | Ok(()) | |
592 | } | |
593 | ||
594 | async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> { | |
595 | seq_write_pxar_struct_entry( | |
596 | self.output.as_mut(), | |
597 | format::PXAR_ENTRY, | |
598 | metadata.stat.clone(), | |
599 | &mut self.state.write_position, | |
600 | ) | |
601 | .await?; | |
602 | ||
603 | for xattr in &metadata.xattrs { | |
604 | self.write_xattr(xattr).await?; | |
605 | } | |
606 | ||
607 | self.write_acls(&metadata.acl).await?; | |
608 | ||
609 | if let Some(fcaps) = &metadata.fcaps { | |
610 | self.write_file_capabilities(fcaps).await?; | |
611 | } | |
612 | ||
613 | if let Some(qpid) = &metadata.quota_project_id { | |
614 | self.write_quota_project_id(qpid).await?; | |
615 | } | |
616 | ||
617 | Ok(()) | |
618 | } | |
619 | ||
620 | async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> { | |
621 | seq_write_pxar_entry( | |
622 | self.output.as_mut(), | |
623 | format::PXAR_XATTR, | |
624 | &xattr.data, | |
625 | &mut self.state.write_position, | |
626 | ) | |
627 | .await | |
628 | } | |
629 | ||
630 | async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> { | |
631 | for acl in &acl.users { | |
632 | seq_write_pxar_struct_entry( | |
633 | self.output.as_mut(), | |
634 | format::PXAR_ACL_USER, | |
635 | acl.clone(), | |
636 | &mut self.state.write_position, | |
637 | ) | |
638 | .await?; | |
639 | } | |
640 | ||
641 | for acl in &acl.groups { | |
642 | seq_write_pxar_struct_entry( | |
643 | self.output.as_mut(), | |
644 | format::PXAR_ACL_GROUP, | |
645 | acl.clone(), | |
646 | &mut self.state.write_position, | |
647 | ) | |
648 | .await?; | |
649 | } | |
650 | ||
651 | if let Some(acl) = &acl.group_obj { | |
652 | seq_write_pxar_struct_entry( | |
653 | self.output.as_mut(), | |
654 | format::PXAR_ACL_GROUP_OBJ, | |
655 | acl.clone(), | |
656 | &mut self.state.write_position, | |
657 | ) | |
658 | .await?; | |
659 | } | |
660 | ||
661 | if let Some(acl) = &acl.default { | |
662 | seq_write_pxar_struct_entry( | |
663 | self.output.as_mut(), | |
664 | format::PXAR_ACL_DEFAULT, | |
665 | acl.clone(), | |
666 | &mut self.state.write_position, | |
667 | ) | |
668 | .await?; | |
669 | } | |
670 | ||
671 | for acl in &acl.default_users { | |
672 | seq_write_pxar_struct_entry( | |
673 | self.output.as_mut(), | |
674 | format::PXAR_ACL_DEFAULT_USER, | |
675 | acl.clone(), | |
676 | &mut self.state.write_position, | |
677 | ) | |
678 | .await?; | |
679 | } | |
680 | ||
681 | for acl in &acl.default_groups { | |
682 | seq_write_pxar_struct_entry( | |
683 | self.output.as_mut(), | |
684 | format::PXAR_ACL_DEFAULT_GROUP, | |
685 | acl.clone(), | |
686 | &mut self.state.write_position, | |
687 | ) | |
688 | .await?; | |
689 | } | |
690 | ||
691 | Ok(()) | |
692 | } | |
693 | ||
694 | async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> { | |
695 | seq_write_pxar_entry( | |
696 | self.output.as_mut(), | |
697 | format::PXAR_FCAPS, | |
698 | &fcaps.data, | |
699 | &mut self.state.write_position, | |
700 | ) | |
701 | .await | |
702 | } | |
703 | ||
704 | async fn write_quota_project_id( | |
705 | &mut self, | |
706 | quota_project_id: &format::QuotaProjectId, | |
707 | ) -> io::Result<()> { | |
708 | seq_write_pxar_struct_entry( | |
709 | self.output.as_mut(), | |
710 | format::PXAR_QUOTA_PROJID, | |
711 | *quota_project_id, | |
712 | &mut self.state.write_position, | |
713 | ) | |
714 | .await | |
715 | } | |
716 | ||
717 | async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> { | |
718 | crate::util::validate_filename(file_name)?; | |
719 | seq_write_pxar_entry_zero( | |
720 | self.output.as_mut(), | |
721 | format::PXAR_FILENAME, | |
722 | file_name, | |
723 | &mut self.state.write_position, | |
724 | ) | |
725 | .await | |
726 | } | |
727 | ||
728 | pub async fn finish(mut self) -> io::Result<()> { | |
729 | let tail_bytes = self.finish_goodbye_table().await?; | |
730 | seq_write_pxar_entry( | |
731 | self.output.as_mut(), | |
732 | format::PXAR_GOODBYE, | |
733 | &tail_bytes, | |
734 | &mut self.state.write_position, | |
735 | ) | |
736 | .await?; | |
737 | ||
738 | if let EncoderOutput::Owned(output) = &mut self.output { | |
739 | flush(output).await?; | |
740 | } | |
741 | ||
742 | // done up here because of the self-borrow and to propagate | |
743 | let end_offset = self.position(); | |
744 | ||
745 | if let Some(parent) = &mut self.parent { | |
746 | parent.write_position = end_offset; | |
747 | ||
748 | let file_offset = self | |
749 | .state | |
750 | .file_offset | |
751 | .expect("internal error: parent set but no file_offset?"); | |
752 | ||
753 | parent.items.push(GoodbyeItem { | |
754 | hash: self.state.file_hash, | |
755 | offset: file_offset, | |
756 | size: end_offset - file_offset, | |
757 | }); | |
758 | } | |
759 | self.finished = true; | |
760 | Ok(()) | |
761 | } | |
762 | ||
763 | async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> { | |
764 | let goodbye_offset = self.position(); | |
765 | ||
766 | // "take" out the tail (to not leave an array of endian-swapped structs in `self`) | |
767 | let mut tail = take(&mut self.state.items); | |
768 | let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>(); | |
769 | let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64; | |
770 | ||
771 | // sort, then create a BST | |
772 | tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash)); | |
773 | ||
774 | let mut bst = Vec::with_capacity(tail.len() + 1); | |
775 | unsafe { | |
776 | bst.set_len(tail.len()); | |
777 | } | |
778 | binary_tree_array::copy(tail.len(), |src, dest| { | |
779 | let mut item = tail[src].clone(); | |
780 | // fixup the goodbye table offsets to be relative and with the right endianess | |
781 | item.offset = goodbye_offset - item.offset; | |
782 | unsafe { | |
783 | std::ptr::write(&mut bst[dest], item.to_le()); | |
784 | } | |
785 | }); | |
786 | drop(tail); | |
787 | ||
788 | bst.push( | |
789 | GoodbyeItem { | |
790 | hash: format::PXAR_GOODBYE_TAIL_MARKER, | |
791 | offset: goodbye_offset - self.state.entry_offset, | |
792 | size: goodbye_size, | |
793 | } | |
794 | .to_le(), | |
795 | ); | |
796 | ||
797 | // turn this into a byte vector since after endian-swapping we can no longer guarantee that | |
798 | // the items make sense: | |
799 | let data = bst.as_mut_ptr() as *mut u8; | |
800 | let capacity = bst.capacity() * size_of::<GoodbyeItem>(); | |
801 | forget(bst); | |
802 | Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) }) | |
803 | } | |
804 | } | |
805 | ||
806 | /// Writer for a file object in a directory. | |
807 | pub(crate) struct FileImpl<'a, S: SeqWrite> { | |
808 | output: &'a mut S, | |
809 | ||
810 | /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it | |
811 | /// directly instead of on Drop of FileImpl? | |
812 | goodbye_item: GoodbyeItem, | |
813 | ||
814 | /// While writing data to this file, this is how much space we still have left, this must reach | |
815 | /// exactly zero. | |
816 | remaining_size: u64, | |
817 | ||
818 | /// The directory containing this file. This is where we propagate the `IncompleteFile` error | |
819 | /// to, and where we insert our `GoodbyeItem`. | |
820 | parent: &'a mut EncoderState, | |
821 | } | |
822 | ||
823 | impl<'a, S: SeqWrite> Drop for FileImpl<'a, S> { | |
824 | fn drop(&mut self) { | |
825 | if self.remaining_size != 0 { | |
826 | self.parent.add_error(EncodeError::IncompleteFile); | |
827 | } | |
828 | ||
829 | self.parent.items.push(self.goodbye_item.clone()); | |
830 | } | |
831 | } | |
832 | ||
833 | impl<'a, S: SeqWrite> FileImpl<'a, S> { | |
834 | /// Get the file offset to be able to reference it with `add_hardlink`. | |
835 | pub fn file_offset(&self) -> LinkOffset { | |
836 | LinkOffset(self.goodbye_item.offset) | |
837 | } | |
838 | ||
839 | fn check_remaining(&self, size: usize) -> io::Result<()> { | |
840 | if size as u64 > self.remaining_size { | |
841 | io_bail!("attempted to write more than previously allocated"); | |
842 | } else { | |
843 | Ok(()) | |
844 | } | |
845 | } | |
846 | ||
847 | /// Poll write interface to more easily connect to tokio/futures. | |
848 | #[cfg(feature = "tokio-io")] | |
849 | pub fn poll_write( | |
850 | self: Pin<&mut Self>, | |
851 | cx: &mut Context, | |
852 | data: &[u8], | |
853 | ) -> Poll<io::Result<usize>> { | |
854 | let this = self.get_mut(); | |
855 | this.check_remaining(data.len())?; | |
856 | let output = unsafe { Pin::new_unchecked(&mut *this.output) }; | |
857 | match output.poll_seq_write(cx, data) { | |
858 | Poll::Ready(Ok(put)) => { | |
859 | this.remaining_size -= put as u64; | |
860 | this.parent.write_position += put as u64; | |
861 | Poll::Ready(Ok(put)) | |
862 | } | |
863 | other => other, | |
864 | } | |
865 | } | |
866 | ||
867 | /// Poll flush interface to more easily connect to tokio/futures. | |
868 | #[cfg(feature = "tokio-io")] | |
869 | pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { | |
870 | unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) } | |
871 | } | |
872 | ||
873 | /// Poll close/shutdown interface to more easily connect to tokio/futures. | |
874 | /// | |
875 | /// This just calls flush, though, since we're just a virtual writer writing to the file | |
876 | /// provided by our encoder. | |
877 | #[cfg(feature = "tokio-io")] | |
878 | pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { | |
879 | unsafe { self.map_unchecked_mut(|this| this.output).poll_flush(cx) } | |
880 | } | |
881 | ||
882 | /// Write file data for the current file entry in a pxar archive. | |
883 | /// | |
884 | /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than | |
885 | /// requested. Check the return value for how many. There's also a `write_all` method available | |
886 | /// for convenience. | |
887 | pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> { | |
888 | self.check_remaining(data.len())?; | |
889 | let put = | |
890 | poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) }) | |
891 | .await?; | |
892 | //let put = seq_write(self.output.as_mut().unwrap(), data).await?; | |
893 | self.remaining_size -= put as u64; | |
894 | self.parent.write_position += put as u64; | |
895 | Ok(put) | |
896 | } | |
897 | ||
898 | /// Completely write file data for the current file entry in a pxar archive. | |
899 | pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> { | |
900 | self.check_remaining(data.len())?; | |
901 | seq_write_all(self.output, data, &mut self.parent.write_position).await?; | |
902 | self.remaining_size -= data.len() as u64; | |
903 | Ok(()) | |
904 | } | |
905 | } | |
906 | ||
/// Expose a file writer as a `tokio` `AsyncWrite` sink.
///
/// Every method forwards to the corresponding inherent `FileImpl` method. Shutting down only
/// flushes, because the writer is a virtual view into the encoder's output.
#[cfg(feature = "tokio-io")]
impl<'a, S: SeqWrite> tokio::io::AsyncWrite for FileImpl<'a, S> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // inherent method: enforces the remaining-size limit
        FileImpl::poll_write(self, cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_flush(self, cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
        FileImpl::poll_close(self, cx)
    }
}