3 use proxmox_protocol
::Chunker
;
4 use futures
::{Async, Poll}
;
5 use futures
::stream
::Stream
;
// Wraps an input stream `S` and re-cuts its data into variable-sized chunks.
// NOTE(review): the field list is not visible in this excerpt; the constructor
// initializes `input`, `chunker`, `buffer` and `scan_pos`.
9 /// Split input stream into dynamic sized chunks
10 pub struct ChunkStream
<S
> {
// Inherent methods of `ChunkStream`.
17 impl <S
> ChunkStream
<S
> {
// Builds a `ChunkStream` around `input`.
//
// The stream starts with an empty `BytesMut` buffer and a scan position of 0.
// The chunker is created with the value 4 * 1024 * 1024 (4 MiB).
// NOTE(review): presumably this is the target/average chunk size -- confirm
// against `Chunker::new`.
18 pub fn new(input
: S
) -> Self {
19 Self { input, chunker: Chunker::new(4 * 1024 * 1024), buffer: BytesMut::new(), scan_pos: 0}
// `Stream` implementation: yields `BytesMut` chunks cut at the boundaries
// reported by the chunker. Errors of the input stream are converted with
// `Into<Error>`.
23 impl <S
> Stream
for ChunkStream
<S
>
26 S
::Error
: Into
<Error
>,
// Polls for the next chunk.
//
// Returns `Ready(Some(chunk))` when a chunk can be split off the buffer (or
// when the input ended and bytes are left over), `Ready(None)` when the input
// ended with an empty buffer, `NotReady` when the input is not ready, and
// `Err` with the converted input error otherwise.
//
// Panics if the chunker reports a boundary past the end of the buffered data.
32 fn poll(&mut self) -> Poll
<Option
<BytesMut
>, Error
> {
// Only scan when the buffer holds bytes that were not scanned yet.
35 if self.scan_pos
< self.buffer
.len() {
// Feed just the unscanned tail of the buffer to the chunker.
36 let boundary
= self.chunker
.scan(&self.buffer
[self.scan_pos
..]);
// `boundary` is relative to the scanned slice; adding `scan_pos` turns it
// into an offset from the start of the buffer.
38 let chunk_size
= self.scan_pos
+ boundary
;
// Mark everything buffered so far as scanned.
// NOTE(review): the condition guarding this assignment is not visible in
// this excerpt -- presumably "no boundary found"; confirm.
41 self.scan_pos
= self.buffer
.len();
// The boundary lies inside the buffered data: split the chunk off the front
// of the buffer and hand it out.
43 } else if chunk_size
<= self.buffer
.len() {
44 let result
= self.buffer
.split_to(chunk_size
);
46 return Ok(Async
::Ready(Some(result
)));
// Reached when neither check above applies (the enclosing `else` is not
// visible here): a boundary beyond the buffered data is treated as a bug.
48 panic
!("got unexpected chunk boundary from chunker");
// No chunk could be produced from the buffer: ask the input for more data.
52 match self.input
.poll() {
// Input error: convert it into `Error` and return it.
54 return Err(err
.into());
// Input has nothing available right now.
56 Ok(Async
::NotReady
) => {
57 return Ok(Async
::NotReady
);
// Input is exhausted: return whatever is left in the buffer as the final
// chunk, or signal the end of the stream when nothing is left.
59 Ok(Async
::Ready(None
)) => {
61 if self.buffer
.len() > 0 {
62 return Ok(Async
::Ready(Some(self.buffer
.take())));
64 return Ok(Async
::Ready(None
));
// New data arrived: append it to the buffer.
// NOTE(review): presumably an enclosing loop then rescans the buffer -- the
// loop is not visible in this excerpt; confirm.
67 Ok(Async
::Ready(Some(data
))) => {
68 self.buffer
.extend_from_slice(data
.as_ref());
// Wraps an input stream `S` and re-cuts its data into chunks of a fixed size.
// NOTE(review): the field list is not visible in this excerpt; the constructor
// initializes `input`, `chunk_size` and `buffer`.
75 /// Split input stream into fixed sized chunks
76 pub struct FixedChunkStream
<S
> {
// Inherent methods of `FixedChunkStream`.
82 impl <S
> FixedChunkStream
<S
> {
// Builds a `FixedChunkStream` around `input` that emits chunks of
// `chunk_size` bytes. The internal buffer starts out empty.
// NOTE(review): `chunk_size` is stored unchecked; a value of 0 is not
// rejected here.
84 pub fn new(input
: S
, chunk_size
: usize) -> Self {
85 Self { input, chunk_size, buffer: BytesMut::new() }
// `Stream` implementation: yields `BytesMut` chunks of `chunk_size` bytes.
89 impl <S
> Stream
for FixedChunkStream
<S
>
// Errors use the input stream's own error type.
95 type Error
= S
::Error
;
// Polls for the next chunk.
//
// Returns `Ready(Some(chunk))` when at least `chunk_size` bytes are buffered
// (or when the input ended and bytes are left over), `Ready(None)` when the
// input ended with an empty buffer, and `NotReady` when the input is not
// ready.
//
// NOTE(review): with `chunk_size == 0` the first check below matches an empty
// buffer and returns an empty chunk without polling the input -- confirm that
// callers never pass 0.
97 fn poll(&mut self) -> Poll
<Option
<BytesMut
>, S
::Error
> {
// Exactly one chunk is buffered: hand out the whole buffer.
100 if self.buffer
.len() == self.chunk_size
{
101 return Ok(Async
::Ready(Some(self.buffer
.take())));
// More than one chunk is buffered: split `chunk_size` bytes off the front and
// keep the remainder for later polls.
102 } else if self.buffer
.len() > self.chunk_size
{
103 let result
= self.buffer
.split_to(self.chunk_size
);
104 return Ok(Async
::Ready(Some(result
)));
// Fewer than `chunk_size` bytes are buffered: ask the input for more data.
107 match self.input
.poll() {
// Input has nothing available right now.
111 Ok(Async
::NotReady
) => {
112 return Ok(Async
::NotReady
);
// Input is exhausted: return the remaining bytes as a final chunk, or signal
// the end of the stream when the buffer is empty.
114 Ok(Async
::Ready(None
)) => {
115 // last chunk can have any size
116 if self.buffer
.len() > 0 {
117 return Ok(Async
::Ready(Some(self.buffer
.take())));
119 return Ok(Async
::Ready(None
));
// New data arrived: append it to the buffer.
// NOTE(review): presumably an enclosing loop then re-checks the buffer size --
// the loop is not visible in this excerpt; confirm.
122 Ok(Async
::Ready(Some(data
))) => {
123 self.buffer
.extend_from_slice(data
.as_ref());