git.proxmox.com Git - proxmox-backup.git/commitdiff
update a chunk of stuff to the hyper release
author     Wolfgang Bumiller <w.bumiller@proxmox.com>
           Thu, 12 Dec 2019 14:27:07 +0000 (15:27 +0100)
committer  Wolfgang Bumiller <w.bumiller@proxmox.com>
           Fri, 13 Dec 2019 10:24:41 +0000 (11:24 +0100)
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
23 files changed:
Cargo.toml
src/api2/admin/datastore.rs
src/api2/reader.rs
src/backup/chunk_stream.rs
src/bin/h2client.rs
src/bin/h2s-client.rs
src/bin/h2s-server.rs
src/bin/h2server.rs
src/bin/proxmox-backup-api.rs
src/bin/proxmox-backup-client.rs
src/bin/proxmox-backup-proxy.rs
src/bin/test_chunk_speed2.rs
src/client/backup_writer.rs
src/client/http_client.rs
src/server/command_socket.rs
src/server/rest.rs
src/server/state.rs
src/tools.rs
src/tools/async_io.rs
src/tools/daemon.rs
src/tools/futures.rs
src/tools/runtime.rs
src/tools/wrapped_reader_stream.rs

diff --git a/Cargo.toml b/Cargo.toml
index 42c4bbcc5b538224ec58d0b3f7d902f3a38b4135..ddfaeaee47bfcfb00febf6701c45f9be72871428 100644
@@ -10,15 +10,15 @@ path = "src/lib.rs"
 
 [dependencies]
 base64 = "0.10"
-bytes = "0.4"
+bytes = "0.5"
 chrono = "0.4" # Date and time library for Rust
 crc32fast = "1"
 endian_trait = { version = "0.6", features = ["arrays"] }
 failure = "0.1"
-futures-preview = "0.3.0-alpha"
-h2 = { version = "0.2.0-alpha.1", features = ["stream"] }
-http = "0.1"
-hyper = { version = "0.13.0-alpha.1" }
+futures = "0.3"
+h2 = { version = "0.2", features = ["stream"] }
+http = "0.2"
+hyper = "0.13"
 lazy_static = "1.3"
 libc = "0.2"
 log = "0.4"
@@ -35,11 +35,10 @@ serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 siphasher = "0.3"
 syslog = "4.0"
-tokio = { version = "0.2.0-alpha.4" }
-tokio-executor = { version = "0.2.0-alpha.4" }
-tokio-net = { version = "0.2.0-alpha.4", features = ["signal"] }
-tokio-openssl = "0.4.0-alpha.2"
-tower-service = "0.3.0-alpha.1"
+tokio = { version = "0.2.0", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
+tokio-util = { version = "0.2.0", features = [ "codec" ] }
+tokio-openssl = "0.4.0"
+tower-service = "0.3.0"
 url = "1.7"
 valgrind_request = { version = "1.1", optional = true }
 walkdir = "2"
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 1794309447ea2fc44181f6697a086207d4b659c7..6bf6bea8b2a0fcf68968adaf8dc4154a9a289aba 100644
@@ -507,8 +507,8 @@ fn download_file(
             .map_err(|err| http_err!(BAD_REQUEST, format!("File open failed: {}", err)))
             .await?;
 
-        let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
-            .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
+        let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
+            .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
         let body = Body::wrap_stream(payload);
 
         // fixme: set other headers ?
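
Note: the codec types moved from tokio::codec into the separate tokio-util crate, and hyper 0.13 replaces hyper::Chunk with hyper::body::Bytes. A self-contained sketch of the streaming-download pattern used in these handlers (function name and error handling are illustrative):

    use futures::TryStreamExt;
    use hyper::Body;

    async fn file_body(path: &str) -> std::io::Result<Body> {
        let file = tokio::fs::File::open(path).await?;
        // BytesCodec yields BytesMut; freeze() gives the Bytes that hyper expects
        let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
            .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
        Ok(Body::wrap_stream(payload))
    }
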
diff --git a/src/api2/reader.rs b/src/api2/reader.rs
index 9f425b5c880b0c51f728ce18647a0f53b6f6e0f7..428015263ef0e20ee871234acd1415147ea8b361 100644
@@ -192,8 +192,8 @@ fn download_file(
 
         env.log(format!("download {:?}", path3));
 
-        let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
-            .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
+        let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
+            .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
 
         let body = Body::wrap_stream(payload);
 
@@ -275,8 +275,8 @@ fn download_chunk_old(
         .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
         .and_then(move |file| {
             env2.debug(format!("download chunk {:?}", path3));
-            let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
-                .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
+            let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
+                .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
 
             let body = Body::wrap_stream(payload);
 
diff --git a/src/backup/chunk_stream.rs b/src/backup/chunk_stream.rs
index 4d0ca6b6f3804f82f558fe0470537d34aaa6610a..2d2a95c735872efda23c9dee6601e0517915566e 100644
@@ -60,7 +60,7 @@ where
                 None => {
                     this.scan_pos = 0;
                     if this.buffer.len() > 0 {
-                        return Poll::Ready(Some(Ok(this.buffer.take())));
+                        return Poll::Ready(Some(Ok(this.buffer.split())));
                     } else {
                         return Poll::Ready(None);
                     }
@@ -99,7 +99,7 @@ where
         let this = self.get_mut();
         loop {
             if this.buffer.len() == this.chunk_size {
-                return Poll::Ready(Some(Ok(this.buffer.take())));
+                return Poll::Ready(Some(Ok(this.buffer.split())));
             } else if this.buffer.len() > this.chunk_size {
                 let result = this.buffer.split_to(this.chunk_size);
                 return Poll::Ready(Some(Ok(result)));
@@ -112,7 +112,7 @@ where
                 None => {
                     // last chunk can have any size
                     if this.buffer.len() > 0 {
-                        return Poll::Ready(Some(Ok(this.buffer.take())));
+                        return Poll::Ready(Some(Ok(this.buffer.split())));
                     } else {
                         return Poll::Ready(None);
                     }
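
Note: bytes 0.5 removed BytesMut::take(); split() now drains the remaining contents into a new handle, while split_to(n) still takes a prefix. A tiny standalone illustration:

    use bytes::BytesMut;

    fn main() {
        let mut buf = BytesMut::new();
        buf.extend_from_slice(b"hello world");
        let head = buf.split_to(5); // first 5 bytes, as before
        let rest = buf.split();     // everything that is left; buf ends up empty
        assert_eq!(&head[..], b"hello");
        assert_eq!(&rest[..], b" world");
        assert!(buf.is_empty());
    }
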
diff --git a/src/bin/h2client.rs b/src/bin/h2client.rs
index 6abb014bbca13b5b888fa6740d7028e7a012ff5c..542ecff0b96b318e8e649b0b3359ce09a87d41dc 100644
@@ -35,7 +35,7 @@ impl Future for Process {
             } else {
                 match futures::ready!(Pin::new(&mut this.body).poll_next(cx)) {
                     Some(Ok(chunk)) => {
-                        this.body.release_capacity().release_capacity(chunk.len())?;
+                        this.body.flow_control().release_capacity(chunk.len())?;
                         this.bytes += chunk.len();
                         // println!("GOT FRAME {}", chunk.len());
                     },
diff --git a/src/bin/h2s-client.rs b/src/bin/h2s-client.rs
index 70bb088e812bbd35e336a3681352dcd82e738450..df9a95b80380f23bac8e89774755f7d8f8913738 100644
@@ -34,7 +34,7 @@ impl Future for Process {
             } else {
                 match futures::ready!(Pin::new(&mut this.body).poll_next(cx)) {
                     Some(Ok(chunk)) => {
-                        this.body.release_capacity().release_capacity(chunk.len())?;
+                        this.body.flow_control().release_capacity(chunk.len())?;
                         this.bytes += chunk.len();
                         // println!("GOT FRAME {}", chunk.len());
                     },
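
Note: h2 0.2 folds the old ReleaseCapacity handle into a FlowControl handle reachable from the stream itself, so capacity is released inline while draining the body. A hedged sketch of the receive loop (the RecvStream is assumed to come from an established h2 request elsewhere):

    use h2::RecvStream;

    async fn drain(mut body: RecvStream) -> Result<usize, h2::Error> {
        let mut total = 0;
        while let Some(chunk) = body.data().await {
            let chunk = chunk?;
            total += chunk.len();
            // let the peer send more data
            body.flow_control().release_capacity(chunk.len())?;
        }
        Ok(total)
    }
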
diff --git a/src/bin/h2s-server.rs b/src/bin/h2s-server.rs
index b8c7926ac2414cd7ca6fcb11884cab9da2acbf54..39483af24446dac1478217817110d15af9ef026b 100644
@@ -24,12 +24,12 @@ async fn main() -> Result<(), Error> {
 
     let acceptor = Arc::new(acceptor.build());
 
-    let listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
+    let mut listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
 
     println!("listening on {:?}", listener.local_addr());
 
-    let mut incoming = listener.incoming();
-    while let Some(socket) = incoming.try_next().await? {
+    loop {
+        let (socket, _addr) = listener.accept().await?;
         tokio::spawn(handle_connection(socket, Arc::clone(&acceptor))
             .map(|res| {
                 if let Err(err) = res {
@@ -37,8 +37,6 @@ async fn main() -> Result<(), Error> {
                 }
             }));
     }
-
-    Ok(())
 }
 
 async fn handle_connection(
diff --git a/src/bin/h2server.rs b/src/bin/h2server.rs
index 8477ec719a109345dfaefd3dfc733cca9f2b9e76..3d602134cd945784a95b8cbc5bae644003e49b11 100644
@@ -10,12 +10,12 @@ use proxmox_backup::client::pipe_to_stream::PipeToSendStream;
 
 #[tokio::main]
 async fn main() -> Result<(), Error> {
-    let listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
+    let mut listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?;
 
     println!("listening on {:?}", listener.local_addr());
 
-    let mut incoming = listener.incoming();
-    while let Some(socket) = incoming.try_next().await? {
+    loop {
+        let (socket, _addr) = listener.accept().await?;
         tokio::spawn(handle_connection(socket)
             .map(|res| {
                 if let Err(err) = res {
@@ -23,8 +23,6 @@ async fn main() -> Result<(), Error> {
                 }
             }));
     }
-
-    Ok(())
 }
 
 async fn handle_connection<T: AsyncRead + AsyncWrite + Unpin>(socket: T) -> Result<(), Error> {
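
Note: in tokio 0.2, incoming() only borrows the listener, so these binaries switch to an explicit accept() loop (which also makes the trailing Ok(()) unreachable, hence its removal). A minimal standalone sketch of the loop:

    use tokio::net::TcpListener;

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        // accept() takes &mut self in tokio 0.2, hence the `mut` bindings above
        let mut listener = TcpListener::bind(std::net::SocketAddr::from(([127, 0, 0, 1], 8008))).await?;
        loop {
            let (socket, peer) = listener.accept().await?;
            tokio::spawn(async move {
                println!("connection from {}", peer);
                drop(socket); // handle the connection here instead
            });
        }
    }
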
diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
index 64ca7c42b892955de205808e70465bffa8a68450..9062304a6633fae136eb9c3d1d2a4580efc587cf 100644
@@ -52,8 +52,9 @@ async fn run() -> Result<(), Error> {
     let server = daemon::create_daemon(
         ([127,0,0,1], 82).into(),
         move |listener, ready| {
+            let incoming = proxmox_backup::tools::async_io::StaticIncoming::from(listener);
             Ok(ready
-                .and_then(|_| hyper::Server::builder(listener.incoming())
+                .and_then(|_| hyper::Server::builder(incoming)
                     .serve(rest_server)
                     .with_graceful_shutdown(server::shutdown_future())
                     .map_err(Error::from)
diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs
index 0831c4d6da332a447e61f12034b81ad301c8e02c..4908de8bd750852c9eb2834c90ac1a8f7ec06199 100644
@@ -186,7 +186,9 @@ async fn backup_directory<P: AsRef<Path>>(
 
     // spawn chunker inside a separate task so that it can run parallel
     tokio::spawn(async move {
-        let _ = tx.send_all(&mut chunk_stream).await;
+        while let Some(v) = chunk_stream.next().await {
+            let _ = tx.send(v).await;
+        }
     });
 
     let stats = client
@@ -210,7 +212,7 @@ async fn backup_image<P: AsRef<Path>>(
 
     let file = tokio::fs::File::open(path).await?;
 
-    let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
+    let stream = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
         .map_err(Error::from);
 
     let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024));
@@ -2443,8 +2445,9 @@ We do not extraxt '.pxar' archives when writing to stdandard output.
 }
 
 fn async_main<F: Future>(fut: F) -> <F as Future>::Output {
-    let rt = tokio::runtime::Runtime::new().unwrap();
+    let mut rt = tokio::runtime::Runtime::new().unwrap();
     let ret = rt.block_on(fut);
-    rt.shutdown_now();
+    // This does not exist anymore. We need to actually stop our runaways instead...
+    // rt.shutdown_now();
     ret
 }
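
Note: two runtime-level changes meet in this file: the combinator-based tx.send_all(&mut chunk_stream) is replaced by an explicit forwarding loop, and tokio 0.2's Runtime::block_on takes &mut self while shutdown_now() no longer exists. A sketch of the latter, assuming lingering tasks are dealt with elsewhere as the new comment says:

    use std::future::Future;

    fn run<F: Future>(fut: F) -> F::Output {
        // tokio 0.2: block_on needs a mutable runtime; dropping `rt` shuts it down
        let mut rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(fut)
    }
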
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index b6e23883b8ed885f3a51a0bfcc9ea89e3246a08d..d4513cbc95057b59b57917389d660d3cfa1ee118 100644
@@ -66,10 +66,9 @@ async fn run() -> Result<(), Error> {
     let server = daemon::create_daemon(
         ([0,0,0,0,0,0,0,0], 8007).into(),
         |listener, ready| {
-            let connections = listener
-                .incoming()
+            let connections = proxmox_backup::tools::async_io::StaticIncoming::from(listener)
                 .map_err(Error::from)
-                .try_filter_map(move |sock| {
+                .try_filter_map(move |(sock, _addr)| {
                     let acceptor = Arc::clone(&acceptor);
                     async move {
                         sock.set_nodelay(true).unwrap();
@@ -81,6 +80,7 @@ async fn run() -> Result<(), Error> {
                         )
                     }
                 });
+            let connections = proxmox_backup::tools::async_io::HyperAccept(connections);
 
             Ok(ready
                 .and_then(|_| hyper::Server::builder(connections)
diff --git a/src/bin/test_chunk_speed2.rs b/src/bin/test_chunk_speed2.rs
index 3fe7024a33138a8c348f62cb202931ec08cc8661..27f7a1570621200087678356c9c501a8dd98ab2f 100644
@@ -23,7 +23,7 @@ async fn run() -> Result<(), Error> {
 
     let file = tokio::fs::File::open("random-test.dat").await?;
 
-    let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
+    let stream = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
         .map_ok(|bytes| bytes.to_vec())
         .map_err(Error::from);
 
diff --git a/src/client/backup_writer.rs b/src/client/backup_writer.rs
index ec4d19e63a8a13e3d5c8d5787fa2efff674d167c..381cb31e426c239f1bfed4813c4022a5941d1577 100644
@@ -267,7 +267,21 @@ impl BackupWriter {
         let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
         let (verify_result_tx, verify_result_rx) = oneshot::channel();
 
-        hyper::rt::spawn(
+        // FIXME: check if this works as expected as replacement for the combinator below?
+        // tokio::spawn(async move {
+        //     let result: Result<(), Error> = (async move {
+        //         while let Some(response) = verify_queue_rx.recv().await {
+        //             match H2Client::h2api_response(response.await?).await {
+        //                 Ok(result) => println!("RESPONSE: {:?}", result),
+        //                 Err(err) => bail!("pipelined request failed: {}", err),
+        //             }
+        //         }
+        //         Ok(())
+        //     }).await;
+        //     let _ignore_closed_channel = verify_result_tx.send(result);
+        // });
+        // old code for reference?
+        tokio::spawn(
             verify_queue_rx
                 .map(Ok::<_, Error>)
                 .try_for_each(|response: h2::client::ResponseFuture| {
@@ -294,7 +308,8 @@ impl BackupWriter {
 
         let h2_2 = h2.clone();
 
-        hyper::rt::spawn(
+        // FIXME: async-block-ify this code!
+        tokio::spawn(
             verify_queue_rx
                 .map(Ok::<_, Error>)
                 .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
@@ -329,7 +344,7 @@ impl BackupWriter {
                             println!("append chunks list len ({})", digest_list.len());
                             let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
                             let request = H2Client::request_builder("localhost", "PUT", &path, None, Some("application/json")).unwrap();
-                            let param_data = bytes::Bytes::from(param.to_string().as_bytes());
+                            let param_data = bytes::Bytes::from(param.to_string().into_bytes());
                             let upload_data = Some(param_data);
                             h2_2.send_request(request, upload_data)
                                 .and_then(move |response| {
@@ -373,12 +388,12 @@ impl BackupWriter {
         }
 
         let mut body = resp.into_body();
-        let mut release_capacity = body.release_capacity().clone();
+        let mut flow_control = body.flow_control().clone();
 
         let mut stream = DigestListDecoder::new(body.map_err(Error::from));
 
         while let Some(chunk) = stream.try_next().await? {
-            let _ = release_capacity.release_capacity(chunk.len());
+            let _ = flow_control.release_capacity(chunk.len());
             println!("GOT DOWNLOAD {}", digest_to_hex(&chunk));
             known_chunks.lock().unwrap().insert(chunk);
         }
@@ -466,7 +481,7 @@ impl BackupWriter {
                     println!("upload new chunk {} ({} bytes, offset {})", digest_str,
                              chunk_info.chunk_len, offset);
 
-                    let chunk_data = chunk_info.chunk.raw_data();
+                    let chunk_data = chunk_info.chunk.into_inner();
                     let param = json!({
                         "wid": wid,
                         "digest": digest_str,
@@ -487,7 +502,7 @@ impl BackupWriter {
                             upload_queue
                                 .send((new_info, Some(response)))
                                 .await
-                                .map_err(Error::from)
+                                .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                         })
                     )
                 } else {
@@ -496,7 +511,7 @@ impl BackupWriter {
                         upload_queue
                             .send((merged_chunk_info, None))
                             .await
-                            .map_err(Error::from)
+                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
                     })
                 }
             })
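
Note: two smaller API shifts in this file: bytes 0.5's Bytes::from wants owned (or 'static) data, so the JSON parameter string is handed over via into_bytes(), and the mpsc send error is mapped to a plain message instead of relying on a From conversion. A tiny sketch of the Bytes part:

    use bytes::Bytes;

    fn main() {
        let param = serde_json::json!({ "wid": 1 });
        // hand an owned Vec<u8> to Bytes (no copy in bytes 0.5)
        let data: Bytes = Bytes::from(param.to_string().into_bytes());
        assert_eq!(&data[..], br#"{"wid":1}"#);
    }
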
diff --git a/src/client/http_client.rs b/src/client/http_client.rs
index 2399e3ebf758cbf0bf78d30887424f9656039d95..15a13e5c09949c35acffa49af67d52a0d1165518 100644
@@ -1,4 +1,5 @@
 use std::io::Write;
+use std::task::{Context, Poll};
 
 use chrono::Utc;
 use failure::*;
@@ -329,7 +330,7 @@ impl HttpClient {
         let connection = connection.map(|_| ());
 
         // Spawn a new task to drive the connection state
-        hyper::rt::spawn(connection);
+        tokio::spawn(connection);
 
         // Wait until the `SendRequest` handle has available capacity.
         let c = h2.ready().await?;
@@ -358,10 +359,7 @@ impl HttpClient {
 
     async fn api_response(response: Response<Body>) -> Result<Value, Error> {
         let status = response.status();
-        let data = response
-            .into_body()
-            .try_concat()
-            .await?;
+        let data = hyper::body::to_bytes(response.into_body()).await?;
 
         let text = String::from_utf8(data.to_vec()).unwrap();
         if status.is_success() {
@@ -487,10 +485,9 @@ impl H2Client {
         }
 
         let mut body = resp.into_body();
-        let mut release_capacity = body.release_capacity().clone();
-
-        while let Some(chunk) = body.try_next().await? {
-            let _ = release_capacity.release_capacity(chunk.len());
+        while let Some(chunk) = body.data().await {
+            let chunk = chunk?;
+            body.flow_control().release_capacity(chunk.len())?;
             output.write_all(&chunk)?;
         }
 
@@ -561,18 +558,14 @@ impl H2Client {
 
         let (_head, mut body) = response.into_parts();
 
-        // The `release_capacity` handle allows the caller to manage
-        // flow control.
-        //
-        // Whenever data is received, the caller is responsible for
-        // releasing capacity back to the server once it has freed
-        // the data from memory.
-        let mut release_capacity = body.release_capacity().clone();
-
         let mut data = Vec::new();
-        while let Some(chunk) = body.try_next().await? {
+        while let Some(chunk) = body.data().await {
+            let chunk = chunk?;
+            // Whenever data is received, the caller is responsible for
+            // releasing capacity back to the server once it has freed
+            // the data from memory.
             // Let the server send more data.
-            let _ = release_capacity.release_capacity(chunk.len());
+            body.flow_control().release_capacity(chunk.len())?;
             data.extend(chunk);
         }
 
@@ -632,9 +625,10 @@ impl H2Client {
     }
 }
 
+#[derive(Clone)]
 pub struct HttpsConnector {
     http: HttpConnector,
-    ssl_connector: SslConnector,
+    ssl_connector: std::sync::Arc<SslConnector>,
 }
 
 impl HttpsConnector {
@@ -643,7 +637,7 @@ impl HttpsConnector {
 
         Self {
             http,
-            ssl_connector,
+            ssl_connector: std::sync::Arc::new(ssl_connector),
         }
     }
 }
@@ -653,29 +647,38 @@ type MaybeTlsStream = EitherStream<
     tokio_openssl::SslStream<tokio::net::TcpStream>,
 >;
 
-impl hyper::client::connect::Connect for HttpsConnector {
-    type Transport = MaybeTlsStream;
+impl hyper::service::Service<Uri> for HttpsConnector {
+    type Response = MaybeTlsStream;
     type Error = Error;
-    type Future = Box<dyn Future<Output = Result<(
-        Self::Transport,
-        hyper::client::connect::Connected,
-    ), Error>> + Send + Unpin + 'static>;
-
-    fn connect(&self, dst: hyper::client::connect::Destination) -> Self::Future {
-        let is_https = dst.scheme() == "https";
-        let host = dst.host().to_string();
+    type Future = std::pin::Pin<Box<
+        dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static
+    >>;
 
-        let config = self.ssl_connector.configure();
-        let conn = self.http.connect(dst);
+    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        // This connector is always ready, but others might not be.
+        Poll::Ready(Ok(()))
+    }
 
-        Box::new(Box::pin(async move {
-            let (conn, connected) = conn.await?;
+    fn call(&mut self, dst: Uri) -> Self::Future {
+        let mut this = self.clone();
+        async move {
+            let is_https = dst
+                .scheme()
+                .ok_or_else(|| format_err!("missing URL scheme"))?
+                == "https";
+            let host = dst
+                .host()
+                .ok_or_else(|| format_err!("missing hostname in destination url?"))?
+                .to_string();
+
+            let config = this.ssl_connector.configure();
+            let conn = this.http.call(dst).await?;
             if is_https {
                 let conn = tokio_openssl::connect(config?, &host, conn).await?;
-                Ok((MaybeTlsStream::Right(conn), connected))
+                Ok(MaybeTlsStream::Right(conn))
             } else {
-                Ok((MaybeTlsStream::Left(conn), connected))
+                Ok(MaybeTlsStream::Left(conn))
             }
-        }))
+        }.boxed()
     }
 }
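
Note: two hyper 0.13 migrations end here: response bodies are collected with hyper::body::to_bytes() instead of the old try_concat combinator, and the connector now implements hyper::service::Service<Uri> (tower-style) rather than the removed Connect trait. A small sketch of the body-collection side, using the failure error type this crate uses:

    use failure::Error;
    use hyper::{Body, Response};

    async fn response_text(resp: Response<Body>) -> Result<String, Error> {
        // concatenates the whole body into a single Bytes buffer
        let data = hyper::body::to_bytes(resp.into_body()).await?;
        Ok(String::from_utf8(data.to_vec())?)
    }
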
diff --git a/src/server/command_socket.rs b/src/server/command_socket.rs
index 0dd3bc0e1b5f8725af0410bb59cf83deec7eaaf2..6baefe192f0062e95d84accd9f0547fc2dc6f51c 100644
@@ -2,7 +2,7 @@ use failure::*;
 
 use futures::*;
 
-use tokio::net::unix::UnixListener;
+use tokio::net::UnixListener;
 
 use std::path::PathBuf;
 use serde_json::Value;
@@ -11,23 +11,25 @@ use std::os::unix::io::AsRawFd;
 use nix::sys::socket;
 
 /// Listens on a Unix Socket to handle simple command asynchronously
-pub fn create_control_socket<P, F>(path: P, f: F) -> Result<impl Future<Output = ()>, Error>
+pub fn create_control_socket<P, F>(path: P, func: F) -> Result<impl Future<Output = ()>, Error>
 where
     P: Into<PathBuf>,
     F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static,
 {
     let path: PathBuf = path.into();
 
-    let socket = UnixListener::bind(&path)?;
+    let mut socket = UnixListener::bind(&path)?;
 
-    let f = Arc::new(f);
-    let path2 = Arc::new(path);
-    let path3 = path2.clone();
+    let func = Arc::new(func);
 
-    let control_future = socket.incoming()
-        .map_err(Error::from)
-        .and_then(|conn| {
-            use futures::future::{err, ok};
+    let control_future = async move {
+        loop {
+            let (conn, _addr) = socket
+                .accept()
+                .await
+                .map_err(|err| {
+                    format_err!("failed to accept on control socket {:?}: {}", path, err)
+                })?;
 
             // check permissions (same gid, or root user)
             let opt = socket::sockopt::PeerCredentials {};
@@ -35,28 +37,19 @@ where
                 Ok(cred) => {
                     let mygid = unsafe { libc::getgid() };
                     if !(cred.uid() == 0 || cred.gid() == mygid) {
-                        return err(format_err!("no permissions for {:?}", cred));
+                        bail!("no permissions for {:?}", cred);
                     }
                 }
-                Err(e) => {
-                    return err(format_err!(
-                        "no permissions - unable to read peer credential - {}",
-                        e,
-                    ));
-                }
+                Err(e) => bail!("no permissions - unable to read peer credential - {}", e),
             }
-            ok(conn)
-        })
-        .map_err(move |err| { eprintln!("failed to accept on control socket {:?}: {}", path2, err); })
-        .try_for_each(move |conn| {
-            let f = Arc::clone(&f);
 
-            let (rx, mut tx) = conn.split();
-            let path = path3.clone();
+            let (rx, mut tx) = tokio::io::split(conn);
 
             let abort_future = super::last_worker_future().map(|_| ());
 
             use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
+            let func = Arc::clone(&func);
+            let path = path.clone();
             tokio::spawn(futures::future::select(
                 async move {
                     let mut rx = tokio::io::BufReader::new(rx);
@@ -73,7 +66,7 @@ where
                         }
 
                         let response = match line.parse::<Value>() {
-                            Ok(param) => match f(param) {
+                            Ok(param) => match func(param) {
                                 Ok(res) => format!("OK: {}\n", res),
                                 Err(err) => format!("ERROR: {}\n", err),
                             }
@@ -88,14 +81,14 @@ where
                 }.boxed(),
                 abort_future,
             ).map(|_| ()));
-            futures::future::ok(())
-        });
+        }
+    }.boxed();
 
     let abort_future = super::last_worker_future().map_err(|_| {});
     let task = futures::future::select(
         control_future,
         abort_future,
-    ).map(|_| ());
+    ).map(|_: futures::future::Either<(Result<(), Error>, _), _>| ());
 
     Ok(task)
 }
@@ -112,9 +105,7 @@ pub fn send_command<P>(
 
     tokio::net::UnixStream::connect(path)
         .map_err(move |err| format_err!("control socket connect failed - {}", err))
-        .and_then(move |conn| {
-
-            let (rx, mut tx) = conn.split();
+        .and_then(move |mut conn| {
 
             let mut command_string = params.to_string();
             command_string.push('\n');
@@ -122,9 +113,9 @@ pub fn send_command<P>(
             async move {
                 use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
 
-                tx.write_all(command_string.as_bytes()).await?;
-                tx.shutdown().await?;
-                let mut rx = tokio::io::BufReader::new(rx);
+                conn.write_all(command_string.as_bytes()).await?;
+                AsyncWriteExt::shutdown(&mut conn).await?;
+                let mut rx = tokio::io::BufReader::new(conn);
                 let mut data = String::new();
                 if rx.read_line(&mut data).await? == 0 {
                     bail!("no response");
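
Note: tokio 0.2's stream split() borrows the stream, so the control-socket code uses tokio::io::split() for owned halves and shuts the write side down through AsyncWriteExt. A sketch of the same pattern on a UnixStream (helper name is illustrative):

    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
    use tokio::net::UnixStream;

    async fn one_command(conn: UnixStream, cmd: &str) -> std::io::Result<String> {
        // owned halves, so they could also move into separate tasks
        let (rx, mut tx) = tokio::io::split(conn);
        tx.write_all(cmd.as_bytes()).await?;
        tx.shutdown().await?;

        let mut rx = BufReader::new(rx);
        let mut line = String::new();
        rx.read_line(&mut line).await?;
        Ok(line)
    }
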
diff --git a/src/server/rest.rs b/src/server/rest.rs
index e20fa798c5f28f4174df5e4f60c9c9a889135f7a..119883f11440a6b916a95f5620c87a9d1d15d52c 100644
@@ -1,4 +1,5 @@
 use std::collections::HashMap;
+use std::future::Future;
 use std::hash::BuildHasher;
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
@@ -10,10 +11,10 @@ use futures::future::{self, FutureExt, TryFutureExt};
 use futures::stream::TryStreamExt;
 use hyper::header;
 use hyper::http::request::Parts;
-use hyper::rt::Future;
 use hyper::{Body, Request, Response, StatusCode};
 use serde_json::{json, Value};
 use tokio::fs::File;
+use tokio::time::Instant;
 use url::form_urlencoded;
 
 use proxmox::api::http_err;
@@ -291,7 +292,7 @@ pub async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher +
         Err(err) => {
             if let Some(httperr) = err.downcast_ref::<HttpError>() {
                 if httperr.code == StatusCode::UNAUTHORIZED {
-                    tokio::timer::delay(delay_unauth_time).await;
+                    tokio::time::delay_until(Instant::from_std(delay_unauth_time)).await;
                 }
             }
             (formatter.format_error)(err)
@@ -417,8 +418,8 @@ async fn chuncked_static_file_download(filename: PathBuf) -> Result<Response<Bod
         .await
         .map_err(|err| http_err!(BAD_REQUEST, format!("File open failed: {}", err)))?;
 
-    let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
-        .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
+    let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
+        .map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
     let body = Body::wrap_stream(payload);
 
     // fixme: set other headers ?
@@ -531,7 +532,7 @@ pub async fn handle_request(api: Arc<ApiConfig>, req: Request<Body>) -> Result<R
                     Err(err) => {
                         // always delay unauthorized calls by 3 seconds (from start of request)
                         let err = http_err!(UNAUTHORIZED, format!("permission check failed - {}", err));
-                        tokio::timer::delay(delay_unauth_time).await;
+                        tokio::time::delay_until(Instant::from_std(delay_unauth_time)).await;
                         return Ok((formatter.format_error)(err));
                     }
                 }
@@ -567,7 +568,7 @@ pub async fn handle_request(api: Arc<ApiConfig>, req: Request<Body>) -> Result<R
                         return Ok(get_index(Some(username), Some(new_token)));
                     }
                     _ => {
-                        tokio::timer::delay(delay_unauth_time).await;
+                        tokio::time::delay_until(Instant::from_std(delay_unauth_time)).await;
                         return Ok(get_index(None, None));
                     }
                 }
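
Note: timers moved from tokio::timer to tokio::time and take tokio's own Instant, so the fixed "delay unauthorized calls" deadline is converted with Instant::from_std. A minimal sketch:

    use std::time::Duration;
    use tokio::time::Instant;

    async fn delay_unauth(request_start: std::time::Instant) {
        // respond to failed auth roughly 3 seconds after the request started
        let deadline = Instant::from_std(request_start + Duration::from_secs(3));
        tokio::time::delay_until(deadline).await;
    }
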
diff --git a/src/server/state.rs b/src/server/state.rs
index dd1c9fdaacb2f56cc22d7c6aad00440bfc848cf8..251931e32bd369637d321f8faba1010d6c6f63da 100644
@@ -4,7 +4,7 @@ use std::sync::Mutex;
 
 use futures::*;
 
-use tokio_net::signal::unix::{signal, SignalKind};
+use tokio::signal::unix::{signal, SignalKind};
 
 use crate::tools::{self, BroadcastData};
 
@@ -34,28 +34,30 @@ lazy_static! {
 
 pub fn server_state_init() -> Result<(), Error> {
 
-    let stream = signal(SignalKind::interrupt())?;
+    let mut stream = signal(SignalKind::interrupt())?;
 
-    let future = stream.for_each(|_| {
-        println!("got shutdown request (SIGINT)");
-        SERVER_STATE.lock().unwrap().reload_request = false;
-        tools::request_shutdown();
-        futures::future::ready(())
-    });
+    let future = async move {
+        while stream.recv().await.is_some() {
+            println!("got shutdown request (SIGINT)");
+            SERVER_STATE.lock().unwrap().reload_request = false;
+            tools::request_shutdown();
+        }
+    }.boxed();
 
     let abort_future = last_worker_future().map_err(|_| {});
     let task = futures::future::select(future, abort_future);
 
     tokio::spawn(task.map(|_| ()));
 
-    let stream = signal(SignalKind::hangup())?;
+    let mut stream = signal(SignalKind::hangup())?;
 
-    let future = stream.for_each(|_| {
-        println!("got reload request (SIGHUP)");
-        SERVER_STATE.lock().unwrap().reload_request = true;
-        tools::request_shutdown();
-        futures::future::ready(())
-    });
+    let future = async move {
+        while stream.recv().await.is_some() {
+            println!("got reload request (SIGHUP)");
+            SERVER_STATE.lock().unwrap().reload_request = true;
+            tools::request_shutdown();
+        }
+    }.boxed();
 
     let abort_future = last_worker_future().map_err(|_| {});
     let task = futures::future::select(future, abort_future);
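
Note: the tokio 0.2 unix signal handle is polled with recv() in an async loop instead of being consumed as a Stream combinator chain. A standalone sketch of the loop used for both SIGINT and SIGHUP above:

    use tokio::signal::unix::{signal, SignalKind};

    async fn watch_sighup() -> std::io::Result<()> {
        let mut stream = signal(SignalKind::hangup())?;
        // recv() resolves once per delivered signal
        while stream.recv().await.is_some() {
            println!("got reload request (SIGHUP)");
        }
        Ok(())
    }
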
diff --git a/src/tools.rs b/src/tools.rs
index 38d822fa506b9a2340426d668cc6ca8246e8248f..90cc8f1bb85c2caaf7c58e4058a2e11c5518fe24 100644
@@ -19,7 +19,6 @@ use proxmox::tools::vec;
 
 pub mod acl;
 pub mod async_io;
-pub mod async_mutex;
 pub mod borrow;
 pub mod daemon;
 pub mod fs;
diff --git a/src/tools/async_io.rs b/src/tools/async_io.rs
index 2ce01a68c93421a71d79b889020dd3bce34e8def..c3b9d9359407a8b667d372299763cf0f0535badb 100644
@@ -1,10 +1,15 @@
 //! Generic AsyncRead/AsyncWrite utilities.
 
 use std::io;
+use std::mem::MaybeUninit;
+use std::os::unix::io::{AsRawFd, RawFd};
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
+use futures::stream::{Stream, TryStream};
 use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::net::TcpListener;
+use hyper::client::connect::Connection;
 
 pub enum EitherStream<L, R> {
     Left(L),
@@ -27,7 +32,7 @@ impl<L: AsyncRead, R: AsyncRead> AsyncRead for EitherStream<L, R> {
         }
     }
 
-    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
+    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
         match *self {
             EitherStream::Left(ref s) => s.prepare_uninitialized_buffer(buf),
             EitherStream::Right(ref s) => s.prepare_uninitialized_buffer(buf),
@@ -109,3 +114,83 @@ impl<L: AsyncWrite, R: AsyncWrite> AsyncWrite for EitherStream<L, R> {
         }
     }
 }
+
+// we need this for crate::client::http_client:
+impl Connection for EitherStream<
+    tokio::net::TcpStream,
+    tokio_openssl::SslStream<tokio::net::TcpStream>,
+> {
+    fn connected(&self) -> hyper::client::connect::Connected {
+        match self {
+            EitherStream::Left(s) => s.connected(),
+            EitherStream::Right(s) => s.get_ref().connected(),
+        }
+    }
+}
+
+/// Tokio's `Incoming` now is a reference type and hyper's `AddrIncoming` misses some standard
+/// stuff like `AsRawFd`, so here's something implementing hyper's `Accept` from a `TcpListener`
+pub struct StaticIncoming(TcpListener);
+
+impl From<TcpListener> for StaticIncoming {
+    fn from(inner: TcpListener) -> Self {
+        Self(inner)
+    }
+}
+
+impl AsRawFd for StaticIncoming {
+    fn as_raw_fd(&self) -> RawFd {
+        self.0.as_raw_fd()
+    }
+}
+
+impl hyper::server::accept::Accept for StaticIncoming {
+    type Conn = tokio::net::TcpStream;
+    type Error = std::io::Error;
+
+    fn poll_accept(
+        self: Pin<&mut Self>,
+        cx: &mut Context,
+    ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
+        match self.get_mut().0.poll_accept(cx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(Ok((conn, _addr))) => Poll::Ready(Some(Ok(conn))),
+            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
+        }
+    }
+}
+
+/// We also implement TryStream for this, as tokio doesn't do this anymore either and we want to be
+/// able to map connections to then add eg. ssl to them. This support code makes the changes
+/// required for hyper 0.13 a bit less annoying to read.
+impl Stream for StaticIncoming {
+    type Item = std::io::Result<(tokio::net::TcpStream, std::net::SocketAddr)>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        match self.get_mut().0.poll_accept(cx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(result) => Poll::Ready(Some(result)),
+        }
+    }
+}
+
+/// Implement hyper's `Accept` for any `TryStream` of sockets:
+pub struct HyperAccept<T>(pub T);
+
+
+impl<T, I> hyper::server::accept::Accept for HyperAccept<T>
+where
+    T: TryStream<Ok = I>,
+    I: AsyncRead + AsyncWrite,
+{
+    type Conn = I;
+    type Error = T::Error;
+
+    fn poll_accept(
+        self: Pin<&mut Self>,
+        cx: &mut Context,
+    ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
+        let this = unsafe { self.map_unchecked_mut(|this| &mut this.0) };
+        this.try_poll_next(cx)
+    }
+}
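
Note: a hypothetical usage sketch for the two helpers above: StaticIncoming feeds a plain TcpListener straight into hyper 0.13's Server::builder (the proxmox-backup-api case), while HyperAccept wraps a mapped TryStream of sockets, e.g. after TLS setup (the proxy case). The service below is only a placeholder.

    use std::convert::Infallible;

    use hyper::service::{make_service_fn, service_fn};
    use hyper::{Body, Response, Server};
    use proxmox_backup::tools::async_io::StaticIncoming;

    async fn serve(listener: tokio::net::TcpListener) -> Result<(), hyper::Error> {
        let incoming = StaticIncoming::from(listener);
        let service = make_service_fn(|_conn| async {
            Ok::<_, Infallible>(service_fn(|_req| async {
                Ok::<_, Infallible>(Response::new(Body::from("ok")))
            }))
        });
        Server::builder(incoming).serve(service).await
    }
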
diff --git a/src/tools/daemon.rs b/src/tools/daemon.rs
index febca8bd187ed0a588f59ca8debbde37babf36c2..0eaf70e88c08a006e31a4ecd87b801f557725c55 100644
@@ -193,7 +193,6 @@ impl Reloadable for tokio::net::TcpListener {
         fd_change_cloexec(fd, true)?;
         Ok(Self::from_std(
             unsafe { std::net::TcpListener::from_raw_fd(fd) },
-            &tokio_net::driver::Handle::default(),
         )?)
     }
 }
diff --git a/src/tools/futures.rs b/src/tools/futures.rs
index 19cd2a166cb3d6b8dd30a843062005f6679bf6d0..30d7b7cf15cf779b09ce32c046cd16770372ee02 100644
@@ -7,8 +7,7 @@ use std::task::{Context, Poll};
 
 use failure::Error;
 use futures::future::FutureExt;
-
-use crate::tools::async_mutex::{AsyncLockGuard, AsyncMutex, LockFuture};
+use tokio::sync::oneshot;
 
 /// Make a future cancellable.
 ///
@@ -42,11 +41,11 @@ use crate::tools::async_mutex::{AsyncLockGuard, AsyncMutex, LockFuture};
 pub struct Cancellable<T: Future + Unpin> {
     /// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the
     /// lock, so that our LockFuture finishes.
-    inner: futures::future::Select<T, LockFuture<()>>,
+    inner: futures::future::Select<T, oneshot::Receiver<()>>,
 
     /// When this future is created, this holds a guard. When a `Canceller` wants to cancel the
     /// future, it'll drop this guard, causing our inner future to resolve to `None`.
-    guard: Arc<Mutex<Option<AsyncLockGuard<()>>>>,
+    sender: Arc<Mutex<Option<oneshot::Sender<()>>>>,
 }
 
 /// Reference to a cancellable future. Multiple instances may exist simultaneously.
@@ -55,14 +54,14 @@ pub struct Cancellable<T: Future + Unpin> {
 ///
 /// This can be cloned to be used in multiple places.
 #[derive(Clone)]
-pub struct Canceller(Arc<Mutex<Option<AsyncLockGuard<()>>>>);
+pub struct Canceller(Arc<Mutex<Option<oneshot::Sender<()>>>>);
 
 impl Canceller {
     /// Cancel the associated future.
     ///
     /// This does nothing if the future already finished successfully.
     pub fn cancel(&self) {
-        *self.0.lock().unwrap() = None;
+        let _ = self.0.lock().unwrap().take().unwrap().send(());
     }
 }
 
@@ -71,19 +70,20 @@ impl<T: Future + Unpin> Cancellable<T> {
     ///
     /// Returns a future and a `Canceller` which can be cloned and used later to cancel the future.
     pub fn new(inner: T) -> Result<(Self, Canceller), Error> {
-        // we don't even need to sture the mutex...
-        let (mutex, guard) = AsyncMutex::new_locked(())?;
+        // we don't even need to store the mutex...
+        let (tx, rx) = oneshot::channel();
         let this = Self {
-            inner: futures::future::select(inner, mutex.lock()),
-            guard: Arc::new(Mutex::new(Some(guard))),
+            inner: futures::future::select(inner, rx),
+            sender: Arc::new(Mutex::new(Some(tx))),
         };
+
         let canceller = this.canceller();
         Ok((this, canceller))
     }
 
     /// Create another `Canceller` for this future.
     pub fn canceller(&self) -> Canceller {
-        Canceller(self.guard.clone())
+        Canceller(Arc::clone(&self.sender))
     }
 }
 
diff --git a/src/tools/runtime.rs b/src/tools/runtime.rs
index 6d9445016534a421f03c22a3e78797388c444e63..f5e2ca92c5b9492f642118118935f58d4bfc5dd6 100644
@@ -7,7 +7,7 @@ where
     F: Future<Output = T> + Send + 'static,
     T: std::fmt::Debug + Send + 'static,
 {
-    let rt = tokio::runtime::Runtime::new().unwrap();
+    let mut rt = tokio::runtime::Runtime::new().unwrap();
     rt.block_on(async {
         let (tx, rx) = tokio::sync::oneshot::channel();
 
diff --git a/src/tools/wrapped_reader_stream.rs b/src/tools/wrapped_reader_stream.rs
index 2f8bf682c769b5e3452efe8b91fcdffc51111ce8..422b17cdfe9dddb4de41c346c448ee7ec8eff994 100644
@@ -2,7 +2,7 @@ use std::io::{self, Read};
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
-use tokio_executor::threadpool::blocking;
+use tokio::task::block_in_place;
 use futures::stream::Stream;
 
 pub struct WrappedReaderStream<R: Read + Unpin> {
@@ -24,8 +24,8 @@ impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
 
     fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();
-        match blocking(|| this.reader.read(&mut this.buffer)) {
-            Poll::Ready(Ok(Ok(n))) => {
+        match block_in_place(|| this.reader.read(&mut this.buffer)) {
+            Ok(n) => {
                 if n == 0 {
                     // EOF
                     Poll::Ready(None)
@@ -33,12 +33,7 @@ impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
                     Poll::Ready(Some(Ok(this.buffer[..n].to_vec())))
                 }
             }
-            Poll::Ready(Ok(Err(err))) => Poll::Ready(Some(Err(err))),
-            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(io::Error::new(
-                io::ErrorKind::Other,
-                err.to_string(),
-            )))),
-            Poll::Pending => Poll::Pending,
+            Err(err) => Poll::Ready(Some(Err(err))),
         }
     }
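
Note: the tokio-executor blocking() combinator is gone; tokio::task::block_in_place runs the synchronous read directly on the current worker thread (relying on the "blocking" and "rt-threaded" features enabled in the Cargo.toml change), which is why the extra Poll nesting disappears. A minimal sketch:

    use std::io::Read;
    use tokio::task::block_in_place;

    // run a blocking read without leaving the current worker thread
    fn sync_read<R: Read>(reader: &mut R, buf: &mut [u8]) -> std::io::Result<usize> {
        block_in_place(|| reader.read(buf))
    }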
 }