git.proxmox.com Git - proxmox.git/commitdiff
rest server: rust fmt
author Thomas Lamprecht <t.lamprecht@proxmox.com>
Wed, 6 Apr 2022 14:55:39 +0000 (16:55 +0200)
committer Thomas Lamprecht <t.lamprecht@proxmox.com>
Wed, 6 Apr 2022 14:55:39 +0000 (16:55 +0200)
Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
13 files changed:
proxmox-rest-server/examples/minimal-rest-server.rs
proxmox-rest-server/src/api_config.rs
proxmox-rest-server/src/command_socket.rs
proxmox-rest-server/src/compression.rs
proxmox-rest-server/src/daemon.rs
proxmox-rest-server/src/environment.rs
proxmox-rest-server/src/file_logger.rs
proxmox-rest-server/src/formatter.rs
proxmox-rest-server/src/h2service.rs
proxmox-rest-server/src/lib.rs
proxmox-rest-server/src/rest.rs
proxmox-rest-server/src/state.rs
proxmox-rest-server/src/worker_task.rs

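All hunks below are whitespace-only rustfmt changes with no behavioural difference. For orientation, here is a minimal self-contained sketch of the most common rewrite, based on the get_item() hunk from proxmox-rest-server/examples/minimal-rest-server.rs; the ITEM_MAP definition and main() are assumptions added only so the sketch compiles on its own:

use std::collections::HashMap;
use std::sync::Mutex;

use anyhow::{format_err, Error};
use lazy_static::lazy_static;

lazy_static! {
    // Assumed definition; the example file only shows ITEM_MAP being locked and queried.
    static ref ITEM_MAP: Mutex<HashMap<String, String>> = Mutex::new(HashMap::new());
}

// Reformatted shape of the example's get_item(): once the chain exceeds rustfmt's default
// 100-column width, it is broken onto one method call per line (previously a single long line).
fn get_item(name: String) -> Result<String, Error> {
    ITEM_MAP
        .lock()
        .unwrap()
        .get(&name)
        .map(|s| s.to_string())
        .ok_or_else(|| format_err!("no such item '{}'", name))
}

fn main() {
    ITEM_MAP
        .lock()
        .unwrap()
        .insert("answer".to_string(), "42".to_string());
    println!("{:?}", get_item("answer".to_string()));
}

The other recurring rewrites in this commit follow the same pattern: `use` statements sorted alphabetically, `where` clauses moved onto their own lines, and long argument lists wrapped one per line.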
index 7dd6fbcffc1bfcd9e762fb0eef277d5a71b8947c..0b1bfd533ece42071d322a4a64d398164c875d91 100644 (file)
@@ -1,18 +1,20 @@
-use std::sync::Mutex;
 use std::collections::HashMap;
 use std::future::Future;
 use std::pin::Pin;
+use std::sync::Mutex;
 
 use anyhow::{bail, format_err, Error};
-use lazy_static::lazy_static;
-use hyper::{Body, Response, Method};
 use http::request::Parts;
 use http::HeaderMap;
+use hyper::{Body, Method, Response};
+use lazy_static::lazy_static;
 
+use proxmox_router::{
+    list_subdirs_api_method, Router, RpcEnvironmentType, SubdirMap, UserInformation,
+};
 use proxmox_schema::api;
-use proxmox_router::{list_subdirs_api_method, SubdirMap, Router, RpcEnvironmentType, UserInformation};
 
-use proxmox_rest_server::{ServerAdapter, ApiConfig, AuthError, RestServer, RestEnvironment};
+use proxmox_rest_server::{ApiConfig, AuthError, RestEnvironment, RestServer, ServerAdapter};
 
 // Create a Dummy User information system
 struct DummyUserInfo;
@@ -34,13 +36,17 @@ struct MinimalServer;
 
 // implement the server adapter
 impl ServerAdapter for MinimalServer {
-
     // normally this would check and authenticate the user
     fn check_auth(
         &self,
         _headers: &HeaderMap,
         _method: &Method,
-    ) -> Pin<Box<dyn Future<Output = Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError>> + Send>> {
+    ) -> Pin<
+        Box<
+            dyn Future<Output = Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError>>
+                + Send,
+        >,
+    > {
         Box::pin(async move {
             // get some global/cached userinfo
             let userinfo: Box<dyn UserInformation + Sync + Send> = Box::new(DummyUserInfo);
@@ -121,7 +127,12 @@ fn create_item(name: String, value: String) -> Result<(), Error> {
 )]
 /// returns the value of an item
 fn get_item(name: String) -> Result<String, Error> {
-    ITEM_MAP.lock().unwrap().get(&name).map(|s| s.to_string()).ok_or_else(|| format_err!("no such item '{}'", name))
+    ITEM_MAP
+        .lock()
+        .unwrap()
+        .get(&name)
+        .map(|s| s.to_string())
+        .ok_or_else(|| format_err!("no such item '{}'", name))
 }
 
 #[api(
@@ -177,13 +188,9 @@ const SUBDIRS: SubdirMap = &[
         &Router::new()
             .get(&API_METHOD_LIST_ITEMS)
             .post(&API_METHOD_CREATE_ITEM)
-            .match_all("name", &ITEM_ROUTER)
-    ),
-    (
-        "ping",
-        &Router::new()
-            .get(&API_METHOD_PING)
+            .match_all("name", &ITEM_ROUTER),
     ),
+    ("ping", &Router::new().get(&API_METHOD_PING)),
 ];
 
 const ROUTER: Router = Router::new()
@@ -191,7 +198,6 @@ const ROUTER: Router = Router::new()
     .subdirs(SUBDIRS);
 
 async fn run() -> Result<(), Error> {
-
     // we first have to configure the api environment (basedir etc.)
 
     let config = ApiConfig::new(
@@ -204,21 +210,16 @@ async fn run() -> Result<(), Error> {
 
     // then we have to create a daemon that listens, accepts and serves
     // the api to clients
-    proxmox_rest_server::daemon::create_daemon(
-        ([127, 0, 0, 1], 65000).into(),
-        move |listener| {
-            let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?;
+    proxmox_rest_server::daemon::create_daemon(([127, 0, 0, 1], 65000).into(), move |listener| {
+        let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?;
 
-            Ok(async move {
+        Ok(async move {
+            hyper::Server::builder(incoming).serve(rest_server).await?;
 
-                hyper::Server::builder(incoming)
-                    .serve(rest_server)
-                    .await?;
-
-                Ok(())
-            })
-        },
-    ).await?;
+            Ok(())
+        })
+    })
+    .await?;
 
     Ok(())
 }
index ad76a15f0370827c877b11c64674a68c9f2d0e4d..9d257fd1283149456110de64b4916127e3af590e 100644 (file)
@@ -1,22 +1,21 @@
 use std::collections::HashMap;
-use std::path::PathBuf;
-use std::time::SystemTime;
 use std::fs::metadata;
-use std::sync::{Arc, Mutex, RwLock};
+use std::path::PathBuf;
 use std::pin::Pin;
+use std::sync::{Arc, Mutex, RwLock};
+use std::time::SystemTime;
 
-use anyhow::{bail, Error, format_err};
-use hyper::{Method, Body, Response};
+use anyhow::{bail, format_err, Error};
 use hyper::http::request::Parts;
+use hyper::{Body, Method, Response};
 
 use handlebars::Handlebars;
 use serde::Serialize;
 
-use proxmox_sys::fs::{create_path, CreateOptions};
 use proxmox_router::{ApiMethod, Router, RpcEnvironmentType, UserInformation};
+use proxmox_sys::fs::{create_path, CreateOptions};
 
-use crate::{ServerAdapter, AuthError, FileLogger, FileLogOptions, CommandSocket, RestEnvironment};
-
+use crate::{AuthError, CommandSocket, FileLogOptions, FileLogger, RestEnvironment, ServerAdapter};
 
 /// REST server configuration
 pub struct ApiConfig {
@@ -87,12 +86,10 @@ impl ApiConfig {
         method: Method,
         uri_param: &mut HashMap<String, String>,
     ) -> Option<&'static ApiMethod> {
-
         self.router.find_method(components, method, uri_param)
     }
 
     pub(crate) fn find_alias(&self, components: &[&str]) -> PathBuf {
-
         let mut prefix = String::new();
         let mut filename = self.basedir.clone();
         let comp_len = components.len();
@@ -100,7 +97,10 @@ impl ApiConfig {
             prefix.push_str(components[0]);
             if let Some(subdir) = self.aliases.get(&prefix) {
                 filename.push(subdir);
-                components.iter().skip(1).for_each(|comp| filename.push(comp));
+                components
+                    .iter()
+                    .skip(1)
+                    .for_each(|comp| filename.push(comp));
             } else {
                 components.iter().for_each(|comp| filename.push(comp));
             }
@@ -121,8 +121,9 @@ impl ApiConfig {
     /// # }
     /// ```
     pub fn add_alias<S, P>(&mut self, alias: S, path: P)
-        where S: Into<String>,
-              P: Into<PathBuf>,
+    where
+        S: Into<String>,
+        P: Into<PathBuf>,
     {
         self.aliases.insert(alias.into(), path.into());
     }
@@ -136,7 +137,7 @@ impl ApiConfig {
     /// Those templates cane be use with [render_template](Self::render_template) to generate pages.
     pub fn register_template<P>(&self, name: &str, path: P) -> Result<(), Error>
     where
-        P: Into<PathBuf>
+        P: Into<PathBuf>,
     {
         if self.template_files.read().unwrap().contains_key(name) {
             bail!("template already registered");
@@ -146,8 +147,14 @@ impl ApiConfig {
         let metadata = metadata(&path)?;
         let mtime = metadata.modified()?;
 
-        self.templates.write().unwrap().register_template_file(name, &path)?;
-        self.template_files.write().unwrap().insert(name.to_string(), (mtime, path));
+        self.templates
+            .write()
+            .unwrap()
+            .register_template_file(name, &path)?;
+        self.template_files
+            .write()
+            .unwrap()
+            .insert(name.to_string(), (mtime, path));
 
         Ok(())
     }
@@ -162,11 +169,18 @@ impl ApiConfig {
         let mtime;
         {
             let template_files = self.template_files.read().unwrap();
-            let (old_mtime, old_path) = template_files.get(name).ok_or_else(|| format_err!("template not found"))?;
+            let (old_mtime, old_path) = template_files
+                .get(name)
+                .ok_or_else(|| format_err!("template not found"))?;
 
             mtime = metadata(old_path)?.modified()?;
             if mtime <= *old_mtime {
-                return self.templates.read().unwrap().render(name, data).map_err(|err| format_err!("{}", err));
+                return self
+                    .templates
+                    .read()
+                    .unwrap()
+                    .render(name, data)
+                    .map_err(|err| format_err!("{}", err));
             }
             path = old_path.to_path_buf();
         }
@@ -178,7 +192,9 @@ impl ApiConfig {
             templates.register_template_file(name, &path)?;
             template_files.insert(name.to_string(), (mtime, path));
 
-            templates.render(name, data).map_err(|err| format_err!("{}", err))
+            templates
+                .render(name, data)
+                .map_err(|err| format_err!("{}", err))
         }
     }
 
@@ -195,7 +211,7 @@ impl ApiConfig {
         commando_sock: &mut CommandSocket,
     ) -> Result<(), Error>
     where
-        P: Into<PathBuf>
+        P: Into<PathBuf>,
     {
         let path: PathBuf = path.into();
         if let Some(base) = path.parent() {
@@ -234,7 +250,7 @@ impl ApiConfig {
         commando_sock: &mut CommandSocket,
     ) -> Result<(), Error>
     where
-        P: Into<PathBuf>
+        P: Into<PathBuf>,
     {
         let path: PathBuf = path.into();
         if let Some(base) = path.parent() {
index 46814c4f0f2fb61b5dabdfdfc1d369bc986d08bd..a7fc75764fa418119c697bec859e5db4f178ff64 100644 (file)
@@ -2,18 +2,22 @@ use anyhow::{bail, format_err, Error};
 
 use std::collections::HashMap;
 use std::os::unix::io::AsRawFd;
-use std::path::{PathBuf, Path};
+use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
 use futures::*;
-use tokio::net::UnixListener;
-use serde::Serialize;
-use serde_json::Value;
 use nix::sys::socket;
 use nix::unistd::Gid;
+use serde::Serialize;
+use serde_json::Value;
+use tokio::net::UnixListener;
 
 // Listens on a Unix Socket to handle simple command asynchronously
-fn create_control_socket<P, F>(path: P, gid: Gid, func: F) -> Result<impl Future<Output = ()>, Error>
+fn create_control_socket<P, F>(
+    path: P,
+    gid: Gid,
+    func: F,
+) -> Result<impl Future<Output = ()>, Error>
 where
     P: Into<PathBuf>,
     F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static,
@@ -59,45 +63,57 @@ where
             use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
             let func = Arc::clone(&func);
             let path = path.clone();
-            tokio::spawn(futures::future::select(
-                async move {
-                    let mut rx = tokio::io::BufReader::new(rx);
-                    let mut line = String::new();
-                    loop {
-                        line.clear();
-                        match rx.read_line({ line.clear(); &mut line }).await {
-                            Ok(0) => break,
-                            Ok(_) => (),
-                            Err(err) => {
-                                eprintln!("control socket {:?} read error: {}", path, err);
-                                return;
+            tokio::spawn(
+                futures::future::select(
+                    async move {
+                        let mut rx = tokio::io::BufReader::new(rx);
+                        let mut line = String::new();
+                        loop {
+                            line.clear();
+                            match rx
+                                .read_line({
+                                    line.clear();
+                                    &mut line
+                                })
+                                .await
+                            {
+                                Ok(0) => break,
+                                Ok(_) => (),
+                                Err(err) => {
+                                    eprintln!("control socket {:?} read error: {}", path, err);
+                                    return;
+                                }
                             }
-                        }
 
-                        let response = match line.parse::<Value>() {
-                            Ok(param) => match func(param) {
-                                Ok(res) => format!("OK: {}\n", res),
+                            let response = match line.parse::<Value>() {
+                                Ok(param) => match func(param) {
+                                    Ok(res) => format!("OK: {}\n", res),
+                                    Err(err) => format!("ERROR: {}\n", err),
+                                },
                                 Err(err) => format!("ERROR: {}\n", err),
-                            }
-                            Err(err) => format!("ERROR: {}\n", err),
-                        };
+                            };
 
-                        if let Err(err) = tx.write_all(response.as_bytes()).await {
-                            eprintln!("control socket {:?} write response error: {}", path, err);
-                            return;
+                            if let Err(err) = tx.write_all(response.as_bytes()).await {
+                                eprintln!(
+                                    "control socket {:?} write response error: {}",
+                                    path, err
+                                );
+                                return;
+                            }
                         }
                     }
-                }.boxed(),
-                abort_future,
-            ).map(|_| ()));
+                    .boxed(),
+                    abort_future,
+                )
+                .map(|_| ()),
+            );
         }
-    }.boxed();
+    }
+    .boxed();
 
     let abort_future = crate::last_worker_future().map_err(|_| {});
-    let task = futures::future::select(
-        control_future,
-        abort_future,
-    ).map(|_: futures::future::Either<(Result<(), Error>, _), _>| ());
+    let task = futures::future::select(control_future, abort_future)
+        .map(|_: futures::future::Either<(Result<(), Error>, _), _>| ());
 
     Ok(task)
 }
@@ -148,7 +164,8 @@ where
 }
 
 // A callback for a specific commando socket.
-type CommandSocketFn = Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
+type CommandSocketFn =
+    Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
 
 /// Tooling to get a single control command socket where one can
 /// register multiple commands dynamically.
@@ -164,7 +181,8 @@ pub struct CommandSocket {
 impl CommandSocket {
     /// Creates a new instance.
     pub fn new<P>(path: P, gid: Gid) -> Self
-        where P: Into<PathBuf>,
+    where
+        P: Into<PathBuf>,
     {
         CommandSocket {
             socket: path.into(),
@@ -176,29 +194,30 @@ impl CommandSocket {
     /// Spawn the socket and consume self, meaning you cannot register commands anymore after
     /// calling this.
     pub fn spawn(self) -> Result<(), Error> {
-        let control_future = create_control_socket(self.socket.to_owned(), self.gid, move |param| {
-            let param = param
-                .as_object()
-                .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
-
-            let command = match param.get("command") {
-                Some(Value::String(command)) => command.as_str(),
-                None => bail!("no command"),
-                _ => bail!("unable to parse command"),
-            };
-
-            if !self.commands.contains_key(command) {
-                bail!("got unknown command '{}'", command);
-            }
+        let control_future =
+            create_control_socket(self.socket.to_owned(), self.gid, move |param| {
+                let param = param.as_object().ok_or_else(|| {
+                    format_err!("unable to parse parameters (expected json object)")
+                })?;
+
+                let command = match param.get("command") {
+                    Some(Value::String(command)) => command.as_str(),
+                    None => bail!("no command"),
+                    _ => bail!("unable to parse command"),
+                };
+
+                if !self.commands.contains_key(command) {
+                    bail!("got unknown command '{}'", command);
+                }
 
-            match self.commands.get(command) {
-                None => bail!("got unknown command '{}'", command),
-                Some(handler) => {
-                    let args = param.get("args"); //.unwrap_or(&Value::Null);
-                    (handler)(args)
-                },
-            }
-        })?;
+                match self.commands.get(command) {
+                    None => bail!("got unknown command '{}'", command),
+                    Some(handler) => {
+                        let args = param.get("args"); //.unwrap_or(&Value::Null);
+                        (handler)(args)
+                    }
+                }
+            })?;
 
         tokio::spawn(control_future);
 
@@ -206,15 +225,10 @@ impl CommandSocket {
     }
 
     /// Register a new command with a callback.
-    pub fn register_command<F>(
-        &mut self,
-        command: String,
-        handler: F,
-    ) -> Result<(), Error>
+    pub fn register_command<F>(&mut self, command: String, handler: F) -> Result<(), Error>
     where
         F: Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static,
     {
-
         if self.commands.contains_key(&command) {
             bail!("command '{}' already exists!", command);
         }
index 19626efc80155d8bf947c1181d65773a33784cde..189d7041cabb0cb7691e050fb815dcafccdc1b2a 100644 (file)
@@ -5,8 +5,8 @@ use hyper::header;
 #[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
 pub enum CompressionMethod {
     Deflate,
-//    Gzip,
-//    Brotli,
+    //    Gzip,
+    //    Brotli,
 }
 
 impl CompressionMethod {
@@ -16,8 +16,8 @@ impl CompressionMethod {
 
     pub fn extension(&self) -> &'static str {
         match *self {
-//            CompressionMethod::Brotli => "br",
-//            CompressionMethod::Gzip => "gzip",
+            //            CompressionMethod::Brotli => "br",
+            //            CompressionMethod::Gzip => "gzip",
             CompressionMethod::Deflate => "deflate",
         }
     }
@@ -28,8 +28,8 @@ impl std::str::FromStr for CompressionMethod {
 
     fn from_str(s: &str) -> Result<Self, Self::Err> {
         match s {
-//            "br" => Ok(CompressionMethod::Brotli),
-//            "gzip" => Ok(CompressionMethod::Gzip),
+            //            "br" => Ok(CompressionMethod::Brotli),
+            //            "gzip" => Ok(CompressionMethod::Gzip),
             "deflate" => Ok(CompressionMethod::Deflate),
             // http accept-encoding allows to give weights with ';q='
             other if other.starts_with("deflate;q=") => Ok(CompressionMethod::Deflate),
index 973cd30656b42ca327f3c7b8e47a0b750de1d32b..4e09118d6d9c54db2dbb64bd3a5465d38c4a71f0 100644 (file)
@@ -3,9 +3,9 @@
 use std::ffi::CString;
 use std::future::Future;
 use std::io::{Read, Write};
-use std::os::raw::{c_char, c_uchar, c_int};
-use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::os::raw::{c_char, c_int, c_uchar};
 use std::os::unix::ffi::OsStrExt;
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
 use std::panic::UnwindSafe;
 use std::path::PathBuf;
 
@@ -13,8 +13,8 @@ use anyhow::{bail, format_err, Error};
 use futures::future::{self, Either};
 use nix::unistd::{fork, ForkResult};
 
-use proxmox_sys::fd::{fd_change_cloexec, Fd};
 use proxmox_io::{ReadExt, WriteExt};
+use proxmox_sys::fd::{fd_change_cloexec, Fd};
 
 // Unfortunately FnBox is nightly-only and Box<FnOnce> is unusable, so just use Box<Fn>...
 type BoxedStoreFunc = Box<dyn FnMut() -> Result<String, Error> + UnwindSafe + Send>;
@@ -102,9 +102,8 @@ impl Reloader {
                         // At this point we call pre-exec helpers. We must be certain that if they fail for
                         // whatever reason we can still call `_exit()`, so use catch_unwind.
                         match std::panic::catch_unwind(move || {
-                            let mut pnew = unsafe {
-                                std::fs::File::from_raw_fd(pnew.into_raw_fd())
-                            };
+                            let mut pnew =
+                                unsafe { std::fs::File::from_raw_fd(pnew.into_raw_fd()) };
                             let pid = nix::unistd::Pid::this();
                             if let Err(e) = unsafe { pnew.write_host_value(pid.as_raw()) } {
                                 log::error!("failed to send new server PID to parent: {}", e);
@@ -125,16 +124,19 @@ impl Reloader {
                             std::mem::drop(pnew);
 
                             // Try to reopen STDOUT/STDERR journald streams to get correct PID in logs
-                            let ident = CString::new(self.self_exe.file_name().unwrap().as_bytes()).unwrap();
+                            let ident = CString::new(self.self_exe.file_name().unwrap().as_bytes())
+                                .unwrap();
                             let ident = ident.as_bytes();
-                            let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) };
+                            let fd =
+                                unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) };
                             if fd >= 0 && fd != 1 {
                                 let fd = proxmox_sys::fd::Fd(fd); // add drop handler
                                 nix::unistd::dup2(fd.as_raw_fd(), 1)?;
                             } else {
                                 log::error!("failed to update STDOUT journal redirection ({})", fd);
                             }
-                            let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) };
+                            let fd =
+                                unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) };
                             if fd >= 0 && fd != 2 {
                                 let fd = proxmox_sys::fd::Fd(fd); // add drop handler
                                 nix::unistd::dup2(fd.as_raw_fd(), 2)?;
@@ -143,8 +145,7 @@ impl Reloader {
                             }
 
                             self.do_reexec(new_args)
-                        })
-                        {
+                        }) {
                             Ok(Ok(())) => log::error!("do_reexec returned!"),
                             Ok(Err(err)) => log::error!("do_reexec failed: {}", err),
                             Err(_) => log::error!("panic in re-exec"),
@@ -157,20 +158,22 @@ impl Reloader {
                     Err(e) => log::error!("fork() failed, restart delayed: {}", e),
                 }
                 // No matter how we managed to get here, this is the time where we bail out quickly:
-                unsafe {
-                    libc::_exit(-1)
-                }
+                unsafe { libc::_exit(-1) }
             }
             Ok(ForkResult::Parent { child }) => {
-                log::debug!("forked off a new server (first pid: {}), waiting for 2nd pid", child);
+                log::debug!(
+                    "forked off a new server (first pid: {}), waiting for 2nd pid",
+                    child
+                );
                 std::mem::drop(pnew);
-                let mut pold = unsafe {
-                    std::fs::File::from_raw_fd(pold.into_raw_fd())
-                };
+                let mut pold = unsafe { std::fs::File::from_raw_fd(pold.into_raw_fd()) };
                 let child = nix::unistd::Pid::from_raw(match unsafe { pold.read_le_value() } {
                     Ok(v) => v,
                     Err(e) => {
-                        log::error!("failed to receive pid of double-forked child process: {}", e);
+                        log::error!(
+                            "failed to receive pid of double-forked child process: {}",
+                            e
+                        );
                         // systemd will complain but won't kill the service...
                         return Ok(());
                     }
@@ -215,9 +218,10 @@ impl Reloadable for tokio::net::TcpListener {
     // FIXME: We could become "independent" of the TcpListener and its reference to the file
     // descriptor by `dup()`ing it (and check if the listener still exists via kcmp()?)
     fn get_store_func(&self) -> Result<BoxedStoreFunc, Error> {
-        let mut fd_opt = Some(Fd(
-            nix::fcntl::fcntl(self.as_raw_fd(), nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0))?
-        ));
+        let mut fd_opt = Some(Fd(nix::fcntl::fcntl(
+            self.as_raw_fd(),
+            nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0),
+        )?));
         Ok(Box::new(move || {
             let fd = fd_opt.take().unwrap();
             fd_change_cloexec(fd.as_raw_fd(), false)?;
@@ -226,13 +230,13 @@ impl Reloadable for tokio::net::TcpListener {
     }
 
     fn restore(var: &str) -> Result<Self, Error> {
-        let fd = var.parse::<u32>()
-            .map_err(|e| format_err!("invalid file descriptor: {}", e))?
-            as RawFd;
+        let fd = var
+            .parse::<u32>()
+            .map_err(|e| format_err!("invalid file descriptor: {}", e))? as RawFd;
         fd_change_cloexec(fd, true)?;
-        Ok(Self::from_std(
-            unsafe { std::net::TcpListener::from_raw_fd(fd) },
-        )?)
+        Ok(Self::from_std(unsafe {
+            std::net::TcpListener::from_raw_fd(fd)
+        })?)
     }
 }
 
@@ -253,10 +257,11 @@ where
 {
     let mut reloader = Reloader::new()?;
 
-    let listener: tokio::net::TcpListener = reloader.restore(
-        "PROXMOX_BACKUP_LISTEN_FD",
-        move || async move { Ok(tokio::net::TcpListener::bind(&address).await?) },
-    ).await?;
+    let listener: tokio::net::TcpListener = reloader
+        .restore("PROXMOX_BACKUP_LISTEN_FD", move || async move {
+            Ok(tokio::net::TcpListener::bind(&address).await?)
+        })
+        .await?;
 
     let service = create_service(listener)?;
 
@@ -308,7 +313,11 @@ where
 
 #[link(name = "systemd")]
 extern "C" {
-    fn sd_journal_stream_fd(identifier: *const c_uchar, priority: c_int, level_prefix: c_int) -> c_int;
+    fn sd_journal_stream_fd(
+        identifier: *const c_uchar,
+        priority: c_int,
+        level_prefix: c_int,
+    ) -> c_int;
     fn sd_notify(unset_environment: c_int, state: *const c_char) -> c_int;
     fn sd_notify_barrier(unset_environment: c_int, timeout: u64) -> c_int;
 }
@@ -328,7 +337,7 @@ pub fn systemd_notify(state: SystemdNotify) -> Result<(), Error> {
         SystemdNotify::Ready => {
             log::info!("service is ready");
             CString::new("READY=1")
-        },
+        }
         SystemdNotify::Reloading => CString::new("RELOADING=1"),
         SystemdNotify::Stopping => CString::new("STOPPING=1"),
         SystemdNotify::Status(msg) => CString::new(format!("STATUS={}", msg)),
@@ -336,7 +345,10 @@ pub fn systemd_notify(state: SystemdNotify) -> Result<(), Error> {
     }?;
     let rc = unsafe { sd_notify(0, message.as_ptr()) };
     if rc < 0 {
-        bail!("systemd_notify failed: {}", std::io::Error::from_raw_os_error(-rc));
+        bail!(
+            "systemd_notify failed: {}",
+            std::io::Error::from_raw_os_error(-rc)
+        );
     }
 
     Ok(())
@@ -346,7 +358,10 @@ pub fn systemd_notify(state: SystemdNotify) -> Result<(), Error> {
 pub fn systemd_notify_barrier(timeout: u64) -> Result<(), Error> {
     let rc = unsafe { sd_notify_barrier(0, timeout) };
     if rc < 0 {
-        bail!("systemd_notify_barrier failed: {}", std::io::Error::from_raw_os_error(-rc));
+        bail!(
+            "systemd_notify_barrier failed: {}",
+            std::io::Error::from_raw_os_error(-rc)
+        );
     }
     Ok(())
 }
index b1fa3a33d39eabc368633bccffbf4b0d50a3150d..b4dff76b142e1fdf5f3381cdcf3c8e8370f87b09 100644 (file)
@@ -1,5 +1,5 @@
-use std::sync::Arc;
 use std::net::SocketAddr;
+use std::sync::Arc;
 
 use serde_json::{json, Value};
 
@@ -42,13 +42,19 @@ impl RestEnvironment {
     pub fn log_failed_auth(&self, failed_auth_id: Option<String>, msg: &str) {
         let msg = match (self.client_ip, failed_auth_id) {
             (Some(peer), Some(user)) => {
-                format!("authentication failure; rhost={} user={} msg={}", peer, user, msg)
+                format!(
+                    "authentication failure; rhost={} user={} msg={}",
+                    peer, user, msg
+                )
             }
             (Some(peer), None) => {
                 format!("authentication failure; rhost={} msg={}", peer, msg)
             }
             (None, Some(user)) => {
-                format!("authentication failure; rhost=unknown user={} msg={}", user, msg)
+                format!(
+                    "authentication failure; rhost=unknown user={} msg={}",
+                    user, msg
+                )
             }
             (None, None) => {
                 format!("authentication failure; rhost=unknown msg={}", msg)
@@ -59,12 +65,10 @@ impl RestEnvironment {
             auth_logger.lock().unwrap().log(&msg);
         }
     }
-
 }
 
 impl RpcEnvironment for RestEnvironment {
-
-    fn result_attrib_mut (&mut self) -> &mut Value {
+    fn result_attrib_mut(&mut self) -> &mut Value {
         &mut self.result_attributes
     }
 
index 7d4d3f86bf326717ef0188c2d212c3cf3fe777d0..eeabadabc699216b43f3818f3296d059dffe7d07 100644 (file)
@@ -3,7 +3,7 @@ use std::io::Write;
 use anyhow::Error;
 use nix::fcntl::OFlag;
 
-use proxmox_sys::fs::{CreateOptions, atomic_open_or_create_file};
+use proxmox_sys::fs::{atomic_open_or_create_file, CreateOptions};
 
 /// Options to control the behavior of a [FileLogger] instance
 #[derive(Default)]
@@ -23,7 +23,6 @@ pub struct FileLogOptions {
     pub prefix_time: bool,
     /// File owner/group and mode
     pub file_opts: CreateOptions,
-
 }
 
 /// Log messages with optional automatically added timestamps into files
@@ -66,7 +65,11 @@ impl FileLogger {
 
         let file_name: std::path::PathBuf = file_name.as_ref().to_path_buf();
 
-        Ok(Self { file, file_name, options })
+        Ok(Self {
+            file,
+            file_name,
+            options,
+        })
     }
 
     pub fn reopen(&mut self) -> Result<&Self, Error> {
@@ -79,23 +82,23 @@ impl FileLogger {
         file_name: P,
         options: &FileLogOptions,
     ) -> Result<std::fs::File, Error> {
-
         let mut flags = OFlag::O_CLOEXEC;
 
-        if options.read  {
-            flags |=  OFlag::O_RDWR;
+        if options.read {
+            flags |= OFlag::O_RDWR;
         } else {
-            flags |=  OFlag::O_WRONLY;
+            flags |= OFlag::O_WRONLY;
         }
 
         if options.append {
-            flags |=  OFlag::O_APPEND;
+            flags |= OFlag::O_APPEND;
         }
         if options.exclusive {
-            flags |=  OFlag::O_EXCL;
+            flags |= OFlag::O_EXCL;
         }
 
-        let file = atomic_open_or_create_file(&file_name, flags, &[], options.file_opts.clone(), false)?;
+        let file =
+            atomic_open_or_create_file(&file_name, flags, &[], options.file_opts.clone(), false)?;
 
         Ok(file)
     }
index e3958826ca8ad216239577f5f70d31df86f7f515..e31df5b8110f6004be9e3cfbec4c00d4147f6fee 100644 (file)
@@ -1,11 +1,11 @@
 //! Helpers to format response data
 use std::collections::HashMap;
 
-use anyhow::{Error};
+use anyhow::Error;
 use serde_json::{json, Value};
 
-use hyper::{Body, Response, StatusCode};
 use hyper::header;
+use hyper::{Body, Response, StatusCode};
 
 use proxmox_router::{HttpError, RpcEnvironment};
 use proxmox_schema::ParameterError;
@@ -22,7 +22,11 @@ pub trait OutputFormatter: Send + Sync {
     fn format_error(&self, err: Error) -> Response<Body>;
 
     /// Transform a [Result] into a http response
-    fn format_result(&self, result: Result<Value, Error>, rpcenv: &dyn RpcEnvironment) -> Response<Body> {
+    fn format_result(
+        &self,
+        result: Result<Value, Error>,
+        rpcenv: &dyn RpcEnvironment,
+    ) -> Response<Body> {
         match result {
             Ok(data) => self.format_data(data, rpcenv),
             Err(err) => self.format_error(err),
@@ -33,7 +37,6 @@ pub trait OutputFormatter: Send + Sync {
 static JSON_CONTENT_TYPE: &str = "application/json;charset=UTF-8";
 
 fn json_data_response(data: Value) -> Response<Body> {
-
     let json_str = data.to_string();
 
     let raw = json_str.into_bytes();
@@ -41,13 +44,13 @@ fn json_data_response(data: Value) -> Response<Body> {
     let mut response = Response::new(raw.into());
     response.headers_mut().insert(
         header::CONTENT_TYPE,
-        header::HeaderValue::from_static(JSON_CONTENT_TYPE));
+        header::HeaderValue::from_static(JSON_CONTENT_TYPE),
+    );
 
     response
 }
 
-fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment)
-{
+fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) {
     let attributes = match rpcenv.result_attrib().as_object() {
         Some(attr) => attr,
         None => return,
@@ -58,7 +61,6 @@ fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment)
     }
 }
 
-
 struct JsonFormatter();
 
 /// Format data as ``application/json``
@@ -73,13 +75,9 @@ struct JsonFormatter();
 /// message as string.
 pub static JSON_FORMATTER: &'static dyn OutputFormatter = &JsonFormatter();
 
-impl  OutputFormatter for JsonFormatter {
-
+impl OutputFormatter for JsonFormatter {
     fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body> {
-
-        let mut result = json!({
-            "data": data
-        });
+        let mut result = json!({ "data": data });
 
         add_result_attributes(&mut result, rpcenv);
 
@@ -87,7 +85,6 @@ impl  OutputFormatter for JsonFormatter {
     }
 
     fn format_error(&self, err: Error) -> Response<Body> {
-
         let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
             let mut resp = Response::new(Body::from(apierr.message.clone()));
             *resp.status_mut() = apierr.code;
@@ -100,9 +97,12 @@ impl  OutputFormatter for JsonFormatter {
 
         response.headers_mut().insert(
             header::CONTENT_TYPE,
-            header::HeaderValue::from_static(JSON_CONTENT_TYPE));
+            header::HeaderValue::from_static(JSON_CONTENT_TYPE),
+        );
 
-        response.extensions_mut().insert(ErrorMessageExtension(err.to_string()));
+        response
+            .extensions_mut()
+            .insert(ErrorMessageExtension(err.to_string()));
 
         response
     }
@@ -128,10 +128,8 @@ pub static EXTJS_FORMATTER: &'static dyn OutputFormatter = &ExtJsFormatter();
 
 struct ExtJsFormatter();
 
-impl  OutputFormatter for ExtJsFormatter {
-
+impl OutputFormatter for ExtJsFormatter {
     fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body> {
-
         let mut result = json!({
             "data": data,
             "success": true
@@ -143,7 +141,6 @@ impl  OutputFormatter for ExtJsFormatter {
     }
 
     fn format_error(&self, err: Error) -> Response<Body> {
-
         let message: String;
         let mut errors = HashMap::new();
 
@@ -165,7 +162,9 @@ impl  OutputFormatter for ExtJsFormatter {
 
         let mut response = json_data_response(result);
 
-        response.extensions_mut().insert(ErrorMessageExtension(message));
+        response
+            .extensions_mut()
+            .insert(ErrorMessageExtension(message));
 
         response
     }
index f5fcdeeac97d114216ac530b553136390a25ed26..3f90c178d5954fc82a8fc7b600b8c6db25541bb1 100644 (file)
@@ -1,4 +1,4 @@
-use anyhow::{Error};
+use anyhow::Error;
 
 use std::collections::HashMap;
 use std::pin::Pin;
@@ -8,11 +8,11 @@ use std::task::{Context, Poll};
 use futures::*;
 use hyper::{Body, Request, Response, StatusCode};
 
-use proxmox_router::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
 use proxmox_router::http_err;
+use proxmox_router::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
 
-use crate::{normalize_uri_path, WorkerTask};
 use crate::formatter::*;
+use crate::{normalize_uri_path, WorkerTask};
 
 /// Hyper Service implementation to handle stateful H2 connections.
 ///
@@ -26,24 +26,29 @@ pub struct H2Service<E> {
     debug: bool,
 }
 
-impl <E: RpcEnvironment + Clone> H2Service<E> {
-
+impl<E: RpcEnvironment + Clone> H2Service<E> {
     pub fn new(rpcenv: E, worker: Arc<WorkerTask>, router: &'static Router, debug: bool) -> Self {
-        Self { rpcenv, worker, router, debug }
+        Self {
+            rpcenv,
+            worker,
+            router,
+            debug,
+        }
     }
 
     pub fn debug<S: AsRef<str>>(&self, msg: S) {
-        if self.debug { self.worker.log_message(msg); }
+        if self.debug {
+            self.worker.log_message(msg);
+        }
     }
 
     fn handle_request(&self, req: Request<Body>) -> ApiResponseFuture {
-
         let (parts, body) = req.into_parts();
 
         let method = parts.method.clone();
 
         let (path, components) = match normalize_uri_path(parts.uri.path()) {
-            Ok((p,c)) => (p, c),
+            Ok((p, c)) => (p, c),
             Err(err) => return future::err(http_err!(BAD_REQUEST, "{}", err)).boxed(),
         };
 
@@ -58,15 +63,24 @@ impl <E: RpcEnvironment + Clone> H2Service<E> {
                 let err = http_err!(NOT_FOUND, "Path '{}' not found.", path);
                 future::ok(formatter.format_error(err)).boxed()
             }
-            Some(api_method) => {
-                crate::rest::handle_api_request(
-                    self.rpcenv.clone(), api_method, formatter, parts, body, uri_param).boxed()
-            }
+            Some(api_method) => crate::rest::handle_api_request(
+                self.rpcenv.clone(),
+                api_method,
+                formatter,
+                parts,
+                body,
+                uri_param,
+            )
+            .boxed(),
         }
     }
 
-    fn log_response(worker: Arc<WorkerTask>, method: hyper::Method, path: &str, resp: &Response<Body>) {
-
+    fn log_response(
+        worker: Arc<WorkerTask>,
+        method: hyper::Method,
+        path: &str,
+        resp: &Response<Body>,
+    ) {
         let status = resp.status();
 
         if !status.is_success() {
@@ -89,7 +103,7 @@ impl <E: RpcEnvironment + Clone> H2Service<E> {
     }
 }
 
-impl <E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Service<E> {
+impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Service<E> {
     type Response = Response<Body>;
     type Error = Error;
     #[allow(clippy::type_complexity)]
@@ -111,15 +125,17 @@ impl <E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Ser
                     Ok::<_, Error>(res)
                 }
                 Err(err) => {
-                     if let Some(apierr) = err.downcast_ref::<HttpError>() {
+                    if let Some(apierr) = err.downcast_ref::<HttpError>() {
                         let mut resp = Response::new(Body::from(apierr.message.clone()));
-                        resp.extensions_mut().insert(ErrorMessageExtension(apierr.message.clone()));
+                        resp.extensions_mut()
+                            .insert(ErrorMessageExtension(apierr.message.clone()));
                         *resp.status_mut() = apierr.code;
                         Self::log_response(worker, method, &path, &resp);
                         Ok(resp)
                     } else {
                         let mut resp = Response::new(Body::from(err.to_string()));
-                        resp.extensions_mut().insert(ErrorMessageExtension(err.to_string()));
+                        resp.extensions_mut()
+                            .insert(ErrorMessageExtension(err.to_string()));
                         *resp.status_mut() = StatusCode::BAD_REQUEST;
                         Self::log_response(worker, method, &path, &resp);
                         Ok(resp)
index a20d9070d31674d0b91d70fd463c85f36f2284b4..dc538a80b6a951ca1da278962a1587413e1abfcb 100644 (file)
 //!   - worker task management
 //! * generic interface to authenticate user
 
-use std::sync::atomic::{Ordering, AtomicBool};
 use std::future::Future;
 use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
 
 use anyhow::{bail, format_err, Error};
-use nix::unistd::Pid;
-use hyper::{Body, Response, Method};
 use http::request::Parts;
 use http::HeaderMap;
+use hyper::{Body, Method, Response};
+use nix::unistd::Pid;
 
+use proxmox_router::UserInformation;
 use proxmox_sys::fd::Fd;
-use proxmox_sys::linux::procfs::PidStat;
 use proxmox_sys::fs::CreateOptions;
-use proxmox_router::UserInformation;
+use proxmox_sys::linux::procfs::PidStat;
 
 mod compression;
 pub use compression::*;
@@ -47,7 +47,7 @@ mod command_socket;
 pub use command_socket::*;
 
 mod file_logger;
-pub use file_logger::{FileLogger, FileLogOptions};
+pub use file_logger::{FileLogOptions, FileLogger};
 
 mod api_config;
 pub use api_config::ApiConfig;
@@ -75,7 +75,6 @@ impl From<Error> for AuthError {
 
 /// User Authentication and index/root page generation methods
 pub trait ServerAdapter: Send + Sync {
-
     /// Returns the index/root page
     fn get_index(
         &self,
@@ -91,11 +90,16 @@ pub trait ServerAdapter: Send + Sync {
         &'a self,
         headers: &'a HeaderMap,
         method: &'a Method,
-    ) -> Pin<Box<dyn Future<Output = Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError>> + Send + 'a>>;
-
+    ) -> Pin<
+        Box<
+            dyn Future<Output = Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError>>
+                + Send
+                + 'a,
+        >,
+    >;
 }
 
-lazy_static::lazy_static!{
+lazy_static::lazy_static! {
     static ref PID: i32 = unsafe { libc::getpid() };
     static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime;
 }
@@ -124,7 +128,8 @@ pub fn write_pid(pid_fn: &str) -> Result<(), Error> {
 pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
     let pid = proxmox_sys::fs::file_get_contents(pid_fn)?;
     let pid = std::str::from_utf8(&pid)?.trim();
-    pid.parse().map_err(|err| format_err!("could not parse pid - {}", err))
+    pid.parse()
+        .map_err(|err| format_err!("could not parse pid - {}", err))
 }
 
 /// Returns the control socket path for a specific process ID.
@@ -178,7 +183,6 @@ pub fn socketpair() -> Result<(Fd, Fd), Error> {
     Ok((Fd(pa), Fd(pb)))
 }
 
-
 /// Extract a specific cookie from cookie header.
 /// We assume cookie_name is already url encoded.
 pub fn extract_cookie(cookie: &str, cookie_name: &str) -> Option<String> {
index 3343d5d6633400a1368d07c2de14206c36103ee3..2aadf1ed267ab10cb53b2b50c25837bcce2fe94a 100644 (file)
@@ -18,24 +18,24 @@ use regex::Regex;
 use serde_json::Value;
 use tokio::fs::File;
 use tokio::time::Instant;
-use url::form_urlencoded;
 use tower_service::Service;
+use url::form_urlencoded;
 
+use proxmox_router::http_err;
 use proxmox_router::{
     check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
     RpcEnvironmentType, UserInformation,
 };
-use proxmox_router::http_err;
 use proxmox_schema::{ObjectSchemaType, ParameterSchema};
 
 use proxmox_http::client::RateLimitedStream;
 
-use proxmox_compression::{DeflateEncoder, Level};
 use proxmox_async::stream::AsyncReaderStream;
+use proxmox_compression::{DeflateEncoder, Level};
 
 use crate::{
-    ApiConfig, FileLogger, AuthError, RestEnvironment, CompressionMethod,
-    normalize_uri_path, formatter::*,
+    formatter::*, normalize_uri_path, ApiConfig, AuthError, CompressionMethod, FileLogger,
+    RestEnvironment,
 };
 
 extern "C" {
@@ -47,9 +47,15 @@ struct AuthStringExtension(String);
 struct EmptyUserInformation {}
 
 impl UserInformation for EmptyUserInformation {
-    fn is_superuser(&self, _userid: &str) -> bool { false }
-    fn is_group_member(&self, _userid: &str, _group: &str) -> bool { false }
-    fn lookup_privs(&self, _userid: &str, _path: &[&str]) -> u64 { 0 }
+    fn is_superuser(&self, _userid: &str) -> bool {
+        false
+    }
+    fn is_group_member(&self, _userid: &str, _group: &str) -> bool {
+        false
+    }
+    fn lookup_privs(&self, _userid: &str, _path: &[&str]) -> u64 {
+        0
+    }
 }
 
 /// REST server implementation (configured with [ApiConfig])
@@ -98,9 +104,7 @@ impl Service<&Pin<Box<tokio_openssl::SslStream<RateLimitedStream<tokio::net::Tcp
     }
 }
 
-impl Service<&Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>>
-    for RestServer
-{
+impl Service<&Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>> for RestServer {
     type Response = ApiService;
     type Error = Error;
     type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
@@ -134,7 +138,7 @@ impl Service<&hyper::server::conn::AddrStream> for RestServer {
     }
 
     fn call(&mut self, ctx: &hyper::server::conn::AddrStream) -> Self::Future {
-        let peer =  ctx.remote_addr();
+        let peer = ctx.remote_addr();
         future::ok(ApiService {
             peer,
             api_config: self.api_config.clone(),
@@ -494,7 +498,6 @@ pub(crate) async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHa
     Ok(resp)
 }
 
-
 fn extension_to_content_type(filename: &Path) -> (&'static str, bool) {
     if let Some(ext) = filename.extension().and_then(|osstr| osstr.to_str()) {
         return match ext {
@@ -671,7 +674,8 @@ async fn handle_request(
                 }
             }
 
-            let mut user_info: Box<dyn UserInformation + Send + Sync> = Box::new(EmptyUserInformation {});
+            let mut user_info: Box<dyn UserInformation + Send + Sync> =
+                Box::new(EmptyUserInformation {});
 
             if auth_required {
                 match api.check_auth(&parts.headers, &method).await {
@@ -730,7 +734,9 @@ async fn handle_request(
                     };
 
                     if let Some(auth_id) = auth_id {
-                        response.extensions_mut().insert(AuthStringExtension(auth_id));
+                        response
+                            .extensions_mut()
+                            .insert(AuthStringExtension(auth_id));
                     }
 
                     return Ok(response);
index e4234c76ffb9b33784462ee3d6d9b641592905d3..c3b81627450c942548802366a6f7e300cb0c62f1 100644 (file)
@@ -1,4 +1,4 @@
-use anyhow::{Error};
+use anyhow::Error;
 use lazy_static::lazy_static;
 use std::sync::Mutex;
 
@@ -40,7 +40,6 @@ lazy_static! {
 ///
 /// This calls [request_shutdown] when receiving the signal.
 pub fn catch_shutdown_signal() -> Result<(), Error> {
-
     let mut stream = signal(SignalKind::interrupt())?;
 
     let future = async move {
@@ -49,7 +48,8 @@ pub fn catch_shutdown_signal() -> Result<(), Error> {
             SERVER_STATE.lock().unwrap().reload_request = false;
             request_shutdown();
         }
-    }.boxed();
+    }
+    .boxed();
 
     let abort_future = last_worker_future().map_err(|_| {});
     let task = futures::future::select(future, abort_future);
@@ -64,7 +64,6 @@ pub fn catch_shutdown_signal() -> Result<(), Error> {
 /// This calls [request_shutdown] when receiving the signal, and tries
 /// to restart the server.
 pub fn catch_reload_signal() -> Result<(), Error> {
-
     let mut stream = signal(SignalKind::hangup())?;
 
     let future = async move {
@@ -73,7 +72,8 @@ pub fn catch_reload_signal() -> Result<(), Error> {
             SERVER_STATE.lock().unwrap().reload_request = true;
             crate::request_shutdown();
         }
-    }.boxed();
+    }
+    .boxed();
 
     let abort_future = last_worker_future().map_err(|_| {});
     let task = futures::future::select(future, abort_future);
@@ -89,7 +89,6 @@ pub(crate) fn is_reload_request() -> bool {
     data.mode == ServerMode::Shutdown && data.reload_request
 }
 
-
 pub(crate) fn server_shutdown() {
     let mut data = SERVER_STATE.lock().unwrap();
 
@@ -107,14 +106,11 @@ pub(crate) fn server_shutdown() {
 /// Future to signal server shutdown
 pub fn shutdown_future() -> impl Future<Output = ()> {
     let mut data = SERVER_STATE.lock().unwrap();
-    data
-        .shutdown_listeners
-        .listen()
-        .map(|_| ())
+    data.shutdown_listeners.listen().map(|_| ())
 }
 
 /// Future to signal when last worker task finished
-pub fn last_worker_future() ->  impl Future<Output = Result<(), Error>> {
+pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> {
     let mut data = SERVER_STATE.lock().unwrap();
     data.last_worker_listeners.listen()
 }
@@ -128,7 +124,12 @@ pub(crate) fn set_worker_count(count: usize) {
 pub(crate) fn check_last_worker() {
     let mut data = SERVER_STATE.lock().unwrap();
 
-    if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.internal_task_count == 0) { return; }
+    if !(data.mode == ServerMode::Shutdown
+        && data.worker_count == 0
+        && data.internal_task_count == 0)
+    {
+        return;
+    }
 
     data.last_worker_listeners.notify_listeners(Ok(()));
 }
@@ -147,7 +148,8 @@ where
     tokio::spawn(async move {
         let _ = tokio::spawn(task).await; // ignore errors
 
-        { // drop mutex
+        {
+            // drop mutex
             let mut data = SERVER_STATE.lock().unwrap();
             if data.internal_task_count > 0 {
                 data.internal_task_count -= 1;
index 643e1872b5926d56d27f544afb372db99e85ae5b..f34cd1fc36091a56c9dd58ef87a38ada954c2a19 100644 (file)
@@ -1,30 +1,30 @@
 use std::collections::{HashMap, VecDeque};
 use std::fs::File;
-use std::path::PathBuf;
-use std::io::{Read, Write, BufRead, BufReader};
+use std::io::{BufRead, BufReader, Read, Write};
 use std::panic::UnwindSafe;
+use std::path::PathBuf;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, Mutex};
-use std::time::{SystemTime, Duration};
+use std::time::{Duration, SystemTime};
 
 use anyhow::{bail, format_err, Error};
 use futures::*;
 use lazy_static::lazy_static;
-use serde_json::{json, Value};
-use serde::{Serialize, Deserialize};
-use tokio::sync::oneshot;
 use nix::fcntl::OFlag;
 use once_cell::sync::OnceCell;
+use serde::{Deserialize, Serialize};
+use serde_json::{json, Value};
+use tokio::sync::oneshot;
 
-use proxmox_sys::linux::procfs;
-use proxmox_sys::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
 use proxmox_lang::try_block;
 use proxmox_schema::upid::UPID;
+use proxmox_sys::fs::{atomic_open_or_create_file, create_path, replace_file, CreateOptions};
+use proxmox_sys::linux::procfs;
 
-use proxmox_sys::WorkerTaskContext;
 use proxmox_sys::logrotate::{LogRotate, LogRotateFiles};
+use proxmox_sys::WorkerTaskContext;
 
-use crate::{CommandSocket, FileLogger, FileLogOptions};
+use crate::{CommandSocket, FileLogOptions, FileLogger};
 
 struct TaskListLockGuard(File);
 
@@ -40,14 +40,13 @@ struct WorkerTaskSetup {
 static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
 
 fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
-    WORKER_TASK_SETUP.get()
+    WORKER_TASK_SETUP
+        .get()
         .ok_or_else(|| format_err!("WorkerTask library is not initialized"))
 }
 
 impl WorkerTaskSetup {
-
     fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
-
         let mut taskdir = basedir;
         taskdir.push("tasks");
 
@@ -74,17 +73,15 @@ impl WorkerTaskSetup {
     }
 
     fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
-        let options =  self.file_opts.clone()
+        let options = self
+            .file_opts
+            .clone()
             .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
 
         let timeout = std::time::Duration::new(10, 0);
 
-        let file = proxmox_sys::fs::open_file_locked(
-            &self.task_lock_fn,
-            timeout,
-            exclusive,
-            options,
-        )?;
+        let file =
+            proxmox_sys::fs::open_file_locked(&self.task_lock_fn, timeout, exclusive, options)?;
 
         Ok(TaskListLockGuard(file))
     }
@@ -99,7 +96,6 @@ impl WorkerTaskSetup {
     // atomically read/update the task list, update status of finished tasks
     // new_upid is added to the list when specified.
     fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
-
         let lock = self.lock_task_list_files(true)?;
 
         // TODO remove with 1.x
@@ -121,45 +117,48 @@ impl WorkerTaskSetup {
                 if !worker_is_active_local(&info.upid) {
                     // println!("Detected stopped task '{}'", &info.upid_str);
                     let now = proxmox_time::epoch_i64();
-                    let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
+                    let status =
+                        upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
                     finish_list.push(TaskListInfo {
                         upid: info.upid,
                         upid_str: info.upid_str,
-                        state: Some(status)
+                        state: Some(status),
                     });
                     return None;
                 }
 
                 Some(info)
-            }).collect();
+            })
+            .collect();
 
         if let Some(upid) = new_upid {
-            active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
+            active_list.push(TaskListInfo {
+                upid: upid.clone(),
+                upid_str: upid.to_string(),
+                state: None,
+            });
         }
 
         let active_raw = render_task_list(&active_list);
 
-        let options =  self.file_opts.clone()
+        let options = self
+            .file_opts
+            .clone()
             .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
 
-        replace_file(
-            &self.active_tasks_fn,
-            active_raw.as_bytes(),
-            options,
-            false,
-        )?;
-
-        finish_list.sort_unstable_by(|a, b| {
-            match (&a.state, &b.state) {
-                (Some(s1), Some(s2)) => s1.cmp(s2),
-                (Some(_), None) => std::cmp::Ordering::Less,
-                (None, Some(_)) => std::cmp::Ordering::Greater,
-                _ => a.upid.starttime.cmp(&b.upid.starttime),
-            }
+        replace_file(&self.active_tasks_fn, active_raw.as_bytes(), options, false)?;
+
+        finish_list.sort_unstable_by(|a, b| match (&a.state, &b.state) {
+            (Some(s1), Some(s2)) => s1.cmp(s2),
+            (Some(_), None) => std::cmp::Ordering::Less,
+            (None, Some(_)) => std::cmp::Ordering::Greater,
+            _ => a.upid.starttime.cmp(&b.upid.starttime),
         });
 
         if !finish_list.is_empty() {
-            let options =  self.file_opts.clone()
+            let options = self
+                .file_opts
+                .clone()
                 .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
 
             let mut writer = atomic_open_or_create_file(
@@ -187,15 +186,17 @@ impl WorkerTaskSetup {
 
     // Create task log directory with correct permissions
     fn create_task_log_dirs(&self) -> Result<(), Error> {
-
         try_block!({
-            let dir_opts = self.file_opts.clone()
+            let dir_opts = self
+                .file_opts
+                .clone()
                 .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
 
             create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts))?;
             // fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
             Ok(())
-        }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
+        })
+        .map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
     }
 }
 
@@ -203,7 +204,8 @@ impl WorkerTaskSetup {
 pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
     let setup = WorkerTaskSetup::new(basedir, file_opts);
     setup.create_task_log_dirs()?;
-    WORKER_TASK_SETUP.set(setup)
+    WORKER_TASK_SETUP
+        .set(setup)
         .map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
 }
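
`WORKER_TASK_SETUP.set(setup)` is the usual initialize-once idiom; the cell's declaration is not part of this hunk, so the sketch below uses std's `OnceLock` purely as an analogous illustration of why `.set(...)` maps a second initialization to an error.

use std::sync::OnceLock;

// Illustrative global only; the real WORKER_TASK_SETUP and its cell type
// are declared outside this hunk.
static SETUP: OnceLock<String> = OnceLock::new();

fn init(basedir: String) -> Result<(), String> {
    // OnceLock::set returns Err(value) when the cell is already filled,
    // which is mapped to an "already initialized" error, as above.
    SETUP
        .set(basedir)
        .map_err(|_| "init failed - already initialized".to_string())
}

fn main() {
    assert!(init("/run/tasks".to_string()).is_ok());
    assert!(init("/run/other".to_string()).is_err());
    println!("setup dir: {}", SETUP.get().unwrap());
}
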
 
@@ -215,17 +217,11 @@ pub fn rotate_task_log_archive(
     max_files: Option<usize>,
     options: Option<CreateOptions>,
 ) -> Result<bool, Error> {
-
     let setup = worker_task_setup()?;
 
     let _lock = setup.lock_task_list_files(true)?;
 
-    let mut logrotate = LogRotate::new(
-        &setup.task_archive_fn,
-        compress,
-        max_files,
-        options,
-    )?;
+    let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress, max_files, options)?;
 
     logrotate.rotate(size_threshold)
 }
@@ -237,12 +233,7 @@ pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> {
 
     let _lock = setup.lock_task_list_files(true)?;
 
-    let logrotate = LogRotate::new(
-        &setup.task_archive_fn,
-        compressed,
-        None,
-        None,
-    )?;
+    let logrotate = LogRotate::new(&setup.task_archive_fn, compressed, None, None)?;
 
     let mut timestamp = None;
     if let Some(last_file) = logrotate.files().last() {
@@ -265,7 +256,8 @@ pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> {
             SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(timestamp as u64))
         } else {
             SystemTime::UNIX_EPOCH.checked_sub(Duration::from_secs(-timestamp as u64))
-        }.ok_or_else(|| format_err!("could not calculate cutoff time"))?;
+        }
+        .ok_or_else(|| format_err!("could not calculate cutoff time"))?;
 
         for i in 0..256 {
             let mut path = setup.taskdir.clone();
@@ -279,8 +271,8 @@ pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> {
 
                 if modified < cutoff_time {
                     match std::fs::remove_file(path) {
-                        Ok(()) => {},
-                        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {},
+                        Ok(()) => {}
+                        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
                         Err(err) => bail!("could not remove file: {}", err),
                     }
                 }
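
Besides collapsing the `LogRotate::new` call onto one line, rustfmt drops the trailing commas after the block-bodied match arms (`Ok(()) => {}`). The arm logic itself, removing a task log while tolerating a file that is already gone, works stand-alone as below; `remove_if_exists` is an illustrative helper name, not part of the crate.

use std::io::ErrorKind;
use std::path::Path;

// "Remove if present": a missing file is not an error, anything else is.
fn remove_if_exists(path: &Path) -> Result<(), String> {
    match std::fs::remove_file(path) {
        Ok(()) => {}
        Err(err) if err.kind() == ErrorKind::NotFound => {}
        Err(err) => return Err(format!("could not remove file: {}", err)),
    }
    Ok(())
}

fn main() {
    // Removing a path that does not exist counts as success.
    assert!(remove_if_exists(Path::new("/tmp/task-log-that-does-not-exist")).is_ok());
}
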
@@ -291,7 +283,6 @@ pub fn cleanup_old_tasks(compressed: bool) -> Result<(), Error> {
     Ok(())
 }
 
-
 /// Path to the worker log file
 pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
     let setup = worker_task_setup()?;
@@ -302,10 +293,11 @@ pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
 /// If there is not a single line with a valid datetime, we assume the
 /// starttime to be the endtime
 pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
-
     let setup = worker_task_setup()?;
 
-    let mut status = TaskState::Unknown { endtime: upid.starttime };
+    let mut status = TaskState::Unknown {
+        endtime: upid.starttime,
+    };
 
     let path = setup.log_path(upid);
 
@@ -325,7 +317,7 @@ pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
     }
 
     let last_line = match data.iter().rposition(|c| *c == b'\n') {
-        Some(start) if data.len() > (start+1) => &data[start+1..],
+        Some(start) if data.len() > (start + 1) => &data[start + 1..],
         Some(_) => &data, // should not happen, since we removed all trailing newlines
         None => &data,
     };
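
Only the spacing around `+` changes here, but the expression is worth spelling out: after trailing newlines were stripped earlier in the function, `rposition` finds the final `'\n'` and the slice after it is the last status line. A stand-alone sketch with a hypothetical `last_line` helper:

// After trailing newlines have been stripped, the last line is everything
// after the final '\n' (or the whole buffer if there is none).
fn last_line(data: &[u8]) -> &[u8] {
    match data.iter().rposition(|c| *c == b'\n') {
        Some(start) if data.len() > (start + 1) => &data[start + 1..],
        _ => data,
    }
}

fn main() {
    assert_eq!(last_line(b"line one\nline two\nTASK OK"), b"TASK OK".as_slice());
    assert_eq!(last_line(b"single line"), b"single line".as_slice());
}
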
@@ -350,7 +342,8 @@ pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
 }
 
 lazy_static! {
-    static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
+    static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> =
+        Mutex::new(HashMap::new());
 }
 
 /// checks if the task UPID refers to a worker from this process
@@ -405,11 +398,13 @@ pub fn worker_is_active_local(upid: &UPID) -> bool {
 ///
 /// * ``worker-task-status <UPID>``: return true or false, depending on
 /// whether the worker is running or stopped.
-pub fn register_task_control_commands(
-    commando_sock: &mut CommandSocket,
-) -> Result<(), Error> {
+pub fn register_task_control_commands(commando_sock: &mut CommandSocket) -> Result<(), Error> {
     fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
-        let args = if let Some(args) = args { args } else { bail!("missing args") };
+        let args = if let Some(args) = args {
+            args
+        } else {
+            bail!("missing args")
+        };
         let upid = match args.get("upid") {
             Some(Value::String(upid)) => upid.parse::<UPID>()?,
             None => bail!("no upid in args"),
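
rustfmt expands the one-line `if let ... else { bail!(...) }` into block form. An equivalent, often more compact alternative is `Option::ok_or_else`; the comparison below uses the same `anyhow` and `serde_json` types the module already imports, and the `require_args_*` names are illustrative only.

use anyhow::{bail, format_err, Error};
use serde_json::Value;

// The explicit if/else + bail! form used in the hunk above.
fn require_args_explicit(args: Option<&Value>) -> Result<&Value, Error> {
    let args = if let Some(args) = args {
        args
    } else {
        bail!("missing args")
    };
    Ok(args)
}

// The ok_or_else form, shown only for comparison.
fn require_args_compact(args: Option<&Value>) -> Result<&Value, Error> {
    args.ok_or_else(|| format_err!("missing args"))
}

fn main() {
    let value = serde_json::json!({ "upid": "placeholder" });
    assert!(require_args_explicit(Some(&value)).is_ok());
    assert!(require_args_compact(None).is_err());
}
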
@@ -454,7 +449,6 @@ pub fn abort_worker_nowait(upid: UPID) {
 ///
 /// By sending ``worker-task-abort`` to the control socket.
 pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
-
     let sock = crate::ctrl_sock_from_pid(upid.pid);
     let cmd = json!({
         "command": "worker-task-abort",
@@ -466,7 +460,6 @@ pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
 }
 
 fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
-
     let data = line.splitn(3, ' ').collect::<Vec<&str>>();
 
     let len = data.len();
@@ -519,10 +512,15 @@ impl TaskState {
             Ok(TaskState::OK { endtime })
         } else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
             let count: u64 = warnings.parse()?;
-            Ok(TaskState::Warning{ count, endtime })
+            Ok(TaskState::Warning { count, endtime })
         } else if !s.is_empty() {
-            let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
-            Ok(TaskState::Error{ message, endtime })
+            let message = if let Some(err) = s.strip_prefix("ERROR: ") {
+                err
+            } else {
+                s
+            }
+            .to_string();
+            Ok(TaskState::Error { message, endtime })
         } else {
             bail!("unable to parse Task Status '{}'", s);
         }
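
This hunk only touches struct-literal spacing and splits the `strip_prefix` expression over several lines. Reduced to the branches visible in the excerpt (the endtime parsing and the condition for the `OK` case sit above it), the status parsing looks like this simplified sketch:

// Simplified sketch of the visible branches; the real method also handles
// the OK/endtime cases outside this excerpt.
#[derive(Debug, PartialEq)]
enum Status {
    Warning { count: u64 },
    Error { message: String },
}

fn parse_status(s: &str) -> Result<Status, String> {
    if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
        let count: u64 = warnings.parse().map_err(|e| format!("{}", e))?;
        Ok(Status::Warning { count })
    } else if !s.is_empty() {
        // "ERROR: " is stripped if present, otherwise the whole string is the message.
        let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
        Ok(Status::Error { message })
    } else {
        Err(format!("unable to parse task status '{}'", s))
    }
}

fn main() {
    assert_eq!(parse_status("WARNINGS: 3").unwrap(), Status::Warning { count: 3 });
    assert_eq!(
        parse_status("ERROR: broken").unwrap(),
        Status::Error { message: "broken".to_string() }
    );
    assert!(parse_status("").is_err());
}
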
@@ -545,7 +543,7 @@ impl std::fmt::Display for TaskState {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             TaskState::Unknown { .. } => write!(f, "unknown"),
-            TaskState::OK { .. }=> write!(f, "OK"),
+            TaskState::OK { .. } => write!(f, "OK"),
             TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
             TaskState::Error { message, .. } => write!(f, "{}", message),
         }
@@ -568,7 +566,12 @@ pub struct TaskListInfo {
 fn render_task_line(info: &TaskListInfo) -> String {
     let mut raw = String::new();
     if let Some(status) = &info.state {
-        raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status));
+        raw.push_str(&format!(
+            "{} {:08X} {}\n",
+            info.upid_str,
+            status.endtime(),
+            status
+        ));
     } else {
         raw.push_str(&info.upid_str);
         raw.push('\n');
@@ -587,8 +590,7 @@ fn render_task_list(list: &[TaskListInfo]) -> String {
 
 // note this is not locked, caller has to make sure it is
 // this will skip (and log) lines that are not valid status lines
-fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
-{
+fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error> {
     let reader = BufReader::new(reader);
     let mut list = Vec::new();
     for line in reader.lines() {
@@ -597,7 +599,7 @@ fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
             Ok((upid_str, upid, state)) => list.push(TaskListInfo {
                 upid_str,
                 upid,
-                state
+                state,
             }),
             Err(err) => {
                 log::warn!("unable to parse worker status '{}' - {}", line, err);
@@ -634,7 +636,6 @@ pub struct TaskListInfoIterator {
 impl TaskListInfoIterator {
     /// Creates a new iterator instance.
     pub fn new(active_only: bool) -> Result<Self, Error> {
-
         let setup = worker_task_setup()?;
 
         let (read_lock, active_list) = {
@@ -685,7 +686,7 @@ impl Iterator for TaskListInfoIterator {
             if let Some(element) = self.list.pop_back() {
                 return Some(Ok(element));
             } else if self.end {
-                    return None;
+                return None;
             } else {
                 if let Some(mut archive) = self.archive.take() {
                     if let Some(file) = archive.next() {
@@ -720,7 +721,6 @@ pub struct WorkerTask {
 }
 
 impl std::fmt::Display for WorkerTask {
-
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         self.upid.fmt(f)
     }
@@ -734,14 +734,12 @@ struct WorkerTaskData {
 }
 
 impl WorkerTask {
-
     pub fn new(
         worker_type: &str,
         worker_id: Option<String>,
         auth_id: String,
         to_stdout: bool,
     ) -> Result<Arc<Self>, Error> {
-
         let setup = worker_task_setup()?;
 
         let upid = UPID::new(worker_type, worker_id, auth_id)?;
@@ -751,7 +749,9 @@ impl WorkerTask {
 
         path.push(format!("{:02X}", upid.pstart & 255));
 
-        let dir_opts = setup.file_opts.clone()
+        let dir_opts = setup
+            .file_opts
+            .clone()
             .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
 
         create_path(&path, None, Some(dir_opts))?;
@@ -800,8 +800,9 @@ impl WorkerTask {
         to_stdout: bool,
         f: F,
     ) -> Result<String, Error>
-        where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
-              T: Send + 'static + Future<Output = Result<(), Error>>,
+    where
+        F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
+        T: Send + 'static + Future<Output = Result<(), Error>>,
     {
         let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
         let upid_str = worker.upid.to_string();
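
rustfmt moves the `where` bounds onto their own indented lines. The bound shape itself, a closure that takes a context value and returns a future, is the heart of `WorkerTask::spawn`; the sketch below reproduces only that signature shape with a hypothetical `run_with_context` helper, assuming a `tokio` dependency with the `rt` and `macros` features.

use std::future::Future;

// Generic helper with the same "closure returning a future" bound shape.
async fn run_with_context<F, T>(context: String, f: F) -> Result<(), String>
where
    F: Send + 'static + FnOnce(String) -> T,
    T: Send + 'static + Future<Output = Result<(), String>>,
{
    // Run the produced future as its own task and surface join errors.
    tokio::spawn(f(context)).await.map_err(|e| e.to_string())?
}

#[tokio::main]
async fn main() {
    let result = run_with_context("demo context".to_string(), |ctx| async move {
        println!("running with {}", ctx);
        Ok::<(), String>(())
    })
    .await;
    println!("{:?}", result);
}
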
@@ -822,29 +823,26 @@ impl WorkerTask {
         to_stdout: bool,
         f: F,
     ) -> Result<String, Error>
-        where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
+    where
+        F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>,
     {
         let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
         let upid_str = worker.upid.to_string();
 
-        let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
-            let worker1 = worker.clone();
-            let result = match std::panic::catch_unwind(move || f(worker1)) {
-                Ok(r) => r,
-                Err(panic) => {
-                    match panic.downcast::<&str>() {
-                        Ok(panic_msg) => {
-                            Err(format_err!("worker panicked: {}", panic_msg))
-                        }
-                        Err(_) => {
-                            Err(format_err!("worker panicked: unknown type."))
-                        }
-                    }
-                }
-            };
-
-            worker.log_result(&result);
-        });
+        let _child = std::thread::Builder::new()
+            .name(upid_str.clone())
+            .spawn(move || {
+                let worker1 = worker.clone();
+                let result = match std::panic::catch_unwind(move || f(worker1)) {
+                    Ok(r) => r,
+                    Err(panic) => match panic.downcast::<&str>() {
+                        Ok(panic_msg) => Err(format_err!("worker panicked: {}", panic_msg)),
+                        Err(_) => Err(format_err!("worker panicked: unknown type.")),
+                    },
+                };
+
+                worker.log_result(&result);
+            });
 
         Ok(upid_str)
     }
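
The `new_thread` body is re-indented into a single builder chain, but the logic is unchanged: the worker closure runs inside `catch_unwind`, and a panic payload is downcast to `&str` to recover the message. A self-contained version of that pattern follows; `run_worker` is a hypothetical helper, with `anyhow` used for the error type as in the module.

use anyhow::{format_err, Error};

// Named thread + catch_unwind: a panic in the worker closure becomes an
// Err value instead of an unhandled thread panic.
fn run_worker<F>(name: String, f: F) -> std::thread::JoinHandle<Result<(), Error>>
where
    F: Send + std::panic::UnwindSafe + 'static + FnOnce() -> Result<(), Error>,
{
    std::thread::Builder::new()
        .name(name)
        .spawn(move || match std::panic::catch_unwind(f) {
            Ok(result) => result,
            Err(panic) => match panic.downcast::<&str>() {
                Ok(panic_msg) => Err(format_err!("worker panicked: {}", panic_msg)),
                Err(_) => Err(format_err!("worker panicked: unknown type.")),
            },
        })
        .expect("failed to spawn worker thread")
}

fn main() {
    let handle = run_worker("demo-worker".to_string(), || panic!("boom"));
    println!("{:?}", handle.join().expect("thread join failed"));
}
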
@@ -856,9 +854,15 @@ impl WorkerTask {
         let endtime = proxmox_time::epoch_i64();
 
         if let Err(err) = result {
-            TaskState::Error { message: err.to_string(), endtime }
+            TaskState::Error {
+                message: err.to_string(),
+                endtime,
+            }
         } else if warn_count > 0 {
-            TaskState::Warning { count: warn_count, endtime }
+            TaskState::Warning {
+                count: warn_count,
+                endtime,
+            }
         } else {
             TaskState::OK { endtime }
         }
@@ -893,30 +897,33 @@ impl WorkerTask {
             let mut data = self.data.lock().unwrap();
             data.progress = progress;
         } else {
-           // fixme:  log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
+            // fixme:  log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
         }
     }
 
     /// Request abort
     pub fn request_abort(&self) {
         let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
-        if !prev_abort { // log abort one time
+        if !prev_abort {
+            // log abort one time
             self.log_message("received abort request ...".to_string());
         }
         // notify listeners
         let mut data = self.data.lock().unwrap();
         loop {
             match data.abort_listeners.pop() {
-                None => { break; },
+                None => {
+                    break;
+                }
                 Some(ch) => {
                     let _ = ch.send(()); // ignore errors here
-                },
+                }
             }
         }
     }
 
     /// Get a future which resolves on task abort
-    pub fn abort_future(&self) ->  oneshot::Receiver<()> {
+    pub fn abort_future(&self) -> oneshot::Receiver<()> {
         let (tx, rx) = oneshot::channel::<()>();
 
         let mut data = self.data.lock().unwrap();
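
Only comment placement and match-arm braces change in `request_abort` and `abort_future`, but the mechanism is worth a stand-alone look: every call to `abort_future` registers a `oneshot::Sender`, and `request_abort` drains and fires them all. A minimal reproduction with a hypothetical `AbortState` type, assuming `tokio` with the `sync`, `rt` and `macros` features:

use std::sync::Mutex;
use tokio::sync::oneshot;

#[derive(Default)]
struct AbortState {
    abort_listeners: Mutex<Vec<oneshot::Sender<()>>>,
}

impl AbortState {
    // Each caller gets its own receiver that resolves on abort.
    fn abort_future(&self) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel::<()>();
        self.abort_listeners.lock().unwrap().push(tx);
        rx
    }

    // Fire every registered listener exactly once.
    fn request_abort(&self) {
        let mut listeners = self.abort_listeners.lock().unwrap();
        while let Some(ch) = listeners.pop() {
            let _ = ch.send(()); // ignore receivers that were already dropped
        }
    }
}

#[tokio::main]
async fn main() {
    let state = AbortState::default();
    let rx = state.abort_future();
    state.request_abort();
    rx.await.expect("abort notification expected");
    println!("abort received");
}
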
@@ -934,7 +941,6 @@ impl WorkerTask {
 }
 
 impl WorkerTaskContext for WorkerTask {
-
     fn abort_requested(&self) -> bool {
         self.abort_requested.load(Ordering::SeqCst)
     }
@@ -963,7 +969,6 @@ impl WorkerTaskContext for WorkerTask {
 /// Note: local workers should print logs to stdout, so there is no
 /// need to fetch/display logs. We just wait for the worker to finish.
 pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
-
     let upid: UPID = upid_str.parse()?;
 
     let sleep_duration = core::time::Duration::new(0, 100_000_000);