]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/osd/osd.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / osd / osd.cc
index 521cb9ba3bb180b30902811a2d4b849a31852f8b..8f5a7f9d473e320cd5c4fe712b28d342481f9709 100644 (file)
 #include "messages/MOSDMap.h"
 #include "messages/MOSDMarkMeDown.h"
 #include "messages/MOSDOp.h"
-#include "messages/MOSDPGLog.h"
-#include "messages/MOSDPGPull.h"
-#include "messages/MOSDPGPush.h"
-#include "messages/MOSDPGPushReply.h"
-#include "messages/MOSDPGRecoveryDelete.h"
-#include "messages/MOSDPGRecoveryDeleteReply.h"
+#include "messages/MOSDPeeringOp.h"
 #include "messages/MOSDRepOpReply.h"
 #include "messages/MOSDScrub2.h"
 #include "messages/MPGStats.h"
@@ -39,6 +34,7 @@
 
 #include "crimson/admin/osd_admin.h"
 #include "crimson/admin/pg_commands.h"
+#include "crimson/common/buffer_io.h"
 #include "crimson/common/exception.h"
 #include "crimson/mon/MonClient.h"
 #include "crimson/net/Connection.h"
@@ -64,12 +60,20 @@ namespace {
   static constexpr int TICK_INTERVAL = 1;
 }
 
+using std::make_unique;
+using std::map;
+using std::pair;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
 using crimson::common::local_conf;
 using crimson::os::FuturizedStore;
 
 namespace crimson::osd {
 
 OSD::OSD(int id, uint32_t nonce,
+         crimson::os::FuturizedStore& store,
          crimson::net::MessengerRef cluster_msgr,
          crimson::net::MessengerRef public_msgr,
          crimson::net::MessengerRef hb_front_msgr,
@@ -82,11 +86,8 @@ OSD::OSD(int id, uint32_t nonce,
     public_msgr{public_msgr},
     monc{new crimson::mon::Client{*public_msgr, *this}},
     mgrc{new crimson::mgr::Client{*public_msgr, *this}},
-    store{crimson::os::FuturizedStore::create(
-      local_conf().get_val<std::string>("osd_objectstore"),
-      local_conf().get_val<std::string>("osd_data"),
-      local_conf().get_config_values())},
-    shard_services{*this, whoami, *cluster_msgr, *public_msgr, *monc, *mgrc, *store},
+    store{store},
+    shard_services{*this, whoami, *cluster_msgr, *public_msgr, *monc, *mgrc, store},
     heartbeat{new Heartbeat{whoami, shard_services, *monc, hb_front_msgr, hb_back_msgr}},
     // do this in background
     tick_timer{[this] {
@@ -94,7 +95,9 @@ OSD::OSD(int id, uint32_t nonce,
       update_stats();
     }},
     asok{seastar::make_lw_shared<crimson::admin::AdminSocket>()},
-    osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services)))
+    osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services))),
+    log_client(cluster_msgr.get(), LogClient::NO_FLAGS),
+    clog(log_client.create_channel())
 {
   osdmaps[0] = boost::make_local_shared<OSDMap>();
   for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
@@ -110,6 +113,9 @@ OSD::OSD(int id, uint32_t nonce,
                     __func__, cpp_strerror(r));
     }
   }
+  logger().info("{}: nonce is {}", __func__, nonce);
+  monc->set_log_client(&log_client);
+  clog->set_log_to_monitors(true);
 }
 
 OSD::~OSD() = default;
