]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/chunk_stream.rs
src/backup/fixed_index.rs: remove ChunkStat from struct
[proxmox-backup.git] / src / backup / chunk_stream.rs
1 use failure::*;
2
3 use proxmox_protocol::Chunker;
4 use futures::{Async, Poll};
5 use futures::stream::Stream;
6
7 use bytes::BytesMut;
8
9 /// Split input stream into dynamic sized chunks
10 pub struct ChunkStream<S> {
11 input: S,
12 chunker: Chunker,
13 buffer: BytesMut,
14 scan_pos: usize,
15 }
16
17 impl <S> ChunkStream<S> {
18 pub fn new(input: S) -> Self {
19 Self { input, chunker: Chunker::new(4 * 1024 * 1024), buffer: BytesMut::new(), scan_pos: 0}
20 }
21 }
22
23 impl <S> Stream for ChunkStream<S>
24 where S: Stream,
25 S::Item: AsRef<[u8]>,
26 S::Error: Into<Error>,
27 {
28
29 type Item = BytesMut;
30 type Error = Error;
31
32 fn poll(&mut self) -> Poll<Option<BytesMut>, Error> {
33 loop {
34
35 if self.scan_pos < self.buffer.len() {
36 let boundary = self.chunker.scan(&self.buffer[self.scan_pos..]);
37
38 let chunk_size = self.scan_pos + boundary;
39
40 if boundary == 0 {
41 self.scan_pos = self.buffer.len();
42 // continue poll
43 } else if chunk_size <= self.buffer.len() {
44 let result = self.buffer.split_to(chunk_size);
45 self.scan_pos = 0;
46 return Ok(Async::Ready(Some(result)));
47 } else {
48 panic!("got unexpected chunk boundary from chunker");
49 }
50 }
51
52 match self.input.poll() {
53 Err(err) => {
54 return Err(err.into());
55 }
56 Ok(Async::NotReady) => {
57 return Ok(Async::NotReady);
58 }
59 Ok(Async::Ready(None)) => {
60 self.scan_pos = 0;
61 if self.buffer.len() > 0 {
62 return Ok(Async::Ready(Some(self.buffer.take())));
63 } else {
64 return Ok(Async::Ready(None));
65 }
66 }
67 Ok(Async::Ready(Some(data))) => {
68 self.buffer.extend_from_slice(data.as_ref());
69 }
70 }
71 }
72 }
73 }
74
75 /// Split input stream into fixed sized chunks
76 pub struct FixedChunkStream<S> {
77 input: S,
78 chunk_size: usize,
79 buffer: BytesMut,
80 }
81
82 impl <S> FixedChunkStream<S> {
83
84 pub fn new(input: S, chunk_size: usize) -> Self {
85 Self { input, chunk_size, buffer: BytesMut::new() }
86 }
87 }
88
89 impl <S> Stream for FixedChunkStream<S>
90 where S: Stream,
91 S::Item: AsRef<[u8]>,
92 {
93
94 type Item = BytesMut;
95 type Error = S::Error;
96
97 fn poll(&mut self) -> Poll<Option<BytesMut>, S::Error> {
98 loop {
99
100 if self.buffer.len() == self.chunk_size {
101 return Ok(Async::Ready(Some(self.buffer.take())));
102 } else if self.buffer.len() > self.chunk_size {
103 let result = self.buffer.split_to(self.chunk_size);
104 return Ok(Async::Ready(Some(result)));
105 }
106
107 match self.input.poll() {
108 Err(err) => {
109 return Err(err);
110 }
111 Ok(Async::NotReady) => {
112 return Ok(Async::NotReady);
113 }
114 Ok(Async::Ready(None)) => {
115 // last chunk can have any size
116 if self.buffer.len() > 0 {
117 return Ok(Async::Ready(Some(self.buffer.take())));
118 } else {
119 return Ok(Async::Ready(None));
120 }
121 }
122 Ok(Async::Ready(Some(data))) => {
123 self.buffer.extend_from_slice(data.as_ref());
124 }
125 }
126 }
127 }
128 }