-use std::sync::Mutex;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
+use std::sync::Mutex;
use anyhow::{bail, format_err, Error};
-use lazy_static::lazy_static;
-use hyper::{Body, Response, Method};
use http::request::Parts;
use http::HeaderMap;
+use hyper::{Body, Method, Response};
+use lazy_static::lazy_static;
+use proxmox_router::{
+ list_subdirs_api_method, Router, RpcEnvironmentType, SubdirMap, UserInformation,
+};
use proxmox_schema::api;
-use proxmox_router::{list_subdirs_api_method, SubdirMap, Router, RpcEnvironmentType, UserInformation};
-use proxmox_rest_server::{ServerAdapter, ApiConfig, AuthError, RestServer, RestEnvironment};
+use proxmox_rest_server::{ApiConfig, AuthError, RestEnvironment, RestServer, ServerAdapter};
// Create a Dummy User information system
struct DummyUserInfo;
// implement the server adapter
impl ServerAdapter for MinimalServer {
-
// normally this would check and authenticate the user
fn check_auth(
&self,
_headers: &HeaderMap,
_method: &Method,
- ) -> Pin<Box<dyn Future<Output = Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError>> + Send>> {
+ ) -> Pin<
+ Box<
+ dyn Future<Output = Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError>>
+ + Send,
+ >,
+ > {
Box::pin(async move {
// get some global/cached userinfo
let userinfo: Box<dyn UserInformation + Sync + Send> = Box::new(DummyUserInfo);
)]
/// returns the value of an item
fn get_item(name: String) -> Result<String, Error> {
- ITEM_MAP.lock().unwrap().get(&name).map(|s| s.to_string()).ok_or_else(|| format_err!("no such item '{}'", name))
+ ITEM_MAP
+ .lock()
+ .unwrap()
+ .get(&name)
+ .map(|s| s.to_string())
+ .ok_or_else(|| format_err!("no such item '{}'", name))
}
#[api(
&Router::new()
.get(&API_METHOD_LIST_ITEMS)
.post(&API_METHOD_CREATE_ITEM)
- .match_all("name", &ITEM_ROUTER)
- ),
- (
- "ping",
- &Router::new()
- .get(&API_METHOD_PING)
+ .match_all("name", &ITEM_ROUTER),
),
+ ("ping", &Router::new().get(&API_METHOD_PING)),
];
const ROUTER: Router = Router::new()
.subdirs(SUBDIRS);
async fn run() -> Result<(), Error> {
-
// we first have to configure the api environment (basedir etc.)
let config = ApiConfig::new(
// then we have to create a daemon that listens, accepts and serves
// the api to clients
- proxmox_rest_server::daemon::create_daemon(
- ([127, 0, 0, 1], 65000).into(),
- move |listener| {
- let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?;
+ proxmox_rest_server::daemon::create_daemon(([127, 0, 0, 1], 65000).into(), move |listener| {
+ let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?;
- Ok(async move {
+ Ok(async move {
+ hyper::Server::builder(incoming).serve(rest_server).await?;
- hyper::Server::builder(incoming)
- .serve(rest_server)
- .await?;
-
- Ok(())
- })
- },
- ).await?;
+ Ok(())
+ })
+ })
+ .await?;
Ok(())
}
use std::collections::HashMap;
-use std::path::PathBuf;
-use std::time::SystemTime;
use std::fs::metadata;
-use std::sync::{Arc, Mutex, RwLock};
+use std::path::PathBuf;
use std::pin::Pin;
+use std::sync::{Arc, Mutex, RwLock};
+use std::time::SystemTime;
-use anyhow::{bail, Error, format_err};
-use hyper::{Method, Body, Response};
+use anyhow::{bail, format_err, Error};
use hyper::http::request::Parts;
+use hyper::{Body, Method, Response};
use handlebars::Handlebars;
use serde::Serialize;
-use proxmox_sys::fs::{create_path, CreateOptions};
use proxmox_router::{ApiMethod, Router, RpcEnvironmentType, UserInformation};
+use proxmox_sys::fs::{create_path, CreateOptions};
-use crate::{ServerAdapter, AuthError, FileLogger, FileLogOptions, CommandSocket, RestEnvironment};
-
+use crate::{AuthError, CommandSocket, FileLogOptions, FileLogger, RestEnvironment, ServerAdapter};
/// REST server configuration
pub struct ApiConfig {
method: Method,
uri_param: &mut HashMap<String, String>,
) -> Option<&'static ApiMethod> {
-
self.router.find_method(components, method, uri_param)
}
pub(crate) fn find_alias(&self, components: &[&str]) -> PathBuf {
-
let mut prefix = String::new();
let mut filename = self.basedir.clone();
let comp_len = components.len();
prefix.push_str(components[0]);
if let Some(subdir) = self.aliases.get(&prefix) {
filename.push(subdir);
- components.iter().skip(1).for_each(|comp| filename.push(comp));
+ components
+ .iter()
+ .skip(1)
+ .for_each(|comp| filename.push(comp));
} else {
components.iter().for_each(|comp| filename.push(comp));
}
/// # }
/// ```
pub fn add_alias<S, P>(&mut self, alias: S, path: P)
- where S: Into<String>,
- P: Into<PathBuf>,
+ where
+ S: Into<String>,
+ P: Into<PathBuf>,
{
self.aliases.insert(alias.into(), path.into());
}
/// Those templates can be used with [render_template](Self::render_template) to generate pages.
pub fn register_template<P>(&self, name: &str, path: P) -> Result<(), Error>
where
- P: Into<PathBuf>
+ P: Into<PathBuf>,
{
if self.template_files.read().unwrap().contains_key(name) {
bail!("template already registered");
let metadata = metadata(&path)?;
let mtime = metadata.modified()?;
- self.templates.write().unwrap().register_template_file(name, &path)?;
- self.template_files.write().unwrap().insert(name.to_string(), (mtime, path));
+ self.templates
+ .write()
+ .unwrap()
+ .register_template_file(name, &path)?;
+ self.template_files
+ .write()
+ .unwrap()
+ .insert(name.to_string(), (mtime, path));
Ok(())
}
let mtime;
{
let template_files = self.template_files.read().unwrap();
- let (old_mtime, old_path) = template_files.get(name).ok_or_else(|| format_err!("template not found"))?;
+ let (old_mtime, old_path) = template_files
+ .get(name)
+ .ok_or_else(|| format_err!("template not found"))?;
mtime = metadata(old_path)?.modified()?;
if mtime <= *old_mtime {
- return self.templates.read().unwrap().render(name, data).map_err(|err| format_err!("{}", err));
+ return self
+ .templates
+ .read()
+ .unwrap()
+ .render(name, data)
+ .map_err(|err| format_err!("{}", err));
}
path = old_path.to_path_buf();
}
templates.register_template_file(name, &path)?;
template_files.insert(name.to_string(), (mtime, path));
- templates.render(name, data).map_err(|err| format_err!("{}", err))
+ templates
+ .render(name, data)
+ .map_err(|err| format_err!("{}", err))
}
}
commando_sock: &mut CommandSocket,
) -> Result<(), Error>
where
- P: Into<PathBuf>
+ P: Into<PathBuf>,
{
let path: PathBuf = path.into();
if let Some(base) = path.parent() {
commando_sock: &mut CommandSocket,
) -> Result<(), Error>
where
- P: Into<PathBuf>
+ P: Into<PathBuf>,
{
let path: PathBuf = path.into();
if let Some(base) = path.parent() {
use std::collections::HashMap;
use std::os::unix::io::AsRawFd;
-use std::path::{PathBuf, Path};
+use std::path::{Path, PathBuf};
use std::sync::Arc;
use futures::*;
-use tokio::net::UnixListener;
-use serde::Serialize;
-use serde_json::Value;
use nix::sys::socket;
use nix::unistd::Gid;
+use serde::Serialize;
+use serde_json::Value;
+use tokio::net::UnixListener;
// Listens on a Unix Socket to handle simple command asynchronously
-fn create_control_socket<P, F>(path: P, gid: Gid, func: F) -> Result<impl Future<Output = ()>, Error>
+fn create_control_socket<P, F>(
+ path: P,
+ gid: Gid,
+ func: F,
+) -> Result<impl Future<Output = ()>, Error>
where
P: Into<PathBuf>,
F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static,
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
let func = Arc::clone(&func);
let path = path.clone();
- tokio::spawn(futures::future::select(
- async move {
- let mut rx = tokio::io::BufReader::new(rx);
- let mut line = String::new();
- loop {
- line.clear();
- match rx.read_line({ line.clear(); &mut line }).await {
- Ok(0) => break,
- Ok(_) => (),
- Err(err) => {
- eprintln!("control socket {:?} read error: {}", path, err);
- return;
+ tokio::spawn(
+ futures::future::select(
+ async move {
+ let mut rx = tokio::io::BufReader::new(rx);
+ let mut line = String::new();
+ loop {
+ line.clear();
+ match rx
+ .read_line({
+ line.clear();
+ &mut line
+ })
+ .await
+ {
+ Ok(0) => break,
+ Ok(_) => (),
+ Err(err) => {
+ eprintln!("control socket {:?} read error: {}", path, err);
+ return;
+ }
}
- }
- let response = match line.parse::<Value>() {
- Ok(param) => match func(param) {
- Ok(res) => format!("OK: {}\n", res),
+ let response = match line.parse::<Value>() {
+ Ok(param) => match func(param) {
+ Ok(res) => format!("OK: {}\n", res),
+ Err(err) => format!("ERROR: {}\n", err),
+ },
Err(err) => format!("ERROR: {}\n", err),
- }
- Err(err) => format!("ERROR: {}\n", err),
- };
+ };
- if let Err(err) = tx.write_all(response.as_bytes()).await {
- eprintln!("control socket {:?} write response error: {}", path, err);
- return;
+ if let Err(err) = tx.write_all(response.as_bytes()).await {
+ eprintln!(
+ "control socket {:?} write response error: {}",
+ path, err
+ );
+ return;
+ }
}
}
- }.boxed(),
- abort_future,
- ).map(|_| ()));
+ .boxed(),
+ abort_future,
+ )
+ .map(|_| ()),
+ );
}
- }.boxed();
+ }
+ .boxed();
let abort_future = crate::last_worker_future().map_err(|_| {});
- let task = futures::future::select(
- control_future,
- abort_future,
- ).map(|_: futures::future::Either<(Result<(), Error>, _), _>| ());
+ let task = futures::future::select(control_future, abort_future)
+ .map(|_: futures::future::Either<(Result<(), Error>, _), _>| ());
Ok(task)
}
}
// A callback for a specific commando socket.
-type CommandSocketFn = Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
+type CommandSocketFn =
+ Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
/// Tooling to get a single control command socket where one can
/// register multiple commands dynamically.
impl CommandSocket {
/// Creates a new instance.
pub fn new<P>(path: P, gid: Gid) -> Self
- where P: Into<PathBuf>,
+ where
+ P: Into<PathBuf>,
{
CommandSocket {
socket: path.into(),
/// Spawn the socket and consume self, meaning you cannot register commands anymore after
/// calling this.
pub fn spawn(self) -> Result<(), Error> {
- let control_future = create_control_socket(self.socket.to_owned(), self.gid, move |param| {
- let param = param
- .as_object()
- .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
-
- let command = match param.get("command") {
- Some(Value::String(command)) => command.as_str(),
- None => bail!("no command"),
- _ => bail!("unable to parse command"),
- };
-
- if !self.commands.contains_key(command) {
- bail!("got unknown command '{}'", command);
- }
+ let control_future =
+ create_control_socket(self.socket.to_owned(), self.gid, move |param| {
+ let param = param.as_object().ok_or_else(|| {
+ format_err!("unable to parse parameters (expected json object)")
+ })?;
+
+ let command = match param.get("command") {
+ Some(Value::String(command)) => command.as_str(),
+ None => bail!("no command"),
+ _ => bail!("unable to parse command"),
+ };
+
+ if !self.commands.contains_key(command) {
+ bail!("got unknown command '{}'", command);
+ }
- match self.commands.get(command) {
- None => bail!("got unknown command '{}'", command),
- Some(handler) => {
- let args = param.get("args"); //.unwrap_or(&Value::Null);
- (handler)(args)
- },
- }
- })?;
+ match self.commands.get(command) {
+ None => bail!("got unknown command '{}'", command),
+ Some(handler) => {
+ let args = param.get("args"); //.unwrap_or(&Value::Null);
+ (handler)(args)
+ }
+ }
+ })?;
tokio::spawn(control_future);
}
/// Register a new command with a callback.
- pub fn register_command<F>(
- &mut self,
- command: String,
- handler: F,
- ) -> Result<(), Error>
+ pub fn register_command<F>(&mut self, command: String, handler: F) -> Result<(), Error>
where
F: Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static,
{
-
if self.commands.contains_key(&command) {
bail!("command '{}' already exists!", command);
}
#[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
pub enum CompressionMethod {
Deflate,
-// Gzip,
-// Brotli,
+ // Gzip,
+ // Brotli,
}
impl CompressionMethod {
pub fn extension(&self) -> &'static str {
match *self {
-// CompressionMethod::Brotli => "br",
-// CompressionMethod::Gzip => "gzip",
+ // CompressionMethod::Brotli => "br",
+ // CompressionMethod::Gzip => "gzip",
CompressionMethod::Deflate => "deflate",
}
}
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
-// "br" => Ok(CompressionMethod::Brotli),
-// "gzip" => Ok(CompressionMethod::Gzip),
+ // "br" => Ok(CompressionMethod::Brotli),
+ // "gzip" => Ok(CompressionMethod::Gzip),
"deflate" => Ok(CompressionMethod::Deflate),
// http accept-encoding allows to give weights with ';q='
other if other.starts_with("deflate;q=") => Ok(CompressionMethod::Deflate),
use std::ffi::CString;
use std::future::Future;
use std::io::{Read, Write};
-use std::os::raw::{c_char, c_uchar, c_int};
-use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::os::raw::{c_char, c_int, c_uchar};
use std::os::unix::ffi::OsStrExt;
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::panic::UnwindSafe;
use std::path::PathBuf;
use futures::future::{self, Either};
use nix::unistd::{fork, ForkResult};
-use proxmox_sys::fd::{fd_change_cloexec, Fd};
use proxmox_io::{ReadExt, WriteExt};
+use proxmox_sys::fd::{fd_change_cloexec, Fd};
// Unfortunately FnBox is nightly-only and Box<FnOnce> is unusable, so just use Box<Fn>...
type BoxedStoreFunc = Box<dyn FnMut() -> Result<String, Error> + UnwindSafe + Send>;
// At this point we call pre-exec helpers. We must be certain that if they fail for
// whatever reason we can still call `_exit()`, so use catch_unwind.
match std::panic::catch_unwind(move || {
- let mut pnew = unsafe {
- std::fs::File::from_raw_fd(pnew.into_raw_fd())
- };
+ let mut pnew =
+ unsafe { std::fs::File::from_raw_fd(pnew.into_raw_fd()) };
let pid = nix::unistd::Pid::this();
if let Err(e) = unsafe { pnew.write_host_value(pid.as_raw()) } {
log::error!("failed to send new server PID to parent: {}", e);
std::mem::drop(pnew);
// Try to reopen STDOUT/STDERR journald streams to get correct PID in logs
- let ident = CString::new(self.self_exe.file_name().unwrap().as_bytes()).unwrap();
+ let ident = CString::new(self.self_exe.file_name().unwrap().as_bytes())
+ .unwrap();
let ident = ident.as_bytes();
- let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) };
+ let fd =
+ unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_INFO, 1) };
if fd >= 0 && fd != 1 {
let fd = proxmox_sys::fd::Fd(fd); // add drop handler
nix::unistd::dup2(fd.as_raw_fd(), 1)?;
} else {
log::error!("failed to update STDOUT journal redirection ({})", fd);
}
- let fd = unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) };
+ let fd =
+ unsafe { sd_journal_stream_fd(ident.as_ptr(), libc::LOG_ERR, 1) };
if fd >= 0 && fd != 2 {
let fd = proxmox_sys::fd::Fd(fd); // add drop handler
nix::unistd::dup2(fd.as_raw_fd(), 2)?;
}
self.do_reexec(new_args)
- })
- {
+ }) {
Ok(Ok(())) => log::error!("do_reexec returned!"),
Ok(Err(err)) => log::error!("do_reexec failed: {}", err),
Err(_) => log::error!("panic in re-exec"),
Err(e) => log::error!("fork() failed, restart delayed: {}", e),
}
// No matter how we managed to get here, this is the time where we bail out quickly:
- unsafe {
- libc::_exit(-1)
- }
+ unsafe { libc::_exit(-1) }
}
Ok(ForkResult::Parent { child }) => {
- log::debug!("forked off a new server (first pid: {}), waiting for 2nd pid", child);
+ log::debug!(
+ "forked off a new server (first pid: {}), waiting for 2nd pid",
+ child
+ );
std::mem::drop(pnew);
- let mut pold = unsafe {
- std::fs::File::from_raw_fd(pold.into_raw_fd())
- };
+ let mut pold = unsafe { std::fs::File::from_raw_fd(pold.into_raw_fd()) };
let child = nix::unistd::Pid::from_raw(match unsafe { pold.read_le_value() } {
Ok(v) => v,
Err(e) => {
- log::error!("failed to receive pid of double-forked child process: {}", e);
+ log::error!(
+ "failed to receive pid of double-forked child process: {}",
+ e
+ );
// systemd will complain but won't kill the service...
return Ok(());
}
// FIXME: We could become "independent" of the TcpListener and its reference to the file
// descriptor by `dup()`ing it (and check if the listener still exists via kcmp()?)
fn get_store_func(&self) -> Result<BoxedStoreFunc, Error> {
- let mut fd_opt = Some(Fd(
- nix::fcntl::fcntl(self.as_raw_fd(), nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0))?
- ));
+ let mut fd_opt = Some(Fd(nix::fcntl::fcntl(
+ self.as_raw_fd(),
+ nix::fcntl::FcntlArg::F_DUPFD_CLOEXEC(0),
+ )?));
Ok(Box::new(move || {
let fd = fd_opt.take().unwrap();
fd_change_cloexec(fd.as_raw_fd(), false)?;
}
fn restore(var: &str) -> Result<Self, Error> {
- let fd = var.parse::<u32>()
- .map_err(|e| format_err!("invalid file descriptor: {}", e))?
- as RawFd;
+ let fd = var
+ .parse::<u32>()
+ .map_err(|e| format_err!("invalid file descriptor: {}", e))? as RawFd;
fd_change_cloexec(fd, true)?;
- Ok(Self::from_std(
- unsafe { std::net::TcpListener::from_raw_fd(fd) },
- )?)
+ Ok(Self::from_std(unsafe {
+ std::net::TcpListener::from_raw_fd(fd)
+ })?)
}
}
{
let mut reloader = Reloader::new()?;
- let listener: tokio::net::TcpListener = reloader.restore(
- "PROXMOX_BACKUP_LISTEN_FD",
- move || async move { Ok(tokio::net::TcpListener::bind(&address).await?) },
- ).await?;
+ let listener: tokio::net::TcpListener = reloader
+ .restore("PROXMOX_BACKUP_LISTEN_FD", move || async move {
+ Ok(tokio::net::TcpListener::bind(&address).await?)
+ })
+ .await?;
let service = create_service(listener)?;
#[link(name = "systemd")]
extern "C" {
- fn sd_journal_stream_fd(identifier: *const c_uchar, priority: c_int, level_prefix: c_int) -> c_int;
+ fn sd_journal_stream_fd(
+ identifier: *const c_uchar,
+ priority: c_int,
+ level_prefix: c_int,
+ ) -> c_int;
fn sd_notify(unset_environment: c_int, state: *const c_char) -> c_int;
fn sd_notify_barrier(unset_environment: c_int, timeout: u64) -> c_int;
}
SystemdNotify::Ready => {
log::info!("service is ready");
CString::new("READY=1")
- },
+ }
SystemdNotify::Reloading => CString::new("RELOADING=1"),
SystemdNotify::Stopping => CString::new("STOPPING=1"),
SystemdNotify::Status(msg) => CString::new(format!("STATUS={}", msg)),
}?;
let rc = unsafe { sd_notify(0, message.as_ptr()) };
if rc < 0 {
- bail!("systemd_notify failed: {}", std::io::Error::from_raw_os_error(-rc));
+ bail!(
+ "systemd_notify failed: {}",
+ std::io::Error::from_raw_os_error(-rc)
+ );
}
Ok(())
pub fn systemd_notify_barrier(timeout: u64) -> Result<(), Error> {
let rc = unsafe { sd_notify_barrier(0, timeout) };
if rc < 0 {
- bail!("systemd_notify_barrier failed: {}", std::io::Error::from_raw_os_error(-rc));
+ bail!(
+ "systemd_notify_barrier failed: {}",
+ std::io::Error::from_raw_os_error(-rc)
+ );
}
Ok(())
}
-use std::sync::Arc;
use std::net::SocketAddr;
+use std::sync::Arc;
use serde_json::{json, Value};
pub fn log_failed_auth(&self, failed_auth_id: Option<String>, msg: &str) {
let msg = match (self.client_ip, failed_auth_id) {
(Some(peer), Some(user)) => {
- format!("authentication failure; rhost={} user={} msg={}", peer, user, msg)
+ format!(
+ "authentication failure; rhost={} user={} msg={}",
+ peer, user, msg
+ )
}
(Some(peer), None) => {
format!("authentication failure; rhost={} msg={}", peer, msg)
}
(None, Some(user)) => {
- format!("authentication failure; rhost=unknown user={} msg={}", user, msg)
+ format!(
+ "authentication failure; rhost=unknown user={} msg={}",
+ user, msg
+ )
}
(None, None) => {
format!("authentication failure; rhost=unknown msg={}", msg)
auth_logger.lock().unwrap().log(&msg);
}
}
-
}
impl RpcEnvironment for RestEnvironment {
-
- fn result_attrib_mut (&mut self) -> &mut Value {
+ fn result_attrib_mut(&mut self) -> &mut Value {
&mut self.result_attributes
}
use anyhow::Error;
use nix::fcntl::OFlag;
-use proxmox_sys::fs::{CreateOptions, atomic_open_or_create_file};
+use proxmox_sys::fs::{atomic_open_or_create_file, CreateOptions};
/// Options to control the behavior of a [FileLogger] instance
#[derive(Default)]
pub prefix_time: bool,
/// File owner/group and mode
pub file_opts: CreateOptions,
-
}
/// Log messages with optional automatically added timestamps into files
let file_name: std::path::PathBuf = file_name.as_ref().to_path_buf();
- Ok(Self { file, file_name, options })
+ Ok(Self {
+ file,
+ file_name,
+ options,
+ })
}
pub fn reopen(&mut self) -> Result<&Self, Error> {
file_name: P,
options: &FileLogOptions,
) -> Result<std::fs::File, Error> {
-
let mut flags = OFlag::O_CLOEXEC;
- if options.read {
- flags |= OFlag::O_RDWR;
+ if options.read {
+ flags |= OFlag::O_RDWR;
} else {
- flags |= OFlag::O_WRONLY;
+ flags |= OFlag::O_WRONLY;
}
if options.append {
- flags |= OFlag::O_APPEND;
+ flags |= OFlag::O_APPEND;
}
if options.exclusive {
- flags |= OFlag::O_EXCL;
+ flags |= OFlag::O_EXCL;
}
- let file = atomic_open_or_create_file(&file_name, flags, &[], options.file_opts.clone(), false)?;
+ let file =
+ atomic_open_or_create_file(&file_name, flags, &[], options.file_opts.clone(), false)?;
Ok(file)
}
//! Helpers to format response data
use std::collections::HashMap;
-use anyhow::{Error};
+use anyhow::Error;
use serde_json::{json, Value};
-use hyper::{Body, Response, StatusCode};
use hyper::header;
+use hyper::{Body, Response, StatusCode};
use proxmox_router::{HttpError, RpcEnvironment};
use proxmox_schema::ParameterError;
fn format_error(&self, err: Error) -> Response<Body>;
/// Transform a [Result] into a http response
- fn format_result(&self, result: Result<Value, Error>, rpcenv: &dyn RpcEnvironment) -> Response<Body> {
+ fn format_result(
+ &self,
+ result: Result<Value, Error>,
+ rpcenv: &dyn RpcEnvironment,
+ ) -> Response<Body> {
match result {
Ok(data) => self.format_data(data, rpcenv),
Err(err) => self.format_error(err),
static JSON_CONTENT_TYPE: &str = "application/json;charset=UTF-8";
fn json_data_response(data: Value) -> Response<Body> {
-
let json_str = data.to_string();
let raw = json_str.into_bytes();
let mut response = Response::new(raw.into());
response.headers_mut().insert(
header::CONTENT_TYPE,
- header::HeaderValue::from_static(JSON_CONTENT_TYPE));
+ header::HeaderValue::from_static(JSON_CONTENT_TYPE),
+ );
response
}
-fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment)
-{
+fn add_result_attributes(result: &mut Value, rpcenv: &dyn RpcEnvironment) {
let attributes = match rpcenv.result_attrib().as_object() {
Some(attr) => attr,
None => return,
}
}
-
struct JsonFormatter();
/// Format data as ``application/json``
/// message as string.
pub static JSON_FORMATTER: &'static dyn OutputFormatter = &JsonFormatter();
-impl OutputFormatter for JsonFormatter {
-
+impl OutputFormatter for JsonFormatter {
fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body> {
-
- let mut result = json!({
- "data": data
- });
+ let mut result = json!({ "data": data });
add_result_attributes(&mut result, rpcenv);
}
fn format_error(&self, err: Error) -> Response<Body> {
-
let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
let mut resp = Response::new(Body::from(apierr.message.clone()));
*resp.status_mut() = apierr.code;
response.headers_mut().insert(
header::CONTENT_TYPE,
- header::HeaderValue::from_static(JSON_CONTENT_TYPE));
+ header::HeaderValue::from_static(JSON_CONTENT_TYPE),
+ );
- response.extensions_mut().insert(ErrorMessageExtension(err.to_string()));
+ response
+ .extensions_mut()
+ .insert(ErrorMessageExtension(err.to_string()));
response
}
struct ExtJsFormatter();
-impl OutputFormatter for ExtJsFormatter {
-
+impl OutputFormatter for ExtJsFormatter {
fn format_data(&self, data: Value, rpcenv: &dyn RpcEnvironment) -> Response<Body> {
-
let mut result = json!({
"data": data,
"success": true
}
fn format_error(&self, err: Error) -> Response<Body> {
-
let message: String;
let mut errors = HashMap::new();
let mut response = json_data_response(result);
- response.extensions_mut().insert(ErrorMessageExtension(message));
+ response
+ .extensions_mut()
+ .insert(ErrorMessageExtension(message));
response
}
-use anyhow::{Error};
+use anyhow::Error;
use std::collections::HashMap;
use std::pin::Pin;
use futures::*;
use hyper::{Body, Request, Response, StatusCode};
-use proxmox_router::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
use proxmox_router::http_err;
+use proxmox_router::{ApiResponseFuture, HttpError, Router, RpcEnvironment};
-use crate::{normalize_uri_path, WorkerTask};
use crate::formatter::*;
+use crate::{normalize_uri_path, WorkerTask};
/// Hyper Service implementation to handle stateful H2 connections.
///
debug: bool,
}
-impl <E: RpcEnvironment + Clone> H2Service<E> {
-
+impl<E: RpcEnvironment + Clone> H2Service<E> {
pub fn new(rpcenv: E, worker: Arc<WorkerTask>, router: &'static Router, debug: bool) -> Self {
- Self { rpcenv, worker, router, debug }
+ Self {
+ rpcenv,
+ worker,
+ router,
+ debug,
+ }
}
pub fn debug<S: AsRef<str>>(&self, msg: S) {
- if self.debug { self.worker.log_message(msg); }
+ if self.debug {
+ self.worker.log_message(msg);
+ }
}
fn handle_request(&self, req: Request<Body>) -> ApiResponseFuture {
-
let (parts, body) = req.into_parts();
let method = parts.method.clone();
let (path, components) = match normalize_uri_path(parts.uri.path()) {
- Ok((p,c)) => (p, c),
+ Ok((p, c)) => (p, c),
Err(err) => return future::err(http_err!(BAD_REQUEST, "{}", err)).boxed(),
};
let err = http_err!(NOT_FOUND, "Path '{}' not found.", path);
future::ok(formatter.format_error(err)).boxed()
}
- Some(api_method) => {
- crate::rest::handle_api_request(
- self.rpcenv.clone(), api_method, formatter, parts, body, uri_param).boxed()
- }
+ Some(api_method) => crate::rest::handle_api_request(
+ self.rpcenv.clone(),
+ api_method,
+ formatter,
+ parts,
+ body,
+ uri_param,
+ )
+ .boxed(),
}
}
- fn log_response(worker: Arc<WorkerTask>, method: hyper::Method, path: &str, resp: &Response<Body>) {
-
+ fn log_response(
+ worker: Arc<WorkerTask>,
+ method: hyper::Method,
+ path: &str,
+ resp: &Response<Body>,
+ ) {
let status = resp.status();
if !status.is_success() {
}
}
-impl <E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Service<E> {
+impl<E: RpcEnvironment + Clone> tower_service::Service<Request<Body>> for H2Service<E> {
type Response = Response<Body>;
type Error = Error;
#[allow(clippy::type_complexity)]
Ok::<_, Error>(res)
}
Err(err) => {
- if let Some(apierr) = err.downcast_ref::<HttpError>() {
+ if let Some(apierr) = err.downcast_ref::<HttpError>() {
let mut resp = Response::new(Body::from(apierr.message.clone()));
- resp.extensions_mut().insert(ErrorMessageExtension(apierr.message.clone()));
+ resp.extensions_mut()
+ .insert(ErrorMessageExtension(apierr.message.clone()));
*resp.status_mut() = apierr.code;
Self::log_response(worker, method, &path, &resp);
Ok(resp)
} else {
let mut resp = Response::new(Body::from(err.to_string()));
- resp.extensions_mut().insert(ErrorMessageExtension(err.to_string()));
+ resp.extensions_mut()
+ .insert(ErrorMessageExtension(err.to_string()));
*resp.status_mut() = StatusCode::BAD_REQUEST;
Self::log_response(worker, method, &path, &resp);
Ok(resp)
//! - worker task management
//! * generic interface to authenticate user
-use std::sync::atomic::{Ordering, AtomicBool};
use std::future::Future;
use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
use anyhow::{bail, format_err, Error};
-use nix::unistd::Pid;
-use hyper::{Body, Response, Method};
use http::request::Parts;
use http::HeaderMap;
+use hyper::{Body, Method, Response};
+use nix::unistd::Pid;
+use proxmox_router::UserInformation;
use proxmox_sys::fd::Fd;
-use proxmox_sys::linux::procfs::PidStat;
use proxmox_sys::fs::CreateOptions;
-use proxmox_router::UserInformation;
+use proxmox_sys::linux::procfs::PidStat;
mod compression;
pub use compression::*;
pub use command_socket::*;
mod file_logger;
-pub use file_logger::{FileLogger, FileLogOptions};
+pub use file_logger::{FileLogOptions, FileLogger};
mod api_config;
pub use api_config::ApiConfig;
/// User Authentication and index/root page generation methods
pub trait ServerAdapter: Send + Sync {
-
/// Returns the index/root page
fn get_index(
&self,
&'a self,
headers: &'a HeaderMap,
method: &'a Method,
- ) -> Pin<Box<dyn Future<Output = Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError>> + Send + 'a>>;
-
+ ) -> Pin<
+ Box<
+ dyn Future<Output = Result<(String, Box<dyn UserInformation + Sync + Send>), AuthError>>
+ + Send
+ + 'a,
+ >,
+ >;
}
-lazy_static::lazy_static!{
+lazy_static::lazy_static! {
static ref PID: i32 = unsafe { libc::getpid() };
static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime;
}
pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
let pid = proxmox_sys::fs::file_get_contents(pid_fn)?;
let pid = std::str::from_utf8(&pid)?.trim();
- pid.parse().map_err(|err| format_err!("could not parse pid - {}", err))
+ pid.parse()
+ .map_err(|err| format_err!("could not parse pid - {}", err))
}
/// Returns the control socket path for a specific process ID.
Ok((Fd(pa), Fd(pb)))
}
-
/// Extract a specific cookie from cookie header.
/// We assume cookie_name is already url encoded.
pub fn extract_cookie(cookie: &str, cookie_name: &str) -> Option<String> {
use serde_json::Value;
use tokio::fs::File;
use tokio::time::Instant;
-use url::form_urlencoded;
use tower_service::Service;
+use url::form_urlencoded;
+use proxmox_router::http_err;
use proxmox_router::{
check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
RpcEnvironmentType, UserInformation,
};
-use proxmox_router::http_err;
use proxmox_schema::{ObjectSchemaType, ParameterSchema};
use proxmox_http::client::RateLimitedStream;
-use proxmox_compression::{DeflateEncoder, Level};
use proxmox_async::stream::AsyncReaderStream;
+use proxmox_compression::{DeflateEncoder, Level};
use crate::{
- ApiConfig, FileLogger, AuthError, RestEnvironment, CompressionMethod,
- normalize_uri_path, formatter::*,
+ formatter::*, normalize_uri_path, ApiConfig, AuthError, CompressionMethod, FileLogger,
+ RestEnvironment,
};
extern "C" {
struct EmptyUserInformation {}
impl UserInformation for EmptyUserInformation {
- fn is_superuser(&self, _userid: &str) -> bool { false }
- fn is_group_member(&self, _userid: &str, _group: &str) -> bool { false }
- fn lookup_privs(&self, _userid: &str, _path: &[&str]) -> u64 { 0 }
+ fn is_superuser(&self, _userid: &str) -> bool {
+ false
+ }
+ fn is_group_member(&self, _userid: &str, _group: &str) -> bool {
+ false
+ }
+ fn lookup_privs(&self, _userid: &str, _path: &[&str]) -> u64 {
+ 0
+ }
}
/// REST server implementation (configured with [ApiConfig])
}
}
-impl Service<&Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>>
- for RestServer
-{
+impl Service<&Pin<Box<tokio_openssl::SslStream<tokio::net::TcpStream>>>> for RestServer {
type Response = ApiService;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<ApiService, Error>> + Send>>;
}
fn call(&mut self, ctx: &hyper::server::conn::AddrStream) -> Self::Future {
- let peer = ctx.remote_addr();
+ let peer = ctx.remote_addr();
future::ok(ApiService {
peer,
api_config: self.api_config.clone(),
Ok(resp)
}
-
fn extension_to_content_type(filename: &Path) -> (&'static str, bool) {
if let Some(ext) = filename.extension().and_then(|osstr| osstr.to_str()) {
return match ext {
}
}
- let mut user_info: Box<dyn UserInformation + Send + Sync> = Box::new(EmptyUserInformation {});
+ let mut user_info: Box<dyn UserInformation + Send + Sync> =
+ Box::new(EmptyUserInformation {});
if auth_required {
match api.check_auth(&parts.headers, &method).await {
};
if let Some(auth_id) = auth_id {
- response.extensions_mut().insert(AuthStringExtension(auth_id));
+ response
+ .extensions_mut()
+ .insert(AuthStringExtension(auth_id));
}
return Ok(response);
-use anyhow::{Error};
+use anyhow::Error;
use lazy_static::lazy_static;
use std::sync::Mutex;
///
/// This calls [request_shutdown] when receiving the signal.
pub fn catch_shutdown_signal() -> Result<(), Error> {
-
let mut stream = signal(SignalKind::interrupt())?;
let future = async move {
SERVER_STATE.lock().unwrap().reload_request = false;
request_shutdown();
}
- }.boxed();
+ }
+ .boxed();
let abort_future = last_worker_future().map_err(|_| {});
let task = futures::future::select(future, abort_future);
/// This calls [request_shutdown] when receiving the signal, and tries
/// to restart the server.
pub fn catch_reload_signal() -> Result<(), Error> {
-
let mut stream = signal(SignalKind::hangup())?;
let future = async move {
SERVER_STATE.lock().unwrap().reload_request = true;
crate::request_shutdown();
}
- }.boxed();
+ }
+ .boxed();
let abort_future = last_worker_future().map_err(|_| {});
let task = futures::future::select(future, abort_future);
data.mode == ServerMode::Shutdown && data.reload_request
}
-
pub(crate) fn server_shutdown() {
let mut data = SERVER_STATE.lock().unwrap();
/// Future to signal server shutdown
pub fn shutdown_future() -> impl Future<Output = ()> {
let mut data = SERVER_STATE.lock().unwrap();
- data
- .shutdown_listeners
- .listen()
- .map(|_| ())
+ data.shutdown_listeners.listen().map(|_| ())
}
/// Future to signal when last worker task finished
-pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> {
+pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> {
let mut data = SERVER_STATE.lock().unwrap();
data.last_worker_listeners.listen()
}
pub(crate) fn check_last_worker() {
let mut data = SERVER_STATE.lock().unwrap();
- if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.internal_task_count == 0) { return; }
+ if !(data.mode == ServerMode::Shutdown
+ && data.worker_count == 0
+ && data.internal_task_count == 0)
+ {
+ return;
+ }
data.last_worker_listeners.notify_listeners(Ok(()));
}
tokio::spawn(async move {
let _ = tokio::spawn(task).await; // ignore errors
- { // drop mutex
+ {
+ // drop mutex
let mut data = SERVER_STATE.lock().unwrap();
if data.internal_task_count > 0 {
data.internal_task_count -= 1;
use std::collections::{HashMap, VecDeque};
use std::fs::File;
-use std::path::PathBuf;
-use std::io::{Read, Write, BufRead, BufReader};
+use std::io::{BufRead, BufReader, Read, Write};
use std::panic::UnwindSafe;
+use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
-use std::time::{SystemTime, Duration};
+use std::time::{Duration, SystemTime};
use anyhow::{bail, format_err, Error};
use futures::*;
use lazy_static::lazy_static;
-use serde_json::{json, Value};
-use serde::{Serialize, Deserialize};
-use tokio::sync::oneshot;
use nix::fcntl::OFlag;
use once_cell::sync::OnceCell;
+use serde::{Deserialize, Serialize};
+use serde_json::{json, Value};
+use tokio::sync::oneshot;
-use proxmox_sys::linux::procfs;
-use proxmox_sys::fs::{create_path, replace_file, atomic_open_or_create_file, CreateOptions};
use proxmox_lang::try_block;
use proxmox_schema::upid::UPID;
+use proxmox_sys::fs::{atomic_open_or_create_file, create_path, replace_file, CreateOptions};
+use proxmox_sys::linux::procfs;
-use proxmox_sys::WorkerTaskContext;
use proxmox_sys::logrotate::{LogRotate, LogRotateFiles};
+use proxmox_sys::WorkerTaskContext;
-use crate::{CommandSocket, FileLogger, FileLogOptions};
+use crate::{CommandSocket, FileLogOptions, FileLogger};
struct TaskListLockGuard(File);
static WORKER_TASK_SETUP: OnceCell<WorkerTaskSetup> = OnceCell::new();
fn worker_task_setup() -> Result<&'static WorkerTaskSetup, Error> {
- WORKER_TASK_SETUP.get()
+ WORKER_TASK_SETUP
+ .get()
.ok_or_else(|| format_err!("WorkerTask library is not initialized"))
}
impl WorkerTaskSetup {
-
fn new(basedir: PathBuf, file_opts: CreateOptions) -> Self {
-
let mut taskdir = basedir;
taskdir.push("tasks");
}
fn lock_task_list_files(&self, exclusive: bool) -> Result<TaskListLockGuard, Error> {
- let options = self.file_opts.clone()
+ let options = self
+ .file_opts
+ .clone()
.perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
let timeout = std::time::Duration::new(10, 0);
- let file = proxmox_sys::fs::open_file_locked(
- &self.task_lock_fn,
- timeout,
- exclusive,
- options,
- )?;
+ let file =
+ proxmox_sys::fs::open_file_locked(&self.task_lock_fn, timeout, exclusive, options)?;
Ok(TaskListLockGuard(file))
}
// atomically read/update the task list, update status of finished tasks
// new_upid is added to the list when specified.
fn update_active_workers(&self, new_upid: Option<&UPID>) -> Result<(), Error> {
-
let lock = self.lock_task_list_files(true)?;
// TODO remove with 1.x
if !worker_is_active_local(&info.upid) {
// println!("Detected stopped task '{}'", &info.upid_str);
let now = proxmox_time::epoch_i64();
- let status = upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
+ let status =
+ upid_read_status(&info.upid).unwrap_or(TaskState::Unknown { endtime: now });
finish_list.push(TaskListInfo {
upid: info.upid,
upid_str: info.upid_str,
- state: Some(status)
+ state: Some(status),
});
return None;
}
Some(info)
- }).collect();
+ })
+ .collect();
if let Some(upid) = new_upid {
- active_list.push(TaskListInfo { upid: upid.clone(), upid_str: upid.to_string(), state: None });
+ active_list.push(TaskListInfo {
+ upid: upid.clone(),
+ upid_str: upid.to_string(),
+ state: None,
+ });
}
let active_raw = render_task_list(&active_list);
- let options = self.file_opts.clone()
+ let options = self
+ .file_opts
+ .clone()
.perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
- replace_file(
- &self.active_tasks_fn,
- active_raw.as_bytes(),
- options,
- false,
- )?;
-
- finish_list.sort_unstable_by(|a, b| {
- match (&a.state, &b.state) {
- (Some(s1), Some(s2)) => s1.cmp(s2),
- (Some(_), None) => std::cmp::Ordering::Less,
- (None, Some(_)) => std::cmp::Ordering::Greater,
- _ => a.upid.starttime.cmp(&b.upid.starttime),
- }
+ replace_file(&self.active_tasks_fn, active_raw.as_bytes(), options, false)?;
+
+ finish_list.sort_unstable_by(|a, b| match (&a.state, &b.state) {
+ (Some(s1), Some(s2)) => s1.cmp(s2),
+ (Some(_), None) => std::cmp::Ordering::Less,
+ (None, Some(_)) => std::cmp::Ordering::Greater,
+ _ => a.upid.starttime.cmp(&b.upid.starttime),
});
if !finish_list.is_empty() {
- let options = self.file_opts.clone()
+ let options = self
+ .file_opts
+ .clone()
.perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
let mut writer = atomic_open_or_create_file(
// Create task log directory with correct permissions
fn create_task_log_dirs(&self) -> Result<(), Error> {
-
try_block!({
- let dir_opts = self.file_opts.clone()
+ let dir_opts = self
+ .file_opts
+ .clone()
.perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
create_path(&self.taskdir, Some(dir_opts.clone()), Some(dir_opts))?;
// fixme:??? create_path(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR, None, Some(opts))?;
Ok(())
- }).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
+ })
+ .map_err(|err: Error| format_err!("unable to create task log dir - {}", err))
}
}
pub fn init_worker_tasks(basedir: PathBuf, file_opts: CreateOptions) -> Result<(), Error> {
let setup = WorkerTaskSetup::new(basedir, file_opts);
setup.create_task_log_dirs()?;
- WORKER_TASK_SETUP.set(setup)
+ WORKER_TASK_SETUP
+ .set(setup)
.map_err(|_| format_err!("init_worker_tasks failed - already initialized"))
}
max_files: Option<usize>,
options: Option<CreateOptions>,
) -> Result<bool, Error> {
-
let setup = worker_task_setup()?;
let _lock = setup.lock_task_list_files(true)?;
- let mut logrotate = LogRotate::new(
- &setup.task_archive_fn,
- compress,
- max_files,
- options,
- )?;
+ let mut logrotate = LogRotate::new(&setup.task_archive_fn, compress, max_files, options)?;
logrotate.rotate(size_threshold)
}
let _lock = setup.lock_task_list_files(true)?;
- let logrotate = LogRotate::new(
- &setup.task_archive_fn,
- compressed,
- None,
- None,
- )?;
+ let logrotate = LogRotate::new(&setup.task_archive_fn, compressed, None, None)?;
let mut timestamp = None;
if let Some(last_file) = logrotate.files().last() {
SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(timestamp as u64))
} else {
SystemTime::UNIX_EPOCH.checked_sub(Duration::from_secs(-timestamp as u64))
- }.ok_or_else(|| format_err!("could not calculate cutoff time"))?;
+ }
+ .ok_or_else(|| format_err!("could not calculate cutoff time"))?;
for i in 0..256 {
let mut path = setup.taskdir.clone();
if modified < cutoff_time {
match std::fs::remove_file(path) {
- Ok(()) => {},
- Err(err) if err.kind() == std::io::ErrorKind::NotFound => {},
+ Ok(()) => {}
+ Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
Err(err) => bail!("could not remove file: {}", err),
}
}
Ok(())
}
-
/// Path to the worker log file
pub fn upid_log_path(upid: &UPID) -> Result<std::path::PathBuf, Error> {
let setup = worker_task_setup()?;
/// If there is not a single line with a valid datetime, we assume the
/// starttime to be the endtime
pub fn upid_read_status(upid: &UPID) -> Result<TaskState, Error> {
-
let setup = worker_task_setup()?;
- let mut status = TaskState::Unknown { endtime: upid.starttime };
+ let mut status = TaskState::Unknown {
+ endtime: upid.starttime,
+ };
let path = setup.log_path(upid);
}
let last_line = match data.iter().rposition(|c| *c == b'\n') {
- Some(start) if data.len() > (start+1) => &data[start+1..],
+ Some(start) if data.len() > (start + 1) => &data[start + 1..],
Some(_) => &data, // should not happen, since we removed all trailing newlines
None => &data,
};
}
lazy_static! {
- static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
+ static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> =
+ Mutex::new(HashMap::new());
}
/// checks if the task UPID refers to a worker from this process
///
/// * ``worker-task-status <UPID>``: return true or false, depending on
/// whether the worker is running or stopped.
-pub fn register_task_control_commands(
- commando_sock: &mut CommandSocket,
-) -> Result<(), Error> {
+pub fn register_task_control_commands(commando_sock: &mut CommandSocket) -> Result<(), Error> {
fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
- let args = if let Some(args) = args { args } else { bail!("missing args") };
+ let args = if let Some(args) = args {
+ args
+ } else {
+ bail!("missing args")
+ };
let upid = match args.get("upid") {
Some(Value::String(upid)) => upid.parse::<UPID>()?,
None => bail!("no upid in args"),
///
/// By sending ``worker-task-abort`` to the control socket.
pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
-
let sock = crate::ctrl_sock_from_pid(upid.pid);
let cmd = json!({
"command": "worker-task-abort",
}
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
-
let data = line.splitn(3, ' ').collect::<Vec<&str>>();
let len = data.len();
Ok(TaskState::OK { endtime })
} else if let Some(warnings) = s.strip_prefix("WARNINGS: ") {
let count: u64 = warnings.parse()?;
- Ok(TaskState::Warning{ count, endtime })
+ Ok(TaskState::Warning { count, endtime })
} else if !s.is_empty() {
- let message = if let Some(err) = s.strip_prefix("ERROR: ") { err } else { s }.to_string();
- Ok(TaskState::Error{ message, endtime })
+ let message = if let Some(err) = s.strip_prefix("ERROR: ") {
+ err
+ } else {
+ s
+ }
+ .to_string();
+ Ok(TaskState::Error { message, endtime })
} else {
bail!("unable to parse Task Status '{}'", s);
}
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TaskState::Unknown { .. } => write!(f, "unknown"),
- TaskState::OK { .. }=> write!(f, "OK"),
+ TaskState::OK { .. } => write!(f, "OK"),
TaskState::Warning { count, .. } => write!(f, "WARNINGS: {}", count),
TaskState::Error { message, .. } => write!(f, "{}", message),
}
fn render_task_line(info: &TaskListInfo) -> String {
let mut raw = String::new();
if let Some(status) = &info.state {
- raw.push_str(&format!("{} {:08X} {}\n", info.upid_str, status.endtime(), status));
+ raw.push_str(&format!(
+ "{} {:08X} {}\n",
+ info.upid_str,
+ status.endtime(),
+ status
+ ));
} else {
raw.push_str(&info.upid_str);
raw.push('\n');
// note this is not locked, caller has to make sure it is
// this will skip (and log) lines that are not valid status lines
-fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
-{
+fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error> {
let reader = BufReader::new(reader);
let mut list = Vec::new();
for line in reader.lines() {
Ok((upid_str, upid, state)) => list.push(TaskListInfo {
upid_str,
upid,
- state
+ state,
}),
Err(err) => {
log::warn!("unable to parse worker status '{}' - {}", line, err);
impl TaskListInfoIterator {
/// Creates a new iterator instance.
pub fn new(active_only: bool) -> Result<Self, Error> {
-
let setup = worker_task_setup()?;
let (read_lock, active_list) = {
if let Some(element) = self.list.pop_back() {
return Some(Ok(element));
} else if self.end {
- return None;
+ return None;
} else {
if let Some(mut archive) = self.archive.take() {
if let Some(file) = archive.next() {
}
impl std::fmt::Display for WorkerTask {
-
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
self.upid.fmt(f)
}
}
impl WorkerTask {
-
pub fn new(
worker_type: &str,
worker_id: Option<String>,
auth_id: String,
to_stdout: bool,
) -> Result<Arc<Self>, Error> {
-
let setup = worker_task_setup()?;
let upid = UPID::new(worker_type, worker_id, auth_id)?;
path.push(format!("{:02X}", upid.pstart & 255));
- let dir_opts = setup.file_opts.clone()
+ let dir_opts = setup
+ .file_opts
+ .clone()
.perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
create_path(&path, None, Some(dir_opts))?;
to_stdout: bool,
f: F,
) -> Result<String, Error>
- where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
- T: Send + 'static + Future<Output = Result<(), Error>>,
+ where
+ F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
+ T: Send + 'static + Future<Output = Result<(), Error>>,
{
let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
let upid_str = worker.upid.to_string();
to_stdout: bool,
f: F,
) -> Result<String, Error>
- where F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
+ where
+ F: Send + UnwindSafe + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>,
{
let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
let upid_str = worker.upid.to_string();
- let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
- let worker1 = worker.clone();
- let result = match std::panic::catch_unwind(move || f(worker1)) {
- Ok(r) => r,
- Err(panic) => {
- match panic.downcast::<&str>() {
- Ok(panic_msg) => {
- Err(format_err!("worker panicked: {}", panic_msg))
- }
- Err(_) => {
- Err(format_err!("worker panicked: unknown type."))
- }
- }
- }
- };
-
- worker.log_result(&result);
- });
+ let _child = std::thread::Builder::new()
+ .name(upid_str.clone())
+ .spawn(move || {
+ let worker1 = worker.clone();
+ let result = match std::panic::catch_unwind(move || f(worker1)) {
+ Ok(r) => r,
+ Err(panic) => match panic.downcast::<&str>() {
+ Ok(panic_msg) => Err(format_err!("worker panicked: {}", panic_msg)),
+ Err(_) => Err(format_err!("worker panicked: unknown type.")),
+ },
+ };
+
+ worker.log_result(&result);
+ });
Ok(upid_str)
}
let endtime = proxmox_time::epoch_i64();
if let Err(err) = result {
- TaskState::Error { message: err.to_string(), endtime }
+ TaskState::Error {
+ message: err.to_string(),
+ endtime,
+ }
} else if warn_count > 0 {
- TaskState::Warning { count: warn_count, endtime }
+ TaskState::Warning {
+ count: warn_count,
+ endtime,
+ }
} else {
TaskState::OK { endtime }
}
let mut data = self.data.lock().unwrap();
data.progress = progress;
} else {
- // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
+ // fixme: log!("task '{}': ignoring strange value for progress '{}'", self.upid, progress);
}
}
/// Request abort
pub fn request_abort(&self) {
let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
- if !prev_abort { // log abort one time
+ if !prev_abort {
+ // log abort one time
self.log_message("received abort request ...".to_string());
}
// notify listeners
let mut data = self.data.lock().unwrap();
loop {
match data.abort_listeners.pop() {
- None => { break; },
+ None => {
+ break;
+ }
Some(ch) => {
let _ = ch.send(()); // ignore errors here
- },
+ }
}
}
}
/// Get a future which resolves on task abort
- pub fn abort_future(&self) -> oneshot::Receiver<()> {
+ pub fn abort_future(&self) -> oneshot::Receiver<()> {
let (tx, rx) = oneshot::channel::<()>();
let mut data = self.data.lock().unwrap();
}
impl WorkerTaskContext for WorkerTask {
-
fn abort_requested(&self) -> bool {
self.abort_requested.load(Ordering::SeqCst)
}
/// Note: local workers should print logs to stdout, so there is no
/// need to fetch/display logs. We just wait for the worker to finish.
pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
-
let upid: UPID = upid_str.parse()?;
let sleep_duration = core::time::Duration::new(0, 100_000_000);