]>
Commit | Line | Data |
---|---|---|
70acf637 WB |
1 | //! The `pxar` encoder state machine. |
2 | //! | |
3 | //! This is the implementation used by both the synchronous and async pxar wrappers. | |
4 | ||
5 | use std::io; | |
6 | use std::mem::{forget, size_of, size_of_val, take}; | |
7 | use std::os::unix::ffi::OsStrExt; | |
8 | use std::path::Path; | |
9 | use std::pin::Pin; | |
10 | use std::task::{Context, Poll}; | |
11 | ||
12 | use endian_trait::Endian; | |
13 | ||
749855a4 | 14 | use crate::binary_tree_array; |
951620f1 | 15 | use crate::decoder::{self, SeqRead}; |
70acf637 WB |
16 | use crate::format::{self, GoodbyeItem}; |
17 | use crate::poll_fn::poll_fn; | |
18 | use crate::Metadata; | |
19 | ||
54109840 | 20 | pub mod aio; |
70acf637 WB |
21 | pub mod sync; |
22 | ||
23 | #[doc(inline)] | |
24 | pub use sync::Encoder; | |
25 | ||
26 | /// Sequential write interface used by the encoder's state machine. | |
27 | /// | |
28 | /// This is our internal writer trait which is available for `std::io::Write` types in the | |
29 | /// synchronous wrapper and for both `tokio` and `future` `AsyncWrite` types in the asynchronous | |
30 | /// wrapper. | |
31 | pub trait SeqWrite { | |
32 | fn poll_seq_write( | |
33 | self: Pin<&mut Self>, | |
34 | cx: &mut Context, | |
35 | buf: &[u8], | |
36 | ) -> Poll<io::Result<usize>>; | |
37 | ||
327e33f2 | 38 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>; |
05356884 | 39 | |
327e33f2 | 40 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>; |
05356884 | 41 | |
70acf637 WB |
42 | /// While writing to a pxar archive we need to remember how much dat we've written to track some |
43 | /// offsets. Particularly items like the goodbye table need to be able to compute offsets to | |
44 | /// further back in the archive. | |
45 | fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>>; | |
46 | ||
47 | /// To avoid recursively borrowing each time we nest into a subdirectory we add this helper. | |
48 | /// Otherwise starting a subdirectory will get a trait object pointing to `T`, nesting another | |
49 | /// subdirectory in that would have a trait object pointing to the trait object, and so on. | |
50 | fn as_trait_object(&mut self) -> &mut dyn SeqWrite | |
51 | where | |
52 | Self: Sized, | |
53 | { | |
54 | self as &mut dyn SeqWrite | |
55 | } | |
56 | } | |
57 | ||
58 | /// Allow using trait objects for generics taking a `SeqWrite`. | |
59 | impl<'a> SeqWrite for &mut (dyn SeqWrite + 'a) { | |
60 | fn poll_seq_write( | |
61 | self: Pin<&mut Self>, | |
62 | cx: &mut Context, | |
63 | buf: &[u8], | |
64 | ) -> Poll<io::Result<usize>> { | |
65 | unsafe { | |
66 | self.map_unchecked_mut(|this| &mut **this) | |
67 | .poll_seq_write(cx, buf) | |
68 | } | |
69 | } | |
70 | ||
05356884 WB |
71 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { |
72 | unsafe { self.map_unchecked_mut(|this| &mut **this).poll_flush(cx) } | |
73 | } | |
74 | ||
75 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { | |
76 | unsafe { self.map_unchecked_mut(|this| &mut **this).poll_close(cx) } | |
77 | } | |
78 | ||
70acf637 WB |
79 | fn poll_position(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> { |
80 | unsafe { self.map_unchecked_mut(|this| &mut **this).poll_position(cx) } | |
81 | } | |
82 | ||
83 | fn as_trait_object(&mut self) -> &mut dyn SeqWrite | |
84 | where | |
85 | Self: Sized, | |
86 | { | |
87 | &mut **self | |
88 | } | |
89 | } | |
90 | ||
fc9c8c9d WB |
91 | /// awaitable version of `poll_position`. |
92 | async fn seq_write_position<T: SeqWrite + ?Sized>(output: &mut T) -> io::Result<u64> { | |
93 | poll_fn(move |cx| unsafe { Pin::new_unchecked(&mut *output).poll_position(cx) }).await | |
94 | } | |
70acf637 | 95 | |
fc9c8c9d WB |
96 | /// awaitable verison of `poll_seq_write`. |
97 | async fn seq_write<T: SeqWrite + ?Sized>(output: &mut T, buf: &[u8]) -> io::Result<usize> { | |
98 | poll_fn(|cx| unsafe { Pin::new_unchecked(&mut *output).poll_seq_write(cx, buf) }).await | |
99 | } | |
70acf637 | 100 | |
fc9c8c9d WB |
101 | /// Write the entire contents of a buffer, handling short writes. |
102 | async fn seq_write_all<T: SeqWrite + ?Sized>(output: &mut T, mut buf: &[u8]) -> io::Result<()> { | |
103 | while !buf.is_empty() { | |
104 | let got = seq_write(&mut *output, buf).await?; | |
105 | buf = &buf[got..]; | |
70acf637 | 106 | } |
fc9c8c9d WB |
107 | Ok(()) |
108 | } | |
70acf637 | 109 | |
fc9c8c9d WB |
110 | /// Write an endian-swappable struct. |
111 | async fn seq_write_struct<E: Endian, T>(output: &mut T, data: E) -> io::Result<()> | |
112 | where | |
113 | T: SeqWrite + ?Sized, | |
114 | { | |
115 | let data = data.to_le(); | |
116 | seq_write_all(output, unsafe { | |
117 | std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) | |
118 | }) | |
119 | .await | |
120 | } | |
70acf637 | 121 | |
fc9c8c9d WB |
122 | /// Write a pxar entry. |
123 | async fn seq_write_pxar_entry<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()> | |
124 | where | |
125 | T: SeqWrite + ?Sized, | |
126 | { | |
127 | seq_write_struct( | |
128 | &mut *output, | |
129 | format::Header::with_content_size(htype, data.len() as u64), | |
130 | ) | |
131 | .await?; | |
132 | seq_write_all(output, data).await | |
133 | } | |
70acf637 | 134 | |
fc9c8c9d WB |
135 | /// Write a pxar entry terminated by an additional zero which is not contained in the provided |
136 | /// data buffer. | |
137 | async fn seq_write_pxar_entry_zero<T>(output: &mut T, htype: u64, data: &[u8]) -> io::Result<()> | |
138 | where | |
139 | T: SeqWrite + ?Sized, | |
140 | { | |
141 | seq_write_struct( | |
142 | &mut *output, | |
143 | format::Header::with_content_size(htype, 1 + data.len() as u64), | |
144 | ) | |
145 | .await?; | |
146 | seq_write_all(&mut *output, data).await?; | |
147 | seq_write_all(output, &[0u8]).await | |
148 | } | |
70acf637 | 149 | |
fc9c8c9d WB |
150 | /// Write a pxar entry consiting of an endian-swappable struct. |
151 | async fn seq_write_pxar_struct_entry<E, T>(output: &mut T, htype: u64, data: E) -> io::Result<()> | |
152 | where | |
153 | T: SeqWrite + ?Sized, | |
154 | E: Endian, | |
155 | { | |
156 | let data = data.to_le(); | |
157 | seq_write_pxar_entry(output, htype, unsafe { | |
158 | std::slice::from_raw_parts(&data as *const E as *const u8, size_of_val(&data)) | |
159 | }) | |
160 | .await | |
70acf637 WB |
161 | } |
162 | ||
59e6f679 WB |
163 | /// Error conditions caused by wrong usage of this crate. |
164 | #[derive(Clone, Copy, Debug, Eq, PartialEq)] | |
165 | pub enum EncodeError { | |
166 | /// The user dropped a `File` without without finishing writing all of its contents. | |
167 | /// | |
168 | /// This is required because the payload lengths is written out at the begining and decoding | |
169 | /// requires there to follow the right amount of data. | |
170 | IncompleteFile, | |
171 | ||
172 | /// The user dropped a directory without finalizing it. | |
173 | /// | |
174 | /// Finalizing is required to build the goodbye table at the end of a directory. | |
175 | IncompleteDirectory, | |
176 | } | |
177 | ||
70acf637 WB |
178 | #[derive(Default)] |
179 | struct EncoderState { | |
180 | /// Goodbye items for this directory, excluding the tail. | |
181 | items: Vec<GoodbyeItem>, | |
182 | ||
59e6f679 WB |
183 | /// User caused error conditions. |
184 | encode_error: Option<EncodeError>, | |
70acf637 WB |
185 | |
186 | /// Offset of this directory's ENTRY. | |
187 | entry_offset: u64, | |
188 | ||
189 | /// Offset to this directory's first FILENAME. | |
190 | files_offset: u64, | |
191 | ||
192 | /// If this is a subdirectory, this points to the this directory's FILENAME. | |
193 | file_offset: Option<u64>, | |
749855a4 WB |
194 | |
195 | /// If this is a subdirectory, this contains this directory's hash for the goodbye item. | |
196 | file_hash: u64, | |
70acf637 WB |
197 | } |
198 | ||
59e6f679 WB |
199 | impl EncoderState { |
200 | fn merge_error(&mut self, error: Option<EncodeError>) { | |
201 | // one error is enough: | |
202 | if self.encode_error.is_none() { | |
203 | self.encode_error = error; | |
204 | } | |
205 | } | |
206 | ||
207 | fn add_error(&mut self, error: EncodeError) { | |
208 | self.merge_error(Some(error)); | |
209 | } | |
210 | } | |
211 | ||
70acf637 WB |
212 | /// The encoder state machine implementation for a directory. |
213 | /// | |
214 | /// We use `async fn` to implement the encoder state machine so that we can easily plug in both | |
215 | /// synchronous or `async` I/O objects in as output. | |
216 | pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> { | |
217 | output: T, | |
218 | state: EncoderState, | |
219 | parent: Option<&'a mut EncoderState>, | |
220 | finished: bool, | |
221 | } | |
222 | ||
223 | impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> { | |
224 | fn drop(&mut self) { | |
225 | if let Some(ref mut parent) = self.parent { | |
226 | // propagate errors: | |
59e6f679 WB |
227 | parent.merge_error(self.state.encode_error); |
228 | if !self.finished { | |
229 | parent.add_error(EncodeError::IncompleteDirectory); | |
70acf637 WB |
230 | } |
231 | } else if !self.finished { | |
232 | // FIXME: how do we deal with this? | |
2f56c76c | 233 | // eprintln!("Encoder dropped without finishing!"); |
70acf637 WB |
234 | } |
235 | } | |
236 | } | |
237 | ||
238 | impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { | |
239 | pub async fn new(output: T, metadata: &Metadata) -> io::Result<EncoderImpl<'a, T>> { | |
240 | if !metadata.is_dir() { | |
241 | io_bail!("directory metadata must contain the directory mode flag"); | |
242 | } | |
243 | let mut this = Self { | |
244 | output, | |
245 | state: EncoderState::default(), | |
246 | parent: None, | |
247 | finished: false, | |
248 | }; | |
249 | ||
250 | this.encode_metadata(metadata).await?; | |
fc9c8c9d | 251 | this.state.files_offset = seq_write_position(&mut this.output).await?; |
70acf637 WB |
252 | |
253 | Ok(this) | |
254 | } | |
255 | ||
256 | fn check(&self) -> io::Result<()> { | |
59e6f679 WB |
257 | match self.state.encode_error { |
258 | Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"), | |
259 | Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"), | |
a7149b09 | 260 | None => Ok(()), |
70acf637 WB |
261 | } |
262 | } | |
263 | ||
264 | pub async fn create_file<'b>( | |
265 | &'b mut self, | |
266 | metadata: &Metadata, | |
267 | file_name: &Path, | |
268 | file_size: u64, | |
269 | ) -> io::Result<FileImpl<'b>> | |
270 | where | |
271 | 'a: 'b, | |
272 | { | |
273 | self.create_file_do(metadata, file_name.as_os_str().as_bytes(), file_size) | |
274 | .await | |
275 | } | |
276 | ||
277 | async fn create_file_do<'b>( | |
278 | &'b mut self, | |
279 | metadata: &Metadata, | |
280 | file_name: &[u8], | |
281 | file_size: u64, | |
282 | ) -> io::Result<FileImpl<'b>> | |
283 | where | |
284 | 'a: 'b, | |
285 | { | |
286 | self.check()?; | |
287 | ||
fc9c8c9d | 288 | let file_offset = seq_write_position(&mut self.output).await?; |
70acf637 WB |
289 | self.start_file_do(metadata, file_name).await?; |
290 | ||
fc9c8c9d WB |
291 | seq_write_struct( |
292 | &mut self.output, | |
293 | format::Header::with_content_size(format::PXAR_PAYLOAD, file_size), | |
294 | ) | |
295 | .await?; | |
70acf637 | 296 | |
fc9c8c9d | 297 | let payload_data_offset = seq_write_position(&mut self.output).await?; |
70acf637 WB |
298 | |
299 | let meta_size = payload_data_offset - file_offset; | |
300 | ||
301 | Ok(FileImpl { | |
302 | output: &mut self.output, | |
303 | goodbye_item: GoodbyeItem { | |
304 | hash: format::hash_filename(file_name), | |
305 | offset: file_offset, | |
306 | size: file_size + meta_size, | |
307 | }, | |
308 | remaining_size: file_size, | |
309 | parent: &mut self.state, | |
310 | }) | |
311 | } | |
312 | ||
313 | pub async fn add_file( | |
314 | &mut self, | |
315 | metadata: &Metadata, | |
316 | file_name: &Path, | |
317 | file_size: u64, | |
318 | content: &mut dyn SeqRead, | |
319 | ) -> io::Result<()> { | |
320 | let mut file = self.create_file(metadata, file_name, file_size).await?; | |
321 | let mut buf = crate::util::vec_new(4096); | |
322 | loop { | |
951620f1 | 323 | let got = decoder::seq_read(&mut *content, &mut buf).await?; |
70acf637 WB |
324 | if got == 0 { |
325 | break; | |
326 | } else { | |
327 | file.write_all(&buf[..got]).await?; | |
328 | } | |
329 | } | |
330 | Ok(()) | |
331 | } | |
332 | ||
2fb73e7b WB |
333 | pub async fn add_symlink( |
334 | &mut self, | |
335 | metadata: &Metadata, | |
336 | file_name: &Path, | |
337 | target: &Path, | |
0abc4121 WB |
338 | ) -> io::Result<()> { |
339 | self.add_link(metadata, file_name, target, format::PXAR_SYMLINK) | |
340 | .await | |
341 | } | |
342 | ||
343 | pub async fn add_hardlink( | |
344 | &mut self, | |
345 | metadata: &Metadata, | |
346 | file_name: &Path, | |
347 | target: &Path, | |
348 | ) -> io::Result<()> { | |
349 | self.add_link(metadata, file_name, target, format::PXAR_HARDLINK) | |
350 | .await | |
351 | } | |
352 | ||
353 | async fn add_link( | |
354 | &mut self, | |
355 | metadata: &Metadata, | |
356 | file_name: &Path, | |
357 | target: &Path, | |
358 | htype: u64, | |
3fe26522 | 359 | ) -> io::Result<()> { |
a7149b09 WB |
360 | self.add_file_entry( |
361 | metadata, | |
362 | file_name, | |
363 | Some((htype, target.as_os_str().as_bytes())), | |
364 | ) | |
365 | .await | |
3fe26522 WB |
366 | } |
367 | ||
368 | pub async fn add_device( | |
369 | &mut self, | |
370 | metadata: &Metadata, | |
371 | file_name: &Path, | |
372 | device: format::Device, | |
373 | ) -> io::Result<()> { | |
a7149b09 WB |
374 | if !metadata.is_device() { |
375 | io_bail!("entry added via add_device must have a device mode in its metadata"); | |
376 | } | |
377 | ||
3fe26522 WB |
378 | let device = device.to_le(); |
379 | let device = unsafe { | |
380 | std::slice::from_raw_parts( | |
381 | &device as *const format::Device as *const u8, | |
382 | size_of::<format::Device>(), | |
383 | ) | |
384 | }; | |
a7149b09 | 385 | self.add_file_entry(metadata, file_name, Some((format::PXAR_DEVICE, device))) |
3fe26522 WB |
386 | .await |
387 | } | |
388 | ||
a7149b09 WB |
389 | pub async fn add_fifo(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> { |
390 | if !metadata.is_fifo() { | |
391 | io_bail!("entry added via add_device must be of type fifo in its metadata"); | |
392 | } | |
393 | ||
394 | self.add_file_entry(metadata, file_name, None).await | |
395 | } | |
396 | ||
397 | pub async fn add_socket(&mut self, metadata: &Metadata, file_name: &Path) -> io::Result<()> { | |
398 | if !metadata.is_socket() { | |
399 | io_bail!("entry added via add_device must be of type socket in its metadata"); | |
400 | } | |
401 | ||
402 | self.add_file_entry(metadata, file_name, None).await | |
403 | } | |
404 | ||
3fe26522 WB |
405 | async fn add_file_entry( |
406 | &mut self, | |
407 | metadata: &Metadata, | |
408 | file_name: &Path, | |
a7149b09 | 409 | entry_htype_data: Option<(u64, &[u8])>, |
2fb73e7b | 410 | ) -> io::Result<()> { |
05356884 | 411 | self.check()?; |
2fb73e7b | 412 | |
fc9c8c9d | 413 | let file_offset = seq_write_position(&mut self.output).await?; |
2fb73e7b WB |
414 | |
415 | let file_name = file_name.as_os_str().as_bytes(); | |
2fb73e7b WB |
416 | |
417 | self.start_file_do(metadata, file_name).await?; | |
a7149b09 | 418 | if let Some((htype, entry_data)) = entry_htype_data { |
fc9c8c9d | 419 | seq_write_pxar_entry_zero(&mut self.output, htype, entry_data).await?; |
a7149b09 | 420 | } |
2fb73e7b | 421 | |
fc9c8c9d | 422 | let end_offset = seq_write_position(&mut self.output).await?; |
2fb73e7b WB |
423 | |
424 | self.state.items.push(GoodbyeItem { | |
425 | hash: format::hash_filename(file_name), | |
426 | offset: file_offset, | |
427 | size: end_offset - file_offset, | |
428 | }); | |
429 | ||
430 | Ok(()) | |
431 | } | |
432 | ||
70acf637 WB |
433 | /// Helper |
434 | #[inline] | |
435 | async fn position(&mut self) -> io::Result<u64> { | |
fc9c8c9d | 436 | seq_write_position(&mut self.output).await |
70acf637 WB |
437 | } |
438 | ||
439 | pub async fn create_directory<'b>( | |
440 | &'b mut self, | |
441 | file_name: &Path, | |
442 | metadata: &Metadata, | |
443 | ) -> io::Result<EncoderImpl<'b, &'b mut dyn SeqWrite>> | |
444 | where | |
445 | 'a: 'b, | |
446 | { | |
447 | self.check()?; | |
448 | ||
449 | if !metadata.is_dir() { | |
450 | io_bail!("directory metadata must contain the directory mode flag"); | |
451 | } | |
452 | ||
453 | let file_name = file_name.as_os_str().as_bytes(); | |
749855a4 | 454 | let file_hash = format::hash_filename(file_name); |
70acf637 WB |
455 | |
456 | let file_offset = self.position().await?; | |
457 | self.encode_filename(file_name).await?; | |
458 | ||
459 | let entry_offset = self.position().await?; | |
460 | self.encode_metadata(&metadata).await?; | |
461 | ||
462 | let files_offset = self.position().await?; | |
463 | ||
464 | Ok(EncoderImpl { | |
465 | output: self.output.as_trait_object(), | |
466 | state: EncoderState { | |
467 | entry_offset, | |
468 | files_offset, | |
469 | file_offset: Some(file_offset), | |
749855a4 | 470 | file_hash: file_hash, |
70acf637 WB |
471 | ..Default::default() |
472 | }, | |
473 | parent: Some(&mut self.state), | |
474 | finished: false, | |
475 | }) | |
476 | } | |
477 | ||
478 | async fn start_file_do(&mut self, metadata: &Metadata, file_name: &[u8]) -> io::Result<()> { | |
479 | self.encode_filename(file_name).await?; | |
480 | self.encode_metadata(&metadata).await?; | |
481 | Ok(()) | |
482 | } | |
483 | ||
484 | async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> { | |
fc9c8c9d | 485 | seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ENTRY, metadata.stat.clone()) |
70acf637 | 486 | .await?; |
b5a471a3 WB |
487 | |
488 | for xattr in &metadata.xattrs { | |
489 | self.write_xattr(xattr).await?; | |
490 | } | |
491 | ||
492 | self.write_acls(&metadata.acl).await?; | |
493 | ||
494 | if let Some(fcaps) = &metadata.fcaps { | |
495 | self.write_file_capabilities(fcaps).await?; | |
496 | } | |
497 | ||
498 | if let Some(qpid) = &metadata.quota_project_id { | |
499 | self.write_quota_project_id(qpid).await?; | |
500 | } | |
501 | ||
502 | Ok(()) | |
503 | } | |
504 | ||
505 | async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> { | |
fc9c8c9d | 506 | seq_write_pxar_entry(&mut self.output, format::PXAR_XATTR, &xattr.data).await |
b5a471a3 WB |
507 | } |
508 | ||
509 | async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> { | |
510 | for acl in &acl.users { | |
fc9c8c9d | 511 | seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_USER, acl.clone()) |
b5a471a3 WB |
512 | .await?; |
513 | } | |
514 | ||
515 | for acl in &acl.groups { | |
fc9c8c9d | 516 | seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP, acl.clone()) |
b5a471a3 WB |
517 | .await?; |
518 | } | |
519 | ||
520 | if let Some(acl) = &acl.group_obj { | |
fc9c8c9d | 521 | seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_GROUP_OBJ, acl.clone()) |
b5a471a3 WB |
522 | .await?; |
523 | } | |
524 | ||
525 | if let Some(acl) = &acl.default { | |
fc9c8c9d | 526 | seq_write_pxar_struct_entry(&mut self.output, format::PXAR_ACL_DEFAULT, acl.clone()) |
b5a471a3 WB |
527 | .await?; |
528 | } | |
529 | ||
530 | for acl in &acl.default_users { | |
fc9c8c9d WB |
531 | seq_write_pxar_struct_entry( |
532 | &mut self.output, | |
533 | format::PXAR_ACL_DEFAULT_USER, | |
534 | acl.clone(), | |
535 | ) | |
536 | .await?; | |
b5a471a3 WB |
537 | } |
538 | ||
539 | for acl in &acl.default_groups { | |
fc9c8c9d WB |
540 | seq_write_pxar_struct_entry( |
541 | &mut self.output, | |
542 | format::PXAR_ACL_DEFAULT_GROUP, | |
543 | acl.clone(), | |
544 | ) | |
545 | .await?; | |
b5a471a3 WB |
546 | } |
547 | ||
70acf637 WB |
548 | Ok(()) |
549 | } | |
550 | ||
b5a471a3 | 551 | async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> { |
fc9c8c9d | 552 | seq_write_pxar_entry(&mut self.output, format::PXAR_FCAPS, &fcaps.data).await |
b5a471a3 WB |
553 | } |
554 | ||
555 | async fn write_quota_project_id( | |
556 | &mut self, | |
557 | quota_project_id: &format::QuotaProjectId, | |
558 | ) -> io::Result<()> { | |
fc9c8c9d WB |
559 | seq_write_pxar_struct_entry( |
560 | &mut self.output, | |
561 | format::PXAR_QUOTA_PROJID, | |
562 | quota_project_id.clone(), | |
563 | ) | |
564 | .await | |
b5a471a3 WB |
565 | } |
566 | ||
70acf637 | 567 | async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> { |
fc9c8c9d | 568 | seq_write_pxar_entry_zero(&mut self.output, format::PXAR_FILENAME, file_name).await |
70acf637 WB |
569 | } |
570 | ||
571 | pub async fn finish(mut self) -> io::Result<()> { | |
572 | let tail_bytes = self.finish_goodbye_table().await?; | |
fc9c8c9d | 573 | seq_write_pxar_entry(&mut self.output, format::PXAR_GOODBYE, &tail_bytes).await?; |
749855a4 WB |
574 | if let Some(parent) = &mut self.parent { |
575 | let file_offset = self | |
576 | .state | |
577 | .file_offset | |
578 | .expect("internal error: parent set but no file_offset?"); | |
579 | ||
fc9c8c9d | 580 | let end_offset = seq_write_position(&mut self.output).await?; |
749855a4 WB |
581 | |
582 | parent.items.push(GoodbyeItem { | |
583 | hash: self.state.file_hash, | |
584 | offset: file_offset, | |
585 | size: end_offset - file_offset, | |
586 | }); | |
587 | } | |
70acf637 WB |
588 | self.finished = true; |
589 | Ok(()) | |
590 | } | |
591 | ||
592 | async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> { | |
fc9c8c9d | 593 | let goodbye_offset = seq_write_position(&mut self.output).await?; |
70acf637 WB |
594 | |
595 | // "take" out the tail (to not leave an array of endian-swapped structs in `self`) | |
596 | let mut tail = take(&mut self.state.items); | |
597 | let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>(); | |
598 | let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64; | |
599 | ||
749855a4 WB |
600 | // sort, then create a BST |
601 | tail.sort_unstable_by(|a, b| a.hash.cmp(&b.hash)); | |
70acf637 | 602 | |
749855a4 WB |
603 | let mut bst = Vec::with_capacity(tail.len() + 1); |
604 | unsafe { | |
605 | bst.set_len(tail.len()); | |
70acf637 | 606 | } |
749855a4 WB |
607 | binary_tree_array::copy(tail.len(), |src, dest| { |
608 | let mut item = tail[src].clone(); | |
609 | // fixup the goodbye table offsets to be relative and with the right endianess | |
610 | item.offset = goodbye_offset - item.offset; | |
611 | unsafe { | |
612 | std::ptr::write(&mut bst[dest], item.to_le()); | |
613 | } | |
614 | }); | |
615 | drop(tail); | |
616 | ||
617 | bst.push( | |
618 | GoodbyeItem { | |
619 | hash: format::PXAR_GOODBYE_TAIL_MARKER, | |
620 | offset: goodbye_offset - self.state.entry_offset, | |
621 | size: goodbye_size, | |
622 | } | |
623 | .to_le(), | |
624 | ); | |
70acf637 WB |
625 | |
626 | // turn this into a byte vector since after endian-swapping we can no longer guarantee that | |
627 | // the items make sense: | |
749855a4 WB |
628 | let data = bst.as_mut_ptr() as *mut u8; |
629 | let capacity = bst.capacity() * size_of::<GoodbyeItem>(); | |
630 | forget(bst); | |
70acf637 WB |
631 | Ok(unsafe { Vec::from_raw_parts(data, tail_size, capacity) }) |
632 | } | |
633 | } | |
634 | ||
635 | /// Writer for a file object in a directory. | |
636 | pub struct FileImpl<'a> { | |
637 | output: &'a mut dyn SeqWrite, | |
638 | ||
639 | /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it | |
640 | /// directly instead of on Drop of FileImpl? | |
641 | goodbye_item: GoodbyeItem, | |
642 | ||
643 | /// While writing data to this file, this is how much space we still have left, this must reach | |
644 | /// exactly zero. | |
645 | remaining_size: u64, | |
646 | ||
59e6f679 | 647 | /// The directory containing this file. This is where we propagate the `IncompleteFile` error |
70acf637 WB |
648 | /// to, and where we insert our `GoodbyeItem`. |
649 | parent: &'a mut EncoderState, | |
650 | } | |
651 | ||
652 | impl<'a> Drop for FileImpl<'a> { | |
653 | fn drop(&mut self) { | |
654 | if self.remaining_size != 0 { | |
59e6f679 | 655 | self.parent.add_error(EncodeError::IncompleteFile); |
70acf637 WB |
656 | } |
657 | ||
658 | self.parent.items.push(self.goodbye_item.clone()); | |
659 | } | |
660 | } | |
661 | ||
662 | impl<'a> FileImpl<'a> { | |
663 | fn check_remaining(&self, size: usize) -> io::Result<()> { | |
664 | if size as u64 > self.remaining_size { | |
665 | io_bail!("attempted to write more than previously allocated"); | |
666 | } else { | |
667 | Ok(()) | |
668 | } | |
669 | } | |
670 | ||
05356884 WB |
671 | /// Poll write interface to more easily connect to tokio/futures. |
672 | #[cfg(any(feature = "tokio-io", feature = "futures-io"))] | |
673 | pub fn poll_write( | |
674 | self: Pin<&mut Self>, | |
675 | cx: &mut Context, | |
676 | data: &[u8], | |
677 | ) -> Poll<io::Result<usize>> { | |
678 | let this = self.get_mut(); | |
679 | this.check_remaining(data.len())?; | |
680 | let output = unsafe { Pin::new_unchecked(&mut *this.output) }; | |
681 | match output.poll_seq_write(cx, data) { | |
682 | Poll::Ready(Ok(put)) => { | |
683 | this.remaining_size -= put as u64; | |
684 | Poll::Ready(Ok(put)) | |
685 | } | |
686 | other => other, | |
687 | } | |
688 | } | |
689 | ||
690 | /// Poll flush interface to more easily connect to tokio/futures. | |
691 | #[cfg(any(feature = "tokio-io", feature = "futures-io"))] | |
692 | pub fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { | |
693 | unsafe { | |
327e33f2 WB |
694 | self.map_unchecked_mut(|this| &mut this.output) |
695 | .poll_flush(cx) | |
05356884 WB |
696 | } |
697 | } | |
698 | ||
699 | /// Poll close/shutdown interface to more easily connect to tokio/futures. | |
700 | #[cfg(any(feature = "tokio-io", feature = "futures-io"))] | |
701 | pub fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { | |
702 | unsafe { | |
327e33f2 WB |
703 | self.map_unchecked_mut(|this| &mut this.output) |
704 | .poll_close(cx) | |
05356884 WB |
705 | } |
706 | } | |
707 | ||
70acf637 WB |
708 | /// Write file data for the current file entry in a pxar archive. |
709 | /// | |
710 | /// This forwards to the output's `SeqWrite::poll_seq_write` and may write fewer bytes than | |
711 | /// requested. Check the return value for how many. There's also a `write_all` method available | |
712 | /// for convenience. | |
713 | pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> { | |
714 | self.check_remaining(data.len())?; | |
fc9c8c9d | 715 | let put = seq_write(&mut self.output, data).await?; |
70acf637 WB |
716 | self.remaining_size -= put as u64; |
717 | Ok(put) | |
718 | } | |
719 | ||
720 | /// Completely write file data for the current file entry in a pxar archive. | |
721 | pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> { | |
722 | self.check_remaining(data.len())?; | |
fc9c8c9d | 723 | seq_write_all(&mut self.output, data).await?; |
70acf637 WB |
724 | self.remaining_size -= data.len() as u64; |
725 | Ok(()) | |
726 | } | |
727 | } | |
05356884 WB |
728 | |
729 | #[cfg(feature = "tokio-io")] | |
730 | impl<'a> tokio::io::AsyncWrite for FileImpl<'a> { | |
327e33f2 | 731 | fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> { |
05356884 WB |
732 | FileImpl::poll_write(self, cx, buf) |
733 | } | |
734 | ||
735 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { | |
736 | FileImpl::poll_flush(self, cx) | |
737 | } | |
738 | ||
739 | fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { | |
740 | FileImpl::poll_close(self, cx) | |
741 | } | |
742 | } | |
743 | ||
744 | #[cfg(feature = "futures-io")] | |
745 | impl<'a> futures::io::AsyncWrite for FileImpl<'a> { | |
327e33f2 | 746 | fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> { |
05356884 WB |
747 | FileImpl::poll_write(self, cx, buf) |
748 | } | |
749 | ||
750 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { | |
751 | FileImpl::poll_flush(self, cx) | |
752 | } | |
753 | ||
754 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { | |
755 | FileImpl::poll_close(self, cx) | |
756 | } | |
757 | } |