]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/node/tasks.rs
src/server/worker_task.rs: Avoid using pbs-api-type::Authid
[proxmox-backup.git] / src / api2 / node / tasks.rs
CommitLineData
cad540e9
WB
1use std::fs::File;
2use std::io::{BufRead, BufReader};
063ca5be 3
dbd45a72 4use anyhow::{bail, Error};
063ca5be
DM
5use serde_json::{json, Value};
6
e7cb4dc5 7use proxmox::api::{api, Router, RpcEnvironment, Permission};
cad540e9 8use proxmox::api::router::SubdirMap;
9ea4bce4 9use proxmox::{identity, list_subdirs_api_method, sortable};
552c2259 10
8cc3760e 11use pbs_api_types::{
6227654a 12 Userid, Authid, Tokenname, TaskListItem, TaskStateType, UPID,
8cc3760e
DM
13 NODE_SCHEMA, UPID_SCHEMA, VERIFICATION_JOB_WORKER_ID_REGEX,
14 SYNC_JOB_WORKER_ID_REGEX, DATASTORE_SCHEMA,
15 PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_VERIFY, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
16};
dbd45a72 17
8cc3760e 18use crate::api2::pull::check_pull_privs;
6227654a 19use crate::server::{self, UPIDExt, TaskState, TaskListInfoIterator};
ba3d7e19 20use pbs_config::CachedUserInfo;
720af9f6 21
dbd45a72
FG
22// matches respective job execution privileges
23fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) -> Result<(), Error> {
24 match (upid.worker_type.as_str(), &upid.worker_id) {
25 ("verificationjob", Some(workerid)) => {
26 if let Some(captures) = VERIFICATION_JOB_WORKER_ID_REGEX.captures(&workerid) {
27 if let Some(store) = captures.get(1) {
28 return user_info.check_privs(&auth_id,
29 &["datastore", store.as_str()],
30 PRIV_DATASTORE_VERIFY,
31 true);
32 }
33 }
34 },
35 ("syncjob", Some(workerid)) => {
36 if let Some(captures) = SYNC_JOB_WORKER_ID_REGEX.captures(&workerid) {
37 let remote = captures.get(1);
38 let remote_store = captures.get(2);
39 let local_store = captures.get(3);
40
41 if let (Some(remote), Some(remote_store), Some(local_store)) =
42 (remote, remote_store, local_store) {
43
44 return check_pull_privs(&auth_id,
45 local_store.as_str(),
46 remote.as_str(),
47 remote_store.as_str(),
48 false);
49 }
50 }
51 },
52 ("garbage_collection", Some(workerid)) => {
53 return user_info.check_privs(&auth_id,
54 &["datastore", &workerid],
55 PRIV_DATASTORE_MODIFY,
56 true)
57 },
58 ("prune", Some(workerid)) => {
59 return user_info.check_privs(&auth_id,
60 &["datastore",
61 &workerid],
62 PRIV_DATASTORE_MODIFY,
63 true);
64 },
65 _ => bail!("not a scheduled job task"),
66 };
67
68 bail!("not a scheduled job task");
69}
70
6e880f19
DC
71// get the store out of the worker_id
72fn check_job_store(upid: &UPID, store: &str) -> bool {
73 match (upid.worker_type.as_str(), &upid.worker_id) {
74 (workertype, Some(workerid)) if workertype.starts_with("verif") => {
75 if let Some(captures) = VERIFICATION_JOB_WORKER_ID_REGEX.captures(&workerid) {
76 if let Some(jobstore) = captures.get(1) {
77 return store == jobstore.as_str();
78 }
79 } else {
80 return workerid == store;
81 }
82 }
83 ("syncjob", Some(workerid)) => {
84 if let Some(captures) = SYNC_JOB_WORKER_ID_REGEX.captures(&workerid) {
85 if let Some(local_store) = captures.get(3) {
86 return store == local_store.as_str();
87 }
88 }
89 }
90 ("prune", Some(workerid))
91 | ("backup", Some(workerid))
92 | ("garbage_collection", Some(workerid)) => {
93 return workerid == store || workerid.starts_with(&format!("{}:", store));
94 }
95 _ => {}
96 };
97
98 false
99}
100
16245d54 101fn check_task_access(auth_id: &Authid, upid: &UPID) -> Result<(), Error> {
049a22a3
DM
102 let task_auth_id: Authid = upid.auth_id.parse()?;
103 if auth_id == &task_auth_id
16245d54 104 || (task_auth_id.is_token() && &Authid::from(task_auth_id.user().clone()) == auth_id) {
dbd45a72 105 // task owner can always read
16245d54
FG
106 Ok(())
107 } else {
108 let user_info = CachedUserInfo::new()?;
dbd45a72 109
4d08e259
FG
110 // access to all tasks
111 // or task == job which the user/token could have configured/manually executed
112
113 user_info.check_privs(auth_id, &["system", "tasks"], PRIV_SYS_AUDIT, false)
114 .or_else(|_| check_job_privs(&auth_id, &user_info, upid))
115 .or_else(|_| bail!("task access not allowed"))
16245d54
FG
116 }
117}
5a12c0e2 118
fd18775a
DM
119pub fn tasktype(state: &TaskState) -> TaskStateType {
120 match state {
121 TaskState::OK { .. } => TaskStateType::OK,
122 TaskState::Unknown { .. } => TaskStateType::Unknown,
123 TaskState::Error { .. } => TaskStateType::Error,
124 TaskState::Warning { .. } => TaskStateType::Warning,
125 }
126}
127
83b6a7cf
DM
128#[api(
129 input: {
130 properties: {
131 node: {
132 schema: NODE_SCHEMA,
133 },
134 upid: {
135 schema: UPID_SCHEMA,
136 },
137 },
138 },
139 returns: {
9809772b 140 description: "Task status information.",
83b6a7cf
DM
141 properties: {
142 node: {
143 schema: NODE_SCHEMA,
144 },
145 upid: {
146 schema: UPID_SCHEMA,
147 },
148 pid: {
149 type: i64,
150 description: "The Unix PID.",
151 },
152 pstart: {
153 type: u64,
154 description: "The Unix process start time from `/proc/pid/stat`",
155 },
156 starttime: {
157 type: i64,
158 description: "The task start time (Epoch)",
159 },
160 "type": {
161 type: String,
162 description: "Worker type (arbitrary ASCII string)",
163 },
164 id: {
165 type: String,
166 optional: true,
167 description: "Worker ID (arbitrary ASCII string)",
168 },
169 user: {
16245d54 170 type: Userid,
83b6a7cf 171 },
16245d54
FG
172 tokenid: {
173 type: Tokenname,
174 optional: true,
175 },
83b6a7cf
DM
176 status: {
177 type: String,
178 description: "'running' or 'stopped'",
179 },
180 exitstatus: {
181 type: String,
182 optional: true,
183 description: "'OK', 'Error: <msg>', or 'unkwown'.",
184 },
185 },
186 },
187 access: {
9809772b 188 description: "Users can access their own tasks, or need Sys.Audit on /system/tasks.",
720af9f6 189 permission: &Permission::Anybody,
83b6a7cf
DM
190 },
191)]
192/// Get task status.
5751e495 193async fn get_task_status(
5a12c0e2 194 param: Value,
720af9f6 195 rpcenv: &mut dyn RpcEnvironment,
5a12c0e2
DM
196) -> Result<Value, Error> {
197
198 let upid = extract_upid(&param)?;
199
e6dc35ac 200 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
16245d54 201 check_task_access(&auth_id, &upid)?;
720af9f6 202
049a22a3
DM
203 let task_auth_id: Authid = upid.auth_id.parse()?;
204
c360bd73
DM
205 let mut result = json!({
206 "upid": param["upid"],
207 "node": upid.node,
208 "pid": upid.pid,
209 "pstart": upid.pstart,
210 "starttime": upid.starttime,
211 "type": upid.worker_type,
212 "id": upid.worker_id,
049a22a3 213 "user": task_auth_id.user(),
c360bd73
DM
214 });
215
049a22a3
DM
216 if task_auth_id.is_token() {
217 result["tokenid"] = Value::from(task_auth_id.tokenname().unwrap().as_str());
16245d54
FG
218 }
219
5751e495 220 if crate::server::worker_is_active(&upid).await? {
c360bd73 221 result["status"] = Value::from("running");
5a12c0e2 222 } else {
77bd2a46 223 let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
c360bd73 224 result["status"] = Value::from("stopped");
4c116baf 225 result["exitstatus"] = Value::from(exitstatus.to_string());
5a12c0e2
DM
226 };
227
228 Ok(result)
229}
230
231fn extract_upid(param: &Value) -> Result<UPID, Error> {
232
3c8c2827 233 let upid_str = pbs_tools::json::required_string_param(&param, "upid")?;
5a12c0e2 234
8560fe3e 235 upid_str.parse::<UPID>()
5a12c0e2
DM
236}
237
83b6a7cf
DM
238#[api(
239 input: {
240 properties: {
241 node: {
242 schema: NODE_SCHEMA,
243 },
244 upid: {
245 schema: UPID_SCHEMA,
246 },
247 "test-status": {
248 type: bool,
249 optional: true,
250 description: "Test task status, and set result attribute \"active\" accordingly.",
251 },
252 start: {
253 type: u64,
254 optional: true,
255 description: "Start at this line.",
256 default: 0,
257 },
258 limit: {
259 type: u64,
260 optional: true,
261 description: "Only list this amount of lines.",
262 default: 50,
263 },
264 },
265 },
266 access: {
338c545f 267 description: "Users can access their own tasks, or need Sys.Audit on /system/tasks.",
720af9f6 268 permission: &Permission::Anybody,
83b6a7cf
DM
269 },
270)]
271/// Read task log.
5751e495 272async fn read_task_log(
5a12c0e2 273 param: Value,
e8d1da6a 274 mut rpcenv: &mut dyn RpcEnvironment,
5a12c0e2
DM
275) -> Result<Value, Error> {
276
277 let upid = extract_upid(&param)?;
6b508dd5 278
e6dc35ac 279 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
720af9f6 280
16245d54 281 check_task_access(&auth_id, &upid)?;
720af9f6 282
6b508dd5
DM
283 let test_status = param["test-status"].as_bool().unwrap_or(false);
284
5a12c0e2
DM
285 let start = param["start"].as_u64().unwrap_or(0);
286 let mut limit = param["limit"].as_u64().unwrap_or(50);
6b508dd5 287
5a12c0e2
DM
288 let mut count: u64 = 0;
289
290 let path = upid.log_path();
291
292 let file = File::open(path)?;
293
294 let mut lines: Vec<Value> = vec![];
295
296 for line in BufReader::new(file).lines() {
297 match line {
298 Ok(line) => {
299 count += 1;
300 if count < start { continue };
11377a47 301 if limit == 0 { continue };
5a12c0e2
DM
302
303 lines.push(json!({ "n": count, "t": line }));
304
305 limit -= 1;
306 }
307 Err(err) => {
308 log::error!("reading task log failed: {}", err);
309 break;
310 }
311 }
312 }
313
e8d1da6a 314 rpcenv["total"] = Value::from(count);
d8d40dd0 315
6b508dd5 316 if test_status {
5751e495 317 let active = crate::server::worker_is_active(&upid).await?;
e8d1da6a 318 rpcenv["active"] = Value::from(active);
6b508dd5
DM
319 }
320
5a12c0e2
DM
321 Ok(json!(lines))
322}
063ca5be 323
83b6a7cf
DM
324#[api(
325 protected: true,
326 input: {
327 properties: {
328 node: {
329 schema: NODE_SCHEMA,
330 },
331 upid: {
332 schema: UPID_SCHEMA,
333 },
334 },
335 },
336 access: {
338c545f 337 description: "Users can stop their own tasks, or need Sys.Modify on /system/tasks.",
720af9f6 338 permission: &Permission::Anybody,
83b6a7cf
DM
339 },
340)]
341/// Try to stop a task.
a665dea1
DM
342fn stop_task(
343 param: Value,
720af9f6 344 rpcenv: &mut dyn RpcEnvironment,
a665dea1
DM
345) -> Result<Value, Error> {
346
347 let upid = extract_upid(&param)?;
348
049a22a3 349 let auth_id = rpcenv.get_auth_id().unwrap();
720af9f6 350
e6dc35ac 351 if auth_id != upid.auth_id {
720af9f6 352 let user_info = CachedUserInfo::new()?;
049a22a3 353 let auth_id: Authid = auth_id.parse()?;
e6dc35ac 354 user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
720af9f6
DM
355 }
356
5751e495 357 server::abort_worker_async(upid);
a665dea1
DM
358
359 Ok(Value::Null)
360}
361
2c4b303c
DM
362#[api(
363 input: {
364 properties: {
365 node: {
366 schema: NODE_SCHEMA
367 },
368 start: {
369 type: u64,
370 description: "List tasks beginning from this offset.",
371 default: 0,
372 optional: true,
373 },
374 limit: {
375 type: u64,
e7dd169f 376 description: "Only list this amount of tasks. (0 means no limit)",
2c4b303c
DM
377 default: 50,
378 optional: true,
379 },
380 store: {
381 schema: DATASTORE_SCHEMA,
382 optional: true,
383 },
384 running: {
385 type: bool,
386 description: "Only list running tasks.",
387 optional: true,
ca9dfe5f 388 default: false,
2c4b303c
DM
389 },
390 errors: {
391 type: bool,
392 description: "Only list erroneous tasks.",
393 optional:true,
ca9dfe5f 394 default: false,
2c4b303c
DM
395 },
396 userfilter: {
e7cb4dc5 397 optional: true,
2c4b303c
DM
398 type: String,
399 description: "Only list tasks from this user.",
400 },
a2a7dd15
DC
401 since: {
402 type: i64,
403 description: "Only list tasks since this UNIX epoch.",
404 optional: true,
405 },
c1fa057c
DC
406 until: {
407 type: i64,
408 description: "Only list tasks until this UNIX epoch.",
409 optional: true,
410 },
a2a7dd15
DC
411 typefilter: {
412 optional: true,
413 type: String,
414 description: "Only list tasks whose type contains this.",
415 },
416 statusfilter: {
417 optional: true,
418 type: Array,
419 description: "Only list tasks which have any one of the listed status.",
420 items: {
421 type: TaskStateType,
422 },
423 },
2c4b303c
DM
424 },
425 },
7b570c17 426 returns: pbs_api_types::NODE_TASKS_LIST_TASKS_RETURN_TYPE,
83b6a7cf 427 access: {
338c545f 428 description: "Users can only see their own tasks, unless they have Sys.Audit on /system/tasks.",
720af9f6 429 permission: &Permission::Anybody,
83b6a7cf 430 },
2c4b303c
DM
431)]
432/// List tasks.
367c0ff7 433#[allow(clippy::too_many_arguments)]
8528fce8 434pub fn list_tasks(
ca9dfe5f
DM
435 start: u64,
436 limit: u64,
437 errors: bool,
438 running: bool,
005a5b96 439 userfilter: Option<String>,
a2a7dd15 440 since: Option<i64>,
c1fa057c 441 until: Option<i64>,
a2a7dd15
DC
442 typefilter: Option<String>,
443 statusfilter: Option<Vec<TaskStateType>>,
063ca5be 444 param: Value,
e8d1da6a 445 mut rpcenv: &mut dyn RpcEnvironment,
99384f79 446) -> Result<Vec<TaskListItem>, Error> {
063ca5be 447
e6dc35ac 448 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
720af9f6 449 let user_info = CachedUserInfo::new()?;
e6dc35ac 450 let user_privs = user_info.lookup_privs(&auth_id, &["system", "tasks"]);
720af9f6
DM
451
452 let list_all = (user_privs & PRIV_SYS_AUDIT) != 0;
453
567d3e00
DM
454 let store = param["store"].as_str();
455
768e10d0 456 let list = TaskListInfoIterator::new(running)?;
e7dd169f 457 let limit = if limit > 0 { limit as usize } else { usize::MAX };
d2a2e02b 458
9517a575
DC
459 let mut skipped = 0;
460 let mut result: Vec<TaskListItem> = Vec::new();
461
462 for info in list {
768e10d0
DC
463 let info = match info {
464 Ok(info) => info,
9517a575 465 Err(_) => break,
768e10d0 466 };
720af9f6 467
9517a575
DC
468 if let Some(until) = until {
469 if info.upid.starttime > until {
470 continue;
471 }
472 }
473
474 if let Some(since) = since {
475 if let Some(ref state) = info.state {
476 if state.endtime() < since {
477 // we reached the tasks that ended before our 'since'
478 // so we can stop iterating
479 break;
480 }
481 }
482 if info.upid.starttime < since {
483 continue;
484 }
485 }
486
16245d54 487 if !list_all && check_task_access(&auth_id, &info.upid).is_err() {
9517a575 488 continue;
16245d54 489 }
063ca5be 490
e6dc35ac 491 if let Some(needle) = &userfilter {
9517a575 492 if !info.upid.auth_id.to_string().contains(needle) { continue; }
d2a2e02b
DM
493 }
494
567d3e00 495 if let Some(store) = store {
9517a575 496 if !check_job_store(&info.upid, store) { continue; }
567d3e00
DM
497 }
498
a2a7dd15 499 if let Some(typefilter) = &typefilter {
9517a575 500 if !info.upid.worker_type.contains(typefilter) { continue; }
a2a7dd15
DC
501 }
502
503 match (&info.state, &statusfilter) {
9517a575
DC
504 (Some(_), _) if running => continue,
505 (Some(crate::server::TaskState::OK { .. }), _) if errors => continue,
a2a7dd15 506 (Some(state), Some(filters)) => {
fd18775a 507 if !filters.contains(&tasktype(state)) {
9517a575 508 continue;
a2a7dd15
DC
509 }
510 },
9517a575 511 (None, Some(_)) => continue,
768e10d0 512 _ => {},
063ca5be
DM
513 }
514
9517a575
DC
515 if skipped < start as usize {
516 skipped += 1;
517 continue;
518 }
519
520 result.push(info.into());
521
522 if result.len() >= limit {
523 break;
524 }
525 }
063ca5be 526
768e10d0 527 let mut count = result.len() + start as usize;
3984a5fd 528 if !result.is_empty() && result.len() >= limit { // we have a 'virtual' entry as long as we have any new
768e10d0 529 count += 1;
063ca5be
DM
530 }
531
e8d1da6a 532 rpcenv["total"] = Value::from(count);
063ca5be 533
0196b9bf 534 Ok(result)
063ca5be
DM
535}
536
552c2259 537#[sortable]
83b6a7cf 538const UPID_API_SUBDIRS: SubdirMap = &sorted!([
255f378a
DM
539 (
540 "log", &Router::new()
83b6a7cf 541 .get(&API_METHOD_READ_TASK_LOG)
255f378a
DM
542 ),
543 (
544 "status", &Router::new()
83b6a7cf 545 .get(&API_METHOD_GET_TASK_STATUS)
255f378a 546 )
83b6a7cf 547]);
255f378a
DM
548
549pub const UPID_API_ROUTER: Router = Router::new()
550 .get(&list_subdirs_api_method!(UPID_API_SUBDIRS))
83b6a7cf 551 .delete(&API_METHOD_STOP_TASK)
255f378a
DM
552 .subdirs(&UPID_API_SUBDIRS);
553
554pub const ROUTER: Router = Router::new()
2c4b303c 555 .get(&API_METHOD_LIST_TASKS)
255f378a 556 .match_all("upid", &UPID_API_ROUTER);