]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/node/tasks.rs
sync job: fix worker ID parsing
[proxmox-backup.git] / src / api2 / node / tasks.rs
CommitLineData
cad540e9
WB
1use std::fs::File;
2use std::io::{BufRead, BufReader};
063ca5be 3
dbd45a72 4use anyhow::{bail, Error};
063ca5be
DM
5use serde_json::{json, Value};
6
dc7a5b34 7use proxmox_router::{list_subdirs_api_method, Permission, Router, RpcEnvironment, SubdirMap};
6ef1b649 8use proxmox_schema::api;
dc7a5b34 9use proxmox_sys::sortable;
552c2259 10
8cc3760e 11use pbs_api_types::{
dc7a5b34 12 Authid, TaskListItem, TaskStateType, Tokenname, Userid, DATASTORE_SCHEMA, NODE_SCHEMA,
8cc3760e 13 PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_VERIFY, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
dc7a5b34 14 SYNC_JOB_WORKER_ID_REGEX, UPID, UPID_SCHEMA, VERIFICATION_JOB_WORKER_ID_REGEX,
8cc3760e 15};
dbd45a72 16
8cc3760e 17use crate::api2::pull::check_pull_privs;
b9700a9f 18
ba3d7e19 19use pbs_config::CachedUserInfo;
dc7a5b34 20use proxmox_rest_server::{upid_log_path, upid_read_status, TaskListInfoIterator, TaskState};
720af9f6 21
dbd45a72
FG
22// matches respective job execution privileges
23fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) -> Result<(), Error> {
24 match (upid.worker_type.as_str(), &upid.worker_id) {
b9b2d635 25 // FIXME: parse namespace here?
dbd45a72 26 ("verificationjob", Some(workerid)) => {
9a37bd6c 27 if let Some(captures) = VERIFICATION_JOB_WORKER_ID_REGEX.captures(workerid) {
dbd45a72 28 if let Some(store) = captures.get(1) {
dc7a5b34
TL
29 return user_info.check_privs(
30 auth_id,
31 &["datastore", store.as_str()],
32 PRIV_DATASTORE_VERIFY,
33 true,
34 );
dbd45a72
FG
35 }
36 }
dc7a5b34 37 }
dbd45a72 38 ("syncjob", Some(workerid)) => {
9a37bd6c 39 if let Some(captures) = SYNC_JOB_WORKER_ID_REGEX.captures(workerid) {
dbd45a72
FG
40 let remote = captures.get(1);
41 let remote_store = captures.get(2);
42 let local_store = captures.get(3);
c06c1b4b 43 let local_ns = captures.get(4).map(|m| m.as_str());
dbd45a72
FG
44
45 if let (Some(remote), Some(remote_store), Some(local_store)) =
dc7a5b34
TL
46 (remote, remote_store, local_store)
47 {
48 return check_pull_privs(
49 auth_id,
50 local_store.as_str(),
c06c1b4b 51 local_ns,
dc7a5b34
TL
52 remote.as_str(),
53 remote_store.as_str(),
54 false,
55 );
dbd45a72
FG
56 }
57 }
dc7a5b34 58 }
dbd45a72 59 ("garbage_collection", Some(workerid)) => {
dc7a5b34
TL
60 return user_info.check_privs(
61 auth_id,
62 &["datastore", workerid],
63 PRIV_DATASTORE_MODIFY,
64 true,
65 )
66 }
dbd45a72 67 ("prune", Some(workerid)) => {
36971618
FG
68 let mut acl_path = vec!["datastore"];
69 acl_path.extend(workerid.split(':'));
70 let acl_path = match acl_path.len() {
71 4 => &acl_path[..3], // contains group as fourth element
72 2 | 3 => &acl_path[..], // store + optional NS
73 _ => {
74 bail!("invalid worker ID for prune task");
75 }
76 };
77
78 return user_info.check_privs(auth_id, acl_path, PRIV_DATASTORE_MODIFY, true);
dc7a5b34 79 }
dbd45a72
FG
80 _ => bail!("not a scheduled job task"),
81 };
82
83 bail!("not a scheduled job task");
84}
85
6e880f19
DC
86// get the store out of the worker_id
87fn check_job_store(upid: &UPID, store: &str) -> bool {
88 match (upid.worker_type.as_str(), &upid.worker_id) {
89 (workertype, Some(workerid)) if workertype.starts_with("verif") => {
9a37bd6c 90 if let Some(captures) = VERIFICATION_JOB_WORKER_ID_REGEX.captures(workerid) {
6e880f19
DC
91 if let Some(jobstore) = captures.get(1) {
92 return store == jobstore.as_str();
93 }
94 } else {
95 return workerid == store;
96 }
97 }
98 ("syncjob", Some(workerid)) => {
9a37bd6c 99 if let Some(captures) = SYNC_JOB_WORKER_ID_REGEX.captures(workerid) {
6e880f19
DC
100 if let Some(local_store) = captures.get(3) {
101 return store == local_store.as_str();
102 }
103 }
104 }
105 ("prune", Some(workerid))
106 | ("backup", Some(workerid))
107 | ("garbage_collection", Some(workerid)) => {
108 return workerid == store || workerid.starts_with(&format!("{}:", store));
109 }
110 _ => {}
111 };
112
113 false
114}
115
16245d54 116fn check_task_access(auth_id: &Authid, upid: &UPID) -> Result<(), Error> {
049a22a3
DM
117 let task_auth_id: Authid = upid.auth_id.parse()?;
118 if auth_id == &task_auth_id
dc7a5b34
TL
119 || (task_auth_id.is_token() && &Authid::from(task_auth_id.user().clone()) == auth_id)
120 {
dbd45a72 121 // task owner can always read
16245d54
FG
122 Ok(())
123 } else {
124 let user_info = CachedUserInfo::new()?;
dbd45a72 125
4d08e259
FG
126 // access to all tasks
127 // or task == job which the user/token could have configured/manually executed
128
dc7a5b34
TL
129 user_info
130 .check_privs(auth_id, &["system", "tasks"], PRIV_SYS_AUDIT, false)
9a37bd6c 131 .or_else(|_| check_job_privs(auth_id, &user_info, upid))
4d08e259 132 .or_else(|_| bail!("task access not allowed"))
16245d54
FG
133 }
134}
5a12c0e2 135
fd18775a
DM
136pub fn tasktype(state: &TaskState) -> TaskStateType {
137 match state {
138 TaskState::OK { .. } => TaskStateType::OK,
139 TaskState::Unknown { .. } => TaskStateType::Unknown,
140 TaskState::Error { .. } => TaskStateType::Error,
141 TaskState::Warning { .. } => TaskStateType::Warning,
142 }
143}
144
b9700a9f 145fn into_task_list_item(info: proxmox_rest_server::TaskListInfo) -> pbs_api_types::TaskListItem {
dc7a5b34
TL
146 let (endtime, status) = info.state.map_or_else(
147 || (None, None),
148 |a| (Some(a.endtime()), Some(a.to_string())),
149 );
b9700a9f
DM
150
151 pbs_api_types::TaskListItem {
152 upid: info.upid_str,
153 node: "localhost".to_string(),
154 pid: info.upid.pid as i64,
155 pstart: info.upid.pstart,
156 starttime: info.upid.starttime,
157 worker_type: info.upid.worker_type,
158 worker_id: info.upid.worker_id,
159 user: info.upid.auth_id,
160 endtime,
161 status,
162 }
163}
164
83b6a7cf
DM
165#[api(
166 input: {
167 properties: {
168 node: {
169 schema: NODE_SCHEMA,
170 },
171 upid: {
172 schema: UPID_SCHEMA,
173 },
174 },
175 },
176 returns: {
9809772b 177 description: "Task status information.",
83b6a7cf
DM
178 properties: {
179 node: {
180 schema: NODE_SCHEMA,
181 },
182 upid: {
183 schema: UPID_SCHEMA,
184 },
185 pid: {
186 type: i64,
187 description: "The Unix PID.",
188 },
189 pstart: {
190 type: u64,
191 description: "The Unix process start time from `/proc/pid/stat`",
192 },
193 starttime: {
194 type: i64,
195 description: "The task start time (Epoch)",
196 },
197 "type": {
198 type: String,
199 description: "Worker type (arbitrary ASCII string)",
200 },
201 id: {
202 type: String,
203 optional: true,
204 description: "Worker ID (arbitrary ASCII string)",
205 },
206 user: {
16245d54 207 type: Userid,
83b6a7cf 208 },
16245d54
FG
209 tokenid: {
210 type: Tokenname,
211 optional: true,
212 },
83b6a7cf
DM
213 status: {
214 type: String,
215 description: "'running' or 'stopped'",
216 },
217 exitstatus: {
218 type: String,
219 optional: true,
220 description: "'OK', 'Error: <msg>', or 'unkwown'.",
221 },
222 },
223 },
224 access: {
9809772b 225 description: "Users can access their own tasks, or need Sys.Audit on /system/tasks.",
720af9f6 226 permission: &Permission::Anybody,
83b6a7cf
DM
227 },
228)]
229/// Get task status.
dc7a5b34 230async fn get_task_status(param: Value, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
5a12c0e2
DM
231 let upid = extract_upid(&param)?;
232
e6dc35ac 233 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
16245d54 234 check_task_access(&auth_id, &upid)?;
720af9f6 235
049a22a3
DM
236 let task_auth_id: Authid = upid.auth_id.parse()?;
237
c360bd73
DM
238 let mut result = json!({
239 "upid": param["upid"],
240 "node": upid.node,
241 "pid": upid.pid,
242 "pstart": upid.pstart,
243 "starttime": upid.starttime,
244 "type": upid.worker_type,
245 "id": upid.worker_id,
049a22a3 246 "user": task_auth_id.user(),
c360bd73
DM
247 });
248
049a22a3
DM
249 if task_auth_id.is_token() {
250 result["tokenid"] = Value::from(task_auth_id.tokenname().unwrap().as_str());
16245d54
FG
251 }
252
b9700a9f 253 if proxmox_rest_server::worker_is_active(&upid).await? {
c360bd73 254 result["status"] = Value::from("running");
5a12c0e2 255 } else {
0a33fba4 256 let exitstatus = upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
c360bd73 257 result["status"] = Value::from("stopped");
4c116baf 258 result["exitstatus"] = Value::from(exitstatus.to_string());
5a12c0e2
DM
259 };
260
261 Ok(result)
262}
263
264fn extract_upid(param: &Value) -> Result<UPID, Error> {
9a37bd6c 265 let upid_str = pbs_tools::json::required_string_param(param, "upid")?;
5a12c0e2 266
8560fe3e 267 upid_str.parse::<UPID>()
5a12c0e2
DM
268}
269
83b6a7cf
DM
270#[api(
271 input: {
272 properties: {
273 node: {
274 schema: NODE_SCHEMA,
275 },
276 upid: {
277 schema: UPID_SCHEMA,
278 },
279 "test-status": {
280 type: bool,
281 optional: true,
282 description: "Test task status, and set result attribute \"active\" accordingly.",
283 },
284 start: {
285 type: u64,
286 optional: true,
287 description: "Start at this line.",
288 default: 0,
289 },
290 limit: {
291 type: u64,
292 optional: true,
293 description: "Only list this amount of lines.",
294 default: 50,
295 },
296 },
297 },
298 access: {
338c545f 299 description: "Users can access their own tasks, or need Sys.Audit on /system/tasks.",
720af9f6 300 permission: &Permission::Anybody,
83b6a7cf
DM
301 },
302)]
303/// Read task log.
41c1a179 304async fn read_task_log(param: Value, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
5a12c0e2 305 let upid = extract_upid(&param)?;
6b508dd5 306
e6dc35ac 307 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
720af9f6 308
16245d54 309 check_task_access(&auth_id, &upid)?;
720af9f6 310
6b508dd5
DM
311 let test_status = param["test-status"].as_bool().unwrap_or(false);
312
5a12c0e2
DM
313 let start = param["start"].as_u64().unwrap_or(0);
314 let mut limit = param["limit"].as_u64().unwrap_or(50);
6b508dd5 315
5a12c0e2
DM
316 let mut count: u64 = 0;
317
0a33fba4 318 let path = upid_log_path(&upid)?;
5a12c0e2
DM
319
320 let file = File::open(path)?;
321
322 let mut lines: Vec<Value> = vec![];
323
324 for line in BufReader::new(file).lines() {
325 match line {
326 Ok(line) => {
327 count += 1;
dc7a5b34
TL
328 if count < start {
329 continue;
330 };
331 if limit == 0 {
332 continue;
333 };
5a12c0e2
DM
334
335 lines.push(json!({ "n": count, "t": line }));
336
337 limit -= 1;
338 }
339 Err(err) => {
340 log::error!("reading task log failed: {}", err);
341 break;
342 }
343 }
344 }
345
e8d1da6a 346 rpcenv["total"] = Value::from(count);
d8d40dd0 347
6b508dd5 348 if test_status {
b9700a9f 349 let active = proxmox_rest_server::worker_is_active(&upid).await?;
e8d1da6a 350 rpcenv["active"] = Value::from(active);
6b508dd5
DM
351 }
352
5a12c0e2
DM
353 Ok(json!(lines))
354}
063ca5be 355
83b6a7cf
DM
356#[api(
357 protected: true,
358 input: {
359 properties: {
360 node: {
361 schema: NODE_SCHEMA,
362 },
363 upid: {
364 schema: UPID_SCHEMA,
365 },
366 },
367 },
368 access: {
338c545f 369 description: "Users can stop their own tasks, or need Sys.Modify on /system/tasks.",
720af9f6 370 permission: &Permission::Anybody,
83b6a7cf
DM
371 },
372)]
373/// Try to stop a task.
dc7a5b34 374fn stop_task(param: Value, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
a665dea1
DM
375 let upid = extract_upid(&param)?;
376
049a22a3 377 let auth_id = rpcenv.get_auth_id().unwrap();
720af9f6 378
e6dc35ac 379 if auth_id != upid.auth_id {
720af9f6 380 let user_info = CachedUserInfo::new()?;
049a22a3 381 let auth_id: Authid = auth_id.parse()?;
e6dc35ac 382 user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
720af9f6
DM
383 }
384
2e44983a 385 proxmox_rest_server::abort_worker_nowait(upid);
a665dea1
DM
386
387 Ok(Value::Null)
388}
389
2c4b303c 390#[api(
b7c3eaa9 391 streaming: true,
2c4b303c
DM
392 input: {
393 properties: {
394 node: {
395 schema: NODE_SCHEMA
396 },
397 start: {
398 type: u64,
399 description: "List tasks beginning from this offset.",
400 default: 0,
401 optional: true,
402 },
403 limit: {
404 type: u64,
e7dd169f 405 description: "Only list this amount of tasks. (0 means no limit)",
2c4b303c
DM
406 default: 50,
407 optional: true,
408 },
409 store: {
410 schema: DATASTORE_SCHEMA,
411 optional: true,
412 },
413 running: {
414 type: bool,
415 description: "Only list running tasks.",
416 optional: true,
ca9dfe5f 417 default: false,
2c4b303c
DM
418 },
419 errors: {
420 type: bool,
421 description: "Only list erroneous tasks.",
422 optional:true,
ca9dfe5f 423 default: false,
2c4b303c
DM
424 },
425 userfilter: {
e7cb4dc5 426 optional: true,
2c4b303c
DM
427 type: String,
428 description: "Only list tasks from this user.",
429 },
a2a7dd15
DC
430 since: {
431 type: i64,
432 description: "Only list tasks since this UNIX epoch.",
433 optional: true,
434 },
c1fa057c
DC
435 until: {
436 type: i64,
437 description: "Only list tasks until this UNIX epoch.",
438 optional: true,
439 },
a2a7dd15
DC
440 typefilter: {
441 optional: true,
442 type: String,
443 description: "Only list tasks whose type contains this.",
444 },
445 statusfilter: {
446 optional: true,
447 type: Array,
448 description: "Only list tasks which have any one of the listed status.",
449 items: {
450 type: TaskStateType,
451 },
452 },
2c4b303c
DM
453 },
454 },
7b570c17 455 returns: pbs_api_types::NODE_TASKS_LIST_TASKS_RETURN_TYPE,
83b6a7cf 456 access: {
338c545f 457 description: "Users can only see their own tasks, unless they have Sys.Audit on /system/tasks.",
720af9f6 458 permission: &Permission::Anybody,
83b6a7cf 459 },
2c4b303c
DM
460)]
461/// List tasks.
367c0ff7 462#[allow(clippy::too_many_arguments)]
8528fce8 463pub fn list_tasks(
ca9dfe5f
DM
464 start: u64,
465 limit: u64,
466 errors: bool,
467 running: bool,
005a5b96 468 userfilter: Option<String>,
a2a7dd15 469 since: Option<i64>,
c1fa057c 470 until: Option<i64>,
a2a7dd15
DC
471 typefilter: Option<String>,
472 statusfilter: Option<Vec<TaskStateType>>,
063ca5be 473 param: Value,
41c1a179 474 rpcenv: &mut dyn RpcEnvironment,
99384f79 475) -> Result<Vec<TaskListItem>, Error> {
e6dc35ac 476 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
720af9f6 477 let user_info = CachedUserInfo::new()?;
e6dc35ac 478 let user_privs = user_info.lookup_privs(&auth_id, &["system", "tasks"]);
720af9f6
DM
479
480 let list_all = (user_privs & PRIV_SYS_AUDIT) != 0;
481
567d3e00
DM
482 let store = param["store"].as_str();
483
768e10d0 484 let list = TaskListInfoIterator::new(running)?;
dc7a5b34
TL
485 let limit = if limit > 0 {
486 limit as usize
487 } else {
488 usize::MAX
489 };
d2a2e02b 490
9517a575
DC
491 let mut skipped = 0;
492 let mut result: Vec<TaskListItem> = Vec::new();
493
494 for info in list {
768e10d0
DC
495 let info = match info {
496 Ok(info) => info,
9517a575 497 Err(_) => break,
768e10d0 498 };
720af9f6 499
9517a575
DC
500 if let Some(until) = until {
501 if info.upid.starttime > until {
502 continue;
503 }
504 }
505
506 if let Some(since) = since {
507 if let Some(ref state) = info.state {
508 if state.endtime() < since {
509 // we reached the tasks that ended before our 'since'
510 // so we can stop iterating
511 break;
512 }
513 }
514 if info.upid.starttime < since {
515 continue;
516 }
517 }
518
16245d54 519 if !list_all && check_task_access(&auth_id, &info.upid).is_err() {
9517a575 520 continue;
16245d54 521 }
063ca5be 522
e6dc35ac 523 if let Some(needle) = &userfilter {
dc7a5b34
TL
524 if !info.upid.auth_id.to_string().contains(needle) {
525 continue;
526 }
d2a2e02b
DM
527 }
528
567d3e00 529 if let Some(store) = store {
dc7a5b34
TL
530 if !check_job_store(&info.upid, store) {
531 continue;
532 }
567d3e00
DM
533 }
534
a2a7dd15 535 if let Some(typefilter) = &typefilter {
dc7a5b34
TL
536 if !info.upid.worker_type.contains(typefilter) {
537 continue;
538 }
a2a7dd15
DC
539 }
540
541 match (&info.state, &statusfilter) {
9517a575 542 (Some(_), _) if running => continue,
b9700a9f 543 (Some(TaskState::OK { .. }), _) if errors => continue,
a2a7dd15 544 (Some(state), Some(filters)) => {
fd18775a 545 if !filters.contains(&tasktype(state)) {
9517a575 546 continue;
a2a7dd15 547 }
dc7a5b34 548 }
9517a575 549 (None, Some(_)) => continue,
dc7a5b34 550 _ => {}
063ca5be
DM
551 }
552
9517a575
DC
553 if skipped < start as usize {
554 skipped += 1;
555 continue;
556 }
557
b9700a9f 558 result.push(into_task_list_item(info));
9517a575
DC
559
560 if result.len() >= limit {
561 break;
562 }
563 }
063ca5be 564
768e10d0 565 let mut count = result.len() + start as usize;
dc7a5b34
TL
566 if !result.is_empty() && result.len() >= limit {
567 // we have a 'virtual' entry as long as we have any new
768e10d0 568 count += 1;
063ca5be
DM
569 }
570
e8d1da6a 571 rpcenv["total"] = Value::from(count);
063ca5be 572
0196b9bf 573 Ok(result)
063ca5be
DM
574}
575
552c2259 576#[sortable]
83b6a7cf 577const UPID_API_SUBDIRS: SubdirMap = &sorted!([
dc7a5b34
TL
578 ("log", &Router::new().get(&API_METHOD_READ_TASK_LOG)),
579 ("status", &Router::new().get(&API_METHOD_GET_TASK_STATUS))
83b6a7cf 580]);
255f378a
DM
581
582pub const UPID_API_ROUTER: Router = Router::new()
583 .get(&list_subdirs_api_method!(UPID_API_SUBDIRS))
83b6a7cf 584 .delete(&API_METHOD_STOP_TASK)
9a37bd6c 585 .subdirs(UPID_API_SUBDIRS);
255f378a
DM
586
587pub const ROUTER: Router = Router::new()
2c4b303c 588 .get(&API_METHOD_LIST_TASKS)
255f378a 589 .match_all("upid", &UPID_API_ROUTER);