]>
Commit | Line | Data |
---|---|---|
32bef1e2 WB |
1 | use std::pin::Pin; |
2 | use std::task::{Context, Poll}; | |
3 | ||
7d83440c | 4 | use bytes::BytesMut; |
dafc27ae | 5 | use failure::*; |
32bef1e2 WB |
6 | use futures::ready; |
7 | use futures::stream::{Stream, TryStream}; | |
dafc27ae | 8 | |
7d83440c | 9 | use super::Chunker; |
0cc0fffd | 10 | |
8a7cc756 | 11 | /// Split input stream into dynamic sized chunks |
32bef1e2 | 12 | pub struct ChunkStream<S: Unpin> { |
dafc27ae DM |
13 | input: S, |
14 | chunker: Chunker, | |
0cc0fffd DM |
15 | buffer: BytesMut, |
16 | scan_pos: usize, | |
dafc27ae DM |
17 | } |
18 | ||
32bef1e2 | 19 | impl<S: Unpin> ChunkStream<S> { |
36898ffc DM |
20 | pub fn new(input: S, chunk_size: Option<usize>) -> Self { |
21 | Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4*1024*1024)), buffer: BytesMut::new(), scan_pos: 0} | |
dafc27ae DM |
22 | } |
23 | } | |
24 | ||
32bef1e2 WB |
25 | impl<S: Unpin> Unpin for ChunkStream<S> {} |
26 | ||
27 | impl<S: Unpin> Stream for ChunkStream<S> | |
28 | where | |
29 | S: TryStream, | |
30 | S::Ok: AsRef<[u8]>, | |
31 | S::Error: Into<Error>, | |
c052be5c | 32 | { |
dafc27ae | 33 | |
32bef1e2 | 34 | type Item = Result<BytesMut, Error>; |
dafc27ae | 35 | |
32bef1e2 WB |
36 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { |
37 | let this = self.get_mut(); | |
dafc27ae | 38 | loop { |
32bef1e2 WB |
39 | if this.scan_pos < this.buffer.len() { |
40 | let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]); | |
3be3f3dc | 41 | |
32bef1e2 | 42 | let chunk_size = this.scan_pos + boundary; |
3be3f3dc DM |
43 | |
44 | if boundary == 0 { | |
32bef1e2 | 45 | this.scan_pos = this.buffer.len(); |
3be3f3dc | 46 | // continue poll |
32bef1e2 WB |
47 | } else if chunk_size <= this.buffer.len() { |
48 | let result = this.buffer.split_to(chunk_size); | |
49 | this.scan_pos = 0; | |
50 | return Poll::Ready(Some(Ok(result))); | |
3be3f3dc DM |
51 | } else { |
52 | panic!("got unexpected chunk boundary from chunker"); | |
53 | } | |
54 | } | |
55 | ||
32bef1e2 WB |
56 | match ready!(Pin::new(&mut this.input).try_poll_next(cx)) { |
57 | Some(Err(err)) => { | |
58 | return Poll::Ready(Some(Err(err.into()))); | |
dafc27ae | 59 | } |
32bef1e2 WB |
60 | None => { |
61 | this.scan_pos = 0; | |
62 | if this.buffer.len() > 0 { | |
db0cb9ce | 63 | return Poll::Ready(Some(Ok(this.buffer.split()))); |
ff77dbbe | 64 | } else { |
32bef1e2 | 65 | return Poll::Ready(None); |
ff77dbbe | 66 | } |
dafc27ae | 67 | } |
32bef1e2 WB |
68 | Some(Ok(data)) => { |
69 | this.buffer.extend_from_slice(data.as_ref()); | |
70 | } | |
dafc27ae DM |
71 | } |
72 | } | |
73 | } | |
74 | } | |
8a7cc756 DM |
75 | |
76 | /// Split input stream into fixed sized chunks | |
32bef1e2 | 77 | pub struct FixedChunkStream<S: Unpin> { |
8a7cc756 DM |
78 | input: S, |
79 | chunk_size: usize, | |
169c0e06 | 80 | buffer: BytesMut, |
8a7cc756 DM |
81 | } |
82 | ||
32bef1e2 | 83 | impl<S: Unpin> FixedChunkStream<S> { |
8a7cc756 | 84 | pub fn new(input: S, chunk_size: usize) -> Self { |
169c0e06 | 85 | Self { input, chunk_size, buffer: BytesMut::new() } |
8a7cc756 DM |
86 | } |
87 | } | |
88 | ||
32bef1e2 | 89 | impl<S: Unpin> Unpin for FixedChunkStream<S> {} |
8a7cc756 | 90 | |
32bef1e2 WB |
91 | impl<S: Unpin> Stream for FixedChunkStream<S> |
92 | where | |
93 | S: TryStream, | |
94 | S::Ok: AsRef<[u8]>, | |
95 | { | |
96 | type Item = Result<BytesMut, S::Error>; | |
8a7cc756 | 97 | |
32bef1e2 WB |
98 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<BytesMut, S::Error>>> { |
99 | let this = self.get_mut(); | |
8a7cc756 | 100 | loop { |
32bef1e2 | 101 | if this.buffer.len() == this.chunk_size { |
db0cb9ce | 102 | return Poll::Ready(Some(Ok(this.buffer.split()))); |
32bef1e2 WB |
103 | } else if this.buffer.len() > this.chunk_size { |
104 | let result = this.buffer.split_to(this.chunk_size); | |
105 | return Poll::Ready(Some(Ok(result))); | |
169c0e06 DM |
106 | } |
107 | ||
32bef1e2 WB |
108 | match ready!(Pin::new(&mut this.input).try_poll_next(cx)) { |
109 | Some(Err(err)) => { | |
110 | return Poll::Ready(Some(Err(err))); | |
8a7cc756 | 111 | } |
32bef1e2 | 112 | None => { |
8a7cc756 | 113 | // last chunk can have any size |
32bef1e2 | 114 | if this.buffer.len() > 0 { |
db0cb9ce | 115 | return Poll::Ready(Some(Ok(this.buffer.split()))); |
8a7cc756 | 116 | } else { |
32bef1e2 | 117 | return Poll::Ready(None); |
8a7cc756 DM |
118 | } |
119 | } | |
32bef1e2 WB |
120 | Some(Ok(data)) => { |
121 | this.buffer.extend_from_slice(data.as_ref()); | |
169c0e06 | 122 | } |
8a7cc756 DM |
123 | } |
124 | } | |
125 | } | |
126 | } |