]> git.proxmox.com Git - proxmox-backup.git/commitdiff
move channel/stream helpers to pbs-tools
authorWolfgang Bumiller <w.bumiller@proxmox.com>
Tue, 20 Jul 2021 09:26:29 +0000 (11:26 +0200)
committerWolfgang Bumiller <w.bumiller@proxmox.com>
Tue, 20 Jul 2021 09:27:40 +0000 (11:27 +0200)
pbs_tools
  ::blocking: std/async wrapping with block_in_place
  ::stream: stream <-> AsyncRead/AsyncWrite wrapping

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
pbs-tools/Cargo.toml
pbs-tools/src/blocking.rs [new file with mode: 0644]
pbs-tools/src/lib.rs
pbs-tools/src/stream.rs [new file with mode: 0644]
src/api2/admin/datastore.rs
src/bin/proxmox-backup-client.rs
src/server/rest.rs
src/tools/async_channel_writer.rs [deleted file]
src/tools/mod.rs
src/tools/wrapped_reader_stream.rs [deleted file]

index 0492338dd940a696f98ed21b5d7cceb9e8fdfb6e..ef20a779ef702b0a2dd0bec9373dc687b48dc6fb 100644 (file)
@@ -32,6 +32,7 @@ walkdir = "2"
 proxmox = { version = "0.11.5", default-features = false, features = [ "tokio" ] }
 
 pbs-buildcfg = { path = "../pbs-buildcfg" }
+pbs-runtime = { path = "../pbs-runtime" }
 
 [dev-dependencies]
 tokio = { version = "1.6", features = [ "macros" ] }
diff --git a/pbs-tools/src/blocking.rs b/pbs-tools/src/blocking.rs
new file mode 100644 (file)
index 0000000..f5828df
--- /dev/null
@@ -0,0 +1,99 @@
+//! Async wrappers for blocking I/O (adding `block_in_place` around channels/readers)
+
+use std::io::{self, Read};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::sync::mpsc::Receiver;
+
+use futures::stream::Stream;
+
+use pbs_runtime::block_in_place;
+
+/// Wrapper struct to convert a Reader into a Stream
+pub struct WrappedReaderStream<R: Read + Unpin> {
+    reader: R,
+    buffer: Vec<u8>,
+}
+
+impl <R: Read + Unpin> WrappedReaderStream<R> {
+
+    pub fn new(reader: R) -> Self {
+        let mut buffer = Vec::with_capacity(64*1024);
+        unsafe { buffer.set_len(buffer.capacity()); }
+        Self { reader, buffer }
+    }
+}
+
+impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
+    type Item = Result<Vec<u8>, io::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+        match block_in_place(|| this.reader.read(&mut this.buffer)) {
+            Ok(n) => {
+                if n == 0 {
+                    // EOF
+                    Poll::Ready(None)
+                } else {
+                    Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
+                }
+            }
+            Err(err) => Poll::Ready(Some(Err(err))),
+        }
+    }
+}
+
+/// Wrapper struct to convert a channel Receiver into a Stream
+pub struct StdChannelStream<T>(pub Receiver<T>);
+
+impl<T> Stream for StdChannelStream<T> {
+    type Item = T;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
+        match block_in_place(|| self.0.recv()) {
+            Ok(data) => Poll::Ready(Some(data)),
+            Err(_) => Poll::Ready(None),// channel closed
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::io;
+
+    use anyhow::Error;
+    use futures::stream::TryStreamExt;
+
+    #[test]
+    fn test_wrapped_stream_reader() -> Result<(), Error> {
+        pbs_runtime::main(async {
+            run_wrapped_stream_reader_test().await
+        })
+    }
+
+    struct DummyReader(usize);
+
+    impl io::Read for DummyReader {
+        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+            self.0 += 1;
+
+            if self.0 >= 10 {
+                return Ok(0);
+            }
+
+            unsafe {
+                std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len());
+            }
+
+            Ok(buf.len())
+        }
+    }
+
+    async fn run_wrapped_stream_reader_test() -> Result<(), Error> {
+        let mut reader = super::WrappedReaderStream::new(DummyReader(0));
+        while let Some(_data) = reader.try_next().await? {
+            // just waiting
+        }
+        Ok(())
+    }
+}
index 28950787e4771562a5a501719f56901656242d1e..c64615fdedbaf92a517e8cad145e7ff43eadefe4 100644 (file)
@@ -1,5 +1,6 @@
 pub mod acl;
 pub mod auth;
+pub mod blocking;
 pub mod borrow;
 pub mod broadcast_future;
 pub mod cert;
