]> git.proxmox.com Git - rustc.git/blob - vendor/gix-packetline-blocking/src/read/sidebands/async_io.rs
New upstream version 1.74.1+dfsg1
[rustc.git] / vendor / gix-packetline-blocking / src / read / sidebands / async_io.rs
1 use std::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5 };
6
7 use futures_io::{AsyncBufRead, AsyncRead};
8 use futures_lite::ready;
9
10 use crate::{decode, read::ProgressAction, BandRef, PacketLineRef, StreamingPeekableIter, TextRef, U16_HEX_BYTES};
11
12 type ReadLineResult<'a> = Option<std::io::Result<Result<PacketLineRef<'a>, decode::Error>>>;
13 /// An implementor of [`AsyncBufRead`] yielding packet lines on each call to [`read_line()`][AsyncBufRead::read_line()].
14 /// It's also possible to hide the underlying packet lines using the [`Read`][AsyncRead] implementation which is useful
15 /// if they represent binary data, like the one of a pack file.
16 pub struct WithSidebands<'a, T, F>
17 where
18 T: AsyncRead,
19 {
20 state: State<'a, T>,
21 handle_progress: Option<F>,
22 pos: usize,
23 cap: usize,
24 }
25
26 impl<'a, T, F> Drop for WithSidebands<'a, T, F>
27 where
28 T: AsyncRead,
29 {
30 fn drop(&mut self) {
31 if let State::Idle { ref mut parent } = self.state {
32 parent
33 .as_mut()
34 .expect("parent is always available if we are idle")
35 .reset();
36 }
37 }
38 }
39
40 impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8]) -> ProgressAction>
41 where
42 T: AsyncRead,
43 {
44 /// Create a new instance with the given provider as `parent`.
45 pub fn new(parent: &'a mut StreamingPeekableIter<T>) -> Self {
46 WithSidebands {
47 state: State::Idle { parent: Some(parent) },
48 handle_progress: None,
49 pos: 0,
50 cap: 0,
51 }
52 }
53 }
54
55 enum State<'a, T> {
56 Idle {
57 parent: Option<&'a mut StreamingPeekableIter<T>>,
58 },
59 ReadLine {
60 read_line: Pin<Box<dyn Future<Output = ReadLineResult<'a>> + 'a>>,
61 parent_inactive: Option<*mut StreamingPeekableIter<T>>,
62 },
63 }
64
65 /// # SAFETY
66 /// It's safe because T is `Send` and we have a test that assures that our `StreamingPeekableIter` is `Send` as well,
67 /// hence the `*mut _` is `Send`.
68 /// `read_line` isn't send and we can't declare it as such as it forces `Send` in all places (BUT WHY IS THAT A PROBLEM, I don't recall).
69 /// However, it's only used when pinned and thus isn't actually sent anywhere, it's a secondary state of the future used after it was Send
70 /// to a thread possibly.
71 // TODO: Is it possible to declare it as it should be?
72 #[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
73 unsafe impl<'a, T> Send for State<'a, T> where T: Send {}
74
#[cfg(test)]
mod tests {
    use super::*;

    /// Compiles only if the type of the given value is `Send`.
    fn assert_send<T: Send>(_value: T) {}

    /// Items containing pointers to `StreamingPeekableIter` are declared `Send`, so it must be `Send` itself.
    #[test]
    fn streaming_peekable_iter_is_send() {
        let iter = StreamingPeekableIter::new(Vec::<u8>::new(), &[]);
        assert_send(iter);
    }

