]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/node/tasks.rs
e8de42fd68c231f5054523d96fdf4c69635be627
[proxmox-backup.git] / src / api2 / node / tasks.rs
1 use std::fs::File;
2 use std::io::{BufRead, BufReader};
3
4 use anyhow::{Error};
5 use serde_json::{json, Value};
6
7 use proxmox::api::{api, Router, RpcEnvironment, Permission};
8 use proxmox::api::router::SubdirMap;
9 use proxmox::{identity, list_subdirs_api_method, sortable};
10
11 use crate::tools;
12 use crate::api2::types::*;
13 use crate::server::{self, UPID};
14 use crate::config::acl::{PRIV_SYS_AUDIT, PRIV_SYS_MODIFY};
15
16 #[api(
17 input: {
18 properties: {
19 node: {
20 schema: NODE_SCHEMA,
21 },
22 upid: {
23 schema: UPID_SCHEMA,
24 },
25 },
26 },
27 returns: {
28 description: "Task status nformation.",
29 properties: {
30 node: {
31 schema: NODE_SCHEMA,
32 },
33 upid: {
34 schema: UPID_SCHEMA,
35 },
36 pid: {
37 type: i64,
38 description: "The Unix PID.",
39 },
40 pstart: {
41 type: u64,
42 description: "The Unix process start time from `/proc/pid/stat`",
43 },
44 starttime: {
45 type: i64,
46 description: "The task start time (Epoch)",
47 },
48 "type": {
49 type: String,
50 description: "Worker type (arbitrary ASCII string)",
51 },
52 id: {
53 type: String,
54 optional: true,
55 description: "Worker ID (arbitrary ASCII string)",
56 },
57 user: {
58 type: String,
59 description: "The user who started the task.",
60 },
61 status: {
62 type: String,
63 description: "'running' or 'stopped'",
64 },
65 exitstatus: {
66 type: String,
67 optional: true,
68 description: "'OK', 'Error: <msg>', or 'unkwown'.",
69 },
70 },
71 },
72 access: {
73 permission: &Permission::Privilege(&["system", "tasks"], PRIV_SYS_AUDIT, false),
74 },
75 )]
76 /// Get task status.
77 fn get_task_status(
78 param: Value,
79 ) -> Result<Value, Error> {
80
81 let upid = extract_upid(&param)?;
82
83 let mut result = json!({
84 "upid": param["upid"],
85 "node": upid.node,
86 "pid": upid.pid,
87 "pstart": upid.pstart,
88 "starttime": upid.starttime,
89 "type": upid.worker_type,
90 "id": upid.worker_id,
91 "user": upid.username,
92 });
93
94 if crate::server::worker_is_active(&upid) {
95 result["status"] = Value::from("running");
96 } else {
97 let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(String::from("unknown"));
98 result["status"] = Value::from("stopped");
99 result["exitstatus"] = Value::from(exitstatus);
100 };
101
102 Ok(result)
103 }
104
105 fn extract_upid(param: &Value) -> Result<UPID, Error> {
106
107 let upid_str = tools::required_string_param(&param, "upid")?;
108
109 upid_str.parse::<UPID>()
110 }
111
112 #[api(
113 input: {
114 properties: {
115 node: {
116 schema: NODE_SCHEMA,
117 },
118 upid: {
119 schema: UPID_SCHEMA,
120 },
121 "test-status": {
122 type: bool,
123 optional: true,
124 description: "Test task status, and set result attribute \"active\" accordingly.",
125 },
126 start: {
127 type: u64,
128 optional: true,
129 description: "Start at this line.",
130 default: 0,
131 },
132 limit: {
133 type: u64,
134 optional: true,
135 description: "Only list this amount of lines.",
136 default: 50,
137 },
138 },
139 },
140 access: {
141 permission: &Permission::Privilege(&["system", "tasks"], PRIV_SYS_AUDIT, false),
142 },
143 )]
144 /// Read task log.
145 fn read_task_log(
146 param: Value,
147 rpcenv: &mut dyn RpcEnvironment,
148 ) -> Result<Value, Error> {
149
150 let upid = extract_upid(&param)?;
151
152 let test_status = param["test-status"].as_bool().unwrap_or(false);
153
154 let start = param["start"].as_u64().unwrap_or(0);
155 let mut limit = param["limit"].as_u64().unwrap_or(50);
156
157 let mut count: u64 = 0;
158
159 let path = upid.log_path();
160
161 let file = File::open(path)?;
162
163 let mut lines: Vec<Value> = vec![];
164
165 for line in BufReader::new(file).lines() {
166 match line {
167 Ok(line) => {
168 count += 1;
169 if count < start { continue };
170 if limit == 0 { continue };
171
172 lines.push(json!({ "n": count, "t": line }));
173
174 limit -= 1;
175 }
176 Err(err) => {
177 log::error!("reading task log failed: {}", err);
178 break;
179 }
180 }
181 }
182
183 rpcenv.set_result_attrib("total", Value::from(count));
184
185 if test_status {
186 let active = crate::server::worker_is_active(&upid);
187 rpcenv.set_result_attrib("active", Value::from(active));
188 }
189
190 Ok(json!(lines))
191 }
192
193 #[api(
194 protected: true,
195 input: {
196 properties: {
197 node: {
198 schema: NODE_SCHEMA,
199 },
200 upid: {
201 schema: UPID_SCHEMA,
202 },
203 },
204 },
205 access: {
206 permission: &Permission::Privilege(&["system", "tasks"], PRIV_SYS_MODIFY, false),
207 },
208 )]
209 /// Try to stop a task.
210 fn stop_task(
211 param: Value,
212 ) -> Result<Value, Error> {
213
214 let upid = extract_upid(&param)?;
215
216 if crate::server::worker_is_active(&upid) {
217 server::abort_worker_async(upid);
218 }
219
220 Ok(Value::Null)
221 }
222
223 #[api(
224 input: {
225 properties: {
226 node: {
227 schema: NODE_SCHEMA
228 },
229 start: {
230 type: u64,
231 description: "List tasks beginning from this offset.",
232 default: 0,
233 optional: true,
234 },
235 limit: {
236 type: u64,
237 description: "Only list this amount of tasks.",
238 default: 50,
239 optional: true,
240 },
241 store: {
242 schema: DATASTORE_SCHEMA,
243 optional: true,
244 },
245 running: {
246 type: bool,
247 description: "Only list running tasks.",
248 optional: true,
249 },
250 errors: {
251 type: bool,
252 description: "Only list erroneous tasks.",
253 optional:true,
254 },
255 userfilter: {
256 optional:true,
257 type: String,
258 description: "Only list tasks from this user.",
259 },
260 },
261 },
262 returns: {
263 description: "A list of tasks.",
264 type: Array,
265 items: { type: TaskListItem },
266 },
267 access: {
268 permission: &Permission::Privilege(&[], PRIV_SYS_AUDIT, false),
269 },
270 )]
271 /// List tasks.
272 pub fn list_tasks(
273 param: Value,
274 rpcenv: &mut dyn RpcEnvironment,
275 ) -> Result<Vec<TaskListItem>, Error> {
276
277 let start = param["start"].as_u64().unwrap_or(0);
278 let limit = param["limit"].as_u64().unwrap_or(50);
279 let errors = param["errors"].as_bool().unwrap_or(false);
280 let running = param["running"].as_bool().unwrap_or(false);
281
282 let store = param["store"].as_str();
283
284 let userfilter = param["userfilter"].as_str();
285
286 let list = server::read_task_list()?;
287
288 let mut result = vec![];
289
290 let mut count = 0;
291
292 for info in list.iter() {
293 let mut entry = TaskListItem {
294 upid: info.upid_str.clone(),
295 node: "localhost".to_string(),
296 pid: info.upid.pid as i64,
297 pstart: info.upid.pstart,
298 starttime: info.upid.starttime,
299 worker_type: info.upid.worker_type.clone(),
300 worker_id: info.upid.worker_id.clone(),
301 user: info.upid.username.clone(),
302 endtime: None,
303 status: None,
304 };
305
306 if let Some(username) = userfilter {
307 if !info.upid.username.contains(username) { continue; }
308 }
309
310 if let Some(store) = store {
311 // Note: useful to select all tasks spawned by proxmox-backup-client
312 let worker_id = match &info.upid.worker_id {
313 Some(w) => w,
314 None => continue, // skip
315 };
316
317 if info.upid.worker_type == "backup" || info.upid.worker_type == "restore" ||
318 info.upid.worker_type == "prune"
319 {
320 let prefix = format!("{}_", store);
321 if !worker_id.starts_with(&prefix) { continue; }
322 } else if info.upid.worker_type == "garbage_collection" {
323 if worker_id != store { continue; }
324 } else {
325 continue; // skip
326 }
327 }
328
329 if let Some(ref state) = info.state {
330 if running { continue; }
331 if errors && state.1 == "OK" {
332 continue;
333 }
334
335 entry.endtime = Some(state.0);
336 entry.status = Some(state.1.clone());
337 }
338
339 if (count as u64) < start {
340 count += 1;
341 continue;
342 } else {
343 count += 1;
344 }
345
346 if (result.len() as u64) < limit { result.push(entry); };
347 }
348
349 rpcenv.set_result_attrib("total", Value::from(count));
350
351 Ok(result)
352 }
353
354 #[sortable]
355 const UPID_API_SUBDIRS: SubdirMap = &sorted!([
356 (
357 "log", &Router::new()
358 .get(&API_METHOD_READ_TASK_LOG)
359 ),
360 (
361 "status", &Router::new()
362 .get(&API_METHOD_GET_TASK_STATUS)
363 )
364 ]);
365
366 pub const UPID_API_ROUTER: Router = Router::new()
367 .get(&list_subdirs_api_method!(UPID_API_SUBDIRS))
368 .delete(&API_METHOD_STOP_TASK)
369 .subdirs(&UPID_API_SUBDIRS);
370
371 pub const ROUTER: Router = Router::new()
372 .get(&API_METHOD_LIST_TASKS)
373 .match_all("upid", &UPID_API_ROUTER);