]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/node/tasks.rs
move token_shadow to pbs_config workspace
[proxmox-backup.git] / src / api2 / node / tasks.rs
CommitLineData
cad540e9
WB
1use std::fs::File;
2use std::io::{BufRead, BufReader};
063ca5be 3
dbd45a72 4use anyhow::{bail, Error};
063ca5be
DM
5use serde_json::{json, Value};
6
e7cb4dc5 7use proxmox::api::{api, Router, RpcEnvironment, Permission};
cad540e9 8use proxmox::api::router::SubdirMap;
9ea4bce4 9use proxmox::{identity, list_subdirs_api_method, sortable};
552c2259 10
4ebf0eab 11use crate::api2::types::*;
dbd45a72
FG
12use crate::api2::pull::check_pull_privs;
13
95f9d67c 14use crate::server::{self, UPID, UPIDExt, TaskState, TaskListInfoIterator};
dbd45a72
FG
15use crate::config::acl::{
16 PRIV_DATASTORE_MODIFY,
17 PRIV_DATASTORE_VERIFY,
18 PRIV_SYS_AUDIT,
19 PRIV_SYS_MODIFY,
20};
720af9f6
DM
21use crate::config::cached_user_info::CachedUserInfo;
22
dbd45a72
FG
23// matches respective job execution privileges
24fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) -> Result<(), Error> {
25 match (upid.worker_type.as_str(), &upid.worker_id) {
26 ("verificationjob", Some(workerid)) => {
27 if let Some(captures) = VERIFICATION_JOB_WORKER_ID_REGEX.captures(&workerid) {
28 if let Some(store) = captures.get(1) {
29 return user_info.check_privs(&auth_id,
30 &["datastore", store.as_str()],
31 PRIV_DATASTORE_VERIFY,
32 true);
33 }
34 }
35 },
36 ("syncjob", Some(workerid)) => {
37 if let Some(captures) = SYNC_JOB_WORKER_ID_REGEX.captures(&workerid) {
38 let remote = captures.get(1);
39 let remote_store = captures.get(2);
40 let local_store = captures.get(3);
41
42 if let (Some(remote), Some(remote_store), Some(local_store)) =
43 (remote, remote_store, local_store) {
44
45 return check_pull_privs(&auth_id,
46 local_store.as_str(),
47 remote.as_str(),
48 remote_store.as_str(),
49 false);
50 }
51 }
52 },
53 ("garbage_collection", Some(workerid)) => {
54 return user_info.check_privs(&auth_id,
55 &["datastore", &workerid],
56 PRIV_DATASTORE_MODIFY,
57 true)
58 },
59 ("prune", Some(workerid)) => {
60 return user_info.check_privs(&auth_id,
61 &["datastore",
62 &workerid],
63 PRIV_DATASTORE_MODIFY,
64 true);
65 },
66 _ => bail!("not a scheduled job task"),
67 };
68
69 bail!("not a scheduled job task");
70}
71
6e880f19
DC
72// get the store out of the worker_id
73fn check_job_store(upid: &UPID, store: &str) -> bool {
74 match (upid.worker_type.as_str(), &upid.worker_id) {
75 (workertype, Some(workerid)) if workertype.starts_with("verif") => {
76 if let Some(captures) = VERIFICATION_JOB_WORKER_ID_REGEX.captures(&workerid) {
77 if let Some(jobstore) = captures.get(1) {
78 return store == jobstore.as_str();
79 }
80 } else {
81 return workerid == store;
82 }
83 }
84 ("syncjob", Some(workerid)) => {
85 if let Some(captures) = SYNC_JOB_WORKER_ID_REGEX.captures(&workerid) {
86 if let Some(local_store) = captures.get(3) {
87 return store == local_store.as_str();
88 }
89 }
90 }
91 ("prune", Some(workerid))
92 | ("backup", Some(workerid))
93 | ("garbage_collection", Some(workerid)) => {
94 return workerid == store || workerid.starts_with(&format!("{}:", store));
95 }
96 _ => {}
97 };
98
99 false
100}
101
16245d54
FG
102fn check_task_access(auth_id: &Authid, upid: &UPID) -> Result<(), Error> {
103 let task_auth_id = &upid.auth_id;
104 if auth_id == task_auth_id
105 || (task_auth_id.is_token() && &Authid::from(task_auth_id.user().clone()) == auth_id) {
dbd45a72 106 // task owner can always read
16245d54
FG
107 Ok(())
108 } else {
109 let user_info = CachedUserInfo::new()?;
dbd45a72 110
4d08e259
FG
111 // access to all tasks
112 // or task == job which the user/token could have configured/manually executed
113
114 user_info.check_privs(auth_id, &["system", "tasks"], PRIV_SYS_AUDIT, false)
115 .or_else(|_| check_job_privs(&auth_id, &user_info, upid))
116 .or_else(|_| bail!("task access not allowed"))
16245d54
FG
117 }
118}
5a12c0e2 119
83b6a7cf
DM
120#[api(
121 input: {
122 properties: {
123 node: {
124 schema: NODE_SCHEMA,
125 },
126 upid: {
127 schema: UPID_SCHEMA,
128 },
129 },
130 },
131 returns: {
9809772b 132 description: "Task status information.",
83b6a7cf
DM
133 properties: {
134 node: {
135 schema: NODE_SCHEMA,
136 },
137 upid: {
138 schema: UPID_SCHEMA,
139 },
140 pid: {
141 type: i64,
142 description: "The Unix PID.",
143 },
144 pstart: {
145 type: u64,
146 description: "The Unix process start time from `/proc/pid/stat`",
147 },
148 starttime: {
149 type: i64,
150 description: "The task start time (Epoch)",
151 },
152 "type": {
153 type: String,
154 description: "Worker type (arbitrary ASCII string)",
155 },
156 id: {
157 type: String,
158 optional: true,
159 description: "Worker ID (arbitrary ASCII string)",
160 },
161 user: {
16245d54 162 type: Userid,
83b6a7cf 163 },
16245d54
FG
164 tokenid: {
165 type: Tokenname,
166 optional: true,
167 },
83b6a7cf
DM
168 status: {
169 type: String,
170 description: "'running' or 'stopped'",
171 },
172 exitstatus: {
173 type: String,
174 optional: true,
175 description: "'OK', 'Error: <msg>', or 'unkwown'.",
176 },
177 },
178 },
179 access: {
9809772b 180 description: "Users can access their own tasks, or need Sys.Audit on /system/tasks.",
720af9f6 181 permission: &Permission::Anybody,
83b6a7cf
DM
182 },
183)]
184/// Get task status.
5751e495 185async fn get_task_status(
5a12c0e2 186 param: Value,
720af9f6 187 rpcenv: &mut dyn RpcEnvironment,
5a12c0e2
DM
188) -> Result<Value, Error> {
189
190 let upid = extract_upid(&param)?;
191
e6dc35ac 192 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
16245d54 193 check_task_access(&auth_id, &upid)?;
720af9f6 194
c360bd73
DM
195 let mut result = json!({
196 "upid": param["upid"],
197 "node": upid.node,
198 "pid": upid.pid,
199 "pstart": upid.pstart,
200 "starttime": upid.starttime,
201 "type": upid.worker_type,
202 "id": upid.worker_id,
16245d54 203 "user": upid.auth_id.user(),
c360bd73
DM
204 });
205
16245d54
FG
206 if upid.auth_id.is_token() {
207 result["tokenid"] = Value::from(upid.auth_id.tokenname().unwrap().as_str());
208 }
209
5751e495 210 if crate::server::worker_is_active(&upid).await? {
c360bd73 211 result["status"] = Value::from("running");
5a12c0e2 212 } else {
77bd2a46 213 let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
c360bd73 214 result["status"] = Value::from("stopped");
4c116baf 215 result["exitstatus"] = Value::from(exitstatus.to_string());
5a12c0e2
DM
216 };
217
218 Ok(result)
219}
220
221fn extract_upid(param: &Value) -> Result<UPID, Error> {
222
3c8c2827 223 let upid_str = pbs_tools::json::required_string_param(&param, "upid")?;
5a12c0e2 224
8560fe3e 225 upid_str.parse::<UPID>()
5a12c0e2
DM
226}
227
83b6a7cf
DM
228#[api(
229 input: {
230 properties: {
231 node: {
232 schema: NODE_SCHEMA,
233 },
234 upid: {
235 schema: UPID_SCHEMA,
236 },
237 "test-status": {
238 type: bool,
239 optional: true,
240 description: "Test task status, and set result attribute \"active\" accordingly.",
241 },
242 start: {
243 type: u64,
244 optional: true,
245 description: "Start at this line.",
246 default: 0,
247 },
248 limit: {
249 type: u64,
250 optional: true,
251 description: "Only list this amount of lines.",
252 default: 50,
253 },
254 },
255 },
256 access: {
338c545f 257 description: "Users can access their own tasks, or need Sys.Audit on /system/tasks.",
720af9f6 258 permission: &Permission::Anybody,
83b6a7cf
DM
259 },
260)]
261/// Read task log.
5751e495 262async fn read_task_log(
5a12c0e2 263 param: Value,
e8d1da6a 264 mut rpcenv: &mut dyn RpcEnvironment,
5a12c0e2
DM
265) -> Result<Value, Error> {
266
267 let upid = extract_upid(&param)?;
6b508dd5 268
e6dc35ac 269 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
720af9f6 270
16245d54 271 check_task_access(&auth_id, &upid)?;
720af9f6 272
6b508dd5
DM
273 let test_status = param["test-status"].as_bool().unwrap_or(false);
274
5a12c0e2
DM
275 let start = param["start"].as_u64().unwrap_or(0);
276 let mut limit = param["limit"].as_u64().unwrap_or(50);
6b508dd5 277
5a12c0e2
DM
278 let mut count: u64 = 0;
279
280 let path = upid.log_path();
281
282 let file = File::open(path)?;
283
284 let mut lines: Vec<Value> = vec![];
285
286 for line in BufReader::new(file).lines() {
287 match line {
288 Ok(line) => {
289 count += 1;
290 if count < start { continue };
11377a47 291 if limit == 0 { continue };
5a12c0e2
DM
292
293 lines.push(json!({ "n": count, "t": line }));
294
295 limit -= 1;
296 }
297 Err(err) => {
298 log::error!("reading task log failed: {}", err);
299 break;
300 }
301 }
302 }
303
e8d1da6a 304 rpcenv["total"] = Value::from(count);
d8d40dd0 305
6b508dd5 306 if test_status {
5751e495 307 let active = crate::server::worker_is_active(&upid).await?;
e8d1da6a 308 rpcenv["active"] = Value::from(active);
6b508dd5
DM
309 }
310
5a12c0e2
DM
311 Ok(json!(lines))
312}
063ca5be 313
83b6a7cf
DM
314#[api(
315 protected: true,
316 input: {
317 properties: {
318 node: {
319 schema: NODE_SCHEMA,
320 },
321 upid: {
322 schema: UPID_SCHEMA,
323 },
324 },
325 },
326 access: {
338c545f 327 description: "Users can stop their own tasks, or need Sys.Modify on /system/tasks.",
720af9f6 328 permission: &Permission::Anybody,
83b6a7cf
DM
329 },
330)]
331/// Try to stop a task.
a665dea1
DM
332fn stop_task(
333 param: Value,
720af9f6 334 rpcenv: &mut dyn RpcEnvironment,
a665dea1
DM
335) -> Result<Value, Error> {
336
337 let upid = extract_upid(&param)?;
338
e6dc35ac 339 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
720af9f6 340
e6dc35ac 341 if auth_id != upid.auth_id {
720af9f6 342 let user_info = CachedUserInfo::new()?;
e6dc35ac 343 user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
720af9f6
DM
344 }
345
5751e495 346 server::abort_worker_async(upid);
a665dea1
DM
347
348 Ok(Value::Null)
349}
350
2c4b303c
DM
351#[api(
352 input: {
353 properties: {
354 node: {
355 schema: NODE_SCHEMA
356 },
357 start: {
358 type: u64,
359 description: "List tasks beginning from this offset.",
360 default: 0,
361 optional: true,
362 },
363 limit: {
364 type: u64,
e7dd169f 365 description: "Only list this amount of tasks. (0 means no limit)",
2c4b303c
DM
366 default: 50,
367 optional: true,
368 },
369 store: {
370 schema: DATASTORE_SCHEMA,
371 optional: true,
372 },
373 running: {
374 type: bool,
375 description: "Only list running tasks.",
376 optional: true,
ca9dfe5f 377 default: false,
2c4b303c
DM
378 },
379 errors: {
380 type: bool,
381 description: "Only list erroneous tasks.",
382 optional:true,
ca9dfe5f 383 default: false,
2c4b303c
DM
384 },
385 userfilter: {
e7cb4dc5 386 optional: true,
2c4b303c
DM
387 type: String,
388 description: "Only list tasks from this user.",
389 },
a2a7dd15
DC
390 since: {
391 type: i64,
392 description: "Only list tasks since this UNIX epoch.",
393 optional: true,
394 },
c1fa057c
DC
395 until: {
396 type: i64,
397 description: "Only list tasks until this UNIX epoch.",
398 optional: true,
399 },
a2a7dd15
DC
400 typefilter: {
401 optional: true,
402 type: String,
403 description: "Only list tasks whose type contains this.",
404 },
405 statusfilter: {
406 optional: true,
407 type: Array,
408 description: "Only list tasks which have any one of the listed status.",
409 items: {
410 type: TaskStateType,
411 },
412 },
2c4b303c
DM
413 },
414 },
7b570c17 415 returns: pbs_api_types::NODE_TASKS_LIST_TASKS_RETURN_TYPE,
83b6a7cf 416 access: {
338c545f 417 description: "Users can only see their own tasks, unless they have Sys.Audit on /system/tasks.",
720af9f6 418 permission: &Permission::Anybody,
83b6a7cf 419 },
2c4b303c
DM
420)]
421/// List tasks.
367c0ff7 422#[allow(clippy::too_many_arguments)]
8528fce8 423pub fn list_tasks(
ca9dfe5f
DM
424 start: u64,
425 limit: u64,
426 errors: bool,
427 running: bool,
005a5b96 428 userfilter: Option<String>,
a2a7dd15 429 since: Option<i64>,
c1fa057c 430 until: Option<i64>,
a2a7dd15
DC
431 typefilter: Option<String>,
432 statusfilter: Option<Vec<TaskStateType>>,
063ca5be 433 param: Value,
e8d1da6a 434 mut rpcenv: &mut dyn RpcEnvironment,
99384f79 435) -> Result<Vec<TaskListItem>, Error> {
063ca5be 436
e6dc35ac 437 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
720af9f6 438 let user_info = CachedUserInfo::new()?;
e6dc35ac 439 let user_privs = user_info.lookup_privs(&auth_id, &["system", "tasks"]);
720af9f6
DM
440
441 let list_all = (user_privs & PRIV_SYS_AUDIT) != 0;
442
567d3e00
DM
443 let store = param["store"].as_str();
444
768e10d0 445 let list = TaskListInfoIterator::new(running)?;
e7dd169f 446 let limit = if limit > 0 { limit as usize } else { usize::MAX };
d2a2e02b 447
9517a575
DC
448 let mut skipped = 0;
449 let mut result: Vec<TaskListItem> = Vec::new();
450
451 for info in list {
768e10d0
DC
452 let info = match info {
453 Ok(info) => info,
9517a575 454 Err(_) => break,
768e10d0 455 };
720af9f6 456
9517a575
DC
457 if let Some(until) = until {
458 if info.upid.starttime > until {
459 continue;
460 }
461 }
462
463 if let Some(since) = since {
464 if let Some(ref state) = info.state {
465 if state.endtime() < since {
466 // we reached the tasks that ended before our 'since'
467 // so we can stop iterating
468 break;
469 }
470 }
471 if info.upid.starttime < since {
472 continue;
473 }
474 }
475
16245d54 476 if !list_all && check_task_access(&auth_id, &info.upid).is_err() {
9517a575 477 continue;
16245d54 478 }
063ca5be 479
e6dc35ac 480 if let Some(needle) = &userfilter {
9517a575 481 if !info.upid.auth_id.to_string().contains(needle) { continue; }
d2a2e02b
DM
482 }
483
567d3e00 484 if let Some(store) = store {
9517a575 485 if !check_job_store(&info.upid, store) { continue; }
567d3e00
DM
486 }
487
a2a7dd15 488 if let Some(typefilter) = &typefilter {
9517a575 489 if !info.upid.worker_type.contains(typefilter) { continue; }
a2a7dd15
DC
490 }
491
492 match (&info.state, &statusfilter) {
9517a575
DC
493 (Some(_), _) if running => continue,
494 (Some(crate::server::TaskState::OK { .. }), _) if errors => continue,
a2a7dd15
DC
495 (Some(state), Some(filters)) => {
496 if !filters.contains(&state.tasktype()) {
9517a575 497 continue;
a2a7dd15
DC
498 }
499 },
9517a575 500 (None, Some(_)) => continue,
768e10d0 501 _ => {},
063ca5be
DM
502 }
503
9517a575
DC
504 if skipped < start as usize {
505 skipped += 1;
506 continue;
507 }
508
509 result.push(info.into());
510
511 if result.len() >= limit {
512 break;
513 }
514 }
063ca5be 515
768e10d0 516 let mut count = result.len() + start as usize;
3984a5fd 517 if !result.is_empty() && result.len() >= limit { // we have a 'virtual' entry as long as we have any new
768e10d0 518 count += 1;
063ca5be
DM
519 }
520
e8d1da6a 521 rpcenv["total"] = Value::from(count);
063ca5be 522
0196b9bf 523 Ok(result)
063ca5be
DM
524}
525
552c2259 526#[sortable]
83b6a7cf 527const UPID_API_SUBDIRS: SubdirMap = &sorted!([
255f378a
DM
528 (
529 "log", &Router::new()
83b6a7cf 530 .get(&API_METHOD_READ_TASK_LOG)
255f378a
DM
531 ),
532 (
533 "status", &Router::new()
83b6a7cf 534 .get(&API_METHOD_GET_TASK_STATUS)
255f378a 535 )
83b6a7cf 536]);
255f378a
DM
537
538pub const UPID_API_ROUTER: Router = Router::new()
539 .get(&list_subdirs_api_method!(UPID_API_SUBDIRS))
83b6a7cf 540 .delete(&API_METHOD_STOP_TASK)
255f378a
DM
541 .subdirs(&UPID_API_SUBDIRS);
542
543pub const ROUTER: Router = Router::new()
2c4b303c 544 .get(&API_METHOD_LIST_TASKS)
255f378a 545 .match_all("upid", &UPID_API_ROUTER);