]> git.proxmox.com Git - proxmox-backup.git/blame - src/backup/chunk_stream.rs
remove proxmox-protocol subcrate
[proxmox-backup.git] / src / backup / chunk_stream.rs
CommitLineData
7d83440c 1use bytes::BytesMut;
dafc27ae 2use failure::*;
dafc27ae 3use futures::stream::Stream;
7d83440c 4use futures::{Async, Poll};
dafc27ae 5
7d83440c 6use super::Chunker;
0cc0fffd 7
8a7cc756 8/// Split input stream into dynamic sized chunks
c052be5c 9pub struct ChunkStream<S> {
dafc27ae
DM
10 input: S,
11 chunker: Chunker,
0cc0fffd
DM
12 buffer: BytesMut,
13 scan_pos: usize,
dafc27ae
DM
14}
15
c052be5c 16impl <S> ChunkStream<S> {
36898ffc
DM
17 pub fn new(input: S, chunk_size: Option<usize>) -> Self {
18 Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4*1024*1024)), buffer: BytesMut::new(), scan_pos: 0}
dafc27ae
DM
19 }
20}
21
c052be5c
DM
22impl <S> Stream for ChunkStream<S>
23 where S: Stream,
24 S::Item: AsRef<[u8]>,
25 S::Error: Into<Error>,
26{
dafc27ae 27
0cc0fffd 28 type Item = BytesMut;
dafc27ae
DM
29 type Error = Error;
30
0cc0fffd 31 fn poll(&mut self) -> Poll<Option<BytesMut>, Error> {
dafc27ae 32 loop {
3be3f3dc 33
0cc0fffd
DM
34 if self.scan_pos < self.buffer.len() {
35 let boundary = self.chunker.scan(&self.buffer[self.scan_pos..]);
36
37 let chunk_size = self.scan_pos + boundary;
3be3f3dc
DM
38
39 if boundary == 0 {
0cc0fffd 40 self.scan_pos = self.buffer.len();
3be3f3dc 41 // continue poll
0cc0fffd
DM
42 } else if chunk_size <= self.buffer.len() {
43 let result = self.buffer.split_to(chunk_size);
44 self.scan_pos = 0;
45 return Ok(Async::Ready(Some(result)));
3be3f3dc
DM
46 } else {
47 panic!("got unexpected chunk boundary from chunker");
48 }
49 }
50
dafc27ae
DM
51 match self.input.poll() {
52 Err(err) => {
c052be5c 53 return Err(err.into());
dafc27ae
DM
54 }
55 Ok(Async::NotReady) => {
56 return Ok(Async::NotReady);
57 }
58 Ok(Async::Ready(None)) => {
0cc0fffd
DM
59 self.scan_pos = 0;
60 if self.buffer.len() > 0 {
61 return Ok(Async::Ready(Some(self.buffer.take())));
ff77dbbe
DM
62 } else {
63 return Ok(Async::Ready(None));
64 }
dafc27ae 65 }
02fa54ff 66 Ok(Async::Ready(Some(data))) => {
0cc0fffd
DM
67 self.buffer.extend_from_slice(data.as_ref());
68 }
dafc27ae
DM
69 }
70 }
71 }
72}
8a7cc756
DM
73
74/// Split input stream into fixed sized chunks
169c0e06 75pub struct FixedChunkStream<S> {
8a7cc756
DM
76 input: S,
77 chunk_size: usize,
169c0e06 78 buffer: BytesMut,
8a7cc756
DM
79}
80
169c0e06 81impl <S> FixedChunkStream<S> {
8a7cc756
DM
82
83 pub fn new(input: S, chunk_size: usize) -> Self {
169c0e06 84 Self { input, chunk_size, buffer: BytesMut::new() }
8a7cc756
DM
85 }
86}
87
169c0e06
DM
88impl <S> Stream for FixedChunkStream<S>
89 where S: Stream,
90 S::Item: AsRef<[u8]>,
91{
8a7cc756 92
169c0e06
DM
93 type Item = BytesMut;
94 type Error = S::Error;
8a7cc756 95
169c0e06 96 fn poll(&mut self) -> Poll<Option<BytesMut>, S::Error> {
8a7cc756 97 loop {
169c0e06
DM
98
99 if self.buffer.len() == self.chunk_size {
100 return Ok(Async::Ready(Some(self.buffer.take())));
101 } else if self.buffer.len() > self.chunk_size {
102 let result = self.buffer.split_to(self.chunk_size);
103 return Ok(Async::Ready(Some(result)));
104 }
105
8a7cc756
DM
106 match self.input.poll() {
107 Err(err) => {
108 return Err(err);
109 }
110 Ok(Async::NotReady) => {
111 return Ok(Async::NotReady);
112 }
113 Ok(Async::Ready(None)) => {
114 // last chunk can have any size
169c0e06
DM
115 if self.buffer.len() > 0 {
116 return Ok(Async::Ready(Some(self.buffer.take())));
8a7cc756 117 } else {
169c0e06 118 return Ok(Async::Ready(None));
8a7cc756
DM
119 }
120 }
169c0e06
DM
121 Ok(Async::Ready(Some(data))) => {
122 self.buffer.extend_from_slice(data.as_ref());
123 }
8a7cc756
DM
124 }
125 }
126 }
127}