1 //! Wrappers between async readers and streams.
3 use std
::io
::{self, Read}
;
4 use std
::future
::Future
;
6 use std
::task
::{Context, Poll}
;
8 use anyhow
::{Error, Result}
;
9 use tokio
::io
::{AsyncRead, AsyncWrite, ReadBuf}
;
10 use tokio
::sync
::mpsc
::Sender
;
12 use futures
::future
::FutureExt
;
13 use futures
::stream
::Stream
;
15 use proxmox
::io_format_err
;
16 use proxmox
::sys
::error
::io_err_other
;
17 use proxmox_io
::ByteBuffer
;
19 use pbs_runtime
::block_in_place
;
21 /// Wrapper struct to convert a Reader into a Stream
22 pub struct WrappedReaderStream
<R
: Read
+ Unpin
> {
27 impl <R
: Read
+ Unpin
> WrappedReaderStream
<R
> {
29 pub fn new(reader
: R
) -> Self {
30 let mut buffer
= Vec
::with_capacity(64*1024);
31 unsafe { buffer.set_len(buffer.capacity()); }
32 Self { reader, buffer }
36 impl<R
: Read
+ Unpin
> Stream
for WrappedReaderStream
<R
> {
37 type Item
= Result
<Vec
<u8>, io
::Error
>;
39 fn poll_next(self: Pin
<&mut Self>, _cx
: &mut Context
) -> Poll
<Option
<Self::Item
>> {
40 let this
= self.get_mut();
41 match block_in_place(|| this
.reader
.read(&mut this
.buffer
)) {
47 Poll
::Ready(Some(Ok(this
.buffer
[..n
].to_vec())))
50 Err(err
) => Poll
::Ready(Some(Err(err
))),
55 /// Wrapper struct to convert an AsyncReader into a Stream
56 pub struct AsyncReaderStream
<R
: AsyncRead
+ Unpin
> {
61 impl <R
: AsyncRead
+ Unpin
> AsyncReaderStream
<R
> {
63 pub fn new(reader
: R
) -> Self {
64 let mut buffer
= Vec
::with_capacity(64*1024);
65 unsafe { buffer.set_len(buffer.capacity()); }
66 Self { reader, buffer }
69 pub fn with_buffer_size(reader
: R
, buffer_size
: usize) -> Self {
70 let mut buffer
= Vec
::with_capacity(buffer_size
);
71 unsafe { buffer.set_len(buffer.capacity()); }
72 Self { reader, buffer }
76 impl<R
: AsyncRead
+ Unpin
> Stream
for AsyncReaderStream
<R
> {
77 type Item
= Result
<Vec
<u8>, io
::Error
>;
79 fn poll_next(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<Option
<Self::Item
>> {
80 let this
= self.get_mut();
81 let mut read_buf
= ReadBuf
::new(&mut this
.buffer
);
82 match ready
!(Pin
::new(&mut this
.reader
).poll_read(cx
, &mut read_buf
)) {
84 let n
= read_buf
.filled().len();
89 Poll
::Ready(Some(Ok(this
.buffer
[..n
].to_vec())))
92 Err(err
) => Poll
::Ready(Some(Err(err
))),
102 use futures
::stream
::TryStreamExt
;
105 fn test_wrapped_stream_reader() -> Result
<(), Error
> {
106 pbs_runtime
::main(async
{
107 run_wrapped_stream_reader_test().await
111 struct DummyReader(usize);
113 impl io
::Read
for DummyReader
{
114 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> {
122 std
::ptr
::write_bytes(buf
.as_mut_ptr(), 0, buf
.len());
129 async
fn run_wrapped_stream_reader_test() -> Result
<(), Error
> {
130 let mut reader
= super::WrappedReaderStream
::new(DummyReader(0));
131 while let Some(_data
) = reader
.try_next().await?
{
138 /// Wrapper around tokio::sync::mpsc::Sender, which implements Write
139 pub struct AsyncChannelWriter
{
140 sender
: Option
<Sender
<Result
<Vec
<u8>, Error
>>>,
145 type SendResult
= io
::Result
<Sender
<Result
<Vec
<u8>>>>;
149 Sending(Pin
<Box
<dyn Future
<Output
= SendResult
> + Send
+ '
static>>),
152 impl AsyncChannelWriter
{
153 pub fn new(sender
: Sender
<Result
<Vec
<u8>, Error
>>, buf_size
: usize) -> Self {
155 sender
: Some(sender
),
156 buf
: ByteBuffer
::with_capacity(buf_size
),
157 state
: WriterState
::Ready
,
166 ) -> Poll
<io
::Result
<usize>> {
168 match &mut self.state
{
169 WriterState
::Ready
=> {
171 if self.buf
.is_empty() {
172 return Poll
::Ready(Ok(0));
175 let free_size
= self.buf
.free_size();
176 if free_size
> buf
.len() || self.buf
.is_empty() {
177 let count
= free_size
.min(buf
.len());
178 self.buf
.get_free_mut_slice()[..count
].copy_from_slice(&buf
[..count
]);
179 self.buf
.add_size(count
);
180 return Poll
::Ready(Ok(count
));
184 let sender
= match self.sender
.take() {
185 Some(sender
) => sender
,
186 None
=> return Poll
::Ready(Err(io_err_other("no sender"))),
189 let data
= self.buf
.remove_data(self.buf
.len()).to_vec();
190 let future
= async
move {
194 .map(move |_
| sender
)
195 .map_err(|err
| io_format_err
!("could not send: {}", err
))
198 self.state
= WriterState
::Sending(future
.boxed());
200 WriterState
::Sending(ref mut future
) => match ready
!(future
.as_mut().poll(cx
)) {
202 self.sender
= Some(sender
);
203 self.state
= WriterState
::Ready
;
205 Err(err
) => return Poll
::Ready(Err(err
)),
212 impl AsyncWrite
for AsyncChannelWriter
{
213 fn poll_write(self: Pin
<&mut Self>, cx
: &mut Context
, buf
: &[u8]) -> Poll
<io
::Result
<usize>> {
214 let this
= self.get_mut();
215 this
.poll_write_impl(cx
, buf
, false)
218 fn poll_flush(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>> {
219 let this
= self.get_mut();
220 match ready
!(this
.poll_write_impl(cx
, &[], true)) {
221 Ok(_
) => Poll
::Ready(Ok(())),
222 Err(err
) => Poll
::Ready(Err(err
)),
226 fn poll_shutdown(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>> {