]> git.proxmox.com Git - proxmox.git/blame - proxmox-metrics/src/influxdb/http.rs
http: rename SimpleHttp to Client
[proxmox.git] / proxmox-metrics / src / influxdb / http.rs
CommitLineData
8bf293bf
DC
1use std::sync::Arc;
2
3use anyhow::{bail, Error};
4use hyper::Body;
5use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
ab5d5b39 6use proxmox_http::HttpOptions;
8bf293bf
DC
7use tokio::sync::mpsc;
8
da49b98d 9use proxmox_http::client::Client;
8bf293bf
DC
10
11use crate::influxdb::utils;
12use crate::{Metrics, MetricsData};
13
14struct InfluxDbHttp {
da49b98d 15 client: Client,
8bf293bf
DC
16 healthuri: http::Uri,
17 writeuri: http::Uri,
18 token: Option<String>,
19 max_body_size: usize,
20 data: String,
21 channel: mpsc::Receiver<Arc<MetricsData>>,
22}
23
24/// Tests the connection to the given influxdb http server with the given
25/// parameters.
26pub async fn test_influxdb_http(
27 uri: &str,
28 organization: &str,
29 bucket: &str,
30 token: Option<&str>,
31 verify_tls: bool,
32) -> Result<(), Error> {
33 let (_tx, rx) = mpsc::channel(1);
34
35 let this = InfluxDbHttp::new(uri, organization, bucket, token, verify_tls, 1, rx)?;
36
37 this.test_connection().await
38}
39
e325f4a0 40/// Get a [`Metrics`] handle for an influxdb server accessed via HTTPS.
8bf293bf
DC
41pub fn influxdb_http(
42 uri: &str,
43 organization: &str,
44 bucket: &str,
45 token: Option<&str>,
46 verify_tls: bool,
47 max_body_size: usize,
48) -> Result<Metrics, Error> {
49 let (tx, rx) = mpsc::channel(1024);
50
51 let this = InfluxDbHttp::new(
52 uri,
53 organization,
54 bucket,
55 token,
56 verify_tls,
57 max_body_size,
58 rx,
59 )?;
60
138f32e3 61 let join_handle = Some(tokio::spawn(this.finish()));
8bf293bf
DC
62
63 Ok(Metrics {
64 join_handle,
65 channel: Some(tx),
66 })
67}
68
69impl InfluxDbHttp {
70 fn new(
71 uri: &str,
72 organization: &str,
73 bucket: &str,
74 token: Option<&str>,
75 verify_tls: bool,
76 max_body_size: usize,
77 channel: mpsc::Receiver<Arc<MetricsData>>,
78 ) -> Result<Self, Error> {
79 let client = if verify_tls {
da49b98d 80 Client::with_options(HttpOptions::default())
8bf293bf
DC
81 } else {
82 let mut ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap();
83 ssl_connector.set_verify(SslVerifyMode::NONE);
da49b98d 84 Client::with_ssl_connector(ssl_connector.build(), HttpOptions::default())
8bf293bf
DC
85 };
86
87 let uri: http::uri::Uri = uri.parse()?;
88 let uri_parts = uri.into_parts();
89
90 let base_path = if let Some(ref p) = uri_parts.path_and_query {
91 p.path().trim_end_matches('/')
92 } else {
93 ""
94 };
95
96 let writeuri = http::uri::Builder::new()
97 .scheme(uri_parts.scheme.clone().unwrap())
98 .authority(uri_parts.authority.clone().unwrap())
99 .path_and_query(format!(
100 "{}/api/v2/write?org={}&bucket={}",
101 base_path, organization, bucket
102 ))
103 .build()?;
104
105 let healthuri = http::uri::Builder::new()
106 .scheme(uri_parts.scheme.unwrap())
107 .authority(uri_parts.authority.unwrap())
108 .path_and_query(format!("{}/health", base_path))
109 .build()?;
110
111 Ok(InfluxDbHttp {
112 client,
113 writeuri,
114 healthuri,
115 token: token.map(String::from),
116 max_body_size,
117 data: String::new(),
118 channel,
119 })
120 }
121
122 async fn test_connection(&self) -> Result<(), Error> {
123 let mut request = http::Request::builder().method("GET").uri(&self.healthuri);
124
125 if let Some(token) = &self.token {
126 request = request.header("Authorization", format!("Token {}", token));
127 }
128
129 let res = self.client.request(request.body(Body::empty())?).await?;
130
131 let status = res.status();
132 if !status.is_success() {
133 bail!("got bad status: {}", status);
134 }
135
136 Ok(())
137 }
138
139 async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
140 let new_data = utils::format_influxdb_line(&data)?;
141
142 if self.data.len() + new_data.len() >= self.max_body_size {
143 self.flush().await?;
144 }
145
146 self.data.push_str(&new_data);
147
148 if self.data.len() >= self.max_body_size {
149 self.flush().await?;
150 }
151
152 Ok(())
153 }
154
155 async fn flush(&mut self) -> Result<(), Error> {
156 if self.data.is_empty() {
157 return Ok(());
158 }
159 let mut request = http::Request::builder().method("POST").uri(&self.writeuri);
160
161 if let Some(token) = &self.token {
162 request = request.header("Authorization", format!("Token {}", token));
163 }
164
165 let request = request.body(Body::from(self.data.split_off(0)))?;
166
167 let res = self.client.request(request).await?;
168
169 let status = res.status();
170 if !status.is_success() {
171 bail!("got bad status: {}", status);
172 }
173 Ok(())
174 }
175
176 async fn finish(mut self) -> Result<(), Error> {
177 while let Some(data) = self.channel.recv().await {
178 self.add_data(data).await?;
179 }
180
181 self.flush().await?;
182
183 Ok(())
184 }
185}