]> git.proxmox.com Git - proxmox-backup.git/commitdiff
tokio 1.0: use ReceiverStream from tokio-stream
authorFabian Grünbichler <f.gruenbichler@proxmox.com>
Mon, 11 Jan 2021 08:50:04 +0000 (09:50 +0100)
committerFabian Grünbichler <f.gruenbichler@proxmox.com>
Thu, 14 Jan 2021 15:01:33 +0000 (16:01 +0100)
to wrap a Receiver in a Stream. this will likely move back into tokio
proper once we have a std Stream..

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
src/api2/admin/datastore.rs
src/bin/proxmox-backup-client.rs
src/bin/proxmox-backup-proxy.rs
src/client/backup_writer.rs

index 32352e5c93bfa9a05d95bfc9ed91f3a16bc6b8af..5b9a1e8440dd1577eb3b700b657aade7adbf3b92 100644 (file)
@@ -10,6 +10,7 @@ use futures::*;
 use hyper::http::request::Parts;
 use hyper::{header, Body, Response, StatusCode};
 use serde_json::{json, Value};
+use tokio_stream::wrappers::ReceiverStream;
 
 use proxmox::api::{
     api, ApiResponseFuture, ApiHandler, ApiMethod, Router,
@@ -1562,7 +1563,7 @@ fn pxar_file_download(
                         .map_err(|err| eprintln!("error during finishing of zip: {}", err))
                 });
 
-                Body::wrap_stream(receiver.map_err(move |err| {
+                Body::wrap_stream(ReceiverStream::new(receiver).map_err(move |err| {
                     eprintln!("error during streaming of zip '{:?}' - {}", filepath, err);
                     err
                 }))
index b8f09a4a9b78109238137dfde52c7575874cd882..d91f04cc6bd65c8bd1e102a2c2529e2f097be477 100644 (file)
@@ -12,6 +12,7 @@ use futures::future::FutureExt;
 use futures::stream::{StreamExt, TryStreamExt};
 use serde_json::{json, Value};
 use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
 use xdg::BaseDirectories;
 
 use pathpatterns::{MatchEntry, MatchType, PatternFlag};
@@ -306,7 +307,7 @@ async fn backup_directory<P: AsRef<Path>>(
 
     let (mut tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
 
-    let stream = rx
+    let stream = ReceiverStream::new(rx)
         .map_err(Error::from);
 
     // spawn chunker inside a separate task so that it can run parallel
index 2228253d7061fda344acbedd7bf2dcc35e2d45bc..16450244de82f23d65e97217e42f97001e45173d 100644 (file)
@@ -6,6 +6,7 @@ use anyhow::{bail, format_err, Error};
 use futures::*;
 use hyper;
 use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
+use tokio_stream::wrappers::ReceiverStream;
 
 use proxmox::try_block;
 use proxmox::api::RpcEnvironmentType;
@@ -122,7 +123,7 @@ async fn run() -> Result<(), Error> {
         |listener, ready| {
 
             let connections = accept_connections(listener, acceptor, debug);
-            let connections = hyper::server::accept::from_stream(connections);
+            let connections = hyper::server::accept::from_stream(ReceiverStream::new(connections));
 
             Ok(ready
                .and_then(|_| hyper::Server::builder(connections)
index 39cd574d2cf5e9ca8366685202175965cd52f741..bcbd6f28c26e600df83023622e3f5c7fda9f4489 100644 (file)
@@ -10,6 +10,7 @@ use futures::future::AbortHandle;
 use serde_json::{json, Value};
 use tokio::io::AsyncReadExt;
 use tokio::sync::{mpsc, oneshot};
+use tokio_stream::wrappers::ReceiverStream;
 
 use proxmox::tools::digest_to_hex;
 
@@ -321,7 +322,7 @@ impl BackupWriter {
         // });
         // old code for reference?
         tokio::spawn(
-            verify_queue_rx
+            ReceiverStream::new(verify_queue_rx)
                 .map(Ok::<_, Error>)
                 .try_for_each(move |response: h2::client::ResponseFuture| {
                     response
@@ -349,7 +350,7 @@ impl BackupWriter {
 
         // FIXME: async-block-ify this code!
         tokio::spawn(
-            verify_queue_rx
+            ReceiverStream::new(verify_queue_rx)
                 .map(Ok::<_, Error>)
                 .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
                     match (response, merged_chunk_info) {