* etc)
*/
class PGShardManager {
- seastar::sharded<OSDSingletonState> osd_singleton_state;
- seastar::sharded<ShardServices> shard_services;
+ seastar::sharded<OSDSingletonState> &osd_singleton_state;
+ seastar::sharded<ShardServices> &shard_services;
+ seastar::sharded<PGShardMapping> &pg_to_shard_mapping;
#define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
template <typename... Args> \
using cached_map_t = OSDMapService::cached_map_t;
using local_cached_map_t = OSDMapService::local_cached_map_t;
- PGShardManager() = default;
-
- seastar::future<> start(
- const int whoami,
- crimson::net::Messenger &cluster_msgr,
- crimson::net::Messenger &public_msgr,
- crimson::mon::Client &monc,
- crimson::mgr::Client &mgrc,
- crimson::os::FuturizedStore &store);
- seastar::future<> stop();
+ PGShardManager(
+ seastar::sharded<OSDSingletonState> &osd_singleton_state,
+ seastar::sharded<ShardServices> &shard_services,
+ seastar::sharded<PGShardMapping> &pg_to_shard_mapping)
+ : osd_singleton_state(osd_singleton_state),
+ shard_services(shard_services),
+ pg_to_shard_mapping(pg_to_shard_mapping) {}
auto &get_osd_singleton_state() {
ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return osd_singleton_state.local();
}
auto &get_shard_services() {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return shard_services.local();
}
auto &get_shard_services() const {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return shard_services.local();
}
auto &get_local_state() { return get_shard_services().local_state; }
auto &get_local_state() const { return get_shard_services().local_state; }
+ auto &get_pg_to_shard_mapping() { return pg_to_shard_mapping.local(); }
+ auto &get_pg_to_shard_mapping() const { return pg_to_shard_mapping.local(); }
seastar::future<> update_map(local_cached_map_t &&map) {
get_osd_singleton_state().update_map(
FORWARD_TO_OSD_SINGLETON(send_pg_created)
// osd state forwards
- FORWARD(is_active, is_active, get_osd_singleton_state().osd_state)
- FORWARD(is_preboot, is_preboot, get_osd_singleton_state().osd_state)
- FORWARD(is_booting, is_booting, get_osd_singleton_state().osd_state)
- FORWARD(is_stopping, is_stopping, get_osd_singleton_state().osd_state)
- FORWARD(is_prestop, is_prestop, get_osd_singleton_state().osd_state)
- FORWARD(is_initializing, is_initializing, get_osd_singleton_state().osd_state)
- FORWARD(set_prestop, set_prestop, get_osd_singleton_state().osd_state)
- FORWARD(set_preboot, set_preboot, get_osd_singleton_state().osd_state)
- FORWARD(set_booting, set_booting, get_osd_singleton_state().osd_state)
- FORWARD(set_stopping, set_stopping, get_osd_singleton_state().osd_state)
- FORWARD(set_active, set_active, get_osd_singleton_state().osd_state)
- FORWARD(when_active, when_active, get_osd_singleton_state().osd_state)
- FORWARD_CONST(get_osd_state_string, to_string, get_osd_singleton_state().osd_state)
-
- FORWARD(got_map, got_map, get_osd_singleton_state().osdmap_gate)
- FORWARD(wait_for_map, wait_for_map, get_osd_singleton_state().osdmap_gate)
+ FORWARD(is_active, is_active, get_shard_services().local_state.osd_state)
+ FORWARD(is_preboot, is_preboot, get_shard_services().local_state.osd_state)
+ FORWARD(is_booting, is_booting, get_shard_services().local_state.osd_state)
+ FORWARD(is_stopping, is_stopping, get_shard_services().local_state.osd_state)
+ FORWARD(is_prestop, is_prestop, get_shard_services().local_state.osd_state)
+ FORWARD(is_initializing, is_initializing, get_shard_services().local_state.osd_state)
+ FORWARD(set_prestop, set_prestop, get_shard_services().local_state.osd_state)
+ FORWARD(set_preboot, set_preboot, get_shard_services().local_state.osd_state)
+ FORWARD(set_booting, set_booting, get_shard_services().local_state.osd_state)
+ FORWARD(set_stopping, set_stopping, get_shard_services().local_state.osd_state)
+ FORWARD(set_active, set_active, get_shard_services().local_state.osd_state)
+ FORWARD(when_active, when_active, get_shard_services().local_state.osd_state)
+ FORWARD_CONST(get_osd_state_string, to_string, get_shard_services().local_state.osd_state)
+
+ FORWARD(got_map, got_map, get_shard_services().local_state.osdmap_gate)
+ FORWARD(wait_for_map, wait_for_map, get_shard_services().local_state.osdmap_gate)
// Metacoll
FORWARD_TO_OSD_SINGLETON(init_meta_coll)
template <typename F>
auto with_remote_shard_state(core_id_t core, F &&f) {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return shard_services.invoke_on(
core, [f=std::move(f)](auto &target_shard_services) mutable {
return std::invoke(
core_id_t core,
typename T::IRef &&op,
F &&f) {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
if (seastar::this_shard_id() == core) {
auto &target_shard_services = shard_services.local();
return std::invoke(
typename T::IRef op
) {
ceph_assert(op->use_count() == 1);
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
auto &logger = crimson::get_logger(ceph_subsys_osd);
static_assert(T::can_create());
logger.debug("{}: can_create", *op);
- auto core = get_osd_singleton_state().pg_to_shard_mapping.maybe_create_pg(
- op->get_pgid());
-
get_local_state().registry.remove_from_registry(*op);
- return with_remote_shard_state_and_op<T>(
- core, std::move(op),
- [](PerShardState &per_shard_state,
- ShardServices &shard_services,
- typename T::IRef op) {
+ return get_pg_to_shard_mapping().maybe_create_pg(
+ op->get_pgid()
+ ).then([this, op = std::move(op)](auto core) mutable {
+ return this->template with_remote_shard_state_and_op<T>(
+ core, std::move(op),
+ [](PerShardState &per_shard_state,
+ ShardServices &shard_services,
+ typename T::IRef op) {
per_shard_state.registry.add_to_registry(*op);
auto &logger = crimson::get_logger(ceph_subsys_osd);
auto &opref = *op;
auto &&trigger) {
return shard_services.get_or_create_pg(
std::move(trigger),
- opref.get_pgid(), opref.get_epoch(),
+ opref.get_pgid(),
std::move(opref.get_create_info())
);
}).safe_then([&logger, &shard_services, &opref](Ref<PG> pgref) {
})
).then([op=std::move(op)] {});
});
+ });
}
/// Runs opref on the appropriate core, waiting for pg as necessary
typename T::IRef op
) {
ceph_assert(op->use_count() == 1);
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
auto &logger = crimson::get_logger(ceph_subsys_osd);
static_assert(!T::can_create());
logger.debug("{}: !can_create", *op);
- auto core = get_osd_singleton_state().pg_to_shard_mapping.maybe_create_pg(
- op->get_pgid());
-
get_local_state().registry.remove_from_registry(*op);
- return with_remote_shard_state_and_op<T>(
- core, std::move(op),
- [](PerShardState &per_shard_state,
- ShardServices &shard_services,
- typename T::IRef op) {
+ return get_pg_to_shard_mapping().maybe_create_pg(
+ op->get_pgid()
+ ).then([this, op = std::move(op)](auto core) mutable {
+ return this->template with_remote_shard_state_and_op<T>(
+ core, std::move(op),
+ [](PerShardState &per_shard_state,
+ ShardServices &shard_services,
+ typename T::IRef op) {
per_shard_state.registry.add_to_registry(*op);
auto &logger = crimson::get_logger(ceph_subsys_osd);
auto &opref = *op;
})
).then([op=std::move(op)] {});
});
+ });
}
seastar::future<> load_pgs(crimson::os::FuturizedStore& store);
*/
template <typename F>
void for_each_pgid(F &&f) const {
- return get_osd_singleton_state().pg_to_shard_mapping.for_each_pgid(
+ return get_pg_to_shard_mapping().for_each_pgid(
std::forward<F>(f));
}
auto get_num_pgs() const {
- return get_osd_singleton_state().pg_to_shard_mapping.get_num_pgs();
+ return get_pg_to_shard_mapping().get_num_pgs();
}
seastar::future<> broadcast_map_to_pgs(epoch_t epoch);
template <typename F>
auto with_pg(spg_t pgid, F &&f) {
- core_id_t core = get_osd_singleton_state(
- ).pg_to_shard_mapping.get_pg_mapping(pgid);
+ core_id_t core = get_pg_to_shard_mapping().get_pg_mapping(pgid);
return with_remote_shard_state(
core,
[pgid, f=std::move(f)](auto &local_state, auto &local_service) mutable {
template <typename T, typename... Args>
auto start_pg_operation(Args&&... args) {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
auto op = get_local_state().registry.create_operation<T>(
std::forward<Args>(args)...);
auto &logger = crimson::get_logger(ceph_subsys_osd);
auto fut = opref.template enter_stage<>(
opref.get_connection_pipeline().await_active
).then([this, &opref, &logger] {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger.debug("{}: start_pg_operation in await_active stage", opref);
- return get_osd_singleton_state().osd_state.when_active();
+ return get_shard_services().local_state.osd_state.when_active();
}).then([&logger, &opref] {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger.debug("{}: start_pg_operation active, entering await_map", opref);
return opref.template enter_stage<>(
opref.get_connection_pipeline().await_map);
}).then([this, &logger, &opref] {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger.debug("{}: start_pg_operation await_map stage", opref);
using OSDMapBlockingEvent =
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent;
return opref.template with_blocking_event<OSDMapBlockingEvent>(
[this, &opref](auto &&trigger) {
std::ignore = this;
- return get_osd_singleton_state().osdmap_gate.wait_for_map(
- std::move(trigger),
- opref.get_epoch(),
- &get_shard_services());
- });
+ return get_shard_services().local_state.osdmap_gate.wait_for_map(
+ std::move(trigger),
+ opref.get_epoch(),
+ &get_shard_services());
+ });
}).then([&logger, &opref](auto epoch) {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger.debug("{}: got map {}, entering get_pg", opref, epoch);
return opref.template enter_stage<>(
opref.get_connection_pipeline().get_pg);
}).then([this, &logger, &opref, op=std::move(op)]() mutable {
logger.debug("{}: in get_pg core {}", opref, seastar::this_shard_id());
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger.debug("{}: in get_pg", opref);
if constexpr (T::can_create()) {
logger.debug("{}: can_create", opref);