Ok(task)
}
-
+/// Send a command to the specified socket
pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
where
P: AsRef<Path>,
send_raw_command(path.as_ref(), &command_string).await
}
+/// Send a raw command (string) to the specified socket
pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
where
P: AsRef<Path>,
}
}
-/// A callback for a specific commando socket.
-pub type CommandoSocketFn = Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
+// A callback for a specific commando socket.
+type CommandoSocketFn = Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
-/// Tooling to get a single control command socket where one can register multiple commands
-/// dynamically.
+/// Tooling to get a single control command socket where one can
+/// register multiple commands dynamically.
+///
/// You need to call `spawn()` to make the socket active.
pub struct CommandoSocket {
socket: PathBuf,
mod h2service;
pub use h2service::*;
+/// Authentification Error
pub enum AuthError {
Generic(Error),
NoData,
}
}
+/// User Authentification trait
pub trait ApiAuth {
+ /// Extract user credentials from headers and check them.
+ ///
+ /// If credenthials are valid, returns the username and a
+ /// [UserInformation] object to query additional user data.
fn check_auth(
&self,
headers: &http::HeaderMap,
static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime;
}
+/// Retruns the current process ID (see [libc::getpid])
+///
+/// The value is cached at startup (so it is invalid after a fork)
pub fn pid() -> i32 {
*PID
}
+/// Returns the starttime of the process (see [PidStat])
+///
+/// The value is cached at startup (so it is invalid after a fork)
pub fn pstart() -> u64 {
*PSTART
}
+/// Helper to write the PID into a file
pub fn write_pid(pid_fn: &str) -> Result<(), Error> {
let pid_str = format!("{}\n", *PID);
proxmox::tools::fs::replace_file(pid_fn, pid_str.as_bytes(), CreateOptions::new())
}
+/// Helper to read the PID from a file
pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
let pid = proxmox::tools::fs::file_get_contents(pid_fn)?;
let pid = std::str::from_utf8(&pid)?.trim();
pid.parse().map_err(|err| format_err!("could not parse pid - {}", err))
}
+/// Returns the control socket path for a specific process ID.
+///
+/// Note: The control socket always uses @/run/proxmox-backup/ as
+/// prefix for historic reason. This does not matter because the
+/// generated path is unique for each ``pid`` anyways.
pub fn ctrl_sock_from_pid(pid: i32) -> String {
// Note: The control socket always uses @/run/proxmox-backup/ as prefix
// for historc reason.
format!("\0{}/control-{}.sock", "/run/proxmox-backup", pid)
}
+/// Returns the control socket path for this server.
pub fn our_ctrl_sock() -> String {
ctrl_sock_from_pid(*PID)
}
static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
+/// Request a server shutdown (usually called from [catch_shutdown_signal])
pub fn request_shutdown() {
SHUTDOWN_REQUESTED.store(true, Ordering::SeqCst);
crate::server_shutdown();
}
+/// Returns true if there was a shutdown request.
#[inline(always)]
pub fn shutdown_requested() -> bool {
SHUTDOWN_REQUESTED.load(Ordering::SeqCst)
}
+/// Raise an error if there was a shutdown request.
pub fn fail_on_shutdown() -> Result<(), Error> {
if shutdown_requested() {
bail!("Server shutdown requested - aborting task");
use pbs_tools::broadcast_future::BroadcastData;
+use crate::request_shutdown;
+
#[derive(PartialEq, Copy, Clone, Debug)]
enum ServerMode {
Normal,
}
/// Listen to ``SIGINT`` for server shutdown
+///
+/// This calls [request_shutdown] when receiving the signal.
pub fn catch_shutdown_signal() -> Result<(), Error> {
let mut stream = signal(SignalKind::interrupt())?;
while stream.recv().await.is_some() {
log::info!("got shutdown request (SIGINT)");
SERVER_STATE.lock().unwrap().reload_request = false;
- crate::request_shutdown();
+ request_shutdown();
}
}.boxed();
}
/// Listen to ``SIGHUP`` for server reload
+///
+/// This calls [request_shutdown] when receiving the signal, and tries
+/// to restart the server.
pub fn catch_reload_signal() -> Result<(), Error> {
let mut stream = signal(SignalKind::hangup())?;
Ok(())
}
-pub fn is_reload_request() -> bool {
+pub(crate) fn is_reload_request() -> bool {
let data = SERVER_STATE.lock().unwrap();
data.mode == ServerMode::Shutdown && data.reload_request
}
-pub fn server_shutdown() {
+
+pub(crate) fn server_shutdown() {
let mut data = SERVER_STATE.lock().unwrap();
log::info!("request_shutdown");
check_last_worker();
}
+/// Future to signal server shutdown
pub fn shutdown_future() -> impl Future<Output = ()> {
let mut data = SERVER_STATE.lock().unwrap();
data
.map(|_| ())
}
+/// Future to signal when last worker task finished
pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> {
let mut data = SERVER_STATE.lock().unwrap();
data.last_worker_listeners.listen()
}
-pub fn set_worker_count(count: usize) {
+pub(crate) fn set_worker_count(count: usize) {
SERVER_STATE.lock().unwrap().worker_count = count;
check_last_worker();
}
-pub fn check_last_worker() {
+pub(crate) fn check_last_worker() {
let mut data = SERVER_STATE.lock().unwrap();
if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.internal_task_count == 0) { return; }
}
}
+/// Register task control command on a [CommandoSocket].
+///
+/// This create two commands:
+///
+/// * ``worker-task-abort <UPID>``: calls [abort_local_worker]
+///
+/// * ``worker-task-status <UPID>``: return true of false, depending on
+/// whether the worker is running or stopped.
pub fn register_task_control_commands(
commando_sock: &mut CommandoSocket,
) -> Result<(), Error> {
Ok(())
}
-pub fn abort_worker_async(upid: UPID) {
+/// Try to abort a worker task, but do no wait
+///
+/// Errors (if any) are simply logged.
+pub fn abort_worker_nowait(upid: UPID) {
tokio::spawn(async move {
if let Err(err) = abort_worker(upid).await {
- eprintln!("abort worker failed - {}", err);
+ log::error!("abort worker task failed - {}", err);
}
});
}
+/// Abort a worker task
+///
+/// By sending ``worker-task-abort`` to the control socket.
pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
let sock = crate::ctrl_sock_from_pid(upid.pid);
state
}),
Err(err) => {
- eprintln!("unable to parse worker status '{}' - {}", line, err);
+ log::warn!("unable to parse worker status '{}' - {}", line, err);
continue;
}
};
read_task_file(file)
}
+/// Iterate over existing/active worker tasks
pub struct TaskListInfoIterator {
list: VecDeque<TaskListInfo>,
end: bool,
}
impl TaskListInfoIterator {
+ /// Creates a new iterator instance.
pub fn new(active_only: bool) -> Result<Self, Error> {
let setup = worker_task_setup()?;
/// Request abort
pub fn request_abort(&self) {
- eprintln!("set abort flag for worker {}", self.upid);
-
let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
if !prev_abort { // log abort one time
self.log_message(format!("received abort request ..."));
user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
}
- proxmox_rest_server::abort_worker_async(upid);
+ proxmox_rest_server::abort_worker_nowait(upid);
Ok(Value::Null)
}
}
Ok(wid) => {
println!("WORKER: {}", wid);
- proxmox_rest_server::abort_worker_async(wid.parse::<UPID>().unwrap());
+ proxmox_rest_server::abort_worker_nowait(wid.parse::<UPID>().unwrap());
proxmox_rest_server::wait_for_local_worker(&wid).await.unwrap();
}
}