git.proxmox.com Git - proxmox-backup.git/commitdiff
move ApiConfig, FileLogger and CommandoSocket to proxmox-rest-server workspace
author Dietmar Maurer <dietmar@proxmox.com>
Tue, 21 Sep 2021 05:58:40 +0000 (07:58 +0200)
committer Thomas Lamprecht <t.lamprecht@proxmox.com>
Tue, 21 Sep 2021 06:46:41 +0000 (08:46 +0200)
ApiConfig: avoid using pbs_config::backup_user()
CommandoSocket: avoid using pbs_config::backup_user()
FileLogger: avoid using pbs_config::backup_user()
- use atomic_open_or_create_file(); callers now pass gid/CreateOptions explicitly (see the sketch below)

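For illustration, a minimal sketch of the new caller-side setup, following the proxmox-backup-api.rs changes in this commit (surrounding daemon setup and error handling omitted):

    // The binary crate resolves the backup user itself and passes gid and
    // CreateOptions down; proxmox-rest-server no longer calls pbs_config::backup_user().
    let backup_user = pbs_config::backup_user()?;
    let mut commando_sock =
        proxmox_rest_server::CommandoSocket::new(crate::server::our_ctrl_sock(), backup_user.gid);

    let dir_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
    let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);

    config.enable_file_log(
        pbs_buildcfg::API_ACCESS_LOG_FN,
        Some(dir_opts),
        Some(file_opts),
        &mut commando_sock,
    )?;
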
Auth Trait: moved definitions to proxmox-rest-server/src/lib.rs
- removed CachedUserInfo parameter
- return user as String (not Authid); see the sketch below

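A sketch of an ApiAuth implementation against the relocated trait, mirroring the restore daemon's adaptation in this commit (the ticket value and error message are illustrative assumptions):

    use anyhow::format_err;
    use proxmox_rest_server::{ApiAuth, AuthError};

    struct StaticAuth { ticket: String }

    impl ApiAuth for StaticAuth {
        fn check_auth(
            &self,
            headers: &http::HeaderMap,
            _method: &hyper::Method,
        ) -> Result<String, AuthError> {
            // no CachedUserInfo parameter anymore; the user is returned as a plain String
            match headers.get(hyper::header::AUTHORIZATION) {
                Some(header) if header.to_str().unwrap_or("") == self.ticket.as_str() => {
                    Ok(String::from("root@pam"))
                }
                _ => Err(AuthError::Generic(format_err!("invalid ticket"))),
            }
        }
    }
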
Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
38 files changed:
Cargo.toml
pbs-api-types/Cargo.toml
pbs-client/Cargo.toml
pbs-config/Cargo.toml
pbs-datastore/Cargo.toml
pbs-fuse-loop/Cargo.toml
pbs-systemd/Cargo.toml
pbs-tape/Cargo.toml
pbs-tools/Cargo.toml
proxmox-backup-client/Cargo.toml
proxmox-backup-debug/Cargo.toml
proxmox-file-restore/Cargo.toml
proxmox-rest-server/Cargo.toml
proxmox-rest-server/src/api_config.rs [new file with mode: 0644]
proxmox-rest-server/src/command_socket.rs [new file with mode: 0644]
proxmox-rest-server/src/file_logger.rs [new file with mode: 0644]
proxmox-rest-server/src/lib.rs
proxmox-rest-server/src/state.rs [new file with mode: 0644]
pxar-bin/Cargo.toml
src/api2/admin/datastore.rs
src/api2/node/mod.rs
src/backup/datastore.rs
src/backup/verify.rs
src/bin/proxmox-backup-api.rs
src/bin/proxmox-backup-proxy.rs
src/bin/proxmox-restore-daemon.rs
src/bin/proxmox_restore_daemon/auth.rs
src/server/auth.rs
src/server/command_socket.rs [deleted file]
src/server/config.rs [deleted file]
src/server/mod.rs
src/server/rest.rs
src/server/state.rs [deleted file]
src/server/worker_task.rs
src/tools/daemon.rs
src/tools/file_logger.rs [deleted file]
src/tools/mod.rs
tests/worker-task-abort.rs

index 913dc98e58682693ec000a4caba104a7e2887425..42528a49310c0a3acace793b0a052e31d8537182 100644 (file)
@@ -96,7 +96,7 @@ zstd = { version = "0.6", features = [ "bindgen" ] }
 pathpatterns = "0.1.2"
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
-proxmox = { version = "0.13.0", features = [ "sortable-macro", "api-macro", "cli", "router", "tfa" ] }
+proxmox = { version = "0.13.3", features = [ "sortable-macro", "api-macro", "cli", "router", "tfa" ] }
 proxmox-acme-rs = "0.2.1"
 proxmox-apt = "0.7.0"
 proxmox-http = { version = "0.4.0", features = [ "client", "http-helpers", "websocket" ] }
index 15507328aabbbc2410d128e9d6325619cab991c5..02c8c2d4c4591b314e519008a9877ce0cc5fdca3 100644 (file)
@@ -14,7 +14,7 @@ openssl = "0.10"
 regex = "1.2"
 serde = { version = "1.0", features = ["derive"] }
 
-proxmox = { version = "0.13.0", default-features = false, features = [ "api-macro" ] }
+proxmox = { version = "0.13.3", default-features = false, features = [ "api-macro" ] }
 
 pbs-systemd = { path = "../pbs-systemd" }
 pbs-tools = { path = "../pbs-tools" }
index fb12636f5eb19ff39f060c77cb7e482c3e435ec8..076acfb592d9d0a8dcca3ec8f1f23d3ab3c5b3eb 100644 (file)
@@ -28,7 +28,7 @@ tower-service = "0.3.0"
 xdg = "2.2"
 
 pathpatterns = "0.1.2"
-proxmox = { version = "0.13.0", default-features = false, features = [ "cli" ] }
+proxmox = { version = "0.13.3", default-features = false, features = [ "cli" ] }
 proxmox-fuse = "0.1.1"
 proxmox-http = { version = "0.4.0", features = [ "client", "http-helpers", "websocket" ] }
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
index 7f4258bd1cd4cb9d844d008bf5509ecb4487bc9e..4883004511877fae954a4ffefb07e432370fadf2 100644 (file)
@@ -16,7 +16,7 @@ nix = "0.19.1"
 regex = "1.2"
 once_cell = "1.3.1"
 
-proxmox = { version = "0.13.0", default-features = false, features = [ "cli" ] }
+proxmox = { version = "0.13.3", default-features = false, features = [ "cli" ] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-buildcfg = { path = "../pbs-buildcfg" }
index 32eae0d763b421082ff74537d2ade8b9fd6a0f0f..5b3c7fab81026ef94b49178de32835d384d02db4 100644 (file)
@@ -23,7 +23,7 @@ zstd = { version = "0.6", features = [ "bindgen" ] }
 pathpatterns = "0.1.2"
 pxar = "0.10.1"
 
-proxmox = { version = "0.13.0", default-features = false, features = [ "api-macro" ] }
+proxmox = { version = "0.13.3", default-features = false, features = [ "api-macro" ] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-tools = { path = "../pbs-tools" }
index 5865a463c3db5718809917de39e8ce8c7cb38412..c3220be72e369facf03bfc6fe61f8c79fa16d4f3 100644 (file)
@@ -14,7 +14,7 @@ nix = "0.19.1"
 regex = "1.2"
 tokio = { version = "1.6", features = [] }
 
-proxmox = "0.13.0"
+proxmox = "0.13.3"
 proxmox-fuse = "0.1.1"
 
 pbs-tools = { path = "../pbs-tools" }
index fcb604458fd64c01457ea4bc3fd183cb6c532fef..b4575f0ac7ed87366eda12625b6ec8f4d19f3100 100644 (file)
@@ -11,6 +11,6 @@ bitflags = "1.2.1"
 lazy_static = "1.4"
 nom = "5.1"
 
-proxmox = { version = "0.13.0", default-features = false }
+proxmox = { version = "0.13.3", default-features = false }
 
 pbs-tools = { path = "../pbs-tools" }
index 719ef01c8570143cb7235e416aa7440146495d2a..4ffae21e3691110f95c0de264f3c9a64098461f6 100644 (file)
@@ -18,7 +18,7 @@ bitflags = "1.2.1"
 regex = "1.2"
 udev = ">= 0.3, <0.5"
 
-proxmox = { version = "0.13.0", default-features = false, features = [] }
+proxmox = { version = "0.13.3", default-features = false, features = [] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-tools = { path = "../pbs-tools" }
index 89c6303cd279dcc331017b35182696401c72d7a8..88f6f54c3234f5c89969df128bd9f1cc8222ec74 100644 (file)
@@ -30,7 +30,7 @@ url = "2.1"
 walkdir = "2"
 zstd = { version = "0.6", features = [ "bindgen" ] }
 
-proxmox = { version = "0.13.0", default-features = false, features = [ "tokio" ] }
+proxmox = { version = "0.13.3", default-features = false, features = [ "tokio" ] }
 
 pbs-buildcfg = { path = "../pbs-buildcfg" }
 pbs-runtime = { path = "../pbs-runtime" }
index b1ecf3e4592271f8d1178e2a0970dabf17fadb3f..6c1bb93644dba2d5c827a81ae4d734e5088d5c0e 100644 (file)
@@ -22,7 +22,7 @@ zstd = { version = "0.6", features = [ "bindgen" ] }
 pathpatterns = "0.1.2"
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
-proxmox = { version = "0.13.0", features = [ "sortable-macro", "api-macro", "cli", "router" ] }
+proxmox = { version = "0.13.3", features = [ "sortable-macro", "api-macro", "cli", "router" ] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-buildcfg = { path = "../pbs-buildcfg" }
index 7f1f596d1f936bb36932e8e8e790ceb34f50611d..21b3cc2fa1d1dab3d8f7cd65cff84316cd5ca451 100644 (file)
@@ -9,7 +9,7 @@ anyhow = "1.0"
 walkdir = "2"
 serde_json = "1.0"
 
-proxmox = { version = "0.13.0", features = [ "api-macro", "cli" ] }
+proxmox = { version = "0.13.3", features = [ "api-macro", "cli" ] }
 
 pbs-config = { path = "../pbs-config" }
 pbs-client = { path = "../pbs-client" }
index 127397b660880d2c8f0a12b8fc76d4c2dbfdeba8..97f9c4141a4250badca9971c2d21f06457f828b1 100644 (file)
@@ -16,7 +16,7 @@ tokio = { version = "1.6", features = [ "io-std", "rt", "rt-multi-thread", "time
 
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
-proxmox = { version = "0.13.0", features = [ "api-macro", "cli" ] }
+proxmox = { version = "0.13.3", features = [ "api-macro", "cli" ] }
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-buildcfg = { path = "../pbs-buildcfg" }
index 836d9e282efeed230f9c24d03ddf8366db83a226..33ed6f39b3b3d20cc2facadc3937f6e471f23f23 100644 (file)
@@ -7,3 +7,18 @@ description = "REST server implementation"
 
 [dependencies]
 anyhow = "1.0"
+futures = "0.3"
+handlebars = "3.0"
+http = "0.2"
+hyper = { version = "0.14", features = [ "full" ] }
+lazy_static = "1.4"
+libc = "0.2"
+nix = "0.19.1"
+serde = { version = "1.0", features = [] }
+serde_json = "1.0"
+tokio = { version = "1.6", features = ["signal", "process"] }
+
+proxmox = { version = "0.13.3", features = [ "router"] }
+
+# fixme: remove this dependency (pbs_tools::broadcast_future)
+pbs-tools = { path = "../pbs-tools" }
diff --git a/proxmox-rest-server/src/api_config.rs b/proxmox-rest-server/src/api_config.rs
new file mode 100644 (file)
index 0000000..a319e20
--- /dev/null
@@ -0,0 +1,170 @@
+use std::collections::HashMap;
+use std::path::PathBuf;
+use std::time::SystemTime;
+use std::fs::metadata;
+use std::sync::{Arc, Mutex, RwLock};
+
+use anyhow::{bail, Error, format_err};
+use hyper::Method;
+use handlebars::Handlebars;
+use serde::Serialize;
+
+use proxmox::api::{ApiMethod, Router, RpcEnvironmentType};
+use proxmox::tools::fs::{create_path, CreateOptions};
+
+use crate::{ApiAuth, FileLogger, FileLogOptions, CommandoSocket};
+
+pub struct ApiConfig {
+    basedir: PathBuf,
+    router: &'static Router,
+    aliases: HashMap<String, PathBuf>,
+    env_type: RpcEnvironmentType,
+    templates: RwLock<Handlebars<'static>>,
+    template_files: RwLock<HashMap<String, (SystemTime, PathBuf)>>,
+    request_log: Option<Arc<Mutex<FileLogger>>>,
+    pub api_auth: Arc<dyn ApiAuth + Send + Sync>,
+}
+
+impl ApiConfig {
+    pub fn new<B: Into<PathBuf>>(
+        basedir: B,
+        router: &'static Router,
+        env_type: RpcEnvironmentType,
+        api_auth: Arc<dyn ApiAuth + Send + Sync>,
+    ) -> Result<Self, Error> {
+        Ok(Self {
+            basedir: basedir.into(),
+            router,
+            aliases: HashMap::new(),
+            env_type,
+            templates: RwLock::new(Handlebars::new()),
+            template_files: RwLock::new(HashMap::new()),
+            request_log: None,
+            api_auth,
+        })
+    }
+
+    pub fn find_method(
+        &self,
+        components: &[&str],
+        method: Method,
+        uri_param: &mut HashMap<String, String>,
+    ) -> Option<&'static ApiMethod> {
+
+        self.router.find_method(components, method, uri_param)
+    }
+
+    pub fn find_alias(&self, components: &[&str]) -> PathBuf {
+
+        let mut prefix = String::new();
+        let mut filename = self.basedir.clone();
+        let comp_len = components.len();
+        if comp_len >= 1 {
+            prefix.push_str(components[0]);
+            if let Some(subdir) = self.aliases.get(&prefix) {
+                filename.push(subdir);
+                components.iter().skip(1).for_each(|comp| filename.push(comp));
+            } else {
+                components.iter().for_each(|comp| filename.push(comp));
+            }
+        }
+        filename
+    }
+
+    pub fn add_alias<S, P>(&mut self, alias: S, path: P)
+        where S: Into<String>,
+              P: Into<PathBuf>,
+    {
+        self.aliases.insert(alias.into(), path.into());
+    }
+
+    pub fn env_type(&self) -> RpcEnvironmentType {
+        self.env_type
+    }
+
+    pub fn register_template<P>(&self, name: &str, path: P) -> Result<(), Error>
+    where
+        P: Into<PathBuf>
+    {
+        if self.template_files.read().unwrap().contains_key(name) {
+            bail!("template already registered");
+        }
+
+        let path: PathBuf = path.into();
+        let metadata = metadata(&path)?;
+        let mtime = metadata.modified()?;
+
+        self.templates.write().unwrap().register_template_file(name, &path)?;
+        self.template_files.write().unwrap().insert(name.to_string(), (mtime, path));
+
+        Ok(())
+    }
+
+    /// Checks if the template was modified since the last rendering
+/// if yes, it loads the new version of the template
+    pub fn render_template<T>(&self, name: &str, data: &T) -> Result<String, Error>
+    where
+        T: Serialize,
+    {
+        let path;
+        let mtime;
+        {
+            let template_files = self.template_files.read().unwrap();
+            let (old_mtime, old_path) = template_files.get(name).ok_or_else(|| format_err!("template not found"))?;
+
+            mtime = metadata(old_path)?.modified()?;
+            if mtime <= *old_mtime {
+                return self.templates.read().unwrap().render(name, data).map_err(|err| format_err!("{}", err));
+            }
+            path = old_path.to_path_buf();
+        }
+
+        {
+            let mut template_files = self.template_files.write().unwrap();
+            let mut templates = self.templates.write().unwrap();
+
+            templates.register_template_file(name, &path)?;
+            template_files.insert(name.to_string(), (mtime, path));
+
+            templates.render(name, data).map_err(|err| format_err!("{}", err))
+        }
+    }
+
+    pub fn enable_file_log<P>(
+        &mut self,
+        path: P,
+        dir_opts: Option<CreateOptions>,
+        file_opts: Option<CreateOptions>,
+        commando_sock: &mut CommandoSocket,
+    ) -> Result<(), Error>
+    where
+        P: Into<PathBuf>
+    {
+        let path: PathBuf = path.into();
+        if let Some(base) = path.parent() {
+            if !base.exists() {
+                create_path(base, None, dir_opts).map_err(|err| format_err!("{}", err))?;
+            }
+        }
+
+        let logger_options = FileLogOptions {
+            append: true,
+            file_opts: file_opts.unwrap_or(CreateOptions::default()),
+            ..Default::default()
+        };
+        let request_log = Arc::new(Mutex::new(FileLogger::new(&path, logger_options)?));
+        self.request_log = Some(Arc::clone(&request_log));
+
+        commando_sock.register_command("api-access-log-reopen".into(), move |_args| {
+            println!("re-opening log file");
+            request_log.lock().unwrap().reopen()?;
+            Ok(serde_json::Value::Null)
+        })?;
+
+        Ok(())
+    }
+
+    pub fn get_file_log(&self) -> Option<&Arc<Mutex<FileLogger>>> {
+        self.request_log.as_ref()
+    }
+}
diff --git a/proxmox-rest-server/src/command_socket.rs b/proxmox-rest-server/src/command_socket.rs
new file mode 100644 (file)
index 0000000..1d62d21
--- /dev/null
@@ -0,0 +1,222 @@
+use anyhow::{bail, format_err, Error};
+
+use std::collections::HashMap;
+use std::os::unix::io::AsRawFd;
+use std::path::{PathBuf, Path};
+use std::sync::Arc;
+
+use futures::*;
+use tokio::net::UnixListener;
+use serde::Serialize;
+use serde_json::Value;
+use nix::sys::socket;
+use nix::unistd::Gid;
+
+// Listens on a Unix socket to handle simple commands asynchronously
+fn create_control_socket<P, F>(path: P, gid: Gid, func: F) -> Result<impl Future<Output = ()>, Error>
+where
+    P: Into<PathBuf>,
+    F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static,
+{
+    let path: PathBuf = path.into();
+
+    let gid = gid.as_raw();
+
+    let socket = UnixListener::bind(&path)?;
+
+    let func = Arc::new(func);
+
+    let control_future = async move {
+        loop {
+            let (conn, _addr) = match socket.accept().await {
+                Ok(data) => data,
+                Err(err) => {
+                    eprintln!("failed to accept on control socket {:?}: {}", path, err);
+                    continue;
+                }
+            };
+
+            let opt = socket::sockopt::PeerCredentials {};
+            let cred = match socket::getsockopt(conn.as_raw_fd(), opt) {
+                Ok(cred) => cred,
+                Err(err) => {
+                    eprintln!("no permissions - unable to read peer credential - {}", err);
+                    continue;
+                }
+            };
+
+            // check permissions (same gid, root user, or backup group)
+            let mygid = unsafe { libc::getgid() };
+            if !(cred.uid() == 0 || cred.gid() == mygid || cred.gid() == gid) {
+                eprintln!("no permissions for {:?}", cred);
+                continue;
+            }
+
+            let (rx, mut tx) = tokio::io::split(conn);
+
+            let abort_future = super::last_worker_future().map(|_| ());
+
+            use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
+            let func = Arc::clone(&func);
+            let path = path.clone();
+            tokio::spawn(futures::future::select(
+                async move {
+                    let mut rx = tokio::io::BufReader::new(rx);
+                    let mut line = String::new();
+                    loop {
+                        line.clear();
+                        match rx.read_line({ line.clear(); &mut line }).await {
+                            Ok(0) => break,
+                            Ok(_) => (),
+                            Err(err) => {
+                                eprintln!("control socket {:?} read error: {}", path, err);
+                                return;
+                            }
+                        }
+
+                        let response = match line.parse::<Value>() {
+                            Ok(param) => match func(param) {
+                                Ok(res) => format!("OK: {}\n", res),
+                                Err(err) => format!("ERROR: {}\n", err),
+                            }
+                            Err(err) => format!("ERROR: {}\n", err),
+                        };
+
+                        if let Err(err) = tx.write_all(response.as_bytes()).await {
+                            eprintln!("control socket {:?} write response error: {}", path, err);
+                            return;
+                        }
+                    }
+                }.boxed(),
+                abort_future,
+            ).map(|_| ()));
+        }
+    }.boxed();
+
+    let abort_future = crate::last_worker_future().map_err(|_| {});
+    let task = futures::future::select(
+        control_future,
+        abort_future,
+    ).map(|_: futures::future::Either<(Result<(), Error>, _), _>| ());
+
+    Ok(task)
+}
+
+
+pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
+where
+    P: AsRef<Path>,
+    T: ?Sized + Serialize,
+{
+    let mut command_string = serde_json::to_string(params)?;
+    command_string.push('\n');
+    send_raw_command(path.as_ref(), &command_string).await
+}
+
+pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
+where
+    P: AsRef<Path>,
+{
+    use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
+
+    let mut conn = tokio::net::UnixStream::connect(path)
+        .map_err(move |err| format_err!("control socket connect failed - {}", err))
+        .await?;
+
+    conn.write_all(command_string.as_bytes()).await?;
+    if !command_string.as_bytes().ends_with(b"\n") {
+        conn.write_all(b"\n").await?;
+    }
+
+    AsyncWriteExt::shutdown(&mut conn).await?;
+    let mut rx = tokio::io::BufReader::new(conn);
+    let mut data = String::new();
+    if rx.read_line(&mut data).await? == 0 {
+        bail!("no response");
+    }
+    if let Some(res) = data.strip_prefix("OK: ") {
+        match res.parse::<Value>() {
+            Ok(v) => Ok(v),
+            Err(err) => bail!("unable to parse json response - {}", err),
+        }
+    } else if let Some(err) = data.strip_prefix("ERROR: ") {
+        bail!("{}", err);
+    } else {
+        bail!("unable to parse response: {}", data);
+    }
+}
+
+/// A callback for a specific commando socket.
+pub type CommandoSocketFn = Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
+
+/// Tooling to get a single control command socket where one can register multiple commands
+/// dynamically.
+/// You need to call `spawn()` to make the socket active.
+pub struct CommandoSocket {
+    socket: PathBuf,
+    gid: Gid,
+    commands: HashMap<String, CommandoSocketFn>,
+}
+
+impl CommandoSocket {
+    pub fn new<P>(path: P, gid: Gid) -> Self
+        where P: Into<PathBuf>,
+    {
+        CommandoSocket {
+            socket: path.into(),
+            gid,
+            commands: HashMap::new(),
+        }
+    }
+
+    /// Spawn the socket and consume self, meaning you cannot register commands anymore after
+    /// calling this.
+    pub fn spawn(self) -> Result<(), Error> {
+        let control_future = create_control_socket(self.socket.to_owned(), self.gid, move |param| {
+            let param = param
+                .as_object()
+                .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
+
+            let command = match param.get("command") {
+                Some(Value::String(command)) => command.as_str(),
+                None => bail!("no command"),
+                _ => bail!("unable to parse command"),
+            };
+
+            if !self.commands.contains_key(command) {
+                bail!("got unknown command '{}'", command);
+            }
+
+            match self.commands.get(command) {
+                None => bail!("got unknown command '{}'", command),
+                Some(handler) => {
+                    let args = param.get("args"); //.unwrap_or(&Value::Null);
+                    (handler)(args)
+                },
+            }
+        })?;
+
+        tokio::spawn(control_future);
+
+        Ok(())
+    }
+
+    /// Register a new command with a callback.
+    pub fn register_command<F>(
+        &mut self,
+        command: String,
+        handler: F,
+    ) -> Result<(), Error>
+    where
+        F: Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static,
+    {
+
+        if self.commands.contains_key(&command) {
+            bail!("command '{}' already exists!", command);
+        }
+
+        self.commands.insert(command, Box::new(handler));
+
+        Ok(())
+    }
+}
diff --git a/proxmox-rest-server/src/file_logger.rs b/proxmox-rest-server/src/file_logger.rs
new file mode 100644 (file)
index 0000000..31100e4
--- /dev/null
@@ -0,0 +1,148 @@
+use std::io::Write;
+
+use anyhow::Error;
+use nix::fcntl::OFlag;
+
+use proxmox::tools::fs::{CreateOptions, atomic_open_or_create_file};
+
+/// Log messages with optional automatically added timestamps into files
+///
+/// Logs messages to file, and optionally to standard output.
+///
+///
+/// #### Example:
+/// ```
+/// # use anyhow::{bail, format_err, Error};
+/// use proxmox_rest_server::{flog, FileLogger, FileLogOptions};
+///
+/// # std::fs::remove_file("test.log");
+/// let options = FileLogOptions {
+///     to_stdout: true,
+///     exclusive: true,
+///     ..Default::default()
+/// };
+/// let mut log = FileLogger::new("test.log", options).unwrap();
+/// flog!(log, "A simple log: {}", "Hello!");
+/// # std::fs::remove_file("test.log");
+/// ```
+
+#[derive(Default)]
+/// Options to control the behavior of a [`FileLogger`] instance
+pub struct FileLogOptions {
+    /// Open underlying log file in append mode, useful when multiple concurrent processes
+    /// want to log to the same file (e.g., HTTP access log). Note that it is only atomic
+    /// for writes smaller than the PIPE_BUF (4k on Linux).
+    /// Inside the same process you may still need to use a mutex for shared access.
+    pub append: bool,
+    /// Open underlying log file as readable
+    pub read: bool,
+    /// If set, ensure that the file is newly created or error out if already existing.
+    pub exclusive: bool,
+    /// Duplicate logged messages to STDOUT, like tee
+    pub to_stdout: bool,
+    /// Prefix messages logged to the file with the current local time as RFC 3339
+    pub prefix_time: bool,
+    /// File owner/group and mode
+    pub file_opts: CreateOptions,
+
+}
+
+pub struct FileLogger {
+    file: std::fs::File,
+    file_name: std::path::PathBuf,
+    options: FileLogOptions,
+}
+
+/// Log messages to [`FileLogger`](tools/struct.FileLogger.html)
+#[macro_export]
+macro_rules! flog {
+    ($log:expr, $($arg:tt)*) => ({
+        $log.log(format!($($arg)*));
+    })
+}
+
+impl FileLogger {
+    pub fn new<P: AsRef<std::path::Path>>(
+        file_name: P,
+        options: FileLogOptions,
+    ) -> Result<Self, Error> {
+        let file = Self::open(&file_name, &options)?;
+
+        let file_name: std::path::PathBuf = file_name.as_ref().to_path_buf();
+
+        Ok(Self { file, file_name, options })
+    }
+
+    pub fn reopen(&mut self) -> Result<&Self, Error> {
+        let file = Self::open(&self.file_name, &self.options)?;
+        self.file = file;
+        Ok(self)
+    }
+
+    fn open<P: AsRef<std::path::Path>>(
+        file_name: P,
+        options: &FileLogOptions,
+    ) -> Result<std::fs::File, Error> {
+
+        let mut flags = OFlag::O_CLOEXEC;
+
+        if options.read  {
+            flags |=  OFlag::O_RDWR;
+        } else {
+            flags |=  OFlag::O_WRONLY;
+        }
+
+        if options.append {
+            flags |=  OFlag::O_APPEND;
+        }
+        if options.exclusive {
+            flags |=  OFlag::O_EXCL;
+        }
+
+        let file = atomic_open_or_create_file(&file_name, flags, &[], options.file_opts.clone())?;
+
+        Ok(file)
+    }
+
+    pub fn log<S: AsRef<str>>(&mut self, msg: S) {
+        let msg = msg.as_ref();
+
+        if self.options.to_stdout {
+            let mut stdout = std::io::stdout();
+            stdout.write_all(msg.as_bytes()).unwrap();
+            stdout.write_all(b"\n").unwrap();
+        }
+
+        let line = if self.options.prefix_time {
+            let now = proxmox::tools::time::epoch_i64();
+            let rfc3339 = match proxmox::tools::time::epoch_to_rfc3339(now) {
+                Ok(rfc3339) => rfc3339,
+                Err(_) => "1970-01-01T00:00:00Z".into(), // for safety, should really not happen!
+            };
+            format!("{}: {}\n", rfc3339, msg)
+        } else {
+            format!("{}\n", msg)
+        };
+        if let Err(err) = self.file.write_all(line.as_bytes()) {
+            // avoid panicking, log methods should not do that
+            // FIXME: or, return result???
+            eprintln!("error writing to log file - {}", err);
+        }
+    }
+}
+
+impl std::io::Write for FileLogger {
+    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
+        if self.options.to_stdout {
+            let _ = std::io::stdout().write(buf);
+        }
+        self.file.write(buf)
+    }
+
+    fn flush(&mut self) -> Result<(), std::io::Error> {
+        if self.options.to_stdout {
+            let _ = std::io::stdout().flush();
+        }
+        self.file.flush()
+    }
+}
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..38dd610c0272e994af3538d6714df14fea9fdf24 100644 (file)
@@ -0,0 +1,54 @@
+use anyhow::{bail, Error};
+
+mod state;
+pub use state::*;
+
+mod command_socket;
+pub use command_socket::*;
+
+mod file_logger;
+pub use file_logger::{FileLogger, FileLogOptions};
+
+mod api_config;
+pub use api_config::ApiConfig;
+
+pub enum AuthError {
+    Generic(Error),
+    NoData,
+}
+
+impl From<Error> for AuthError {
+    fn from(err: Error) -> Self {
+        AuthError::Generic(err)
+    }
+}
+
+pub trait ApiAuth {
+    fn check_auth(
+        &self,
+        headers: &http::HeaderMap,
+        method: &hyper::Method,
+    ) -> Result<String, AuthError>;
+}
+
+static mut SHUTDOWN_REQUESTED: bool = false;
+
+pub fn request_shutdown() {
+    unsafe {
+        SHUTDOWN_REQUESTED = true;
+    }
+    crate::server_shutdown();
+}
+
+#[inline(always)]
+pub fn shutdown_requested() -> bool {
+    unsafe { SHUTDOWN_REQUESTED }
+}
+
+pub fn fail_on_shutdown() -> Result<(), Error> {
+    if shutdown_requested() {
+        bail!("Server shutdown requested - aborting task");
+    }
+    Ok(())
+}
+
diff --git a/proxmox-rest-server/src/state.rs b/proxmox-rest-server/src/state.rs
new file mode 100644 (file)
index 0000000..468ef0a
--- /dev/null
@@ -0,0 +1,142 @@
+use anyhow::{Error};
+use lazy_static::lazy_static;
+use std::sync::Mutex;
+
+use futures::*;
+
+use tokio::signal::unix::{signal, SignalKind};
+
+use pbs_tools::broadcast_future::BroadcastData;
+
+#[derive(PartialEq, Copy, Clone, Debug)]
+pub enum ServerMode {
+    Normal,
+    Shutdown,
+}
+
+pub struct ServerState {
+    pub mode: ServerMode,
+    pub shutdown_listeners: BroadcastData<()>,
+    pub last_worker_listeners: BroadcastData<()>,
+    pub worker_count: usize,
+    pub internal_task_count: usize,
+    pub reload_request: bool,
+}
+
+lazy_static! {
+    static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState {
+        mode: ServerMode::Normal,
+        shutdown_listeners: BroadcastData::new(),
+        last_worker_listeners: BroadcastData::new(),
+        worker_count: 0,
+        internal_task_count: 0,
+        reload_request: false,
+    });
+}
+
+pub fn server_state_init() -> Result<(), Error> {
+
+    let mut stream = signal(SignalKind::interrupt())?;
+
+    let future = async move {
+        while stream.recv().await.is_some() {
+            println!("got shutdown request (SIGINT)");
+            SERVER_STATE.lock().unwrap().reload_request = false;
+            crate::request_shutdown();
+        }
+    }.boxed();
+
+    let abort_future = last_worker_future().map_err(|_| {});
+    let task = futures::future::select(future, abort_future);
+
+    tokio::spawn(task.map(|_| ()));
+
+    let mut stream = signal(SignalKind::hangup())?;
+
+    let future = async move {
+        while stream.recv().await.is_some() {
+            println!("got reload request (SIGHUP)");
+            SERVER_STATE.lock().unwrap().reload_request = true;
+            crate::request_shutdown();
+        }
+    }.boxed();
+
+    let abort_future = last_worker_future().map_err(|_| {});
+    let task = futures::future::select(future, abort_future);
+
+    tokio::spawn(task.map(|_| ()));
+
+    Ok(())
+}
+
+pub fn is_reload_request() -> bool {
+    let data = SERVER_STATE.lock().unwrap();
+
+    data.mode == ServerMode::Shutdown && data.reload_request
+}
+
+pub fn server_shutdown() {
+    let mut data = SERVER_STATE.lock().unwrap();
+
+    println!("SET SHUTDOWN MODE");
+
+    data.mode = ServerMode::Shutdown;
+
+    data.shutdown_listeners.notify_listeners(Ok(()));
+
+    drop(data); // unlock
+
+    check_last_worker();
+}
+
+pub fn shutdown_future() -> impl Future<Output = ()> {
+    let mut data = SERVER_STATE.lock().unwrap();
+    data
+        .shutdown_listeners
+        .listen()
+        .map(|_| ())
+}
+
+pub fn last_worker_future() ->  impl Future<Output = Result<(), Error>> {
+    let mut data = SERVER_STATE.lock().unwrap();
+    data.last_worker_listeners.listen()
+}
+
+pub fn set_worker_count(count: usize) {
+    SERVER_STATE.lock().unwrap().worker_count = count;
+
+    check_last_worker();
+}
+
+pub fn check_last_worker() {
+    let mut data = SERVER_STATE.lock().unwrap();
+
+    if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.internal_task_count == 0) { return; }
+
+    data.last_worker_listeners.notify_listeners(Ok(()));
+}
+
+/// Spawns a tokio task that will be tracked for reload
+/// and, once it finishes, notifies the last_worker_listeners
+/// if we are in shutdown mode
+pub fn spawn_internal_task<T>(task: T)
+where
+    T: Future + Send + 'static,
+    T::Output: Send + 'static,
+{
+    let mut data = SERVER_STATE.lock().unwrap();
+    data.internal_task_count += 1;
+
+    tokio::spawn(async move {
+        let _ = tokio::spawn(task).await; // ignore errors
+
+        { // drop mutex
+            let mut data = SERVER_STATE.lock().unwrap();
+            if data.internal_task_count > 0 {
+                data.internal_task_count -= 1;
+            }
+        }
+
+        check_last_worker();
+    });
+}
index e1a47604044d8ad314a236923548ffe81e05bd1f..e322e654d1c8b9b9b0ff83ca858a84d98a1ad130 100644 (file)
@@ -16,7 +16,7 @@ serde_json = "1.0"
 tokio = { version = "1.6", features = [ "rt", "rt-multi-thread" ] }
 
 pathpatterns = "0.1.2"
-proxmox = { version = "0.13.0", default-features = false, features = [] }
+proxmox = { version = "0.13.3", default-features = false, features = [] }
 pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 
 pbs-client = { path = "../pbs-client" }
index 33700a9020927134ced5c0a587d86484cd762d80..dc1c0ae9bbd81f71cbe00d81697952005048c4b9 100644 (file)
@@ -1505,7 +1505,7 @@ pub fn pxar_file_download(
             EntryKind::Directory => {
                 let (sender, receiver) = tokio::sync::mpsc::channel(100);
                 let channelwriter = AsyncChannelWriter::new(sender, 1024 * 1024);
-                crate::server::spawn_internal_task(
+                proxmox_rest_server::spawn_internal_task(
                     create_zip(channelwriter, decoder, path.clone(), false)
                 );
                 Body::wrap_stream(ReceiverStream::new(receiver).map_err(move |err| {
index 898e6291a2182fe0449a9fd9a8d4aa36b680509e..ecc1e2e083768d931ce00c53e9c9a21ec106ea77 100644 (file)
@@ -295,12 +295,12 @@ fn upgrade_to_websocket(
 
         let (ws, response) = WebSocket::new(parts.headers.clone())?;
 
-        crate::server::spawn_internal_task(async move {
+        proxmox_rest_server::spawn_internal_task(async move {
             let conn: Upgraded = match hyper::upgrade::on(Request::from_parts(parts, req_body))
                 .map_err(Error::from)
                 .await
             {
-                Ok(upgraded) => upgraded,
+               Ok(upgraded) => upgraded,
                 _ => bail!("error"),
             };
 
index d248ecaf484fd96aec0a3de038a174a0a1931062..df8d46b67614b691ca071fd9c24e29a75d7d892f 100644 (file)
@@ -29,8 +29,7 @@ use pbs_tools::format::HumanByte;
 use pbs_tools::fs::{lock_dir_noblock, DirLockGuard};
 use pbs_tools::process_locker::ProcessLockSharedGuard;
 use pbs_config::{open_backup_lockfile, BackupLockGuard};
-
-use crate::tools::fail_on_shutdown;
+use proxmox_rest_server::fail_on_shutdown;
 
 lazy_static! {
     static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
index 6e188c5fc492034d26edaca8636be7280beebe85..b8d2b2f3c0da58d7eab1fd7d410ae1434239126b 100644 (file)
@@ -172,7 +172,7 @@ fn verify_index_chunks(
     let check_abort = |pos: usize| -> Result<(), Error> {
         if pos & 1023 == 0 {
             verify_worker.worker.check_abort()?;
-            crate::tools::fail_on_shutdown()?;
+            proxmox_rest_server::fail_on_shutdown()?;
         }
         Ok(())
     };
@@ -184,7 +184,7 @@ fn verify_index_chunks(
 
     for (pos, _) in chunk_list {
         verify_worker.worker.check_abort()?;
-        crate::tools::fail_on_shutdown()?;
+        proxmox_rest_server::fail_on_shutdown()?;
 
         let info = index.chunk_info(pos).unwrap();
 
@@ -376,7 +376,7 @@ pub fn verify_backup_dir_with_lock(
         });
 
         verify_worker.worker.check_abort()?;
-        crate::tools::fail_on_shutdown()?;
+        proxmox_rest_server::fail_on_shutdown()?;
 
         if let Err(err) = result {
             task_log!(
index c8751bc540daee065da3126774b364d4a44b9ce5..452bbc3a04e33a803d77e2c8e37015ae4a9b8804 100644 (file)
@@ -3,8 +3,10 @@ use futures::*;
 
 use proxmox::try_block;
 use proxmox::api::RpcEnvironmentType;
+use proxmox::tools::fs::CreateOptions;
 
 use pbs_tools::auth::private_auth_key;
+use proxmox_rest_server::ApiConfig;
 
 use proxmox_backup::server::{
     self,
@@ -57,16 +59,25 @@ async fn run() -> Result<(), Error> {
     }
     let _ = csrf_secret(); // load with lazy_static
 
-    let mut config = server::ApiConfig::new(
+    let mut config = ApiConfig::new(
         pbs_buildcfg::JS_DIR,
         &proxmox_backup::api2::ROUTER,
         RpcEnvironmentType::PRIVILEGED,
         default_api_auth(),
     )?;
 
-    let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock());
+    let backup_user = pbs_config::backup_user()?;
+    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(crate::server::our_ctrl_sock(), backup_user.gid);
 
-    config.enable_file_log(pbs_buildcfg::API_ACCESS_LOG_FN, &mut commando_sock)?;
+    let dir_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
+    let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
+
+    config.enable_file_log(
+        pbs_buildcfg::API_ACCESS_LOG_FN,
+        Some(dir_opts),
+        Some(file_opts),
+        &mut commando_sock,
+    )?;
 
     let rest_server = RestServer::new(config);
 
@@ -78,7 +89,7 @@ async fn run() -> Result<(), Error> {
             Ok(ready
                 .and_then(|_| hyper::Server::builder(incoming)
                     .serve(rest_server)
-                    .with_graceful_shutdown(server::shutdown_future())
+                    .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
                     .map_err(Error::from)
                 )
                 .map(|e| {
@@ -97,7 +108,7 @@ async fn run() -> Result<(), Error> {
     let init_result: Result<(), Error> = try_block!({
         server::register_task_control_commands(&mut commando_sock)?;
         commando_sock.spawn()?;
-        server::server_state_init()?;
+        proxmox_rest_server::server_state_init()?;
         Ok(())
     });
 
@@ -107,7 +118,7 @@ async fn run() -> Result<(), Error> {
 
     server.await?;
     log::info!("server shutting down, waiting for active workers to complete");
-    proxmox_backup::server::last_worker_future().await?;
+    proxmox_rest_server::last_worker_future().await?;
 
     log::info!("done - exit server");
 
index 4240711f9927dc037dc890ab6703329990237b4e..de534a659d397313d4b4eb1746c42623d32b54c5 100644 (file)
@@ -12,13 +12,15 @@ use serde_json::Value;
 use proxmox::try_block;
 use proxmox::api::RpcEnvironmentType;
 use proxmox::sys::linux::socket::set_tcp_keepalive;
+use proxmox::tools::fs::CreateOptions;
+
+use proxmox_rest_server::ApiConfig;
 
 use proxmox_backup::{
     backup::DataStore,
     server::{
         auth::default_api_auth,
         WorkerTask,
-        ApiConfig,
         rest::*,
         jobstate::{
             self,
@@ -106,9 +108,18 @@ async fn run() -> Result<(), Error> {
     config.register_template("index", &indexpath)?;
     config.register_template("console", "/usr/share/pve-xtermjs/index.html.hbs")?;
 
-    let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock());
+    let backup_user = pbs_config::backup_user()?;
+    let mut commando_sock = proxmox_rest_server::CommandoSocket::new(crate::server::our_ctrl_sock(), backup_user.gid);
+
+    let dir_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
+    let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
 
-    config.enable_file_log(pbs_buildcfg::API_ACCESS_LOG_FN, &mut commando_sock)?;
+    config.enable_file_log(
+        pbs_buildcfg::API_ACCESS_LOG_FN,
+        Some(dir_opts),
+        Some(file_opts),
+        &mut commando_sock,
+    )?;
 
     let rest_server = RestServer::new(config);
 
@@ -158,7 +169,7 @@ async fn run() -> Result<(), Error> {
             Ok(ready
                .and_then(|_| hyper::Server::builder(connections)
                     .serve(rest_server)
-                    .with_graceful_shutdown(server::shutdown_future())
+                    .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
                     .map_err(Error::from)
                 )
                 .map_err(|err| eprintln!("server error: {}", err))
@@ -174,7 +185,7 @@ async fn run() -> Result<(), Error> {
     let init_result: Result<(), Error> = try_block!({
         server::register_task_control_commands(&mut commando_sock)?;
         commando_sock.spawn()?;
-        server::server_state_init()?;
+        proxmox_rest_server::server_state_init()?;
         Ok(())
     });
 
@@ -187,7 +198,7 @@ async fn run() -> Result<(), Error> {
 
     server.await?;
     log::info!("server shutting down, waiting for active workers to complete");
-    proxmox_backup::server::last_worker_future().await?;
+    proxmox_rest_server::last_worker_future().await?;
     log::info!("done - exit server");
 
     Ok(())
@@ -304,14 +315,14 @@ async fn accept_connection(
 }
 
 fn start_stat_generator() {
-    let abort_future = server::shutdown_future();
+    let abort_future = proxmox_rest_server::shutdown_future();
     let future = Box::pin(run_stat_generator());
     let task = futures::future::select(future, abort_future);
     tokio::spawn(task.map(|_| ()));
 }
 
 fn start_task_scheduler() {
-    let abort_future = server::shutdown_future();
+    let abort_future = proxmox_rest_server::shutdown_future();
     let future = Box::pin(run_task_scheduler());
     let task = futures::future::select(future, abort_future);
     tokio::spawn(task.map(|_| ()));
@@ -706,12 +717,12 @@ async fn schedule_task_log_rotate() {
 async fn command_reopen_logfiles() -> Result<(), Error> {
     // only care about the most recent daemon instance for each, proxy & api, as other older ones
     // should not respond to new requests anyway, but only finish their current one and then exit.
-    let sock = server::our_ctrl_sock();
-    let f1 = server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
+    let sock = crate::server::our_ctrl_sock();
+    let f1 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
 
-    let pid = server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
-    let sock = server::ctrl_sock_from_pid(pid);
-    let f2 = server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
+    let pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
+    let sock = crate::server::ctrl_sock_from_pid(pid);
+    let f2 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
 
     match futures::join!(f1, f2) {
         (Err(e1), Err(e2)) => Err(format_err!("reopen commands failed, proxy: {}; api: {}", e1, e2)),
index e9018eccfb6875ac66468abc9b8dbc6e748c6679..c5122cffa45b62775bb66b87b66aea79da11adea 100644 (file)
@@ -15,9 +15,11 @@ use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
 
 use proxmox::api::RpcEnvironmentType;
-use proxmox_backup::server::{rest::*, ApiConfig};
 
 use pbs_client::DEFAULT_VSOCK_PORT;
+use proxmox_rest_server::ApiConfig;
+
+use proxmox_backup::server::rest::*;
 
 mod proxmox_restore_daemon;
 use proxmox_restore_daemon::*;
index 30309bb88d2ecae2d359793d86ee13eadc3fba4c..ea1dabe67d4233410d7b57745718297acc7924dd 100644 (file)
@@ -4,10 +4,7 @@ use std::io::prelude::*;
 
 use anyhow::{bail, format_err, Error};
 
-use pbs_api_types::Authid;
-
-use pbs_config::CachedUserInfo;
-use proxmox_backup::server::auth::{ApiAuth, AuthError};
+use proxmox_rest_server::{ApiAuth, AuthError};
 
 const TICKET_FILE: &str = "/ticket";
 
@@ -20,11 +17,10 @@ impl ApiAuth for StaticAuth {
         &self,
         headers: &http::HeaderMap,
         _method: &hyper::Method,
-        _user_info: &CachedUserInfo,
-    ) -> Result<Authid, AuthError> {
+    ) -> Result<String, AuthError> {
         match headers.get(hyper::header::AUTHORIZATION) {
             Some(header) if header.to_str().unwrap_or("") == &self.ticket => {
-                Ok(Authid::root_auth_id().to_owned())
+                Ok(String::from("root@pam"))
             }
             _ => {
                 return Err(AuthError::Generic(format_err!(
index 1993317708dede870116da9d90a85ea2cb606743..321d1e02c2602a2cefd539bd1e3594999a3b6cf6 100644 (file)
@@ -1,11 +1,12 @@
 //! Provides authentication primitives for the HTTP server
-use anyhow::{format_err, Error};
+use anyhow::format_err;
 
 use std::sync::Arc;
 
 use pbs_tools::ticket::{self, Ticket};
 use pbs_config::{token_shadow, CachedUserInfo};
 use pbs_api_types::{Authid, Userid};
+use proxmox_rest_server::{ApiAuth, AuthError};
 
 use crate::auth_helpers::*;
 use crate::tools;
@@ -13,26 +14,6 @@ use crate::tools;
 use hyper::header;
 use percent_encoding::percent_decode_str;
 
-pub enum AuthError {
-    Generic(Error),
-    NoData,
-}
-
-impl From<Error> for AuthError {
-    fn from(err: Error) -> Self {
-        AuthError::Generic(err)
-    }
-}
-
-pub trait ApiAuth {
-    fn check_auth(
-        &self,
-        headers: &http::HeaderMap,
-        method: &hyper::Method,
-        user_info: &CachedUserInfo,
-    ) -> Result<Authid, AuthError>;
-}
-
 struct UserAuthData {
     ticket: String,
     csrf_token: Option<String>,
@@ -80,8 +61,10 @@ impl ApiAuth for UserApiAuth {
         &self,
         headers: &http::HeaderMap,
         method: &hyper::Method,
-        user_info: &CachedUserInfo,
-    ) -> Result<Authid, AuthError> {
+    ) -> Result<String, AuthError> {
+
+        let user_info = CachedUserInfo::new()?;
+
         let auth_data = Self::extract_auth_data(headers);
         match auth_data {
             Some(AuthData::User(user_auth_data)) => {
@@ -111,7 +94,7 @@ impl ApiAuth for UserApiAuth {
                     }
                 }
 
-                Ok(auth_id)
+                Ok(auth_id.to_string())
             }
             Some(AuthData::ApiToken(api_token)) => {
                 let mut parts = api_token.splitn(2, ':');
@@ -133,7 +116,7 @@ impl ApiAuth for UserApiAuth {
 
                 token_shadow::verify_secret(&tokenid, &tokensecret)?;
 
-                Ok(tokenid)
+                Ok(tokenid.to_string())
             }
             None => Err(AuthError::NoData),
         }
diff --git a/src/server/command_socket.rs b/src/server/command_socket.rs
deleted file mode 100644 (file)
index e3bd0c1..0000000
+++ /dev/null
@@ -1,220 +0,0 @@
-use anyhow::{bail, format_err, Error};
-
-use std::collections::HashMap;
-use std::os::unix::io::AsRawFd;
-use std::path::{PathBuf, Path};
-use std::sync::Arc;
-
-use futures::*;
-use tokio::net::UnixListener;
-use serde::Serialize;
-use serde_json::Value;
-use nix::sys::socket;
-
-/// Listens on a Unix Socket to handle simple command asynchronously
-fn create_control_socket<P, F>(path: P, func: F) -> Result<impl Future<Output = ()>, Error>
-where
-    P: Into<PathBuf>,
-    F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static,
-{
-    let path: PathBuf = path.into();
-
-    let backup_user = pbs_config::backup_user()?;
-    let backup_gid = backup_user.gid.as_raw();
-
-    let socket = UnixListener::bind(&path)?;
-
-    let func = Arc::new(func);
-
-    let control_future = async move {
-        loop {
-            let (conn, _addr) = match socket.accept().await {
-                Ok(data) => data,
-                Err(err) => {
-                    eprintln!("failed to accept on control socket {:?}: {}", path, err);
-                    continue;
-                }
-            };
-
-            let opt = socket::sockopt::PeerCredentials {};
-            let cred = match socket::getsockopt(conn.as_raw_fd(), opt) {
-                Ok(cred) => cred,
-                Err(err) => {
-                    eprintln!("no permissions - unable to read peer credential - {}", err);
-                    continue;
-                }
-            };
-
-            // check permissions (same gid, root user, or backup group)
-            let mygid = unsafe { libc::getgid() };
-            if !(cred.uid() == 0 || cred.gid() == mygid || cred.gid() == backup_gid) {
-                eprintln!("no permissions for {:?}", cred);
-                continue;
-            }
-
-            let (rx, mut tx) = tokio::io::split(conn);
-
-            let abort_future = super::last_worker_future().map(|_| ());
-
-            use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
-            let func = Arc::clone(&func);
-            let path = path.clone();
-            tokio::spawn(futures::future::select(
-                async move {
-                    let mut rx = tokio::io::BufReader::new(rx);
-                    let mut line = String::new();
-                    loop {
-                        line.clear();
-                        match rx.read_line({ line.clear(); &mut line }).await {
-                            Ok(0) => break,
-                            Ok(_) => (),
-                            Err(err) => {
-                                eprintln!("control socket {:?} read error: {}", path, err);
-                                return;
-                            }
-                        }
-
-                        let response = match line.parse::<Value>() {
-                            Ok(param) => match func(param) {
-                                Ok(res) => format!("OK: {}\n", res),
-                                Err(err) => format!("ERROR: {}\n", err),
-                            }
-                            Err(err) => format!("ERROR: {}\n", err),
-                        };
-
-                        if let Err(err) = tx.write_all(response.as_bytes()).await {
-                            eprintln!("control socket {:?} write response error: {}", path, err);
-                            return;
-                        }
-                    }
-                }.boxed(),
-                abort_future,
-            ).map(|_| ()));
-        }
-    }.boxed();
-
-    let abort_future = super::last_worker_future().map_err(|_| {});
-    let task = futures::future::select(
-        control_future,
-        abort_future,
-    ).map(|_: futures::future::Either<(Result<(), Error>, _), _>| ());
-
-    Ok(task)
-}
-
-
-pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
-where
-    P: AsRef<Path>,
-    T: ?Sized + Serialize,
-{
-    let mut command_string = serde_json::to_string(params)?;
-    command_string.push('\n');
-    send_raw_command(path.as_ref(), &command_string).await
-}
-
-pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
-where
-    P: AsRef<Path>,
-{
-    use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
-
-    let mut conn = tokio::net::UnixStream::connect(path)
-        .map_err(move |err| format_err!("control socket connect failed - {}", err))
-        .await?;
-
-    conn.write_all(command_string.as_bytes()).await?;
-    if !command_string.as_bytes().ends_with(b"\n") {
-        conn.write_all(b"\n").await?;
-    }
-
-    AsyncWriteExt::shutdown(&mut conn).await?;
-    let mut rx = tokio::io::BufReader::new(conn);
-    let mut data = String::new();
-    if rx.read_line(&mut data).await? == 0 {
-        bail!("no response");
-    }
-    if let Some(res) = data.strip_prefix("OK: ") {
-        match res.parse::<Value>() {
-            Ok(v) => Ok(v),
-            Err(err) => bail!("unable to parse json response - {}", err),
-        }
-    } else if let Some(err) = data.strip_prefix("ERROR: ") {
-        bail!("{}", err);
-    } else {
-        bail!("unable to parse response: {}", data);
-    }
-}
-
-/// A callback for a specific commando socket.
-pub type CommandoSocketFn = Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
-
-/// Tooling to get a single control command socket where one can register multiple commands
-/// dynamically.
-/// You need to call `spawn()` to make the socket active.
-pub struct CommandoSocket {
-    socket: PathBuf,
-    commands: HashMap<String, CommandoSocketFn>,
-}
-
-impl CommandoSocket {
-    pub fn new<P>(path: P) -> Self
-        where P: Into<PathBuf>,
-    {
-        CommandoSocket {
-            socket: path.into(),
-            commands: HashMap::new(),
-        }
-    }
-
-    /// Spawn the socket and consume self, meaning you cannot register commands anymore after
-    /// calling this.
-    pub fn spawn(self) -> Result<(), Error> {
-        let control_future = create_control_socket(self.socket.to_owned(), move |param| {
-            let param = param
-                .as_object()
-                .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
-
-            let command = match param.get("command") {
-                Some(Value::String(command)) => command.as_str(),
-                None => bail!("no command"),
-                _ => bail!("unable to parse command"),
-            };
-
-            if !self.commands.contains_key(command) {
-                bail!("got unknown command '{}'", command);
-            }
-
-            match self.commands.get(command) {
-                None => bail!("got unknown command '{}'", command),
-                Some(handler) => {
-                    let args = param.get("args"); //.unwrap_or(&Value::Null);
-                    (handler)(args)
-                },
-            }
-        })?;
-
-        tokio::spawn(control_future);
-
-        Ok(())
-    }
-
-    /// Register a new command with a callback.
-    pub fn register_command<F>(
-        &mut self,
-        command: String,
-        handler: F,
-    ) -> Result<(), Error>
-    where
-        F: Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static,
-    {
-
-        if self.commands.contains_key(&command) {
-            bail!("command '{}' already exists!", command);
-        }
-
-        self.commands.insert(command, Box::new(handler));
-
-        Ok(())
-    }
-}
diff --git a/src/server/config.rs b/src/server/config.rs
deleted file mode 100644 (file)
index 195d7a8..0000000
+++ /dev/null
@@ -1,171 +0,0 @@
-use std::collections::HashMap;
-use std::path::PathBuf;
-use std::time::SystemTime;
-use std::fs::metadata;
-use std::sync::{Arc, Mutex, RwLock};
-
-use anyhow::{bail, Error, format_err};
-use hyper::Method;
-use handlebars::Handlebars;
-use serde::Serialize;
-
-use proxmox::api::{ApiMethod, Router, RpcEnvironmentType};
-use proxmox::tools::fs::{create_path, CreateOptions};
-
-use crate::tools::{FileLogger, FileLogOptions};
-use super::auth::ApiAuth;
-
-pub struct ApiConfig {
-    basedir: PathBuf,
-    router: &'static Router,
-    aliases: HashMap<String, PathBuf>,
-    env_type: RpcEnvironmentType,
-    templates: RwLock<Handlebars<'static>>,
-    template_files: RwLock<HashMap<String, (SystemTime, PathBuf)>>,
-    request_log: Option<Arc<Mutex<FileLogger>>>,
-    pub api_auth: Arc<dyn ApiAuth + Send + Sync>,
-}
-
-impl ApiConfig {
-    pub fn new<B: Into<PathBuf>>(
-        basedir: B,
-        router: &'static Router,
-        env_type: RpcEnvironmentType,
-        api_auth: Arc<dyn ApiAuth + Send + Sync>,
-    ) -> Result<Self, Error> {
-        Ok(Self {
-            basedir: basedir.into(),
-            router,
-            aliases: HashMap::new(),
-            env_type,
-            templates: RwLock::new(Handlebars::new()),
-            template_files: RwLock::new(HashMap::new()),
-            request_log: None,
-            api_auth,
-        })
-    }
-
-    pub fn find_method(
-        &self,
-        components: &[&str],
-        method: Method,
-        uri_param: &mut HashMap<String, String>,
-    ) -> Option<&'static ApiMethod> {
-
-        self.router.find_method(components, method, uri_param)
-    }
-
-    pub fn find_alias(&self, components: &[&str]) -> PathBuf {
-
-        let mut prefix = String::new();
-        let mut filename = self.basedir.clone();
-        let comp_len = components.len();
-        if comp_len >= 1 {
-            prefix.push_str(components[0]);
-            if let Some(subdir) = self.aliases.get(&prefix) {
-                filename.push(subdir);
-                components.iter().skip(1).for_each(|comp| filename.push(comp));
-            } else {
-                components.iter().for_each(|comp| filename.push(comp));
-            }
-        }
-        filename
-    }
-
-    pub fn add_alias<S, P>(&mut self, alias: S, path: P)
-        where S: Into<String>,
-              P: Into<PathBuf>,
-    {
-        self.aliases.insert(alias.into(), path.into());
-    }
-
-    pub fn env_type(&self) -> RpcEnvironmentType {
-        self.env_type
-    }
-
-    pub fn register_template<P>(&self, name: &str, path: P) -> Result<(), Error>
-    where
-        P: Into<PathBuf>
-    {
-        if self.template_files.read().unwrap().contains_key(name) {
-            bail!("template already registered");
-        }
-
-        let path: PathBuf = path.into();
-        let metadata = metadata(&path)?;
-        let mtime = metadata.modified()?;
-
-        self.templates.write().unwrap().register_template_file(name, &path)?;
-        self.template_files.write().unwrap().insert(name.to_string(), (mtime, path));
-
-        Ok(())
-    }
-
-    /// Checks if the template was modified since the last rendering
-    /// if yes, it loads a the new version of the template
-    pub fn render_template<T>(&self, name: &str, data: &T) -> Result<String, Error>
-    where
-        T: Serialize,
-    {
-        let path;
-        let mtime;
-        {
-            let template_files = self.template_files.read().unwrap();
-            let (old_mtime, old_path) = template_files.get(name).ok_or_else(|| format_err!("template not found"))?;
-
-            mtime = metadata(old_path)?.modified()?;
-            if mtime <= *old_mtime {
-                return self.templates.read().unwrap().render(name, data).map_err(|err| format_err!("{}", err));
-            }
-            path = old_path.to_path_buf();
-        }
-
-        {
-            let mut template_files = self.template_files.write().unwrap();
-            let mut templates = self.templates.write().unwrap();
-
-            templates.register_template_file(name, &path)?;
-            template_files.insert(name.to_string(), (mtime, path));
-
-            templates.render(name, data).map_err(|err| format_err!("{}", err))
-        }
-    }
-
-    pub fn enable_file_log<P>(
-        &mut self,
-        path: P,
-        commando_sock: &mut super::CommandoSocket,
-    ) -> Result<(), Error>
-    where
-        P: Into<PathBuf>
-    {
-        let path: PathBuf = path.into();
-        if let Some(base) = path.parent() {
-            if !base.exists() {
-                let backup_user = pbs_config::backup_user()?;
-                let opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
-                create_path(base, None, Some(opts)).map_err(|err| format_err!("{}", err))?;
-            }
-        }
-
-        let logger_options = FileLogOptions {
-            append: true,
-            owned_by_backup: true,
-            ..Default::default()
-        };
-        let request_log = Arc::new(Mutex::new(FileLogger::new(&path, logger_options)?));
-        self.request_log = Some(Arc::clone(&request_log));
-
-        commando_sock.register_command("api-access-log-reopen".into(), move |_args| {
-            println!("re-opening log file");
-            request_log.lock().unwrap().reopen()?;
-            Ok(serde_json::Value::Null)
-        })?;
-
-        Ok(())
-    }
-
-    pub fn get_file_log(&self) -> Option<&Arc<Mutex<FileLogger>>> {
-        self.request_log.as_ref()
-    }
-}
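
For orientation, a minimal sketch of how a caller might construct the relocated ApiConfig via the proxmox-rest-server crate; it assumes ApiConfig and ApiAuth are both exported there and keep the public API shown above, and the basedir path and alias are examples only, not taken from this commit.

    use std::sync::Arc;
    use anyhow::Error;
    use proxmox::api::{Router, RpcEnvironmentType};
    use proxmox_rest_server::{ApiAuth, ApiConfig};

    // Sketch only: mirrors the ApiConfig::new()/add_alias() API shown above.
    fn make_api_config(
        router: &'static Router,
        api_auth: Arc<dyn ApiAuth + Send + Sync>,
    ) -> Result<ApiConfig, Error> {
        let mut config = ApiConfig::new(
            "/usr/share/javascript/proxmox-backup", // example basedir
            router,
            RpcEnvironmentType::PUBLIC,
            api_auth,
        )?;
        config.add_alias("docs", "/usr/share/doc/proxmox-backup/html"); // example alias
        Ok(config)
    }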
index 52c6e7bc6fc9ffe2a8f9b6558ac6cdd5ab4a18da..69cfe7b051adb8d05eba129b90d2cf66847d2fc1 100644 (file)
@@ -52,21 +52,12 @@ pub use environment::*;
 mod upid;
 pub use upid::*;
 
-mod state;
-pub use state::*;
-
-mod command_socket;
-pub use command_socket::*;
-
 mod worker_task;
 pub use worker_task::*;
 
 mod h2service;
 pub use h2service::*;
 
-pub mod config;
-pub use config::*;
-
 pub mod formatter;
 
 #[macro_use]
@@ -98,7 +89,7 @@ pub mod pull;
 pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
     let proxy_pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
     let sock = crate::server::ctrl_sock_from_pid(proxy_pid);
-    let _: Value = crate::server::send_raw_command(sock, "{\"command\":\"reload-certificate\"}\n")
+    let _: Value = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"reload-certificate\"}\n")
         .await?;
     Ok(())
 }
@@ -106,7 +97,7 @@ pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
 pub(crate) async fn notify_datastore_removed() -> Result<(), Error> {
     let proxy_pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
     let sock = crate::server::ctrl_sock_from_pid(proxy_pid);
-    let _: Value = crate::server::send_raw_command(sock, "{\"command\":\"datastore-removed\"}\n")
+    let _: Value = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"datastore-removed\"}\n")
         .await?;
     Ok(())
 }
index a648832a0ca2db54bccfe2373fc1c6e8112ea68e..0c4df9063349bd95847526a3b4388d559e111e6c 100644 (file)
@@ -29,21 +29,20 @@ use proxmox::api::{
     RpcEnvironmentType,
 };
 use proxmox::http_err;
+use proxmox::tools::fs::CreateOptions;
 
 use pbs_tools::compression::{DeflateEncoder, Level};
 use pbs_tools::stream::AsyncReaderStream;
 use pbs_api_types::{Authid, Userid};
+use proxmox_rest_server::{ApiConfig, FileLogger, FileLogOptions, AuthError};
 
-use super::auth::AuthError;
 use super::environment::RestEnvironment;
 use super::formatter::*;
-use super::ApiConfig;
 
 use crate::auth_helpers::*;
 use pbs_config::CachedUserInfo;
 use crate::tools;
 use crate::tools::compression::CompressionMethod;
-use crate::tools::FileLogger;
 
 extern "C" {
     fn tzset();
@@ -196,10 +195,16 @@ fn log_response(
     }
 }
 pub fn auth_logger() -> Result<FileLogger, Error> {
-    let logger_options = tools::FileLogOptions {
+    let backup_user = pbs_config::backup_user()?;
+
+    let file_opts = CreateOptions::new()
+        .owner(backup_user.uid)
+        .group(backup_user.gid);
+
+    let logger_options = FileLogOptions {
         append: true,
         prefix_time: true,
-        owned_by_backup: true,
+        file_opts,
         ..Default::default()
     };
     FileLogger::new(pbs_buildcfg::API_AUTH_LOG_FN, logger_options)
@@ -681,7 +686,6 @@ async fn handle_request(
 
     rpcenv.set_client_ip(Some(*peer));
 
-    let user_info = CachedUserInfo::new()?;
     let auth = &api.api_auth;
 
     let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
@@ -708,8 +712,8 @@ async fn handle_request(
             }
 
             if auth_required {
-                match auth.check_auth(&parts.headers, &method, &user_info) {
-                    Ok(authid) => rpcenv.set_auth_id(Some(authid.to_string())),
+                match auth.check_auth(&parts.headers, &method) {
+                    Ok(authid) => rpcenv.set_auth_id(Some(authid)),
                     Err(auth_err) => {
                         let err = match auth_err {
                             AuthError::Generic(err) => err,
@@ -738,6 +742,8 @@ async fn handle_request(
                 }
                 Some(api_method) => {
                     let auth_id = rpcenv.get_auth_id();
+                    let user_info = CachedUserInfo::new()?;
+
                     if !check_api_permission(
                         api_method.access.permission,
                         auth_id.as_deref(),
@@ -779,8 +785,9 @@ async fn handle_request(
 
         if comp_len == 0 {
             let language = extract_lang_header(&parts.headers);
-            match auth.check_auth(&parts.headers, &method, &user_info) {
+            match auth.check_auth(&parts.headers, &method) {
                 Ok(auth_id) => {
+                    let auth_id: Authid = auth_id.parse()?;
                     if !auth_id.is_token() {
                         let userid = auth_id.user();
                         let new_csrf_token = assemble_csrf_prevention_token(csrf_secret(), userid);
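
The changed call sites above (check_auth now takes only the headers and method, and returns the user as a plain string) imply the moved auth trait has roughly the following shape; this is a reconstruction from the diff for orientation, not the authoritative definition in proxmox-rest-server/src/lib.rs.

    use proxmox_rest_server::AuthError;

    // Reconstructed sketch only; the real trait lives in proxmox-rest-server.
    pub trait ApiAuth {
        fn check_auth(
            &self,
            headers: &http::HeaderMap,
            method: &hyper::Method,
        ) -> Result<String, AuthError>;
    }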
diff --git a/src/server/state.rs b/src/server/state.rs
deleted file mode 100644 (file)
index d294c93..0000000
+++ /dev/null
@@ -1,142 +0,0 @@
-use anyhow::{Error};
-use lazy_static::lazy_static;
-use std::sync::Mutex;
-
-use futures::*;
-
-use tokio::signal::unix::{signal, SignalKind};
-
-use pbs_tools::broadcast_future::BroadcastData;
-
-#[derive(PartialEq, Copy, Clone, Debug)]
-pub enum ServerMode {
-    Normal,
-    Shutdown,
-}
-
-pub struct ServerState {
-    pub mode: ServerMode,
-    pub shutdown_listeners: BroadcastData<()>,
-    pub last_worker_listeners: BroadcastData<()>,
-    pub worker_count: usize,
-    pub internal_task_count: usize,
-    pub reload_request: bool,
-}
-
-lazy_static! {
-    static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState {
-        mode: ServerMode::Normal,
-        shutdown_listeners: BroadcastData::new(),
-        last_worker_listeners: BroadcastData::new(),
-        worker_count: 0,
-        internal_task_count: 0,
-        reload_request: false,
-    });
-}
-
-pub fn server_state_init() -> Result<(), Error> {
-
-    let mut stream = signal(SignalKind::interrupt())?;
-
-    let future = async move {
-        while stream.recv().await.is_some() {
-            println!("got shutdown request (SIGINT)");
-            SERVER_STATE.lock().unwrap().reload_request = false;
-            crate::tools::request_shutdown();
-        }
-    }.boxed();
-
-    let abort_future = last_worker_future().map_err(|_| {});
-    let task = futures::future::select(future, abort_future);
-
-    tokio::spawn(task.map(|_| ()));
-
-    let mut stream = signal(SignalKind::hangup())?;
-
-    let future = async move {
-        while stream.recv().await.is_some() {
-            println!("got reload request (SIGHUP)");
-            SERVER_STATE.lock().unwrap().reload_request = true;
-            crate::tools::request_shutdown();
-        }
-    }.boxed();
-
-    let abort_future = last_worker_future().map_err(|_| {});
-    let task = futures::future::select(future, abort_future);
-
-    tokio::spawn(task.map(|_| ()));
-
-    Ok(())
-}
-
-pub fn is_reload_request() -> bool {
-    let data = SERVER_STATE.lock().unwrap();
-
-    data.mode == ServerMode::Shutdown && data.reload_request
-}
-
-pub fn server_shutdown() {
-    let mut data = SERVER_STATE.lock().unwrap();
-
-    println!("SET SHUTDOWN MODE");
-
-    data.mode = ServerMode::Shutdown;
-
-    data.shutdown_listeners.notify_listeners(Ok(()));
-
-    drop(data); // unlock
-
-    check_last_worker();
-}
-
-pub fn shutdown_future() -> impl Future<Output = ()> {
-    let mut data = SERVER_STATE.lock().unwrap();
-    data
-        .shutdown_listeners
-        .listen()
-        .map(|_| ())
-}
-
-pub fn last_worker_future() ->  impl Future<Output = Result<(), Error>> {
-    let mut data = SERVER_STATE.lock().unwrap();
-    data.last_worker_listeners.listen()
-}
-
-pub fn set_worker_count(count: usize) {
-    SERVER_STATE.lock().unwrap().worker_count = count;
-
-    check_last_worker();
-}
-
-pub fn check_last_worker() {
-    let mut data = SERVER_STATE.lock().unwrap();
-
-    if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.internal_task_count == 0) { return; }
-
-    data.last_worker_listeners.notify_listeners(Ok(()));
-}
-
-/// Spawns a tokio task that is tracked for reload;
-/// when it finishes, the last_worker_listeners are notified
-/// if we are in shutdown mode
-pub fn spawn_internal_task<T>(task: T)
-where
-    T: Future + Send + 'static,
-    T::Output: Send + 'static,
-{
-    let mut data = SERVER_STATE.lock().unwrap();
-    data.internal_task_count += 1;
-
-    tokio::spawn(async move {
-        let _ = tokio::spawn(task).await; // ignore errors
-
-        { // drop mutex
-            let mut data = SERVER_STATE.lock().unwrap();
-            if data.internal_task_count > 0 {
-                data.internal_task_count -= 1;
-            }
-        }
-
-        check_last_worker();
-    });
-}
index 2ef8ba9da4d3bd1b6ad138ef437bb7df8a9a312c..c62ba7df9391a4488d0af29ba7a4d94f60cfda90 100644 (file)
@@ -20,12 +20,10 @@ use pbs_buildcfg;
 use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
 use pbs_api_types::{Authid, TaskStateType, UPID};
 use pbs_config::{open_backup_lockfile, BackupLockGuard};
+use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
 
 use super::UPIDExt;
 
-use crate::server;
-use crate::tools::{FileLogger, FileLogOptions};
-
 macro_rules! taskdir {
     ($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
 }
@@ -41,7 +39,7 @@ lazy_static! {
 
 /// checks if the task UPID refers to a worker from this process
 fn is_local_worker(upid: &UPID) -> bool {
-    upid.pid == server::pid() && upid.pstart == server::pstart()
+    upid.pid == crate::server::pid() && upid.pstart == crate::server::pstart()
 }
 
 /// Test if the task is still running
@@ -54,14 +52,14 @@ pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
         return Ok(false);
     }
 
-    let sock = server::ctrl_sock_from_pid(upid.pid);
+    let sock = crate::server::ctrl_sock_from_pid(upid.pid);
     let cmd = json!({
         "command": "worker-task-status",
         "args": {
             "upid": upid.to_string(),
         },
     });
-    let status = super::send_command(sock, &cmd).await?;
+    let status = proxmox_rest_server::send_command(sock, &cmd).await?;
 
     if let Some(active) = status.as_bool() {
         Ok(active)
@@ -84,7 +82,7 @@ pub fn worker_is_active_local(upid: &UPID) -> bool {
 }
 
 pub fn register_task_control_commands(
-    commando_sock: &mut super::CommandoSocket,
+    commando_sock: &mut CommandoSocket,
 ) -> Result<(), Error> {
     fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
         let args = if let Some(args) = args { args } else { bail!("missing args") };
@@ -128,14 +126,14 @@ pub fn abort_worker_async(upid: UPID) {
 
 pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
 
-    let sock = server::ctrl_sock_from_pid(upid.pid);
+    let sock = crate::server::ctrl_sock_from_pid(upid.pid);
     let cmd = json!({
         "command": "worker-task-abort",
         "args": {
             "upid": upid.to_string(),
         },
     });
-    super::send_command(sock, &cmd).map_ok(|_| ()).await
+    proxmox_rest_server::send_command(sock, &cmd).map_ok(|_| ()).await
 }
 
 fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
@@ -579,7 +577,6 @@ impl Iterator for TaskListInfoIterator {
 /// task/future. Each task can `log()` messages, which are stored
 /// persistently to files. Tasks should poll the `abort_requested`
 /// flag and stop execution when requested.
-#[derive(Debug)]
 pub struct WorkerTask {
     upid: UPID,
     data: Mutex<WorkerTaskData>,
@@ -593,7 +590,6 @@ impl std::fmt::Display for WorkerTask {
     }
 }
 
-#[derive(Debug)]
 struct WorkerTaskData {
     logger: FileLogger,
     progress: f64, // 0..1
@@ -642,7 +638,7 @@ impl WorkerTask {
         {
             let mut hash = WORKER_TASK_LIST.lock().unwrap();
             hash.insert(task_id, worker.clone());
-            super::set_worker_count(hash.len());
+            proxmox_rest_server::set_worker_count(hash.len());
         }
 
         update_active_workers(Some(&upid))?;
@@ -729,7 +725,7 @@ impl WorkerTask {
 
         WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
         let _ = update_active_workers(None);
-        super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
+        proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
     }
 
     /// Log a message.
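
Since the command socket is now provided by proxmox-rest-server, here is a hedged sketch of registering a control command on it, modelled on the register_command(name, closure) pattern used for the access-log reopen earlier in this commit; the command name and reply are made up for illustration.

    use anyhow::Error;
    use proxmox_rest_server::CommandoSocket;

    // Sketch only: the closure receives optional JSON args and returns a JSON value.
    fn register_ping(commando_sock: &mut CommandoSocket) -> Result<(), Error> {
        commando_sock.register_command("example-ping".into(), move |_args| {
            Ok(serde_json::json!("pong"))
        })
    }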
index d298bf16ed84f41c2d670a0a747b95bea944d376..1291601bab5ea4e28ff8b85a92b97e4004a61f2a 100644 (file)
@@ -16,7 +16,6 @@ use futures::future::{self, Either};
 
 use proxmox::tools::io::{ReadExt, WriteExt};
 
-use crate::server;
 use crate::tools::{fd_change_cloexec, self};
 
 #[link(name = "systemd")]
@@ -274,11 +273,11 @@ where
     ).await?;
 
     let server_future = create_service(listener, NotifyReady)?;
-    let shutdown_future = server::shutdown_future();
+    let shutdown_future = proxmox_rest_server::shutdown_future();
 
     let finish_future = match future::select(server_future, shutdown_future).await {
         Either::Left((_, _)) => {
-            crate::tools::request_shutdown(); // make sure we are in shutdown mode
+            proxmox_rest_server::request_shutdown(); // make sure we are in shutdown mode
             None
         }
         Either::Right((_, server_future)) => Some(server_future),
@@ -286,7 +285,7 @@ where
 
     let mut reloader = Some(reloader);
 
-    if server::is_reload_request() {
+    if proxmox_rest_server::is_reload_request() {
         log::info!("daemon reload...");
         if let Err(e) = systemd_notify(SystemdNotify::Reloading) {
             log::error!("failed to notify systemd about the state change: {}", e);
@@ -305,7 +304,7 @@ where
     }
 
     // FIXME: this is a hack, replace with sd_notify_barrier when available
-    if server::is_reload_request() {
+    if proxmox_rest_server::is_reload_request() {
         wait_service_is_not_state(service_name, "reloading").await?;
     }
 
diff --git a/src/tools/file_logger.rs b/src/tools/file_logger.rs
deleted file mode 100644 (file)
index 5b8db2c..0000000
+++ /dev/null
@@ -1,142 +0,0 @@
-use anyhow::Error;
-use std::io::Write;
-
-/// Log messages into files, with optional automatically added timestamps
-///
-/// Logs messages to a file, and optionally to standard output.
-///
-///
-/// #### Example:
-/// ```
-/// # use anyhow::{bail, format_err, Error};
-/// use proxmox_backup::flog;
-/// use proxmox_backup::tools::{FileLogger, FileLogOptions};
-///
-/// # std::fs::remove_file("test.log");
-/// let options = FileLogOptions {
-///     to_stdout: true,
-///     exclusive: true,
-///     ..Default::default()
-/// };
-/// let mut log = FileLogger::new("test.log", options).unwrap();
-/// flog!(log, "A simple log: {}", "Hello!");
-/// # std::fs::remove_file("test.log");
-/// ```
-
-#[derive(Debug, Default)]
-/// Options to control the behavior of a [`FileLogger`] instance
-pub struct FileLogOptions {
-    /// Open underlying log file in append mode, useful when multiple concurrent processes
-    /// want to log to the same file (e.g., HTTP access log). Note that appends are only atomic
-    /// for writes smaller than PIPE_BUF (4k on Linux).
-    /// Inside the same process you may still need a mutex for shared access.
-    pub append: bool,
-    /// Open underlying log file as readable
-    pub read: bool,
-    /// If set, ensure that the file is newly created or error out if already existing.
-    pub exclusive: bool,
-    /// Duplicate logged messages to STDOUT, like tee
-    pub to_stdout: bool,
-    /// Prefix messages logged to the file with the current local time as RFC 3339
-    pub prefix_time: bool,
-    /// If set, we try to chown the file to the backup:backup user/group.
-    /// Note that this is not race free, as anybody could change the owner again afterwards.
-    /// It must thus be set by all processes which do not run as the backup uid/gid.
-    pub owned_by_backup: bool,
-}
-
-#[derive(Debug)]
-pub struct FileLogger {
-    file: std::fs::File,
-    file_name: std::path::PathBuf,
-    options: FileLogOptions,
-}
-
-/// Log messages to [`FileLogger`](tools/struct.FileLogger.html)
-#[macro_export]
-macro_rules! flog {
-    ($log:expr, $($arg:tt)*) => ({
-        $log.log(format!($($arg)*));
-    })
-}
-
-impl FileLogger {
-    pub fn new<P: AsRef<std::path::Path>>(
-        file_name: P,
-        options: FileLogOptions,
-    ) -> Result<Self, Error> {
-        let file = Self::open(&file_name, &options)?;
-
-        let file_name: std::path::PathBuf = file_name.as_ref().to_path_buf();
-
-        Ok(Self { file, file_name, options })
-    }
-
-    pub fn reopen(&mut self) -> Result<&Self, Error> {
-        let file = Self::open(&self.file_name, &self.options)?;
-        self.file = file;
-        Ok(self)
-    }
-
-    fn open<P: AsRef<std::path::Path>>(
-        file_name: P,
-        options: &FileLogOptions,
-    ) -> Result<std::fs::File, Error> {
-        let file = std::fs::OpenOptions::new()
-            .read(options.read)
-            .write(true)
-            .append(options.append)
-            .create_new(options.exclusive)
-            .create(!options.exclusive)
-            .open(&file_name)?;
-
-        if options.owned_by_backup {
-            let backup_user = pbs_config::backup_user()?;
-            nix::unistd::chown(file_name.as_ref(), Some(backup_user.uid), Some(backup_user.gid))?;
-        }
-
-        Ok(file)
-    }
-
-    pub fn log<S: AsRef<str>>(&mut self, msg: S) {
-        let msg = msg.as_ref();
-
-        if self.options.to_stdout {
-            let mut stdout = std::io::stdout();
-            stdout.write_all(msg.as_bytes()).unwrap();
-            stdout.write_all(b"\n").unwrap();
-        }
-
-        let line = if self.options.prefix_time {
-            let now = proxmox::tools::time::epoch_i64();
-            let rfc3339 = match proxmox::tools::time::epoch_to_rfc3339(now) {
-                Ok(rfc3339) => rfc3339,
-                Err(_) => "1970-01-01T00:00:00Z".into(), // for safety, should really not happen!
-            };
-            format!("{}: {}\n", rfc3339, msg)
-        } else {
-            format!("{}\n", msg)
-        };
-        if let Err(err) = self.file.write_all(line.as_bytes()) {
-            // avoid panicking, log methods should not do that
-            // FIXME: or, return result???
-            eprintln!("error writing to log file - {}", err);
-        }
-    }
-}
-
-impl std::io::Write for FileLogger {
-    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
-        if self.options.to_stdout {
-            let _ = std::io::stdout().write(buf);
-        }
-        self.file.write(buf)
-    }
-
-    fn flush(&mut self) -> Result<(), std::io::Error> {
-        if self.options.to_stdout {
-            let _ = std::io::stdout().flush();
-        }
-        self.file.flush()
-    }
-}
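
The owned_by_backup flag removed here is replaced in the relocated FileLogger by a generic file_opts field taking a CreateOptions, as the auth_logger() change further up shows; below is a minimal sketch of that new pattern, with the log path chosen purely as an example.

    use anyhow::Error;
    use proxmox::tools::fs::CreateOptions;
    use proxmox_rest_server::{FileLogger, FileLogOptions};

    // Sketch only: file ownership now goes through CreateOptions instead of owned_by_backup.
    fn open_example_log() -> Result<FileLogger, Error> {
        let backup_user = pbs_config::backup_user()?;
        let file_opts = CreateOptions::new()
            .owner(backup_user.uid)
            .group(backup_user.gid);

        let options = FileLogOptions {
            append: true,
            prefix_time: true,
            file_opts,
            ..Default::default()
        };
        FileLogger::new("/run/proxmox-backup/example.log", options) // example path
    }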
index 64e592b20dec615d82c1e925b49c8fd7fca662f2..f8b363f549958af0cf4160c3ae47479550b1cebf 100644 (file)
@@ -31,9 +31,6 @@ pub mod ticket;
 pub mod parallel_handler;
 pub use parallel_handler::ParallelHandler;
 
-mod file_logger;
-pub use file_logger::{FileLogger, FileLogOptions};
-
 /// Shortcut for md5 sums.
 pub fn md5sum(data: &[u8]) -> Result<DigestBytes, Error> {
     hash(MessageDigest::md5(), data).map_err(Error::from)
@@ -123,27 +120,6 @@ pub fn fd_change_cloexec(fd: RawFd, on: bool) -> Result<(), Error> {
     Ok(())
 }
 
-static mut SHUTDOWN_REQUESTED: bool = false;
-
-pub fn request_shutdown() {
-    unsafe {
-        SHUTDOWN_REQUESTED = true;
-    }
-    crate::server::server_shutdown();
-}
-
-#[inline(always)]
-pub fn shutdown_requested() -> bool {
-    unsafe { SHUTDOWN_REQUESTED }
-}
-
-pub fn fail_on_shutdown() -> Result<(), Error> {
-    if shutdown_requested() {
-        bail!("Server shutdown requested - aborting task");
-    }
-    Ok(())
-}
-
 /// safe wrapper for `nix::sys::socket::socketpair` defaulting to `O_CLOEXEC` and guarding the file
 /// descriptors.
 pub fn socketpair() -> Result<(Fd, Fd), Error> {
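
The shutdown helpers dropped here are called through proxmox_rest_server in the daemon and test changes of this commit; a minimal sketch of a long-running loop using them, assuming fail_on_shutdown moved along with request_shutdown and keeps the semantics of the removed code above.

    use anyhow::Error;

    // Sketch only: assumes proxmox_rest_server re-exports fail_on_shutdown unchanged.
    async fn process_items(items: Vec<String>) -> Result<(), Error> {
        for item in items {
            proxmox_rest_server::fail_on_shutdown()?; // bail out once a shutdown was requested
            log::info!("processing {}", item);
        }
        Ok(())
    }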
index 736ae65951e44af4ff57700953f5ca3e7c86b95f..7271ea558a23fa7e9039d1aa093e2fb1c57325fb 100644 (file)
@@ -1,6 +1,5 @@
 use anyhow::{bail, Error};
 
-#[macro_use]
 extern crate proxmox_backup;
 
 extern crate tokio;
@@ -10,8 +9,8 @@ use proxmox::try_block;
 
 use pbs_api_types::{Authid, UPID};
 
+use proxmox_rest_server::{flog, CommandoSocket};
 use proxmox_backup::server;
-use proxmox_backup::tools;
 
 fn garbage_collection(worker: &server::WorkerTask) -> Result<(), Error> {
 
@@ -45,11 +44,11 @@ fn worker_task_abort() -> Result<(), Error> {
     let rt = tokio::runtime::Runtime::new().unwrap();
     rt.block_on(async move {
 
-        let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock());
+        let mut commando_sock = CommandoSocket::new(server::our_ctrl_sock(), nix::unistd::Gid::current());
 
         let init_result: Result<(), Error> = try_block!({
             server::register_task_control_commands(&mut commando_sock)?;
-            server::server_state_init()?;
+            proxmox_rest_server::server_state_init()?;
             Ok(())
         });
 
@@ -73,7 +72,7 @@ fn worker_task_abort() -> Result<(), Error> {
                 println!("WORKER {}", worker);
 
                 let result = garbage_collection(&worker);
-                tools::request_shutdown();
+                proxmox_rest_server::request_shutdown();
 
                 if let Err(err) = result {
                     println!("got expected error: {}", err);