@@ -7,12 +8,14 @@ pub mod compression;
 pub mod format;
 pub mod fs;
 pub mod json;
+pub mod lru_cache;
 pub mod nom;
 pub mod ops;
 pub mod percent_encoding;
 pub mod process_locker;
 pub mod sha;
 pub mod str;
+pub mod stream;
 pub mod sync;
 pub mod ticket;
 pub mod tokio;
@@ -20,7 +23,6 @@ pub mod xattr;
 pub mod zip;
 
 pub mod async_lru_cache;
-pub mod lru_cache;
 
 mod command;
 pub use command::{command_output, command_output_as_string, run_command};
diff --git a/pbs-tools/src/stream.rs b/pbs-tools/src/stream.rs
new file mode 100644 (file)
index 0000000..c00c135
--- /dev/null
@@ -0,0 +1,229 @@
+//! Wrappers between async readers and streams.
+
+use std::io::{self, Read};
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use anyhow::{Error, Result};
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tokio::sync::mpsc::Sender;
+use futures::ready;
+use futures::future::FutureExt;
+use futures::stream::Stream;
+
+use proxmox::io_format_err;
+use proxmox::tools::byte_buffer::ByteBuffer;
+use proxmox::sys::error::io_err_other;
+
+use pbs_runtime::block_in_place;
+
+/// Wrapper struct to convert a Reader into a Stream
+pub struct WrappedReaderStream<R: Read + Unpin> {
+    reader: R,
+    buffer: Vec<u8>,
+}
+
+impl <R: Read + Unpin> WrappedReaderStream<R> {
+
+    pub fn new(reader: R) -> Self {
+        let mut buffer = Vec::with_capacity(64*1024);
+        unsafe { buffer.set_len(buffer.capacity()); }
+        Self { reader, buffer }
+    }
+}
+
+impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
+    type Item = Result<Vec<u8>, io::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+        match block_in_place(|| this.reader.read(&mut this.buffer)) {
+            Ok(n) => {
+                if n == 0 {
+                    // EOF
+                    Poll::Ready(None)
+                } else {
+                    Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
+                }
+            }
+            Err(err) => Poll::Ready(Some(Err(err))),
+        }
+    }
+}
+
+/// Wrapper struct to convert an AsyncReader into a Stream
+pub struct AsyncReaderStream<R: AsyncRead + Unpin> {
+    reader: R,
+    buffer: Vec<u8>,
+}
+
+impl <R: AsyncRead + Unpin> AsyncReaderStream<R> {
+
+    pub fn new(reader: R) -> Self {
+        let mut buffer = Vec::with_capacity(64*1024);
+        unsafe { buffer.set_len(buffer.capacity()); }
+        Self { reader, buffer }
+    }
+
+    pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self {
+        let mut buffer = Vec::with_capacity(buffer_size);
+        unsafe { buffer.set_len(buffer.capacity()); }
+        Self { reader, buffer }
+    }
+}
+
+impl<R: AsyncRead + Unpin> Stream for AsyncReaderStream<R> {
+    type Item = Result<Vec<u8>, io::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+        let mut read_buf = ReadBuf::new(&mut this.buffer);
+        match ready!(Pin::new(&mut this.reader).poll_read(cx, &mut read_buf)) {
+            Ok(()) => {
+                let n = read_buf.filled().len();
+                if n == 0 {
+                    // EOF
+                    Poll::Ready(None)
+                } else {
+                    Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
+                }
+            }
+            Err(err) => Poll::Ready(Some(Err(err))),
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::io;
+
+    use anyhow::Error;
+    use futures::stream::TryStreamExt;
+
+    #[test]
+    fn test_wrapped_stream_reader() -> Result<(), Error> {
+        pbs_runtime::main(async {
+            run_wrapped_stream_reader_test().await
+        })
+    }
+
+    struct DummyReader(usize);
+
+    impl io::Read for DummyReader {
+        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+            self.0 += 1;
+
+            if self.0 >= 10 {
+                return Ok(0);
+            }
+
+            unsafe {
+                std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len());
+            }
+
+            Ok(buf.len())
+        }
+    }
+
+    async fn run_wrapped_stream_reader_test() -> Result<(), Error> {
+        let mut reader = super::WrappedReaderStream::new(DummyReader(0));
+        while let Some(_data) = reader.try_next().await? {
+            // just waiting
+        }
+        Ok(())
+    }
+}
+
+/// Wrapper around tokio::sync::mpsc::Sender, which implements Write
+pub struct AsyncChannelWriter {
+    sender: Option<Sender<Result<Vec<u8>, Error>>>,
+    buf: ByteBuffer,
+    state: WriterState,
+}
+
+type SendResult = io::Result<Sender<Result<Vec<u8>>>>;
+
+enum WriterState {
+    Ready,
+    Sending(Pin<Box<dyn Future<Output = SendResult> + Send + 'static>>),
+}
+
+impl AsyncChannelWriter {
+    pub fn new(sender: Sender<Result<Vec<u8>, Error>>, buf_size: usize) -> Self {
+        Self {
+            sender: Some(sender),
+            buf: ByteBuffer::with_capacity(buf_size),
+            state: WriterState::Ready,
+        }
+    }
+
+    fn poll_write_impl(
+        &mut self,
+        cx: &mut Context,
+        buf: &[u8],
+        flush: bool,
+    ) -> Poll<io::Result<usize>> {
+        loop {
+            match &mut self.state {
+                WriterState::Ready => {
+                    if flush {
+                        if self.buf.is_empty() {
+                            return Poll::Ready(Ok(0));
+                        }
+                    } else {
+                        let free_size = self.buf.free_size();
+                        if free_size > buf.len() || self.buf.is_empty() {
+                            let count = free_size.min(buf.len());
+                            self.buf.get_free_mut_slice()[..count].copy_from_slice(&buf[..count]);
+                            self.buf.add_size(count);
+                            return Poll::Ready(Ok(count));
+                        }
+                    }
+
+                    let sender = match self.sender.take() {
+                        Some(sender) => sender,
+                        None => return Poll::Ready(Err(io_err_other("no sender"))),
+                    };
+
+                    let data = self.buf.remove_data(self.buf.len()).to_vec();
+                    let future = async move {
+                        sender
+                            .send(Ok(data))
+                            .await
+                            .map(move |_| sender)
+                            .map_err(|err| io_format_err!("could not send: {}", err))
+                    };
+
+                    self.state = WriterState::Sending(future.boxed());
+                }
+                WriterState::Sending(ref mut future) => match ready!(future.as_mut().poll(cx)) {
+                    Ok(sender) => {
+                        self.sender = Some(sender);
+                        self.state = WriterState::Ready;
+                    }
+                    Err(err) => return Poll::Ready(Err(err)),
+                },
+            }
+        }
+    }
+}
+
+impl AsyncWrite for AsyncChannelWriter {
+    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
+        let this = self.get_mut();
+        this.poll_write_impl(cx, buf, false)
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
+        let this = self.get_mut();
+        match ready!(this.poll_write_impl(cx, &[], true)) {
+            Ok(_) => Poll::Ready(Ok(())),
+            Err(err) => Poll::Ready(Err(err)),
+        }
+    }
+
+    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
+        self.poll_flush(cx)
+    }
+}
index d1778fd94cf95f9d0c319c35e3ed2c115fdcc105..f3c52413210d773e7e3f7e8c6bc233d83921b6b2 100644 (file)
@@ -27,6 +27,8 @@ use pxar::accessor::aio::Accessor;
 use pxar::EntryKind;
 
 use pbs_client::pxar::create_zip;
