// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
#include <boost/scoped_ptr.hpp>
#include <boost/container/flat_set.hpp>
#include "include/mempool.h"

// re-include our assert to clobber boost's
#include "include/ceph_assert.h"
#include "include/common_fwd.h"

#include "include/types.h"
#include "include/stringify.h"
#include "osd_types.h"
#include "include/xlist.h"
#include "SnapMapper.h"

#include "common/Timer.h"

#include "include/str_list.h"
#include "PGBackend.h"
#include "PGPeeringEvent.h"
#include "PeeringState.h"
#include "recovery_types.h"
#include "MissingLoc.h"
#include "scrubber_common.h"

#include "mgr/OSDPerfMetricTypes.h"

//#define DEBUG_RECOVERY_OIDS   // track std::set of recovering oids explicitly, to find counting bugs
//#define PG_DEBUG_REFS         // track provenance of pg refs, helpful for finding leaks
struct OSDShardPGSlot;

typedef OpRequest::Ref OpRequestRef;
class DynamicPerfStats;

namespace Scrub {
  class ReplicaReservations;
  class LocalReservation;
  class ReservedByRemotePrimary;
  enum class schedule_result_t;
}
#ifdef PG_DEBUG_REFS
#include "common/tracked_int_ptr.hpp"
  uint64_t get_with_id(PG *pg);
  void put_with_id(PG *pg, uint64_t id);
  typedef TrackedIntPtr<PG> PGRef;
#else
  typedef boost::intrusive_ptr<PG> PGRef;
#endif
class PGRecoveryStats {
  struct per_state_info {
    uint64_t enter, exit;     // enter/exit counts
    uint64_t events;
    utime_t event_time;       // time spent processing events
    utime_t total_time;       // total time in state
    utime_t min_time, max_time;

    // cppcheck-suppress unreachableCode
    per_state_info() : enter(0), exit(0), events(0) {}
  };
  std::map<const char *,per_state_info> info;
  ceph::mutex lock = ceph::make_mutex("PGRecoverStats::lock");

public:
  PGRecoveryStats() = default;

  void reset() {
    std::lock_guard l(lock);
    info.clear();
  }
  void dump(ostream& out) {
    std::lock_guard l(lock);
    for (std::map<const char *,per_state_info>::iterator p = info.begin(); p != info.end(); ++p) {
      per_state_info& i = p->second;
      out << i.enter << "\t" << i.exit << "\t"
          << i.events << "\t" << i.event_time << "\t"
          << i.total_time << "\t"
          << i.min_time << "\t" << i.max_time << "\t"
          << p->first << "\n";
    }
  }

  void dump_formatted(ceph::Formatter *f) {
    std::lock_guard l(lock);
    f->open_array_section("pg_recovery_stats");
    for (std::map<const char *,per_state_info>::iterator p = info.begin();
         p != info.end(); ++p) {
      per_state_info& i = p->second;
      f->open_object_section("recovery_state");
      f->dump_int("enter", i.enter);
      f->dump_int("exit", i.exit);
      f->dump_int("events", i.events);
      f->dump_stream("event_time") << i.event_time;
      f->dump_stream("total_time") << i.total_time;
      f->dump_stream("min_time") << i.min_time;
      f->dump_stream("max_time") << i.max_time;
      std::vector<std::string> states;
      get_str_vec(p->first, "/", states);
      f->open_array_section("nested_states");
      for (std::vector<std::string>::iterator st = states.begin();
           st != states.end(); ++st) {
        f->dump_string("state", *st);
      }
      f->close_section();
      f->close_section();
    }
    f->close_section();
  }

  void log_enter(const char *s) {
    std::lock_guard l(lock);
    info[s].enter++;
  }
  void log_exit(const char *s, utime_t dur, uint64_t events, utime_t event_dur) {
    std::lock_guard l(lock);
    per_state_info &i = info[s];
    i.exit++;
    i.total_time += dur;
    if (dur > i.max_time)
      i.max_time = dur;
    if (dur < i.min_time || i.min_time == utime_t())
      i.min_time = dur;
    i.events += events;
    i.event_time += event_dur;
  }
};
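
// Illustrative only: a minimal sketch (not code from this file) of how a
// PGRecoveryStats instance is driven; 'stats', 'dur', 'events', 'event_dur'
// and 'f' are hypothetical locals. The real callers are the
// log_state_enter()/log_state_exit() hooks declared on PG below.
//
//   PGRecoveryStats stats;
//   stats.log_enter("Started/Primary/Active");
//   ...                                  // state machine runs, events are handled
//   stats.log_exit("Started/Primary/Active",
//                  dur,                  // total time spent in the state
//                  events,               // number of events processed there
//                  event_dur);           // time spent processing those events
//   stats.dump_formatted(f);             // emits the "pg_recovery_stats" array (f: ceph::Formatter*)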
/** PG - Replica Placement Group
 *
 */

class PG : public DoutPrefixProvider,
           public PeeringState::PeeringListener,
           public Scrub::PgScrubBeListener {
  friend struct NamedState;
  friend class PeeringState;
  friend class PgScrubber;
  friend class ScrubBackend;

public:
  const pg_shard_t pg_whoami;
  /// the 'scrubber'. Will be allocated in the derivative (PrimaryLogPG) ctor,
  /// and be removed only in the PrimaryLogPG destructor.
  std::unique_ptr<ScrubPgIF> m_scrubber;

  /// flags detailing scheduling/operation characteristics of the next scrub
  requested_scrub_t m_planned_scrub;

  const requested_scrub_t& get_planned_scrub() const {
    return m_planned_scrub;
  }

  /// scrubbing state for both Primary & replicas
  bool is_scrub_active() const { return m_scrubber->is_scrub_active(); }

  /// set when the scrub request is queued, and reset after scrubbing fully
  bool is_scrub_queued_or_active() const { return m_scrubber->is_queued_or_active(); }
  ObjectStore::CollectionHandle ch;

  std::ostream& gen_prefix(std::ostream& out) const override;
  CephContext *get_cct() const override {
    return cct;
  }
  unsigned get_subsys() const override {
    return ceph_subsys_osd;
  }

  const char* const get_current_state() const {
    return recovery_state.get_current_state();
  }

  const OSDMapRef& get_osdmap() const {
    ceph_assert(is_locked());
    return recovery_state.get_osdmap();
  }

  epoch_t get_osdmap_epoch() const override final {
    return recovery_state.get_osdmap()->get_epoch();
  }

  PerfCounters &get_peering_perf() override;
  PerfCounters &get_perf_logger() override;
  void log_state_enter(const char *state) override;
  void log_state_exit(
    const char *state_name, utime_t enter_time,
    uint64_t events, utime_t event_dur) override;

  void lock(bool no_lockdep = false) const;
  void unlock() const;
  bool is_locked() const;
  const spg_t& get_pgid() const {
    return info.pgid;
  }

  const PGPool& get_pgpool() const final {
    return pool;
  }
  uint64_t get_last_user_version() const {
    return info.last_user_version;
  }
  const pg_history_t& get_history() const {
    return info.history;
  }
  bool get_need_up_thru() const {
    return recovery_state.get_need_up_thru();
  }
  epoch_t get_same_interval_since() const {
    return info.history.same_interval_since;
  }

  bool is_waiting_for_unreadable_object() const final
  {
    return !waiting_for_unreadable_object.empty();
  }
  static void set_last_scrub_stamp(
    utime_t t, pg_history_t &history, pg_stat_t &stats) {
    stats.last_scrub_stamp = t;
    history.last_scrub_stamp = t;
  }

  void set_last_scrub_stamp(utime_t t) {
    recovery_state.update_stats(
      [t](auto &history, auto &stats) {
        set_last_scrub_stamp(t, history, stats);
        return true;
      });
  }

  static void set_last_deep_scrub_stamp(
    utime_t t, pg_history_t &history, pg_stat_t &stats) {
    stats.last_deep_scrub_stamp = t;
    history.last_deep_scrub_stamp = t;
  }

  void set_last_deep_scrub_stamp(utime_t t) {
    recovery_state.update_stats(
      [t](auto &history, auto &stats) {
        set_last_deep_scrub_stamp(t, history, stats);
        return true;
      });
  }

  static void add_objects_scrubbed_count(
    int64_t count, pg_stat_t &stats) {
    stats.objects_scrubbed += count;
  }

  void add_objects_scrubbed_count(int64_t count) {
    recovery_state.update_stats(
      [count](auto &history, auto &stats) {
        add_objects_scrubbed_count(count, stats);
        return true;
      });
  }

  static void reset_objects_scrubbed(pg_stat_t &stats) {
    stats.objects_scrubbed = 0;
  }

  void reset_objects_scrubbed()
  {
    recovery_state.update_stats([](auto& history, auto& stats) {
      reset_objects_scrubbed(stats);
      return true;
    });
  }
  bool is_deleting() const {
    return recovery_state.is_deleting();
  }
  bool is_deleted() const {
    return recovery_state.is_deleted();
  }
  bool is_nonprimary() const {
    return recovery_state.is_nonprimary();
  }
  bool is_primary() const {
    return recovery_state.is_primary();
  }
  bool pg_has_reset_since(epoch_t e) {
    ceph_assert(is_locked());
    return recovery_state.pg_has_reset_since(e);
  }

  bool is_ec_pg() const {
    return recovery_state.is_ec_pg();
  }
  int get_role() const {
    return recovery_state.get_role();
  }
  const std::vector<int> get_acting() const {
    return recovery_state.get_acting();
  }
  const std::set<pg_shard_t> &get_actingset() const {
    return recovery_state.get_actingset();
  }
  int get_acting_primary() const {
    return recovery_state.get_acting_primary();
  }
  pg_shard_t get_primary() const final {
    return recovery_state.get_primary();
  }
  const std::vector<int> get_up() const {
    return recovery_state.get_up();
  }
  int get_up_primary() const {
    return recovery_state.get_up_primary();
  }
  const PastIntervals& get_past_intervals() const {
    return recovery_state.get_past_intervals();
  }
  bool is_acting_recovery_backfill(pg_shard_t osd) const {
    return recovery_state.is_acting_recovery_backfill(osd);
  }
  const std::set<pg_shard_t> &get_acting_recovery_backfill() const {
    return recovery_state.get_acting_recovery_backfill();
  }
  bool is_acting(pg_shard_t osd) const {
    return recovery_state.is_acting(osd);
  }
  bool is_up(pg_shard_t osd) const {
    return recovery_state.is_up(osd);
  }
  static bool has_shard(bool ec, const std::vector<int>& v, pg_shard_t osd) {
    return PeeringState::has_shard(ec, v, osd);
  }
  /// initialize created PG
  void init(
    int role,
    const std::vector<int>& up,
    int up_primary,
    const std::vector<int>& acting,
    int acting_primary,
    const pg_history_t& history,
    const PastIntervals& pim,
    ObjectStore::Transaction &t);

  /// read existing pg state off disk
  void read_state(ObjectStore *store);
  static int peek_map_epoch(ObjectStore *store, spg_t pgid, epoch_t *pepoch);
  static int get_latest_struct_v() {
    return pg_latest_struct_v;
  }
  static int get_compat_struct_v() {
    return pg_compat_struct_v;
  }
  static int read_info(
    ObjectStore *store, spg_t pgid, const coll_t &coll,
    pg_info_t &info, PastIntervals &past_intervals,
    __u8 &struct_v);
  static bool _has_removal_flag(ObjectStore *store, spg_t pgid);
  void rm_backoff(const ceph::ref_t<Backoff>& b);

  void update_snap_mapper_bits(uint32_t bits) {
    snap_mapper.update_bits(bits);
  }
  void start_split_stats(const std::set<spg_t>& childpgs, std::vector<object_stat_sum_t> *v);
  virtual void split_colls(
    spg_t child,
    int split_bits,
    int seed,
    const pg_pool_t *pool,
    ObjectStore::Transaction &t) = 0;
  void split_into(pg_t child_pgid, PG *child, unsigned split_bits);
  void merge_from(std::map<spg_t,PGRef>& sources, PeeringCtx &rctx,
                  unsigned split_bits,
                  const pg_merge_meta_t& last_pg_merge_meta);
  void finish_split_stats(const object_stat_sum_t& stats,
                          ObjectStore::Transaction &t);
  void scrub(epoch_t queued, ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::initiate_regular_scrub, queued, "StartScrub");
  }

  /**
   * a special version of PG::scrub(), which:
   * - is initiated after repair, and
   * (not true anymore:)
   * - is not required to allocate local/remote OSD scrub resources
   */
  void recovery_scrub(epoch_t queued, ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::initiate_scrub_after_repair, queued,
                        "AfterRepairScrub");
  }

  void replica_scrub(epoch_t queued,
                     Scrub::act_token_t act_token,
                     ThreadPool::TPHandle& handle);

  void replica_scrub_resched(epoch_t queued,
                             Scrub::act_token_t act_token,
                             ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::send_sched_replica, queued, act_token,
                        "SchedReplica");
  }

  void scrub_send_resources_granted(epoch_t queued, ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::send_remotes_reserved, queued, "RemotesReserved");
  }

  void scrub_send_resources_denied(epoch_t queued, ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::send_reservation_failure, queued,
                        "ReservationFailure");
  }

  void scrub_send_scrub_resched(epoch_t queued, ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::send_scrub_resched, queued, "InternalSchedScrub");
  }

  void scrub_send_pushes_update(epoch_t queued, ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::active_pushes_notification, queued,
                        "ActivePushesUpd");
  }

  void scrub_send_applied_update(epoch_t queued, ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::update_applied_notification, queued,
                        "UpdatesApplied");
  }

  void scrub_send_unblocking(epoch_t queued, ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::send_scrub_unblock, queued, "Unblocked");
  }

  void scrub_send_digest_update(epoch_t queued, ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::digest_update_notification, queued, "DigestUpdate");
  }

  void scrub_send_local_map_ready(epoch_t queued, ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::send_local_map_done, queued, "IntLocalMapDone");
  }

  void scrub_send_replmaps_ready(epoch_t queued, ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::send_replica_maps_ready, queued, "GotReplicas");
  }

  void scrub_send_replica_pushes(epoch_t queued, ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::send_replica_pushes_upd, queued,
                        "ReplicaPushesUpd");
  }

  void scrub_send_get_next_chunk(epoch_t queued, ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::send_get_next_chunk, queued, "NextChunk");
  }

  void scrub_send_scrub_is_finished(epoch_t queued, ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::send_scrub_is_finished, queued, "ScrubFinished");
  }

  void scrub_send_chunk_free(epoch_t queued, ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::send_chunk_free, queued, "SelectedChunkFree");
  }

  void scrub_send_chunk_busy(epoch_t queued, ThreadPool::TPHandle& handle)
  {
    forward_scrub_event(&ScrubPgIF::send_chunk_busy, queued, "ChunkIsBusy");
  }
  void queue_want_pg_temp(const std::vector<int> &wanted) override;
  void clear_want_pg_temp() override;

  void on_new_interval() override;

  void on_role_change() override;
  virtual void plpg_on_role_change() = 0;

  void init_collection_pool_opts();
  void on_pool_change() override;
  virtual void plpg_on_pool_change() = 0;

  void on_info_history_change() override;

  void on_primary_status_change(bool was_primary, bool now_primary) override;

  void reschedule_scrub() override;

  void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) override;

  uint64_t get_snap_trimq_size() const override {
    return snap_trimq.size();
  }
  static void add_objects_trimmed_count(
    int64_t count, pg_stat_t &stats) {
    stats.objects_trimmed += count;
  }

  void add_objects_trimmed_count(int64_t count) {
    recovery_state.update_stats_wo_resched(
      [count](auto &history, auto &stats) {
        add_objects_trimmed_count(count, stats);
      });
  }

  static void reset_objects_trimmed(pg_stat_t &stats) {
    stats.objects_trimmed = 0;
  }

  void reset_objects_trimmed() {
    recovery_state.update_stats_wo_resched(
      [](auto &history, auto &stats) {
        reset_objects_trimmed(stats);
      });
  }

  utime_t snaptrim_begin_stamp;

  void set_snaptrim_begin_stamp() {
    snaptrim_begin_stamp = ceph_clock_now();
  }

  void set_snaptrim_duration() {
    utime_t cur_stamp = ceph_clock_now();
    utime_t duration = cur_stamp - snaptrim_begin_stamp;
    recovery_state.update_stats_wo_resched(
      [duration](auto &history, auto &stats) {
        stats.snaptrim_duration = double(duration);
      });
  }
  unsigned get_target_pg_log_entries() const override;

  void clear_publish_stats() override;
  void clear_primary_state() override;

  epoch_t cluster_osdmap_trim_lower_bound() override;
  OstreamTemp get_clog_error() override;
  OstreamTemp get_clog_info() override;
  OstreamTemp get_clog_debug() override;

  void schedule_event_after(
    PGPeeringEventRef event,
    float delay) override;
  void request_local_background_io_reservation(
    unsigned priority,
    PGPeeringEventURef on_grant,
    PGPeeringEventURef on_preempt) override;
  void update_local_background_io_priority(
    unsigned priority) override;
  void cancel_local_background_io_reservation() override;

  void request_remote_recovery_reservation(
    unsigned priority,
    PGPeeringEventURef on_grant,
    PGPeeringEventURef on_preempt) override;
  void cancel_remote_recovery_reservation() override;

  void schedule_event_on_commit(
    ObjectStore::Transaction &t,
    PGPeeringEventRef on_commit) override;

  void on_active_exit() override;

  Context *on_clean() override {
    requeue_ops(waiting_for_clean_to_primary_repair);
    return finish_recovery();
  }
  void on_activate(interval_set<snapid_t> snaps) override;

  void on_activate_committed() override;

  void on_active_actmap() override;
  void on_active_advmap(const OSDMapRef &osdmap) override;

  void queue_snap_retrim(snapid_t snap);

  void on_backfill_reserved() override;
  void on_backfill_canceled() override;
  void on_recovery_reserved() override;

  bool is_forced_recovery_or_backfill() const {
    return recovery_state.is_forced_recovery_or_backfill();
  }

  PGLog::LogEntryHandlerRef get_log_handler(
    ObjectStore::Transaction &t) override {
    return std::make_unique<PG::PGLogEntryHandler>(this, &t);
  }

  std::pair<ghobject_t, bool> do_delete_work(ObjectStore::Transaction &t,
                                             ghobject_t _next) override;

  void clear_ready_to_merge() override;
  void set_not_ready_to_merge_target(pg_t pgid, pg_t src) override;
  void set_not_ready_to_merge_source(pg_t pgid) override;
  void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) override;
  void set_ready_to_merge_source(eversion_t lu) override;

  void send_pg_created(pg_t pgid) override;

  ceph::signedspan get_mnow() const override;
  HeartbeatStampsRef get_hb_stamps(int peer) override;
  void schedule_renew_lease(epoch_t lpr, ceph::timespan delay) override;
  void queue_check_readable(epoch_t lpr, ceph::timespan delay) override;

  void rebuild_missing_set_with_deletes(PGLog &pglog) override;

  void queue_peering_event(PGPeeringEventRef evt);
  void do_peering_event(PGPeeringEventRef evt, PeeringCtx &rcx);
  void queue_null(epoch_t msg_epoch, epoch_t query_epoch);
  void queue_flushed(epoch_t started_at);
  void handle_advance_map(
    OSDMapRef osdmap, OSDMapRef lastmap,
    std::vector<int>& newup, int up_primary,
    std::vector<int>& newacting, int acting_primary,
    PeeringCtx &rctx);
  void handle_activate_map(PeeringCtx &rctx);
  void handle_initialize(PeeringCtx &rctx);
  void handle_query_state(ceph::Formatter *f);

  /**
   * @param ops_begun returns how many recovery ops the function started
   * @returns true if any useful work was accomplished; false otherwise
   */
  virtual bool start_recovery_ops(
    uint64_t max,
    ThreadPool::TPHandle &handle,
    uint64_t *ops_begun) = 0;
  // more work after the above, but with a PeeringCtx
  void find_unfound(epoch_t queued, PeeringCtx &rctx);

  virtual void get_watchers(std::list<obj_watch_item_t> *ls) = 0;

  void dump_pgstate_history(ceph::Formatter *f);
  void dump_missing(ceph::Formatter *f);

  void with_pg_stats(ceph::coarse_real_clock::time_point now_is,
                     std::function<void(const pg_stat_t&, epoch_t lec)>&& f);
  void with_heartbeat_peers(std::function<void(int)>&& f);

  virtual void on_shutdown() = 0;

  bool get_must_scrub() const;
  Scrub::schedule_result_t sched_scrub();

  unsigned int scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsigned int suggested_priority) const;
  /// the version that refers to flags_.priority
  unsigned int scrub_requeue_priority(Scrub::scrub_prio_t with_priority) const;
  // auxiliaries used by sched_scrub():
  double next_deepscrub_interval() const;

  /// should we perform deep scrub?
  bool is_time_for_deep(bool allow_deep_scrub,
                        bool allow_shallow_scrub,
                        bool has_deep_errors,
                        const requested_scrub_t& planned) const;

  /**
   * Validate the various 'next scrub' flags in m_planned_scrub against configuration
   * and scrub-related timestamps.
   *
   * @returns an updated copy of the m_planned_flags (or nothing if no scrubbing)
   */
  std::optional<requested_scrub_t> validate_scrub_mode() const;
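  // Illustrative only: a hedged sketch of how the std::optional returned by
  // validate_scrub_mode() might be consumed ('upd' is a hypothetical local;
  // the real call site lives in the scrub scheduling code):
  //
  //   if (auto upd = validate_scrub_mode(); upd) {
  //     // scrubbing goes ahead, using the validated copy of the flags
  //   } else {
  //     // no scrubbing this time around
  //   }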
  std::optional<requested_scrub_t> validate_periodic_mode(
    bool allow_deep_scrub,
    bool try_to_auto_repair,
    bool allow_shallow_scrub,
    bool time_for_deep,
    bool has_deep_errors,
    const requested_scrub_t& planned) const;

  std::optional<requested_scrub_t> validate_initiated_scrub(
    bool allow_deep_scrub,
    bool try_to_auto_repair,
    bool time_for_deep,
    bool has_deep_errors,
    const requested_scrub_t& planned) const;
  using ScrubAPI = void (ScrubPgIF::*)(epoch_t epoch_queued);
  void forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued, std::string_view desc);
  // and for events that carry a meaningful 'activation token'
  using ScrubSafeAPI = void (ScrubPgIF::*)(epoch_t epoch_queued,
                                           Scrub::act_token_t act_token);
  void forward_scrub_event(ScrubSafeAPI fn,
                           epoch_t epoch_queued,
                           Scrub::act_token_t act_token,
                           std::string_view desc);
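  // Illustrative only: forward_scrub_event() dispatches through a
  // pointer-to-member of ScrubPgIF. A minimal sketch of that dispatch (the
  // real implementation lives in PG.cc and also checks PG/scrubber state):
  //
  //   ScrubAPI fn = &ScrubPgIF::send_scrub_unblock;  // e.g. from scrub_send_unblocking()
  //   (m_scrubber.get()->*fn)(epoch_queued);         // invoke through the member pointer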
  virtual void do_request(
    OpRequestRef& op,
    ThreadPool::TPHandle &handle
  ) = 0;
  virtual void clear_cache() = 0;
  virtual int get_cache_obj_count() = 0;

  virtual void snap_trimmer(epoch_t epoch_queued) = 0;
  virtual void do_command(
    const std::string_view& prefix,
    const cmdmap_t& cmdmap,
    const ceph::buffer::list& idata,
    std::function<void(int,const std::string&,ceph::buffer::list&)> on_finish) = 0;

  virtual bool agent_work(int max) = 0;
  virtual bool agent_work(int max, int agent_flush_quota) = 0;
  virtual void agent_stop() = 0;
  virtual void agent_delay() = 0;
  virtual void agent_clear() = 0;
  virtual void agent_choose_mode_restart() = 0;
  struct C_DeleteMore : public Context {
    PGRef pg;
    epoch_t epoch;
    C_DeleteMore(PG *p, epoch_t e) : pg(p), epoch(e) {}
    void finish(int r) override {
      ceph_abort();
    }
    void complete(int r) override;
  };

  virtual void set_dynamic_perf_stats_queries(
    const std::list<OSDPerfMetricQuery> &queries) {
  }
  virtual void get_dynamic_perf_stats(DynamicPerfStats *stats) {
  }

  uint64_t get_min_alloc_size() const;

  // reference counting
#ifdef PG_DEBUG_REFS
  uint64_t get_with_id();
  void put_with_id(uint64_t);
  void dump_live_ids();
#endif
  void get(const char* tag);
  void put(const char* tag);
  PG(OSDService *o, OSDMapRef curmap,
     const PGPool &pool, spg_t p);

  explicit PG(const PG& rhs) = delete;
  PG& operator=(const PG& rhs) = delete;

  OSDShard *osd_shard = nullptr;
  OSDShardPGSlot *pg_slot = nullptr;
  // locking and reference counting.
  // I destroy myself when the reference count hits zero.
  // lock() should be called before doing anything.
  // get() should be called on pointer copy (to another thread, etc.).
  // put() should be called on destruction of some previously copied pointer.
  // unlock() when done with the current pointer (_most common_).
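  //
  // Illustrative only: the intended calling pattern for the rules above,
  // sketched with a hypothetical caller (not code from this file):
  //
  //   PGRef pg = ...;     // copy takes a ref (get()/intrusive_ptr_add_ref())
  //   pg->lock();         // take PG::_lock before touching the PG
  //   ...                 // do work while locked
  //   pg->unlock();       // release the lock when done
  //   pg = nullptr;       // drop the ref (put()/intrusive_ptr_release());
  //                       // the PG deletes itself when the count hits zero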
  mutable ceph::mutex _lock = ceph::make_mutex("PG::_lock");
#ifndef CEPH_DEBUG_MUTEX
  mutable std::thread::id locked_by;
#endif
  std::atomic<unsigned int> ref{0};

#ifdef PG_DEBUG_REFS
  ceph::mutex _ref_id_lock = ceph::make_mutex("PG::_ref_id_lock");
  std::map<uint64_t, std::string> _live_ids;
  std::map<std::string, uint64_t> _tag_counts;
  uint64_t _ref_id = 0;

  friend uint64_t get_with_id(PG *pg) { return pg->get_with_id(); }
  friend void put_with_id(PG *pg, uint64_t id) { return pg->put_with_id(id); }
#endif

  friend void intrusive_ptr_add_ref(PG *pg) {
    pg->get("intptr");
  }
  friend void intrusive_ptr_release(PG *pg) {
    pg->put("intptr");
  }
  // =====================

  SnapMapper snap_mapper;

  virtual PGBackend *get_pgbackend() = 0;
  virtual const PGBackend *get_pgbackend() const = 0;

  void requeue_map_waiters();

  ZTracer::Endpoint trace_endpoint;

  __u8 info_struct_v = 0;
  void upgrade(ObjectStore *store);

  ghobject_t pgmeta_oid;

  // ------------------
  interval_set<snapid_t> snap_trimq;
  std::set<snapid_t> snap_trimq_repeat;
  /* You should not use these items without taking their respective queue locks
   * (if they have one) */
  xlist<PG*>::item stat_queue_item;
  bool recovery_queued;

  int recovery_ops_active;
  std::set<pg_shard_t> waiting_on_backfill;
#ifdef DEBUG_RECOVERY_OIDS
  multiset<hobject_t> recovering_oids;
#endif

  bool dne() { return info.dne(); }

  void send_cluster_message(
    int osd, MessageRef m, epoch_t epoch, bool share_map_update) override;

  epoch_t get_last_peering_reset() const {
    return recovery_state.get_last_peering_reset();
  }
  /* heartbeat peers */
  void set_probe_targets(const std::set<pg_shard_t> &probe_set) override;
  void clear_probe_targets() override;

  ceph::mutex heartbeat_peer_lock =
    ceph::make_mutex("PG::heartbeat_peer_lock");
  std::set<int> heartbeat_peers;
  std::set<int> probe_targets;

  BackfillInterval backfill_info;
  std::map<pg_shard_t, BackfillInterval> peer_backfill_info;
  bool backfill_reserving;
  // The primary's num_bytes and local num_bytes for this pg, only valid
  // during backfill for non-primary shards.
  // Both of these are adjusted for EC to reflect the on-disk bytes
  std::atomic<int64_t> primary_num_bytes = 0;
  std::atomic<int64_t> local_num_bytes = 0;

  // Space reserved for backfill is primary_num_bytes - local_num_bytes
  // Don't care that difference itself isn't atomic
  uint64_t get_reserved_num_bytes() {
    int64_t primary = primary_num_bytes.load();
    int64_t local = local_num_bytes.load();
    if (primary > local)
      return primary - local;
    else
      return 0;
  }
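  // Illustrative only (hypothetical numbers): if the primary reports
  // primary_num_bytes = 100 GiB for this shard while local_num_bytes is
  // 40 GiB, 60 GiB of backfill space is still considered reserved; once the
  // local copy catches up to the primary the reservation drops to zero.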
  bool is_remote_backfilling() {
    return primary_num_bytes.load() > 0;
  }

  bool try_reserve_recovery_space(int64_t primary, int64_t local) override;
  void unreserve_recovery_space() override;

  // If num_bytes is inconsistent and local_num_bytes goes negative
  // it's ok, because it would then be ignored.

  // The value of num_bytes could be negative,
  // but we don't let local_num_bytes go negative.
  void add_local_num_bytes(int64_t num_bytes) {
    if (num_bytes > 0) {
      int64_t prev_bytes = local_num_bytes.load();
      int64_t new_bytes;
      do {
        new_bytes = prev_bytes + num_bytes;
        if (new_bytes < 0)
          new_bytes = 0;
      } while(!local_num_bytes.compare_exchange_weak(prev_bytes, new_bytes));
    }
  }
  void sub_local_num_bytes(int64_t num_bytes) {
    ceph_assert(num_bytes >= 0);
    if (num_bytes > 0) {
      int64_t prev_bytes = local_num_bytes.load();
      int64_t new_bytes;
      do {
        new_bytes = prev_bytes - num_bytes;
        if (new_bytes < 0)
          new_bytes = 0;
      } while(!local_num_bytes.compare_exchange_weak(prev_bytes, new_bytes));
    }
  }
  // The value of num_bytes could be negative,
  // but we don't let info.stats.stats.sum.num_bytes go negative.
  void add_num_bytes(int64_t num_bytes) {
    ceph_assert(ceph_mutex_is_locked_by_me(_lock));
    if (num_bytes) {
      recovery_state.update_stats(
        [num_bytes](auto &history, auto &stats) {
          stats.stats.sum.num_bytes += num_bytes;
          if (stats.stats.sum.num_bytes < 0) {
            stats.stats.sum.num_bytes = 0;
          }
          return false;
        });
    }
  }
  void sub_num_bytes(int64_t num_bytes) {
    ceph_assert(ceph_mutex_is_locked_by_me(_lock));
    ceph_assert(num_bytes >= 0);
    if (num_bytes) {
      recovery_state.update_stats(
        [num_bytes](auto &history, auto &stats) {
          stats.stats.sum.num_bytes -= num_bytes;
          if (stats.stats.sum.num_bytes < 0) {
            stats.stats.sum.num_bytes = 0;
          }
          return false;
        });
    }
  }
  // Only used in testing so not worried about needing the PG lock here
  int64_t get_stats_num_bytes() {
    std::lock_guard l{_lock};
    int64_t num_bytes = info.stats.stats.sum.num_bytes;
    if (pool.info.is_erasure()) {
      num_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
      // Round up each object by a stripe
      num_bytes += get_pgbackend()->get_ec_stripe_chunk_size() * info.stats.stats.sum.num_objects;
    }
    int64_t lnb = local_num_bytes.load();
    if (lnb && lnb != num_bytes) {
      lgeneric_dout(cct, 0) << this << " " << info.pgid << " num_bytes mismatch "
                            << lnb << " vs stats "
                            << info.stats.stats.sum.num_bytes << " / chunk "
                            << get_pgbackend()->get_ec_data_chunk_count()
                            << dendl;
    }
    return num_bytes;
  }
  /*
   * blocked request wait hierarchy
   *
   * In order to preserve request ordering we need to be careful about the
   * order in which blocked requests get requeued.  Generally speaking, we
   * push the requests back up to the op_wq in reverse order (most recent
   * request first) so that they come back out again in the original order.
   * However, because there are multiple wait queues, we need to requeue
   * waitlists in order.  Generally speaking, we requeue the wait lists
   * that are checked first.
   *
   * Here are the various wait lists, in the order they are used during
   * request processing, with notes:
   *
   *  - waiting_for_map
   *    - may start or stop blocking at any time (depending on client epoch)
   *  - waiting_for_peered
   *    - !is_peered()
   *    - only starts blocking on interval change; never restarts
   *  - waiting_for_flush
   *    - flushes_in_progress
   *    - waiting for final flush during activate
   *  - waiting_for_active
   *    - !is_active()
   *    - only starts blocking on interval change; never restarts
   *  - waiting_for_readable
   *    - now > readable_until
   *    - unblocks when we get fresh(er) osd_pings
   *  - waiting_for_scrub
   *    - starts and stops blocking for varying intervals during scrub
   *  - waiting_for_unreadable_object
   *    - never restarts once object is readable (* except for EIO?)
   *  - waiting_for_degraded_object
   *    - never restarts once object is writeable (* except for EIO?)
   *  - waiting_for_blocked_object
   *    - starts and stops based on proxied op activity
   *  - obc rwlocks
   *    - starts and stops based on read/write activity
   *
   * Notes:
   *
   *  1. During an interval change, we requeue *everything* in the above order.
   *
   *  2. When an obc rwlock is released, we check for a scrub block and requeue
   *     the op there if it applies.  We ignore the unreadable/degraded/blocked
   *     queues because we assume they cannot apply at that time (this is
   *     probably mostly true).
   *
   *  3. The requeue_ops helper will push ops onto the waiting_for_map std::list
   *     if it is nonempty.
   *
   * These three behaviors are generally sufficient to maintain ordering, with
   * the possible exception of cases where we make an object degraded or
   * unreadable that was previously okay, e.g. when scrub or op processing
   * encounter an unexpected error.  FIXME.
   */
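  // Illustrative only: a sketch of the requeue rule above, not the actual
  // interval-change code. On an interval change every wait list is requeued
  // in the order the lists are checked, and requeue_ops() itself pushes a
  // single list back most-recent-first so the ops pop out of the op_wq in
  // their original order:
  //
  //   requeue_ops(waiting_for_peered);
  //   requeue_ops(waiting_for_flush);
  //   requeue_ops(waiting_for_active);
  //   ...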
  // ops with newer maps than our (or blocked behind them)
  // track these by client, since inter-request ordering doesn't otherwise
  // matter.
  std::unordered_map<entity_name_t,std::list<OpRequestRef>> waiting_for_map;

  // ops waiting on peered
  std::list<OpRequestRef> waiting_for_peered;

  /// ops waiting on readable
  std::list<OpRequestRef> waiting_for_readable;

  // ops waiting on active (require peered as well)
  std::list<OpRequestRef> waiting_for_active;
  std::list<OpRequestRef> waiting_for_flush;
  std::list<OpRequestRef> waiting_for_scrub;

  std::list<OpRequestRef> waiting_for_cache_not_full;
  std::list<OpRequestRef> waiting_for_clean_to_primary_repair;
  std::map<hobject_t, std::list<OpRequestRef>> waiting_for_unreadable_object,
                                               waiting_for_degraded_object,
                                               waiting_for_blocked_object;

  std::set<hobject_t> objects_blocked_on_cache_full;
  std::map<hobject_t,snapid_t> objects_blocked_on_degraded_snap;
  std::map<hobject_t,ObjectContextRef> objects_blocked_on_snap_promotion;

  // Callbacks should assume pg (and nothing else) is locked
  std::map<hobject_t, std::list<Context*>> callbacks_for_degraded_object;

  std::map<eversion_t,
           std::list<
             std::tuple<OpRequestRef, version_t, int,
                        std::vector<pg_log_op_return_item_t>>>> waiting_for_ondisk;

  void requeue_object_waiters(std::map<hobject_t, std::list<OpRequestRef>>& m);
  void requeue_op(OpRequestRef op);
  void requeue_ops(std::list<OpRequestRef> &l);
  // stats that persist lazily
  object_stat_collection_t unstable_stats;

  ceph::mutex pg_stats_publish_lock =
    ceph::make_mutex("PG::pg_stats_publish_lock");
  std::optional<pg_stat_t> pg_stats_publish;

  friend class TestOpsSocketHook;
  void publish_stats_to_osd() override;

  bool needs_recovery() const {
    return recovery_state.needs_recovery();
  }
  bool needs_backfill() const {
    return recovery_state.needs_backfill();
  }

  bool all_unfound_are_queried_or_lost(const OSDMapRef osdmap) const;
  struct PGLogEntryHandler : public PGLog::LogEntryHandler {
    PG *pg;
    ObjectStore::Transaction *t;
    PGLogEntryHandler(PG *pg, ObjectStore::Transaction *t) : pg(pg), t(t) {}

    void remove(const hobject_t &hoid) override {
      pg->get_pgbackend()->remove(hoid, t);
    }
    void try_stash(const hobject_t &hoid, version_t v) override {
      pg->get_pgbackend()->try_stash(hoid, v, t);
    }
    void rollback(const pg_log_entry_t &entry) override {
      ceph_assert(entry.can_rollback());
      pg->get_pgbackend()->rollback(entry, t);
    }
    void rollforward(const pg_log_entry_t &entry) override {
      pg->get_pgbackend()->rollforward(entry, t);
    }
    void trim(const pg_log_entry_t &entry) override {
      pg->get_pgbackend()->trim(entry, t);
    }
  };

  void update_object_snap_mapping(
    ObjectStore::Transaction *t, const hobject_t &soid,
    const std::set<snapid_t> &snaps);
  void clear_object_snap_mapping(
    ObjectStore::Transaction *t, const hobject_t &soid);
  void remove_snap_mapped_object(
    ObjectStore::Transaction& t, const hobject_t& soid);
  bool have_unfound() const {
    return recovery_state.have_unfound();
  }
  uint64_t get_num_unfound() const {
    return recovery_state.get_num_unfound();
  }

  virtual void check_local() = 0;

  void purge_strays();

  void update_heartbeat_peers(std::set<int> peers) override;

  Context *finish_sync_event;

  Context *finish_recovery();
  void _finish_recovery(Context *c);
  struct C_PG_FinishRecovery : public Context {
    PGRef pg;
    explicit C_PG_FinishRecovery(PG *p) : pg(p) {}
    void finish(int r) override {
      pg->_finish_recovery(this);
    }
  };
  void cancel_recovery();
  void clear_recovery_state();
  virtual void _clear_recovery_state() = 0;
  void start_recovery_op(const hobject_t& soid);
  void finish_recovery_op(const hobject_t& soid, bool dequeue=false);

  virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits) = 0;

  friend class C_OSD_RepModify_Commit;
  friend struct C_DeleteMore;
  ceph::mutex backoff_lock =  // orders inside Backoff::lock
    ceph::make_mutex("PG::backoff_lock");
  std::map<hobject_t,std::set<ceph::ref_t<Backoff>>> backoffs;

  void add_backoff(const ceph::ref_t<Session>& s, const hobject_t& begin, const hobject_t& end);
  void release_backoffs(const hobject_t& begin, const hobject_t& end);
  void release_backoffs(const hobject_t& o) {
    release_backoffs(o, o);
  }
  void clear_backoffs();

  void add_pg_backoff(const ceph::ref_t<Session>& s) {
    hobject_t begin = info.pgid.pgid.get_hobj_start();
    hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
    add_backoff(s, begin, end);
  }

  void release_pg_backoffs() {
    hobject_t begin = info.pgid.pgid.get_hobj_start();
    hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
    release_backoffs(begin, end);
  }
  bool scrub_after_recovery;

  [[nodiscard]] bool ops_blocked_by_scrub() const;
  [[nodiscard]] Scrub::scrub_prio_t is_scrub_blocking_ops() const;

  void _scan_rollback_obs(const std::vector<ghobject_t> &rollback_obs);

  /**
   * returns true if [begin, end) is good to scrub at this time
   * a false return value obliges the implementer to requeue scrub when the
   * condition preventing scrub clears
   */
  virtual bool _range_available_for_scrub(
    const hobject_t &begin, const hobject_t &end) = 0;

  /**
   * Initiate the process that will create our scrub map for the Primary.
   * (triggered by MSG_OSD_REP_SCRUB)
   */
  void replica_scrub(OpRequestRef op, ThreadPool::TPHandle &handle);
  // -- recovery state --

  struct QueuePeeringEvt : Context {
    PGRef pg;
    PGPeeringEventRef evt;

    template <class EVT>
    QueuePeeringEvt(PG *pg, epoch_t epoch, EVT evt) :
      pg(pg), evt(std::make_shared<PGPeeringEvent>(epoch, epoch, evt)) {}

    QueuePeeringEvt(PG *pg, PGPeeringEventRef evt) :
      pg(pg), evt(std::move(evt)) {}

    void finish(int r) override {
      pg->lock();
      pg->queue_peering_event(std::move(evt));
      pg->unlock();
    }
  };

  int pg_stat_adjust(osd_stat_t *new_stat);

  bool delete_needs_sleep = false;
  bool state_test(uint64_t m) const { return recovery_state.state_test(m); }
  void state_set(uint64_t m) { recovery_state.state_set(m); }
  void state_clear(uint64_t m) { recovery_state.state_clear(m); }

  bool is_complete() const {
    return recovery_state.is_complete();
  }
  bool should_send_notify() const {
    return recovery_state.should_send_notify();
  }

  bool is_active() const { return recovery_state.is_active(); }
  bool is_activating() const { return recovery_state.is_activating(); }
  bool is_peering() const { return recovery_state.is_peering(); }
  bool is_down() const { return recovery_state.is_down(); }
  bool is_recovery_unfound() const { return recovery_state.is_recovery_unfound(); }
  bool is_backfill_unfound() const { return recovery_state.is_backfill_unfound(); }
  bool is_incomplete() const { return recovery_state.is_incomplete(); }
  bool is_clean() const { return recovery_state.is_clean(); }
  bool is_degraded() const { return recovery_state.is_degraded(); }
  bool is_undersized() const { return recovery_state.is_undersized(); }
  bool is_scrubbing() const { return state_test(PG_STATE_SCRUBBING); } // Primary only
  bool is_remapped() const { return recovery_state.is_remapped(); }
  bool is_peered() const { return recovery_state.is_peered(); }
  bool is_recovering() const { return recovery_state.is_recovering(); }
  bool is_premerge() const { return recovery_state.is_premerge(); }
  bool is_repair() const { return recovery_state.is_repair(); }
  bool is_laggy() const { return state_test(PG_STATE_LAGGY); }
  bool is_wait() const { return state_test(PG_STATE_WAIT); }

  bool is_empty() const { return recovery_state.is_empty(); }

  void do_pending_flush();
  void prepare_write(
    pg_info_t &info,
    pg_info_t &last_written_info,
    PastIntervals &past_intervals,
    PGLog &pglog,
    bool dirty_info,
    bool dirty_big_info,
    bool need_write_epoch,
    ObjectStore::Transaction &t) override;

  void write_if_dirty(PeeringCtx &rctx) {
    write_if_dirty(rctx.transaction);
  }

  void write_if_dirty(ObjectStore::Transaction& t) {
    recovery_state.write_if_dirty(t);
  }

  PGLog::IndexedLog projected_log;
  bool check_in_progress_op(
    const osd_reqid_t &r,
    eversion_t *version,
    version_t *user_version,
    int *return_code,
    std::vector<pg_log_op_return_item_t> *op_returns) const;
  eversion_t projected_last_update;
  eversion_t get_next_version() const {
    eversion_t at_version(
      get_osdmap_epoch(),
      projected_last_update.version+1);
    ceph_assert(at_version > info.last_update);
    ceph_assert(at_version > recovery_state.get_pg_log().get_head());
    ceph_assert(at_version > projected_last_update);
    return at_version;
  }

  bool check_log_for_corruption(ObjectStore *store);

  std::string get_corrupt_pg_log_name() const;
  void update_snap_map(
    const std::vector<pg_log_entry_t> &log_entries,
    ObjectStore::Transaction& t);

  void filter_snapc(std::vector<snapid_t> &snaps);

  virtual void kick_snap_trim() = 0;
  virtual void snap_trimmer_scrub_complete() = 0;

  void queue_recovery();
  void queue_scrub_after_repair();
  unsigned int get_scrub_priority();

  bool try_flush_or_schedule_async() override;
  void start_flush_on_transaction(
    ObjectStore::Transaction &t) override;

  void update_history(const pg_history_t& history) {
    recovery_state.update_history(history);
  }

  // OpRequest queueing
  bool can_discard_op(OpRequestRef& op);
  bool can_discard_scan(OpRequestRef op);
  bool can_discard_backfill(OpRequestRef op);
  bool can_discard_request(OpRequestRef& op);

  template<typename T, int MSGTYPE>
  bool can_discard_replica_op(OpRequestRef& op);

  bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
  bool old_peering_evt(PGPeeringEventRef evt) {
    return old_peering_msg(evt->get_epoch_sent(), evt->get_epoch_requested());
  }
  bool have_same_or_newer_map(epoch_t e) {
    return e <= get_osdmap_epoch();
  }

  bool op_has_sufficient_caps(OpRequestRef& op);

  friend struct FlushState;

  friend ostream& operator<<(ostream& out, const PG& pg);
  PeeringState recovery_state;

  // ref to recovery_state.pool
  const PGPool &pool;

  // ref to recovery_state.info
  const pg_info_t &info;

  // ScrubberPasskey getters/misc:
  const pg_info_t& get_pg_info(ScrubberPasskey) const final { return info; }

  OSDService* get_pg_osd(ScrubberPasskey) const { return osd; }

  requested_scrub_t& get_planned_scrub(ScrubberPasskey)
  {
    return m_planned_scrub;
  }

  void force_object_missing(ScrubberPasskey,
                            const std::set<pg_shard_t>& peer,
                            const hobject_t& oid,
                            eversion_t version) final
  {
    recovery_state.force_object_missing(peer, oid, version);
  }

  uint64_t logical_to_ondisk_size(uint64_t logical_size) const final
  {
    return get_pgbackend()->be_get_ondisk_size(logical_size);
  }