+use std::pin::Pin;
+use std::task::{Context, Poll};
+
use bytes::BytesMut;
use failure::*;
-use futures::stream::Stream;
-use futures::{Async, Poll};
+use futures::ready;
+use futures::stream::{Stream, TryStream};
use super::Chunker;
/// Split input stream into dynamic sized chunks
-pub struct ChunkStream<S> {
+pub struct ChunkStream<S: Unpin> {
input: S,
chunker: Chunker,
buffer: BytesMut,
scan_pos: usize,
}
-impl <S> ChunkStream<S> {
+impl<S: Unpin> ChunkStream<S> {
pub fn new(input: S, chunk_size: Option<usize>) -> Self {
Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4*1024*1024)), buffer: BytesMut::new(), scan_pos: 0}
}
}
-impl <S> Stream for ChunkStream<S>
- where S: Stream,
- S::Item: AsRef<[u8]>,
- S::Error: Into<Error>,
+impl<S: Unpin> Unpin for ChunkStream<S> {}
+
+impl<S: Unpin> Stream for ChunkStream<S>
+where
+ S: TryStream,
+ S::Ok: AsRef<[u8]>,
+ S::Error: Into<Error>,
{
- type Item = BytesMut;
- type Error = Error;
+ type Item = Result<BytesMut, Error>;
- fn poll(&mut self) -> Poll<Option<BytesMut>, Error> {
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+ let this = self.get_mut();
loop {
+ if this.scan_pos < this.buffer.len() {
+ let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
- if self.scan_pos < self.buffer.len() {
- let boundary = self.chunker.scan(&self.buffer[self.scan_pos..]);
-
- let chunk_size = self.scan_pos + boundary;
+ let chunk_size = this.scan_pos + boundary;
if boundary == 0 {
- self.scan_pos = self.buffer.len();
+ this.scan_pos = this.buffer.len();
// continue poll
- } else if chunk_size <= self.buffer.len() {
- let result = self.buffer.split_to(chunk_size);
- self.scan_pos = 0;
- return Ok(Async::Ready(Some(result)));
+ } else if chunk_size <= this.buffer.len() {
+ let result = this.buffer.split_to(chunk_size);
+ this.scan_pos = 0;
+ return Poll::Ready(Some(Ok(result)));
} else {
panic!("got unexpected chunk boundary from chunker");
}
}
- match self.input.poll() {
- Err(err) => {
- return Err(err.into());
+ match ready!(Pin::new(&mut this.input).try_poll_next(cx)) {
+ Some(Err(err)) => {
+ return Poll::Ready(Some(Err(err.into())));
}
- Ok(Async::NotReady) => {
- return Ok(Async::NotReady);
- }
- Ok(Async::Ready(None)) => {
- self.scan_pos = 0;
- if self.buffer.len() > 0 {
- return Ok(Async::Ready(Some(self.buffer.take())));
+ None => {
+ this.scan_pos = 0;
+ if this.buffer.len() > 0 {
+ return Poll::Ready(Some(Ok(this.buffer.take())));
} else {
- return Ok(Async::Ready(None));
+ return Poll::Ready(None);
}
}
- Ok(Async::Ready(Some(data))) => {
- self.buffer.extend_from_slice(data.as_ref());
- }
+ Some(Ok(data)) => {
+ this.buffer.extend_from_slice(data.as_ref());
+ }
}
}
}
}
/// Split input stream into fixed sized chunks
-pub struct FixedChunkStream<S> {
+pub struct FixedChunkStream<S: Unpin> {
input: S,
chunk_size: usize,
buffer: BytesMut,
}
-impl <S> FixedChunkStream<S> {
-
+impl<S: Unpin> FixedChunkStream<S> {
pub fn new(input: S, chunk_size: usize) -> Self {
Self { input, chunk_size, buffer: BytesMut::new() }
}
}
-impl <S> Stream for FixedChunkStream<S>
- where S: Stream,
- S::Item: AsRef<[u8]>,
-{
+impl<S: Unpin> Unpin for FixedChunkStream<S> {}
- type Item = BytesMut;
- type Error = S::Error;
+impl<S: Unpin> Stream for FixedChunkStream<S>
+where
+ S: TryStream,
+ S::Ok: AsRef<[u8]>,
+{
+ type Item = Result<BytesMut, S::Error>;
- fn poll(&mut self) -> Poll<Option<BytesMut>, S::Error> {
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<BytesMut, S::Error>>> {
+ let this = self.get_mut();
loop {
-
- if self.buffer.len() == self.chunk_size {
- return Ok(Async::Ready(Some(self.buffer.take())));
- } else if self.buffer.len() > self.chunk_size {
- let result = self.buffer.split_to(self.chunk_size);
- return Ok(Async::Ready(Some(result)));
+ if this.buffer.len() == this.chunk_size {
+ return Poll::Ready(Some(Ok(this.buffer.take())));
+ } else if this.buffer.len() > this.chunk_size {
+ let result = this.buffer.split_to(this.chunk_size);
+ return Poll::Ready(Some(Ok(result)));
}
- match self.input.poll() {
- Err(err) => {
- return Err(err);
- }
- Ok(Async::NotReady) => {
- return Ok(Async::NotReady);
+ match ready!(Pin::new(&mut this.input).try_poll_next(cx)) {
+ Some(Err(err)) => {
+ return Poll::Ready(Some(Err(err)));
}
- Ok(Async::Ready(None)) => {
+ None => {
// last chunk can have any size
- if self.buffer.len() > 0 {
- return Ok(Async::Ready(Some(self.buffer.take())));
+ if this.buffer.len() > 0 {
+ return Poll::Ready(Some(Ok(this.buffer.take())));
} else {
- return Ok(Async::Ready(None));
+ return Poll::Ready(None);
}
}
- Ok(Async::Ready(Some(data))) => {
- self.buffer.extend_from_slice(data.as_ref());
+ Some(Ok(data)) => {
+ this.buffer.extend_from_slice(data.as_ref());
}
}
}