]>
Commit | Line | Data |
---|---|---|
063ca5be DM |
1 | use failure::*; |
2 | ||
5a12c0e2 | 3 | use crate::tools; |
063ca5be DM |
4 | use crate::api_schema::*; |
5 | use crate::api_schema::router::*; | |
6 | use serde_json::{json, Value}; | |
5a12c0e2 DM |
7 | use std::sync::Arc; |
8 | use std::fs::File; | |
9 | use std::io::{BufRead,BufReader}; | |
063ca5be | 10 | |
5a12c0e2 DM |
11 | use crate::server::{self, UPID}; |
12 | ||
13 | fn get_task_status( | |
14 | param: Value, | |
15 | _info: &ApiMethod, | |
16 | _rpcenv: &mut RpcEnvironment, | |
17 | ) -> Result<Value, Error> { | |
18 | ||
19 | let upid = extract_upid(¶m)?; | |
20 | ||
c360bd73 DM |
21 | let mut result = json!({ |
22 | "upid": param["upid"], | |
23 | "node": upid.node, | |
24 | "pid": upid.pid, | |
25 | "pstart": upid.pstart, | |
26 | "starttime": upid.starttime, | |
27 | "type": upid.worker_type, | |
28 | "id": upid.worker_id, | |
29 | "user": upid.username, | |
30 | }); | |
31 | ||
32 | if crate::server::worker_is_active(&upid) { | |
33 | result["status"] = Value::from("running"); | |
5a12c0e2 | 34 | } else { |
c360bd73 DM |
35 | let exitstatus = crate::server::upid_read_status(&upid).unwrap_or(String::from("unknown")); |
36 | result["status"] = Value::from("stopped"); | |
37 | result["exitstatus"] = Value::from(exitstatus); | |
5a12c0e2 DM |
38 | }; |
39 | ||
40 | Ok(result) | |
41 | } | |
42 | ||
43 | fn extract_upid(param: &Value) -> Result<UPID, Error> { | |
44 | ||
45 | let upid_str = tools::required_string_param(¶m, "upid")?; | |
46 | ||
47 | let upid = match upid_str.parse::<UPID>() { | |
48 | Ok(v) => v, | |
49 | Err(err) => bail!("unable to parse UPID '{}' - {}", upid_str, err), | |
50 | }; | |
51 | ||
52 | Ok(upid) | |
53 | } | |
54 | ||
55 | fn read_task_log( | |
56 | param: Value, | |
57 | _info: &ApiMethod, | |
d8d40dd0 | 58 | rpcenv: &mut RpcEnvironment, |
5a12c0e2 DM |
59 | ) -> Result<Value, Error> { |
60 | ||
61 | let upid = extract_upid(¶m)?; | |
62 | let start = param["start"].as_u64().unwrap_or(0); | |
63 | let mut limit = param["limit"].as_u64().unwrap_or(50); | |
64 | let mut count: u64 = 0; | |
65 | ||
66 | let path = upid.log_path(); | |
67 | ||
68 | let file = File::open(path)?; | |
69 | ||
70 | let mut lines: Vec<Value> = vec![]; | |
71 | ||
72 | for line in BufReader::new(file).lines() { | |
73 | match line { | |
74 | Ok(line) => { | |
75 | count += 1; | |
76 | if count < start { continue }; | |
77 | if limit <= 0 { continue }; | |
78 | ||
79 | lines.push(json!({ "n": count, "t": line })); | |
80 | ||
81 | limit -= 1; | |
82 | } | |
83 | Err(err) => { | |
84 | log::error!("reading task log failed: {}", err); | |
85 | break; | |
86 | } | |
87 | } | |
88 | } | |
89 | ||
d8d40dd0 DM |
90 | rpcenv.set_result_attrib("total", Value::from(count)); |
91 | ||
5a12c0e2 DM |
92 | Ok(json!(lines)) |
93 | } | |
063ca5be | 94 | |
a665dea1 DM |
95 | fn stop_task( |
96 | param: Value, | |
97 | _info: &ApiMethod, | |
98 | _rpcenv: &mut RpcEnvironment, | |
99 | ) -> Result<Value, Error> { | |
100 | ||
101 | let upid = extract_upid(¶m)?; | |
102 | ||
103 | if crate::server::worker_is_active(&upid) { | |
104 | server::abort_worker_async(upid); | |
105 | } | |
106 | ||
107 | Ok(Value::Null) | |
108 | } | |
109 | ||
063ca5be DM |
110 | fn list_tasks( |
111 | param: Value, | |
112 | _info: &ApiMethod, | |
113 | rpcenv: &mut RpcEnvironment, | |
114 | ) -> Result<Value, Error> { | |
115 | ||
116 | let start = param["start"].as_u64().unwrap_or(0); | |
117 | let limit = param["limit"].as_u64().unwrap_or(50); | |
118 | let errors = param["errors"].as_bool().unwrap_or(false); | |
119 | ||
d2a2e02b DM |
120 | let userfilter = param["userfilter"].as_str(); |
121 | ||
063ca5be DM |
122 | let list = server::read_task_list()?; |
123 | ||
124 | let mut result = vec![]; | |
125 | ||
126 | let mut count = 0; | |
127 | ||
128 | for info in list.iter() { | |
129 | let mut entry = json!({ | |
130 | "upid": info.upid_str, | |
131 | "node": "localhost", | |
132 | "pid": info.upid.pid, | |
133 | "pstart": info.upid.pstart, | |
134 | "starttime": info.upid.starttime, | |
135 | "type": info.upid.worker_type, | |
136 | "id": info.upid.worker_id, | |
137 | "user": info.upid.username, | |
138 | }); | |
139 | ||
d2a2e02b DM |
140 | if let Some(username) = userfilter { |
141 | if !info.upid.username.contains(username) { continue; } | |
142 | } | |
143 | ||
063ca5be DM |
144 | if let Some(ref state) = info.state { |
145 | if errors && state.1 == "OK" { | |
146 | continue; | |
147 | } | |
148 | ||
149 | entry["endtime"] = Value::from(state.0); | |
150 | entry["status"] = Value::from(state.1.clone()); | |
151 | } | |
152 | ||
3c3bee2e | 153 | if (count as u64) < start { |
063ca5be DM |
154 | count += 1; |
155 | continue; | |
156 | } else { | |
157 | count += 1; | |
158 | } | |
159 | ||
160 | if (result.len() as u64) < limit { result.push(entry); }; | |
161 | } | |
162 | ||
163 | rpcenv.set_result_attrib("total", Value::from(count)); | |
164 | ||
165 | Ok(json!(result)) | |
166 | } | |
167 | ||
168 | pub fn router() -> Router { | |
169 | ||
d8d40dd0 | 170 | let upid_schema: Arc<Schema> = Arc::new( |
5a12c0e2 DM |
171 | StringSchema::new("Unique Process/Task ID.") |
172 | .max_length(256) | |
173 | .into() | |
174 | ); | |
175 | ||
176 | let upid_api = Router::new() | |
177 | .get(ApiMethod::new( | |
062d4916 | 178 | || { |
5a12c0e2 DM |
179 | let mut result = vec![]; |
180 | for cmd in &["log", "status"] { | |
181 | result.push(json!({"subdir": cmd })); | |
182 | } | |
183 | Ok(Value::from(result)) | |
184 | }, | |
185 | ObjectSchema::new("Directory index.") | |
d8d40dd0 | 186 | .required("node", crate::api2::node::NODE_SCHEMA.clone()) |
5a12c0e2 DM |
187 | .required("upid", upid_schema.clone())) |
188 | ) | |
a665dea1 DM |
189 | .delete(ApiMethod::new( |
190 | stop_task, | |
191 | ObjectSchema::new("Try to stop a task.") | |
192 | .required("node", crate::api2::node::NODE_SCHEMA.clone()) | |
193 | .required("upid", upid_schema.clone())).protected(true) | |
194 | ||
195 | ) | |
5a12c0e2 DM |
196 | .subdir( |
197 | "log", Router::new() | |
198 | .get( | |
199 | ApiMethod::new( | |
200 | read_task_log, | |
201 | ObjectSchema::new("Read task log.") | |
d8d40dd0 | 202 | .required("node", crate::api2::node::NODE_SCHEMA.clone()) |
5a12c0e2 DM |
203 | .required("upid", upid_schema.clone()) |
204 | .optional( | |
205 | "start", | |
206 | IntegerSchema::new("Start at this line.") | |
207 | .minimum(0) | |
208 | .default(0) | |
209 | ) | |
210 | .optional( | |
211 | "limit", | |
212 | IntegerSchema::new("Only list this amount of lines.") | |
213 | .minimum(0) | |
214 | .default(50) | |
215 | ) | |
216 | ) | |
217 | ) | |
218 | ) | |
219 | .subdir( | |
220 | "status", Router::new() | |
221 | .get( | |
222 | ApiMethod::new( | |
223 | get_task_status, | |
224 | ObjectSchema::new("Get task status.") | |
d8d40dd0 | 225 | .required("node", crate::api2::node::NODE_SCHEMA.clone()) |
5a12c0e2 DM |
226 | .required("upid", upid_schema.clone())) |
227 | ) | |
228 | ); | |
229 | ||
230 | ||
063ca5be DM |
231 | let route = Router::new() |
232 | .get(ApiMethod::new( | |
233 | list_tasks, | |
234 | ObjectSchema::new("List tasks.") | |
d8d40dd0 | 235 | .required("node", crate::api2::node::NODE_SCHEMA.clone()) |
063ca5be DM |
236 | .optional( |
237 | "start", | |
238 | IntegerSchema::new("List tasks beginning from this offset.") | |
239 | .minimum(0) | |
240 | .default(0) | |
241 | ) | |
242 | .optional( | |
243 | "limit", | |
244 | IntegerSchema::new("Only list this amount of tasks.") | |
245 | .minimum(0) | |
246 | .default(50) | |
247 | ) | |
248 | .optional( | |
249 | "errors", | |
250 | BooleanSchema::new("Only list erroneous tasks.") | |
251 | ) | |
d2a2e02b DM |
252 | .optional( |
253 | "userfilter", | |
254 | StringSchema::new("Only list tasks from this user.") | |
255 | ) | |
256 | ) | |
5a12c0e2 DM |
257 | ) |
258 | .match_all("upid", upid_api); | |
063ca5be DM |
259 | |
260 | route | |
261 | } |