]>
Commit | Line | Data |
---|---|---|
0a29b90c FG |
1 | use std::{ |
2 | future::Future, | |
3 | pin::Pin, | |
4 | task::{Context, Poll}, | |
5 | }; | |
6 | ||
7 | use futures_io::{AsyncBufRead, AsyncRead}; | |
8 | use futures_lite::ready; | |
9 | ||
49aad941 | 10 | use crate::{decode, read::ProgressAction, BandRef, PacketLineRef, StreamingPeekableIter, TextRef, U16_HEX_BYTES}; |
0a29b90c FG |
11 | |
12 | type ReadLineResult<'a> = Option<std::io::Result<Result<PacketLineRef<'a>, decode::Error>>>; | |
13 | /// An implementor of [`AsyncBufRead`] yielding packet lines on each call to [`read_line()`][AsyncBufRead::read_line()]. | |
14 | /// It's also possible to hide the underlying packet lines using the [`Read`][AsyncRead] implementation which is useful | |
15 | /// if they represent binary data, like the one of a pack file. | |
16 | pub struct WithSidebands<'a, T, F> | |
17 | where | |
18 | T: AsyncRead, | |
19 | { | |
20 | state: State<'a, T>, | |
21 | handle_progress: Option<F>, | |
22 | pos: usize, | |
23 | cap: usize, | |
24 | } | |
25 | ||
26 | impl<'a, T, F> Drop for WithSidebands<'a, T, F> | |
27 | where | |
28 | T: AsyncRead, | |
29 | { | |
30 | fn drop(&mut self) { | |
31 | if let State::Idle { ref mut parent } = self.state { | |
32 | parent | |
33 | .as_mut() | |
34 | .expect("parent is always available if we are idle") | |
35 | .reset(); | |
36 | } | |
37 | } | |
38 | } | |
39 | ||
49aad941 | 40 | impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8]) -> ProgressAction> |
0a29b90c FG |
41 | where |
42 | T: AsyncRead, | |
43 | { | |
44 | /// Create a new instance with the given provider as `parent`. | |
45 | pub fn new(parent: &'a mut StreamingPeekableIter<T>) -> Self { | |
46 | WithSidebands { | |
47 | state: State::Idle { parent: Some(parent) }, | |
48 | handle_progress: None, | |
49 | pos: 0, | |
50 | cap: 0, | |
51 | } | |
52 | } | |
53 | } | |
54 | ||
55 | enum State<'a, T> { | |
56 | Idle { | |
57 | parent: Option<&'a mut StreamingPeekableIter<T>>, | |
58 | }, | |
59 | ReadLine { | |
60 | read_line: Pin<Box<dyn Future<Output = ReadLineResult<'a>> + 'a>>, | |
61 | parent_inactive: Option<*mut StreamingPeekableIter<T>>, | |
62 | }, | |
63 | } | |
64 | ||
65 | /// # SAFETY | |
66 | /// It's safe because T is `Send` and we have a test that assures that our `StreamingPeekableIter` is `Send` as well, | |
67 | /// hence the `*mut _` is `Send`. | |
68 | /// `read_line` isn't send and we can't declare it as such as it forces `Send` in all places (BUT WHY IS THAT A PROBLEM, I don't recall). | |
69 | /// However, it's only used when pinned and thus isn't actually sent anywhere, it's a secondary state of the future used after it was Send | |
70 | /// to a thread possibly. | |
71 | // TODO: Is it possible to declare it as it should be? | |
72 | #[allow(unsafe_code, clippy::non_send_fields_in_send_ty)] | |
73 | unsafe impl<'a, T> Send for State<'a, T> where T: Send {} | |
74 | ||
#[cfg(test)]
mod tests {
    use super::*;

    /// Compiles only if the argument's type is `Send`.
    fn receiver<S>(_value: S)
    where
        S: Send,
    {
    }

    /// Items holding pointers to `StreamingPeekableIter` are declared `Send`, so the iterator itself must be `Send`.
    #[test]
    fn streaming_peekable_iter_is_send() {
        let iter = StreamingPeekableIter::new(Vec::<u8>::new(), &[]);
        receiver(iter);
    }

