]>
Commit | Line | Data |
---|---|---|
1e59de90 TL |
1 | // Copyright The OpenTelemetry Authors |
2 | // SPDX-License-Identifier: Apache-2.0 | |
3 | ||
4 | #pragma once | |
5 | #ifdef ENABLE_METRICS_PREVIEW | |
6 | ||
7 | # include <algorithm> | |
8 | # include <mutex> | |
9 | # include <stdexcept> | |
10 | # include <vector> | |
11 | # include "opentelemetry/_metrics/instrument.h" | |
12 | # include "opentelemetry/sdk/_metrics/aggregator/aggregator.h" | |
13 | # include "opentelemetry/version.h" | |
14 | ||
15 | OPENTELEMETRY_BEGIN_NAMESPACE | |
16 | namespace sdk | |
17 | { | |
18 | namespace metrics | |
19 | { | |
20 | ||
21 | template <class T> | |
22 | class HistogramAggregator final : public Aggregator<T> | |
23 | { | |
24 | ||
25 | public: | |
26 | /** | |
27 | * Constructor for the histogram aggregator. A sorted vector of boundaries is expected and | |
28 | * boundaries are doubles regardless of the aggregator's templated data type. | |
29 | * | |
30 | * Sum is stored in values_[0] | |
31 | * Count is stored in position_[1] | |
32 | */ | |
33 | HistogramAggregator(opentelemetry::metrics::InstrumentKind kind, std::vector<double> boundaries) | |
34 | { | |
35 | if (!std::is_sorted(boundaries.begin(), boundaries.end())) | |
36 | { | |
37 | # if __EXCEPTIONS | |
38 | throw std::invalid_argument("Histogram boundaries must be monotonic."); | |
39 | # else | |
40 | std::terminate(); | |
41 | # endif | |
42 | } | |
43 | this->kind_ = kind; | |
44 | this->agg_kind_ = AggregatorKind::Histogram; | |
45 | boundaries_ = boundaries; | |
46 | this->values_ = std::vector<T>(2, 0); | |
47 | this->checkpoint_ = std::vector<T>(2, 0); | |
48 | bucketCounts_ = std::vector<int>(boundaries_.size() + 1, 0); | |
49 | bucketCounts_ckpt_ = std::vector<int>(boundaries_.size() + 1, 0); | |
50 | } | |
51 | ||
52 | /** | |
53 | * Receives a captured value from the instrument and inserts it into the current histogram counts. | |
54 | * | |
55 | * Depending on the use case, a linear search or binary search based implementation may be | |
56 | * preferred. In uniformly distributed datasets, linear search outperforms binary search until 512 | |
57 | * buckets. However, if the distribution is strongly skewed right (for example server latency | |
58 | * where most values may be <10ms but the range is from 0 - 1000 ms), a linear search could be | |
59 | * superior even with more than 500 buckets as almost all values inserted would be at the | |
60 | * beginning of the boundaries array and thus found more quickly through linear search. | |
61 | * | |
62 | * @param val, the raw value used in aggregation | |
63 | * @return none | |
64 | */ | |
65 | void update(T val) override | |
66 | { | |
67 | this->mu_.lock(); | |
68 | this->updated_ = true; | |
69 | size_t bucketID = boundaries_.size(); | |
70 | for (size_t i = 0; i < boundaries_.size(); i++) | |
71 | { | |
72 | if (val < boundaries_[i]) // concurrent read is thread-safe | |
73 | { | |
74 | bucketID = i; | |
75 | break; | |
76 | } | |
77 | } | |
78 | ||
79 | // Alternate implementation with binary search | |
80 | // auto pos = std::lower_bound (boundaries_.begin(), boundaries_.end(), val); | |
81 | // bucketCounts_[pos-boundaries_.begin()] += 1; | |
82 | ||
83 | this->values_[0] += val; | |
84 | this->values_[1] += 1; | |
85 | bucketCounts_[bucketID] += 1; | |
86 | this->mu_.unlock(); | |
87 | } | |
88 | ||
89 | /** | |
90 | * Checkpoints the current value. This function will overwrite the current checkpoint with the | |
91 | * current value. | |
92 | * | |
93 | * @param none | |
94 | * @return none | |
95 | */ | |
96 | void checkpoint() override | |
97 | { | |
98 | this->mu_.lock(); | |
99 | this->updated_ = false; | |
100 | this->checkpoint_ = this->values_; | |
101 | this->values_[0] = 0; | |
102 | this->values_[1] = 0; | |
103 | bucketCounts_ckpt_ = bucketCounts_; | |
104 | std::fill(bucketCounts_.begin(), bucketCounts_.end(), 0); | |
105 | this->mu_.unlock(); | |
106 | } | |
107 | ||
108 | /** | |
109 | * Merges the values of two aggregators in a semantically accurate manner. | |
110 | * A histogram aggregator can only be merged with another histogram aggregator that has the same | |
111 | * boudnaries. A histogram merge first adds the sum and count values then iterates over the adds | |
112 | * the bucket counts element by element. | |
113 | * | |
114 | * @param other, the aggregator with merge with | |
115 | * @return none | |
116 | */ | |
117 | void merge(HistogramAggregator other) | |
118 | { | |
119 | this->mu_.lock(); | |
120 | ||
121 | // Ensure that incorrect types are not merged | |
122 | if (this->agg_kind_ != other.agg_kind_) | |
123 | { | |
124 | # if __EXCEPTIONS | |
125 | throw std::invalid_argument("Aggregators of different types cannot be merged."); | |
126 | # else | |
127 | std::terminate(); | |
128 | # endif | |
129 | // Reject histogram merges with differing boundary vectors | |
130 | } | |
131 | else if (other.boundaries_ != this->boundaries_) | |
132 | { | |
133 | # if __EXCEPTIONS | |
134 | throw std::invalid_argument("Histogram boundaries do not match."); | |
135 | # else | |
136 | std::terminate(); | |
137 | # endif | |
138 | } | |
139 | ||
140 | this->values_[0] += other.values_[0]; | |
141 | this->values_[1] += other.values_[1]; | |
142 | ||
143 | this->checkpoint_[0] += other.checkpoint_[0]; | |
144 | this->checkpoint_[1] += other.checkpoint_[1]; | |
145 | ||
146 | for (size_t i = 0; i < bucketCounts_.size(); i++) | |
147 | { | |
148 | bucketCounts_[i] += other.bucketCounts_[i]; | |
149 | bucketCounts_ckpt_[i] += other.bucketCounts_ckpt_[i]; | |
150 | } | |
151 | this->mu_.unlock(); | |
152 | } | |
153 | ||
154 | /** | |
155 | * Returns the checkpointed value | |
156 | * | |
157 | * @param none | |
158 | * @return the value of the checkpoint | |
159 | */ | |
160 | std::vector<T> get_checkpoint() override { return this->checkpoint_; } | |
161 | ||
162 | /** | |
163 | * Returns the current values | |
164 | * | |
165 | * @param none | |
166 | * @return the present aggregator values | |
167 | */ | |
168 | std::vector<T> get_values() override { return this->values_; } | |
169 | ||
170 | /** | |
171 | * Returns the bucket boundaries specified at this aggregator's creation. | |
172 | * | |
173 | * @param none | |
174 | * @return the aggregator boundaries | |
175 | */ | |
176 | virtual std::vector<double> get_boundaries() override { return boundaries_; } | |
177 | ||
178 | /** | |
179 | * Returns the current counts for each bucket . | |
180 | * | |
181 | * @param none | |
182 | * @return the aggregator bucket counts | |
183 | */ | |
184 | virtual std::vector<int> get_counts() override { return bucketCounts_ckpt_; } | |
185 | ||
186 | HistogramAggregator(const HistogramAggregator &cp) | |
187 | { | |
188 | this->values_ = cp.values_; | |
189 | this->checkpoint_ = cp.checkpoint_; | |
190 | this->kind_ = cp.kind_; | |
191 | this->agg_kind_ = cp.agg_kind_; | |
192 | boundaries_ = cp.boundaries_; | |
193 | bucketCounts_ = cp.bucketCounts_; | |
194 | bucketCounts_ckpt_ = cp.bucketCounts_ckpt_; | |
195 | // use default initialized mutex as they cannot be copied | |
196 | } | |
197 | ||
198 | private: | |
199 | std::vector<double> boundaries_; | |
200 | std::vector<int> bucketCounts_; | |
201 | std::vector<int> bucketCounts_ckpt_; | |
202 | }; | |
203 | ||
204 | } // namespace metrics | |
205 | } // namespace sdk | |
206 | OPENTELEMETRY_END_NAMESPACE | |
207 | #endif |