]>
Commit | Line | Data |
---|---|---|
52f7a730 DM |
1 | use std::sync::{Arc, atomic::{AtomicUsize, Ordering}}; |
2 | ||
f7d4e4b5 | 3 | use anyhow::{bail, Error}; |
4470eba5 | 4 | use serde_json::{json, Value}; |
52f7a730 DM |
5 | use tokio::signal::unix::{signal, SignalKind}; |
6 | use futures::*; | |
5a0b484b | 7 | |
4470eba5 DM |
8 | use proxmox::api::cli::format_and_print_result; |
9 | ||
4805edc4 | 10 | use pbs_tools::percent_encoding::percent_encode_component; |
4470eba5 | 11 | |
4805edc4 WB |
12 | use super::HttpClient; |
13 | use crate::server::{UPID, worker_is_active_local}; | |
5a0b484b | 14 | |
52f7a730 DM |
15 | /// Display task log on console |
16 | /// | |
17 | /// This polls the task API and prints the log to the console. It also | |
18 | /// catches interrupt signals, and sends a abort request to the task if | |
19 | /// the user presses CTRL-C. Two interrupts cause an immediate end of | |
20 | /// the loop. The task may still run in that case. | |
5a0b484b | 21 | pub async fn display_task_log( |
e68269fc | 22 | client: &mut HttpClient, |
5a0b484b DM |
23 | upid_str: &str, |
24 | strip_date: bool, | |
25 | ) -> Result<(), Error> { | |
26 | ||
52f7a730 DM |
27 | let mut signal_stream = signal(SignalKind::interrupt())?; |
28 | let abort_count = Arc::new(AtomicUsize::new(0)); | |
29 | let abort_count2 = Arc::clone(&abort_count); | |
5a0b484b | 30 | |
52f7a730 DM |
31 | let abort_future = async move { |
32 | while signal_stream.recv().await.is_some() { | |
33 | println!("got shutdown request (SIGINT)"); | |
34 | let prev_count = abort_count2.fetch_add(1, Ordering::SeqCst); | |
35 | if prev_count >= 1 { | |
36 | println!("forced exit (task still running)"); | |
37 | break; | |
38 | } | |
39 | } | |
40 | Ok::<_, Error>(()) | |
41 | }; | |
5a0b484b | 42 | |
52f7a730 | 43 | let request_future = async move { |
5a0b484b | 44 | |
52f7a730 DM |
45 | let mut start = 1; |
46 | let limit = 500; | |
5a0b484b | 47 | |
52f7a730 | 48 | loop { |
5a0b484b | 49 | |
52f7a730 DM |
50 | let abort = abort_count.load(Ordering::Relaxed); |
51 | if abort > 0 { | |
4805edc4 | 52 | let path = format!("api2/json/nodes/localhost/tasks/{}", percent_encode_component(upid_str)); |
52f7a730 | 53 | let _ = client.delete(&path, None).await?; |
5a0b484b | 54 | } |
5a0b484b | 55 | |
52f7a730 DM |
56 | let param = json!({ "start": start, "limit": limit, "test-status": true }); |
57 | ||
4805edc4 | 58 | let path = format!("api2/json/nodes/localhost/tasks/{}/log", percent_encode_component(upid_str)); |
52f7a730 DM |
59 | let result = client.get(&path, Some(param)).await?; |
60 | ||
61 | let active = result["active"].as_bool().unwrap(); | |
62 | let total = result["total"].as_u64().unwrap(); | |
63 | let data = result["data"].as_array().unwrap(); | |
64 | ||
65 | let lines = data.len(); | |
66 | ||
67 | for item in data { | |
68 | let n = item["n"].as_u64().unwrap(); | |
69 | let t = item["t"].as_str().unwrap(); | |
70 | if n != start { bail!("got wrong line number in response data ({} != {}", n, start); } | |
71 | if strip_date && t.len() > 27 && &t[25..27] == ": " { | |
72 | let line = &t[27..]; | |
73 | println!("{}", line); | |
74 | } else { | |
75 | println!("{}", t); | |
76 | } | |
77 | start += 1; | |
78 | } | |
79 | ||
80 | if start > total { | |
81 | if active { | |
82 | tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; | |
83 | } else { | |
84 | break; | |
85 | } | |
86 | } else if lines != limit { | |
87 | bail!("got wrong number of lines from server ({} != {})", lines, limit); | |
5a0b484b | 88 | } |
5a0b484b | 89 | } |
52f7a730 DM |
90 | |
91 | Ok(()) | |
92 | }; | |
93 | ||
94 | futures::select!{ | |
95 | request = request_future.fuse() => request?, | |
96 | abort = abort_future.fuse() => abort?, | |
97 | }; | |
5a0b484b DM |
98 | |
99 | Ok(()) | |
100 | } | |
4470eba5 DM |
101 | |
102 | /// Display task result (upid), or view task log - depending on output format | |
103 | pub async fn view_task_result( | |
e68269fc | 104 | client: &mut HttpClient, |
4470eba5 DM |
105 | result: Value, |
106 | output_format: &str, | |
107 | ) -> Result<(), Error> { | |
108 | let data = &result["data"]; | |
109 | if output_format == "text" { | |
110 | if let Some(upid) = data.as_str() { | |
111 | display_task_log(client, upid, true).await?; | |
112 | } | |
113 | } else { | |
114 | format_and_print_result(&data, &output_format); | |
115 | } | |
116 | ||
117 | Ok(()) | |
118 | } | |
119 | ||
120 | /// Wait for a locally spanned worker task | |
121 | /// | |
122 | /// Note: local workers should print logs to stdout, so there is no | |
123 | /// need to fetch/display logs. We just wait for the worker to finish. | |
124 | pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> { | |
125 | ||
126 | let upid: UPID = upid_str.parse()?; | |
127 | ||
128 | let sleep_duration = core::time::Duration::new(0, 100_000_000); | |
129 | ||
130 | loop { | |
131 | if worker_is_active_local(&upid) { | |
132 | tokio::time::sleep(sleep_duration).await; | |
133 | } else { | |
134 | break; | |
135 | } | |
136 | } | |
137 | Ok(()) | |
138 | } |