Subject: update source to Ceph Pacific 16.2.2
(source: git.proxmox.com, ceph.git — ceph/src/crimson/osd/osd.cc)

diff --git a/ceph/src/crimson/osd/osd.cc b/ceph/src/crimson/osd/osd.cc
index f0c1724f30ed9592ce18fa6f061c0f8b2fbbe9b9..521cb9ba3bb180b30902811a2d4b849a31852f8b 100644
--- a/ceph/src/crimson/osd/osd.cc
+++ b/ceph/src/crimson/osd/osd.cc
 #include <boost/smart_ptr/make_local_shared.hpp>
 #include <fmt/format.h>
 #include <fmt/ostream.h>
+#include <seastar/core/timer.hh>
 
 #include "common/pick_address.h"
 #include "include/util.h"
 
-#include "messages/MOSDAlive.h"
+#include "messages/MCommand.h"
 #include "messages/MOSDBeacon.h"
 #include "messages/MOSDBoot.h"
 #include "messages/MOSDMap.h"
+#include "messages/MOSDMarkMeDown.h"
 #include "messages/MOSDOp.h"
 #include "messages/MOSDPGLog.h"
+#include "messages/MOSDPGPull.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
+#include "messages/MOSDPGRecoveryDelete.h"
+#include "messages/MOSDPGRecoveryDeleteReply.h"
 #include "messages/MOSDRepOpReply.h"
+#include "messages/MOSDScrub2.h"
 #include "messages/MPGStats.h"
 
 #include "os/Transaction.h"
 #include "osd/ClassHandler.h"
+#include "osd/OSDCap.h"
 #include "osd/PGPeeringEvent.h"
 #include "osd/PeeringState.h"
 
+#include "crimson/admin/osd_admin.h"
+#include "crimson/admin/pg_commands.h"
+#include "crimson/common/exception.h"
 #include "crimson/mon/MonClient.h"
 #include "crimson/net/Connection.h"
 #include "crimson/net/Messenger.h"
-#include "crimson/os/cyanstore/cyan_object.h"
 #include "crimson/os/futurized_collection.h"
 #include "crimson/os/futurized_store.h"
 #include "crimson/osd/heartbeat.h"
@@ -43,6 +54,7 @@
 #include "crimson/osd/osd_operations/compound_peering_request.h"
 #include "crimson/osd/osd_operations/peering_event.h"
 #include "crimson/osd/osd_operations/pg_advance_map.h"
+#include "crimson/osd/osd_operations/recovery_subrequest.h"
 #include "crimson/osd/osd_operations/replicated_request.h"
 
 namespace {
@@ -74,10 +86,13 @@ OSD::OSD(int id, uint32_t nonce,
       local_conf().get_val<std::string>("osd_objectstore"),
       local_conf().get_val<std::string>("osd_data"),
       local_conf().get_config_values())},
-    shard_services{*this, *cluster_msgr, *public_msgr, *monc, *mgrc, *store},
-    heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}},
+    shard_services{*this, whoami, *cluster_msgr, *public_msgr, *monc, *mgrc, *store},
+    heartbeat{new Heartbeat{whoami, shard_services, *monc, hb_front_msgr, hb_back_msgr}},
     // do this in background