@@ -145,33 +151,39 @@ CompatSet get_osd_initial_compat_set()
 
 seastar::future<> OSD::mkfs(uuid_d osd_uuid, uuid_d cluster_fsid)
 {
-  return store->start().then([this, osd_uuid] {
-    return store->mkfs(osd_uuid);
+  return store.start().then([this, osd_uuid] {
+    return store.mkfs(osd_uuid).handle_error(
+      crimson::stateful_ec::handle([] (const auto& ec) {
+        logger().error("error creating empty object store in {}: ({}) {}",
+                       local_conf().get_val<std::string>("osd_data"),
+                       ec.value(), ec.message());
+        std::exit(EXIT_FAILURE);
+      }));
   }).then([this] {
-    return store->mount();
+    return store.mount().handle_error(
+      crimson::stateful_ec::handle([] (const auto& ec) {
+        logger().error("error mounting object store in {}: ({}) {}",
+                       local_conf().get_val<std::string>("osd_data"),
+                       ec.value(), ec.message());
+        std::exit(EXIT_FAILURE);
+      }));
   }).then([cluster_fsid, this] {
     superblock.cluster_fsid = cluster_fsid;
-    superblock.osd_fsid = store->get_fsid();
+    superblock.osd_fsid = store.get_fsid();
     superblock.whoami = whoami;
     superblock.compat_features = get_osd_initial_compat_set();
-
-    logger().info(
-      "{} writing superblock cluster_fsid {} osd_fsid {}",
-      __func__,
-      cluster_fsid,
-      superblock.osd_fsid);
-    return store->create_new_collection(coll_t::meta());
-  }).then([this] (auto ch) {
-    meta_coll = make_unique<OSDMeta>(ch , store.get());
-    ceph::os::Transaction t;
-    meta_coll->create(t);
-    meta_coll->store_superblock(t, superblock);
-    return store->do_transaction(meta_coll->collection(), std::move(t));
+    return _write_superblock();
+  }).then([cluster_fsid, this] {
+    return store.write_meta("ceph_fsid", cluster_fsid.to_string());
+  }).then([this] {
+    return store.write_meta("magic", CEPH_OSD_ONDISK_MAGIC);
+  }).then([this] {
+    return store.write_meta("whoami", std::to_string(whoami));
+  }).then([this] {
+    return _write_key_meta();
+  }).then([this] {
+    return store.write_meta("ready", "ready");
   }).then([cluster_fsid, this] {
-    return when_all_succeed(
-      store->write_meta("ceph_fsid", cluster_fsid.to_string()),
-      store->write_meta("whoami", std::to_string(whoami)));
-  }).then_unpack([cluster_fsid, this] {
     fmt::print("created object store {} for osd.{} fsid {}\n",
                local_conf().get_val<std::string>("osd_data"),
                whoami, cluster_fsid);
@@ -179,11 +191,77 @@ seastar::future<> OSD::mkfs(uuid_d osd_uuid, uuid_d cluster_fsid)
   });
 }
 
+seastar::future<> OSD::_write_superblock()
+{
+  return store.open_collection(coll_t::meta()).then([this] (auto ch) {
+    if (ch) {
+      // if we already have superblock, check if it matches
+      meta_coll = make_unique<OSDMeta>(ch, store);
+      return meta_coll->load_superblock().then([this](OSDSuperblock&& sb) {
+        if (sb.cluster_fsid != superblock.cluster_fsid) {
+          logger().error("provided cluster fsid {} != superblock's {}",
+                         sb.cluster_fsid, superblock.cluster_fsid);
+          throw std::invalid_argument("mismatched fsid");
+        }
+        if (sb.whoami != superblock.whoami) {
+          logger().error("provided osd id {} != superblock's {}",
+                         sb.whoami, superblock.whoami);
+          throw std::invalid_argument("mismatched osd id");
+        }
+      });
+    } else {
+      // meta collection does not yet, create superblock
+      logger().info(
+        "{} writing superblock cluster_fsid {} osd_fsid {}",
+        "_write_superblock",
+        superblock.cluster_fsid,
+        superblock.osd_fsid);
+      return store.create_new_collection(coll_t::meta()).then([this] (auto ch) {
+        meta_coll = make_unique<OSDMeta>(ch, store);
+        ceph::os::Transaction t;
+        meta_coll->create(t);
+        meta_coll->store_superblock(t, superblock);
+        logger().debug("OSD::_write_superblock: do_transaction...");
+        return store.do_transaction(meta_coll->collection(), std::move(t));
+      });
+    }
+  });
+}
+
+// this `to_string` sits in the `crimson::osd` namespace, so we don't brake
+// the language rule on not overloading in `std::`.
+static std::string to_string(const seastar::temporary_buffer<char>& temp_buf)
+{
+  return {temp_buf.get(), temp_buf.size()};
+}
+
+seastar::future<> OSD::_write_key_meta()
+{
+
+  if (auto key = local_conf().get_val<std::string>("key"); !std::empty(key)) {
+    return store.write_meta("osd_key", key);
+  } else if (auto keyfile = local_conf().get_val<std::string>("keyfile");
+             !std::empty(keyfile)) {
+    return read_file(keyfile).then([this] (const auto& temp_buf) {
+      // it's on a truly cold path, so don't worry about memcpy.
+      return store.write_meta("osd_key", to_string(temp_buf));
+    }).handle_exception([keyfile] (auto ep) {
+      logger().error("_write_key_meta: failed to handle keyfile {}: {}",
+                     keyfile, ep);
+      ceph_abort();
+    });
+  } else {
+    return seastar::now();
+  }
+}
+
 namespace {
   entity_addrvec_t pick_addresses(int what) {
     entity_addrvec_t addrs;
     crimson::common::CephContext cct;
-    if (int r = ::pick_addresses(&cct, what, &addrs, -1); r < 0) {
+    // we're interested solely in v2; crimson doesn't do v1
+    const auto flags = what | CEPH_PICK_ADDRESS_MSGR2;
+    if (int r = ::pick_addresses(&cct, flags, &addrs, -1); r < 0) {
       throw std::runtime_error("failed to pick address");
     }
     for (auto addr : addrs.v) {
@@ -226,12 +304,18 @@ seastar::future<> OSD::start()
 
   startup_time = ceph::mono_clock::now();
 
-  return store->start().then([this] {
-    return store->mount();
+  return store.start().then([this] {
+    return store.mount().handle_error(
+      crimson::stateful_ec::handle([] (const auto& ec) {
+        logger().error("error mounting object store in {}: ({}) {}",
+                       local_conf().get_val<std::string>("osd_data"),
+                       ec.value(), ec.message());
+        std::exit(EXIT_FAILURE);
+      }));
   }).then([this] {
-    return store->open_collection(coll_t::meta());
+    return store.open_collection(coll_t::meta());
   }).then([this](auto ch) {
-    meta_coll = make_unique<OSDMeta>(ch, store.get());
+    meta_coll = make_unique<OSDMeta>(ch, store);
     return meta_coll->load_superblock();
   }).then([this](OSDSuperblock&& sb) {
     superblock = std::move(sb);
@@ -240,6 +324,7 @@ seastar::future<> OSD::start()
     shard_services.update_map(map);
     osdmap_gate.got_map(map->get_epoch());
     osdmap = std::move(map);
+    bind_epoch = osdmap->get_epoch();
     return load_pgs();
   }).then([this] {
 
@@ -267,24 +352,20 @@ seastar::future<> OSD::start()
 
     crimson::net::dispatchers_t dispatchers{this, monc.get(), mgrc.get()};
     return seastar::when_all_succeed(
-      cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
-                             local_conf()->ms_bind_port_min,
-                             local_conf()->ms_bind_port_max)
+      cluster_msgr->bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER))
         .safe_then([this, dispatchers]() mutable {
          return cluster_msgr->start(dispatchers);
         }, crimson::net::Messenger::bind_ertr::all_same_way(
             [] (const std::error_code& e) {
-          logger().error("cluster messenger try_bind(): address range is unavailable.");
+          logger().error("cluster messenger bind(): {}", e);
           ceph_abort();
         })),
-      public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
-                            local_conf()->ms_bind_port_min,
-                            local_conf()->ms_bind_port_max)
+      public_msgr->bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC))
         .safe_then([this, dispatchers]() mutable {
          return public_msgr->start(dispatchers);
         }, crimson::net::Messenger::bind_ertr::all_same_way(
             [] (const std::error_code& e) {
-          logger().error("public messenger try_bind(): address range is unavailable.");
+          logger().error("public messenger bind(): {}", e);
           ceph_abort();
         })));
   }).then_unpack([this] {
@@ -301,17 +382,20 @@ seastar::future<> OSD::start()
     if (auto [addrs, changed] =
         replace_unknown_addrs(cluster_msgr->get_myaddrs(),
                               public_msgr->get_myaddrs()); changed) {
+      logger().debug("replacing unkwnown addrs of cluster messenger");
       return cluster_msgr->set_myaddrs(addrs);
     } else {
       return seastar::now();
     }
   }).then([this] {
-    return heartbeat->start(public_msgr->get_myaddrs(),
-                            cluster_msgr->get_myaddrs());
+    return heartbeat->start(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
+                            pick_addresses(CEPH_PICK_ADDRESS_CLUSTER));
   }).then([this] {
     // create the admin-socket server, and the objects that register
     // to handle incoming commands
     return start_asok_admin();
+  }).then([this] {
+    return log_client.set_fsid(monc->get_fsid());
   }).then([this] {
     return start_boot();
   });
@@ -363,18 +447,32 @@ seastar::future<> OSD::_send_boot()
 {
   state.set_booting();
 
-  logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs());
-  logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs());
-  logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr());
-  auto m = make_message<MOSDBoot>(superblock,
-                                  osdmap->get_epoch(),
+  entity_addrvec_t public_addrs = public_msgr->get_myaddrs();
+  entity_addrvec_t cluster_addrs = cluster_msgr->get_myaddrs();
+  entity_addrvec_t hb_back_addrs = heartbeat->get_back_addrs();
+  entity_addrvec_t hb_front_addrs = heartbeat->get_front_addrs();
+  if (cluster_msgr->set_addr_unknowns(public_addrs)) {
+    cluster_addrs = cluster_msgr->get_myaddrs();
+  }
+  if (heartbeat->get_back_msgr()->set_addr_unknowns(cluster_addrs)) {
+    hb_back_addrs = heartbeat->get_back_addrs();
+  }
+  if (heartbeat->get_front_msgr()->set_addr_unknowns(public_addrs)) {
+    hb_front_addrs = heartbeat->get_front_addrs();
+  }
+  logger().info("hb_back_msgr: {}", hb_back_addrs);
+  logger().info("hb_front_msgr: {}", hb_front_addrs);
+  logger().info("cluster_msgr: {}", cluster_addrs);
+
+  auto m = crimson::make_message<MOSDBoot>(superblock,
                                   osdmap->get_epoch(),
-                                  heartbeat->get_back_addrs(),
-                                  heartbeat->get_front_addrs(),
-                                  cluster_msgr->get_myaddrs(),
+                                  boot_epoch,
+                                  hb_back_addrs,
+                                  hb_front_addrs,
+                                  cluster_addrs,
                                   CEPH_FEATURES_ALL);
   collect_sys_info(&m->metadata, NULL);
-  return monc->send_message(m);
+  return monc->send_message(std::move(m));
 }
 
 seastar::future<> OSD::_add_me_to_crush()
