]> git.proxmox.com Git - proxmox-backup.git/blame - src/client/task_log.rs
move more tools for the client into subcrates
[proxmox-backup.git] / src / client / task_log.rs
CommitLineData
52f7a730
DM
1use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
2
f7d4e4b5 3use anyhow::{bail, Error};
4470eba5 4use serde_json::{json, Value};
52f7a730
DM
5use tokio::signal::unix::{signal, SignalKind};
6use futures::*;
5a0b484b 7
4470eba5
DM
8use proxmox::api::cli::format_and_print_result;
9
4805edc4 10use pbs_tools::percent_encoding::percent_encode_component;
4470eba5 11
4805edc4
WB
12use super::HttpClient;
13use crate::server::{UPID, worker_is_active_local};
5a0b484b 14
52f7a730
DM
15/// Display task log on console
16///
17/// This polls the task API and prints the log to the console. It also
18/// catches interrupt signals, and sends a abort request to the task if
19/// the user presses CTRL-C. Two interrupts cause an immediate end of
20/// the loop. The task may still run in that case.
5a0b484b 21pub async fn display_task_log(
e68269fc 22 client: &mut HttpClient,
5a0b484b
DM
23 upid_str: &str,
24 strip_date: bool,
25) -> Result<(), Error> {
26
52f7a730
DM
27 let mut signal_stream = signal(SignalKind::interrupt())?;
28 let abort_count = Arc::new(AtomicUsize::new(0));
29 let abort_count2 = Arc::clone(&abort_count);
5a0b484b 30
52f7a730
DM
31 let abort_future = async move {
32 while signal_stream.recv().await.is_some() {
33 println!("got shutdown request (SIGINT)");
34 let prev_count = abort_count2.fetch_add(1, Ordering::SeqCst);
35 if prev_count >= 1 {
36 println!("forced exit (task still running)");
37 break;
38 }
39 }
40 Ok::<_, Error>(())
41 };
5a0b484b 42
52f7a730 43 let request_future = async move {
5a0b484b 44
52f7a730
DM
45 let mut start = 1;
46 let limit = 500;
5a0b484b 47
52f7a730 48 loop {
5a0b484b 49
52f7a730
DM
50 let abort = abort_count.load(Ordering::Relaxed);
51 if abort > 0 {
4805edc4 52 let path = format!("api2/json/nodes/localhost/tasks/{}", percent_encode_component(upid_str));
52f7a730 53 let _ = client.delete(&path, None).await?;
5a0b484b 54 }
5a0b484b 55
52f7a730
DM
56 let param = json!({ "start": start, "limit": limit, "test-status": true });
57
4805edc4 58 let path = format!("api2/json/nodes/localhost/tasks/{}/log", percent_encode_component(upid_str));
52f7a730
DM
59 let result = client.get(&path, Some(param)).await?;
60
61 let active = result["active"].as_bool().unwrap();
62 let total = result["total"].as_u64().unwrap();
63 let data = result["data"].as_array().unwrap();
64
65 let lines = data.len();
66
67 for item in data {
68 let n = item["n"].as_u64().unwrap();
69 let t = item["t"].as_str().unwrap();
70 if n != start { bail!("got wrong line number in response data ({} != {}", n, start); }
71 if strip_date && t.len() > 27 && &t[25..27] == ": " {
72 let line = &t[27..];
73 println!("{}", line);
74 } else {
75 println!("{}", t);
76 }
77 start += 1;
78 }
79
80 if start > total {
81 if active {
82 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
83 } else {
84 break;
85 }
86 } else if lines != limit {
87 bail!("got wrong number of lines from server ({} != {})", lines, limit);
5a0b484b 88 }
5a0b484b 89 }
52f7a730
DM
90
91 Ok(())
92 };
93
94 futures::select!{
95 request = request_future.fuse() => request?,
96 abort = abort_future.fuse() => abort?,
97 };
5a0b484b
DM
98
99 Ok(())
100}
4470eba5
DM
101
102/// Display task result (upid), or view task log - depending on output format
103pub async fn view_task_result(
e68269fc 104 client: &mut HttpClient,
4470eba5
DM
105 result: Value,
106 output_format: &str,
107) -> Result<(), Error> {
108 let data = &result["data"];
109 if output_format == "text" {
110 if let Some(upid) = data.as_str() {
111 display_task_log(client, upid, true).await?;
112 }
113 } else {
114 format_and_print_result(&data, &output_format);
115 }
116
117 Ok(())
118}
119
120/// Wait for a locally spanned worker task
121///
122/// Note: local workers should print logs to stdout, so there is no
123/// need to fetch/display logs. We just wait for the worker to finish.
124pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
125
126 let upid: UPID = upid_str.parse()?;
127
128 let sleep_duration = core::time::Duration::new(0, 100_000_000);
129
130 loop {
131 if worker_is_active_local(&upid) {
132 tokio::time::sleep(sleep_duration).await;
133 } else {
134 break;
135 }
136 }
137 Ok(())
138}