]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/ConnectionTracker.cc
4d7f5e458094eb083e1691153ed1605f0418c57b
[ceph.git] / ceph / src / mon / ConnectionTracker.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2019 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "ConnectionTracker.h"
16 #include "common/Formatter.h"
17
18 std::ostream& operator<<(std::ostream&o, const ConnectionReport& c) {
19 o << "rank=" << c.rank << ",epoch=" << c.epoch << ",version=" << c.epoch_version
20 << ", current links: " << c.current << ", history: " << c.history;
21 return o;
22 }
23
24 std::ostream& operator<<(std::ostream& o, const ConnectionTracker& c) {
25 o << "rank=" << c.rank << ", epoch=" << c.epoch << ", version=" << c.version
26 << ", half_life=" << c.half_life << ", reports: " << c.peer_reports;
27 return o;
28 }
29
30 ConnectionReport *ConnectionTracker::reports(int p)
31 {
32 auto i = peer_reports.find(p);
33 if (i == peer_reports.end()) {
34 ceph_assert(p != rank);
35 auto[j,k] = peer_reports.insert(std::pair<int,ConnectionReport>(p,ConnectionReport()));
36 i = j;
37 }
38 return &i->second;
39 }
40
41 const ConnectionReport *ConnectionTracker::reports(int p) const
42 {
43 auto i = peer_reports.find(p);
44 if (i == peer_reports.end()) {
45 return NULL;
46 }
47 return &i->second;
48 }
49
50 void ConnectionTracker::receive_peer_report(const ConnectionTracker& o)
51 {
52 for (auto& i : o.peer_reports) {
53 const ConnectionReport& report = i.second;
54 if (report.rank == rank) continue;
55 ConnectionReport& existing = *reports(report.rank);
56 if (report.epoch > existing.epoch ||
57 (report.epoch == existing.epoch &&
58 report.epoch_version > existing.epoch_version)) {
59 existing = report;
60 }
61 }
62 encoding.clear();
63 }
64
65 bool ConnectionTracker::increase_epoch(epoch_t e)
66 {
67 if (e > epoch) {
68 my_reports.epoch_version = version = 0;
69 my_reports.epoch = epoch = e;
70 peer_reports[rank] = my_reports;
71 encoding.clear();
72 return true;
73 }
74 return false;
75 }
76
77 void ConnectionTracker::increase_version()
78 {
79 encoding.clear();
80 ++version;
81 my_reports.epoch_version = version;
82 peer_reports[rank] = my_reports;
83 if ((version % persist_interval) == 0 ) {
84 owner->persist_connectivity_scores();
85 }
86 }
87
88 void ConnectionTracker::report_live_connection(int peer_rank, double units_alive)
89 {
90 // we need to "auto-initialize" to 1, do shenanigans
91 auto i = my_reports.history.find(peer_rank);
92 if (i == my_reports.history.end()) {
93 auto[j,k] = my_reports.history.insert(std::pair<int,double>(peer_rank,1.0));
94 i = j;
95 }
96 double& pscore = i->second;
97 pscore = pscore * (1 - units_alive / (2 * half_life)) +
98 (units_alive / (2 * half_life));
99 pscore = std::min(pscore, 1.0);
100 my_reports.current[peer_rank] = true;
101
102 increase_version();
103 }
104
105 void ConnectionTracker::report_dead_connection(int peer_rank, double units_dead)
106 {
107 // we need to "auto-initialize" to 1, do shenanigans
108 auto i = my_reports.history.find(peer_rank);
109 if (i == my_reports.history.end()) {
110 auto[j,k] = my_reports.history.insert(std::pair<int,double>(peer_rank,1.0));
111 i = j;
112 }
113 double& pscore = i->second;
114 pscore = pscore * (1 - units_dead / (2 * half_life)) -
115 (units_dead / (2*half_life));
116 pscore = std::max(pscore, 0.0);
117 my_reports.current[peer_rank] = false;
118
119 increase_version();
120 }
121
122 void ConnectionTracker::get_total_connection_score(int peer_rank, double *rating,
123 int *live_count) const
124 {
125 *rating = 0;
126 *live_count = 0;
127 double rate = 0;
128 int live = 0;
129
130 for (const auto& i : peer_reports) { // loop through all the scores
131 if (i.first == peer_rank) { // ... except the ones it has for itself, of course!
132 continue;
133 }
134 const auto& report = i.second;
135 auto score_i = report.history.find(peer_rank);
136 auto live_i = report.current.find(peer_rank);
137 if (score_i != report.history.end()) {
138 if (live_i->second) {
139 rate += score_i->second;
140 ++live;
141 }
142 }
143 }
144 *rating = rate;
145 *live_count = live;
146 }
147
148 void ConnectionTracker::notify_rank_removed(int rank_removed)
149 {
150 encoding.clear();
151 size_t starting_size = my_reports.current.size();
152 // erase the removed rank from everywhere
153 my_reports.current.erase(rank_removed);
154 my_reports.history.erase(rank_removed);
155 peer_reports.erase(rank_removed);
156 // Move ranks > rank_removed down by 1
157 // First in my_reports' history+current
158 auto ci = my_reports.current.upper_bound(rank_removed);
159 auto hi = my_reports.history.upper_bound(rank_removed);
160 while (ci != my_reports.current.end()) {
161 ceph_assert(ci->first == hi->first);
162 my_reports.current[ci->first - 1] = ci->second;
163 my_reports.history[hi->first - 1] = hi->second;
164 my_reports.current.erase(ci++);
165 my_reports.history.erase(hi++);
166 }
167 ceph_assert((my_reports.current.size() == starting_size) ||
168 (my_reports.current.size() + 1 == starting_size));
169
170 // now move ranks down one in peer_reports
171 starting_size = peer_reports.size();
172 auto pi = peer_reports.upper_bound(rank_removed);
173 while (pi != peer_reports.end()) {
174 peer_reports[pi->first - 1] = pi->second;
175 peer_reports.erase(pi++);
176 }
177 ceph_assert((peer_reports.size() == starting_size) ||
178 (peer_reports.size() + 1 == starting_size));
179
180 if (rank_removed < rank) {
181 --rank;
182 my_reports.rank = rank;
183 }
184 }
185
// Serialize this tracker into bl using Ceph's versioned encoding
// (ENCODE_START(1, 1, bl): encoding version 1, compat version 1).
// The field order below IS the wire format and must match decode():
// rank, epoch, version, half_life, peer_reports.
// my_reports is not written separately; decode() recovers it from
// peer_reports[rank]. persist_interval and owner are not encoded.
186 void ConnectionTracker::encode(bufferlist &bl) const
187 {
188 ENCODE_START(1, 1, bl);
189 encode(rank, bl);
190 encode(epoch, bl);
191 encode(version, bl);
192 encode(half_life, bl);
193 encode(peer_reports, bl);
194 ENCODE_FINISH(bl);
195 }
196
// Rebuild this tracker from a buffer produced by encode().
// clear_peer_reports() (defined elsewhere; presumably empties the stored
// reports) runs first, and the cached encoding is invalidated. Fields are
// read in exactly the order encode() writes them.
// If the decoded rank is non-negative, my_reports is restored from
// peer_reports[rank]. operator[] inserts an empty report under that key
// if none was encoded.
197 void ConnectionTracker::decode(bufferlist::const_iterator& bl) {
198 clear_peer_reports();
199 encoding.clear();
200
201 DECODE_START(1, bl);
202 decode(rank, bl);
203 decode(epoch, bl);
204 decode(version, bl);
205 decode(half_life, bl);
206 decode(peer_reports, bl);
207 DECODE_FINISH(bl);
// NOTE(review): a negative rank presumably means this tracker has no rank of
// its own — confirm against callers.
208 if (rank >=0)
209 my_reports = peer_reports[rank];
210 }
211
212 const bufferlist& ConnectionTracker::get_encoded_bl()
213 {
214 if (!encoding.length()) {
215 encode(encoding);
216 }
217 return encoding;
218 }
219
220 void ConnectionReport::dump(ceph::Formatter *f) const
221 {
222 f->dump_int("rank", rank);
223 f->dump_int("epoch", epoch);
224 f->dump_int("version", epoch_version);
225 f->open_object_section("peer_scores");
226 for (auto i : history) {
227 f->open_object_section("peer");
228 f->dump_int("peer_rank", i.first);
229 f->dump_float("peer_score", i.second);
230 f->dump_bool("peer_alive", current.find(i.first)->second);
231 f->close_section();
232 }
233 f->close_section(); // peer scores
234 }
235
236 void ConnectionReport::generate_test_instances(std::list<ConnectionReport*>& o)
237 {
238 o.push_back(new ConnectionReport);
239 o.push_back(new ConnectionReport);
240 o.back()->rank = 1;
241 o.back()->epoch = 2;
242 o.back()->epoch_version = 3;
243 o.back()->current[0] = true;
244 o.back()->history[0] = .4;
245 }
246
247 void ConnectionTracker::dump(ceph::Formatter *f) const
248 {
249 f->dump_int("rank", rank);
250 f->dump_int("epoch", epoch);
251 f->dump_int("version", version);
252 f->dump_float("half_life", half_life);
253 f->dump_int("persist_interval", persist_interval);
254 f->open_object_section("reports");
255 for (const auto& i : peer_reports) {
256 f->open_object_section("report");
257 i.second.dump(f);
258 f->close_section();
259 }
260 f->close_section(); // reports
261 }
262
263 void ConnectionTracker::generate_test_instances(std::list<ConnectionTracker*>& o)
264 {
265 o.push_back(new ConnectionTracker);
266 o.push_back(new ConnectionTracker);
267 ConnectionTracker *e = o.back();
268 e->rank = 2;
269 e->epoch = 3;
270 e->version = 4;
271 e->peer_reports[0];
272 e->peer_reports[1];
273 e->my_reports = e->peer_reports[2];
274 }