]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/node/tasks.rs
src/server/worker_task.rs: Avoid using pbs-api-type::Authid
[proxmox-backup.git] / src / api2 / node / tasks.rs
1 use std::fs::File;
2 use std::io::{BufRead, BufReader};
3
4 use anyhow::{bail, Error};
5 use serde_json::{json, Value};
6
7 use proxmox::api::{api, Router, RpcEnvironment, Permission};
8 use proxmox::api::router::SubdirMap;
9 use proxmox::{identity, list_subdirs_api_method, sortable};
10
11 use pbs_api_types::{
12 Userid, Authid, Tokenname, TaskListItem, TaskStateType, UPID,
13 NODE_SCHEMA, UPID_SCHEMA, VERIFICATION_JOB_WORKER_ID_REGEX,
14 SYNC_JOB_WORKER_ID_REGEX, DATASTORE_SCHEMA,
15 PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_VERIFY, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
16 };
17
18 use crate::api2::pull::check_pull_privs;
19 use crate::server::{self, UPIDExt, TaskState, TaskListInfoIterator};
20 use pbs_config::CachedUserInfo;
21
22 // matches respective job execution privileges
23 fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) -> Result<(), Error> {
24 match (upid.worker_type.as_str(), &upid.worker_id) {
25 ("verificationjob", Some(workerid)) => {
26 if let Some(captures) = VERIFICATION_JOB_WORKER_ID_REGEX.captures(&workerid) {
27 if let Some(store) = captures.get(1) {
28 return user_info.check_privs(&auth_id,
29 &["datastore", store.as_str()],
30 PRIV_DATASTORE_VERIFY,
31 true);
32 }
33 }
34 },
35 ("syncjob", Some(workerid)) => {
36 if let Some(captures) = SYNC_JOB_WORKER_ID_REGEX.captures(&workerid) {
37 let remote = captures.get(1);
38 let remote_store = captures.get(2);
39 let local_store = captures.get(3);
40
41 if let (Some(remote), Some(remote_store), Some(local_store)) =
42 (remote, remote_store, local_store) {
43
44 return check_pull_privs(&auth_id,
45 local_store.as_str(),
46 remote.as_str(),
47 remote_store.as_str(),
48 false);
49 }
50 }
51 },
52 ("garbage_collection", Some(workerid)) => {
53 return user_info.check_privs(&auth_id,
54 &["datastore", &workerid],
55 PRIV_DATASTORE_MODIFY,
56 true)
57 },
58 ("prune", Some(workerid)) => {
59 return user_info.check_privs(&auth_id,
60 &["datastore",
61 &workerid],
62 PRIV_DATASTORE_MODIFY,
63 true);
64 },
65 _ => bail!("not a scheduled job task"),
66 };
67
68 bail!("not a scheduled job task");
69 }
70
71 // get the store out of the worker_id
72 fn check_job_store(upid: &UPID, store: &str) -> bool {
73 match (upid.worker_type.as_str(), &upid.worker_id) {
74 (workertype, Some(workerid)) if workertype.starts_with("verif") => {
75 if let Some(captures) = VERIFICATION_JOB_WORKER_ID_REGEX.captures(&workerid) {
76 if let Some(jobstore) = captures.get(1) {
77 return store == jobstore.as_str();
78 }
79 } else {
80 return workerid == store;
81 }
82 }
83 ("syncjob", Some(workerid)) => {
84 if let Some(captures) = SYNC_JOB_WORKER_ID_REGEX.captures(&workerid) {
85 if let Some(local_store) = captures.get(3) {
86 return store == local_store.as_str();
87 }
88 }
89 }
90 ("prune", Some(workerid))
91 | ("backup", Some(workerid))
92 | ("garbage_collection", Some(workerid)) => {
93 return workerid == store || workerid.starts_with(&format!("{}:", store));
94 }
95 _ => {}
96 };
97
98 false
99 }
100
101 fn check_task_access(auth_id: &Authid, upid: &UPID) -> Result<(), Error> {
102 let task_auth_id: Authid = upid.auth_id.parse()?;
103 if auth_id == &task_auth_id
104 || (task_auth_id.is_token() && &Authid::from(task_auth_id.user().clone()) == auth_id) {
105 // task owner can always read
106 Ok(())
107 } else {
108 let user_info = CachedUserInfo::new()?;
109
110 // access to all tasks
111 // or task == job which the user/token could have configured/manually executed
112
113 user_info.check_privs(auth_id, &["system", "tasks"], PRIV_SYS_AUDIT, false)
114 .or_else(|_| check_job_privs(&auth_id, &user_info, upid))
115 .or_else(|_| bail!("task access not allowed"))
116 }
117 }
118
119 pub fn tasktype(state: &TaskState) -> TaskStateType {
120 match state {
121 TaskState::OK { .. } => TaskStateType::OK,
122 TaskState::Unknown { .. } => TaskStateType::Unknown,
123 TaskState::Error { .. } => TaskStateType::Error,
124 TaskState::Warning { .. } => TaskStateType::Warning,
125 }
126 }
127
128 #[api(
129 input: {
130 properties: {
131 node: {
132 schema: NODE_SCHEMA,
133 },
134 upid: {
135 schema: UPID_SCHEMA,
136 },
137 },
138 },
139 returns: {
140 description: "Task status information.",
141 properties: {
142 node: {
143 schema: NODE_SCHEMA,
144 },
145 upid: {
146 schema: UPID_SCHEMA,
147 },
148 pid: {
149 type: i64,
150 description: "The Unix PID.",
151 },
152 pstart: {
153 type: u64,
154 description: "The Unix process start time from `/proc/pid/stat`",
155 },
156 starttime: {
157 type: i64,
158 description: "The task start time (Epoch)",
159 },
160 "type": {
161 type: String,
162 description: "Worker type (arbitrary ASCII string)",
163 },
164 id: {
165 type: String,
166 optional: true,
167 description: "Worker ID (arbitrary ASCII string)",
168 },
169 user: {
170 type: Userid,
171 },
172 tokenid: {
173 type: Tokenname,
174 optional: true,
175 },
176 status: {
177 type: String,
178 description: "'running' or 'stopped'",
179 },
180 exitstatus: {
181 type: String,
182 optional: true,
183 description: "'OK', 'Error: <msg>', or 'unkwown'.",
184 },
185 },
186 },
187 access: {
188 description: "Users can access their own tasks, or need Sys.Audit on /system/tasks.",
189 permission: &Permission::Anybody,
190 },
191 )]
192 /// Get task status.
193 async fn get_task_status(
194 param: Value,
195 rpcenv: &mut dyn RpcEnvironment,
196 ) -> Result<Value, Error> {
197
198 let upid = extract_upid(&param)?;
199
200 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
201 check_task_access(&auth_id, &upid)?;
202
203 let task_auth_id: Authid = upid.auth_id.parse()?;
204
205 let mut result = json!({
206 "upid": param["upid"],
207 "node": upid.node,
208 "pid": upid.pid,
209 "pstart": upid.pstart,
210 "starttime": upid.starttime,
211 "type": upid.worker_type,
212 "id": upid.worker_id,
213 "user": task_auth_id.user(),
214 });
215
216 if task_auth_id.is_token() {
217 result["tokenid"] = Value::from(task_auth_id.tokenname().unwrap().as_str());
218 }
219
220 if crate::server::worker_is_active(&upid).await? {
221 result["status"] = Value::from("running");
222 } else {
223 let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
224 result["status"] = Value::from("stopped");
225 result["exitstatus"] = Value::from(exitstatus.to_string());
226 };
227
228 Ok(result)
229 }
230
231 fn extract_upid(param: &Value) -> Result<UPID, Error> {
232
233 let upid_str = pbs_tools::json::required_string_param(&param, "upid")?;
234
235 upid_str.parse::<UPID>()
236 }
237
238 #[api(
239 input: {
240 properties: {
241 node: {
242 schema: NODE_SCHEMA,
243 },
244 upid: {
245 schema: UPID_SCHEMA,
246 },
247 "test-status": {
248 type: bool,
249 optional: true,
250 description: "Test task status, and set result attribute \"active\" accordingly.",
251 },
252 start: {
253 type: u64,
254 optional: true,
255 description: "Start at this line.",
256 default: 0,
257 },
258 limit: {
259 type: u64,
260 optional: true,
261 description: "Only list this amount of lines.",
262 default: 50,
263 },
264 },
265 },
266 access: {
267 description: "Users can access their own tasks, or need Sys.Audit on /system/tasks.",
268 permission: &Permission::Anybody,
269 },
270 )]
271 /// Read task log.
272 async fn read_task_log(
273 param: Value,
274 mut rpcenv: &mut dyn RpcEnvironment,
275 ) -> Result<Value, Error> {
276
277 let upid = extract_upid(&param)?;
278
279 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
280
281 check_task_access(&auth_id, &upid)?;
282
283 let test_status = param["test-status"].as_bool().unwrap_or(false);
284
285 let start = param["start"].as_u64().unwrap_or(0);
286 let mut limit = param["limit"].as_u64().unwrap_or(50);
287
288 let mut count: u64 = 0;
289
290 let path = upid.log_path();
291
292 let file = File::open(path)?;
293
294 let mut lines: Vec<Value> = vec![];
295
296 for line in BufReader::new(file).lines() {
297 match line {
298 Ok(line) => {
299 count += 1;
300 if count < start { continue };
301 if limit == 0 { continue };
302
303 lines.push(json!({ "n": count, "t": line }));
304
305 limit -= 1;
306 }
307 Err(err) => {
308 log::error!("reading task log failed: {}", err);
309 break;
310 }
311 }
312 }
313
314 rpcenv["total"] = Value::from(count);
315
316 if test_status {
317 let active = crate::server::worker_is_active(&upid).await?;
318 rpcenv["active"] = Value::from(active);
319 }
320
321 Ok(json!(lines))
322 }
323
324 #[api(
325 protected: true,
326 input: {
327 properties: {
328 node: {
329 schema: NODE_SCHEMA,
330 },
331 upid: {
332 schema: UPID_SCHEMA,
333 },
334 },
335 },
336 access: {
337 description: "Users can stop their own tasks, or need Sys.Modify on /system/tasks.",
338 permission: &Permission::Anybody,
339 },
340 )]
341 /// Try to stop a task.
342 fn stop_task(
343 param: Value,
344 rpcenv: &mut dyn RpcEnvironment,
345 ) -> Result<Value, Error> {
346
347 let upid = extract_upid(&param)?;
348
349 let auth_id = rpcenv.get_auth_id().unwrap();
350
351 if auth_id != upid.auth_id {
352 let user_info = CachedUserInfo::new()?;
353 let auth_id: Authid = auth_id.parse()?;
354 user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
355 }
356
357 server::abort_worker_async(upid);
358
359 Ok(Value::Null)
360 }
361
362 #[api(
363 input: {
364 properties: {
365 node: {
366 schema: NODE_SCHEMA
367 },
368 start: {
369 type: u64,
370 description: "List tasks beginning from this offset.",
371 default: 0,
372 optional: true,
373 },
374 limit: {
375 type: u64,
376 description: "Only list this amount of tasks. (0 means no limit)",
377 default: 50,
378 optional: true,
379 },
380 store: {
381 schema: DATASTORE_SCHEMA,
382 optional: true,
383 },
384 running: {
385 type: bool,
386 description: "Only list running tasks.",
387 optional: true,
388 default: false,
389 },
390 errors: {
391 type: bool,
392 description: "Only list erroneous tasks.",
393 optional:true,
394 default: false,
395 },
396 userfilter: {
397 optional: true,
398 type: String,
399 description: "Only list tasks from this user.",
400 },
401 since: {
402 type: i64,
403 description: "Only list tasks since this UNIX epoch.",
404 optional: true,
405 },
406 until: {
407 type: i64,
408 description: "Only list tasks until this UNIX epoch.",
409 optional: true,
410 },
411 typefilter: {
412 optional: true,
413 type: String,
414 description: "Only list tasks whose type contains this.",
415 },
416 statusfilter: {
417 optional: true,
418 type: Array,
419 description: "Only list tasks which have any one of the listed status.",
420 items: {
421 type: TaskStateType,
422 },
423 },
424 },
425 },
426 returns: pbs_api_types::NODE_TASKS_LIST_TASKS_RETURN_TYPE,
427 access: {
428 description: "Users can only see their own tasks, unless they have Sys.Audit on /system/tasks.",
429 permission: &Permission::Anybody,
430 },
431 )]
432 /// List tasks.
433 #[allow(clippy::too_many_arguments)]
434 pub fn list_tasks(
435 start: u64,
436 limit: u64,
437 errors: bool,
438 running: bool,
439 userfilter: Option<String>,
440 since: Option<i64>,
441 until: Option<i64>,
442 typefilter: Option<String>,
443 statusfilter: Option<Vec<TaskStateType>>,
444 param: Value,
445 mut rpcenv: &mut dyn RpcEnvironment,
446 ) -> Result<Vec<TaskListItem>, Error> {
447
448 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
449 let user_info = CachedUserInfo::new()?;
450 let user_privs = user_info.lookup_privs(&auth_id, &["system", "tasks"]);
451
452 let list_all = (user_privs & PRIV_SYS_AUDIT) != 0;
453
454 let store = param["store"].as_str();
455
456 let list = TaskListInfoIterator::new(running)?;
457 let limit = if limit > 0 { limit as usize } else { usize::MAX };
458
459 let mut skipped = 0;
460 let mut result: Vec<TaskListItem> = Vec::new();
461
462 for info in list {
463 let info = match info {
464 Ok(info) => info,
465 Err(_) => break,
466 };
467
468 if let Some(until) = until {
469 if info.upid.starttime > until {
470 continue;
471 }
472 }
473
474 if let Some(since) = since {
475 if let Some(ref state) = info.state {
476 if state.endtime() < since {
477 // we reached the tasks that ended before our 'since'
478 // so we can stop iterating
479 break;
480 }
481 }
482 if info.upid.starttime < since {
483 continue;
484 }
485 }
486
487 if !list_all && check_task_access(&auth_id, &info.upid).is_err() {
488 continue;
489 }
490
491 if let Some(needle) = &userfilter {
492 if !info.upid.auth_id.to_string().contains(needle) { continue; }
493 }
494
495 if let Some(store) = store {
496 if !check_job_store(&info.upid, store) { continue; }
497 }
498
499 if let Some(typefilter) = &typefilter {
500 if !info.upid.worker_type.contains(typefilter) { continue; }
501 }
502
503 match (&info.state, &statusfilter) {
504 (Some(_), _) if running => continue,
505 (Some(crate::server::TaskState::OK { .. }), _) if errors => continue,
506 (Some(state), Some(filters)) => {
507 if !filters.contains(&tasktype(state)) {
508 continue;
509 }
510 },
511 (None, Some(_)) => continue,
512 _ => {},
513 }
514
515 if skipped < start as usize {
516 skipped += 1;
517 continue;
518 }
519
520 result.push(info.into());
521
522 if result.len() >= limit {
523 break;
524 }
525 }
526
527 let mut count = result.len() + start as usize;
528 if !result.is_empty() && result.len() >= limit { // we have a 'virtual' entry as long as we have any new
529 count += 1;
530 }
531
532 rpcenv["total"] = Value::from(count);
533
534 Ok(result)
535 }
536
// Sub-directories available below a single task: `log` and `status`, each
// with a GET handler only (read the task log / get the task status).
#[sortable]
const UPID_API_SUBDIRS: SubdirMap = &sorted!([
    (
        "log", &Router::new()
            .get(&API_METHOD_READ_TASK_LOG)
    ),
    (
        "status", &Router::new()
            .get(&API_METHOD_GET_TASK_STATUS)
    )
]);
548
// Router for a single task: GET lists the sub-directories from
// UPID_API_SUBDIRS, DELETE tries to stop the task, and the sub-directories
// themselves are attached via `subdirs`.
pub const UPID_API_ROUTER: Router = Router::new()
    .get(&list_subdirs_api_method!(UPID_API_SUBDIRS))
    .delete(&API_METHOD_STOP_TASK)
    .subdirs(&UPID_API_SUBDIRS);
553
// Entry point of the tasks API: GET lists tasks; `match_all` hands requests
// over to UPID_API_ROUTER under the parameter name "upid" (the handlers read
// it back via `extract_upid`).
pub const ROUTER: Router = Router::new()
    .get(&API_METHOD_LIST_TASKS)
    .match_all("upid", &UPID_API_ROUTER);