-//use failure::*;
-use tokio_threadpool;
-use std::io::Read;
-use futures::Async;
+use std::io::{self, Read};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::sync::mpsc::Receiver;
+
+
use futures::stream::Stream;
-pub struct WrappedReaderStream<R: Read> {
+use crate::tools::runtime::block_in_place;
+
+/// Wrapper struct to convert a Reader into a Stream
+pub struct WrappedReaderStream<R: Read + Unpin> {
reader: R,
buffer: Vec<u8>,
}
-impl <R: Read> WrappedReaderStream<R> {
+impl <R: Read + Unpin> WrappedReaderStream<R> {
pub fn new(reader: R) -> Self {
let mut buffer = Vec::with_capacity(64*1024);
}
}
-fn blocking_err() -> std::io::Error {
- std::io::Error::new(
- std::io::ErrorKind::Other,
- "`blocking` annotated I/O must be called from the context of the Tokio runtime.")
+impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
+ type Item = Result<Vec<u8>, io::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
+ let this = self.get_mut();
+ match block_in_place(|| this.reader.read(&mut this.buffer)) {
+ Ok(n) => {
+ if n == 0 {
+ // EOF
+ Poll::Ready(None)
+ } else {
+ Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
+ }
+ }
+ Err(err) => Poll::Ready(Some(Err(err))),
+ }
+ }
}
-impl <R: Read> Stream for WrappedReaderStream<R> {
- type Item = Vec<u8>;
- type Error = std::io::Error;
+/// Wrapper struct to convert a channel Receiver into a Stream
+pub struct StdChannelStream<T>(pub Receiver<T>);
- fn poll(&mut self) -> Result<Async<Option<Vec<u8>>>, std::io::Error> {
- match tokio_threadpool::blocking(|| self.reader.read(&mut self.buffer)) {
- Ok(Async::Ready(Ok(n))) => {
- if n == 0 { // EOF
- Ok(Async::Ready(None))
- } else {
- Ok(Async::Ready(Some(self.buffer[..n].to_vec())))
- }
- },
- Ok(Async::Ready(Err(err))) => Err(err),
- Ok(Async::NotReady) => Ok(Async::NotReady),
- Err(_) => Err(blocking_err()),
+impl<T> Stream for StdChannelStream<T> {
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
+ match block_in_place(|| self.0.recv()) {
+ Ok(data) => Poll::Ready(Some(data)),
+ Err(_) => Poll::Ready(None),// channel closed
+ }
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use std::io;
+
+ use failure::Error;
+ use futures::stream::TryStreamExt;
+
+ #[test]
+ fn test_wrapped_stream_reader() -> Result<(), Error> {
+ crate::tools::runtime::main(async {
+ run_wrapped_stream_reader_test().await
+ })
+ }
+
+ struct DummyReader(usize);
+
+ impl io::Read for DummyReader {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.0 += 1;
+
+ if self.0 >= 10 {
+ return Ok(0);
+ }
+
+ unsafe {
+ std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len());
+ }
+
+ Ok(buf.len())
+ }
+ }
+
+ async fn run_wrapped_stream_reader_test() -> Result<(), Error> {
+ let mut reader = super::WrappedReaderStream::new(DummyReader(0));
+ while let Some(_data) = reader.try_next().await? {
+ // just waiting
}
+ Ok(())
}
}