-    heartbeat_timer{[this] { (void)update_heartbeat_peers(); }},
+    tick_timer{[this] {
+      update_heartbeat_peers();
+      update_stats();
+    }},
     asok{seastar::make_lw_shared<crimson::admin::AdminSocket>()},
     osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services)))
 {
@@ -156,7 +171,7 @@ seastar::future<> OSD::mkfs(uuid_d osd_uuid, uuid_d cluster_fsid)
     return when_all_succeed(
       store->write_meta("ceph_fsid", cluster_fsid.to_string()),
       store->write_meta("whoami", std::to_string(whoami)));
-  }).then([cluster_fsid, this] {
+  }).then_unpack([cluster_fsid, this] {
     fmt::print("created object store {} for osd.{} fsid {}\n",
                local_conf().get_val<std::string>("osd_data"),
                whoami, cluster_fsid);
@@ -250,22 +265,32 @@ seastar::future<> OSD::start()
     cluster_msgr->set_policy(entity_name_t::TYPE_CLIENT,
                              SocketPolicy::stateless_server(0));
 
-    dispatchers.push_front(this);
-    dispatchers.push_front(monc.get());
-    dispatchers.push_front(mgrc.get());
+    crimson::net::dispatchers_t dispatchers{this, monc.get(), mgrc.get()};
     return seastar::when_all_succeed(
       cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
                              local_conf()->ms_bind_port_min,
                              local_conf()->ms_bind_port_max)
-        .then([this] { return cluster_msgr->start(&dispatchers); }),
+        .safe_then([this, dispatchers]() mutable {
+         return cluster_msgr->start(dispatchers);
+        }, crimson::net::Messenger::bind_ertr::all_same_way(
+            [] (const std::error_code& e) {
+          logger().error("cluster messenger try_bind(): address range is unavailable.");
+          ceph_abort();
+        })),
       public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
                             local_conf()->ms_bind_port_min,
                             local_conf()->ms_bind_port_max)
-        .then([this] { return public_msgr->start(&dispatchers); }));
-  }).then([this] {
+        .safe_then([this, dispatchers]() mutable {
+         return public_msgr->start(dispatchers);
+        }, crimson::net::Messenger::bind_ertr::all_same_way(
+            [] (const std::error_code& e) {
+          logger().error("public messenger try_bind(): address range is unavailable.");
+          ceph_abort();
+        })));
+  }).then_unpack([this] {
     return seastar::when_all_succeed(monc->start(),
                                      mgrc->start());
-  }).then([this] {
+  }).then_unpack([this] {
     return _add_me_to_crush();
   }).then([this] {
     monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0);
@@ -295,7 +320,8 @@ seastar::future<> OSD::start()
 seastar::future<> OSD::start_boot()
 {
   state.set_preboot();
-  return monc->get_version("osdmap").then([this](version_t newest, version_t oldest) {
+  return monc->get_version("osdmap").then([this](auto&& ret) {
+    auto [newest, oldest] = ret;
     return _preboot(oldest, newest);
   });
 }
@@ -304,7 +330,7 @@ seastar::future<> OSD::_preboot(version_t oldest, version_t newest)
 {
   logger().info("osd.{}: _preboot", whoami);
   if (osdmap->get_epoch() == 0) {
-    logger().warn("waiting for initial osdmap");
+    logger().info("waiting for initial osdmap");
   } else if (osdmap->is_destroyed(whoami)) {
     logger().warn("osdmap says I am destroyed");
     // provide a small margin so we don't livelock seeing if we
@@ -379,7 +405,8 @@ seastar::future<> OSD::_add_me_to_crush()
       "args": [{}]
     }})", whoami, weight, loc);
     return monc->run_command({cmd}, {});
-  }).then([](int32_t code, string message, bufferlist) {
+  }).then([](auto&& command_result) {
+    [[maybe_unused]] auto [code, message, out] = std::move(command_result);
     if (code) {
       logger().warn("fail to add to crush: {} ({})", message, code);
       throw std::runtime_error("fail to add to crush");
@@ -390,25 +417,10 @@ seastar::future<> OSD::_add_me_to_crush()
   });
 }
 
