]>
Commit | Line | Data |
---|---|---|
1e59de90 TL |
1 | // Copyright The OpenTelemetry Authors |
2 | // SPDX-License-Identifier: Apache-2.0 | |
3 | ||
4 | #pragma once | |
5 | #ifdef ENABLE_METRICS_PREVIEW | |
6 | ||
# include <functional>
# include <map>
# include <memory>
# include <string>
# include <unordered_map>
# include <utility>
# include <vector>

# include "opentelemetry/common/macros.h"
# include "opentelemetry/sdk/_metrics/aggregator/counter_aggregator.h"
# include "opentelemetry/sdk/_metrics/aggregator/exact_aggregator.h"
# include "opentelemetry/sdk/_metrics/aggregator/gauge_aggregator.h"
# include "opentelemetry/sdk/_metrics/aggregator/histogram_aggregator.h"
# include "opentelemetry/sdk/_metrics/aggregator/min_max_sum_count_aggregator.h"
# include "opentelemetry/sdk/_metrics/aggregator/sketch_aggregator.h"
# include "opentelemetry/sdk/_metrics/processor.h"
# include "opentelemetry/sdk/_metrics/record.h"
# include "opentelemetry/version.h"
19 | ||
20 | OPENTELEMETRY_BEGIN_NAMESPACE | |
21 | ||
22 | namespace sdk | |
23 | { | |
24 | ||
25 | namespace metrics | |
26 | { | |
27 | ||
28 | struct KeyStruct | |
29 | { | |
30 | std::string name; | |
31 | std::string description; | |
32 | std::string labels; | |
33 | opentelemetry::metrics::InstrumentKind ins_kind; | |
34 | ||
35 | // constructor | |
36 | KeyStruct(std::string name, | |
37 | std::string description, | |
38 | std::string labels, | |
39 | opentelemetry::metrics::InstrumentKind ins_kind) | |
40 | { | |
41 | this->name = name; | |
42 | this->description = description; | |
43 | this->labels = labels; | |
44 | this->ins_kind = ins_kind; | |
45 | } | |
46 | ||
47 | // operator== is required to compare keys in case of hash collision | |
48 | bool operator==(const KeyStruct &p) const | |
49 | { | |
50 | return name == p.name && description == p.description && labels == p.labels && | |
51 | ins_kind == p.ins_kind; | |
52 | } | |
53 | }; | |
54 | ||
55 | struct KeyStruct_Hash | |
56 | { | |
57 | std::size_t operator()(const KeyStruct &keystruct) const | |
58 | { | |
59 | std::size_t name_size = keystruct.name.length(); | |
60 | std::size_t desc_size = keystruct.description.length(); | |
61 | std::size_t labels_size = keystruct.labels.length(); | |
62 | std::size_t ins_size = (int)keystruct.ins_kind; | |
63 | ||
64 | return (name_size ^ desc_size ^ labels_size) + ins_size; | |
65 | } | |
66 | }; | |
67 | ||
68 | class UngroupedMetricsProcessor : public MetricsProcessor | |
69 | { | |
70 | public: | |
71 | explicit UngroupedMetricsProcessor(bool stateful); | |
72 | ||
73 | std::vector<opentelemetry::sdk::metrics::Record> CheckpointSelf() noexcept override; | |
74 | ||
75 | virtual void FinishedCollection() noexcept override; | |
76 | ||
77 | virtual void process(opentelemetry::sdk::metrics::Record record) noexcept override; | |
78 | ||
79 | private: | |
80 | bool stateful_; | |
81 | std::unordered_map<KeyStruct, opentelemetry::sdk::metrics::AggregatorVariant, KeyStruct_Hash> | |
82 | batch_map_; | |
83 | ||
84 | /** | |
85 | * get_instrument returns the instrument from the passed in AggregatorVariant. We have to | |
86 | * unpack the variant then get the instrument from the Aggreagtor. | |
87 | */ | |
88 | opentelemetry::metrics::InstrumentKind get_instrument( | |
89 | opentelemetry::sdk::metrics::AggregatorVariant aggregator) | |
90 | { | |
91 | if (nostd::holds_alternative<std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<short>>>( | |
92 | aggregator)) | |
93 | { | |
94 | return nostd::get<std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<short>>>(aggregator) | |
95 | ->get_instrument_kind(); | |
96 | } | |
97 | else if (nostd::holds_alternative< | |
98 | std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<int>>>(aggregator)) | |
99 | { | |
100 | return nostd::get<std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<int>>>(aggregator) | |
101 | ->get_instrument_kind(); | |
102 | } | |
103 | else if (nostd::holds_alternative< | |
104 | std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<float>>>(aggregator)) | |
105 | { | |
106 | return nostd::get<std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<float>>>(aggregator) | |
107 | ->get_instrument_kind(); | |
108 | } | |
109 | else if (nostd::holds_alternative< | |
110 | std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<double>>>(aggregator)) | |
111 | { | |
112 | return nostd::get<std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<double>>>( | |
113 | aggregator) | |
114 | ->get_instrument_kind(); | |
115 | } | |
116 | ||
117 | return opentelemetry::metrics::InstrumentKind::Counter; | |
118 | } | |
119 | ||
120 | /** | |
121 | * aggregator_copy creates a copy of the aggregtor passed through process() for a | |
122 | * stateful processor. For Sketch, Histogram and Exact we also need to pass in | |
123 | * additional constructor values | |
124 | */ | |
125 | template <typename T> | |
126 | std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>> aggregator_copy( | |
127 | std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>> aggregator) | |
128 | { | |
129 | auto ins_kind = aggregator->get_instrument_kind(); | |
130 | auto agg_kind = aggregator->get_aggregator_kind(); | |
131 | ||
132 | switch (agg_kind) | |
133 | { | |
134 | case opentelemetry::sdk::metrics::AggregatorKind::Counter: | |
135 | return std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>>( | |
136 | new opentelemetry::sdk::metrics::CounterAggregator<T>(ins_kind)); | |
137 | ||
138 | case opentelemetry::sdk::metrics::AggregatorKind::MinMaxSumCount: | |
139 | return std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>>( | |
140 | new opentelemetry::sdk::metrics::MinMaxSumCountAggregator<T>(ins_kind)); | |
141 | ||
142 | case opentelemetry::sdk::metrics::AggregatorKind::Gauge: | |
143 | return std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>>( | |
144 | new opentelemetry::sdk::metrics::GaugeAggregator<T>(ins_kind)); | |
145 | ||
146 | case opentelemetry::sdk::metrics::AggregatorKind::Sketch: | |
147 | return std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>>( | |
148 | new opentelemetry::sdk::metrics::SketchAggregator<T>( | |
149 | ins_kind, aggregator->get_error_bound(), aggregator->get_max_buckets())); | |
150 | ||
151 | case opentelemetry::sdk::metrics::AggregatorKind::Histogram: | |
152 | return std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>>( | |
153 | new opentelemetry::sdk::metrics::HistogramAggregator<T>(ins_kind, | |
154 | aggregator->get_boundaries())); | |
155 | ||
156 | case opentelemetry::sdk::metrics::AggregatorKind::Exact: | |
157 | return std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>>( | |
158 | new opentelemetry::sdk::metrics::ExactAggregator<T>( | |
159 | ins_kind, aggregator->get_quant_estimation())); | |
160 | ||
161 | default: | |
162 | return std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>>( | |
163 | new opentelemetry::sdk::metrics::CounterAggregator<T>(ins_kind)); | |
164 | } | |
165 | }; | |
166 | ||
167 | /** | |
168 | * aggregator_for will return an Aggregator based off the instrument passed in. This should be | |
169 | * the function that we assign Aggreagtors for instruments, but is currently unused in our | |
170 | * pipeline. | |
171 | */ | |
172 | template <typename T> | |
173 | std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>> aggregator_for( | |
174 | opentelemetry::metrics::InstrumentKind ins_kind) | |
175 | { | |
176 | switch (ins_kind) | |
177 | { | |
178 | case opentelemetry::metrics::InstrumentKind::Counter: | |
179 | return std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>>( | |
180 | new opentelemetry::sdk::metrics::CounterAggregator<T>(ins_kind)); | |
181 | ||
182 | case opentelemetry::metrics::InstrumentKind::UpDownCounter: | |
183 | return std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>>( | |
184 | new opentelemetry::sdk::metrics::CounterAggregator<T>(ins_kind)); | |
185 | ||
186 | case opentelemetry::metrics::InstrumentKind::ValueRecorder: | |
187 | return std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>>( | |
188 | new opentelemetry::sdk::metrics::MinMaxSumCountAggregator<T>(ins_kind)); | |
189 | ||
190 | case opentelemetry::metrics::InstrumentKind::SumObserver: | |
191 | return std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>>( | |
192 | new opentelemetry::sdk::metrics::CounterAggregator<T>(ins_kind)); | |
193 | ||
194 | case opentelemetry::metrics::InstrumentKind::UpDownSumObserver: | |
195 | return std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>>( | |
196 | new opentelemetry::sdk::metrics::CounterAggregator<T>(ins_kind)); | |
197 | ||
198 | case opentelemetry::metrics::InstrumentKind::ValueObserver: | |
199 | return std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>>( | |
200 | new opentelemetry::sdk::metrics::MinMaxSumCountAggregator<T>(ins_kind)); | |
201 | ||
202 | default: | |
203 | return std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>>( | |
204 | new opentelemetry::sdk::metrics::CounterAggregator<T>(ins_kind)); | |
205 | } | |
206 | }; | |
207 | ||
208 | /** | |
209 | * merge_aggreagtors takes in two shared pointers to aggregators of the same kind. | |
210 | * We first need to dynamically cast to the actual Aggregator that is held in the | |
211 | * Aggregator<T> wrapper. Then we must get the underlying pointer from the shared | |
212 | * pointer and merge them together. | |
213 | */ | |
214 | template <typename T> | |
215 | void merge_aggregators(std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>> batch_agg, | |
216 | std::shared_ptr<opentelemetry::sdk::metrics::Aggregator<T>> record_agg) | |
217 | { | |
218 | auto agg_kind = batch_agg->get_aggregator_kind(); | |
219 | if (agg_kind == opentelemetry::sdk::metrics::AggregatorKind::Counter) | |
220 | { | |
221 | # ifdef OPENTELEMETRY_RTTI_ENABLED | |
222 | std::shared_ptr<opentelemetry::sdk::metrics::CounterAggregator<T>> temp_batch_agg_counter = | |
223 | std::dynamic_pointer_cast<opentelemetry::sdk::metrics::CounterAggregator<T>>(batch_agg); | |
224 | ||
225 | std::shared_ptr<opentelemetry::sdk::metrics::CounterAggregator<T>> temp_record_agg_counter = | |
226 | std::dynamic_pointer_cast<opentelemetry::sdk::metrics::CounterAggregator<T>>(record_agg); | |
227 | # else | |
228 | std::shared_ptr<opentelemetry::sdk::metrics::CounterAggregator<T>> temp_batch_agg_counter = | |
229 | std::static_pointer_cast<opentelemetry::sdk::metrics::CounterAggregator<T>>(batch_agg); | |
230 | ||
231 | std::shared_ptr<opentelemetry::sdk::metrics::CounterAggregator<T>> temp_record_agg_counter = | |
232 | std::static_pointer_cast<opentelemetry::sdk::metrics::CounterAggregator<T>>(record_agg); | |
233 | # endif | |
234 | auto temp_batch_agg_raw_counter = temp_batch_agg_counter.get(); | |
235 | auto temp_record_agg_raw_counter = temp_record_agg_counter.get(); | |
236 | ||
237 | temp_batch_agg_raw_counter->merge(*temp_record_agg_raw_counter); | |
238 | } | |
239 | else if (agg_kind == opentelemetry::sdk::metrics::AggregatorKind::MinMaxSumCount) | |
240 | { | |
241 | # ifdef OPENTELEMETRY_RTTI_ENABLED | |
242 | std::shared_ptr<opentelemetry::sdk::metrics::MinMaxSumCountAggregator<T>> | |
243 | temp_batch_agg_mmsc = | |
244 | std::dynamic_pointer_cast<opentelemetry::sdk::metrics::MinMaxSumCountAggregator<T>>( | |
245 | batch_agg); | |
246 | ||
247 | std::shared_ptr<opentelemetry::sdk::metrics::MinMaxSumCountAggregator<T>> | |
248 | temp_record_agg_mmsc = | |
249 | std::dynamic_pointer_cast<opentelemetry::sdk::metrics::MinMaxSumCountAggregator<T>>( | |
250 | record_agg); | |
251 | # else | |
252 | std::shared_ptr<opentelemetry::sdk::metrics::MinMaxSumCountAggregator<T>> | |
253 | temp_batch_agg_mmsc = | |
254 | std::static_pointer_cast<opentelemetry::sdk::metrics::MinMaxSumCountAggregator<T>>( | |
255 | batch_agg); | |
256 | ||
257 | std::shared_ptr<opentelemetry::sdk::metrics::MinMaxSumCountAggregator<T>> | |
258 | temp_record_agg_mmsc = | |
259 | std::static_pointer_cast<opentelemetry::sdk::metrics::MinMaxSumCountAggregator<T>>( | |
260 | record_agg); | |
261 | # endif | |
262 | ||
263 | auto temp_batch_agg_raw_mmsc = temp_batch_agg_mmsc.get(); | |
264 | auto temp_record_agg_raw_mmsc = temp_record_agg_mmsc.get(); | |
265 | ||
266 | temp_batch_agg_raw_mmsc->merge(*temp_record_agg_raw_mmsc); | |
267 | } | |
268 | else if (agg_kind == opentelemetry::sdk::metrics::AggregatorKind::Gauge) | |
269 | { | |
270 | # ifdef OPENTELEMETRY_RTTI_ENABLED | |
271 | std::shared_ptr<opentelemetry::sdk::metrics::GaugeAggregator<T>> temp_batch_agg_gauge = | |
272 | std::dynamic_pointer_cast<opentelemetry::sdk::metrics::GaugeAggregator<T>>(batch_agg); | |
273 | ||
274 | std::shared_ptr<opentelemetry::sdk::metrics::GaugeAggregator<T>> temp_record_agg_gauge = | |
275 | std::dynamic_pointer_cast<opentelemetry::sdk::metrics::GaugeAggregator<T>>(record_agg); | |
276 | # else | |
277 | std::shared_ptr<opentelemetry::sdk::metrics::GaugeAggregator<T>> temp_batch_agg_gauge = | |
278 | std::static_pointer_cast<opentelemetry::sdk::metrics::GaugeAggregator<T>>(batch_agg); | |
279 | ||
280 | std::shared_ptr<opentelemetry::sdk::metrics::GaugeAggregator<T>> temp_record_agg_gauge = | |
281 | std::static_pointer_cast<opentelemetry::sdk::metrics::GaugeAggregator<T>>(record_agg); | |
282 | # endif | |
283 | ||
284 | auto temp_batch_agg_raw_gauge = temp_batch_agg_gauge.get(); | |
285 | auto temp_record_agg_raw_gauge = temp_record_agg_gauge.get(); | |
286 | ||
287 | temp_batch_agg_raw_gauge->merge(*temp_record_agg_raw_gauge); | |
288 | } | |
289 | else if (agg_kind == opentelemetry::sdk::metrics::AggregatorKind::Sketch) | |
290 | { | |
291 | # ifdef OPENTELEMETRY_RTTI_ENABLED | |
292 | std::shared_ptr<opentelemetry::sdk::metrics::SketchAggregator<T>> temp_batch_agg_sketch = | |
293 | std::dynamic_pointer_cast<opentelemetry::sdk::metrics::SketchAggregator<T>>(batch_agg); | |
294 | ||
295 | std::shared_ptr<opentelemetry::sdk::metrics::SketchAggregator<T>> temp_record_agg_sketch = | |
296 | std::dynamic_pointer_cast<opentelemetry::sdk::metrics::SketchAggregator<T>>(record_agg); | |
297 | # else | |
298 | std::shared_ptr<opentelemetry::sdk::metrics::SketchAggregator<T>> temp_batch_agg_sketch = | |
299 | std::static_pointer_cast<opentelemetry::sdk::metrics::SketchAggregator<T>>(batch_agg); | |
300 | ||
301 | std::shared_ptr<opentelemetry::sdk::metrics::SketchAggregator<T>> temp_record_agg_sketch = | |
302 | std::static_pointer_cast<opentelemetry::sdk::metrics::SketchAggregator<T>>(record_agg); | |
303 | # endif | |
304 | auto temp_batch_agg_raw_sketch = temp_batch_agg_sketch.get(); | |
305 | auto temp_record_agg_raw_sketch = temp_record_agg_sketch.get(); | |
306 | ||
307 | temp_batch_agg_raw_sketch->merge(*temp_record_agg_raw_sketch); | |
308 | } | |
309 | else if (agg_kind == opentelemetry::sdk::metrics::AggregatorKind::Histogram) | |
310 | { | |
311 | # ifdef OPENTELEMETRY_RTTI_ENABLED | |
312 | std::shared_ptr<opentelemetry::sdk::metrics::HistogramAggregator<T>> | |
313 | temp_batch_agg_histogram = | |
314 | std::dynamic_pointer_cast<opentelemetry::sdk::metrics::HistogramAggregator<T>>( | |
315 | batch_agg); | |
316 | ||
317 | std::shared_ptr<opentelemetry::sdk::metrics::HistogramAggregator<T>> | |
318 | temp_record_agg_histogram = | |
319 | std::dynamic_pointer_cast<opentelemetry::sdk::metrics::HistogramAggregator<T>>( | |
320 | record_agg); | |
321 | # else | |
322 | std::shared_ptr<opentelemetry::sdk::metrics::HistogramAggregator<T>> | |
323 | temp_batch_agg_histogram = | |
324 | std::static_pointer_cast<opentelemetry::sdk::metrics::HistogramAggregator<T>>( | |
325 | batch_agg); | |
326 | ||
327 | std::shared_ptr<opentelemetry::sdk::metrics::HistogramAggregator<T>> | |
328 | temp_record_agg_histogram = | |
329 | std::static_pointer_cast<opentelemetry::sdk::metrics::HistogramAggregator<T>>( | |
330 | record_agg); | |
331 | # endif | |
332 | ||
333 | auto temp_batch_agg_raw_histogram = temp_batch_agg_histogram.get(); | |
334 | auto temp_record_agg_raw_histogram = temp_record_agg_histogram.get(); | |
335 | ||
336 | temp_batch_agg_raw_histogram->merge(*temp_record_agg_raw_histogram); | |
337 | } | |
338 | else if (agg_kind == opentelemetry::sdk::metrics::AggregatorKind::Exact) | |
339 | { | |
340 | # ifdef OPENTELEMETRY_RTTI_ENABLED | |
341 | std::shared_ptr<opentelemetry::sdk::metrics::ExactAggregator<T>> temp_batch_agg_exact = | |
342 | std::dynamic_pointer_cast<opentelemetry::sdk::metrics::ExactAggregator<T>>(batch_agg); | |
343 | ||
344 | std::shared_ptr<opentelemetry::sdk::metrics::ExactAggregator<T>> temp_record_agg_exact = | |
345 | std::dynamic_pointer_cast<opentelemetry::sdk::metrics::ExactAggregator<T>>(record_agg); | |
346 | # else | |
347 | std::shared_ptr<opentelemetry::sdk::metrics::ExactAggregator<T>> temp_batch_agg_exact = | |
348 | std::static_pointer_cast<opentelemetry::sdk::metrics::ExactAggregator<T>>(batch_agg); | |
349 | ||
350 | std::shared_ptr<opentelemetry::sdk::metrics::ExactAggregator<T>> temp_record_agg_exact = | |
351 | std::static_pointer_cast<opentelemetry::sdk::metrics::ExactAggregator<T>>(record_agg); | |
352 | # endif | |
353 | ||
354 | auto temp_batch_agg_raw_exact = temp_batch_agg_exact.get(); | |
355 | auto temp_record_agg_raw_exact = temp_record_agg_exact.get(); | |
356 | ||
357 | temp_batch_agg_raw_exact->merge(*temp_record_agg_raw_exact); | |
358 | } | |
359 | } | |
360 | }; | |
361 | } // namespace metrics | |
362 | } // namespace sdk | |
363 | ||
364 | OPENTELEMETRY_END_NAMESPACE | |
365 | #endif |