]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/node/tasks.rs
b725f35d59431a38cb9142d6858ee5e9f8cae798
[proxmox-backup.git] / src / api2 / node / tasks.rs
1 use std::fs::File;
2 use std::io::{BufRead, BufReader};
3
4 use failure::*;
5 use serde_json::{json, Value};
6
7 use proxmox::{sortable, identity};
8 use proxmox::api::list_subdirs_api_method;
9 use proxmox::api::{ApiHandler, ApiMethod, Router, RpcEnvironment};
10 use proxmox::api::router::SubdirMap;
11 use proxmox::api::schema::*;
12
13 use crate::tools;
14 use crate::api2::types::*;
15 use crate::server::{self, UPID};
16
17 fn get_task_status(
18 param: Value,
19 _info: &ApiMethod,
20 _rpcenv: &mut dyn RpcEnvironment,
21 ) -> Result<Value, Error> {
22
23 let upid = extract_upid(&param)?;
24
25 let mut result = json!({
26 "upid": param["upid"],
27 "node": upid.node,
28 "pid": upid.pid,
29 "pstart": upid.pstart,
30 "starttime": upid.starttime,
31 "type": upid.worker_type,
32 "id": upid.worker_id,
33 "user": upid.username,
34 });
35
36 if crate::server::worker_is_active(&upid) {
37 result["status"] = Value::from("running");
38 } else {
39 let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(String::from("unknown"));
40 result["status"] = Value::from("stopped");
41 result["exitstatus"] = Value::from(exitstatus);
42 };
43
44 Ok(result)
45 }
46
47 fn extract_upid(param: &Value) -> Result<UPID, Error> {
48
49 let upid_str = tools::required_string_param(&param, "upid")?;
50
51 upid_str.parse::<UPID>()
52 }
53
54 fn read_task_log(
55 param: Value,
56 _info: &ApiMethod,
57 rpcenv: &mut dyn RpcEnvironment,
58 ) -> Result<Value, Error> {
59
60 let upid = extract_upid(&param)?;
61
62 let test_status = param["test-status"].as_bool().unwrap_or(false);
63
64 let start = param["start"].as_u64().unwrap_or(0);
65 let mut limit = param["limit"].as_u64().unwrap_or(50);
66
67 let mut count: u64 = 0;
68
69 let path = upid.log_path();
70
71 let file = File::open(path)?;
72
73 let mut lines: Vec<Value> = vec![];
74
75 for line in BufReader::new(file).lines() {
76 match line {
77 Ok(line) => {
78 count += 1;
79 if count < start { continue };
80 if limit == 0 { continue };
81
82 lines.push(json!({ "n": count, "t": line }));
83
84 limit -= 1;
85 }
86 Err(err) => {
87 log::error!("reading task log failed: {}", err);
88 break;
89 }
90 }
91 }
92
93 rpcenv.set_result_attrib("total", Value::from(count));
94
95 if test_status {
96 let active = crate::server::worker_is_active(&upid);
97 rpcenv.set_result_attrib("active", Value::from(active));
98 }
99
100 Ok(json!(lines))
101 }
102
103 fn stop_task(
104 param: Value,
105 _info: &ApiMethod,
106 _rpcenv: &mut dyn RpcEnvironment,
107 ) -> Result<Value, Error> {
108
109 let upid = extract_upid(&param)?;
110
111 if crate::server::worker_is_active(&upid) {
112 server::abort_worker_async(upid);
113 }
114
115 Ok(Value::Null)
116 }
117
118 fn list_tasks(
119 param: Value,
120 _info: &ApiMethod,
121 rpcenv: &mut dyn RpcEnvironment,
122 ) -> Result<Value, Error> {
123
124 let start = param["start"].as_u64().unwrap_or(0);
125 let limit = param["limit"].as_u64().unwrap_or(50);
126 let errors = param["errors"].as_bool().unwrap_or(false);
127 let running = param["running"].as_bool().unwrap_or(false);
128
129 let store = param["store"].as_str();
130
131 let userfilter = param["userfilter"].as_str();
132
133 let list = server::read_task_list()?;
134
135 let mut result = vec![];
136
137 let mut count = 0;
138
139 for info in list.iter() {
140 let mut entry = json!({
141 "upid": info.upid_str,
142 "node": "localhost",
143 "pid": info.upid.pid,
144 "pstart": info.upid.pstart,
145 "starttime": info.upid.starttime,
146 "type": info.upid.worker_type,
147 "id": info.upid.worker_id,
148 "user": info.upid.username,
149 });
150
151 if let Some(username) = userfilter {
152 if !info.upid.username.contains(username) { continue; }
153 }
154
155 if let Some(store) = store {
156 // Note: useful to select all tasks spawned by proxmox-backup-client
157 let worker_id = match &info.upid.worker_id {
158 Some(w) => w,
159 None => continue, // skip
160 };
161
162 if info.upid.worker_type == "backup" || info.upid.worker_type == "restore" ||
163 info.upid.worker_type == "prune"
164 {
165 let prefix = format!("{}_", store);
166 if !worker_id.starts_with(&prefix) { continue; }
167 } else if info.upid.worker_type == "garbage_collection" {
168 if worker_id != store { continue; }
169 } else {
170 continue; // skip
171 }
172 }
173
174 if let Some(ref state) = info.state {
175 if running { continue; }
176 if errors && state.1 == "OK" {
177 continue;
178 }
179
180 entry["endtime"] = Value::from(state.0);
181 entry["status"] = Value::from(state.1.clone());
182 }
183
184 if (count as u64) < start {
185 count += 1;
186 continue;
187 } else {
188 count += 1;
189 }
190
191 if (result.len() as u64) < limit { result.push(entry); };
192 }
193
194 rpcenv.set_result_attrib("total", Value::from(count));
195
196 Ok(json!(result))
197 }
198
// Sub-directory routes available below a single task: `log` and `status`.
// Both only define a GET method with a synchronous handler.
//
// NOTE(review): the boolean in each parameter tuple looks like an "optional"
// flag (`false` for `node`/`upid`, `true` for parameters the handlers default)
// - confirm against the proxmox::api::schema documentation.
#[sortable]
const UPID_API_SUBDIRS: SubdirMap = &[
    (
        // GET .../log is handled by `read_task_log`.
        "log", &Router::new()
            .get(
                &ApiMethod::new(
                    &ApiHandler::Sync(&read_task_log),
                    &ObjectSchema::new(
                        "Read task log.",
                        &sorted!([
                            ("node", false, &NODE_SCHEMA),
                            ( "test-status",
                               true,
                               &BooleanSchema::new(
                                   "Test task status, and set result attribute \"active\" accordingly."
                               ).schema()
                            ),
                            ("upid", false, &UPID_SCHEMA),
                            // The defaults below mirror the fallbacks used in `read_task_log`.
                            ("start", true, &IntegerSchema::new("Start at this line.")
                             .minimum(0)
                             .default(0)
                             .schema()
                            ),
                            ("limit", true, &IntegerSchema::new("Only list this amount of lines.")
                             .minimum(0)
                             .default(50)
                             .schema()
                            ),
                        ]),
                    )
                )
            )
    ),
    (
        // GET .../status is handled by `get_task_status`.
        "status", &Router::new()
            .get(
                &ApiMethod::new(
                    &ApiHandler::Sync(&get_task_status),
                    &ObjectSchema::new(
                        "Get task status.",
                        &sorted!([
                            ("node", false, &NODE_SCHEMA),
                            ("upid", false, &UPID_SCHEMA),
                        ]),
                    )
                )
            )
    )
];
248
// Router for a single task.
//
// - GET uses `list_subdirs_api_method!` over `UPID_API_SUBDIRS`.
// - DELETE is handled by `stop_task`.
// - The sub-directories themselves are attached via `.subdirs(...)`.
#[sortable]
pub const UPID_API_ROUTER: Router = Router::new()
    .get(&list_subdirs_api_method!(UPID_API_SUBDIRS))
    .delete(
        &ApiMethod::new(
            &ApiHandler::Sync(&stop_task),
            &ObjectSchema::new(
                "Try to stop a task.",
                &sorted!([
                    ("node", false, &NODE_SCHEMA),
                    ("upid", false, &UPID_SCHEMA),
                ]),
            )
        // NOTE(review): `protected(true)` presumably restricts where/how this
        // method may run - confirm against the proxmox::api documentation.
        ).protected(true)
    )
    .subdirs(&UPID_API_SUBDIRS);
