#include <boost/smart_ptr/make_local_shared.hpp>
#include <fmt/format.h>
#include <fmt/ostream.h>
+#include <seastar/core/timer.hh>
#include "common/pick_address.h"
#include "include/util.h"
-#include "messages/MOSDAlive.h"
+#include "messages/MCommand.h"
#include "messages/MOSDBeacon.h"
#include "messages/MOSDBoot.h"
#include "messages/MOSDMap.h"
+#include "messages/MOSDMarkMeDown.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDPGLog.h"
+#include "messages/MOSDPGPull.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
+#include "messages/MOSDPGRecoveryDelete.h"
+#include "messages/MOSDPGRecoveryDeleteReply.h"
#include "messages/MOSDRepOpReply.h"
+#include "messages/MOSDScrub2.h"
#include "messages/MPGStats.h"
#include "os/Transaction.h"
#include "osd/ClassHandler.h"
+#include "osd/OSDCap.h"
#include "osd/PGPeeringEvent.h"
#include "osd/PeeringState.h"
+#include "crimson/admin/osd_admin.h"
+#include "crimson/admin/pg_commands.h"
+#include "crimson/common/exception.h"
#include "crimson/mon/MonClient.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
-#include "crimson/os/cyanstore/cyan_object.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/os/futurized_store.h"
#include "crimson/osd/heartbeat.h"
#include "crimson/osd/osd_operations/compound_peering_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/pg_advance_map.h"
+#include "crimson/osd/osd_operations/recovery_subrequest.h"
#include "crimson/osd/osd_operations/replicated_request.h"
namespace {
local_conf().get_val<std::string>("osd_objectstore"),
local_conf().get_val<std::string>("osd_data"),
local_conf().get_config_values())},
- shard_services{*this, *cluster_msgr, *public_msgr, *monc, *mgrc, *store},
- heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}},
+ shard_services{*this, whoami, *cluster_msgr, *public_msgr, *monc, *mgrc, *store},
+ heartbeat{new Heartbeat{whoami, shard_services, *monc, hb_front_msgr, hb_back_msgr}},
// do this in background
- heartbeat_timer{[this] { (void)update_heartbeat_peers(); }},
+ tick_timer{[this] {
+ update_heartbeat_peers();
+ update_stats();
+ }},
asok{seastar::make_lw_shared<crimson::admin::AdminSocket>()},
osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services)))
{
return when_all_succeed(
store->write_meta("ceph_fsid", cluster_fsid.to_string()),
store->write_meta("whoami", std::to_string(whoami)));
- }).then([cluster_fsid, this] {
+ }).then_unpack([cluster_fsid, this] {
fmt::print("created object store {} for osd.{} fsid {}\n",
local_conf().get_val<std::string>("osd_data"),
whoami, cluster_fsid);
cluster_msgr->set_policy(entity_name_t::TYPE_CLIENT,
SocketPolicy::stateless_server(0));
- dispatchers.push_front(this);
- dispatchers.push_front(monc.get());
- dispatchers.push_front(mgrc.get());
+ crimson::net::dispatchers_t dispatchers{this, monc.get(), mgrc.get()};
return seastar::when_all_succeed(
cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max)
- .then([this] { return cluster_msgr->start(&dispatchers); }),
+ .safe_then([this, dispatchers]() mutable {
+ return cluster_msgr->start(dispatchers);
+ }, crimson::net::Messenger::bind_ertr::all_same_way(
+ [] (const std::error_code& e) {
+ logger().error("cluster messenger try_bind(): address range is unavailable.");
+ ceph_abort();
+ })),
public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max)
- .then([this] { return public_msgr->start(&dispatchers); }));
- }).then([this] {
+ .safe_then([this, dispatchers]() mutable {
+ return public_msgr->start(dispatchers);
+ }, crimson::net::Messenger::bind_ertr::all_same_way(
+ [] (const std::error_code& e) {
+ logger().error("public messenger try_bind(): address range is unavailable.");
+ ceph_abort();
+ })));
+ }).then_unpack([this] {
return seastar::when_all_succeed(monc->start(),
mgrc->start());
- }).then([this] {
+ }).then_unpack([this] {
return _add_me_to_crush();
}).then([this] {
monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0);
seastar::future<> OSD::start_boot()
{
state.set_preboot();
- return monc->get_version("osdmap").then([this](version_t newest, version_t oldest) {
+ return monc->get_version("osdmap").then([this](auto&& ret) {
+ auto [newest, oldest] = ret;
return _preboot(oldest, newest);
});
}
{
logger().info("osd.{}: _preboot", whoami);
if (osdmap->get_epoch() == 0) {
- logger().warn("waiting for initial osdmap");
+ logger().info("waiting for initial osdmap");
} else if (osdmap->is_destroyed(whoami)) {
logger().warn("osdmap says I am destroyed");
// provide a small margin so we don't livelock seeing if we
"args": [{}]
}})", whoami, weight, loc);
return monc->run_command({cmd}, {});
- }).then([](int32_t code, string message, bufferlist) {
+ }).then([](auto&& command_result) {
+ [[maybe_unused]] auto [code, message, out] = std::move(command_result);
if (code) {
logger().warn("fail to add to crush: {} ({})", message, code);
throw std::runtime_error("fail to add to crush");
});
}
-seastar::future<> OSD::_send_alive()
+seastar::future<> OSD::handle_command(crimson::net::ConnectionRef conn,
+ Ref<MCommand> m)
{
- auto want = osdmap->get_epoch();
- logger().info(
- "{} want {} up_thru_wanted {}",
- __func__,
- want,
- up_thru_wanted);
- if (!osdmap->exists(whoami)) {
- logger().warn("{} DNE", __func__);
- return seastar::now();
- } else if (want <= up_thru_wanted) {
- logger().debug("{} {} <= {}", __func__, want, up_thru_wanted);
- return seastar::now();
- } else {
- up_thru_wanted = want;
- auto m = make_message<MOSDAlive>(osdmap->get_epoch(), want);
- return monc->send_message(std::move(m));
- }
+ return asok->handle_command(conn, std::move(m));
}
/*
return asok->start(asok_path).then([this] {
return seastar::when_all_succeed(
asok->register_admin_commands(),
- asok->register_command(make_asok_hook<OsdStatusHook>(*this)),
+ asok->register_command(make_asok_hook<OsdStatusHook>(std::as_const(*this))),
asok->register_command(make_asok_hook<SendBeaconHook>(*this)),
- asok->register_command(make_asok_hook<ConfigShowHook>()),
- asok->register_command(make_asok_hook<ConfigGetHook>()),
- asok->register_command(make_asok_hook<ConfigSetHook>()));
+ asok->register_command(make_asok_hook<FlushPgStatsHook>(*this)),
+ asok->register_command(make_asok_hook<DumpPGStateHistory>(std::as_const(*this))),
+ asok->register_command(make_asok_hook<SeastarMetricsHook>()),
+ // PG commands
+ asok->register_command(make_asok_hook<pg::QueryCommand>(*this)),
+ asok->register_command(make_asok_hook<pg::MarkUnfoundLostCommand>(*this)));
+ }).then_unpack([] {
+ return seastar::now();
});
}
{
logger().info("stop");
// see also OSD::shutdown()
- state.set_stopping();
-
- return gate.close().then([this] {
- return asok->stop();
- }).then([this] {
- return heartbeat->stop();
- }).then([this] {
- return monc->stop();
- }).then([this] {
- return when_all_succeed(
- public_msgr->shutdown(),
- cluster_msgr->shutdown());
- }).then([this] {
- return store->umount();
- }).then([this] {
- return store->stop();
- }).handle_exception([](auto ep) {
- logger().error("error while stopping osd: {}", ep);
+ return prepare_to_stop().then([this] {
+ state.set_stopping();
+ logger().debug("prepared to stop");
+ public_msgr->stop();
+ cluster_msgr->stop();
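+ // closing the gate rejects new background dispatches; its future is
+ // awaited further down, after the store, PGs and clients have been stopped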
+ auto gate_close_fut = gate.close();
+ return asok->stop().then([this] {
+ return heartbeat->stop();
+ }).then([this] {
+ return store->umount();
+ }).then([this] {
+ return store->stop();
+ }).then([this] {
+ return seastar::parallel_for_each(pg_map.get_pgs(),
+ [](auto& p) {
+ return p.second->stop();
+ });
+ }).then([this] {
+ return monc->stop();
+ }).then([this] {
+ return mgrc->stop();
+ }).then([fut=std::move(gate_close_fut)]() mutable {
+ return std::move(fut);
+ }).then([this] {
+ return when_all_succeed(
+ public_msgr->shutdown(),
+ cluster_msgr->shutdown());
+ }).then_unpack([] {
+ return seastar::now();
+ }).handle_exception([](auto ep) {
+ logger().error("error while stopping osd: {}", ep);
+ });
});
}
f->dump_unsigned("num_pgs", pg_map.get_pgs().size());
}
+void OSD::dump_pg_state_history(Formatter* f) const
+{
+ f->open_array_section("pgs");
+ for (auto [pgid, pg] : pg_map.get_pgs()) {
+ f->open_object_section("pg");
+ f->dump_stream("pg") << pgid;
+ const auto& peering_state = pg->get_peering_state();
+ f->dump_string("currently", peering_state.get_current_state());
+ peering_state.dump_history(f);
+ f->close_section();
+ }
+ f->close_section();
+}
+
+void OSD::print(std::ostream& out) const
+{
+ out << "{osd." << superblock.whoami << " "
+ << superblock.osd_fsid << " [" << superblock.oldest_map
+ << "," << superblock.newest_map << "] " << pg_map.get_pgs().size()
+ << " pgs}";
+}
+
seastar::future<> OSD::load_pgs()
{
return store->list_collections().then([this](auto colls) {
if (pi.is_erasure()) {
ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile);
}
- return seastar::make_ready_future<pg_pool_t, string, ec_profile_t>(
- std::move(pi),
- std::move(name),
- std::move(ec_profile));
+ return seastar::make_ready_future<std::tuple<pg_pool_t, string, ec_profile_t>>(
+ std::make_tuple(std::move(pi),
+ std::move(name),
+ std::move(ec_profile)));
} else {
// pool was deleted; grab final pg_pool_t off disk.
return meta_coll->load_final_pool_info(pgid.pool());
return store->open_collection(cid);
}
};
- return seastar::when_all_succeed(
+ return seastar::when_all(
std::move(get_pool_info),
std::move(get_collection)
- ).then([pgid, create_map, this] (auto info,
- auto coll) {
- auto [pool, name, ec_profile] = std::move(info);
+ ).then([pgid, create_map, this] (auto&& ret) {
+ auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get0());
+ auto coll = std::move(std::get<1>(ret).get0());
return seastar::make_ready_future<Ref<PG>>(
new PG{pgid,
pg_shard_t{whoami, pgid.shard},
seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
{
- return PGMeta{store.get(), pgid}.get_epoch().then([this](epoch_t e) {
+ logger().debug("{}: {}", __func__, pgid);
+
+ return seastar::do_with(PGMeta(store.get(), pgid), [] (auto& pg_meta) {
+ return pg_meta.get_epoch();
+ }).then([this](epoch_t e) {
return get_map(e);
}).then([pgid, this] (auto&& create_map) {
return make_pg(std::move(create_map), pgid, false);
- }).then([this, pgid](Ref<PG> pg) {
+ }).then([this](Ref<PG> pg) {
return pg->read_state(store.get()).then([pg] {
- return seastar::make_ready_future<Ref<PG>>(std::move(pg));
+ return seastar::make_ready_future<Ref<PG>>(std::move(pg));
});
}).handle_exception([pgid](auto ep) {
logger().info("pg {} saw exception on load {}", pgid, ep);
});
}
-seastar::future<> OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
+std::optional<seastar::future<>>
+OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
{
if (state.is_stopping()) {
- return seastar::now();
- }
-
- switch (m->get_type()) {
- case CEPH_MSG_OSD_MAP:
- return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
- case CEPH_MSG_OSD_OP:
- return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
- case MSG_OSD_PG_CREATE2:
- shard_services.start_operation<CompoundPeeringRequest>(
- *this,
- conn->get_shared(),
- m);
- return seastar::now();
- case MSG_OSD_PG_LEASE:
- [[fallthrough]];
- case MSG_OSD_PG_LEASE_ACK:
- [[fallthrough]];
- case MSG_OSD_PG_NOTIFY2:
- [[fallthrough]];
- case MSG_OSD_PG_INFO2:
- [[fallthrough]];
- case MSG_OSD_PG_QUERY2:
- [[fallthrough]];
- case MSG_OSD_PG_LOG:
- return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
- case MSG_OSD_REPOP:
- return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
- case MSG_OSD_REPOPREPLY:
- return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
- default:
- logger().info("{} unhandled message {}", __func__, *m);
- return seastar::now();
- }
-}
-
-seastar::future<> OSD::ms_handle_connect(crimson::net::ConnectionRef conn)
-{
- if (conn->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
- return seastar::now();
- } else {
- return seastar::now();
+ return {};
}
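+ // handle the message on a background future tracked by the gate;
+ // `dispatched` records whether this dispatcher consumed the message, so
+ // an unhandled type falls through to the next dispatcher in the chain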
+ bool dispatched = true;
+ gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
+ switch (m->get_type()) {
+ case CEPH_MSG_OSD_MAP:
+ return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
+ case CEPH_MSG_OSD_OP:
+ return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
+ case MSG_OSD_PG_CREATE2:
+ shard_services.start_operation<CompoundPeeringRequest>(
+ *this,
+ conn,
+ m);
+ return seastar::now();
+ case MSG_COMMAND:
+ return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
+ case MSG_OSD_MARK_ME_DOWN:
+ return handle_mark_me_down(conn, boost::static_pointer_cast<MOSDMarkMeDown>(m));
+ case MSG_OSD_PG_PULL:
+ [[fallthrough]];
+ case MSG_OSD_PG_PUSH:
+ [[fallthrough]];
+ case MSG_OSD_PG_PUSH_REPLY:
+ [[fallthrough]];
+ case MSG_OSD_PG_RECOVERY_DELETE:
+ [[fallthrough]];
+ case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+ [[fallthrough]];
+ case MSG_OSD_PG_SCAN:
+ [[fallthrough]];
+ case MSG_OSD_PG_BACKFILL:
+ [[fallthrough]];
+ case MSG_OSD_PG_BACKFILL_REMOVE:
+ return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
+ case MSG_OSD_PG_LEASE:
+ [[fallthrough]];
+ case MSG_OSD_PG_LEASE_ACK:
+ [[fallthrough]];
+ case MSG_OSD_PG_NOTIFY2:
+ [[fallthrough]];
+ case MSG_OSD_PG_INFO2:
+ [[fallthrough]];
+ case MSG_OSD_PG_QUERY2:
+ [[fallthrough]];
+ case MSG_OSD_BACKFILL_RESERVE:
+ [[fallthrough]];
+ case MSG_OSD_RECOVERY_RESERVE:
+ [[fallthrough]];
+ case MSG_OSD_PG_LOG:
+ return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
+ case MSG_OSD_REPOP:
+ return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
+ case MSG_OSD_REPOPREPLY:
+ return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
+ case MSG_OSD_SCRUB2:
+ return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
+ default:
+ dispatched = false;
+ return seastar::now();
+ }
+ });
+ return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
}
-seastar::future<> OSD::ms_handle_reset(crimson::net::ConnectionRef conn)
+void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
{
// TODO: cleanup the session attached to this connection
logger().warn("ms_handle_reset");
- return seastar::now();
}
-seastar::future<> OSD::ms_handle_remote_reset(crimson::net::ConnectionRef conn)
+void OSD::ms_handle_remote_reset(crimson::net::ConnectionRef conn)
{
logger().warn("ms_handle_remote_reset");
- return seastar::now();
}
void OSD::handle_authentication(const EntityName& name,
- const AuthCapsInfo& caps)
+ const AuthCapsInfo& caps_info)
+{
+ // TODO: store the parsed cap and associate it with the connection
+ if (caps_info.allow_all) {
+ logger().debug("{} {} has all caps", __func__, name);
+ return;
+ }
+ if (caps_info.caps.length() > 0) {
+ auto p = caps_info.caps.cbegin();
+ string str;
+ try {
+ decode(str, p);
+ } catch (ceph::buffer::error& e) {
+ logger().warn("{} {} failed to decode caps string", __func__, name);
+ return;
+ }
+ OSDCap caps;
+ if (caps.parse(str)) {
+ logger().debug("{} {} has caps {}", __func__, name, str);
+ } else {
+ logger().warn("{} {} failed to parse caps {}", __func__, name, str);
+ }
+ }
+}
+
+void OSD::update_stats()
{
- // todo
+ osd_stat_seq++;
+ osd_stat.up_from = get_up_epoch();
+ osd_stat.hb_peers = heartbeat->get_peers();
+ osd_stat.seq = (static_cast<uint64_t>(get_up_epoch()) << 32) | osd_stat_seq;
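+ // kick off a statfs in the background; the result lands in osd_stat and
+ // is picked up by later stats reports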
+ gate.dispatch_in_background("statfs", *this, [this] {
+ return store->stat().then([this](store_statfs_t&& st) {
+ osd_stat.statfs = st;
+ });
+ });
}
-MessageRef OSD::get_stats()
+MessageRef OSD::get_stats() const
{
// todo: m-to-n: collect stats using map-reduce
// MPGStats::had_map_for is not used since PGMonitor was removed
auto m = make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
-
+ m->osd_stat = osd_stat;
for (auto [pgid, pg] : pg_map.get_pgs()) {
if (pg->is_primary()) {
auto stats = pg->get_stats();
return m;
}
+uint64_t OSD::send_pg_stats()
+{
+ // the mgr client sends the report message in the background
+ mgrc->report();
+ return osd_stat.seq;
+}
+
OSD::cached_map_t OSD::get_map() const
{
return osdmap;
}
}
+seastar::future<std::map<epoch_t, bufferlist>> OSD::load_map_bls(
+ epoch_t first,
+ epoch_t last)
+{
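+ // load the encoded maps in [first, last] concurrently and collect them
+ // into an epoch -> bufferlist map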
+ return seastar::map_reduce(boost::make_counting_iterator<epoch_t>(first),
+ boost::make_counting_iterator<epoch_t>(last + 1),
+ [this](epoch_t e) {
+ return load_map_bl(e).then([e](auto&& bl) {
+ return seastar::make_ready_future<pair<epoch_t, bufferlist>>(
+ std::make_pair(e, std::move(bl)));
+ });
+ },
+ std::map<epoch_t, bufferlist>{},
+ [](auto&& bls, auto&& epoch_bl) {
+ bls.emplace(std::move(epoch_bl));
+ return std::move(bls);
+ });
+}
+
seastar::future<std::unique_ptr<OSDMap>> OSD::load_map(epoch_t e)
{
auto o = std::make_unique<OSDMap>();
[this](auto &info) -> seastar::future<Ref<PG>> {
return get_map(info->epoch).then(
[&info, this](cached_map_t startmap) ->
- seastar::future<Ref<PG>, cached_map_t> {
+ seastar::future<std::tuple<Ref<PG>, cached_map_t>> {
const spg_t &pgid = info->pgid;
if (info->by_mon) {
int64_t pool_id = pgid.pgid.pool();
"{} ignoring pgid {}, pool dne",
__func__,
pgid);
- return seastar::make_ready_future<Ref<PG>, cached_map_t>(
- Ref<PG>(),
- startmap);
+ return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
+ std::make_tuple(Ref<PG>(), startmap));
}
ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
"{} dropping {} create, pool does not have CREATING flag set",
__func__,
pgid);
- return seastar::make_ready_future<Ref<PG>, cached_map_t>(
- Ref<PG>(),
- startmap);
+ return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
+ std::make_tuple(Ref<PG>(), startmap));
}
}
return make_pg(startmap, pgid, true).then(
[startmap=std::move(startmap)](auto pg) mutable {
- return seastar::make_ready_future<Ref<PG>, cached_map_t>(
- std::move(pg),
- std::move(startmap));
+ return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
+ std::make_tuple(std::move(pg), std::move(startmap)));
});
- }).then([this, &info](auto pg, auto startmap) ->
+ }).then([this, &info](auto&& ret) ->
seastar::future<Ref<PG>> {
+ auto [pg, startmap] = std::move(ret);
if (!pg)
return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
PeeringCtx rctx{ceph_release_t::octopus};
return shard_services.start_operation<PGAdvanceMap>(
*this, pg, pg->get_osdmap_epoch(),
- osdmap->get_epoch(), std::move(rctx), true).second.then([pg] {
+ osdmap->get_epoch(), std::move(rctx), true).second.then([pg=pg] {
return seastar::make_ready_future<Ref<PG>>(pg);
});
});
});
}
-seastar::future<> OSD::handle_osd_map(crimson::net::Connection* conn,
+seastar::future<> OSD::handle_osd_map(crimson::net::ConnectionRef conn,
Ref<MOSDMap> m)
{
logger().info("handle_osd_map {}", *m);
state.set_active();
beacon_timer.arm_periodic(
std::chrono::seconds(local_conf()->osd_beacon_report_interval));
- heartbeat_timer.arm_periodic(
+ tick_timer.arm_periodic(
std::chrono::seconds(TICK_INTERVAL));
}
+ } else if (!osdmap->is_up(whoami)) {
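+ // the new map no longer shows us up: if we are waiting to stop, treat
+ // this as the acknowledgement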
+ if (state.is_prestop()) {
+ got_stop_ack();
+ return seastar::now();
+ }
}
check_osdmap_features();
// yay!
});
}
-seastar::future<> OSD::handle_osd_op(crimson::net::Connection* conn,
+seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn,
Ref<MOSDOp> m)
{
- shard_services.start_operation<ClientRequest>(
+ (void) shard_services.start_operation<ClientRequest>(
*this,
- conn->get_shared(),
+ conn,
std::move(m));
return seastar::now();
}
-seastar::future<> OSD::handle_rep_op(crimson::net::Connection* conn,
+seastar::future<> OSD::send_incremental_map(crimson::net::ConnectionRef conn,
+ epoch_t first)
+{
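+ // if the requested epoch is still present on disk, send every map from
+ // there up to our newest one; otherwise fall back to sending just the
+ // current full map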
+ if (first >= superblock.oldest_map) {
+ return load_map_bls(first, superblock.newest_map)
+ .then([this, conn, first](auto&& bls) {
+ auto m = make_message<MOSDMap>(monc->get_fsid(),
+ osdmap->get_encoding_features());
+ m->oldest_map = first;
+ m->newest_map = superblock.newest_map;
+ m->maps = std::move(bls);
+ return conn->send(m);
+ });
+ } else {
+ return load_map_bl(osdmap->get_epoch())
+ .then([this, conn](auto&& bl) mutable {
+ auto m = make_message<MOSDMap>(monc->get_fsid(),
+ osdmap->get_encoding_features());
+ m->oldest_map = superblock.oldest_map;
+ m->newest_map = superblock.newest_map;
+ m->maps.emplace(osdmap->get_epoch(), std::move(bl));
+ return conn->send(m);
+ });
+ }
+}
+
+seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn,
Ref<MOSDRepOp> m)
{
m->finish_decode();
- shard_services.start_operation<RepRequest>(
+ (void) shard_services.start_operation<RepRequest>(
*this,
- conn->get_shared(),
+ std::move(conn),
std::move(m));
return seastar::now();
}
-seastar::future<> OSD::handle_rep_op_reply(crimson::net::Connection* conn,
+seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn,
Ref<MOSDRepOpReply> m)
{
const auto& pgs = pg_map.get_pgs();
return seastar::now();
}
+seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn,
+ Ref<MOSDScrub2> m)
+{
+ if (m->fsid != superblock.cluster_fsid) {
+ logger().warn("fsid mismatched");
+ return seastar::now();
+ }
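+ // translate the scrub request into a RequestScrub peering event for each
+ // of the listed PGs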
+ return seastar::parallel_for_each(std::move(m->scrub_pgs),
+ [m, conn, this](spg_t pgid) {
+ pg_shard_t from_shard{static_cast<int>(m->get_source().num()),
+ pgid.shard};
+ PeeringState::RequestScrub scrub_request{m->deep, m->repair};
+ return shard_services.start_operation<RemotePeeringEvent>(
+ *this,
+ conn,
+ shard_services,
+ from_shard,
+ pgid,
+ PGPeeringEvent{m->epoch, m->epoch, scrub_request}).second;
+ });
+}
+
+seastar::future<> OSD::handle_mark_me_down(crimson::net::ConnectionRef conn,
+ Ref<MOSDMarkMeDown> m)
+{
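+ // the monitor acknowledged the mark-me-down we sent from prepare_to_stop();
+ // unblock the pending shutdown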
+ if (state.is_prestop()) {
+ got_stop_ack();
+ }
+ return seastar::now();
+}
+
+seastar::future<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn,
+ Ref<MOSDFastDispatchOp> m)
+{
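+ // queue the recovery message as a background RecoverySubRequest operation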
+ (void) shard_services.start_operation<RecoverySubRequest>(
+ *this,
+ conn,
+ std::move(m));
+ return seastar::now();
+}
+
bool OSD::should_restart() const
{
if (!osdmap->is_up(whoami)) {
seastar::future<> OSD::restart()
{
beacon_timer.cancel();
- heartbeat_timer.cancel();
+ tick_timer.cancel();
up_epoch = 0;
bind_epoch = osdmap->get_epoch();
// TODO: promote to shutdown if being marked down for multiple times
epoch_t min_last_epoch_clean = osdmap->get_epoch();
auto m = make_message<MOSDBeacon>(osdmap->get_epoch(),
min_last_epoch_clean,
- superblock.last_purged_snaps_scrub);
+ superblock.last_purged_snaps_scrub,
+ local_conf()->osd_beacon_report_interval);
return monc->send_message(m);
}
-seastar::future<> OSD::update_heartbeat_peers()
+void OSD::update_heartbeat_peers()
{
if (!state.is_active()) {
- return seastar::now();
+ return;
}
for (auto& pg : pg_map.get_pgs()) {
vector<int> up, acting;
}
}
}
- return heartbeat->update_peers(whoami);
+ heartbeat->update_peers(whoami);
}
seastar::future<> OSD::handle_peering_op(
- crimson::net::Connection* conn,
+ crimson::net::ConnectionRef conn,
Ref<MOSDPeeringOp> m)
{
const int from = m->get_source().num();
logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
- shard_services.start_operation<RemotePeeringEvent>(
+ std::unique_ptr<PGPeeringEvent> evt(m->get_event());
+ (void) shard_services.start_operation<RemotePeeringEvent>(
*this,
- conn->get_shared(),
+ conn,
shard_services,
pg_shard_t{from, m->get_spg().shard},
m->get_spg(),
- std::move(*m->get_event()));
+ std::move(*evt));
return seastar::now();
}
epoch_t epoch,
std::unique_ptr<PGCreateInfo> info)
{
- auto [fut, creating] = pg_map.get_pg(pgid, bool(info));
- if (!creating && info) {
- pg_map.set_creating(pgid);
- (void)handle_pg_create_info(std::move(info));
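+ // with a create info we may need to initiate PG creation (unless one is
+ // already in progress) and wait for it; otherwise the PG must already exist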
+ if (info) {
+ auto [fut, creating] = pg_map.wait_for_pg(pgid);
+ if (!creating) {
+ pg_map.set_creating(pgid);
+ (void)handle_pg_create_info(std::move(info));
+ }
+ return std::move(fut);
+ } else {
+ return make_ready_blocking_future<Ref<PG>>(pg_map.get_pg(pgid));
}
- return std::move(fut);
}
blocking_future<Ref<PG>> OSD::wait_for_pg(
spg_t pgid)
{
- return pg_map.get_pg(pgid).first;
+ return pg_map.wait_for_pg(pgid).first;
+}
+
+Ref<PG> OSD::get_pg(spg_t pgid)
+{
+ return pg_map.get_pg(pgid);
+}
+
+seastar::future<> OSD::prepare_to_stop()
+{
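+ // ask the monitors to mark us down before shutting down, but wait no
+ // longer than osd_mon_shutdown_timeout for the acknowledgement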
+ if (osdmap && osdmap->is_up(whoami)) {
+ state.set_prestop();
+ const auto timeout =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::duration<double>(
+ local_conf().get_val<double>("osd_mon_shutdown_timeout")));
+
+ return seastar::with_timeout(
+ seastar::timer<>::clock::now() + timeout,
+ monc->send_message(
+ make_message<MOSDMarkMeDown>(
+ monc->get_fsid(),
+ whoami,
+ osdmap->get_addrs(whoami),
+ osdmap->get_epoch(),
+ true)).then([this] {
+ return stop_acked.get_future();
+ })
+ ).handle_exception_type(
+ [](seastar::timed_out_error&) {
+ return seastar::now();
+ });
+ }
+ return seastar::now();
}
}