pathpatterns = "0.1.2"
pxar = { version = "0.10.1", features = [ "tokio-io" ] }
-proxmox = { version = "0.13.0", features = [ "sortable-macro", "api-macro", "cli", "router", "tfa" ] }
+proxmox = { version = "0.13.3", features = [ "sortable-macro", "api-macro", "cli", "router", "tfa" ] }
proxmox-acme-rs = "0.2.1"
proxmox-apt = "0.7.0"
proxmox-http = { version = "0.4.0", features = [ "client", "http-helpers", "websocket" ] }
regex = "1.2"
serde = { version = "1.0", features = ["derive"] }
-proxmox = { version = "0.13.0", default-features = false, features = [ "api-macro" ] }
+proxmox = { version = "0.13.3", default-features = false, features = [ "api-macro" ] }
pbs-systemd = { path = "../pbs-systemd" }
pbs-tools = { path = "../pbs-tools" }
xdg = "2.2"
pathpatterns = "0.1.2"
-proxmox = { version = "0.13.0", default-features = false, features = [ "cli" ] }
+proxmox = { version = "0.13.3", default-features = false, features = [ "cli" ] }
proxmox-fuse = "0.1.1"
proxmox-http = { version = "0.4.0", features = [ "client", "http-helpers", "websocket" ] }
pxar = { version = "0.10.1", features = [ "tokio-io" ] }
regex = "1.2"
once_cell = "1.3.1"
-proxmox = { version = "0.13.0", default-features = false, features = [ "cli" ] }
+proxmox = { version = "0.13.3", default-features = false, features = [ "cli" ] }
pbs-api-types = { path = "../pbs-api-types" }
pbs-buildcfg = { path = "../pbs-buildcfg" }
pathpatterns = "0.1.2"
pxar = "0.10.1"
-proxmox = { version = "0.13.0", default-features = false, features = [ "api-macro" ] }
+proxmox = { version = "0.13.3", default-features = false, features = [ "api-macro" ] }
pbs-api-types = { path = "../pbs-api-types" }
pbs-tools = { path = "../pbs-tools" }
regex = "1.2"
tokio = { version = "1.6", features = [] }
-proxmox = "0.13.0"
+proxmox = "0.13.3"
proxmox-fuse = "0.1.1"
pbs-tools = { path = "../pbs-tools" }
lazy_static = "1.4"
nom = "5.1"
-proxmox = { version = "0.13.0", default-features = false }
+proxmox = { version = "0.13.3", default-features = false }
pbs-tools = { path = "../pbs-tools" }
regex = "1.2"
udev = ">= 0.3, <0.5"
-proxmox = { version = "0.13.0", default-features = false, features = [] }
+proxmox = { version = "0.13.3", default-features = false, features = [] }
pbs-api-types = { path = "../pbs-api-types" }
pbs-tools = { path = "../pbs-tools" }
walkdir = "2"
zstd = { version = "0.6", features = [ "bindgen" ] }
-proxmox = { version = "0.13.0", default-features = false, features = [ "tokio" ] }
+proxmox = { version = "0.13.3", default-features = false, features = [ "tokio" ] }
pbs-buildcfg = { path = "../pbs-buildcfg" }
pbs-runtime = { path = "../pbs-runtime" }
pathpatterns = "0.1.2"
pxar = { version = "0.10.1", features = [ "tokio-io" ] }
-proxmox = { version = "0.13.0", features = [ "sortable-macro", "api-macro", "cli", "router" ] }
+proxmox = { version = "0.13.3", features = [ "sortable-macro", "api-macro", "cli", "router" ] }
pbs-api-types = { path = "../pbs-api-types" }
pbs-buildcfg = { path = "../pbs-buildcfg" }
walkdir = "2"
serde_json = "1.0"
-proxmox = { version = "0.13.0", features = [ "api-macro", "cli" ] }
+proxmox = { version = "0.13.3", features = [ "api-macro", "cli" ] }
pbs-config = { path = "../pbs-config" }
pbs-client = { path = "../pbs-client" }
pxar = { version = "0.10.1", features = [ "tokio-io" ] }
-proxmox = { version = "0.13.0", features = [ "api-macro", "cli" ] }
+proxmox = { version = "0.13.3", features = [ "api-macro", "cli" ] }
pbs-api-types = { path = "../pbs-api-types" }
pbs-buildcfg = { path = "../pbs-buildcfg" }
[dependencies]
anyhow = "1.0"
+futures = "0.3"
+handlebars = "3.0"
+http = "0.2"
+hyper = { version = "0.14", features = [ "full" ] }
+lazy_static = "1.4"
+libc = "0.2"
+nix = "0.19.1"
+serde = { version = "1.0", features = [] }
+serde_json = "1.0"
+tokio = { version = "1.6", features = ["signal", "process"] }
+
+proxmox = { version = "0.13.3", features = [ "router"] }
+
+# fixme: remove this dependency (pbs_tools::broadcast_future)
+pbs-tools = { path = "../pbs-tools" }
--- /dev/null
+use std::collections::HashMap;
+use std::path::PathBuf;
+use std::time::SystemTime;
+use std::fs::metadata;
+use std::sync::{Arc, Mutex, RwLock};
+
+use anyhow::{bail, Error, format_err};
+use hyper::Method;
+use handlebars::Handlebars;
+use serde::Serialize;
+
+use proxmox::api::{ApiMethod, Router, RpcEnvironmentType};
+use proxmox::tools::fs::{create_path, CreateOptions};
+
+use crate::{ApiAuth, FileLogger, FileLogOptions, CommandoSocket};
+
+/// Configuration of a REST API server instance: base directory for file
+/// delivery, the request router, directory aliases, handlebars templates
+/// and an optional access log.
+pub struct ApiConfig {
+    // root directory used by find_alias() to resolve non-API paths
+    basedir: PathBuf,
+    // routing table mapping path components + HTTP method to ApiMethods
+    router: &'static Router,
+    // first-path-component -> directory overrides, see add_alias()
+    aliases: HashMap<String, PathBuf>,
+    // whether this instance runs privileged or unprivileged
+    env_type: RpcEnvironmentType,
+    // registered handlebars templates (re-registered when the file changes)
+    templates: RwLock<Handlebars<'static>>,
+    // template name -> (mtime at registration time, template file path)
+    template_files: RwLock<HashMap<String, (SystemTime, PathBuf)>>,
+    // request/access log, set up via enable_file_log()
+    request_log: Option<Arc<Mutex<FileLogger>>>,
+    /// Authentication handler consulted for incoming requests
+    pub api_auth: Arc<dyn ApiAuth + Send + Sync>,
+}
+
+impl ApiConfig {
+    /// Creates a new API configuration rooted at `basedir`, dispatching
+    /// requests through `router` and authenticating them via `api_auth`.
+    pub fn new<B: Into<PathBuf>>(
+        basedir: B,
+        router: &'static Router,
+        env_type: RpcEnvironmentType,
+        api_auth: Arc<dyn ApiAuth + Send + Sync>,
+    ) -> Result<Self, Error> {
+        Ok(Self {
+            basedir: basedir.into(),
+            router,
+            aliases: HashMap::new(),
+            env_type,
+            templates: RwLock::new(Handlebars::new()),
+            template_files: RwLock::new(HashMap::new()),
+            request_log: None,
+            api_auth,
+        })
+    }
+
+    /// Looks up the API method for the given path components and HTTP
+    /// method, filling `uri_param` with matched path parameters.
+    pub fn find_method(
+        &self,
+        components: &[&str],
+        method: Method,
+        uri_param: &mut HashMap<String, String>,
+    ) -> Option<&'static ApiMethod> {
+        self.router.find_method(components, method, uri_param)
+    }
+
+    /// Resolves `components` to a filesystem path below `basedir`,
+    /// redirecting the first component through a registered alias if any.
+    pub fn find_alias(&self, components: &[&str]) -> PathBuf {
+        let mut prefix = String::new();
+        let mut filename = self.basedir.clone();
+        let comp_len = components.len();
+        if comp_len >= 1 {
+            prefix.push_str(components[0]);
+            if let Some(subdir) = self.aliases.get(&prefix) {
+                filename.push(subdir);
+                components.iter().skip(1).for_each(|comp| filename.push(comp));
+            } else {
+                components.iter().for_each(|comp| filename.push(comp));
+            }
+        }
+        filename
+    }
+
+    /// Registers a directory alias: paths starting with `alias` are served
+    /// from `path` instead of the matching subdirectory of `basedir`.
+    pub fn add_alias<S, P>(&mut self, alias: S, path: P)
+    where
+        S: Into<String>,
+        P: Into<PathBuf>,
+    {
+        self.aliases.insert(alias.into(), path.into());
+    }
+
+    /// Returns whether this instance runs privileged or unprivileged.
+    pub fn env_type(&self) -> RpcEnvironmentType {
+        self.env_type
+    }
+
+    /// Registers the handlebars template file `path` under `name`,
+    /// remembering its mtime so it can be reloaded when it changes.
+    ///
+    /// Fails if a template of that name is already registered or the file
+    /// cannot be read/parsed.
+    pub fn register_template<P>(&self, name: &str, path: P) -> Result<(), Error>
+    where
+        P: Into<PathBuf>,
+    {
+        if self.template_files.read().unwrap().contains_key(name) {
+            bail!("template already registered");
+        }
+
+        let path: PathBuf = path.into();
+        let metadata = metadata(&path)?;
+        let mtime = metadata.modified()?;
+
+        self.templates.write().unwrap().register_template_file(name, &path)?;
+        self.template_files.write().unwrap().insert(name.to_string(), (mtime, path));
+
+        Ok(())
+    }
+
+    /// Checks if the template was modified since the last rendering
+    /// if yes, it loads a the new version of the template
+    pub fn render_template<T>(&self, name: &str, data: &T) -> Result<String, Error>
+    where
+        T: Serialize,
+    {
+        let path;
+        let mtime;
+        {
+            // fast path: file unchanged, render with the read lock only
+            let template_files = self.template_files.read().unwrap();
+            let (old_mtime, old_path) = template_files.get(name).ok_or_else(|| format_err!("template not found"))?;
+
+            mtime = metadata(old_path)?.modified()?;
+            if mtime <= *old_mtime {
+                return self.templates.read().unwrap().render(name, data).map_err(|err| format_err!("{}", err));
+            }
+            path = old_path.to_path_buf();
+        }
+
+        {
+            // slow path: re-register the changed file, then render
+            let mut template_files = self.template_files.write().unwrap();
+            let mut templates = self.templates.write().unwrap();
+
+            templates.register_template_file(name, &path)?;
+            template_files.insert(name.to_string(), (mtime, path));
+
+            templates.render(name, data).map_err(|err| format_err!("{}", err))
+        }
+    }
+
+    /// Enables the request (access) log at `path`, creating missing parent
+    /// directories with `dir_opts`, and registers an
+    /// `api-access-log-reopen` command on `commando_sock` for log rotation.
+    pub fn enable_file_log<P>(
+        &mut self,
+        path: P,
+        dir_opts: Option<CreateOptions>,
+        file_opts: Option<CreateOptions>,
+        commando_sock: &mut CommandoSocket,
+    ) -> Result<(), Error>
+    where
+        P: Into<PathBuf>,
+    {
+        let path: PathBuf = path.into();
+        if let Some(base) = path.parent() {
+            if !base.exists() {
+                create_path(base, None, dir_opts).map_err(|err| format_err!("{}", err))?;
+            }
+        }
+
+        let logger_options = FileLogOptions {
+            append: true,
+            file_opts: file_opts.unwrap_or_default(),
+            ..Default::default()
+        };
+        let request_log = Arc::new(Mutex::new(FileLogger::new(&path, logger_options)?));
+        self.request_log = Some(Arc::clone(&request_log));
+
+        commando_sock.register_command("api-access-log-reopen".into(), move |_args| {
+            println!("re-opening log file");
+            request_log.lock().unwrap().reopen()?;
+            Ok(serde_json::Value::Null)
+        })?;
+
+        Ok(())
+    }
+
+    /// Returns the access log, if enabled via [`Self::enable_file_log`].
+    pub fn get_file_log(&self) -> Option<&Arc<Mutex<FileLogger>>> {
+        self.request_log.as_ref()
+    }
+}
--- /dev/null
+use anyhow::{bail, format_err, Error};
+
+use std::collections::HashMap;
+use std::os::unix::io::AsRawFd;
+use std::path::{PathBuf, Path};
+use std::sync::Arc;
+
+use futures::*;
+use tokio::net::UnixListener;
+use serde::Serialize;
+use serde_json::Value;
+use nix::sys::socket;
+use nix::unistd::Gid;
+
+/// Listens on a Unix socket to handle simple commands asynchronously.
+///
+/// Connections are only accepted from root, the process' own gid, or the
+/// given `gid` (checked via SO_PEERCRED). Each received line is parsed as
+/// JSON, handed to `func`, and answered with one `OK: ...` or `ERROR: ...`
+/// line. The returned future runs until the last worker finished.
+fn create_control_socket<P, F>(path: P, gid: Gid, func: F) -> Result<impl Future<Output = ()>, Error>
+where
+    P: Into<PathBuf>,
+    F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static,
+{
+    let path: PathBuf = path.into();
+
+    let gid = gid.as_raw();
+
+    let socket = UnixListener::bind(&path)?;
+
+    let func = Arc::new(func);
+
+    let control_future = async move {
+        loop {
+            let (conn, _addr) = match socket.accept().await {
+                Ok(data) => data,
+                Err(err) => {
+                    eprintln!("failed to accept on control socket {:?}: {}", path, err);
+                    continue;
+                }
+            };
+
+            let opt = socket::sockopt::PeerCredentials {};
+            let cred = match socket::getsockopt(conn.as_raw_fd(), opt) {
+                Ok(cred) => cred,
+                Err(err) => {
+                    eprintln!("no permissions - unable to read peer credential - {}", err);
+                    continue;
+                }
+            };
+
+            // check permissions (same gid, root user, or the configured gid)
+            let mygid = unsafe { libc::getgid() };
+            if !(cred.uid() == 0 || cred.gid() == mygid || cred.gid() == gid) {
+                eprintln!("no permissions for {:?}", cred);
+                continue;
+            }
+
+            let (rx, mut tx) = tokio::io::split(conn);
+
+            let abort_future = super::last_worker_future().map(|_| ());
+
+            use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
+            let func = Arc::clone(&func);
+            let path = path.clone();
+            tokio::spawn(futures::future::select(
+                async move {
+                    let mut rx = tokio::io::BufReader::new(rx);
+                    let mut line = String::new();
+                    loop {
+                        // reuse the line buffer across commands
+                        line.clear();
+                        match rx.read_line(&mut line).await {
+                            Ok(0) => break, // EOF - peer closed the connection
+                            Ok(_) => (),
+                            Err(err) => {
+                                eprintln!("control socket {:?} read error: {}", path, err);
+                                return;
+                            }
+                        }
+
+                        let response = match line.parse::<Value>() {
+                            Ok(param) => match func(param) {
+                                Ok(res) => format!("OK: {}\n", res),
+                                Err(err) => format!("ERROR: {}\n", err),
+                            }
+                            Err(err) => format!("ERROR: {}\n", err),
+                        };
+
+                        if let Err(err) = tx.write_all(response.as_bytes()).await {
+                            eprintln!("control socket {:?} write response error: {}", path, err);
+                            return;
+                        }
+                    }
+                }.boxed(),
+                abort_future,
+            ).map(|_| ()));
+        }
+    }.boxed();
+
+    // stop accepting new connections once the last worker finished
+    let abort_future = crate::last_worker_future().map_err(|_| {});
+    let task = futures::future::select(
+        control_future,
+        abort_future,
+    ).map(|_: futures::future::Either<(Result<(), Error>, _), _>| ());
+
+    Ok(task)
+}
+
+
+/// Serializes `params` as JSON and submits it as a single command line to
+/// the control socket at `path`, returning the parsed response value.
+pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
+where
+    P: AsRef<Path>,
+    T: ?Sized + Serialize,
+{
+    let mut line = serde_json::to_string(params)?;
+    line.push('\n');
+    send_raw_command(path.as_ref(), &line).await
+}
+
+/// Sends one raw command line to the control socket at `path` and parses
+/// the single-line reply: `OK: <json>` yields the JSON value, `ERROR: <msg>`
+/// becomes an error, anything else is rejected.
+pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
+where
+    P: AsRef<Path>,
+{
+    use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
+
+    let mut conn = tokio::net::UnixStream::connect(path)
+        .map_err(move |err| format_err!("control socket connect failed - {}", err))
+        .await?;
+
+    conn.write_all(command_string.as_bytes()).await?;
+    // the protocol is line based - make sure the command is terminated
+    if !command_string.as_bytes().ends_with(b"\n") {
+        conn.write_all(b"\n").await?;
+    }
+
+    // half-close the write side so the peer sees the end of the command
+    AsyncWriteExt::shutdown(&mut conn).await?;
+    let mut rx = tokio::io::BufReader::new(conn);
+    let mut data = String::new();
+    if rx.read_line(&mut data).await? == 0 {
+        bail!("no response");
+    }
+    if let Some(res) = data.strip_prefix("OK: ") {
+        match res.parse::<Value>() {
+            Ok(v) => Ok(v),
+            Err(err) => bail!("unable to parse json response - {}", err),
+        }
+    } else if let Some(err) = data.strip_prefix("ERROR: ") {
+        bail!("{}", err);
+    } else {
+        bail!("unable to parse response: {}", data);
+    }
+}
+
+/// A callback for a specific commando socket.
+pub type CommandoSocketFn = Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
+
+/// Tooling to get a single control command socket where one can register multiple commands
+/// dynamically.
+/// You need to call `spawn()` to make the socket active.
+pub struct CommandoSocket {
+    // path the Unix socket gets bound to on spawn()
+    socket: PathBuf,
+    // group that (besides root and our own gid) may connect
+    gid: Gid,
+    // registered command name -> handler callback
+    commands: HashMap<String, CommandoSocketFn>,
+}
+
+impl CommandoSocket {
+    /// Creates a new, inactive command socket for `path`; peers must be
+    /// root, share the process' gid, or be in `gid` to connect.
+    pub fn new<P>(path: P, gid: Gid) -> Self
+    where
+        P: Into<PathBuf>,
+    {
+        CommandoSocket {
+            socket: path.into(),
+            gid,
+            commands: HashMap::new(),
+        }
+    }
+
+    /// Spawn the socket and consume self, meaning you cannot register commands anymore after
+    /// calling this.
+    pub fn spawn(self) -> Result<(), Error> {
+        let control_future = create_control_socket(self.socket.to_owned(), self.gid, move |param| {
+            let param = param
+                .as_object()
+                .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
+
+            let command = match param.get("command") {
+                Some(Value::String(command)) => command.as_str(),
+                None => bail!("no command"),
+                _ => bail!("unable to parse command"),
+            };
+
+            // single lookup - the previous contains_key() pre-check duplicated
+            // both the hash lookup and the error message
+            match self.commands.get(command) {
+                None => bail!("got unknown command '{}'", command),
+                Some(handler) => {
+                    let args = param.get("args"); //.unwrap_or(&Value::Null);
+                    (handler)(args)
+                },
+            }
+        })?;
+
+        tokio::spawn(control_future);
+
+        Ok(())
+    }
+
+    /// Register a new command with a callback.
+    ///
+    /// Fails if a command of the same name was already registered.
+    pub fn register_command<F>(
+        &mut self,
+        command: String,
+        handler: F,
+    ) -> Result<(), Error>
+    where
+        F: Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static,
+    {
+        if self.commands.contains_key(&command) {
+            bail!("command '{}' already exists!", command);
+        }
+
+        self.commands.insert(command, Box::new(handler));
+
+        Ok(())
+    }
+}
--- /dev/null
+use std::io::Write;
+
+use anyhow::Error;
+use nix::fcntl::OFlag;
+
+use proxmox::tools::fs::{CreateOptions, atomic_open_or_create_file};
+
+/// Log messages with optional automatically added timestamps into files
+///
+/// Logs messages to file, and optionally to standard output.
+///
+///
+/// #### Example:
+/// ```
+/// # use anyhow::{bail, format_err, Error};
+/// use proxmox_rest_server::{flog, FileLogger, FileLogOptions};
+///
+/// # std::fs::remove_file("test.log");
+/// let options = FileLogOptions {
+/// to_stdout: true,
+/// exclusive: true,
+/// ..Default::default()
+/// };
+/// let mut log = FileLogger::new("test.log", options).unwrap();
+/// flog!(log, "A simple log: {}", "Hello!");
+/// # std::fs::remove_file("test.log");
+/// ```
+
+#[derive(Default)]
+/// Options to control the behavior of a ['FileLogger'] instance
+pub struct FileLogOptions {
+    /// Open underlying log file in append mode, useful when multiple concurrent processes
+    /// want to log to the same file (e.g., HTTP access log). Note that it is only atomic
+    /// for writes smaller than the PIPE_BUF (4k on Linux).
+    /// Inside the same process you may need to still use a mutex, for shared access.
+    pub append: bool,
+    /// Open underlying log file as readable
+    pub read: bool,
+    /// If set, ensure that the file is newly created or error out if already existing.
+    pub exclusive: bool,
+    /// Duplicate logged messages to STDOUT, like tee
+    pub to_stdout: bool,
+    /// Prefix messages logged to the file with the current local time as RFC 3339
+    pub prefix_time: bool,
+    /// File owner/group and mode
+    pub file_opts: CreateOptions,
+
+}
+
+/// A file-backed logger, optionally mirroring messages to stdout.
+pub struct FileLogger {
+    // open handle to the log file
+    file: std::fs::File,
+    // remembered so reopen() can re-create the handle (e.g. after rotation)
+    file_name: std::path::PathBuf,
+    // flags controlling (re-)opening and message formatting
+    options: FileLogOptions,
+}
+
+/// Log messages to [`FileLogger`](tools/struct.FileLogger.html)
+#[macro_export]
+macro_rules! flog {
+    ($log:expr, $($arg:tt)*) => ({
+        // format the arguments and forward the resulting String to log()
+        $log.log(format!($($arg)*));
+    })
+}
+
+impl FileLogger {
+    /// Opens (or creates) `file_name` according to `options` and returns
+    /// the ready-to-use logger.
+    pub fn new<P: AsRef<std::path::Path>>(
+        file_name: P,
+        options: FileLogOptions,
+    ) -> Result<Self, Error> {
+        let file = Self::open(&file_name, &options)?;
+
+        let file_name: std::path::PathBuf = file_name.as_ref().to_path_buf();
+
+        Ok(Self { file, file_name, options })
+    }
+
+    /// Re-opens the underlying log file, e.g. after log rotation.
+    pub fn reopen(&mut self) -> Result<&Self, Error> {
+        let file = Self::open(&self.file_name, &self.options)?;
+        self.file = file;
+        Ok(self)
+    }
+
+    /// Opens the log file with open(2) flags derived from `options`.
+    fn open<P: AsRef<std::path::Path>>(
+        file_name: P,
+        options: &FileLogOptions,
+    ) -> Result<std::fs::File, Error> {
+        let mut flags = OFlag::O_CLOEXEC;
+
+        if options.read {
+            flags |= OFlag::O_RDWR;
+        } else {
+            flags |= OFlag::O_WRONLY;
+        }
+
+        if options.append {
+            flags |= OFlag::O_APPEND;
+        }
+        if options.exclusive {
+            flags |= OFlag::O_EXCL;
+        }
+
+        let file = atomic_open_or_create_file(&file_name, flags, &[], options.file_opts.clone())?;
+
+        Ok(file)
+    }
+
+    /// Logs `msg` (plus a trailing newline and an optional timestamp
+    /// prefix) to the file, duplicating it to stdout if configured.
+    ///
+    /// Never panics: write errors (on stdout and on the file) are reported
+    /// to stderr instead, since a logging call must not take down the server.
+    pub fn log<S: AsRef<str>>(&mut self, msg: S) {
+        let msg = msg.as_ref();
+
+        if self.options.to_stdout {
+            let mut stdout = std::io::stdout();
+            // best effort - a log method should not panic on a broken stdout
+            let _ = stdout.write_all(msg.as_bytes());
+            let _ = stdout.write_all(b"\n");
+        }
+
+        let line = if self.options.prefix_time {
+            let now = proxmox::tools::time::epoch_i64();
+            let rfc3339 = match proxmox::tools::time::epoch_to_rfc3339(now) {
+                Ok(rfc3339) => rfc3339,
+                Err(_) => "1970-01-01T00:00:00Z".into(), // for safety, should really not happen!
+            };
+            format!("{}: {}\n", rfc3339, msg)
+        } else {
+            format!("{}\n", msg)
+        };
+        if let Err(err) = self.file.write_all(line.as_bytes()) {
+            // avoid panicking, log methods should not do that
+            // FIXME: or, return result???
+            eprintln!("error writing to log file - {}", err);
+        }
+    }
+}
+
+// Raw write pass-through: unlike log(), this adds no newline and no
+// timestamp prefix. Errors on the stdout mirror are deliberately ignored;
+// only the file write result is reported to the caller.
+impl std::io::Write for FileLogger {
+    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
+        if self.options.to_stdout {
+            let _ = std::io::stdout().write(buf);
+        }
+        self.file.write(buf)
+    }
+
+    fn flush(&mut self) -> Result<(), std::io::Error> {
+        if self.options.to_stdout {
+            let _ = std::io::stdout().flush();
+        }
+        self.file.flush()
+    }
+}
use std::sync::atomic::{AtomicBool, Ordering};

