+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
#include "osd.h"
+#include <sys/utsname.h>
+
+#include <boost/iterator/counting_iterator.hpp>
#include <boost/range/join.hpp>
#include <boost/smart_ptr/make_local_shared.hpp>
+#include <fmt/format.h>
+#include <fmt/ostream.h>
#include "common/pick_address.h"
+#include "include/util.h"
+
+#include "messages/MOSDAlive.h"
#include "messages/MOSDBeacon.h"
#include "messages/MOSDBoot.h"
#include "messages/MOSDMap.h"
+#include "messages/MOSDOp.h"
+#include "messages/MOSDPGLog.h"
+#include "messages/MOSDRepOpReply.h"
+#include "messages/MPGStats.h"
+
+#include "os/Transaction.h"
+#include "osd/ClassHandler.h"
+#include "osd/PGPeeringEvent.h"
+#include "osd/PeeringState.h"
+
+#include "crimson/mon/MonClient.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
-#include "crimson/os/cyan_collection.h"
-#include "crimson/os/cyan_object.h"
-#include "crimson/os/cyan_store.h"
-#include "crimson/os/Transaction.h"
+#include "crimson/os/cyanstore/cyan_object.h"
+#include "crimson/os/futurized_collection.h"
+#include "crimson/os/futurized_store.h"
#include "crimson/osd/heartbeat.h"
#include "crimson/osd/osd_meta.h"
#include "crimson/osd/pg.h"
+#include "crimson/osd/pg_backend.h"
#include "crimson/osd/pg_meta.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_operations/compound_peering_request.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/pg_advance_map.h"
+#include "crimson/osd/osd_operations/replicated_request.h"
namespace {
+  // File-local accessor for the seastar logger of the OSD subsystem.
  seastar::logger& logger() {
-    return ceph::get_logger(ceph_subsys_osd);
-  }
-
-  template<typename Message, typename... Args>
-  Ref<Message> make_message(Args&&... args)
-  {
-    return {new Message{std::forward<Args>(args)...}, false};
+    return crimson::get_logger(ceph_subsys_osd);
  }
+  // Tick period; it is passed to std::chrono::seconds elsewhere in this file.
  static constexpr int TICK_INTERVAL = 1;
}
-using ceph::common::local_conf;
-using ceph::os::CyanStore;
+using crimson::common::local_conf;
+using crimson::os::FuturizedStore;
+
+namespace crimson::osd {
-OSD::OSD(int id, uint32_t nonce)
+// Constructs the OSD around four caller-supplied messengers.
+// The member initializers create the mon and mgr clients on the public
+// messenger, the object store selected by the "osd_objectstore" and
+// "osd_data" options, the shard services, the heartbeat (on the two hb
+// messengers) and the admin socket. The body then wires authentication and
+// optionally pre-opens the object classes.
+OSD::OSD(int id, uint32_t nonce,
+         crimson::net::MessengerRef cluster_msgr,
+         crimson::net::MessengerRef public_msgr,
+         crimson::net::MessengerRef hb_front_msgr,
+         crimson::net::MessengerRef hb_back_msgr)
  : whoami{id},
    nonce{nonce},
-    beacon_timer{[this] { send_beacon(); }},
-    heartbeat_timer{[this] { update_heartbeat_peers(); }}
+    // do this in background: the future returned by send_beacon() is
+    // deliberately discarded
+    beacon_timer{[this] { (void)send_beacon(); }},
+    cluster_msgr{cluster_msgr},
+    public_msgr{public_msgr},
+    monc{new crimson::mon::Client{*public_msgr, *this}},
+    mgrc{new crimson::mgr::Client{*public_msgr, *this}},
+    store{crimson::os::FuturizedStore::create(
+      local_conf().get_val<std::string>("osd_objectstore"),
+      local_conf().get_val<std::string>("osd_data"),
+      local_conf().get_config_values())},
+    shard_services{*this, *cluster_msgr, *public_msgr, *monc, *mgrc, *store},
+    heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}},
+    // do this in background: the future returned by update_heartbeat_peers()
+    // is deliberately discarded
+    heartbeat_timer{[this] { (void)update_heartbeat_peers(); }},
+    asok{seastar::make_lw_shared<crimson::admin::AdminSocket>()},
+    osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services)))
{
  osdmaps[0] = boost::make_local_shared<OSDMap>();
+  // the mon client acts as both auth server and auth client for every
+  // messenger handed to us
+  for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
+                    std::ref(hb_front_msgr), std::ref(hb_back_msgr)}) {
+    msgr.get()->set_auth_server(monc.get());
+    msgr.get()->set_auth_client(monc.get());
+  }
+
+  // a failure to open the classes is only logged; construction continues
+  if (local_conf()->osd_open_classes_on_start) {
+    const int r = ClassHandler::get_instance().open_all_classes();
+    if (r) {
+      logger().warn("{} warning: got an error loading one or more classes: {}",
+                    __func__, cpp_strerror(r));
+    }
+  }
}
OSD::~OSD() = default;
}
}
-seastar::future<> OSD::mkfs(uuid_d cluster_fsid)
+// Formats the object store for this OSD.
+// Steps, in order: start the store, mkfs it with osd_uuid, mount it, fill in
+// the superblock, create the meta collection, persist the superblock in one
+// transaction, then write the "ceph_fsid" and "whoami" meta keys.
+// @param osd_uuid      uuid handed to store->mkfs()
+// @param cluster_fsid  cluster id recorded in the superblock and in "ceph_fsid"
+seastar::future<> OSD::mkfs(uuid_d osd_uuid, uuid_d cluster_fsid)
{
-  const auto data_path = local_conf().get_val<std::string>("osd_data");
-  store = std::make_unique<ceph::os::CyanStore>(data_path);
-  uuid_d osd_fsid;
-  osd_fsid.generate_random();
-  return store->mkfs(osd_fsid).then([this] {
+  return store->start().then([this, osd_uuid] {
+    return store->mkfs(osd_uuid);
+  }).then([this] {
    return store->mount();
-  }).then([cluster_fsid, osd_fsid, this] {
+  }).then([cluster_fsid, this] {
    superblock.cluster_fsid = cluster_fsid;
-    superblock.osd_fsid = osd_fsid;
+    // the osd fsid is read back from the store rather than taken from osd_uuid
+    superblock.osd_fsid = store->get_fsid();
    superblock.whoami = whoami;
    superblock.compat_features = get_osd_initial_compat_set();
-    meta_coll = make_unique<OSDMeta>(
-      store->create_new_collection(coll_t::meta()), store.get());
+    logger().info(
+      "{} writing superblock cluster_fsid {} osd_fsid {}",
+      __func__,
+      cluster_fsid,
+      superblock.osd_fsid);
+    return store->create_new_collection(coll_t::meta());
+  }).then([this] (auto ch) {
+    meta_coll = make_unique<OSDMeta>(ch , store.get());
    ceph::os::Transaction t;
    meta_coll->create(t);
    meta_coll->store_superblock(t, superblock);
    return store->do_transaction(meta_coll->collection(), std::move(t));
  }).then([cluster_fsid, this] {
-    store->write_meta("ceph_fsid", cluster_fsid.to_string());
-    store->write_meta("whoami", std::to_string(whoami));
+    // both meta writes are issued together and awaited as a group
+    return when_all_succeed(
+      store->write_meta("ceph_fsid", cluster_fsid.to_string()),
+      store->write_meta("whoami", std::to_string(whoami)));
+  }).then([cluster_fsid, this] {
+    fmt::print("created object store {} for osd.{} fsid {}\n",
+               local_conf().get_val<std::string>("osd_data"),
+               whoami, cluster_fsid);
    return seastar::now();
  });
}
namespace {
entity_addrvec_t pick_addresses(int what) {
entity_addrvec_t addrs;
- CephContext cct;
+ crimson::common::CephContext cct;
if (int r = ::pick_addresses(&cct, what, &addrs, -1); r < 0) {
throw std::runtime_error("failed to pick address");
}
- // TODO: v2: ::pick_addresses() returns v2 addresses, but crimson-msgr does
- // not support v2 yet. remove following set_type() once v2 support is ready.
for (auto addr : addrs.v) {
- addr.set_type(addr.TYPE_LEGACY);
logger().info("picked address {}", addr);
}
return addrs;
{
logger().info("start");
- return seastar::when_all_succeed(
- ceph::net::Messenger::create(entity_name_t::OSD(whoami),
- "cluster",
- nonce,
- seastar::engine().cpu_id())
- .then([this] (auto msgr) { cluster_msgr = msgr; }),
- ceph::net::Messenger::create(entity_name_t::OSD(whoami),
- "client",
- nonce,
- seastar::engine().cpu_id())
- .then([this] (auto msgr) { public_msgr = msgr; }))
- .then([this] {
- monc.reset(new ceph::mon::Client{*public_msgr});
- heartbeat.reset(new Heartbeat{whoami, nonce, *this, *monc});
-
- for (auto msgr : {cluster_msgr, public_msgr}) {
- if (local_conf()->ms_crc_data) {
- msgr->set_crc_data();
- }
- if (local_conf()->ms_crc_header) {
- msgr->set_crc_header();
- }
- }
- dispatchers.push_front(this);
- dispatchers.push_front(monc.get());
+ startup_time = ceph::mono_clock::now();
- const auto data_path = local_conf().get_val<std::string>("osd_data");
- store = std::make_unique<ceph::os::CyanStore>(data_path);
+ return store->start().then([this] {
return store->mount();
}).then([this] {
- meta_coll = make_unique<OSDMeta>(store->open_collection(coll_t::meta()),
- store.get());
+ return store->open_collection(coll_t::meta());
+ }).then([this](auto ch) {
+ meta_coll = make_unique<OSDMeta>(ch, store.get());
return meta_coll->load_superblock();
}).then([this](OSDSuperblock&& sb) {
superblock = std::move(sb);
return get_map(superblock.current_epoch);
}).then([this](cached_map_t&& map) {
+ shard_services.update_map(map);
+ osdmap_gate.got_map(map->get_epoch());
osdmap = std::move(map);
return load_pgs();
}).then([this] {
+
+ uint64_t osd_required =
+ CEPH_FEATURE_UID |
+ CEPH_FEATURE_PGID64 |
+ CEPH_FEATURE_OSDENC;
+ using crimson::net::SocketPolicy;
+
+ public_msgr->set_default_policy(SocketPolicy::stateless_server(0));
+ public_msgr->set_policy(entity_name_t::TYPE_MON,
+ SocketPolicy::lossy_client(osd_required));
+ public_msgr->set_policy(entity_name_t::TYPE_MGR,
+ SocketPolicy::lossy_client(osd_required));
+ public_msgr->set_policy(entity_name_t::TYPE_OSD,
+ SocketPolicy::stateless_server(0));
+
+ cluster_msgr->set_default_policy(SocketPolicy::stateless_server(0));
+ cluster_msgr->set_policy(entity_name_t::TYPE_MON,
+ SocketPolicy::lossy_client(0));
+ cluster_msgr->set_policy(entity_name_t::TYPE_OSD,
+ SocketPolicy::lossless_peer(osd_required));
+ cluster_msgr->set_policy(entity_name_t::TYPE_CLIENT,
+ SocketPolicy::stateless_server(0));
+
+ dispatchers.push_front(this);
+ dispatchers.push_front(monc.get());
+ dispatchers.push_front(mgrc.get());
return seastar::when_all_succeed(
cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max)
.then([this] { return public_msgr->start(&dispatchers); }));
}).then([this] {
- return monc->start();
+ return seastar::when_all_succeed(monc->start(),
+ mgrc->start());
+ }).then([this] {
+ return _add_me_to_crush();
}).then([this] {
monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0);
monc->sub_want("mgrmap", 0, 0);
if (auto [addrs, changed] =
replace_unknown_addrs(cluster_msgr->get_myaddrs(),
public_msgr->get_myaddrs()); changed) {
- cluster_msgr->set_myaddrs(addrs);
+ return cluster_msgr->set_myaddrs(addrs);
+ } else {
+ return seastar::now();
}
+ }).then([this] {
return heartbeat->start(public_msgr->get_myaddrs(),
cluster_msgr->get_myaddrs());
+ }).then([this] {
+ // create the admin-socket server, and the objects that register
+ // to handle incoming commands
+ return start_asok_admin();
}).then([this] {
return start_boot();
});
logger().warn("osdmap NOUP flag is set, waiting for it to clear");
} else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it");
- } else if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) {
- logger().error("osdmap require_osd_release < luminous; please upgrade to luminous");
+ } else if (osdmap->require_osd_release < ceph_release_t::octopus) {
+ logger().error("osdmap require_osd_release < octopus; please upgrade to octopus");
} else if (false) {
// TODO: update mon if current fullness state is different from osdmap
} else if (version_t n = local_conf()->osd_map_message_max;
}
// get all the latest maps
if (osdmap->get_epoch() + 1 >= oldest) {
- return osdmap_subscribe(osdmap->get_epoch() + 1, false);
+ return shard_services.osdmap_subscribe(osdmap->get_epoch() + 1, false);
} else {
- return osdmap_subscribe(oldest - 1, true);
+ return shard_services.osdmap_subscribe(oldest - 1, true);
}
}
heartbeat->get_front_addrs(),
cluster_msgr->get_myaddrs(),
CEPH_FEATURES_ALL);
+ collect_sys_info(&m->metadata, NULL);
return monc->send_message(m);
}
+// Asks the monitor to place this OSD in the CRUSH map via
+// "osd crush create-or-move". Does nothing unless
+// "osd_crush_update_on_start" is enabled. The weight comes from
+// "osd_crush_initial_weight" when that is non-negative, otherwise from the
+// store's total size expressed in units of 2^40 bytes (floored at .00001).
+// Throws std::runtime_error when the monitor returns a non-zero code.
+seastar::future<> OSD::_add_me_to_crush()
+{
+  if (!local_conf().get_val<bool>("osd_crush_update_on_start")) {
+    return seastar::now();
+  }
+  auto get_weight = [this] {
+    const auto configured =
+      local_conf().get_val<double>("osd_crush_initial_weight");
+    if (configured >= 0) {
+      return seastar::make_ready_future<double>(configured);
+    }
+    // no explicit weight configured: derive one from the store capacity
+    return store->stat().then([](auto st) {
+      const double capacity = double(st.total) / double(1ull << 40); // TB
+      return seastar::make_ready_future<double>(std::max(.00001, capacity));
+    });
+  };
+  return get_weight().then([this](auto weight) {
+    // NOTE(review): the CephContext is a temporary that dies at the end of
+    // this statement -- confirm CrushLocation does not keep the pointer
+    const crimson::crush::CrushLocation loc{make_unique<CephContext>().get()};
+    logger().info("{} crush location is {}", __func__, loc);
+    string cmd = fmt::format(R"({{
+      "prefix": "osd crush create-or-move",
+      "id": {},
+      "weight": {:.4f},
+      "args": [{}]
+    }})", whoami, weight, loc);
+    return monc->run_command({cmd}, {});
+  }).then([](int32_t code, string message, bufferlist) {
+    if (!code) {
+      logger().info("added to crush: {}", message);
+    } else {
+      logger().warn("fail to add to crush: {} ({})", message, code);
+      throw std::runtime_error("fail to add to crush");
+    }
+    return seastar::now();
+  });
+}
+
+// Sends an MOSDAlive to the monitor for the current osdmap epoch.
+// Nothing is sent when this OSD is absent from the osdmap or when the
+// epoch is not newer than up_thru_wanted; otherwise up_thru_wanted is
+// advanced before the message goes out.
+seastar::future<> OSD::_send_alive()
+{
+  const auto want = osdmap->get_epoch();
+  logger().info(
+    "{} want {} up_thru_wanted {}",
+    __func__,
+    want,
+    up_thru_wanted);
+  if (!osdmap->exists(whoami)) {
+    logger().warn("{} DNE", __func__);
+    return seastar::now();
+  }
+  if (want <= up_thru_wanted) {
+    logger().debug("{} {} <= {}", __func__, want, up_thru_wanted);
+    return seastar::now();
+  }
+  up_thru_wanted = want;
+  return monc->send_message(
+    make_message<MOSDAlive>(osdmap->get_epoch(), want));
+}
+
+/*
+  Starts the admin socket at the path given by the "admin_socket" option and
+  registers two groups of commands on it:
+    - the common commands the AdminSocket object handles itself
+      (register_admin_commands());
+    - hooks created here: OSD status, send-beacon, and config show/get/set.
+*/
+seastar::future<> OSD::start_asok_admin()
+{
+  using namespace crimson::admin;
+  const auto socket_path = local_conf().get_val<std::string>("admin_socket");
+  return asok->start(socket_path).then([this] {
+    return seastar::when_all_succeed(
+      asok->register_admin_commands(),
+      asok->register_command(make_asok_hook<OsdStatusHook>(*this)),
+      asok->register_command(make_asok_hook<SendBeaconHook>(*this)),
+      asok->register_command(make_asok_hook<ConfigShowHook>()),
+      asok->register_command(make_asok_hook<ConfigGetHook>()),
+      asok->register_command(make_asok_hook<ConfigSetHook>()));
+  });
+}
+
+// Shuts the OSD down. After marking the state as stopping and closing the
+// gate, it stops in order: admin socket, heartbeat, mon client, both
+// messengers (together), then unmounts and stops the store.
+// Any exception raised along the chain is logged and swallowed, so the
+// returned future does not fail because of it.
seastar::future<> OSD::stop()
{
+  logger().info("stop");
  // see also OSD::shutdown()
  state.set_stopping();
+
  return gate.close().then([this] {
+    return asok->stop();
+  }).then([this] {
    return heartbeat->stop();
  }).then([this] {
    return monc->stop();
  }).then([this] {
-    return public_msgr->shutdown();
+    return when_all_succeed(
+      public_msgr->shutdown(),
+      cluster_msgr->shutdown());
+  }).then([this] {
+    return store->umount();
  }).then([this] {
-    return cluster_msgr->shutdown();
+    return store->stop();
+  }).handle_exception([](auto ep) {
+    logger().error("error while stopping osd: {}", ep);
  });
}
+// Writes a status summary into the formatter: the cluster and osd fsids,
+// whoami and the oldest/newest map epochs from the superblock, the state
+// string, and the number of PGs in pg_map.
+void OSD::dump_status(Formatter* f) const
+{
+  const auto& sb = superblock;
+  f->dump_stream("cluster_fsid") << sb.cluster_fsid;
+  f->dump_stream("osd_fsid") << sb.osd_fsid;
+  f->dump_unsigned("whoami", sb.whoami);
+  f->dump_string("state", state.to_string());
+  f->dump_unsigned("oldest_map", sb.oldest_map);
+  f->dump_unsigned("newest_map", sb.newest_map);
+  const auto pg_count = pg_map.get_pgs().size();
+  f->dump_unsigned("num_pgs", pg_count);
+}
+
seastar::future<> OSD::load_pgs()
{
- return seastar::parallel_for_each(store->list_collections(),
- [this](auto coll) {
+ return store->list_collections().then([this](auto colls) {
+ return seastar::parallel_for_each(colls, [this](auto coll) {
spg_t pgid;
if (coll.is_pg(&pgid)) {
return load_pg(pgid).then([pgid, this](auto&& pg) {
logger().info("load_pgs: loaded {}", pgid);
- pgs.emplace(pgid, std::move(pg));
+ pg_map.pg_loaded(pgid, std::move(pg));
+ shard_services.inc_pg_num();
return seastar::now();
});
} else if (coll.is_temp(&pgid)) {
return seastar::now();
}
});
+ });
}
-seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
+// Builds a PG object for pgid.
+// @param create_map  osdmap used to look up the pool; also handed to the PG
+// @param pgid        placement group to instantiate
+// @param do_create   true to create the PG's collection in the store,
+//                    false to open the existing one
+// @return the new PG; this function does not call read_state() or init()
+seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map,
+                                      spg_t pgid,
+                                      bool do_create)
{
  using ec_profile_t = map<string,string>;
-  return PGMeta{store.get(), pgid}.get_epoch().then([this](epoch_t e) {
-    return get_map(e);
-  }).then([pgid, this] (auto&& create_map) {
+  // yields (pg_pool_t, pool name, erasure-code profile) for the PG's pool
+  auto get_pool_info = [create_map, pgid, this] {
    if (create_map->have_pg_pool(pgid.pool())) {
      pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
      string name = create_map->get_pool_name(pgid.pool());
      ec_profile_t ec_profile;
      if (pi.is_erasure()) {
-	ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile);
+        ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile);
      }
-      return seastar::make_ready_future<pg_pool_t,
-					string,
-					ec_profile_t>(std::move(pi),
-						      std::move(name),
-						      std::move(ec_profile));
+      return seastar::make_ready_future<pg_pool_t, string, ec_profile_t>(
+        std::move(pi),
+        std::move(name),
+        std::move(ec_profile));
    } else {
      // pool was deleted; grab final pg_pool_t off disk.
      return meta_coll->load_final_pool_info(pgid.pool());
    }
-  }).then([this](pg_pool_t&& pool, string&& name, ec_profile_t&& ec_profile) {
-    Ref<PG> pg{new PG{std::move(pool),
-		      std::move(name),
-		      std::move(ec_profile)}};
-    return seastar::make_ready_future<Ref<PG>>(std::move(pg));
+  };
+  // yields the collection backing the PG, created or opened per do_create
+  auto get_collection = [pgid, do_create, this] {
+    const coll_t cid{pgid};
+    if (do_create) {
+      return store->create_new_collection(cid);
+    } else {
+      return store->open_collection(cid);
+    }
+  };
+  // both lookups are handed to when_all_succeed; the PG is constructed once
+  // pool info and collection are both available
+  return seastar::when_all_succeed(
+    std::move(get_pool_info),
+    std::move(get_collection)
+  ).then([pgid, create_map, this] (auto info,
+                                   auto coll) {
+    auto [pool, name, ec_profile] = std::move(info);
+    return seastar::make_ready_future<Ref<PG>>(
+      new PG{pgid,
+             pg_shard_t{whoami, pgid.shard},
+             std::move(coll),
+             std::move(pool),
+             std::move(name),
+             create_map,
+             shard_services,
+             ec_profile});
  });
}
-seastar::future<> OSD::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m)
+// Loads an existing PG from the store: reads the epoch recorded in the PG's
+// metadata, fetches the osdmap of that epoch, builds the PG over its
+// existing collection (make_pg with do_create=false) and reads its state.
+// Any failure along the way is logged and then aborts the process.
+seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
+{
+  return PGMeta{store.get(), pgid}.get_epoch().then([this](epoch_t e) {
+    return get_map(e);
+  }).then([pgid, this] (auto&& create_map) {
+    return make_pg(std::move(create_map), pgid, false);
+  }).then([this](Ref<PG> pg) {
+    // the capture keeps the PG alive until read_state() has completed
+    return pg->read_state(store.get()).then([pg] {
+      return seastar::make_ready_future<Ref<PG>>(pg);
+    });
+  }).handle_exception([pgid](auto ep) {
+    logger().info("pg {} saw exception on load {}", pgid, ep);
+    // ceph_abort() ignores its argument; ceph_abort_msg() reports the text
+    ceph_abort_msg("Could not load pg");
+    // not reached; kept so the lambda's return type can be deduced
+    return seastar::make_exception_future<Ref<PG>>(ep);
+  });
+}
+
+// Messenger entry point: routes an incoming message by type.
+// Everything is dropped once the OSD is stopping. Unknown message types are
+// logged at info level and ignored. The returned future is ready immediately
+// for the cases handled inline here.
+seastar::future<> OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
{
-  logger().info("ms_dispatch {}", *m);
  if (state.is_stopping()) {
    return seastar::now();
  }
  switch (m->get_type()) {
  case CEPH_MSG_OSD_MAP:
    return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
+  case CEPH_MSG_OSD_OP:
+    return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
+  case MSG_OSD_PG_CREATE2:
+    // started as an operation; its completion is not awaited here
+    shard_services.start_operation<CompoundPeeringRequest>(
+      *this,
+      conn->get_shared(),
+      m);
+    return seastar::now();
+  // all of the following peering messages share one handler
+  case MSG_OSD_PG_LEASE:
+    [[fallthrough]];
+  case MSG_OSD_PG_LEASE_ACK:
+    [[fallthrough]];
+  case MSG_OSD_PG_NOTIFY2:
+    [[fallthrough]];
+  case MSG_OSD_PG_INFO2:
+    [[fallthrough]];
+  case MSG_OSD_PG_QUERY2:
+    [[fallthrough]];
+  case MSG_OSD_PG_LOG:
+    return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
+  case MSG_OSD_REPOP:
+    return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
+  case MSG_OSD_REPOPREPLY:
+    return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
  default:
+    logger().info("{} unhandled message {}", __func__, *m);
    return seastar::now();
  }
}
-seastar::future<> OSD::ms_handle_connect(ceph::net::ConnectionRef conn)
+seastar::future<> OSD::ms_handle_connect(crimson::net::ConnectionRef conn)
{
if (conn->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
return seastar::now();
}
}
-seastar::future<> OSD::ms_handle_reset(ceph::net::ConnectionRef conn)
+// Connection-reset callback: currently only logs a warning; conn is unused.
+seastar::future<> OSD::ms_handle_reset(crimson::net::ConnectionRef conn)
{
  // TODO: cleanup the session attached to this connection
  logger().warn("ms_handle_reset");
  return seastar::now();
}
-seastar::future<> OSD::ms_handle_remote_reset(ceph::net::ConnectionRef conn)
+// Remote-reset callback: only logs a warning; conn is unused.
+seastar::future<> OSD::ms_handle_remote_reset(crimson::net::ConnectionRef conn)
{
  logger().warn("ms_handle_remote_reset");
  return seastar::now();
}
+// Authentication callback. Not implemented yet: both arguments are ignored
+// and nothing is recorded.
+void OSD::handle_authentication(const EntityName& name,
+                                const AuthCapsInfo& caps)
+{
+  // todo
+}
+
+// Builds an MPGStats message carrying the stats of every PG in pg_map for
+// which this OSD is primary. Each entry's reported_epoch is set to the
+// current osdmap epoch.
+MessageRef OSD::get_stats()
+{
+  // todo: m-to-n: collect stats using map-reduce
+  // MPGStats::had_map_for is not used since PGMonitor was removed
+  auto m = make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
+
+  // bind by const reference so the map entry (key and PG handle) is not
+  // copied on every iteration
+  for (const auto& [pgid, pg] : pg_map.get_pgs()) {
+    if (pg->is_primary()) {
+      auto stats = pg->get_stats();
+      // todo: update reported_epoch,reported_seq,last_fresh
+      stats.reported_epoch = osdmap->get_epoch();
+      m->pg_stat.emplace(pgid.pgid, std::move(stats));
+    }
+  }
+  return m;
+}
+
OSD::cached_map_t OSD::get_map() const
{
return osdmap;
if (auto found = osdmaps.find(e); found) {
return seastar::make_ready_future<cached_map_t>(std::move(found));
} else {
- return load_map_bl(e).then([e, this](bufferlist bl) {
- auto osdmap = std::make_unique<OSDMap>();
- osdmap->decode(bl);
+ return load_map(e).then([e, this](unique_ptr<OSDMap> osdmap) {
return seastar::make_ready_future<cached_map_t>(
osdmaps.insert(e, std::move(osdmap)));
});
}
}
+// Returns the OSDMap for epoch e. For e > 0 the encoded map is read with
+// load_map_bl() and decoded; otherwise a default-constructed OSDMap is
+// returned without reading anything.
+seastar::future<std::unique_ptr<OSDMap>> OSD::load_map(epoch_t e)
+{
+  auto fresh = std::make_unique<OSDMap>();
+  if (e > 0) {
+    return load_map_bl(e).then(
+      [decoded=std::move(fresh)](bufferlist bl) mutable {
+        decoded->decode(bl);
+        return seastar::make_ready_future<unique_ptr<OSDMap>>(
+          std::move(decoded));
+      });
+  }
+  return seastar::make_ready_future<unique_ptr<OSDMap>>(std::move(fresh));
+}
+
seastar::future<> OSD::store_maps(ceph::os::Transaction& t,
epoch_t start, Ref<MOSDMap> m)
{
- return seastar::do_for_each(boost::counting_iterator<epoch_t>(start),
- boost::counting_iterator<epoch_t>(m->get_last() + 1),
+ return seastar::do_for_each(boost::make_counting_iterator(start),
+ boost::make_counting_iterator(m->get_last() + 1),
[&t, m, this](epoch_t e) {
if (auto p = m->maps.find(e); p != m->maps.end()) {
auto o = std::make_unique<OSDMap>();
return seastar::now();
} else if (auto p = m->incremental_maps.find(e);
p != m->incremental_maps.end()) {
- OSDMap::Incremental inc;
- auto i = p->second.cbegin();
- inc.decode(i);
- return load_map_bl(e - 1)
- .then([&t, e, inc=std::move(inc), this](bufferlist bl) {
- auto o = std::make_unique<OSDMap>();
- o->decode(bl);
- o->apply_incremental(inc);
- bufferlist fbl;
- o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
- store_map_bl(t, e, std::move(fbl));
- osdmaps.insert(e, std::move(o));
- return seastar::now();
+ return load_map(e - 1).then([e, bl=p->second, &t, this](auto o) {
+ OSDMap::Incremental inc;
+ auto i = bl.cbegin();
+ inc.decode(i);
+ o->apply_incremental(inc);
+ bufferlist fbl;
+ o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
+ store_map_bl(t, e, std::move(fbl));
+ osdmaps.insert(e, std::move(o));
+ return seastar::now();
});
} else {
logger().error("MOSDMap lied about what maps it had?");
});
}
-seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request)
+// Checks that a message arrived from a monitor.
+// @return true when conn's peer is a mon; otherwise logs the peer address
+//         and the message at info level and returns false
+bool OSD::require_mon_peer(crimson::net::Connection *conn, Ref<Message> m)
{
-  logger().info("{}({})", __func__, epoch);
-  if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
-      force_request) {
-    return monc->renew_subs();
-  } else {
-    return seastar::now();
+  if (!conn->peer_is_mon()) {
+    logger().info("{} received from non-mon {}, {}",
+                  __func__,
+                  conn->get_peer_addr(),
+                  *m);
+    return false;
  }
+  return true;
}
-seastar::future<> OSD::handle_osd_map(ceph::net::ConnectionRef conn,
+// Creates and initializes a new PG described by info.
+// Stage 1 fetches the osdmap of info->epoch and, for monitor-initiated
+// creates, drops the request when the pool is missing from the current
+// osdmap or no longer carries FLAG_CREATING; otherwise it builds the PG with
+// make_pg(..., do_create=true).
+// Stage 2 computes up/acting sets and the role from that start map, queues
+// the collection creation and on-disk init in the peering transaction, calls
+// pg->init(), and starts a PGAdvanceMap operation up to the current osdmap
+// epoch.
+// @return the PG once the PGAdvanceMap operation's future resolves, or a
+//         null Ref<PG> when the create was dropped
+seastar::future<Ref<PG>> OSD::handle_pg_create_info(
+  std::unique_ptr<PGCreateInfo> info) {
+  // do_with keeps info alive for the whole continuation chain, which is why
+  // the lambdas below may capture it by reference
+  return seastar::do_with(
+    std::move(info),
+    [this](auto &info) -> seastar::future<Ref<PG>> {
+      return get_map(info->epoch).then(
+        [&info, this](cached_map_t startmap) ->
+        seastar::future<Ref<PG>, cached_map_t> {
+          const spg_t &pgid = info->pgid;
+          if (info->by_mon) {
+            int64_t pool_id = pgid.pgid.pool();
+            // note: the pool is looked up in the current osdmap, not startmap
+            const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
+            if (!pool) {
+              logger().debug(
+                "{} ignoring pgid {}, pool dne",
+                __func__,
+                pgid);
+              return seastar::make_ready_future<Ref<PG>, cached_map_t>(
+                Ref<PG>(),
+                startmap);
+            }
+            ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
+            if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
+              // this ensures we do not process old creating messages after the
+              // pool's initial pgs have been created (and pg are subsequently
+              // allowed to split or merge).
+              logger().debug(
+                "{} dropping {} create, pool does not have CREATING flag set",
+                __func__,
+                pgid);
+              return seastar::make_ready_future<Ref<PG>, cached_map_t>(
+                Ref<PG>(),
+                startmap);
+            }
+          }
+          return make_pg(startmap, pgid, true).then(
+            [startmap=std::move(startmap)](auto pg) mutable {
+              return seastar::make_ready_future<Ref<PG>, cached_map_t>(
+                std::move(pg),
+                std::move(startmap));
+            });
+        }).then([this, &info](auto pg, auto startmap) ->
+                seastar::future<Ref<PG>> {
+          // a null pg means stage 1 dropped the create
+          if (!pg)
+            return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
+          PeeringCtx rctx{ceph_release_t::octopus};
+          const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());
+
+          int up_primary, acting_primary;
+          vector<int> up, acting;
+          startmap->pg_to_up_acting_osds(
+            info->pgid.pgid, &up, &up_primary, &acting, &acting_primary);
+
+          int role = startmap->calc_pg_role(pg_shard_t(whoami, info->pgid.shard),
+                                            acting);
+
+          create_pg_collection(
+            rctx.transaction,
+            info->pgid,
+            info->pgid.get_split_bits(pp->get_pg_num()));
+          init_pg_ondisk(
+            rctx.transaction,
+            info->pgid,
+            pp);
+
+          pg->init(
+            role,
+            up,
+            up_primary,
+            acting,
+            acting_primary,
+            info->history,
+            info->past_intervals,
+            false,
+            rctx.transaction);
+
+          // advance the new PG from its own epoch to the current osdmap
+          // epoch; rctx (with the queued transaction) moves into the operation
+          return shard_services.start_operation<PGAdvanceMap>(
+            *this, pg, pg->get_osdmap_epoch(),
+            osdmap->get_epoch(), std::move(rctx), true).second.then([pg] {
+              return seastar::make_ready_future<Ref<PG>>(pg);
+            });
+        });
+    });
+}
+
+seastar::future<> OSD::handle_osd_map(crimson::net::Connection* conn,
Ref<MOSDMap> m)
{
logger().info("handle_osd_map {}", *m);
const auto first = m->get_first();
const auto last = m->get_last();
-
+ logger().info("handle_osd_map epochs [{}..{}], i have {}, src has [{}..{}]",
+ first, last, superblock.newest_map, m->oldest_map, m->newest_map);
// make sure there is something new, here, before we bother flushing
// the queues and such
if (last <= superblock.newest_map) {
logger().info("handle_osd_map message skips epochs {}..{}",
start, first - 1);
if (m->oldest_map <= start) {
- return osdmap_subscribe(start, false);
+ return shard_services.osdmap_subscribe(start, false);
}
// always try to get the full range of maps--as many as we can. this
// 1- is good to have
// 2- is at present the only way to ensure that we get a *full* map as
// the first map!
if (m->oldest_map < first) {
- return osdmap_subscribe(m->oldest_map - 1, true);
+ return shard_services.osdmap_subscribe(m->oldest_map - 1, true);
}
skip_maps = true;
start = first;
{
logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last);
// advance through the new maps
- return seastar::parallel_for_each(boost::irange(first, last + 1),
- [this](epoch_t cur) {
+ return seastar::do_for_each(boost::make_counting_iterator(first),
+ boost::make_counting_iterator(last + 1),
+ [this](epoch_t cur) {
return get_map(cur).then([this](cached_map_t&& o) {
osdmap = std::move(o);
- if (up_epoch != 0 &&
+ shard_services.update_map(osdmap);
+ if (up_epoch == 0 &&
osdmap->is_up(whoami) &&
osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
up_epoch = osdmap->get_epoch();
std::chrono::seconds(TICK_INTERVAL));
}
}
-
+ check_osdmap_features();
+ // yay!
+ return consume_map(osdmap->get_epoch());
+ }).then([m, this] {
if (state.is_active()) {
logger().info("osd.{}: now active", whoami);
if (!osdmap->exists(whoami)) {
});
}
+// Starts a ClientRequest operation for an incoming MOSDOp and returns
+// right away; the operation's own completion is not awaited here.
+seastar::future<> OSD::handle_osd_op(crimson::net::Connection* conn,
+                                     Ref<MOSDOp> m)
+{
+  auto shared_conn = conn->get_shared();
+  (void)shard_services.start_operation<ClientRequest>(
+    *this,
+    std::move(shared_conn),
+    std::move(m));
+  return seastar::now();
+}
+
+// Finishes decoding an incoming MOSDRepOp, starts a RepRequest operation
+// for it and returns right away without awaiting the operation.
+seastar::future<> OSD::handle_rep_op(crimson::net::Connection* conn,
+                                     Ref<MOSDRepOp> m)
+{
+  m->finish_decode();
+  auto shared_conn = conn->get_shared();
+  (void)shard_services.start_operation<RepRequest>(
+    *this,
+    std::move(shared_conn),
+    std::move(m));
+  return seastar::now();
+}
+
+// Forwards a replication-op reply to the PG named in the message.
+// A reply for a PG that is not in pg_map is logged as stale and dropped.
+seastar::future<> OSD::handle_rep_op_reply(crimson::net::Connection* conn,
+                                           Ref<MOSDRepOpReply> m)
+{
+  const auto& pgs = pg_map.get_pgs();
+  const auto found = pgs.find(m->get_spg());
+  if (found == pgs.end()) {
+    logger().warn("stale reply: {}", *m);
+    return seastar::now();
+  }
+  // the payload is only decoded once a target PG is known
+  m->finish_decode();
+  found->second->handle_rep_op_reply(conn, *m);
+  return seastar::now();
+}
+
bool OSD::should_restart() const
{
if (!osdmap->is_up(whoami)) {
seastar::future<> OSD::restart()
{
+ beacon_timer.cancel();
+ heartbeat_timer.cancel();
up_epoch = 0;
bind_epoch = osdmap->get_epoch();
// TODO: promote to shutdown if being marked down for multiple times
+// Sends an MOSDBeacon to the monitor carrying the current osdmap epoch, a
+// min_last_epoch_clean placeholder (also the current epoch) and the
+// superblock's last_purged_snaps_scrub. Does nothing unless the OSD is active.
seastar::future<> OSD::send_beacon()
{
+  if (!state.is_active()) {
+    return seastar::now();
+  }
  // FIXME: min lec should be calculated from pg_stat
  //        and should set m->pgs
  epoch_t min_last_epoch_clean = osdmap->get_epoch();
  auto m = make_message<MOSDBeacon>(osdmap->get_epoch(),
-                                    min_last_epoch_clean);
+                                    min_last_epoch_clean,
+                                    superblock.last_purged_snaps_scrub);
  return monc->send_message(m);
}
-void OSD::update_heartbeat_peers()
+// Registers every OSD in the up or acting set of any PG in pg_map as a
+// heartbeat peer (skipping CRUSH_ITEM_NONE and this OSD itself), then asks
+// the heartbeat to update its peers. Does nothing unless the OSD is active.
+seastar::future<> OSD::update_heartbeat_peers()
{
  if (!state.is_active()) {
-    return;
+    return seastar::now();
  }
-  for (auto& pg : pgs) {
+  for (auto& pg : pg_map.get_pgs()) {
    vector<int> up, acting;
    osdmap->pg_to_up_acting_osds(pg.first.pgid,
                                 &up, nullptr,
                                 &acting, nullptr);
-    for (auto osd : boost::join(up, acting)) {
-      if (osd != CRUSH_ITEM_NONE) {
+    // walk up and acting as one sequence
+    for (int osd : boost::join(up, acting)) {
+      if (osd == CRUSH_ITEM_NONE || osd == whoami) {
+        continue;
+      } else {
        heartbeat->add_peer(osd, osdmap->get_epoch());
      }
    }
  }
-  heartbeat->update_peers(whoami);
+  return heartbeat->update_peers(whoami);
+}
+
+// Turns an incoming peering message into a RemotePeeringEvent operation for
+// the PG named in the message and returns right away; the operation's
+// completion is not awaited here.
+seastar::future<> OSD::handle_peering_op(
+  crimson::net::Connection* conn,
+  Ref<MOSDPeeringOp> m)
+{
+  const int from = m->get_source().num();
+  logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
+  // get_event() returns a raw pointer whose pointee is moved from below.
+  // Owning it here frees the moved-from shell instead of leaking it
+  // (assumes get_event() heap-allocates, as the MOSDPeeringOp message
+  // implementations do -- confirm against the message headers).
+  std::unique_ptr<PGPeeringEvent> evt{m->get_event()};
+  shard_services.start_operation<RemotePeeringEvent>(
+    *this,
+    conn->get_shared(),
+    shard_services,
+    pg_shard_t{from, m->get_spg().shard},
+    m->get_spg(),
+    std::move(*evt));
+  return seastar::now();
+}
+
+// Currently just turns on the authorizer requirement on the heartbeat; it
+// does not inspect the osdmap.
+void OSD::check_osdmap_features()
+{
+  heartbeat->set_require_authorizer(true);
+}
+
+// Advances every PG in pg_map to the given epoch by starting one
+// PGAdvanceMap operation per PG, and reports the epoch to osdmap_gate once
+// all of those operations' futures have resolved.
+seastar::future<> OSD::consume_map(epoch_t epoch)
+{
+  // todo: m-to-n: broadcast this news to all shards
+  auto &pgs = pg_map.get_pgs();
+  // capture explicitly: [=] would capture `this` implicitly, which is
+  // deprecated in C++20
+  return seastar::parallel_for_each(pgs.begin(), pgs.end(),
+                                    [this, epoch](auto& pg) {
+    return shard_services.start_operation<PGAdvanceMap>(
+      *this, pg.second, pg.second->get_osdmap_epoch(), epoch,
+      PeeringCtx{ceph_release_t::octopus}, false).second;
+  }).then([epoch, this] {
+    osdmap_gate.got_map(epoch);
+    return seastar::now();
+  });
+}
+
+
+// Returns a future for the PG pgid. When creation info is supplied and the
+// PG is not already being created, the PG is marked as creating and its
+// creation is kicked off in the background. The epoch argument is unused.
+blocking_future<Ref<PG>>
+OSD::get_or_create_pg(
+  spg_t pgid,
+  epoch_t epoch,
+  std::unique_ptr<PGCreateInfo> info)
+{
+  const bool wants_create = static_cast<bool>(info);
+  auto [fut, creating] = pg_map.get_pg(pgid, wants_create);
+  if (wants_create && !creating) {
+    pg_map.set_creating(pgid);
+    // fire and forget: the caller waits on fut, not on the creation future
+    (void)handle_pg_create_info(std::move(info));
+  }
+  return std::move(fut);
+}
+
+// Returns the future half of pg_map.get_pg(pgid); no creation info is
+// passed, so this never initiates creation itself.
+blocking_future<Ref<PG>> OSD::wait_for_pg(
+  spg_t pgid)
+{
+  return pg_map.get_pg(pgid).first;
+}
+
}