monc{new crimson::mon::Client{*public_msgr, *this}},
mgrc{new crimson::mgr::Client{*public_msgr, *this}},
store{store},
+ pg_shard_manager{osd_singleton_state,
+ shard_services,
+ pg_to_shard_mappings},
// do this in background -- continuation rearms timer when complete
tick_timer{[this] {
std::ignore = update_heartbeat_peers(
log_client(cluster_msgr.get(), LogClient::NO_FLAGS),
clog(log_client.create_channel())
{
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
std::ref(hb_front_msgr), std::ref(hb_back_msgr)}) {
msgr.get()->set_auth_server(monc.get());
seastar::future<> OSD::open_meta_coll()
{
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return store.get_sharded_store().open_collection(
coll_t::meta()
).then([this](auto ch) {
logger().info("start");
startup_time = ceph::mono_clock::now();
-
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return store.start().then([this] {
- return pg_shard_manager.start(
- whoami, *cluster_msgr,
- *public_msgr, *monc, *mgrc, store);
+ return pg_to_shard_mappings.start(0, seastar::smp::count
+ ).then([this] {
+ return osd_singleton_state.start_single(
+ whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
+ std::ref(*monc), std::ref(*mgrc));
+ }).then([this] {
+ return osd_states.start();
+ }).then([this] {
+ ceph::mono_time startup_time = ceph::mono_clock::now();
+ return shard_services.start(
+ std::ref(osd_singleton_state),
+ std::ref(pg_to_shard_mappings),
+ whoami,
+ startup_time,
+ osd_singleton_state.local().perf,
+ osd_singleton_state.local().recoverystate_perf,
+ std::ref(store),
+ std::ref(osd_states));
+ });
}).then([this] {
heartbeat.reset(new Heartbeat{
whoami, get_shard_services(),
osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(map));
return pg_shard_manager.update_map(std::move(map));
}).then([this] {
- pg_shard_manager.got_map(osdmap->get_epoch());
+ return shard_services.invoke_on_all([this](auto &local_service) {
+ local_service.local_state.osdmap_gate.got_map(osdmap->get_epoch());
+ });
+ }).then([this] {
bind_epoch = osdmap->get_epoch();
return pg_shard_manager.load_pgs(store);
}).then([this] {
-
uint64_t osd_required =
CEPH_FEATURE_UID |
CEPH_FEATURE_PGID64 |
});
}
-seastar::future<> OSD::handle_command(crimson::net::ConnectionRef conn,
- Ref<MCommand> m)
+seastar::future<> OSD::handle_command( // passes the MCommand to asok->handle_command() and returns that call's future
+ crimson::net::ConnectionRef conn,
+ Ref<MCommand> m)
{
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); // must run on PRIMARY_CORE; do_ms_dispatch redirects MSG_COMMAND there
return asok->handle_command(conn, std::move(m));
}
tick_timer.cancel();
// see also OSD::shutdown()
return prepare_to_stop().then([this] {
- pg_shard_manager.set_stopping();
+ return pg_shard_manager.set_stopping();
+ }).then([this] {
logger().debug("prepared to stop");
public_msgr->stop();
cluster_msgr->stop();
}).then([this] {
return mgrc->stop();
}).then([this] {
- return pg_shard_manager.stop();
+ return shard_services.stop();
+ }).then([this] {
+ return osd_states.stop();
+ }).then([this] {
+ return osd_singleton_state.stop();
+ }).then([this] {
+ return pg_to_shard_mappings.stop();
}).then([fut=std::move(gate_close_fut)]() mutable {
return std::move(fut);
}).then([this] {
OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
{
if (pg_shard_manager.is_stopping()) {
- return {};
+ return seastar::now(); // while stopping, the message is dropped and an engaged (ready) result is returned rather than an empty optional
}
- // XXX: we're assuming the `switch` part is executed immediately, and thus
- // we won't smash the stack. Taking into account how `seastar::with_gate`
- // is currently implemented, this seems to be the case (Summer 2022).
- bool dispatched = true;
- gate.dispatch_in_background(__func__, *this, [this, conn=std::move(conn),
- m=std::move(m), &dispatched] {
+ auto maybe_ret = do_ms_dispatch(conn, std::move(m)); // type switch now lives in do_ms_dispatch; empty when no case matches the message type
+ if (!maybe_ret.has_value()) {
+ return std::nullopt; // no handler for this message type
+ }
+
+ gate.dispatch_in_background( // the handler's future is handed to the gate; ms_dispatch itself returns a ready future below
+ __func__, *this, [ret=std::move(maybe_ret.value())]() mutable {
+ return std::move(ret);
+ });
+ return seastar::now();
+}
+
+std::optional<seastar::future<>> // engaged with the handler's future, or std::nullopt when no case matches the message type
+OSD::do_ms_dispatch(
+ crimson::net::ConnectionRef conn,
+ MessageRef m)
+{
+ if (seastar::this_shard_id() != PRIMARY_CORE) { // off the primary core only the three types below are redirected; any other type falls through to the local switch
switch (m->get_type()) {
case CEPH_MSG_OSD_MAP:
- return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
- case CEPH_MSG_OSD_OP:
- return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
- case MSG_OSD_PG_CREATE2:
- return handle_pg_create(
- conn, boost::static_pointer_cast<MOSDPGCreate2>(m));
- return seastar::now();
case MSG_COMMAND:
- return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
case MSG_OSD_MARK_ME_DOWN:
- return handle_mark_me_down(conn, boost::static_pointer_cast<MOSDMarkMeDown>(m));
- case MSG_OSD_PG_PULL:
- [[fallthrough]];
- case MSG_OSD_PG_PUSH:
- [[fallthrough]];
- case MSG_OSD_PG_PUSH_REPLY:
- [[fallthrough]];
- case MSG_OSD_PG_RECOVERY_DELETE:
- [[fallthrough]];
- case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
- [[fallthrough]];
- case MSG_OSD_PG_SCAN:
- [[fallthrough]];
- case MSG_OSD_PG_BACKFILL:
- [[fallthrough]];
- case MSG_OSD_PG_BACKFILL_REMOVE:
- return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
- case MSG_OSD_PG_LEASE:
- [[fallthrough]];
- case MSG_OSD_PG_LEASE_ACK:
- [[fallthrough]];
- case MSG_OSD_PG_NOTIFY2:
- [[fallthrough]];
- case MSG_OSD_PG_INFO2:
- [[fallthrough]];
- case MSG_OSD_PG_QUERY2:
- [[fallthrough]];
- case MSG_OSD_BACKFILL_RESERVE:
- [[fallthrough]];
- case MSG_OSD_RECOVERY_RESERVE:
- [[fallthrough]];
- case MSG_OSD_PG_LOG:
- return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
- case MSG_OSD_REPOP:
- return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
- case MSG_OSD_REPOPREPLY:
- return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
- case MSG_OSD_SCRUB2:
- return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
- case MSG_OSD_PG_UPDATE_LOG_MISSING:
- return handle_update_log_missing(conn, boost::static_pointer_cast<
- MOSDPGUpdateLogMissing>(m));
- case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
- return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
- MOSDPGUpdateLogMissingReply>(m));
- default:
- dispatched = false;
- return seastar::now();
+ // FIXME: order is not guaranteed in this path
+ return conn.get_foreign(
+ ).then([this, m=std::move(m)](auto f_conn) mutable { // mutable so the inner capture moves m instead of copying from a const member
+ return seastar::smp::submit_to(PRIMARY_CORE,
+ [f_conn=std::move(f_conn), m=std::move(m), this]() mutable {
+ auto conn = make_local_shared_foreign(std::move(f_conn));
+ auto ret = do_ms_dispatch(conn, std::move(m)); // re-entered on PRIMARY_CORE, so this call takes the switch below
+ ceph_assert(ret.has_value()); // each redirected type has a case in the switch below; ceph_assert matches the other checks in this file
+ return std::move(ret.value());
+ });
+ });
}
- });
- return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
+ }
+
+ switch (m->get_type()) { // local dispatch: one handler per message type
+ case CEPH_MSG_OSD_MAP:
+ return handle_osd_map(boost::static_pointer_cast<MOSDMap>(m));
+ case CEPH_MSG_OSD_OP:
+ return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
+ case MSG_OSD_PG_CREATE2:
+ return handle_pg_create(
+ conn, boost::static_pointer_cast<MOSDPGCreate2>(m));
+ case MSG_COMMAND:
+ return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
+ case MSG_OSD_MARK_ME_DOWN:
+ return handle_mark_me_down(conn, boost::static_pointer_cast<MOSDMarkMeDown>(m));
+ case MSG_OSD_PG_PULL:
+ [[fallthrough]];
+ case MSG_OSD_PG_PUSH:
+ [[fallthrough]];
+ case MSG_OSD_PG_PUSH_REPLY:
+ [[fallthrough]];
+ case MSG_OSD_PG_RECOVERY_DELETE:
+ [[fallthrough]];
+ case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+ [[fallthrough]];
+ case MSG_OSD_PG_SCAN:
+ [[fallthrough]];
+ case MSG_OSD_PG_BACKFILL:
+ [[fallthrough]];
+ case MSG_OSD_PG_BACKFILL_REMOVE:
+ return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
+ case MSG_OSD_PG_LEASE:
+ [[fallthrough]];
+ case MSG_OSD_PG_LEASE_ACK:
+ [[fallthrough]];
+ case MSG_OSD_PG_NOTIFY2:
+ [[fallthrough]];
+ case MSG_OSD_PG_INFO2:
+ [[fallthrough]];
+ case MSG_OSD_PG_QUERY2:
+ [[fallthrough]];
+ case MSG_OSD_BACKFILL_RESERVE:
+ [[fallthrough]];
+ case MSG_OSD_RECOVERY_RESERVE:
+ [[fallthrough]];
+ case MSG_OSD_PG_LOG:
+ return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
+ case MSG_OSD_REPOP:
+ return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
+ case MSG_OSD_REPOPREPLY:
+ return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
+ case MSG_OSD_SCRUB2:
+ return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
+ case MSG_OSD_PG_UPDATE_LOG_MISSING:
+ return handle_update_log_missing(conn, boost::static_pointer_cast<
+ MOSDPGUpdateLogMissing>(m));
+ case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+ return handle_update_log_missing_reply(conn, boost::static_pointer_cast<
+ MOSDPGUpdateLogMissingReply>(m));
+ default:
+ return std::nullopt; // unknown type: tell the caller nothing was dispatched
+ }
}
void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
return osd_stat.seq;
}
-bool OSD::require_mon_peer(crimson::net::Connection *conn, Ref<Message> m)
+seastar::future<> OSD::handle_osd_map(Ref<MOSDMap> m) // entry point for MOSDMap: takes handle_osd_map_lock, then delegates to _handle_osd_map
{
- if (!conn->peer_is_mon()) {
- logger().info("{} received from non-mon {}, {}",
- __func__,
- conn->get_peer_addr(),
- *m);
- return false;
- }
- return true;
+ /* Ensure that only one MOSDMap is processed at a time. Allowing concurrent
+ * processing may eventually be worthwhile, but such an implementation would
+ * need to ensure (among other things)
+ * 1. any particular map is only processed once
+ * 2. PGAdvanceMap operations are processed in order for each PG
+ * As map handling is not presently a bottleneck, we stick to this
+ * simpler invariant for now.
+ * See https://tracker.ceph.com/issues/59165
+ */
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); // only valid on PRIMARY_CORE
+ return handle_osd_map_lock.lock().then([this, m] {
+ return _handle_osd_map(m);
+ }).finally([this] { // finally(): the unlock also runs when _handle_osd_map's future failed
+ return handle_osd_map_lock.unlock();
+ });
}
-seastar::future<> OSD::handle_osd_map(crimson::net::ConnectionRef conn,
- Ref<MOSDMap> m)
+seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
{
logger().info("handle_osd_map {}", *m);
if (m->fsid != superblock.cluster_fsid) {
});
}
-seastar::future<> OSD::committed_osd_maps(version_t first,
- version_t last,
- Ref<MOSDMap> m)
+seastar::future<> OSD::committed_osd_maps(
+ version_t first,
+ version_t last,
+ Ref<MOSDMap> m)
{
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last);
// advance through the new maps
return seastar::do_for_each(boost::make_counting_iterator(first),
}
});
}).then([m, this] {
+ auto fut = seastar::now();
if (osdmap->is_up(whoami)) {
const auto up_from = osdmap->get_up_from(whoami);
logger().info("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
pg_shard_manager.is_booting()) {
logger().info("osd.{}: activating...", whoami);
- pg_shard_manager.set_active();
- beacon_timer.arm_periodic(
- std::chrono::seconds(local_conf()->osd_beacon_report_interval));
- // timer continuation rearms when complete
- tick_timer.arm(
- std::chrono::seconds(TICK_INTERVAL));
+ fut = pg_shard_manager.set_active().then([this] {
+ beacon_timer.arm_periodic(
+ std::chrono::seconds(local_conf()->osd_beacon_report_interval));
+ // timer continuation rearms when complete
+ tick_timer.arm(
+ std::chrono::seconds(TICK_INTERVAL));
+ });
}
} else {
if (pg_shard_manager.is_prestop()) {
return seastar::now();
}
}
- return check_osdmap_features().then([this] {
- // yay!
- return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch());
+ return fut.then([this] {
+ return check_osdmap_features().then([this] {
+ // yay!
+ logger().info("osd.{}: committed_osd_maps: broadcasting osdmaps up"
+ " to {} epoch to pgs", whoami, osdmap->get_epoch());
+ return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch());
+ });
});
}).then([m, this] {
if (pg_shard_manager.is_active()) {
});
}
-seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn,
- Ref<MOSDOp> m)
+seastar::future<> OSD::handle_osd_op( // starts a ClientRequest pg operation for the incoming MOSDOp
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDOp> m)
{
- (void) pg_shard_manager.start_pg_operation<ClientRequest>(
+ return pg_shard_manager.start_pg_operation<ClientRequest>( // the call's result is now returned instead of being discarded
get_shard_services(),
conn,
- std::move(m));
- return seastar::now();
+ std::move(m)).second; // NOTE(review): `.second` is presumably the operation's completion future - confirm against start_pg_operation
}
-seastar::future<> OSD::handle_pg_create(crimson::net::ConnectionRef conn,
- Ref<MOSDPGCreate2> m)
+seastar::future<> OSD::handle_pg_create(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDPGCreate2> m)
{
- for (auto& [pgid, when] : m->pgs) {
+ return seastar::do_for_each(m->pgs, [this, conn, m](auto& pg) {
+ auto& [pgid, when] = pg;
const auto &[created, created_stamp] = when;
auto q = m->pg_extra.find(pgid);
ceph_assert(q != m->pg_extra.end());
"unmatched past_intervals {} (history {})",
pgid, m->epoch,
pi, history);
+ return seastar::now();
} else {
- std::ignore = pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
+ return pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
conn,
pg_shard_t(),
pgid,
m->epoch,
NullEvt(),
true,
- new PGCreateInfo(pgid, m->epoch, history, pi, true));
+ new PGCreateInfo(pgid, m->epoch, history, pi, true)).second;
}
- }
- return seastar::now();
+ });
}
seastar::future<> OSD::handle_update_log_missing(
Ref<MOSDPGUpdateLogMissing> m)
{
m->decode_payload();
- (void) pg_shard_manager.start_pg_operation<LogMissingRequest>(
+ return pg_shard_manager.start_pg_operation<LogMissingRequest>(
std::move(conn),
- std::move(m));
- return seastar::now();
+ std::move(m)).second;
}
seastar::future<> OSD::handle_update_log_missing_reply(
Ref<MOSDPGUpdateLogMissingReply> m)
{
m->decode_payload();
- (void) pg_shard_manager.start_pg_operation<LogMissingRequestReply>(
+ return pg_shard_manager.start_pg_operation<LogMissingRequestReply>(
std::move(conn),
- std::move(m));
- return seastar::now();
+ std::move(m)).second;
}
-seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn,
- Ref<MOSDRepOp> m)
+seastar::future<> OSD::handle_rep_op( // finishes decoding the MOSDRepOp, then starts a RepRequest pg operation for it
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDRepOp> m)
{
m->finish_decode();
- std::ignore = pg_shard_manager.start_pg_operation<RepRequest>(
+ return pg_shard_manager.start_pg_operation<RepRequest>( // the call's result is now returned instead of being assigned to std::ignore
std::move(conn),
- std::move(m));
- return seastar::now();
+ std::move(m)).second; // NOTE(review): `.second` is presumably the operation's completion future - confirm against start_pg_operation
}
-seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn,
- Ref<MOSDRepOpReply> m)
+seastar::future<> OSD::handle_rep_op_reply(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDRepOpReply> m)
{
spg_t pgid = m->get_spg();
return pg_shard_manager.with_pg(
});
}
-seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn,
- Ref<MOSDScrub2> m)
+seastar::future<> OSD::handle_scrub(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDScrub2> m)
{
if (m->fsid != superblock.cluster_fsid) {
logger().warn("fsid mismatched");
});
}
-seastar::future<> OSD::handle_mark_me_down(crimson::net::ConnectionRef conn,
- Ref<MOSDMarkMeDown> m)
+seastar::future<> OSD::handle_mark_me_down( // calls got_stop_ack() when the OSD is in prestop; always returns a ready future
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDMarkMeDown> m)
{
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); // must run on PRIMARY_CORE; do_ms_dispatch redirects MSG_OSD_MARK_ME_DOWN there
if (pg_shard_manager.is_prestop()) {
got_stop_ack();
}
return seastar::now();
}
-seastar::future<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn,
- Ref<MOSDFastDispatchOp> m)
+seastar::future<> OSD::handle_recovery_subreq( // starts a RecoverySubRequest pg operation for the incoming message
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDFastDispatchOp> m)
{
- std::ignore = pg_shard_manager.start_pg_operation<RecoverySubRequest>(
- conn, std::move(m));
- return seastar::now();
+ return pg_shard_manager.start_pg_operation<RecoverySubRequest>( // the call's result is now returned instead of being assigned to std::ignore
+ conn, std::move(m)).second; // NOTE(review): `.second` is presumably the operation's completion future - confirm against start_pg_operation
}
bool OSD::should_restart() const
logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
m->set_features(conn->get_features());
std::unique_ptr<PGPeeringEvent> evt(m->get_event());
- (void) pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
+ return pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
conn,
pg_shard_t{from, m->get_spg().shard},
m->get_spg(),
- std::move(*evt));
- return seastar::now();
+ std::move(*evt)).second;
}
seastar::future<> OSD::check_osdmap_features()
{
- return store.write_meta("require_osd_release",
- stringify((int)osdmap->require_osd_release));
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); // primary-core only; ceph_assert (not plain assert) to match the other checks in this file
+ return store.write_meta( // writes the current osdmap's require_osd_release, stringified as an int, under the "require_osd_release" meta key
+ "require_osd_release",
+ stringify((int)osdmap->require_osd_release));
}
seastar::future<> OSD::prepare_to_stop()