7 use futures_io
::{AsyncBufRead, AsyncRead}
;
8 use futures_lite
::ready
;
10 use crate::{decode, read::ProgressAction, BandRef, PacketLineRef, StreamingPeekableIter, TextRef, U16_HEX_BYTES}
;
/// The output type of the boxed `read_line` future kept in the `ReadLine` state.
///
/// From the outside in: `None` if no line was produced, an I/O error from the underlying read,
/// a [`decode::Error`] if the line could not be decoded, or the successfully read [`PacketLineRef`].
type ReadLineResult<'a> = Option<std::io::Result<Result<PacketLineRef<'a>, decode::Error>>>;
13 /// An implementor of [`AsyncBufRead`] yielding packet lines on each call to [`read_line()`][AsyncBufRead::read_line()].
14 /// It's also possible to hide the underlying packet lines using the [`Read`][AsyncRead] implementation which is useful
15 /// if they represent binary data, like the one of a pack file.
16 pub struct WithSidebands
<'a
, T
, F
>
21 handle_progress
: Option
<F
>,
26 impl<'a
, T
, F
> Drop
for WithSidebands
<'a
, T
, F
>
31 if let State
::Idle { ref mut parent }
= self.state
{
34 .expect("parent is always available if we are idle")
40 impl<'a
, T
> WithSidebands
<'a
, T
, fn(bool
, &[u8]) -> ProgressAction
>
44 /// Create a new instance with the given provider as `parent`.
45 pub fn new(parent
: &'a
mut StreamingPeekableIter
<T
>) -> Self {
47 state
: State
::Idle { parent: Some(parent) }
,
48 handle_progress
: None
,
57 parent
: Option
<&'a
mut StreamingPeekableIter
<T
>>,
60 read_line
: Pin
<Box
<dyn Future
<Output
= ReadLineResult
<'a
>> + 'a
>>,
61 parent_inactive
: Option
<*mut StreamingPeekableIter
<T
>>,
/// It's safe because `T` is `Send` and we have a test that assures that our `StreamingPeekableIter` is `Send` as well,
/// hence the `*mut _` is `Send`.
/// `read_line` isn't `Send` and we can't declare it as such as it forces `Send` in all places (BUT WHY IS THAT A PROBLEM, I don't recall).
/// However, it's only used when pinned and thus isn't actually sent anywhere; it's a secondary state of the future which is used
/// after the future was possibly sent to a thread.
// TODO: Is it possible to declare it as it should be?
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl<'a, T> Send for State<'a, T> where T: Send {}
/// Compile-time probe: accepts any value whose type is `Send` and does nothing with it.
///
/// Calling it with a value only compiles if that value's type implements `Send`.
fn receiver<T>(_value: T)
where
    T: Send,
{
}
80 /// We want to declare items containing pointers of `StreamingPeekableIter` `Send` as well, so it must be `Send` itself.
82 fn streaming_peekable_iter_is_send() {
83 receiver(StreamingPeekableIter
::new(Vec
::<u8>::new(), &[]));
88 let mut s
= StreamingPeekableIter
::new(Vec
::<u8>::new(), &[]);
89 receiver(State
::Idle { parent: Some(&mut s) }
);
93 impl<'a
, T
, F
> WithSidebands
<'a
, T
, F
>
96 F
: FnMut(bool
, &[u8]) -> ProgressAction
+ Unpin
,
98 /// Create a new instance with the given `parent` provider and the `handle_progress` function.
100 /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool`
101 /// being true in case the `text` is to be interpreted as error.
102 pub fn with_progress_handler(parent
: &'a
mut StreamingPeekableIter
<T
>, handle_progress
: F
) -> Self {
104 state
: State
::Idle { parent: Some(parent) }
,
105 handle_progress
: Some(handle_progress
),
111 /// Create a new instance without a progress handler.
112 pub fn without_progress_handler(parent
: &'a
mut StreamingPeekableIter
<T
>) -> Self {
114 state
: State
::Idle { parent: Some(parent) }
,
115 handle_progress
: None
,
121 /// Forwards to the parent [`StreamingPeekableIter::reset_with()`]
122 pub fn reset_with(&mut self, delimiters
: &'
static [PacketLineRef
<'
static>]) {
123 if let State
::Idle { ref mut parent }
= self.state
{
126 .expect("parent is always available if we are idle")
127 .reset_with(delimiters
)
131 /// Forwards to the parent [`StreamingPeekableIter::stopped_at()`]
132 pub fn stopped_at(&self) -> Option
<PacketLineRef
<'
static>> {
134 State
::Idle { ref parent }
=> {
137 .expect("parent is always available if we are idle")
144 /// Set or unset the progress handler.
145 pub fn set_progress_handler(&mut self, handle_progress
: Option
<F
>) {
146 self.handle_progress
= handle_progress
;
149 /// Effectively forwards to the parent [`StreamingPeekableIter::peek_line()`], allowing to see what would be returned
150 /// next on a call to [`read_line()`][io::BufRead::read_line()].
154 /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it.
155 pub async
fn peek_data_line(&mut self) -> Option
<std
::io
::Result
<Result
<&[u8], decode
::Error
>>> {
157 State
::Idle { ref mut parent }
=> match parent
159 .expect("parent is always available if we are idle")
163 Some(Ok(Ok(PacketLineRef
::Data(line
)))) => Some(Ok(Ok(line
))),
164 Some(Ok(Err(err
))) => Some(Ok(Err(err
))),
165 Some(Err(err
)) => Some(Err(err
)),
172 /// Read a packet line as string line.
173 pub fn read_line_to_string
<'b
>(&'b
mut self, buf
: &'b
mut String
) -> ReadLineFuture
<'a
, 'b
, T
, F
> {
174 ReadLineFuture { parent: self, buf }
177 /// Read a packet line from the underlying packet reader, returning empty lines if a stop-packetline was reached.
181 /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it.
182 pub async
fn read_data_line(&mut self) -> Option
<std
::io
::Result
<Result
<PacketLineRef
<'_
>, decode
::Error
>>> {
183 match &mut self.state
{
184 State
::Idle { parent: Some(parent) }
=> {
187 "we don't support partial buffers right now - read-line must be used consistently"
189 parent
.read_line().await
196 pub struct ReadDataLineFuture
<'a
, 'b
, T
: AsyncRead
, F
> {
197 parent
: &'b
mut WithSidebands
<'a
, T
, F
>,
198 buf
: &'b
mut Vec
<u8>,
201 impl<'a
, 'b
, T
, F
> Future
for ReadDataLineFuture
<'a
, 'b
, T
, F
>
203 T
: AsyncRead
+ Unpin
,
204 F
: FnMut(bool
, &[u8]) -> ProgressAction
+ Unpin
,
206 type Output
= std
::io
::Result
<usize>;
208 fn poll(mut self: Pin
<&mut Self>, cx
: &mut Context
<'_
>) -> Poll
<Self::Output
> {
211 "we don't support partial buffers right now - read-line must be used consistently"
213 let Self { buf, parent }
= &mut *self;
214 let line
= ready
!(Pin
::new(parent
).poll_fill_buf(cx
))?
;
216 buf
.extend_from_slice(line
);
217 let bytes
= line
.len();
219 Poll
::Ready(Ok(bytes
))
223 pub struct ReadLineFuture
<'a
, 'b
, T
: AsyncRead
, F
> {
224 parent
: &'b
mut WithSidebands
<'a
, T
, F
>,
228 impl<'a
, 'b
, T
, F
> Future
for ReadLineFuture
<'a
, 'b
, T
, F
>
230 T
: AsyncRead
+ Unpin
,
231 F
: FnMut(bool
, &[u8]) -> ProgressAction
+ Unpin
,
233 type Output
= std
::io
::Result
<usize>;
235 fn poll(mut self: Pin
<&mut Self>, cx
: &mut Context
<'_
>) -> Poll
<Self::Output
> {
238 "we don't support partial buffers right now - read-line must be used consistently"
240 let Self { buf, parent }
= &mut *self;
241 let line
= std
::str::from_utf8(ready
!(Pin
::new(parent
).poll_fill_buf(cx
))?
)
242 .map_err(|err
| std
::io
::Error
::new(std
::io
::ErrorKind
::Other
, err
))?
;
245 let bytes
= line
.len();
247 Poll
::Ready(Ok(bytes
))
251 impl<'a
, T
, F
> AsyncBufRead
for WithSidebands
<'a
, T
, F
>
253 T
: AsyncRead
+ Unpin
,
254 F
: FnMut(bool
, &[u8]) -> ProgressAction
+ Unpin
,
256 fn poll_fill_buf(mut self: Pin
<&mut Self>, cx
: &mut Context
<'_
>) -> Poll
<std
::io
::Result
<&[u8]>> {
259 use futures_lite
::FutureExt
;
261 let this
= self.as_mut().get_mut();
262 if this
.pos
>= this
.cap
{
263 let (ofs
, cap
) = loop {
265 State
::Idle { ref mut parent }
=> {
266 let parent
= parent
.take().expect("parent to be present here");
267 let inactive
= parent
as *mut _
;
268 this
.state
= State
::ReadLine
{
269 read_line
: parent
.read_line().boxed_local(),
270 parent_inactive
: Some(inactive
),
275 ref mut parent_inactive
,
277 let line
= ready
!(read_line
.poll(cx
));
280 let parent
= parent_inactive
.take().expect("parent pointer always set");
// SAFETY: It's safe to recover the original mutable reference (from which
// the `read_line` future was created), as the latter isn't accessible anymore
// once the state is set to Idle. In other words, either one or the other is
// accessible, never both at the same time.
// Also: We keep a pointer around which is protected by borrowcheck since it's created
// from a legal mutable reference which is moved into the `read_line` future - if it was manually
// implemented we would be able to re-obtain it from there.
288 #[allow(unsafe_code)]
289 let parent
= unsafe { &mut *parent }
;
290 State
::Idle { parent: Some(parent) }
293 let line
= match line
{
294 Some(line
) => line?
.map_err(|err
| io
::Error
::new(io
::ErrorKind
::Other
, err
))?
,
295 None
=> break (0, 0),
298 match this
.handle_progress
.as_mut() {
299 Some(handle_progress
) => {
302 .map_err(|err
| io
::Error
::new(io
::ErrorKind
::Other
, err
))?
;
303 const ENCODED_BAND
: usize = 1;
305 BandRef
::Data(d
) => {
309 break (U16_HEX_BYTES
+ ENCODED_BAND
, d
.len());
311 BandRef
::Progress(d
) => {
312 let text
= TextRef
::from(d
).0;
313 match handle_progress(false, text
) {
314 ProgressAction
::Continue
=> {}
315 ProgressAction
::Interrupt
=> {
316 return Poll
::Ready(Err(io
::Error
::new(
317 std
::io
::ErrorKind
::Other
,
318 "interrupted by user",
323 BandRef
::Error(d
) => {
324 let text
= TextRef
::from(d
).0;
325 match handle_progress(true, text
) {
326 ProgressAction
::Continue
=> {}
327 ProgressAction
::Interrupt
=> {
328 return Poll
::Ready(Err(io
::Error
::new(
329 io
::ErrorKind
::Other
,
330 "interrupted by user",
338 break match line
.as_slice() {
339 Some(d
) => (U16_HEX_BYTES
, d
.len()),
341 return Poll
::Ready(Err(io
::Error
::new(
342 io
::ErrorKind
::UnexpectedEof
,
343 "encountered non-data line in a data-line only context",
352 this
.cap
= cap
+ ofs
;
356 let range
= self.pos
..self.cap
;
357 match &self.get_mut().state
{
358 State
::Idle { parent }
=> Poll
::Ready(Ok(&parent
.as_ref().expect("parent always available").buf
[range
])),
359 State
::ReadLine { .. }
=> unreachable
!("at least in theory"),
363 fn consume(self: Pin
<&mut Self>, amt
: usize) {
364 let this
= self.get_mut();
365 this
.pos
= std
::cmp
::min(this
.pos
+ amt
, this
.cap
);
369 impl<'a
, T
, F
> AsyncRead
for WithSidebands
<'a
, T
, F
>
371 T
: AsyncRead
+ Unpin
,
372 F
: FnMut(bool
, &[u8]) -> ProgressAction
+ Unpin
,
374 fn poll_read(mut self: Pin
<&mut Self>, cx
: &mut Context
<'_
>, buf
: &mut [u8]) -> Poll
<std
::io
::Result
<usize>> {
377 let mut rem
= ready
!(self.as_mut().poll_fill_buf(cx
))?
;
381 Poll
::Ready(Ok(nread
))