X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Fosd%2FPG.h;h=59469602405707ac79535b0c712a77baea5fa294;hb=20effc670b57271cb089376d6d0800990e5218d5;hp=8691a273a8a48913bd5d6eccfae95e933ae10470;hpb=b32b81446b3b05102be0267e79203f59329c1d97;p=ceph.git

diff --git a/ceph/src/osd/PG.h b/ceph/src/osd/PG.h
index 8691a273a..594696024 100644
--- a/ceph/src/osd/PG.h
+++ b/ceph/src/osd/PG.h
@@ -15,20 +15,13 @@
 #ifndef CEPH_PG_H
 #define CEPH_PG_H
 
-#include <boost/statechart/custom_reaction.hpp>
-#include <boost/statechart/event.hpp>
-#include <boost/statechart/simple_state.hpp>
-#include <boost/statechart/state.hpp>
-#include <boost/statechart/state_machine.hpp>
-#include <boost/statechart/transition.hpp>
-#include <boost/statechart/event_base.hpp>
 #include <boost/scoped_ptr.hpp>
-#include <boost/circular_buffer.hpp>
-#include "include/memory.h"
+#include <boost/container/flat_set.hpp>
 #include "include/mempool.h"
 
 // re-include our assert to clobber boost's
-#include "include/assert.h"
+#include "include/ceph_assert.h"
+#include "include/common_fwd.h"
 
 #include "include/types.h"
 #include "include/stringify.h"
@@ -40,96 +33,44 @@
 #include "PGLog.h"
 #include "OSDMap.h"
-#include "messages/MOSDPGLog.h"
 #include "include/str_list.h"
 #include "PGBackend.h"
+#include "PGPeeringEvent.h"
+#include "PeeringState.h"
+#include "recovery_types.h"
+#include "MissingLoc.h"
+#include "scrubber_common.h"
+
+#include "mgr/OSDPerfMetricTypes.h"
 
 #include <atomic>
 #include <list>
 #include <memory>
-#include <stack>
 #include <string>
 #include <tuple>
 
-using namespace std;
-
-// #include "include/unordered_map.h"
-// #include "include/unordered_set.h"
-//#define DEBUG_RECOVERY_OIDS   // track set of recovering oids explicitly, to find counting bugs
+//#define DEBUG_RECOVERY_OIDS   // track std::set of recovering oids explicitly, to find counting bugs
+//#define PG_DEBUG_REFS   // track provenance of pg refs, helpful for finding leaks
 
 class OSD;
 class OSDService;
-class MOSDOp;
-class MOSDPGScan;
-class MOSDPGBackfill;
-class MOSDPGInfo;
+class OSDShard;
+class OSDShardPGSlot;
 
 class PG;
 struct OpRequest;
 typedef OpRequest::Ref OpRequestRef;
-class MOSDPGLog;
-class CephContext;
+class DynamicPerfStats;
+class PgScrubber;
 
 namespace Scrub {
   class Store;
+  class ReplicaReservations;
+  class LocalReservation;
+  class ReservedByRemotePrimary;
+  enum class schedule_result_t;
 }
 
-void intrusive_ptr_add_ref(PG *pg);
-void intrusive_ptr_release(PG *pg);
-
-using state_history_entry = std::tuple<utime_t, utime_t, const char*>;
-using embedded_state = std::pair<utime_t, const char*>;
-
-struct PGStateInstance {
-  // Time spent in pg states
-
-  void setepoch(const epoch_t current_epoch) {
-    this_epoch = current_epoch;
-  }
-
-  void enter_state(const utime_t entime, const char* state) {
-    embedded_states.push(std::make_pair(entime, state));
-  }
-
-  void exit_state(const utime_t extime) {
-    embedded_state this_state = embedded_states.top();
-    state_history.push_back(state_history_entry{
-      this_state.first, extime, this_state.second});
-    embedded_states.pop();
-  }
-
-  epoch_t this_epoch;
-  utime_t enter_time;
-  std::vector<state_history_entry> state_history;
-  std::stack<embedded_state> embedded_states;
-};
-
-class PGStateHistory {
-  // Member access protected with the PG lock
-public:
-  PGStateHistory() : buffer(10) {}
-
-  void enter(PG* pg, const utime_t entime, const char* state);
-
-  void exit(const char* state);
-
-  void reset() {
-    pi = nullptr;
-  }
-
-  void set_pg_in_destructor() { pg_in_destructor = true; }
-
-  void dump(Formatter* f) const;
-
-private:
-  bool pg_in_destructor = false;
-  PG* thispg = nullptr;
-  std::unique_ptr<PGStateInstance> tmppi;
-  PGStateInstance* pi = nullptr;
-  boost::circular_buffer<std::unique_ptr<PGStateInstance>> buffer;
-
-};
-
 #ifdef PG_DEBUG_REFS
 #include "common/tracked_int_ptr.hpp"
   uint64_t get_with_id(PG *pg);
@@ -150,19 +91,19 @@ class PGRecoveryStats {
     // cppcheck-suppress unreachableCode
     per_state_info() : enter(0), exit(0), events(0) {}
   };
-  map<const char *,per_state_info> info;
-  Mutex lock;
+  std::map<const char *,per_state_info> info;
+  ceph::mutex lock = ceph::make_mutex("PGRecoverStats::lock");
 
 public:
-  PGRecoveryStats() : lock("PGRecoverStats::lock") {}
+  PGRecoveryStats() = default;
 
   void reset() {
-    Mutex::Locker l(lock);
+    std::lock_guard l(lock);
     info.clear();
   }
   void dump(ostream& out) {
-    Mutex::Locker l(lock);
-    for (map<const char *,per_state_info>::iterator p = info.begin(); p != info.end(); ++p) {
+    std::lock_guard l(lock);
+    for (std::map<const char *,per_state_info>::iterator p = info.begin(); p != info.end(); ++p) {
       per_state_info& i = p->second;
       out << i.enter << "\t" << i.exit << "\t"
           << i.events << "\t" << i.event_time << "\t"
@@ -172,10 +113,10 @@ class PGRecoveryStats {
     }
   }
 
-  void dump_formatted(Formatter *f) {
-    Mutex::Locker l(lock);
+  void dump_formatted(ceph::Formatter *f) {
+    std::lock_guard l(lock);
     f->open_array_section("pg_recovery_stats");
-    for (map<const char *,per_state_info>::iterator p = info.begin();
+    for (std::map<const char *,per_state_info>::iterator p = info.begin();
          p != info.end(); ++p) {
       per_state_info& i = p->second;
       f->open_object_section("recovery_state");
@@ -186,10 +127,10 @@ class PGRecoveryStats {
       f->dump_stream("total_time") << i.total_time;
       f->dump_stream("min_time") << i.min_time;
       f->dump_stream("max_time") << i.max_time;
-      vector<string> states;
+      std::vector<std::string> states;
       get_str_vec(p->first, "/", states);
       f->open_array_section("nested_states");
-      for (vector<string>::iterator st = states.begin();
+      for (std::vector<std::string>::iterator st = states.begin();
           st != states.end(); ++st) {
        f->dump_string("state", *st);
       }
@@ -200,11 +141,11 @@ class PGRecoveryStats {
   }
 
   void log_enter(const char *s) {
-    Mutex::Locker l(lock);
+    std::lock_guard l(lock);
     info[s].enter++;
   }
   void log_exit(const char *s, utime_t dur, uint64_t events, utime_t event_dur) {
-    Mutex::Locker l(lock);
+    std::lock_guard l(lock);
     per_state_info &i = info[s];
     i.exit++;
     i.total_time += dur;
@@ -217,634 +158,818 @@ class PGRecoveryStats {
   }
 };
 
-struct PGPool {
-  CephContext* cct;
-  epoch_t cached_epoch;
-  int64_t id;
-  string name;
-  uint64_t auid;
-
-  pg_pool_t info;
-  SnapContext snapc;   // the default pool snapc, ready to go.
-
-  interval_set<snapid_t> cached_removed_snaps;      // current removed_snaps set
-  interval_set<snapid_t> newly_removed_snaps;  // newly removed in the last epoch
-
-  PGPool(CephContext* cct, OSDMapRef map, int64_t i)
-    : cct(cct),
-      cached_epoch(map->get_epoch()),
-      id(i),
-      name(map->get_pool_name(id)),
-      auid(map->get_pg_pool(id)->auid) {
-    const pg_pool_t *pi = map->get_pg_pool(id);
-    assert(pi);
-    info = *pi;
-    snapc = pi->get_snap_context();
-    pi->build_removed_snaps(cached_removed_snaps);
-  }
-
-  void update(OSDMapRef map);
-};
-
 /** PG - Replica Placement Group
  *
  */
-class PG : public DoutPrefixProvider {
-protected:
-  OSDService *osd;
-  CephContext *cct;
-  OSDriver osdriver;
-  SnapMapper snap_mapper;
-  bool eio_errors_to_process = false;
+/// Facilitating scrub-related object access to private PG data
+class ScrubberPasskey {
+private:
+  friend class Scrub::ReplicaReservations;
+  friend class PrimaryLogScrub;
+  ScrubberPasskey() {}
+  ScrubberPasskey(const ScrubberPasskey&) = default;
+  ScrubberPasskey& operator=(const ScrubberPasskey&) = delete;
+};
+
+class PG : public DoutPrefixProvider, public PeeringState::PeeringListener {
+  friend struct NamedState;
+  friend class PeeringState;
+  friend class PgScrubber;
 
-  virtual PGBackend *get_pgbackend() = 0;
 public:
-  std::string gen_prefix() const override;
-  CephContext *get_cct() const override { return cct; }
-  unsigned get_subsys() const override { return ceph_subsys_osd; }
+  const pg_shard_t pg_whoami;
+  const spg_t pg_id;
+
+  /// the 'scrubber'. 
Will be allocated in the derivative (PrimaryLogPG) ctor, + /// and be removed only in the PrimaryLogPG destructor. + std::unique_ptr m_scrubber; + + /// flags detailing scheduling/operation characteristics of the next scrub + requested_scrub_t m_planned_scrub; + + /// scrubbing state for both Primary & replicas + bool is_scrub_active() const { return m_scrubber->is_scrub_active(); } + + /// set when the scrub request is queued, and reset after scrubbing fully + /// cleaned up. + bool is_scrub_queued_or_active() const { return m_scrubber->is_queued_or_active(); } + +public: + // -- members -- + const coll_t coll; + + ObjectStore::CollectionHandle ch; + + // -- methods -- + std::ostream& gen_prefix(std::ostream& out) const override; + CephContext *get_cct() const override { + return cct; + } + unsigned get_subsys() const override { + return ceph_subsys_osd; + } + + const char* const get_current_state() const { + return recovery_state.get_current_state(); + } + + const OSDMapRef& get_osdmap() const { + ceph_assert(is_locked()); + return recovery_state.get_osdmap(); + } + + epoch_t get_osdmap_epoch() const override final { + return recovery_state.get_osdmap()->get_epoch(); + } + + PerfCounters &get_peering_perf() override; + PerfCounters &get_perf_logger() override; + void log_state_enter(const char *state) override; + void log_state_exit( + const char *state_name, utime_t enter_time, + uint64_t events, utime_t event_dur) override; + + void lock(bool no_lockdep = false) const; + void unlock() const; + bool is_locked() const; + + const spg_t& get_pgid() const { + return pg_id; + } + + const PGPool& get_pool() const { + return pool; + } + uint64_t get_last_user_version() const { + return info.last_user_version; + } + const pg_history_t& get_history() const { + return info.history; + } + bool get_need_up_thru() const { + return recovery_state.get_need_up_thru(); + } + epoch_t get_same_interval_since() const { + return info.history.same_interval_since; + } + + static void set_last_scrub_stamp( + utime_t t, pg_history_t &history, pg_stat_t &stats) { + stats.last_scrub_stamp = t; + history.last_scrub_stamp = t; + } + + void set_last_scrub_stamp(utime_t t) { + recovery_state.update_stats( + [=](auto &history, auto &stats) { + set_last_scrub_stamp(t, history, stats); + return true; + }); + } + + static void set_last_deep_scrub_stamp( + utime_t t, pg_history_t &history, pg_stat_t &stats) { + stats.last_deep_scrub_stamp = t; + history.last_deep_scrub_stamp = t; + } + + void set_last_deep_scrub_stamp(utime_t t) { + recovery_state.update_stats( + [=](auto &history, auto &stats) { + set_last_deep_scrub_stamp(t, history, stats); + return true; + }); + } + + static void add_objects_scrubbed_count( + int64_t count, pg_stat_t &stats) { + stats.objects_scrubbed += count; + } + + void add_objects_scrubbed_count(int64_t count) { + recovery_state.update_stats( + [=](auto &history, auto &stats) { + add_objects_scrubbed_count(count, stats); + return true; + }); + } + + static void reset_objects_scrubbed(pg_stat_t &stats) { + stats.objects_scrubbed = 0; + } + + void reset_objects_scrubbed() { + recovery_state.update_stats( + [=](auto &history, auto &stats) { + reset_objects_scrubbed(stats); + return true; + }); + } + + bool is_deleting() const { + return recovery_state.is_deleting(); + } + bool is_deleted() const { + return recovery_state.is_deleted(); + } + bool is_nonprimary() const { + return recovery_state.is_nonprimary(); + } + bool is_primary() const { + return recovery_state.is_primary(); + } + bool 
pg_has_reset_since(epoch_t e) { + ceph_assert(is_locked()); + return recovery_state.pg_has_reset_since(e); + } + + bool is_ec_pg() const { + return recovery_state.is_ec_pg(); + } + int get_role() const { + return recovery_state.get_role(); + } + const std::vector get_acting() const { + return recovery_state.get_acting(); + } + const std::set &get_actingset() const { + return recovery_state.get_actingset(); + } + int get_acting_primary() const { + return recovery_state.get_acting_primary(); + } + pg_shard_t get_primary() const { + return recovery_state.get_primary(); + } + const std::vector get_up() const { + return recovery_state.get_up(); + } + int get_up_primary() const { + return recovery_state.get_up_primary(); + } + const PastIntervals& get_past_intervals() const { + return recovery_state.get_past_intervals(); + } + bool is_acting_recovery_backfill(pg_shard_t osd) const { + return recovery_state.is_acting_recovery_backfill(osd); + } + const std::set &get_acting_recovery_backfill() const { + return recovery_state.get_acting_recovery_backfill(); + } + bool is_acting(pg_shard_t osd) const { + return recovery_state.is_acting(osd); + } + bool is_up(pg_shard_t osd) const { + return recovery_state.is_up(osd); + } + static bool has_shard(bool ec, const std::vector& v, pg_shard_t osd) { + return PeeringState::has_shard(ec, v, osd); + } + + /// initialize created PG + void init( + int role, + const std::vector& up, + int up_primary, + const std::vector& acting, + int acting_primary, + const pg_history_t& history, + const PastIntervals& pim, + ObjectStore::Transaction &t); + + /// read existing pg state off disk + void read_state(ObjectStore *store); + static int peek_map_epoch(ObjectStore *store, spg_t pgid, epoch_t *pepoch); + + static int get_latest_struct_v() { + return pg_latest_struct_v; + } + static int get_compat_struct_v() { + return pg_compat_struct_v; + } + static int read_info( + ObjectStore *store, spg_t pgid, const coll_t &coll, + pg_info_t &info, PastIntervals &past_intervals, + __u8 &); + static bool _has_removal_flag(ObjectStore *store, spg_t pgid); + + void rm_backoff(const ceph::ref_t& b); - /*** PG ****/ void update_snap_mapper_bits(uint32_t bits) { snap_mapper.update_bits(bits); } - /// get_is_recoverable_predicate: caller owns returned pointer and must delete when done - IsPGRecoverablePredicate *get_is_recoverable_predicate() { - return get_pgbackend()->get_is_recoverable_predicate(); + void start_split_stats(const std::set& childpgs, std::vector *v); + virtual void split_colls( + spg_t child, + int split_bits, + int seed, + const pg_pool_t *pool, + ObjectStore::Transaction &t) = 0; + void split_into(pg_t child_pgid, PG *child, unsigned split_bits); + void merge_from(std::map& sources, PeeringCtx &rctx, + unsigned split_bits, + const pg_merge_meta_t& last_pg_merge_meta); + void finish_split_stats(const object_stat_sum_t& stats, + ObjectStore::Transaction &t); + + void scrub(epoch_t queued, ThreadPool::TPHandle& handle) + { + // a new scrub + forward_scrub_event(&ScrubPgIF::initiate_regular_scrub, queued, "StartScrub"); + } + + /** + * a special version of PG::scrub(), which: + * - is initiated after repair, and + * (not true anymore:) + * - is not required to allocate local/remote OSD scrub resources + */ + void recovery_scrub(epoch_t queued, ThreadPool::TPHandle& handle) + { + // a new scrub + forward_scrub_event(&ScrubPgIF::initiate_scrub_after_repair, queued, + "AfterRepairScrub"); } -protected: - OSDMapRef osdmap_ref; - OSDMapRef last_persisted_osdmap_ref; - PGPool 
pool; - void requeue_map_waiters(); + void replica_scrub(epoch_t queued, + Scrub::act_token_t act_token, + ThreadPool::TPHandle& handle); - void update_osdmap_ref(OSDMapRef newmap) { - assert(_lock.is_locked_by_me()); - osdmap_ref = std::move(newmap); + void replica_scrub_resched(epoch_t queued, + Scrub::act_token_t act_token, + ThreadPool::TPHandle& handle) + { + forward_scrub_event(&ScrubPgIF::send_sched_replica, queued, act_token, + "SchedReplica"); } -public: - OSDMapRef get_osdmap() const { - assert(is_locked()); - assert(osdmap_ref); - return osdmap_ref; + void scrub_send_resources_granted(epoch_t queued, ThreadPool::TPHandle& handle) + { + forward_scrub_event(&ScrubPgIF::send_remotes_reserved, queued, "RemotesReserved"); } -protected: - /** locking and reference counting. - * I destroy myself when the reference count hits zero. - * lock() should be called before doing anything. - * get() should be called on pointer copy (to another thread, etc.). - * put() should be called on destruction of some previously copied pointer. - * unlock() when done with the current pointer (_most common_). - */ - mutable Mutex _lock; - std::atomic_uint ref{0}; + void scrub_send_resources_denied(epoch_t queued, ThreadPool::TPHandle& handle) + { + forward_scrub_event(&ScrubPgIF::send_reservation_failure, queued, + "ReservationFailure"); + } -#ifdef PG_DEBUG_REFS - Mutex _ref_id_lock; - map _live_ids; - map _tag_counts; - uint64_t _ref_id; -#endif + void scrub_send_scrub_resched(epoch_t queued, ThreadPool::TPHandle& handle) + { + forward_scrub_event(&ScrubPgIF::send_scrub_resched, queued, "InternalSchedScrub"); + } -public: - bool deleting; // true while in removing or OSD is shutting down + void scrub_send_pushes_update(epoch_t queued, ThreadPool::TPHandle& handle) + { + forward_scrub_event(&ScrubPgIF::active_pushes_notification, queued, + "ActivePushesUpd"); + } - ZTracer::Endpoint trace_endpoint; + void scrub_send_applied_update(epoch_t queued, ThreadPool::TPHandle& handle) + { + forward_scrub_event(&ScrubPgIF::update_applied_notification, queued, + "UpdatesApplied"); + } - void lock_suspend_timeout(ThreadPool::TPHandle &handle); - void lock(bool no_lockdep = false) const; - void unlock() const { - //generic_dout(0) << this << " " << info.pgid << " unlock" << dendl; - assert(!dirty_info); - assert(!dirty_big_info); - _lock.Unlock(); + void scrub_send_unblocking(epoch_t queued, ThreadPool::TPHandle& handle) + { + forward_scrub_event(&ScrubPgIF::send_scrub_unblock, queued, "Unblocked"); } - bool is_locked() const { - return _lock.is_locked(); + void scrub_send_digest_update(epoch_t queued, ThreadPool::TPHandle& handle) + { + forward_scrub_event(&ScrubPgIF::digest_update_notification, queued, "DigestUpdate"); } -#ifdef PG_DEBUG_REFS - uint64_t get_with_id(); - void put_with_id(uint64_t); - void dump_live_ids(); -#endif - void get(const char* tag); - void put(const char* tag); + void scrub_send_local_map_ready(epoch_t queued, ThreadPool::TPHandle& handle) + { + forward_scrub_event(&ScrubPgIF::send_local_map_done, queued, "IntLocalMapDone"); + } - bool dirty_info, dirty_big_info; + void scrub_send_replmaps_ready(epoch_t queued, ThreadPool::TPHandle& handle) + { + forward_scrub_event(&ScrubPgIF::send_replica_maps_ready, queued, "GotReplicas"); + } -public: - bool is_ec_pg() const { - return pool.info.ec_pool(); - } - // pg state - pg_info_t info; ///< current pg info - pg_info_t last_written_info; ///< last written info - __u8 info_struct_v; - static const __u8 cur_struct_v = 10; - // v10 is the new 
past_intervals encoding
-  //   v9 was fastinfo_key addition
-  //   v8 was the move to a per-pg pgmeta object
-  //   v7 was SnapMapper addition in 86658392516d5175b2756659ef7ffaaf95b0f8ad
-  //       (first appeared in cuttlefish).
-  static const __u8 compat_struct_v = 7;
-  bool must_upgrade() {
-    return info_struct_v < cur_struct_v;
-  }
-  bool can_upgrade() {
-    return info_struct_v >= compat_struct_v;
+  void scrub_send_replica_pushes(epoch_t queued, ThreadPool::TPHandle& handle)
+  {
+    forward_scrub_event(&ScrubPgIF::send_replica_pushes_upd, queued,
+                        "ReplicaPushesUpd");
   }
-  void upgrade(ObjectStore *store);
 
-  const coll_t coll;
-  ObjectStore::CollectionHandle ch;
-  PGLog  pg_log;
-  static string get_info_key(spg_t pgid) {
-    return stringify(pgid) + "_info";
+  void scrub_send_maps_compared(epoch_t queued, ThreadPool::TPHandle& handle)
+  {
+    forward_scrub_event(&ScrubPgIF::send_maps_compared, queued, "MapsCompared");
   }
-  static string get_biginfo_key(spg_t pgid) {
-    return stringify(pgid) + "_biginfo";
+
+  void scrub_send_get_next_chunk(epoch_t queued, ThreadPool::TPHandle& handle)
+  {
+    forward_scrub_event(&ScrubPgIF::send_get_next_chunk, queued, "NextChunk");
  }
-  static string get_epoch_key(spg_t pgid) {
-    return stringify(pgid) + "_epoch";
+
+  void scrub_send_scrub_is_finished(epoch_t queued, ThreadPool::TPHandle& handle)
+  {
+    forward_scrub_event(&ScrubPgIF::send_scrub_is_finished, queued, "ScrubFinished");
   }
-  ghobject_t    pgmeta_oid;
 
-  class MissingLoc {
-    map<hobject_t, pg_missing_item> needs_recovery_map;
-    map<hobject_t, set<pg_shard_t> > missing_loc;
-    set<pg_shard_t> missing_loc_sources;
-    PG *pg;
-    set<pg_shard_t> empty_set;
-  public:
-    boost::scoped_ptr<IsPGReadablePredicate> is_readable;
-    boost::scoped_ptr<IsPGRecoverablePredicate> is_recoverable;
-    explicit MissingLoc(PG *pg)
-      : pg(pg) {}
-    void set_backend_predicates(
-      IsPGReadablePredicate *_is_readable,
-      IsPGRecoverablePredicate *_is_recoverable) {
-      is_readable.reset(_is_readable);
-      is_recoverable.reset(_is_recoverable);
-    }
-    string gen_prefix() const { return pg->gen_prefix(); }
-    bool needs_recovery(
-      const hobject_t &hoid,
-      eversion_t *v = 0) const {
-      map<hobject_t, pg_missing_item>::const_iterator i =
-        needs_recovery_map.find(hoid);
-      if (i == needs_recovery_map.end())
-        return false;
-      if (v)
-        *v = i->second.need;
-      return true;
-    }
-    bool is_deleted(const hobject_t &hoid) const {
-      auto i = needs_recovery_map.find(hoid);
-      if (i == needs_recovery_map.end())
-        return false;
-      return i->second.is_delete();
-    }
-    bool is_unfound(const hobject_t &hoid) const {
-      return needs_recovery(hoid) && !is_deleted(hoid) && (
-        !missing_loc.count(hoid) ||
-        !(*is_recoverable)(missing_loc.find(hoid)->second));
-    }
-    bool readable_with_acting(
-      const hobject_t &hoid,
-      const set<pg_shard_t> &acting) const;
-    uint64_t num_unfound() const {
-      uint64_t ret = 0;
-      for (map<hobject_t, pg_missing_item>::const_iterator i =
-             needs_recovery_map.begin();
-           i != needs_recovery_map.end();
-           ++i) {
-        if (is_unfound(i->first))
-          ++ret;
-      }
-      return ret;
-    }
+  void scrub_send_chunk_free(epoch_t queued, ThreadPool::TPHandle& handle)
+  {
+    forward_scrub_event(&ScrubPgIF::send_chunk_free, queued, "SelectedChunkFree");
+  }
 
-    bool have_unfound() const {
-      for (map<hobject_t, pg_missing_item>::const_iterator i =
-             needs_recovery_map.begin();
-           i != needs_recovery_map.end();
-           ++i) {
-        if (is_unfound(i->first))
-          return true;
-      }
-      return false;
-    }
-    void clear() {
-      needs_recovery_map.clear();
-      missing_loc.clear();
-      missing_loc_sources.clear();
-    }
+  void scrub_send_chunk_busy(epoch_t queued, ThreadPool::TPHandle& handle)
+  {
+    forward_scrub_event(&ScrubPgIF::send_chunk_busy, queued, "ChunkIsBusy");
+  }
 
-    void add_location(const hobject_t &hoid, pg_shard_t location) {
-      missing_loc[hoid].insert(location);
-    }
-    void remove_location(const hobject_t &hoid, pg_shard_t location) {
-      missing_loc[hoid].erase(location);
-    }
-    void add_active_missing(const pg_missing_t &missing) {
-      for (map<hobject_t, pg_missing_item>::const_iterator i =
-             missing.get_items().begin();
-           i != missing.get_items().end();
-           ++i) {
-        map<hobject_t, pg_missing_item>::const_iterator j =
-          needs_recovery_map.find(i->first);
-        if (j == needs_recovery_map.end()) {
-          needs_recovery_map.insert(*i);
-        } else {
-          lgeneric_dout(pg->cct, 0) << this << " " << pg->info.pgid << " unexpected need for "
-                                    << i->first << " have " << j->second
-                                    << " tried to add " << i->second << dendl;
-          assert(i->second.need == j->second.need);
-        }
-      }
-    }
+  void queue_want_pg_temp(const std::vector<int> &wanted) override;
+  void clear_want_pg_temp() override;
 
-    void add_missing(const hobject_t &hoid, eversion_t need, eversion_t have) {
-      needs_recovery_map[hoid] = pg_missing_item(need, have);
-    }
-    void revise_need(const hobject_t &hoid, eversion_t need) {
-      assert(needs_recovery(hoid));
-      needs_recovery_map[hoid].need = need;
-    }
+  void on_new_interval() override;
 
-    /// Adds info about a possible recovery source
-    bool add_source_info(
-      pg_shard_t source,           ///< [in] source
-      const pg_info_t &oinfo,      ///< [in] info
-      const pg_missing_t &omissing, ///< [in] (optional) missing
-      ThreadPool::TPHandle* handle  ///< [in] ThreadPool handle
-      ); ///< @return whether a new object location was discovered
-
-    /// Adds recovery sources in batch
-    void add_batch_sources_info(
-      const set<pg_shard_t> &sources,  ///< [in] a set of resources which can be used for all objects
-      ThreadPool::TPHandle* handle  ///< [in] ThreadPool handle
-      );
-
-    /// Uses osdmap to update structures for now down sources
-    void check_recovery_sources(const OSDMapRef& osdmap);
-
-    /// Call when hoid is no longer missing in acting set
-    void recovered(const hobject_t &hoid) {
-      needs_recovery_map.erase(hoid);
-      missing_loc.erase(hoid);
-    }
+  void on_role_change() override;
+  virtual void plpg_on_role_change() = 0;
 
-    /// Call to update structures for hoid after a change
-    void rebuild(
-      const hobject_t &hoid,
-      pg_shard_t self,
-      const set<pg_shard_t> to_recover,
-      const pg_info_t &info,
-      const pg_missing_t &missing,
-      const map<pg_shard_t, pg_missing_t> &pmissing,
-      const map<pg_shard_t, pg_info_t> &pinfo) {
-      recovered(hoid);
-      boost::optional<pg_missing_item> item;
-      auto miter = missing.get_items().find(hoid);
-      if (miter != missing.get_items().end()) {
-        item = miter->second;
-      } else {
-        for (auto &&i: to_recover) {
-          if (i == self)
-            continue;
-          auto pmiter = pmissing.find(i);
-          assert(pmiter != pmissing.end());
-          miter = pmiter->second.get_items().find(hoid);
-          if (miter != pmiter->second.get_items().end()) {
-            item = miter->second;
-            break;
-          }
-        }
-      }
-      if (!item)
-        return; // recovered!
-
-      needs_recovery_map[hoid] = *item;
-      if (item->is_delete())
-        return;
-      auto mliter =
-        missing_loc.insert(make_pair(hoid, set<pg_shard_t>())).first;
-      assert(info.last_backfill.is_max());
-      assert(info.last_update >= item->need);
-      if (!missing.is_missing(hoid))
-        mliter->second.insert(self);
-      for (auto &&i: pmissing) {
-        auto pinfoiter = pinfo.find(i.first);
-        assert(pinfoiter != pinfo.end());
-        if (item->need <= pinfoiter->second.last_update &&
-            hoid <= pinfoiter->second.last_backfill &&
-            !i.second.is_missing(hoid))
-          mliter->second.insert(i.first);
-      }
-    }
+  void init_collection_pool_opts();
+  void on_pool_change() override;
+  virtual void plpg_on_pool_change() = 0;
 
-    const set<pg_shard_t> &get_locations(const hobject_t &hoid) const {
-      return missing_loc.count(hoid) ?
- missing_loc.find(hoid)->second : empty_set; - } - const map> &get_missing_locs() const { - return missing_loc; - } - const map &get_needs_recovery() const { - return needs_recovery_map; + void on_info_history_change() override; + + void on_primary_status_change(bool was_primary, bool now_primary) override; + + void reschedule_scrub() override; + + void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) override; + + uint64_t get_snap_trimq_size() const override { + return snap_trimq.size(); + } + unsigned get_target_pg_log_entries() const override; + + void clear_publish_stats() override; + void clear_primary_state() override; + + epoch_t oldest_stored_osdmap() override; + OstreamTemp get_clog_error() override; + OstreamTemp get_clog_info() override; + OstreamTemp get_clog_debug() override; + + void schedule_event_after( + PGPeeringEventRef event, + float delay) override; + void request_local_background_io_reservation( + unsigned priority, + PGPeeringEventURef on_grant, + PGPeeringEventURef on_preempt) override; + void update_local_background_io_priority( + unsigned priority) override; + void cancel_local_background_io_reservation() override; + + void request_remote_recovery_reservation( + unsigned priority, + PGPeeringEventURef on_grant, + PGPeeringEventURef on_preempt) override; + void cancel_remote_recovery_reservation() override; + + void schedule_event_on_commit( + ObjectStore::Transaction &t, + PGPeeringEventRef on_commit) override; + + void on_active_exit() override; + + Context *on_clean() override { + if (is_active()) { + kick_snap_trim(); } - } missing_loc; - - PastIntervals past_intervals; + requeue_ops(waiting_for_clean_to_primary_repair); + return finish_recovery(); + } - interval_set snap_trimq; + void on_activate(interval_set snaps) override; - /* You should not use these items without taking their respective queue locks - * (if they have one) */ - xlist::item stat_queue_item; - bool scrub_queued; - bool recovery_queued; + void on_activate_committed() override; - int recovery_ops_active; - set waiting_on_backfill; -#ifdef DEBUG_RECOVERY_OIDS - set recovering_oids; -#endif + void on_active_actmap() override; + void on_active_advmap(const OSDMapRef &osdmap) override; -protected: - int role; // 0 = primary, 1 = replica, -1=none. 
- unsigned state; // PG_STATE_* + void queue_snap_retrim(snapid_t snap); - bool send_notify; ///< true if we are non-primary and should notify the primary + void on_backfill_reserved() override; + void on_backfill_canceled() override; + void on_recovery_reserved() override; + + bool is_forced_recovery_or_backfill() const { + return recovery_state.is_forced_recovery_or_backfill(); + } + + PGLog::LogEntryHandlerRef get_log_handler( + ObjectStore::Transaction &t) override { + return std::make_unique(this, &t); + } + + std::pair do_delete_work(ObjectStore::Transaction &t, + ghobject_t _next) override; + + void clear_ready_to_merge() override; + void set_not_ready_to_merge_target(pg_t pgid, pg_t src) override; + void set_not_ready_to_merge_source(pg_t pgid) override; + void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) override; + void set_ready_to_merge_source(eversion_t lu) override; + + void send_pg_created(pg_t pgid) override; + + ceph::signedspan get_mnow() override; + HeartbeatStampsRef get_hb_stamps(int peer) override; + void schedule_renew_lease(epoch_t lpr, ceph::timespan delay) override; + void queue_check_readable(epoch_t lpr, ceph::timespan delay) override; + + void rebuild_missing_set_with_deletes(PGLog &pglog) override; + + void queue_peering_event(PGPeeringEventRef evt); + void do_peering_event(PGPeeringEventRef evt, PeeringCtx &rcx); + void queue_null(epoch_t msg_epoch, epoch_t query_epoch); + void queue_flushed(epoch_t started_at); + void handle_advance_map( + OSDMapRef osdmap, OSDMapRef lastmap, + std::vector& newup, int up_primary, + std::vector& newacting, int acting_primary, + PeeringCtx &rctx); + void handle_activate_map(PeeringCtx &rctx); + void handle_initialize(PeeringCtx &rxcx); + void handle_query_state(ceph::Formatter *f); + + /** + * @param ops_begun returns how many recovery ops the function started + * @returns true if any useful work was accomplished; false otherwise + */ + virtual bool start_recovery_ops( + uint64_t max, + ThreadPool::TPHandle &handle, + uint64_t *ops_begun) = 0; + + // more work after the above, but with a PeeringCtx + void find_unfound(epoch_t queued, PeeringCtx &rctx); + + virtual void get_watchers(std::list *ls) = 0; + + void dump_pgstate_history(ceph::Formatter *f); + void dump_missing(ceph::Formatter *f); + + void with_pg_stats(std::function&& f); + void with_heartbeat_peers(std::function&& f); + + void shutdown(); + virtual void on_shutdown() = 0; + + bool get_must_scrub() const; + Scrub::schedule_result_t sched_scrub(); + + unsigned int scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsigned int suggested_priority) const; + /// the version that refers to flags_.priority + unsigned int scrub_requeue_priority(Scrub::scrub_prio_t with_priority) const; +private: + // auxiliaries used by sched_scrub(): + double next_deepscrub_interval() const; + + /// should we perform deep scrub? + bool is_time_for_deep(bool allow_deep_scrub, + bool allow_scrub, + bool has_deep_errors, + const requested_scrub_t& planned) const; + + /** + * Verify the various 'next scrub' flags in m_planned_scrub against configuration + * and scrub-related timestamps. 
+ * + * @returns an updated copy of the m_planned_flags (or nothing if no scrubbing) + */ + std::optional verify_scrub_mode() const; + + bool verify_periodic_scrub_mode(bool allow_deep_scrub, + bool try_to_auto_repair, + bool allow_regular_scrub, + bool has_deep_errors, + requested_scrub_t& planned) const; + + using ScrubAPI = void (ScrubPgIF::*)(epoch_t epoch_queued); + void forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued, std::string_view desc); + // and for events that carry a meaningful 'activation token' + using ScrubSafeAPI = void (ScrubPgIF::*)(epoch_t epoch_queued, + Scrub::act_token_t act_token); + void forward_scrub_event(ScrubSafeAPI fn, + epoch_t epoch_queued, + Scrub::act_token_t act_token, + std::string_view desc); public: - eversion_t last_update_ondisk; // last_update that has committed; ONLY DEFINED WHEN is_active() - eversion_t last_complete_ondisk; // last_complete that has committed. - eversion_t last_update_applied; + virtual void do_request( + OpRequestRef& op, + ThreadPool::TPHandle &handle + ) = 0; + virtual void clear_cache() = 0; + virtual int get_cache_obj_count() = 0; + virtual void snap_trimmer(epoch_t epoch_queued) = 0; + virtual void do_command( + const std::string_view& prefix, + const cmdmap_t& cmdmap, + const ceph::buffer::list& idata, + std::function on_finish) = 0; - struct C_UpdateLastRollbackInfoTrimmedToApplied : Context { + virtual bool agent_work(int max) = 0; + virtual bool agent_work(int max, int agent_flush_quota) = 0; + virtual void agent_stop() = 0; + virtual void agent_delay() = 0; + virtual void agent_clear() = 0; + virtual void agent_choose_mode_restart() = 0; + + struct C_DeleteMore : public Context { PGRef pg; - epoch_t e; - eversion_t v; - C_UpdateLastRollbackInfoTrimmedToApplied(PG *pg, epoch_t e, eversion_t v) - : pg(pg), e(e), v(v) {} - void finish(int) override { - pg->lock(); - if (!pg->pg_has_reset_since(e)) { - pg->last_rollback_info_trimmed_to_applied = v; - } - pg->unlock(); + epoch_t epoch; + C_DeleteMore(PG *p, epoch_t e) : pg(p), epoch(e) {} + void finish(int r) override { + ceph_abort(); } - }; - // entries <= last_rollback_info_trimmed_to_applied have been trimmed, - // and the transaction has applied - eversion_t last_rollback_info_trimmed_to_applied; - - // primary state - public: - pg_shard_t primary; - pg_shard_t pg_whoami; - pg_shard_t up_primary; - vector up, acting, want_acting; - set actingbackfill, actingset, upset; - map peer_last_complete_ondisk; - eversion_t min_last_complete_ondisk; // up: min over last_complete_ondisk, peer_last_complete_ondisk - eversion_t pg_trim_to; - - set blocked_by; ///< osds we are blocked by (for pg stats) - - // [primary only] content recovery state - -public: - struct BufferedRecoveryMessages { - map > query_map; - map > > info_map; - map > > notify_list; + void complete(int r) override; }; - struct RecoveryCtx { - utime_t start_time; - map > *query_map; - map > > *info_map; - map > > *notify_list; - set created_pgs; - C_Contexts *on_applied; - C_Contexts *on_safe; - ObjectStore::Transaction *transaction; - ThreadPool::TPHandle* handle; - RecoveryCtx(map > *query_map, - map > > *info_map, - map > > *notify_list, - C_Contexts *on_applied, - C_Contexts *on_safe, - ObjectStore::Transaction *transaction) - : query_map(query_map), info_map(info_map), - notify_list(notify_list), - on_applied(on_applied), - on_safe(on_safe), - transaction(transaction), - handle(NULL) {} - - RecoveryCtx(BufferedRecoveryMessages &buf, RecoveryCtx &rctx) - : query_map(&(buf.query_map)), - 
info_map(&(buf.info_map)),
-        notify_list(&(buf.notify_list)),
-        on_applied(rctx.on_applied),
-        on_safe(rctx.on_safe),
-        transaction(rctx.transaction),
-        handle(rctx.handle) {}
-
-    void accept_buffered_messages(BufferedRecoveryMessages &m) {
-      assert(query_map);
-      assert(info_map);
-      assert(notify_list);
-      for (map<int, map<spg_t, pg_query_t> >::iterator i = m.query_map.begin();
-           i != m.query_map.end();
-           ++i) {
-        map<spg_t, pg_query_t> &omap = (*query_map)[i->first];
-        for (map<spg_t, pg_query_t>::iterator j = i->second.begin();
-             j != i->second.end();
-             ++j) {
-          omap[j->first] = j->second;
-        }
-      }
-      for (map<int, vector<pair<pg_notify_t, PastIntervals> > >::iterator i
-             = m.info_map.begin();
-           i != m.info_map.end();
-           ++i) {
-        vector<pair<pg_notify_t, PastIntervals> > &ovec =
-          (*info_map)[i->first];
-        ovec.reserve(ovec.size() + i->second.size());
-        ovec.insert(ovec.end(), i->second.begin(), i->second.end());
-      }
-      for (map<int, vector<pair<pg_notify_t, PastIntervals> > >::iterator i
-             = m.notify_list.begin();
-           i != m.notify_list.end();
-           ++i) {
-        vector<pair<pg_notify_t, PastIntervals> > &ovec =
-          (*notify_list)[i->first];
-        ovec.reserve(ovec.size() + i->second.size());
-        ovec.insert(ovec.end(), i->second.begin(), i->second.end());
-      }
-    }
-  };
+  virtual void set_dynamic_perf_stats_queries(
+    const std::list<OSDPerfMetricQuery> &queries) {
+  }
+  virtual void get_dynamic_perf_stats(DynamicPerfStats *stats) {
+  }
+
+  uint64_t get_min_alloc_size() const;
+
+  // reference counting
+#ifdef PG_DEBUG_REFS
+  uint64_t get_with_id();
+  void put_with_id(uint64_t);
+  void dump_live_ids();
+#endif
+  void get(const char* tag);
+  void put(const char* tag);
+  int get_num_ref() {
+    return ref;
+  }
 
-  PGStateHistory pgstate_history;
+  // ctor
+  PG(OSDService *o, OSDMapRef curmap,
+     const PGPool &pool, spg_t p);
+  ~PG() override;
 
-  struct NamedState {
-    const char *state_name;
-    utime_t enter_time;
-    PG* pg;
-    const char *get_state_name() { return state_name; }
-    NamedState(PG *pg_, const char *state_name_)
-      : state_name(state_name_), enter_time(ceph_clock_now()), pg(pg_) {
-      pg->pgstate_history.enter(pg, enter_time, state_name);
-    }
-    virtual ~NamedState() { pg->pgstate_history.exit(state_name); }
-  };
+  // prevent copying
+  explicit PG(const PG& rhs) = delete;
+  PG& operator=(const PG& rhs) = delete;
 
+protected:
+  // -------------
+  // protected
   OSDService *osd;
+public:
+  OSDShard *osd_shard = nullptr;
+  OSDShardPGSlot *pg_slot = nullptr;
+protected:
   CephContext *cct;
+
+  // locking and reference counting.
+  // I destroy myself when the reference count hits zero.
+  // lock() should be called before doing anything.
+  // get() should be called on pointer copy (to another thread, etc.).
+  // put() should be called on destruction of some previously copied pointer.
+  // unlock() when done with the current pointer (_most common_).
+  mutable ceph::mutex _lock = ceph::make_mutex("PG::_lock");
+#ifndef CEPH_DEBUG_MUTEX
+  mutable std::thread::id locked_by;
+#endif
+  std::atomic<unsigned int> ref{0};
+
+#ifdef PG_DEBUG_REFS
+  ceph::mutex _ref_id_lock = ceph::make_mutex("PG::_ref_id_lock");
+  std::map<uint64_t, std::string> _live_ids;
+  std::map<std::string, uint64_t> _tag_counts;
+  uint64_t _ref_id = 0;
+
+  friend uint64_t get_with_id(PG *pg) { return pg->get_with_id(); }
+  friend void put_with_id(PG *pg, uint64_t id) { return pg->put_with_id(id); }
+#endif
+
+private:
+  friend void intrusive_ptr_add_ref(PG *pg) {
+    pg->get("intptr");
+  }
+  friend void intrusive_ptr_release(PG *pg) {
+    pg->put("intptr");
+  }
+
+  // =====================
 protected:
+  OSDriver osdriver;
+  SnapMapper snap_mapper;
 
-  /*
-   * peer_info    -- projected (updates _before_ replicas ack)
-   * peer_missing -- committed (updates _after_ replicas ack)
-   */
-
-  bool        need_up_thru;
-  set<pg_shard_t>    stray_set;   // non-acting osds that have PG data.
- eversion_t oldest_update; // acting: lowest (valid) last_update in active set - map peer_info; // info from peers (stray or prior) - set peer_purged; // peers purged - map peer_missing; - set peer_log_requested; // logs i've requested (and start stamps) - set peer_missing_requested; + virtual PGBackend *get_pgbackend() = 0; + virtual const PGBackend* get_pgbackend() const = 0; - // i deleted these strays; ignore racing PGInfo from them - set peer_activated; +protected: + void requeue_map_waiters(); - // primary-only, recovery-only state - set might_have_unfound; // These osds might have objects on them - // which are unfound on the primary - epoch_t last_peering_reset; +protected: + ZTracer::Endpoint trace_endpoint; - /* heartbeat peers */ - void set_probe_targets(const set &probe_set); - void clear_probe_targets(); -public: - Mutex heartbeat_peer_lock; - set heartbeat_peers; - set probe_targets; - /** - * BackfillInterval - * - * Represents the objects in a range [begin, end) - * - * Possible states: - * 1) begin == end == hobject_t() indicates the the interval is unpopulated - * 2) Else, objects contains all objects in [begin, end) - */ - struct BackfillInterval { - // info about a backfill interval on a peer - eversion_t version; /// version at which the scan occurred - map objects; - hobject_t begin; - hobject_t end; - - /// clear content - void clear() { - *this = BackfillInterval(); - } +protected: + __u8 info_struct_v = 0; + void upgrade(ObjectStore *store); - /// clear objects list only - void clear_objects() { - objects.clear(); - } +protected: + ghobject_t pgmeta_oid; - /// reinstantiate with a new start+end position and sort order - void reset(hobject_t start) { - clear(); - begin = end = start; - } + // ------------------ + interval_set snap_trimq; + std::set snap_trimq_repeat; - /// true if there are no objects in this interval - bool empty() const { - return objects.empty(); - } + /* You should not use these items without taking their respective queue locks + * (if they have one) */ + xlist::item stat_queue_item; + bool recovery_queued; - /// true if interval extends to the end of the range - bool extends_to_end() const { - return end.is_max(); - } + int recovery_ops_active; + std::set waiting_on_backfill; +#ifdef DEBUG_RECOVERY_OIDS + multiset recovering_oids; +#endif - /// removes items <= soid and adjusts begin to the first object - void trim_to(const hobject_t &soid) { - trim(); - while (!objects.empty() && - objects.begin()->first <= soid) { - pop_front(); - } - } +public: + bool dne() { return info.dne(); } - /// Adjusts begin to the first object - void trim() { - if (!objects.empty()) - begin = objects.begin()->first; - else - begin = end; - } + void send_cluster_message( + int osd, MessageRef m, epoch_t epoch, bool share_map_update) override; + +protected: + epoch_t get_last_peering_reset() const { + return recovery_state.get_last_peering_reset(); + } - /// drop first entry, and adjust @begin accordingly - void pop_front() { - assert(!objects.empty()); - objects.erase(objects.begin()); - trim(); - } + /* heartbeat peers */ + void set_probe_targets(const std::set &probe_set) override; + void clear_probe_targets() override; - /// dump - void dump(Formatter *f) const { - f->dump_stream("begin") << begin; - f->dump_stream("end") << end; - f->open_array_section("objects"); - for (map::const_iterator i = - objects.begin(); - i != objects.end(); - ++i) { - f->open_object_section("object"); - f->dump_stream("object") << i->first; - f->dump_stream("version") << i->second; - 
f->close_section(); - } - f->close_section(); - } - }; + ceph::mutex heartbeat_peer_lock = + ceph::make_mutex("PG::heartbeat_peer_lock"); + std::set heartbeat_peers; + std::set probe_targets; protected: BackfillInterval backfill_info; - map peer_backfill_info; - bool backfill_reserved; + std::map peer_backfill_info; bool backfill_reserving; - friend class OSD; + // The primary's num_bytes and local num_bytes for this pg, only valid + // during backfill for non-primary shards. + // Both of these are adjusted for EC to reflect the on-disk bytes + std::atomic primary_num_bytes = 0; + std::atomic local_num_bytes = 0; public: - set backfill_targets; + // Space reserved for backfill is primary_num_bytes - local_num_bytes + // Don't care that difference itself isn't atomic + uint64_t get_reserved_num_bytes() { + int64_t primary = primary_num_bytes.load(); + int64_t local = local_num_bytes.load(); + if (primary > local) + return primary - local; + else + return 0; + } + + bool is_remote_backfilling() { + return primary_num_bytes.load() > 0; + } + + bool try_reserve_recovery_space(int64_t primary, int64_t local) override; + void unreserve_recovery_space() override; + + // If num_bytes are inconsistent and local_num- goes negative + // it's ok, because it would then be ignored. + + // The value of num_bytes could be negative, + // but we don't let local_num_bytes go negative. + void add_local_num_bytes(int64_t num_bytes) { + if (num_bytes) { + int64_t prev_bytes = local_num_bytes.load(); + int64_t new_bytes; + do { + new_bytes = prev_bytes + num_bytes; + if (new_bytes < 0) + new_bytes = 0; + } while(!local_num_bytes.compare_exchange_weak(prev_bytes, new_bytes)); + } + } + void sub_local_num_bytes(int64_t num_bytes) { + ceph_assert(num_bytes >= 0); + if (num_bytes) { + int64_t prev_bytes = local_num_bytes.load(); + int64_t new_bytes; + do { + new_bytes = prev_bytes - num_bytes; + if (new_bytes < 0) + new_bytes = 0; + } while(!local_num_bytes.compare_exchange_weak(prev_bytes, new_bytes)); + } + } + // The value of num_bytes could be negative, + // but we don't let info.stats.stats.sum.num_bytes go negative. 
+ void add_num_bytes(int64_t num_bytes) { + ceph_assert(ceph_mutex_is_locked_by_me(_lock)); + if (num_bytes) { + recovery_state.update_stats( + [num_bytes](auto &history, auto &stats) { + stats.stats.sum.num_bytes += num_bytes; + if (stats.stats.sum.num_bytes < 0) { + stats.stats.sum.num_bytes = 0; + } + return false; + }); + } + } + void sub_num_bytes(int64_t num_bytes) { + ceph_assert(ceph_mutex_is_locked_by_me(_lock)); + ceph_assert(num_bytes >= 0); + if (num_bytes) { + recovery_state.update_stats( + [num_bytes](auto &history, auto &stats) { + stats.stats.sum.num_bytes -= num_bytes; + if (stats.stats.sum.num_bytes < 0) { + stats.stats.sum.num_bytes = 0; + } + return false; + }); + } + } - bool is_backfill_targets(pg_shard_t osd) { - return backfill_targets.count(osd); + // Only used in testing so not worried about needing the PG lock here + int64_t get_stats_num_bytes() { + std::lock_guard l{_lock}; + int num_bytes = info.stats.stats.sum.num_bytes; + if (pool.info.is_erasure()) { + num_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count(); + // Round up each object by a stripe + num_bytes += get_pgbackend()->get_ec_stripe_chunk_size() * info.stats.stats.sum.num_objects; + } + int64_t lnb = local_num_bytes.load(); + if (lnb && lnb != num_bytes) { + lgeneric_dout(cct, 0) << this << " " << info.pgid << " num_bytes mismatch " + << lnb << " vs stats " + << info.stats.stats.sum.num_bytes << " / chunk " + << get_pgbackend()->get_ec_data_chunk_count() + << dendl; + } + return num_bytes; } protected: @@ -866,14 +991,17 @@ protected: * - waiting_for_map * - may start or stop blocking at any time (depending on client epoch) * - waiting_for_peered - * - !is_peered() or flushes_in_progress + * - !is_peered() * - only starts blocking on interval change; never restarts + * - waiting_for_flush + * - flushes_in_progress + * - waiting for final flush during activate * - waiting_for_active * - !is_active() * - only starts blocking on interval change; never restarts - * - waiting_for_flush - * - is_active() and flushes_in_progress - * - waiting for final flush during activate + * - waiting_for_readable + * - now > readable_until + * - unblocks when we get fresh(er) osd_pings * - waiting_for_scrub * - starts and stops blocking for varying intervals during scrub * - waiting_for_unreadable_object @@ -894,7 +1022,7 @@ protected: * queues because we assume they cannot apply at that time (this is * probably mostly true). * - * 3. The requeue_ops helper will push ops onto the waiting_for_map list if + * 3. The requeue_ops helper will push ops onto the waiting_for_map std::list if * it is non-empty. * * These three behaviors are generally sufficient to maintain ordering, with @@ -903,141 +1031,63 @@ protected: * encounter an unexpected error. FIXME. */ - // pg waiters - unsigned flushes_in_progress; - // ops with newer maps than our (or blocked behind them) // track these by client, since inter-request ordering doesn't otherwise // matter. 

-  unordered_map<entity_name_t,list<OpRequestRef>> waiting_for_map;
+  std::unordered_map<entity_name_t,std::list<OpRequestRef>> waiting_for_map;
 
   // ops waiting on peered
-  list<OpRequestRef>            waiting_for_peered;
+  std::list<OpRequestRef>            waiting_for_peered;
+
+  /// ops waiting on readable
+  std::list<OpRequestRef>            waiting_for_readable;
 
   // ops waiting on active (require peered as well)
-  list<OpRequestRef>            waiting_for_active;
-  list<OpRequestRef>            waiting_for_flush;
-  list<OpRequestRef>            waiting_for_scrub;
+  std::list<OpRequestRef>            waiting_for_active;
+  std::list<OpRequestRef>            waiting_for_flush;
+  std::list<OpRequestRef>            waiting_for_scrub;
 
-  list<OpRequestRef>            waiting_for_cache_not_full;
-  list<OpRequestRef>            waiting_for_clean_to_primary_repair;
-  map<hobject_t, list<OpRequestRef>> waiting_for_unreadable_object,
+  std::list<OpRequestRef>            waiting_for_cache_not_full;
+  std::list<OpRequestRef>            waiting_for_clean_to_primary_repair;
+  std::map<hobject_t, std::list<OpRequestRef>> waiting_for_unreadable_object,
                              waiting_for_degraded_object,
                              waiting_for_blocked_object;
 
-  set<hobject_t> objects_blocked_on_cache_full;
-  map<hobject_t,snapid_t> objects_blocked_on_degraded_snap;
-  map<hobject_t,ObjectContextRef> objects_blocked_on_snap_promotion;
+  std::set<hobject_t> objects_blocked_on_cache_full;
+  std::map<hobject_t,snapid_t> objects_blocked_on_degraded_snap;
+  std::map<hobject_t,ObjectContextRef> objects_blocked_on_snap_promotion;
 
   // Callbacks should assume pg (and nothing else) is locked
-  map<hobject_t, list<Context*>> callbacks_for_degraded_object;
+  std::map<hobject_t, std::list<Context*>> callbacks_for_degraded_object;
 
-  map<eversion_t,
-      list<pair<OpRequestRef, version_t> > > waiting_for_ondisk;
+  std::map<eversion_t,
+           std::list<
+             std::tuple<OpRequestRef, version_t, int> > > waiting_for_ondisk;
 
-  void requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m);
+  void requeue_object_waiters(std::map<hobject_t, std::list<OpRequestRef>>& m);
   void requeue_op(OpRequestRef op);
-  void requeue_ops(list<OpRequestRef> &l);
+  void requeue_ops(std::list<OpRequestRef> &l);
 
   // stats that persist lazily
   object_stat_collection_t unstable_stats;
 
   // publish stats
-  Mutex pg_stats_publish_lock;
-  bool pg_stats_publish_valid;
-  pg_stat_t pg_stats_publish;
-
-  // for ordering writes
-  ceph::shared_ptr<ObjectStore::Sequencer> osr;
-
-  void _update_calc_stats();
-  void _update_blocked_by();
-  void publish_stats_to_osd();
-  void clear_publish_stats();
+  ceph::mutex pg_stats_publish_lock =
+    ceph::make_mutex("PG::pg_stats_publish_lock");
+  std::optional<pg_stat_t> pg_stats_publish;
 
-public:
-  void clear_primary_state();
+  friend class TestOpsSocketHook;
+  void publish_stats_to_osd() override;
 
-  bool is_actingbackfill(pg_shard_t osd) const {
-    return actingbackfill.count(osd);
-  }
-  bool is_acting(pg_shard_t osd) const {
-    return has_shard(pool.info.ec_pool(), acting, osd);
-  }
-  bool is_up(pg_shard_t osd) const {
-    return has_shard(pool.info.ec_pool(), up, osd);
-  }
-  static bool has_shard(bool ec, const vector<int>& v, pg_shard_t osd) {
-    if (ec) {
-      return v.size() > (unsigned)osd.shard && v[osd.shard] == osd.osd;
-    } else {
-      return std::find(v.begin(), v.end(), osd.osd) != v.end();
-    }
+  bool needs_recovery() const {
+    return recovery_state.needs_recovery();
   }
-
-  bool needs_recovery() const;
-  bool needs_backfill() const;
-
-  /// clip calculated priority to reasonable range
-  inline int clamp_recovery_priority(int priority);
-  /// get log recovery reservation priority
-  unsigned get_recovery_priority();
-  /// get backfill reservation priority
-  unsigned get_backfill_priority();
-
-  void mark_clean();  ///< mark an active pg clean
-  void _change_recovery_force_mode(int new_mode, bool clear);
-
-  /// return [start,end) bounds for required past_intervals
-  static pair<epoch_t, epoch_t> get_required_past_interval_bounds(
-    const pg_info_t &info,
-    epoch_t oldest_map) {
-    epoch_t start = MAX(
-      info.history.last_epoch_clean ? 
info.history.last_epoch_clean : - info.history.epoch_pool_created, - oldest_map); - epoch_t end = MAX( - info.history.same_interval_since, - info.history.epoch_pool_created); - return make_pair(start, end); + bool needs_backfill() const { + return recovery_state.needs_backfill(); } - void check_past_interval_bounds() const; - PastIntervals::PriorSet build_prior(); - - void remove_down_peer_info(const OSDMapRef osdmap); - - bool adjust_need_up_thru(const OSDMapRef osdmap); bool all_unfound_are_queried_or_lost(const OSDMapRef osdmap) const; - virtual void dump_recovery_info(Formatter *f) const = 0; - - bool calc_min_last_complete_ondisk() { - eversion_t min = last_complete_ondisk; - assert(!actingbackfill.empty()); - for (set::iterator i = actingbackfill.begin(); - i != actingbackfill.end(); - ++i) { - if (*i == get_primary()) continue; - if (peer_last_complete_ondisk.count(*i) == 0) - return false; // we don't have complete info - eversion_t a = peer_last_complete_ondisk[*i]; - if (a < min) - min = a; - } - if (min == min_last_complete_ondisk) - return false; - min_last_complete_ondisk = min; - return true; - } - - virtual void calc_trim_to() = 0; - - void proc_replica_log(pg_info_t &oinfo, const pg_log_t &olog, - pg_missing_t& omissing, pg_shard_t from); - void proc_master_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog, - pg_missing_t& omissing, pg_shard_t from); - bool proc_replica_info( - pg_shard_t from, const pg_info_t &info, epoch_t send_epoch); struct PGLogEntryHandler : public PGLog::LogEntryHandler { PG *pg; @@ -1052,7 +1102,7 @@ public: pg->get_pgbackend()->try_stash(hoid, v, t); } void rollback(const pg_log_entry_t &entry) override { - assert(entry.can_rollback()); + ceph_assert(entry.can_rollback()); pg->get_pgbackend()->rollback(entry, t); } void rollforward(const pg_log_entry_t &entry) override { @@ -1062,320 +1112,90 @@ public: pg->get_pgbackend()->trim(entry, t); } }; - + void update_object_snap_mapping( ObjectStore::Transaction *t, const hobject_t &soid, - const set &snaps); + const std::set &snaps); void clear_object_snap_mapping( ObjectStore::Transaction *t, const hobject_t &soid); void remove_snap_mapped_object( ObjectStore::Transaction& t, const hobject_t& soid); - void merge_log( - ObjectStore::Transaction& t, pg_info_t &oinfo, - pg_log_t &olog, pg_shard_t from); - void rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead); - bool search_for_missing( - const pg_info_t &oinfo, const pg_missing_t &omissing, - pg_shard_t fromosd, - RecoveryCtx*); - - void check_for_lost_objects(); - void forget_lost_objects(); - - void discover_all_missing(std::map > &query_map); - - void trim_write_ahead(); - - map::const_iterator find_best_info( - const map &infos, - bool restrict_to_up_acting, - bool *history_les_bound) const; - static void calc_ec_acting( - map::const_iterator auth_log_shard, - unsigned size, - const vector &acting, - pg_shard_t acting_primary, - const vector &up, - pg_shard_t up_primary, - const map &all_info, - bool restrict_to_up_acting, - vector *want, - set *backfill, - set *acting_backfill, - pg_shard_t *want_primary, - ostream &ss); - static void calc_replicated_acting( - map::const_iterator auth_log_shard, - unsigned size, - const vector &acting, - pg_shard_t acting_primary, - const vector &up, - pg_shard_t up_primary, - const map &all_info, - bool restrict_to_up_acting, - vector *want, - set *backfill, - set *acting_backfill, - pg_shard_t *want_primary, - ostream &ss); - bool choose_acting(pg_shard_t &auth_log_shard, - bool 
restrict_to_up_acting, - bool *history_les_bound); - void build_might_have_unfound(); - void activate( - ObjectStore::Transaction& t, - epoch_t activation_epoch, - list& tfin, - map >& query_map, - map > > *activator_map, - RecoveryCtx *ctx); - void _activate_committed(epoch_t epoch, epoch_t activation_epoch); - void all_activated_and_committed(); - - void proc_primary_info(ObjectStore::Transaction &t, const pg_info_t &info); bool have_unfound() const { - return missing_loc.have_unfound(); + return recovery_state.have_unfound(); } uint64_t get_num_unfound() const { - return missing_loc.num_unfound(); + return recovery_state.get_num_unfound(); } virtual void check_local() = 0; - /** - * @param ops_begun returns how many recovery ops the function started - * @returns true if any useful work was accomplished; false otherwise - */ - virtual bool start_recovery_ops( - uint64_t max, - ThreadPool::TPHandle &handle, - uint64_t *ops_begun) = 0; - void purge_strays(); - void update_heartbeat_peers(); + void update_heartbeat_peers(std::set peers) override; Context *finish_sync_event; - void finish_recovery(list& tfin); + Context *finish_recovery(); void _finish_recovery(Context *c); + struct C_PG_FinishRecovery : public Context { + PGRef pg; + explicit C_PG_FinishRecovery(PG *p) : pg(p) {} + void finish(int r) override { + pg->_finish_recovery(this); + } + }; void cancel_recovery(); void clear_recovery_state(); virtual void _clear_recovery_state() = 0; - virtual void check_recovery_sources(const OSDMapRef& newmap) = 0; void start_recovery_op(const hobject_t& soid); void finish_recovery_op(const hobject_t& soid, bool dequeue=false); - void split_into(pg_t child_pgid, PG *child, unsigned split_bits); virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits) = 0; friend class C_OSD_RepModify_Commit; + friend struct C_DeleteMore; // -- backoff -- - Mutex backoff_lock; // orders inside Backoff::lock - map> backoffs; + ceph::mutex backoff_lock = // orders inside Backoff::lock + ceph::make_mutex("PG::backoff_lock"); + std::map>> backoffs; - void add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end); + void add_backoff(const ceph::ref_t& s, const hobject_t& begin, const hobject_t& end); void release_backoffs(const hobject_t& begin, const hobject_t& end); void release_backoffs(const hobject_t& o) { release_backoffs(o, o); } void clear_backoffs(); - void add_pg_backoff(SessionRef s) { + void add_pg_backoff(const ceph::ref_t& s) { hobject_t begin = info.pgid.pgid.get_hobj_start(); hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num()); add_backoff(s, begin, end); } +public: void release_pg_backoffs() { hobject_t begin = info.pgid.pgid.get_hobj_start(); hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num()); release_backoffs(begin, end); } - void rm_backoff(BackoffRef b); - // -- scrub -- - struct Scrubber { - Scrubber(); - ~Scrubber(); - - // metadata - set reserved_peers; - bool reserved, reserve_failed; - epoch_t epoch_start; - - // common to both scrubs - bool active; - int waiting_on; - set waiting_on_whom; - int shallow_errors; - int deep_errors; - int fixed; - ScrubMap primary_scrubmap; - map received_maps; - OpRequestRef active_rep_scrub; - utime_t scrub_reg_stamp; // stamp we registered for - - // For async sleep - bool sleeping = false; - bool needs_sleep = true; - utime_t sleep_start; - - // flags to indicate explicitly requested scrubs (by admin) - bool must_scrub, must_deep_scrub, must_repair; - - // Priority to use for scrub scheduling - 
unsigned priority; - - // this flag indicates whether we would like to do auto-repair of the PG or not - bool auto_repair; - - // Maps from objects with errors to missing/inconsistent peers - map> missing; - map> inconsistent; - - // Map from object with errors to good peers - map >> authoritative; - - // Cleaned map pending snap metadata scrub - ScrubMap cleaned_meta_map; - - // digest updates which we are waiting on - int num_digest_updates_pending; - - // chunky scrub - hobject_t start, end; - eversion_t subset_last_update; - - // chunky scrub state - enum State { - INACTIVE, - NEW_CHUNK, - WAIT_PUSHES, - WAIT_LAST_UPDATE, - BUILD_MAP, - WAIT_REPLICAS, - COMPARE_MAPS, - WAIT_DIGEST_UPDATES, - FINISH, - } state; - - std::unique_ptr store; - // deep scrub - bool deep; - uint32_t seed; - - list callbacks; - void add_callback(Context *context) { - callbacks.push_back(context); - } - void run_callbacks() { - list to_run; - to_run.swap(callbacks); - for (list::iterator i = to_run.begin(); - i != to_run.end(); - ++i) { - (*i)->complete(0); - } - } - - static const char *state_string(const PG::Scrubber::State& state) { - const char *ret = NULL; - switch( state ) - { - case INACTIVE: ret = "INACTIVE"; break; - case NEW_CHUNK: ret = "NEW_CHUNK"; break; - case WAIT_PUSHES: ret = "WAIT_PUSHES"; break; - case WAIT_LAST_UPDATE: ret = "WAIT_LAST_UPDATE"; break; - case BUILD_MAP: ret = "BUILD_MAP"; break; - case WAIT_REPLICAS: ret = "WAIT_REPLICAS"; break; - case COMPARE_MAPS: ret = "COMPARE_MAPS"; break; - case WAIT_DIGEST_UPDATES: ret = "WAIT_DIGEST_UPDATES"; break; - case FINISH: ret = "FINISH"; break; - } - return ret; - } - - bool is_chunky_scrub_active() const { return state != INACTIVE; } - - // classic (non chunk) scrubs block all writes - // chunky scrubs only block writes to a range - bool write_blocked_by_scrub(const hobject_t &soid) { - return (soid >= start && soid < end); - } - - // clear all state - void reset() { - active = false; - waiting_on = 0; - waiting_on_whom.clear(); - if (active_rep_scrub) { - active_rep_scrub = OpRequestRef(); - } - received_maps.clear(); - - must_scrub = false; - must_deep_scrub = false; - must_repair = false; - auto_repair = false; - - state = PG::Scrubber::INACTIVE; - start = hobject_t(); - end = hobject_t(); - subset_last_update = eversion_t(); - shallow_errors = 0; - deep_errors = 0; - fixed = 0; - deep = false; - seed = 0; - run_callbacks(); - inconsistent.clear(); - missing.clear(); - authoritative.clear(); - num_digest_updates_pending = 0; - cleaned_meta_map = ScrubMap(); - sleeping = false; - needs_sleep = true; - sleep_start = utime_t(); - } - - void create_results(const hobject_t& obj); - void cleanup_store(ObjectStore::Transaction *t); - } scrubber; - +protected: bool scrub_after_recovery; int active_pushes; void repair_object( - const hobject_t& soid, list > *ok_peers, - pg_shard_t bad_peer); + const hobject_t &soid, + const std::list > &ok_peers, + const std::set &bad_peers); + + [[nodiscard]] bool ops_blocked_by_scrub() const; + [[nodiscard]] Scrub::scrub_prio_t is_scrub_blocking_ops() const; - void scrub(epoch_t queued, ThreadPool::TPHandle &handle); - void chunky_scrub(ThreadPool::TPHandle &handle); - void scrub_compare_maps(); - /** - * return true if any inconsistency/missing is repaired, false otherwise - */ - bool scrub_process_inconsistent(); - bool ops_blocked_by_scrub() const; - void scrub_finish(); - void scrub_clear_state(); - void _scan_snaps(ScrubMap &map); void _repair_oinfo_oid(ScrubMap &map); - void _scan_rollback_obs( - const 
-  void scrub(epoch_t queued, ThreadPool::TPHandle &handle);
-  void chunky_scrub(ThreadPool::TPHandle &handle);
-  void scrub_compare_maps();
-  /**
-   * return true if any inconsistency/missing is repaired, false otherwise
-   */
-  bool scrub_process_inconsistent();
-  bool ops_blocked_by_scrub() const;
-  void scrub_finish();
-  void scrub_clear_state();
-  void _scan_snaps(ScrubMap &map);
   void _repair_oinfo_oid(ScrubMap &map);
-  void _scan_rollback_obs(
-    const vector<ghobject_t> &rollback_obs,
-    ThreadPool::TPHandle &handle);
-  void _request_scrub_map(pg_shard_t replica, eversion_t version,
-                          hobject_t start, hobject_t end, bool deep,
-                          uint32_t seed);
-  int build_scrub_map_chunk(
-    ScrubMap &map,
-    hobject_t start, hobject_t end, bool deep, uint32_t seed,
-    ThreadPool::TPHandle &handle);
+  void _scan_rollback_obs(const std::vector<ghobject_t> &rollback_obs);
   /**
    * returns true if [begin, end) is good to scrub at this time
    * a false return value obliges the implementer to requeue scrub when the
@@ -1383,1197 +1203,137 @@ public:
    */
   virtual bool _range_available_for_scrub(
     const hobject_t &begin, const hobject_t &end) = 0;
-  virtual void scrub_snapshot_metadata(
-    ScrubMap &map,
-    const std::map<hobject_t, pair<uint32_t, uint32_t>> &missing_digest) { }
-  virtual void _scrub_clear_state() { }
-  virtual void _scrub_finish() { }
-  virtual void split_colls(
-    spg_t child,
-    int split_bits,
-    int seed,
-    const pg_pool_t *pool,
-    ObjectStore::Transaction *t) = 0;
-  void clear_scrub_reserved();
-  void scrub_reserve_replicas();
-  void scrub_unreserve_replicas();
-  bool scrub_all_replicas_reserved() const;
-  bool sched_scrub();
-  void reg_next_scrub();
-  void unreg_next_scrub();
-
-  void replica_scrub(
-    OpRequestRef op,
-    ThreadPool::TPHandle &handle);
-  void do_replica_scrub_map(OpRequestRef op);
-  void sub_op_scrub_map(OpRequestRef op);
-
-  void handle_scrub_reserve_request(OpRequestRef op);
-  void handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from);
-  void handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from);
-  void handle_scrub_reserve_release(OpRequestRef op);
-
-  void reject_reservation();
-  void schedule_backfill_retry(float retry);
-  void schedule_recovery_retry(float retry);
+
+  /**
+   * Initiate the process that will create our scrub map for the Primary.
+   * (triggered by MSG_OSD_REP_SCRUB)
+   */
+  void replica_scrub(OpRequestRef op, ThreadPool::TPHandle &handle);
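+  // NOTE (editor annotation, not part of the original patch): on the
+  // replica side this hands the requested chunk to the scrubber, which
+  // builds a local ScrubMap and sends it back to the primary for
+  // comparison; the TPHandle lets the potentially long map-building step
+  // keep the thread-pool heartbeat alive.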
 
   // -- recovery state --
 
-  template <class EVT>
   struct QueuePeeringEvt : Context {
     PGRef pg;
-    epoch_t epoch;
-    EVT evt;
+    PGPeeringEventRef evt;
+
+    template <class EVT>
     QueuePeeringEvt(PG *pg, epoch_t epoch, EVT evt) :
-      pg(pg), epoch(epoch), evt(evt) {}
+      pg(pg), evt(std::make_shared<PGPeeringEvent>(epoch, epoch, evt)) {}
+
+    QueuePeeringEvt(PG *pg, PGPeeringEventRef evt) :
+      pg(pg), evt(std::move(evt)) {}
+
     void finish(int r) override {
       pg->lock();
-      pg->queue_peering_event(PG::CephPeeringEvtRef(
-        new PG::CephPeeringEvt(
-          epoch,
-          epoch,
-          evt)));
+      pg->queue_peering_event(std::move(evt));
       pg->unlock();
     }
   };
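+  // NOTE (editor annotation; hedged usage sketch, not part of the original
+  // patch): QueuePeeringEvt adapts a peering event to the Context
+  // interface so it can be delivered from any callback. Assuming
+  // DeferRecovery is the statechart event type declared in PeeringState.h:
+  //
+  //   Context *c = new QueuePeeringEvt(pg, pg->get_osdmap_epoch(),
+  //                                    DeferRecovery(0.0f));
+  //   c->complete(0);  // locks the PG, queues the event, unlocks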
-  class CephPeeringEvt {
-    epoch_t epoch_sent;
-    epoch_t epoch_requested;
-    boost::intrusive_ptr< const boost::statechart::event_base > evt;
-    string desc;
-  public:
-    MEMPOOL_CLASS_HELPERS();
-    template <typename T>
-    CephPeeringEvt(epoch_t epoch_sent,
-                   epoch_t epoch_requested,
-                   const T &evt_) :
-      epoch_sent(epoch_sent), epoch_requested(epoch_requested),
-      evt(evt_.intrusive_from_this()) {
-      stringstream out;
-      out << "epoch_sent: " << epoch_sent
-          << " epoch_requested: " << epoch_requested << " ";
-      evt_.print(&out);
-      desc = out.str();
-    }
-    epoch_t get_epoch_sent() { return epoch_sent; }
-    epoch_t get_epoch_requested() { return epoch_requested; }
-    const boost::statechart::event_base &get_event() { return *evt; }
-    string get_desc() { return desc; }
-  };
-  typedef ceph::shared_ptr<CephPeeringEvt> CephPeeringEvtRef;
-  list<CephPeeringEvtRef> peering_queue;  // op queue
-  list<CephPeeringEvtRef> peering_waiters;
-
-  struct QueryState : boost::statechart::event< QueryState > {
-    Formatter *f;
-    explicit QueryState(Formatter *f) : f(f) {}
-    void print(std::ostream *out) const {
-      *out << "Query";
-    }
-  };
-
-  struct MInfoRec : boost::statechart::event< MInfoRec > {
-    pg_shard_t from;
-    pg_info_t info;
-    epoch_t msg_epoch;
-    MInfoRec(pg_shard_t from, const pg_info_t &info, epoch_t msg_epoch) :
-      from(from), info(info), msg_epoch(msg_epoch) {}
-    void print(std::ostream *out) const {
-      *out << "MInfoRec from " << from << " info: " << info;
-    }
-  };
-
-  struct MLogRec : boost::statechart::event< MLogRec > {
-    pg_shard_t from;
-    boost::intrusive_ptr<MOSDPGLog> msg;
-    MLogRec(pg_shard_t from, MOSDPGLog *msg) :
-      from(from), msg(msg) {}
-    void print(std::ostream *out) const {
-      *out << "MLogRec from " << from;
-    }
-  };
-
-  struct MNotifyRec : boost::statechart::event< MNotifyRec > {
-    pg_shard_t from;
-    pg_notify_t notify;
-    uint64_t features;
-    MNotifyRec(pg_shard_t from, const pg_notify_t &notify, uint64_t f) :
-      from(from), notify(notify), features(f) {}
-    void print(std::ostream *out) const {
-      *out << "MNotifyRec from " << from << " notify: " << notify
-           << " features: 0x" << hex << features << dec;
-    }
-  };
-
-  struct MQuery : boost::statechart::event< MQuery > {
-    pg_shard_t from;
-    pg_query_t query;
-    epoch_t query_epoch;
-    MQuery(pg_shard_t from, const pg_query_t &query, epoch_t query_epoch):
-      from(from), query(query), query_epoch(query_epoch) {}
-    void print(std::ostream *out) const {
-      *out << "MQuery from " << from
-           << " query_epoch " << query_epoch
-           << " query: " << query;
-    }
-  };
-
-  struct AdvMap : boost::statechart::event< AdvMap > {
-    OSDMapRef osdmap;
-    OSDMapRef lastmap;
-    vector<int> newup, newacting;
-    int up_primary, acting_primary;
-    AdvMap(
-      OSDMapRef osdmap, OSDMapRef lastmap,
-      vector<int>& newup, int up_primary,
-      vector<int>& newacting, int acting_primary):
-      osdmap(osdmap), lastmap(lastmap),
-      newup(newup),
-      newacting(newacting),
-      up_primary(up_primary),
-      acting_primary(acting_primary) {}
-    void print(std::ostream *out) const {
-      *out << "AdvMap";
-    }
-  };
-
-  struct ActMap : boost::statechart::event< ActMap > {
-    ActMap() : boost::statechart::event< ActMap >() {}
-    void print(std::ostream *out) const {
-      *out << "ActMap";
-    }
-  };
-  struct Activate : boost::statechart::event< Activate > {
-    epoch_t activation_epoch;
-    explicit Activate(epoch_t q) : boost::statechart::event< Activate >(),
-                                   activation_epoch(q) {}
-    void print(std::ostream *out) const {
-      *out << "Activate from " << activation_epoch;
-    }
-  };
-  struct RequestBackfillPrio : boost::statechart::event< RequestBackfillPrio > {
-    unsigned priority;
-    explicit RequestBackfillPrio(unsigned prio) :
-      boost::statechart::event< RequestBackfillPrio >(),
-      priority(prio) {}
-    void print(std::ostream *out) const {
-      *out << "RequestBackfillPrio: priority " << priority;
-    }
-  };
-#define TrivialEvent(T) struct T : boost::statechart::event< T > { \
-    T() : boost::statechart::event< T >() {}                       \
-    void print(std::ostream *out) const {                          \
-      *out << #T;                                                  \
-    }                                                              \
-  };
-  struct DeferBackfill : boost::statechart::event<DeferBackfill> {
-    float delay;
-    explicit DeferBackfill(float delay) : delay(delay) {}
-    void print(std::ostream *out) const {
-      *out << "DeferBackfill: delay " << delay;
-    }
-  };
-  struct DeferRecovery : boost::statechart::event<DeferRecovery> {
-    float delay;
-    explicit DeferRecovery(float delay) : delay(delay) {}
-    void print(std::ostream *out) const {
-      *out << "DeferRecovery: delay " << delay;
-    }
-  };
-  struct UnfoundBackfill : boost::statechart::event<UnfoundBackfill> {
-    explicit UnfoundBackfill() {}
-    void print(std::ostream *out) const {
-      *out << "UnfoundBackfill";
-    }
-  };
-  struct UnfoundRecovery : boost::statechart::event<UnfoundRecovery> {
-    explicit UnfoundRecovery() {}
-    void print(std::ostream *out) const {
-      *out << "UnfoundRecovery";
-    }
-  };
+public:
+  int pg_stat_adjust(osd_stat_t *new_stat);
 protected:
-  TrivialEvent(Initialize)
-  TrivialEvent(Load)
-  TrivialEvent(GotInfo)
-  TrivialEvent(NeedUpThru)
-  TrivialEvent(NullEvt)
-  TrivialEvent(FlushedEvt)
-  TrivialEvent(Backfilled)
-  TrivialEvent(LocalBackfillReserved)
-  TrivialEvent(RemoteBackfillReserved)
-  TrivialEvent(RejectRemoteReservation)
-  TrivialEvent(RemoteReservationRejected)
-  TrivialEvent(RemoteReservationCanceled)
-  TrivialEvent(RequestBackfill)
-  TrivialEvent(RequestRecovery)
-  TrivialEvent(RecoveryDone)
-  TrivialEvent(BackfillTooFull)
-  TrivialEvent(RecoveryTooFull)
-
-  TrivialEvent(MakePrimary)
-  TrivialEvent(MakeStray)
-  TrivialEvent(NeedActingChange)
-  TrivialEvent(IsIncomplete)
-  TrivialEvent(IsDown)
-
-  TrivialEvent(AllReplicasRecovered)
-  TrivialEvent(DoRecovery)
-  TrivialEvent(LocalRecoveryReserved)
-  TrivialEvent(RemoteRecoveryReserved)
-  TrivialEvent(AllRemotesReserved)
-  TrivialEvent(AllBackfillsReserved)
-  TrivialEvent(GoClean)
-
-  TrivialEvent(AllReplicasActivated)
-
-  TrivialEvent(IntervalFlush)
-
-  /* Encapsulates PG recovery process */
-  class RecoveryState {
-    void start_handle(RecoveryCtx *new_ctx);
-    void end_handle();
-  public:
-    void begin_block_outgoing();
-    void end_block_outgoing();
-    void clear_blocked_outgoing();
-  private:
-
-    /* States */
-    struct Initial;
-    class RecoveryMachine : public boost::statechart::state_machine< RecoveryMachine, Initial > {
-      RecoveryState *state;
-    public:
-      PG *pg;
-
-      utime_t event_time;
-      uint64_t event_count;
-
-      void clear_event_counters() {
-        event_time = utime_t();
-        event_count = 0;
-      }
-
-      void log_enter(const char *state_name);
-      void log_exit(const char *state_name, utime_t duration);
-
-      RecoveryMachine(RecoveryState *state, PG *pg) : state(state), pg(pg), event_count(0) {}
-
-      /* Accessor functions for state methods */
-      ObjectStore::Transaction* get_cur_transaction() {
-        assert(state->rctx);
-        assert(state->rctx->transaction);
-        return state->rctx->transaction;
-      }
-
-      void send_query(pg_shard_t to, const pg_query_t &query) {
-        assert(state->rctx);
-        assert(state->rctx->query_map);
-        (*state->rctx->query_map)[to.osd][spg_t(pg->info.pgid.pgid, to.shard)] =
-          query;
-      }
-
-      map<int, map<spg_t, pg_query_t> > *get_query_map() {
-        assert(state->rctx);
-        assert(state->rctx->query_map);
-        return state->rctx->query_map;
-      }
-
-      map<int, vector<pair<pg_notify_t, PastIntervals> > > *get_info_map() {
-        assert(state->rctx);
-        assert(state->rctx->info_map);
-        return state->rctx->info_map;
-      }
-
-      list< Context* > *get_on_safe_context_list() {
-        assert(state->rctx);
-        assert(state->rctx->on_safe);
-        return &(state->rctx->on_safe->contexts);
-      }
-
-      list< Context * > *get_on_applied_context_list() {
-        assert(state->rctx);
-        assert(state->rctx->on_applied);
-        return &(state->rctx->on_applied->contexts);
-      }
-
-      RecoveryCtx *get_recovery_ctx() { return &*(state->rctx); }
-
-      void send_notify(pg_shard_t to,
-                       const pg_notify_t &info, const PastIntervals &pi) {
-        assert(state->rctx);
-        assert(state->rctx->notify_list);
-        (*state->rctx->notify_list)[to.osd].push_back(make_pair(info, pi));
-      }
-    };
-    friend class RecoveryMachine;
-
-    /* States */
-    // Initial
-    // Reset
-    // Start
-    //   Started
-    //     Primary
-    //       WaitActingChange
-    //       Peering
-    //         GetInfo
-    //         GetLog
-    //         GetMissing
-    //         WaitUpThru
-    //         Incomplete
-    //       Active
-    //         Activating
-    //         Clean
-    //         Recovered
-    //         Backfilling
-    //         WaitRemoteBackfillReserved
-    //         WaitLocalBackfillReserved
-    //         NotBackfilling
-    //         NotRecovering
-    //         Recovering
-    //         WaitRemoteRecoveryReserved
-    //         WaitLocalRecoveryReserved
-    //     ReplicaActive
-    //       RepNotRecovering
-    //       RepRecovering
-    //       RepWaitBackfillReserved
-    //       RepWaitRecoveryReserved
-    //     Stray
-
-    struct Crashed : boost::statechart::state< Crashed, RecoveryMachine >, NamedState {
-      explicit Crashed(my_context ctx);
-    };
-
-    struct Reset;
-
-    struct Initial : boost::statechart::state< Initial, RecoveryMachine >, NamedState {
-      explicit Initial(my_context ctx);
-      void exit();
-
-      typedef boost::mpl::list <
-        boost::statechart::transition< Initialize, Reset >,
-        boost::statechart::custom_reaction< Load >,
-        boost::statechart::custom_reaction< NullEvt >,
-        boost::statechart::transition< boost::statechart::event_base, Crashed >
-        > reactions;
-
-      boost::statechart::result react(const Load&);
-      boost::statechart::result react(const MNotifyRec&);
-      boost::statechart::result react(const MInfoRec&);
-      boost::statechart::result react(const MLogRec&);
-      boost::statechart::result react(const boost::statechart::event_base&) {
-        return discard_event();
-      }
-    };
-
-    struct Reset : boost::statechart::state< Reset, RecoveryMachine >, NamedState {
-      explicit Reset(my_context ctx);
-      void exit();
-
-      typedef boost::mpl::list <
-        boost::statechart::custom_reaction< QueryState >,
-        boost::statechart::custom_reaction< AdvMap >,
-        boost::statechart::custom_reaction< ActMap >,
-        boost::statechart::custom_reaction< NullEvt >,
-        boost::statechart::custom_reaction< FlushedEvt >,
-        boost::statechart::custom_reaction< IntervalFlush >,
-        boost::statechart::transition< boost::statechart::event_base, Crashed >
-        > reactions;
-      boost::statechart::result react(const QueryState& q);
-      boost::statechart::result react(const AdvMap&);
-      boost::statechart::result react(const ActMap&);
-      boost::statechart::result react(const FlushedEvt&);
-      boost::statechart::result react(const IntervalFlush&);
-      boost::statechart::result react(const boost::statechart::event_base&) {
-        return discard_event();
-      }
-    };
-
-    struct Start;
-
-    struct Started : boost::statechart::state< Started, RecoveryMachine, Start >, NamedState {
-      explicit Started(my_context ctx);
-      void exit();
-
-      typedef boost::mpl::list <
-        boost::statechart::custom_reaction< QueryState >,
-        boost::statechart::custom_reaction< AdvMap >,
-        boost::statechart::custom_reaction< NullEvt >,
-        boost::statechart::custom_reaction< FlushedEvt >,
-        boost::statechart::custom_reaction< IntervalFlush >,
-        boost::statechart::transition< boost::statechart::event_base, Crashed >
-        > reactions;
-      boost::statechart::result react(const QueryState& q);
-      boost::statechart::result react(const AdvMap&);
-      boost::statechart::result react(const FlushedEvt&);
-      boost::statechart::result react(const IntervalFlush&);
-      boost::statechart::result react(const boost::statechart::event_base&) {
-        return discard_event();
-      }
-    };
-
-    struct Primary;
-    struct Stray;
-
-    struct Start : boost::statechart::state< Start, Started >, NamedState {
-      explicit Start(my_context ctx);
-      void exit();
-
-      typedef boost::mpl::list <
-        boost::statechart::transition< MakePrimary, Primary >,
-        boost::statechart::transition< MakeStray, Stray >
-        > reactions;
-    };
-
-    struct Peering;
-    struct WaitActingChange;
-    struct Incomplete;
-    struct Down;
-
-    struct Primary : boost::statechart::state< Primary, Started, Peering >, NamedState {
-      explicit Primary(my_context ctx);
-      void exit();
-
-      typedef boost::mpl::list <
-        boost::statechart::custom_reaction< ActMap >,
-        boost::statechart::custom_reaction< MNotifyRec >,
-        boost::statechart::transition< NeedActingChange, WaitActingChange >
-        > reactions;
-      boost::statechart::result react(const ActMap&);
-      boost::statechart::result react(const MNotifyRec&);
-    };
-
-    struct WaitActingChange : boost::statechart::state< WaitActingChange, Primary>,
-                              NamedState {
-      typedef boost::mpl::list <
-        boost::statechart::custom_reaction< QueryState >,
-        boost::statechart::custom_reaction< AdvMap >,
-        boost::statechart::custom_reaction< MLogRec >,
-        boost::statechart::custom_reaction< MInfoRec >,
-        boost::statechart::custom_reaction< MNotifyRec >
-        > reactions;
-      explicit WaitActingChange(my_context ctx);
-      boost::statechart::result react(const QueryState& q);
-      boost::statechart::result react(const AdvMap&);
-      boost::statechart::result react(const MLogRec&);
-      boost::statechart::result react(const MInfoRec&);
-      boost::statechart::result react(const MNotifyRec&);
-      void exit();
-    };
-
-    struct GetInfo;
-    struct Active;
-
-    struct Peering : boost::statechart::state< Peering, Primary, GetInfo >, NamedState {
-      PastIntervals::PriorSet prior_set;
-      bool history_les_bound;  //< need osd_find_best_info_ignore_history_les
-
-      explicit Peering(my_context ctx);
-      void exit();
-
-      typedef boost::mpl::list <
-        boost::statechart::custom_reaction< QueryState >,
-        boost::statechart::transition< Activate, Active >,
-        boost::statechart::custom_reaction< AdvMap >
-        > reactions;
-      boost::statechart::result react(const QueryState& q);
-      boost::statechart::result react(const AdvMap &advmap);
-    };
-
-    struct WaitLocalRecoveryReserved;
-    struct Activating;
-    struct Active : boost::statechart::state< Active, Primary, Activating >, NamedState {
-      explicit Active(my_context ctx);
-      void exit();
-
-      const set<pg_shard_t> remote_shards_to_reserve_recovery;
-      const set<pg_shard_t> remote_shards_to_reserve_backfill;
-      bool all_replicas_activated;
-
-      typedef boost::mpl::list <
-        boost::statechart::custom_reaction< QueryState >,
-        boost::statechart::custom_reaction< ActMap >,
-        boost::statechart::custom_reaction< AdvMap >,
-        boost::statechart::custom_reaction< MInfoRec >,
-        boost::statechart::custom_reaction< MNotifyRec >,
-        boost::statechart::custom_reaction< MLogRec >,
-        boost::statechart::custom_reaction< Backfilled >,
-        boost::statechart::custom_reaction< AllReplicasActivated >,
-        boost::statechart::custom_reaction< DeferRecovery >,
-        boost::statechart::custom_reaction< DeferBackfill >,
-        boost::statechart::custom_reaction< UnfoundRecovery >,
-        boost::statechart::custom_reaction< UnfoundBackfill >,
-        boost::statechart::custom_reaction< DoRecovery>
-        > reactions;
-      boost::statechart::result react(const QueryState& q);
-      boost::statechart::result react(const ActMap&);
-      boost::statechart::result react(const AdvMap&);
-      boost::statechart::result react(const MInfoRec& infoevt);
-      boost::statechart::result react(const MNotifyRec& notevt);
-      boost::statechart::result react(const MLogRec& logevt);
-      boost::statechart::result react(const Backfilled&) {
-        return discard_event();
-      }
-      boost::statechart::result react(const AllReplicasActivated&);
-      boost::statechart::result react(const DeferRecovery& evt) {
-        return discard_event();
-      }
-      boost::statechart::result react(const DeferBackfill& evt) {
-        return discard_event();
-      }
-      boost::statechart::result react(const UnfoundRecovery& evt) {
-        return discard_event();
-      }
-      boost::statechart::result react(const UnfoundBackfill& evt) {
-        return discard_event();
-      }
-      boost::statechart::result react(const DoRecovery&) {
-        return discard_event();
-      }
-    };
-
-    struct Clean : boost::statechart::state< Clean, Active >, NamedState {
-      typedef boost::mpl::list<
-        boost::statechart::transition< DoRecovery, WaitLocalRecoveryReserved >
-        > reactions;
-      explicit Clean(my_context ctx);
-      void exit();
-    };
-
-    struct Recovered : boost::statechart::state< Recovered, Active >, NamedState {
-      typedef boost::mpl::list<
-        boost::statechart::transition< GoClean, Clean >,
-        boost::statechart::transition< DoRecovery, WaitLocalRecoveryReserved >,
-        boost::statechart::custom_reaction< AllReplicasActivated >
-        > reactions;
-      explicit Recovered(my_context ctx);
-      void exit();
-      boost::statechart::result react(const AllReplicasActivated&) {
-        post_event(GoClean());
-        return forward_event();
-      }
-    };
-
-    struct Backfilling : boost::statechart::state< Backfilling, Active >, NamedState {
-      typedef boost::mpl::list<
-        boost::statechart::transition< Backfilled, Recovered >,
-        boost::statechart::custom_reaction< DeferBackfill >,
-        boost::statechart::custom_reaction< UnfoundBackfill >,
-        boost::statechart::custom_reaction< RemoteReservationRejected >
-        > reactions;
-      explicit Backfilling(my_context ctx);
-      boost::statechart::result react(const RemoteReservationRejected& evt);
-      boost::statechart::result react(const DeferBackfill& evt);
-      boost::statechart::result react(const UnfoundBackfill& evt);
-      void exit();
-    };
-
-    struct WaitRemoteBackfillReserved : boost::statechart::state< WaitRemoteBackfillReserved, Active >, NamedState {
-      typedef boost::mpl::list<
-        boost::statechart::custom_reaction< RemoteBackfillReserved >,
-        boost::statechart::custom_reaction< RemoteReservationRejected >,
-        boost::statechart::transition< AllBackfillsReserved, Backfilling >
-        > reactions;
-      set<pg_shard_t>::const_iterator backfill_osd_it;
-      explicit WaitRemoteBackfillReserved(my_context ctx);
-      void exit();
-      boost::statechart::result react(const RemoteBackfillReserved& evt);
-      boost::statechart::result react(const RemoteReservationRejected& evt);
-    };
-
-    struct WaitLocalBackfillReserved : boost::statechart::state< WaitLocalBackfillReserved, Active >, NamedState {
-      typedef boost::mpl::list<
-        boost::statechart::transition< LocalBackfillReserved, WaitRemoteBackfillReserved >
-        > reactions;
-      explicit WaitLocalBackfillReserved(my_context ctx);
-      void exit();
-    };
-
-    struct NotBackfilling : boost::statechart::state< NotBackfilling, Active>, NamedState {
-      typedef boost::mpl::list<
-        boost::statechart::transition< RequestBackfill, WaitLocalBackfillReserved>,
-        boost::statechart::custom_reaction< RemoteBackfillReserved >,
-        boost::statechart::custom_reaction< RemoteReservationRejected >
-        > reactions;
-      explicit NotBackfilling(my_context ctx);
-      void exit();
-      boost::statechart::result react(const RemoteBackfillReserved& evt);
-      boost::statechart::result react(const RemoteReservationRejected& evt);
-    };
-
-    struct NotRecovering : boost::statechart::state< NotRecovering, Active>, NamedState {
-      typedef boost::mpl::list<
-        boost::statechart::transition< DoRecovery, WaitLocalRecoveryReserved >,
-        boost::statechart::custom_reaction< DeferRecovery >,
-        boost::statechart::custom_reaction< UnfoundRecovery >
-        > reactions;
-      explicit NotRecovering(my_context ctx);
-      boost::statechart::result react(const DeferRecovery& evt) {
-        /* no-op */
-        return discard_event();
-      }
-      boost::statechart::result react(const UnfoundRecovery& evt) {
-        /* no-op */
-        return discard_event();
-      }
-      void exit();
-    };
-
-    struct RepNotRecovering;
-    struct ReplicaActive : boost::statechart::state< ReplicaActive, Started, RepNotRecovering >, NamedState {
-      explicit ReplicaActive(my_context ctx);
-      void exit();
-
-      typedef boost::mpl::list <
-        boost::statechart::custom_reaction< QueryState >,
-        boost::statechart::custom_reaction< ActMap >,
-        boost::statechart::custom_reaction< MQuery >,
-        boost::statechart::custom_reaction< MInfoRec >,
-        boost::statechart::custom_reaction< MLogRec >,
-        boost::statechart::custom_reaction< Activate >,
-        boost::statechart::custom_reaction< DeferRecovery >,
-        boost::statechart::custom_reaction< DeferBackfill >,
-        boost::statechart::custom_reaction< UnfoundRecovery >,
-        boost::statechart::custom_reaction< UnfoundBackfill >
-        > reactions;
-      boost::statechart::result react(const QueryState& q);
-      boost::statechart::result react(const MInfoRec& infoevt);
-      boost::statechart::result react(const MLogRec& logevt);
-      boost::statechart::result react(const ActMap&);
-      boost::statechart::result react(const MQuery&);
-      boost::statechart::result react(const Activate&);
-      boost::statechart::result react(const DeferRecovery& evt) {
-        return discard_event();
-      }
-      boost::statechart::result react(const DeferBackfill& evt) {
-        return discard_event();
-      }
-      boost::statechart::result react(const UnfoundRecovery& evt) {
-        return discard_event();
-      }
-      boost::statechart::result react(const UnfoundBackfill& evt) {
-        return discard_event();
-      }
-    };
-
-    struct RepRecovering : boost::statechart::state< RepRecovering, ReplicaActive >, NamedState {
-      typedef boost::mpl::list<
-        boost::statechart::transition< RecoveryDone, RepNotRecovering >,
-        // for compat with old peers
-        boost::statechart::transition< RemoteReservationRejected, RepNotRecovering >,
-        boost::statechart::transition< RemoteReservationCanceled, RepNotRecovering >,
-        boost::statechart::custom_reaction< BackfillTooFull >
-        > reactions;
-      explicit RepRecovering(my_context ctx);
-      boost::statechart::result react(const BackfillTooFull &evt);
-      void exit();
-    };
-
-    struct RepWaitBackfillReserved : boost::statechart::state< RepWaitBackfillReserved, ReplicaActive >, NamedState {
-      typedef boost::mpl::list<
-        boost::statechart::custom_reaction< RemoteBackfillReserved >,
-        boost::statechart::custom_reaction< RejectRemoteReservation >,
-        boost::statechart::custom_reaction< RemoteReservationRejected >,
-        boost::statechart::custom_reaction< RemoteReservationCanceled >
-        > reactions;
-      explicit RepWaitBackfillReserved(my_context ctx);
-      void exit();
-      boost::statechart::result react(const RemoteBackfillReserved &evt);
-      boost::statechart::result react(const RejectRemoteReservation &evt);
-      boost::statechart::result react(const RemoteReservationRejected &evt);
-      boost::statechart::result react(const RemoteReservationCanceled &evt);
-    };
-
-    struct RepWaitRecoveryReserved : boost::statechart::state< RepWaitRecoveryReserved, ReplicaActive >, NamedState {
-      typedef boost::mpl::list<
-        boost::statechart::custom_reaction< RemoteRecoveryReserved >,
-        // for compat with old peers
-        boost::statechart::custom_reaction< RemoteReservationRejected >,
-        boost::statechart::custom_reaction< RemoteReservationCanceled >
-        > reactions;
-      explicit RepWaitRecoveryReserved(my_context ctx);
-      void exit();
-      boost::statechart::result react(const RemoteRecoveryReserved &evt);
-      boost::statechart::result react(const RemoteReservationRejected &evt) {
-        // for compat with old peers
-        post_event(RemoteReservationCanceled());
-        return discard_event();
-      }
-      boost::statechart::result react(const RemoteReservationCanceled &evt);
-    };
-
-    struct RepNotRecovering : boost::statechart::state< RepNotRecovering, ReplicaActive>, NamedState {
-      typedef boost::mpl::list<
-        boost::statechart::custom_reaction< RequestBackfillPrio >,
-        boost::statechart::transition< RequestRecovery, RepWaitRecoveryReserved >,
-        boost::statechart::custom_reaction< RejectRemoteReservation >,
-        boost::statechart::transition< RemoteReservationRejected, RepNotRecovering >,
-        boost::statechart::transition< RemoteReservationCanceled, RepNotRecovering >,
-        boost::statechart::transition< RecoveryDone, RepNotRecovering >  // for compat with pre-reservation peers
-        > reactions;
-      explicit RepNotRecovering(my_context ctx);
-      boost::statechart::result react(const RequestBackfillPrio &evt);
-      boost::statechart::result react(const RejectRemoteReservation &evt);
-      void exit();
-    };
-
-    struct Recovering : boost::statechart::state< Recovering, Active >, NamedState {
-      typedef boost::mpl::list <
-        boost::statechart::custom_reaction< AllReplicasRecovered >,
-        boost::statechart::custom_reaction< DeferRecovery >,
-        boost::statechart::custom_reaction< UnfoundRecovery >,
-        boost::statechart::custom_reaction< RequestBackfill >
-        > reactions;
-      explicit Recovering(my_context ctx);
-      void exit();
-      void release_reservations(bool cancel = false);
-      boost::statechart::result react(const AllReplicasRecovered &evt);
-      boost::statechart::result react(const DeferRecovery& evt);
-      boost::statechart::result react(const UnfoundRecovery& evt);
-      boost::statechart::result react(const RequestBackfill &evt);
-    };
-
-    struct WaitRemoteRecoveryReserved : boost::statechart::state< WaitRemoteRecoveryReserved, Active >, NamedState {
-      typedef boost::mpl::list <
-        boost::statechart::custom_reaction< RemoteRecoveryReserved >,
-        boost::statechart::transition< AllRemotesReserved, Recovering >
-        > reactions;
-      set<pg_shard_t>::const_iterator remote_recovery_reservation_it;
-      explicit WaitRemoteRecoveryReserved(my_context ctx);
-      boost::statechart::result react(const RemoteRecoveryReserved &evt);
-      void exit();
-    };
-
-    struct WaitLocalRecoveryReserved : boost::statechart::state< WaitLocalRecoveryReserved, Active >, NamedState {
-      typedef boost::mpl::list <
-        boost::statechart::transition< LocalRecoveryReserved, WaitRemoteRecoveryReserved >,
-        boost::statechart::custom_reaction< RecoveryTooFull >
-        > reactions;
-      explicit WaitLocalRecoveryReserved(my_context ctx);
-      void exit();
-      boost::statechart::result react(const RecoveryTooFull &evt);
-    };
-
-    struct Activating : boost::statechart::state< Activating, Active >, NamedState {
-      typedef boost::mpl::list <
-        boost::statechart::transition< AllReplicasRecovered, Recovered >,
-        boost::statechart::transition< DoRecovery, WaitLocalRecoveryReserved >,
-        boost::statechart::transition< RequestBackfill, WaitLocalBackfillReserved >
-        > reactions;
-      explicit Activating(my_context ctx);
-      void exit();
-    };
-
-    struct Stray : boost::statechart::state< Stray, Started >, NamedState {
-      map<int, pair<pg_query_t, epoch_t> > pending_queries;
-
-      explicit Stray(my_context ctx);
-      void exit();
-
-      typedef boost::mpl::list <
-        boost::statechart::custom_reaction< MQuery >,
-        boost::statechart::custom_reaction< MLogRec >,
-        boost::statechart::custom_reaction< MInfoRec >,
-        boost::statechart::custom_reaction< ActMap >,
-        boost::statechart::custom_reaction< RecoveryDone >
-        > reactions;
-      boost::statechart::result react(const MQuery& query);
-      boost::statechart::result react(const MLogRec& logevt);
-      boost::statechart::result react(const MInfoRec& infoevt);
-      boost::statechart::result react(const ActMap&);
-      boost::statechart::result react(const RecoveryDone&) {
-        return discard_event();
-      }
-    };
-
-    struct GetLog;
-
-    struct GetInfo : boost::statechart::state< GetInfo, Peering >, NamedState {
-      set<pg_shard_t> peer_info_requested;
-
-      explicit GetInfo(my_context ctx);
-      void exit();
-      void get_infos();
-
-      typedef boost::mpl::list <
-        boost::statechart::custom_reaction< QueryState >,
-        boost::statechart::transition< GotInfo, GetLog >,
-        boost::statechart::custom_reaction< MNotifyRec >,
-        boost::statechart::transition< IsDown, Down >
-        > reactions;
-      boost::statechart::result react(const QueryState& q);
-      boost::statechart::result react(const MNotifyRec& infoevt);
-    };
-
-    struct GotLog : boost::statechart::event< GotLog > {
-      GotLog() : boost::statechart::event< GotLog >() {}
-    };
-
-    struct GetLog : boost::statechart::state< GetLog, Peering >, NamedState {
-      pg_shard_t auth_log_shard;
-      boost::intrusive_ptr<MOSDPGLog> msg;
-
-      explicit GetLog(my_context ctx);
-      void exit();
-
-      typedef boost::mpl::list <
-        boost::statechart::custom_reaction< QueryState >,
-        boost::statechart::custom_reaction< MLogRec >,
-        boost::statechart::custom_reaction< GotLog >,
-        boost::statechart::custom_reaction< AdvMap >,
-        boost::statechart::transition< IsIncomplete, Incomplete >
-        > reactions;
-      boost::statechart::result react(const AdvMap&);
-      boost::statechart::result react(const QueryState& q);
-      boost::statechart::result react(const MLogRec& logevt);
-      boost::statechart::result react(const GotLog&);
-    };
-
-    struct WaitUpThru;
-
-    struct GetMissing : boost::statechart::state< GetMissing, Peering >, NamedState {
-      set<pg_shard_t> peer_missing_requested;
-
-      explicit GetMissing(my_context ctx);
-      void exit();
-
-      typedef boost::mpl::list <
-        boost::statechart::custom_reaction< QueryState >,
-        boost::statechart::custom_reaction< MLogRec >,
-        boost::statechart::transition< NeedUpThru, WaitUpThru >
-        > reactions;
-      boost::statechart::result react(const QueryState& q);
-      boost::statechart::result react(const MLogRec& logevt);
-    };
-
-    struct WaitUpThru : boost::statechart::state< WaitUpThru, Peering >, NamedState {
-      explicit WaitUpThru(my_context ctx);
-      void exit();
-
-      typedef boost::mpl::list <
-        boost::statechart::custom_reaction< QueryState >,
-        boost::statechart::custom_reaction< ActMap >,
-        boost::statechart::custom_reaction< MLogRec >
-        > reactions;
-      boost::statechart::result react(const QueryState& q);
-      boost::statechart::result react(const ActMap& am);
-      boost::statechart::result react(const MLogRec& logrec);
-    };
-
-    struct Down : boost::statechart::state< Down, Peering>, NamedState {
-      explicit Down(my_context ctx);
-      typedef boost::mpl::list <
-        boost::statechart::custom_reaction< QueryState >
-        > reactions;
-      boost::statechart::result react(const QueryState& infoevt);
-      void exit();
-    };
-
-    struct Incomplete : boost::statechart::state< Incomplete, Peering>, NamedState {
-      typedef boost::mpl::list <
-        boost::statechart::custom_reaction< AdvMap >,
-        boost::statechart::custom_reaction< MNotifyRec >,
-        boost::statechart::custom_reaction< QueryState >
-        > reactions;
-      explicit Incomplete(my_context ctx);
-      boost::statechart::result react(const AdvMap &advmap);
-      boost::statechart::result react(const MNotifyRec& infoevt);
-      boost::statechart::result react(const QueryState& infoevt);
-      void exit();
-    };
-
-
-    RecoveryMachine machine;
-    PG *pg;
-
-    /// context passed in by state machine caller
-    RecoveryCtx *orig_ctx;
-
-    /// populated if we are buffering messages pending a flush
-    boost::optional<BufferedRecoveryMessages> messages_pending_flush;
-
-    /**
-     * populated between start_handle() and end_handle(), points into
-     * the message lists for messages_pending_flush while blocking messages
-     * or into orig_ctx otherwise
-     */
-    boost::optional<RecoveryCtx> rctx;
-
-  public:
-    explicit RecoveryState(PG *pg)
-      : machine(this, pg), pg(pg), orig_ctx(0) {
-      machine.initiate();
-    }
-
-    void handle_event(const boost::statechart::event_base &evt,
-                      RecoveryCtx *rctx) {
-      start_handle(rctx);
-      machine.process_event(evt);
-      end_handle();
-    }
-
-    void handle_event(CephPeeringEvtRef evt,
-                      RecoveryCtx *rctx) {
-      start_handle(rctx);
-      machine.process_event(evt->get_event());
-      end_handle();
-    }
-
-  } recovery_state;
+  bool delete_needs_sleep = false;
 
 protected:
+  bool state_test(uint64_t m) const { return recovery_state.state_test(m); }
+  void state_set(uint64_t m) { recovery_state.state_set(m); }
+  void state_clear(uint64_t m) { recovery_state.state_clear(m); }
 
-public:
-  PG(OSDService *o, OSDMapRef curmap,
-     const PGPool &pool, spg_t p);
-  ~PG() override;
-
-private:
-  // Prevent copying
-  explicit PG(const PG& rhs);
-  PG& operator=(const PG& rhs);
-  const spg_t pg_id;
-  uint64_t peer_features;
-  uint64_t acting_features;
-  uint64_t upacting_features;
-
-  epoch_t last_epoch;
-
-public:
-  const spg_t& get_pgid() const { return pg_id; }
-
-  void reset_min_peer_features() {
-    peer_features = CEPH_FEATURES_SUPPORTED_DEFAULT;
-  }
-  uint64_t get_min_peer_features() const { return peer_features; }
-  void apply_peer_features(uint64_t f) { peer_features &= f; }
-
-  uint64_t get_min_acting_features() const { return acting_features; }
-  uint64_t get_min_upacting_features() const { return upacting_features; }
-  bool perform_deletes_during_peering() const {
-    return !(get_osdmap()->test_flag(CEPH_OSDMAP_RECOVERY_DELETES));
-  }
-
-  void init_primary_up_acting(
-    const vector<int> &newup,
-    const vector<int> &newacting,
-    int new_up_primary,
-    int new_acting_primary) {
-    actingset.clear();
-    acting = newacting;
-    for (uint8_t i = 0; i < acting.size(); ++i) {
-      if (acting[i] != CRUSH_ITEM_NONE)
-        actingset.insert(
-          pg_shard_t(
-            acting[i],
-            pool.info.ec_pool() ? shard_id_t(i) : shard_id_t::NO_SHARD));
-    }
-    upset.clear();
-    up = newup;
-    for (uint8_t i = 0; i < up.size(); ++i) {
-      if (up[i] != CRUSH_ITEM_NONE)
-        upset.insert(
-          pg_shard_t(
-            up[i],
-            pool.info.ec_pool() ? shard_id_t(i) : shard_id_t::NO_SHARD));
-    }
-    if (!pool.info.ec_pool()) {
-      up_primary = pg_shard_t(new_up_primary, shard_id_t::NO_SHARD);
-      primary = pg_shard_t(new_acting_primary, shard_id_t::NO_SHARD);
-      return;
-    }
-    up_primary = pg_shard_t();
-    primary = pg_shard_t();
-    for (uint8_t i = 0; i < up.size(); ++i) {
-      if (up[i] == new_up_primary) {
-        up_primary = pg_shard_t(up[i], shard_id_t(i));
-        break;
-      }
-    }
-    for (uint8_t i = 0; i < acting.size(); ++i) {
-      if (acting[i] == new_acting_primary) {
-        primary = pg_shard_t(acting[i], shard_id_t(i));
-        break;
-      }
-    }
-    assert(up_primary.osd == new_up_primary);
-    assert(primary.osd == new_acting_primary);
-  }
-  pg_shard_t get_primary() const { return primary; }
-
-  int get_role() const { return role; }
-  void set_role(int r) { role = r; }
-
-  bool is_primary() const { return pg_whoami == primary; }
-  bool is_replica() const { return role > 0; }
-
-  epoch_t get_last_peering_reset() const { return last_peering_reset; }
-
-  //int get_state() const { return state; }
-  bool state_test(int m) const { return (state & m) != 0; }
-  void state_set(int m) { state |= m; }
-  void state_clear(int m) { state &= ~m; }
-
-  bool is_complete() const { return info.last_complete == info.last_update; }
-  bool should_send_notify() const { return send_notify; }
-
-  int get_state() const { return state; }
-  bool is_active() const { return state_test(PG_STATE_ACTIVE); }
-  bool is_activating() const { return state_test(PG_STATE_ACTIVATING); }
-  bool is_peering() const { return state_test(PG_STATE_PEERING); }
-  bool is_down() const { return state_test(PG_STATE_DOWN); }
-  bool is_recovery_unfound() const { return state_test(PG_STATE_RECOVERY_UNFOUND); }
-  bool is_backfill_unfound() const { return state_test(PG_STATE_BACKFILL_UNFOUND); }
-  bool is_incomplete() const { return state_test(PG_STATE_INCOMPLETE); }
-  bool is_clean() const { return state_test(PG_STATE_CLEAN); }
-  bool is_degraded() const { return state_test(PG_STATE_DEGRADED); }
-  bool is_undersized() const { return state_test(PG_STATE_UNDERSIZED); }
-
-  bool is_scrubbing() const { return state_test(PG_STATE_SCRUBBING); }
-  bool is_remapped() const { return state_test(PG_STATE_REMAPPED); }
-  bool is_peered() const {
-    return state_test(PG_STATE_ACTIVE) || state_test(PG_STATE_PEERED);
-  }
-
-  bool is_empty() const { return info.last_update == eversion_t(0,0); }
+  bool is_complete() const {
+    return recovery_state.is_complete();
+  }
+  bool should_send_notify() const {
+    return recovery_state.should_send_notify();
+  }
 
-  void init(
-    int role,
-    const vector<int>& up,
-    int up_primary,
-    const vector<int>& acting,
-    int acting_primary,
-    const pg_history_t& history,
-    const PastIntervals& pim,
-    bool backfill,
-    ObjectStore::Transaction *t);
+  bool is_active() const { return recovery_state.is_active(); }
+  bool is_activating() const { return recovery_state.is_activating(); }
+  bool is_peering() const { return recovery_state.is_peering(); }
+  bool is_down() const { return recovery_state.is_down(); }
+  bool is_recovery_unfound() const { return recovery_state.is_recovery_unfound(); }
+  bool is_backfill_unfound() const { return recovery_state.is_backfill_unfound(); }
+  bool is_incomplete() const { return recovery_state.is_incomplete(); }
+  bool is_clean() const { return recovery_state.is_clean(); }
+  bool is_degraded() const { return recovery_state.is_degraded(); }
+  bool is_undersized() const { return recovery_state.is_undersized(); }
+  bool is_scrubbing() const { return state_test(PG_STATE_SCRUBBING); } // Primary only
+  bool is_remapped() const { return recovery_state.is_remapped(); }
+  bool is_peered() const { return recovery_state.is_peered(); }
+  bool is_recovering() const { return recovery_state.is_recovering(); }
+  bool is_premerge() const { return recovery_state.is_premerge(); }
+  bool is_repair() const { return recovery_state.is_repair(); }
+  bool is_laggy() const { return state_test(PG_STATE_LAGGY); }
+  bool is_wait() const { return state_test(PG_STATE_WAIT); }
+
+  bool is_empty() const { return recovery_state.is_empty(); }
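+  // NOTE (editor annotation, not part of the original patch): state_test()
+  // keeps the old any-bit semantics on the PG_STATE_* mask (now delegated
+  // to PeeringState), so a combined query such as
+  //   state_test(PG_STATE_ACTIVE | PG_STATE_PEERED)
+  // is true if either bit is set -- which is how the removed is_peered()
+  // was implemented before the delegation.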
 
   // pg on-disk state
   void do_pending_flush();
 
-  static void _create(ObjectStore::Transaction& t, spg_t pgid, int bits);
-  static void _init(ObjectStore::Transaction& t,
-                    spg_t pgid, const pg_pool_t *pool);
-
-private:
-  void prepare_write_info(map<string,bufferlist> *km);
-
-  void update_store_with_options();
-  void update_store_on_load();
-
 public:
-  static int _prepare_write_info(
-    CephContext* cct,
-    map<string,bufferlist> *km,
-    epoch_t epoch,
+  void prepare_write(
     pg_info_t &info,
     pg_info_t &last_written_info,
     PastIntervals &past_intervals,
+    PGLog &pglog,
+    bool dirty_info,
     bool dirty_big_info,
-    bool dirty_epoch,
-    bool try_fast_info,
-    PerfCounters *logger = nullptr);
-  void write_if_dirty(ObjectStore::Transaction& t);
+    bool need_write_epoch,
+    ObjectStore::Transaction &t) override;
+
+  void write_if_dirty(PeeringCtx &rctx) {
+    write_if_dirty(rctx.transaction);
+  }
+protected:
+  void write_if_dirty(ObjectStore::Transaction& t) {
+    recovery_state.write_if_dirty(t);
+  }
 
   PGLog::IndexedLog projected_log;
   bool check_in_progress_op(
     const osd_reqid_t &r,
     eversion_t *version,
     version_t *user_version,
-    int *return_code) const;
+    int *return_code,
+    std::vector<pg_log_op_return_item_t> *op_returns) const;
   eversion_t projected_last_update;
   eversion_t get_next_version() const {
     eversion_t at_version(
-      get_osdmap()->get_epoch(),
+      get_osdmap_epoch(),
       projected_last_update.version+1);
-    assert(at_version > info.last_update);
-    assert(at_version > pg_log.get_head());
-    assert(at_version > projected_last_update);
+    ceph_assert(at_version > info.last_update);
+    ceph_assert(at_version > recovery_state.get_pg_log().get_head());
+    ceph_assert(at_version > projected_last_update);
     return at_version;
   }
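+  // NOTE (editor annotation, not part of the original patch): eversion_t
+  // orders by (epoch, version) lexicographically, so the ceph_asserts
+  // above hold whenever the epoch advances or the projected version
+  // grows, e.g.:
+  //   eversion_t(5, 10) < eversion_t(6, 1)  // newer epoch dominates
+  //   eversion_t(6, 1)  < eversion_t(6, 2)  // then version decides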
 
-  void add_log_entry(const pg_log_entry_t& e, bool applied);
-  void append_log(
-    const vector<pg_log_entry_t>& logv,
-    eversion_t trim_to,
-    eversion_t roll_forward_to,
-    ObjectStore::Transaction &t,
-    bool transaction_applied = true);
   bool check_log_for_corruption(ObjectStore *store);
-  void trim_log();
 
   std::string get_corrupt_pg_log_name() const;
-  static int read_info(
-    ObjectStore *store, spg_t pgid, const coll_t &coll,
-    bufferlist &bl, pg_info_t &info, PastIntervals &past_intervals,
-    __u8 &);
-  void read_state(ObjectStore *store, bufferlist &bl);
-  static bool _has_removal_flag(ObjectStore *store, spg_t pgid);
-  static int peek_map_epoch(ObjectStore *store, spg_t pgid,
-                            epoch_t *pepoch, bufferlist *bl);
+
   void update_snap_map(
-    const vector<pg_log_entry_t> &log_entries,
+    const std::vector<pg_log_entry_t> &log_entries,
     ObjectStore::Transaction& t);
 
-  void filter_snapc(vector<snapid_t> &snaps);
-
-  void log_weirdness();
+  void filter_snapc(std::vector<snapid_t> &snaps);
 
   virtual void kick_snap_trim() = 0;
   virtual void snap_trimmer_scrub_complete() = 0;
-  bool requeue_scrub(bool high_priority = false);
-  void queue_recovery();
-  bool queue_scrub();
-  unsigned get_scrub_priority();
-
-  /// share pg info after a pg is active
-  void share_pg_info();
+
+  void queue_recovery();
+  void queue_scrub_after_repair();
+  unsigned int get_scrub_priority();
 
-  bool append_log_entries_update_missing(
-    const mempool::osd_pglog::list<pg_log_entry_t> &entries,
-    ObjectStore::Transaction &t);
-
-  /**
-   * Merge entries updating missing as necessary on all
-   * actingbackfill logs and missings (also missing_loc)
-   */
-  void merge_new_log_entries(
-    const mempool::osd_pglog::list<pg_log_entry_t> &entries,
-    ObjectStore::Transaction &t);
+  bool try_flush_or_schedule_async() override;
+  void start_flush_on_transaction(
+    ObjectStore::Transaction &t) override;
 
-  void reset_interval_flush();
-  void start_peering_interval(
-    const OSDMapRef lastmap,
-    const vector<int>& newup, int up_primary,
-    const vector<int>& newacting, int acting_primary,
-    ObjectStore::Transaction *t);
-  void on_new_interval();
-  virtual void _on_new_interval() = 0;
-  void start_flush(ObjectStore::Transaction *t,
-                   list<Context *> *on_applied,
-                   list<Context *> *on_safe);
-  void set_last_peering_reset();
-  bool pg_has_reset_since(epoch_t e) {
-    assert(is_locked());
-    return deleting || e < get_last_peering_reset();
+  void update_history(const pg_history_t& history) {
+    recovery_state.update_history(history);
   }
-  void update_history(const pg_history_t& history);
-  void fulfill_info(pg_shard_t from, const pg_query_t &query,
-                    pair<pg_shard_t, pg_info_t> &notify_info);
-  void fulfill_log(pg_shard_t from, const pg_query_t &query, epoch_t query_epoch);
-
-  void check_full_transition(OSDMapRef lastmap, OSDMapRef osdmap);
-
-  bool should_restart_peering(
-    int newupprimary,
-    int newactingprimary,
-    const vector<int>& newup,
-    const vector<int>& newacting,
-    OSDMapRef lastmap,
-    OSDMapRef osdmap);
 
   // OpRequest queueing
   bool can_discard_op(OpRequestRef& op);
   bool can_discard_scan(OpRequestRef op);
@@ -2584,83 +1344,40 @@ public:
   bool can_discard_replica_op(OpRequestRef& op);
 
   bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
-  bool old_peering_evt(CephPeeringEvtRef evt) {
+  bool old_peering_evt(PGPeeringEventRef evt) {
     return old_peering_msg(evt->get_epoch_sent(),
                            evt->get_epoch_requested());
   }
-  static bool have_same_or_newer_map(epoch_t cur_epoch, epoch_t e) {
-    return e <= cur_epoch;
-  }
   bool have_same_or_newer_map(epoch_t e) {
-    return e <= get_osdmap()->get_epoch();
+    return e <= get_osdmap_epoch();
  }
 
   bool op_has_sufficient_caps(OpRequestRef& op);
 
+  // abstract bits
+  friend struct FlushState;
 
-  // recovery bits
-  void take_waiters();
-  void queue_peering_event(CephPeeringEvtRef evt);
-  void handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx);
-  void queue_query(epoch_t msg_epoch, epoch_t query_epoch,
-                   pg_shard_t from, const pg_query_t& q);
-  void queue_null(epoch_t msg_epoch, epoch_t query_epoch);
-  void queue_flushed(epoch_t started_at);
-  void handle_advance_map(
-    OSDMapRef osdmap, OSDMapRef lastmap,
-    vector<int>& newup, int up_primary,
-    vector<int>& newacting, int acting_primary,
-    RecoveryCtx *rctx);
-  void handle_activate_map(RecoveryCtx *rctx);
-  void handle_create(RecoveryCtx *rctx);
-  void handle_loaded(RecoveryCtx *rctx);
-  void handle_query_state(Formatter *f);
-
-  virtual void on_removal(ObjectStore::Transaction *t) = 0;
+  friend ostream& operator<<(ostream& out, const PG& pg);
 
+protected:
+  PeeringState recovery_state;
 
-  // abstract bits
-  virtual void do_request(
-    OpRequestRef& op,
-    ThreadPool::TPHandle &handle
-  ) = 0;
+  // ref to recovery_state.pool
+  const PGPool &pool;
 
-  virtual void do_op(OpRequestRef& op) = 0;
-  virtual void do_sub_op(OpRequestRef op) = 0;
-  virtual void do_sub_op_reply(OpRequestRef op) = 0;
-  virtual void do_scan(
-    OpRequestRef op,
-    ThreadPool::TPHandle &handle
-  ) = 0;
-  virtual void do_backfill(OpRequestRef op) = 0;
-  virtual void snap_trimmer(epoch_t epoch_queued) = 0;
+  // ref to recovery_state.info
+  const pg_info_t &info;
 
-  virtual int do_command(
-    cmdmap_t cmdmap,
-    ostream& ss,
-    bufferlist& idata,
-    bufferlist& odata,
-    ConnectionRef conn,
-    ceph_tid_t tid) = 0;
-
-  virtual void on_role_change() = 0;
-  virtual void on_pool_change() = 0;
-  virtual void on_change(ObjectStore::Transaction *t) = 0;
-  virtual void on_activate() = 0;
-  virtual void on_flushed() = 0;
-  virtual void on_shutdown() = 0;
-  virtual void check_blacklisted_watchers() = 0;
-  virtual void get_watchers(std::list<obj_watch_item_t>&) = 0;
-  virtual bool agent_work(int max) = 0;
-  virtual bool agent_work(int max, int agent_flush_quota) = 0;
-  virtual void agent_stop() = 0;
-  virtual void agent_delay() = 0;
-  virtual void agent_clear() = 0;
-  virtual void agent_choose_mode_restart() = 0;
-};
 
+// ScrubberPasskey getters:
+public:
+  const pg_info_t& get_pg_info(ScrubberPasskey) const {
+    return info;
+  }
 
-ostream& operator<<(ostream& out, const PG& pg);
+  OSDService* get_pg_osd(ScrubberPasskey) const {
+    return osd;
+  }
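+  // NOTE (editor annotation; hedged sketch, not part of the original
+  // patch): this is the C++ "passkey" idiom. ScrubberPasskey has a private
+  // constructor and befriends only the scrub machinery, so although these
+  // getters are public, only a friend can mint the token needed to call
+  // them. From inside a befriended class it would look roughly like:
+  //
+  //   OSDService* osds = pg->get_pg_osd(ScrubberPasskey{});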
 
-ostream& operator<<(ostream& out, const PG::BackfillInterval& bi);
+};
 
 #endif