]> git.proxmox.com Git - proxmox-backup.git/blame - src/api2/node/tasks.rs
api: define subscription key schema and use it
[proxmox-backup.git] / src / api2 / node / tasks.rs
CommitLineData
cad540e9
WB
1use std::fs::File;
2use std::io::{BufRead, BufReader};
063ca5be 3
f7d4e4b5 4use anyhow::{Error};
063ca5be
DM
5use serde_json::{json, Value};
6
e7cb4dc5 7use proxmox::api::{api, Router, RpcEnvironment, Permission};
cad540e9 8use proxmox::api::router::SubdirMap;
9ea4bce4 9use proxmox::{identity, list_subdirs_api_method, sortable};
552c2259
DM
10
11use crate::tools;
4ebf0eab 12use crate::api2::types::*;
768e10d0 13use crate::server::{self, UPID, TaskState, TaskListInfoIterator};
83b6a7cf 14use crate::config::acl::{PRIV_SYS_AUDIT, PRIV_SYS_MODIFY};
720af9f6
DM
15use crate::config::cached_user_info::CachedUserInfo;
16
16245d54
FG
17fn check_task_access(auth_id: &Authid, upid: &UPID) -> Result<(), Error> {
18 let task_auth_id = &upid.auth_id;
19 if auth_id == task_auth_id
20 || (task_auth_id.is_token() && &Authid::from(task_auth_id.user().clone()) == auth_id) {
21 Ok(())
22 } else {
23 let user_info = CachedUserInfo::new()?;
24 user_info.check_privs(auth_id, &["system", "tasks"], PRIV_SYS_AUDIT, false)
25 }
26}
5a12c0e2 27
83b6a7cf
DM
28#[api(
29 input: {
30 properties: {
31 node: {
32 schema: NODE_SCHEMA,
33 },
34 upid: {
35 schema: UPID_SCHEMA,
36 },
37 },
38 },
39 returns: {
9809772b 40 description: "Task status information.",
83b6a7cf
DM
41 properties: {
42 node: {
43 schema: NODE_SCHEMA,
44 },
45 upid: {
46 schema: UPID_SCHEMA,
47 },
48 pid: {
49 type: i64,
50 description: "The Unix PID.",
51 },
52 pstart: {
53 type: u64,
54 description: "The Unix process start time from `/proc/pid/stat`",
55 },
56 starttime: {
57 type: i64,
58 description: "The task start time (Epoch)",
59 },
60 "type": {
61 type: String,
62 description: "Worker type (arbitrary ASCII string)",
63 },
64 id: {
65 type: String,
66 optional: true,
67 description: "Worker ID (arbitrary ASCII string)",
68 },
69 user: {
16245d54 70 type: Userid,
83b6a7cf
DM
71 description: "The user who started the task.",
72 },
16245d54
FG
73 tokenid: {
74 type: Tokenname,
75 optional: true,
76 },
83b6a7cf
DM
77 status: {
78 type: String,
79 description: "'running' or 'stopped'",
80 },
81 exitstatus: {
82 type: String,
83 optional: true,
84 description: "'OK', 'Error: <msg>', or 'unkwown'.",
85 },
86 },
87 },
88 access: {
9809772b 89 description: "Users can access their own tasks, or need Sys.Audit on /system/tasks.",
720af9f6 90 permission: &Permission::Anybody,
83b6a7cf
DM
91 },
92)]
93/// Get task status.
5751e495 94async fn get_task_status(
5a12c0e2 95 param: Value,
720af9f6 96 rpcenv: &mut dyn RpcEnvironment,
5a12c0e2
DM
97) -> Result<Value, Error> {
98
99 let upid = extract_upid(&param)?;
100
e6dc35ac 101 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
16245d54 102 check_task_access(&auth_id, &upid)?;
720af9f6 103
c360bd73
DM
104 let mut result = json!({
105 "upid": param["upid"],
106 "node": upid.node,
107 "pid": upid.pid,
108 "pstart": upid.pstart,
109 "starttime": upid.starttime,
110 "type": upid.worker_type,
111 "id": upid.worker_id,
16245d54 112 "user": upid.auth_id.user(),
c360bd73
DM
113 });
114
16245d54
FG
115 if upid.auth_id.is_token() {
116 result["tokenid"] = Value::from(upid.auth_id.tokenname().unwrap().as_str());
117 }
118
5751e495 119 if crate::server::worker_is_active(&upid).await? {
c360bd73 120 result["status"] = Value::from("running");
5a12c0e2 121 } else {
77bd2a46 122 let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(TaskState::Unknown { endtime: 0 });
c360bd73 123 result["status"] = Value::from("stopped");
4c116baf 124 result["exitstatus"] = Value::from(exitstatus.to_string());
5a12c0e2
DM
125 };
126
127 Ok(result)
128}
129
130fn extract_upid(param: &Value) -> Result<UPID, Error> {
131
132 let upid_str = tools::required_string_param(&param, "upid")?;
133
8560fe3e 134 upid_str.parse::<UPID>()
5a12c0e2
DM
135}
136
83b6a7cf
DM
137#[api(
138 input: {
139 properties: {
140 node: {
141 schema: NODE_SCHEMA,
142 },
143 upid: {
144 schema: UPID_SCHEMA,
145 },
146 "test-status": {
147 type: bool,
148 optional: true,
149 description: "Test task status, and set result attribute \"active\" accordingly.",
150 },
151 start: {
152 type: u64,
153 optional: true,
154 description: "Start at this line.",
155 default: 0,
156 },
157 limit: {
158 type: u64,
159 optional: true,
160 description: "Only list this amount of lines.",
161 default: 50,
162 },
163 },
164 },
165 access: {
720af9f6
DM
166 description: "Users can access there own tasks, or need Sys.Audit on /system/tasks.",
167 permission: &Permission::Anybody,
83b6a7cf
DM
168 },
169)]
170/// Read task log.
5751e495 171async fn read_task_log(
5a12c0e2 172 param: Value,
e8d1da6a 173 mut rpcenv: &mut dyn RpcEnvironment,
5a12c0e2
DM
174) -> Result<Value, Error> {
175
176 let upid = extract_upid(&param)?;
6b508dd5 177
e6dc35ac 178 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
720af9f6 179
16245d54 180 check_task_access(&auth_id, &upid)?;
720af9f6 181
6b508dd5
DM
182 let test_status = param["test-status"].as_bool().unwrap_or(false);
183
5a12c0e2
DM
184 let start = param["start"].as_u64().unwrap_or(0);
185 let mut limit = param["limit"].as_u64().unwrap_or(50);
6b508dd5 186
5a12c0e2
DM
187 let mut count: u64 = 0;
188
189 let path = upid.log_path();
190
191 let file = File::open(path)?;
192
193 let mut lines: Vec<Value> = vec![];
194
195 for line in BufReader::new(file).lines() {
196 match line {
197 Ok(line) => {
198 count += 1;
199 if count < start { continue };
11377a47 200 if limit == 0 { continue };
5a12c0e2
DM
201
202 lines.push(json!({ "n": count, "t": line }));
203
204 limit -= 1;
205 }
206 Err(err) => {
207 log::error!("reading task log failed: {}", err);
208 break;
209 }
210 }
211 }
212
e8d1da6a 213 rpcenv["total"] = Value::from(count);
d8d40dd0 214
6b508dd5 215 if test_status {
5751e495 216 let active = crate::server::worker_is_active(&upid).await?;
e8d1da6a 217 rpcenv["active"] = Value::from(active);
6b508dd5
DM
218 }
219
5a12c0e2
DM
220 Ok(json!(lines))
221}
063ca5be 222
83b6a7cf
DM
223#[api(
224 protected: true,
225 input: {
226 properties: {
227 node: {
228 schema: NODE_SCHEMA,
229 },
230 upid: {
231 schema: UPID_SCHEMA,
232 },
233 },
234 },
235 access: {
720af9f6
DM
236 description: "Users can stop there own tasks, or need Sys.Modify on /system/tasks.",
237 permission: &Permission::Anybody,
83b6a7cf
DM
238 },
239)]
240/// Try to stop a task.
a665dea1
DM
241fn stop_task(
242 param: Value,
720af9f6 243 rpcenv: &mut dyn RpcEnvironment,
a665dea1
DM
244) -> Result<Value, Error> {
245
246 let upid = extract_upid(&param)?;
247
e6dc35ac 248 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
720af9f6 249
e6dc35ac 250 if auth_id != upid.auth_id {
720af9f6 251 let user_info = CachedUserInfo::new()?;
e6dc35ac 252 user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
720af9f6
DM
253 }
254
5751e495 255 server::abort_worker_async(upid);
a665dea1
DM
256
257 Ok(Value::Null)
258}
259
2c4b303c
DM
260#[api(
261 input: {
262 properties: {
263 node: {
264 schema: NODE_SCHEMA
265 },
266 start: {
267 type: u64,
268 description: "List tasks beginning from this offset.",
269 default: 0,
270 optional: true,
271 },
272 limit: {
273 type: u64,
274 description: "Only list this amount of tasks.",
275 default: 50,
276 optional: true,
277 },
278 store: {
279 schema: DATASTORE_SCHEMA,
280 optional: true,
281 },
282 running: {
283 type: bool,
284 description: "Only list running tasks.",
285 optional: true,
ca9dfe5f 286 default: false,
2c4b303c
DM
287 },
288 errors: {
289 type: bool,
290 description: "Only list erroneous tasks.",
291 optional:true,
ca9dfe5f 292 default: false,
2c4b303c
DM
293 },
294 userfilter: {
e7cb4dc5 295 optional: true,
2c4b303c
DM
296 type: String,
297 description: "Only list tasks from this user.",
298 },
299 },
300 },
301 returns: {
302 description: "A list of tasks.",
303 type: Array,
99384f79 304 items: { type: TaskListItem },
2c4b303c 305 },
83b6a7cf 306 access: {
720af9f6
DM
307 description: "Users can only see there own tasks, unless the have Sys.Audit on /system/tasks.",
308 permission: &Permission::Anybody,
83b6a7cf 309 },
2c4b303c
DM
310)]
311/// List tasks.
8528fce8 312pub fn list_tasks(
ca9dfe5f
DM
313 start: u64,
314 limit: u64,
315 errors: bool,
316 running: bool,
005a5b96 317 userfilter: Option<String>,
063ca5be 318 param: Value,
e8d1da6a 319 mut rpcenv: &mut dyn RpcEnvironment,
99384f79 320) -> Result<Vec<TaskListItem>, Error> {
063ca5be 321
e6dc35ac 322 let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
720af9f6 323 let user_info = CachedUserInfo::new()?;
e6dc35ac 324 let user_privs = user_info.lookup_privs(&auth_id, &["system", "tasks"]);
720af9f6
DM
325
326 let list_all = (user_privs & PRIV_SYS_AUDIT) != 0;
327
567d3e00
DM
328 let store = param["store"].as_str();
329
768e10d0 330 let list = TaskListInfoIterator::new(running)?;
d2a2e02b 331
768e10d0
DC
332 let result: Vec<TaskListItem> = list
333 .take_while(|info| !info.is_err())
334 .filter_map(|info| {
335 let info = match info {
336 Ok(info) => info,
337 Err(_) => return None,
338 };
720af9f6 339
16245d54
FG
340 if !list_all && check_task_access(&auth_id, &info.upid).is_err() {
341 return None;
342 }
063ca5be 343
e6dc35ac
FG
344 if let Some(needle) = &userfilter {
345 if !info.upid.auth_id.to_string().contains(needle) { return None; }
d2a2e02b
DM
346 }
347
567d3e00
DM
348 if let Some(store) = store {
349 // Note: useful to select all tasks spawned by proxmox-backup-client
350 let worker_id = match &info.upid.worker_id {
351 Some(w) => w,
768e10d0 352 None => return None, // skip
567d3e00
DM
353 };
354
503995c7
DM
355 if info.upid.worker_type == "backup" || info.upid.worker_type == "restore" ||
356 info.upid.worker_type == "prune"
357 {
4ebda996 358 let prefix = format!("{}:", store);
768e10d0 359 if !worker_id.starts_with(&prefix) { return None; }
503995c7 360 } else if info.upid.worker_type == "garbage_collection" {
768e10d0 361 if worker_id != store { return None; }
567d3e00 362 } else {
768e10d0 363 return None; // skip
567d3e00
DM
364 }
365 }
366
768e10d0 367 match info.state {
df4827f2 368 Some(_) if running => return None,
768e10d0
DC
369 Some(crate::server::TaskState::OK { .. }) if errors => return None,
370 _ => {},
063ca5be
DM
371 }
372
768e10d0
DC
373 Some(info.into())
374 }).skip(start as usize)
375 .take(limit as usize)
376 .collect();
063ca5be 377
768e10d0
DC
378 let mut count = result.len() + start as usize;
379 if result.len() > 0 && result.len() >= limit as usize { // we have a 'virtual' entry as long as we have any new
380 count += 1;
063ca5be
DM
381 }
382
e8d1da6a 383 rpcenv["total"] = Value::from(count);
063ca5be 384
0196b9bf 385 Ok(result)
063ca5be
DM
386}
387
552c2259 388#[sortable]
83b6a7cf 389const UPID_API_SUBDIRS: SubdirMap = &sorted!([
255f378a
DM
390 (
391 "log", &Router::new()
83b6a7cf 392 .get(&API_METHOD_READ_TASK_LOG)
255f378a
DM
393 ),
394 (
395 "status", &Router::new()
83b6a7cf 396 .get(&API_METHOD_GET_TASK_STATUS)
255f378a 397 )
83b6a7cf 398]);
255f378a
DM
399
400pub const UPID_API_ROUTER: Router = Router::new()
401 .get(&list_subdirs_api_method!(UPID_API_SUBDIRS))
83b6a7cf 402 .delete(&API_METHOD_STOP_TASK)
255f378a
DM
403 .subdirs(&UPID_API_SUBDIRS);
404
405pub const ROUTER: Router = Router::new()
2c4b303c 406 .get(&API_METHOD_LIST_TASKS)
255f378a 407 .match_all("upid", &UPID_API_ROUTER);