command_table.clear();
timer.shutdown();
- session.reset();
+ if (session) {
+ session->con->mark_down();
+ session.reset();
+ }
}
bool MgrClient::ms_dispatch(Message *m)
if (last_connect_attempt != utime_t()) {
utime_t now = ceph_clock_now();
utime_t when = last_connect_attempt;
- when += cct->_conf->mgr_connect_retry_interval;
+ when += cct->_conf->get_val<double>("mgr_connect_retry_interval");
if (now < when) {
if (!connect_retry_callback) {
- connect_retry_callback = new FunctionContext([this](int r){
- connect_retry_callback = nullptr;
- reconnect();
- });
- timer.add_event_at(when, connect_retry_callback);
+ connect_retry_callback = timer.add_event_at(
+ when,
+ new FunctionContext([this](int r){
+ connect_retry_callback = nullptr;
+ reconnect();
+ }));
}
ldout(cct, 4) << "waiting to retry connect until " << when << dendl;
return;
session.reset(new MgrSessionState());
session->con = msgr->get_connection(inst);
+ if (service_daemon) {
+ daemon_dirty_status = true;
+ }
+
// Don't send an open if we're just a client (i.e. doing
// command-sending, not stats etc)
- if (g_conf && !g_conf->name.is_client()) {
- auto open = new MMgrOpen();
- open->daemon_name = g_conf->name.get_id();
- session->con->send_message(open);
+ if (!cct->_conf->name.is_client() || service_daemon) {
+ _send_open();
}
// resend any pending commands
}
}
+// Build an MMgrOpen describing this daemon and queue it on the current
+// session's connection.  Does nothing when there is no session or the
+// session has no connection.
+void MgrClient::_send_open()
+{
+  if (!session || !session->con) {
+    return;
+  }
+
+  auto open_msg = new MMgrOpen();
+  if (service_name.empty()) {
+    // No service name set: identify ourselves by the configured entity id.
+    open_msg->daemon_name = cct->_conf->name.get_id();
+  } else {
+    open_msg->service_name = service_name;
+    open_msg->daemon_name = daemon_name;
+  }
+  if (service_daemon) {
+    open_msg->service_daemon = service_daemon;
+    open_msg->daemon_metadata = daemon_metadata;
+  }
+  session->con->send_message(open_msg);
+}
+
bool MgrClient::handle_mgr_map(MMgrMap *m)
{
assert(lock.is_locked_by_me());
return false;
}
-void MgrClient::send_report()
+// Send one round of stats (the report first, then PG stats) and, while
+// stats_period is non-zero, schedule the next round on the timer.
+void MgrClient::_send_stats()
+{
+  _send_report();
+  _send_pgstats();
+
+  if (stats_period == 0) {
+    // A zero period means no follow-up round is scheduled.
+    return;
+  }
+
+  auto next_round = new FunctionContext([this](int) { _send_stats(); });
+  report_callback = timer.add_event_after(stats_period, next_round);
+}
+
+void MgrClient::_send_report()
{
assert(lock.is_locked_by_me());
assert(session);
pcc->with_counters([this, report](
const PerfCountersCollection::CounterMap &by_path)
{
+ // Helper for checking whether a counter should be included
+ auto include_counter = [this](
+ const PerfCounters::perf_counter_data_any_d &ctr,
+ const PerfCounters &perf_counters)
+ {
+ return perf_counters.get_adjusted_priority(ctr.prio) >= (int)stats_threshold;
+ };
+
+ // Helper for cases where we want to forget a counter
+ auto undeclare = [report, this](const std::string &path)
+ {
+ report->undeclare_types.push_back(path);
+ ldout(cct,20) << " undeclare " << path << dendl;
+ session->declared.erase(path);
+ };
+
ENCODE_START(1, 1, report->packed);
+
+ // Find counters that no longer exist, and undeclare them
for (auto p = session->declared.begin(); p != session->declared.end(); ) {
- if (by_path.count(*p) == 0) {
- report->undeclare_types.push_back(*p);
- ldout(cct,20) << __func__ << " undeclare " << *p << dendl;
- p = session->declared.erase(p);
- } else {
- ++p;
+ const auto &path = *(p++);
+ if (by_path.count(path) == 0) {
+ undeclare(path);
}
}
+
for (const auto &i : by_path) {
auto& path = i.first;
- auto& data = *(i.second);
+ auto& data = *(i.second.data);
+ auto& perf_counters = *(i.second.perf_counters);
+
+ // Find counters that still exist, but are no longer permitted by
+ // stats_threshold
+ if (!include_counter(data, perf_counters)) {
+ if (session->declared.count(path)) {
+ undeclare(path);
+ }
+ continue;
+ }
if (session->declared.count(path) == 0) {
- ldout(cct,20) << __func__ << " declare " << path << dendl;
+ ldout(cct,20) << " declare " << path << dendl;
PerfCounterType type;
type.path = path;
if (data.description) {
type.nick = data.nick;
}
type.type = data.type;
+ type.priority = perf_counters.get_adjusted_priority(data.prio);
+ type.unit = data.unit;
report->declare_types.push_back(std::move(type));
session->declared.insert(path);
}
- ::encode(static_cast<uint64_t>(data.u64.read()), report->packed);
+ ::encode(static_cast<uint64_t>(data.u64), report->packed);
if (data.type & PERFCOUNTER_LONGRUNAVG) {
- ::encode(static_cast<uint64_t>(data.avgcount.read()), report->packed);
- ::encode(static_cast<uint64_t>(data.avgcount2.read()), report->packed);
+ ::encode(static_cast<uint64_t>(data.avgcount), report->packed);
+ ::encode(static_cast<uint64_t>(data.avgcount2), report->packed);
}
}
ENCODE_FINISH(report->packed);
- ldout(cct, 20) << by_path.size() << " counters, of which "
- << report->declare_types.size() << " new" << dendl;
+ ldout(cct, 20) << "sending " << session->declared.size() << " counters ("
+ "of possible " << by_path.size() << "), "
+ << report->declare_types.size() << " new, "
+ << report->undeclare_types.size() << " removed"
+ << dendl;
});
ldout(cct, 20) << "encoded " << report->packed.length() << " bytes" << dendl;
- report->daemon_name = g_conf->name.get_id();
+ if (daemon_name.size()) {
+ report->daemon_name = daemon_name;
+ } else {
+ report->daemon_name = cct->_conf->name.get_id();
+ }
+ report->service_name = service_name;
+
+ if (daemon_dirty_status) {
+ report->daemon_status = daemon_status;
+ daemon_dirty_status = false;
+ }
+ report->osd_health_metrics = std::move(osd_health_metrics);
session->con->send_message(report);
+}
- if (stats_period != 0) {
- report_callback = new FunctionContext([this](int r){send_report();});
- timer.add_event_after(stats_period, report_callback);
- }
+// Locking entry point: takes `lock` for the duration of the call and
+// delegates to _send_pgstats().
+void MgrClient::send_pgstats()
+{
+  Mutex::Locker locker(lock);
+  _send_pgstats();
+}
- if (pgstats_cb) {
- MPGStats *m_stats = pgstats_cb();
- session->con->send_message(m_stats);
+void MgrClient::_send_pgstats()  // Send the message returned by pgstats_cb over the session connection.
+{
+ if (pgstats_cb && session) {  // nothing is sent when no callback is set or no session exists
+ session->con->send_message(pgstats_cb());
}
}
ldout(cct, 4) << "stats_period=" << m->stats_period << dendl;
+ if (stats_threshold != m->stats_threshold) {
+ ldout(cct, 4) << "updated stats threshold: " << m->stats_threshold << dendl;
+ stats_threshold = m->stats_threshold;
+ }
+
bool starting = (stats_period == 0) && (m->stats_period != 0);
stats_period = m->stats_period;
if (starting) {
- send_report();
+ _send_stats();
}
m->put();
return true;
}
+// Register this process with the mgr as a named service daemon.
+//
+// service:  service type to advertise; the built-in ceph entity types
+//           (osd, mds, client, mon, mgr) are rejected
+// name:     daemon instance name within that service
+// metadata: key/value metadata sent along with the open message
+//
+// Returns 0 on success, -EINVAL for a reserved service type, or -EEXIST
+// if a service daemon has already been registered on this client.
+int MgrClient::service_daemon_register(
+  const std::string& service,
+  const std::string& name,
+  const std::map<std::string,std::string>& metadata)
+{
+  Mutex::Locker l(lock);
+  // The reserved words are entity *types*, so they have to be compared
+  // against the service type, not against the daemon instance name.
+  if (service == "osd" ||
+      service == "mds" ||
+      service == "client" ||
+      service == "mon" ||
+      service == "mgr") {
+    // normal ceph entity types are not allowed!
+    return -EINVAL;
+  }
+  if (service_daemon) {
+    return -EEXIST;
+  }
+  ldout(cct,1) << service << "." << name << " metadata " << metadata << dendl;
+  service_daemon = true;
+  service_name = service;
+  daemon_name = name;
+  daemon_metadata = metadata;
+  // Make the next report carry the daemon status.
+  daemon_dirty_status = true;
+
+  // late register?  A client entity with a live session sends the open
+  // now that the service identity is known.
+  if (cct->_conf->name.is_client() && session && session->con) {
+    _send_open();
+  }
+
+  return 0;
+}
+
+// Replace the stored daemon status map under `lock` and flag it dirty so
+// that the next report includes it.  Always returns 0.
+int MgrClient::service_daemon_update_status(
+  const std::map<std::string,std::string>& status)
+{
+  Mutex::Locker locker(lock);
+  ldout(cct,10) << status << dendl;
+  daemon_dirty_status = true;
+  daemon_status = status;
+  return 0;
+}
+
+// Take ownership of the caller's OSD health metrics, replacing the
+// previously stored set while holding `lock`.
+void MgrClient::update_osd_health(std::vector<OSDHealthMetric>&& metrics)
+{
+  Mutex::Locker locker(lock);
+  osd_health_metrics = std::move(metrics);
+}