]> git.proxmox.com Git - pxar.git/commitdiff
encoder: move to stack based state tracking
authorChristian Ebner <c.ebner@proxmox.com>
Tue, 19 Mar 2024 13:51:08 +0000 (14:51 +0100)
committerChristian Ebner <c.ebner@proxmox.com>
Thu, 23 May 2024 12:39:11 +0000 (14:39 +0200)
In preparation for the proxmox-backup-client look-ahead caching,
where a passing around of different encoder instances with internal
references is not feasible.

Instead of creating a new encoder instance for each directory level
and keeping references to the parent state, use an internal stack.
Adds additional helper functions to solve borrow issues, when both
the state and writers have to be accessed by a mutable reference.

This is a breaking change in the pxar library API.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
examples/pxarcmd.rs
src/encoder/aio.rs
src/encoder/mod.rs
src/encoder/sync.rs
tests/simple/fs.rs
tests/simple/main.rs

index e0c779d577880658bb31234670b29e286282ddb7..0294eba4e381543f626430b50af1cc9e84f101ac 100644 (file)
@@ -106,6 +106,7 @@ fn cmd_create(mut args: std::env::ArgsOs) -> Result<(), Error> {
     let mut encoder = Encoder::create(file, &meta)?;
     add_directory(&mut encoder, dir, &dir_path, &mut HashMap::new())?;
     encoder.finish()?;
+    encoder.close()?;
 
     Ok(())
 }
@@ -138,14 +139,14 @@ fn add_directory<'a, T: SeqWrite + 'a>(
 
         let meta = Metadata::from(&file_meta);
         if file_type.is_dir() {
-            let mut dir = encoder.create_directory(file_name, &meta)?;
+            encoder.create_directory(file_name, &meta)?;
             add_directory(
-                &mut dir,
+                encoder,
                 std::fs::read_dir(file_path)?,
                 root_path,
                 &mut *hardlinks,
             )?;
-            dir.finish()?;
+            encoder.finish()?;
         } else if file_type.is_symlink() {
             todo!("symlink handling");
         } else if file_type.is_file() {
index ad25feaac8f2066e5bcef28d562785bfc6fa45b2..f11e57cde1979a51fa6489a71d53eb2c2471edf9 100644 (file)
@@ -98,20 +98,23 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
         &mut self,
         file_name: P,
         metadata: &Metadata,
-    ) -> io::Result<Encoder<'_, T>> {
-        Ok(Encoder {
-            inner: self
-                .inner
-                .create_directory(file_name.as_ref(), metadata)
-                .await?,
-        })
+    ) -> io::Result<()> {
+        self.inner
+            .create_directory(file_name.as_ref(), metadata)
+            .await
     }
 