    /// The idle state, which borrows the iterator mutably, must be `Send` as well.
    #[test]
    fn state_is_send() {
        let mut iter = StreamingPeekableIter::new(Vec::<u8>::new(), &[]);
        let idle = State::Idle { parent: Some(&mut iter) };
        receiver(idle);
    }
}
92 | ||
93 | impl<'a, T, F> WithSidebands<'a, T, F> | |
94 | where | |
95 | T: AsyncRead + Unpin, | |
49aad941 | 96 | F: FnMut(bool, &[u8]) -> ProgressAction + Unpin, |
0a29b90c FG |
97 | { |
98 | /// Create a new instance with the given `parent` provider and the `handle_progress` function. | |
99 | /// | |
100 | /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool` | |
101 | /// being true in case the `text` is to be interpreted as error. | |
102 | pub fn with_progress_handler(parent: &'a mut StreamingPeekableIter<T>, handle_progress: F) -> Self { | |
103 | WithSidebands { | |
104 | state: State::Idle { parent: Some(parent) }, | |
105 | handle_progress: Some(handle_progress), | |
106 | pos: 0, | |
107 | cap: 0, | |
108 | } | |
109 | } | |
110 | ||
111 | /// Create a new instance without a progress handler. | |
112 | pub fn without_progress_handler(parent: &'a mut StreamingPeekableIter<T>) -> Self { | |
113 | WithSidebands { | |
114 | state: State::Idle { parent: Some(parent) }, | |
115 | handle_progress: None, | |
116 | pos: 0, | |
117 | cap: 0, | |
118 | } | |
119 | } | |
120 | ||
121 | /// Forwards to the parent [StreamingPeekableIter::reset_with()] | |
122 | pub fn reset_with(&mut self, delimiters: &'static [PacketLineRef<'static>]) { | |
123 | if let State::Idle { ref mut parent } = self.state { | |
124 | parent | |
125 | .as_mut() | |
126 | .expect("parent is always available if we are idle") | |
127 | .reset_with(delimiters) | |
128 | } | |
129 | } | |
130 | ||
131 | /// Forwards to the parent [StreamingPeekableIter::stopped_at()] | |
132 | pub fn stopped_at(&self) -> Option<PacketLineRef<'static>> { | |
133 | match self.state { | |
134 | State::Idle { ref parent } => { | |
135 | parent | |
136 | .as_ref() | |
137 | .expect("parent is always available if we are idle") | |
138 | .stopped_at | |
139 | } | |
140 | _ => None, | |
141 | } | |
142 | } | |
143 | ||
144 | /// Set or unset the progress handler. | |
145 | pub fn set_progress_handler(&mut self, handle_progress: Option<F>) { | |
146 | self.handle_progress = handle_progress; | |
147 | } | |
148 | ||
149 | /// Effectively forwards to the parent [StreamingPeekableIter::peek_line()], allowing to see what would be returned | |
150 | /// next on a call to [`read_line()`][io::BufRead::read_line()]. | |
151 | /// | |
152 | /// # Warning | |
153 | /// | |
154 | /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it. | |
155 | pub async fn peek_data_line(&mut self) -> Option<std::io::Result<Result<&[u8], decode::Error>>> { | |
156 | match self.state { | |
157 | State::Idle { ref mut parent } => match parent | |
158 | .as_mut() | |
159 | .expect("parent is always available if we are idle") | |
160 | .peek_line() | |
161 | .await | |
162 | { | |
163 | Some(Ok(Ok(PacketLineRef::Data(line)))) => Some(Ok(Ok(line))), | |
164 | Some(Ok(Err(err))) => Some(Ok(Err(err))), | |
165 | Some(Err(err)) => Some(Err(err)), | |
166 | _ => None, | |
167 | }, | |
168 | _ => None, | |
169 | } | |
170 | } | |
171 | ||
172 | /// Read a packet line as string line. | |
49aad941 | 173 | pub fn read_line_to_string<'b>(&'b mut self, buf: &'b mut String) -> ReadLineFuture<'a, 'b, T, F> { |
0a29b90c FG |
174 | ReadLineFuture { parent: self, buf } |
175 | } | |
176 | ||
177 | /// Read a packet line from the underlying packet reader, returning empty lines if a stop-packetline was reached. | |
178 | /// | |
179 | /// # Warning | |
180 | /// | |
181 | /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it. | |
182 | pub async fn read_data_line(&mut self) -> Option<std::io::Result<Result<PacketLineRef<'_>, decode::Error>>> { | |
183 | match &mut self.state { | |
184 | State::Idle { parent: Some(parent) } => { | |
185 | assert_eq!( | |
186 | self.cap, 0, | |
187 | "we don't support partial buffers right now - read-line must be used consistently" | |
188 | ); | |
189 | parent.read_line().await | |
190 | } | |
191 | _ => None, | |
192 | } | |
193 | } | |
194 | } | |
195 | ||
196 | pub struct ReadDataLineFuture<'a, 'b, T: AsyncRead, F> { | |
197 | parent: &'b mut WithSidebands<'a, T, F>, | |
198 | buf: &'b mut Vec<u8>, | |
199 | } | |
200 | ||
201 | impl<'a, 'b, T, F> Future for ReadDataLineFuture<'a, 'b, T, F> | |
202 | where | |
203 | T: AsyncRead + Unpin, | |
49aad941 | 204 | F: FnMut(bool, &[u8]) -> ProgressAction + Unpin, |
0a29b90c FG |
205 | { |
206 | type Output = std::io::Result<usize>; | |
207 | ||
208 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
209 | assert_eq!( | |
210 | self.parent.cap, 0, | |
211 | "we don't support partial buffers right now - read-line must be used consistently" | |
212 | ); | |
213 | let Self { buf, parent } = &mut *self; | |
214 | let line = ready!(Pin::new(parent).poll_fill_buf(cx))?; | |
215 | buf.clear(); | |
216 | buf.extend_from_slice(line); | |
217 | let bytes = line.len(); | |
218 | self.parent.cap = 0; | |
219 | Poll::Ready(Ok(bytes)) | |
220 | } | |
221 | } | |
222 | ||
223 | pub struct ReadLineFuture<'a, 'b, T: AsyncRead, F> { | |
224 | parent: &'b mut WithSidebands<'a, T, F>, | |
225 | buf: &'b mut String, | |
226 | } | |
227 | ||
228 | impl<'a, 'b, T, F> Future for ReadLineFuture<'a, 'b, T, F> | |
229 | where | |
230 | T: AsyncRead + Unpin, | |
49aad941 | 231 | F: FnMut(bool, &[u8]) -> ProgressAction + Unpin, |
0a29b90c FG |
232 | { |
233 | type Output = std::io::Result<usize>; | |
234 | ||
235 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
236 | assert_eq!( | |
237 | self.parent.cap, 0, | |
238 | "we don't support partial buffers right now - read-line must be used consistently" | |
239 | ); | |
240 | let Self { buf, parent } = &mut *self; | |
241 | let line = std::str::from_utf8(ready!(Pin::new(parent).poll_fill_buf(cx))?) | |
242 | .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; | |
243 | buf.clear(); | |
244 | buf.push_str(line); | |
245 | let bytes = line.len(); | |
246 | self.parent.cap = 0; | |
247 | Poll::Ready(Ok(bytes)) | |
248 | } | |
249 | } | |
250 | ||
251 | impl<'a, T, F> AsyncBufRead for WithSidebands<'a, T, F> | |
252 | where | |
253 | T: AsyncRead + Unpin, | |
49aad941 | 254 | F: FnMut(bool, &[u8]) -> ProgressAction + Unpin, |
0a29b90c FG |
255 | { |
256 | fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> { | |
257 | use std::io; | |
258 | ||
259 | use futures_lite::FutureExt; | |
260 | { | |
261 | let this = self.as_mut().get_mut(); | |
262 | if this.pos >= this.cap { | |
263 | let (ofs, cap) = loop { | |
264 | match this.state { | |
265 | State::Idle { ref mut parent } => { | |
266 | let parent = parent.take().expect("parent to be present here"); | |
267 | let inactive = parent as *mut _; | |
268 | this.state = State::ReadLine { | |
269 | read_line: parent.read_line().boxed_local(), | |
270 | parent_inactive: Some(inactive), | |
271 | } | |
272 | } | |
273 | State::ReadLine { | |
274 | ref mut read_line, | |
275 | ref mut parent_inactive, | |
276 | } => { | |
277 | let line = ready!(read_line.poll(cx)); | |
278 | ||
279 | this.state = { | |
280 | let parent = parent_inactive.take().expect("parent pointer always set"); | |
281 | // SAFETY: It's safe to recover the original mutable reference (from which | |
282 | // the `read_line` future was created as the latter isn't accessible anymore | |
283 | // once the state is set to Idle. In other words, either one or the other are | |
284 | // accessible, never both at the same time. | |
285 | // Also: We keep a pointer around which is protected by borrowcheck since it's created | |
286 | // from a legal mutable reference which is moved into the read_line future - if it was manually | |
287 | // implemented we would be able to re-obtain it from there. | |
288 | #[allow(unsafe_code)] | |
289 | let parent = unsafe { &mut *parent }; | |
290 | State::Idle { parent: Some(parent) } | |
291 | }; | |
292 | ||
293 | let line = match line { | |
294 | Some(line) => line?.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?, | |
295 | None => break (0, 0), | |
296 | }; | |
297 | ||
298 | match this.handle_progress.as_mut() { | |
299 | Some(handle_progress) => { | |
300 | let band = line | |
301 | .decode_band() | |
302 | .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; | |
303 | const ENCODED_BAND: usize = 1; | |
304 | match band { | |
305 | BandRef::Data(d) => { | |
306 | if d.is_empty() { | |
307 | continue; | |
308 | } | |
309 | break (U16_HEX_BYTES + ENCODED_BAND, d.len()); | |
310 | } | |
311 | BandRef::Progress(d) => { | |
312 | let text = TextRef::from(d).0; | |
49aad941 FG |
313 | match handle_progress(false, text) { |
314 | ProgressAction::Continue => {} | |
315 | ProgressAction::Interrupt => { | |
316 | return Poll::Ready(Err(io::Error::new( | |
317 | std::io::ErrorKind::Other, | |
318 | "interrupted by user", | |
319 | ))) | |
320 | } | |
321 | }; | |
0a29b90c FG |
322 | } |
323 | BandRef::Error(d) => { | |
324 | let text = TextRef::from(d).0; | |
49aad941 FG |
325 | match handle_progress(true, text) { |
326 | ProgressAction::Continue => {} | |
327 | ProgressAction::Interrupt => { | |
328 | return Poll::Ready(Err(io::Error::new( | |
329 | io::ErrorKind::Other, | |
330 | "interrupted by user", | |
331 | ))) | |
332 | } | |
333 | }; | |
0a29b90c FG |
334 | } |
335 | }; | |
336 | } | |
337 | None => { | |
338 | break match line.as_slice() { | |
339 | Some(d) => (U16_HEX_BYTES, d.len()), | |
340 | None => { | |
341 | return Poll::Ready(Err(io::Error::new( | |
342 | io::ErrorKind::UnexpectedEof, | |
343 | "encountered non-data line in a data-line only context", | |
344 | ))) | |
345 | } | |
346 | } | |
347 | } | |
348 | } | |
349 | } | |
350 | } | |
351 | }; | |
352 | this.cap = cap + ofs; | |
353 | this.pos = ofs; | |
354 | } | |
355 | } | |
356 | let range = self.pos..self.cap; | |
357 | match &self.get_mut().state { | |
358 | State::Idle { parent } => Poll::Ready(Ok(&parent.as_ref().expect("parent always available").buf[range])), | |
359 | State::ReadLine { .. } => unreachable!("at least in theory"), | |
360 | } | |
361 | } | |
362 | ||
363 | fn consume(self: Pin<&mut Self>, amt: usize) { | |
364 | let this = self.get_mut(); | |
365 | this.pos = std::cmp::min(this.pos + amt, this.cap); | |
366 | } | |
367 | } | |
368 | ||
369 | impl<'a, T, F> AsyncRead for WithSidebands<'a, T, F> | |
370 | where | |
371 | T: AsyncRead + Unpin, | |
49aad941 | 372 | F: FnMut(bool, &[u8]) -> ProgressAction + Unpin, |
0a29b90c FG |
373 | { |
374 | fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> { | |
375 | let nread = { | |
376 | use std::io::Read; | |
377 | let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; | |
378 | rem.read(buf)? | |
379 | }; | |
380 | self.consume(nread); | |
381 | Poll::Ready(Ok(nread)) | |
382 | } | |
383 | } |