1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
16 // #include "msg/Messenger.h"
17 #include "messages/MOSDRepScrub.h"
18 // #include "common/cmdparse.h"
19 // #include "common/ceph_context.h"
21 #include "common/errno.h"
22 #include "common/ceph_releases.h"
23 #include "common/config.h"
25 #include "OpRequest.h"
26 #include "ScrubStore.h"
28 #include "osd/scheduler/OpSchedulerItem.h"
30 #include "common/Timer.h"
31 #include "common/perf_counters.h"
33 #include "messages/MOSDOp.h"
34 #include "messages/MOSDPGNotify.h"
35 // #include "messages/MOSDPGLog.h"
36 #include "messages/MOSDPGInfo.h"
37 #include "messages/MOSDPGScan.h"
38 #include "messages/MOSDPGBackfill.h"
39 #include "messages/MOSDPGBackfillRemove.h"
40 #include "messages/MBackfillReserve.h"
41 #include "messages/MRecoveryReserve.h"
42 #include "messages/MOSDPGPush.h"
43 #include "messages/MOSDPGPushReply.h"
44 #include "messages/MOSDPGPull.h"
45 #include "messages/MOSDECSubOpWrite.h"
46 #include "messages/MOSDECSubOpWriteReply.h"
47 #include "messages/MOSDECSubOpRead.h"
48 #include "messages/MOSDECSubOpReadReply.h"
49 #include "messages/MOSDPGUpdateLogMissing.h"
50 #include "messages/MOSDPGUpdateLogMissingReply.h"
51 #include "messages/MOSDBackoff.h"
52 #include "messages/MOSDScrubReserve.h"
53 #include "messages/MOSDRepOp.h"
54 #include "messages/MOSDRepOpReply.h"
55 #include "messages/MOSDRepScrubMap.h"
56 #include "messages/MOSDPGRecoveryDelete.h"
57 #include "messages/MOSDPGRecoveryDeleteReply.h"
59 #include "common/BackTrace.h"
60 #include "common/EventTrace.h"
63 #define TRACEPOINT_DEFINE
64 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
65 #include "tracing/pg.h"
66 #undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
67 #undef TRACEPOINT_DEFINE
69 #define tracepoint(...)
74 #define dout_context cct
75 #define dout_subsys ceph_subsys_osd
77 #define dout_prefix _prefix(_dout, this)
79 using namespace ceph::osd::scheduler
;
82 static ostream
& _prefix(std::ostream
*_dout
, T
*t
)
84 return t
->gen_prefix(*_dout
);
87 void PG::get(const char* tag
)
90 lgeneric_subdout(cct
, refs
, 5) << "PG::get " << this << " "
91 << "tag " << (tag
? tag
: "(none") << " "
92 << (after
- 1) << " -> " << after
<< dendl
;
94 std::lock_guard
l(_ref_id_lock
);
99 void PG::put(const char* tag
)
103 std::lock_guard
l(_ref_id_lock
);
104 auto tag_counts_entry
= _tag_counts
.find(tag
);
105 ceph_assert(tag_counts_entry
!= _tag_counts
.end());
106 --tag_counts_entry
->second
;
107 if (tag_counts_entry
->second
== 0) {
108 _tag_counts
.erase(tag_counts_entry
);
112 auto local_cct
= cct
;
114 lgeneric_subdout(local_cct
, refs
, 5) << "PG::put " << this << " "
115 << "tag " << (tag
? tag
: "(none") << " "
116 << (after
+ 1) << " -> " << after
123 uint64_t PG::get_with_id()
126 std::lock_guard
l(_ref_id_lock
);
127 uint64_t id
= ++_ref_id
;
131 lgeneric_subdout(cct
, refs
, 5) << "PG::get " << this << " " << info
.pgid
132 << " got id " << id
<< " "
133 << (ref
- 1) << " -> " << ref
135 ceph_assert(!_live_ids
.count(id
));
136 _live_ids
.insert(make_pair(id
, ss
.str()));
140 void PG::put_with_id(uint64_t id
)
143 lgeneric_subdout(cct
, refs
, 5) << "PG::put " << this << " " << info
.pgid
144 << " put id " << id
<< " "
145 << (newref
+ 1) << " -> " << newref
148 std::lock_guard
l(_ref_id_lock
);
149 ceph_assert(_live_ids
.count(id
));
156 void PG::dump_live_ids()
158 std::lock_guard
l(_ref_id_lock
);
159 dout(0) << "\t" << __func__
<< ": " << info
.pgid
<< " live ids:" << dendl
;
160 for (map
<uint64_t, string
>::iterator i
= _live_ids
.begin();
161 i
!= _live_ids
.end();
163 dout(0) << "\t\tid: " << *i
<< dendl
;
165 dout(0) << "\t" << __func__
<< ": " << info
.pgid
<< " live tags:" << dendl
;
166 for (map
<string
, uint64_t>::iterator i
= _tag_counts
.begin();
167 i
!= _tag_counts
.end();
169 dout(0) << "\t\tid: " << *i
<< dendl
;
174 PG::PG(OSDService
*o
, OSDMapRef curmap
,
175 const PGPool
&_pool
, spg_t p
) :
176 pg_whoami(o
->whoami
, p
.shard
),
181 osdriver(osd
->store
, coll_t(), OSD::make_snapmapper_oid()),
186 p
.get_split_bits(_pool
.info
.get_pg_num()),
189 trace_endpoint("0.0.0.0", 0, "PG"),
191 pgmeta_oid(p
.make_pgmeta_oid()),
192 stat_queue_item(this),
194 recovery_queued(false),
195 recovery_ops_active(0),
196 backfill_reserving(false),
197 pg_stats_publish_valid(false),
198 finish_sync_event(NULL
),
199 scrub_after_recovery(false),
200 save_req_scrub(false),
210 pool(recovery_state
.get_pool()),
211 info(recovery_state
.get_info())
214 osd
->add_pgid(p
, this);
217 std::stringstream ss
;
218 ss
<< "PG " << info
.pgid
;
219 trace_endpoint
.copy_name(ss
.str());
226 osd
->remove_pgid(info
.pgid
, this);
230 void PG::lock(bool no_lockdep
) const
232 #ifdef CEPH_DEBUG_MUTEX
233 _lock
.lock(no_lockdep
);
236 locked_by
= std::this_thread::get_id();
238 // if we have unrecorded dirty state with the lock dropped, there is a bug
239 ceph_assert(!recovery_state
.debug_has_dirty_state());
241 dout(30) << "lock" << dendl
;
244 bool PG::is_locked() const
246 return ceph_mutex_is_locked(_lock
);
249 void PG::unlock() const
251 //generic_dout(0) << this << " " << info.pgid << " unlock" << dendl;
252 ceph_assert(!recovery_state
.debug_has_dirty_state());
253 #ifndef CEPH_DEBUG_MUTEX
259 std::ostream
& PG::gen_prefix(std::ostream
& out
) const
261 OSDMapRef mapref
= recovery_state
.get_osdmap();
262 #ifdef CEPH_DEBUG_MUTEX
263 if (_lock
.is_locked_by_me()) {
265 if (locked_by
== std::this_thread::get_id()) {
267 out
<< "osd." << osd
->whoami
268 << " pg_epoch: " << (mapref
? mapref
->get_epoch():0)
269 << " " << *this << " ";
271 out
<< "osd." << osd
->whoami
272 << " pg_epoch: " << (mapref
? mapref
->get_epoch():0)
273 << " pg[" << pg_id
.pgid
<< "(unlocked)] ";
278 PerfCounters
&PG::get_peering_perf() {
279 return *(osd
->recoverystate_perf
);
282 PerfCounters
&PG::get_perf_logger() {
283 return *(osd
->logger
);
286 void PG::log_state_enter(const char *state
) {
287 osd
->pg_recovery_stats
.log_enter(state
);
290 void PG::log_state_exit(
291 const char *state_name
, utime_t enter_time
,
292 uint64_t events
, utime_t event_dur
) {
293 osd
->pg_recovery_stats
.log_exit(
294 state_name
, ceph_clock_now() - enter_time
, events
, event_dur
);
297 /********* PG **********/
299 void PG::remove_snap_mapped_object(
300 ObjectStore::Transaction
&t
, const hobject_t
&soid
)
304 ghobject_t(soid
, ghobject_t::NO_GEN
, pg_whoami
.shard
));
305 clear_object_snap_mapping(&t
, soid
);
308 void PG::clear_object_snap_mapping(
309 ObjectStore::Transaction
*t
, const hobject_t
&soid
)
311 OSDriver::OSTransaction
_t(osdriver
.get_transaction(t
));
312 if (soid
.snap
< CEPH_MAXSNAP
) {
313 int r
= snap_mapper
.remove_oid(
316 if (!(r
== 0 || r
== -ENOENT
)) {
317 derr
<< __func__
<< ": remove_oid returned " << cpp_strerror(r
) << dendl
;
323 void PG::update_object_snap_mapping(
324 ObjectStore::Transaction
*t
, const hobject_t
&soid
, const set
<snapid_t
> &snaps
)
326 OSDriver::OSTransaction
_t(osdriver
.get_transaction(t
));
327 ceph_assert(soid
.snap
< CEPH_MAXSNAP
);
328 int r
= snap_mapper
.remove_oid(
331 if (!(r
== 0 || r
== -ENOENT
)) {
332 derr
<< __func__
<< ": remove_oid returned " << cpp_strerror(r
) << dendl
;
341 /******* PG ***********/
342 void PG::clear_primary_state()
344 projected_log
= PGLog::IndexedLog();
347 snap_trimq_repeat
.clear();
348 finish_sync_event
= 0; // so that _finish_recovery doesn't go off in another thread
349 release_pg_backoffs();
351 scrubber
.reserved_peers
.clear();
352 scrub_after_recovery
= false;
353 save_req_scrub
= false;
358 PG::Scrubber::Scrubber()
359 : local_reserved(false), remote_reserved(false), reserve_failed(false),
362 shallow_errors(0), deep_errors(0), fixed(0),
363 must_scrub(false), must_deep_scrub(false), must_repair(false),
364 need_auto(false), req_scrub(false), time_for_deep(false),
367 deep_scrub_on_error(false),
368 num_digest_updates_pending(0),
373 PG::Scrubber::~Scrubber() {}
375 bool PG::op_has_sufficient_caps(OpRequestRef
& op
)
378 if (op
->get_req()->get_type() != CEPH_MSG_OSD_OP
)
381 auto req
= op
->get_req
<MOSDOp
>();
382 auto priv
= req
->get_connection()->get_priv();
383 auto session
= static_cast<Session
*>(priv
.get());
385 dout(0) << "op_has_sufficient_caps: no session for op " << *req
<< dendl
;
388 OSDCap
& caps
= session
->caps
;
391 const string
&key
= req
->get_hobj().get_key().empty() ?
392 req
->get_oid().name
:
393 req
->get_hobj().get_key();
395 bool cap
= caps
.is_capable(pool
.name
, req
->get_hobj().nspace
,
396 pool
.info
.application_metadata
,
399 op
->need_write_cap(),
401 session
->get_peer_socket_addr());
403 dout(20) << "op_has_sufficient_caps "
404 << "session=" << session
405 << " pool=" << pool
.id
<< " (" << pool
.name
406 << " " << req
->get_hobj().nspace
408 << " pool_app_metadata=" << pool
.info
.application_metadata
409 << " need_read_cap=" << op
->need_read_cap()
410 << " need_write_cap=" << op
->need_write_cap()
411 << " classes=" << op
->classes()
412 << " -> " << (cap
? "yes" : "NO")
417 bool PG::requeue_scrub(bool high_priority
)
419 ceph_assert(ceph_mutex_is_locked(_lock
));
421 dout(10) << __func__
<< ": already queued" << dendl
;
424 dout(10) << __func__
<< ": queueing" << dendl
;
426 osd
->queue_for_scrub(this, high_priority
);
431 void PG::queue_recovery()
433 if (!is_primary() || !is_peered()) {
434 dout(10) << "queue_recovery -- not primary or not peered " << dendl
;
435 ceph_assert(!recovery_queued
);
436 } else if (recovery_queued
) {
437 dout(10) << "queue_recovery -- already queued" << dendl
;
439 dout(10) << "queue_recovery -- queuing" << dendl
;
440 recovery_queued
= true;
441 osd
->queue_for_recovery(this);
445 bool PG::queue_scrub()
447 ceph_assert(ceph_mutex_is_locked(_lock
));
448 if (is_scrubbing()) {
451 // An interrupted recovery repair could leave this set.
452 state_clear(PG_STATE_REPAIR
);
453 if (scrubber
.need_auto
) {
454 scrubber
.must_scrub
= true;
455 scrubber
.must_deep_scrub
= true;
456 scrubber
.auto_repair
= true;
457 scrubber
.need_auto
= false;
459 scrubber
.priority
= scrubber
.must_scrub
?
460 cct
->_conf
->osd_requested_scrub_priority
: get_scrub_priority();
461 scrubber
.must_scrub
= false;
462 state_set(PG_STATE_SCRUBBING
);
463 if (scrubber
.must_deep_scrub
) {
464 state_set(PG_STATE_DEEP_SCRUB
);
465 scrubber
.must_deep_scrub
= false;
467 if (scrubber
.must_repair
|| scrubber
.auto_repair
) {
468 state_set(PG_STATE_REPAIR
);
469 scrubber
.must_repair
= false;
475 unsigned PG::get_scrub_priority()
477 // a higher value -> a higher priority
478 int64_t pool_scrub_priority
= 0;
479 pool
.info
.opts
.get(pool_opts_t::SCRUB_PRIORITY
, &pool_scrub_priority
);
480 return pool_scrub_priority
> 0 ? pool_scrub_priority
: cct
->_conf
->osd_scrub_priority
;
483 Context
*PG::finish_recovery()
485 dout(10) << "finish_recovery" << dendl
;
486 ceph_assert(info
.last_complete
== info
.last_update
);
488 clear_recovery_state();
491 * sync all this before purging strays. but don't block!
493 finish_sync_event
= new C_PG_FinishRecovery(this);
494 return finish_sync_event
;
497 void PG::_finish_recovery(Context
*c
)
499 std::scoped_lock locker
{*this};
500 if (recovery_state
.is_deleting() || !is_clean()) {
501 dout(10) << __func__
<< " raced with delete or repair" << dendl
;
504 // When recovery is initiated by a repair, that flag is left on
505 state_clear(PG_STATE_REPAIR
);
506 if (c
== finish_sync_event
) {
507 dout(10) << "_finish_recovery" << dendl
;
508 finish_sync_event
= 0;
509 recovery_state
.purge_strays();
511 publish_stats_to_osd();
513 if (scrub_after_recovery
) {
514 dout(10) << "_finish_recovery requeueing for scrub" << dendl
;
515 scrub_after_recovery
= false;
516 scrubber
.must_deep_scrub
= true;
517 scrubber
.check_repair
= true;
518 // We remember whether req_scrub was set when scrub_after_recovery set to true
519 scrubber
.req_scrub
= save_req_scrub
;
523 dout(10) << "_finish_recovery -- stale" << dendl
;
527 void PG::start_recovery_op(const hobject_t
& soid
)
529 dout(10) << "start_recovery_op " << soid
530 #ifdef DEBUG_RECOVERY_OIDS
531 << " (" << recovering_oids
<< ")"
534 ceph_assert(recovery_ops_active
>= 0);
535 recovery_ops_active
++;
536 #ifdef DEBUG_RECOVERY_OIDS
537 recovering_oids
.insert(soid
);
539 osd
->start_recovery_op(this, soid
);
542 void PG::finish_recovery_op(const hobject_t
& soid
, bool dequeue
)
544 dout(10) << "finish_recovery_op " << soid
545 #ifdef DEBUG_RECOVERY_OIDS
546 << " (" << recovering_oids
<< ")"
549 ceph_assert(recovery_ops_active
> 0);
550 recovery_ops_active
--;
551 #ifdef DEBUG_RECOVERY_OIDS
552 ceph_assert(recovering_oids
.count(soid
));
553 recovering_oids
.erase(recovering_oids
.find(soid
));
555 osd
->finish_recovery_op(this, soid
, dequeue
);
562 void PG::split_into(pg_t child_pgid
, PG
*child
, unsigned split_bits
)
564 recovery_state
.split_into(child_pgid
, &child
->recovery_state
, split_bits
);
566 child
->update_snap_mapper_bits(split_bits
);
568 child
->snap_trimq
= snap_trimq
;
569 child
->snap_trimq_repeat
= snap_trimq_repeat
;
571 _split_into(child_pgid
, child
, split_bits
);
573 // release all backoffs for simplicity
574 release_backoffs(hobject_t(), hobject_t::get_max());
577 void PG::start_split_stats(const set
<spg_t
>& childpgs
, vector
<object_stat_sum_t
> *out
)
579 recovery_state
.start_split_stats(childpgs
, out
);
582 void PG::finish_split_stats(const object_stat_sum_t
& stats
, ObjectStore::Transaction
&t
)
584 recovery_state
.finish_split_stats(stats
, t
);
587 void PG::merge_from(map
<spg_t
,PGRef
>& sources
, PeeringCtx
&rctx
,
589 const pg_merge_meta_t
& last_pg_merge_meta
)
591 dout(10) << __func__
<< " from " << sources
<< " split_bits " << split_bits
593 map
<spg_t
, PeeringState
*> source_ps
;
594 for (auto &&source
: sources
) {
595 source_ps
.emplace(source
.first
, &source
.second
->recovery_state
);
597 recovery_state
.merge_from(source_ps
, rctx
, split_bits
, last_pg_merge_meta
);
599 for (auto& i
: sources
) {
600 auto& source
= i
.second
;
601 // wipe out source's pgmeta
602 rctx
.transaction
.remove(source
->coll
, source
->pgmeta_oid
);
604 // merge (and destroy source collection)
605 rctx
.transaction
.merge_collection(source
->coll
, coll
, split_bits
);
608 // merge_collection does this, but maybe all of our sources were missing.
609 rctx
.transaction
.collection_set_bits(coll
, split_bits
);
611 snap_mapper
.update_bits(split_bits
);
614 void PG::add_backoff(const ceph::ref_t
<Session
>& s
, const hobject_t
& begin
, const hobject_t
& end
)
617 if (!con
) // OSD::ms_handle_reset clears s->con without a lock
619 auto b
= s
->have_backoff(info
.pgid
, begin
);
621 derr
<< __func__
<< " already have backoff for " << s
<< " begin " << begin
622 << " " << *b
<< dendl
;
625 std::lock_guard
l(backoff_lock
);
626 b
= ceph::make_ref
<Backoff
>(info
.pgid
, this, s
, ++s
->backoff_seq
, begin
, end
);
627 backoffs
[begin
].insert(b
);
629 dout(10) << __func__
<< " session " << s
<< " added " << *b
<< dendl
;
634 CEPH_OSD_BACKOFF_OP_BLOCK
,
640 void PG::release_backoffs(const hobject_t
& begin
, const hobject_t
& end
)
642 dout(10) << __func__
<< " [" << begin
<< "," << end
<< ")" << dendl
;
643 vector
<ceph::ref_t
<Backoff
>> bv
;
645 std::lock_guard
l(backoff_lock
);
646 auto p
= backoffs
.lower_bound(begin
);
647 while (p
!= backoffs
.end()) {
648 int r
= cmp(p
->first
, end
);
649 dout(20) << __func__
<< " ? " << r
<< " " << p
->first
650 << " " << p
->second
<< dendl
;
651 // note: must still examine begin=end=p->first case
652 if (r
> 0 || (r
== 0 && begin
< end
)) {
655 dout(20) << __func__
<< " checking " << p
->first
656 << " " << p
->second
<< dendl
;
657 auto q
= p
->second
.begin();
658 while (q
!= p
->second
.end()) {
659 dout(20) << __func__
<< " checking " << *q
<< dendl
;
660 int r
= cmp((*q
)->begin
, begin
);
661 if (r
== 0 || (r
> 0 && (*q
)->end
< end
)) {
663 q
= p
->second
.erase(q
);
668 if (p
->second
.empty()) {
669 p
= backoffs
.erase(p
);
676 std::lock_guard
l(b
->lock
);
677 dout(10) << __func__
<< " " << *b
<< dendl
;
679 ceph_assert(b
->pg
== this);
680 ConnectionRef con
= b
->session
->con
;
681 if (con
) { // OSD::ms_handle_reset clears s->con without a lock
686 CEPH_OSD_BACKOFF_OP_UNBLOCK
,
692 b
->state
= Backoff::STATE_DELETING
;
694 b
->session
->rm_backoff(b
);
702 void PG::clear_backoffs()
704 dout(10) << __func__
<< " " << dendl
;
705 map
<hobject_t
,set
<ceph::ref_t
<Backoff
>>> ls
;
707 std::lock_guard
l(backoff_lock
);
711 for (auto& b
: p
.second
) {
712 std::lock_guard
l(b
->lock
);
713 dout(10) << __func__
<< " " << *b
<< dendl
;
715 ceph_assert(b
->pg
== this);
717 b
->state
= Backoff::STATE_DELETING
;
719 b
->session
->rm_backoff(b
);
728 // called by Session::clear_backoffs()
729 void PG::rm_backoff(const ceph::ref_t
<Backoff
>& b
)
731 dout(10) << __func__
<< " " << *b
<< dendl
;
732 std::lock_guard
l(backoff_lock
);
733 ceph_assert(ceph_mutex_is_locked_by_me(b
->lock
));
734 ceph_assert(b
->pg
== this);
735 auto p
= backoffs
.find(b
->begin
);
736 // may race with release_backoffs()
737 if (p
!= backoffs
.end()) {
738 auto q
= p
->second
.find(b
);
739 if (q
!= p
->second
.end()) {
741 if (p
->second
.empty()) {
748 void PG::clear_recovery_state()
750 dout(10) << "clear_recovery_state" << dendl
;
752 finish_sync_event
= 0;
755 while (recovery_ops_active
> 0) {
756 #ifdef DEBUG_RECOVERY_OIDS
757 soid
= *recovering_oids
.begin();
759 finish_recovery_op(soid
, true);
762 backfill_info
.clear();
763 peer_backfill_info
.clear();
764 waiting_on_backfill
.clear();
765 _clear_recovery_state(); // pg impl specific hook
768 void PG::cancel_recovery()
770 dout(10) << "cancel_recovery" << dendl
;
771 clear_recovery_state();
774 void PG::set_probe_targets(const set
<pg_shard_t
> &probe_set
)
776 std::lock_guard
l(heartbeat_peer_lock
);
777 probe_targets
.clear();
778 for (set
<pg_shard_t
>::iterator i
= probe_set
.begin();
779 i
!= probe_set
.end();
781 probe_targets
.insert(i
->osd
);
785 void PG::send_cluster_message(
786 int target
, Message
*m
,
787 epoch_t epoch
, bool share_map_update
=false)
789 ConnectionRef con
= osd
->get_con_osd_cluster(
790 target
, get_osdmap_epoch());
796 if (share_map_update
) {
797 osd
->maybe_share_map(con
.get(), get_osdmap());
799 osd
->send_message_osd_cluster(m
, con
.get());
802 void PG::clear_probe_targets()
804 std::lock_guard
l(heartbeat_peer_lock
);
805 probe_targets
.clear();
808 void PG::update_heartbeat_peers(set
<int> new_peers
)
810 bool need_update
= false;
811 heartbeat_peer_lock
.lock();
812 if (new_peers
== heartbeat_peers
) {
813 dout(10) << "update_heartbeat_peers " << heartbeat_peers
<< " unchanged" << dendl
;
815 dout(10) << "update_heartbeat_peers " << heartbeat_peers
<< " -> " << new_peers
<< dendl
;
816 heartbeat_peers
.swap(new_peers
);
819 heartbeat_peer_lock
.unlock();
822 osd
->need_heartbeat_peer_update();
826 bool PG::check_in_progress_op(
827 const osd_reqid_t
&r
,
829 version_t
*user_version
,
831 vector
<pg_log_op_return_item_t
> *op_returns
835 projected_log
.get_request(r
, version
, user_version
, return_code
,
837 recovery_state
.get_pg_log().get_log().get_request(
838 r
, version
, user_version
, return_code
, op_returns
));
841 void PG::publish_stats_to_osd()
846 std::lock_guard l
{pg_stats_publish_lock
};
847 auto stats
= recovery_state
.prepare_stats_for_publish(
848 pg_stats_publish_valid
,
852 pg_stats_publish
= stats
.value();
853 pg_stats_publish_valid
= true;
857 unsigned PG::get_target_pg_log_entries() const
859 return osd
->get_target_pg_log_entries();
862 void PG::clear_publish_stats()
864 dout(15) << "clear_stats" << dendl
;
865 std::lock_guard l
{pg_stats_publish_lock
};
866 pg_stats_publish_valid
= false;
870 * initialize a newly instantiated pg
872 * Initialize PG state, as when a PG is initially created, or when it
873 * is first instantiated on the current node.
875 * @param role our role/rank
876 * @param newup up set
877 * @param newacting acting set
878 * @param history pg history
879 * @param pi past_intervals
880 * @param backfill true if info should be marked as backfill
881 * @param t transaction to write out our new state in
885 const vector
<int>& newup
, int new_up_primary
,
886 const vector
<int>& newacting
, int new_acting_primary
,
887 const pg_history_t
& history
,
888 const PastIntervals
& pi
,
890 ObjectStore::Transaction
&t
)
893 role
, newup
, new_up_primary
, newacting
,
894 new_acting_primary
, history
, pi
, backfill
, t
);
900 std::scoped_lock l
{*this};
901 recovery_state
.shutdown();
905 #pragma GCC diagnostic ignored "-Wpragmas"
906 #pragma GCC diagnostic push
907 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
909 void PG::upgrade(ObjectStore
*store
)
911 dout(0) << __func__
<< " " << info_struct_v
<< " -> " << pg_latest_struct_v
913 ceph_assert(info_struct_v
<= 10);
914 ObjectStore::Transaction t
;
916 // <do upgrade steps here>
919 ceph_assert(info_struct_v
== 10);
921 // update infover_key
922 if (info_struct_v
< pg_latest_struct_v
) {
923 map
<string
,bufferlist
> v
;
924 __u8 ver
= pg_latest_struct_v
;
925 encode(ver
, v
[string(infover_key
)]);
926 t
.omap_setkeys(coll
, pgmeta_oid
, v
);
929 recovery_state
.force_write_state(t
);
931 ObjectStore::CollectionHandle ch
= store
->open_collection(coll
);
932 int r
= store
->queue_transaction(ch
, std::move(t
));
934 derr
<< __func__
<< ": queue_transaction returned "
935 << cpp_strerror(r
) << dendl
;
941 if (!ch
->flush_commit(&waiter
)) {
946 #pragma GCC diagnostic pop
947 #pragma GCC diagnostic warning "-Wpragmas"
949 void PG::prepare_write(
951 pg_info_t
&last_written_info
,
952 PastIntervals
&past_intervals
,
956 bool need_write_epoch
,
957 ObjectStore::Transaction
&t
)
959 info
.stats
.stats
.add(unstable_stats
);
960 unstable_stats
.clear();
961 map
<string
,bufferlist
> km
;
962 string key_to_remove
;
963 if (dirty_big_info
|| dirty_info
) {
964 int ret
= prepare_info_keymap(
974 cct
->_conf
->osd_fast_info
,
977 ceph_assert(ret
== 0);
979 pglog
.write_log_and_missing(
980 t
, &km
, coll
, pgmeta_oid
, pool
.info
.require_rollback());
982 t
.omap_setkeys(coll
, pgmeta_oid
, km
);
983 if (!key_to_remove
.empty())
984 t
.omap_rmkey(coll
, pgmeta_oid
, key_to_remove
);
987 #pragma GCC diagnostic ignored "-Wpragmas"
988 #pragma GCC diagnostic push
989 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
991 bool PG::_has_removal_flag(ObjectStore
*store
,
995 ghobject_t
pgmeta_oid(pgid
.make_pgmeta_oid());
999 keys
.insert("_remove");
1000 map
<string
,bufferlist
> values
;
1001 auto ch
= store
->open_collection(coll
);
1003 if (store
->omap_get_values(ch
, pgmeta_oid
, keys
, &values
) == 0 &&
1010 int PG::peek_map_epoch(ObjectStore
*store
,
1015 ghobject_t
legacy_infos_oid(OSD::make_infos_oid());
1016 ghobject_t
pgmeta_oid(pgid
.make_pgmeta_oid());
1017 epoch_t cur_epoch
= 0;
1019 // validate collection name
1020 ceph_assert(coll
.is_pg());
1024 keys
.insert(string(infover_key
));
1025 keys
.insert(string(epoch_key
));
1026 map
<string
,bufferlist
> values
;
1027 auto ch
= store
->open_collection(coll
);
1029 int r
= store
->omap_get_values(ch
, pgmeta_oid
, keys
, &values
);
1031 ceph_assert(values
.size() == 2);
1033 // sanity check version
1034 auto bp
= values
[string(infover_key
)].cbegin();
1036 decode(struct_v
, bp
);
1037 ceph_assert(struct_v
>= 8);
1040 bp
= values
[string(epoch_key
)].begin();
1041 decode(cur_epoch
, bp
);
1043 // probably bug 10617; see OSD::load_pgs()
1047 *pepoch
= cur_epoch
;
1051 #pragma GCC diagnostic pop
1052 #pragma GCC diagnostic warning "-Wpragmas"
1054 bool PG::check_log_for_corruption(ObjectStore
*store
)
1056 /// TODO: this method needs to work with the omap log
1060 //! Get the name we're going to save our corrupt page log as
1061 std::string
PG::get_corrupt_pg_log_name() const
1063 const int MAX_BUF
= 512;
1066 time_t my_time(time(NULL
));
1067 const struct tm
*t
= localtime_r(&my_time
, &tm_buf
);
1068 int ret
= strftime(buf
, sizeof(buf
), "corrupt_log_%Y-%m-%d_%k:%M_", t
);
1070 dout(0) << "strftime failed" << dendl
;
1071 return "corrupt_log_unknown_time";
1074 out
+= stringify(info
.pgid
);
1079 ObjectStore
*store
, spg_t pgid
, const coll_t
&coll
,
1080 pg_info_t
&info
, PastIntervals
&past_intervals
,
1084 keys
.insert(string(infover_key
));
1085 keys
.insert(string(info_key
));
1086 keys
.insert(string(biginfo_key
));
1087 keys
.insert(string(fastinfo_key
));
1088 ghobject_t
pgmeta_oid(pgid
.make_pgmeta_oid());
1089 map
<string
,bufferlist
> values
;
1090 auto ch
= store
->open_collection(coll
);
1092 int r
= store
->omap_get_values(ch
, pgmeta_oid
, keys
, &values
);
1093 ceph_assert(r
== 0);
1094 ceph_assert(values
.size() == 3 ||
1095 values
.size() == 4);
1097 auto p
= values
[string(infover_key
)].cbegin();
1098 decode(struct_v
, p
);
1099 ceph_assert(struct_v
>= 10);
1101 p
= values
[string(info_key
)].begin();
1104 p
= values
[string(biginfo_key
)].begin();
1105 decode(past_intervals
, p
);
1106 decode(info
.purged_snaps
, p
);
1108 p
= values
[string(fastinfo_key
)].begin();
1110 pg_fast_info_t fast
;
1112 fast
.try_apply_to(&info
);
1117 void PG::read_state(ObjectStore
*store
)
1119 PastIntervals past_intervals_from_disk
;
1120 pg_info_t info_from_disk
;
1126 past_intervals_from_disk
,
1128 ceph_assert(r
>= 0);
1130 if (info_struct_v
< pg_compat_struct_v
) {
1131 derr
<< "PG needs upgrade, but on-disk data is too old; upgrade to"
1132 << " an older version first." << dendl
;
1133 ceph_abort_msg("PG too old to upgrade");
1136 recovery_state
.init_from_disk_state(
1137 std::move(info_from_disk
),
1138 std::move(past_intervals_from_disk
),
1139 [this, store
] (PGLog
&pglog
) {
1141 pglog
.read_log_and_missing(
1147 cct
->_conf
->osd_ignore_stale_divergent_priors
,
1148 cct
->_conf
->osd_debug_verify_missing_on_start
);
1151 osd
->clog
->error() << oss
.str();
1155 if (info_struct_v
< pg_latest_struct_v
) {
1159 // initialize current mapping
1161 int primary
, up_primary
;
1162 vector
<int> acting
, up
;
1163 get_osdmap()->pg_to_up_acting_osds(
1164 pg_id
.pgid
, &up
, &up_primary
, &acting
, &primary
);
1165 recovery_state
.init_primary_up_acting(
1170 recovery_state
.set_role(OSDMap::calc_pg_role(pg_whoami
, acting
));
1173 // init pool options
1174 store
->set_collection_opts(ch
, pool
.info
.opts
);
1176 PeeringCtx
rctx(ceph_release_t::unknown
);
1177 handle_initialize(rctx
);
1178 // note: we don't activate here because we know the OSD will advance maps
1180 write_if_dirty(rctx
.transaction
);
1181 store
->queue_transaction(ch
, std::move(rctx
.transaction
));
1184 void PG::update_snap_map(
1185 const vector
<pg_log_entry_t
> &log_entries
,
1186 ObjectStore::Transaction
&t
)
1188 for (vector
<pg_log_entry_t
>::const_iterator i
= log_entries
.begin();
1189 i
!= log_entries
.end();
1191 OSDriver::OSTransaction
_t(osdriver
.get_transaction(&t
));
1192 if (i
->soid
.snap
< CEPH_MAXSNAP
) {
1193 if (i
->is_delete()) {
1194 int r
= snap_mapper
.remove_oid(
1198 derr
<< __func__
<< " remove_oid " << i
->soid
<< " failed with " << r
<< dendl
;
1199 // On removal tolerate missing key corruption
1200 ceph_assert(r
== 0 || r
== -ENOENT
);
1201 } else if (i
->is_update()) {
1202 ceph_assert(i
->snaps
.length() > 0);
1203 vector
<snapid_t
> snaps
;
1204 bufferlist snapbl
= i
->snaps
;
1205 auto p
= snapbl
.cbegin();
1209 derr
<< __func__
<< " decode snaps failure on " << *i
<< dendl
;
1212 set
<snapid_t
> _snaps(snaps
.begin(), snaps
.end());
1214 if (i
->is_clone() || i
->is_promote()) {
1215 snap_mapper
.add_oid(
1219 } else if (i
->is_modify()) {
1220 int r
= snap_mapper
.update_snaps(
1225 ceph_assert(r
== 0);
1227 ceph_assert(i
->is_clean());
1235 * filter trimming|trimmed snaps out of snapcontext
1237 void PG::filter_snapc(vector
<snapid_t
> &snaps
)
1239 // nothing needs to trim, we can return immediately
1240 if (snap_trimq
.empty() && info
.purged_snaps
.empty())
1243 bool filtering
= false;
1244 vector
<snapid_t
> newsnaps
;
1245 for (vector
<snapid_t
>::iterator p
= snaps
.begin();
1248 if (snap_trimq
.contains(*p
) || info
.purged_snaps
.contains(*p
)) {
1250 // start building a new vector with what we've seen so far
1251 dout(10) << "filter_snapc filtering " << snaps
<< dendl
;
1252 newsnaps
.insert(newsnaps
.begin(), snaps
.begin(), p
);
1255 dout(20) << "filter_snapc removing trimq|purged snap " << *p
<< dendl
;
1258 newsnaps
.push_back(*p
); // continue building new vector
1262 snaps
.swap(newsnaps
);
1263 dout(10) << "filter_snapc result " << snaps
<< dendl
;
1267 void PG::requeue_object_waiters(map
<hobject_t
, list
<OpRequestRef
>>& m
)
1269 for (map
<hobject_t
, list
<OpRequestRef
>>::iterator it
= m
.begin();
1272 requeue_ops(it
->second
);
1276 void PG::requeue_op(OpRequestRef op
)
1278 auto p
= waiting_for_map
.find(op
->get_source());
1279 if (p
!= waiting_for_map
.end()) {
1280 dout(20) << __func__
<< " " << op
<< " (waiting_for_map " << p
->first
<< ")"
1282 p
->second
.push_front(op
);
1284 dout(20) << __func__
<< " " << op
<< dendl
;
1287 unique_ptr
<OpSchedulerItem::OpQueueable
>(new PGOpItem(info
.pgid
, op
)),
1288 op
->get_req()->get_cost(),
1289 op
->get_req()->get_priority(),
1290 op
->get_req()->get_recv_stamp(),
1291 op
->get_req()->get_source().num(),
1292 get_osdmap_epoch()));
1296 void PG::requeue_ops(list
<OpRequestRef
> &ls
)
1298 for (list
<OpRequestRef
>::reverse_iterator i
= ls
.rbegin();
1306 void PG::requeue_map_waiters()
1308 epoch_t epoch
= get_osdmap_epoch();
1309 auto p
= waiting_for_map
.begin();
1310 while (p
!= waiting_for_map
.end()) {
1311 if (epoch
< p
->second
.front()->min_epoch
) {
1312 dout(20) << __func__
<< " " << p
->first
<< " front op "
1313 << p
->second
.front() << " must still wait, doing nothing"
1317 dout(20) << __func__
<< " " << p
->first
<< " " << p
->second
<< dendl
;
1318 for (auto q
= p
->second
.rbegin(); q
!= p
->second
.rend(); ++q
) {
1320 osd
->enqueue_front(OpSchedulerItem(
1321 unique_ptr
<OpSchedulerItem::OpQueueable
>(new PGOpItem(info
.pgid
, req
)),
1322 req
->get_req()->get_cost(),
1323 req
->get_req()->get_priority(),
1324 req
->get_req()->get_recv_stamp(),
1325 req
->get_req()->get_source().num(),
1328 p
= waiting_for_map
.erase(p
);
1334 // ==========================================================================================
/*
 * when holding pg and sched_scrub_lock, then the states are:
 *   scheduling:
 *     scrubber.local_reserved = true
 *     scrubber.active = false
 *     scrubber.reserved_peers includes whoami
 *     osd->scrubs_local++
 *   scheduling, replica declined:
 *     scrubber.local_reserved = true
 *     scrubber.reserved_peers includes -1
 *     osd->scrub_local++
 *   pending:
 *     scrubber.local_reserved = true
 *     scrubber.active = false
 *     scrubber.reserved_peers.size() == acting.size();
 *     osd->scrub_local++
 *   scrubbing:
 *     scrubber.local_reserved = true;
 *     scrubber.active = true
 *     scrubber.reserved_peers empty
 */
1361 bool PG::sched_scrub()
1363 ceph_assert(ceph_mutex_is_locked(_lock
));
1364 ceph_assert(!is_scrubbing());
1365 if (!(is_primary() && is_active() && is_clean())) {
1369 // All processing the first time through commits us to whatever
1370 // choices are made.
1371 if (!scrubber
.local_reserved
) {
1372 dout(20) << __func__
<< ": Start processing pg " << info
.pgid
<< dendl
;
1374 bool allow_deep_scrub
= !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB
) ||
1375 pool
.info
.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB
));
1376 bool allow_scrub
= !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB
) ||
1377 pool
.info
.has_flag(pg_pool_t::FLAG_NOSCRUB
));
1378 bool has_deep_errors
= (info
.stats
.stats
.sum
.num_deep_scrub_errors
> 0);
1379 bool try_to_auto_repair
= (cct
->_conf
->osd_scrub_auto_repair
1380 && get_pgbackend()->auto_repair_supported());
1382 scrubber
.time_for_deep
= false;
1383 // Clear these in case user issues the scrub/repair command during
1384 // the scheduling of the scrub/repair (e.g. request reservation)
1385 scrubber
.deep_scrub_on_error
= false;
1386 scrubber
.auto_repair
= false;
1388 // All periodic scrub handling goes here because must_scrub is
1389 // always set for must_deep_scrub and must_repair.
1390 if (!scrubber
.must_scrub
) {
1391 ceph_assert(!scrubber
.must_deep_scrub
&& !scrubber
.must_repair
);
1392 // Handle deep scrub determination only if allowed
1393 if (allow_deep_scrub
) {
1394 // Initial entry and scheduled scrubs without nodeep_scrub set get here
1395 if (scrubber
.need_auto
) {
1396 dout(20) << __func__
<< ": need repair after scrub errors" << dendl
;
1397 scrubber
.time_for_deep
= true;
1399 double deep_scrub_interval
= 0;
1400 pool
.info
.opts
.get(pool_opts_t::DEEP_SCRUB_INTERVAL
, &deep_scrub_interval
);
1401 if (deep_scrub_interval
<= 0) {
1402 deep_scrub_interval
= cct
->_conf
->osd_deep_scrub_interval
;
1404 scrubber
.time_for_deep
= ceph_clock_now() >=
1405 info
.history
.last_deep_scrub_stamp
+ deep_scrub_interval
;
1407 bool deep_coin_flip
= false;
1408 // If we randomize when !allow_scrub && allow_deep_scrub, then it guarantees
1409 // we will deep scrub because this function is called often.
1410 if (!scrubber
.time_for_deep
&& allow_scrub
)
1411 deep_coin_flip
= (rand() % 100) < cct
->_conf
->osd_deep_scrub_randomize_ratio
* 100;
1412 dout(20) << __func__
<< ": time_for_deep=" << scrubber
.time_for_deep
<< " deep_coin_flip=" << deep_coin_flip
<< dendl
;
1414 scrubber
.time_for_deep
= (scrubber
.time_for_deep
|| deep_coin_flip
);
1417 if (!scrubber
.time_for_deep
&& has_deep_errors
) {
1418 osd
->clog
->info() << "osd." << osd
->whoami
1419 << " pg " << info
.pgid
1420 << " Deep scrub errors, upgrading scrub to deep-scrub";
1421 scrubber
.time_for_deep
= true;
1424 if (try_to_auto_repair
) {
1425 if (scrubber
.time_for_deep
) {
1426 dout(20) << __func__
<< ": auto repair with deep scrubbing" << dendl
;
1427 scrubber
.auto_repair
= true;
1428 } else if (allow_scrub
) {
1429 dout(20) << __func__
<< ": auto repair with scrubbing, rescrub if errors found" << dendl
;
1430 scrubber
.deep_scrub_on_error
= true;
1433 } else { // !allow_deep_scrub
1434 dout(20) << __func__
<< ": nodeep_scrub set" << dendl
;
1435 if (has_deep_errors
) {
1436 osd
->clog
->error() << "osd." << osd
->whoami
1437 << " pg " << info
.pgid
1438 << " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
1443 //NOSCRUB so skip regular scrubs
1444 if (!allow_scrub
&& !scrubber
.time_for_deep
) {
1447 // scrubber.must_scrub
1448 } else if (!scrubber
.must_deep_scrub
&& has_deep_errors
) {
1449 osd
->clog
->error() << "osd." << osd
->whoami
1450 << " pg " << info
.pgid
1451 << " Regular scrub request, deep-scrub details will be lost";
1453 // Unless precluded this was handled above
1454 scrubber
.need_auto
= false;
1456 ceph_assert(scrubber
.reserved_peers
.empty());
1457 bool allow_scrubing
= cct
->_conf
->osd_scrub_during_recovery
||
1458 (cct
->_conf
->osd_repair_during_recovery
&& scrubber
.must_repair
) ||
1459 !osd
->is_recovery_active();
1460 if (allow_scrubing
&&
1461 osd
->inc_scrubs_local()) {
1462 dout(20) << __func__
<< ": reserved locally, reserving replicas" << dendl
;
1463 scrubber
.local_reserved
= true;
1464 scrubber
.reserved_peers
.insert(pg_whoami
);
1465 scrub_reserve_replicas();
1467 dout(20) << __func__
<< ": failed to reserve locally" << dendl
;
1472 if (scrubber
.local_reserved
) {
1473 if (scrubber
.reserve_failed
) {
1474 dout(20) << __func__
<< ": failed, a peer declined" << dendl
;
1475 clear_scrub_reserved();
1476 scrub_unreserve_replicas();
1478 } else if (scrubber
.reserved_peers
.size() == get_actingset().size()) {
1479 dout(20) << __func__
<< ": success, reserved self and replicas" << dendl
;
1480 if (scrubber
.time_for_deep
) {
1481 dout(10) << __func__
<< ": scrub will be deep" << dendl
;
1482 state_set(PG_STATE_DEEP_SCRUB
);
1483 scrubber
.time_for_deep
= false;
1487 // none declined, since scrubber.reserved is set
1488 dout(20) << __func__
<< ": reserved " << scrubber
.reserved_peers
1489 << ", waiting for replicas" << dendl
;
1495 bool PG::is_scrub_registered()
1497 return !scrubber
.scrub_reg_stamp
.is_zero();
1500 void PG::reg_next_scrub()
1507 if (scrubber
.must_scrub
|| scrubber
.need_auto
) {
1508 // Set the smallest time that isn't utime_t()
1509 reg_stamp
= Scrubber::scrub_must_stamp();
1511 } else if (info
.stats
.stats_invalid
&& cct
->_conf
->osd_scrub_invalid_stats
) {
1512 reg_stamp
= ceph_clock_now();
1515 reg_stamp
= info
.history
.last_scrub_stamp
;
1517 // note down the sched_time, so we can locate this scrub, and remove it
1519 double scrub_min_interval
= 0, scrub_max_interval
= 0;
1520 pool
.info
.opts
.get(pool_opts_t::SCRUB_MIN_INTERVAL
, &scrub_min_interval
);
1521 pool
.info
.opts
.get(pool_opts_t::SCRUB_MAX_INTERVAL
, &scrub_max_interval
);
1522 ceph_assert(!is_scrub_registered());
1523 scrubber
.scrub_reg_stamp
= osd
->reg_pg_scrub(info
.pgid
,
1528 dout(10) << __func__
<< " pg " << pg_id
<< " register next scrub, scrub time "
1529 << scrubber
.scrub_reg_stamp
<< ", must = " << (int)must
<< dendl
;
1532 void PG::unreg_next_scrub()
1534 if (is_scrub_registered()) {
1535 osd
->unreg_pg_scrub(info
.pgid
, scrubber
.scrub_reg_stamp
);
1536 scrubber
.scrub_reg_stamp
= utime_t();
1540 void PG::on_info_history_change()
1546 void PG::scrub_requested(bool deep
, bool repair
, bool need_auto
)
1550 scrubber
.need_auto
= true;
1552 scrubber
.must_scrub
= true;
1553 scrubber
.must_deep_scrub
= deep
|| repair
;
1554 scrubber
.must_repair
= repair
;
1555 // User might intervene, so clear this
1556 scrubber
.need_auto
= false;
1557 scrubber
.req_scrub
= true;
1562 void PG::clear_ready_to_merge() {
1563 osd
->clear_ready_to_merge(this);
1566 void PG::queue_want_pg_temp(const vector
<int> &wanted
) {
1567 osd
->queue_want_pg_temp(get_pgid().pgid
, wanted
);
1570 void PG::clear_want_pg_temp() {
1571 osd
->remove_want_pg_temp(get_pgid().pgid
);
1574 void PG::on_role_change() {
1575 requeue_ops(waiting_for_peered
);
1576 plpg_on_role_change();
1579 void PG::on_new_interval() {
1580 scrub_queued
= false;
1581 projected_last_update
= eversion_t();
1585 epoch_t
PG::oldest_stored_osdmap() {
1586 return osd
->get_superblock().oldest_map
;
1589 OstreamTemp
PG::get_clog_info() {
1590 return osd
->clog
->info();
1593 OstreamTemp
PG::get_clog_debug() {
1594 return osd
->clog
->debug();
1597 OstreamTemp
PG::get_clog_error() {
1598 return osd
->clog
->error();
1601 void PG::schedule_event_after(
1602 PGPeeringEventRef event
,
1604 std::lock_guard
lock(osd
->recovery_request_lock
);
1605 osd
->recovery_request_timer
.add_event_after(
1607 new QueuePeeringEvt(
1612 void PG::request_local_background_io_reservation(
1614 PGPeeringEventRef on_grant
,
1615 PGPeeringEventRef on_preempt
) {
1616 osd
->local_reserver
.request_reservation(
1618 on_grant
? new QueuePeeringEvt(
1619 this, on_grant
) : nullptr,
1621 on_preempt
? new QueuePeeringEvt(
1622 this, on_preempt
) : nullptr);
1625 void PG::update_local_background_io_priority(
1626 unsigned priority
) {
1627 osd
->local_reserver
.update_priority(
1632 void PG::cancel_local_background_io_reservation() {
1633 osd
->local_reserver
.cancel_reservation(
1637 void PG::request_remote_recovery_reservation(
1639 PGPeeringEventRef on_grant
,
1640 PGPeeringEventRef on_preempt
) {
1641 osd
->remote_reserver
.request_reservation(
1643 on_grant
? new QueuePeeringEvt(
1644 this, on_grant
) : nullptr,
1646 on_preempt
? new QueuePeeringEvt(
1647 this, on_preempt
) : nullptr);
1650 void PG::cancel_remote_recovery_reservation() {
1651 osd
->remote_reserver
.cancel_reservation(
1655 void PG::schedule_event_on_commit(
1656 ObjectStore::Transaction
&t
,
1657 PGPeeringEventRef on_commit
)
1659 t
.register_on_commit(new QueuePeeringEvt(this, on_commit
));
1662 void PG::on_active_exit()
1664 backfill_reserving
= false;
1668 void PG::on_active_advmap(const OSDMapRef
&osdmap
)
1670 const auto& new_removed_snaps
= osdmap
->get_new_removed_snaps();
1671 auto i
= new_removed_snaps
.find(get_pgid().pool());
1672 if (i
!= new_removed_snaps
.end()) {
1674 for (auto j
: i
->second
) {
1675 if (snap_trimq
.intersects(j
.first
, j
.second
)) {
1676 decltype(snap_trimq
) added
, overlap
;
1677 added
.insert(j
.first
, j
.second
);
1678 overlap
.intersection_of(snap_trimq
, added
);
1679 derr
<< __func__
<< " removed_snaps already contains "
1680 << overlap
<< dendl
;
1682 snap_trimq
.union_of(added
);
1684 snap_trimq
.insert(j
.first
, j
.second
);
1687 dout(10) << __func__
<< " new removed_snaps " << i
->second
1688 << ", snap_trimq now " << snap_trimq
<< dendl
;
1689 ceph_assert(!bad
|| !cct
->_conf
->osd_debug_verify_cached_snaps
);
1692 const auto& new_purged_snaps
= osdmap
->get_new_purged_snaps();
1693 auto j
= new_purged_snaps
.find(get_pgid().pgid
.pool());
1694 if (j
!= new_purged_snaps
.end()) {
1696 for (auto k
: j
->second
) {
1697 if (!recovery_state
.get_info().purged_snaps
.contains(k
.first
, k
.second
)) {
1698 interval_set
<snapid_t
> rm
, overlap
;
1699 rm
.insert(k
.first
, k
.second
);
1700 overlap
.intersection_of(recovery_state
.get_info().purged_snaps
, rm
);
1701 derr
<< __func__
<< " purged_snaps does not contain "
1702 << rm
<< ", only " << overlap
<< dendl
;
1703 recovery_state
.adjust_purged_snaps(
1704 [&overlap
](auto &purged_snaps
) {
1705 purged_snaps
.subtract(overlap
);
1707 // This can currently happen in the normal (if unlikely) course of
1708 // events. Because adding snaps to purged_snaps does not increase
1709 // the pg version or add a pg log entry, we don't reliably propagate
1710 // purged_snaps additions to other OSDs.
1713 // - primary and replicas update purged_snaps
1714 // - no object updates
1715 // - pg mapping changes, new primary on different node
1716 // - new primary pg version == eversion_t(), so info is not
1720 recovery_state
.adjust_purged_snaps(
1721 [&k
](auto &purged_snaps
) {
1722 purged_snaps
.erase(k
.first
, k
.second
);
1726 dout(10) << __func__
<< " new purged_snaps " << j
->second
1727 << ", now " << recovery_state
.get_info().purged_snaps
<< dendl
;
1728 ceph_assert(!bad
|| !cct
->_conf
->osd_debug_verify_cached_snaps
);
1732 void PG::queue_snap_retrim(snapid_t snap
)
1736 dout(10) << __func__
<< " snap " << snap
<< " - not active and primary"
1740 if (!snap_trimq
.contains(snap
)) {
1741 snap_trimq
.insert(snap
);
1742 snap_trimq_repeat
.insert(snap
);
1743 dout(20) << __func__
<< " snap " << snap
1744 << ", trimq now " << snap_trimq
1745 << ", repeat " << snap_trimq_repeat
<< dendl
;
1748 dout(20) << __func__
<< " snap " << snap
1749 << " already in trimq " << snap_trimq
<< dendl
;
1753 void PG::on_active_actmap()
1755 if (cct
->_conf
->osd_check_for_log_corruption
)
1756 check_log_for_corruption(osd
->store
);
1759 if (recovery_state
.is_active()) {
1760 dout(10) << "Active: kicking snap trim" << dendl
;
1764 if (recovery_state
.is_peered() &&
1765 !recovery_state
.is_clean() &&
1766 !recovery_state
.get_osdmap()->test_flag(CEPH_OSDMAP_NOBACKFILL
) &&
1767 (!recovery_state
.get_osdmap()->test_flag(CEPH_OSDMAP_NOREBALANCE
) ||
1768 recovery_state
.is_degraded())) {
1773 void PG::on_backfill_reserved()
1775 backfill_reserving
= false;
1779 void PG::on_backfill_canceled()
1781 if (!waiting_on_backfill
.empty()) {
1782 waiting_on_backfill
.clear();
1783 finish_recovery_op(hobject_t::get_max());
1787 void PG::on_recovery_reserved()
1792 void PG::set_not_ready_to_merge_target(pg_t pgid
, pg_t src
)
1794 osd
->set_not_ready_to_merge_target(pgid
, src
);
1797 void PG::set_not_ready_to_merge_source(pg_t pgid
)
1799 osd
->set_not_ready_to_merge_source(pgid
);
1802 void PG::set_ready_to_merge_target(eversion_t lu
, epoch_t les
, epoch_t lec
)
1804 osd
->set_ready_to_merge_target(this, lu
, les
, lec
);
1807 void PG::set_ready_to_merge_source(eversion_t lu
)
1809 osd
->set_ready_to_merge_source(this, lu
);
1812 void PG::send_pg_created(pg_t pgid
)
1814 osd
->send_pg_created(pgid
);
1817 ceph::signedspan
PG::get_mnow()
1819 return osd
->get_mnow();
1822 HeartbeatStampsRef
PG::get_hb_stamps(int peer
)
1824 return osd
->get_hb_stamps(peer
);
1827 void PG::schedule_renew_lease(epoch_t lpr
, ceph::timespan delay
)
1829 auto spgid
= info
.pgid
;
1831 osd
->mono_timer
.add_event(
1834 o
->queue_renew_lease(lpr
, spgid
);
1838 void PG::queue_check_readable(epoch_t lpr
, ceph::timespan delay
)
1840 osd
->queue_check_readable(info
.pgid
, lpr
, delay
);
1843 void PG::rebuild_missing_set_with_deletes(PGLog
&pglog
)
1845 pglog
.rebuild_missing_set_with_deletes(
1848 recovery_state
.get_info());
1851 void PG::on_activate_committed()
1853 if (!is_primary()) {
1855 if (recovery_state
.needs_flush() == 0) {
1856 requeue_ops(waiting_for_peered
);
1857 } else if (!waiting_for_peered
.empty()) {
1858 dout(10) << __func__
<< " flushes in progress, moving "
1859 << waiting_for_peered
.size() << " items to waiting_for_flush"
1861 ceph_assert(waiting_for_flush
.empty());
1862 waiting_for_flush
.swap(waiting_for_peered
);
1867 void PG::do_replica_scrub_map(OpRequestRef op
)
1869 auto m
= op
->get_req
<MOSDRepScrubMap
>();
1870 dout(7) << __func__
<< " " << *m
<< dendl
;
1871 if (m
->map_epoch
< info
.history
.same_interval_since
) {
1872 dout(10) << __func__
<< " discarding old from "
1873 << m
->map_epoch
<< " < " << info
.history
.same_interval_since
1877 if (!scrubber
.is_chunky_scrub_active()) {
1878 dout(10) << __func__
<< " scrub isn't active" << dendl
;
1884 auto p
= const_cast<bufferlist
&>(m
->get_data()).cbegin();
1885 scrubber
.received_maps
[m
->from
].decode(p
, info
.pgid
.pool());
1886 dout(10) << "map version is "
1887 << scrubber
.received_maps
[m
->from
].valid_through
1890 dout(10) << __func__
<< " waiting_on_whom was " << scrubber
.waiting_on_whom
1892 ceph_assert(scrubber
.waiting_on_whom
.count(m
->from
));
1893 scrubber
.waiting_on_whom
.erase(m
->from
);
1895 dout(10) << __func__
<< " replica was preempted, setting flag" << dendl
;
1896 scrub_preempted
= true;
1898 if (scrubber
.waiting_on_whom
.empty()) {
1899 requeue_scrub(ops_blocked_by_scrub());
1903 // send scrub v3 messages (chunky scrub)
1904 void PG::_request_scrub_map(
1905 pg_shard_t replica
, eversion_t version
,
1906 hobject_t start
, hobject_t end
,
1908 bool allow_preemption
)
1910 ceph_assert(replica
!= pg_whoami
);
1911 dout(10) << "scrub requesting scrubmap from osd." << replica
1912 << " deep " << (int)deep
<< dendl
;
1913 MOSDRepScrub
*repscrubop
= new MOSDRepScrub(
1914 spg_t(info
.pgid
.pgid
, replica
.shard
), version
,
1916 get_last_peering_reset(),
1920 ops_blocked_by_scrub());
1921 // default priority, we want the rep scrub processed prior to any recovery
1922 // or client io messages (we are holding a lock!)
1923 osd
->send_message_osd_cluster(
1924 replica
.osd
, repscrubop
, get_osdmap_epoch());
1927 void PG::handle_scrub_reserve_request(OpRequestRef op
)
1929 dout(7) << __func__
<< " " << *op
->get_req() << dendl
;
1931 if (scrubber
.remote_reserved
) {
1932 dout(10) << __func__
<< " ignoring reserve request: Already reserved"
1936 if ((cct
->_conf
->osd_scrub_during_recovery
|| !osd
->is_recovery_active()) &&
1937 osd
->inc_scrubs_remote()) {
1938 scrubber
.remote_reserved
= true;
1940 dout(20) << __func__
<< ": failed to reserve remotely" << dendl
;
1941 scrubber
.remote_reserved
= false;
1943 auto m
= op
->get_req
<MOSDScrubReserve
>();
1944 Message
*reply
= new MOSDScrubReserve(
1945 spg_t(info
.pgid
.pgid
, get_primary().shard
),
1947 scrubber
.remote_reserved
? MOSDScrubReserve::GRANT
: MOSDScrubReserve::REJECT
,
1949 osd
->send_message_osd_cluster(reply
, op
->get_req()->get_connection());
1952 void PG::handle_scrub_reserve_grant(OpRequestRef op
, pg_shard_t from
)
1954 dout(7) << __func__
<< " " << *op
->get_req() << dendl
;
1956 if (!scrubber
.local_reserved
) {
1957 dout(10) << "ignoring obsolete scrub reserve reply" << dendl
;
1960 if (scrubber
.reserved_peers
.find(from
) != scrubber
.reserved_peers
.end()) {
1961 dout(10) << " already had osd." << from
<< " reserved" << dendl
;
1963 dout(10) << " osd." << from
<< " scrub reserve = success" << dendl
;
1964 scrubber
.reserved_peers
.insert(from
);
1969 void PG::handle_scrub_reserve_reject(OpRequestRef op
, pg_shard_t from
)
1971 dout(7) << __func__
<< " " << *op
->get_req() << dendl
;
1973 if (!scrubber
.local_reserved
) {
1974 dout(10) << "ignoring obsolete scrub reserve reply" << dendl
;
1977 if (scrubber
.reserved_peers
.find(from
) != scrubber
.reserved_peers
.end()) {
1978 dout(10) << " already had osd." << from
<< " reserved" << dendl
;
1980 /* One decline stops this pg from being scheduled for scrubbing. */
1981 dout(10) << " osd." << from
<< " scrub reserve = fail" << dendl
;
1982 scrubber
.reserve_failed
= true;
1987 void PG::handle_scrub_reserve_release(OpRequestRef op
)
1989 dout(7) << __func__
<< " " << *op
->get_req() << dendl
;
1991 clear_scrub_reserved();
1994 // Compute pending backfill data
1995 static int64_t pending_backfill(CephContext
*cct
, int64_t bf_bytes
, int64_t local_bytes
)
1997 lgeneric_dout(cct
, 20) << __func__
<< " Adjust local usage "
1998 << (local_bytes
>> 10) << "KiB"
1999 << " primary usage " << (bf_bytes
>> 10)
2002 return std::max((int64_t)0, bf_bytes
- local_bytes
);
2006 // We can zero the value of primary num_bytes as just an atomic.
2007 // However, setting above zero reserves space for backfill and requires
2008 // the OSDService::stat_lock which protects all OSD usage
2009 bool PG::try_reserve_recovery_space(
2010 int64_t primary_bytes
, int64_t local_bytes
) {
2011 // Use tentative_bacfill_full() to make sure enough
2012 // space is available to handle target bytes from primary.
2014 // TODO: If we passed num_objects from primary we could account for
2015 // an estimate of the metadata overhead.
2017 // TODO: If we had compressed_allocated and compressed_original from primary
2018 // we could compute compression ratio and adjust accordingly.
2020 // XXX: There is no way to get omap overhead and this would only apply
2021 // to whatever possibly different partition that is storing the database.
2023 // update_osd_stat() from heartbeat will do this on a new
2024 // statfs using ps->primary_bytes.
2025 uint64_t pending_adjustment
= 0;
2026 if (primary_bytes
) {
2027 // For erasure coded pool overestimate by a full stripe per object
2028 // because we don't know how each objected rounded to the nearest stripe
2029 if (pool
.info
.is_erasure()) {
2030 primary_bytes
/= (int)get_pgbackend()->get_ec_data_chunk_count();
2031 primary_bytes
+= get_pgbackend()->get_ec_stripe_chunk_size() *
2032 info
.stats
.stats
.sum
.num_objects
;
2033 local_bytes
/= (int)get_pgbackend()->get_ec_data_chunk_count();
2034 local_bytes
+= get_pgbackend()->get_ec_stripe_chunk_size() *
2035 info
.stats
.stats
.sum
.num_objects
;
2037 pending_adjustment
= pending_backfill(
2041 dout(10) << __func__
<< " primary_bytes " << (primary_bytes
>> 10)
2043 << " local " << (local_bytes
>> 10) << "KiB"
2044 << " pending_adjustments " << (pending_adjustment
>> 10) << "KiB"
2048 // This lock protects not only the stats OSDService but also setting the
2049 // pg primary_bytes. That's why we don't immediately unlock
2050 std::lock_guard l
{osd
->stat_lock
};
2051 osd_stat_t cur_stat
= osd
->osd_stat
;
2052 if (cct
->_conf
->osd_debug_reject_backfill_probability
> 0 &&
2053 (rand()%1000 < (cct
->_conf
->osd_debug_reject_backfill_probability
*1000.0))) {
2054 dout(10) << "backfill reservation rejected: failure injection"
2057 } else if (!cct
->_conf
->osd_debug_skip_full_check_in_backfill_reservation
&&
2058 osd
->tentative_backfill_full(this, pending_adjustment
, cur_stat
)) {
2059 dout(10) << "backfill reservation rejected: backfill full"
2063 // Don't reserve space if skipped reservation check, this is used
2064 // to test the other backfill full check AND in case a corruption
2065 // of num_bytes requires ignoring that value and trying the
2067 if (primary_bytes
&&
2068 !cct
->_conf
->osd_debug_skip_full_check_in_backfill_reservation
) {
2069 primary_num_bytes
.store(primary_bytes
);
2070 local_num_bytes
.store(local_bytes
);
2072 unreserve_recovery_space();
2078 void PG::unreserve_recovery_space() {
2079 primary_num_bytes
.store(0);
2080 local_num_bytes
.store(0);
2084 void PG::clear_scrub_reserved()
2086 scrubber
.reserved_peers
.clear();
2087 scrubber
.reserve_failed
= false;
2089 if (scrubber
.local_reserved
) {
2090 scrubber
.local_reserved
= false;
2091 osd
->dec_scrubs_local();
2093 if (scrubber
.remote_reserved
) {
2094 scrubber
.remote_reserved
= false;
2095 osd
->dec_scrubs_remote();
2099 void PG::scrub_reserve_replicas()
2101 ceph_assert(recovery_state
.get_backfill_targets().empty());
2102 std::vector
<std::pair
<int, Message
*>> messages
;
2103 messages
.reserve(get_actingset().size());
2104 epoch_t e
= get_osdmap_epoch();
2105 for (set
<pg_shard_t
>::iterator i
= get_actingset().begin();
2106 i
!= get_actingset().end();
2108 if (*i
== pg_whoami
) continue;
2109 dout(10) << "scrub requesting reserve from osd." << *i
<< dendl
;
2110 Message
* m
= new MOSDScrubReserve(spg_t(info
.pgid
.pgid
, i
->shard
), e
,
2111 MOSDScrubReserve::REQUEST
, pg_whoami
);
2112 messages
.push_back(std::make_pair(i
->osd
, m
));
2114 if (!messages
.empty()) {
2115 osd
->send_message_osd_cluster(messages
, e
);
2119 void PG::scrub_unreserve_replicas()
2121 ceph_assert(recovery_state
.get_backfill_targets().empty());
2122 std::vector
<std::pair
<int, Message
*>> messages
;
2123 messages
.reserve(get_actingset().size());
2124 epoch_t e
= get_osdmap_epoch();
2125 for (set
<pg_shard_t
>::iterator i
= get_actingset().begin();
2126 i
!= get_actingset().end();
2128 if (*i
== pg_whoami
) continue;
2129 dout(10) << "scrub requesting unreserve from osd." << *i
<< dendl
;
2130 Message
* m
= new MOSDScrubReserve(spg_t(info
.pgid
.pgid
, i
->shard
), e
,
2131 MOSDScrubReserve::RELEASE
, pg_whoami
);
2132 messages
.push_back(std::make_pair(i
->osd
, m
));
2134 if (!messages
.empty()) {
2135 osd
->send_message_osd_cluster(messages
, e
);
2139 void PG::_scan_rollback_obs(const vector
<ghobject_t
> &rollback_obs
)
2141 ObjectStore::Transaction t
;
2142 eversion_t trimmed_to
= recovery_state
.get_last_rollback_info_trimmed_to_applied();
2143 for (vector
<ghobject_t
>::const_iterator i
= rollback_obs
.begin();
2144 i
!= rollback_obs
.end();
2146 if (i
->generation
< trimmed_to
.version
) {
2147 dout(10) << __func__
<< "osd." << osd
->whoami
2148 << " pg " << info
.pgid
2149 << " found obsolete rollback obj "
2150 << *i
<< " generation < trimmed_to "
2152 << "...repaired" << dendl
;
2157 derr
<< __func__
<< ": queueing trans to clean up obsolete rollback objs"
2159 osd
->store
->queue_transaction(ch
, std::move(t
), NULL
);
2163 void PG::_scan_snaps(ScrubMap
&smap
)
2168 // Test qa/standalone/scrub/osd-scrub-snaps.sh uses this message to verify
2169 // caller using clean_meta_map(), and it works properly.
2170 dout(20) << __func__
<< " start" << dendl
;
2172 for (map
<hobject_t
, ScrubMap::object
>::reverse_iterator i
= smap
.objects
.rbegin();
2173 i
!= smap
.objects
.rend();
2175 const hobject_t
&hoid
= i
->first
;
2176 ScrubMap::object
&o
= i
->second
;
2178 dout(20) << __func__
<< " " << hoid
<< dendl
;
2180 ceph_assert(!hoid
.is_snapdir());
2181 if (hoid
.is_head()) {
2182 // parse the SnapSet
2184 if (o
.attrs
.find(SS_ATTR
) == o
.attrs
.end()) {
2187 bl
.push_back(o
.attrs
[SS_ATTR
]);
2188 auto p
= bl
.cbegin();
2194 head
= hoid
.get_head();
2197 if (hoid
.snap
< CEPH_MAXSNAP
) {
2198 // check and if necessary fix snap_mapper
2199 if (hoid
.get_head() != head
) {
2200 derr
<< __func__
<< " no head for " << hoid
<< " (have " << head
<< ")"
2204 set
<snapid_t
> obj_snaps
;
2205 auto p
= snapset
.clone_snaps
.find(hoid
.snap
);
2206 if (p
== snapset
.clone_snaps
.end()) {
2207 derr
<< __func__
<< " no clone_snaps for " << hoid
<< " in " << snapset
2211 obj_snaps
.insert(p
->second
.begin(), p
->second
.end());
2212 set
<snapid_t
> cur_snaps
;
2213 int r
= snap_mapper
.get_snaps(hoid
, &cur_snaps
);
2214 if (r
!= 0 && r
!= -ENOENT
) {
2215 derr
<< __func__
<< ": get_snaps returned " << cpp_strerror(r
) << dendl
;
2218 if (r
== -ENOENT
|| cur_snaps
!= obj_snaps
) {
2219 ObjectStore::Transaction t
;
2220 OSDriver::OSTransaction
_t(osdriver
.get_transaction(&t
));
2222 r
= snap_mapper
.remove_oid(hoid
, &_t
);
2224 derr
<< __func__
<< ": remove_oid returned " << cpp_strerror(r
)
2228 osd
->clog
->error() << "osd." << osd
->whoami
2229 << " found snap mapper error on pg "
2231 << " oid " << hoid
<< " snaps in mapper: "
2232 << cur_snaps
<< ", oi: "
2236 osd
->clog
->error() << "osd." << osd
->whoami
2237 << " found snap mapper error on pg "
2239 << " oid " << hoid
<< " snaps missing in mapper"
2242 << " was " << cur_snaps
<< " r " << r
2245 snap_mapper
.add_oid(hoid
, obj_snaps
, &_t
);
2247 // wait for repair to apply to avoid confusing other bits of the system.
2249 ceph::condition_variable my_cond
;
2250 ceph::mutex my_lock
= ceph::make_mutex("PG::_scan_snaps my_lock");
2253 t
.register_on_applied_sync(
2254 new C_SafeCond(my_lock
, my_cond
, &done
, &r
));
2255 r
= osd
->store
->queue_transaction(ch
, std::move(t
));
2257 derr
<< __func__
<< ": queue_transaction got " << cpp_strerror(r
)
2260 std::unique_lock l
{my_lock
};
2261 my_cond
.wait(l
, [&done
] { return done
;});
2269 void PG::_repair_oinfo_oid(ScrubMap
&smap
)
2271 for (map
<hobject_t
, ScrubMap::object
>::reverse_iterator i
= smap
.objects
.rbegin();
2272 i
!= smap
.objects
.rend();
2274 const hobject_t
&hoid
= i
->first
;
2275 ScrubMap::object
&o
= i
->second
;
2278 if (o
.attrs
.find(OI_ATTR
) == o
.attrs
.end()) {
2281 bl
.push_back(o
.attrs
[OI_ATTR
]);
2288 if (oi
.soid
!= hoid
) {
2289 ObjectStore::Transaction t
;
2290 OSDriver::OSTransaction
_t(osdriver
.get_transaction(&t
));
2291 osd
->clog
->error() << "osd." << osd
->whoami
2292 << " found object info error on pg "
2294 << " oid " << hoid
<< " oid in object info: "
2300 encode(oi
, bl
, get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD
, nullptr));
2302 bufferptr
bp(bl
.c_str(), bl
.length());
2303 o
.attrs
[OI_ATTR
] = bp
;
2305 t
.setattr(coll
, ghobject_t(hoid
), OI_ATTR
, bl
);
2306 int r
= osd
->store
->queue_transaction(ch
, std::move(t
));
2308 derr
<< __func__
<< ": queue_transaction got " << cpp_strerror(r
)
2314 int PG::build_scrub_map_chunk(
2316 ScrubMapBuilder
&pos
,
2320 ThreadPool::TPHandle
&handle
)
2322 dout(10) << __func__
<< " [" << start
<< "," << end
<< ") "
2327 while (pos
.empty()) {
2329 map
.valid_through
= info
.last_update
;
2332 vector
<ghobject_t
> rollback_obs
;
2333 pos
.ret
= get_pgbackend()->objects_list_range(
2339 dout(5) << "objects_list_range error: " << pos
.ret
<< dendl
;
2342 if (pos
.ls
.empty()) {
2345 _scan_rollback_obs(rollback_obs
);
2347 return -EINPROGRESS
;
2351 while (!pos
.done()) {
2352 int r
= get_pgbackend()->be_scan_list(map
, pos
);
2353 if (r
== -EINPROGRESS
) {
2359 dout(20) << __func__
<< " finishing" << dendl
;
2360 ceph_assert(pos
.done());
2361 _repair_oinfo_oid(map
);
2362 if (!is_primary()) {
2363 ScrubMap for_meta_scrub
;
2364 // In case we restarted smaller chunk, clear old data
2365 scrubber
.cleaned_meta_map
.clear_from(scrubber
.start
);
2366 scrubber
.cleaned_meta_map
.insert(map
);
2367 scrubber
.clean_meta_map(for_meta_scrub
);
2368 _scan_snaps(for_meta_scrub
);
2371 dout(20) << __func__
<< " done, got " << map
.objects
.size() << " items"
2376 void PG::Scrubber::cleanup_store(ObjectStore::Transaction
*t
) {
2379 struct OnComplete
: Context
{
2380 std::unique_ptr
<Scrub::Store
> store
;
2381 explicit OnComplete(
2382 std::unique_ptr
<Scrub::Store
> &&store
)
2383 : store(std::move(store
)) {}
2384 void finish(int) override
{}
2387 t
->register_on_complete(new OnComplete(std::move(store
)));
2388 ceph_assert(!store
);
2391 void PG::repair_object(
2392 const hobject_t
&soid
,
2393 const list
<pair
<ScrubMap::object
, pg_shard_t
> > &ok_peers
,
2394 const set
<pg_shard_t
> &bad_peers
)
2396 set
<pg_shard_t
> ok_shards
;
2397 for (auto &&peer
: ok_peers
) ok_shards
.insert(peer
.second
);
2399 dout(10) << "repair_object " << soid
2400 << " bad_peers osd.{" << bad_peers
<< "},"
2401 << " ok_peers osd.{" << ok_shards
<< "}" << dendl
;
2403 const ScrubMap::object
&po
= ok_peers
.back().first
;
2408 if (po
.attrs
.count(OI_ATTR
)) {
2409 bv
.push_back(po
.attrs
.find(OI_ATTR
)->second
);
2411 auto bliter
= bv
.cbegin();
2414 dout(0) << __func__
<< ": Need version of replica, bad object_info_t: "
2419 if (bad_peers
.count(get_primary())) {
2420 // We should only be scrubbing if the PG is clean.
2421 ceph_assert(waiting_for_unreadable_object
.empty());
2422 dout(10) << __func__
<< ": primary = " << get_primary() << dendl
;
2425 /* No need to pass ok_peers, they must not be missing the object, so
2426 * force_object_missing will add them to missing_loc anyway */
2427 recovery_state
.force_object_missing(bad_peers
, soid
, oi
.version
);
2432 * Wait for last_update_applied to match msg->scrub_to as above. Wait
2433 * for pushes to complete in case of recent recovery. Build a single
2434 * scrubmap of objects that are in the range [msg->start, msg->end).
2436 void PG::replica_scrub(
2438 ThreadPool::TPHandle
&handle
)
2440 auto msg
= op
->get_req
<MOSDRepScrub
>();
2441 ceph_assert(!scrubber
.active_rep_scrub
);
2442 dout(7) << "replica_scrub" << dendl
;
2444 if (msg
->map_epoch
< info
.history
.same_interval_since
) {
2445 dout(10) << "replica_scrub discarding old replica_scrub from "
2446 << msg
->map_epoch
<< " < " << info
.history
.same_interval_since
2451 ceph_assert(msg
->chunky
);
2452 if (active_pushes
> 0) {
2453 dout(10) << "waiting for active pushes to finish" << dendl
;
2454 scrubber
.active_rep_scrub
= op
;
2458 scrubber
.state
= Scrubber::BUILD_MAP_REPLICA
;
2459 scrubber
.replica_scrub_start
= msg
->min_epoch
;
2460 scrubber
.start
= msg
->start
;
2461 scrubber
.end
= msg
->end
;
2462 scrubber
.max_end
= msg
->end
;
2463 scrubber
.deep
= msg
->deep
;
2464 scrubber
.epoch_start
= info
.history
.same_interval_since
;
2465 if (msg
->priority
) {
2466 scrubber
.priority
= msg
->priority
;
2468 scrubber
.priority
= get_scrub_priority();
2471 scrub_can_preempt
= msg
->allow_preemption
;
2472 scrub_preempted
= false;
2473 scrubber
.replica_scrubmap_pos
.reset();
2475 requeue_scrub(msg
->high_priority
);
2479 * PG_STATE_SCRUBBING is set when the scrub is queued
2481 * scrub will be chunky if all OSDs in PG support chunky scrub
2482 * scrub will fail if OSDs are too old.
// Entry point for a queued scrub work item (dequeued by the OSD op queue).
// Visible behavior: optionally performs an async throttle-sleep (via a
// LambdaContext re-queued by the OSD sleep_timer) before any chunk work,
// drops stale work if the PG reset since it was queued, handles the
// replica-side BUILD_MAP_REPLICA case, clears scrub state when the PG is no
// longer eligible (not primary/active/clean/scrubbing), then delegates to
// chunky_scrub(handle).
// NOTE(review): this text is a mangled extraction — statements are split
// across lines and some original lines (braces/else branches) are absent.
// Code below is kept byte-identical; confirm against the upstream file.
2484 void PG::scrub(epoch_t queued
, ThreadPool::TPHandle
&handle
)
2486 OSDService
*osds
= osd
;
2487 double scrub_sleep
= osds
->osd
->scrub_sleep_time(scrubber
.must_scrub
);
2488 if (scrub_sleep
> 0 &&
2489 (scrubber
.state
== PG::Scrubber::NEW_CHUNK
||
2490 scrubber
.state
== PG::Scrubber::INACTIVE
) &&
2491 scrubber
.needs_sleep
) {
2492 ceph_assert(!scrubber
.sleeping
);
2493 dout(20) << __func__
<< " state is INACTIVE|NEW_CHUNK, sleeping" << dendl
;
2495 // Do an async sleep so we don't block the op queue
2496 spg_t pgid
= get_pgid();
2497 int state
= scrubber
.state
;
2498 auto scrub_requeue_callback
=
2499 new LambdaContext([osds
, pgid
, state
](int r
) {
2500 PGRef pg
= osds
->osd
->lookup_lock_pg(pgid
);
2501 if (pg
== nullptr) {
2502 lgeneric_dout(osds
->osd
->cct
, 20)
2503 << "scrub_requeue_callback: Could not find "
2504 << "PG " << pgid
<< " can't complete scrub requeue after sleep"
2508 pg
->scrubber
.sleeping
= false;
2509 pg
->scrubber
.needs_sleep
= false;
2510 lgeneric_dout(pg
->cct
, 20)
2511 << "scrub_requeue_callback: slept for "
2512 << ceph_clock_now() - pg
->scrubber
.sleep_start
2513 << ", re-queuing scrub with state " << state
<< dendl
;
2514 pg
->scrub_queued
= false;
2515 pg
->requeue_scrub();
2516 pg
->scrubber
.sleep_start
= utime_t();
2519 std::lock_guard
l(osd
->sleep_lock
);
2520 osd
->sleep_timer
.add_event_after(scrub_sleep
,
2521 scrub_requeue_callback
);
2522 scrubber
.sleeping
= true;
2523 scrubber
.sleep_start
= ceph_clock_now();
2526 if (pg_has_reset_since(queued
)) {
2529 ceph_assert(scrub_queued
);
2530 scrub_queued
= false;
2531 scrubber
.needs_sleep
= true;
2534 if (!is_primary() &&
2535 scrubber
.state
== PG::Scrubber::BUILD_MAP_REPLICA
) {
2536 chunky_scrub(handle
);
2540 if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) {
2541 dout(10) << "scrub -- not primary or active or not clean" << dendl
;
2542 state_clear(PG_STATE_SCRUBBING
);
2543 state_clear(PG_STATE_REPAIR
);
2544 state_clear(PG_STATE_DEEP_SCRUB
);
2545 publish_stats_to_osd();
2549 if (!scrubber
.active
) {
2550 ceph_assert(recovery_state
.get_backfill_targets().empty());
2552 scrubber
.deep
= state_test(PG_STATE_DEEP_SCRUB
);
2554 dout(10) << "starting a new chunky scrub" << dendl
;
2557 chunky_scrub(handle
);
// Abort an in-progress scrub: drop local scrub state and release the
// scrub reservations held on the replicas.
2560 void PG::abort_scrub()
2562 scrub_clear_state();
2563 scrub_unreserve_replicas();
2567 * Chunky scrub scrubs objects one chunk at a time with writes blocked for that
2570 * The object store is partitioned into chunks which end on hash boundaries. For
2571 * each chunk, the following logic is performed:
2573 * (1) Block writes on the chunk
2574 * (2) Request maps from replicas
2575 * (3) Wait for pushes to be applied (after recovery)
2576 * (4) Wait for writes to flush on the chunk
2577 * (5) Wait for maps from replicas
2578 * (6) Compare / repair all scrub maps
2579 * (7) Wait for digest updates to apply
2581 * This logic is encoded in the mostly linear state machine:
2583 * +------------------+
2584 * _________v__________ |
2587 * |____________________| |
2590 * _________v___v______ | |
2593 * |____________________| | |
2595 * _________v__________ | |
2597 * | WAIT_PUSHES | | |
2598 * |____________________| | |
2600 * _________v__________ | |
2602 * | WAIT_LAST_UPDATE | | |
2603 * |____________________| | |
2605 * _________v__________ | |
2608 * |____________________| | |
2610 * _________v__________ | |
2612 * | WAIT_REPLICAS | | |
2613 * |____________________| | |
2615 * _________v__________ | |
2617 * | COMPARE_MAPS | | |
2618 * |____________________| | |
2621 * _________v__________ | |
2623 * |WAIT_DIGEST_UPDATES | | |
2624 * |____________________| | |
2627 * _________v__________ |
2630 * |____________________| |
2632 * +------------------+
2634 * The primary determines the last update from the subset by walking the log. If
2635 * it sees a log entry pertaining to a file in the chunk, it tells the replicas
2636 * to wait until that update is applied before building a scrub map. Both the
2637 * primary and replicas will wait for any active pushes to be applied.
2639 * In contrast to classic_scrub, chunky_scrub is entirely handled by scrub_wq.
2641 * scrubber.state encodes the current state of the scrub (refer to state diagram
// Core chunky-scrub state machine (see the state diagram comment above).
// Visible states handled by the switch: INACTIVE (start, create scrub
// store), NEW_CHUNK (pick [start, end) chunk, preemption bookkeeping, find
// subset_last_update in projected_log then pg_log), WAIT_PUSHES,
// WAIT_LAST_UPDATE (then request replica maps), BUILD_MAP /
// BUILD_MAP_DONE (primary map via build_scrub_map_chunk), WAIT_REPLICAS,
// COMPARE_MAPS (scrub_compare_maps, requeue blocked writes),
// WAIT_DIGEST_UPDATES, FINISH, and the replica-side BUILD_MAP_REPLICA
// (builds a map and replies with MOSDRepScrubMap to the primary).
// NOTE(review): mangled extraction — loop bodies/braces and some call
// arguments (e.g. objects_list_partial's argument list at orig 2756-2760)
// are missing from this view. Code kept byte-identical.
2644 void PG::chunky_scrub(ThreadPool::TPHandle
&handle
)
2646 // Since repair is only by request and we need to scrub afterward
2647 // treat the same as req_scrub.
2648 if (!scrubber
.req_scrub
) {
2649 if (state_test(PG_STATE_DEEP_SCRUB
)) {
2650 if (get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB
) ||
2651 pool
.info
.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB
)) {
2652 dout(10) << "nodeep_scrub set, aborting" << dendl
;
2656 } else if (state_test(PG_STATE_SCRUBBING
)) {
2657 if (get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB
) || pool
.info
.has_flag(pg_pool_t::FLAG_NOSCRUB
)) {
2658 dout(10) << "noscrub set, aborting" << dendl
;
2664 // check for map changes
2665 if (scrubber
.is_chunky_scrub_active()) {
2666 if (scrubber
.epoch_start
!= info
.history
.same_interval_since
) {
2667 dout(10) << "scrub pg changed, aborting" << dendl
;
2677 dout(20) << "scrub state " << Scrubber::state_string(scrubber
.state
)
2678 << " [" << scrubber
.start
<< "," << scrubber
.end
<< ")"
2679 << " max_end " << scrubber
.max_end
<< dendl
;
2681 switch (scrubber
.state
) {
2682 case PG::Scrubber::INACTIVE
:
2683 dout(10) << "scrub start" << dendl
;
2684 ceph_assert(is_primary());
2686 publish_stats_to_osd();
2687 scrubber
.epoch_start
= info
.history
.same_interval_since
;
2688 scrubber
.active
= true;
2691 ObjectStore::Transaction t
;
2692 scrubber
.cleanup_store(&t
);
2693 scrubber
.store
.reset(Scrub::Store::create(osd
->store
, &t
,
2695 osd
->store
->queue_transaction(ch
, std::move(t
), nullptr);
2698 // Don't include temporary objects when scrubbing
2699 scrubber
.start
= info
.pgid
.pgid
.get_hobj_start();
2700 scrubber
.state
= PG::Scrubber::NEW_CHUNK
;
2703 bool repair
= state_test(PG_STATE_REPAIR
);
2704 bool deep_scrub
= state_test(PG_STATE_DEEP_SCRUB
);
2705 const char *mode
= (repair
? "repair": (deep_scrub
? "deep-scrub" : "scrub"));
2707 oss
<< info
.pgid
.pgid
<< " " << mode
<< " starts" << std::endl
;
2708 osd
->clog
->debug(oss
);
2711 scrubber
.preempt_left
= cct
->_conf
.get_val
<uint64_t>(
2712 "osd_scrub_max_preemptions");
2713 scrubber
.preempt_divisor
= 1;
2716 case PG::Scrubber::NEW_CHUNK
:
2717 scrubber
.primary_scrubmap
= ScrubMap();
2718 scrubber
.received_maps
.clear();
2720 // begin (possible) preemption window
2721 if (scrub_preempted
) {
2722 scrubber
.preempt_left
--;
2723 scrubber
.preempt_divisor
*= 2;
2724 dout(10) << __func__
<< " preempted, " << scrubber
.preempt_left
2725 << " left" << dendl
;
2726 scrub_preempted
= false;
2728 scrub_can_preempt
= scrubber
.preempt_left
> 0;
2731 /* get the start and end of our scrub chunk
2733 * Our scrub chunk has an important restriction we're going to need to
2734 * respect. We can't let head be start or end.
2735 * Using a half-open interval means that if end == head,
2736 * we'd scrub/lock head and the clone right next to head in different
2737 * chunks which would allow us to miss clones created between
2738 * scrubbing that chunk and scrubbing the chunk including head.
2739 * This isn't true for any of the other clones since clones can
2740 * only be created "just to the left of" head. There is one exception
2741 * to this: promotion of clones which always happens to the left of the
2742 * left-most clone, but promote_object checks the scrubber in that
2743 * case, so it should be ok. Also, it's ok to "miss" clones at the
2744 * left end of the range if we are a tier because they may legitimately
2745 * not exist (see _scrub).
2747 ceph_assert(scrubber
.preempt_divisor
> 0);
2748 int min
= std::max
<int64_t>(3, cct
->_conf
->osd_scrub_chunk_min
/
2749 scrubber
.preempt_divisor
);
2750 int max
= std::max
<int64_t>(min
, cct
->_conf
->osd_scrub_chunk_max
/
2751 scrubber
.preempt_divisor
);
2752 hobject_t start
= scrubber
.start
;
2753 hobject_t candidate_end
;
2754 vector
<hobject_t
> objects
;
2755 ret
= get_pgbackend()->objects_list_partial(
2761 ceph_assert(ret
>= 0);
2763 if (!objects
.empty()) {
2764 hobject_t back
= objects
.back();
2765 while (candidate_end
.is_head() &&
2766 candidate_end
== back
.get_head()) {
2767 candidate_end
= back
;
2769 if (objects
.empty()) {
2771 "Somehow we got more than 2 objects which"
2772 "have the same head but are not clones");
2774 back
= objects
.back();
2776 if (candidate_end
.is_head()) {
2777 ceph_assert(candidate_end
!= back
.get_head());
2778 candidate_end
= candidate_end
.get_object_boundary();
2781 ceph_assert(candidate_end
.is_max());
2784 if (!_range_available_for_scrub(scrubber
.start
, candidate_end
)) {
2785 // we'll be requeued by whatever made us unavailable for scrub
2786 dout(10) << __func__
<< ": scrub blocked somewhere in range "
2787 << "[" << scrubber
.start
<< ", " << candidate_end
<< ")"
2792 scrubber
.end
= candidate_end
;
2793 if (scrubber
.end
> scrubber
.max_end
)
2794 scrubber
.max_end
= scrubber
.end
;
2797 // walk the log to find the latest update that affects our chunk
2798 scrubber
.subset_last_update
= eversion_t();
2799 for (auto p
= projected_log
.log
.rbegin();
2800 p
!= projected_log
.log
.rend();
2802 if (p
->soid
>= scrubber
.start
&&
2803 p
->soid
< scrubber
.end
) {
2804 scrubber
.subset_last_update
= p
->version
;
2808 if (scrubber
.subset_last_update
== eversion_t()) {
2809 for (list
<pg_log_entry_t
>::const_reverse_iterator p
=
2810 recovery_state
.get_pg_log().get_log().log
.rbegin();
2811 p
!= recovery_state
.get_pg_log().get_log().log
.rend();
2813 if (p
->soid
>= scrubber
.start
&&
2814 p
->soid
< scrubber
.end
) {
2815 scrubber
.subset_last_update
= p
->version
;
2821 scrubber
.state
= PG::Scrubber::WAIT_PUSHES
;
2824 case PG::Scrubber::WAIT_PUSHES
:
2825 if (active_pushes
== 0) {
2826 scrubber
.state
= PG::Scrubber::WAIT_LAST_UPDATE
;
2828 dout(15) << "wait for pushes to apply" << dendl
;
2833 case PG::Scrubber::WAIT_LAST_UPDATE
:
2834 if (recovery_state
.get_last_update_applied() <
2835 scrubber
.subset_last_update
) {
2836 // will be requeued by op_applied
2837 dout(15) << "wait for EC read/modify/writes to queue" << dendl
;
2842 // ask replicas to scan
2843 scrubber
.waiting_on_whom
.insert(pg_whoami
);
2845 // request maps from replicas
2846 for (set
<pg_shard_t
>::iterator i
= get_acting_recovery_backfill().begin();
2847 i
!= get_acting_recovery_backfill().end();
2849 if (*i
== pg_whoami
) continue;
2850 _request_scrub_map(*i
, scrubber
.subset_last_update
,
2851 scrubber
.start
, scrubber
.end
, scrubber
.deep
,
2852 scrubber
.preempt_left
> 0);
2853 scrubber
.waiting_on_whom
.insert(*i
);
2855 dout(10) << __func__
<< " waiting_on_whom " << scrubber
.waiting_on_whom
2858 scrubber
.state
= PG::Scrubber::BUILD_MAP
;
2859 scrubber
.primary_scrubmap_pos
.reset();
2862 case PG::Scrubber::BUILD_MAP
:
2863 ceph_assert(recovery_state
.get_last_update_applied() >=
2864 scrubber
.subset_last_update
);
2866 // build my own scrub map
2867 if (scrub_preempted
) {
2868 dout(10) << __func__
<< " preempted" << dendl
;
2869 scrubber
.state
= PG::Scrubber::BUILD_MAP_DONE
;
2872 ret
= build_scrub_map_chunk(
2873 scrubber
.primary_scrubmap
,
2874 scrubber
.primary_scrubmap_pos
,
2875 scrubber
.start
, scrubber
.end
,
2878 if (ret
== -EINPROGRESS
) {
2883 scrubber
.state
= PG::Scrubber::BUILD_MAP_DONE
;
2886 case PG::Scrubber::BUILD_MAP_DONE
:
2887 if (scrubber
.primary_scrubmap_pos
.ret
< 0) {
2888 dout(5) << "error: " << scrubber
.primary_scrubmap_pos
.ret
2889 << ", aborting" << dendl
;
2890 scrub_clear_state();
2891 scrub_unreserve_replicas();
2894 dout(10) << __func__
<< " waiting_on_whom was "
2895 << scrubber
.waiting_on_whom
<< dendl
;
2896 ceph_assert(scrubber
.waiting_on_whom
.count(pg_whoami
));
2897 scrubber
.waiting_on_whom
.erase(pg_whoami
);
2899 scrubber
.state
= PG::Scrubber::WAIT_REPLICAS
;
2902 case PG::Scrubber::WAIT_REPLICAS
:
2903 if (!scrubber
.waiting_on_whom
.empty()) {
2904 // will be requeued by do_replica_scrub_map
2905 dout(10) << "wait for replicas to build scrub map" << dendl
;
2909 // end (possible) preemption window
2910 scrub_can_preempt
= false;
2911 if (scrub_preempted
) {
2912 dout(10) << __func__
<< " preempted, restarting chunk" << dendl
;
2913 scrubber
.state
= PG::Scrubber::NEW_CHUNK
;
2915 scrubber
.state
= PG::Scrubber::COMPARE_MAPS
;
2919 case PG::Scrubber::COMPARE_MAPS
:
2920 ceph_assert(recovery_state
.get_last_update_applied() >=
2921 scrubber
.subset_last_update
);
2922 ceph_assert(scrubber
.waiting_on_whom
.empty());
2924 scrub_compare_maps();
2925 scrubber
.start
= scrubber
.end
;
2926 scrubber
.run_callbacks();
2928 // requeue the writes from the chunk that just finished
2929 requeue_ops(waiting_for_scrub
);
2931 scrubber
.state
= PG::Scrubber::WAIT_DIGEST_UPDATES
;
2935 case PG::Scrubber::WAIT_DIGEST_UPDATES
:
2936 if (scrubber
.num_digest_updates_pending
) {
2937 dout(10) << __func__
<< " waiting on "
2938 << scrubber
.num_digest_updates_pending
2939 << " digest updates" << dendl
;
2944 scrubber
.preempt_left
= cct
->_conf
.get_val
<uint64_t>(
2945 "osd_scrub_max_preemptions");
2946 scrubber
.preempt_divisor
= 1;
2948 if (!(scrubber
.end
.is_max())) {
2949 scrubber
.state
= PG::Scrubber::NEW_CHUNK
;
2953 scrubber
.state
= PG::Scrubber::FINISH
;
2958 case PG::Scrubber::FINISH
:
2960 scrubber
.state
= PG::Scrubber::INACTIVE
;
2963 if (!snap_trimq
.empty()) {
2964 dout(10) << "scrub finished, requeuing snap_trimmer" << dendl
;
2965 snap_trimmer_scrub_complete();
2970 case PG::Scrubber::BUILD_MAP_REPLICA
:
2971 // build my own scrub map
2972 if (scrub_preempted
) {
2973 dout(10) << __func__
<< " preempted" << dendl
;
2976 ret
= build_scrub_map_chunk(
2977 scrubber
.replica_scrubmap
,
2978 scrubber
.replica_scrubmap_pos
,
2979 scrubber
.start
, scrubber
.end
,
2983 if (ret
== -EINPROGRESS
) {
2990 MOSDRepScrubMap
*reply
= new MOSDRepScrubMap(
2991 spg_t(info
.pgid
.pgid
, get_primary().shard
),
2992 scrubber
.replica_scrub_start
,
2994 reply
->preempted
= scrub_preempted
;
2995 ::encode(scrubber
.replica_scrubmap
, reply
->get_data());
2996 osd
->send_message_osd_cluster(
2997 get_primary().osd
, reply
,
2998 scrubber
.replica_scrub_start
);
3000 scrub_preempted
= false;
3001 scrub_can_preempt
= false;
3002 scrubber
.state
= PG::Scrubber::INACTIVE
;
3003 scrubber
.replica_scrubmap
= ScrubMap();
3004 scrubber
.replica_scrubmap_pos
= ScrubMapBuilder();
3005 scrubber
.start
= hobject_t();
3006 scrubber
.end
= hobject_t();
3007 scrubber
.max_end
= hobject_t();
3015 dout(20) << "scrub final state " << Scrubber::state_string(scrubber
.state
)
3016 << " [" << scrubber
.start
<< "," << scrubber
.end
<< ")"
3017 << " max_end " << scrubber
.max_end
<< dendl
;
// Decide whether a write to `soid` must wait for the scrubber. Objects
// outside [scrubber.start, scrubber.end) are never blocked. Inside the
// chunk, if preemption is currently allowed the scrub is flagged preempted
// (scrub_preempted = true) instead of blocking the write.
// NOTE(review): mangled extraction; return statements for the branches are
// missing from this view. Code kept byte-identical.
3020 bool PG::write_blocked_by_scrub(const hobject_t
& soid
)
3022 if (soid
< scrubber
.start
|| soid
>= scrubber
.end
) {
3025 if (scrub_can_preempt
) {
3026 if (!scrub_preempted
) {
3027 dout(10) << __func__
<< " " << soid
<< " preempted" << dendl
;
3028 scrub_preempted
= true;
3030 dout(10) << __func__
<< " " << soid
<< " already preempted" << dendl
;
// True when the closed range [start, end] overlaps the scrubber's
// half-open working range [scrubber.start, scrubber.max_end).
3037 bool PG::range_intersects_scrub(const hobject_t
&start
, const hobject_t
& end
)
3039 // does [start, end] intersect [scrubber.start, scrubber.max_end)
3040 return (start
< scrubber
.max_end
&&
3041 end
>= scrubber
.start
);
// Tear down scrub state on this PG: clear the SCRUBBING/REPAIR/DEEP_SCRUB
// state bits, publish stats, drop the local scrub reservation (and the
// recorded peer reservations), requeue ops that were waiting on the scrub,
// and invoke the backend-specific _scrub_clear_state().
// NOTE(review): `has_error` is accepted but not used in the lines visible
// here — possibly used in lines lost to the extraction; confirm upstream.
3044 void PG::scrub_clear_state(bool has_error
)
3046 ceph_assert(is_locked());
3047 state_clear(PG_STATE_SCRUBBING
);
3049 state_clear(PG_STATE_REPAIR
);
3050 state_clear(PG_STATE_DEEP_SCRUB
);
3051 publish_stats_to_osd();
3053 scrubber
.req_scrub
= false;
3054 // local -> nothing.
3055 if (scrubber
.local_reserved
) {
3056 osd
->dec_scrubs_local();
3057 scrubber
.local_reserved
= false;
3058 scrubber
.reserved_peers
.clear();
3061 requeue_ops(waiting_for_scrub
);
3065 // type-specific state clear
3066 _scrub_clear_state();
// Compare the primary's scrub map against the replicas' maps for the
// current chunk: builds the per-shard map table, a master object set, runs
// backend omap checks and be_compare_scrubmaps, records authoritative
// copies for inconsistent objects into scrubber.authoritative, patches
// cleaned_meta_map with the authoritative versions, then runs the
// PG-type-specific snapshot-metadata scrub and flushes the scrub store
// (discarding results when repairing).
// NOTE(review): mangled extraction — several declarations (e.g. the
// ostringstream `ss`, parts of the missing_digest map type at orig
// 3076-3077, be_compare_scrubmaps arguments) are truncated in this view.
3069 void PG::scrub_compare_maps()
3071 dout(10) << __func__
<< " has maps, analyzing" << dendl
;
3073 // construct authoritative scrub map for type specific scrubbing
3074 scrubber
.cleaned_meta_map
.insert(scrubber
.primary_scrubmap
);
3076 pair
<std::optional
<uint32_t>,
3077 std::optional
<uint32_t>>> missing_digest
;
3079 map
<pg_shard_t
, ScrubMap
*> maps
;
3080 maps
[pg_whoami
] = &scrubber
.primary_scrubmap
;
3082 for (const auto& i
: get_acting_recovery_backfill()) {
3083 if (i
== pg_whoami
) continue;
3084 dout(2) << __func__
<< " replica " << i
<< " has "
3085 << scrubber
.received_maps
[i
].objects
.size()
3086 << " items" << dendl
;
3087 maps
[i
] = &scrubber
.received_maps
[i
];
3090 set
<hobject_t
> master_set
;
3092 // Construct master set
3093 for (const auto map
: maps
) {
3094 for (const auto i
: map
.second
->objects
) {
3095 master_set
.insert(i
.first
);
3100 get_pgbackend()->be_omap_checks(maps
, master_set
,
3101 scrubber
.omap_stats
, ss
);
3103 if (!ss
.str().empty()) {
3104 osd
->clog
->warn(ss
);
3107 if (recovery_state
.get_acting().size() > 1) {
3108 dout(10) << __func__
<< " comparing replica scrub maps" << dendl
;
3110 // Map from object with errors to good peer
3111 map
<hobject_t
, list
<pg_shard_t
>> authoritative
;
3113 dout(2) << __func__
<< get_primary() << " has "
3114 << scrubber
.primary_scrubmap
.objects
.size() << " items" << dendl
;
3119 get_pgbackend()->be_compare_scrubmaps(
3122 state_test(PG_STATE_REPAIR
),
3124 scrubber
.inconsistent
,
3127 scrubber
.shallow_errors
,
3128 scrubber
.deep_errors
,
3129 scrubber
.store
.get(),
3130 info
.pgid
, recovery_state
.get_acting(),
3132 dout(2) << ss
.str() << dendl
;
3134 if (!ss
.str().empty()) {
3135 osd
->clog
->error(ss
);
3138 for (map
<hobject_t
, list
<pg_shard_t
>>::iterator i
= authoritative
.begin();
3139 i
!= authoritative
.end();
3141 list
<pair
<ScrubMap::object
, pg_shard_t
> > good_peers
;
3142 for (list
<pg_shard_t
>::const_iterator j
= i
->second
.begin();
3143 j
!= i
->second
.end();
3145 good_peers
.emplace_back(maps
[*j
]->objects
[i
->first
], *j
);
3147 scrubber
.authoritative
.emplace(i
->first
, good_peers
);
3150 for (map
<hobject_t
, list
<pg_shard_t
>>::iterator i
= authoritative
.begin();
3151 i
!= authoritative
.end();
3153 scrubber
.cleaned_meta_map
.objects
.erase(i
->first
);
3154 scrubber
.cleaned_meta_map
.objects
.insert(
3155 *(maps
[i
->second
.back()]->objects
.find(i
->first
))
3160 ScrubMap for_meta_scrub
;
3161 scrubber
.clean_meta_map(for_meta_scrub
);
3163 // ok, do the pg-type specific scrubbing
3164 scrub_snapshot_metadata(for_meta_scrub
, missing_digest
);
3165 // Called here on the primary can use an authoritative map if it isn't the primary
3166 _scan_snaps(for_meta_scrub
);
3167 if (!scrubber
.store
->empty()) {
3168 if (state_test(PG_STATE_REPAIR
)) {
3169 dout(10) << __func__
<< ": discarding scrub results" << dendl
;
3170 scrubber
.store
->flush(nullptr);
3172 dout(10) << __func__
<< ": updating scrub object" << dendl
;
3173 ObjectStore::Transaction t
;
3174 scrubber
.store
->flush(&t
);
3175 osd
->store
->queue_transaction(ch
, std::move(t
), nullptr);
// Walk scrubber.authoritative: log a clog error summarizing missing /
// inconsistent object counts, clear PG_STATE_CLEAN, and (when repairing)
// repair each object from its authoritative peers, accumulating the count
// of fixed entries in scrubber.fixed. Returns true only when there were
// authoritative entries AND repair mode is active.
// NOTE(review): at orig line 3214 the inconsistent-object branch adds
// missing_entry->second.size() to scrubber.fixed; upstream adds the size
// of the inconsistent list instead, and missing_entry may be end() here —
// looks like a bug or extraction damage; confirm against upstream PG.cc.
3180 bool PG::scrub_process_inconsistent()
3182 dout(10) << __func__
<< ": checking authoritative" << dendl
;
3183 bool repair
= state_test(PG_STATE_REPAIR
);
3184 bool deep_scrub
= state_test(PG_STATE_DEEP_SCRUB
);
3185 const char *mode
= (repair
? "repair": (deep_scrub
? "deep-scrub" : "scrub"));
3187 // authoriative only store objects which missing or inconsistent.
3188 if (!scrubber
.authoritative
.empty()) {
3190 ss
<< info
.pgid
<< " " << mode
<< " "
3191 << scrubber
.missing
.size() << " missing, "
3192 << scrubber
.inconsistent
.size() << " inconsistent objects";
3193 dout(2) << ss
.str() << dendl
;
3194 osd
->clog
->error(ss
);
3196 state_clear(PG_STATE_CLEAN
);
3197 for (map
<hobject_t
, list
<pair
<ScrubMap::object
, pg_shard_t
> >>::iterator i
=
3198 scrubber
.authoritative
.begin();
3199 i
!= scrubber
.authoritative
.end();
3201 auto missing_entry
= scrubber
.missing
.find(i
->first
);
3202 if (missing_entry
!= scrubber
.missing
.end()) {
3206 missing_entry
->second
);
3207 scrubber
.fixed
+= missing_entry
->second
.size();
3209 if (scrubber
.inconsistent
.count(i
->first
)) {
3213 scrubber
.inconsistent
[i
->first
]);
3214 scrubber
.fixed
+= missing_entry
->second
.size();
3219 return (!scrubber
.authoritative
.empty() && repair
);
// True when at least one client op is parked on waiting_for_scrub.
3222 bool PG::ops_blocked_by_scrub() const {
3223 return (waiting_for_scrub
.size() != 0);
// Finalize a completed scrub: possibly cancel auto-repair when the error
// count exceeds osd_scrub_auto_repair_num_errors, optionally schedule an
// automatic deep scrub after shallow-scrub errors, process inconsistent
// objects, emit the clog summary, reconcile shallow/deep error counters
// (setting PG_STATE_FAILED_REPAIR when unfixable errors remain), update
// scrub timestamps and omap stats via recovery_state.update_stats, persist
// the stats transaction, requeue recovery via a peering event, clear scrub
// state, release replica reservations, and share pg info when primary.
// NOTE(review): mangled extraction — else branches and several closing
// braces are missing from this view; code kept byte-identical.
3226 // the part that actually finalizes a scrub
3227 void PG::scrub_finish()
3229 dout(20) << __func__
<< dendl
;
3230 bool repair
= state_test(PG_STATE_REPAIR
);
3231 bool do_auto_scrub
= false;
3232 // if the repair request comes from auto-repair and large number of errors,
3233 // we would like to cancel auto-repair
3234 if (repair
&& scrubber
.auto_repair
3235 && scrubber
.authoritative
.size() > cct
->_conf
->osd_scrub_auto_repair_num_errors
) {
3236 state_clear(PG_STATE_REPAIR
);
3239 bool deep_scrub
= state_test(PG_STATE_DEEP_SCRUB
);
3240 const char *mode
= (repair
? "repair": (deep_scrub
? "deep-scrub" : "scrub"));
3242 // if a regular scrub had errors within the limit, do a deep scrub to auto repair.
3243 if (scrubber
.deep_scrub_on_error
3244 && scrubber
.authoritative
.size()
3245 && scrubber
.authoritative
.size() <= cct
->_conf
->osd_scrub_auto_repair_num_errors
) {
3246 ceph_assert(!deep_scrub
);
3247 do_auto_scrub
= true;
3248 dout(20) << __func__
<< " Try to auto repair after scrub errors" << dendl
;
3250 scrubber
.deep_scrub_on_error
= false;
3252 // type-specific finish (can tally more errors)
3255 bool has_error
= scrub_process_inconsistent();
3259 oss
<< info
.pgid
.pgid
<< " " << mode
<< " ";
3260 int total_errors
= scrubber
.shallow_errors
+ scrubber
.deep_errors
;
3262 oss
<< total_errors
<< " errors";
3265 if (!deep_scrub
&& info
.stats
.stats
.sum
.num_deep_scrub_errors
)
3266 oss
<< " ( " << info
.stats
.stats
.sum
.num_deep_scrub_errors
3267 << " remaining deep scrub error details lost)";
3269 oss
<< ", " << scrubber
.fixed
<< " fixed";
3271 osd
->clog
->error(oss
);
3273 osd
->clog
->debug(oss
);
3276 // Since we don't know which errors were fixed, we can only clear them
3277 // when every one has been fixed.
3279 if (scrubber
.fixed
== scrubber
.shallow_errors
+ scrubber
.deep_errors
) {
3280 ceph_assert(deep_scrub
);
3281 scrubber
.shallow_errors
= scrubber
.deep_errors
= 0;
3282 dout(20) << __func__
<< " All may be fixed" << dendl
;
3283 } else if (has_error
) {
3284 // Deep scrub in order to get corrected error counts
3285 scrub_after_recovery
= true;
3286 save_req_scrub
= scrubber
.req_scrub
;
3287 dout(20) << __func__
<< " Set scrub_after_recovery, req_scrub=" << save_req_scrub
<< dendl
;
3288 } else if (scrubber
.shallow_errors
|| scrubber
.deep_errors
) {
3289 // We have errors but nothing can be fixed, so there is no repair
3291 state_set(PG_STATE_FAILED_REPAIR
);
3292 dout(10) << __func__
<< " " << (scrubber
.shallow_errors
+ scrubber
.deep_errors
)
3293 << " error(s) present with no repair possible" << dendl
;
3299 ObjectStore::Transaction t
;
3300 recovery_state
.update_stats(
3301 [this, deep_scrub
](auto &history
, auto &stats
) {
3302 utime_t now
= ceph_clock_now();
3303 history
.last_scrub
= recovery_state
.get_info().last_update
;
3304 history
.last_scrub_stamp
= now
;
3305 if (scrubber
.deep
) {
3306 history
.last_deep_scrub
= recovery_state
.get_info().last_update
;
3307 history
.last_deep_scrub_stamp
= now
;
3311 if ((scrubber
.shallow_errors
== 0) && (scrubber
.deep_errors
== 0))
3312 history
.last_clean_scrub_stamp
= now
;
3313 stats
.stats
.sum
.num_shallow_scrub_errors
= scrubber
.shallow_errors
;
3314 stats
.stats
.sum
.num_deep_scrub_errors
= scrubber
.deep_errors
;
3315 stats
.stats
.sum
.num_large_omap_objects
= scrubber
.omap_stats
.large_omap_objects
;
3316 stats
.stats
.sum
.num_omap_bytes
= scrubber
.omap_stats
.omap_bytes
;
3317 stats
.stats
.sum
.num_omap_keys
= scrubber
.omap_stats
.omap_keys
;
3318 dout(25) << "scrub_finish shard " << pg_whoami
<< " num_omap_bytes = "
3319 << stats
.stats
.sum
.num_omap_bytes
<< " num_omap_keys = "
3320 << stats
.stats
.sum
.num_omap_keys
<< dendl
;
3322 stats
.stats
.sum
.num_shallow_scrub_errors
= scrubber
.shallow_errors
;
3323 // XXX: last_clean_scrub_stamp doesn't mean the pg is not inconsistent
3324 // because of deep-scrub errors
3325 if (scrubber
.shallow_errors
== 0)
3326 history
.last_clean_scrub_stamp
= now
;
3328 stats
.stats
.sum
.num_scrub_errors
=
3329 stats
.stats
.sum
.num_shallow_scrub_errors
+
3330 stats
.stats
.sum
.num_deep_scrub_errors
;
3331 if (scrubber
.check_repair
) {
3332 scrubber
.check_repair
= false;
3333 if (info
.stats
.stats
.sum
.num_scrub_errors
) {
3334 state_set(PG_STATE_FAILED_REPAIR
);
3335 dout(10) << "scrub_finish " << info
.stats
.stats
.sum
.num_scrub_errors
3336 << " error(s) still present after re-scrub" << dendl
;
3342 int tr
= osd
->store
->queue_transaction(ch
, std::move(t
), NULL
);
3343 ceph_assert(tr
== 0);
3347 queue_peering_event(
3349 std::make_shared
<PGPeeringEvent
>(
3352 PeeringState::DoRecovery())));
3355 scrub_clear_state(has_error
);
3356 scrub_unreserve_replicas();
3358 if (do_auto_scrub
) {
3359 scrub_requested(false, false, true);
3362 if (is_active() && is_primary()) {
3363 recovery_state
.share_pg_info();
// A peering message is stale when the last peering reset is newer than
// either the epoch it replies to or the epoch it queries.
// NOTE(review): the return statements are missing from this extraction;
// code kept byte-identical.
3367 bool PG::old_peering_msg(epoch_t reply_epoch
, epoch_t query_epoch
)
3369 if (get_last_peering_reset() > reply_epoch
||
3370 get_last_peering_reset() > query_epoch
) {
3371 dout(10) << "old_peering_msg reply_epoch " << reply_epoch
<< " query_epoch " << query_epoch
3372 << " last_peering_reset " << get_last_peering_reset()
// Fragment of the FlushState RAII helper (struct header lost to the
// extraction): captures a PG and the epoch at construction; on destruction
// (the method body shown) it locks the PG and completes the recovery-state
// flush unless the PG has reset since that epoch. FlushStateRef is the
// shared handle registered on transactions below.
3382 FlushState(PG
*pg
, epoch_t epoch
) : pg(pg
), epoch(epoch
) {}
3384 std::scoped_lock l
{*pg
};
3385 if (!pg
->pg_has_reset_since(epoch
)) {
3386 pg
->recovery_state
.complete_flush();
3390 typedef std::shared_ptr
<FlushState
> FlushStateRef
;
// Arrange for in-progress ops to be flushed: attach a shared FlushState
// trigger to both the on-applied and on-commit callbacks of `t`, so the
// flush completes only after the last reference (both callbacks) is gone.
3392 void PG::start_flush_on_transaction(ObjectStore::Transaction
&t
)
3394 // flush in progress ops
3395 FlushStateRef
flush_trigger (std::make_shared
<FlushState
>(
3396 this, get_osdmap_epoch()));
3397 t
.register_on_applied(new ContainerContext
<FlushStateRef
>(flush_trigger
));
3398 t
.register_on_commit(new ContainerContext
<FlushStateRef
>(flush_trigger
));
// Try to flush the collection handle; if the commit is not yet complete,
// the QueuePeeringEvt context delivers an IntervalFlush peering event when
// it is. NOTE(review): the return paths are missing from this extraction.
3401 bool PG::try_flush_or_schedule_async()
3404 Context
*c
= new QueuePeeringEvt(
3405 this, get_osdmap_epoch(), PeeringState::IntervalFlush());
3406 if (!ch
->flush_commit(c
)) {
// Stream a one-line human-readable PG summary: recovery state, scrub flag
// tags (MUST_REPAIR, AUTO_REPAIR, ...), active recovery op count, missing/
// unfound counts, missing-by-count when not clean, snap trim queue (count
// only when the interval set is large), and purged snaps.
3414 ostream
& operator<<(ostream
& out
, const PG
& pg
)
3416 out
<< pg
.recovery_state
;
3417 if (pg
.scrubber
.must_repair
)
3418 out
<< " MUST_REPAIR";
3419 if (pg
.scrubber
.auto_repair
)
3420 out
<< " AUTO_REPAIR";
3421 if (pg
.scrubber
.check_repair
)
3422 out
<< " CHECK_REPAIR";
3423 if (pg
.scrubber
.deep_scrub_on_error
)
3424 out
<< " DEEP_SCRUB_ON_ERROR";
3425 if (pg
.scrubber
.must_deep_scrub
)
3426 out
<< " MUST_DEEP_SCRUB";
3427 if (pg
.scrubber
.must_scrub
)
3428 out
<< " MUST_SCRUB";
3429 if (pg
.scrubber
.time_for_deep
)
3430 out
<< " TIME_FOR_DEEP";
3431 if (pg
.scrubber
.need_auto
)
3432 out
<< " NEED_AUTO";
3433 if (pg
.scrubber
.req_scrub
)
3434 out
<< " REQ_SCRUB";
3436 if (pg
.recovery_ops_active
)
3437 out
<< " rops=" << pg
.recovery_ops_active
;
3439 //out << " (" << pg.pg_log.get_tail() << "," << pg.pg_log.get_head() << "]";
3440 if (pg
.recovery_state
.have_missing()) {
3441 out
<< " m=" << pg
.recovery_state
.get_num_missing();
3442 if (pg
.is_primary()) {
3443 uint64_t unfound
= pg
.recovery_state
.get_num_unfound();
3445 out
<< " u=" << unfound
;
3448 if (!pg
.is_clean()) {
3449 out
<< " mbc=" << pg
.recovery_state
.get_missing_by_count();
3451 if (!pg
.snap_trimq
.empty()) {
3453 // only show a count if the set is large
3454 if (pg
.snap_trimq
.num_intervals() > 16) {
3455 out
<< pg
.snap_trimq
.size();
3456 if (!pg
.snap_trimq_repeat
.empty()) {
3457 out
<< "(" << pg
.snap_trimq_repeat
.size() << ")";
3460 out
<< pg
.snap_trimq
;
3461 if (!pg
.snap_trimq_repeat
.empty()) {
3462 out
<< "(" << pg
.snap_trimq_repeat
<< ")";
3466 if (!pg
.recovery_state
.get_info().purged_snaps
.empty()) {
3467 out
<< " ps="; // snap trim queue / purged snaps
3468 if (pg
.recovery_state
.get_info().purged_snaps
.num_intervals() > 16) {
3469 out
<< pg
.recovery_state
.get_info().purged_snaps
.size();
3471 out
<< pg
.recovery_state
.get_info().purged_snaps
;
// Decide whether a client MOSDOp can be dropped: disconnected-client ops
// (when osd_discard_disconnected_ops), ops sent before same_primary_since,
// balanced/localized reads sent before same_interval_since, and ops sent
// before the pool's applicable last_force_op_resend epoch — with separate
// branches for nautilus+, luminous/mimic, and pre-luminous clients based
// on connection feature bits, plus a pre-split check for RESEND_ON_SPLIT
// clients.
3481 bool PG::can_discard_op(OpRequestRef
& op
)
3483 auto m
= op
->get_req
<MOSDOp
>();
3484 if (cct
->_conf
->osd_discard_disconnected_ops
&& OSD::op_is_discardable(m
)) {
3485 dout(20) << " discard " << *m
<< dendl
;
3489 if (m
->get_map_epoch() < info
.history
.same_primary_since
) {
3490 dout(7) << " changed after " << m
->get_map_epoch()
3491 << ", dropping " << *m
<< dendl
;
3495 if ((m
->get_flags() & (CEPH_OSD_FLAG_BALANCE_READS
|
3496 CEPH_OSD_FLAG_LOCALIZE_READS
)) &&
3498 m
->get_map_epoch() < info
.history
.same_interval_since
) {
3499 // Note: the Objecter will resend on interval change without the primary
3500 // changing if it actually sent to a replica. If the primary hasn't
3501 // changed since the send epoch, we got it, and we're primary, it won't
3502 // have resent even if the interval did change as it sent it to the primary
3508 if (m
->get_connection()->has_feature(CEPH_FEATURE_RESEND_ON_SPLIT
)) {
3509 // >= luminous client
3510 if (m
->get_connection()->has_feature(CEPH_FEATURE_SERVER_NAUTILUS
)) {
3511 // >= nautilus client
3512 if (m
->get_map_epoch() < pool
.info
.get_last_force_op_resend()) {
3513 dout(7) << __func__
<< " sent before last_force_op_resend "
3514 << pool
.info
.last_force_op_resend
3515 << ", dropping" << *m
<< dendl
;
3519 // == < nautilus client (luminous or mimic)
3520 if (m
->get_map_epoch() < pool
.info
.get_last_force_op_resend_prenautilus()) {
3521 dout(7) << __func__
<< " sent before last_force_op_resend_prenautilus "
3522 << pool
.info
.last_force_op_resend_prenautilus
3523 << ", dropping" << *m
<< dendl
;
3527 if (m
->get_map_epoch() < info
.history
.last_epoch_split
) {
3528 dout(7) << __func__
<< " pg split in "
3529 << info
.history
.last_epoch_split
<< ", dropping" << dendl
;
3532 } else if (m
->get_connection()->has_feature(CEPH_FEATURE_OSD_POOLRESEND
)) {
3533 // < luminous client
3534 if (m
->get_map_epoch() < pool
.info
.get_last_force_op_resend_preluminous()) {
3535 dout(7) << __func__
<< " sent before last_force_op_resend_preluminous "
3536 << pool
.info
.last_force_op_resend_preluminous
3537 << ", dropping" << *m
<< dendl
;
// Generic discard test for replica-to-primary messages of type T/MSGTYPE:
// drop the message if the sender is down in the next osdmap, if the sender
// went down at or after the message's map epoch (guards against
// out-of-order replies after a connection reset), or if the message is
// older than the last peering reset (old_peering_msg).
3545 template<typename T
, int MSGTYPE
>
3546 bool PG::can_discard_replica_op(OpRequestRef
& op
)
3548 auto m
= op
->get_req
<T
>();
3549 ceph_assert(m
->get_type() == MSGTYPE
);
3551 int from
= m
->get_source().num();
3553 // if a repop is replied after a replica goes down in a new osdmap, and
3554 // before the pg advances to this new osdmap, the repop replies before this
3555 // repop can be discarded by that replica OSD, because the primary resets the
3556 // connection to it when handling the new osdmap marking it down, and also
3557 // resets the messenger sesssion when the replica reconnects. to avoid the
3558 // out-of-order replies, the messages from that replica should be discarded.
3559 OSDMapRef next_map
= osd
->get_next_osdmap();
3560 if (next_map
->is_down(from
))
3562 /* Mostly, this overlaps with the old_peering_msg
3563 * condition. An important exception is pushes
3564 * sent by replicas not in the acting set, since
3565 * if such a replica goes down it does not cause
3566 * a new interval. */
3567 if (next_map
->get_down_at(from
) >= m
->map_epoch
)
3571 // if pg changes _at all_, we reset and repeer!
3572 if (old_peering_msg(m
->map_epoch
, m
->map_epoch
)) {
3573 dout(10) << "can_discard_replica_op pg changed " << info
.history
3574 << " after " << m
->map_epoch
3575 << ", dropping" << dendl
;
// Drop an MOSDPGScan that predates the last peering reset.
3581 bool PG::can_discard_scan(OpRequestRef op
)
3583 auto m
= op
->get_req
<MOSDPGScan
>();
3584 ceph_assert(m
->get_type() == MSG_OSD_PG_SCAN
);
3586 if (old_peering_msg(m
->map_epoch
, m
->query_epoch
)) {
3587 dout(10) << " got old scan, ignoring" << dendl
;
// Drop an MOSDPGBackfill that predates the last peering reset.
3593 bool PG::can_discard_backfill(OpRequestRef op
)
3595 auto m
= op
->get_req
<MOSDPGBackfill
>();
3596 ceph_assert(m
->get_type() == MSG_OSD_PG_BACKFILL
);
3598 if (old_peering_msg(m
->map_epoch
, m
->query_epoch
)) {
3599 dout(10) << " got old backfill, ignoring" << dendl
;
// Dispatch on the request's message type to the appropriate discard test:
// client ops -> can_discard_op, backoffs are never discarded, scan and
// backfill have dedicated helpers, and every replica-originated message
// type goes through the can_discard_replica_op<T, MSGTYPE> template.
3607 bool PG::can_discard_request(OpRequestRef
& op
)
3609 switch (op
->get_req()->get_type()) {
3610 case CEPH_MSG_OSD_OP
:
3611 return can_discard_op(op
);
3612 case CEPH_MSG_OSD_BACKOFF
:
3613 return false; // never discard
3615 return can_discard_replica_op
<MOSDRepOp
, MSG_OSD_REPOP
>(op
);
3616 case MSG_OSD_PG_PUSH
:
3617 return can_discard_replica_op
<MOSDPGPush
, MSG_OSD_PG_PUSH
>(op
);
3618 case MSG_OSD_PG_PULL
:
3619 return can_discard_replica_op
<MOSDPGPull
, MSG_OSD_PG_PULL
>(op
);
3620 case MSG_OSD_PG_PUSH_REPLY
:
3621 return can_discard_replica_op
<MOSDPGPushReply
, MSG_OSD_PG_PUSH_REPLY
>(op
);
3622 case MSG_OSD_REPOPREPLY
:
3623 return can_discard_replica_op
<MOSDRepOpReply
, MSG_OSD_REPOPREPLY
>(op
);
3624 case MSG_OSD_PG_RECOVERY_DELETE
:
3625 return can_discard_replica_op
<MOSDPGRecoveryDelete
, MSG_OSD_PG_RECOVERY_DELETE
>(op
);
3627 case MSG_OSD_PG_RECOVERY_DELETE_REPLY
:
3628 return can_discard_replica_op
<MOSDPGRecoveryDeleteReply
, MSG_OSD_PG_RECOVERY_DELETE_REPLY
>(op
);
3630 case MSG_OSD_EC_WRITE
:
3631 return can_discard_replica_op
<MOSDECSubOpWrite
, MSG_OSD_EC_WRITE
>(op
);
3632 case MSG_OSD_EC_WRITE_REPLY
:
3633 return can_discard_replica_op
<MOSDECSubOpWriteReply
, MSG_OSD_EC_WRITE_REPLY
>(op
);
3634 case MSG_OSD_EC_READ
:
3635 return can_discard_replica_op
<MOSDECSubOpRead
, MSG_OSD_EC_READ
>(op
);
3636 case MSG_OSD_EC_READ_REPLY
:
3637 return can_discard_replica_op
<MOSDECSubOpReadReply
, MSG_OSD_EC_READ_REPLY
>(op
);
3638 case MSG_OSD_REP_SCRUB
:
3639 return can_discard_replica_op
<MOSDRepScrub
, MSG_OSD_REP_SCRUB
>(op
);
3640 case MSG_OSD_SCRUB_RESERVE
:
3641 return can_discard_replica_op
<MOSDScrubReserve
, MSG_OSD_SCRUB_RESERVE
>(op
);
3642 case MSG_OSD_REP_SCRUBMAP
:
3643 return can_discard_replica_op
<MOSDRepScrubMap
, MSG_OSD_REP_SCRUBMAP
>(op
);
3644 case MSG_OSD_PG_UPDATE_LOG_MISSING
:
3645 return can_discard_replica_op
<
3646 MOSDPGUpdateLogMissing
, MSG_OSD_PG_UPDATE_LOG_MISSING
>(op
);
3647 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY
:
3648 return can_discard_replica_op
<
3649 MOSDPGUpdateLogMissingReply
, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY
>(op
);
3651 case MSG_OSD_PG_SCAN
:
3652 return can_discard_scan(op
);
3653 case MSG_OSD_PG_BACKFILL
:
3654 return can_discard_backfill(op
);
3655 case MSG_OSD_PG_BACKFILL_REMOVE
:
3656 return can_discard_replica_op
<MOSDPGBackfillRemove
,
3657 MSG_OSD_PG_BACKFILL_REMOVE
>(op
);
3662 void PG::do_peering_event(PGPeeringEventRef evt
, PeeringCtx
&rctx
)
3664 dout(10) << __func__
<< ": " << evt
->get_desc() << dendl
;
3665 ceph_assert(have_same_or_newer_map(evt
->get_epoch_sent()));
3666 if (old_peering_evt(evt
)) {
3667 dout(10) << "discard old " << evt
->get_desc() << dendl
;
3669 recovery_state
.handle_event(evt
, &rctx
);
3671 // write_if_dirty regardless of path above to ensure we capture any work
3672 // done by OSD::advance_pg().
3673 write_if_dirty(rctx
.transaction
);
3676 void PG::queue_peering_event(PGPeeringEventRef evt
)
3678 if (old_peering_evt(evt
))
3680 osd
->osd
->enqueue_peering_evt(info
.pgid
, evt
);
3683 void PG::queue_null(epoch_t msg_epoch
,
3684 epoch_t query_epoch
)
3686 dout(10) << "null" << dendl
;
3687 queue_peering_event(
3688 PGPeeringEventRef(std::make_shared
<PGPeeringEvent
>(msg_epoch
, query_epoch
,
3692 void PG::find_unfound(epoch_t queued
, PeeringCtx
&rctx
)
3695 * if we couldn't start any recovery ops and things are still
3696 * unfound, see if we can discover more missing object locations.
3697 * It may be that our initial locations were bad and we errored
3698 * out while trying to pull.
3700 if (!recovery_state
.discover_all_missing(rctx
)) {
3702 if (state_test(PG_STATE_BACKFILLING
)) {
3703 auto evt
= PGPeeringEventRef(
3707 PeeringState::UnfoundBackfill()));
3708 queue_peering_event(evt
);
3709 action
= "in backfill";
3710 } else if (state_test(PG_STATE_RECOVERING
)) {
3711 auto evt
= PGPeeringEventRef(
3715 PeeringState::UnfoundRecovery()));
3716 queue_peering_event(evt
);
3717 action
= "in recovery";
3719 action
= "already out of recovery/backfill";
3721 dout(10) << __func__
<< ": no luck, giving up on this pg for now (" << action
<< ")" << dendl
;
3723 dout(10) << __func__
<< ": no luck, giving up on this pg for now (queue_recovery)" << dendl
;
3728 void PG::handle_advance_map(
3729 OSDMapRef osdmap
, OSDMapRef lastmap
,
3730 vector
<int>& newup
, int up_primary
,
3731 vector
<int>& newacting
, int acting_primary
,
3734 dout(10) << __func__
<< ": " << osdmap
->get_epoch() << dendl
;
3735 osd_shard
->update_pg_epoch(pg_slot
, osdmap
->get_epoch());
3736 recovery_state
.advance_map(
3746 void PG::handle_activate_map(PeeringCtx
&rctx
)
3748 dout(10) << __func__
<< ": " << get_osdmap()->get_epoch()
3750 recovery_state
.activate_map(rctx
);
3752 requeue_map_waiters();
3755 void PG::handle_initialize(PeeringCtx
&rctx
)
3757 dout(10) << __func__
<< dendl
;
3758 PeeringState::Initialize evt
;
3759 recovery_state
.handle_event(evt
, &rctx
);
3762 void PG::handle_query_state(Formatter
*f
)
3764 dout(10) << "handle_query_state" << dendl
;
3765 PeeringState::QueryState
q(f
);
3766 recovery_state
.handle_event(q
, 0);
3768 if (is_primary() && is_active()) {
3769 f
->open_object_section("scrub");
3770 f
->dump_stream("scrubber.epoch_start") << scrubber
.epoch_start
;
3771 f
->dump_bool("scrubber.active", scrubber
.active
);
3772 f
->dump_string("scrubber.state", PG::Scrubber::state_string(scrubber
.state
));
3773 f
->dump_stream("scrubber.start") << scrubber
.start
;
3774 f
->dump_stream("scrubber.end") << scrubber
.end
;
3775 f
->dump_stream("scrubber.max_end") << scrubber
.max_end
;
3776 f
->dump_stream("scrubber.subset_last_update") << scrubber
.subset_last_update
;
3777 f
->dump_bool("scrubber.deep", scrubber
.deep
);
3779 f
->open_array_section("scrubber.waiting_on_whom");
3780 for (set
<pg_shard_t
>::iterator p
= scrubber
.waiting_on_whom
.begin();
3781 p
!= scrubber
.waiting_on_whom
.end();
3783 f
->dump_stream("shard") << *p
;
3791 void PG::init_collection_pool_opts()
3793 auto r
= osd
->store
->set_collection_opts(ch
, pool
.info
.opts
);
3794 if (r
< 0 && r
!= -EOPNOTSUPP
) {
3795 derr
<< __func__
<< " set_collection_opts returns error:" << r
<< dendl
;
3799 void PG::on_pool_change()
3801 init_collection_pool_opts();
3802 plpg_on_pool_change();
3805 void PG::C_DeleteMore::complete(int r
) {
3806 ceph_assert(r
== 0);
3808 if (!pg
->pg_has_reset_since(epoch
)) {
3809 pg
->osd
->queue_for_pg_delete(pg
->get_pgid(), epoch
);
3815 void PG::do_delete_work(ObjectStore::Transaction
&t
)
3817 dout(10) << __func__
<< dendl
;
3820 float osd_delete_sleep
= osd
->osd
->get_osd_delete_sleep();
3821 if (osd_delete_sleep
> 0 && delete_needs_sleep
) {
3822 epoch_t e
= get_osdmap()->get_epoch();
3824 auto delete_requeue_callback
= new LambdaContext([this, pgref
, e
](int r
) {
3825 dout(20) << __func__
<< " wake up at "
3827 << ", re-queuing delete" << dendl
;
3828 std::scoped_lock locker
{*this};
3829 delete_needs_sleep
= false;
3830 if (!pg_has_reset_since(e
)) {
3831 osd
->queue_for_pg_delete(get_pgid(), e
);
3835 auto delete_schedule_time
= ceph::real_clock::now();
3836 delete_schedule_time
+= ceph::make_timespan(osd_delete_sleep
);
3837 std::lock_guard l
{osd
->sleep_lock
};
3838 osd
->sleep_timer
.add_event_at(delete_schedule_time
,
3839 delete_requeue_callback
);
3840 dout(20) << __func__
<< " Delete scheduled at " << delete_schedule_time
<< dendl
;
3845 delete_needs_sleep
= true;
3847 vector
<ghobject_t
> olist
;
3848 int max
= std::min(osd
->store
->get_ideal_list_max(),
3849 (int)cct
->_conf
->osd_target_transaction_size
);
3851 osd
->store
->collection_list(
3854 ghobject_t::get_max(),
3858 dout(20) << __func__
<< " " << olist
<< dendl
;
3860 OSDriver::OSTransaction
_t(osdriver
.get_transaction(&t
));
3862 for (auto& oid
: olist
) {
3863 if (oid
== pgmeta_oid
) {
3866 if (oid
.is_pgmeta()) {
3867 osd
->clog
->warn() << info
.pgid
<< " found stray pgmeta-like " << oid
3868 << " during PG removal";
3870 int r
= snap_mapper
.remove_oid(oid
.hobj
, &_t
);
3871 if (r
!= 0 && r
!= -ENOENT
) {
3874 t
.remove(coll
, oid
);
3878 dout(20) << __func__
<< " deleting " << num
<< " objects" << dendl
;
3879 Context
*fin
= new C_DeleteMore(this, get_osdmap_epoch());
3880 t
.register_on_commit(fin
);
3882 dout(20) << __func__
<< " finished" << dendl
;
3883 if (cct
->_conf
->osd_inject_failure_on_pg_removal
) {
3887 // final flush here to ensure completions drop refs. Of particular concern
3888 // are the SnapMapper ContainerContexts.
3891 PGLog::clear_info_log(info
.pgid
, &t
);
3892 t
.remove_collection(coll
);
3893 t
.register_on_commit(new ContainerContext
<PGRef
>(pgref
));
3894 t
.register_on_applied(new ContainerContext
<PGRef
>(pgref
));
3895 osd
->store
->queue_transaction(ch
, std::move(t
));
3899 if (!osd
->try_finish_pg_delete(this, pool
.info
.get_pg_num())) {
3900 dout(1) << __func__
<< " raced with merge, reinstantiating" << dendl
;
3901 ch
= osd
->store
->create_new_collection(coll
);
3902 create_pg_collection(t
,
3904 info
.pgid
.get_split_bits(pool
.info
.get_pg_num()));
3905 init_pg_ondisk(t
, info
.pgid
, &pool
.info
);
3906 recovery_state
.reset_last_persisted();
3908 recovery_state
.set_delete_complete();
3910 // cancel reserver here, since the PG is about to get deleted and the
3911 // exit() methods don't run when that happens.
3912 osd
->local_reserver
.cancel_reservation(info
.pgid
);
3914 osd
->logger
->dec(l_osd_pg_removing
);
3919 int PG::pg_stat_adjust(osd_stat_t
*ns
)
3921 osd_stat_t
&new_stat
= *ns
;
3925 // Adjust the kb_used by adding pending backfill data
3926 uint64_t reserved_num_bytes
= get_reserved_num_bytes();
3928 // For now we don't consider projected space gains here
3929 // I suggest we have an optional 2 pass backfill that frees up
3930 // space in a first pass. This could be triggered when at nearfull
3931 // or near to backfillfull.
3932 if (reserved_num_bytes
> 0) {
3933 // TODO: Handle compression by adjusting by the PGs average
3934 // compression precentage.
3935 dout(20) << __func__
<< " reserved_num_bytes " << (reserved_num_bytes
>> 10) << "KiB"
3936 << " Before kb_used " << new_stat
.statfs
.kb_used() << "KiB" << dendl
;
3937 if (new_stat
.statfs
.available
> reserved_num_bytes
)
3938 new_stat
.statfs
.available
-= reserved_num_bytes
;
3940 new_stat
.statfs
.available
= 0;
3941 dout(20) << __func__
<< " After kb_used " << new_stat
.statfs
.kb_used() << "KiB" << dendl
;
3947 ostream
& operator<<(ostream
& out
, const PG::BackfillInterval
& bi
)
3949 out
<< "BackfillInfo(" << bi
.begin
<< "-" << bi
.end
3950 << " " << bi
.objects
.size() << " objects";
3951 if (!bi
.objects
.empty())
3952 out
<< " " << bi
.objects
;
3957 void PG::dump_pgstate_history(Formatter
*f
)
3959 std::scoped_lock l
{*this};
3960 recovery_state
.dump_history(f
);
3963 void PG::dump_missing(Formatter
*f
)
3965 for (auto& i
: recovery_state
.get_pg_log().get_missing().get_items()) {
3966 f
->open_object_section("object");
3967 f
->dump_object("oid", i
.first
);
3968 f
->dump_object("missing_info", i
.second
);
3969 if (recovery_state
.get_missing_loc().needs_recovery(i
.first
)) {
3972 recovery_state
.get_missing_loc().is_unfound(i
.first
));
3973 f
->open_array_section("locations");
3974 for (auto l
: recovery_state
.get_missing_loc().get_locations(i
.first
)) {
3975 f
->dump_object("shard", l
);
3983 void PG::get_pg_stats(std::function
<void(const pg_stat_t
&, epoch_t lec
)> f
)
3985 std::lock_guard l
{pg_stats_publish_lock
};
3986 if (pg_stats_publish_valid
) {
3987 f(pg_stats_publish
, pg_stats_publish
.get_effective_last_epoch_clean());
3991 void PG::with_heartbeat_peers(std::function
<void(int)> f
)
3993 std::lock_guard l
{heartbeat_peer_lock
};
3994 for (auto p
: heartbeat_peers
) {
3997 for (auto p
: probe_targets
) {
4002 uint64_t PG::get_min_alloc_size() const {
4003 return osd
->store
->get_min_alloc_size();