1 use std
::io
::{self, Read}
;
3 use std
::task
::{Context, Poll}
;
5 use futures
::stream
::Stream
;
7 use crate::tools
::runtime
::block_in_place
;
/// Wraps a blocking reader `R` (bounded by `Read + Unpin`) so that it can be
/// consumed through the `Stream` implementation provided elsewhere in this file.
///
/// NOTE(review): the field list is not visible in this excerpt; the
/// constructor initializes fields named `reader` and `buffer`.
9 pub struct WrappedReaderStream
<R
: Read
+ Unpin
> {
14 impl <R
: Read
+ Unpin
> WrappedReaderStream
<R
> {
16 pub fn new(reader
: R
) -> Self {
17 let mut buffer
= Vec
::with_capacity(64*1024);
18 unsafe { buffer.set_len(buffer.capacity()); }
19 Self { reader, buffer }
// `Stream` adapter: every poll issues one `read` on the wrapped reader into
// the internal buffer and reports the outcome as a ready item.
// NOTE(review): parts of the `match` (the arm that binds `n`, and whatever
// handles end-of-input) are not visible in this excerpt.
23 impl<R
: Read
+ Unpin
> Stream
for WrappedReaderStream
<R
> {
// Each item is either an owned chunk of bytes or the `io::Error` returned
// by the reader.
24 type Item
= Result
<Vec
<u8>, io
::Error
>;
// `_cx` is unused in the visible code: both visible outcomes are
// `Poll::Ready`, so no waker registration appears here.
26 fn poll_next(self: Pin
<&mut Self>, _cx
: &mut Context
) -> Poll
<Option
<Self::Item
>> {
// Unwrap the pin to obtain a plain `&mut Self` (`R` is bounded by `Unpin`).
27 let this
= self.get_mut();
// Run the synchronous `read` through the project's `block_in_place` helper.
// NOTE(review): presumably this tells the async runtime that the closure
// may block — confirm against `crate::tools::runtime`.
28 match block_in_place(|| this
.reader
.read(&mut this
.buffer
)) {
// Copy the first `n` bytes of the buffer into a fresh `Vec` and yield it;
// the internal buffer itself is kept for the next read.
34 Poll
::Ready(Some(Ok(this
.buffer
[..n
].to_vec())))
// A read error is passed through as a stream item.
37 Err(err
) => Poll
::Ready(Some(Err(err
))),
47 use futures
::stream
::TryStreamExt
;
// Synchronous entry point for the stream test: hands an async block that
// awaits `run_wrapped_stream_reader_test()` to `crate::tools::runtime::main`.
// NOTE(review): presumably `main` drives the future to completion and its
// `Result` becomes this function's return value — the closing lines are not
// visible in this excerpt.
50 fn test_wrapped_stream_reader() -> Result
<(), Error
> {
51 crate::tools
::runtime
::main(async
{
52 run_wrapped_stream_reader_test().await
// Test-only reader wrapping a single `usize`; the test constructs it with 0.
// NOTE(review): presumably the field counts `read` calls — the code that
// updates it is not visible in this excerpt.
56 struct DummyReader(usize);
// `io::Read` implementation for the test reader.
// NOTE(review): most of the body (including the returned byte count and any
// end-of-input condition) is not visible in this excerpt.
58 impl io
::Read
for DummyReader
{
59 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> {
// Zero-fill the whole caller-supplied buffer: writes `buf.len()` bytes of
// value 0 starting at `buf.as_mut_ptr()`.
67 std
::ptr
::write_bytes(buf
.as_mut_ptr(), 0, buf
.len());
74 async
fn run_wrapped_stream_reader_test() -> Result
<(), Error
> {
75 let mut reader
= super::WrappedReaderStream
::new(DummyReader(0));
76 while let Some(_data
) = reader
.try_next().await?
{