]> git.proxmox.com Git - proxmox-backup.git/blobdiff - src/bin/proxmox-backup-proxy.rs
Merge branch 'master' of ssh://proxdev.maurer-it.com/rust/proxmox-backup
[proxmox-backup.git] / src / bin / proxmox-backup-proxy.rs
index 2228253d7061fda344acbedd7bf2dcc35e2d45bc..0cfd56d511bbce548b13be4288ede128b60afeee 100644 (file)
@@ -4,8 +4,9 @@ use std::os::unix::io::AsRawFd;
 
 use anyhow::{bail, format_err, Error};
 use futures::*;
-use hyper;
+
 use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
+use tokio_stream::wrappers::ReceiverStream;
 
 use proxmox::try_block;
 use proxmox::api::RpcEnvironmentType;
@@ -122,7 +123,7 @@ async fn run() -> Result<(), Error> {
         |listener, ready| {
 
             let connections = accept_connections(listener, acceptor, debug);
-            let connections = hyper::server::accept::from_stream(connections);
+            let connections = hyper::server::accept::from_stream(ReceiverStream::new(connections));
 
             Ok(ready
                .and_then(|_| hyper::Server::builder(connections)
@@ -163,10 +164,10 @@ async fn run() -> Result<(), Error> {
 }
 
 fn accept_connections(
-    mut listener: tokio::net::TcpListener,
+    listener: tokio::net::TcpListener,
     acceptor: Arc<openssl::ssl::SslAcceptor>,
     debug: bool,
-) -> tokio::sync::mpsc::Receiver<Result<tokio_openssl::SslStream<tokio::net::TcpStream>, Error>> {
+) -> tokio::sync::mpsc::Receiver<Result<std::pin::Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>, Error>> {
 
     const MAX_PENDING_ACCEPTS: usize = 1024;
 
@@ -184,7 +185,24 @@ fn accept_connections(
                     sock.set_nodelay(true).unwrap();
                     let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
                     let acceptor = Arc::clone(&acceptor);
-                    let mut sender = sender.clone();
+
+                    let ssl = match openssl::ssl::Ssl::new(acceptor.context()) {
+                        Ok(ssl) => ssl,
+                        Err(err) => {
+                            eprintln!("failed to create Ssl object from Acceptor context - {}", err);
+                            continue;
+                        },
+                    };
+                    let stream = match tokio_openssl::SslStream::new(ssl, sock) {
+                        Ok(stream) => stream,
+                        Err(err) => {
+                            eprintln!("failed to create SslStream using ssl and connection socket - {}", err);
+                            continue;
+                        },
+                    };
+
+                    let mut stream = Box::pin(stream);
+                    let sender = sender.clone();
 
                     if Arc::strong_count(&accept_counter) > MAX_PENDING_ACCEPTS {
                         eprintln!("connection rejected - to many open connections");
@@ -194,16 +212,14 @@ fn accept_connections(
                     let accept_counter = accept_counter.clone();
                     tokio::spawn(async move {
                         let accept_future = tokio::time::timeout(
-                            Duration::new(10, 0), tokio_openssl::accept(&acceptor, sock));
+                            Duration::new(10, 0), stream.as_mut().accept());
 
                         let result = accept_future.await;
 
                         match result {
-                            Ok(Ok(connection)) => {
-                                if let Err(_) = sender.send(Ok(connection)).await {
-                                    if debug {
-                                        eprintln!("detect closed connection channel");
-                                    }
+                            Ok(Ok(())) => {
+                                if sender.send(Ok(stream)).await.is_err() && debug {
+                                    eprintln!("detect closed connection channel");
                                 }
                             }
                             Ok(Err(err)) => {
@@ -565,16 +581,16 @@ async fn schedule_task_log_rotate() {
         false,
         move |worker| {
             job.start(&worker.upid().to_string())?;
-            worker.log(format!("starting task log rotation"));
+            worker.log("starting task log rotation".to_string());
 
             let result = try_block!({
                 let max_size = 512 * 1024 - 1; // an entry has ~ 100b, so > 5000 entries/file
                 let max_files = 20; // times twenty files gives > 100000 task entries
                 let has_rotated = rotate_task_log_archive(max_size, true, Some(max_files))?;
                 if has_rotated {
-                    worker.log(format!("task log archive was rotated"));
+                    worker.log("task log archive was rotated".to_string());
                 } else {
-                    worker.log(format!("task log archive was not rotated"));
+                    worker.log("task log archive was not rotated".to_string());
                 }
 
                 let max_size = 32 * 1024 * 1024 - 1;
@@ -585,18 +601,18 @@ async fn schedule_task_log_rotate() {
                 if logrotate.rotate(max_size, None, Some(max_files))? {
                     println!("rotated access log, telling daemons to re-open log file");
                     proxmox_backup::tools::runtime::block_on(command_reopen_logfiles())?;
-                    worker.log(format!("API access log was rotated"));
+                    worker.log("API access log was rotated".to_string());
                 } else {
-                    worker.log(format!("API access log was not rotated"));
+                    worker.log("API access log was not rotated".to_string());
                 }
 
                 let mut logrotate = LogRotate::new(buildcfg::API_AUTH_LOG_FN, true)
                         .ok_or_else(|| format_err!("could not get API auth log file names"))?;
 
                 if logrotate.rotate(max_size, None, Some(max_files))? {
-                    worker.log(format!("API authentication log was rotated"));
+                    worker.log("API authentication log was rotated".to_string());
                 } else {
-                    worker.log(format!("API authentication log was not rotated"));
+                    worker.log("API authentication log was not rotated".to_string());
                 }
 
                 Ok(())
@@ -733,7 +749,7 @@ async fn generate_host_stats(save: bool) {
         match datastore::config() {
             Ok((config, _)) => {
                 let datastore_list: Vec<datastore::DataStoreConfig> =
-                    config.convert_to_typed_array("datastore").unwrap_or(Vec::new());
+                    config.convert_to_typed_array("datastore").unwrap_or_default();
 
                 for config in datastore_list {