]> git.proxmox.com Git - proxmox-backup.git/blob - pbs-tools/src/blocking.rs
tfa: handle incompatible challenge data
[proxmox-backup.git] / pbs-tools / src / blocking.rs
1 //! Async wrappers for blocking I/O (adding `block_in_place` around channels/readers)
2
3 use std::io::{self, Read};
4 use std::pin::Pin;
5 use std::task::{Context, Poll};
6 use std::sync::mpsc::Receiver;
7
8 use futures::stream::Stream;
9
10 use pbs_runtime::block_in_place;
11
/// Adapter that exposes a blocking [`Read`] implementation as a `Stream`.
///
/// The stream yields the bytes obtained from successive `read` calls on the wrapped reader,
/// each chunk as its own `Vec<u8>`.
pub struct WrappedReaderStream<R>
where
    R: Read + Unpin,
{
    /// The wrapped blocking reader.
    reader: R,
    /// Scratch space handed to `reader.read` on every poll.
    buffer: Vec<u8>,
}
17
18 impl <R: Read + Unpin> WrappedReaderStream<R> {
19
20 pub fn new(reader: R) -> Self {
21 let mut buffer = Vec::with_capacity(64*1024);
22 unsafe { buffer.set_len(buffer.capacity()); }
23 Self { reader, buffer }
24 }
25 }
26
27 impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
28 type Item = Result<Vec<u8>, io::Error>;
29
30 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
31 let this = self.get_mut();
32 match block_in_place(|| this.reader.read(&mut this.buffer)) {
33 Ok(n) => {
34 if n == 0 {
35 // EOF
36 Poll::Ready(None)
37 } else {
38 Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
39 }
40 }
41 Err(err) => Poll::Ready(Some(Err(err))),
42 }
43 }
44 }
45
/// Adapter that exposes a `std::sync::mpsc` [`Receiver`] as a `Stream`.
pub struct StdChannelStream<T>(
    /// The receiving half of the channel whose messages the stream yields.
    pub Receiver<T>,
);
48
49 impl<T> Stream for StdChannelStream<T> {
50 type Item = T;
51
52 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
53 match block_in_place(|| self.0.recv()) {
54 Ok(data) => Poll::Ready(Some(data)),
55 Err(_) => Poll::Ready(None),// channel closed
56 }
57 }
58 }
59
#[cfg(test)]
mod test {
    use std::io;

    use anyhow::Error;
    use futures::stream::TryStreamExt;

    /// Runs the async test body through `pbs_runtime::main`.
    #[test]
    fn test_wrapped_stream_reader() -> Result<(), Error> {
        pbs_runtime::main(async { run_wrapped_stream_reader_test().await })
    }

    /// Reader that zero-fills the whole buffer on its first nine `read` calls and reports
    /// EOF from the tenth call on. The field counts the calls made so far.
    struct DummyReader(usize);

    impl io::Read for DummyReader {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            self.0 += 1;

            if self.0 >= 10 {
                return Ok(0);
            }

            // Plain safe loop instead of `ptr::write_bytes`; no `unsafe` is needed to zero a slice.
            for byte in buf.iter_mut() {
                *byte = 0;
            }

            Ok(buf.len())
        }
    }

    /// Drains the stream and checks that it yields exactly the nine zero-filled chunks
    /// `DummyReader` produces before signalling EOF.
    async fn run_wrapped_stream_reader_test() -> Result<(), Error> {
        let mut reader = super::WrappedReaderStream::new(DummyReader(0));
        let mut chunks = 0;

        while let Some(data) = reader.try_next().await? {
            assert!(!data.is_empty());
            assert!(data.iter().all(|&byte| byte == 0));
            chunks += 1;
        }

        assert_eq!(chunks, 9);
        Ok(())
    }
}