update ceph source to reef 18.2.1

diff --git a/ceph/src/crimson/osd/pg_shard_manager.h b/ceph/src/crimson/osd/pg_shard_manager.h
index fd99304ba848674d5d2b3e34bc61882be6250433..2f3a3015d1cd67117e6888081c805873c4cf1243 100644
--- a/ceph/src/crimson/osd/pg_shard_manager.h
+++ b/ceph/src/crimson/osd/pg_shard_manager.h
@@ -24,8 +24,9 @@ namespace crimson::osd {
  * etc)
  */
 class PGShardManager {
-  seastar::sharded<OSDSingletonState> osd_singleton_state;
-  seastar::sharded<ShardServices> shard_services;
+  seastar::sharded<OSDSingletonState> &osd_singleton_state;
+  seastar::sharded<ShardServices> &shard_services;
+  seastar::sharded<PGShardMapping> &pg_to_shard_mapping;
 
 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET)          \
   template <typename... Args>                                  \
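
For context, FORWARD_CONST (and its non-const sibling FORWARD, used heavily below) are perfect-forwarding shims; the hunk truncates the body, but the pattern is roughly:

```cpp
// Rough shape of the forwarding macros (body truncated by the hunk above);
// each generated method forwards its arguments to TO_METHOD on TARGET.
#define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET)          \
  template <typename... Args>                                  \
  auto FROM_METHOD(Args&&... args) const {                     \
    return TARGET.TO_METHOD(std::forward<Args>(args)...);      \
  }
```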
@@ -46,16 +47,13 @@ public:
   using cached_map_t = OSDMapService::cached_map_t;
   using local_cached_map_t = OSDMapService::local_cached_map_t;
 
-  PGShardManager() = default;
-
-  seastar::future<> start(
-    const int whoami,
-    crimson::net::Messenger &cluster_msgr,
-    crimson::net::Messenger &public_msgr,
-    crimson::mon::Client &monc,
-    crimson::mgr::Client &mgrc,
-    crimson::os::FuturizedStore &store);
-  seastar::future<> stop();
+  PGShardManager(
+    seastar::sharded<OSDSingletonState> &osd_singleton_state,
+    seastar::sharded<ShardServices> &shard_services,
+    seastar::sharded<PGShardMapping> &pg_to_shard_mapping)
+  : osd_singleton_state(osd_singleton_state),
+    shard_services(shard_services),
+    pg_to_shard_mapping(pg_to_shard_mapping) {}
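
The sharded containers are now owned by the caller and injected by reference, so the old start()/stop() lifecycle methods disappear from this class. A minimal sketch of the inverted ownership, assuming the enclosing owner (the OSD, in reef) drives start/stop on each seastar::sharded container; OwnerSketch is hypothetical:

```cpp
// Hypothetical owner illustrating the inverted ownership: the sharded
// containers outlive the PGShardManager, which only holds references.
struct OwnerSketch {
  seastar::sharded<OSDSingletonState> osd_singleton_state;
  seastar::sharded<ShardServices> shard_services;
  seastar::sharded<PGShardMapping> pg_to_shard_mapping;
  PGShardManager pg_shard_manager{
    osd_singleton_state, shard_services, pg_to_shard_mapping};
  // the owner is now responsible for .start(...) / .stop() on each sharded<>
};
```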
 
   auto &get_osd_singleton_state() {
     ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
@@ -66,15 +64,15 @@ public:
     return osd_singleton_state.local();
   }
   auto &get_shard_services() {
-    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
     return shard_services.local();
   }
   auto &get_shard_services() const {
-    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
     return shard_services.local();
   }
   auto &get_local_state() { return get_shard_services().local_state; }
   auto &get_local_state() const { return get_shard_services().local_state; }
+  auto &get_pg_to_shard_mapping() { return pg_to_shard_mapping.local(); }
+  auto &get_pg_to_shard_mapping() const { return pg_to_shard_mapping.local(); }
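
Note the asymmetry that emerges here: the ShardServices and PGShardMapping accessors lose their PRIMARY_CORE asserts because .local() hands back the calling reactor's own replica, while the OSDSingletonState accessors above keep theirs. A hypothetical call site after this change:

```cpp
// Safe from any shard after this change: both calls resolve to the
// calling reactor's local instance (call site is hypothetical).
auto &services = pg_shard_manager.get_shard_services();
auto &mapping  = pg_shard_manager.get_pg_to_shard_mapping();
```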
 
   seastar::future<> update_map(local_cached_map_t &&map) {
     get_osd_singleton_state().update_map(
@@ -109,22 +107,22 @@ public:
   FORWARD_TO_OSD_SINGLETON(send_pg_created)
 
   // osd state forwards
-  FORWARD(is_active, is_active, get_osd_singleton_state().osd_state)
-  FORWARD(is_preboot, is_preboot, get_osd_singleton_state().osd_state)
-  FORWARD(is_booting, is_booting, get_osd_singleton_state().osd_state)
-  FORWARD(is_stopping, is_stopping, get_osd_singleton_state().osd_state)
-  FORWARD(is_prestop, is_prestop, get_osd_singleton_state().osd_state)
-  FORWARD(is_initializing, is_initializing, get_osd_singleton_state().osd_state)
-  FORWARD(set_prestop, set_prestop, get_osd_singleton_state().osd_state)
-  FORWARD(set_preboot, set_preboot, get_osd_singleton_state().osd_state)
-  FORWARD(set_booting, set_booting, get_osd_singleton_state().osd_state)
-  FORWARD(set_stopping, set_stopping, get_osd_singleton_state().osd_state)
-  FORWARD(set_active, set_active, get_osd_singleton_state().osd_state)
-  FORWARD(when_active, when_active, get_osd_singleton_state().osd_state)
-  FORWARD_CONST(get_osd_state_string, to_string, get_osd_singleton_state().osd_state)
-
-  FORWARD(got_map, got_map, get_osd_singleton_state().osdmap_gate)
-  FORWARD(wait_for_map, wait_for_map, get_osd_singleton_state().osdmap_gate)
+  FORWARD(is_active, is_active, get_shard_services().local_state.osd_state)
+  FORWARD(is_preboot, is_preboot, get_shard_services().local_state.osd_state)
+  FORWARD(is_booting, is_booting, get_shard_services().local_state.osd_state)
+  FORWARD(is_stopping, is_stopping, get_shard_services().local_state.osd_state)
+  FORWARD(is_prestop, is_prestop, get_shard_services().local_state.osd_state)
+  FORWARD(is_initializing, is_initializing, get_shard_services().local_state.osd_state)
+  FORWARD(set_prestop, set_prestop, get_shard_services().local_state.osd_state)
+  FORWARD(set_preboot, set_preboot, get_shard_services().local_state.osd_state)
+  FORWARD(set_booting, set_booting, get_shard_services().local_state.osd_state)
+  FORWARD(set_stopping, set_stopping, get_shard_services().local_state.osd_state)
+  FORWARD(set_active, set_active, get_shard_services().local_state.osd_state)
+  FORWARD(when_active, when_active, get_shard_services().local_state.osd_state)
+  FORWARD_CONST(get_osd_state_string, to_string, get_shard_services().local_state.osd_state)
+
+  FORWARD(got_map, got_map, get_shard_services().local_state.osdmap_gate)
+  FORWARD(wait_for_map, wait_for_map, get_shard_services().local_state.osdmap_gate)
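
Rehoming these forwards from the PRIMARY_CORE singleton to the per-shard local_state is what allows the PRIMARY_CORE asserts to be dropped throughout the rest of the file: osd_state and osdmap_gate are now consulted on whichever shard the caller runs on. As a sketch, the first FORWARD above expands to roughly (assuming FORWARD mirrors FORWARD_CONST without the const qualifier):

```cpp
// Approximate expansion of
//   FORWARD(is_active, is_active, get_shard_services().local_state.osd_state)
template <typename... Args>
auto is_active(Args&&... args) {
  return get_shard_services().local_state.osd_state.is_active(
      std::forward<Args>(args)...);
}
```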
 
   // Metacoll
   FORWARD_TO_OSD_SINGLETON(init_meta_coll)
@@ -142,7 +140,6 @@ public:
 
   template <typename F>
   auto with_remote_shard_state(core_id_t core, F &&f) {
-    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
     return shard_services.invoke_on(
       core, [f=std::move(f)](auto &target_shard_services) mutable {
        return std::invoke(
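
With the assert gone, any shard may initiate the cross-core hop. For reference, seastar::sharded<T>::invoke_on runs the functor on the reactor that owns the target shard and resolves the future back on the caller; a minimal sketch (Service and count are hypothetical):

```cpp
// invoke_on(core, fn) executes fn against the instance owned by `core`
// and marshals the result back to the calling shard as a future.
seastar::future<int> peek(seastar::sharded<Service> &svc, unsigned core) {
  return svc.invoke_on(core, [](Service &local) {
    return local.count;  // runs on `core`
  });
}
```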
@@ -156,7 +153,6 @@ public:
       core_id_t core,
       typename T::IRef &&op,
       F &&f) {
-    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
     if (seastar::this_shard_id() == core) {
       auto &target_shard_services = shard_services.local();
       return std::invoke(
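
The same relaxation applies here, and the method keeps its same-shard fast path: when the op already lives on the target core the continuation runs inline rather than paying for a cross-core submission. The underlying idiom, sketched with a hypothetical work item:

```cpp
// Standard seastar idiom mirrored above: stay inline when already on the
// target shard, otherwise submit cross-core (`work` is hypothetical).
seastar::future<> run_on(
    unsigned core,
    seastar::noncopyable_function<seastar::future<>()> work) {
  if (seastar::this_shard_id() == core) {
    return work();  // no hop needed
  }
  return seastar::smp::submit_to(core, std::move(work));
}
```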
@@ -188,20 +184,19 @@ public:
     typename T::IRef op
   ) {
     ceph_assert(op->use_count() == 1);
-    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
     auto &logger = crimson::get_logger(ceph_subsys_osd);
     static_assert(T::can_create());
     logger.debug("{}: can_create", *op);
 
-    auto core = get_osd_singleton_state().pg_to_shard_mapping.maybe_create_pg(
-      op->get_pgid());
-
     get_local_state().registry.remove_from_registry(*op);
-    return with_remote_shard_state_and_op<T>(
-      core, std::move(op),
-      [](PerShardState &per_shard_state,
-         ShardServices &shard_services,
-         typename T::IRef op) {
+    return get_pg_to_shard_mapping().maybe_create_pg(
+      op->get_pgid()
+    ).then([this, op = std::move(op)](auto core) mutable {
+      return this->template with_remote_shard_state_and_op<T>(
+        core, std::move(op),
+        [](PerShardState &per_shard_state,
+           ShardServices &shard_services,
+           typename T::IRef op) {
        per_shard_state.registry.add_to_registry(*op);
        auto &logger = crimson::get_logger(ceph_subsys_osd);
        auto &opref = *op;
@@ -211,7 +206,7 @@ public:
              auto &&trigger) {
            return shard_services.get_or_create_pg(
              std::move(trigger),
-             opref.get_pgid(), opref.get_epoch(),
+             opref.get_pgid(),
              std::move(opref.get_create_info())
            );
          }).safe_then([&logger, &shard_services, &opref](Ref<PG> pgref) {
@@ -224,6 +219,7 @@ public:
            })
          ).then([op=std::move(op)] {});
       });
+    });
   }
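
Two things changed in this method beyond the dropped assert. First, the core lookup is now asynchronous: maybe_create_pg goes through the sharded PGShardMapping and yields the target core as a future, so the remote dispatch chains behind .then (note also that get_or_create_pg no longer takes the op's epoch). Second, the call is spelled this->template, following the C++ dependent-name convention; a distilled, self-contained example of where that keyword is genuinely required (Dispatcher and Manager are hypothetical):

```cpp
// Why `this->template` can be needed: a member-template call made through
// a dependent expression requires the explicit `template` keyword,
// otherwise `<` parses as less-than.
struct Dispatcher {
  template <typename T> int dispatch(int core) { return core; }
};
template <typename Op>
struct Manager : Dispatcher {
  int run(int core) {
    return this->template dispatch<Op>(core);  // `template` required here
  }
};
```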
 
   /// Runs opref on the appropriate core, waiting for pg as necessary
@@ -232,20 +228,19 @@ public:
     typename T::IRef op
   ) {
     ceph_assert(op->use_count() == 1);
-    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
     auto &logger = crimson::get_logger(ceph_subsys_osd);
     static_assert(!T::can_create());
     logger.debug("{}: !can_create", *op);
 
-     auto core = get_osd_singleton_state().pg_to_shard_mapping.maybe_create_pg(
-      op->get_pgid());
-
     get_local_state().registry.remove_from_registry(*op);
-    return with_remote_shard_state_and_op<T>(
-      core, std::move(op),
-      [](PerShardState &per_shard_state,
-         ShardServices &shard_services,
-         typename T::IRef op) {
+    return get_pg_to_shard_mapping().maybe_create_pg(
+      op->get_pgid()
+    ).then([this, op = std::move(op)](auto core) mutable {
+      return this->template with_remote_shard_state_and_op<T>(
+        core, std::move(op),
+        [](PerShardState &per_shard_state,
+           ShardServices &shard_services,
+           typename T::IRef op) {
        per_shard_state.registry.add_to_registry(*op);
        auto &logger = crimson::get_logger(ceph_subsys_osd);
        auto &opref = *op;
@@ -265,6 +260,7 @@ public:
            })
          ).then([op=std::move(op)] {});
       });
+    });
   }
 
   seastar::future<> load_pgs(crimson::os::FuturizedStore& store);
@@ -313,20 +309,19 @@ public:
    */
   template <typename F>
   void for_each_pgid(F &&f) const {
-    return get_osd_singleton_state().pg_to_shard_mapping.for_each_pgid(
+    return get_pg_to_shard_mapping().for_each_pgid(
       std::forward<F>(f));
   }
 
   auto get_num_pgs() const {
-    return get_osd_singleton_state().pg_to_shard_mapping.get_num_pgs();
+    return get_pg_to_shard_mapping().get_num_pgs();
   }
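
These queries likewise read the calling shard's replica of the mapping rather than detouring through OSDSingletonState. A hypothetical call site:

```cpp
// Hypothetical usage: enumerate the mapped pgids via the local replica.
pg_shard_manager.for_each_pgid([](const spg_t &pgid) {
  fmt::print("pg {}\n", pgid);
});
```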
 
   seastar::future<> broadcast_map_to_pgs(epoch_t epoch);
 
   template <typename F>
   auto with_pg(spg_t pgid, F &&f) {
-    core_id_t core = get_osd_singleton_state(
-    ).pg_to_shard_mapping.get_pg_mapping(pgid);
+    core_id_t core = get_pg_to_shard_mapping().get_pg_mapping(pgid);
     return with_remote_shard_state(
       core,
       [pgid, f=std::move(f)](auto &local_state, auto &local_service) mutable {
@@ -338,7 +333,6 @@ public:
 
   template <typename T, typename... Args>
   auto start_pg_operation(Args&&... args) {
-    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
     auto op = get_local_state().registry.create_operation<T>(
       std::forward<Args>(args)...);
     auto &logger = crimson::get_logger(ceph_subsys_osd);
@@ -352,35 +346,30 @@ public:
     auto fut = opref.template enter_stage<>(
       opref.get_connection_pipeline().await_active
     ).then([this, &opref, &logger] {
-      ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
       logger.debug("{}: start_pg_operation in await_active stage", opref);
-      return get_osd_singleton_state().osd_state.when_active();
+      return get_shard_services().local_state.osd_state.when_active();
     }).then([&logger, &opref] {
-      ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
       logger.debug("{}: start_pg_operation active, entering await_map", opref);
       return opref.template enter_stage<>(
        opref.get_connection_pipeline().await_map);
     }).then([this, &logger, &opref] {
-      ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
       logger.debug("{}: start_pg_operation await_map stage", opref);
       using OSDMapBlockingEvent =
        OSD_OSDMapGate::OSDMapBlocker::BlockingEvent;
       return opref.template with_blocking_event<OSDMapBlockingEvent>(
        [this, &opref](auto &&trigger) {
          std::ignore = this;
-         return get_osd_singleton_state().osdmap_gate.wait_for_map(
-           std::move(trigger),
-           opref.get_epoch(),
-           &get_shard_services());
-       });
+         return get_shard_services().local_state.osdmap_gate.wait_for_map(
+             std::move(trigger),
+             opref.get_epoch(),
+             &get_shard_services());
+      });
     }).then([&logger, &opref](auto epoch) {
-      ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
       logger.debug("{}: got map {}, entering get_pg", opref, epoch);
       return opref.template enter_stage<>(
        opref.get_connection_pipeline().get_pg);
     }).then([this, &logger, &opref, op=std::move(op)]() mutable {
       logger.debug("{}: in get_pg core {}", opref, seastar::this_shard_id());
-      ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
       logger.debug("{}: in get_pg", opref);
       if constexpr (T::can_create()) {
        logger.debug("{}: can_create", opref);