*/
+#include <iterator>
#include <sstream>
+#include <tuple>
#include <stdlib.h>
#include <signal.h>
#include <limits.h>
#include <boost/scope_exit.hpp>
#include <boost/algorithm/string/predicate.hpp>
+#include "json_spirit/json_spirit_reader.h"
+#include "json_spirit/json_spirit_writer.h"
+
#include "Monitor.h"
#include "common/version.h"
+#include "common/blkdev.h"
+#include "common/cmdparse.h"
+#include "common/signal.h"
#include "osd/OSDMap.h"
#include "messages/MGenericMessage.h"
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
-#include "messages/MMonHealth.h"
#include "messages/MMonMetadata.h"
#include "messages/MMonSync.h"
#include "messages/MMonScrub.h"
#include "messages/MMonPaxos.h"
#include "messages/MRoute.h"
#include "messages/MForward.h"
-#include "messages/MStatfs.h"
#include "messages/MMonSubscribe.h"
#include "messages/MMonSubscribeAck.h"
#include "messages/MAuthReply.h"
-#include "messages/MTimeCheck.h"
+#include "messages/MTimeCheck2.h"
#include "messages/MPing.h"
#include "common/strtol.h"
#include "OSDMonitor.h"
#include "MDSMonitor.h"
#include "MonmapMonitor.h"
-#include "PGMonitor.h"
#include "LogMonitor.h"
#include "AuthMonitor.h"
#include "MgrMonitor.h"
#include "MgrStatMonitor.h"
+#include "ConfigMonitor.h"
#include "mon/QuorumService.h"
-#include "mon/OldHealthMonitor.h"
#include "mon/HealthMonitor.h"
#include "mon/ConfigKeyService.h"
#include "common/config.h"
#include "common/cmdparse.h"
-#include "include/assert.h"
+#include "include/ceph_assert.h"
#include "include/compat.h"
#include "perfglue/heap_profiler.h"
#undef COMMAND
#undef COMMAND_WITH_FLAG
#define FLAG(f) (MonCommand::FLAG_##f)
-#define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
- {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
-#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
- {parsesig, helptext, modulename, req_perms, avail, flags},
+#define COMMAND(parsesig, helptext, modulename, req_perms) \
+ {parsesig, helptext, modulename, req_perms, FLAG(NONE)},
+#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, flags) \
+ {parsesig, helptext, modulename, req_perms, flags},
MonCommand mon_commands[] = {
#include <mon/MonCommands.h>
};
-MonCommand pgmonitor_commands[] = {
-#include <mon/PGMonitorCommands.h>
-};
#undef COMMAND
#undef COMMAND_WITH_FLAG
+
void C_MonContext::finish(int r) {
if (mon->is_shutdown())
return;
Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
Messenger *m, Messenger *mgr_m, MonMap *map) :
Dispatcher(cct_),
+ AuthServer(cct_),
name(nm),
rank(-1),
messenger(m),
lock("Monitor::lock"),
timer(cct_, lock),
finisher(cct_, "mon_finisher", "fin"),
- cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads),
+ cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads),
has_ever_joined(false),
logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
monmap(map),
cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
auth_service_required(cct,
cct->_conf->auth_supported.empty() ?
- cct->_conf->auth_service_required : cct->_conf->auth_supported ),
+ cct->_conf->auth_service_required : cct->_conf->auth_supported),
mgr_messenger(mgr_m),
mgr_client(cct_, mgr_m),
- pgservice(nullptr),
+ gss_ktfile_client(cct->_conf.get_val<std::string>("gss_ktab_client_file")),
store(s),
- state(STATE_PROBING),
-
elector(this),
required_features(0),
leader(0),
paxos_service(PAXOS_NUM),
admin_hook(NULL),
routed_request_tid(0),
- op_tracker(cct, true, 1)
+ op_tracker(cct, g_conf().get_val<bool>("mon_enable_op_tracker"), 1)
{
clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);
update_log_clients();
+ if (!gss_ktfile_client.empty()) {
+ // Assert we can export environment variable
+ /*
+ The default client keytab is used, if it is present and readable,
+ to automatically obtain initial credentials for GSSAPI client
+ applications. The principal name of the first entry in the client
+ keytab is used by default when obtaining initial credentials.
+ 1. The KRB5_CLIENT_KTNAME environment variable.
+ 2. The default_client_keytab_name profile variable in [libdefaults].
+ 3. The hardcoded default, DEFCKTNAME.
+ */
+ const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
+ gss_ktfile_client.c_str(), 1));
+ ceph_assert(set_result == 0);
+ }
+
+ op_tracker.set_complaint_and_threshold(
+ g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count(),
+ g_conf().get_val<int64_t>("mon_op_log_threshold"));
+ op_tracker.set_history_size_and_duration(
+ g_conf().get_val<uint64_t>("mon_op_history_size"),
+ g_conf().get_val<std::chrono::seconds>("mon_op_history_duration").count());
+ op_tracker.set_history_slow_op_size_and_threshold(
+ g_conf().get_val<uint64_t>("mon_op_history_slow_op_size"),
+ g_conf().get_val<std::chrono::seconds>("mon_op_history_slow_op_threshold").count());
+
paxos = new Paxos(this, "paxos");
- paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap");
- paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap");
- paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap");
- paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap");
- paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm");
- paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth");
- paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr");
- paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat");
- paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health");
-
- health_monitor = new OldHealthMonitor(this);
+ paxos_service[PAXOS_MDSMAP].reset(new MDSMonitor(this, paxos, "mdsmap"));
+ paxos_service[PAXOS_MONMAP].reset(new MonmapMonitor(this, paxos, "monmap"));
+ paxos_service[PAXOS_OSDMAP].reset(new OSDMonitor(cct, this, paxos, "osdmap"));
+ paxos_service[PAXOS_LOG].reset(new LogMonitor(this, paxos, "logm"));
+ paxos_service[PAXOS_AUTH].reset(new AuthMonitor(this, paxos, "auth"));
+ paxos_service[PAXOS_MGR].reset(new MgrMonitor(this, paxos, "mgr"));
+ paxos_service[PAXOS_MGRSTAT].reset(new MgrStatMonitor(this, paxos, "mgrstat"));
+ paxos_service[PAXOS_HEALTH].reset(new HealthMonitor(this, paxos, "health"));
+ paxos_service[PAXOS_CONFIG].reset(new ConfigMonitor(this, paxos, "config"));
+
config_key_service = new ConfigKeyService(this, paxos);
- mon_caps = new MonCap();
- bool r = mon_caps->parse("allow *", NULL);
- assert(r);
+ bool r = mon_caps.parse("allow *", NULL);
+ ceph_assert(r);
exited_quorum = ceph_clock_now();
// prepare local commands
- local_mon_commands.resize(ARRAY_SIZE(mon_commands));
- for (unsigned i = 0; i < ARRAY_SIZE(mon_commands); ++i) {
+ local_mon_commands.resize(std::size(mon_commands));
+ for (unsigned i = 0; i < std::size(mon_commands); ++i) {
local_mon_commands[i] = mon_commands[i];
}
MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);
- local_upgrading_mon_commands = local_mon_commands;
- for (unsigned i = 0; i < ARRAY_SIZE(pgmonitor_commands); ++i) {
- local_upgrading_mon_commands.push_back(pgmonitor_commands[i]);
+ prenautilus_local_mon_commands = local_mon_commands;
+ for (auto& i : prenautilus_local_mon_commands) {
+ std::string n = cmddesc_get_prenautilus_compat(i.cmdstring);
+ if (n != i.cmdstring) {
+ dout(20) << " pre-nautilus cmd " << i.cmdstring << " -> " << n << dendl;
+ i.cmdstring = n;
+ }
}
- MonCommand::encode_vector(local_upgrading_mon_commands,
- local_upgrading_mon_commands_bl);
+ MonCommand::encode_vector(prenautilus_local_mon_commands, prenautilus_local_mon_commands_bl);
// assume our commands until we have an election. this only means
// we won't reply with EINVAL before the election; any command that
// actually matters will wait until we have quorum etc and then
// retry (and revalidate).
leader_mon_commands = local_mon_commands;
-
- // note: OSDMonitor may update this based on the luminous flag.
- pgservice = mgrstatmon()->get_pg_stat_service();
}
Monitor::~Monitor()
{
- for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
- delete *p;
- delete health_monitor;
+ op_tracker.on_shutdown();
+
+ paxos_service.clear();
delete config_key_service;
delete paxos;
- assert(session_map.sessions.empty());
- delete mon_caps;
+ ceph_assert(session_map.sessions.empty());
}
Monitor *mon;
public:
explicit AdminHook(Monitor *m) : mon(m) {}
- bool call(std::string command, cmdmap_t& cmdmap, std::string format,
- bufferlist& out) override {
+ bool call(std::string_view command, const cmdmap_t& cmdmap,
+ std::string_view format, bufferlist& out) override {
stringstream ss;
mon->do_admin_command(command, cmdmap, format, ss);
out.append(ss);
}
};
-void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
- ostream& ss)
+void Monitor::do_admin_command(std::string_view command, const cmdmap_t& cmdmap,
+ std::string_view format, std::ostream& ss)
{
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
boost::scoped_ptr<Formatter> f(Formatter::create(format));
string args;
- for (cmdmap_t::iterator p = cmdmap.begin();
+ for (auto p = cmdmap.begin();
p != cmdmap.end(); ++p) {
if (p->first == "prefix")
continue;
goto abort;
}
sync_force(f.get(), ss);
- } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
+ } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0 ||
+ command.compare(0, 24, "add_bootstrap_peer_hintv") == 0) {
if (!_add_bootstrap_peer_hint(command, cmdmap, ss))
goto abort;
} else if (command == "quorum enter") {
f->flush(ss);
}
+ } else if (command == "dump_historic_ops") {
+ if (op_tracker.dump_historic_ops(f.get())) {
+ f->flush(ss);
+ } else {
+ ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
+ please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
+ }
+ } else if (command == "dump_historic_ops_by_duration" ) {
+ if (op_tracker.dump_historic_ops(f.get(), true)) {
+ f->flush(ss);
+ } else {
+ ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
+ please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
+ }
+ } else if (command == "dump_historic_slow_ops") {
+ if (op_tracker.dump_historic_slow_ops(f.get(), {})) {
+ f->flush(ss);
+ } else {
+ ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
+ please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
+ }
} else {
- assert(0 == "bad AdminSocket command binding");
+ ceph_abort_msg("bad AdminSocket command binding");
}
(read_only ? audit_clog->debug() : audit_clog->info())
<< "from='admin socket' "
void Monitor::handle_signal(int signum)
{
- assert(signum == SIGINT || signum == SIGTERM);
+ ceph_assert(signum == SIGINT || signum == SIGTERM);
derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
shutdown();
}
compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
+ compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
+ compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
return compat;
}
t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
store->apply_transaction(t);
} else {
- bufferlist::iterator it = featuresbl.begin();
+ auto it = featuresbl.cbegin();
features->decode(it);
}
}
"mon_health_to_clog_tick_interval",
// scrub interval
"mon_scrub_interval",
+ "mon_allow_pool_delete",
+ // osdmap pruning - observed, not handled.
+ "mon_osdmap_full_prune_enabled",
+ "mon_osdmap_full_prune_min",
+ "mon_osdmap_full_prune_interval",
+ "mon_osdmap_full_prune_txsize",
+ // debug options - observed, not handled
+ "mon_debug_extra_checks",
+ "mon_debug_block_osdmap_trim",
NULL
};
return KEYS;
}
-void Monitor::handle_conf_change(const struct md_config_t *conf,
+void Monitor::handle_conf_change(const ConfigProxy& conf,
const std::set<std::string> &changed)
{
sanitize_options();
// mon_lease must be greater than mon_lease_renewal; otherwise we
// may incur in leases expiring before they are renewed.
- if (g_conf->mon_lease_renew_interval_factor >= 1.0) {
+ if (g_conf()->mon_lease_renew_interval_factor >= 1.0) {
clog->error() << "mon_lease_renew_interval_factor ("
- << g_conf->mon_lease_renew_interval_factor
+ << g_conf()->mon_lease_renew_interval_factor
<< ") must be less than 1.0";
r = -EINVAL;
}
// with the same value, for a given small vale, could mean timing out if
// the monitors happened to be overloaded -- or even under normal load for
// a small enough value.
- if (g_conf->mon_lease_ack_timeout_factor <= 1.0) {
+ if (g_conf()->mon_lease_ack_timeout_factor <= 1.0) {
clog->error() << "mon_lease_ack_timeout_factor ("
- << g_conf->mon_lease_ack_timeout_factor
+ << g_conf()->mon_lease_ack_timeout_factor
<< ") must be greater than 1.0";
r = -EINVAL;
}
return r;
}
- assert(!logger);
+ ceph_assert(!logger);
{
PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
cct->get_perfcounters_collection()->add(logger);
}
- assert(!cluster_logger);
+ ceph_assert(!cluster_logger);
{
PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
- pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster", NULL, 0, unit_t(BYTES));
- pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space", NULL, 0, unit_t(BYTES));
- pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space", NULL, 0, unit_t(BYTES));
+ pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster", NULL, 0, unit_t(UNIT_BYTES));
+ pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space", NULL, 0, unit_t(UNIT_BYTES));
+ pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space", NULL, 0, unit_t(UNIT_BYTES));
pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
- pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects");
- pcb.add_u64(l_cluster_num_mds_up, "num_mds_up", "MDSs that are up");
- pcb.add_u64(l_cluster_num_mds_in, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
- pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed", "Failed MDS");
- pcb.add_u64(l_cluster_mds_epoch, "mds_epoch", "Current epoch of MDS map");
+ pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects", NULL, 0, unit_t(UNIT_BYTES));
cluster_logger = pcb.create_perf_counters();
}
if (!has_ever_joined) {
// impose initial quorum restrictions?
list<string> initial_members;
- get_str_list(g_conf->mon_initial_members, initial_members);
+ get_str_list(g_conf()->mon_initial_members, initial_members);
if (!initial_members.empty()) {
dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
- monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(),
- &extra_probe_peers);
+ monmap->set_initial_members(
+ g_ceph_context, initial_members, name, messenger->get_myaddrs(),
+ &extra_probe_peers);
dout(10) << " monmap is " << *monmap << dendl;
dout(10) << " extra probe peers " << extra_probe_peers << dendl;
} else if (!monmap->contains(name)) {
derr << "not in monmap and have been in a quorum before; "
<< "must have been removed" << dendl;
- if (g_conf->mon_force_quorum_join) {
+ if (g_conf()->mon_force_quorum_join) {
dout(0) << "we should have died but "
<< "'mon_force_quorum_join' is set -- allowing boot" << dendl;
} else {
dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
init_paxos();
- health_monitor->init();
if (is_keyring_required()) {
// we need to bootstrap authentication keys so we can form an
if (err == 0 && bl.length() > 0) {
// Attempt to decode and extract keyring only if it is found.
KeyRing keyring;
- bufferlist::iterator p = bl.begin();
- ::decode(keyring, p);
+ auto p = bl.cbegin();
+ decode(keyring, p);
extract_save_mon_key(keyring);
}
}
- string keyring_loc = g_conf->mon_data + "/keyring";
+ string keyring_loc = g_conf()->mon_data + "/keyring";
r = keyring.load(cct, keyring_loc);
if (r < 0) {
keyring.encode_plaintext(bl);
write_default_keyring(bl);
} else {
- derr << "unable to load initial keyring " << g_conf->keyring << dendl;
+ derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
lock.Unlock();
return r;
}
lock.Unlock();
r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
"show current monitor status");
- assert(r == 0);
+ ceph_assert(r == 0);
r = admin_socket->register_command("quorum_status", "quorum_status",
admin_hook, "show current quorum status");
- assert(r == 0);
+ ceph_assert(r == 0);
r = admin_socket->register_command("sync_force",
"sync_force name=validate,"
"type=CephChoices,"
"strings=--yes-i-really-mean-it",
admin_hook,
"force sync of and clear monitor store");
- assert(r == 0);
+ ceph_assert(r == 0);
r = admin_socket->register_command("add_bootstrap_peer_hint",
"add_bootstrap_peer_hint name=addr,"
"type=CephIPAddr",
admin_hook,
"add peer address as potential bootstrap"
" peer for cluster bringup");
- assert(r == 0);
+ ceph_assert(r == 0);
+ r = admin_socket->register_command("add_bootstrap_peer_hintv",
+ "add_bootstrap_peer_hintv name=addrv,"
+ "type=CephString",
+ admin_hook,
+ "add peer address vector as potential bootstrap"
+ " peer for cluster bringup");
+ ceph_assert(r == 0);
r = admin_socket->register_command("quorum enter", "quorum enter",
admin_hook,
"force monitor back into quorum");
- assert(r == 0);
+ ceph_assert(r == 0);
r = admin_socket->register_command("quorum exit", "quorum exit",
admin_hook,
"force monitor out of the quorum");
- assert(r == 0);
+ ceph_assert(r == 0);
r = admin_socket->register_command("ops",
"ops",
admin_hook,
"show the ops currently in flight");
- assert(r == 0);
+ ceph_assert(r == 0);
r = admin_socket->register_command("sessions",
"sessions",
admin_hook,
"list existing sessions");
- assert(r == 0);
+ ceph_assert(r == 0);
+ r = admin_socket->register_command("dump_historic_ops", "dump_historic_ops",
+ admin_hook,
+ "show recent ops");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command("dump_historic_ops_by_duration", "dump_historic_ops_by_duration",
+ admin_hook,
+ "show recent ops, sorted by duration");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command("dump_historic_slow_ops", "dump_historic_slow_ops",
+ admin_hook,
+ "show recent slow ops");
+ ceph_assert(r == 0);
lock.Lock();
// add ourselves as a conf observer
- g_conf->add_observer(this);
+ g_conf().add_observer(this);
+
+ messenger->set_auth_client(this);
+ messenger->set_auth_server(this);
+ mgr_messenger->set_auth_client(this);
+
+ auth_registry.refresh_config();
lock.Unlock();
return 0;
int Monitor::init()
{
dout(2) << "init" << dendl;
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
finisher.start();
// i'm ready!
messenger->add_dispatcher_tail(this);
+ // kickstart pet mgrclient
mgr_client.init();
mgr_messenger->add_dispatcher_tail(&mgr_client);
mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls
+ mgrmon()->prime_mgr_client();
+ state = STATE_PROBING;
bootstrap();
// add features of myself into feature_map
session_map.feature_map.add_mon(con_self->get_features());
paxos->init();
// init services
- for (int i = 0; i < PAXOS_NUM; ++i) {
- paxos_service[i]->init();
+ for (auto& svc : paxos_service) {
+ svc->init();
}
refresh_from_paxos(NULL);
int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
if (r >= 0) {
try {
- bufferlist::iterator p = bl.begin();
- ::decode(fingerprint, p);
+ auto p = bl.cbegin();
+ decode(fingerprint, p);
}
catch (buffer::error& e) {
dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
dout(10) << __func__ << " no cluster_fingerprint" << dendl;
}
- for (int i = 0; i < PAXOS_NUM; ++i) {
- paxos_service[i]->refresh(need_bootstrap);
+ for (auto& svc : paxos_service) {
+ svc->refresh(need_bootstrap);
}
- for (int i = 0; i < PAXOS_NUM; ++i) {
- paxos_service[i]->post_refresh();
+ for (auto& svc : paxos_service) {
+ svc->post_refresh();
}
load_metadata();
}
wait_for_paxos_write();
+ {
+ std::lock_guard l(auth_lock);
+ authmon()->_set_mon_num_rank(0, 0);
+ }
+
state = STATE_SHUTDOWN;
lock.Unlock();
- g_conf->remove_observer(this);
+ g_conf().remove_observer(this);
lock.Lock();
if (admin_hook) {
- AdminSocket* admin_socket = cct->get_admin_socket();
- admin_socket->unregister_command("mon_status");
- admin_socket->unregister_command("quorum_status");
- admin_socket->unregister_command("sync_force");
- admin_socket->unregister_command("add_bootstrap_peer_hint");
- admin_socket->unregister_command("quorum enter");
- admin_socket->unregister_command("quorum exit");
- admin_socket->unregister_command("ops");
- admin_socket->unregister_command("sessions");
+ cct->get_admin_socket()->unregister_commands(admin_hook);
delete admin_hook;
admin_hook = NULL;
}
// clean up
paxos->shutdown();
- for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
- (*p)->shutdown();
- health_monitor->shutdown();
+ for (auto& svc : paxos_service) {
+ svc->shutdown();
+ }
finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
}
}
+void Monitor::respawn()
+{
+ // --- WARNING TO FUTURE COPY/PASTERS ---
+ // You must also add a call like
+ //
+ // ceph_pthread_setname(pthread_self(), "ceph-mon");
+ //
+ // to main() so that /proc/$pid/stat field 2 contains "(ceph-mon)"
+ // instead of "(exe)", so that killall (and log rotation) will work.
+
+ dout(0) << __func__ << dendl;
+
+ char *new_argv[orig_argc+1];
+ dout(1) << " e: '" << orig_argv[0] << "'" << dendl;
+ for (int i=0; i<orig_argc; i++) {
+ new_argv[i] = (char *)orig_argv[i];
+ dout(1) << " " << i << ": '" << orig_argv[i] << "'" << dendl;
+ }
+ new_argv[orig_argc] = NULL;
+
+ /* Determine the path to our executable, test if Linux /proc/self/exe exists.
+ * This allows us to exec the same executable even if it has since been
+ * unlinked.
+ */
+ char exe_path[PATH_MAX] = "";
+#ifdef PROCPREFIX
+ if (readlink(PROCPREFIX "/proc/self/exe", exe_path, PATH_MAX-1) != -1) {
+ dout(1) << "respawning with exe " << exe_path << dendl;
+ strcpy(exe_path, PROCPREFIX "/proc/self/exe");
+ } else {
+#else
+ {
+#endif
+ /* Print CWD for the user's interest */
+ char buf[PATH_MAX];
+ char *cwd = getcwd(buf, sizeof(buf));
+ ceph_assert(cwd);
+ dout(1) << " cwd " << cwd << dendl;
+
+ /* Fall back to a best-effort: just running in our CWD */
+ strncpy(exe_path, orig_argv[0], PATH_MAX-1);
+ }
+
+ dout(1) << " exe_path " << exe_path << dendl;
+
+ unblock_all_signals(NULL);
+ execv(exe_path, new_argv);
+
+ dout(0) << "respawn execv " << orig_argv[0]
+ << " failed with " << cpp_strerror(errno) << dendl;
+
+ // We have to assert out here, because suicide() returns, and callers
+ // to respawn expect it never to return.
+ ceph_abort();
+}
+
void Monitor::bootstrap()
{
dout(10) << "bootstrap" << dendl;
unregister_cluster_logger();
cancel_probe_timeout();
+ if (monmap->get_epoch() == 0) {
+ dout(10) << "reverting to legacy ranks for seed monmap (epoch 0)" << dendl;
+ monmap->calc_legacy_ranks();
+ }
+ dout(10) << "monmap " << *monmap << dendl;
+
+ if (monmap->min_mon_release &&
+ monmap->min_mon_release + 2 < (int)ceph_release()) {
+ derr << "current monmap has min_mon_release "
+ << (int)monmap->min_mon_release
+ << " (" << ceph_release_name(monmap->min_mon_release)
+ << ") which is >2 releases older than me " << ceph_release()
+ << " (" << ceph_release_name(ceph_release()) << "), stopping."
+ << dendl;
+ exit(0);
+ }
+
// note my rank
- int newrank = monmap->get_rank(messenger->get_myaddr());
+ int newrank = monmap->get_rank(messenger->get_myaddrs());
if (newrank < 0 && rank >= 0) {
// was i ever part of the quorum?
if (has_ever_joined) {
exit(0);
}
}
+ if (newrank >= 0 &&
+ monmap->get_addrs(newrank) != messenger->get_myaddrs()) {
+ dout(0) << " monmap addrs for rank " << newrank << " changed, i am "
+ << messenger->get_myaddrs()
+ << ", monmap is " << monmap->get_addrs(newrank) << ", respawning"
+ << dendl;
+ respawn();
+ }
if (newrank != rank) {
dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
messenger->set_myname(entity_name_t::MON(newrank));
_reset();
// sync store
- if (g_conf->mon_compact_on_bootstrap) {
+ if (g_conf()->mon_compact_on_bootstrap) {
dout(10) << "bootstrap -- triggering compaction" << dendl;
store->compact();
dout(10) << "bootstrap -- finished compaction" << dendl;
dout(10) << "probing other monitors" << dendl;
for (unsigned i = 0; i < monmap->size(); i++) {
if ((int)i != rank)
- messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
- monmap->get_inst(i));
- }
- for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
- p != extra_probe_peers.end();
- ++p) {
- if (*p != messenger->get_myaddr()) {
- entity_inst_t i;
- i.name = entity_name_t::MON(-1);
- i.addr = *p;
- messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
+ send_mon_message(
+ new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
+ ceph_release()),
+ i);
+ }
+ for (auto& av : extra_probe_peers) {
+ if (av != messenger->get_myaddrs()) {
+ messenger->send_to_mon(
+ new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
+ ceph_release()),
+ av);
}
}
}
-bool Monitor::_add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss)
+bool Monitor::_add_bootstrap_peer_hint(std::string_view cmd,
+ const cmdmap_t& cmdmap,
+ ostream& ss)
{
- string addrstr;
- if (!cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
- ss << "unable to parse address string value '"
- << cmd_vartype_stringify(cmdmap["addr"]) << "'";
- return false;
- }
- dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '"
- << addrstr << "'" << dendl;
-
- entity_addr_t addr;
- const char *end = 0;
- if (!addr.parse(addrstr.c_str(), &end)) {
- ss << "failed to parse addr '" << addrstr << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
- return false;
- }
-
if (is_leader() || is_peon()) {
ss << "mon already active; ignoring bootstrap hint";
return true;
}
- if (addr.get_port() == 0)
- addr.set_port(CEPH_MON_PORT);
+ entity_addrvec_t addrs;
+ string addrstr;
+ if (cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
+ dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' addr '"
+ << addrstr << "'" << dendl;
+
+ entity_addr_t addr;
+ const char *end = 0;
+ if (!addr.parse(addrstr.c_str(), &end, entity_addr_t::TYPE_ANY)) {
+ ss << "failed to parse addrs '" << addrstr
+ << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
+ return false;
+ }
+
+ addrs.v.push_back(addr);
+ if (addr.get_port() == 0) {
+ addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
+ addrs.v[0].set_port(CEPH_MON_PORT_IANA);
+ addrs.v.push_back(addr);
+ addrs.v[1].set_type(entity_addr_t::TYPE_LEGACY);
+ addrs.v[1].set_port(CEPH_MON_PORT_LEGACY);
+ } else if (addr.get_type() == entity_addr_t::TYPE_ANY) {
+ if (addr.get_port() == CEPH_MON_PORT_LEGACY) {
+ addrs.v[0].set_type(entity_addr_t::TYPE_LEGACY);
+ } else {
+ addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
+ }
+ }
+ } else if (cmd_getval(g_ceph_context, cmdmap, "addrv", addrstr)) {
+ dout(10) << "_add_bootstrap_peer_hintv '" << cmd << "' addrv '"
+ << addrstr << "'" << dendl;
+ const char *end = 0;
+ if (!addrs.parse(addrstr.c_str(), &end)) {
+ ss << "failed to parse addrs '" << addrstr
+ << "'; syntax is 'add_bootstrap_peer_hintv v2:ip:port[,v1:ip:port]'";
+ return false;
+ }
+ } else {
+ ss << "no addr or addrv provided";
+ return false;
+ }
- extra_probe_peers.insert(addr);
- ss << "adding peer " << addr << " to list: " << extra_probe_peers;
+ extra_probe_peers.insert(addrs);
+ ss << "adding peer " << addrs << " to list: " << extra_probe_peers;
return true;
}
{
dout(10) << __func__ << dendl;
+ // disable authentication
+ {
+ std::lock_guard l(auth_lock);
+ authmon()->_set_mon_num_rank(0, 0);
+ }
+
cancel_probe_timeout();
timecheck_finish();
health_events_cleanup();
scrub_event_cancel();
leader_since = utime_t();
+ quorum_since = {};
if (!quorum.empty()) {
exited_quorum = ceph_clock_now();
}
paxos->restart();
- for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
- (*p)->restart();
- health_monitor->finish();
+ for (auto& svc : paxos_service) {
+ svc->restart();
+ }
}
{
set<string> targets;
targets.insert(paxos->get_name());
- for (int i = 0; i < PAXOS_NUM; ++i)
- paxos_service[i]->get_store_prefixes(targets);
+ for (auto& svc : paxos_service) {
+ svc->get_store_prefixes(targets);
+ }
ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
- assert(config_key_service_ptr);
+ ceph_assert(config_key_service_ptr);
config_key_service_ptr->get_store_prefixes(targets);
return targets;
}
void Monitor::sync_timeout()
{
dout(10) << __func__ << dendl;
- assert(state == STATE_SYNCHRONIZING);
+ ceph_assert(state == STATE_SYNCHRONIZING);
bootstrap();
}
derr << __func__
<< " something wrong happened while reading the store: "
<< cpp_strerror(err) << dendl;
- assert(0 == "error reading the store");
+ ceph_abort_msg("error reading the store");
}
} else {
latest_monmap.decode(monmon_bl);
derr << __func__
<< " something wrong happened while reading the store: "
<< cpp_strerror(err) << dendl;
- assert(0 == "error reading the store");
+ ceph_abort_msg("error reading the store");
}
- assert(backup_bl.length() > 0);
+ ceph_assert(backup_bl.length() > 0);
MonMap backup_monmap;
backup_monmap.decode(backup_bl);
sync_timeout_event = NULL;
}
- sync_provider = entity_inst_t();
+ sync_provider = entity_addrvec_t();
sync_cookie = 0;
sync_full = false;
sync_start_version = 0;
sync_providers.clear();
}
-void Monitor::sync_start(entity_inst_t &other, bool full)
+void Monitor::sync_start(entity_addrvec_t &addrs, bool full)
{
- dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;
+ dout(10) << __func__ << " " << addrs << (full ? " full" : " recent") << dendl;
- assert(state == STATE_PROBING ||
+ ceph_assert(state == STATE_PROBING ||
state == STATE_SYNCHRONIZING);
state = STATE_SYNCHRONIZING;
sync_stash_critical_state(t);
t->put("mon_sync", "in_sync", 1);
- sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
+ sync_last_committed_floor = std::max(sync_last_committed_floor, paxos->get_version());
dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
<< sync_last_committed_floor << dendl;
t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
store->apply_transaction(t);
- assert(g_conf->mon_sync_requester_kill_at != 1);
+ ceph_assert(g_conf()->mon_sync_requester_kill_at != 1);
// clear the underlying store
set<string> targets = get_sync_targets_names();
// deciding a partial or no sync is needed.
paxos->init();
- assert(g_conf->mon_sync_requester_kill_at != 2);
+ ceph_assert(g_conf()->mon_sync_requester_kill_at != 2);
}
// assume 'other' as the leader. We will update the leader once we receive
// a reply to the sync start.
- sync_provider = other;
+ sync_provider = addrs;
sync_reset_timeout();
MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
if (!sync_full)
m->last_committed = paxos->get_version();
- messenger->send_message(m, sync_provider);
+ messenger->send_to_mon(m, sync_provider);
}
void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
dout(10) << __func__ << dendl;
bufferlist backup_monmap;
sync_obtain_latest_monmap(backup_monmap);
- assert(backup_monmap.length() > 0);
+ ceph_assert(backup_monmap.length() > 0);
t->put("mon_sync", "latest_monmap", backup_monmap);
}
if (sync_timeout_event)
timer.cancel_event(sync_timeout_event);
sync_timeout_event = timer.add_event_after(
- g_conf->mon_sync_timeout,
+ g_conf()->mon_sync_timeout,
new C_MonContext(this, [this](int) {
sync_timeout();
}));
{
dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
- assert(g_conf->mon_sync_requester_kill_at != 7);
+ ceph_assert(g_conf()->mon_sync_requester_kill_at != 7);
if (sync_full) {
// finalize the paxos commits
store->apply_transaction(tx);
}
- assert(g_conf->mon_sync_requester_kill_at != 8);
+ ceph_assert(g_conf()->mon_sync_requester_kill_at != 8);
auto t(std::make_shared<MonitorDBStore::Transaction>());
t->erase("mon_sync", "in_sync");
t->erase("mon_sync", "last_committed_floor");
store->apply_transaction(t);
- assert(g_conf->mon_sync_requester_kill_at != 9);
+ ceph_assert(g_conf()->mon_sync_requester_kill_at != 9);
init_paxos();
- assert(g_conf->mon_sync_requester_kill_at != 10);
+ ceph_assert(g_conf()->mon_sync_requester_kill_at != 10);
bootstrap();
}
default:
dout(0) << __func__ << " unknown op " << m->op << dendl;
- assert(0 == "unknown op");
+ ceph_abort_msg("unknown op");
}
}
return;
}
- assert(g_conf->mon_sync_provider_kill_at != 1);
+ ceph_assert(g_conf()->mon_sync_provider_kill_at != 1);
// make sure they can understand us.
if ((required_features ^ m->get_connection()->get_features()) &
// process instance. there is no need to be unique *across*
// monitors, though.
uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
- assert(sync_providers.count(cookie) == 0);
+ ceph_assert(sync_providers.count(cookie) == 0);
dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
SyncProvider& sp = sync_providers[cookie];
sp.cookie = cookie;
- sp.entity = m->get_source_inst();
- sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
+ sp.addrs = m->get_source_addrs();
+ sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
set<string> sync_targets;
if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
return;
}
- assert(g_conf->mon_sync_provider_kill_at != 2);
+ ceph_assert(g_conf()->mon_sync_provider_kill_at != 2);
SyncProvider& sp = sync_providers[m->cookie];
- sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
+ sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
if (sp.last_committed < paxos->get_first_committed() &&
paxos->get_first_committed() > 1) {
MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
auto tx(std::make_shared<MonitorDBStore::Transaction>());
- int left = g_conf->mon_sync_max_payload_size;
+ int left = g_conf()->mon_sync_max_payload_size;
while (sp.last_committed < paxos->get_version() && left > 0) {
bufferlist bl;
sp.last_committed++;
int err = store->get(paxos->get_name(), sp.last_committed, bl);
- assert(err == 0);
+ ceph_assert(err == 0);
tx->put(paxos->get_name(), sp.last_committed, bl);
left -= bl.length();
<< " key " << sp.last_key << dendl;
reply->op = MMonSync::OP_LAST_CHUNK;
- assert(g_conf->mon_sync_provider_kill_at != 3);
+ ceph_assert(g_conf()->mon_sync_provider_kill_at != 3);
// clean up our local state
sync_providers.erase(sp.cookie);
}
- ::encode(*tx, reply->chunk_bl);
+ encode(*tx, reply->chunk_bl);
m->get_connection()->send_message(reply);
}
dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
return;
}
- if (m->get_source_inst() != sync_provider) {
+ if (m->get_source_addrs() != sync_provider) {
dout(10) << __func__ << " source does not match, discarding" << dendl;
return;
}
sync_reset_timeout();
sync_get_next_chunk();
- assert(g_conf->mon_sync_requester_kill_at != 3);
+ ceph_assert(g_conf()->mon_sync_requester_kill_at != 3);
}
void Monitor::sync_get_next_chunk()
{
dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
- if (g_conf->mon_inject_sync_get_chunk_delay > 0) {
- dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl;
- usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0));
+ if (g_conf()->mon_inject_sync_get_chunk_delay > 0) {
+ dout(20) << __func__ << " injecting delay of " << g_conf()->mon_inject_sync_get_chunk_delay << dendl;
+ usleep((long long)(g_conf()->mon_inject_sync_get_chunk_delay * 1000000.0));
}
MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
- messenger->send_message(r, sync_provider);
+ messenger->send_to_mon(r, sync_provider);
- assert(g_conf->mon_sync_requester_kill_at != 4);
+ ceph_assert(g_conf()->mon_sync_requester_kill_at != 4);
}
void Monitor::handle_sync_chunk(MonOpRequestRef op)
dout(10) << __func__ << " cookie does not match, discarding" << dendl;
return;
}
- if (m->get_source_inst() != sync_provider) {
+ if (m->get_source_addrs() != sync_provider) {
dout(10) << __func__ << " source does not match, discarding" << dendl;
return;
}
- assert(state == STATE_SYNCHRONIZING);
- assert(g_conf->mon_sync_requester_kill_at != 5);
+ ceph_assert(state == STATE_SYNCHRONIZING);
+ ceph_assert(g_conf()->mon_sync_requester_kill_at != 5);
auto tx(std::make_shared<MonitorDBStore::Transaction>());
tx->append_from_encoded(m->chunk_bl);
store->apply_transaction(tx);
- assert(g_conf->mon_sync_requester_kill_at != 6);
+ ceph_assert(g_conf()->mon_sync_requester_kill_at != 6);
if (!sync_full) {
dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
while (p != sync_providers.end()) {
if (now > p->second.timeout) {
- dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl;
+ dout(10) << __func__ << " expiring cookie " << p->second.cookie
+ << " for " << p->second.addrs << dendl;
sync_providers.erase(p++);
} else {
++p;
probe_timeout_event = new C_MonContext(this, [this](int r) {
probe_timeout(r);
});
- double t = g_conf->mon_probe_timeout;
+ double t = g_conf()->mon_probe_timeout;
if (timer.add_event_after(t, probe_timeout_event)) {
dout(10) << "reset_probe_timeout " << probe_timeout_event
<< " after " << t << " seconds" << dendl;
void Monitor::probe_timeout(int r)
{
dout(4) << "probe_timeout " << probe_timeout_event << dendl;
- assert(is_probing() || is_synchronizing());
- assert(probe_timeout_event);
+ ceph_assert(is_probing() || is_synchronizing());
+ ceph_assert(probe_timeout_event);
probe_timeout_event = NULL;
bootstrap();
}
break;
case MMonProbe::OP_MISSING_FEATURES:
- derr << __func__ << " missing features, have " << CEPH_FEATURES_ALL
+ derr << __func__ << " require release " << m->mon_release << " > "
+ << ceph_release() << ", or missing features (have " << CEPH_FEATURES_ALL
<< ", required " << m->required_features
- << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL)
+ << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL) << ")"
<< dendl;
break;
}
dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
<< " features " << m->get_connection()->get_features() << dendl;
uint64_t missing = required_features & ~m->get_connection()->get_features();
- if (missing) {
- dout(1) << " peer " << m->get_source_addr() << " missing features "
- << missing << dendl;
- if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) {
- MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
- name, has_ever_joined);
- m->required_features = required_features;
- m->get_connection()->send_message(r);
- }
+ if (m->mon_release < monmap->min_mon_release || missing) {
+ dout(1) << " peer " << m->get_source_addr() << " release " << m->mon_release
+ << " < min_mon_release " << monmap->min_mon_release
+ << ", or missing features " << missing << dendl;
+ MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
+ name, has_ever_joined, monmap->min_mon_release);
+ m->required_features = required_features;
+ m->get_connection()->send_message(r);
goto out;
}
}
MMonProbe *r;
- r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
+ r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined,
+ ceph_release());
r->name = name;
r->quorum = quorum;
monmap->encode(r->monmap_bl, m->get_connection()->get_features());
// did we discover a peer here?
if (!monmap->contains(m->get_source_addr())) {
- dout(1) << " adding peer " << m->get_source_addr()
+ dout(1) << " adding peer " << m->get_source_addrs()
<< " to list of hints" << dendl;
- extra_probe_peers.insert(m->get_source_addr());
+ extra_probe_peers.insert(m->get_source_addrs());
}
out:
void Monitor::handle_probe_reply(MonOpRequestRef op)
{
MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
- dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
+ dout(10) << "handle_probe_reply " << m->get_source_inst()
+ << " " << *m << dendl;
dout(10) << " monmap is " << *monmap << dendl;
// discover name and addrs during probing or electing states.
bootstrap();
return;
}
- } else {
+ } else if (peer_name.size()) {
dout(10) << " peer name is " << peer_name << dendl;
+ } else {
+ dout(10) << " peer " << m->get_source_addr() << " not in map" << dendl;
}
// new initial peer?
if (monmap->get_epoch() == 0 &&
monmap->contains(m->name) &&
- monmap->get_addr(m->name).is_blank_ip()) {
- dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
- monmap->set_addr(m->name, m->get_source_addr());
+ monmap->get_addrs(m->name).front().is_blank_ip()) {
+ dout(1) << " learned initial mon " << m->name
+ << " addrs " << m->get_source_addrs() << dendl;
+ monmap->set_addrvec(m->name, m->get_source_addrs());
bootstrap();
return;
return;
}
- assert(paxos != NULL);
+ ceph_assert(paxos != NULL);
if (is_synchronizing()) {
dout(10) << " currently syncing" << dendl;
return;
}
- entity_inst_t other = m->get_source_inst();
+ entity_addrvec_t other = m->get_source_addrs();
if (m->paxos_last_version < sync_last_committed_floor) {
dout(10) << " peer paxos versions [" << m->paxos_first_version
sync_start(other, true);
return;
}
- if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
+ if (paxos->get_version() + g_conf()->paxos_max_join_drift < m->paxos_last_version) {
dout(10) << " peer paxos last version " << m->paxos_last_version
<< " vs my version " << paxos->get_version()
<< " (too far ahead)"
}
}
+ // did the existing cluster complete upgrade to luminous?
+ if (osdmon()->osdmap.get_epoch()) {
+ if (osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
+ derr << __func__ << " existing cluster has not completed upgrade to"
+ << " luminous; 'ceph osd require_osd_release luminous' before"
+ << " upgrading" << dendl;
+ exit(0);
+ }
+ if (!osdmon()->osdmap.test_flag(CEPH_OSDMAP_PURGED_SNAPDIRS) ||
+ !osdmon()->osdmap.test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
+ derr << __func__ << " existing cluster has not completed a full luminous"
+ << " scrub to purge legacy snapdir objects; please scrub before"
+ << " upgrading beyond luminous." << dendl;
+ exit(0);
+ }
+ }
+
// is there an existing quorum?
if (m->quorum.size()) {
dout(10) << " existing quorum " << m->quorum << dendl;
<< dendl;
if (monmap->contains(name) &&
- !monmap->get_addr(name).is_blank_ip()) {
+ !monmap->get_addrs(name).front().is_blank_ip()) {
// i'm part of the cluster; just initiate a new election
start_election();
} else {
dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
- messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
- monmap->get_inst(*m->quorum.begin()));
+ send_mon_message(
+ new MMonJoin(monmap->fsid, name, messenger->get_myaddrs()),
+ *m->quorum.begin());
}
} else {
if (monmap->contains(m->name)) {
return;
}
- unsigned need = monmap->size() / 2 + 1;
+ unsigned need = monmap->min_quorum_size();
dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
if (outside_quorum.size() >= need) {
if (outside_quorum.count(name)) {
elector.advance_epoch();
rank = monmap->get_rank(name);
- assert(rank == 0);
+ ceph_assert(rank == 0);
set<int> q;
q.insert(rank);
win_election(elector.get_epoch(), q,
CEPH_FEATURES_ALL,
ceph::features::mon::get_supported(),
+ ceph_release(),
metadata);
}
const utime_t& Monitor::get_leader_since() const
{
- assert(state == STATE_LEADER);
+ ceph_assert(state == STATE_LEADER);
return leader_since;
}
void Monitor::_finish_svc_election()
{
- assert(state == STATE_LEADER || state == STATE_PEON);
+ ceph_assert(state == STATE_LEADER || state == STATE_PEON);
- for (auto p : paxos_service) {
+ for (auto& svc : paxos_service) {
// we already called election_finished() on monmon(); avoid callig twice
- if (state == STATE_LEADER && p == monmon())
+ if (state == STATE_LEADER && svc.get() == monmon())
continue;
- p->election_finished();
+ svc->election_finished();
}
}
void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
const mon_feature_t& mon_features,
+ int min_mon_release,
const map<int,Metadata>& metadata)
{
dout(10) << __func__ << " epoch " << epoch << " quorum " << active
<< " features " << features
<< " mon_features " << mon_features
+ << " min_mon_release " << min_mon_release
<< dendl;
- assert(is_electing());
+ ceph_assert(is_electing());
state = STATE_LEADER;
leader_since = ceph_clock_now();
+ quorum_since = mono_clock::now();
leader = rank;
quorum = active;
quorum_con_features = features;
quorum_mon_features = mon_features;
+ quorum_min_mon_release = min_mon_release;
pending_metadata = metadata;
outside_quorum.clear();
// round without agreeing on who the participants are.
monmon()->election_finished();
_finish_svc_election();
- health_monitor->start(epoch);
logger->inc(l_mon_election_win);
// do that anyway for other reasons, though.
MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
bufferlist bl;
- ::encode(m, bl);
+ encode(m, bl);
t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
}
dout(20) << __func__ << " healthmon proposing, waiting" << dendl;
healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext(this,
[this](int r){
- assert(lock.is_locked_by_me());
+ ceph_assert(lock.is_locked_by_me());
do_health_to_clog_interval();
}));
void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
uint64_t features,
- const mon_feature_t& mon_features)
+ const mon_feature_t& mon_features,
+ int min_mon_release)
{
state = STATE_PEON;
leader_since = utime_t();
+ quorum_since = mono_clock::now();
leader = l;
quorum = q;
outside_quorum.clear();
quorum_con_features = features;
quorum_mon_features = mon_features;
+ quorum_min_mon_release = min_mon_release;
dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
<< " quorum is " << quorum << " features are " << quorum_con_features
<< " mon_features are " << quorum_mon_features
+ << " min_mon_release " << min_mon_release
<< dendl;
paxos->peon_init();
_finish_svc_election();
- health_monitor->start(epoch);
logger->inc(l_mon_election_lose);
finish_election();
+}
- if ((quorum_con_features & CEPH_FEATURE_MON_METADATA) &&
- !HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)) {
- // for pre-luminous mons only
- Metadata sys_info;
- collect_metadata(&sys_info);
- messenger->send_message(new MMonMetadata(sys_info),
- monmap->get_inst(get_leader()));
+namespace {
+std::string collect_compression_algorithms()
+{
+ ostringstream os;
+ bool printed = false;
+ for (auto [name, key] : Compressor::compression_algorithms) {
+ if (printed) {
+ os << ", ";
+ } else {
+ printed = true;
+ }
+ std::ignore = key;
+ os << name;
}
+ return os.str();
+}
}
void Monitor::collect_metadata(Metadata *m)
{
collect_sys_info(m, g_ceph_context);
- (*m)["addr"] = stringify(messenger->get_myaddr());
+ (*m)["addrs"] = stringify(messenger->get_myaddrs());
+ (*m)["compression_algorithms"] = collect_compression_algorithms();
+
+ // infer storage device
+ string devname = store->get_devname();
+ set<string> devnames;
+ get_raw_devices(devname, &devnames);
+ (*m)["devices"] = stringify(devnames);
+ string devids;
+ for (auto& devname : devnames) {
+ string err;
+ string id = get_device_id(devname, &err);
+ if (id.size()) {
+ if (!devids.empty()) {
+ devids += ",";
+ }
+ devids += devname + "=" + id;
+ } else {
+ derr << "failed to get devid for " << devname << ": " << err << dendl;
+ }
+ }
+ (*m)["device_ids"] = devids;
}
void Monitor::finish_election()
update_logger();
register_cluster_logger();
+ // enable authentication
+ {
+ std::lock_guard l(auth_lock);
+ authmon()->_set_mon_num_rank(monmap->size(), rank);
+ }
+
// am i named properly?
- string cur_name = monmap->get_name(messenger->get_myaddr());
+ string cur_name = monmap->get_name(messenger->get_myaddrs());
if (cur_name != name) {
dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
- messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
- monmap->get_inst(*quorum.begin()));
+ send_mon_message(
+ new MMonJoin(monmap->fsid, name, messenger->get_myaddrs()),
+ *quorum.begin());
}
}
void Monitor::apply_quorum_to_compatset_features()
{
CompatSet new_features(features);
- if (quorum_con_features & CEPH_FEATURE_OSD_ERASURE_CODES) {
- new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
- }
+ new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
}
- if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) {
- new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
- }
- if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3) {
- new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
- }
+ new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
+ new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
dout(5) << __func__ << dendl;
_apply_compatset_features(new_features);
}
* once you unset it.
*/
if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
- assert(ceph::features::mon::get_persistent().contains_all(
+ ceph_assert(ceph::features::mon::get_persistent().contains_all(
ceph::features::mon::FEATURE_KRAKEN));
// this feature should only ever be set if the quorum supports it.
- assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
+ ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
}
if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
- assert(ceph::features::mon::get_persistent().contains_all(
+ ceph_assert(ceph::features::mon::get_persistent().contains_all(
ceph::features::mon::FEATURE_LUMINOUS));
// this feature should only ever be set if the quorum supports it.
- assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
+ ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
}
+ if (monmap_features.contains_all(ceph::features::mon::FEATURE_MIMIC)) {
+ ceph_assert(ceph::features::mon::get_persistent().contains_all(
+ ceph::features::mon::FEATURE_MIMIC));
+ // this feature should only ever be set if the quorum supports it.
+ ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_MIMIC));
+ new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
+ }
+ if (monmap_features.contains_all(ceph::features::mon::FEATURE_NAUTILUS)) {
+ ceph_assert(ceph::features::mon::get_persistent().contains_all(
+ ceph::features::mon::FEATURE_NAUTILUS));
+ // this feature should only ever be set if the quorum supports it.
+ ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_NAUTILUS));
+ new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
+ }
dout(5) << __func__ << dendl;
_apply_compatset_features(new_features);
required_features = 0;
// compatset
- if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) {
- required_features |= CEPH_FEATURE_OSD_ERASURE_CODES;
- }
if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
required_features |= CEPH_FEATURE_OSDMAP_ENC;
}
- if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) {
- required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
- }
- if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3)) {
- required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3;
- }
if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
}
if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
}
+ if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_MIMIC)) {
+ required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
+ }
+ if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS)) {
+ required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
+ CEPH_FEATUREMASK_CEPHX_V2;
+ }
// monmap
if (monmap->get_required_features().contains_all(
ceph::features::mon::FEATURE_LUMINOUS)) {
required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
}
+ if (monmap->get_required_features().contains_all(
+ ceph::features::mon::FEATURE_MIMIC)) {
+ required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
+ }
+ if (monmap->get_required_features().contains_all(
+ ceph::features::mon::FEATURE_NAUTILUS)) {
+ required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
+ CEPH_FEATUREMASK_CEPHX_V2;
+ }
dout(10) << __func__ << " required_features " << required_features << dendl;
}
f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
+ if (!quorum.empty()) {
+ f->dump_int(
+ "quorum_age",
+ std::chrono::duration_cast<std::chrono::seconds>(
+ mono_clock::now() - quorum_since).count());
+ }
+
f->open_object_section("monmap");
monmap->dump(f);
f->close_section(); // monmap
for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
f->dump_int("mon", *p);
}
-
f->close_section(); // quorum
+ if (!quorum.empty()) {
+ f->dump_int(
+ "quorum_age",
+ std::chrono::duration_cast<std::chrono::seconds>(
+ mono_clock::now() - quorum_since).count());
+ }
+
f->open_object_section("features");
f->dump_stream("required_con") << required_features;
mon_feature_t req_mon_features = get_required_mon_features();
f->close_section(); // outside_quorum
f->open_array_section("extra_probe_peers");
- for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
+ for (set<entity_addrvec_t>::iterator p = extra_probe_peers.begin();
p != extra_probe_peers.end();
- ++p)
- f->dump_stream("peer") << *p;
+ ++p) {
+ f->dump_object("peer", *p);
+ }
f->close_section(); // extra_probe_peers
f->open_array_section("sync_provider");
p != sync_providers.end();
++p) {
f->dump_unsigned("cookie", p->second.cookie);
- f->dump_stream("entity") << p->second.entity;
+ f->dump_object("addrs", p->second.addrs);
f->dump_stream("timeout") << p->second.timeout;
f->dump_unsigned("last_committed", p->second.last_committed);
f->dump_stream("last_key") << p->second.last_key;
f->close_section();
}
- if (g_conf->mon_sync_provider_kill_at > 0)
- f->dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
- if (g_conf->mon_sync_requester_kill_at > 0)
- f->dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
+ if (g_conf()->mon_sync_provider_kill_at > 0)
+ f->dump_int("provider_kill_at", g_conf()->mon_sync_provider_kill_at);
+ if (g_conf()->mon_sync_requester_kill_at > 0)
+ f->dump_int("requester_kill_at", g_conf()->mon_sync_requester_kill_at);
f->open_object_section("monmap");
monmap->dump(f);
if (changed.count("mon_health_to_clog")) {
if (!cct->_conf->mon_health_to_clog) {
health_events_cleanup();
+ return;
} else {
if (!health_tick_event) {
health_tick_start();
dout(10) << __func__ << (force ? " (force)" : "") << dendl;
- if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
- string summary;
- health_status_t level = get_health_status(false, nullptr, &summary);
- if (!force &&
- summary == health_status_cache.summary &&
- level == health_status_cache.overall)
- return;
- clog->health(level) << "overall " << summary;
- health_status_cache.summary = summary;
- health_status_cache.overall = level;
- } else {
- // for jewel only
- list<string> status;
- health_status_t overall = get_health(status, NULL, NULL);
- dout(25) << __func__
- << (force ? " (force)" : "")
- << dendl;
-
- string summary = joinify(status.begin(), status.end(), string("; "));
-
- if (!force &&
- overall == health_status_cache.overall &&
- !health_status_cache.summary.empty() &&
- health_status_cache.summary == summary) {
- // we got a dup!
- return;
- }
-
- clog->info() << summary;
-
- health_status_cache.overall = overall;
- health_status_cache.summary = summary;
- }
+ string summary;
+ health_status_t level = get_health_status(false, nullptr, &summary);
+ if (!force &&
+ summary == health_status_cache.summary &&
+ level == health_status_cache.overall)
+ return;
+ clog->health(level) << "overall " << summary;
+ health_status_cache.summary = summary;
+ health_status_cache.overall = level;
}
health_status_t Monitor::get_health_status(
const char *sep2)
{
health_status_t r = HEALTH_OK;
- bool compat = g_conf->mon_health_preluminous_compat;
- bool compat_warn = g_conf->get_val<bool>("mon_health_preluminous_compat_warning");
if (f) {
f->open_object_section("health");
f->open_object_section("checks");
if (f) {
f->close_section();
f->dump_stream("status") << r;
+ f->close_section();
} else {
// one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
*plain = stringify(r);
*plain += "\n";
}
- const std::string old_fields_message = "'ceph health' JSON format has "
- "changed in luminous. If you see this your monitoring system is "
- "scraping the wrong fields. Disable this with 'mon health preluminous "
- "compat warning = false'";
-
- if (f && (compat || compat_warn)) {
- health_status_t cr = compat_warn ? min(HEALTH_WARN, r) : r;
- f->open_array_section("summary");
- if (compat_warn) {
- f->open_object_section("item");
- f->dump_stream("severity") << HEALTH_WARN;
- f->dump_string("summary", old_fields_message);
- f->close_section();
- }
- if (compat) {
- for (auto& svc : paxos_service) {
- svc->get_health_checks().dump_summary_compat(f);
- }
- }
- f->close_section();
- f->dump_stream("overall_status") << cr;
- }
-
- if (want_detail) {
- if (f && (compat || compat_warn)) {
- f->open_array_section("detail");
- if (compat_warn) {
- f->dump_string("item", old_fields_message);
- }
- }
-
+ if (want_detail && !f) {
for (auto& svc : paxos_service) {
- svc->get_health_checks().dump_detail(f, plain, compat);
- }
-
- if (f && (compat || compat_warn)) {
- f->close_section();
+ svc->get_health_checks().dump_detail(plain);
}
}
- if (f) {
- f->close_section();
- }
+
return r;
}
const health_check_map_t& previous,
MonitorDBStore::TransactionRef t)
{
- if (!g_conf->mon_health_to_clog) {
+ if (!g_conf()->mon_health_to_clog) {
return;
}
dout(10) << __func__ << " updated " << updated.checks.size()
<< " previous " << previous.checks.size()
<< dendl;
- const auto min_log_period = g_conf->get_val<int64_t>(
+ const auto min_log_period = g_conf().get_val<int64_t>(
"mon_health_log_update_period");
for (auto& p : updated.checks) {
auto q = previous.checks.find(p.first);
}
}
-health_status_t Monitor::get_health(list<string>& status,
- bufferlist *detailbl,
- Formatter *f)
-{
- list<pair<health_status_t,string> > summary;
- list<pair<health_status_t,string> > detail;
-
- if (f)
- f->open_object_section("health");
-
- for (vector<PaxosService*>::iterator p = paxos_service.begin();
- p != paxos_service.end();
- ++p) {
- PaxosService *s = *p;
- s->get_health(summary, detailbl ? &detail : NULL, cct);
- }
-
- health_monitor->get_health(summary, (detailbl ? &detail : NULL));
-
- health_status_t overall = HEALTH_OK;
- if (!timecheck_skews.empty()) {
- list<string> warns;
- for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
- i != timecheck_skews.end(); ++i) {
- entity_inst_t inst = i->first;
- double skew = i->second;
- double latency = timecheck_latencies[inst];
- string name = monmap->get_name(inst.addr);
- ostringstream tcss;
- health_status_t tcstatus = timecheck_status(tcss, skew, latency);
- if (tcstatus != HEALTH_OK) {
- if (overall > tcstatus)
- overall = tcstatus;
- warns.push_back(name);
- ostringstream tmp_ss;
- tmp_ss << "mon." << name
- << " addr " << inst.addr << " " << tcss.str()
- << " (latency " << latency << "s)";
- detail.push_back(make_pair(tcstatus, tmp_ss.str()));
- }
- }
- if (!warns.empty()) {
- ostringstream ss;
- ss << "clock skew detected on";
- while (!warns.empty()) {
- ss << " mon." << warns.front();
- warns.pop_front();
- if (!warns.empty())
- ss << ",";
- }
- status.push_back(ss.str());
- summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected "));
- }
- }
-
- if (f)
- f->open_array_section("summary");
- if (!summary.empty()) {
- while (!summary.empty()) {
- if (overall > summary.front().first)
- overall = summary.front().first;
- status.push_back(summary.front().second);
- if (f) {
- f->open_object_section("item");
- f->dump_stream("severity") << summary.front().first;
- f->dump_string("summary", summary.front().second);
- f->close_section();
- }
- summary.pop_front();
- }
- }
- if (f)
- f->close_section();
-
- stringstream fss;
- fss << overall;
- status.push_front(fss.str());
- if (f)
- f->dump_stream("overall_status") << overall;
-
- if (f)
- f->open_array_section("detail");
- while (!detail.empty()) {
- if (f)
- f->dump_string("item", detail.front().second);
- else if (detailbl != NULL) {
- detailbl->append(detail.front().second);
- detailbl->append('\n');
- }
- detail.pop_front();
- }
- if (f)
- f->close_section();
-
- if (f)
- f->close_section();
-
- return overall;
-}
-
void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
{
if (f)
f->open_object_section("status");
+ mono_clock::time_point now = mono_clock::now();
if (f) {
f->dump_stream("fsid") << monmap->get_fsid();
- if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
- get_health_status(false, f, nullptr);
- } else {
- list<string> health_str;
- get_health(health_str, nullptr, f);
- }
+ get_health_status(false, f, nullptr);
f->dump_unsigned("election_epoch", get_epoch());
{
f->open_array_section("quorum");
for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
f->dump_string("id", monmap->get_name(*p));
f->close_section();
+ f->dump_int(
+ "quorum_age",
+ std::chrono::duration_cast<std::chrono::seconds>(
+ mono_clock::now() - quorum_since).count());
}
f->open_object_section("monmap");
monmap->dump(f);
osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
f->close_section();
f->open_object_section("pgmap");
- pgservice->print_summary(f, NULL);
+ mgrstatmon()->print_summary(f, NULL);
f->close_section();
f->open_object_section("fsmap");
mdsmon()->get_fsmap().print_summary(f, NULL);
f->close_section();
f->dump_object("servicemap", mgrstatmon()->get_service_map());
+
+ f->open_object_section("progress_events");
+ for (auto& i : mgrstatmon()->get_progress_events()) {
+ f->dump_object(i.first.c_str(), i.second);
+ }
+ f->close_section();
+
f->close_section();
} else {
ss << " cluster:\n";
ss << " id: " << monmap->get_fsid() << "\n";
string health;
- if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
- get_health_status(false, nullptr, &health,
- "\n ", "\n ");
- } else {
- list<string> ls;
- get_health(ls, NULL, f);
- health = joinify(ls.begin(), ls.end(),
- string("\n "));
- }
+ get_health_status(false, nullptr, &health,
+ "\n ", "\n ");
ss << " health: " << health << "\n";
ss << "\n \n services:\n";
const auto quorum_names = get_quorum_names();
const auto mon_count = monmap->mon_info.size();
ss << " mon: " << spacing << mon_count << " daemons, quorum "
- << quorum_names;
+ << quorum_names << " (age " << timespan_str(now - quorum_since) << ")";
if (quorum_names.size() != mon_count) {
std::list<std::string> out_of_q;
for (size_t i = 0; i < monmap->ranks.size(); ++i) {
}
ss << "\n \n data:\n";
- pgservice->print_summary(NULL, &ss);
+ mgrstatmon()->print_summary(NULL, &ss);
+
+ auto& pem = mgrstatmon()->get_progress_events();
+ if (!pem.empty()) {
+ ss << "\n \n progress:\n";
+ for (auto& i : pem) {
+ ss << " " << i.second.message << "\n";
+ ss << " [";
+ unsigned j;
+ for (j=0; j < i.second.progress * 30; ++j) {
+ ss << '=';
+ }
+ for (; j < 30; ++j) {
+ ss << '.';
+ }
+ ss << "]\n";
+ }
+ }
ss << "\n ";
}
}
-void Monitor::_generate_command_map(map<string,cmd_vartype>& cmdmap,
+void Monitor::_generate_command_map(cmdmap_t& cmdmap,
map<string,string> ¶m_str_map)
{
- for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
- p != cmdmap.end(); ++p) {
+ for (auto p = cmdmap.begin(); p != cmdmap.end(); ++p) {
if (p->first == "prefix")
continue;
if (p->first == "caps") {
return nullptr;
}
-bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
- const map<string,cmd_vartype>& cmdmap,
+bool Monitor::_allowed_command(MonSession *s, const string &module,
+ const string &prefix, const cmdmap_t& cmdmap,
const map<string,string>& param_str_map,
const MonCommand *this_cmd) {
CEPH_ENTITY_TYPE_MON,
s->entity_name,
module, prefix, param_str_map,
- cmd_r, cmd_w, cmd_x);
+ cmd_r, cmd_w, cmd_x,
+ s->get_peer_socket_addr());
dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
return capable;
void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
Formatter *f,
- bufferlist *rdata,
- bool hide_mgr_flag)
+ uint64_t features,
+ bufferlist *rdata)
{
int cmdnum = 0;
f->open_object_section("command_descriptions");
for (const auto &cmd : commands) {
unsigned flags = cmd.flags;
- if (hide_mgr_flag) {
- flags &= ~MonCommand::FLAG_MGR;
- }
ostringstream secname;
secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
- dump_cmddesc_to_json(f, secname.str(),
+ dump_cmddesc_to_json(f, features, secname.str(),
cmd.cmdstring, cmd.helpstring, cmd.module,
- cmd.req_perms, cmd.availability, flags);
+ cmd.req_perms, flags);
cmdnum++;
}
f->close_section(); // command_descriptions
bool Monitor::is_keyring_required()
{
- string auth_cluster_required = g_conf->auth_supported.empty() ?
- g_conf->auth_cluster_required : g_conf->auth_supported;
- string auth_service_required = g_conf->auth_supported.empty() ?
- g_conf->auth_service_required : g_conf->auth_supported;
-
- return auth_service_required == "cephx" ||
- auth_cluster_required == "cephx";
+ return auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX) ||
+ auth_service_required.is_supported_auth(CEPH_AUTH_CEPHX) ||
+ auth_cluster_required.is_supported_auth(CEPH_AUTH_GSS) ||
+ auth_service_required.is_supported_auth(CEPH_AUTH_GSS);
}
struct C_MgrProxyCommand : public Context {
C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
: mon(mon), op(op), size(s) { }
void finish(int r) {
- Mutex::Locker l(mon->lock);
+ std::lock_guard l(mon->lock);
mon->mgr_proxy_bytes -= size;
mon->reply_command(op, r, outs, outbl, 0);
}
void Monitor::handle_command(MonOpRequestRef op)
{
- assert(op->is_type_command());
+ ceph_assert(op->is_type_command());
MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
if (m->fsid != monmap->fsid) {
dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
return;
}
- MonSession *session = static_cast<MonSession *>(
- m->get_connection()->get_priv());
+ MonSession *session = op->get_session();
if (!session) {
dout(5) << __func__ << " dropping stray message " << *m << dendl;
return;
}
- BOOST_SCOPE_EXIT_ALL(=) {
- session->put();
- };
if (m->cmd.empty()) {
string rs = "No command supplied";
string prefix;
vector<string> fullcmd;
- map<string, cmd_vartype> cmdmap;
+ cmdmap_t cmdmap;
stringstream ss, ds;
bufferlist rdata;
string rs;
if (prefix == "get_command_descriptions") {
bufferlist rdata;
Formatter *f = Formatter::create("json");
- // hide mgr commands until luminous upgrade is complete
- bool hide_mgr_flag =
- osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS;
-
- std::vector<MonCommand> commands;
- // only include mgr commands once all mons are upgrade (and we've dropped
- // the hard-coded PGMonitor commands)
- if (quorum_mon_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
- commands = static_cast<MgrMonitor*>(
- paxos_service[PAXOS_MGR])->get_command_descs();
- }
+ std::vector<MonCommand> commands = static_cast<MgrMonitor*>(
+ paxos_service[PAXOS_MGR].get())->get_command_descs();
for (auto& c : leader_mon_commands) {
commands.push_back(c);
}
- format_command_descriptions(commands, f, &rdata, hide_mgr_flag);
+ auto features = m->get_connection()->get_features();
+ format_command_descriptions(commands, f, features, &rdata);
delete f;
reply_command(op, 0, "", rdata, 0);
return;
param_str_map, mon_cmd)) {
dout(1) << __func__ << " access denied" << dendl;
(cmd_is_rw ? audit_clog->info() : audit_clog->debug())
- << "from='" << session->inst << "' "
+ << "from='" << session->name << " " << session->addrs << "' "
<< "entity='" << session->entity_name << "' "
<< "cmd=" << m->cmd << ": access denied";
reply_command(op, -EACCES, "access denied", 0);
}
(cmd_is_rw ? audit_clog->info() : audit_clog->debug())
- << "from='" << session->inst << "' "
- << "entity='" << session->entity_name << "' "
- << "cmd=" << m->cmd << ": dispatch";
+ << "from='" << session->name << " " << session->addrs << "' "
+ << "entity='" << session->entity_name << "' "
+ << "cmd=" << m->cmd << ": dispatch";
- if (mon_cmd->is_mgr() &&
- osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
+ if (mon_cmd->is_mgr()) {
const auto& hdr = m->get_header();
uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
- uint64_t max = g_conf->get_val<uint64_t>("mon_client_bytes")
- * g_conf->get_val<double>("mon_mgr_proxy_client_bytes_ratio");
+ uint64_t max = g_conf().get_val<Option::size_t>("mon_client_bytes")
+ * g_conf().get_val<double>("mon_mgr_proxy_client_bytes_ratio");
if (mgr_proxy_bytes + size > max) {
dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
<< " + " << size << " > max " << max << dendl;
mdsmon()->dispatch(op);
return;
}
- if ((module == "osd" || prefix == "pg map") &&
+ if ((module == "osd" ||
+ prefix == "pg map" ||
+ prefix == "pg repeer") &&
prefix != "osd last-stat-seq") {
osdmon()->dispatch(op);
return;
}
-
- if (module == "pg") {
- pgmon()->dispatch(op);
+ if (module == "config") {
+ configmon()->dispatch(op);
return;
}
+
if (module == "mon" &&
/* Let the Monitor class handle the following commands:
* 'mon compact'
prefix != "mon sync force" &&
prefix != "mon metadata" &&
prefix != "mon versions" &&
- prefix != "mon count-metadata") {
+ prefix != "mon count-metadata" &&
+ prefix != "mon ok-to-stop" &&
+ prefix != "mon ok-to-add-offline" &&
+ prefix != "mon ok-to-rm") {
monmon()->dispatch(op);
return;
}
if (prefix == "compact" || prefix == "mon compact") {
dout(1) << "triggering manual compaction" << dendl;
- utime_t start = ceph_clock_now();
- store->compact();
- utime_t end = ceph_clock_now();
- end -= start;
- dout(1) << "finished manual compaction in " << end << " seconds" << dendl;
+ auto start = ceph::coarse_mono_clock::now();
+ store->compact_async();
+ auto end = ceph::coarse_mono_clock::now();
+ double duration = std::chrono::duration<double>(end-start).count();
+ dout(1) << "finished manual compaction in " << duration << " seconds" << dendl;
ostringstream oss;
- oss << "compacted " << g_conf->get_val<std::string>("mon_keyvaluedb") << " in " << end << " seconds";
+ oss << "compacted " << g_conf().get_val<std::string>("mon_keyvaluedb") << " in " << duration << " seconds";
rs = oss.str();
r = 0;
}
if (!injected_args.empty()) {
dout(0) << "parsing injected options '" << injected_args << "'" << dendl;
ostringstream oss;
- r = g_conf->injectargs(str_join(injected_args, " "), &oss);
+ r = g_conf().injectargs(str_join(injected_args, " "), &oss);
ss << "injectargs:" << oss.str();
rs = ss.str();
goto out;
if (!timecheck_skews.empty()) {
f->open_object_section("time_skew_status");
for (auto& i : timecheck_skews) {
- entity_inst_t inst = i.first;
double skew = i.second;
- double latency = timecheck_latencies[inst];
- string name = monmap->get_name(inst.addr);
+ double latency = timecheck_latencies[i.first];
+ string name = monmap->get_name(i.first);
ostringstream tcss;
health_status_t tcstatus = timecheck_status(tcss, skew, latency);
f->open_object_section(name.c_str());
cmd_getval(cct, cmdmap, "key", key);
std::string val;
cmd_getval(cct, cmdmap, "value", val);
- r = g_conf->set_val(key, val, true, &ss);
+ r = g_conf().set_val(key, val, &ss);
if (r == 0) {
- g_conf->apply_changes(nullptr);
+ g_conf().apply_changes(nullptr);
}
rs = ss.str();
goto out;
}
rdata.append(ds);
} else if (prefix == "health") {
- if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
- string plain;
- get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
- if (f) {
- f->flush(rdata);
- } else {
- rdata.append(plain);
- }
+ string plain;
+ get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
+ if (f) {
+ f->flush(rdata);
} else {
- list<string> health_str;
- get_health(health_str, detail == "detail" ? &rdata : NULL, f.get());
- if (f) {
- f->flush(ds);
- ds << '\n';
- } else {
- assert(!health_str.empty());
- ds << health_str.front();
- health_str.pop_front();
- if (!health_str.empty()) {
- ds << ' ';
- ds << joinify(health_str.begin(), health_str.end(), string("; "));
- }
- }
- bufferlist comb;
- comb.append(ds);
- if (detail == "detail")
- comb.append(rdata);
- rdata = comb;
+ rdata.append(plain);
}
} else if (prefix == "df") {
bool verbose = (detail == "detail");
if (f)
f->open_object_section("stats");
- pgservice->dump_fs_stats(&ds, f.get(), verbose);
- if (!f)
- ds << '\n';
- pgservice->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
+ mgrstatmon()->dump_cluster_stats(&ds, f.get(), verbose);
+ if (!f) {
+ ds << "\n \n";
+ }
+ mgrstatmon()->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
if (f) {
f->close_section();
ds << '\n';
}
} else {
- assert(0 == "We should never get here!");
+ ceph_abort_msg("We should never get here!");
return;
}
rdata.append(ds);
tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
f->dump_string("tag", tagstr);
- if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
- get_health_status(true, f.get(), nullptr);
- } else {
- list<string> health_str;
- get_health(health_str, nullptr, f.get());
- }
+ get_health_status(true, f.get(), nullptr);
monmon()->dump_info(f.get());
osdmon()->dump_info(f.get());
mdsmon()->dump_info(f.get());
authmon()->dump_info(f.get());
- pgservice->dump_info(f.get());
+ mgrstatmon()->dump_info(f.get());
paxos->dump_info(f.get());
f->flush(rdata);
ostringstream ss2;
- ss2 << "report " << rdata.crc32c(CEPH_MON_PORT);
+ ss2 << "report " << rdata.crc32c(CEPH_MON_PORT_LEGACY);
rs = ss2.str();
r = 0;
} else if (prefix == "osd last-stat-seq") {
print_nodes(f.get(), ds);
osdmon()->print_nodes(f.get());
mdsmon()->print_nodes(f.get());
+ mgrmon()->print_nodes(f.get());
f->close_section();
} else if (node_type == "mon") {
print_nodes(f.get(), ds);
osdmon()->print_nodes(f.get());
} else if (node_type == "mds") {
mdsmon()->print_nodes(f.get());
+ } else if (node_type == "mgr") {
+ mgrmon()->print_nodes(f.get());
}
f->flush(ds);
rdata.append(ds);
rdata.append(ds);
rs = "";
r = 0;
+ } else if (prefix == "mon ok-to-stop") {
+ vector<string> ids;
+ if (!cmd_getval(g_ceph_context, cmdmap, "ids", ids)) {
+ r = -EINVAL;
+ goto out;
+ }
+ set<string> wouldbe;
+ for (auto rank : quorum) {
+ wouldbe.insert(monmap->get_name(rank));
+ }
+ for (auto& n : ids) {
+ if (monmap->contains(n)) {
+ wouldbe.erase(n);
+ }
+ }
+ if (wouldbe.size() < monmap->min_quorum_size()) {
+ r = -EBUSY;
+ rs = "not enough monitors would be available (" + stringify(wouldbe) +
+ ") after stopping mons " + stringify(ids);
+ goto out;
+ }
+ r = 0;
+ rs = "quorum should be preserved (" + stringify(wouldbe) +
+ ") after stopping " + stringify(ids);
+ } else if (prefix == "mon ok-to-add-offline") {
+ if (quorum.size() < monmap->min_quorum_size(monmap->size() + 1)) {
+ rs = "adding a monitor may break quorum (until that monitor starts)";
+ r = -EBUSY;
+ goto out;
+ }
+ rs = "adding another mon that is not yet online will not break quorum";
+ r = 0;
+ } else if (prefix == "mon ok-to-rm") {
+ string id;
+ if (!cmd_getval(g_ceph_context, cmdmap, "id", id)) {
+ r = -EINVAL;
+ rs = "must specify a monitor id";
+ goto out;
+ }
+ if (!monmap->contains(id)) {
+ r = 0;
+ rs = "mon." + id + " does not exist";
+ goto out;
+ }
+ int rank = monmap->get_rank(id);
+ if (quorum.count(rank) &&
+ quorum.size() - 1 < monmap->min_quorum_size(monmap->size() - 1)) {
+ r = -EBUSY;
+ rs = "removing mon." + id + " would break quorum";
+ goto out;
+ }
+ r = 0;
+ rs = "safe to remove mon." + id;
} else if (prefix == "mon_status") {
get_mon_status(f.get(), ds);
if (f)
r = 0;
} else if (prefix == "sync force" ||
prefix == "mon sync force") {
- string validate1, validate2;
- cmd_getval(g_ceph_context, cmdmap, "validate1", validate1);
- cmd_getval(g_ceph_context, cmdmap, "validate2", validate2);
- if (validate1 != "--yes-i-really-mean-it" ||
- validate2 != "--i-know-what-i-am-doing") {
+ bool validate1 = false;
+ cmd_getval(g_ceph_context, cmdmap, "yes_i_really_mean_it", validate1);
+ bool validate2 = false;
+ cmd_getval(g_ceph_context, cmdmap, "i_know_what_i_am_doing", validate2);
+
+ if (!validate1 || !validate2) {
r = -EINVAL;
rs = "are you SURE? this will mean the monitor store will be "
"erased. pass '--yes-i-really-mean-it "
// XXX 1-element vector, change at callee or make vector here?
vector<string> heapcmd_vec;
get_str_vec(heapcmd, heapcmd_vec);
+ string value;
+ if (cmd_getval(g_ceph_context, cmdmap, "value", value))
+ heapcmd_vec.push_back(value);
ceph_heap_profiler_handle_command(heapcmd_vec, ds);
rdata.append(ds);
rs = "";
f->flush(rdata);
rs = "";
r = 0;
+ } else if (prefix == "smart") {
+ string want_devid;
+ cmd_getval(cct, cmdmap, "devid", want_devid);
+
+ string devname = store->get_devname();
+ set<string> devnames;
+ get_raw_devices(devname, &devnames);
+ json_spirit::mObject json_map;
+ uint64_t smart_timeout = cct->_conf.get_val<uint64_t>(
+ "mon_smart_report_timeout");
+ for (auto& devname : devnames) {
+ string err;
+ string devid = get_device_id(devname, &err);
+ if (want_devid.size() && want_devid != devid) {
+ derr << "get_device_id failed on " << devname << ": " << err << dendl;
+ continue;
+ }
+ json_spirit::mValue smart_json;
+ if (block_device_get_metrics(devname, smart_timeout,
+ &smart_json)) {
+ dout(10) << "block_device_get_metrics failed for /dev/" << devname
+ << dendl;
+ continue;
+ }
+ json_map[devid] = smart_json;
+ }
+ ostringstream ss;
+ json_spirit::write(json_map, ss, json_spirit::pretty_print);
+ rdata.append(ss.str());
+ r = 0;
+ rs = "";
}
out:
bufferlist& rdata, version_t version)
{
MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
- assert(m->get_type() == MSG_MON_COMMAND);
+ ceph_assert(m->get_type() == MSG_MON_COMMAND);
MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
reply->set_tid(m->get_tid());
reply->set_data(rdata);
MonSession *session = op->get_session();
PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
- if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) {
+ if (req->get_source().is_mon() && req->get_source_addrs() != messenger->get_myaddrs()) {
dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
} else if (session->proxy_con) {
dout(10) << "forward_request won't double fwd request " << *req << dendl;
} else if (!session->closed) {
RoutedRequest *rr = new RoutedRequest;
rr->tid = ++routed_request_tid;
- rr->client_inst = req->get_source_inst();
rr->con = req->get_connection();
rr->con_features = rr->con->get_features();
encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
} else if (req->get_source().is_mon()) {
forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
}
- messenger->send_message(forward, monmap->get_inst(mon));
+ send_mon_message(forward, mon);
op->mark_forwarded();
- assert(op->get_req()->get_type() != 0);
+ ceph_assert(op->get_req()->get_type() != 0);
} else {
dout(10) << "forward_request no session for request " << *req << dendl;
}
// fake connection attached to forwarded messages
struct AnonConnection : public Connection {
- explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {}
+ entity_addr_t socket_addr;
+ explicit AnonConnection(CephContext *cct,
+ const entity_addr_t& sa)
+ : Connection(cct, NULL),
+ socket_addr(sa) {}
int send_message(Message *m) override {
- assert(!"send_message on anonymous connection");
+ ceph_assert(!"send_message on anonymous connection");
}
void send_keepalive() override {
- assert(!"send_keepalive on anonymous connection");
+ ceph_assert(!"send_keepalive on anonymous connection");
}
void mark_down() override {
// silently ignore
// silengtly ignore
}
bool is_connected() override { return false; }
+ entity_addr_t get_peer_socket_addr() const override {
+ return socket_addr;
+ }
};
//extract the original message and put it into the regular dispatch function
void Monitor::handle_forward(MonOpRequestRef op)
{
MForward *m = static_cast<MForward*>(op->get_req());
- dout(10) << "received forwarded message from " << m->client
+ dout(10) << "received forwarded message from "
+ << ceph_entity_type_name(m->client_type)
+ << " " << m->client_addrs
<< " via " << m->get_source_inst() << dendl;
MonSession *session = op->get_session();
- assert(session);
+ ceph_assert(session);
if (!session->is_capable("mon", MON_CAP_X)) {
dout(0) << "forward from entity with insufficient caps! "
// see PaxosService::dispatch(); we rely on this being anon
// (c->msgr == NULL)
PaxosServiceMessage *req = m->claim_message();
- assert(req != NULL);
-
- ConnectionRef c(new AnonConnection(cct));
- MonSession *s = new MonSession(req->get_source_inst(),
- static_cast<Connection*>(c.get()));
- c->set_priv(s->get());
- c->set_peer_addr(m->client.addr);
- c->set_peer_type(m->client.name.type());
+ ceph_assert(req != NULL);
+
+ ConnectionRef c(new AnonConnection(cct, m->client_socket_addr));
+ MonSession *s = new MonSession(static_cast<Connection*>(c.get()));
+ s->_ident(req->get_source(),
+ req->get_source_addrs());
+ c->set_priv(RefCountedPtr{s, false});
+ c->set_peer_addrs(m->client_addrs);
+ c->set_peer_type(m->client_type);
c->set_features(m->con_features);
+ s->authenticated = true;
s->caps = m->client_caps;
dout(10) << " caps are " << s->caps << dendl;
s->entity_name = m->entity_name;
*/
req->rx_election_epoch = get_epoch();
- /* Because this is a special fake connection, we need to break
- the ref loop between Connection and MonSession differently
- than we normally do. Here, the Message refers to the Connection
- which refers to the Session, and nobody else refers to the Connection
- or the Session. And due to the special nature of this message,
- nobody refers to the Connection via the Session. So, clear out that
- half of the ref loop.*/
- s->con.reset(NULL);
-
dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
-
_ms_dispatch(req);
- s->put();
- }
-}
-
-void Monitor::try_send_message(Message *m, const entity_inst_t& to)
-{
- dout(10) << "try_send_message " << *m << " to " << to << dendl;
-
- bufferlist bl;
- encode_message(m, quorum_con_features, bl);
-
- messenger->send_message(m, to);
- for (int i=0; i<(int)monmap->size(); i++) {
- if (i != rank)
- messenger->send_message(new MRoute(bl, to), monmap->get_inst(i));
+ // break the session <-> con ref loop by removing the con->session
+ // reference, which is no longer needed once the MonOpRequest is
+ // set up.
+ c->set_priv(NULL);
}
}
op->mark_event(__func__);
MonSession *session = op->get_session();
- assert(session);
+ ceph_assert(session);
Message *req = op->get_req();
ConnectionRef con = op->get_connection();
return;
}
if (m->msg)
- dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
+ dout(10) << "handle_route tid " << m->session_mon_tid << " " << *m->msg
+ << dendl;
else
- dout(10) << "handle_route null to " << m->dest << dendl;
+ dout(10) << "handle_route tid " << m->session_mon_tid << " null" << dendl;
// look it up
if (m->session_mon_tid) {
osdmon()->send_incremental(m->send_osdmap_first, rr->session,
true, MonOpRequestRef());
}
- assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
+ ceph_assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
routed_requests.erase(m->session_mon_tid);
rr->session->routed_request_tids.erase(m->session_mon_tid);
delete rr;
dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
}
} else {
- dout(10) << " not a routed request, trying to send anyway" << dendl;
- if (m->msg) {
- messenger->send_message(m->msg, m->dest);
- m->msg = NULL;
- }
+ dout(10) << " not a routed request, ignoring" << dendl;
}
}
rr->op->mark_event("retry routed request");
retry.push_back(new C_RetryMessage(this, rr->op));
if (rr->session) {
- assert(rr->session->routed_request_tids.count(p->first));
+ ceph_assert(rr->session->routed_request_tids.count(p->first));
rr->session->routed_request_tids.erase(p->first);
}
delete rr;
} else {
- bufferlist::iterator q = rr->request_bl.begin();
- PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);
+ auto q = rr->request_bl.cbegin();
+ PaxosServiceMessage *req =
+ (PaxosServiceMessage *)decode_message(cct, 0, q);
rr->op->mark_event("resend forwarded message to leader");
- dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
- MForward *forward = new MForward(rr->tid, req, rr->con_features,
+ dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req
+ << dendl;
+ MForward *forward = new MForward(rr->tid,
+ req,
+ rr->con_features,
rr->session->caps);
req->put(); // forward takes its own ref; drop ours.
- forward->client = rr->client_inst;
+ forward->client_type = rr->con->get_peer_type();
+ forward->client_addrs = rr->con->get_peer_addrs();
+ forward->client_socket_addr = rr->con->get_peer_socket_addr();
forward->set_priority(req->get_priority());
- messenger->send_message(forward, monmap->get_inst(mon));
+ send_mon_message(forward, mon);
}
}
if (mon == rank) {
void Monitor::remove_session(MonSession *s)
{
- dout(10) << "remove_session " << s << " " << s->inst
+ dout(10) << "remove_session " << s << " " << s->name << " " << s->addrs
<< " features 0x" << std::hex << s->con_features << std::dec << dendl;
- assert(s->con);
- assert(!s->closed);
+ ceph_assert(s->con);
+ ceph_assert(!s->closed);
for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
p != s->routed_request_tids.end();
++p) {
- assert(routed_requests.count(*p));
+ ceph_assert(routed_requests.count(*p));
RoutedRequest *rr = routed_requests[*p];
dout(10) << " dropping routed request " << rr->tid << dendl;
delete rr;
routed_requests.erase(*p);
}
s->routed_request_tids.clear();
- s->con->set_priv(NULL);
+ s->con->set_priv(nullptr);
session_map.remove_session(s);
logger->set(l_mon_num_sessions, session_map.get_size());
logger->inc(l_mon_session_rm);
void Monitor::remove_all_sessions()
{
- Mutex::Locker l(session_map_lock);
+ std::lock_guard l(session_map_lock);
while (!session_map.sessions.empty()) {
MonSession *s = session_map.sessions.front();
remove_session(s);
- if (logger)
- logger->inc(l_mon_session_rm);
+ logger->inc(l_mon_session_rm);
}
if (logger)
logger->set(l_mon_num_sessions, session_map.get_size());
}
-void Monitor::send_command(const entity_inst_t& inst,
- const vector<string>& com)
+void Monitor::send_mon_message(Message *m, int rank)
{
- dout(10) << "send_command " << inst << "" << com << dendl;
- MMonCommand *c = new MMonCommand(monmap->fsid);
- c->cmd = com;
- try_send_message(c, inst);
+ messenger->send_to_mon(m, monmap->get_addrs(rank));
}
void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
// proxied sessions aren't registered and don't have a con; don't remove
// those.
if (!s->proxy_con) {
- Mutex::Locker l(session_map_lock);
+ std::lock_guard l(session_map_lock);
remove_session(s);
}
op->mark_zap();
ConnectionRef con = m->get_connection();
{
- Mutex::Locker l(session_map_lock);
- s = session_map.new_session(m->get_source_inst(), con.get());
+ std::lock_guard l(session_map_lock);
+ s = session_map.new_session(m->get_source(),
+ m->get_source_addrs(),
+ con.get());
}
- assert(s);
- con->set_priv(s->get());
+ ceph_assert(s);
+ con->set_priv(RefCountedPtr{s, false});
dout(10) << __func__ << " new session " << s << " " << *s
<< " features 0x" << std::hex
<< s->con_features << std::dec << dendl;
// give it monitor caps; the peer type has been authenticated
dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
if (!s->caps.is_allow_all()) // but no need to repeatedly copy
- s->caps = *mon_caps;
+ s->caps = mon_caps;
+ s->authenticated = true;
}
- s->put();
} else {
- dout(20) << __func__ << " existing session " << s << " for " << s->inst
+ dout(20) << __func__ << " existing session " << s << " for " << s->name
<< dendl;
}
- assert(s);
+ ceph_assert(s);
s->session_timeout = ceph_clock_now();
- s->session_timeout += g_conf->mon_session_timeout;
+ s->session_timeout += g_conf()->mon_session_timeout;
if (s->auth_handler) {
s->entity_name = s->auth_handler->get_entity_name();
dout(20) << " caps " << s->caps.get_str() << dendl;
if ((is_synchronizing() ||
- (s->global_id == 0 && !exited_quorum.is_zero())) &&
+ (!s->authenticated && !exited_quorum.is_zero())) &&
!src_is_mon &&
m->get_type() != CEPH_MSG_PING) {
waitlist_or_zap_client(op);
{
op->mark_event("mon:dispatch_op");
MonSession *s = op->get_session();
- assert(s);
+ ceph_assert(s);
if (s->closed) {
dout(10) << " session closed, dropping " << op->get_req() << dendl;
return;
/* we will consider the default type as being 'monitor' until proven wrong */
op->set_type_monitor();
/* deal with all messages that do not necessarily need caps */
- bool dealt_with = true;
switch (op->get_req()->get_type()) {
// auth
case MSG_MON_GLOBAL_ID:
op->set_type_service();
/* no need to check caps here */
paxos_service[PAXOS_AUTH]->dispatch(op);
- break;
+ return;
case CEPH_MSG_PING:
handle_ping(op);
- break;
+ return;
+ }
- /* MMonGetMap may be used by clients to obtain a monmap *before*
- * authenticating with the monitor. We need to handle these without
- * checking caps because, even on a cluster without cephx, we only set
- * session caps *after* the auth handshake. A good example of this
- * is when a client calls MonClient::get_monmap_privately(), which does
- * not authenticate when obtaining a monmap.
- */
+ if (!op->get_session()->authenticated) {
+ dout(5) << __func__ << " " << op->get_req()->get_source_inst()
+ << " is not authenticated, dropping " << *(op->get_req())
+ << dendl;
+ return;
+ }
+
+ switch (op->get_req()->get_type()) {
case CEPH_MSG_MON_GET_MAP:
handle_mon_get_map(op);
- break;
+ return;
+
+ case MSG_GET_CONFIG:
+ configmon()->handle_get_config(op);
+ return;
case CEPH_MSG_MON_METADATA:
return handle_mon_metadata(op);
- default:
- dealt_with = false;
- break;
+ case CEPH_MSG_MON_SUBSCRIBE:
+ /* FIXME: check what's being subscribed, filter accordingly */
+ handle_subscribe(op);
+ return;
}
- if (dealt_with)
- return;
/* well, maybe the op belongs to a service... */
op->set_type_service();
/* deal with all messages which caps should be checked somewhere else */
- dealt_with = true;
switch (op->get_req()->get_type()) {
// OSDs
case MSG_OSD_PGTEMP:
case MSG_OSD_PG_CREATED:
case MSG_REMOVE_SNAPS:
+ case MSG_OSD_PG_READY_TO_MERGE:
paxos_service[PAXOS_OSDMAP]->dispatch(op);
- break;
+ return;
// MDSs
case MSG_MDS_BEACON:
case MSG_MDS_OFFLOAD_TARGETS:
paxos_service[PAXOS_MDSMAP]->dispatch(op);
- break;
+ return;
// Mgrs
case MSG_MGR_BEACON:
paxos_service[PAXOS_MGR]->dispatch(op);
- break;
+ return;
// MgrStat
- case CEPH_MSG_STATFS:
- // this is an ugly hack, sorry! force the version to 1 so that we do
- // not run afoul of the is_readable() paxos check. the client is going
- // by the pgmonitor version and the MgrStatMonitor version will lag behind
- // that until we complete the upgrade. The paxos ordering crap really
- // doesn't matter for statfs results, so just kludge around it here.
- if (osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
- ((MStatfs*)op->get_req())->version = 1;
- }
case MSG_MON_MGR_REPORT:
+ case CEPH_MSG_STATFS:
case MSG_GETPOOLSTATS:
paxos_service[PAXOS_MGRSTAT]->dispatch(op);
- break;
-
- // pg
- case MSG_PGSTATS:
- paxos_service[PAXOS_PGMAP]->dispatch(op);
- break;
+ return;
- // log
+ // log
case MSG_LOG:
paxos_service[PAXOS_LOG]->dispatch(op);
- break;
+ return;
// handle_command() does its own caps checking
case MSG_MON_COMMAND:
op->set_type_command();
handle_command(op);
- break;
-
- default:
- dealt_with = false;
- break;
+ return;
}
- if (dealt_with)
- return;
/* nop, looks like it's not a service message; revert back to monitor */
op->set_type_monitor();
dout(5) << __func__ << " " << op->get_req()->get_source_inst()
<< " not enough caps for " << *(op->get_req()) << " -- dropping"
<< dendl;
- goto drop;
+ return;
}
- dealt_with = true;
switch (op->get_req()->get_type()) {
-
// misc
case CEPH_MSG_MON_GET_VERSION:
handle_get_version(op);
- break;
-
- case CEPH_MSG_MON_SUBSCRIBE:
- /* FIXME: check what's being subscribed, filter accordingly */
- handle_subscribe(op);
- break;
-
- default:
- dealt_with = false;
- break;
+ return;
}
- if (dealt_with)
- return;
if (!op->is_src_mon()) {
dout(1) << __func__ << " unexpected monitor message from"
<< " non-monitor entity " << op->get_req()->get_source_inst()
<< " " << *(op->get_req()) << " -- dropping" << dendl;
- goto drop;
+ return;
}
/* messages that should only be sent by another monitor */
- dealt_with = true;
switch (op->get_req()->get_type()) {
case MSG_ROUTE:
handle_route(op);
- break;
+ return;
case MSG_MON_PROBE:
handle_probe(op);
- break;
+ return;
// Sync (i.e., the new slurp, but on steroids)
case MSG_MON_SYNC:
handle_sync(op);
- break;
+ return;
case MSG_MON_SCRUB:
handle_scrub(op);
- break;
+ return;
/* log acks are sent from a monitor we sent the MLog to, and are
never sent by clients to us. */
case MSG_LOGACK:
log_client.handle_log_ack((MLogAck*)op->get_req());
- break;
+ return;
// monmap
case MSG_MON_JOIN:
op->set_type_service();
paxos_service[PAXOS_MONMAP]->dispatch(op);
- break;
+ return;
// paxos
case MSG_MON_PAXOS:
MMonPaxos *pm = static_cast<MMonPaxos*>(op->get_req());
if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
//can't send these!
- break;
+ return;
}
if (state == STATE_SYNCHRONIZING) {
// good, thus just drop them and ignore them.
dout(10) << __func__ << " ignore paxos msg from "
<< pm->get_source_inst() << dendl;
- break;
+ return;
}
// sanitize
if (pm->epoch > get_epoch()) {
bootstrap();
- break;
+ return;
}
if (pm->epoch != get_epoch()) {
- break;
+ return;
}
paxos->dispatch(op);
}
- break;
+ return;
// elector messages
case MSG_MON_ELECTION:
if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
dout(0) << "MMonElection received from entity without enough caps!"
<< op->get_session()->caps << dendl;
- break;
+ return;;
}
if (!is_probing() && !is_synchronizing()) {
elector.dispatch(op);
}
- break;
+ return;
case MSG_FORWARD:
handle_forward(op);
- break;
+ return;
case MSG_TIMECHECK:
+ dout(5) << __func__ << " ignoring " << op << dendl;
+ return;
+ case MSG_TIMECHECK2:
handle_timecheck(op);
- break;
+ return;
case MSG_MON_HEALTH:
- health_monitor->dispatch(op);
+ dout(5) << __func__ << " dropping deprecated message: "
+ << *op->get_req() << dendl;
break;
-
case MSG_MON_HEALTH_CHECKS:
op->set_type_service();
paxos_service[PAXOS_HEALTH]->dispatch(op);
- break;
-
- default:
- dealt_with = false;
- break;
- }
- if (!dealt_with) {
- dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
- goto drop;
+ return;
}
- return;
-
-drop:
+ dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
return;
}
MPing *m = static_cast<MPing*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
MPing *reply = new MPing;
- entity_inst_t inst = m->get_source_inst();
bufferlist payload;
boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
f->open_object_section("pong");
- if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
- get_health_status(false, f.get(), nullptr);
- } else {
- list<string> health_str;
- get_health(health_str, nullptr, f.get());
- }
-
+ get_health_status(false, f.get(), nullptr);
{
stringstream ss;
get_mon_status(f.get(), ss);
f->close_section();
stringstream ss;
f->flush(ss);
- ::encode(ss.str(), payload);
+ encode(ss.str(), payload);
reply->set_payload(payload);
dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
- messenger->send_message(reply, inst);
+ m->get_connection()->send_message(reply);
}
void Monitor::timecheck_start()
{
dout(10) << __func__ << dendl;
timecheck_cleanup();
- timecheck_start_round();
+ if (get_quorum_mon_features().contains_all(
+ ceph::features::mon::FEATURE_NAUTILUS)) {
+ timecheck_start_round();
+ }
}
void Monitor::timecheck_finish()
void Monitor::timecheck_start_round()
{
dout(10) << __func__ << " curr " << timecheck_round << dendl;
- assert(is_leader());
+ ceph_assert(is_leader());
if (monmap->size() == 1) {
- assert(0 == "We are alone; this shouldn't have been scheduled!");
+ ceph_abort_msg("We are alone; this shouldn't have been scheduled!");
return;
}
if (timecheck_round % 2) {
dout(10) << __func__ << " there's a timecheck going on" << dendl;
utime_t curr_time = ceph_clock_now();
- double max = g_conf->mon_timecheck_interval*3;
+ double max = g_conf()->mon_timecheck_interval*3;
if (curr_time - timecheck_round_start < max) {
dout(10) << __func__ << " keep current round going" << dendl;
goto out;
}
}
- assert(timecheck_round % 2 == 0);
+ ceph_assert(timecheck_round % 2 == 0);
timecheck_acks = 0;
timecheck_round ++;
timecheck_round_start = ceph_clock_now();
void Monitor::timecheck_finish_round(bool success)
{
dout(10) << __func__ << " curr " << timecheck_round << dendl;
- assert(timecheck_round % 2);
+ ceph_assert(timecheck_round % 2);
timecheck_round ++;
timecheck_round_start = utime_t();
if (success) {
- assert(timecheck_waiting.empty());
- assert(timecheck_acks == quorum.size());
+ ceph_assert(timecheck_waiting.empty());
+ ceph_assert(timecheck_acks == quorum.size());
timecheck_report();
timecheck_check_skews();
return;
dout(10) << __func__ << " " << timecheck_waiting.size()
<< " peers still waiting:";
- for (map<entity_inst_t,utime_t>::iterator p = timecheck_waiting.begin();
- p != timecheck_waiting.end(); ++p) {
- *_dout << " " << p->first.name;
+ for (auto& p : timecheck_waiting) {
+ *_dout << " mon." << p.first;
}
*_dout << dendl;
timecheck_waiting.clear();
void Monitor::timecheck_check_skews()
{
dout(10) << __func__ << dendl;
- assert(is_leader());
- assert((timecheck_round % 2) == 0);
+ ceph_assert(is_leader());
+ ceph_assert((timecheck_round % 2) == 0);
if (monmap->size() == 1) {
- assert(0 == "We are alone; we shouldn't have gotten here!");
+ ceph_abort_msg("We are alone; we shouldn't have gotten here!");
return;
}
- assert(timecheck_latencies.size() == timecheck_skews.size());
+ ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
bool found_skew = false;
- for (map<entity_inst_t, double>::iterator p = timecheck_skews.begin();
- p != timecheck_skews.end(); ++p) {
-
+ for (auto& p : timecheck_skews) {
double abs_skew;
- if (timecheck_has_skew(p->second, &abs_skew)) {
+ if (timecheck_has_skew(p.second, &abs_skew)) {
dout(10) << __func__
- << " " << p->first << " skew " << abs_skew << dendl;
+ << " " << p.first << " skew " << abs_skew << dendl;
found_skew = true;
}
}
void Monitor::timecheck_report()
{
dout(10) << __func__ << dendl;
- assert(is_leader());
- assert((timecheck_round % 2) == 0);
+ ceph_assert(is_leader());
+ ceph_assert((timecheck_round % 2) == 0);
if (monmap->size() == 1) {
- assert(0 == "We are alone; we shouldn't have gotten here!");
+ ceph_abort_msg("We are alone; we shouldn't have gotten here!");
return;
}
- assert(timecheck_latencies.size() == timecheck_skews.size());
+ ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
bool do_output = true; // only output report once
for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
if (monmap->get_name(*q) == name)
continue;
- MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT);
+ MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_REPORT);
m->epoch = get_epoch();
m->round = timecheck_round;
- for (map<entity_inst_t, double>::iterator it = timecheck_skews.begin();
- it != timecheck_skews.end(); ++it) {
- double skew = it->second;
- double latency = timecheck_latencies[it->first];
+ for (auto& it : timecheck_skews) {
+ double skew = it.second;
+ double latency = timecheck_latencies[it.first];
- m->skews[it->first] = skew;
- m->latencies[it->first] = latency;
+ m->skews[it.first] = skew;
+ m->latencies[it.first] = latency;
if (do_output) {
- dout(25) << __func__ << " " << it->first
+ dout(25) << __func__ << " mon." << it.first
<< " latency " << latency
<< " skew " << skew << dendl;
}
}
do_output = false;
- entity_inst_t inst = monmap->get_inst(*q);
- dout(10) << __func__ << " send report to " << inst << dendl;
- messenger->send_message(m, inst);
+ dout(10) << __func__ << " send report to mon." << *q << dendl;
+ send_mon_message(m, *q);
}
}
void Monitor::timecheck()
{
dout(10) << __func__ << dendl;
- assert(is_leader());
+ ceph_assert(is_leader());
if (monmap->size() == 1) {
- assert(0 == "We are alone; we shouldn't have gotten here!");
+ ceph_abort_msg("We are alone; we shouldn't have gotten here!");
return;
}
- assert(timecheck_round % 2 != 0);
+ ceph_assert(timecheck_round % 2 != 0);
timecheck_acks = 1; // we ack ourselves
<< " round " << timecheck_round << dendl;
// we are at the eye of the storm; the point of reference
- timecheck_skews[messenger->get_myinst()] = 0.0;
- timecheck_latencies[messenger->get_myinst()] = 0.0;
+ timecheck_skews[rank] = 0.0;
+ timecheck_latencies[rank] = 0.0;
for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
if (monmap->get_name(*it) == name)
continue;
- entity_inst_t inst = monmap->get_inst(*it);
utime_t curr_time = ceph_clock_now();
- timecheck_waiting[inst] = curr_time;
- MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING);
+ timecheck_waiting[*it] = curr_time;
+ MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_PING);
m->epoch = get_epoch();
m->round = timecheck_round;
- dout(10) << __func__ << " send " << *m << " to " << inst << dendl;
- messenger->send_message(m, inst);
+ dout(10) << __func__ << " send " << *m << " to mon." << *it << dendl;
+ send_mon_message(m, *it);
}
}
const double latency)
{
health_status_t status = HEALTH_OK;
- assert(latency >= 0);
+ ceph_assert(latency >= 0);
double abs_skew;
if (timecheck_has_skew(skew_bound, &abs_skew)) {
status = HEALTH_WARN;
ss << "clock skew " << abs_skew << "s"
- << " > max " << g_conf->mon_clock_drift_allowed << "s";
+ << " > max " << g_conf()->mon_clock_drift_allowed << "s";
}
return status;
void Monitor::handle_timecheck_leader(MonOpRequestRef op)
{
- MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
+ MTimeCheck2 *m = static_cast<MTimeCheck2*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
/* handles PONG's */
- assert(m->op == MTimeCheck::OP_PONG);
+ ceph_assert(m->op == MTimeCheck2::OP_PONG);
- entity_inst_t other = m->get_source_inst();
+ int other = m->get_source().num();
if (m->epoch < get_epoch()) {
dout(1) << __func__ << " got old timecheck epoch " << m->epoch
<< " from " << other
<< " -- severely lagged? discard" << dendl;
return;
}
- assert(m->epoch == get_epoch());
+ ceph_assert(m->epoch == get_epoch());
if (m->round < timecheck_round) {
dout(1) << __func__ << " got old round " << m->round
utime_t curr_time = ceph_clock_now();
- assert(timecheck_waiting.count(other) > 0);
+ ceph_assert(timecheck_waiting.count(other) > 0);
utime_t timecheck_sent = timecheck_waiting[other];
timecheck_waiting.erase(other);
if (curr_time < timecheck_sent) {
* may be masked by an even higher latency, but with high latencies
* we probably have worse issues to deal with than just skewed clocks.
*/
- assert(latency >= 0);
+ ceph_assert(latency >= 0);
double delta = ((double) m->timestamp) - ((double) curr_time);
double abs_delta = (delta > 0 ? delta : -delta);
if (timecheck_acks == quorum.size()) {
dout(10) << __func__ << " got pongs from everybody ("
<< timecheck_acks << " total)" << dendl;
- assert(timecheck_skews.size() == timecheck_acks);
- assert(timecheck_waiting.empty());
+ ceph_assert(timecheck_skews.size() == timecheck_acks);
+ ceph_assert(timecheck_waiting.empty());
// everyone has acked, so bump the round to finish it.
timecheck_finish_round();
}
void Monitor::handle_timecheck_peon(MonOpRequestRef op)
{
- MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
+ MTimeCheck2 *m = static_cast<MTimeCheck2*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
- assert(is_peon());
- assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT);
+ ceph_assert(is_peon());
+ ceph_assert(m->op == MTimeCheck2::OP_PING || m->op == MTimeCheck2::OP_REPORT);
if (m->epoch != get_epoch()) {
dout(1) << __func__ << " got wrong epoch "
timecheck_round = m->round;
- if (m->op == MTimeCheck::OP_REPORT) {
- assert((timecheck_round % 2) == 0);
+ if (m->op == MTimeCheck2::OP_REPORT) {
+ ceph_assert((timecheck_round % 2) == 0);
timecheck_latencies.swap(m->latencies);
timecheck_skews.swap(m->skews);
return;
}
- assert((timecheck_round % 2) != 0);
- MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG);
+ ceph_assert((timecheck_round % 2) != 0);
+ MTimeCheck2 *reply = new MTimeCheck2(MTimeCheck2::OP_PONG);
utime_t curr_time = ceph_clock_now();
reply->timestamp = curr_time;
reply->epoch = m->epoch;
void Monitor::handle_timecheck(MonOpRequestRef op)
{
- MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
+ MTimeCheck2 *m = static_cast<MTimeCheck2*>(op->get_req());
dout(10) << __func__ << " " << *m << dendl;
if (is_leader()) {
- if (m->op != MTimeCheck::OP_PONG) {
+ if (m->op != MTimeCheck2::OP_PONG) {
dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
} else {
handle_timecheck_leader(op);
}
} else if (is_peon()) {
- if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
+ if (m->op != MTimeCheck2::OP_PING && m->op != MTimeCheck2::OP_REPORT) {
dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
} else {
handle_timecheck_peon(op);
bool reply = false;
MonSession *s = op->get_session();
- assert(s);
+ ceph_assert(s);
+
+ if (m->hostname.size()) {
+ s->remote_host = m->hostname;
+ }
for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
p != m->what.end();
++p) {
+ if (p->first == "monmap" || p->first == "config") {
+ // these require no caps
+ } else if (!s->is_capable("mon", MON_CAP_R)) {
+ dout(5) << __func__ << " " << op->get_req()->get_source_inst()
+ << " not enough caps for " << *(op->get_req()) << " -- dropping"
+ << dendl;
+ continue;
+ }
+
// if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
reply = true;
for (map<string, Subscription*>::iterator it = s->sub_map.begin();
it != s->sub_map.end(); ) {
if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
- Mutex::Locker l(session_map_lock);
+ std::lock_guard l(session_map_lock);
session_map.remove_sub((it++)->second);
} else {
++it;
}
{
- Mutex::Locker l(session_map_lock);
+ std::lock_guard l(session_map_lock);
session_map.add_update_sub(s, p->first, p->second.start,
p->second.flags & CEPH_SUBSCRIBE_ONETIME,
m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
if ((int)s->is_capable("mds", MON_CAP_R)) {
Subscription *sub = s->sub_map[p->first];
- assert(sub != nullptr);
+ ceph_assert(sub != nullptr);
mdsmon()->check_sub(sub);
}
} else if (p->first == "osdmap") {
}
} else if (p->first == "osd_pg_creates") {
if ((int)s->is_capable("osd", MON_CAP_W)) {
- if (monmap->get_required_features().contains_all(
- ceph::features::mon::FEATURE_LUMINOUS)) {
- osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
- } else {
- pgmon()->check_sub(s->sub_map["osd_pg_creates"]);
- }
+ osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
}
} else if (p->first == "monmap") {
monmon()->check_sub(s->sub_map[p->first]);
mgrmon()->check_sub(s->sub_map[p->first]);
} else if (p->first == "servicemap") {
mgrstatmon()->check_sub(s->sub_map[p->first]);
+ } else if (p->first == "config") {
+ configmon()->check_sub(s);
}
}
ConnectionRef con = m->get_connection();
if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
m->get_connection()->send_message(new MMonSubscribeAck(
- monmap->get_fsid(), (int)g_conf->mon_subscribe_interval));
+ monmap->get_fsid(), (int)g_conf()->mon_subscribe_interval));
}
}
PaxosService *svc = NULL;
MonSession *s = op->get_session();
- assert(s);
+ ceph_assert(s);
if (!is_leader() && !is_peon()) {
dout(10) << " waiting for quorum" << dendl;
if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
return false;
- MonSession *s = static_cast<MonSession *>(con->get_priv());
+ auto priv = con->get_priv();
+ auto s = static_cast<MonSession*>(priv.get());
if (!s)
return false;
// break any con <-> session ref cycle
- s->con->set_priv(NULL);
+ s->con->set_priv(nullptr);
if (is_shutdown())
return false;
- Mutex::Locker l(lock);
+ std::lock_guard l(lock);
- dout(10) << "reset/close on session " << s->inst << dendl;
- if (!s->closed) {
- Mutex::Locker l(session_map_lock);
+ dout(10) << "reset/close on session " << s->name << " " << s->addrs << dendl;
+ if (!s->closed && s->item.is_on_list()) {
+ std::lock_guard l(session_map_lock);
remove_session(s);
}
- s->put();
return true;
}
MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
bufferlist bl;
- ::encode(pending_metadata, bl);
+ encode(pending_metadata, bl);
t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
paxos->trigger_propose();
}
int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
if (r)
return r;
- bufferlist::iterator it = bl.begin();
- ::decode(mon_metadata, it);
+ auto it = bl.cbegin();
+ decode(mon_metadata, it);
pending_metadata = mon_metadata;
return 0;
int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
{
- assert(f);
+ ceph_assert(f);
if (!mon_metadata.count(mon)) {
err << "mon." << mon << " not found";
return -EINVAL;
int Monitor::print_nodes(Formatter *f, ostream& err)
{
- map<string, list<int> > mons; // hostname => mon
+ map<string, list<string> > mons; // hostname => mon
for (map<int, Metadata>::iterator it = mon_metadata.begin();
it != mon_metadata.end(); ++it) {
const Metadata& m = it->second;
// not likely though
continue;
}
- mons[hostname->second].push_back(it->first);
+ mons[hostname->second].push_back(monmap->get_name(it->first));
}
dump_services(f, mons, "mon");
int Monitor::scrub_start()
{
dout(10) << __func__ << dendl;
- assert(is_leader());
+ ceph_assert(is_leader());
if (!scrub_result.empty()) {
clog->info() << "scrub already in progress";
int Monitor::scrub()
{
- assert(is_leader());
- assert(scrub_state);
+ ceph_assert(is_leader());
+ ceph_assert(scrub_state);
scrub_cancel_timeout();
wait_for_paxos_write();
MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
num_keys);
r->key = scrub_state->last_key;
- messenger->send_message(r, monmap->get_inst(*p));
+ send_mon_message(r, *p);
}
// scrub my keys
scrub_reset_timeout();
if (quorum.size() == 1) {
- assert(scrub_state->finished == true);
+ ceph_assert(scrub_state->finished == true);
scrub_finish();
}
return 0;
scrub_reset_timeout();
int from = m->get_source().num();
- assert(scrub_result.count(from) == 0);
+ ceph_assert(scrub_result.count(from) == 0);
scrub_result[from] = m->result;
if (scrub_result.size() == quorum.size()) {
pair<string,string> *start,
int *num_keys)
{
- assert(r != NULL);
- assert(start != NULL);
- assert(num_keys != NULL);
+ ceph_assert(r != NULL);
+ ceph_assert(start != NULL);
+ ceph_assert(num_keys != NULL);
set<string> prefixes = get_sync_targets_names();
prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
bufferlist bl;
int err = store->get(k.first, k.second, bl);
- assert(err == 0);
+ ceph_assert(err == 0);
uint32_t key_crc = bl.crc32c(0);
dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
dout(15) << __func__ << " reset timeout event" << dendl;
scrub_cancel_timeout();
scrub_timeout_event = timer.add_event_after(
- g_conf->mon_scrub_timeout,
+ g_conf()->mon_scrub_timeout,
new C_MonContext(this, [this](int) {
scrub_timeout();
}));
/************ TICK ***************/
void Monitor::new_tick()
{
- timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) {
+ timer.add_event_after(g_conf()->mon_tick_interval, new C_MonContext(this, [this](int) {
tick();
}));
}
// Check if we need to emit any delayed health check updated messages
if (is_leader()) {
- const auto min_period = g_conf->get_val<int64_t>(
+ const auto min_period = g_conf().get_val<int64_t>(
"mon_health_log_update_period");
for (auto& svc : paxos_service) {
auto health = svc->get_health_checks();
}
- for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) {
- (*p)->tick();
- (*p)->maybe_trim();
+ for (auto& svc : paxos_service) {
+ svc->tick();
+ svc->maybe_trim();
}
// trim sessions
{
- Mutex::Locker l(session_map_lock);
+ std::lock_guard l(session_map_lock);
auto p = session_map.sessions.begin();
bool out_for_too_long = (!exited_quorum.is_zero() &&
- now > (exited_quorum + 2*g_conf->mon_lease));
+ now > (exited_quorum + 2*g_conf()->mon_lease));
while (!p.end()) {
MonSession *s = *p;
++p;
// don't trim monitors
- if (s->inst.name.is_mon())
+ if (s->name.is_mon())
continue;
if (s->session_timeout < now && s->con) {
// check keepalive, too
s->session_timeout = s->con->get_last_keepalive();
- s->session_timeout += g_conf->mon_session_timeout;
+ s->session_timeout += g_conf()->mon_session_timeout;
}
if (s->session_timeout < now) {
- dout(10) << " trimming session " << s->con << " " << s->inst
+ dout(10) << " trimming session " << s->con << " " << s->name
+ << " " << s->addrs
<< " (timeout " << s->session_timeout
<< " < now " << now << ")" << dendl;
} else if (out_for_too_long) {
// boot the client Session because we've taken too long getting back in
- dout(10) << " trimming session " << s->con << " " << s->inst
+ dout(10) << " trimming session " << s->con << " " << s->name
<< " because we've been out of quorum too long" << dendl;
} else {
continue;
paxos->trigger_propose();
}
+ mgr_client.update_daemon_health(get_health_metrics());
new_tick();
}
+vector<DaemonHealthMetric> Monitor::get_health_metrics()
+{
+ vector<DaemonHealthMetric> metrics;
+
+ utime_t oldest_secs;
+ const utime_t now = ceph_clock_now();
+ auto too_old = now;
+ too_old -= g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count();
+ int slow = 0;
+ TrackedOpRef oldest_op;
+ auto count_slow_ops = [&](TrackedOp& op) {
+ if (op.get_initiated() < too_old) {
+ slow++;
+ if (!oldest_op || op.get_initiated() < oldest_op->get_initiated()) {
+ oldest_op = &op;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ };
+ if (op_tracker.visit_ops_in_flight(&oldest_secs, count_slow_ops)) {
+ if (slow) {
+ derr << __func__ << " reporting " << slow << " slow ops, oldest is "
+ << oldest_op->get_desc() << dendl;
+ }
+ metrics.emplace_back(daemon_metric::SLOW_OPS, slow, oldest_secs);
+ } else {
+ metrics.emplace_back(daemon_metric::SLOW_OPS, 0, 0);
+ }
+ return metrics;
+}
+
void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
{
uuid_d nf;
dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
bufferlist bl;
- ::encode(nf, bl);
+ encode(nf, bl);
t->put(MONITOR_NAME, "cluster_fingerprint", bl);
}
int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
if (r == -ENOENT)
return r;
- assert(r == 0);
+ ceph_assert(r == 0);
string es(ebl.c_str(), ebl.length());
KeyRing keyring;
string keyring_filename;
- r = ceph_resolve_file_search(g_conf->keyring, keyring_filename);
+ r = ceph_resolve_file_search(g_conf()->keyring, keyring_filename);
if (r) {
- derr << "unable to find a keyring file on " << g_conf->keyring
+ derr << "unable to find a keyring file on " << g_conf()->keyring
<< ": " << cpp_strerror(r) << dendl;
- if (g_conf->key != "") {
- string keyring_plaintext = "[mon.]\n\tkey = " + g_conf->key +
+ if (g_conf()->key != "") {
+ string keyring_plaintext = "[mon.]\n\tkey = " + g_conf()->key +
"\n\tcaps mon = \"allow *\"\n";
bufferlist bl;
bl.append(keyring_plaintext);
try {
- bufferlist::iterator i = bl.begin();
+ auto i = bl.cbegin();
keyring.decode_plaintext(i);
}
catch (const buffer::error& e) {
} else {
r = keyring.load(g_ceph_context, keyring_filename);
if (r < 0) {
- derr << "unable to load initial keyring " << g_conf->keyring << dendl;
+ derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
return r;
}
}
int Monitor::write_default_keyring(bufferlist& bl)
{
ostringstream os;
- os << g_conf->mon_data << "/keyring";
+ os << g_conf()->mon_data << "/keyring";
int err = 0;
int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT|O_CLOEXEC, 0600);
}
}
-bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
- bool force_new)
+// AuthClient methods -- for mon <-> mon communication
+int Monitor::get_auth_request(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ uint32_t *method,
+ vector<uint32_t> *preferred_modes,
+ bufferlist *out)
+{
+ std::scoped_lock l(auth_lock);
+ if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
+ con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
+ return -EACCES;
+ }
+ AuthAuthorizer *auth;
+ if (!ms_get_authorizer(con->get_peer_type(), &auth)) {
+ return -EACCES;
+ }
+ auth_meta->authorizer.reset(auth);
+ auth_registry.get_supported_modes(con->get_peer_type(),
+ auth->protocol,
+ preferred_modes);
+ *method = auth->protocol;
+ *out = auth->bl;
+ return 0;
+}
+
+int Monitor::handle_auth_reply_more(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ const bufferlist& bl,
+ bufferlist *reply)
+{
+ std::scoped_lock l(auth_lock);
+ if (!auth_meta->authorizer) {
+ derr << __func__ << " no authorizer?" << dendl;
+ return -EACCES;
+ }
+ auth_meta->authorizer->add_challenge(cct, bl);
+ *reply = auth_meta->authorizer->bl;
+ return 0;
+}
+
+int Monitor::handle_auth_done(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ uint64_t global_id,
+ uint32_t con_mode,
+ const bufferlist& bl,
+ CryptoKey *session_key,
+ std::string *connection_secret)
+{
+ std::scoped_lock l(auth_lock);
+ // verify authorizer reply
+ auto p = bl.begin();
+ if (!auth_meta->authorizer->verify_reply(p, connection_secret)) {
+ dout(0) << __func__ << " failed verifying authorizer reply" << dendl;
+ return -EACCES;
+ }
+ auth_meta->session_key = auth_meta->authorizer->session_key;
+ return 0;
+}
+
+int Monitor::handle_auth_bad_method(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ uint32_t old_auth_method,
+ int result,
+ const std::vector<uint32_t>& allowed_methods,
+ const std::vector<uint32_t>& allowed_modes)
+{
+ derr << __func__ << " hmm, they didn't like " << old_auth_method
+ << " result " << cpp_strerror(result) << dendl;
+ return -EACCES;
+}
+
+bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer)
{
dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id)
<< dendl;
if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
// auth_none
dout(20) << __func__ << " building auth_none authorizer" << dendl;
- AuthNoneClientHandler handler(g_ceph_context, nullptr);
+ AuthNoneClientHandler handler{g_ceph_context};
handler.set_global_id(0);
*authorizer = handler.build_authorizer(service_id);
return true;
return false;
}
- ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info,
- secret, (uint64_t)-1);
+ ret = key_server.build_session_auth_info(
+ service_id, auth_ticket_info.ticket, info, secret, (uint64_t)-1);
if (ret < 0) {
dout(0) << __func__ << " failed to build mon session_auth_info "
<< cpp_strerror(ret) << dendl;
}
} else if (service_id == CEPH_ENTITY_TYPE_MGR) {
// mgr
- ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info);
+ ret = key_server.build_session_auth_info(
+ service_id, auth_ticket_info.ticket, info);
if (ret < 0) {
derr << __func__ << " failed to build mgr service session_auth_info "
<< cpp_strerror(ret) << dendl;
return false;
}
bufferlist ticket_data;
- ::encode(blob, ticket_data);
+ encode(blob, ticket_data);
- bufferlist::iterator iter = ticket_data.begin();
+ auto iter = ticket_data.cbegin();
CephXTicketHandler handler(g_ceph_context, service_id);
- ::decode(handler.ticket, iter);
+ decode(handler.ticket, iter);
handler.session_key = info.session_key;
return true;
}
-bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
- int protocol, bufferlist& authorizer_data,
- bufferlist& authorizer_reply,
- bool& isvalid, CryptoKey& session_key,
- std::unique_ptr<AuthAuthorizerChallenge> *challenge)
+KeyStore *Monitor::ms_get_auth1_authorizer_keystore()
{
- dout(10) << "ms_verify_authorizer " << con->get_peer_addr()
- << " " << ceph_entity_type_name(peer_type)
- << " protocol " << protocol << dendl;
+ return &keyring;
+}
- if (is_shutdown())
- return false;
+int Monitor::handle_auth_request(
+ Connection *con,
+ AuthConnectionMeta *auth_meta,
+ bool more,
+ uint32_t auth_method,
+ const bufferlist &payload,
+ bufferlist *reply)
+{
+ std::scoped_lock l(auth_lock);
- if (peer_type == CEPH_ENTITY_TYPE_MON &&
- auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
- // monitor, and cephx is enabled
- isvalid = false;
- if (protocol == CEPH_AUTH_CEPHX) {
- bufferlist::iterator iter = authorizer_data.begin();
- CephXServiceTicketInfo auth_ticket_info;
-
- if (authorizer_data.length()) {
- bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter,
- auth_ticket_info, challenge, authorizer_reply);
- if (ret) {
- session_key = auth_ticket_info.session_key;
- isvalid = true;
- } else {
- dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl;
- }
+ // NOTE: be careful, the Connection hasn't fully negotiated yet, so
+ // e.g., peer_features, peer_addrs, and others are still unknown.
+
+ dout(10) << __func__ << " con " << con << (more ? " (more)":" (start)")
+ << " method " << auth_method
+ << " payload " << payload.length()
+ << dendl;
+ if (!more) {
+ auth_meta->auth_mode = payload[0];
+ }
+
+ if (auth_meta->auth_mode >= AUTH_MODE_AUTHORIZER &&
+ auth_meta->auth_mode <= AUTH_MODE_AUTHORIZER_MAX) {
+ AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
+ auth_method);
+ if (!ah) {
+ lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
+ << auth_method << dendl;
+ return -EOPNOTSUPP;
+ }
+ bool was_challenge = (bool)auth_meta->authorizer_challenge;
+ bool isvalid = ah->verify_authorizer(
+ cct,
+ &keyring,
+ payload,
+ auth_meta->get_connection_secret_length(),
+ reply,
+ &con->peer_name,
+ &con->peer_global_id,
+ &con->peer_caps_info,
+ &auth_meta->session_key,
+ &auth_meta->connection_secret,
+ &auth_meta->authorizer_challenge);
+ if (isvalid) {
+ ms_handle_authentication(con);
+ return 1;
+ }
+ if (!more && !was_challenge && auth_meta->authorizer_challenge) {
+ return 0;
+ }
+ dout(10) << __func__ << " bad authorizer on " << con << dendl;
+ return -EACCES;
+ } else if (auth_meta->auth_mode < AUTH_MODE_MON &&
+ auth_meta->auth_mode > AUTH_MODE_MON_MAX) {
+ derr << __func__ << " unrecognized auth mode " << auth_meta->auth_mode
+ << dendl;
+ return -EACCES;
+ }
+
+ // wait until we've formed an initial quorum on mkfs so that we have
+ // the initial keys (e.g., client.admin).
+ if (authmon()->get_last_committed() == 0) {
+ dout(10) << __func__ << " haven't formed initial quorum, EBUSY" << dendl;
+ return -EBUSY;
+ }
+
+ RefCountedPtr priv;
+ MonSession *s;
+ int32_t r = 0;
+ auto p = payload.begin();
+ if (!more) {
+ if (con->get_priv()) {
+ return -EACCES; // wtf
+ }
+
+ // handler?
+ unique_ptr<AuthServiceHandler> auth_handler{get_auth_service_handler(
+ auth_method, g_ceph_context, &key_server)};
+ if (!auth_handler) {
+ dout(1) << __func__ << " auth_method " << auth_method << " not supported"
+ << dendl;
+ return -EOPNOTSUPP;
+ }
+
+ uint8_t mode;
+ EntityName entity_name;
+
+ try {
+ decode(mode, p);
+ if (mode < AUTH_MODE_MON ||
+ mode > AUTH_MODE_MON_MAX) {
+ dout(1) << __func__ << " invalid mode " << (int)mode << dendl;
+ return -EACCES;
+ }
+ assert(mode >= AUTH_MODE_MON && mode <= AUTH_MODE_MON_MAX);
+ decode(entity_name, p);
+ decode(con->peer_global_id, p);
+ } catch (buffer::error& e) {
+ dout(1) << __func__ << " failed to decode, " << e.what() << dendl;
+ return -EACCES;
+ }
+
+ // supported method?
+ if (entity_name.get_type() == CEPH_ENTITY_TYPE_MON ||
+ entity_name.get_type() == CEPH_ENTITY_TYPE_OSD ||
+ entity_name.get_type() == CEPH_ENTITY_TYPE_MDS ||
+ entity_name.get_type() == CEPH_ENTITY_TYPE_MGR) {
+ if (!auth_cluster_required.is_supported_auth(auth_method)) {
+ dout(10) << __func__ << " entity " << entity_name << " method "
+ << auth_method << " not among supported "
+ << auth_cluster_required.get_supported_set() << dendl;
+ return -EOPNOTSUPP;
}
} else {
- dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl;
+ if (!auth_service_required.is_supported_auth(auth_method)) {
+ dout(10) << __func__ << " entity " << entity_name << " method "
+ << auth_method << " not among supported "
+ << auth_cluster_required.get_supported_set() << dendl;
+ return -EOPNOTSUPP;
+ }
+ }
+
+ // for msgr1 we would do some weirdness here to ensure signatures
+ // are supported by the client if we require it. for msgr2 that
+ // is not necessary.
+
+ if (!con->peer_global_id) {
+ con->peer_global_id = authmon()->_assign_global_id();
+ if (!con->peer_global_id) {
+ dout(1) << __func__ << " failed to assign global_id" << dendl;
+ return -EBUSY;
+ }
+ dout(10) << __func__ << " assigned global_id " << con->peer_global_id
+ << dendl;
+ }
+
+ // set up partial session
+ s = new MonSession(con);
+ s->auth_handler = auth_handler.release();
+ con->set_priv(RefCountedPtr{s, false});
+
+ r = s->auth_handler->start_session(
+ entity_name,
+ auth_meta->get_connection_secret_length(),
+ reply,
+ &con->peer_caps_info,
+ &auth_meta->session_key,
+ &auth_meta->connection_secret);
+ } else {
+ priv = con->get_priv();
+ if (!priv) {
+ // this can happen if the async ms_handle_reset event races with
+ // the unlocked call into handle_auth_request
+ return -EACCES;
}
+ s = static_cast<MonSession*>(priv.get());
+ r = s->auth_handler->handle_request(
+ p,
+ auth_meta->get_connection_secret_length(),
+ reply,
+ &con->peer_global_id,
+ &con->peer_caps_info,
+ &auth_meta->session_key,
+ &auth_meta->connection_secret);
+ }
+ if (r > 0 &&
+ !s->authenticated) {
+ ms_handle_authentication(con);
+ }
+
+ dout(30) << " r " << r << " reply:\n";
+ reply->hexdump(*_dout);
+ *_dout << dendl;
+ return r;
+}
+
+void Monitor::ms_handle_accept(Connection *con)
+{
+ auto priv = con->get_priv();
+ MonSession *s = static_cast<MonSession*>(priv.get());
+ if (!s) {
+ // legacy protocol v1?
+ dout(10) << __func__ << " con " << con << " no session" << dendl;
+ return;
+ }
+
+ if (s->item.is_on_list()) {
+ dout(10) << __func__ << " con " << con << " session " << s
+ << " already on list" << dendl;
} else {
- // who cares.
- isvalid = true;
+ dout(10) << __func__ << " con " << con << " session " << s
+ << " registering session for "
+ << con->get_peer_addrs() << dendl;
+ s->_ident(entity_name_t(con->get_peer_type(), con->get_peer_id()),
+ con->get_peer_addrs());
+ std::lock_guard l(session_map_lock);
+ session_map.add_session(s);
}
- return true;
+}
+
+int Monitor::ms_handle_authentication(Connection *con)
+{
+ if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+ // mon <-> mon connections need no Session, and setting one up
+ // creates an awkward ref cycle between Session and Connection.
+ return 1;
+ }
+
+ auto priv = con->get_priv();
+ MonSession *s = static_cast<MonSession*>(priv.get());
+ if (!s) {
+ // must be msgr2, otherwise dispatch would have set up the session.
+ s = session_map.new_session(
+ entity_name_t(con->get_peer_type(), -1), // we don't know yet
+ con->get_peer_addrs(),
+ con);
+ assert(s);
+ dout(10) << __func__ << " adding session " << s << " to con " << con
+ << dendl;
+ con->set_priv(s);
+ logger->set(l_mon_num_sessions, session_map.get_size());
+ logger->inc(l_mon_session_add);
+ }
+ dout(10) << __func__ << " session " << s << " con " << con
+ << " addr " << s->con->get_peer_addr()
+ << " " << *s << dendl;
+
+ AuthCapsInfo &caps_info = con->get_peer_caps_info();
+ int ret = 0;
+ if (caps_info.allow_all) {
+ s->caps.set_allow_all();
+ s->authenticated = true;
+ ret = 1;
+ }
+ if (caps_info.caps.length()) {
+ bufferlist::const_iterator p = caps_info.caps.cbegin();
+ string str;
+ try {
+ decode(str, p);
+ } catch (const buffer::error &err) {
+ derr << __func__ << " corrupt cap data for " << con->get_peer_entity_name()
+ << " in auth db" << dendl;
+ str.clear();
+ ret = -EPERM;
+ }
+ if (ret >= 0) {
+ if (s->caps.parse(str, NULL)) {
+ s->authenticated = true;
+ ret = 1;
+ } else {
+ derr << __func__ << " unparseable caps '" << str << "' for "
+ << con->get_peer_entity_name() << dendl;
+ ret = -EPERM;
+ }
+ }
+ }
+
+ return ret;
}