]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/node/tasks.rs
move worker_task.rs into proxmox-rest-server crate
[proxmox-backup.git] / src / api2 / node / tasks.rs
CommitLineData
cad540e9
WB
1use std::fs::File;
2use std::io::{BufRead, BufReader};
063ca5be 3
dbd45a72 4use anyhow::{bail, Error};
063ca5be
DM
5use serde_json::{json, Value};
6
e7cb4dc5 7use proxmox::api::{api, Router, RpcEnvironment, Permission};
cad540e9 8use proxmox::api::router::SubdirMap;
9ea4bce4 9use proxmox::{identity, list_subdirs_api_method, sortable};
552c2259 10
8cc3760e 11use pbs_api_types::{
6227654a 12 Userid, Authid, Tokenname, TaskListItem, TaskStateType, UPID,
8cc3760e
DM
13 NODE_SCHEMA, UPID_SCHEMA, VERIFICATION_JOB_WORKER_ID_REGEX,
14 SYNC_JOB_WORKER_ID_REGEX, DATASTORE_SCHEMA,
15 PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_VERIFY, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
16};
dbd45a72 17
8cc3760e 18use crate::api2::pull::check_pull_privs;
b9700a9f
DM
19
20use proxmox_rest_server::{upid_log_path, upid_read_status, TaskState, TaskListInfoIterator};
ba3d7e19 21use pbs_config::CachedUserInfo;
720af9f6 22
dbd45a72
FG
23// matches respective job execution privileges
24fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) -> Result<(), Error> {
25 match (upid.worker_type.as_str(), &upid.worker_id) {
26 ("verificationjob", Some(workerid)) => {
27 if let Some(captures) = VERIFICATION_JOB_WORKER_ID_REGEX.captures(&workerid) {
28 if let Some(store) = captures.get(1) {
29 return user_info.check_privs(&auth_id,
30 &["datastore", store.as_str()],
31 PRIV_DATASTORE_VERIFY,
32 true);
33 }
34 }
35 },
36 ("syncjob", Some(workerid)) => {
37 if let Some(captures) = SYNC_JOB_WORKER_ID_REGEX.captures(&workerid) {
38 let remote = captures.get(1);
39 let remote_store = captures.get(2);
40 let local_store = captures.get(3);
41
42 if let (Some(remote), Some(remote_store), Some(local_store)) =
43 (remote, remote_store, local_store) {
44
45 return check_pull_privs(&auth_id,
46 local_store.as_str(),
47 remote.as_str(),
48 remote_store.as_str(),
49 false);
50 }
51 }
52 },
53 ("garbage_collection", Some(workerid)) => {
54 return user_info.check_privs(&auth_id,
55 &["datastore", &workerid],
56 PRIV_DATASTORE_MODIFY,
57 true)
58 },
59 ("prune", Some(workerid)) => {
60 return user_info.check_privs(&auth_id,
61 &["datastore",
62 &workerid],
63 PRIV_DATASTORE_MODIFY,
64 true);
65 },
66 _ => bail!("not a scheduled job task"),
67 };
68
69 bail!("not a scheduled job task");
70}
71
6e880f19
DC
72// get the store out of the worker_id
73fn check_job_store(upid: &UPID, store: &str) -> bool {
74 match (upid.worker_type.as_str(), &upid.worker_id) {
75 (workertype, Some(workerid)) if workertype.starts_with("verif") => {
76 if let Some(captures) = VERIFICATION_JOB_WORKER_ID_REGEX.captures(&workerid) {
77 if let Some(jobstore) = captures.get(1) {
78 return store == jobstore.as_str();
79 }
80 } else {
81 return workerid == store;
82 }
83 }
84 ("syncjob", Some(workerid)) => {
85 if let Some(captures) = SYNC_JOB_WORKER_ID_REGEX.captures(&workerid) {
86 if let Some(local_store) = captures.get(3) {
87 return store == local_store.as_str();
88 }
89 }
90 }
91 ("prune", Some(workerid))
92 | ("backup", Some(workerid))
93 | ("garbage_collection", Some(workerid)) => {
94 return workerid == store || workerid.starts_with(&format!("{}:", store));
95 }
96 _ => {}
97 };
98
99 false
100}
101
16245d54 102fn check_task_access(auth_id: &Authid, upid: &UPID) -> Result<(), Error> {
049a22a3
DM
103 let task_auth_id: Authid = upid.auth_id.parse()?;
104 if auth_id == &task_auth_id
16245d54 105 || (task_auth_id.is_token() && &Authid::from(task_auth_id.user().clone()) == auth_id) {
dbd45a72 106 // task owner can always read
16245d54
FG
107 Ok(())
108 } else {
109 let user_info = CachedUserInfo::new()?;
dbd45a72 110
4d08e259
FG
111 // access to all tasks
112 // or task == job which the user/token could have configured/manually executed
113
114 user_info.check_privs(auth_id, &["system", "tasks"], PRIV_SYS_AUDIT, false)
115 .or_else(|_| check_job_privs(&auth_id, &user_info, upid))
116 .or_else(|_| bail!("task access not allowed"))
16245d54
FG
117 }
118}
5a12c0e2 119
fd18775a
DM
120pub fn tasktype(state: &TaskState) -> TaskStateType {
121 match state {
122 TaskState::OK { .. } => TaskStateType::OK,
123 TaskState::Unknown { .. } => TaskStateType::Unknown,
124 TaskState::Error { .. } => TaskStateType::Error,
125 TaskState::Warning { .. } => TaskStateType::Warning,
126 }
127}
128
b9700a9f
DM
129fn into_task_list_item(info: proxmox_rest_server::TaskListInfo) -> pbs_api_types::TaskListItem {
130 let (endtime, status) = info
131 .state
132 .map_or_else(|| (None, None), |a| (Some(a.endtime()), Some(a.to_string())));
133
134 pbs_api_types::TaskListItem {
135 upid: info.upid_str,
136 node: "localhost".to_string(),
137 pid: info.upid.pid as i64,
138 pstart: info.upid.pstart,
139 starttime: info.upid.starttime,
140 worker_type: info.upid.worker_type,
141 worker_id: info.upid.worker_id,
142 user: info.upid.auth_id,
143 endtime,
144 status,
145 }
146}
147
83b6a7cf
DM
148#[api(
149 input: {
150 properties: {
151 node: {
152 schema: NODE_SCHEMA,
153 },
154 upid: {
155 schema: UPID_SCHEMA,
156 },
157 },
158 },
159 returns: {
9809772b 160 description: "Task status information.",
83b6a7cf
DM
161 properties: {
162 node: {
163 schema: NODE_SCHEMA,
164 },
165 upid: {
166 schema: UPID_SCHEMA,
167 },
168 pid: {
169 type: i64,
170 description: "The Unix PID.",
171 },
172 pstart: {
173 type: u64,
174 description: "The Unix process start time from `/proc/pid/stat`",
175 },
176 starttime: {
177 type: i64,
178 description: "The task start time (Epoch)",
179 },
180 "type": {
181 type: String,
182 description: "Worker type (arbitrary ASCII string)",
183 },
184 id: {
185 type: String,
186 optional: true,
187 description: "Worker ID (arbitrary ASCII string)",
188 },
189 user: {
16245d54 190 type: Userid,
83b6a7cf 191 },
16245d54
FG
192 tokenid: {
193 type: Tokenname,
194 optional: true,
195 },
83b6a7cf
DM
196 status: {
197 type: String,
198 description: "'running' or 'stopped'",
199 },
200 exitstatus: {
201 type: String,
202 optional: true,
203 description: "'OK', 'Error: <msg>', or 'unkwown'.",
204 },
205 },
206 },
207 access: {
9809772b 208 description: "Users can access their own tasks, or need Sys.Audit on /system/tasks.",
720af9f6 209 permission: &Permission::Anybody,
83b6a7cf
DM
210 },
211)]
212/// Get task status.
5751e495 213async fn get_task_status(
5a12c0e2 214 param: Value,
720af9f6 215 rpcenv: &mut dyn RpcEnvironment,
5a12c0e2
DM
216) -> Result<Value, Error> {
217
218 let upid = extract_upid(&param)?;
219
e6dc35ac 220 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
16245d54 221 check_task_access(&auth_id, &upid)?;
720af9f6 222
049a22a3
DM
223 let task_auth_id: Authid = upid.auth_id.parse()?;
224
c360bd73
DM
225 let mut result = json!({
226 "upid": param["upid"],
227 "node": upid.node,
228 "pid": upid.pid,
229 "pstart": upid.pstart,
230 "starttime": upid.starttime,
231 "type": upid.worker_type,
232 "id": upid.worker_id,
049a22a3 233 "user": task_auth_id.user(),
c360bd73
DM
234 });
235
049a22a3
DM
236 if task_auth_id.is_token() {
237 result["tokenid"] = Value::from(task_auth_id.tokenname().unwrap().as_str());
16245d54
FG
238 }
239
b9700a9f 240 if proxmox_rest_server::worker_is_active(&upid).await? {
c360bd73 241 result["status"] = Value::from("running");
5a12c0e2 242 } else {
0a33fba4 243 let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
c360bd73 244 result["status"] = Value::from("stopped");
4c116baf 245 result["exitstatus"] = Value::from(exitstatus.to_string());
5a12c0e2
DM
246 };
247
248 Ok(result)
249}
250
251fn extract_upid(param: &Value) -> Result<UPID, Error> {
252
3c8c2827 253 let upid_str = pbs_tools::json::required_string_param(&param, "upid")?;
5a12c0e2 254
8560fe3e 255 upid_str.parse::<UPID>()
5a12c0e2
DM
256}
257
83b6a7cf
DM
258#[api(
259 input: {
260 properties: {
261 node: {
262 schema: NODE_SCHEMA,
263 },
264 upid: {
265 schema: UPID_SCHEMA,
266 },
267 "test-status": {
268 type: bool,
269 optional: true,
270 description: "Test task status, and set result attribute \"active\" accordingly.",
271 },
272 start: {
273 type: u64,
274 optional: true,
275 description: "Start at this line.",
276 default: 0,
277 },
278 limit: {
279 type: u64,
280 optional: true,
281 description: "Only list this amount of lines.",
282 default: 50,
283 },
284 },
285 },
286 access: {
338c545f 287 description: "Users can access their own tasks, or need Sys.Audit on /system/tasks.",
720af9f6 288 permission: &Permission::Anybody,
83b6a7cf
DM
289 },
290)]
291/// Read task log.
5751e495 292async fn read_task_log(
5a12c0e2 293 param: Value,
e8d1da6a 294 mut rpcenv: &mut dyn RpcEnvironment,
5a12c0e2
DM
295) -> Result<Value, Error> {
296
297 let upid = extract_upid(&param)?;
6b508dd5 298
e6dc35ac 299 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
720af9f6 300
16245d54 301 check_task_access(&auth_id, &upid)?;
720af9f6 302
6b508dd5
DM
303 let test_status = param["test-status"].as_bool().unwrap_or(false);
304
5a12c0e2
DM
305 let start = param["start"].as_u64().unwrap_or(0);
306 let mut limit = param["limit"].as_u64().unwrap_or(50);
6b508dd5 307
5a12c0e2
DM
308 let mut count: u64 = 0;
309
0a33fba4 310 let path = upid_log_path(&upid)?;
5a12c0e2
DM
311
312 let file = File::open(path)?;
313
314 let mut lines: Vec<Value> = vec![];
315
316 for line in BufReader::new(file).lines() {
317 match line {
318 Ok(line) => {
319 count += 1;
320 if count < start { continue };
11377a47 321 if limit == 0 { continue };
5a12c0e2
DM
322
323 lines.push(json!({ "n": count, "t": line }));
324
325 limit -= 1;
326 }
327 Err(err) => {
328 log::error!("reading task log failed: {}", err);
329 break;
330 }
331 }
332 }
333
e8d1da6a 334 rpcenv["total"] = Value::from(count);
d8d40dd0 335
6b508dd5 336 if test_status {
b9700a9f 337 let active = proxmox_rest_server::worker_is_active(&upid).await?;
e8d1da6a 338 rpcenv["active"] = Value::from(active);
6b508dd5
DM
339 }
340
5a12c0e2
DM
341 Ok(json!(lines))
342}
063ca5be 343
83b6a7cf
DM
344#[api(
345 protected: true,
346 input: {
347 properties: {
348 node: {
349 schema: NODE_SCHEMA,
350 },
351 upid: {
352 schema: UPID_SCHEMA,
353 },
354 },
355 },
356 access: {
338c545f 357 description: "Users can stop their own tasks, or need Sys.Modify on /system/tasks.",
720af9f6 358 permission: &Permission::Anybody,
83b6a7cf
DM
359 },
360)]
361/// Try to stop a task.
a665dea1
DM
362fn stop_task(
363 param: Value,
720af9f6 364 rpcenv: &mut dyn RpcEnvironment,
a665dea1
DM
365) -> Result<Value, Error> {
366
367 let upid = extract_upid(&param)?;
368
049a22a3 369 let auth_id = rpcenv.get_auth_id().unwrap();
720af9f6 370
e6dc35ac 371 if auth_id != upid.auth_id {
720af9f6 372 let user_info = CachedUserInfo::new()?;
049a22a3 373 let auth_id: Authid = auth_id.parse()?;
e6dc35ac 374 user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
720af9f6
DM
375 }
376
b9700a9f 377 proxmox_rest_server::abort_worker_async(upid);
a665dea1
DM
378
379 Ok(Value::Null)
380}
381
2c4b303c
DM
382#[api(
383 input: {
384 properties: {
385 node: {
386 schema: NODE_SCHEMA
387 },
388 start: {
389 type: u64,
390 description: "List tasks beginning from this offset.",
391 default: 0,
392 optional: true,
393 },
394 limit: {
395 type: u64,
e7dd169f 396 description: "Only list this amount of tasks. (0 means no limit)",
2c4b303c
DM
397 default: 50,
398 optional: true,
399 },
400 store: {
401 schema: DATASTORE_SCHEMA,
402 optional: true,
403 },
404 running: {
405 type: bool,
406 description: "Only list running tasks.",
407 optional: true,
ca9dfe5f 408 default: false,
2c4b303c
DM
409 },
410 errors: {
411 type: bool,
412 description: "Only list erroneous tasks.",
413 optional:true,
ca9dfe5f 414 default: false,
2c4b303c
DM
415 },
416 userfilter: {
e7cb4dc5 417 optional: true,
2c4b303c
DM
418 type: String,
419 description: "Only list tasks from this user.",
420 },
a2a7dd15
DC
421 since: {
422 type: i64,
423 description: "Only list tasks since this UNIX epoch.",
424 optional: true,
425 },
c1fa057c
DC
426 until: {
427 type: i64,
428 description: "Only list tasks until this UNIX epoch.",
429 optional: true,
430 },
a2a7dd15
DC
431 typefilter: {
432 optional: true,
433 type: String,
434 description: "Only list tasks whose type contains this.",
435 },
436 statusfilter: {
437 optional: true,
438 type: Array,
439 description: "Only list tasks which have any one of the listed status.",
440 items: {
441 type: TaskStateType,
442 },
443 },
2c4b303c
DM
444 },
445 },
7b570c17 446 returns: pbs_api_types::NODE_TASKS_LIST_TASKS_RETURN_TYPE,
83b6a7cf 447 access: {
338c545f 448 description: "Users can only see their own tasks, unless they have Sys.Audit on /system/tasks.",
720af9f6 449 permission: &Permission::Anybody,
83b6a7cf 450 },
2c4b303c
DM
451)]
452/// List tasks.
367c0ff7 453#[allow(clippy::too_many_arguments)]
8528fce8 454pub fn list_tasks(
ca9dfe5f
DM
455 start: u64,
456 limit: u64,
457 errors: bool,
458 running: bool,
005a5b96 459 userfilter: Option<String>,
a2a7dd15 460 since: Option<i64>,
c1fa057c 461 until: Option<i64>,
a2a7dd15
DC
462 typefilter: Option<String>,
463 statusfilter: Option<Vec<TaskStateType>>,
063ca5be 464 param: Value,
e8d1da6a 465 mut rpcenv: &mut dyn RpcEnvironment,
99384f79 466) -> Result<Vec<TaskListItem>, Error> {
063ca5be 467
e6dc35ac 468 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
720af9f6 469 let user_info = CachedUserInfo::new()?;
e6dc35ac 470 let user_privs = user_info.lookup_privs(&auth_id, &["system", "tasks"]);
720af9f6
DM
471
472 let list_all = (user_privs & PRIV_SYS_AUDIT) != 0;
473
567d3e00
DM
474 let store = param["store"].as_str();
475
768e10d0 476 let list = TaskListInfoIterator::new(running)?;
e7dd169f 477 let limit = if limit > 0 { limit as usize } else { usize::MAX };
d2a2e02b 478
9517a575
DC
479 let mut skipped = 0;
480 let mut result: Vec<TaskListItem> = Vec::new();
481
482 for info in list {
768e10d0
DC
483 let info = match info {
484 Ok(info) => info,
9517a575 485 Err(_) => break,
768e10d0 486 };
720af9f6 487
9517a575
DC
488 if let Some(until) = until {
489 if info.upid.starttime > until {
490 continue;
491 }
492 }
493
494 if let Some(since) = since {
495 if let Some(ref state) = info.state {
496 if state.endtime() < since {
497 // we reached the tasks that ended before our 'since'
498 // so we can stop iterating
499 break;
500 }
501 }
502 if info.upid.starttime < since {
503 continue;
504 }
505 }
506
16245d54 507 if !list_all && check_task_access(&auth_id, &info.upid).is_err() {
9517a575 508 continue;
16245d54 509 }
063ca5be 510
e6dc35ac 511 if let Some(needle) = &userfilter {
9517a575 512 if !info.upid.auth_id.to_string().contains(needle) { continue; }
d2a2e02b
DM
513 }
514
567d3e00 515 if let Some(store) = store {
9517a575 516 if !check_job_store(&info.upid, store) { continue; }
567d3e00
DM
517 }
518
a2a7dd15 519 if let Some(typefilter) = &typefilter {
9517a575 520 if !info.upid.worker_type.contains(typefilter) { continue; }
a2a7dd15
DC
521 }
522
523 match (&info.state, &statusfilter) {
9517a575 524 (Some(_), _) if running => continue,
b9700a9f 525 (Some(TaskState::OK { .. }), _) if errors => continue,
a2a7dd15 526 (Some(state), Some(filters)) => {
fd18775a 527 if !filters.contains(&tasktype(state)) {
9517a575 528 continue;
a2a7dd15
DC
529 }
530 },
9517a575 531 (None, Some(_)) => continue,
768e10d0 532 _ => {},
063ca5be
DM
533 }
534
9517a575
DC
535 if skipped < start as usize {
536 skipped += 1;
537 continue;
538 }
539
b9700a9f 540 result.push(into_task_list_item(info));
9517a575
DC
541
542 if result.len() >= limit {
543 break;
544 }
545 }
063ca5be 546
768e10d0 547 let mut count = result.len() + start as usize;
3984a5fd 548 if !result.is_empty() && result.len() >= limit { // we have a 'virtual' entry as long as we have any new
768e10d0 549 count += 1;
063ca5be
DM
550 }
551
e8d1da6a 552 rpcenv["total"] = Value::from(count);
063ca5be 553
0196b9bf 554 Ok(result)
063ca5be
DM
555}
556
552c2259 557#[sortable]
83b6a7cf 558const UPID_API_SUBDIRS: SubdirMap = &sorted!([
255f378a
DM
559 (
560 "log", &Router::new()
83b6a7cf 561 .get(&API_METHOD_READ_TASK_LOG)
255f378a
DM
562 ),
563 (
564 "status", &Router::new()
83b6a7cf 565 .get(&API_METHOD_GET_TASK_STATUS)
255f378a 566 )
83b6a7cf 567]);
255f378a
DM
568
569pub const UPID_API_ROUTER: Router = Router::new()
570 .get(&list_subdirs_api_method!(UPID_API_SUBDIRS))
83b6a7cf 571 .delete(&API_METHOD_STOP_TASK)
255f378a
DM
572 .subdirs(&UPID_API_SUBDIRS);
573
574pub const ROUTER: Router = Router::new()
2c4b303c 575 .get(&API_METHOD_LIST_TASKS)
255f378a 576 .match_all("upid", &UPID_API_ROUTER);