git.proxmox.com Git - proxmox-backup.git/blob - src/tools/wrapped_reader_stream.rs
switch from failure to anyhow
[proxmox-backup.git] / src / tools / wrapped_reader_stream.rs
1 use std::io::{self, Read};
2 use std::pin::Pin;
3 use std::task::{Context, Poll};
4 use std::sync::mpsc::Receiver;
5
6
7 use futures::stream::Stream;
8
9 use crate::tools::runtime::block_in_place;
10
/// Wrapper struct to convert a Reader into a Stream
///
/// The wrapper owns a fixed-size scratch buffer that is handed to the
/// reader; the data read into it is what the stream yields as chunks.
pub struct WrappedReaderStream<R: Read + Unpin> {
    /// Source the bytes are pulled from.
    reader: R,
    /// Scratch space passed to `reader.read()`; its length stays constant.
    buffer: Vec<u8>,
}

impl<R: Read + Unpin> WrappedReaderStream<R> {
    /// Size of the scratch buffer, and thus the upper bound of one chunk.
    const BUFFER_SIZE: usize = 64 * 1024;

    /// Creates a new stream wrapper around `reader`.
    pub fn new(reader: R) -> Self {
        // The buffer must be initialized: `Read::read` receives it as
        // `&mut [u8]`, and an implementation is free to inspect the bytes.
        // Growing the length of an uninitialized `Vec` via `set_len` would
        // expose uninitialized memory, which is undefined behaviour.
        let buffer = vec![0u8; Self::BUFFER_SIZE];
        Self { reader, buffer }
    }
}
25
26 impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
27 type Item = Result<Vec<u8>, io::Error>;
28
29 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
30 let this = self.get_mut();
31 match block_in_place(|| this.reader.read(&mut this.buffer)) {
32 Ok(n) => {
33 if n == 0 {
34 // EOF
35 Poll::Ready(None)
36 } else {
37 Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
38 }
39 }
40 Err(err) => Poll::Ready(Some(Err(err))),
41 }
42 }
43 }
44
45
46 /// Wrapper struct to convert a channel Receiver into a Stream
47 pub struct StdChannelStream<T>(pub Receiver<T>);
48
49 impl<T> Stream for StdChannelStream<T> {
50 type Item = T;
51
52 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
53 match block_in_place(|| self.0.recv()) {
54 Ok(data) => Poll::Ready(Some(data)),
55 Err(_) => Poll::Ready(None),// channel closed
56 }
57 }
58 }
59
#[cfg(test)]
mod test {
    use std::io;

    use anyhow::{ensure, Error};
    use futures::stream::TryStreamExt;

    /// Number of reads for which `DummyReader` delivers data before EOF.
    const DATA_READS: usize = 9;

    #[test]
    fn test_wrapped_stream_reader() -> Result<(), Error> {
        crate::tools::runtime::main(async {
            run_wrapped_stream_reader_test().await
        })
    }

    /// Reader that fills the whole buffer with zeroes for the first
    /// `DATA_READS` calls and reports EOF afterwards. The field counts the
    /// calls to `read`.
    struct DummyReader(usize);

    impl io::Read for DummyReader {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            self.0 += 1;

            if self.0 > DATA_READS {
                return Ok(0);
            }

            // Zeroing a byte slice needs no `unsafe`.
            for byte in buf.iter_mut() {
                *byte = 0;
            }

            Ok(buf.len())
        }
    }

    /// Drains the stream and checks that it yields exactly the chunks the
    /// dummy reader produced before terminating at EOF.
    async fn run_wrapped_stream_reader_test() -> Result<(), Error> {
        let mut reader = super::WrappedReaderStream::new(DummyReader(0));
        let mut chunks = 0;

        while let Some(data) = reader.try_next().await? {
            ensure!(!data.is_empty(), "stream yielded an empty chunk");
            ensure!(
                data.iter().all(|&byte| byte == 0),
                "stream yielded data the reader did not produce"
            );
            chunks += 1;
        }

        ensure!(
            chunks == DATA_READS,
            "expected {} chunks, got {}",
            DATA_READS,
            chunks
        );

        Ok(())
    }
}