265
// Top-level router of this module.
//
// - GET is handled by `list_tasks`.
// - Every other path component is forwarded to `UPID_API_ROUTER` through
//   `match_all("upid", ...)`.
//   NOTE(review): presumably the matched component is made available to the
//   handlers as the `upid` parameter - confirm against the Router docs.
#[sortable]
pub const ROUTER: Router = Router::new()
    .get(
        &ApiMethod::new(
            &ApiHandler::Sync(&list_tasks),
            &ObjectSchema::new(
                "List tasks.",
                &sorted!([
                    ("node", false, &NODE_SCHEMA),
                    // The defaults below mirror the fallbacks used in `list_tasks`.
                    ("start", true, &IntegerSchema::new("List tasks beginning from this offset.")
                     .minimum(0)
                     .default(0)
                     .schema()
                    ),
                    ("limit", true, &IntegerSchema::new("Only list this amount of tasks.")
                     .minimum(0)
                     .default(50)
                     .schema()
                    ),
                    ("store", true, &DATASTORE_SCHEMA),
                    ("running", true, &BooleanSchema::new("Only list running tasks.").schema()),
                    ("errors", true, &BooleanSchema::new("Only list erroneous tasks.").schema()),
                    // `list_tasks` applies this as a substring match on the user name.
                    ("userfilter", true, &StringSchema::new("Only list tasks from this user.").schema()),
                ]),
            )
        )
    )
    .match_all("upid", &UPID_API_ROUTER);