update ceph source to reef 18.1.2

diff --git a/ceph/src/crimson/osd/shard_services.cc b/ceph/src/crimson/osd/shard_services.cc
index 17192e65d76a85e618d3ce95231906d464ce4714..647d8d6bee4a52b8ac60ca4931e6b6d57e07304b 100644
--- a/ceph/src/crimson/osd/shard_services.cc
+++ b/ceph/src/crimson/osd/shard_services.cc
@@ -1,9 +1,12 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
+#include <boost/smart_ptr/make_local_shared.hpp>
+
 #include "crimson/osd/shard_services.h"
 
 #include "messages/MOSDAlive.h"
+#include "messages/MOSDMap.h"
 #include "messages/MOSDPGCreated.h"
 #include "messages/MOSDPGTemp.h"
 
@@ -16,6 +19,9 @@
 #include "crimson/net/Connection.h"
 #include "crimson/os/cyanstore/cyan_store.h"
 #include "crimson/osd/osdmap_service.h"
+#include "crimson/osd/osd_operations/pg_advance_map.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/pg_meta.h"
 
 namespace {
   seastar::logger& logger() {
@@ -27,67 +33,125 @@ using std::vector;
 
 namespace crimson::osd {
 
-ShardServices::ShardServices(
-  OSDMapService &osdmap_service,
-  const int whoami,
-  crimson::net::Messenger &cluster_msgr,
-  crimson::net::Messenger &public_msgr,
-  crimson::mon::Client &monc,
-  crimson::mgr::Client &mgrc,
+PerShardState::PerShardState(
+  int whoami,
+  ceph::mono_time startup_time,
+  PerfCounters *perf,
+  PerfCounters *recoverystate_perf,
   crimson::os::FuturizedStore &store)
-    : osdmap_service(osdmap_service),
-      whoami(whoami),
-      cluster_msgr(cluster_msgr),
-      public_msgr(public_msgr),
-      monc(monc),
-      mgrc(mgrc),
-      store(store),
-      throttler(crimson::common::local_conf()),
-      obc_registry(crimson::common::local_conf()),
-      local_reserver(
-       &cct,
-       &finisher,
-       crimson::common::local_conf()->osd_max_backfills,
-       crimson::common::local_conf()->osd_min_recovery_priority),
-      remote_reserver(
-       &cct,
-       &finisher,
-       crimson::common::local_conf()->osd_max_backfills,
-       crimson::common::local_conf()->osd_min_recovery_priority)
+  : whoami(whoami),
+    store(store.get_sharded_store()),
+    perf(perf), recoverystate_perf(recoverystate_perf),
+    throttler(crimson::common::local_conf()),
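+    // seed this shard's tid sequence with the reactor shard id in the
+    // top 8 bits, so tids minted on different shards can never collide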
+    next_tid(
+      static_cast<ceph_tid_t>(seastar::this_shard_id()) <<
+      (std::numeric_limits<ceph_tid_t>::digits - 8)),
+    startup_time(startup_time)
+{}
+
+seastar::future<> PerShardState::dump_ops_in_flight(Formatter *f) const
 {
-  perf = build_osd_logger(&cct);
-  cct.get_perfcounters_collection()->add(perf);
+  registry.for_each_op([f](const auto &op) {
+    op.dump(f);
+  });
+  return seastar::now();
+}
 
-  recoverystate_perf = build_recoverystate_perf(&cct);
-  cct.get_perfcounters_collection()->add(recoverystate_perf);
+seastar::future<> PerShardState::stop_pgs()
+{
+  assert_core();
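+  // shut down every PG owned by this shard in parallel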
+  return seastar::parallel_for_each(
+    pg_map.get_pgs(),
+    [](auto& p) {
+      return p.second->stop();
+    });
+}
 
-  crimson::common::local_conf().add_observer(this);
+std::map<pg_t, pg_stat_t> PerShardState::get_pg_stats() const
+{
+  assert_core();
+  std::map<pg_t, pg_stat_t> ret;
+  for (auto [pgid, pg] : pg_map.get_pgs()) {
+    if (pg->is_primary()) {
+      auto stats = pg->get_stats();
+      // TODO: update reported_epoch, reported_seq, last_fresh
+      stats.reported_epoch = osdmap->get_epoch();
+      ret.emplace(pgid.pgid, std::move(stats));
+    }
+  }
+  return ret;
 }
 
-const char** ShardServices::get_tracked_conf_keys() const
+seastar::future<> PerShardState::broadcast_map_to_pgs(
+  ShardServices &shard_services,
+  epoch_t epoch)
 {
-  static const char* KEYS[] = {
-    "osd_max_backfills",
-    "osd_min_recovery_priority",
-    nullptr
-  };
-  return KEYS;
+  assert_core();
+  auto &pgs = pg_map.get_pgs();
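+  // queue a PGAdvanceMap operation for every PG on this shard in
+  // parallel; each PG catches up to `epoch` independently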
+  return seastar::parallel_for_each(
+    pgs.begin(), pgs.end(),
+    [=, &shard_services](auto& pg) {
+      return shard_services.start_operation<PGAdvanceMap>(
+       shard_services,
+       pg.second, epoch,
+       PeeringCtx{}, false).second;
+    });
 }
 
-void ShardServices::handle_conf_change(const ConfigProxy& conf,
-                                      const std::set <std::string> &changed)
+Ref<PG> PerShardState::get_pg(spg_t pgid)
 {
-  if (changed.count("osd_max_backfills")) {
-    local_reserver.set_max(conf->osd_max_backfills);
-    remote_reserver.set_max(conf->osd_max_backfills);
-  }
-  if (changed.count("osd_min_recovery_priority")) {
-    local_reserver.set_min_priority(conf->osd_min_recovery_priority);
-    remote_reserver.set_min_priority(conf->osd_min_recovery_priority);
+  assert_core();
+  return pg_map.get_pg(pgid);
+}
+
+HeartbeatStampsRef PerShardState::get_hb_stamps(int peer)
+{
+  assert_core();
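+  // lazily allocate the per-peer heartbeat timestamp tracker on first use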
+  auto [stamps, added] = heartbeat_stamps.try_emplace(peer);
+  if (added) {
+    stamps->second = ceph::make_ref<HeartbeatStamps>(peer);
   }
+  return stamps->second;
+}
+
+OSDSingletonState::OSDSingletonState(
+  int whoami,
+  crimson::net::Messenger &cluster_msgr,
+  crimson::net::Messenger &public_msgr,
+  crimson::mon::Client &monc,
+  crimson::mgr::Client &mgrc)
+  : whoami(whoami),
+    osdmap_gate("OSDSingletonState::osdmap_gate"),
+    cluster_msgr(cluster_msgr),
+    public_msgr(public_msgr),
+    monc(monc),
+    mgrc(mgrc),
+    local_reserver(
+      &cct,
+      &finisher,
+      crimson::common::local_conf()->osd_max_backfills,
+      crimson::common::local_conf()->osd_min_recovery_priority),
+    remote_reserver(
+      &cct,
+      &finisher,
+      crimson::common::local_conf()->osd_max_backfills,
+      crimson::common::local_conf()->osd_min_recovery_priority),
+    snap_reserver(
+      &cct,
+      &finisher,
+      crimson::common::local_conf()->osd_max_trimming_pgs)
+{
+  crimson::common::local_conf().add_observer(this);
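+  // seed the map cache with an empty epoch-0 OSDMap so epoch-0 lookups
+  // never fall through to disk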
+  osdmaps[0] = boost::make_local_shared<OSDMap>();
+
+  perf = build_osd_logger(&cct);
+  cct.get_perfcounters_collection()->add(perf);
+
+  recoverystate_perf = build_recoverystate_perf(&cct);
+  cct.get_perfcounters_collection()->add(recoverystate_perf);
 }
 
-seastar::future<> ShardServices::send_to_osd(
+seastar::future<> OSDSingletonState::send_to_osd(
   int peer, MessageURef m, epoch_t from_epoch)
 {
   if (osdmap->is_down(peer)) {
@@ -104,49 +168,22 @@ seastar::future<> ShardServices::send_to_osd(
   }
 }
 
-seastar::future<> ShardServices::dispatch_context_transaction(
-  crimson::os::CollectionRef col, PeeringCtx &ctx) {
-  logger().debug("ShardServices::dispatch_context_transaction: do_transaction...");
-  auto ret = store.do_transaction(
-    col,
-    std::move(ctx.transaction));
-  ctx.reset_transaction();
-  return ret;
-}
-
-seastar::future<> ShardServices::dispatch_context_messages(
-  BufferedRecoveryMessages &&ctx)
-{
-  auto ret = seastar::parallel_for_each(std::move(ctx.message_map),
-    [this](auto& osd_messages) {
-      auto& [peer, messages] = osd_messages;
-      logger().debug("dispatch_context_messages sending messages to {}", peer);
-      return seastar::parallel_for_each(
-        std::move(messages), [=, peer=peer](auto& m) {
-        return send_to_osd(peer, std::move(m), osdmap->get_epoch());
-      });
-    });
-  ctx.message_map.clear();
-  return ret;
-}
-
-seastar::future<> ShardServices::dispatch_context(
-  crimson::os::CollectionRef col,
-  PeeringCtx &&ctx)
+seastar::future<> OSDSingletonState::osdmap_subscribe(
+  version_t epoch, bool force_request)
 {
-  ceph_assert(col || ctx.transaction.empty());
-  return seastar::when_all_succeed(
-    dispatch_context_messages(
-      BufferedRecoveryMessages{ctx}),
-    col ? dispatch_context_transaction(col, ctx) : seastar::now()
-  ).then_unpack([] {
+  logger().info("{}({})", __func__, epoch);
+  if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
+      force_request) {
+    return monc.renew_subs();
+  } else {
     return seastar::now();
-  });
+  }
 }
 
-void ShardServices::queue_want_pg_temp(pg_t pgid,
-                                   const vector<int>& want,
-                                   bool forced)
+void OSDSingletonState::queue_want_pg_temp(
+  pg_t pgid,
+  const vector<int>& want,
+  bool forced)
 {
   auto p = pg_temp_pending.find(pgid);
   if (p == pg_temp_pending.end() ||
@@ -156,13 +193,13 @@ void ShardServices::queue_want_pg_temp(pg_t pgid,
   }
 }
 
-void ShardServices::remove_want_pg_temp(pg_t pgid)
+void OSDSingletonState::remove_want_pg_temp(pg_t pgid)
 {
   pg_temp_wanted.erase(pgid);
   pg_temp_pending.erase(pgid);
 }
 
-void ShardServices::requeue_pg_temp()
+void OSDSingletonState::requeue_pg_temp()
 {
   unsigned old_wanted = pg_temp_wanted.size();
   unsigned old_pending = pg_temp_pending.size();
@@ -176,18 +213,7 @@ void ShardServices::requeue_pg_temp()
     pg_temp_wanted.size());
 }
 
-std::ostream& operator<<(
-  std::ostream& out,
-  const ShardServices::pg_temp_t& pg_temp)
-{
-  out << pg_temp.acting;
-  if (pg_temp.forced) {
-    out << " (forced)";
-  }
-  return out;
-}
-
-seastar::future<> ShardServices::send_pg_temp()
+seastar::future<> OSDSingletonState::send_pg_temp()
 {
   if (pg_temp_wanted.empty())
     return seastar::now();
@@ -213,17 +239,18 @@ seastar::future<> ShardServices::send_pg_temp()
     });
 }
 
-void ShardServices::update_map(cached_map_t new_osdmap)
-{
-  osdmap = std::move(new_osdmap);
-}
-
-ShardServices::cached_map_t &ShardServices::get_osdmap()
+std::ostream& operator<<(
+  std::ostream& out,
+  const OSDSingletonState::pg_temp_t& pg_temp)
 {
-  return osdmap;
+  out << pg_temp.acting;
+  if (pg_temp.forced) {
+    out << " (forced)";
+  }
+  return out;
 }
 
-seastar::future<> ShardServices::send_pg_created(pg_t pgid)
+seastar::future<> OSDSingletonState::send_pg_created(pg_t pgid)
 {
   logger().debug(__func__);
   auto o = get_osdmap();
@@ -232,7 +259,7 @@ seastar::future<> ShardServices::send_pg_created(pg_t pgid)
   return monc.send_message(crimson::make_message<MOSDPGCreated>(pgid));
 }
 
-seastar::future<> ShardServices::send_pg_created()
+seastar::future<> OSDSingletonState::send_pg_created()
 {
   logger().debug(__func__);
   auto o = get_osdmap();
@@ -243,7 +270,7 @@ seastar::future<> ShardServices::send_pg_created()
     });
 }
 
-void ShardServices::prune_pg_created()
+void OSDSingletonState::prune_pg_created()
 {
   logger().debug(__func__);
   auto o = get_osdmap();
@@ -260,27 +287,7 @@ void ShardServices::prune_pg_created()
   }
 }
 
-seastar::future<> ShardServices::osdmap_subscribe(version_t epoch, bool force_request)
-{
-  logger().info("{}({})", __func__, epoch);
-  if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
-      force_request) {
-    return monc.renew_subs();
-  } else {
-    return seastar::now();
-  }
-}
-
-HeartbeatStampsRef ShardServices::get_hb_stamps(int peer)
-{
-  auto [stamps, added] = heartbeat_stamps.try_emplace(peer);
-  if (added) {
-    stamps->second = ceph::make_ref<HeartbeatStamps>(peer);
-  }
-  return stamps->second;
-}
-
-seastar::future<> ShardServices::send_alive(const epoch_t want)
+seastar::future<> OSDSingletonState::send_alive(const epoch_t want)
 {
   logger().info(
     "{} want={} up_thru_wanted={}",
@@ -309,4 +316,434 @@ seastar::future<> ShardServices::send_alive(const epoch_t want)
   }
 }
 
+const char** OSDSingletonState::get_tracked_conf_keys() const
+{
+  static const char* KEYS[] = {
+    "osd_max_backfills",
+    "osd_min_recovery_priority",
+    "osd_max_trimming_pgs",
+    nullptr
+  };
+  return KEYS;
+}
+
+void OSDSingletonState::handle_conf_change(
+  const ConfigProxy& conf,
+  const std::set <std::string> &changed)
+{
+  if (changed.count("osd_max_backfills")) {
+    local_reserver.set_max(conf->osd_max_backfills);
+    remote_reserver.set_max(conf->osd_max_backfills);
+  }
+  if (changed.count("osd_min_recovery_priority")) {
+    local_reserver.set_min_priority(conf->osd_min_recovery_priority);
+    remote_reserver.set_min_priority(conf->osd_min_recovery_priority);
+  }
+  if (changed.count("osd_max_trimming_pgs")) {
+    snap_reserver.set_max(conf->osd_max_trimming_pgs);
+  }
+}
+
+seastar::future<OSDSingletonState::local_cached_map_t>
+OSDSingletonState::get_local_map(epoch_t e)
+{
+  // TODO: use LRU cache for managing osdmap, fallback to disk if we have to
+  if (auto found = osdmaps.find(e); found) {
+    return seastar::make_ready_future<local_cached_map_t>(std::move(found));
+  } else {
+    return load_map(e).then([e, this](std::unique_ptr<OSDMap> osdmap) {
+      return seastar::make_ready_future<local_cached_map_t>(
+       osdmaps.insert(e, std::move(osdmap)));
+    });
+  }
+}
+
+void OSDSingletonState::store_map_bl(
+  ceph::os::Transaction& t,
+  epoch_t e, bufferlist&& bl)
+{
+  meta_coll->store_map(t, e, bl);
+  map_bl_cache.insert(e, std::move(bl));
+}
+
+seastar::future<bufferlist> OSDSingletonState::load_map_bl(
+  epoch_t e)
+{
+  if (std::optional<bufferlist> found = map_bl_cache.find(e); found) {
+    return seastar::make_ready_future<bufferlist>(*found);
+  } else {
+    return meta_coll->load_map(e);
+  }
+}
+
+seastar::future<std::map<epoch_t, bufferlist>> OSDSingletonState::load_map_bls(
+  epoch_t first,
+  epoch_t last)
+{
+  logger().debug("{} loading maps [{},{}]",
+                 __func__, first, last);
+  ceph_assert(first <= last);
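+  // load each epoch's map bufferlist in parallel and fold the results
+  // into a single epoch -> bufferlist map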
+  return seastar::map_reduce(boost::make_counting_iterator<epoch_t>(first),
+                            boost::make_counting_iterator<epoch_t>(last + 1),
+                            [this](epoch_t e) {
+    return load_map_bl(e).then([e](auto&& bl) {
+      return seastar::make_ready_future<std::pair<epoch_t, bufferlist>>(
+       std::make_pair(e, std::move(bl)));
+    });
+  },
+  std::map<epoch_t, bufferlist>{},
+  [](auto&& bls, auto&& epoch_bl) {
+    bls.emplace(std::move(epoch_bl));
+    return std::move(bls);
+  });
+}
+
+seastar::future<std::unique_ptr<OSDMap>> OSDSingletonState::load_map(epoch_t e)
+{
+  auto o = std::make_unique<OSDMap>();
+  if (e > 0) {
+    return load_map_bl(e).then([o=std::move(o)](bufferlist bl) mutable {
+      o->decode(bl);
+      return seastar::make_ready_future<std::unique_ptr<OSDMap>>(std::move(o));
+    });
+  } else {
+    return seastar::make_ready_future<std::unique_ptr<OSDMap>>(std::move(o));
+  }
+}
+
+seastar::future<> OSDSingletonState::store_maps(ceph::os::Transaction& t,
+                                  epoch_t start, Ref<MOSDMap> m)
+{
+  return seastar::do_for_each(
+    boost::make_counting_iterator(start),
+    boost::make_counting_iterator(m->get_last() + 1),
+    [&t, m, this](epoch_t e) {
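+      // prefer a full map if the message carries one for this epoch;
+      // otherwise rebuild it by applying the incremental to the
+      // previous epoch's full map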
+      if (auto p = m->maps.find(e); p != m->maps.end()) {
+       auto o = std::make_unique<OSDMap>();
+       o->decode(p->second);
+       logger().info("store_maps osdmap.{}", e);
+       store_map_bl(t, e, std::move(p->second));
+       osdmaps.insert(e, std::move(o));
+       return seastar::now();
+      } else if (auto p = m->incremental_maps.find(e);
+                p != m->incremental_maps.end()) {
+       return load_map(e - 1).then([e, bl=p->second, &t, this](auto o) {
+         OSDMap::Incremental inc;
+         auto i = bl.cbegin();
+         inc.decode(i);
+         o->apply_incremental(inc);
+         bufferlist fbl;
+         o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
+         store_map_bl(t, e, std::move(fbl));
+         osdmaps.insert(e, std::move(o));
+         return seastar::now();
+       });
+      } else {
+       logger().error("MOSDMap lied about what maps it had?");
+       return seastar::now();
+      }
+    });
+}
+
+seastar::future<Ref<PG>> ShardServices::make_pg(
+  OSDMapService::cached_map_t create_map,
+  spg_t pgid,
+  bool do_create)
+{
+  using ec_profile_t = std::map<std::string, std::string>;
+  auto get_pool_info_for_pg = [create_map, pgid, this] {
+    if (create_map->have_pg_pool(pgid.pool())) {
+      pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
+      std::string name = create_map->get_pool_name(pgid.pool());
+      ec_profile_t ec_profile;
+      if (pi.is_erasure()) {
+       ec_profile = create_map->get_erasure_code_profile(
+         pi.erasure_code_profile);
+      }
+      return seastar::make_ready_future<
+       std::tuple<pg_pool_t,std::string, ec_profile_t>
+       >(std::make_tuple(
+           std::move(pi),
+           std::move(name),
+           std::move(ec_profile)));
+    } else {
+      // pool was deleted; grab final pg_pool_t off disk.
+      return get_pool_info(pgid.pool());
+    }
+  };
+  auto get_collection = [pgid, do_create, this] {
+    const coll_t cid{pgid};
+    if (do_create) {
+      return get_store().create_new_collection(cid);
+    } else {
+      return get_store().open_collection(cid);
+    }
+  };
+  return seastar::when_all(
+    std::move(get_pool_info_for_pg),
+    std::move(get_collection)
+  ).then([pgid, create_map, this](auto &&ret) {
+    auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get0());
+    auto coll = std::move(std::get<1>(ret).get0());
+    return seastar::make_ready_future<Ref<PG>>(
+      new PG{
+       pgid,
+       pg_shard_t{local_state.whoami, pgid.shard},
+       std::move(coll),
+       std::move(pool),
+       std::move(name),
+       create_map,
+       *this,
+       ec_profile});
+  });
+}
+
+seastar::future<Ref<PG>> ShardServices::handle_pg_create_info(
+  std::unique_ptr<PGCreateInfo> info) {
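+  // fetch the OSDMap at the creation epoch, validate mon-initiated
+  // creates against the pool's current state, then construct the PG
+  // on disk and advance it to the OSD's current map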
+  return seastar::do_with(
+    std::move(info),
+    [this](auto &info)
+    -> seastar::future<Ref<PG>> {
+      return get_map(info->epoch).then(
+       [&info, this](cached_map_t startmap)
+       -> seastar::future<std::tuple<Ref<PG>, cached_map_t>> {
+         const spg_t &pgid = info->pgid;
+         if (info->by_mon) {
+           int64_t pool_id = pgid.pgid.pool();
+           const pg_pool_t *pool = get_map()->get_pg_pool(pool_id);
+           if (!pool) {
+             logger().debug(
+               "{} ignoring pgid {}, pool dne",
+               __func__,
+               pgid);
+             local_state.pg_map.pg_creation_canceled(pgid);
+             return seastar::make_ready_future<
+               std::tuple<Ref<PG>, OSDMapService::cached_map_t>
+               >(std::make_tuple(Ref<PG>(), startmap));
+           } else if (!pool->is_crimson()) {
+             logger().debug(
+               "{} ignoring pgid {}, pool lacks crimson flag",
+               __func__,
+               pgid);
+             local_state.pg_map.pg_creation_canceled(pgid);
+             return seastar::make_ready_future<
+               std::tuple<Ref<PG>, OSDMapService::cached_map_t>
+               >(std::make_tuple(Ref<PG>(), startmap));
+           }
+           ceph_assert(get_map()->require_osd_release >=
+                       ceph_release_t::octopus);
+           if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
+             // this ensures we do not process old creating messages after the
+             // pool's initial pgs have been created (and pg are subsequently
+             // allowed to split or merge).
+             logger().debug(
+               "{} dropping {} create, pool does not have CREATING flag set",
+               __func__,
+               pgid);
+             local_state.pg_map.pg_creation_canceled(pgid);
+             return seastar::make_ready_future<
+               std::tuple<Ref<PG>, OSDMapService::cached_map_t>
+               >(std::make_tuple(Ref<PG>(), startmap));
+           }
+         }
+         return make_pg(
+           startmap, pgid, true
+         ).then([startmap=std::move(startmap)](auto pg) mutable {
+           return seastar::make_ready_future<
+             std::tuple<Ref<PG>, OSDMapService::cached_map_t>
+             >(std::make_tuple(std::move(pg), std::move(startmap)));
+         });
+       }).then([this, &info](auto &&ret)
+               ->seastar::future<Ref<PG>> {
+         auto [pg, startmap] = std::move(ret);
+         if (!pg)
+           return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
+         const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());
+
+         int up_primary, acting_primary;
+         vector<int> up, acting;
+         startmap->pg_to_up_acting_osds(
+           info->pgid.pgid, &up, &up_primary, &acting, &acting_primary);
+
+         int role = startmap->calc_pg_role(
+           pg_shard_t(local_state.whoami, info->pgid.shard),
+           acting);
+
+         PeeringCtx rctx;
+         create_pg_collection(
+           rctx.transaction,
+           info->pgid,
+           info->pgid.get_split_bits(pp->get_pg_num()));
+         init_pg_ondisk(
+           rctx.transaction,
+           info->pgid,
+           pp);
+
+         pg->init(
+           role,
+           up,
+           up_primary,
+           acting,
+           acting_primary,
+           info->history,
+           info->past_intervals,
+           rctx.transaction);
+
+         return start_operation<PGAdvanceMap>(
+           *this, pg, get_map()->get_epoch(), std::move(rctx), true
+         ).second.then([pg=pg] {
+           return seastar::make_ready_future<Ref<PG>>(pg);
+         });
+       });
+    });
+}
+
+
+ShardServices::get_or_create_pg_ret
+ShardServices::get_or_create_pg(
+  PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
+  spg_t pgid,
+  epoch_t epoch,
+  std::unique_ptr<PGCreateInfo> info)
+{
+  if (info) {
+    auto [fut, creating] = local_state.pg_map.wait_for_pg(
+      std::move(trigger), pgid);
+    if (!creating) {
+      local_state.pg_map.set_creating(pgid);
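+      // fire-and-forget: creation proceeds in the background and
+      // waiters are woken through pg_map once the PG is installed
+      // (or its creation is canceled)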
+      (void)handle_pg_create_info(
+       std::move(info));
+    }
+    return std::move(fut);
+  } else {
+    return get_or_create_pg_ret(
+      get_or_create_pg_ertr::ready_future_marker{},
+      local_state.pg_map.get_pg(pgid));
+  }
+}
+
+ShardServices::wait_for_pg_ret
+ShardServices::wait_for_pg(
+  PGMap::PGCreationBlockingEvent::TriggerI&& trigger, spg_t pgid)
+{
+  return local_state.pg_map.wait_for_pg(std::move(trigger), pgid).first;
+}
+
+seastar::future<Ref<PG>> ShardServices::load_pg(spg_t pgid)
+{
+  logger().debug("{}: {}", __func__, pgid);
+
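+  // read the PG's last persisted epoch from its meta object, fetch the
+  // matching OSDMap, construct the PG, then replay its on-disk state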
+  return seastar::do_with(PGMeta(get_store(), pgid), [](auto& pg_meta) {
+    return pg_meta.get_epoch();
+  }).then([this](epoch_t e) {
+    return get_map(e);
+  }).then([pgid, this](auto&& create_map) {
+    return make_pg(std::move(create_map), pgid, false);
+  }).then([this](Ref<PG> pg) {
+    return pg->read_state(&get_store()).then([pg] {
+       return seastar::make_ready_future<Ref<PG>>(std::move(pg));
+    });
+  }).handle_exception([pgid](auto ep) {
+    logger().info("pg {} saw exception on load {}", pgid, ep);
+    ceph_abort_msg("Could not load pg");
+    return seastar::make_exception_future<Ref<PG>>(ep);
+  });
+}
+
+seastar::future<> ShardServices::dispatch_context_transaction(
+  crimson::os::CollectionRef col, PeeringCtx &ctx) {
+  if (ctx.transaction.empty()) {
+    logger().debug("ShardServices::dispatch_context_transaction: empty transaction");
+    return seastar::now();
+  }
+
+  logger().debug("ShardServices::dispatch_context_transaction: do_transaction ...");
+  auto ret = get_store().do_transaction(
+    col,
+    std::move(ctx.transaction));
+  ctx.reset_transaction();
+  return ret;
+}
+
+seastar::future<> ShardServices::dispatch_context_messages(
+  BufferedRecoveryMessages &&ctx)
+{
+  auto ret = seastar::parallel_for_each(std::move(ctx.message_map),
+    [this](auto& osd_messages) {
+      auto& [peer, messages] = osd_messages;
+      logger().debug("dispatch_context_messages sending messages to {}", peer);
+      return seastar::parallel_for_each(
+        std::move(messages), [=, peer=peer, this](auto& m) {
+        return send_to_osd(peer, std::move(m), local_state.osdmap->get_epoch());
+      });
+    });
+  ctx.message_map.clear();
+  return ret;
+}
+
+seastar::future<> ShardServices::dispatch_context(
+  crimson::os::CollectionRef col,
+  PeeringCtx &&ctx)
+{
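+  // dispatch the buffered peering messages and the accumulated
+  // transaction concurrently; a non-empty transaction requires a
+  // target collection, hence the assert below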
+  ceph_assert(col || ctx.transaction.empty());
+  return seastar::when_all_succeed(
+    dispatch_context_messages(
+      BufferedRecoveryMessages{ctx}),
+    col ? dispatch_context_transaction(col, ctx) : seastar::now()
+  ).then_unpack([] {
+    return seastar::now();
+  });
+}
+
+seastar::future<> OSDSingletonState::send_incremental_map(
+  crimson::net::Connection &conn,
+  epoch_t first)
+{
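+  // if the requested epoch is still stored, send the whole range
+  // [first, newest]; otherwise fall back to sending only the current
+  // full map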
+  if (first >= superblock.oldest_map) {
+    return load_map_bls(
+      first, superblock.newest_map
+    ).then([this, &conn, first](auto&& bls) {
+      auto m = crimson::make_message<MOSDMap>(
+       monc.get_fsid(),
+       osdmap->get_encoding_features());
+      m->cluster_osdmap_trim_lower_bound = first;
+      m->newest_map = superblock.newest_map;
+      m->maps = std::move(bls);
+      return conn.send(std::move(m));
+    });
+  } else {
+    return load_map_bl(osdmap->get_epoch()
+    ).then([this, &conn](auto&& bl) mutable {
+      auto m = crimson::make_message<MOSDMap>(
+       monc.get_fsid(),
+       osdmap->get_encoding_features());
+      /* TODO: once we support the tracking of superblock's
+       *       cluster_osdmap_trim_lower_bound, the MOSDMap should
+       *       be populated with this value instead of the oldest_map.
+       *       See: OSD::handle_osd_map for how classic updates the
+       *       cluster's trim lower bound.
+       */
+      m->cluster_osdmap_trim_lower_bound = superblock.oldest_map;
+      m->newest_map = superblock.newest_map;
+      m->maps.emplace(osdmap->get_epoch(), std::move(bl));
+      return conn.send(std::move(m));
+    });
+  }
+}
+
+seastar::future<> OSDSingletonState::send_incremental_map_to_osd(
+  int osd,
+  epoch_t first)
+{
+  if (osdmap->is_down(osd)) {
+    logger().info("{}: osd.{} is_down", __func__, osd);
+    return seastar::now();
+  } else {
+    auto conn = cluster_msgr.connect(
+      osdmap->get_cluster_addrs(osd).front(), CEPH_ENTITY_TYPE_OSD);
+    return send_incremental_map(*conn, first);
+  }
+}
+
 };