// git.proxmox.com Git - proxmox-backup.git/blob - src/tools/wrapped_reader_stream.rs
// tools: add tokio::main() replacement
// [proxmox-backup.git] / src / tools / wrapped_reader_stream.rs
1 use std::io::{self, Read};
2 use std::pin::Pin;
3 use std::task::{Context, Poll};
4
5 use tokio_executor::threadpool::blocking;
6 use futures::stream::Stream;
7
/// Wraps a blocking [`Read`] implementation together with a reusable
/// scratch buffer.
///
/// The `Stream` implementation for this type reads into `buffer` and hands
/// out a copy of the bytes that were filled in as one `Vec<u8>` item.
pub struct WrappedReaderStream<R>
where
    R: Read + Unpin,
{
    /// The underlying blocking reader that data is pulled from.
    reader: R,
    /// Scratch space passed to every `read` call on `reader`.
    buffer: Vec<u8>,
}
12
13 impl <R: Read + Unpin> WrappedReaderStream<R> {
14
15 pub fn new(reader: R) -> Self {
16 let mut buffer = Vec::with_capacity(64*1024);
17 unsafe { buffer.set_len(buffer.capacity()); }
18 Self { reader, buffer }
19 }
20 }
21
22 impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
23 type Item = Result<Vec<u8>, io::Error>;
24
25 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
26 let this = self.get_mut();
27 match blocking(|| this.reader.read(&mut this.buffer)) {
28 Poll::Ready(Ok(Ok(n))) => {
29 if n == 0 {
30 // EOF
31 Poll::Ready(None)
32 } else {
33 Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
34 }
35 }
36 Poll::Ready(Ok(Err(err))) => Poll::Ready(Some(Err(err))),
37 Poll::Ready(Err(err)) => Poll::Ready(Some(Err(io::Error::new(
38 io::ErrorKind::Other,
39 err.to_string(),
40 )))),
41 Poll::Pending => Poll::Pending,
42 }
43 }
44 }
45
#[cfg(test)]
mod test {
    use std::io;

    use failure::Error;
    use futures::stream::TryStreamExt;

    /// Runs the async stream test to completion on the crate's runtime helper.
    #[test]
    fn test_wrapped_stream_reader() -> Result<(), Error> {
        // NOTE(review): the previous comment here read "This cannot be used
        // currently, because it doesn't permit blocking() annotations:" but
        // sat directly above the call that *is* used. It presumably referred
        // to a different runner that has since been removed; confirm and
        // drop this note.
        crate::tools::runtime::main(async {
            run_wrapped_stream_reader_test().await
        })
    }

    /// Test reader whose field counts the `read` calls made so far.
    ///
    /// Starting from a counter of 0, the first nine reads fill the whole
    /// buffer with zeros; the tenth and every later read report EOF.
    struct DummyReader(usize);

    impl io::Read for DummyReader {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            self.0 += 1;

            if self.0 >= 10 {
                return Ok(0);
            }

            // Safe zero-fill; no need for `unsafe { ptr::write_bytes(..) }`.
            for byte in buf.iter_mut() {
                *byte = 0;
            }

            Ok(buf.len())
        }
    }

    /// Drains a `WrappedReaderStream` over `DummyReader` and checks that it
    /// yields exactly the nine non-empty, all-zero chunks the reader produces.
    async fn run_wrapped_stream_reader_test() -> Result<(), Error> {
        let mut reader = super::WrappedReaderStream::new(DummyReader(0));
        let mut chunks = 0;
        while let Some(data) = reader.try_next().await? {
            assert!(!data.is_empty());
            assert!(data.iter().all(|&byte| byte == 0));
            chunks += 1;
        }
        assert_eq!(chunks, 9);
        Ok(())
    }
}