1 use std
::sync
::{Arc, atomic::{AtomicUsize, Ordering}
};
3 use anyhow
::{bail, Error}
;
4 use serde_json
::{json, Value}
;
5 use tokio
::signal
::unix
::{signal, SignalKind}
;
8 use proxmox
::api
::cli
::format_and_print_result
;
10 use pbs_tools
::percent_encoding
::percent_encode_component
;
12 use super::HttpClient
;
13 use crate::server
::{UPID, worker_is_active_local}
;
15 /// Display task log on console
17 /// This polls the task API and prints the log to the console. It also
18 /// catches interrupt signals, and sends an abort request to the task if
19 /// the user presses CTRL-C. Two interrupts cause an immediate end of
20 /// the loop. The task may still run in that case.
21 pub async
fn display_task_log(
22 client
: &mut HttpClient
,
25 ) -> Result
<(), Error
> {
27 let mut signal_stream
= signal(SignalKind
::interrupt())?
;
28 let abort_count
= Arc
::new(AtomicUsize
::new(0));
29 let abort_count2
= Arc
::clone(&abort_count
);
31 let abort_future
= async
move {
32 while signal_stream
.recv().await
.is_some() {
33 println
!("got shutdown request (SIGINT)");
34 let prev_count
= abort_count2
.fetch_add(1, Ordering
::SeqCst
);
36 println
!("forced exit (task still running)");
43 let request_future
= async
move {
50 let abort
= abort_count
.load(Ordering
::Relaxed
);
52 let path
= format
!("api2/json/nodes/localhost/tasks/{}", percent_encode_component(upid_str
));
53 let _
= client
.delete(&path
, None
).await?
;
56 let param
= json
!({ "start": start, "limit": limit, "test-status": true }
);
58 let path
= format
!("api2/json/nodes/localhost/tasks/{}/log", percent_encode_component(upid_str
));
59 let result
= client
.get(&path
, Some(param
)).await?
;
61 let active
= result
["active"].as_bool().unwrap();
62 let total
= result
["total"].as_u64().unwrap();
63 let data
= result
["data"].as_array().unwrap();
65 let lines
= data
.len();
68 let n
= item
["n"].as_u64().unwrap();
69 let t
= item
["t"].as_str().unwrap();
70 if n
!= start { bail!("got wrong line number in response data ({}
!= {}
", n, start); }
71 if strip_date && t.len() > 27 && &t[25..27] == ": " {
82 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
86 } else if lines != limit {
87 bail!("got wrong number of lines from
server ({}
!= {}
)", lines, limit);
95 request = request_future.fuse() => request?,
96 abort = abort_future.fuse() => abort?,
102 /// Display task result (upid), or view task log - depending on output format
103 pub async fn view_task_result(
104 client: &mut HttpClient,
107 ) -> Result<(), Error> {
108 let data = &result["data
"];
109 if output_format == "text
" {
110 if let Some(upid) = data.as_str() {
111 display_task_log(client, upid, true).await?;
114 format_and_print_result(&data, &output_format);
120 /// Wait for a locally spawned worker task
122 /// Note: local workers should print logs to stdout, so there is no
123 /// need to fetch/display logs. We just wait for the worker to finish.
124 pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
126 let upid: UPID = upid_str.parse()?;
128 let sleep_duration = core::time::Duration::new(0, 100_000_000);
131 if worker_is_active_local(&upid) {
132 tokio::time::sleep(sleep_duration).await;