#include "MonmapMonitor.h"
#include "Monitor.h"
+#include "OSDMonitor.h"
#include "messages/MMonCommand.h"
#include "messages/MMonJoin.h"
#include "common/config.h"
#include "common/cmdparse.h"
-#include "include/assert.h"
+#include "include/ceph_assert.h"
#include "include/stringify.h"
#define dout_subsys ceph_subsys_mon
#undef dout_prefix
#define dout_prefix _prefix(_dout, mon)
-static ostream& _prefix(std::ostream *_dout, Monitor *mon) {
- return *_dout << "mon." << mon->name << "@" << mon->rank
- << "(" << mon->get_state_name()
- << ").monmap v" << mon->monmap->epoch << " ";
+using namespace TOPNSPC::common;
+
+using std::cout;
+using std::dec;
+using std::hex;
+using std::list;
+using std::map;
+using std::make_pair;
+using std::ostream;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::setfill;
+using std::string;
+using std::stringstream;
+using std::to_string;
+using std::vector;
+using std::unique_ptr;
+
+using ceph::bufferlist;
+using ceph::decode;
+using ceph::encode;
+using ceph::Formatter;
+using ceph::JSONFormatter;
+using ceph::make_message;
+using ceph::mono_clock;
+using ceph::mono_time;
+using ceph::timespan_str;
+static ostream& _prefix(std::ostream *_dout, Monitor &mon) {
+ return *_dout << "mon." << mon.name << "@" << mon.rank
+ << "(" << mon.get_state_name()
+ << ").monmap v" << mon.monmap->epoch << " ";
}
void MonmapMonitor::create_initial()
{
dout(10) << __func__ << " using current monmap" << dendl;
- pending_map = *mon->monmap;
+ pending_map = *mon.monmap;
pending_map.epoch = 1;
- if (g_conf->mon_debug_no_initial_persistent_features) {
+ if (g_conf()->mon_debug_no_initial_persistent_features) {
derr << __func__ << " mon_debug_no_initial_persistent_features=true"
<< dendl;
} else {
// initialize with default persistent features for new clusters
pending_map.persistent_features = ceph::features::mon::get_persistent();
+ pending_map.min_mon_release = ceph_release();
}
}
void MonmapMonitor::update_from_paxos(bool *need_bootstrap)
{
version_t version = get_last_committed();
- if (version <= mon->monmap->get_epoch())
+ if (version <= mon.monmap->get_epoch())
return;
dout(10) << __func__ << " version " << version
- << ", my v " << mon->monmap->epoch << dendl;
+ << ", my v " << mon.monmap->epoch << dendl;
- if (need_bootstrap && version != mon->monmap->get_epoch()) {
+ if (need_bootstrap && version != mon.monmap->get_epoch()) {
dout(10) << " signaling that we need a bootstrap" << dendl;
*need_bootstrap = true;
}
// read and decode
monmap_bl.clear();
int ret = get_version(version, monmap_bl);
- assert(ret == 0);
- assert(monmap_bl.length());
+ ceph_assert(ret == 0);
+ ceph_assert(monmap_bl.length());
dout(10) << __func__ << " got " << version << dendl;
- mon->monmap->decode(monmap_bl);
+ mon.monmap->decode(monmap_bl);
- if (mon->store->exists("mkfs", "monmap")) {
+ if (mon.store->exists("mkfs", "monmap")) {
auto t(std::make_shared<MonitorDBStore::Transaction>());
t->erase("mkfs", "monmap");
- mon->store->apply_transaction(t);
+ mon.store->apply_transaction(t);
}
check_subs();
+
+ // make sure we've recorded min_mon_release
+ string val;
+ if (mon.store->read_meta("min_mon_release", &val) < 0 ||
+ val.size() == 0 ||
+ atoi(val.c_str()) != (int)ceph_release()) {
+ dout(10) << __func__ << " updating min_mon_release meta" << dendl;
+ mon.store->write_meta("min_mon_release",
+ stringify(ceph_release()));
+ }
+
+ mon.notify_new_monmap(true);
}
void MonmapMonitor::create_pending()
{
- pending_map = *mon->monmap;
+ pending_map = *mon.monmap;
pending_map.epoch++;
pending_map.last_changed = ceph_clock_now();
dout(10) << __func__ << " monmap epoch " << pending_map.epoch << dendl;
{
dout(10) << __func__ << " epoch " << pending_map.epoch << dendl;
- assert(mon->monmap->epoch + 1 == pending_map.epoch ||
+ ceph_assert(mon.monmap->epoch + 1 == pending_map.epoch ||
pending_map.epoch == 1); // special case mkfs!
bufferlist bl;
- pending_map.encode(bl, mon->get_quorum_con_features());
+ pending_map.encode(bl, mon.get_quorum_con_features());
put_version(t, pending_map.epoch, bl);
put_last_committed(t, pending_map.epoch);
// generate a cluster fingerprint, too?
if (pending_map.epoch == 1) {
- mon->prepare_new_fingerprint(t);
+ mon.prepare_new_fingerprint(t);
}
+
+ //health
+ health_check_map_t next;
+ pending_map.check_health(&next);
+ encode_health(next, t);
}
class C_ApplyFeatures : public Context {
MonmapMonitor *svc;
mon_feature_t features;
- public:
- C_ApplyFeatures(MonmapMonitor *s, const mon_feature_t& f) :
- svc(s), features(f) { }
+ ceph_release_t min_mon_release;
+public:
+ C_ApplyFeatures(MonmapMonitor *s, const mon_feature_t& f, ceph_release_t mmr) :
+ svc(s), features(f), min_mon_release(mmr) { }
void finish(int r) override {
if (r >= 0) {
- svc->apply_mon_features(features);
+ svc->apply_mon_features(features, min_mon_release);
} else if (r == -EAGAIN || r == -ECANCELED) {
// discard features if we're no longer on the quorum that
// established them in the first place.
return;
} else {
- assert(0 == "bad C_ApplyFeatures return value");
+ ceph_abort_msg("bad C_ApplyFeatures return value");
}
}
};
-void MonmapMonitor::apply_mon_features(const mon_feature_t& features)
+void MonmapMonitor::apply_mon_features(const mon_feature_t& features,
+ ceph_release_t min_mon_release)
{
if (!is_writeable()) {
dout(5) << __func__ << " wait for service to be writeable" << dendl;
- wait_for_writeable_ctx(new C_ApplyFeatures(this, features));
+ wait_for_writeable_ctx(new C_ApplyFeatures(this, features, min_mon_release));
+ return;
+ }
+
+ // do nothing here unless we have a full quorum
+ if (mon.get_quorum().size() < mon.monmap->size()) {
return;
}
- assert(is_writeable());
- assert(features.contains_all(pending_map.persistent_features));
+ ceph_assert(is_writeable());
+ ceph_assert(features.contains_all(pending_map.persistent_features));
// we should never hit this because `features` should be the result
// of the quorum's supported features. But if it happens, die.
- assert(ceph::features::mon::get_supported().contains_all(features));
+ ceph_assert(ceph::features::mon::get_supported().contains_all(features));
mon_feature_t new_features =
(pending_map.persistent_features ^
(features & ceph::features::mon::get_persistent()));
- if (new_features.empty()) {
- dout(10) << __func__ << " features match current pending: "
- << features << dendl;
+ if (new_features.empty() &&
+ pending_map.min_mon_release == min_mon_release) {
+ dout(10) << __func__ << " min_mon_release (" << (int)min_mon_release
+ << ") and features (" << features << ") match" << dendl;
return;
}
- if (mon->get_quorum().size() < mon->monmap->size()) {
- dout(1) << __func__ << " new features " << new_features
- << " contains features that require a full quorum"
- << " (quorum size is " << mon->get_quorum().size()
- << ", requires " << mon->monmap->size() << "): "
- << new_features
- << " -- do not enable them!" << dendl;
- return;
+ if (!new_features.empty()) {
+ dout(1) << __func__ << " applying new features "
+ << new_features << ", had " << pending_map.persistent_features
+ << ", will have "
+ << (new_features | pending_map.persistent_features)
+ << dendl;
+ pending_map.persistent_features |= new_features;
+ }
+ if (min_mon_release > pending_map.min_mon_release) {
+ dout(1) << __func__ << " increasing min_mon_release to "
+ << to_integer<int>(min_mon_release) << " (" << min_mon_release
+ << ")" << dendl;
+ pending_map.min_mon_release = min_mon_release;
}
- new_features |= pending_map.persistent_features;
-
- dout(5) << __func__ << " applying new features to monmap;"
- << " had " << pending_map.persistent_features
- << ", will have " << new_features << dendl;
- pending_map.persistent_features = new_features;
propose_pending();
}
void MonmapMonitor::on_active()
{
- if (get_last_committed() >= 1 && !mon->has_ever_joined) {
+ if (get_last_committed() >= 1 && !mon.has_ever_joined) {
// make note of the fact that i was, once, part of the quorum.
dout(10) << "noting that i was, once, part of an active quorum." << dendl;
*/
auto t(std::make_shared<MonitorDBStore::Transaction>());
t->put(Monitor::MONITOR_NAME, "joined", 1);
- mon->store->apply_transaction(t);
- mon->has_ever_joined = true;
+ mon.store->apply_transaction(t);
+ mon.has_ever_joined = true;
}
- if (mon->is_leader())
- mon->clog->info() << "monmap " << *mon->monmap;
+ if (mon.is_leader()) {
+ mon.clog->debug() << "monmap " << *mon.monmap;
+ }
- apply_mon_features(mon->get_quorum_mon_features());
+ apply_mon_features(mon.get_quorum_mon_features(),
+ mon.quorum_min_mon_release);
}
bool MonmapMonitor::preprocess_query(MonOpRequestRef op)
{
- PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
+ auto m = op->get_req<PaxosServiceMessage>();
switch (m->get_type()) {
// READs
case MSG_MON_COMMAND:
- return preprocess_command(op);
+ try {
+ return preprocess_command(op);
+ }
+ catch (const bad_cmd_get& e) {
+ bufferlist bl;
+ mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
+ return true;
+ }
case MSG_MON_JOIN:
return preprocess_join(op);
default:
f->dump_unsigned("monmap_first_committed", get_first_committed());
f->dump_unsigned("monmap_last_committed", get_last_committed());
f->open_object_section("monmap");
- mon->monmap->dump(f);
+ mon.monmap->dump(f);
f->close_section();
f->open_array_section("quorum");
- for (set<int>::iterator q = mon->get_quorum().begin(); q != mon->get_quorum().end(); ++q)
+ for (set<int>::iterator q = mon.get_quorum().begin(); q != mon.get_quorum().end(); ++q)
f->dump_int("mon", *q);
f->close_section();
}
bool MonmapMonitor::preprocess_command(MonOpRequestRef op)
{
- MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
+ auto m = op->get_req<MMonCommand>();
int r = -1;
bufferlist rdata;
stringstream ss;
- map<string, cmd_vartype> cmdmap;
+ cmdmap_t cmdmap;
if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
string rs = ss.str();
- mon->reply_command(op, -EINVAL, rs, rdata, get_last_committed());
+ mon.reply_command(op, -EINVAL, rs, rdata, get_last_committed());
return true;
}
string prefix;
- cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
+ cmd_getval(cmdmap, "prefix", prefix);
- MonSession *session = m->get_session();
+ MonSession *session = op->get_session();
if (!session) {
- mon->reply_command(op, -EACCES, "access denied", get_last_committed());
+ mon.reply_command(op, -EACCES, "access denied", get_last_committed());
return true;
}
string format;
- cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
+ cmd_getval(cmdmap, "format", format, string("plain"));
boost::scoped_ptr<Formatter> f(Formatter::create(format));
if (prefix == "mon stat") {
- mon->monmap->print_summary(ss);
- ss << ", election epoch " << mon->get_epoch() << ", leader "
- << mon->get_leader() << " " << mon->get_leader_name()
- << ", quorum " << mon->get_quorum() << " " << mon->get_quorum_names();
+ if (f) {
+ f->open_object_section("monmap");
+ mon.monmap->dump_summary(f.get());
+ f->dump_string("leader", mon.get_leader_name());
+ f->open_array_section("quorum");
+ for (auto rank: mon.get_quorum()) {
+ std::string name = mon.monmap->get_name(rank);
+ f->open_object_section("mon");
+ f->dump_int("rank", rank);
+ f->dump_string("name", name);
+ f->close_section(); // mon
+ }
+ f->close_section(); // quorum
+ f->close_section(); // monmap
+ f->flush(ss);
+ } else {
+ mon.monmap->print_summary(ss);
+ ss << ", election epoch " << mon.get_epoch() << ", leader "
+ << mon.get_leader() << " " << mon.get_leader_name()
+ << ", quorum " << mon.get_quorum()
+ << " " << mon.get_quorum_names();
+ }
+
rdata.append(ss);
ss.str("");
r = 0;
epoch_t epoch;
int64_t epochnum;
- cmd_getval(g_ceph_context, cmdmap, "epoch", epochnum, (int64_t)0);
+ cmd_getval(cmdmap, "epoch", epochnum, (int64_t)0);
epoch = epochnum;
- MonMap *p = mon->monmap;
+ MonMap *p = mon.monmap;
if (epoch) {
bufferlist bl;
r = get_version(epoch, bl);
ss << "there is no map for epoch " << epoch;
goto reply;
}
- assert(r == 0);
- assert(bl.length() > 0);
+ ceph_assert(r == 0);
+ ceph_assert(bl.length() > 0);
p = new MonMap;
p->decode(bl);
}
- assert(p != NULL);
+ ceph_assert(p);
if (prefix == "mon getmap") {
p->encode(rdata, m->get_connection()->get_features());
f->open_object_section("monmap");
p->dump(f.get());
f->open_array_section("quorum");
- for (set<int>::iterator q = mon->get_quorum().begin();
- q != mon->get_quorum().end(); ++q) {
+ for (set<int>::iterator q = mon.get_quorum().begin();
+ q != mon.get_quorum().end(); ++q) {
f->dump_int("mon", *q);
}
f->close_section();
rdata.append(ds);
ss << "dumped monmap epoch " << p->get_epoch();
}
- if (p != mon->monmap)
+ if (p != mon.monmap) {
delete p;
+ p = nullptr;
+ }
} else if (prefix == "mon feature ls") {
bool list_with_value = false;
string with_value;
- if (cmd_getval(g_ceph_context, cmdmap, "with_value", with_value) &&
+ if (cmd_getval(cmdmap, "with_value", with_value) &&
with_value == "--with-value") {
list_with_value = true;
}
- MonMap *p = mon->monmap;
+ MonMap *p = mon.monmap;
// list features
mon_feature_t supported = ceph::features::mon::get_supported();
string rs;
getline(ss, rs);
- mon->reply_command(op, r, rs, rdata, get_last_committed());
+ mon.reply_command(op, r, rs, rdata, get_last_committed());
return true;
} else
return false;
bool MonmapMonitor::prepare_update(MonOpRequestRef op)
{
- PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
+ auto m = op->get_req<PaxosServiceMessage>();
dout(7) << __func__ << " " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
case MSG_MON_COMMAND:
- return prepare_command(op);
+ try {
+ return prepare_command(op);
+ } catch (const bad_cmd_get& e) {
+ bufferlist bl;
+ mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
+ return true;
+ }
case MSG_MON_JOIN:
return prepare_join(op);
default:
bool MonmapMonitor::prepare_command(MonOpRequestRef op)
{
- MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
+ auto m = op->get_req<MMonCommand>();
stringstream ss;
string rs;
int err = -EINVAL;
- map<string, cmd_vartype> cmdmap;
+ cmdmap_t cmdmap;
if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
string rs = ss.str();
- mon->reply_command(op, -EINVAL, rs, get_last_committed());
+ mon.reply_command(op, -EINVAL, rs, get_last_committed());
return true;
}
string prefix;
- cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
+ cmd_getval(cmdmap, "prefix", prefix);
- MonSession *session = m->get_session();
+ MonSession *session = op->get_session();
if (!session) {
- mon->reply_command(op, -EACCES, "access denied", get_last_committed());
+ mon.reply_command(op, -EACCES, "access denied", get_last_committed());
return true;
}
* state, thus we are not bound by it.
*/
- assert(mon->monmap);
- MonMap &monmap = *mon->monmap;
+ ceph_assert(mon.monmap);
+ MonMap &monmap = *mon.monmap;
/* Please note:
bool propose = false;
if (prefix == "mon add") {
string name;
- cmd_getval(g_ceph_context, cmdmap, "name", name);
+ cmd_getval(cmdmap, "name", name);
string addrstr;
- cmd_getval(g_ceph_context, cmdmap, "addr", addrstr);
+ cmd_getval(cmdmap, "addr", addrstr);
entity_addr_t addr;
bufferlist rdata;
goto reply;
}
- if (addr.get_port() == 0) {
- ss << "port defaulted to " << CEPH_MON_PORT;
- addr.set_port(CEPH_MON_PORT);
+ vector<string> locationvec;
+ map<string, string> loc;
+ cmd_getval(cmdmap, "location", locationvec);
+ CrushWrapper::parse_loc_map(locationvec, &loc);
+ if (locationvec.size() &&
+ !mon.get_quorum_mon_features().contains_all(
+ ceph::features::mon::FEATURE_PINGING)) {
+ err = -ENOTSUP;
+ ss << "Not all monitors support adding monitors with a location; please upgrade first!";
+ goto reply;
+ }
+ if (locationvec.size() && !loc.size()) {
+ ss << "We could not parse your input location to anything real; " << locationvec
+ << " turned into an empty map!";
+ err = -EINVAL;
+ goto reply;
+ }
+
+ dout(10) << "mon add setting location for " << name << " to " << loc << dendl;
+
+ // TODO: validate location in crush map
+ if (monmap.stretch_mode_enabled && !loc.size()) {
+ ss << "We are in stretch mode and new monitors must have a location, but "
+ << "could not parse your input location to anything real; " << locationvec
+ << " turned into an empty map!";
+ err = -EINVAL;
+ goto reply;
+ }
+ // TODO: validate location against any existing stretch config
+
+ entity_addrvec_t addrs;
+ if (monmap.persistent_features.contains_all(
+ ceph::features::mon::FEATURE_NAUTILUS)) {
+ if (addr.get_port() == CEPH_MON_PORT_IANA) {
+ addr.set_type(entity_addr_t::TYPE_MSGR2);
+ }
+ if (addr.get_port() == CEPH_MON_PORT_LEGACY) {
+ // if they specified the *old* default they probably don't care
+ addr.set_port(0);
+ }
+ if (addr.get_port()) {
+ addrs.v.push_back(addr);
+ } else {
+ addr.set_type(entity_addr_t::TYPE_MSGR2);
+ addr.set_port(CEPH_MON_PORT_IANA);
+ addrs.v.push_back(addr);
+ addr.set_type(entity_addr_t::TYPE_LEGACY);
+ addr.set_port(CEPH_MON_PORT_LEGACY);
+ addrs.v.push_back(addr);
+ }
+ } else {
+ if (addr.get_port() == 0) {
+ addr.set_port(CEPH_MON_PORT_LEGACY);
+ }
+ addr.set_type(entity_addr_t::TYPE_LEGACY);
+ addrs.v.push_back(addr);
}
+ dout(20) << __func__ << " addr " << addr << " -> addrs " << addrs << dendl;
/**
* If we have a monitor with the same name and different addr, then EEXIST
do {
if (monmap.contains(name)) {
- if (monmap.get_addr(name) == addr) {
+ if (monmap.get_addrs(name) == addrs) {
// stable map contains monitor with the same name at the same address.
// serialize before current pending map.
err = 0; // for clarity; this has already been set above.
- ss << "mon." << name << " at " << addr << " already exists";
+ ss << "mon." << name << " at " << addrs << " already exists";
goto reply;
} else {
ss << "mon." << name
- << " already exists at address " << monmap.get_addr(name);
+ << " already exists at address " << monmap.get_addrs(name);
}
- } else if (monmap.contains(addr)) {
+ } else if (monmap.contains(addrs)) {
// we established on the previous branch that name is different
- ss << "mon." << monmap.get_name(addr)
+ ss << "mon." << monmap.get_name(addrs)
<< " already exists at address " << addr;
} else {
// go ahead and add
goto reply;
} while (false);
+ if (pending_map.stretch_mode_enabled) {
+
+ }
+
/* Given there's no delay between proposals on the MonmapMonitor (see
* MonmapMonitor::should_propose()), there is no point in checking for
* a mismatch between name and addr on pending_map.
* we can simply go ahead and add the monitor.
*/
- pending_map.add(name, addr);
+ pending_map.add(name, addrs);
+ pending_map.mon_info[name].crush_loc = loc;
pending_map.last_changed = ceph_clock_now();
- ss << "adding mon." << name << " at " << addr;
+ ss << "adding mon." << name << " at " << addrs;
propose = true;
dout(0) << __func__ << " proposing new mon." << name << dendl;
} else if (prefix == "mon remove" ||
prefix == "mon rm") {
string name;
- cmd_getval(g_ceph_context, cmdmap, "name", name);
+ cmd_getval(cmdmap, "name", name);
if (!monmap.contains(name)) {
err = 0;
ss << "mon." << name << " does not exist or has already been removed";
* introduced.
*/
- entity_addr_t addr = pending_map.get_addr(name);
+ entity_addrvec_t addrs = pending_map.get_addrs(name);
pending_map.remove(name);
pending_map.last_changed = ceph_clock_now();
- ss << "removing mon." << name << " at " << addr
+ ss << "removing mon." << name << " at " << addrs
<< ", there will be " << pending_map.size() << " monitors" ;
propose = true;
err = 0;
* 'mon flag set/unset'.
*/
string feature_name;
- if (!cmd_getval(g_ceph_context, cmdmap, "feature_name", feature_name)) {
+ if (!cmd_getval(cmdmap, "feature_name", feature_name)) {
ss << "missing required feature name";
err = -EINVAL;
goto reply;
goto reply;
}
- string sure;
- if (!cmd_getval(g_ceph_context, cmdmap, "sure", sure) ||
- sure != "--yes-i-really-mean-it") {
+ bool sure = false;
+ cmd_getval(cmdmap, "yes_i_really_mean_it", sure);
+ if (!sure) {
ss << "please specify '--yes-i-really-mean-it' if you "
<< "really, **really** want to set feature '"
<< feature << "' in the monmap.";
goto reply;
}
- if (!mon->get_quorum_mon_features().contains_all(feature)) {
+ if (!mon.get_quorum_mon_features().contains_all(feature)) {
ss << "current quorum does not support feature '" << feature
<< "'; supported features: "
- << mon->get_quorum_mon_features();
+ << mon.get_quorum_mon_features();
err = -EINVAL;
goto reply;
}
pending_map.last_changed = ceph_clock_now();
propose = true;
- dout(1) << __func__ << ss.str() << "; new features will be: "
+ dout(1) << __func__ << " " << ss.str() << "; new features will be: "
<< "persistent = " << pending_map.persistent_features
// output optional nevertheless, for auditing purposes.
<< ", optional = " << pending_map.optional_features << dendl;
-
+
+ } else if (prefix == "mon set-rank") {
+ string name;
+ int64_t rank;
+ if (!cmd_getval(cmdmap, "name", name) ||
+ !cmd_getval(cmdmap, "rank", rank)) {
+ err = -EINVAL;
+ goto reply;
+ }
+ int oldrank = pending_map.get_rank(name);
+ if (oldrank < 0) {
+ ss << "mon." << name << " does not exist in monmap";
+ err = -ENOENT;
+ goto reply;
+ }
+ err = 0;
+ pending_map.set_rank(name, rank);
+ pending_map.last_changed = ceph_clock_now();
+ propose = true;
+ } else if (prefix == "mon set-addrs") {
+ string name;
+ string addrs;
+ if (!cmd_getval(cmdmap, "name", name) ||
+ !cmd_getval(cmdmap, "addrs", addrs)) {
+ err = -EINVAL;
+ goto reply;
+ }
+ if (!pending_map.contains(name)) {
+ ss << "mon." << name << " does not exist";
+ err = -ENOENT;
+ goto reply;
+ }
+ entity_addrvec_t av;
+ if (!av.parse(addrs.c_str(), nullptr)) {
+ ss << "failed to parse addrs '" << addrs << "'";
+ err = -EINVAL;
+ goto reply;
+ }
+ for (auto& a : av.v) {
+ a.set_nonce(0);
+ if (!a.get_port()) {
+ ss << "monitor must bind to a non-zero port, not " << a;
+ err = -EINVAL;
+ goto reply;
+ }
+ }
+ err = 0;
+ pending_map.set_addrvec(name, av);
+ pending_map.last_changed = ceph_clock_now();
+ propose = true;
+ } else if (prefix == "mon set-weight") {
+ string name;
+ int64_t weight;
+ if (!cmd_getval(cmdmap, "name", name) ||
+ !cmd_getval(cmdmap, "weight", weight)) {
+ err = -EINVAL;
+ goto reply;
+ }
+ if (!pending_map.contains(name)) {
+ ss << "mon." << name << " does not exist";
+ err = -ENOENT;
+ goto reply;
+ }
+ err = 0;
+ pending_map.set_weight(name, weight);
+ pending_map.last_changed = ceph_clock_now();
+ propose = true;
+ } else if (prefix == "mon enable-msgr2") {
+ if (!monmap.get_required_features().contains_all(
+ ceph::features::mon::FEATURE_NAUTILUS)) {
+ err = -EACCES;
+ ss << "all monitors must be running nautilus to enable v2";
+ goto reply;
+ }
+ for (auto& i : pending_map.mon_info) {
+ if (i.second.public_addrs.v.size() == 1 &&
+ i.second.public_addrs.front().is_legacy() &&
+ i.second.public_addrs.front().get_port() == CEPH_MON_PORT_LEGACY) {
+ entity_addrvec_t av;
+ entity_addr_t a = i.second.public_addrs.front();
+ a.set_type(entity_addr_t::TYPE_MSGR2);
+ a.set_port(CEPH_MON_PORT_IANA);
+ av.v.push_back(a);
+ av.v.push_back(i.second.public_addrs.front());
+ dout(10) << " setting mon." << i.first
+ << " addrs " << i.second.public_addrs
+ << " -> " << av << dendl;
+ pending_map.set_addrvec(i.first, av);
+ propose = true;
+ pending_map.last_changed = ceph_clock_now();
+ }
+ }
+ err = 0;
+ } else if (prefix == "mon set election_strategy") {
+ if (!mon.get_quorum_mon_features().contains_all(
+ ceph::features::mon::FEATURE_PINGING)) {
+ err = -ENOTSUP;
+ ss << "Not all monitors support changing election strategies; please upgrade first!";
+ goto reply;
+ }
+ string strat;
+ MonMap::election_strategy strategy;
+ if (!cmd_getval(cmdmap, "strategy", strat)) {
+ err = -EINVAL;
+ goto reply;
+ }
+ if (strat == "classic") {
+ strategy = MonMap::CLASSIC;
+ } else if (strat == "disallow") {
+ strategy = MonMap::DISALLOW;
+ } else if (strat == "connectivity") {
+ strategy = MonMap::CONNECTIVITY;
+ } else {
+ err = -EINVAL;
+ goto reply;
+ }
+ err = 0;
+ pending_map.strategy = strategy;
+ propose = true;
+ } else if (prefix == "mon add disallowed_leader") {
+ if (!mon.get_quorum_mon_features().contains_all(
+ ceph::features::mon::FEATURE_PINGING)) {
+ err = -ENOTSUP;
+ ss << "Not all monitors support changing election strategies; please upgrade first!";
+ goto reply;
+ }
+ string name;
+ if (!cmd_getval(cmdmap, "name", name)) {
+ err = -EINVAL;
+ goto reply;
+ }
+ if (pending_map.strategy != MonMap::DISALLOW &&
+ pending_map.strategy != MonMap::CONNECTIVITY) {
+ ss << "You cannot disallow monitors in your current election mode";
+ err = -EINVAL;
+ goto reply;
+ }
+ if (!pending_map.contains(name)) {
+ ss << "mon." << name << " does not exist";
+ err = -ENOENT;
+ goto reply;
+ }
+ if (pending_map.disallowed_leaders.count(name)) {
+ ss << "mon." << name << " is already disallowed";
+ err = 0;
+ goto reply;
+ }
+ if (pending_map.disallowed_leaders.size() == pending_map.size() - 1) {
+ ss << "mon." << name << " is the only remaining allowed leader!";
+ err = -EINVAL;
+ goto reply;
+ }
+ pending_map.disallowed_leaders.insert(name);
+ err = 0;
+ propose = true;
+ } else if (prefix == "mon rm disallowed_leader") {
+ if (!mon.get_quorum_mon_features().contains_all(
+ ceph::features::mon::FEATURE_PINGING)) {
+ err = -ENOTSUP;
+ ss << "Not all monitors support changing election strategies; please upgrade first!";
+ goto reply;
+ }
+ string name;
+ if (!cmd_getval(cmdmap, "name", name)) {
+ err = -EINVAL;
+ goto reply;
+ }
+ if (pending_map.strategy != MonMap::DISALLOW &&
+ pending_map.strategy != MonMap::CONNECTIVITY) {
+ ss << "You cannot disallow monitors in your current election mode";
+ err = -EINVAL;
+ goto reply;
+ }
+ if (!pending_map.contains(name)) {
+ ss << "mon." << name << " does not exist";
+ err = -ENOENT;
+ goto reply;
+ }
+ if (!pending_map.disallowed_leaders.count(name)) {
+ ss << "mon." << name << " is already allowed";
+ err = 0;
+ goto reply;
+ }
+ pending_map.disallowed_leaders.erase(name);
+ err = 0;
+ propose = true;
+ } else if (prefix == "mon set_location") {
+ if (!mon.get_quorum_mon_features().contains_all(
+ ceph::features::mon::FEATURE_PINGING)) {
+ err = -ENOTSUP;
+ ss << "Not all monitors support monitor locations; please upgrade first!";
+ goto reply;
+ }
+ string name;
+ if (!cmd_getval(cmdmap, "name", name)) {
+ err = -EINVAL;
+ goto reply;
+ }
+ if (!pending_map.contains(name)) {
+ ss << "mon." << name << " does not exist";
+ err = -ENOENT;
+ goto reply;
+ }
+
+ if (!mon.osdmon()->is_readable()) {
+ mon.osdmon()->wait_for_readable(op, new Monitor::C_RetryMessage(&mon, op));
+ }
+ vector<string> argvec;
+ map<string, string> loc;
+ cmd_getval(cmdmap, "args", argvec);
+ CrushWrapper::parse_loc_map(argvec, &loc);
+
+ dout(10) << "mon set_location for " << name << " to " << loc << dendl;
+
+ // TODO: validate location in crush map
+ if (!loc.size()) {
+ ss << "We could not parse your input location to anything real; " << argvec
+ << " turned into an empty map!";
+ err = -EINVAL;
+ goto reply;
+ }
+ // TODO: validate location against any existing stretch config
+ pending_map.mon_info[name].crush_loc = loc;
+ err = 0;
+ propose = true;
+ } else if (prefix == "mon enable_stretch_mode") {
+ if (!mon.osdmon()->is_writeable()) {
+ dout(1) << __func__
+ << ": waiting for osdmon writeable for stretch mode" << dendl;
+ mon.osdmon()->wait_for_writeable(op, new Monitor::C_RetryMessage(&mon, op));
+ return false;
+ }
+ {
+ if (monmap.stretch_mode_enabled) {
+ ss << "stretch mode is already engaged";
+ err = -EINVAL;
+ goto reply;
+ }
+ if (pending_map.stretch_mode_enabled) {
+ ss << "stretch mode currently committing";
+ err = 0;
+ goto reply;
+ }
+ string tiebreaker_mon;
+ if (!cmd_getval(cmdmap, "tiebreaker_mon", tiebreaker_mon)) {
+ ss << "must specify a tiebreaker monitor";
+ err = -EINVAL;
+ goto reply;
+ }
+ string new_crush_rule;
+ if (!cmd_getval(cmdmap, "new_crush_rule", new_crush_rule)) {
+ ss << "must specify a new crush rule that spreads out copies over multiple sites";
+ err = -EINVAL;
+ goto reply;
+ }
+ string dividing_bucket;
+ if (!cmd_getval(cmdmap, "dividing_bucket", dividing_bucket)) {
+ ss << "must specify a dividing bucket";
+ err = -EINVAL;
+ goto reply;
+ }
+ //okay, initial arguments make sense, check pools and cluster state
+ err = mon.osdmon()->check_cluster_features(CEPH_FEATUREMASK_STRETCH_MODE, ss);
+ if (err)
+ goto reply;
+ struct Plugger {
+ Paxos &p;
+ Plugger(Paxos &p) : p(p) { p.plug(); }
+ ~Plugger() { p.unplug(); }
+ } plugger(paxos);
+
+ set<pg_pool_t*> pools;
+ bool okay = false;
+ int errcode = 0;
+
+ mon.osdmon()->try_enable_stretch_mode_pools(ss, &okay, &errcode,
+ &pools, new_crush_rule);
+ if (!okay) {
+ err = errcode;
+ goto reply;
+ }
+ try_enable_stretch_mode(ss, &okay, &errcode, false,
+ tiebreaker_mon, dividing_bucket);
+ if (!okay) {
+ err = errcode;
+ goto reply;
+ }
+ mon.osdmon()->try_enable_stretch_mode(ss, &okay, &errcode, false,
+ dividing_bucket, 2, pools, new_crush_rule);
+ if (!okay) {
+ err = errcode;
+ goto reply;
+ }
+ // everything looks good, actually commit the changes!
+ try_enable_stretch_mode(ss, &okay, &errcode, true,
+ tiebreaker_mon, dividing_bucket);
+ mon.osdmon()->try_enable_stretch_mode(ss, &okay, &errcode, true,
+ dividing_bucket,
+ 2, // right now we only support 2 sites
+ pools, new_crush_rule);
+ ceph_assert(okay == true);
+ }
+ request_proposal(mon.osdmon());
+ err = 0;
+ propose = true;
} else {
ss << "unknown command " << prefix;
err = -EINVAL;
reply:
getline(ss, rs);
- mon->reply_command(op, err, rs, get_last_committed());
+ mon.reply_command(op, err, rs, get_last_committed());
// we are returning to the user; do not propose.
return propose;
}
+void MonmapMonitor::try_enable_stretch_mode(stringstream& ss, bool *okay,
+ int *errcode, bool commit,
+ const string& tiebreaker_mon,
+ const string& dividing_bucket)
+{
+ dout(20) << __func__ << dendl;
+ *okay = false;
+ if (pending_map.strategy != MonMap::CONNECTIVITY) {
+ ss << "Monitors must use the connectivity strategy to enable stretch mode";
+ *errcode = -EINVAL;
+ ceph_assert(!commit);
+ return;
+ }
+ if (!pending_map.contains(tiebreaker_mon)) {
+ ss << "mon " << tiebreaker_mon << "does not seem to exist";
+ *errcode = -ENOENT;
+ ceph_assert(!commit);
+ return;
+ }
+ map<string,string> buckets;
+ for (const auto&mii : mon.monmap->mon_info) {
+ const auto& mi = mii.second;
+ const auto& bi = mi.crush_loc.find(dividing_bucket);
+ if (bi == mi.crush_loc.end()) {
+ ss << "Could not find location entry for " << dividing_bucket
+ << " on monitor " << mi.name;
+ *errcode = -EINVAL;
+ ceph_assert(!commit);
+ return;
+ }
+ buckets[mii.first] = bi->second;
+ }
+ string bucket1, bucket2, tiebreaker_bucket;
+ for (auto& i : buckets) {
+ if (i.first == tiebreaker_mon) {
+ tiebreaker_bucket = i.second;
+ continue;
+ }
+ if (bucket1.empty()) {
+ bucket1 = i.second;
+ }
+ if (bucket1 != i.second &&
+ bucket2.empty()) {
+ bucket2 = i.second;
+ }
+ if (bucket1 != i.second &&
+ bucket2 != i.second) {
+ ss << "There are too many monitor buckets for stretch mode, found "
+ << bucket1 << "," << bucket2 << "," << i.second;
+ *errcode = -EINVAL;
+ ceph_assert(!commit);
+ return;
+ }
+ }
+ if (bucket1.empty() || bucket2.empty()) {
+ ss << "There are not enough monitor buckets for stretch mode;"
+ << " must have at least 2 plus the tiebreaker but only found "
+ << (bucket1.empty() ? bucket1 : bucket2);
+ *errcode = -EINVAL;
+ ceph_assert(!commit);
+ return;
+ }
+ if (tiebreaker_bucket == bucket1 ||
+ tiebreaker_bucket == bucket2) {
+ ss << "The named tiebreaker monitor " << tiebreaker_mon
+ << " is in the same CRUSH bucket " << tiebreaker_bucket
+ << " as other monitors";
+ *errcode = -EINVAL;
+ ceph_assert(!commit);
+ return;
+ }
+ if (commit) {
+ pending_map.disallowed_leaders.insert(tiebreaker_mon);
+ pending_map.tiebreaker_mon = tiebreaker_mon;
+ pending_map.stretch_mode_enabled = true;
+ }
+ *okay = true;
+}
+
+void MonmapMonitor::trigger_degraded_stretch_mode(const set<string>& dead_mons)
+{
+ dout(20) << __func__ << dendl;
+ pending_map.stretch_marked_down_mons.insert(dead_mons.begin(), dead_mons.end());
+ propose_pending();
+}
+
+void MonmapMonitor::trigger_healthy_stretch_mode()
+{
+ dout(20) << __func__ << dendl;
+ pending_map.stretch_marked_down_mons.clear();
+ propose_pending();
+}
+
bool MonmapMonitor::preprocess_join(MonOpRequestRef op)
{
- MMonJoin *join = static_cast<MMonJoin*>(op->get_req());
- dout(10) << __func__ << " " << join->name << " at " << join->addr << dendl;
+ auto join = op->get_req<MMonJoin>();
+ dout(10) << __func__ << " " << join->name << " at " << join->addrs << dendl;
- MonSession *session = join->get_session();
+ MonSession *session = op->get_session();
if (!session ||
!session->is_capable("mon", MON_CAP_W | MON_CAP_X)) {
dout(10) << " insufficient caps" << dendl;
return true;
}
- if (pending_map.contains(join->name) && !pending_map.get_addr(join->name).is_blank_ip()) {
+ const auto name_info_i = pending_map.mon_info.find(join->name);
+ if (name_info_i != pending_map.mon_info.end() &&
+ !name_info_i->second.public_addrs.front().is_blank_ip() &&
+ (!join->force_loc || join->crush_loc == name_info_i->second.crush_loc)) {
dout(10) << " already have " << join->name << dendl;
return true;
}
- if (pending_map.contains(join->addr) && pending_map.get_name(join->addr) == join->name) {
- dout(10) << " already have " << join->addr << dendl;
+ string addr_name;
+ if (pending_map.contains(join->addrs)) {
+ addr_name = pending_map.get_name(join->addrs);
+ }
+ if (!addr_name.empty() &&
+ addr_name == join->name &&
+ (!join->force_loc || join->crush_loc.empty() ||
+ pending_map.mon_info[addr_name].crush_loc == join->crush_loc)) {
+ dout(10) << " already have " << join->addrs << dendl;
+ return true;
+ }
+ if (pending_map.stretch_mode_enabled &&
+ join->crush_loc.empty() &&
+ (addr_name.empty() ||
+ pending_map.mon_info[addr_name].crush_loc.empty())) {
+ dout(10) << "stretch mode engaged but no source of crush_loc" << dendl;
+ mon.clog->info() << join->name << " attempted to join from " << join->name
+ << ' ' << join->addrs
+ << "; but lacks a crush_location for stretch mode";
return true;
}
return false;
}
+
bool MonmapMonitor::prepare_join(MonOpRequestRef op)
{
- MMonJoin *join = static_cast<MMonJoin*>(op->get_req());
- dout(0) << "adding/updating " << join->name << " at " << join->addr << " to monitor cluster" << dendl;
+ auto join = op->get_req<MMonJoin>();
+ dout(0) << "adding/updating " << join->name
+ << " at " << join->addrs << " to monitor cluster" << dendl;
+ map<string,string> existing_loc;
+ if (pending_map.contains(join->addrs)) {
+ string name = pending_map.get_name(join->addrs);
+ existing_loc = pending_map.mon_info[name].crush_loc;
+ pending_map.remove(name);
+ }
if (pending_map.contains(join->name))
pending_map.remove(join->name);
- if (pending_map.contains(join->addr))
- pending_map.remove(pending_map.get_name(join->addr));
- pending_map.add(join->name, join->addr);
+ pending_map.add(join->name, join->addrs);
+ pending_map.mon_info[join->name].crush_loc =
+ ((join->force_loc || existing_loc.empty()) ?
+ join->crush_loc : existing_loc);
pending_map.last_changed = ceph_clock_now();
return true;
}
return true;
}
-void MonmapMonitor::get_health(list<pair<health_status_t, string> >& summary,
- list<pair<health_status_t, string> > *detail,
- CephContext *cct) const
-{
- int max = mon->monmap->size();
- int actual = mon->get_quorum().size();
- if (actual < max) {
- ostringstream ss;
- ss << (max-actual) << " mons down, quorum " << mon->get_quorum() << " " << mon->get_quorum_names();
- summary.push_back(make_pair(HEALTH_WARN, ss.str()));
- if (detail) {
- set<int> q = mon->get_quorum();
- for (int i=0; i<max; i++) {
- if (q.count(i) == 0) {
- ostringstream ss;
- ss << "mon." << mon->monmap->get_name(i) << " (rank " << i
- << ") addr " << mon->monmap->get_addr(i)
- << " is down (out of quorum)";
- detail->push_back(make_pair(HEALTH_WARN, ss.str()));
- }
- }
- }
- }
-}
-
int MonmapMonitor::get_monmap(bufferlist &bl)
{
version_t latest_ver = get_last_committed();
dout(10) << __func__ << " ver " << latest_ver << dendl;
- if (!mon->store->exists(get_service_name(), stringify(latest_ver)))
+ if (!mon.store->exists(get_service_name(), stringify(latest_ver)))
return -ENOENT;
int err = get_version(latest_ver, bl);
void MonmapMonitor::check_subs()
{
const string type = "monmap";
- mon->with_session_map([this, &type](const MonSessionMap& session_map) {
+ mon.with_session_map([this, &type](const MonSessionMap& session_map) {
auto subs = session_map.subs.find(type);
if (subs == session_map.subs.end())
return;
void MonmapMonitor::check_sub(Subscription *sub)
{
- const auto epoch = mon->monmap->get_epoch();
+ const auto epoch = mon.monmap->get_epoch();
dout(10) << __func__
<< " monmap next " << sub->next
<< " have " << epoch << dendl;
if (sub->next <= epoch) {
- mon->send_latest_monmap(sub->session->con.get());
+ mon.send_latest_monmap(sub->session->con.get());
if (sub->onetime) {
- mon->with_session_map([this, sub](MonSessionMap& session_map) {
+ mon.with_session_map([sub](MonSessionMap& session_map) {
session_map.remove_sub(sub);
});
} else {
}
}
}
+
+void MonmapMonitor::tick()
+{
+ if (!is_active() ||
+ !mon.is_leader()) {
+ return;
+ }
+
+ if (mon.monmap->created.is_zero()) {
+ dout(10) << __func__ << " detected empty created stamp" << dendl;
+ utime_t ctime;
+ for (version_t v = 1; v <= get_last_committed(); v++) {
+ bufferlist bl;
+ int r = get_version(v, bl);
+ if (r < 0) {
+ continue;
+ }
+ MonMap m;
+ auto p = bl.cbegin();
+ decode(m, p);
+ if (!m.last_changed.is_zero()) {
+ dout(10) << __func__ << " first monmap with last_changed is "
+ << v << " with " << m.last_changed << dendl;
+ ctime = m.last_changed;
+ break;
+ }
+ }
+ if (ctime.is_zero()) {
+ ctime = ceph_clock_now();
+ }
+ dout(10) << __func__ << " updating created stamp to " << ctime << dendl;
+ pending_map.created = ctime;
+ propose_pending();
+ }
+}