-    /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
-    pub async fn finish(self) -> io::Result<()> {
+    /// Finish this directory. This is mandatory, encodes the end for the current directory.
+    pub async fn finish(&mut self) -> io::Result<()> {
         self.inner.finish().await
     }
 
+    /// Close the encoder instance. This is mandatory, encodes the end for the optional payload
+    /// output stream, if some is given
+    pub async fn close(self) -> io::Result<()> {
+        self.inner.close().await
+    }
+
     /// Add a symbolic link to the archive.
     pub async fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
         &mut self,
@@ -295,11 +298,12 @@ mod test {
                 .await
                 .unwrap();
             {
-                let mut dir = encoder
+                encoder
                     .create_directory("baba", &Metadata::dir_builder(0o700).build())
                     .await
                     .unwrap();
-                dir.create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
+                encoder
+                    .create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
                     .await
                     .unwrap();
             }
index da41733df8d4805eca4a9dabb9fcd56443fea6da..2bc31286cdd2ad91adebfb36777df55df104173c 100644 (file)
@@ -221,9 +221,17 @@ struct EncoderState {
 
     /// We need to keep track how much we have written to get offsets.
     write_position: u64,
+
+    /// Mark the encoder state as correctly finished, ready to be dropped
+    finished: bool,
 }
 
 impl EncoderState {
+    #[inline]
+    fn position(&self) -> u64 {
+        self.write_position
+    }
+
     fn merge_error(&mut self, error: Option<EncodeError>) {
         // one error is enough:
         if self.encode_error.is_none() {
@@ -234,6 +242,23 @@ impl EncoderState {
     fn add_error(&mut self, error: EncodeError) {
         self.merge_error(Some(error));
     }
+
+    fn finish(&mut self) -> Option<EncodeError> {
+        self.finished = true;
+        self.encode_error.take()
+    }
+}
+
+impl Drop for EncoderState {
+    fn drop(&mut self) {
+        if !self.finished {
+            eprintln!("unfinished encoder state dropped");
+        }
+
+        if self.encode_error.is_some() {
+            eprintln!("finished encoder state with errors");
+        }
+    }
 }
 
 pub(crate) enum EncoderOutput<'a, T> {
@@ -241,16 +266,6 @@ pub(crate) enum EncoderOutput<'a, T> {
     Borrowed(&'a mut T),
 }
 
-impl<'a, T> EncoderOutput<'a, T> {
-    #[inline]
-    fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
-    where
-        'a: 's,
-    {
-        EncoderOutput::Borrowed(self.as_mut())
-    }
-}
-
 impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
     fn as_mut(&mut self) -> &mut T {
         match self {
@@ -278,8 +293,8 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
 /// synchronous or `async` I/O objects in as output.
 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
     output: EncoderOutput<'a, T>,
-    state: EncoderState,
-    parent: Option<&'a mut EncoderState>,
+    /// EncoderState stack storing the state for each directory level
+    state: Vec<EncoderState>,
     finished: bool,
 
     /// Since only the "current" entry can be actively writing files, we share the file copy
@@ -289,15 +304,12 @@ pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
 
 impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
     fn drop(&mut self) {
-        if let Some(ref mut parent) = self.parent {
-            // propagate errors:
-            parent.merge_error(self.state.encode_error);
-            if !self.finished {
-                parent.add_error(EncodeError::IncompleteDirectory);
-            }
-        } else if !self.finished {
-            // FIXME: how do we deal with this?
-            // eprintln!("Encoder dropped without finishing!");
+        if !self.finished {
+            eprintln!("unclosed encoder dropped");
+        }
+
+        if !self.state.is_empty() {
+            eprintln!("closed encoder dropped with state");
         }
     }
 }
@@ -312,8 +324,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         }
         let mut this = Self {
             output,
-            state: EncoderState::default(),
-            parent: None,
+            state: vec![EncoderState::default()],
             finished: false,
             file_copy_buffer: Arc::new(Mutex::new(unsafe {
                 crate::util::vec_new_uninitialized(1024 * 1024)
@@ -321,19 +332,45 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         };
 
         this.encode_metadata(metadata).await?;
-        this.state.files_offset = this.position();
+        let state = this.state_mut()?;
+        state.files_offset = state.position();
 
         Ok(this)
     }
 
     fn check(&self) -> io::Result<()> {
-        match self.state.encode_error {
+        if self.finished {
+            io_bail!("unexpected encoder finished state");
+        }
+        let state = self.state()?;
+        match state.encode_error {
             Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
             Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
             None => Ok(()),
         }
     }
 
+    fn state(&self) -> io::Result<&EncoderState> {
+        self.state
+            .last()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))
+    }
+
+    fn state_mut(&mut self) -> io::Result<&mut EncoderState> {
+        self.state
+            .last_mut()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))
+    }
+
+    fn output_state(&mut self) -> io::Result<(&mut T, &mut EncoderState)> {
+        Ok((
+            self.output.as_mut(),
+            self.state
+                .last_mut()
+                .ok_or_else(|| io_format_err!("encoder state stack underflow"))?,
+        ))
+    }
+
     pub async fn create_file<'b>(
         &'b mut self,
         metadata: &Metadata,
@@ -358,27 +395,27 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     {
         self.check()?;
 
-        let file_offset = self.position();
+        let file_offset = self.state()?.position();
         self.start_file_do(Some(metadata), file_name).await?;
 
         let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
         header.check_header_size()?;
+        let (output, state) = self.output_state()?;
+        seq_write_struct(output, header, &mut state.write_position).await?;
 
-        seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
-
-        let payload_data_offset = self.position();
+        let payload_data_offset = state.position();
 
         let meta_size = payload_data_offset - file_offset;
 
         Ok(FileImpl {
-            output: self.output.as_mut(),
+            output,
             goodbye_item: GoodbyeItem {
                 hash: format::hash_filename(file_name),
                 offset: file_offset,
                 size: file_size + meta_size,
             },
             remaining_size: file_size,
-            parent: &mut self.state,
+            parent: state,
         })
     }
 
@@ -459,7 +496,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         target: &Path,
         target_offset: LinkOffset,
     ) -> io::Result<()> {
-        let current_offset = self.position();
+        let current_offset = self.state()?.position();
         if current_offset <= target_offset.0 {
             io_bail!("invalid hardlink offset, can only point to prior files");
         }
@@ -533,24 +570,20 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     ) -> io::Result<LinkOffset> {
         self.check()?;
 
-        let file_offset = self.position();
+        let file_offset = self.state()?.position();
 
         let file_name = file_name.as_os_str().as_bytes();
 
         self.start_file_do(metadata, file_name).await?;
+
+        let (output, state) = self.output_state()?;
         if let Some((htype, entry_data)) = entry_htype_data {
-            seq_write_pxar_entry(
-                self.output.as_mut(),
-                htype,
-                entry_data,
-                &mut self.state.write_position,
-            )
-            .await?;
+            seq_write_pxar_entry(output, htype, entry_data, &mut state.write_position).await?;
         }
 
-        let end_offset = self.position();
+        let end_offset = state.position();
 
-        self.state.items.push(GoodbyeItem {
+        state.items.push(GoodbyeItem {
             hash: format::hash_filename(file_name),
             offset: file_offset,
             size: end_offset - file_offset,
@@ -559,16 +592,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         Ok(LinkOffset(file_offset))
     }
 
-    #[inline]
-    fn position(&mut self) -> u64 {
-        self.state.write_position
-    }
-
     pub async fn create_directory(
         &mut self,
         file_name: &Path,
         metadata: &Metadata,
-    ) -> io::Result<EncoderImpl<'_, T>> {
+    ) -> io::Result<()> {
         self.check()?;
 
         if !metadata.is_dir() {
@@ -578,34 +606,30 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         let file_name = file_name.as_os_str().as_bytes();
         let file_hash = format::hash_filename(file_name);
 
-        let file_offset = self.position();
+        let file_offset = self.state()?.position();
         self.encode_filename(file_name).await?;
 
-        let entry_offset = self.position();
+        let entry_offset = self.state()?.position();
         self.encode_metadata(metadata).await?;
 
-        let files_offset = self.position();
+        let state = self.state_mut()?;
+        let files_offset = state.position();
 
         // the child will write to OUR state now:
-        let write_position = self.position();
-
-        let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
-
-        Ok(EncoderImpl {
-            // always forward as Borrowed(), to avoid stacking references on nested calls
-            output: self.output.to_borrowed_mut(),
-            state: EncoderState {
-                entry_offset,
-                files_offset,
-                file_offset: Some(file_offset),
-                file_hash,
-                write_position,
-                ..Default::default()
-            },
-            parent: Some(&mut self.state),
+        let write_position = state.position();
+
+        self.state.push(EncoderState {
+            items: Vec::new(),
+            encode_error: None,
+            entry_offset,
+            files_offset,
+            file_offset: Some(file_offset),
+            file_hash,
+            write_position,
             finished: false,
-            file_copy_buffer,
-        })
+        });
+
+        Ok(())
     }
 
     async fn start_file_do(
@@ -621,11 +645,12 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
+        let (output, state) = self.output_state()?;
         seq_write_pxar_struct_entry(
-            self.output.as_mut(),
+            output,
             format::PXAR_ENTRY,
             metadata.stat.clone(),
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await?;
 
@@ -647,72 +672,74 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
+        let (output, state) = self.output_state()?;
         seq_write_pxar_entry(
-            self.output.as_mut(),
+            output,
             format::PXAR_XATTR,
             &xattr.data,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
 
     async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
+        let (output, state) = self.output_state()?;
         for acl in &acl.users {
             seq_write_pxar_struct_entry(
-                self.output.as_mut(),
+                output,
                 format::PXAR_ACL_USER,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
 
         for acl in &acl.groups {
             seq_write_pxar_struct_entry(
-                self.output.as_mut(),
+                output,
                 format::PXAR_ACL_GROUP,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
 
         if let Some(acl) = &acl.group_obj {
             seq_write_pxar_struct_entry(
-                self.output.as_mut(),
+                output,
                 format::PXAR_ACL_GROUP_OBJ,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
 
         if let Some(acl) = &acl.default {
             seq_write_pxar_struct_entry(
-                self.output.as_mut(),
+                output,
                 format::PXAR_ACL_DEFAULT,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
 
         for acl in &acl.default_users {
             seq_write_pxar_struct_entry(
-                self.output.as_mut(),
+                output,
                 format::PXAR_ACL_DEFAULT_USER,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
 
         for acl in &acl.default_groups {
             seq_write_pxar_struct_entry(
-                self.output.as_mut(),
+                output,
                 format::PXAR_ACL_DEFAULT_GROUP,
                 acl.clone(),
-                &mut self.state.write_position,
+                &mut state.write_position,
             )
             .await?;
         }
@@ -721,11 +748,12 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
     }
 
     async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
+        let (output, state) = self.output_state()?;
         seq_write_pxar_entry(
-            self.output.as_mut(),
+            output,
             format::PXAR_FCAPS,
             &fcaps.data,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
@@ -734,66 +762,89 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         &mut self,
         quota_project_id: &format::QuotaProjectId,
     ) -> io::Result<()> {
+        let (output, state) = self.output_state()?;
         seq_write_pxar_struct_entry(
-            self.output.as_mut(),
+            output,
             format::PXAR_QUOTA_PROJID,
             *quota_project_id,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
 
     async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
         crate::util::validate_filename(file_name)?;
+        let (output, state) = self.output_state()?;
         seq_write_pxar_entry_zero(
-            self.output.as_mut(),
+            output,
             format::PXAR_FILENAME,
             file_name,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await
     }
 
-    pub async fn finish(mut self) -> io::Result<()> {
+    pub async fn close(mut self) -> io::Result<()> {
+        if !self.state.is_empty() {
+            io_bail!("unexpected state on encoder close");
+        }
+
+        if let EncoderOutput::Owned(output) = &mut self.output {
+            flush(output).await?;
+        }
+
+        self.finished = true;
+
+        Ok(())
+    }
+
+    pub async fn finish(&mut self) -> io::Result<()> {
         let tail_bytes = self.finish_goodbye_table().await?;
+        let mut state = self
+            .state
+            .pop()
+            .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
         seq_write_pxar_entry(
             self.output.as_mut(),
             format::PXAR_GOODBYE,
             &tail_bytes,
-            &mut self.state.write_position,
+            &mut state.write_position,
         )
         .await?;
 
-        if let EncoderOutput::Owned(output) = &mut self.output {
-            flush(output).await?;
-        }
+        let end_offset = state.position();
 
-        // done up here because of the self-borrow and to propagate
-        let end_offset = self.position();
-
-        if let Some(parent) = &mut self.parent {
+        let encode_error = state.finish();
+        if let Some(parent) = self.state.last_mut() {
             parent.write_position = end_offset;
 
-            let file_offset = self
-                .state
+            let file_offset = state
                 .file_offset
                 .expect("internal error: parent set but no file_offset?");
 
             parent.items.push(GoodbyeItem {
-                hash: self.state.file_hash,
+                hash: state.file_hash,
                 offset: file_offset,
                 size: end_offset - file_offset,
             });
+            // propagate errors
+            parent.merge_error(encode_error);
+            Ok(())
+        } else {
+            match encode_error {
+                Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
+                Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
+                None => Ok(()),
+            }
         }
-        self.finished = true;
-        Ok(())
     }
 
     async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
-        let goodbye_offset = self.position();
+        let state = self.state_mut()?;
+        let goodbye_offset = state.position();
 
         // "take" out the tail (to not leave an array of endian-swapped structs in `self`)
-        let mut tail = take(&mut self.state.items);
+        let mut tail = take(&mut state.items);
         let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
         let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
 
@@ -818,7 +869,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         bst.push(
             GoodbyeItem {
                 hash: format::PXAR_GOODBYE_TAIL_MARKER,
-                offset: goodbye_offset - self.state.entry_offset,
+                offset: goodbye_offset - state.entry_offset,
                 size: goodbye_size,
             }
             .to_le(),
@@ -845,8 +896,8 @@ pub(crate) struct FileImpl<'a, S: SeqWrite> {
     /// exactly zero.
     remaining_size: u64,
 
-    /// The directory containing this file. This is where we propagate the `IncompleteFile` error
-    /// to, and where we insert our `GoodbyeItem`.
+    /// The directory stack with the last item being the directory containing this file. This is
+    /// where we propagate the `IncompleteFile` error to, and where we insert our `GoodbyeItem`.
     parent: &'a mut EncoderState,
 }
 
index 1ec91b8c58573cfb34b1d1bfabe5463b259339e6..48a97af2eb8e298de6020eefb203addf91e44525 100644 (file)
@@ -99,17 +99,21 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
         &mut self,
         file_name: P,
         metadata: &Metadata,
-    ) -> io::Result<Encoder<'_, T>> {
-        Ok(Encoder {
-            inner: poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))?,
-        })
+    ) -> io::Result<()> {
+        poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))
     }
 
-    /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
-    pub fn finish(self) -> io::Result<()> {
+    /// Finish this directory. This is mandatory, encodes the end for the current directory.
+    pub fn finish(&mut self) -> io::Result<()> {
         poll_result_once(self.inner.finish())
     }
 
+    /// Close the encoder instance. This is mandatory, encodes the end for the optional payload
+    /// output stream, if some is given
+    pub fn close(self) -> io::Result<()> {
+        poll_result_once(self.inner.close())
+    }
+
     /// Add a symbolic link to the archive.
     pub fn add_symlink<PF: AsRef<Path>, PT: AsRef<Path>>(
         &mut self,
index 9a89c4de254d55e8c828c1b7cdd299d130661073..42848054049a22e8beee973c14fd114ccf618b29 100644 (file)
@@ -144,12 +144,12 @@ impl Entry {
 
             EntryKind::Directory(entries) => {
                 self.no_hardlink()?;
-                let mut dir = encoder.create_directory(&self.name, &self.metadata)?;
+                encoder.create_directory(&self.name, &self.metadata)?;
                 let path = path.join(&self.name);
                 for entry in entries {
-                    entry.encode_into(&mut dir, hardlinks, &path)?;
+                    entry.encode_into(encoder, hardlinks, &path)?;
                 }
-                dir.finish()?;
+                encoder.finish()?;
             }
 
             EntryKind::Symlink(path) => {
index d661c7d0ad6075bd84bdf3704e1c66f8c1532122..e55457fd49a0ec7b7cc9cd372b62803fef11f621 100644 (file)
@@ -51,6 +51,9 @@ fn test1() {
     encoder
         .finish()
         .expect("failed to finish encoding the pxar archive");
+    encoder
+        .close()
+        .expect("failed to close the encoder instance");
 
     assert!(!file.is_empty(), "encoder did not write any data");