1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "PGPeeringEvent.h"
5 #include "common/ceph_releases.h"
6 #include "common/dout.h"
7 #include "PeeringState.h"
9 #include "messages/MOSDPGRemove.h"
10 #include "messages/MBackfillReserve.h"
11 #include "messages/MRecoveryReserve.h"
12 #include "messages/MOSDScrubReserve.h"
13 #include "messages/MOSDPGInfo.h"
14 #include "messages/MOSDPGInfo2.h"
15 #include "messages/MOSDPGTrim.h"
16 #include "messages/MOSDPGLog.h"
17 #include "messages/MOSDPGNotify.h"
18 #include "messages/MOSDPGNotify2.h"
19 #include "messages/MOSDPGQuery.h"
20 #include "messages/MOSDPGQuery2.h"
21 #include "messages/MOSDPGLease.h"
22 #include "messages/MOSDPGLeaseAck.h"
24 #define dout_context cct
25 #define dout_subsys ceph_subsys_osd
34 using std::stringstream
;
37 using ceph::Formatter
;
38 using ceph::make_message
;
40 BufferedRecoveryMessages::BufferedRecoveryMessages(
43 : require_osd_release(r
) {
44 // steal messages from ctx
45 message_map
.swap(ctx
.message_map
);
48 void BufferedRecoveryMessages::send_notify(int to
, const pg_notify_t
&n
)
50 if (require_osd_release
>= ceph_release_t::octopus
) {
51 spg_t
pgid(n
.info
.pgid
.pgid
, n
.to
);
52 send_osd_message(to
, make_message
<MOSDPGNotify2
>(pgid
, n
));
54 send_osd_message(to
, make_message
<MOSDPGNotify
>(n
.epoch_sent
, vector
{n
}));
58 void BufferedRecoveryMessages::send_query(
63 if (require_osd_release
>= ceph_release_t::octopus
) {
65 make_message
<MOSDPGQuery2
>(to_spgid
, q
));
67 auto m
= make_message
<MOSDPGQuery
>(
69 MOSDPGQuery::pg_list_t
{{to_spgid
, q
}});
70 send_osd_message(to
, m
);
74 void BufferedRecoveryMessages::send_info(
79 const pg_info_t
&info
,
80 std::optional
<pg_lease_t
> lease
,
81 std::optional
<pg_lease_ack_t
> lease_ack
)
83 if (require_osd_release
>= ceph_release_t::octopus
) {
86 make_message
<MOSDPGInfo2
>(
97 make_message
<MOSDPGInfo
>(
99 vector
{pg_notify_t
{to_spgid
.shard
,
101 min_epoch
, cur_epoch
,
102 info
, PastIntervals
{}}})
107 void PGPool::update(OSDMapRef map
)
109 const pg_pool_t
*pi
= map
->get_pg_pool(id
);
111 return; // pool has been deleted
114 name
= map
->get_pool_name(id
);
116 bool updated
= false;
117 if ((map
->get_epoch() != cached_epoch
+ 1) ||
118 (pi
->get_snap_epoch() == map
->get_epoch())) {
122 if (info
.is_pool_snaps_mode() && updated
) {
123 snapc
= pi
->get_snap_context();
125 cached_epoch
= map
->get_epoch();
128 /*-------------Peering State Helpers----------------*/
130 #define dout_prefix (dpp->gen_prefix(*_dout))
132 #define psdout(x) ldout(cct, x)
134 PeeringState::PeeringState(
136 pg_shard_t pg_whoami
,
140 DoutPrefixProvider
*dpp
,
142 : state_history(*pl
),
150 pg_whoami(pg_whoami
),
153 missing_loc(spgid
, this, dpp
, cct
),
154 machine(this, cct
, spgid
, dpp
, pl
, &state_history
)
159 void PeeringState::start_handle(PeeringCtx
*new_ctx
) {
161 ceph_assert(!orig_ctx
);
164 if (messages_pending_flush
) {
165 rctx
.emplace(*messages_pending_flush
, *new_ctx
);
167 rctx
.emplace(*new_ctx
);
169 rctx
->start_time
= ceph_clock_now();
173 void PeeringState::begin_block_outgoing() {
174 ceph_assert(!messages_pending_flush
);
175 ceph_assert(orig_ctx
);
177 messages_pending_flush
= BufferedRecoveryMessages(
178 orig_ctx
->require_osd_release
);
179 rctx
.emplace(*messages_pending_flush
, *orig_ctx
);
182 void PeeringState::clear_blocked_outgoing() {
183 ceph_assert(orig_ctx
);
185 messages_pending_flush
= std::optional
<BufferedRecoveryMessages
>();
188 void PeeringState::end_block_outgoing() {
189 ceph_assert(messages_pending_flush
);
190 ceph_assert(orig_ctx
);
193 orig_ctx
->accept_buffered_messages(*messages_pending_flush
);
194 rctx
.emplace(*orig_ctx
);
195 messages_pending_flush
= std::optional
<BufferedRecoveryMessages
>();
198 void PeeringState::end_handle() {
200 utime_t dur
= ceph_clock_now() - rctx
->start_time
;
201 machine
.event_time
+= dur
;
204 machine
.event_count
++;
209 void PeeringState::check_recovery_sources(const OSDMapRef
& osdmap
)
212 * check that any peers we are planning to (or currently) pulling
213 * objects from are dealt with.
215 missing_loc
.check_recovery_sources(osdmap
);
216 pl
->check_recovery_sources(osdmap
);
218 for (auto i
= peer_log_requested
.begin(); i
!= peer_log_requested
.end();) {
219 if (!osdmap
->is_up(i
->osd
)) {
220 psdout(10) << "peer_log_requested removing " << *i
<< dendl
;
221 peer_log_requested
.erase(i
++);
227 for (auto i
= peer_missing_requested
.begin();
228 i
!= peer_missing_requested
.end();) {
229 if (!osdmap
->is_up(i
->osd
)) {
230 psdout(10) << "peer_missing_requested removing " << *i
<< dendl
;
231 peer_missing_requested
.erase(i
++);
238 void PeeringState::update_history(const pg_history_t
& new_history
)
240 auto mnow
= pl
->get_mnow();
241 info
.history
.refresh_prior_readable_until_ub(mnow
, prior_readable_until_ub
);
242 if (info
.history
.merge(new_history
)) {
243 psdout(20) << __func__
<< " advanced history from " << new_history
<< dendl
;
245 if (info
.history
.last_epoch_clean
>= info
.history
.same_interval_since
) {
246 psdout(20) << __func__
<< " clearing past_intervals" << dendl
;
247 past_intervals
.clear();
248 dirty_big_info
= true;
250 prior_readable_until_ub
= info
.history
.get_prior_readable_until_ub(mnow
);
251 if (prior_readable_until_ub
!= ceph::signedspan::zero()) {
253 << " prior_readable_until_ub " << prior_readable_until_ub
254 << " (mnow " << mnow
<< " + "
255 << info
.history
.prior_readable_until_ub
<< ")" << dendl
;
258 pl
->on_info_history_change();
261 hobject_t
PeeringState::earliest_backfill() const
263 hobject_t e
= hobject_t::get_max();
264 for (const pg_shard_t
& bt
: get_backfill_targets()) {
265 const pg_info_t
&pi
= get_peer_info(bt
);
266 e
= std::min(pi
.last_backfill
, e
);
271 void PeeringState::purge_strays()
274 psdout(10) << "purge_strays " << stray_set
<< " but premerge, doing nothing"
278 if (cct
->_conf
.get_val
<bool>("osd_debug_no_purge_strays")) {
281 psdout(10) << "purge_strays " << stray_set
<< dendl
;
283 bool removed
= false;
284 for (auto p
= stray_set
.begin(); p
!= stray_set
.end(); ++p
) {
285 ceph_assert(!is_acting_recovery_backfill(*p
));
286 if (get_osdmap()->is_up(p
->osd
)) {
287 psdout(10) << "sending PGRemove to osd." << *p
<< dendl
;
288 vector
<spg_t
> to_remove
;
289 to_remove
.push_back(spg_t(info
.pgid
.pgid
, p
->shard
));
290 auto m
= make_message
<MOSDPGRemove
>(
293 pl
->send_cluster_message(p
->osd
, m
, get_osdmap_epoch());
295 psdout(10) << "not sending PGRemove to down osd." << *p
<< dendl
;
297 peer_missing
.erase(*p
);
299 missing_loc
.remove_stray_recovery_sources(*p
);
300 peer_purged
.insert(*p
);
304 // if we removed anyone, update peers (which include peer_info)
306 update_heartbeat_peers();
310 // clear _requested maps; we may have to peer() again if we discover
311 // (more) stray content
312 peer_log_requested
.clear();
313 peer_missing_requested
.clear();
316 void PeeringState::query_unfound(Formatter
*f
, string state
)
318 psdout(20) << "Enter PeeringState common QueryUnfound" << dendl
;
320 f
->dump_string("state", state
);
321 f
->dump_bool("available_might_have_unfound", true);
322 f
->open_array_section("might_have_unfound");
323 for (auto p
= might_have_unfound
.begin();
324 p
!= might_have_unfound
.end();
326 if (peer_missing
.count(*p
)) {
327 ; // Ignore already probed OSDs
329 f
->open_object_section("osd");
330 f
->dump_stream("osd") << *p
;
331 if (peer_missing_requested
.count(*p
)) {
332 f
->dump_string("status", "querying");
333 } else if (!get_osdmap()->is_up(p
->osd
)) {
334 f
->dump_string("status", "osd is down");
336 f
->dump_string("status", "not queried");
343 psdout(20) << "Exit PeeringState common QueryUnfound" << dendl
;
347 bool PeeringState::proc_replica_info(
348 pg_shard_t from
, const pg_info_t
&oinfo
, epoch_t send_epoch
)
350 auto p
= peer_info
.find(from
);
351 if (p
!= peer_info
.end() && p
->second
.last_update
== oinfo
.last_update
) {
352 psdout(10) << " got dup osd." << from
<< " info "
353 << oinfo
<< ", identical to ours" << dendl
;
357 if (!get_osdmap()->has_been_up_since(from
.osd
, send_epoch
)) {
358 psdout(10) << " got info " << oinfo
<< " from down osd." << from
359 << " discarding" << dendl
;
363 psdout(10) << " got osd." << from
<< " " << oinfo
<< dendl
;
364 ceph_assert(is_primary());
365 peer_info
[from
] = oinfo
;
366 might_have_unfound
.insert(from
);
368 update_history(oinfo
.history
);
371 if (!is_up(from
) && !is_acting(from
)) {
372 psdout(10) << " osd." << from
<< " has stray content: " << oinfo
<< dendl
;
373 stray_set
.insert(from
);
379 // was this a new info? if so, update peers!
380 if (p
== peer_info
.end())
381 update_heartbeat_peers();
387 void PeeringState::remove_down_peer_info(const OSDMapRef
&osdmap
)
389 // Remove any downed osds from peer_info
390 bool removed
= false;
391 auto p
= peer_info
.begin();
392 while (p
!= peer_info
.end()) {
393 if (!osdmap
->is_up(p
->first
.osd
)) {
394 psdout(10) << " dropping down osd." << p
->first
<< " info " << p
->second
<< dendl
;
395 peer_missing
.erase(p
->first
);
396 peer_log_requested
.erase(p
->first
);
397 peer_missing_requested
.erase(p
->first
);
398 peer_purged
.erase(p
->first
);
399 peer_info
.erase(p
++);
405 // if we removed anyone, update peers (which include peer_info)
407 update_heartbeat_peers();
409 check_recovery_sources(osdmap
);
412 void PeeringState::update_heartbeat_peers()
418 for (unsigned i
=0; i
<acting
.size(); i
++) {
419 if (acting
[i
] != CRUSH_ITEM_NONE
)
420 new_peers
.insert(acting
[i
]);
422 for (unsigned i
=0; i
<up
.size(); i
++) {
423 if (up
[i
] != CRUSH_ITEM_NONE
)
424 new_peers
.insert(up
[i
]);
426 for (auto p
= peer_info
.begin(); p
!= peer_info
.end(); ++p
) {
427 new_peers
.insert(p
->first
.osd
);
429 pl
->update_heartbeat_peers(std::move(new_peers
));
432 void PeeringState::write_if_dirty(ObjectStore::Transaction
& t
)
441 last_persisted_osdmap
< get_osdmap_epoch(),
443 if (dirty_info
|| dirty_big_info
) {
444 last_persisted_osdmap
= get_osdmap_epoch();
445 last_written_info
= info
;
447 dirty_big_info
= false;
451 void PeeringState::advance_map(
452 OSDMapRef osdmap
, OSDMapRef lastmap
,
453 vector
<int>& newup
, int up_primary
,
454 vector
<int>& newacting
, int acting_primary
,
457 ceph_assert(lastmap
== osdmap_ref
);
458 psdout(10) << "handle_advance_map "
459 << newup
<< "/" << newacting
460 << " -- " << up_primary
<< "/" << acting_primary
463 update_osdmap_ref(osdmap
);
467 osdmap
, lastmap
, newup
, up_primary
,
468 newacting
, acting_primary
);
469 handle_event(evt
, &rctx
);
470 if (pool
.info
.last_change
== osdmap_ref
->get_epoch()) {
471 pl
->on_pool_change();
473 readable_interval
= pool
.get_readable_interval(cct
->_conf
);
474 last_require_osd_release
= osdmap
->require_osd_release
;
477 void PeeringState::activate_map(PeeringCtx
&rctx
)
479 psdout(10) << __func__
<< dendl
;
481 handle_event(evt
, &rctx
);
482 if (osdmap_ref
->get_epoch() - last_persisted_osdmap
>
483 cct
->_conf
->osd_pg_epoch_persisted_max_stale
) {
484 psdout(20) << __func__
<< ": Dirtying info: last_persisted is "
485 << last_persisted_osdmap
486 << " while current is " << osdmap_ref
->get_epoch() << dendl
;
489 psdout(20) << __func__
<< ": Not dirtying info: last_persisted is "
490 << last_persisted_osdmap
491 << " while current is " << osdmap_ref
->get_epoch() << dendl
;
493 write_if_dirty(rctx
.transaction
);
495 if (get_osdmap()->check_new_blocklist_entries()) {
496 pl
->check_blocklisted_watchers();
500 void PeeringState::set_last_peering_reset()
502 psdout(20) << "set_last_peering_reset " << get_osdmap_epoch() << dendl
;
503 if (last_peering_reset
!= get_osdmap_epoch()) {
504 last_peering_reset
= get_osdmap_epoch();
505 psdout(10) << "Clearing blocked outgoing recovery messages" << dendl
;
506 clear_blocked_outgoing();
507 if (!pl
->try_flush_or_schedule_async()) {
508 psdout(10) << "Beginning to block outgoing recovery messages" << dendl
;
509 begin_block_outgoing();
511 psdout(10) << "Not blocking outgoing recovery messages" << dendl
;
516 void PeeringState::complete_flush()
518 flushes_in_progress
--;
519 if (flushes_in_progress
== 0) {
524 void PeeringState::check_full_transition(OSDMapRef lastmap
, OSDMapRef osdmap
)
526 const pg_pool_t
*pi
= osdmap
->get_pg_pool(info
.pgid
.pool());
528 return; // pool deleted
530 bool changed
= false;
531 if (pi
->has_flag(pg_pool_t::FLAG_FULL
)) {
532 const pg_pool_t
*opi
= lastmap
->get_pg_pool(info
.pgid
.pool());
533 if (!opi
|| !opi
->has_flag(pg_pool_t::FLAG_FULL
)) {
534 psdout(10) << " pool was marked full in " << osdmap
->get_epoch() << dendl
;
539 info
.history
.last_epoch_marked_full
= osdmap
->get_epoch();
544 bool PeeringState::should_restart_peering(
546 int newactingprimary
,
547 const vector
<int>& newup
,
548 const vector
<int>& newacting
,
552 if (PastIntervals::is_new_interval(
564 psdout(20) << "new interval newup " << newup
565 << " newacting " << newacting
<< dendl
;
568 if (!lastmap
->is_up(pg_whoami
.osd
) && osdmap
->is_up(pg_whoami
.osd
)) {
569 psdout(10) << __func__
<< " osd transitioned from down -> up"
576 /* Called before initializing peering during advance_map */
577 void PeeringState::start_peering_interval(
578 const OSDMapRef lastmap
,
579 const vector
<int>& newup
, int new_up_primary
,
580 const vector
<int>& newacting
, int new_acting_primary
,
581 ObjectStore::Transaction
&t
)
583 const OSDMapRef osdmap
= get_osdmap();
585 set_last_peering_reset();
587 vector
<int> oldacting
, oldup
;
588 int oldrole
= get_role();
591 pl
->clear_ready_to_merge();
595 pg_shard_t old_acting_primary
= get_primary();
596 pg_shard_t old_up_primary
= up_primary
;
597 bool was_old_primary
= is_primary();
598 bool was_old_nonprimary
= is_nonprimary();
600 acting
.swap(oldacting
);
602 init_primary_up_acting(
608 if (info
.stats
.up
!= up
||
609 info
.stats
.acting
!= acting
||
610 info
.stats
.up_primary
!= new_up_primary
||
611 info
.stats
.acting_primary
!= new_acting_primary
) {
613 info
.stats
.up_primary
= new_up_primary
;
614 info
.stats
.acting
= acting
;
615 info
.stats
.acting_primary
= new_acting_primary
;
616 info
.stats
.mapping_epoch
= osdmap
->get_epoch();
619 pl
->clear_publish_stats();
621 // This will now be remapped during a backfill in cases
622 // that it would not have been before.
624 state_set(PG_STATE_REMAPPED
);
626 state_clear(PG_STATE_REMAPPED
);
628 int role
= osdmap
->calc_pg_role(pg_whoami
, acting
);
631 // did acting, up, primary|acker change?
633 psdout(10) << " no lastmap" << dendl
;
635 dirty_big_info
= true;
636 info
.history
.same_interval_since
= osdmap
->get_epoch();
638 std::stringstream debug
;
639 ceph_assert(info
.history
.same_interval_since
!= 0);
640 bool new_interval
= PastIntervals::check_new_interval(
641 old_acting_primary
.osd
,
643 oldacting
, newacting
,
647 info
.history
.same_interval_since
,
648 info
.history
.last_epoch_clean
,
652 missing_loc
.get_recoverable_predicate(),
655 psdout(10) << __func__
<< ": check_new_interval output: "
656 << debug
.str() << dendl
;
658 if (osdmap
->get_epoch() == pl
->oldest_stored_osdmap() &&
659 info
.history
.last_epoch_clean
< osdmap
->get_epoch()) {
660 psdout(10) << " map gap, clearing past_intervals and faking" << dendl
;
661 // our information is incomplete and useless; someone else was clean
662 // after everything we know if osdmaps were trimmed.
663 past_intervals
.clear();
665 psdout(10) << " noting past " << past_intervals
<< dendl
;
668 dirty_big_info
= true;
669 info
.history
.same_interval_since
= osdmap
->get_epoch();
670 if (osdmap
->have_pg_pool(info
.pgid
.pgid
.pool()) &&
671 info
.pgid
.pgid
.is_split(lastmap
->get_pg_num(info
.pgid
.pgid
.pool()),
672 osdmap
->get_pg_num(info
.pgid
.pgid
.pool()),
674 info
.history
.last_epoch_split
= osdmap
->get_epoch();
679 if (old_up_primary
!= up_primary
||
681 info
.history
.same_up_since
= osdmap
->get_epoch();
683 // this comparison includes primary rank via pg_shard_t
684 if (old_acting_primary
!= get_primary()) {
685 info
.history
.same_primary_since
= osdmap
->get_epoch();
689 pl
->on_info_history_change();
691 psdout(1) << __func__
<< " up " << oldup
<< " -> " << up
692 << ", acting " << oldacting
<< " -> " << acting
693 << ", acting_primary " << old_acting_primary
<< " -> "
694 << new_acting_primary
695 << ", up_primary " << old_up_primary
<< " -> " << new_up_primary
696 << ", role " << oldrole
<< " -> " << role
697 << ", features acting " << acting_features
698 << " upacting " << upacting_features
702 state_clear(PG_STATE_ACTIVE
);
703 state_clear(PG_STATE_PEERED
);
704 state_clear(PG_STATE_PREMERGE
);
705 state_clear(PG_STATE_DOWN
);
706 state_clear(PG_STATE_RECOVERY_WAIT
);
707 state_clear(PG_STATE_RECOVERY_TOOFULL
);
708 state_clear(PG_STATE_RECOVERING
);
711 acting_recovery_backfill
.clear();
713 // reset primary/replica state?
714 if (was_old_primary
|| is_primary()) {
715 pl
->clear_want_pg_temp();
716 } else if (was_old_nonprimary
|| is_nonprimary()) {
717 pl
->clear_want_pg_temp();
719 clear_primary_state();
723 ceph_assert(!deleting
);
725 // should we tell the primary we are here?
726 send_notify
= !is_primary();
728 if (role
!= oldrole
||
729 was_old_primary
!= is_primary()) {
730 // did primary change?
731 if (was_old_primary
!= is_primary()) {
732 state_clear(PG_STATE_CLEAN
);
735 pl
->on_role_change();
738 // did primary change?
739 if (get_primary() != old_acting_primary
) {
740 psdout(10) << oldacting
<< " -> " << acting
741 << ", acting primary "
742 << old_acting_primary
<< " -> " << get_primary()
745 // primary is the same.
747 // i am (still) primary. but my replica set changed.
748 state_clear(PG_STATE_CLEAN
);
750 psdout(10) << oldacting
<< " -> " << acting
751 << ", replicas changed" << dendl
;
756 if (acting
.empty() && !up
.empty() && up_primary
== pg_whoami
) {
757 psdout(10) << " acting empty, but i am up[0], clearing pg_temp" << dendl
;
758 pl
->queue_want_pg_temp(acting
);
762 void PeeringState::on_new_interval()
764 dout(20) << __func__
<< dendl
;
765 const OSDMapRef osdmap
= get_osdmap();
767 // initialize features
768 acting_features
= CEPH_FEATURES_SUPPORTED_DEFAULT
;
769 upacting_features
= CEPH_FEATURES_SUPPORTED_DEFAULT
;
770 for (auto p
= acting
.begin(); p
!= acting
.end(); ++p
) {
771 if (*p
== CRUSH_ITEM_NONE
)
773 uint64_t f
= osdmap
->get_xinfo(*p
).features
;
774 acting_features
&= f
;
775 upacting_features
&= f
;
777 for (auto p
= up
.begin(); p
!= up
.end(); ++p
) {
778 if (*p
== CRUSH_ITEM_NONE
)
780 upacting_features
&= osdmap
->get_xinfo(*p
).features
;
782 psdout(20) << __func__
<< " upacting_features 0x" << std::hex
783 << upacting_features
<< std::dec
784 << " from " << acting
<< "+" << up
<< dendl
;
786 psdout(20) << __func__
<< " checking missing set deletes flag. missing = "
787 << get_pg_log().get_missing() << dendl
;
789 if (!pg_log
.get_missing().may_include_deletes
&&
790 !perform_deletes_during_peering()) {
791 pl
->rebuild_missing_set_with_deletes(pg_log
);
794 pg_log
.get_missing().may_include_deletes
==
795 !perform_deletes_during_peering());
799 // update lease bounds for a new interval
800 auto mnow
= pl
->get_mnow();
801 prior_readable_until_ub
= std::max(prior_readable_until_ub
,
803 prior_readable_until_ub
= info
.history
.refresh_prior_readable_until_ub(
804 mnow
, prior_readable_until_ub
);
805 psdout(10) << __func__
<< " prior_readable_until_ub "
806 << prior_readable_until_ub
<< " (mnow " << mnow
<< " + "
807 << info
.history
.prior_readable_until_ub
<< ")" << dendl
;
808 prior_readable_down_osds
.clear(); // we populate this when we build the priorset
812 readable_until_ub_sent
=
813 readable_until_ub_from_primary
= ceph::signedspan::zero();
815 acting_readable_until_ub
.clear();
817 acting_readable_until_ub
.resize(acting
.size(), ceph::signedspan::zero());
820 pl
->on_new_interval();
823 void PeeringState::init_primary_up_acting(
824 const vector
<int> &newup
,
825 const vector
<int> &newacting
,
827 int new_acting_primary
)
831 for (uint8_t i
= 0; i
< acting
.size(); ++i
) {
832 if (acting
[i
] != CRUSH_ITEM_NONE
)
836 pool
.info
.is_erasure() ? shard_id_t(i
) : shard_id_t::NO_SHARD
));
840 for (uint8_t i
= 0; i
< up
.size(); ++i
) {
841 if (up
[i
] != CRUSH_ITEM_NONE
)
845 pool
.info
.is_erasure() ? shard_id_t(i
) : shard_id_t::NO_SHARD
));
847 if (!pool
.info
.is_erasure()) {
849 up_primary
= pg_shard_t(new_up_primary
, shard_id_t::NO_SHARD
);
850 primary
= pg_shard_t(new_acting_primary
, shard_id_t::NO_SHARD
);
853 up_primary
= pg_shard_t();
854 primary
= pg_shard_t();
855 for (uint8_t i
= 0; i
< up
.size(); ++i
) {
856 if (up
[i
] == new_up_primary
) {
857 up_primary
= pg_shard_t(up
[i
], shard_id_t(i
));
861 for (uint8_t i
= 0; i
< acting
.size(); ++i
) {
862 if (acting
[i
] == new_acting_primary
) {
863 primary
= pg_shard_t(acting
[i
], shard_id_t(i
));
867 ceph_assert(up_primary
.osd
== new_up_primary
);
868 ceph_assert(primary
.osd
== new_acting_primary
);
872 void PeeringState::init_hb_stamps()
875 // we care about all other osds in the acting set
876 hb_stamps
.resize(acting
.size() - 1);
878 for (auto p
: acting
) {
879 if (p
== CRUSH_ITEM_NONE
|| p
== get_primary().osd
) {
882 hb_stamps
[i
++] = pl
->get_hb_stamps(p
);
885 } else if (is_nonprimary()) {
886 // we care about just the primary
888 hb_stamps
[0] = pl
->get_hb_stamps(get_primary().osd
);
892 dout(10) << __func__
<< " now " << hb_stamps
<< dendl
;
896 void PeeringState::clear_recovery_state()
898 async_recovery_targets
.clear();
899 backfill_targets
.clear();
902 void PeeringState::clear_primary_state()
904 psdout(10) << "clear_primary_state" << dendl
;
906 // clear peering state
908 peer_log_requested
.clear();
909 peer_missing_requested
.clear();
912 peer_missing
.clear();
913 peer_last_complete_ondisk
.clear();
914 peer_activated
.clear();
915 min_last_complete_ondisk
= eversion_t();
916 pg_trim_to
= eversion_t();
917 might_have_unfound
.clear();
918 need_up_thru
= false;
920 pg_log
.reset_recovery_pointers();
922 clear_recovery_state();
924 last_update_ondisk
= eversion_t();
926 pl
->clear_primary_state();
929 /// return [start,end) bounds for required past_intervals
930 static pair
<epoch_t
, epoch_t
> get_required_past_interval_bounds(
931 const pg_info_t
&info
,
932 epoch_t oldest_map
) {
933 epoch_t start
= std::max(
934 info
.history
.last_epoch_clean
? info
.history
.last_epoch_clean
:
935 info
.history
.epoch_pool_created
,
937 epoch_t end
= std::max(
938 info
.history
.same_interval_since
,
939 info
.history
.epoch_pool_created
);
940 return make_pair(start
, end
);
944 void PeeringState::check_past_interval_bounds() const
946 auto oldest_epoch
= pl
->oldest_stored_osdmap();
947 auto rpib
= get_required_past_interval_bounds(
950 if (rpib
.first
>= rpib
.second
) {
951 // do not warn if the start bound is dictated by oldest_map; the
952 // past intervals are presumably appropriate given the pg info.
953 if (!past_intervals
.empty() &&
954 rpib
.first
> oldest_epoch
) {
955 pl
->get_clog_error() << info
.pgid
<< " required past_interval bounds are"
956 << " empty [" << rpib
<< ") but past_intervals is not: "
958 derr
<< info
.pgid
<< " required past_interval bounds are"
959 << " empty [" << rpib
<< ") but past_intervals is not: "
960 << past_intervals
<< dendl
;
963 if (past_intervals
.empty()) {
964 pl
->get_clog_error() << info
.pgid
<< " required past_interval bounds are"
965 << " not empty [" << rpib
<< ") but past_intervals "
966 << past_intervals
<< " is empty";
967 derr
<< info
.pgid
<< " required past_interval bounds are"
968 << " not empty [" << rpib
<< ") but past_intervals "
969 << past_intervals
<< " is empty" << dendl
;
970 ceph_assert(!past_intervals
.empty());
973 auto apib
= past_intervals
.get_bounds();
974 if (apib
.first
> rpib
.first
) {
975 pl
->get_clog_error() << info
.pgid
<< " past_intervals [" << apib
976 << ") start interval does not contain the required"
977 << " bound [" << rpib
<< ") start";
978 derr
<< info
.pgid
<< " past_intervals [" << apib
979 << ") start interval does not contain the required"
980 << " bound [" << rpib
<< ") start" << dendl
;
981 ceph_abort_msg("past_interval start interval mismatch");
983 if (apib
.second
!= rpib
.second
) {
984 pl
->get_clog_error() << info
.pgid
<< " past_interal bound [" << apib
985 << ") end does not match required [" << rpib
987 derr
<< info
.pgid
<< " past_interal bound [" << apib
988 << ") end does not match required [" << rpib
990 ceph_abort_msg("past_interval end mismatch");
995 int PeeringState::clamp_recovery_priority(int priority
, int pool_recovery_priority
, int max
)
997 static_assert(OSD_RECOVERY_PRIORITY_MIN
< OSD_RECOVERY_PRIORITY_MAX
, "Invalid priority range");
998 static_assert(OSD_RECOVERY_PRIORITY_MIN
>= 0, "Priority range must match unsigned type");
1000 ceph_assert(max
<= OSD_RECOVERY_PRIORITY_MAX
);
1002 // User can't set this too high anymore, but might be a legacy value
1003 if (pool_recovery_priority
> OSD_POOL_PRIORITY_MAX
)
1004 pool_recovery_priority
= OSD_POOL_PRIORITY_MAX
;
1005 if (pool_recovery_priority
< OSD_POOL_PRIORITY_MIN
)
1006 pool_recovery_priority
= OSD_POOL_PRIORITY_MIN
;
1007 // Shift range from min to max to 0 to max - min
1008 pool_recovery_priority
+= (0 - OSD_POOL_PRIORITY_MIN
);
1009 ceph_assert(pool_recovery_priority
>= 0 && pool_recovery_priority
<= (OSD_POOL_PRIORITY_MAX
- OSD_POOL_PRIORITY_MIN
));
1011 priority
+= pool_recovery_priority
;
1013 // Clamp to valid range
1014 if (priority
> max
) {
1016 } else if (priority
< OSD_RECOVERY_PRIORITY_MIN
) {
1017 return OSD_RECOVERY_PRIORITY_MIN
;
1023 unsigned PeeringState::get_recovery_priority()
1025 // a higher value -> a higher priority
1026 int ret
= OSD_RECOVERY_PRIORITY_BASE
;
1029 if (state
& PG_STATE_FORCED_RECOVERY
) {
1030 ret
= OSD_RECOVERY_PRIORITY_FORCED
;
1032 // XXX: This priority boost isn't so much about inactive, but about data-at-risk
1033 if (is_degraded() && info
.stats
.avail_no_missing
.size() < pool
.info
.min_size
) {
1034 base
= OSD_RECOVERY_INACTIVE_PRIORITY_BASE
;
1035 // inactive: no. of replicas < min_size, highest priority since it blocks IO
1036 ret
= base
+ (pool
.info
.min_size
- info
.stats
.avail_no_missing
.size());
1039 int64_t pool_recovery_priority
= 0;
1040 pool
.info
.opts
.get(pool_opts_t::RECOVERY_PRIORITY
, &pool_recovery_priority
);
1042 ret
= clamp_recovery_priority(ret
, pool_recovery_priority
, max_prio_map
[base
]);
1044 psdout(20) << __func__
<< " recovery priority is " << ret
<< dendl
;
1045 return static_cast<unsigned>(ret
);
1048 unsigned PeeringState::get_backfill_priority()
1050 // a higher value -> a higher priority
1051 int ret
= OSD_BACKFILL_PRIORITY_BASE
;
1054 if (state
& PG_STATE_FORCED_BACKFILL
) {
1055 ret
= OSD_BACKFILL_PRIORITY_FORCED
;
1057 if (actingset
.size() < pool
.info
.min_size
) {
1058 base
= OSD_BACKFILL_INACTIVE_PRIORITY_BASE
;
1059 // inactive: no. of replicas < min_size, highest priority since it blocks IO
1060 ret
= base
+ (pool
.info
.min_size
- actingset
.size());
1062 } else if (is_undersized()) {
1063 // undersized: OSD_BACKFILL_DEGRADED_PRIORITY_BASE + num missing replicas
1064 ceph_assert(pool
.info
.size
> actingset
.size());
1065 base
= OSD_BACKFILL_DEGRADED_PRIORITY_BASE
;
1066 ret
= base
+ (pool
.info
.size
- actingset
.size());
1068 } else if (is_degraded()) {
1069 // degraded: baseline degraded
1070 base
= ret
= OSD_BACKFILL_DEGRADED_PRIORITY_BASE
;
1073 // Adjust with pool's recovery priority
1074 int64_t pool_recovery_priority
= 0;
1075 pool
.info
.opts
.get(pool_opts_t::RECOVERY_PRIORITY
, &pool_recovery_priority
);
1077 ret
= clamp_recovery_priority(ret
, pool_recovery_priority
, max_prio_map
[base
]);
1080 psdout(20) << __func__
<< " backfill priority is " << ret
<< dendl
;
1081 return static_cast<unsigned>(ret
);
1084 unsigned PeeringState::get_delete_priority()
1086 auto state
= get_osdmap()->get_state(pg_whoami
.osd
);
1087 if (state
& (CEPH_OSD_BACKFILLFULL
|
1089 return OSD_DELETE_PRIORITY_FULL
;
1090 } else if (state
& CEPH_OSD_NEARFULL
) {
1091 return OSD_DELETE_PRIORITY_FULLISH
;
1093 return OSD_DELETE_PRIORITY_NORMAL
;
1097 bool PeeringState::set_force_recovery(bool b
)
1101 if (!(state
& PG_STATE_FORCED_RECOVERY
) &&
1102 (state
& (PG_STATE_DEGRADED
|
1103 PG_STATE_RECOVERY_WAIT
|
1104 PG_STATE_RECOVERING
))) {
1105 psdout(20) << __func__
<< " set" << dendl
;
1106 state_set(PG_STATE_FORCED_RECOVERY
);
1107 pl
->publish_stats_to_osd();
1110 } else if (state
& PG_STATE_FORCED_RECOVERY
) {
1111 psdout(20) << __func__
<< " clear" << dendl
;
1112 state_clear(PG_STATE_FORCED_RECOVERY
);
1113 pl
->publish_stats_to_osd();
1117 psdout(20) << __func__
<< " state " << get_current_state()
1119 pl
->update_local_background_io_priority(get_recovery_priority());
1124 bool PeeringState::set_force_backfill(bool b
)
1128 if (!(state
& PG_STATE_FORCED_BACKFILL
) &&
1129 (state
& (PG_STATE_DEGRADED
|
1130 PG_STATE_BACKFILL_WAIT
|
1131 PG_STATE_BACKFILLING
))) {
1132 psdout(10) << __func__
<< " set" << dendl
;
1133 state_set(PG_STATE_FORCED_BACKFILL
);
1134 pl
->publish_stats_to_osd();
1137 } else if (state
& PG_STATE_FORCED_BACKFILL
) {
1138 psdout(10) << __func__
<< " clear" << dendl
;
1139 state_clear(PG_STATE_FORCED_BACKFILL
);
1140 pl
->publish_stats_to_osd();
1144 psdout(20) << __func__
<< " state " << get_current_state()
1146 pl
->update_local_background_io_priority(get_backfill_priority());
1151 void PeeringState::schedule_renew_lease()
1153 pl
->schedule_renew_lease(
1155 readable_interval
/ 2);
1158 void PeeringState::send_lease()
1160 epoch_t epoch
= pl
->get_osdmap_epoch();
1161 for (auto peer
: actingset
) {
1162 if (peer
== pg_whoami
) {
1165 pl
->send_cluster_message(
1167 make_message
<MOSDPGLease
>(epoch
,
1168 spg_t(spgid
.pgid
, peer
.shard
),
1174 void PeeringState::proc_lease(const pg_lease_t
& l
)
1176 if (!HAVE_FEATURE(upacting_features
, SERVER_OCTOPUS
)) {
1177 psdout(20) << __func__
<< " no-op, upacting_features 0x" << std::hex
1178 << upacting_features
<< std::dec
1179 << " does not include SERVER_OCTOPUS" << dendl
;
1182 if (!is_nonprimary()) {
1183 psdout(20) << __func__
<< " no-op, !nonprimary" << dendl
;
1186 psdout(10) << __func__
<< " " << l
<< dendl
;
1187 if (l
.readable_until_ub
> readable_until_ub_from_primary
) {
1188 readable_until_ub_from_primary
= l
.readable_until_ub
;
1191 ceph::signedspan ru
= ceph::signedspan::zero();
1192 if (l
.readable_until
!= ceph::signedspan::zero() &&
1193 hb_stamps
[0]->peer_clock_delta_ub
) {
1194 ru
= l
.readable_until
- *hb_stamps
[0]->peer_clock_delta_ub
;
1195 psdout(20) << " peer_clock_delta_ub " << *hb_stamps
[0]->peer_clock_delta_ub
1196 << " -> ru " << ru
<< dendl
;
1198 if (ru
> readable_until
) {
1199 readable_until
= ru
;
1200 psdout(20) << __func__
<< " readable_until now " << readable_until
<< dendl
;
1201 // NOTE: if we ever decide to block/queue ops on the replica,
1202 // we'll need to wake them up here.
1205 ceph::signedspan ruub
;
1206 if (hb_stamps
[0]->peer_clock_delta_lb
) {
1207 ruub
= l
.readable_until_ub
- *hb_stamps
[0]->peer_clock_delta_lb
;
1208 psdout(20) << " peer_clock_delta_lb " << *hb_stamps
[0]->peer_clock_delta_lb
1209 << " -> ruub " << ruub
<< dendl
;
1211 ruub
= pl
->get_mnow() + l
.interval
;
1212 psdout(20) << " no peer_clock_delta_lb -> ruub " << ruub
<< dendl
;
1214 if (ruub
> readable_until_ub
) {
1215 readable_until_ub
= ruub
;
1216 psdout(20) << __func__
<< " readable_until_ub now " << readable_until_ub
// Process the lease acknowledgement `a` received from osd `from`.
//
// After the SERVER_OCTOPUS feature test on upacting_features, the acting
// set is scanned for `from`.  When the acknowledged readable_until_ub is
// larger than the value stored in acting_readable_until_ub[i], the stored
// value is replaced.  recalc_readable_until() and pl->recheck_readable()
// are then used to refresh the primary's readable bounds.
//
// NOTE(review): the statements that set/test was_min and the guard around
// recheck_readable() are not visible in this excerpt -- confirm the exact
// conditions against the full file.
1221 void PeeringState::proc_lease_ack(int from
, const pg_lease_ack_t
& a
)
1223 if (!HAVE_FEATURE(upacting_features
, SERVER_OCTOPUS
)) {
// `now` is sampled before the bounds are recalculated below.
1226 auto now
= pl
->get_mnow();
1227 bool was_min
= false;
1228 for (unsigned i
= 0; i
< acting
.size(); ++i
) {
1229 if (from
== acting
[i
]) {
1230 // the lease_ack value is based on the primary's clock
1231 if (a
.readable_until_ub
> acting_readable_until_ub
[i
]) {
// The peer's previous bound equalled readable_until, i.e. this peer was
// (one of) the entries holding the minimum.
1232 if (acting_readable_until_ub
[i
] == readable_until
) {
1235 acting_readable_until_ub
[i
] = a
.readable_until_ub
;
// Remember the old bound so it can be compared after the recalculation.
1241 auto old_ru
= readable_until
;
1242 recalc_readable_until();
1244 pl
->recheck_readable();
// Renew the lease using the time returned by pl->get_mnow() and schedule
// the next renewal via schedule_renew_lease().
// The function starts with a SERVER_OCTOPUS feature test on
// upacting_features.
// NOTE(review): the body of that test is not visible here -- presumably an
// early return when the feature is absent; confirm.
1249 void PeeringState::proc_renew_lease()
1251 if (!HAVE_FEATURE(upacting_features
, SERVER_OCTOPUS
)) {
1254 renew_lease(pl
->get_mnow());
1256 schedule_renew_lease();
// Recompute readable_until and readable_until_ub on the primary (asserted).
//
// The candidate minimum starts at readable_until_ub_sent and is lowered to
// any smaller acting_readable_until_ub[i].  Acting entries equal to
// pg_whoami.osd or CRUSH_ITEM_NONE are tested separately (presumably
// skipped -- the branch body is not visible in this excerpt).  Both
// readable_until and readable_until_ub are then set to that minimum.
//
// NOTE(review): this function uses assert()/dout() while the surrounding
// functions use ceph_assert()/psdout(); confirm whether that is intended.
1259 void PeeringState::recalc_readable_until()
1261 assert(is_primary());
1262 ceph::signedspan min
= readable_until_ub_sent
;
1263 for (unsigned i
= 0; i
< acting
.size(); ++i
) {
1264 if (acting
[i
] == pg_whoami
.osd
|| acting
[i
] == CRUSH_ITEM_NONE
) {
1267 dout(20) << __func__
<< " peer osd." << acting
[i
]
1268 << " ruub " << acting_readable_until_ub
[i
] << dendl
;
// Track the smallest upper bound seen among the peers.
1269 if (acting_readable_until_ub
[i
] < min
) {
1270 min
= acting_readable_until_ub
[i
];
// Both bounds collapse to the same value on the primary.
1273 readable_until
= min
;
1274 readable_until_ub
= min
;
1275 dout(20) << __func__
<< " readable_until[_ub] " << readable_until
1276 << " (sent " << readable_until_ub_sent
<< ")" << dendl
;
// Prune prior_readable_down_osds against `map`.
//
// Entries for which map->is_dead() is true are logged and erased.  When
// `changed` is set and the set has become empty,
// clear_prior_readable_until_ub() is called.
//
// NOTE(review): the return statements and the place where `changed` is set
// are not visible in this excerpt; presumably the function returns
// `changed`.  Confirm against the full file.
1279 bool PeeringState::check_prior_readable_down_osds(const OSDMapRef
& map
)
1281 if (!HAVE_FEATURE(upacting_features
, SERVER_OCTOPUS
)) {
1284 bool changed
= false;
1285 auto p
= prior_readable_down_osds
.begin();
1286 while (p
!= prior_readable_down_osds
.end()) {
1287 if (map
->is_dead(*p
)) {
1288 dout(10) << __func__
<< " prior_readable_down_osds osd." << *p
1289 << " is dead as of epoch " << map
->get_epoch()
// erase() hands back the next iterator, so the loop continues from there.
1291 p
= prior_readable_down_osds
.erase(p
);
1297 if (changed
&& prior_readable_down_osds
.empty()) {
1298 psdout(10) << " empty prior_readable_down_osds, clearing ub" << dendl
;
1299 clear_prior_readable_until_ub();
// Re-evaluate need_up_thru against `osdmap`.
//
// Reads this osd's up_thru from the map (osdmap->get_up_thru(pg_whoami.osd))
// and, when it is >= info.history.same_interval_since, logs the fact and
// clears need_up_thru.
//
// NOTE(review): the first half of the condition and the return statements
// are not visible in this excerpt.
1305 bool PeeringState::adjust_need_up_thru(const OSDMapRef osdmap
)
1307 epoch_t up_thru
= osdmap
->get_up_thru(pg_whoami
.osd
);
1309 up_thru
>= info
.history
.same_interval_since
) {
1310 psdout(10) << "adjust_need_up_thru now "
1311 << up_thru
<< ", need_up_thru now false" << dendl
;
1312 need_up_thru
= false;
// Build the PastIntervals::PriorSet for this PG from past_intervals.
//
// Steps visible here:
//  - assert that no entry in peer_info reports a
//    history.last_epoch_started newer than our own;
//  - call past_intervals.get_prior_set() with the pool's erasure flag, our
//    history.last_epoch_started, the recoverable predicate from missing_loc
//    and a callback that classifies an osd against the current osdmap as
//    UP, DNE, LOST or DOWN (and reports pinfo->lost_at through *lost_at);
//  - set PG_STATE_DOWN when prior.pg_down is true;
//  - set need_up_thru depending on whether the osdmap's up_thru for this
//    osd is below info.history.same_interval_since;
//  - hand prior.probe to pl->set_probe_targets().
1318 PastIntervals::PriorSet
PeeringState::build_prior()
1322 for (auto it
= peer_info
.begin(); it
!= peer_info
.end(); ++it
) {
1323 ceph_assert(info
.history
.last_epoch_started
>=
1324 it
->second
.history
.last_epoch_started
);
1328 const OSDMap
&osdmap
= *get_osdmap();
1329 PastIntervals::PriorSet prior
= past_intervals
.get_prior_set(
1330 pool
.info
.is_erasure(),
1331 info
.history
.last_epoch_started
,
1332 &missing_loc
.get_recoverable_predicate(),
// Callback: classify `osd` using the current osdmap.
1333 [&](epoch_t start
, int osd
, epoch_t
*lost_at
) {
1334 const osd_info_t
*pinfo
= 0;
1335 if (osdmap
.exists(osd
)) {
1336 pinfo
= &osdmap
.get_info(osd
);
1338 *lost_at
= pinfo
->lost_at
;
1341 if (osdmap
.is_up(osd
)) {
1342 return PastIntervals::UP
;
// pinfo stays null when the osd does not exist in the map.
1343 } else if (!pinfo
) {
1344 return PastIntervals::DNE
;
// Marked lost after the start of the interval being examined.
1345 } else if (pinfo
->lost_at
> start
) {
1346 return PastIntervals::LOST
;
1348 return PastIntervals::DOWN
;
1355 if (prior
.pg_down
) {
1356 state_set(PG_STATE_DOWN
);
1359 if (get_osdmap()->get_up_thru(pg_whoami
.osd
) <
1360 info
.history
.same_interval_since
) {
1361 psdout(10) << "up_thru " << get_osdmap()->get_up_thru(pg_whoami
.osd
)
1362 << " < same_since " << info
.history
.same_interval_since
1363 << ", must notify monitor" << dendl
;
1364 need_up_thru
= true;
1366 psdout(10) << "up_thru " << get_osdmap()->get_up_thru(pg_whoami
.osd
)
1367 << " >= same_since " << info
.history
.same_interval_since
1368 << ", all is well" << dendl
;
1369 need_up_thru
= false;
1371 pl
->set_probe_targets(prior
.probe
);
// Report whether any object still has to be recovered (primary only;
// asserted).
//
// Checks the primary's own missing set from pg_log first, then every peer
// in acting_recovery_backfill other than get_primary() through
// peer_missing.  A peer without an entry in peer_missing is only logged.
// When nothing is missing anywhere, " is recovered" is logged.
//
// NOTE(review): the return statements are not visible in this excerpt;
// presumably true is returned as soon as a non-empty missing set is found.
1375 bool PeeringState::needs_recovery() const
1377 ceph_assert(is_primary());
1379 auto &missing
= pg_log
.get_missing();
1381 if (missing
.num_missing()) {
1382 psdout(10) << __func__
<< " primary has " << missing
.num_missing()
1383 << " missing" << dendl
;
1387 ceph_assert(!acting_recovery_backfill
.empty());
1388 for (const pg_shard_t
& peer
: acting_recovery_backfill
) {
// The primary's own missing set was already examined above.
1389 if (peer
== get_primary()) {
1392 auto pm
= peer_missing
.find(peer
);
1393 if (pm
== peer_missing
.end()) {
1394 psdout(10) << __func__
<< " osd." << peer
<< " doesn't have missing set"
1398 if (pm
->second
.num_missing()) {
1399 psdout(10) << __func__
<< " osd." << peer
<< " has "
1400 << pm
->second
.num_missing() << " missing" << dendl
;
1405 psdout(10) << __func__
<< " is recovered" << dendl
;
// Report whether any backfill target is still being backfilled (primary
// only; asserted).
//
// Every peer in backfill_targets must have an entry in peer_info
// (asserted).  A peer whose last_backfill is not max is logged as still
// needing backfill; otherwise " does not need backfill" is logged at the
// end.
//
// NOTE(review): the return statements are not visible in this excerpt.
1409 bool PeeringState::needs_backfill() const
1411 ceph_assert(is_primary());
1413 // We can assume that only possible osds that need backfill
1414 // are on the backfill_targets vector nodes.
1415 for (const pg_shard_t
& peer
: backfill_targets
) {
1416 auto pi
= peer_info
.find(peer
);
1417 ceph_assert(pi
!= peer_info
.end());
1418 if (!pi
->second
.last_backfill
.is_max()) {
1419 psdout(10) << __func__
<< " osd." << peer
1420 << " has last_backfill " << pi
->second
.last_backfill
<< dendl
;
1425 psdout(10) << __func__
<< " does not need backfill" << dendl
;
1430 * Returns true unless there is a non-lost OSD in might_have_unfound.
// Walk might_have_unfound and decide whether every peer in it has either
// been queried already or is marked lost in `osdmap` (primary only;
// asserted).
//
// Per peer the visible tests are, in order: an entry in peer_missing; a
// peer_info entry that is_empty() or dne(); the osd not existing in the
// map; and finally osd_info.lost_at <= osd_info.up_from, which per the
// inline comment means the osd is not lost.
//
// NOTE(review): the bodies of these tests (continue / return) are not
// visible in this excerpt.
1432 bool PeeringState::all_unfound_are_queried_or_lost(
1433 const OSDMapRef osdmap
) const
1435 ceph_assert(is_primary());
1437 auto peer
= might_have_unfound
.begin();
1438 auto mend
= might_have_unfound
.end();
1439 for (; peer
!= mend
; ++peer
) {
1440 if (peer_missing
.count(*peer
))
1442 auto iter
= peer_info
.find(*peer
);
1443 if (iter
!= peer_info
.end() &&
1444 (iter
->second
.is_empty() || iter
->second
.dne()))
1446 if (!osdmap
->exists(peer
->osd
))
1448 const osd_info_t
&osd_info(osdmap
->get_info(peer
->osd
));
1449 if (osd_info
.lost_at
<= osd_info
.up_from
) {
1450 // If there is even one OSD in might_have_unfound that isn't lost, we
1451 // still might retrieve our unfound.
1455 psdout(10) << "all_unfound_are_queried_or_lost all of might_have_unfound "
1456 << might_have_unfound
1457 << " have been queried or are marked lost" << dendl
;
// Reject a backfill reservation: release the locally reserved recovery
// space through pl->unreserve_recovery_space(), then send an
// MBackfillReserve with type REJECT_TOOFULL for
// spg_t(info.pgid.pgid, primary.shard), stamped with the current osdmap
// epoch, via pl->send_cluster_message().
// NOTE(review): the first argument of send_cluster_message() (the target)
// is not visible in this excerpt.
1462 void PeeringState::reject_reservation()
1464 pl
->unreserve_recovery_space();
1465 pl
->send_cluster_message(
1467 make_message
<MBackfillReserve
>(
1468 MBackfillReserve::REJECT_TOOFULL
,
1469 spg_t(info
.pgid
.pgid
, primary
.shard
),
1470 get_osdmap_epoch()),
1471 get_osdmap_epoch());
1477 * Returns an iterator to the best info in infos sorted by:
1478 * 1) Prefer newer last_update
1479 * 2) Prefer longer tail if it brings another info into contiguity
1480 * 3) Prefer current primary
// Select the most suitable pg_info_t in `infos`.
//
// history_les_bound must be non-null (asserted).  It is set to true when
// the maximum last_epoch_started came from a history.last_epoch_started
// value and to false when it came from an info's own last_epoch_started.
//
// The function makes three passes over `infos`:
//  1. compute max_last_epoch_started_found;
//  2. compute min_last_update_acceptable, the smallest last_update among
//     infos whose last_epoch_started reaches that maximum;
//  3. pick the best candidate, applying the preferences described by the
//     inline comments below.
//
// NOTE(review): the statements that skip a candidate, replace `best` and
// return are not visible in this excerpt.
1482 map
<pg_shard_t
, pg_info_t
>::const_iterator
PeeringState::find_best_info(
1483 const map
<pg_shard_t
, pg_info_t
> &infos
,
1484 bool restrict_to_up_acting
,
1485 bool *history_les_bound
) const
1487 ceph_assert(history_les_bound
);
1488 /* See doc/dev/osd_internals/last_epoch_started.rst before attempting
1489 * to make changes to this process. Also, make sure to update it
1490 * when you find bugs! */
1491 epoch_t max_last_epoch_started_found
= 0;
// Pass 1: find the newest last_epoch_started.
1492 for (auto i
= infos
.begin(); i
!= infos
.end(); ++i
) {
// history.last_epoch_started is only considered when the
// osd_find_best_info_ignore_history_les option is off.
1493 if (!cct
->_conf
->osd_find_best_info_ignore_history_les
&&
1494 max_last_epoch_started_found
< i
->second
.history
.last_epoch_started
) {
1495 *history_les_bound
= true;
1496 max_last_epoch_started_found
= i
->second
.history
.last_epoch_started
;
// An info's own last_epoch_started only counts when it is not incomplete.
1498 if (!i
->second
.is_incomplete() &&
1499 max_last_epoch_started_found
< i
->second
.last_epoch_started
) {
1500 *history_les_bound
= false;
1501 max_last_epoch_started_found
= i
->second
.last_epoch_started
;
// Pass 2: smallest last_update among infos at the newest epoch.
1504 eversion_t min_last_update_acceptable
= eversion_t::max();
1505 for (auto i
= infos
.begin(); i
!= infos
.end(); ++i
) {
1506 if (max_last_epoch_started_found
<= i
->second
.last_epoch_started
) {
1507 if (min_last_update_acceptable
> i
->second
.last_update
)
1508 min_last_update_acceptable
= i
->second
.last_update
;
// Still at the initial sentinel: no info qualified in pass 2.
1511 if (min_last_update_acceptable
== eversion_t::max())
1514 auto best
= infos
.end();
1515 // find osd with newest last_update (oldest for ec_pool).
1516 // if there are multiples, prefer
1517 // - a longer tail, if it brings another peer into log contiguity
1518 // - the current primary
1519 for (auto p
= infos
.begin(); p
!= infos
.end(); ++p
) {
1520 if (restrict_to_up_acting
&& !is_up(p
->first
) &&
1521 !is_acting(p
->first
))
1523 // Only consider peers with last_update >= min_last_update_acceptable
1524 if (p
->second
.last_update
< min_last_update_acceptable
)
1526 // Disqualify anyone with a too old last_epoch_started
1527 if (p
->second
.last_epoch_started
< max_last_epoch_started_found
)
1529 // Disqualify anyone who is incomplete (not fully backfilled)
1530 if (p
->second
.is_incomplete())
1532 if (best
== infos
.end()) {
1536 // Prefer newer last_update
// Pools that require rollback compare last_update in the opposite
// direction from other pools (see the two branches below).
1537 if (pool
.info
.require_rollback()) {
1538 if (p
->second
.last_update
> best
->second
.last_update
)
1540 if (p
->second
.last_update
< best
->second
.last_update
) {
1545 if (p
->second
.last_update
< best
->second
.last_update
)
1547 if (p
->second
.last_update
> best
->second
.last_update
) {
1553 // Prefer longer tail
1554 if (p
->second
.log_tail
> best
->second
.log_tail
) {
1556 } else if (p
->second
.log_tail
< best
->second
.log_tail
) {
1561 if (!p
->second
.has_missing() && best
->second
.has_missing()) {
1562 psdout(10) << __func__
<< " prefer osd." << p
->first
1563 << " because it is complete while best has missing"
1567 } else if (p
->second
.has_missing() && !best
->second
.has_missing()) {
1568 psdout(10) << __func__
<< " skipping osd." << p
->first
1569 << " because it has missing while best is complete"
1573 // both are complete or have missing
1577 // prefer current primary (usually the caller), all things being equal
1578 if (p
->first
== pg_whoami
) {
// NOTE(review): this message is prefixed with the literal "calc_acting"
// rather than __func__ like the other messages in this function.
1579 psdout(10) << "calc_acting prefer osd." << p
->first
1580 << " because it is current primary" << dendl
;
// Compute the desired acting set for an erasure-coded PG, one shard
// position at a time, writing a human-readable trace to `ss`.
//
// For each position i in [0, size) the candidates are tried in this order:
//  - up[i], when it exists, is not incomplete and its last_update is at
//    least auth_log_shard's log_tail;
//  - otherwise a usable up[i] osd is recorded in *backfill, and acting[i]
//    is tried with the same usability test;
//  - otherwise, unless restrict_to_up_acting is set, any other shard known
//    in all_info for that position ("stray").
// Positions that cannot be filled keep CRUSH_ITEM_NONE.  Finally every
// filled position and every backfill shard is inserted into
// *acting_backfill.
//
// NOTE(review): the lookups all_info.find(...)->second are not checked
// against end() in the visible code; callers presumably guarantee that an
// info exists for every up/acting shard -- confirm.
// NOTE(review): several parameters and some assignments to `want` are not
// visible in this excerpt.
1588 void PeeringState::calc_ec_acting(
1589 map
<pg_shard_t
, pg_info_t
>::const_iterator auth_log_shard
,
1591 const vector
<int> &acting
,
1592 const vector
<int> &up
,
1593 const map
<pg_shard_t
, pg_info_t
> &all_info
,
1594 bool restrict_to_up_acting
,
1596 set
<pg_shard_t
> *backfill
,
1597 set
<pg_shard_t
> *acting_backfill
,
1600 vector
<int> want(size
, CRUSH_ITEM_NONE
);
// Index every known shard by its shard id so strays can be found per
// position.
1601 map
<shard_id_t
, set
<pg_shard_t
> > all_info_by_shard
;
1602 for (auto i
= all_info
.begin();
1603 i
!= all_info
.end();
1605 all_info_by_shard
[i
->first
.shard
].insert(i
->first
);
1607 for (uint8_t i
= 0; i
< want
.size(); ++i
) {
1608 ss
<< "For position " << (unsigned)i
<< ": ";
1609 if (up
.size() > (unsigned)i
&& up
[i
] != CRUSH_ITEM_NONE
&&
1610 !all_info
.find(pg_shard_t(up
[i
], shard_id_t(i
)))->second
.is_incomplete() &&
1611 all_info
.find(pg_shard_t(up
[i
], shard_id_t(i
)))->second
.last_update
>=
1612 auth_log_shard
->second
.log_tail
) {
1613 ss
<< " selecting up[i]: " << pg_shard_t(up
[i
], shard_id_t(i
)) << std::endl
;
// up[i] exists but failed the usability test above: it needs backfill.
1617 if (up
.size() > (unsigned)i
&& up
[i
] != CRUSH_ITEM_NONE
) {
1618 ss
<< " backfilling up[i]: " << pg_shard_t(up
[i
], shard_id_t(i
))
1620 backfill
->insert(pg_shard_t(up
[i
], shard_id_t(i
)));
1623 if (acting
.size() > (unsigned)i
&& acting
[i
] != CRUSH_ITEM_NONE
&&
1624 !all_info
.find(pg_shard_t(acting
[i
], shard_id_t(i
)))->second
.is_incomplete() &&
1625 all_info
.find(pg_shard_t(acting
[i
], shard_id_t(i
)))->second
.last_update
>=
1626 auth_log_shard
->second
.log_tail
) {
1627 ss
<< " selecting acting[i]: " << pg_shard_t(acting
[i
], shard_id_t(i
)) << std::endl
;
1628 want
[i
] = acting
[i
];
1629 } else if (!restrict_to_up_acting
) {
1630 for (auto j
= all_info_by_shard
[shard_id_t(i
)].begin();
1631 j
!= all_info_by_shard
[shard_id_t(i
)].end();
1633 ceph_assert(j
->shard
== i
);
1634 if (!all_info
.find(*j
)->second
.is_incomplete() &&
1635 all_info
.find(*j
)->second
.last_update
>=
1636 auth_log_shard
->second
.log_tail
) {
1637 ss
<< " selecting stray: " << *j
<< std::endl
;
1642 if (want
[i
] == CRUSH_ITEM_NONE
)
1643 ss
<< " failed to fill position " << (int)i
<< std::endl
;
// acting_backfill = every filled position plus every backfill shard.
1647 for (uint8_t i
= 0; i
< want
.size(); ++i
) {
1648 if (want
[i
] != CRUSH_ITEM_NONE
) {
1649 acting_backfill
->insert(pg_shard_t(want
[i
], shard_id_t(i
)));
1652 acting_backfill
->insert(backfill
->begin(), backfill
->end());
// Choose the primary for a replicated PG and the oldest log entry the
// acting set can be brought up to date from.
//
// up_primary is kept when its info is not incomplete and its last_update
// is at least auth_log_shard's log_tail.  When the up osds advertise
// SERVER_NAUTILUS, an approximate missing-object count is computed for it
// (num_objects_missing plus the absolute version distance to the
// authoritative log) and, if that exceeds
// force_auth_primary_missing_objects, auth_log_shard is chosen instead.
// When up_primary is not usable, auth_log_shard (asserted not incomplete)
// becomes the primary.
//
// Returns the chosen iterator together with
// min(primary log_tail, auth_log_shard log_tail).
//
// NOTE(review): the first half of the usability condition and the `ss`
// parameter are not visible in this excerpt.
// NOTE(review): the "up_primary: ... )" trace strings contain a closing
// parenthesis without an opening one.
1656 std::pair
<map
<pg_shard_t
, pg_info_t
>::const_iterator
, eversion_t
>
1657 PeeringState::select_replicated_primary(
1658 map
<pg_shard_t
, pg_info_t
>::const_iterator auth_log_shard
,
1659 uint64_t force_auth_primary_missing_objects
,
1660 const std::vector
<int> &up
,
1661 pg_shard_t up_primary
,
1662 const map
<pg_shard_t
, pg_info_t
> &all_info
,
1663 const OSDMapRef osdmap
,
1666 pg_shard_t auth_log_shard_id
= auth_log_shard
->first
;
1668 ss
<< __func__
<< " newest update on osd." << auth_log_shard_id
1669 << " with " << auth_log_shard
->second
<< std::endl
;
1672 auto primary
= all_info
.find(up_primary
);
1674 !primary
->second
.is_incomplete() &&
1675 primary
->second
.last_update
>=
1676 auth_log_shard
->second
.log_tail
) {
1677 if (HAVE_FEATURE(osdmap
->get_up_osd_features(), SERVER_NAUTILUS
)) {
1678 auto approx_missing_objects
=
1679 primary
->second
.stats
.stats
.sum
.num_objects_missing
;
1680 auto auth_version
= auth_log_shard
->second
.last_update
.version
;
1681 auto primary_version
= primary
->second
.last_update
.version
;
// Add the absolute distance between the two versions.
1682 if (auth_version
> primary_version
) {
1683 approx_missing_objects
+= auth_version
- primary_version
;
1685 approx_missing_objects
+= primary_version
- auth_version
;
1687 if ((uint64_t)approx_missing_objects
>
1688 force_auth_primary_missing_objects
) {
1689 primary
= auth_log_shard
;
1690 ss
<< "up_primary: " << up_primary
<< ") has approximate "
1691 << approx_missing_objects
1692 << "(>" << force_auth_primary_missing_objects
<<") "
1693 << "missing objects, osd." << auth_log_shard_id
1694 << " selected as primary instead"
1697 ss
<< "up_primary: " << up_primary
<< ") selected as primary"
1701 ss
<< "up_primary: " << up_primary
<< ") selected as primary" << std::endl
;
1704 ceph_assert(!auth_log_shard
->second
.is_incomplete());
1705 ss
<< "up[0] needs backfill, osd." << auth_log_shard_id
1706 << " selected as primary instead" << std::endl
;
1707 primary
= auth_log_shard
;
1710 ss
<< __func__
<< " primary is osd." << primary
->first
1711 << " with " << primary
->second
<< std::endl
;
1713 /* We include auth_log_shard->second.log_tail because in GetLog,
1714 * we will request logs back to the min last_update over our
1715 * acting_backfill set, which will result in our log being extended
1716 * as far backwards as necessary to pick up any peers which can
1717 * be log recovered by auth_log_shard's log */
1718 eversion_t oldest_auth_log_entry
=
1719 std::min(primary
->second
.log_tail
, auth_log_shard
->second
.log_tail
);
1721 return std::make_pair(primary
, oldest_auth_log_entry
);
1726 * calculate the desired acting set.
1728 * Choose an appropriate acting set. Prefer up[0], unless it is
1729 * incomplete, or another osd has a longer tail that allows us to
1730 * bring other up nodes up to date.
// Compute the desired acting set (*want), the backfill set (*backfill) and
// their union (*acting_backfill) for a replicated PG, tracing decisions to
// `ss`.
//
// Order of selection, as visible below:
//  1. `primary` is always taken first;
//  2. osds from `up`: those that are incomplete or whose last_update is
//     older than oldest_auth_log_entry go to backfill, the others are
//     accepted;
//  3. osds from `acting` that are neither the primary nor in `up`, sorted
//     by last_update in descending order;
//  4. unless restrict_to_up_acting is set, remaining osds from all_info
//     ("strays") that are in neither `up` nor `acting`, again sorted by
//     last_update in descending order.
// Selection stops as soon as want->size() reaches `size`.
//
// NOTE(review): some parameters, the loop header over `up` and the
// continue/return/break statements are not visible in this excerpt.
1732 void PeeringState::calc_replicated_acting(
1733 map
<pg_shard_t
, pg_info_t
>::const_iterator primary
,
1734 eversion_t oldest_auth_log_entry
,
1736 const vector
<int> &acting
,
1737 const vector
<int> &up
,
1738 pg_shard_t up_primary
,
1739 const map
<pg_shard_t
, pg_info_t
> &all_info
,
1740 bool restrict_to_up_acting
,
1742 set
<pg_shard_t
> *backfill
,
1743 set
<pg_shard_t
> *acting_backfill
,
1744 const OSDMapRef osdmap
,
1748 ss
<< __func__
<< (restrict_to_up_acting
? " restrict_to_up_acting" : "")
1751 want
->push_back(primary
->first
.osd
);
1752 acting_backfill
->insert(primary
->first
);
1754 // select replicas that have log contiguity with primary.
1755 // prefer up, then acting, then any peer_info osds
1757 pg_shard_t up_cand
= pg_shard_t(i
, shard_id_t::NO_SHARD
);
1758 if (up_cand
== primary
->first
)
1760 const pg_info_t
&cur_info
= all_info
.find(up_cand
)->second
;
1761 if (cur_info
.is_incomplete() ||
1762 cur_info
.last_update
< oldest_auth_log_entry
) {
1763 ss
<< " shard " << up_cand
<< " (up) backfill " << cur_info
<< std::endl
;
1764 backfill
->insert(up_cand
);
1765 acting_backfill
->insert(up_cand
);
1768 acting_backfill
->insert(up_cand
);
1769 ss
<< " osd." << i
<< " (up) accepted " << cur_info
<< std::endl
;
1773 if (want
->size() >= size
) {
// (last_update, osd) pairs for the acting osds not handled above.
1777 std::vector
<std::pair
<eversion_t
, int>> candidate_by_last_update
;
1778 candidate_by_last_update
.reserve(acting
.size());
1779 // This no longer has backfill OSDs, but they are covered above.
1780 for (auto i
: acting
) {
1781 pg_shard_t
acting_cand(i
, shard_id_t::NO_SHARD
);
1782 // skip up osds we already considered above
1783 if (acting_cand
== primary
->first
)
1785 auto up_it
= find(up
.begin(), up
.end(), i
);
1786 if (up_it
!= up
.end())
1789 const pg_info_t
&cur_info
= all_info
.find(acting_cand
)->second
;
1790 if (cur_info
.is_incomplete() ||
1791 cur_info
.last_update
< oldest_auth_log_entry
) {
1792 ss
<< " shard " << acting_cand
<< " (acting) REJECTED "
1793 << cur_info
<< std::endl
;
1795 candidate_by_last_update
.emplace_back(cur_info
.last_update
, i
);
// Comparator on the eversion_t only; used for both sorts below.
1799 auto sort_by_eversion
=[](const std::pair
<eversion_t
, int> &lhs
,
1800 const std::pair
<eversion_t
, int> &rhs
) {
1801 return lhs
.first
> rhs
.first
;
1803 // sort by last_update, in descending order.
1804 std::sort(candidate_by_last_update
.begin(),
1805 candidate_by_last_update
.end(), sort_by_eversion
);
1806 for (auto &p
: candidate_by_last_update
) {
1807 ceph_assert(want
->size() < size
);
1808 want
->push_back(p
.second
);
1809 pg_shard_t s
= pg_shard_t(p
.second
, shard_id_t::NO_SHARD
);
1810 acting_backfill
->insert(s
);
1811 ss
<< " shard " << s
<< " (acting) accepted "
1812 << all_info
.find(s
)->second
<< std::endl
;
1813 if (want
->size() >= size
) {
1818 if (restrict_to_up_acting
) {
// Reuse the candidate vector for the stray pass.
1821 candidate_by_last_update
.clear();
1822 candidate_by_last_update
.reserve(all_info
.size()); // overestimate but fine
1823 // continue to search stray to find more suitable peers
1824 for (auto &i
: all_info
) {
1825 // skip up osds we already considered above
1826 if (i
.first
== primary
->first
)
1828 auto up_it
= find(up
.begin(), up
.end(), i
.first
.osd
);
1829 if (up_it
!= up
.end())
1831 auto acting_it
= find(
1832 acting
.begin(), acting
.end(), i
.first
.osd
);
1833 if (acting_it
!= acting
.end())
1836 if (i
.second
.is_incomplete() ||
1837 i
.second
.last_update
< oldest_auth_log_entry
) {
1838 ss
<< " shard " << i
.first
<< " (stray) REJECTED " << i
.second
1841 candidate_by_last_update
.emplace_back(
1842 i
.second
.last_update
, i
.first
.osd
);
1846 if (candidate_by_last_update
.empty()) {
1847 // save us some effort
1851 // sort by last_update, in descending order.
1852 std::sort(candidate_by_last_update
.begin(),
1853 candidate_by_last_update
.end(), sort_by_eversion
);
1855 for (auto &p
: candidate_by_last_update
) {
1856 ceph_assert(want
->size() < size
);
1857 want
->push_back(p
.second
);
1858 pg_shard_t s
= pg_shard_t(p
.second
, shard_id_t::NO_SHARD
);
1859 acting_backfill
->insert(s
);
1860 ss
<< " shard " << s
<< " (stray) accepted "
1861 << all_info
.find(s
)->second
<< std::endl
;
1862 if (want
->size() >= size
) {
1868 // Defines osd preference order: acting set, then larger last_update
// Values of this type are compared with the std::tuple relational
// operators (see bucket_candidates_t::operator< and the std::sort over
// candidates in calc_replicated_acting_stretch).
1869 using osd_ord_t
= std::tuple
<bool, eversion_t
>; // <!is_acting, last_update>, as built by get_osd_ord
// Plain integer osd id, as stored in the want/acting/up vectors.
1870 using osd_id_t
= int;
// Candidate osds belonging to one crush ancestor bucket, plus a count of
// how many osds have already been selected from that bucket.
//
// Candidates are appended in non-decreasing osd_ord_t order (asserted in
// add_osd).  In the visible code both pop_osd() and get_ord() read
// osds.back(), i.e. the entry with the largest ordering token.
//
// NOTE(review): the declaration of `selected` and the tail of pop_osd()
// are not visible in this excerpt.  operator< negates `selected`; if
// `selected` were an unsigned type the negation would wrap, so confirm it
// is declared signed.
1872 class bucket_candidates_t
{
1873 std::deque
<std::pair
<osd_ord_t
, osd_id_t
>> osds
;
// Append a candidate; callers must add in ascending `ord` order.
1877 void add_osd(osd_ord_t ord
, osd_id_t osd
) {
1878 // osds will be added in smallest to largest order
1879 assert(osds
.empty() || osds
.back().first
<= ord
);
1880 osds
.push_back(std::make_pair(ord
, osd
));
// Take the next candidate; the bucket must not be empty (asserted).
1882 osd_id_t
pop_osd() {
1883 ceph_assert(!is_empty());
1884 auto ret
= osds
.back();
1889 void inc_selected() { selected
++; }
1890 unsigned get_num_selected() const { return selected
; }
// Ordering token of the next candidate, or (false, eversion_t()) when
// there is none.
1892 osd_ord_t
get_ord() const {
1893 return osds
.empty() ? std::make_tuple(false, eversion_t())
1894 : osds
.back().first
;
1897 bool is_empty() const { return osds
.empty(); }
// Compares (-selected, get_ord()) lexicographically, so a bucket with
// more osds already selected compares lower.
1899 bool operator<(const bucket_candidates_t
&rhs
) const {
1900 return std::make_tuple(-selected
, get_ord()) <
1901 std::make_tuple(-rhs
.selected
, rhs
.get_ord());
1904 friend std::ostream
&operator<<(std::ostream
&, const bucket_candidates_t
&);
// Stream a bucket_candidates_t as "candidates[<osds>]"; relies on an
// operator<< being available for the `osds` container.
1907 std::ostream
&operator<<(std::ostream
&lhs
, const bucket_candidates_t
&cand
)
1909 return lhs
<< "candidates[" << cand
.osds
<< "]";
// Heap of references to bucket_candidates_t objects, maintained with
// std::push_heap / std::pop_heap over a std::vector and ordered by
// bucket_candidates_t::operator<.
//
// The heap stores std::reference_wrapper elements, so the referenced
// buckets must outlive the heap.
//
// NOTE(review): the comparator struct header, the push_back/pop_back calls
// and the pop() signature are not visible in this excerpt.
1912 class bucket_heap_t
{
1913 using elem_t
= std::reference_wrapper
<bucket_candidates_t
>;
1914 std::vector
<elem_t
> heap
;
1916 // Max heap -- should emit buckets in order of preference
1918 bool operator()(const elem_t
&lhs
, const elem_t
&rhs
) {
1919 return lhs
.get() < rhs
.get();
// Buckets without remaining candidates are never placed on the heap.
1923 void push_if_nonempty(elem_t e
) {
1924 if (!e
.get().is_empty()) {
1926 std::push_heap(heap
.begin(), heap
.end(), comp());
// pop_heap moves the top element to the back of the vector.
1930 std::pop_heap(heap
.begin(), heap
.end(), comp());
1931 auto ret
= heap
.back();
1936 bool is_empty() const { return heap
.empty(); }
1940 * calc_replicated_acting_stretch
1942 * Choose an acting set using as much of the up set as possible; filling
1943 * in the remaining slots so as to maximize the number of crush buckets at
1944 * level pool.info.peering_crush_bucket_barrier represented.
1946 * Stretch clusters are a bit special: while they have a "size" the
1947 * same way as normal pools, if we happen to lose a data center
1948 * (we call it a "stretch bucket", but really it'll be a data center or
1949 * a cloud availability zone), we don't actually want to shove
1950 * 2 DC's worth of replication into a single site -- it won't fit!
1951 * So we locally calculate a bucket_max, based
1952 * on the targeted number of stretch buckets for the pool and
1953 * its size. Then we won't pull more than bucket_max from any
1954 * given ancestor even if it leaves us undersized.
1956 * There are two distinct phases: (commented below)
// Stretch-mode variant of the replicated acting-set calculation; decisions
// are traced to `ss`.
//
// Local helpers defined below:
//  - used(osd):        osd is already in *want;
//  - usable_info(i):   info is not incomplete and its last_update is not
//                      older than oldest_auth_log_entry;
//  - osd_info(osd):    looks the osd up in all_info (NO_SHARD);
//  - get_ancestor(osd): maps the osd to its bucket_candidates_t, keyed by
//                      the crush parent at peering_crush_bucket_barrier;
//  - add_required / pop_ancestor: append an osd to *want and
//                      *acting_backfill and bump the bucket's selected
//                      count.
//
// Phase 1 takes the primary and the usable osds of `up` (unusable ones go
// to *backfill).  Phase 2 fills the remaining slots from `acting` and,
// unless restrict_to_up_acting is set, from all_info, honouring
// peering_crush_mandatory_member and the per-bucket limit bucket_max.
//
// NOTE(review): bucket_max divides by pool.info.peering_crush_bucket_target;
// this presumably relies on the caller only using this path when that
// value is non-zero -- confirm.
// NOTE(review): some parameters, lambda terminators and return statements
// are not visible in this excerpt.
1958 void PeeringState::calc_replicated_acting_stretch(
1959 map
<pg_shard_t
, pg_info_t
>::const_iterator primary
,
1960 eversion_t oldest_auth_log_entry
,
1962 const vector
<int> &acting
,
1963 const vector
<int> &up
,
1964 pg_shard_t up_primary
,
1965 const map
<pg_shard_t
, pg_info_t
> &all_info
,
1966 bool restrict_to_up_acting
,
1968 set
<pg_shard_t
> *backfill
,
1969 set
<pg_shard_t
> *acting_backfill
,
1970 const OSDMapRef osdmap
,
1975 ceph_assert(acting_backfill
);
1976 ceph_assert(backfill
);
1977 ss
<< __func__
<< (restrict_to_up_acting
? " restrict_to_up_acting" : "")
1980 auto used
= [want
](int osd
) {
1981 return std::find(want
->begin(), want
->end(), osd
) != want
->end();
1984 auto usable_info
= [&](const auto &cur_info
) mutable {
1985 return !(cur_info
.is_incomplete() ||
1986 cur_info
.last_update
< oldest_auth_log_entry
);
1989 auto osd_info
= [&](int osd
) mutable -> const pg_info_t
& {
1990 pg_shard_t cand
= pg_shard_t(osd
, shard_id_t::NO_SHARD
);
1991 const pg_info_t
&cur_info
= all_info
.find(cand
)->second
;
1995 auto usable_osd
= [&](int osd
) mutable {
1996 return usable_info(osd_info(osd
));
// One bucket_candidates_t per crush ancestor; operator[] creates entries
// on first use.
1999 std::map
<int, bucket_candidates_t
> ancestors
;
2000 auto get_ancestor
= [&](int osd
) mutable {
2001 int ancestor
= osdmap
->crush
->get_parent_of_type(
2003 pool
.info
.peering_crush_bucket_barrier
,
2004 pool
.info
.crush_rule
);
2005 return &ancestors
[ancestor
];
// Integer division; the test below detects a non-zero remainder.
2008 unsigned bucket_max
= pool
.info
.size
/ pool
.info
.peering_crush_bucket_target
;
2009 if (bucket_max
* pool
.info
.peering_crush_bucket_target
< pool
.info
.size
) {
2013 /* 1) Select all usable osds from the up set as well as the primary
2015 * We also stash any unusable osds from up into backfill.
2017 auto add_required
= [&](int osd
) {
2019 want
->push_back(osd
);
2020 acting_backfill
->insert(
2021 pg_shard_t(osd
, shard_id_t::NO_SHARD
));
2022 get_ancestor(osd
)->inc_selected();
2025 add_required(primary
->first
.osd
);
2026 ss
<< " osd " << primary
->first
.osd
<< " primary accepted "
2027 << osd_info(primary
->first
.osd
) << std::endl
;
2028 for (auto upcand
: up
) {
2029 auto upshard
= pg_shard_t(upcand
, shard_id_t::NO_SHARD
);
2030 auto &curinfo
= osd_info(upcand
);
2031 if (usable_osd(upcand
)) {
2032 ss
<< " osd " << upcand
<< " (up) accepted " << curinfo
<< std::endl
;
2033 add_required(upcand
);
2035 ss
<< " osd " << upcand
<< " (up) backfill " << curinfo
<< std::endl
;
2036 backfill
->insert(upshard
);
2037 acting_backfill
->insert(upshard
);
2041 if (want
->size() >= pool
.info
.size
) { // non-failed CRUSH mappings are valid
2042 ss
<< " up set sufficient" << std::endl
;
2045 ss
<< " up set insufficient, considering remaining osds" << std::endl
;
2047 /* 2) Fill out remaining slots from usable osds in all_info
2048 * while maximizing the number of ancestor nodes at the
2049 * barrier_id crush level.
2052 std::vector
<std::pair
<osd_ord_t
, osd_id_t
>> candidates
;
2053 /* To do this, we first filter the set of usable osd into an ordered
2054 * list of usable osds
2056 auto get_osd_ord
= [&](bool is_acting
, const pg_info_t
&info
) -> osd_ord_t
{
2057 return std::make_tuple(
2058 !is_acting
/* acting should sort first */,
2061 for (auto &cand
: acting
) {
2062 auto &cand_info
= osd_info(cand
);
2063 if (!used(cand
) && usable_info(cand_info
)) {
2064 ss
<< " acting candidate " << cand
<< " " << cand_info
<< std::endl
;
2065 candidates
.push_back(std::make_pair(get_osd_ord(true, cand_info
), cand
));
2068 if (!restrict_to_up_acting
) {
2069 for (auto &[cand
, info
] : all_info
) {
2070 if (!used(cand
.osd
) && usable_info(info
) &&
2071 (std::find(acting
.begin(), acting
.end(), cand
.osd
)
2073 ss
<< " other candidate " << cand
<< " " << info
<< std::endl
;
2074 candidates
.push_back(
2075 std::make_pair(get_osd_ord(false, info
), cand
.osd
));
// Ascending order is required by bucket_candidates_t::add_osd.
2079 std::sort(candidates
.begin(), candidates
.end());
2081 // We then filter these candidates by ancestor
2082 std::for_each(candidates
.begin(), candidates
.end(), [&](auto cand
) {
2083 get_ancestor(cand
.second
)->add_osd(cand
.first
, cand
.second
);
2087 auto pop_ancestor
= [&](auto &ancestor
) {
2088 ceph_assert(!ancestor
.is_empty());
2089 auto osd
= ancestor
.pop_osd();
2091 ss
<< " accepting candidate " << osd
<< std::endl
;
2093 ceph_assert(!used(osd
));
2094 ceph_assert(usable_osd(osd
));
2096 want
->push_back(osd
);
2097 acting_backfill
->insert(
2098 pg_shard_t(osd
, shard_id_t::NO_SHARD
));
2099 ancestor
.inc_selected();
2102 /* Next, we use the ancestors map to grab a descendant of the
2103 * peering_crush_mandatory_member if not already represented.
2105 * TODO: using 0 here to match other users. Prior to merge, I
2106 * expect that this and other users should instead check against
2109 if (pool
.info
.peering_crush_mandatory_member
!= CRUSH_ITEM_NONE
) {
2110 auto aiter
= ancestors
.find(pool
.info
.peering_crush_mandatory_member
);
2111 if (aiter
!= ancestors
.end() &&
2112 !aiter
->second
.get_num_selected()) {
2113 ss
<< " adding required ancestor " << aiter
->first
<< std::endl
;
2114 ceph_assert(!aiter
->second
.is_empty()); // wouldn't exist otherwise
2115 pop_ancestor(aiter
->second
);
2119 /* We then place the ancestors in a heap ordered by fewest selected
2120 * and then by the ordering token of the next osd */
2121 bucket_heap_t aheap
;
2122 std::for_each(ancestors
.begin(), ancestors
.end(), [&](auto &anc
) {
2123 aheap
.push_if_nonempty(anc
.second
);
2126 /* and pull from this heap until it's empty or we have enough.
2127 * "We have enough" is a sufficient check here for
2128 * stretch_set_can_peer() because our heap sorting always
2129 * pulls from ancestors with the least number of included OSDs,
2130 * so if it is possible to satisfy the bucket_count constraints we
2133 while (!aheap
.is_empty() && want
->size() < pool
.info
.size
) {
2134 auto next
= aheap
.pop();
2135 pop_ancestor(next
.get());
// A bucket that reached bucket_max is not put back on the heap.
2136 if (next
.get().get_num_selected() < bucket_max
) {
2137 aheap
.push_if_nonempty(next
);
2141 /* The end result is that we should have as many buckets covered as
2142 * possible while respecting up, the primary selection,
2143 * the pool size (given bucket count constraints),
2144 * and the mandatory member.
// Decide whether the proposed acting set `want` would be recoverable.
//
// Entries of `want` other than CRUSH_ITEM_NONE are collected into `have`
// as pg_shard_t values (shard id i for erasure pools, NO_SHARD otherwise).
// If fewer than pool.info.min_size osds are wanted, recovery is refused
// for erasure pools when the up osds lack SERVER_OCTOPUS, and for any pool
// when osd_allow_recovery_below_min_size is false.  Otherwise the decision
// is delegated to missing_loc's recoverable predicate over `have`.
//
// NOTE(review): the counter increment and the return statements are not
// visible in this excerpt.
2149 bool PeeringState::recoverable(const vector
<int> &want
) const
2151 unsigned num_want_acting
= 0;
2152 set
<pg_shard_t
> have
;
2153 for (int i
= 0; i
< (int)want
.size(); ++i
) {
2154 if (want
[i
] != CRUSH_ITEM_NONE
) {
2159 pool
.info
.is_erasure() ? shard_id_t(i
) : shard_id_t::NO_SHARD
));
2163 if (num_want_acting
< pool
.info
.min_size
) {
2164 const bool recovery_ec_pool_below_min_size
=
2165 HAVE_FEATURE(get_osdmap()->get_up_osd_features(), SERVER_OCTOPUS
);
2167 if (pool
.info
.is_erasure() && !recovery_ec_pool_below_min_size
) {
2168 psdout(10) << __func__
<< " failed, ec recovery below min size not supported by pre-octopus" << dendl
;
2170 } else if (!cct
->_conf
.get_val
<bool>("osd_allow_recovery_below_min_size")) {
2171 psdout(10) << __func__
<< " failed, recovery below min size not enabled" << dendl
;
2175 if (missing_loc
.get_recoverable_predicate()(have
)) {
2178 psdout(10) << __func__
<< " failed, not recoverable " << dendl
;
// Move expensive-to-recover shards of an erasure-coded PG out of *want and
// into *async_recovery.
//
// Every position of *want that is not CRUSH_ITEM_NONE, not in stray_set
// and is up is costed.  With SERVER_NAUTILUS among the up osd features the
// cost is num_objects_missing plus how far the shard's last_update version
// is behind auth_info's; otherwise it is only that version distance.
// Shards whose cost exceeds osd_async_recovery_min_cost become candidates.
// Candidates are then tried from the highest cost downwards: a shard is
// removed from *want (its position set to CRUSH_ITEM_NONE) only if the
// resulting set is still recoverable().
//
// NOTE(review): `auto shard_info = all_info.find(shard_i)->second;` copies
// the pg_info_t by value on every iteration; a const reference would avoid
// the copy.
// NOTE(review): candidates_by_cost keys are `int`, while the costs are
// computed from wider unsigned values -- confirm that truncation cannot
// matter here.
// NOTE(review): the `want` parameter and the continue statements are not
// visible in this excerpt.
2183 void PeeringState::choose_async_recovery_ec(
2184 const map
<pg_shard_t
, pg_info_t
> &all_info
,
2185 const pg_info_t
&auth_info
,
2187 set
<pg_shard_t
> *async_recovery
,
2188 const OSDMapRef osdmap
) const
2190 set
<pair
<int, pg_shard_t
> > candidates_by_cost
;
2191 for (uint8_t i
= 0; i
< want
->size(); ++i
) {
2192 if ((*want
)[i
] == CRUSH_ITEM_NONE
)
2195 // Considering log entries to recover is accurate enough for
2196 // now. We could use minimum_to_decode_with_cost() later if
2198 pg_shard_t
shard_i((*want
)[i
], shard_id_t(i
));
2199 // do not include strays
2200 if (stray_set
.find(shard_i
) != stray_set
.end())
2202 // Do not include an osd that is not up, since choosing it as
2203 // an async_recovery_target will move it out of the acting set.
2204 // This results in it being identified as a stray during peering,
2205 // because it is no longer in the up or acting set.
2206 if (!is_up(shard_i
))
2208 auto shard_info
= all_info
.find(shard_i
)->second
;
2209 // for ec pools we rollback all entries past the authoritative
2210 // last_update *before* activation. This is relatively inexpensive
2211 // compared to recovery, since it is purely local, so treat shards
2212 // past the authoritative last_update the same as those equal to it.
2213 version_t auth_version
= auth_info
.last_update
.version
;
2214 version_t candidate_version
= shard_info
.last_update
.version
;
2215 if (HAVE_FEATURE(osdmap
->get_up_osd_features(), SERVER_NAUTILUS
)) {
2216 auto approx_missing_objects
=
2217 shard_info
.stats
.stats
.sum
.num_objects_missing
;
2218 if (auth_version
> candidate_version
) {
2219 approx_missing_objects
+= auth_version
- candidate_version
;
2221 if (static_cast<uint64_t>(approx_missing_objects
) >
2222 cct
->_conf
.get_val
<uint64_t>("osd_async_recovery_min_cost")) {
2223 candidates_by_cost
.emplace(approx_missing_objects
, shard_i
);
2226 if (auth_version
> candidate_version
&&
2227 (auth_version
- candidate_version
) > cct
->_conf
.get_val
<uint64_t>("osd_async_recovery_min_cost")) {
2228 candidates_by_cost
.insert(make_pair(auth_version
- candidate_version
, shard_i
));
2233 psdout(20) << __func__
<< " candidates by cost are: " << candidates_by_cost
2236 // take out as many osds as we can for async recovery, in order of cost
2237 for (auto rit
= candidates_by_cost
.rbegin();
2238 rit
!= candidates_by_cost
.rend(); ++rit
) {
2239 pg_shard_t cur_shard
= rit
->second
;
// Try the acting set without this shard before committing to it.
2240 vector
<int> candidate_want(*want
);
2241 candidate_want
[cur_shard
.shard
.id
] = CRUSH_ITEM_NONE
;
2242 if (recoverable(candidate_want
)) {
2243 want
->swap(candidate_want
);
2244 async_recovery
->insert(cur_shard
);
2247 psdout(20) << __func__
<< " result want=" << *want
2248 << " async_recovery=" << *async_recovery
<< dendl
;
2251 void PeeringState::choose_async_recovery_replicated(
2252 const map
<pg_shard_t
, pg_info_t
> &all_info
,
2253 const pg_info_t
&auth_info
,
2255 set
<pg_shard_t
> *async_recovery
,
2256 const OSDMapRef osdmap
) const
2258 set
<pair
<int, pg_shard_t
> > candidates_by_cost
;
2259 for (auto osd_num
: *want
) {
2260 pg_shard_t
shard_i(osd_num
, shard_id_t::NO_SHARD
);
2261 // do not include strays
2262 if (stray_set
.find(shard_i
) != stray_set
.end())
2264 // Do not include an osd that is not up, since choosing it as
2265 // an async_recovery_target will move it out of the acting set.
2266 // This results in it being identified as a stray during peering,
2267 // because it is no longer in the up or acting set.
2268 if (!is_up(shard_i
))
2270 auto shard_info
= all_info
.find(shard_i
)->second
;
2271 // use the approximate magnitude of the difference in length of
2272 // logs plus historical missing objects as the cost of recovery
2273 version_t auth_version
= auth_info
.last_update
.version
;
2274 version_t candidate_version
= shard_info
.last_update
.version
;
2275 if (HAVE_FEATURE(osdmap
->get_up_osd_features(), SERVER_NAUTILUS
)) {
2276 auto approx_missing_objects
=
2277 shard_info
.stats
.stats
.sum
.num_objects_missing
;
2278 if (auth_version
> candidate_version
) {
2279 approx_missing_objects
+= auth_version
- candidate_version
;
2281 approx_missing_objects
+= candidate_version
- auth_version
;
2283 if (static_cast<uint64_t>(approx_missing_objects
) >
2284 cct
->_conf
.get_val
<uint64_t>("osd_async_recovery_min_cost")) {
2285 candidates_by_cost
.emplace(approx_missing_objects
, shard_i
);
2288 size_t approx_entries
;
2289 if (auth_version
> candidate_version
) {
2290 approx_entries
= auth_version
- candidate_version
;
2292 approx_entries
= candidate_version
- auth_version
;
2294 if (approx_entries
> cct
->_conf
.get_val
<uint64_t>("osd_async_recovery_min_cost")) {
2295 candidates_by_cost
.insert(make_pair(approx_entries
, shard_i
));
2300 psdout(20) << __func__
<< " candidates by cost are: " << candidates_by_cost
2302 // take out as many osds as we can for async recovery, in order of cost
2303 for (auto rit
= candidates_by_cost
.rbegin();
2304 rit
!= candidates_by_cost
.rend(); ++rit
) {
2305 if (want
->size() <= pool
.info
.min_size
) {
2308 pg_shard_t cur_shard
= rit
->second
;
2309 vector
<int> candidate_want(*want
);
2310 for (auto it
= candidate_want
.begin(); it
!= candidate_want
.end(); ++it
) {
2311 if (*it
== cur_shard
.osd
) {
2312 candidate_want
.erase(it
);
2313 if (pool
.info
.stretch_set_can_peer(candidate_want
, *osdmap
, NULL
)) {
2314 // if we're in stretch mode, we can only remove the osd if it doesn't
2315 // break peering limits.
2316 want
->swap(candidate_want
);
2317 async_recovery
->insert(cur_shard
);
2324 psdout(20) << __func__
<< " result want=" << *want
2325 << " async_recovery=" << *async_recovery
<< dendl
;
2331 * calculate the desired acting, and request a change with the monitor
2332 * if it differs from the current acting.
2334 * if restrict_to_up_acting=true, we filter out anything that's not in
2335 * up/acting. in order to lift this restriction, we need to
2336 * 1) check whether it's worth switching the acting set any time we get
2337 * a new pg info (not just here, when recovery finishes)
2338 * 2) check whether anything in want_acting went down on each new map
2339 * (and, if so, calculate a new want_acting)
2340 * 3) remove the assertion in PG::PeeringState::Active::react(const AdvMap)
2343 bool PeeringState::choose_acting(pg_shard_t
&auth_log_shard_id
,
2344 bool restrict_to_up_acting
,
2345 bool *history_les_bound
,
2346 bool request_pg_temp_change_only
)
2348 map
<pg_shard_t
, pg_info_t
> all_info(peer_info
.begin(), peer_info
.end());
2349 all_info
[pg_whoami
] = info
;
2351 if (cct
->_conf
->subsys
.should_gather
<dout_subsys
, 10>()) {
2352 for (auto p
= all_info
.begin(); p
!= all_info
.end(); ++p
) {
2353 psdout(10) << __func__
<< " all_info osd." << p
->first
<< " "
2354 << p
->second
<< dendl
;
2358 auto auth_log_shard
= find_best_info(all_info
, restrict_to_up_acting
,
2361 if (auth_log_shard
== all_info
.end()) {
2363 psdout(10) << __func__
<< " no suitable info found (incomplete backfills?),"
2364 << " reverting to up" << dendl
;
2367 pl
->queue_want_pg_temp(empty
);
2369 psdout(10) << __func__
<< " failed" << dendl
;
2370 ceph_assert(want_acting
.empty());
2375 ceph_assert(!auth_log_shard
->second
.is_incomplete());
2376 auth_log_shard_id
= auth_log_shard
->first
;
2378 set
<pg_shard_t
> want_backfill
, want_acting_backfill
;
2381 if (pool
.info
.is_replicated()) {
2382 auto [primary_shard
, oldest_log
] = select_replicated_primary(
2384 cct
->_conf
.get_val
<uint64_t>(
2385 "osd_force_auth_primary_missing_objects"),
2391 if (pool
.info
.is_stretch_pool()) {
2392 calc_replicated_acting_stretch(
2395 get_osdmap()->get_pg_size(info
.pgid
.pgid
),
2400 restrict_to_up_acting
,
2403 &want_acting_backfill
,
2408 calc_replicated_acting(
2411 get_osdmap()->get_pg_size(info
.pgid
.pgid
),
2416 restrict_to_up_acting
,
2419 &want_acting_backfill
,
2427 get_osdmap()->get_pg_size(info
.pgid
.pgid
),
2431 restrict_to_up_acting
,
2434 &want_acting_backfill
,
2437 psdout(10) << ss
.str() << dendl
;
2439 if (!recoverable(want
)) {
2440 want_acting
.clear();
2444 set
<pg_shard_t
> want_async_recovery
;
2445 if (HAVE_FEATURE(get_osdmap()->get_up_osd_features(), SERVER_MIMIC
)) {
2446 if (pool
.info
.is_erasure()) {
2447 choose_async_recovery_ec(
2448 all_info
, auth_log_shard
->second
, &want
, &want_async_recovery
,
2451 choose_async_recovery_replicated(
2452 all_info
, auth_log_shard
->second
, &want
, &want_async_recovery
,
2456 while (want
.size() > pool
.info
.size
) {
2457 // async recovery should have taken out as many osds as it can.
2458 // if not, then always evict the last peer
2459 // (will get synchronously recovered later)
2460 psdout(10) << __func__
<< " evicting osd." << want
.back()
2461 << " from oversized want " << want
<< dendl
;
2464 if (want
!= acting
) {
2465 psdout(10) << __func__
<< " want " << want
<< " != acting " << acting
2466 << ", requesting pg_temp change" << dendl
;
2469 if (!cct
->_conf
->osd_debug_no_acting_change
) {
2470 if (want_acting
== up
) {
2471 // There can't be any pending backfill if
2472 // want is the same as crush map up OSDs.
2473 ceph_assert(want_backfill
.empty());
2475 pl
->queue_want_pg_temp(empty
);
2477 pl
->queue_want_pg_temp(want
);
2482 if (request_pg_temp_change_only
)
2484 want_acting
.clear();
2485 acting_recovery_backfill
= want_acting_backfill
;
2486 psdout(10) << "acting_recovery_backfill is "
2487 << acting_recovery_backfill
<< dendl
;
2489 backfill_targets
.empty() ||
2490 backfill_targets
== want_backfill
);
2491 if (backfill_targets
.empty()) {
2492 // Caller is GetInfo
2493 backfill_targets
= want_backfill
;
2495 // Adding !needs_recovery() to let the async_recovery_targets reset after recovery is complete
2497 async_recovery_targets
.empty() ||
2498 async_recovery_targets
== want_async_recovery
||
2500 if (async_recovery_targets
.empty() || !needs_recovery()) {
2501 async_recovery_targets
= want_async_recovery
;
2503 // Will not change if already set because up would have had to change
2504 // Verify that nothing in backfill is in stray_set
2505 for (auto i
= want_backfill
.begin(); i
!= want_backfill
.end(); ++i
) {
2506 ceph_assert(stray_set
.find(*i
) == stray_set
.end());
2508 psdout(10) << "choose_acting want=" << want
<< " backfill_targets="
2509 << want_backfill
<< " async_recovery_targets="
2510 << async_recovery_targets
<< dendl
;
2514 void PeeringState::log_weirdness()
2516 if (pg_log
.get_tail() != info
.log_tail
)
2517 pl
->get_clog_error() << info
.pgid
2518 << " info mismatch, log.tail " << pg_log
.get_tail()
2519 << " != info.log_tail " << info
.log_tail
;
2520 if (pg_log
.get_head() != info
.last_update
)
2521 pl
->get_clog_error() << info
.pgid
2522 << " info mismatch, log.head " << pg_log
.get_head()
2523 << " != info.last_update " << info
.last_update
;
2525 if (!pg_log
.get_log().empty()) {
2527 if ((pg_log
.get_log().log
.begin()->version
<= pg_log
.get_tail()))
2528 pl
->get_clog_error() << info
.pgid
2529 << " log bound mismatch, info (tail,head] ("
2530 << pg_log
.get_tail() << ","
2531 << pg_log
.get_head() << "]"
2533 << pg_log
.get_log().log
.begin()->version
<< ","
2534 << pg_log
.get_log().log
.rbegin()->version
<< "]";
2537 if (pg_log
.get_log().caller_ops
.size() > pg_log
.get_log().log
.size()) {
2538 pl
->get_clog_error() << info
.pgid
2539 << " caller_ops.size "
2540 << pg_log
.get_log().caller_ops
.size()
2541 << " > log size " << pg_log
.get_log().log
.size();
2546 * Process information from a replica to determine if it could have any
2547 * objects that i need.
2549 * TODO: if the missing set becomes very large, this could get expensive.
2550 * Instead, we probably want to just iterate over our unfound set.
2552 bool PeeringState::search_for_missing(
2553 const pg_info_t
&oinfo
, const pg_missing_t
&omissing
,
2555 PeeringCtxWrapper
&ctx
)
2557 uint64_t num_unfound_before
= missing_loc
.num_unfound();
2558 bool found_missing
= missing_loc
.add_source_info(
2559 from
, oinfo
, omissing
, ctx
.handle
);
2560 if (found_missing
&& num_unfound_before
!= missing_loc
.num_unfound())
2561 pl
->publish_stats_to_osd();
2562 // avoid doing this if the peer is empty. This is abit of paranoia
2563 // to avoid doing something rash if add_source_info() above
2564 // incorrectly decided we found something new. (if the peer has
2565 // last_update=0'0 that's impossible.)
2566 if (found_missing
&&
2567 oinfo
.last_update
!= eversion_t()) {
2568 pg_info_t
tinfo(oinfo
);
2569 tinfo
.pgid
.shard
= pg_whoami
.shard
;
2572 spg_t(info
.pgid
.pgid
, from
.shard
),
2573 get_osdmap_epoch(), // fixme: use lower epoch?
2577 return found_missing
;
2580 bool PeeringState::discover_all_missing(
2581 BufferedRecoveryMessages
&rctx
)
2583 auto &missing
= pg_log
.get_missing();
2584 uint64_t unfound
= get_num_unfound();
2585 bool any
= false; // did we start any queries
2587 psdout(10) << __func__
<< " "
2588 << missing
.num_missing() << " missing, "
2589 << unfound
<< " unfound"
2592 auto m
= might_have_unfound
.begin();
2593 auto mend
= might_have_unfound
.end();
2594 for (; m
!= mend
; ++m
) {
2595 pg_shard_t
peer(*m
);
2597 if (!get_osdmap()->is_up(peer
.osd
)) {
2598 psdout(20) << __func__
<< " skipping down osd." << peer
<< dendl
;
2602 if (peer_purged
.count(peer
)) {
2603 psdout(20) << __func__
<< " skipping purged osd." << peer
<< dendl
;
2607 auto iter
= peer_info
.find(peer
);
2608 if (iter
!= peer_info
.end() &&
2609 (iter
->second
.is_empty() || iter
->second
.dne())) {
2610 // ignore empty peers
2614 // If we've requested any of this stuff, the pg_missing_t information
2615 // should be on its way.
2616 // TODO: coalsce requested_* into a single data structure
2617 if (peer_missing
.find(peer
) != peer_missing
.end()) {
2618 psdout(20) << __func__
<< ": osd." << peer
2619 << ": we already have pg_missing_t" << dendl
;
2622 if (peer_log_requested
.find(peer
) != peer_log_requested
.end()) {
2623 psdout(20) << __func__
<< ": osd." << peer
2624 << ": in peer_log_requested" << dendl
;
2627 if (peer_missing_requested
.find(peer
) != peer_missing_requested
.end()) {
2628 psdout(20) << __func__
<< ": osd." << peer
2629 << ": in peer_missing_requested" << dendl
;
2634 psdout(10) << __func__
<< ": osd." << peer
<< ": requesting pg_missing_t"
2636 peer_missing_requested
.insert(peer
);
2639 spg_t(info
.pgid
.pgid
, peer
.shard
),
2641 pg_query_t::FULLLOG
,
2642 peer
.shard
, pg_whoami
.shard
,
2643 info
.history
, get_osdmap_epoch()));
2649 /* Build the might_have_unfound set.
2651 * This is used by the primary OSD during recovery.
2653 * This set tracks the OSDs which might have unfound objects that the primary
2654 * OSD needs. As we receive pg_missing_t from each OSD in might_have_unfound, we
2655 * will remove the OSD from the set.
2657 void PeeringState::build_might_have_unfound()
2659 ceph_assert(might_have_unfound
.empty());
2660 ceph_assert(is_primary());
2662 psdout(10) << __func__
<< dendl
;
2664 check_past_interval_bounds();
2666 might_have_unfound
= past_intervals
.get_might_have_unfound(
2668 pool
.info
.is_erasure());
2670 // include any (stray) peers
2671 for (auto p
= peer_info
.begin(); p
!= peer_info
.end(); ++p
)
2672 might_have_unfound
.insert(p
->first
);
2674 psdout(15) << __func__
<< ": built " << might_have_unfound
<< dendl
;
2677 void PeeringState::activate(
2678 ObjectStore::Transaction
& t
,
2679 epoch_t activation_epoch
,
2680 PeeringCtxWrapper
&ctx
)
2682 ceph_assert(!is_peered());
2685 state_clear(PG_STATE_DOWN
);
2687 send_notify
= false;
2690 // only update primary last_epoch_started if we will go active
2691 if (acting_set_writeable()) {
2692 ceph_assert(cct
->_conf
->osd_find_best_info_ignore_history_les
||
2693 info
.last_epoch_started
<= activation_epoch
);
2694 info
.last_epoch_started
= activation_epoch
;
2695 info
.last_interval_started
= info
.history
.same_interval_since
;
2697 } else if (is_acting(pg_whoami
)) {
2698 /* update last_epoch_started on acting replica to whatever the primary sent
2699 * unless it's smaller (could happen if we are going peered rather than
2700 * active, see doc/dev/osd_internals/last_epoch_started.rst) */
2701 if (info
.last_epoch_started
< activation_epoch
) {
2702 info
.last_epoch_started
= activation_epoch
;
2703 info
.last_interval_started
= info
.history
.same_interval_since
;
2707 auto &missing
= pg_log
.get_missing();
2709 min_last_complete_ondisk
= eversion_t(0,0); // we don't know (yet)!
2711 last_update_ondisk
= info
.last_update
;
2713 last_update_applied
= info
.last_update
;
2714 last_rollback_info_trimmed_to_applied
= pg_log
.get_can_rollback_to();
2716 need_up_thru
= false;
2718 // write pg info, log
2720 dirty_big_info
= true; // maybe
2722 pl
->schedule_event_on_commit(
2724 std::make_shared
<PGPeeringEvent
>(
2729 activation_epoch
)));
2731 // init complete pointer
2732 if (missing
.num_missing() == 0) {
2733 psdout(10) << "activate - no missing, moving last_complete " << info
.last_complete
2734 << " -> " << info
.last_update
<< dendl
;
2735 info
.last_complete
= info
.last_update
;
2736 info
.stats
.stats
.sum
.num_objects_missing
= 0;
2737 pg_log
.reset_recovery_pointers();
2739 psdout(10) << "activate - not complete, " << missing
<< dendl
;
2740 info
.stats
.stats
.sum
.num_objects_missing
= missing
.num_missing();
2741 pg_log
.activate_not_complete(info
);
2747 // initialize snap_trimq
2748 interval_set
<snapid_t
> to_trim
;
2749 auto& removed_snaps_queue
= get_osdmap()->get_removed_snaps_queue();
2750 auto p
= removed_snaps_queue
.find(info
.pgid
.pgid
.pool());
2751 if (p
!= removed_snaps_queue
.end()) {
2752 dout(20) << "activate - purged_snaps " << info
.purged_snaps
2753 << " removed_snaps " << p
->second
2755 for (auto q
: p
->second
) {
2756 to_trim
.insert(q
.first
, q
.second
);
2759 interval_set
<snapid_t
> purged
;
2760 purged
.intersection_of(to_trim
, info
.purged_snaps
);
2761 to_trim
.subtract(purged
);
2763 if (HAVE_FEATURE(upacting_features
, SERVER_OCTOPUS
)) {
2764 renew_lease(pl
->get_mnow());
2765 // do not schedule until we are actually activated
2768 // adjust purged_snaps: PG may have been inactive while snaps were pruned
2769 // from the removed_snaps_queue in the osdmap. update local purged_snaps
2770 // reflect only those snaps that we thought were pruned and were still in
2772 info
.purged_snaps
.swap(purged
);
2774 // start up replicas
2775 info
.history
.refresh_prior_readable_until_ub(pl
->get_mnow(),
2776 prior_readable_until_ub
);
2778 ceph_assert(!acting_recovery_backfill
.empty());
2779 for (auto i
= acting_recovery_backfill
.begin();
2780 i
!= acting_recovery_backfill
.end();
2782 if (*i
== pg_whoami
) continue;
2783 pg_shard_t peer
= *i
;
2784 ceph_assert(peer_info
.count(peer
));
2785 pg_info_t
& pi
= peer_info
[peer
];
2787 psdout(10) << "activate peer osd." << peer
<< " " << pi
<< dendl
;
2790 ceph_assert(peer_missing
.count(peer
));
2791 pg_missing_t
& pm
= peer_missing
[peer
];
2793 bool needs_past_intervals
= pi
.dne();
2795 if (pi
.last_update
== info
.last_update
) {
2797 if (!pi
.last_backfill
.is_max())
2798 pl
->get_clog_info() << info
.pgid
<< " continuing backfill to osd."
2800 << " from (" << pi
.log_tail
<< "," << pi
.last_update
2801 << "] " << pi
.last_backfill
2802 << " to " << info
.last_update
;
2803 if (!pi
.is_empty()) {
2804 psdout(10) << "activate peer osd." << peer
2805 << " is up to date, queueing in pending_activators" << dendl
;
2808 spg_t(info
.pgid
.pgid
, peer
.shard
),
2809 get_osdmap_epoch(), // fixme: use lower epoch?
2814 psdout(10) << "activate peer osd." << peer
2815 << " is up to date, but sending pg_log anyway" << dendl
;
2816 m
= make_message
<MOSDPGLog
>(
2817 i
->shard
, pg_whoami
.shard
,
2818 get_osdmap_epoch(), info
,
2819 last_peering_reset
);
2822 pg_log
.get_tail() > pi
.last_update
||
2823 pi
.last_backfill
== hobject_t() ||
2824 (backfill_targets
.count(*i
) && pi
.last_backfill
.is_max())) {
2825 /* ^ This last case covers a situation where a replica is not contiguous
2826 * with the auth_log, but is contiguous with this replica. Reshuffling
2827 * the active set to handle this would be tricky, so instead we just go
2828 * ahead and backfill it anyway. This is probably preferrable in any
2829 * case since the replica in question would have to be significantly
2833 pl
->get_clog_debug() << info
.pgid
<< " starting backfill to osd." << peer
2834 << " from (" << pi
.log_tail
<< "," << pi
.last_update
2835 << "] " << pi
.last_backfill
2836 << " to " << info
.last_update
;
2838 pi
.last_update
= info
.last_update
;
2839 pi
.last_complete
= info
.last_update
;
2840 pi
.set_last_backfill(hobject_t());
2841 pi
.last_epoch_started
= info
.last_epoch_started
;
2842 pi
.last_interval_started
= info
.last_interval_started
;
2843 pi
.history
= info
.history
;
2844 pi
.hit_set
= info
.hit_set
;
2845 // Save num_bytes for reservation request, can't be negative
2846 peer_bytes
[peer
] = std::max
<int64_t>(0, pi
.stats
.stats
.sum
.num_bytes
);
2847 pi
.stats
.stats
.clear();
2848 pi
.stats
.stats
.sum
.num_bytes
= peer_bytes
[peer
];
2850 // initialize peer with our purged_snaps.
2851 pi
.purged_snaps
= info
.purged_snaps
;
2853 m
= make_message
<MOSDPGLog
>(
2854 i
->shard
, pg_whoami
.shard
,
2855 get_osdmap_epoch(), pi
,
2856 last_peering_reset
/* epoch to create pg at */);
2858 // send some recent log, so that op dup detection works well.
2859 m
->log
.copy_up_to(cct
, pg_log
.get_log(),
2860 cct
->_conf
->osd_max_pg_log_entries
);
2861 m
->info
.log_tail
= m
->log
.tail
;
2862 pi
.log_tail
= m
->log
.tail
; // sigh...
2867 ceph_assert(pg_log
.get_tail() <= pi
.last_update
);
2868 m
= make_message
<MOSDPGLog
>(
2869 i
->shard
, pg_whoami
.shard
,
2870 get_osdmap_epoch(), info
,
2871 last_peering_reset
/* epoch to create pg at */);
2872 // send new stuff to append to replicas log
2873 m
->log
.copy_after(cct
, pg_log
.get_log(), pi
.last_update
);
2876 // share past_intervals if we are creating the pg on the replica
2877 // based on whether our info for that peer was dne() *before*
2878 // updating pi.history in the backfill block above.
2879 if (m
&& needs_past_intervals
)
2880 m
->past_intervals
= past_intervals
;
2882 // update local version of peer's missing list!
2883 if (m
&& pi
.last_backfill
!= hobject_t()) {
2884 for (auto p
= m
->log
.log
.begin(); p
!= m
->log
.log
.end(); ++p
) {
2885 if (p
->soid
<= pi
.last_backfill
&&
2887 if (perform_deletes_during_peering() && p
->is_delete()) {
2888 pm
.rm(p
->soid
, p
->version
);
2890 pm
.add_next_event(*p
);
2897 dout(10) << "activate peer osd." << peer
<< " sending " << m
->log
2899 m
->lease
= get_lease();
2900 pl
->send_cluster_message(peer
.osd
, m
, get_osdmap_epoch());
2904 pi
.last_update
= info
.last_update
;
2906 // update our missing
2907 if (pm
.num_missing() == 0) {
2908 pi
.last_complete
= pi
.last_update
;
2909 psdout(10) << "activate peer osd." << peer
<< " " << pi
2910 << " uptodate" << dendl
;
2912 psdout(10) << "activate peer osd." << peer
<< " " << pi
2913 << " missing " << pm
<< dendl
;
2917 // Set up missing_loc
2918 set
<pg_shard_t
> complete_shards
;
2919 for (auto i
= acting_recovery_backfill
.begin();
2920 i
!= acting_recovery_backfill
.end();
2922 psdout(20) << __func__
<< " setting up missing_loc from shard " << *i
2924 if (*i
== get_primary()) {
2925 missing_loc
.add_active_missing(missing
);
2926 if (!missing
.have_missing())
2927 complete_shards
.insert(*i
);
2929 auto peer_missing_entry
= peer_missing
.find(*i
);
2930 ceph_assert(peer_missing_entry
!= peer_missing
.end());
2931 missing_loc
.add_active_missing(peer_missing_entry
->second
);
2932 if (!peer_missing_entry
->second
.have_missing() &&
2933 peer_info
[*i
].last_backfill
.is_max())
2934 complete_shards
.insert(*i
);
2938 // If necessary, create might_have_unfound to help us find our unfound objects.
2939 // NOTE: It's important that we build might_have_unfound before trimming the
2941 might_have_unfound
.clear();
2942 if (needs_recovery()) {
2943 // If only one shard has missing, we do a trick to add all others as recovery
2944 // source, this is considered safe since the PGLogs have been merged locally,
2945 // and covers vast majority of the use cases, like one OSD/host is down for
2946 // a while for hardware repairing
2947 if (complete_shards
.size() + 1 == acting_recovery_backfill
.size()) {
2948 missing_loc
.add_batch_sources_info(complete_shards
, ctx
.handle
);
2950 missing_loc
.add_source_info(pg_whoami
, info
, pg_log
.get_missing(),
2952 for (auto i
= acting_recovery_backfill
.begin();
2953 i
!= acting_recovery_backfill
.end();
2955 if (*i
== pg_whoami
) continue;
2956 psdout(10) << __func__
<< ": adding " << *i
<< " as a source" << dendl
;
2957 ceph_assert(peer_missing
.count(*i
));
2958 ceph_assert(peer_info
.count(*i
));
2959 missing_loc
.add_source_info(
2966 for (auto i
= peer_missing
.begin(); i
!= peer_missing
.end(); ++i
) {
2967 if (is_acting_recovery_backfill(i
->first
))
2969 ceph_assert(peer_info
.count(i
->first
));
2971 peer_info
[i
->first
],
2977 build_might_have_unfound();
2979 // Always call now so update_calc_stats() will be accurate
2980 discover_all_missing(ctx
.msgs
);
2984 // num_objects_degraded if calculated should reflect this too, unless no
2985 // missing and we are about to go clean.
2986 if (get_osdmap()->get_pg_size(info
.pgid
.pgid
) > actingset
.size()) {
2987 state_set(PG_STATE_UNDERSIZED
);
2990 state_set(PG_STATE_ACTIVATING
);
2991 pl
->on_activate(std::move(to_trim
));
2993 if (acting_set_writeable()) {
2994 PGLog::LogEntryHandlerRef rollbacker
{pl
->get_log_handler(t
)};
2995 pg_log
.roll_forward(rollbacker
.get());
2999 void PeeringState::share_pg_info()
3001 psdout(10) << "share_pg_info" << dendl
;
3003 info
.history
.refresh_prior_readable_until_ub(pl
->get_mnow(),
3004 prior_readable_until_ub
);
3006 // share new pg_info_t with replicas
3007 ceph_assert(!acting_recovery_backfill
.empty());
3008 for (auto pg_shard
: acting_recovery_backfill
) {
3009 if (pg_shard
== pg_whoami
) continue;
3010 if (auto peer
= peer_info
.find(pg_shard
); peer
!= peer_info
.end()) {
3011 peer
->second
.last_epoch_started
= info
.last_epoch_started
;
3012 peer
->second
.last_interval_started
= info
.last_interval_started
;
3013 peer
->second
.history
.merge(info
.history
);
3016 if (last_require_osd_release
>= ceph_release_t::octopus
) {
3017 m
= make_message
<MOSDPGInfo2
>(spg_t
{info
.pgid
.pgid
, pg_shard
.shard
},
3021 std::optional
<pg_lease_t
>{get_lease()},
3024 m
= make_message
<MOSDPGInfo
>(get_osdmap_epoch(),
3025 MOSDPGInfo::pg_list_t
{
3026 pg_notify_t
{pg_shard
.shard
,
3033 pl
->send_cluster_message(pg_shard
.osd
, m
, get_osdmap_epoch());
3037 void PeeringState::merge_log(
3038 ObjectStore::Transaction
& t
, pg_info_t
&oinfo
, pg_log_t
&& olog
,
3041 PGLog::LogEntryHandlerRef rollbacker
{pl
->get_log_handler(t
)};
3043 oinfo
, std::move(olog
), from
, info
, rollbacker
.get(),
3044 dirty_info
, dirty_big_info
);
3047 void PeeringState::rewind_divergent_log(
3048 ObjectStore::Transaction
& t
, eversion_t newhead
)
3050 PGLog::LogEntryHandlerRef rollbacker
{pl
->get_log_handler(t
)};
3051 pg_log
.rewind_divergent_log(
3052 newhead
, info
, rollbacker
.get(), dirty_info
, dirty_big_info
);
3056 void PeeringState::proc_primary_info(
3057 ObjectStore::Transaction
&t
, const pg_info_t
&oinfo
)
3059 ceph_assert(!is_primary());
3061 update_history(oinfo
.history
);
3062 if (!info
.stats
.stats_invalid
&& info
.stats
.stats
.sum
.num_scrub_errors
) {
3063 info
.stats
.stats
.sum
.num_scrub_errors
= 0;
3064 info
.stats
.stats
.sum
.num_shallow_scrub_errors
= 0;
3065 info
.stats
.stats
.sum
.num_deep_scrub_errors
= 0;
3069 if (!(info
.purged_snaps
== oinfo
.purged_snaps
)) {
3070 psdout(10) << __func__
<< " updating purged_snaps to "
3071 << oinfo
.purged_snaps
3073 info
.purged_snaps
= oinfo
.purged_snaps
;
3075 dirty_big_info
= true;
3079 void PeeringState::proc_master_log(
3080 ObjectStore::Transaction
& t
, pg_info_t
&oinfo
,
3081 pg_log_t
&& olog
, pg_missing_t
&& omissing
, pg_shard_t from
)
3083 psdout(10) << "proc_master_log for osd." << from
<< ": "
3084 << olog
<< " " << omissing
<< dendl
;
3085 ceph_assert(!is_peered() && is_primary());
3087 // merge log into our own log to build master log. no need to
3088 // make any adjustments to their missing map; we are taking their
3089 // log to be authoritative (i.e., their entries are by definitely
3091 merge_log(t
, oinfo
, std::move(olog
), from
);
3092 peer_info
[from
] = oinfo
;
3093 psdout(10) << " peer osd." << from
<< " now " << oinfo
3094 << " " << omissing
<< dendl
;
3095 might_have_unfound
.insert(from
);
3097 // See doc/dev/osd_internals/last_epoch_started
3098 if (oinfo
.last_epoch_started
> info
.last_epoch_started
) {
3099 info
.last_epoch_started
= oinfo
.last_epoch_started
;
3102 if (oinfo
.last_interval_started
> info
.last_interval_started
) {
3103 info
.last_interval_started
= oinfo
.last_interval_started
;
3106 update_history(oinfo
.history
);
3107 ceph_assert(cct
->_conf
->osd_find_best_info_ignore_history_les
||
3108 info
.last_epoch_started
>= info
.history
.last_epoch_started
);
3110 peer_missing
[from
].claim(std::move(omissing
));
3113 void PeeringState::proc_replica_log(
3115 const pg_log_t
&olog
,
3116 pg_missing_t
&& omissing
,
3119 psdout(10) << "proc_replica_log for osd." << from
<< ": "
3120 << oinfo
<< " " << olog
<< " " << omissing
<< dendl
;
3122 pg_log
.proc_replica_log(oinfo
, olog
, omissing
, from
);
3124 peer_info
[from
] = oinfo
;
3125 psdout(10) << " peer osd." << from
<< " now "
3126 << oinfo
<< " " << omissing
<< dendl
;
3127 might_have_unfound
.insert(from
);
3129 for (auto i
= omissing
.get_items().begin();
3130 i
!= omissing
.get_items().end();
3132 psdout(20) << " after missing " << i
->first
3133 << " need " << i
->second
.need
3134 << " have " << i
->second
.have
<< dendl
;
3136 peer_missing
[from
].claim(std::move(omissing
));
3139 void PeeringState::fulfill_info(
3140 pg_shard_t from
, const pg_query_t
&query
,
3141 pair
<pg_shard_t
, pg_info_t
> ¬ify_info
)
3143 ceph_assert(from
== primary
);
3144 ceph_assert(query
.type
== pg_query_t::INFO
);
3147 psdout(10) << "sending info" << dendl
;
3148 notify_info
= make_pair(from
, info
);
3151 void PeeringState::fulfill_log(
3152 pg_shard_t from
, const pg_query_t
&query
, epoch_t query_epoch
)
3154 psdout(10) << "log request from " << from
<< dendl
;
3155 ceph_assert(from
== primary
);
3156 ceph_assert(query
.type
!= pg_query_t::INFO
);
3158 auto mlog
= make_message
<MOSDPGLog
>(
3159 from
.shard
, pg_whoami
.shard
,
3162 mlog
->missing
= pg_log
.get_missing();
3164 // primary -> other, when building master log
3165 if (query
.type
== pg_query_t::LOG
) {
3166 psdout(10) << " sending info+missing+log since " << query
.since
3168 if (query
.since
!= eversion_t() && query
.since
< pg_log
.get_tail()) {
3169 pl
->get_clog_error() << info
.pgid
<< " got broken pg_query_t::LOG since "
3171 << " when my log.tail is " << pg_log
.get_tail()
3172 << ", sending full log instead";
3173 mlog
->log
= pg_log
.get_log(); // primary should not have requested this!!
3175 mlog
->log
.copy_after(cct
, pg_log
.get_log(), query
.since
);
3177 else if (query
.type
== pg_query_t::FULLLOG
) {
3178 psdout(10) << " sending info+missing+full log" << dendl
;
3179 mlog
->log
= pg_log
.get_log();
3182 psdout(10) << " sending " << mlog
->log
<< " " << mlog
->missing
<< dendl
;
3184 pl
->send_cluster_message(from
.osd
, mlog
, get_osdmap_epoch(), true);
3187 void PeeringState::fulfill_query(const MQuery
& query
, PeeringCtxWrapper
&rctx
)
3189 if (query
.query
.type
== pg_query_t::INFO
) {
3190 pair
<pg_shard_t
, pg_info_t
> notify_info
;
3191 // note this refreshes our prior_readable_until_ub value
3192 update_history(query
.query
.history
);
3193 fulfill_info(query
.from
, query
.query
, notify_info
);
3195 notify_info
.first
.osd
,
3197 notify_info
.first
.shard
, pg_whoami
.shard
,
3203 update_history(query
.query
.history
);
3204 fulfill_log(query
.from
, query
.query
, query
.query_epoch
);
3208 void PeeringState::try_mark_clean()
3210 if (actingset
.size() == get_osdmap()->get_pg_size(info
.pgid
.pgid
)) {
3211 state_clear(PG_STATE_FORCED_BACKFILL
| PG_STATE_FORCED_RECOVERY
);
3212 state_set(PG_STATE_CLEAN
);
3213 info
.history
.last_epoch_clean
= get_osdmap_epoch();
3214 info
.history
.last_interval_clean
= info
.history
.same_interval_since
;
3215 past_intervals
.clear();
3216 dirty_big_info
= true;
3220 if (!is_active() && is_peered()) {
3223 if (pool
.info
.is_pending_merge(info
.pgid
.pgid
, &target
)) {
3225 psdout(10) << "ready to merge (target)" << dendl
;
3226 pl
->set_ready_to_merge_target(
3228 info
.history
.last_epoch_started
,
3229 info
.history
.last_epoch_clean
);
3231 psdout(10) << "ready to merge (source)" << dendl
;
3232 pl
->set_ready_to_merge_source(info
.last_update
);
3236 psdout(10) << "not clean, not ready to merge" << dendl
;
3237 // we should have notified OSD in Active state entry point
3241 state_clear(PG_STATE_FORCED_RECOVERY
| PG_STATE_FORCED_BACKFILL
);
3244 pl
->publish_stats_to_osd();
3245 clear_recovery_state();
3248 void PeeringState::split_into(
3249 pg_t child_pgid
, PeeringState
*child
, unsigned split_bits
)
3251 child
->update_osdmap_ref(get_osdmap());
3255 pg_log
.split_into(child_pgid
, split_bits
, &(child
->pg_log
));
3256 child
->info
.last_complete
= info
.last_complete
;
3258 info
.last_update
= pg_log
.get_head();
3259 child
->info
.last_update
= child
->pg_log
.get_head();
3261 child
->info
.last_user_version
= info
.last_user_version
;
3263 info
.log_tail
= pg_log
.get_tail();
3264 child
->info
.log_tail
= child
->pg_log
.get_tail();
3266 // reset last_complete, we might have modified pg_log & missing above
3267 pg_log
.reset_complete_to(&info
);
3268 child
->pg_log
.reset_complete_to(&child
->info
);
3271 child
->info
.history
= info
.history
;
3272 child
->info
.history
.epoch_created
= get_osdmap_epoch();
3273 child
->info
.purged_snaps
= info
.purged_snaps
;
3275 if (info
.last_backfill
.is_max()) {
3276 child
->info
.set_last_backfill(hobject_t::get_max());
3278 // restart backfill on parent and child to be safe. we could
3279 // probably do better in the bitwise sort case, but it's more
3280 // fragile (there may be special work to do on backfill completion
3282 info
.set_last_backfill(hobject_t());
3283 child
->info
.set_last_backfill(hobject_t());
3284 // restarting backfill implies that the missing set is empty,
3285 // since it is only used for objects prior to last_backfill
3286 pg_log
.reset_backfill();
3287 child
->pg_log
.reset_backfill();
3290 child
->info
.stats
= info
.stats
;
3291 child
->info
.stats
.parent_split_bits
= split_bits
;
3292 info
.stats
.stats_invalid
= true;
3293 child
->info
.stats
.stats_invalid
= true;
3294 child
->info
.last_epoch_started
= info
.last_epoch_started
;
3295 child
->info
.last_interval_started
= info
.last_interval_started
;
3297 // There can't be recovery/backfill going on now
3298 int primary
, up_primary
;
3299 vector
<int> newup
, newacting
;
3300 get_osdmap()->pg_to_up_acting_osds(
3301 child
->info
.pgid
.pgid
, &newup
, &up_primary
, &newacting
, &primary
);
3302 child
->init_primary_up_acting(
3307 child
->role
= OSDMap::calc_pg_role(pg_whoami
, child
->acting
);
3309 // this comparison includes primary rank via pg_shard_t
3310 if (get_primary() != child
->get_primary())
3311 child
->info
.history
.same_primary_since
= get_osdmap_epoch();
3313 child
->info
.stats
.up
= newup
;
3314 child
->info
.stats
.up_primary
= up_primary
;
3315 child
->info
.stats
.acting
= newacting
;
3316 child
->info
.stats
.acting_primary
= primary
;
3317 child
->info
.stats
.mapping_epoch
= get_osdmap_epoch();
3320 child
->past_intervals
= past_intervals
;
3322 child
->on_new_interval();
3324 child
->send_notify
= !child
->is_primary();
3326 child
->dirty_info
= true;
3327 child
->dirty_big_info
= true;
3329 dirty_big_info
= true;
// Merge the source PGs in `sources` into this (target) PeeringState.
//
// Steps visible here:
//  - check the target, then every source, for completeness and for agreement
//    with last_pg_merge_meta, logging each mismatch;
//  - roll forward and trim the target log and each source log;
//  - fold each source's stats, last_update and (if the target has none)
//    past_intervals into the target;
//  - merge the collected source logs into pg_log;
//  - if the target has no epoch_created, adopt a source's history and the
//    les/lec values from last_pg_merge_meta, then adjust past_intervals and
//    same_{interval,up,primary}_since so they stay consistent with them.
//
// NOTE(review): this view of the function is missing a number of original
// lines (the declaration of `rctx`, braces, and the statements that follow
// each mismatch log); they are left exactly as found.
3332 void PeeringState::merge_from(
3333 map
<spg_t
,PeeringState
*>& sources
,
3335 unsigned split_bits
,
3336 const pg_merge_meta_t
& last_pg_merge_meta
)
// Target-side sanity checks.
3338 bool incomplete
= false;
3339 if (info
.last_complete
!= info
.last_update
||
3340 info
.is_incomplete() ||
3342 psdout(10) << __func__
<< " target incomplete" << dendl
;
// A default pg_t() source_pgid means there is no merge metadata to compare to.
3345 if (last_pg_merge_meta
.source_pgid
!= pg_t()) {
3346 if (info
.pgid
.pgid
!= last_pg_merge_meta
.source_pgid
.get_parent()) {
3347 psdout(10) << __func__
<< " target doesn't match expected parent "
3348 << last_pg_merge_meta
.source_pgid
.get_parent()
3349 << " of source_pgid " << last_pg_merge_meta
.source_pgid
3353 if (info
.last_update
!= last_pg_merge_meta
.target_version
) {
3354 psdout(10) << __func__
<< " target version doesn't match expected "
3355 << last_pg_merge_meta
.target_version
<< dendl
;
// Bring the target log to a fully rolled-forward, trimmed state.
3360 PGLog::LogEntryHandlerRef handler
{pl
->get_log_handler(rctx
.transaction
)};
3361 pg_log
.roll_forward(handler
.get());
3363 info
.last_complete
= info
.last_update
; // to fake out trim()
3364 pg_log
.reset_recovery_pointers();
3365 pg_log
.trim(info
.last_update
, info
);
// Per-source checks and log preparation; prepared logs are collected in
// log_from for the merge below.
3367 vector
<PGLog
*> log_from
;
3368 for (auto& i
: sources
) {
3369 auto& source
= i
.second
;
3371 psdout(10) << __func__
<< " source " << i
.first
<< " missing" << dendl
;
3375 if (source
->info
.last_complete
!= source
->info
.last_update
||
3376 source
->info
.is_incomplete() ||
3377 source
->info
.dne()) {
3378 psdout(10) << __func__
<< " source " << source
->pg_whoami
3383 if (last_pg_merge_meta
.source_pgid
!= pg_t()) {
3384 if (source
->info
.pgid
.pgid
!= last_pg_merge_meta
.source_pgid
) {
3385 dout(10) << __func__
<< " source " << source
->info
.pgid
.pgid
3386 << " doesn't match expected source pgid "
3387 << last_pg_merge_meta
.source_pgid
<< dendl
;
3390 if (source
->info
.last_update
!= last_pg_merge_meta
.source_version
) {
// Report the value the comparison above actually used (source_version).
3391 dout(10) << __func__
<< " source version doesn't match expected "
3392 << last_pg_merge_meta
.source_version
<< dendl
;
3398 PGLog::LogEntryHandlerRef handler
{
3399 source
->pl
->get_log_handler(rctx
.transaction
)};
3400 source
->pg_log
.roll_forward(handler
.get());
3401 source
->info
.last_complete
= source
->info
.last_update
; // to fake out trim()
3402 source
->pg_log
.reset_recovery_pointers();
3403 source
->pg_log
.trim(source
->info
.last_update
, source
->info
);
3404 log_from
.push_back(&source
->pg_log
);
3407 info
.stats
.add(source
->info
.stats
);
3409 // pull up last_update
3410 info
.last_update
= std::max(info
.last_update
, source
->info
.last_update
);
3412 // adopt source's PastIntervals if target has none. we can do this since
3413 // pgp_num has been reduced prior to the merge, so the OSD mappings for
3414 // the PGs are identical.
3415 if (past_intervals
.empty() && !source
->past_intervals
.empty()) {
3416 psdout(10) << __func__
<< " taking source's past_intervals" << dendl
;
3417 past_intervals
= source
->past_intervals
;
// After folding in all sources the merged PG is treated as complete up to
// the merged last_update, with an empty log range.
3421 info
.last_complete
= info
.last_update
;
3422 info
.log_tail
= info
.last_update
;
// NOTE(review): the condition guarding this reset is on a line not visible
// here.
3424 info
.last_backfill
= hobject_t();
3428 pg_log
.merge_from(log_from
, info
.last_update
);
3430 // make sure we have a meaningful last_epoch_started/clean (if we were a
3432 if (info
.history
.epoch_created
== 0) {
3433 // start with (a) source's history, since these PGs *should* have been
3434 // remapped in concert with each other...
3435 info
.history
= sources
.begin()->second
->info
.history
;
3437 // we use the last_epoch_{started,clean} we got from
3438 // the caller, which are the epochs that were reported when the PGs were
3439 // found to be ready for merge.
3440 info
.history
.last_epoch_clean
= last_pg_merge_meta
.last_epoch_clean
;
3441 info
.history
.last_epoch_started
= last_pg_merge_meta
.last_epoch_started
;
3442 info
.last_epoch_started
= last_pg_merge_meta
.last_epoch_started
;
3443 psdout(10) << __func__
3444 << " set les/c to " << last_pg_merge_meta
.last_epoch_started
<< "/"
3445 << last_pg_merge_meta
.last_epoch_clean
3446 << " from pool last_dec_*, source pg history was "
3447 << sources
.begin()->second
->info
.history
3450 // above we have pulled down source's history and we need to check
3451 // history.epoch_created again to confirm that source is not a placeholder
3452 // too. (peering requires a sane history.same_interval_since value for any
3453 // non-newly created pg and below here we know we are basically iterating
3454 // back a series of past maps to fake a merge process, hence we need to
3455 // fix history.same_interval_since first so that start_peering_interval()
3456 // will not complain)
3457 if (info
.history
.epoch_created
== 0) {
3458 dout(10) << __func__
<< " both merge target and source are placeholders,"
3459 << " set sis to lec " << info
.history
.last_epoch_clean
3461 info
.history
.same_interval_since
= info
.history
.last_epoch_clean
;
3464 // if the past_intervals start is later than last_epoch_clean, it
3465 // implies the source repeered again but the target didn't, or
3466 // that the source became clean in a later epoch than the target.
3467 // avoid the discrepancy by adjusting the interval start
3468 // backwards to match so that check_past_interval_bounds() will
3470 auto pib
= past_intervals
.get_bounds();
3471 if (info
.history
.last_epoch_clean
< pib
.first
) {
3472 psdout(10) << __func__
<< " last_epoch_clean "
3473 << info
.history
.last_epoch_clean
<< " < past_interval start "
3474 << pib
.first
<< ", adjusting start backwards" << dendl
;
3475 past_intervals
.adjust_start_backwards(info
.history
.last_epoch_clean
);
3478 // Similarly, if the same_interval_since value is later than
3479 // last_epoch_clean, the next interval change will result in a
3480 // past_interval start that is later than last_epoch_clean. This
3481 // can happen if we use the pg_history values from the merge
3482 // source. Adjust the same_interval_since value backwards if that
3483 // happens. (We trust the les and lec values more because they came from
3484 // the real target, whereas the history value we stole from the source.)
// NOTE(review): the test below uses last_epoch_started while the comment
// above and the assignment use last_epoch_clean -- confirm which is meant.
3485 if (info
.history
.last_epoch_started
< info
.history
.same_interval_since
) {
3486 psdout(10) << __func__
<< " last_epoch_started "
3487 << info
.history
.last_epoch_started
<< " < same_interval_since "
3488 << info
.history
.same_interval_since
3489 << ", adjusting pg_history backwards" << dendl
;
3490 info
.history
.same_interval_since
= info
.history
.last_epoch_clean
;
3491 // make sure same_{up,primary}_since are <= same_interval_since
3492 info
.history
.same_up_since
= std::min(
3493 info
.history
.same_up_since
, info
.history
.same_interval_since
);
3494 info
.history
.same_primary_since
= std::min(
3495 info
.history
.same_primary_since
, info
.history
.same_interval_since
);
3500 dirty_big_info
= true;
3503 void PeeringState::start_split_stats(
3504 const set
<spg_t
>& childpgs
, vector
<object_stat_sum_t
> *out
)
3506 out
->resize(childpgs
.size() + 1);
3507 info
.stats
.stats
.sum
.split(*out
);
// Install `stats` as this PG's stat sum (info.stats.stats.sum).
//
// NOTE(review): `t` is not referenced by any statement visible here; at
// least one original body line is missing from this view -- confirm how the
// transaction is used against the full source.
3510 void PeeringState::finish_split_stats(
3511 const object_stat_sum_t
& stats
, ObjectStore::Transaction
&t
)
3513 info
.stats
.stats
.sum
= stats
;
// Rebuild info.stats.blocked_by from the blocked_by collection.
//
// At most osd_max_pg_blocked_by entries are kept. `skip` is the number of
// blockers that must be dropped; while any remain to be dropped, each
// candidate is dropped when rand() % (skip + keep) < skip. Kept entries are
// written in iteration order.
//
// NOTE(review): the declaration of `pos` and the statements that update
// `skip` / `keep` inside the loop are not visible in this view.
3517 void PeeringState::update_blocked_by()
3519 // set a max on the number of blocking peers we report. if we go
3520 // over, report a random subset. keep the result sorted.
3521 unsigned keep
= std::min
<unsigned>(
3522 blocked_by
.size(), cct
->_conf
->osd_max_pg_blocked_by
);
3523 unsigned skip
= blocked_by
.size() - keep
;
// Start from an empty vector pre-sized to the number of entries we will keep.
3524 info
.stats
.blocked_by
.clear();
3525 info
.stats
.blocked_by
.resize(keep
);
3527 for (auto p
= blocked_by
.begin(); p
!= blocked_by
.end() && keep
> 0; ++p
) {
3528 if (skip
> 0 && (rand() % (skip
+ keep
) < skip
)) {
3531 info
.stats
.blocked_by
[pos
++] = *p
;
// File-local helper used by update_calc_stats(): checks `pgs` for an entry
// whose .shard equals `shard`.
//
// NOTE(review): the loop header and the return statements are not visible in
// this view; presumably returns true on the first match and false otherwise
// -- confirm.
3537 static bool find_shard(const set
<pg_shard_t
> & pgs
, shard_id_t shard
)
3540 if (p
.shard
== shard
)
// File-local helper used by update_calc_stats(): walks `pgs` looking for an
// entry whose .shard equals `shard`, and yields a default-constructed
// pg_shard_t() when the loop ends without a result.
//
// NOTE(review): the handling of `skip` and the in-loop return are on lines
// missing from this view -- presumably entries equal to `skip` are ignored
// and the first matching entry is returned; confirm.
3545 static pg_shard_t
get_another_shard(const set
<pg_shard_t
> & pgs
, pg_shard_t skip
, shard_id_t shard
)
3547 for (auto&p
: pgs
) {
3550 if (p
.shard
== shard
)
3553 return pg_shard_t();
// Recompute the derived fields of info.stats.
//
// Always: mirrors version/history/scrub/log values into info.stats, computes
// the copy count, and zeroes the degraded / unfound / misplaced counters.
// When the PG is (remapped, undersized or not clean) and (peered or
// activating): works out per-shard missing counts, object_location_counts,
// and the degraded / misplaced / unfound totals, which are stored at the end.
//
// NOTE(review): many original lines (braces, else branches, and declarations
// such as `missing` and `missing_shards`) are absent from this view, so the
// exact nesting of the branches below cannot be determined from here.
3556 void PeeringState::update_calc_stats()
// Mirror log/history derived values into the published stats.
3558 info
.stats
.version
= info
.last_update
;
3559 info
.stats
.created
= info
.history
.epoch_created
;
3560 info
.stats
.last_scrub
= info
.history
.last_scrub
;
3561 info
.stats
.last_scrub_stamp
= info
.history
.last_scrub_stamp
;
3562 info
.stats
.last_deep_scrub
= info
.history
.last_deep_scrub
;
3563 info
.stats
.last_deep_scrub_stamp
= info
.history
.last_deep_scrub_stamp
;
3564 info
.stats
.last_clean_scrub_stamp
= info
.history
.last_clean_scrub_stamp
;
3565 info
.stats
.last_epoch_clean
= info
.history
.last_epoch_clean
;
3567 info
.stats
.log_size
= pg_log
.get_head().version
- pg_log
.get_tail().version
;
3568 info
.stats
.ondisk_log_size
= info
.stats
.log_size
;
3569 info
.stats
.log_start
= pg_log
.get_tail();
3570 info
.stats
.ondisk_log_start
= pg_log
.get_tail();
3571 info
.stats
.snaptrimq_len
= pl
->get_snap_trimq_size();
3573 unsigned num_shards
= get_osdmap()->get_pg_size(info
.pgid
.pgid
);
3575 // In rare case that upset is too large (usually transient), use as target
3576 // for calculations below.
3577 unsigned target
= std::max(num_shards
, (unsigned)upset
.size());
3578 // For undersized actingset may be larger with OSDs out
3579 unsigned nrep
= std::max(actingset
.size(), upset
.size());
3580 // calc num_object_copies
3581 info
.stats
.stats
.calc_copies(std::max(target
, nrep
));
// Reset the counters that are recomputed below.
3582 info
.stats
.stats
.sum
.num_objects_degraded
= 0;
3583 info
.stats
.stats
.sum
.num_objects_unfound
= 0;
3584 info
.stats
.stats
.sum
.num_objects_misplaced
= 0;
3585 info
.stats
.avail_no_missing
.clear();
3586 info
.stats
.object_location_counts
.clear();
3588 // We should never hit this condition, but if end up hitting it,
3589 // make sure to update num_objects and set PG_STATE_INCONSISTENT.
3590 if (info
.stats
.stats
.sum
.num_objects
< 0) {
3591 psdout(0) << __func__
<< " negative num_objects = "
3592 << info
.stats
.stats
.sum
.num_objects
<< " setting it to 0 "
3594 info
.stats
.stats
.sum
.num_objects
= 0;
3595 state_set(PG_STATE_INCONSISTENT
);
// Degraded/misplaced accounting applies only in the states tested here.
3598 if ((is_remapped() || is_undersized() || !is_clean()) &&
3599 (is_peered()|| is_activating())) {
3600 psdout(20) << __func__
<< " actingset " << actingset
<< " upset "
3601 << upset
<< " acting_recovery_backfill " << acting_recovery_backfill
<< dendl
;
3603 ceph_assert(!acting_recovery_backfill
.empty());
3605 bool estimate
= false;
3607 // NOTE: we only generate degraded, misplaced and unfound
3608 // values for the summation, not individual stat categories.
3609 int64_t num_objects
= info
.stats
.stats
.sum
.num_objects
;
3611 // Objects missing from up nodes, sorted by # objects.
3612 boost::container::flat_set
<pair
<int64_t,pg_shard_t
>> missing_target_objects
;
3613 // Objects missing from nodes not in up, sort by # objects
3614 boost::container::flat_set
<pair
<int64_t,pg_shard_t
>> acting_source_objects
;
3616 // Fill missing_target_objects/acting_source_objects
// Local shard first: its missing count comes from the local pg_log.
3622 missing
= pg_log
.get_missing().num_missing();
3623 ceph_assert(acting_recovery_backfill
.count(pg_whoami
));
3624 if (upset
.count(pg_whoami
)) {
3625 missing_target_objects
.emplace(missing
, pg_whoami
);
3627 acting_source_objects
.emplace(missing
, pg_whoami
);
3629 info
.stats
.stats
.sum
.num_objects_missing_on_primary
= missing
;
3631 info
.stats
.avail_no_missing
.push_back(pg_whoami
);
3632 psdout(20) << __func__
<< " shard " << pg_whoami
3633 << " primary objects " << num_objects
3634 << " missing " << missing
// Then every peer we have info for.
3639 for (auto& peer
: peer_info
) {
3640 // Primary should not be in the peer_info, skip if it is.
3641 if (peer
.first
== pg_whoami
) continue;
3642 int64_t missing
= 0;
3643 int64_t peer_num_objects
=
3644 std::max((int64_t)0, peer
.second
.stats
.stats
.sum
.num_objects
);
3645 // Backfill targets always track num_objects accurately
3646 // all other peers track missing accurately.
3647 if (is_backfill_target(peer
.first
)) {
3648 missing
= std::max((int64_t)0, num_objects
- peer_num_objects
);
3650 if (peer_missing
.count(peer
.first
)) {
3651 missing
= peer_missing
[peer
.first
].num_missing();
3653 psdout(20) << __func__
<< " no peer_missing found for "
3654 << peer
.first
<< dendl
;
3655 if (is_recovering()) {
3658 missing
= std::max((int64_t)0, num_objects
- peer_num_objects
);
// Classify the peer: up-set members are targets, acting-only members are
// sources.
3661 if (upset
.count(peer
.first
)) {
3662 missing_target_objects
.emplace(missing
, peer
.first
);
3663 } else if (actingset
.count(peer
.first
)) {
3664 acting_source_objects
.emplace(missing
, peer
.first
);
3666 peer
.second
.stats
.stats
.sum
.num_objects_missing
= missing
;
3668 info
.stats
.avail_no_missing
.push_back(peer
.first
);
3669 psdout(20) << __func__
<< " shard " << peer
.first
3670 << " objects " << peer_num_objects
3671 << " missing " << missing
3675 // Compute object_location_counts
3676 for (auto& ml
: missing_loc
.get_missing_locs()) {
3677 info
.stats
.object_location_counts
[ml
.second
]++;
3678 psdout(30) << __func__
<< " " << ml
.first
<< " object_location_counts["
3679 << ml
.second
<< "]=" << info
.stats
.object_location_counts
[ml
.second
]
3682 int64_t not_missing
= num_objects
- missing_loc
.get_missing_locs().size();
3684 // During recovery we know upset == actingset and is being populated
3685 // During backfill we know that all non-missing objects are in the actingset
3686 info
.stats
.object_location_counts
[actingset
] = not_missing
;
// NOTE(review): the count is stored under actingset above but this log
// indexes object_location_counts with upset -- confirm that is intended.
3688 psdout(30) << __func__
<< " object_location_counts["
3689 << upset
<< "]=" << info
.stats
.object_location_counts
[upset
]
3691 psdout(20) << __func__
<< " object_location_counts "
3692 << info
.stats
.object_location_counts
<< dendl
;
3694 // A misplaced object is not stored on the correct OSD
3695 int64_t misplaced
= 0;
3696 // a degraded object has fewer replicas or EC shards than the pool specifies.
3697 int64_t degraded
= 0;
// While recovering, degraded/misplaced are derived from missing_loc's
// per-shard counts.
3699 if (is_recovering()) {
3700 for (auto& sml
: missing_loc
.get_missing_by_count()) {
3701 for (auto& ml
: sml
.second
) {
3703 if (sml
.first
== shard_id_t::NO_SHARD
) {
3704 psdout(20) << __func__
<< " ml " << ml
.second
3705 << " upset size " << upset
.size()
3706 << " up " << ml
.first
.up
<< dendl
;
3707 missing_shards
= (int)upset
.size() - ml
.first
.up
;
3709 // Handle shards not even in upset below
3710 if (!find_shard(upset
, sml
.first
))
3712 missing_shards
= std::max(0, 1 - ml
.first
.up
);
3713 psdout(20) << __func__
3714 << " shard " << sml
.first
3715 << " ml " << ml
.second
3716 << " missing shards " << missing_shards
<< dendl
;
3718 int odegraded
= ml
.second
* missing_shards
;
3719 // Copies on other osds but limited to the possible degraded
3720 int more_osds
= std::min(missing_shards
, ml
.first
.other
);
3721 int omisplaced
= ml
.second
* more_osds
;
3722 ceph_assert(omisplaced
<= odegraded
);
3723 odegraded
-= omisplaced
;
3725 misplaced
+= omisplaced
;
3726 degraded
+= odegraded
;
3730 psdout(20) << __func__
<< " missing based degraded "
3731 << degraded
<< dendl
;
3732 psdout(20) << __func__
<< " missing based misplaced "
3733 << misplaced
<< dendl
;
3735 // Handle undersized case
3736 if (pool
.info
.is_replicated()) {
3737 // Add degraded for missing targets (num_objects missing)
3738 ceph_assert(target
>= upset
.size());
3739 unsigned needed
= target
- upset
.size();
3740 degraded
+= num_objects
* needed
;
3742 for (unsigned i
= 0 ; i
< num_shards
; ++i
) {
3743 shard_id_t
shard(i
);
3745 if (!find_shard(upset
, shard
)) {
3746 pg_shard_t pgs
= get_another_shard(actingset
, pg_shard_t(), shard
);
3748 if (pgs
!= pg_shard_t()) {
3751 if (pgs
== pg_whoami
)
3752 missing
= info
.stats
.stats
.sum
.num_objects_missing_on_primary
;
3754 missing
= peer_info
[pgs
].stats
.stats
.sum
.num_objects_missing
;
3756 degraded
+= missing
;
3757 misplaced
+= std::max((int64_t)0, num_objects
- missing
);
3759 // No shard anywhere
3760 degraded
+= num_objects
;
3768 // Handle undersized case
3769 if (pool
.info
.is_replicated()) {
3770 // Add to missing_target_objects
3771 ceph_assert(target
>= missing_target_objects
.size());
3772 unsigned needed
= target
- missing_target_objects
.size();
3774 missing_target_objects
.emplace(num_objects
* needed
, pg_shard_t(pg_shard_t::NO_OSD
));
3776 for (unsigned i
= 0 ; i
< num_shards
; ++i
) {
3777 shard_id_t
shard(i
);
3779 for (const auto& t
: missing_target_objects
) {
3780 if (std::get
<1>(t
).shard
== shard
) {
3786 missing_target_objects
.emplace(num_objects
, pg_shard_t(pg_shard_t::NO_OSD
,shard
));
3790 for (const auto& item
: missing_target_objects
)
3791 psdout(20) << __func__
<< " missing shard " << std::get
<1>(item
)
3792 << " missing= " << std::get
<0>(item
) << dendl
;
3793 for (const auto& item
: acting_source_objects
)
3794 psdout(20) << __func__
<< " acting shard " << std::get
<1>(item
)
3795 << " missing= " << std::get
<0>(item
) << dendl
;
3797 // Handle all objects not in missing for remapped
// Targets are visited from the largest missing count downwards (reverse
// iteration of the sorted flat_set), each paired with an acting source.
3799 for (auto m
= missing_target_objects
.rbegin();
3800 m
!= missing_target_objects
.rend(); ++m
) {
3802 int64_t extra_missing
= -1;
3804 if (pool
.info
.is_replicated()) {
3805 if (!acting_source_objects
.empty()) {
3806 auto extra_copy
= acting_source_objects
.begin();
3807 extra_missing
= std::get
<0>(*extra_copy
);
3808 acting_source_objects
.erase(extra_copy
);
3810 } else { // Erasure coded
3811 // Use corresponding shard
3812 for (const auto& a
: acting_source_objects
) {
3813 if (std::get
<1>(a
).shard
== std::get
<1>(*m
).shard
) {
3814 extra_missing
= std::get
<0>(a
);
3815 acting_source_objects
.erase(a
);
3821 if (extra_missing
>= 0 && std::get
<0>(*m
) >= extra_missing
) {
3822 // We don't know which of the objects on the target
3823 // are part of extra_missing so assume are all degraded.
3824 misplaced
+= std::get
<0>(*m
) - extra_missing
;
3825 degraded
+= extra_missing
;
3827 // 1. extra_missing == -1, more targets than sources so degraded
3828 // 2. extra_missing > std::get<0>(m), so that we know that some extra_missing
3829 // previously degraded are now present on the target.
3830 degraded
+= std::get
<0>(*m
);
3833 // If there are still acting that haven't been accounted for
3834 // then they are misplaced
3835 for (const auto& a
: acting_source_objects
) {
3836 int64_t extra_misplaced
= std::max((int64_t)0, num_objects
- std::get
<0>(a
));
3837 psdout(20) << __func__
<< " extra acting misplaced " << extra_misplaced
3839 misplaced
+= extra_misplaced
;
3842 // NOTE: Tests use these messages to verify this code
3843 psdout(20) << __func__
<< " degraded " << degraded
3844 << (estimate
? " (est)": "") << dendl
;
3845 psdout(20) << __func__
<< " misplaced " << misplaced
3846 << (estimate
? " (est)": "")<< dendl
;
// Publish the totals computed above.
3848 info
.stats
.stats
.sum
.num_objects_degraded
= degraded
;
3849 info
.stats
.stats
.sum
.num_objects_unfound
= get_num_unfound();
3850 info
.stats
.stats
.sum
.num_objects_misplaced
= misplaced
;
// Bring info.stats up to date and decide whether it needs publishing.
//
// Returns std::nullopt when pg_stats_publish_valid is set, the freshly built
// snapshot equals pg_stats_publish, and info.stats.last_fresh is newer than
// now - osd_pg_stat_report_interval_max. Otherwise reported_epoch,
// reported_seq and the last_* timestamps on info.stats are advanced and the
// snapshot (info.stats plus unstable_stats plus a bounded slice of
// purged_snaps) is returned.
//
// NOTE(review): in the lines visible here, pre_publish is copied from
// info.stats before reported_epoch / reported_seq / last_* are advanced, so
// the returned value would not carry those updates -- confirm this is
// intended. Some original lines (else branches, the declaration and
// increment of `num`) are missing from this view.
3854 std::optional
<pg_stat_t
> PeeringState::prepare_stats_for_publish(
3855 bool pg_stats_publish_valid
,
3856 const pg_stat_t
&pg_stats_publish
,
3857 const object_stat_collection_t
&unstable_stats
)
// INCONSISTENT tracks the scrub-error count.
3859 if (info
.stats
.stats
.sum
.num_scrub_errors
) {
3860 state_set(PG_STATE_INCONSISTENT
);
3862 state_clear(PG_STATE_INCONSISTENT
);
3863 state_clear(PG_STATE_FAILED_REPAIR
);
// Record state transitions since the last call.
3866 utime_t now
= ceph_clock_now();
3867 if (info
.stats
.state
!= state
) {
3868 info
.stats
.last_change
= now
;
3869 // Optimistic estimation, if we just find out an inactive PG,
3870 // assume it is active till now.
3871 if (!(state
& PG_STATE_ACTIVE
) &&
3872 (info
.stats
.state
& PG_STATE_ACTIVE
))
3873 info
.stats
.last_active
= now
;
3875 if ((state
& PG_STATE_ACTIVE
) &&
3876 !(info
.stats
.state
& PG_STATE_ACTIVE
))
3877 info
.stats
.last_became_active
= now
;
3878 if ((state
& (PG_STATE_ACTIVE
|PG_STATE_PEERED
)) &&
3879 !(info
.stats
.state
& (PG_STATE_ACTIVE
|PG_STATE_PEERED
)))
3880 info
.stats
.last_became_peered
= now
;
3881 info
.stats
.state
= state
;
// DEGRADED tracks the freshly recomputed degraded-object count.
3884 update_calc_stats();
3885 if (info
.stats
.stats
.sum
.num_objects_degraded
) {
3886 state_set(PG_STATE_DEGRADED
);
3888 state_clear(PG_STATE_DEGRADED
);
3890 update_blocked_by();
// Build the candidate snapshot and the freshness cutoff.
3892 pg_stat_t pre_publish
= info
.stats
;
3893 pre_publish
.stats
.add(unstable_stats
);
3894 utime_t cutoff
= now
;
3895 cutoff
-= cct
->_conf
->osd_pg_stat_report_interval_max
;
3897 // share (some of) our purged_snaps via the pg_stats. limit # of intervals
3898 // because we don't want to make the pg_stat_t structures too expensive.
3899 unsigned max
= cct
->_conf
->osd_max_snap_prune_intervals_per_epoch
;
3901 auto i
= info
.purged_snaps
.begin();
3902 while (num
< max
&& i
!= info
.purged_snaps
.end()) {
3903 pre_publish
.purged_snaps
.insert(i
.get_start(), i
.get_len());
3907 psdout(20) << __func__
<< " reporting purged_snaps "
3908 << pre_publish
.purged_snaps
<< dendl
;
// Nothing changed and the last report is recent enough: skip publishing.
3910 if (pg_stats_publish_valid
&& pre_publish
== pg_stats_publish
&&
3911 info
.stats
.last_fresh
> cutoff
) {
3912 psdout(15) << "publish_stats_to_osd " << pg_stats_publish
.reported_epoch
3913 << ": no change since " << info
.stats
.last_fresh
<< dendl
;
3914 return std::nullopt
;
3916 // update our stat summary and timestamps
3917 info
.stats
.reported_epoch
= get_osdmap_epoch();
3918 ++info
.stats
.reported_seq
;
3920 info
.stats
.last_fresh
= now
;
3922 if (info
.stats
.state
& PG_STATE_CLEAN
)
3923 info
.stats
.last_clean
= now
;
3924 if (info
.stats
.state
& PG_STATE_ACTIVE
)
3925 info
.stats
.last_active
= now
;
3926 if (info
.stats
.state
& (PG_STATE_ACTIVE
|PG_STATE_PEERED
))
3927 info
.stats
.last_peered
= now
;
3928 info
.stats
.last_unstale
= now
;
3929 if ((info
.stats
.state
& PG_STATE_DEGRADED
) == 0)
3930 info
.stats
.last_undegraded
= now
;
3931 if ((info
.stats
.state
& PG_STATE_UNDERSIZED
) == 0)
3932 info
.stats
.last_fullsized
= now
;
// NOTE(review): this logs the caller-supplied pg_stats_publish epoch/seq,
// not the values just set on info.stats -- confirm that is the intent.
3934 psdout(15) << "publish_stats_to_osd " << pg_stats_publish
.reported_epoch
3935 << ":" << pg_stats_publish
.reported_seq
<< dendl
;
3936 return std::make_optional(std::move(pre_publish
));
// Initialise this PeeringState from caller-supplied mapping and history.
//
// Visible effects: logs the inputs, calls init_primary_up_acting(), stores
// `history` and `pi` into info.history / past_intervals, fills the up/acting
// fields of info.stats (mapping_epoch is taken from
// history.same_interval_since), flags the log as possibly containing deletes
// when perform_deletes_during_peering() is false, and sets dirty_big_info.
//
// NOTE(review): several parameters (e.g. `role`) and the condition guarding
// the "Setting backfill" branch are on lines missing from this view.
3940 void PeeringState::init(
3942 const vector
<int>& newup
, int new_up_primary
,
3943 const vector
<int>& newacting
, int new_acting_primary
,
3944 const pg_history_t
& history
,
3945 const PastIntervals
& pi
,
3947 ObjectStore::Transaction
&t
)
3949 psdout(10) << "init role " << role
<< " up "
3950 << newup
<< " acting " << newacting
3951 << " history " << history
3952 << " past_intervals " << pi
3956 init_primary_up_acting(
3960 new_acting_primary
);
3962 info
.history
= history
;
3963 past_intervals
= pi
;
3966 info
.stats
.up_primary
= new_up_primary
;
3967 info
.stats
.acting
= acting
;
3968 info
.stats
.acting_primary
= new_acting_primary
;
3969 info
.stats
.mapping_epoch
= info
.history
.same_interval_since
;
3971 if (!perform_deletes_during_peering()) {
3972 pg_log
.set_missing_may_contain_deletes();
// Backfill setup: last_backfill is reset to a default hobject_t and the log
// is marked for rewrite.
3976 psdout(10) << __func__
<< ": Setting backfill" << dendl
;
3977 info
.set_last_backfill(hobject_t());
3978 info
.last_complete
= info
.last_update
;
3979 pg_log
.mark_log_for_rewrite();
3985 dirty_big_info
= true;
// Dump the current peering state into Formatter `f`.
//
// Emits "state" and "epoch", the "up" and "acting" arrays, the
// "backfill_targets", "async_recovery_targets" and "acting_recovery_backfill"
// arrays when those sets are non-empty, an "info" section (stats are
// refreshed via update_calc_stats() first), and a "peer_info" array with one
// "info" object per peer.
//
// NOTE(review): the close_section() calls and the actual info dumps are on
// lines not visible in this view.
3989 void PeeringState::dump_peering_state(Formatter
*f
)
3991 f
->dump_string("state", get_pg_state_string());
3992 f
->dump_unsigned("epoch", get_osdmap_epoch());
3993 f
->open_array_section("up");
3994 for (auto p
= up
.begin(); p
!= up
.end(); ++p
)
3995 f
->dump_unsigned("osd", *p
);
3997 f
->open_array_section("acting");
3998 for (auto p
= acting
.begin(); p
!= acting
.end(); ++p
)
3999 f
->dump_unsigned("osd", *p
);
// Optional sections: only present when the corresponding set is non-empty.
4001 if (!backfill_targets
.empty()) {
4002 f
->open_array_section("backfill_targets");
4003 for (auto p
= backfill_targets
.begin(); p
!= backfill_targets
.end(); ++p
)
4004 f
->dump_stream("shard") << *p
;
4007 if (!async_recovery_targets
.empty()) {
4008 f
->open_array_section("async_recovery_targets");
4009 for (auto p
= async_recovery_targets
.begin();
4010 p
!= async_recovery_targets
.end();
4012 f
->dump_stream("shard") << *p
;
4015 if (!acting_recovery_backfill
.empty()) {
4016 f
->open_array_section("acting_recovery_backfill");
4017 for (auto p
= acting_recovery_backfill
.begin();
4018 p
!= acting_recovery_backfill
.end();
4020 f
->dump_stream("shard") << *p
;
// Refresh derived stats before dumping our own info.
4023 f
->open_object_section("info");
4024 update_calc_stats();
4028 f
->open_array_section("peer_info");
4029 for (auto p
= peer_info
.begin(); p
!= peer_info
.end(); ++p
) {
4030 f
->open_object_section("info");
4031 f
->dump_stream("peer") << p
->first
;
// Let the caller-supplied functor `f` mutate info.history and info.stats in
// place. When `f` returns true the stats are published via
// pl->publish_stats_to_osd(); pl->on_info_history_change() is also invoked.
//
// NOTE(review): the brace closing the `if` and whatever is done with `t` are
// on lines missing from this view, so it cannot be told from here whether
// on_info_history_change() is conditional on f's result -- confirm.
4038 void PeeringState::update_stats(
4039 std::function
<bool(pg_history_t
&, pg_stat_t
&)> f
,
4040 ObjectStore::Transaction
*t
) {
4041 if (f(info
.history
, info
.stats
)) {
4042 pl
->publish_stats_to_osd();
4044 pl
->on_info_history_change();
// Append `entries` to the local pg_log and update derived info.
//
// Preconditions (asserted): `entries` is non-empty and its first entry is
// newer than info.last_update.
// After appending: optionally rolls the log forward (fully when the last
// entry's object is past info.last_backfill, otherwise up to
// *roll_forward_to when that is beyond can_rollback_to), sets
// info.last_update to the log head, advances info.last_complete when nothing
// is missing, ORs the result into info.stats.stats_invalid, and trims.
// Returns the flag produced by pg_log.append_new_log_entries().
//
// NOTE(review): the arguments of append_new_log_entries() and the guard
// around pg_log.trim(*trim_to, ...) are on lines missing from this view;
// trim_to is dereferenced there, so presumably it is checked first --
// confirm.
4052 bool PeeringState::append_log_entries_update_missing(
4053 const mempool::osd_pglog::list
<pg_log_entry_t
> &entries
,
4054 ObjectStore::Transaction
&t
, std::optional
<eversion_t
> trim_to
,
4055 std::optional
<eversion_t
> roll_forward_to
)
4057 ceph_assert(!entries
.empty());
4058 ceph_assert(entries
.begin()->version
> info
.last_update
);
4060 PGLog::LogEntryHandlerRef rollbacker
{pl
->get_log_handler(t
)};
4061 bool invalidate_stats
=
4062 pg_log
.append_new_log_entries(
4067 if (roll_forward_to
&& entries
.rbegin()->soid
> info
.last_backfill
) {
4068 pg_log
.roll_forward(rollbacker
.get());
4070 if (roll_forward_to
&& *roll_forward_to
> pg_log
.get_can_rollback_to()) {
4071 pg_log
.roll_forward_to(*roll_forward_to
, rollbacker
.get());
4072 last_rollback_info_trimmed_to_applied
= *roll_forward_to
;
4075 info
.last_update
= pg_log
.get_head();
4077 if (pg_log
.get_missing().num_missing() == 0) {
4078 // advance last_complete since nothing else is missing!
4079 info
.last_complete
= info
.last_update
;
4081 info
.stats
.stats_invalid
= info
.stats
.stats_invalid
|| invalidate_stats
;
4083 psdout(20) << __func__
<< " trim_to bool = " << bool(trim_to
)
4084 << " trim_to = " << (trim_to
? *trim_to
: eversion_t()) << dendl
;
4086 pg_log
.trim(*trim_to
, info
);
4089 return invalidate_stats
;
// Primary-only (asserted): apply `entries` to the local log and to the
// tracked missing sets of every other shard in acting_recovery_backfill.
//
// For each peer, peer_missing and peer_info must already contain an entry
// (asserted); the peer's last_update is set to ours and its stats_invalid
// flag is ORed with the per-peer result. `rebuild_missing` accumulates the
// local and per-peer invalidate results and selects how missing_loc is
// refreshed afterwards.
//
// NOTE(review): the remaining arguments of the static
// PGLog::append_log_entries_update_missing() call, the body of the
// `!rebuild_missing` branch, and most arguments to missing_loc.rebuild() are
// on lines missing from this view.
4092 void PeeringState::merge_new_log_entries(
4093 const mempool::osd_pglog::list
<pg_log_entry_t
> &entries
,
4094 ObjectStore::Transaction
&t
,
4095 std::optional
<eversion_t
> trim_to
,
4096 std::optional
<eversion_t
> roll_forward_to
)
4098 psdout(10) << __func__
<< " " << entries
<< dendl
;
4099 ceph_assert(is_primary());
// Local log first.
4101 bool rebuild_missing
= append_log_entries_update_missing(entries
, t
, trim_to
, roll_forward_to
);
// Then every other shard we are tracking.
4102 for (auto i
= acting_recovery_backfill
.begin();
4103 i
!= acting_recovery_backfill
.end();
4105 pg_shard_t
peer(*i
);
4106 if (peer
== pg_whoami
) continue;
4107 ceph_assert(peer_missing
.count(peer
));
4108 ceph_assert(peer_info
.count(peer
));
4109 pg_missing_t
& pmissing(peer_missing
[peer
]);
4110 psdout(20) << __func__
<< " peer_missing for " << peer
4111 << " = " << pmissing
<< dendl
;
4112 pg_info_t
& pinfo(peer_info
[peer
]);
4113 bool invalidate_stats
= PGLog::append_log_entries_update_missing(
4114 pinfo
.last_backfill
,
4121 pinfo
.last_update
= info
.last_update
;
4122 pinfo
.stats
.stats_invalid
= pinfo
.stats
.stats_invalid
|| invalidate_stats
;
4123 rebuild_missing
= rebuild_missing
|| invalidate_stats
;
4126 if (!rebuild_missing
) {
4130 for (auto &&i
: entries
) {
4131 missing_loc
.rebuild(
4134 acting_recovery_backfill
,
4136 pg_log
.get_missing(),
// Record a single new log entry `e` in info and pg_log.
//
// info.last_complete follows e.version only when the PG was fully up to date
// beforehand; info.last_update always advances to e.version (asserted to be
// newer); info.last_user_version is raised if e.user_version is larger. The
// entry is then handed to pg_log.add() together with `applied`.
4142 void PeeringState::add_log_entry(const pg_log_entry_t
& e
, bool applied
)
4144 // raise last_complete only if we were previously up to date
4145 if (info
.last_complete
== info
.last_update
)
4146 info
.last_complete
= e
.version
;
4148 // raise last_update.
4149 ceph_assert(e
.version
> info
.last_update
);
4150 info
.last_update
= e
.version
;
4152 // raise user_version, if it increased (it may have not get bumped
4153 // by all logged updates)
4154 if (e
.user_version
> info
.last_user_version
)
4155 info
.last_user_version
= e
.user_version
;
4158 pg_log
.add(e
, applied
);
4159 psdout(10) << "add_log_entry " << e
<< dendl
;
// Append the entries in `logv` to the PG log and trim it.
//
// Visible steps: sync info.history.last_epoch_started /
// last_interval_started with the values in info; when the transaction was
// not applied, call pg_log.skip_rollforward(); add each entry via
// add_log_entry(), rolling forward entries whose object lies past
// info.last_backfill when the transaction was applied; roll forward to
// `roll_forward_to` when it is beyond can_rollback_to; trim the log; and
// record `mlcod` in min_last_complete_ondisk.
//
// NOTE(review): several parameters (`trim_to`, `mlcod`, `async`) are
// declared on lines missing from this view, as are the closing lines of the
// block comments in the body.
4163 void PeeringState::append_log(
4164 vector
<pg_log_entry_t
>&& logv
,
4166 eversion_t roll_forward_to
,
4168 ObjectStore::Transaction
&t
,
4169 bool transaction_applied
,
4172 /* The primary has sent an info updating the history, but it may not
4173 * have arrived yet. We want to make sure that we cannot remember this
4174 * write without remembering that it happened in an interval which went
4175 * active in epoch history.last_epoch_started.
4177 if (info
.last_epoch_started
!= info
.history
.last_epoch_started
) {
4178 info
.history
.last_epoch_started
= info
.last_epoch_started
;
4180 if (info
.last_interval_started
!= info
.history
.last_interval_started
) {
4181 info
.history
.last_interval_started
= info
.last_interval_started
;
4183 psdout(10) << "append_log " << pg_log
.get_log() << " " << logv
<< dendl
;
4185 PGLog::LogEntryHandlerRef handler
{pl
->get_log_handler(t
)};
4186 if (!transaction_applied
) {
4187 /* We must be a backfill or async recovery peer, so it's ok if we apply
4188 * out-of-turn since we won't be considered when
4189 * determining a min possible last_update.
4191 * We skip_rollforward() here, which advances the crt, without
4192 * doing an actual rollforward. This avoids cleaning up entries
4193 * from the backend and we do not end up in a situation, where the
4194 * object is deleted before we can _merge_object_divergent_entries().
4196 pg_log
.skip_rollforward();
4199 for (auto p
= logv
.begin(); p
!= logv
.end(); ++p
) {
4200 add_log_entry(*p
, transaction_applied
);
4202 /* We don't want to leave the rollforward artifacts around
4203 * here past last_backfill. It's ok for the same reason as
4205 if (transaction_applied
&&
4206 p
->soid
> info
.last_backfill
) {
4207 pg_log
.roll_forward(handler
.get());
4210 if (transaction_applied
&& roll_forward_to
> pg_log
.get_can_rollback_to()) {
4211 pg_log
.roll_forward_to(
4214 last_rollback_info_trimmed_to_applied
= roll_forward_to
;
4217 psdout(10) << __func__
<< " approx pg log length = "
4218 << pg_log
.get_log().approx_size() << dendl
;
4219 psdout(10) << __func__
<< " transaction_applied = "
4220 << transaction_applied
<< dendl
;
4221 if (!transaction_applied
|| async
)
4222 psdout(10) << __func__
<< " " << pg_whoami
4223 << " is async_recovery or backfill target" << dendl
;
4224 pg_log
.trim(trim_to
, info
, transaction_applied
, async
);
4226 // update the local pg, pg log
4231 min_last_complete_ondisk
= mlcod
;
// Note that object `oid` at version `v` has been recovered locally.
//
// If `v` is beyond the log's can_rollback_to point the log is first rolled
// forward to `v`. pg_log.recover_got() then updates the log/missing state
// (and info), the resulting last_complete is logged, and this shard is added
// to missing_loc as a location for `oid`.
//
// NOTE(review): at least one parameter and the condition guarding the
// missing_loc update are on lines missing from this view.
4234 void PeeringState::recover_got(
4235 const hobject_t
&oid
, eversion_t v
,
4237 ObjectStore::Transaction
&t
)
4239 if (v
> pg_log
.get_can_rollback_to()) {
4240 /* This can only happen during a repair, and even then, it would
4241 * be one heck of a race. If we are repairing the object, the
4242 * write in question must be fully committed, so it's not valid
4243 * to roll it back anyway (and we'll be rolled forward shortly
4245 PGLog::LogEntryHandlerRef handler
{pl
->get_log_handler(t
)};
4246 pg_log
.roll_forward_to(v
, handler
.get());
4249 psdout(10) << "got missing " << oid
<< " v " << v
<< dendl
;
4250 pg_log
.recover_got(oid
, v
, info
);
4251 if (pg_log
.get_log().log
.empty()) {
4252 psdout(10) << "last_complete now " << info
.last_complete
4253 << " while log is empty" << dendl
;
4254 } else if (pg_log
.get_log().complete_to
!= pg_log
.get_log().log
.end()) {
4255 psdout(10) << "last_complete now " << info
.last_complete
4256 << " log.complete_to " << pg_log
.get_log().complete_to
->version
4259 psdout(10) << "last_complete now " << info
.last_complete
4260 << " log.complete_to at end" << dendl
;
4261 //below is not true in the repair case.
4262 //assert(missing.num_missing() == 0); // otherwise, complete_to was wrong.
4263 ceph_assert(info
.last_complete
== info
.last_update
);
4267 ceph_assert(missing_loc
.needs_recovery(oid
));
4269 missing_loc
.add_location(oid
, pg_whoami
);
4277 void PeeringState::update_backfill_progress(
4278 const hobject_t
&updated_backfill
,
4279 const pg_stat_t
&updated_stats
,
4280 bool preserve_local_num_bytes
,
4281 ObjectStore::Transaction
&t
) {
4282 info
.set_last_backfill(updated_backfill
);
4283 if (preserve_local_num_bytes
) {
4284 psdout(25) << __func__
<< " primary " << updated_stats
.stats
.sum
.num_bytes
4285 << " local " << info
.stats
.stats
.sum
.num_bytes
<< dendl
;
4286 int64_t bytes
= info
.stats
.stats
.sum
.num_bytes
;
4287 info
.stats
= updated_stats
;
4288 info
.stats
.stats
.sum
.num_bytes
= bytes
;
4290 psdout(20) << __func__
<< " final " << updated_stats
.stats
.sum
.num_bytes
4291 << " replaces local " << info
.stats
.stats
.sum
.num_bytes
<< dendl
;
4292 info
.stats
= updated_stats
;
4299 void PeeringState::adjust_purged_snaps(
4300 std::function
<void(interval_set
<snapid_t
> &snaps
)> f
) {
4301 f(info
.purged_snaps
);
4303 dirty_big_info
= true;
4306 void PeeringState::on_peer_recover(
4308 const hobject_t
&soid
,
4309 const eversion_t
&version
)
4311 pl
->publish_stats_to_osd();
4313 peer_missing
[peer
].got(soid
, version
);
4314 missing_loc
.add_location(soid
, peer
);
4317 void PeeringState::begin_peer_recover(
4319 const hobject_t soid
)
4321 peer_missing
[peer
].revise_have(soid
, eversion_t());
4324 void PeeringState::force_object_missing(
4325 const set
<pg_shard_t
> &peers
,
4326 const hobject_t
&soid
,
4329 for (auto &&peer
: peers
) {
4330 if (peer
!= primary
) {
4331 peer_missing
[peer
].add(soid
, version
, eversion_t(), false);
4333 pg_log
.missing_add(soid
, version
, eversion_t());
4334 pg_log
.reset_complete_to(&info
);
4335 pg_log
.set_last_requested(0);
4339 missing_loc
.rebuild(
4342 acting_recovery_backfill
,
4344 pg_log
.get_missing(),
4349 void PeeringState::pre_submit_op(
4350 const hobject_t
&hoid
,
4351 const vector
<pg_log_entry_t
>& logv
,
4352 eversion_t at_version
)
4354 if (at_version
> eversion_t()) {
4355 for (auto &&i
: get_acting_recovery_backfill()) {
4356 if (i
== primary
) continue;
4357 pg_info_t
&pinfo
= peer_info
[i
];
4358 // keep peer_info up to date
4359 if (pinfo
.last_complete
== pinfo
.last_update
)
4360 pinfo
.last_complete
= at_version
;
4361 pinfo
.last_update
= at_version
;
4365 bool requires_missing_loc
= false;
4366 for (auto &&i
: get_async_recovery_targets()) {
4367 if (i
== primary
|| !get_peer_missing(i
).is_missing(hoid
))
4369 requires_missing_loc
= true;
4370 for (auto &&entry
: logv
) {
4371 peer_missing
[i
].add_next_event(entry
);
4375 if (requires_missing_loc
) {
4376 for (auto &&entry
: logv
) {
4377 psdout(30) << __func__
<< " missing_loc before: "
4378 << missing_loc
.get_locations(entry
.soid
) << dendl
;
4379 missing_loc
.add_missing(entry
.soid
, entry
.version
,
4380 eversion_t(), entry
.is_delete());
4381 // clear out missing_loc
4382 missing_loc
.clear_location(entry
.soid
);
4383 for (auto &i
: get_actingset()) {
4384 if (!get_peer_missing(i
).is_missing(entry
.soid
))
4385 missing_loc
.add_location(entry
.soid
, i
);
4387 psdout(30) << __func__
<< " missing_loc after: "
4388 << missing_loc
.get_locations(entry
.soid
) << dendl
;
4393 void PeeringState::recovery_committed_to(eversion_t version
)
4395 psdout(10) << __func__
<< " version " << version
4396 << " now ondisk" << dendl
;
4397 last_complete_ondisk
= version
;
4399 if (last_complete_ondisk
== info
.last_update
) {
4400 if (!is_primary()) {
4401 // Either we are a replica or backfill target.
4402 // we are fully up to date. tell the primary!
4403 pl
->send_cluster_message(
4405 make_message
<MOSDPGTrim
>(
4407 spg_t(info
.pgid
.pgid
, primary
.shard
),
4408 last_complete_ondisk
),
4409 get_osdmap_epoch());
4411 calc_min_last_complete_ondisk();
4416 void PeeringState::complete_write(eversion_t v
, eversion_t lc
)
4418 last_update_ondisk
= v
;
4419 last_complete_ondisk
= lc
;
4420 calc_min_last_complete_ondisk();
4423 void PeeringState::calc_trim_to()
4425 size_t target
= pl
->get_target_pg_log_entries();
4427 eversion_t limit
= std::min(
4428 min_last_complete_ondisk
,
4429 pg_log
.get_can_rollback_to());
4430 if (limit
!= eversion_t() &&
4431 limit
!= pg_trim_to
&&
4432 pg_log
.get_log().approx_size() > target
) {
4433 size_t num_to_trim
= std::min(pg_log
.get_log().approx_size() - target
,
4434 cct
->_conf
->osd_pg_log_trim_max
);
4435 if (num_to_trim
< cct
->_conf
->osd_pg_log_trim_min
&&
4436 cct
->_conf
->osd_pg_log_trim_max
>= cct
->_conf
->osd_pg_log_trim_min
) {
4439 auto it
= pg_log
.get_log().log
.begin();
4440 eversion_t new_trim_to
;
4441 for (size_t i
= 0; i
< num_to_trim
; ++i
) {
4442 new_trim_to
= it
->version
;
4444 if (new_trim_to
> limit
) {
4445 new_trim_to
= limit
;
4446 psdout(10) << "calc_trim_to trimming to min_last_complete_ondisk" << dendl
;
4450 psdout(10) << "calc_trim_to " << pg_trim_to
<< " -> " << new_trim_to
<< dendl
;
4451 pg_trim_to
= new_trim_to
;
4452 assert(pg_trim_to
<= pg_log
.get_head());
4453 assert(pg_trim_to
<= min_last_complete_ondisk
);
4457 void PeeringState::calc_trim_to_aggressive()
4459 size_t target
= pl
->get_target_pg_log_entries();
4461 // limit pg log trimming up to the can_rollback_to value
4462 eversion_t limit
= std::min({
4464 pg_log
.get_can_rollback_to(),
4465 last_update_ondisk
});
4466 psdout(10) << __func__
<< " limit = " << limit
<< dendl
;
4468 if (limit
!= eversion_t() &&
4469 limit
!= pg_trim_to
&&
4470 pg_log
.get_log().approx_size() > target
) {
4471 psdout(10) << __func__
<< " approx pg log length = "
4472 << pg_log
.get_log().approx_size() << dendl
;
4473 uint64_t num_to_trim
= std::min
<uint64_t>(pg_log
.get_log().approx_size() - target
,
4474 cct
->_conf
->osd_pg_log_trim_max
);
4475 psdout(10) << __func__
<< " num_to_trim = " << num_to_trim
<< dendl
;
4476 if (num_to_trim
< cct
->_conf
->osd_pg_log_trim_min
&&
4477 cct
->_conf
->osd_pg_log_trim_max
>= cct
->_conf
->osd_pg_log_trim_min
) {
4480 auto it
= pg_log
.get_log().log
.begin(); // oldest log entry
4481 auto rit
= pg_log
.get_log().log
.rbegin();
4482 eversion_t by_n_to_keep
; // start from tail
4483 eversion_t by_n_to_trim
= eversion_t::max(); // start from head
4484 for (size_t i
= 0; it
!= pg_log
.get_log().log
.end(); ++it
, ++rit
) {
4486 if (i
> target
&& by_n_to_keep
== eversion_t()) {
4487 by_n_to_keep
= rit
->version
;
4489 if (i
>= num_to_trim
&& by_n_to_trim
== eversion_t::max()) {
4490 by_n_to_trim
= it
->version
;
4492 if (by_n_to_keep
!= eversion_t() &&
4493 by_n_to_trim
!= eversion_t::max()) {
4498 if (by_n_to_keep
== eversion_t()) {
4502 pg_trim_to
= std::min({by_n_to_keep
, by_n_to_trim
, limit
});
4503 psdout(10) << __func__
<< " pg_trim_to now " << pg_trim_to
<< dendl
;
4504 ceph_assert(pg_trim_to
<= pg_log
.get_head());
4508 void PeeringState::apply_op_stats(
4509 const hobject_t
&soid
,
4510 const object_stat_sum_t
&delta_stats
)
4512 info
.stats
.stats
.add(delta_stats
);
4513 info
.stats
.stats
.floor(0);
4515 for (auto i
= get_backfill_targets().begin();
4516 i
!= get_backfill_targets().end();
4519 pg_info_t
& pinfo
= peer_info
[bt
];
4520 if (soid
<= pinfo
.last_backfill
)
4521 pinfo
.stats
.stats
.add(delta_stats
);
4525 void PeeringState::update_complete_backfill_object_stats(
4526 const hobject_t
&hoid
,
4527 const pg_stat_t
&stats
)
4529 for (auto &&bt
: get_backfill_targets()) {
4530 pg_info_t
& pinfo
= peer_info
[bt
];
4531 //Add stats to all peers that were missing object
4532 if (hoid
> pinfo
.last_backfill
)
4533 pinfo
.stats
.add(stats
);
4537 void PeeringState::update_peer_last_backfill(
4539 const hobject_t
&new_last_backfill
)
4541 pg_info_t
&pinfo
= peer_info
[peer
];
4542 pinfo
.last_backfill
= new_last_backfill
;
4543 if (new_last_backfill
.is_max()) {
4544 /* pinfo.stats might be wrong if we did log-based recovery on the
4545 * backfilled portion in addition to continuing backfill.
4547 pinfo
.stats
= info
.stats
;
4551 void PeeringState::set_revert_with_targets(
4552 const hobject_t
&soid
,
4553 const set
<pg_shard_t
> &good_peers
)
4555 for (auto &&peer
: good_peers
) {
4556 missing_loc
.add_location(soid
, peer
);
4560 void PeeringState::prepare_backfill_for_missing(
4561 const hobject_t
&soid
,
4562 const eversion_t
&version
,
4563 const vector
<pg_shard_t
> &targets
) {
4564 for (auto &&peer
: targets
) {
4565 peer_missing
[peer
].add(soid
, version
, eversion_t(), false);
4569 void PeeringState::update_hset(const pg_hit_set_history_t
&hset_history
)
4571 info
.hit_set
= hset_history
;
4574 /*------------ Peering State Machine----------------*/
4576 #define dout_prefix (context< PeeringMachine >().dpp->gen_prefix(*_dout) \
4577 << "state<" << get_state_name() << ">: ")
4579 #define psdout(x) ldout(context< PeeringMachine >().cct, x)
4581 #define DECLARE_LOCALS \
4582 PeeringState *ps = context< PeeringMachine >().state; \
4584 PeeringListener *pl = context< PeeringMachine >().pl; \
4588 /*------Crashed-------*/
4589 PeeringState::Crashed::Crashed(my_context ctx
)
4591 NamedState(context
< PeeringMachine
>().state_history
, "Crashed")
4593 context
< PeeringMachine
>().log_enter(state_name
);
4594 ceph_abort_msg("we got a bad state machine event");
4598 /*------Initial-------*/
4599 PeeringState::Initial::Initial(my_context ctx
)
4601 NamedState(context
< PeeringMachine
>().state_history
, "Initial")
4603 context
< PeeringMachine
>().log_enter(state_name
);
4606 boost::statechart::result
PeeringState::Initial::react(const MNotifyRec
& notify
)
4609 ps
->proc_replica_info(
4610 notify
.from
, notify
.notify
.info
, notify
.notify
.epoch_sent
);
4611 ps
->set_last_peering_reset();
4612 return transit
< Primary
>();
4615 boost::statechart::result
PeeringState::Initial::react(const MInfoRec
& i
)
4618 ceph_assert(!ps
->is_primary());
4620 return transit
< Stray
>();
4623 boost::statechart::result
PeeringState::Initial::react(const MLogRec
& i
)
4626 ceph_assert(!ps
->is_primary());
4628 return transit
< Stray
>();
4631 void PeeringState::Initial::exit()
4633 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
4635 utime_t dur
= ceph_clock_now() - enter_time
;
4636 pl
->get_peering_perf().tinc(rs_initial_latency
, dur
);
4639 /*------Started-------*/
4640 PeeringState::Started::Started(my_context ctx
)
4642 NamedState(context
< PeeringMachine
>().state_history
, "Started")
4644 context
< PeeringMachine
>().log_enter(state_name
);
4647 boost::statechart::result
4648 PeeringState::Started::react(const IntervalFlush
&)
4650 psdout(10) << "Ending blocked outgoing recovery messages" << dendl
;
4651 context
< PeeringMachine
>().state
->end_block_outgoing();
4652 return discard_event();
4655 boost::statechart::result
PeeringState::Started::react(const AdvMap
& advmap
)
4658 psdout(10) << "Started advmap" << dendl
;
4659 ps
->check_full_transition(advmap
.lastmap
, advmap
.osdmap
);
4660 if (ps
->should_restart_peering(
4662 advmap
.acting_primary
,
4667 psdout(10) << "should_restart_peering, transitioning to Reset"
4670 return transit
< Reset
>();
4672 ps
->remove_down_peer_info(advmap
.osdmap
);
4673 return discard_event();
4676 boost::statechart::result
PeeringState::Started::react(const QueryState
& q
)
4678 q
.f
->open_object_section("state");
4679 q
.f
->dump_string("name", state_name
);
4680 q
.f
->dump_stream("enter_time") << enter_time
;
4681 q
.f
->close_section();
4682 return discard_event();
4685 boost::statechart::result
PeeringState::Started::react(const QueryUnfound
& q
)
4687 q
.f
->dump_string("state", "Started");
4688 q
.f
->dump_bool("available_might_have_unfound", false);
4689 return discard_event();
4692 void PeeringState::Started::exit()
4694 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
4696 utime_t dur
= ceph_clock_now() - enter_time
;
4697 pl
->get_peering_perf().tinc(rs_started_latency
, dur
);
4698 ps
->state_clear(PG_STATE_WAIT
| PG_STATE_LAGGY
);
4701 /*--------Reset---------*/
4702 PeeringState::Reset::Reset(my_context ctx
)
4704 NamedState(context
< PeeringMachine
>().state_history
, "Reset")
4706 context
< PeeringMachine
>().log_enter(state_name
);
4709 ps
->flushes_in_progress
= 0;
4710 ps
->set_last_peering_reset();
4711 ps
->log_weirdness();
4714 boost::statechart::result
4715 PeeringState::Reset::react(const IntervalFlush
&)
4717 psdout(10) << "Ending blocked outgoing recovery messages" << dendl
;
4718 context
< PeeringMachine
>().state
->end_block_outgoing();
4719 return discard_event();
4722 boost::statechart::result
PeeringState::Reset::react(const AdvMap
& advmap
)
4725 psdout(10) << "Reset advmap" << dendl
;
4727 ps
->check_full_transition(advmap
.lastmap
, advmap
.osdmap
);
4729 if (ps
->should_restart_peering(
4731 advmap
.acting_primary
,
4736 psdout(10) << "should restart peering, calling start_peering_interval again"
4738 ps
->start_peering_interval(
4740 advmap
.newup
, advmap
.up_primary
,
4741 advmap
.newacting
, advmap
.acting_primary
,
4742 context
< PeeringMachine
>().get_cur_transaction());
4744 ps
->remove_down_peer_info(advmap
.osdmap
);
4745 ps
->check_past_interval_bounds();
4746 return discard_event();
4749 boost::statechart::result
PeeringState::Reset::react(const ActMap
&)
4752 if (ps
->should_send_notify() && ps
->get_primary().osd
>= 0) {
4753 ps
->info
.history
.refresh_prior_readable_until_ub(
4755 ps
->prior_readable_until_ub
);
4756 context
< PeeringMachine
>().send_notify(
4757 ps
->get_primary().osd
,
4759 ps
->get_primary().shard
, ps
->pg_whoami
.shard
,
4760 ps
->get_osdmap_epoch(),
4761 ps
->get_osdmap_epoch(),
4763 ps
->past_intervals
));
4766 ps
->update_heartbeat_peers();
4768 return transit
< Started
>();
4771 boost::statechart::result
PeeringState::Reset::react(const QueryState
& q
)
4773 q
.f
->open_object_section("state");
4774 q
.f
->dump_string("name", state_name
);
4775 q
.f
->dump_stream("enter_time") << enter_time
;
4776 q
.f
->close_section();
4777 return discard_event();
4780 boost::statechart::result
PeeringState::Reset::react(const QueryUnfound
& q
)
4782 q
.f
->dump_string("state", "Reset");
4783 q
.f
->dump_bool("available_might_have_unfound", false);
4784 return discard_event();
4787 void PeeringState::Reset::exit()
4789 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
4791 utime_t dur
= ceph_clock_now() - enter_time
;
4792 pl
->get_peering_perf().tinc(rs_reset_latency
, dur
);
4795 /*-------Start---------*/
4796 PeeringState::Start::Start(my_context ctx
)
4798 NamedState(context
< PeeringMachine
>().state_history
, "Start")
4800 context
< PeeringMachine
>().log_enter(state_name
);
4803 if (ps
->is_primary()) {
4804 psdout(1) << "transitioning to Primary" << dendl
;
4805 post_event(MakePrimary());
4807 psdout(1) << "transitioning to Stray" << dendl
;
4808 post_event(MakeStray());
4812 void PeeringState::Start::exit()
4814 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
4816 utime_t dur
= ceph_clock_now() - enter_time
;
4817 pl
->get_peering_perf().tinc(rs_start_latency
, dur
);
4820 /*---------Primary--------*/
4821 PeeringState::Primary::Primary(my_context ctx
)
4823 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary")
4825 context
< PeeringMachine
>().log_enter(state_name
);
4827 ceph_assert(ps
->want_acting
.empty());
4829 // set CREATING bit until we have peered for the first time.
4830 if (ps
->info
.history
.last_epoch_started
== 0) {
4831 ps
->state_set(PG_STATE_CREATING
);
4832 // use the history timestamp, which ultimately comes from the
4833 // monitor in the create case.
4834 utime_t t
= ps
->info
.history
.last_scrub_stamp
;
4835 ps
->info
.stats
.last_fresh
= t
;
4836 ps
->info
.stats
.last_active
= t
;
4837 ps
->info
.stats
.last_change
= t
;
4838 ps
->info
.stats
.last_peered
= t
;
4839 ps
->info
.stats
.last_clean
= t
;
4840 ps
->info
.stats
.last_unstale
= t
;
4841 ps
->info
.stats
.last_undegraded
= t
;
4842 ps
->info
.stats
.last_fullsized
= t
;
4843 ps
->info
.stats
.last_scrub_stamp
= t
;
4844 ps
->info
.stats
.last_deep_scrub_stamp
= t
;
4845 ps
->info
.stats
.last_clean_scrub_stamp
= t
;
4849 boost::statechart::result
PeeringState::Primary::react(const MNotifyRec
& notevt
)
4852 psdout(7) << "handle_pg_notify from osd." << notevt
.from
<< dendl
;
4853 ps
->proc_replica_info(
4854 notevt
.from
, notevt
.notify
.info
, notevt
.notify
.epoch_sent
);
4855 return discard_event();
4858 boost::statechart::result
PeeringState::Primary::react(const ActMap
&)
4861 psdout(7) << "handle ActMap primary" << dendl
;
4862 pl
->publish_stats_to_osd();
4863 return discard_event();
4866 boost::statechart::result
PeeringState::Primary::react(
4867 const SetForceRecovery
&)
4870 ps
->set_force_recovery(true);
4871 return discard_event();
4874 boost::statechart::result
PeeringState::Primary::react(
4875 const UnsetForceRecovery
&)
4878 ps
->set_force_recovery(false);
4879 return discard_event();
4882 boost::statechart::result
PeeringState::Primary::react(
4883 const RequestScrub
& evt
)
4886 if (ps
->is_primary()) {
4887 pl
->scrub_requested(evt
.deep
, evt
.repair
);
4888 psdout(10) << "marking for scrub" << dendl
;
4890 return discard_event();
4893 boost::statechart::result
PeeringState::Primary::react(
4894 const SetForceBackfill
&)
4897 ps
->set_force_backfill(true);
4898 return discard_event();
4901 boost::statechart::result
PeeringState::Primary::react(
4902 const UnsetForceBackfill
&)
4905 ps
->set_force_backfill(false);
4906 return discard_event();
4909 void PeeringState::Primary::exit()
4911 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
4913 ps
->want_acting
.clear();
4914 utime_t dur
= ceph_clock_now() - enter_time
;
4915 pl
->get_peering_perf().tinc(rs_primary_latency
, dur
);
4916 pl
->clear_primary_state();
4917 ps
->state_clear(PG_STATE_CREATING
);
4920 /*---------Peering--------*/
4921 PeeringState::Peering::Peering(my_context ctx
)
4923 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Peering"),
4924 history_les_bound(false)
4926 context
< PeeringMachine
>().log_enter(state_name
);
4929 ceph_assert(!ps
->is_peered());
4930 ceph_assert(!ps
->is_peering());
4931 ceph_assert(ps
->is_primary());
4932 ps
->state_set(PG_STATE_PEERING
);
4935 boost::statechart::result
PeeringState::Peering::react(const AdvMap
& advmap
)
4938 psdout(10) << "Peering advmap" << dendl
;
4939 if (prior_set
.affected_by_map(*(advmap
.osdmap
), ps
->dpp
)) {
4940 psdout(1) << "Peering, affected_by_map, going to Reset" << dendl
;
4942 return transit
< Reset
>();
4945 ps
->adjust_need_up_thru(advmap
.osdmap
);
4946 ps
->check_prior_readable_down_osds(advmap
.osdmap
);
4948 return forward_event();
4951 boost::statechart::result
PeeringState::Peering::react(const QueryState
& q
)
4955 q
.f
->open_object_section("state");
4956 q
.f
->dump_string("name", state_name
);
4957 q
.f
->dump_stream("enter_time") << enter_time
;
4959 q
.f
->open_array_section("past_intervals");
4960 ps
->past_intervals
.dump(q
.f
);
4961 q
.f
->close_section();
4963 q
.f
->open_array_section("probing_osds");
4964 for (auto p
= prior_set
.probe
.begin(); p
!= prior_set
.probe
.end(); ++p
)
4965 q
.f
->dump_stream("osd") << *p
;
4966 q
.f
->close_section();
4968 if (prior_set
.pg_down
)
4969 q
.f
->dump_string("blocked", "peering is blocked due to down osds");
4971 q
.f
->open_array_section("down_osds_we_would_probe");
4972 for (auto p
= prior_set
.down
.begin(); p
!= prior_set
.down
.end(); ++p
)
4973 q
.f
->dump_int("osd", *p
);
4974 q
.f
->close_section();
4976 q
.f
->open_array_section("peering_blocked_by");
4977 for (auto p
= prior_set
.blocked_by
.begin();
4978 p
!= prior_set
.blocked_by
.end();
4980 q
.f
->open_object_section("osd");
4981 q
.f
->dump_int("osd", p
->first
);
4982 q
.f
->dump_int("current_lost_at", p
->second
);
4983 q
.f
->dump_string("comment", "starting or marking this osd lost may let us proceed");
4984 q
.f
->close_section();
4986 q
.f
->close_section();
4988 if (history_les_bound
) {
4989 q
.f
->open_array_section("peering_blocked_by_detail");
4990 q
.f
->open_object_section("item");
4991 q
.f
->dump_string("detail","peering_blocked_by_history_les_bound");
4992 q
.f
->close_section();
4993 q
.f
->close_section();
4996 q
.f
->close_section();
4997 return forward_event();
5000 boost::statechart::result
PeeringState::Peering::react(const QueryUnfound
& q
)
5002 q
.f
->dump_string("state", "Peering");
5003 q
.f
->dump_bool("available_might_have_unfound", false);
5004 return discard_event();
5007 void PeeringState::Peering::exit()
5011 psdout(10) << "Leaving Peering" << dendl
;
5012 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5013 ps
->state_clear(PG_STATE_PEERING
);
5014 pl
->clear_probe_targets();
5016 utime_t dur
= ceph_clock_now() - enter_time
;
5017 pl
->get_peering_perf().tinc(rs_peering_latency
, dur
);
5021 /*------Backfilling-------*/
5022 PeeringState::Backfilling::Backfilling(my_context ctx
)
5024 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/Backfilling")
5026 context
< PeeringMachine
>().log_enter(state_name
);
5030 ps
->backfill_reserved
= true;
5031 pl
->on_backfill_reserved();
5032 ps
->state_clear(PG_STATE_BACKFILL_TOOFULL
);
5033 ps
->state_clear(PG_STATE_BACKFILL_WAIT
);
5034 ps
->state_set(PG_STATE_BACKFILLING
);
5035 pl
->publish_stats_to_osd();
5038 void PeeringState::Backfilling::backfill_release_reservations()
5041 pl
->cancel_local_background_io_reservation();
5042 for (auto it
= ps
->backfill_targets
.begin();
5043 it
!= ps
->backfill_targets
.end();
5045 ceph_assert(*it
!= ps
->pg_whoami
);
5046 pl
->send_cluster_message(
5048 make_message
<MBackfillReserve
>(
5049 MBackfillReserve::RELEASE
,
5050 spg_t(ps
->info
.pgid
.pgid
, it
->shard
),
5051 ps
->get_osdmap_epoch()),
5052 ps
->get_osdmap_epoch());
5056 void PeeringState::Backfilling::cancel_backfill()
5059 backfill_release_reservations();
5060 pl
->on_backfill_canceled();
5063 boost::statechart::result
5064 PeeringState::Backfilling::react(const Backfilled
&c
)
5066 backfill_release_reservations();
5067 return transit
<Recovered
>();
5070 boost::statechart::result
5071 PeeringState::Backfilling::react(const DeferBackfill
&c
)
5075 psdout(10) << "defer backfill, retry delay " << c
.delay
<< dendl
;
5076 ps
->state_set(PG_STATE_BACKFILL_WAIT
);
5077 ps
->state_clear(PG_STATE_BACKFILLING
);
5080 pl
->schedule_event_after(
5081 std::make_shared
<PGPeeringEvent
>(
5082 ps
->get_osdmap_epoch(),
5083 ps
->get_osdmap_epoch(),
5086 return transit
<NotBackfilling
>();
5089 boost::statechart::result
5090 PeeringState::Backfilling::react(const UnfoundBackfill
&c
)
5093 psdout(10) << "backfill has unfound, can't continue" << dendl
;
5094 ps
->state_set(PG_STATE_BACKFILL_UNFOUND
);
5095 ps
->state_clear(PG_STATE_BACKFILLING
);
5097 return transit
<NotBackfilling
>();
5100 boost::statechart::result
5101 PeeringState::Backfilling::react(const RemoteReservationRevokedTooFull
&)
5105 ps
->state_set(PG_STATE_BACKFILL_TOOFULL
);
5106 ps
->state_clear(PG_STATE_BACKFILLING
);
5109 pl
->schedule_event_after(
5110 std::make_shared
<PGPeeringEvent
>(
5111 ps
->get_osdmap_epoch(),
5112 ps
->get_osdmap_epoch(),
5114 ps
->cct
->_conf
->osd_backfill_retry_interval
);
5116 return transit
<NotBackfilling
>();
5119 boost::statechart::result
5120 PeeringState::Backfilling::react(const RemoteReservationRevoked
&)
5123 ps
->state_set(PG_STATE_BACKFILL_WAIT
);
5125 if (ps
->needs_backfill()) {
5126 return transit
<WaitLocalBackfillReserved
>();
5128 // raced with MOSDPGBackfill::OP_BACKFILL_FINISH, ignore
5129 return discard_event();
5133 void PeeringState::Backfilling::exit()
5135 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5137 ps
->backfill_reserved
= false;
5138 ps
->state_clear(PG_STATE_BACKFILLING
);
5139 ps
->state_clear(PG_STATE_FORCED_BACKFILL
| PG_STATE_FORCED_RECOVERY
);
5140 utime_t dur
= ceph_clock_now() - enter_time
;
5141 pl
->get_peering_perf().tinc(rs_backfilling_latency
, dur
);
5144 /*--WaitRemoteBackfillReserved--*/
5146 PeeringState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_context ctx
)
5148 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/WaitRemoteBackfillReserved"),
5149 backfill_osd_it(context
< Active
>().remote_shards_to_reserve_backfill
.begin())
5151 context
< PeeringMachine
>().log_enter(state_name
);
5154 ps
->state_set(PG_STATE_BACKFILL_WAIT
);
5155 pl
->publish_stats_to_osd();
5156 post_event(RemoteBackfillReserved());
5159 boost::statechart::result
5160 PeeringState::WaitRemoteBackfillReserved::react(const RemoteBackfillReserved
&evt
)
5164 int64_t num_bytes
= ps
->info
.stats
.stats
.sum
.num_bytes
;
5165 psdout(10) << __func__
<< " num_bytes " << num_bytes
<< dendl
;
5166 if (backfill_osd_it
!=
5167 context
< Active
>().remote_shards_to_reserve_backfill
.end()) {
5168 // The primary never backfills itself
5169 ceph_assert(*backfill_osd_it
!= ps
->pg_whoami
);
5170 pl
->send_cluster_message(
5171 backfill_osd_it
->osd
,
5172 make_message
<MBackfillReserve
>(
5173 MBackfillReserve::REQUEST
,
5174 spg_t(context
< PeeringMachine
>().spgid
.pgid
, backfill_osd_it
->shard
),
5175 ps
->get_osdmap_epoch(),
5176 ps
->get_backfill_priority(),
5178 ps
->peer_bytes
[*backfill_osd_it
]),
5179 ps
->get_osdmap_epoch());
5182 ps
->peer_bytes
.clear();
5183 post_event(AllBackfillsReserved());
5185 return discard_event();
5188 void PeeringState::WaitRemoteBackfillReserved::exit()
5190 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5193 utime_t dur
= ceph_clock_now() - enter_time
;
5194 pl
->get_peering_perf().tinc(rs_waitremotebackfillreserved_latency
, dur
);
5197 void PeeringState::WaitRemoteBackfillReserved::retry()
5200 pl
->cancel_local_background_io_reservation();
5202 // Send CANCEL to all previously acquired reservations
5203 set
<pg_shard_t
>::const_iterator it
, begin
, end
;
5204 begin
= context
< Active
>().remote_shards_to_reserve_backfill
.begin();
5205 end
= context
< Active
>().remote_shards_to_reserve_backfill
.end();
5206 ceph_assert(begin
!= end
);
5207 for (it
= begin
; it
!= backfill_osd_it
; ++it
) {
5208 // The primary never backfills itself
5209 ceph_assert(*it
!= ps
->pg_whoami
);
5210 pl
->send_cluster_message(
5212 make_message
<MBackfillReserve
>(
5213 MBackfillReserve::RELEASE
,
5214 spg_t(context
< PeeringMachine
>().spgid
.pgid
, it
->shard
),
5215 ps
->get_osdmap_epoch()),
5216 ps
->get_osdmap_epoch());
5219 ps
->state_clear(PG_STATE_BACKFILL_WAIT
);
5220 pl
->publish_stats_to_osd();
5222 pl
->schedule_event_after(
5223 std::make_shared
<PGPeeringEvent
>(
5224 ps
->get_osdmap_epoch(),
5225 ps
->get_osdmap_epoch(),
5227 ps
->cct
->_conf
->osd_backfill_retry_interval
);
5230 boost::statechart::result
5231 PeeringState::WaitRemoteBackfillReserved::react(const RemoteReservationRejectedTooFull
&evt
)
5234 ps
->state_set(PG_STATE_BACKFILL_TOOFULL
);
5236 return transit
<NotBackfilling
>();
5239 boost::statechart::result
5240 PeeringState::WaitRemoteBackfillReserved::react(const RemoteReservationRevoked
&evt
)
5243 return transit
<NotBackfilling
>();
5246 /*--WaitLocalBackfillReserved--*/
5247 PeeringState::WaitLocalBackfillReserved::WaitLocalBackfillReserved(my_context ctx
)
5249 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/WaitLocalBackfillReserved")
5251 context
< PeeringMachine
>().log_enter(state_name
);
5254 ps
->state_set(PG_STATE_BACKFILL_WAIT
);
5255 pl
->request_local_background_io_reservation(
5256 ps
->get_backfill_priority(),
5257 std::make_unique
<PGPeeringEvent
>(
5258 ps
->get_osdmap_epoch(),
5259 ps
->get_osdmap_epoch(),
5260 LocalBackfillReserved()),
5261 std::make_unique
<PGPeeringEvent
>(
5262 ps
->get_osdmap_epoch(),
5263 ps
->get_osdmap_epoch(),
5264 DeferBackfill(0.0)));
5265 pl
->publish_stats_to_osd();
5268 void PeeringState::WaitLocalBackfillReserved::exit()
5270 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5272 utime_t dur
= ceph_clock_now() - enter_time
;
5273 pl
->get_peering_perf().tinc(rs_waitlocalbackfillreserved_latency
, dur
);
5276 /*----NotBackfilling------*/
5277 PeeringState::NotBackfilling::NotBackfilling(my_context ctx
)
5279 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/NotBackfilling")
5281 context
< PeeringMachine
>().log_enter(state_name
);
5283 ps
->state_clear(PG_STATE_REPAIR
);
5284 pl
->publish_stats_to_osd();
5287 boost::statechart::result
PeeringState::NotBackfilling::react(const QueryUnfound
& q
)
5291 ps
->query_unfound(q
.f
, "NotBackfilling");
5292 return discard_event();
5295 boost::statechart::result
5296 PeeringState::NotBackfilling::react(const RemoteBackfillReserved
&evt
)
5298 return discard_event();
5301 boost::statechart::result
5302 PeeringState::NotBackfilling::react(const RemoteReservationRejectedTooFull
&evt
)
5304 return discard_event();
5307 void PeeringState::NotBackfilling::exit()
5309 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5312 ps
->state_clear(PG_STATE_BACKFILL_UNFOUND
);
5313 utime_t dur
= ceph_clock_now() - enter_time
;
5314 pl
->get_peering_perf().tinc(rs_notbackfilling_latency
, dur
);
5317 /*----NotRecovering------*/
5318 PeeringState::NotRecovering::NotRecovering(my_context ctx
)
5320 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/NotRecovering")
5322 context
< PeeringMachine
>().log_enter(state_name
);
5324 ps
->state_clear(PG_STATE_REPAIR
);
5325 pl
->publish_stats_to_osd();
5328 boost::statechart::result
PeeringState::NotRecovering::react(const QueryUnfound
& q
)
5332 ps
->query_unfound(q
.f
, "NotRecovering");
5333 return discard_event();
5336 void PeeringState::NotRecovering::exit()
5338 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5341 ps
->state_clear(PG_STATE_RECOVERY_UNFOUND
);
5342 utime_t dur
= ceph_clock_now() - enter_time
;
5343 pl
->get_peering_perf().tinc(rs_notrecovering_latency
, dur
);
5346 /*---RepNotRecovering----*/
5347 PeeringState::RepNotRecovering::RepNotRecovering(my_context ctx
)
5349 NamedState(context
< PeeringMachine
>().state_history
, "Started/ReplicaActive/RepNotRecovering")
5351 context
< PeeringMachine
>().log_enter(state_name
);
5354 boost::statechart::result
5355 PeeringState::RepNotRecovering::react(const RejectTooFullRemoteReservation
&evt
)
5358 ps
->reject_reservation();
5359 post_event(RemoteReservationRejectedTooFull());
5360 return discard_event();
5363 void PeeringState::RepNotRecovering::exit()
5365 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5367 utime_t dur
= ceph_clock_now() - enter_time
;
5368 pl
->get_peering_perf().tinc(rs_repnotrecovering_latency
, dur
);
5371 /*---RepWaitRecoveryReserved--*/
5372 PeeringState::RepWaitRecoveryReserved::RepWaitRecoveryReserved(my_context ctx
)
5374 NamedState(context
< PeeringMachine
>().state_history
, "Started/ReplicaActive/RepWaitRecoveryReserved")
5376 context
< PeeringMachine
>().log_enter(state_name
);
5379 boost::statechart::result
5380 PeeringState::RepWaitRecoveryReserved::react(const RemoteRecoveryReserved
&evt
)
5383 pl
->send_cluster_message(
5385 make_message
<MRecoveryReserve
>(
5386 MRecoveryReserve::GRANT
,
5387 spg_t(ps
->info
.pgid
.pgid
, ps
->primary
.shard
),
5388 ps
->get_osdmap_epoch()),
5389 ps
->get_osdmap_epoch());
5390 return transit
<RepRecovering
>();
5393 boost::statechart::result
5394 PeeringState::RepWaitRecoveryReserved::react(
5395 const RemoteReservationCanceled
&evt
)
5398 pl
->unreserve_recovery_space();
5400 pl
->cancel_remote_recovery_reservation();
5401 return transit
<RepNotRecovering
>();
// State exit hook: logs the exit and records the time spent in this state
// under the rs_repwaitrecoveryreserved_latency perf counter.
5404 void PeeringState::RepWaitRecoveryReserved::exit()
5406 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5408 utime_t dur
= ceph_clock_now() - enter_time
;
5409 pl
->get_peering_perf().tinc(rs_repwaitrecoveryreserved_latency
, dur
);
5412 /*-RepWaitBackfillReserved*/
// Constructor for the RepWaitBackfillReserved state. Registers the state
// under the name "Started/ReplicaActive/RepWaitBackfillReserved" and logs
// entry into the state.
5413 PeeringState::RepWaitBackfillReserved::RepWaitBackfillReserved(my_context ctx
)
5415 NamedState(context
< PeeringMachine
>().state_history
, "Started/ReplicaActive/RepWaitBackfillReserved")
5417 context
< PeeringMachine
>().log_enter(state_name
);
// Reaction to RequestBackfillPrio while in RepNotRecovering.
//
// If recovery space for the backfill cannot be reserved
// (try_reserve_recovery_space() with the byte counts carried by the event),
// RejectTooFullRemoteReservation is posted. A remote recovery reservation is
// requested, to be answered with RemoteBackfillReserved, and the state
// machine transitions to RepWaitBackfillReserved.
// NOTE(review): the branch structure separating the "too full" path from the
// reservation request, and the priority argument of the request, are not
// visible in this listing.
5420 boost::statechart::result
5421 PeeringState::RepNotRecovering::react(const RequestBackfillPrio
&evt
)
5426 if (!pl
->try_reserve_recovery_space(
5427 evt
.primary_num_bytes
, evt
.local_num_bytes
)) {
5428 post_event(RejectTooFullRemoteReservation());
// The preemption event stays null unless every up/acting peer advertises
// RECOVERY_RESERVATION_2.
5430 PGPeeringEventURef preempt
;
5431 if (HAVE_FEATURE(ps
->upacting_features
, RECOVERY_RESERVATION_2
)) {
5432 // older peers will interpret preemption as TOOFULL
5433 preempt
= std::make_unique
<PGPeeringEvent
>(
5434 pl
->get_osdmap_epoch(),
5435 pl
->get_osdmap_epoch(),
5436 RemoteBackfillPreempted());
5438 pl
->request_remote_recovery_reservation(
5440 std::make_unique
<PGPeeringEvent
>(
5441 pl
->get_osdmap_epoch(),
5442 pl
->get_osdmap_epoch(),
5443 RemoteBackfillReserved()),
5444 std::move(preempt
));
5446 return transit
<RepWaitBackfillReserved
>();
// Reaction to RequestRecoveryPrio while in RepNotRecovering.
//
// Picks a reservation priority, requests a remote recovery reservation to be
// answered with RemoteRecoveryReserved (plus an optional preemption event),
// and transitions to RepWaitRecoveryReserved.
// NOTE(review): the priority argument of the reservation request is not
// visible in this listing.
5449 boost::statechart::result
5450 PeeringState::RepNotRecovering::react(const RequestRecoveryPrio
&evt
)
5454 // fall back to a local reckoning of priority if the primary doesn't pass one
5455 // (pre-mimic compat)
5456 int prio
= evt
.priority
? evt
.priority
: ps
->get_recovery_priority();
// The preemption event stays null unless every up/acting peer advertises
// RECOVERY_RESERVATION_2.
5458 PGPeeringEventURef preempt
;
5459 if (HAVE_FEATURE(ps
->upacting_features
, RECOVERY_RESERVATION_2
)) {
5460 // older peers can't handle this
5461 preempt
= std::make_unique
<PGPeeringEvent
>(
5462 ps
->get_osdmap_epoch(),
5463 ps
->get_osdmap_epoch(),
5464 RemoteRecoveryPreempted());
5467 pl
->request_remote_recovery_reservation(
5469 std::make_unique
<PGPeeringEvent
>(
5470 ps
->get_osdmap_epoch(),
5471 ps
->get_osdmap_epoch(),
5472 RemoteRecoveryReserved()),
5473 std::move(preempt
));
5474 return transit
<RepWaitRecoveryReserved
>();
// State exit hook: logs the exit and records the time spent in this state
// under the rs_repwaitbackfillreserved_latency perf counter.
5477 void PeeringState::RepWaitBackfillReserved::exit()
5479 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5481 utime_t dur
= ceph_clock_now() - enter_time
;
5482 pl
->get_peering_perf().tinc(rs_repwaitbackfillreserved_latency
, dur
);
// Reaction to RemoteBackfillReserved: sends an MBackfillReserve::GRANT
// cluster message built for the primary's shard of this PG, stamped with the
// current osdmap epoch, then transitions to RepRecovering.
// NOTE(review): the destination argument of send_cluster_message() is not
// visible in this listing.
5485 boost::statechart::result
5486 PeeringState::RepWaitBackfillReserved::react(const RemoteBackfillReserved
&evt
)
5491 pl
->send_cluster_message(
5493 make_message
<MBackfillReserve
>(
5494 MBackfillReserve::GRANT
,
5495 spg_t(ps
->info
.pgid
.pgid
, ps
->primary
.shard
),
5496 ps
->get_osdmap_epoch()),
5497 ps
->get_osdmap_epoch());
5498 return transit
<RepRecovering
>();
// Reaction to RejectTooFullRemoteReservation: calls ps->reject_reservation(),
// posts RemoteReservationRejectedTooFull, and discards the incoming event.
5501 boost::statechart::result
5502 PeeringState::RepWaitBackfillReserved::react(
5503 const RejectTooFullRemoteReservation
&evt
)
5506 ps
->reject_reservation();
5507 post_event(RemoteReservationRejectedTooFull());
5508 return discard_event();
// Reaction to RemoteReservationRejectedTooFull: releases the reserved
// recovery space, cancels the remote recovery reservation, and transitions
// back to RepNotRecovering.
5511 boost::statechart::result
5512 PeeringState::RepWaitBackfillReserved::react(
5513 const RemoteReservationRejectedTooFull
&evt
)
5516 pl
->unreserve_recovery_space();
5518 pl
->cancel_remote_recovery_reservation();
5519 return transit
<RepNotRecovering
>();
// Reaction to RemoteReservationCanceled: releases the reserved recovery
// space, cancels the remote recovery reservation, and transitions back to
// RepNotRecovering (same handling as RemoteReservationRejectedTooFull).
5522 boost::statechart::result
5523 PeeringState::RepWaitBackfillReserved::react(
5524 const RemoteReservationCanceled
&evt
)
5527 pl
->unreserve_recovery_space();
5529 pl
->cancel_remote_recovery_reservation();
5530 return transit
<RepNotRecovering
>();
5533 /*---RepRecovering-------*/
// Constructor for the RepRecovering state. Registers the state under the
// name "Started/ReplicaActive/RepRecovering" and logs entry into the state.
5534 PeeringState::RepRecovering::RepRecovering(my_context ctx
)
5536 NamedState(context
< PeeringMachine
>().state_history
, "Started/ReplicaActive/RepRecovering")
5538 context
< PeeringMachine
>().log_enter(state_name
);
// Reaction to RemoteRecoveryPreempted: releases the reserved recovery space
// and sends an MRecoveryReserve::REVOKE cluster message built for the
// primary's shard of this PG, then discards the event.
// NOTE(review): the destination argument of send_cluster_message() is not
// visible in this listing.
5541 boost::statechart::result
5542 PeeringState::RepRecovering::react(const RemoteRecoveryPreempted
&)
5547 pl
->unreserve_recovery_space();
5548 pl
->send_cluster_message(
5550 make_message
<MRecoveryReserve
>(
5551 MRecoveryReserve::REVOKE
,
5552 spg_t(ps
->info
.pgid
.pgid
, ps
->primary
.shard
),
5553 ps
->get_osdmap_epoch()),
5554 ps
->get_osdmap_epoch());
5555 return discard_event();
// Reaction to BackfillTooFull: releases the reserved recovery space and
// sends an MBackfillReserve::REVOKE_TOOFULL cluster message built for the
// primary's shard of this PG, then discards the event.
// NOTE(review): the destination argument of send_cluster_message() is not
// visible in this listing.
5558 boost::statechart::result
5559 PeeringState::RepRecovering::react(const BackfillTooFull
&)
5564 pl
->unreserve_recovery_space();
5565 pl
->send_cluster_message(
5567 make_message
<MBackfillReserve
>(
5568 MBackfillReserve::REVOKE_TOOFULL
,
5569 spg_t(ps
->info
.pgid
.pgid
, ps
->primary
.shard
),
5570 ps
->get_osdmap_epoch()),
5571 ps
->get_osdmap_epoch());
5572 return discard_event();
// Reaction to RemoteBackfillPreempted: releases the reserved recovery space
// and sends an MBackfillReserve::REVOKE cluster message built for the
// primary's shard of this PG, then discards the event.
// NOTE(review): the destination argument of send_cluster_message() is not
// visible in this listing.
5575 boost::statechart::result
5576 PeeringState::RepRecovering::react(const RemoteBackfillPreempted
&)
5581 pl
->unreserve_recovery_space();
5582 pl
->send_cluster_message(
5584 make_message
<MBackfillReserve
>(
5585 MBackfillReserve::REVOKE
,
5586 spg_t(ps
->info
.pgid
.pgid
, ps
->primary
.shard
),
5587 ps
->get_osdmap_epoch()),
5588 ps
->get_osdmap_epoch());
5589 return discard_event();
// State exit hook: logs the exit, releases the reserved recovery space,
// cancels the remote recovery reservation, and records the time spent in
// this state under the rs_reprecovering_latency perf counter.
5592 void PeeringState::RepRecovering::exit()
5594 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5596 pl
->unreserve_recovery_space();
5598 pl
->cancel_remote_recovery_reservation();
5599 utime_t dur
= ceph_clock_now() - enter_time
;
5600 pl
->get_peering_perf().tinc(rs_reprecovering_latency
, dur
);
5603 /*------Activating--------*/
// Constructor for the Activating state. Registers the state under the name
// "Started/Primary/Active/Activating" and logs entry into the state.
5604 PeeringState::Activating::Activating(my_context ctx
)
5606 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/Activating")
5608 context
< PeeringMachine
>().log_enter(state_name
);
// State exit hook: logs the exit and records the time spent in this state
// under the rs_activating_latency perf counter.
5611 void PeeringState::Activating::exit()
5613 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5615 utime_t dur
= ceph_clock_now() - enter_time
;
5616 pl
->get_peering_perf().tinc(rs_activating_latency
, dur
);
// Constructor for the WaitLocalRecoveryReserved state
// ("Started/Primary/Active/WaitLocalRecoveryReserved").
//
// Unless osd_debug_skip_full_check_in_recovery is set, the osdmap is checked
// for full OSDs among acting_recovery_backfill; if any is full,
// RecoveryTooFull is posted. Otherwise the PG is marked RECOVERY_WAIT and a
// local background I/O reservation is requested at the PG's recovery
// priority, carrying a LocalRecoveryReserved event and a DeferRecovery(0.0)
// event; the stats are then published.
// NOTE(review): the statement that ends the "too full" path early is not
// visible in this listing.
5619 PeeringState::WaitLocalRecoveryReserved::WaitLocalRecoveryReserved(my_context ctx
)
5621 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/WaitLocalRecoveryReserved")
5623 context
< PeeringMachine
>().log_enter(state_name
);
5626 // Make sure all nodes that are part of the recovery aren't full
5627 if (!ps
->cct
->_conf
->osd_debug_skip_full_check_in_recovery
&&
5628 ps
->get_osdmap()->check_full(ps
->acting_recovery_backfill
)) {
5629 post_event(RecoveryTooFull());
5633 ps
->state_clear(PG_STATE_RECOVERY_TOOFULL
);
5634 ps
->state_set(PG_STATE_RECOVERY_WAIT
);
5635 pl
->request_local_background_io_reservation(
5636 ps
->get_recovery_priority(),
5637 std::make_unique
<PGPeeringEvent
>(
5638 ps
->get_osdmap_epoch(),
5639 ps
->get_osdmap_epoch(),
5640 LocalRecoveryReserved()),
5641 std::make_unique
<PGPeeringEvent
>(
5642 ps
->get_osdmap_epoch(),
5643 ps
->get_osdmap_epoch(),
5644 DeferRecovery(0.0)));
5645 pl
->publish_stats_to_osd();
// Reaction to RecoveryTooFull: marks the PG RECOVERY_TOOFULL, schedules a
// peering event to fire after osd_recovery_retry_interval, and transitions
// to NotRecovering.
// NOTE(review): the payload of the scheduled event is not visible in this
// listing.
5648 boost::statechart::result
5649 PeeringState::WaitLocalRecoveryReserved::react(const RecoveryTooFull
&evt
)
5652 ps
->state_set(PG_STATE_RECOVERY_TOOFULL
);
5653 pl
->schedule_event_after(
5654 std::make_shared
<PGPeeringEvent
>(
5655 ps
->get_osdmap_epoch(),
5656 ps
->get_osdmap_epoch(),
5658 ps
->cct
->_conf
->osd_recovery_retry_interval
);
5659 return transit
<NotRecovering
>();
// State exit hook: logs the exit and records the time spent in this state
// under the rs_waitlocalrecoveryreserved_latency perf counter.
5662 void PeeringState::WaitLocalRecoveryReserved::exit()
5664 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5666 utime_t dur
= ceph_clock_now() - enter_time
;
5667 pl
->get_peering_perf().tinc(rs_waitlocalrecoveryreserved_latency
, dur
);
// Constructor for the WaitRemoteRecoveryReserved state
// ("Started/Primary/Active/WaitRemoteRecoveryReserved").
//
// Points remote_recovery_reservation_it at the first entry of the Active
// state's remote_shards_to_reserve_recovery set and posts an initial
// RemoteRecoveryReserved event to start working through the set.
5670 PeeringState::WaitRemoteRecoveryReserved::WaitRemoteRecoveryReserved(my_context ctx
)
5672 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/WaitRemoteRecoveryReserved"),
5673 remote_recovery_reservation_it(context
< Active
>().remote_shards_to_reserve_recovery
.begin())
5675 context
< PeeringMachine
>().log_enter(state_name
);
5676 post_event(RemoteRecoveryReserved());
// Reaction to RemoteRecoveryReserved.
//
// While the iterator has not reached the end of the Active state's
// remote_shards_to_reserve_recovery set, sends an
// MRecoveryReserve::REQUEST (with the PG's recovery priority) to the shard
// the iterator points at and advances the iterator. AllRemotesReserved is
// posted once there is nothing left to request; the event is always
// discarded.
// NOTE(review): the `else` separating the two paths is not visible in this
// listing; presumably AllRemotesReserved is only posted at end-of-set.
5679 boost::statechart::result
5680 PeeringState::WaitRemoteRecoveryReserved::react(const RemoteRecoveryReserved
&evt
) {
5683 if (remote_recovery_reservation_it
!=
5684 context
< Active
>().remote_shards_to_reserve_recovery
.end()) {
// The reservation set must never contain this OSD's own shard.
5685 ceph_assert(*remote_recovery_reservation_it
!= ps
->pg_whoami
);
5686 pl
->send_cluster_message(
5687 remote_recovery_reservation_it
->osd
,
5688 make_message
<MRecoveryReserve
>(
5689 MRecoveryReserve::REQUEST
,
5690 spg_t(context
< PeeringMachine
>().spgid
.pgid
,
5691 remote_recovery_reservation_it
->shard
),
5692 ps
->get_osdmap_epoch(),
5693 ps
->get_recovery_priority()),
5694 ps
->get_osdmap_epoch());
5695 ++remote_recovery_reservation_it
;
5697 post_event(AllRemotesReserved());
5699 return discard_event();
// State exit hook: logs the exit and records the time spent in this state
// under the rs_waitremoterecoveryreserved_latency perf counter.
5702 void PeeringState::WaitRemoteRecoveryReserved::exit()
5704 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5706 utime_t dur
= ceph_clock_now() - enter_time
;
5707 pl
->get_peering_perf().tinc(rs_waitremoterecoveryreserved_latency
, dur
);
// Constructor for the Recovering state ("Started/Primary/Active/Recovering").
//
// Clears RECOVERY_WAIT and RECOVERY_TOOFULL, sets RECOVERING, notifies the
// listener that recovery has been reserved, and publishes stats. The PG must
// no longer be in the ACTIVATING state at this point (asserted).
5710 PeeringState::Recovering::Recovering(my_context ctx
)
5712 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/Recovering")
5714 context
< PeeringMachine
>().log_enter(state_name
);
5717 ps
->state_clear(PG_STATE_RECOVERY_WAIT
);
5718 ps
->state_clear(PG_STATE_RECOVERY_TOOFULL
);
5719 ps
->state_set(PG_STATE_RECOVERING
);
5720 pl
->on_recovery_reserved();
5721 ceph_assert(!ps
->state_test(PG_STATE_ACTIVATING
));
5722 pl
->publish_stats_to_osd();
// Releases the recovery reservations held on remote shards.
//
// cancel: when false, the local pg_log must have no missing objects
//         (asserted); pass true when recovery is being abandoned early.
//
// Iterates the Active state's remote_shards_to_reserve_recovery set and
// sends an MRecoveryReserve::RELEASE message for each shard other than this
// OSD's own.
// NOTE(review): the loop increment, the statement taken for the "skip
// myself" case, and the destination argument of send_cluster_message() are
// not visible in this listing.
5725 void PeeringState::Recovering::release_reservations(bool cancel
)
5728 ceph_assert(cancel
|| !ps
->pg_log
.get_missing().have_missing());
5730 // release remote reservations
5731 for (auto i
= context
< Active
>().remote_shards_to_reserve_recovery
.begin();
5732 i
!= context
< Active
>().remote_shards_to_reserve_recovery
.end();
5734 if (*i
== ps
->pg_whoami
) // skip myself
5736 pl
->send_cluster_message(
5738 make_message
<MRecoveryReserve
>(
5739 MRecoveryReserve::RELEASE
,
5740 spg_t(ps
->info
.pgid
.pgid
, i
->shard
),
5741 ps
->get_osdmap_epoch()),
5742 ps
->get_osdmap_epoch());
// Reaction to AllReplicasRecovered: clears FORCED_RECOVERY, releases the
// remote reservations, cancels the local background I/O reservation, and
// transitions to Recovered.
5746 boost::statechart::result
5747 PeeringState::Recovering::react(const AllReplicasRecovered
&evt
)
5750 ps
->state_clear(PG_STATE_FORCED_RECOVERY
);
5751 release_reservations();
5752 pl
->cancel_local_background_io_reservation();
5753 return transit
<Recovered
>();
// Reaction to RequestBackfill: releases the remote reservations, clears
// FORCED_RECOVERY, cancels the local background I/O reservation, publishes
// stats, re-runs choose_acting() if there are async recovery targets, and
// transitions to WaitLocalBackfillReserved.
5756 boost::statechart::result
5757 PeeringState::Recovering::react(const RequestBackfill
&evt
)
5761 release_reservations();
5763 ps
->state_clear(PG_STATE_FORCED_RECOVERY
);
5764 pl
->cancel_local_background_io_reservation();
5765 pl
->publish_stats_to_osd();
5766 // transit any async_recovery_targets back into acting
5767 // so pg won't have to stay undersized for long
5768 // as backfill might take a long time to complete..
5769 if (!ps
->async_recovery_targets
.empty()) {
5770 pg_shard_t auth_log_shard
;
5771 bool history_les_bound
= false;
5772 // FIXME: Uh-oh we have to check this return value; choose_acting can fail!
5773 ps
->choose_acting(auth_log_shard
, true, &history_les_bound
);
5775 return transit
<WaitLocalBackfillReserved
>();
// Reaction to DeferRecovery.
//
// If the PG is no longer flagged RECOVERING the event is discarded.
// Otherwise the PG is marked RECOVERY_WAIT, the local and remote
// reservations are released (cancel = true), a follow-up peering event is
// scheduled, and the state machine transitions to NotRecovering.
// NOTE(review): the payload and delay of the scheduled event are not visible
// in this listing; the log line suggests evt.delay is the retry delay.
5778 boost::statechart::result
5779 PeeringState::Recovering::react(const DeferRecovery
&evt
)
5782 if (!ps
->state_test(PG_STATE_RECOVERING
)) {
5783 // we may have finished recovery and have an AllReplicasRecovered
5784 // event queued to move us to the next state.
5785 psdout(10) << "got defer recovery but not recovering" << dendl
;
5786 return discard_event();
5788 psdout(10) << "defer recovery, retry delay " << evt
.delay
<< dendl
;
5789 ps
->state_set(PG_STATE_RECOVERY_WAIT
);
5790 pl
->cancel_local_background_io_reservation();
5791 release_reservations(true);
5792 pl
->schedule_event_after(
5793 std::make_shared
<PGPeeringEvent
>(
5794 ps
->get_osdmap_epoch(),
5795 ps
->get_osdmap_epoch(),
5798 return transit
<NotRecovering
>();
// Reaction to UnfoundRecovery: marks the PG RECOVERY_UNFOUND, cancels the
// local background I/O reservation, releases the remote reservations
// (cancel = true), and transitions to NotRecovering.
5801 boost::statechart::result
5802 PeeringState::Recovering::react(const UnfoundRecovery
&evt
)
5805 psdout(10) << "recovery has unfound, can't continue" << dendl
;
5806 ps
->state_set(PG_STATE_RECOVERY_UNFOUND
);
5807 pl
->cancel_local_background_io_reservation();
5808 release_reservations(true);
5809 return transit
<NotRecovering
>();
// State exit hook: logs the exit, clears the RECOVERING flag, and records
// the time spent in this state under the rs_recovering_latency perf counter.
5812 void PeeringState::Recovering::exit()
5814 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5817 utime_t dur
= ceph_clock_now() - enter_time
;
5818 ps
->state_clear(PG_STATE_RECOVERING
);
5819 pl
->get_peering_perf().tinc(rs_recovering_latency
, dur
);
// Constructor for the Recovered state ("Started/Primary/Active/Recovered").
//
// Asserts that nothing still needs recovery, clears the forced
// backfill/recovery flags when acting_recovery_backfill is at least as large
// as the pool's pg size, re-evaluates the acting set where needed, and posts
// GoClean once all replicas have activated and no async recovery targets
// remain.
5822 PeeringState::Recovered::Recovered(my_context ctx
)
5824 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/Recovered")
5826 pg_shard_t auth_log_shard
;
5828 context
< PeeringMachine
>().log_enter(state_name
);
5832 ceph_assert(!ps
->needs_recovery());
5834 // if we finished backfill, all acting are active; recheck if
5835 // DEGRADED | UNDERSIZED is appropriate.
5836 ceph_assert(!ps
->acting_recovery_backfill
.empty());
5837 if (ps
->get_osdmap()->get_pg_size(context
< PeeringMachine
>().spgid
.pgid
) <=
5838 ps
->acting_recovery_backfill
.size()) {
5839 ps
->state_clear(PG_STATE_FORCED_BACKFILL
| PG_STATE_FORCED_RECOVERY
);
5840 pl
->publish_stats_to_osd();
5843 // adjust acting set? (e.g. because backfill completed...)
5844 bool history_les_bound
= false;
// When acting differs from up and choose_acting() reports failure, a wanted
// acting set is expected to have been recorded (asserted non-empty).
5845 if (ps
->acting
!= ps
->up
&& !ps
->choose_acting(auth_log_shard
,
5846 true, &history_les_bound
)) {
5847 ceph_assert(ps
->want_acting
.size());
5848 } else if (!ps
->async_recovery_targets
.empty()) {
5849 // FIXME: Uh-oh we have to check this return value; choose_acting can fail!
5850 ps
->choose_acting(auth_log_shard
, true, &history_les_bound
);
5853 if (context
< Active
>().all_replicas_activated
&&
5854 ps
->async_recovery_targets
.empty())
5855 post_event(GoClean());
// State exit hook: logs the exit and records the time spent in this state
// under the rs_recovered_latency perf counter.
5858 void PeeringState::Recovered::exit()
5860 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5863 utime_t dur
= ceph_clock_now() - enter_time
;
5864 pl
->get_peering_perf().tinc(rs_recovered_latency
, dur
);
// Constructor for the Clean state ("Started/Primary/Active/Clean").
//
// Compares info.last_complete against info.last_update, calls
// ps->try_mark_clean(), and registers an on-commit callback on the current
// transaction.
// NOTE(review): the body of the last_complete != last_update branch and the
// callback passed to register_on_commit() are not visible in this listing.
5867 PeeringState::Clean::Clean(my_context ctx
)
5869 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active/Clean")
5871 context
< PeeringMachine
>().log_enter(state_name
);
5875 if (ps
->info
.last_complete
!= ps
->info
.last_update
) {
5880 ps
->try_mark_clean();
5882 context
< PeeringMachine
>().get_cur_transaction().register_on_commit(
// State exit hook: logs the exit, clears the CLEAN flag, and records the
// time spent in this state under the rs_clean_latency perf counter.
5886 void PeeringState::Clean::exit()
5888 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
5891 ps
->state_clear(PG_STATE_CLEAN
);
5892 utime_t dur
= ceph_clock_now() - enter_time
;
5893 pl
->get_peering_perf().tinc(rs_clean_latency
, dur
);
// Builds a set of pg_shard_t from the container `in`, considering only
// entries that differ from `skip` and whose OSD id has not been seen yet
// (tracked in osds_found).
//
// skip: shard to leave out.
// in:   any container of pg_shard_t supporting begin()/end() iteration.
// NOTE(review): the insertion into `out` and the return statement are not
// visible in this listing; presumably each accepted entry is added to `out`,
// which is then returned.
5896 template <typename T
>
5897 set
<pg_shard_t
> unique_osd_shard_set(const pg_shard_t
& skip
, const T
&in
)
5899 set
<int> osds_found
;
5900 set
<pg_shard_t
> out
;
5901 for (auto i
= in
.begin(); i
!= in
.end(); ++i
) {
5902 if (*i
!= skip
&& !osds_found
.count(i
->osd
)) {
5903 osds_found
.insert(i
->osd
);
5910 /*---------Active---------*/
// Constructor for the Active state ("Started/Primary/Active").
//
// Initializes the sets of remote shards to reserve for recovery (from
// acting_recovery_backfill) and for backfill (from backfill_targets), each
// excluding this OSD's own shard via unique_osd_shard_set(). Then, as
// primary, starts the flush, activates the PG at the current osdmap epoch,
// and rebuilds blocked_by from the shards in acting_recovery_backfill that
// are not this OSD's shard before publishing stats.
// NOTE(review): the loop increment of the blocked_by loop is not visible in
// this listing.
5911 PeeringState::Active::Active(my_context ctx
)
5913 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Active"),
5914 remote_shards_to_reserve_recovery(
5915 unique_osd_shard_set(
5916 context
< PeeringMachine
>().state
->pg_whoami
,
5917 context
< PeeringMachine
>().state
->acting_recovery_backfill
)),
5918 remote_shards_to_reserve_backfill(
5919 unique_osd_shard_set(
5920 context
< PeeringMachine
>().state
->pg_whoami
,
5921 context
< PeeringMachine
>().state
->backfill_targets
)),
5922 all_replicas_activated(false)
5924 context
< PeeringMachine
>().log_enter(state_name
);
5929 ceph_assert(!ps
->backfill_reserved
);
5930 ceph_assert(ps
->is_primary());
5931 psdout(10) << "In Active, about to call activate" << dendl
;
5932 ps
->start_flush(context
< PeeringMachine
>().get_cur_transaction());
5933 ps
->activate(context
< PeeringMachine
>().get_cur_transaction(),
5934 ps
->get_osdmap_epoch(),
5935 context
< PeeringMachine
>().get_recovery_ctx());
5937 // everyone has to commit/ack before we are truly active
5938 ps
->blocked_by
.clear();
5939 for (auto p
= ps
->acting_recovery_backfill
.begin();
5940 p
!= ps
->acting_recovery_backfill
.end();
5942 if (p
->shard
!= ps
->pg_whoami
.shard
) {
5943 ps
->blocked_by
.insert(p
->shard
);
5946 pl
->publish_stats_to_osd();
5947 psdout(10) << "Activate Finished" << dendl
;
// Reaction to AdvMap (a new osdmap) while Active.
//
// If the new map requires peering to restart, the event is forwarded
// immediately. Otherwise this handler:
//   - notifies the listener via on_active_advmap();
//   - shares pg info when dirty_big_info is set;
//   - re-runs choose_acting() when a want_acting member that is neither
//     acting nor up has gone down in the new map;
//   - sets or clears UNDERSIZED when the pool's pg size changed;
//   - flags stats for publication when they have not been reported for
//     osd_pg_stat_report_interval_max epochs;
//   - rechecks readability when check_prior_readable_down_osds() says so.
// The event is always forwarded to the outer state at the end.
// NOTE(review): several arguments of should_restart_peering(), the `else` of
// the UNDERSIZED branch, and the guard around the final
// publish_stats_to_osd() call (presumably need_publish) are not visible in
// this listing.
5950 boost::statechart::result
PeeringState::Active::react(const AdvMap
& advmap
)
5954 if (ps
->should_restart_peering(
5956 advmap
.acting_primary
,
5961 psdout(10) << "Active advmap interval change, fast return" << dendl
;
5962 return forward_event();
5964 psdout(10) << "Active advmap" << dendl
;
5965 bool need_publish
= false;
5967 pl
->on_active_advmap(advmap
.osdmap
);
5968 if (ps
->dirty_big_info
) {
5969 // share updated purged_snaps to mgr/mon so that we (a) stop reporting
5970 // purged snaps and (b) perhaps share more snaps that we have purged
5971 // but didn't fit in pg_stat_t.
5972 need_publish
= true;
5973 ps
->share_pg_info();
5976 bool need_acting_change
= false;
// The index into want_acting is used as the shard id of each entry.
5977 for (size_t i
= 0; i
< ps
->want_acting
.size(); i
++) {
5978 int osd
= ps
->want_acting
[i
];
5979 if (!advmap
.osdmap
->is_up(osd
)) {
5980 pg_shard_t
osd_with_shard(osd
, shard_id_t(i
));
5981 if (!ps
->is_acting(osd_with_shard
) && !ps
->is_up(osd_with_shard
)) {
5982 psdout(10) << "Active stray osd." << osd
<< " in want_acting is down"
5984 need_acting_change
= true;
5988 if (need_acting_change
) {
5989 psdout(10) << "Active need acting change, call choose_acting again"
5991 // possibly because we re-add some strays into the acting set and
5992 // some of them then go down in a subsequent map before we could see
5993 // the map changing the pg temp.
5994 // call choose_acting again to clear them out.
5995 // note that we leave restrict_to_up_acting to false in order to
5996 // not overkill any chosen stray that is still alive.
5997 pg_shard_t auth_log_shard
;
5998 bool history_les_bound
= false;
5999 ps
->remove_down_peer_info(advmap
.osdmap
);
6000 ps
->choose_acting(auth_log_shard
, false, &history_les_bound
, true);
6003 /* Check for changes in pool size (if the acting set changed as a result,
6004 * this does not matter) */
6005 if (advmap
.lastmap
->get_pg_size(ps
->info
.pgid
.pgid
) !=
6006 ps
->get_osdmap()->get_pg_size(ps
->info
.pgid
.pgid
)) {
6007 if (ps
->get_osdmap()->get_pg_size(ps
->info
.pgid
.pgid
) <=
6008 ps
->actingset
.size()) {
6009 ps
->state_clear(PG_STATE_UNDERSIZED
);
6011 ps
->state_set(PG_STATE_UNDERSIZED
);
6013 // degraded changes will be detected by call from publish_stats_to_osd()
6014 need_publish
= true;
6017 // if we haven't reported our PG stats in a long time, do so now.
6018 if (ps
->info
.stats
.reported_epoch
+ ps
->cct
->_conf
->osd_pg_stat_report_interval_max
< advmap
.osdmap
->get_epoch()) {
6019 psdout(20) << "reporting stats to osd after " << (advmap
.osdmap
->get_epoch() - ps
->info
.stats
.reported_epoch
)
6020 << " epochs" << dendl
;
6021 need_publish
= true;
6025 pl
->publish_stats_to_osd();
6027 if (ps
->check_prior_readable_down_osds(advmap
.osdmap
)) {
6028 pl
->recheck_readable();
6031 return forward_event();
// Reaction to ActMap while Active (primary only, asserted).
//
// Notifies the listener via on_active_actmap(), re-runs missing-object
// discovery when there are unfound objects, and emits a cluster-log error
// reporting the number of unfound objects once all sources have been queried
// or marked lost. The event is forwarded to the outer state.
// NOTE(review): the first half of the condition around
// all_unfound_are_queried_or_lost() and the `else` between the two error
// messages are not visible in this listing.
6034 boost::statechart::result
PeeringState::Active::react(const ActMap
&)
6037 psdout(10) << "Active: handling ActMap" << dendl
;
6038 ceph_assert(ps
->is_primary());
6040 pl
->on_active_actmap();
6042 if (ps
->have_unfound()) {
6043 // object may have become unfound
6044 ps
->discover_all_missing(context
<PeeringMachine
>().get_recovery_ctx().msgs
);
6047 uint64_t unfound
= ps
->missing_loc
.num_unfound();
6049 ps
->all_unfound_are_queried_or_lost(ps
->get_osdmap())) {
6050 if (ps
->cct
->_conf
->osd_auto_mark_unfound_lost
) {
6051 pl
->get_clog_error() << context
< PeeringMachine
>().spgid
.pgid
<< " has " << unfound
6052 << " objects unfound and apparently lost, would automatically "
6053 << "mark these objects lost but this feature is not yet implemented "
6054 << "(osd_auto_mark_unfound_lost)";
6056 pl
->get_clog_error() << context
< PeeringMachine
>().spgid
.pgid
<< " has "
6057 << unfound
<< " objects unfound and apparently lost";
6060 return forward_event();
// Reaction to MNotifyRec (a notify from another OSD) while Active
// (primary only, asserted).
//
// Notifies from peers we already have info for, or that were already purged,
// are ignored. Otherwise the replica info is processed, missing-object
// discovery is re-run when there are unfound objects (or the PG is degraded
// and the sender might hold unfound objects), and choose_acting() is re-run
// so a returning acting member can trigger a pg_temp change. The event is
// discarded.
// NOTE(review): the `else` introducing the third branch and the tails of the
// log statements are not visible in this listing.
6063 boost::statechart::result
PeeringState::Active::react(const MNotifyRec
& notevt
)
6067 ceph_assert(ps
->is_primary());
6068 if (ps
->peer_info
.count(notevt
.from
)) {
6069 psdout(10) << "Active: got notify from " << notevt
.from
6070 << ", already have info from that osd, ignoring"
6072 } else if (ps
->peer_purged
.count(notevt
.from
)) {
6073 psdout(10) << "Active: got notify from " << notevt
.from
6074 << ", already purged that peer, ignoring"
6077 psdout(10) << "Active: got notify from " << notevt
.from
6078 << ", calling proc_replica_info and discover_all_missing"
6080 ps
->proc_replica_info(
6081 notevt
.from
, notevt
.notify
.info
, notevt
.notify
.epoch_sent
);
6082 if (ps
->have_unfound() || (ps
->is_degraded() && ps
->might_have_unfound
.count(notevt
.from
))) {
6083 ps
->discover_all_missing(
6084 context
<PeeringMachine
>().get_recovery_ctx().msgs
);
6086 // check if it is a previous down acting member that's coming back.
6087 // if so, request pg_temp change to trigger a new interval transition
6088 pg_shard_t auth_log_shard
;
6089 bool history_les_bound
= false;
6090 // FIXME: Uh-oh we have to check this return value; choose_acting can fail!
6091 ps
->choose_acting(auth_log_shard
, false, &history_les_bound
, true);
6092 if (!ps
->want_acting
.empty() && ps
->want_acting
!= ps
->acting
) {
6093 psdout(10) << "Active: got notify from previous acting member "
6094 << notevt
.from
<< ", requesting pg_temp change"
6098 return discard_event();
// Reaction to MTrim while Active (primary only, asserted): records the
// sending replica's last_complete_ondisk, recomputes the minimum
// last_complete_ondisk across peers, and discards the event.
// NOTE(review): the second argument of update_peer_last_complete_ondisk() is
// not visible in this listing (presumably trim.trim_to).
6101 boost::statechart::result
PeeringState::Active::react(const MTrim
& trim
)
6104 ceph_assert(ps
->is_primary());
6106 // peer is informing us of their last_complete_ondisk
6107 ldout(ps
->cct
,10) << " replica osd." << trim
.from
<< " lcod " << trim
.trim_to
<< dendl
;
6108 ps
->update_peer_last_complete_ondisk(pg_shard_t
{trim
.from
, trim
.shard
},
6110 // trim log when the pg is recovered
6111 ps
->calc_min_last_complete_ondisk();
6112 return discard_event();
// Reaction to MInfoRec while Active (primary only, asserted).
//
// Processes any lease ack carried by the message. If the sender belongs to
// acting_recovery_backfill and has not been recorded as activated yet, it is
// added to peer_activated, removed from blocked_by, stats are published, and
// all_activated_and_committed() runs once every member has activated. The
// event is discarded.
6115 boost::statechart::result
PeeringState::Active::react(const MInfoRec
& infoevt
)
6118 ceph_assert(ps
->is_primary());
6120 ceph_assert(!ps
->acting_recovery_backfill
.empty());
6121 if (infoevt
.lease_ack
) {
6122 ps
->proc_lease_ack(infoevt
.from
.osd
, *infoevt
.lease_ack
);
6124 // don't update history (yet) if we are active and primary; the replica
6125 // may be telling us they have activated (and committed) but we can't
6126 // share that until _everyone_ does the same.
6127 if (ps
->is_acting_recovery_backfill(infoevt
.from
) &&
6128 ps
->peer_activated
.count(infoevt
.from
) == 0) {
6129 psdout(10) << " peer osd." << infoevt
.from
6130 << " activated and committed" << dendl
;
6131 ps
->peer_activated
.insert(infoevt
.from
);
6132 ps
->blocked_by
.erase(infoevt
.from
.shard
);
6133 pl
->publish_stats_to_osd();
6134 if (ps
->peer_activated
.size() == ps
->acting_recovery_backfill
.size()) {
6135 all_activated_and_committed();
6138 return discard_event();
// Reaction to MLogRec while Active: processes the replica's log and missing
// set, searches it for objects we are missing, and posts DoRecovery if
// something was found and the PG is in the ACTIVE state. The event is
// discarded.
// NOTE(review): one argument of search_for_missing() is not visible in this
// listing.
6141 boost::statechart::result
PeeringState::Active::react(const MLogRec
& logevt
)
6144 psdout(10) << "searching osd." << logevt
.from
6145 << " log for unfound items" << dendl
;
6146 ps
->proc_replica_log(
6147 logevt
.msg
->info
, logevt
.msg
->log
, std::move(logevt
.msg
->missing
), logevt
.from
);
6148 bool got_missing
= ps
->search_for_missing(
6149 ps
->peer_info
[logevt
.from
],
6150 ps
->peer_missing
[logevt
.from
],
6152 context
< PeeringMachine
>().get_recovery_ctx());
6153 // If there are missing AND we are "fully" active then start recovery now
6154 if (got_missing
&& ps
->state_test(PG_STATE_ACTIVE
)) {
6155 post_event(DoRecovery());
6157 return discard_event();
// Reaction to QueryState while Active: dumps this state into the query's
// Formatter (q.f) and forwards the event to the outer state.
//
// Output layout: a "state" object holding "name" and "enter_time", a
// "might_have_unfound" array with one {"osd", "status"} object per candidate
// OSD, and a "recovery_progress" object holding the "backfill_targets" array
// plus whatever pl->dump_recovery_info() emits.
// NOTE(review): the loop increment of the might_have_unfound loop and the
// final `else` of the status chain are not visible in this listing.
6160 boost::statechart::result
PeeringState::Active::react(const QueryState
& q
)
6164 q
.f
->open_object_section("state");
6165 q
.f
->dump_string("name", state_name
);
6166 q
.f
->dump_stream("enter_time") << enter_time
;
6169 q
.f
->open_array_section("might_have_unfound");
6170 for (auto p
= ps
->might_have_unfound
.begin();
6171 p
!= ps
->might_have_unfound
.end();
6173 q
.f
->open_object_section("osd");
6174 q
.f
->dump_stream("osd") << *p
;
6175 if (ps
->peer_missing
.count(*p
)) {
6176 q
.f
->dump_string("status", "already probed");
6177 } else if (ps
->peer_missing_requested
.count(*p
)) {
6178 q
.f
->dump_string("status", "querying");
6179 } else if (!ps
->get_osdmap()->is_up(p
->osd
)) {
6180 q
.f
->dump_string("status", "osd is down");
6182 q
.f
->dump_string("status", "not queried");
6184 q
.f
->close_section();
6186 q
.f
->close_section();
6189 q
.f
->open_object_section("recovery_progress");
6190 q
.f
->open_array_section("backfill_targets");
6191 for (auto p
= ps
->backfill_targets
.begin();
6192 p
!= ps
->backfill_targets
.end(); ++p
)
6193 q
.f
->dump_stream("replica") << *p
;
6194 q
.f
->close_section();
6195 pl
->dump_recovery_info(q
.f
);
6196 q
.f
->close_section();
6199 q
.f
->close_section();
6200 return forward_event();
// Reaction to QueryUnfound while Active: delegates to ps->query_unfound()
// with the query's Formatter and the state label "Active", then discards the
// event.
6203 boost::statechart::result
PeeringState::Active::react(const QueryUnfound
& q
)
6207 ps
->query_unfound(q
.f
, "Active");
6208 return discard_event();
// Reaction to ActivateCommitted while Active: records that this OSD itself
// has activated (it must not already be in peer_activated), logs the current
// history epochs, and runs all_activated_and_committed() once every member
// of acting_recovery_backfill has activated. The event is discarded.
6211 boost::statechart::result
PeeringState::Active::react(
6212 const ActivateCommitted
&evt
)
6215 ceph_assert(!ps
->peer_activated
.count(ps
->pg_whoami
));
6216 ps
->peer_activated
.insert(ps
->pg_whoami
);
6217 psdout(10) << "_activate_committed " << evt
.epoch
6218 << " peer_activated now " << ps
->peer_activated
6219 << " last_interval_started "
6220 << ps
->info
.history
.last_interval_started
6221 << " last_epoch_started "
6222 << ps
->info
.history
.last_epoch_started
6223 << " same_interval_since "
6224 << ps
->info
.history
.same_interval_since
6226 ceph_assert(!ps
->acting_recovery_backfill
.empty());
6227 if (ps
->peer_activated
.size() == ps
->acting_recovery_backfill
.size())
6228 all_activated_and_committed();
6229 return discard_event();
// Reaction to AllReplicasActivated while Active.
//
// Marks all replicas as activated, clears ACTIVATING/CREATING/PREMERGE, and
// then settles the PG's externally visible state:
//   - pending merge: PEERED + PREMERGE, reporting merge (non-)readiness to
//     the listener;
//   - acting set not writeable: PEERED;
//   - otherwise: ACTIVE, plus WAIT (with a queued readable check) while
//     prior_readable_until_ub is still in the future.
// Finally it reports pool creation if the pool has FLAG_CREATING, copies
// last_epoch_started / last_interval_started into the history, shares pg
// info, publishes stats, and signals on_activate_complete().
// NOTE(review): the declarations of merge_target and src, and several
// `else` lines, are not visible in this listing.
6232 boost::statechart::result
PeeringState::Active::react(const AllReplicasActivated
&evt
)
6236 pg_t pgid
= context
< PeeringMachine
>().spgid
.pgid
;
6238 all_replicas_activated
= true;
6240 ps
->state_clear(PG_STATE_ACTIVATING
);
6241 ps
->state_clear(PG_STATE_CREATING
);
6242 ps
->state_clear(PG_STATE_PREMERGE
);
6245 if (ps
->pool
.info
.is_pending_merge(pgid
, &merge_target
)) {
6246 ps
->state_set(PG_STATE_PEERED
);
6247 ps
->state_set(PG_STATE_PREMERGE
);
6249 if (ps
->actingset
.size() != ps
->get_osdmap()->get_pg_size(pgid
)) {
6252 src
.set_ps(ps
->pool
.info
.get_pg_num_pending());
// NOTE(review): plain assert() here, while the rest of this file uses
// ceph_assert(); confirm whether that is intentional.
6253 assert(src
.get_parent() == pgid
);
6254 pl
->set_not_ready_to_merge_target(pgid
, src
);
6256 pl
->set_not_ready_to_merge_source(pgid
);
6259 } else if (!ps
->acting_set_writeable()) {
6260 ps
->state_set(PG_STATE_PEERED
);
6262 ps
->state_set(PG_STATE_ACTIVE
);
6265 auto mnow
= pl
->get_mnow();
6266 if (ps
->prior_readable_until_ub
> mnow
) {
6267 psdout(10) << " waiting for prior_readable_until_ub "
6268 << ps
->prior_readable_until_ub
<< " > mnow " << mnow
<< dendl
;
6269 ps
->state_set(PG_STATE_WAIT
);
6270 pl
->queue_check_readable(
6271 ps
->last_peering_reset
,
6272 ps
->prior_readable_until_ub
- mnow
);
6274 psdout(10) << " mnow " << mnow
<< " >= prior_readable_until_ub "
6275 << ps
->prior_readable_until_ub
<< dendl
;
6278 if (ps
->pool
.info
.has_flag(pg_pool_t::FLAG_CREATING
)) {
6279 pl
->send_pg_created(pgid
);
6282 ps
->info
.history
.last_epoch_started
= ps
->info
.last_epoch_started
;
6283 ps
->info
.history
.last_interval_started
= ps
->info
.last_interval_started
;
6284 ps
->dirty_info
= true;
6286 ps
->share_pg_info();
6287 pl
->publish_stats_to_osd();
6289 pl
->on_activate_complete();
6291 return discard_event();
// Reaction to RenewLease while Active: delegates to ps->proc_renew_lease()
// and discards the event.
6294 boost::statechart::result
PeeringState::Active::react(const RenewLease
& rl
)
6297 ps
->proc_renew_lease();
6298 return discard_event();
// Reaction to MLeaseAck while Active: passes the sender and its lease ack to
// ps->proc_lease_ack() and discards the event.
6301 boost::statechart::result
PeeringState::Active::react(const MLeaseAck
& la
)
6304 ps
->proc_lease_ack(la
.from
, la
.lease_ack
);
6305 return discard_event();
// Reaction to CheckReadable while Active: asks the listener to recheck
// readability and discards the event.
6309 boost::statechart::result
PeeringState::Active::react(const CheckReadable
&evt
)
6312 pl
->recheck_readable();
6313 return discard_event();
6317 * update info.history.last_epoch_started ONLY after we and all
6318 * replicas have activated AND committed the activate transaction
6319 * (i.e. the peering results are stable on disk).
// Called on the primary once every member of acting_recovery_backfill has
// activated (peer_activated matches it in size and blocked_by is empty; all
// asserted).
//
// Renews the lease and schedules the next renewal when all up/acting peers
// support SERVER_OCTOPUS, refreshes the calculated stats, sets or clears
// DEGRADED according to num_objects_degraded, and posts
// AllReplicasActivated.
// NOTE(review): the `else` between the DEGRADED set/clear calls is not
// visible in this listing.
6321 void PeeringState::Active::all_activated_and_committed()
6324 psdout(10) << "all_activated_and_committed" << dendl
;
6325 ceph_assert(ps
->is_primary());
6326 ceph_assert(ps
->peer_activated
.size() == ps
->acting_recovery_backfill
.size());
6327 ceph_assert(!ps
->acting_recovery_backfill
.empty());
6328 ceph_assert(ps
->blocked_by
.empty());
6330 if (HAVE_FEATURE(ps
->upacting_features
, SERVER_OCTOPUS
)) {
6331 // this is overkill when the activation is quick, but when it is slow it
6332 // is important, because the lease was renewed by the activate itself but we
6333 // don't know how long ago that was, and simply scheduling now may leave
6334 // a gap in lease coverage. keep it simple and aggressively renew.
6335 ps
->renew_lease(pl
->get_mnow());
6337 ps
->schedule_renew_lease();
6341 ps
->update_calc_stats();
6342 if (ps
->info
.stats
.stats
.sum
.num_objects_degraded
) {
6343 ps
->state_set(PG_STATE_DEGRADED
);
6345 ps
->state_clear(PG_STATE_DEGRADED
);
6348 post_event(PeeringState::AllReplicasActivated());
// State exit hook for Active: logs the exit, cancels the local background
// I/O reservation, clears blocked_by and backfill_reserved, clears the
// ACTIVATING, DEGRADED, UNDERSIZED, BACKFILL_TOOFULL, BACKFILL_WAIT,
// RECOVERY_WAIT and RECOVERY_TOOFULL flags, records the time spent in this
// state under rs_active_latency, and notifies the listener via
// on_active_exit().
6352 void PeeringState::Active::exit()
6354 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
6358 pl
->cancel_local_background_io_reservation();
6360 ps
->blocked_by
.clear();
6361 ps
->backfill_reserved
= false;
6362 ps
->state_clear(PG_STATE_ACTIVATING
);
6363 ps
->state_clear(PG_STATE_DEGRADED
);
6364 ps
->state_clear(PG_STATE_UNDERSIZED
);
6365 ps
->state_clear(PG_STATE_BACKFILL_TOOFULL
);
6366 ps
->state_clear(PG_STATE_BACKFILL_WAIT
);
6367 ps
->state_clear(PG_STATE_RECOVERY_WAIT
);
6368 ps
->state_clear(PG_STATE_RECOVERY_TOOFULL
);
6369 utime_t dur
= ceph_clock_now() - enter_time
;
6370 pl
->get_peering_perf().tinc(rs_active_latency
, dur
);
6371 pl
->on_active_exit();
6374 /*------ReplicaActive-----*/
// Constructor for the ReplicaActive state ("Started/ReplicaActive"): logs
// entry into the state and starts a flush on the current transaction.
6375 PeeringState::ReplicaActive::ReplicaActive(my_context ctx
)
6377 NamedState(context
< PeeringMachine
>().state_history
, "Started/ReplicaActive")
6379 context
< PeeringMachine
>().log_enter(state_name
);
6382 ps
->start_flush(context
< PeeringMachine
>().get_cur_transaction());
// Reaction to Activate on a replica: runs activation with the current
// transaction, the activation epoch carried by the event, and the recovery
// context, then discards the event.
// NOTE(review): the call that receives these arguments is not visible in
// this listing (presumably ps->activate(), per the log message).
6386 boost::statechart::result
PeeringState::ReplicaActive::react(
6387 const Activate
& actevt
) {
6389 psdout(10) << "In ReplicaActive, about to call activate" << dendl
;
6391 context
< PeeringMachine
>().get_cur_transaction(),
6392 actevt
.activation_epoch
,
6393 context
< PeeringMachine
>().get_recovery_ctx());
6394 psdout(10) << "Activate Finished" << dendl
;
6395 return discard_event();
// Reaction to ActivateCommitted on a replica: tells the primary that
// activation has committed.
//
// Builds a copy of the local pg_info_t whose history carries the activation
// epoch as last_epoch_started and same_interval_since as
// last_interval_started, sends it (with the lease ack) toward the primary,
// marks the PG ACTIVE or PEERED depending on acting_set_writeable(), and
// signals on_activate_committed(). The event is discarded.
// NOTE(review): the send call itself and the `else` between ACTIVE and
// PEERED are not visible in this listing.
6398 boost::statechart::result
PeeringState::ReplicaActive::react(
6399 const ActivateCommitted
&evt
)
6402 psdout(10) << __func__
<< " " << evt
.epoch
<< " telling primary" << dendl
;
6404 auto &rctx
= context
<PeeringMachine
>().get_recovery_ctx();
6405 auto epoch
= ps
->get_osdmap_epoch();
// Work on a copy so the replica's own info/history is left untouched.
6406 pg_info_t i
= ps
->info
;
6407 i
.history
.last_epoch_started
= evt
.activation_epoch
;
6408 i
.history
.last_interval_started
= i
.history
.same_interval_since
;
6410 ps
->get_primary().osd
,
6411 spg_t(ps
->info
.pgid
.pgid
, ps
->get_primary().shard
),
6416 ps
->get_lease_ack());
6418 if (ps
->acting_set_writeable()) {
6419 ps
->state_set(PG_STATE_ACTIVE
);
6421 ps
->state_set(PG_STATE_PEERED
);
6423 pl
->on_activate_committed();
6425 return discard_event();
// Reaction to MLease on a replica: processes the lease from the primary and
// answers with an MOSDPGLeaseAck sent to the primary's OSD, then discards
// the event.
// NOTE(review): the trailing argument of send_cluster_message() is not
// visible in this listing.
6428 boost::statechart::result
PeeringState::ReplicaActive::react(const MLease
& l
)
6431 spg_t spgid
= context
< PeeringMachine
>().spgid
;
6432 epoch_t epoch
= pl
->get_osdmap_epoch();
6434 ps
->proc_lease(l
.lease
);
6435 pl
->send_cluster_message(
6436 ps
->get_primary().osd
,
6437 make_message
<MOSDPGLeaseAck
>(epoch
,
6438 spg_t(spgid
.pgid
, ps
->get_primary().shard
),
6439 ps
->get_lease_ack()),
6441 return discard_event();
// Reaction to MInfoRec on a replica: hands the primary's info to
// ps->proc_primary_info() together with the current transaction, then
// discards the event.
// NOTE(review): the remaining arguments of proc_primary_info() are not
// visible in this listing.
6444 boost::statechart::result
PeeringState::ReplicaActive::react(const MInfoRec
& infoevt
)
6447 ps
->proc_primary_info(context
<PeeringMachine
>().get_cur_transaction(),
6449 return discard_event();
// Reaction to MLogRec on a replica: merges the received log into the local
// pg_log within the current transaction, asserts that the log head now
// equals info.last_update, processes the lease if the message carries one,
// and discards the event.
6452 boost::statechart::result
PeeringState::ReplicaActive::react(const MLogRec
& logevt
)
6455 psdout(10) << "received log from " << logevt
.from
<< dendl
;
6456 ObjectStore::Transaction
&t
= context
<PeeringMachine
>().get_cur_transaction();
6457 ps
->merge_log(t
, logevt
.msg
->info
, std::move(logevt
.msg
->log
), logevt
.from
);
6458 ceph_assert(ps
->pg_log
.get_head() == ps
->info
.last_update
);
6459 if (logevt
.msg
->lease
) {
6460 ps
->proc_lease(*logevt
.msg
->lease
);
6463 return discard_event();
6466 boost::statechart::result
PeeringState::ReplicaActive::react(const MTrim
& trim
)
6469 // primary is instructing us to trim
6470 ps
->pg_log
.trim(trim
.trim_to
, ps
->info
);
6471 ps
->dirty_info
= true;
6472 return discard_event();
6475 boost::statechart::result
PeeringState::ReplicaActive::react(const ActMap
&)
6478 if (ps
->should_send_notify() && ps
->get_primary().osd
>= 0) {
6479 ps
->info
.history
.refresh_prior_readable_until_ub(
6480 pl
->get_mnow(), ps
->prior_readable_until_ub
);
6481 context
< PeeringMachine
>().send_notify(
6482 ps
->get_primary().osd
,
6484 ps
->get_primary().shard
, ps
->pg_whoami
.shard
,
6485 ps
->get_osdmap_epoch(),
6486 ps
->get_osdmap_epoch(),
6488 ps
->past_intervals
));
6490 return discard_event();
6493 boost::statechart::result
PeeringState::ReplicaActive::react(
6494 const MQuery
& query
)
6497 ps
->fulfill_query(query
, context
<PeeringMachine
>().get_recovery_ctx());
6498 return discard_event();
6501 boost::statechart::result
PeeringState::ReplicaActive::react(const QueryState
& q
)
6503 q
.f
->open_object_section("state");
6504 q
.f
->dump_string("name", state_name
);
6505 q
.f
->dump_stream("enter_time") << enter_time
;
6506 q
.f
->close_section();
6507 return forward_event();
6510 boost::statechart::result
PeeringState::ReplicaActive::react(const QueryUnfound
& q
)
6512 q
.f
->dump_string("state", "ReplicaActive");
6513 q
.f
->dump_bool("available_might_have_unfound", false);
6514 return discard_event();
6517 void PeeringState::ReplicaActive::exit()
6519 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
6521 pl
->unreserve_recovery_space();
6523 pl
->cancel_remote_recovery_reservation();
6524 utime_t dur
= ceph_clock_now() - enter_time
;
6525 pl
->get_peering_perf().tinc(rs_replicaactive_latency
, dur
);
6527 ps
->min_last_complete_ondisk
= eversion_t();
6531 PeeringState::Stray::Stray(my_context ctx
)
6533 NamedState(context
< PeeringMachine
>().state_history
, "Started/Stray")
6535 context
< PeeringMachine
>().log_enter(state_name
);
6539 ceph_assert(!ps
->is_peered());
6540 ceph_assert(!ps
->is_peering());
6541 ceph_assert(!ps
->is_primary());
6543 if (!ps
->get_osdmap()->have_pg_pool(ps
->info
.pgid
.pgid
.pool())) {
6544 ldout(ps
->cct
,10) << __func__
<< " pool is deleted" << dendl
;
6545 post_event(DeleteStart());
6547 ps
->start_flush(context
< PeeringMachine
>().get_cur_transaction());
6551 boost::statechart::result
PeeringState::Stray::react(const MLogRec
& logevt
)
6554 MOSDPGLog
*msg
= logevt
.msg
.get();
6555 psdout(10) << "got info+log from osd." << logevt
.from
<< " " << msg
->info
<< " " << msg
->log
<< dendl
;
6557 ObjectStore::Transaction
&t
= context
<PeeringMachine
>().get_cur_transaction();
6558 if (msg
->info
.last_backfill
== hobject_t()) {
6560 ps
->info
= msg
->info
;
6561 pl
->on_info_history_change();
6562 ps
->dirty_info
= true;
6563 ps
->dirty_big_info
= true; // maybe.
6565 PGLog::LogEntryHandlerRef rollbacker
{pl
->get_log_handler(t
)};
6566 ps
->pg_log
.reset_backfill_claim_log(msg
->log
, rollbacker
.get());
6568 ps
->pg_log
.reset_backfill();
6570 ps
->merge_log(t
, msg
->info
, std::move(msg
->log
), logevt
.from
);
6572 if (logevt
.msg
->lease
) {
6573 ps
->proc_lease(*logevt
.msg
->lease
);
6576 ceph_assert(ps
->pg_log
.get_head() == ps
->info
.last_update
);
6578 post_event(Activate(logevt
.msg
->info
.last_epoch_started
));
6579 return transit
<ReplicaActive
>();
6582 boost::statechart::result
PeeringState::Stray::react(const MInfoRec
& infoevt
)
6585 psdout(10) << "got info from osd." << infoevt
.from
<< " " << infoevt
.info
<< dendl
;
6587 if (ps
->info
.last_update
> infoevt
.info
.last_update
) {
6588 // rewind divergent log entries
6589 ObjectStore::Transaction
&t
= context
<PeeringMachine
>().get_cur_transaction();
6590 ps
->rewind_divergent_log(t
, infoevt
.info
.last_update
);
6591 ps
->info
.stats
= infoevt
.info
.stats
;
6592 ps
->info
.hit_set
= infoevt
.info
.hit_set
;
6595 if (infoevt
.lease
) {
6596 ps
->proc_lease(*infoevt
.lease
);
6599 ceph_assert(infoevt
.info
.last_update
== ps
->info
.last_update
);
6600 ceph_assert(ps
->pg_log
.get_head() == ps
->info
.last_update
);
6602 post_event(Activate(infoevt
.info
.last_epoch_started
));
6603 return transit
<ReplicaActive
>();
6606 boost::statechart::result
PeeringState::Stray::react(const MQuery
& query
)
6609 ps
->fulfill_query(query
, context
<PeeringMachine
>().get_recovery_ctx());
6610 return discard_event();
6613 boost::statechart::result
PeeringState::Stray::react(const ActMap
&)
6616 if (ps
->should_send_notify() && ps
->get_primary().osd
>= 0) {
6617 ps
->info
.history
.refresh_prior_readable_until_ub(
6618 pl
->get_mnow(), ps
->prior_readable_until_ub
);
6619 context
< PeeringMachine
>().send_notify(
6620 ps
->get_primary().osd
,
6622 ps
->get_primary().shard
, ps
->pg_whoami
.shard
,
6623 ps
->get_osdmap_epoch(),
6624 ps
->get_osdmap_epoch(),
6626 ps
->past_intervals
));
6628 return discard_event();
6631 void PeeringState::Stray::exit()
6633 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
6635 utime_t dur
= ceph_clock_now() - enter_time
;
6636 pl
->get_peering_perf().tinc(rs_stray_latency
, dur
);
6640 /*--------ToDelete----------*/
6641 PeeringState::ToDelete::ToDelete(my_context ctx
)
6643 NamedState(context
< PeeringMachine
>().state_history
, "Started/ToDelete")
6645 context
< PeeringMachine
>().log_enter(state_name
);
6647 pl
->get_perf_logger().inc(l_osd_pg_removing
);
6650 void PeeringState::ToDelete::exit()
6652 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
6654 // note: on a successful removal, this path doesn't execute. see
6656 pl
->get_perf_logger().dec(l_osd_pg_removing
);
6658 pl
->cancel_local_background_io_reservation();
6661 /*----WaitDeleteReserved----*/
6662 PeeringState::WaitDeleteReserved::WaitDeleteReserved(my_context ctx
)
6664 NamedState(context
< PeeringMachine
>().state_history
,
6665 "Started/ToDelete/WaitDeleteReseved")
6667 context
< PeeringMachine
>().log_enter(state_name
);
6669 context
< ToDelete
>().priority
= ps
->get_delete_priority();
6671 pl
->cancel_local_background_io_reservation();
6672 pl
->request_local_background_io_reservation(
6673 context
<ToDelete
>().priority
,
6674 std::make_unique
<PGPeeringEvent
>(
6675 ps
->get_osdmap_epoch(),
6676 ps
->get_osdmap_epoch(),
6678 std::make_unique
<PGPeeringEvent
>(
6679 ps
->get_osdmap_epoch(),
6680 ps
->get_osdmap_epoch(),
6681 DeleteInterrupted()));
6684 boost::statechart::result
PeeringState::ToDelete::react(
6688 if (ps
->get_delete_priority() != priority
) {
6689 psdout(10) << __func__
<< " delete priority changed, resetting"
6691 return transit
<ToDelete
>();
6693 return discard_event();
6696 void PeeringState::WaitDeleteReserved::exit()
6698 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
6701 /*----Deleting-----*/
6702 PeeringState::Deleting::Deleting(my_context ctx
)
6704 NamedState(context
< PeeringMachine
>().state_history
, "Started/ToDelete/Deleting")
6706 context
< PeeringMachine
>().log_enter(state_name
);
6709 ps
->deleting
= true;
6710 ObjectStore::Transaction
&t
= context
<PeeringMachine
>().get_cur_transaction();
6713 PGLog::LogEntryHandlerRef rollbacker
{pl
->get_log_handler(t
)};
6714 ps
->pg_log
.roll_forward(rollbacker
.get());
6716 // adjust info to backfill
6717 ps
->info
.set_last_backfill(hobject_t());
6718 ps
->pg_log
.reset_backfill();
6719 ps
->dirty_info
= true;
6724 boost::statechart::result
PeeringState::Deleting::react(
6725 const DeleteSome
& evt
)
6728 std::pair
<ghobject_t
, bool> p
;
6729 p
= pl
->do_delete_work(context
<PeeringMachine
>().get_cur_transaction(),
6732 return p
.second
? discard_event() : terminate();
6735 void PeeringState::Deleting::exit()
6737 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
6739 ps
->deleting
= false;
6740 pl
->cancel_local_background_io_reservation();
6743 /*--------GetInfo---------*/
6744 PeeringState::GetInfo::GetInfo(my_context ctx
)
6746 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Peering/GetInfo")
6748 context
< PeeringMachine
>().log_enter(state_name
);
6752 ps
->check_past_interval_bounds();
6753 ps
->log_weirdness();
6754 PastIntervals::PriorSet
&prior_set
= context
< Peering
>().prior_set
;
6756 ceph_assert(ps
->blocked_by
.empty());
6758 prior_set
= ps
->build_prior();
6759 ps
->prior_readable_down_osds
= prior_set
.down
;
6760 if (ps
->prior_readable_down_osds
.empty()) {
6761 psdout(10) << " no prior_set down osds, clearing prior_readable_until_ub"
6763 ps
->clear_prior_readable_until_ub();
6766 ps
->reset_min_peer_features();
6768 if (prior_set
.pg_down
) {
6769 post_event(IsDown());
6770 } else if (peer_info_requested
.empty()) {
6771 post_event(GotInfo());
6775 void PeeringState::GetInfo::get_infos()
6778 PastIntervals::PriorSet
&prior_set
= context
< Peering
>().prior_set
;
6780 ps
->blocked_by
.clear();
6781 for (auto it
= prior_set
.probe
.begin(); it
!= prior_set
.probe
.end(); ++it
) {
6782 pg_shard_t peer
= *it
;
6783 if (peer
== ps
->pg_whoami
) {
6786 if (ps
->peer_info
.count(peer
)) {
6787 psdout(10) << " have osd." << peer
<< " info " << ps
->peer_info
[peer
] << dendl
;
6790 if (peer_info_requested
.count(peer
)) {
6791 psdout(10) << " already requested info from osd." << peer
<< dendl
;
6792 ps
->blocked_by
.insert(peer
.osd
);
6793 } else if (!ps
->get_osdmap()->is_up(peer
.osd
)) {
6794 psdout(10) << " not querying info from down osd." << peer
<< dendl
;
6796 psdout(10) << " querying info from osd." << peer
<< dendl
;
6797 context
< PeeringMachine
>().send_query(
6799 pg_query_t(pg_query_t::INFO
,
6800 it
->shard
, ps
->pg_whoami
.shard
,
6802 ps
->get_osdmap_epoch()));
6803 peer_info_requested
.insert(peer
);
6804 ps
->blocked_by
.insert(peer
.osd
);
6808 ps
->check_prior_readable_down_osds(ps
->get_osdmap());
6810 pl
->publish_stats_to_osd();
6813 boost::statechart::result
PeeringState::GetInfo::react(const MNotifyRec
& infoevt
)
6818 auto p
= peer_info_requested
.find(infoevt
.from
);
6819 if (p
!= peer_info_requested
.end()) {
6820 peer_info_requested
.erase(p
);
6821 ps
->blocked_by
.erase(infoevt
.from
.osd
);
6824 epoch_t old_start
= ps
->info
.history
.last_epoch_started
;
6825 if (ps
->proc_replica_info(
6826 infoevt
.from
, infoevt
.notify
.info
, infoevt
.notify
.epoch_sent
)) {
6827 // we got something new ...
6828 PastIntervals::PriorSet
&prior_set
= context
< Peering
>().prior_set
;
6829 if (old_start
< ps
->info
.history
.last_epoch_started
) {
6830 psdout(10) << " last_epoch_started moved forward, rebuilding prior" << dendl
;
6831 prior_set
= ps
->build_prior();
6832 ps
->prior_readable_down_osds
= prior_set
.down
;
6834 // filter out any osds that got dropped from the probe set from
6835 // peer_info_requested. this is less expensive than restarting
6836 // peering (which would re-probe everyone).
6837 auto p
= peer_info_requested
.begin();
6838 while (p
!= peer_info_requested
.end()) {
6839 if (prior_set
.probe
.count(*p
) == 0) {
6840 psdout(20) << " dropping osd." << *p
<< " from info_requested, no longer in probe set" << dendl
;
6841 peer_info_requested
.erase(p
++);
6848 psdout(20) << "Adding osd: " << infoevt
.from
.osd
<< " peer features: "
6849 << hex
<< infoevt
.features
<< dec
<< dendl
;
6850 ps
->apply_peer_features(infoevt
.features
);
6852 // are we done getting everything?
6853 if (peer_info_requested
.empty() && !prior_set
.pg_down
) {
6854 psdout(20) << "Common peer features: " << hex
<< ps
->get_min_peer_features() << dec
<< dendl
;
6855 psdout(20) << "Common acting features: " << hex
<< ps
->get_min_acting_features() << dec
<< dendl
;
6856 psdout(20) << "Common upacting features: " << hex
<< ps
->get_min_upacting_features() << dec
<< dendl
;
6857 post_event(GotInfo());
6860 return discard_event();
6863 boost::statechart::result
PeeringState::GetInfo::react(const QueryState
& q
)
6866 q
.f
->open_object_section("state");
6867 q
.f
->dump_string("name", state_name
);
6868 q
.f
->dump_stream("enter_time") << enter_time
;
6870 q
.f
->open_array_section("requested_info_from");
6871 for (auto p
= peer_info_requested
.begin();
6872 p
!= peer_info_requested
.end();
6874 q
.f
->open_object_section("osd");
6875 q
.f
->dump_stream("osd") << *p
;
6876 if (ps
->peer_info
.count(*p
)) {
6877 q
.f
->open_object_section("got_info");
6878 ps
->peer_info
[*p
].dump(q
.f
);
6879 q
.f
->close_section();
6881 q
.f
->close_section();
6883 q
.f
->close_section();
6885 q
.f
->close_section();
6886 return forward_event();
6889 boost::statechart::result
PeeringState::GetInfo::react(const QueryUnfound
& q
)
6891 q
.f
->dump_string("state", "GetInfo");
6892 q
.f
->dump_bool("available_might_have_unfound", false);
6893 return discard_event();
6896 void PeeringState::GetInfo::exit()
6898 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
6901 utime_t dur
= ceph_clock_now() - enter_time
;
6902 pl
->get_peering_perf().tinc(rs_getinfo_latency
, dur
);
6903 ps
->blocked_by
.clear();
6906 /*------GetLog------------*/
6907 PeeringState::GetLog::GetLog(my_context ctx
)
6910 context
< PeeringMachine
>().state_history
,
6911 "Started/Primary/Peering/GetLog"),
6914 context
< PeeringMachine
>().log_enter(state_name
);
6918 ps
->log_weirdness();
6921 if (!ps
->choose_acting(auth_log_shard
, false,
6922 &context
< Peering
>().history_les_bound
)) {
6923 if (!ps
->want_acting
.empty()) {
6924 post_event(NeedActingChange());
6926 post_event(IsIncomplete());
6932 if (auth_log_shard
== ps
->pg_whoami
) {
6933 post_event(GotLog());
6937 const pg_info_t
& best
= ps
->peer_info
[auth_log_shard
];
6940 if (ps
->info
.last_update
< best
.log_tail
) {
6941 psdout(10) << " not contiguous with osd." << auth_log_shard
<< ", down" << dendl
;
6942 post_event(IsIncomplete());
6946 // how much log to request?
6947 eversion_t request_log_from
= ps
->info
.last_update
;
6948 ceph_assert(!ps
->acting_recovery_backfill
.empty());
6949 for (auto p
= ps
->acting_recovery_backfill
.begin();
6950 p
!= ps
->acting_recovery_backfill
.end();
6952 if (*p
== ps
->pg_whoami
) continue;
6953 pg_info_t
& ri
= ps
->peer_info
[*p
];
6954 if (ri
.last_update
< ps
->info
.log_tail
&& ri
.last_update
>= best
.log_tail
&&
6955 ri
.last_update
< request_log_from
)
6956 request_log_from
= ri
.last_update
;
6960 psdout(10) << " requesting log from osd." << auth_log_shard
<< dendl
;
6961 context
<PeeringMachine
>().send_query(
6965 auth_log_shard
.shard
, ps
->pg_whoami
.shard
,
6966 request_log_from
, ps
->info
.history
,
6967 ps
->get_osdmap_epoch()));
6969 ceph_assert(ps
->blocked_by
.empty());
6970 ps
->blocked_by
.insert(auth_log_shard
.osd
);
6971 pl
->publish_stats_to_osd();
6974 boost::statechart::result
PeeringState::GetLog::react(const AdvMap
& advmap
)
6976 // make sure our log source didn't go down. we need to check
6977 // explicitly because it may not be part of the prior set, which
6978 // means the Peering state check won't catch it going down.
6979 if (!advmap
.osdmap
->is_up(auth_log_shard
.osd
)) {
6980 psdout(10) << "GetLog: auth_log_shard osd."
6981 << auth_log_shard
.osd
<< " went down" << dendl
;
6983 return transit
< Reset
>();
6986 // let the Peering state do its checks.
6987 return forward_event();
6990 boost::statechart::result
PeeringState::GetLog::react(const MLogRec
& logevt
)
6993 if (logevt
.from
!= auth_log_shard
) {
6994 psdout(10) << "GetLog: discarding log from "
6995 << "non-auth_log_shard osd." << logevt
.from
<< dendl
;
6996 return discard_event();
6998 psdout(10) << "GetLog: received master log from osd."
6999 << logevt
.from
<< dendl
;
7001 post_event(GotLog());
7002 return discard_event();
7005 boost::statechart::result
PeeringState::GetLog::react(const GotLog
&)
7009 psdout(10) << "leaving GetLog" << dendl
;
7011 psdout(10) << "processing master log" << dendl
;
7012 ps
->proc_master_log(context
<PeeringMachine
>().get_cur_transaction(),
7013 msg
->info
, std::move(msg
->log
), std::move(msg
->missing
),
7016 ps
->start_flush(context
< PeeringMachine
>().get_cur_transaction());
7017 return transit
< GetMissing
>();
7020 boost::statechart::result
PeeringState::GetLog::react(const QueryState
& q
)
7022 q
.f
->open_object_section("state");
7023 q
.f
->dump_string("name", state_name
);
7024 q
.f
->dump_stream("enter_time") << enter_time
;
7025 q
.f
->dump_stream("auth_log_shard") << auth_log_shard
;
7026 q
.f
->close_section();
7027 return forward_event();
7030 boost::statechart::result
PeeringState::GetLog::react(const QueryUnfound
& q
)
7032 q
.f
->dump_string("state", "GetLog");
7033 q
.f
->dump_bool("available_might_have_unfound", false);
7034 return discard_event();
7037 void PeeringState::GetLog::exit()
7039 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
7042 utime_t dur
= ceph_clock_now() - enter_time
;
7043 pl
->get_peering_perf().tinc(rs_getlog_latency
, dur
);
7044 ps
->blocked_by
.clear();
7047 /*------WaitActingChange--------*/
7048 PeeringState::WaitActingChange::WaitActingChange(my_context ctx
)
7050 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/WaitActingChange")
7052 context
< PeeringMachine
>().log_enter(state_name
);
7055 boost::statechart::result
PeeringState::WaitActingChange::react(const AdvMap
& advmap
)
7058 OSDMapRef osdmap
= advmap
.osdmap
;
7060 psdout(10) << "verifying no want_acting " << ps
->want_acting
<< " targets didn't go down" << dendl
;
7061 for (auto p
= ps
->want_acting
.begin(); p
!= ps
->want_acting
.end(); ++p
) {
7062 if (!osdmap
->is_up(*p
)) {
7063 psdout(10) << " want_acting target osd." << *p
<< " went down, resetting" << dendl
;
7065 return transit
< Reset
>();
7068 return forward_event();
7071 boost::statechart::result
PeeringState::WaitActingChange::react(const MLogRec
& logevt
)
7073 psdout(10) << "In WaitActingChange, ignoring MLocRec" << dendl
;
7074 return discard_event();
7077 boost::statechart::result
PeeringState::WaitActingChange::react(const MInfoRec
& evt
)
7079 psdout(10) << "In WaitActingChange, ignoring MInfoRec" << dendl
;
7080 return discard_event();
7083 boost::statechart::result
PeeringState::WaitActingChange::react(const MNotifyRec
& evt
)
7085 psdout(10) << "In WaitActingChange, ignoring MNotifyRec" << dendl
;
7086 return discard_event();
7089 boost::statechart::result
PeeringState::WaitActingChange::react(const QueryState
& q
)
7091 q
.f
->open_object_section("state");
7092 q
.f
->dump_string("name", state_name
);
7093 q
.f
->dump_stream("enter_time") << enter_time
;
7094 q
.f
->dump_string("comment", "waiting for pg acting set to change");
7095 q
.f
->close_section();
7096 return forward_event();
7099 boost::statechart::result
PeeringState::WaitActingChange::react(const QueryUnfound
& q
)
7101 q
.f
->dump_string("state", "WaitActingChange");
7102 q
.f
->dump_bool("available_might_have_unfound", false);
7103 return discard_event();
7106 void PeeringState::WaitActingChange::exit()
7108 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
7110 utime_t dur
= ceph_clock_now() - enter_time
;
7111 pl
->get_peering_perf().tinc(rs_waitactingchange_latency
, dur
);
7114 /*------Down--------*/
7115 PeeringState::Down::Down(my_context ctx
)
7117 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Peering/Down")
7119 context
< PeeringMachine
>().log_enter(state_name
);
7122 ps
->state_clear(PG_STATE_PEERING
);
7123 ps
->state_set(PG_STATE_DOWN
);
7125 auto &prior_set
= context
< Peering
>().prior_set
;
7126 ceph_assert(ps
->blocked_by
.empty());
7127 ps
->blocked_by
.insert(prior_set
.down
.begin(), prior_set
.down
.end());
7128 pl
->publish_stats_to_osd();
7131 void PeeringState::Down::exit()
7133 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
7137 ps
->state_clear(PG_STATE_DOWN
);
7138 utime_t dur
= ceph_clock_now() - enter_time
;
7139 pl
->get_peering_perf().tinc(rs_down_latency
, dur
);
7141 ps
->blocked_by
.clear();
7144 boost::statechart::result
PeeringState::Down::react(const QueryState
& q
)
7146 q
.f
->open_object_section("state");
7147 q
.f
->dump_string("name", state_name
);
7148 q
.f
->dump_stream("enter_time") << enter_time
;
7149 q
.f
->dump_string("comment",
7150 "not enough up instances of this PG to go active");
7151 q
.f
->close_section();
7152 return forward_event();
7155 boost::statechart::result
PeeringState::Down::react(const QueryUnfound
& q
)
7157 q
.f
->dump_string("state", "Down");
7158 q
.f
->dump_bool("available_might_have_unfound", false);
7159 return discard_event();
7162 boost::statechart::result
PeeringState::Down::react(const MNotifyRec
& infoevt
)
7166 ceph_assert(ps
->is_primary());
7167 epoch_t old_start
= ps
->info
.history
.last_epoch_started
;
7168 if (!ps
->peer_info
.count(infoevt
.from
) &&
7169 ps
->get_osdmap()->has_been_up_since(infoevt
.from
.osd
, infoevt
.notify
.epoch_sent
)) {
7170 ps
->update_history(infoevt
.notify
.info
.history
);
7172 // if we got something new to make pg escape down state
7173 if (ps
->info
.history
.last_epoch_started
> old_start
) {
7174 psdout(10) << " last_epoch_started moved forward, re-enter getinfo" << dendl
;
7175 ps
->state_clear(PG_STATE_DOWN
);
7176 ps
->state_set(PG_STATE_PEERING
);
7177 return transit
< GetInfo
>();
7180 return discard_event();
7184 /*------Incomplete--------*/
7185 PeeringState::Incomplete::Incomplete(my_context ctx
)
7187 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Peering/Incomplete")
7189 context
< PeeringMachine
>().log_enter(state_name
);
7192 ps
->state_clear(PG_STATE_PEERING
);
7193 ps
->state_set(PG_STATE_INCOMPLETE
);
7195 PastIntervals::PriorSet
&prior_set
= context
< Peering
>().prior_set
;
7196 ceph_assert(ps
->blocked_by
.empty());
7197 ps
->blocked_by
.insert(prior_set
.down
.begin(), prior_set
.down
.end());
7198 pl
->publish_stats_to_osd();
7201 boost::statechart::result
PeeringState::Incomplete::react(const AdvMap
&advmap
) {
7203 int64_t poolnum
= ps
->info
.pgid
.pool();
7205 // Reset if min_size turn smaller than previous value, pg might now be able to go active
7206 if (!advmap
.osdmap
->have_pg_pool(poolnum
) ||
7207 advmap
.lastmap
->get_pools().find(poolnum
)->second
.min_size
>
7208 advmap
.osdmap
->get_pools().find(poolnum
)->second
.min_size
) {
7210 return transit
< Reset
>();
7213 return forward_event();
7216 boost::statechart::result
PeeringState::Incomplete::react(const MNotifyRec
& notevt
) {
7218 psdout(7) << "handle_pg_notify from osd." << notevt
.from
<< dendl
;
7219 if (ps
->proc_replica_info(
7220 notevt
.from
, notevt
.notify
.info
, notevt
.notify
.epoch_sent
)) {
7221 // We got something new, try again!
7222 return transit
< GetLog
>();
7224 return discard_event();
7228 boost::statechart::result
PeeringState::Incomplete::react(
7229 const QueryState
& q
)
7231 q
.f
->open_object_section("state");
7232 q
.f
->dump_string("name", state_name
);
7233 q
.f
->dump_stream("enter_time") << enter_time
;
7234 q
.f
->dump_string("comment", "not enough complete instances of this PG");
7235 q
.f
->close_section();
7236 return forward_event();
7239 boost::statechart::result
PeeringState::Incomplete::react(const QueryUnfound
& q
)
7241 q
.f
->dump_string("state", "Incomplete");
7242 q
.f
->dump_bool("available_might_have_unfound", false);
7243 return discard_event();
7246 void PeeringState::Incomplete::exit()
7248 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
7252 ps
->state_clear(PG_STATE_INCOMPLETE
);
7253 utime_t dur
= ceph_clock_now() - enter_time
;
7254 pl
->get_peering_perf().tinc(rs_incomplete_latency
, dur
);
7256 ps
->blocked_by
.clear();
7259 /*------GetMissing--------*/
7260 PeeringState::GetMissing::GetMissing(my_context ctx
)
7262 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Peering/GetMissing")
7264 context
< PeeringMachine
>().log_enter(state_name
);
7267 ps
->log_weirdness();
7268 ceph_assert(!ps
->acting_recovery_backfill
.empty());
7270 for (auto i
= ps
->acting_recovery_backfill
.begin();
7271 i
!= ps
->acting_recovery_backfill
.end();
7273 if (*i
== ps
->get_primary()) continue;
7274 const pg_info_t
& pi
= ps
->peer_info
[*i
];
7275 // reset this so to make sure the pg_missing_t is initialized and
7276 // has the correct semantics even if we don't need to get a
7277 // missing set from a shard. This way later additions due to
7278 // lost+unfound delete work properly.
7279 ps
->peer_missing
[*i
].may_include_deletes
= !ps
->perform_deletes_during_peering();
7282 continue; // no pg data, nothing divergent
7284 if (pi
.last_update
< ps
->pg_log
.get_tail()) {
7285 psdout(10) << " osd." << *i
<< " is not contiguous, will restart backfill" << dendl
;
7286 ps
->peer_missing
[*i
].clear();
7289 if (pi
.last_backfill
== hobject_t()) {
7290 psdout(10) << " osd." << *i
<< " will fully backfill; can infer empty missing set" << dendl
;
7291 ps
->peer_missing
[*i
].clear();
7295 if (pi
.last_update
== pi
.last_complete
&& // peer has no missing
7296 pi
.last_update
== ps
->info
.last_update
) { // peer is up to date
7297 // replica has no missing and identical log as us. no need to
7299 // FIXME: we can do better here. if last_update==last_complete we
7300 // can infer the rest!
7301 psdout(10) << " osd." << *i
<< " has no missing, identical log" << dendl
;
7302 ps
->peer_missing
[*i
].clear();
7306 // We pull the log from the peer's last_epoch_started to ensure we
7307 // get enough log to detect divergent updates.
7308 since
.epoch
= pi
.last_epoch_started
;
7309 ceph_assert(pi
.last_update
>= ps
->info
.log_tail
); // or else choose_acting() did a bad thing
7310 if (pi
.log_tail
<= since
) {
7311 psdout(10) << " requesting log+missing since " << since
<< " from osd." << *i
<< dendl
;
7312 context
< PeeringMachine
>().send_query(
7316 i
->shard
, ps
->pg_whoami
.shard
,
7317 since
, ps
->info
.history
,
7318 ps
->get_osdmap_epoch()));
7320 psdout(10) << " requesting fulllog+missing from osd." << *i
7321 << " (want since " << since
<< " < log.tail "
7322 << pi
.log_tail
<< ")" << dendl
;
7323 context
< PeeringMachine
>().send_query(
7325 pg_query_t::FULLLOG
,
7326 i
->shard
, ps
->pg_whoami
.shard
,
7327 ps
->info
.history
, ps
->get_osdmap_epoch()));
7329 peer_missing_requested
.insert(*i
);
7330 ps
->blocked_by
.insert(i
->osd
);
7333 if (peer_missing_requested
.empty()) {
7334 if (ps
->need_up_thru
) {
7335 psdout(10) << " still need up_thru update before going active"
7337 post_event(NeedUpThru());
7342 post_event(Activate(ps
->get_osdmap_epoch()));
7344 pl
->publish_stats_to_osd();
7348 boost::statechart::result
PeeringState::GetMissing::react(const MLogRec
& logevt
)
7352 peer_missing_requested
.erase(logevt
.from
);
7353 ps
->proc_replica_log(logevt
.msg
->info
,
7355 std::move(logevt
.msg
->missing
),
7358 if (peer_missing_requested
.empty()) {
7359 if (ps
->need_up_thru
) {
7360 psdout(10) << " still need up_thru update before going active"
7362 post_event(NeedUpThru());
7364 psdout(10) << "Got last missing, don't need missing "
7365 << "posting Activate" << dendl
;
7366 post_event(Activate(ps
->get_osdmap_epoch()));
7369 return discard_event();
7372 boost::statechart::result
PeeringState::GetMissing::react(const QueryState
& q
)
7375 q
.f
->open_object_section("state");
7376 q
.f
->dump_string("name", state_name
);
7377 q
.f
->dump_stream("enter_time") << enter_time
;
7379 q
.f
->open_array_section("peer_missing_requested");
7380 for (auto p
= peer_missing_requested
.begin();
7381 p
!= peer_missing_requested
.end();
7383 q
.f
->open_object_section("osd");
7384 q
.f
->dump_stream("osd") << *p
;
7385 if (ps
->peer_missing
.count(*p
)) {
7386 q
.f
->open_object_section("got_missing");
7387 ps
->peer_missing
[*p
].dump(q
.f
);
7388 q
.f
->close_section();
7390 q
.f
->close_section();
7392 q
.f
->close_section();
7394 q
.f
->close_section();
7395 return forward_event();
7398 boost::statechart::result
PeeringState::GetMissing::react(const QueryUnfound
& q
)
7400 q
.f
->dump_string("state", "GetMising");
7401 q
.f
->dump_bool("available_might_have_unfound", false);
7402 return discard_event();
7405 void PeeringState::GetMissing::exit()
7407 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
7410 utime_t dur
= ceph_clock_now() - enter_time
;
7411 pl
->get_peering_perf().tinc(rs_getmissing_latency
, dur
);
7412 ps
->blocked_by
.clear();
7415 /*------WaitUpThru--------*/
7416 PeeringState::WaitUpThru::WaitUpThru(my_context ctx
)
7418 NamedState(context
< PeeringMachine
>().state_history
, "Started/Primary/Peering/WaitUpThru")
7420 context
< PeeringMachine
>().log_enter(state_name
);
7423 boost::statechart::result
PeeringState::WaitUpThru::react(const ActMap
& am
)
7426 if (!ps
->need_up_thru
) {
7427 post_event(Activate(ps
->get_osdmap_epoch()));
7429 return forward_event();
7432 boost::statechart::result
PeeringState::WaitUpThru::react(const MLogRec
& logevt
)
7435 psdout(10) << "Noting missing from osd." << logevt
.from
<< dendl
;
7436 ps
->peer_missing
[logevt
.from
].claim(std::move(logevt
.msg
->missing
));
7437 ps
->peer_info
[logevt
.from
] = logevt
.msg
->info
;
7438 return discard_event();
7441 boost::statechart::result
PeeringState::WaitUpThru::react(const QueryState
& q
)
7443 q
.f
->open_object_section("state");
7444 q
.f
->dump_string("name", state_name
);
7445 q
.f
->dump_stream("enter_time") << enter_time
;
7446 q
.f
->dump_string("comment", "waiting for osdmap to reflect a new up_thru for this osd");
7447 q
.f
->close_section();
7448 return forward_event();
7451 boost::statechart::result
PeeringState::WaitUpThru::react(const QueryUnfound
& q
)
7453 q
.f
->dump_string("state", "WaitUpThru");
7454 q
.f
->dump_bool("available_might_have_unfound", false);
7455 return discard_event();
7458 void PeeringState::WaitUpThru::exit()
7460 context
< PeeringMachine
>().log_exit(state_name
, enter_time
);
7462 utime_t dur
= ceph_clock_now() - enter_time
;
7463 pl
->get_peering_perf().tinc(rs_waitupthru_latency
, dur
);
7466 /*----PeeringState::PeeringMachine Methods-----*/
7468 #define dout_prefix dpp->gen_prefix(*_dout)
7470 void PeeringState::PeeringMachine::log_enter(const char *state_name
)
7473 psdout(5) << "enter " << state_name
<< dendl
;
7474 pl
->log_state_enter(state_name
);
7477 void PeeringState::PeeringMachine::log_exit(const char *state_name
, utime_t enter_time
)
7480 utime_t dur
= ceph_clock_now() - enter_time
;
7481 psdout(5) << "exit " << state_name
<< " " << dur
<< " " << event_count
<< " " << event_time
<< dendl
;
7482 pl
->log_state_exit(state_name
, enter_time
, event_count
, event_time
);
7484 event_time
= utime_t();
// Stream a textual summary of a PeeringState.
// Output starts with "pg[" followed by ps.info and the up set; the remaining
// fields are appended in the order of the statements below, most of them only
// when they carry information (non-empty, or different from the matching
// value in ps.info).
// @param out  stream written to
// @param ps   peering state to describe
7487 ostream
&operator<<(ostream
&out
, const PeeringState
&ps
) {
7488 out
<< "pg[" << ps
.info
7489 << " " << pg_vector_string(ps
.up
);
// The acting set is printed (after a "/") only when it differs from up.
7490 if (ps
.acting
!= ps
.up
)
7491 out
<< "/" << pg_vector_string(ps
.acting
);
7493 out
<< "p" << ps
.get_primary();
// Async-recovery and backfill targets are listed only when non-empty.
7494 if (!ps
.async_recovery_targets
.empty())
7495 out
<< " async=[" << ps
.async_recovery_targets
<< "]";
7496 if (!ps
.backfill_targets
.empty())
7497 out
<< " backfill=[" << ps
.backfill_targets
<< "]";
7498 out
<< " r=" << ps
.get_role();
7499 out
<< " lpr=" << ps
.get_last_peering_reset();
// Past intervals: bounds followed by the number of intervals.
7504 if (!ps
.past_intervals
.empty()) {
7505 out
<< " pi=[" << ps
.past_intervals
.get_bounds()
7506 << ")/" << ps
.past_intervals
.size();
// When peered, luod/lua are shown only if they differ from info.last_update.
7509 if (ps
.is_peered()) {
7510 if (ps
.last_update_ondisk
!= ps
.info
.last_update
)
7511 out
<< " luod=" << ps
.last_update_ondisk
;
7512 if (ps
.last_update_applied
!= ps
.info
.last_update
)
7513 out
<< " lua=" << ps
.last_update_applied
;
// Flag a disagreement between the log's tail/head and info.log_tail /
// info.last_update, dumping the log itself.
7516 if (ps
.pg_log
.get_tail() != ps
.info
.log_tail
||
7517 ps
.pg_log
.get_head() != ps
.info
.last_update
)
7518 out
<< " (info mismatch, " << ps
.pg_log
.get_log() << ")";
// Flag a non-empty log whose first entry's version is <= the log tail,
// printing the versions of the first and last entries.
7520 if (!ps
.pg_log
.get_log().empty()) {
7521 if ((ps
.pg_log
.get_log().log
.begin()->version
<= ps
.pg_log
.get_tail())) {
7522 out
<< " (log bound mismatch, actual=["
7523 << ps
.pg_log
.get_log().log
.begin()->version
<< ","
7524 << ps
.pg_log
.get_log().log
.rbegin()->version
<< "]";
7529 out
<< " crt=" << ps
.pg_log
.get_can_rollback_to();
// lcod is shown only when it differs from info.last_complete; mlcod always.
7531 if (ps
.last_complete_ondisk
!= ps
.info
.last_complete
)
7532 out
<< " lcod " << ps
.last_complete_ondisk
;
7534 out
<< " mlcod " << ps
.min_last_complete_ondisk
;
7536 out
<< " " << pg_state_string(ps
.get_state());
// NOTE(review): the statement guarded by should_send_notify() is not visible
// in this view -- confirm what is printed in that case.
7537 if (ps
.should_send_notify())
// pruub and the prior-readable down OSDs are shown only when the upper bound
// is non-zero.
7540 if (ps
.prior_readable_until_ub
!= ceph::signedspan::zero()) {
7541 out
<< " pruub " << ps
.prior_readable_until_ub
7542 << "@" << ps
.get_prior_readable_down_osds();
7547 std::vector
<pg_shard_t
> PeeringState::get_replica_recovery_order() const
7549 std::vector
<std::pair
<unsigned int, pg_shard_t
>> replicas_by_num_missing
,
7550 async_by_num_missing
;
7551 replicas_by_num_missing
.reserve(get_acting_recovery_backfill().size() - 1);
7552 for (auto &p
: get_acting_recovery_backfill()) {
7553 if (p
== get_primary()) {
7556 auto pm
= get_peer_missing().find(p
);
7557 assert(pm
!= get_peer_missing().end());
7558 auto nm
= pm
->second
.num_missing();
7560 if (is_async_recovery_target(p
)) {
7561 async_by_num_missing
.push_back(make_pair(nm
, p
));
7563 replicas_by_num_missing
.push_back(make_pair(nm
, p
));
7567 // sort by number of missing objects, in ascending order.
7568 auto func
= [](const std::pair
<unsigned int, pg_shard_t
> &lhs
,
7569 const std::pair
<unsigned int, pg_shard_t
> &rhs
) {
7570 return lhs
.first
< rhs
.first
;
7572 // acting goes first
7573 std::sort(replicas_by_num_missing
.begin(), replicas_by_num_missing
.end(), func
);
7574 // then async_recovery_targets
7575 std::sort(async_by_num_missing
.begin(), async_by_num_missing
.end(), func
);
7576 replicas_by_num_missing
.insert(replicas_by_num_missing
.end(),
7577 async_by_num_missing
.begin(), async_by_num_missing
.end());
7579 std::vector
<pg_shard_t
> ret
;
7580 ret
.reserve(replicas_by_num_missing
.size());
7581 for (auto p
: replicas_by_num_missing
) {
7582 ret
.push_back(p
.second
);