]> git.proxmox.com Git - proxmox-backup.git/blob - src/backup/chunk_stream.rs
remove proxmox-protocol subcrate
[proxmox-backup.git] / src / backup / chunk_stream.rs
1 use bytes::BytesMut;
2 use failure::*;
3 use futures::stream::Stream;
4 use futures::{Async, Poll};
5
6 use super::Chunker;
7
8 /// Split input stream into dynamic sized chunks
9 pub struct ChunkStream<S> {
10 input: S,
11 chunker: Chunker,
12 buffer: BytesMut,
13 scan_pos: usize,
14 }
15
16 impl <S> ChunkStream<S> {
17 pub fn new(input: S, chunk_size: Option<usize>) -> Self {
18 Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4*1024*1024)), buffer: BytesMut::new(), scan_pos: 0}
19 }
20 }
21
22 impl <S> Stream for ChunkStream<S>
23 where S: Stream,
24 S::Item: AsRef<[u8]>,
25 S::Error: Into<Error>,
26 {
27
28 type Item = BytesMut;
29 type Error = Error;
30
31 fn poll(&mut self) -> Poll<Option<BytesMut>, Error> {
32 loop {
33
34 if self.scan_pos < self.buffer.len() {
35 let boundary = self.chunker.scan(&self.buffer[self.scan_pos..]);
36
37 let chunk_size = self.scan_pos + boundary;
38
39 if boundary == 0 {
40 self.scan_pos = self.buffer.len();
41 // continue poll
42 } else if chunk_size <= self.buffer.len() {
43 let result = self.buffer.split_to(chunk_size);
44 self.scan_pos = 0;
45 return Ok(Async::Ready(Some(result)));
46 } else {
47 panic!("got unexpected chunk boundary from chunker");
48 }
49 }
50
51 match self.input.poll() {
52 Err(err) => {
53 return Err(err.into());
54 }
55 Ok(Async::NotReady) => {
56 return Ok(Async::NotReady);
57 }
58 Ok(Async::Ready(None)) => {
59 self.scan_pos = 0;
60 if self.buffer.len() > 0 {
61 return Ok(Async::Ready(Some(self.buffer.take())));
62 } else {
63 return Ok(Async::Ready(None));
64 }
65 }
66 Ok(Async::Ready(Some(data))) => {
67 self.buffer.extend_from_slice(data.as_ref());
68 }
69 }
70 }
71 }
72 }
73
74 /// Split input stream into fixed sized chunks
75 pub struct FixedChunkStream<S> {
76 input: S,
77 chunk_size: usize,
78 buffer: BytesMut,
79 }
80
81 impl <S> FixedChunkStream<S> {
82
83 pub fn new(input: S, chunk_size: usize) -> Self {
84 Self { input, chunk_size, buffer: BytesMut::new() }
85 }
86 }
87
88 impl <S> Stream for FixedChunkStream<S>
89 where S: Stream,
90 S::Item: AsRef<[u8]>,
91 {
92
93 type Item = BytesMut;
94 type Error = S::Error;
95
96 fn poll(&mut self) -> Poll<Option<BytesMut>, S::Error> {
97 loop {
98
99 if self.buffer.len() == self.chunk_size {
100 return Ok(Async::Ready(Some(self.buffer.take())));
101 } else if self.buffer.len() > self.chunk_size {
102 let result = self.buffer.split_to(self.chunk_size);
103 return Ok(Async::Ready(Some(result)));
104 }
105
106 match self.input.poll() {
107 Err(err) => {
108 return Err(err);
109 }
110 Ok(Async::NotReady) => {
111 return Ok(Async::NotReady);
112 }
113 Ok(Async::Ready(None)) => {
114 // last chunk can have any size
115 if self.buffer.len() > 0 {
116 return Ok(Async::Ready(Some(self.buffer.take())));
117 } else {
118 return Ok(Async::Ready(None));
119 }
120 }
121 Ok(Async::Ready(Some(data))) => {
122 self.buffer.extend_from_slice(data.as_ref());
123 }
124 }
125 }
126 }
127 }