1 use std
::future
::Future
;
4 use std
::task
::{Context, Poll}
;
6 use anyhow
::{Error, Result}
;
7 use futures
::{future::FutureExt, ready}
;
8 use tokio
::io
::AsyncWrite
;
9 use tokio
::sync
::mpsc
::Sender
;
11 use proxmox
::io_format_err
;
12 use proxmox
::tools
::byte_buffer
::ByteBuffer
;
13 use proxmox
::sys
::error
::io_err_other
;
15 /// Wrapper around tokio::sync::mpsc::Sender, which implements Write
16 pub struct AsyncChannelWriter
{
17 sender
: Option
<Sender
<Result
<Vec
<u8>, Error
>>>,
22 type SendResult
= io
::Result
<Sender
<Result
<Vec
<u8>>>>;
26 Sending(Pin
<Box
<dyn Future
<Output
= SendResult
> + Send
+ '
static>>),
29 impl AsyncChannelWriter
{
30 pub fn new(sender
: Sender
<Result
<Vec
<u8>, Error
>>, buf_size
: usize) -> Self {
33 buf
: ByteBuffer
::with_capacity(buf_size
),
34 state
: WriterState
::Ready
,
43 ) -> Poll
<io
::Result
<usize>> {
45 match &mut self.state
{
46 WriterState
::Ready
=> {
48 if self.buf
.is_empty() {
49 return Poll
::Ready(Ok(0));
52 let free_size
= self.buf
.free_size();
53 if free_size
> buf
.len() || self.buf
.is_empty() {
54 let count
= free_size
.min(buf
.len());
55 self.buf
.get_free_mut_slice()[..count
].copy_from_slice(&buf
[..count
]);
56 self.buf
.add_size(count
);
57 return Poll
::Ready(Ok(count
));
61 let sender
= match self.sender
.take() {
62 Some(sender
) => sender
,
63 None
=> return Poll
::Ready(Err(io_err_other("no sender"))),
66 let data
= self.buf
.remove_data(self.buf
.len()).to_vec();
67 let future
= async
move {
72 .map_err(|err
| io_format_err
!("could not send: {}", err
))
75 self.state
= WriterState
::Sending(future
.boxed());
77 WriterState
::Sending(ref mut future
) => match ready
!(future
.as_mut().poll(cx
)) {
79 self.sender
= Some(sender
);
80 self.state
= WriterState
::Ready
;
82 Err(err
) => return Poll
::Ready(Err(err
)),
89 impl AsyncWrite
for AsyncChannelWriter
{
90 fn poll_write(self: Pin
<&mut Self>, cx
: &mut Context
, buf
: &[u8]) -> Poll
<io
::Result
<usize>> {
91 let this
= self.get_mut();
92 this
.poll_write_impl(cx
, buf
, false)
95 fn poll_flush(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>> {
96 let this
= self.get_mut();
97 match ready
!(this
.poll_write_impl(cx
, &[], true)) {
98 Ok(_
) => Poll
::Ready(Ok(())),
99 Err(err
) => Poll
::Ready(Err(err
)),
103 fn poll_shutdown(self: Pin
<&mut Self>, cx
: &mut Context
) -> Poll
<io
::Result
<()>> {