]>
Commit | Line | Data |
---|---|---|
8bf293bf DC |
1 | use std::sync::Arc; |
2 | ||
3 | use anyhow::{bail, Error}; | |
4 | use hyper::Body; | |
5 | use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; | |
ab5d5b39 | 6 | use proxmox_http::HttpOptions; |
8bf293bf DC |
7 | use tokio::sync::mpsc; |
8 | ||
da49b98d | 9 | use proxmox_http::client::Client; |
8bf293bf DC |
10 | |
11 | use crate::influxdb::utils; | |
12 | use crate::{Metrics, MetricsData}; | |
13 | ||
14 | struct InfluxDbHttp { | |
da49b98d | 15 | client: Client, |
8bf293bf DC |
16 | healthuri: http::Uri, |
17 | writeuri: http::Uri, | |
18 | token: Option<String>, | |
19 | max_body_size: usize, | |
20 | data: String, | |
21 | channel: mpsc::Receiver<Arc<MetricsData>>, | |
22 | } | |
23 | ||
24 | /// Tests the connection to the given influxdb http server with the given | |
25 | /// parameters. | |
26 | pub async fn test_influxdb_http( | |
27 | uri: &str, | |
28 | organization: &str, | |
29 | bucket: &str, | |
30 | token: Option<&str>, | |
31 | verify_tls: bool, | |
32 | ) -> Result<(), Error> { | |
33 | let (_tx, rx) = mpsc::channel(1); | |
34 | ||
35 | let this = InfluxDbHttp::new(uri, organization, bucket, token, verify_tls, 1, rx)?; | |
36 | ||
37 | this.test_connection().await | |
38 | } | |
39 | ||
e325f4a0 | 40 | /// Get a [`Metrics`] handle for an influxdb server accessed via HTTPS. |
8bf293bf DC |
41 | pub fn influxdb_http( |
42 | uri: &str, | |
43 | organization: &str, | |
44 | bucket: &str, | |
45 | token: Option<&str>, | |
46 | verify_tls: bool, | |
47 | max_body_size: usize, | |
48 | ) -> Result<Metrics, Error> { | |
49 | let (tx, rx) = mpsc::channel(1024); | |
50 | ||
51 | let this = InfluxDbHttp::new( | |
52 | uri, | |
53 | organization, | |
54 | bucket, | |
55 | token, | |
56 | verify_tls, | |
57 | max_body_size, | |
58 | rx, | |
59 | )?; | |
60 | ||
138f32e3 | 61 | let join_handle = Some(tokio::spawn(this.finish())); |
8bf293bf DC |
62 | |
63 | Ok(Metrics { | |
64 | join_handle, | |
65 | channel: Some(tx), | |
66 | }) | |
67 | } | |
68 | ||
69 | impl InfluxDbHttp { | |
70 | fn new( | |
71 | uri: &str, | |
72 | organization: &str, | |
73 | bucket: &str, | |
74 | token: Option<&str>, | |
75 | verify_tls: bool, | |
76 | max_body_size: usize, | |
77 | channel: mpsc::Receiver<Arc<MetricsData>>, | |
78 | ) -> Result<Self, Error> { | |
79 | let client = if verify_tls { | |
da49b98d | 80 | Client::with_options(HttpOptions::default()) |
8bf293bf DC |
81 | } else { |
82 | let mut ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap(); | |
83 | ssl_connector.set_verify(SslVerifyMode::NONE); | |
da49b98d | 84 | Client::with_ssl_connector(ssl_connector.build(), HttpOptions::default()) |
8bf293bf DC |
85 | }; |
86 | ||
87 | let uri: http::uri::Uri = uri.parse()?; | |
88 | let uri_parts = uri.into_parts(); | |
89 | ||
90 | let base_path = if let Some(ref p) = uri_parts.path_and_query { | |
91 | p.path().trim_end_matches('/') | |
92 | } else { | |
93 | "" | |
94 | }; | |
95 | ||
96 | let writeuri = http::uri::Builder::new() | |
97 | .scheme(uri_parts.scheme.clone().unwrap()) | |
98 | .authority(uri_parts.authority.clone().unwrap()) | |
99 | .path_and_query(format!( | |
100 | "{}/api/v2/write?org={}&bucket={}", | |
101 | base_path, organization, bucket | |
102 | )) | |
103 | .build()?; | |
104 | ||
105 | let healthuri = http::uri::Builder::new() | |
106 | .scheme(uri_parts.scheme.unwrap()) | |
107 | .authority(uri_parts.authority.unwrap()) | |
108 | .path_and_query(format!("{}/health", base_path)) | |
109 | .build()?; | |
110 | ||
111 | Ok(InfluxDbHttp { | |
112 | client, | |
113 | writeuri, | |
114 | healthuri, | |
115 | token: token.map(String::from), | |
116 | max_body_size, | |
117 | data: String::new(), | |
118 | channel, | |
119 | }) | |
120 | } | |
121 | ||
122 | async fn test_connection(&self) -> Result<(), Error> { | |
123 | let mut request = http::Request::builder().method("GET").uri(&self.healthuri); | |
124 | ||
125 | if let Some(token) = &self.token { | |
126 | request = request.header("Authorization", format!("Token {}", token)); | |
127 | } | |
128 | ||
129 | let res = self.client.request(request.body(Body::empty())?).await?; | |
130 | ||
131 | let status = res.status(); | |
132 | if !status.is_success() { | |
133 | bail!("got bad status: {}", status); | |
134 | } | |
135 | ||
136 | Ok(()) | |
137 | } | |
138 | ||
139 | async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> { | |
140 | let new_data = utils::format_influxdb_line(&data)?; | |
141 | ||
142 | if self.data.len() + new_data.len() >= self.max_body_size { | |
143 | self.flush().await?; | |
144 | } | |
145 | ||
146 | self.data.push_str(&new_data); | |
147 | ||
148 | if self.data.len() >= self.max_body_size { | |
149 | self.flush().await?; | |
150 | } | |
151 | ||
152 | Ok(()) | |
153 | } | |
154 | ||
155 | async fn flush(&mut self) -> Result<(), Error> { | |
156 | if self.data.is_empty() { | |
157 | return Ok(()); | |
158 | } | |
159 | let mut request = http::Request::builder().method("POST").uri(&self.writeuri); | |
160 | ||
161 | if let Some(token) = &self.token { | |
162 | request = request.header("Authorization", format!("Token {}", token)); | |
163 | } | |
164 | ||
165 | let request = request.body(Body::from(self.data.split_off(0)))?; | |
166 | ||
167 | let res = self.client.request(request).await?; | |
168 | ||
169 | let status = res.status(); | |
170 | if !status.is_success() { | |
171 | bail!("got bad status: {}", status); | |
172 | } | |
173 | Ok(()) | |
174 | } | |
175 | ||
176 | async fn finish(mut self) -> Result<(), Error> { | |
177 | while let Some(data) = self.channel.recv().await { | |
178 | self.add_data(data).await?; | |
179 | } | |
180 | ||
181 | self.flush().await?; | |
182 | ||
183 | Ok(()) | |
184 | } | |
185 | } |