#include "messages/MBackfillReserve.h"
#include "messages/MRecoveryReserve.h"
#include "messages/MOSDScrubReserve.h"
-#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGInfo2.h"
#include "messages/MOSDPGTrim.h"
#include "messages/MOSDPGLog.h"
-#include "messages/MOSDPGNotify.h"
#include "messages/MOSDPGNotify2.h"
-#include "messages/MOSDPGQuery.h"
#include "messages/MOSDPGQuery2.h"
#include "messages/MOSDPGLease.h"
#include "messages/MOSDPGLeaseAck.h"
#define dout_context cct
#define dout_subsys ceph_subsys_osd
-BufferedRecoveryMessages::BufferedRecoveryMessages(
- ceph_release_t r,
- PeeringCtx &ctx)
- : require_osd_release(r) {
+using std::dec;
+using std::hex;
+using std::make_pair;
+using std::map;
+using std::ostream;
+using std::pair;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+using ceph::Formatter;
+using ceph::make_message;
+
+BufferedRecoveryMessages::BufferedRecoveryMessages(PeeringCtx &ctx)
// steal messages from ctx
- message_map.swap(ctx.message_map);
-}
+ : message_map{std::move(ctx.message_map)}
+{}
void BufferedRecoveryMessages::send_notify(int to, const pg_notify_t &n)
{
+  // Buffer an MOSDPGNotify2 addressed to osd 'to'; the pre-octopus
+  // MOSDPGNotify path was removed along with require_osd_release gating.
-  if (require_osd_release >= ceph_release_t::octopus) {
-    spg_t pgid(n.info.pgid.pgid, n.to);
-    send_osd_message(to, make_message<MOSDPGNotify2>(pgid, n));
-  } else {
-    send_osd_message(to, make_message<MOSDPGNotify>(n.epoch_sent, vector{n}));
-  }
+  spg_t pgid(n.info.pgid.pgid, n.to);
+  send_osd_message(to, TOPNSPC::make_message<MOSDPGNotify2>(pgid, n));
}
void BufferedRecoveryMessages::send_query(
spg_t to_spgid,
const pg_query_t &q)
{
- if (require_osd_release >= ceph_release_t::octopus) {
- send_osd_message(to,
- make_message<MOSDPGQuery2>(to_spgid, q));
- } else {
- auto m = make_message<MOSDPGQuery>(
- q.epoch_sent,
- MOSDPGQuery::pg_list_t{{to_spgid, q}});
- send_osd_message(to, m);
- }
+ send_osd_message(to, TOPNSPC::make_message<MOSDPGQuery2>(to_spgid, q));
}
void BufferedRecoveryMessages::send_info(
std::optional<pg_lease_t> lease,
std::optional<pg_lease_ack_t> lease_ack)
{
- if (require_osd_release >= ceph_release_t::octopus) {
- send_osd_message(
- to,
- make_message<MOSDPGInfo2>(
- to_spgid,
- info,
- cur_epoch,
- min_epoch,
- lease,
- lease_ack)
- );
- } else {
- send_osd_message(
- to,
- make_message<MOSDPGInfo>(
- cur_epoch,
- vector{pg_notify_t{to_spgid.shard,
- info.pgid.shard,
- min_epoch, cur_epoch,
- info, PastIntervals{}}})
- );
- }
+ send_osd_message(
+ to,
+ TOPNSPC::make_message<MOSDPGInfo2>(
+ to_spgid,
+ info,
+ cur_epoch,
+ min_epoch,
+ lease,
+ lease_ack)
+ );
}
-void PGPool::update(CephContext *cct, OSDMapRef map)
+void PGPool::update(OSDMapRef map)
{
const pg_pool_t *pi = map->get_pg_pool(id);
if (!pi) {
pg_whoami(pg_whoami),
info(spgid),
pg_log(cct),
+ last_require_osd_release(curmap->require_osd_release),
missing_loc(spgid, this, dpp, cct),
machine(this, cct, spgid, dpp, pl, &state_history)
{
ceph_assert(!messages_pending_flush);
ceph_assert(orig_ctx);
ceph_assert(rctx);
- messages_pending_flush = BufferedRecoveryMessages(
- orig_ctx->require_osd_release);
+ messages_pending_flush.emplace();
rctx.emplace(*messages_pending_flush, *orig_ctx);
}
missing_loc.check_recovery_sources(osdmap);
pl->check_recovery_sources(osdmap);
- for (set<pg_shard_t>::iterator i = peer_log_requested.begin();
- i != peer_log_requested.end();
- ) {
+ for (auto i = peer_log_requested.begin(); i != peer_log_requested.end();) {
if (!osdmap->is_up(i->osd)) {
psdout(10) << "peer_log_requested removing " << *i << dendl;
peer_log_requested.erase(i++);
}
}
- for (set<pg_shard_t>::iterator i = peer_missing_requested.begin();
- i != peer_missing_requested.end();
- ) {
+ for (auto i = peer_missing_requested.begin();
+ i != peer_missing_requested.end();) {
if (!osdmap->is_up(i->osd)) {
psdout(10) << "peer_missing_requested removing " << *i << dendl;
peer_missing_requested.erase(i++);
pl->on_info_history_change();
}
+// Return the least-advanced last_backfill across all backfill targets,
+// i.e. the earliest object some target still needs; hobject_t::get_max()
+// when there are no backfill targets.
+hobject_t PeeringState::earliest_backfill() const
+{
+  hobject_t e = hobject_t::get_max();
+  for (const pg_shard_t& bt : get_backfill_targets()) {
+    const pg_info_t &pi = get_peer_info(bt);
+    e = std::min(pi.last_backfill, e);
+  }
+  return e;
+}
+
void PeeringState::purge_strays()
{
if (is_premerge()) {
psdout(10) << "purge_strays " << stray_set << dendl;
bool removed = false;
- for (set<pg_shard_t>::iterator p = stray_set.begin();
- p != stray_set.end();
- ++p) {
+ for (auto p = stray_set.begin(); p != stray_set.end(); ++p) {
ceph_assert(!is_acting_recovery_backfill(*p));
if (get_osdmap()->is_up(p->osd)) {
psdout(10) << "sending PGRemove to osd." << *p << dendl;
vector<spg_t> to_remove;
to_remove.push_back(spg_t(info.pgid.pgid, p->shard));
- MOSDPGRemove *m = new MOSDPGRemove(
+ auto m = TOPNSPC::make_message<MOSDPGRemove>(
get_osdmap_epoch(),
to_remove);
- pl->send_cluster_message(p->osd, m, get_osdmap_epoch());
+ pl->send_cluster_message(p->osd, std::move(m), get_osdmap_epoch());
} else {
psdout(10) << "not sending PGRemove to down osd." << *p << dendl;
}
peer_missing_requested.clear();
}
+// Dump, via Formatter f, the probe status of every shard in
+// might_have_unfound that has not yet supplied a missing set:
+// "querying" (request outstanding), "osd is down", or "not queried".
+// Shards already present in peer_missing are skipped.
+void PeeringState::query_unfound(Formatter *f, string state)
+{
+  psdout(20) << "Enter PeeringState common QueryUnfound" << dendl;
+  {
+    f->dump_string("state", state);
+    f->dump_bool("available_might_have_unfound", true);
+    f->open_array_section("might_have_unfound");
+    for (auto p = might_have_unfound.begin();
+	 p != might_have_unfound.end();
+	 ++p) {
+      if (peer_missing.count(*p)) {
+	; // Ignore already probed OSDs
+      } else {
+	f->open_object_section("osd");
+	f->dump_stream("osd") << *p;
+	if (peer_missing_requested.count(*p)) {
+	  f->dump_string("status", "querying");
+        } else if (!get_osdmap()->is_up(p->osd)) {
+	  f->dump_string("status", "osd is down");
+        } else {
+	  f->dump_string("status", "not queried");
+        }
+	f->close_section();
+      }
+    }
+    f->close_section();
+  }
+  psdout(20) << "Exit PeeringState common QueryUnfound" << dendl;
+  return;
+}
bool PeeringState::proc_replica_info(
pg_shard_t from, const pg_info_t &oinfo, epoch_t send_epoch)
{
- map<pg_shard_t, pg_info_t>::iterator p = peer_info.find(from);
+ auto p = peer_info.find(from);
if (p != peer_info.end() && p->second.last_update == oinfo.last_update) {
psdout(10) << " got dup osd." << from << " info "
<< oinfo << ", identical to ours" << dendl;
{
// Remove any downed osds from peer_info
bool removed = false;
- map<pg_shard_t, pg_info_t>::iterator p = peer_info.begin();
+ auto p = peer_info.begin();
while (p != peer_info.end()) {
if (!osdmap->is_up(p->first.osd)) {
psdout(10) << " dropping down osd." << p->first << " info " << p->second << dendl;
peer_missing.erase(p->first);
peer_log_requested.erase(p->first);
peer_missing_requested.erase(p->first);
- peer_purged.erase(p->first);
peer_info.erase(p++);
removed = true;
} else
++p;
}
+ // Remove any downed osds from peer_purged so we can re-purge if necessary
+ auto it = peer_purged.begin();
+ while (it != peer_purged.end()) {
+ if (!osdmap->is_up(it->osd)) {
+ psdout(10) << " dropping down osd." << *it << " from peer_purged" << dendl;
+ peer_purged.erase(it++);
+ } else {
+ ++it;
+ }
+ }
+
// if we removed anyone, update peers (which include peer_info)
if (removed)
update_heartbeat_peers();
if (up[i] != CRUSH_ITEM_NONE)
new_peers.insert(up[i]);
}
- for (map<pg_shard_t,pg_info_t>::iterator p = peer_info.begin();
- p != peer_info.end();
- ++p) {
+ for (auto p = peer_info.begin(); p != peer_info.end(); ++p) {
new_peers.insert(p->first.osd);
}
pl->update_heartbeat_peers(std::move(new_peers));
<< dendl;
update_osdmap_ref(osdmap);
- pool.update(cct, osdmap);
+ pool.update(osdmap);
AdvMap evt(
osdmap, lastmap, newup, up_primary,
if (pool.info.last_change == osdmap_ref->get_epoch()) {
pl->on_pool_change();
}
- readable_interval = pool.get_readable_interval();
+ readable_interval = pool.get_readable_interval(cct->_conf);
last_require_osd_release = osdmap->require_osd_release;
}
}
write_if_dirty(rctx.transaction);
- if (get_osdmap()->check_new_blacklist_entries()) {
- pl->check_blacklisted_watchers();
+ if (get_osdmap()->check_new_blocklist_entries()) {
+ pl->check_blocklisted_watchers();
}
}
// did primary change?
if (was_old_primary != is_primary()) {
state_clear(PG_STATE_CLEAN);
+ // queue/dequeue the scrubber
+ pl->on_primary_status_change(was_old_primary, is_primary());
}
pl->on_role_change();
}
}
+ if (is_primary() && was_old_primary) {
+ pl->reschedule_scrub();
+ }
+
if (acting.empty() && !up.empty() && up_primary == pg_whoami) {
psdout(10) << " acting empty, but i am up[0], clearing pg_temp" << dendl;
pl->queue_want_pg_temp(acting);
// initialize features
acting_features = CEPH_FEATURES_SUPPORTED_DEFAULT;
upacting_features = CEPH_FEATURES_SUPPORTED_DEFAULT;
- for (vector<int>::iterator p = acting.begin(); p != acting.end(); ++p) {
+ for (auto p = acting.begin(); p != acting.end(); ++p) {
if (*p == CRUSH_ITEM_NONE)
continue;
uint64_t f = osdmap->get_xinfo(*p).features;
acting_features &= f;
upacting_features &= f;
}
- for (vector<int>::iterator p = up.begin(); p != up.end(); ++p) {
+ for (auto p = up.begin(); p != up.end(); ++p) {
if (*p == CRUSH_ITEM_NONE)
continue;
upacting_features &= osdmap->get_xinfo(*p).features;
if (state & PG_STATE_FORCED_BACKFILL) {
ret = OSD_BACKFILL_PRIORITY_FORCED;
} else {
- if (acting.size() < pool.info.min_size) {
+ if (actingset.size() < pool.info.min_size) {
base = OSD_BACKFILL_INACTIVE_PRIORITY_BASE;
// inactive: no. of replicas < min_size, highest priority since it blocks IO
- ret = base + (pool.info.min_size - acting.size());
+ ret = base + (pool.info.min_size - actingset.size());
} else if (is_undersized()) {
// undersized: OSD_BACKFILL_DEGRADED_PRIORITY_BASE + num missing replicas
}
pl->send_cluster_message(
peer.osd,
- new MOSDPGLease(epoch,
+ TOPNSPC::make_message<MOSDPGLease>(epoch,
spg_t(spgid.pgid, peer.shard),
get_lease()),
epoch);
void PeeringState::proc_lease(const pg_lease_t& l)
{
- if (!HAVE_FEATURE(upacting_features, SERVER_OCTOPUS)) {
- psdout(20) << __func__ << " no-op, upacting_features 0x" << std::hex
- << upacting_features << std::dec
- << " does not include SERVER_OCTOPUS" << dendl;
- return;
- }
+ assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS));
if (!is_nonprimary()) {
psdout(20) << __func__ << " no-op, !nonprimary" << dendl;
return;
void PeeringState::proc_lease_ack(int from, const pg_lease_ack_t& a)
{
- if (!HAVE_FEATURE(upacting_features, SERVER_OCTOPUS)) {
- return;
- }
+ assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS));
auto now = pl->get_mnow();
bool was_min = false;
for (unsigned i = 0; i < acting.size(); ++i) {
void PeeringState::proc_renew_lease()
{
- if (!HAVE_FEATURE(upacting_features, SERVER_OCTOPUS)) {
- return;
- }
+ assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS));
renew_lease(pl->get_mnow());
send_lease();
schedule_renew_lease();
bool PeeringState::check_prior_readable_down_osds(const OSDMapRef& map)
{
- if (!HAVE_FEATURE(upacting_features, SERVER_OCTOPUS)) {
- return false;
- }
+ assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS));
bool changed = false;
auto p = prior_readable_down_osds.begin();
while (p != prior_readable_down_osds.end()) {
{
if (1) {
// sanity check
- for (map<pg_shard_t,pg_info_t>::iterator it = peer_info.begin();
- it != peer_info.end();
- ++it) {
+ for (auto it = peer_info.begin(); it != peer_info.end(); ++it) {
ceph_assert(info.history.last_epoch_started >=
it->second.history.last_epoch_started);
}
}
ceph_assert(!acting_recovery_backfill.empty());
- set<pg_shard_t>::const_iterator end = acting_recovery_backfill.end();
- set<pg_shard_t>::const_iterator a = acting_recovery_backfill.begin();
- for (; a != end; ++a) {
- if (*a == get_primary()) continue;
- pg_shard_t peer = *a;
- map<pg_shard_t, pg_missing_t>::const_iterator pm = peer_missing.find(peer);
+ for (const pg_shard_t& peer : acting_recovery_backfill) {
+ if (peer == get_primary()) {
+ continue;
+ }
+ auto pm = peer_missing.find(peer);
if (pm == peer_missing.end()) {
psdout(10) << __func__ << " osd." << peer << " doesn't have missing set"
<< dendl;
// We can assume that only possible osds that need backfill
// are on the backfill_targets vector nodes.
- set<pg_shard_t>::const_iterator end = backfill_targets.end();
- set<pg_shard_t>::const_iterator a = backfill_targets.begin();
- for (; a != end; ++a) {
- pg_shard_t peer = *a;
- map<pg_shard_t, pg_info_t>::const_iterator pi = peer_info.find(peer);
+ for (const pg_shard_t& peer : backfill_targets) {
+ auto pi = peer_info.find(peer);
+ ceph_assert(pi != peer_info.end());
if (!pi->second.last_backfill.is_max()) {
psdout(10) << __func__ << " osd." << peer
<< " has last_backfill " << pi->second.last_backfill << dendl;
{
ceph_assert(is_primary());
- set<pg_shard_t>::const_iterator peer = might_have_unfound.begin();
- set<pg_shard_t>::const_iterator mend = might_have_unfound.end();
+ auto peer = might_have_unfound.begin();
+ auto mend = might_have_unfound.end();
for (; peer != mend; ++peer) {
if (peer_missing.count(*peer))
continue;
- map<pg_shard_t, pg_info_t>::const_iterator iter = peer_info.find(*peer);
+ auto iter = peer_info.find(*peer);
if (iter != peer_info.end() &&
(iter->second.is_empty() || iter->second.dne()))
continue;
pl->unreserve_recovery_space();
pl->send_cluster_message(
primary.osd,
- new MBackfillReserve(
+ TOPNSPC::make_message<MBackfillReserve>(
MBackfillReserve::REJECT_TOOFULL,
spg_t(info.pgid.pgid, primary.shard),
get_osdmap_epoch()),
/* See doc/dev/osd_internals/last_epoch_started.rst before attempting
* to make changes to this process. Also, make sure to update it
* when you find bugs! */
- eversion_t min_last_update_acceptable = eversion_t::max();
epoch_t max_last_epoch_started_found = 0;
- for (map<pg_shard_t, pg_info_t>::const_iterator i = infos.begin();
- i != infos.end();
- ++i) {
+ for (auto i = infos.begin(); i != infos.end(); ++i) {
if (!cct->_conf->osd_find_best_info_ignore_history_les &&
max_last_epoch_started_found < i->second.history.last_epoch_started) {
*history_les_bound = true;
max_last_epoch_started_found = i->second.last_epoch_started;
}
}
- for (map<pg_shard_t, pg_info_t>::const_iterator i = infos.begin();
- i != infos.end();
- ++i) {
+ eversion_t min_last_update_acceptable = eversion_t::max();
+ for (auto i = infos.begin(); i != infos.end(); ++i) {
if (max_last_epoch_started_found <= i->second.last_epoch_started) {
if (min_last_update_acceptable > i->second.last_update)
min_last_update_acceptable = i->second.last_update;
if (min_last_update_acceptable == eversion_t::max())
return infos.end();
- map<pg_shard_t, pg_info_t>::const_iterator best = infos.end();
+ auto best = infos.end();
// find osd with newest last_update (oldest for ec_pool).
// if there are multiples, prefer
// - a longer tail, if it brings another peer into log contiguity
// - the current primary
- for (map<pg_shard_t, pg_info_t>::const_iterator p = infos.begin();
- p != infos.end();
- ++p) {
+ for (auto p = infos.begin(); p != infos.end(); ++p) {
if (restrict_to_up_acting && !is_up(p->first) &&
!is_acting(p->first))
continue;
{
vector<int> want(size, CRUSH_ITEM_NONE);
map<shard_id_t, set<pg_shard_t> > all_info_by_shard;
- for (map<pg_shard_t, pg_info_t>::const_iterator i = all_info.begin();
+ for (auto i = all_info.begin();
i != all_info.end();
++i) {
all_info_by_shard[i->first.shard].insert(i->first);
ss << " selecting acting[i]: " << pg_shard_t(acting[i], shard_id_t(i)) << std::endl;
want[i] = acting[i];
} else if (!restrict_to_up_acting) {
- for (set<pg_shard_t>::iterator j = all_info_by_shard[shard_id_t(i)].begin();
+ for (auto j = all_info_by_shard[shard_id_t(i)].begin();
j != all_info_by_shard[shard_id_t(i)].end();
++j) {
ceph_assert(j->shard == i);
_want->swap(want);
}
-/**
- * calculate the desired acting set.
- *
- * Choose an appropriate acting set. Prefer up[0], unless it is
- * incomplete, or another osd has a longer tail that allows us to
- * bring other up nodes up to date.
- */
-void PeeringState::calc_replicated_acting(
+std::pair<map<pg_shard_t, pg_info_t>::const_iterator, eversion_t>
+PeeringState::select_replicated_primary(
map<pg_shard_t, pg_info_t>::const_iterator auth_log_shard,
uint64_t force_auth_primary_missing_objects,
- unsigned size,
- const vector<int> &acting,
- const vector<int> &up,
+ const std::vector<int> &up,
pg_shard_t up_primary,
const map<pg_shard_t, pg_info_t> &all_info,
- bool restrict_to_up_acting,
- vector<int> *want,
- set<pg_shard_t> *backfill,
- set<pg_shard_t> *acting_backfill,
const OSDMapRef osdmap,
ostream &ss)
{
pg_shard_t auth_log_shard_id = auth_log_shard->first;
ss << __func__ << " newest update on osd." << auth_log_shard_id
- << " with " << auth_log_shard->second
- << (restrict_to_up_acting ? " restrict_to_up_acting" : "") << std::endl;
+ << " with " << auth_log_shard->second << std::endl;
// select primary
auto primary = all_info.find(up_primary);
!primary->second.is_incomplete() &&
primary->second.last_update >=
auth_log_shard->second.log_tail) {
- if (HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS)) {
- auto approx_missing_objects =
- primary->second.stats.stats.sum.num_objects_missing;
- auto auth_version = auth_log_shard->second.last_update.version;
- auto primary_version = primary->second.last_update.version;
- if (auth_version > primary_version) {
- approx_missing_objects += auth_version - primary_version;
- } else {
- approx_missing_objects += primary_version - auth_version;
- }
- if ((uint64_t)approx_missing_objects >
- force_auth_primary_missing_objects) {
- primary = auth_log_shard;
- ss << "up_primary: " << up_primary << ") has approximate "
- << approx_missing_objects
- << "(>" << force_auth_primary_missing_objects <<") "
- << "missing objects, osd." << auth_log_shard_id
- << " selected as primary instead"
- << std::endl;
- } else {
- ss << "up_primary: " << up_primary << ") selected as primary"
- << std::endl;
- }
+ assert(HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS));
+ auto approx_missing_objects =
+ primary->second.stats.stats.sum.num_objects_missing;
+ auto auth_version = auth_log_shard->second.last_update.version;
+ auto primary_version = primary->second.last_update.version;
+ if (auth_version > primary_version) {
+ approx_missing_objects += auth_version - primary_version;
+ } else {
+ approx_missing_objects += primary_version - auth_version;
+ }
+ if ((uint64_t)approx_missing_objects >
+ force_auth_primary_missing_objects) {
+ primary = auth_log_shard;
+ ss << "up_primary: " << up_primary << ") has approximate "
+ << approx_missing_objects
+ << "(>" << force_auth_primary_missing_objects <<") "
+ << "missing objects, osd." << auth_log_shard_id
+ << " selected as primary instead"
+ << std::endl;
} else {
- ss << "up_primary: " << up_primary << ") selected as primary" << std::endl;
+ ss << "up_primary: " << up_primary << ") selected as primary"
+ << std::endl;
}
} else {
ceph_assert(!auth_log_shard->second.is_incomplete());
ss << __func__ << " primary is osd." << primary->first
<< " with " << primary->second << std::endl;
- want->push_back(primary->first.osd);
- acting_backfill->insert(primary->first);
/* We include auth_log_shard->second.log_tail because in GetLog,
* we will request logs back to the min last_update over our
eversion_t oldest_auth_log_entry =
std::min(primary->second.log_tail, auth_log_shard->second.log_tail);
+ return std::make_pair(primary, oldest_auth_log_entry);
+}
+
+
+/**
+ * calculate the desired acting set.
+ *
+ * Choose an appropriate acting set. Prefer up[0], unless it is
+ * incomplete, or another osd has a longer tail that allows us to
+ * bring other up nodes up to date.
+ */
+void PeeringState::calc_replicated_acting(
+ map<pg_shard_t, pg_info_t>::const_iterator primary,
+ eversion_t oldest_auth_log_entry,
+ unsigned size,
+ const vector<int> &acting,
+ const vector<int> &up,
+ pg_shard_t up_primary,
+ const map<pg_shard_t, pg_info_t> &all_info,
+ bool restrict_to_up_acting,
+ vector<int> *want,
+ set<pg_shard_t> *backfill,
+ set<pg_shard_t> *acting_backfill,
+ const OSDMapRef osdmap,
+ const PGPool& pool,
+ ostream &ss)
+{
+ ss << __func__ << (restrict_to_up_acting ? " restrict_to_up_acting" : "")
+ << std::endl;
+
+ want->push_back(primary->first.osd);
+ acting_backfill->insert(primary->first);
+
// select replicas that have log contiguity with primary.
// prefer up, then acting, then any peer_info osds
for (auto i : up) {
// skip up osds we already considered above
if (acting_cand == primary->first)
continue;
- vector<int>::const_iterator up_it = find(up.begin(), up.end(), i);
+ auto up_it = find(up.begin(), up.end(), i);
if (up_it != up.end())
continue;
// skip up osds we already considered above
if (i.first == primary->first)
continue;
- vector<int>::const_iterator up_it = find(up.begin(), up.end(), i.first.osd);
+ auto up_it = find(up.begin(), up.end(), i.first.osd);
if (up_it != up.end())
continue;
- vector<int>::const_iterator acting_it = find(
+ auto acting_it = find(
acting.begin(), acting.end(), i.first.osd);
if (acting_it != acting.end())
continue;
}
}
+// Defines osd preference order: acting set, then larger last_update
+using osd_ord_t = std::tuple<bool, eversion_t>; // <acting, last_update>
+using osd_id_t = int;
+
+// Candidate OSDs belonging to one CRUSH ancestor ("stretch bucket"),
+// held in ascending osd_ord_t order (most-preferred first), plus a count
+// of how many OSDs from this bucket were already placed in the acting set.
+class bucket_candidates_t {
+  std::deque<std::pair<osd_ord_t, osd_id_t>> osds;
+  int selected = 0;
+
+public:
+  void add_osd(osd_ord_t ord, osd_id_t osd) {
+    // osds will be added in smallest to largest order
+    assert(osds.empty() || osds.back().first <= ord);
+    osds.push_back(std::make_pair(ord, osd));
+  }
+  // Remove and return the most-preferred remaining candidate.
+  osd_id_t pop_osd() {
+    ceph_assert(!is_empty());
+    auto ret = osds.front();
+    osds.pop_front();
+    return ret.second;
+  }
+
+  void inc_selected() { selected++; }
+  unsigned get_num_selected() const { return selected; }
+
+  // Preference token of the front candidate; a default-constructed
+  // (false, eversion_t()) token when no candidates remain.
+  osd_ord_t get_ord() const {
+    return osds.empty() ? std::make_tuple(false, eversion_t())
+      : osds.front().first;
+  }
+
+  bool is_empty() const { return osds.empty(); }
+
+  // Heap ordering: a bucket with FEWER OSDs already selected compares
+  // greater (because of the negation), so bucket_heap_t's max-heap pops
+  // it first; ties are broken by the front candidate's ord token.
+  bool operator<(const bucket_candidates_t &rhs) const {
+    return std::make_tuple(-selected, get_ord()) <
+      std::make_tuple(-rhs.selected, rhs.get_ord());
+  }
+
+  friend std::ostream &operator<<(std::ostream &, const bucket_candidates_t &);
+};
+
+// Debug formatting of a bucket's remaining candidates.
+std::ostream &operator<<(std::ostream &lhs, const bucket_candidates_t &cand)
+{
+  return lhs << "candidates[" << cand.osds << "]";
+}
+
+// Max-heap of references to per-ancestor candidate sets; pop() yields the
+// currently most-preferred bucket (see bucket_candidates_t::operator<).
+// Holds references only -- the referenced buckets must outlive the heap.
+// Empty buckets are never stored (push_if_nonempty filters them).
+class bucket_heap_t {
+  using elem_t = std::reference_wrapper<bucket_candidates_t>;
+  std::vector<elem_t> heap;
+
+  // Max heap -- should emit buckets in order of preference
+  struct comp {
+    bool operator()(const elem_t &lhs, const elem_t &rhs) {
+      return lhs.get() < rhs.get();
+    }
+  };
+public:
+  void push_if_nonempty(elem_t e) {
+    if (!e.get().is_empty()) {
+      heap.push_back(e);
+      std::push_heap(heap.begin(), heap.end(), comp());
+    }
+  }
+  // Precondition: !is_empty() (not checked here).
+  elem_t pop() {
+    std::pop_heap(heap.begin(), heap.end(), comp());
+    auto ret = heap.back();
+    heap.pop_back();
+    return ret;
+  }
+
+  bool is_empty() const { return heap.empty(); }
+};
+
+/**
+ * calc_replicated_acting_stretch
+ *
+ * Choose an acting set using as much of the up set as possible; filling
+ * in the remaining slots so as to maximize the number of crush buckets at
+ * level pool.info.peering_crush_bucket_barrier represented.
+ *
+ * Stretch clusters are a bit special: while they have a "size" the
+ * same way as normal pools, if we happen to lose a data center
+ * (we call it a "stretch bucket", but really it'll be a data center or
+ * a cloud availability zone), we don't actually want to shove
+ * 2 DC's worth of replication into a single site -- it won't fit!
+ * So we locally calculate a bucket_max, based
+ * on the targeted number of stretch buckets for the pool and
+ * its size. Then we won't pull more than bucket_max from any
+ * given ancestor even if it leaves us undersized.
+ *
+ * There are two distinct phases: (commented below)
+ */
+void PeeringState::calc_replicated_acting_stretch(
+  map<pg_shard_t, pg_info_t>::const_iterator primary,
+  eversion_t oldest_auth_log_entry,
+  unsigned size,
+  const vector<int> &acting,
+  const vector<int> &up,
+  pg_shard_t up_primary,
+  const map<pg_shard_t, pg_info_t> &all_info,
+  bool restrict_to_up_acting,
+  vector<int> *want,
+  set<pg_shard_t> *backfill,
+  set<pg_shard_t> *acting_backfill,
+  const OSDMapRef osdmap,
+  const PGPool& pool,
+  ostream &ss)
+{
+  ceph_assert(want);
+  ceph_assert(acting_backfill);
+  ceph_assert(backfill);
+  ss << __func__ << (restrict_to_up_acting ? " restrict_to_up_acting" : "")
+     << std::endl;
+
+  // true iff osd has already been placed in the want set
+  auto used = [want](int osd) {
+    return std::find(want->begin(), want->end(), osd) != want->end();
+  };
+
+  // an info is usable when it is not incomplete and its last_update is
+  // not older than the oldest authoritative log entry
+  auto usable_info = [&](const auto &cur_info) mutable {
+    return !(cur_info.is_incomplete() ||
+	     cur_info.last_update < oldest_auth_log_entry);
+  };
+
+  auto osd_info = [&](int osd) mutable -> const pg_info_t & {
+    pg_shard_t cand = pg_shard_t(osd, shard_id_t::NO_SHARD);
+    const pg_info_t &cur_info = all_info.find(cand)->second;
+    return cur_info;
+  };
+
+  auto usable_osd = [&](int osd) mutable {
+    return usable_info(osd_info(osd));
+  };
+
+  // candidate bucket for osd's CRUSH ancestor at the barrier level
+  std::map<int, bucket_candidates_t> ancestors;
+  auto get_ancestor = [&](int osd) mutable {
+    int ancestor = osdmap->crush->get_parent_of_type(
+      osd,
+      pool.info.peering_crush_bucket_barrier,
+      pool.info.crush_rule);
+    return &ancestors[ancestor];
+  };
+
+  // bucket_max = ceil(size / peering_crush_bucket_target): cap on the
+  // number of OSDs drawn from any single ancestor bucket.
+  // NOTE(review): assumes peering_crush_bucket_target != 0 -- verify that
+  // all callers only reach here with stretch-mode pool settings populated.
+  unsigned bucket_max = pool.info.size / pool.info.peering_crush_bucket_target;
+  if (bucket_max * pool.info.peering_crush_bucket_target < pool.info.size) {
+    ++bucket_max;
+  }
+
+  /* 1) Select all usable osds from the up set as well as the primary
+   *
+   * We also stash any unusable osds from up into backfill.
+   */
+  auto add_required = [&](int osd) {
+    if (!used(osd)) {
+      want->push_back(osd);
+      acting_backfill->insert(
+	pg_shard_t(osd, shard_id_t::NO_SHARD));
+      get_ancestor(osd)->inc_selected();
+    }
+  };
+  add_required(primary->first.osd);
+  ss << " osd " << primary->first.osd << " primary accepted "
+     << osd_info(primary->first.osd) << std::endl;
+  for (auto upcand: up) {
+    auto upshard = pg_shard_t(upcand, shard_id_t::NO_SHARD);
+    auto &curinfo = osd_info(upcand);
+    if (usable_osd(upcand)) {
+      ss << " osd " << upcand << " (up) accepted " << curinfo << std::endl;
+      add_required(upcand);
+    } else {
+      ss << " osd " << upcand << " (up) backfill " << curinfo << std::endl;
+      backfill->insert(upshard);
+      acting_backfill->insert(upshard);
+    }
+  }
+
+  if (want->size() >= pool.info.size) { // non-failed CRUSH mappings are valid
+    ss << " up set sufficient" << std::endl;
+    return;
+  }
+  ss << " up set insufficient, considering remaining osds" << std::endl;
+
+  /* 2) Fill out remaining slots from usable osds in all_info
+   * while maximizing the number of ancestor nodes at the
+   * barrier_id crush level.
+   */
+  {
+    std::vector<std::pair<osd_ord_t, osd_id_t>> candidates;
+    /* To do this, we first filter the set of usable osd into an ordered
+     * list of usable osds
+     */
+    auto get_osd_ord = [&](bool is_acting, const pg_info_t &info) -> osd_ord_t {
+      return std::make_tuple(
+	!is_acting /* acting should sort first */,
+	info.last_update);
+    };
+    for (auto &cand : acting) {
+      auto &cand_info = osd_info(cand);
+      if (!used(cand) && usable_info(cand_info)) {
+	ss << " acting candidate " << cand << " " << cand_info << std::endl;
+	candidates.push_back(std::make_pair(get_osd_ord(true, cand_info), cand));
+      }
+    }
+    if (!restrict_to_up_acting) {
+      for (auto &[cand, info] : all_info) {
+	if (!used(cand.osd) && usable_info(info) &&
+	    (std::find(acting.begin(), acting.end(), cand.osd)
+	     == acting.end())) {
+	  ss << " other candidate " << cand << " " << info << std::endl;
+	  candidates.push_back(
+	    std::make_pair(get_osd_ord(false, info), cand.osd));
+	}
+      }
+    }
+    std::sort(candidates.begin(), candidates.end());
+
+    // We then filter these candidates by ancestor
+    std::for_each(candidates.begin(), candidates.end(), [&](auto cand) {
+      get_ancestor(cand.second)->add_osd(cand.first, cand.second);
+    });
+  }
+
+  // Accept one bucket's best remaining candidate into want/acting_backfill.
+  auto pop_ancestor = [&](auto &ancestor) {
+    ceph_assert(!ancestor.is_empty());
+    auto osd = ancestor.pop_osd();
+
+    ss << " accepting candidate " << osd << std::endl;
+
+    ceph_assert(!used(osd));
+    ceph_assert(usable_osd(osd));
+
+    want->push_back(osd);
+    acting_backfill->insert(
+      pg_shard_t(osd, shard_id_t::NO_SHARD));
+    ancestor.inc_selected();
+  };
+
+  /* Next, we use the ancestors map to grab a descendant of the
+   * peering_crush_mandatory_member if not already represented.
+   *
+   * TODO: using 0 here to match other users.  Prior to merge, I
+   * expect that this and other users should instead check against
+   * CRUSH_ITEM_NONE.
+   *
+   * NOTE(review): the guard below already compares against
+   * CRUSH_ITEM_NONE, so this TODO appears resolved for this site;
+   * confirm the "other users" it mentions and drop the TODO.
+   */
+  if (pool.info.peering_crush_mandatory_member != CRUSH_ITEM_NONE) {
+    auto aiter = ancestors.find(pool.info.peering_crush_mandatory_member);
+    if (aiter != ancestors.end() &&
+	!aiter->second.get_num_selected()) {
+      ss << " adding required ancestor " << aiter->first << std::endl;
+      ceph_assert(!aiter->second.is_empty()); // wouldn't exist otherwise
+      pop_ancestor(aiter->second);
+    }
+  }
+
+  /* We then place the ancestors in a heap ordered by fewest selected
+   * and then by the ordering token of the next osd */
+  bucket_heap_t aheap;
+  std::for_each(ancestors.begin(), ancestors.end(), [&](auto &anc) {
+    aheap.push_if_nonempty(anc.second);
+  });
+
+  /* and pull from this heap until it's empty or we have enough.
+   * "We have enough" is a sufficient check here for
+   * stretch_set_can_peer() because our heap sorting always
+   * pulls from ancestors with the least number of included OSDs,
+   * so if it is possible to satisfy the bucket_count constraints we
+   * will do so.
+   */
+  while (!aheap.is_empty() && want->size() < pool.info.size) {
+    auto next = aheap.pop();
+    pop_ancestor(next.get());
+    if (next.get().get_num_selected() < bucket_max) {
+      aheap.push_if_nonempty(next);
+    }
+  }
+
+  /* The end result is that we should have as many buckets covered as
+   * possible while respecting up, the primary selection,
+   * the pool size (given bucket count constraints),
+   * and the mandatory member.
+   */
+}
+
+
bool PeeringState::recoverable(const vector<int> &want) const
{
unsigned num_want_acting = 0;
}
if (num_want_acting < pool.info.min_size) {
- const bool recovery_ec_pool_below_min_size=
- HAVE_FEATURE(get_osdmap()->get_up_osd_features(), SERVER_OCTOPUS);
-
- if (pool.info.is_erasure() && !recovery_ec_pool_below_min_size) {
- psdout(10) << __func__ << " failed, ec recovery below min size not supported by pre-octopus" << dendl;
- return false;
- } else if (!cct->_conf.get_val<bool>("osd_allow_recovery_below_min_size")) {
+ if (!cct->_conf.get_val<bool>("osd_allow_recovery_below_min_size")) {
psdout(10) << __func__ << " failed, recovery below min size not enabled" << dendl;
return false;
}
// past the authoritative last_update the same as those equal to it.
version_t auth_version = auth_info.last_update.version;
version_t candidate_version = shard_info.last_update.version;
- if (HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS)) {
- auto approx_missing_objects =
- shard_info.stats.stats.sum.num_objects_missing;
- if (auth_version > candidate_version) {
- approx_missing_objects += auth_version - candidate_version;
- }
- if (static_cast<uint64_t>(approx_missing_objects) >
- cct->_conf.get_val<uint64_t>("osd_async_recovery_min_cost")) {
- candidates_by_cost.emplace(approx_missing_objects, shard_i);
- }
- } else {
- if (auth_version > candidate_version &&
- (auth_version - candidate_version) > cct->_conf.get_val<uint64_t>("osd_async_recovery_min_cost")) {
- candidates_by_cost.insert(make_pair(auth_version - candidate_version, shard_i));
- }
+ assert(HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS));
+ auto approx_missing_objects =
+ shard_info.stats.stats.sum.num_objects_missing;
+ if (auth_version > candidate_version) {
+ approx_missing_objects += auth_version - candidate_version;
+ }
+ if (static_cast<uint64_t>(approx_missing_objects) >
+ cct->_conf.get_val<uint64_t>("osd_async_recovery_min_cost")) {
+ candidates_by_cost.emplace(approx_missing_objects, shard_i);
}
}
// logs plus historical missing objects as the cost of recovery
version_t auth_version = auth_info.last_update.version;
version_t candidate_version = shard_info.last_update.version;
- if (HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS)) {
- auto approx_missing_objects =
- shard_info.stats.stats.sum.num_objects_missing;
- if (auth_version > candidate_version) {
- approx_missing_objects += auth_version - candidate_version;
- } else {
- approx_missing_objects += candidate_version - auth_version;
- }
- if (static_cast<uint64_t>(approx_missing_objects) >
- cct->_conf.get_val<uint64_t>("osd_async_recovery_min_cost")) {
- candidates_by_cost.emplace(approx_missing_objects, shard_i);
- }
+ assert(HAVE_FEATURE(osdmap->get_up_osd_features(), SERVER_NAUTILUS));
+ auto approx_missing_objects =
+ shard_info.stats.stats.sum.num_objects_missing;
+ if (auth_version > candidate_version) {
+ approx_missing_objects += auth_version - candidate_version;
} else {
- size_t approx_entries;
- if (auth_version > candidate_version) {
- approx_entries = auth_version - candidate_version;
- } else {
- approx_entries = candidate_version - auth_version;
- }
- if (approx_entries > cct->_conf.get_val<uint64_t>("osd_async_recovery_min_cost")) {
- candidates_by_cost.insert(make_pair(approx_entries, shard_i));
- }
+ approx_missing_objects += candidate_version - auth_version;
+ }
+ if (static_cast<uint64_t>(approx_missing_objects) >
+ cct->_conf.get_val<uint64_t>("osd_async_recovery_min_cost")) {
+ candidates_by_cost.emplace(approx_missing_objects, shard_i);
}
}
vector<int> candidate_want(*want);
for (auto it = candidate_want.begin(); it != candidate_want.end(); ++it) {
if (*it == cur_shard.osd) {
- candidate_want.erase(it);
- want->swap(candidate_want);
- async_recovery->insert(cur_shard);
- break;
+ candidate_want.erase(it);
+ if (pool.info.stretch_set_can_peer(candidate_want, *osdmap, NULL)) {
+ // if we're in stretch mode, we can only remove the osd if it doesn't
+ // break peering limits.
+ want->swap(candidate_want);
+ async_recovery->insert(cur_shard);
+ }
+ break;
}
}
}
+
psdout(20) << __func__ << " result want=" << *want
<< " async_recovery=" << *async_recovery << dendl;
}
-
-
/**
* choose acting
*
all_info[pg_whoami] = info;
if (cct->_conf->subsys.should_gather<dout_subsys, 10>()) {
- for (map<pg_shard_t, pg_info_t>::iterator p = all_info.begin();
- p != all_info.end();
- ++p) {
+ for (auto p = all_info.begin(); p != all_info.end(); ++p) {
psdout(10) << __func__ << " all_info osd." << p->first << " "
<< p->second << dendl;
}
}
- map<pg_shard_t, pg_info_t>::const_iterator auth_log_shard =
- find_best_info(all_info, restrict_to_up_acting, history_les_bound);
+ auto auth_log_shard = find_best_info(all_info, restrict_to_up_acting,
+ history_les_bound);
if (auth_log_shard == all_info.end()) {
if (up != acting) {
set<pg_shard_t> want_backfill, want_acting_backfill;
vector<int> want;
stringstream ss;
- if (pool.info.is_replicated())
- calc_replicated_acting(
+ if (pool.info.is_replicated()) {
+ auto [primary_shard, oldest_log] = select_replicated_primary(
auth_log_shard,
cct->_conf.get_val<uint64_t>(
- "osd_force_auth_primary_missing_objects"),
- get_osdmap()->get_pg_size(info.pgid.pgid),
- acting,
+ "osd_force_auth_primary_missing_objects"),
up,
up_primary,
all_info,
- restrict_to_up_acting,
- &want,
- &want_backfill,
- &want_acting_backfill,
get_osdmap(),
ss);
- else
+ if (pool.info.is_stretch_pool()) {
+ calc_replicated_acting_stretch(
+ primary_shard,
+ oldest_log,
+ get_osdmap()->get_pg_size(info.pgid.pgid),
+ acting,
+ up,
+ up_primary,
+ all_info,
+ restrict_to_up_acting,
+ &want,
+ &want_backfill,
+ &want_acting_backfill,
+ get_osdmap(),
+ pool,
+ ss);
+ } else {
+ calc_replicated_acting(
+ primary_shard,
+ oldest_log,
+ get_osdmap()->get_pg_size(info.pgid.pgid),
+ acting,
+ up,
+ up_primary,
+ all_info,
+ restrict_to_up_acting,
+ &want,
+ &want_backfill,
+ &want_acting_backfill,
+ get_osdmap(),
+ pool,
+ ss);
+ }
+ } else {
calc_ec_acting(
auth_log_shard,
get_osdmap()->get_pg_size(info.pgid.pgid),
&want_backfill,
&want_acting_backfill,
ss);
+ }
psdout(10) << ss.str() << dendl;
if (!recoverable(want)) {
}
return false;
}
+
if (request_pg_temp_change_only)
return true;
want_acting.clear();
}
// Will not change if already set because up would have had to change
// Verify that nothing in backfill is in stray_set
- for (set<pg_shard_t>::iterator i = want_backfill.begin();
- i != want_backfill.end();
- ++i) {
+ for (auto i = want_backfill.begin(); i != want_backfill.end(); ++i) {
ceph_assert(stray_set.find(*i) == stray_set.end());
}
psdout(10) << "choose_acting want=" << want << " backfill_targets="
<< unfound << " unfound"
<< dendl;
- std::set<pg_shard_t>::const_iterator m = might_have_unfound.begin();
- std::set<pg_shard_t>::const_iterator mend = might_have_unfound.end();
+ auto m = might_have_unfound.begin();
+ auto mend = might_have_unfound.end();
for (; m != mend; ++m) {
pg_shard_t peer(*m);
continue;
}
- map<pg_shard_t, pg_info_t>::const_iterator iter = peer_info.find(peer);
+ auto iter = peer_info.find(peer);
if (iter != peer_info.end() &&
(iter->second.is_empty() || iter->second.dne())) {
// ignore empty peers
pool.info.is_erasure());
// include any (stray) peers
- for (map<pg_shard_t, pg_info_t>::iterator p = peer_info.begin();
- p != peer_info.end();
- ++p)
+ for (auto p = peer_info.begin(); p != peer_info.end(); ++p)
might_have_unfound.insert(p->first);
psdout(15) << __func__ << ": built " << might_have_unfound << dendl;
if (is_primary()) {
// only update primary last_epoch_started if we will go active
- if (acting.size() >= pool.info.min_size) {
+ if (acting_set_writeable()) {
ceph_assert(cct->_conf->osd_find_best_info_ignore_history_les ||
info.last_epoch_started <= activation_epoch);
info.last_epoch_started = activation_epoch;
purged.intersection_of(to_trim, info.purged_snaps);
to_trim.subtract(purged);
- if (HAVE_FEATURE(upacting_features, SERVER_OCTOPUS)) {
- renew_lease(pl->get_mnow());
- // do not schedule until we are actually activated
- }
+ assert(HAVE_FEATURE(upacting_features, SERVER_OCTOPUS));
+ renew_lease(pl->get_mnow());
+ // do not schedule until we are actually activated
// adjust purged_snaps: PG may have been inactive while snaps were pruned
// from the removed_snaps_queue in the osdmap. update local purged_snaps
info.purged_snaps.swap(purged);
// start up replicas
+ if (prior_readable_down_osds.empty()) {
+ dout(10) << __func__ << " no prior_readable_down_osds to wait on, clearing ub"
+ << dendl;
+ clear_prior_readable_until_ub();
+ }
info.history.refresh_prior_readable_until_ub(pl->get_mnow(),
prior_readable_until_ub);
ceph_assert(!acting_recovery_backfill.empty());
- for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
+ for (auto i = acting_recovery_backfill.begin();
i != acting_recovery_backfill.end();
++i) {
if (*i == pg_whoami) continue;
psdout(10) << "activate peer osd." << peer << " " << pi << dendl;
- MOSDPGLog *m = 0;
+ #if defined(WITH_SEASTAR)
+ MURef<MOSDPGLog> m;
+ #else
+ MRef<MOSDPGLog> m;
+ #endif
ceph_assert(peer_missing.count(peer));
pg_missing_t& pm = peer_missing[peer];
bool needs_past_intervals = pi.dne();
+ // Save num_bytes for backfill reservation request, can't be negative
+ peer_bytes[peer] = std::max<int64_t>(0, pi.stats.stats.sum.num_bytes);
+
if (pi.last_update == info.last_update) {
// empty log
if (!pi.last_backfill.is_max())
} else {
psdout(10) << "activate peer osd." << peer
<< " is up to date, but sending pg_log anyway" << dendl;
- m = new MOSDPGLog(
+ m = TOPNSPC::make_message<MOSDPGLog>(
i->shard, pg_whoami.shard,
get_osdmap_epoch(), info,
last_peering_reset);
pi.last_interval_started = info.last_interval_started;
pi.history = info.history;
pi.hit_set = info.hit_set;
- // Save num_bytes for reservation request, can't be negative
- peer_bytes[peer] = std::max<int64_t>(0, pi.stats.stats.sum.num_bytes);
pi.stats.stats.clear();
pi.stats.stats.sum.num_bytes = peer_bytes[peer];
// initialize peer with our purged_snaps.
pi.purged_snaps = info.purged_snaps;
- m = new MOSDPGLog(
+ m = TOPNSPC::make_message<MOSDPGLog>(
i->shard, pg_whoami.shard,
get_osdmap_epoch(), pi,
last_peering_reset /* epoch to create pg at */);
} else {
// catch up
ceph_assert(pg_log.get_tail() <= pi.last_update);
- m = new MOSDPGLog(
+ m = TOPNSPC::make_message<MOSDPGLog>(
i->shard, pg_whoami.shard,
get_osdmap_epoch(), info,
last_peering_reset /* epoch to create pg at */);
// update local version of peer's missing list!
if (m && pi.last_backfill != hobject_t()) {
- for (list<pg_log_entry_t>::iterator p = m->log.log.begin();
- p != m->log.log.end();
- ++p) {
+ for (auto p = m->log.log.begin(); p != m->log.log.end(); ++p) {
if (p->soid <= pi.last_backfill &&
!p->is_error()) {
if (perform_deletes_during_peering() && p->is_delete()) {
dout(10) << "activate peer osd." << peer << " sending " << m->log
<< dendl;
m->lease = get_lease();
- pl->send_cluster_message(peer.osd, m, get_osdmap_epoch());
+ pl->send_cluster_message(peer.osd, std::move(m), get_osdmap_epoch());
}
// peer now has
// Set up missing_loc
set<pg_shard_t> complete_shards;
- for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
+ for (auto i = acting_recovery_backfill.begin();
i != acting_recovery_backfill.end();
++i) {
psdout(20) << __func__ << " setting up missing_loc from shard " << *i
} else {
missing_loc.add_source_info(pg_whoami, info, pg_log.get_missing(),
ctx.handle);
- for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
+ for (auto i = acting_recovery_backfill.begin();
i != acting_recovery_backfill.end();
++i) {
if (*i == pg_whoami) continue;
ctx.handle);
}
}
- for (map<pg_shard_t, pg_missing_t>::iterator i = peer_missing.begin();
- i != peer_missing.end();
- ++i) {
+ for (auto i = peer_missing.begin(); i != peer_missing.end(); ++i) {
if (is_acting_recovery_backfill(i->first))
continue;
ceph_assert(peer_info.count(i->first));
state_set(PG_STATE_ACTIVATING);
pl->on_activate(std::move(to_trim));
}
- if (acting.size() >= pool.info.min_size) {
+ if (acting_set_writeable()) {
PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)};
pg_log.roll_forward(rollbacker.get());
}
peer->second.last_interval_started = info.last_interval_started;
peer->second.history.merge(info.history);
}
- Message* m = nullptr;
- if (last_require_osd_release >= ceph_release_t::octopus) {
- m = new MOSDPGInfo2{spg_t{info.pgid.pgid, pg_shard.shard},
+ auto m = TOPNSPC::make_message<MOSDPGInfo2>(spg_t{info.pgid.pgid, pg_shard.shard},
info,
get_osdmap_epoch(),
get_osdmap_epoch(),
- get_lease(), {}};
- } else {
- m = new MOSDPGInfo{get_osdmap_epoch(),
- {pg_notify_t{pg_shard.shard,
- pg_whoami.shard,
- get_osdmap_epoch(),
- get_osdmap_epoch(),
- info,
- past_intervals}}};
- }
- pl->send_cluster_message(pg_shard.osd, m, get_osdmap_epoch());
+ std::optional<pg_lease_t>{get_lease()},
+ std::nullopt);
+ pl->send_cluster_message(pg_shard.osd, std::move(m), get_osdmap_epoch());
}
}
void PeeringState::merge_log(
-  ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog,
+  ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t&& olog,
  pg_shard_t from)
{
+  // olog is now taken by rvalue reference so PGLog::merge_log can move the
+  // peer's log entries into our log instead of copying them.
  PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)};
  pg_log.merge_log(
-    oinfo, olog, from, info, rollbacker.get(), dirty_info, dirty_big_info);
+    oinfo, std::move(olog), from, info, rollbacker.get(),
+    dirty_info, dirty_big_info);
}
void PeeringState::rewind_divergent_log(
void PeeringState::proc_master_log(
ObjectStore::Transaction& t, pg_info_t &oinfo,
- pg_log_t &olog, pg_missing_t& omissing, pg_shard_t from)
+ pg_log_t&& olog, pg_missing_t&& omissing, pg_shard_t from)
{
psdout(10) << "proc_master_log for osd." << from << ": "
<< olog << " " << omissing << dendl;
// make any adjustments to their missing map; we are taking their
// log to be authoritative (i.e., their entries are by definitely
// non-divergent).
- merge_log(t, oinfo, olog, from);
+ merge_log(t, oinfo, std::move(olog), from);
peer_info[from] = oinfo;
psdout(10) << " peer osd." << from << " now " << oinfo
<< " " << omissing << dendl;
ceph_assert(cct->_conf->osd_find_best_info_ignore_history_les ||
info.last_epoch_started >= info.history.last_epoch_started);
- peer_missing[from].claim(omissing);
+ peer_missing[from].claim(std::move(omissing));
}
void PeeringState::proc_replica_log(
pg_info_t &oinfo,
const pg_log_t &olog,
- pg_missing_t& omissing,
+ pg_missing_t&& omissing,
pg_shard_t from)
{
psdout(10) << "proc_replica_log for osd." << from << ": "
<< oinfo << " " << omissing << dendl;
might_have_unfound.insert(from);
- for (map<hobject_t, pg_missing_item>::const_iterator i =
- omissing.get_items().begin();
+ for (auto i = omissing.get_items().begin();
i != omissing.get_items().end();
++i) {
psdout(20) << " after missing " << i->first
<< " need " << i->second.need
<< " have " << i->second.have << dendl;
}
- peer_missing[from].claim(omissing);
+ peer_missing[from].claim(std::move(omissing));
}
void PeeringState::fulfill_info(
ceph_assert(from == primary);
ceph_assert(query.type != pg_query_t::INFO);
- MOSDPGLog *mlog = new MOSDPGLog(
+ auto mlog = TOPNSPC::make_message<MOSDPGLog>(
from.shard, pg_whoami.shard,
get_osdmap_epoch(),
info, query_epoch);
psdout(10) << " sending " << mlog->log << " " << mlog->missing << dendl;
- pl->send_cluster_message(from.osd, mlog, get_osdmap_epoch(), true);
+ pl->send_cluster_message(from.osd, std::move(mlog), get_osdmap_epoch(), true);
}
void PeeringState::fulfill_query(const MQuery& query, PeeringCtxWrapper &rctx)
child->info.stats.parent_split_bits = split_bits;
info.stats.stats_invalid = true;
child->info.stats.stats_invalid = true;
+ child->info.stats.objects_trimmed = 0;
+ child->info.stats.snaptrim_duration = 0.0;
child->info.last_epoch_started = info.last_epoch_started;
child->info.last_interval_started = info.last_interval_started;
if (get_primary() != child->get_primary())
child->info.history.same_primary_since = get_osdmap_epoch();
- child->info.stats.up = up;
+ child->info.stats.up = newup;
child->info.stats.up_primary = up_primary;
- child->info.stats.acting = acting;
+ child->info.stats.acting = newacting;
child->info.stats.acting_primary = primary;
child->info.stats.mapping_epoch = get_osdmap_epoch();
info.stats.blocked_by.clear();
info.stats.blocked_by.resize(keep);
unsigned pos = 0;
- for (set<int>::iterator p = blocked_by.begin();
- p != blocked_by.end() && keep > 0;
- ++p) {
+ for (auto p = blocked_by.begin(); p != blocked_by.end() && keep > 0; ++p) {
if (skip > 0 && (rand() % (skip + keep) < skip)) {
--skip;
} else {
}
std::optional<pg_stat_t> PeeringState::prepare_stats_for_publish(
- bool pg_stats_publish_valid,
- const pg_stat_t &pg_stats_publish,
+ const std::optional<pg_stat_t> &pg_stats_publish,
const object_stat_collection_t &unstable_stats)
{
if (info.stats.stats.sum.num_scrub_errors) {
+ psdout(10) << __func__ << " inconsistent due to " <<
+ info.stats.stats.sum.num_scrub_errors << " scrub errors" << dendl;
state_set(PG_STATE_INCONSISTENT);
} else {
state_clear(PG_STATE_INCONSISTENT);
if (info.stats.state != state) {
info.stats.last_change = now;
// Optimistic estimation, if we just find out an inactive PG,
- // assumt it is active till now.
+ // assume it is active till now.
if (!(state & PG_STATE_ACTIVE) &&
(info.stats.state & PG_STATE_ACTIVE))
info.stats.last_active = now;
psdout(20) << __func__ << " reporting purged_snaps "
<< pre_publish.purged_snaps << dendl;
- if (pg_stats_publish_valid && pre_publish == pg_stats_publish &&
+ if (pg_stats_publish && pre_publish == *pg_stats_publish &&
info.stats.last_fresh > cutoff) {
- psdout(15) << "publish_stats_to_osd " << pg_stats_publish.reported_epoch
+ psdout(15) << "publish_stats_to_osd " << pg_stats_publish->reported_epoch
<< ": no change since " << info.stats.last_fresh << dendl;
return std::nullopt;
} else {
if ((info.stats.state & PG_STATE_UNDERSIZED) == 0)
info.stats.last_fullsized = now;
- psdout(15) << "publish_stats_to_osd " << pg_stats_publish.reported_epoch
- << ":" << pg_stats_publish.reported_seq << dendl;
+ psdout(15) << "publish_stats_to_osd " << pre_publish.reported_epoch
+ << ":" << pre_publish.reported_seq << dendl;
return std::make_optional(std::move(pre_publish));
}
}
const vector<int>& newacting, int new_acting_primary,
const pg_history_t& history,
const PastIntervals& pi,
- bool backfill,
ObjectStore::Transaction &t)
{
psdout(10) << "init role " << role << " up "
pg_log.set_missing_may_contain_deletes();
}
- if (backfill) {
- psdout(10) << __func__ << ": Setting backfill" << dendl;
- info.set_last_backfill(hobject_t());
- info.last_complete = info.last_update;
- pg_log.mark_log_for_rewrite();
- }
-
on_new_interval();
dirty_info = true;
f->dump_string("state", get_pg_state_string());
f->dump_unsigned("epoch", get_osdmap_epoch());
f->open_array_section("up");
- for (vector<int>::const_iterator p = up.begin(); p != up.end(); ++p)
+ for (auto p = up.begin(); p != up.end(); ++p)
f->dump_unsigned("osd", *p);
f->close_section();
f->open_array_section("acting");
- for (vector<int>::const_iterator p = acting.begin(); p != acting.end(); ++p)
+ for (auto p = acting.begin(); p != acting.end(); ++p)
f->dump_unsigned("osd", *p);
f->close_section();
if (!backfill_targets.empty()) {
f->open_array_section("backfill_targets");
- for (set<pg_shard_t>::iterator p = backfill_targets.begin();
- p != backfill_targets.end();
- ++p)
+ for (auto p = backfill_targets.begin(); p != backfill_targets.end(); ++p)
f->dump_stream("shard") << *p;
f->close_section();
}
if (!async_recovery_targets.empty()) {
f->open_array_section("async_recovery_targets");
- for (set<pg_shard_t>::iterator p = async_recovery_targets.begin();
+ for (auto p = async_recovery_targets.begin();
p != async_recovery_targets.end();
++p)
f->dump_stream("shard") << *p;
}
if (!acting_recovery_backfill.empty()) {
f->open_array_section("acting_recovery_backfill");
- for (set<pg_shard_t>::iterator p = acting_recovery_backfill.begin();
+ for (auto p = acting_recovery_backfill.begin();
p != acting_recovery_backfill.end();
++p)
f->dump_stream("shard") << *p;
f->close_section();
f->open_array_section("peer_info");
- for (map<pg_shard_t, pg_info_t>::const_iterator p = peer_info.begin();
- p != peer_info.end();
- ++p) {
+ for (auto p = peer_info.begin(); p != peer_info.end(); ++p) {
f->open_object_section("info");
f->dump_stream("peer") << p->first;
p->second.dump(f);
f->close_section();
}
+ f->close_section();
}
void PeeringState::update_stats(
if (f(info.history, info.stats)) {
pl->publish_stats_to_osd();
}
- pl->on_info_history_change();
+ pl->reschedule_scrub();
if (t) {
dirty_info = true;
}
}
+// Variant of update_stats() that only applies the updater callback to our
+// info.history/info.stats.  Unlike update_stats() it does not publish stats
+// to the OSD, does not reschedule scrub, and does not mark dirty_info.
+void PeeringState::update_stats_wo_resched(
+  std::function<void(pg_history_t &, pg_stat_t &)> f)
+{
+  f(info.history, info.stats);
+}
+
bool PeeringState::append_log_entries_update_missing(
const mempool::osd_pglog::list<pg_log_entry_t> &entries,
ObjectStore::Transaction &t, std::optional<eversion_t> trim_to,
ceph_assert(is_primary());
bool rebuild_missing = append_log_entries_update_missing(entries, t, trim_to, roll_forward_to);
- for (set<pg_shard_t>::const_iterator i = acting_recovery_backfill.begin();
+ for (auto i = acting_recovery_backfill.begin();
i != acting_recovery_backfill.end();
++i) {
pg_shard_t peer(*i);
void PeeringState::append_log(
- const vector<pg_log_entry_t>& logv,
+ vector<pg_log_entry_t>&& logv,
eversion_t trim_to,
eversion_t roll_forward_to,
eversion_t mlcod,
pg_log.skip_rollforward();
}
- for (vector<pg_log_entry_t>::const_iterator p = logv.begin();
- p != logv.end();
- ++p) {
+ for (auto p = logv.begin(); p != logv.end(); ++p) {
add_log_entry(*p, transaction_applied);
/* We don't want to leave the rollforward artifacts around
psdout(10) << __func__ << " approx pg log length = "
<< pg_log.get_log().approx_size() << dendl;
+ psdout(10) << __func__ << " dups pg log length = "
+ << pg_log.get_log().dups.size() << dendl;
psdout(10) << __func__ << " transaction_applied = "
<< transaction_applied << dendl;
if (!transaction_applied || async)
// we are fully up to date. tell the primary!
pl->send_cluster_message(
get_primary().osd,
- new MOSDPGTrim(
+ TOPNSPC::make_message<MOSDPGTrim>(
get_osdmap_epoch(),
spg_t(info.pgid.pgid, primary.shard),
last_complete_ondisk),
cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
return;
}
- list<pg_log_entry_t>::const_iterator it = pg_log.get_log().log.begin();
+ auto it = pg_log.get_log().log.begin();
eversion_t new_trim_to;
for (size_t i = 0; i < num_to_trim; ++i) {
new_trim_to = it->version;
info.stats.stats.add(delta_stats);
info.stats.stats.floor(0);
- for (set<pg_shard_t>::const_iterator i = get_backfill_targets().begin();
+ for (auto i = get_backfill_targets().begin();
i != get_backfill_targets().end();
++i) {
pg_shard_t bt = *i;
return discard_event();
}
+// QueryUnfound handler for the generic Started state: no unfound-object
+// accounting is available here, so just report the state name and flag
+// might_have_unfound as unavailable.
+boost::statechart::result PeeringState::Started::react(const QueryUnfound& q)
+{
+  q.f->dump_string("state", "Started");
+  q.f->dump_bool("available_might_have_unfound", false);
+  return discard_event();
+}
+
void PeeringState::Started::exit()
{
context< PeeringMachine >().log_exit(state_name, enter_time);
return discard_event();
}
+// QueryUnfound handler while in Reset: no unfound-object accounting is
+// available in this state, so only the state name is reported.
+boost::statechart::result PeeringState::Reset::react(const QueryUnfound& q)
+{
+  q.f->dump_string("state", "Reset");
+  q.f->dump_bool("available_might_have_unfound", false);
+  return discard_event();
+}
+
void PeeringState::Reset::exit()
{
context< PeeringMachine >().log_exit(state_name, enter_time);
q.f->close_section();
q.f->open_array_section("probing_osds");
- for (set<pg_shard_t>::iterator p = prior_set.probe.begin();
- p != prior_set.probe.end();
- ++p)
+ for (auto p = prior_set.probe.begin(); p != prior_set.probe.end(); ++p)
q.f->dump_stream("osd") << *p;
q.f->close_section();
q.f->dump_string("blocked", "peering is blocked due to down osds");
q.f->open_array_section("down_osds_we_would_probe");
- for (set<int>::iterator p = prior_set.down.begin();
- p != prior_set.down.end();
- ++p)
+ for (auto p = prior_set.down.begin(); p != prior_set.down.end(); ++p)
q.f->dump_int("osd", *p);
q.f->close_section();
q.f->open_array_section("peering_blocked_by");
- for (map<int,epoch_t>::iterator p = prior_set.blocked_by.begin();
+ for (auto p = prior_set.blocked_by.begin();
p != prior_set.blocked_by.end();
++p) {
q.f->open_object_section("osd");
return forward_event();
}
+// QueryUnfound handler while still peering: unfound-object accounting is
+// not yet available, so only the state name is reported.
+boost::statechart::result PeeringState::Peering::react(const QueryUnfound& q)
+{
+  q.f->dump_string("state", "Peering");
+  q.f->dump_bool("available_might_have_unfound", false);
+  return discard_event();
+}
+
void PeeringState::Peering::exit()
{
{
DECLARE_LOCALS;
pl->cancel_local_background_io_reservation();
- for (set<pg_shard_t>::iterator it = ps->backfill_targets.begin();
+ for (auto it = ps->backfill_targets.begin();
it != ps->backfill_targets.end();
++it) {
ceph_assert(*it != ps->pg_whoami);
pl->send_cluster_message(
it->osd,
- new MBackfillReserve(
+ TOPNSPC::make_message<MBackfillReserve>(
MBackfillReserve::RELEASE,
spg_t(ps->info.pgid.pgid, it->shard),
ps->get_osdmap_epoch()),
ceph_assert(*backfill_osd_it != ps->pg_whoami);
pl->send_cluster_message(
backfill_osd_it->osd,
- new MBackfillReserve(
+ TOPNSPC::make_message<MBackfillReserve>(
MBackfillReserve::REQUEST,
spg_t(context< PeeringMachine >().spgid.pgid, backfill_osd_it->shard),
ps->get_osdmap_epoch(),
ceph_assert(*it != ps->pg_whoami);
pl->send_cluster_message(
it->osd,
- new MBackfillReserve(
+ TOPNSPC::make_message<MBackfillReserve>(
MBackfillReserve::RELEASE,
spg_t(context< PeeringMachine >().spgid.pgid, it->shard),
ps->get_osdmap_epoch()),
ps->state_set(PG_STATE_BACKFILL_WAIT);
pl->request_local_background_io_reservation(
ps->get_backfill_priority(),
- std::make_shared<PGPeeringEvent>(
+ std::make_unique<PGPeeringEvent>(
ps->get_osdmap_epoch(),
ps->get_osdmap_epoch(),
LocalBackfillReserved()),
- std::make_shared<PGPeeringEvent>(
+ std::make_unique<PGPeeringEvent>(
ps->get_osdmap_epoch(),
ps->get_osdmap_epoch(),
DeferBackfill(0.0)));
pl->publish_stats_to_osd();
}
+// QueryUnfound handler in NotBackfilling: delegate to
+// PeeringState::query_unfound() to populate the formatter for this state.
+boost::statechart::result PeeringState::NotBackfilling::react(const QueryUnfound& q)
+{
+  DECLARE_LOCALS;
+
+  ps->query_unfound(q.f, "NotBackfilling");
+  return discard_event();
+}
+
boost::statechart::result
PeeringState::NotBackfilling::react(const RemoteBackfillReserved &evt)
{
pl->publish_stats_to_osd();
}
+// QueryUnfound handler in NotRecovering: delegate to
+// PeeringState::query_unfound() to populate the formatter for this state.
+boost::statechart::result PeeringState::NotRecovering::react(const QueryUnfound& q)
+{
+  DECLARE_LOCALS;
+
+  ps->query_unfound(q.f, "NotRecovering");
+  return discard_event();
+}
+
void PeeringState::NotRecovering::exit()
{
context< PeeringMachine >().log_exit(state_name, enter_time);
DECLARE_LOCALS;
pl->send_cluster_message(
ps->primary.osd,
- new MRecoveryReserve(
+ TOPNSPC::make_message<MRecoveryReserve>(
MRecoveryReserve::GRANT,
spg_t(ps->info.pgid.pgid, ps->primary.shard),
ps->get_osdmap_epoch()),
evt.primary_num_bytes, evt.local_num_bytes)) {
post_event(RejectTooFullRemoteReservation());
} else {
- PGPeeringEventRef preempt;
+ PGPeeringEventURef preempt;
if (HAVE_FEATURE(ps->upacting_features, RECOVERY_RESERVATION_2)) {
// older peers will interpret preemption as TOOFULL
- preempt = std::make_shared<PGPeeringEvent>(
+ preempt = std::make_unique<PGPeeringEvent>(
pl->get_osdmap_epoch(),
pl->get_osdmap_epoch(),
RemoteBackfillPreempted());
}
pl->request_remote_recovery_reservation(
evt.priority,
- std::make_shared<PGPeeringEvent>(
+ std::make_unique<PGPeeringEvent>(
pl->get_osdmap_epoch(),
pl->get_osdmap_epoch(),
RemoteBackfillReserved()),
- preempt);
+ std::move(preempt));
}
return transit<RepWaitBackfillReserved>();
}
// (pre-mimic compat)
int prio = evt.priority ? evt.priority : ps->get_recovery_priority();
- PGPeeringEventRef preempt;
+ PGPeeringEventURef preempt;
if (HAVE_FEATURE(ps->upacting_features, RECOVERY_RESERVATION_2)) {
// older peers can't handle this
- preempt = std::make_shared<PGPeeringEvent>(
+ preempt = std::make_unique<PGPeeringEvent>(
ps->get_osdmap_epoch(),
ps->get_osdmap_epoch(),
RemoteRecoveryPreempted());
pl->request_remote_recovery_reservation(
prio,
- std::make_shared<PGPeeringEvent>(
+ std::make_unique<PGPeeringEvent>(
ps->get_osdmap_epoch(),
ps->get_osdmap_epoch(),
RemoteRecoveryReserved()),
- preempt);
+ std::move(preempt));
return transit<RepWaitRecoveryReserved>();
}
pl->send_cluster_message(
ps->primary.osd,
- new MBackfillReserve(
+ TOPNSPC::make_message<MBackfillReserve>(
MBackfillReserve::GRANT,
spg_t(ps->info.pgid.pgid, ps->primary.shard),
ps->get_osdmap_epoch()),
pl->unreserve_recovery_space();
pl->send_cluster_message(
ps->primary.osd,
- new MRecoveryReserve(
+ TOPNSPC::make_message<MRecoveryReserve>(
MRecoveryReserve::REVOKE,
spg_t(ps->info.pgid.pgid, ps->primary.shard),
ps->get_osdmap_epoch()),
pl->unreserve_recovery_space();
pl->send_cluster_message(
ps->primary.osd,
- new MBackfillReserve(
+ TOPNSPC::make_message<MBackfillReserve>(
MBackfillReserve::REVOKE_TOOFULL,
spg_t(ps->info.pgid.pgid, ps->primary.shard),
ps->get_osdmap_epoch()),
pl->unreserve_recovery_space();
pl->send_cluster_message(
ps->primary.osd,
- new MBackfillReserve(
+ TOPNSPC::make_message<MBackfillReserve>(
MBackfillReserve::REVOKE,
spg_t(ps->info.pgid.pgid, ps->primary.shard),
ps->get_osdmap_epoch()),
ps->state_set(PG_STATE_RECOVERY_WAIT);
pl->request_local_background_io_reservation(
ps->get_recovery_priority(),
- std::make_shared<PGPeeringEvent>(
+ std::make_unique<PGPeeringEvent>(
ps->get_osdmap_epoch(),
ps->get_osdmap_epoch(),
LocalRecoveryReserved()),
- std::make_shared<PGPeeringEvent>(
+ std::make_unique<PGPeeringEvent>(
ps->get_osdmap_epoch(),
ps->get_osdmap_epoch(),
DeferRecovery(0.0)));
ceph_assert(*remote_recovery_reservation_it != ps->pg_whoami);
pl->send_cluster_message(
remote_recovery_reservation_it->osd,
- new MRecoveryReserve(
+ TOPNSPC::make_message<MRecoveryReserve>(
MRecoveryReserve::REQUEST,
spg_t(context< PeeringMachine >().spgid.pgid,
remote_recovery_reservation_it->shard),
ceph_assert(cancel || !ps->pg_log.get_missing().have_missing());
// release remote reservations
- for (set<pg_shard_t>::const_iterator i =
- context< Active >().remote_shards_to_reserve_recovery.begin();
- i != context< Active >().remote_shards_to_reserve_recovery.end();
- ++i) {
+ for (auto i = context< Active >().remote_shards_to_reserve_recovery.begin();
+ i != context< Active >().remote_shards_to_reserve_recovery.end();
+ ++i) {
if (*i == ps->pg_whoami) // skip myself
continue;
pl->send_cluster_message(
i->osd,
- new MRecoveryReserve(
+ TOPNSPC::make_message<MRecoveryReserve>(
MRecoveryReserve::RELEASE,
spg_t(ps->info.pgid.pgid, i->shard),
ps->get_osdmap_epoch()),
if (!ps->async_recovery_targets.empty()) {
pg_shard_t auth_log_shard;
bool history_les_bound = false;
+ // FIXME: Uh-oh we have to check this return value; choose_acting can fail!
ps->choose_acting(auth_log_shard, true, &history_les_bound);
}
return transit<WaitLocalBackfillReserved>();
true, &history_les_bound)) {
ceph_assert(ps->want_acting.size());
} else if (!ps->async_recovery_targets.empty()) {
+ // FIXME: Uh-oh we have to check this return value; choose_acting can fail!
ps->choose_acting(auth_log_shard, true, &history_les_bound);
}
{
set<int> osds_found;
set<pg_shard_t> out;
- for (typename T::const_iterator i = in.begin();
- i != in.end();
- ++i) {
+ for (auto i = in.begin(); i != in.end(); ++i) {
if (*i != skip && !osds_found.count(i->osd)) {
osds_found.insert(i->osd);
out.insert(*i);
// everyone has to commit/ack before we are truly active
ps->blocked_by.clear();
- for (set<pg_shard_t>::iterator p = ps->acting_recovery_backfill.begin();
+ for (auto p = ps->acting_recovery_backfill.begin();
p != ps->acting_recovery_backfill.end();
++p) {
if (p->shard != ps->pg_whoami.shard) {
// if so, request pg_temp change to trigger a new interval transition
pg_shard_t auth_log_shard;
bool history_les_bound = false;
+ // FIXME: Uh-oh we have to check this return value; choose_acting can fail!
ps->choose_acting(auth_log_shard, false, &history_les_bound, true);
if (!ps->want_acting.empty() && ps->want_acting != ps->acting) {
psdout(10) << "Active: got notify from previous acting member "
psdout(10) << "searching osd." << logevt.from
<< " log for unfound items" << dendl;
ps->proc_replica_log(
- logevt.msg->info, logevt.msg->log, logevt.msg->missing, logevt.from);
+ logevt.msg->info, logevt.msg->log, std::move(logevt.msg->missing), logevt.from);
bool got_missing = ps->search_for_missing(
ps->peer_info[logevt.from],
ps->peer_missing[logevt.from],
{
q.f->open_array_section("might_have_unfound");
- for (set<pg_shard_t>::iterator p = ps->might_have_unfound.begin();
+ for (auto p = ps->might_have_unfound.begin();
p != ps->might_have_unfound.end();
++p) {
q.f->open_object_section("osd");
{
q.f->open_object_section("recovery_progress");
q.f->open_array_section("backfill_targets");
- for (set<pg_shard_t>::const_iterator p = ps->backfill_targets.begin();
+ for (auto p = ps->backfill_targets.begin();
p != ps->backfill_targets.end(); ++p)
q.f->dump_stream("replica") << *p;
q.f->close_section();
return forward_event();
}
+// QueryUnfound handler in Active: delegate to
+// PeeringState::query_unfound() to populate the formatter for this state.
+boost::statechart::result PeeringState::Active::react(const QueryUnfound& q)
+{
+  DECLARE_LOCALS;
+
+  ps->query_unfound(q.f, "Active");
+  return discard_event();
+}
+
boost::statechart::result PeeringState::Active::react(
const ActivateCommitted &evt)
{
pl->set_not_ready_to_merge_source(pgid);
}
}
- } else if (ps->acting.size() < ps->pool.info.min_size) {
+ } else if (!ps->acting_set_writeable()) {
ps->state_set(PG_STATE_PEERED);
} else {
ps->state_set(PG_STATE_ACTIVE);
ceph_assert(!ps->acting_recovery_backfill.empty());
ceph_assert(ps->blocked_by.empty());
- if (HAVE_FEATURE(ps->upacting_features, SERVER_OCTOPUS)) {
- // this is overkill when the activation is quick, but when it is slow it
- // is important, because the lease was renewed by the activate itself but we
- // don't know how long ago that was, and simply scheduling now may leave
- // a gap in lease coverage. keep it simple and aggressively renew.
- ps->renew_lease(pl->get_mnow());
- ps->send_lease();
- ps->schedule_renew_lease();
- }
+ assert(HAVE_FEATURE(ps->upacting_features, SERVER_OCTOPUS));
+ // this is overkill when the activation is quick, but when it is slow it
+ // is important, because the lease was renewed by the activate itself but we
+ // don't know how long ago that was, and simply scheduling now may leave
+ // a gap in lease coverage. keep it simple and aggressively renew.
+ ps->renew_lease(pl->get_mnow());
+ ps->send_lease();
+ ps->schedule_renew_lease();
// Degraded?
ps->update_calc_stats();
{}, /* lease */
ps->get_lease_ack());
- if (ps->acting.size() >= ps->pool.info.min_size) {
+ if (ps->acting_set_writeable()) {
ps->state_set(PG_STATE_ACTIVE);
} else {
ps->state_set(PG_STATE_PEERED);
ps->proc_lease(l.lease);
pl->send_cluster_message(
ps->get_primary().osd,
- new MOSDPGLeaseAck(epoch,
+ TOPNSPC::make_message<MOSDPGLeaseAck>(epoch,
spg_t(spgid.pgid, ps->get_primary().shard),
ps->get_lease_ack()),
epoch);
DECLARE_LOCALS;
psdout(10) << "received log from " << logevt.from << dendl;
ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction();
- ps->merge_log(t, logevt.msg->info, logevt.msg->log, logevt.from);
+ ps->merge_log(t, logevt.msg->info, std::move(logevt.msg->log), logevt.from);
ceph_assert(ps->pg_log.get_head() == ps->info.last_update);
if (logevt.msg->lease) {
ps->proc_lease(*logevt.msg->lease);
return forward_event();
}
+boost::statechart::result PeeringState::ReplicaActive::react(const QueryUnfound& query)
+{
+ // Replicas do not track might-have-unfound; report that the
+ // information is unavailable in this state.
+ query.f->dump_string("state", "ReplicaActive");
+ query.f->dump_bool("available_might_have_unfound", false);
+ return discard_event();
+}
+
void PeeringState::ReplicaActive::exit()
{
context< PeeringMachine >().log_exit(state_name, enter_time);
ps->pg_log.reset_backfill();
} else {
- ps->merge_log(t, msg->info, msg->log, logevt.from);
+ ps->merge_log(t, msg->info, std::move(msg->log), logevt.from);
}
if (logevt.msg->lease) {
ps->proc_lease(*logevt.msg->lease);
context< PeeringMachine >().log_exit(state_name, enter_time);
DECLARE_LOCALS;
// note: on a successful removal, this path doesn't execute. see
- // _delete_some().
+ // do_delete_work().
pl->get_perf_logger().dec(l_osd_pg_removing);
pl->cancel_local_background_io_reservation();
pl->cancel_local_background_io_reservation();
pl->request_local_background_io_reservation(
context<ToDelete>().priority,
- std::make_shared<PGPeeringEvent>(
+ std::make_unique<PGPeeringEvent>(
ps->get_osdmap_epoch(),
ps->get_osdmap_epoch(),
DeleteReserved()),
- std::make_shared<PGPeeringEvent>(
+ std::make_unique<PGPeeringEvent>(
ps->get_osdmap_epoch(),
ps->get_osdmap_epoch(),
DeleteInterrupted()));
NamedState(context< PeeringMachine >().state_history, "Started/ToDelete/Deleting")
{
context< PeeringMachine >().log_enter(state_name);
+
DECLARE_LOCALS;
ps->deleting = true;
ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction();
const DeleteSome& evt)
{
DECLARE_LOCALS;
- pl->do_delete_work(context<PeeringMachine>().get_cur_transaction());
- return discard_event();
+ std::pair<ghobject_t, bool> p;
+ p = pl->do_delete_work(context<PeeringMachine>().get_cur_transaction(),
+ next);
+ next = p.first;
+ return p.second ? discard_event() : terminate();
}
void PeeringState::Deleting::exit()
prior_set = ps->build_prior();
ps->prior_readable_down_osds = prior_set.down;
+
if (ps->prior_readable_down_osds.empty()) {
- psdout(10) << " no prior_set down osds, clearing prior_readable_until_ub"
+ psdout(10) << " no prior_set down osds, will clear prior_readable_until_ub before activating"
<< dendl;
- ps->clear_prior_readable_until_ub();
}
ps->reset_min_peer_features();
PastIntervals::PriorSet &prior_set = context< Peering >().prior_set;
ps->blocked_by.clear();
- for (set<pg_shard_t>::const_iterator it = prior_set.probe.begin();
- it != prior_set.probe.end();
- ++it) {
+ for (auto it = prior_set.probe.begin(); it != prior_set.probe.end(); ++it) {
pg_shard_t peer = *it;
if (peer == ps->pg_whoami) {
continue;
DECLARE_LOCALS;
- set<pg_shard_t>::iterator p = peer_info_requested.find(infoevt.from);
+ auto p = peer_info_requested.find(infoevt.from);
if (p != peer_info_requested.end()) {
peer_info_requested.erase(p);
ps->blocked_by.erase(infoevt.from.osd);
// filter out any osds that got dropped from the probe set from
// peer_info_requested. this is less expensive than restarting
// peering (which would re-probe everyone).
- set<pg_shard_t>::iterator p = peer_info_requested.begin();
+ auto p = peer_info_requested.begin();
while (p != peer_info_requested.end()) {
if (prior_set.probe.count(*p) == 0) {
psdout(20) << " dropping osd." << *p << " from info_requested, no longer in probe set" << dendl;
q.f->dump_stream("enter_time") << enter_time;
q.f->open_array_section("requested_info_from");
- for (set<pg_shard_t>::iterator p = peer_info_requested.begin();
+ for (auto p = peer_info_requested.begin();
p != peer_info_requested.end();
++p) {
q.f->open_object_section("osd");
return forward_event();
}
+boost::statechart::result PeeringState::GetInfo::react(const QueryUnfound& query)
+{
+ // Still gathering peer infos; unfound data is not available yet.
+ query.f->dump_string("state", "GetInfo");
+ query.f->dump_bool("available_might_have_unfound", false);
+ return discard_event();
+}
+
void PeeringState::GetInfo::exit()
{
context< PeeringMachine >().log_exit(state_name, enter_time);
// how much log to request?
eversion_t request_log_from = ps->info.last_update;
ceph_assert(!ps->acting_recovery_backfill.empty());
- for (set<pg_shard_t>::iterator p = ps->acting_recovery_backfill.begin();
+ for (auto p = ps->acting_recovery_backfill.begin();
p != ps->acting_recovery_backfill.end();
++p) {
if (*p == ps->pg_whoami) continue;
if (msg) {
psdout(10) << "processing master log" << dendl;
ps->proc_master_log(context<PeeringMachine>().get_cur_transaction(),
- msg->info, msg->log, msg->missing,
+ msg->info, std::move(msg->log), std::move(msg->missing),
auth_log_shard);
}
ps->start_flush(context< PeeringMachine >().get_cur_transaction());
return forward_event();
}
+boost::statechart::result PeeringState::GetLog::react(const QueryUnfound& query)
+{
+ // Still fetching the authoritative log; unfound data is not available yet.
+ query.f->dump_string("state", "GetLog");
+ query.f->dump_bool("available_might_have_unfound", false);
+ return discard_event();
+}
+
void PeeringState::GetLog::exit()
{
context< PeeringMachine >().log_exit(state_name, enter_time);
OSDMapRef osdmap = advmap.osdmap;
psdout(10) << "verifying no want_acting " << ps->want_acting << " targets didn't go down" << dendl;
- for (vector<int>::iterator p = ps->want_acting.begin(); p != ps->want_acting.end(); ++p) {
+ for (auto p = ps->want_acting.begin(); p != ps->want_acting.end(); ++p) {
if (!osdmap->is_up(*p)) {
psdout(10) << " want_acting target osd." << *p << " went down, resetting" << dendl;
post_event(advmap);
return forward_event();
}
+boost::statechart::result PeeringState::WaitActingChange::react(const QueryUnfound& query)
+{
+ // Waiting for a new acting set; unfound data is not available here.
+ query.f->dump_string("state", "WaitActingChange");
+ query.f->dump_bool("available_might_have_unfound", false);
+ return discard_event();
+}
+
void PeeringState::WaitActingChange::exit()
{
context< PeeringMachine >().log_exit(state_name, enter_time);
return forward_event();
}
+boost::statechart::result PeeringState::Down::react(const QueryUnfound& query)
+{
+ // PG is down; unfound data cannot be reported in this state.
+ query.f->dump_string("state", "Down");
+ query.f->dump_bool("available_might_have_unfound", false);
+ return discard_event();
+}
+
boost::statechart::result PeeringState::Down::react(const MNotifyRec& infoevt)
{
DECLARE_LOCALS;
return forward_event();
}
+boost::statechart::result PeeringState::Incomplete::react(const QueryUnfound& query)
+{
+ // Peering is incomplete; unfound data cannot be reported in this state.
+ query.f->dump_string("state", "Incomplete");
+ query.f->dump_bool("available_might_have_unfound", false);
+ return discard_event();
+}
+
void PeeringState::Incomplete::exit()
{
context< PeeringMachine >().log_exit(state_name, enter_time);
ps->log_weirdness();
ceph_assert(!ps->acting_recovery_backfill.empty());
eversion_t since;
- for (set<pg_shard_t>::iterator i = ps->acting_recovery_backfill.begin();
+ for (auto i = ps->acting_recovery_backfill.begin();
i != ps->acting_recovery_backfill.end();
++i) {
if (*i == ps->get_primary()) continue;
DECLARE_LOCALS;
peer_missing_requested.erase(logevt.from);
- ps->proc_replica_log(logevt.msg->info, logevt.msg->log, logevt.msg->missing, logevt.from);
+ ps->proc_replica_log(logevt.msg->info,
+ logevt.msg->log,
+ std::move(logevt.msg->missing),
+ logevt.from);
if (peer_missing_requested.empty()) {
if (ps->need_up_thru) {
q.f->dump_stream("enter_time") << enter_time;
q.f->open_array_section("peer_missing_requested");
- for (set<pg_shard_t>::iterator p = peer_missing_requested.begin();
+ for (auto p = peer_missing_requested.begin();
p != peer_missing_requested.end();
++p) {
q.f->open_object_section("osd");
return forward_event();
}
+boost::statechart::result PeeringState::GetMissing::react(const QueryUnfound& q)
+{
+ // Still collecting missing sets from peers; unfound data is not
+ // available yet. State name fixed: was misspelled "GetMising",
+ // which broke consumers matching on the dumped state string.
+ q.f->dump_string("state", "GetMissing");
+ q.f->dump_bool("available_might_have_unfound", false);
+ return discard_event();
+}
+
void PeeringState::GetMissing::exit()
{
context< PeeringMachine >().log_exit(state_name, enter_time);
{
DECLARE_LOCALS;
psdout(10) << "Noting missing from osd." << logevt.from << dendl;
- ps->peer_missing[logevt.from].claim(logevt.msg->missing);
+ ps->peer_missing[logevt.from].claim(std::move(logevt.msg->missing));
ps->peer_info[logevt.from] = logevt.msg->info;
return discard_event();
}
return forward_event();
}
+boost::statechart::result PeeringState::WaitUpThru::react(const QueryUnfound& query)
+{
+ // Waiting for up_thru to be recorded; unfound data is not available.
+ query.f->dump_string("state", "WaitUpThru");
+ query.f->dump_bool("available_might_have_unfound", false);
+ return discard_event();
+}
+
void PeeringState::WaitUpThru::exit()
{
context< PeeringMachine >().log_exit(state_name, enter_time);
}
return out;
}
+
+std::vector<pg_shard_t> PeeringState::get_replica_recovery_order() const
+{
+ // Compute the order in which replica shards should be recovered:
+ // shards with fewer missing objects come first, and regular acting
+ // replicas are recovered before async recovery targets. Shards with
+ // nothing missing (and the primary itself) are excluded.
+ std::vector<std::pair<unsigned int, pg_shard_t>> replicas_by_num_missing,
+ async_by_num_missing;
+ // acting_recovery_backfill always contains at least the primary;
+ // guard the size()-1 below against unsigned wrap-around.
+ // (file convention: ceph_assert, not bare assert)
+ ceph_assert(!get_acting_recovery_backfill().empty());
+ replicas_by_num_missing.reserve(get_acting_recovery_backfill().size() - 1);
+ for (auto &p : get_acting_recovery_backfill()) {
+ if (p == get_primary()) {
+ continue;
+ }
+ auto pm = get_peer_missing().find(p);
+ ceph_assert(pm != get_peer_missing().end());
+ auto nm = pm->second.num_missing();
+ if (nm != 0) {
+ if (is_async_recovery_target(p)) {
+ async_by_num_missing.push_back(make_pair(nm, p));
+ } else {
+ replicas_by_num_missing.push_back(make_pair(nm, p));
+ }
+ }
+ }
+ // sort by number of missing objects, in ascending order.
+ auto func = [](const std::pair<unsigned int, pg_shard_t> &lhs,
+ const std::pair<unsigned int, pg_shard_t> &rhs) {
+ return lhs.first < rhs.first;
+ };
+ // acting goes first
+ std::sort(replicas_by_num_missing.begin(), replicas_by_num_missing.end(), func);
+ // then async_recovery_targets
+ std::sort(async_by_num_missing.begin(), async_by_num_missing.end(), func);
+ replicas_by_num_missing.insert(replicas_by_num_missing.end(),
+ async_by_num_missing.begin(), async_by_num_missing.end());
+
+ std::vector<pg_shard_t> ret;
+ ret.reserve(replicas_by_num_missing.size());
+ for (const auto &p : replicas_by_num_missing) { // const ref: avoid pair copies
+ ret.push_back(p.second);
+ }
+ return ret;
+}
+
+