]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/node/tasks.rs
server/worker_task: let upid_read_status also return the endtime
[proxmox-backup.git] / src / api2 / node / tasks.rs
1 use std::fs::File;
2 use std::io::{BufRead, BufReader};
3
4 use anyhow::{Error};
5 use serde_json::{json, Value};
6
7 use proxmox::api::{api, Router, RpcEnvironment, Permission};
8 use proxmox::api::router::SubdirMap;
9 use proxmox::{identity, list_subdirs_api_method, sortable};
10
11 use crate::tools;
12 use crate::api2::types::*;
13 use crate::server::{self, UPID, TaskState};
14 use crate::config::acl::{PRIV_SYS_AUDIT, PRIV_SYS_MODIFY};
15 use crate::config::cached_user_info::CachedUserInfo;
16
17
18 #[api(
19 input: {
20 properties: {
21 node: {
22 schema: NODE_SCHEMA,
23 },
24 upid: {
25 schema: UPID_SCHEMA,
26 },
27 },
28 },
29 returns: {
30 description: "Task status nformation.",
31 properties: {
32 node: {
33 schema: NODE_SCHEMA,
34 },
35 upid: {
36 schema: UPID_SCHEMA,
37 },
38 pid: {
39 type: i64,
40 description: "The Unix PID.",
41 },
42 pstart: {
43 type: u64,
44 description: "The Unix process start time from `/proc/pid/stat`",
45 },
46 starttime: {
47 type: i64,
48 description: "The task start time (Epoch)",
49 },
50 "type": {
51 type: String,
52 description: "Worker type (arbitrary ASCII string)",
53 },
54 id: {
55 type: String,
56 optional: true,
57 description: "Worker ID (arbitrary ASCII string)",
58 },
59 user: {
60 type: String,
61 description: "The user who started the task.",
62 },
63 status: {
64 type: String,
65 description: "'running' or 'stopped'",
66 },
67 exitstatus: {
68 type: String,
69 optional: true,
70 description: "'OK', 'Error: <msg>', or 'unkwown'.",
71 },
72 },
73 },
74 access: {
75 description: "Users can access there own tasks, or need Sys.Audit on /system/tasks.",
76 permission: &Permission::Anybody,
77 },
78 )]
79 /// Get task status.
80 async fn get_task_status(
81 param: Value,
82 rpcenv: &mut dyn RpcEnvironment,
83 ) -> Result<Value, Error> {
84
85 let upid = extract_upid(&param)?;
86
87 let userid: Userid = rpcenv.get_user().unwrap().parse()?;
88
89 if userid != upid.userid {
90 let user_info = CachedUserInfo::new()?;
91 user_info.check_privs(&userid, &["system", "tasks"], PRIV_SYS_AUDIT, false)?;
92 }
93
94 let mut result = json!({
95 "upid": param["upid"],
96 "node": upid.node,
97 "pid": upid.pid,
98 "pstart": upid.pstart,
99 "starttime": upid.starttime,
100 "type": upid.worker_type,
101 "id": upid.worker_id,
102 "user": upid.userid,
103 });
104
105 if crate::server::worker_is_active(&upid).await? {
106 result["status"] = Value::from("running");
107 } else {
108 let (_, exitstatus) = crate::server::upid_read_status(&upid).unwrap_or((0, TaskState::Unknown));
109 result["status"] = Value::from("stopped");
110 result["exitstatus"] = Value::from(exitstatus.to_string());
111 };
112
113 Ok(result)
114 }
115
116 fn extract_upid(param: &Value) -> Result<UPID, Error> {
117
118 let upid_str = tools::required_string_param(&param, "upid")?;
119
120 upid_str.parse::<UPID>()
121 }
122
123 #[api(
124 input: {
125 properties: {
126 node: {
127 schema: NODE_SCHEMA,
128 },
129 upid: {
130 schema: UPID_SCHEMA,
131 },
132 "test-status": {
133 type: bool,
134 optional: true,
135 description: "Test task status, and set result attribute \"active\" accordingly.",
136 },
137 start: {
138 type: u64,
139 optional: true,
140 description: "Start at this line.",
141 default: 0,
142 },
143 limit: {
144 type: u64,
145 optional: true,
146 description: "Only list this amount of lines.",
147 default: 50,
148 },
149 },
150 },
151 access: {
152 description: "Users can access there own tasks, or need Sys.Audit on /system/tasks.",
153 permission: &Permission::Anybody,
154 },
155 )]
156 /// Read task log.
157 async fn read_task_log(
158 param: Value,
159 mut rpcenv: &mut dyn RpcEnvironment,
160 ) -> Result<Value, Error> {
161
162 let upid = extract_upid(&param)?;
163
164 let userid: Userid = rpcenv.get_user().unwrap().parse()?;
165
166 if userid != upid.userid {
167 let user_info = CachedUserInfo::new()?;
168 user_info.check_privs(&userid, &["system", "tasks"], PRIV_SYS_AUDIT, false)?;
169 }
170
171 let test_status = param["test-status"].as_bool().unwrap_or(false);
172
173 let start = param["start"].as_u64().unwrap_or(0);
174 let mut limit = param["limit"].as_u64().unwrap_or(50);
175
176 let mut count: u64 = 0;
177
178 let path = upid.log_path();
179
180 let file = File::open(path)?;
181
182 let mut lines: Vec<Value> = vec![];
183
184 for line in BufReader::new(file).lines() {
185 match line {
186 Ok(line) => {
187 count += 1;
188 if count < start { continue };
189 if limit == 0 { continue };
190
191 lines.push(json!({ "n": count, "t": line }));
192
193 limit -= 1;
194 }
195 Err(err) => {
196 log::error!("reading task log failed: {}", err);
197 break;
198 }
199 }
200 }
201
202 rpcenv["total"] = Value::from(count);
203
204 if test_status {
205 let active = crate::server::worker_is_active(&upid).await?;
206 rpcenv["active"] = Value::from(active);
207 }
208
209 Ok(json!(lines))
210 }
211
212 #[api(
213 protected: true,
214 input: {
215 properties: {
216 node: {
217 schema: NODE_SCHEMA,
218 },
219 upid: {
220 schema: UPID_SCHEMA,
221 },
222 },
223 },
224 access: {
225 description: "Users can stop there own tasks, or need Sys.Modify on /system/tasks.",
226 permission: &Permission::Anybody,
227 },
228 )]
229 /// Try to stop a task.
230 fn stop_task(
231 param: Value,
232 rpcenv: &mut dyn RpcEnvironment,
233 ) -> Result<Value, Error> {
234
235 let upid = extract_upid(&param)?;
236
237 let userid: Userid = rpcenv.get_user().unwrap().parse()?;
238
239 if userid != upid.userid {
240 let user_info = CachedUserInfo::new()?;
241 user_info.check_privs(&userid, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
242 }
243
244 server::abort_worker_async(upid);
245
246 Ok(Value::Null)
247 }
248
249 #[api(
250 input: {
251 properties: {
252 node: {
253 schema: NODE_SCHEMA
254 },
255 start: {
256 type: u64,
257 description: "List tasks beginning from this offset.",
258 default: 0,
259 optional: true,
260 },
261 limit: {
262 type: u64,
263 description: "Only list this amount of tasks.",
264 default: 50,
265 optional: true,
266 },
267 store: {
268 schema: DATASTORE_SCHEMA,
269 optional: true,
270 },
271 running: {
272 type: bool,
273 description: "Only list running tasks.",
274 optional: true,
275 default: false,
276 },
277 errors: {
278 type: bool,
279 description: "Only list erroneous tasks.",
280 optional:true,
281 default: false,
282 },
283 userfilter: {
284 optional: true,
285 type: String,
286 description: "Only list tasks from this user.",
287 },
288 },
289 },
290 returns: {
291 description: "A list of tasks.",
292 type: Array,
293 items: { type: TaskListItem },
294 },
295 access: {
296 description: "Users can only see there own tasks, unless the have Sys.Audit on /system/tasks.",
297 permission: &Permission::Anybody,
298 },
299 )]
300 /// List tasks.
301 pub fn list_tasks(
302 start: u64,
303 limit: u64,
304 errors: bool,
305 running: bool,
306 param: Value,
307 mut rpcenv: &mut dyn RpcEnvironment,
308 ) -> Result<Vec<TaskListItem>, Error> {
309
310 let userid: Userid = rpcenv.get_user().unwrap().parse()?;
311 let user_info = CachedUserInfo::new()?;
312 let user_privs = user_info.lookup_privs(&userid, &["system", "tasks"]);
313
314 let list_all = (user_privs & PRIV_SYS_AUDIT) != 0;
315
316 let store = param["store"].as_str();
317
318 let userfilter = param["userfilter"].as_str();
319
320 let list = server::read_task_list()?;
321
322 let mut result = vec![];
323
324 let mut count = 0;
325
326 for info in list {
327 if !list_all && info.upid.userid != userid { continue; }
328
329
330 if let Some(userid) = userfilter {
331 if !info.upid.userid.as_str().contains(userid) { continue; }
332 }
333
334 if let Some(store) = store {
335 // Note: useful to select all tasks spawned by proxmox-backup-client
336 let worker_id = match &info.upid.worker_id {
337 Some(w) => w,
338 None => continue, // skip
339 };
340
341 if info.upid.worker_type == "backup" || info.upid.worker_type == "restore" ||
342 info.upid.worker_type == "prune"
343 {
344 let prefix = format!("{}_", store);
345 if !worker_id.starts_with(&prefix) { continue; }
346 } else if info.upid.worker_type == "garbage_collection" {
347 if worker_id != store { continue; }
348 } else {
349 continue; // skip
350 }
351 }
352
353 if let Some(ref state) = info.state {
354 if running { continue; }
355 if errors && state.1 == crate::server::TaskState::OK {
356 continue;
357 }
358 }
359
360 if (count as u64) < start {
361 count += 1;
362 continue;
363 } else {
364 count += 1;
365 }
366
367 if (result.len() as u64) < limit { result.push(info.into()); };
368 }
369
370 rpcenv["total"] = Value::from(count);
371
372 Ok(result)
373 }
374
375 #[sortable]
376 const UPID_API_SUBDIRS: SubdirMap = &sorted!([
377 (
378 "log", &Router::new()
379 .get(&API_METHOD_READ_TASK_LOG)
380 ),
381 (
382 "status", &Router::new()
383 .get(&API_METHOD_GET_TASK_STATUS)
384 )
385 ]);
386
387 pub const UPID_API_ROUTER: Router = Router::new()
388 .get(&list_subdirs_api_method!(UPID_API_SUBDIRS))
389 .delete(&API_METHOD_STOP_TASK)
390 .subdirs(&UPID_API_SUBDIRS);
391
392 pub const ROUTER: Router = Router::new()
393 .get(&API_METHOD_LIST_TASKS)
394 .match_all("upid", &UPID_API_ROUTER);