1 use std
::io
::{self, Read}
;
3 use std
::task
::{Context, Poll}
;
5 use tokio_executor
::threadpool
::blocking
;
6 use futures
::stream
::Stream
;
/// Wraps a blocking [`Read`] implementation together with a reusable scratch
/// buffer that reads are performed into.
pub struct WrappedReaderStream<R: Read + Unpin> {
    /// The wrapped synchronous reader.
    reader: R,
    /// Scratch space handed to `reader.read(..)`; always fully initialised.
    buffer: Vec<u8>,
}

impl<R: Read + Unpin> WrappedReaderStream<R> {
    /// Creates a new wrapper around `reader` with a 64 KiB scratch buffer.
    pub fn new(reader: R) -> Self {
        // The buffer is zero-filled rather than created with
        // `with_capacity` + `unsafe set_len`: passing uninitialised memory to
        // an arbitrary `Read::read` implementation is undefined behaviour,
        // because the reader is allowed to inspect the slice it is given.
        let buffer = vec![0u8; 64 * 1024];
        Self { reader, buffer }
    }
}
// `Stream` implementation for `WrappedReaderStream`: each poll performs one
// `read` on the wrapped reader through `blocking` and maps the outcome to a
// stream item.
22 impl<R
: Read
+ Unpin
> Stream
for WrappedReaderStream
<R
> {
// Every item is either an owned chunk of bytes or the `io::Error` that occurred.
23 type Item
= Result
<Vec
<u8>, io
::Error
>;
// Polls for the next chunk. The task context (`_cx`) is not used directly by
// this function.
25 fn poll_next(self: Pin
<&mut Self>, _cx
: &mut Context
) -> Poll
<Option
<Self::Item
>> {
// `Pin::get_mut` needs `Self: Unpin`; it yields a plain `&mut Self` so the
// reader and the buffer can be borrowed mutably below.
26 let this
= self.get_mut();
// Run the synchronous `read` inside `blocking`. The match arms below show the
// result is nested: the outer `Result` comes from `blocking`, the inner one is
// the `io::Result<usize>` returned by `read`.
27 match blocking(|| this
.reader
.read(&mut this
.buffer
)) {
// `read` succeeded with `n` bytes.
// NOTE(review): how `n == 0` (end of input) is treated is not visible in this
// excerpt - confirm that it terminates the stream.
28 Poll
::Ready(Ok(Ok(n
))) => {
// Copy the first `n` bytes into a fresh `Vec`, leaving the scratch buffer
// available for the next read.
33 Poll
::Ready(Some(Ok(this
.buffer
[..n
].to_vec())))
// `read` itself failed: forward its `io::Error` unchanged as a stream item.
36 Poll
::Ready(Ok(Err(err
))) => Poll
::Ready(Some(Err(err
))),
// `blocking` reported an error: it is wrapped into a new `io::Error`
// (the constructor arguments are not visible in this excerpt).
37 Poll
::Ready(Err(err
)) => Poll
::Ready(Some(Err(io
::Error
::new(
// Nothing ready yet: propagate `Pending` to the caller.
41 Poll
::Pending
=> Poll
::Pending
,
51 use futures
::stream
::TryStreamExt
;
54 fn test_wrapped_stream_reader() -> Result
<(), Error
> {
55 // This cannot be used currently, because it doesn't permit blocking() annotations:
56 crate::tools
::runtime
::main(async
{
57 run_wrapped_stream_reader_test().await
// Reader used by the tests in this module; wraps a single `usize`.
// NOTE(review): presumably a call counter - the code that uses the field is
// not visible in this excerpt.
61 struct DummyReader(usize);
// `io::Read` implementation for the test reader.
63 impl io
::Read
for DummyReader
{
// Fills the caller's buffer; the returned byte count is not visible in this
// excerpt.
64 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> {
// Overwrite the whole slice with zero bytes: `buf.len()` bytes starting at
// the slice's own pointer, so the write stays inside `buf`.
// NOTE(review): `buf.fill(0)` would do the same without `unsafe`.
72 std
::ptr
::write_bytes(buf
.as_mut_ptr(), 0, buf
.len());
79 async
fn run_wrapped_stream_reader_test() -> Result
<(), Error
> {
80 let mut reader
= super::WrappedReaderStream
::new(DummyReader(0));
81 while let Some(_data
) = reader
.try_next().await?
{