#include "messages/MCommandReply.h"
#include "messages/MPGStats.h"
#include "messages/MOSDScrub.h"
+#include "common/errno.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mgr
msgr->start();
msgr->add_dispatcher_tail(this);
+ started_at = ceph_clock_now();
+
return 0;
}
if (peer_type == CEPH_ENTITY_TYPE_OSD) {
Mutex::Locker l(lock);
s->osd_id = atoi(s->entity_name.get_id().c_str());
- dout(10) << __func__ << " registering osd." << s->osd_id << " session "
+ dout(10) << "registering osd." << s->osd_id << " session "
<< s << " con " << con << dendl;
osd_cons[s->osd_id].insert(con);
}
}
session->put(); // SessionRef takes a ref
Mutex::Locker l(lock);
- dout(10) << __func__ << " unregistering osd." << session->osd_id
+ dout(10) << "unregistering osd." << session->osd_id
<< " session " << session << " con " << con << dendl;
osd_cons[session->osd_id].erase(con);
}
switch (m->get_type()) {
case MSG_PGSTATS:
cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
+ maybe_ready(m->get_source().num());
m->put();
return true;
case MSG_MGR_REPORT:
};
}
+// Track the first PGStats report seen from each OSD during mgr startup.
+// Until every OSD that the current OSDMap says is up has reported at
+// least once, pgmap_ready stays false and send_report() will hold back
+// PG state from the mon (so we never publish a wildly incomplete PGMap).
+void DaemonServer::maybe_ready(int32_t osd_id)
+{
+  if (!pgmap_ready && reported_osds.find(osd_id) == reported_osds.end()) {
+    dout(4) << "initial report from osd " << osd_id << dendl;
+    reported_osds.insert(osd_id);
+    std::set<int32_t> up_osds;
+
+    // Snapshot the set of currently-up OSDs from the OSDMap.
+    cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+      osdmap.get_up_osds(up_osds);
+    });
+
+    // OSDs that are up but have not yet sent us their first report.
+    std::set<int32_t> unreported_osds;
+    std::set_difference(up_osds.begin(), up_osds.end(),
+                        reported_osds.begin(), reported_osds.end(),
+                        std::inserter(unreported_osds, unreported_osds.begin()));
+
+    if (unreported_osds.size() == 0) {
+      dout(4) << "all osds have reported, sending PG state to mon" << dendl;
+      pgmap_ready = true;
+      reported_osds.clear();  // no longer needed once ready
+      // Avoid waiting for next tick
+      send_report();
+    } else {
+      dout(4) << "still waiting for " << unreported_osds.size() << " osds"
+                 " to report in before PGMap is ready" << dendl;
+    }
+  }
+}
+
// Shut down the DaemonServer's messenger and block until it has drained.
void DaemonServer::shutdown()
{
-  dout(10) << __func__ << dendl;
+  dout(10) << "begin" << dendl;
msgr->shutdown();
msgr->wait();
-  dout(10) << __func__ << " done" << dendl;
+  dout(10) << "done" << dendl;
}
// Handle an MMgrOpen: a daemon announcing itself to the mgr.  The daemon
// key is (service name, daemon name); if the sender did not declare a
// service name we fall back to the entity type of the connection.  Replies
// with an MMgrConfigure carrying the stats reporting period, clears any
// stale perf counters for a reconnecting daemon, and — for self-declared
// service daemons — records the daemon in the pending service map.
// Always consumes (puts) the message and returns true (handled).
bool DaemonServer::handle_open(MMgrOpen *m)
{
-  uint32_t type = m->get_connection()->get_peer_type();
-  DaemonKey key(type, m->daemon_name);
+  DaemonKey key;
+  if (!m->service_name.empty()) {
+    // Explicitly declared service (e.g. a client acting as a daemon).
+    key.first = m->service_name;
+  } else {
+    // Fall back to the connection's entity type (osd, mds, ...).
+    key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
+  }
+  key.second = m->daemon_name;
-  dout(4) << "from " << m->get_connection() << " name "
-          << ceph_entity_type_name(type) << "." << m->daemon_name << dendl;
+  dout(4) << "from " << m->get_connection() << " " << key << dendl;
auto configure = new MMgrConfigure();
-  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT) {
-    // We don't want clients to send us stats
-    configure->stats_period = 0;
-  } else {
-    configure->stats_period = g_conf->mgr_stats_period;
-  }
+  // All senders (including clients, which may now be service daemons)
+  // get the configured stats period.
+  configure->stats_period = g_conf->mgr_stats_period;
m->get_connection()->send_message(configure);
// A reconnecting daemon's old counter instances are stale; drop them.
if (daemon_state.exists(key)) {
daemon_state.get(key)->perf_counters.clear();
}
+  if (m->service_daemon) {
+    // Sender declared itself a service daemon: ensure DaemonState exists
+    // and is up to date, then register it in the pending service map.
+    DaemonStatePtr daemon;
+    if (daemon_state.exists(key)) {
+      daemon = daemon_state.get(key);
+    } else {
+      dout(4) << "constructing new DaemonState for " << key << dendl;
+      daemon = std::make_shared<DaemonState>(daemon_state.types);
+      daemon->key = key;
+      if (m->daemon_metadata.count("hostname")) {
+        daemon->hostname = m->daemon_metadata["hostname"];
+      }
+      daemon_state.insert(daemon);
+    }
+    daemon->service_daemon = true;
+    daemon->metadata = m->daemon_metadata;
+    daemon->service_status = m->daemon_status;
+
+    utime_t now = ceph_clock_now();
+    auto d = pending_service_map.get_daemon(m->service_name,
+                                            m->daemon_name);
+    // A gid mismatch means this is a new incarnation of the daemon
+    // (restart or first registration); refresh its service-map entry.
+    if (d->gid != (uint64_t)m->get_source().num()) {
+      dout(10) << "registering " << key << " in pending_service_map" << dendl;
+      d->gid = m->get_source().num();
+      d->addr = m->get_source_addr();
+      d->start_epoch = pending_service_map.epoch;
+      d->start_stamp = now;
+      d->metadata = m->daemon_metadata;
+      // Mark the map dirty so send_report() publishes it.
+      pending_service_map_dirty = pending_service_map.epoch;
+    }
+  }
+
m->put();
return true;
}
// Handle an MMgrReport: periodic perf-counter (and, for service daemons,
// status) data from a daemon.  The key mirrors handle_open(): declared
// service name first, else the connection's entity type.  Reports from
// plain clients that have not declared a service are rejected.  Updates
// the daemon's perf counters, status, and service beacon timestamp, then
// triggers maybe_ready-style bookkeeping via the DaemonState.  Always
// consumes the message and returns true (handled).
bool DaemonServer::handle_report(MMgrReport *m)
{
-  uint32_t type = m->get_connection()->get_peer_type();
-  DaemonKey key(type, m->daemon_name);
+  DaemonKey key;
+  if (!m->service_name.empty()) {
+    key.first = m->service_name;
+  } else {
+    key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
+  }
+  key.second = m->daemon_name;
-  dout(4) << "from " << m->get_connection() << " name "
-          << ceph_entity_type_name(type) << "." << m->daemon_name << dendl;
+  dout(4) << "from " << m->get_connection() << " " << key << dendl;
-  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT) {
-    // Clients should not be sending us stats
-    dout(4) << "rejecting report from client " << m->daemon_name << dendl;
+  if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
+      m->service_name.empty()) {
+    // Clients should not be sending us stats unless they are declaring
+    // themselves to be a daemon for some service.
+    dout(4) << "rejecting report from non-daemon client " << m->daemon_name
+            << dendl;
m->put();
return true;
}
DaemonStatePtr daemon;
if (daemon_state.exists(key)) {
-    dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
+    dout(20) << "updating existing DaemonState for " << key << dendl;
daemon = daemon_state.get(key);
} else {
// First report from a daemon we have no session/open record for.
-    dout(4) << "constructing new DaemonState for " << m->daemon_name << dendl;
+    dout(4) << "constructing new DaemonState for " << key << dendl;
daemon = std::make_shared<DaemonState>(daemon_state.types);
// FIXME: crap, we don't know the hostname at this stage.
daemon->key = key;
daemon_state.insert(daemon);
-    // FIXME: we should request metadata at this stage
+    // FIXME: we should avoid this case by rejecting MMgrReport from
+    // daemons without sessions, and ensuring that session open
+    // always contains metadata.
}
-
assert(daemon != nullptr);
auto &daemon_counters = daemon->perf_counters;
daemon_counters.update(m);
-
+
+  if (daemon->service_daemon) {
+    // Service daemons may piggyback a status map on the report; the
+    // report itself doubles as the liveness beacon (see
+    // _prune_pending_service_map).
+    utime_t now = ceph_clock_now();
+    if (m->daemon_status) {
+      daemon->service_status = *m->daemon_status;
+      daemon->service_status_stamp = now;
+    }
+    daemon->last_service_beacon = now;
+  } else if (m->daemon_status) {
+    derr << "got status from non-daemon " << key << dendl;
+  }
+
m->put();
return true;
}
con->mark_disposable();
}
- dout(1) << "do_command r=" << r << " " << rs << dendl;
+ dout(1) << "handle_command " << cpp_strerror(r) << " " << rs << dendl;
if (con) {
MCommandReply *reply = new MCommandReply(r, rs);
reply->set_tid(m->get_tid());
<< "entity='" << session->entity_name << "' "
<< "cmd=" << m->cmd << ": dispatch";
+ // ----------------
+ // service map commands
+ if (prefix == "service dump") {
+ if (!f)
+ f.reset(Formatter::create("json-pretty"));
+ cluster_state.with_servicemap([&](const ServiceMap &service_map) {
+ f->dump_object("service_map", service_map);
+ });
+ f->flush(cmdctx->odata);
+ cmdctx->reply(0, ss);
+ return true;
+ }
+ if (prefix == "service status") {
+ if (!f)
+ f.reset(Formatter::create("json-pretty"));
+ // only include state from services that are in the persisted service map
+ f->open_object_section("service_status");
+ ServiceMap s;
+ cluster_state.with_servicemap([&](const ServiceMap& service_map) {
+ s = service_map;
+ });
+ for (auto& p : s.services) {
+ f->open_object_section(p.first.c_str());
+ for (auto& q : p.second.daemons) {
+ f->open_object_section(q.first.c_str());
+ DaemonKey key(p.first, q.first);
+ assert(daemon_state.exists(key));
+ auto daemon = daemon_state.get(key);
+ f->dump_stream("status_stamp") << daemon->service_status_stamp;
+ f->dump_stream("last_beacon") << daemon->last_service_beacon;
+ f->open_object_section("status");
+ for (auto& r : daemon->service_status) {
+ f->dump_string(r.first.c_str(), r.second);
+ }
+ f->close_section();
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ f->flush(cmdctx->odata);
+ cmdctx->reply(0, ss);
+ return true;
+ }
+
// -----------
// PG commands
get_str_vec(prefix, pvec);
set<int> osds;
- if (whostr == "*") {
+ if (whostr == "*" || whostr == "all" || whostr == "any") {
cluster_state.with_osdmap([&](const OSDMap& osdmap) {
for (int i = 0; i < osdmap.get_max_osd(); i++)
if (osdmap.is_up(i)) {
}
}
+// Drop service-daemon entries from pending_service_map whose last beacon
+// (set in handle_report) is older than mgr_service_beacon_grace, and drop
+// services that end up with no daemons.  Any removal marks the map dirty
+// so send_report() will publish the updated epoch.  Called from
+// send_report() under the same serialization as the beacon updates.
+void DaemonServer::_prune_pending_service_map()
+{
+  utime_t cutoff = ceph_clock_now();
+  cutoff -= g_conf->mgr_service_beacon_grace;
+  auto p = pending_service_map.services.begin();
+  while (p != pending_service_map.services.end()) {
+    auto q = p->second.daemons.begin();
+    while (q != p->second.daemons.end()) {
+      DaemonKey key(p->first, q->first);
+      if (!daemon_state.exists(key)) {
+        // Map entry with no DaemonState -- unexpected; keep it and log.
+        derr << "missing key " << key << dendl;
+        ++q;
+        continue;
+      }
+      auto daemon = daemon_state.get(key);
+      if (daemon->last_service_beacon == utime_t()) {
+        // we must have just restarted; assume they are alive now.
+        daemon->last_service_beacon = ceph_clock_now();
+        ++q;
+        continue;
+      }
+      if (daemon->last_service_beacon < cutoff) {
+        dout(10) << "pruning stale " << p->first << "." << q->first
+                 << " last_beacon " << daemon->last_service_beacon << dendl;
+        q = p->second.daemons.erase(q);
+        pending_service_map_dirty = pending_service_map.epoch;
+      } else {
+        ++q;
+      }
+    }
+    if (p->second.daemons.empty()) {
+      // Remove services left with no daemons at all.
+      p = pending_service_map.services.erase(p);
+      pending_service_map_dirty = pending_service_map.epoch;
+    } else {
+      ++p;
+    }
+  }
+}
+
// Build and send an MMonMgrReport to the mon: the PGMap digest, the
// current health checks, and (when dirty) the pending service map.
// Holds off entirely until pgmap_ready (see maybe_ready), but gives up
// waiting for unreported OSDs after 4x the stats period and sends a
// possibly incomplete PG state rather than stalling forever.
void DaemonServer::send_report()
{
+  if (!pgmap_ready) {
+    if (ceph_clock_now() - started_at > g_conf->mgr_stats_period * 4.0) {
+      // Startup grace period expired: proceed with what we have.
+      pgmap_ready = true;
+      reported_osds.clear();
+      dout(1) << "Giving up on OSDs that haven't reported yet, sending "
+              << "potentially incomplete PG state to mon" << dendl;
+    } else {
+      dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
+              << dendl;
+      return;
+    }
+  }
+
auto m = new MMonMgrReport();
cluster_state.with_pgmap([&](const PGMap& pg_map) {
cluster_state.update_delta_stats();
-  // FIXME: reporting health detail here might be a bad idea?
+    if (pending_service_map.epoch) {
+      // Prune stale daemons first so we never publish dead entries.
+      _prune_pending_service_map();
+      if (pending_service_map_dirty >= pending_service_map.epoch) {
+        pending_service_map.modified = ceph_clock_now();
+        ::encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
+        dout(10) << "sending service_map e" << pending_service_map.epoch
+                 << dendl;
+        pending_service_map.epoch++;
+      }
+    }
+
cluster_state.with_osdmap([&](const OSDMap& osdmap) {
// FIXME: no easy way to get mon features here. this will do for
// now, though, as long as we don't make a backward-incompat change.
pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
dout(10) << pg_map << dendl;
-      pg_map.get_health(g_ceph_context, osdmap,
-                        m->health_summary,
-                        &m->health_detail);
+
+      pg_map.get_health_checks(g_ceph_context, osdmap,
+                               &m->health_checks);
+      dout(10) << m->health_checks.checks.size() << " health checks"
+               << dendl;
+      // Dump the full health-check set at high debug level.
+      dout(20) << "health checks:\n";
+      JSONFormatter jf(true);
+      jf.dump_object("health_checks", m->health_checks);
+      jf.flush(*_dout);
+      *_dout << dendl;
});
});
// TODO? We currently do not notify the PyModules.
// TODO: only send the report when asked to do so, or when the state
// has actually been updated.
monc->send_mon_message(m);
}
+
+// Called when a new ServiceMap arrives from the mon.  Seeds (on first
+// receipt) or re-bases pending_service_map at epoch+1, then reconciles
+// daemon_state with the persisted map: creates DaemonState for daemons
+// present in the map but unknown locally, and culls local entries for
+// daemons the map no longer lists.
+void DaemonServer::got_service_map()
+{
+  Mutex::Locker l(lock);
+
+  cluster_state.with_servicemap([&](const ServiceMap& service_map) {
+    if (pending_service_map.epoch == 0) {
+      // we just started up
+      dout(10) << "got initial map e" << service_map.epoch << dendl;
+      pending_service_map = service_map;
+    } else {
+      // we were already active and therefore must have persisted it,
+      // which means ours is the same or newer.
+      dout(10) << "got updated map e" << service_map.epoch << dendl;
+    }
+    // Next map we publish will supersede the one the mon just gave us.
+    pending_service_map.epoch = service_map.epoch + 1;
+  });
+
+  // cull missing daemons, populate new ones
+  for (auto& p : pending_service_map.services) {
+    std::set<std::string> names;
+    for (auto& q : p.second.daemons) {
+      names.insert(q.first);
+      DaemonKey key(p.first, q.first);
+      if (!daemon_state.exists(key)) {
+        // Daemon known to the persisted map but not to us (e.g. mgr
+        // restart): reconstruct its DaemonState from map metadata.
+        auto daemon = std::make_shared<DaemonState>(daemon_state.types);
+        daemon->key = key;
+        daemon->metadata = q.second.metadata;
+        if (q.second.metadata.count("hostname")) {
+          daemon->hostname = q.second.metadata["hostname"];
+        }
+        daemon->service_daemon = true;
+        daemon_state.insert(daemon);
+        dout(10) << "added missing " << key << dendl;
+      }
+    }
+    // Drop local state for daemons no longer in this service's map entry.
+    daemon_state.cull(p.first, names);
+  }
+}