/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License").  See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership.  You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright (C) 2016 ScyllaDB
 */
#include <seastar/core/prometheus.hh>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include "proto/metrics2.pb.h"
#include <sstream>

#include <seastar/core/scollectd_api.hh>
#include "core/scollectd-impl.hh"
#include <seastar/core/metrics_api.hh>
#include <seastar/http/function_handlers.hh>
#include <boost/algorithm/string/replace.hpp>
#include <boost/range/algorithm_ext/erase.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/range/algorithm.hpp>
#include <boost/range/combine.hpp>
#include <seastar/core/thread.hh>
41 namespace prometheus
{
42 namespace pm
= io::prometheus::client
;
44 namespace mi
= metrics::impl
;
47 * Taken from an answer in stackoverflow:
48 * http://stackoverflow.com/questions/2340730/are-there-c-equivalents-for-the-protocol-buffers-delimited-i-o-functions-in-ja
50 static bool write_delimited_to(const google::protobuf::MessageLite
& message
,
51 google::protobuf::io::ZeroCopyOutputStream
* rawOutput
) {
52 google::protobuf::io::CodedOutputStream
output(rawOutput
);
54 const int size
= message
.ByteSize();
55 output
.WriteVarint32(size
);
57 uint8_t* buffer
= output
.GetDirectBufferForNBytesAndAdvance(size
);
58 if (buffer
!= nullptr) {
59 message
.SerializeWithCachedSizesToArray(buffer
);
61 message
.SerializeWithCachedSizes(&output
);
62 if (output
.HadError()) {
70 static pm::Metric
* add_label(pm::Metric
* mt
, const metrics::impl::metric_id
& id
, const config
& ctx
) {
71 mt
->mutable_label()->Reserve(id
.labels().size() + 1);
73 auto label
= mt
->add_label();
74 label
->set_name(ctx
.label
->key());
75 label
->set_value(ctx
.label
->value());
77 for (auto &&i
: id
.labels()) {
78 auto label
= mt
->add_label();
79 label
->set_name(i
.first
);
80 label
->set_value(i
.second
);
85 static void fill_metric(pm::MetricFamily
& mf
, const metrics::impl::metric_value
& c
,
86 const metrics::impl::metric_id
& id
, const config
& ctx
) {
88 case scollectd::data_type::DERIVE
:
89 add_label(mf
.add_metric(), id
, ctx
)->mutable_counter()->set_value(c
.i());
90 mf
.set_type(pm::MetricType::COUNTER
);
92 case scollectd::data_type::GAUGE
:
93 add_label(mf
.add_metric(), id
, ctx
)->mutable_gauge()->set_value(c
.d());
94 mf
.set_type(pm::MetricType::GAUGE
);
96 case scollectd::data_type::HISTOGRAM
:
98 auto h
= c
.get_histogram();
99 auto mh
= add_label(mf
.add_metric(), id
,ctx
)->mutable_histogram();
100 mh
->set_sample_count(h
.sample_count
);
101 mh
->set_sample_sum(h
.sample_sum
);
102 for (auto b
: h
.buckets
) {
103 auto bc
= mh
->add_bucket();
104 bc
->set_cumulative_count(b
.count
);
105 bc
->set_upper_bound(b
.upper_bound
);
107 mf
.set_type(pm::MetricType::HISTOGRAM
);
111 add_label(mf
.add_metric(), id
, ctx
)->mutable_counter()->set_value(c
.ui());
112 mf
.set_type(pm::MetricType::COUNTER
);
117 static std::string
to_str(seastar::metrics::impl::data_type dt
) {
119 case seastar::metrics::impl::data_type::GAUGE
:
121 case seastar::metrics::impl::data_type::COUNTER
:
123 case seastar::metrics::impl::data_type::HISTOGRAM
:
125 case seastar::metrics::impl::data_type::DERIVE
:
126 // Prometheus server does not respect derive parameters
127 // So we report them as counter
135 static std::string
to_str(const seastar::metrics::impl::metric_value
& v
) {
137 case seastar::metrics::impl::data_type::GAUGE
:
138 return std::to_string(v
.d());
139 case seastar::metrics::impl::data_type::COUNTER
:
140 return std::to_string(v
.i());
141 case seastar::metrics::impl::data_type::DERIVE
:
142 return std::to_string(v
.ui());
146 return ""; // we should never get here but it makes the compiler happy
149 static void add_name(std::ostream
& s
, const sstring
& name
, const std::map
<sstring
, sstring
>& labels
, const config
& ctx
) {
151 const char* delimiter
= "";
153 s
<< ctx
.label
->key() << "=\"" << ctx
.label
->value() << '"';
157 if (!labels
.empty()) {
158 for (auto l
: labels
) {
160 s
<< l
.first
<< "=\"" << l
.second
<< '"';
/*!
 * \brief iterator for metric family
 *
 * In prometheus, a single shard collects all the data from the other
 * shards and reports it.
 *
 * Each shard returns a value_copy struct that has a vector of vector values (a vector per metric family)
 * and a vector of metadata (and inside it a vector of metric metadata).
 *
 * The metrics are sorted by the metric family name.
 *
 * In prometheus, all the metrics that belong to the same metric family are reported together.
 *
 * For efficiency the results from the metrics layer are kept in a vector.
 *
 * So we have a vector of shards of a vector of metric families of a vector of values.
 *
 * To produce the result, we use the metric_family_iterator that is created by metric_family_range.
 *
 * When iterating over the metrics we keep two pieces of state:
 *
 * 1. The current metric family: its name, the total number of values (combined over all shards) and
 *    a pointer to its metadata.
 * 2. A vector of positions to the current metric family for each shard.
 *
 * The metric_family_range returns a metric_family_iterator that goes over all the families.
 *
 * The iterator returns a metric_family object that can report the metric_family name, the size (how many
 * metrics in total belong to the metric family) and a foreach_metric method.
 *
 * The foreach_metric method can be used to perform an action on each of the metrics that belong to
 * that metric family.
 *
 * Iterating over the metrics is done:
 * - go over each of the shards and each of the entries in the position vector:
 *   - if the current family (the metric family that we get from the shard and position) has the current name:
 *     - call the action on each of the metrics that belong to that metric family.
 *
 * For example, if m is a metric_family_range
 *
 * for (auto&& i : m) {
 *   std::cout << i.name() << std::endl;
 *   i.foreach_metric([](const mi::metric_value& value, const mi::metric_info& value_info) {
 *     std::cout << value_info.id.labels().size() << std::endl;
 *   });
 * }
 *
 * Will print all the metric family names followed by the number of labels each metric has.
 */
class metric_family_iterator;

class metric_family_range;
221 class metrics_families_per_shard
{
222 using metrics_family_per_shard_data_container
= std::vector
<foreign_ptr
<mi::values_reference
>>;
223 metrics_family_per_shard_data_container _data
;
224 using comp_function
= std::function
<bool(const sstring
&, const mi::metric_family_metadata
&)>;
226 * \brief find the last item in a range of metric family based on a comparator function
229 metric_family_iterator
find_bound(const sstring
& family_name
, comp_function comp
) const;
233 using const_iterator
= metrics_family_per_shard_data_container::const_iterator
;
234 using iterator
= metrics_family_per_shard_data_container::iterator
;
235 using reference
= metrics_family_per_shard_data_container::reference
;
236 using const_reference
= metrics_family_per_shard_data_container::const_reference
;
239 * \brief find the first item following a metric family range.
240 * metric family are sorted, this will return the first item that is outside
243 metric_family_iterator
upper_bound(const sstring
& family_name
) const;
246 * \brief find the first item in a range of metric family.
247 * metric family are sorted, the first item, is the first to match the
250 metric_family_iterator
lower_bound(const sstring
& family_name
) const;
253 * \defgroup Variables Global variables
257 * @defgroup Vector properties
258 * The following methods making metrics_families_per_shard act as
259 * a vector of foreign_ptr<mi::values_reference>
265 return _data
.begin();
272 const_iterator
begin() const {
273 return _data
.begin();
276 const_iterator
end() const {
280 void resize(size_t new_size
) {
281 _data
.resize(new_size
);
284 reference
& operator[](size_t n
) {
288 const_reference
& operator[](size_t n
) const {
294 static future
<> get_map_value(metrics_families_per_shard
& vec
) {
295 vec
.resize(smp::count
);
296 return parallel_for_each(boost::irange(0u, smp::count
), [&vec
] (auto cpu
) {
297 return smp::submit_to(cpu
, [] {
298 return mi::get_values();
299 }).then([&vec
, cpu
] (auto res
) {
300 vec
[cpu
] = std::move(res
);
307 * \brief a facade class for metric family
309 class metric_family
{
310 const sstring
* _name
= nullptr;
312 const mi::metric_family_info
* _family_info
= nullptr;
313 metric_family_iterator
& _iterator_state
;
314 metric_family(metric_family_iterator
& state
) : _iterator_state(state
) {
316 metric_family(const sstring
* name
, uint32_t size
, const mi::metric_family_info
* family_info
, metric_family_iterator
& state
) :
317 _name(name
), _size(size
), _family_info(family_info
), _iterator_state(state
) {
319 metric_family(const metric_family
& info
, metric_family_iterator
& state
) :
320 metric_family(info
._name
, info
._size
, info
._family_info
, state
) {
323 metric_family(const metric_family
&) = delete;
324 metric_family(metric_family
&&) = delete;
326 const sstring
& name() const {
330 const uint32_t size() const {
334 const mi::metric_family_info
& metadata() const {
335 return *_family_info
;
338 void foreach_metric(std::function
<void(const mi::metric_value
&, const mi::metric_info
&)>&& f
);
341 return !_name
|| !_family_info
;
343 friend class metric_family_iterator
;
346 class metric_family_iterator
{
347 const metrics_families_per_shard
& _families
;
348 std::vector
<size_t> _positions
;
352 if (_positions
.empty()) {
355 const sstring
*new_name
= nullptr;
356 const mi::metric_family_info
* new_family_info
= nullptr;
358 for (auto&& i
: boost::combine(_positions
, _families
)) {
359 auto& pos_in_metric_per_shard
= boost::get
<0>(i
);
360 auto& metric_family
= boost::get
<1>(i
);
361 if (_info
._name
&& pos_in_metric_per_shard
< metric_family
->metadata
->size() &&
362 metric_family
->metadata
->at(pos_in_metric_per_shard
).mf
.name
.compare(*_info
._name
) <= 0) {
363 pos_in_metric_per_shard
++;
365 if (pos_in_metric_per_shard
>= metric_family
->metadata
->size()) {
366 // no more metric family in this shard
369 auto& metadata
= metric_family
->metadata
->at(pos_in_metric_per_shard
);
370 int cmp
= (!new_name
) ? -1 : metadata
.mf
.name
.compare(*new_name
);
372 new_name
= &metadata
.mf
.name
;
373 new_family_info
= &metadata
.mf
;
377 _info
._size
+= metadata
.metrics
.size();
380 _info
._name
= new_name
;
381 _info
._family_info
= new_family_info
;
385 metric_family_iterator() = delete;
386 metric_family_iterator(const metric_family_iterator
& o
) : _families(o
._families
), _positions(o
._positions
), _info(*this) {
390 metric_family_iterator(metric_family_iterator
&& o
) : _families(o
._families
), _positions(std::move(o
._positions
)),
395 metric_family_iterator(const metrics_families_per_shard
& families
,
397 : _families(families
), _positions(shards
, 0), _info(*this) {
401 metric_family_iterator(const metrics_families_per_shard
& families
,
402 std::vector
<size_t>&& positions
)
403 : _families(families
), _positions(std::move(positions
)), _info(*this) {
407 metric_family_iterator
& operator++() {
412 metric_family_iterator
operator++(int) {
413 metric_family_iterator
previous(*this);
418 bool operator!=(const metric_family_iterator
& o
) const {
419 return !(*this == o
);
422 bool operator==(const metric_family_iterator
& o
) const {
429 return name() == o
.name();
432 metric_family
& operator*() {
436 metric_family
* operator->() {
439 const sstring
& name() const {
443 const uint32_t size() const {
447 const mi::metric_family_info
& metadata() const {
448 return *_info
._family_info
;
452 return _positions
.empty() || _info
.end();
455 void foreach_metric(std::function
<void(const mi::metric_value
&, const mi::metric_info
&)>&& f
) {
456 // iterating over the shard vector and the position vector
457 for (auto&& i
: boost::combine(_positions
, _families
)) {
458 auto& pos_in_metric_per_shard
= boost::get
<0>(i
);
459 auto& metric_family
= boost::get
<1>(i
);
460 if (pos_in_metric_per_shard
>= metric_family
->metadata
->size()) {
461 // no more metric family in this shard
464 auto& metadata
= metric_family
->metadata
->at(pos_in_metric_per_shard
);
465 // the the name is different, that means that on this shard, the metric family
466 // does not exist, because everything is sorted by metric family name, this is fine.
467 if (metadata
.mf
.name
== name()) {
468 const mi::value_vector
& values
= metric_family
->values
[pos_in_metric_per_shard
];
469 const mi::metric_metadata_vector
& metrics_metadata
= metadata
.metrics
;
470 for (auto&& vm
: boost::combine(values
, metrics_metadata
)) {
471 auto& value
= boost::get
<0>(vm
);
472 auto& metric_metadata
= boost::get
<1>(vm
);
473 f(value
, metric_metadata
);
481 void metric_family::foreach_metric(std::function
<void(const mi::metric_value
&, const mi::metric_info
&)>&& f
) {
482 _iterator_state
.foreach_metric(std::move(f
));
485 class metric_family_range
{
486 metric_family_iterator _begin
;
487 metric_family_iterator _end
;
489 metric_family_range(const metrics_families_per_shard
& families
) : _begin(families
, smp::count
),
490 _end(metric_family_iterator(families
, 0))
494 metric_family_range(const metric_family_iterator
& b
, const metric_family_iterator
& e
) : _begin(b
), _end(e
)
498 metric_family_iterator
begin() const {
502 metric_family_iterator
end() const {
507 metric_family_iterator
metrics_families_per_shard::find_bound(const sstring
& family_name
, comp_function comp
) const {
508 std::vector
<size_t> positions
;
509 positions
.reserve(smp::count
);
511 for (auto& shard_info
: _data
) {
512 std::vector
<mi::metric_family_metadata
>& metadata
= *(shard_info
->metadata
);
513 std::vector
<mi::metric_family_metadata
>::iterator it_b
= boost::range::upper_bound(metadata
, family_name
, comp
);
514 positions
.emplace_back(it_b
- metadata
.begin());
516 return metric_family_iterator(*this, std::move(positions
));
520 metric_family_iterator
metrics_families_per_shard::lower_bound(const sstring
& family_name
) const {
521 return find_bound(family_name
, [](const sstring
& a
, const mi::metric_family_metadata
& b
) {
522 //sstring doesn't have a <= operator
523 return a
< b
.mf
.name
|| a
== b
.mf
.name
;
527 metric_family_iterator
metrics_families_per_shard::upper_bound(const sstring
& family_name
) const {
528 return find_bound(family_name
, [](const sstring
& a
, const mi::metric_family_metadata
& b
) {
529 return a
< b
.mf
.name
;
534 * \brief a helper function to get metric family range
535 * if metric_family_name is empty will return everything, if not, it will return
536 * the range of metric family that match the metric_family_name.
538 * if prefix is true the match will be based on prefix
540 metric_family_range
get_range(const metrics_families_per_shard
& mf
, const sstring
& metric_family_name
, bool prefix
) {
541 if (metric_family_name
== "") {
542 return metric_family_range(mf
);
544 auto upper_bount_prefix
= metric_family_name
;
545 ++upper_bount_prefix
.back();
547 return metric_family_range(mf
.lower_bound(metric_family_name
), mf
.lower_bound(upper_bount_prefix
));
549 auto lb
= mf
.lower_bound(metric_family_name
);
550 if (lb
.end() || lb
->name() != metric_family_name
) {
551 return metric_family_range(lb
, lb
); // just return an empty range
555 return metric_family_range(lb
, up
);
559 future
<> write_text_representation(output_stream
<char>& out
, const config
& ctx
, const metric_family_range
& m
) {
560 return seastar::async([&ctx
, &out
, &m
] () mutable {
562 for (metric_family
& metric_family
: m
) {
563 auto name
= ctx
.prefix
+ "_" + metric_family
.name();
565 metric_family
.foreach_metric([&out
, &ctx
, &found
, &name
, &metric_family
](auto value
, auto value_info
) mutable {
568 if (metric_family
.metadata().d
.str() != "") {
569 s
<< "# HELP " << name
<< " " << metric_family
.metadata().d
.str() << "\n";
571 s
<< "# TYPE " << name
<< " " << to_str(metric_family
.metadata().type
) << "\n";
574 if (value
.type() == mi::data_type::HISTOGRAM
) {
575 auto&& h
= value
.get_histogram();
576 std::map
<sstring
, sstring
> labels
= value_info
.id
.labels();
577 add_name(s
, name
+ "_sum", labels
, ctx
);
580 add_name(s
, name
+ "_count", labels
, ctx
);
584 auto& le
= labels
["le"];
585 auto bucket
= name
+ "_bucket";
586 for (auto i
: h
.buckets
) {
587 le
= std::to_string(i
.upper_bound
);
588 add_name(s
, bucket
, labels
, ctx
);
592 labels
["le"] = "+Inf";
593 add_name(s
, bucket
, labels
, ctx
);
597 add_name(s
, name
, value_info
.id
.labels(), ctx
);
601 out
.write(s
.str()).get();
602 thread::maybe_yield();
608 future
<> write_protobuf_representation(output_stream
<char>& out
, const config
& ctx
, metric_family_range
& m
) {
609 return do_for_each(m
, [&ctx
, &out
](metric_family
& metric_family
) mutable {
611 google::protobuf::io::StringOutputStream
os(&s
);
613 auto& name
= metric_family
.name();
614 pm::MetricFamily mtf
;
616 mtf
.set_name(ctx
.prefix
+ "_" + name
);
617 mtf
.mutable_metric()->Reserve(metric_family
.size());
618 metric_family
.foreach_metric([&mtf
, &ctx
](auto value
, auto value_info
) {
619 fill_metric(mtf
, value
, value_info
.id
, ctx
);
621 if (!write_delimited_to(mtf
, &os
)) {
622 seastar_logger
.warn("Failed to write protobuf metrics");
628 bool is_accept_text(const std::string
& accept
) {
629 std::vector
<std::string
> strs
;
630 boost::split(strs
, accept
, boost::is_any_of(","));
631 for (auto i
: strs
) {
633 if (boost::starts_with(i
, "application/vnd.google.protobuf;")) {
640 class metrics_handler
: public handler_base
{
645 * \brief tries to trim an asterisk from the end of the string
646 * return true if an asterisk exists.
648 bool trim_asterisk(sstring
& name
) {
649 if (name
.size() && name
.back() == '*') {
650 name
.resize(name
.length() - 1);
653 // Prometheus uses url encoding for the path so '*' is encoded as '%2A'
654 if (boost::algorithm::ends_with(name
, "%2A")) {
655 // This assert is obviously true. It is in here just to
656 // silence a bogus gcc warning:
657 // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=89337
658 assert(name
.length() >= 3);
659 name
.resize(name
.length() - 3);
665 metrics_handler(config ctx
) : _ctx(ctx
) {}
667 future
<std::unique_ptr
<httpd::reply
>> handle(const sstring
& path
,
668 std::unique_ptr
<httpd::request
> req
, std::unique_ptr
<httpd::reply
> rep
) override
{
669 auto text
= is_accept_text(req
->get_header("Accept"));
670 sstring metric_family_name
= req
->get_query_param("name");
671 bool prefix
= trim_asterisk(metric_family_name
);
673 rep
->write_body((text
) ? "txt" : "proto", [this, text
, metric_family_name
, prefix
] (output_stream
<char>&& s
) {
674 return do_with(metrics_families_per_shard(), output_stream
<char>(std::move(s
)),
675 [this, text
, prefix
, &metric_family_name
] (metrics_families_per_shard
& families
, output_stream
<char>& s
) mutable {
676 return get_map_value(families
).then([&s
, &families
, this, text
, prefix
, &metric_family_name
]() mutable {
677 return do_with(get_range(families
, metric_family_name
, prefix
),
678 [&s
, this, text
](metric_family_range
& m
) {
679 return (text
) ? write_text_representation(s
, _ctx
, m
) :
680 write_protobuf_representation(s
, _ctx
, m
);
682 }).finally([&s
] () mutable {
687 return make_ready_future
<std::unique_ptr
<httpd::reply
>>(std::move(rep
));
693 future
<> add_prometheus_routes(http_server
& server
, config ctx
) {
694 server
._routes
.put(GET
, "/metrics", new metrics_handler(ctx
));
695 return make_ready_future
<>();
698 future
<> add_prometheus_routes(distributed
<http_server
>& server
, config ctx
) {
699 return server
.invoke_on_all([ctx
](http_server
& s
) {
700 return add_prometheus_routes(s
, ctx
);
704 future
<> start(httpd::http_server_control
& http_server
, config ctx
) {
705 return add_prometheus_routes(http_server
.server(), ctx
);