#ifndef CEPH_PG_H
#define CEPH_PG_H
-#include <boost/statechart/custom_reaction.hpp>
-#include <boost/statechart/event.hpp>
-#include <boost/statechart/simple_state.hpp>
-#include <boost/statechart/state.hpp>
-#include <boost/statechart/state_machine.hpp>
-#include <boost/statechart/transition.hpp>
-#include <boost/statechart/event_base.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/container/flat_set.hpp>
#include "include/mempool.h"
#include "PGLog.h"
#include "OSDMap.h"
-#include "messages/MOSDPGLog.h"
#include "include/str_list.h"
#include "PGBackend.h"
#include "PGPeeringEvent.h"
#include "PeeringState.h"
+#include "recovery_types.h"
#include "MissingLoc.h"
+#include "scrubber_common.h"
#include "mgr/OSDPerfMetricTypes.h"
#include <string>
#include <tuple>
-//#define DEBUG_RECOVERY_OIDS // track set of recovering oids explicitly, to find counting bugs
+//#define DEBUG_RECOVERY_OIDS // track the set of recovering oids explicitly, to find counting bugs
//#define PG_DEBUG_REFS // track provenance of pg refs, helpful for finding leaks
class OSD;
class OSDService;
class OSDShard;
class OSDShardPGSlot;
-class MOSDOp;
-class MOSDPGScan;
-class MOSDPGBackfill;
-class MOSDPGInfo;
class PG;
struct OpRequest;
typedef OpRequest::Ref OpRequestRef;
-class MOSDPGLog;
class DynamicPerfStats;
+class PgScrubber;
namespace Scrub {
class Store;
+ class ReplicaReservations;
+ class LocalReservation;
+ class ReservedByRemotePrimary;
+ enum class schedule_result_t;
}
#ifdef PG_DEBUG_REFS
// cppcheck-suppress unreachableCode
per_state_info() : enter(0), exit(0), events(0) {}
};
- map<const char *,per_state_info> info;
+ std::map<const char *,per_state_info> info;
ceph::mutex lock = ceph::make_mutex("PGRecoverStats::lock");
public:
}
void dump(ostream& out) {
std::lock_guard l(lock);
- for (map<const char *,per_state_info>::iterator p = info.begin(); p != info.end(); ++p) {
+ for (std::map<const char *,per_state_info>::iterator p = info.begin(); p != info.end(); ++p) {
per_state_info& i = p->second;
out << i.enter << "\t" << i.exit << "\t"
<< i.events << "\t" << i.event_time << "\t"
}
}
- void dump_formatted(Formatter *f) {
+ void dump_formatted(ceph::Formatter *f) {
std::lock_guard l(lock);
f->open_array_section("pg_recovery_stats");
- for (map<const char *,per_state_info>::iterator p = info.begin();
+ for (std::map<const char *,per_state_info>::iterator p = info.begin();
p != info.end(); ++p) {
per_state_info& i = p->second;
f->open_object_section("recovery_state");
f->dump_stream("total_time") << i.total_time;
f->dump_stream("min_time") << i.min_time;
f->dump_stream("max_time") << i.max_time;
- vector<string> states;
+ std::vector<std::string> states;
get_str_vec(p->first, "/", states);
f->open_array_section("nested_states");
- for (vector<string>::iterator st = states.begin();
+ for (std::vector<std::string>::iterator st = states.begin();
st != states.end(); ++st) {
f->dump_string("state", *st);
}
*
*/
+/// Facilitating scrub-related objects' access to private PG data
+class ScrubberPasskey {
+private:
+ friend class Scrub::ReplicaReservations;
+ friend class PrimaryLogScrub;
+ ScrubberPasskey() {}
+ ScrubberPasskey(const ScrubberPasskey&) = default;
+ ScrubberPasskey& operator=(const ScrubberPasskey&) = delete;
+};
+
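+// The "passkey" idiom above gates selected PG accessors to the scrub
+// machinery: only the befriended types can construct a ScrubberPasskey, so
+// only they can call the getters that require one (see the 'ScrubberPasskey
+// getters' at the bottom of PG). An illustrative sketch ('some_member' and
+// 'some_pg' are hypothetical, not part of this patch):
+//
+//   void Scrub::ReplicaReservations::some_member(PG* some_pg) {
+//     // constructing the passkey compiles only inside a friend:
+//     const pg_info_t& info = some_pg->get_pg_info(ScrubberPasskey{});
+//   }
+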
class PG : public DoutPrefixProvider, public PeeringState::PeeringListener {
- friend class NamedState;
+ friend struct NamedState;
friend class PeeringState;
+ friend class PgScrubber;
public:
const pg_shard_t pg_whoami;
const spg_t pg_id;
+  /// the 'scrubber'. Allocated in the derived class' (PrimaryLogPG) ctor,
+  /// and released only in the PrimaryLogPG destructor.
+ std::unique_ptr<ScrubPgIF> m_scrubber;
+
+ /// flags detailing scheduling/operation characteristics of the next scrub
+ requested_scrub_t m_planned_scrub;
+
+ /// scrubbing state for both Primary & replicas
+ bool is_scrub_active() const { return m_scrubber->is_scrub_active(); }
+
+  /// set when the scrub request is queued, and reset only after scrubbing has
+  /// fully cleaned up.
+ bool is_scrub_queued_or_active() const { return m_scrubber->is_queued_or_active(); }
+
public:
// -- members --
const coll_t coll;
const char *state_name, utime_t enter_time,
uint64_t events, utime_t event_dur) override;
- void lock_suspend_timeout(ThreadPool::TPHandle &handle) {
- handle.suspend_tp_timeout();
- lock();
- handle.reset_tp_timeout();
- }
void lock(bool no_lockdep = false) const;
void unlock() const;
bool is_locked() const;
});
}
+ static void add_objects_scrubbed_count(
+ int64_t count, pg_stat_t &stats) {
+ stats.objects_scrubbed += count;
+ }
+
+ void add_objects_scrubbed_count(int64_t count) {
+ recovery_state.update_stats(
+ [=](auto &history, auto &stats) {
+ add_objects_scrubbed_count(count, stats);
+ return true;
+ });
+ }
+
+ static void reset_objects_scrubbed(pg_stat_t &stats) {
+ stats.objects_scrubbed = 0;
+ }
+
+ void reset_objects_scrubbed() {
+ recovery_state.update_stats(
+ [=](auto &history, auto &stats) {
+ reset_objects_scrubbed(stats);
+ return true;
+ });
+ }
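+
+  // Note: the static overloads above mutate a caller-supplied pg_stat_t; the
+  // instance versions funnel that same mutation through
+  // recovery_state.update_stats(), where returning 'true' from the lambda asks
+  // the peering machinery to republish the PG's stats.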
+
bool is_deleting() const {
return recovery_state.is_deleting();
}
int get_role() const {
return recovery_state.get_role();
}
- const vector<int> get_acting() const {
+ const std::vector<int> get_acting() const {
return recovery_state.get_acting();
}
- const set<pg_shard_t> &get_actingset() const {
+ const std::set<pg_shard_t> &get_actingset() const {
return recovery_state.get_actingset();
}
int get_acting_primary() const {
pg_shard_t get_primary() const {
return recovery_state.get_primary();
}
- const vector<int> get_up() const {
+ const std::vector<int> get_up() const {
return recovery_state.get_up();
}
int get_up_primary() const {
bool is_acting_recovery_backfill(pg_shard_t osd) const {
return recovery_state.is_acting_recovery_backfill(osd);
}
- const set<pg_shard_t> &get_acting_recovery_backfill() const {
+ const std::set<pg_shard_t> &get_acting_recovery_backfill() const {
return recovery_state.get_acting_recovery_backfill();
}
bool is_acting(pg_shard_t osd) const {
bool is_up(pg_shard_t osd) const {
return recovery_state.is_up(osd);
}
- static bool has_shard(bool ec, const vector<int>& v, pg_shard_t osd) {
+ static bool has_shard(bool ec, const std::vector<int>& v, pg_shard_t osd) {
return PeeringState::has_shard(ec, v, osd);
}
/// initialize created PG
void init(
int role,
- const vector<int>& up,
+ const std::vector<int>& up,
int up_primary,
- const vector<int>& acting,
+ const std::vector<int>& acting,
int acting_primary,
const pg_history_t& history,
const PastIntervals& pim,
- bool backfill,
ObjectStore::Transaction &t);
/// read existing pg state off disk
void update_snap_mapper_bits(uint32_t bits) {
snap_mapper.update_bits(bits);
}
- void start_split_stats(const set<spg_t>& childpgs, vector<object_stat_sum_t> *v);
+ void start_split_stats(const std::set<spg_t>& childpgs, std::vector<object_stat_sum_t> *v);
virtual void split_colls(
spg_t child,
int split_bits,
const pg_pool_t *pool,
ObjectStore::Transaction &t) = 0;
void split_into(pg_t child_pgid, PG *child, unsigned split_bits);
- void merge_from(map<spg_t,PGRef>& sources, PeeringCtx &rctx,
+ void merge_from(std::map<spg_t,PGRef>& sources, PeeringCtx &rctx,
unsigned split_bits,
const pg_merge_meta_t& last_pg_merge_meta);
void finish_split_stats(const object_stat_sum_t& stats,
ObjectStore::Transaction &t);
- void scrub(epoch_t queued, ThreadPool::TPHandle &handle);
+ void scrub(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ // a new scrub
+ forward_scrub_event(&ScrubPgIF::initiate_regular_scrub, queued, "StartScrub");
+ }
+
+  /**
+   * A special version of PG::scrub(), initiated after repair.
+   * (Note: this variant is no longer exempt from allocating local/remote
+   * OSD scrub resources.)
+   */
+ void recovery_scrub(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ // a new scrub
+ forward_scrub_event(&ScrubPgIF::initiate_scrub_after_repair, queued,
+ "AfterRepairScrub");
+ }
+
+ void replica_scrub(epoch_t queued,
+ Scrub::act_token_t act_token,
+ ThreadPool::TPHandle& handle);
+
+ void replica_scrub_resched(epoch_t queued,
+ Scrub::act_token_t act_token,
+ ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_sched_replica, queued, act_token,
+ "SchedReplica");
+ }
+
+ void scrub_send_resources_granted(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_remotes_reserved, queued, "RemotesReserved");
+ }
+
+ void scrub_send_resources_denied(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_reservation_failure, queued,
+ "ReservationFailure");
+ }
+
+ void scrub_send_scrub_resched(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_scrub_resched, queued, "InternalSchedScrub");
+ }
+
+ void scrub_send_pushes_update(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::active_pushes_notification, queued,
+ "ActivePushesUpd");
+ }
+
+ void scrub_send_applied_update(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::update_applied_notification, queued,
+ "UpdatesApplied");
+ }
+
+ void scrub_send_unblocking(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_scrub_unblock, queued, "Unblocked");
+ }
+
+ void scrub_send_digest_update(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::digest_update_notification, queued, "DigestUpdate");
+ }
+
+ void scrub_send_local_map_ready(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_local_map_done, queued, "IntLocalMapDone");
+ }
+
+ void scrub_send_replmaps_ready(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_replica_maps_ready, queued, "GotReplicas");
+ }
+
+ void scrub_send_replica_pushes(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_replica_pushes_upd, queued,
+ "ReplicaPushesUpd");
+ }
- bool is_scrub_registered();
- void reg_next_scrub();
- void unreg_next_scrub();
+ void scrub_send_maps_compared(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_maps_compared, queued, "MapsCompared");
+ }
+
+ void scrub_send_get_next_chunk(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_get_next_chunk, queued, "NextChunk");
+ }
+
+ void scrub_send_scrub_is_finished(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_scrub_is_finished, queued, "ScrubFinished");
+ }
+
+ void scrub_send_chunk_free(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_chunk_free, queued, "SelectedChunkFree");
+ }
+
+ void scrub_send_chunk_busy(epoch_t queued, ThreadPool::TPHandle& handle)
+ {
+ forward_scrub_event(&ScrubPgIF::send_chunk_busy, queued, "ChunkIsBusy");
+ }
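+
+  // All scrub_send_*() wrappers above share one shape: they translate a queued
+  // OSD event into a call on the scrubber's interface via forward_scrub_event()
+  // (declared further below), pairing each ScrubPgIF entry point with a
+  // human-readable event name used for logging.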
- void queue_want_pg_temp(const vector<int> &wanted) override;
+ void queue_want_pg_temp(const std::vector<int> &wanted) override;
void clear_want_pg_temp() override;
void on_new_interval() override;
void on_info_history_change() override;
- void scrub_requested(bool deep, bool repair, bool need_auto = false) override;
+ void on_primary_status_change(bool was_primary, bool now_primary) override;
+
+ void reschedule_scrub() override;
+
+ void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) override;
uint64_t get_snap_trimq_size() const override {
return snap_trimq.size();
float delay) override;
void request_local_background_io_reservation(
unsigned priority,
- PGPeeringEventRef on_grant,
- PGPeeringEventRef on_preempt) override;
+ PGPeeringEventURef on_grant,
+ PGPeeringEventURef on_preempt) override;
void update_local_background_io_priority(
unsigned priority) override;
void cancel_local_background_io_reservation() override;
void request_remote_recovery_reservation(
unsigned priority,
- PGPeeringEventRef on_grant,
- PGPeeringEventRef on_preempt) override;
+ PGPeeringEventURef on_grant,
+ PGPeeringEventURef on_preempt) override;
void cancel_remote_recovery_reservation() override;
void schedule_event_on_commit(
return finish_recovery();
}
- void on_activate(interval_set<snapid_t> snaps) override {
- ceph_assert(scrubber.callbacks.empty());
- ceph_assert(callbacks_for_degraded_object.empty());
- snap_trimq = snaps;
- release_pg_backoffs();
- projected_last_update = info.last_update;
- }
+ void on_activate(interval_set<snapid_t> snaps) override;
void on_activate_committed() override;
return std::make_unique<PG::PGLogEntryHandler>(this, &t);
}
- void do_delete_work(ObjectStore::Transaction &t) override;
+ std::pair<ghobject_t, bool> do_delete_work(ObjectStore::Transaction &t,
+ ghobject_t _next) override;
void clear_ready_to_merge() override;
void set_not_ready_to_merge_target(pg_t pgid, pg_t src) override;
void queue_flushed(epoch_t started_at);
void handle_advance_map(
OSDMapRef osdmap, OSDMapRef lastmap,
- vector<int>& newup, int up_primary,
- vector<int>& newacting, int acting_primary,
+ std::vector<int>& newup, int up_primary,
+ std::vector<int>& newacting, int acting_primary,
PeeringCtx &rctx);
void handle_activate_map(PeeringCtx &rctx);
void handle_initialize(PeeringCtx &rxcx);
- void handle_query_state(Formatter *f);
+ void handle_query_state(ceph::Formatter *f);
/**
* @param ops_begun returns how many recovery ops the function started
virtual void get_watchers(std::list<obj_watch_item_t> *ls) = 0;
- void dump_pgstate_history(Formatter *f);
- void dump_missing(Formatter *f);
+ void dump_pgstate_history(ceph::Formatter *f);
+ void dump_missing(ceph::Formatter *f);
- void get_pg_stats(std::function<void(const pg_stat_t&, epoch_t lec)> f);
- void with_heartbeat_peers(std::function<void(int)> f);
+ void with_pg_stats(std::function<void(const pg_stat_t&, epoch_t lec)>&& f);
+ void with_heartbeat_peers(std::function<void(int)>&& f);
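+  // Both helpers take their callable by rvalue reference; the intent (an
+  // assumption based on the with_* naming) is that the PG runs the callable
+  // under the matching internal lock instead of handing out copies of the
+  // protected state.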
void shutdown();
virtual void on_shutdown() = 0;
- bool get_must_scrub() const {
- return scrubber.must_scrub;
- }
- bool sched_scrub();
+ bool get_must_scrub() const;
+ Scrub::schedule_result_t sched_scrub();
+ unsigned int scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsigned int suggested_priority) const;
+ /// the version that refers to flags_.priority
+ unsigned int scrub_requeue_priority(Scrub::scrub_prio_t with_priority) const;
+private:
+ // auxiliaries used by sched_scrub():
+ double next_deepscrub_interval() const;
+
+ /// should we perform deep scrub?
+ bool is_time_for_deep(bool allow_deep_scrub,
+ bool allow_scrub,
+ bool has_deep_errors,
+ const requested_scrub_t& planned) const;
+
+ /**
+ * Verify the various 'next scrub' flags in m_planned_scrub against configuration
+ * and scrub-related timestamps.
+ *
+   * @returns an updated copy of m_planned_scrub (or std::nullopt if no
+   * scrubbing should take place)
+ */
+ std::optional<requested_scrub_t> verify_scrub_mode() const;
+
+ bool verify_periodic_scrub_mode(bool allow_deep_scrub,
+ bool try_to_auto_repair,
+ bool allow_regular_scrub,
+ bool has_deep_errors,
+ requested_scrub_t& planned) const;
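+
+  // A sketch of how sched_scrub() might consume verify_scrub_mode() (the
+  // actual logic lives in the .cc file; this is an illustration, not the
+  // implementation):
+  //
+  //   if (auto upd = verify_scrub_mode()) {
+  //     m_planned_scrub = *upd;  // commit the verified flags and proceed
+  //   } // else: no scrubbing at this time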
+
+ using ScrubAPI = void (ScrubPgIF::*)(epoch_t epoch_queued);
+ void forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued, std::string_view desc);
+ // and for events that carry a meaningful 'activation token'
+ using ScrubSafeAPI = void (ScrubPgIF::*)(epoch_t epoch_queued,
+ Scrub::act_token_t act_token);
+ void forward_scrub_event(ScrubSafeAPI fn,
+ epoch_t epoch_queued,
+ Scrub::act_token_t act_token,
+ std::string_view desc);
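+
+  // forward_scrub_event() dispatches a dequeued scrub event to m_scrubber via
+  // a pointer-to-member. A minimal sketch of the dispatch (the real
+  // definition, with its logging and epoch/state checks, is in the .cc file;
+  // the guard shown here is an assumption):
+  //
+  //   void PG::forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued,
+  //                                std::string_view desc)
+  //   {
+  //     if (is_active() && m_scrubber) {
+  //       ((*m_scrubber).*fn)(epoch_queued);  // e.g. &ScrubPgIF::send_scrub_resched
+  //     }
+  //   }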
+
+public:
virtual void do_request(
OpRequestRef& op,
ThreadPool::TPHandle &handle
virtual void snap_trimmer(epoch_t epoch_queued) = 0;
virtual void do_command(
- const string_view& prefix,
+ const std::string_view& prefix,
const cmdmap_t& cmdmap,
- const bufferlist& idata,
- std::function<void(int,const std::string&,bufferlist&)> on_finish) = 0;
+ const ceph::buffer::list& idata,
+ std::function<void(int,const std::string&,ceph::buffer::list&)> on_finish) = 0;
virtual bool agent_work(int max) = 0;
virtual bool agent_work(int max, int agent_flush_quota) = 0;
void complete(int r) override;
};
- void _delete_some(ObjectStore::Transaction *t);
-
virtual void set_dynamic_perf_stats_queries(
const std::list<OSDPerfMetricQuery> &queries) {
}
#ifdef PG_DEBUG_REFS
ceph::mutex _ref_id_lock = ceph::make_mutex("PG::_ref_id_lock");
- map<uint64_t, string> _live_ids;
- map<string, uint64_t> _tag_counts;
+ std::map<uint64_t, std::string> _live_ids;
+ std::map<std::string, uint64_t> _tag_counts;
uint64_t _ref_id = 0;
friend uint64_t get_with_id(PG *pg) { return pg->get_with_id(); }
protected:
OSDriver osdriver;
SnapMapper snap_mapper;
- bool eio_errors_to_process = false;
virtual PGBackend *get_pgbackend() = 0;
virtual const PGBackend* get_pgbackend() const = 0;
// ------------------
interval_set<snapid_t> snap_trimq;
- set<snapid_t> snap_trimq_repeat;
+ std::set<snapid_t> snap_trimq_repeat;
/* You should not use these items without taking their respective queue locks
* (if they have one) */
xlist<PG*>::item stat_queue_item;
- bool scrub_queued;
bool recovery_queued;
int recovery_ops_active;
- set<pg_shard_t> waiting_on_backfill;
+ std::set<pg_shard_t> waiting_on_backfill;
#ifdef DEBUG_RECOVERY_OIDS
multiset<hobject_t> recovering_oids;
#endif
public:
bool dne() { return info.dne(); }
- virtual void send_cluster_message(
- int osd, Message *m, epoch_t epoch, bool share_map_update) override;
+ void send_cluster_message(
+ int osd, MessageRef m, epoch_t epoch, bool share_map_update) override;
protected:
epoch_t get_last_peering_reset() const {
}
/* heartbeat peers */
- void set_probe_targets(const set<pg_shard_t> &probe_set) override;
+ void set_probe_targets(const std::set<pg_shard_t> &probe_set) override;
void clear_probe_targets() override;
ceph::mutex heartbeat_peer_lock =
ceph::make_mutex("PG::heartbeat_peer_lock");
- set<int> heartbeat_peers;
- set<int> probe_targets;
-
-public:
- /**
- * BackfillInterval
- *
- * Represents the objects in a range [begin, end)
- *
- * Possible states:
- * 1) begin == end == hobject_t() indicates the the interval is unpopulated
- * 2) Else, objects contains all objects in [begin, end)
- */
- struct BackfillInterval {
- // info about a backfill interval on a peer
- eversion_t version; /// version at which the scan occurred
- map<hobject_t,eversion_t> objects;
- hobject_t begin;
- hobject_t end;
-
- /// clear content
- void clear() {
- *this = BackfillInterval();
- }
-
- /// clear objects list only
- void clear_objects() {
- objects.clear();
- }
-
- /// reinstantiate with a new start+end position and sort order
- void reset(hobject_t start) {
- clear();
- begin = end = start;
- }
-
- /// true if there are no objects in this interval
- bool empty() const {
- return objects.empty();
- }
-
- /// true if interval extends to the end of the range
- bool extends_to_end() const {
- return end.is_max();
- }
-
- /// removes items <= soid and adjusts begin to the first object
- void trim_to(const hobject_t &soid) {
- trim();
- while (!objects.empty() &&
- objects.begin()->first <= soid) {
- pop_front();
- }
- }
-
- /// Adjusts begin to the first object
- void trim() {
- if (!objects.empty())
- begin = objects.begin()->first;
- else
- begin = end;
- }
-
- /// drop first entry, and adjust @begin accordingly
- void pop_front() {
- ceph_assert(!objects.empty());
- objects.erase(objects.begin());
- trim();
- }
-
- /// dump
- void dump(Formatter *f) const {
- f->dump_stream("begin") << begin;
- f->dump_stream("end") << end;
- f->open_array_section("objects");
- for (map<hobject_t, eversion_t>::const_iterator i =
- objects.begin();
- i != objects.end();
- ++i) {
- f->open_object_section("object");
- f->dump_stream("object") << i->first;
- f->dump_stream("version") << i->second;
- f->close_section();
- }
- f->close_section();
- }
- };
+ std::set<int> heartbeat_peers;
+ std::set<int> probe_targets;
protected:
BackfillInterval backfill_info;
- map<pg_shard_t, BackfillInterval> peer_backfill_info;
+ std::map<pg_shard_t, BackfillInterval> peer_backfill_info;
bool backfill_reserving;
// The primary's num_bytes and local num_bytes for this pg, only valid
* queues because we assume they cannot apply at that time (this is
* probably mostly true).
*
- * 3. The requeue_ops helper will push ops onto the waiting_for_map list if
+ * 3. The requeue_ops helper will push ops onto the waiting_for_map std::list if
* it is non-empty.
*
* These three behaviors are generally sufficient to maintain ordering, with
// ops with newer maps than our (or blocked behind them)
// track these by client, since inter-request ordering doesn't otherwise
// matter.
- unordered_map<entity_name_t,list<OpRequestRef>> waiting_for_map;
+ std::unordered_map<entity_name_t,std::list<OpRequestRef>> waiting_for_map;
// ops waiting on peered
- list<OpRequestRef> waiting_for_peered;
+ std::list<OpRequestRef> waiting_for_peered;
/// ops waiting on readble
- list<OpRequestRef> waiting_for_readable;
+ std::list<OpRequestRef> waiting_for_readable;
// ops waiting on active (require peered as well)
- list<OpRequestRef> waiting_for_active;
- list<OpRequestRef> waiting_for_flush;
- list<OpRequestRef> waiting_for_scrub;
+ std::list<OpRequestRef> waiting_for_active;
+ std::list<OpRequestRef> waiting_for_flush;
+ std::list<OpRequestRef> waiting_for_scrub;
- list<OpRequestRef> waiting_for_cache_not_full;
- list<OpRequestRef> waiting_for_clean_to_primary_repair;
- map<hobject_t, list<OpRequestRef>> waiting_for_unreadable_object,
+ std::list<OpRequestRef> waiting_for_cache_not_full;
+ std::list<OpRequestRef> waiting_for_clean_to_primary_repair;
+ std::map<hobject_t, std::list<OpRequestRef>> waiting_for_unreadable_object,
waiting_for_degraded_object,
waiting_for_blocked_object;
- set<hobject_t> objects_blocked_on_cache_full;
- map<hobject_t,snapid_t> objects_blocked_on_degraded_snap;
- map<hobject_t,ObjectContextRef> objects_blocked_on_snap_promotion;
+ std::set<hobject_t> objects_blocked_on_cache_full;
+ std::map<hobject_t,snapid_t> objects_blocked_on_degraded_snap;
+ std::map<hobject_t,ObjectContextRef> objects_blocked_on_snap_promotion;
// Callbacks should assume pg (and nothing else) is locked
- map<hobject_t, list<Context*>> callbacks_for_degraded_object;
+ std::map<hobject_t, std::list<Context*>> callbacks_for_degraded_object;
- map<eversion_t,
- list<
- tuple<OpRequestRef, version_t, int,
- vector<pg_log_op_return_item_t>>>> waiting_for_ondisk;
+ std::map<eversion_t,
+ std::list<
+ std::tuple<OpRequestRef, version_t, int,
+ std::vector<pg_log_op_return_item_t>>>> waiting_for_ondisk;
- void requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m);
+ void requeue_object_waiters(std::map<hobject_t, std::list<OpRequestRef>>& m);
void requeue_op(OpRequestRef op);
- void requeue_ops(list<OpRequestRef> &l);
+ void requeue_ops(std::list<OpRequestRef> &l);
// stats that persist lazily
object_stat_collection_t unstable_stats;
// publish stats
ceph::mutex pg_stats_publish_lock =
ceph::make_mutex("PG::pg_stats_publish_lock");
- bool pg_stats_publish_valid;
- pg_stat_t pg_stats_publish;
+ std::optional<pg_stat_t> pg_stats_publish;
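+  // std::optional replaces the previous {pg_stats_publish_valid, pg_stats_publish}
+  // pair: an empty optional now stands for "no valid published stats". Readers
+  // test before dereferencing, e.g. (sketch):
+  //
+  //   if (pg_stats_publish) { f(*pg_stats_publish); }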
friend class TestOpsSocketHook;
void publish_stats_to_osd() override;
pg->get_pgbackend()->trim(entry, t);
}
};
-
+
void update_object_snap_mapping(
ObjectStore::Transaction *t, const hobject_t &soid,
- const set<snapid_t> &snaps);
+ const std::set<snapid_t> &snaps);
void clear_object_snap_mapping(
ObjectStore::Transaction *t, const hobject_t &soid);
void remove_snap_mapped_object(
void purge_strays();
- void update_heartbeat_peers(set<int> peers) override;
+ void update_heartbeat_peers(std::set<int> peers) override;
Context *finish_sync_event;
virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits) = 0;
friend class C_OSD_RepModify_Commit;
- friend class C_DeleteMore;
+ friend struct C_DeleteMore;
// -- backoff --
ceph::mutex backoff_lock = // orders inside Backoff::lock
ceph::make_mutex("PG::backoff_lock");
- map<hobject_t,set<ceph::ref_t<Backoff>>> backoffs;
+ std::map<hobject_t,std::set<ceph::ref_t<Backoff>>> backoffs;
void add_backoff(const ceph::ref_t<Session>& s, const hobject_t& begin, const hobject_t& end);
void release_backoffs(const hobject_t& begin, const hobject_t& end);
hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
release_backoffs(begin, end);
}
-protected:
// -- scrub --
-public:
- struct Scrubber {
- Scrubber();
- ~Scrubber();
-
- // metadata
- set<pg_shard_t> reserved_peers;
- bool local_reserved, remote_reserved, reserve_failed;
- epoch_t epoch_start;
-
- // common to both scrubs
- bool active;
- set<pg_shard_t> waiting_on_whom;
- int shallow_errors;
- int deep_errors;
- int fixed;
- ScrubMap primary_scrubmap;
- ScrubMapBuilder primary_scrubmap_pos;
- epoch_t replica_scrub_start = 0;
- ScrubMap replica_scrubmap;
- ScrubMapBuilder replica_scrubmap_pos;
- map<pg_shard_t, ScrubMap> received_maps;
- OpRequestRef active_rep_scrub;
- utime_t scrub_reg_stamp; // stamp we registered for
-
- static utime_t scrub_must_stamp() { return utime_t(0,1); }
-
- omap_stat_t omap_stats = (const struct omap_stat_t){ 0 };
-
- // For async sleep
- bool sleeping = false;
- bool needs_sleep = true;
- utime_t sleep_start;
-
- // flags to indicate explicitly requested scrubs (by admin)
- bool must_scrub, must_deep_scrub, must_repair, need_auto, req_scrub;
-
- // Priority to use for scrub scheduling
- unsigned priority = 0;
-
- bool time_for_deep;
- // this flag indicates whether we would like to do auto-repair of the PG or not
- bool auto_repair;
- // this flag indicates that we are scrubbing post repair to verify everything is fixed
- bool check_repair;
- // this flag indicates that if a regular scrub detects errors <= osd_scrub_auto_repair_num_errors,
- // we should deep scrub in order to auto repair
- bool deep_scrub_on_error;
-
- // Maps from objects with errors to missing/inconsistent peers
- map<hobject_t, set<pg_shard_t>> missing;
- map<hobject_t, set<pg_shard_t>> inconsistent;
-
- // Map from object with errors to good peers
- map<hobject_t, list<pair<ScrubMap::object, pg_shard_t> >> authoritative;
-
- // Cleaned map pending snap metadata scrub
- ScrubMap cleaned_meta_map;
-
- void clean_meta_map(ScrubMap &for_meta_scrub) {
- if (end.is_max() ||
- cleaned_meta_map.objects.empty()) {
- cleaned_meta_map.swap(for_meta_scrub);
- } else {
- auto iter = cleaned_meta_map.objects.end();
- --iter; // not empty, see if clause
- auto begin = cleaned_meta_map.objects.begin();
- if (iter->first.has_snapset()) {
- ++iter;
- } else {
- while (iter != begin) {
- auto next = iter--;
- if (next->first.get_head() != iter->first.get_head()) {
- ++iter;
- break;
- }
- }
- }
- for_meta_scrub.objects.insert(begin, iter);
- cleaned_meta_map.objects.erase(begin, iter);
- }
- }
-
- // digest updates which we are waiting on
- int num_digest_updates_pending;
-
- // chunky scrub
- hobject_t start, end; // [start,end)
- hobject_t max_end; // Largest end that may have been sent to replicas
- eversion_t subset_last_update;
-
- // chunky scrub state
- enum State {
- INACTIVE,
- NEW_CHUNK,
- WAIT_PUSHES,
- WAIT_LAST_UPDATE,
- BUILD_MAP,
- BUILD_MAP_DONE,
- WAIT_REPLICAS,
- COMPARE_MAPS,
- WAIT_DIGEST_UPDATES,
- FINISH,
- BUILD_MAP_REPLICA,
- } state;
-
- std::unique_ptr<Scrub::Store> store;
- // deep scrub
- bool deep;
- int preempt_left;
- int preempt_divisor;
-
- list<Context*> callbacks;
- void add_callback(Context *context) {
- callbacks.push_back(context);
- }
- void run_callbacks() {
- list<Context*> to_run;
- to_run.swap(callbacks);
- for (list<Context*>::iterator i = to_run.begin();
- i != to_run.end();
- ++i) {
- (*i)->complete(0);
- }
- }
-
- static const char *state_string(const PG::Scrubber::State& state) {
- const char *ret = NULL;
- switch( state )
- {
- case INACTIVE: ret = "INACTIVE"; break;
- case NEW_CHUNK: ret = "NEW_CHUNK"; break;
- case WAIT_PUSHES: ret = "WAIT_PUSHES"; break;
- case WAIT_LAST_UPDATE: ret = "WAIT_LAST_UPDATE"; break;
- case BUILD_MAP: ret = "BUILD_MAP"; break;
- case BUILD_MAP_DONE: ret = "BUILD_MAP_DONE"; break;
- case WAIT_REPLICAS: ret = "WAIT_REPLICAS"; break;
- case COMPARE_MAPS: ret = "COMPARE_MAPS"; break;
- case WAIT_DIGEST_UPDATES: ret = "WAIT_DIGEST_UPDATES"; break;
- case FINISH: ret = "FINISH"; break;
- case BUILD_MAP_REPLICA: ret = "BUILD_MAP_REPLICA"; break;
- }
- return ret;
- }
-
- bool is_chunky_scrub_active() const { return state != INACTIVE; }
-
- // clear all state
- void reset() {
- active = false;
- waiting_on_whom.clear();
- if (active_rep_scrub) {
- active_rep_scrub = OpRequestRef();
- }
- received_maps.clear();
-
- must_scrub = false;
- must_deep_scrub = false;
- must_repair = false;
- need_auto = false;
- req_scrub = false;
- time_for_deep = false;
- auto_repair = false;
- check_repair = false;
- deep_scrub_on_error = false;
-
- state = PG::Scrubber::INACTIVE;
- start = hobject_t();
- end = hobject_t();
- max_end = hobject_t();
- subset_last_update = eversion_t();
- shallow_errors = 0;
- deep_errors = 0;
- fixed = 0;
- omap_stats = (const struct omap_stat_t){ 0 };
- deep = false;
- run_callbacks();
- inconsistent.clear();
- missing.clear();
- authoritative.clear();
- num_digest_updates_pending = 0;
- primary_scrubmap = ScrubMap();
- primary_scrubmap_pos.reset();
- replica_scrubmap = ScrubMap();
- replica_scrubmap_pos.reset();
- cleaned_meta_map = ScrubMap();
- sleeping = false;
- needs_sleep = true;
- sleep_start = utime_t();
- }
-
- void create_results(const hobject_t& obj);
- void cleanup_store(ObjectStore::Transaction *t);
- } scrubber;
-
protected:
bool scrub_after_recovery;
- bool save_req_scrub; // Saved for scrub_after_recovery
int active_pushes;
- bool scrub_can_preempt = false;
- bool scrub_preempted = false;
-
- // we allow some number of preemptions of the scrub, which mean we do
- // not block. then we start to block. once we start blocking, we do
- // not stop until the scrub range is completed.
- bool write_blocked_by_scrub(const hobject_t &soid);
-
- /// true if the given range intersects the scrub interval in any way
- bool range_intersects_scrub(const hobject_t &start, const hobject_t& end);
-
void repair_object(
const hobject_t &soid,
- const list<pair<ScrubMap::object, pg_shard_t> > &ok_peers,
- const set<pg_shard_t> &bad_peers);
+ const std::list<std::pair<ScrubMap::object, pg_shard_t> > &ok_peers,
+ const std::set<pg_shard_t> &bad_peers);
+
+ [[nodiscard]] bool ops_blocked_by_scrub() const;
+ [[nodiscard]] Scrub::scrub_prio_t is_scrub_blocking_ops() const;
- void abort_scrub();
- void chunky_scrub(ThreadPool::TPHandle &handle);
- void scrub_compare_maps();
- /**
- * return true if any inconsistency/missing is repaired, false otherwise
- */
- bool scrub_process_inconsistent();
- bool ops_blocked_by_scrub() const;
- void scrub_finish();
- void scrub_clear_state(bool keep_repair = false);
- void _scan_snaps(ScrubMap &map);
void _repair_oinfo_oid(ScrubMap &map);
- void _scan_rollback_obs(const vector<ghobject_t> &rollback_obs);
- void _request_scrub_map(pg_shard_t replica, eversion_t version,
- hobject_t start, hobject_t end, bool deep,
- bool allow_preemption);
- int build_scrub_map_chunk(
- ScrubMap &map,
- ScrubMapBuilder &pos,
- hobject_t start, hobject_t end, bool deep,
- ThreadPool::TPHandle &handle);
+ void _scan_rollback_obs(const std::vector<ghobject_t> &rollback_obs);
/**
* returns true if [begin, end) is good to scrub at this time
   * a false return value obliges the implementer to requeue scrub when the
   * condition allows
*/
virtual bool _range_available_for_scrub(
const hobject_t &begin, const hobject_t &end) = 0;
- virtual void scrub_snapshot_metadata(
- ScrubMap &map,
- const std::map<hobject_t,
- pair<std::optional<uint32_t>,
- std::optional<uint32_t>>> &missing_digest) { }
- virtual void _scrub_clear_state() { }
- virtual void _scrub_finish() { }
- void clear_scrub_reserved();
- void scrub_reserve_replicas();
- void scrub_unreserve_replicas();
- bool scrub_all_replicas_reserved() const;
-
- void replica_scrub(
- OpRequestRef op,
- ThreadPool::TPHandle &handle);
- void do_replica_scrub_map(OpRequestRef op);
-
- void handle_scrub_reserve_request(OpRequestRef op);
- void handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from);
- void handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from);
- void handle_scrub_reserve_release(OpRequestRef op);
+
+ /**
+ * Initiate the process that will create our scrub map for the Primary.
+ * (triggered by MSG_OSD_REP_SCRUB)
+ */
+ void replica_scrub(OpRequestRef op, ThreadPool::TPHandle &handle);
// -- recovery state --
bool is_clean() const { return recovery_state.is_clean(); }
bool is_degraded() const { return recovery_state.is_degraded(); }
bool is_undersized() const { return recovery_state.is_undersized(); }
- bool is_scrubbing() const { return state_test(PG_STATE_SCRUBBING); }
+ bool is_scrubbing() const { return state_test(PG_STATE_SCRUBBING); } // Primary only
bool is_remapped() const { return recovery_state.is_remapped(); }
bool is_peered() const { return recovery_state.is_peered(); }
bool is_recovering() const { return recovery_state.is_recovering(); }
void do_pending_flush();
public:
- virtual void prepare_write(
+ void prepare_write(
pg_info_t &info,
pg_info_t &last_written_info,
PastIntervals &past_intervals,
eversion_t *version,
version_t *user_version,
int *return_code,
- vector<pg_log_op_return_item_t> *op_returns) const;
+ std::vector<pg_log_op_return_item_t> *op_returns) const;
eversion_t projected_last_update;
eversion_t get_next_version() const {
eversion_t at_version(
std::string get_corrupt_pg_log_name() const;
void update_snap_map(
- const vector<pg_log_entry_t> &log_entries,
+ const std::vector<pg_log_entry_t> &log_entries,
ObjectStore::Transaction& t);
- void filter_snapc(vector<snapid_t> &snaps);
+ void filter_snapc(std::vector<snapid_t> &snaps);
virtual void kick_snap_trim() = 0;
virtual void snap_trimmer_scrub_complete() = 0;
- bool requeue_scrub(bool high_priority = false);
+
void queue_recovery();
- bool queue_scrub();
- unsigned get_scrub_priority();
+ void queue_scrub_after_repair();
+ unsigned int get_scrub_priority();
bool try_flush_or_schedule_async() override;
void start_flush_on_transaction(
bool op_has_sufficient_caps(OpRequestRef& op);
// abstract bits
- friend class FlushState;
+ friend struct FlushState;
friend ostream& operator<<(ostream& out, const PG& pg);
// ref to recovery_state.info
const pg_info_t &info;
-};
-ostream& operator<<(ostream& out, const PG::BackfillInterval& bi);
+// ScrubberPasskey getters:
+public:
+ const pg_info_t& get_pg_info(ScrubberPasskey) const {
+ return info;
+ }
+
+ OSDService* get_pg_osd(ScrubberPasskey) const {
+ return osd;
+ }
+
+};
#endif