// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#include <boost/statechart/custom_reaction.hpp>
#include <boost/statechart/event.hpp>
#include <boost/statechart/simple_state.hpp>
#include <boost/statechart/state.hpp>
#include <boost/statechart/state_machine.hpp>
#include <boost/statechart/transition.hpp>
#include <boost/statechart/event_base.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/container/flat_set.hpp>
#include "include/mempool.h"

// re-include our assert to clobber boost's
#include "include/ceph_assert.h"
#include "include/common_fwd.h"

#include "include/types.h"
#include "include/stringify.h"
#include "osd_types.h"
#include "include/xlist.h"
#include "SnapMapper.h"

#include "common/Timer.h"

#include "messages/MOSDPGLog.h"
#include "include/str_list.h"
#include "PGBackend.h"
#include "PGPeeringEvent.h"
#include "PeeringState.h"
#include "MissingLoc.h"

#include "mgr/OSDPerfMetricTypes.h"

//#define DEBUG_RECOVERY_OIDS   // track set of recovering oids explicitly, to find counting bugs
//#define PG_DEBUG_REFS    // track provenance of pg refs, helpful for finding leaks
typedef OpRequest::Ref OpRequestRef;
class DynamicPerfStats;
81 #include "common/tracked_int_ptr.hpp"
82 uint64_t get_with_id(PG
*pg
);
83 void put_with_id(PG
*pg
, uint64_t id
);
84 typedef TrackedIntPtr
<PG
> PGRef
;
86 typedef boost::intrusive_ptr
<PG
> PGRef
;
class PGRecoveryStats {
  struct per_state_info {
    uint64_t enter, exit;     // enter/exit counts
    uint64_t events;
    utime_t event_time;       // time spent processing events
    utime_t total_time;       // total time in state
    utime_t min_time, max_time;

    // cppcheck-suppress unreachableCode
    per_state_info() : enter(0), exit(0), events(0) {}
  };
  map<const char *,per_state_info> info;
  ceph::mutex lock = ceph::make_mutex("PGRecoverStats::lock");

public:
  PGRecoveryStats() = default;
  void reset() {
    std::lock_guard l(lock);
    info.clear();
  }
  void dump(ostream& out) {
    std::lock_guard l(lock);
    for (map<const char *,per_state_info>::iterator p = info.begin(); p != info.end(); ++p) {
      per_state_info& i = p->second;
      out << i.enter << "\t" << i.exit << "\t"
          << i.events << "\t" << i.event_time << "\t"
          << i.total_time << "\t"
          << i.min_time << "\t" << i.max_time << "\t"
          << p->first << "\n";
    }
  }
  void dump_formatted(Formatter *f) {
    std::lock_guard l(lock);
    f->open_array_section("pg_recovery_stats");
    for (map<const char *,per_state_info>::iterator p = info.begin();
         p != info.end(); ++p) {
      per_state_info& i = p->second;
      f->open_object_section("recovery_state");
      f->dump_int("enter", i.enter);
      f->dump_int("exit", i.exit);
      f->dump_int("events", i.events);
      f->dump_stream("event_time") << i.event_time;
      f->dump_stream("total_time") << i.total_time;
      f->dump_stream("min_time") << i.min_time;
      f->dump_stream("max_time") << i.max_time;
      vector<string> states;
      get_str_vec(p->first, "/", states);
      f->open_array_section("nested_states");
      for (vector<string>::iterator st = states.begin();
           st != states.end(); ++st) {
        f->dump_string("state", *st);
      }
      f->close_section();
      f->close_section();
    }
    f->close_section();
  }
  void log_enter(const char *s) {
    std::lock_guard l(lock);
    info[s].enter++;
  }
  void log_exit(const char *s, utime_t dur, uint64_t events, utime_t event_dur) {
    std::lock_guard l(lock);
    per_state_info &i = info[s];
    i.exit++;
    i.total_time += dur;
    if (dur > i.max_time)
      i.max_time = dur;
    if (dur < i.min_time || i.min_time == utime_t())
      i.min_time = dur;
    i.events += events;
    i.event_time += event_dur;
  }
};
/** PG - Replica Placement Group
 *
 */

class PG : public DoutPrefixProvider, public PeeringState::PeeringListener {
  friend class NamedState;
  friend class PeeringState;

  const pg_shard_t pg_whoami;

  ObjectStore::CollectionHandle ch;
  std::ostream& gen_prefix(std::ostream& out) const override;
  CephContext *get_cct() const override {
    return cct;
  }
  unsigned get_subsys() const override {
    return ceph_subsys_osd;
  }

  const char* const get_current_state() const {
    return recovery_state.get_current_state();
  }
  const OSDMapRef& get_osdmap() const {
    ceph_assert(is_locked());
    return recovery_state.get_osdmap();
  }

  epoch_t get_osdmap_epoch() const override final {
    return recovery_state.get_osdmap()->get_epoch();
  }
  PerfCounters &get_peering_perf() override;
  PerfCounters &get_perf_logger() override;
  void log_state_enter(const char *state) override;
  void log_state_exit(
    const char *state_name, utime_t enter_time,
    uint64_t events, utime_t event_dur) override;
  void lock_suspend_timeout(ThreadPool::TPHandle &handle) {
    handle.suspend_tp_timeout();
    lock();
    handle.reset_tp_timeout();
  }
  void lock(bool no_lockdep = false) const;
  bool is_locked() const;
  const spg_t& get_pgid() const {
    return pg_id;
  }
  const PGPool& get_pool() const {
    return pool;
  }
  uint64_t get_last_user_version() const {
    return info.last_user_version;
  }
  const pg_history_t& get_history() const {
    return info.history;
  }
  bool get_need_up_thru() const {
    return recovery_state.get_need_up_thru();
  }
  epoch_t get_same_interval_since() const {
    return info.history.same_interval_since;
  }
  static void set_last_scrub_stamp(
    utime_t t, pg_history_t &history, pg_stat_t &stats) {
    stats.last_scrub_stamp = t;
    history.last_scrub_stamp = t;
  }

  void set_last_scrub_stamp(utime_t t) {
    recovery_state.update_stats(
      [=](auto &history, auto &stats) {
        set_last_scrub_stamp(t, history, stats);
        return true;
      });
  }
  static void set_last_deep_scrub_stamp(
    utime_t t, pg_history_t &history, pg_stat_t &stats) {
    stats.last_deep_scrub_stamp = t;
    history.last_deep_scrub_stamp = t;
  }

  void set_last_deep_scrub_stamp(utime_t t) {
    recovery_state.update_stats(
      [=](auto &history, auto &stats) {
        set_last_deep_scrub_stamp(t, history, stats);
        return true;
      });
  }
  bool is_deleting() const {
    return recovery_state.is_deleting();
  }
  bool is_deleted() const {
    return recovery_state.is_deleted();
  }
  bool is_nonprimary() const {
    return recovery_state.is_nonprimary();
  }
  bool is_primary() const {
    return recovery_state.is_primary();
  }
  bool pg_has_reset_since(epoch_t e) {
    ceph_assert(is_locked());
    return recovery_state.pg_has_reset_since(e);
  }

  bool is_ec_pg() const {
    return recovery_state.is_ec_pg();
  }
  int get_role() const {
    return recovery_state.get_role();
  }
  const vector<int> get_acting() const {
    return recovery_state.get_acting();
  }
  const set<pg_shard_t> &get_actingset() const {
    return recovery_state.get_actingset();
  }
  int get_acting_primary() const {
    return recovery_state.get_acting_primary();
  }
  pg_shard_t get_primary() const {
    return recovery_state.get_primary();
  }
  const vector<int> get_up() const {
    return recovery_state.get_up();
  }
  int get_up_primary() const {
    return recovery_state.get_up_primary();
  }
  const PastIntervals& get_past_intervals() const {
    return recovery_state.get_past_intervals();
  }
  bool is_acting_recovery_backfill(pg_shard_t osd) const {
    return recovery_state.is_acting_recovery_backfill(osd);
  }
  const set<pg_shard_t> &get_acting_recovery_backfill() const {
    return recovery_state.get_acting_recovery_backfill();
  }
  bool is_acting(pg_shard_t osd) const {
    return recovery_state.is_acting(osd);
  }
  bool is_up(pg_shard_t osd) const {
    return recovery_state.is_up(osd);
  }
  static bool has_shard(bool ec, const vector<int>& v, pg_shard_t osd) {
    return PeeringState::has_shard(ec, v, osd);
  }
  /// initialize created PG
  void init(
    const vector<int>& up,
    const vector<int>& acting,
    const pg_history_t& history,
    const PastIntervals& pim,
    ObjectStore::Transaction &t);
  /// read existing pg state off disk
  void read_state(ObjectStore *store);
  static int peek_map_epoch(ObjectStore *store, spg_t pgid, epoch_t *pepoch);

  static int get_latest_struct_v() {
    return pg_latest_struct_v;
  }
  static int get_compat_struct_v() {
    return pg_compat_struct_v;
  }
  static int read_info(
    ObjectStore *store, spg_t pgid, const coll_t &coll,
    pg_info_t &info, PastIntervals &past_intervals,
    __u8 &);
  static bool _has_removal_flag(ObjectStore *store, spg_t pgid);

  void rm_backoff(const ceph::ref_t<Backoff>& b);
  void update_snap_mapper_bits(uint32_t bits) {
    snap_mapper.update_bits(bits);
  }

  void start_split_stats(const set<spg_t>& childpgs, vector<object_stat_sum_t> *v);
  virtual void split_colls(
    const pg_pool_t *pool,
    ObjectStore::Transaction &t) = 0;
  void split_into(pg_t child_pgid, PG *child, unsigned split_bits);
  void merge_from(map<spg_t,PGRef>& sources, PeeringCtx &rctx,
                  const pg_merge_meta_t& last_pg_merge_meta);
  void finish_split_stats(const object_stat_sum_t& stats,
                          ObjectStore::Transaction &t);
  void scrub(epoch_t queued, ThreadPool::TPHandle &handle);

  bool is_scrub_registered();
  void reg_next_scrub();
  void unreg_next_scrub();
  void queue_want_pg_temp(const vector<int> &wanted) override;
  void clear_want_pg_temp() override;

  void on_new_interval() override;

  void on_role_change() override;
  virtual void plpg_on_role_change() = 0;

  void init_collection_pool_opts();
  void on_pool_change() override;
  virtual void plpg_on_pool_change() = 0;

  void on_info_history_change() override;

  void scrub_requested(bool deep, bool repair, bool need_auto = false) override;

  uint64_t get_snap_trimq_size() const override {
    return snap_trimq.size();
  }
  unsigned get_target_pg_log_entries() const override;

  void clear_publish_stats() override;
  void clear_primary_state() override;
  epoch_t oldest_stored_osdmap() override;
  OstreamTemp get_clog_error() override;
  OstreamTemp get_clog_info() override;
  OstreamTemp get_clog_debug() override;

  void schedule_event_after(
    PGPeeringEventRef event,
    float delay) override;
  void request_local_background_io_reservation(
    PGPeeringEventRef on_grant,
    PGPeeringEventRef on_preempt) override;
  void update_local_background_io_priority(
    unsigned priority) override;
  void cancel_local_background_io_reservation() override;

  void request_remote_recovery_reservation(
    PGPeeringEventRef on_grant,
    PGPeeringEventRef on_preempt) override;
  void cancel_remote_recovery_reservation() override;
  void schedule_event_on_commit(
    ObjectStore::Transaction &t,
    PGPeeringEventRef on_commit) override;

  void on_active_exit() override;

  Context *on_clean() override {
    requeue_ops(waiting_for_clean_to_primary_repair);
    return finish_recovery();
  }
  void on_activate(interval_set<snapid_t> snaps) override {
    ceph_assert(scrubber.callbacks.empty());
    ceph_assert(callbacks_for_degraded_object.empty());
    release_pg_backoffs();
    projected_last_update = info.last_update;
  }

  void on_activate_committed() override;

  void on_active_actmap() override;
  void on_active_advmap(const OSDMapRef &osdmap) override;

  void queue_snap_retrim(snapid_t snap);

  void on_backfill_reserved() override;
  void on_backfill_canceled() override;
  void on_recovery_reserved() override;
  bool is_forced_recovery_or_backfill() const {
    return recovery_state.is_forced_recovery_or_backfill();
  }

  PGLog::LogEntryHandlerRef get_log_handler(
    ObjectStore::Transaction &t) override {
    return std::make_unique<PG::PGLogEntryHandler>(this, &t);
  }

  ghobject_t do_delete_work(ObjectStore::Transaction &t,
                            ghobject_t _next) override;
  void clear_ready_to_merge() override;
  void set_not_ready_to_merge_target(pg_t pgid, pg_t src) override;
  void set_not_ready_to_merge_source(pg_t pgid) override;
  void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) override;
  void set_ready_to_merge_source(eversion_t lu) override;

  void send_pg_created(pg_t pgid) override;
  ceph::signedspan get_mnow() override;
  HeartbeatStampsRef get_hb_stamps(int peer) override;
  void schedule_renew_lease(epoch_t lpr, ceph::timespan delay) override;
  void queue_check_readable(epoch_t lpr, ceph::timespan delay) override;

  void rebuild_missing_set_with_deletes(PGLog &pglog) override;
  void queue_peering_event(PGPeeringEventRef evt);
  void do_peering_event(PGPeeringEventRef evt, PeeringCtx &rcx);
  void queue_null(epoch_t msg_epoch, epoch_t query_epoch);
  void queue_flushed(epoch_t started_at);
  void handle_advance_map(
    OSDMapRef osdmap, OSDMapRef lastmap,
    vector<int>& newup, int up_primary,
    vector<int>& newacting, int acting_primary,
    PeeringCtx &rctx);
  void handle_activate_map(PeeringCtx &rctx);
  void handle_initialize(PeeringCtx &rxcx);
  void handle_query_state(Formatter *f);
  /**
   * @param ops_begun returns how many recovery ops the function started
   * @returns true if any useful work was accomplished; false otherwise
   */
  virtual bool start_recovery_ops(
    ThreadPool::TPHandle &handle,
    uint64_t *ops_begun) = 0;

  // more work after the above, but with a PeeringCtx
  void find_unfound(epoch_t queued, PeeringCtx &rctx);

  virtual void get_watchers(std::list<obj_watch_item_t> *ls) = 0;
  void dump_pgstate_history(Formatter *f);
  void dump_missing(Formatter *f);

  void get_pg_stats(std::function<void(const pg_stat_t &, epoch_t lec)> f);
  void with_heartbeat_peers(std::function<void(int)> f);

  virtual void on_shutdown() = 0;

  bool get_must_scrub() const {
    return scrubber.must_scrub;
  }
  virtual void do_request(
    ThreadPool::TPHandle &handle) = 0;

  virtual void clear_cache() = 0;
  virtual int get_cache_obj_count() = 0;

  virtual void snap_trimmer(epoch_t epoch_queued) = 0;
  virtual void do_command(
    const string_view& prefix,
    const cmdmap_t& cmdmap,
    const bufferlist& idata,
    std::function<void(int,const std::string&,bufferlist&)> on_finish) = 0;

  virtual bool agent_work(int max) = 0;
  virtual bool agent_work(int max, int agent_flush_quota) = 0;
  virtual void agent_stop() = 0;
  virtual void agent_delay() = 0;
  virtual void agent_clear() = 0;
  virtual void agent_choose_mode_restart() = 0;
  struct C_DeleteMore : public Context {
    PGRef pg;
    epoch_t epoch;
    C_DeleteMore(PG *p, epoch_t e) : pg(p), epoch(e) {}
    void finish(int r) override {
      ceph_abort();
    }
    void complete(int r) override;
  };

  void _delete_some(ObjectStore::Transaction *t);
  virtual void set_dynamic_perf_stats_queries(
    const std::list<OSDPerfMetricQuery> &queries) {
  }
  virtual void get_dynamic_perf_stats(DynamicPerfStats *stats) {
  }

  uint64_t get_min_alloc_size() const;

  // reference counting
#ifdef PG_DEBUG_REFS
  uint64_t get_with_id();
  void put_with_id(uint64_t);
  void dump_live_ids();
#endif
  void get(const char* tag);
  void put(const char* tag);
  PG(OSDService *o, OSDMapRef curmap,
     const PGPool &pool, spg_t p);
  explicit PG(const PG& rhs) = delete;
  PG& operator=(const PG& rhs) = delete;

  OSDShard *osd_shard = nullptr;
  OSDShardPGSlot *pg_slot = nullptr;
  // locking and reference counting.
  // I destroy myself when the reference count hits zero.
  // lock() should be called before doing anything.
  // get() should be called on pointer copy (to another thread, etc.).
  // put() should be called on destruction of some previously copied pointer.
  // unlock() when done with the current pointer (_most common_).
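  //
  // Minimal usage sketch of the protocol above (illustrative only; the
  // call sites and the "example" tag are assumptions, not part of this
  // header):
  //
  //   PGRef pg = ...;        // taking a PGRef bumps the ref count
  //   pg->lock();            // serialize access to PG state
  //   ...                    // inspect or mutate the PG
  //   pg->unlock();          // done with the current pointer
  //
  //   // handing a raw pointer to another thread:
  //   pg->get("example");    // explicit ref with a provenance tag
  //   ...                    // the other thread uses it, then:
  //   pg->put("example");    // drop the ref; PG may destroy itself at zero
  //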
  mutable ceph::mutex _lock = ceph::make_mutex("PG::_lock");
#ifndef CEPH_DEBUG_MUTEX
  mutable std::thread::id locked_by;
#endif
  std::atomic<unsigned int> ref{0};

#ifdef PG_DEBUG_REFS
  ceph::mutex _ref_id_lock = ceph::make_mutex("PG::_ref_id_lock");
  map<uint64_t, string> _live_ids;
  map<string, uint64_t> _tag_counts;
  uint64_t _ref_id = 0;

  friend uint64_t get_with_id(PG *pg) { return pg->get_with_id(); }
  friend void put_with_id(PG *pg, uint64_t id) { return pg->put_with_id(id); }
#endif

  friend void intrusive_ptr_add_ref(PG *pg) {
    pg->get("intptr");
  }
  friend void intrusive_ptr_release(PG *pg) {
    pg->put("intptr");
  }

  // =====================
  SnapMapper snap_mapper;
  bool eio_errors_to_process = false;

  virtual PGBackend *get_pgbackend() = 0;
  virtual const PGBackend *get_pgbackend() const = 0;

  void requeue_map_waiters();

  ZTracer::Endpoint trace_endpoint;

  __u8 info_struct_v = 0;
  void upgrade(ObjectStore *store);

  ghobject_t pgmeta_oid;
  // ------------------
  interval_set<snapid_t> snap_trimq;
  set<snapid_t> snap_trimq_repeat;

  /* You should not use these items without taking their respective queue locks
   * (if they have one) */
  xlist<PG*>::item stat_queue_item;
  bool recovery_queued;

  int recovery_ops_active;
  set<pg_shard_t> waiting_on_backfill;
#ifdef DEBUG_RECOVERY_OIDS
  multiset<hobject_t> recovering_oids;
#endif
  bool dne() { return info.dne(); }

  virtual void send_cluster_message(
    int osd, Message *m, epoch_t epoch, bool share_map_update) override;

  epoch_t get_last_peering_reset() const {
    return recovery_state.get_last_peering_reset();
  }

  /* heartbeat peers */
  void set_probe_targets(const set<pg_shard_t> &probe_set) override;
  void clear_probe_targets() override;

  ceph::mutex heartbeat_peer_lock =
    ceph::make_mutex("PG::heartbeat_peer_lock");
  set<int> heartbeat_peers;
  set<int> probe_targets;
  /**
   * Represents the objects in a range [begin, end)
   *
   * 1) begin == end == hobject_t() indicates the interval is unpopulated
   * 2) Else, objects contains all objects in [begin, end)
   */
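  // Illustrative sketch of how such an interval is typically consumed
  // (hand-written pseudocode; recover_object() is a hypothetical helper,
  // not part of this header):
  //
  //   BackfillInterval bi;
  //   bi.reset(scan_start);                    // begin = end = scan_start
  //   ... fill bi.objects (and bi.end) from a scan ...
  //   while (!bi.empty()) {
  //     recover_object(bi.begin, bi.objects.begin()->second);
  //     bi.pop_front();                        // advances begin to the next object
  //   }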
  struct BackfillInterval {
    // info about a backfill interval on a peer
    eversion_t version; /// version at which the scan occurred
    map<hobject_t,eversion_t> objects;
    hobject_t begin;
    hobject_t end;

    /// clear content
    void clear() {
      *this = BackfillInterval();
    }
    /// clear objects list only
    void clear_objects() {
      objects.clear();
    }

    /// reinstantiate with a new start+end position and sort order
    void reset(hobject_t start) {
      clear();
      begin = end = start;
    }

    /// true if there are no objects in this interval
    bool empty() const {
      return objects.empty();
    }
    /// true if interval extends to the end of the range
    bool extends_to_end() const {
      return end.is_max();
    }

    /// removes items <= soid and adjusts begin to the first object
    void trim_to(const hobject_t &soid) {
      trim();
      while (!objects.empty() &&
             objects.begin()->first <= soid) {
        pop_front();
      }
    }

    /// Adjusts begin to the first object
    void trim() {
      if (!objects.empty())
        begin = objects.begin()->first;
      else
        begin = end;
    }
    /// drop first entry, and adjust @begin accordingly
    void pop_front() {
      ceph_assert(!objects.empty());
      objects.erase(objects.begin());
      trim();
    }
    void dump(Formatter *f) const {
      f->dump_stream("begin") << begin;
      f->dump_stream("end") << end;
      f->open_array_section("objects");
      for (map<hobject_t, eversion_t>::const_iterator i =
             objects.begin();
           i != objects.end();
           ++i) {
        f->open_object_section("object");
        f->dump_stream("object") << i->first;
        f->dump_stream("version") << i->second;
        f->close_section();
      }
      f->close_section();
    }
  };
  BackfillInterval backfill_info;
  map<pg_shard_t, BackfillInterval> peer_backfill_info;
  bool backfill_reserving;

  // The primary's num_bytes and local num_bytes for this pg, only valid
  // during backfill for non-primary shards.
  // Both of these are adjusted for EC to reflect the on-disk bytes
  std::atomic<int64_t> primary_num_bytes = 0;
  std::atomic<int64_t> local_num_bytes = 0;

  // Space reserved for backfill is primary_num_bytes - local_num_bytes
  // Don't care that difference itself isn't atomic
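  // Worked example (illustrative numbers only): if the primary reports
  // primary_num_bytes = 300 GiB for this shard and we currently hold
  // local_num_bytes = 120 GiB, get_reserved_num_bytes() below yields
  // 180 GiB still to be reserved while backfill proceeds.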
  uint64_t get_reserved_num_bytes() {
    int64_t primary = primary_num_bytes.load();
    int64_t local = local_num_bytes.load();
    if (primary > local)
      return primary - local;
    else
      return 0;
  }
  bool is_remote_backfilling() {
    return primary_num_bytes.load() > 0;
  }

  bool try_reserve_recovery_space(int64_t primary, int64_t local) override;
  void unreserve_recovery_space() override;
  // If num_bytes are inconsistent and local_num- goes negative
  // it's ok, because it would then be ignored.

  // The value of num_bytes could be negative,
  // but we don't let local_num_bytes go negative.
  void add_local_num_bytes(int64_t num_bytes) {
    int64_t prev_bytes = local_num_bytes.load();
    int64_t new_bytes;
    do {
      new_bytes = prev_bytes + num_bytes;
    } while(!local_num_bytes.compare_exchange_weak(prev_bytes, new_bytes));
  }
  void sub_local_num_bytes(int64_t num_bytes) {
    ceph_assert(num_bytes >= 0);
    int64_t prev_bytes = local_num_bytes.load();
    int64_t new_bytes;
    do {
      new_bytes = prev_bytes - num_bytes;
      if (new_bytes < 0)
        new_bytes = 0;
    } while(!local_num_bytes.compare_exchange_weak(prev_bytes, new_bytes));
  }
  // The value of num_bytes could be negative,
  // but we don't let info.stats.stats.sum.num_bytes go negative.
  void add_num_bytes(int64_t num_bytes) {
    ceph_assert(ceph_mutex_is_locked_by_me(_lock));
    recovery_state.update_stats(
      [num_bytes](auto &history, auto &stats) {
        stats.stats.sum.num_bytes += num_bytes;
        if (stats.stats.sum.num_bytes < 0) {
          stats.stats.sum.num_bytes = 0;
        }
        return false;
      });
  }
  void sub_num_bytes(int64_t num_bytes) {
    ceph_assert(ceph_mutex_is_locked_by_me(_lock));
    ceph_assert(num_bytes >= 0);
    recovery_state.update_stats(
      [num_bytes](auto &history, auto &stats) {
        stats.stats.sum.num_bytes -= num_bytes;
        if (stats.stats.sum.num_bytes < 0) {
          stats.stats.sum.num_bytes = 0;
        }
        return false;
      });
  }
  // Only used in testing so not worried about needing the PG lock here
  int64_t get_stats_num_bytes() {
    std::lock_guard l{_lock};
    int64_t num_bytes = info.stats.stats.sum.num_bytes;
    if (pool.info.is_erasure()) {
      num_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
      // Round up each object by a stripe
      num_bytes += get_pgbackend()->get_ec_stripe_chunk_size() * info.stats.stats.sum.num_objects;
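      // Worked example (illustrative numbers, not taken from this code):
      // with k = 4 EC data chunks, a 4 KiB stripe chunk, 1 GiB of logical
      // bytes and 1000 objects, this yields roughly
      // 1 GiB / 4 + 1000 * 4 KiB, i.e. about 260 MiB of per-shard on-disk bytes.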
    }
    int64_t lnb = local_num_bytes.load();
    if (lnb && lnb != num_bytes) {
      lgeneric_dout(cct, 0) << this << " " << info.pgid << " num_bytes mismatch "
                            << lnb << " vs stats "
                            << info.stats.stats.sum.num_bytes << " / chunk "
                            << get_pgbackend()->get_ec_data_chunk_count()
                            << dendl;
    }
    return num_bytes;
  }
  /*
   * blocked request wait hierarchy
   *
   * In order to preserve request ordering we need to be careful about the
   * order in which blocked requests get requeued.  Generally speaking, we
   * push the requests back up to the op_wq in reverse order (most recent
   * request first) so that they come back out again in the original order.
   * However, because there are multiple wait queues, we need to requeue
   * waitlists in order.  Generally speaking, we requeue the wait lists
   * that are checked first.
   *
   * Here are the various wait lists, in the order they are used during
   * request processing, with notes:
   *
   *  - waiting_for_map
   *    - may start or stop blocking at any time (depending on client epoch)
   *  - waiting_for_peered
   *    - only starts blocking on interval change; never restarts
   *  - waiting_for_flush
   *    - flushes_in_progress
   *    - waiting for final flush during activate
   *  - waiting_for_active
   *    - only starts blocking on interval change; never restarts
   *  - waiting_for_readable
   *    - now > readable_until
   *    - unblocks when we get fresh(er) osd_pings
   *  - waiting_for_scrub
   *    - starts and stops blocking for varying intervals during scrub
   *  - waiting_for_unreadable_object
   *    - never restarts once object is readable (* except for EIO?)
   *  - waiting_for_degraded_object
   *    - never restarts once object is writeable (* except for EIO?)
   *  - waiting_for_blocked_object
   *    - starts and stops based on proxied op activity
   *  - obc rwlocks
   *    - starts and stops based on read/write activity
   *
   * 1. During an interval change, we requeue *everything* in the above order.
   *
   * 2. When an obc rwlock is released, we check for a scrub block and requeue
   *    the op there if it applies.  We ignore the unreadable/degraded/blocked
   *    queues because we assume they cannot apply at that time (this is
   *    probably mostly true).
   *
   * 3. The requeue_ops helper will push ops onto the waiting_for_map list if
   *
   * These three behaviors are generally sufficient to maintain ordering, with
   * the possible exception of cases where we make an object degraded or
   * unreadable that was previously okay, e.g. when scrub or op processing
   * encounter an unexpected error.  FIXME.
   */
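  //
  // Illustrative sketch of rule (1) above (hand-written pseudocode, not the
  // actual interval-change handler; the helper name and the exact tail of
  // the ordering are assumptions):
  //
  //   void requeue_everything_on_interval_change() {
  //     for (auto& [client, ops] : waiting_for_map) requeue_ops(ops);
  //     requeue_ops(waiting_for_peered);
  //     requeue_ops(waiting_for_flush);
  //     requeue_ops(waiting_for_active);
  //     requeue_ops(waiting_for_readable);
  //     requeue_ops(waiting_for_scrub);
  //     requeue_object_waiters(waiting_for_unreadable_object);
  //     requeue_object_waiters(waiting_for_degraded_object);
  //     requeue_object_waiters(waiting_for_blocked_object);
  //   }
  //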
  // ops with newer maps than our (or blocked behind them)
  // track these by client, since inter-request ordering doesn't otherwise
  // matter.
  unordered_map<entity_name_t,list<OpRequestRef>> waiting_for_map;

  // ops waiting on peered
  list<OpRequestRef> waiting_for_peered;

  /// ops waiting on readable
  list<OpRequestRef> waiting_for_readable;

  // ops waiting on active (require peered as well)
  list<OpRequestRef> waiting_for_active;
  list<OpRequestRef> waiting_for_flush;
  list<OpRequestRef> waiting_for_scrub;

  list<OpRequestRef> waiting_for_cache_not_full;
  list<OpRequestRef> waiting_for_clean_to_primary_repair;
  map<hobject_t, list<OpRequestRef>> waiting_for_unreadable_object,
                                     waiting_for_degraded_object,
                                     waiting_for_blocked_object;

  set<hobject_t> objects_blocked_on_cache_full;
  map<hobject_t,snapid_t> objects_blocked_on_degraded_snap;
  map<hobject_t,ObjectContextRef> objects_blocked_on_snap_promotion;
  // Callbacks should assume pg (and nothing else) is locked
  map<hobject_t, list<Context*>> callbacks_for_degraded_object;
  map<eversion_t,
      list<
        tuple<OpRequestRef, version_t, int,
              vector<pg_log_op_return_item_t>>>> waiting_for_ondisk;
  void requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m);
  void requeue_op(OpRequestRef op);
  void requeue_ops(list<OpRequestRef> &l);
  // stats that persist lazily
  object_stat_collection_t unstable_stats;

  ceph::mutex pg_stats_publish_lock =
    ceph::make_mutex("PG::pg_stats_publish_lock");
  bool pg_stats_publish_valid;
  pg_stat_t pg_stats_publish;

  friend class TestOpsSocketHook;
  void publish_stats_to_osd() override;
  bool needs_recovery() const {
    return recovery_state.needs_recovery();
  }
  bool needs_backfill() const {
    return recovery_state.needs_backfill();
  }

  bool all_unfound_are_queried_or_lost(const OSDMapRef osdmap) const;
  struct PGLogEntryHandler : public PGLog::LogEntryHandler {
    PG *pg;
    ObjectStore::Transaction *t;
    PGLogEntryHandler(PG *pg, ObjectStore::Transaction *t) : pg(pg), t(t) {}

    void remove(const hobject_t &hoid) override {
      pg->get_pgbackend()->remove(hoid, t);
    }
    void try_stash(const hobject_t &hoid, version_t v) override {
      pg->get_pgbackend()->try_stash(hoid, v, t);
    }
    void rollback(const pg_log_entry_t &entry) override {
      ceph_assert(entry.can_rollback());
      pg->get_pgbackend()->rollback(entry, t);
    }
    void rollforward(const pg_log_entry_t &entry) override {
      pg->get_pgbackend()->rollforward(entry, t);
    }
    void trim(const pg_log_entry_t &entry) override {
      pg->get_pgbackend()->trim(entry, t);
    }
  };
  void update_object_snap_mapping(
    ObjectStore::Transaction *t, const hobject_t &soid,
    const set<snapid_t> &snaps);
  void clear_object_snap_mapping(
    ObjectStore::Transaction *t, const hobject_t &soid);
  void remove_snap_mapped_object(
    ObjectStore::Transaction& t, const hobject_t& soid);
  bool have_unfound() const {
    return recovery_state.have_unfound();
  }
  uint64_t get_num_unfound() const {
    return recovery_state.get_num_unfound();
  }

  virtual void check_local() = 0;

  void purge_strays();

  void update_heartbeat_peers(set<int> peers) override;
  Context *finish_sync_event;

  Context *finish_recovery();
  void _finish_recovery(Context *c);
  struct C_PG_FinishRecovery : public Context {
    PGRef pg;
    explicit C_PG_FinishRecovery(PG *p) : pg(p) {}
    void finish(int r) override {
      pg->_finish_recovery(this);
    }
  };
  void cancel_recovery();
  void clear_recovery_state();
  virtual void _clear_recovery_state() = 0;
  void start_recovery_op(const hobject_t& soid);
  void finish_recovery_op(const hobject_t& soid, bool dequeue=false);

  virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits) = 0;

  friend class C_OSD_RepModify_Commit;
  friend class C_DeleteMore;
  ceph::mutex backoff_lock =  // orders inside Backoff::lock
    ceph::make_mutex("PG::backoff_lock");
  map<hobject_t,set<ceph::ref_t<Backoff>>> backoffs;

  void add_backoff(const ceph::ref_t<Session>& s, const hobject_t& begin, const hobject_t& end);
  void release_backoffs(const hobject_t& begin, const hobject_t& end);
  void release_backoffs(const hobject_t& o) {
    release_backoffs(o, o);
  }
  void clear_backoffs();
  void add_pg_backoff(const ceph::ref_t<Session>& s) {
    hobject_t begin = info.pgid.pgid.get_hobj_start();
    hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
    add_backoff(s, begin, end);
  }
  void release_pg_backoffs() {
    hobject_t begin = info.pgid.pgid.get_hobj_start();
    hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
    release_backoffs(begin, end);
  }
  struct Scrubber {
    set<pg_shard_t> reserved_peers;
    bool local_reserved, remote_reserved, reserve_failed;
    epoch_t epoch_start;

    // common to both scrubs
    set<pg_shard_t> waiting_on_whom;

    ScrubMap primary_scrubmap;
    ScrubMapBuilder primary_scrubmap_pos;
    epoch_t replica_scrub_start = 0;
    ScrubMap replica_scrubmap;
    ScrubMapBuilder replica_scrubmap_pos;
    map<pg_shard_t, ScrubMap> received_maps;
    OpRequestRef active_rep_scrub;
    utime_t scrub_reg_stamp;  // stamp we registered for

    static utime_t scrub_must_stamp() { return utime_t(0,1); }

    omap_stat_t omap_stats = (const struct omap_stat_t){ 0 };
    bool sleeping = false;
    bool needs_sleep = true;
    utime_t sleep_start;

    // flags to indicate explicitly requested scrubs (by admin)
    bool must_scrub, must_deep_scrub, must_repair, need_auto, req_scrub;

    // Priority to use for scrub scheduling
    unsigned priority = 0;

    bool time_for_deep;
    // this flag indicates whether we would like to do auto-repair of the PG or not
    bool auto_repair;
    // this flag indicates that we are scrubbing post repair to verify everything is fixed
    bool check_repair;
    // this flag indicates that if a regular scrub detects errors <= osd_scrub_auto_repair_num_errors,
    // we should deep scrub in order to auto repair
    bool deep_scrub_on_error;
    // Maps from objects with errors to missing/inconsistent peers
    map<hobject_t, set<pg_shard_t>> missing;
    map<hobject_t, set<pg_shard_t>> inconsistent;

    // Map from object with errors to good peers
    map<hobject_t, list<pair<ScrubMap::object, pg_shard_t> >> authoritative;

    // Cleaned map pending snap metadata scrub
    ScrubMap cleaned_meta_map;
    void clean_meta_map(ScrubMap &for_meta_scrub) {
      if (end.is_max() ||
          cleaned_meta_map.objects.empty()) {
        cleaned_meta_map.swap(for_meta_scrub);
      } else {
        auto iter = cleaned_meta_map.objects.end();
        --iter; // not empty, see if clause
        auto begin = cleaned_meta_map.objects.begin();
        if (iter->first.has_snapset()) {
          ++iter;
        } else {
          while (iter != begin) {
            auto next = iter--;
            if (next->first.get_head() != iter->first.get_head()) {
              ++iter;
              break;
            }
          }
        }
        for_meta_scrub.objects.insert(begin, iter);
        cleaned_meta_map.objects.erase(begin, iter);
      }
    }
    // digest updates which we are waiting on
    int num_digest_updates_pending;

    hobject_t start, end;    // [start,end)
    hobject_t max_end;       // Largest end that may have been sent to replicas
    eversion_t subset_last_update;
    // chunky scrub state
    enum State {
      INACTIVE,
      NEW_CHUNK,
      WAIT_PUSHES,
      WAIT_LAST_UPDATE,
      BUILD_MAP,
      BUILD_MAP_DONE,
      WAIT_REPLICAS,
      COMPARE_MAPS,
      WAIT_DIGEST_UPDATES,
      FINISH,
      BUILD_MAP_REPLICA,
    } state;

    std::unique_ptr<Scrub::Store> store;
    int preempt_divisor;

    list<Context*> callbacks;
    void add_callback(Context *context) {
      callbacks.push_back(context);
    }
    void run_callbacks() {
      list<Context*> to_run;
      to_run.swap(callbacks);
      for (list<Context*>::iterator i = to_run.begin();
           i != to_run.end();
           ++i) {
        (*i)->complete(0);
      }
    }
    static const char *state_string(const PG::Scrubber::State& state) {
      const char *ret = NULL;
      switch (state) {
        case INACTIVE: ret = "INACTIVE"; break;
        case NEW_CHUNK: ret = "NEW_CHUNK"; break;
        case WAIT_PUSHES: ret = "WAIT_PUSHES"; break;
        case WAIT_LAST_UPDATE: ret = "WAIT_LAST_UPDATE"; break;
        case BUILD_MAP: ret = "BUILD_MAP"; break;
        case BUILD_MAP_DONE: ret = "BUILD_MAP_DONE"; break;
        case WAIT_REPLICAS: ret = "WAIT_REPLICAS"; break;
        case COMPARE_MAPS: ret = "COMPARE_MAPS"; break;
        case WAIT_DIGEST_UPDATES: ret = "WAIT_DIGEST_UPDATES"; break;
        case FINISH: ret = "FINISH"; break;
        case BUILD_MAP_REPLICA: ret = "BUILD_MAP_REPLICA"; break;
      }
      return ret;
    }

    bool is_chunky_scrub_active() const { return state != INACTIVE; }
    void reset() {
      waiting_on_whom.clear();
      if (active_rep_scrub) {
        active_rep_scrub = OpRequestRef();
      }
      received_maps.clear();

      must_deep_scrub = false;
      must_repair = false;
      time_for_deep = false;
      auto_repair = false;
      check_repair = false;
      deep_scrub_on_error = false;

      state = PG::Scrubber::INACTIVE;
      start = hobject_t();
      max_end = hobject_t();
      subset_last_update = eversion_t();
      omap_stats = (const struct omap_stat_t){ 0 };
      inconsistent.clear();
      authoritative.clear();
      num_digest_updates_pending = 0;
      primary_scrubmap = ScrubMap();
      primary_scrubmap_pos.reset();
      replica_scrubmap = ScrubMap();
      replica_scrubmap_pos.reset();
      cleaned_meta_map = ScrubMap();
      sleep_start = utime_t();
    }
    void create_results(const hobject_t& obj);
    void cleanup_store(ObjectStore::Transaction *t);
  } scrubber;
  bool scrub_after_recovery;
  bool save_req_scrub; // Saved for scrub_after_recovery

  bool scrub_can_preempt = false;
  bool scrub_preempted = false;
  // We allow some number of preemptions of the scrub, which means we do
  // not block.  Then we start to block.  Once we start blocking, we do
  // not stop until the scrub range is completed.
  bool write_blocked_by_scrub(const hobject_t &soid);
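  // Illustrative paraphrase of that policy (hand-written sketch, not the
  // actual implementation of write_blocked_by_scrub()):
  //
  //   if (soid < scrubber.start || soid >= scrubber.max_end)
  //     return false;   // outside the chunk currently being scrubbed
  //   if (scrub_can_preempt)
  //     return false;   // preemption budget left: preempt the scrub instead
  //   return true;      // budget exhausted: block until the chunk completes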
  /// true if the given range intersects the scrub interval in any way
  bool range_intersects_scrub(const hobject_t &start, const hobject_t& end);

  void repair_object(
    const hobject_t &soid,
    const list<pair<ScrubMap::object, pg_shard_t> > &ok_peers,
    const set<pg_shard_t> &bad_peers);
  void chunky_scrub(ThreadPool::TPHandle &handle);
  void scrub_compare_maps();
  /**
   * return true if any inconsistency/missing is repaired, false otherwise
   */
  bool scrub_process_inconsistent();
  bool ops_blocked_by_scrub() const;
  void scrub_finish();
  void scrub_clear_state(bool keep_repair = false);
  void _scan_snaps(ScrubMap &map);
  void _repair_oinfo_oid(ScrubMap &map);
  void _scan_rollback_obs(const vector<ghobject_t> &rollback_obs);
  void _request_scrub_map(pg_shard_t replica, eversion_t version,
                          hobject_t start, hobject_t end, bool deep,
                          bool allow_preemption);
  int build_scrub_map_chunk(
    ScrubMapBuilder &pos,
    hobject_t start, hobject_t end, bool deep,
    ThreadPool::TPHandle &handle);
  /**
   * returns true if [begin, end) is good to scrub at this time
   * a false return value obliges the implementer to requeue scrub when the
   * condition preventing scrub clears
   */
  virtual bool _range_available_for_scrub(
    const hobject_t &begin, const hobject_t &end) = 0;
  virtual void scrub_snapshot_metadata(
    const std::map<hobject_t,
                   pair<std::optional<uint32_t>,
                        std::optional<uint32_t>>> &missing_digest) { }
  virtual void _scrub_clear_state() { }
  virtual void _scrub_finish() { }
  void clear_scrub_reserved();
  void scrub_reserve_replicas();
  void scrub_unreserve_replicas();
  bool scrub_all_replicas_reserved() const;
  void replica_scrub(
    OpRequestRef op,
    ThreadPool::TPHandle &handle);
  void do_replica_scrub_map(OpRequestRef op);

  void handle_scrub_reserve_request(OpRequestRef op);
  void handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from);
  void handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from);
  void handle_scrub_reserve_release(OpRequestRef op);
  // -- recovery state --

  struct QueuePeeringEvt : Context {
    PGRef pg;
    PGPeeringEventRef evt;

    template <class EVT>
    QueuePeeringEvt(PG *pg, epoch_t epoch, EVT evt) :
      pg(pg), evt(std::make_shared<PGPeeringEvent>(epoch, epoch, evt)) {}

    QueuePeeringEvt(PG *pg, PGPeeringEventRef evt) :
      pg(pg), evt(std::move(evt)) {}

    void finish(int r) override {
      pg->lock();
      pg->queue_peering_event(std::move(evt));
      pg->unlock();
    }
  };

  int pg_stat_adjust(osd_stat_t *new_stat);
  bool delete_needs_sleep = false;

  bool state_test(uint64_t m) const { return recovery_state.state_test(m); }
  void state_set(uint64_t m) { recovery_state.state_set(m); }
  void state_clear(uint64_t m) { recovery_state.state_clear(m); }
  bool is_complete() const {
    return recovery_state.is_complete();
  }
  bool should_send_notify() const {
    return recovery_state.should_send_notify();
  }

  bool is_active() const { return recovery_state.is_active(); }
  bool is_activating() const { return recovery_state.is_activating(); }
  bool is_peering() const { return recovery_state.is_peering(); }
  bool is_down() const { return recovery_state.is_down(); }
  bool is_recovery_unfound() const { return recovery_state.is_recovery_unfound(); }
  bool is_backfill_unfound() const { return recovery_state.is_backfill_unfound(); }
  bool is_incomplete() const { return recovery_state.is_incomplete(); }
  bool is_clean() const { return recovery_state.is_clean(); }
  bool is_degraded() const { return recovery_state.is_degraded(); }
  bool is_undersized() const { return recovery_state.is_undersized(); }
  bool is_scrubbing() const { return state_test(PG_STATE_SCRUBBING); }
  bool is_remapped() const { return recovery_state.is_remapped(); }
  bool is_peered() const { return recovery_state.is_peered(); }
  bool is_recovering() const { return recovery_state.is_recovering(); }
  bool is_premerge() const { return recovery_state.is_premerge(); }
  bool is_repair() const { return recovery_state.is_repair(); }
  bool is_laggy() const { return state_test(PG_STATE_LAGGY); }
  bool is_wait() const { return state_test(PG_STATE_WAIT); }

  bool is_empty() const { return recovery_state.is_empty(); }
  void do_pending_flush();

  virtual void prepare_write(
    pg_info_t &last_written_info,
    PastIntervals &past_intervals,
    bool dirty_big_info,
    bool need_write_epoch,
    ObjectStore::Transaction &t) override;

  void write_if_dirty(PeeringCtx &rctx) {
    write_if_dirty(rctx.transaction);
  }
  void write_if_dirty(ObjectStore::Transaction& t) {
    recovery_state.write_if_dirty(t);
  }
  PGLog::IndexedLog projected_log;
  bool check_in_progress_op(
    const osd_reqid_t &r,
    eversion_t *version,
    version_t *user_version,
    vector<pg_log_op_return_item_t> *op_returns) const;
  eversion_t projected_last_update;
  eversion_t get_next_version() const {
    eversion_t at_version(
      get_osdmap_epoch(),
      projected_last_update.version+1);
    ceph_assert(at_version > info.last_update);
    ceph_assert(at_version > recovery_state.get_pg_log().get_head());
    ceph_assert(at_version > projected_last_update);
    return at_version;
  }
  bool check_log_for_corruption(ObjectStore *store);

  std::string get_corrupt_pg_log_name() const;

  void update_snap_map(
    const vector<pg_log_entry_t> &log_entries,
    ObjectStore::Transaction& t);

  void filter_snapc(vector<snapid_t> &snaps);

  virtual void kick_snap_trim() = 0;
  virtual void snap_trimmer_scrub_complete() = 0;
  bool requeue_scrub(bool high_priority = false);
  void queue_recovery();
  unsigned get_scrub_priority();
  bool try_flush_or_schedule_async() override;
  void start_flush_on_transaction(
    ObjectStore::Transaction &t) override;

  void update_history(const pg_history_t& history) {
    recovery_state.update_history(history);
  }
  // OpRequest queueing
  bool can_discard_op(OpRequestRef& op);
  bool can_discard_scan(OpRequestRef op);
  bool can_discard_backfill(OpRequestRef op);
  bool can_discard_request(OpRequestRef& op);

  template<typename T, int MSGTYPE>
  bool can_discard_replica_op(OpRequestRef& op);

  bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
  bool old_peering_evt(PGPeeringEventRef evt) {
    return old_peering_msg(evt->get_epoch_sent(), evt->get_epoch_requested());
  }
  bool have_same_or_newer_map(epoch_t e) {
    return e <= get_osdmap_epoch();
  }

  bool op_has_sufficient_caps(OpRequestRef& op);
  friend class FlushState;

  friend ostream& operator<<(ostream& out, const PG& pg);

  PeeringState recovery_state;

  // ref to recovery_state.pool
  const PGPool &pool;

  // ref to recovery_state.info
  const pg_info_t &info;
};

ostream& operator<<(ostream& out, const PG::BackfillInterval& bi);