]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mon/MonmapMonitor.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / mon / MonmapMonitor.cc
index 7a0fb684de7e93cc1077b8d3354ed95d1e09594c..ec72f410d1fcf230384748abc1a367052a446799 100644 (file)
@@ -14,6 +14,7 @@
 
 #include "MonmapMonitor.h"
 #include "Monitor.h"
+#include "OSDMonitor.h"
 #include "messages/MMonCommand.h"
 #include "messages/MMonJoin.h"
 
 #include "common/config.h"
 #include "common/cmdparse.h"
 
-#include "include/assert.h"
+#include "include/ceph_assert.h"
 #include "include/stringify.h"
 
 #define dout_subsys ceph_subsys_mon
 #undef dout_prefix
 #define dout_prefix _prefix(_dout, mon)
-static ostream& _prefix(std::ostream *_dout, Monitor *mon) {
-  return *_dout << "mon." << mon->name << "@" << mon->rank
-               << "(" << mon->get_state_name()
-               << ").monmap v" << mon->monmap->epoch << " ";
+using namespace TOPNSPC::common;
+
+using std::cout;
+using std::dec;
+using std::hex;
+using std::list;
+using std::map;
+using std::make_pair;
+using std::ostream;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::setfill;
+using std::string;
+using std::stringstream;
+using std::to_string;
+using std::vector;
+using std::unique_ptr;
+
+using ceph::bufferlist;
+using ceph::decode;
+using ceph::encode;
+using ceph::Formatter;
+using ceph::JSONFormatter;
+using ceph::make_message;
+using ceph::mono_clock;
+using ceph::mono_time;
+using ceph::timespan_str;
+static ostream& _prefix(std::ostream *_dout, Monitor &mon) {
+  return *_dout << "mon." << mon.name << "@" << mon.rank
+               << "(" << mon.get_state_name()
+               << ").monmap v" << mon.monmap->epoch << " ";
 }
 
 void MonmapMonitor::create_initial()
 {
   dout(10) << __func__ << " using current monmap" << dendl;
-  pending_map = *mon->monmap;
+  pending_map = *mon.monmap;
   pending_map.epoch = 1;
 
-  if (g_conf->mon_debug_no_initial_persistent_features) {
+  if (g_conf()->mon_debug_no_initial_persistent_features) {
     derr << __func__ << " mon_debug_no_initial_persistent_features=true"
         << dendl;
   } else {
     // initialize with default persistent features for new clusters
     pending_map.persistent_features = ceph::features::mon::get_persistent();
+    pending_map.min_mon_release = ceph_release();
   }
 }
 
 void MonmapMonitor::update_from_paxos(bool *need_bootstrap)
 {
   version_t version = get_last_committed();
-  if (version <= mon->monmap->get_epoch())
+  if (version <= mon.monmap->get_epoch())
     return;
 
   dout(10) << __func__ << " version " << version
-          << ", my v " << mon->monmap->epoch << dendl;
+          << ", my v " << mon.monmap->epoch << dendl;
   
-  if (need_bootstrap && version != mon->monmap->get_epoch()) {
+  if (need_bootstrap && version != mon.monmap->get_epoch()) {
     dout(10) << " signaling that we need a bootstrap" << dendl;
     *need_bootstrap = true;
   }
@@ -67,24 +97,36 @@ void MonmapMonitor::update_from_paxos(bool *need_bootstrap)
   // read and decode
   monmap_bl.clear();
   int ret = get_version(version, monmap_bl);
-  assert(ret == 0);
-  assert(monmap_bl.length());
+  ceph_assert(ret == 0);
+  ceph_assert(monmap_bl.length());
 
   dout(10) << __func__ << " got " << version << dendl;
-  mon->monmap->decode(monmap_bl);
+  mon.monmap->decode(monmap_bl);
 
-  if (mon->store->exists("mkfs", "monmap")) {
+  if (mon.store->exists("mkfs", "monmap")) {
     auto t(std::make_shared<MonitorDBStore::Transaction>());
     t->erase("mkfs", "monmap");
-    mon->store->apply_transaction(t);
+    mon.store->apply_transaction(t);
   }
 
   check_subs();
+
+  // make sure we've recorded min_mon_release
+  string val;
+  if (mon.store->read_meta("min_mon_release", &val) < 0 ||
+      val.size() == 0 ||
+      atoi(val.c_str()) != (int)ceph_release()) {
+    dout(10) << __func__ << " updating min_mon_release meta" << dendl;
+    mon.store->write_meta("min_mon_release",
+                          stringify(ceph_release()));
+  }
+
+  mon.notify_new_monmap(true);
 }
 
 void MonmapMonitor::create_pending()
 {
-  pending_map = *mon->monmap;
+  pending_map = *mon.monmap;
   pending_map.epoch++;
   pending_map.last_changed = ceph_clock_now();
   dout(10) << __func__ << " monmap epoch " << pending_map.epoch << dendl;
@@ -94,85 +136,97 @@ void MonmapMonitor::encode_pending(MonitorDBStore::TransactionRef t)
 {
   dout(10) << __func__ << " epoch " << pending_map.epoch << dendl;
 
-  assert(mon->monmap->epoch + 1 == pending_map.epoch ||
+  ceph_assert(mon.monmap->epoch + 1 == pending_map.epoch ||
         pending_map.epoch == 1);  // special case mkfs!
   bufferlist bl;
-  pending_map.encode(bl, mon->get_quorum_con_features());
+  pending_map.encode(bl, mon.get_quorum_con_features());
 
   put_version(t, pending_map.epoch, bl);
   put_last_committed(t, pending_map.epoch);
 
   // generate a cluster fingerprint, too?
   if (pending_map.epoch == 1) {
-    mon->prepare_new_fingerprint(t);
+    mon.prepare_new_fingerprint(t);
   }
+
+  //health
+  health_check_map_t next;
+  pending_map.check_health(&next);
+  encode_health(next, t);
 }
 
 class C_ApplyFeatures : public Context {
   MonmapMonitor *svc;
   mon_feature_t features;
-  public:
-  C_ApplyFeatures(MonmapMonitor *s, const mon_feature_t& f) :
-    svc(s), features(f) { }
+  ceph_release_t min_mon_release;
+public:
+  C_ApplyFeatures(MonmapMonitor *s, const mon_feature_t& f, ceph_release_t mmr) :
+    svc(s), features(f), min_mon_release(mmr) { }
   void finish(int r) override {
     if (r >= 0) {
-      svc->apply_mon_features(features);
+      svc->apply_mon_features(features, min_mon_release);
     } else if (r == -EAGAIN || r == -ECANCELED) {
       // discard features if we're no longer on the quorum that
       // established them in the first place.
       return;
     } else {
-      assert(0 == "bad C_ApplyFeatures return value");
+      ceph_abort_msg("bad C_ApplyFeatures return value");
     }
   }
 };
 
-void MonmapMonitor::apply_mon_features(const mon_feature_t& features)
+void MonmapMonitor::apply_mon_features(const mon_feature_t& features,
+                                      ceph_release_t min_mon_release)
 {
   if (!is_writeable()) {
     dout(5) << __func__ << " wait for service to be writeable" << dendl;
-    wait_for_writeable_ctx(new C_ApplyFeatures(this, features));
+    wait_for_writeable_ctx(new C_ApplyFeatures(this, features, min_mon_release));
+    return;
+  }
+
+  // do nothing here unless we have a full quorum
+  if (mon.get_quorum().size() < mon.monmap->size()) {
     return;
   }
 
-  assert(is_writeable());
-  assert(features.contains_all(pending_map.persistent_features));
+  ceph_assert(is_writeable());
+  ceph_assert(features.contains_all(pending_map.persistent_features));
   // we should never hit this because `features` should be the result
   // of the quorum's supported features. But if it happens, die.
-  assert(ceph::features::mon::get_supported().contains_all(features));
+  ceph_assert(ceph::features::mon::get_supported().contains_all(features));
 
   mon_feature_t new_features =
     (pending_map.persistent_features ^
      (features & ceph::features::mon::get_persistent()));
 
-  if (new_features.empty()) {
-    dout(10) << __func__ << " features match current pending: "
-             << features << dendl;
+  if (new_features.empty() &&
+      pending_map.min_mon_release == min_mon_release) {
+    dout(10) << __func__ << " min_mon_release (" << (int)min_mon_release
+            << ") and features (" << features << ") match" << dendl;
     return;
   }
 
-  if (mon->get_quorum().size() < mon->monmap->size()) {
-    dout(1) << __func__ << " new features " << new_features
-      << " contains features that require a full quorum"
-      << " (quorum size is " << mon->get_quorum().size()
-      << ", requires " << mon->monmap->size() << "): "
-      << new_features
-      << " -- do not enable them!" << dendl;
-    return;
+  if (!new_features.empty()) {
+    dout(1) << __func__ << " applying new features "
+           << new_features << ", had " << pending_map.persistent_features
+           << ", will have "
+           << (new_features | pending_map.persistent_features)
+           << dendl;
+    pending_map.persistent_features |= new_features;
+  }
+  if (min_mon_release > pending_map.min_mon_release) {
+    dout(1) << __func__ << " increasing min_mon_release to "
+           << to_integer<int>(min_mon_release) << " (" << min_mon_release
+           << ")" << dendl;
+    pending_map.min_mon_release = min_mon_release;
   }
 
-  new_features |= pending_map.persistent_features;
-
-  dout(5) << __func__ << " applying new features to monmap;"
-          << " had " << pending_map.persistent_features
-          << ", will have " << new_features << dendl;
-  pending_map.persistent_features = new_features;
   propose_pending();
 }
 
 void MonmapMonitor::on_active()
 {
-  if (get_last_committed() >= 1 && !mon->has_ever_joined) {
+  if (get_last_committed() >= 1 && !mon.has_ever_joined) {
     // make note of the fact that i was, once, part of the quorum.
     dout(10) << "noting that i was, once, part of an active quorum." << dendl;
 
@@ -184,23 +238,32 @@ void MonmapMonitor::on_active()
      */
     auto t(std::make_shared<MonitorDBStore::Transaction>());
     t->put(Monitor::MONITOR_NAME, "joined", 1);
-    mon->store->apply_transaction(t);
-    mon->has_ever_joined = true;
+    mon.store->apply_transaction(t);
+    mon.has_ever_joined = true;
   }
 
-  if (mon->is_leader())
-    mon->clog->info() << "monmap " << *mon->monmap;
+  if (mon.is_leader()) {
+    mon.clog->debug() << "monmap " << *mon.monmap;
+  }
 
-  apply_mon_features(mon->get_quorum_mon_features());
+  apply_mon_features(mon.get_quorum_mon_features(),
+                    mon.quorum_min_mon_release);
 }
 
 bool MonmapMonitor::preprocess_query(MonOpRequestRef op)
 {
-  PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
+  auto m = op->get_req<PaxosServiceMessage>();
   switch (m->get_type()) {
     // READs
   case MSG_MON_COMMAND:
-    return preprocess_command(op);
+    try {
+      return preprocess_command(op);
+    }
+    catch (const bad_cmd_get& e) {
+      bufferlist bl;
+      mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
+      return true;
+    }
   case MSG_MON_JOIN:
     return preprocess_join(op);
   default:
@@ -214,46 +277,65 @@ void MonmapMonitor::dump_info(Formatter *f)
   f->dump_unsigned("monmap_first_committed", get_first_committed());
   f->dump_unsigned("monmap_last_committed", get_last_committed());
   f->open_object_section("monmap");
-  mon->monmap->dump(f);
+  mon.monmap->dump(f);
   f->close_section();
   f->open_array_section("quorum");
-  for (set<int>::iterator q = mon->get_quorum().begin(); q != mon->get_quorum().end(); ++q)
+  for (set<int>::iterator q = mon.get_quorum().begin(); q != mon.get_quorum().end(); ++q)
     f->dump_int("mon", *q);
   f->close_section();
 }
 
 bool MonmapMonitor::preprocess_command(MonOpRequestRef op)
 {
-  MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
+  auto m = op->get_req<MMonCommand>();
   int r = -1;
   bufferlist rdata;
   stringstream ss;
 
-  map<string, cmd_vartype> cmdmap;
+  cmdmap_t cmdmap;
   if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
     string rs = ss.str();
-    mon->reply_command(op, -EINVAL, rs, rdata, get_last_committed());
+    mon.reply_command(op, -EINVAL, rs, rdata, get_last_committed());
     return true;
   }
 
   string prefix;
-  cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
+  cmd_getval(cmdmap, "prefix", prefix);
 
-  MonSession *session = m->get_session();
+  MonSession *session = op->get_session();
   if (!session) {
-    mon->reply_command(op, -EACCES, "access denied", get_last_committed());
+    mon.reply_command(op, -EACCES, "access denied", get_last_committed());
     return true;
   }
 
   string format;
-  cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
+  cmd_getval(cmdmap, "format", format, string("plain"));
   boost::scoped_ptr<Formatter> f(Formatter::create(format));
 
   if (prefix == "mon stat") {
-    mon->monmap->print_summary(ss);
-    ss << ", election epoch " << mon->get_epoch() << ", leader "
-       << mon->get_leader() << " " << mon->get_leader_name()
-       << ", quorum " << mon->get_quorum() << " " << mon->get_quorum_names();
+    if (f) {
+      f->open_object_section("monmap");
+      mon.monmap->dump_summary(f.get());
+      f->dump_string("leader", mon.get_leader_name());
+      f->open_array_section("quorum");
+      for (auto rank: mon.get_quorum()) {
+        std::string name = mon.monmap->get_name(rank);
+        f->open_object_section("mon");
+        f->dump_int("rank", rank);
+        f->dump_string("name", name);
+        f->close_section();  // mon
+      }
+      f->close_section();  // quorum
+      f->close_section();  // monmap
+      f->flush(ss);
+    } else {
+      mon.monmap->print_summary(ss);
+      ss << ", election epoch " << mon.get_epoch() << ", leader "
+         << mon.get_leader() << " " << mon.get_leader_name()
+         << ", quorum " << mon.get_quorum()
+         << " " << mon.get_quorum_names();
+    }
+
     rdata.append(ss);
     ss.str("");
     r = 0;
@@ -263,10 +345,10 @@ bool MonmapMonitor::preprocess_command(MonOpRequestRef op)
 
     epoch_t epoch;
     int64_t epochnum;
-    cmd_getval(g_ceph_context, cmdmap, "epoch", epochnum, (int64_t)0);
+    cmd_getval(cmdmap, "epoch", epochnum, (int64_t)0);
     epoch = epochnum;
 
-    MonMap *p = mon->monmap;
+    MonMap *p = mon.monmap;
     if (epoch) {
       bufferlist bl;
       r = get_version(epoch, bl);
@@ -274,13 +356,13 @@ bool MonmapMonitor::preprocess_command(MonOpRequestRef op)
         ss << "there is no map for epoch " << epoch;
         goto reply;
       }
-      assert(r == 0);
-      assert(bl.length() > 0);
+      ceph_assert(r == 0);
+      ceph_assert(bl.length() > 0);
       p = new MonMap;
       p->decode(bl);
     }
 
-    assert(p != NULL);
+    ceph_assert(p);
 
     if (prefix == "mon getmap") {
       p->encode(rdata, m->get_connection()->get_features());
@@ -292,8 +374,8 @@ bool MonmapMonitor::preprocess_command(MonOpRequestRef op)
         f->open_object_section("monmap");
         p->dump(f.get());
         f->open_array_section("quorum");
-        for (set<int>::iterator q = mon->get_quorum().begin();
-            q != mon->get_quorum().end(); ++q) {
+        for (set<int>::iterator q = mon.get_quorum().begin();
+            q != mon.get_quorum().end(); ++q) {
           f->dump_int("mon", *q);
         }
         f->close_section();
@@ -307,19 +389,21 @@ bool MonmapMonitor::preprocess_command(MonOpRequestRef op)
       rdata.append(ds);
       ss << "dumped monmap epoch " << p->get_epoch();
     }
-    if (p != mon->monmap)
+    if (p != mon.monmap) {
        delete p;
+       p = nullptr;
+    }
 
   } else if (prefix == "mon feature ls") {
    
     bool list_with_value = false;
     string with_value;
-    if (cmd_getval(g_ceph_context, cmdmap, "with_value", with_value) &&
+    if (cmd_getval(cmdmap, "with_value", with_value) &&
         with_value == "--with-value") {
       list_with_value = true;
     }
 
-    MonMap *p = mon->monmap;
+    MonMap *p = mon.monmap;
 
     // list features
     mon_feature_t supported = ceph::features::mon::get_supported();
@@ -388,7 +472,7 @@ reply:
     string rs;
     getline(ss, rs);
 
-    mon->reply_command(op, r, rs, rdata, get_last_committed());
+    mon.reply_command(op, r, rs, rdata, get_last_committed());
     return true;
   } else
     return false;
@@ -397,12 +481,18 @@ reply:
 
 bool MonmapMonitor::prepare_update(MonOpRequestRef op)
 {
-  PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
+  auto m = op->get_req<PaxosServiceMessage>();
   dout(7) << __func__ << " " << *m << " from " << m->get_orig_source_inst() << dendl;
   
   switch (m->get_type()) {
   case MSG_MON_COMMAND:
-    return prepare_command(op);
+    try {
+      return prepare_command(op);
+    } catch (const bad_cmd_get& e) {
+      bufferlist bl;
+      mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
+      return true;
+    }
   case MSG_MON_JOIN:
     return prepare_join(op);
   default:
@@ -414,24 +504,24 @@ bool MonmapMonitor::prepare_update(MonOpRequestRef op)
 
 bool MonmapMonitor::prepare_command(MonOpRequestRef op)
 {
-  MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
+  auto m = op->get_req<MMonCommand>();
   stringstream ss;
   string rs;
   int err = -EINVAL;
 
-  map<string, cmd_vartype> cmdmap;
+  cmdmap_t cmdmap;
   if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
     string rs = ss.str();
-    mon->reply_command(op, -EINVAL, rs, get_last_committed());
+    mon.reply_command(op, -EINVAL, rs, get_last_committed());
     return true;
   }
 
   string prefix;
-  cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
+  cmd_getval(cmdmap, "prefix", prefix);
 
-  MonSession *session = m->get_session();
+  MonSession *session = op->get_session();
   if (!session) {
-    mon->reply_command(op, -EACCES, "access denied", get_last_committed());
+    mon.reply_command(op, -EACCES, "access denied", get_last_committed());
     return true;
   }
 
@@ -463,8 +553,8 @@ bool MonmapMonitor::prepare_command(MonOpRequestRef op)
    * state, thus we are not bound by it.
    */
 
-  assert(mon->monmap);
-  MonMap &monmap = *mon->monmap;
+  ceph_assert(mon.monmap);
+  MonMap &monmap = *mon.monmap;
 
 
   /* Please note:
@@ -488,9 +578,9 @@ bool MonmapMonitor::prepare_command(MonOpRequestRef op)
   bool propose = false;
   if (prefix == "mon add") {
     string name;
-    cmd_getval(g_ceph_context, cmdmap, "name", name);
+    cmd_getval(cmdmap, "name", name);
     string addrstr;
-    cmd_getval(g_ceph_context, cmdmap, "addr", addrstr);
+    cmd_getval(cmdmap, "addr", addrstr);
     entity_addr_t addr;
     bufferlist rdata;
 
@@ -500,10 +590,64 @@ bool MonmapMonitor::prepare_command(MonOpRequestRef op)
       goto reply;
     }
 
-    if (addr.get_port() == 0) {
-      ss << "port defaulted to " << CEPH_MON_PORT;
-      addr.set_port(CEPH_MON_PORT);
+    vector<string> locationvec;
+    map<string, string> loc;
+    cmd_getval(cmdmap, "location", locationvec);
+    CrushWrapper::parse_loc_map(locationvec, &loc);
+    if (locationvec.size() &&
+       !mon.get_quorum_mon_features().contains_all(
+                                       ceph::features::mon::FEATURE_PINGING)) {
+      err = -ENOTSUP;
+      ss << "Not all monitors support adding monitors with a location; please upgrade first!";
+      goto reply;
+    }
+    if (locationvec.size() && !loc.size()) {
+      ss << "We could not parse your input location to anything real; " << locationvec
+        << " turned into an empty map!";
+      err = -EINVAL;
+      goto reply;
+    }
+
+    dout(10) << "mon add setting location for " << name << " to " << loc << dendl;
+
+    // TODO: validate location in crush map
+    if (monmap.stretch_mode_enabled && !loc.size()) {
+      ss << "We are in stretch mode and new monitors must have a location, but "
+        << "could not parse your input location to anything real; " << locationvec
+        << " turned into an empty map!";
+      err = -EINVAL;
+      goto reply;
+    }
+    // TODO: validate location against any existing stretch config
+
+    entity_addrvec_t addrs;
+    if (monmap.persistent_features.contains_all(
+         ceph::features::mon::FEATURE_NAUTILUS)) {
+      if (addr.get_port() == CEPH_MON_PORT_IANA) {
+       addr.set_type(entity_addr_t::TYPE_MSGR2);
+      }
+      if (addr.get_port() == CEPH_MON_PORT_LEGACY) {
+       // if they specified the *old* default they probably don't care
+       addr.set_port(0);
+      }
+      if (addr.get_port()) {
+       addrs.v.push_back(addr);
+      } else {
+       addr.set_type(entity_addr_t::TYPE_MSGR2);
+       addr.set_port(CEPH_MON_PORT_IANA);
+       addrs.v.push_back(addr);
+       addr.set_type(entity_addr_t::TYPE_LEGACY);
+       addr.set_port(CEPH_MON_PORT_LEGACY);
+       addrs.v.push_back(addr);
+      }
+    } else {
+      if (addr.get_port() == 0) {
+       addr.set_port(CEPH_MON_PORT_LEGACY);
+      }
+      addr.set_type(entity_addr_t::TYPE_LEGACY);
+      addrs.v.push_back(addr);
     }
+    dout(20) << __func__ << " addr " << addr << " -> addrs " << addrs << dendl;
 
     /**
      * If we have a monitor with the same name and different addr, then EEXIST
@@ -519,19 +663,19 @@ bool MonmapMonitor::prepare_command(MonOpRequestRef op)
 
     do {
       if (monmap.contains(name)) {
-        if (monmap.get_addr(name) == addr) {
+        if (monmap.get_addrs(name) == addrs) {
           // stable map contains monitor with the same name at the same address.
           // serialize before current pending map.
           err = 0; // for clarity; this has already been set above.
-          ss << "mon." << name << " at " << addr << " already exists";
+          ss << "mon." << name << " at " << addrs << " already exists";
           goto reply;
         } else {
           ss << "mon." << name
-             << " already exists at address " << monmap.get_addr(name);
+             << " already exists at address " << monmap.get_addrs(name);
         }
-      } else if (monmap.contains(addr)) {
+      } else if (monmap.contains(addrs)) {
         // we established on the previous branch that name is different
-        ss << "mon." << monmap.get_name(addr)
+        ss << "mon." << monmap.get_name(addrs)
            << " already exists at address " << addr;
       } else {
         // go ahead and add
@@ -541,6 +685,10 @@ bool MonmapMonitor::prepare_command(MonOpRequestRef op)
       goto reply;
     } while (false);
 
+    if (pending_map.stretch_mode_enabled) {
+      
+    }
+    
     /* Given there's no delay between proposals on the MonmapMonitor (see
      * MonmapMonitor::should_propose()), there is no point in checking for
      * a mismatch between name and addr on pending_map.
@@ -549,16 +697,17 @@ bool MonmapMonitor::prepare_command(MonOpRequestRef op)
      * we can simply go ahead and add the monitor.
      */
 
-    pending_map.add(name, addr);
+    pending_map.add(name, addrs);
+    pending_map.mon_info[name].crush_loc = loc;
     pending_map.last_changed = ceph_clock_now();
-    ss << "adding mon." << name << " at " << addr;
+    ss << "adding mon." << name << " at " << addrs;
     propose = true;
     dout(0) << __func__ << " proposing new mon." << name << dendl;
 
   } else if (prefix == "mon remove" ||
              prefix == "mon rm") {
     string name;
-    cmd_getval(g_ceph_context, cmdmap, "name", name);
+    cmd_getval(cmdmap, "name", name);
     if (!monmap.contains(name)) {
       err = 0;
       ss << "mon." << name << " does not exist or has already been removed";
@@ -600,10 +749,10 @@ bool MonmapMonitor::prepare_command(MonOpRequestRef op)
      * introduced.
      */
 
-    entity_addr_t addr = pending_map.get_addr(name);
+    entity_addrvec_t addrs = pending_map.get_addrs(name);
     pending_map.remove(name);
     pending_map.last_changed = ceph_clock_now();
-    ss << "removing mon." << name << " at " << addr
+    ss << "removing mon." << name << " at " << addrs
        << ", there will be " << pending_map.size() << " monitors" ;
     propose = true;
     err = 0;
@@ -624,7 +773,7 @@ bool MonmapMonitor::prepare_command(MonOpRequestRef op)
      * 'mon flag set/unset'.
      */
     string feature_name;
-    if (!cmd_getval(g_ceph_context, cmdmap, "feature_name", feature_name)) {
+    if (!cmd_getval(cmdmap, "feature_name", feature_name)) {
       ss << "missing required feature name";
       err = -EINVAL;
       goto reply;
@@ -638,9 +787,9 @@ bool MonmapMonitor::prepare_command(MonOpRequestRef op)
       goto reply;
     }
 
-    string sure;
-    if (!cmd_getval(g_ceph_context, cmdmap, "sure", sure) ||
-        sure != "--yes-i-really-mean-it") {
+    bool sure = false;
+    cmd_getval(cmdmap, "yes_i_really_mean_it", sure);
+    if (!sure) {
       ss << "please specify '--yes-i-really-mean-it' if you "
          << "really, **really** want to set feature '"
          << feature << "' in the monmap.";
@@ -648,10 +797,10 @@ bool MonmapMonitor::prepare_command(MonOpRequestRef op)
       goto reply;
     }
 
-    if (!mon->get_quorum_mon_features().contains_all(feature)) {
+    if (!mon.get_quorum_mon_features().contains_all(feature)) {
       ss << "current quorum does not support feature '" << feature
          << "'; supported features: "
-         << mon->get_quorum_mon_features();
+         << mon.get_quorum_mon_features();
       err = -EINVAL;
       goto reply;
     }
@@ -669,11 +818,315 @@ bool MonmapMonitor::prepare_command(MonOpRequestRef op)
     pending_map.last_changed = ceph_clock_now();
     propose = true;
 
-    dout(1) << __func__ << ss.str() << "; new features will be: "
+    dout(1) << __func__ << " " << ss.str() << "; new features will be: "
             << "persistent = " << pending_map.persistent_features
             // output optional nevertheless, for auditing purposes.
             << ", optional = " << pending_map.optional_features << dendl;
-    
+
+  } else if (prefix == "mon set-rank") {
+    string name;
+    int64_t rank;
+    if (!cmd_getval(cmdmap, "name", name) ||
+       !cmd_getval(cmdmap, "rank", rank)) {
+      err = -EINVAL;
+      goto reply;
+    }
+    int oldrank = pending_map.get_rank(name);
+    if (oldrank < 0) {
+      ss << "mon." << name << " does not exist in monmap";
+      err = -ENOENT;
+      goto reply;
+    }
+    err = 0;
+    pending_map.set_rank(name, rank);
+    pending_map.last_changed = ceph_clock_now();
+    propose = true;
+  } else if (prefix == "mon set-addrs") {
+    string name;
+    string addrs;
+    if (!cmd_getval(cmdmap, "name", name) ||
+       !cmd_getval(cmdmap, "addrs", addrs)) {
+      err = -EINVAL;
+      goto reply;
+    }
+    if (!pending_map.contains(name)) {
+      ss << "mon." << name << " does not exist";
+      err = -ENOENT;
+      goto reply;
+    }
+    entity_addrvec_t av;
+    if (!av.parse(addrs.c_str(), nullptr)) {
+      ss << "failed to parse addrs '" << addrs << "'";
+      err = -EINVAL;
+      goto reply;
+    }
+    for (auto& a : av.v) {
+      a.set_nonce(0);
+      if (!a.get_port()) {
+       ss << "monitor must bind to a non-zero port, not " << a;
+       err = -EINVAL;
+       goto reply;
+      }
+    }
+    err = 0;
+    pending_map.set_addrvec(name, av);
+    pending_map.last_changed = ceph_clock_now();
+    propose = true;
+  } else if (prefix == "mon set-weight") {
+    string name;
+    int64_t weight;
+    if (!cmd_getval(cmdmap, "name", name) ||
+        !cmd_getval(cmdmap, "weight", weight)) {
+      err = -EINVAL;
+      goto reply;
+    }
+    if (!pending_map.contains(name)) {
+      ss << "mon." << name << " does not exist";
+      err = -ENOENT;
+      goto reply;
+    }
+    err = 0;
+    pending_map.set_weight(name, weight);
+    pending_map.last_changed = ceph_clock_now();
+    propose = true;
+  } else if (prefix == "mon enable-msgr2") {
+    if (!monmap.get_required_features().contains_all(
+         ceph::features::mon::FEATURE_NAUTILUS)) {
+      err = -EACCES;
+      ss << "all monitors must be running nautilus to enable v2";
+      goto reply;
+    }
+    for (auto& i : pending_map.mon_info) {
+      if (i.second.public_addrs.v.size() == 1 &&
+         i.second.public_addrs.front().is_legacy() &&
+         i.second.public_addrs.front().get_port() == CEPH_MON_PORT_LEGACY) {
+       entity_addrvec_t av;
+       entity_addr_t a = i.second.public_addrs.front();
+       a.set_type(entity_addr_t::TYPE_MSGR2);
+       a.set_port(CEPH_MON_PORT_IANA);
+       av.v.push_back(a);
+       av.v.push_back(i.second.public_addrs.front());
+       dout(10) << " setting mon." << i.first
+                << " addrs " << i.second.public_addrs
+                << " -> " << av << dendl;
+       pending_map.set_addrvec(i.first, av);
+       propose = true;
+       pending_map.last_changed = ceph_clock_now();
+      }
+    }
+    err = 0;
+  } else if (prefix == "mon set election_strategy") {
+    if (!mon.get_quorum_mon_features().contains_all(
+                                       ceph::features::mon::FEATURE_PINGING)) {
+      err = -ENOTSUP;
+      ss << "Not all monitors support changing election strategies; please upgrade first!";
+      goto reply;
+    }
+    string strat;
+    MonMap::election_strategy strategy;
+    if (!cmd_getval(cmdmap, "strategy", strat)) {
+      err = -EINVAL;
+      goto reply;
+    }
+    if (strat == "classic") {
+      strategy = MonMap::CLASSIC;
+    } else if (strat == "disallow") {
+      strategy = MonMap::DISALLOW;
+    } else if (strat == "connectivity") {
+      strategy = MonMap::CONNECTIVITY;
+    } else {
+      err = -EINVAL;
+      goto reply;
+    }
+    err = 0;
+    pending_map.strategy = strategy;
+    propose = true;
+  } else if (prefix == "mon add disallowed_leader") {
+    if (!mon.get_quorum_mon_features().contains_all(
+                                       ceph::features::mon::FEATURE_PINGING)) {
+      err = -ENOTSUP;
+      ss << "Not all monitors support changing election strategies; please upgrade first!";
+      goto reply;
+    }
+    string name;
+    if (!cmd_getval(cmdmap, "name", name)) {
+      err = -EINVAL;
+      goto reply;
+    }
+    if (pending_map.strategy != MonMap::DISALLOW &&
+       pending_map.strategy != MonMap::CONNECTIVITY) {
+      ss << "You cannot disallow monitors in your current election mode";
+      err = -EINVAL;
+      goto reply;
+    }
+    if (!pending_map.contains(name)) {
+      ss << "mon." << name << " does not exist";
+      err = -ENOENT;
+      goto reply;
+    }
+    if (pending_map.disallowed_leaders.count(name)) {
+      ss << "mon." << name << " is already disallowed";
+      err = 0;
+      goto reply;
+    }
+    if (pending_map.disallowed_leaders.size() == pending_map.size() - 1) {
+      ss << "mon." << name << " is the only remaining allowed leader!";
+      err = -EINVAL;
+      goto reply;
+    }
+    pending_map.disallowed_leaders.insert(name);
+    err = 0;
+    propose = true;
+  } else if (prefix == "mon rm disallowed_leader") {
+    if (!mon.get_quorum_mon_features().contains_all(
+                                       ceph::features::mon::FEATURE_PINGING)) {
+      err = -ENOTSUP;
+      ss << "Not all monitors support changing election strategies; please upgrade first!";
+      goto reply;
+    }
+    string name;
+    if (!cmd_getval(cmdmap, "name", name)) {
+      err = -EINVAL;
+      goto reply;
+    }
+    if (pending_map.strategy != MonMap::DISALLOW &&
+       pending_map.strategy != MonMap::CONNECTIVITY) {
+      ss << "You cannot disallow monitors in your current election mode";
+      err = -EINVAL;
+      goto reply;
+    }
+    if (!pending_map.contains(name)) {
+      ss << "mon." << name << " does not exist";
+      err = -ENOENT;
+      goto reply;
+    }
+    if (!pending_map.disallowed_leaders.count(name)) {
+      ss << "mon." << name << " is already allowed";
+      err = 0;
+      goto reply;
+    }
+    pending_map.disallowed_leaders.erase(name);
+    err = 0;
+    propose = true;
+  } else if (prefix == "mon set_location") {
+    if (!mon.get_quorum_mon_features().contains_all(
+                                       ceph::features::mon::FEATURE_PINGING)) {
+      err = -ENOTSUP;
+      ss << "Not all monitors support monitor locations; please upgrade first!";
+      goto reply;
+    }
+    string name;
+    if (!cmd_getval(cmdmap, "name", name)) {
+      err = -EINVAL;
+      goto reply;
+    }
+    if (!pending_map.contains(name)) {
+      ss << "mon." << name << " does not exist";
+      err = -ENOENT;
+      goto reply;
+    }
+
+    if (!mon.osdmon()->is_readable()) {
+      mon.osdmon()->wait_for_readable(op, new Monitor::C_RetryMessage(&mon, op));
+    }
+    vector<string> argvec;
+    map<string, string> loc;
+    cmd_getval(cmdmap, "args", argvec);
+    CrushWrapper::parse_loc_map(argvec, &loc);
+
+    dout(10) << "mon set_location for " << name << " to " << loc << dendl;
+
+    // TODO: validate location in crush map
+    if (!loc.size()) {
+      ss << "We could not parse your input location to anything real; " << argvec
+        << " turned into an empty map!";
+      err = -EINVAL;
+      goto reply;
+    }
+    // TODO: validate location against any existing stretch config
+    pending_map.mon_info[name].crush_loc = loc;
+    err = 0;
+    propose = true;
+  } else if (prefix == "mon enable_stretch_mode") {
+    if (!mon.osdmon()->is_writeable()) {
+      dout(1) << __func__
+             << ":  waiting for osdmon writeable for stretch mode" << dendl;
+      mon.osdmon()->wait_for_writeable(op, new Monitor::C_RetryMessage(&mon, op));
+      return false;
+    }
+    {
+      if (monmap.stretch_mode_enabled) {
+       ss << "stretch mode is already engaged";
+       err = -EINVAL;
+       goto reply;
+      }
+      if (pending_map.stretch_mode_enabled) {
+       ss << "stretch mode currently committing";
+       err = 0;
+       goto reply;
+      }
+      string tiebreaker_mon;
+      if (!cmd_getval(cmdmap, "tiebreaker_mon", tiebreaker_mon)) {
+       ss << "must specify a tiebreaker monitor";
+       err = -EINVAL;
+       goto reply;
+      }
+      string new_crush_rule;
+      if (!cmd_getval(cmdmap, "new_crush_rule", new_crush_rule)) {
+       ss << "must specify a new crush rule that spreads out copies over multiple sites";
+       err = -EINVAL;
+       goto reply;
+      }
+      string dividing_bucket;
+      if (!cmd_getval(cmdmap, "dividing_bucket", dividing_bucket)) {
+       ss << "must specify a dividing bucket";
+       err = -EINVAL;
+       goto reply;
+      }
+      //okay, initial arguments make sense, check pools and cluster state
+      err = mon.osdmon()->check_cluster_features(CEPH_FEATUREMASK_STRETCH_MODE, ss);
+      if (err)
+       goto reply;
+      struct Plugger {
+       Paxos &p;
+       Plugger(Paxos &p) : p(p) { p.plug(); }
+       ~Plugger() { p.unplug(); }
+      } plugger(paxos);
+
+      set<pg_pool_t*> pools;
+      bool okay = false;
+      int errcode = 0;
+
+      mon.osdmon()->try_enable_stretch_mode_pools(ss, &okay, &errcode,
+                                                  &pools, new_crush_rule);
+      if (!okay) {
+       err = errcode;
+       goto reply;
+      }
+      try_enable_stretch_mode(ss, &okay, &errcode, false,
+                             tiebreaker_mon, dividing_bucket);
+      if (!okay) {
+       err = errcode;
+       goto reply;
+      }
+      mon.osdmon()->try_enable_stretch_mode(ss, &okay, &errcode, false,
+                                            dividing_bucket, 2, pools, new_crush_rule);
+      if (!okay) {
+       err = errcode;
+       goto reply;
+      }
+      // everything looks good, actually commit the changes!
+      try_enable_stretch_mode(ss, &okay, &errcode, true,
+                             tiebreaker_mon, dividing_bucket);
+      mon.osdmon()->try_enable_stretch_mode(ss, &okay, &errcode, true,
+                                            dividing_bucket,
+                                            2, // right now we only support 2 sites
+                                            pools, new_crush_rule);
+      ceph_assert(okay == true);
+    }
+    request_proposal(mon.osdmon());
+    err = 0;
+    propose = true;
   } else {
     ss << "unknown command " << prefix;
     err = -EINVAL;
@@ -681,42 +1134,164 @@ bool MonmapMonitor::prepare_command(MonOpRequestRef op)
 
 reply:
   getline(ss, rs);
-  mon->reply_command(op, err, rs, get_last_committed());
+  mon.reply_command(op, err, rs, get_last_committed());
   // we are returning to the user; do not propose.
   return propose;
 }
 
+void MonmapMonitor::try_enable_stretch_mode(stringstream& ss, bool *okay,
+                                           int *errcode, bool commit,
+                                           const string& tiebreaker_mon,
+                                           const string& dividing_bucket)
+{
+  dout(20) << __func__ << dendl;
+  *okay = false;
+  if (pending_map.strategy != MonMap::CONNECTIVITY) {
+    ss << "Monitors must use the connectivity strategy to enable stretch mode";
+    *errcode = -EINVAL;
+    ceph_assert(!commit);
+    return;
+  }
+  if (!pending_map.contains(tiebreaker_mon)) {
+    ss << "mon " << tiebreaker_mon << "does not seem to exist";
+    *errcode = -ENOENT;
+    ceph_assert(!commit);
+    return;
+  }
+  map<string,string> buckets;
+  for (const auto&mii : mon.monmap->mon_info) {
+    const auto& mi = mii.second;
+    const auto& bi = mi.crush_loc.find(dividing_bucket);
+    if (bi == mi.crush_loc.end()) {
+      ss << "Could not find location entry for " << dividing_bucket
+        << " on monitor " << mi.name;
+      *errcode = -EINVAL;
+      ceph_assert(!commit);
+      return;
+    }
+    buckets[mii.first] = bi->second;
+  }
+  string bucket1, bucket2, tiebreaker_bucket;
+  for (auto& i : buckets) {
+    if (i.first == tiebreaker_mon) {
+      tiebreaker_bucket = i.second;
+      continue;
+    }
+    if (bucket1.empty()) {
+      bucket1 = i.second;
+    }
+    if (bucket1 != i.second &&
+       bucket2.empty()) {
+      bucket2 = i.second;
+    }
+    if (bucket1 != i.second &&
+       bucket2 != i.second) {
+      ss << "There are too many monitor buckets for stretch mode, found "
+        << bucket1 << "," << bucket2 << "," << i.second;
+      *errcode = -EINVAL;
+      ceph_assert(!commit);
+      return;
+    }
+  }
+  if (bucket1.empty() || bucket2.empty()) {
+    ss << "There are not enough monitor buckets for stretch mode;"
+       << " must have at least 2 plus the tiebreaker but only found "
+       << (bucket1.empty() ? bucket1 : bucket2);
+    *errcode = -EINVAL;
+    ceph_assert(!commit);
+    return;
+  }
+  if (tiebreaker_bucket == bucket1 ||
+      tiebreaker_bucket == bucket2) {
+    ss << "The named tiebreaker monitor " << tiebreaker_mon
+       << " is in the same CRUSH bucket " << tiebreaker_bucket
+       << " as other monitors";
+    *errcode = -EINVAL;
+    ceph_assert(!commit);
+    return;
+  }
+  if (commit) {
+    pending_map.disallowed_leaders.insert(tiebreaker_mon);
+    pending_map.tiebreaker_mon = tiebreaker_mon;
+    pending_map.stretch_mode_enabled = true;
+  }
+  *okay = true;
+}
+
+void MonmapMonitor::trigger_degraded_stretch_mode(const set<string>& dead_mons)
+{
+  dout(20) << __func__ << dendl;
+  pending_map.stretch_marked_down_mons.insert(dead_mons.begin(), dead_mons.end());
+  propose_pending();
+}
+
+void MonmapMonitor::trigger_healthy_stretch_mode()
+{
+  dout(20) << __func__ << dendl;
+  pending_map.stretch_marked_down_mons.clear();
+  propose_pending();
+}
+
 bool MonmapMonitor::preprocess_join(MonOpRequestRef op)
 {
-  MMonJoin *join = static_cast<MMonJoin*>(op->get_req());
-  dout(10) << __func__ << " " << join->name << " at " << join->addr << dendl;
+  auto join = op->get_req<MMonJoin>();
+  dout(10) << __func__ << " " << join->name << " at " << join->addrs << dendl;
 
-  MonSession *session = join->get_session();
+  MonSession *session = op->get_session();
   if (!session ||
       !session->is_capable("mon", MON_CAP_W | MON_CAP_X)) {
     dout(10) << " insufficient caps" << dendl;
     return true;
   }
 
-  if (pending_map.contains(join->name) && !pending_map.get_addr(join->name).is_blank_ip()) {
+  const auto name_info_i = pending_map.mon_info.find(join->name);
+  if (name_info_i != pending_map.mon_info.end() &&
+      !name_info_i->second.public_addrs.front().is_blank_ip() &&
+      (!join->force_loc || join->crush_loc == name_info_i->second.crush_loc)) {
     dout(10) << " already have " << join->name << dendl;
     return true;
   }
-  if (pending_map.contains(join->addr) && pending_map.get_name(join->addr) == join->name) {
-    dout(10) << " already have " << join->addr << dendl;
+  string addr_name;
+  if (pending_map.contains(join->addrs)) {
+    addr_name = pending_map.get_name(join->addrs);
+  }
+  if (!addr_name.empty() &&
+      addr_name == join->name &&
+      (!join->force_loc || join->crush_loc.empty() ||
+       pending_map.mon_info[addr_name].crush_loc == join->crush_loc)) {
+    dout(10) << " already have " << join->addrs << dendl;
+    return true;
+  }
+  if (pending_map.stretch_mode_enabled &&
+      join->crush_loc.empty() &&
+      (addr_name.empty() ||
+       pending_map.mon_info[addr_name].crush_loc.empty())) {
+    dout(10) << "stretch mode engaged but no source of crush_loc" << dendl;
+    mon.clog->info() << join->name << " attempted to join from " << join->name
+                     << ' ' << join->addrs
+                     << "; but lacks a crush_location for stretch mode";
     return true;
   }
   return false;
 }
+
 bool MonmapMonitor::prepare_join(MonOpRequestRef op)
 {
-  MMonJoin *join = static_cast<MMonJoin*>(op->get_req());
-  dout(0) << "adding/updating " << join->name << " at " << join->addr << " to monitor cluster" << dendl;
+  auto join = op->get_req<MMonJoin>();
+  dout(0) << "adding/updating " << join->name
+         << " at " << join->addrs << " to monitor cluster" << dendl;
+  map<string,string> existing_loc;
+  if (pending_map.contains(join->addrs)) {
+    string name = pending_map.get_name(join->addrs);
+    existing_loc = pending_map.mon_info[name].crush_loc;
+    pending_map.remove(name);
+  }
   if (pending_map.contains(join->name))
     pending_map.remove(join->name);
-  if (pending_map.contains(join->addr))
-    pending_map.remove(pending_map.get_name(join->addr));
-  pending_map.add(join->name, join->addr);
+  pending_map.add(join->name, join->addrs);
+  pending_map.mon_info[join->name].crush_loc =
+    ((join->force_loc || existing_loc.empty()) ?
+     join->crush_loc : existing_loc);
   pending_map.last_changed = ceph_clock_now();
   return true;
 }
@@ -727,37 +1302,12 @@ bool MonmapMonitor::should_propose(double& delay)
   return true;
 }
 
-void MonmapMonitor::get_health(list<pair<health_status_t, string> >& summary,
-                              list<pair<health_status_t, string> > *detail,
-                              CephContext *cct) const
-{
-  int max = mon->monmap->size();
-  int actual = mon->get_quorum().size();
-  if (actual < max) {
-    ostringstream ss;
-    ss << (max-actual) << " mons down, quorum " << mon->get_quorum() << " " << mon->get_quorum_names();
-    summary.push_back(make_pair(HEALTH_WARN, ss.str()));
-    if (detail) {
-      set<int> q = mon->get_quorum();
-      for (int i=0; i<max; i++) {
-       if (q.count(i) == 0) {
-         ostringstream ss;
-         ss << "mon." << mon->monmap->get_name(i) << " (rank " << i
-            << ") addr " << mon->monmap->get_addr(i)
-            << " is down (out of quorum)";
-         detail->push_back(make_pair(HEALTH_WARN, ss.str()));
-       }
-      }
-    }
-  }
-}
-
 int MonmapMonitor::get_monmap(bufferlist &bl)
 {
   version_t latest_ver = get_last_committed();
   dout(10) << __func__ << " ver " << latest_ver << dendl;
 
-  if (!mon->store->exists(get_service_name(), stringify(latest_ver)))
+  if (!mon.store->exists(get_service_name(), stringify(latest_ver)))
     return -ENOENT;
 
   int err = get_version(latest_ver, bl);
@@ -772,7 +1322,7 @@ int MonmapMonitor::get_monmap(bufferlist &bl)
 void MonmapMonitor::check_subs()
 {
   const string type = "monmap";
-  mon->with_session_map([this, &type](const MonSessionMap& session_map) {
+  mon.with_session_map([this, &type](const MonSessionMap& session_map) {
       auto subs = session_map.subs.find(type);
       if (subs == session_map.subs.end())
        return;
@@ -784,14 +1334,14 @@ void MonmapMonitor::check_subs()
 
 void MonmapMonitor::check_sub(Subscription *sub)
 {
-  const auto epoch = mon->monmap->get_epoch();
+  const auto epoch = mon.monmap->get_epoch();
   dout(10) << __func__
           << " monmap next " << sub->next
           << " have " << epoch << dendl;
   if (sub->next <= epoch) {
-    mon->send_latest_monmap(sub->session->con.get());
+    mon.send_latest_monmap(sub->session->con.get());
     if (sub->onetime) {
-      mon->with_session_map([this, sub](MonSessionMap& session_map) {
+      mon.with_session_map([sub](MonSessionMap& session_map) {
          session_map.remove_sub(sub);
        });
     } else {
@@ -799,3 +1349,38 @@ void MonmapMonitor::check_sub(Subscription *sub)
     }
   }
 }
+
+void MonmapMonitor::tick()
+{
+  if (!is_active() ||
+      !mon.is_leader()) {
+    return;
+  }
+
+  if (mon.monmap->created.is_zero()) {
+    dout(10) << __func__ << " detected empty created stamp" << dendl;
+    utime_t ctime;
+    for (version_t v = 1; v <= get_last_committed(); v++) {
+      bufferlist bl;
+      int r = get_version(v, bl);
+      if (r < 0) {
+       continue;
+      }
+      MonMap m;
+      auto p = bl.cbegin();
+      decode(m, p);
+      if (!m.last_changed.is_zero()) {
+       dout(10) << __func__ << " first monmap with last_changed is "
+                << v << " with " << m.last_changed << dendl;
+       ctime = m.last_changed;
+       break;
+      }
+    }
+    if (ctime.is_zero()) {
+      ctime = ceph_clock_now();
+    }
+    dout(10) << __func__ << " updating created stamp to " << ctime << dendl;
+    pending_map.created = ctime;
+    propose_pending();
+  }
+}