]>
Commit | Line | Data |
---|---|---|
7d83440c | 1 | use bytes::BytesMut; |
dafc27ae | 2 | use failure::*; |
dafc27ae | 3 | use futures::stream::Stream; |
7d83440c | 4 | use futures::{Async, Poll}; |
dafc27ae | 5 | |
7d83440c | 6 | use super::Chunker; |
0cc0fffd | 7 | |
8a7cc756 | 8 | /// Split input stream into dynamic sized chunks |
c052be5c | 9 | pub struct ChunkStream<S> { |
dafc27ae DM |
10 | input: S, |
11 | chunker: Chunker, | |
0cc0fffd DM |
12 | buffer: BytesMut, |
13 | scan_pos: usize, | |
dafc27ae DM |
14 | } |
15 | ||
c052be5c | 16 | impl <S> ChunkStream<S> { |
36898ffc DM |
17 | pub fn new(input: S, chunk_size: Option<usize>) -> Self { |
18 | Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4*1024*1024)), buffer: BytesMut::new(), scan_pos: 0} | |
dafc27ae DM |
19 | } |
20 | } | |
21 | ||
c052be5c DM |
22 | impl <S> Stream for ChunkStream<S> |
23 | where S: Stream, | |
24 | S::Item: AsRef<[u8]>, | |
25 | S::Error: Into<Error>, | |
26 | { | |
dafc27ae | 27 | |
0cc0fffd | 28 | type Item = BytesMut; |
dafc27ae DM |
29 | type Error = Error; |
30 | ||
0cc0fffd | 31 | fn poll(&mut self) -> Poll<Option<BytesMut>, Error> { |
dafc27ae | 32 | loop { |
3be3f3dc | 33 | |
0cc0fffd DM |
34 | if self.scan_pos < self.buffer.len() { |
35 | let boundary = self.chunker.scan(&self.buffer[self.scan_pos..]); | |
36 | ||
37 | let chunk_size = self.scan_pos + boundary; | |
3be3f3dc DM |
38 | |
39 | if boundary == 0 { | |
0cc0fffd | 40 | self.scan_pos = self.buffer.len(); |
3be3f3dc | 41 | // continue poll |
0cc0fffd DM |
42 | } else if chunk_size <= self.buffer.len() { |
43 | let result = self.buffer.split_to(chunk_size); | |
44 | self.scan_pos = 0; | |
45 | return Ok(Async::Ready(Some(result))); | |
3be3f3dc DM |
46 | } else { |
47 | panic!("got unexpected chunk boundary from chunker"); | |
48 | } | |
49 | } | |
50 | ||
dafc27ae DM |
51 | match self.input.poll() { |
52 | Err(err) => { | |
c052be5c | 53 | return Err(err.into()); |
dafc27ae DM |
54 | } |
55 | Ok(Async::NotReady) => { | |
56 | return Ok(Async::NotReady); | |
57 | } | |
58 | Ok(Async::Ready(None)) => { | |
0cc0fffd DM |
59 | self.scan_pos = 0; |
60 | if self.buffer.len() > 0 { | |
61 | return Ok(Async::Ready(Some(self.buffer.take()))); | |
ff77dbbe DM |
62 | } else { |
63 | return Ok(Async::Ready(None)); | |
64 | } | |
dafc27ae | 65 | } |
02fa54ff | 66 | Ok(Async::Ready(Some(data))) => { |
0cc0fffd DM |
67 | self.buffer.extend_from_slice(data.as_ref()); |
68 | } | |
dafc27ae DM |
69 | } |
70 | } | |
71 | } | |
72 | } | |
8a7cc756 DM |
73 | |
74 | /// Split input stream into fixed sized chunks | |
169c0e06 | 75 | pub struct FixedChunkStream<S> { |
8a7cc756 DM |
76 | input: S, |
77 | chunk_size: usize, | |
169c0e06 | 78 | buffer: BytesMut, |
8a7cc756 DM |
79 | } |
80 | ||
169c0e06 | 81 | impl <S> FixedChunkStream<S> { |
8a7cc756 DM |
82 | |
83 | pub fn new(input: S, chunk_size: usize) -> Self { | |
169c0e06 | 84 | Self { input, chunk_size, buffer: BytesMut::new() } |
8a7cc756 DM |
85 | } |
86 | } | |
87 | ||
169c0e06 DM |
88 | impl <S> Stream for FixedChunkStream<S> |
89 | where S: Stream, | |
90 | S::Item: AsRef<[u8]>, | |
91 | { | |
8a7cc756 | 92 | |
169c0e06 DM |
93 | type Item = BytesMut; |
94 | type Error = S::Error; | |
8a7cc756 | 95 | |
169c0e06 | 96 | fn poll(&mut self) -> Poll<Option<BytesMut>, S::Error> { |
8a7cc756 | 97 | loop { |
169c0e06 DM |
98 | |
99 | if self.buffer.len() == self.chunk_size { | |
100 | return Ok(Async::Ready(Some(self.buffer.take()))); | |
101 | } else if self.buffer.len() > self.chunk_size { | |
102 | let result = self.buffer.split_to(self.chunk_size); | |
103 | return Ok(Async::Ready(Some(result))); | |
104 | } | |
105 | ||
8a7cc756 DM |
106 | match self.input.poll() { |
107 | Err(err) => { | |
108 | return Err(err); | |
109 | } | |
110 | Ok(Async::NotReady) => { | |
111 | return Ok(Async::NotReady); | |
112 | } | |
113 | Ok(Async::Ready(None)) => { | |
114 | // last chunk can have any size | |
169c0e06 DM |
115 | if self.buffer.len() > 0 { |
116 | return Ok(Async::Ready(Some(self.buffer.take()))); | |
8a7cc756 | 117 | } else { |
169c0e06 | 118 | return Ok(Async::Ready(None)); |
8a7cc756 DM |
119 | } |
120 | } | |
169c0e06 DM |
121 | Ok(Async::Ready(Some(data))) => { |
122 | self.buffer.extend_from_slice(data.as_ref()); | |
123 | } | |
8a7cc756 DM |
124 | } |
125 | } | |
126 | } | |
127 | } |