3 use futures
::stream
::Stream
;
4 use futures
::{Async, Poll}
;
// Stream adapter generic over the wrapped input stream type `S`.
// NOTE(review): the field list is not part of this view; the constructor
// initializes `input`, `chunker`, `buffer` and `scan_pos`.
8 /// Split input stream into dynamic sized chunks
9 pub struct ChunkStream
<S
> {
// Inherent methods of `ChunkStream`; no bounds are placed on `S` here.
16 impl <S
> ChunkStream
<S
> {
// Wraps `input` in a new `ChunkStream`.
//
// `chunk_size` is forwarded to `Chunker::new`; when it is `None` the value
// 4*1024*1024 is used instead (presumably a size in bytes, i.e. 4 MiB -
// TODO confirm against `Chunker`). The internal buffer starts out empty and
// `scan_pos` starts at 0.
17 pub fn new(input
: S
, chunk_size
: Option
<usize>) -> Self {
18 Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4*1024*1024)), buffer: BytesMut::new(), scan_pos: 0}
// `Stream` implementation for `ChunkStream`: buffers data from the wrapped
// stream and hands out `BytesMut` chunks cut at the boundaries reported by
// `self.chunker`.
// NOTE(review): several lines of this impl (start of the `where` clause,
// associated types, loop/branch headers, closing braces) are not visible in
// this view; comments below only describe what is shown.
22 impl <S
> Stream
for ChunkStream
<S
>
// Errors of the wrapped stream must be convertible into `Error`, because
// `poll` reports them via `err.into()`.
25 S
::Error
: Into
<Error
>,
// Produces the next chunk, `NotReady` while the wrapped stream has no data,
// or `None` once the wrapped stream has ended and the buffer is drained.
31 fn poll(&mut self) -> Poll
<Option
<BytesMut
>, Error
> {
// Only ask the chunker when there are buffered bytes at or after `scan_pos`.
34 if self.scan_pos
< self.buffer
.len() {
// The chunker only sees the part of the buffer starting at `scan_pos`, so
// the value it returns is relative to that offset.
35 let boundary
= self.chunker
.scan(&self.buffer
[self.scan_pos
..]);
// Candidate chunk length measured from the start of the buffer (it is later
// passed to `split_to`).
37 let chunk_size
= self.scan_pos
+ boundary
;
// NOTE(review): the condition guarding this assignment is not visible here.
// It moves `scan_pos` to the end of the buffered data.
40 self.scan_pos
= self.buffer
.len();
// The chunk end lies within the buffered data: split off the first
// `chunk_size` bytes and return them as the next item.
42 } else if chunk_size
<= self.buffer
.len() {
43 let result
= self.buffer
.split_to(chunk_size
);
45 return Ok(Async
::Ready(Some(result
)));
// Presumably the final `else` branch: a chunk end beyond the buffered data
// is treated as a bug in the chunker and aborts with a panic.
47 panic
!("got unexpected chunk boundary from chunker");
// No chunk could be returned from the buffer: poll the wrapped stream.
51 match self.input
.poll() {
// Error from the wrapped stream: convert it into `Error` and propagate.
53 return Err(err
.into());
// Wrapped stream has nothing available yet: report `NotReady` as well.
55 Ok(Async
::NotReady
) => {
56 return Ok(Async
::NotReady
);
// Wrapped stream has ended.
58 Ok(Async
::Ready(None
)) => {
// Bytes still buffered are handed out via `take()` as a last chunk.
60 if self.buffer
.len() > 0 {
61 return Ok(Async
::Ready(Some(self.buffer
.take())));
// Otherwise signal the end of this stream.
63 return Ok(Async
::Ready(None
));
// New data arrived: append its bytes to the buffer (presumably the enclosing
// loop then scans again - the loop header is not visible in this view).
66 Ok(Async
::Ready(Some(data
))) => {
67 self.buffer
.extend_from_slice(data
.as_ref());
// Stream adapter generic over the wrapped input stream type `S`.
// NOTE(review): the field list is not part of this view; the constructor
// initializes `input`, `chunk_size` and `buffer`.
74 /// Split input stream into fixed sized chunks
75 pub struct FixedChunkStream
<S
> {
// Inherent methods of `FixedChunkStream`; no bounds are placed on `S` here.
81 impl <S
> FixedChunkStream
<S
> {
// Wraps `input` in a new `FixedChunkStream` that stores `chunk_size` as given
// and starts with an empty buffer.
83 pub fn new(input
: S
, chunk_size
: usize) -> Self {
84 Self { input, chunk_size, buffer: BytesMut::new() }
// `Stream` implementation for `FixedChunkStream`: buffers data from the
// wrapped stream and hands out `BytesMut` chunks of `self.chunk_size` bytes.
// NOTE(review): parts of this impl (`where` clause, `type Item`, loop header,
// the error arm of the match, closing braces) are not visible in this view.
88 impl <S
> Stream
for FixedChunkStream
<S
>
// The error type is the wrapped stream's own error type.
94 type Error
= S
::Error
;
// Produces the next chunk, `NotReady` while the wrapped stream has no data,
// or `None` once the wrapped stream has ended and the buffer is drained.
96 fn poll(&mut self) -> Poll
<Option
<BytesMut
>, S
::Error
> {
// Buffer holds exactly one chunk: hand out the buffer contents via `take()`.
99 if self.buffer
.len() == self.chunk_size
{
100 return Ok(Async
::Ready(Some(self.buffer
.take())));
// Buffer holds more than one chunk: split off the first `chunk_size` bytes
// and keep the remainder buffered.
101 } else if self.buffer
.len() > self.chunk_size
{
102 let result
= self.buffer
.split_to(self.chunk_size
);
103 return Ok(Async
::Ready(Some(result
)));
// Fewer than `chunk_size` bytes buffered: poll the wrapped stream.
106 match self.input
.poll() {
// Wrapped stream has nothing available yet: report `NotReady` as well.
110 Ok(Async
::NotReady
) => {
111 return Ok(Async
::NotReady
);
// Wrapped stream has ended.
113 Ok(Async
::Ready(None
)) => {
114 // last chunk can have any size
115 if self.buffer
.len() > 0 {
116 return Ok(Async
::Ready(Some(self.buffer
.take())));
// Nothing left in the buffer: signal the end of this stream.
118 return Ok(Async
::Ready(None
));
// New data arrived: append its bytes to the buffer (presumably the enclosing
// loop then re-checks the buffer length - the loop header is not visible).
121 Ok(Async
::Ready(Some(data
))) => {
122 self.buffer
.extend_from_slice(data
.as_ref());