#include <boost/range/combine.hpp>
#include <seastar/core/thread.hh>
#include <seastar/core/loop.hh>
+#include <regex>
namespace seastar {
case seastar::metrics::impl::data_type::GAUGE:
return "gauge";
case seastar::metrics::impl::data_type::COUNTER:
+ case seastar::metrics::impl::data_type::REAL_COUNTER:
return "counter";
case seastar::metrics::impl::data_type::HISTOGRAM:
return "histogram";
- case seastar::metrics::impl::data_type::DERIVE:
- case seastar::metrics::impl::data_type::ABSOLUTE:
- // Prometheus server does not respect derive/absolute parameters
- // So we report them as counter
- return "counter";
+ case seastar::metrics::impl::data_type::SUMMARY:
+ return "summary";
}
return "untyped";
}
static std::string to_str(const seastar::metrics::impl::metric_value& v) {
switch (v.type()) {
case seastar::metrics::impl::data_type::GAUGE:
+ case seastar::metrics::impl::data_type::REAL_COUNTER:
return std::to_string(v.d());
case seastar::metrics::impl::data_type::COUNTER:
- case seastar::metrics::impl::data_type::ABSOLUTE:
- return std::to_string(v.ui());
- case seastar::metrics::impl::data_type::DERIVE:
return std::to_string(v.i());
case seastar::metrics::impl::data_type::HISTOGRAM:
+ case seastar::metrics::impl::data_type::SUMMARY:
break;
}
return ""; // we should never get here but it makes the compiler happy
}
-future<> write_text_representation(output_stream<char>& out, const config& ctx, const metric_family_range& m) {
- return seastar::async([&ctx, &out, &m] () mutable {
+void write_histogram(std::stringstream& s, const config& ctx, const sstring& name, const seastar::metrics::histogram& h, std::map<sstring, sstring> labels) noexcept {
+ add_name(s, name + "_sum", labels, ctx);
+ s << h.sample_sum << '\n';
+
+ add_name(s, name + "_count", labels, ctx);
+ s << h.sample_count << '\n';
+
+ auto& le = labels["le"];
+ auto bucket = name + "_bucket";
+ for (auto i : h.buckets) {
+ le = std::to_string(i.upper_bound);
+ add_name(s, bucket, labels, ctx);
+ s << i.count << '\n';
+ }
+ labels["le"] = "+Inf";
+ add_name(s, bucket, labels, ctx);
+ s << h.sample_count << '\n';
+}
+
+void write_summary(std::stringstream& s, const config& ctx, const sstring& name, const seastar::metrics::histogram& h, std::map<sstring, sstring> labels) noexcept {
+ if (h.sample_sum) {
+ add_name(s, name + "_sum", labels, ctx);
+ s << h.sample_sum << '\n';
+ }
+ if (h.sample_count) {
+ add_name(s, name + "_count", labels, ctx);
+ s << h.sample_count << '\n';
+ }
+ auto& le = labels["quantile"];
+ for (auto i : h.buckets) {
+ le = std::to_string(i.upper_bound);
+ add_name(s, name, labels, ctx);
+ s << i.count << '\n';
+ }
+}
+/*!
+ * \brief a helper class to aggregate metrics over labels
+ *
+ * This class sum multiple metrics based on a list of labels.
+ * It returns one or more metrics each aggregated by the aggregate_by labels.
+ *
+ * To use it, you define what labels it should aggregate by and then pass to
+ * it metrics with their labels.
+ * For example if a metrics has a 'shard' and 'name' labels and you aggregate by 'shard'
+ * it would return a map of metrics each with only the 'name' label
+ *
+ */
+class metric_aggregate_by_labels {
+ std::vector<std::string> _labels_to_aggregate_by;
+ std::unordered_map<std::map<sstring, sstring>, seastar::metrics::impl::metric_value> _values;
+public:
+ metric_aggregate_by_labels(std::vector<std::string> labels) : _labels_to_aggregate_by(std::move(labels)) {
+ }
+ /*!
+ * \brief add a metric
+ *
+ * This method gets a metric and its labels and adds it to the aggregated metric.
+ * For example, if a metric has the labels {'shard':'0', 'name':'myhist'} and we are aggregating
+ * over 'shard'
+ * The metric would be added to the aggregated metric with labels {'name':'myhist'}.
+ *
+ */
+ void add(const seastar::metrics::impl::metric_value& m, std::map<sstring, sstring> labels) noexcept {
+ for (auto&& l : _labels_to_aggregate_by) {
+ labels.erase(l);
+ }
+ std::unordered_map<std::map<sstring, sstring>, seastar::metrics::impl::metric_value>::iterator i = _values.find(labels);
+ if ( i == _values.end()) {
+ _values.emplace(std::move(labels), m);
+ } else {
+ i->second += m;
+ }
+ }
+ const std::unordered_map<std::map<sstring, sstring>, seastar::metrics::impl::metric_value>& get_values() const noexcept {
+ return _values;
+ }
+ bool empty() const noexcept {
+ return _values.empty();
+ }
+};
+
+std::string get_value_as_string(std::stringstream& s, const mi::metric_value& value) noexcept {
+ std::string value_str;
+ try {
+ value_str = to_str(value);
+ } catch (const std::range_error& e) {
+ seastar_logger.debug("prometheus: get_value_as_string: {}: {}", s.str(), e.what());
+ value_str = "NaN";
+ } catch (...) {
+ auto ex = std::current_exception();
+ // print this error as it's ignored later on by `connection::start_response`
+ seastar_logger.error("prometheus: get_value_as_string: {}: {}", s.str(), ex);
+ std::rethrow_exception(std::move(ex));
+ }
+ return value_str;
+}
+
+future<> write_text_representation(output_stream<char>& out, const config& ctx, const metric_family_range& m, bool show_help, std::function<bool(const mi::labels_type&)> filter) {
+ return seastar::async([&ctx, &out, &m, show_help, filter] () mutable {
bool found = false;
+ std::stringstream s;
for (metric_family& metric_family : m) {
auto name = ctx.prefix + "_" + metric_family.name();
found = false;
- metric_family.foreach_metric([&out, &ctx, &found, &name, &metric_family](auto value, auto value_info) mutable {
- std::stringstream s;
+ metric_aggregate_by_labels aggregated_values(metric_family.metadata().aggregate_labels);
+ bool should_aggregate = !metric_family.metadata().aggregate_labels.empty();
+ metric_family.foreach_metric([&s, &out, &ctx, &found, &name, &metric_family, &aggregated_values, should_aggregate, show_help, &filter](auto value, auto value_info) mutable {
+ s.clear();
+ s.str("");
+ if ((value_info.should_skip_when_empty && value.is_empty()) || !filter(value_info.id.labels())) {
+ return;
+ }
if (!found) {
- if (metric_family.metadata().d.str() != "") {
- s << "# HELP " << name << " " << metric_family.metadata().d.str() << "\n";
+ if (show_help && metric_family.metadata().d.str() != "") {
+ s << "# HELP " << name << " " << metric_family.metadata().d.str() << '\n';
}
- s << "# TYPE " << name << " " << to_str(metric_family.metadata().type) << "\n";
+ s << "# TYPE " << name << " " << to_str(metric_family.metadata().type) << '\n';
found = true;
}
- if (value.type() == mi::data_type::HISTOGRAM) {
- auto&& h = value.get_histogram();
- std::map<sstring, sstring> labels = value_info.id.labels();
- add_name(s, name + "_sum", labels, ctx);
- s << h.sample_sum;
- s << "\n";
- add_name(s, name + "_count", labels, ctx);
- s << h.sample_count;
- s << "\n";
-
- auto& le = labels["le"];
- auto bucket = name + "_bucket";
- for (auto i : h.buckets) {
- le = std::to_string(i.upper_bound);
- add_name(s, bucket, labels, ctx);
- s << i.count;
- s << "\n";
- }
- labels["le"] = "+Inf";
- add_name(s, bucket, labels, ctx);
- s << h.sample_count;
- s << "\n";
+ if (should_aggregate) {
+ aggregated_values.add(value, value_info.id.labels());
+ } else if (value.type() == mi::data_type::SUMMARY) {
+ write_summary(s, ctx, name, value.get_histogram(), value_info.id.labels());
+ } else if (value.type() == mi::data_type::HISTOGRAM) {
+ write_histogram(s, ctx, name, value.get_histogram(), value_info.id.labels());
} else {
add_name(s, name, value_info.id.labels(), ctx);
- std::string value_str;
- try {
- value_str = to_str(value);
- } catch (const std::range_error& e) {
- seastar_logger.debug("prometheus: write_text_representation: {}: {}", s.str(), e.what());
- value_str = "NaN";
- } catch (...) {
- auto ex = std::current_exception();
- // print this error as it's ignored later on by `connection::start_response`
- seastar_logger.error("prometheus: write_text_representation: {}: {}", s.str(), ex);
- std::rethrow_exception(std::move(ex));
- }
- s << value_str;
- s << "\n";
+ s << get_value_as_string(s, value) << '\n';
}
out.write(s.str()).get();
thread::maybe_yield();
});
+ if (!aggregated_values.empty()) {
+ for (auto&& h : aggregated_values.get_values()) {
+ s.clear();
+ s.str("");
+ if (h.second.type() == mi::data_type::HISTOGRAM) {
+ write_histogram(s, ctx, name, h.second.get_histogram(), h.first);
+ } else {
+ add_name(s, name, h.first, ctx);
+ s << get_value_as_string(s, h.second) << '\n';
+ }
+ out.write(s.str()).get();
+ thread::maybe_yield();
+ }
+ }
}
});
}
class metrics_handler : public handler_base {
sstring _prefix;
config _ctx;
+ static std::function<bool(const mi::labels_type&)> _true_function;
/*!
* \brief tries to trim an asterisk from the end of the string
}
return false;
}
+ /*!
+ * \brief Return a filter function, based on the request
+ *
+ * A filter function filter what metrics should be included.
+ * It returns true if a metric should be included, or false otherwise.
+ * The filters are created from the request query parameters.
+ */
+ std::function<bool(const mi::labels_type&)> make_filter(const http::request& req) {
+ std::unordered_map<sstring, std::regex> matcher;
+ auto labels = mi::get_local_impl()->get_labels();
+ for (auto&& qp : req.query_parameters) {
+ if (labels.find(qp.first) != labels.end()) {
+ matcher.emplace(qp.first, std::regex(qp.second.c_str()));
+ }
+ }
+ return (matcher.empty()) ? _true_function : [matcher](const mi::labels_type& labels) {
+ for (auto&& m : matcher) {
+ auto l = labels.find(m.first);
+ if (!std::regex_match((l == labels.end())? "" : l->second.c_str(), m.second)) {
+ return false;
+ }
+ }
+ return true;
+ };
+ }
+
public:
metrics_handler(config ctx) : _ctx(ctx) {}
- future<std::unique_ptr<httpd::reply>> handle(const sstring& path,
- std::unique_ptr<httpd::request> req, std::unique_ptr<httpd::reply> rep) override {
- sstring metric_family_name = req->get_query_param("name");
+ future<std::unique_ptr<http::reply>> handle(const sstring& path,
+ std::unique_ptr<http::request> req, std::unique_ptr<http::reply> rep) override {
+ sstring metric_family_name = req->get_query_param("__name__");
bool prefix = trim_asterisk(metric_family_name);
+ bool show_help = req->get_query_param("__help__") != "false";
+ std::function<bool(const mi::labels_type&)> filter = make_filter(*req);
- rep->write_body("txt", [this, metric_family_name, prefix] (output_stream<char>&& s) {
+ rep->write_body("txt", [this, metric_family_name, prefix, show_help, filter] (output_stream<char>&& s) {
return do_with(metrics_families_per_shard(), output_stream<char>(std::move(s)),
- [this, prefix, &metric_family_name] (metrics_families_per_shard& families, output_stream<char>& s) mutable {
- return get_map_value(families).then([&s, &families, this, prefix, &metric_family_name]() mutable {
+ [this, prefix, &metric_family_name, show_help, filter] (metrics_families_per_shard& families, output_stream<char>& s) mutable {
+ return get_map_value(families).then([&s, &families, this, prefix, &metric_family_name, show_help, filter]() mutable {
return do_with(get_range(families, metric_family_name, prefix),
- [&s, this](metric_family_range& m) {
- return write_text_representation(s, _ctx, m);
+ [&s, this, show_help, filter](metric_family_range& m) {
+ return write_text_representation(s, _ctx, m, show_help, filter);
});
}).finally([&s] () mutable {
return s.close();
});
});
});
- return make_ready_future<std::unique_ptr<httpd::reply>>(std::move(rep));
+ return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
}
};
-
+std::function<bool(const mi::labels_type&)> metrics_handler::_true_function = [](const mi::labels_type&) {
+ return true;
+};
future<> add_prometheus_routes(http_server& server, config ctx) {
server._routes.put(GET, "/metrics", new metrics_handler(ctx));