]> git.proxmox.com Git - proxmox-backup.git/blame - src/backup/chunk_stream.rs
src/backup/chunk_stream.rs: add optional chunk_size parameter
[proxmox-backup.git] / src / backup / chunk_stream.rs
CommitLineData
dafc27ae
DM
1use failure::*;
2
3use proxmox_protocol::Chunker;
4use futures::{Async, Poll};
5use futures::stream::Stream;
6
0cc0fffd
DM
7use bytes::BytesMut;
8
8a7cc756 9/// Split input stream into dynamic sized chunks
c052be5c 10pub struct ChunkStream<S> {
dafc27ae
DM
11 input: S,
12 chunker: Chunker,
0cc0fffd
DM
13 buffer: BytesMut,
14 scan_pos: usize,
dafc27ae
DM
15}
16
c052be5c 17impl <S> ChunkStream<S> {
36898ffc
DM
18 pub fn new(input: S, chunk_size: Option<usize>) -> Self {
19 Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4*1024*1024)), buffer: BytesMut::new(), scan_pos: 0}
dafc27ae
DM
20 }
21}
22
c052be5c
DM
23impl <S> Stream for ChunkStream<S>
24 where S: Stream,
25 S::Item: AsRef<[u8]>,
26 S::Error: Into<Error>,
27{
dafc27ae 28
0cc0fffd 29 type Item = BytesMut;
dafc27ae
DM
30 type Error = Error;
31
0cc0fffd 32 fn poll(&mut self) -> Poll<Option<BytesMut>, Error> {
dafc27ae 33 loop {
3be3f3dc 34
0cc0fffd
DM
35 if self.scan_pos < self.buffer.len() {
36 let boundary = self.chunker.scan(&self.buffer[self.scan_pos..]);
37
38 let chunk_size = self.scan_pos + boundary;
3be3f3dc
DM
39
40 if boundary == 0 {
0cc0fffd 41 self.scan_pos = self.buffer.len();
3be3f3dc 42 // continue poll
0cc0fffd
DM
43 } else if chunk_size <= self.buffer.len() {
44 let result = self.buffer.split_to(chunk_size);
45 self.scan_pos = 0;
46 return Ok(Async::Ready(Some(result)));
3be3f3dc
DM
47 } else {
48 panic!("got unexpected chunk boundary from chunker");
49 }
50 }
51
dafc27ae
DM
52 match self.input.poll() {
53 Err(err) => {
c052be5c 54 return Err(err.into());
dafc27ae
DM
55 }
56 Ok(Async::NotReady) => {
57 return Ok(Async::NotReady);
58 }
59 Ok(Async::Ready(None)) => {
0cc0fffd
DM
60 self.scan_pos = 0;
61 if self.buffer.len() > 0 {
62 return Ok(Async::Ready(Some(self.buffer.take())));
ff77dbbe
DM
63 } else {
64 return Ok(Async::Ready(None));
65 }
dafc27ae 66 }
02fa54ff 67 Ok(Async::Ready(Some(data))) => {
0cc0fffd
DM
68 self.buffer.extend_from_slice(data.as_ref());
69 }
dafc27ae
DM
70 }
71 }
72 }
73}
8a7cc756
DM
74
75/// Split input stream into fixed sized chunks
169c0e06 76pub struct FixedChunkStream<S> {
8a7cc756
DM
77 input: S,
78 chunk_size: usize,
169c0e06 79 buffer: BytesMut,
8a7cc756
DM
80}
81
169c0e06 82impl <S> FixedChunkStream<S> {
8a7cc756
DM
83
84 pub fn new(input: S, chunk_size: usize) -> Self {
169c0e06 85 Self { input, chunk_size, buffer: BytesMut::new() }
8a7cc756
DM
86 }
87}
88
169c0e06
DM
89impl <S> Stream for FixedChunkStream<S>
90 where S: Stream,
91 S::Item: AsRef<[u8]>,
92{
8a7cc756 93
169c0e06
DM
94 type Item = BytesMut;
95 type Error = S::Error;
8a7cc756 96
169c0e06 97 fn poll(&mut self) -> Poll<Option<BytesMut>, S::Error> {
8a7cc756 98 loop {
169c0e06
DM
99
100 if self.buffer.len() == self.chunk_size {
101 return Ok(Async::Ready(Some(self.buffer.take())));
102 } else if self.buffer.len() > self.chunk_size {
103 let result = self.buffer.split_to(self.chunk_size);
104 return Ok(Async::Ready(Some(result)));
105 }
106
8a7cc756
DM
107 match self.input.poll() {
108 Err(err) => {
109 return Err(err);
110 }
111 Ok(Async::NotReady) => {
112 return Ok(Async::NotReady);
113 }
114 Ok(Async::Ready(None)) => {
115 // last chunk can have any size
169c0e06
DM
116 if self.buffer.len() > 0 {
117 return Ok(Async::Ready(Some(self.buffer.take())));
8a7cc756 118 } else {
169c0e06 119 return Ok(Async::Ready(None));
8a7cc756
DM
120 }
121 }
169c0e06
DM
122 Ok(Async::Ready(Some(data))) => {
123 self.buffer.extend_from_slice(data.as_ref());
124 }
8a7cc756
DM
125 }
126 }
127 }
128}