]> git.proxmox.com Git - proxmox-backup.git/commitdiff
Revert "src/backup/chunk_stream.rs: simplify code"
authorDietmar Maurer <dietmar@proxmox.com>
Sun, 19 May 2019 09:05:56 +0000 (11:05 +0200)
committerDietmar Maurer <dietmar@proxmox.com>
Sun, 19 May 2019 09:05:56 +0000 (11:05 +0200)
This reverts commit e058744d8f4dc1d87fe2a28cd79bc385e934f563.
The optimization was wrong, and produces wrong chunk boundaries.

src/backup/chunk_stream.rs

index eac9dca17cf76995e2a3964ed1fd44d9d9931d40..e21224dc4fa70fe27aced08f9745b8010f539579 100644 (file)
@@ -9,12 +9,13 @@ pub struct ChunkStream<S: Stream<Item=Vec<u8>, Error=Error>> {
     input: S,
     chunker: Chunker,
     buffer: Option<Vec<u8>>,
+    rest: Option<Vec<u8>>,
 }
 
 impl <S: Stream<Item=Vec<u8>, Error=Error>> ChunkStream<S> {
 
     pub fn new(input: S) -> Self {
-        Self { input, chunker: Chunker::new(4 * 1024 * 1024), buffer: None }
+        Self { input, chunker: Chunker::new(4 * 1024 * 1024), buffer: None, rest: None }
     }
 }
 
@@ -33,9 +34,18 @@ impl <S: Stream<Item=Vec<u8>, Error=Error>> Stream for ChunkStream<S> {
                     return Ok(Async::NotReady);
                 }
                 Ok(Async::Ready(None)) => {
-                    return Ok(Async::Ready(self.buffer.take()));
+                    let mut data = self.buffer.take().or_else(|| Some(vec![])).unwrap();
+                    if let Some(rest) = self.rest.take() { data.extend(rest); }
+
+                    if data.len() > 0 {
+                        return Ok(Async::Ready(Some(data)));
+                    } else {
+                        return Ok(Async::Ready(None));
+                    }
                 }
-                Ok(Async::Ready(Some(data))) => {
+                Ok(Async::Ready(Some(mut data))) => {
+
+                    if let Some(rest) = self.rest.take() { data.extend(rest); }
 
                     let buffer = self.buffer.get_or_insert_with(|| Vec::with_capacity(1024*1024));
                     let boundary = self.chunker.scan(&data);
@@ -50,11 +60,10 @@ impl <S: Stream<Item=Vec<u8>, Error=Error>> Stream for ChunkStream<S> {
                         let (left, right) = data.split_at(boundary);
                         buffer.extend(left);
 
-                        let result = self.buffer.take();
-
-                        self.buffer = Some(Vec::from(right));
+                        let rest = self.rest.get_or_insert_with(|| Vec::with_capacity(right.len()));
+                        rest.extend(right);
 
-                        return Ok(Async::Ready(result));
+                        return Ok(Async::Ready(self.buffer.take()));
                     } else {
                         panic!("got unexpected chunk boundary from chunker");
                     }