use super::buf_writer::BufWriter;
use futures_core::ready;
use futures_core::task::{Context, Poll};
use futures_io::AsyncWrite;
use futures_io::IoSlice;
use pin_project_lite::pin_project;
use std::io;
use std::pin::Pin;
11 /// Wrap a writer, like [`BufWriter`] does, but prioritizes buffering lines
13 /// This was written based on `std::io::LineWriter` which goes into further details
14 /// explaining the code.
16 /// Buffering is actually done using `BufWriter`. This class will leverage `BufWriter`
17 /// to write on-each-line.
19 pub struct LineWriter
<W
: AsyncWrite
> {
21 buf_writer
: BufWriter
<W
>,
25 impl<W
: AsyncWrite
> LineWriter
<W
> {
26 /// Create a new `LineWriter` with default buffer capacity. The default is currently 1KB
27 /// which was taken from `std::io::LineWriter`
28 pub fn new(inner
: W
) -> LineWriter
<W
> {
29 LineWriter
::with_capacity(1024, inner
)
32 /// Creates a new `LineWriter` with the specified buffer capacity.
33 pub fn with_capacity(capacity
: usize, inner
: W
) -> LineWriter
<W
> {
34 LineWriter { buf_writer: BufWriter::with_capacity(capacity, inner) }
37 /// Flush `buf_writer` if last char is "new line"
38 fn flush_if_completed_line(self: Pin
<&mut Self>, cx
: &mut Context
<'_
>) -> Poll
<io
::Result
<()>> {
39 let this
= self.project();
40 match this
.buf_writer
.buffer().last().copied() {
41 Some(b'
\n'
) => this
.buf_writer
.flush_buf(cx
),
42 _
=> Poll
::Ready(Ok(())),
46 /// Returns a reference to `buf_writer`'s internally buffered data.
47 pub fn buffer(&self) -> &[u8] {
48 self.buf_writer
.buffer()
51 /// Acquires a reference to the underlying sink or stream that this combinator is
53 pub fn get_ref(&self) -> &W
{
54 self.buf_writer
.get_ref()
58 impl<W
: AsyncWrite
> AsyncWrite
for LineWriter
<W
> {
60 mut self: Pin
<&mut Self>,
63 ) -> Poll
<io
::Result
<usize>> {
64 let mut this
= self.as_mut().project();
65 let newline_index
= match memchr
::memrchr(b'
\n'
, buf
) {
67 ready
!(self.as_mut().flush_if_completed_line(cx
)?
);
68 return self.project().buf_writer
.poll_write(cx
, buf
);
70 Some(newline_index
) => newline_index
+ 1,
73 ready
!(this
.buf_writer
.as_mut().poll_flush(cx
)?
);
75 let lines
= &buf
[..newline_index
];
77 let flushed
= { ready!(this.buf_writer.as_mut().inner_poll_write(cx, lines))? }
;
80 return Poll
::Ready(Ok(0));
83 let tail
= if flushed
>= newline_index
{
85 } else if newline_index
- flushed
<= this
.buf_writer
.capacity() {
86 &buf
[flushed
..newline_index
]
88 let scan_area
= &buf
[flushed
..];
89 let scan_area
= &scan_area
[..this
.buf_writer
.capacity()];
90 match memchr
::memrchr(b'
\n'
, scan_area
) {
91 Some(newline_index
) => &scan_area
[..newline_index
+ 1],
96 let buffered
= this
.buf_writer
.as_mut().write_to_buf(tail
);
97 Poll
::Ready(Ok(flushed
+ buffered
))
100 fn poll_write_vectored(
101 mut self: Pin
<&mut Self>,
102 cx
: &mut Context
<'_
>,
103 bufs
: &[IoSlice
<'_
>],
104 ) -> Poll
<io
::Result
<usize>> {
105 let mut this
= self.as_mut().project();
106 // `is_write_vectored()` is handled in original code, but not in this crate
107 // see https://github.com/rust-lang/rust/issues/70436
109 let last_newline_buf_idx
= bufs
113 .find_map(|(i
, buf
)| memchr
::memchr(b'
\n'
, buf
).map(|_
| i
));
114 let last_newline_buf_idx
= match last_newline_buf_idx
{
116 ready
!(self.as_mut().flush_if_completed_line(cx
)?
);
117 return self.project().buf_writer
.poll_write_vectored(cx
, bufs
);
122 ready
!(this
.buf_writer
.as_mut().poll_flush(cx
)?
);
124 let (lines
, tail
) = bufs
.split_at(last_newline_buf_idx
+ 1);
126 let flushed
= { ready!(this.buf_writer.as_mut().inner_poll_write_vectored(cx, lines))? }
;
128 return Poll
::Ready(Ok(0));
131 let lines_len
= lines
.iter().map(|buf
| buf
.len()).sum();
132 if flushed
< lines_len
{
133 return Poll
::Ready(Ok(flushed
));
136 let buffered
: usize = tail
138 .filter(|buf
| !buf
.is_empty())
139 .map(|buf
| this
.buf_writer
.as_mut().write_to_buf(buf
))
140 .take_while(|&n
| n
> 0)
143 Poll
::Ready(Ok(flushed
+ buffered
))
146 /// Forward to `buf_writer` 's `BufWriter::poll_flush()`
147 fn poll_flush(mut self: Pin
<&mut Self>, cx
: &mut Context
<'_
>) -> Poll
<io
::Result
<()>> {
148 self.as_mut().project().buf_writer
.poll_flush(cx
)
151 /// Forward to `buf_writer` 's `BufWriter::poll_close()`
152 fn poll_close(mut self: Pin
<&mut Self>, cx
: &mut Context
<'_
>) -> Poll
<io
::Result
<()>> {
153 self.as_mut().project().buf_writer
.poll_close(cx
)