2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright (C) 2016 ScyllaDB
22 #include <seastar/core/prometheus.hh>
23 #include <google/protobuf/io/coded_stream.h>
24 #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
25 #include "proto/metrics2.pb.h"
28 #include <seastar/core/scollectd_api.hh>
29 #include "core/scollectd-impl.hh"
30 #include <seastar/core/metrics_api.hh>
31 #include <seastar/http/function_handlers.hh>
32 #include <boost/algorithm/string/replace.hpp>
33 #include <boost/range/algorithm_ext/erase.hpp>
34 #include <boost/algorithm/string.hpp>
35 #include <boost/range/algorithm.hpp>
36 #include <boost/range/combine.hpp>
37 #include <seastar/core/thread.hh>
38 #include <seastar/core/loop.hh>
42 extern seastar::logger seastar_logger
;
44 namespace prometheus
{
45 namespace pm
= io::prometheus::client
;
47 namespace mi
= metrics::impl
;
50 * Taken from an answer in stackoverflow:
51 * http://stackoverflow.com/questions/2340730/are-there-c-equivalents-for-the-protocol-buffers-delimited-i-o-functions-in-ja
53 static bool write_delimited_to(const google::protobuf::MessageLite
& message
,
54 google::protobuf::io::ZeroCopyOutputStream
* rawOutput
) {
55 google::protobuf::io::CodedOutputStream
output(rawOutput
);
57 #if GOOGLE_PROTOBUF_VERSION >= 3004000
58 const size_t size
= message
.ByteSizeLong();
59 output
.WriteVarint64(size
);
61 const int size
= message
.ByteSize();
62 output
.WriteVarint32(size
);
65 uint8_t* buffer
= output
.GetDirectBufferForNBytesAndAdvance(size
);
66 if (buffer
!= nullptr) {
67 message
.SerializeWithCachedSizesToArray(buffer
);
69 message
.SerializeWithCachedSizes(&output
);
70 if (output
.HadError()) {
78 static pm::Metric
* add_label(pm::Metric
* mt
, const metrics::impl::metric_id
& id
, const config
& ctx
) {
79 mt
->mutable_label()->Reserve(id
.labels().size() + 1);
81 auto label
= mt
->add_label();
82 label
->set_name(ctx
.label
->key());
83 label
->set_value(ctx
.label
->value());
85 for (auto &&i
: id
.labels()) {
86 auto label
= mt
->add_label();
87 label
->set_name(i
.first
);
88 label
->set_value(i
.second
);
93 static void fill_metric(pm::MetricFamily
& mf
, const metrics::impl::metric_value
& c
,
94 const metrics::impl::metric_id
& id
, const config
& ctx
) {
96 case scollectd::data_type::DERIVE
:
97 add_label(mf
.add_metric(), id
, ctx
)->mutable_counter()->set_value(c
.i());
98 mf
.set_type(pm::MetricType::COUNTER
);
100 case scollectd::data_type::GAUGE
:
101 add_label(mf
.add_metric(), id
, ctx
)->mutable_gauge()->set_value(c
.d());
102 mf
.set_type(pm::MetricType::GAUGE
);
104 case scollectd::data_type::HISTOGRAM
:
106 auto h
= c
.get_histogram();
107 auto mh
= add_label(mf
.add_metric(), id
,ctx
)->mutable_histogram();
108 mh
->set_sample_count(h
.sample_count
);
109 mh
->set_sample_sum(h
.sample_sum
);
110 for (auto b
: h
.buckets
) {
111 auto bc
= mh
->add_bucket();
112 bc
->set_cumulative_count(b
.count
);
113 bc
->set_upper_bound(b
.upper_bound
);
115 mf
.set_type(pm::MetricType::HISTOGRAM
);
119 add_label(mf
.add_metric(), id
, ctx
)->mutable_counter()->set_value(c
.ui());
120 mf
.set_type(pm::MetricType::COUNTER
);
125 static std::string
to_str(seastar::metrics::impl::data_type dt
) {
127 case seastar::metrics::impl::data_type::GAUGE
:
129 case seastar::metrics::impl::data_type::COUNTER
:
131 case seastar::metrics::impl::data_type::HISTOGRAM
:
133 case seastar::metrics::impl::data_type::DERIVE
:
134 // Prometheus server does not respect derive parameters
135 // So we report them as counter
143 static std::string
to_str(const seastar::metrics::impl::metric_value
& v
) {
145 case seastar::metrics::impl::data_type::GAUGE
:
146 return std::to_string(v
.d());
147 case seastar::metrics::impl::data_type::COUNTER
:
148 return std::to_string(v
.i());
149 case seastar::metrics::impl::data_type::DERIVE
:
150 return std::to_string(v
.ui());
154 return ""; // we should never get here but it makes the compiler happy
157 static void add_name(std::ostream
& s
, const sstring
& name
, const std::map
<sstring
, sstring
>& labels
, const config
& ctx
) {
159 const char* delimiter
= "";
161 s
<< ctx
.label
->key() << "=\"" << ctx
.label
->value() << '"';
165 if (!labels
.empty()) {
166 for (auto l
: labels
) {
168 s
<< l
.first
<< "=\"" << l
.second
<< '"';
/*!
 * \brief iterator for metric family
 *
 * In prometheus, a single shard collects all the data from the other
 * shards and reports it.
 *
 * Each shard returns a value_copy struct that has a vector of vector values (a vector per metric family)
 * and a vector of metadata (and inside it a vector of metric metadata).
 *
 * The metrics are sorted by the metric family name.
 *
 * In prometheus, all the metrics that belong to the same metric family are reported together.
 *
 * For efficiency the results from the metrics layer are kept in a vector.
 *
 * So we have a vector of shards of a vector of metric families of a vector of values.
 *
 * To produce the result, we use the metric_family_iterator that is created by metric_family_range.
 *
 * When iterating over the metrics we use two helper structures:
 *
 * 1. The current metric family: its name, the total number of values (combined over all shards) and
 *    a pointer to the metric family metadata.
 * 2. A vector of positions to the current metric family for each shard.
 *
 * The metric_family_range returns a metric_family_iterator that goes over all the families.
 *
 * The iterator returns a metric_family object that can report the metric_family name, the size (how many
 * metrics in total belong to the metric family) and a foreach_metric method.
 *
 * The foreach_metric method can be used to perform an action on each of the metrics that belong to
 * that metric family.
 *
 * Iterating over the metrics is done as follows:
 * - go over each of the shards and the matching entry in the position vector:
 *   - if the current family (the metric family that we get from the shard and position) has the current name:
 *     - iterate over each of the metrics that belong to that metric family and call the function
 *       with the metric value and its metric info.
 *
 * For example, if m is a metric_family_range:
 *
 * for (auto&& i : m) {
 *   std::cout << i.name() << std::endl;
 *   i.foreach_metric([](const mi::metric_value& value, const mi::metric_info& value_info) {
 *     std::cout << value_info.id.labels().size() << std::endl;
 *   });
 * }
 *
 * This prints all the metric family names, each followed by the number of labels of each of its metrics.
 */
// Forward declarations: metrics_families_per_shard and metric_family refer to
// metric_family_iterator before its definition below.
class metric_family_iterator;
class metric_family_range;
229 class metrics_families_per_shard
{
230 using metrics_family_per_shard_data_container
= std::vector
<foreign_ptr
<mi::values_reference
>>;
231 metrics_family_per_shard_data_container _data
;
232 using comp_function
= std::function
<bool(const sstring
&, const mi::metric_family_metadata
&)>;
234 * \brief find the last item in a range of metric family based on a comparator function
237 metric_family_iterator
find_bound(const sstring
& family_name
, comp_function comp
) const;
241 using const_iterator
= metrics_family_per_shard_data_container::const_iterator
;
242 using iterator
= metrics_family_per_shard_data_container::iterator
;
243 using reference
= metrics_family_per_shard_data_container::reference
;
244 using const_reference
= metrics_family_per_shard_data_container::const_reference
;
247 * \brief find the first item following a metric family range.
248 * metric family are sorted, this will return the first item that is outside
251 metric_family_iterator
upper_bound(const sstring
& family_name
) const;
254 * \brief find the first item in a range of metric family.
255 * metric family are sorted, the first item, is the first to match the
258 metric_family_iterator
lower_bound(const sstring
& family_name
) const;
261 * \defgroup Variables Global variables
265 * @defgroup Vector properties
266 * The following methods making metrics_families_per_shard act as
267 * a vector of foreign_ptr<mi::values_reference>
273 return _data
.begin();
280 const_iterator
begin() const {
281 return _data
.begin();
284 const_iterator
end() const {
288 void resize(size_t new_size
) {
289 _data
.resize(new_size
);
292 reference
& operator[](size_t n
) {
296 const_reference
& operator[](size_t n
) const {
302 static future
<> get_map_value(metrics_families_per_shard
& vec
) {
303 vec
.resize(smp::count
);
304 return parallel_for_each(boost::irange(0u, smp::count
), [&vec
] (auto cpu
) {
305 return smp::submit_to(cpu
, [] {
306 return mi::get_values();
307 }).then([&vec
, cpu
] (auto res
) {
308 vec
[cpu
] = std::move(res
);
315 * \brief a facade class for metric family
317 class metric_family
{
318 const sstring
* _name
= nullptr;
320 const mi::metric_family_info
* _family_info
= nullptr;
321 metric_family_iterator
& _iterator_state
;
322 metric_family(metric_family_iterator
& state
) : _iterator_state(state
) {
324 metric_family(const sstring
* name
, uint32_t size
, const mi::metric_family_info
* family_info
, metric_family_iterator
& state
) :
325 _name(name
), _size(size
), _family_info(family_info
), _iterator_state(state
) {
327 metric_family(const metric_family
& info
, metric_family_iterator
& state
) :
328 metric_family(info
._name
, info
._size
, info
._family_info
, state
) {
331 metric_family(const metric_family
&) = delete;
332 metric_family(metric_family
&&) = delete;
334 const sstring
& name() const {
338 const uint32_t size() const {
342 const mi::metric_family_info
& metadata() const {
343 return *_family_info
;
346 void foreach_metric(std::function
<void(const mi::metric_value
&, const mi::metric_info
&)>&& f
);
349 return !_name
|| !_family_info
;
351 friend class metric_family_iterator
;
354 class metric_family_iterator
{
355 const metrics_families_per_shard
& _families
;
356 std::vector
<size_t> _positions
;
360 if (_positions
.empty()) {
363 const sstring
*new_name
= nullptr;
364 const mi::metric_family_info
* new_family_info
= nullptr;
366 for (auto&& i
: boost::combine(_positions
, _families
)) {
367 auto& pos_in_metric_per_shard
= boost::get
<0>(i
);
368 auto& metric_family
= boost::get
<1>(i
);
369 if (_info
._name
&& pos_in_metric_per_shard
< metric_family
->metadata
->size() &&
370 metric_family
->metadata
->at(pos_in_metric_per_shard
).mf
.name
.compare(*_info
._name
) <= 0) {
371 pos_in_metric_per_shard
++;
373 if (pos_in_metric_per_shard
>= metric_family
->metadata
->size()) {
374 // no more metric family in this shard
377 auto& metadata
= metric_family
->metadata
->at(pos_in_metric_per_shard
);
378 int cmp
= (!new_name
) ? -1 : metadata
.mf
.name
.compare(*new_name
);
380 new_name
= &metadata
.mf
.name
;
381 new_family_info
= &metadata
.mf
;
385 _info
._size
+= metadata
.metrics
.size();
388 _info
._name
= new_name
;
389 _info
._family_info
= new_family_info
;
393 metric_family_iterator() = delete;
394 metric_family_iterator(const metric_family_iterator
& o
) : _families(o
._families
), _positions(o
._positions
), _info(*this) {
398 metric_family_iterator(metric_family_iterator
&& o
) : _families(o
._families
), _positions(std::move(o
._positions
)),
403 metric_family_iterator(const metrics_families_per_shard
& families
,
405 : _families(families
), _positions(shards
, 0), _info(*this) {
409 metric_family_iterator(const metrics_families_per_shard
& families
,
410 std::vector
<size_t>&& positions
)
411 : _families(families
), _positions(std::move(positions
)), _info(*this) {
415 metric_family_iterator
& operator++() {
420 metric_family_iterator
operator++(int) {
421 metric_family_iterator
previous(*this);
426 bool operator!=(const metric_family_iterator
& o
) const {
427 return !(*this == o
);
430 bool operator==(const metric_family_iterator
& o
) const {
437 return name() == o
.name();
440 metric_family
& operator*() {
444 metric_family
* operator->() {
447 const sstring
& name() const {
451 const uint32_t size() const {
455 const mi::metric_family_info
& metadata() const {
456 return *_info
._family_info
;
460 return _positions
.empty() || _info
.end();
463 void foreach_metric(std::function
<void(const mi::metric_value
&, const mi::metric_info
&)>&& f
) {
464 // iterating over the shard vector and the position vector
465 for (auto&& i
: boost::combine(_positions
, _families
)) {
466 auto& pos_in_metric_per_shard
= boost::get
<0>(i
);
467 auto& metric_family
= boost::get
<1>(i
);
468 if (pos_in_metric_per_shard
>= metric_family
->metadata
->size()) {
469 // no more metric family in this shard
472 auto& metadata
= metric_family
->metadata
->at(pos_in_metric_per_shard
);
473 // the the name is different, that means that on this shard, the metric family
474 // does not exist, because everything is sorted by metric family name, this is fine.
475 if (metadata
.mf
.name
== name()) {
476 const mi::value_vector
& values
= metric_family
->values
[pos_in_metric_per_shard
];
477 const mi::metric_metadata_vector
& metrics_metadata
= metadata
.metrics
;
478 for (auto&& vm
: boost::combine(values
, metrics_metadata
)) {
479 auto& value
= boost::get
<0>(vm
);
480 auto& metric_metadata
= boost::get
<1>(vm
);
481 f(value
, metric_metadata
);
489 void metric_family::foreach_metric(std::function
<void(const mi::metric_value
&, const mi::metric_info
&)>&& f
) {
490 _iterator_state
.foreach_metric(std::move(f
));
493 class metric_family_range
{
494 metric_family_iterator _begin
;
495 metric_family_iterator _end
;
497 metric_family_range(const metrics_families_per_shard
& families
) : _begin(families
, smp::count
),
498 _end(metric_family_iterator(families
, 0))
502 metric_family_range(const metric_family_iterator
& b
, const metric_family_iterator
& e
) : _begin(b
), _end(e
)
506 metric_family_iterator
begin() const {
510 metric_family_iterator
end() const {
515 metric_family_iterator
metrics_families_per_shard::find_bound(const sstring
& family_name
, comp_function comp
) const {
516 std::vector
<size_t> positions
;
517 positions
.reserve(smp::count
);
519 for (auto& shard_info
: _data
) {
520 std::vector
<mi::metric_family_metadata
>& metadata
= *(shard_info
->metadata
);
521 std::vector
<mi::metric_family_metadata
>::iterator it_b
= boost::range::upper_bound(metadata
, family_name
, comp
);
522 positions
.emplace_back(it_b
- metadata
.begin());
524 return metric_family_iterator(*this, std::move(positions
));
528 metric_family_iterator
metrics_families_per_shard::lower_bound(const sstring
& family_name
) const {
529 return find_bound(family_name
, [](const sstring
& a
, const mi::metric_family_metadata
& b
) {
530 //sstring doesn't have a <= operator
531 return a
< b
.mf
.name
|| a
== b
.mf
.name
;
535 metric_family_iterator
metrics_families_per_shard::upper_bound(const sstring
& family_name
) const {
536 return find_bound(family_name
, [](const sstring
& a
, const mi::metric_family_metadata
& b
) {
537 return a
< b
.mf
.name
;
542 * \brief a helper function to get metric family range
543 * if metric_family_name is empty will return everything, if not, it will return
544 * the range of metric family that match the metric_family_name.
546 * if prefix is true the match will be based on prefix
548 metric_family_range
get_range(const metrics_families_per_shard
& mf
, const sstring
& metric_family_name
, bool prefix
) {
549 if (metric_family_name
== "") {
550 return metric_family_range(mf
);
552 auto upper_bount_prefix
= metric_family_name
;
553 ++upper_bount_prefix
.back();
555 return metric_family_range(mf
.lower_bound(metric_family_name
), mf
.lower_bound(upper_bount_prefix
));
557 auto lb
= mf
.lower_bound(metric_family_name
);
558 if (lb
.end() || lb
->name() != metric_family_name
) {
559 return metric_family_range(lb
, lb
); // just return an empty range
563 return metric_family_range(lb
, up
);
567 future
<> write_text_representation(output_stream
<char>& out
, const config
& ctx
, const metric_family_range
& m
) {
568 return seastar::async([&ctx
, &out
, &m
] () mutable {
570 for (metric_family
& metric_family
: m
) {
571 auto name
= ctx
.prefix
+ "_" + metric_family
.name();
573 metric_family
.foreach_metric([&out
, &ctx
, &found
, &name
, &metric_family
](auto value
, auto value_info
) mutable {
576 if (metric_family
.metadata().d
.str() != "") {
577 s
<< "# HELP " << name
<< " " << metric_family
.metadata().d
.str() << "\n";
579 s
<< "# TYPE " << name
<< " " << to_str(metric_family
.metadata().type
) << "\n";
582 if (value
.type() == mi::data_type::HISTOGRAM
) {
583 auto&& h
= value
.get_histogram();
584 std::map
<sstring
, sstring
> labels
= value_info
.id
.labels();
585 add_name(s
, name
+ "_sum", labels
, ctx
);
588 add_name(s
, name
+ "_count", labels
, ctx
);
592 auto& le
= labels
["le"];
593 auto bucket
= name
+ "_bucket";
594 for (auto i
: h
.buckets
) {
595 le
= std::to_string(i
.upper_bound
);
596 add_name(s
, bucket
, labels
, ctx
);
600 labels
["le"] = "+Inf";
601 add_name(s
, bucket
, labels
, ctx
);
605 add_name(s
, name
, value_info
.id
.labels(), ctx
);
609 out
.write(s
.str()).get();
610 thread::maybe_yield();
616 future
<> write_protobuf_representation(output_stream
<char>& out
, const config
& ctx
, metric_family_range
& m
) {
617 return do_for_each(m
, [&ctx
, &out
](metric_family
& metric_family
) mutable {
619 google::protobuf::io::StringOutputStream
os(&s
);
621 auto& name
= metric_family
.name();
622 pm::MetricFamily mtf
;
624 mtf
.set_name(ctx
.prefix
+ "_" + name
);
625 mtf
.mutable_metric()->Reserve(metric_family
.size());
626 metric_family
.foreach_metric([&mtf
, &ctx
](auto value
, auto value_info
) {
627 fill_metric(mtf
, value
, value_info
.id
, ctx
);
629 if (!write_delimited_to(mtf
, &os
)) {
630 seastar_logger
.warn("Failed to write protobuf metrics");
/*!
 * Decide the response format from an HTTP Accept header.
 *
 * The header is split on ',' and each entry is stripped of leading whitespace
 * (space, \t, \n, \v, \f, \r). Returns false (use protobuf) if any entry starts
 * with "application/vnd.google.protobuf;", and true (use text) otherwise,
 * including for an empty header.
 */
bool is_accept_text(const std::string& accept) {
    static const std::string protobuf_prefix = "application/vnd.google.protobuf;";
    auto is_blank = [] (char c) {
        return c == ' ' || c == '\t' || c == '\n' || c == '\v' || c == '\f' || c == '\r';
    };

    std::string::size_type start = 0;
    for (;;) {
        std::string::size_type stop = accept.find(',', start);
        if (stop == std::string::npos) {
            stop = accept.size();
        }
        while (start < stop && is_blank(accept[start])) {
            ++start;
        }
        // Trailing whitespace never matters: the prefix ends in a non-blank ';'.
        if (stop - start >= protobuf_prefix.size() &&
                accept.compare(start, protobuf_prefix.size(), protobuf_prefix) == 0) {
            return false;
        }
        if (stop == accept.size()) {
            return true;
        }
        start = stop + 1;
    }
}
648 class metrics_handler
: public handler_base
{
653 * \brief tries to trim an asterisk from the end of the string
654 * return true if an asterisk exists.
656 bool trim_asterisk(sstring
& name
) {
657 if (name
.size() && name
.back() == '*') {
658 name
.resize(name
.length() - 1);
661 // Prometheus uses url encoding for the path so '*' is encoded as '%2A'
662 if (boost::algorithm::ends_with(name
, "%2A")) {
663 // This assert is obviously true. It is in here just to
664 // silence a bogus gcc warning:
665 // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=89337
666 assert(name
.length() >= 3);
667 name
.resize(name
.length() - 3);
673 metrics_handler(config ctx
) : _ctx(ctx
) {}
675 future
<std::unique_ptr
<httpd::reply
>> handle(const sstring
& path
,
676 std::unique_ptr
<httpd::request
> req
, std::unique_ptr
<httpd::reply
> rep
) override
{
677 auto text
= is_accept_text(req
->get_header("Accept"));
678 sstring metric_family_name
= req
->get_query_param("name");
679 bool prefix
= trim_asterisk(metric_family_name
);
681 rep
->write_body((text
) ? "txt" : "proto", [this, text
, metric_family_name
, prefix
] (output_stream
<char>&& s
) {
682 return do_with(metrics_families_per_shard(), output_stream
<char>(std::move(s
)),
683 [this, text
, prefix
, &metric_family_name
] (metrics_families_per_shard
& families
, output_stream
<char>& s
) mutable {
684 return get_map_value(families
).then([&s
, &families
, this, text
, prefix
, &metric_family_name
]() mutable {
685 return do_with(get_range(families
, metric_family_name
, prefix
),
686 [&s
, this, text
](metric_family_range
& m
) {
687 return (text
) ? write_text_representation(s
, _ctx
, m
) :
688 write_protobuf_representation(s
, _ctx
, m
);
690 }).finally([&s
] () mutable {
695 return make_ready_future
<std::unique_ptr
<httpd::reply
>>(std::move(rep
));
701 future
<> add_prometheus_routes(http_server
& server
, config ctx
) {
702 server
._routes
.put(GET
, "/metrics", new metrics_handler(ctx
));
703 return make_ready_future
<>();
706 future
<> add_prometheus_routes(distributed
<http_server
>& server
, config ctx
) {
707 return server
.invoke_on_all([ctx
](http_server
& s
) {
708 return add_prometheus_routes(s
, ctx
);
712 future
<> start(httpd::http_server_control
& http_server
, config ctx
) {
713 return add_prometheus_routes(http_server
.server(), ctx
);