1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #include "messages/MOSDRepScrub.h"
18 #include "common/errno.h"
19 #include "common/ceph_releases.h"
20 #include "common/config.h"
22 #include "OpRequest.h"
23 #include "ScrubStore.h"
24 #include "pg_scrubber.h"
26 #include "osd/scheduler/OpSchedulerItem.h"
28 #include "common/Timer.h"
29 #include "common/perf_counters.h"
31 #include "messages/MOSDOp.h"
32 #include "messages/MOSDPGNotify.h"
33 #include "messages/MOSDPGInfo.h"
34 #include "messages/MOSDPGScan.h"
35 #include "messages/MOSDPGBackfill.h"
36 #include "messages/MOSDPGBackfillRemove.h"
37 #include "messages/MBackfillReserve.h"
38 #include "messages/MRecoveryReserve.h"
39 #include "messages/MOSDPGPush.h"
40 #include "messages/MOSDPGPushReply.h"
41 #include "messages/MOSDPGPull.h"
42 #include "messages/MOSDECSubOpWrite.h"
43 #include "messages/MOSDECSubOpWriteReply.h"
44 #include "messages/MOSDECSubOpRead.h"
45 #include "messages/MOSDECSubOpReadReply.h"
46 #include "messages/MOSDPGUpdateLogMissing.h"
47 #include "messages/MOSDPGUpdateLogMissingReply.h"
48 #include "messages/MOSDBackoff.h"
49 #include "messages/MOSDScrubReserve.h"
50 #include "messages/MOSDRepOp.h"
51 #include "messages/MOSDRepOpReply.h"
52 #include "messages/MOSDRepScrubMap.h"
53 #include "messages/MOSDPGRecoveryDelete.h"
54 #include "messages/MOSDPGRecoveryDeleteReply.h"
56 #include "common/BackTrace.h"
57 #include "common/EventTrace.h"
60 #define TRACEPOINT_DEFINE
61 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
62 #include "tracing/pg.h"
63 #undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
64 #undef TRACEPOINT_DEFINE
66 #define tracepoint(...)
71 #define dout_context cct
72 #define dout_subsys ceph_subsys_osd
74 #define dout_prefix _prefix(_dout, this)
78 using std::ostringstream
;
82 using std::stringstream
;
83 using std::unique_ptr
;
86 using ceph::bufferlist
;
87 using ceph::bufferptr
;
90 using ceph::Formatter
;
92 using namespace ceph::osd::scheduler
;
95 static ostream
& _prefix(std::ostream
*_dout
, T
*t
)
97 return t
->gen_prefix(*_dout
);
// Take a labelled reference on this PG.  The before/after refcount is
// logged at "refs" debug level 5 and the tag (logged as "(none" when null)
// is recorded per-tag under _ref_id_lock so leaked refs can be attributed.
// NOTE(review): several original lines (101-102, 106, 108-111) — including
// the actual refcount increment and the _tag_counts update — are elided in
// this extract; confirm against the full source.
100 void PG::get(const char* tag
)
103 lgeneric_subdout(cct
, refs
, 5) << "PG::get " << this << " "
104 << "tag " << (tag
? tag
: "(none") << " "
105 << (after
- 1) << " -> " << after
<< dendl
;
// Serialize tag bookkeeping against concurrent get()/put().
107 std::lock_guard
l(_ref_id_lock
);
// Drop a labelled reference previously taken via PG::get(tag): decrement
// the per-tag count under _ref_id_lock (erasing the entry at zero), then
// log the refcount transition at "refs" debug level 5.
// NOTE(review): interior lines (113-115, 122-124, 126, after 129) are
// elided in this extract, including the refcount decrement itself.
112 void PG::put(const char* tag
)
116 std::lock_guard
l(_ref_id_lock
);
// Every put() must pair with a prior get() for the same tag.
117 auto tag_counts_entry
= _tag_counts
.find(tag
);
118 ceph_assert(tag_counts_entry
!= _tag_counts
.end());
119 --tag_counts_entry
->second
;
120 if (tag_counts_entry
->second
== 0) {
121 _tag_counts
.erase(tag_counts_entry
);
// Cache cct locally: dropping the final ref may destroy *this, after
// which member access would be a use-after-free.
125 auto local_cct
= cct
;
127 lgeneric_subdout(local_cct
, refs
, 5) << "PG::put " << this << " "
128 << "tag " << (tag
? tag
: "(none") << " "
129 << (after
+ 1) << " -> " << after
// Debug variant of get(): hands out a unique reference id and records it
// (keyed to a description string built in the elided lines, presumably a
// backtrace — confirm upstream) in _live_ids so dump_live_ids() can report
// outstanding references.
136 uint64_t PG::get_with_id()
139 std::lock_guard
l(_ref_id_lock
);
140 uint64_t id
= ++_ref_id
;
144 lgeneric_subdout(cct
, refs
, 5) << "PG::get " << this << " " << info
.pgid
145 << " got id " << id
<< " "
146 << (ref
- 1) << " -> " << ref
// Each id is handed out exactly once.
148 ceph_assert(!_live_ids
.count(id
));
149 _live_ids
.insert(make_pair(id
, ss
.str()));
// Release a reference taken with get_with_id().  The id must still be
// live; the erase and actual ref drop are on lines elided in this extract.
153 void PG::put_with_id(uint64_t id
)
156 lgeneric_subdout(cct
, refs
, 5) << "PG::put " << this << " " << info
.pgid
157 << " put id " << id
<< " "
158 << (newref
+ 1) << " -> " << newref
161 std::lock_guard
l(_ref_id_lock
);
// The id must have been handed out by get_with_id() and not yet returned.
162 ceph_assert(_live_ids
.count(id
));
// Debug helper for ref-leak diagnosis: dump (at log level 0) every
// outstanding reference id from get_with_id() and every per-tag refcount
// from get(tag), all under _ref_id_lock.
169 void PG::dump_live_ids()
171 std::lock_guard
l(_ref_id_lock
);
172 dout(0) << "\t" << __func__
<< ": " << info
.pgid
<< " live ids:" << dendl
;
173 for (map
<uint64_t, string
>::iterator i
= _live_ids
.begin();
174 i
!= _live_ids
.end();
176 dout(0) << "\t\tid: " << *i
<< dendl
;
178 dout(0) << "\t" << __func__
<< ": " << info
.pgid
<< " live tags:" << dendl
;
179 for (map
<string
, uint64_t>::iterator i
= _tag_counts
.begin();
180 i
!= _tag_counts
.end();
182 dout(0) << "\t\tid: " << *i
<< dendl
;
187 PG::PG(OSDService
*o
, OSDMapRef curmap
,
188 const PGPool
&_pool
, spg_t p
) :
189 pg_whoami(o
->whoami
, p
.shard
),
194 osdriver(osd
->store
, coll_t(), OSD::make_snapmapper_oid()),
199 p
.get_split_bits(_pool
.info
.get_pg_num()),
202 trace_endpoint("0.0.0.0", 0, "PG"),
204 pgmeta_oid(p
.make_pgmeta_oid()),
205 stat_queue_item(this),
207 recovery_queued(false),
208 recovery_ops_active(0),
209 backfill_reserving(false),
210 pg_stats_publish_valid(false),
211 finish_sync_event(NULL
),
212 scrub_after_recovery(false),
222 pool(recovery_state
.get_pool()),
223 info(recovery_state
.get_info())
226 osd
->add_pgid(p
, this);
229 std::stringstream ss
;
230 ss
<< "PG " << info
.pgid
;
231 trace_endpoint
.copy_name(ss
.str());
238 osd
->remove_pgid(info
.pgid
, this);
// Acquire the PG's big lock.  With CEPH_DEBUG_MUTEX the no_lockdep flag is
// forwarded to suppress lockdep checking; the owning thread id is recorded
// (locked_by) for gen_prefix()'s "does this thread hold the lock" test.
// NOTE(review): the #else/#endif lines of the ifdef are elided here.
242 void PG::lock(bool no_lockdep
) const
244 #ifdef CEPH_DEBUG_MUTEX
245 _lock
.lock(no_lockdep
);
248 locked_by
= std::this_thread::get_id();
250 // if we have unrecorded dirty state with the lock dropped, there is a bug
251 ceph_assert(!recovery_state
.debug_has_dirty_state());
253 dout(30) << "lock" << dendl
;
// True when the PG's big mutex (_lock) is currently held (by any thread).
256 bool PG::is_locked() const
258 return ceph_mutex_is_locked(_lock
);
// Release the PG's big lock.  Any dirty recovery state must have been
// recorded before dropping the lock (same invariant as in lock()).
// NOTE(review): the locked_by reset and the _lock.unlock() call are on
// lines elided in this extract.
261 void PG::unlock() const
263 //generic_dout(0) << this << " " << info.pgid << " unlock" << dendl;
264 ceph_assert(!recovery_state
.debug_has_dirty_state());
265 #ifndef CEPH_DEBUG_MUTEX
// Build the per-line debug-log prefix.  If the calling thread holds the PG
// lock, the full PG state is streamed via operator<<; otherwise only the
// pg id plus "(unlocked)" is printed, since reading live state without the
// lock would race.  NOTE(review): the #else/#endif between the two
// lock-ownership tests (CEPH_DEBUG_MUTEX vs. recorded locked_by) and the
// else branch glue are elided in this extract.
271 std::ostream
& PG::gen_prefix(std::ostream
& out
) const
273 OSDMapRef mapref
= recovery_state
.get_osdmap();
274 #ifdef CEPH_DEBUG_MUTEX
275 if (_lock
.is_locked_by_me()) {
// Fallback ownership test using the thread id recorded in PG::lock().
277 if (locked_by
== std::this_thread::get_id()) {
279 out
<< "osd." << osd
->whoami
280 << " pg_epoch: " << (mapref
? mapref
->get_epoch():0)
281 << " " << *this << " ";
283 out
<< "osd." << osd
->whoami
284 << " pg_epoch: " << (mapref
? mapref
->get_epoch():0)
285 << " pg[" << pg_id
.pgid
<< "(unlocked)] ";
// Perf counters for the peering/recovery state machine (owned by the OSD).
290 PerfCounters
&PG::get_peering_perf() {
291 return *(osd
->recoverystate_perf
);
// The OSD's general perf-counter logger.
294 PerfCounters
&PG::get_perf_logger() {
295 return *(osd
->logger
);
// Record entry into a peering state in the OSD-wide recovery statistics.
298 void PG::log_state_enter(const char *state
) {
299 osd
->pg_recovery_stats
.log_enter(state
);
// Record exit from a peering state: accumulates the dwell time
// (now - enter_time), event count and event duration into the OSD-wide
// recovery statistics.
302 void PG::log_state_exit(
303 const char *state_name
, utime_t enter_time
,
304 uint64_t events
, utime_t event_dur
) {
305 osd
->pg_recovery_stats
.log_exit(
306 state_name
, ceph_clock_now() - enter_time
, events
, event_dur
);
309 /********* PG **********/
// Queue removal of an object together with its snap-mapper records in
// transaction t.  NOTE(review): the call that consumes the ghobject_t
// (presumably t.remove on our collection — confirm upstream) is on lines
// elided in this extract; the visible code builds the per-shard ghobject
// and then clears its snap mapping.
311 void PG::remove_snap_mapped_object(
312 ObjectStore::Transaction
&t
, const hobject_t
&soid
)
316 ghobject_t(soid
, ghobject_t::NO_GEN
, pg_whoami
.shard
));
317 clear_object_snap_mapping(&t
, soid
);
// Remove soid's snap-mapper entry inside transaction t.  Only snapshot
// clones (snap < CEPH_MAXSNAP) have mappings.  -ENOENT is tolerated (the
// mapping may legitimately be absent); any other error is logged to derr.
320 void PG::clear_object_snap_mapping(
321 ObjectStore::Transaction
*t
, const hobject_t
&soid
)
323 OSDriver::OSTransaction
_t(osdriver
.get_transaction(t
));
324 if (soid
.snap
< CEPH_MAXSNAP
) {
325 int r
= snap_mapper
.remove_oid(
328 if (!(r
== 0 || r
== -ENOENT
)) {
329 derr
<< __func__
<< ": remove_oid returned " << cpp_strerror(r
) << dendl
;
// Replace soid's snap-mapper entry with the given snap set inside
// transaction t.  Caller must pass a snapshot clone (asserted:
// snap < CEPH_MAXSNAP).  The old mapping is removed first (-ENOENT
// tolerated, other errors logged); the re-add via snap_mapper is on lines
// elided in this extract.
335 void PG::update_object_snap_mapping(
336 ObjectStore::Transaction
*t
, const hobject_t
&soid
, const set
<snapid_t
> &snaps
)
338 OSDriver::OSTransaction
_t(osdriver
.get_transaction(t
));
339 ceph_assert(soid
.snap
< CEPH_MAXSNAP
);
340 int r
= snap_mapper
.remove_oid(
343 if (!(r
== 0 || r
== -ENOENT
)) {
344 derr
<< __func__
<< ": remove_oid returned " << cpp_strerror(r
) << dendl
;
353 /******* PG ***********/
// Drop primary-role soft state when this OSD stops acting as primary for
// the PG: reset the projected log, clear the snap-trim repeat queue,
// cancel the pending recovery-finish event, release backoffs, discard any
// replica scrub reservations, and clear the deferred-scrub flag.
354 void PG::clear_primary_state()
356 dout(20) << __func__
<< dendl
;
358 projected_log
= PGLog::IndexedLog();
361 snap_trimq_repeat
.clear();
362 finish_sync_event
= 0; // so that _finish_recovery doesn't go off in another thread
363 release_pg_backoffs();
366 m_scrubber
->discard_replica_reservations();
368 scrub_after_recovery
= false;
// Check the client session's OSD capabilities against this op.  Only
// CEPH_MSG_OSD_OP requests are checked (the early-return body for other
// types is elided here); an op whose connection has no Session is logged.
// The object's locator key falls back to the object name when empty.
374 bool PG::op_has_sufficient_caps(OpRequestRef
& op
)
377 if (op
->get_req()->get_type() != CEPH_MSG_OSD_OP
)
380 auto req
= op
->get_req
<MOSDOp
>();
381 auto priv
= req
->get_connection()->get_priv();
382 auto session
= static_cast<Session
*>(priv
.get());
384 dout(0) << "op_has_sufficient_caps: no session for op " << *req
<< dendl
;
387 OSDCap
& caps
= session
->caps
;
// Locator key: explicit hobject key if set, else the object name.
390 const string
&key
= req
->get_hobj().get_key().empty() ?
391 req
->get_oid().name
:
392 req
->get_hobj().get_key();
// The cap check considers pool name/namespace, pool application metadata,
// required read/write caps and the peer socket address (some argument
// lines, e.g. need_read_cap and classes, are elided in this extract).
394 bool cap
= caps
.is_capable(pool
.name
, req
->get_hobj().nspace
,
395 pool
.info
.application_metadata
,
398 op
->need_write_cap(),
400 session
->get_peer_socket_addr());
402 dout(20) << "op_has_sufficient_caps "
403 << "session=" << session
404 << " pool=" << pool
.id
<< " (" << pool
.name
405 << " " << req
->get_hobj().nspace
407 << " pool_app_metadata=" << pool
.info
.application_metadata
408 << " need_read_cap=" << op
->need_read_cap()
409 << " need_write_cap=" << op
->need_write_cap()
410 << " classes=" << op
->classes()
411 << " -> " << (cap
? "yes" : "NO")
// Ask the OSD to queue this PG for recovery.  Only a primary, peered PG
// may be queued, and only once (recovery_queued guards double-queueing).
416 void PG::queue_recovery()
418 if (!is_primary() || !is_peered()) {
419 dout(10) << "queue_recovery -- not primary or not peered " << dendl
;
// A non-primary / non-peered PG must never have been left queued.
420 ceph_assert(!recovery_queued
);
421 } else if (recovery_queued
) {
422 dout(10) << "queue_recovery -- already queued" << dendl
;
424 dout(10) << "queue_recovery -- queuing" << dendl
;
425 recovery_queued
= true;
426 osd
->queue_for_recovery(this);
// After a repair-triggered recovery completes, queue a high-priority deep
// scrub (with check_repair set) to verify the repair.  Requires the PG
// lock; bails out if a scrub is already running or already queued (the
// early-return bodies are on elided lines).
430 void PG::queue_scrub_after_repair()
432 dout(10) << __func__
<< dendl
;
433 ceph_assert(ceph_mutex_is_locked(_lock
));
435 m_planned_scrub
.must_deep_scrub
= true;
436 m_planned_scrub
.check_repair
= true;
437 m_planned_scrub
.must_scrub
= true;
439 if (is_scrubbing()) {
440 dout(10) << __func__
<< ": scrubbing already" << dendl
;
444 dout(10) << __func__
<< ": already queued" << dendl
;
// Freeze the planned-scrub flags into the scrubber, then hand the PG to
// the OSD scrub queue at high priority.
448 m_scrubber
->set_op_parameters(m_planned_scrub
);
449 dout(15) << __func__
<< ": queueing" << dendl
;
452 osd
->queue_scrub_after_repair(this, Scrub::scrub_prio_t::high_priority
);
// Scrub priority for this PG: the pool's SCRUB_PRIORITY option when set
// (> 0), otherwise the global osd_scrub_priority config value.
455 unsigned PG::get_scrub_priority()
457 // a higher value -> a higher priority
458 int64_t pool_scrub_priority
=
459 pool
.info
.opts
.value_or(pool_opts_t::SCRUB_PRIORITY
, (int64_t)0);
460 return pool_scrub_priority
> 0 ? pool_scrub_priority
: cct
->_conf
->osd_scrub_priority
;
// Called when recovery completes.  Requires last_complete == last_update
// (nothing left to recover), clears recovery bookkeeping, and returns a
// C_PG_FinishRecovery completion — also stored in finish_sync_event so a
// stale callback can be detected in _finish_recovery().
463 Context
*PG::finish_recovery()
465 dout(10) << "finish_recovery" << dendl
;
466 ceph_assert(info
.last_complete
== info
.last_update
);
468 clear_recovery_state();
471 * sync all this before purging strays. but don't block!
473 finish_sync_event
= new C_PG_FinishRecovery(this);
474 return finish_sync_event
;
// Completion callback from finish_recovery().  Under the PG lock, and only
// when c is still the current finish_sync_event (i.e. not stale: recovery
// was not cancelled/restarted in the meantime), clear the lingering REPAIR
// flag, purge strays, publish stats, and requeue a scrub if one was
// deferred until after recovery.
477 void PG::_finish_recovery(Context
* c
)
479 dout(15) << __func__
<< " finish_sync_event? " << finish_sync_event
<< " clean? "
480 << is_clean() << dendl
;
482 std::scoped_lock locker
{*this};
483 if (recovery_state
.is_deleting() || !is_clean()) {
484 dout(10) << __func__
<< " raced with delete or repair" << dendl
;
487 // When recovery is initiated by a repair, that flag is left on
488 state_clear(PG_STATE_REPAIR
);
489 if (c
== finish_sync_event
) {
490 dout(15) << __func__
<< " scrub_after_recovery? " << scrub_after_recovery
<< dendl
;
491 finish_sync_event
= 0;
492 recovery_state
.purge_strays();
494 publish_stats_to_osd();
496 if (scrub_after_recovery
) {
497 dout(10) << "_finish_recovery requeueing for scrub" << dendl
;
498 scrub_after_recovery
= false;
499 queue_scrub_after_repair();
502 dout(10) << "_finish_recovery -- stale" << dendl
;
// Account the start of a recovery op on soid: bump the active-op counter
// (and, when built with DEBUG_RECOVERY_OIDS, track the oid itself), then
// notify the OSD-wide accounting.
506 void PG::start_recovery_op(const hobject_t
& soid
)
508 dout(10) << "start_recovery_op " << soid
509 #ifdef DEBUG_RECOVERY_OIDS
510 << " (" << recovering_oids
<< ")"
513 ceph_assert(recovery_ops_active
>= 0);
514 recovery_ops_active
++;
515 #ifdef DEBUG_RECOVERY_OIDS
516 recovering_oids
.insert(soid
);
518 osd
->start_recovery_op(this, soid
);
// Account the completion of a recovery op on soid: decrement the active
// counter (must be > 0), drop the DEBUG_RECOVERY_OIDS tracking entry, and
// notify the OSD — optionally dequeueing the PG from the recovery queue.
521 void PG::finish_recovery_op(const hobject_t
& soid
, bool dequeue
)
523 dout(10) << "finish_recovery_op " << soid
524 #ifdef DEBUG_RECOVERY_OIDS
525 << " (" << recovering_oids
<< ")"
528 ceph_assert(recovery_ops_active
> 0);
529 recovery_ops_active
--;
530 #ifdef DEBUG_RECOVERY_OIDS
531 ceph_assert(recovering_oids
.count(soid
));
532 recovering_oids
.erase(recovering_oids
.find(soid
));
534 osd
->finish_recovery_op(this, soid
, dequeue
);
// Populate a newly created child PG during a PG split: split the peering
// state, update the child's snap-mapper hash bits, copy the snap-trim
// queues, run the backend-specific _split_into hook, then release all of
// our backoffs (clients will simply resend).
541 void PG::split_into(pg_t child_pgid
, PG
*child
, unsigned split_bits
)
543 recovery_state
.split_into(child_pgid
, &child
->recovery_state
, split_bits
);
545 child
->update_snap_mapper_bits(split_bits
);
547 child
->snap_trimq
= snap_trimq
;
548 child
->snap_trimq_repeat
= snap_trimq_repeat
;
550 _split_into(child_pgid
, child
, split_bits
);
552 // release all backoffs for simplicity
553 release_backoffs(hobject_t(), hobject_t::get_max());
// Delegate to PeeringState: compute per-child stat sums for a pending split.
556 void PG::start_split_stats(const set
<spg_t
>& childpgs
, vector
<object_stat_sum_t
> *out
)
558 recovery_state
.start_split_stats(childpgs
, out
);
// Delegate to PeeringState: fold the given stat delta back in after a
// split and persist it via transaction t.
561 void PG::finish_split_stats(const object_stat_sum_t
& stats
, ObjectStore::Transaction
&t
)
563 recovery_state
.finish_split_stats(stats
, t
);
// Absorb the given source PGs into this PG (PG merge): merge the peering
// state, then for each source delete its pgmeta object and merge (and
// destroy) its collection into ours.  Afterwards re-assert the collection
// hash bits — merge_collection normally does this, but all sources may
// have been missing — and update the snap mapper's bits to match.
566 void PG::merge_from(map
<spg_t
,PGRef
>& sources
, PeeringCtx
&rctx
,
568 const pg_merge_meta_t
& last_pg_merge_meta
)
570 dout(10) << __func__
<< " from " << sources
<< " split_bits " << split_bits
572 map
<spg_t
, PeeringState
*> source_ps
;
573 for (auto &&source
: sources
) {
574 source_ps
.emplace(source
.first
, &source
.second
->recovery_state
);
576 recovery_state
.merge_from(source_ps
, rctx
, split_bits
, last_pg_merge_meta
);
578 for (auto& i
: sources
) {
579 auto& source
= i
.second
;
580 // wipe out source's pgmeta
581 rctx
.transaction
.remove(source
->coll
, source
->pgmeta_oid
);
583 // merge (and destroy source collection)
584 rctx
.transaction
.merge_collection(source
->coll
, coll
, split_bits
);
587 // merge_collection does this, but maybe all of our sources were missing.
588 rctx
.transaction
.collection_set_bits(coll
, split_bits
);
590 snap_mapper
.update_bits(split_bits
);
593 void PG::add_backoff(const ceph::ref_t
<Session
>& s
, const hobject_t
& begin
, const hobject_t
& end
)
596 if (!con
) // OSD::ms_handle_reset clears s->con without a lock
598 auto b
= s
->have_backoff(info
.pgid
, begin
);
600 derr
<< __func__
<< " already have backoff for " << s
<< " begin " << begin
601 << " " << *b
<< dendl
;
604 std::lock_guard
l(backoff_lock
);
605 b
= ceph::make_ref
<Backoff
>(info
.pgid
, this, s
, ++s
->backoff_seq
, begin
, end
);
606 backoffs
[begin
].insert(b
);
608 dout(10) << __func__
<< " session " << s
<< " added " << *b
<< dendl
;
613 CEPH_OSD_BACKOFF_OP_BLOCK
,
619 void PG::release_backoffs(const hobject_t
& begin
, const hobject_t
& end
)
621 dout(10) << __func__
<< " [" << begin
<< "," << end
<< ")" << dendl
;
622 vector
<ceph::ref_t
<Backoff
>> bv
;
624 std::lock_guard
l(backoff_lock
);
625 auto p
= backoffs
.lower_bound(begin
);
626 while (p
!= backoffs
.end()) {
627 int r
= cmp(p
->first
, end
);
628 dout(20) << __func__
<< " ? " << r
<< " " << p
->first
629 << " " << p
->second
<< dendl
;
630 // note: must still examine begin=end=p->first case
631 if (r
> 0 || (r
== 0 && begin
< end
)) {
634 dout(20) << __func__
<< " checking " << p
->first
635 << " " << p
->second
<< dendl
;
636 auto q
= p
->second
.begin();
637 while (q
!= p
->second
.end()) {
638 dout(20) << __func__
<< " checking " << *q
<< dendl
;
639 int r
= cmp((*q
)->begin
, begin
);
640 if (r
== 0 || (r
> 0 && (*q
)->end
< end
)) {
642 q
= p
->second
.erase(q
);
647 if (p
->second
.empty()) {
648 p
= backoffs
.erase(p
);
655 std::lock_guard
l(b
->lock
);
656 dout(10) << __func__
<< " " << *b
<< dendl
;
658 ceph_assert(b
->pg
== this);
659 ConnectionRef con
= b
->session
->con
;
660 if (con
) { // OSD::ms_handle_reset clears s->con without a lock
665 CEPH_OSD_BACKOFF_OP_UNBLOCK
,
671 b
->state
= Backoff::STATE_DELETING
;
673 b
->session
->rm_backoff(b
);
681 void PG::clear_backoffs()
683 dout(10) << __func__
<< " " << dendl
;
684 map
<hobject_t
,set
<ceph::ref_t
<Backoff
>>> ls
;
686 std::lock_guard
l(backoff_lock
);
690 for (auto& b
: p
.second
) {
691 std::lock_guard
l(b
->lock
);
692 dout(10) << __func__
<< " " << *b
<< dendl
;
694 ceph_assert(b
->pg
== this);
696 b
->state
= Backoff::STATE_DELETING
;
698 b
->session
->rm_backoff(b
);
// Remove backoff b from this PG's index.  Called from
// Session::clear_backoffs() with b->lock already held (asserted); b must
// belong to this PG.  Absence from the index is tolerated because this may
// race with release_backoffs().
707 // called by Session::clear_backoffs()
708 void PG::rm_backoff(const ceph::ref_t
<Backoff
>& b
)
710 dout(10) << __func__
<< " " << *b
<< dendl
;
711 std::lock_guard
l(backoff_lock
);
712 ceph_assert(ceph_mutex_is_locked_by_me(b
->lock
));
713 ceph_assert(b
->pg
== this);
714 auto p
= backoffs
.find(b
->begin
);
715 // may race with release_backoffs()
716 if (p
!= backoffs
.end()) {
717 auto q
= p
->second
.find(b
);
718 if (q
!= p
->second
.end()) {
// Drop the now-empty per-hobject bucket (erase itself is elided here).
720 if (p
->second
.empty()) {
// Reset all recovery bookkeeping: cancel the pending sync event, finish
// (and dequeue) every still-active recovery op, clear backfill tracking,
// and invoke the backend-specific hook.
727 void PG::clear_recovery_state()
729 dout(10) << "clear_recovery_state" << dendl
;
731 finish_sync_event
= 0;
// Drain active ops one at a time; finish_recovery_op decrements
// recovery_ops_active.  (The non-debug soid selection is on elided lines.)
734 while (recovery_ops_active
> 0) {
735 #ifdef DEBUG_RECOVERY_OIDS
736 soid
= *recovering_oids
.begin();
738 finish_recovery_op(soid
, true);
741 backfill_info
.clear();
742 peer_backfill_info
.clear();
743 waiting_on_backfill
.clear();
744 _clear_recovery_state(); // pg impl specific hook
// Abort any in-progress recovery by clearing all recovery state.
747 void PG::cancel_recovery()
749 dout(10) << "cancel_recovery" << dendl
;
750 clear_recovery_state();
// Replace the set of OSDs we heartbeat because peering is probing them.
// Guarded by heartbeat_peer_lock, which the OSD heartbeat path also takes.
753 void PG::set_probe_targets(const set
<pg_shard_t
> &probe_set
)
755 std::lock_guard
l(heartbeat_peer_lock
);
756 probe_targets
.clear();
757 for (set
<pg_shard_t
>::iterator i
= probe_set
.begin();
758 i
!= probe_set
.end();
760 probe_targets
.insert(i
->osd
);
// Send m to the given OSD over the cluster messenger, using a connection
// for our current map epoch; optionally piggy-back an osdmap share first.
// NOTE(review): the null-connection early-return body is on elided lines.
764 void PG::send_cluster_message(
765 int target
, MessageRef m
,
766 epoch_t epoch
, bool share_map_update
=false)
768 ConnectionRef con
= osd
->get_con_osd_cluster(
769 target
, get_osdmap_epoch());
774 if (share_map_update
) {
775 osd
->maybe_share_map(con
.get(), get_osdmap());
777 osd
->send_message_osd_cluster(m
, con
.get());
// Drop all probe-driven heartbeat targets (probe round is finished).
780 void PG::clear_probe_targets()
782 std::lock_guard
l(heartbeat_peer_lock
);
783 probe_targets
.clear();
// Swap in a new heartbeat-peer set under heartbeat_peer_lock; if it
// changed, notify the OSD afterwards so it can refresh its heartbeat
// connections.  (The need_update assignment in the changed branch and the
// conditional around the notification are on elided lines.)
786 void PG::update_heartbeat_peers(set
<int> new_peers
)
788 bool need_update
= false;
789 heartbeat_peer_lock
.lock();
790 if (new_peers
== heartbeat_peers
) {
791 dout(10) << "update_heartbeat_peers " << heartbeat_peers
<< " unchanged" << dendl
;
793 dout(10) << "update_heartbeat_peers " << heartbeat_peers
<< " -> " << new_peers
<< dendl
;
794 heartbeat_peers
.swap(new_peers
);
// Notify only after dropping the lock.
797 heartbeat_peer_lock
.unlock();
800 osd
->need_heartbeat_peer_update();
// Look up request id r among ops already applied (or being applied):
// first in the in-memory projected log, then in the pg log.  On a hit the
// version / user_version / return code / per-op returns are filled in.
// NOTE(review): the version* parameter line and the surrounding
// return-expression glue are elided in this extract.
804 bool PG::check_in_progress_op(
805 const osd_reqid_t
&r
,
807 version_t
*user_version
,
809 vector
<pg_log_op_return_item_t
> *op_returns
813 projected_log
.get_request(r
, version
, user_version
, return_code
,
815 recovery_state
.get_pg_log().get_log().get_request(
816 r
, version
, user_version
, return_code
, op_returns
));
// Snapshot this PG's stats for the OSD to report upstream.  The published
// copy and its valid flag are guarded by pg_stats_publish_lock.
// NOTE(review): the primary-only early return and the check that stats has
// a value appear to be on elided lines — confirm upstream.
819 void PG::publish_stats_to_osd()
824 std::lock_guard l
{pg_stats_publish_lock
};
825 auto stats
= recovery_state
.prepare_stats_for_publish(
826 pg_stats_publish_valid
,
830 pg_stats_publish
= stats
.value();
831 pg_stats_publish_valid
= true;
// Target pg-log length, computed OSD-wide (shared across all PGs).
835 unsigned PG::get_target_pg_log_entries() const
837 return osd
->get_target_pg_log_entries();
// Invalidate the published stats snapshot (it will be rebuilt on the next
// publish_stats_to_osd()).
840 void PG::clear_publish_stats()
842 dout(15) << "clear_stats" << dendl
;
843 std::lock_guard l
{pg_stats_publish_lock
};
844 pg_stats_publish_valid
= false;
848 * initialize a newly instantiated pg
850 * Initialize PG state, as when a PG is initially created, or when it
851 * is first instantiated on the current node.
853 * @param role our role/rank
854 * @param newup up set
855 * @param newacting acting set
856 * @param history pg history
857 * @param pi past_intervals
858 * @param backfill true if info should be marked as backfill
859 * @param t transaction to write out our new state in
863 const vector
<int>& newup
, int new_up_primary
,
864 const vector
<int>& newacting
, int new_acting_primary
,
865 const pg_history_t
& history
,
866 const PastIntervals
& pi
,
868 ObjectStore::Transaction
&t
)
871 role
, newup
, new_up_primary
, newacting
,
872 new_acting_primary
, history
, pi
, backfill
, t
);
878 std::scoped_lock l
{*this};
879 recovery_state
.shutdown();
883 #pragma GCC diagnostic ignored "-Wpragmas"
884 #pragma GCC diagnostic push
885 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
887 void PG::upgrade(ObjectStore
*store
)
889 dout(0) << __func__
<< " " << info_struct_v
<< " -> " << pg_latest_struct_v
891 ceph_assert(info_struct_v
<= 10);
892 ObjectStore::Transaction t
;
894 // <do upgrade steps here>
897 ceph_assert(info_struct_v
== 10);
899 // update infover_key
900 if (info_struct_v
< pg_latest_struct_v
) {
901 map
<string
,bufferlist
> v
;
902 __u8 ver
= pg_latest_struct_v
;
903 encode(ver
, v
[string(infover_key
)]);
904 t
.omap_setkeys(coll
, pgmeta_oid
, v
);
907 recovery_state
.force_write_state(t
);
909 ObjectStore::CollectionHandle ch
= store
->open_collection(coll
);
910 int r
= store
->queue_transaction(ch
, std::move(t
));
912 derr
<< __func__
<< ": queue_transaction returned "
913 << cpp_strerror(r
) << dendl
;
919 if (!ch
->flush_commit(&waiter
)) {
924 #pragma GCC diagnostic pop
925 #pragma GCC diagnostic warning "-Wpragmas"
927 void PG::prepare_write(
929 pg_info_t
&last_written_info
,
930 PastIntervals
&past_intervals
,
934 bool need_write_epoch
,
935 ObjectStore::Transaction
&t
)
937 info
.stats
.stats
.add(unstable_stats
);
938 unstable_stats
.clear();
939 map
<string
,bufferlist
> km
;
940 string key_to_remove
;
941 if (dirty_big_info
|| dirty_info
) {
942 int ret
= prepare_info_keymap(
952 cct
->_conf
->osd_fast_info
,
955 ceph_assert(ret
== 0);
957 pglog
.write_log_and_missing(
958 t
, &km
, coll
, pgmeta_oid
, pool
.info
.require_rollback());
960 t
.omap_setkeys(coll
, pgmeta_oid
, km
);
961 if (!key_to_remove
.empty())
962 t
.omap_rmkey(coll
, pgmeta_oid
, key_to_remove
);
965 #pragma GCC diagnostic ignored "-Wpragmas"
966 #pragma GCC diagnostic push
967 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
969 bool PG::_has_removal_flag(ObjectStore
*store
,
973 ghobject_t
pgmeta_oid(pgid
.make_pgmeta_oid());
977 keys
.insert("_remove");
978 map
<string
,bufferlist
> values
;
979 auto ch
= store
->open_collection(coll
);
981 if (store
->omap_get_values(ch
, pgmeta_oid
, keys
, &values
) == 0 &&
988 int PG::peek_map_epoch(ObjectStore
*store
,
993 ghobject_t
legacy_infos_oid(OSD::make_infos_oid());
994 ghobject_t
pgmeta_oid(pgid
.make_pgmeta_oid());
995 epoch_t cur_epoch
= 0;
997 // validate collection name
998 ceph_assert(coll
.is_pg());
1002 keys
.insert(string(infover_key
));
1003 keys
.insert(string(epoch_key
));
1004 map
<string
,bufferlist
> values
;
1005 auto ch
= store
->open_collection(coll
);
1007 int r
= store
->omap_get_values(ch
, pgmeta_oid
, keys
, &values
);
1009 ceph_assert(values
.size() == 2);
1011 // sanity check version
1012 auto bp
= values
[string(infover_key
)].cbegin();
1014 decode(struct_v
, bp
);
1015 ceph_assert(struct_v
>= 8);
1018 bp
= values
[string(epoch_key
)].begin();
1019 decode(cur_epoch
, bp
);
1021 // probably bug 10617; see OSD::load_pgs()
1025 *pepoch
= cur_epoch
;
1029 #pragma GCC diagnostic pop
1030 #pragma GCC diagnostic warning "-Wpragmas"
1032 bool PG::check_log_for_corruption(ObjectStore
*store
)
1034 /// TODO: this method needs to work with the omap log
1038 //! Get the name we're going to save our corrupt page log as
1039 std::string
PG::get_corrupt_pg_log_name() const
1041 const int MAX_BUF
= 512;
1044 time_t my_time(time(NULL
));
1045 const struct tm
*t
= localtime_r(&my_time
, &tm_buf
);
1046 int ret
= strftime(buf
, sizeof(buf
), "corrupt_log_%Y-%m-%d_%k:%M_", t
);
1048 dout(0) << "strftime failed" << dendl
;
1049 return "corrupt_log_unknown_time";
1052 out
+= stringify(info
.pgid
);
1057 ObjectStore
*store
, spg_t pgid
, const coll_t
&coll
,
1058 pg_info_t
&info
, PastIntervals
&past_intervals
,
1062 keys
.insert(string(infover_key
));
1063 keys
.insert(string(info_key
));
1064 keys
.insert(string(biginfo_key
));
1065 keys
.insert(string(fastinfo_key
));
1066 ghobject_t
pgmeta_oid(pgid
.make_pgmeta_oid());
1067 map
<string
,bufferlist
> values
;
1068 auto ch
= store
->open_collection(coll
);
1070 int r
= store
->omap_get_values(ch
, pgmeta_oid
, keys
, &values
);
1071 ceph_assert(r
== 0);
1072 ceph_assert(values
.size() == 3 ||
1073 values
.size() == 4);
1075 auto p
= values
[string(infover_key
)].cbegin();
1076 decode(struct_v
, p
);
1077 ceph_assert(struct_v
>= 10);
1079 p
= values
[string(info_key
)].begin();
1082 p
= values
[string(biginfo_key
)].begin();
1083 decode(past_intervals
, p
);
1084 decode(info
.purged_snaps
, p
);
1086 p
= values
[string(fastinfo_key
)].begin();
1088 pg_fast_info_t fast
;
1090 fast
.try_apply_to(&info
);
1095 void PG::read_state(ObjectStore
*store
)
1097 PastIntervals past_intervals_from_disk
;
1098 pg_info_t info_from_disk
;
1104 past_intervals_from_disk
,
1106 ceph_assert(r
>= 0);
1108 if (info_struct_v
< pg_compat_struct_v
) {
1109 derr
<< "PG needs upgrade, but on-disk data is too old; upgrade to"
1110 << " an older version first." << dendl
;
1111 ceph_abort_msg("PG too old to upgrade");
1114 recovery_state
.init_from_disk_state(
1115 std::move(info_from_disk
),
1116 std::move(past_intervals_from_disk
),
1117 [this, store
] (PGLog
&pglog
) {
1119 pglog
.read_log_and_missing(
1125 cct
->_conf
->osd_ignore_stale_divergent_priors
,
1126 cct
->_conf
->osd_debug_verify_missing_on_start
);
1129 osd
->clog
->error() << oss
.str();
1133 if (info_struct_v
< pg_latest_struct_v
) {
1137 // initialize current mapping
1139 int primary
, up_primary
;
1140 vector
<int> acting
, up
;
1141 get_osdmap()->pg_to_up_acting_osds(
1142 pg_id
.pgid
, &up
, &up_primary
, &acting
, &primary
);
1143 recovery_state
.init_primary_up_acting(
1148 recovery_state
.set_role(OSDMap::calc_pg_role(pg_whoami
, acting
));
1151 // init pool options
1152 store
->set_collection_opts(ch
, pool
.info
.opts
);
1154 PeeringCtx
rctx(ceph_release_t::unknown
);
1155 handle_initialize(rctx
);
1156 // note: we don't activate here because we know the OSD will advance maps
1158 write_if_dirty(rctx
.transaction
);
1159 store
->queue_transaction(ch
, std::move(rctx
.transaction
));
1162 void PG::update_snap_map(
1163 const vector
<pg_log_entry_t
> &log_entries
,
1164 ObjectStore::Transaction
&t
)
1166 for (auto i
= log_entries
.cbegin(); i
!= log_entries
.cend(); ++i
) {
1167 OSDriver::OSTransaction
_t(osdriver
.get_transaction(&t
));
1168 if (i
->soid
.snap
< CEPH_MAXSNAP
) {
1169 if (i
->is_delete()) {
1170 int r
= snap_mapper
.remove_oid(
1174 derr
<< __func__
<< " remove_oid " << i
->soid
<< " failed with " << r
<< dendl
;
1175 // On removal tolerate missing key corruption
1176 ceph_assert(r
== 0 || r
== -ENOENT
);
1177 } else if (i
->is_update()) {
1178 ceph_assert(i
->snaps
.length() > 0);
1179 vector
<snapid_t
> snaps
;
1180 bufferlist snapbl
= i
->snaps
;
1181 auto p
= snapbl
.cbegin();
1185 derr
<< __func__
<< " decode snaps failure on " << *i
<< dendl
;
1188 set
<snapid_t
> _snaps(snaps
.begin(), snaps
.end());
1190 if (i
->is_clone() || i
->is_promote()) {
1191 snap_mapper
.add_oid(
1195 } else if (i
->is_modify()) {
1196 int r
= snap_mapper
.update_snaps(
1201 ceph_assert(r
== 0);
1203 ceph_assert(i
->is_clean());
// Filter snap ids that are being trimmed (snap_trimq) or already purged
// (info.purged_snaps) out of a client-supplied snap context.  The copy
// into newsnaps is deferred until the first filtered entry, so the common
// nothing-to-filter case does no copying at all.
1211 * filter trimming|trimmed snaps out of snapcontext
1213 void PG::filter_snapc(vector
<snapid_t
> &snaps
)
1215 // nothing needs to trim, we can return immediately
1216 if (snap_trimq
.empty() && info
.purged_snaps
.empty())
1219 bool filtering
= false;
1220 vector
<snapid_t
> newsnaps
;
1221 for (vector
<snapid_t
>::iterator p
= snaps
.begin();
1224 if (snap_trimq
.contains(*p
) || info
.purged_snaps
.contains(*p
)) {
1226 // start building a new vector with what we've seen so far
1227 dout(10) << "filter_snapc filtering " << snaps
<< dendl
;
1228 newsnaps
.insert(newsnaps
.begin(), snaps
.begin(), p
);
1231 dout(20) << "filter_snapc removing trimq|purged snap " << *p
<< dendl
;
1234 newsnaps
.push_back(*p
); // continue building new vector
// Only swap in the filtered vector when filtering actually happened
// (the if(filtering) guard is on elided lines).
1238 snaps
.swap(newsnaps
);
1239 dout(10) << "filter_snapc result " << snaps
<< dendl
;
// Requeue every per-object list of waiting ops in m back into the op queue.
1243 void PG::requeue_object_waiters(map
<hobject_t
, list
<OpRequestRef
>>& m
)
1245 for (auto it
= m
.begin(); it
!= m
.end(); ++it
)
1246 requeue_ops(it
->second
);
1250 void PG::requeue_op(OpRequestRef op
)
1252 auto p
= waiting_for_map
.find(op
->get_source());
1253 if (p
!= waiting_for_map
.end()) {
1254 dout(20) << __func__
<< " " << op
<< " (waiting_for_map " << p
->first
<< ")"
1256 p
->second
.push_front(op
);
1258 dout(20) << __func__
<< " " << op
<< dendl
;
1261 unique_ptr
<OpSchedulerItem::OpQueueable
>(new PGOpItem(info
.pgid
, op
)),
1262 op
->get_req()->get_cost(),
1263 op
->get_req()->get_priority(),
1264 op
->get_req()->get_recv_stamp(),
1265 op
->get_req()->get_source().num(),
1266 get_osdmap_epoch()));
1270 void PG::requeue_ops(list
<OpRequestRef
> &ls
)
1272 for (list
<OpRequestRef
>::reverse_iterator i
= ls
.rbegin();
1280 void PG::requeue_map_waiters()
1282 epoch_t epoch
= get_osdmap_epoch();
1283 auto p
= waiting_for_map
.begin();
1284 while (p
!= waiting_for_map
.end()) {
1285 if (epoch
< p
->second
.front()->min_epoch
) {
1286 dout(20) << __func__
<< " " << p
->first
<< " front op "
1287 << p
->second
.front() << " must still wait, doing nothing"
1291 dout(20) << __func__
<< " " << p
->first
<< " " << p
->second
<< dendl
;
1292 for (auto q
= p
->second
.rbegin(); q
!= p
->second
.rend(); ++q
) {
1294 osd
->enqueue_front(OpSchedulerItem(
1295 unique_ptr
<OpSchedulerItem::OpQueueable
>(new PGOpItem(info
.pgid
, req
)),
1296 req
->get_req()->get_cost(),
1297 req
->get_req()->get_priority(),
1298 req
->get_req()->get_recv_stamp(),
1299 req
->get_req()->get_source().num(),
1302 p
= waiting_for_map
.erase(p
);
// Whether an operator-requested ("must") scrub is pending for this PG.
1307 bool PG::get_must_scrub() const
1309 dout(20) << __func__
<< " must_scrub? " << (m_planned_scrub
.must_scrub
? "true" : "false") << dendl
;
1310 return m_planned_scrub
.must_scrub
;
// Delegate to the scrubber: priority to use when requeueing a scrub event.
1313 unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority
) const
1315 return m_scrubber
->scrub_requeue_priority(with_priority
);
// Overload taking a suggested priority, forwarded to the scrubber.
1318 unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority
, unsigned int suggested_priority
) const
1320 return m_scrubber
->scrub_requeue_priority(with_priority
, suggested_priority
);
1323 // ==========================================================================================
1327 * implementation note:
1328 * PG::sched_scrub() is called only once per a specific scrub session.
1329 * That call commits us to the whatever choices are made (deep/shallow, etc').
1330 * Unless failing to start scrubbing, the 'planned scrub' flag-set is 'frozen' into
1331 * PgScrubber's m_flags, then cleared.
1333 bool PG::sched_scrub()
1335 dout(15) << __func__
<< " pg(" << info
.pgid
1336 << (is_active() ? ") <active>" : ") <not-active>")
1337 << (is_clean() ? " <clean>" : " <not-clean>") << dendl
;
1338 ceph_assert(ceph_mutex_is_locked(_lock
));
1339 ceph_assert(!is_scrubbing());
1341 if (!is_primary() || !is_active() || !is_clean()) {
1346 // only applicable to the very first time a scrub event is queued
1347 // (until handled and posted to the scrub FSM)
1348 dout(10) << __func__
<< ": already queued" << dendl
;
1352 // analyse the combination of the requested scrub flags, the osd/pool configuration
1353 // and the PG status to determine whether we should scrub now, and what type of scrub
1355 auto updated_flags
= verify_scrub_mode();
1356 if (!updated_flags
) {
1357 // the stars do not align for starting a scrub for this PG at this time
1358 // (due to configuration or priority issues)
1359 // The reason was already reported by the callee.
1360 dout(10) << __func__
<< ": failed to initiate a scrub" << dendl
;
1364 // try to reserve the local OSD resources. If failing: no harm. We will
1365 // be retried by the OSD later on.
1366 if (!m_scrubber
->reserve_local()) {
1367 dout(10) << __func__
<< ": failed to reserve locally" << dendl
;
1371 // can commit to the updated flags now, as nothing will stop the scrub
1372 m_planned_scrub
= *updated_flags
;
1374 // An interrupted recovery repair could leave this set.
1375 state_clear(PG_STATE_REPAIR
);
1377 // Pass control to the scrubber. It is the scrubber that handles the replicas'
1378 // resources reservations.
1379 m_scrubber
->set_op_parameters(m_planned_scrub
);
1381 dout(10) << __func__
<< ": queueing" << dendl
;
1383 scrub_queued
= true;
1384 osd
->queue_for_scrub(this, Scrub::scrub_prio_t::low_priority
);
1388 double PG::next_deepscrub_interval() const
1390 double deep_scrub_interval
=
1391 pool
.info
.opts
.value_or(pool_opts_t::DEEP_SCRUB_INTERVAL
, 0.0);
1392 if (deep_scrub_interval
<= 0.0)
1393 deep_scrub_interval
= cct
->_conf
->osd_deep_scrub_interval
;
1394 return info
.history
.last_deep_scrub_stamp
+ deep_scrub_interval
;
1397 bool PG::is_time_for_deep(bool allow_deep_scrub
,
1399 bool has_deep_errors
,
1400 const requested_scrub_t
& planned
) const
1402 dout(10) << __func__
<< ": need_auto?" << planned
.need_auto
<< " allow_deep_scrub? " << allow_deep_scrub
<< dendl
;
1404 if (!allow_deep_scrub
)
1407 if (planned
.need_auto
) {
1408 dout(10) << __func__
<< ": need repair after scrub errors" << dendl
;
1412 if (ceph_clock_now() >= next_deepscrub_interval())
1415 if (has_deep_errors
) {
1416 osd
->clog
->info() << "osd." << osd
->whoami
<< " pg " << info
.pgid
1417 << " Deep scrub errors, upgrading scrub to deep-scrub";
1421 // we only flip coins if 'allow_scrub' is asserted. Otherwise - as this function is
1422 // called often, we will probably be deep-scrubbing most of the time.
1424 bool deep_coin_flip
=
1425 (rand() % 100) < cct
->_conf
->osd_deep_scrub_randomize_ratio
* 100;
1427 dout(15) << __func__
<< ": time_for_deep=" << planned
.time_for_deep
1428 << " deep_coin_flip=" << deep_coin_flip
<< dendl
;
1437 bool PG::verify_periodic_scrub_mode(bool allow_deep_scrub
,
1438 bool try_to_auto_repair
,
1439 bool allow_regular_scrub
,
1440 bool has_deep_errors
,
1441 requested_scrub_t
& planned
) const
1444 ceph_assert(!planned
.must_deep_scrub
&& !planned
.must_repair
);
1446 if (!allow_deep_scrub
&& has_deep_errors
) {
1448 << "osd." << osd
->whoami
<< " pg " << info
.pgid
1449 << " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
1453 if (allow_deep_scrub
) {
1454 // Initial entry and scheduled scrubs without nodeep_scrub set get here
1456 planned
.time_for_deep
=
1457 is_time_for_deep(allow_deep_scrub
, allow_regular_scrub
, has_deep_errors
, planned
);
1459 if (try_to_auto_repair
) {
1460 if (planned
.time_for_deep
) {
1461 dout(20) << __func__
<< ": auto repair with deep scrubbing" << dendl
;
1462 planned
.auto_repair
= true;
1463 } else if (allow_regular_scrub
) {
1464 dout(20) << __func__
<< ": auto repair with scrubbing, rescrub if errors found"
1466 planned
.deep_scrub_on_error
= true;
1471 dout(20) << __func__
<< " updated flags: " << planned
1472 << " allow_regular_scrub: " << allow_regular_scrub
<< dendl
;
1474 // NOSCRUB so skip regular scrubs
1475 if (!allow_regular_scrub
&& !planned
.time_for_deep
) {
1482 std::optional
<requested_scrub_t
> PG::verify_scrub_mode() const
1484 dout(10) << __func__
<< " processing pg " << info
.pgid
<< dendl
;
1486 bool allow_deep_scrub
= !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB
) ||
1487 pool
.info
.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB
));
1488 bool allow_regular_scrub
= !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB
) ||
1489 pool
.info
.has_flag(pg_pool_t::FLAG_NOSCRUB
));
1490 bool has_deep_errors
= (info
.stats
.stats
.sum
.num_deep_scrub_errors
> 0);
1491 bool try_to_auto_repair
=
1492 (cct
->_conf
->osd_scrub_auto_repair
&& get_pgbackend()->auto_repair_supported());
1494 auto upd_flags
= m_planned_scrub
;
1496 upd_flags
.time_for_deep
= false;
1497 // Clear these in case user issues the scrub/repair command during
1498 // the scheduling of the scrub/repair (e.g. request reservation)
1499 upd_flags
.deep_scrub_on_error
= false;
1500 upd_flags
.auto_repair
= false;
1502 if (upd_flags
.must_scrub
&& !upd_flags
.must_deep_scrub
&& has_deep_errors
) {
1503 osd
->clog
->error() << "osd." << osd
->whoami
<< " pg " << info
.pgid
1504 << " Regular scrub request, deep-scrub details will be lost";
1507 if (!upd_flags
.must_scrub
) {
1508 // All periodic scrub handling goes here because must_scrub is
1509 // always set for must_deep_scrub and must_repair.
1511 bool can_start_periodic
=
1512 verify_periodic_scrub_mode(allow_deep_scrub
, try_to_auto_repair
,
1513 allow_regular_scrub
, has_deep_errors
, upd_flags
);
1514 if (!can_start_periodic
) {
1515 return std::nullopt
;
1519 // scrubbing while recovering?
1521 bool prevented_by_recovery
=
1522 osd
->is_recovery_active() && !cct
->_conf
->osd_scrub_during_recovery
&&
1523 (!cct
->_conf
->osd_repair_during_recovery
|| !upd_flags
.must_repair
);
1525 if (prevented_by_recovery
) {
1526 dout(20) << __func__
<< ": scrubbing prevented during recovery" << dendl
;
1527 return std::nullopt
;
1530 upd_flags
.need_auto
= false;
1534 void PG::reg_next_scrub()
1536 m_scrubber
->reg_next_scrub(m_planned_scrub
);
1539 void PG::on_info_history_change()
1542 m_scrubber
->unreg_next_scrub();
1543 m_scrubber
->reg_next_scrub(m_planned_scrub
);
1547 void PG::scrub_requested(scrub_level_t scrub_level
, scrub_type_t scrub_type
)
1549 m_scrubber
->scrub_requested(scrub_level
, scrub_type
, m_planned_scrub
);
1552 void PG::clear_ready_to_merge() {
1553 osd
->clear_ready_to_merge(this);
1556 void PG::queue_want_pg_temp(const vector
<int> &wanted
) {
1557 osd
->queue_want_pg_temp(get_pgid().pgid
, wanted
);
1560 void PG::clear_want_pg_temp() {
1561 osd
->remove_want_pg_temp(get_pgid().pgid
);
1564 void PG::on_role_change() {
1565 requeue_ops(waiting_for_peered
);
1566 plpg_on_role_change();
1569 void PG::on_new_interval() {
1570 dout(20) << __func__
<< " scrub_queued was " << scrub_queued
<< " flags: " << m_planned_scrub
<< dendl
;
1571 scrub_queued
= false;
1572 projected_last_update
= eversion_t();
1576 epoch_t
PG::oldest_stored_osdmap() {
1577 return osd
->get_superblock().oldest_map
;
1580 OstreamTemp
PG::get_clog_info() {
1581 return osd
->clog
->info();
1584 OstreamTemp
PG::get_clog_debug() {
1585 return osd
->clog
->debug();
1588 OstreamTemp
PG::get_clog_error() {
1589 return osd
->clog
->error();
1592 void PG::schedule_event_after(
1593 PGPeeringEventRef event
,
1595 std::lock_guard
lock(osd
->recovery_request_lock
);
1596 osd
->recovery_request_timer
.add_event_after(
1598 new QueuePeeringEvt(
1603 void PG::request_local_background_io_reservation(
1605 PGPeeringEventURef on_grant
,
1606 PGPeeringEventURef on_preempt
) {
1607 osd
->local_reserver
.request_reservation(
1609 on_grant
? new QueuePeeringEvt(
1610 this, std::move(on_grant
)) : nullptr,
1612 on_preempt
? new QueuePeeringEvt(
1613 this, std::move(on_preempt
)) : nullptr);
1616 void PG::update_local_background_io_priority(
1617 unsigned priority
) {
1618 osd
->local_reserver
.update_priority(
1623 void PG::cancel_local_background_io_reservation() {
1624 osd
->local_reserver
.cancel_reservation(
1628 void PG::request_remote_recovery_reservation(
1630 PGPeeringEventURef on_grant
,
1631 PGPeeringEventURef on_preempt
) {
1632 osd
->remote_reserver
.request_reservation(
1634 on_grant
? new QueuePeeringEvt(
1635 this, std::move(on_grant
)) : nullptr,
1637 on_preempt
? new QueuePeeringEvt(
1638 this, std::move(on_preempt
)) : nullptr);
1641 void PG::cancel_remote_recovery_reservation() {
1642 osd
->remote_reserver
.cancel_reservation(
1646 void PG::schedule_event_on_commit(
1647 ObjectStore::Transaction
&t
,
1648 PGPeeringEventRef on_commit
)
1650 t
.register_on_commit(new QueuePeeringEvt(this, on_commit
));
1653 void PG::on_activate(interval_set
<snapid_t
> snaps
)
1655 ceph_assert(!m_scrubber
->are_callbacks_pending());
1656 ceph_assert(callbacks_for_degraded_object
.empty());
1658 release_pg_backoffs();
1659 projected_last_update
= info
.last_update
;
1662 void PG::on_active_exit()
1664 backfill_reserving
= false;
1668 void PG::on_active_advmap(const OSDMapRef
&osdmap
)
1670 const auto& new_removed_snaps
= osdmap
->get_new_removed_snaps();
1671 auto i
= new_removed_snaps
.find(get_pgid().pool());
1672 if (i
!= new_removed_snaps
.end()) {
1674 for (auto j
: i
->second
) {
1675 if (snap_trimq
.intersects(j
.first
, j
.second
)) {
1676 decltype(snap_trimq
) added
, overlap
;
1677 added
.insert(j
.first
, j
.second
);
1678 overlap
.intersection_of(snap_trimq
, added
);
1679 derr
<< __func__
<< " removed_snaps already contains "
1680 << overlap
<< dendl
;
1682 snap_trimq
.union_of(added
);
1684 snap_trimq
.insert(j
.first
, j
.second
);
1687 dout(10) << __func__
<< " new removed_snaps " << i
->second
1688 << ", snap_trimq now " << snap_trimq
<< dendl
;
1689 ceph_assert(!bad
|| !cct
->_conf
->osd_debug_verify_cached_snaps
);
1692 const auto& new_purged_snaps
= osdmap
->get_new_purged_snaps();
1693 auto j
= new_purged_snaps
.find(get_pgid().pgid
.pool());
1694 if (j
!= new_purged_snaps
.end()) {
1696 for (auto k
: j
->second
) {
1697 if (!recovery_state
.get_info().purged_snaps
.contains(k
.first
, k
.second
)) {
1698 interval_set
<snapid_t
> rm
, overlap
;
1699 rm
.insert(k
.first
, k
.second
);
1700 overlap
.intersection_of(recovery_state
.get_info().purged_snaps
, rm
);
1701 derr
<< __func__
<< " purged_snaps does not contain "
1702 << rm
<< ", only " << overlap
<< dendl
;
1703 recovery_state
.adjust_purged_snaps(
1704 [&overlap
](auto &purged_snaps
) {
1705 purged_snaps
.subtract(overlap
);
1707 // This can currently happen in the normal (if unlikely) course of
1708 // events. Because adding snaps to purged_snaps does not increase
1709 // the pg version or add a pg log entry, we don't reliably propagate
1710 // purged_snaps additions to other OSDs.
1713 // - primary and replicas update purged_snaps
1714 // - no object updates
1715 // - pg mapping changes, new primary on different node
1716 // - new primary pg version == eversion_t(), so info is not
1720 recovery_state
.adjust_purged_snaps(
1721 [&k
](auto &purged_snaps
) {
1722 purged_snaps
.erase(k
.first
, k
.second
);
1726 dout(10) << __func__
<< " new purged_snaps " << j
->second
1727 << ", now " << recovery_state
.get_info().purged_snaps
<< dendl
;
1728 ceph_assert(!bad
|| !cct
->_conf
->osd_debug_verify_cached_snaps
);
1732 void PG::queue_snap_retrim(snapid_t snap
)
1736 dout(10) << __func__
<< " snap " << snap
<< " - not active and primary"
1740 if (!snap_trimq
.contains(snap
)) {
1741 snap_trimq
.insert(snap
);
1742 snap_trimq_repeat
.insert(snap
);
1743 dout(20) << __func__
<< " snap " << snap
1744 << ", trimq now " << snap_trimq
1745 << ", repeat " << snap_trimq_repeat
<< dendl
;
1748 dout(20) << __func__
<< " snap " << snap
1749 << " already in trimq " << snap_trimq
<< dendl
;
1753 void PG::on_active_actmap()
1755 if (cct
->_conf
->osd_check_for_log_corruption
)
1756 check_log_for_corruption(osd
->store
);
1759 if (recovery_state
.is_active()) {
1760 dout(10) << "Active: kicking snap trim" << dendl
;
1764 if (recovery_state
.is_peered() &&
1765 !recovery_state
.is_clean() &&
1766 !recovery_state
.get_osdmap()->test_flag(CEPH_OSDMAP_NOBACKFILL
) &&
1767 (!recovery_state
.get_osdmap()->test_flag(CEPH_OSDMAP_NOREBALANCE
) ||
1768 recovery_state
.is_degraded())) {
1773 void PG::on_backfill_reserved()
1775 backfill_reserving
= false;
1779 void PG::on_backfill_canceled()
1781 if (!waiting_on_backfill
.empty()) {
1782 waiting_on_backfill
.clear();
1783 finish_recovery_op(hobject_t::get_max());
1787 void PG::on_recovery_reserved()
1792 void PG::set_not_ready_to_merge_target(pg_t pgid
, pg_t src
)
1794 osd
->set_not_ready_to_merge_target(pgid
, src
);
1797 void PG::set_not_ready_to_merge_source(pg_t pgid
)
1799 osd
->set_not_ready_to_merge_source(pgid
);
1802 void PG::set_ready_to_merge_target(eversion_t lu
, epoch_t les
, epoch_t lec
)
1804 osd
->set_ready_to_merge_target(this, lu
, les
, lec
);
1807 void PG::set_ready_to_merge_source(eversion_t lu
)
1809 osd
->set_ready_to_merge_source(this, lu
);
1812 void PG::send_pg_created(pg_t pgid
)
1814 osd
->send_pg_created(pgid
);
1817 ceph::signedspan
PG::get_mnow()
1819 return osd
->get_mnow();
1822 HeartbeatStampsRef
PG::get_hb_stamps(int peer
)
1824 return osd
->get_hb_stamps(peer
);
1827 void PG::schedule_renew_lease(epoch_t lpr
, ceph::timespan delay
)
1829 auto spgid
= info
.pgid
;
1831 osd
->mono_timer
.add_event(
1834 o
->queue_renew_lease(lpr
, spgid
);
1838 void PG::queue_check_readable(epoch_t lpr
, ceph::timespan delay
)
1840 osd
->queue_check_readable(info
.pgid
, lpr
, delay
);
1843 void PG::rebuild_missing_set_with_deletes(PGLog
&pglog
)
1845 pglog
.rebuild_missing_set_with_deletes(
1848 recovery_state
.get_info());
1851 void PG::on_activate_committed()
1853 if (!is_primary()) {
1855 if (recovery_state
.needs_flush() == 0) {
1856 requeue_ops(waiting_for_peered
);
1857 } else if (!waiting_for_peered
.empty()) {
1858 dout(10) << __func__
<< " flushes in progress, moving "
1859 << waiting_for_peered
.size() << " items to waiting_for_flush"
1861 ceph_assert(waiting_for_flush
.empty());
1862 waiting_for_flush
.swap(waiting_for_peered
);
1867 // Compute pending backfill data
1868 static int64_t pending_backfill(CephContext
*cct
, int64_t bf_bytes
, int64_t local_bytes
)
1870 lgeneric_dout(cct
, 20) << __func__
<< " Adjust local usage "
1871 << (local_bytes
>> 10) << "KiB"
1872 << " primary usage " << (bf_bytes
>> 10)
1875 return std::max((int64_t)0, bf_bytes
- local_bytes
);
1879 // We can zero the value of primary num_bytes as just an atomic.
1880 // However, setting above zero reserves space for backfill and requires
1881 // the OSDService::stat_lock which protects all OSD usage
1882 bool PG::try_reserve_recovery_space(
1883 int64_t primary_bytes
, int64_t local_bytes
) {
1884 // Use tentative_bacfill_full() to make sure enough
1885 // space is available to handle target bytes from primary.
1887 // TODO: If we passed num_objects from primary we could account for
1888 // an estimate of the metadata overhead.
1890 // TODO: If we had compressed_allocated and compressed_original from primary
1891 // we could compute compression ratio and adjust accordingly.
1893 // XXX: There is no way to get omap overhead and this would only apply
1894 // to whatever possibly different partition that is storing the database.
1896 // update_osd_stat() from heartbeat will do this on a new
1897 // statfs using ps->primary_bytes.
1898 uint64_t pending_adjustment
= 0;
1899 if (primary_bytes
) {
1900 // For erasure coded pool overestimate by a full stripe per object
1901 // because we don't know how each objected rounded to the nearest stripe
1902 if (pool
.info
.is_erasure()) {
1903 primary_bytes
/= (int)get_pgbackend()->get_ec_data_chunk_count();
1904 primary_bytes
+= get_pgbackend()->get_ec_stripe_chunk_size() *
1905 info
.stats
.stats
.sum
.num_objects
;
1906 local_bytes
/= (int)get_pgbackend()->get_ec_data_chunk_count();
1907 local_bytes
+= get_pgbackend()->get_ec_stripe_chunk_size() *
1908 info
.stats
.stats
.sum
.num_objects
;
1910 pending_adjustment
= pending_backfill(
1914 dout(10) << __func__
<< " primary_bytes " << (primary_bytes
>> 10)
1916 << " local " << (local_bytes
>> 10) << "KiB"
1917 << " pending_adjustments " << (pending_adjustment
>> 10) << "KiB"
1921 // This lock protects not only the stats OSDService but also setting the
1922 // pg primary_bytes. That's why we don't immediately unlock
1923 std::lock_guard l
{osd
->stat_lock
};
1924 osd_stat_t cur_stat
= osd
->osd_stat
;
1925 if (cct
->_conf
->osd_debug_reject_backfill_probability
> 0 &&
1926 (rand()%1000 < (cct
->_conf
->osd_debug_reject_backfill_probability
*1000.0))) {
1927 dout(10) << "backfill reservation rejected: failure injection"
1930 } else if (!cct
->_conf
->osd_debug_skip_full_check_in_backfill_reservation
&&
1931 osd
->tentative_backfill_full(this, pending_adjustment
, cur_stat
)) {
1932 dout(10) << "backfill reservation rejected: backfill full"
1936 // Don't reserve space if skipped reservation check, this is used
1937 // to test the other backfill full check AND in case a corruption
1938 // of num_bytes requires ignoring that value and trying the
1940 if (primary_bytes
&&
1941 !cct
->_conf
->osd_debug_skip_full_check_in_backfill_reservation
) {
1942 primary_num_bytes
.store(primary_bytes
);
1943 local_num_bytes
.store(local_bytes
);
1945 unreserve_recovery_space();
1951 void PG::unreserve_recovery_space() {
1952 primary_num_bytes
.store(0);
1953 local_num_bytes
.store(0);
1956 void PG::_scan_rollback_obs(const vector
<ghobject_t
> &rollback_obs
)
1958 ObjectStore::Transaction t
;
1959 eversion_t trimmed_to
= recovery_state
.get_last_rollback_info_trimmed_to_applied();
1960 for (vector
<ghobject_t
>::const_iterator i
= rollback_obs
.begin();
1961 i
!= rollback_obs
.end();
1963 if (i
->generation
< trimmed_to
.version
) {
1964 dout(10) << __func__
<< "osd." << osd
->whoami
1965 << " pg " << info
.pgid
1966 << " found obsolete rollback obj "
1967 << *i
<< " generation < trimmed_to "
1969 << "...repaired" << dendl
;
1974 derr
<< __func__
<< ": queueing trans to clean up obsolete rollback objs"
1976 osd
->store
->queue_transaction(ch
, std::move(t
), NULL
);
1981 void PG::_repair_oinfo_oid(ScrubMap
&smap
)
1983 for (map
<hobject_t
, ScrubMap::object
>::reverse_iterator i
= smap
.objects
.rbegin();
1984 i
!= smap
.objects
.rend();
1986 const hobject_t
&hoid
= i
->first
;
1987 ScrubMap::object
&o
= i
->second
;
1990 if (o
.attrs
.find(OI_ATTR
) == o
.attrs
.end()) {
1993 bl
.push_back(o
.attrs
[OI_ATTR
]);
2000 if (oi
.soid
!= hoid
) {
2001 ObjectStore::Transaction t
;
2002 OSDriver::OSTransaction
_t(osdriver
.get_transaction(&t
));
2003 osd
->clog
->error() << "osd." << osd
->whoami
2004 << " found object info error on pg "
2006 << " oid " << hoid
<< " oid in object info: "
2012 encode(oi
, bl
, get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD
, nullptr));
2014 bufferptr
bp(bl
.c_str(), bl
.length());
2015 o
.attrs
[OI_ATTR
] = bp
;
2017 t
.setattr(coll
, ghobject_t(hoid
), OI_ATTR
, bl
);
2018 int r
= osd
->store
->queue_transaction(ch
, std::move(t
));
2020 derr
<< __func__
<< ": queue_transaction got " << cpp_strerror(r
)
2027 void PG::repair_object(
2028 const hobject_t
&soid
,
2029 const list
<pair
<ScrubMap::object
, pg_shard_t
> > &ok_peers
,
2030 const set
<pg_shard_t
> &bad_peers
)
2032 set
<pg_shard_t
> ok_shards
;
2033 for (auto &&peer
: ok_peers
) ok_shards
.insert(peer
.second
);
2035 dout(10) << "repair_object " << soid
2036 << " bad_peers osd.{" << bad_peers
<< "},"
2037 << " ok_peers osd.{" << ok_shards
<< "}" << dendl
;
2039 const ScrubMap::object
&po
= ok_peers
.back().first
;
2044 if (po
.attrs
.count(OI_ATTR
)) {
2045 bv
.push_back(po
.attrs
.find(OI_ATTR
)->second
);
2047 auto bliter
= bv
.cbegin();
2050 dout(0) << __func__
<< ": Need version of replica, bad object_info_t: "
2055 if (bad_peers
.count(get_primary())) {
2056 // We should only be scrubbing if the PG is clean.
2057 ceph_assert(waiting_for_unreadable_object
.empty());
2058 dout(10) << __func__
<< ": primary = " << get_primary() << dendl
;
2061 /* No need to pass ok_peers, they must not be missing the object, so
2062 * force_object_missing will add them to missing_loc anyway */
2063 recovery_state
.force_object_missing(bad_peers
, soid
, oi
.version
);
2066 void PG::forward_scrub_event(ScrubAPI fn
, epoch_t epoch_queued
)
2068 dout(20) << __func__
<< " queued at: " << epoch_queued
<< dendl
;
2069 if (is_active() && m_scrubber
) {
2070 ((*m_scrubber
).*fn
)(epoch_queued
);
2072 // pg might be in the process of being deleted
2073 dout(5) << __func__
<< " refusing to forward. " << (is_clean() ? "(clean) " : "(not clean) ") <<
2074 (is_active() ? "(active) " : "(not active) ") << dendl
;
2078 void PG::replica_scrub(OpRequestRef op
, ThreadPool::TPHandle
& handle
)
2080 dout(10) << __func__
<< " (op)" << dendl
;
2082 m_scrubber
->replica_scrub_op(op
);
2085 void PG::scrub(epoch_t epoch_queued
, ThreadPool::TPHandle
& handle
)
2087 dout(10) << __func__
<< " queued at: " << epoch_queued
<< dendl
;
2089 scrub_queued
= false;
2090 forward_scrub_event(&ScrubPgIF::initiate_regular_scrub
, epoch_queued
);
2093 // note: no need to secure OSD resources for a recovery scrub
2094 void PG::recovery_scrub(epoch_t epoch_queued
,
2095 [[maybe_unused
]] ThreadPool::TPHandle
& handle
)
2097 dout(10) << __func__
<< " queued at: " << epoch_queued
<< dendl
;
2099 scrub_queued
= false;
2100 forward_scrub_event(&ScrubPgIF::initiate_scrub_after_repair
, epoch_queued
);
2103 void PG::replica_scrub(epoch_t epoch_queued
,
2104 [[maybe_unused
]] ThreadPool::TPHandle
& handle
)
2106 dout(10) << __func__
<< " queued at: " << epoch_queued
2107 << (is_primary() ? " (primary)" : " (replica)") << dendl
;
2108 scrub_queued
= false;
2109 forward_scrub_event(&ScrubPgIF::send_start_replica
, epoch_queued
);
2112 void PG::scrub_send_scrub_resched(epoch_t epoch_queued
,
2113 [[maybe_unused
]] ThreadPool::TPHandle
& handle
)
2115 dout(10) << __func__
<< " queued at: " << epoch_queued
<< dendl
;
2116 scrub_queued
= false;
2117 forward_scrub_event(&ScrubPgIF::send_scrub_resched
, epoch_queued
);
2120 void PG::scrub_send_resources_granted(epoch_t epoch_queued
,
2121 [[maybe_unused
]] ThreadPool::TPHandle
& handle
)
2123 dout(10) << __func__
<< " queued at: " << epoch_queued
<< dendl
;
2124 forward_scrub_event(&ScrubPgIF::send_remotes_reserved
, epoch_queued
);
2127 void PG::scrub_send_resources_denied(epoch_t epoch_queued
,
2128 [[maybe_unused
]] ThreadPool::TPHandle
& handle
)
2130 dout(10) << __func__
<< " queued at: " << epoch_queued
<< dendl
;
2131 forward_scrub_event(&ScrubPgIF::send_reservation_failure
, epoch_queued
);
2134 void PG::replica_scrub_resched(epoch_t epoch_queued
,
2135 [[maybe_unused
]] ThreadPool::TPHandle
& handle
)
2137 dout(10) << __func__
<< " queued at: " << epoch_queued
<< dendl
;
2138 scrub_queued
= false;
2139 forward_scrub_event(&ScrubPgIF::send_sched_replica
, epoch_queued
);
2142 void PG::scrub_send_pushes_update(epoch_t epoch_queued
,
2143 [[maybe_unused
]] ThreadPool::TPHandle
& handle
)
2145 dout(10) << __func__
<< " queued at: " << epoch_queued
<< dendl
;
2146 forward_scrub_event(&ScrubPgIF::active_pushes_notification
, epoch_queued
);
2149 void PG::scrub_send_replica_pushes(epoch_t epoch_queued
,
2150 [[maybe_unused
]] ThreadPool::TPHandle
& handle
)
2152 dout(15) << __func__
<< " queued at: " << epoch_queued
<< dendl
;
2153 forward_scrub_event(&ScrubPgIF::send_replica_pushes_upd
, epoch_queued
);
2156 void PG::scrub_send_applied_update(epoch_t epoch_queued
,
2157 [[maybe_unused
]] ThreadPool::TPHandle
& handle
)
2159 dout(15) << __func__
<< " queued at: " << epoch_queued
<< dendl
;
2160 forward_scrub_event(&ScrubPgIF::update_applied_notification
, epoch_queued
);
2163 void PG::scrub_send_unblocking(epoch_t epoch_queued
,
2164 [[maybe_unused
]] ThreadPool::TPHandle
& handle
)
2166 dout(15) << __func__
<< " queued at: " << epoch_queued
<< dendl
;
2167 forward_scrub_event(&ScrubPgIF::send_scrub_unblock
, epoch_queued
);
2170 void PG::scrub_send_digest_update(epoch_t epoch_queued
,
2171 [[maybe_unused
]] ThreadPool::TPHandle
& handle
)
2173 dout(15) << __func__
<< " queued at: " << epoch_queued
<< dendl
;
2174 forward_scrub_event(&ScrubPgIF::digest_update_notification
, epoch_queued
);
2177 void PG::scrub_send_replmaps_ready(epoch_t epoch_queued
,
2178 [[maybe_unused
]] ThreadPool::TPHandle
& handle
)
2180 dout(15) << __func__
<< " queued at: " << epoch_queued
<< dendl
;
2181 forward_scrub_event(&ScrubPgIF::send_replica_maps_ready
, epoch_queued
);
2184 bool PG::ops_blocked_by_scrub() const
2186 return !waiting_for_scrub
.empty();
2189 Scrub::scrub_prio_t
PG::is_scrub_blocking_ops() const
2191 return waiting_for_scrub
.empty() ? Scrub::scrub_prio_t::low_priority
2192 : Scrub::scrub_prio_t::high_priority
;
2195 bool PG::old_peering_msg(epoch_t reply_epoch
, epoch_t query_epoch
)
2197 if (auto last_reset
= get_last_peering_reset();
2198 last_reset
> reply_epoch
|| last_reset
> query_epoch
) {
2199 dout(10) << "old_peering_msg reply_epoch " << reply_epoch
<< " query_epoch "
2200 << query_epoch
<< " last_peering_reset " << last_reset
<< dendl
;
2209 FlushState(PG
*pg
, epoch_t epoch
) : pg(pg
), epoch(epoch
) {}
2211 std::scoped_lock l
{*pg
};
2212 if (!pg
->pg_has_reset_since(epoch
)) {
2213 pg
->recovery_state
.complete_flush();
2217 typedef std::shared_ptr
<FlushState
> FlushStateRef
;
2219 void PG::start_flush_on_transaction(ObjectStore::Transaction
&t
)
2221 // flush in progress ops
2222 FlushStateRef
flush_trigger (std::make_shared
<FlushState
>(
2223 this, get_osdmap_epoch()));
2224 t
.register_on_applied(new ContainerContext
<FlushStateRef
>(flush_trigger
));
2225 t
.register_on_commit(new ContainerContext
<FlushStateRef
>(flush_trigger
));
2228 bool PG::try_flush_or_schedule_async()
2230 Context
*c
= new QueuePeeringEvt(
2231 this, get_osdmap_epoch(), PeeringState::IntervalFlush());
2232 if (!ch
->flush_commit(c
)) {
2240 ostream
& operator<<(ostream
& out
, const PG
& pg
)
2242 out
<< pg
.recovery_state
;
2244 // listing all scrub-related flags - both current and "planned next scrub"
2245 if (pg
.is_scrubbing()) {
2246 out
<< *pg
.m_scrubber
;
2248 out
<< pg
.m_planned_scrub
;
2250 if (pg
.recovery_ops_active
)
2251 out
<< " rops=" << pg
.recovery_ops_active
;
2253 //out << " (" << pg.pg_log.get_tail() << "," << pg.pg_log.get_head() << "]";
2254 if (pg
.recovery_state
.have_missing()) {
2255 out
<< " m=" << pg
.recovery_state
.get_num_missing();
2256 if (pg
.is_primary()) {
2257 uint64_t unfound
= pg
.recovery_state
.get_num_unfound();
2259 out
<< " u=" << unfound
;
2262 if (!pg
.is_clean()) {
2263 out
<< " mbc=" << pg
.recovery_state
.get_missing_by_count();
2265 if (!pg
.snap_trimq
.empty()) {
2267 // only show a count if the set is large
2268 if (pg
.snap_trimq
.num_intervals() > 16) {
2269 out
<< pg
.snap_trimq
.size();
2270 if (!pg
.snap_trimq_repeat
.empty()) {
2271 out
<< "(" << pg
.snap_trimq_repeat
.size() << ")";
2274 out
<< pg
.snap_trimq
;
2275 if (!pg
.snap_trimq_repeat
.empty()) {
2276 out
<< "(" << pg
.snap_trimq_repeat
<< ")";
2280 if (!pg
.recovery_state
.get_info().purged_snaps
.empty()) {
2281 out
<< " ps="; // snap trim queue / purged snaps
2282 if (pg
.recovery_state
.get_info().purged_snaps
.num_intervals() > 16) {
2283 out
<< pg
.recovery_state
.get_info().purged_snaps
.size();
2285 out
<< pg
.recovery_state
.get_info().purged_snaps
;
2293 bool PG::can_discard_op(OpRequestRef
& op
)
2295 auto m
= op
->get_req
<MOSDOp
>();
2296 if (cct
->_conf
->osd_discard_disconnected_ops
&& OSD::op_is_discardable(m
)) {
2297 dout(20) << " discard " << *m
<< dendl
;
2301 if (m
->get_map_epoch() < info
.history
.same_primary_since
) {
2302 dout(7) << " changed after " << m
->get_map_epoch()
2303 << ", dropping " << *m
<< dendl
;
2307 if ((m
->get_flags() & (CEPH_OSD_FLAG_BALANCE_READS
|
2308 CEPH_OSD_FLAG_LOCALIZE_READS
)) &&
2310 m
->get_map_epoch() < info
.history
.same_interval_since
) {
2311 // Note: the Objecter will resend on interval change without the primary
2312 // changing if it actually sent to a replica. If the primary hasn't
2313 // changed since the send epoch, we got it, and we're primary, it won't
2314 // have resent even if the interval did change as it sent it to the primary
2320 if (m
->get_connection()->has_feature(CEPH_FEATURE_RESEND_ON_SPLIT
)) {
2321 // >= luminous client
2322 if (m
->get_connection()->has_feature(CEPH_FEATURE_SERVER_NAUTILUS
)) {
2323 // >= nautilus client
2324 if (m
->get_map_epoch() < pool
.info
.get_last_force_op_resend()) {
2325 dout(7) << __func__
<< " sent before last_force_op_resend "
2326 << pool
.info
.last_force_op_resend
2327 << ", dropping" << *m
<< dendl
;
2331 // == < nautilus client (luminous or mimic)
2332 if (m
->get_map_epoch() < pool
.info
.get_last_force_op_resend_prenautilus()) {
2333 dout(7) << __func__
<< " sent before last_force_op_resend_prenautilus "
2334 << pool
.info
.last_force_op_resend_prenautilus
2335 << ", dropping" << *m
<< dendl
;
2339 if (m
->get_map_epoch() < info
.history
.last_epoch_split
) {
2340 dout(7) << __func__
<< " pg split in "
2341 << info
.history
.last_epoch_split
<< ", dropping" << dendl
;
2344 } else if (m
->get_connection()->has_feature(CEPH_FEATURE_OSD_POOLRESEND
)) {
2345 // < luminous client
2346 if (m
->get_map_epoch() < pool
.info
.get_last_force_op_resend_preluminous()) {
2347 dout(7) << __func__
<< " sent before last_force_op_resend_preluminous "
2348 << pool
.info
.last_force_op_resend_preluminous
2349 << ", dropping" << *m
<< dendl
;
2357 template<typename T
, int MSGTYPE
>
2358 bool PG::can_discard_replica_op(OpRequestRef
& op
)
2360 auto m
= op
->get_req
<T
>();
2361 ceph_assert(m
->get_type() == MSGTYPE
);
2363 int from
= m
->get_source().num();
2365 // if a repop is replied after a replica goes down in a new osdmap, and
2366 // before the pg advances to this new osdmap, the repop replies before this
2367 // repop can be discarded by that replica OSD, because the primary resets the
2368 // connection to it when handling the new osdmap marking it down, and also
2369 // resets the messenger sesssion when the replica reconnects. to avoid the
2370 // out-of-order replies, the messages from that replica should be discarded.
2371 OSDMapRef next_map
= osd
->get_next_osdmap();
2372 if (next_map
->is_down(from
)) {
2373 dout(20) << " " << __func__
<< " dead for nextmap is down " << from
<< dendl
;
2376 /* Mostly, this overlaps with the old_peering_msg
2377 * condition. An important exception is pushes
2378 * sent by replicas not in the acting set, since
2379 * if such a replica goes down it does not cause
2380 * a new interval. */
2381 if (next_map
->get_down_at(from
) >= m
->map_epoch
) {
2382 dout(20) << " " << __func__
<< " dead for 'get_down_at' " << from
<< dendl
;
2387 // if pg changes _at all_, we reset and repeer!
2388 if (old_peering_msg(m
->map_epoch
, m
->map_epoch
)) {
2389 dout(10) << "can_discard_replica_op pg changed " << info
.history
2390 << " after " << m
->map_epoch
2391 << ", dropping" << dendl
;
2397 bool PG::can_discard_scan(OpRequestRef op
)
2399 auto m
= op
->get_req
<MOSDPGScan
>();
2400 ceph_assert(m
->get_type() == MSG_OSD_PG_SCAN
);
2402 if (old_peering_msg(m
->map_epoch
, m
->query_epoch
)) {
2403 dout(10) << " got old scan, ignoring" << dendl
;
2409 bool PG::can_discard_backfill(OpRequestRef op
)
2411 auto m
= op
->get_req
<MOSDPGBackfill
>();
2412 ceph_assert(m
->get_type() == MSG_OSD_PG_BACKFILL
);
2414 if (old_peering_msg(m
->map_epoch
, m
->query_epoch
)) {
2415 dout(10) << " got old backfill, ignoring" << dendl
;
2423 bool PG::can_discard_request(OpRequestRef
& op
)
2425 switch (op
->get_req()->get_type()) {
2426 case CEPH_MSG_OSD_OP
:
2427 return can_discard_op(op
);
2428 case CEPH_MSG_OSD_BACKOFF
:
2429 return false; // never discard
2431 return can_discard_replica_op
<MOSDRepOp
, MSG_OSD_REPOP
>(op
);
2432 case MSG_OSD_PG_PUSH
:
2433 return can_discard_replica_op
<MOSDPGPush
, MSG_OSD_PG_PUSH
>(op
);
2434 case MSG_OSD_PG_PULL
:
2435 return can_discard_replica_op
<MOSDPGPull
, MSG_OSD_PG_PULL
>(op
);
2436 case MSG_OSD_PG_PUSH_REPLY
:
2437 return can_discard_replica_op
<MOSDPGPushReply
, MSG_OSD_PG_PUSH_REPLY
>(op
);
2438 case MSG_OSD_REPOPREPLY
:
2439 return can_discard_replica_op
<MOSDRepOpReply
, MSG_OSD_REPOPREPLY
>(op
);
2440 case MSG_OSD_PG_RECOVERY_DELETE
:
2441 return can_discard_replica_op
<MOSDPGRecoveryDelete
, MSG_OSD_PG_RECOVERY_DELETE
>(op
);
2443 case MSG_OSD_PG_RECOVERY_DELETE_REPLY
:
2444 return can_discard_replica_op
<MOSDPGRecoveryDeleteReply
, MSG_OSD_PG_RECOVERY_DELETE_REPLY
>(op
);
2446 case MSG_OSD_EC_WRITE
:
2447 return can_discard_replica_op
<MOSDECSubOpWrite
, MSG_OSD_EC_WRITE
>(op
);
2448 case MSG_OSD_EC_WRITE_REPLY
:
2449 return can_discard_replica_op
<MOSDECSubOpWriteReply
, MSG_OSD_EC_WRITE_REPLY
>(op
);
2450 case MSG_OSD_EC_READ
:
2451 return can_discard_replica_op
<MOSDECSubOpRead
, MSG_OSD_EC_READ
>(op
);
2452 case MSG_OSD_EC_READ_REPLY
:
2453 return can_discard_replica_op
<MOSDECSubOpReadReply
, MSG_OSD_EC_READ_REPLY
>(op
);
2454 case MSG_OSD_REP_SCRUB
:
2455 return can_discard_replica_op
<MOSDRepScrub
, MSG_OSD_REP_SCRUB
>(op
);
2456 case MSG_OSD_SCRUB_RESERVE
:
2457 return can_discard_replica_op
<MOSDScrubReserve
, MSG_OSD_SCRUB_RESERVE
>(op
);
2458 case MSG_OSD_REP_SCRUBMAP
:
2459 return can_discard_replica_op
<MOSDRepScrubMap
, MSG_OSD_REP_SCRUBMAP
>(op
);
2460 case MSG_OSD_PG_UPDATE_LOG_MISSING
:
2461 return can_discard_replica_op
<
2462 MOSDPGUpdateLogMissing
, MSG_OSD_PG_UPDATE_LOG_MISSING
>(op
);
2463 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY
:
2464 return can_discard_replica_op
<
2465 MOSDPGUpdateLogMissingReply
, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY
>(op
);
2467 case MSG_OSD_PG_SCAN
:
2468 return can_discard_scan(op
);
2469 case MSG_OSD_PG_BACKFILL
:
2470 return can_discard_backfill(op
);
2471 case MSG_OSD_PG_BACKFILL_REMOVE
:
2472 return can_discard_replica_op
<MOSDPGBackfillRemove
,
2473 MSG_OSD_PG_BACKFILL_REMOVE
>(op
);
2478 void PG::do_peering_event(PGPeeringEventRef evt
, PeeringCtx
&rctx
)
2480 dout(10) << __func__
<< ": " << evt
->get_desc() << dendl
;
2481 ceph_assert(have_same_or_newer_map(evt
->get_epoch_sent()));
2482 if (old_peering_evt(evt
)) {
2483 dout(10) << "discard old " << evt
->get_desc() << dendl
;
2485 recovery_state
.handle_event(evt
, &rctx
);
2487 // write_if_dirty regardless of path above to ensure we capture any work
2488 // done by OSD::advance_pg().
2489 write_if_dirty(rctx
.transaction
);
2492 void PG::queue_peering_event(PGPeeringEventRef evt
)
2494 if (old_peering_evt(evt
))
2496 osd
->osd
->enqueue_peering_evt(info
.pgid
, evt
);
2499 void PG::queue_null(epoch_t msg_epoch
,
2500 epoch_t query_epoch
)
2502 dout(10) << "null" << dendl
;
2503 queue_peering_event(
2504 PGPeeringEventRef(std::make_shared
<PGPeeringEvent
>(msg_epoch
, query_epoch
,
2508 void PG::find_unfound(epoch_t queued
, PeeringCtx
&rctx
)
2511 * if we couldn't start any recovery ops and things are still
2512 * unfound, see if we can discover more missing object locations.
2513 * It may be that our initial locations were bad and we errored
2514 * out while trying to pull.
2516 if (!recovery_state
.discover_all_missing(rctx
)) {
2518 if (state_test(PG_STATE_BACKFILLING
)) {
2519 auto evt
= PGPeeringEventRef(
2523 PeeringState::UnfoundBackfill()));
2524 queue_peering_event(evt
);
2525 action
= "in backfill";
2526 } else if (state_test(PG_STATE_RECOVERING
)) {
2527 auto evt
= PGPeeringEventRef(
2531 PeeringState::UnfoundRecovery()));
2532 queue_peering_event(evt
);
2533 action
= "in recovery";
2535 action
= "already out of recovery/backfill";
2537 dout(10) << __func__
<< ": no luck, giving up on this pg for now (" << action
<< ")" << dendl
;
2539 dout(10) << __func__
<< ": no luck, giving up on this pg for now (queue_recovery)" << dendl
;
2544 void PG::handle_advance_map(
2545 OSDMapRef osdmap
, OSDMapRef lastmap
,
2546 vector
<int>& newup
, int up_primary
,
2547 vector
<int>& newacting
, int acting_primary
,
2550 dout(10) << __func__
<< ": " << osdmap
->get_epoch() << dendl
;
2551 osd_shard
->update_pg_epoch(pg_slot
, osdmap
->get_epoch());
2552 recovery_state
.advance_map(
2562 void PG::handle_activate_map(PeeringCtx
&rctx
)
2564 dout(10) << __func__
<< ": " << get_osdmap()->get_epoch()
2566 recovery_state
.activate_map(rctx
);
2568 requeue_map_waiters();
2571 void PG::handle_initialize(PeeringCtx
&rctx
)
2573 dout(10) << __func__
<< dendl
;
2574 PeeringState::Initialize evt
;
2575 recovery_state
.handle_event(evt
, &rctx
);
2579 void PG::handle_query_state(Formatter
*f
)
2581 dout(10) << "handle_query_state" << dendl
;
2582 PeeringState::QueryState
q(f
);
2583 recovery_state
.handle_event(q
, 0);
2585 // This code has moved to after the close of recovery_state array.
2586 // I don't think that scrub is a recovery state
2587 if (is_primary() && is_active() && m_scrubber
&& m_scrubber
->is_scrub_active()) {
2588 m_scrubber
->handle_query_state(f
);
2592 void PG::init_collection_pool_opts()
2594 auto r
= osd
->store
->set_collection_opts(ch
, pool
.info
.opts
);
2595 if (r
< 0 && r
!= -EOPNOTSUPP
) {
2596 derr
<< __func__
<< " set_collection_opts returns error:" << r
<< dendl
;
2600 void PG::on_pool_change()
2602 init_collection_pool_opts();
2603 plpg_on_pool_change();
2606 void PG::C_DeleteMore::complete(int r
) {
2607 ceph_assert(r
== 0);
2609 if (!pg
->pg_has_reset_since(epoch
)) {
2610 pg
->osd
->queue_for_pg_delete(pg
->get_pgid(), epoch
);
2616 std::pair
<ghobject_t
, bool> PG::do_delete_work(
2617 ObjectStore::Transaction
&t
,
2620 dout(10) << __func__
<< dendl
;
2623 float osd_delete_sleep
= osd
->osd
->get_osd_delete_sleep();
2624 if (osd_delete_sleep
> 0 && delete_needs_sleep
) {
2625 epoch_t e
= get_osdmap()->get_epoch();
2627 auto delete_requeue_callback
= new LambdaContext([this, pgref
, e
](int r
) {
2628 dout(20) << __func__
<< " wake up at "
2630 << ", re-queuing delete" << dendl
;
2631 std::scoped_lock locker
{*this};
2632 delete_needs_sleep
= false;
2633 if (!pg_has_reset_since(e
)) {
2634 osd
->queue_for_pg_delete(get_pgid(), e
);
2638 auto delete_schedule_time
= ceph::real_clock::now();
2639 delete_schedule_time
+= ceph::make_timespan(osd_delete_sleep
);
2640 std::lock_guard l
{osd
->sleep_lock
};
2641 osd
->sleep_timer
.add_event_at(delete_schedule_time
,
2642 delete_requeue_callback
);
2643 dout(20) << __func__
<< " Delete scheduled at " << delete_schedule_time
<< dendl
;
2644 return std::make_pair(_next
, true);
2648 delete_needs_sleep
= true;
2652 vector
<ghobject_t
> olist
;
2653 int max
= std::min(osd
->store
->get_ideal_list_max(),
2654 (int)cct
->_conf
->osd_target_transaction_size
);
2656 osd
->store
->collection_list(
2659 ghobject_t::get_max(),
2663 dout(20) << __func__
<< " " << olist
<< dendl
;
2665 // make sure we've removed everything
2666 // by one more listing from the beginning
2667 if (_next
!= ghobject_t() && olist
.empty()) {
2668 next
= ghobject_t();
2669 osd
->store
->collection_list(
2672 ghobject_t::get_max(),
2676 if (!olist
.empty()) {
2677 dout(0) << __func__
<< " additional unexpected onode list"
2678 <<" (new onodes has appeared since PG removal started"
2683 OSDriver::OSTransaction
_t(osdriver
.get_transaction(&t
));
2685 for (auto& oid
: olist
) {
2686 if (oid
== pgmeta_oid
) {
2689 if (oid
.is_pgmeta()) {
2690 osd
->clog
->warn() << info
.pgid
<< " found stray pgmeta-like " << oid
2691 << " during PG removal";
2693 int r
= snap_mapper
.remove_oid(oid
.hobj
, &_t
);
2694 if (r
!= 0 && r
!= -ENOENT
) {
2697 t
.remove(coll
, oid
);
2700 bool running
= true;
2702 dout(20) << __func__
<< " deleting " << num
<< " objects" << dendl
;
2703 Context
*fin
= new C_DeleteMore(this, get_osdmap_epoch());
2704 t
.register_on_commit(fin
);
2706 if (cct
->_conf
->osd_inject_failure_on_pg_removal
) {
2710 // final flush here to ensure completions drop refs. Of particular concern
2711 // are the SnapMapper ContainerContexts.
2714 PGLog::clear_info_log(info
.pgid
, &t
);
2715 t
.remove_collection(coll
);
2716 t
.register_on_commit(new ContainerContext
<PGRef
>(pgref
));
2717 t
.register_on_applied(new ContainerContext
<PGRef
>(pgref
));
2718 osd
->store
->queue_transaction(ch
, std::move(t
));
2722 if (!osd
->try_finish_pg_delete(this, pool
.info
.get_pg_num())) {
2723 dout(1) << __func__
<< " raced with merge, reinstantiating" << dendl
;
2724 ch
= osd
->store
->create_new_collection(coll
);
2725 create_pg_collection(t
,
2727 info
.pgid
.get_split_bits(pool
.info
.get_pg_num()));
2728 init_pg_ondisk(t
, info
.pgid
, &pool
.info
);
2729 recovery_state
.reset_last_persisted();
2731 recovery_state
.set_delete_complete();
2733 // cancel reserver here, since the PG is about to get deleted and the
2734 // exit() methods don't run when that happens.
2735 osd
->local_reserver
.cancel_reservation(info
.pgid
);
2740 return {next
, running
};
2743 int PG::pg_stat_adjust(osd_stat_t
*ns
)
2745 osd_stat_t
&new_stat
= *ns
;
2749 // Adjust the kb_used by adding pending backfill data
2750 uint64_t reserved_num_bytes
= get_reserved_num_bytes();
2752 // For now we don't consider projected space gains here
2753 // I suggest we have an optional 2 pass backfill that frees up
2754 // space in a first pass. This could be triggered when at nearfull
2755 // or near to backfillfull.
2756 if (reserved_num_bytes
> 0) {
2757 // TODO: Handle compression by adjusting by the PGs average
2758 // compression precentage.
2759 dout(20) << __func__
<< " reserved_num_bytes " << (reserved_num_bytes
>> 10) << "KiB"
2760 << " Before kb_used " << new_stat
.statfs
.kb_used() << "KiB" << dendl
;
2761 if (new_stat
.statfs
.available
> reserved_num_bytes
)
2762 new_stat
.statfs
.available
-= reserved_num_bytes
;
2764 new_stat
.statfs
.available
= 0;
2765 dout(20) << __func__
<< " After kb_used " << new_stat
.statfs
.kb_used() << "KiB" << dendl
;
2771 void PG::dump_pgstate_history(Formatter
*f
)
2773 std::scoped_lock l
{*this};
2774 recovery_state
.dump_history(f
);
2777 void PG::dump_missing(Formatter
*f
)
2779 for (auto& i
: recovery_state
.get_pg_log().get_missing().get_items()) {
2780 f
->open_object_section("object");
2781 f
->dump_object("oid", i
.first
);
2782 f
->dump_object("missing_info", i
.second
);
2783 if (recovery_state
.get_missing_loc().needs_recovery(i
.first
)) {
2786 recovery_state
.get_missing_loc().is_unfound(i
.first
));
2787 f
->open_array_section("locations");
2788 for (auto l
: recovery_state
.get_missing_loc().get_locations(i
.first
)) {
2789 f
->dump_object("shard", l
);
2797 void PG::get_pg_stats(std::function
<void(const pg_stat_t
&, epoch_t lec
)> f
)
2799 std::lock_guard l
{pg_stats_publish_lock
};
2800 if (pg_stats_publish_valid
) {
2801 f(pg_stats_publish
, pg_stats_publish
.get_effective_last_epoch_clean());
2805 void PG::with_heartbeat_peers(std::function
<void(int)> f
)
2807 std::lock_guard l
{heartbeat_peer_lock
};
2808 for (auto p
: heartbeat_peers
) {
2811 for (auto p
: probe_targets
) {
2816 uint64_t PG::get_min_alloc_size() const {
2817 return osd
->store
->get_min_alloc_size();