#include "auth/RotatingKeyRing.h"
#include "json_spirit/json_spirit_writer.h"
+#include "mgr/mgr_commands.h"
+#include "mon/MonCommand.h"
+
#include "messages/MMgrOpen.h"
#include "messages/MMgrConfigure.h"
#include "messages/MMonMgrReport.h"
#include "messages/MCommandReply.h"
#include "messages/MPGStats.h"
#include "messages/MOSDScrub.h"
+#include "messages/MOSDForceRecovery.h"
#include "common/errno.h"
#define dout_context g_ceph_context
#undef dout_prefix
#define dout_prefix *_dout << "mgr.server " << __func__ << " "
+
+
DaemonServer::DaemonServer(MonClient *monc_,
Finisher &finisher_,
DaemonStateIndex &daemon_state_,
s->inst.addr = con->get_peer_addr();
AuthCapsInfo caps_info;
- is_valid = handler->verify_authorizer(
- cct, monc->rotating_secrets.get(),
- authorizer_data,
- authorizer_reply, s->entity_name,
- s->global_id, caps_info,
- session_key);
+ RotatingKeyRing *keys = monc->rotating_secrets.get();
+ if (keys) {
+ is_valid = handler->verify_authorizer(
+ cct, keys,
+ authorizer_data,
+ authorizer_reply, s->entity_name,
+ s->global_id, caps_info,
+ session_key);
+ } else {
+ dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl;
+ is_valid = false;
+ }
if (is_valid) {
if (caps_info.allow_all) {
configure->stats_period = g_conf->mgr_stats_period;
m->get_connection()->send_message(configure);
+ DaemonStatePtr daemon;
if (daemon_state.exists(key)) {
+ daemon = daemon_state.get(key);
+ }
+ if (daemon) {
dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
+ Mutex::Locker l(daemon->lock);
daemon_state.get(key)->perf_counters.clear();
}
if (m->service_daemon) {
- DaemonStatePtr daemon;
- if (daemon_state.exists(key)) {
- daemon = daemon_state.get(key);
- } else {
+ if (!daemon) {
dout(4) << "constructing new DaemonState for " << key << dendl;
daemon = std::make_shared<DaemonState>(daemon_state.types);
daemon->key = key;
}
daemon_state.insert(daemon);
}
+ Mutex::Locker l(daemon->lock);
daemon->service_daemon = true;
daemon->metadata = m->daemon_metadata;
daemon->service_status = m->daemon_status;
}
assert(daemon != nullptr);
auto &daemon_counters = daemon->perf_counters;
- daemon_counters.update(m);
+ {
+ Mutex::Locker l(daemon->lock);
+ daemon_counters.update(m);
+ }
+ // if there are any schema updates, notify the python modules
+ if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
+ ostringstream oss;
+ oss << key.first << '.' << key.second;
+ py_modules.notify_all("perf_schema_update", oss.str());
+ }
if (daemon->service_daemon) {
utime_t now = ceph_clock_now();
return true;
}
-struct MgrCommand {
- string cmdstring;
- string helpstring;
- string module;
- string perm;
- string availability;
-
- bool requires_perm(char p) const {
- return (perm.find(p) != string::npos);
- }
-
-} mgr_commands[] = {
-
-#define COMMAND(parsesig, helptext, module, perm, availability) \
- {parsesig, helptext, module, perm, availability},
-#include "MgrCommands.h"
-#undef COMMAND
-};
void DaemonServer::_generate_command_map(
map<string,cmd_vartype>& cmdmap,
}
}
-const MgrCommand *DaemonServer::_get_mgrcommand(
+const MonCommand *DaemonServer::_get_mgrcommand(
const string &cmd_prefix,
- MgrCommand *cmds,
- int cmds_size)
+ const std::vector<MonCommand> &cmds)
{
- MgrCommand *this_cmd = NULL;
- for (MgrCommand *cp = cmds;
- cp < &cmds[cmds_size]; cp++) {
- if (cp->cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
- this_cmd = cp;
+ const MonCommand *this_cmd = nullptr;
+ for (const auto &cmd : cmds) {
+ if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
+ this_cmd = &cmd;
break;
}
}
const string &prefix,
const map<string,cmd_vartype>& cmdmap,
const map<string,string>& param_str_map,
- const MgrCommand *this_cmd) {
+ const MonCommand *this_cmd) {
if (s->entity_name.is_mon()) {
// mon is all-powerful. even when it is forwarding commands on behalf of
dout(4) << "prefix=" << prefix << dendl;
if (prefix == "get_command_descriptions") {
- int cmdnum = 0;
-
dout(10) << "reading commands from python modules" << dendl;
- auto py_commands = py_modules.get_commands();
+ const auto py_commands = py_modules.get_commands();
+ int cmdnum = 0;
JSONFormatter f;
f.open_object_section("command_descriptions");
- for (const auto &pyc : py_commands) {
+
+ auto dump_cmd = [&cmdnum, &f](const MonCommand &mc){
ostringstream secname;
secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
- dout(20) << "Dumping " << pyc.cmdstring << " (" << pyc.helpstring
- << ")" << dendl;
- dump_cmddesc_to_json(&f, secname.str(), pyc.cmdstring, pyc.helpstring,
- "mgr", pyc.perm, "cli", 0);
+ dump_cmddesc_to_json(&f, secname.str(), mc.cmdstring, mc.helpstring,
+ mc.module, mc.req_perms, mc.availability, 0);
cmdnum++;
+ };
+
+ for (const auto &pyc : py_commands) {
+ dump_cmd(pyc);
}
- for (const auto &cp : mgr_commands) {
- ostringstream secname;
- secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
- dump_cmddesc_to_json(&f, secname.str(), cp.cmdstring, cp.helpstring,
- cp.module, cp.perm, cp.availability, 0);
- cmdnum++;
+ for (const auto &mgr_cmd : mgr_commands) {
+ dump_cmd(mgr_cmd);
}
+
f.close_section(); // command_descriptions
f.flush(cmdctx->odata);
cmdctx->reply(0, ss);
}
// lookup command
- const MgrCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands,
- ARRAY_SIZE(mgr_commands));
+ const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands);
_generate_command_map(cmdctx->cmdmap, param_str_map);
if (!mgr_cmd) {
- MgrCommand py_command = {"", "", "py", "rw", "cli"};
+ MonCommand py_command = {"", "", "py", "rw", "cli"};
if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap,
param_str_map, &py_command)) {
dout(1) << " access denied" << dendl;
DaemonKey key(p.first, q.first);
assert(daemon_state.exists(key));
auto daemon = daemon_state.get(key);
+ Mutex::Locker l(daemon->lock);
f->dump_stream("status_stamp") << daemon->service_status_stamp;
f->dump_stream("last_beacon") << daemon->last_service_beacon;
f->open_object_section("status");
});
cmdctx->reply(r, "");
return true;
+ } else if (prefix == "pg force-recovery" ||
+ prefix == "pg force-backfill" ||
+ prefix == "pg cancel-force-recovery" ||
+ prefix == "pg cancel-force-backfill") {
+ string forceop = prefix.substr(3, string::npos);
+ list<pg_t> parsed_pgs;
+ map<int, list<pg_t> > osdpgs;
+
+ // figure out actual op just once
+ int actual_op = 0;
+ if (forceop == "force-recovery") {
+ actual_op = OFR_RECOVERY;
+ } else if (forceop == "force-backfill") {
+ actual_op = OFR_BACKFILL;
+ } else if (forceop == "cancel-force-backfill") {
+ actual_op = OFR_BACKFILL | OFR_CANCEL;
+ } else if (forceop == "cancel-force-recovery") {
+ actual_op = OFR_RECOVERY | OFR_CANCEL;
+ }
+
+ // convert pg names to pgs, discard any invalid ones while at it
+ {
+ // we don't want to keep pgidstr and pgidstr_nodup forever
+ vector<string> pgidstr;
+ // get pgids to process and prune duplicates
+ cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
+ set<string> pgidstr_nodup(pgidstr.begin(), pgidstr.end());
+ if (pgidstr.size() != pgidstr_nodup.size()) {
+ // move elements only when there were duplicates, as this
+ // reorders them
+ pgidstr.resize(pgidstr_nodup.size());
+ auto it = pgidstr_nodup.begin();
+ for (size_t i = 0 ; i < pgidstr_nodup.size(); i++) {
+ pgidstr[i] = std::move(*it++);
+ }
+ }
+
+ cluster_state.with_pgmap([&](const PGMap& pg_map) {
+ for (auto& pstr : pgidstr) {
+ pg_t parsed_pg;
+ if (!parsed_pg.parse(pstr.c_str())) {
+ ss << "invalid pgid '" << pstr << "'; ";
+ r = -EINVAL;
+ } else {
+ auto workit = pg_map.pg_stat.find(parsed_pg);
+ if (workit == pg_map.pg_stat.end()) {
+ ss << "pg " << pstr << " not exists; ";
+ r = -ENOENT;
+ } else {
+ pg_stat_t workpg = workit->second;
+
+ // discard pgs for which user requests are pointless
+ switch (actual_op)
+ {
+ case OFR_RECOVERY:
+ if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_RECOVERY_WAIT | PG_STATE_RECOVERING)) == 0) {
+ // don't return error, user script may be racing with cluster. not fatal.
+ ss << "pg " << pstr << " doesn't require recovery; ";
+ continue;
+ } else if (workpg.state & PG_STATE_FORCED_RECOVERY) {
+ ss << "pg " << pstr << " recovery already forced; ";
+ // return error, as it may be a bug in user script
+ r = -EINVAL;
+ continue;
+ }
+ break;
+ case OFR_BACKFILL:
+ if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILL)) == 0) {
+ ss << "pg " << pstr << " doesn't require backfilling; ";
+ continue;
+ } else if (workpg.state & PG_STATE_FORCED_BACKFILL) {
+ ss << "pg " << pstr << " backfill already forced; ";
+ r = -EINVAL;
+ continue;
+ }
+ break;
+ case OFR_BACKFILL | OFR_CANCEL:
+ if ((workpg.state & PG_STATE_FORCED_BACKFILL) == 0) {
+ ss << "pg " << pstr << " backfill not forced; ";
+ continue;
+ }
+ break;
+ case OFR_RECOVERY | OFR_CANCEL:
+ if ((workpg.state & PG_STATE_FORCED_RECOVERY) == 0) {
+ ss << "pg " << pstr << " recovery not forced; ";
+ continue;
+ }
+ break;
+ default:
+ assert(0 == "actual_op value is not supported");
+ }
+
+ parsed_pgs.push_back(std::move(parsed_pg));
+ }
+ }
+ }
+
+ // group pgs to process by osd
+ for (auto& pgid : parsed_pgs) {
+ auto workit = pg_map.pg_stat.find(pgid);
+ if (workit != pg_map.pg_stat.end()) {
+ pg_stat_t workpg = workit->second;
+ set<int32_t> osds(workpg.up.begin(), workpg.up.end());
+ osds.insert(workpg.acting.begin(), workpg.acting.end());
+ for (auto i : osds) {
+ osdpgs[i].push_back(pgid);
+ }
+ }
+ }
+
+ });
+ }
+
+ // respond with error only when no pgs are correct
+ // yes, in case of mixed errors, only the last one will be emitted,
+ // but the message presented will be fine
+ if (parsed_pgs.size() != 0) {
+ // clear error to not confuse users/scripts
+ r = 0;
+ }
+
+ // optimize the command -> messages conversion, use only one message per distinct OSD
+ cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+ for (auto& i : osdpgs) {
+ if (osdmap.is_up(i.first)) {
+ vector<pg_t> pgvec(make_move_iterator(i.second.begin()), make_move_iterator(i.second.end()));
+ auto p = osd_cons.find(i.first);
+ if (p == osd_cons.end()) {
+ ss << "osd." << i.first << " is not currently connected";
+ r = -EAGAIN;
+ continue;
+ }
+ for (auto& con : p->second) {
+ con->send_message(new MOSDForceRecovery(monc->get_fsid(), pgvec, actual_op));
+ }
+ ss << "instructing pg(s) " << i.second << " on osd." << i.first << " to " << forceop << "; ";
+ }
+ }
+ });
+ ss << std::endl;
+ cmdctx->reply(r, ss);
+ return true;
} else {
r = cluster_state.with_pgmap([&](const PGMap& pg_map) {
return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
// None of the special native commands,
MgrPyModule *handler = nullptr;
- auto py_commands = py_modules.get_commands();
+ auto py_commands = py_modules.get_py_commands();
for (const auto &pyc : py_commands) {
auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
continue;
}
auto daemon = daemon_state.get(key);
+ Mutex::Locker l(daemon->lock);
if (daemon->last_service_beacon == utime_t()) {
// we must have just restarted; assume they are alive now.
daemon->last_service_beacon = ceph_clock_now();
}
auto m = new MMonMgrReport();
+ py_modules.get_health_checks(&m->health_checks);
+
cluster_state.with_pgmap([&](const PGMap& pg_map) {
cluster_state.update_delta_stats();
pg_map.get_health_checks(g_ceph_context, osdmap,
&m->health_checks);
+
dout(10) << m->health_checks.checks.size() << " health checks"
<< dendl;
dout(20) << "health checks:\n";