1 use std
::io
::{self, Read}
;
3 use std
::task
::{Context, Poll}
;
4 use std
::sync
::mpsc
::Receiver
;
7 use futures
::stream
::Stream
;
9 use crate::tools
::runtime
::block_in_place
;
11 /// Wrapper struct to convert a Reader into a Stream
12 pub struct WrappedReaderStream
<R
: Read
+ Unpin
> {
17 impl <R
: Read
+ Unpin
> WrappedReaderStream
<R
> {
19 pub fn new(reader
: R
) -> Self {
20 let mut buffer
= Vec
::with_capacity(64*1024);
21 unsafe { buffer.set_len(buffer.capacity()); }
22 Self { reader, buffer }
26 impl<R
: Read
+ Unpin
> Stream
for WrappedReaderStream
<R
> {
27 type Item
= Result
<Vec
<u8>, io
::Error
>;
29 fn poll_next(self: Pin
<&mut Self>, _cx
: &mut Context
) -> Poll
<Option
<Self::Item
>> {
30 let this
= self.get_mut();
31 match block_in_place(|| this
.reader
.read(&mut this
.buffer
)) {
37 Poll
::Ready(Some(Ok(this
.buffer
[..n
].to_vec())))
40 Err(err
) => Poll
::Ready(Some(Err(err
))),
46 /// Wrapper struct to convert a channel Receiver into a Stream
47 pub struct StdChannelStream
<T
>(pub Receiver
<T
>);
49 impl<T
> Stream
for StdChannelStream
<T
> {
52 fn poll_next(self: Pin
<&mut Self>, _cx
: &mut Context
) -> Poll
<Option
<Self::Item
>> {
53 match block_in_place(|| self.0.recv()) {
54 Ok(data
) => Poll
::Ready(Some(data
)),
55 Err(_
) => Poll
::Ready(None
),// channel closed
65 use futures
::stream
::TryStreamExt
;
68 fn test_wrapped_stream_reader() -> Result
<(), Error
> {
69 crate::tools
::runtime
::main(async
{
70 run_wrapped_stream_reader_test().await
74 struct DummyReader(usize);
76 impl io
::Read
for DummyReader
{
77 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> {
85 std
::ptr
::write_bytes(buf
.as_mut_ptr(), 0, buf
.len());
92 async
fn run_wrapped_stream_reader_test() -> Result
<(), Error
> {
93 let mut reader
= super::WrappedReaderStream
::new(DummyReader(0));
94 while let Some(_data
) = reader
.try_next().await?
{