#include <fmt/format.h>
#include <fmt/ostream.h>
+#include "common/hobject_fmt.h"
+
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDRepOpReply.h"
#include "osd/OSDMap.h"
+#include "osd/osd_types_fmt.h"
#include "os/Transaction.h"
#include "crimson/osd/ops_executer.h"
#include "crimson/osd/osd_operations/osdop_params.h"
#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/background_recovery.h"
+#include "crimson/osd/osd_operations/snaptrim_event.h"
#include "crimson/osd/pg_recovery.h"
#include "crimson/osd/replicated_recovery_backend.h"
+#include "crimson/osd/watch.h"
using std::ostream;
using std::set;
}
}
+template <typename T>
+struct fmt::formatter<std::optional<T>> : fmt::formatter<T> {
+ template <typename FormatContext>
+ auto format(const std::optional<T>& v, FormatContext& ctx) const {
+ if (v.has_value()) {
+ return fmt::formatter<T>::format(*v, ctx);
+ }
+ return fmt::format_to(ctx.out(), "<null>");
+ }
+};
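+// A formatter specialization so std::optional<T> can be logged directly
+// wherever T is formattable (e.g. the op_trim_to / op_roll_forward_to values
+// logged in do_update_log_missing() below); disengaged optionals print as
+// "<null>".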
+
namespace crimson::osd {
using crimson::common::local_conf;
pg_whoami{pg_shard},
coll_ref{coll_ref},
pgmeta_oid{pgid.make_pgmeta_oid()},
- osdmap_gate("PG::osdmap_gate", std::nullopt),
+ osdmap_gate("PG::osdmap_gate"),
shard_services{shard_services},
- osdmap{osdmap},
backend(
PGBackend::create(
pgid.pgid,
pool,
coll_ref,
shard_services,
- profile)),
+ profile,
+ *this)),
recovery_backend(
std::make_unique<ReplicatedRecoveryBackend>(
*this, shard_services, coll_ref, backend.get())),
osdmap,
this,
this),
+ obc_registry{
+ local_conf()},
+ obc_loader{
+ obc_registry,
+ *backend.get(),
+ *this},
+ osdriver(
+ &shard_services.get_store(),
+ coll_ref,
+ pgid.make_pgmeta_oid()),
+ snap_mapper(
+ this->shard_services.get_cct(),
+ &osdriver,
+ pgid.ps(),
+ pgid.get_split_bits(pool.get_pg_num()),
+ pgid.pool(),
+ pgid.shard),
wait_for_active_blocker(this)
{
peering_state.set_backend_predicates(
PG::~PG() {}
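+// Walk every cached obc's watchers and time out any watch whose peer address
+// is blocklisted in the current osdmap.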
+void PG::check_blocklisted_watchers()
+{
+ logger().debug("{}", __func__);
+ obc_registry.for_each([this](ObjectContextRef obc) {
+ assert(obc);
+ for (const auto& [key, watch] : obc->watchers) {
+ assert(watch->get_pg() == this);
+ const auto& ea = watch->get_peer_addr();
+ logger().debug("watch: Found {} cookie {}. Checking entity_add_t {}",
+ watch->get_entity(), watch->get_cookie(), ea);
+ if (get_osdmap()->is_blocklisted(ea)) {
+ logger().info("watch: Found blocklisted watcher for {}", ea);
+ watch->do_watch_timeout();
+ }
+ }
+ });
+}
+
bool PG::try_flush_or_schedule_async() {
- logger().debug("PG::try_flush_or_schedule_async: do_transaction...");
- (void)shard_services.get_store().do_transaction(
- coll_ref,
- ObjectStore::Transaction()).then(
- [this, epoch=get_osdmap_epoch()]() {
- return shard_services.start_operation<LocalPeeringEvent>(
- this,
- shard_services,
- pg_whoami,
- pgid,
- epoch,
- epoch,
- PeeringState::IntervalFlush());
- });
+ logger().debug("PG::try_flush_or_schedule_async: flush ...");
+ (void)shard_services.get_store().flush(
+ coll_ref
+ ).then(
+ [this, epoch=get_osdmap_epoch()]() {
+ return shard_services.start_operation<LocalPeeringEvent>(
+ this,
+ pg_whoami,
+ pgid,
+ epoch,
+ epoch,
+ PeeringState::IntervalFlush());
+ });
return false;
}
void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay)
{
// handle the peering event in the background
+ logger().debug(
+ "{}: PG::queue_check_readable lpr: {}, delay: {}",
+ *this, last_peering_reset, delay);
check_readable_timer.cancel();
check_readable_timer.set_callback([last_peering_reset, this] {
+ logger().debug(
+ "{}: PG::queue_check_readable callback lpr: {}",
+ *this, last_peering_reset);
(void) shard_services.start_operation<LocalPeeringEvent>(
this,
- shard_services,
pg_whoami,
pgid,
last_peering_reset,
if (peering_state.state_test(PG_STATE_WAIT)) {
auto prior_readable_until_ub = peering_state.get_prior_readable_until_ub();
if (mnow < prior_readable_until_ub) {
- logger().info("{} will wait (mnow {} < prior_readable_until_ub {})",
- __func__, mnow, prior_readable_until_ub);
+ logger().info(
+ "{}: {} will wait (mnow {} < prior_readable_until_ub {})",
+ *this, __func__, mnow, prior_readable_until_ub);
+ queue_check_readable(
+ peering_state.get_last_peering_reset(),
+ prior_readable_until_ub - mnow);
} else {
- logger().info("{} no longer wait (mnow {} >= prior_readable_until_ub {})",
- __func__, mnow, prior_readable_until_ub);
+ logger().info(
+ "{}: {} will no longer wait (mnow {} >= prior_readable_until_ub {})",
+ *this, __func__, mnow, prior_readable_until_ub);
peering_state.state_clear(PG_STATE_WAIT);
peering_state.clear_prior_readable_until_ub();
changed = true;
if (peering_state.state_test(PG_STATE_LAGGY)) {
auto readable_until = peering_state.get_readable_until();
if (readable_until == readable_until.zero()) {
- logger().info("{} still laggy (mnow {}, readable_until zero)",
- __func__, mnow);
+ logger().info(
+ "{}:{} still laggy (mnow {}, readable_until zero)",
+ *this, __func__, mnow);
} else if (mnow >= readable_until) {
- logger().info("{} still laggy (mnow {} >= readable_until {})",
- __func__, mnow, readable_until);
+ logger().info(
+ "{}:{} still laggy (mnow {} >= readable_until {})",
+ *this, __func__, mnow, readable_until);
} else {
- logger().info("{} no longer laggy (mnow {} < readable_until {})",
- __func__, mnow, readable_until);
+ logger().info(
+ "{}:{} no longer laggy (mnow {} < readable_until {})",
+ *this, __func__, mnow, readable_until);
peering_state.state_clear(PG_STATE_LAGGY);
changed = true;
}
unsigned PG::get_target_pg_log_entries() const
{
- const unsigned num_pgs = shard_services.get_pg_num();
- const unsigned target =
- local_conf().get_val<uint64_t>("osd_target_pg_log_entries_per_osd");
+ const unsigned local_num_pgs = shard_services.get_num_local_pgs();
+ const unsigned local_target =
+ local_conf().get_val<uint64_t>("osd_target_pg_log_entries_per_osd") /
+ seastar::smp::count;
const unsigned min_pg_log_entries =
local_conf().get_val<uint64_t>("osd_min_pg_log_entries");
- if (num_pgs > 0 && target > 0) {
+ if (local_num_pgs > 0 && local_target > 0) {
// target an even spread of our budgeted log entries across all
// PGs. note that while we only get to control the entry count
// for primary PGs, we'll normally be responsible for a mix of
// will work out.
const unsigned max_pg_log_entries =
local_conf().get_val<uint64_t>("osd_max_pg_log_entries");
- return std::clamp(target / num_pgs,
+ return std::clamp(local_target / local_num_pgs,
min_pg_log_entries,
max_pg_log_entries);
} else {
}
}
-void PG::on_activate(interval_set<snapid_t>)
+void PG::on_removal(ceph::os::Transaction &t) {
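+  // Once the removal transaction commits, queue a DeleteSome peering event;
+  // do_delete_work() below re-queues it until the PG is fully removed.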
+ t.register_on_commit(
+ new LambdaContext(
+ [this](int r) {
+ ceph_assert(r == 0);
+ (void)shard_services.start_operation<LocalPeeringEvent>(
+ this, pg_whoami, pgid, float(0.001), get_osdmap_epoch(),
+ get_osdmap_epoch(), PeeringState::DeleteSome());
+ }));
+}
+
+void PG::on_activate(interval_set<snapid_t> snaps)
{
+ logger().debug("{}: {} snaps={}", *this, __func__, snaps);
+ snap_trimq = std::move(snaps);
projected_last_update = peering_state.get_info().last_update;
}
void PG::on_activate_complete()
{
- wait_for_active_blocker.on_active();
+ wait_for_active_blocker.unblock();
if (peering_state.needs_recovery()) {
logger().info("{}: requesting recovery",
__func__);
(void) shard_services.start_operation<LocalPeeringEvent>(
this,
- shard_services,
pg_whoami,
pgid,
float(0.001),
__func__);
(void) shard_services.start_operation<LocalPeeringEvent>(
this,
- shard_services,
pg_whoami,
pgid,
float(0.001),
" for pg: {}", __func__, pgid);
(void) shard_services.start_operation<LocalPeeringEvent>(
this,
- shard_services,
pg_whoami,
pgid,
float(0.001),
PeeringState::AllReplicasRecovered{});
}
publish_stats_to_osd();
- backend->on_activate_complete();
}
void PG::prepare_write(pg_info_t &info,
}
pglog.write_log_and_missing(
t, &km, coll_ref->get_cid(), pgmeta_oid,
- peering_state.get_pool().info.require_rollback());
+ peering_state.get_pgpool().info.require_rollback());
if (!km.empty()) {
t.omap_setkeys(coll_ref->get_cid(), pgmeta_oid, km);
}
std::pair<ghobject_t, bool>
PG::do_delete_work(ceph::os::Transaction &t, ghobject_t _next)
{
- // TODO
- shard_services.dec_pg_num();
- return {_next, false};
+ logger().info("removing pg {}", pgid);
+ auto fut = interruptor::make_interruptible(
+ shard_services.get_store().list_objects(
+ coll_ref,
+ _next,
+ ghobject_t::get_max(),
+ local_conf()->osd_target_transaction_size));
+
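+  // DeleteSome is handled inside the seastar::thread spawned by
+  // do_peering_event(), so blocking on the listing future with get() below
+  // is safe here.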
+ auto [objs_to_rm, next] = fut.get();
+ if (objs_to_rm.empty()) {
+ logger().info("all objs removed, removing coll for {}", pgid);
+ t.remove(coll_ref->get_cid(), pgmeta_oid);
+ t.remove_collection(coll_ref->get_cid());
+ (void) shard_services.get_store().do_transaction(
+ coll_ref, std::move(t)).then([this] {
+ return shard_services.remove_pg(pgid);
+ });
+ return {next, false};
+ } else {
+ for (auto &obj : objs_to_rm) {
+ if (obj == pgmeta_oid) {
+ continue;
+ }
+ logger().trace("pg {}, removing obj {}", pgid, obj);
+ t.remove(coll_ref->get_cid(), obj);
+ }
+ t.register_on_commit(
+ new LambdaContext([this](int r) {
+ ceph_assert(r == 0);
+ logger().trace("triggering more pg delete {}", pgid);
+ (void) shard_services.start_operation<LocalPeeringEvent>(
+ this,
+ pg_whoami,
+ pgid,
+ float(0.001),
+ get_osdmap_epoch(),
+ get_osdmap_epoch(),
+ PeeringState::DeleteSome{});
+ }));
+ return {next, true};
+ }
+}
+
+Context *PG::on_clean()
+{
+ // Not needed yet (will be needed for IO unblocking)
+ return nullptr;
+}
+
+void PG::on_active_actmap()
+{
+ logger().debug("{}: {} snap_trimq={}", *this, __func__, snap_trimq);
+ peering_state.state_clear(PG_STATE_SNAPTRIM_ERROR);
+ // Loop until snap_trimq is empty or PG_STATE_SNAPTRIM_ERROR is set.
+ std::ignore = seastar::do_until(
+ [this] {
+ return snap_trimq.empty()
+ || peering_state.state_test(PG_STATE_SNAPTRIM_ERROR);
+ },
+ [this] {
+ peering_state.state_set(PG_STATE_SNAPTRIM);
+ publish_stats_to_osd();
+ const auto to_trim = snap_trimq.range_start();
+ snap_trimq.erase(to_trim);
+ const auto needs_pause = !snap_trimq.empty();
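+ // needs_pause requests a pause between consecutive SnapTrimEvents while
+ // more snaps remain to trim.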
+ return seastar::repeat([to_trim, needs_pause, this] {
+ logger().debug("{}: going to start SnapTrimEvent, to_trim={}",
+ *this, to_trim);
+ return shard_services.start_operation<SnapTrimEvent>(
+ this,
+ snap_mapper,
+ to_trim,
+ needs_pause
+ ).second.handle_error(
+ crimson::ct_error::enoent::handle([this] {
+ logger().error("{}: ENOENT saw, trimming stopped", *this);
+ peering_state.state_set(PG_STATE_SNAPTRIM_ERROR);
+ publish_stats_to_osd();
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ }), crimson::ct_error::eagain::handle([this] {
+ logger().info("{}: EAGAIN saw, trimming restarted", *this);
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ })
+ );
+ }).then([this, trimmed=to_trim] {
+ logger().debug("{}: trimmed snap={}", *this, trimmed);
+ });
+ }).finally([this] {
+ logger().debug("{}: PG::on_active_actmap() finished trimming",
+ *this);
+ peering_state.state_clear(PG_STATE_SNAPTRIM);
+ peering_state.state_clear(PG_STATE_SNAPTRIM_ERROR);
+ publish_stats_to_osd();
+ });
+}
+
+void PG::on_active_advmap(const OSDMapRef &osdmap)
+{
+ const auto new_removed_snaps = osdmap->get_new_removed_snaps();
+ if (auto it = new_removed_snaps.find(get_pgid().pool());
+ it != new_removed_snaps.end()) {
+ bool bad = false;
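+ // Overlap between the map's newly removed snaps and snap_trimq means our
+ // cached snap state was stale; this is tolerated unless
+ // osd_debug_verify_cached_snaps is enabled (see the assert below).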
+ for (auto j : it->second) {
+ if (snap_trimq.intersects(j.first, j.second)) {
+ decltype(snap_trimq) added, overlap;
+ added.insert(j.first, j.second);
+ overlap.intersection_of(snap_trimq, added);
+ logger().error("{}: {} removed_snaps already contains {}",
+ *this, __func__, overlap);
+ bad = true;
+ snap_trimq.union_of(added);
+ } else {
+ snap_trimq.insert(j.first, j.second);
+ }
+ }
+ logger().info("{}: {} new removed snaps {}, snap_trimq now{}",
+ *this, __func__, it->second, snap_trimq);
+ assert(!bad || !local_conf().get_val<bool>("osd_debug_verify_cached_snaps"));
+ }
}
void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
events);
}
-ceph::signedspan PG::get_mnow()
+ceph::signedspan PG::get_mnow() const
{
return shard_services.get_mnow();
}
renew_lease_timer.set_callback([last_peering_reset, this] {
(void) shard_services.start_operation<LocalPeeringEvent>(
this,
- shard_services,
pg_whoami,
pgid,
last_peering_reset,
new_acting_primary, history, pi, t);
}
-seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
+seastar::future<> PG::read_state(crimson::os::FuturizedStore::Shard* store)
{
if (__builtin_expect(stopping, false)) {
return seastar::make_exception_future<>(
epoch_t epoch = get_osdmap_epoch();
(void) shard_services.start_operation<LocalPeeringEvent>(
this,
- shard_services,
pg_whoami,
pgid,
epoch,
});
}
-void PG::do_peering_event(
+PG::interruptible_future<> PG::do_peering_event(
PGPeeringEvent& evt, PeeringCtx &rctx)
{
if (peering_state.pg_has_reset_since(evt.get_epoch_requested()) ||
peering_state.pg_has_reset_since(evt.get_epoch_sent())) {
logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
+ return interruptor::now();
} else {
logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
- peering_state.handle_event(
- evt.get_event(),
- &rctx);
- peering_state.write_if_dirty(rctx.transaction);
+ // All peering event handling must run in a dedicated seastar::thread so
+ // that event processing can freely issue I/O, e.g. PG::on_removal and
+ // PG::on_new_interval.
+ return interruptor::async([this, &evt, &rctx] {
+ peering_state.handle_event(
+ evt.get_event(),
+ &rctx);
+ peering_state.write_if_dirty(rctx.transaction);
+ });
}
}
-void PG::handle_advance_map(
+seastar::future<> PG::handle_advance_map(
cached_map_t next_map, PeeringCtx &rctx)
{
- vector<int> newup, newacting;
- int up_primary, acting_primary;
- next_map->pg_to_up_acting_osds(
- pgid.pgid,
- &newup, &up_primary,
- &newacting, &acting_primary);
- peering_state.advance_map(
- next_map,
- peering_state.get_osdmap(),
- newup,
- up_primary,
- newacting,
- acting_primary,
- rctx);
- osdmap_gate.got_map(next_map->get_epoch());
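+  // advance_map() can reach blocking handlers (e.g. PG::on_removal issues
+  // I/O), so run the PeeringState machinery inside a seastar::thread.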
+ return seastar::async([this, next_map=std::move(next_map), &rctx] {
+ vector<int> newup, newacting;
+ int up_primary, acting_primary;
+ next_map->pg_to_up_acting_osds(
+ pgid.pgid,
+ &newup, &up_primary,
+ &newacting, &acting_primary);
+ peering_state.advance_map(
+ next_map,
+ peering_state.get_osdmap(),
+ newup,
+ up_primary,
+ newacting,
+ acting_primary,
+ rctx);
+ osdmap_gate.got_map(next_map->get_epoch());
+ });
}
-void PG::handle_activate_map(PeeringCtx &rctx)
+seastar::future<> PG::handle_activate_map(PeeringCtx &rctx)
{
- peering_state.activate_map(rctx);
+ return seastar::async([this, &rctx] {
+ peering_state.activate_map(rctx);
+ });
}
-void PG::handle_initialize(PeeringCtx &rctx)
+seastar::future<> PG::handle_initialize(PeeringCtx &rctx)
{
- peering_state.handle_event(PeeringState::Initialize{}, &rctx);
+ return seastar::async([this, &rctx] {
+ peering_state.handle_event(PeeringState::Initialize{}, &rctx);
+ });
}
return os;
}
-void PG::WaitForActiveBlocker::dump_detail(Formatter *f) const
-{
- f->dump_stream("pgid") << pg->pgid;
-}
-
-void PG::WaitForActiveBlocker::on_active()
-{
- p.set_value();
- p = {};
-}
-
-blocking_future<> PG::WaitForActiveBlocker::wait()
-{
- if (pg->peering_state.is_active()) {
- return make_blocking_future(seastar::now());
- } else {
- return make_blocking_future(p.get_shared_future());
- }
-}
-
-seastar::future<> PG::WaitForActiveBlocker::stop()
-{
- p.set_exception(crimson::common::system_shutdown_exception());
- return seastar::now();
-}
-
std::tuple<PG::interruptible_future<>,
PG::interruptible_future<>>
PG::submit_transaction(
- const OpInfo& op_info,
- const std::vector<OSDOp>& ops,
ObjectContextRef&& obc,
ceph::os::Transaction&& txn,
- osd_op_params_t&& osd_op_p)
+ osd_op_params_t&& osd_op_p,
+ std::vector<pg_log_entry_t>&& log_entries)
{
if (__builtin_expect(stopping, false)) {
return {seastar::make_exception_future<>(
}
epoch_t map_epoch = get_osdmap_epoch();
+ ceph_assert(!has_reset_since(osd_op_p.at_version.epoch));
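+  // The pg_log_entry_t vector (including returnvec handling) is now built by
+  // OpsExecuter and passed in via log_entries instead of being assembled here.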
- if (__builtin_expect(osd_op_p.at_version.epoch != map_epoch, false)) {
- throw crimson::common::actingset_changed(is_primary());
- }
-
- std::vector<pg_log_entry_t> log_entries;
- log_entries.emplace_back(obc->obs.exists ?
- pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
- obc->obs.oi.soid, osd_op_p.at_version, obc->obs.oi.version,
- osd_op_p.user_modify ? osd_op_p.at_version.version : 0,
- osd_op_p.req_id, osd_op_p.mtime,
- op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
- // TODO: refactor the submit_transaction
- if (op_info.allows_returnvec()) {
- // also the per-op values are recorded in the pg log
- log_entries.back().set_op_returns(ops);
- logger().debug("{} op_returns: {}",
- __func__, log_entries.back().op_returns);
- }
- log_entries.back().clean_regions = std::move(osd_op_p.clean_regions);
peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version);
peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
txn, true, false);
}));
}
-void PG::fill_op_params_bump_pg_version(
- osd_op_params_t& osd_op_p,
- const bool user_modify)
-{
- osd_op_p.at_version = next_version();
- osd_op_p.pg_trim_to = get_pg_trim_to();
- osd_op_p.min_last_complete_ondisk = get_min_last_complete_ondisk();
- osd_op_p.last_complete = get_info().last_complete;
- if (user_modify) {
- osd_op_p.user_at_version = osd_op_p.at_version.version;
- }
-}
-
PG::interruptible_future<> PG::repair_object(
const hobject_t& oid,
eversion_t& v)
PG::do_osd_ops_execute(
seastar::lw_shared_ptr<OpsExecuter> ox,
std::vector<OSDOp>& ops,
- const OpInfo &op_info,
SuccessFunc&& success_func,
FailureFunc&& failure_func)
{
assert(ox);
auto rollbacker = ox->create_rollbacker([this] (auto& obc) {
- return reload_obc(obc).handle_error_interruptible(
+ return obc_loader.reload_obc(obc).handle_error_interruptible(
load_obc_ertr::assert_all{"can't live with object state messed up"});
});
auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
ox->get_target(),
ceph_osd_op_name(osd_op.op.op));
return ox->execute_op(osd_op);
- }).safe_then_interruptible([this, ox, &op_info, &ops] {
+ }).safe_then_interruptible([this, ox, &ops] {
logger().debug(
"do_osd_ops_execute: object {} all operations successful",
ox->get_target());
- peering_state.apply_op_stats(ox->get_target(), ox->get_stats());
+ // check for full
+ if ((ox->delta_stats.num_bytes > 0 ||
+ ox->delta_stats.num_objects > 0) &&
+ get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL)) {
+ const auto& m = ox->get_message();
+ if (m.get_reqid().name.is_mds() || // FIXME: ignore MDS for now
+ m.has_flag(CEPH_OSD_FLAG_FULL_FORCE)) {
+ logger().info(" full, but proceeding due to FULL_FORCE or MDS");
+ } else if (m.has_flag(CEPH_OSD_FLAG_FULL_TRY)) {
+ // they tried, they failed.
+ logger().info(" full, replying to FULL_TRY op");
+ if (get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL_QUOTA))
+ return interruptor::make_ready_future<OpsExecuter::rep_op_fut_tuple>(
+ seastar::now(),
+ OpsExecuter::osd_op_ierrorator::future<>(
+ crimson::ct_error::edquot::make()));
+ else
+ return interruptor::make_ready_future<OpsExecuter::rep_op_fut_tuple>(
+ seastar::now(),
+ OpsExecuter::osd_op_ierrorator::future<>(
+ crimson::ct_error::enospc::make()));
+ } else {
+ // drop request
+ logger().info(" full, dropping request (bad client)");
+ return interruptor::make_ready_future<OpsExecuter::rep_op_fut_tuple>(
+ seastar::now(),
+ OpsExecuter::osd_op_ierrorator::future<>(
+ crimson::ct_error::eagain::make()));
+ }
+ }
return std::move(*ox).flush_changes_n_do_ops_effects(
- [this, &op_info, &ops] (auto&& txn,
- auto&& obc,
- auto&& osd_op_p,
- bool user_modify) {
+ ops,
+ snap_mapper,
+ osdriver,
+ [this] (auto&& txn,
+ auto&& obc,
+ auto&& osd_op_p,
+ auto&& log_entries) {
logger().debug(
"do_osd_ops_execute: object {} submitting txn",
obc->get_oid());
- fill_op_params_bump_pg_version(osd_op_p, user_modify);
return submit_transaction(
- op_info,
- ops,
std::move(obc),
std::move(txn),
- std::move(osd_op_p));
+ std::move(osd_op_p),
+ std::move(log_entries));
});
}).safe_then_unpack_interruptible(
[success_func=std::move(success_func), rollbacker, this, failure_func_ptr]
(const std::error_code& e) mutable {
return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
seastar::now(),
+ e.value() == ENOENT ? (*failure_func_ptr)(e) :
rollbacker.rollback_obc_if_modified(e).then_interruptible(
[e, failure_func_ptr] {
return (*failure_func_ptr)(e);
}));
}));
}
+seastar::future<> PG::submit_error_log(
+ Ref<MOSDOp> m,
+ const OpInfo &op_info,
+ ObjectContextRef obc,
+ const std::error_code e,
+ ceph_tid_t rep_tid,
+ eversion_t &version)
+{
+ const osd_reqid_t &reqid = m->get_reqid();
+ mempool::osd_pglog::list<pg_log_entry_t> log_entries;
+ log_entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR,
+ obc->obs.oi.soid,
+ next_version(),
+ eversion_t(), 0,
+ reqid, utime_t(),
+ -e.value()));
+ if (op_info.allows_returnvec()) {
+ log_entries.back().set_op_returns(m->ops);
+ }
+ ceph_assert(is_primary());
+ if (!log_entries.empty()) {
+ ceph_assert(log_entries.rbegin()->version >= projected_last_update);
+ version = projected_last_update = log_entries.rbegin()->version;
+ }
+ ceph::os::Transaction t;
+ peering_state.merge_new_log_entries(
+ log_entries, t, peering_state.get_pg_trim_to(),
+ peering_state.get_min_last_complete_ondisk());
+
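+  // Replicate the error log entry to every other acting/backfill shard and
+  // record the pending acks under rep_tid; do_update_log_missing_reply()
+  // completes the waiters.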
+ set<pg_shard_t> waiting_on;
+ for (auto &i : get_acting_recovery_backfill()) {
+ pg_shard_t peer(i);
+ if (peer == pg_whoami) continue;
+ ceph_assert(peering_state.get_peer_missing().count(peer));
+ ceph_assert(peering_state.has_peer_info(peer));
+ auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
+ log_entries,
+ spg_t(peering_state.get_info().pgid.pgid, i.shard),
+ pg_whoami.shard,
+ get_osdmap_epoch(),
+ get_last_peering_reset(),
+ rep_tid,
+ peering_state.get_pg_trim_to(),
+ peering_state.get_min_last_complete_ondisk());
+ send_cluster_message(peer.osd, std::move(log_m), get_osdmap_epoch());
+ waiting_on.insert(peer);
+ }
+ waiting_on.insert(pg_whoami);
+ log_entry_update_waiting_on.insert(
+ std::make_pair(rep_tid, log_update_t{std::move(waiting_on)}));
+ return shard_services.get_store().do_transaction(
+ get_collection_ref(), std::move(t))
+ .then([this] {
+ peering_state.update_trim_to();
+ return seastar::now();
+ });
+}
PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
PG::do_osd_ops(
Ref<MOSDOp> m,
+ crimson::net::ConnectionRef conn,
ObjectContextRef obc,
- const OpInfo &op_info)
+ const OpInfo &op_info,
+ const SnapContext& snapc)
{
if (__builtin_expect(stopping, false)) {
throw crimson::common::system_shutdown_exception();
}
return do_osd_ops_execute<MURef<MOSDOpReply>>(
seastar::make_lw_shared<OpsExecuter>(
- Ref<PG>{this}, std::move(obc), op_info, *m),
+ Ref<PG>{this}, obc, op_info, *m, conn, snapc),
m->ops,
- op_info,
- [this, m, rvec = op_info.allows_returnvec()] {
+ [this, m, obc, may_write = op_info.may_write(),
+ may_read = op_info.may_read(), rvec = op_info.allows_returnvec()] {
// TODO: should stop at the first op which returns a negative retval,
// cmpext uses it for returning the index of first unmatched byte
int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
- if (result > 0 && !rvec) {
+ if (may_read && result >= 0) {
+ for (auto &osdop : m->ops) {
+ if (osdop.rval < 0 && !(osdop.op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
+ result = osdop.rval.code;
+ break;
+ }
+ }
+ } else if (result > 0 && may_write && !rvec) {
+ result = 0;
+ } else if (result < 0 && (m->ops.empty() ?
+ 0 : m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
result = 0;
}
auto reply = crimson::make_message<MOSDOpReply>(m.get(),
"do_osd_ops: {} - object {} sending reply",
*m,
m->get_hobj());
+ if (obc->obs.exists) {
+ reply->set_reply_versions(peering_state.get_info().last_update,
+ obc->obs.oi.user_version);
+ } else {
+ reply->set_reply_versions(peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
+ }
return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
std::move(reply));
},
- [m, this] (const std::error_code& e) {
- auto reply = crimson::make_message<MOSDOpReply>(
- m.get(), -e.value(), get_osdmap_epoch(), 0, false);
- reply->set_enoent_reply_versions(
- peering_state.get_info().last_update,
- peering_state.get_info().last_user_version);
- return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
+ [m, &op_info, obc, this] (const std::error_code& e) {
+ return seastar::do_with(eversion_t(), [m, &op_info, obc, e, this](auto &version) {
+ auto fut = seastar::now();
+ epoch_t epoch = get_osdmap_epoch();
+ ceph_tid_t rep_tid = shard_services.get_tid();
+ auto last_complete = peering_state.get_info().last_complete;
+ if (op_info.may_write()) {
+ fut = submit_error_log(m, op_info, obc, e, rep_tid, version);
+ }
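+        // For failed writes we still persist an ERROR log entry via
+        // submit_error_log() and only reply once every shard has committed
+        // it (tracked through rep_tid below).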
+ return fut.then([m, e, epoch, &op_info, rep_tid, &version, last_complete, this] {
+ auto log_reply = [m, e, this] {
+ auto reply = crimson::make_message<MOSDOpReply>(
+ m.get(), -e.value(), get_osdmap_epoch(), 0, false);
+ if (m->ops.empty() ? 0 :
+ m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) {
+ reply->set_result(0);
+ }
+ // For all ops except for CMPEXT, the correct error value is encoded
+ // in e.value(). For CMPEXT, osdop.rval has the actual error value.
+ if (e.value() == ct_error::cmp_fail_error_value) {
+ assert(!m->ops.empty());
+ for (auto &osdop : m->ops) {
+ if (osdop.rval < 0) {
+ reply->set_result(osdop.rval);
+ break;
+ }
+ }
+ }
+ reply->set_enoent_reply_versions(
+ peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
+ std::move(reply));
+ };
+
+ if (!peering_state.pg_has_reset_since(epoch) && op_info.may_write()) {
+ auto it = log_entry_update_waiting_on.find(rep_tid);
+ ceph_assert(it != log_entry_update_waiting_on.end());
+ auto it2 = it->second.waiting_on.find(pg_whoami);
+ ceph_assert(it2 != it->second.waiting_on.end());
+ it->second.waiting_on.erase(it2);
+
+ if (it->second.waiting_on.empty()) {
+ log_entry_update_waiting_on.erase(it);
+ if (version != eversion_t()) {
+ peering_state.complete_write(version, last_complete);
+ }
+ return log_reply();
+ } else {
+ return it->second.all_committed.get_shared_future()
+ .then([this, &version, last_complete, log_reply = std::move(log_reply)] {
+ if (version != eversion_t()) {
+ peering_state.complete_write(version, last_complete);
+ }
+ return log_reply();
+ });
+ }
+ } else {
+ return log_reply();
+ }
+ });
});
+ });
}
PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>>
ObjectContextRef obc,
std::vector<OSDOp>& ops,
const OpInfo &op_info,
- const do_osd_ops_params_t& msg_params,
+ const do_osd_ops_params_t &&msg_params,
do_osd_ops_success_func_t success_func,
do_osd_ops_failure_func_t failure_func)
{
- return do_osd_ops_execute<void>(
- seastar::make_lw_shared<OpsExecuter>(
- Ref<PG>{this}, std::move(obc), op_info, msg_params),
- ops,
- std::as_const(op_info),
- std::move(success_func),
- std::move(failure_func));
+ // This overload is generally used for internal client requests,
+ // use an empty SnapContext.
+ return seastar::do_with(
+ std::move(msg_params),
+ [=, this, &ops, &op_info](auto &msg_params) {
+ return do_osd_ops_execute<void>(
+ seastar::make_lw_shared<OpsExecuter>(
+ Ref<PG>{this},
+ std::move(obc),
+ op_info,
+ msg_params,
+ msg_params.get_connection(),
+ SnapContext{}
+ ),
+ ops,
+ std::move(success_func),
+ std::move(failure_func));
+ });
}
PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
auto reply = crimson::make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
false);
+ reply->claim_op_out_data(m->ops);
+ reply->set_reply_versions(peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
return seastar::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
- }).handle_exception_type_interruptible([=](const crimson::osd::error& e) {
+ }).handle_exception_type_interruptible([=, this](const crimson::osd::error& e) {
auto reply = crimson::make_message<MOSDOpReply>(
m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
reply->set_enoent_reply_versions(peering_state.get_info().last_update,
}
}
-std::optional<hobject_t> PG::resolve_oid(
- const SnapSet &ss,
- const hobject_t &oid)
+void PG::check_blocklisted_obc_watchers(
+ ObjectContextRef &obc)
{
- if (oid.snap > ss.seq) {
- return oid.get_head();
- } else {
- // which clone would it be?
- auto clone = std::upper_bound(
- begin(ss.clones), end(ss.clones),
- oid.snap);
- if (clone == end(ss.clones)) {
- // Doesn't exist, > last clone, < ss.seq
- return std::nullopt;
- }
- auto citer = ss.clone_snaps.find(*clone);
- // TODO: how do we want to handle this kind of logic error?
- ceph_assert(citer != ss.clone_snaps.end());
-
- if (std::find(
- citer->second.begin(),
- citer->second.end(),
- *clone) == citer->second.end()) {
- return std::nullopt;
- } else {
- auto soid = oid;
- soid.snap = *clone;
- return std::optional<hobject_t>(soid);
+ if (obc->watchers.empty()) {
+ for (auto &[src, winfo] : obc->obs.oi.watchers) {
+ auto watch = crimson::osd::Watch::create(
+ obc, winfo, src.second, this);
+ watch->disconnect();
+ auto [it, emplaced] = obc->watchers.emplace(src, std::move(watch));
+ assert(emplaced);
+ logger().debug("added watch for obj {}, client {}",
+ obc->get_oid(), src.second);
}
}
}
-template<RWState::State State>
-PG::load_obc_iertr::future<>
-PG::with_head_obc(ObjectContextRef obc, bool existed, with_obc_func_t&& func)
-{
- logger().debug("{} {}", __func__, obc->get_oid());
- assert(obc->is_head());
- obc->append_to(obc_set_accessing);
- return obc->with_lock<State, IOInterruptCondition>(
- [existed=existed, obc=obc, func=std::move(func), this] {
- auto loaded = load_obc_iertr::make_ready_future<ObjectContextRef>(obc);
- if (existed) {
- logger().debug("with_head_obc: found {} in cache", obc->get_oid());
- } else {
- logger().debug("with_head_obc: cache miss on {}", obc->get_oid());
- loaded = obc->with_promoted_lock<State, IOInterruptCondition>([this, obc] {
- return load_head_obc(obc);
- });
- }
- return loaded.safe_then_interruptible([func = std::move(func)](auto obc) {
- return std::move(func)(std::move(obc));
- });
- }).finally([this, pgref=boost::intrusive_ptr<PG>{this}, obc=std::move(obc)] {
- logger().debug("with_head_obc: released {}", obc->get_oid());
- obc->remove_from(obc_set_accessing);
- });
-}
-
-template<RWState::State State>
-PG::load_obc_iertr::future<>
-PG::with_head_obc(hobject_t oid, with_obc_func_t&& func)
-{
- auto [obc, existed] =
- shard_services.obc_registry.get_cached_obc(std::move(oid));
- return with_head_obc<State>(std::move(obc), existed, std::move(func));
-}
-
-template<RWState::State State>
-PG::interruptible_future<>
-PG::with_existing_head_obc(ObjectContextRef obc, with_obc_func_t&& func)
-{
- constexpr bool existed = true;
- return with_head_obc<State>(
- std::move(obc), existed, std::move(func)
- ).handle_error_interruptible(load_obc_ertr::assert_all{"can't happen"});
-}
-
-template<RWState::State State>
-PG::load_obc_iertr::future<>
-PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func)
-{
- assert(!oid.is_head());
- return with_head_obc<RWState::RWREAD>(oid.get_head(),
- [oid, func=std::move(func), this](auto head) -> load_obc_iertr::future<> {
- auto coid = resolve_oid(head->get_ro_ss(), oid);
- if (!coid) {
- // TODO: return crimson::ct_error::enoent::make();
- logger().error("with_clone_obc: {} clone not found", coid);
- return load_obc_ertr::make_ready_future<>();
- }
- auto [clone, existed] = shard_services.obc_registry.get_cached_obc(*coid);
- return clone->template with_lock<State>(
- [coid=*coid, existed=existed,
- head=std::move(head), clone=std::move(clone),
- func=std::move(func), this]() -> load_obc_iertr::future<> {
- auto loaded = load_obc_iertr::make_ready_future<ObjectContextRef>(clone);
- if (existed) {
- logger().debug("with_clone_obc: found {} in cache", coid);
- } else {
- logger().debug("with_clone_obc: cache miss on {}", coid);
- loaded = clone->template with_promoted_lock<State>(
- [coid, clone, head, this] {
- return backend->load_metadata(coid).safe_then_interruptible(
- [coid, clone=std::move(clone), head=std::move(head)](auto md) mutable {
- clone->set_clone_state(std::move(md->os), std::move(head));
- return clone;
- });
- });
- }
- return loaded.safe_then_interruptible([func = std::move(func)](auto clone) {
- return std::move(func)(std::move(clone));
- });
- });
- });
-}
-
-// explicitly instantiate the used instantiations
-template PG::load_obc_iertr::future<>
-PG::with_head_obc<RWState::RWNONE>(hobject_t, with_obc_func_t&&);
-
-template<RWState::State State>
-PG::interruptible_future<>
-PG::with_existing_clone_obc(ObjectContextRef clone, with_obc_func_t&& func)
-{
- assert(clone);
- assert(clone->get_head_obc());
- assert(!clone->get_oid().is_head());
- return with_existing_head_obc<RWState::RWREAD>(clone->get_head_obc(),
- [clone=std::move(clone), func=std::move(func)] ([[maybe_unused]] auto head) {
- assert(head == clone->get_head_obc());
- return clone->template with_lock<State>(
- [clone=std::move(clone), func=std::move(func)] {
- return std::move(func)(std::move(clone));
- });
- });
-}
-
-PG::load_obc_iertr::future<crimson::osd::ObjectContextRef>
-PG::load_head_obc(ObjectContextRef obc)
-{
- return backend->load_metadata(obc->get_oid()).safe_then_interruptible(
- [obc=std::move(obc)](auto md)
- -> load_obc_ertr::future<crimson::osd::ObjectContextRef> {
- const hobject_t& oid = md->os.oi.soid;
- logger().debug(
- "load_head_obc: loaded obs {} for {}", md->os.oi, oid);
- if (!md->ss) {
- logger().error(
- "load_head_obc: oid {} missing snapset", oid);
- return crimson::ct_error::object_corrupted::make();
- }
- obc->set_head_state(std::move(md->os), std::move(*(md->ss)));
- logger().debug(
- "load_head_obc: returning obc {} for {}",
- obc->obs.oi, obc->obs.oi.soid);
- return load_obc_ertr::make_ready_future<
- crimson::osd::ObjectContextRef>(obc);
- });
-}
-
-PG::load_obc_iertr::future<>
-PG::reload_obc(crimson::osd::ObjectContext& obc) const
-{
- assert(obc.is_head());
- return backend->load_metadata(obc.get_oid()).safe_then_interruptible<false>([&obc](auto md)
- -> load_obc_ertr::future<> {
- logger().debug(
- "{}: reloaded obs {} for {}",
- __func__,
- md->os.oi,
- obc.get_oid());
- if (!md->ss) {
- logger().error(
- "{}: oid {} missing snapset",
- __func__,
- obc.get_oid());
- return crimson::ct_error::object_corrupted::make();
- }
- obc.set_head_state(std::move(md->os), std::move(*(md->ss)));
- return load_obc_ertr::now();
- });
-}
-
PG::load_obc_iertr::future<>
PG::with_locked_obc(const hobject_t &hobj,
const OpInfo &op_info,
throw crimson::common::system_shutdown_exception();
}
const hobject_t oid = get_oid(hobj);
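+  // Wrap the caller's functor so blocklisted watchers are checked every time
+  // an obc is handed out under its lock.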
+ auto wrapper = [f=std::move(f), this](auto obc) {
+ check_blocklisted_obc_watchers(obc);
+ return f(obc);
+ };
switch (get_lock_type(op_info)) {
case RWState::RWREAD:
- if (oid.is_head()) {
- return with_head_obc<RWState::RWREAD>(oid, std::move(f));
- } else {
- return with_clone_obc<RWState::RWREAD>(oid, std::move(f));
- }
+ return obc_loader.with_obc<RWState::RWREAD>(oid, std::move(wrapper));
case RWState::RWWRITE:
- if (oid.is_head()) {
- return with_head_obc<RWState::RWWRITE>(oid, std::move(f));
- } else {
- return with_clone_obc<RWState::RWWRITE>(oid, std::move(f));
- }
+ return obc_loader.with_obc<RWState::RWWRITE>(oid, std::move(wrapper));
case RWState::RWEXCL:
- if (oid.is_head()) {
- return with_head_obc<RWState::RWEXCL>(oid, std::move(f));
- } else {
- return with_clone_obc<RWState::RWEXCL>(oid, std::move(f));
- }
+ return obc_loader.with_obc<RWState::RWEXCL>(oid, std::move(wrapper));
default:
ceph_abort();
};
}
-template <RWState::State State>
-PG::interruptible_future<>
-PG::with_locked_obc(ObjectContextRef obc, with_obc_func_t &&f)
-{
- // TODO: a question from rebase: do we really need such checks when
- // the interruptible stuff is being used?
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
- if (obc->is_head()) {
- return with_existing_head_obc<State>(obc, std::move(f));
- } else {
- return with_existing_clone_obc<State>(obc, std::move(f));
- }
-}
-
-// explicitly instantiate the used instantiations
-template PG::interruptible_future<>
-PG::with_locked_obc<RWState::RWEXCL>(ObjectContextRef, with_obc_func_t&&);
-
PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
{
if (__builtin_expect(stopping, false)) {
crimson::common::system_shutdown_exception());
}
+ logger().debug("{}: {}", __func__, *req);
if (can_discard_replica_op(*req)) {
return seastar::now();
}
auto p = req->logbl.cbegin();
std::vector<pg_log_entry_t> log_entries;
decode(log_entries, p);
- peering_state.append_log(std::move(log_entries), req->pg_trim_to,
- req->version, req->min_last_complete_ondisk, txn, !txn.empty(), false);
+ log_operation(std::move(log_entries),
+ req->pg_trim_to,
+ req->version,
+ req->min_last_complete_ondisk,
+ !txn.empty(),
+ txn,
+ false);
logger().debug("PG::handle_rep_op: do_transaction...");
return interruptor::make_interruptible(shard_services.get_store().do_transaction(
coll_ref, std::move(txn))).then_interruptible(
});
}
-void PG::handle_rep_op_reply(crimson::net::ConnectionRef conn,
- const MOSDRepOpReply& m)
+void PG::log_operation(
+ std::vector<pg_log_entry_t>&& logv,
+ const eversion_t &trim_to,
+ const eversion_t &roll_forward_to,
+ const eversion_t &min_last_complete_ondisk,
+ bool transaction_applied,
+ ObjectStore::Transaction &txn,
+ bool async) {
+ logger().debug("{}", __func__);
+ if (is_primary()) {
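+    // Trimming must never pass the primary's last on-disk update.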
+ ceph_assert(trim_to <= peering_state.get_last_update_ondisk());
+ }
+ /* TODO: when we add snap mapper and projected log support,
+ * we'll likely want to update them here.
+ *
+ * See src/osd/PrimaryLogPG.h:log_operation for how classic
+ * handles these cases.
+ */
+#if 0
+ if (transaction_applied) {
+ //TODO:
+ //update_snap_map(logv, t);
+ }
+ auto last = logv.rbegin();
+ if (is_primary() && last != logv.rend()) {
+ projected_log.skip_can_rollback_to_to_head();
+ projected_log.trim(cct, last->version, nullptr, nullptr, nullptr);
+ }
+#endif
+ if (!is_primary()) { // && !is_ec_pg()
+ replica_clear_repop_obc(logv);
+ }
+ peering_state.append_log(std::move(logv),
+ trim_to,
+ roll_forward_to,
+ min_last_complete_ondisk,
+ txn,
+ !txn.empty(),
+ false);
+}
+
+void PG::replica_clear_repop_obc(
+ const std::vector<pg_log_entry_t> &logv) {
+ logger().debug("{} clearing {} entries", __func__, logv.size());
+ for (auto &&e: logv) {
+ logger().debug(" {} get_object_boundary(from): {} "
+ " head version(to): {}",
+ e.soid,
+ e.soid.get_object_boundary(),
+ e.soid.get_head());
+ /* Have to blast all clones, they share a snapset */
+ obc_registry.clear_range(
+ e.soid.get_object_boundary(), e.soid.get_head());
+ }
+}
+
+void PG::handle_rep_op_reply(const MOSDRepOpReply& m)
{
if (!can_discard_replica_op(m)) {
backend->got_rep_op_reply(m);
}
}
+PG::interruptible_future<> PG::do_update_log_missing(
+ Ref<MOSDPGUpdateLogMissing> m,
+ crimson::net::ConnectionRef conn)
+{
+ if (__builtin_expect(stopping, false)) {
+ return seastar::make_exception_future<>(
+ crimson::common::system_shutdown_exception());
+ }
+
+ ceph_assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
+ ObjectStore::Transaction t;
+ std::optional<eversion_t> op_trim_to, op_roll_forward_to;
+ if (m->pg_trim_to != eversion_t())
+ op_trim_to = m->pg_trim_to;
+ if (m->pg_roll_forward_to != eversion_t())
+ op_roll_forward_to = m->pg_roll_forward_to;
+ logger().debug("op_trim_to = {}, op_roll_forward_to = {}",
+ op_trim_to, op_roll_forward_to);
+
+ peering_state.append_log_entries_update_missing(
+ m->entries, t, op_trim_to, op_roll_forward_to);
+
+ return interruptor::make_interruptible(shard_services.get_store().do_transaction(
+ coll_ref, std::move(t))).then_interruptible(
+ [m, conn, lcod=peering_state.get_info().last_complete, this] {
+ if (!peering_state.pg_has_reset_since(m->get_epoch())) {
+ peering_state.update_last_complete_ondisk(lcod);
+ auto reply =
+ crimson::make_message<MOSDPGUpdateLogMissingReply>(
+ spg_t(peering_state.get_info().pgid.pgid, get_primary().shard),
+ pg_whoami.shard,
+ m->get_epoch(),
+ m->min_epoch,
+ m->get_tid(),
+ lcod);
+ reply->set_priority(CEPH_MSG_PRIO_HIGH);
+ return conn->send(std::move(reply));
+ }
+ return seastar::now();
+ });
+}
+
+
+PG::interruptible_future<> PG::do_update_log_missing_reply(
+ Ref<MOSDPGUpdateLogMissingReply> m)
+{
+ logger().debug("{}: got reply from {}", __func__, m->get_from());
+
+ auto it = log_entry_update_waiting_on.find(m->get_tid());
+ if (it != log_entry_update_waiting_on.end()) {
+ if (it->second.waiting_on.count(m->get_from())) {
+ it->second.waiting_on.erase(m->get_from());
+ if (m->last_complete_ondisk != eversion_t()) {
+ peering_state.update_peer_last_complete_ondisk(
+ m->get_from(), m->last_complete_ondisk);
+ }
+ } else {
+ logger().error("{} : {} got reply {} from shard we are not waiting for ",
+ __func__, peering_state.get_info().pgid, *m, m->get_from());
+ }
+
+ if (it->second.waiting_on.empty()) {
+ it->second.all_committed.set_value();
+ it->second.all_committed = {};
+ log_entry_update_waiting_on.erase(it);
+ }
+ } else {
+ logger().error("{} : {} got reply {} on unknown tid {}",
+ __func__, peering_state.get_info().pgid, *m, m->get_tid());
+ }
+ return seastar::now();
+}
+
bool PG::old_peering_msg(
const epoch_t reply_epoch,
const epoch_t query_epoch) const
}
void PG::on_change(ceph::os::Transaction &t) {
- logger().debug("{}, {}", __func__, *this);
- for (auto& obc : obc_set_accessing) {
- obc.interrupt(::crimson::common::actingset_changed(is_primary()));
- }
+ logger().debug("{} {}:", *this, __func__);
+ context_registry_on_change();
+ obc_loader.notify_on_change(is_primary());
recovery_backend->on_peering_interval_change(t);
- backend->on_actingset_changed({ is_primary() });
+ backend->on_actingset_changed(is_primary());
+ wait_for_active_blocker.unblock();
+ if (is_primary()) {
+ logger().debug("{} {}: requeueing", *this, __func__);
+ client_request_orderer.requeue(shard_services, this);
+ } else {
+ logger().debug("{} {}: dropping requests", *this, __func__);
+ client_request_orderer.clear_and_cancel();
+ }
+}
+
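+// On interval change, discard the watch state of every cached obc; watches
+// are re-instantiated from object_info_t by check_blocklisted_obc_watchers().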
+void PG::context_registry_on_change() {
+ obc_registry.for_each([](ObjectContextRef obc) {
+ assert(obc);
+ for (auto j = obc->watchers.begin();
+ j != obc->watchers.end();
+ j = obc->watchers.erase(j)) {
+ j->second->discard_state();
+ }
+ });
}
bool PG::can_discard_op(const MOSDOp& m) const {
+ if (m.get_map_epoch() <
+ peering_state.get_info().history.same_primary_since) {
+ logger().debug("{} changed after {} dropping {} ",
+ __func__ , m.get_map_epoch(), m);
+ return true;
+ }
+
+ if ((m.get_flags() & (CEPH_OSD_FLAG_BALANCE_READS |
+ CEPH_OSD_FLAG_LOCALIZE_READS))
+ && !is_primary()
+ && (m.get_map_epoch() <
+ peering_state.get_info().history.same_interval_since))
+ {
+ // Note: the Objecter will resend on interval change without the primary
+ // changing if it actually sent to a replica. If the primary hasn't
+ // changed since the send epoch, we got it, and we're primary, it won't
+ // have resent even if the interval did change as it sent it to the primary
+ // (us).
+ return true;
+ }
return __builtin_expect(m.get_map_epoch()
< peering_state.get_info().history.same_primary_since, false);
}
return false;
}
-PG::interruptible_future<std::tuple<bool, int>>
+PG::interruptible_future<std::optional<PG::complete_op_t>>
PG::already_complete(const osd_reqid_t& reqid)
{
eversion_t version;
if (peering_state.get_pg_log().get_log().get_request(
reqid, &version, &user_version, &ret, &op_returns)) {
- return backend->request_committed(reqid, version).then([ret] {
- return seastar::make_ready_future<std::tuple<bool, int>>(true, ret);
+ complete_op_t dupinfo{
+ user_version,
+ version,
+ ret};
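+    // Hand back the recorded versions and result once the original op has
+    // committed, so the caller can build a correct dup reply.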
+ return backend->request_committed(reqid, version).then([dupinfo] {
+ return seastar::make_ready_future<std::optional<complete_op_t>>(dupinfo);
});
} else {
- return seastar::make_ready_future<std::tuple<bool, int>>(false, 0);
+ return seastar::make_ready_future<std::optional<complete_op_t>>(std::nullopt);
}
}