use anyhow::{bail, Error};
+
+mod state;
+pub use state::*;
+
+mod command_socket;
+pub use command_socket::*;
+
+mod file_logger;
+pub use file_logger::{FileLogger, FileLogOptions};
+
+mod api_config;
+pub use api_config::ApiConfig;
+
/// Error returned by [`ApiAuth::check_auth`].
pub enum AuthError {
    /// Authentication data was present but could not be validated.
    Generic(Error),
    /// No authentication data was provided at all.
    NoData,
}
+
// Any plain error converts to a generic authentication failure, so `?`
// works inside check_auth() implementations.
impl From<Error> for AuthError {
    fn from(err: Error) -> Self {
        AuthError::Generic(err)
    }
}
+
/// Authentication hook for the REST server.
pub trait ApiAuth {
    /// Checks the request headers and HTTP method; returns the
    /// authenticated user id as a string on success.
    fn check_auth(
        &self,
        headers: &http::HeaderMap,
        method: &hyper::Method,
    ) -> Result<String, AuthError>;
}
+
+static mut SHUTDOWN_REQUESTED: bool = false;
+
+pub fn request_shutdown() {
+ unsafe {
+ SHUTDOWN_REQUESTED = true;
+ }
+ crate::server_shutdown();
+}
+
+#[inline(always)]
+pub fn shutdown_requested() -> bool {
+ unsafe { SHUTDOWN_REQUESTED }
+}
+
/// Returns an error if a server shutdown was requested; lets long-running
/// tasks abort cooperatively by sprinkling this into their loops.
pub fn fail_on_shutdown() -> Result<(), Error> {
    if shutdown_requested() {
        bail!("Server shutdown requested - aborting task");
    }
    Ok(())
}
+
--- /dev/null
+use anyhow::{Error};
+use lazy_static::lazy_static;
+use std::sync::Mutex;
+
+use futures::*;
+
+use tokio::signal::unix::{signal, SignalKind};
+
+use pbs_tools::broadcast_future::BroadcastData;
+
+/// Whether the server is running normally or shutting down.
+#[derive(PartialEq, Copy, Clone, Debug)]
+pub enum ServerMode {
+    Normal,
+    Shutdown,
+}
+
+/// Mutable global server state, held in the `SERVER_STATE` mutex.
+pub struct ServerState {
+    /// Current operating mode
+    pub mode: ServerMode,
+    /// Notified when a shutdown is requested (see server_shutdown)
+    pub shutdown_listeners: BroadcastData<()>,
+    /// Notified when the last worker/internal task finished during shutdown
+    pub last_worker_listeners: BroadcastData<()>,
+    /// Number of active workers (maintained via set_worker_count)
+    pub worker_count: usize,
+    /// Number of active internal tasks (see spawn_internal_task)
+    pub internal_task_count: usize,
+    /// Set when the shutdown was triggered by SIGHUP (reload) instead of SIGINT
+    pub reload_request: bool,
+}
+
+lazy_static! {
+    // the single global server state instance, protected by a mutex
+    static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState {
+        mode: ServerMode::Normal,
+        shutdown_listeners: BroadcastData::new(),
+        last_worker_listeners: BroadcastData::new(),
+        worker_count: 0,
+        internal_task_count: 0,
+        reload_request: false,
+    });
+}
+
+/// Installs a handler for `kind` that prints `msg`, records whether this is
+/// a reload, and triggers a server shutdown. The handler task itself ends
+/// once the last worker finished.
+fn spawn_shutdown_signal_task(
+    kind: SignalKind,
+    msg: &'static str,
+    reload: bool,
+) -> Result<(), Error> {
+    let mut stream = signal(kind)?;
+
+    let future = async move {
+        while stream.recv().await.is_some() {
+            println!("{}", msg);
+            SERVER_STATE.lock().unwrap().reload_request = reload;
+            crate::request_shutdown();
+        }
+    }.boxed();
+
+    let abort_future = last_worker_future().map_err(|_| {});
+    let task = futures::future::select(future, abort_future);
+
+    tokio::spawn(task.map(|_| ()));
+
+    Ok(())
+}
+
+/// Installs the SIGINT (shutdown) and SIGHUP (reload) signal handlers.
+/// The two handlers only differ in the flagged reload state, so the common
+/// setup lives in `spawn_shutdown_signal_task`.
+pub fn server_state_init() -> Result<(), Error> {
+    spawn_shutdown_signal_task(
+        SignalKind::interrupt(),
+        "got shutdown request (SIGINT)",
+        false,
+    )?;
+
+    spawn_shutdown_signal_task(
+        SignalKind::hangup(),
+        "got reload request (SIGHUP)",
+        true,
+    )?;
+
+    Ok(())
+}
+
+/// Returns true when the server is shutting down because of a reload
+/// request (SIGHUP) rather than a plain shutdown.
+pub fn is_reload_request() -> bool {
+    let state = SERVER_STATE.lock().unwrap();
+    matches!(state.mode, ServerMode::Shutdown) && state.reload_request
+}
+
+/// Switches the global state to shutdown mode and wakes all shutdown
+/// listeners; if no workers or internal tasks are left, this also fires
+/// the last-worker notification right away (via check_last_worker).
+pub fn server_shutdown() {
+    let mut data = SERVER_STATE.lock().unwrap();
+
+    println!("SET SHUTDOWN MODE");
+
+    data.mode = ServerMode::Shutdown;
+
+    data.shutdown_listeners.notify_listeners(Ok(()));
+
+    drop(data); // unlock before check_last_worker(), which locks again
+
+    check_last_worker();
+}
+
+/// Returns a future that completes once a server shutdown was requested.
+pub fn shutdown_future() -> impl Future<Output = ()> {
+    SERVER_STATE
+        .lock()
+        .unwrap()
+        .shutdown_listeners
+        .listen()
+        .map(|_| ())
+}
+
+/// Returns a future that completes once the last worker and internal task
+/// have finished after a shutdown was requested.
+pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> {
+    SERVER_STATE
+        .lock()
+        .unwrap()
+        .last_worker_listeners
+        .listen()
+}
+
+/// Updates the number of active workers and re-checks whether the last one
+/// just finished (to notify last-worker listeners during shutdown).
+pub fn set_worker_count(count: usize) {
+    SERVER_STATE.lock().unwrap().worker_count = count;
+
+    check_last_worker();
+}
+
+/// Notifies the last-worker listeners if the server is in shutdown mode
+/// and no worker or internal task remains active.
+pub fn check_last_worker() {
+    let mut data = SERVER_STATE.lock().unwrap();
+
+    let all_done = data.mode == ServerMode::Shutdown
+        && data.worker_count == 0
+        && data.internal_task_count == 0;
+
+    if all_done {
+        data.last_worker_listeners.notify_listeners(Ok(()));
+    }
+}
+
+/// Spawns a tokio task that will be tracked for reload
+/// and if it is finished, notify the last_worker_listener if we
+/// are in shutdown mode
+pub fn spawn_internal_task<T>(task: T)
+where
+    T: Future + Send + 'static,
+    T::Output: Send + 'static,
+{
+    let mut data = SERVER_STATE.lock().unwrap();
+    data.internal_task_count += 1;
+
+    tokio::spawn(async move {
+        let _ = tokio::spawn(task).await; // ignore errors
+
+        { // drop mutex before check_last_worker(), which locks again
+            let mut data = SERVER_STATE.lock().unwrap();
+            // guarded decrement, so accounting bugs cannot underflow
+            if data.internal_task_count > 0 {
+                data.internal_task_count -= 1;
+            }
+        }
+
+        check_last_worker();
+    });
+}
tokio = { version = "1.6", features = [ "rt", "rt-multi-thread" ] }
pathpatterns = "0.1.2"
-proxmox = { version = "0.13.0", default-features = false, features = [] }
+proxmox = { version = "0.13.3", default-features = false, features = [] }
pxar = { version = "0.10.1", features = [ "tokio-io" ] }
pbs-client = { path = "../pbs-client" }
EntryKind::Directory => {
let (sender, receiver) = tokio::sync::mpsc::channel(100);
let channelwriter = AsyncChannelWriter::new(sender, 1024 * 1024);
- crate::server::spawn_internal_task(
+ proxmox_rest_server::spawn_internal_task(
create_zip(channelwriter, decoder, path.clone(), false)
);
Body::wrap_stream(ReceiverStream::new(receiver).map_err(move |err| {
let (ws, response) = WebSocket::new(parts.headers.clone())?;
- crate::server::spawn_internal_task(async move {
+ proxmox_rest_server::spawn_internal_task(async move {
let conn: Upgraded = match hyper::upgrade::on(Request::from_parts(parts, req_body))
.map_err(Error::from)
.await
{
- Ok(upgraded) => upgraded,
+ Ok(upgraded) => upgraded,
_ => bail!("error"),
};
use pbs_tools::fs::{lock_dir_noblock, DirLockGuard};
use pbs_tools::process_locker::ProcessLockSharedGuard;
use pbs_config::{open_backup_lockfile, BackupLockGuard};
-
-use crate::tools::fail_on_shutdown;
+use proxmox_rest_server::fail_on_shutdown;
lazy_static! {
static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
let check_abort = |pos: usize| -> Result<(), Error> {
if pos & 1023 == 0 {
verify_worker.worker.check_abort()?;
- crate::tools::fail_on_shutdown()?;
+ proxmox_rest_server::fail_on_shutdown()?;
}
Ok(())
};
for (pos, _) in chunk_list {
verify_worker.worker.check_abort()?;
- crate::tools::fail_on_shutdown()?;
+ proxmox_rest_server::fail_on_shutdown()?;
let info = index.chunk_info(pos).unwrap();
});
verify_worker.worker.check_abort()?;
- crate::tools::fail_on_shutdown()?;
+ proxmox_rest_server::fail_on_shutdown()?;
if let Err(err) = result {
task_log!(
use proxmox::try_block;
use proxmox::api::RpcEnvironmentType;
+use proxmox::tools::fs::CreateOptions;
use pbs_tools::auth::private_auth_key;
+use proxmox_rest_server::ApiConfig;
use proxmox_backup::server::{
self,
}
let _ = csrf_secret(); // load with lazy_static
- let mut config = server::ApiConfig::new(
+ let mut config = ApiConfig::new(
pbs_buildcfg::JS_DIR,
&proxmox_backup::api2::ROUTER,
RpcEnvironmentType::PRIVILEGED,
default_api_auth(),
)?;
- let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock());
+ let backup_user = pbs_config::backup_user()?;
+ let mut commando_sock = proxmox_rest_server::CommandoSocket::new(crate::server::our_ctrl_sock(), backup_user.gid);
- config.enable_file_log(pbs_buildcfg::API_ACCESS_LOG_FN, &mut commando_sock)?;
+ let dir_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
+ let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
+
+ config.enable_file_log(
+ pbs_buildcfg::API_ACCESS_LOG_FN,
+ Some(dir_opts),
+ Some(file_opts),
+ &mut commando_sock,
+ )?;
let rest_server = RestServer::new(config);
Ok(ready
.and_then(|_| hyper::Server::builder(incoming)
.serve(rest_server)
- .with_graceful_shutdown(server::shutdown_future())
+ .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
.map_err(Error::from)
)
.map(|e| {
let init_result: Result<(), Error> = try_block!({
server::register_task_control_commands(&mut commando_sock)?;
commando_sock.spawn()?;
- server::server_state_init()?;
+ proxmox_rest_server::server_state_init()?;
Ok(())
});
server.await?;
log::info!("server shutting down, waiting for active workers to complete");
- proxmox_backup::server::last_worker_future().await?;
+ proxmox_rest_server::last_worker_future().await?;
log::info!("done - exit server");
use proxmox::try_block;
use proxmox::api::RpcEnvironmentType;
use proxmox::sys::linux::socket::set_tcp_keepalive;
+use proxmox::tools::fs::CreateOptions;
+
+use proxmox_rest_server::ApiConfig;
use proxmox_backup::{
backup::DataStore,
server::{
auth::default_api_auth,
WorkerTask,
- ApiConfig,
rest::*,
jobstate::{
self,
config.register_template("index", &indexpath)?;
config.register_template("console", "/usr/share/pve-xtermjs/index.html.hbs")?;
- let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock());
+ let backup_user = pbs_config::backup_user()?;
+ let mut commando_sock = proxmox_rest_server::CommandoSocket::new(crate::server::our_ctrl_sock(), backup_user.gid);
+
+ let dir_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
+ let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
- config.enable_file_log(pbs_buildcfg::API_ACCESS_LOG_FN, &mut commando_sock)?;
+ config.enable_file_log(
+ pbs_buildcfg::API_ACCESS_LOG_FN,
+ Some(dir_opts),
+ Some(file_opts),
+ &mut commando_sock,
+ )?;
let rest_server = RestServer::new(config);
Ok(ready
.and_then(|_| hyper::Server::builder(connections)
.serve(rest_server)
- .with_graceful_shutdown(server::shutdown_future())
+ .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
.map_err(Error::from)
)
.map_err(|err| eprintln!("server error: {}", err))
let init_result: Result<(), Error> = try_block!({
server::register_task_control_commands(&mut commando_sock)?;
commando_sock.spawn()?;
- server::server_state_init()?;
+ proxmox_rest_server::server_state_init()?;
Ok(())
});
server.await?;
log::info!("server shutting down, waiting for active workers to complete");
- proxmox_backup::server::last_worker_future().await?;
+ proxmox_rest_server::last_worker_future().await?;
log::info!("done - exit server");
Ok(())
}
fn start_stat_generator() {
- let abort_future = server::shutdown_future();
+ let abort_future = proxmox_rest_server::shutdown_future();
let future = Box::pin(run_stat_generator());
let task = futures::future::select(future, abort_future);
tokio::spawn(task.map(|_| ()));
}
fn start_task_scheduler() {
- let abort_future = server::shutdown_future();
+ let abort_future = proxmox_rest_server::shutdown_future();
let future = Box::pin(run_task_scheduler());
let task = futures::future::select(future, abort_future);
tokio::spawn(task.map(|_| ()));
async fn command_reopen_logfiles() -> Result<(), Error> {
// only care about the most recent daemon instance for each, proxy & api, as other older ones
// should not respond to new requests anyway, but only finish their current one and then exit.
- let sock = server::our_ctrl_sock();
- let f1 = server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
+ let sock = crate::server::our_ctrl_sock();
+ let f1 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
- let pid = server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
- let sock = server::ctrl_sock_from_pid(pid);
- let f2 = server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
+ let pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
+ let sock = crate::server::ctrl_sock_from_pid(pid);
+ let f2 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
match futures::join!(f1, f2) {
(Err(e1), Err(e2)) => Err(format_err!("reopen commands failed, proxy: {}; api: {}", e1, e2)),
use tokio_stream::wrappers::ReceiverStream;
use proxmox::api::RpcEnvironmentType;
-use proxmox_backup::server::{rest::*, ApiConfig};
use pbs_client::DEFAULT_VSOCK_PORT;
+use proxmox_rest_server::ApiConfig;
+
+use proxmox_backup::server::rest::*;
mod proxmox_restore_daemon;
use proxmox_restore_daemon::*;
use anyhow::{bail, format_err, Error};
-use pbs_api_types::Authid;
-
-use pbs_config::CachedUserInfo;
-use proxmox_backup::server::auth::{ApiAuth, AuthError};
+use proxmox_rest_server::{ApiAuth, AuthError};
const TICKET_FILE: &str = "/ticket";
&self,
headers: &http::HeaderMap,
_method: &hyper::Method,
- _user_info: &CachedUserInfo,
- ) -> Result<Authid, AuthError> {
+ ) -> Result<String, AuthError> {
match headers.get(hyper::header::AUTHORIZATION) {
Some(header) if header.to_str().unwrap_or("") == &self.ticket => {
- Ok(Authid::root_auth_id().to_owned())
+ Ok(String::from("root@pam"))
}
_ => {
return Err(AuthError::Generic(format_err!(
//! Provides authentication primitives for the HTTP server
-use anyhow::{format_err, Error};
+use anyhow::format_err;
use std::sync::Arc;
use pbs_tools::ticket::{self, Ticket};
use pbs_config::{token_shadow, CachedUserInfo};
use pbs_api_types::{Authid, Userid};
+use proxmox_rest_server::{ApiAuth, AuthError};
use crate::auth_helpers::*;
use crate::tools;
use hyper::header;
use percent_encoding::percent_decode_str;
-pub enum AuthError {
- Generic(Error),
- NoData,
-}
-
-impl From<Error> for AuthError {
- fn from(err: Error) -> Self {
- AuthError::Generic(err)
- }
-}
-
-pub trait ApiAuth {
- fn check_auth(
- &self,
- headers: &http::HeaderMap,
- method: &hyper::Method,
- user_info: &CachedUserInfo,
- ) -> Result<Authid, AuthError>;
-}
-
struct UserAuthData {
ticket: String,
csrf_token: Option<String>,
&self,
headers: &http::HeaderMap,
method: &hyper::Method,
- user_info: &CachedUserInfo,
- ) -> Result<Authid, AuthError> {
+ ) -> Result<String, AuthError> {
+
+ let user_info = CachedUserInfo::new()?;
+
let auth_data = Self::extract_auth_data(headers);
match auth_data {
Some(AuthData::User(user_auth_data)) => {
}
}
- Ok(auth_id)
+ Ok(auth_id.to_string())
}
Some(AuthData::ApiToken(api_token)) => {
let mut parts = api_token.splitn(2, ':');
token_shadow::verify_secret(&tokenid, &tokensecret)?;
- Ok(tokenid)
+ Ok(tokenid.to_string())
}
None => Err(AuthError::NoData),
}
+++ /dev/null
-use anyhow::{bail, format_err, Error};
-
-use std::collections::HashMap;
-use std::os::unix::io::AsRawFd;
-use std::path::{PathBuf, Path};
-use std::sync::Arc;
-
-use futures::*;
-use tokio::net::UnixListener;
-use serde::Serialize;
-use serde_json::Value;
-use nix::sys::socket;
-
-/// Listens on a Unix Socket to handle simple command asynchronously
-fn create_control_socket<P, F>(path: P, func: F) -> Result<impl Future<Output = ()>, Error>
-where
- P: Into<PathBuf>,
- F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static,
-{
- let path: PathBuf = path.into();
-
- let backup_user = pbs_config::backup_user()?;
- let backup_gid = backup_user.gid.as_raw();
-
- let socket = UnixListener::bind(&path)?;
-
- let func = Arc::new(func);
-
- let control_future = async move {
- loop {
- let (conn, _addr) = match socket.accept().await {
- Ok(data) => data,
- Err(err) => {
- eprintln!("failed to accept on control socket {:?}: {}", path, err);
- continue;
- }
- };
-
- let opt = socket::sockopt::PeerCredentials {};
- let cred = match socket::getsockopt(conn.as_raw_fd(), opt) {
- Ok(cred) => cred,
- Err(err) => {
- eprintln!("no permissions - unable to read peer credential - {}", err);
- continue;
- }
- };
-
- // check permissions (same gid, root user, or backup group)
- let mygid = unsafe { libc::getgid() };
- if !(cred.uid() == 0 || cred.gid() == mygid || cred.gid() == backup_gid) {
- eprintln!("no permissions for {:?}", cred);
- continue;
- }
-
- let (rx, mut tx) = tokio::io::split(conn);
-
- let abort_future = super::last_worker_future().map(|_| ());
-
- use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
- let func = Arc::clone(&func);
- let path = path.clone();
- tokio::spawn(futures::future::select(
- async move {
- let mut rx = tokio::io::BufReader::new(rx);
- let mut line = String::new();
- loop {
- line.clear();
- match rx.read_line({ line.clear(); &mut line }).await {
- Ok(0) => break,
- Ok(_) => (),
- Err(err) => {
- eprintln!("control socket {:?} read error: {}", path, err);
- return;
- }
- }
-
- let response = match line.parse::<Value>() {
- Ok(param) => match func(param) {
- Ok(res) => format!("OK: {}\n", res),
- Err(err) => format!("ERROR: {}\n", err),
- }
- Err(err) => format!("ERROR: {}\n", err),
- };
-
- if let Err(err) = tx.write_all(response.as_bytes()).await {
- eprintln!("control socket {:?} write response error: {}", path, err);
- return;
- }
- }
- }.boxed(),
- abort_future,
- ).map(|_| ()));
- }
- }.boxed();
-
- let abort_future = super::last_worker_future().map_err(|_| {});
- let task = futures::future::select(
- control_future,
- abort_future,
- ).map(|_: futures::future::Either<(Result<(), Error>, _), _>| ());
-
- Ok(task)
-}
-
-
-pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
-where
- P: AsRef<Path>,
- T: ?Sized + Serialize,
-{
- let mut command_string = serde_json::to_string(params)?;
- command_string.push('\n');
- send_raw_command(path.as_ref(), &command_string).await
-}
-
-pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
-where
- P: AsRef<Path>,
-{
- use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
-
- let mut conn = tokio::net::UnixStream::connect(path)
- .map_err(move |err| format_err!("control socket connect failed - {}", err))
- .await?;
-
- conn.write_all(command_string.as_bytes()).await?;
- if !command_string.as_bytes().ends_with(b"\n") {
- conn.write_all(b"\n").await?;
- }
-
- AsyncWriteExt::shutdown(&mut conn).await?;
- let mut rx = tokio::io::BufReader::new(conn);
- let mut data = String::new();
- if rx.read_line(&mut data).await? == 0 {
- bail!("no response");
- }
- if let Some(res) = data.strip_prefix("OK: ") {
- match res.parse::<Value>() {
- Ok(v) => Ok(v),
- Err(err) => bail!("unable to parse json response - {}", err),
- }
- } else if let Some(err) = data.strip_prefix("ERROR: ") {
- bail!("{}", err);
- } else {
- bail!("unable to parse response: {}", data);
- }
-}
-
-/// A callback for a specific commando socket.
-pub type CommandoSocketFn = Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
-
-/// Tooling to get a single control command socket where one can register multiple commands
-/// dynamically.
-/// You need to call `spawn()` to make the socket active.
-pub struct CommandoSocket {
- socket: PathBuf,
- commands: HashMap<String, CommandoSocketFn>,
-}
-
-impl CommandoSocket {
- pub fn new<P>(path: P) -> Self
- where P: Into<PathBuf>,
- {
- CommandoSocket {
- socket: path.into(),
- commands: HashMap::new(),
- }
- }
-
- /// Spawn the socket and consume self, meaning you cannot register commands anymore after
- /// calling this.
- pub fn spawn(self) -> Result<(), Error> {
- let control_future = create_control_socket(self.socket.to_owned(), move |param| {
- let param = param
- .as_object()
- .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
-
- let command = match param.get("command") {
- Some(Value::String(command)) => command.as_str(),
- None => bail!("no command"),
- _ => bail!("unable to parse command"),
- };
-
- if !self.commands.contains_key(command) {
- bail!("got unknown command '{}'", command);
- }
-
- match self.commands.get(command) {
- None => bail!("got unknown command '{}'", command),
- Some(handler) => {
- let args = param.get("args"); //.unwrap_or(&Value::Null);
- (handler)(args)
- },
- }
- })?;
-
- tokio::spawn(control_future);
-
- Ok(())
- }
-
- /// Register a new command with a callback.
- pub fn register_command<F>(
- &mut self,
- command: String,
- handler: F,
- ) -> Result<(), Error>
- where
- F: Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static,
- {
-
- if self.commands.contains_key(&command) {
- bail!("command '{}' already exists!", command);
- }
-
- self.commands.insert(command, Box::new(handler));
-
- Ok(())
- }
-}
+++ /dev/null
-use std::collections::HashMap;
-use std::path::PathBuf;
-use std::time::SystemTime;
-use std::fs::metadata;
-use std::sync::{Arc, Mutex, RwLock};
-
-use anyhow::{bail, Error, format_err};
-use hyper::Method;
-use handlebars::Handlebars;
-use serde::Serialize;
-
-use proxmox::api::{ApiMethod, Router, RpcEnvironmentType};
-use proxmox::tools::fs::{create_path, CreateOptions};
-
-use crate::tools::{FileLogger, FileLogOptions};
-use super::auth::ApiAuth;
-
-pub struct ApiConfig {
- basedir: PathBuf,
- router: &'static Router,
- aliases: HashMap<String, PathBuf>,
- env_type: RpcEnvironmentType,
- templates: RwLock<Handlebars<'static>>,
- template_files: RwLock<HashMap<String, (SystemTime, PathBuf)>>,
- request_log: Option<Arc<Mutex<FileLogger>>>,
- pub api_auth: Arc<dyn ApiAuth + Send + Sync>,
-}
-
-impl ApiConfig {
- pub fn new<B: Into<PathBuf>>(
- basedir: B,
- router: &'static Router,
- env_type: RpcEnvironmentType,
- api_auth: Arc<dyn ApiAuth + Send + Sync>,
- ) -> Result<Self, Error> {
- Ok(Self {
- basedir: basedir.into(),
- router,
- aliases: HashMap::new(),
- env_type,
- templates: RwLock::new(Handlebars::new()),
- template_files: RwLock::new(HashMap::new()),
- request_log: None,
- api_auth,
- })
- }
-
- pub fn find_method(
- &self,
- components: &[&str],
- method: Method,
- uri_param: &mut HashMap<String, String>,
- ) -> Option<&'static ApiMethod> {
-
- self.router.find_method(components, method, uri_param)
- }
-
- pub fn find_alias(&self, components: &[&str]) -> PathBuf {
-
- let mut prefix = String::new();
- let mut filename = self.basedir.clone();
- let comp_len = components.len();
- if comp_len >= 1 {
- prefix.push_str(components[0]);
- if let Some(subdir) = self.aliases.get(&prefix) {
- filename.push(subdir);
- components.iter().skip(1).for_each(|comp| filename.push(comp));
- } else {
- components.iter().for_each(|comp| filename.push(comp));
- }
- }
- filename
- }
-
- pub fn add_alias<S, P>(&mut self, alias: S, path: P)
- where S: Into<String>,
- P: Into<PathBuf>,
- {
- self.aliases.insert(alias.into(), path.into());
- }
-
- pub fn env_type(&self) -> RpcEnvironmentType {
- self.env_type
- }
-
- pub fn register_template<P>(&self, name: &str, path: P) -> Result<(), Error>
- where
- P: Into<PathBuf>
- {
- if self.template_files.read().unwrap().contains_key(name) {
- bail!("template already registered");
- }
-
- let path: PathBuf = path.into();
- let metadata = metadata(&path)?;
- let mtime = metadata.modified()?;
-
- self.templates.write().unwrap().register_template_file(name, &path)?;
- self.template_files.write().unwrap().insert(name.to_string(), (mtime, path));
-
- Ok(())
- }
-
- /// Checks if the template was modified since the last rendering
- /// if yes, it loads a the new version of the template
- pub fn render_template<T>(&self, name: &str, data: &T) -> Result<String, Error>
- where
- T: Serialize,
- {
- let path;
- let mtime;
- {
- let template_files = self.template_files.read().unwrap();
- let (old_mtime, old_path) = template_files.get(name).ok_or_else(|| format_err!("template not found"))?;
-
- mtime = metadata(old_path)?.modified()?;
- if mtime <= *old_mtime {
- return self.templates.read().unwrap().render(name, data).map_err(|err| format_err!("{}", err));
- }
- path = old_path.to_path_buf();
- }
-
- {
- let mut template_files = self.template_files.write().unwrap();
- let mut templates = self.templates.write().unwrap();
-
- templates.register_template_file(name, &path)?;
- template_files.insert(name.to_string(), (mtime, path));
-
- templates.render(name, data).map_err(|err| format_err!("{}", err))
- }
- }
-
- pub fn enable_file_log<P>(
- &mut self,
- path: P,
- commando_sock: &mut super::CommandoSocket,
- ) -> Result<(), Error>
- where
- P: Into<PathBuf>
- {
- let path: PathBuf = path.into();
- if let Some(base) = path.parent() {
- if !base.exists() {
- let backup_user = pbs_config::backup_user()?;
- let opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
- create_path(base, None, Some(opts)).map_err(|err| format_err!("{}", err))?;
- }
- }
-
- let logger_options = FileLogOptions {
- append: true,
- owned_by_backup: true,
- ..Default::default()
- };
- let request_log = Arc::new(Mutex::new(FileLogger::new(&path, logger_options)?));
- self.request_log = Some(Arc::clone(&request_log));
-
- commando_sock.register_command("api-access-log-reopen".into(), move |_args| {
- println!("re-opening log file");
- request_log.lock().unwrap().reopen()?;
- Ok(serde_json::Value::Null)
- })?;
-
- Ok(())
- }
-
- pub fn get_file_log(&self) -> Option<&Arc<Mutex<FileLogger>>> {
- self.request_log.as_ref()
- }
-}
mod upid;
pub use upid::*;
-mod state;
-pub use state::*;
-
-mod command_socket;
-pub use command_socket::*;
-
mod worker_task;
pub use worker_task::*;
mod h2service;
pub use h2service::*;
-pub mod config;
-pub use config::*;
-
pub mod formatter;
#[macro_use]
pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
let proxy_pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
let sock = crate::server::ctrl_sock_from_pid(proxy_pid);
- let _: Value = crate::server::send_raw_command(sock, "{\"command\":\"reload-certificate\"}\n")
+ let _: Value = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"reload-certificate\"}\n")
.await?;
Ok(())
}
pub(crate) async fn notify_datastore_removed() -> Result<(), Error> {
let proxy_pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
let sock = crate::server::ctrl_sock_from_pid(proxy_pid);
- let _: Value = crate::server::send_raw_command(sock, "{\"command\":\"datastore-removed\"}\n")
+ let _: Value = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"datastore-removed\"}\n")
.await?;
Ok(())
}
RpcEnvironmentType,
};
use proxmox::http_err;
+use proxmox::tools::fs::CreateOptions;
use pbs_tools::compression::{DeflateEncoder, Level};
use pbs_tools::stream::AsyncReaderStream;
use pbs_api_types::{Authid, Userid};
+use proxmox_rest_server::{ApiConfig, FileLogger, FileLogOptions, AuthError};
-use super::auth::AuthError;
use super::environment::RestEnvironment;
use super::formatter::*;
-use super::ApiConfig;
use crate::auth_helpers::*;
use pbs_config::CachedUserInfo;
use crate::tools;
use crate::tools::compression::CompressionMethod;
-use crate::tools::FileLogger;
extern "C" {
fn tzset();
}
}
pub fn auth_logger() -> Result<FileLogger, Error> {
- let logger_options = tools::FileLogOptions {
+ let backup_user = pbs_config::backup_user()?;
+
+ let file_opts = CreateOptions::new()
+ .owner(backup_user.uid)
+ .group(backup_user.gid);
+
+ let logger_options = FileLogOptions {
append: true,
prefix_time: true,
- owned_by_backup: true,
+ file_opts,
..Default::default()
};
FileLogger::new(pbs_buildcfg::API_AUTH_LOG_FN, logger_options)
rpcenv.set_client_ip(Some(*peer));
- let user_info = CachedUserInfo::new()?;
let auth = &api.api_auth;
let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
}
if auth_required {
- match auth.check_auth(&parts.headers, &method, &user_info) {
- Ok(authid) => rpcenv.set_auth_id(Some(authid.to_string())),
+ match auth.check_auth(&parts.headers, &method) {
+ Ok(authid) => rpcenv.set_auth_id(Some(authid)),
Err(auth_err) => {
let err = match auth_err {
AuthError::Generic(err) => err,
}
Some(api_method) => {
let auth_id = rpcenv.get_auth_id();
+ let user_info = CachedUserInfo::new()?;
+
if !check_api_permission(
api_method.access.permission,
auth_id.as_deref(),
if comp_len == 0 {
let language = extract_lang_header(&parts.headers);
- match auth.check_auth(&parts.headers, &method, &user_info) {
+ match auth.check_auth(&parts.headers, &method) {
Ok(auth_id) => {
+ let auth_id: Authid = auth_id.parse()?;
if !auth_id.is_token() {
let userid = auth_id.user();
let new_csrf_token = assemble_csrf_prevention_token(csrf_secret(), userid);
+++ /dev/null
-use anyhow::{Error};
-use lazy_static::lazy_static;
-use std::sync::Mutex;
-
-use futures::*;
-
-use tokio::signal::unix::{signal, SignalKind};
-
-use pbs_tools::broadcast_future::BroadcastData;
-
-#[derive(PartialEq, Copy, Clone, Debug)]
-pub enum ServerMode {
- Normal,
- Shutdown,
-}
-
-pub struct ServerState {
- pub mode: ServerMode,
- pub shutdown_listeners: BroadcastData<()>,
- pub last_worker_listeners: BroadcastData<()>,
- pub worker_count: usize,
- pub internal_task_count: usize,
- pub reload_request: bool,
-}
-
-lazy_static! {
- static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState {
- mode: ServerMode::Normal,
- shutdown_listeners: BroadcastData::new(),
- last_worker_listeners: BroadcastData::new(),
- worker_count: 0,
- internal_task_count: 0,
- reload_request: false,
- });
-}
-
-pub fn server_state_init() -> Result<(), Error> {
-
- let mut stream = signal(SignalKind::interrupt())?;
-
- let future = async move {
- while stream.recv().await.is_some() {
- println!("got shutdown request (SIGINT)");
- SERVER_STATE.lock().unwrap().reload_request = false;
- crate::tools::request_shutdown();
- }
- }.boxed();
-
- let abort_future = last_worker_future().map_err(|_| {});
- let task = futures::future::select(future, abort_future);
-
- tokio::spawn(task.map(|_| ()));
-
- let mut stream = signal(SignalKind::hangup())?;
-
- let future = async move {
- while stream.recv().await.is_some() {
- println!("got reload request (SIGHUP)");
- SERVER_STATE.lock().unwrap().reload_request = true;
- crate::tools::request_shutdown();
- }
- }.boxed();
-
- let abort_future = last_worker_future().map_err(|_| {});
- let task = futures::future::select(future, abort_future);
-
- tokio::spawn(task.map(|_| ()));
-
- Ok(())
-}
-
-pub fn is_reload_request() -> bool {
- let data = SERVER_STATE.lock().unwrap();
-
- data.mode == ServerMode::Shutdown && data.reload_request
-}
-
-pub fn server_shutdown() {
- let mut data = SERVER_STATE.lock().unwrap();
-
- println!("SET SHUTDOWN MODE");
-
- data.mode = ServerMode::Shutdown;
-
- data.shutdown_listeners.notify_listeners(Ok(()));
-
- drop(data); // unlock
-
- check_last_worker();
-}
-
-pub fn shutdown_future() -> impl Future<Output = ()> {
- let mut data = SERVER_STATE.lock().unwrap();
- data
- .shutdown_listeners
- .listen()
- .map(|_| ())
-}
-
-pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> {
- let mut data = SERVER_STATE.lock().unwrap();
- data.last_worker_listeners.listen()
-}
-
-pub fn set_worker_count(count: usize) {
- SERVER_STATE.lock().unwrap().worker_count = count;
-
- check_last_worker();
-}
-
-pub fn check_last_worker() {
- let mut data = SERVER_STATE.lock().unwrap();
-
- if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.internal_task_count == 0) { return; }
-
- data.last_worker_listeners.notify_listeners(Ok(()));
-}
-
-/// Spawns a tokio task that will be tracked for reload
-/// and if it is finished, notify the last_worker_listener if we
-/// are in shutdown mode
-pub fn spawn_internal_task<T>(task: T)
-where
- T: Future + Send + 'static,
- T::Output: Send + 'static,
-{
- let mut data = SERVER_STATE.lock().unwrap();
- data.internal_task_count += 1;
-
- tokio::spawn(async move {
- let _ = tokio::spawn(task).await; // ignore errors
-
- { // drop mutex
- let mut data = SERVER_STATE.lock().unwrap();
- if data.internal_task_count > 0 {
- data.internal_task_count -= 1;
- }
- }
-
- check_last_worker();
- });
-}
use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
use pbs_api_types::{Authid, TaskStateType, UPID};
use pbs_config::{open_backup_lockfile, BackupLockGuard};
+use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
use super::UPIDExt;
-use crate::server;
-use crate::tools::{FileLogger, FileLogOptions};
-
macro_rules! taskdir {
($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
}
/// checks if the task UPID refers to a worker from this process
fn is_local_worker(upid: &UPID) -> bool {
- upid.pid == server::pid() && upid.pstart == server::pstart()
+ upid.pid == crate::server::pid() && upid.pstart == crate::server::pstart()
}
/// Test if the task is still running
return Ok(false);
}
- let sock = server::ctrl_sock_from_pid(upid.pid);
+ let sock = crate::server::ctrl_sock_from_pid(upid.pid);
let cmd = json!({
"command": "worker-task-status",
"args": {
"upid": upid.to_string(),
},
});
- let status = super::send_command(sock, &cmd).await?;
+ let status = proxmox_rest_server::send_command(sock, &cmd).await?;
if let Some(active) = status.as_bool() {
Ok(active)
}
pub fn register_task_control_commands(
- commando_sock: &mut super::CommandoSocket,
+ commando_sock: &mut CommandoSocket,
) -> Result<(), Error> {
fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
let args = if let Some(args) = args { args } else { bail!("missing args") };
pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
- let sock = server::ctrl_sock_from_pid(upid.pid);
+ let sock = crate::server::ctrl_sock_from_pid(upid.pid);
let cmd = json!({
"command": "worker-task-abort",
"args": {
"upid": upid.to_string(),
},
});
- super::send_command(sock, &cmd).map_ok(|_| ()).await
+ proxmox_rest_server::send_command(sock, &cmd).map_ok(|_| ()).await
}
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
/// task/future. Each task can `log()` messages, which are stored
/// persistently to files. Task should poll the `abort_requested`
/// flag, and stop execution when requested.
-#[derive(Debug)]
pub struct WorkerTask {
upid: UPID,
data: Mutex<WorkerTaskData>,
}
}
-#[derive(Debug)]
struct WorkerTaskData {
logger: FileLogger,
progress: f64, // 0..1
{
let mut hash = WORKER_TASK_LIST.lock().unwrap();
hash.insert(task_id, worker.clone());
- super::set_worker_count(hash.len());
+ proxmox_rest_server::set_worker_count(hash.len());
}
update_active_workers(Some(&upid))?;
WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
let _ = update_active_workers(None);
- super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
+ proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
}
/// Log a message.
use proxmox::tools::io::{ReadExt, WriteExt};
-use crate::server;
use crate::tools::{fd_change_cloexec, self};
#[link(name = "systemd")]
).await?;
let server_future = create_service(listener, NotifyReady)?;
- let shutdown_future = server::shutdown_future();
+ let shutdown_future = proxmox_rest_server::shutdown_future();
let finish_future = match future::select(server_future, shutdown_future).await {
Either::Left((_, _)) => {
- crate::tools::request_shutdown(); // make sure we are in shutdown mode
+ proxmox_rest_server::request_shutdown(); // make sure we are in shutdown mode
None
}
Either::Right((_, server_future)) => Some(server_future),
let mut reloader = Some(reloader);
- if server::is_reload_request() {
+ if proxmox_rest_server::is_reload_request() {
log::info!("daemon reload...");
if let Err(e) = systemd_notify(SystemdNotify::Reloading) {
log::error!("failed to notify systemd about the state change: {}", e);
}
// FIXME: this is a hack, replace with sd_notify_barrier when available
- if server::is_reload_request() {
+ if proxmox_rest_server::is_reload_request() {
wait_service_is_not_state(service_name, "reloading").await?;
}
+++ /dev/null
-use anyhow::Error;
-use std::io::Write;
-
-/// Log messages with optional automatically added timestamps into files
-///
-/// Logs messages to file, and optionally to standard output.
-///
-///
-/// #### Example:
-/// ```
-/// # use anyhow::{bail, format_err, Error};
-/// use proxmox_backup::flog;
-/// use proxmox_backup::tools::{FileLogger, FileLogOptions};
-///
-/// # std::fs::remove_file("test.log");
-/// let options = FileLogOptions {
-/// to_stdout: true,
-/// exclusive: true,
-/// ..Default::default()
-/// };
-/// let mut log = FileLogger::new("test.log", options).unwrap();
-/// flog!(log, "A simple log: {}", "Hello!");
-/// # std::fs::remove_file("test.log");
-/// ```
-
-#[derive(Debug, Default)]
-/// Options to control the behavior of a ['FileLogger'] instance
-pub struct FileLogOptions {
- /// Open underlying log file in append mode, useful when multiple concurrent processes
- /// want to log to the same file (e.g., HTTP access log). Note that it is only atomic
- /// for writes smaller than the PIPE_BUF (4k on Linux).
- /// Inside the same process you may need to still use an mutex, for shared access.
- pub append: bool,
- /// Open underlying log file as readable
- pub read: bool,
- /// If set, ensure that the file is newly created or error out if already existing.
- pub exclusive: bool,
- /// Duplicate logged messages to STDOUT, like tee
- pub to_stdout: bool,
- /// Prefix messages logged to the file with the current local time as RFC 3339
- pub prefix_time: bool,
- /// if set, the file is tried to be chowned by the backup:backup user/group
- /// Note, this is not designed race free as anybody could set it to another user afterwards
- /// anyway. It must thus be used by all processes which doe not run as backup uid/gid.
- pub owned_by_backup: bool,
-}
-
-#[derive(Debug)]
-pub struct FileLogger {
- file: std::fs::File,
- file_name: std::path::PathBuf,
- options: FileLogOptions,
-}
-
-/// Log messages to [`FileLogger`](tools/struct.FileLogger.html)
-#[macro_export]
-macro_rules! flog {
- ($log:expr, $($arg:tt)*) => ({
- $log.log(format!($($arg)*));
- })
-}
-
-impl FileLogger {
- pub fn new<P: AsRef<std::path::Path>>(
- file_name: P,
- options: FileLogOptions,
- ) -> Result<Self, Error> {
- let file = Self::open(&file_name, &options)?;
-
- let file_name: std::path::PathBuf = file_name.as_ref().to_path_buf();
-
- Ok(Self { file, file_name, options })
- }
-
- pub fn reopen(&mut self) -> Result<&Self, Error> {
- let file = Self::open(&self.file_name, &self.options)?;
- self.file = file;
- Ok(self)
- }
-
- fn open<P: AsRef<std::path::Path>>(
- file_name: P,
- options: &FileLogOptions,
- ) -> Result<std::fs::File, Error> {
- let file = std::fs::OpenOptions::new()
- .read(options.read)
- .write(true)
- .append(options.append)
- .create_new(options.exclusive)
- .create(!options.exclusive)
- .open(&file_name)?;
-
- if options.owned_by_backup {
- let backup_user = pbs_config::backup_user()?;
- nix::unistd::chown(file_name.as_ref(), Some(backup_user.uid), Some(backup_user.gid))?;
- }
-
- Ok(file)
- }
-
- pub fn log<S: AsRef<str>>(&mut self, msg: S) {
- let msg = msg.as_ref();
-
- if self.options.to_stdout {
- let mut stdout = std::io::stdout();
- stdout.write_all(msg.as_bytes()).unwrap();
- stdout.write_all(b"\n").unwrap();
- }
-
- let line = if self.options.prefix_time {
- let now = proxmox::tools::time::epoch_i64();
- let rfc3339 = match proxmox::tools::time::epoch_to_rfc3339(now) {
- Ok(rfc3339) => rfc3339,
- Err(_) => "1970-01-01T00:00:00Z".into(), // for safety, should really not happen!
- };
- format!("{}: {}\n", rfc3339, msg)
- } else {
- format!("{}\n", msg)
- };
- if let Err(err) = self.file.write_all(line.as_bytes()) {
- // avoid panicking, log methods should not do that
- // FIXME: or, return result???
- eprintln!("error writing to log file - {}", err);
- }
- }
-}
-
-impl std::io::Write for FileLogger {
- fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
- if self.options.to_stdout {
- let _ = std::io::stdout().write(buf);
- }
- self.file.write(buf)
- }
-
- fn flush(&mut self) -> Result<(), std::io::Error> {
- if self.options.to_stdout {
- let _ = std::io::stdout().flush();
- }
- self.file.flush()
- }
-}
pub mod parallel_handler;
pub use parallel_handler::ParallelHandler;
-mod file_logger;
-pub use file_logger::{FileLogger, FileLogOptions};
-
/// Shortcut for md5 sums.
pub fn md5sum(data: &[u8]) -> Result<DigestBytes, Error> {
hash(MessageDigest::md5(), data).map_err(Error::from)
Ok(())
}
-static mut SHUTDOWN_REQUESTED: bool = false;
-
-pub fn request_shutdown() {
- unsafe {
- SHUTDOWN_REQUESTED = true;
- }
- crate::server::server_shutdown();
-}
-
-#[inline(always)]
-pub fn shutdown_requested() -> bool {
- unsafe { SHUTDOWN_REQUESTED }
-}
-
-pub fn fail_on_shutdown() -> Result<(), Error> {
- if shutdown_requested() {
- bail!("Server shutdown requested - aborting task");
- }
- Ok(())
-}
-
/// safe wrapper for `nix::sys::socket::socketpair` defaulting to `O_CLOEXEC` and guarding the file
/// descriptors.
pub fn socketpair() -> Result<(Fd, Fd), Error> {
use anyhow::{bail, Error};
-#[macro_use]
extern crate proxmox_backup;
extern crate tokio;
use pbs_api_types::{Authid, UPID};
+use proxmox_rest_server::{flog, CommandoSocket};
use proxmox_backup::server;
-use proxmox_backup::tools;
fn garbage_collection(worker: &server::WorkerTask) -> Result<(), Error> {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
- let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock());
+ let mut commando_sock = CommandoSocket::new(server::our_ctrl_sock(), nix::unistd::Gid::current());
let init_result: Result<(), Error> = try_block!({
server::register_task_control_commands(&mut commando_sock)?;
- server::server_state_init()?;
+ proxmox_rest_server::server_state_init()?;
Ok(())
});
println!("WORKER {}", worker);
let result = garbage_collection(&worker);
- tools::request_shutdown();
+ proxmox_rest_server::request_shutdown();
if let Err(err) = result {
println!("got expected error: {}", err);