-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
#include "pg.h"
#include "crimson/osd/pg_recovery.h"
#include "crimson/osd/replicated_recovery_backend.h"
+using std::ostream;
+using std::set;
+using std::string;
+using std::vector;
+
namespace {
seastar::logger& logger() {
return crimson::get_logger(ceph_subsys_osd);
PG::~PG() {}
bool PG::try_flush_or_schedule_async() {
+ logger().debug("PG::try_flush_or_schedule_async: do_transaction...");
(void)shard_services.get_store().do_transaction(
coll_ref,
ObjectStore::Transaction()).then(
return false;
}
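+// only the primary publishes PG stats; non-primaries return without touching pg_stats.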
+void PG::publish_stats_to_osd()
+{
+ if (!is_primary())
+ return;
+ if (auto new_pg_stats = peering_state.prepare_stats_for_publish(
+ pg_stats,
+ object_stat_collection_t());
+ new_pg_stats.has_value()) {
+ pg_stats = std::move(new_pg_stats);
+ }
+}
+
+void PG::clear_publish_stats()
+{
+ pg_stats.reset();
+}
+
+pg_stat_t PG::get_stats() const
+{
+ return pg_stats.value_or(pg_stat_t{});
+}
+
void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay)
{
// handle the peering event in the background
shard_services,
pg_whoami,
pgid,
+ float(0.001),
get_osdmap_epoch(),
get_osdmap_epoch(),
PeeringState::DoRecovery{});
shard_services,
pg_whoami,
pgid,
+ float(0.001),
get_osdmap_epoch(),
get_osdmap_epoch(),
PeeringState::RequestBackfill{});
shard_services,
pg_whoami,
pgid,
+ float(0.001),
get_osdmap_epoch(),
get_osdmap_epoch(),
PeeringState::AllReplicasRecovered{});
}
+ publish_stats_to_osd();
backend->on_activate_complete();
}
const vector<int>& newacting, int new_acting_primary,
const pg_history_t& history,
const PastIntervals& pi,
- bool backfill,
ObjectStore::Transaction &t)
{
peering_state.init(
role, newup, new_up_primary, newacting,
- new_acting_primary, history, pi, backfill, t);
+ new_acting_primary, history, pi, t);
}
seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
crimson::common::system_shutdown_exception());
}
- return seastar::do_with(PGMeta(store, pgid), [] (auto& pg_meta) {
+ return seastar::do_with(PGMeta(*store, pgid), [] (auto& pg_meta) {
return pg_meta.load();
}).then([this, store](auto&& ret) {
auto [pg_info, past_intervals] = std::move(ret);
});
}
-void PG::do_peering_event(
- const boost::statechart::event_base &evt,
- PeeringCtx &rctx)
-{
- peering_state.handle_event(
- evt,
- &rctx);
- peering_state.write_if_dirty(rctx.transaction);
-}
-
void PG::do_peering_event(
PGPeeringEvent& evt, PeeringCtx &rctx)
{
- if (!peering_state.pg_has_reset_since(evt.get_epoch_requested())) {
- logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
- do_peering_event(evt.get_event(), rctx);
- } else {
+ if (peering_state.pg_has_reset_since(evt.get_epoch_requested()) ||
+ peering_state.pg_has_reset_since(evt.get_epoch_sent())) {
logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
+ } else {
+ logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
+ peering_state.handle_event(
+ evt.get_event(),
+ &rctx);
+ peering_state.write_if_dirty(rctx.transaction);
}
}
void PG::handle_initialize(PeeringCtx &rctx)
{
- PeeringState::Initialize evt;
- peering_state.handle_event(evt, &rctx);
+ peering_state.handle_event(PeeringState::Initialize{}, &rctx);
}
return seastar::now();
}
-seastar::future<> PG::submit_transaction(const OpInfo& op_info,
- const std::vector<OSDOp>& ops,
- ObjectContextRef&& obc,
- ceph::os::Transaction&& txn,
- const osd_op_params_t& osd_op_p)
+std::tuple<PG::interruptible_future<>,
+ PG::interruptible_future<>>
+PG::submit_transaction(
+ const OpInfo& op_info,
+ const std::vector<OSDOp>& ops,
+ ObjectContextRef&& obc,
+ ceph::os::Transaction&& txn,
+ osd_op_params_t&& osd_op_p)
{
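+  // the first future of the returned tuple tracks submission of the repop; the
+  // second resolves with the set of acked peers, after which last_complete is
+  // advanced via complete_write() below.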
if (__builtin_expect(stopping, false)) {
- return seastar::make_exception_future<>(
- crimson::common::system_shutdown_exception());
+ return {seastar::make_exception_future<>(
+ crimson::common::system_shutdown_exception()),
+ seastar::now()};
}
epoch_t map_epoch = get_osdmap_epoch();
pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
obc->obs.oi.soid, osd_op_p.at_version, obc->obs.oi.version,
osd_op_p.user_modify ? osd_op_p.at_version.version : 0,
- osd_op_p.req->get_reqid(), osd_op_p.req->get_mtime(),
+ osd_op_p.req_id, osd_op_p.mtime,
op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
// TODO: refactor the submit_transaction
if (op_info.allows_returnvec()) {
peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
txn, true, false);
- return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
- std::move(obc),
- std::move(txn),
- std::move(osd_op_p),
- peering_state.get_last_peering_reset(),
- map_epoch,
- std::move(log_entries)).then(
+ auto [submitted, all_completed] = backend->mutate_object(
+ peering_state.get_acting_recovery_backfill(),
+ std::move(obc),
+ std::move(txn),
+ std::move(osd_op_p),
+ peering_state.get_last_peering_reset(),
+ map_epoch,
+ std::move(log_entries));
+ return std::make_tuple(std::move(submitted), all_completed.then_interruptible(
[this, last_complete=peering_state.get_info().last_complete,
at_version=osd_op_p.at_version](auto acked) {
for (const auto& peer : acked) {
}
peering_state.complete_write(at_version, last_complete);
return seastar::now();
- });
+ }));
}
-osd_op_params_t&& PG::fill_op_params_bump_pg_version(
- osd_op_params_t&& osd_op_p,
- Ref<MOSDOp> m,
+void PG::fill_op_params_bump_pg_version(
+ osd_op_params_t& osd_op_p,
const bool user_modify)
{
- osd_op_p.req = std::move(m);
osd_op_p.at_version = next_version();
osd_op_p.pg_trim_to = get_pg_trim_to();
osd_op_p.min_last_complete_ondisk = get_min_last_complete_ondisk();
if (user_modify) {
osd_op_p.user_at_version = osd_op_p.at_version.version;
}
- return std::move(osd_op_p);
}
-seastar::future<Ref<MOSDOpReply>> PG::handle_failed_op(
- const std::error_code& e,
- ObjectContextRef obc,
- const OpsExecuter& ox,
- const MOSDOp& m) const
+PG::interruptible_future<> PG::repair_object(
+ const hobject_t& oid,
+ eversion_t& v)
{
- // Oops, an operation had failed. do_osd_ops() altogether with
- // OpsExecuter already dropped the ObjectStore::Transaction if
- // there was any. However, this is not enough to completely
- // rollback as we gave OpsExecuter the very single copy of `obc`
- // we maintain and we did it for both reading and writing.
- // Now all modifications must be reverted.
- //
- // Let's just reload from the store. Evicting from the shared
- // LRU would be tricky as next MOSDOp (the one at `get_obc`
- // phase) could actually already finished the lookup. Fortunately,
- // this is supposed to live on cold paths, so performance is not
- // a concern -- simplicity wins.
- //
- // The conditional's purpose is to efficiently handle hot errors
- // which may appear as a result of e.g. CEPH_OSD_OP_CMPXATTR or
- // CEPH_OSD_OP_OMAP_CMP. These are read-like ops and clients
- // typically append them before any write. If OpsExecuter hasn't
- // seen any modifying operation, `obc` is supposed to be kept
- // unchanged.
- assert(e.value() > 0);
- const bool need_reload_obc = ox.has_seen_write();
- logger().debug(
- "{}: {} - object {} got error code {}, {}; need_reload_obc {}",
- __func__,
- m,
- obc->obs.oi.soid,
- e.value(),
- e.message(),
- need_reload_obc);
- return (need_reload_obc ? reload_obc(*obc)
- : load_obc_ertr::now()
- ).safe_then([&e, &m, obc = std::move(obc), this] {
- auto reply = make_message<MOSDOpReply>(
- &m, -e.value(), get_osdmap_epoch(), 0, false);
- reply->set_enoent_reply_versions(
- peering_state.get_info().last_update,
- peering_state.get_info().last_user_version);
- return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
- }, load_obc_ertr::assert_all{ "can't live with object state messed up" });
+ // see also PrimaryLogPG::rep_repair_primary_object()
+ assert(is_primary());
+ logger().debug("{}: {} peers osd.{}", __func__, oid, get_acting_recovery_backfill());
+ // Add object to PG's missing set if it isn't there already
+ assert(!get_local_missing().is_missing(oid));
+ peering_state.force_object_missing(pg_whoami, oid, v);
+ auto [op, fut] = get_shard_services().start_operation<UrgentRecovery>(
+ oid, v, this, get_shard_services(), get_osdmap_epoch());
+ return std::move(fut);
}
-seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
- Ref<MOSDOp> m,
- ObjectContextRef obc,
- const OpInfo &op_info)
+template <class Ret, class SuccessFunc, class FailureFunc>
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ret>>
+PG::do_osd_ops_execute(
+ seastar::lw_shared_ptr<OpsExecuter> ox,
+ std::vector<OSDOp>& ops,
+ const OpInfo &op_info,
+ SuccessFunc&& success_func,
+ FailureFunc&& failure_func)
{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
-
- using osd_op_errorator = OpsExecuter::osd_op_errorator;
- const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
- : m->get_hobj();
- auto ox = std::make_unique<OpsExecuter>(
- obc, op_info, get_pool().info, get_backend(), *m);
- return crimson::do_for_each(
- m->ops, [obc, m, ox = ox.get()](OSDOp& osd_op) {
+ assert(ox);
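+  // if an op fails after modifying the obc, roll back by reloading the object
+  // state from the store (evicting it from the shared cache would race with
+  // other ops already holding it).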
+ auto rollbacker = ox->create_rollbacker([this] (auto& obc) {
+ return reload_obc(obc).handle_error_interruptible(
+ load_obc_ertr::assert_all{"can't live with object state messed up"});
+ });
+ auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
+ return interruptor::do_for_each(ops, [ox](OSDOp& osd_op) {
logger().debug(
- "do_osd_ops: {} - object {} - handling op {}",
- *m,
- obc->obs.oi.soid,
+ "do_osd_ops_execute: object {} - handling op {}",
+ ox->get_target(),
ceph_osd_op_name(osd_op.op.op));
return ox->execute_op(osd_op);
- }).safe_then([this, obc, m, ox = ox.get(), &op_info] {
+ }).safe_then_interruptible([this, ox, &op_info, &ops] {
logger().debug(
- "do_osd_ops: {} - object {} all operations successful",
- *m,
- obc->obs.oi.soid);
- return std::move(*ox).flush_changes(
- [m] (auto&& obc) -> osd_op_errorator::future<> {
- logger().debug(
- "do_osd_ops: {} - object {} txn is empty, bypassing mutate",
- *m,
- obc->obs.oi.soid);
- return osd_op_errorator::now();
- },
- [this, m, &op_info] (auto&& txn,
- auto&& obc,
- auto&& osd_op_p,
- bool user_modify) -> osd_op_errorator::future<> {
+ "do_osd_ops_execute: object {} all operations successful",
+ ox->get_target());
+ peering_state.apply_op_stats(ox->get_target(), ox->get_stats());
+ return std::move(*ox).flush_changes_n_do_ops_effects(
+ [this, &op_info, &ops] (auto&& txn,
+ auto&& obc,
+ auto&& osd_op_p,
+ bool user_modify) {
logger().debug(
- "do_osd_ops: {} - object {} submitting txn",
- *m,
- obc->obs.oi.soid);
- auto filled_osd_op_p = fill_op_params_bump_pg_version(
- std::move(osd_op_p),
- std::move(m),
- user_modify);
+ "do_osd_ops_execute: object {} submitting txn",
+ obc->get_oid());
+ fill_op_params_bump_pg_version(osd_op_p, user_modify);
return submit_transaction(
op_info,
- filled_osd_op_p.req->ops,
+ ops,
std::move(obc),
std::move(txn),
- std::move(filled_osd_op_p));
- });
- }).safe_then([this,
- m,
- obc,
- rvec = op_info.allows_returnvec()] {
- // TODO: should stop at the first op which returns a negative retval,
- // cmpext uses it for returning the index of first unmatched byte
- int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
- if (result > 0 && !rvec) {
- result = 0;
- }
- auto reply = make_message<MOSDOpReply>(m.get(),
- result,
- get_osdmap_epoch(),
- 0,
- false);
- reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
- logger().debug(
- "do_osd_ops: {} - object {} sending reply",
- *m,
- obc->obs.oi.soid);
- return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
- }, osd_op_errorator::all_same_way([ox = ox.get(),
- m,
- obc,
- this] (const std::error_code& e) {
- return handle_failed_op(e, std::move(obc), *ox, *m);
- })).handle_exception_type([ox_deleter = std::move(ox),
- m,
- obc,
- this] (const crimson::osd::error& e) {
- // we need this handler because throwing path which aren't errorated yet.
- logger().debug("encountered the legacy error handling path!");
- return handle_failed_op(e.code(), std::move(obc), *ox_deleter, *m);
- });
+ std::move(osd_op_p));
+ });
+ }).safe_then_unpack_interruptible(
+ [success_func=std::move(success_func), rollbacker, this, failure_func_ptr]
+ (auto submitted_fut, auto all_completed_fut) mutable {
+ return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
+ std::move(submitted_fut),
+ all_completed_fut.safe_then_interruptible_tuple(
+ std::move(success_func),
+ crimson::ct_error::object_corrupted::handle(
+ [rollbacker, this] (const std::error_code& e) mutable {
+          // this is a path for EIO. it's special because we want to fix the object
+ // and try again. that is, the layer above `PG::do_osd_ops` is supposed to
+ // restart the execution.
+ return rollbacker.rollback_obc_if_modified(e).then_interruptible(
+ [obc=rollbacker.get_obc(), this] {
+ return repair_object(obc->obs.oi.soid,
+ obc->obs.oi.version).then_interruptible([] {
+ return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make()};
+ });
+ });
+ }), OpsExecuter::osd_op_errorator::all_same_way(
+ [rollbacker, failure_func_ptr]
+ (const std::error_code& e) mutable {
+ return rollbacker.rollback_obc_if_modified(e).then_interruptible(
+ [e, failure_func_ptr] {
+ return (*failure_func_ptr)(e);
+ });
+ })
+ )
+ );
+ }, OpsExecuter::osd_op_errorator::all_same_way(
+ [rollbacker, failure_func_ptr]
+ (const std::error_code& e) mutable {
+ return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
+ seastar::now(),
+ rollbacker.rollback_obc_if_modified(e).then_interruptible(
+ [e, failure_func_ptr] {
+ return (*failure_func_ptr)(e);
+ }));
+ }));
}
-seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
+PG::do_osd_ops(
+ Ref<MOSDOp> m,
+ ObjectContextRef obc,
+ const OpInfo &op_info)
+{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+ return do_osd_ops_execute<MURef<MOSDOpReply>>(
+ seastar::make_lw_shared<OpsExecuter>(
+ Ref<PG>{this}, std::move(obc), op_info, *m),
+ m->ops,
+ op_info,
+ [this, m, rvec = op_info.allows_returnvec()] {
+ // TODO: should stop at the first op which returns a negative retval,
+ // cmpext uses it for returning the index of first unmatched byte
+ int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
+ if (result > 0 && !rvec) {
+ result = 0;
+ }
+ auto reply = crimson::make_message<MOSDOpReply>(m.get(),
+ result,
+ get_osdmap_epoch(),
+ 0,
+ false);
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ logger().debug(
+ "do_osd_ops: {} - object {} sending reply",
+ *m,
+ m->get_hobj());
+ return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
+ std::move(reply));
+ },
+ [m, this] (const std::error_code& e) {
+ auto reply = crimson::make_message<MOSDOpReply>(
+ m.get(), -e.value(), get_osdmap_epoch(), 0, false);
+ reply->set_enoent_reply_versions(
+ peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
+ return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
+ });
+}
+
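+// variant of do_osd_ops() for callers that don't have an MOSDOp; the message
+// parameters and the success/failure continuations are supplied directly.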
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>>
+PG::do_osd_ops(
+ ObjectContextRef obc,
+ std::vector<OSDOp>& ops,
+ const OpInfo &op_info,
+ const do_osd_ops_params_t& msg_params,
+ do_osd_ops_success_func_t success_func,
+ do_osd_ops_failure_func_t failure_func)
+{
+ return do_osd_ops_execute<void>(
+ seastar::make_lw_shared<OpsExecuter>(
+ Ref<PG>{this}, std::move(obc), op_info, msg_params),
+ ops,
+ std::as_const(op_info),
+ std::move(success_func),
+ std::move(failure_func));
+}
+
+PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
{
if (__builtin_expect(stopping, false)) {
throw crimson::common::system_shutdown_exception();
auto ox = std::make_unique<PgOpsExecuter>(std::as_const(*this),
std::as_const(*m));
- return seastar::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) {
+ return interruptor::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) {
logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op));
return ox->execute_op(osd_op);
- }).then([m, this, ox = std::move(ox)] {
- auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
+ }).then_interruptible([m, this, ox = std::move(ox)] {
+ auto reply = crimson::make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
false);
- return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
- }).handle_exception_type([=](const crimson::osd::error& e) {
- auto reply = make_message<MOSDOpReply>(
+ return seastar::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
+ }).handle_exception_type_interruptible([=](const crimson::osd::error& e) {
+ auto reply = crimson::make_message<MOSDOpReply>(
m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
reply->set_enoent_reply_versions(peering_state.get_info().last_update,
peering_state.get_info().last_user_version);
- return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ return seastar::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
});
}
-hobject_t PG::get_oid(const MOSDOp &m)
+hobject_t PG::get_oid(const hobject_t& hobj)
{
- return (m.get_snapid() == CEPH_SNAPDIR ?
- m.get_hobj().get_head() :
- m.get_hobj());
+ return hobj.snap == CEPH_SNAPDIR ? hobj.get_head() : hobj;
}
RWState::State PG::get_lock_type(const OpInfo &op_info)
}
template<RWState::State State>
-PG::load_obc_ertr::future<>
-PG::with_head_obc(hobject_t oid, with_obc_func_t&& func)
+PG::load_obc_iertr::future<>
+PG::with_head_obc(ObjectContextRef obc, bool existed, with_obc_func_t&& func)
{
- assert(oid.is_head());
- auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid);
- return obc->with_lock<State>(
- [oid=std::move(oid), existed=existed, obc=std::move(obc),
- func=std::move(func), this] {
- auto loaded = load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
+ logger().debug("{} {}", __func__, obc->get_oid());
+ assert(obc->is_head());
+ obc->append_to(obc_set_accessing);
+ return obc->with_lock<State, IOInterruptCondition>(
+ [existed=existed, obc=obc, func=std::move(func), this] {
+ auto loaded = load_obc_iertr::make_ready_future<ObjectContextRef>(obc);
if (existed) {
- logger().debug("with_head_obc: found {} in cache", oid);
+ logger().debug("with_head_obc: found {} in cache", obc->get_oid());
} else {
- logger().debug("with_head_obc: cache miss on {}", oid);
- loaded = obc->with_promoted_lock<State>([this, obc] {
+ logger().debug("with_head_obc: cache miss on {}", obc->get_oid());
+ loaded = obc->with_promoted_lock<State, IOInterruptCondition>([this, obc] {
return load_head_obc(obc);
});
}
- return loaded.safe_then([func=std::move(func)](auto obc) {
- return func(std::move(obc));
+ return loaded.safe_then_interruptible([func = std::move(func)](auto obc) {
+ return std::move(func)(std::move(obc));
});
+ }).finally([this, pgref=boost::intrusive_ptr<PG>{this}, obc=std::move(obc)] {
+ logger().debug("with_head_obc: released {}", obc->get_oid());
+ obc->remove_from(obc_set_accessing);
});
}
template<RWState::State State>
-PG::load_obc_ertr::future<>
+PG::load_obc_iertr::future<>
+PG::with_head_obc(hobject_t oid, with_obc_func_t&& func)
+{
+ auto [obc, existed] =
+ shard_services.obc_registry.get_cached_obc(std::move(oid));
+ return with_head_obc<State>(std::move(obc), existed, std::move(func));
+}
+
+template<RWState::State State>
+PG::interruptible_future<>
+PG::with_existing_head_obc(ObjectContextRef obc, with_obc_func_t&& func)
+{
+ constexpr bool existed = true;
+ return with_head_obc<State>(
+ std::move(obc), existed, std::move(func)
+ ).handle_error_interruptible(load_obc_ertr::assert_all{"can't happen"});
+}
+
+template<RWState::State State>
+PG::load_obc_iertr::future<>
PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func)
{
assert(!oid.is_head());
return with_head_obc<RWState::RWREAD>(oid.get_head(),
- [oid, func=std::move(func), this](auto head) -> load_obc_ertr::future<> {
+ [oid, func=std::move(func), this](auto head) -> load_obc_iertr::future<> {
auto coid = resolve_oid(head->get_ro_ss(), oid);
if (!coid) {
// TODO: return crimson::ct_error::enoent::make();
return clone->template with_lock<State>(
[coid=*coid, existed=existed,
head=std::move(head), clone=std::move(clone),
- func=std::move(func), this]() -> load_obc_ertr::future<> {
- auto loaded = load_obc_ertr::make_ready_future<ObjectContextRef>(clone);
+ func=std::move(func), this]() -> load_obc_iertr::future<> {
+ auto loaded = load_obc_iertr::make_ready_future<ObjectContextRef>(clone);
if (existed) {
logger().debug("with_clone_obc: found {} in cache", coid);
} else {
logger().debug("with_clone_obc: cache miss on {}", coid);
loaded = clone->template with_promoted_lock<State>(
[coid, clone, head, this] {
- return backend->load_metadata(coid).safe_then(
+ return backend->load_metadata(coid).safe_then_interruptible(
[coid, clone=std::move(clone), head=std::move(head)](auto md) mutable {
clone->set_clone_state(std::move(md->os), std::move(head));
return clone;
});
});
}
- return loaded.safe_then([func=std::move(func)](auto clone) {
- return func(std::move(clone));
+ return loaded.safe_then_interruptible([func = std::move(func)](auto clone) {
+ return std::move(func)(std::move(clone));
});
});
});
}
// explicitly instantiate the used instantiations
-template PG::load_obc_ertr::future<>
+template PG::load_obc_iertr::future<>
PG::with_head_obc<RWState::RWNONE>(hobject_t, with_obc_func_t&&);
-PG::load_obc_ertr::future<crimson::osd::ObjectContextRef>
+template<RWState::State State>
+PG::interruptible_future<>
+PG::with_existing_clone_obc(ObjectContextRef clone, with_obc_func_t&& func)
+{
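+  // take the (already loaded) head for read first, then lock the clone itself.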
+ assert(clone);
+ assert(clone->get_head_obc());
+ assert(!clone->get_oid().is_head());
+ return with_existing_head_obc<RWState::RWREAD>(clone->get_head_obc(),
+ [clone=std::move(clone), func=std::move(func)] ([[maybe_unused]] auto head) {
+ assert(head == clone->get_head_obc());
+ return clone->template with_lock<State>(
+ [clone=std::move(clone), func=std::move(func)] {
+ return std::move(func)(std::move(clone));
+ });
+ });
+}
+
+PG::load_obc_iertr::future<crimson::osd::ObjectContextRef>
PG::load_head_obc(ObjectContextRef obc)
{
- hobject_t oid = obc->get_oid();
- return backend->load_metadata(oid).safe_then([obc=std::move(obc)](auto md)
+ return backend->load_metadata(obc->get_oid()).safe_then_interruptible(
+ [obc=std::move(obc)](auto md)
-> load_obc_ertr::future<crimson::osd::ObjectContextRef> {
const hobject_t& oid = md->os.oi.soid;
logger().debug(
});
}
-PG::load_obc_ertr::future<>
+PG::load_obc_iertr::future<>
PG::reload_obc(crimson::osd::ObjectContext& obc) const
{
assert(obc.is_head());
- return backend->load_metadata(obc.get_oid()).safe_then([&obc](auto md)
+ return backend->load_metadata(obc.get_oid()).safe_then_interruptible<false>([&obc](auto md)
-> load_obc_ertr::future<> {
logger().debug(
"{}: reloaded obs {} for {}",
});
}
-PG::load_obc_ertr::future<>
-PG::with_locked_obc(Ref<MOSDOp> &m, const OpInfo &op_info,
- Operation *op, PG::with_obc_func_t &&f)
+PG::load_obc_iertr::future<>
+PG::with_locked_obc(const hobject_t &hobj,
+ const OpInfo &op_info,
+ with_obc_func_t &&f)
{
if (__builtin_expect(stopping, false)) {
throw crimson::common::system_shutdown_exception();
}
- const hobject_t oid = get_oid(*m);
+ const hobject_t oid = get_oid(hobj);
switch (get_lock_type(op_info)) {
case RWState::RWREAD:
if (oid.is_head()) {
}
case RWState::RWEXCL:
if (oid.is_head()) {
- return with_head_obc<RWState::RWWRITE>(oid, std::move(f));
+ return with_head_obc<RWState::RWEXCL>(oid, std::move(f));
} else {
- return with_clone_obc<RWState::RWWRITE>(oid, std::move(f));
+ return with_clone_obc<RWState::RWEXCL>(oid, std::move(f));
}
default:
ceph_abort();
};
}
-seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
+template <RWState::State State>
+PG::interruptible_future<>
+PG::with_locked_obc(ObjectContextRef obc, with_obc_func_t &&f)
+{
+ // TODO: a question from rebase: do we really need such checks when
+ // the interruptible stuff is being used?
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+ if (obc->is_head()) {
+ return with_existing_head_obc<State>(obc, std::move(f));
+ } else {
+ return with_existing_clone_obc<State>(obc, std::move(f));
+ }
+}
+
+// explicitly instantiate the used instantiations
+template PG::interruptible_future<>
+PG::with_locked_obc<RWState::RWEXCL>(ObjectContextRef, with_obc_func_t&&);
+
+PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
{
if (__builtin_expect(stopping, false)) {
return seastar::make_exception_future<>(
decode(log_entries, p);
peering_state.append_log(std::move(log_entries), req->pg_trim_to,
req->version, req->min_last_complete_ondisk, txn, !txn.empty(), false);
- return shard_services.get_store().do_transaction(coll_ref, std::move(txn))
- .then([req, lcod=peering_state.get_info().last_complete, this] {
+ logger().debug("PG::handle_rep_op: do_transaction...");
+ return interruptor::make_interruptible(shard_services.get_store().do_transaction(
+ coll_ref, std::move(txn))).then_interruptible(
+ [req, lcod=peering_state.get_info().last_complete, this] {
peering_state.update_last_complete_ondisk(lcod);
const auto map_epoch = get_osdmap_epoch();
- auto reply = make_message<MOSDRepOpReply>(
+ auto reply = crimson::make_message<MOSDRepOpReply>(
req.get(), pg_whoami, 0,
map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
reply->set_last_complete_ondisk(lcod);
- return shard_services.send_to_osd(req->from.osd, reply, map_epoch);
+ return shard_services.send_to_osd(req->from.osd, std::move(reply), map_epoch);
});
}
}
}
-template <typename MsgType>
-bool PG::can_discard_replica_op(const MsgType& m) const
+bool PG::old_peering_msg(
+ const epoch_t reply_epoch,
+ const epoch_t query_epoch) const
+{
+ if (const epoch_t lpr = peering_state.get_last_peering_reset();
+ lpr > reply_epoch || lpr > query_epoch) {
+ logger().debug("{}: pg changed {} lpr {}, reply_epoch {}, query_epoch {}",
+ __func__, get_info().history, lpr, reply_epoch, query_epoch);
+ return true;
+ }
+ return false;
+}
+
+bool PG::can_discard_replica_op(const Message& m, epoch_t m_map_epoch) const
{
// if a repop is replied after a replica goes down in a new osdmap, and
// before the pg advances to this new osdmap, the repop replies before this
// sent by replicas not in the acting set, since
// if such a replica goes down it does not cause
// a new interval.
- if (osdmap->get_down_at(from_osd) >= m.map_epoch) {
+ if (osdmap->get_down_at(from_osd) >= m_map_epoch) {
return true;
}
// same pg?
// if pg changes *at all*, we reset and repeer!
- if (epoch_t lpr = peering_state.get_last_peering_reset();
- lpr > m.map_epoch) {
- logger().debug("{}: pg changed {} after {}, dropping",
- __func__, get_info().history, m.map_epoch);
- return true;
- }
- return false;
+ return old_peering_msg(m_map_epoch, m_map_epoch);
}
seastar::future<> PG::stop()
{
logger().info("PG {} {}", pgid, __func__);
stopping = true;
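+  // cancel outstanding reservations and timers before draining the gates below.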
+ cancel_local_background_io_reservation();
+ cancel_remote_recovery_reservation();
+ check_readable_timer.cancel();
+ renew_lease_timer.cancel();
return osdmap_gate.stop().then([this] {
return wait_for_active_blocker.stop();
}).then([this] {
}
void PG::on_change(ceph::os::Transaction &t) {
+ logger().debug("{}, {}", __func__, *this);
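+  // interrupt any in-flight ops still holding an obc so they observe the
+  // acting set change.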
+ for (auto& obc : obc_set_accessing) {
+ obc.interrupt(::crimson::common::actingset_changed(is_primary()));
+ }
recovery_backend->on_peering_interval_change(t);
backend->on_actingset_changed({ is_primary() });
}
return false;
}
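+// look up reqid in the pg log; if it is a known dup, wait for the recorded
+// write to commit and return its stored result code.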
+PG::interruptible_future<std::tuple<bool, int>>
+PG::already_complete(const osd_reqid_t& reqid)
+{
+ eversion_t version;
+ version_t user_version;
+ int ret;
+ std::vector<pg_log_op_return_item_t> op_returns;
+
+ if (peering_state.get_pg_log().get_log().get_request(
+ reqid, &version, &user_version, &ret, &op_returns)) {
+ return backend->request_committed(reqid, version).then([ret] {
+ return seastar::make_ready_future<std::tuple<bool, int>>(true, ret);
+ });
+ } else {
+ return seastar::make_ready_future<std::tuple<bool, int>>(false, 0);
+ }
+}
+
}