]>
Commit | Line | Data |
---|---|---|
dafc27ae DM |
1 | use failure::*; |
2 | ||
3 | use proxmox_protocol::Chunker; | |
4 | use futures::{Async, Poll}; | |
5 | use futures::stream::Stream; | |
6 | ||
0cc0fffd DM |
7 | use bytes::BytesMut; |
8 | ||
8a7cc756 | 9 | /// Split input stream into dynamic sized chunks |
c052be5c | 10 | pub struct ChunkStream<S> { |
dafc27ae DM |
11 | input: S, |
12 | chunker: Chunker, | |
0cc0fffd DM |
13 | buffer: BytesMut, |
14 | scan_pos: usize, | |
dafc27ae DM |
15 | } |
16 | ||
c052be5c | 17 | impl <S> ChunkStream<S> { |
36898ffc DM |
18 | pub fn new(input: S, chunk_size: Option<usize>) -> Self { |
19 | Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4*1024*1024)), buffer: BytesMut::new(), scan_pos: 0} | |
dafc27ae DM |
20 | } |
21 | } | |
22 | ||
c052be5c DM |
23 | impl <S> Stream for ChunkStream<S> |
24 | where S: Stream, | |
25 | S::Item: AsRef<[u8]>, | |
26 | S::Error: Into<Error>, | |
27 | { | |
dafc27ae | 28 | |
0cc0fffd | 29 | type Item = BytesMut; |
dafc27ae DM |
30 | type Error = Error; |
31 | ||
0cc0fffd | 32 | fn poll(&mut self) -> Poll<Option<BytesMut>, Error> { |
dafc27ae | 33 | loop { |
3be3f3dc | 34 | |
0cc0fffd DM |
35 | if self.scan_pos < self.buffer.len() { |
36 | let boundary = self.chunker.scan(&self.buffer[self.scan_pos..]); | |
37 | ||
38 | let chunk_size = self.scan_pos + boundary; | |
3be3f3dc DM |
39 | |
40 | if boundary == 0 { | |
0cc0fffd | 41 | self.scan_pos = self.buffer.len(); |
3be3f3dc | 42 | // continue poll |
0cc0fffd DM |
43 | } else if chunk_size <= self.buffer.len() { |
44 | let result = self.buffer.split_to(chunk_size); | |
45 | self.scan_pos = 0; | |
46 | return Ok(Async::Ready(Some(result))); | |
3be3f3dc DM |
47 | } else { |
48 | panic!("got unexpected chunk boundary from chunker"); | |
49 | } | |
50 | } | |
51 | ||
dafc27ae DM |
52 | match self.input.poll() { |
53 | Err(err) => { | |
c052be5c | 54 | return Err(err.into()); |
dafc27ae DM |
55 | } |
56 | Ok(Async::NotReady) => { | |
57 | return Ok(Async::NotReady); | |
58 | } | |
59 | Ok(Async::Ready(None)) => { | |
0cc0fffd DM |
60 | self.scan_pos = 0; |
61 | if self.buffer.len() > 0 { | |
62 | return Ok(Async::Ready(Some(self.buffer.take()))); | |
ff77dbbe DM |
63 | } else { |
64 | return Ok(Async::Ready(None)); | |
65 | } | |
dafc27ae | 66 | } |
02fa54ff | 67 | Ok(Async::Ready(Some(data))) => { |
0cc0fffd DM |
68 | self.buffer.extend_from_slice(data.as_ref()); |
69 | } | |
dafc27ae DM |
70 | } |
71 | } | |
72 | } | |
73 | } | |
8a7cc756 DM |
74 | |
75 | /// Split input stream into fixed sized chunks | |
169c0e06 | 76 | pub struct FixedChunkStream<S> { |
8a7cc756 DM |
77 | input: S, |
78 | chunk_size: usize, | |
169c0e06 | 79 | buffer: BytesMut, |
8a7cc756 DM |
80 | } |
81 | ||
169c0e06 | 82 | impl <S> FixedChunkStream<S> { |
8a7cc756 DM |
83 | |
84 | pub fn new(input: S, chunk_size: usize) -> Self { | |
169c0e06 | 85 | Self { input, chunk_size, buffer: BytesMut::new() } |
8a7cc756 DM |
86 | } |
87 | } | |
88 | ||
169c0e06 DM |
89 | impl <S> Stream for FixedChunkStream<S> |
90 | where S: Stream, | |
91 | S::Item: AsRef<[u8]>, | |
92 | { | |
8a7cc756 | 93 | |
169c0e06 DM |
94 | type Item = BytesMut; |
95 | type Error = S::Error; | |
8a7cc756 | 96 | |
169c0e06 | 97 | fn poll(&mut self) -> Poll<Option<BytesMut>, S::Error> { |
8a7cc756 | 98 | loop { |
169c0e06 DM |
99 | |
100 | if self.buffer.len() == self.chunk_size { | |
101 | return Ok(Async::Ready(Some(self.buffer.take()))); | |
102 | } else if self.buffer.len() > self.chunk_size { | |
103 | let result = self.buffer.split_to(self.chunk_size); | |
104 | return Ok(Async::Ready(Some(result))); | |
105 | } | |
106 | ||
8a7cc756 DM |
107 | match self.input.poll() { |
108 | Err(err) => { | |
109 | return Err(err); | |
110 | } | |
111 | Ok(Async::NotReady) => { | |
112 | return Ok(Async::NotReady); | |
113 | } | |
114 | Ok(Async::Ready(None)) => { | |
115 | // last chunk can have any size | |
169c0e06 DM |
116 | if self.buffer.len() > 0 { |
117 | return Ok(Async::Ready(Some(self.buffer.take()))); | |
8a7cc756 | 118 | } else { |
169c0e06 | 119 | return Ok(Async::Ready(None)); |
8a7cc756 DM |
120 | } |
121 | } | |
169c0e06 DM |
122 | Ok(Async::Ready(Some(data))) => { |
123 | self.buffer.extend_from_slice(data.as_ref()); | |
124 | } | |
8a7cc756 DM |
125 | } |
126 | } | |
127 | } | |
128 | } |