#include "ConnectionTracker.h"
#include "common/Formatter.h"
+#include "common/dout.h"
+#include "include/ceph_assert.h"
+
+#define dout_subsys ceph_subsys_mon
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, rank, epoch, version)
+
+static std::ostream& _prefix(std::ostream *_dout, int rank, epoch_t epoch, uint64_t version) {
+ return *_dout << "rank: " << rank << " version: "<< version << " ConnectionTracker(" << epoch << ") ";
+}
std::ostream& operator<<(std::ostream&o, const ConnectionReport& c) {
o << "rank=" << c.rank << ",epoch=" << c.epoch << ",version=" << c.epoch_version
void ConnectionTracker::receive_peer_report(const ConnectionTracker& o)
{
+ ldout(cct, 30) << __func__ << dendl;
for (auto& i : o.peer_reports) {
const ConnectionReport& report = i.second;
- if (report.rank == rank) continue;
- ConnectionReport& existing = *reports(report.rank);
+ if (i.first == rank) continue;
+ ConnectionReport& existing = *reports(i.first);
if (report.epoch > existing.epoch ||
(report.epoch == existing.epoch &&
report.epoch_version > existing.epoch_version)) {
+ ldout(cct, 30) << " new peer_report is more updated" << dendl;
+ ldout(cct, 30) << "existing: " << existing << dendl;
+ ldout(cct, 30) << "new: " << report << dendl;
existing = report;
}
}
bool ConnectionTracker::increase_epoch(epoch_t e)
{
+ ldout(cct, 30) << __func__ << " to " << e << dendl;
if (e > epoch) {
my_reports.epoch_version = version = 0;
my_reports.epoch = epoch = e;
void ConnectionTracker::increase_version()
{
+ ldout(cct, 30) << __func__ << " to " << version+1 << dendl;
encoding.clear();
++version;
my_reports.epoch_version = version;
peer_reports[rank] = my_reports;
if ((version % persist_interval) == 0 ) {
+ ldout(cct, 30) << version << " % " << persist_interval << " == 0" << dendl;
owner->persist_connectivity_scores();
}
}
void ConnectionTracker::report_live_connection(int peer_rank, double units_alive)
{
+ ldout(cct, 30) << __func__ << " peer_rank: " << peer_rank << " units_alive: " << units_alive << dendl;
+ ldout(cct, 30) << "my_reports before: " << my_reports << dendl;
+ if (peer_rank == rank) {
+ lderr(cct) << "Got a report from my own rank, hopefully this is startup weirdness, dropping" << dendl;
+ return;
+ }
// we need to "auto-initialize" to 1, do shenanigans
auto i = my_reports.history.find(peer_rank);
if (i == my_reports.history.end()) {
+ ldout(cct, 30) << "couldn't find: " << peer_rank
+ << " in my_reports.history" << "... inserting: "
+ << "(" << peer_rank << ", 1" << dendl;
auto[j,k] = my_reports.history.insert(std::pair<int,double>(peer_rank,1.0));
i = j;
}
double& pscore = i->second;
+ ldout(cct, 30) << "adding new pscore to my_reports" << dendl;
pscore = pscore * (1 - units_alive / (2 * half_life)) +
(units_alive / (2 * half_life));
pscore = std::min(pscore, 1.0);
my_reports.current[peer_rank] = true;
increase_version();
+ ldout(cct, 30) << "my_reports after: " << my_reports << dendl;
}
void ConnectionTracker::report_dead_connection(int peer_rank, double units_dead)
{
+ ldout(cct, 30) << __func__ << " peer_rank: " << peer_rank << " units_dead: " << units_dead << dendl;
+ ldout(cct, 30) << "my_reports before: " << my_reports << dendl;
+ if (peer_rank == rank) {
+ lderr(cct) << "Got a report from my own rank, hopefully this is startup weirdness, dropping" << dendl;
+ return;
+ }
// we need to "auto-initialize" to 1, do shenanigans
auto i = my_reports.history.find(peer_rank);
if (i == my_reports.history.end()) {
+ ldout(cct, 30) << "couldn't find: " << peer_rank
+ << " in my_reports.history" << "... inserting: "
+ << "(" << peer_rank << ", 1" << dendl;
auto[j,k] = my_reports.history.insert(std::pair<int,double>(peer_rank,1.0));
i = j;
}
double& pscore = i->second;
+ ldout(cct, 30) << "adding new pscore to my_reports" << dendl;
pscore = pscore * (1 - units_dead / (2 * half_life)) -
(units_dead / (2*half_life));
pscore = std::max(pscore, 0.0);
my_reports.current[peer_rank] = false;
increase_version();
+ ldout(cct, 30) << "my_reports after: " << my_reports << dendl;
}
void ConnectionTracker::get_total_connection_score(int peer_rank, double *rating,
int *live_count) const
{
+ ldout(cct, 30) << __func__ << dendl;
*rating = 0;
*live_count = 0;
double rate = 0;
*live_count = live;
}
-void ConnectionTracker::notify_rank_removed(int rank_removed)
+void ConnectionTracker::notify_rank_changed(int new_rank)
+{
+ ldout(cct, 20) << __func__ << " to " << new_rank << dendl;
+ if (new_rank == rank) return;
+ ldout(cct, 20) << "peer_reports before: " << peer_reports << dendl;
+ peer_reports.erase(rank);
+ peer_reports.erase(new_rank);
+ my_reports.rank = new_rank;
+ rank = new_rank;
+ encoding.clear();
+ ldout(cct, 20) << "peer_reports after: " << peer_reports << dendl;
+
+ increase_version();
+}
+
+void ConnectionTracker::notify_rank_removed(int rank_removed, int new_rank)
{
+ ldout(cct, 20) << __func__ << " " << rank_removed
+ << " new_rank: " << new_rank << dendl;
+ ldout(cct, 20) << "my_reports before: " << my_reports << dendl;
+ ldout(cct, 20) << "peer_reports before: " << peer_reports << dendl;
+ ldout(cct, 20) << "my rank before: " << rank << dendl;
+
encoding.clear();
- size_t starting_size = my_reports.current.size();
- // erase the removed rank from everywhere
+ size_t starting_size_current = my_reports.current.size();
+ // Lets adjust everything in my report.
my_reports.current.erase(rank_removed);
my_reports.history.erase(rank_removed);
- peer_reports.erase(rank_removed);
- // Move ranks > rank_removed down by 1
- // First in my_reports' history+current
auto ci = my_reports.current.upper_bound(rank_removed);
auto hi = my_reports.history.upper_bound(rank_removed);
while (ci != my_reports.current.end()) {
my_reports.current.erase(ci++);
my_reports.history.erase(hi++);
}
- ceph_assert((my_reports.current.size() == starting_size) ||
- (my_reports.current.size() + 1 == starting_size));
+ ceph_assert((my_reports.current.size() == starting_size_current) ||
+ (my_reports.current.size() + 1 == starting_size_current));
- // now move ranks down one in peer_reports
- starting_size = peer_reports.size();
+ size_t starting_size = peer_reports.size();
auto pi = peer_reports.upper_bound(rank_removed);
+ // Remove the target rank and adjust everything that comes after.
+ // Note that we don't adjust current and history for our peer_reports
+ // because it is better to rely on our peers on that information.
+ peer_reports.erase(rank_removed);
while (pi != peer_reports.end()) {
- peer_reports[pi->first - 1] = pi->second;
- peer_reports.erase(pi++);
+ peer_reports[pi->first - 1] = pi->second; // copy content of next rank to ourself.
+ peer_reports.erase(pi++); // destroy our next rank and move on.
}
+
ceph_assert((peer_reports.size() == starting_size) ||
- (peer_reports.size() + 1 == starting_size));
+ (peer_reports.size() + 1 == starting_size));
- if (rank_removed < rank) {
+ if (rank_removed < rank) { // if the rank removed is lower than us, we need to adjust.
--rank;
- my_reports.rank = rank;
+ my_reports.rank = rank; // also adjust my_reports.rank.
+ }
+
+ ldout(cct, 20) << "my rank after: " << rank << dendl;
+ ldout(cct, 20) << "peer_reports after: " << peer_reports << dendl;
+ ldout(cct, 20) << "my_reports after: " << my_reports << dendl;
+
+ //check if the new_rank from monmap is equal to our adjusted rank.
+ ceph_assert(rank == new_rank);
+
+ increase_version();
+}
+
+bool ConnectionTracker::is_clean(int mon_rank, int monmap_size)
+{
+ ldout(cct, 30) << __func__ << dendl;
+ // check consistency between our rank according
+ // to monmap and our rank according to our report.
+ if (rank != mon_rank ||
+ my_reports.rank != mon_rank) {
+ return false;
+ } else if (!peer_reports.empty()){
+ // if peer_report max rank is greater than monmap max rank
+ // then there is a problem.
+ if (peer_reports.rbegin()->first > monmap_size - 1) return false;
}
+ return true;
}
void ConnectionTracker::encode(bufferlist &bl) const