#include "messages/MOSDMap.h"
#include "messages/MOSDMarkMeDown.h"
#include "messages/MOSDOp.h"
-#include "messages/MOSDPGLog.h"
-#include "messages/MOSDPGPull.h"
-#include "messages/MOSDPGPush.h"
-#include "messages/MOSDPGPushReply.h"
-#include "messages/MOSDPGRecoveryDelete.h"
-#include "messages/MOSDPGRecoveryDeleteReply.h"
+#include "messages/MOSDPeeringOp.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDScrub2.h"
#include "messages/MPGStats.h"
#include "crimson/admin/osd_admin.h"
#include "crimson/admin/pg_commands.h"
+#include "crimson/common/buffer_io.h"
#include "crimson/common/exception.h"
#include "crimson/mon/MonClient.h"
#include "crimson/net/Connection.h"
static constexpr int TICK_INTERVAL = 1;
}
+using std::make_unique;
+using std::map;
+using std::pair;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
using crimson::common::local_conf;
using crimson::os::FuturizedStore;
namespace crimson::osd {
OSD::OSD(int id, uint32_t nonce,
+ crimson::os::FuturizedStore& store,
crimson::net::MessengerRef cluster_msgr,
crimson::net::MessengerRef public_msgr,
crimson::net::MessengerRef hb_front_msgr,
public_msgr{public_msgr},
monc{new crimson::mon::Client{*public_msgr, *this}},
mgrc{new crimson::mgr::Client{*public_msgr, *this}},
- store{crimson::os::FuturizedStore::create(
- local_conf().get_val<std::string>("osd_objectstore"),
- local_conf().get_val<std::string>("osd_data"),
- local_conf().get_config_values())},
- shard_services{*this, whoami, *cluster_msgr, *public_msgr, *monc, *mgrc, *store},
+ store{store},
+ shard_services{*this, whoami, *cluster_msgr, *public_msgr, *monc, *mgrc, store},
heartbeat{new Heartbeat{whoami, shard_services, *monc, hb_front_msgr, hb_back_msgr}},
// do this in background
tick_timer{[this] {
update_stats();
}},
asok{seastar::make_lw_shared<crimson::admin::AdminSocket>()},
- osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services)))
+ osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services))),
+ log_client(cluster_msgr.get(), LogClient::NO_FLAGS),
+ clog(log_client.create_channel())
{
osdmaps[0] = boost::make_local_shared<OSDMap>();
for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
__func__, cpp_strerror(r));
}
}
+ logger().info("{}: nonce is {}", __func__, nonce);
+ monc->set_log_client(&log_client);
+ clog->set_log_to_monitors(true);
}
OSD::~OSD() = default;
+// Format a fresh object store for this OSD: start and mount the store,
+// create (or validate) the superblock, then persist identity metadata
+// ("ceph_fsid", "magic", "whoami", the osd key) and finally mark the
+// store "ready". mkfs/mount failures are fatal: they log and exit.
seastar::future<> OSD::mkfs(uuid_d osd_uuid, uuid_d cluster_fsid)
{
-  return store->start().then([this, osd_uuid] {
-    return store->mkfs(osd_uuid);
+  return store.start().then([this, osd_uuid] {
+    return store.mkfs(osd_uuid).handle_error(
+      crimson::stateful_ec::handle([] (const auto& ec) {
+        logger().error("error creating empty object store in {}: ({}) {}",
+          local_conf().get_val<std::string>("osd_data"),
+          ec.value(), ec.message());
+        std::exit(EXIT_FAILURE);
+      }));
  }).then([this] {
-    return store->mount();
+    return store.mount().handle_error(
+      crimson::stateful_ec::handle([] (const auto& ec) {
+        logger().error("error mounting object store in {}: ({}) {}",
+          local_conf().get_val<std::string>("osd_data"),
+          ec.value(), ec.message());
+        std::exit(EXIT_FAILURE);
+      }));
  }).then([cluster_fsid, this] {
    superblock.cluster_fsid = cluster_fsid;
-    superblock.osd_fsid = store->get_fsid();
+    superblock.osd_fsid = store.get_fsid();
    superblock.whoami = whoami;
    superblock.compat_features = get_osd_initial_compat_set();
-
-    logger().info(
-      "{} writing superblock cluster_fsid {} osd_fsid {}",
-      __func__,
-      cluster_fsid,
-      superblock.osd_fsid);
-    return store->create_new_collection(coll_t::meta());
-  }).then([this] (auto ch) {
-    meta_coll = make_unique<OSDMeta>(ch , store.get());
-    ceph::os::Transaction t;
-    meta_coll->create(t);
-    meta_coll->store_superblock(t, superblock);
-    return store->do_transaction(meta_coll->collection(), std::move(t));
+    // create the meta collection and write (or validate) the superblock
+    return _write_superblock();
+  }).then([cluster_fsid, this] {
+    return store.write_meta("ceph_fsid", cluster_fsid.to_string());
+  }).then([this] {
+    return store.write_meta("magic", CEPH_OSD_ONDISK_MAGIC);
+  }).then([this] {
+    return store.write_meta("whoami", std::to_string(whoami));
+  }).then([this] {
+    return _write_key_meta();
+  }).then([this] {
+    return store.write_meta("ready", "ready");
  }).then([cluster_fsid, this] {
-    return when_all_succeed(
-      store->write_meta("ceph_fsid", cluster_fsid.to_string()),
-      store->write_meta("whoami", std::to_string(whoami)));
-  }).then_unpack([cluster_fsid, this] {
    fmt::print("created object store {} for osd.{} fsid {}\n",
               local_conf().get_val<std::string>("osd_data"),
               whoami, cluster_fsid);
  });
}
+// Persist the in-memory superblock. If a meta collection already exists on
+// disk, load its superblock and verify the cluster fsid and osd id match
+// the ones we were given (throwing std::invalid_argument on mismatch);
+// otherwise create the meta collection and store the superblock in a
+// single transaction.
+seastar::future<> OSD::_write_superblock()
+{
+  return store.open_collection(coll_t::meta()).then([this] (auto ch) {
+    if (ch) {
+      // if we already have superblock, check if it matches
+      meta_coll = make_unique<OSDMeta>(ch, store);
+      return meta_coll->load_superblock().then([this](OSDSuperblock&& sb) {
+        if (sb.cluster_fsid != superblock.cluster_fsid) {
+          logger().error("provided cluster fsid {} != superblock's {}",
+                         sb.cluster_fsid, superblock.cluster_fsid);
+          throw std::invalid_argument("mismatched fsid");
+        }
+        if (sb.whoami != superblock.whoami) {
+          logger().error("provided osd id {} != superblock's {}",
+                         sb.whoami, superblock.whoami);
+          throw std::invalid_argument("mismatched osd id");
+        }
+      });
+    } else {
+      // meta collection does not exist yet; create it and write the superblock
+      logger().info(
+        "{} writing superblock cluster_fsid {} osd_fsid {}",
+        "_write_superblock",
+        superblock.cluster_fsid,
+        superblock.osd_fsid);
+      return store.create_new_collection(coll_t::meta()).then([this] (auto ch) {
+        meta_coll = make_unique<OSDMeta>(ch, store);
+        ceph::os::Transaction t;
+        meta_coll->create(t);
+        meta_coll->store_superblock(t, superblock);
+        logger().debug("OSD::_write_superblock: do_transaction...");
+        return store.do_transaction(meta_coll->collection(), std::move(t));
+      });
+    }
+  });
+}
+
+// This `to_string` sits in the `crimson::osd` namespace, so we don't break
+// the language rule against adding overloads in namespace `std::`.
+static std::string to_string(const seastar::temporary_buffer<char>& temp_buf)
+{
+  return {temp_buf.get(), temp_buf.size()};
+}
+
+// Record the OSD's auth key as "osd_key" store metadata: prefer the "key"
+// config value, fall back to reading the file named by "keyfile", and do
+// nothing if neither is configured. A keyfile read failure logs and aborts.
+seastar::future<> OSD::_write_key_meta()
+{
+
+  if (auto key = local_conf().get_val<std::string>("key"); !std::empty(key)) {
+    return store.write_meta("osd_key", key);
+  } else if (auto keyfile = local_conf().get_val<std::string>("keyfile");
+             !std::empty(keyfile)) {
+    return read_file(keyfile).then([this] (const auto& temp_buf) {
+      // it's on a truly cold path, so don't worry about memcpy.
+      return store.write_meta("osd_key", to_string(temp_buf));
+    }).handle_exception([keyfile] (auto ep) {
+      logger().error("_write_key_meta: failed to handle keyfile {}: {}",
+                     keyfile, ep);
+      ceph_abort();
+    });
+  } else {
+    return seastar::now();
+  }
+}
+
namespace {
entity_addrvec_t pick_addresses(int what) {
entity_addrvec_t addrs;
crimson::common::CephContext cct;
- if (int r = ::pick_addresses(&cct, what, &addrs, -1); r < 0) {
+ // we're interested solely in v2; crimson doesn't do v1
+ const auto flags = what | CEPH_PICK_ADDRESS_MSGR2;
+ if (int r = ::pick_addresses(&cct, flags, &addrs, -1); r < 0) {
throw std::runtime_error("failed to pick address");
}
for (auto addr : addrs.v) {
startup_time = ceph::mono_clock::now();
- return store->start().then([this] {
- return store->mount();
+ return store.start().then([this] {
+ return store.mount().handle_error(
+ crimson::stateful_ec::handle([] (const auto& ec) {
+ logger().error("error mounting object store in {}: ({}) {}",
+ local_conf().get_val<std::string>("osd_data"),
+ ec.value(), ec.message());
+ std::exit(EXIT_FAILURE);
+ }));
}).then([this] {
- return store->open_collection(coll_t::meta());
+ return store.open_collection(coll_t::meta());
}).then([this](auto ch) {
- meta_coll = make_unique<OSDMeta>(ch, store.get());
+ meta_coll = make_unique<OSDMeta>(ch, store);
return meta_coll->load_superblock();
}).then([this](OSDSuperblock&& sb) {
superblock = std::move(sb);
shard_services.update_map(map);
osdmap_gate.got_map(map->get_epoch());
osdmap = std::move(map);
+ bind_epoch = osdmap->get_epoch();
return load_pgs();
}).then([this] {
crimson::net::dispatchers_t dispatchers{this, monc.get(), mgrc.get()};
return seastar::when_all_succeed(
- cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
- local_conf()->ms_bind_port_min,
- local_conf()->ms_bind_port_max)
+ cluster_msgr->bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER))
.safe_then([this, dispatchers]() mutable {
return cluster_msgr->start(dispatchers);
}, crimson::net::Messenger::bind_ertr::all_same_way(
[] (const std::error_code& e) {
- logger().error("cluster messenger try_bind(): address range is unavailable.");
+ logger().error("cluster messenger bind(): {}", e);
ceph_abort();
})),
- public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
- local_conf()->ms_bind_port_min,
- local_conf()->ms_bind_port_max)
+ public_msgr->bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC))
.safe_then([this, dispatchers]() mutable {
return public_msgr->start(dispatchers);
}, crimson::net::Messenger::bind_ertr::all_same_way(
[] (const std::error_code& e) {
- logger().error("public messenger try_bind(): address range is unavailable.");
+ logger().error("public messenger bind(): {}", e);
ceph_abort();
})));
}).then_unpack([this] {
if (auto [addrs, changed] =
replace_unknown_addrs(cluster_msgr->get_myaddrs(),
public_msgr->get_myaddrs()); changed) {
+      logger().debug("replacing unknown addrs of cluster messenger");
return cluster_msgr->set_myaddrs(addrs);
} else {
return seastar::now();
}
}).then([this] {
- return heartbeat->start(public_msgr->get_myaddrs(),
- cluster_msgr->get_myaddrs());
+ return heartbeat->start(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
+ pick_addresses(CEPH_PICK_ADDRESS_CLUSTER));
}).then([this] {
// create the admin-socket server, and the objects that register
// to handle incoming commands
return start_asok_admin();
+ }).then([this] {
+ return log_client.set_fsid(monc->get_fsid());
}).then([this] {
return start_boot();
});
{
state.set_booting();
- logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs());
- logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs());
- logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr());
- auto m = make_message<MOSDBoot>(superblock,
- osdmap->get_epoch(),
+ entity_addrvec_t public_addrs = public_msgr->get_myaddrs();
+ entity_addrvec_t cluster_addrs = cluster_msgr->get_myaddrs();
+ entity_addrvec_t hb_back_addrs = heartbeat->get_back_addrs();
+ entity_addrvec_t hb_front_addrs = heartbeat->get_front_addrs();
+ if (cluster_msgr->set_addr_unknowns(public_addrs)) {
+ cluster_addrs = cluster_msgr->get_myaddrs();
+ }
+ if (heartbeat->get_back_msgr()->set_addr_unknowns(cluster_addrs)) {
+ hb_back_addrs = heartbeat->get_back_addrs();
+ }
+ if (heartbeat->get_front_msgr()->set_addr_unknowns(public_addrs)) {
+ hb_front_addrs = heartbeat->get_front_addrs();
+ }
+ logger().info("hb_back_msgr: {}", hb_back_addrs);
+ logger().info("hb_front_msgr: {}", hb_front_addrs);
+ logger().info("cluster_msgr: {}", cluster_addrs);
+
+ auto m = crimson::make_message<MOSDBoot>(superblock,
osdmap->get_epoch(),
- heartbeat->get_back_addrs(),
- heartbeat->get_front_addrs(),
- cluster_msgr->get_myaddrs(),
+ boot_epoch,
+ hb_back_addrs,
+ hb_front_addrs,
+ cluster_addrs,
CEPH_FEATURES_ALL);
collect_sys_info(&m->metadata, NULL);
- return monc->send_message(m);
+ return monc->send_message(std::move(m));
}
seastar::future<> OSD::_add_me_to_crush()
w >= 0) {
return seastar::make_ready_future<double>(w);
} else {
- return store->stat().then([](auto st) {
+ return store.stat().then([](auto st) {
auto total = st.total;
return seastar::make_ready_future<double>(
std::max(.00001,
"weight": {:.4f},
"args": [{}]
}})", whoami, weight, loc);
- return monc->run_command({cmd}, {});
+ return monc->run_command(std::move(cmd), {});
}).then([](auto&& command_result) {
[[maybe_unused]] auto [code, message, out] = std::move(command_result);
if (code) {
auto asok_path = local_conf().get_val<std::string>("admin_socket");
using namespace crimson::admin;
return asok->start(asok_path).then([this] {
- return seastar::when_all_succeed(
- asok->register_admin_commands(),
- asok->register_command(make_asok_hook<OsdStatusHook>(std::as_const(*this))),
- asok->register_command(make_asok_hook<SendBeaconHook>(*this)),
- asok->register_command(make_asok_hook<FlushPgStatsHook>(*this)),
- asok->register_command(make_asok_hook<DumpPGStateHistory>(std::as_const(*this))),
- asok->register_command(make_asok_hook<SeastarMetricsHook>()),
- // PG commands
- asok->register_command(make_asok_hook<pg::QueryCommand>(*this)),
- asok->register_command(make_asok_hook<pg::MarkUnfoundLostCommand>(*this)));
- }).then_unpack([] {
- return seastar::now();
+ asok->register_admin_commands();
+ asok->register_command(make_asok_hook<OsdStatusHook>(std::as_const(*this)));
+ asok->register_command(make_asok_hook<SendBeaconHook>(*this));
+ asok->register_command(make_asok_hook<FlushPgStatsHook>(*this));
+ asok->register_command(make_asok_hook<DumpPGStateHistory>(std::as_const(*this)));
+ asok->register_command(make_asok_hook<DumpMetricsHook>());
+ asok->register_command(make_asok_hook<DumpPerfCountersHook>());
+ asok->register_command(make_asok_hook<InjectDataErrorHook>(get_shard_services()));
+ asok->register_command(make_asok_hook<InjectMDataErrorHook>(get_shard_services()));
+ // PG commands
+ asok->register_command(make_asok_hook<pg::QueryCommand>(*this));
+ asok->register_command(make_asok_hook<pg::MarkUnfoundLostCommand>(*this));
});
}
seastar::future<> OSD::stop()
{
logger().info("stop");
+ beacon_timer.cancel();
+ tick_timer.cancel();
// see also OSD::shutdown()
return prepare_to_stop().then([this] {
state.set_stopping();
return asok->stop().then([this] {
return heartbeat->stop();
}).then([this] {
- return store->umount();
+ return store.umount();
}).then([this] {
- return store->stop();
+ return store.stop();
}).then([this] {
return seastar::parallel_for_each(pg_map.get_pgs(),
[](auto& p) {
}).then([this] {
return when_all_succeed(
public_msgr->shutdown(),
- cluster_msgr->shutdown());
- }).then_unpack([] {
- return seastar::now();
+ cluster_msgr->shutdown()).discard_result();
}).handle_exception([](auto ep) {
logger().error("error while stopping osd: {}", ep);
});
seastar::future<> OSD::load_pgs()
{
- return store->list_collections().then([this](auto colls) {
+ return store.list_collections().then([this](auto colls) {
return seastar::parallel_for_each(colls, [this](auto coll) {
spg_t pgid;
if (coll.is_pg(&pgid)) {
auto get_collection = [pgid, do_create, this] {
const coll_t cid{pgid};
if (do_create) {
- return store->create_new_collection(cid);
+ return store.create_new_collection(cid);
} else {
- return store->open_collection(cid);
+ return store.open_collection(cid);
}
};
return seastar::when_all(
{
logger().debug("{}: {}", __func__, pgid);
- return seastar::do_with(PGMeta(store.get(), pgid), [] (auto& pg_meta) {
+ return seastar::do_with(PGMeta(store, pgid), [](auto& pg_meta) {
return pg_meta.get_epoch();
}).then([this](epoch_t e) {
return get_map(e);
}).then([pgid, this] (auto&& create_map) {
return make_pg(std::move(create_map), pgid, false);
}).then([this](Ref<PG> pg) {
- return pg->read_state(store.get()).then([pg] {
+ return pg->read_state(&store).then([pg] {
return seastar::make_ready_future<Ref<PG>>(std::move(pg));
});
}).handle_exception([pgid](auto ep) {
osd_stat.hb_peers = heartbeat->get_peers();
osd_stat.seq = (static_cast<uint64_t>(get_up_epoch()) << 32) | osd_stat_seq;
gate.dispatch_in_background("statfs", *this, [this] {
- (void) store->stat().then([this](store_statfs_t&& st) {
+ (void) store.stat().then([this](store_statfs_t&& st) {
osd_stat.statfs = st;
});
});
}
-MessageRef OSD::get_stats() const
+MessageURef OSD::get_stats() const
{
// todo: m-to-n: collect stats using map-reduce
// MPGStats::had_map_for is not used since PGMonitor was removed
- auto m = make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
+ auto m = crimson::make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
m->osd_stat = osd_stat;
for (auto [pgid, pg] : pg_map.get_pgs()) {
if (pg->is_primary()) {
auto [pg, startmap] = std::move(ret);
if (!pg)
return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
- PeeringCtx rctx{ceph_release_t::octopus};
const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());
int up_primary, acting_primary;
int role = startmap->calc_pg_role(pg_shard_t(whoami, info->pgid.shard),
acting);
+ PeeringCtx rctx;
create_pg_collection(
rctx.transaction,
info->pgid,
acting_primary,
info->history,
info->past_intervals,
- false,
rctx.transaction);
return shard_services.start_operation<PGAdvanceMap>(
superblock.clean_thru = last;
}
meta_coll->store_superblock(t, superblock);
- return store->do_transaction(meta_coll->collection(), std::move(t));
+ logger().debug("OSD::handle_osd_map: do_transaction...");
+ return store.do_transaction(meta_coll->collection(), std::move(t));
});
}).then([=] {
// TODO: write to superblock and commit the transaction
}
});
}).then([m, this] {
- if (osdmap->is_up(whoami) &&
- osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
- bind_epoch < osdmap->get_up_from(whoami)) {
- if (state.is_booting()) {
+ if (osdmap->is_up(whoami)) {
+ const auto up_from = osdmap->get_up_from(whoami);
+ logger().info("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
+ whoami, osdmap->get_epoch(), up_from, bind_epoch, state);
+ if (bind_epoch < up_from &&
+ osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
+ state.is_booting()) {
logger().info("osd.{}: activating...", whoami);
state.set_active();
beacon_timer.arm_periodic(
tick_timer.arm_periodic(
std::chrono::seconds(TICK_INTERVAL));
}
- } else if (!osdmap->is_up(whoami)) {
+ } else {
if (state.is_prestop()) {
got_stop_ack();
return seastar::now();
}
}
- check_osdmap_features();
- // yay!
- return consume_map(osdmap->get_epoch());
+ return check_osdmap_features().then([this] {
+ // yay!
+ return consume_map(osdmap->get_epoch());
+ });
}).then([m, this] {
if (state.is_active()) {
logger().info("osd.{}: now active", whoami);
- if (!osdmap->exists(whoami)) {
+ if (!osdmap->exists(whoami) ||
+ osdmap->is_stop(whoami)) {
return shutdown();
}
if (should_restart()) {
if (first >= superblock.oldest_map) {
return load_map_bls(first, superblock.newest_map)
.then([this, conn, first](auto&& bls) {
- auto m = make_message<MOSDMap>(monc->get_fsid(),
+ auto m = crimson::make_message<MOSDMap>(monc->get_fsid(),
osdmap->get_encoding_features());
m->oldest_map = first;
m->newest_map = superblock.newest_map;
m->maps = std::move(bls);
- return conn->send(m);
+ return conn->send(std::move(m));
});
} else {
return load_map_bl(osdmap->get_epoch())
.then([this, conn](auto&& bl) mutable {
- auto m = make_message<MOSDMap>(monc->get_fsid(),
+ auto m = crimson::make_message<MOSDMap>(monc->get_fsid(),
osdmap->get_encoding_features());
m->oldest_map = superblock.oldest_map;
m->newest_map = superblock.newest_map;
m->maps.emplace(osdmap->get_epoch(), std::move(bl));
- return conn->send(m);
+ return conn->send(std::move(m));
});
}
}
// FIXME: min lec should be calculated from pg_stat
// and should set m->pgs
epoch_t min_last_epoch_clean = osdmap->get_epoch();
- auto m = make_message<MOSDBeacon>(osdmap->get_epoch(),
+ auto m = crimson::make_message<MOSDBeacon>(osdmap->get_epoch(),
min_last_epoch_clean,
superblock.last_purged_snaps_scrub,
local_conf()->osd_beacon_report_interval);
- return monc->send_message(m);
+ return monc->send_message(std::move(m));
}
void OSD::update_heartbeat_peers()
return seastar::now();
}
+// Require authorized heartbeat peers and persist the osdmap's required OSD
+// release under the "require_osd_release" store metadata key (hence the
+// change from void to a future return type).
-void OSD::check_osdmap_features()
+seastar::future<> OSD::check_osdmap_features()
{
  heartbeat->set_require_authorizer(true);
+  return store.write_meta("require_osd_release",
+                          stringify((int)osdmap->require_osd_release));
}
seastar::future<> OSD::consume_map(epoch_t epoch)
return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
return shard_services.start_operation<PGAdvanceMap>(
*this, pg.second, pg.second->get_osdmap_epoch(), epoch,
- PeeringCtx{ceph_release_t::octopus}, false).second;
+ PeeringCtx{}, false).second;
}).then([epoch, this] {
osdmap_gate.got_map(epoch);
return seastar::make_ready_future();
return seastar::with_timeout(
seastar::timer<>::clock::now() + timeout,
monc->send_message(
- make_message<MOSDMarkMeDown>(
+ crimson::make_message<MOSDMarkMeDown>(
monc->get_fsid(),
whoami,
osdmap->get_addrs(whoami),