]> git.proxmox.com Git - proxmox-backup.git/blob - src/tools/wrapped_reader_stream.rs
introduce new runtime tokio helpers
[proxmox-backup.git] / src / tools / wrapped_reader_stream.rs
1 use std::io::{self, Read};
2 use std::pin::Pin;
3 use std::task::{Context, Poll};
4
5 use futures::stream::Stream;
6
7 use crate::tools::runtime::block_in_place;
8
9 pub struct WrappedReaderStream<R: Read + Unpin> {
10 reader: R,
11 buffer: Vec<u8>,
12 }
13
14 impl <R: Read + Unpin> WrappedReaderStream<R> {
15
16 pub fn new(reader: R) -> Self {
17 let mut buffer = Vec::with_capacity(64*1024);
18 unsafe { buffer.set_len(buffer.capacity()); }
19 Self { reader, buffer }
20 }
21 }
22
23 impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
24 type Item = Result<Vec<u8>, io::Error>;
25
26 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
27 let this = self.get_mut();
28 match block_in_place(|| this.reader.read(&mut this.buffer)) {
29 Ok(n) => {
30 if n == 0 {
31 // EOF
32 Poll::Ready(None)
33 } else {
34 Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
35 }
36 }
37 Err(err) => Poll::Ready(Some(Err(err))),
38 }
39 }
40 }
41
42 #[cfg(test)]
43 mod test {
44 use std::io;
45
46 use failure::Error;
47 use futures::stream::TryStreamExt;
48
49 #[test]
50 fn test_wrapped_stream_reader() -> Result<(), Error> {
51 crate::tools::runtime::main(async {
52 run_wrapped_stream_reader_test().await
53 })
54 }
55
56 struct DummyReader(usize);
57
58 impl io::Read for DummyReader {
59 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
60 self.0 += 1;
61
62 if self.0 >= 10 {
63 return Ok(0);
64 }
65
66 unsafe {
67 std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len());
68 }
69
70 Ok(buf.len())
71 }
72 }
73
74 async fn run_wrapped_stream_reader_test() -> Result<(), Error> {
75 let mut reader = super::WrappedReaderStream::new(DummyReader(0));
76 while let Some(_data) = reader.try_next().await? {
77 // just waiting
78 }
79 Ok(())
80 }
81 }