#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
-#include "messages/MOSDPGInfo.h"
-#include "messages/MOSDPGLog.h"
-#include "messages/MOSDPGNotify.h"
-#include "messages/MOSDPGQuery.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDRepOpReply.h"
#include "os/Transaction.h"
+#include "crimson/common/exception.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
#include "crimson/os/cyanstore/cyan_store.h"
#include "crimson/osd/pg_meta.h"
#include "crimson/osd/pg_backend.h"
#include "crimson/osd/ops_executer.h"
+#include "crimson/osd/osd_operations/osdop_params.h"
#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/pg_recovery.h"
+#include "crimson/osd/replicated_recovery_backend.h"
namespace {
seastar::logger& logger() {
coll_ref,
shard_services,
profile)),
+ recovery_backend(
+ std::make_unique<ReplicatedRecoveryBackend>(
+ *this, shard_services, coll_ref, backend.get())),
+ recovery_handler(
+ std::make_unique<PGRecovery>(this)),
peering_state(
shard_services.get_cct(),
pg_shard,
pgid,
PGPool(
- shard_services.get_cct(),
osdmap,
pgid.pool(),
pool,
// handle the peering event in the background
check_readable_timer.cancel();
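+  // the returned future is deliberately discarded; as noted above, the
+  // event is handled in the background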
check_readable_timer.set_callback([last_peering_reset, this] {
- shard_services.start_operation<LocalPeeringEvent>(
+ (void) shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
wait_for_active_blocker.on_active();
if (peering_state.needs_recovery()) {
- shard_services.start_operation<LocalPeeringEvent>(
+ logger().info("{}: requesting recovery",
+ __func__);
+ (void) shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
get_osdmap_epoch(),
PeeringState::DoRecovery{});
} else if (peering_state.needs_backfill()) {
- shard_services.start_operation<LocalPeeringEvent>(
+ logger().info("{}: requesting backfill",
+ __func__);
+ (void) shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
get_osdmap_epoch(),
PeeringState::RequestBackfill{});
} else {
- shard_services.start_operation<LocalPeeringEvent>(
+    logger().debug("{}: no need to recover or backfill, AllReplicasRecovered"
+      " for pg: {}", __func__, pgid);
+ (void) shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
get_osdmap_epoch(),
PeeringState::AllReplicasRecovered{});
}
+ backend->on_activate_complete();
}
void PG::prepare_write(pg_info_t &info,
}
}
-ghobject_t PG::do_delete_work(ceph::os::Transaction &t,
- ghobject_t _next)
+std::pair<ghobject_t, bool>
+PG::do_delete_work(ceph::os::Transaction &t, ghobject_t _next)
{
// TODO
shard_services.dec_pg_num();
+ return {_next, false};
+}
+
+void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
+{
+ // TODO: should update the stats upon finishing the scrub
+ peering_state.update_stats([scrub_level, this](auto& history, auto& stats) {
+ const utime_t now = ceph_clock_now();
+ history.last_scrub = peering_state.get_info().last_update;
+ history.last_scrub_stamp = now;
+ history.last_clean_scrub_stamp = now;
+ if (scrub_level == scrub_level_t::deep) {
+ history.last_deep_scrub = history.last_scrub;
+ history.last_deep_scrub_stamp = now;
+ }
+ // yes, please publish the stats
+ return true;
+ });
}
void PG::log_state_enter(const char *state) {
// handle the peering event in the background
renew_lease_timer.cancel();
renew_lease_timer.set_callback([last_peering_reset, this] {
- shard_services.start_operation<LocalPeeringEvent>(
+ (void) shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
{
- return PGMeta{store, pgid}.load(
- ).then([this, store](pg_info_t pg_info, PastIntervals past_intervals) {
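+  // don't start loading pg metadata if the osd is already shutting down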
+ if (__builtin_expect(stopping, false)) {
+ return seastar::make_exception_future<>(
+ crimson::common::system_shutdown_exception());
+ }
+
+ return seastar::do_with(PGMeta(store, pgid), [] (auto& pg_meta) {
+ return pg_meta.load();
+ }).then([this, store](auto&& ret) {
+ auto [pg_info, past_intervals] = std::move(ret);
return peering_state.init_from_disk_state(
std::move(pg_info),
std::move(past_intervals),
peering_state.set_role(rr);
epoch_t epoch = get_osdmap_epoch();
- shard_services.start_operation<LocalPeeringEvent>(
+ (void) shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
pg_whoami,
PGPeeringEvent& evt, PeeringCtx &rctx)
{
if (!peering_state.pg_has_reset_since(evt.get_epoch_requested())) {
- logger().debug("{} handling {}", __func__, evt.get_desc());
+ logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
do_peering_event(evt.get_event(), rctx);
} else {
logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
out << peering_state << " ";
}
+void PG::dump_primary(Formatter* f)
+{
+ peering_state.dump_peering_state(f);
+
+ f->open_array_section("recovery_state");
+ PeeringState::QueryState q(f);
+ peering_state.handle_event(q, 0);
+ f->close_section();
+
+ // TODO: snap_trimq
+ // TODO: scrubber state
+ // TODO: agent state
+}
std::ostream& operator<<(std::ostream& os, const PG& pg)
{
}
}
-seastar::future<> PG::submit_transaction(ObjectContextRef&& obc,
+seastar::future<> PG::WaitForActiveBlocker::stop()
+{
+ p.set_exception(crimson::common::system_shutdown_exception());
+ return seastar::now();
+}
+
+seastar::future<> PG::submit_transaction(const OpInfo& op_info,
+ const std::vector<OSDOp>& ops,
+ ObjectContextRef&& obc,
ceph::os::Transaction&& txn,
- const MOSDOp& req)
+  osd_op_params_t&& osd_op_p)
{
+ if (__builtin_expect(stopping, false)) {
+ return seastar::make_exception_future<>(
+ crimson::common::system_shutdown_exception());
+ }
+
epoch_t map_epoch = get_osdmap_epoch();
- eversion_t at_version{map_epoch, projected_last_update.version + 1};
+
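+  // at_version.epoch was stamped from the osdmap epoch when the op was
+  // prepared; a newer map here means the acting set may have changed,
+  // so give up rather than submit with a stale version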
+ if (__builtin_expect(osd_op_p.at_version.epoch != map_epoch, false)) {
+ throw crimson::common::actingset_changed(is_primary());
+ }
+
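+  // record the mutation in the pg log: MODIFY while the object exists,
+  // DELETE otherwise; user_version is recorded only for user-visible
+  // modifications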
+ std::vector<pg_log_entry_t> log_entries;
+ log_entries.emplace_back(obc->obs.exists ?
+ pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
+ obc->obs.oi.soid, osd_op_p.at_version, obc->obs.oi.version,
+ osd_op_p.user_modify ? osd_op_p.at_version.version : 0,
+ osd_op_p.req->get_reqid(), osd_op_p.req->get_mtime(),
+ op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
+ // TODO: refactor the submit_transaction
+ if (op_info.allows_returnvec()) {
+ // also the per-op values are recorded in the pg log
+ log_entries.back().set_op_returns(ops);
+ logger().debug("{} op_returns: {}",
+ __func__, log_entries.back().op_returns);
+ }
+ log_entries.back().clean_regions = std::move(osd_op_p.clean_regions);
+ peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version);
+ peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
+ txn, true, false);
+
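+  // the backend replicates the transaction together with the log entries
+  // to the acting/recovery/backfill set; the continuation below runs once
+  // the peers have acked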
return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
std::move(obc),
std::move(txn),
- req,
+ std::move(osd_op_p),
peering_state.get_last_peering_reset(),
map_epoch,
- at_version).then([this](auto acked) {
+ std::move(log_entries)).then(
+ [this, last_complete=peering_state.get_info().last_complete,
+ at_version=osd_op_p.at_version](auto acked) {
for (const auto& peer : acked) {
peering_state.update_peer_last_complete_ondisk(
peer.shard, peer.last_complete_ondisk);
}
+ peering_state.complete_write(at_version, last_complete);
return seastar::now();
});
}
+osd_op_params_t&& PG::fill_op_params_bump_pg_version(
+ osd_op_params_t&& osd_op_p,
+ Ref<MOSDOp> m,
+ const bool user_modify)
+{
+ osd_op_p.req = std::move(m);
+ osd_op_p.at_version = next_version();
+ osd_op_p.pg_trim_to = get_pg_trim_to();
+ osd_op_p.min_last_complete_ondisk = get_min_last_complete_ondisk();
+ osd_op_p.last_complete = get_info().last_complete;
+ if (user_modify) {
+ osd_op_p.user_at_version = osd_op_p.at_version.version;
+ }
+ return std::move(osd_op_p);
+}
+
+seastar::future<Ref<MOSDOpReply>> PG::handle_failed_op(
+ const std::error_code& e,
+ ObjectContextRef obc,
+ const OpsExecuter& ox,
+ const MOSDOp& m) const
+{
+  // Oops, an operation has failed. do_osd_ops(), together with
+  // OpsExecuter, has already dropped the ObjectStore::Transaction if
+  // there was any. However, this is not enough for a complete
+  // rollback: we handed OpsExecuter the single copy of `obc` that we
+  // maintain, and we did so for both reading and writing.
+  // Now all modifications must be reverted.
+  //
+  // Let's just reload from the store. Evicting from the shared
+  // LRU would be tricky, as the next MOSDOp (the one at the `get_obc`
+  // phase) could already have finished the lookup. Fortunately,
+  // this is supposed to live on cold paths, so performance is not
+  // a concern -- simplicity wins.
+  //
+  // The conditional's purpose is to efficiently handle hot errors
+  // which may appear as a result of e.g. CEPH_OSD_OP_CMPXATTR or
+  // CEPH_OSD_OP_OMAP_CMP. These are read-like ops and clients
+  // typically append them before any write. If OpsExecuter hasn't
+  // seen any modifying operation, `obc` is supposed to be kept
+  // unchanged.
+ assert(e.value() > 0);
+ const bool need_reload_obc = ox.has_seen_write();
+ logger().debug(
+ "{}: {} - object {} got error code {}, {}; need_reload_obc {}",
+ __func__,
+ m,
+ obc->obs.oi.soid,
+ e.value(),
+ e.message(),
+ need_reload_obc);
+ return (need_reload_obc ? reload_obc(*obc)
+ : load_obc_ertr::now()
+  // `e` must be captured by value: it refers to a parameter that may
+  // not outlive this asynchronous continuation
+  ).safe_then([e, &m, obc = std::move(obc), this] {
+ auto reply = make_message<MOSDOpReply>(
+ &m, -e.value(), get_osdmap_epoch(), 0, false);
+ reply->set_enoent_reply_versions(
+ peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
+ return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ }, load_obc_ertr::assert_all{ "can't live with object state messed up" });
+}
+
seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
Ref<MOSDOp> m,
- ObjectContextRef obc)
+ ObjectContextRef obc,
+ const OpInfo &op_info)
{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+
using osd_op_errorator = OpsExecuter::osd_op_errorator;
const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
: m->get_hobj();
- auto ox =
- std::make_unique<OpsExecuter>(obc, *this/* as const& */, m);
+ auto ox = std::make_unique<OpsExecuter>(
+ obc, op_info, get_pool().info, get_backend(), *m);
return crimson::do_for_each(
m->ops, [obc, m, ox = ox.get()](OSDOp& osd_op) {
logger().debug(
*m,
obc->obs.oi.soid,
ceph_osd_op_name(osd_op.op.op));
- return ox->execute_osd_op(osd_op);
- }).safe_then([this, obc, m, ox = ox.get()] {
+ return ox->execute_op(osd_op);
+ }).safe_then([this, obc, m, ox = ox.get(), &op_info] {
logger().debug(
"do_osd_ops: {} - object {} all operations successful",
*m,
obc->obs.oi.soid);
- return std::move(*ox).submit_changes(
- [this, m] (auto&& txn, auto&& obc) -> osd_op_errorator::future<> {
- // XXX: the entire lambda could be scheduled conditionally. ::if_then()?
- if (txn.empty()) {
- logger().debug(
- "do_osd_ops: {} - object {} txn is empty, bypassing mutate",
- *m,
- obc->obs.oi.soid);
- return osd_op_errorator::now();
- } else {
- logger().debug(
- "do_osd_ops: {} - object {} submitting txn",
- *m,
- obc->obs.oi.soid);
- return submit_transaction(std::move(obc), std::move(txn), *m);
- }
+ return std::move(*ox).flush_changes(
+ [m] (auto&& obc) -> osd_op_errorator::future<> {
+ logger().debug(
+ "do_osd_ops: {} - object {} txn is empty, bypassing mutate",
+ *m,
+ obc->obs.oi.soid);
+ return osd_op_errorator::now();
+ },
+ [this, m, &op_info] (auto&& txn,
+ auto&& obc,
+ auto&& osd_op_p,
+ bool user_modify) -> osd_op_errorator::future<> {
+ logger().debug(
+ "do_osd_ops: {} - object {} submitting txn",
+ *m,
+ obc->obs.oi.soid);
+ auto filled_osd_op_p = fill_op_params_bump_pg_version(
+ std::move(osd_op_p),
+ std::move(m),
+ user_modify);
+ return submit_transaction(
+ op_info,
+ filled_osd_op_p.req->ops,
+ std::move(obc),
+ std::move(txn),
+ std::move(filled_osd_op_p));
});
- }).safe_then([m, obc, this, ox_deleter = std::move(ox)] {
- auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
- 0, false);
+ }).safe_then([this,
+ m,
+ obc,
+ rvec = op_info.allows_returnvec()] {
+ // TODO: should stop at the first op which returns a negative retval,
+ // cmpext uses it for returning the index of first unmatched byte
+ int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
+ if (result > 0 && !rvec) {
+ result = 0;
+ }
+ auto reply = make_message<MOSDOpReply>(m.get(),
+ result,
+ get_osdmap_epoch(),
+ 0,
+ false);
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
logger().debug(
"do_osd_ops: {} - object {} sending reply",
*m,
obc->obs.oi.soid);
return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
- }, OpsExecuter::osd_op_errorator::all_same_way([=] (const std::error_code& e) {
- assert(e.value() > 0);
- logger().debug(
- "do_osd_ops: {} - object {} got error code {}, {}",
- *m,
- obc->obs.oi.soid,
- e.value(),
- e.message());
- auto reply = make_message<MOSDOpReply>(
- m.get(), -e.value(), get_osdmap_epoch(), 0, false);
- reply->set_enoent_reply_versions(peering_state.get_info().last_update,
- peering_state.get_info().last_user_version);
- return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
- })).handle_exception_type([=,&oid](const crimson::osd::error& e) {
+ }, osd_op_errorator::all_same_way([ox = ox.get(),
+ m,
+ obc,
+ this] (const std::error_code& e) {
+ return handle_failed_op(e, std::move(obc), *ox, *m);
+ })).handle_exception_type([ox_deleter = std::move(ox),
+ m,
+ obc,
+ this] (const crimson::osd::error& e) {
    // we need this handler because there are throwing paths which aren't errorated yet.
- logger().debug(
- "do_osd_ops: {} - object {} got unhandled exception {} ({})",
- *m,
- obc->obs.oi.soid,
- e.code(),
- e.what());
- auto reply = make_message<MOSDOpReply>(
- m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
- reply->set_enoent_reply_versions(peering_state.get_info().last_update,
- peering_state.get_info().last_user_version);
- return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ logger().debug("encountered the legacy error handling path!");
+ return handle_failed_op(e.code(), std::move(obc), *ox_deleter, *m);
});
}
seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
{
- auto ox = std::make_unique<OpsExecuter>(*this/* as const& */, m);
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+
+ auto ox = std::make_unique<PgOpsExecuter>(std::as_const(*this),
+ std::as_const(*m));
return seastar::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) {
logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op));
- return ox->execute_pg_op(osd_op);
+ return ox->execute_op(osd_op);
}).then([m, this, ox = std::move(ox)] {
auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
});
}
-std::pair<hobject_t, RWState::State> PG::get_oid_and_lock(
- const MOSDOp &m,
- const OpInfo &op_info)
+hobject_t PG::get_oid(const MOSDOp &m)
+{
+ return (m.get_snapid() == CEPH_SNAPDIR ?
+ m.get_hobj().get_head() :
+ m.get_hobj());
+}
+
+RWState::State PG::get_lock_type(const OpInfo &op_info)
{
- auto oid = m.get_snapid() == CEPH_SNAPDIR ?
- m.get_hobj().get_head() : m.get_hobj();
- RWState::State lock_type = RWState::RWNONE;
if (op_info.rwordered() && op_info.may_read()) {
- lock_type = RWState::RWState::RWEXCL;
+ return RWState::RWEXCL;
} else if (op_info.rwordered()) {
- lock_type = RWState::RWState::RWWRITE;
+ return RWState::RWWRITE;
} else {
ceph_assert(op_info.may_read());
- lock_type = RWState::RWState::RWREAD;
+ return RWState::RWREAD;
}
- return std::make_pair(oid, lock_type);
}
std::optional<hobject_t> PG::resolve_oid(
}
}
-PG::load_obc_ertr::future<
- std::pair<crimson::osd::ObjectContextRef, bool>>
-PG::get_or_load_clone_obc(hobject_t oid, ObjectContextRef head)
+template<RWState::State State>
+PG::load_obc_ertr::future<>
+PG::with_head_obc(hobject_t oid, with_obc_func_t&& func)
{
- ceph_assert(!oid.is_head());
- using ObjectContextRef = crimson::osd::ObjectContextRef;
- auto coid = resolve_oid(head->get_ro_ss(), oid);
- if (!coid) {
- return load_obc_ertr::make_ready_future<
- std::pair<crimson::osd::ObjectContextRef, bool>>(
- std::make_pair(ObjectContextRef(), true)
- );
- }
- auto [obc, existed] = shard_services.obc_registry.get_cached_obc(*coid);
- if (existed) {
- return load_obc_ertr::make_ready_future<
- std::pair<crimson::osd::ObjectContextRef, bool>>(
- std::make_pair(obc, true)
- );
- } else {
- bool got = obc->maybe_get_excl();
- ceph_assert(got);
- return backend->load_metadata(*coid).safe_then(
- [oid, obc=std::move(obc), head](auto &&md) mutable {
- obc->set_clone_state(std::move(md->os), std::move(head));
- return load_obc_ertr::make_ready_future<
- std::pair<crimson::osd::ObjectContextRef, bool>>(
- std::make_pair(obc, false)
- );
+ assert(oid.is_head());
+ auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid);
+ return obc->with_lock<State>(
+ [oid=std::move(oid), existed=existed, obc=std::move(obc),
+ func=std::move(func), this] {
+ auto loaded = load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
+ if (existed) {
+ logger().debug("with_head_obc: found {} in cache", oid);
+ } else {
+ logger().debug("with_head_obc: cache miss on {}", oid);
+ loaded = obc->with_promoted_lock<State>([this, obc] {
+ return load_head_obc(obc);
});
- }
+ }
+ return loaded.safe_then([func=std::move(func)](auto obc) {
+ return func(std::move(obc));
+ });
+ });
}
-PG::load_obc_ertr::future<
- std::pair<crimson::osd::ObjectContextRef, bool>>
-PG::get_or_load_head_obc(hobject_t oid)
+template<RWState::State State>
+PG::load_obc_ertr::future<>
+PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func)
{
- ceph_assert(oid.is_head());
- auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid);
- if (existed) {
+ assert(!oid.is_head());
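+  // a clone can only be resolved through its head's snapset, so lock
+  // and load the head obc (for read) first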
+ return with_head_obc<RWState::RWREAD>(oid.get_head(),
+ [oid, func=std::move(func), this](auto head) -> load_obc_ertr::future<> {
+ auto coid = resolve_oid(head->get_ro_ss(), oid);
+ if (!coid) {
+ // TODO: return crimson::ct_error::enoent::make();
+      logger().error("with_clone_obc: {} clone not found", oid);
+ return load_obc_ertr::make_ready_future<>();
+ }
+ auto [clone, existed] = shard_services.obc_registry.get_cached_obc(*coid);
+ return clone->template with_lock<State>(
+ [coid=*coid, existed=existed,
+ head=std::move(head), clone=std::move(clone),
+ func=std::move(func), this]() -> load_obc_ertr::future<> {
+ auto loaded = load_obc_ertr::make_ready_future<ObjectContextRef>(clone);
+ if (existed) {
+ logger().debug("with_clone_obc: found {} in cache", coid);
+ } else {
+ logger().debug("with_clone_obc: cache miss on {}", coid);
+ loaded = clone->template with_promoted_lock<State>(
+ [coid, clone, head, this] {
+ return backend->load_metadata(coid).safe_then(
+ [coid, clone=std::move(clone), head=std::move(head)](auto md) mutable {
+ clone->set_clone_state(std::move(md->os), std::move(head));
+ return clone;
+ });
+ });
+ }
+ return loaded.safe_then([func=std::move(func)](auto clone) {
+ return func(std::move(clone));
+ });
+ });
+ });
+}
+
+// explicitly instantiate the specializations used by other translation units
+template PG::load_obc_ertr::future<>
+PG::with_head_obc<RWState::RWNONE>(hobject_t, with_obc_func_t&&);
+
+PG::load_obc_ertr::future<crimson::osd::ObjectContextRef>
+PG::load_head_obc(ObjectContextRef obc)
+{
+ hobject_t oid = obc->get_oid();
+ return backend->load_metadata(oid).safe_then([obc=std::move(obc)](auto md)
+ -> load_obc_ertr::future<crimson::osd::ObjectContextRef> {
+ const hobject_t& oid = md->os.oi.soid;
logger().debug(
- "{}: found {} in cache",
- __func__,
- oid);
+ "load_head_obc: loaded obs {} for {}", md->os.oi, oid);
+ if (!md->ss) {
+ logger().error(
+ "load_head_obc: oid {} missing snapset", oid);
+ return crimson::ct_error::object_corrupted::make();
+ }
+ obc->set_head_state(std::move(md->os), std::move(*(md->ss)));
+ logger().debug(
+ "load_head_obc: returning obc {} for {}",
+ obc->obs.oi, obc->obs.oi.soid);
return load_obc_ertr::make_ready_future<
- std::pair<crimson::osd::ObjectContextRef, bool>>(
- std::make_pair(std::move(obc), true)
- );
- } else {
+ crimson::osd::ObjectContextRef>(obc);
+ });
+}
+
+PG::load_obc_ertr::future<>
+PG::reload_obc(crimson::osd::ObjectContext& obc) const
+{
+ assert(obc.is_head());
+ return backend->load_metadata(obc.get_oid()).safe_then([&obc](auto md)
+ -> load_obc_ertr::future<> {
logger().debug(
- "{}: cache miss on {}",
+ "{}: reloaded obs {} for {}",
__func__,
- oid);
- bool got = obc->maybe_get_excl();
- ceph_assert(got);
- return backend->load_metadata(oid).safe_then(
- [oid, obc=std::move(obc)](auto md) ->
- load_obc_ertr::future<
- std::pair<crimson::osd::ObjectContextRef, bool>>
- {
- logger().debug(
- "{}: loaded obs {} for {}",
- __func__,
- md->os.oi,
- oid);
- if (!md->ss) {
- logger().error(
- "{}: oid {} missing snapset",
- __func__,
- oid);
- return crimson::ct_error::object_corrupted::make();
- }
- obc->set_head_state(std::move(md->os), std::move(*(md->ss)));
- logger().debug(
- "{}: returning obc {} for {}",
- __func__,
- obc->obs.oi,
- obc->obs.oi.soid);
- return load_obc_ertr::make_ready_future<
- std::pair<crimson::osd::ObjectContextRef, bool>>(
- std::make_pair(obc, false)
- );
- });
- }
+ md->os.oi,
+ obc.get_oid());
+ if (!md->ss) {
+ logger().error(
+ "{}: oid {} missing snapset",
+ __func__,
+ obc.get_oid());
+ return crimson::ct_error::object_corrupted::make();
+ }
+ obc.set_head_state(std::move(md->os), std::move(*(md->ss)));
+ return load_obc_ertr::now();
+ });
}
-PG::load_obc_ertr::future<crimson::osd::ObjectContextRef>
-PG::get_locked_obc(
- Operation *op, const hobject_t &oid, RWState::State type)
-{
- return get_or_load_head_obc(oid.get_head()).safe_then(
- [this, op, oid, type](auto p) -> load_obc_ertr::future<ObjectContextRef>{
- auto &[head_obc, head_existed] = p;
- if (oid.is_head()) {
- if (head_existed) {
- return head_obc->get_lock_type(op, type).then([head_obc=head_obc] {
- ceph_assert(head_obc->loaded);
- return load_obc_ertr::make_ready_future<ObjectContextRef>(head_obc);
- });
- } else {
- head_obc->degrade_excl_to(type);
- return load_obc_ertr::make_ready_future<ObjectContextRef>(head_obc);
- }
- } else {
- return head_obc->get_lock_type(op, RWState::RWREAD).then(
- [this, head_obc=head_obc, oid] {
- ceph_assert(head_obc->loaded);
- return get_or_load_clone_obc(oid, head_obc);
- }).safe_then([head_obc=head_obc, op, oid, type](auto p) {
- auto &[obc, existed] = p;
- if (existed) {
- return load_obc_ertr::future<>(
- obc->get_lock_type(op, type)).safe_then([obc=obc] {
- ceph_assert(obc->loaded);
- return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
- });
- } else {
- obc->degrade_excl_to(type);
- return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
- }
- }).safe_then([head_obc=head_obc](auto obc) {
- head_obc->put_lock_type(RWState::RWREAD);
- return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
- });
- }
- });
+PG::load_obc_ertr::future<>
+PG::with_locked_obc(Ref<MOSDOp> &m, const OpInfo &op_info,
+ Operation *op, PG::with_obc_func_t &&f)
+{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+ const hobject_t oid = get_oid(*m);
+ switch (get_lock_type(op_info)) {
+ case RWState::RWREAD:
+ if (oid.is_head()) {
+ return with_head_obc<RWState::RWREAD>(oid, std::move(f));
+ } else {
+ return with_clone_obc<RWState::RWREAD>(oid, std::move(f));
+ }
+ case RWState::RWWRITE:
+ if (oid.is_head()) {
+ return with_head_obc<RWState::RWWRITE>(oid, std::move(f));
+ } else {
+ return with_clone_obc<RWState::RWWRITE>(oid, std::move(f));
+ }
+  case RWState::RWEXCL:
+    if (oid.is_head()) {
+      return with_head_obc<RWState::RWEXCL>(oid, std::move(f));
+    } else {
+      return with_clone_obc<RWState::RWEXCL>(oid, std::move(f));
+    }
+ default:
+ ceph_abort();
+ };
}
seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
{
+ if (__builtin_expect(stopping, false)) {
+ return seastar::make_exception_future<>(
+ crimson::common::system_shutdown_exception());
+ }
+
+ if (can_discard_replica_op(*req)) {
+ return seastar::now();
+ }
+
ceph::os::Transaction txn;
auto encoded_txn = req->get_data().cbegin();
decode(txn, encoded_txn);
+ auto p = req->logbl.cbegin();
+ std::vector<pg_log_entry_t> log_entries;
+ decode(log_entries, p);
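+  // append_log records the replicated log entries in the same transaction,
+  // so the op and its log update are applied atomically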
+ peering_state.append_log(std::move(log_entries), req->pg_trim_to,
+ req->version, req->min_last_complete_ondisk, txn, !txn.empty(), false);
return shard_services.get_store().do_transaction(coll_ref, std::move(txn))
.then([req, lcod=peering_state.get_info().last_complete, this] {
peering_state.update_last_complete_ondisk(lcod);
});
}
-void PG::handle_rep_op_reply(crimson::net::Connection* conn,
+void PG::handle_rep_op_reply(crimson::net::ConnectionRef conn,
const MOSDRepOpReply& m)
{
- backend->got_rep_op_reply(m);
+ if (!can_discard_replica_op(m)) {
+ backend->got_rep_op_reply(m);
+ }
+}
+
+template <typename MsgType>
+bool PG::can_discard_replica_op(const MsgType& m) const
+{
+  // a reply to a repop may arrive after its sender was marked down in a
+  // new osdmap but before this pg has advanced to that map. the primary
+  // resets the connection to the replica when handling the new osdmap
+  // marking it down, and also resets the messenger session when the
+  // replica reconnects. to avoid acting on such out-of-order replies,
+  // the messages from that replica should be discarded.
+ const auto osdmap = peering_state.get_osdmap();
+ const int from_osd = m.get_source().num();
+ if (osdmap->is_down(from_osd)) {
+ return true;
+ }
+ // Mostly, this overlaps with the old_peering_msg
+ // condition. An important exception is pushes
+ // sent by replicas not in the acting set, since
+ // if such a replica goes down it does not cause
+ // a new interval.
+ if (osdmap->get_down_at(from_osd) >= m.map_epoch) {
+ return true;
+ }
+ // same pg?
+ // if pg changes *at all*, we reset and repeer!
+ if (epoch_t lpr = peering_state.get_last_peering_reset();
+ lpr > m.map_epoch) {
+ logger().debug("{}: pg changed {} after {}, dropping",
+ __func__, get_info().history, m.map_epoch);
+ return true;
+ }
+ return false;
+}
+
+seastar::future<> PG::stop()
+{
+ logger().info("PG {} {}", pgid, __func__);
+ stopping = true;
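+  // tear down in dependency order: stop admitting new work first,
+  // then the recovery machinery, and the backend last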
+ return osdmap_gate.stop().then([this] {
+ return wait_for_active_blocker.stop();
+ }).then([this] {
+ return recovery_handler->stop();
+ }).then([this] {
+ return recovery_backend->stop();
+ }).then([this] {
+ return backend->stop();
+ });
+}
+
+void PG::on_change(ceph::os::Transaction &t) {
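+  // entering a new interval: in-flight recovery and replicated ops
+  // are no longer valid and must be interrupted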
+ recovery_backend->on_peering_interval_change(t);
+ backend->on_actingset_changed({ is_primary() });
+}
+
+bool PG::can_discard_op(const MOSDOp& m) const {
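+  // ops sent before the current primary took over are stale and can
+  // be dropped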
+ return __builtin_expect(m.get_map_epoch()
+ < peering_state.get_info().history.same_primary_since, false);
+}
+
+bool PG::is_degraded_or_backfilling_object(const hobject_t& soid) const {
+ /* The conditions below may clear (on_local_recover, before we queue
+ * the transaction) before we actually requeue the degraded waiters
+ * in on_global_recover after the transaction completes.
+ */
+ if (peering_state.get_pg_log().get_missing().get_items().count(soid))
+ return true;
+ ceph_assert(!get_acting_recovery_backfill().empty());
+ for (auto& peer : get_acting_recovery_backfill()) {
+ if (peer == get_primary()) continue;
+ auto peer_missing_entry = peering_state.get_peer_missing().find(peer);
+    // If the object is missing only on an async_recovery_target, classic
+    // OSD returns false so the op is not blocked and the object is async
+    // recovered later; that exemption is not implemented here yet, so any
+    // missing peer marks the object degraded.
+ if (peer_missing_entry != peering_state.get_peer_missing().end() &&
+ peer_missing_entry->second.get_items().count(soid)) {
+ return true;
+ }
+ // Object is degraded if after last_backfill AND
+ // we are backfilling it
+ if (is_backfill_target(peer) &&
+ peering_state.get_peer_info(peer).last_backfill <= soid &&
+ recovery_handler->backfill_state->get_last_backfill_started() >= soid &&
+ recovery_backend->is_recovering(soid)) {
+ return true;
+ }
+ }
+ return false;
}
}