git.proxmox.com Git - proxmox-backup.git/blob - src/tools/wrapped_reader_stream.rs
tools/wrapped_reader_stream.rs: only implement Stream (without AsyncRead)
[proxmox-backup.git] / src / tools / wrapped_reader_stream.rs
1 use failure::*;
2 use tokio_threadpool;
3 use std::io::Read;
4 use futures::Async;
5 use futures::stream::Stream;
6
/// Adapter that owns a blocking [`Read`] implementation so its contents can
/// be handed out chunk by chunk through the `Stream` implementation in this
/// file.
pub struct WrappedReaderStream<R>
where
    R: Read,
{
    /// The wrapped blocking reader that chunks are read from.
    reader: R,
}
10
11 impl <R: Read> WrappedReaderStream<R> {
12
13 pub fn new(reader: R) -> Self {
14 Self { reader }
15 }
16 }
17
/// Builds the generic I/O error (kind `Other`) used to report that a
/// `blocking` section was entered outside of the Tokio runtime.
fn blocking_err() -> std::io::Error {
    use std::io::{Error, ErrorKind};

    const MESSAGE: &str =
        "`blocking` annotated I/O must be called from the context of the Tokio runtime.";

    Error::new(ErrorKind::Other, MESSAGE)
}
23
24 impl <R: Read> Stream for WrappedReaderStream<R> {
25
26 type Item = Vec<u8>;
27 type Error = std::io::Error;
28
29 fn poll(&mut self) -> Result<Async<Option<Vec<u8>>>, std::io::Error> {
30 let mut buf = [0u8;64*1024];
31 match tokio_threadpool::blocking(|| self.reader.read(&mut buf)) {
32 Ok(Async::Ready(Ok(n))) => {
33 if n == 0 { // EOF
34 Ok(Async::Ready(None))
35 } else {
36 Ok(Async::Ready(Some(buf[..n].to_vec())))
37 }
38 },
39 Ok(Async::Ready(Err(err))) => Err(err),
40 Ok(Async::NotReady) => Ok(Async::NotReady),
41 Err(err) => Err(blocking_err()),
42 }
43 }
44 }