2 use std
::task
::{Context, Poll}
;
7 use futures
::stream
::{Stream, TryStream}
;
11 /// Split input stream into dynamic sized chunks
12 pub struct ChunkStream
<S
: Unpin
> {
19 impl<S
: Unpin
> ChunkStream
<S
> {
20 pub fn new(input
: S
, chunk_size
: Option
<usize>) -> Self {
21 Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4*1024*1024)), buffer: BytesMut::new(), scan_pos: 0}
25 impl<S
: Unpin
> Unpin
for ChunkStream
<S
> {}
27 impl<S
: Unpin
> Stream
for ChunkStream
<S
>
31 S
::Error
: Into
<Error
>,
34 type Item
= Result
<BytesMut
, Error
>;
36 fn poll_next(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Option
<Self::Item
>> {
37 let this
= self.get_mut();
39 if this
.scan_pos
< this
.buffer
.len() {
40 let boundary
= this
.chunker
.scan(&this
.buffer
[this
.scan_pos
..]);
42 let chunk_size
= this
.scan_pos
+ boundary
;
45 this
.scan_pos
= this
.buffer
.len();
47 } else if chunk_size
<= this
.buffer
.len() {
48 let result
= this
.buffer
.split_to(chunk_size
);
50 return Poll
::Ready(Some(Ok(result
)));
52 panic
!("got unexpected chunk boundary from chunker");
56 match ready
!(Pin
::new(&mut this
.input
).try_poll_next(cx
)) {
58 return Poll
::Ready(Some(Err(err
.into())));
62 if this
.buffer
.len() > 0 {
63 return Poll
::Ready(Some(Ok(this
.buffer
.split())));
65 return Poll
::Ready(None
);
69 this
.buffer
.extend_from_slice(data
.as_ref());
76 /// Split input stream into fixed sized chunks
77 pub struct FixedChunkStream
<S
: Unpin
> {
83 impl<S
: Unpin
> FixedChunkStream
<S
> {
84 pub fn new(input
: S
, chunk_size
: usize) -> Self {
85 Self { input, chunk_size, buffer: BytesMut::new() }
89 impl<S
: Unpin
> Unpin
for FixedChunkStream
<S
> {}
91 impl<S
: Unpin
> Stream
for FixedChunkStream
<S
>
96 type Item
= Result
<BytesMut
, S
::Error
>;
98 fn poll_next(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Option
<Result
<BytesMut
, S
::Error
>>> {
99 let this
= self.get_mut();
101 if this
.buffer
.len() == this
.chunk_size
{
102 return Poll
::Ready(Some(Ok(this
.buffer
.split())));
103 } else if this
.buffer
.len() > this
.chunk_size
{
104 let result
= this
.buffer
.split_to(this
.chunk_size
);
105 return Poll
::Ready(Some(Ok(result
)));
108 match ready
!(Pin
::new(&mut this
.input
).try_poll_next(cx
)) {
110 return Poll
::Ready(Some(Err(err
)));
113 // last chunk can have any size
114 if this
.buffer
.len() > 0 {
115 return Poll
::Ready(Some(Ok(this
.buffer
.split())));
117 return Poll
::Ready(None
);
121 this
.buffer
.extend_from_slice(data
.as_ref());