    /// The idle state, which borrows the iterator mutably, must be `Send` as well.
    #[test]
    fn state_is_send() {
        let mut iter = StreamingPeekableIter::new(Vec::<u8>::new(), &[]);
        let state = State::Idle { parent: Some(&mut iter) };
        assert_send(state);
    }
}
92
93 impl<'a, T, F> WithSidebands<'a, T, F>
94 where
95 T: AsyncRead + Unpin,
96 F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
97 {
98 /// Create a new instance with the given `parent` provider and the `handle_progress` function.
99 ///
100 /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool`
101 /// being true in case the `text` is to be interpreted as error.
102 pub fn with_progress_handler(parent: &'a mut StreamingPeekableIter<T>, handle_progress: F) -> Self {
103 WithSidebands {
104 state: State::Idle { parent: Some(parent) },
105 handle_progress: Some(handle_progress),
106 pos: 0,
107 cap: 0,
108 }
109 }
110
111 /// Create a new instance without a progress handler.
112 pub fn without_progress_handler(parent: &'a mut StreamingPeekableIter<T>) -> Self {
113 WithSidebands {
114 state: State::Idle { parent: Some(parent) },
115 handle_progress: None,
116 pos: 0,
117 cap: 0,
118 }
119 }
120
121 /// Forwards to the parent [`StreamingPeekableIter::reset_with()`]
122 pub fn reset_with(&mut self, delimiters: &'static [PacketLineRef<'static>]) {
123 if let State::Idle { ref mut parent } = self.state {
124 parent
125 .as_mut()
126 .expect("parent is always available if we are idle")
127 .reset_with(delimiters)
128 }
129 }
130
131 /// Forwards to the parent [`StreamingPeekableIter::stopped_at()`]
132 pub fn stopped_at(&self) -> Option<PacketLineRef<'static>> {
133 match self.state {
134 State::Idle { ref parent } => {
135 parent
136 .as_ref()
137 .expect("parent is always available if we are idle")
138 .stopped_at
139 }
140 _ => None,
141 }
142 }
143
144 /// Set or unset the progress handler.
145 pub fn set_progress_handler(&mut self, handle_progress: Option<F>) {
146 self.handle_progress = handle_progress;
147 }
148
149 /// Effectively forwards to the parent [`StreamingPeekableIter::peek_line()`], allowing to see what would be returned
150 /// next on a call to [`read_line()`][io::BufRead::read_line()].
151 ///
152 /// # Warning
153 ///
154 /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it.
155 pub async fn peek_data_line(&mut self) -> Option<std::io::Result<Result<&[u8], decode::Error>>> {
156 match self.state {
157 State::Idle { ref mut parent } => match parent
158 .as_mut()
159 .expect("parent is always available if we are idle")
160 .peek_line()
161 .await
162 {
163 Some(Ok(Ok(PacketLineRef::Data(line)))) => Some(Ok(Ok(line))),
164 Some(Ok(Err(err))) => Some(Ok(Err(err))),
165 Some(Err(err)) => Some(Err(err)),
166 _ => None,
167 },
168 _ => None,
169 }
170 }
171
172 /// Read a packet line as string line.
173 pub fn read_line_to_string<'b>(&'b mut self, buf: &'b mut String) -> ReadLineFuture<'a, 'b, T, F> {
174 ReadLineFuture { parent: self, buf }
175 }
176
177 /// Read a packet line from the underlying packet reader, returning empty lines if a stop-packetline was reached.
178 ///
179 /// # Warning
180 ///
181 /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it.
182 pub async fn read_data_line(&mut self) -> Option<std::io::Result<Result<PacketLineRef<'_>, decode::Error>>> {
183 match &mut self.state {
184 State::Idle { parent: Some(parent) } => {
185 assert_eq!(
186 self.cap, 0,
187 "we don't support partial buffers right now - read-line must be used consistently"
188 );
189 parent.read_line().await
190 }
191 _ => None,
192 }
193 }
194 }
195
196 pub struct ReadDataLineFuture<'a, 'b, T: AsyncRead, F> {
197 parent: &'b mut WithSidebands<'a, T, F>,
198 buf: &'b mut Vec<u8>,
199 }
200
201 impl<'a, 'b, T, F> Future for ReadDataLineFuture<'a, 'b, T, F>
202 where
203 T: AsyncRead + Unpin,
204 F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
205 {
206 type Output = std::io::Result<usize>;
207
208 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
209 assert_eq!(
210 self.parent.cap, 0,
211 "we don't support partial buffers right now - read-line must be used consistently"
212 );
213 let Self { buf, parent } = &mut *self;
214 let line = ready!(Pin::new(parent).poll_fill_buf(cx))?;
215 buf.clear();
216 buf.extend_from_slice(line);
217 let bytes = line.len();
218 self.parent.cap = 0;
219 Poll::Ready(Ok(bytes))
220 }
221 }
222
223 pub struct ReadLineFuture<'a, 'b, T: AsyncRead, F> {
224 parent: &'b mut WithSidebands<'a, T, F>,
225 buf: &'b mut String,
226 }
227
228 impl<'a, 'b, T, F> Future for ReadLineFuture<'a, 'b, T, F>
229 where
230 T: AsyncRead + Unpin,
231 F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
232 {
233 type Output = std::io::Result<usize>;
234
235 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
236 assert_eq!(
237 self.parent.cap, 0,
238 "we don't support partial buffers right now - read-line must be used consistently"
239 );
240 let Self { buf, parent } = &mut *self;
241 let line = std::str::from_utf8(ready!(Pin::new(parent).poll_fill_buf(cx))?)
242 .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
243 buf.clear();
244 buf.push_str(line);
245 let bytes = line.len();
246 self.parent.cap = 0;
247 Poll::Ready(Ok(bytes))
248 }
249 }
250
251 impl<'a, T, F> AsyncBufRead for WithSidebands<'a, T, F>
252 where
253 T: AsyncRead + Unpin,
254 F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
255 {
256 fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
257 use std::io;
258
259 use futures_lite::FutureExt;
260 {
261 let this = self.as_mut().get_mut();
262 if this.pos >= this.cap {
263 let (ofs, cap) = loop {
264 match this.state {
265 State::Idle { ref mut parent } => {
266 let parent = parent.take().expect("parent to be present here");
267 let inactive = parent as *mut _;
268 this.state = State::ReadLine {
269 read_line: parent.read_line().boxed_local(),
270 parent_inactive: Some(inactive),
271 }
272 }
273 State::ReadLine {
274 ref mut read_line,
275 ref mut parent_inactive,
276 } => {
277 let line = ready!(read_line.poll(cx));
278
279 this.state = {
280 let parent = parent_inactive.take().expect("parent pointer always set");
281 // SAFETY: It's safe to recover the original mutable reference (from which
282 // the `read_line` future was created as the latter isn't accessible anymore
283 // once the state is set to Idle. In other words, either one or the other are
284 // accessible, never both at the same time.
285 // Also: We keep a pointer around which is protected by borrowcheck since it's created
286 // from a legal mutable reference which is moved into the read_line future - if it was manually
287 // implemented we would be able to re-obtain it from there.
288 #[allow(unsafe_code)]
289 let parent = unsafe { &mut *parent };
290 State::Idle { parent: Some(parent) }
291 };
292
293 let line = match line {
294 Some(line) => line?.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?,
295 None => break (0, 0),
296 };
297
298 match this.handle_progress.as_mut() {
299 Some(handle_progress) => {
300 let band = line
301 .decode_band()
302 .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
303 const ENCODED_BAND: usize = 1;
304 match band {
305 BandRef::Data(d) => {
306 if d.is_empty() {
307 continue;
308 }
309 break (U16_HEX_BYTES + ENCODED_BAND, d.len());
310 }
311 BandRef::Progress(d) => {
312 let text = TextRef::from(d).0;
313 match handle_progress(false, text) {
314 ProgressAction::Continue => {}
315 ProgressAction::Interrupt => {
316 return Poll::Ready(Err(io::Error::new(
317 std::io::ErrorKind::Other,
318 "interrupted by user",
319 )))
320 }
321 };
322 }
323 BandRef::Error(d) => {
324 let text = TextRef::from(d).0;
325 match handle_progress(true, text) {
326 ProgressAction::Continue => {}
327 ProgressAction::Interrupt => {
328 return Poll::Ready(Err(io::Error::new(
329 io::ErrorKind::Other,
330 "interrupted by user",
331 )))
332 }
333 };
334 }
335 };
336 }
337 None => {
338 break match line.as_slice() {
339 Some(d) => (U16_HEX_BYTES, d.len()),
340 None => {
341 return Poll::Ready(Err(io::Error::new(
342 io::ErrorKind::UnexpectedEof,
343 "encountered non-data line in a data-line only context",
344 )))
345 }
346 }
347 }
348 }
349 }
350 }
351 };
352 this.cap = cap + ofs;
353 this.pos = ofs;
354 }
355 }
356 let range = self.pos..self.cap;
357 match &self.get_mut().state {
358 State::Idle { parent } => Poll::Ready(Ok(&parent.as_ref().expect("parent always available").buf[range])),
359 State::ReadLine { .. } => unreachable!("at least in theory"),
360 }
361 }
362
363 fn consume(self: Pin<&mut Self>, amt: usize) {
364 let this = self.get_mut();
365 this.pos = std::cmp::min(this.pos + amt, this.cap);
366 }
367 }
368
369 impl<'a, T, F> AsyncRead for WithSidebands<'a, T, F>
370 where
371 T: AsyncRead + Unpin,
372 F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
373 {
374 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
375 let nread = {
376 use std::io::Read;
377 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
378 rem.read(buf)?
379 };
380 self.consume(nread);
381 Poll::Ready(Ok(nread))
382 }
383 }