-seastar::future<> OSD::_send_alive()
+seastar::future<> OSD::handle_command(crimson::net::ConnectionRef conn,
+                                     Ref<MCommand> m)
 {
-  auto want = osdmap->get_epoch();
-  logger().info(
-    "{} want {} up_thru_wanted {}",
-    __func__,
-    want,
-    up_thru_wanted);
-  if (!osdmap->exists(whoami)) {
-    logger().warn("{} DNE", __func__);
-    return seastar::now();
-  } else if (want <= up_thru_wanted) {
-    logger().debug("{} {} <= {}", __func__, want, up_thru_wanted);
-    return seastar::now();
-  } else {
-    up_thru_wanted = want;
-    auto m = make_message<MOSDAlive>(osdmap->get_epoch(), want);
-    return monc->send_message(std::move(m));
-  }
+  return asok->handle_command(conn, std::move(m));
 }
 
 /*
@@ -425,11 +437,16 @@ seastar::future<> OSD::start_asok_admin()
   return asok->start(asok_path).then([this] {
     return seastar::when_all_succeed(
       asok->register_admin_commands(),
-      asok->register_command(make_asok_hook<OsdStatusHook>(*this)),
+      asok->register_command(make_asok_hook<OsdStatusHook>(std::as_const(*this))),
       asok->register_command(make_asok_hook<SendBeaconHook>(*this)),
-      asok->register_command(make_asok_hook<ConfigShowHook>()),
-      asok->register_command(make_asok_hook<ConfigGetHook>()),
-      asok->register_command(make_asok_hook<ConfigSetHook>()));
+      asok->register_command(make_asok_hook<FlushPgStatsHook>(*this)),
+      asok->register_command(make_asok_hook<DumpPGStateHistory>(std::as_const(*this))),
+      asok->register_command(make_asok_hook<SeastarMetricsHook>()),
+      // PG commands
+      asok->register_command(make_asok_hook<pg::QueryCommand>(*this)),
+      asok->register_command(make_asok_hook<pg::MarkUnfoundLostCommand>(*this)));
+  }).then_unpack([] {
+    return seastar::now();
   });
 }
 
@@ -437,24 +454,38 @@ seastar::future<> OSD::stop()
 {
   logger().info("stop");
   // see also OSD::shutdown()
-  state.set_stopping();
-
-  return gate.close().then([this] {
-    return asok->stop();
-  }).then([this] {
-    return heartbeat->stop();
-  }).then([this] {
-    return monc->stop();
-  }).then([this] {
-    return when_all_succeed(
-      public_msgr->shutdown(),
-      cluster_msgr->shutdown());
-  }).then([this] {
-    return store->umount();
-  }).then([this] {
-    return store->stop();
-  }).handle_exception([](auto ep) {
-    logger().error("error while stopping osd: {}", ep);
+  return prepare_to_stop().then([this] {
+    state.set_stopping();
+    logger().debug("prepared to stop");
+    public_msgr->stop();
+    cluster_msgr->stop();
+    auto gate_close_fut = gate.close();
+    return asok->stop().then([this] {
+      return heartbeat->stop();
+    }).then([this] {
+      return store->umount();
+    }).then([this] {
+      return store->stop();
+    }).then([this] {
+      return seastar::parallel_for_each(pg_map.get_pgs(),
+       [](auto& p) {
+       return p.second->stop();
+      });
+    }).then([this] {
+      return monc->stop();
+    }).then([this] {
+      return mgrc->stop();
+    }).then([fut=std::move(gate_close_fut)]() mutable {
+      return std::move(fut);
+    }).then([this] {
+      return when_all_succeed(
+         public_msgr->shutdown(),
+         cluster_msgr->shutdown());
+    }).then_unpack([] {
+      return seastar::now();
+    }).handle_exception([](auto ep) {
+      logger().error("error while stopping osd: {}", ep);
+    });
   });
 }
 
@@ -469,6 +500,28 @@ void OSD::dump_status(Formatter* f) const
   f->dump_unsigned("num_pgs", pg_map.get_pgs().size());
 }
 
+void OSD::dump_pg_state_history(Formatter* f) const
+{
+  f->open_array_section("pgs");
+  for (auto [pgid, pg] : pg_map.get_pgs()) {
+    f->open_object_section("pg");
+    f->dump_stream("pg") << pgid;
+    const auto& peering_state = pg->get_peering_state();
+    f->dump_string("currently", peering_state.get_current_state());
+    peering_state.dump_history(f);
+    f->close_section();
+  }
+  f->close_section();
+}
+
+void OSD::print(std::ostream& out) const
+{
+  out << "{osd." << superblock.whoami << " "
+    << superblock.osd_fsid << " [" << superblock.oldest_map
+    << "," << superblock.newest_map << "] " << pg_map.get_pgs().size()
+    << " pgs}";
+}
+
 seastar::future<> OSD::load_pgs()
 {
   return store->list_collections().then([this](auto colls) {
@@ -505,10 +558,10 @@ seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map,
       if (pi.is_erasure()) {
        ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile);
       }
-      return seastar::make_ready_future<pg_pool_t, string, ec_profile_t>(
-       std::move(pi),
-       std::move(name),
-       std::move(ec_profile));
+      return seastar::make_ready_future<std::tuple<pg_pool_t, string, ec_profile_t>>(
+        std::make_tuple(std::move(pi),
+                       std::move(name),
+                       std::move(ec_profile)));
     } else {
       // pool was deleted; grab final pg_pool_t off disk.
       return meta_coll->load_final_pool_info(pgid.pool());
@@ -522,12 +575,12 @@ seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map,
       return store->open_collection(cid);
     }
   };
-  return seastar::when_all_succeed(
+  return seastar::when_all(
     std::move(get_pool_info),
     std::move(get_collection)
-  ).then([pgid, create_map, this] (auto info,
-                                  auto coll) {
-    auto [pool, name, ec_profile] = std::move(info);
+  ).then([pgid, create_map, this] (auto&& ret) {
+    auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get0());
+    auto coll = std::move(std::get<1>(ret).get0());
     return seastar::make_ready_future<Ref<PG>>(
       new PG{pgid,
             pg_shard_t{whoami, pgid.shard},
@@ -542,13 +595,17 @@ seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map,
 
 seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
 {
-  return PGMeta{store.get(), pgid}.get_epoch().then([this](epoch_t e) {
+  logger().debug("{}: {}", __func__, pgid);
+
+  return seastar::do_with(PGMeta(store.get(), pgid), [] (auto& pg_meta) {
+    return pg_meta.get_epoch();
+  }).then([this](epoch_t e) {
     return get_map(e);
   }).then([pgid, this] (auto&& create_map) {
     return make_pg(std::move(create_map), pgid, false);
-  }).then([this, pgid](Ref<PG> pg) {
+  }).then([this](Ref<PG> pg) {
     return pg->read_state(store.get()).then([pg] {
-      return seastar::make_ready_future<Ref<PG>>(std::move(pg));
+       return seastar::make_ready_future<Ref<PG>>(std::move(pg));
     });
   }).handle_exception([pgid](auto ep) {
     logger().info("pg {} saw exception on load {}", pgid, ep);
@@ -557,79 +614,131 @@ seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
   });
 }
 
-seastar::future<> OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
+std::optional<seastar::future<>>
+OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
 {
   if (state.is_stopping()) {
-    return seastar::now();
-  }
-
-  switch (m->get_type()) {
-  case CEPH_MSG_OSD_MAP:
-    return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
-  case CEPH_MSG_OSD_OP:
-    return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
-  case MSG_OSD_PG_CREATE2:
-    shard_services.start_operation<CompoundPeeringRequest>(
-      *this,
-      conn->get_shared(),
-      m);
-    return seastar::now();
-  case MSG_OSD_PG_LEASE:
-    [[fallthrough]];
-  case MSG_OSD_PG_LEASE_ACK:
-    [[fallthrough]];
-  case MSG_OSD_PG_NOTIFY2:
-    [[fallthrough]];
-  case MSG_OSD_PG_INFO2:
-    [[fallthrough]];
-  case MSG_OSD_PG_QUERY2:
-    [[fallthrough]];
-  case MSG_OSD_PG_LOG:
-    return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
-  case MSG_OSD_REPOP:
-    return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
-  case MSG_OSD_REPOPREPLY:
-    return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
-  default:
-    logger().info("{} unhandled message {}", __func__, *m);
-    return seastar::now();
-  }
-}
-
-seastar::future<> OSD::ms_handle_connect(crimson::net::ConnectionRef conn)
-{
-  if (conn->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
-    return seastar::now();
-  } else {
-    return seastar::now();
+    return {};
   }
+  bool dispatched = true;
+  gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
+    switch (m->get_type()) {
+    case CEPH_MSG_OSD_MAP:
+      return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
+    case CEPH_MSG_OSD_OP:
+      return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
+    case MSG_OSD_PG_CREATE2:
+      shard_services.start_operation<CompoundPeeringRequest>(
+       *this,
+       conn,
+       m);
+      return seastar::now();
+    case MSG_COMMAND:
+      return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
+    case MSG_OSD_MARK_ME_DOWN:
+      return handle_mark_me_down(conn, boost::static_pointer_cast<MOSDMarkMeDown>(m));
+    case MSG_OSD_PG_PULL:
+      [[fallthrough]];
+    case MSG_OSD_PG_PUSH:
+      [[fallthrough]];
+    case MSG_OSD_PG_PUSH_REPLY:
+      [[fallthrough]];
+    case MSG_OSD_PG_RECOVERY_DELETE:
+      [[fallthrough]];
+    case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+      [[fallthrough]];
+    case MSG_OSD_PG_SCAN:
+      [[fallthrough]];
+    case MSG_OSD_PG_BACKFILL:
+      [[fallthrough]];
+    case MSG_OSD_PG_BACKFILL_REMOVE:
+      return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
+    case MSG_OSD_PG_LEASE:
+      [[fallthrough]];
+    case MSG_OSD_PG_LEASE_ACK:
+      [[fallthrough]];
+    case MSG_OSD_PG_NOTIFY2:
+      [[fallthrough]];
+    case MSG_OSD_PG_INFO2:
+      [[fallthrough]];
+    case MSG_OSD_PG_QUERY2:
+      [[fallthrough]];
+    case MSG_OSD_BACKFILL_RESERVE:
+      [[fallthrough]];
+    case MSG_OSD_RECOVERY_RESERVE:
+      [[fallthrough]];
+    case MSG_OSD_PG_LOG:
+      return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
+    case MSG_OSD_REPOP:
+      return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
+    case MSG_OSD_REPOPREPLY:
+      return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
+    case MSG_OSD_SCRUB2:
+      return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
+    default:
+      dispatched = false;
+      return seastar::now();
+    }
+  });
+  return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
 }
 
-seastar::future<> OSD::ms_handle_reset(crimson::net::ConnectionRef conn)
+void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
 {
   // TODO: cleanup the session attached to this connection
   logger().warn("ms_handle_reset");
-  return seastar::now();
 }
 
-seastar::future<> OSD::ms_handle_remote_reset(crimson::net::ConnectionRef conn)
+void OSD::ms_handle_remote_reset(crimson::net::ConnectionRef conn)
 {
   logger().warn("ms_handle_remote_reset");
-  return seastar::now();
 }
 
 void OSD::handle_authentication(const EntityName& name,
-                               const AuthCapsInfo& caps)
+                               const AuthCapsInfo& caps_info)
+{
+  // TODO: store the parsed cap and associate it with the connection
+  if (caps_info.allow_all) {
+    logger().debug("{} {} has all caps", __func__, name);
+    return;
+  }
+  if (caps_info.caps.length() > 0) {
+    auto p = caps_info.caps.cbegin();
+    string str;
+    try {
+      decode(str, p);
+    } catch (ceph::buffer::error& e) {
+      logger().warn("{} {} failed to decode caps string", __func__, name);
+      return;
+    }
+    OSDCap caps;
+    if (caps.parse(str)) {
+      logger().debug("{} {} has caps {}", __func__, name, str);
+    } else {
+      logger().warn("{} {} failed to parse caps {}", __func__, name, str);
+    }
+  }
+}
+
+void OSD::update_stats()
 {
-  // todo
+  osd_stat_seq++;
+  osd_stat.up_from = get_up_epoch();
+  osd_stat.hb_peers = heartbeat->get_peers();
+  osd_stat.seq = (static_cast<uint64_t>(get_up_epoch()) << 32) | osd_stat_seq;
+  gate.dispatch_in_background("statfs", *this, [this] {
+    (void) store->stat().then([this](store_statfs_t&& st) {
+      osd_stat.statfs = st;
+    });
+  });
 }
 
-MessageRef OSD::get_stats()
+MessageRef OSD::get_stats() const
 {
   // todo: m-to-n: collect stats using map-reduce
   // MPGStats::had_map_for is not used since PGMonitor was removed
   auto m = make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
-
+  m->osd_stat = osd_stat;
   for (auto [pgid, pg] : pg_map.get_pgs()) {
     if (pg->is_primary()) {
       auto stats = pg->get_stats();
@@ -641,6 +750,13 @@ MessageRef OSD::get_stats()
   return m;
 }
 
+uint64_t OSD::send_pg_stats()
+{
+  // mgr client sends the report message in background
+  mgrc->report();
+  return osd_stat.seq;
+}
+
 OSD::cached_map_t OSD::get_map() const
 {
   return osdmap;
@@ -675,6 +791,25 @@ seastar::future<bufferlist> OSD::load_map_bl(epoch_t e)
   }
 }
 
+seastar::future<std::map<epoch_t, bufferlist>> OSD::load_map_bls(
+  epoch_t first,
+  epoch_t last)
+{
+  return seastar::map_reduce(boost::make_counting_iterator<epoch_t>(first),
+                            boost::make_counting_iterator<epoch_t>(last + 1),
+                            [this](epoch_t e) {
+    return load_map_bl(e).then([e](auto&& bl) {
+       return seastar::make_ready_future<pair<epoch_t, bufferlist>>(
+           std::make_pair(e, std::move(bl)));
+    });
+  },
+  std::map<epoch_t, bufferlist>{},
+  [](auto&& bls, auto&& epoch_bl) {
+    bls.emplace(std::move(epoch_bl));
+    return std::move(bls);
+  });
+}
+
 seastar::future<std::unique_ptr<OSDMap>> OSD::load_map(epoch_t e)
 {
   auto o = std::make_unique<OSDMap>();
@@ -740,7 +875,7 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
     [this](auto &info) -> seastar::future<Ref<PG>> {
       return get_map(info->epoch).then(
        [&info, this](cached_map_t startmap) ->
-       seastar::future<Ref<PG>, cached_map_t> {
+       seastar::future<std::tuple<Ref<PG>, cached_map_t>> {
          const spg_t &pgid = info->pgid;
          if (info->by_mon) {
            int64_t pool_id = pgid.pgid.pool();
@@ -750,9 +885,8 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
                "{} ignoring pgid {}, pool dne",
                __func__,
                pgid);
-             return seastar::make_ready_future<Ref<PG>, cached_map_t>(
-               Ref<PG>(),
-               startmap);
+             return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
+                std::make_tuple(Ref<PG>(), startmap));
            }
            ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
            if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
@@ -763,19 +897,18 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
                "{} dropping {} create, pool does not have CREATING flag set",
                __func__,
                pgid);
-             return seastar::make_ready_future<Ref<PG>, cached_map_t>(
-               Ref<PG>(),
-               startmap);
+             return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
+                std::make_tuple(Ref<PG>(), startmap));
            }
          }
          return make_pg(startmap, pgid, true).then(
            [startmap=std::move(startmap)](auto pg) mutable {
-             return seastar::make_ready_future<Ref<PG>, cached_map_t>(
-               std::move(pg),
-               std::move(startmap));
+             return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
+                std::make_tuple(std::move(pg), std::move(startmap)));
            });
-      }).then([this, &info](auto pg, auto startmap) ->
+      }).then([this, &info](auto&& ret) ->
               seastar::future<Ref<PG>> {
+        auto [pg, startmap] = std::move(ret);
         if (!pg)
           return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
         PeeringCtx rctx{ceph_release_t::octopus};
@@ -811,14 +944,14 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
 
         return shard_services.start_operation<PGAdvanceMap>(
           *this, pg, pg->get_osdmap_epoch(),
-          osdmap->get_epoch(), std::move(rctx), true).second.then([pg] {
+          osdmap->get_epoch(), std::move(rctx), true).second.then([pg=pg] {
             return seastar::make_ready_future<Ref<PG>>(pg);
         });
       });
   });
 }
 
-seastar::future<> OSD::handle_osd_map(crimson::net::Connection* conn,
+seastar::future<> OSD::handle_osd_map(crimson::net::ConnectionRef conn,
                                       Ref<MOSDMap> m)
 {
   logger().info("handle_osd_map {}", *m);
@@ -915,9 +1048,14 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
         state.set_active();
         beacon_timer.arm_periodic(
           std::chrono::seconds(local_conf()->osd_beacon_report_interval));
-        heartbeat_timer.arm_periodic(
+        tick_timer.arm_periodic(
           std::chrono::seconds(TICK_INTERVAL));
       }
+    } else if (!osdmap->is_up(whoami)) {
+      if (state.is_prestop()) {
+       got_stop_ack();
+       return seastar::now();
+      }
     }
     check_osdmap_features();
     // yay!
@@ -950,28 +1088,54 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
   });
 }
 
-seastar::future<> OSD::handle_osd_op(crimson::net::Connection* conn,
+seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn,
                                      Ref<MOSDOp> m)
 {
-  shard_services.start_operation<ClientRequest>(
+  (void) shard_services.start_operation<ClientRequest>(
     *this,
-    conn->get_shared(),
+    conn,
     std::move(m));
   return seastar::now();
 }
 
-seastar::future<> OSD::handle_rep_op(crimson::net::Connection* conn,
+seastar::future<> OSD::send_incremental_map(crimson::net::ConnectionRef conn,
+                                           epoch_t first)
+{
+  if (first >= superblock.oldest_map) {
+    return load_map_bls(first, superblock.newest_map)
+    .then([this, conn, first](auto&& bls) {
+      auto m = make_message<MOSDMap>(monc->get_fsid(),
+         osdmap->get_encoding_features());
+      m->oldest_map = first;
+      m->newest_map = superblock.newest_map;
+      m->maps = std::move(bls);
+      return conn->send(m);
+    });
+  } else {
+    return load_map_bl(osdmap->get_epoch())
+    .then([this, conn](auto&& bl) mutable {
+      auto m = make_message<MOSDMap>(monc->get_fsid(),
+         osdmap->get_encoding_features());
+      m->oldest_map = superblock.oldest_map;
+      m->newest_map = superblock.newest_map;
+      m->maps.emplace(osdmap->get_epoch(), std::move(bl));
+      return conn->send(m);
+    });
+  }
+}
+
+seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn,
                                     Ref<MOSDRepOp> m)
 {
   m->finish_decode();
-  shard_services.start_operation<RepRequest>(
+  (void) shard_services.start_operation<RepRequest>(
     *this,
-    conn->get_shared(),
+    std::move(conn),
     std::move(m));
   return seastar::now();
 }
 
-seastar::future<> OSD::handle_rep_op_reply(crimson::net::Connection* conn,
+seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn,
                                           Ref<MOSDRepOpReply> m)
 {
   const auto& pgs = pg_map.get_pgs();
@@ -984,6 +1148,47 @@ seastar::future<> OSD::handle_rep_op_reply(crimson::net::Connection* conn,
   return seastar::now();
 }
 
+seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn,
+                                   Ref<MOSDScrub2> m)
+{
+  if (m->fsid != superblock.cluster_fsid) {
+    logger().warn("fsid mismatched");
+    return seastar::now();
+  }
+  return seastar::parallel_for_each(std::move(m->scrub_pgs),
+    [m, conn, this](spg_t pgid) {
+    pg_shard_t from_shard{static_cast<int>(m->get_source().num()),
+                          pgid.shard};
+    PeeringState::RequestScrub scrub_request{m->deep, m->repair};
+    return shard_services.start_operation<RemotePeeringEvent>(
+      *this,
+      conn,
+      shard_services,
+      from_shard,
+      pgid,
+      PGPeeringEvent{m->epoch, m->epoch, scrub_request}).second;
+  });
+}
+
+seastar::future<> OSD::handle_mark_me_down(crimson::net::ConnectionRef conn,
+                                          Ref<MOSDMarkMeDown> m)
+{
+  if (state.is_prestop()) {
+    got_stop_ack();
+  }
+  return seastar::now();
+}
+
+seastar::future<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn,
+                                  Ref<MOSDFastDispatchOp> m)
+{
+  (void) shard_services.start_operation<RecoverySubRequest>(
+    *this,
+    conn,
+    std::move(m));
+  return seastar::now();
+}
+
 bool OSD::should_restart() const
 {
   if (!osdmap->is_up(whoami)) {
@@ -1010,7 +1215,7 @@ bool OSD::should_restart() const
 seastar::future<> OSD::restart()
 {
   beacon_timer.cancel();
-  heartbeat_timer.cancel();
+  tick_timer.cancel();
   up_epoch = 0;
   bind_epoch = osdmap->get_epoch();
   // TODO: promote to shutdown if being marked down for multiple times
@@ -1036,14 +1241,15 @@ seastar::future<> OSD::send_beacon()
   epoch_t min_last_epoch_clean = osdmap->get_epoch();
   auto m = make_message<MOSDBeacon>(osdmap->get_epoch(),
                                     min_last_epoch_clean,
-                                   superblock.last_purged_snaps_scrub);
+                                    superblock.last_purged_snaps_scrub,
+                                    local_conf()->osd_beacon_report_interval);
   return monc->send_message(m);
 }
 
-seastar::future<> OSD::update_heartbeat_peers()
+void OSD::update_heartbeat_peers()
 {
   if (!state.is_active()) {
-    return seastar::now();
+    return;
   }
   for (auto& pg : pg_map.get_pgs()) {
     vector<int> up, acting;
@@ -1058,22 +1264,23 @@ seastar::future<> OSD::update_heartbeat_peers()
       }
     }
   }
-  return heartbeat->update_peers(whoami);
+  heartbeat->update_peers(whoami);
 }
 
 seastar::future<> OSD::handle_peering_op(
-  crimson::net::Connection* conn,
+  crimson::net::ConnectionRef conn,
   Ref<MOSDPeeringOp> m)
 {
   const int from = m->get_source().num();
   logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
-  shard_services.start_operation<RemotePeeringEvent>(
+  std::unique_ptr<PGPeeringEvent> evt(m->get_event());
+  (void) shard_services.start_operation<RemotePeeringEvent>(
     *this,
-    conn->get_shared(),
+    conn,
     shard_services,
     pg_shard_t{from, m->get_spg().shard},
     m->get_spg(),
-    std::move(*m->get_event()));
+    std::move(*evt));
   return seastar::now();
 }
 
@@ -1103,18 +1310,55 @@ OSD::get_or_create_pg(
   epoch_t epoch,
   std::unique_ptr<PGCreateInfo> info)
 {
-  auto [fut, creating] = pg_map.get_pg(pgid, bool(info));
-  if (!creating && info) {
-    pg_map.set_creating(pgid);
-    (void)handle_pg_create_info(std::move(info));
+  if (info) {
+    auto [fut, creating] = pg_map.wait_for_pg(pgid);
+    if (!creating) {
+      pg_map.set_creating(pgid);
+      (void)handle_pg_create_info(std::move(info));
+    }
+    return std::move(fut);
+  } else {
+    return make_ready_blocking_future<Ref<PG>>(pg_map.get_pg(pgid));
   }
-  return std::move(fut);
 }
 
 blocking_future<Ref<PG>> OSD::wait_for_pg(
   spg_t pgid)
 {
-  return pg_map.get_pg(pgid).first;
+  return pg_map.wait_for_pg(pgid).first;
+}
+
+Ref<PG> OSD::get_pg(spg_t pgid)
+{
+  return pg_map.get_pg(pgid);
+}
+
+seastar::future<> OSD::prepare_to_stop()
+{
+  if (osdmap && osdmap->is_up(whoami)) {
+    state.set_prestop();
+    const auto timeout =
+      std::chrono::duration_cast<std::chrono::milliseconds>(
+       std::chrono::duration<double>(
+         local_conf().get_val<double>("osd_mon_shutdown_timeout")));
+
+    return seastar::with_timeout(
+      seastar::timer<>::clock::now() + timeout,
+      monc->send_message(
+         make_message<MOSDMarkMeDown>(
+           monc->get_fsid(),
+           whoami,
+           osdmap->get_addrs(whoami),
+           osdmap->get_epoch(),
+           true)).then([this] {
+       return stop_acked.get_future();
+      })
+    ).handle_exception_type(
+      [](seastar::timed_out_error&) {
+      return seastar::now();
+    });
+  }
+  return seastar::now();
 }
 
 }