*/
#include "DaemonServer.h"
+#include "mgr/Mgr.h"
+#include "include/stringify.h"
#include "include/str_list.h"
#include "auth/RotatingKeyRing.h"
#include "json_spirit/json_spirit_writer.h"
#include "mgr/mgr_commands.h"
+#include "mgr/OSDHealthMetricCollector.h"
#include "mon/MonCommand.h"
#include "messages/MMgrOpen.h"
Finisher &finisher_,
DaemonStateIndex &daemon_state_,
ClusterState &cluster_state_,
- PyModules &py_modules_,
+ PyModuleRegistry &py_modules_,
LogChannelRef clog_,
LogChannelRef audit_clog_)
: Dispatcher(g_ceph_context),
client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes",
- g_conf->mgr_client_bytes)),
+ g_conf->get_val<uint64_t>("mgr_client_bytes"))),
client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages",
- g_conf->mgr_client_messages)),
+ g_conf->get_val<uint64_t>("mgr_client_messages"))),
osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes",
- g_conf->mgr_osd_bytes)),
+ g_conf->get_val<uint64_t>("mgr_osd_bytes"))),
osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages",
- g_conf->mgr_osd_messages)),
+ g_conf->get_val<uint64_t>("mgr_osd_messages"))),
mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes",
- g_conf->mgr_mds_bytes)),
+ g_conf->get_val<uint64_t>("mgr_mds_bytes"))),
mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages",
- g_conf->mgr_mds_messages)),
+ g_conf->get_val<uint64_t>("mgr_mds_messages"))),
mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes",
- g_conf->mgr_mon_bytes)),
+ g_conf->get_val<uint64_t>("mgr_mon_bytes"))),
mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages",
- g_conf->mgr_mon_messages)),
+ g_conf->get_val<uint64_t>("mgr_mon_messages"))),
msgr(nullptr),
monc(monc_),
finisher(finisher_),
py_modules(py_modules_),
clog(clog_),
audit_clog(audit_clog_),
- auth_registry(g_ceph_context,
+ auth_cluster_registry(g_ceph_context,
g_conf->auth_supported.empty() ?
g_conf->auth_cluster_required :
g_conf->auth_supported),
- lock("DaemonServer")
-{}
+ auth_service_registry(g_ceph_context,
+ g_conf->auth_supported.empty() ?
+ g_conf->auth_service_required :
+ g_conf->auth_supported),
+ lock("DaemonServer"),
+ pgmap_ready(false)
+{
+ g_conf->add_observer(this);
+}
+// Destructor: deletes msgr and unregisters this object as a config observer.
DaemonServer::~DaemonServer() {
delete msgr;
+ // Counterpart of the g_conf->add_observer(this) call in the constructor.
+ g_conf->remove_observer(this);
}
int DaemonServer::init(uint64_t gid, entity_addr_t client_addr)
bool& is_valid,
CryptoKey& session_key)
{
- auto handler = auth_registry.get_handler(protocol);
+ AuthAuthorizeHandler *handler = nullptr;
+ if (peer_type == CEPH_ENTITY_TYPE_OSD ||
+ peer_type == CEPH_ENTITY_TYPE_MON ||
+ peer_type == CEPH_ENTITY_TYPE_MDS ||
+ peer_type == CEPH_ENTITY_TYPE_MGR) {
+ handler = auth_cluster_registry.get_handler(protocol);
+ } else {
+ handler = auth_service_registry.get_handler(protocol);
+ }
if (!handler) {
dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
is_valid = false;
dout(10) << "unregistering osd." << session->osd_id
<< " session " << session << " con " << con << dendl;
osd_cons[session->osd_id].erase(con);
+
+ auto iter = daemon_connections.find(con);
+ if (iter != daemon_connections.end()) {
+ daemon_connections.erase(iter);
+ }
}
return false;
}
bool DaemonServer::ms_dispatch(Message *m)
{
- Mutex::Locker l(lock);
-
+ // Note that we do *not* take ::lock here, in order to avoid
+ // serializing all message handling. It's up to each handler
+ // to take whatever locks it needs.
switch (m->get_type()) {
case MSG_PGSTATS:
cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
+// Note that OSD `osd_id` has reported in. While pgmap_ready is unset, the
+// OSD is added to reported_osds; once every OSD returned by
+// OSDMap::get_up_osds() is in that set, pgmap_ready is set, reported_osds
+// is cleared and send_report() is called immediately.
void DaemonServer::maybe_ready(int32_t osd_id)
{
- if (!pgmap_ready && reported_osds.find(osd_id) == reported_osds.end()) {
- dout(4) << "initial report from osd " << osd_id << dendl;
- reported_osds.insert(osd_id);
- std::set<int32_t> up_osds;
+ if (pgmap_ready.load()) {
+ // Fast path: we don't need to take lock because pgmap_ready
+ // is already set
+ } else {
+ Mutex::Locker l(lock);
+ // NOTE(review): pgmap_ready is not re-checked after the lock is taken,
+ // so if it was set in between, osd_id is still inserted below -- confirm
+ // that this is harmless.
- cluster_state.with_osdmap([&](const OSDMap& osdmap) {
- osdmap.get_up_osds(up_osds);
- });
+ if (reported_osds.find(osd_id) == reported_osds.end()) {
+ dout(4) << "initial report from osd " << osd_id << dendl;
+ reported_osds.insert(osd_id);
+ std::set<int32_t> up_osds;
- std::set<int32_t> unreported_osds;
- std::set_difference(up_osds.begin(), up_osds.end(),
- reported_osds.begin(), reported_osds.end(),
- std::inserter(unreported_osds, unreported_osds.begin()));
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ osdmap.get_up_osds(up_osds);
+ });
- if (unreported_osds.size() == 0) {
- dout(4) << "all osds have reported, sending PG state to mon" << dendl;
- pgmap_ready = true;
- reported_osds.clear();
- // Avoid waiting for next tick
- send_report();
- } else {
- dout(4) << "still waiting for " << unreported_osds.size() << " osds"
- " to report in before PGMap is ready" << dendl;
+ // unreported_osds = up_osds minus reported_osds
+ std::set<int32_t> unreported_osds;
+ std::set_difference(up_osds.begin(), up_osds.end(),
+ reported_osds.begin(), reported_osds.end(),
+ std::inserter(unreported_osds, unreported_osds.begin()));
+
+ if (unreported_osds.size() == 0) {
+ dout(4) << "all osds have reported, sending PG state to mon" << dendl;
+ pgmap_ready = true;
+ reported_osds.clear();
+ // Avoid waiting for next tick
+ send_report();
+ } else {
+ dout(4) << "still waiting for " << unreported_osds.size() << " osds"
+ " to report in before PGMap is ready" << dendl;
+ }
}
}
}
bool DaemonServer::handle_open(MMgrOpen *m)
{
+ Mutex::Locker l(lock);
+
DaemonKey key;
if (!m->service_name.empty()) {
key.first = m->service_name;
dout(4) << "from " << m->get_connection() << " " << key << dendl;
- auto configure = new MMgrConfigure();
- configure->stats_period = g_conf->mgr_stats_period;
- m->get_connection()->send_message(configure);
+ _send_configure(m->get_connection());
DaemonStatePtr daemon;
if (daemon_state.exists(key)) {
if (daemon) {
dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
Mutex::Locker l(daemon->lock);
- daemon_state.get(key)->perf_counters.clear();
+ daemon->perf_counters.clear();
}
if (m->service_daemon) {
}
}
+ if (m->get_connection()->get_peer_type() != entity_name_t::TYPE_CLIENT &&
+ m->service_name.empty())
+ {
+ // Store in set of the daemon/service connections, i.e. those
+ // connections that require an update in the event of stats
+ // configuration changes.
+ daemon_connections.insert(m->get_connection());
+ }
+
m->put();
return true;
}
// themselves to be a daemon for some service.
dout(4) << "rejecting report from non-daemon client " << m->daemon_name
<< dendl;
+ m->get_connection()->mark_down();
m->put();
return true;
}
+ // Look up the DaemonState
DaemonStatePtr daemon;
if (daemon_state.exists(key)) {
dout(20) << "updating existing DaemonState for " << key << dendl;
daemon = daemon_state.get(key);
} else {
- dout(4) << "constructing new DaemonState for " << key << dendl;
- daemon = std::make_shared<DaemonState>(daemon_state.types);
- // FIXME: crap, we don't know the hostname at this stage.
- daemon->key = key;
- daemon_state.insert(daemon);
- // FIXME: we should avoid this case by rejecting MMgrReport from
- // daemons without sessions, and ensuring that session open
- // always contains metadata.
+ // we don't know the hostname at this stage, reject MMgrReport here.
+ dout(5) << "rejecting report from " << key << ", since we do not have its metadata now."
+ << dendl;
+
+ // issue metadata request in background
+ if (!daemon_state.is_updating(key) &&
+ (key.first == "osd" || key.first == "mds")) {
+
+ std::ostringstream oss;
+ auto c = new MetadataUpdate(daemon_state, key);
+ if (key.first == "osd") {
+ oss << "{\"prefix\": \"osd metadata\", \"id\": "
+ << key.second<< "}";
+
+ } else if (key.first == "mds") {
+ c->set_default("addr", stringify(m->get_source_addr()));
+ oss << "{\"prefix\": \"mds metadata\", \"who\": \""
+ << key.second << "\"}";
+
+ } else {
+ ceph_abort();
+ }
+
+ monc->start_mon_command({oss.str()}, {}, &c->outbl, &c->outs, c);
+ }
+
+ {
+ Mutex::Locker l(lock);
+ // kill session
+ MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
+ if (!session) {
+ return false;
+ }
+ m->get_connection()->mark_down();
+ session->put();
+
+ dout(10) << "unregistering osd." << session->osd_id
+ << " session " << session << " con " << m->get_connection() << dendl;
+
+ if (osd_cons.find(session->osd_id) != osd_cons.end()) {
+ osd_cons[session->osd_id].erase(m->get_connection());
+ }
+
+ auto iter = daemon_connections.find(m->get_connection());
+ if (iter != daemon_connections.end()) {
+ daemon_connections.erase(iter);
+ }
+ }
+
+ return false;
}
+
+ // Update the DaemonState
assert(daemon != nullptr);
- auto &daemon_counters = daemon->perf_counters;
{
Mutex::Locker l(daemon->lock);
+ auto &daemon_counters = daemon->perf_counters;
daemon_counters.update(m);
+
+ if (daemon->service_daemon) {
+ utime_t now = ceph_clock_now();
+ if (m->daemon_status) {
+ daemon->service_status = *m->daemon_status;
+ daemon->service_status_stamp = now;
+ }
+ daemon->last_service_beacon = now;
+ } else if (m->daemon_status) {
+ derr << "got status from non-daemon " << key << dendl;
+ }
+ if (m->get_connection()->peer_is_osd()) {
+ // only OSD sends health_checks to me now
+ daemon->osd_health_metrics = std::move(m->osd_health_metrics);
+ }
}
+
// if there are any schema updates, notify the python modules
if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
ostringstream oss;
py_modules.notify_all("perf_schema_update", oss.str());
}
- if (daemon->service_daemon) {
- utime_t now = ceph_clock_now();
- if (m->daemon_status) {
- daemon->service_status = *m->daemon_status;
- daemon->service_status_stamp = now;
- }
- daemon->last_service_beacon = now;
- } else if (m->daemon_status) {
- derr << "got status from non-daemon " << key << dendl;
- }
-
m->put();
return true;
}
bool DaemonServer::handle_command(MCommand *m)
{
+ Mutex::Locker l(lock);
int r = 0;
std::stringstream ss;
std::string prefix;
return true;
}
+ // "config set": set one option on this daemon's own configuration, using
+ // the "key" and "value" arguments of the command.
+ if (prefix == "config set") {
+ std::string key;
+ std::string val;
+ cmd_getval(cct, cmdctx->cmdmap, "key", key);
+ cmd_getval(cct, cmdctx->cmdmap, "value", val);
+ r = cct->_conf->set_val(key, val, true, &ss);
+ // Changes are applied only when set_val() reported success.
+ if (r == 0) {
+ cct->_conf->apply_changes(nullptr);
+ }
+ // NOTE(review): the reply code is always 0, so a failed set_val()
+ // (r != 0) is only visible through the text in ss -- confirm this is
+ // intended rather than reply(r, ss).
+ cmdctx->reply(0, ss);
+ return true;
+ }
+
// -----------
// PG commands
});
cmdctx->reply(r, "");
return true;
+ } else if (prefix == "osd safe-to-destroy") {
+ vector<string> ids;
+ cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
+ set<int> osds;
+ int r;
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ r = osdmap.parse_osd_id_list(ids, &osds, &ss);
+ });
+ if (!r && osds.empty()) {
+ ss << "must specify one or more OSDs";
+ r = -EINVAL;
+ }
+ if (r < 0) {
+ cmdctx->reply(r, ss);
+ return true;
+ }
+ set<int> active_osds, missing_stats, stored_pgs;
+ int affected_pgs = 0;
+ cluster_state.with_pgmap([&](const PGMap& pg_map) {
+ if (pg_map.num_pg_unknown > 0) {
+ ss << pg_map.num_pg_unknown << " pgs have unknown state; cannot draw"
+ << " any conclusions";
+ r = -EAGAIN;
+ return;
+ }
+ int num_active_clean = 0;
+ for (auto& p : pg_map.num_pg_by_state) {
+ unsigned want = PG_STATE_ACTIVE|PG_STATE_CLEAN;
+ if ((p.first & want) == want) {
+ num_active_clean += p.second;
+ }
+ }
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ for (auto osd : osds) {
+ if (!osdmap.exists(osd)) {
+ continue; // clearly safe to destroy
+ }
+ auto q = pg_map.num_pg_by_osd.find(osd);
+ if (q != pg_map.num_pg_by_osd.end()) {
+ if (q->second.acting > 0 || q->second.up > 0) {
+ active_osds.insert(osd);
+ affected_pgs += q->second.acting + q->second.up;
+ continue;
+ }
+ }
+ if (num_active_clean < pg_map.num_pg) {
+   // all pgs aren't active+clean; we need to be careful.
+   auto p = pg_map.osd_stat.find(osd);
+   if (p == pg_map.osd_stat.end()) {
+     // No stats were reported for this OSD. `p` is the end iterator
+     // here, so it must not be dereferenced: only record the OSD.
+     missing_stats.insert(osd);
+   } else if (p->second.num_pgs > 0) {
+     // The OSD last reported that it still stores PG data.
+     stored_pgs.insert(osd);
+   }
+ }
+ }
+ });
+ });
+ if (!r && !active_osds.empty()) {
+ ss << "OSD(s) " << active_osds << " have " << affected_pgs
+ << " pgs currently mapped to them";
+ r = -EBUSY;
+ } else if (!missing_stats.empty()) {
+ ss << "OSD(s) " << missing_stats << " have no reported stats, and not all"
+ << " PGs are active+clean; we cannot draw any conclusions";
+ r = -EAGAIN;
+ } else if (!stored_pgs.empty()) {
+ ss << "OSD(s) " << stored_pgs << " last reported they still store some PG"
+ << " data, and not all PGs are active+clean; we cannot be sure they"
+ << " aren't still needed.";
+ r = -EBUSY;
+ }
+ if (r) {
+ cmdctx->reply(r, ss);
+ return true;
+ }
+ ss << "OSD(s) " << osds << " are safe to destroy without reducing data"
+ << " durability.";
+ cmdctx->reply(0, ss);
+ return true;
+ } else if (prefix == "osd ok-to-stop") {
+ vector<string> ids;
+ cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
+ set<int> osds;
+ int r;
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ r = osdmap.parse_osd_id_list(ids, &osds, &ss);
+ });
+ if (!r && osds.empty()) {
+ ss << "must specify one or more OSDs";
+ r = -EINVAL;
+ }
+ if (r < 0) {
+ cmdctx->reply(r, ss);
+ return true;
+ }
+ map<pg_t,int> pg_delta; // pgid -> net acting set size change
+ int dangerous_pgs = 0;
+ cluster_state.with_pgmap([&](const PGMap& pg_map) {
+ return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ if (pg_map.num_pg_unknown > 0) {
+ ss << pg_map.num_pg_unknown << " pgs have unknown state; "
+ << "cannot draw any conclusions";
+ r = -EAGAIN;
+ return;
+ }
+ for (auto osd : osds) {
+ auto p = pg_map.pg_by_osd.find(osd);
+ if (p != pg_map.pg_by_osd.end()) {
+ for (auto& pgid : p->second) {
+ --pg_delta[pgid];
+ }
+ }
+ }
+ for (auto& p : pg_delta) {
+ auto q = pg_map.pg_stat.find(p.first);
+ if (q == pg_map.pg_stat.end()) {
+ ss << "missing information about " << p.first << "; cannot draw"
+ << " any conclusions";
+ r = -EAGAIN;
+ return;
+ }
+ if (!(q->second.state & PG_STATE_ACTIVE) ||
+ (q->second.state & PG_STATE_DEGRADED)) {
+ // we don't currently have a good way to tell *how* degraded
+ // a degraded PG is, so we have to assume we cannot remove
+ // any more replicas/shards.
+ ++dangerous_pgs;
+ continue;
+ }
+ const pg_pool_t *pi = osdmap.get_pg_pool(p.first.pool());
+ if (!pi) {
+ ++dangerous_pgs; // pool is creating or deleting
+ } else {
+ if (q->second.acting.size() + p.second < pi->min_size) {
+ ++dangerous_pgs;
+ }
+ }
+ }
+ });
+ });
+ if (r) {
+ cmdctx->reply(r, ss);
+ return true;
+ }
+ if (dangerous_pgs) {
+ ss << dangerous_pgs << " PGs are already degraded or might become "
+ << "unavailable";
+ cmdctx->reply(-EBUSY, ss);
+ return true;
+ }
+ ss << "OSD(s) " << osds << " are ok to stop without reducing"
+ << " availability, provided there are no other concurrent failures"
+ << " or interventions. " << pg_delta.size() << " PGs are likely to be"
+ << " degraded (but remain available) as a result.";
+ cmdctx->reply(0, ss);
+ return true;
} else if (prefix == "pg force-recovery" ||
prefix == "pg force-backfill" ||
prefix == "pg cancel-force-recovery" ||
} else {
auto workit = pg_map.pg_stat.find(parsed_pg);
if (workit == pg_map.pg_stat.end()) {
- ss << "pg " << pstr << " not exists; ";
+ ss << "pg " << pstr << " does not exist; ";
r = -ENOENT;
} else {
pg_stat_t workpg = workit->second;
}
break;
case OFR_BACKFILL:
- if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILL)) == 0) {
+ if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILLING)) == 0) {
ss << "pg " << pstr << " doesn't require backfilling; ";
continue;
} else if (workpg.state & PG_STATE_FORCED_BACKFILL) {
}
// None of the special native commands,
- MgrPyModule *handler = nullptr;
+ ActivePyModule *handler = nullptr;
auto py_commands = py_modules.get_py_commands();
for (const auto &pyc : py_commands) {
auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
void DaemonServer::_prune_pending_service_map()
{
utime_t cutoff = ceph_clock_now();
- cutoff -= g_conf->mgr_service_beacon_grace;
+ cutoff -= g_conf->get_val<double>("mgr_service_beacon_grace");
auto p = pending_service_map.services.begin();
while (p != pending_service_map.services.end()) {
auto q = p->second.daemons.begin();
void DaemonServer::send_report()
{
if (!pgmap_ready) {
- if (ceph_clock_now() - started_at > g_conf->mgr_stats_period * 4.0) {
+ if (ceph_clock_now() - started_at > g_conf->get_val<int64_t>("mgr_stats_period") * 4.0) {
pgmap_ready = true;
reported_osds.clear();
dout(1) << "Giving up on OSDs that haven't reported yet, sending "
*_dout << dendl;
});
});
+
+ auto osds = daemon_state.get_by_service("osd");
+ map<osd_metric, unique_ptr<OSDHealthMetricCollector>> accumulated;
+ for (const auto& osd : osds) {
+ Mutex::Locker l(osd.second->lock);
+ for (const auto& metric : osd.second->osd_health_metrics) {
+ auto acc = accumulated.find(metric.get_type());
+ if (acc == accumulated.end()) {
+ auto collector = OSDHealthMetricCollector::create(metric.get_type());
+ if (!collector) {
+ derr << __func__ << " " << osd.first << "." << osd.second
+ << " sent me an unknown health metric: "
+ << static_cast<uint8_t>(metric.get_type()) << dendl;
+ continue;
+ }
+ tie(acc, std::ignore) = accumulated.emplace(metric.get_type(),
+ std::move(collector));
+ }
+ acc->second->update(osd.first, metric);
+ }
+ }
+ for (const auto& acc : accumulated) {
+ acc.second->summarize(m->health_checks);
+ }
// TODO? We currently do not notify the PyModules
// TODO: respect needs_send, so we send the report only if we are asked to do
// so, or the state is updated.
daemon_state.cull(p.first, names);
}
}
+
+
+// Returns the nullptr-terminated array of config option names this
+// observer tracks; handle_conf_change() reacts to exactly these two.
+const char** DaemonServer::get_tracked_conf_keys() const
+{
+  static const char *tracked_keys[] = {
+    "mgr_stats_threshold", "mgr_stats_period", nullptr
+  };
+  return tracked_keys;
+}
+
+/**
+ * Config observer callback.
+ *
+ * When mgr_stats_threshold or mgr_stats_period is among the changed
+ * options, a fresh MMgrConfigure is sent to every connection in
+ * daemon_connections.
+ *
+ * The function may be entered with or without `lock` held; it leaves the
+ * lock in the same state it found it in.
+ *
+ * @param conf     configuration object (unused here; _send_configure()
+ *                 reads the values through g_conf)
+ * @param changed  names of the options that changed
+ */
+void DaemonServer::handle_conf_change(const struct md_config_t *conf,
+                                      const std::set <std::string> &changed)
+{
+  dout(4) << "ohai" << dendl;
+  // We may be called within lock (via MCommand `config set`) or outwith the
+  // lock (via admin socket `config set`), so handle either case.
+  const bool initially_locked = lock.is_locked_by_me();
+  if (!initially_locked) {
+    lock.Lock();
+  }
+
+  if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) {
+    dout(4) << "Updating stats threshold/period on "
+            << daemon_connections.size() << " clients" << dendl;
+    // Send a fresh MMgrConfigure to all clients, so that they can follow
+    // the new policy for transmitting stats
+    for (auto &c : daemon_connections) {
+      _send_configure(c);
+    }
+  }
+
+  // Release the lock only if this call acquired it; a caller that already
+  // held it still owns it. Without this the mutex stayed locked forever
+  // after a call made without the lock held.
+  if (!initially_locked) {
+    lock.Unlock();
+  }
+}
+
+// Build an MMgrConfigure carrying the current mgr_stats_period and
+// mgr_stats_threshold settings and send it on connection `c`.
+// The caller must hold `lock` (asserted below).
+void DaemonServer::_send_configure(ConnectionRef c)
+{
+  assert(lock.is_locked_by_me());
+
+  MMgrConfigure *msg = new MMgrConfigure();
+  msg->stats_period = g_conf->get_val<int64_t>("mgr_stats_period");
+  msg->stats_threshold = g_conf->get_val<int64_t>("mgr_stats_threshold");
+
+  c->send_message(msg);
+}
+