1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "PGPeeringEvent.h"
5 #include "common/ceph_releases.h"
6 #include "common/dout.h"
7 #include "PeeringState.h"
9 #include "messages/MOSDPGRemove.h"
10 #include "messages/MBackfillReserve.h"
11 #include "messages/MRecoveryReserve.h"
12 #include "messages/MOSDScrubReserve.h"
13 #include "messages/MOSDPGInfo.h"
14 #include "messages/MOSDPGInfo2.h"
15 #include "messages/MOSDPGTrim.h"
16 #include "messages/MOSDPGLog.h"
17 #include "messages/MOSDPGNotify.h"
18 #include "messages/MOSDPGNotify2.h"
19 #include "messages/MOSDPGQuery.h"
20 #include "messages/MOSDPGQuery2.h"
21 #include "messages/MOSDPGLease.h"
22 #include "messages/MOSDPGLeaseAck.h"
24 #define dout_context cct
25 #define dout_subsys ceph_subsys_osd
34 using std::stringstream
;
37 using ceph::Formatter
;
38 using ceph::make_message
;
40 BufferedRecoveryMessages::BufferedRecoveryMessages(
43 : require_osd_release(r
) {
44 // steal messages from ctx
45 message_map
.swap(ctx
.message_map
);
48 void BufferedRecoveryMessages::send_notify(int to
, const pg_notify_t
&n
)
50 if (require_osd_release
>= ceph_release_t::octopus
) {
51 spg_t
pgid(n
.info
.pgid
.pgid
, n
.to
);
52 send_osd_message(to
, make_message
<MOSDPGNotify2
>(pgid
, n
));
54 send_osd_message(to
, make_message
<MOSDPGNotify
>(n
.epoch_sent
, vector
{n
}));
58 void BufferedRecoveryMessages::send_query(
63 if (require_osd_release
>= ceph_release_t::octopus
) {
65 make_message
<MOSDPGQuery2
>(to_spgid
, q
));
67 auto m
= make_message
<MOSDPGQuery
>(
69 MOSDPGQuery::pg_list_t
{{to_spgid
, q
}});
70 send_osd_message(to
, m
);
74 void BufferedRecoveryMessages::send_info(
79 const pg_info_t
&info
,
80 std::optional
<pg_lease_t
> lease
,
81 std::optional
<pg_lease_ack_t
> lease_ack
)
83 if (require_osd_release
>= ceph_release_t::octopus
) {
86 make_message
<MOSDPGInfo2
>(
97 make_message
<MOSDPGInfo
>(
99 vector
{pg_notify_t
{to_spgid
.shard
,
101 min_epoch
, cur_epoch
,
102 info
, PastIntervals
{}}})
107 void PGPool::update(OSDMapRef map
)
109 const pg_pool_t
*pi
= map
->get_pg_pool(id
);
111 return; // pool has been deleted
114 name
= map
->get_pool_name(id
);
116 bool updated
= false;
117 if ((map
->get_epoch() != cached_epoch
+ 1) ||
118 (pi
->get_snap_epoch() == map
->get_epoch())) {
122 if (info
.is_pool_snaps_mode() && updated
) {
123 snapc
= pi
->get_snap_context();
125 cached_epoch
= map
->get_epoch();
128 /*-------------Peering State Helpers----------------*/
130 #define dout_prefix (dpp->gen_prefix(*_dout))
132 #define psdout(x) ldout(cct, x)
134 PeeringState::PeeringState(
136 pg_shard_t pg_whoami
,
140 DoutPrefixProvider
*dpp
,
142 : state_history(*pl
),
150 pg_whoami(pg_whoami
),
153 missing_loc(spgid
, this, dpp
, cct
),
154 machine(this, cct
, spgid
, dpp
, pl
, &state_history
)
159 void PeeringState::start_handle(PeeringCtx
*new_ctx
) {
161 ceph_assert(!orig_ctx
);
164 if (messages_pending_flush
) {
165 rctx
.emplace(*messages_pending_flush
, *new_ctx
);
167 rctx
.emplace(*new_ctx
);
169 rctx
->start_time
= ceph_clock_now();
173 void PeeringState::begin_block_outgoing() {
174 ceph_assert(!messages_pending_flush
);
175 ceph_assert(orig_ctx
);
177 messages_pending_flush
= BufferedRecoveryMessages(
178 orig_ctx
->require_osd_release
);
179 rctx
.emplace(*messages_pending_flush
, *orig_ctx
);
182 void PeeringState::clear_blocked_outgoing() {
183 ceph_assert(orig_ctx
);
185 messages_pending_flush
= std::optional
<BufferedRecoveryMessages
>();
188 void PeeringState::end_block_outgoing() {
189 ceph_assert(messages_pending_flush
);
190 ceph_assert(orig_ctx
);
193 orig_ctx
->accept_buffered_messages(*messages_pending_flush
);
194 rctx
.emplace(*orig_ctx
);
195 messages_pending_flush
= std::optional
<BufferedRecoveryMessages
>();
198 void PeeringState::end_handle() {
200 utime_t dur
= ceph_clock_now() - rctx
->start_time
;
201 machine
.event_time
+= dur
;
204 machine
.event_count
++;
209 void PeeringState::check_recovery_sources(const OSDMapRef
& osdmap
)
212 * check that any peers we are planning to (or currently) pulling
213 * objects from are dealt with.
215 missing_loc
.check_recovery_sources(osdmap
);
216 pl
->check_recovery_sources(osdmap
);
218 for (auto i
= peer_log_requested
.begin(); i
!= peer_log_requested
.end();) {
219 if (!osdmap
->is_up(i
->osd
)) {
220 psdout(10) << "peer_log_requested removing " << *i
<< dendl
;
221 peer_log_requested
.erase(i
++);
227 for (auto i
= peer_missing_requested
.begin();
228 i
!= peer_missing_requested
.end();) {
229 if (!osdmap
->is_up(i
->osd
)) {
230 psdout(10) << "peer_missing_requested removing " << *i
<< dendl
;
231 peer_missing_requested
.erase(i
++);
238 void PeeringState::update_history(const pg_history_t
& new_history
)
240 auto mnow
= pl
->get_mnow();
241 info
.history
.refresh_prior_readable_until_ub(mnow
, prior_readable_until_ub
);
242 if (info
.history
.merge(new_history
)) {
243 psdout(20) << __func__
<< " advanced history from " << new_history
<< dendl
;
245 if (info
.history
.last_epoch_clean
>= info
.history
.same_interval_since
) {
246 psdout(20) << __func__
<< " clearing past_intervals" << dendl
;
247 past_intervals
.clear();
248 dirty_big_info
= true;
250 prior_readable_until_ub
= info
.history
.get_prior_readable_until_ub(mnow
);
251 if (prior_readable_until_ub
!= ceph::signedspan::zero()) {
253 << " prior_readable_until_ub " << prior_readable_until_ub
254 << " (mnow " << mnow
<< " + "
255 << info
.history
.prior_readable_until_ub
<< ")" << dendl
;
258 pl
->on_info_history_change();
261 hobject_t
PeeringState::earliest_backfill() const
263 hobject_t e
= hobject_t::get_max();
264 for (const pg_shard_t
& bt
: get_backfill_targets()) {
265 const pg_info_t
&pi
= get_peer_info(bt
);
266 e
= std::min(pi
.last_backfill
, e
);
271 void PeeringState::purge_strays()
274 psdout(10) << "purge_strays " << stray_set
<< " but premerge, doing nothing"
278 if (cct
->_conf
.get_val
<bool>("osd_debug_no_purge_strays")) {
281 psdout(10) << "purge_strays " << stray_set
<< dendl
;
283 bool removed
= false;
284 for (auto p
= stray_set
.begin(); p
!= stray_set
.end(); ++p
) {
285 ceph_assert(!is_acting_recovery_backfill(*p
));
286 if (get_osdmap()->is_up(p
->osd
)) {
287 psdout(10) << "sending PGRemove to osd." << *p
<< dendl
;
288 vector
<spg_t
> to_remove
;
289 to_remove
.push_back(spg_t(info
.pgid
.pgid
, p
->shard
));
290 auto m
= make_message
<MOSDPGRemove
>(
293 pl
->send_cluster_message(p
->osd
, m
, get_osdmap_epoch());
295 psdout(10) << "not sending PGRemove to down osd." << *p
<< dendl
;
297 peer_missing
.erase(*p
);
299 missing_loc
.remove_stray_recovery_sources(*p
);
300 peer_purged
.insert(*p
);
304 // if we removed anyone, update peers (which include peer_info)
306 update_heartbeat_peers();
310 // clear _requested maps; we may have to peer() again if we discover
311 // (more) stray content
312 peer_log_requested
.clear();
313 peer_missing_requested
.clear();
316 void PeeringState::query_unfound(Formatter
*f
, string state
)
318 psdout(20) << "Enter PeeringState common QueryUnfound" << dendl
;
320 f
->dump_string("state", state
);
321 f
->dump_bool("available_might_have_unfound", true);
322 f
->open_array_section("might_have_unfound");
323 for (auto p
= might_have_unfound
.begin();
324 p
!= might_have_unfound
.end();
326 if (peer_missing
.count(*p
)) {
327 ; // Ignore already probed OSDs
329 f
->open_object_section("osd");
330 f
->dump_stream("osd") << *p
;
331 if (peer_missing_requested
.count(*p
)) {
332 f
->dump_string("status", "querying");
333 } else if (!get_osdmap()->is_up(p
->osd
)) {
334 f
->dump_string("status", "osd is down");
336 f
->dump_string("status", "not queried");
343 psdout(20) << "Exit PeeringState common QueryUnfound" << dendl
;
347 bool PeeringState::proc_replica_info(
348 pg_shard_t from
, const pg_info_t
&oinfo
, epoch_t send_epoch
)
350 auto p
= peer_info
.find(from
);
351 if (p
!= peer_info
.end() && p
->second
.last_update
== oinfo
.last_update
) {
352 psdout(10) << " got dup osd." << from
<< " info "
353 << oinfo
<< ", identical to ours" << dendl
;
357 if (!get_osdmap()->has_been_up_since(from
.osd
, send_epoch
)) {
358 psdout(10) << " got info " << oinfo
<< " from down osd." << from
359 << " discarding" << dendl
;
363 psdout(10) << " got osd." << from
<< " " << oinfo
<< dendl
;
364 ceph_assert(is_primary());
365 peer_info
[from
] = oinfo
;
366 might_have_unfound
.insert(from
);
368 update_history(oinfo
.history
);
371 if (!is_up(from
) && !is_acting(from
)) {
372 psdout(10) << " osd." << from
<< " has stray content: " << oinfo
<< dendl
;
373 stray_set
.insert(from
);
379 // was this a new info? if so, update peers!
380 if (p
== peer_info
.end())
381 update_heartbeat_peers();
387 void PeeringState::remove_down_peer_info(const OSDMapRef
&osdmap
)
389 // Remove any downed osds from peer_info
390 bool removed
= false;
391 auto p
= peer_info
.begin();
392 while (p
!= peer_info
.end()) {
393 if (!osdmap
->is_up(p
->first
.osd
)) {
394 psdout(10) << " dropping down osd." << p
->first
<< " info " << p
->second
<< dendl
;
395 peer_missing
.erase(p
->first
);
396 peer_log_requested
.erase(p
->first
);
397 peer_missing_requested
.erase(p
->first
);
398 peer_info
.erase(p
++);
404 // Remove any downed osds from peer_purged so we can re-purge if necessary
405 auto it
= peer_purged
.begin();
406 while (it
!= peer_purged
.end()) {
407 if (!osdmap
->is_up(it
->osd
)) {
408 psdout(10) << " dropping down osd." << *it
<< " from peer_purged" << dendl
;
409 peer_purged
.erase(it
++);
415 // if we removed anyone, update peers (which include peer_info)
417 update_heartbeat_peers();
419 check_recovery_sources(osdmap
);
422 void PeeringState::update_heartbeat_peers()
428 for (unsigned i
=0; i
<acting
.size(); i
++) {
429 if (acting
[i
] != CRUSH_ITEM_NONE
)
430 new_peers
.insert(acting
[i
]);
432 for (unsigned i
=0; i
<up
.size(); i
++) {
433 if (up
[i
] != CRUSH_ITEM_NONE
)
434 new_peers
.insert(up
[i
]);
436 for (auto p
= peer_info
.begin(); p
!= peer_info
.end(); ++p
) {
437 new_peers
.insert(p
->first
.osd
);
439 pl
->update_heartbeat_peers(std::move(new_peers
));
442 void PeeringState::write_if_dirty(ObjectStore::Transaction
& t
)
451 last_persisted_osdmap
< get_osdmap_epoch(),
453 if (dirty_info
|| dirty_big_info
) {
454 last_persisted_osdmap
= get_osdmap_epoch();
455 last_written_info
= info
;
457 dirty_big_info
= false;
461 void PeeringState::advance_map(
462 OSDMapRef osdmap
, OSDMapRef lastmap
,
463 vector
<int>& newup
, int up_primary
,
464 vector
<int>& newacting
, int acting_primary
,
467 ceph_assert(lastmap
== osdmap_ref
);
468 psdout(10) << "handle_advance_map "
469 << newup
<< "/" << newacting
470 << " -- " << up_primary
<< "/" << acting_primary
473 update_osdmap_ref(osdmap
);
477 osdmap
, lastmap
, newup
, up_primary
,
478 newacting
, acting_primary
);
479 handle_event(evt
, &rctx
);
480 if (pool
.info
.last_change
== osdmap_ref
->get_epoch()) {
481 pl
->on_pool_change();
483 readable_interval
= pool
.get_readable_interval(cct
->_conf
);
484 last_require_osd_release
= osdmap
->require_osd_release
;
487 void PeeringState::activate_map(PeeringCtx
&rctx
)
489 psdout(10) << __func__
<< dendl
;
491 handle_event(evt
, &rctx
);
492 if (osdmap_ref
->get_epoch() - last_persisted_osdmap
>
493 cct
->_conf
->osd_pg_epoch_persisted_max_stale
) {
494 psdout(20) << __func__
<< ": Dirtying info: last_persisted is "
495 << last_persisted_osdmap
496 << " while current is " << osdmap_ref
->get_epoch() << dendl
;
499 psdout(20) << __func__
<< ": Not dirtying info: last_persisted is "
500 << last_persisted_osdmap
501 << " while current is " << osdmap_ref
->get_epoch() << dendl
;
503 write_if_dirty(rctx
.transaction
);
505 if (get_osdmap()->check_new_blocklist_entries()) {
506 pl
->check_blocklisted_watchers();
510 void PeeringState::set_last_peering_reset()
512 psdout(20) << "set_last_peering_reset " << get_osdmap_epoch() << dendl
;
513 if (last_peering_reset
!= get_osdmap_epoch()) {
514 last_peering_reset
= get_osdmap_epoch();
515 psdout(10) << "Clearing blocked outgoing recovery messages" << dendl
;
516 clear_blocked_outgoing();
517 if (!pl
->try_flush_or_schedule_async()) {
518 psdout(10) << "Beginning to block outgoing recovery messages" << dendl
;
519 begin_block_outgoing();
521 psdout(10) << "Not blocking outgoing recovery messages" << dendl
;
526 void PeeringState::complete_flush()
528 flushes_in_progress
--;
529 if (flushes_in_progress
== 0) {
534 void PeeringState::check_full_transition(OSDMapRef lastmap
, OSDMapRef osdmap
)
536 const pg_pool_t
*pi
= osdmap
->get_pg_pool(info
.pgid
.pool());
538 return; // pool deleted
540 bool changed
= false;
541 if (pi
->has_flag(pg_pool_t::FLAG_FULL
)) {
542 const pg_pool_t
*opi
= lastmap
->get_pg_pool(info
.pgid
.pool());
543 if (!opi
|| !opi
->has_flag(pg_pool_t::FLAG_FULL
)) {
544 psdout(10) << " pool was marked full in " << osdmap
->get_epoch() << dendl
;
549 info
.history
.last_epoch_marked_full
= osdmap
->get_epoch();
554 bool PeeringState::should_restart_peering(
556 int newactingprimary
,
557 const vector
<int>& newup
,
558 const vector
<int>& newacting
,
562 if (PastIntervals::is_new_interval(
574 psdout(20) << "new interval newup " << newup
575 << " newacting " << newacting
<< dendl
;
578 if (!lastmap
->is_up(pg_whoami
.osd
) && osdmap
->is_up(pg_whoami
.osd
)) {
579 psdout(10) << __func__
<< " osd transitioned from down -> up"
586 /* Called before initializing peering during advance_map */
587 void PeeringState::start_peering_interval(
588 const OSDMapRef lastmap
,
589 const vector
<int>& newup
, int new_up_primary
,
590 const vector
<int>& newacting
, int new_acting_primary
,
591 ObjectStore::Transaction
&t
)
593 const OSDMapRef osdmap
= get_osdmap();
595 set_last_peering_reset();
597 vector
<int> oldacting
, oldup
;
598 int oldrole
= get_role();
601 pl
->clear_ready_to_merge();
605 pg_shard_t old_acting_primary
= get_primary();
606 pg_shard_t old_up_primary
= up_primary
;
607 bool was_old_primary
= is_primary();
608 bool was_old_nonprimary
= is_nonprimary();
610 acting
.swap(oldacting
);
612 init_primary_up_acting(
618 if (info
.stats
.up
!= up
||
619 info
.stats
.acting
!= acting
||
620 info
.stats
.up_primary
!= new_up_primary
||
621 info
.stats
.acting_primary
!= new_acting_primary
) {
623 info
.stats
.up_primary
= new_up_primary
;
624 info
.stats
.acting
= acting
;
625 info
.stats
.acting_primary
= new_acting_primary
;
626 info
.stats
.mapping_epoch
= osdmap
->get_epoch();
629 pl
->clear_publish_stats();
631 // This will now be remapped during a backfill in cases
632 // that it would not have been before.
634 state_set(PG_STATE_REMAPPED
);
636 state_clear(PG_STATE_REMAPPED
);
638 int role
= osdmap
->calc_pg_role(pg_whoami
, acting
);
641 // did acting, up, primary|acker change?
643 psdout(10) << " no lastmap" << dendl
;
645 dirty_big_info
= true;
646 info
.history
.same_interval_since
= osdmap
->get_epoch();
648 std::stringstream debug
;
649 ceph_assert(info
.history
.same_interval_since
!= 0);
650 bool new_interval
= PastIntervals::check_new_interval(
651 old_acting_primary
.osd
,
653 oldacting
, newacting
,
657 info
.history
.same_interval_since
,
658 info
.history
.last_epoch_clean
,
662 missing_loc
.get_recoverable_predicate(),
665 psdout(10) << __func__
<< ": check_new_interval output: "
666 << debug
.str() << dendl
;
668 if (osdmap
->get_epoch() == pl
->oldest_stored_osdmap() &&
669 info
.history
.last_epoch_clean
< osdmap
->get_epoch()) {
670 psdout(10) << " map gap, clearing past_intervals and faking" << dendl
;
671 // our information is incomplete and useless; someone else was clean
672 // after everything we know if osdmaps were trimmed.
673 past_intervals
.clear();
675 psdout(10) << " noting past " << past_intervals
<< dendl
;
678 dirty_big_info
= true;
679 info
.history
.same_interval_since
= osdmap
->get_epoch();
680 if (osdmap
->have_pg_pool(info
.pgid
.pgid
.pool()) &&
681 info
.pgid
.pgid
.is_split(lastmap
->get_pg_num(info
.pgid
.pgid
.pool()),
682 osdmap
->get_pg_num(info
.pgid
.pgid
.pool()),
684 info
.history
.last_epoch_split
= osdmap
->get_epoch();
689 if (old_up_primary
!= up_primary
||
691 info
.history
.same_up_since
= osdmap
->get_epoch();
693 // this comparison includes primary rank via pg_shard_t
694 if (old_acting_primary
!= get_primary()) {
695 info
.history
.same_primary_since
= osdmap
->get_epoch();
699 pl
->on_info_history_change();
701 psdout(1) << __func__
<< " up " << oldup
<< " -> " << up
702 << ", acting " << oldacting
<< " -> " << acting
703 << ", acting_primary " << old_acting_primary
<< " -> "
704 << new_acting_primary
705 << ", up_primary " << old_up_primary
<< " -> " << new_up_primary
706 << ", role " << oldrole
<< " -> " << role
707 << ", features acting " << acting_features
708 << " upacting " << upacting_features
712 state_clear(PG_STATE_ACTIVE
);
713 state_clear(PG_STATE_PEERED
);
714 state_clear(PG_STATE_PREMERGE
);
715 state_clear(PG_STATE_DOWN
);
716 state_clear(PG_STATE_RECOVERY_WAIT
);
717 state_clear(PG_STATE_RECOVERY_TOOFULL
);
718 state_clear(PG_STATE_RECOVERING
);
721 acting_recovery_backfill
.clear();
723 // reset primary/replica state?
724 if (was_old_primary
|| is_primary()) {
725 pl
->clear_want_pg_temp();
726 } else if (was_old_nonprimary
|| is_nonprimary()) {
727 pl
->clear_want_pg_temp();
729 clear_primary_state();
733 ceph_assert(!deleting
);
735 // should we tell the primary we are here?
736 send_notify
= !is_primary();
738 if (role
!= oldrole
||
739 was_old_primary
!= is_primary()) {
740 // did primary change?
741 if (was_old_primary
!= is_primary()) {
742 state_clear(PG_STATE_CLEAN
);
745 pl
->on_role_change();
748 // did primary change?
749 if (get_primary() != old_acting_primary
) {
750 psdout(10) << oldacting
<< " -> " << acting
751 << ", acting primary "
752 << old_acting_primary
<< " -> " << get_primary()
755 // primary is the same.
757 // i am (still) primary. but my replica set changed.
758 state_clear(PG_STATE_CLEAN
);
760 psdout(10) << oldacting
<< " -> " << acting
761 << ", replicas changed" << dendl
;
766 if (acting
.empty() && !up
.empty() && up_primary
== pg_whoami
) {
767 psdout(10) << " acting empty, but i am up[0], clearing pg_temp" << dendl
;
768 pl
->queue_want_pg_temp(acting
);
772 void PeeringState::on_new_interval()
774 dout(20) << __func__
<< dendl
;
775 const OSDMapRef osdmap
= get_osdmap();
777 // initialize features
778 acting_features
= CEPH_FEATURES_SUPPORTED_DEFAULT
;
779 upacting_features
= CEPH_FEATURES_SUPPORTED_DEFAULT
;
780 for (auto p
= acting
.begin(); p
!= acting
.end(); ++p
) {
781 if (*p
== CRUSH_ITEM_NONE
)
783 uint64_t f
= osdmap
->get_xinfo(*p
).features
;
784 acting_features
&= f
;
785 upacting_features
&= f
;
787 for (auto p
= up
.begin(); p
!= up
.end(); ++p
) {
788 if (*p
== CRUSH_ITEM_NONE
)
790 upacting_features
&= osdmap
->get_xinfo(*p
).features
;
792 psdout(20) << __func__
<< " upacting_features 0x" << std::hex
793 << upacting_features
<< std::dec
794 << " from " << acting
<< "+" << up
<< dendl
;
796 psdout(20) << __func__
<< " checking missing set deletes flag. missing = "
797 << get_pg_log().get_missing() << dendl
;
799 if (!pg_log
.get_missing().may_include_deletes
&&
800 !perform_deletes_during_peering()) {
801 pl
->rebuild_missing_set_with_deletes(pg_log
);
804 pg_log
.get_missing().may_include_deletes
==
805 !perform_deletes_during_peering());
809 // update lease bounds for a new interval
810 auto mnow
= pl
->get_mnow();
811 prior_readable_until_ub
= std::max(prior_readable_until_ub
,
813 prior_readable_until_ub
= info
.history
.refresh_prior_readable_until_ub(
814 mnow
, prior_readable_until_ub
);
815 psdout(10) << __func__
<< " prior_readable_until_ub "
816 << prior_readable_until_ub
<< " (mnow " << mnow
<< " + "
817 << info
.history
.prior_readable_until_ub
<< ")" << dendl
;
818 prior_readable_down_osds
.clear(); // we populate this when we build the priorset
822 readable_until_ub_sent
=
823 readable_until_ub_from_primary
= ceph::signedspan::zero();
825 acting_readable_until_ub
.clear();
827 acting_readable_until_ub
.resize(acting
.size(), ceph::signedspan::zero());
830 pl
->on_new_interval();
833 void PeeringState::init_primary_up_acting(
834 const vector
<int> &newup
,
835 const vector
<int> &newacting
,
837 int new_acting_primary
)
841 for (uint8_t i
= 0; i
< acting
.size(); ++i
) {
842 if (acting
[i
] != CRUSH_ITEM_NONE
)
846 pool
.info
.is_erasure() ? shard_id_t(i
) : shard_id_t::NO_SHARD
));
850 for (uint8_t i
= 0; i
< up
.size(); ++i
) {
851 if (up
[i
] != CRUSH_ITEM_NONE
)
855 pool
.info
.is_erasure() ? shard_id_t(i
) : shard_id_t::NO_SHARD
));
857 if (!pool
.info
.is_erasure()) {
859 up_primary
= pg_shard_t(new_up_primary
, shard_id_t::NO_SHARD
);
860 primary
= pg_shard_t(new_acting_primary
, shard_id_t::NO_SHARD
);
863 up_primary
= pg_shard_t();
864 primary
= pg_shard_t();
865 for (uint8_t i
= 0; i
< up
.size(); ++i
) {
866 if (up
[i
] == new_up_primary
) {
867 up_primary
= pg_shard_t(up
[i
], shard_id_t(i
));
871 for (uint8_t i
= 0; i
< acting
.size(); ++i
) {
872 if (acting
[i
] == new_acting_primary
) {
873 primary
= pg_shard_t(acting
[i
], shard_id_t(i
));
877 ceph_assert(up_primary
.osd
== new_up_primary
);
878 ceph_assert(primary
.osd
== new_acting_primary
);
882 void PeeringState::init_hb_stamps()
885 // we care about all other osds in the acting set
886 hb_stamps
.resize(acting
.size() - 1);
888 for (auto p
: acting
) {
889 if (p
== CRUSH_ITEM_NONE
|| p
== get_primary().osd
) {
892 hb_stamps
[i
++] = pl
->get_hb_stamps(p
);
895 } else if (is_nonprimary()) {
896 // we care about just the primary
898 hb_stamps
[0] = pl
->get_hb_stamps(get_primary().osd
);
902 dout(10) << __func__
<< " now " << hb_stamps
<< dendl
;
906 void PeeringState::clear_recovery_state()
908 async_recovery_targets
.clear();
909 backfill_targets
.clear();
912 void PeeringState::clear_primary_state()
914 psdout(10) << "clear_primary_state" << dendl
;
916 // clear peering state
918 peer_log_requested
.clear();
919 peer_missing_requested
.clear();
922 peer_missing
.clear();
923 peer_last_complete_ondisk
.clear();
924 peer_activated
.clear();
925 min_last_complete_ondisk
= eversion_t();
926 pg_trim_to
= eversion_t();
927 might_have_unfound
.clear();
928 need_up_thru
= false;
930 pg_log
.reset_recovery_pointers();
932 clear_recovery_state();
934 last_update_ondisk
= eversion_t();
936 pl
->clear_primary_state();
939 /// return [start,end) bounds for required past_intervals
940 static pair
<epoch_t
, epoch_t
> get_required_past_interval_bounds(
941 const pg_info_t
&info
,
942 epoch_t oldest_map
) {
943 epoch_t start
= std::max(
944 info
.history
.last_epoch_clean
? info
.history
.last_epoch_clean
:
945 info
.history
.epoch_pool_created
,
947 epoch_t end
= std::max(
948 info
.history
.same_interval_since
,
949 info
.history
.epoch_pool_created
);
950 return make_pair(start
, end
);
954 void PeeringState::check_past_interval_bounds() const
956 auto oldest_epoch
= pl
->oldest_stored_osdmap();
957 auto rpib
= get_required_past_interval_bounds(
960 if (rpib
.first
>= rpib
.second
) {
961 // do not warn if the start bound is dictated by oldest_map; the
962 // past intervals are presumably appropriate given the pg info.
963 if (!past_intervals
.empty() &&
964 rpib
.first
> oldest_epoch
) {
965 pl
->get_clog_error() << info
.pgid
<< " required past_interval bounds are"
966 << " empty [" << rpib
<< ") but past_intervals is not: "
968 derr
<< info
.pgid
<< " required past_interval bounds are"
969 << " empty [" << rpib
<< ") but past_intervals is not: "
970 << past_intervals
<< dendl
;
973 if (past_intervals
.empty()) {
974 pl
->get_clog_error() << info
.pgid
<< " required past_interval bounds are"
975 << " not empty [" << rpib
<< ") but past_intervals "
976 << past_intervals
<< " is empty";
977 derr
<< info
.pgid
<< " required past_interval bounds are"
978 << " not empty [" << rpib
<< ") but past_intervals "
979 << past_intervals
<< " is empty" << dendl
;
980 ceph_assert(!past_intervals
.empty());
983 auto apib
= past_intervals
.get_bounds();
984 if (apib
.first
> rpib
.first
) {
985 pl
->get_clog_error() << info
.pgid
<< " past_intervals [" << apib
986 << ") start interval does not contain the required"
987 << " bound [" << rpib
<< ") start";
988 derr
<< info
.pgid
<< " past_intervals [" << apib
989 << ") start interval does not contain the required"
990 << " bound [" << rpib
<< ") start" << dendl
;
991 ceph_abort_msg("past_interval start interval mismatch");
993 if (apib
.second
!= rpib
.second
) {
994 pl
->get_clog_error() << info
.pgid
<< " past_interal bound [" << apib
995 << ") end does not match required [" << rpib
997 derr
<< info
.pgid
<< " past_interal bound [" << apib
998 << ") end does not match required [" << rpib
1000 ceph_abort_msg("past_interval end mismatch");
1005 int PeeringState::clamp_recovery_priority(int priority
, int pool_recovery_priority
, int max
)
1007 static_assert(OSD_RECOVERY_PRIORITY_MIN
< OSD_RECOVERY_PRIORITY_MAX
, "Invalid priority range");
1008 static_assert(OSD_RECOVERY_PRIORITY_MIN
>= 0, "Priority range must match unsigned type");
1010 ceph_assert(max
<= OSD_RECOVERY_PRIORITY_MAX
);
1012 // User can't set this too high anymore, but might be a legacy value
1013 if (pool_recovery_priority
> OSD_POOL_PRIORITY_MAX
)
1014 pool_recovery_priority
= OSD_POOL_PRIORITY_MAX
;
1015 if (pool_recovery_priority
< OSD_POOL_PRIORITY_MIN
)
1016 pool_recovery_priority
= OSD_POOL_PRIORITY_MIN
;
1017 // Shift range from min to max to 0 to max - min
1018 pool_recovery_priority
+= (0 - OSD_POOL_PRIORITY_MIN
);
1019 ceph_assert(pool_recovery_priority
>= 0 && pool_recovery_priority
<= (OSD_POOL_PRIORITY_MAX
- OSD_POOL_PRIORITY_MIN
));
1021 priority
+= pool_recovery_priority
;
1023 // Clamp to valid range
1024 if (priority
> max
) {
1026 } else if (priority
< OSD_RECOVERY_PRIORITY_MIN
) {
1027 return OSD_RECOVERY_PRIORITY_MIN
;
1033 unsigned PeeringState::get_recovery_priority()
1035 // a higher value -> a higher priority
1036 int ret
= OSD_RECOVERY_PRIORITY_BASE
;
1039 if (state
& PG_STATE_FORCED_RECOVERY
) {
1040 ret
= OSD_RECOVERY_PRIORITY_FORCED
;
1042 // XXX: This priority boost isn't so much about inactive, but about data-at-risk
1043 if (is_degraded() && info
.stats
.avail_no_missing
.size() < pool
.info
.min_size
) {
1044 base
= OSD_RECOVERY_INACTIVE_PRIORITY_BASE
;
1045 // inactive: no. of replicas < min_size, highest priority since it blocks IO
1046 ret
= base
+ (pool
.info
.min_size
- info
.stats
.avail_no_missing
.size());
1049 int64_t pool_recovery_priority
= 0;
1050 pool
.info
.opts
.get(pool_opts_t::RECOVERY_PRIORITY
, &pool_recovery_priority
);
1052 ret
= clamp_recovery_priority(ret
, pool_recovery_priority
, max_prio_map
[base
]);
1054 psdout(20) << __func__
<< " recovery priority is " << ret
<< dendl
;
1055 return static_cast<unsigned>(ret
);
1058 unsigned PeeringState::get_backfill_priority()
1060 // a higher value -> a higher priority
1061 int ret
= OSD_BACKFILL_PRIORITY_BASE
;
1064 if (state
& PG_STATE_FORCED_BACKFILL
) {
1065 ret
= OSD_BACKFILL_PRIORITY_FORCED
;
1067 if (actingset
.size() < pool
.info
.min_size
) {
1068 base
= OSD_BACKFILL_INACTIVE_PRIORITY_BASE
;
1069 // inactive: no. of replicas < min_size, highest priority since it blocks IO
1070 ret
= base
+ (pool
.info
.min_size
- actingset
.size());
1072 } else if (is_undersized()) {
1073 // undersized: OSD_BACKFILL_DEGRADED_PRIORITY_BASE + num missing replicas
1074 ceph_assert(pool
.info
.size
> actingset
.size());
1075 base
= OSD_BACKFILL_DEGRADED_PRIORITY_BASE
;
1076 ret
= base
+ (pool
.info
.size
- actingset
.size());
1078 } else if (is_degraded()) {
1079 // degraded: baseline degraded
1080 base
= ret
= OSD_BACKFILL_DEGRADED_PRIORITY_BASE
;
1083 // Adjust with pool's recovery priority
1084 int64_t pool_recovery_priority
= 0;
1085 pool
.info
.opts
.get(pool_opts_t::RECOVERY_PRIORITY
, &pool_recovery_priority
);
1087 ret
= clamp_recovery_priority(ret
, pool_recovery_priority
, max_prio_map
[base
]);
1090 psdout(20) << __func__
<< " backfill priority is " << ret
<< dendl
;
1091 return static_cast<unsigned>(ret
);
1094 unsigned PeeringState::get_delete_priority()
1096 auto state
= get_osdmap()->get_state(pg_whoami
.osd
);
1097 if (state
& (CEPH_OSD_BACKFILLFULL
|
1099 return OSD_DELETE_PRIORITY_FULL
;
1100 } else if (state
& CEPH_OSD_NEARFULL
) {
1101 return OSD_DELETE_PRIORITY_FULLISH
;
1103 return OSD_DELETE_PRIORITY_NORMAL
;
1107 bool PeeringState::set_force_recovery(bool b
)
1111 if (!(state
& PG_STATE_FORCED_RECOVERY
) &&
1112 (state
& (PG_STATE_DEGRADED
|
1113 PG_STATE_RECOVERY_WAIT
|
1114 PG_STATE_RECOVERING
))) {
1115 psdout(20) << __func__
<< " set" << dendl
;
1116 state_set(PG_STATE_FORCED_RECOVERY
);
1117 pl
->publish_stats_to_osd();
1120 } else if (state
& PG_STATE_FORCED_RECOVERY
) {
1121 psdout(20) << __func__
<< " clear" << dendl
;
1122 state_clear(PG_STATE_FORCED_RECOVERY
);
1123 pl
->publish_stats_to_osd();
1127 psdout(20) << __func__
<< " state " << get_current_state()
1129 pl
->update_local_background_io_priority(get_recovery_priority());
1134 bool PeeringState::set_force_backfill(bool b
)
1138 if (!(state
& PG_STATE_FORCED_BACKFILL
) &&
1139 (state
& (PG_STATE_DEGRADED
|
1140 PG_STATE_BACKFILL_WAIT
|
1141 PG_STATE_BACKFILLING
))) {
1142 psdout(10) << __func__
<< " set" << dendl
;
1143 state_set(PG_STATE_FORCED_BACKFILL
);
1144 pl
->publish_stats_to_osd();
1147 } else if (state
& PG_STATE_FORCED_BACKFILL
) {
1148 psdout(10) << __func__
<< " clear" << dendl
;
1149 state_clear(PG_STATE_FORCED_BACKFILL
);
1150 pl
->publish_stats_to_osd();
1154 psdout(20) << __func__
<< " state " << get_current_state()
1156 pl
->update_local_background_io_priority(get_backfill_priority());
1161 void PeeringState::schedule_renew_lease()
1163 pl
->schedule_renew_lease(
1165 readable_interval
/ 2);
1168 void PeeringState::send_lease()
1170 epoch_t epoch
= pl
->get_osdmap_epoch();
1171 for (auto peer
: actingset
) {
1172 if (peer
== pg_whoami
) {
1175 pl
->send_cluster_message(
1177 make_message
<MOSDPGLease
>(epoch
,
1178 spg_t(spgid
.pgid
, peer
.shard
),
1184 void PeeringState::proc_lease(const pg_lease_t
& l
)
1186 if (!HAVE_FEATURE(upacting_features
, SERVER_OCTOPUS
)) {
1187 psdout(20) << __func__
<< " no-op, upacting_features 0x" << std::hex
1188 << upacting_features
<< std::dec
1189 << " does not include SERVER_OCTOPUS" << dendl
;
1192 if (!is_nonprimary()) {
1193 psdout(20) << __func__
<< " no-op, !nonprimary" << dendl
;
1196 psdout(10) << __func__
<< " " << l
<< dendl
;
1197 if (l
.readable_until_ub
> readable_until_ub_from_primary
) {
1198 readable_until_ub_from_primary
= l
.readable_until_ub
;
1201 ceph::signedspan ru
= ceph::signedspan::zero();
1202 if (l
.readable_until
!= ceph::signedspan::zero() &&
1203 hb_stamps
[0]->peer_clock_delta_ub
) {
1204 ru
= l
.readable_until
- *hb_stamps
[0]->peer_clock_delta_ub
;
1205 psdout(20) << " peer_clock_delta_ub " << *hb_stamps
[0]->peer_clock_delta_ub
1206 << " -> ru " << ru
<< dendl
;
1208 if (ru
> readable_until
) {
1209 readable_until
= ru
;
1210 psdout(20) << __func__
<< " readable_until now " << readable_until
<< dendl
;
1211 // NOTE: if we ever decide to block/queue ops on the replica,
1212 // we'll need to wake them up here.
1215 ceph::signedspan ruub
;
1216 if (hb_stamps
[0]->peer_clock_delta_lb
) {
1217 ruub
= l
.readable_until_ub
- *hb_stamps
[0]->peer_clock_delta_lb
;
1218 psdout(20) << " peer_clock_delta_lb " << *hb_stamps
[0]->peer_clock_delta_lb
1219 << " -> ruub " << ruub
<< dendl
;
1221 ruub
= pl
->get_mnow() + l
.interval
;
1222 psdout(20) << " no peer_clock_delta_lb -> ruub " << ruub
<< dendl
;
1224 if (ruub
> readable_until_ub
) {
1225 readable_until_ub
= ruub
;
1226 psdout(20) << __func__
<< " readable_until_ub now " << readable_until_ub
1231 void PeeringState::proc_lease_ack(int from
, const pg_lease_ack_t
& a
)
1233 if (!HAVE_FEATURE(upacting_features
, SERVER_OCTOPUS
)) {
1236 auto now
= pl
->get_mnow();
1237 bool was_min
= false;
1238 for (unsigned i
= 0; i
< acting
.size(); ++i
) {
1239 if (from
== acting
[i
]) {
1240 // the lease_ack value is based on the primary's clock
1241 if (a
.readable_until_ub
> acting_readable_until_ub
[i
]) {
1242 if (acting_readable_until_ub
[i
] == readable_until
) {
1245 acting_readable_until_ub
[i
] = a
.readable_until_ub
;
1251 auto old_ru
= readable_until
;
1252 recalc_readable_until();
1254 pl
->recheck_readable();
1259 void PeeringState::proc_renew_lease()
1261 if (!HAVE_FEATURE(upacting_features
, SERVER_OCTOPUS
)) {
1264 renew_lease(pl
->get_mnow());
1266 schedule_renew_lease();
1269 void PeeringState::recalc_readable_until()
1271 assert(is_primary());
1272 ceph::signedspan min
= readable_until_ub_sent
;
1273 for (unsigned i
= 0; i
< acting
.size(); ++i
) {
1274 if (acting
[i
] == pg_whoami
.osd
|| acting
[i
] == CRUSH_ITEM_NONE
) {
1277 dout(20) << __func__
<< " peer osd." << acting
[i
]
1278 << " ruub " << acting_readable_until_ub
[i
] << dendl
;
1279 if (acting_readable_until_ub
[i
] < min
) {
1280 min
= acting_readable_until_ub
[i
];
1283 readable_until
= min
;
1284 readable_until_ub
= min
;
1285 dout(20) << __func__
<< " readable_until[_ub] " << readable_until
1286 << " (sent " << readable_until_ub_sent
<< ")" << dendl
;
1289 bool PeeringState::check_prior_readable_down_osds(const OSDMapRef
& map
)
1291 if (!HAVE_FEATURE(upacting_features
, SERVER_OCTOPUS
)) {
1294 bool changed
= false;
1295 auto p
= prior_readable_down_osds
.begin();
1296 while (p
!= prior_readable_down_osds
.end()) {
1297 if (map
->is_dead(*p
)) {
1298 dout(10) << __func__
<< " prior_readable_down_osds osd." << *p
1299 << " is dead as of epoch " << map
->get_epoch()
1301 p
= prior_readable_down_osds
.erase(p
);
1307 if (changed
&& prior_readable_down_osds
.empty()) {
1308 psdout(10) << " empty prior_readable_down_osds, clearing ub" << dendl
;
1309 clear_prior_readable_until_ub();
1315 bool PeeringState::adjust_need_up_thru(const OSDMapRef osdmap
)
1317 epoch_t up_thru
= osdmap
->get_up_thru(pg_whoami
.osd
);
1319 up_thru
>= info
.history
.same_interval_since
) {
1320 psdout(10) << "adjust_need_up_thru now "
1321 << up_thru
<< ", need_up_thru now false" << dendl
;
1322 need_up_thru
= false;
1328 PastIntervals::PriorSet
PeeringState::build_prior()
1332 for (auto it
= peer_info
.begin(); it
!= peer_info
.end(); ++it
) {
1333 ceph_assert(info
.history
.last_epoch_started
>=
1334 it
->second
.history
.last_epoch_started
);
1338 const OSDMap
&osdmap
= *get_osdmap();
1339 PastIntervals::PriorSet prior
= past_intervals
.get_prior_set(
1340 pool
.info
.is_erasure(),
1341 info
.history
.last_epoch_started
,
1342 &missing_loc
.get_recoverable_predicate(),
1343 [&](epoch_t start
, int osd
, epoch_t
*lost_at
) {
1344 const osd_info_t
*pinfo
= 0;
1345 if (osdmap
.exists(osd
)) {
1346 pinfo
= &osdmap
.get_info(osd
);
1348 *lost_at
= pinfo
->lost_at
;
1351 if (osdmap
.is_up(osd
)) {
1352 return PastIntervals::UP
;
1353 } else if (!pinfo
) {
1354 return PastIntervals::DNE
;
1355 } else if (pinfo
->lost_at
> start
) {
1356 return PastIntervals::LOST
;
1358 return PastIntervals::DOWN
;
1365 if (prior
.pg_down
) {
1366 state_set(PG_STATE_DOWN
);
1369 if (get_osdmap()->get_up_thru(pg_whoami
.osd
) <
1370 info
.history
.same_interval_since
) {
1371 psdout(10) << "up_thru " << get_osdmap()->get_up_thru(pg_whoami
.osd
)
1372 << " < same_since " << info
.history
.same_interval_since
1373 << ", must notify monitor" << dendl
;
1374 need_up_thru
= true;
1376 psdout(10) << "up_thru " << get_osdmap()->get_up_thru(pg_whoami
.osd
)
1377 << " >= same_since " << info
.history
.same_interval_since
1378 << ", all is well" << dendl
;
1379 need_up_thru
= false;
1381 pl
->set_probe_targets(prior
.probe
);
1385 bool PeeringState::needs_recovery() const
1387 ceph_assert(is_primary());
1389 auto &missing
= pg_log
.get_missing();
1391 if (missing
.num_missing()) {
1392 psdout(10) << __func__
<< " primary has " << missing
.num_missing()
1393 << " missing" << dendl
;
1397 ceph_assert(!acting_recovery_backfill
.empty());
1398 for (const pg_shard_t
& peer
: acting_recovery_backfill
) {
1399 if (peer
== get_primary()) {
1402 auto pm
= peer_missing
.find(peer
);
1403 if (pm
== peer_missing
.end()) {
1404 psdout(10) << __func__
<< " osd." << peer
<< " doesn't have missing set"
1408 if (pm
->second
.num_missing()) {
1409 psdout(10) << __func__
<< " osd." << peer
<< " has "
1410 << pm
->second
.num_missing() << " missing" << dendl
;
1415 psdout(10) << __func__
<< " is recovered" << dendl
;
1419 bool PeeringState::needs_backfill() const
1421 ceph_assert(is_primary());
1423 // We can assume that only possible osds that need backfill
1424 // are on the backfill_targets vector nodes.
1425 for (const pg_shard_t
& peer
: backfill_targets
) {
1426 auto pi
= peer_info
.find(peer
);
1427 ceph_assert(pi
!= peer_info
.end());
1428 if (!pi
->second
.last_backfill
.is_max()) {
1429 psdout(10) << __func__
<< " osd." << peer
1430 << " has last_backfill " << pi
->second
.last_backfill
<< dendl
;
1435 psdout(10) << __func__
<< " does not need backfill" << dendl
;
1440 * Returns true unless there is a non-lost OSD in might_have_unfound.
1442 bool PeeringState::all_unfound_are_queried_or_lost(
1443 const OSDMapRef osdmap
) const
1445 ceph_assert(is_primary());
1447 auto peer
= might_have_unfound
.begin();
1448 auto mend
= might_have_unfound
.end();
1449 for (; peer
!= mend
; ++peer
) {
1450 if (peer_missing
.count(*peer
))
1452 auto iter
= peer_info
.find(*peer
);
1453 if (iter
!= peer_info
.end() &&
1454 (iter
->second
.is_empty() || iter
->second
.dne()))
1456 if (!osdmap
->exists(peer
->osd
))
1458 const osd_info_t
&osd_info(osdmap
->get_info(peer
->osd
));
1459 if (osd_info
.lost_at
<= osd_info
.up_from
) {
1460 // If there is even one OSD in might_have_unfound that isn't lost, we
1461 // still might retrieve our unfound.
1465 psdout(10) << "all_unfound_are_queried_or_lost all of might_have_unfound "
1466 << might_have_unfound
1467 << " have been queried or are marked lost" << dendl
;
1472 void PeeringState::reject_reservation()
1474 pl
->unreserve_recovery_space();
1475 pl
->send_cluster_message(
1477 make_message
<MBackfillReserve
>(
1478 MBackfillReserve::REJECT_TOOFULL
,
1479 spg_t(info
.pgid
.pgid
, primary
.shard
),
1480 get_osdmap_epoch()),
1481 get_osdmap_epoch());
1487 * Returns an iterator to the best info in infos sorted by:
1488 * 1) Prefer newer last_update
1489 * 2) Prefer longer tail if it brings another info into contiguity
1490 * 3) Prefer current primary
1492 map
<pg_shard_t
, pg_info_t
>::const_iterator
PeeringState::find_best_info(
1493 const map
<pg_shard_t
, pg_info_t
> &infos
,
1494 bool restrict_to_up_acting
,
1495 bool *history_les_bound
) const
1497 ceph_assert(history_les_bound
);
1498 /* See doc/dev/osd_internals/last_epoch_started.rst before attempting
1499 * to make changes to this process. Also, make sure to update it
1500 * when you find bugs! */
1501 epoch_t max_last_epoch_started_found
= 0;
1502 for (auto i
= infos
.begin(); i
!= infos
.end(); ++i
) {
1503 if (!cct
->_conf
->osd_find_best_info_ignore_history_les
&&
1504 max_last_epoch_started_found
< i
->second
.history
.last_epoch_started
) {
1505 *history_les_bound
= true;
1506 max_last_epoch_started_found
= i
->second
.history
.last_epoch_started
;
1508 if (!i
->second
.is_incomplete() &&
1509 max_last_epoch_started_found
< i
->second
.last_epoch_started
) {
1510 *history_les_bound
= false;
1511 max_last_epoch_started_found
= i
->second
.last_epoch_started
;
1514 eversion_t min_last_update_acceptable
= eversion_t::max();
1515 for (auto i
= infos
.begin(); i
!= infos
.end(); ++i
) {
1516 if (max_last_epoch_started_found
<= i
->second
.last_epoch_started
) {
1517 if (min_last_update_acceptable
> i
->second
.last_update
)
1518 min_last_update_acceptable
= i
->second
.last_update
;
1521 if (min_last_update_acceptable
== eversion_t::max())
1524 auto best
= infos
.end();
1525 // find osd with newest last_update (oldest for ec_pool).
1526 // if there are multiples, prefer
1527 // - a longer tail, if it brings another peer into log contiguity
1528 // - the current primary
1529 for (auto p
= infos
.begin(); p
!= infos
.end(); ++p
) {
1530 if (restrict_to_up_acting
&& !is_up(p
->first
) &&
1531 !is_acting(p
->first
))
1533 // Only consider peers with last_update >= min_last_update_acceptable
1534 if (p
->second
.last_update
< min_last_update_acceptable
)
1536 // Disqualify anyone with a too old last_epoch_started
1537 if (p
->second
.last_epoch_started
< max_last_epoch_started_found
)
1539 // Disqualify anyone who is incomplete (not fully backfilled)
1540 if (p
->second
.is_incomplete())
1542 if (best
== infos
.end()) {
1546 // Prefer newer last_update
1547 if (pool
.info
.require_rollback()) {
1548 if (p
->second
.last_update
> best
->second
.last_update
)
1550 if (p
->second
.last_update
< best
->second
.last_update
) {
1555 if (p
->second
.last_update
< best
->second
.last_update
)
1557 if (p
->second
.last_update
> best
->second
.last_update
) {
1563 // Prefer longer tail
1564 if (p
->second
.log_tail
> best
->second
.log_tail
) {
1566 } else if (p
->second
.log_tail
< best
->second
.log_tail
) {
1571 if (!p
->second
.has_missing() && best
->second
.has_missing()) {
1572 psdout(10) << __func__
<< " prefer osd." << p
->first
1573 << " because it is complete while best has missing"
1577 } else if (p
->second
.has_missing() && !best
->second
.has_missing()) {
1578 psdout(10) << __func__
<< " skipping osd." << p
->first
1579 << " because it has missing while best is complete"
1583 // both are complete or have missing
1587 // prefer current primary (usually the caller), all things being equal
1588 if (p
->first
== pg_whoami
) {
1589 psdout(10) << "calc_acting prefer osd." << p
->first
1590 << " because it is current primary" << dendl
;
1598 void PeeringState::calc_ec_acting(
1599 map
<pg_shard_t
, pg_info_t
>::const_iterator auth_log_shard
,
1601 const vector
<int> &acting
,
1602 const vector
<int> &up
,
1603 const map
<pg_shard_t
, pg_info_t
> &all_info
,
1604 bool restrict_to_up_acting
,
1606 set
<pg_shard_t
> *backfill
,
1607 set
<pg_shard_t
> *acting_backfill
,
1610 vector
<int> want(size
, CRUSH_ITEM_NONE
);
1611 map
<shard_id_t
, set
<pg_shard_t
> > all_info_by_shard
;
1612 for (auto i
= all_info
.begin();
1613 i
!= all_info
.end();
1615 all_info_by_shard
[i
->first
.shard
].insert(i
->first
);
1617 for (uint8_t i
= 0; i
< want
.size(); ++i
) {
1618 ss
<< "For position " << (unsigned)i
<< ": ";
1619 if (up
.size() > (unsigned)i
&& up
[i
] != CRUSH_ITEM_NONE
&&
1620 !all_info
.find(pg_shard_t(up
[i
], shard_id_t(i
)))->second
.is_incomplete() &&
1621 all_info
.find(pg_shard_t(up
[i
], shard_id_t(i
)))->second
.last_update
>=
1622 auth_log_shard
->second
.log_tail
) {
1623 ss
<< " selecting up[i]: " << pg_shard_t(up
[i
], shard_id_t(i
)) << std::endl
;
1627 if (up
.size() > (unsigned)i
&& up
[i
] != CRUSH_ITEM_NONE
) {
1628 ss
<< " backfilling up[i]: " << pg_shard_t(up
[i
], shard_id_t(i
))
1630 backfill
->insert(pg_shard_t(up
[i
], shard_id_t(i
)));
1633 if (acting
.size() > (unsigned)i
&& acting
[i
] != CRUSH_ITEM_NONE
&&
1634 !all_info
.find(pg_shard_t(acting
[i
], shard_id_t(i
)))->second
.is_incomplete() &&
1635 all_info
.find(pg_shard_t(acting
[i
], shard_id_t(i
)))->second
.last_update
>=
1636 auth_log_shard
->second
.log_tail
) {
1637 ss
<< " selecting acting[i]: " << pg_shard_t(acting
[i
], shard_id_t(i
)) << std::endl
;
1638 want
[i
] = acting
[i
];
1639 } else if (!restrict_to_up_acting
) {
1640 for (auto j
= all_info_by_shard
[shard_id_t(i
)].begin();
1641 j
!= all_info_by_shard
[shard_id_t(i
)].end();
1643 ceph_assert(j
->shard
== i
);
1644 if (!all_info
.find(*j
)->second
.is_incomplete() &&
1645 all_info
.find(*j
)->second
.last_update
>=
1646 auth_log_shard
->second
.log_tail
) {
1647 ss
<< " selecting stray: " << *j
<< std::endl
;
1652 if (want
[i
] == CRUSH_ITEM_NONE
)
1653 ss
<< " failed to fill position " << (int)i
<< std::endl
;
1657 for (uint8_t i
= 0; i
< want
.size(); ++i
) {
1658 if (want
[i
] != CRUSH_ITEM_NONE
) {
1659 acting_backfill
->insert(pg_shard_t(want
[i
], shard_id_t(i
)));
1662 acting_backfill
->insert(backfill
->begin(), backfill
->end());
1666 std::pair
<map
<pg_shard_t
, pg_info_t
>::const_iterator
, eversion_t
>
1667 PeeringState::select_replicated_primary(
1668 map
<pg_shard_t
, pg_info_t
>::const_iterator auth_log_shard
,
1669 uint64_t force_auth_primary_missing_objects
,
1670 const std::vector
<int> &up
,
1671 pg_shard_t up_primary
,
1672 const map
<pg_shard_t
, pg_info_t
> &all_info
,
1673 const OSDMapRef osdmap
,
1676 pg_shard_t auth_log_shard_id
= auth_log_shard
->first
;
1678 ss
<< __func__
<< " newest update on osd." << auth_log_shard_id
1679 << " with " << auth_log_shard
->second
<< std::endl
;
1682 auto primary
= all_info
.find(up_primary
);
1684 !primary
->second
.is_incomplete() &&
1685 primary
->second
.last_update
>=
1686 auth_log_shard
->second
.log_tail
) {
1687 if (HAVE_FEATURE(osdmap
->get_up_osd_features(), SERVER_NAUTILUS
)) {
1688 auto approx_missing_objects
=
1689 primary
->second
.stats
.stats
.sum
.num_objects_missing
;
1690 auto auth_version
= auth_log_shard
->second
.last_update
.version
;
1691 auto primary_version
= primary
->second
.last_update
.version
;
1692 if (auth_version
> primary_version
) {
1693 approx_missing_objects
+= auth_version
- primary_version
;
1695 approx_missing_objects
+= primary_version
- auth_version
;
1697 if ((uint64_t)approx_missing_objects
>
1698 force_auth_primary_missing_objects
) {
1699 primary
= auth_log_shard
;
1700 ss
<< "up_primary: " << up_primary
<< ") has approximate "
1701 << approx_missing_objects
1702 << "(>" << force_auth_primary_missing_objects
<<") "
1703 << "missing objects, osd." << auth_log_shard_id
1704 << " selected as primary instead"
1707 ss
<< "up_primary: " << up_primary
<< ") selected as primary"
1711 ss
<< "up_primary: " << up_primary
<< ") selected as primary" << std::endl
;
1714 ceph_assert(!auth_log_shard
->second
.is_incomplete());
1715 ss
<< "up[0] needs backfill, osd." << auth_log_shard_id
1716 << " selected as primary instead" << std::endl
;
1717 primary
= auth_log_shard
;
1720 ss
<< __func__
<< " primary is osd." << primary
->first
1721 << " with " << primary
->second
<< std::endl
;
1723 /* We include auth_log_shard->second.log_tail because in GetLog,
1724 * we will request logs back to the min last_update over our
1725 * acting_backfill set, which will result in our log being extended
1726 * as far backwards as necessary to pick up any peers which can
1727 * be log recovered by auth_log_shard's log */
1728 eversion_t oldest_auth_log_entry
=
1729 std::min(primary
->second
.log_tail
, auth_log_shard
->second
.log_tail
);
1731 return std::make_pair(primary
, oldest_auth_log_entry
);
1736 * calculate the desired acting set.
1738 * Choose an appropriate acting set. Prefer up[0], unless it is
1739 * incomplete, or another osd has a longer tail that allows us to
1740 * bring other up nodes up to date.
1742 void PeeringState::calc_replicated_acting(
1743 map
<pg_shard_t
, pg_info_t
>::const_iterator primary
,
1744 eversion_t oldest_auth_log_entry
,
1746 const vector
<int> &acting
,
1747 const vector
<int> &up
,
1748 pg_shard_t up_primary
,
1749 const map
<pg_shard_t
, pg_info_t
> &all_info
,
1750 bool restrict_to_up_acting
,
1752 set
<pg_shard_t
> *backfill
,
1753 set
<pg_shard_t
> *acting_backfill
,
1754 const OSDMapRef osdmap
,
1758 ss
<< __func__
<< (restrict_to_up_acting
? " restrict_to_up_acting" : "")
1761 want
->push_back(primary
->first
.osd
);
1762 acting_backfill
->insert(primary
->first
);
1764 // select replicas that have log contiguity with primary.
1765 // prefer up, then acting, then any peer_info osds
1767 pg_shard_t up_cand
= pg_shard_t(i
, shard_id_t::NO_SHARD
);
1768 if (up_cand
== primary
->first
)
1770 const pg_info_t
&cur_info
= all_info
.find(up_cand
)->second
;
1771 if (cur_info
.is_incomplete() ||
1772 cur_info
.last_update
< oldest_auth_log_entry
) {
1773 ss
<< " shard " << up_cand
<< " (up) backfill " << cur_info
<< std::endl
;
1774 backfill
->insert(up_cand
);
1775 acting_backfill
->insert(up_cand
);
1778 acting_backfill
->insert(up_cand
);
1779 ss
<< " osd." << i
<< " (up) accepted " << cur_info
<< std::endl
;
1783 if (want
->size() >= size
) {
1787 std::vector
<std::pair
<eversion_t
, int>> candidate_by_last_update
;
1788 candidate_by_last_update
.reserve(acting
.size());
1789 // This no longer has backfill OSDs, but they are covered above.
1790 for (auto i
: acting
) {
1791 pg_shard_t
acting_cand(i
, shard_id_t::NO_SHARD
);
1792 // skip up osds we already considered above
1793 if (acting_cand
== primary
->first
)
1795 auto up_it
= find(up
.begin(), up
.end(), i
);
1796 if (up_it
!= up
.end())
1799 const pg_info_t
&cur_info
= all_info
.find(acting_cand
)->second
;
1800 if (cur_info
.is_incomplete() ||
1801 cur_info
.last_update
< oldest_auth_log_entry
) {
1802 ss
<< " shard " << acting_cand
<< " (acting) REJECTED "
1803 << cur_info
<< std::endl
;
1805 candidate_by_last_update
.emplace_back(cur_info
.last_update
, i
);
1809 auto sort_by_eversion
=[](const std::pair
<eversion_t
, int> &lhs
,
1810 const std::pair
<eversion_t
, int> &rhs
) {
1811 return lhs
.first
> rhs
.first
;
1813 // sort by last_update, in descending order.
1814 std::sort(candidate_by_last_update
.begin(),
1815 candidate_by_last_update
.end(), sort_by_eversion
);
1816 for (auto &p
: candidate_by_last_update
) {
1817 ceph_assert(want
->size() < size
);
1818 want
->push_back(p
.second
);
1819 pg_shard_t s
= pg_shard_t(p
.second
, shard_id_t::NO_SHARD
);
1820 acting_backfill
->insert(s
);
1821 ss
<< " shard " << s
<< " (acting) accepted "
1822 << all_info
.find(s
)->second
<< std::endl
;
1823 if (want
->size() >= size
) {
1828 if (restrict_to_up_acting
) {
1831 candidate_by_last_update
.clear();
1832 candidate_by_last_update
.reserve(all_info
.size()); // overestimate but fine
1833 // continue to search stray to find more suitable peers
1834 for (auto &i
: all_info
) {
1835 // skip up osds we already considered above
1836 if (i
.first
== primary
->first
)
1838 auto up_it
= find(up
.begin(), up
.end(), i
.first
.osd
);
1839 if (up_it
!= up
.end())
1841 auto acting_it
= find(
1842 acting
.begin(), acting
.end(), i
.first
.osd
);
1843 if (acting_it
!= acting
.end())
1846 if (i
.second
.is_incomplete() ||
1847 i
.second
.last_update
< oldest_auth_log_entry
) {
1848 ss
<< " shard " << i
.first
<< " (stray) REJECTED " << i
.second
1851 candidate_by_last_update
.emplace_back(
1852 i
.second
.last_update
, i
.first
.osd
);
1856 if (candidate_by_last_update
.empty()) {
1857 // save us some effort
1861 // sort by last_update, in descending order.
1862 std::sort(candidate_by_last_update
.begin(),
1863 candidate_by_last_update
.end(), sort_by_eversion
);
1865 for (auto &p
: candidate_by_last_update
) {
1866 ceph_assert(want
->size() < size
);
1867 want
->push_back(p
.second
);
1868 pg_shard_t s
= pg_shard_t(p
.second
, shard_id_t::NO_SHARD
);
1869 acting_backfill
->insert(s
);
1870 ss
<< " shard " << s
<< " (stray) accepted "
1871 << all_info
.find(s
)->second
<< std::endl
;
1872 if (want
->size() >= size
) {
1878 // Defines osd preference order: acting set, then larger last_update
1879 using osd_ord_t
= std::tuple
<bool, eversion_t
>; // <acting, last_update>
1880 using osd_id_t
= int;
1882 class bucket_candidates_t
{
1883 std::deque
<std::pair
<osd_ord_t
, osd_id_t
>> osds
;
1887 void add_osd(osd_ord_t ord
, osd_id_t osd
) {
1888 // osds will be added in smallest to largest order
1889 assert(osds
.empty() || osds
.back().first
<= ord
);
1890 osds
.push_back(std::make_pair(ord
, osd
));
1892 osd_id_t
pop_osd() {
1893 ceph_assert(!is_empty());
1894 auto ret
= osds
.back();
1899 void inc_selected() { selected
++; }
1900 unsigned get_num_selected() const { return selected
; }
1902 osd_ord_t
get_ord() const {
1903 return osds
.empty() ? std::make_tuple(false, eversion_t())
1904 : osds
.back().first
;
1907 bool is_empty() const { return osds
.empty(); }
1909 bool operator<(const bucket_candidates_t
&rhs
) const {
1910 return std::make_tuple(-selected
, get_ord()) <
1911 std::make_tuple(-rhs
.selected
, rhs
.get_ord());
1914 friend std::ostream
&operator<<(std::ostream
&, const bucket_candidates_t
&);
1917 std::ostream
&operator<<(std::ostream
&lhs
, const bucket_candidates_t
&cand
)
1919 return lhs
<< "candidates[" << cand
.osds
<< "]";
1922 class bucket_heap_t
{
1923 using elem_t
= std::reference_wrapper
<bucket_candidates_t
>;
1924 std::vector
<elem_t
> heap
;
1926 // Max heap -- should emit buckets in order of preference
1928 bool operator()(const elem_t
&lhs
, const elem_t
&rhs
) {
1929 return lhs
.get() < rhs
.get();
1933 void push_if_nonempty(elem_t e
) {
1934 if (!e
.get().is_empty()) {
1936 std::push_heap(heap
.begin(), heap
.end(), comp());
1940 std::pop_heap(heap
.begin(), heap
.end(), comp());
1941 auto ret
= heap
.back();
1946 bool is_empty() const { return heap
.empty(); }
1950 * calc_replicated_acting_stretch
1952 * Choose an acting set using as much of the up set as possible; filling
1953 * in the remaining slots so as to maximize the number of crush buckets at
1954 * level pool.info.peering_crush_bucket_barrier represented.
1956 * Stretch clusters are a bit special: while they have a "size" the
1957 * same way as normal pools, if we happen to lose a data center
1958 * (we call it a "stretch bucket", but really it'll be a data center or
1959 * a cloud availability zone), we don't actually want to shove
1960 * 2 DC's worth of replication into a single site -- it won't fit!
1961 * So we locally calculate a bucket_max, based
1962 * on the targeted number of stretch buckets for the pool and
1963 * its size. Then we won't pull more than bucket_max from any
1964 * given ancestor even if it leaves us undersized.
1966 * There are two distinct phases: (commented below)
1968 void PeeringState::calc_replicated_acting_stretch(
1969 map
<pg_shard_t
, pg_info_t
>::const_iterator primary
,
1970 eversion_t oldest_auth_log_entry
,
1972 const vector
<int> &acting
,
1973 const vector
<int> &up
,
1974 pg_shard_t up_primary
,
1975 const map
<pg_shard_t
, pg_info_t
> &all_info
,
1976 bool restrict_to_up_acting
,
1978 set
<pg_shard_t
> *backfill
,
1979 set
<pg_shard_t
> *acting_backfill
,
1980 const OSDMapRef osdmap
,
1985 ceph_assert(acting_backfill
);
1986 ceph_assert(backfill
);
1987 ss
<< __func__
<< (restrict_to_up_acting
? " restrict_to_up_acting" : "")
1990 auto used
= [want
](int osd
) {
1991 return std::find(want
->begin(), want
->end(), osd
) != want
->end();
1994 auto usable_info
= [&](const auto &cur_info
) mutable {
1995 return !(cur_info
.is_incomplete() ||
1996 cur_info
.last_update
< oldest_auth_log_entry
);
1999 auto osd_info
= [&](int osd
) mutable -> const pg_info_t
& {
2000 pg_shard_t cand
= pg_shard_t(osd
, shard_id_t::NO_SHARD
);
2001 const pg_info_t
&cur_info
= all_info
.find(cand
)->second
;
2005 auto usable_osd
= [&](int osd
) mutable {
2006 return usable_info(osd_info(osd
));
2009 std::map
<int, bucket_candidates_t
> ancestors
;
2010 auto get_ancestor
= [&](int osd
) mutable {
2011 int ancestor
= osdmap
->crush
->get_parent_of_type(
2013 pool
.info
.peering_crush_bucket_barrier
,
2014 pool
.info
.crush_rule
);
2015 return &ancestors
[ancestor
];
2018 unsigned bucket_max
= pool
.info
.size
/ pool
.info
.peering_crush_bucket_target
;
2019 if (bucket_max
* pool
.info
.peering_crush_bucket_target
< pool
.info
.size
) {
2023 /* 1) Select all usable osds from the up set as well as the primary
2025 * We also stash any unusable osds from up into backfill.
2027 auto add_required
= [&](int osd
) {
2029 want
->push_back(osd
);
2030 acting_backfill
->insert(
2031 pg_shard_t(osd
, shard_id_t::NO_SHARD
));
2032 get_ancestor(osd
)->inc_selected();
2035 add_required(primary
->first
.osd
);
2036 ss
<< " osd " << primary
->first
.osd
<< " primary accepted "
2037 << osd_info(primary
->first
.osd
) << std::endl
;
2038 for (auto upcand
: up
) {
2039 auto upshard
= pg_shard_t(upcand
, shard_id_t::NO_SHARD
);
2040 auto &curinfo
= osd_info(upcand
);
2041 if (usable_osd(upcand
)) {
2042 ss
<< " osd " << upcand
<< " (up) accepted " << curinfo
<< std::endl
;
2043 add_required(upcand
);
2045 ss
<< " osd " << upcand
<< " (up) backfill " << curinfo
<< std::endl
;
2046 backfill
->insert(upshard
);
2047 acting_backfill
->insert(upshard
);
2051 if (want
->size() >= pool
.info
.size
) { // non-failed CRUSH mappings are valid
2052 ss
<< " up set sufficient" << std::endl
;
2055 ss
<< " up set insufficient, considering remaining osds" << std::endl
;
2057 /* 2) Fill out remaining slots from usable osds in all_info
2058 * while maximizing the number of ancestor nodes at the
2059 * barrier_id crush level.
2062 std::vector
<std::pair
<osd_ord_t
, osd_id_t
>> candidates
;
2063 /* To do this, we first filter the set of usable osd into an ordered
2064 * list of usable osds
2066 auto get_osd_ord
= [&](bool is_acting
, const pg_info_t
&info
) -> osd_ord_t
{
2067 return std::make_tuple(
2068 !is_acting
/* acting should sort first */,
2071 for (auto &cand
: acting
) {
2072 auto &cand_info
= osd_info(cand
);
2073 if (!used(cand
) && usable_info(cand_info
)) {
2074 ss
<< " acting candidate " << cand
<< " " << cand_info
<< std::endl
;
2075 candidates
.push_back(std::make_pair(get_osd_ord(true, cand_info
), cand
));
2078 if (!restrict_to_up_acting
) {
2079 for (auto &[cand
, info
] : all_info
) {
2080 if (!used(cand
.osd
) && usable_info(info
) &&
2081 (std::find(acting
.begin(), acting
.end(), cand
.osd
)
2083 ss
<< " other candidate " << cand
<< " " << info
<< std::endl
;
2084 candidates
.push_back(
2085 std::make_pair(get_osd_ord(false, info
), cand
.osd
));
2089 std::sort(candidates
.begin(), candidates
.end());
2091 // We then filter these candidates by ancestor
2092 std::for_each(candidates
.begin(), candidates
.end(), [&](auto cand
) {
2093 get_ancestor(cand
.second
)->add_osd(cand
.first
, cand
.second
);
2097 auto pop_ancestor
= [&](auto &ancestor
) {
2098 ceph_assert(!ancestor
.is_empty());
2099 auto osd
= ancestor
.pop_osd();
2101 ss
<< " accepting candidate " << osd
<< std::endl
;
2103 ceph_assert(!used(osd
));
2104 ceph_assert(usable_osd(osd
));
2106 want
->push_back(osd
);
2107 acting_backfill
->insert(
2108 pg_shard_t(osd
, shard_id_t::NO_SHARD
));
2109 ancestor
.inc_selected();
2112 /* Next, we use the ancestors map to grab a descendant of the
2113 * peering_crush_mandatory_member if not already represented.
2115 * TODO: using 0 here to match other users. Prior to merge, I
2116 * expect that this and other users should instead check against
2119 if (pool
.info
.peering_crush_mandatory_member
!= CRUSH_ITEM_NONE
) {
2120 auto aiter
= ancestors
.find(pool
.info
.peering_crush_mandatory_member
);
2121 if (aiter
!= ancestors
.end() &&
2122 !aiter
->second
.get_num_selected()) {
2123 ss
<< " adding required ancestor " << aiter
->first
<< std::endl
;
2124 ceph_assert(!aiter
->second
.is_empty()); // wouldn't exist otherwise
2125 pop_ancestor(aiter
->second
);
2129 /* We then place the ancestors in a heap ordered by fewest selected
2130 * and then by the ordering token of the next osd */
2131 bucket_heap_t aheap
;
2132 std::for_each(ancestors
.begin(), ancestors
.end(), [&](auto &anc
) {
2133 aheap
.push_if_nonempty(anc
.second
);
2136 /* and pull from this heap until it's empty or we have enough.
2137 * "We have enough" is a sufficient check here for
2138 * stretch_set_can_peer() because our heap sorting always
2139 * pulls from ancestors with the least number of included OSDs,
2140 * so if it is possible to satisfy the bucket_count constraints we
2143 while (!aheap
.is_empty() && want
->size() < pool
.info
.size
) {
2144 auto next
= aheap
.pop();
2145 pop_ancestor(next
.get());
2146 if (next
.get().get_num_selected() < bucket_max
) {
2147 aheap
.push_if_nonempty(next
);
2151 /* The end result is that we should have as many buckets covered as
2152 * possible while respecting up, the primary selection,
2153 * the pool size (given bucket count constraints),
2154 * and the mandatory member.
2159 bool PeeringState::recoverable(const vector
<int> &want
) const
2161 unsigned num_want_acting
= 0;
2162 set
<pg_shard_t
> have
;
2163 for (int i
= 0; i
< (int)want
.size(); ++i
) {
2164 if (want
[i
] != CRUSH_ITEM_NONE
) {
2169 pool
.info
.is_erasure() ? shard_id_t(i
) : shard_id_t::NO_SHARD
));
2173 if (num_want_acting
< pool
.info
.min_size
) {
2174 const bool recovery_ec_pool_below_min_size
=
2175 HAVE_FEATURE(get_osdmap()->get_up_osd_features(), SERVER_OCTOPUS
);
2177 if (pool
.info
.is_erasure() && !recovery_ec_pool_below_min_size
) {
2178 psdout(10) << __func__
<< " failed, ec recovery below min size not supported by pre-octopus" << dendl
;
2180 } else if (!cct
->_conf
.get_val
<bool>("osd_allow_recovery_below_min_size")) {
2181 psdout(10) << __func__
<< " failed, recovery below min size not enabled" << dendl
;
2185 if (missing_loc
.get_recoverable_predicate()(have
)) {
2188 psdout(10) << __func__
<< " failed, not recoverable " << dendl
;
2193 void PeeringState::choose_async_recovery_ec(
2194 const map
<pg_shard_t
, pg_info_t
> &all_info
,
2195 const pg_info_t
&auth_info
,
2197 set
<pg_shard_t
> *async_recovery
,
2198 const OSDMapRef osdmap
) const
2200 set
<pair
<int, pg_shard_t
> > candidates_by_cost
;
2201 for (uint8_t i
= 0; i
< want
->size(); ++i
) {
2202 if ((*want
)[i
] == CRUSH_ITEM_NONE
)
2205 // Considering log entries to recover is accurate enough for
2206 // now. We could use minimum_to_decode_with_cost() later if
2208 pg_shard_t
shard_i((*want
)[i
], shard_id_t(i
));
2209 // do not include strays
2210 if (stray_set
.find(shard_i
) != stray_set
.end())
2212 // Do not include an osd that is not up, since choosing it as
2213 // an async_recovery_target will move it out of the acting set.
2214 // This results in it being identified as a stray during peering,
2215 // because it is no longer in the up or acting set.
2216 if (!is_up(shard_i
))
2218 auto shard_info
= all_info
.find(shard_i
)->second
;
2219 // for ec pools we rollback all entries past the authoritative
2220 // last_update *before* activation. This is relatively inexpensive
2221 // compared to recovery, since it is purely local, so treat shards
2222 // past the authoritative last_update the same as those equal to it.
2223 version_t auth_version
= auth_info
.last_update
.version
;
2224 version_t candidate_version
= shard_info
.last_update
.version
;
2225 if (HAVE_FEATURE(osdmap
->get_up_osd_features(), SERVER_NAUTILUS
)) {
2226 auto approx_missing_objects
=
2227 shard_info
.stats
.stats
.sum
.num_objects_missing
;
2228 if (auth_version
> candidate_version
) {
2229 approx_missing_objects
+= auth_version
- candidate_version
;
2231 if (static_cast<uint64_t>(approx_missing_objects
) >
2232 cct
->_conf
.get_val
<uint64_t>("osd_async_recovery_min_cost")) {
2233 candidates_by_cost
.emplace(approx_missing_objects
, shard_i
);
2236 if (auth_version
> candidate_version
&&
2237 (auth_version
- candidate_version
) > cct
->_conf
.get_val
<uint64_t>("osd_async_recovery_min_cost")) {
2238 candidates_by_cost
.insert(make_pair(auth_version
- candidate_version
, shard_i
));
2243 psdout(20) << __func__
<< " candidates by cost are: " << candidates_by_cost
2246 // take out as many osds as we can for async recovery, in order of cost
2247 for (auto rit
= candidates_by_cost
.rbegin();
2248 rit
!= candidates_by_cost
.rend(); ++rit
) {
2249 pg_shard_t cur_shard
= rit
->second
;
2250 vector
<int> candidate_want(*want
);
2251 candidate_want
[cur_shard
.shard
.id
] = CRUSH_ITEM_NONE
;
2252 if (recoverable(candidate_want
)) {
2253 want
->swap(candidate_want
);
2254 async_recovery
->insert(cur_shard
);
2257 psdout(20) << __func__
<< " result want=" << *want
2258 << " async_recovery=" << *async_recovery
<< dendl
;
// Choose async-recovery targets for a replicated pool: shards in *want
// whose estimated recovery cost exceeds osd_async_recovery_min_cost are
// moved out of *want and into *async_recovery, as long as *want stays
// above the pool's min_size and (for stretch pools) the reduced set can
// still peer per stretch_set_can_peer().
// NOTE(review): this extraction appears to have dropped structural lines
// (braces / continue / else); comments describe only the visible logic.
2261 void PeeringState::choose_async_recovery_replicated(
2262 const map
<pg_shard_t
, pg_info_t
> &all_info
,
2263 const pg_info_t
&auth_info
,
2265 set
<pg_shard_t
> *async_recovery
,
2266 const OSDMapRef osdmap
) const
// candidates ordered by cost so we can evict from the most expensive end
2268 set
<pair
<int, pg_shard_t
> > candidates_by_cost
;
2269 for (auto osd_num
: *want
) {
2270 pg_shard_t
shard_i(osd_num
, shard_id_t::NO_SHARD
);
2271 // do not include strays
2272 if (stray_set
.find(shard_i
) != stray_set
.end())
2274 // Do not include an osd that is not up, since choosing it as
2275 // an async_recovery_target will move it out of the acting set.
2276 // This results in it being identified as a stray during peering,
2277 // because it is no longer in the up or acting set.
2278 if (!is_up(shard_i
))
2280 auto shard_info
= all_info
.find(shard_i
)->second
;
2281 // use the approximate magnitude of the difference in length of
2282 // logs plus historical missing objects as the cost of recovery
2283 version_t auth_version
= auth_info
.last_update
.version
;
2284 version_t candidate_version
= shard_info
.last_update
.version
;
// Nautilus+ clusters carry num_objects_missing in the stats, so the
// cost estimate can include historical missing objects.
2285 if (HAVE_FEATURE(osdmap
->get_up_osd_features(), SERVER_NAUTILUS
)) {
2286 auto approx_missing_objects
=
2287 shard_info
.stats
.stats
.sum
.num_objects_missing
;
2288 if (auth_version
> candidate_version
) {
2289 approx_missing_objects
+= auth_version
- candidate_version
;
2291 approx_missing_objects
+= candidate_version
- auth_version
;
2293 if (static_cast<uint64_t>(approx_missing_objects
) >
2294 cct
->_conf
.get_val
<uint64_t>("osd_async_recovery_min_cost")) {
2295 candidates_by_cost
.emplace(approx_missing_objects
, shard_i
);
// pre-Nautilus fallback: cost is just the log-length difference
2298 size_t approx_entries
;
2299 if (auth_version
> candidate_version
) {
2300 approx_entries
= auth_version
- candidate_version
;
2302 approx_entries
= candidate_version
- auth_version
;
2304 if (approx_entries
> cct
->_conf
.get_val
<uint64_t>("osd_async_recovery_min_cost")) {
2305 candidates_by_cost
.insert(make_pair(approx_entries
, shard_i
));
2310 psdout(20) << __func__
<< " candidates by cost are: " << candidates_by_cost
2312 // take out as many osds as we can for async recovery, in order of cost
2313 for (auto rit
= candidates_by_cost
.rbegin();
2314 rit
!= candidates_by_cost
.rend(); ++rit
) {
// never shrink the acting set below the pool's min_size
2315 if (want
->size() <= pool
.info
.min_size
) {
2318 pg_shard_t cur_shard
= rit
->second
;
2319 vector
<int> candidate_want(*want
);
2320 for (auto it
= candidate_want
.begin(); it
!= candidate_want
.end(); ++it
) {
2321 if (*it
== cur_shard
.osd
) {
2322 candidate_want
.erase(it
);
2323 if (pool
.info
.stretch_set_can_peer(candidate_want
, *osdmap
, NULL
)) {
2324 // if we're in stretch mode, we can only remove the osd if it doesn't
2325 // break peering limits.
2326 want
->swap(candidate_want
);
2327 async_recovery
->insert(cur_shard
);
2334 psdout(20) << __func__
<< " result want=" << *want
2335 << " async_recovery=" << *async_recovery
<< dendl
;
2341 * calculate the desired acting, and request a change with the monitor
2342 * if it differs from the current acting.
2344 * if restrict_to_up_acting=true, we filter out anything that's not in
2345 * up/acting. in order to lift this restriction, we need to
2346 * 1) check whether it's worth switching the acting set any time we get
2347 * a new pg info (not just here, when recovery finishes)
2348 * 2) check whether anything in want_acting went down on each new map
2349 * (and, if so, calculate a new want_acting)
2350 * 3) remove the assertion in PG::PeeringState::Active::react(const AdvMap)
// Compute the desired acting set for this PG: find the authoritative log
// shard via find_best_info(), build the wanted acting/backfill/async
// recovery sets (replicated, stretch, or EC path), and request a pg_temp
// change from the monitor if the wanted set differs from current acting.
// Returns via auth_log_shard_id; see the preceding file comment for the
// restrict_to_up_acting semantics.
// NOTE(review): extraction has dropped some lines (returns/braces and
// several argument lines of the calc_* calls); comments describe only the
// visible logic.
2353 bool PeeringState::choose_acting(pg_shard_t
&auth_log_shard_id
,
2354 bool restrict_to_up_acting
,
2355 bool *history_les_bound
,
2356 bool request_pg_temp_change_only
)
// merge our own info with every peer's info into one map keyed by shard
2358 map
<pg_shard_t
, pg_info_t
> all_info(peer_info
.begin(), peer_info
.end());
2359 all_info
[pg_whoami
] = info
;
2361 if (cct
->_conf
->subsys
.should_gather
<dout_subsys
, 10>()) {
2362 for (auto p
= all_info
.begin(); p
!= all_info
.end(); ++p
) {
2363 psdout(10) << __func__
<< " all_info osd." << p
->first
<< " "
2364 << p
->second
<< dendl
;
2368 auto auth_log_shard
= find_best_info(all_info
, restrict_to_up_acting
,
// no authoritative info available: clear pg_temp and bail out
2371 if (auth_log_shard
== all_info
.end()) {
2373 psdout(10) << __func__
<< " no suitable info found (incomplete backfills?),"
2374 << " reverting to up" << dendl
;
2377 pl
->queue_want_pg_temp(empty
);
2379 psdout(10) << __func__
<< " failed" << dendl
;
2380 ceph_assert(want_acting
.empty());
2385 ceph_assert(!auth_log_shard
->second
.is_incomplete());
2386 auth_log_shard_id
= auth_log_shard
->first
;
2388 set
<pg_shard_t
> want_backfill
, want_acting_backfill
;
// replicated pools pick a primary first, then compute the acting set
// (stretch pools use the stretch-aware variant)
2391 if (pool
.info
.is_replicated()) {
2392 auto [primary_shard
, oldest_log
] = select_replicated_primary(
2394 cct
->_conf
.get_val
<uint64_t>(
2395 "osd_force_auth_primary_missing_objects"),
2401 if (pool
.info
.is_stretch_pool()) {
2402 calc_replicated_acting_stretch(
2405 get_osdmap()->get_pg_size(info
.pgid
.pgid
),
2410 restrict_to_up_acting
,
2413 &want_acting_backfill
,
2418 calc_replicated_acting(
2421 get_osdmap()->get_pg_size(info
.pgid
.pgid
),
2426 restrict_to_up_acting
,
2429 &want_acting_backfill
,
// (EC path: calc_ec_acting — argument lines lost in extraction)
2437 get_osdmap()->get_pg_size(info
.pgid
.pgid
),
2441 restrict_to_up_acting
,
2444 &want_acting_backfill
,
2447 psdout(10) << ss
.str() << dendl
;
2449 if (!recoverable(want
)) {
2450 want_acting
.clear();
// Mimic+ clusters may hand some shards to async recovery instead of
// keeping them in the acting set.
2454 set
<pg_shard_t
> want_async_recovery
;
2455 if (HAVE_FEATURE(get_osdmap()->get_up_osd_features(), SERVER_MIMIC
)) {
2456 if (pool
.info
.is_erasure()) {
2457 choose_async_recovery_ec(
2458 all_info
, auth_log_shard
->second
, &want
, &want_async_recovery
,
2461 choose_async_recovery_replicated(
2462 all_info
, auth_log_shard
->second
, &want
, &want_async_recovery
,
2466 while (want
.size() > pool
.info
.size
) {
2467 // async recovery should have taken out as many osds as it can.
2468 // if not, then always evict the last peer
2469 // (will get synchronously recovered later)
2470 psdout(10) << __func__
<< " evicting osd." << want
.back()
2471 << " from oversized want " << want
<< dendl
;
2474 if (want
!= acting
) {
2475 psdout(10) << __func__
<< " want " << want
<< " != acting " << acting
2476 << ", requesting pg_temp change" << dendl
;
2479 if (!cct
->_conf
->osd_debug_no_acting_change
) {
2480 if (want_acting
== up
) {
2481 // There can't be any pending backfill if
2482 // want is the same as crush map up OSDs.
2483 ceph_assert(want_backfill
.empty());
2485 pl
->queue_want_pg_temp(empty
);
2487 pl
->queue_want_pg_temp(want
);
2492 if (request_pg_temp_change_only
)
2494 want_acting
.clear();
2495 acting_recovery_backfill
= want_acting_backfill
;
2496 psdout(10) << "acting_recovery_backfill is "
2497 << acting_recovery_backfill
<< dendl
;
2499 backfill_targets
.empty() ||
2500 backfill_targets
== want_backfill
);
2501 if (backfill_targets
.empty()) {
2502 // Caller is GetInfo
2503 backfill_targets
= want_backfill
;
2505 // Adding !needs_recovery() to let the async_recovery_targets reset after recovery is complete
2507 async_recovery_targets
.empty() ||
2508 async_recovery_targets
== want_async_recovery
||
2510 if (async_recovery_targets
.empty() || !needs_recovery()) {
2511 async_recovery_targets
= want_async_recovery
;
2513 // Will not change if already set because up would have had to change
2514 // Verify that nothing in backfill is in stray_set
2515 for (auto i
= want_backfill
.begin(); i
!= want_backfill
.end(); ++i
) {
2516 ceph_assert(stray_set
.find(*i
) == stray_set
.end());
2518 psdout(10) << "choose_acting want=" << want
<< " backfill_targets="
2519 << want_backfill
<< " async_recovery_targets="
2520 << async_recovery_targets
<< dendl
;
// Sanity-check the PG log against pg_info and report inconsistencies to
// the cluster log (clog) as errors: tail/head vs info.log_tail /
// info.last_update, log entry bounds, and caller_ops vs log size.
// Diagnostic only — visible code performs no repairs.
2524 void PeeringState::log_weirdness()
2526 if (pg_log
.get_tail() != info
.log_tail
)
2527 pl
->get_clog_error() << info
.pgid
2528 << " info mismatch, log.tail " << pg_log
.get_tail()
2529 << " != info.log_tail " << info
.log_tail
;
2530 if (pg_log
.get_head() != info
.last_update
)
2531 pl
->get_clog_error() << info
.pgid
2532 << " info mismatch, log.head " << pg_log
.get_head()
2533 << " != info.last_update " << info
.last_update
;
2535 if (!pg_log
.get_log().empty()) {
// the first log entry must be strictly newer than the tail
2537 if ((pg_log
.get_log().log
.begin()->version
<= pg_log
.get_tail()))
2538 pl
->get_clog_error() << info
.pgid
2539 << " log bound mismatch, info (tail,head] ("
2540 << pg_log
.get_tail() << ","
2541 << pg_log
.get_head() << "]"
2543 << pg_log
.get_log().log
.begin()->version
<< ","
2544 << pg_log
.get_log().log
.rbegin()->version
<< "]";
// caller_ops indexes log entries, so it can never be larger than the log
2547 if (pg_log
.get_log().caller_ops
.size() > pg_log
.get_log().log
.size()) {
2548 pl
->get_clog_error() << info
.pgid
2549 << " caller_ops.size "
2550 << pg_log
.get_log().caller_ops
.size()
2551 << " > log size " << pg_log
.get_log().log
.size();
2556 * Process information from a replica to determine if it could have any
2557 * objects that i need.
2559 * TODO: if the missing set becomes very large, this could get expensive.
2560 * Instead, we probably want to just iterate over our unfound set.
// Feed a replica's info+missing into missing_loc to see whether it can
// serve as a recovery source for objects we are missing. Publishes stats
// if the unfound count changed, and returns whether anything new was
// found. See the preceding file comment for the TODO about large
// missing sets.
2562 bool PeeringState::search_for_missing(
2563 const pg_info_t
&oinfo
, const pg_missing_t
&omissing
,
2565 PeeringCtxWrapper
&ctx
)
2567 uint64_t num_unfound_before
= missing_loc
.num_unfound();
2568 bool found_missing
= missing_loc
.add_source_info(
2569 from
, oinfo
, omissing
, ctx
.handle
);
2570 if (found_missing
&& num_unfound_before
!= missing_loc
.num_unfound())
2571 pl
->publish_stats_to_osd();
2572 // avoid doing this if the peer is empty. This is a bit of paranoia
2573 // to avoid doing something rash if add_source_info() above
2574 // incorrectly decided we found something new. (if the peer has
2575 // last_update=0'0 that's impossible.)
2576 if (found_missing
&&
2577 oinfo
.last_update
!= eversion_t()) {
// re-address the peer's info to our shard before notifying
2578 pg_info_t
tinfo(oinfo
);
2579 tinfo
.pgid
.shard
= pg_whoami
.shard
;
2582 spg_t(info
.pgid
.pgid
, from
.shard
),
2583 get_osdmap_epoch(), // fixme: use lower epoch?
2587 return found_missing
;
// Walk might_have_unfound and queue a FULLLOG pg_query_t (via rctx) to
// every peer that could hold objects we are missing, skipping peers that
// are down, purged, empty, or already queried. Return value indicates
// whether any queries were started (see `any` below).
// NOTE(review): extraction dropped some lines (continue/closing braces
// and the final send/return); comments describe only the visible logic.
2590 bool PeeringState::discover_all_missing(
2591 BufferedRecoveryMessages
&rctx
)
2593 auto &missing
= pg_log
.get_missing();
2594 uint64_t unfound
= get_num_unfound();
2595 bool any
= false; // did we start any queries
2597 psdout(10) << __func__
<< " "
2598 << missing
.num_missing() << " missing, "
2599 << unfound
<< " unfound"
2602 auto m
= might_have_unfound
.begin();
2603 auto mend
= might_have_unfound
.end();
2604 for (; m
!= mend
; ++m
) {
2605 pg_shard_t
peer(*m
);
2607 if (!get_osdmap()->is_up(peer
.osd
)) {
2608 psdout(20) << __func__
<< " skipping down osd." << peer
<< dendl
;
2612 if (peer_purged
.count(peer
)) {
2613 psdout(20) << __func__
<< " skipping purged osd." << peer
<< dendl
;
2617 auto iter
= peer_info
.find(peer
);
2618 if (iter
!= peer_info
.end() &&
2619 (iter
->second
.is_empty() || iter
->second
.dne())) {
2620 // ignore empty peers
2624 // If we've requested any of this stuff, the pg_missing_t information
2625 // should be on its way.
2626 // TODO: coalesce requested_* into a single data structure
2627 if (peer_missing
.find(peer
) != peer_missing
.end()) {
2628 psdout(20) << __func__
<< ": osd." << peer
2629 << ": we already have pg_missing_t" << dendl
;
2632 if (peer_log_requested
.find(peer
) != peer_log_requested
.end()) {
2633 psdout(20) << __func__
<< ": osd." << peer
2634 << ": in peer_log_requested" << dendl
;
2637 if (peer_missing_requested
.find(peer
) != peer_missing_requested
.end()) {
2638 psdout(20) << __func__
<< ": osd." << peer
2639 << ": in peer_missing_requested" << dendl
;
// not yet queried: record the request and queue a FULLLOG query
2644 psdout(10) << __func__
<< ": osd." << peer
<< ": requesting pg_missing_t"
2646 peer_missing_requested
.insert(peer
);
2649 spg_t(info
.pgid
.pgid
, peer
.shard
),
2651 pg_query_t::FULLLOG
,
2652 peer
.shard
, pg_whoami
.shard
,
2653 info
.history
, get_osdmap_epoch()));
2659 /* Build the might_have_unfound set.
2661 * This is used by the primary OSD during recovery.
2663 * This set tracks the OSDs which might have unfound objects that the primary
2664 * OSD needs. As we receive pg_missing_t from each OSD in might_have_unfound, we
2665 * will remove the OSD from the set.
// Primary-only: (re)build might_have_unfound from past intervals plus all
// currently-known (stray) peers. Must start from an empty set; see the
// preceding file comment for how the set is consumed during recovery.
2667 void PeeringState::build_might_have_unfound()
2669 ceph_assert(might_have_unfound
.empty());
2670 ceph_assert(is_primary());
2672 psdout(10) << __func__
<< dendl
;
2674 check_past_interval_bounds();
2676 might_have_unfound
= past_intervals
.get_might_have_unfound(
2678 pool
.info
.is_erasure());
2680 // include any (stray) peers
2681 for (auto p
= peer_info
.begin(); p
!= peer_info
.end(); ++p
)
2682 might_have_unfound
.insert(p
->first
);
2684 psdout(15) << __func__
<< ": built " << might_have_unfound
<< dendl
;
// Transition this PG toward active: update last_epoch_started /
// last_interval_started, initialize the complete pointer and snap_trimq,
// send MOSDPGLog (or queue an info/activate) to every peer in
// acting_recovery_backfill — starting backfill where a peer's log is not
// contiguous with ours — then set up missing_loc, rebuild
// might_have_unfound if recovery is needed, and set ACTIVATING state.
// NOTE(review): extraction dropped many structural lines (braces, else,
// several statements); comments describe only the visible logic.
2687 void PeeringState::activate(
2688 ObjectStore::Transaction
& t
,
2689 epoch_t activation_epoch
,
2690 PeeringCtxWrapper
&ctx
)
2692 ceph_assert(!is_peered());
2695 state_clear(PG_STATE_DOWN
);
2697 send_notify
= false;
2700 // only update primary last_epoch_started if we will go active
2701 if (acting_set_writeable()) {
2702 ceph_assert(cct
->_conf
->osd_find_best_info_ignore_history_les
||
2703 info
.last_epoch_started
<= activation_epoch
);
2704 info
.last_epoch_started
= activation_epoch
;
2705 info
.last_interval_started
= info
.history
.same_interval_since
;
2707 } else if (is_acting(pg_whoami
)) {
2708 /* update last_epoch_started on acting replica to whatever the primary sent
2709 * unless it's smaller (could happen if we are going peered rather than
2710 * active, see doc/dev/osd_internals/last_epoch_started.rst) */
2711 if (info
.last_epoch_started
< activation_epoch
) {
2712 info
.last_epoch_started
= activation_epoch
;
2713 info
.last_interval_started
= info
.history
.same_interval_since
;
2717 auto &missing
= pg_log
.get_missing();
2719 min_last_complete_ondisk
= eversion_t(0,0); // we don't know (yet)!
2721 last_update_ondisk
= info
.last_update
;
2723 last_update_applied
= info
.last_update
;
2724 last_rollback_info_trimmed_to_applied
= pg_log
.get_can_rollback_to();
2726 need_up_thru
= false;
2728 // write pg info, log
2730 dirty_big_info
= true; // maybe
// queue the activation-complete event for when the transaction commits
2732 pl
->schedule_event_on_commit(
2734 std::make_shared
<PGPeeringEvent
>(
2739 activation_epoch
)));
2741 // init complete pointer
2742 if (missing
.num_missing() == 0) {
2743 psdout(10) << "activate - no missing, moving last_complete " << info
.last_complete
2744 << " -> " << info
.last_update
<< dendl
;
2745 info
.last_complete
= info
.last_update
;
2746 info
.stats
.stats
.sum
.num_objects_missing
= 0;
2747 pg_log
.reset_recovery_pointers();
2749 psdout(10) << "activate - not complete, " << missing
<< dendl
;
2750 info
.stats
.stats
.sum
.num_objects_missing
= missing
.num_missing();
2751 pg_log
.activate_not_complete(info
);
2757 // initialize snap_trimq
2758 interval_set
<snapid_t
> to_trim
;
2759 auto& removed_snaps_queue
= get_osdmap()->get_removed_snaps_queue();
2760 auto p
= removed_snaps_queue
.find(info
.pgid
.pgid
.pool());
2761 if (p
!= removed_snaps_queue
.end()) {
2762 dout(20) << "activate - purged_snaps " << info
.purged_snaps
2763 << " removed_snaps " << p
->second
2765 for (auto q
: p
->second
) {
2766 to_trim
.insert(q
.first
, q
.second
);
// drop from to_trim anything we have already purged locally
2769 interval_set
<snapid_t
> purged
;
2770 purged
.intersection_of(to_trim
, info
.purged_snaps
);
2771 to_trim
.subtract(purged
);
2773 if (HAVE_FEATURE(upacting_features
, SERVER_OCTOPUS
)) {
2774 renew_lease(pl
->get_mnow());
2775 // do not schedule until we are actually activated
2778 // adjust purged_snaps: PG may have been inactive while snaps were pruned
2779 // from the removed_snaps_queue in the osdmap. update local purged_snaps
2780 // reflect only those snaps that we thought were pruned and were still in
2782 info
.purged_snaps
.swap(purged
);
2784 // start up replicas
2785 info
.history
.refresh_prior_readable_until_ub(pl
->get_mnow(),
2786 prior_readable_until_ub
);
2788 ceph_assert(!acting_recovery_backfill
.empty());
2789 for (auto i
= acting_recovery_backfill
.begin();
2790 i
!= acting_recovery_backfill
.end();
2792 if (*i
== pg_whoami
) continue;
2793 pg_shard_t peer
= *i
;
2794 ceph_assert(peer_info
.count(peer
));
2795 pg_info_t
& pi
= peer_info
[peer
];
2797 psdout(10) << "activate peer osd." << peer
<< " " << pi
<< dendl
;
2800 ceph_assert(peer_missing
.count(peer
));
2801 pg_missing_t
& pm
= peer_missing
[peer
];
2803 bool needs_past_intervals
= pi
.dne();
2805 // Save num_bytes for backfill reservation request, can't be negative
2806 peer_bytes
[peer
] = std::max
<int64_t>(0, pi
.stats
.stats
.sum
.num_bytes
);
// peer already caught up to our last_update
2808 if (pi
.last_update
== info
.last_update
) {
2810 if (!pi
.last_backfill
.is_max())
2811 pl
->get_clog_info() << info
.pgid
<< " continuing backfill to osd."
2813 << " from (" << pi
.log_tail
<< "," << pi
.last_update
2814 << "] " << pi
.last_backfill
2815 << " to " << info
.last_update
;
2816 if (!pi
.is_empty()) {
2817 psdout(10) << "activate peer osd." << peer
2818 << " is up to date, queueing in pending_activators" << dendl
;
2821 spg_t(info
.pgid
.pgid
, peer
.shard
),
2822 get_osdmap_epoch(), // fixme: use lower epoch?
2827 psdout(10) << "activate peer osd." << peer
2828 << " is up to date, but sending pg_log anyway" << dendl
;
2829 m
= make_message
<MOSDPGLog
>(
2830 i
->shard
, pg_whoami
.shard
,
2831 get_osdmap_epoch(), info
,
2832 last_peering_reset
);
// peer's log is not contiguous with ours (or it is a designated
// backfill target): restart backfill from scratch
2835 pg_log
.get_tail() > pi
.last_update
||
2836 pi
.last_backfill
== hobject_t() ||
2837 (backfill_targets
.count(*i
) && pi
.last_backfill
.is_max())) {
2838 /* ^ This last case covers a situation where a replica is not contiguous
2839 * with the auth_log, but is contiguous with this replica. Reshuffling
2840 * the active set to handle this would be tricky, so instead we just go
2841 * ahead and backfill it anyway. This is probably preferable in any
2842 * case since the replica in question would have to be significantly
2846 pl
->get_clog_debug() << info
.pgid
<< " starting backfill to osd." << peer
2847 << " from (" << pi
.log_tail
<< "," << pi
.last_update
2848 << "] " << pi
.last_backfill
2849 << " to " << info
.last_update
;
2851 pi
.last_update
= info
.last_update
;
2852 pi
.last_complete
= info
.last_update
;
2853 pi
.set_last_backfill(hobject_t());
2854 pi
.last_epoch_started
= info
.last_epoch_started
;
2855 pi
.last_interval_started
= info
.last_interval_started
;
2856 pi
.history
= info
.history
;
2857 pi
.hit_set
= info
.hit_set
;
2858 pi
.stats
.stats
.clear();
2859 pi
.stats
.stats
.sum
.num_bytes
= peer_bytes
[peer
];
2861 // initialize peer with our purged_snaps.
2862 pi
.purged_snaps
= info
.purged_snaps
;
2864 m
= make_message
<MOSDPGLog
>(
2865 i
->shard
, pg_whoami
.shard
,
2866 get_osdmap_epoch(), pi
,
2867 last_peering_reset
/* epoch to create pg at */);
2869 // send some recent log, so that op dup detection works well.
2870 m
->log
.copy_up_to(cct
, pg_log
.get_log(),
2871 cct
->_conf
->osd_max_pg_log_entries
);
2872 m
->info
.log_tail
= m
->log
.tail
;
2873 pi
.log_tail
= m
->log
.tail
; // sigh...
// otherwise: peer's log is contiguous — send catch-up log entries
2878 ceph_assert(pg_log
.get_tail() <= pi
.last_update
);
2879 m
= make_message
<MOSDPGLog
>(
2880 i
->shard
, pg_whoami
.shard
,
2881 get_osdmap_epoch(), info
,
2882 last_peering_reset
/* epoch to create pg at */);
2883 // send new stuff to append to replicas log
2884 m
->log
.copy_after(cct
, pg_log
.get_log(), pi
.last_update
);
2887 // share past_intervals if we are creating the pg on the replica
2888 // based on whether our info for that peer was dne() *before*
2889 // updating pi.history in the backfill block above.
2890 if (m
&& needs_past_intervals
)
2891 m
->past_intervals
= past_intervals
;
2893 // update local version of peer's missing list!
2894 if (m
&& pi
.last_backfill
!= hobject_t()) {
2895 for (auto p
= m
->log
.log
.begin(); p
!= m
->log
.log
.end(); ++p
) {
2896 if (p
->soid
<= pi
.last_backfill
&&
2898 if (perform_deletes_during_peering() && p
->is_delete()) {
2899 pm
.rm(p
->soid
, p
->version
);
2901 pm
.add_next_event(*p
);
2908 dout(10) << "activate peer osd." << peer
<< " sending " << m
->log
2910 m
->lease
= get_lease();
2911 pl
->send_cluster_message(peer
.osd
, m
, get_osdmap_epoch());
2915 pi
.last_update
= info
.last_update
;
2917 // update our missing
2918 if (pm
.num_missing() == 0) {
2919 pi
.last_complete
= pi
.last_update
;
2920 psdout(10) << "activate peer osd." << peer
<< " " << pi
2921 << " uptodate" << dendl
;
2923 psdout(10) << "activate peer osd." << peer
<< " " << pi
2924 << " missing " << pm
<< dendl
;
2928 // Set up missing_loc
2929 set
<pg_shard_t
> complete_shards
;
2930 for (auto i
= acting_recovery_backfill
.begin();
2931 i
!= acting_recovery_backfill
.end();
2933 psdout(20) << __func__
<< " setting up missing_loc from shard " << *i
2935 if (*i
== get_primary()) {
2936 missing_loc
.add_active_missing(missing
);
2937 if (!missing
.have_missing())
2938 complete_shards
.insert(*i
);
2940 auto peer_missing_entry
= peer_missing
.find(*i
);
2941 ceph_assert(peer_missing_entry
!= peer_missing
.end());
2942 missing_loc
.add_active_missing(peer_missing_entry
->second
);
2943 if (!peer_missing_entry
->second
.have_missing() &&
2944 peer_info
[*i
].last_backfill
.is_max())
2945 complete_shards
.insert(*i
);
2949 // If necessary, create might_have_unfound to help us find our unfound objects.
2950 // NOTE: It's important that we build might_have_unfound before trimming the
2952 might_have_unfound
.clear();
2953 if (needs_recovery()) {
2954 // If only one shard has missing, we do a trick to add all others as recovery
2955 // source, this is considered safe since the PGLogs have been merged locally,
2956 // and covers vast majority of the use cases, like one OSD/host is down for
2957 // a while for hardware repairing
2958 if (complete_shards
.size() + 1 == acting_recovery_backfill
.size()) {
2959 missing_loc
.add_batch_sources_info(complete_shards
, ctx
.handle
);
2961 missing_loc
.add_source_info(pg_whoami
, info
, pg_log
.get_missing(),
2963 for (auto i
= acting_recovery_backfill
.begin();
2964 i
!= acting_recovery_backfill
.end();
2966 if (*i
== pg_whoami
) continue;
2967 psdout(10) << __func__
<< ": adding " << *i
<< " as a source" << dendl
;
2968 ceph_assert(peer_missing
.count(*i
));
2969 ceph_assert(peer_info
.count(*i
));
2970 missing_loc
.add_source_info(
// also consider stray peers (with their own missing) as sources
2977 for (auto i
= peer_missing
.begin(); i
!= peer_missing
.end(); ++i
) {
2978 if (is_acting_recovery_backfill(i
->first
))
2980 ceph_assert(peer_info
.count(i
->first
));
2982 peer_info
[i
->first
],
2988 build_might_have_unfound();
2990 // Always call now so update_calc_stats() will be accurate
2991 discover_all_missing(ctx
.msgs
);
2995 // num_objects_degraded if calculated should reflect this too, unless no
2996 // missing and we are about to go clean.
2997 if (get_osdmap()->get_pg_size(info
.pgid
.pgid
) > actingset
.size()) {
2998 state_set(PG_STATE_UNDERSIZED
);
3001 state_set(PG_STATE_ACTIVATING
);
3002 pl
->on_activate(std::move(to_trim
));
3004 if (acting_set_writeable()) {
3005 PGLog::LogEntryHandlerRef rollbacker
{pl
->get_log_handler(t
)};
3006 pg_log
.roll_forward(rollbacker
.get());
// Push our refreshed pg_info_t (history, last_epoch_started, etc.) to
// every peer in acting_recovery_backfill, also updating our local copy of
// each peer's info. Uses MOSDPGInfo2 (with a lease) on octopus+ clusters
// and legacy MOSDPGInfo otherwise.
3010 void PeeringState::share_pg_info()
3012 psdout(10) << "share_pg_info" << dendl
;
3014 info
.history
.refresh_prior_readable_until_ub(pl
->get_mnow(),
3015 prior_readable_until_ub
);
3017 // share new pg_info_t with replicas
3018 ceph_assert(!acting_recovery_backfill
.empty());
3019 for (auto pg_shard
: acting_recovery_backfill
) {
3020 if (pg_shard
== pg_whoami
) continue;
// keep our local record of the peer's info in sync with what we send
3021 if (auto peer
= peer_info
.find(pg_shard
); peer
!= peer_info
.end()) {
3022 peer
->second
.last_epoch_started
= info
.last_epoch_started
;
3023 peer
->second
.last_interval_started
= info
.last_interval_started
;
3024 peer
->second
.history
.merge(info
.history
);
3027 if (last_require_osd_release
>= ceph_release_t::octopus
) {
3028 m
= make_message
<MOSDPGInfo2
>(spg_t
{info
.pgid
.pgid
, pg_shard
.shard
},
3032 std::optional
<pg_lease_t
>{get_lease()},
3035 m
= make_message
<MOSDPGInfo
>(get_osdmap_epoch(),
3036 MOSDPGInfo::pg_list_t
{
3037 pg_notify_t
{pg_shard
.shard
,
3044 pl
->send_cluster_message(pg_shard
.osd
, m
, get_osdmap_epoch());
// Merge a peer's log (olog, authoritative per caller) into our own via
// pg_log, using a rollback handler tied to the transaction; sets
// dirty_info/dirty_big_info as needed.
3048 void PeeringState::merge_log(
3049 ObjectStore::Transaction
& t
, pg_info_t
&oinfo
, pg_log_t
&& olog
,
3052 PGLog::LogEntryHandlerRef rollbacker
{pl
->get_log_handler(t
)};
3054 oinfo
, std::move(olog
), from
, info
, rollbacker
.get(),
3055 dirty_info
, dirty_big_info
);
// Rewind our PG log to `newhead`, rolling back divergent entries through
// a rollback handler tied to the transaction; marks info dirty.
3058 void PeeringState::rewind_divergent_log(
3059 ObjectStore::Transaction
& t
, eversion_t newhead
)
3061 PGLog::LogEntryHandlerRef rollbacker
{pl
->get_log_handler(t
)};
3062 pg_log
.rewind_divergent_log(
3063 newhead
, info
, rollbacker
.get(), dirty_info
, dirty_big_info
);
// Replica-side handler for the primary's pg_info_t: merge history, clear
// scrub-error counters (the primary is authoritative for those), and
// adopt the primary's purged_snaps if they differ.
3067 void PeeringState::proc_primary_info(
3068 ObjectStore::Transaction
&t
, const pg_info_t
&oinfo
)
3070 ceph_assert(!is_primary());
3072 update_history(oinfo
.history
);
// reset scrub error counters unless stats are already invalid
3073 if (!info
.stats
.stats_invalid
&& info
.stats
.stats
.sum
.num_scrub_errors
) {
3074 info
.stats
.stats
.sum
.num_scrub_errors
= 0;
3075 info
.stats
.stats
.sum
.num_shallow_scrub_errors
= 0;
3076 info
.stats
.stats
.sum
.num_deep_scrub_errors
= 0;
3080 if (!(info
.purged_snaps
== oinfo
.purged_snaps
)) {
3081 psdout(10) << __func__
<< " updating purged_snaps to "
3082 << oinfo
.purged_snaps
3084 info
.purged_snaps
= oinfo
.purged_snaps
;
// purged_snaps lives in the "big" info, hence dirty_big_info
3086 dirty_big_info
= true;
// Primary-side: absorb the authoritative log shard's info/log/missing
// while building the master log. Merges the log, records the peer's info
// and missing set, advances our last_epoch_started /
// last_interval_started, and adds the peer to might_have_unfound.
3090 void PeeringState::proc_master_log(
3091 ObjectStore::Transaction
& t
, pg_info_t
&oinfo
,
3092 pg_log_t
&& olog
, pg_missing_t
&& omissing
, pg_shard_t from
)
3094 psdout(10) << "proc_master_log for osd." << from
<< ": "
3095 << olog
<< " " << omissing
<< dendl
;
3096 ceph_assert(!is_peered() && is_primary());
3098 // merge log into our own log to build master log. no need to
3099 // make any adjustments to their missing map; we are taking their
3100 // log to be authoritative (i.e., their entries are by definition
// correct).
3102 merge_log(t
, oinfo
, std::move(olog
), from
);
3103 peer_info
[from
] = oinfo
;
3104 psdout(10) << " peer osd." << from
<< " now " << oinfo
3105 << " " << omissing
<< dendl
;
3106 might_have_unfound
.insert(from
);
3108 // See doc/dev/osd_internals/last_epoch_started
3109 if (oinfo
.last_epoch_started
> info
.last_epoch_started
) {
3110 info
.last_epoch_started
= oinfo
.last_epoch_started
;
3113 if (oinfo
.last_interval_started
> info
.last_interval_started
) {
3114 info
.last_interval_started
= oinfo
.last_interval_started
;
3117 update_history(oinfo
.history
);
3118 ceph_assert(cct
->_conf
->osd_find_best_info_ignore_history_les
||
3119 info
.last_epoch_started
>= info
.history
.last_epoch_started
);
3121 peer_missing
[from
].claim(std::move(omissing
));
// Primary-side: process a (non-authoritative) replica's log — let pg_log
// reconcile it against ours, record the peer's info and missing set, and
// add the peer to might_have_unfound.
3124 void PeeringState::proc_replica_log(
3126 const pg_log_t
&olog
,
3127 pg_missing_t
&& omissing
,
3130 psdout(10) << "proc_replica_log for osd." << from
<< ": "
3131 << oinfo
<< " " << olog
<< " " << omissing
<< dendl
;
3133 pg_log
.proc_replica_log(oinfo
, olog
, omissing
, from
);
3135 peer_info
[from
] = oinfo
;
3136 psdout(10) << " peer osd." << from
<< " now "
3137 << oinfo
<< " " << omissing
<< dendl
;
3138 might_have_unfound
.insert(from
);
// debug: dump the (possibly adjusted) missing items
3140 for (auto i
= omissing
.get_items().begin();
3141 i
!= omissing
.get_items().end();
3143 psdout(20) << " after missing " << i
->first
3144 << " need " << i
->second
.need
3145 << " have " << i
->second
.have
<< dendl
;
3147 peer_missing
[from
].claim(std::move(omissing
));
// Answer an INFO query from the primary by filling notify_info with
// (primary shard, our pg_info_t).
// NOTE(review): "¬ify_info" in the parameter list below looks like
// HTML-entity mojibake for "&notify_info" — confirm against upstream.
3150 void PeeringState::fulfill_info(
3151 pg_shard_t from
, const pg_query_t
&query
,
3152 pair
<pg_shard_t
, pg_info_t
> &notify_info
)
3154 ceph_assert(from
== primary
);
3155 ceph_assert(query
.type
== pg_query_t::INFO
);
3158 psdout(10) << "sending info" << dendl
;
3159 notify_info
= make_pair(from
, info
);
// Answer a LOG or FULLLOG query from the primary with an MOSDPGLog
// carrying our missing set plus either the log since query.since (LOG)
// or the entire log (FULLLOG, or a LOG request older than our tail).
3162 void PeeringState::fulfill_log(
3163 pg_shard_t from
, const pg_query_t
&query
, epoch_t query_epoch
)
3165 psdout(10) << "log request from " << from
<< dendl
;
3166 ceph_assert(from
== primary
);
3167 ceph_assert(query
.type
!= pg_query_t::INFO
);
3169 auto mlog
= make_message
<MOSDPGLog
>(
3170 from
.shard
, pg_whoami
.shard
,
3173 mlog
->missing
= pg_log
.get_missing();
3175 // primary -> other, when building master log
3176 if (query
.type
== pg_query_t::LOG
) {
3177 psdout(10) << " sending info+missing+log since " << query
.since
// a since-point older than our tail cannot be satisfied incrementally
3179 if (query
.since
!= eversion_t() && query
.since
< pg_log
.get_tail()) {
3180 pl
->get_clog_error() << info
.pgid
<< " got broken pg_query_t::LOG since "
3182 << " when my log.tail is " << pg_log
.get_tail()
3183 << ", sending full log instead";
3184 mlog
->log
= pg_log
.get_log(); // primary should not have requested this!!
3186 mlog
->log
.copy_after(cct
, pg_log
.get_log(), query
.since
);
3188 else if (query
.type
== pg_query_t::FULLLOG
) {
3189 psdout(10) << " sending info+missing+full log" << dendl
;
3190 mlog
->log
= pg_log
.get_log();
3193 psdout(10) << " sending " << mlog
->log
<< " " << mlog
->missing
<< dendl
;
3195 pl
->send_cluster_message(from
.osd
, mlog
, get_osdmap_epoch(), true);
// Dispatch an incoming MQuery: INFO queries are answered via
// fulfill_info() (building a notify), anything else via fulfill_log().
// Both paths first merge the querier's history into ours.
3198 void PeeringState::fulfill_query(const MQuery
& query
, PeeringCtxWrapper
&rctx
)
3200 if (query
.query
.type
== pg_query_t::INFO
) {
3201 pair
<pg_shard_t
, pg_info_t
> notify_info
;
3202 // note this refreshes our prior_readable_until_ub value
3203 update_history(query
.query
.history
);
3204 fulfill_info(query
.from
, query
.query
, notify_info
);
3206 notify_info
.first
.osd
,
3208 notify_info
.first
.shard
, pg_whoami
.shard
,
// non-INFO (LOG/FULLLOG) queries get a log reply
3214 update_history(query
.query
.history
);
3215 fulfill_log(query
.from
, query
.query
, query
.query_epoch
);
// Mark the PG clean when the acting set is full-sized (set CLEAN state,
// record last_epoch_clean / last_interval_clean, drop past_intervals);
// for peered-but-not-active PGs, check whether we are ready to merge as
// a merge target or source. Always clears forced recovery/backfill flags
// and republishes stats.
3219 void PeeringState::try_mark_clean()
3221 if (actingset
.size() == get_osdmap()->get_pg_size(info
.pgid
.pgid
)) {
3222 state_clear(PG_STATE_FORCED_BACKFILL
| PG_STATE_FORCED_RECOVERY
);
3223 state_set(PG_STATE_CLEAN
);
3224 info
.history
.last_epoch_clean
= get_osdmap_epoch();
3225 info
.history
.last_interval_clean
= info
.history
.same_interval_since
;
// past intervals are no longer needed once clean
3226 past_intervals
.clear();
3227 dirty_big_info
= true;
3231 if (!is_active() && is_peered()) {
3234 if (pool
.info
.is_pending_merge(info
.pgid
.pgid
, &target
)) {
3236 psdout(10) << "ready to merge (target)" << dendl
;
3237 pl
->set_ready_to_merge_target(
3239 info
.history
.last_epoch_started
,
3240 info
.history
.last_epoch_clean
);
3242 psdout(10) << "ready to merge (source)" << dendl
;
3243 pl
->set_ready_to_merge_source(info
.last_update
);
3247 psdout(10) << "not clean, not ready to merge" << dendl
;
3248 // we should have notified OSD in Active state entry point
3252 state_clear(PG_STATE_FORCED_RECOVERY
| PG_STATE_FORCED_BACKFILL
);
3255 pl
->publish_stats_to_osd();
3256 clear_recovery_state();
// Split this PG's peering state into `child` (child_pgid, split_bits):
// split the pg_log, copy/derive the child's info (last_update, history,
// purged_snaps, stats), reset backfill bounds where necessary, recompute
// the child's up/acting mapping from the current osdmap, and mark both
// parent and child dirty.
// NOTE(review): extraction dropped some lines (e.g. init_primary_up_acting
// arguments); comments describe only the visible logic.
3259 void PeeringState::split_into(
3260 pg_t child_pgid
, PeeringState
*child
, unsigned split_bits
)
3262 child
->update_osdmap_ref(get_osdmap());
3266 pg_log
.split_into(child_pgid
, split_bits
, &(child
->pg_log
));
3267 child
->info
.last_complete
= info
.last_complete
;
// both heads may have changed after the log split
3269 info
.last_update
= pg_log
.get_head();
3270 child
->info
.last_update
= child
->pg_log
.get_head();
3272 child
->info
.last_user_version
= info
.last_user_version
;
3274 info
.log_tail
= pg_log
.get_tail();
3275 child
->info
.log_tail
= child
->pg_log
.get_tail();
3277 // reset last_complete, we might have modified pg_log & missing above
3278 pg_log
.reset_complete_to(&info
);
3279 child
->pg_log
.reset_complete_to(&child
->info
);
3282 child
->info
.history
= info
.history
;
3283 child
->info
.history
.epoch_created
= get_osdmap_epoch();
3284 child
->info
.purged_snaps
= info
.purged_snaps
;
3286 if (info
.last_backfill
.is_max()) {
3287 child
->info
.set_last_backfill(hobject_t::get_max());
3289 // restart backfill on parent and child to be safe. we could
3290 // probably do better in the bitwise sort case, but it's more
3291 // fragile (there may be special work to do on backfill completion
3293 info
.set_last_backfill(hobject_t());
3294 child
->info
.set_last_backfill(hobject_t());
3295 // restarting backfill implies that the missing set is empty,
3296 // since it is only used for objects prior to last_backfill
3297 pg_log
.reset_backfill();
3298 child
->pg_log
.reset_backfill();
3301 child
->info
.stats
= info
.stats
;
3302 child
->info
.stats
.parent_split_bits
= split_bits
;
// both halves' object counts are now approximate until recalculated
3303 info
.stats
.stats_invalid
= true;
3304 child
->info
.stats
.stats_invalid
= true;
3305 child
->info
.last_epoch_started
= info
.last_epoch_started
;
3306 child
->info
.last_interval_started
= info
.last_interval_started
;
3308 // There can't be recovery/backfill going on now
3309 int primary
, up_primary
;
3310 vector
<int> newup
, newacting
;
3311 get_osdmap()->pg_to_up_acting_osds(
3312 child
->info
.pgid
.pgid
, &newup
, &up_primary
, &newacting
, &primary
);
3313 child
->init_primary_up_acting(
3318 child
->role
= OSDMap::calc_pg_role(pg_whoami
, child
->acting
);
3320 // this comparison includes primary rank via pg_shard_t
3321 if (get_primary() != child
->get_primary())
3322 child
->info
.history
.same_primary_since
= get_osdmap_epoch();
3324 child
->info
.stats
.up
= newup
;
3325 child
->info
.stats
.up_primary
= up_primary
;
3326 child
->info
.stats
.acting
= newacting
;
3327 child
->info
.stats
.acting_primary
= primary
;
3328 child
->info
.stats
.mapping_epoch
= get_osdmap_epoch();
3331 child
->past_intervals
= past_intervals
;
3333 child
->on_new_interval();
// only non-primaries need to notify the (new) primary
3335 child
->send_notify
= !child
->is_primary();
3337 child
->dirty_info
= true;
3338 child
->dirty_big_info
= true;
3340 dirty_big_info
= true;
3343 void PeeringState::merge_from(
3344 map
<spg_t
,PeeringState
*>& sources
,
3346 unsigned split_bits
,
3347 const pg_merge_meta_t
& last_pg_merge_meta
)
3349 bool incomplete
= false;
3350 if (info
.last_complete
!= info
.last_update
||
3351 info
.is_incomplete() ||
3353 psdout(10) << __func__
<< " target incomplete" << dendl
;
3356 if (last_pg_merge_meta
.source_pgid
!= pg_t()) {
3357 if (info
.pgid
.pgid
!= last_pg_merge_meta
.source_pgid
.get_parent()) {
3358 psdout(10) << __func__
<< " target doesn't match expected parent "
3359 << last_pg_merge_meta
.source_pgid
.get_parent()
3360 << " of source_pgid " << last_pg_merge_meta
.source_pgid
3364 if (info
.last_update
!= last_pg_merge_meta
.target_version
) {
3365 psdout(10) << __func__
<< " target version doesn't match expected "
3366 << last_pg_merge_meta
.target_version
<< dendl
;
3371 PGLog::LogEntryHandlerRef handler
{pl
->get_log_handler(rctx
.transaction
)};
3372 pg_log
.roll_forward(handler
.get());
3374 info
.last_complete
= info
.last_update
; // to fake out trim()
3375 pg_log
.reset_recovery_pointers();
3376 pg_log
.trim(info
.last_update
, info
);
3378 vector
<PGLog
*> log_from
;
3379 for (auto& i
: sources
) {
3380 auto& source
= i
.second
;
3382 psdout(10) << __func__
<< " source " << i
.first
<< " missing" << dendl
;
3386 if (source
->info
.last_complete
!= source
->info
.last_update
||
3387 source
->info
.is_incomplete() ||
3388 source
->info
.dne()) {
3389 psdout(10) << __func__
<< " source " << source
->pg_whoami
3394 if (last_pg_merge_meta
.source_pgid
!= pg_t()) {
3395 if (source
->info
.pgid
.pgid
!= last_pg_merge_meta
.source_pgid
) {
3396 dout(10) << __func__
<< " source " << source
->info
.pgid
.pgid
3397 << " doesn't match expected source pgid "
3398 << last_pg_merge_meta
.source_pgid
<< dendl
;
3401 if (source
->info
.last_update
!= last_pg_merge_meta
.source_version
) {
3402 dout(10) << __func__
<< " source version doesn't match expected "
3403 << last_pg_merge_meta
.target_version
<< dendl
;
3409 PGLog::LogEntryHandlerRef handler
{
3410 source
->pl
->get_log_handler(rctx
.transaction
)};
3411 source
->pg_log
.roll_forward(handler
.get());
3412 source
->info
.last_complete
= source
->info
.last_update
; // to fake out trim()
3413 source
->pg_log
.reset_recovery_pointers();
3414 source
->pg_log
.trim(source
->info
.last_update
, source
->info
);
3415 log_from
.push_back(&source
->pg_log
);
3418 info
.stats
.add(source
->info
.stats
);
3420 // pull up last_update
3421 info
.last_update
= std::max(info
.last_update
, source
->info
.last_update
);
3423 // adopt source's PastIntervals if target has none. we can do this since
3424 // pgp_num has been reduced prior to the merge, so the OSD mappings for
3425 // the PGs are identical.
3426 if (past_intervals
.empty() && !source
->past_intervals
.empty()) {
3427 psdout(10) << __func__
<< " taking source's past_intervals" << dendl
;
3428 past_intervals
= source
->past_intervals
;
3432 info
.last_complete
= info
.last_update
;
3433 info
.log_tail
= info
.last_update
;
3435 info
.last_backfill
= hobject_t();
3439 pg_log
.merge_from(log_from
, info
.last_update
);
3441 // make sure we have a meaningful last_epoch_started/clean (if we were a
3443 if (info
.history
.epoch_created
== 0) {
3444 // start with (a) source's history, since these PGs *should* have been
3445 // remapped in concert with each other...
3446 info
.history
= sources
.begin()->second
->info
.history
;
3448 // we use the last_epoch_{started,clean} we got from
3449 // the caller, which are the epochs that were reported by the PGs were
3450 // found to be ready for merge.
3451 info
.history
.last_epoch_clean
= last_pg_merge_meta
.last_epoch_clean
;
3452 info
.history
.last_epoch_started
= last_pg_merge_meta
.last_epoch_started
;
3453 info
.last_epoch_started
= last_pg_merge_meta
.last_epoch_started
;
3454 psdout(10) << __func__
3455 << " set les/c to " << last_pg_merge_meta
.last_epoch_started
<< "/"
3456 << last_pg_merge_meta
.last_epoch_clean
3457 << " from pool last_dec_*, source pg history was "
3458 << sources
.begin()->second
->info
.history
3461 // above we have pulled down source's history and we need to check
3462 // history.epoch_created again to confirm that source is not a placeholder
3463 // too. (peering requires a sane history.same_interval_since value for any
3464 // non-newly created pg and below here we know we are basically iterating
3465 // back a series of past maps to fake a merge process, hence we need to
3466 // fix history.same_interval_since first so that start_peering_interval()
3467 // will not complain)
3468 if (info
.history
.epoch_created
== 0) {
3469 dout(10) << __func__
<< " both merge target and source are placeholders,"
3470 << " set sis to lec " << info
.history
.last_epoch_clean
3472 info
.history
.same_interval_since
= info
.history
.last_epoch_clean
;
3475 // if the past_intervals start is later than last_epoch_clean, it
3476 // implies the source repeered again but the target didn't, or
3477 // that the source became clean in a later epoch than the target.
3478 // avoid the discrepancy but adjusting the interval start
3479 // backwards to match so that check_past_interval_bounds() will
3481 auto pib
= past_intervals
.get_bounds();
3482 if (info
.history
.last_epoch_clean
< pib
.first
) {
3483 psdout(10) << __func__
<< " last_epoch_clean "
3484 << info
.history
.last_epoch_clean
<< " < past_interval start "
3485 << pib
.first
<< ", adjusting start backwards" << dendl
;
3486 past_intervals
.adjust_start_backwards(info
.history
.last_epoch_clean
);
3489 // Similarly, if the same_interval_since value is later than
3490 // last_epoch_clean, the next interval change will result in a
3491 // past_interval start that is later than last_epoch_clean. This
3492 // can happen if we use the pg_history values from the merge
3493 // source. Adjust the same_interval_since value backwards if that
3494 // happens. (We trust the les and lec values more because they came from
3495 // the real target, whereas the history value we stole from the source.)
3496 if (info
.history
.last_epoch_started
< info
.history
.same_interval_since
) {
3497 psdout(10) << __func__
<< " last_epoch_started "
3498 << info
.history
.last_epoch_started
<< " < same_interval_since "
3499 << info
.history
.same_interval_since
3500 << ", adjusting pg_history backwards" << dendl
;
3501 info
.history
.same_interval_since
= info
.history
.last_epoch_clean
;
3502 // make sure same_{up,primary}_since are <= same_interval_since
3503 info
.history
.same_up_since
= std::min(
3504 info
.history
.same_up_since
, info
.history
.same_interval_since
);
3505 info
.history
.same_primary_since
= std::min(
3506 info
.history
.same_primary_since
, info
.history
.same_interval_since
);
3511 dirty_big_info
= true;
3514 void PeeringState::start_split_stats(
3515 const set
<spg_t
>& childpgs
, vector
<object_stat_sum_t
> *out
)
3517 out
->resize(childpgs
.size() + 1);
3518 info
.stats
.stats
.sum
.split(*out
);
3521 void PeeringState::finish_split_stats(
3522 const object_stat_sum_t
& stats
, ObjectStore::Transaction
&t
)
3524 info
.stats
.stats
.sum
= stats
;
3528 void PeeringState::update_blocked_by()
3530 // set a max on the number of blocking peers we report. if we go
3531 // over, report a random subset. keep the result sorted.
3532 unsigned keep
= std::min
<unsigned>(
3533 blocked_by
.size(), cct
->_conf
->osd_max_pg_blocked_by
);
3534 unsigned skip
= blocked_by
.size() - keep
;
3535 info
.stats
.blocked_by
.clear();
3536 info
.stats
.blocked_by
.resize(keep
);
3538 for (auto p
= blocked_by
.begin(); p
!= blocked_by
.end() && keep
> 0; ++p
) {
3539 if (skip
> 0 && (rand() % (skip
+ keep
) < skip
)) {
3542 info
.stats
.blocked_by
[pos
++] = *p
;
3548 static bool find_shard(const set
<pg_shard_t
> & pgs
, shard_id_t shard
)
3551 if (p
.shard
== shard
)
3556 static pg_shard_t
get_another_shard(const set
<pg_shard_t
> & pgs
, pg_shard_t skip
, shard_id_t shard
)
3558 for (auto&p
: pgs
) {
3561 if (p
.shard
== shard
)
3564 return pg_shard_t();
3567 void PeeringState::update_calc_stats()
3569 info
.stats
.version
= info
.last_update
;
3570 info
.stats
.created
= info
.history
.epoch_created
;
3571 info
.stats
.last_scrub
= info
.history
.last_scrub
;
3572 info
.stats
.last_scrub_stamp
= info
.history
.last_scrub_stamp
;
3573 info
.stats
.last_deep_scrub
= info
.history
.last_deep_scrub
;
3574 info
.stats
.last_deep_scrub_stamp
= info
.history
.last_deep_scrub_stamp
;
3575 info
.stats
.last_clean_scrub_stamp
= info
.history
.last_clean_scrub_stamp
;
3576 info
.stats
.last_epoch_clean
= info
.history
.last_epoch_clean
;
3578 info
.stats
.log_size
= pg_log
.get_head().version
- pg_log
.get_tail().version
;
3579 info
.stats
.ondisk_log_size
= info
.stats
.log_size
;
3580 info
.stats
.log_start
= pg_log
.get_tail();
3581 info
.stats
.ondisk_log_start
= pg_log
.get_tail();
3582 info
.stats
.snaptrimq_len
= pl
->get_snap_trimq_size();
3584 unsigned num_shards
= get_osdmap()->get_pg_size(info
.pgid
.pgid
);
3586 // In rare case that upset is too large (usually transient), use as target
3587 // for calculations below.
3588 unsigned target
= std::max(num_shards
, (unsigned)upset
.size());
3589 // For undersized actingset may be larger with OSDs out
3590 unsigned nrep
= std::max(actingset
.size(), upset
.size());
3591 // calc num_object_copies
3592 info
.stats
.stats
.calc_copies(std::max(target
, nrep
));
3593 info
.stats
.stats
.sum
.num_objects_degraded
= 0;
3594 info
.stats
.stats
.sum
.num_objects_unfound
= 0;
3595 info
.stats
.stats
.sum
.num_objects_misplaced
= 0;
3596 info
.stats
.avail_no_missing
.clear();
3597 info
.stats
.object_location_counts
.clear();
3599 // We should never hit this condition, but if end up hitting it,
3600 // make sure to update num_objects and set PG_STATE_INCONSISTENT.
3601 if (info
.stats
.stats
.sum
.num_objects
< 0) {
3602 psdout(0) << __func__
<< " negative num_objects = "
3603 << info
.stats
.stats
.sum
.num_objects
<< " setting it to 0 "
3605 info
.stats
.stats
.sum
.num_objects
= 0;
3606 state_set(PG_STATE_INCONSISTENT
);
3609 if ((is_remapped() || is_undersized() || !is_clean()) &&
3610 (is_peered()|| is_activating())) {
3611 psdout(20) << __func__
<< " actingset " << actingset
<< " upset "
3612 << upset
<< " acting_recovery_backfill " << acting_recovery_backfill
<< dendl
;
3614 ceph_assert(!acting_recovery_backfill
.empty());
3616 bool estimate
= false;
3618 // NOTE: we only generate degraded, misplaced and unfound
3619 // values for the summation, not individual stat categories.
3620 int64_t num_objects
= info
.stats
.stats
.sum
.num_objects
;
3622 // Objects missing from up nodes, sorted by # objects.
3623 boost::container::flat_set
<pair
<int64_t,pg_shard_t
>> missing_target_objects
;
3624 // Objects missing from nodes not in up, sort by # objects
3625 boost::container::flat_set
<pair
<int64_t,pg_shard_t
>> acting_source_objects
;
3627 // Fill missing_target_objects/acting_source_objects
3633 missing
= pg_log
.get_missing().num_missing();
3634 ceph_assert(acting_recovery_backfill
.count(pg_whoami
));
3635 if (upset
.count(pg_whoami
)) {
3636 missing_target_objects
.emplace(missing
, pg_whoami
);
3638 acting_source_objects
.emplace(missing
, pg_whoami
);
3640 info
.stats
.stats
.sum
.num_objects_missing_on_primary
= missing
;
3642 info
.stats
.avail_no_missing
.push_back(pg_whoami
);
3643 psdout(20) << __func__
<< " shard " << pg_whoami
3644 << " primary objects " << num_objects
3645 << " missing " << missing
3650 for (auto& peer
: peer_info
) {
3651 // Primary should not be in the peer_info, skip if it is.
3652 if (peer
.first
== pg_whoami
) continue;
3653 int64_t missing
= 0;
3654 int64_t peer_num_objects
=
3655 std::max((int64_t)0, peer
.second
.stats
.stats
.sum
.num_objects
);
3656 // Backfill targets always track num_objects accurately
3657 // all other peers track missing accurately.
3658 if (is_backfill_target(peer
.first
)) {
3659 missing
= std::max((int64_t)0, num_objects
- peer_num_objects
);
3661 if (peer_missing
.count(peer
.first
)) {
3662 missing
= peer_missing
[peer
.first
].num_missing();
3664 psdout(20) << __func__
<< " no peer_missing found for "
3665 << peer
.first
<< dendl
;
3666 if (is_recovering()) {
3669 missing
= std::max((int64_t)0, num_objects
- peer_num_objects
);
3672 if (upset
.count(peer
.first
)) {
3673 missing_target_objects
.emplace(missing
, peer
.first
);
3674 } else if (actingset
.count(peer
.first
)) {
3675 acting_source_objects
.emplace(missing
, peer
.first
);
3677 peer
.second
.stats
.stats
.sum
.num_objects_missing
= missing
;
3679 info
.stats
.avail_no_missing
.push_back(peer
.first
);
3680 psdout(20) << __func__
<< " shard " << peer
.first
3681 << " objects " << peer_num_objects
3682 << " missing " << missing
3686 // Compute object_location_counts
3687 for (auto& ml
: missing_loc
.get_missing_locs()) {
3688 info
.stats
.object_location_counts
[ml
.second
]++;
3689 psdout(30) << __func__
<< " " << ml
.first
<< " object_location_counts["
3690 << ml
.second
<< "]=" << info
.stats
.object_location_counts
[ml
.second
]
3693 int64_t not_missing
= num_objects
- missing_loc
.get_missing_locs().size();
3695 // During recovery we know upset == actingset and is being populated
3696 // During backfill we know that all non-missing objects are in the actingset
3697 info
.stats
.object_location_counts
[actingset
] = not_missing
;
3699 psdout(30) << __func__
<< " object_location_counts["
3700 << upset
<< "]=" << info
.stats
.object_location_counts
[upset
]
3702 psdout(20) << __func__
<< " object_location_counts "
3703 << info
.stats
.object_location_counts
<< dendl
;
3705 // A misplaced object is not stored on the correct OSD
3706 int64_t misplaced
= 0;
3707 // a degraded objects has fewer replicas or EC shards than the pool specifies.
3708 int64_t degraded
= 0;
3710 if (is_recovering()) {
3711 for (auto& sml
: missing_loc
.get_missing_by_count()) {
3712 for (auto& ml
: sml
.second
) {
3714 if (sml
.first
== shard_id_t::NO_SHARD
) {
3715 psdout(20) << __func__
<< " ml " << ml
.second
3716 << " upset size " << upset
.size()
3717 << " up " << ml
.first
.up
<< dendl
;
3718 missing_shards
= (int)upset
.size() - ml
.first
.up
;
3720 // Handle shards not even in upset below
3721 if (!find_shard(upset
, sml
.first
))
3723 missing_shards
= std::max(0, 1 - ml
.first
.up
);
3724 psdout(20) << __func__
3725 << " shard " << sml
.first
3726 << " ml " << ml
.second
3727 << " missing shards " << missing_shards
<< dendl
;
3729 int odegraded
= ml
.second
* missing_shards
;
3730 // Copies on other osds but limited to the possible degraded
3731 int more_osds
= std::min(missing_shards
, ml
.first
.other
);
3732 int omisplaced
= ml
.second
* more_osds
;
3733 ceph_assert(omisplaced
<= odegraded
);
3734 odegraded
-= omisplaced
;
3736 misplaced
+= omisplaced
;
3737 degraded
+= odegraded
;
3741 psdout(20) << __func__
<< " missing based degraded "
3742 << degraded
<< dendl
;
3743 psdout(20) << __func__
<< " missing based misplaced "
3744 << misplaced
<< dendl
;
3746 // Handle undersized case
3747 if (pool
.info
.is_replicated()) {
3748 // Add degraded for missing targets (num_objects missing)
3749 ceph_assert(target
>= upset
.size());
3750 unsigned needed
= target
- upset
.size();
3751 degraded
+= num_objects
* needed
;
3753 for (unsigned i
= 0 ; i
< num_shards
; ++i
) {
3754 shard_id_t
shard(i
);
3756 if (!find_shard(upset
, shard
)) {
3757 pg_shard_t pgs
= get_another_shard(actingset
, pg_shard_t(), shard
);
3759 if (pgs
!= pg_shard_t()) {
3762 if (pgs
== pg_whoami
)
3763 missing
= info
.stats
.stats
.sum
.num_objects_missing_on_primary
;
3765 missing
= peer_info
[pgs
].stats
.stats
.sum
.num_objects_missing
;
3767 degraded
+= missing
;
3768 misplaced
+= std::max((int64_t)0, num_objects
- missing
);
3770 // No shard anywhere
3771 degraded
+= num_objects
;
3779 // Handle undersized case
3780 if (pool
.info
.is_replicated()) {
3781 // Add to missing_target_objects
3782 ceph_assert(target
>= missing_target_objects
.size());
3783 unsigned needed
= target
- missing_target_objects
.size();
3785 missing_target_objects
.emplace(num_objects
* needed
, pg_shard_t(pg_shard_t::NO_OSD
));
3787 for (unsigned i
= 0 ; i
< num_shards
; ++i
) {
3788 shard_id_t
shard(i
);
3790 for (const auto& t
: missing_target_objects
) {
3791 if (std::get
<1>(t
).shard
== shard
) {
3797 missing_target_objects
.emplace(num_objects
, pg_shard_t(pg_shard_t::NO_OSD
,shard
));
3801 for (const auto& item
: missing_target_objects
)
3802 psdout(20) << __func__
<< " missing shard " << std::get
<1>(item
)
3803 << " missing= " << std::get
<0>(item
) << dendl
;
3804 for (const auto& item
: acting_source_objects
)
3805 psdout(20) << __func__
<< " acting shard " << std::get
<1>(item
)
3806 << " missing= " << std::get
<0>(item
) << dendl
;
3808 // Handle all objects not in missing for remapped
3810 for (auto m
= missing_target_objects
.rbegin();
3811 m
!= missing_target_objects
.rend(); ++m
) {
3813 int64_t extra_missing
= -1;
3815 if (pool
.info
.is_replicated()) {
3816 if (!acting_source_objects
.empty()) {
3817 auto extra_copy
= acting_source_objects
.begin();
3818 extra_missing
= std::get
<0>(*extra_copy
);
3819 acting_source_objects
.erase(extra_copy
);
3821 } else { // Erasure coded
3822 // Use corresponding shard
3823 for (const auto& a
: acting_source_objects
) {
3824 if (std::get
<1>(a
).shard
== std::get
<1>(*m
).shard
) {
3825 extra_missing
= std::get
<0>(a
);
3826 acting_source_objects
.erase(a
);
3832 if (extra_missing
>= 0 && std::get
<0>(*m
) >= extra_missing
) {
3833 // We don't know which of the objects on the target
3834 // are part of extra_missing so assume are all degraded.
3835 misplaced
+= std::get
<0>(*m
) - extra_missing
;
3836 degraded
+= extra_missing
;
3838 // 1. extra_missing == -1, more targets than sources so degraded
3839 // 2. extra_missing > std::get<0>(m), so that we know that some extra_missing
3840 // previously degraded are now present on the target.
3841 degraded
+= std::get
<0>(*m
);
3844 // If there are still acting that haven't been accounted for
3845 // then they are misplaced
3846 for (const auto& a
: acting_source_objects
) {
3847 int64_t extra_misplaced
= std::max((int64_t)0, num_objects
- std::get
<0>(a
));
3848 psdout(20) << __func__
<< " extra acting misplaced " << extra_misplaced
3850 misplaced
+= extra_misplaced
;
3853 // NOTE: Tests use these messages to verify this code
3854 psdout(20) << __func__
<< " degraded " << degraded
3855 << (estimate
? " (est)": "") << dendl
;
3856 psdout(20) << __func__
<< " misplaced " << misplaced
3857 << (estimate
? " (est)": "")<< dendl
;
3859 info
.stats
.stats
.sum
.num_objects_degraded
= degraded
;
3860 info
.stats
.stats
.sum
.num_objects_unfound
= get_num_unfound();
3861 info
.stats
.stats
.sum
.num_objects_misplaced
= misplaced
;
3865 std::optional
<pg_stat_t
> PeeringState::prepare_stats_for_publish(
3866 bool pg_stats_publish_valid
,
3867 const pg_stat_t
&pg_stats_publish
,
3868 const object_stat_collection_t
&unstable_stats
)
3870 if (info
.stats
.stats
.sum
.num_scrub_errors
) {
3871 state_set(PG_STATE_INCONSISTENT
);
3873 state_clear(PG_STATE_INCONSISTENT
);
3874 state_clear(PG_STATE_FAILED_REPAIR
);
3877 utime_t now
= ceph_clock_now();
3878 if (info
.stats
.state
!= state
) {
3879 info
.stats
.last_change
= now
;
3880 // Optimistic estimation, if we just find out an inactive PG,
3881 // assumt it is active till now.
3882 if (!(state
& PG_STATE_ACTIVE
) &&
3883 (info
.stats
.state
& PG_STATE_ACTIVE
))
3884 info
.stats
.last_active
= now
;
3886 if ((state
& PG_STATE_ACTIVE
) &&
3887 !(info
.stats
.state
& PG_STATE_ACTIVE
))
3888 info
.stats
.last_became_active
= now
;
3889 if ((state
& (PG_STATE_ACTIVE
|PG_STATE_PEERED
)) &&
3890 !(info
.stats
.state
& (PG_STATE_ACTIVE
|PG_STATE_PEERED
)))
3891 info
.stats
.last_became_peered
= now
;
3892 info
.stats
.state
= state
;
3895 update_calc_stats();
3896 if (info
.stats
.stats
.sum
.num_objects_degraded
) {
3897 state_set(PG_STATE_DEGRADED
);
3899 state_clear(PG_STATE_DEGRADED
);
3901 update_blocked_by();
3903 pg_stat_t pre_publish
= info
.stats
;
3904 pre_publish
.stats
.add(unstable_stats
);
3905 utime_t cutoff
= now
;
3906 cutoff
-= cct
->_conf
->osd_pg_stat_report_interval_max
;
3908 // share (some of) our purged_snaps via the pg_stats. limit # of intervals
3909 // because we don't want to make the pg_stat_t structures too expensive.
3910 unsigned max
= cct
->_conf
->osd_max_snap_prune_intervals_per_epoch
;
3912 auto i
= info
.purged_snaps
.begin();
3913 while (num
< max
&& i
!= info
.purged_snaps
.end()) {
3914 pre_publish
.purged_snaps
.insert(i
.get_start(), i
.get_len());
3918 psdout(20) << __func__
<< " reporting purged_snaps "
3919 << pre_publish
.purged_snaps
<< dendl
;
3921 if (pg_stats_publish_valid
&& pre_publish
== pg_stats_publish
&&
3922 info
.stats
.last_fresh
> cutoff
) {
3923 psdout(15) << "publish_stats_to_osd " << pg_stats_publish
.reported_epoch
3924 << ": no change since " << info
.stats
.last_fresh
<< dendl
;
3925 return std::nullopt
;
3927 // update our stat summary and timestamps
3928 info
.stats
.reported_epoch
= get_osdmap_epoch();
3929 ++info
.stats
.reported_seq
;
3931 info
.stats
.last_fresh
= now
;
3933 if (info
.stats
.state
& PG_STATE_CLEAN
)
3934 info
.stats
.last_clean
= now
;
3935 if (info
.stats
.state
& PG_STATE_ACTIVE
)
3936 info
.stats
.last_active
= now
;
3937 if (info
.stats
.state
& (PG_STATE_ACTIVE
|PG_STATE_PEERED
))
3938 info
.stats
.last_peered
= now
;
3939 info
.stats
.last_unstale
= now
;
3940 if ((info
.stats
.state
& PG_STATE_DEGRADED
) == 0)
3941 info
.stats
.last_undegraded
= now
;
3942 if ((info
.stats
.state
& PG_STATE_UNDERSIZED
) == 0)
3943 info
.stats
.last_fullsized
= now
;
3945 psdout(15) << "publish_stats_to_osd " << pg_stats_publish
.reported_epoch
3946 << ":" << pg_stats_publish
.reported_seq
<< dendl
;
3947 return std::make_optional(std::move(pre_publish
));
3951 void PeeringState::init(
3953 const vector
<int>& newup
, int new_up_primary
,
3954 const vector
<int>& newacting
, int new_acting_primary
,
3955 const pg_history_t
& history
,
3956 const PastIntervals
& pi
,
3958 ObjectStore::Transaction
&t
)
3960 psdout(10) << "init role " << role
<< " up "
3961 << newup
<< " acting " << newacting
3962 << " history " << history
3963 << " past_intervals " << pi
3967 init_primary_up_acting(
3971 new_acting_primary
);
3973 info
.history
= history
;
3974 past_intervals
= pi
;
3977 info
.stats
.up_primary
= new_up_primary
;
3978 info
.stats
.acting
= acting
;
3979 info
.stats
.acting_primary
= new_acting_primary
;
3980 info
.stats
.mapping_epoch
= info
.history
.same_interval_since
;
3982 if (!perform_deletes_during_peering()) {
3983 pg_log
.set_missing_may_contain_deletes();
3987 psdout(10) << __func__
<< ": Setting backfill" << dendl
;
3988 info
.set_last_backfill(hobject_t());
3989 info
.last_complete
= info
.last_update
;
3990 pg_log
.mark_log_for_rewrite();
3996 dirty_big_info
= true;
4000 void PeeringState::dump_peering_state(Formatter
*f
)
4002 f
->dump_string("state", get_pg_state_string());
4003 f
->dump_unsigned("epoch", get_osdmap_epoch());
4004 f
->open_array_section("up");
4005 for (auto p
= up
.begin(); p
!= up
.end(); ++p
)
4006 f
->dump_unsigned("osd", *p
);
4008 f
->open_array_section("acting");
4009 for (auto p
= acting
.begin(); p
!= acting
.end(); ++p
)
4010 f
->dump_unsigned("osd", *p
);
4012 if (!backfill_targets
.empty()) {
4013 f
->open_array_section("backfill_targets");
4014 for (auto p
= backfill_targets
.begin(); p
!= backfill_targets
.end(); ++p
)
4015 f
->dump_stream("shard") << *p
;
4018 if (!async_recovery_targets
.empty()) {
4019 f
->open_array_section("async_recovery_targets");
4020 for (auto p
= async_recovery_targets
.begin();
4021 p
!= async_recovery_targets
.end();
4023 f
->dump_stream("shard") << *p
;
4026 if (!acting_recovery_backfill
.empty()) {
4027 f
->open_array_section("acting_recovery_backfill");
4028 for (auto p
= acting_recovery_backfill
.begin();
4029 p
!= acting_recovery_backfill
.end();
4031 f
->dump_stream("shard") << *p
;
4034 f
->open_object_section("info");
4035 update_calc_stats();
4039 f
->open_array_section("peer_info");
4040 for (auto p
= peer_info
.begin(); p
!= peer_info
.end(); ++p
) {
4041 f
->open_object_section("info");
4042 f
->dump_stream("peer") << p
->first
;
4049 void PeeringState::update_stats(
4050 std::function
<bool(pg_history_t
&, pg_stat_t
&)> f
,
4051 ObjectStore::Transaction
*t
) {
4052 if (f(info
.history
, info
.stats
)) {
4053 pl
->publish_stats_to_osd();
4055 pl
->on_info_history_change();
4063 bool PeeringState::append_log_entries_update_missing(
4064 const mempool::osd_pglog::list
<pg_log_entry_t
> &entries
,
4065 ObjectStore::Transaction
&t
, std::optional
<eversion_t
> trim_to
,
4066 std::optional
<eversion_t
> roll_forward_to
)
4068 ceph_assert(!entries
.empty());
4069 ceph_assert(entries
.begin()->version
> info
.last_update
);
4071 PGLog::LogEntryHandlerRef rollbacker
{pl
->get_log_handler(t
)};
4072 bool invalidate_stats
=
4073 pg_log
.append_new_log_entries(
4078 if (roll_forward_to
&& entries
.rbegin()->soid
> info
.last_backfill
) {
4079 pg_log
.roll_forward(rollbacker
.get());
4081 if (roll_forward_to
&& *roll_forward_to
> pg_log
.get_can_rollback_to()) {
4082 pg_log
.roll_forward_to(*roll_forward_to
, rollbacker
.get());
4083 last_rollback_info_trimmed_to_applied
= *roll_forward_to
;
4086 info
.last_update
= pg_log
.get_head();
4088 if (pg_log
.get_missing().num_missing() == 0) {
4089 // advance last_complete since nothing else is missing!
4090 info
.last_complete
= info
.last_update
;
4092 info
.stats
.stats_invalid
= info
.stats
.stats_invalid
|| invalidate_stats
;
4094 psdout(20) << __func__
<< " trim_to bool = " << bool(trim_to
)
4095 << " trim_to = " << (trim_to
? *trim_to
: eversion_t()) << dendl
;
4097 pg_log
.trim(*trim_to
, info
);
4100 return invalidate_stats
;
4103 void PeeringState::merge_new_log_entries(
4104 const mempool::osd_pglog::list
<pg_log_entry_t
> &entries
,
4105 ObjectStore::Transaction
&t
,
4106 std::optional
<eversion_t
> trim_to
,
4107 std::optional
<eversion_t
> roll_forward_to
)
4109 psdout(10) << __func__
<< " " << entries
<< dendl
;
4110 ceph_assert(is_primary());
4112 bool rebuild_missing
= append_log_entries_update_missing(entries
, t
, trim_to
, roll_forward_to
);
4113 for (auto i
= acting_recovery_backfill
.begin();
4114 i
!= acting_recovery_backfill
.end();
4116 pg_shard_t
peer(*i
);
4117 if (peer
== pg_whoami
) continue;
4118 ceph_assert(peer_missing
.count(peer
));
4119 ceph_assert(peer_info
.count(peer
));
4120 pg_missing_t
& pmissing(peer_missing
[peer
]);
4121 psdout(20) << __func__
<< " peer_missing for " << peer
4122 << " = " << pmissing
<< dendl
;
4123 pg_info_t
& pinfo(peer_info
[peer
]);
4124 bool invalidate_stats
= PGLog::append_log_entries_update_missing(
4125 pinfo
.last_backfill
,
4132 pinfo
.last_update
= info
.last_update
;
4133 pinfo
.stats
.stats_invalid
= pinfo
.stats
.stats_invalid
|| invalidate_stats
;
4134 rebuild_missing
= rebuild_missing
|| invalidate_stats
;
4137 if (!rebuild_missing
) {
4141 for (auto &&i
: entries
) {
4142 missing_loc
.rebuild(
4145 acting_recovery_backfill
,
4147 pg_log
.get_missing(),
4153 void PeeringState::add_log_entry(const pg_log_entry_t
& e
, bool applied
)
4155 // raise last_complete only if we were previously up to date
4156 if (info
.last_complete
== info
.last_update
)
4157 info
.last_complete
= e
.version
;
4159 // raise last_update.
4160 ceph_assert(e
.version
> info
.last_update
);
4161 info
.last_update
= e
.version
;
4163 // raise user_version, if it increased (it may have not get bumped
4164 // by all logged updates)
4165 if (e
.user_version
> info
.last_user_version
)
4166 info
.last_user_version
= e
.user_version
;
4169 pg_log
.add(e
, applied
);
4170 psdout(10) << "add_log_entry " << e
<< dendl
;
4174 void PeeringState::append_log(
4175 vector
<pg_log_entry_t
>&& logv
,
4177 eversion_t roll_forward_to
,
4179 ObjectStore::Transaction
&t
,
4180 bool transaction_applied
,
4183 /* The primary has sent an info updating the history, but it may not
4184 * have arrived yet. We want to make sure that we cannot remember this
4185 * write without remembering that it happened in an interval which went
4186 * active in epoch history.last_epoch_started.
4188 if (info
.last_epoch_started
!= info
.history
.last_epoch_started
) {
4189 info
.history
.last_epoch_started
= info
.last_epoch_started
;
4191 if (info
.last_interval_started
!= info
.history
.last_interval_started
) {
4192 info
.history
.last_interval_started
= info
.last_interval_started
;
4194 psdout(10) << "append_log " << pg_log
.get_log() << " " << logv
<< dendl
;
4196 PGLog::LogEntryHandlerRef handler
{pl
->get_log_handler(t
)};
4197 if (!transaction_applied
) {
4198 /* We must be a backfill or async recovery peer, so it's ok if we apply
4199 * out-of-turn since we won't be considered when
4200 * determining a min possible last_update.
4202 * We skip_rollforward() here, which advances the crt, without
4203 * doing an actual rollforward. This avoids cleaning up entries
4204 * from the backend and we do not end up in a situation, where the
4205 * object is deleted before we can _merge_object_divergent_entries().
4207 pg_log
.skip_rollforward();
4210 for (auto p
= logv
.begin(); p
!= logv
.end(); ++p
) {
4211 add_log_entry(*p
, transaction_applied
);
4213 /* We don't want to leave the rollforward artifacts around
4214 * here past last_backfill. It's ok for the same reason as
4216 if (transaction_applied
&&
4217 p
->soid
> info
.last_backfill
) {
4218 pg_log
.roll_forward(handler
.get());
4221 if (transaction_applied
&& roll_forward_to
> pg_log
.get_can_rollback_to()) {
4222 pg_log
.roll_forward_to(
4225 last_rollback_info_trimmed_to_applied
= roll_forward_to
;
4228 psdout(10) << __func__
<< " approx pg log length = "
4229 << pg_log
.get_log().approx_size() << dendl
;
4230 psdout(10) << __func__
<< " transaction_applied = "
4231 << transaction_applied
<< dendl
;
4232 if (!transaction_applied
|| async
)
4233 psdout(10) << __func__
<< " " << pg_whoami
4234 << " is async_recovery or backfill target" << dendl
;
4235 pg_log
.trim(trim_to
, info
, transaction_applied
, async
);
4237 // update the local pg, pg log
4242 min_last_complete_ondisk
= mlcod
;
// Record that object `oid` at version `v` has been recovered locally,
// updating the PG log's missing set and info.last_complete.
// NOTE(review): this text is a corrupted extraction — original line numbers
// are fused in and several physical lines were dropped (e.g. a parameter
// line between 4246 and 4248, likely `bool is_delete,`, and lines 4275-4277,
// so the trailing missing_loc statements may belong to a following function
// whose header was lost). Verify against upstream src/osd/PeeringState.cc.
4245 void PeeringState::recover_got(
4246 const hobject_t
&oid
, eversion_t v
,
4248 ObjectStore::Transaction
&t
)
// If the recovered version is newer than the log's can_rollback_to bound,
// roll the log forward so this write can no longer be rolled back.
4250 if (v
> pg_log
.get_can_rollback_to()) {
4251 /* This can only happen during a repair, and even then, it would
4252 * be one heck of a race. If we are repairing the object, the
4253 * write in question must be fully committed, so it's not valid
4254 * to roll it back anyway (and we'll be rolled forward shortly
4256 PGLog::LogEntryHandlerRef handler
{pl
->get_log_handler(t
)};
4257 pg_log
.roll_forward_to(v
, handler
.get());
4260 psdout(10) << "got missing " << oid
<< " v " << v
<< dendl
;
// Drop the object from the local missing set; may advance last_complete.
4261 pg_log
.recover_got(oid
, v
, info
);
// Three cases for reporting the new last_complete: empty log,
// complete_to mid-log, or complete_to at log end (fully complete).
4262 if (pg_log
.get_log().log
.empty()) {
4263 psdout(10) << "last_complete now " << info
.last_complete
4264 << " while log is empty" << dendl
;
4265 } else if (pg_log
.get_log().complete_to
!= pg_log
.get_log().log
.end()) {
4266 psdout(10) << "last_complete now " << info
.last_complete
4267 << " log.complete_to " << pg_log
.get_log().complete_to
->version
4270 psdout(10) << "last_complete now " << info
.last_complete
4271 << " log.complete_to at end" << dendl
;
4272 //below is not true in the repair case.
4273 //assert(missing.num_missing() == 0); // otherwise, complete_to was wrong.
4274 ceph_assert(info
.last_complete
== info
.last_update
);
// NOTE(review): lines 4275-4277 were dropped by the extraction; the two
// statements below (assert still-needs-recovery, then record that this
// OSD now holds the object) may be the tail of recover_got or the body of
// a lost following function — confirm against upstream before editing.
4278 ceph_assert(missing_loc
.needs_recovery(oid
));
4280 missing_loc
.add_location(oid
, pg_whoami
);
4288 void PeeringState::update_backfill_progress(
4289 const hobject_t
&updated_backfill
,
4290 const pg_stat_t
&updated_stats
,
4291 bool preserve_local_num_bytes
,
4292 ObjectStore::Transaction
&t
) {
4293 info
.set_last_backfill(updated_backfill
);
4294 if (preserve_local_num_bytes
) {
4295 psdout(25) << __func__
<< " primary " << updated_stats
.stats
.sum
.num_bytes
4296 << " local " << info
.stats
.stats
.sum
.num_bytes
<< dendl
;
4297 int64_t bytes
= info
.stats
.stats
.sum
.num_bytes
;
4298 info
.stats
= updated_stats
;
4299 info
.stats
.stats
.sum
.num_bytes
= bytes
;
4301 psdout(20) << __func__
<< " final " << updated_stats
.stats
.sum
.num_bytes
4302 << " replaces local " << info
.stats
.stats
.sum
.num_bytes
<< dendl
;
4303 info
.stats
= updated_stats
;
4310 void PeeringState::adjust_purged_snaps(
4311 std::function
<void(interval_set
<snapid_t
> &snaps
)> f
) {
4312 f(info
.purged_snaps
);
4314 dirty_big_info
= true;
4317 void PeeringState::on_peer_recover(
4319 const hobject_t
&soid
,
4320 const eversion_t
&version
)
4322 pl
->publish_stats_to_osd();
4324 peer_missing
[peer
].got(soid
, version
);
4325 missing_loc
.add_location(soid
, peer
);
4328 void PeeringState::begin_peer_recover(
4330 const hobject_t soid
)
4332 peer_missing
[peer
].revise_have(soid
, eversion_t());
4335 void PeeringState::force_object_missing(
4336 const set
<pg_shard_t
> &peers
,
4337 const hobject_t
&soid
,
4340 for (auto &&peer
: peers
) {
4341 if (peer
!= primary
) {
4342 peer_missing
[peer
].add(soid
, version
, eversion_t(), false);
4344 pg_log
.missing_add(soid
, version
, eversion_t());
4345 pg_log
.reset_complete_to(&info
);
4346 pg_log
.set_last_requested(0);
4350 missing_loc
.rebuild(
4353 acting_recovery_backfill
,
4355 pg_log
.get_missing(),
4360 void PeeringState::pre_submit_op(
4361 const hobject_t
&hoid
,
4362 const vector
<pg_log_entry_t
>& logv
,
4363 eversion_t at_version
)
4365 if (at_version
> eversion_t()) {
4366 for (auto &&i
: get_acting_recovery_backfill()) {
4367 if (i
== primary
) continue;
4368 pg_info_t
&pinfo
= peer_info
[i
];
4369 // keep peer_info up to date
4370 if (pinfo
.last_complete
== pinfo
.last_update
)
4371 pinfo
.last_complete
= at_version
;
4372 pinfo
.last_update
= at_version
;
4376 bool requires_missing_loc
= false;
4377 for (auto &&i
: get_async_recovery_targets()) {
4378 if (i
== primary
|| !get_peer_missing(i
).is_missing(hoid
))
4380 requires_missing_loc
= true;
4381 for (auto &&entry
: logv
) {
4382 peer_missing
[i
].add_next_event(entry
);
4386 if (requires_missing_loc
) {
4387 for (auto &&entry
: logv
) {
4388 psdout(30) << __func__
<< " missing_loc before: "
4389 << missing_loc
.get_locations(entry
.soid
) << dendl
;
4390 missing_loc
.add_missing(entry
.soid
, entry
.version
,
4391 eversion_t(), entry
.is_delete());
4392 // clear out missing_loc
4393 missing_loc
.clear_location(entry
.soid
);
4394 for (auto &i
: get_actingset()) {
4395 if (!get_peer_missing(i
).is_missing(entry
.soid
))
4396 missing_loc
.add_location(entry
.soid
, i
);
4398 psdout(30) << __func__
<< " missing_loc after: "
4399 << missing_loc
.get_locations(entry
.soid
) << dendl
;
4404 void PeeringState::recovery_committed_to(eversion_t version
)
4406 psdout(10) << __func__
<< " version " << version
4407 << " now ondisk" << dendl
;
4408 last_complete_ondisk
= version
;
4410 if (last_complete_ondisk
== info
.last_update
) {
4411 if (!is_primary()) {
4412 // Either we are a replica or backfill target.
4413 // we are fully up to date. tell the primary!
4414 pl
->send_cluster_message(
4416 make_message
<MOSDPGTrim
>(
4418 spg_t(info
.pgid
.pgid
, primary
.shard
),
4419 last_complete_ondisk
),
4420 get_osdmap_epoch());
4422 calc_min_last_complete_ondisk();
4427 void PeeringState::complete_write(eversion_t v
, eversion_t lc
)
4429 last_update_ondisk
= v
;
4430 last_complete_ondisk
= lc
;
4431 calc_min_last_complete_ondisk();
4434 void PeeringState::calc_trim_to()
4436 size_t target
= pl
->get_target_pg_log_entries();
4438 eversion_t limit
= std::min(
4439 min_last_complete_ondisk
,
4440 pg_log
.get_can_rollback_to());
4441 if (limit
!= eversion_t() &&
4442 limit
!= pg_trim_to
&&
4443 pg_log
.get_log().approx_size() > target
) {
4444 size_t num_to_trim
= std::min(pg_log
.get_log().approx_size() - target
,
4445 cct
->_conf
->osd_pg_log_trim_max
);
4446 if (num_to_trim
< cct
->_conf
->osd_pg_log_trim_min
&&
4447 cct
->_conf
->osd_pg_log_trim_max
>= cct
->_conf
->osd_pg_log_trim_min
) {
4450 auto it
= pg_log
.get_log().log
.begin();
4451 eversion_t new_trim_to
;
4452 for (size_t i
= 0; i
< num_to_trim
; ++i
) {
4453 new_trim_to
= it
->version
;
4455 if (new_trim_to
> limit
) {
4456 new_trim_to
= limit
;
4457 psdout(10) << "calc_trim_to trimming to min_last_complete_ondisk" << dendl
;
4461 psdout(10) << "calc_trim_to " << pg_trim_to
<< " -> " << new_trim_to
<< dendl
;
4462 pg_trim_to
= new_trim_to
;
4463 assert(pg_trim_to
<= pg_log
.get_head());
4464 assert(pg_trim_to
<= min_last_complete_ondisk
);
4468 void PeeringState::calc_trim_to_aggressive()
4470 size_t target
= pl
->get_target_pg_log_entries();
4472 // limit pg log trimming up to the can_rollback_to value
4473 eversion_t limit
= std::min({
4475 pg_log
.get_can_rollback_to(),
4476 last_update_ondisk
});
4477 psdout(10) << __func__
<< " limit = " << limit
<< dendl
;
4479 if (limit
!= eversion_t() &&
4480 limit
!= pg_trim_to
&&
4481 pg_log
.get_log().approx_size() > target
) {
4482 psdout(10) << __func__
<< " approx pg log length = "
4483 << pg_log
.get_log().approx_size() << dendl
;
4484 uint64_t num_to_trim
= std::min
<uint64_t>(pg_log
.get_log().approx_size() - target
,
4485 cct
->_conf
->osd_pg_log_trim_max
);
4486 psdout(10) << __func__
<< " num_to_trim = " << num_to_trim
<< dendl
;
4487 if (num_to_trim
< cct
->_conf
->osd_pg_log_trim_min
&&
4488 cct
->_conf
->osd_pg_log_trim_max
>= cct
->_conf
->osd_pg_log_trim_min
) {
4491 auto it
= pg_log
.get_log().log
.begin(); // oldest log entry
4492 auto rit
= pg_log
.get_log().log
.rbegin();
4493 eversion_t by_n_to_keep
; // start from tail
4494 eversion_t by_n_to_trim
= eversion_t::max(); // start from head
4495 for (size_t i
= 0; it
!= pg_log
.get_log().log
.end(); ++it
, ++rit
) {
4497 if (i
> target
&& by_n_to_keep
== eversion_t()) {
4498 by_n_to_keep
= rit
->version
;
4500 if (i
>= num_to_trim
&& by_n_to_trim
== eversion_t::max()) {
4501 by_n_to_trim
= it
->version
;
4503 if (by_n_to_keep
!= eversion_t() &&
4504 by_n_to_trim
!= eversion_t::max()) {
4509 if (by_n_to_keep
== eversion_t()) {
4513 pg_trim_to
= std::min({by_n_to_keep
, by_n_to_trim
, limit
});
4514 psdout(10) << __func__
<< " pg_trim_to now " << pg_trim_to
<< dendl
;
4515 ceph_assert(pg_trim_to
<= pg_log
.get_head());
4519 void PeeringState::apply_op_stats(
4520 const hobject_t
&soid
,
4521 const object_stat_sum_t
&delta_stats
)
4523 info
.stats
.stats
.add(delta_stats
);
4524 info
.stats
.stats
.floor(0);
4526 for (auto i
= get_backfill_targets().begin();
4527 i
!= get_backfill_targets().end();
4530 pg_info_t
& pinfo
= peer_info
[bt
];
4531 if (soid
<= pinfo
.last_backfill
)
4532 pinfo
.stats
.stats
.add(delta_stats
);
4536 void PeeringState::update_complete_backfill_object_stats(
4537 const hobject_t
&hoid
,
4538 const pg_stat_t
&stats
)
4540 for (auto &&bt
: get_backfill_targets()) {
4541 pg_info_t
& pinfo
= peer_info
[bt
];
4542 //Add stats to all peers that were missing object
4543 if (hoid
> pinfo
.last_backfill
)
4544 pinfo
.stats
.add(stats
);
4548 void PeeringState::update_peer_last_backfill(
4550 const hobject_t
&new_last_backfill
)
4552 pg_info_t
&pinfo
= peer_info
[peer
];
4553 pinfo
.last_backfill
= new_last_backfill
;
4554 if (new_last_backfill
.is_max()) {
4555 /* pinfo.stats might be wrong if we did log-based recovery on the
4556 * backfilled portion in addition to continuing backfill.
4558 pinfo
.stats
= info
.stats
;
4562 void PeeringState::set_revert_with_targets(
4563 const hobject_t
&soid
,
4564 const set
<pg_shard_t
> &good_peers
)
4566 for (auto &&peer
: good_peers
) {
4567 missing_loc
.add_location(soid
, peer
);
4571 void PeeringState::prepare_backfill_for_missing(
4572 const hobject_t
&soid
,
4573 const eversion_t
&version
,
4574 const vector
<pg_shard_t
> &targets
) {
4575 for (auto &&peer
: targets
) {
4576 peer_missing
[peer
].add(soid
, version
, eversion_t(), false);
4580 void PeeringState::update_hset(const pg_hit_set_history_t
&hset_history
)
4582 info
.hit_set
= hset_history
;
4585 /*------------ Peering State Machine----------------*/
4587 #define dout_prefix (context< PeeringMachine >().dpp->gen_prefix(*_dout) \
4588 << "state<" << get_state_name() << ">: ")
4590 #define psdout(x) ldout(context< PeeringMachine >().cct, x)
4592 #define DECLARE_LOCALS \
4593 PeeringState *ps = context< PeeringMachine >().state; \
4595 PeeringListener *pl = context< PeeringMachine >().pl; \
4599 /*------Crashed-------*/
4600 PeeringState::Crashed::Crashed(my_context ctx
)
4602 NamedState(context
< PeeringMachine
>().state_history
, "Crashed")
4604 context
< PeeringMachine
>().log_enter(state_name
);
4605 ceph_abort_msg("we got a bad state machine event");
4609 /*------Initial-------*/
4610 PeeringState::Initial::Initial(my_context ctx
)
4612 NamedState(context
< PeeringMachine
>().state_history
, "Initial")
4614 context
< PeeringMachine
>().log_enter(state_name
);
4617 boost::statechart::result
PeeringState::Initial::react(const MNotifyRec
& notify
)
4620 ps
->proc_replica_info(
4621 notify
.from
, notify
.notify
.info
, notify
.notify
.epoch_sent
);
4622 ps
->set_last_peering_reset();
4623 return transit
< Primary
>();
4626 boost::statechart::result
PeeringState::Initial::react(const MInfoRec
& i
)
4629 ceph_assert(!ps
->is_primary());
4631 return transit
< Stray
>();
4634 boost::statechart::result
PeeringState::Initial::react(const MLogRec
& i
)
4637 ceph_assert(!ps
->is_primary());
4639 return transit
< Stray
>();
4642 void PeeringState::Initial::exit()
4644 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
4646 utime_t dur
= ceph_clock_now() - enter_time
;
4647 pl
->get_peering_perf().tinc(rs_initial_latency
, dur
);
4650 /*------Started-------*/
4651 PeeringState::Started::Started(my_context ctx
)
4653 NamedState(context
< PeeringMachine
>().state_history
, "Started")
4655 context
< PeeringMachine
>().log_enter(state_name
);
4658 boost::statechart::result
4659 PeeringState::Started::react(const IntervalFlush
&)
4661 psdout(10) << "Ending blocked outgoing recovery messages" << dendl
;
4662 context
< PeeringMachine
>().state
->end_block_outgoing();
4663 return discard_event();
4666 boost::statechart::result
PeeringState::Started::react(const AdvMap
& advmap
)
4669 psdout(10) << "Started advmap" << dendl
;
4670 ps
->check_full_transition(advmap
.lastmap
, advmap
.osdmap
);
4671 if (ps
->should_restart_peering(
4673 advmap
.acting_primary
,
4678 psdout(10) << "should_restart_peering, transitioning to Reset"
4681 return transit
< Reset
>();
4683 ps
->remove_down_peer_info(advmap
.osdmap
);
4684 return discard_event();
4687 boost::statechart::result
PeeringState::Started::react(const QueryState
& q
)
4689 q
.f
->open_object_section("state");
4690 q
.f
->dump_string("name", state_name
);
4691 q
.f
->dump_stream("enter_time") << enter_time
;
4692 q
.f
->close_section();
4693 return discard_event();
4696 boost::statechart::result
PeeringState::Started::react(const QueryUnfound
& q
)
4698 q
.f
->dump_string("state", "Started");
4699 q
.f
->dump_bool("available_might_have_unfound", false);
4700 return discard_event();
4703 void PeeringState::Started::exit()
4705 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
4707 utime_t dur
= ceph_clock_now() - enter_time
;
4708 pl
->get_peering_perf().tinc(rs_started_latency
, dur
);
4709 ps
->state_clear(PG_STATE_WAIT
| PG_STATE_LAGGY
);
4712 /*--------Reset---------*/
4713 PeeringState::Reset::Reset(my_context ctx
)
4715 NamedState(context
< PeeringMachine
>().state_history
, "Reset")
4717 context
< PeeringMachine
>().log_enter(state_name
);
4720 ps
->flushes_in_progress
= 0;
4721 ps
->set_last_peering_reset();
4722 ps
->log_weirdness();
4725 boost::statechart::result
4726 PeeringState::Reset::react(const IntervalFlush
&)
4728 psdout(10) << "Ending blocked outgoing recovery messages" << dendl
;
4729 context
< PeeringMachine
>().state
->end_block_outgoing();
4730 return discard_event();
4733 boost::statechart::result
PeeringState::Reset::react(const AdvMap
& advmap
)
4736 psdout(10) << "Reset advmap" << dendl
;
4738 ps
->check_full_transition(advmap
.lastmap
, advmap
.osdmap
);
4740 if (ps
->should_restart_peering(
4742 advmap
.acting_primary
,
4747 psdout(10) << "should restart peering, calling start_peering_interval again"
4749 ps
->start_peering_interval(
4751 advmap
.newup
, advmap
.up_primary
,
4752 advmap
.newacting
, advmap
.acting_primary
,
4753 context
< PeeringMachine
>().get_cur_transaction());
4755 ps
->remove_down_peer_info(advmap
.osdmap
);
4756 ps
->check_past_interval_bounds();
4757 return discard_event();
4760 boost::statechart::result
PeeringState::Reset::react(const ActMap
&)
4763 if (ps
->should_send_notify() && ps
->get_primary().osd
>= 0) {
4764 ps
->info
.history
.refresh_prior_readable_until_ub(
4766 ps
->prior_readable_until_ub
);
4767 context
< PeeringMachine
>().send_notify(
4768 ps
->get_primary().osd
,
4770 ps
->get_primary().shard
, ps
->pg_whoami
.shard
,
4771 ps
->get_osdmap_epoch(),
4772 ps
->get_osdmap_epoch(),
4774 ps
->past_intervals
));
4777 ps
->update_heartbeat_peers();
4779 return transit
< Started
>();
4782 boost::statechart::result
PeeringState::Reset::react(const QueryState
& q
)
4784 q
.f
->open_object_section("state");
4785 q
.f
->dump_string("name", state_name
);
4786 q
.f
->dump_stream("enter_time") << enter_time
;
4787 q
.f
->close_section();
4788 return discard_event();
4791 boost::statechart::result
PeeringState::Reset::react(const QueryUnfound
& q
)
4793 q
.f
->dump_string("state", "Reset");
4794 q
.f
->dump_bool("available_might_have_unfound", false);
4795 return discard_event();
4798 void PeeringState::Reset::exit()
4800 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
4802 utime_t dur
= ceph_clock_now() - enter_time
;
4803 pl
->get_peering_perf().tinc(rs_reset_latency
, dur
);
4806 /*-------Start---------*/
4807 PeeringState::Start::Start(my_context ctx
)
4809 NamedState(context
< PeeringMachine
>().state_history
, "Start")
4811 context
< PeeringMachine
>().log_enter(state_name
);
4814 if (ps
->is_primary()) {
4815 psdout(1) << "transitioning to Primary" << dendl
;
4816 post_event(MakePrimary());
4818 psdout(1) << "transitioning to Stray" << dendl
;
4819 post_event(MakeStray());
4823 void PeeringState::Start::exit()
4825 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
4827 utime_t dur
= ceph_clock_now() - enter_time
;
4828 pl
->get_peering_perf().tinc(rs_start_latency
, dur
);
4831 /*---------Primary--------*/
4832 PeeringState::Primary::Primary(my_context ctx
)
4834 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary")
4836 context
< PeeringMachine
>().log_enter(state_name
);
4838 ceph_assert(ps
->want_acting
.empty());
4840 // set CREATING bit until we have peered for the first time.
4841 if (ps
->info
.history
.last_epoch_started
== 0) {
4842 ps
->state_set(PG_STATE_CREATING
);
4843 // use the history timestamp, which ultimately comes from the
4844 // monitor in the create case.
4845 utime_t t
= ps
->info
.history
.last_scrub_stamp
;
4846 ps
->info
.stats
.last_fresh
= t
;
4847 ps
->info
.stats
.last_active
= t
;
4848 ps
->info
.stats
.last_change
= t
;
4849 ps
->info
.stats
.last_peered
= t
;
4850 ps
->info
.stats
.last_clean
= t
;
4851 ps
->info
.stats
.last_unstale
= t
;
4852 ps
->info
.stats
.last_undegraded
= t
;
4853 ps
->info
.stats
.last_fullsized
= t
;
4854 ps
->info
.stats
.last_scrub_stamp
= t
;
4855 ps
->info
.stats
.last_deep_scrub_stamp
= t
;
4856 ps
->info
.stats
.last_clean_scrub_stamp
= t
;
4860 boost::statechart::result
PeeringState::Primary::react(const MNotifyRec
& notevt
)
4863 psdout(7) << "handle_pg_notify from osd." << notevt
.from
<< dendl
;
4864 ps
->proc_replica_info(
4865 notevt
.from
, notevt
.notify
.info
, notevt
.notify
.epoch_sent
);
4866 return discard_event();
4869 boost::statechart::result
PeeringState::Primary::react(const ActMap
&)
4872 psdout(7) << "handle ActMap primary" << dendl
;
4873 pl
->publish_stats_to_osd();
4874 return discard_event();
4877 boost::statechart::result
PeeringState::Primary::react(
4878 const SetForceRecovery
&)
4881 ps
->set_force_recovery(true);
4882 return discard_event();
4885 boost::statechart::result
PeeringState::Primary::react(
4886 const UnsetForceRecovery
&)
4889 ps
->set_force_recovery(false);
4890 return discard_event();
4893 boost::statechart::result
PeeringState::Primary::react(
4894 const RequestScrub
& evt
)
4897 if (ps
->is_primary()) {
4898 pl
->scrub_requested(evt
.deep
, evt
.repair
);
4899 psdout(10) << "marking for scrub" << dendl
;
4901 return discard_event();
4904 boost::statechart::result
PeeringState::Primary::react(
4905 const SetForceBackfill
&)
4908 ps
->set_force_backfill(true);
4909 return discard_event();
4912 boost::statechart::result
PeeringState::Primary::react(
4913 const UnsetForceBackfill
&)
4916 ps
->set_force_backfill(false);
4917 return discard_event();
4920 void PeeringState::Primary::exit()
4922 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
4924 ps
->want_acting
.clear();
4925 utime_t dur
= ceph_clock_now() - enter_time
;
4926 pl
->get_peering_perf().tinc(rs_primary_latency
, dur
);
4927 pl
->clear_primary_state();
4928 ps
->state_clear(PG_STATE_CREATING
);
4931 /*---------Peering--------*/
4932 PeeringState::Peering::Peering(my_context ctx
)
4934 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Peering"),
4935 history_les_bound(false)
4937 context
< PeeringMachine
>().log_enter(state_name
);
4940 ceph_assert(!ps
->is_peered());
4941 ceph_assert(!ps
->is_peering());
4942 ceph_assert(ps
->is_primary());
4943 ps
->state_set(PG_STATE_PEERING
);
4946 boost::statechart::result
PeeringState::Peering::react(const AdvMap
& advmap
)
4949 psdout(10) << "Peering advmap" << dendl
;
4950 if (prior_set
.affected_by_map(*(advmap
.osdmap
), ps
->dpp
)) {
4951 psdout(1) << "Peering, affected_by_map, going to Reset" << dendl
;
4953 return transit
< Reset
>();
4956 ps
->adjust_need_up_thru(advmap
.osdmap
);
4957 ps
->check_prior_readable_down_osds(advmap
.osdmap
);
4959 return forward_event();
4962 boost::statechart::result
PeeringState::Peering::react(const QueryState
& q
)
4966 q
.f
->open_object_section("state");
4967 q
.f
->dump_string("name", state_name
);
4968 q
.f
->dump_stream("enter_time") << enter_time
;
4970 q
.f
->open_array_section("past_intervals");
4971 ps
->past_intervals
.dump(q
.f
);
4972 q
.f
->close_section();
4974 q
.f
->open_array_section("probing_osds");
4975 for (auto p
= prior_set
.probe
.begin(); p
!= prior_set
.probe
.end(); ++p
)
4976 q
.f
->dump_stream("osd") << *p
;
4977 q
.f
->close_section();
4979 if (prior_set
.pg_down
)
4980 q
.f
->dump_string("blocked", "peering is blocked due to down osds");
4982 q
.f
->open_array_section("down_osds_we_would_probe");
4983 for (auto p
= prior_set
.down
.begin(); p
!= prior_set
.down
.end(); ++p
)
4984 q
.f
->dump_int("osd", *p
);
4985 q
.f
->close_section();
4987 q
.f
->open_array_section("peering_blocked_by");
4988 for (auto p
= prior_set
.blocked_by
.begin();
4989 p
!= prior_set
.blocked_by
.end();
4991 q
.f
->open_object_section("osd");
4992 q
.f
->dump_int("osd", p
->first
);
4993 q
.f
->dump_int("current_lost_at", p
->second
);
4994 q
.f
->dump_string("comment", "starting or marking this osd lost may let us proceed");
4995 q
.f
->close_section();
4997 q
.f
->close_section();
4999 if (history_les_bound
) {
5000 q
.f
->open_array_section("peering_blocked_by_detail");
5001 q
.f
->open_object_section("item");
5002 q
.f
->dump_string("detail","peering_blocked_by_history_les_bound");
5003 q
.f
->close_section();
5004 q
.f
->close_section();
5007 q
.f
->close_section();
5008 return forward_event();
5011 boost::statechart::result
PeeringState::Peering::react(const QueryUnfound
& q
)
5013 q
.f
->dump_string("state", "Peering");
5014 q
.f
->dump_bool("available_might_have_unfound", false);
5015 return discard_event();
5018 void PeeringState::Peering::exit()
5022 psdout(10) << "Leaving Peering" << dendl
;
5023 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5024 ps
->state_clear(PG_STATE_PEERING
);
5025 pl
->clear_probe_targets();
5027 utime_t dur
= ceph_clock_now() - enter_time
;
5028 pl
->get_peering_perf().tinc(rs_peering_latency
, dur
);
5032 /*------Backfilling-------*/
5033 PeeringState::Backfilling::Backfilling(my_context ctx
)
5035 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/Backfilling")
5037 context
< PeeringMachine
>().log_enter(state_name
);
5041 ps
->backfill_reserved
= true;
5042 pl
->on_backfill_reserved();
5043 ps
->state_clear(PG_STATE_BACKFILL_TOOFULL
);
5044 ps
->state_clear(PG_STATE_BACKFILL_WAIT
);
5045 ps
->state_set(PG_STATE_BACKFILLING
);
5046 pl
->publish_stats_to_osd();
5049 void PeeringState::Backfilling::backfill_release_reservations()
5052 pl
->cancel_local_background_io_reservation();
5053 for (auto it
= ps
->backfill_targets
.begin();
5054 it
!= ps
->backfill_targets
.end();
5056 ceph_assert(*it
!= ps
->pg_whoami
);
5057 pl
->send_cluster_message(
5059 make_message
<MBackfillReserve
>(
5060 MBackfillReserve::RELEASE
,
5061 spg_t(ps
->info
.pgid
.pgid
, it
->shard
),
5062 ps
->get_osdmap_epoch()),
5063 ps
->get_osdmap_epoch());
5067 void PeeringState::Backfilling::cancel_backfill()
5070 backfill_release_reservations();
5071 pl
->on_backfill_canceled();
5074 boost::statechart::result
5075 PeeringState::Backfilling::react(const Backfilled
&c
)
5077 backfill_release_reservations();
5078 return transit
<Recovered
>();
5081 boost::statechart::result
5082 PeeringState::Backfilling::react(const DeferBackfill
&c
)
5086 psdout(10) << "defer backfill, retry delay " << c
.delay
<< dendl
;
5087 ps
->state_set(PG_STATE_BACKFILL_WAIT
);
5088 ps
->state_clear(PG_STATE_BACKFILLING
);
5091 pl
->schedule_event_after(
5092 std::make_shared
<PGPeeringEvent
>(
5093 ps
->get_osdmap_epoch(),
5094 ps
->get_osdmap_epoch(),
5097 return transit
<NotBackfilling
>();
5100 boost::statechart::result
5101 PeeringState::Backfilling::react(const UnfoundBackfill
&c
)
5104 psdout(10) << "backfill has unfound, can't continue" << dendl
;
5105 ps
->state_set(PG_STATE_BACKFILL_UNFOUND
);
5106 ps
->state_clear(PG_STATE_BACKFILLING
);
5108 return transit
<NotBackfilling
>();
5111 boost::statechart::result
5112 PeeringState::Backfilling::react(const RemoteReservationRevokedTooFull
&)
5116 ps
->state_set(PG_STATE_BACKFILL_TOOFULL
);
5117 ps
->state_clear(PG_STATE_BACKFILLING
);
5120 pl
->schedule_event_after(
5121 std::make_shared
<PGPeeringEvent
>(
5122 ps
->get_osdmap_epoch(),
5123 ps
->get_osdmap_epoch(),
5125 ps
->cct
->_conf
->osd_backfill_retry_interval
);
5127 return transit
<NotBackfilling
>();
5130 boost::statechart::result
5131 PeeringState::Backfilling::react(const RemoteReservationRevoked
&)
5134 ps
->state_set(PG_STATE_BACKFILL_WAIT
);
5136 if (ps
->needs_backfill()) {
5137 return transit
<WaitLocalBackfillReserved
>();
5139 // raced with MOSDPGBackfill::OP_BACKFILL_FINISH, ignore
5140 return discard_event();
5144 void PeeringState::Backfilling::exit()
5146 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5148 ps
->backfill_reserved
= false;
5149 ps
->state_clear(PG_STATE_BACKFILLING
);
5150 ps
->state_clear(PG_STATE_FORCED_BACKFILL
| PG_STATE_FORCED_RECOVERY
);
5151 utime_t dur
= ceph_clock_now() - enter_time
;
5152 pl
->get_peering_perf().tinc(rs_backfilling_latency
, dur
);
5155 /*--WaitRemoteBackfillReserved--*/
5157 PeeringState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_context ctx
)
5159 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/WaitRemoteBackfillReserved"),
5160 backfill_osd_it(context
< Active
>().remote_shards_to_reserve_backfill
.begin())
5162 context
< PeeringMachine
>().log_enter(state_name
);
5165 ps
->state_set(PG_STATE_BACKFILL_WAIT
);
5166 pl
->publish_stats_to_osd();
5167 post_event(RemoteBackfillReserved());
5170 boost::statechart::result
5171 PeeringState::WaitRemoteBackfillReserved::react(const RemoteBackfillReserved
&evt
)
5175 int64_t num_bytes
= ps
->info
.stats
.stats
.sum
.num_bytes
;
5176 psdout(10) << __func__
<< " num_bytes " << num_bytes
<< dendl
;
5177 if (backfill_osd_it
!=
5178 context
< Active
>().remote_shards_to_reserve_backfill
.end()) {
5179 // The primary never backfills itself
5180 ceph_assert(*backfill_osd_it
!= ps
->pg_whoami
);
5181 pl
->send_cluster_message(
5182 backfill_osd_it
->osd
,
5183 make_message
<MBackfillReserve
>(
5184 MBackfillReserve::REQUEST
,
5185 spg_t(context
< PeeringMachine
>().spgid
.pgid
, backfill_osd_it
->shard
),
5186 ps
->get_osdmap_epoch(),
5187 ps
->get_backfill_priority(),
5189 ps
->peer_bytes
[*backfill_osd_it
]),
5190 ps
->get_osdmap_epoch());
5193 ps
->peer_bytes
.clear();
5194 post_event(AllBackfillsReserved());
5196 return discard_event();
5199 void PeeringState::WaitRemoteBackfillReserved::exit()
5201 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5204 utime_t dur
= ceph_clock_now() - enter_time
;
5205 pl
->get_peering_perf().tinc(rs_waitremotebackfillreserved_latency
, dur
);
5208 void PeeringState::WaitRemoteBackfillReserved::retry()
5211 pl
->cancel_local_background_io_reservation();
5213 // Send CANCEL to all previously acquired reservations
5214 set
<pg_shard_t
>::const_iterator it
, begin
, end
;
5215 begin
= context
< Active
>().remote_shards_to_reserve_backfill
.begin();
5216 end
= context
< Active
>().remote_shards_to_reserve_backfill
.end();
5217 ceph_assert(begin
!= end
);
5218 for (it
= begin
; it
!= backfill_osd_it
; ++it
) {
5219 // The primary never backfills itself
5220 ceph_assert(*it
!= ps
->pg_whoami
);
5221 pl
->send_cluster_message(
5223 make_message
<MBackfillReserve
>(
5224 MBackfillReserve::RELEASE
,
5225 spg_t(context
< PeeringMachine
>().spgid
.pgid
, it
->shard
),
5226 ps
->get_osdmap_epoch()),
5227 ps
->get_osdmap_epoch());
5230 ps
->state_clear(PG_STATE_BACKFILL_WAIT
);
5231 pl
->publish_stats_to_osd();
5233 pl
->schedule_event_after(
5234 std::make_shared
<PGPeeringEvent
>(
5235 ps
->get_osdmap_epoch(),
5236 ps
->get_osdmap_epoch(),
5238 ps
->cct
->_conf
->osd_backfill_retry_interval
);
5241 boost::statechart::result
5242 PeeringState::WaitRemoteBackfillReserved::react(const RemoteReservationRejectedTooFull
&evt
)
5245 ps
->state_set(PG_STATE_BACKFILL_TOOFULL
);
5247 return transit
<NotBackfilling
>();
5250 boost::statechart::result
5251 PeeringState::WaitRemoteBackfillReserved::react(const RemoteReservationRevoked
&evt
)
5254 return transit
<NotBackfilling
>();
5257 /*--WaitLocalBackfillReserved--*/
5258 PeeringState::WaitLocalBackfillReserved::WaitLocalBackfillReserved(my_context ctx
)
5260 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/WaitLocalBackfillReserved")
5262 context
< PeeringMachine
>().log_enter(state_name
);
5265 ps
->state_set(PG_STATE_BACKFILL_WAIT
);
5266 pl
->request_local_background_io_reservation(
5267 ps
->get_backfill_priority(),
5268 std::make_unique
<PGPeeringEvent
>(
5269 ps
->get_osdmap_epoch(),
5270 ps
->get_osdmap_epoch(),
5271 LocalBackfillReserved()),
5272 std::make_unique
<PGPeeringEvent
>(
5273 ps
->get_osdmap_epoch(),
5274 ps
->get_osdmap_epoch(),
5275 DeferBackfill(0.0)));
5276 pl
->publish_stats_to_osd();
5279 void PeeringState::WaitLocalBackfillReserved::exit()
5281 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5283 utime_t dur
= ceph_clock_now() - enter_time
;
5284 pl
->get_peering_perf().tinc(rs_waitlocalbackfillreserved_latency
, dur
);
5287 /*----NotBackfilling------*/
5288 PeeringState::NotBackfilling::NotBackfilling(my_context ctx
)
5290 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/NotBackfilling")
5292 context
< PeeringMachine
>().log_enter(state_name
);
5294 ps
->state_clear(PG_STATE_REPAIR
);
5295 pl
->publish_stats_to_osd();
5298 boost::statechart::result
PeeringState::NotBackfilling::react(const QueryUnfound
& q
)
5302 ps
->query_unfound(q
.f
, "NotBackfilling");
5303 return discard_event();
5306 boost::statechart::result
5307 PeeringState::NotBackfilling::react(const RemoteBackfillReserved
&evt
)
5309 return discard_event();
5312 boost::statechart::result
5313 PeeringState::NotBackfilling::react(const RemoteReservationRejectedTooFull
&evt
)
5315 return discard_event();
5318 void PeeringState::NotBackfilling::exit()
5320 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5323 ps
->state_clear(PG_STATE_BACKFILL_UNFOUND
);
5324 utime_t dur
= ceph_clock_now() - enter_time
;
5325 pl
->get_peering_perf().tinc(rs_notbackfilling_latency
, dur
);
5328 /*----NotRecovering------*/
5329 PeeringState::NotRecovering::NotRecovering(my_context ctx
)
5331 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/NotRecovering")
5333 context
< PeeringMachine
>().log_enter(state_name
);
5335 ps
->state_clear(PG_STATE_REPAIR
);
5336 pl
->publish_stats_to_osd();
5339 boost::statechart::result
PeeringState::NotRecovering::react(const QueryUnfound
& q
)
5343 ps
->query_unfound(q
.f
, "NotRecovering");
5344 return discard_event();
5347 void PeeringState::NotRecovering::exit()
5349 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5352 ps
->state_clear(PG_STATE_RECOVERY_UNFOUND
);
5353 utime_t dur
= ceph_clock_now() - enter_time
;
5354 pl
->get_peering_perf().tinc(rs_notrecovering_latency
, dur
);
5357 /*---RepNotRecovering----*/
5358 PeeringState::RepNotRecovering::RepNotRecovering(my_context ctx
)
5360 NamedState(context
< PeeringMachine
>().state_history
, "Started/ReplicaActive/RepNotRecovering")
5362 context
< PeeringMachine
>().log_enter(state_name
);
5365 boost::statechart::result
5366 PeeringState::RepNotRecovering::react(const RejectTooFullRemoteReservation
&evt
)
5369 ps
->reject_reservation();
5370 post_event(RemoteReservationRejectedTooFull());
5371 return discard_event();
5374 void PeeringState::RepNotRecovering::exit()
5376 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5378 utime_t dur
= ceph_clock_now() - enter_time
;
5379 pl
->get_peering_perf().tinc(rs_repnotrecovering_latency
, dur
);
5382 /*---RepWaitRecoveryReserved--*/
5383 PeeringState::RepWaitRecoveryReserved::RepWaitRecoveryReserved(my_context ctx
)
5385 NamedState(context
< PeeringMachine
>().state_history
, "Started/ReplicaActive/RepWaitRecoveryReserved")
5387 context
< PeeringMachine
>().log_enter(state_name
);
5390 boost::statechart::result
5391 PeeringState::RepWaitRecoveryReserved::react(const RemoteRecoveryReserved
&evt
)
5394 pl
->send_cluster_message(
5396 make_message
<MRecoveryReserve
>(
5397 MRecoveryReserve::GRANT
,
5398 spg_t(ps
->info
.pgid
.pgid
, ps
->primary
.shard
),
5399 ps
->get_osdmap_epoch()),
5400 ps
->get_osdmap_epoch());
5401 return transit
<RepRecovering
>();
5404 boost::statechart::result
5405 PeeringState::RepWaitRecoveryReserved::react(
5406 const RemoteReservationCanceled
&evt
)
5409 pl
->unreserve_recovery_space();
5411 pl
->cancel_remote_recovery_reservation();
5412 return transit
<RepNotRecovering
>();
5415 void PeeringState::RepWaitRecoveryReserved::exit()
5417 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5419 utime_t dur
= ceph_clock_now() - enter_time
;
5420 pl
->get_peering_perf().tinc(rs_repwaitrecoveryreserved_latency
, dur
);
5423 /*-RepWaitBackfillReserved*/
5424 PeeringState::RepWaitBackfillReserved::RepWaitBackfillReserved(my_context ctx
)
5426 NamedState(context
< PeeringMachine
>().state_history
, "Started/ReplicaActive/RepWaitBackfillReserved")
5428 context
< PeeringMachine
>().log_enter(state_name
);
5431 boost::statechart::result
5432 PeeringState::RepNotRecovering::react(const RequestBackfillPrio
&evt
)
5437 if (!pl
->try_reserve_recovery_space(
5438 evt
.primary_num_bytes
, evt
.local_num_bytes
)) {
5439 post_event(RejectTooFullRemoteReservation());
5441 PGPeeringEventURef preempt
;
5442 if (HAVE_FEATURE(ps
->upacting_features
, RECOVERY_RESERVATION_2
)) {
5443 // older peers will interpret preemption as TOOFULL
5444 preempt
= std::make_unique
<PGPeeringEvent
>(
5445 pl
->get_osdmap_epoch(),
5446 pl
->get_osdmap_epoch(),
5447 RemoteBackfillPreempted());
5449 pl
->request_remote_recovery_reservation(
5451 std::make_unique
<PGPeeringEvent
>(
5452 pl
->get_osdmap_epoch(),
5453 pl
->get_osdmap_epoch(),
5454 RemoteBackfillReserved()),
5455 std::move(preempt
));
5457 return transit
<RepWaitBackfillReserved
>();
5460 boost::statechart::result
5461 PeeringState::RepNotRecovering::react(const RequestRecoveryPrio
&evt
)
5465 // fall back to a local reckoning of priority of primary doesn't pass one
5466 // (pre-mimic compat)
5467 int prio
= evt
.priority
? evt
.priority
: ps
->get_recovery_priority();
5469 PGPeeringEventURef preempt
;
5470 if (HAVE_FEATURE(ps
->upacting_features
, RECOVERY_RESERVATION_2
)) {
5471 // older peers can't handle this
5472 preempt
= std::make_unique
<PGPeeringEvent
>(
5473 ps
->get_osdmap_epoch(),
5474 ps
->get_osdmap_epoch(),
5475 RemoteRecoveryPreempted());
5478 pl
->request_remote_recovery_reservation(
5480 std::make_unique
<PGPeeringEvent
>(
5481 ps
->get_osdmap_epoch(),
5482 ps
->get_osdmap_epoch(),
5483 RemoteRecoveryReserved()),
5484 std::move(preempt
));
5485 return transit
<RepWaitRecoveryReserved
>();
5488 void PeeringState::RepWaitBackfillReserved::exit()
5490 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5492 utime_t dur
= ceph_clock_now() - enter_time
;
5493 pl
->get_peering_perf().tinc(rs_repwaitbackfillreserved_latency
, dur
);
5496 boost::statechart::result
5497 PeeringState::RepWaitBackfillReserved::react(const RemoteBackfillReserved
&evt
)
5502 pl
->send_cluster_message(
5504 make_message
<MBackfillReserve
>(
5505 MBackfillReserve::GRANT
,
5506 spg_t(ps
->info
.pgid
.pgid
, ps
->primary
.shard
),
5507 ps
->get_osdmap_epoch()),
5508 ps
->get_osdmap_epoch());
5509 return transit
<RepRecovering
>();
5512 boost::statechart::result
5513 PeeringState::RepWaitBackfillReserved::react(
5514 const RejectTooFullRemoteReservation
&evt
)
5517 ps
->reject_reservation();
5518 post_event(RemoteReservationRejectedTooFull());
5519 return discard_event();
5522 boost::statechart::result
5523 PeeringState::RepWaitBackfillReserved::react(
5524 const RemoteReservationRejectedTooFull
&evt
)
5527 pl
->unreserve_recovery_space();
5529 pl
->cancel_remote_recovery_reservation();
5530 return transit
<RepNotRecovering
>();
5533 boost::statechart::result
5534 PeeringState::RepWaitBackfillReserved::react(
5535 const RemoteReservationCanceled
&evt
)
5538 pl
->unreserve_recovery_space();
5540 pl
->cancel_remote_recovery_reservation();
5541 return transit
<RepNotRecovering
>();
5544 /*---RepRecovering-------*/
5545 PeeringState::RepRecovering::RepRecovering(my_context ctx
)
5547 NamedState(context
< PeeringMachine
>().state_history
, "Started/ReplicaActive/RepRecovering")
5549 context
< PeeringMachine
>().log_enter(state_name
);
5552 boost::statechart::result
5553 PeeringState::RepRecovering::react(const RemoteRecoveryPreempted
&)
5558 pl
->unreserve_recovery_space();
5559 pl
->send_cluster_message(
5561 make_message
<MRecoveryReserve
>(
5562 MRecoveryReserve::REVOKE
,
5563 spg_t(ps
->info
.pgid
.pgid
, ps
->primary
.shard
),
5564 ps
->get_osdmap_epoch()),
5565 ps
->get_osdmap_epoch());
5566 return discard_event();
5569 boost::statechart::result
5570 PeeringState::RepRecovering::react(const BackfillTooFull
&)
5575 pl
->unreserve_recovery_space();
5576 pl
->send_cluster_message(
5578 make_message
<MBackfillReserve
>(
5579 MBackfillReserve::REVOKE_TOOFULL
,
5580 spg_t(ps
->info
.pgid
.pgid
, ps
->primary
.shard
),
5581 ps
->get_osdmap_epoch()),
5582 ps
->get_osdmap_epoch());
5583 return discard_event();
5586 boost::statechart::result
5587 PeeringState::RepRecovering::react(const RemoteBackfillPreempted
&)
5592 pl
->unreserve_recovery_space();
5593 pl
->send_cluster_message(
5595 make_message
<MBackfillReserve
>(
5596 MBackfillReserve::REVOKE
,
5597 spg_t(ps
->info
.pgid
.pgid
, ps
->primary
.shard
),
5598 ps
->get_osdmap_epoch()),
5599 ps
->get_osdmap_epoch());
5600 return discard_event();
5603 void PeeringState::RepRecovering::exit()
5605 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5607 pl
->unreserve_recovery_space();
5609 pl
->cancel_remote_recovery_reservation();
5610 utime_t dur
= ceph_clock_now() - enter_time
;
5611 pl
->get_peering_perf().tinc(rs_reprecovering_latency
, dur
);
5614 /*------Activating--------*/
5615 PeeringState::Activating::Activating(my_context ctx
)
5617 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/Activating")
5619 context
< PeeringMachine
>().log_enter(state_name
);
5622 void PeeringState::Activating::exit()
5624 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5626 utime_t dur
= ceph_clock_now() - enter_time
;
5627 pl
->get_peering_perf().tinc(rs_activating_latency
, dur
);
5630 PeeringState::WaitLocalRecoveryReserved::WaitLocalRecoveryReserved(my_context ctx
)
5632 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/WaitLocalRecoveryReserved")
5634 context
< PeeringMachine
>().log_enter(state_name
);
5637 // Make sure all nodes that part of the recovery aren't full
5638 if (!ps
->cct
->_conf
->osd_debug_skip_full_check_in_recovery
&&
5639 ps
->get_osdmap()->check_full(ps
->acting_recovery_backfill
)) {
5640 post_event(RecoveryTooFull());
5644 ps
->state_clear(PG_STATE_RECOVERY_TOOFULL
);
5645 ps
->state_set(PG_STATE_RECOVERY_WAIT
);
5646 pl
->request_local_background_io_reservation(
5647 ps
->get_recovery_priority(),
5648 std::make_unique
<PGPeeringEvent
>(
5649 ps
->get_osdmap_epoch(),
5650 ps
->get_osdmap_epoch(),
5651 LocalRecoveryReserved()),
5652 std::make_unique
<PGPeeringEvent
>(
5653 ps
->get_osdmap_epoch(),
5654 ps
->get_osdmap_epoch(),
5655 DeferRecovery(0.0)));
5656 pl
->publish_stats_to_osd();
5659 boost::statechart::result
5660 PeeringState::WaitLocalRecoveryReserved::react(const RecoveryTooFull
&evt
)
5663 ps
->state_set(PG_STATE_RECOVERY_TOOFULL
);
5664 pl
->schedule_event_after(
5665 std::make_shared
<PGPeeringEvent
>(
5666 ps
->get_osdmap_epoch(),
5667 ps
->get_osdmap_epoch(),
5669 ps
->cct
->_conf
->osd_recovery_retry_interval
);
5670 return transit
<NotRecovering
>();
5673 void PeeringState::WaitLocalRecoveryReserved::exit()
5675 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5677 utime_t dur
= ceph_clock_now() - enter_time
;
5678 pl
->get_peering_perf().tinc(rs_waitlocalrecoveryreserved_latency
, dur
);
5681 PeeringState::WaitRemoteRecoveryReserved::WaitRemoteRecoveryReserved(my_context ctx
)
5683 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/WaitRemoteRecoveryReserved"),
5684 remote_recovery_reservation_it(context
< Active
>().remote_shards_to_reserve_recovery
.begin())
5686 context
< PeeringMachine
>().log_enter(state_name
);
5687 post_event(RemoteRecoveryReserved());
5690 boost::statechart::result
5691 PeeringState::WaitRemoteRecoveryReserved::react(const RemoteRecoveryReserved
&evt
) {
5694 if (remote_recovery_reservation_it
!=
5695 context
< Active
>().remote_shards_to_reserve_recovery
.end()) {
5696 ceph_assert(*remote_recovery_reservation_it
!= ps
->pg_whoami
);
5697 pl
->send_cluster_message(
5698 remote_recovery_reservation_it
->osd
,
5699 make_message
<MRecoveryReserve
>(
5700 MRecoveryReserve::REQUEST
,
5701 spg_t(context
< PeeringMachine
>().spgid
.pgid
,
5702 remote_recovery_reservation_it
->shard
),
5703 ps
->get_osdmap_epoch(),
5704 ps
->get_recovery_priority()),
5705 ps
->get_osdmap_epoch());
5706 ++remote_recovery_reservation_it
;
5708 post_event(AllRemotesReserved());
5710 return discard_event();
5713 void PeeringState::WaitRemoteRecoveryReserved::exit()
5715 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5717 utime_t dur
= ceph_clock_now() - enter_time
;
5718 pl
->get_peering_perf().tinc(rs_waitremoterecoveryreserved_latency
, dur
);
5721 PeeringState::Recovering::Recovering(my_context ctx
)
5723 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/Recovering")
5725 context
< PeeringMachine
>().log_enter(state_name
);
5728 ps
->state_clear(PG_STATE_RECOVERY_WAIT
);
5729 ps
->state_clear(PG_STATE_RECOVERY_TOOFULL
);
5730 ps
->state_set(PG_STATE_RECOVERING
);
5731 pl
->on_recovery_reserved();
5732 ceph_assert(!ps
->state_test(PG_STATE_ACTIVATING
));
5733 pl
->publish_stats_to_osd();
5736 void PeeringState::Recovering::release_reservations(bool cancel
)
5739 ceph_assert(cancel
|| !ps
->pg_log
.get_missing().have_missing());
5741 // release remote reservations
5742 for (auto i
= context
< Active
>().remote_shards_to_reserve_recovery
.begin();
5743 i
!= context
< Active
>().remote_shards_to_reserve_recovery
.end();
5745 if (*i
== ps
->pg_whoami
) // skip myself
5747 pl
->send_cluster_message(
5749 make_message
<MRecoveryReserve
>(
5750 MRecoveryReserve::RELEASE
,
5751 spg_t(ps
->info
.pgid
.pgid
, i
->shard
),
5752 ps
->get_osdmap_epoch()),
5753 ps
->get_osdmap_epoch());
5757 boost::statechart::result
5758 PeeringState::Recovering::react(const AllReplicasRecovered
&evt
)
5761 ps
->state_clear(PG_STATE_FORCED_RECOVERY
);
5762 release_reservations();
5763 pl
->cancel_local_background_io_reservation();
5764 return transit
<Recovered
>();
5767 boost::statechart::result
5768 PeeringState::Recovering::react(const RequestBackfill
&evt
)
5772 release_reservations();
5774 ps
->state_clear(PG_STATE_FORCED_RECOVERY
);
5775 pl
->cancel_local_background_io_reservation();
5776 pl
->publish_stats_to_osd();
5777 // transit any async_recovery_targets back into acting
5778 // so pg won't have to stay undersized for long
5779 // as backfill might take a long time to complete..
5780 if (!ps
->async_recovery_targets
.empty()) {
5781 pg_shard_t auth_log_shard
;
5782 bool history_les_bound
= false;
5783 // FIXME: Uh-oh we have to check this return value; choose_acting can fail!
5784 ps
->choose_acting(auth_log_shard
, true, &history_les_bound
);
5786 return transit
<WaitLocalBackfillReserved
>();
5789 boost::statechart::result
5790 PeeringState::Recovering::react(const DeferRecovery
&evt
)
5793 if (!ps
->state_test(PG_STATE_RECOVERING
)) {
5794 // we may have finished recovery and have an AllReplicasRecovered
5795 // event queued to move us to the next state.
5796 psdout(10) << "got defer recovery but not recovering" << dendl
;
5797 return discard_event();
5799 psdout(10) << "defer recovery, retry delay " << evt
.delay
<< dendl
;
5800 ps
->state_set(PG_STATE_RECOVERY_WAIT
);
5801 pl
->cancel_local_background_io_reservation();
5802 release_reservations(true);
5803 pl
->schedule_event_after(
5804 std::make_shared
<PGPeeringEvent
>(
5805 ps
->get_osdmap_epoch(),
5806 ps
->get_osdmap_epoch(),
5809 return transit
<NotRecovering
>();
5812 boost::statechart::result
5813 PeeringState::Recovering::react(const UnfoundRecovery
&evt
)
5816 psdout(10) << "recovery has unfound, can't continue" << dendl
;
5817 ps
->state_set(PG_STATE_RECOVERY_UNFOUND
);
5818 pl
->cancel_local_background_io_reservation();
5819 release_reservations(true);
5820 return transit
<NotRecovering
>();
5823 void PeeringState::Recovering::exit()
5825 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5828 utime_t dur
= ceph_clock_now() - enter_time
;
5829 ps
->state_clear(PG_STATE_RECOVERING
);
5830 pl
->get_peering_perf().tinc(rs_recovering_latency
, dur
);
5833 PeeringState::Recovered::Recovered(my_context ctx
)
5835 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/Recovered")
5837 pg_shard_t auth_log_shard
;
5839 context
< PeeringMachine
>().log_enter(state_name
);
5843 ceph_assert(!ps
->needs_recovery());
5845 // if we finished backfill, all acting are active; recheck if
5846 // DEGRADED | UNDERSIZED is appropriate.
5847 ceph_assert(!ps
->acting_recovery_backfill
.empty());
5848 if (ps
->get_osdmap()->get_pg_size(context
< PeeringMachine
>().spgid
.pgid
) <=
5849 ps
->acting_recovery_backfill
.size()) {
5850 ps
->state_clear(PG_STATE_FORCED_BACKFILL
| PG_STATE_FORCED_RECOVERY
);
5851 pl
->publish_stats_to_osd();
5854 // adjust acting set? (e.g. because backfill completed...)
5855 bool history_les_bound
= false;
5856 if (ps
->acting
!= ps
->up
&& !ps
->choose_acting(auth_log_shard
,
5857 true, &history_les_bound
)) {
5858 ceph_assert(ps
->want_acting
.size());
5859 } else if (!ps
->async_recovery_targets
.empty()) {
5860 // FIXME: Uh-oh we have to check this return value; choose_acting can fail!
5861 ps
->choose_acting(auth_log_shard
, true, &history_les_bound
);
5864 if (context
< Active
>().all_replicas_activated
&&
5865 ps
->async_recovery_targets
.empty())
5866 post_event(GoClean());
5869 void PeeringState::Recovered::exit()
5871 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5874 utime_t dur
= ceph_clock_now() - enter_time
;
5875 pl
->get_peering_perf().tinc(rs_recovered_latency
, dur
);
5878 PeeringState::Clean::Clean(my_context ctx
)
5880 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/Clean")
5882 context
< PeeringMachine
>().log_enter(state_name
);
5886 if (ps
->info
.last_complete
!= ps
->info
.last_update
) {
5891 ps
->try_mark_clean();
5893 context
< PeeringMachine
>().get_cur_transaction().register_on_commit(
5897 void PeeringState::Clean::exit()
5899 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5902 ps
->state_clear(PG_STATE_CLEAN
);
5903 utime_t dur
= ceph_clock_now() - enter_time
;
5904 pl
->get_peering_perf().tinc(rs_clean_latency
, dur
);
5907 template <typename T
>
5908 set
<pg_shard_t
> unique_osd_shard_set(const pg_shard_t
& skip
, const T
&in
)
5910 set
<int> osds_found
;
5911 set
<pg_shard_t
> out
;
5912 for (auto i
= in
.begin(); i
!= in
.end(); ++i
) {
5913 if (*i
!= skip
&& !osds_found
.count(i
->osd
)) {
5914 osds_found
.insert(i
->osd
);
5921 /*---------Active---------*/
5922 PeeringState::Active::Active(my_context ctx
)
5924 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active"),
5925 remote_shards_to_reserve_recovery(
5926 unique_osd_shard_set(
5927 context
< PeeringMachine
>().state
->pg_whoami
,
5928 context
< PeeringMachine
>().state
->acting_recovery_backfill
)),
5929 remote_shards_to_reserve_backfill(
5930 unique_osd_shard_set(
5931 context
< PeeringMachine
>().state
->pg_whoami
,
5932 context
< PeeringMachine
>().state
->backfill_targets
)),
5933 all_replicas_activated(false)
5935 context
< PeeringMachine
>().log_enter(state_name
);
5940 ceph_assert(!ps
->backfill_reserved
);
5941 ceph_assert(ps
->is_primary());
5942 psdout(10) << "In Active, about to call activate" << dendl
;
5943 ps
->start_flush(context
< PeeringMachine
>().get_cur_transaction());
5944 ps
->activate(context
< PeeringMachine
>().get_cur_transaction(),
5945 ps
->get_osdmap_epoch(),
5946 context
< PeeringMachine
>().get_recovery_ctx());
5948 // everyone has to commit/ack before we are truly active
5949 ps
->blocked_by
.clear();
5950 for (auto p
= ps
->acting_recovery_backfill
.begin();
5951 p
!= ps
->acting_recovery_backfill
.end();
5953 if (p
->shard
!= ps
->pg_whoami
.shard
) {
5954 ps
->blocked_by
.insert(p
->shard
);
5957 pl
->publish_stats_to_osd();
5958 psdout(10) << "Activate Finished" << dendl
;
5961 boost::statechart::result
PeeringState::Active::react(const AdvMap
& advmap
)
5965 if (ps
->should_restart_peering(
5967 advmap
.acting_primary
,
5972 psdout(10) << "Active advmap interval change, fast return" << dendl
;
5973 return forward_event();
5975 psdout(10) << "Active advmap" << dendl
;
5976 bool need_publish
= false;
5978 pl
->on_active_advmap(advmap
.osdmap
);
5979 if (ps
->dirty_big_info
) {
5980 // share updated purged_snaps to mgr/mon so that we (a) stop reporting
5981 // purged snaps and (b) perhaps share more snaps that we have purged
5982 // but didn't fit in pg_stat_t.
5983 need_publish
= true;
5984 ps
->share_pg_info();
5987 bool need_acting_change
= false;
5988 for (size_t i
= 0; i
< ps
->want_acting
.size(); i
++) {
5989 int osd
= ps
->want_acting
[i
];
5990 if (!advmap
.osdmap
->is_up(osd
)) {
5991 pg_shard_t
osd_with_shard(osd
, shard_id_t(i
));
5992 if (!ps
->is_acting(osd_with_shard
) && !ps
->is_up(osd_with_shard
)) {
5993 psdout(10) << "Active stray osd." << osd
<< " in want_acting is down"
5995 need_acting_change
= true;
5999 if (need_acting_change
) {
6000 psdout(10) << "Active need acting change, call choose_acting again"
6002 // possibly because we re-add some strays into the acting set and
6003 // some of them then go down in a subsequent map before we could see
6004 // the map changing the pg temp.
6005 // call choose_acting again to clear them out.
6006 // note that we leave restrict_to_up_acting to false in order to
6007 // not overkill any chosen stray that is still alive.
6008 pg_shard_t auth_log_shard
;
6009 bool history_les_bound
= false;
6010 ps
->remove_down_peer_info(advmap
.osdmap
);
6011 ps
->choose_acting(auth_log_shard
, false, &history_les_bound
, true);
6014 /* Check for changes in pool size (if the acting set changed as a result,
6015 * this does not matter) */
6016 if (advmap
.lastmap
->get_pg_size(ps
->info
.pgid
.pgid
) !=
6017 ps
->get_osdmap()->get_pg_size(ps
->info
.pgid
.pgid
)) {
6018 if (ps
->get_osdmap()->get_pg_size(ps
->info
.pgid
.pgid
) <=
6019 ps
->actingset
.size()) {
6020 ps
->state_clear(PG_STATE_UNDERSIZED
);
6022 ps
->state_set(PG_STATE_UNDERSIZED
);
6024 // degraded changes will be detected by call from publish_stats_to_osd()
6025 need_publish
= true;
6028 // if we haven't reported our PG stats in a long time, do so now.
6029 if (ps
->info
.stats
.reported_epoch
+ ps
->cct
->_conf
->osd_pg_stat_report_interval_max
< advmap
.osdmap
->get_epoch()) {
6030 psdout(20) << "reporting stats to osd after " << (advmap
.osdmap
->get_epoch() - ps
->info
.stats
.reported_epoch
)
6031 << " epochs" << dendl
;
6032 need_publish
= true;
6036 pl
->publish_stats_to_osd();
6038 if (ps
->check_prior_readable_down_osds(advmap
.osdmap
)) {
6039 pl
->recheck_readable();
6042 return forward_event();
6045 boost::statechart::result
PeeringState::Active::react(const ActMap
&)
6048 psdout(10) << "Active: handling ActMap" << dendl
;
6049 ceph_assert(ps
->is_primary());
6051 pl
->on_active_actmap();
6053 if (ps
->have_unfound()) {
6054 // object may have become unfound
6055 ps
->discover_all_missing(context
<PeeringMachine
>().get_recovery_ctx().msgs
);
6058 uint64_t unfound
= ps
->missing_loc
.num_unfound();
6060 ps
->all_unfound_are_queried_or_lost(ps
->get_osdmap())) {
6061 if (ps
->cct
->_conf
->osd_auto_mark_unfound_lost
) {
6062 pl
->get_clog_error() << context
< PeeringMachine
>().spgid
.pgid
<< " has " << unfound
6063 << " objects unfound and apparently lost, would automatically "
6064 << "mark these objects lost but this feature is not yet implemented "
6065 << "(osd_auto_mark_unfound_lost)";
6067 pl
->get_clog_error() << context
< PeeringMachine
>().spgid
.pgid
<< " has "
6068 << unfound
<< " objects unfound and apparently lost";
6071 return forward_event();
6074 boost::statechart::result
PeeringState::Active::react(const MNotifyRec
& notevt
)
6078 ceph_assert(ps
->is_primary());
6079 if (ps
->peer_info
.count(notevt
.from
)) {
6080 psdout(10) << "Active: got notify from " << notevt
.from
6081 << ", already have info from that osd, ignoring"
6083 } else if (ps
->peer_purged
.count(notevt
.from
)) {
6084 psdout(10) << "Active: got notify from " << notevt
.from
6085 << ", already purged that peer, ignoring"
6088 psdout(10) << "Active: got notify from " << notevt
.from
6089 << ", calling proc_replica_info and discover_all_missing"
6091 ps
->proc_replica_info(
6092 notevt
.from
, notevt
.notify
.info
, notevt
.notify
.epoch_sent
);
6093 if (ps
->have_unfound() || (ps
->is_degraded() && ps
->might_have_unfound
.count(notevt
.from
))) {
6094 ps
->discover_all_missing(
6095 context
<PeeringMachine
>().get_recovery_ctx().msgs
);
6097 // check if it is a previous down acting member that's coming back.
6098 // if so, request pg_temp change to trigger a new interval transition
6099 pg_shard_t auth_log_shard
;
6100 bool history_les_bound
= false;
6101 // FIXME: Uh-oh we have to check this return value; choose_acting can fail!
6102 ps
->choose_acting(auth_log_shard
, false, &history_les_bound
, true);
6103 if (!ps
->want_acting
.empty() && ps
->want_acting
!= ps
->acting
) {
6104 psdout(10) << "Active: got notify from previous acting member "
6105 << notevt
.from
<< ", requesting pg_temp change"
6109 return discard_event();
6112 boost::statechart::result
PeeringState::Active::react(const MTrim
& trim
)
6115 ceph_assert(ps
->is_primary());
6117 // peer is informing us of their last_complete_ondisk
6118 ldout(ps
->cct
,10) << " replica osd." << trim
.from
<< " lcod " << trim
.trim_to
<< dendl
;
6119 ps
->update_peer_last_complete_ondisk(pg_shard_t
{trim
.from
, trim
.shard
},
6121 // trim log when the pg is recovered
6122 ps
->calc_min_last_complete_ondisk();
6123 return discard_event();
6126 boost::statechart::result
PeeringState::Active::react(const MInfoRec
& infoevt
)
6129 ceph_assert(ps
->is_primary());
6131 ceph_assert(!ps
->acting_recovery_backfill
.empty());
6132 if (infoevt
.lease_ack
) {
6133 ps
->proc_lease_ack(infoevt
.from
.osd
, *infoevt
.lease_ack
);
6135 // don't update history (yet) if we are active and primary; the replica
6136 // may be telling us they have activated (and committed) but we can't
6137 // share that until _everyone_ does the same.
6138 if (ps
->is_acting_recovery_backfill(infoevt
.from
) &&
6139 ps
->peer_activated
.count(infoevt
.from
) == 0) {
6140 psdout(10) << " peer osd." << infoevt
.from
6141 << " activated and committed" << dendl
;
6142 ps
->peer_activated
.insert(infoevt
.from
);
6143 ps
->blocked_by
.erase(infoevt
.from
.shard
);
6144 pl
->publish_stats_to_osd();
6145 if (ps
->peer_activated
.size() == ps
->acting_recovery_backfill
.size()) {
6146 all_activated_and_committed();
6149 return discard_event();
6152 boost::statechart::result
PeeringState::Active::react(const MLogRec
& logevt
)
6155 psdout(10) << "searching osd." << logevt
.from
6156 << " log for unfound items" << dendl
;
6157 ps
->proc_replica_log(
6158 logevt
.msg
->info
, logevt
.msg
->log
, std::move(logevt
.msg
->missing
), logevt
.from
);
6159 bool got_missing
= ps
->search_for_missing(
6160 ps
->peer_info
[logevt
.from
],
6161 ps
->peer_missing
[logevt
.from
],
6163 context
< PeeringMachine
>().get_recovery_ctx());
6164 // If there are missing AND we are "fully" active then start recovery now
6165 if (got_missing
&& ps
->state_test(PG_STATE_ACTIVE
)) {
6166 post_event(DoRecovery());
6168 return discard_event();
6171 boost::statechart::result
PeeringState::Active::react(const QueryState
& q
)
6175 q
.f
->open_object_section("state");
6176 q
.f
->dump_string("name", state_name
);
6177 q
.f
->dump_stream("enter_time") << enter_time
;
6180 q
.f
->open_array_section("might_have_unfound");
6181 for (auto p
= ps
->might_have_unfound
.begin();
6182 p
!= ps
->might_have_unfound
.end();
6184 q
.f
->open_object_section("osd");
6185 q
.f
->dump_stream("osd") << *p
;
6186 if (ps
->peer_missing
.count(*p
)) {
6187 q
.f
->dump_string("status", "already probed");
6188 } else if (ps
->peer_missing_requested
.count(*p
)) {
6189 q
.f
->dump_string("status", "querying");
6190 } else if (!ps
->get_osdmap()->is_up(p
->osd
)) {
6191 q
.f
->dump_string("status", "osd is down");
6193 q
.f
->dump_string("status", "not queried");
6195 q
.f
->close_section();
6197 q
.f
->close_section();
6200 q
.f
->open_object_section("recovery_progress");
6201 q
.f
->open_array_section("backfill_targets");
6202 for (auto p
= ps
->backfill_targets
.begin();
6203 p
!= ps
->backfill_targets
.end(); ++p
)
6204 q
.f
->dump_stream("replica") << *p
;
6205 q
.f
->close_section();
6206 pl
->dump_recovery_info(q
.f
);
6207 q
.f
->close_section();
6210 q
.f
->close_section();
6211 return forward_event();
6214 boost::statechart::result
PeeringState::Active::react(const QueryUnfound
& q
)
6218 ps
->query_unfound(q
.f
, "Active");
6219 return discard_event();
6222 boost::statechart::result
PeeringState::Active::react(
6223 const ActivateCommitted
&evt
)
6226 ceph_assert(!ps
->peer_activated
.count(ps
->pg_whoami
));
6227 ps
->peer_activated
.insert(ps
->pg_whoami
);
6228 psdout(10) << "_activate_committed " << evt
.epoch
6229 << " peer_activated now " << ps
->peer_activated
6230 << " last_interval_started "
6231 << ps
->info
.history
.last_interval_started
6232 << " last_epoch_started "
6233 << ps
->info
.history
.last_epoch_started
6234 << " same_interval_since "
6235 << ps
->info
.history
.same_interval_since
6237 ceph_assert(!ps
->acting_recovery_backfill
.empty());
6238 if (ps
->peer_activated
.size() == ps
->acting_recovery_backfill
.size())
6239 all_activated_and_committed();
6240 return discard_event();
6243 boost::statechart::result
PeeringState::Active::react(const AllReplicasActivated
&evt
)
6247 pg_t pgid
= context
< PeeringMachine
>().spgid
.pgid
;
6249 all_replicas_activated
= true;
6251 ps
->state_clear(PG_STATE_ACTIVATING
);
6252 ps
->state_clear(PG_STATE_CREATING
);
6253 ps
->state_clear(PG_STATE_PREMERGE
);
6256 if (ps
->pool
.info
.is_pending_merge(pgid
, &merge_target
)) {
6257 ps
->state_set(PG_STATE_PEERED
);
6258 ps
->state_set(PG_STATE_PREMERGE
);
6260 if (ps
->actingset
.size() != ps
->get_osdmap()->get_pg_size(pgid
)) {
6263 src
.set_ps(ps
->pool
.info
.get_pg_num_pending());
6264 assert(src
.get_parent() == pgid
);
6265 pl
->set_not_ready_to_merge_target(pgid
, src
);
6267 pl
->set_not_ready_to_merge_source(pgid
);
6270 } else if (!ps
->acting_set_writeable()) {
6271 ps
->state_set(PG_STATE_PEERED
);
6273 ps
->state_set(PG_STATE_ACTIVE
);
6276 auto mnow
= pl
->get_mnow();
6277 if (ps
->prior_readable_until_ub
> mnow
) {
6278 psdout(10) << " waiting for prior_readable_until_ub "
6279 << ps
->prior_readable_until_ub
<< " > mnow " << mnow
<< dendl
;
6280 ps
->state_set(PG_STATE_WAIT
);
6281 pl
->queue_check_readable(
6282 ps
->last_peering_reset
,
6283 ps
->prior_readable_until_ub
- mnow
);
6285 psdout(10) << " mnow " << mnow
<< " >= prior_readable_until_ub "
6286 << ps
->prior_readable_until_ub
<< dendl
;
6289 if (ps
->pool
.info
.has_flag(pg_pool_t::FLAG_CREATING
)) {
6290 pl
->send_pg_created(pgid
);
6293 ps
->info
.history
.last_epoch_started
= ps
->info
.last_epoch_started
;
6294 ps
->info
.history
.last_interval_started
= ps
->info
.last_interval_started
;
6295 ps
->dirty_info
= true;
6297 ps
->share_pg_info();
6298 pl
->publish_stats_to_osd();
6300 pl
->on_activate_complete();
6302 return discard_event();
6305 boost::statechart::result
PeeringState::Active::react(const RenewLease
& rl
)
6308 ps
->proc_renew_lease();
6309 return discard_event();
6312 boost::statechart::result
PeeringState::Active::react(const MLeaseAck
& la
)
6315 ps
->proc_lease_ack(la
.from
, la
.lease_ack
);
6316 return discard_event();
6320 boost::statechart::result
PeeringState::Active::react(const CheckReadable
&evt
)
6323 pl
->recheck_readable();
6324 return discard_event();
6328 * update info.history.last_epoch_started ONLY after we and all
6329 * replicas have activated AND committed the activate transaction
6330 * (i.e. the peering results are stable on disk).
6332 void PeeringState::Active::all_activated_and_committed()
6335 psdout(10) << "all_activated_and_committed" << dendl
;
6336 ceph_assert(ps
->is_primary());
6337 ceph_assert(ps
->peer_activated
.size() == ps
->acting_recovery_backfill
.size());
6338 ceph_assert(!ps
->acting_recovery_backfill
.empty());
6339 ceph_assert(ps
->blocked_by
.empty());
6341 if (HAVE_FEATURE(ps
->upacting_features
, SERVER_OCTOPUS
)) {
6342 // this is overkill when the activation is quick, but when it is slow it
6343 // is important, because the lease was renewed by the activate itself but we
6344 // don't know how long ago that was, and simply scheduling now may leave
6345 // a gap in lease coverage. keep it simple and aggressively renew.
6346 ps
->renew_lease(pl
->get_mnow());
6348 ps
->schedule_renew_lease();
6352 ps
->update_calc_stats();
6353 if (ps
->info
.stats
.stats
.sum
.num_objects_degraded
) {
6354 ps
->state_set(PG_STATE_DEGRADED
);
6356 ps
->state_clear(PG_STATE_DEGRADED
);
6359 post_event(PeeringState::AllReplicasActivated());
6363 void PeeringState::Active::exit()
6365 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
6369 pl
->cancel_local_background_io_reservation();
6371 ps
->blocked_by
.clear();
6372 ps
->backfill_reserved
= false;
6373 ps
->state_clear(PG_STATE_ACTIVATING
);
6374 ps
->state_clear(PG_STATE_DEGRADED
);
6375 ps
->state_clear(PG_STATE_UNDERSIZED
);
6376 ps
->state_clear(PG_STATE_BACKFILL_TOOFULL
);
6377 ps
->state_clear(PG_STATE_BACKFILL_WAIT
);
6378 ps
->state_clear(PG_STATE_RECOVERY_WAIT
);
6379 ps
->state_clear(PG_STATE_RECOVERY_TOOFULL
);
6380 utime_t dur
= ceph_clock_now() - enter_time
;
6381 pl
->get_peering_perf().tinc(rs_active_latency
, dur
);
6382 pl
->on_active_exit();
6385 /*------ReplicaActive-----*/
6386 PeeringState::ReplicaActive::ReplicaActive(my_context ctx
)
6388 NamedState(context
< PeeringMachine
>().state_history
, "Started/ReplicaActive")
6390 context
< PeeringMachine
>().log_enter(state_name
);
6393 ps
->start_flush(context
< PeeringMachine
>().get_cur_transaction());
6397 boost::statechart::result
PeeringState::ReplicaActive::react(
6398 const Activate
& actevt
) {
6400 psdout(10) << "In ReplicaActive, about to call activate" << dendl
;
6402 context
< PeeringMachine
>().get_cur_transaction(),
6403 actevt
.activation_epoch
,
6404 context
< PeeringMachine
>().get_recovery_ctx());
6405 psdout(10) << "Activate Finished" << dendl
;
6406 return discard_event();
6409 boost::statechart::result
PeeringState::ReplicaActive::react(
6410 const ActivateCommitted
&evt
)
6413 psdout(10) << __func__
<< " " << evt
.epoch
<< " telling primary" << dendl
;
6415 auto &rctx
= context
<PeeringMachine
>().get_recovery_ctx();
6416 auto epoch
= ps
->get_osdmap_epoch();
6417 pg_info_t i
= ps
->info
;
6418 i
.history
.last_epoch_started
= evt
.activation_epoch
;
6419 i
.history
.last_interval_started
= i
.history
.same_interval_since
;
6421 ps
->get_primary().osd
,
6422 spg_t(ps
->info
.pgid
.pgid
, ps
->get_primary().shard
),
6427 ps
->get_lease_ack());
6429 if (ps
->acting_set_writeable()) {
6430 ps
->state_set(PG_STATE_ACTIVE
);
6432 ps
->state_set(PG_STATE_PEERED
);
6434 pl
->on_activate_committed();
6436 return discard_event();
6439 boost::statechart::result
PeeringState::ReplicaActive::react(const MLease
& l
)
6442 spg_t spgid
= context
< PeeringMachine
>().spgid
;
6443 epoch_t epoch
= pl
->get_osdmap_epoch();
6445 ps
->proc_lease(l
.lease
);
6446 pl
->send_cluster_message(
6447 ps
->get_primary().osd
,
6448 make_message
<MOSDPGLeaseAck
>(epoch
,
6449 spg_t(spgid
.pgid
, ps
->get_primary().shard
),
6450 ps
->get_lease_ack()),
6452 return discard_event();
6455 boost::statechart::result
PeeringState::ReplicaActive::react(const MInfoRec
& infoevt
)
6458 ps
->proc_primary_info(context
<PeeringMachine
>().get_cur_transaction(),
6460 return discard_event();
6463 boost::statechart::result
PeeringState::ReplicaActive::react(const MLogRec
& logevt
)
6466 psdout(10) << "received log from " << logevt
.from
<< dendl
;
6467 ObjectStore::Transaction
&t
= context
<PeeringMachine
>().get_cur_transaction();
6468 ps
->merge_log(t
, logevt
.msg
->info
, std::move(logevt
.msg
->log
), logevt
.from
);
6469 ceph_assert(ps
->pg_log
.get_head() == ps
->info
.last_update
);
6470 if (logevt
.msg
->lease
) {
6471 ps
->proc_lease(*logevt
.msg
->lease
);
6474 return discard_event();
6477 boost::statechart::result
PeeringState::ReplicaActive::react(const MTrim
& trim
)
6480 // primary is instructing us to trim
6481 ps
->pg_log
.trim(trim
.trim_to
, ps
->info
);
6482 ps
->dirty_info
= true;
6483 return discard_event();
6486 boost::statechart::result
PeeringState::ReplicaActive::react(const ActMap
&)
6489 if (ps
->should_send_notify() && ps
->get_primary().osd
>= 0) {
6490 ps
->info
.history
.refresh_prior_readable_until_ub(
6491 pl
->get_mnow(), ps
->prior_readable_until_ub
);
6492 context
< PeeringMachine
>().send_notify(
6493 ps
->get_primary().osd
,
6495 ps
->get_primary().shard
, ps
->pg_whoami
.shard
,
6496 ps
->get_osdmap_epoch(),
6497 ps
->get_osdmap_epoch(),
6499 ps
->past_intervals
));
6501 return discard_event();
6504 boost::statechart::result
PeeringState::ReplicaActive::react(
6505 const MQuery
& query
)
6508 ps
->fulfill_query(query
, context
<PeeringMachine
>().get_recovery_ctx());
6509 return discard_event();
6512 boost::statechart::result
PeeringState::ReplicaActive::react(const QueryState
& q
)
6514 q
.f
->open_object_section("state");
6515 q
.f
->dump_string("name", state_name
);
6516 q
.f
->dump_stream("enter_time") << enter_time
;
6517 q
.f
->close_section();
6518 return forward_event();
6521 boost::statechart::result
PeeringState::ReplicaActive::react(const QueryUnfound
& q
)
6523 q
.f
->dump_string("state", "ReplicaActive");
6524 q
.f
->dump_bool("available_might_have_unfound", false);
6525 return discard_event();
6528 void PeeringState::ReplicaActive::exit()
6530 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
6532 pl
->unreserve_recovery_space();
6534 pl
->cancel_remote_recovery_reservation();
6535 utime_t dur
= ceph_clock_now() - enter_time
;
6536 pl
->get_peering_perf().tinc(rs_replicaactive_latency
, dur
);
6538 ps
->min_last_complete_ondisk
= eversion_t();
6542 PeeringState::Stray::Stray(my_context ctx
)
6544 NamedState(context
< PeeringMachine
>().state_history
, "Started/Stray")
6546 context
< PeeringMachine
>().log_enter(state_name
);
6550 ceph_assert(!ps
->is_peered());
6551 ceph_assert(!ps
->is_peering());
6552 ceph_assert(!ps
->is_primary());
6554 if (!ps
->get_osdmap()->have_pg_pool(ps
->info
.pgid
.pgid
.pool())) {
6555 ldout(ps
->cct
,10) << __func__
<< " pool is deleted" << dendl
;
6556 post_event(DeleteStart());
6558 ps
->start_flush(context
< PeeringMachine
>().get_cur_transaction());
6562 boost::statechart::result
PeeringState::Stray::react(const MLogRec
& logevt
)
6565 MOSDPGLog
*msg
= logevt
.msg
.get();
6566 psdout(10) << "got info+log from osd." << logevt
.from
<< " " << msg
->info
<< " " << msg
->log
<< dendl
;
6568 ObjectStore::Transaction
&t
= context
<PeeringMachine
>().get_cur_transaction();
6569 if (msg
->info
.last_backfill
== hobject_t()) {
6571 ps
->info
= msg
->info
;
6572 pl
->on_info_history_change();
6573 ps
->dirty_info
= true;
6574 ps
->dirty_big_info
= true; // maybe.
6576 PGLog::LogEntryHandlerRef rollbacker
{pl
->get_log_handler(t
)};
6577 ps
->pg_log
.reset_backfill_claim_log(msg
->log
, rollbacker
.get());
6579 ps
->pg_log
.reset_backfill();
6581 ps
->merge_log(t
, msg
->info
, std::move(msg
->log
), logevt
.from
);
6583 if (logevt
.msg
->lease
) {
6584 ps
->proc_lease(*logevt
.msg
->lease
);
6587 ceph_assert(ps
->pg_log
.get_head() == ps
->info
.last_update
);
6589 post_event(Activate(logevt
.msg
->info
.last_epoch_started
));
6590 return transit
<ReplicaActive
>();
6593 boost::statechart::result
PeeringState::Stray::react(const MInfoRec
& infoevt
)
6596 psdout(10) << "got info from osd." << infoevt
.from
<< " " << infoevt
.info
<< dendl
;
6598 if (ps
->info
.last_update
> infoevt
.info
.last_update
) {
6599 // rewind divergent log entries
6600 ObjectStore::Transaction
&t
= context
<PeeringMachine
>().get_cur_transaction();
6601 ps
->rewind_divergent_log(t
, infoevt
.info
.last_update
);
6602 ps
->info
.stats
= infoevt
.info
.stats
;
6603 ps
->info
.hit_set
= infoevt
.info
.hit_set
;
6606 if (infoevt
.lease
) {
6607 ps
->proc_lease(*infoevt
.lease
);
6610 ceph_assert(infoevt
.info
.last_update
== ps
->info
.last_update
);
6611 ceph_assert(ps
->pg_log
.get_head() == ps
->info
.last_update
);
6613 post_event(Activate(infoevt
.info
.last_epoch_started
));
6614 return transit
<ReplicaActive
>();
6617 boost::statechart::result
PeeringState::Stray::react(const MQuery
& query
)
6620 ps
->fulfill_query(query
, context
<PeeringMachine
>().get_recovery_ctx());
6621 return discard_event();
6624 boost::statechart::result
PeeringState::Stray::react(const ActMap
&)
6627 if (ps
->should_send_notify() && ps
->get_primary().osd
>= 0) {
6628 ps
->info
.history
.refresh_prior_readable_until_ub(
6629 pl
->get_mnow(), ps
->prior_readable_until_ub
);
6630 context
< PeeringMachine
>().send_notify(
6631 ps
->get_primary().osd
,
6633 ps
->get_primary().shard
, ps
->pg_whoami
.shard
,
6634 ps
->get_osdmap_epoch(),
6635 ps
->get_osdmap_epoch(),
6637 ps
->past_intervals
));
6639 return discard_event();
6642 void PeeringState::Stray::exit()
6644 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
6646 utime_t dur
= ceph_clock_now() - enter_time
;
6647 pl
->get_peering_perf().tinc(rs_stray_latency
, dur
);
6651 /*--------ToDelete----------*/
6652 PeeringState::ToDelete::ToDelete(my_context ctx
)
6654 NamedState(context
< PeeringMachine
>().state_history
, "Started/ToDelete")
6656 context
< PeeringMachine
>().log_enter(state_name
);
6658 pl
->get_perf_logger().inc(l_osd_pg_removing
);
6661 void PeeringState::ToDelete::exit()
6663 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
6665 // note: on a successful removal, this path doesn't execute. see
6667 pl
->get_perf_logger().dec(l_osd_pg_removing
);
6669 pl
->cancel_local_background_io_reservation();
6672 /*----WaitDeleteReserved----*/
6673 PeeringState::WaitDeleteReserved::WaitDeleteReserved(my_context ctx
)
6675 NamedState(context
< PeeringMachine
>().state_history
,
6676 "Started/ToDelete/WaitDeleteReseved")
6678 context
< PeeringMachine
>().log_enter(state_name
);
6680 context
< ToDelete
>().priority
= ps
->get_delete_priority();
6682 pl
->cancel_local_background_io_reservation();
6683 pl
->request_local_background_io_reservation(
6684 context
<ToDelete
>().priority
,
6685 std::make_unique
<PGPeeringEvent
>(
6686 ps
->get_osdmap_epoch(),
6687 ps
->get_osdmap_epoch(),
6689 std::make_unique
<PGPeeringEvent
>(
6690 ps
->get_osdmap_epoch(),
6691 ps
->get_osdmap_epoch(),
6692 DeleteInterrupted()));
6695 boost::statechart::result
PeeringState::ToDelete::react(
6699 if (ps
->get_delete_priority() != priority
) {
6700 psdout(10) << __func__
<< " delete priority changed, resetting"
6702 return transit
<ToDelete
>();
6704 return discard_event();
6707 void PeeringState::WaitDeleteReserved::exit()
6709 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
6712 /*----Deleting-----*/
6713 PeeringState::Deleting::Deleting(my_context ctx
)
6715 NamedState(context
< PeeringMachine
>().state_history
, "Started/ToDelete/Deleting")
6717 context
< PeeringMachine
>().log_enter(state_name
);
6720 ps
->deleting
= true;
6721 ObjectStore::Transaction
&t
= context
<PeeringMachine
>().get_cur_transaction();
6724 PGLog::LogEntryHandlerRef rollbacker
{pl
->get_log_handler(t
)};
6725 ps
->pg_log
.roll_forward(rollbacker
.get());
6727 // adjust info to backfill
6728 ps
->info
.set_last_backfill(hobject_t());
6729 ps
->pg_log
.reset_backfill();
6730 ps
->dirty_info
= true;
6735 boost::statechart::result
PeeringState::Deleting::react(
6736 const DeleteSome
& evt
)
6739 std::pair
<ghobject_t
, bool> p
;
6740 p
= pl
->do_delete_work(context
<PeeringMachine
>().get_cur_transaction(),
6743 return p
.second
? discard_event() : terminate();
6746 void PeeringState::Deleting::exit()
6748 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
6750 ps
->deleting
= false;
6751 pl
->cancel_local_background_io_reservation();
6754 /*--------GetInfo---------*/
6755 PeeringState::GetInfo::GetInfo(my_context ctx
)
6757 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Peering/GetInfo")
6759 context
< PeeringMachine
>().log_enter(state_name
);
6763 ps
->check_past_interval_bounds();
6764 ps
->log_weirdness();
6765 PastIntervals::PriorSet
&prior_set
= context
< Peering
>().prior_set
;
6767 ceph_assert(ps
->blocked_by
.empty());
6769 prior_set
= ps
->build_prior();
6770 ps
->prior_readable_down_osds
= prior_set
.down
;
6771 if (ps
->prior_readable_down_osds
.empty()) {
6772 psdout(10) << " no prior_set down osds, clearing prior_readable_until_ub"
6774 ps
->clear_prior_readable_until_ub();
6777 ps
->reset_min_peer_features();
6779 if (prior_set
.pg_down
) {
6780 post_event(IsDown());
6781 } else if (peer_info_requested
.empty()) {
6782 post_event(GotInfo());
6786 void PeeringState::GetInfo::get_infos()
6789 PastIntervals::PriorSet
&prior_set
= context
< Peering
>().prior_set
;
6791 ps
->blocked_by
.clear();
6792 for (auto it
= prior_set
.probe
.begin(); it
!= prior_set
.probe
.end(); ++it
) {
6793 pg_shard_t peer
= *it
;
6794 if (peer
== ps
->pg_whoami
) {
6797 if (ps
->peer_info
.count(peer
)) {
6798 psdout(10) << " have osd." << peer
<< " info " << ps
->peer_info
[peer
] << dendl
;
6801 if (peer_info_requested
.count(peer
)) {
6802 psdout(10) << " already requested info from osd." << peer
<< dendl
;
6803 ps
->blocked_by
.insert(peer
.osd
);
6804 } else if (!ps
->get_osdmap()->is_up(peer
.osd
)) {
6805 psdout(10) << " not querying info from down osd." << peer
<< dendl
;
6807 psdout(10) << " querying info from osd." << peer
<< dendl
;
6808 context
< PeeringMachine
>().send_query(
6810 pg_query_t(pg_query_t::INFO
,
6811 it
->shard
, ps
->pg_whoami
.shard
,
6813 ps
->get_osdmap_epoch()));
6814 peer_info_requested
.insert(peer
);
6815 ps
->blocked_by
.insert(peer
.osd
);
6819 ps
->check_prior_readable_down_osds(ps
->get_osdmap());
6821 pl
->publish_stats_to_osd();
6824 boost::statechart::result
PeeringState::GetInfo::react(const MNotifyRec
& infoevt
)
6829 auto p
= peer_info_requested
.find(infoevt
.from
);
6830 if (p
!= peer_info_requested
.end()) {
6831 peer_info_requested
.erase(p
);
6832 ps
->blocked_by
.erase(infoevt
.from
.osd
);
6835 epoch_t old_start
= ps
->info
.history
.last_epoch_started
;
6836 if (ps
->proc_replica_info(
6837 infoevt
.from
, infoevt
.notify
.info
, infoevt
.notify
.epoch_sent
)) {
6838 // we got something new ...
6839 PastIntervals::PriorSet
&prior_set
= context
< Peering
>().prior_set
;
6840 if (old_start
< ps
->info
.history
.last_epoch_started
) {
6841 psdout(10) << " last_epoch_started moved forward, rebuilding prior" << dendl
;
6842 prior_set
= ps
->build_prior();
6843 ps
->prior_readable_down_osds
= prior_set
.down
;
6845 // filter out any osds that got dropped from the probe set from
6846 // peer_info_requested. this is less expensive than restarting
6847 // peering (which would re-probe everyone).
6848 auto p
= peer_info_requested
.begin();
6849 while (p
!= peer_info_requested
.end()) {
6850 if (prior_set
.probe
.count(*p
) == 0) {
6851 psdout(20) << " dropping osd." << *p
<< " from info_requested, no longer in probe set" << dendl
;
6852 peer_info_requested
.erase(p
++);
6859 psdout(20) << "Adding osd: " << infoevt
.from
.osd
<< " peer features: "
6860 << hex
<< infoevt
.features
<< dec
<< dendl
;
6861 ps
->apply_peer_features(infoevt
.features
);
6863 // are we done getting everything?
6864 if (peer_info_requested
.empty() && !prior_set
.pg_down
) {
6865 psdout(20) << "Common peer features: " << hex
<< ps
->get_min_peer_features() << dec
<< dendl
;
6866 psdout(20) << "Common acting features: " << hex
<< ps
->get_min_acting_features() << dec
<< dendl
;
6867 psdout(20) << "Common upacting features: " << hex
<< ps
->get_min_upacting_features() << dec
<< dendl
;
6868 post_event(GotInfo());
6871 return discard_event();
6874 boost::statechart::result
PeeringState::GetInfo::react(const QueryState
& q
)
6877 q
.f
->open_object_section("state");
6878 q
.f
->dump_string("name", state_name
);
6879 q
.f
->dump_stream("enter_time") << enter_time
;
6881 q
.f
->open_array_section("requested_info_from");
6882 for (auto p
= peer_info_requested
.begin();
6883 p
!= peer_info_requested
.end();
6885 q
.f
->open_object_section("osd");
6886 q
.f
->dump_stream("osd") << *p
;
6887 if (ps
->peer_info
.count(*p
)) {
6888 q
.f
->open_object_section("got_info");
6889 ps
->peer_info
[*p
].dump(q
.f
);
6890 q
.f
->close_section();
6892 q
.f
->close_section();
6894 q
.f
->close_section();
6896 q
.f
->close_section();
6897 return forward_event();
6900 boost::statechart::result
PeeringState::GetInfo::react(const QueryUnfound
& q
)
6902 q
.f
->dump_string("state", "GetInfo");
6903 q
.f
->dump_bool("available_might_have_unfound", false);
6904 return discard_event();
6907 void PeeringState::GetInfo::exit()
6909 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
6912 utime_t dur
= ceph_clock_now() - enter_time
;
6913 pl
->get_peering_perf().tinc(rs_getinfo_latency
, dur
);
6914 ps
->blocked_by
.clear();
6917 /*------GetLog------------*/
6918 PeeringState::GetLog::GetLog(my_context ctx
)
6921 context
< PeeringMachine
>().state_history
,
6922 "Started/Primary/Peering/GetLog"),
6925 context
< PeeringMachine
>().log_enter(state_name
);
6929 ps
->log_weirdness();
6932 if (!ps
->choose_acting(auth_log_shard
, false,
6933 &context
< Peering
>().history_les_bound
)) {
6934 if (!ps
->want_acting
.empty()) {
6935 post_event(NeedActingChange());
6937 post_event(IsIncomplete());
6943 if (auth_log_shard
== ps
->pg_whoami
) {
6944 post_event(GotLog());
6948 const pg_info_t
& best
= ps
->peer_info
[auth_log_shard
];
6951 if (ps
->info
.last_update
< best
.log_tail
) {
6952 psdout(10) << " not contiguous with osd." << auth_log_shard
<< ", down" << dendl
;
6953 post_event(IsIncomplete());
6957 // how much log to request?
6958 eversion_t request_log_from
= ps
->info
.last_update
;
6959 ceph_assert(!ps
->acting_recovery_backfill
.empty());
6960 for (auto p
= ps
->acting_recovery_backfill
.begin();
6961 p
!= ps
->acting_recovery_backfill
.end();
6963 if (*p
== ps
->pg_whoami
) continue;
6964 pg_info_t
& ri
= ps
->peer_info
[*p
];
6965 if (ri
.last_update
< ps
->info
.log_tail
&& ri
.last_update
>= best
.log_tail
&&
6966 ri
.last_update
< request_log_from
)
6967 request_log_from
= ri
.last_update
;
6971 psdout(10) << " requesting log from osd." << auth_log_shard
<< dendl
;
6972 context
<PeeringMachine
>().send_query(
6976 auth_log_shard
.shard
, ps
->pg_whoami
.shard
,
6977 request_log_from
, ps
->info
.history
,
6978 ps
->get_osdmap_epoch()));
6980 ceph_assert(ps
->blocked_by
.empty());
6981 ps
->blocked_by
.insert(auth_log_shard
.osd
);
6982 pl
->publish_stats_to_osd();
6985 boost::statechart::result
PeeringState::GetLog::react(const AdvMap
& advmap
)
6987 // make sure our log source didn't go down. we need to check
6988 // explicitly because it may not be part of the prior set, which
6989 // means the Peering state check won't catch it going down.
6990 if (!advmap
.osdmap
->is_up(auth_log_shard
.osd
)) {
6991 psdout(10) << "GetLog: auth_log_shard osd."
6992 << auth_log_shard
.osd
<< " went down" << dendl
;
6994 return transit
< Reset
>();
6997 // let the Peering state do its checks.
6998 return forward_event();
7001 boost::statechart::result
PeeringState::GetLog::react(const MLogRec
& logevt
)
7004 if (logevt
.from
!= auth_log_shard
) {
7005 psdout(10) << "GetLog: discarding log from "
7006 << "non-auth_log_shard osd." << logevt
.from
<< dendl
;
7007 return discard_event();
7009 psdout(10) << "GetLog: received master log from osd."
7010 << logevt
.from
<< dendl
;
7012 post_event(GotLog());
7013 return discard_event();
7016 boost::statechart::result
PeeringState::GetLog::react(const GotLog
&)
7020 psdout(10) << "leaving GetLog" << dendl
;
7022 psdout(10) << "processing master log" << dendl
;
7023 ps
->proc_master_log(context
<PeeringMachine
>().get_cur_transaction(),
7024 msg
->info
, std::move(msg
->log
), std::move(msg
->missing
),
7027 ps
->start_flush(context
< PeeringMachine
>().get_cur_transaction());
7028 return transit
< GetMissing
>();
7031 boost::statechart::result
PeeringState::GetLog::react(const QueryState
& q
)
7033 q
.f
->open_object_section("state");
7034 q
.f
->dump_string("name", state_name
);
7035 q
.f
->dump_stream("enter_time") << enter_time
;
7036 q
.f
->dump_stream("auth_log_shard") << auth_log_shard
;
7037 q
.f
->close_section();
7038 return forward_event();
7041 boost::statechart::result
PeeringState::GetLog::react(const QueryUnfound
& q
)
7043 q
.f
->dump_string("state", "GetLog");
7044 q
.f
->dump_bool("available_might_have_unfound", false);
7045 return discard_event();
7048 void PeeringState::GetLog::exit()
7050 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
7053 utime_t dur
= ceph_clock_now() - enter_time
;
7054 pl
->get_peering_perf().tinc(rs_getlog_latency
, dur
);
7055 ps
->blocked_by
.clear();
7058 /*------WaitActingChange--------*/
7059 PeeringState::WaitActingChange::WaitActingChange(my_context ctx
)
7061 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/WaitActingChange")
7063 context
< PeeringMachine
>().log_enter(state_name
);
7066 boost::statechart::result
PeeringState::WaitActingChange::react(const AdvMap
& advmap
)
7069 OSDMapRef osdmap
= advmap
.osdmap
;
7071 psdout(10) << "verifying no want_acting " << ps
->want_acting
<< " targets didn't go down" << dendl
;
7072 for (auto p
= ps
->want_acting
.begin(); p
!= ps
->want_acting
.end(); ++p
) {
7073 if (!osdmap
->is_up(*p
)) {
7074 psdout(10) << " want_acting target osd." << *p
<< " went down, resetting" << dendl
;
7076 return transit
< Reset
>();
7079 return forward_event();
7082 boost::statechart::result
PeeringState::WaitActingChange::react(const MLogRec
& logevt
)
7084 psdout(10) << "In WaitActingChange, ignoring MLocRec" << dendl
;
7085 return discard_event();
7088 boost::statechart::result
PeeringState::WaitActingChange::react(const MInfoRec
& evt
)
7090 psdout(10) << "In WaitActingChange, ignoring MInfoRec" << dendl
;
7091 return discard_event();
7094 boost::statechart::result
PeeringState::WaitActingChange::react(const MNotifyRec
& evt
)
7096 psdout(10) << "In WaitActingChange, ignoring MNotifyRec" << dendl
;
7097 return discard_event();
7100 boost::statechart::result
PeeringState::WaitActingChange::react(const QueryState
& q
)
7102 q
.f
->open_object_section("state");
7103 q
.f
->dump_string("name", state_name
);
7104 q
.f
->dump_stream("enter_time") << enter_time
;
7105 q
.f
->dump_string("comment", "waiting for pg acting set to change");
7106 q
.f
->close_section();
7107 return forward_event();
7110 boost::statechart::result
PeeringState::WaitActingChange::react(const QueryUnfound
& q
)
7112 q
.f
->dump_string("state", "WaitActingChange");
7113 q
.f
->dump_bool("available_might_have_unfound", false);
7114 return discard_event();
7117 void PeeringState::WaitActingChange::exit()
7119 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
7121 utime_t dur
= ceph_clock_now() - enter_time
;
7122 pl
->get_peering_perf().tinc(rs_waitactingchange_latency
, dur
);
7125 /*------Down--------*/
7126 PeeringState::Down::Down(my_context ctx
)
7128 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Peering/Down")
7130 context
< PeeringMachine
>().log_enter(state_name
);
7133 ps
->state_clear(PG_STATE_PEERING
);
7134 ps
->state_set(PG_STATE_DOWN
);
7136 auto &prior_set
= context
< Peering
>().prior_set
;
7137 ceph_assert(ps
->blocked_by
.empty());
7138 ps
->blocked_by
.insert(prior_set
.down
.begin(), prior_set
.down
.end());
7139 pl
->publish_stats_to_osd();
7142 void PeeringState::Down::exit()
7144 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
7148 ps
->state_clear(PG_STATE_DOWN
);
7149 utime_t dur
= ceph_clock_now() - enter_time
;
7150 pl
->get_peering_perf().tinc(rs_down_latency
, dur
);
7152 ps
->blocked_by
.clear();
7155 boost::statechart::result
PeeringState::Down::react(const QueryState
& q
)
7157 q
.f
->open_object_section("state");
7158 q
.f
->dump_string("name", state_name
);
7159 q
.f
->dump_stream("enter_time") << enter_time
;
7160 q
.f
->dump_string("comment",
7161 "not enough up instances of this PG to go active");
7162 q
.f
->close_section();
7163 return forward_event();
7166 boost::statechart::result
PeeringState::Down::react(const QueryUnfound
& q
)
7168 q
.f
->dump_string("state", "Down");
7169 q
.f
->dump_bool("available_might_have_unfound", false);
7170 return discard_event();
7173 boost::statechart::result
PeeringState::Down::react(const MNotifyRec
& infoevt
)
7177 ceph_assert(ps
->is_primary());
7178 epoch_t old_start
= ps
->info
.history
.last_epoch_started
;
7179 if (!ps
->peer_info
.count(infoevt
.from
) &&
7180 ps
->get_osdmap()->has_been_up_since(infoevt
.from
.osd
, infoevt
.notify
.epoch_sent
)) {
7181 ps
->update_history(infoevt
.notify
.info
.history
);
7183 // if we got something new to make pg escape down state
7184 if (ps
->info
.history
.last_epoch_started
> old_start
) {
7185 psdout(10) << " last_epoch_started moved forward, re-enter getinfo" << dendl
;
7186 ps
->state_clear(PG_STATE_DOWN
);
7187 ps
->state_set(PG_STATE_PEERING
);
7188 return transit
< GetInfo
>();
7191 return discard_event();
7195 /*------Incomplete--------*/
7196 PeeringState::Incomplete::Incomplete(my_context ctx
)
7198 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Peering/Incomplete")
7200 context
< PeeringMachine
>().log_enter(state_name
);
7203 ps
->state_clear(PG_STATE_PEERING
);
7204 ps
->state_set(PG_STATE_INCOMPLETE
);
7206 PastIntervals::PriorSet
&prior_set
= context
< Peering
>().prior_set
;
7207 ceph_assert(ps
->blocked_by
.empty());
7208 ps
->blocked_by
.insert(prior_set
.down
.begin(), prior_set
.down
.end());
7209 pl
->publish_stats_to_osd();
7212 boost::statechart::result
PeeringState::Incomplete::react(const AdvMap
&advmap
) {
7214 int64_t poolnum
= ps
->info
.pgid
.pool();
7216 // Reset if min_size turn smaller than previous value, pg might now be able to go active
7217 if (!advmap
.osdmap
->have_pg_pool(poolnum
) ||
7218 advmap
.lastmap
->get_pools().find(poolnum
)->second
.min_size
>
7219 advmap
.osdmap
->get_pools().find(poolnum
)->second
.min_size
) {
7221 return transit
< Reset
>();
7224 return forward_event();
7227 boost::statechart::result
PeeringState::Incomplete::react(const MNotifyRec
& notevt
) {
7229 psdout(7) << "handle_pg_notify from osd." << notevt
.from
<< dendl
;
7230 if (ps
->proc_replica_info(
7231 notevt
.from
, notevt
.notify
.info
, notevt
.notify
.epoch_sent
)) {
7232 // We got something new, try again!
7233 return transit
< GetLog
>();
7235 return discard_event();
7239 boost::statechart::result
PeeringState::Incomplete::react(
7240 const QueryState
& q
)
7242 q
.f
->open_object_section("state");
7243 q
.f
->dump_string("name", state_name
);
7244 q
.f
->dump_stream("enter_time") << enter_time
;
7245 q
.f
->dump_string("comment", "not enough complete instances of this PG");
7246 q
.f
->close_section();
7247 return forward_event();
7250 boost::statechart::result
PeeringState::Incomplete::react(const QueryUnfound
& q
)
7252 q
.f
->dump_string("state", "Incomplete");
7253 q
.f
->dump_bool("available_might_have_unfound", false);
7254 return discard_event();
7257 void PeeringState::Incomplete::exit()
7259 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
7263 ps
->state_clear(PG_STATE_INCOMPLETE
);
7264 utime_t dur
= ceph_clock_now() - enter_time
;
7265 pl
->get_peering_perf().tinc(rs_incomplete_latency
, dur
);
7267 ps
->blocked_by
.clear();
7270 /*------GetMissing--------*/
7271 PeeringState::GetMissing::GetMissing(my_context ctx
)
7273 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Peering/GetMissing")
7275 context
< PeeringMachine
>().log_enter(state_name
);
7278 ps
->log_weirdness();
7279 ceph_assert(!ps
->acting_recovery_backfill
.empty());
7281 for (auto i
= ps
->acting_recovery_backfill
.begin();
7282 i
!= ps
->acting_recovery_backfill
.end();
7284 if (*i
== ps
->get_primary()) continue;
7285 const pg_info_t
& pi
= ps
->peer_info
[*i
];
7286 // reset this so to make sure the pg_missing_t is initialized and
7287 // has the correct semantics even if we don't need to get a
7288 // missing set from a shard. This way later additions due to
7289 // lost+unfound delete work properly.
7290 ps
->peer_missing
[*i
].may_include_deletes
= !ps
->perform_deletes_during_peering();
7293 continue; // no pg data, nothing divergent
7295 if (pi
.last_update
< ps
->pg_log
.get_tail()) {
7296 psdout(10) << " osd." << *i
<< " is not contiguous, will restart backfill" << dendl
;
7297 ps
->peer_missing
[*i
].clear();
7300 if (pi
.last_backfill
== hobject_t()) {
7301 psdout(10) << " osd." << *i
<< " will fully backfill; can infer empty missing set" << dendl
;
7302 ps
->peer_missing
[*i
].clear();
7306 if (pi
.last_update
== pi
.last_complete
&& // peer has no missing
7307 pi
.last_update
== ps
->info
.last_update
) { // peer is up to date
7308 // replica has no missing and identical log as us. no need to
7310 // FIXME: we can do better here. if last_update==last_complete we
7311 // can infer the rest!
7312 psdout(10) << " osd." << *i
<< " has no missing, identical log" << dendl
;
7313 ps
->peer_missing
[*i
].clear();
7317 // We pull the log from the peer's last_epoch_started to ensure we
7318 // get enough log to detect divergent updates.
7319 since
.epoch
= pi
.last_epoch_started
;
7320 ceph_assert(pi
.last_update
>= ps
->info
.log_tail
); // or else choose_acting() did a bad thing
7321 if (pi
.log_tail
<= since
) {
7322 psdout(10) << " requesting log+missing since " << since
<< " from osd." << *i
<< dendl
;
7323 context
< PeeringMachine
>().send_query(
7327 i
->shard
, ps
->pg_whoami
.shard
,
7328 since
, ps
->info
.history
,
7329 ps
->get_osdmap_epoch()));
7331 psdout(10) << " requesting fulllog+missing from osd." << *i
7332 << " (want since " << since
<< " < log.tail "
7333 << pi
.log_tail
<< ")" << dendl
;
7334 context
< PeeringMachine
>().send_query(
7336 pg_query_t::FULLLOG
,
7337 i
->shard
, ps
->pg_whoami
.shard
,
7338 ps
->info
.history
, ps
->get_osdmap_epoch()));
7340 peer_missing_requested
.insert(*i
);
7341 ps
->blocked_by
.insert(i
->osd
);
7344 if (peer_missing_requested
.empty()) {
7345 if (ps
->need_up_thru
) {
7346 psdout(10) << " still need up_thru update before going active"
7348 post_event(NeedUpThru());
7353 post_event(Activate(ps
->get_osdmap_epoch()));
7355 pl
->publish_stats_to_osd();
7359 boost::statechart::result
PeeringState::GetMissing::react(const MLogRec
& logevt
)
7363 peer_missing_requested
.erase(logevt
.from
);
7364 ps
->proc_replica_log(logevt
.msg
->info
,
7366 std::move(logevt
.msg
->missing
),
7369 if (peer_missing_requested
.empty()) {
7370 if (ps
->need_up_thru
) {
7371 psdout(10) << " still need up_thru update before going active"
7373 post_event(NeedUpThru());
7375 psdout(10) << "Got last missing, don't need missing "
7376 << "posting Activate" << dendl
;
7377 post_event(Activate(ps
->get_osdmap_epoch()));
7380 return discard_event();
7383 boost::statechart::result
PeeringState::GetMissing::react(const QueryState
& q
)
7386 q
.f
->open_object_section("state");
7387 q
.f
->dump_string("name", state_name
);
7388 q
.f
->dump_stream("enter_time") << enter_time
;
7390 q
.f
->open_array_section("peer_missing_requested");
7391 for (auto p
= peer_missing_requested
.begin();
7392 p
!= peer_missing_requested
.end();
7394 q
.f
->open_object_section("osd");
7395 q
.f
->dump_stream("osd") << *p
;
7396 if (ps
->peer_missing
.count(*p
)) {
7397 q
.f
->open_object_section("got_missing");
7398 ps
->peer_missing
[*p
].dump(q
.f
);
7399 q
.f
->close_section();
7401 q
.f
->close_section();
7403 q
.f
->close_section();
7405 q
.f
->close_section();
7406 return forward_event();
7409 boost::statechart::result
PeeringState::GetMissing::react(const QueryUnfound
& q
)
7411 q
.f
->dump_string("state", "GetMising");
7412 q
.f
->dump_bool("available_might_have_unfound", false);
7413 return discard_event();
7416 void PeeringState::GetMissing::exit()
7418 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
7421 utime_t dur
= ceph_clock_now() - enter_time
;
7422 pl
->get_peering_perf().tinc(rs_getmissing_latency
, dur
);
7423 ps
->blocked_by
.clear();
7426 /*------WaitUpThru--------*/
7427 PeeringState::WaitUpThru::WaitUpThru(my_context ctx
)
7429 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Peering/WaitUpThru")
7431 context
< PeeringMachine
>().log_enter(state_name
);
7434 boost::statechart::result
PeeringState::WaitUpThru::react(const ActMap
& am
)
7437 if (!ps
->need_up_thru
) {
7438 post_event(Activate(ps
->get_osdmap_epoch()));
7440 return forward_event();
7443 boost::statechart::result
PeeringState::WaitUpThru::react(const MLogRec
& logevt
)
7446 psdout(10) << "Noting missing from osd." << logevt
.from
<< dendl
;
7447 ps
->peer_missing
[logevt
.from
].claim(std::move(logevt
.msg
->missing
));
7448 ps
->peer_info
[logevt
.from
] = logevt
.msg
->info
;
7449 return discard_event();
7452 boost::statechart::result
PeeringState::WaitUpThru::react(const QueryState
& q
)
7454 q
.f
->open_object_section("state");
7455 q
.f
->dump_string("name", state_name
);
7456 q
.f
->dump_stream("enter_time") << enter_time
;
7457 q
.f
->dump_string("comment", "waiting for osdmap to reflect a new up_thru for this osd");
7458 q
.f
->close_section();
7459 return forward_event();
7462 boost::statechart::result
PeeringState::WaitUpThru::react(const QueryUnfound
& q
)
7464 q
.f
->dump_string("state", "WaitUpThru");
7465 q
.f
->dump_bool("available_might_have_unfound", false);
7466 return discard_event();
7469 void PeeringState::WaitUpThru::exit()
7471 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
7473 utime_t dur
= ceph_clock_now() - enter_time
;
7474 pl
->get_peering_perf().tinc(rs_waitupthru_latency
, dur
);
7477 /*----PeeringState::PeeringMachine Methods-----*/
7479 #define dout_prefix dpp->gen_prefix(*_dout)
7481 void PeeringState::PeeringMachine::log_enter(const char *state_name
)
7484 psdout(5) << "enter " << state_name
<< dendl
;
7485 pl
->log_state_enter(state_name
);
7488 void PeeringState::PeeringMachine::log_exit(const char *state_name
, utime_t enter_time
)
7491 utime_t dur
= ceph_clock_now() - enter_time
;
7492 psdout(5) << "exit " << state_name
<< " " << dur
<< " " << event_count
<< " " << event_time
<< dendl
;
7493 pl
->log_state_exit(state_name
, enter_time
, event_count
, event_time
);
7495 event_time
= utime_t();
// Pretty-printer used for OSD debug logging: renders a PeeringState as the
// compact "pg[...]" summary (info, up/acting sets, role, last peering
// reset, past intervals, log-bound diagnostics, completion watermarks and
// state name).
// NOTE(review): the extracted text below is missing several original source
// lines (gaps in the fused numbering: 7503, 7511-7514, 7518-7519,
// 7525-7526, 7530, 7536-7539, 7541, 7544, 7546, 7549-7550, and the trailing
// "return out;") -- the visible code is left byte-identical; confirm
// against the full source file.
7498 ostream
&operator<<(ostream
&out
, const PeeringState
&ps
) {
// pg id/info and the raw "up" set
7499 out
<< "pg[" << ps
.info
7500 << " " << pg_vector_string(ps
.up
);
// acting set is printed only when it differs from up
7501 if (ps
.acting
!= ps
.up
)
7502 out
<< "/" << pg_vector_string(ps
.acting
);
// primary shard, then async-recovery / backfill targets when non-empty
7504 out
<< "p" << ps
.get_primary();
7505 if (!ps
.async_recovery_targets
.empty())
7506 out
<< " async=[" << ps
.async_recovery_targets
<< "]";
7507 if (!ps
.backfill_targets
.empty())
7508 out
<< " backfill=[" << ps
.backfill_targets
<< "]";
// this OSD's role and the epoch of the last peering reset
7509 out
<< " r=" << ps
.get_role();
7510 out
<< " lpr=" << ps
.get_last_peering_reset();
// past intervals: bounds and count, shown only when history exists
7515 if (!ps
.past_intervals
.empty()) {
7516 out
<< " pi=[" << ps
.past_intervals
.get_bounds()
7517 << ")/" << ps
.past_intervals
.size();
// once peered, flag divergence of on-disk/applied from info.last_update
7520 if (ps
.is_peered()) {
7521 if (ps
.last_update_ondisk
!= ps
.info
.last_update
)
7522 out
<< " luod=" << ps
.last_update_ondisk
;
7523 if (ps
.last_update_applied
!= ps
.info
.last_update
)
7524 out
<< " lua=" << ps
.last_update_applied
;
// sanity diagnostic: pg_log head/tail should mirror info's bounds
7527 if (ps
.pg_log
.get_tail() != ps
.info
.log_tail
||
7528 ps
.pg_log
.get_head() != ps
.info
.last_update
)
7529 out
<< " (info mismatch, " << ps
.pg_log
.get_log() << ")";
// a non-empty log must start strictly after the recorded tail
7531 if (!ps
.pg_log
.get_log().empty()) {
7532 if ((ps
.pg_log
.get_log().log
.begin()->version
<= ps
.pg_log
.get_tail())) {
7533 out
<< " (log bound mismatch, actual=["
7534 << ps
.pg_log
.get_log().log
.begin()->version
<< ","
7535 << ps
.pg_log
.get_log().log
.rbegin()->version
<< "]";
// rollback horizon and completion watermarks
7540 out
<< " crt=" << ps
.pg_log
.get_can_rollback_to();
7542 if (ps
.last_complete_ondisk
!= ps
.info
.last_complete
)
7543 out
<< " lcod " << ps
.last_complete_ondisk
;
7545 out
<< " mlcod " << ps
.min_last_complete_ondisk
;
// human-readable state string, notify flag, prior-readable upper bound
7547 out
<< " " << pg_state_string(ps
.get_state());
7548 if (ps
.should_send_notify())
7551 if (ps
.prior_readable_until_ub
!= ceph::signedspan::zero()) {
7552 out
<< " pruub " << ps
.prior_readable_until_ub
7553 << "@" << ps
.get_prior_readable_down_osds();
7558 std::vector
<pg_shard_t
> PeeringState::get_replica_recovery_order() const
7560 std::vector
<std::pair
<unsigned int, pg_shard_t
>> replicas_by_num_missing
,
7561 async_by_num_missing
;
7562 replicas_by_num_missing
.reserve(get_acting_recovery_backfill().size() - 1);
7563 for (auto &p
: get_acting_recovery_backfill()) {
7564 if (p
== get_primary()) {
7567 auto pm
= get_peer_missing().find(p
);
7568 assert(pm
!= get_peer_missing().end());
7569 auto nm
= pm
->second
.num_missing();
7571 if (is_async_recovery_target(p
)) {
7572 async_by_num_missing
.push_back(make_pair(nm
, p
));
7574 replicas_by_num_missing
.push_back(make_pair(nm
, p
));
7578 // sort by number of missing objects, in ascending order.
7579 auto func
= [](const std::pair
<unsigned int, pg_shard_t
> &lhs
,
7580 const std::pair
<unsigned int, pg_shard_t
> &rhs
) {
7581 return lhs
.first
< rhs
.first
;
7583 // acting goes first
7584 std::sort(replicas_by_num_missing
.begin(), replicas_by_num_missing
.end(), func
);
7585 // then async_recovery_targets
7586 std::sort(async_by_num_missing
.begin(), async_by_num_missing
.end(), func
);
7587 replicas_by_num_missing
.insert(replicas_by_num_missing
.end(),
7588 async_by_num_missing
.begin(), async_by_num_missing
.end());
7590 std::vector
<pg_shard_t
> ret
;
7591 ret
.reserve(replicas_by_num_missing
.size());
7592 for (auto p
: replicas_by_num_missing
) {
7593 ret
.push_back(p
.second
);