+use pbs_tools::blocking::WrappedReaderStream;
+use pbs_tools::stream::{AsyncReaderStream, AsyncChannelWriter};
 use pbs_tools::json::{required_integer_param, required_string_param};
 
 use crate::api2::types::*;
@@ -37,7 +39,6 @@ use crate::config::datastore;
 use crate::config::cached_user_info::CachedUserInfo;
 
 use crate::server::{jobstate::Job, WorkerTask};
-use crate::tools::{AsyncChannelWriter, AsyncReaderStream, WrappedReaderStream};
 
 use crate::config::acl::{
     PRIV_DATASTORE_AUDIT,
index ce9aee7b134c4e7acb95a10c711e4fe01bbb4a8c..6f898e55c31664e91d7d37cc1ce7974ddc798702 100644 (file)
@@ -79,7 +79,6 @@ use pbs_tools::json;
 use proxmox_backup::backup::{
     BufferedDynamicReader,
 };
-use proxmox_backup::tools;
 
 mod proxmox_backup_client;
 use proxmox_backup_client::*;
@@ -487,7 +486,7 @@ fn spawn_catalog_upload(
     encrypt: bool,
 ) -> Result<CatalogUploadResult, Error> {
     let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
-    let catalog_stream = tools::StdChannelStream(catalog_rx);
+    let catalog_stream = pbs_tools::blocking::StdChannelStream(catalog_rx);
     let catalog_chunk_size = 512*1024;
     let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
 
index 166804849891ba4100b9d308a99bb05d6b1163fa..62b63a5d430b30c0ee2ce4d0e2d37e93a3f893ff 100644 (file)
@@ -31,6 +31,7 @@ use proxmox::api::{
 use proxmox::http_err;
 
 use pbs_tools::compression::{DeflateEncoder, Level};
+use pbs_tools::stream::AsyncReaderStream;
 
 use super::auth::AuthError;
 use super::environment::RestEnvironment;
@@ -42,7 +43,6 @@ use crate::auth_helpers::*;
 use crate::config::cached_user_info::CachedUserInfo;
 use crate::tools;
 use crate::tools::compression::CompressionMethod;
-use crate::tools::AsyncReaderStream;
 use crate::tools::FileLogger;
 
 extern "C" {
diff --git a/src/tools/async_channel_writer.rs b/src/tools/async_channel_writer.rs
deleted file mode 100644 (file)
index f48bd55..0000000
+++ /dev/null
@@ -1,106 +0,0 @@
-use std::future::Future;
-use std::io;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-use anyhow::{Error, Result};
-use futures::{future::FutureExt, ready};
-use tokio::io::AsyncWrite;
-use tokio::sync::mpsc::Sender;
-
-use proxmox::io_format_err;
-use proxmox::tools::byte_buffer::ByteBuffer;
-use proxmox::sys::error::io_err_other;
-
-/// Wrapper around tokio::sync::mpsc::Sender, which implements Write
-pub struct AsyncChannelWriter {
-    sender: Option<Sender<Result<Vec<u8>, Error>>>,
-    buf: ByteBuffer,
-    state: WriterState,
-}
-
-type SendResult = io::Result<Sender<Result<Vec<u8>>>>;
-
-enum WriterState {
-    Ready,
-    Sending(Pin<Box<dyn Future<Output = SendResult> + Send + 'static>>),
-}
-
-impl AsyncChannelWriter {
-    pub fn new(sender: Sender<Result<Vec<u8>, Error>>, buf_size: usize) -> Self {
-        Self {
-            sender: Some(sender),
-            buf: ByteBuffer::with_capacity(buf_size),
-            state: WriterState::Ready,
-        }
-    }
-
-    fn poll_write_impl(
-        &mut self,
-        cx: &mut Context,
-        buf: &[u8],
-        flush: bool,
-    ) -> Poll<io::Result<usize>> {
-        loop {
-            match &mut self.state {
-                WriterState::Ready => {
-                    if flush {
-                        if self.buf.is_empty() {
-                            return Poll::Ready(Ok(0));
-                        }
-                    } else {
-                        let free_size = self.buf.free_size();
-                        if free_size > buf.len() || self.buf.is_empty() {
-                            let count = free_size.min(buf.len());
-                            self.buf.get_free_mut_slice()[..count].copy_from_slice(&buf[..count]);
-                            self.buf.add_size(count);
-                            return Poll::Ready(Ok(count));
-                        }
-                    }
-
-                    let sender = match self.sender.take() {
-                        Some(sender) => sender,
-                        None => return Poll::Ready(Err(io_err_other("no sender"))),
-                    };
-
-                    let data = self.buf.remove_data(self.buf.len()).to_vec();
-                    let future = async move {
-                        sender
-                            .send(Ok(data))
-                            .await
-                            .map(move |_| sender)
-                            .map_err(|err| io_format_err!("could not send: {}", err))
-                    };
-
-                    self.state = WriterState::Sending(future.boxed());
-                }
-                WriterState::Sending(ref mut future) => match ready!(future.as_mut().poll(cx)) {
-                    Ok(sender) => {
-                        self.sender = Some(sender);
-                        self.state = WriterState::Ready;
-                    }
-                    Err(err) => return Poll::Ready(Err(err)),
-                },
-            }
-        }
-    }
-}
-
-impl AsyncWrite for AsyncChannelWriter {
-    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
-        let this = self.get_mut();
-        this.poll_write_impl(cx, buf, false)
-    }
-
-    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        let this = self.get_mut();
-        match ready!(this.poll_write_impl(cx, &[], true)) {
-            Ok(_) => Poll::Ready(Ok(())),
-            Err(err) => Poll::Ready(Err(err)),
-        }
-    }
-
-    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        self.poll_flush(cx)
-    }
-}
index eba9a70cfea2b52e1947a3e6619d4cdf9afa1026..2d2d923ab4d5ae74777f52918f535ad64fbe4358 100644 (file)
@@ -48,12 +48,6 @@ pub mod paperkey;
 pub mod parallel_handler;
 pub use parallel_handler::ParallelHandler;
 
-mod wrapped_reader_stream;
-pub use wrapped_reader_stream::{AsyncReaderStream, StdChannelStream, WrappedReaderStream};
-
-mod async_channel_writer;
-pub use async_channel_writer::AsyncChannelWriter;
-
 mod file_logger;
 pub use file_logger::{FileLogger, FileLogOptions};
 
diff --git a/src/tools/wrapped_reader_stream.rs b/src/tools/wrapped_reader_stream.rs
deleted file mode 100644 (file)
index 6217545..0000000
+++ /dev/null
@@ -1,141 +0,0 @@
-use std::io::{self, Read};
-use std::pin::Pin;
-use std::task::{Context, Poll};
-use std::sync::mpsc::Receiver;
-
-use tokio::io::{AsyncRead, ReadBuf};
-use futures::ready;
-use futures::stream::Stream;
-
-use pbs_runtime::block_in_place;
-
-/// Wrapper struct to convert a Reader into a Stream
-pub struct WrappedReaderStream<R: Read + Unpin> {
-    reader: R,
-    buffer: Vec<u8>,
-}
-
-impl <R: Read + Unpin> WrappedReaderStream<R> {
-
-    pub fn new(reader: R) -> Self {
-        let mut buffer = Vec::with_capacity(64*1024);
-        unsafe { buffer.set_len(buffer.capacity()); }
-        Self { reader, buffer }
-    }
-}
-
-impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
-    type Item = Result<Vec<u8>, io::Error>;
-
-    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
-        let this = self.get_mut();
-        match block_in_place(|| this.reader.read(&mut this.buffer)) {
-            Ok(n) => {
-                if n == 0 {
-                    // EOF
-                    Poll::Ready(None)
-                } else {
-                    Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
-                }
-            }
-            Err(err) => Poll::Ready(Some(Err(err))),
-        }
-    }
-}
-
-/// Wrapper struct to convert an AsyncReader into a Stream
-pub struct AsyncReaderStream<R: AsyncRead + Unpin> {
-    reader: R,
-    buffer: Vec<u8>,
-}
-
-impl <R: AsyncRead + Unpin> AsyncReaderStream<R> {
-
-    pub fn new(reader: R) -> Self {
-        let mut buffer = Vec::with_capacity(64*1024);
-        unsafe { buffer.set_len(buffer.capacity()); }
-        Self { reader, buffer }
-    }
-
-    pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self {
-        let mut buffer = Vec::with_capacity(buffer_size);
-        unsafe { buffer.set_len(buffer.capacity()); }
-        Self { reader, buffer }
-    }
-}
-
-impl<R: AsyncRead + Unpin> Stream for AsyncReaderStream<R> {
-    type Item = Result<Vec<u8>, io::Error>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
-        let this = self.get_mut();
-        let mut read_buf = ReadBuf::new(&mut this.buffer);
-        match ready!(Pin::new(&mut this.reader).poll_read(cx, &mut read_buf)) {
-            Ok(()) => {
-                let n = read_buf.filled().len();
-                if n == 0 {
-                    // EOF
-                    Poll::Ready(None)
-                } else {
-                    Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
-                }
-            }
-            Err(err) => Poll::Ready(Some(Err(err))),
-        }
-    }
-}
-
-/// Wrapper struct to convert a channel Receiver into a Stream
-pub struct StdChannelStream<T>(pub Receiver<T>);
-
-impl<T> Stream for StdChannelStream<T> {
-    type Item = T;
-
-    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
-        match block_in_place(|| self.0.recv()) {
-            Ok(data) => Poll::Ready(Some(data)),
-            Err(_) => Poll::Ready(None),// channel closed
-        }
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use std::io;
-
-    use anyhow::Error;
-    use futures::stream::TryStreamExt;
-
-    #[test]
-    fn test_wrapped_stream_reader() -> Result<(), Error> {
-        pbs_runtime::main(async {
-            run_wrapped_stream_reader_test().await
-        })
-    }
-
-    struct DummyReader(usize);
-
-    impl io::Read for DummyReader {
-        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-            self.0 += 1;
-
-            if self.0 >= 10 {
-                return Ok(0);
-            }
-
-            unsafe {
-                std::ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len());
-            }
-
-            Ok(buf.len())
-        }
-    }
-
-    async fn run_wrapped_stream_reader_test() -> Result<(), Error> {
-        let mut reader = super::WrappedReaderStream::new(DummyReader(0));
-        while let Some(_data) = reader.try_next().await? {
-            // just waiting
-        }
-        Ok(())
-    }
-}