@@ -387,7 +485,7 @@ seastar::future<> OSD::_add_me_to_crush()
        w >= 0) {
       return seastar::make_ready_future<double>(w);
     } else {
-       return store->stat().then([](auto st) {
+       return store.stat().then([](auto st) {
          auto total = st.total;
         return seastar::make_ready_future<double>(
            std::max(.00001,
@@ -404,7 +502,7 @@ seastar::future<> OSD::_add_me_to_crush()
       "weight": {:.4f},
       "args": [{}]
     }})", whoami, weight, loc);
-    return monc->run_command({cmd}, {});
+    return monc->run_command(std::move(cmd), {});
   }).then([](auto&& command_result) {
     [[maybe_unused]] auto [code, message, out] = std::move(command_result);
     if (code) {
@@ -435,24 +533,26 @@ seastar::future<> OSD::start_asok_admin()
   auto asok_path = local_conf().get_val<std::string>("admin_socket");
   using namespace crimson::admin;
   return asok->start(asok_path).then([this] {
-    return seastar::when_all_succeed(
-      asok->register_admin_commands(),
-      asok->register_command(make_asok_hook<OsdStatusHook>(std::as_const(*this))),
-      asok->register_command(make_asok_hook<SendBeaconHook>(*this)),
-      asok->register_command(make_asok_hook<FlushPgStatsHook>(*this)),
-      asok->register_command(make_asok_hook<DumpPGStateHistory>(std::as_const(*this))),
-      asok->register_command(make_asok_hook<SeastarMetricsHook>()),
-      // PG commands
-      asok->register_command(make_asok_hook<pg::QueryCommand>(*this)),
-      asok->register_command(make_asok_hook<pg::MarkUnfoundLostCommand>(*this)));
-  }).then_unpack([] {
-    return seastar::now();
+    asok->register_admin_commands();
+    asok->register_command(make_asok_hook<OsdStatusHook>(std::as_const(*this)));
+    asok->register_command(make_asok_hook<SendBeaconHook>(*this));
+    asok->register_command(make_asok_hook<FlushPgStatsHook>(*this));
+    asok->register_command(make_asok_hook<DumpPGStateHistory>(std::as_const(*this)));
+    asok->register_command(make_asok_hook<DumpMetricsHook>());
+    asok->register_command(make_asok_hook<DumpPerfCountersHook>());
+    asok->register_command(make_asok_hook<InjectDataErrorHook>(get_shard_services()));
+    asok->register_command(make_asok_hook<InjectMDataErrorHook>(get_shard_services()));
+    // PG commands
+    asok->register_command(make_asok_hook<pg::QueryCommand>(*this));
+    asok->register_command(make_asok_hook<pg::MarkUnfoundLostCommand>(*this));
   });
 }
 
 seastar::future<> OSD::stop()
 {
   logger().info("stop");
+  beacon_timer.cancel();
+  tick_timer.cancel();
   // see also OSD::shutdown()
   return prepare_to_stop().then([this] {
     state.set_stopping();
@@ -463,9 +563,9 @@ seastar::future<> OSD::stop()
     return asok->stop().then([this] {
       return heartbeat->stop();
     }).then([this] {
-      return store->umount();
+      return store.umount();
     }).then([this] {
-      return store->stop();
+      return store.stop();
     }).then([this] {
       return seastar::parallel_for_each(pg_map.get_pgs(),
        [](auto& p) {
@@ -480,9 +580,7 @@ seastar::future<> OSD::stop()
     }).then([this] {
       return when_all_succeed(
          public_msgr->shutdown(),
-         cluster_msgr->shutdown());
-    }).then_unpack([] {
-      return seastar::now();
+         cluster_msgr->shutdown()).discard_result();
     }).handle_exception([](auto ep) {
       logger().error("error while stopping osd: {}", ep);
     });
@@ -524,7 +622,7 @@ void OSD::print(std::ostream& out) const
 
 seastar::future<> OSD::load_pgs()
 {
-  return store->list_collections().then([this](auto colls) {
+  return store.list_collections().then([this](auto colls) {
     return seastar::parallel_for_each(colls, [this](auto coll) {
       spg_t pgid;
       if (coll.is_pg(&pgid)) {
@@ -570,9 +668,9 @@ seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map,
   auto get_collection = [pgid, do_create, this] {
     const coll_t cid{pgid};
     if (do_create) {
-      return store->create_new_collection(cid);
+      return store.create_new_collection(cid);
     } else {
-      return store->open_collection(cid);
+      return store.open_collection(cid);
     }
   };
   return seastar::when_all(
@@ -597,14 +695,14 @@ seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
 {
   logger().debug("{}: {}", __func__, pgid);
 
-  return seastar::do_with(PGMeta(store.get(), pgid), [] (auto& pg_meta) {
+  return seastar::do_with(PGMeta(store, pgid), [](auto& pg_meta) {
     return pg_meta.get_epoch();
   }).then([this](epoch_t e) {
     return get_map(e);
   }).then([pgid, this] (auto&& create_map) {
     return make_pg(std::move(create_map), pgid, false);
   }).then([this](Ref<PG> pg) {
-    return pg->read_state(store.get()).then([pg] {
+    return pg->read_state(&store).then([pg] {
        return seastar::make_ready_future<Ref<PG>>(std::move(pg));
     });
   }).handle_exception([pgid](auto ep) {
@@ -727,17 +825,17 @@ void OSD::update_stats()
   osd_stat.hb_peers = heartbeat->get_peers();
   osd_stat.seq = (static_cast<uint64_t>(get_up_epoch()) << 32) | osd_stat_seq;
   gate.dispatch_in_background("statfs", *this, [this] {
-    (void) store->stat().then([this](store_statfs_t&& st) {
+    (void) store.stat().then([this](store_statfs_t&& st) {
       osd_stat.statfs = st;
     });
   });
 }
 
-MessageRef OSD::get_stats() const
+MessageURef OSD::get_stats() const
 {
   // todo: m-to-n: collect stats using map-reduce
   // MPGStats::had_map_for is not used since PGMonitor was removed
-  auto m = make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
+  auto m = crimson::make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
   m->osd_stat = osd_stat;
   for (auto [pgid, pg] : pg_map.get_pgs()) {
     if (pg->is_primary()) {
@@ -911,7 +1009,6 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
         auto [pg, startmap] = std::move(ret);
         if (!pg)
           return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
-        PeeringCtx rctx{ceph_release_t::octopus};
         const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());
 
         int up_primary, acting_primary;
@@ -922,6 +1019,7 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
         int role = startmap->calc_pg_role(pg_shard_t(whoami, info->pgid.shard),
                                           acting);
 
+        PeeringCtx rctx;
         create_pg_collection(
           rctx.transaction,
           info->pgid,
@@ -939,7 +1037,6 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
           acting_primary,
           info->history,
           info->past_intervals,
-          false,
           rctx.transaction);
 
         return shard_services.start_operation<PGAdvanceMap>(
@@ -1010,7 +1107,8 @@ seastar::future<> OSD::handle_osd_map(crimson::net::ConnectionRef conn,
         superblock.clean_thru = last;
       }
       meta_coll->store_superblock(t, superblock);
-      return store->do_transaction(meta_coll->collection(), std::move(t));
+      logger().debug("OSD::handle_osd_map: do_transaction...");
+      return store.do_transaction(meta_coll->collection(), std::move(t));
     });
   }).then([=] {
     // TODO: write to superblock and commit the transaction
@@ -1040,10 +1138,13 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
       }
     });
   }).then([m, this] {
-    if (osdmap->is_up(whoami) &&
-        osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
-        bind_epoch < osdmap->get_up_from(whoami)) {
-      if (state.is_booting()) {
+    if (osdmap->is_up(whoami)) {
+      const auto up_from = osdmap->get_up_from(whoami);
+      logger().info("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
+                    whoami, osdmap->get_epoch(), up_from, bind_epoch, state);
+      if (bind_epoch < up_from &&
+          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
+          state.is_booting()) {
         logger().info("osd.{}: activating...", whoami);
         state.set_active();
         beacon_timer.arm_periodic(
@@ -1051,19 +1152,21 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
         tick_timer.arm_periodic(
           std::chrono::seconds(TICK_INTERVAL));
       }
-    } else if (!osdmap->is_up(whoami)) {
+    } else {
       if (state.is_prestop()) {
        got_stop_ack();
        return seastar::now();
       }
     }
-    check_osdmap_features();
-    // yay!
-    return consume_map(osdmap->get_epoch());
+    return check_osdmap_features().then([this] {
+      // yay!
+      return consume_map(osdmap->get_epoch());
+    });
   }).then([m, this] {
     if (state.is_active()) {
       logger().info("osd.{}: now active", whoami);
-      if (!osdmap->exists(whoami)) {
+      if (!osdmap->exists(whoami) ||
+         osdmap->is_stop(whoami)) {
         return shutdown();
       }
       if (should_restart()) {
@@ -1104,22 +1207,22 @@ seastar::future<> OSD::send_incremental_map(crimson::net::ConnectionRef conn,
   if (first >= superblock.oldest_map) {
     return load_map_bls(first, superblock.newest_map)
     .then([this, conn, first](auto&& bls) {
-      auto m = make_message<MOSDMap>(monc->get_fsid(),
+      auto m = crimson::make_message<MOSDMap>(monc->get_fsid(),
          osdmap->get_encoding_features());
       m->oldest_map = first;
       m->newest_map = superblock.newest_map;
       m->maps = std::move(bls);
-      return conn->send(m);
+      return conn->send(std::move(m));
     });
   } else {
     return load_map_bl(osdmap->get_epoch())
     .then([this, conn](auto&& bl) mutable {
-      auto m = make_message<MOSDMap>(monc->get_fsid(),
+      auto m = crimson::make_message<MOSDMap>(monc->get_fsid(),
          osdmap->get_encoding_features());
       m->oldest_map = superblock.oldest_map;
       m->newest_map = superblock.newest_map;
       m->maps.emplace(osdmap->get_epoch(), std::move(bl));
-      return conn->send(m);
+      return conn->send(std::move(m));
     });
   }
 }
@@ -1239,11 +1342,11 @@ seastar::future<> OSD::send_beacon()
   // FIXME: min lec should be calculated from pg_stat
   //        and should set m->pgs
   epoch_t min_last_epoch_clean = osdmap->get_epoch();
-  auto m = make_message<MOSDBeacon>(osdmap->get_epoch(),
+  auto m = crimson::make_message<MOSDBeacon>(osdmap->get_epoch(),
                                     min_last_epoch_clean,
                                     superblock.last_purged_snaps_scrub,
                                     local_conf()->osd_beacon_report_interval);
-  return monc->send_message(m);
+  return monc->send_message(std::move(m));
 }
 
 void OSD::update_heartbeat_peers()
@@ -1284,9 +1387,11 @@ seastar::future<> OSD::handle_peering_op(
   return seastar::now();
 }
 
-void OSD::check_osdmap_features()
+seastar::future<> OSD::check_osdmap_features()
 {
   heartbeat->set_require_authorizer(true);
+  return store.write_meta("require_osd_release",
+                          stringify((int)osdmap->require_osd_release));
 }
 
 seastar::future<> OSD::consume_map(epoch_t epoch)
@@ -1296,7 +1401,7 @@ seastar::future<> OSD::consume_map(epoch_t epoch)
   return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
     return shard_services.start_operation<PGAdvanceMap>(
       *this, pg.second, pg.second->get_osdmap_epoch(), epoch,
-      PeeringCtx{ceph_release_t::octopus}, false).second;
+      PeeringCtx{}, false).second;
   }).then([epoch, this] {
     osdmap_gate.got_map(epoch);
     return seastar::make_ready_future();
@@ -1345,7 +1450,7 @@ seastar::future<> OSD::prepare_to_stop()
     return seastar::with_timeout(
       seastar::timer<>::clock::now() + timeout,
       monc->send_message(
-         make_message<MOSDMarkMeDown>(
+         crimson::make_message<MOSDMarkMeDown>(
            monc->get_fsid(),
            whoami,
            osdmap->get_addrs(whoami),