2 use std
::io
::{BufRead, BufReader}
;
4 use anyhow
::{bail, Error}
;
5 use serde_json
::{json, Value}
;
7 use proxmox
::api
::{api, Router, RpcEnvironment, Permission}
;
8 use proxmox
::api
::router
::SubdirMap
;
9 use proxmox
::{identity, list_subdirs_api_method, sortable}
;
12 Userid
, Authid
, Tokenname
, TaskListItem
, TaskStateType
, UPID
,
13 NODE_SCHEMA
, UPID_SCHEMA
, VERIFICATION_JOB_WORKER_ID_REGEX
,
14 SYNC_JOB_WORKER_ID_REGEX
, DATASTORE_SCHEMA
,
15 PRIV_DATASTORE_MODIFY
, PRIV_DATASTORE_VERIFY
, PRIV_SYS_AUDIT
, PRIV_SYS_MODIFY
,
18 use crate::api2
::pull
::check_pull_privs
;
19 use crate::server
::{self, UPIDExt, TaskState, TaskListInfoIterator}
;
20 use pbs_config
::CachedUserInfo
;
22 // matches respective job execution privileges
23 fn check_job_privs(auth_id
: &Authid
, user_info
: &CachedUserInfo
, upid
: &UPID
) -> Result
<(), Error
> {
24 match (upid
.worker_type
.as_str(), &upid
.worker_id
) {
25 ("verificationjob", Some(workerid
)) => {
26 if let Some(captures
) = VERIFICATION_JOB_WORKER_ID_REGEX
.captures(&workerid
) {
27 if let Some(store
) = captures
.get(1) {
28 return user_info
.check_privs(&auth_id
,
29 &["datastore", store
.as_str()],
30 PRIV_DATASTORE_VERIFY
,
35 ("syncjob", Some(workerid
)) => {
36 if let Some(captures
) = SYNC_JOB_WORKER_ID_REGEX
.captures(&workerid
) {
37 let remote
= captures
.get(1);
38 let remote_store
= captures
.get(2);
39 let local_store
= captures
.get(3);
41 if let (Some(remote
), Some(remote_store
), Some(local_store
)) =
42 (remote
, remote_store
, local_store
) {
44 return check_pull_privs(&auth_id
,
47 remote_store
.as_str(),
52 ("garbage_collection", Some(workerid
)) => {
53 return user_info
.check_privs(&auth_id
,
54 &["datastore", &workerid
],
55 PRIV_DATASTORE_MODIFY
,
58 ("prune", Some(workerid
)) => {
59 return user_info
.check_privs(&auth_id
,
62 PRIV_DATASTORE_MODIFY
,
65 _
=> bail
!("not a scheduled job task"),
68 bail
!("not a scheduled job task");
71 // get the store out of the worker_id
72 fn check_job_store(upid
: &UPID
, store
: &str) -> bool
{
73 match (upid
.worker_type
.as_str(), &upid
.worker_id
) {
74 (workertype
, Some(workerid
)) if workertype
.starts_with("verif") => {
75 if let Some(captures
) = VERIFICATION_JOB_WORKER_ID_REGEX
.captures(&workerid
) {
76 if let Some(jobstore
) = captures
.get(1) {
77 return store
== jobstore
.as_str();
80 return workerid
== store
;
83 ("syncjob", Some(workerid
)) => {
84 if let Some(captures
) = SYNC_JOB_WORKER_ID_REGEX
.captures(&workerid
) {
85 if let Some(local_store
) = captures
.get(3) {
86 return store
== local_store
.as_str();
90 ("prune", Some(workerid
))
91 | ("backup", Some(workerid
))
92 | ("garbage_collection", Some(workerid
)) => {
93 return workerid
== store
|| workerid
.starts_with(&format
!("{}:", store
));
101 fn check_task_access(auth_id
: &Authid
, upid
: &UPID
) -> Result
<(), Error
> {
102 let task_auth_id
: Authid
= upid
.auth_id
.parse()?
;
103 if auth_id
== &task_auth_id
104 || (task_auth_id
.is_token() && &Authid
::from(task_auth_id
.user().clone()) == auth_id
) {
105 // task owner can always read
108 let user_info
= CachedUserInfo
::new()?
;
110 // access to all tasks
111 // or task == job which the user/token could have configured/manually executed
113 user_info
.check_privs(auth_id
, &["system", "tasks"], PRIV_SYS_AUDIT
, false)
114 .or_else(|_
| check_job_privs(&auth_id
, &user_info
, upid
))
115 .or_else(|_
| bail
!("task access not allowed"))
119 pub fn tasktype(state
: &TaskState
) -> TaskStateType
{
121 TaskState
::OK { .. }
=> TaskStateType
::OK
,
122 TaskState
::Unknown { .. }
=> TaskStateType
::Unknown
,
123 TaskState
::Error { .. }
=> TaskStateType
::Error
,
124 TaskState
::Warning { .. }
=> TaskStateType
::Warning
,
140 description
: "Task status information.",
150 description
: "The Unix PID.",
154 description
: "The Unix process start time from `/proc/pid/stat`",
158 description
: "The task start time (Epoch)",
162 description
: "Worker type (arbitrary ASCII string)",
167 description
: "Worker ID (arbitrary ASCII string)",
178 description
: "'running' or 'stopped'",
183 description
: "'OK', 'Error: <msg>', or 'unkwown'.",
188 description
: "Users can access their own tasks, or need Sys.Audit on /system/tasks.",
189 permission
: &Permission
::Anybody
,
193 async
fn get_task_status(
195 rpcenv
: &mut dyn RpcEnvironment
,
196 ) -> Result
<Value
, Error
> {
198 let upid
= extract_upid(¶m
)?
;
200 let auth_id
: Authid
= rpcenv
.get_auth_id().unwrap().parse()?
;
201 check_task_access(&auth_id
, &upid
)?
;
203 let task_auth_id
: Authid
= upid
.auth_id
.parse()?
;
205 let mut result
= json
!({
206 "upid": param
["upid"],
209 "pstart": upid
.pstart
,
210 "starttime": upid
.starttime
,
211 "type": upid
.worker_type
,
212 "id": upid
.worker_id
,
213 "user": task_auth_id
.user(),
216 if task_auth_id
.is_token() {
217 result
["tokenid"] = Value
::from(task_auth_id
.tokenname().unwrap().as_str());
220 if crate::server
::worker_is_active(&upid
).await?
{
221 result
["status"] = Value
::from("running");
223 let exitstatus
= crate::server
::upid_read_status(&upid
).unwrap_or(TaskState
::Unknown { endtime: 0 }
);
224 result
["status"] = Value
::from("stopped");
225 result
["exitstatus"] = Value
::from(exitstatus
.to_string());
231 fn extract_upid(param
: &Value
) -> Result
<UPID
, Error
> {
233 let upid_str
= pbs_tools
::json
::required_string_param(¶m
, "upid")?
;
235 upid_str
.parse
::<UPID
>()
250 description
: "Test task status, and set result attribute \"active\" accordingly.",
255 description
: "Start at this line.",
261 description
: "Only list this amount of lines.",
267 description
: "Users can access their own tasks, or need Sys.Audit on /system/tasks.",
268 permission
: &Permission
::Anybody
,
272 async
fn read_task_log(
274 mut rpcenv
: &mut dyn RpcEnvironment
,
275 ) -> Result
<Value
, Error
> {
277 let upid
= extract_upid(¶m
)?
;
279 let auth_id
: Authid
= rpcenv
.get_auth_id().unwrap().parse()?
;
281 check_task_access(&auth_id
, &upid
)?
;
283 let test_status
= param
["test-status"].as_bool().unwrap_or(false);
285 let start
= param
["start"].as_u64().unwrap_or(0);
286 let mut limit
= param
["limit"].as_u64().unwrap_or(50);
288 let mut count
: u64 = 0;
290 let path
= upid
.log_path();
292 let file
= File
::open(path
)?
;
294 let mut lines
: Vec
<Value
> = vec
![];
296 for line
in BufReader
::new(file
).lines() {
300 if count
< start { continue }
;
301 if limit
== 0 { continue }
;
303 lines
.push(json
!({ "n": count, "t": line }
));
308 log
::error
!("reading task log failed: {}", err
);
314 rpcenv
["total"] = Value
::from(count
);
317 let active
= crate::server
::worker_is_active(&upid
).await?
;
318 rpcenv
["active"] = Value
::from(active
);
337 description
: "Users can stop their own tasks, or need Sys.Modify on /system/tasks.",
338 permission
: &Permission
::Anybody
,
341 /// Try to stop a task.
344 rpcenv
: &mut dyn RpcEnvironment
,
345 ) -> Result
<Value
, Error
> {
347 let upid
= extract_upid(¶m
)?
;
349 let auth_id
= rpcenv
.get_auth_id().unwrap();
351 if auth_id
!= upid
.auth_id
{
352 let user_info
= CachedUserInfo
::new()?
;
353 let auth_id
: Authid
= auth_id
.parse()?
;
354 user_info
.check_privs(&auth_id
, &["system", "tasks"], PRIV_SYS_MODIFY
, false)?
;
357 server
::abort_worker_async(upid
);
370 description
: "List tasks beginning from this offset.",
376 description
: "Only list this amount of tasks. (0 means no limit)",
381 schema
: DATASTORE_SCHEMA
,
386 description
: "Only list running tasks.",
392 description
: "Only list erroneous tasks.",
399 description
: "Only list tasks from this user.",
403 description
: "Only list tasks since this UNIX epoch.",
408 description
: "Only list tasks until this UNIX epoch.",
414 description
: "Only list tasks whose type contains this.",
419 description
: "Only list tasks which have any one of the listed status.",
426 returns
: pbs_api_types
::NODE_TASKS_LIST_TASKS_RETURN_TYPE
,
428 description
: "Users can only see their own tasks, unless they have Sys.Audit on /system/tasks.",
429 permission
: &Permission
::Anybody
,
433 #[allow(clippy::too_many_arguments)]
439 userfilter
: Option
<String
>,
442 typefilter
: Option
<String
>,
443 statusfilter
: Option
<Vec
<TaskStateType
>>,
445 mut rpcenv
: &mut dyn RpcEnvironment
,
446 ) -> Result
<Vec
<TaskListItem
>, Error
> {
448 let auth_id
: Authid
= rpcenv
.get_auth_id().unwrap().parse()?
;
449 let user_info
= CachedUserInfo
::new()?
;
450 let user_privs
= user_info
.lookup_privs(&auth_id
, &["system", "tasks"]);
452 let list_all
= (user_privs
& PRIV_SYS_AUDIT
) != 0;
454 let store
= param
["store"].as_str();
456 let list
= TaskListInfoIterator
::new(running
)?
;
457 let limit
= if limit
> 0 { limit as usize }
else { usize::MAX }
;
460 let mut result
: Vec
<TaskListItem
> = Vec
::new();
463 let info
= match info
{
468 if let Some(until
) = until
{
469 if info
.upid
.starttime
> until
{
474 if let Some(since
) = since
{
475 if let Some(ref state
) = info
.state
{
476 if state
.endtime() < since
{
477 // we reached the tasks that ended before our 'since'
478 // so we can stop iterating
482 if info
.upid
.starttime
< since
{
487 if !list_all
&& check_task_access(&auth_id
, &info
.upid
).is_err() {
491 if let Some(needle
) = &userfilter
{
492 if !info
.upid
.auth_id
.to_string().contains(needle
) { continue; }
495 if let Some(store
) = store
{
496 if !check_job_store(&info
.upid
, store
) { continue; }
499 if let Some(typefilter
) = &typefilter
{
500 if !info
.upid
.worker_type
.contains(typefilter
) { continue; }
503 match (&info
.state
, &statusfilter
) {
504 (Some(_
), _
) if running
=> continue,
505 (Some(crate::server
::TaskState
::OK { .. }
), _
) if errors
=> continue,
506 (Some(state
), Some(filters
)) => {
507 if !filters
.contains(&tasktype(state
)) {
511 (None
, Some(_
)) => continue,
515 if skipped
< start
as usize {
520 result
.push(info
.into());
522 if result
.len() >= limit
{
527 let mut count
= result
.len() + start
as usize;
528 if !result
.is_empty() && result
.len() >= limit
{ // we have a 'virtual' entry as long as we have any new
532 rpcenv
["total"] = Value
::from(count
);
538 const UPID_API_SUBDIRS
: SubdirMap
= &sorted
!([
540 "log", &Router
::new()
541 .get(&API_METHOD_READ_TASK_LOG
)
544 "status", &Router
::new()
545 .get(&API_METHOD_GET_TASK_STATUS
)
549 pub const UPID_API_ROUTER
: Router
= Router
::new()
550 .get(&list_subdirs_api_method
!(UPID_API_SUBDIRS
))
551 .delete(&API_METHOD_STOP_TASK
)
552 .subdirs(&UPID_API_SUBDIRS
);
554 pub const ROUTER
: Router
= Router
::new()
555 .get(&API_METHOD_LIST_TASKS
)
556 .match_all("upid", &UPID_API_ROUTER
);