*/
#include "PG.h"
-// #include "msg/Messenger.h"
#include "messages/MOSDRepScrub.h"
-// #include "common/cmdparse.h"
-// #include "common/ceph_context.h"
#include "common/errno.h"
#include "common/ceph_releases.h"
#include "OSD.h"
#include "OpRequest.h"
#include "ScrubStore.h"
+#include "pg_scrubber.h"
#include "Session.h"
#include "osd/scheduler/OpSchedulerItem.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDPGNotify.h"
-// #include "messages/MOSDPGLog.h"
#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGScan.h"
#include "messages/MOSDPGBackfill.h"
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
+using std::list;
+using std::map;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::unique_ptr;
+using std::vector;
+
+using ceph::bufferlist;
+using ceph::bufferptr;
+using ceph::decode;
+using ceph::encode;
+using ceph::Formatter;
+
using namespace ceph::osd::scheduler;
template <class T>
pg_stats_publish_valid(false),
finish_sync_event(NULL),
scrub_after_recovery(false),
- save_req_scrub(false),
active_pushes(0),
recovery_state(
o->cct,
osd->pg_recovery_stats.log_exit(
state_name, ceph_clock_now() - enter_time, events, event_dur);
}
-
+
/********* PG **********/
void PG::remove_snap_mapped_object(
/******* PG ***********/
+// Resets a set of PG members: the projected log is replaced by an empty
+// one, the snap-trim queue is cleared, finish_sync_event is zeroed, PG
+// backoffs are released and scrub_after_recovery is cleared. If a scrubber
+// object exists, its replica reservations are discarded. Ends by calling
+// agent_clear().
void PG::clear_primary_state()
{
+ dout(20) << __func__ << dendl;
+
projected_log = PGLog::IndexedLog();
snap_trimq.clear();
finish_sync_event = 0; // so that _finish_recovery doesn't go off in another thread
release_pg_backoffs();
- scrubber.reserved_peers.clear();
+ if (m_scrubber) {
+ m_scrubber->discard_replica_reservations();
+ }
scrub_after_recovery = false;
- save_req_scrub = false;
agent_clear();
}
-PG::Scrubber::Scrubber()
- : local_reserved(false), remote_reserved(false), reserve_failed(false),
- epoch_start(0),
- active(false),
- shallow_errors(0), deep_errors(0), fixed(0),
- must_scrub(false), must_deep_scrub(false), must_repair(false),
- need_auto(false), req_scrub(false), time_for_deep(false),
- auto_repair(false),
- check_repair(false),
- deep_scrub_on_error(false),
- num_digest_updates_pending(0),
- state(INACTIVE),
- deep(false)
-{}
-
-PG::Scrubber::~Scrubber() {}
bool PG::op_has_sufficient_caps(OpRequestRef& op)
{
return cap;
}
-bool PG::requeue_scrub(bool high_priority)
-{
- ceph_assert(ceph_mutex_is_locked(_lock));
- if (scrub_queued) {
- dout(10) << __func__ << ": already queued" << dendl;
- return false;
- } else {
- dout(10) << __func__ << ": queueing" << dendl;
- scrub_queued = true;
- osd->queue_for_scrub(this, high_priority);
- return true;
- }
-}
-
void PG::queue_recovery()
{
if (!is_primary() || !is_peered()) {
}
}
-bool PG::queue_scrub()
+// Flags the PG for a deep scrub with 'check_repair' set and, when possible,
+// queues that scrub at high priority.
+// Must be called with the PG lock held (asserted).
+// The must_scrub / must_deep_scrub / check_repair planned-scrub flags are
+// raised unconditionally; the scrub is queued only if the PG is not already
+// scrubbing and no scrub is currently queued.
+void PG::queue_scrub_after_repair()
{
+ dout(10) << __func__ << dendl;
ceph_assert(ceph_mutex_is_locked(_lock));
+
+ m_planned_scrub.must_deep_scrub = true;
+ m_planned_scrub.check_repair = true;
+ m_planned_scrub.must_scrub = true;
+
if (is_scrubbing()) {
- return false;
+ dout(10) << __func__ << ": scrubbing already" << dendl;
+ return;
}
- // An interrupted recovery repair could leave this set.
- state_clear(PG_STATE_REPAIR);
- if (scrubber.need_auto) {
- scrubber.must_scrub = true;
- scrubber.must_deep_scrub = true;
- scrubber.auto_repair = true;
- scrubber.need_auto = false;
- }
- scrubber.priority = scrubber.must_scrub ?
- cct->_conf->osd_requested_scrub_priority : get_scrub_priority();
- scrubber.must_scrub = false;
- state_set(PG_STATE_SCRUBBING);
- if (scrubber.must_deep_scrub) {
- state_set(PG_STATE_DEEP_SCRUB);
- scrubber.must_deep_scrub = false;
- }
- if (scrubber.must_repair || scrubber.auto_repair) {
- state_set(PG_STATE_REPAIR);
- scrubber.must_repair = false;
- }
- requeue_scrub();
- return true;
+ if (scrub_queued) {
+ dout(10) << __func__ << ": already queued" << dendl;
+ return;
+ }
+
+ m_scrubber->set_op_parameters(m_planned_scrub);
+ dout(15) << __func__ << ": queueing" << dendl;
+
+ scrub_queued = true;
+ osd->queue_scrub_after_repair(this, Scrub::scrub_prio_t::high_priority);
}
+// Scrub priority for this PG: the pool's SCRUB_PRIORITY option when it holds
+// a positive value, otherwise the 'osd_scrub_priority' configuration value.
unsigned PG::get_scrub_priority()
{
// a higher value -> a higher priority
- int64_t pool_scrub_priority = 0;
- pool.info.opts.get(pool_opts_t::SCRUB_PRIORITY, &pool_scrub_priority);
+ // the default is spelled with a named cast so that the type handed to
+ // value_or() is explicitly int64_t
+ int64_t pool_scrub_priority =
+ pool.info.opts.value_or(pool_opts_t::SCRUB_PRIORITY, static_cast<int64_t>(0));
return pool_scrub_priority > 0 ? pool_scrub_priority : cct->_conf->osd_scrub_priority;
}
return finish_sync_event;
}
-void PG::_finish_recovery(Context *c)
+void PG::_finish_recovery(Context* c)
{
+ dout(15) << __func__ << " finish_sync_event? " << finish_sync_event << " clean? "
+ << is_clean() << dendl;
+
std::scoped_lock locker{*this};
if (recovery_state.is_deleting() || !is_clean()) {
dout(10) << __func__ << " raced with delete or repair" << dendl;
// When recovery is initiated by a repair, that flag is left on
state_clear(PG_STATE_REPAIR);
if (c == finish_sync_event) {
- dout(10) << "_finish_recovery" << dendl;
+ dout(15) << __func__ << " scrub_after_recovery? " << scrub_after_recovery << dendl;
finish_sync_event = 0;
recovery_state.purge_strays();
if (scrub_after_recovery) {
dout(10) << "_finish_recovery requeueing for scrub" << dendl;
scrub_after_recovery = false;
- scrubber.must_deep_scrub = true;
- scrubber.check_repair = true;
- // We remember whether req_scrub was set when scrub_after_recovery set to true
- scrubber.req_scrub = save_req_scrub;
- queue_scrub();
+ queue_scrub_after_repair();
}
} else {
dout(10) << "_finish_recovery -- stale" << dendl;
{
dout(10) << "finish_recovery_op " << soid
#ifdef DEBUG_RECOVERY_OIDS
- << " (" << recovering_oids << ")"
+ << " (" << recovering_oids << ")"
#endif
<< dendl;
ceph_assert(recovery_ops_active > 0);
}
}
-void PG::clear_recovery_state()
+void PG::clear_recovery_state()
{
dout(10) << "clear_recovery_state" << dendl;
}
void PG::send_cluster_message(
- int target, Message *m,
+ int target, MessageRef m,
epoch_t epoch, bool share_map_update=false)
{
ConnectionRef con = osd->get_con_osd_cluster(
target, get_osdmap_epoch());
if (!con) {
- m->put();
return;
}
const vector<pg_log_entry_t> &log_entries,
ObjectStore::Transaction &t)
{
- for (vector<pg_log_entry_t>::const_iterator i = log_entries.begin();
- i != log_entries.end();
- ++i) {
+ for (auto i = log_entries.cbegin(); i != log_entries.cend(); ++i) {
OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
if (i->soid.snap < CEPH_MAXSNAP) {
if (i->is_delete()) {
int r = snap_mapper.remove_oid(
i->soid,
&_t);
- if (r != 0)
+ if (r)
derr << __func__ << " remove_oid " << i->soid << " failed with " << r << dendl;
// On removal tolerate missing key corruption
ceph_assert(r == 0 || r == -ENOENT);
+// Passes every op list stored in 'm' (one list per object key) to
+// requeue_ops(), then empties the map.
void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m)
{
- for (map<hobject_t, list<OpRequestRef>>::iterator it = m.begin();
- it != m.end();
- ++it)
+ for (auto it = m.begin(); it != m.end(); ++it)
requeue_ops(it->second);
m.clear();
}
}
}
+// Reports whether the planned-scrub flag set has 'must_scrub' raised.
+// The value is also traced through dout(20).
+bool PG::get_must_scrub() const
+{
+  const bool must_scrub_flag = m_planned_scrub.must_scrub;
+  dout(20) << __func__ << " must_scrub? " << (must_scrub_flag ? "true" : "false") << dendl;
+  return must_scrub_flag;
+}
+
+// Asks the scrubber for the requeue priority that matches the given
+// priority class, and returns its answer unchanged.
+unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority) const
+{
+  const unsigned int requeue_prio = m_scrubber->scrub_requeue_priority(with_priority);
+  return requeue_prio;
+}
+
+// Same as the single-argument overload, but a caller-suggested priority is
+// forwarded to the scrubber along with the priority class.
+unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsigned int suggested_priority) const
+{
+  const unsigned int requeue_prio =
+    m_scrubber->scrub_requeue_priority(with_priority, suggested_priority);
+  return requeue_prio;
+}
// ==========================================================================================
// SCRUB
/*
- * when holding pg and sched_scrub_lock, then the states are:
- * scheduling:
- * scrubber.local_reserved = true
- * scrubber.active = false
- * scrubber.reserved_peers includes whoami
- * osd->scrubs_local++
- * scheduling, replica declined:
- * scrubber.local_reserved = true
- * scrubber.reserved_peers includes -1
- * osd->scrub_local++
- * pending:
- * scrubber.local_reserved = true
- * scrubber.active = false
- * scrubber.reserved_peers.size() == acting.size();
- * pg on scrub_wq
- * osd->scrub_local++
- * scrubbing:
- * scrubber.local_reserved = true;
- * scrubber.active = true
- * scrubber.reserved_peers empty
+ * implementation note:
+ * PG::sched_scrub() is called only once per scrub session.
+ * That call commits us to whatever choices are made (deep/shallow, etc.).
+ * Unless we fail to start scrubbing, the 'planned scrub' flag-set is 'frozen' into
+ * PgScrubber's m_flags, then cleared.
*/
-
-// returns true if a scrub has been newly kicked off
+// Try to start a scrub of this PG. Must be called with the PG lock held and
+// while the PG is not scrubbing (both asserted).
+// Returns true if a scrub was queued. Returns false if the PG is not
+// primary+active+clean, if a scrub is already queued, if verify_scrub_mode()
+// rules the scrub out, or if the scrubber could not reserve local resources.
+// m_planned_scrub is overwritten with the verified flags only after the
+// local reservation has succeeded.
bool PG::sched_scrub()
{
+ dout(15) << __func__ << " pg(" << info.pgid
+ << (is_active() ? ") <active>" : ") <not-active>")
+ << (is_clean() ? " <clean>" : " <not-clean>") << dendl;
ceph_assert(ceph_mutex_is_locked(_lock));
ceph_assert(!is_scrubbing());
- if (!(is_primary() && is_active() && is_clean())) {
+
+ if (!is_primary() || !is_active() || !is_clean()) {
return false;
}
- // All processing the first time through commits us to whatever
- // choices are made.
- if (!scrubber.local_reserved) {
- dout(20) << __func__ << ": Start processing pg " << info.pgid << dendl;
+ if (scrub_queued) {
+ // only applicable to the very first time a scrub event is queued
+ // (until handled and posted to the scrub FSM)
+ dout(10) << __func__ << ": already queued" << dendl;
+ return false;
+ }
- bool allow_deep_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
- pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB));
- bool allow_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
- pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB));
- bool has_deep_errors = (info.stats.stats.sum.num_deep_scrub_errors > 0);
- bool try_to_auto_repair = (cct->_conf->osd_scrub_auto_repair
- && get_pgbackend()->auto_repair_supported());
+ // analyse the combination of the requested scrub flags, the osd/pool configuration
+ // and the PG status to determine whether we should scrub now, and what type of scrub
+ // should that be.
+ auto updated_flags = verify_scrub_mode();
+ if (!updated_flags) {
+ // the stars do not align for starting a scrub for this PG at this time
+ // (due to configuration or priority issues)
+ // The reason was already reported by the callee.
+ dout(10) << __func__ << ": failed to initiate a scrub" << dendl;
+ return false;
+ }
- scrubber.time_for_deep = false;
- // Clear these in case user issues the scrub/repair command during
- // the scheduling of the scrub/repair (e.g. request reservation)
- scrubber.deep_scrub_on_error = false;
- scrubber.auto_repair = false;
+ // try to reserve the local OSD resources. If failing: no harm. We will
+ // be retried by the OSD later on.
+ if (!m_scrubber->reserve_local()) {
+ dout(10) << __func__ << ": failed to reserve locally" << dendl;
+ return false;
+ }
- // All periodic scrub handling goes here because must_scrub is
- // always set for must_deep_scrub and must_repair.
- if (!scrubber.must_scrub) {
- ceph_assert(!scrubber.must_deep_scrub && !scrubber.must_repair);
- // Handle deep scrub determination only if allowed
- if (allow_deep_scrub) {
- // Initial entry and scheduled scrubs without nodeep_scrub set get here
- if (scrubber.need_auto) {
- dout(20) << __func__ << ": need repair after scrub errors" << dendl;
- scrubber.time_for_deep = true;
- } else {
- double deep_scrub_interval = 0;
- pool.info.opts.get(pool_opts_t::DEEP_SCRUB_INTERVAL, &deep_scrub_interval);
- if (deep_scrub_interval <= 0) {
- deep_scrub_interval = cct->_conf->osd_deep_scrub_interval;
- }
- scrubber.time_for_deep = ceph_clock_now() >=
- info.history.last_deep_scrub_stamp + deep_scrub_interval;
-
- bool deep_coin_flip = false;
- // If we randomize when !allow_scrub && allow_deep_scrub, then it guarantees
- // we will deep scrub because this function is called often.
- if (!scrubber.time_for_deep && allow_scrub)
- deep_coin_flip = (rand() % 100) < cct->_conf->osd_deep_scrub_randomize_ratio * 100;
- dout(20) << __func__ << ": time_for_deep=" << scrubber.time_for_deep << " deep_coin_flip=" << deep_coin_flip << dendl;
-
- scrubber.time_for_deep = (scrubber.time_for_deep || deep_coin_flip);
- }
+ // can commit to the updated flags now, as nothing will stop the scrub
+ m_planned_scrub = *updated_flags;
- if (!scrubber.time_for_deep && has_deep_errors) {
- osd->clog->info() << "osd." << osd->whoami
- << " pg " << info.pgid
- << " Deep scrub errors, upgrading scrub to deep-scrub";
- scrubber.time_for_deep = true;
- }
+ // An interrupted recovery repair could leave this set.
+ state_clear(PG_STATE_REPAIR);
- if (try_to_auto_repair) {
- if (scrubber.time_for_deep) {
- dout(20) << __func__ << ": auto repair with deep scrubbing" << dendl;
- scrubber.auto_repair = true;
- } else if (allow_scrub) {
- dout(20) << __func__ << ": auto repair with scrubbing, rescrub if errors found" << dendl;
- scrubber.deep_scrub_on_error = true;
- }
- }
- } else { // !allow_deep_scrub
- dout(20) << __func__ << ": nodeep_scrub set" << dendl;
- if (has_deep_errors) {
- osd->clog->error() << "osd." << osd->whoami
- << " pg " << info.pgid
- << " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
- return false;
- }
- }
+ // Pass control to the scrubber. It is the scrubber that handles the replicas'
+ // resources reservations.
+ m_scrubber->set_op_parameters(m_planned_scrub);
- //NOSCRUB so skip regular scrubs
- if (!allow_scrub && !scrubber.time_for_deep) {
- return false;
- }
- // scrubber.must_scrub
- } else if (!scrubber.must_deep_scrub && has_deep_errors) {
- osd->clog->error() << "osd." << osd->whoami
- << " pg " << info.pgid
- << " Regular scrub request, deep-scrub details will be lost";
- }
- // Unless precluded this was handle above
- scrubber.need_auto = false;
-
- ceph_assert(scrubber.reserved_peers.empty());
- bool allow_scrubing = cct->_conf->osd_scrub_during_recovery ||
- (cct->_conf->osd_repair_during_recovery && scrubber.must_repair) ||
- !osd->is_recovery_active();
- if (allow_scrubing &&
- osd->inc_scrubs_local()) {
- dout(20) << __func__ << ": reserved locally, reserving replicas" << dendl;
- scrubber.local_reserved = true;
- scrubber.reserved_peers.insert(pg_whoami);
- scrub_reserve_replicas();
- } else {
- dout(20) << __func__ << ": failed to reserve locally" << dendl;
- return false;
- }
+ dout(10) << __func__ << ": queueing" << dendl;
+
+ scrub_queued = true;
+ osd->queue_for_scrub(this, Scrub::scrub_prio_t::low_priority);
+ return true;
+}
+
+// Despite the name, the value returned is a point in time: the stamp of the
+// last deep scrub plus the deep-scrub interval. The interval is the pool's
+// DEEP_SCRUB_INTERVAL option when that is positive, and the
+// 'osd_deep_scrub_interval' configuration value otherwise.
+double PG::next_deepscrub_interval() const
+{
+  double interval = pool.info.opts.value_or(pool_opts_t::DEEP_SCRUB_INTERVAL, 0.0);
+  if (interval <= 0.0) {
+    // no usable per-pool value: fall back to the configured default
+    interval = cct->_conf->osd_deep_scrub_interval;
+  }
+  return info.history.last_deep_scrub_stamp + interval;
+}
+
+// Decides whether the scrub being planned should be a deep scrub.
+// Returns false outright when deep scrubbing is not allowed. Otherwise
+// returns true if 'planned.need_auto' is set, if the time returned by
+// next_deepscrub_interval() has been reached, if 'has_deep_errors' is set
+// (this case is also reported to the cluster log), or - only when
+// 'allow_scrub' is set - if a random draw against
+// 'osd_deep_scrub_randomize_ratio' succeeds. Returns false in all other cases.
+bool PG::is_time_for_deep(bool allow_deep_scrub,
+ bool allow_scrub,
+ bool has_deep_errors,
+ const requested_scrub_t& planned) const
+{
+ dout(10) << __func__ << ": need_auto?" << planned.need_auto << " allow_deep_scrub? " << allow_deep_scrub << dendl;
+
+ if (!allow_deep_scrub)
+ return false;
+
+ if (planned.need_auto) {
+ dout(10) << __func__ << ": need repair after scrub errors" << dendl;
+ return true;
+ }
+
+ if (ceph_clock_now() >= next_deepscrub_interval())
+ return true;
+
+ if (has_deep_errors) {
+ osd->clog->info() << "osd." << osd->whoami << " pg " << info.pgid
+ << " Deep scrub errors, upgrading scrub to deep-scrub";
+ return true;
}
- if (scrubber.local_reserved) {
- if (scrubber.reserve_failed) {
- dout(20) << __func__ << ": failed, a peer declined" << dendl;
- clear_scrub_reserved();
- scrub_unreserve_replicas();
+ // we only flip coins if 'allow_scrub' is asserted. Otherwise - as this function is
+ // called often, we will probably be deep-scrubbing most of the time.
+ if (allow_scrub) {
+ bool deep_coin_flip =
+ (rand() % 100) < cct->_conf->osd_deep_scrub_randomize_ratio * 100;
+
+ dout(15) << __func__ << ": time_for_deep=" << planned.time_for_deep
+ << " deep_coin_flip=" << deep_coin_flip << dendl;
+
+ if (deep_coin_flip)
+ return true;
+ }
+
+ return false;
+}
+
+// Updates 'planned' for a periodic scrub and tells the caller whether such a
+// scrub may start.
+// Asserts that neither must_deep_scrub nor must_repair is set in 'planned'.
+// Returns false (after reporting to the cluster log) when deep scrubs are
+// disallowed while the PG has deep-scrub errors, and also when regular scrubs
+// are disallowed and planned.time_for_deep is not set. Returns true otherwise.
+// When deep scrubs are allowed, planned.time_for_deep is recomputed with
+// is_time_for_deep(); if 'try_to_auto_repair' is set, either
+// planned.auto_repair (deep scrub) or planned.deep_scrub_on_error (regular
+// scrub allowed) is raised.
+bool PG::verify_periodic_scrub_mode(bool allow_deep_scrub,
+ bool try_to_auto_repair,
+ bool allow_regular_scrub,
+ bool has_deep_errors,
+ requested_scrub_t& planned) const
+
+{
+ ceph_assert(!planned.must_deep_scrub && !planned.must_repair);
+
+ if (!allow_deep_scrub && has_deep_errors) {
+ osd->clog->error()
+ << "osd." << osd->whoami << " pg " << info.pgid
+ << " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
return false;
- } else if (scrubber.reserved_peers.size() == get_actingset().size()) {
- dout(20) << __func__ << ": success, reserved self and replicas" << dendl;
- if (scrubber.time_for_deep) {
- dout(10) << __func__ << ": scrub will be deep" << dendl;
- state_set(PG_STATE_DEEP_SCRUB);
- scrubber.time_for_deep = false;
+ }
+
+ if (allow_deep_scrub) {
+ // Initial entry and scheduled scrubs without nodeep_scrub set get here
+
+ planned.time_for_deep =
+ is_time_for_deep(allow_deep_scrub, allow_regular_scrub, has_deep_errors, planned);
+
+ if (try_to_auto_repair) {
+ if (planned.time_for_deep) {
+ dout(20) << __func__ << ": auto repair with deep scrubbing" << dendl;
+ planned.auto_repair = true;
+ } else if (allow_regular_scrub) {
+ dout(20) << __func__ << ": auto repair with scrubbing, rescrub if errors found"
+ << dendl;
+ planned.deep_scrub_on_error = true;
}
- queue_scrub();
- } else {
- // none declined, since scrubber.reserved is set
- dout(20) << __func__ << ": reserved " << scrubber.reserved_peers
- << ", waiting for replicas" << dendl;
}
}
+
+ dout(20) << __func__ << " updated flags: " << planned
+ << " allow_regular_scrub: " << allow_regular_scrub << dendl;
+
+ // NOSCRUB so skip regular scrubs
+ if (!allow_regular_scrub && !planned.time_for_deep) {
+ return false;
+ }
+
return true;
}
-bool PG::is_scrub_registered()
+// Combines the planned-scrub flags with the OSD-map / pool no-scrub flags,
+// the auto-repair configuration, the PG's deep-scrub error count and the
+// OSD's recovery state, to decide whether a scrub may start now.
+// Works on a copy of m_planned_scrub; the member itself is not modified here.
+// Returns the updated copy (with need_auto cleared) if scrubbing may go
+// ahead, or std::nullopt if a periodic scrub was ruled out by
+// verify_periodic_scrub_mode() or scrubbing is prevented by active recovery.
+std::optional<requested_scrub_t> PG::verify_scrub_mode() const
{
- return !scrubber.scrub_reg_stamp.is_zero();
-}
+ dout(10) << __func__ << " processing pg " << info.pgid << dendl;
-void PG::reg_next_scrub()
-{
- if (!is_primary())
- return;
+ bool allow_deep_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
+ pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB));
+ bool allow_regular_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
+ pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB));
+ bool has_deep_errors = (info.stats.stats.sum.num_deep_scrub_errors > 0);
+ bool try_to_auto_repair =
+ (cct->_conf->osd_scrub_auto_repair && get_pgbackend()->auto_repair_supported());
- utime_t reg_stamp;
- bool must = false;
- if (scrubber.must_scrub || scrubber.need_auto) {
- // Set the smallest time that isn't utime_t()
- reg_stamp = Scrubber::scrub_must_stamp();
- must = true;
- } else if (info.stats.stats_invalid && cct->_conf->osd_scrub_invalid_stats) {
- reg_stamp = ceph_clock_now();
- must = true;
- } else {
- reg_stamp = info.history.last_scrub_stamp;
+ auto upd_flags = m_planned_scrub;
+
+ upd_flags.time_for_deep = false;
+ // Clear these in case user issues the scrub/repair command during
+ // the scheduling of the scrub/repair (e.g. request reservation)
+ upd_flags.deep_scrub_on_error = false;
+ upd_flags.auto_repair = false;
+
+ if (upd_flags.must_scrub && !upd_flags.must_deep_scrub && has_deep_errors) {
+ osd->clog->error() << "osd." << osd->whoami << " pg " << info.pgid
+ << " Regular scrub request, deep-scrub details will be lost";
+ }
+
+ if (!upd_flags.must_scrub) {
+ // All periodic scrub handling goes here because must_scrub is
+ // always set for must_deep_scrub and must_repair.
+
+ bool can_start_periodic =
+ verify_periodic_scrub_mode(allow_deep_scrub, try_to_auto_repair,
+ allow_regular_scrub, has_deep_errors, upd_flags);
+ if (!can_start_periodic) {
+ return std::nullopt;
+ }
}
- // note down the sched_time, so we can locate this scrub, and remove it
- // later on.
- double scrub_min_interval = 0, scrub_max_interval = 0;
- pool.info.opts.get(pool_opts_t::SCRUB_MIN_INTERVAL, &scrub_min_interval);
- pool.info.opts.get(pool_opts_t::SCRUB_MAX_INTERVAL, &scrub_max_interval);
- ceph_assert(!is_scrub_registered());
- scrubber.scrub_reg_stamp = osd->reg_pg_scrub(info.pgid,
- reg_stamp,
- scrub_min_interval,
- scrub_max_interval,
- must);
- dout(10) << __func__ << " pg " << pg_id << " register next scrub, scrub time "
- << scrubber.scrub_reg_stamp << ", must = " << (int)must << dendl;
+
+ // scrubbing while recovering?
+
+ bool prevented_by_recovery =
+ osd->is_recovery_active() && !cct->_conf->osd_scrub_during_recovery &&
+ (!cct->_conf->osd_repair_during_recovery || !upd_flags.must_repair);
+
+ if (prevented_by_recovery) {
+ dout(20) << __func__ << ": scrubbing prevented during recovery" << dendl;
+ return std::nullopt;
+ }
+
+ upd_flags.need_auto = false;
+ return upd_flags;
}
-void PG::unreg_next_scrub()
+// Delegates to the scrubber's reg_next_scrub(), handing it the planned-scrub
+// flag set.
+// NOTE(review): m_scrubber is dereferenced unconditionally here, whereas
+// on_info_history_change() checks it for null first - confirm that it is
+// always set whenever this function can be called.
+void PG::reg_next_scrub()
{
- if (is_scrub_registered()) {
- osd->unreg_pg_scrub(info.pgid, scrubber.scrub_reg_stamp);
- scrubber.scrub_reg_stamp = utime_t();
- }
+ m_scrubber->reg_next_scrub(m_planned_scrub);
}
+// When a scrubber object exists: calls its unreg_next_scrub(), then its
+// reg_next_scrub() with the planned-scrub flag set. Does nothing if no
+// scrubber has been set.
void PG::on_info_history_change()
{
- unreg_next_scrub();
- reg_next_scrub();
+ if (m_scrubber) {
+ m_scrubber->unreg_next_scrub();
+ m_scrubber->reg_next_scrub(m_planned_scrub);
+ }
}
-void PG::scrub_requested(bool deep, bool repair, bool need_auto)
+// Forwards a scrub request of the given level and type to the scrubber,
+// together with the planned-scrub flag set (m_planned_scrub).
+// NOTE(review): presumably the scrubber updates m_planned_scrub as the
+// removed code updated the old flags - confirm against the scrubber code.
+void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
{
- unreg_next_scrub();
- if (need_auto) {
- scrubber.need_auto = true;
- } else {
- scrubber.must_scrub = true;
- scrubber.must_deep_scrub = deep || repair;
- scrubber.must_repair = repair;
- // User might intervene, so clear this
- scrubber.need_auto = false;
- scrubber.req_scrub = true;
- }
- reg_next_scrub();
+ m_scrubber->scrub_requested(scrub_level, scrub_type, m_planned_scrub);
}
void PG::clear_ready_to_merge() {
}
void PG::on_new_interval() {
+ dout(20) << __func__ << " scrub_queued was " << scrub_queued << " flags: " << m_planned_scrub << dendl;
scrub_queued = false;
projected_last_update = eversion_t();
cancel_recovery();
+// Requests a reservation for this PG (pg_id) from the OSD's local reserver
+// at the given priority. Each of 'on_grant' / 'on_preempt' that is non-null
+// is moved into a newly allocated QueuePeeringEvt, which is passed to the
+// reserver; a null event is passed on as nullptr.
void PG::request_local_background_io_reservation(
unsigned priority,
- PGPeeringEventRef on_grant,
- PGPeeringEventRef on_preempt) {
+ PGPeeringEventURef on_grant,
+ PGPeeringEventURef on_preempt) {
osd->local_reserver.request_reservation(
pg_id,
on_grant ? new QueuePeeringEvt(
- this, on_grant) : nullptr,
+ this, std::move(on_grant)) : nullptr,
priority,
on_preempt ? new QueuePeeringEvt(
- this, on_preempt) : nullptr);
+ this, std::move(on_preempt)) : nullptr);
}
void PG::update_local_background_io_priority(
+// Requests a reservation for this PG (pg_id) from the OSD's remote reserver
+// at the given priority. Each of 'on_grant' / 'on_preempt' that is non-null
+// is moved into a newly allocated QueuePeeringEvt, which is passed to the
+// reserver; a null event is passed on as nullptr.
void PG::request_remote_recovery_reservation(
unsigned priority,
- PGPeeringEventRef on_grant,
- PGPeeringEventRef on_preempt) {
+ PGPeeringEventURef on_grant,
+ PGPeeringEventURef on_preempt) {
osd->remote_reserver.request_reservation(
pg_id,
on_grant ? new QueuePeeringEvt(
- this, on_grant) : nullptr,
+ this, std::move(on_grant)) : nullptr,
priority,
on_preempt ? new QueuePeeringEvt(
- this, on_preempt) : nullptr);
+ this, std::move(on_preempt)) : nullptr);
}
void PG::cancel_remote_recovery_reservation() {
t.register_on_commit(new QueuePeeringEvt(this, on_commit));
}
+// Installs 'snaps' as the snap-trim queue, releases PG backoffs and sets
+// projected_last_update to info.last_update.
+// Asserts that the scrubber reports no pending callbacks and that
+// callbacks_for_degraded_object is empty.
+void PG::on_activate(interval_set<snapid_t> snaps)
+{
+  ceph_assert(!m_scrubber->are_callbacks_pending());
+  ceph_assert(callbacks_for_degraded_object.empty());
+  // 'snaps' is our own by-value copy and is not used again: move it into
+  // place instead of copying the whole interval set a second time
+  snap_trimq = std::move(snaps);
+  release_pg_backoffs();
+  projected_last_update = info.last_update;
+}
+
void PG::on_active_exit()
{
backfill_reserving = false;
}
}
-void PG::do_replica_scrub_map(OpRequestRef op)
-{
- auto m = op->get_req<MOSDRepScrubMap>();
- dout(7) << __func__ << " " << *m << dendl;
- if (m->map_epoch < info.history.same_interval_since) {
- dout(10) << __func__ << " discarding old from "
- << m->map_epoch << " < " << info.history.same_interval_since
- << dendl;
- return;
- }
- if (!scrubber.is_chunky_scrub_active()) {
- dout(10) << __func__ << " scrub isn't active" << dendl;
- return;
- }
-
- op->mark_started();
-
- auto p = const_cast<bufferlist&>(m->get_data()).cbegin();
- scrubber.received_maps[m->from].decode(p, info.pgid.pool());
- dout(10) << "map version is "
- << scrubber.received_maps[m->from].valid_through
- << dendl;
-
- dout(10) << __func__ << " waiting_on_whom was " << scrubber.waiting_on_whom
- << dendl;
- ceph_assert(scrubber.waiting_on_whom.count(m->from));
- scrubber.waiting_on_whom.erase(m->from);
- if (m->preempted) {
- dout(10) << __func__ << " replica was preempted, setting flag" << dendl;
- scrub_preempted = true;
- }
- if (scrubber.waiting_on_whom.empty()) {
- requeue_scrub(ops_blocked_by_scrub());
- }
-}
-
-// send scrub v3 messages (chunky scrub)
-void PG::_request_scrub_map(
- pg_shard_t replica, eversion_t version,
- hobject_t start, hobject_t end,
- bool deep,
- bool allow_preemption)
-{
- ceph_assert(replica != pg_whoami);
- dout(10) << "scrub requesting scrubmap from osd." << replica
- << " deep " << (int)deep << dendl;
- MOSDRepScrub *repscrubop = new MOSDRepScrub(
- spg_t(info.pgid.pgid, replica.shard), version,
- get_osdmap_epoch(),
- get_last_peering_reset(),
- start, end, deep,
- allow_preemption,
- scrubber.priority,
- ops_blocked_by_scrub());
- // default priority, we want the rep scrub processed prior to any recovery
- // or client io messages (we are holding a lock!)
- osd->send_message_osd_cluster(
- replica.osd, repscrubop, get_osdmap_epoch());
-}
-
-void PG::handle_scrub_reserve_request(OpRequestRef op)
-{
- dout(7) << __func__ << " " << *op->get_req() << dendl;
- op->mark_started();
- if (scrubber.remote_reserved) {
- dout(10) << __func__ << " ignoring reserve request: Already reserved"
- << dendl;
- return;
- }
- if ((cct->_conf->osd_scrub_during_recovery || !osd->is_recovery_active()) &&
- osd->inc_scrubs_remote()) {
- scrubber.remote_reserved = true;
- } else {
- dout(20) << __func__ << ": failed to reserve remotely" << dendl;
- scrubber.remote_reserved = false;
- }
- auto m = op->get_req<MOSDScrubReserve>();
- Message *reply = new MOSDScrubReserve(
- spg_t(info.pgid.pgid, get_primary().shard),
- m->map_epoch,
- scrubber.remote_reserved ? MOSDScrubReserve::GRANT : MOSDScrubReserve::REJECT,
- pg_whoami);
- osd->send_message_osd_cluster(reply, op->get_req()->get_connection());
-}
-
-void PG::handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from)
-{
- dout(7) << __func__ << " " << *op->get_req() << dendl;
- op->mark_started();
- if (!scrubber.local_reserved) {
- dout(10) << "ignoring obsolete scrub reserve reply" << dendl;
- return;
- }
- if (scrubber.reserved_peers.find(from) != scrubber.reserved_peers.end()) {
- dout(10) << " already had osd." << from << " reserved" << dendl;
- } else {
- dout(10) << " osd." << from << " scrub reserve = success" << dendl;
- scrubber.reserved_peers.insert(from);
- sched_scrub();
- }
-}
-
-void PG::handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from)
-{
- dout(7) << __func__ << " " << *op->get_req() << dendl;
- op->mark_started();
- if (!scrubber.local_reserved) {
- dout(10) << "ignoring obsolete scrub reserve reply" << dendl;
- return;
- }
- if (scrubber.reserved_peers.find(from) != scrubber.reserved_peers.end()) {
- dout(10) << " already had osd." << from << " reserved" << dendl;
- } else {
- /* One decline stops this pg from being scheduled for scrubbing. */
- dout(10) << " osd." << from << " scrub reserve = fail" << dendl;
- scrubber.reserve_failed = true;
- sched_scrub();
- }
-}
-
-void PG::handle_scrub_reserve_release(OpRequestRef op)
-{
- dout(7) << __func__ << " " << *op->get_req() << dendl;
- op->mark_started();
- clear_scrub_reserved();
-}
-
// Compute pending backfill data
static int64_t pending_backfill(CephContext *cct, int64_t bf_bytes, int64_t local_bytes)
{
+// Stores zero into both the primary_num_bytes and local_num_bytes counters.
void PG::unreserve_recovery_space() {
primary_num_bytes.store(0);
local_num_bytes.store(0);
- return;
-}
-
-void PG::clear_scrub_reserved()
-{
- scrubber.reserved_peers.clear();
- scrubber.reserve_failed = false;
-
- if (scrubber.local_reserved) {
- scrubber.local_reserved = false;
- osd->dec_scrubs_local();
- }
- if (scrubber.remote_reserved) {
- scrubber.remote_reserved = false;
- osd->dec_scrubs_remote();
- }
-}
-
-void PG::scrub_reserve_replicas()
-{
- ceph_assert(recovery_state.get_backfill_targets().empty());
- std::vector<std::pair<int, Message*>> messages;
- messages.reserve(get_actingset().size());
- epoch_t e = get_osdmap_epoch();
- for (set<pg_shard_t>::iterator i = get_actingset().begin();
- i != get_actingset().end();
- ++i) {
- if (*i == pg_whoami) continue;
- dout(10) << "scrub requesting reserve from osd." << *i << dendl;
- Message* m = new MOSDScrubReserve(spg_t(info.pgid.pgid, i->shard), e,
- MOSDScrubReserve::REQUEST, pg_whoami);
- messages.push_back(std::make_pair(i->osd, m));
- }
- if (!messages.empty()) {
- osd->send_message_osd_cluster(messages, e);
- }
-}
-
-void PG::scrub_unreserve_replicas()
-{
- ceph_assert(recovery_state.get_backfill_targets().empty());
- std::vector<std::pair<int, Message*>> messages;
- messages.reserve(get_actingset().size());
- epoch_t e = get_osdmap_epoch();
- for (set<pg_shard_t>::iterator i = get_actingset().begin();
- i != get_actingset().end();
- ++i) {
- if (*i == pg_whoami) continue;
- dout(10) << "scrub requesting unreserve from osd." << *i << dendl;
- Message* m = new MOSDScrubReserve(spg_t(info.pgid.pgid, i->shard), e,
- MOSDScrubReserve::RELEASE, pg_whoami);
- messages.push_back(std::make_pair(i->osd, m));
- }
- if (!messages.empty()) {
- osd->send_message_osd_cluster(messages, e);
- }
}
void PG::_scan_rollback_obs(const vector<ghobject_t> &rollback_obs)
}
}
-void PG::_scan_snaps(ScrubMap &smap)
-{
- hobject_t head;
- SnapSet snapset;
-
- // Test qa/standalone/scrub/osd-scrub-snaps.sh uses this message to verify
- // caller using clean_meta_map(), and it works properly.
- dout(20) << __func__ << " start" << dendl;
-
- for (map<hobject_t, ScrubMap::object>::reverse_iterator i = smap.objects.rbegin();
- i != smap.objects.rend();
- ++i) {
- const hobject_t &hoid = i->first;
- ScrubMap::object &o = i->second;
-
- dout(20) << __func__ << " " << hoid << dendl;
-
- ceph_assert(!hoid.is_snapdir());
- if (hoid.is_head()) {
- // parse the SnapSet
- bufferlist bl;
- if (o.attrs.find(SS_ATTR) == o.attrs.end()) {
- continue;
- }
- bl.push_back(o.attrs[SS_ATTR]);
- auto p = bl.cbegin();
- try {
- decode(snapset, p);
- } catch(...) {
- continue;
- }
- head = hoid.get_head();
- continue;
- }
- if (hoid.snap < CEPH_MAXSNAP) {
- // check and if necessary fix snap_mapper
- if (hoid.get_head() != head) {
- derr << __func__ << " no head for " << hoid << " (have " << head << ")"
- << dendl;
- continue;
- }
- set<snapid_t> obj_snaps;
- auto p = snapset.clone_snaps.find(hoid.snap);
- if (p == snapset.clone_snaps.end()) {
- derr << __func__ << " no clone_snaps for " << hoid << " in " << snapset
- << dendl;
- continue;
- }
- obj_snaps.insert(p->second.begin(), p->second.end());
- set<snapid_t> cur_snaps;
- int r = snap_mapper.get_snaps(hoid, &cur_snaps);
- if (r != 0 && r != -ENOENT) {
- derr << __func__ << ": get_snaps returned " << cpp_strerror(r) << dendl;
- ceph_abort();
- }
- if (r == -ENOENT || cur_snaps != obj_snaps) {
- ObjectStore::Transaction t;
- OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
- if (r == 0) {
- r = snap_mapper.remove_oid(hoid, &_t);
- if (r != 0) {
- derr << __func__ << ": remove_oid returned " << cpp_strerror(r)
- << dendl;
- ceph_abort();
- }
- osd->clog->error() << "osd." << osd->whoami
- << " found snap mapper error on pg "
- << info.pgid
- << " oid " << hoid << " snaps in mapper: "
- << cur_snaps << ", oi: "
- << obj_snaps
- << "...repaired";
- } else {
- osd->clog->error() << "osd." << osd->whoami
- << " found snap mapper error on pg "
- << info.pgid
- << " oid " << hoid << " snaps missing in mapper"
- << ", should be: "
- << obj_snaps
- << " was " << cur_snaps << " r " << r
- << "...repaired";
- }
- snap_mapper.add_oid(hoid, obj_snaps, &_t);
-
- // wait for repair to apply to avoid confusing other bits of the system.
- {
- ceph::condition_variable my_cond;
- ceph::mutex my_lock = ceph::make_mutex("PG::_scan_snaps my_lock");
- int r = 0;
- bool done;
- t.register_on_applied_sync(
- new C_SafeCond(my_lock, my_cond, &done, &r));
- r = osd->store->queue_transaction(ch, std::move(t));
- if (r != 0) {
- derr << __func__ << ": queue_transaction got " << cpp_strerror(r)
- << dendl;
- } else {
- std::unique_lock l{my_lock};
- my_cond.wait(l, [&done] { return done;});
- }
- }
- }
- }
- }
-}
void PG::_repair_oinfo_oid(ScrubMap &smap)
{
}
}
}
-int PG::build_scrub_map_chunk(
- ScrubMap &map,
- ScrubMapBuilder &pos,
- hobject_t start,
- hobject_t end,
- bool deep,
- ThreadPool::TPHandle &handle)
-{
- dout(10) << __func__ << " [" << start << "," << end << ") "
- << " pos " << pos
- << dendl;
-
- // start
- while (pos.empty()) {
- pos.deep = deep;
- map.valid_through = info.last_update;
-
- // objects
- vector<ghobject_t> rollback_obs;
- pos.ret = get_pgbackend()->objects_list_range(
- start,
- end,
- &pos.ls,
- &rollback_obs);
- if (pos.ret < 0) {
- dout(5) << "objects_list_range error: " << pos.ret << dendl;
- return pos.ret;
- }
- if (pos.ls.empty()) {
- break;
- }
- _scan_rollback_obs(rollback_obs);
- pos.pos = 0;
- return -EINPROGRESS;
- }
-
- // scan objects
- while (!pos.done()) {
- int r = get_pgbackend()->be_scan_list(map, pos);
- if (r == -EINPROGRESS) {
- return r;
- }
- }
-
- // finish
- dout(20) << __func__ << " finishing" << dendl;
- ceph_assert(pos.done());
- _repair_oinfo_oid(map);
- if (!is_primary()) {
- ScrubMap for_meta_scrub;
- // In case we restarted smaller chunk, clear old data
- scrubber.cleaned_meta_map.clear_from(scrubber.start);
- scrubber.cleaned_meta_map.insert(map);
- scrubber.clean_meta_map(for_meta_scrub);
- _scan_snaps(for_meta_scrub);
- }
-
- dout(20) << __func__ << " done, got " << map.objects.size() << " items"
- << dendl;
- return 0;
-}
-
-void PG::Scrubber::cleanup_store(ObjectStore::Transaction *t) {
- if (!store)
- return;
- struct OnComplete : Context {
- std::unique_ptr<Scrub::Store> store;
- explicit OnComplete(
- std::unique_ptr<Scrub::Store> &&store)
- : store(std::move(store)) {}
- void finish(int) override {}
- };
- store->cleanup(t);
- t->register_on_complete(new OnComplete(std::move(store)));
- ceph_assert(!store);
-}
void PG::repair_object(
const hobject_t &soid,
recovery_state.force_object_missing(bad_peers, soid, oi.version);
}
-/* replica_scrub
- *
- * Wait for last_update_applied to match msg->scrub_to as above. Wait
- * for pushes to complete in case of recent recovery. Build a single
- * scrubmap of objects that are in the range [msg->start, msg->end).
- */
-void PG::replica_scrub(
- OpRequestRef op,
- ThreadPool::TPHandle &handle)
+void PG::forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued) // relays a queued scrub event to m_scrubber; fn selects which handler is invoked
{
- auto msg = op->get_req<MOSDRepScrub>();
- ceph_assert(!scrubber.active_rep_scrub);
- dout(7) << "replica_scrub" << dendl;
-
- if (msg->map_epoch < info.history.same_interval_since) {
- dout(10) << "replica_scrub discarding old replica_scrub from "
- << msg->map_epoch << " < " << info.history.same_interval_since
- << dendl;
- return;
- }
-
- ceph_assert(msg->chunky);
- if (active_pushes > 0) {
- dout(10) << "waiting for active pushes to finish" << dendl;
- scrubber.active_rep_scrub = op;
- return;
- }
-
- scrubber.state = Scrubber::BUILD_MAP_REPLICA;
- scrubber.replica_scrub_start = msg->min_epoch;
- scrubber.start = msg->start;
- scrubber.end = msg->end;
- scrubber.max_end = msg->end;
- scrubber.deep = msg->deep;
- scrubber.epoch_start = info.history.same_interval_since;
- if (msg->priority) {
- scrubber.priority = msg->priority;
+ dout(20) << __func__ << " queued at: " << epoch_queued << dendl;
+ if (is_active() && m_scrubber) { // forward only while the PG is active and a scrubber object is set
+ ((*m_scrubber).*fn)(epoch_queued); // fn is a pointer to member function; callers pass &ScrubPgIF::<handler>
} else {
- scrubber.priority = get_scrub_priority();
+ // pg might be in the process of being deleted
+ dout(5) << __func__ << " refusing to forward. " << (is_clean() ? "(clean) " : "(not clean) ") <<
+ (is_active() ? "(active) " : "(not active) ") << dendl; // the event is dropped here; only the PG state is logged
}
+}
- scrub_can_preempt = msg->allow_preemption;
- scrub_preempted = false;
- scrubber.replica_scrubmap_pos.reset();
-
- requeue_scrub(msg->high_priority);
+void PG::replica_scrub(OpRequestRef op, ThreadPool::TPHandle& handle) // hands a replica-scrub op to the scrubber; handle is not referenced in the body
+{
+ dout(10) << __func__ << " (op)" << dendl;
+ if (m_scrubber) // with no scrubber object set, nothing is done with the op in this function
+ m_scrubber->replica_scrub_op(op);
}
-/* Scrub:
- * PG_STATE_SCRUBBING is set when the scrub is queued
- *
- * scrub will be chunky if all OSDs in PG support chunky scrub
- * scrub will fail if OSDs are too old.
- */
-void PG::scrub(epoch_t queued, ThreadPool::TPHandle &handle)
-{
- OSDService *osds = osd;
- double scrub_sleep = osds->osd->scrub_sleep_time(scrubber.must_scrub);
- if (scrub_sleep > 0 &&
- (scrubber.state == PG::Scrubber::NEW_CHUNK ||
- scrubber.state == PG::Scrubber::INACTIVE) &&
- scrubber.needs_sleep) {
- ceph_assert(!scrubber.sleeping);
- dout(20) << __func__ << " state is INACTIVE|NEW_CHUNK, sleeping" << dendl;
-
- // Do an async sleep so we don't block the op queue
- spg_t pgid = get_pgid();
- int state = scrubber.state;
- auto scrub_requeue_callback =
- new LambdaContext([osds, pgid, state](int r) {
- PGRef pg = osds->osd->lookup_lock_pg(pgid);
- if (pg == nullptr) {
- lgeneric_dout(osds->osd->cct, 20)
- << "scrub_requeue_callback: Could not find "
- << "PG " << pgid << " can't complete scrub requeue after sleep"
- << dendl;
- return;
- }
- pg->scrubber.sleeping = false;
- pg->scrubber.needs_sleep = false;
- lgeneric_dout(pg->cct, 20)
- << "scrub_requeue_callback: slept for "
- << ceph_clock_now() - pg->scrubber.sleep_start
- << ", re-queuing scrub with state " << state << dendl;
- pg->scrub_queued = false;
- pg->requeue_scrub();
- pg->scrubber.sleep_start = utime_t();
- pg->unlock();
- });
- std::lock_guard l(osd->sleep_lock);
- osd->sleep_timer.add_event_after(scrub_sleep,
- scrub_requeue_callback);
- scrubber.sleeping = true;
- scrubber.sleep_start = ceph_clock_now();
- return;
- }
- if (pg_has_reset_since(queued)) {
- return;
- }
- ceph_assert(scrub_queued);
+void PG::scrub(epoch_t epoch_queued, ThreadPool::TPHandle& handle) // entry point for a queued regular-scrub event; handle is not referenced in the body
+{
+ dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
+ // a new scrub: scrub_queued is cleared first, then the event is relayed to the scrubber
scrub_queued = false;
- scrubber.needs_sleep = true;
-
- // for the replica
- if (!is_primary() &&
- scrubber.state == PG::Scrubber::BUILD_MAP_REPLICA) {
- chunky_scrub(handle);
- return;
- }
-
- if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) {
- dout(10) << "scrub -- not primary or active or not clean" << dendl;
- state_clear(PG_STATE_SCRUBBING);
- state_clear(PG_STATE_REPAIR);
- state_clear(PG_STATE_DEEP_SCRUB);
- publish_stats_to_osd();
- return;
- }
-
- if (!scrubber.active) {
- ceph_assert(recovery_state.get_backfill_targets().empty());
-
- scrubber.deep = state_test(PG_STATE_DEEP_SCRUB);
-
- dout(10) << "starting a new chunky scrub" << dendl;
- }
-
- chunky_scrub(handle);
+ forward_scrub_event(&ScrubPgIF::initiate_regular_scrub, epoch_queued); // delivered only if the PG is active and m_scrubber is set (see forward_scrub_event)
}
-void PG::abort_scrub()
+// Entry point for a queued scrub-after-repair event. note: no need to secure OSD resources for a recovery scrub
+void PG::recovery_scrub(epoch_t epoch_queued,
+ [[maybe_unused]] ThreadPool::TPHandle& handle)
{
- scrub_clear_state();
- scrub_unreserve_replicas();
+ dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
+ // a new scrub: scrub_queued is cleared before the event is relayed
+ scrub_queued = false;
+ forward_scrub_event(&ScrubPgIF::initiate_scrub_after_repair, epoch_queued); // delivered only if the PG is active and m_scrubber is set
}
-/*
- * Chunky scrub scrubs objects one chunk at a time with writes blocked for that
- * chunk.
- *
- * The object store is partitioned into chunks which end on hash boundaries. For
- * each chunk, the following logic is performed:
- *
- * (1) Block writes on the chunk
- * (2) Request maps from replicas
- * (3) Wait for pushes to be applied (after recovery)
- * (4) Wait for writes to flush on the chunk
- * (5) Wait for maps from replicas
- * (6) Compare / repair all scrub maps
- * (7) Wait for digest updates to apply
- *
- * This logic is encoded in the mostly linear state machine:
- *
- * +------------------+
- * _________v__________ |
- * | | |
- * | INACTIVE | |
- * |____________________| |
- * | |
- * | +----------+ |
- * _________v___v______ | |
- * | | | |
- * | NEW_CHUNK | | |
- * |____________________| | |
- * | | |
- * _________v__________ | |
- * | | | |
- * | WAIT_PUSHES | | |
- * |____________________| | |
- * | | |
- * _________v__________ | |
- * | | | |
- * | WAIT_LAST_UPDATE | | |
- * |____________________| | |
- * | | |
- * _________v__________ | |
- * | | | |
- * | BUILD_MAP | | |
- * |____________________| | |
- * | | |
- * _________v__________ | |
- * | | | |
- * | WAIT_REPLICAS | | |
- * |____________________| | |
- * | | |
- * _________v__________ | |
- * | | | |
- * | COMPARE_MAPS | | |
- * |____________________| | |
- * | | |
- * | | |
- * _________v__________ | |
- * | | | |
- * |WAIT_DIGEST_UPDATES | | |
- * |____________________| | |
- * | | | |
- * | +----------+ |
- * _________v__________ |
- * | | |
- * | FINISH | |
- * |____________________| |
- * | |
- * +------------------+
- *
- * The primary determines the last update from the subset by walking the log. If
- * it sees a log entry pertaining to a file in the chunk, it tells the replicas
- * to wait until that update is applied before building a scrub map. Both the
- * primary and replicas will wait for any active pushes to be applied.
- *
- * In contrast to classic_scrub, chunky_scrub is entirely handled by scrub_wq.
- *
- * scrubber.state encodes the current state of the scrub (refer to state diagram
- * for details).
- */
-void PG::chunky_scrub(ThreadPool::TPHandle &handle)
-{
- // check for map changes
- if (scrubber.is_chunky_scrub_active()) {
- if (scrubber.epoch_start != info.history.same_interval_since) {
- dout(10) << "scrub pg changed, aborting" << dendl;
- abort_scrub();
- return;
- }
- }
-
- bool done = false;
- int ret;
-
- while (!done) {
- dout(20) << "scrub state " << Scrubber::state_string(scrubber.state)
- << " [" << scrubber.start << "," << scrubber.end << ")"
- << " max_end " << scrubber.max_end << dendl;
-
- switch (scrubber.state) {
- case PG::Scrubber::INACTIVE:
- dout(10) << "scrub start" << dendl;
- ceph_assert(is_primary());
-
- publish_stats_to_osd();
- scrubber.epoch_start = info.history.same_interval_since;
- scrubber.active = true;
-
- {
- ObjectStore::Transaction t;
- scrubber.cleanup_store(&t);
- scrubber.store.reset(Scrub::Store::create(osd->store, &t,
- info.pgid, coll));
- osd->store->queue_transaction(ch, std::move(t), nullptr);
- }
-
- // Don't include temporary objects when scrubbing
- scrubber.start = info.pgid.pgid.get_hobj_start();
- scrubber.state = PG::Scrubber::NEW_CHUNK;
-
- {
- bool repair = state_test(PG_STATE_REPAIR);
- bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
- const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
- stringstream oss;
- oss << info.pgid.pgid << " " << mode << " starts" << std::endl;
- osd->clog->debug(oss);
- }
-
- scrubber.preempt_left = cct->_conf.get_val<uint64_t>(
- "osd_scrub_max_preemptions");
- scrubber.preempt_divisor = 1;
- break;
-
- case PG::Scrubber::NEW_CHUNK:
- scrubber.primary_scrubmap = ScrubMap();
- scrubber.received_maps.clear();
-
- // begin (possible) preemption window
- if (scrub_preempted) {
- scrubber.preempt_left--;
- scrubber.preempt_divisor *= 2;
- dout(10) << __func__ << " preempted, " << scrubber.preempt_left
- << " left" << dendl;
- scrub_preempted = false;
- }
- scrub_can_preempt = scrubber.preempt_left > 0;
-
- {
- /* get the start and end of our scrub chunk
- *
- * Our scrub chunk has an important restriction we're going to need to
- * respect. We can't let head be start or end.
- * Using a half-open interval means that if end == head,
- * we'd scrub/lock head and the clone right next to head in different
- * chunks which would allow us to miss clones created between
- * scrubbing that chunk and scrubbing the chunk including head.
- * This isn't true for any of the other clones since clones can
- * only be created "just to the left of" head. There is one exception
- * to this: promotion of clones which always happens to the left of the
- * left-most clone, but promote_object checks the scrubber in that
- * case, so it should be ok. Also, it's ok to "miss" clones at the
- * left end of the range if we are a tier because they may legitimately
- * not exist (see _scrub).
- */
- ceph_assert(scrubber.preempt_divisor > 0);
- int min = std::max<int64_t>(3, cct->_conf->osd_scrub_chunk_min /
- scrubber.preempt_divisor);
- int max = std::max<int64_t>(min, cct->_conf->osd_scrub_chunk_max /
- scrubber.preempt_divisor);
- hobject_t start = scrubber.start;
- hobject_t candidate_end;
- vector<hobject_t> objects;
- ret = get_pgbackend()->objects_list_partial(
- start,
- min,
- max,
- &objects,
- &candidate_end);
- ceph_assert(ret >= 0);
-
- if (!objects.empty()) {
- hobject_t back = objects.back();
- while (candidate_end.is_head() &&
- candidate_end == back.get_head()) {
- candidate_end = back;
- objects.pop_back();
- if (objects.empty()) {
- ceph_assert(0 ==
- "Somehow we got more than 2 objects which"
- "have the same head but are not clones");
- }
- back = objects.back();
- }
- if (candidate_end.is_head()) {
- ceph_assert(candidate_end != back.get_head());
- candidate_end = candidate_end.get_object_boundary();
- }
- } else {
- ceph_assert(candidate_end.is_max());
- }
-
- if (!_range_available_for_scrub(scrubber.start, candidate_end)) {
- // we'll be requeued by whatever made us unavailable for scrub
- dout(10) << __func__ << ": scrub blocked somewhere in range "
- << "[" << scrubber.start << ", " << candidate_end << ")"
- << dendl;
- done = true;
- break;
- }
- scrubber.end = candidate_end;
- if (scrubber.end > scrubber.max_end)
- scrubber.max_end = scrubber.end;
- }
-
- // walk the log to find the latest update that affects our chunk
- scrubber.subset_last_update = eversion_t();
- for (auto p = projected_log.log.rbegin();
- p != projected_log.log.rend();
- ++p) {
- if (p->soid >= scrubber.start &&
- p->soid < scrubber.end) {
- scrubber.subset_last_update = p->version;
- break;
- }
- }
- if (scrubber.subset_last_update == eversion_t()) {
- for (list<pg_log_entry_t>::const_reverse_iterator p =
- recovery_state.get_pg_log().get_log().log.rbegin();
- p != recovery_state.get_pg_log().get_log().log.rend();
- ++p) {
- if (p->soid >= scrubber.start &&
- p->soid < scrubber.end) {
- scrubber.subset_last_update = p->version;
- break;
- }
- }
- }
-
- scrubber.state = PG::Scrubber::WAIT_PUSHES;
- break;
-
- case PG::Scrubber::WAIT_PUSHES:
- if (active_pushes == 0) {
- scrubber.state = PG::Scrubber::WAIT_LAST_UPDATE;
- } else {
- dout(15) << "wait for pushes to apply" << dendl;
- done = true;
- }
- break;
-
- case PG::Scrubber::WAIT_LAST_UPDATE:
- if (recovery_state.get_last_update_applied() <
- scrubber.subset_last_update) {
- // will be requeued by op_applied
- dout(15) << "wait for EC read/modify/writes to queue" << dendl;
- done = true;
- break;
- }
-
- // ask replicas to scan
- scrubber.waiting_on_whom.insert(pg_whoami);
-
- // request maps from replicas
- for (set<pg_shard_t>::iterator i = get_acting_recovery_backfill().begin();
- i != get_acting_recovery_backfill().end();
- ++i) {
- if (*i == pg_whoami) continue;
- _request_scrub_map(*i, scrubber.subset_last_update,
- scrubber.start, scrubber.end, scrubber.deep,
- scrubber.preempt_left > 0);
- scrubber.waiting_on_whom.insert(*i);
- }
- dout(10) << __func__ << " waiting_on_whom " << scrubber.waiting_on_whom
- << dendl;
-
- scrubber.state = PG::Scrubber::BUILD_MAP;
- scrubber.primary_scrubmap_pos.reset();
- break;
-
- case PG::Scrubber::BUILD_MAP:
- ceph_assert(recovery_state.get_last_update_applied() >=
- scrubber.subset_last_update);
-
- // build my own scrub map
- if (scrub_preempted) {
- dout(10) << __func__ << " preempted" << dendl;
- scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
- break;
- }
- ret = build_scrub_map_chunk(
- scrubber.primary_scrubmap,
- scrubber.primary_scrubmap_pos,
- scrubber.start, scrubber.end,
- scrubber.deep,
- handle);
- if (ret == -EINPROGRESS) {
- requeue_scrub();
- done = true;
- break;
- }
- scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
- break;
-
- case PG::Scrubber::BUILD_MAP_DONE:
- if (scrubber.primary_scrubmap_pos.ret < 0) {
- dout(5) << "error: " << scrubber.primary_scrubmap_pos.ret
- << ", aborting" << dendl;
- scrub_clear_state();
- scrub_unreserve_replicas();
- return;
- }
- dout(10) << __func__ << " waiting_on_whom was "
- << scrubber.waiting_on_whom << dendl;
- ceph_assert(scrubber.waiting_on_whom.count(pg_whoami));
- scrubber.waiting_on_whom.erase(pg_whoami);
-
- scrubber.state = PG::Scrubber::WAIT_REPLICAS;
- break;
-
- case PG::Scrubber::WAIT_REPLICAS:
- if (!scrubber.waiting_on_whom.empty()) {
- // will be requeued by do_replica_scrub_map
- dout(10) << "wait for replicas to build scrub map" << dendl;
- done = true;
- break;
- }
- // Since repair is only by request and we need to scrub afterward
- // treat the same as req_scrub.
- if (!scrubber.req_scrub) {
- if (state_test(PG_STATE_DEEP_SCRUB)) {
- if (get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
- pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB)) {
- dout(10) << "nodeep_scrub set, aborting" << dendl;
- abort_scrub();
- return;
- }
- } else if (state_test(PG_STATE_SCRUBBING)) {
- if (get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) || pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB)) {
- dout(10) << "noscrub set, aborting" << dendl;
- abort_scrub();
- return;
- }
- }
- }
- // end (possible) preemption window
- scrub_can_preempt = false;
- if (scrub_preempted) {
- dout(10) << __func__ << " preempted, restarting chunk" << dendl;
- scrubber.state = PG::Scrubber::NEW_CHUNK;
- } else {
- scrubber.state = PG::Scrubber::COMPARE_MAPS;
- }
- break;
-
- case PG::Scrubber::COMPARE_MAPS:
- ceph_assert(recovery_state.get_last_update_applied() >=
- scrubber.subset_last_update);
- ceph_assert(scrubber.waiting_on_whom.empty());
-
- scrub_compare_maps();
- scrubber.start = scrubber.end;
- scrubber.run_callbacks();
-
- // requeue the writes from the chunk that just finished
- requeue_ops(waiting_for_scrub);
-
- scrubber.state = PG::Scrubber::WAIT_DIGEST_UPDATES;
-
- // fall-thru
-
- case PG::Scrubber::WAIT_DIGEST_UPDATES:
- if (scrubber.num_digest_updates_pending) {
- dout(10) << __func__ << " waiting on "
- << scrubber.num_digest_updates_pending
- << " digest updates" << dendl;
- done = true;
- break;
- }
-
- scrubber.preempt_left = cct->_conf.get_val<uint64_t>(
- "osd_scrub_max_preemptions");
- scrubber.preempt_divisor = 1;
-
- if (!(scrubber.end.is_max())) {
- scrubber.state = PG::Scrubber::NEW_CHUNK;
- requeue_scrub();
- done = true;
- } else {
- scrubber.state = PG::Scrubber::FINISH;
- }
-
- break;
-
- case PG::Scrubber::FINISH:
- scrub_finish();
- scrubber.state = PG::Scrubber::INACTIVE;
- done = true;
-
- if (!snap_trimq.empty()) {
- dout(10) << "scrub finished, requeuing snap_trimmer" << dendl;
- snap_trimmer_scrub_complete();
- }
-
- break;
-
- case PG::Scrubber::BUILD_MAP_REPLICA:
- // build my own scrub map
- if (scrub_preempted) {
- dout(10) << __func__ << " preempted" << dendl;
- ret = 0;
- } else {
- ret = build_scrub_map_chunk(
- scrubber.replica_scrubmap,
- scrubber.replica_scrubmap_pos,
- scrubber.start, scrubber.end,
- scrubber.deep,
- handle);
- }
- if (ret == -EINPROGRESS) {
- requeue_scrub();
- done = true;
- break;
- }
- // reply
- {
- MOSDRepScrubMap *reply = new MOSDRepScrubMap(
- spg_t(info.pgid.pgid, get_primary().shard),
- scrubber.replica_scrub_start,
- pg_whoami);
- reply->preempted = scrub_preempted;
- ::encode(scrubber.replica_scrubmap, reply->get_data());
- osd->send_message_osd_cluster(
- get_primary().osd, reply,
- scrubber.replica_scrub_start);
- }
- scrub_preempted = false;
- scrub_can_preempt = false;
- scrubber.state = PG::Scrubber::INACTIVE;
- scrubber.replica_scrubmap = ScrubMap();
- scrubber.replica_scrubmap_pos = ScrubMapBuilder();
- scrubber.start = hobject_t();
- scrubber.end = hobject_t();
- scrubber.max_end = hobject_t();
- done = true;
- break;
-
- default:
- ceph_abort();
- }
- }
- dout(20) << "scrub final state " << Scrubber::state_string(scrubber.state)
- << " [" << scrubber.start << "," << scrubber.end << ")"
- << " max_end " << scrubber.max_end << dendl;
+void PG::replica_scrub(epoch_t epoch_queued, // queued-event overload (the other overload takes an OpRequestRef); relays send_start_replica
+ [[maybe_unused]] ThreadPool::TPHandle& handle)
+{
+ dout(10) << __func__ << " queued at: " << epoch_queued
+ << (is_primary() ? " (primary)" : " (replica)") << dendl; // the PG's current role is logged only; it does not gate the forwarding
+ scrub_queued = false; // cleared unconditionally, i.e. even when forward_scrub_event() ends up dropping the event
+ forward_scrub_event(&ScrubPgIF::send_start_replica, epoch_queued);
}
-bool PG::write_blocked_by_scrub(const hobject_t& soid)
+void PG::scrub_send_scrub_resched(epoch_t epoch_queued, // relays ScrubPgIF::send_scrub_resched for an event queued at epoch_queued
+ [[maybe_unused]] ThreadPool::TPHandle& handle)
{
- if (soid < scrubber.start || soid >= scrubber.end) {
- return false;
- }
- if (scrub_can_preempt) {
- if (!scrub_preempted) {
- dout(10) << __func__ << " " << soid << " preempted" << dendl;
- scrub_preempted = true;
- } else {
- dout(10) << __func__ << " " << soid << " already preempted" << dendl;
- }
- return false;
- }
- return true;
+ dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
+ scrub_queued = false; // cleared before forwarding, regardless of whether the event is delivered
+ forward_scrub_event(&ScrubPgIF::send_scrub_resched, epoch_queued);
}
-bool PG::range_intersects_scrub(const hobject_t &start, const hobject_t& end)
+void PG::scrub_send_resources_granted(epoch_t epoch_queued, // relays ScrubPgIF::send_remotes_reserved for an event queued at epoch_queued
+ [[maybe_unused]] ThreadPool::TPHandle& handle)
{
- // does [start, end] intersect [scrubber.start, scrubber.max_end)
- return (start < scrubber.max_end &&
- end >= scrubber.start);
+ dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
+ forward_scrub_event(&ScrubPgIF::send_remotes_reserved, epoch_queued); // scrub_queued is left untouched by this handler
}
-void PG::scrub_clear_state(bool has_error)
+void PG::scrub_send_resources_denied(epoch_t epoch_queued, // relays ScrubPgIF::send_reservation_failure for an event queued at epoch_queued
+ [[maybe_unused]] ThreadPool::TPHandle& handle)
{
- ceph_assert(is_locked());
- state_clear(PG_STATE_SCRUBBING);
- if (!has_error)
- state_clear(PG_STATE_REPAIR);
- state_clear(PG_STATE_DEEP_SCRUB);
- publish_stats_to_osd();
-
- scrubber.req_scrub = false;
- // local -> nothing.
- if (scrubber.local_reserved) {
- osd->dec_scrubs_local();
- scrubber.local_reserved = false;
- scrubber.reserved_peers.clear();
- }
-
- requeue_ops(waiting_for_scrub);
-
- scrubber.reset();
-
- // type-specific state clear
- _scrub_clear_state();
+ dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
+ forward_scrub_event(&ScrubPgIF::send_reservation_failure, epoch_queued); // scrub_queued is left untouched by this handler
}
-void PG::scrub_compare_maps()
+void PG::replica_scrub_resched(epoch_t epoch_queued,
+ [[maybe_unused]] ThreadPool::TPHandle& handle)
{
- dout(10) << __func__ << " has maps, analyzing" << dendl;
-
- // construct authoritative scrub map for type specific scrubbing
- scrubber.cleaned_meta_map.insert(scrubber.primary_scrubmap);
- map<hobject_t,
- pair<std::optional<uint32_t>,
- std::optional<uint32_t>>> missing_digest;
-
- map<pg_shard_t, ScrubMap *> maps;
- maps[pg_whoami] = &scrubber.primary_scrubmap;
-
- for (const auto& i : get_acting_recovery_backfill()) {
- if (i == pg_whoami) continue;
- dout(2) << __func__ << " replica " << i << " has "
- << scrubber.received_maps[i].objects.size()
- << " items" << dendl;
- maps[i] = &scrubber.received_maps[i];
- }
-
- set<hobject_t> master_set;
-
- // Construct master set
- for (const auto map : maps) {
- for (const auto i : map.second->objects) {
- master_set.insert(i.first);
- }
- }
-
- stringstream ss;
- get_pgbackend()->be_omap_checks(maps, master_set,
- scrubber.omap_stats, ss);
-
- if (!ss.str().empty()) {
- osd->clog->warn(ss);
- }
-
- if (recovery_state.get_acting().size() > 1) {
- dout(10) << __func__ << " comparing replica scrub maps" << dendl;
-
- // Map from object with errors to good peer
- map<hobject_t, list<pg_shard_t>> authoritative;
-
- dout(2) << __func__ << get_primary() << " has "
- << scrubber.primary_scrubmap.objects.size() << " items" << dendl;
-
- ss.str("");
- ss.clear();
-
- get_pgbackend()->be_compare_scrubmaps(
- maps,
- master_set,
- state_test(PG_STATE_REPAIR),
- scrubber.missing,
- scrubber.inconsistent,
- authoritative,
- missing_digest,
- scrubber.shallow_errors,
- scrubber.deep_errors,
- scrubber.store.get(),
- info.pgid, recovery_state.get_acting(),
- ss);
- dout(2) << ss.str() << dendl;
-
- if (!ss.str().empty()) {
- osd->clog->error(ss);
- }
-
- for (map<hobject_t, list<pg_shard_t>>::iterator i = authoritative.begin();
- i != authoritative.end();
- ++i) {
- list<pair<ScrubMap::object, pg_shard_t> > good_peers;
- for (list<pg_shard_t>::const_iterator j = i->second.begin();
- j != i->second.end();
- ++j) {
- good_peers.emplace_back(maps[*j]->objects[i->first], *j);
- }
- scrubber.authoritative.emplace(i->first, good_peers);
- }
-
- for (map<hobject_t, list<pg_shard_t>>::iterator i = authoritative.begin();
- i != authoritative.end();
- ++i) {
- scrubber.cleaned_meta_map.objects.erase(i->first);
- scrubber.cleaned_meta_map.objects.insert(
- *(maps[i->second.back()]->objects.find(i->first))
- );
- }
- }
-
- ScrubMap for_meta_scrub;
- scrubber.clean_meta_map(for_meta_scrub);
-
- // ok, do the pg-type specific scrubbing
- scrub_snapshot_metadata(for_meta_scrub, missing_digest);
- // Called here on the primary can use an authoritative map if it isn't the primary
- _scan_snaps(for_meta_scrub);
- if (!scrubber.store->empty()) {
- if (state_test(PG_STATE_REPAIR)) {
- dout(10) << __func__ << ": discarding scrub results" << dendl;
- scrubber.store->flush(nullptr);
- } else {
- dout(10) << __func__ << ": updating scrub object" << dendl;
- ObjectStore::Transaction t;
- scrubber.store->flush(&t);
- osd->store->queue_transaction(ch, std::move(t), nullptr);
- }
- }
+ dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
+ scrub_queued = false; // cleared before forwarding, regardless of whether the event is delivered
+ forward_scrub_event(&ScrubPgIF::send_sched_replica, epoch_queued); // relays the replica re-schedule event; dropped unless the PG is active and m_scrubber is set
}
-bool PG::scrub_process_inconsistent()
-{
- dout(10) << __func__ << ": checking authoritative" << dendl;
- bool repair = state_test(PG_STATE_REPAIR);
- bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
- const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
-
- // authoriative only store objects which missing or inconsistent.
- if (!scrubber.authoritative.empty()) {
- stringstream ss;
- ss << info.pgid << " " << mode << " "
- << scrubber.missing.size() << " missing, "
- << scrubber.inconsistent.size() << " inconsistent objects";
- dout(2) << ss.str() << dendl;
- osd->clog->error(ss);
- if (repair) {
- state_clear(PG_STATE_CLEAN);
- for (map<hobject_t, list<pair<ScrubMap::object, pg_shard_t> >>::iterator i =
- scrubber.authoritative.begin();
- i != scrubber.authoritative.end();
- ++i) {
- auto missing_entry = scrubber.missing.find(i->first);
- if (missing_entry != scrubber.missing.end()) {
- repair_object(
- i->first,
- i->second,
- missing_entry->second);
- scrubber.fixed += missing_entry->second.size();
- }
- if (scrubber.inconsistent.count(i->first)) {
- repair_object(
- i->first,
- i->second,
- scrubber.inconsistent[i->first]);
- scrubber.fixed += missing_entry->second.size();
- }
- }
- }
- }
- return (!scrubber.authoritative.empty() && repair);
+void PG::scrub_send_pushes_update(epoch_t epoch_queued, // relays ScrubPgIF::active_pushes_notification for an event queued at epoch_queued
+ [[maybe_unused]] ThreadPool::TPHandle& handle)
+{
+ dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
+ forward_scrub_event(&ScrubPgIF::active_pushes_notification, epoch_queued); // dropped unless the PG is active and m_scrubber is set
}
-bool PG::ops_blocked_by_scrub() const {
- return (waiting_for_scrub.size() != 0);
+void PG::scrub_send_replica_pushes(epoch_t epoch_queued, // relays ScrubPgIF::send_replica_pushes_upd for an event queued at epoch_queued
+ [[maybe_unused]] ThreadPool::TPHandle& handle)
+{
+ dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
+ forward_scrub_event(&ScrubPgIF::send_replica_pushes_upd, epoch_queued); // dropped unless the PG is active and m_scrubber is set
}
-// the part that actually finalizes a scrub
-void PG::scrub_finish()
+void PG::scrub_send_applied_update(epoch_t epoch_queued,
+ [[maybe_unused]] ThreadPool::TPHandle& handle)
{
- dout(20) << __func__ << dendl;
- bool repair = state_test(PG_STATE_REPAIR);
- bool do_auto_scrub = false;
- // if the repair request comes from auto-repair and large number of errors,
- // we would like to cancel auto-repair
- if (repair && scrubber.auto_repair
- && scrubber.authoritative.size() > cct->_conf->osd_scrub_auto_repair_num_errors) {
- state_clear(PG_STATE_REPAIR);
- repair = false;
- }
- bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
- const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
-
- // if a regular scrub had errors within the limit, do a deep scrub to auto repair.
- if (scrubber.deep_scrub_on_error
- && scrubber.authoritative.size()
- && scrubber.authoritative.size() <= cct->_conf->osd_scrub_auto_repair_num_errors) {
- ceph_assert(!deep_scrub);
- do_auto_scrub = true;
- dout(20) << __func__ << " Try to auto repair after scrub errors" << dendl;
- }
- scrubber.deep_scrub_on_error = false;
-
- // type-specific finish (can tally more errors)
- _scrub_finish();
-
- bool has_error = scrub_process_inconsistent();
-
- {
- stringstream oss;
- oss << info.pgid.pgid << " " << mode << " ";
- int total_errors = scrubber.shallow_errors + scrubber.deep_errors;
- if (total_errors)
- oss << total_errors << " errors";
- else
- oss << "ok";
- if (!deep_scrub && info.stats.stats.sum.num_deep_scrub_errors)
- oss << " ( " << info.stats.stats.sum.num_deep_scrub_errors
- << " remaining deep scrub error details lost)";
- if (repair)
- oss << ", " << scrubber.fixed << " fixed";
- if (total_errors)
- osd->clog->error(oss);
- else
- osd->clog->debug(oss);
- }
-
- // Since we don't know which errors were fixed, we can only clear them
- // when every one has been fixed.
- if (repair) {
- if (scrubber.fixed == scrubber.shallow_errors + scrubber.deep_errors) {
- ceph_assert(deep_scrub);
- scrubber.shallow_errors = scrubber.deep_errors = 0;
- dout(20) << __func__ << " All may be fixed" << dendl;
- } else if (has_error) {
- // Deep scrub in order to get corrected error counts
- scrub_after_recovery = true;
- save_req_scrub = scrubber.req_scrub;
- dout(20) << __func__ << " Set scrub_after_recovery, req_scrub=" << save_req_scrub << dendl;
- } else if (scrubber.shallow_errors || scrubber.deep_errors) {
- // We have errors but nothing can be fixed, so there is no repair
- // possible.
- state_set(PG_STATE_FAILED_REPAIR);
- dout(10) << __func__ << " " << (scrubber.shallow_errors + scrubber.deep_errors)
- << " error(s) present with no repair possible" << dendl;
- }
- }
-
- {
- // finish up
- ObjectStore::Transaction t;
- recovery_state.update_stats(
- [this, deep_scrub](auto &history, auto &stats) {
- utime_t now = ceph_clock_now();
- history.last_scrub = recovery_state.get_info().last_update;
- history.last_scrub_stamp = now;
- if (scrubber.deep) {
- history.last_deep_scrub = recovery_state.get_info().last_update;
- history.last_deep_scrub_stamp = now;
- }
+ dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
+ forward_scrub_event(&ScrubPgIF::update_applied_notification, epoch_queued); // relays the "update applied" notification; dropped unless the PG is active and m_scrubber is set
+}
- if (deep_scrub) {
- if ((scrubber.shallow_errors == 0) && (scrubber.deep_errors == 0))
- history.last_clean_scrub_stamp = now;
- stats.stats.sum.num_shallow_scrub_errors = scrubber.shallow_errors;
- stats.stats.sum.num_deep_scrub_errors = scrubber.deep_errors;
- stats.stats.sum.num_large_omap_objects = scrubber.omap_stats.large_omap_objects;
- stats.stats.sum.num_omap_bytes = scrubber.omap_stats.omap_bytes;
- stats.stats.sum.num_omap_keys = scrubber.omap_stats.omap_keys;
- dout(25) << "scrub_finish shard " << pg_whoami << " num_omap_bytes = "
- << stats.stats.sum.num_omap_bytes << " num_omap_keys = "
- << stats.stats.sum.num_omap_keys << dendl;
- } else {
- stats.stats.sum.num_shallow_scrub_errors = scrubber.shallow_errors;
- // XXX: last_clean_scrub_stamp doesn't mean the pg is not inconsistent
- // because of deep-scrub errors
- if (scrubber.shallow_errors == 0)
- history.last_clean_scrub_stamp = now;
- }
- stats.stats.sum.num_scrub_errors =
- stats.stats.sum.num_shallow_scrub_errors +
- stats.stats.sum.num_deep_scrub_errors;
- if (scrubber.check_repair) {
- scrubber.check_repair = false;
- if (info.stats.stats.sum.num_scrub_errors) {
- state_set(PG_STATE_FAILED_REPAIR);
- dout(10) << "scrub_finish " << info.stats.stats.sum.num_scrub_errors
- << " error(s) still present after re-scrub" << dendl;
- }
- }
- return true;
- },
- &t);
- int tr = osd->store->queue_transaction(ch, std::move(t), NULL);
- ceph_assert(tr == 0);
- }
+void PG::scrub_send_unblocking(epoch_t epoch_queued, // relays ScrubPgIF::send_scrub_unblock for an event queued at epoch_queued
+ [[maybe_unused]] ThreadPool::TPHandle& handle)
+{
+ dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
+ forward_scrub_event(&ScrubPgIF::send_scrub_unblock, epoch_queued); // dropped unless the PG is active and m_scrubber is set
+}
- if (has_error) {
- queue_peering_event(
- PGPeeringEventRef(
- std::make_shared<PGPeeringEvent>(
- get_osdmap_epoch(),
- get_osdmap_epoch(),
- PeeringState::DoRecovery())));
- }
+void PG::scrub_send_digest_update(epoch_t epoch_queued, // relays ScrubPgIF::digest_update_notification for an event queued at epoch_queued
+ [[maybe_unused]] ThreadPool::TPHandle& handle)
+{
+ dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
+ forward_scrub_event(&ScrubPgIF::digest_update_notification, epoch_queued); // dropped unless the PG is active and m_scrubber is set
+}
- scrub_clear_state(has_error);
- scrub_unreserve_replicas();
+void PG::scrub_send_replmaps_ready(epoch_t epoch_queued, // relays ScrubPgIF::send_replica_maps_ready for an event queued at epoch_queued
+ [[maybe_unused]] ThreadPool::TPHandle& handle)
+{
+ dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
+ forward_scrub_event(&ScrubPgIF::send_replica_maps_ready, epoch_queued); // dropped unless the PG is active and m_scrubber is set
+}
- if (do_auto_scrub) {
- scrub_requested(false, false, true);
- }
+bool PG::ops_blocked_by_scrub() const // reports whether anything is currently parked in waiting_for_scrub
+{
+ return !waiting_for_scrub.empty(); // true when waiting_for_scrub holds at least one entry
+}
- if (is_active() && is_primary()) {
- recovery_state.share_pg_info();
- }
+Scrub::scrub_prio_t PG::is_scrub_blocking_ops() const // translates the state of waiting_for_scrub into a scrub priority value
+{
+ return waiting_for_scrub.empty() ? Scrub::scrub_prio_t::low_priority // nothing is waiting on the scrub
+ : Scrub::scrub_prio_t::high_priority; // at least one entry is waiting on the scrub
}
bool PG::old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch)
{
- if (get_last_peering_reset() > reply_epoch ||
- get_last_peering_reset() > query_epoch) {
- dout(10) << "old_peering_msg reply_epoch " << reply_epoch << " query_epoch " << query_epoch
- << " last_peering_reset " << get_last_peering_reset()
- << dendl;
+ if (auto last_reset = get_last_peering_reset();
+ last_reset > reply_epoch || last_reset > query_epoch) {
+ dout(10) << "old_peering_msg reply_epoch " << reply_epoch << " query_epoch "
+ << query_epoch << " last_peering_reset " << last_reset << dendl;
return true;
}
return false;
bool PG::try_flush_or_schedule_async()
{
-
Context *c = new QueuePeeringEvt(
this, get_osdmap_epoch(), PeeringState::IntervalFlush());
if (!ch->flush_commit(c)) {
ostream& operator<<(ostream& out, const PG& pg)
{
out << pg.recovery_state;
- if (pg.scrubber.must_repair)
- out << " MUST_REPAIR";
- if (pg.scrubber.auto_repair)
- out << " AUTO_REPAIR";
- if (pg.scrubber.check_repair)
- out << " CHECK_REPAIR";
- if (pg.scrubber.deep_scrub_on_error)
- out << " DEEP_SCRUB_ON_ERROR";
- if (pg.scrubber.must_deep_scrub)
- out << " MUST_DEEP_SCRUB";
- if (pg.scrubber.must_scrub)
- out << " MUST_SCRUB";
- if (pg.scrubber.time_for_deep)
- out << " TIME_FOR_DEEP";
- if (pg.scrubber.need_auto)
- out << " NEED_AUTO";
- if (pg.scrubber.req_scrub)
- out << " REQ_SCRUB";
+
+ // listing all scrub-related flags - both current and "planned next scrub"
+ if (pg.is_scrubbing()) {
+ out << *pg.m_scrubber;
+ }
+ out << pg.m_planned_scrub;
if (pg.recovery_ops_active)
out << " rops=" << pg.recovery_ops_active;
}
out << "]";
-
-
return out;
}
// resets the messenger session when the replica reconnects. To avoid the
// out-of-order replies, the messages from that replica should be discarded.
OSDMapRef next_map = osd->get_next_osdmap();
- if (next_map->is_down(from))
+ if (next_map->is_down(from)) {
+ dout(20) << " " << __func__ << " dead for nextmap is down " << from << dendl;
return true;
+ }
/* Mostly, this overlaps with the old_peering_msg
* condition. An important exception is pushes
* sent by replicas not in the acting set, since
* if such a replica goes down it does not cause
* a new interval. */
- if (next_map->get_down_at(from) >= m->map_epoch)
+ if (next_map->get_down_at(from) >= m->map_epoch) {
+ dout(20) << " " << __func__ << " dead for 'get_down_at' " << from << dendl;
return true;
+ }
// same pg?
// if pg changes _at all_, we reset and repeer!
recovery_state.handle_event(evt, &rctx);
}
+
void PG::handle_query_state(Formatter *f)
{
dout(10) << "handle_query_state" << dendl;
PeeringState::QueryState q(f);
recovery_state.handle_event(q, 0);
- if (is_primary() && is_active()) {
- f->open_object_section("scrub");
- f->dump_stream("scrubber.epoch_start") << scrubber.epoch_start;
- f->dump_bool("scrubber.active", scrubber.active);
- f->dump_string("scrubber.state", PG::Scrubber::state_string(scrubber.state));
- f->dump_stream("scrubber.start") << scrubber.start;
- f->dump_stream("scrubber.end") << scrubber.end;
- f->dump_stream("scrubber.max_end") << scrubber.max_end;
- f->dump_stream("scrubber.subset_last_update") << scrubber.subset_last_update;
- f->dump_bool("scrubber.deep", scrubber.deep);
- {
- f->open_array_section("scrubber.waiting_on_whom");
- for (set<pg_shard_t>::iterator p = scrubber.waiting_on_whom.begin();
- p != scrubber.waiting_on_whom.end();
- ++p) {
- f->dump_stream("shard") << *p;
- }
- f->close_section();
- }
- f->close_section();
+ // This code has moved to after the close of recovery_state array.
+ // I don't think that scrub is a recovery state
+ if (is_primary() && is_active() && m_scrubber && m_scrubber->is_scrub_active()) {
+ m_scrubber->handle_query_state(f);
}
}
delete this;
}
-ghobject_t PG::do_delete_work(ObjectStore::Transaction &t,
- ghobject_t _next)
+std::pair<ghobject_t, bool> PG::do_delete_work(
+ ObjectStore::Transaction &t,
+ ghobject_t _next)
{
dout(10) << __func__ << dendl;
osd->sleep_timer.add_event_at(delete_schedule_time,
delete_requeue_callback);
dout(20) << __func__ << " Delete scheduled at " << delete_schedule_time << dendl;
- return _next;
+ return std::make_pair(_next, true);
}
}
t.remove(coll, oid);
++num;
}
+ bool running = true;
if (num) {
dout(20) << __func__ << " deleting " << num << " objects" << dendl;
Context *fin = new C_DeleteMore(this, get_osdmap_epoch());
// exit() methods don't run when that happens.
osd->local_reserver.cancel_reservation(info.pgid);
- osd->logger->dec(l_osd_pg_removing);
+ running = false;
}
}
- return next;
+ return {next, running};
}
int PG::pg_stat_adjust(osd_stat_t *ns)
return 0;
}
-ostream& operator<<(ostream& out, const PG::BackfillInterval& bi)
-{
- out << "BackfillInfo(" << bi.begin << "-" << bi.end
- << " " << bi.objects.size() << " objects";
- if (!bi.objects.empty())
- out << " " << bi.objects;
- out << ")";
- return out;
-}
-
void PG::dump_pgstate_history(Formatter *f)
{
std::scoped_lock l{*this};