use std::io::{self, Read};
use std::pin::Pin;
use std::task::{Context, Poll};
+use std::sync::mpsc::Receiver;
+
-use tokio_executor::threadpool::blocking;
use futures::stream::Stream;
+use crate::tools::runtime::block_in_place;
+
+/// Wrapper struct to convert a Reader into a Stream
pub struct WrappedReaderStream<R: Read + Unpin> {
reader: R,
buffer: Vec<u8>,
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
- match blocking(|| this.reader.read(&mut this.buffer)) {
- Poll::Ready(Ok(Ok(n))) => {
+ match block_in_place(|| this.reader.read(&mut this.buffer)) {
+ Ok(n) => {
if n == 0 {
// EOF
Poll::Ready(None)
Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
}
}
- Poll::Ready(Ok(Err(err))) => Poll::Ready(Some(Err(err))),
- Poll::Ready(Err(err)) => Poll::Ready(Some(Err(io::Error::new(
- io::ErrorKind::Other,
- err.to_string(),
- )))),
- Poll::Pending => Poll::Pending,
+ Err(err) => Poll::Ready(Some(Err(err))),
+ }
+ }
+}
+
+
+/// Wrapper struct to convert a channel Receiver into a Stream
+pub struct StdChannelStream<T>(pub Receiver<T>);
+
+impl<T> Stream for StdChannelStream<T> {
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
+ match block_in_place(|| self.0.recv()) {
+ Ok(data) => Poll::Ready(Some(data)),
+ Err(_) => Poll::Ready(None),// channel closed
}
}
}
#[test]
fn test_wrapped_stream_reader() -> Result<(), Error> {
- // This cannot be used currently, because it doesn't permit blocking() annotations:
crate::tools::runtime::main(async {
run_wrapped_stream_reader_test().await
})