]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/Elector.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / mon / Elector.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "Elector.h"
16#include "Monitor.h"
17
18#include "common/Timer.h"
19#include "MonitorDBStore.h"
20#include "messages/MMonElection.h"
f67539c2 21#include "messages/MMonPing.h"
7c673cae
FG
22
23#include "common/config.h"
11fdf7f2 24#include "include/ceph_assert.h"
7c673cae
FG
25
26#define dout_subsys ceph_subsys_mon
27#undef dout_prefix
9f95a23c 28#define dout_prefix _prefix(_dout, mon, get_epoch())
f67539c2
TL
29using std::cerr;
30using std::cout;
31using std::dec;
32using std::hex;
33using std::list;
34using std::map;
35using std::make_pair;
36using std::ostream;
37using std::ostringstream;
38using std::pair;
39using std::set;
40using std::setfill;
41using std::string;
42using std::stringstream;
43using std::to_string;
44using std::vector;
45using std::unique_ptr;
46
47using ceph::bufferlist;
48using ceph::decode;
49using ceph::encode;
50using ceph::Formatter;
51using ceph::JSONFormatter;
52using ceph::mono_clock;
53using ceph::mono_time;
54using ceph::timespan_str;
7c673cae
FG
55static ostream& _prefix(std::ostream *_dout, Monitor *mon, epoch_t epoch) {
56 return *_dout << "mon." << mon->name << "@" << mon->rank
57 << "(" << mon->get_state_name()
58 << ").elector(" << epoch << ") ";
59}
60
f67539c2
TL
61Elector::Elector(Monitor *m, int strategy) : logic(this, static_cast<ElectionLogic::election_strategy>(strategy),
62 &peer_tracker,
63 m->cct->_conf.get_val<double>("mon_elector_ignore_propose_margin"),
64 m->cct),
65 peer_tracker(this, m->rank,
66 m->cct->_conf.get_val<uint64_t>("mon_con_tracker_score_halflife"),
39ae355f 67 m->cct->_conf.get_val<uint64_t>("mon_con_tracker_persist_interval"), m->cct),
f67539c2
TL
68 ping_timeout(m->cct->_conf.get_val<double>("mon_elector_ping_timeout")),
69 PING_DIVISOR(m->cct->_conf.get_val<uint64_t>("mon_elector_ping_divisor")),
70 mon(m), elector(this) {
71 bufferlist bl;
72 mon->store->get(Monitor::MONITOR_NAME, "connectivity_scores", bl);
73 if (bl.length()) {
74 bufferlist::const_iterator bi = bl.begin();
75 peer_tracker.decode(bi);
76 }
77}
7c673cae 78
9f95a23c
TL
79
80void Elector::persist_epoch(epoch_t e)
7c673cae 81{
9f95a23c
TL
82 auto t(std::make_shared<MonitorDBStore::Transaction>());
83 t->put(Monitor::MONITOR_NAME, "election_epoch", e);
f67539c2
TL
84 t->put(Monitor::MONITOR_NAME, "connectivity_scores", peer_tracker.get_encoded_bl());
85 mon->store->apply_transaction(t);
86}
87
88void Elector::persist_connectivity_scores()
89{
39ae355f 90 dout(20) << __func__ << dendl;
f67539c2
TL
91 auto t(std::make_shared<MonitorDBStore::Transaction>());
92 t->put(Monitor::MONITOR_NAME, "connectivity_scores", peer_tracker.get_encoded_bl());
9f95a23c 93 mon->store->apply_transaction(t);
7c673cae
FG
94}
95
9f95a23c 96epoch_t Elector::read_persisted_epoch() const
7c673cae 97{
9f95a23c 98 return mon->store->get(Monitor::MONITOR_NAME, "election_epoch");
7c673cae
FG
99}
100
9f95a23c 101void Elector::validate_store()
7c673cae 102{
7c673cae 103 auto t(std::make_shared<MonitorDBStore::Transaction>());
9f95a23c
TL
104 t->put(Monitor::MONITOR_NAME, "election_writeable_test", rand());
105 int r = mon->store->apply_transaction(t);
106 ceph_assert(r >= 0);
107}
7c673cae 108
9f95a23c
TL
109bool Elector::is_current_member(int rank) const
110{
111 return mon->quorum.count(rank);
112}
7c673cae 113
9f95a23c
TL
114void Elector::trigger_new_election()
115{
116 mon->start_election();
7c673cae
FG
117}
118
9f95a23c
TL
119int Elector::get_my_rank() const
120{
121 return mon->rank;
122}
7c673cae 123
9f95a23c 124void Elector::reset_election()
7c673cae 125{
9f95a23c
TL
126 mon->bootstrap();
127}
7c673cae 128
9f95a23c
TL
129bool Elector::ever_participated() const
130{
131 return mon->has_ever_joined;
132}
133
134unsigned Elector::paxos_size() const
135{
136 return (unsigned)mon->monmap->size();
137}
138
139void Elector::shutdown()
140{
141 cancel_timer();
142}
7c673cae 143
9f95a23c
TL
144void Elector::notify_bump_epoch()
145{
146 mon->join_election();
147}
148
f67539c2 149void Elector::propose_to_peers(epoch_t e, bufferlist& logic_bl)
9f95a23c 150{
7c673cae
FG
151 // bcast to everyone else
152 for (unsigned i=0; i<mon->monmap->size(); ++i) {
153 if ((int)i == mon->rank) continue;
154 MMonElection *m =
f67539c2
TL
155 new MMonElection(MMonElection::OP_PROPOSE, e,
156 peer_tracker.get_encoded_bl(),
157 logic.strategy, mon->monmap);
158 m->sharing_bl = logic_bl;
7c673cae 159 m->mon_features = ceph::features::mon::get_supported();
11fdf7f2
TL
160 m->mon_release = ceph_release();
161 mon->send_mon_message(m, i);
9f95a23c 162 }
7c673cae
FG
163}
164
9f95a23c 165void Elector::_start()
7c673cae 166{
9f95a23c
TL
167 peer_info.clear();
168 peer_info[mon->rank].cluster_features = CEPH_FEATURES_ALL;
169 peer_info[mon->rank].mon_release = ceph_release();
170 peer_info[mon->rank].mon_features = ceph::features::mon::get_supported();
171 mon->collect_metadata(&peer_info[mon->rank].metadata);
172 reset_timer();
173}
7c673cae 174
9f95a23c
TL
175void Elector::_defer_to(int who)
176{
f67539c2
TL
177 MMonElection *m = new MMonElection(MMonElection::OP_ACK, get_epoch(),
178 peer_tracker.get_encoded_bl(),
179 logic.strategy, mon->monmap);
7c673cae 180 m->mon_features = ceph::features::mon::get_supported();
11fdf7f2 181 m->mon_release = ceph_release();
224ce89b 182 mon->collect_metadata(&m->metadata);
d2e6a577 183
11fdf7f2 184 mon->send_mon_message(m, who);
7c673cae
FG
185
186 // set a timer
187 reset_timer(1.0); // give the leader some extra time to declare victory
188}
189
190
191void Elector::reset_timer(double plus)
192{
193 // set the timer
194 cancel_timer();
195 /**
196 * This class is used as the callback when the expire_event timer fires up.
197 *
198 * If the expire_event is fired, then it means that we had an election going,
199 * either started by us or by some other participant, but it took too long,
200 * thus expiring.
201 *
202 * When the election expires, we will check if we were the ones who won, and
203 * if so we will declare victory. If that is not the case, then we assume
204 * that the one we defered to didn't declare victory quickly enough (in fact,
205 * as far as we know, we may even be dead); so, just propose ourselves as the
206 * Leader.
207 */
3efd9988 208 expire_event = mon->timer.add_event_after(
11fdf7f2 209 g_conf()->mon_election_timeout + plus,
9f95a23c
TL
210 new C_MonContext{mon, [this](int) {
211 logic.end_election_period();
212 }});
7c673cae
FG
213}
214
215
216void Elector::cancel_timer()
217{
218 if (expire_event) {
219 mon->timer.cancel_event(expire_event);
220 expire_event = 0;
221 }
222}
223
f67539c2
TL
224void Elector::assimilate_connection_reports(const bufferlist& tbl)
225{
39ae355f
TL
226 dout(10) << __func__ << dendl;
227 ConnectionTracker pct(tbl, mon->cct);
f67539c2
TL
228 peer_tracker.receive_peer_report(pct);
229}
230
9f95a23c 231void Elector::message_victory(const std::set<int>& quorum)
7c673cae 232{
7c673cae
FG
233 uint64_t cluster_features = CEPH_FEATURES_ALL;
234 mon_feature_t mon_features = ceph::features::mon::get_supported();
224ce89b 235 map<int,Metadata> metadata;
9f95a23c
TL
236 ceph_release_t min_mon_release{ceph_release_t::unknown};
237 for (auto id : quorum) {
238 auto i = peer_info.find(id);
239 ceph_assert(i != peer_info.end());
240 auto& info = i->second;
241 cluster_features &= info.cluster_features;
242 mon_features &= info.mon_features;
243 metadata[id] = info.metadata;
244 if (min_mon_release == ceph_release_t::unknown ||
245 info.mon_release < min_mon_release) {
246 min_mon_release = info.mon_release;
11fdf7f2 247 }
7c673cae
FG
248 }
249
250 cancel_timer();
251
7c673cae 252
7c673cae
FG
253 // tell everyone!
254 for (set<int>::iterator p = quorum.begin();
255 p != quorum.end();
256 ++p) {
257 if (*p == mon->rank) continue;
9f95a23c 258 MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, get_epoch(),
f67539c2
TL
259 peer_tracker.get_encoded_bl(),
260 logic.strategy, mon->monmap);
7c673cae
FG
261 m->quorum = quorum;
262 m->quorum_features = cluster_features;
263 m->mon_features = mon_features;
d2e6a577 264 m->sharing_bl = mon->get_local_commands_bl(mon_features);
11fdf7f2
TL
265 m->mon_release = min_mon_release;
266 mon->send_mon_message(m, *p);
7c673cae 267 }
224ce89b 268
7c673cae 269 // tell monitor
9f95a23c 270 mon->win_election(get_epoch(), quorum,
11fdf7f2
TL
271 cluster_features, mon_features, min_mon_release,
272 metadata);
7c673cae
FG
273}
274
275
276void Elector::handle_propose(MonOpRequestRef op)
277{
278 op->mark_event("elector:handle_propose");
9f95a23c 279 auto m = op->get_req<MMonElection>();
7c673cae
FG
280 dout(5) << "handle_propose from " << m->get_source() << dendl;
281 int from = m->get_source().num();
282
11fdf7f2 283 ceph_assert(m->epoch % 2 == 1); // election
7c673cae
FG
284 uint64_t required_features = mon->get_required_features();
285 mon_feature_t required_mon_features = mon->get_required_mon_features();
286
287 dout(10) << __func__ << " required features " << required_features
288 << " " << required_mon_features
289 << ", peer features " << m->get_connection()->get_features()
290 << " " << m->mon_features
291 << dendl;
292
293 if ((required_features ^ m->get_connection()->get_features()) &
294 required_features) {
295 dout(5) << " ignoring propose from mon" << from
296 << " without required features" << dendl;
297 nak_old_peer(op);
298 return;
11fdf7f2
TL
299 } else if (mon->monmap->min_mon_release > m->mon_release) {
300 dout(5) << " ignoring propose from mon" << from
81eedcae
TL
301 << " release " << (int)m->mon_release
302 << " < min_mon_release " << (int)mon->monmap->min_mon_release
303 << dendl;
11fdf7f2
TL
304 nak_old_peer(op);
305 return;
7c673cae
FG
306 } else if (!m->mon_features.contains_all(required_mon_features)) {
307 // all the features in 'required_mon_features' not in 'm->mon_features'
308 mon_feature_t missing = required_mon_features.diff(m->mon_features);
309 dout(5) << " ignoring propose from mon." << from
310 << " without required mon_features " << missing
311 << dendl;
312 nak_old_peer(op);
7c673cae 313 }
f67539c2
TL
314 ConnectionTracker *oct = NULL;
315 if (m->sharing_bl.length()) {
39ae355f 316 oct = new ConnectionTracker(m->sharing_bl, mon->cct);
f67539c2
TL
317 }
318 logic.receive_propose(from, m->epoch, oct);
319 delete oct;
7c673cae
FG
320}
321
322void Elector::handle_ack(MonOpRequestRef op)
323{
324 op->mark_event("elector:handle_ack");
9f95a23c 325 auto m = op->get_req<MMonElection>();
7c673cae
FG
326 dout(5) << "handle_ack from " << m->get_source() << dendl;
327 int from = m->get_source().num();
328
9f95a23c 329 ceph_assert(m->epoch == get_epoch());
7c673cae
FG
330 uint64_t required_features = mon->get_required_features();
331 if ((required_features ^ m->get_connection()->get_features()) &
332 required_features) {
333 dout(5) << " ignoring ack from mon" << from
334 << " without required features" << dendl;
335 return;
336 }
337
338 mon_feature_t required_mon_features = mon->get_required_mon_features();
339 if (!m->mon_features.contains_all(required_mon_features)) {
340 mon_feature_t missing = required_mon_features.diff(m->mon_features);
341 dout(5) << " ignoring ack from mon." << from
342 << " without required mon_features " << missing
343 << dendl;
344 return;
345 }
346
9f95a23c 347 if (logic.electing_me) {
7c673cae 348 // thanks
9f95a23c
TL
349 peer_info[from].cluster_features = m->get_connection()->get_features();
350 peer_info[from].mon_features = m->mon_features;
351 peer_info[from].mon_release = m->mon_release;
352 peer_info[from].metadata = m->metadata;
7c673cae 353 dout(5) << " so far i have {";
9f95a23c
TL
354 for (auto q = logic.acked_me.begin();
355 q != logic.acked_me.end();
356 ++q) {
357 auto p = peer_info.find(*q);
358 ceph_assert(p != peer_info.end());
359 if (q != logic.acked_me.begin())
7c673cae
FG
360 *_dout << ",";
361 *_dout << " mon." << p->first << ":"
362 << " features " << p->second.cluster_features
363 << " " << p->second.mon_features;
364 }
365 *_dout << " }" << dendl;
7c673cae 366 }
7c673cae 367
9f95a23c
TL
368 logic.receive_ack(from, m->epoch);
369}
7c673cae
FG
370
371void Elector::handle_victory(MonOpRequestRef op)
372{
373 op->mark_event("elector:handle_victory");
9f95a23c 374 auto m = op->get_req<MMonElection>();
7c673cae
FG
375 dout(5) << "handle_victory from " << m->get_source()
376 << " quorum_features " << m->quorum_features
377 << " " << m->mon_features
378 << dendl;
379 int from = m->get_source().num();
380
9f95a23c 381 bool accept_victory = logic.receive_victory_claim(from, m->epoch);
7c673cae 382
9f95a23c 383 if (!accept_victory) {
7c673cae
FG
384 return;
385 }
386
9f95a23c 387 mon->lose_election(get_epoch(), m->quorum, from,
11fdf7f2 388 m->quorum_features, m->mon_features, m->mon_release);
7c673cae
FG
389
390 // cancel my timer
391 cancel_timer();
392
393 // stash leader's commands
11fdf7f2 394 ceph_assert(m->sharing_bl.length());
d2e6a577 395 vector<MonCommand> new_cmds;
11fdf7f2 396 auto bi = m->sharing_bl.cbegin();
d2e6a577
FG
397 MonCommand::decode_vector(new_cmds, bi);
398 mon->set_leader_commands(new_cmds);
7c673cae
FG
399}
400
401void Elector::nak_old_peer(MonOpRequestRef op)
402{
403 op->mark_event("elector:nak_old_peer");
9f95a23c 404 auto m = op->get_req<MMonElection>();
7c673cae
FG
405 uint64_t supported_features = m->get_connection()->get_features();
406 uint64_t required_features = mon->get_required_features();
407 mon_feature_t required_mon_features = mon->get_required_mon_features();
408 dout(10) << "sending nak to peer " << m->get_source()
11fdf7f2
TL
409 << " supports " << supported_features << " " << m->mon_features
410 << ", required " << required_features << " " << required_mon_features
81eedcae
TL
411 << ", release " << (int)m->mon_release
412 << " vs required " << (int)mon->monmap->min_mon_release
11fdf7f2 413 << dendl;
7c673cae 414 MMonElection *reply = new MMonElection(MMonElection::OP_NAK, m->epoch,
f67539c2
TL
415 peer_tracker.get_encoded_bl(),
416 logic.strategy, mon->monmap);
7c673cae
FG
417 reply->quorum_features = required_features;
418 reply->mon_features = required_mon_features;
11fdf7f2 419 reply->mon_release = mon->monmap->min_mon_release;
7c673cae
FG
420 mon->features.encode(reply->sharing_bl);
421 m->get_connection()->send_message(reply);
422}
423
424void Elector::handle_nak(MonOpRequestRef op)
425{
426 op->mark_event("elector:handle_nak");
9f95a23c 427 auto m = op->get_req<MMonElection>();
7c673cae
FG
428 dout(1) << "handle_nak from " << m->get_source()
429 << " quorum_features " << m->quorum_features
430 << " " << m->mon_features
81eedcae 431 << " min_mon_release " << (int)m->mon_release
7c673cae
FG
432 << dendl;
433
11fdf7f2 434 if (m->mon_release > ceph_release()) {
81eedcae
TL
435 derr << "Shutting down because I am release " << (int)ceph_release()
436 << " < min_mon_release " << (int)m->mon_release << dendl;
11fdf7f2
TL
437 } else {
438 CompatSet other;
439 auto bi = m->sharing_bl.cbegin();
440 other.decode(bi);
441 CompatSet diff = Monitor::get_supported_features().unsupported(other);
7c673cae 442
11fdf7f2
TL
443 mon_feature_t mon_supported = ceph::features::mon::get_supported();
444 // all features in 'm->mon_features' not in 'mon_supported'
445 mon_feature_t mon_diff = m->mon_features.diff(mon_supported);
7c673cae 446
11fdf7f2
TL
447 derr << "Shutting down because I lack required monitor features: { "
448 << diff << " } " << mon_diff << dendl;
449 }
7c673cae
FG
450 exit(0);
451 // the end!
452}
453
f67539c2
TL
454void Elector::begin_peer_ping(int peer)
455{
39ae355f 456 dout(20) << __func__ << " against " << peer << dendl;
f67539c2 457 if (live_pinging.count(peer)) {
39ae355f 458 dout(20) << peer << " already in live_pinging ... return " << dendl;
f67539c2
TL
459 return;
460 }
461
462 if (!mon->get_quorum_mon_features().contains_all(
463 ceph::features::mon::FEATURE_PINGING)) {
464 return;
465 }
466
f67539c2
TL
467 peer_tracker.report_live_connection(peer, 0); // init this peer as existing
468 live_pinging.insert(peer);
469 dead_pinging.erase(peer);
470 peer_acked_ping[peer] = ceph_clock_now();
39ae355f 471 if (!send_peer_ping(peer)) return;
f67539c2
TL
472 mon->timer.add_event_after(ping_timeout / PING_DIVISOR,
473 new C_MonContext{mon, [this, peer](int) {
474 ping_check(peer);
475 }});
476}
477
39ae355f 478bool Elector::send_peer_ping(int peer, const utime_t *n)
f67539c2
TL
479{
480 dout(10) << __func__ << " to peer " << peer << dendl;
39ae355f
TL
481 if (peer >= mon->monmap->ranks.size()) {
482 // Monitor no longer exists in the monmap,
483 // therefore, we shouldn't ping this monitor
484 // since we cannot lookup the address!
485 dout(5) << "peer: " << peer << " >= ranks_size: "
486 << mon->monmap->ranks.size() << " ... dropping to prevent "
487 << "https://tracker.ceph.com/issues/50089" << dendl;
488 live_pinging.erase(peer);
489 return false;
490 }
f67539c2
TL
491 utime_t now;
492 if (n != NULL) {
493 now = *n;
494 } else {
495 now = ceph_clock_now();
496 }
497 MMonPing *ping = new MMonPing(MMonPing::PING, now, peer_tracker.get_encoded_bl());
498 mon->messenger->send_to_mon(ping, mon->monmap->get_addrs(peer));
499 peer_sent_ping[peer] = now;
39ae355f 500 return true;
f67539c2
TL
501}
502
503void Elector::ping_check(int peer)
504{
505 dout(20) << __func__ << " to peer " << peer << dendl;
39ae355f 506
f67539c2
TL
507 if (!live_pinging.count(peer) &&
508 !dead_pinging.count(peer)) {
509 dout(20) << __func__ << peer << " is no longer marked for pinging" << dendl;
510 return;
511 }
512 utime_t now = ceph_clock_now();
513 utime_t& acked_ping = peer_acked_ping[peer];
514 utime_t& newest_ping = peer_sent_ping[peer];
515 if (!acked_ping.is_zero() && acked_ping < now - ping_timeout) {
516 peer_tracker.report_dead_connection(peer, now - acked_ping);
517 acked_ping = now;
518 begin_dead_ping(peer);
519 return;
520 }
521
522 if (acked_ping == newest_ping) {
39ae355f 523 if (!send_peer_ping(peer, &now)) return;
f67539c2
TL
524 }
525
526 mon->timer.add_event_after(ping_timeout / PING_DIVISOR,
527 new C_MonContext{mon, [this, peer](int) {
528 ping_check(peer);
529 }});
530}
531
532void Elector::begin_dead_ping(int peer)
533{
39ae355f 534 dout(20) << __func__ << " to peer " << peer << dendl;
f67539c2
TL
535 if (dead_pinging.count(peer)) {
536 return;
537 }
538
539 live_pinging.erase(peer);
540 dead_pinging.insert(peer);
541 mon->timer.add_event_after(ping_timeout,
542 new C_MonContext{mon, [this, peer](int) {
543 dead_ping(peer);
544 }});
545}
546
547void Elector::dead_ping(int peer)
548{
549 dout(20) << __func__ << " to peer " << peer << dendl;
550 if (!dead_pinging.count(peer)) {
551 dout(20) << __func__ << peer << " is no longer marked for dead pinging" << dendl;
552 return;
553 }
554 ceph_assert(!live_pinging.count(peer));
555
556 utime_t now = ceph_clock_now();
557 utime_t& acked_ping = peer_acked_ping[peer];
558
559 peer_tracker.report_dead_connection(peer, now - acked_ping);
560 acked_ping = now;
561 mon->timer.add_event_after(ping_timeout,
562 new C_MonContext{mon, [this, peer](int) {
563 dead_ping(peer);
564 }});
565}
566
567void Elector::handle_ping(MonOpRequestRef op)
568{
569 MMonPing *m = static_cast<MMonPing*>(op->get_req());
f67539c2 570 int prank = mon->monmap->get_rank(m->get_source_addr());
39ae355f 571 dout(20) << __func__ << " from: " << prank << dendl;
f67539c2
TL
572 begin_peer_ping(prank);
573 assimilate_connection_reports(m->tracker_bl);
574 switch(m->op) {
575 case MMonPing::PING:
576 {
577 MMonPing *reply = new MMonPing(MMonPing::PING_REPLY, m->stamp, peer_tracker.get_encoded_bl());
578 m->get_connection()->send_message(reply);
579 }
580 break;
581
582 case MMonPing::PING_REPLY:
39ae355f 583
f67539c2
TL
584 const utime_t& previous_acked = peer_acked_ping[prank];
585 const utime_t& newest = peer_sent_ping[prank];
39ae355f 586
f67539c2
TL
587 if (m->stamp > newest && !newest.is_zero()) {
588 derr << "dropping PING_REPLY stamp " << m->stamp
589 << " as it is newer than newest sent " << newest << dendl;
590 return;
591 }
39ae355f 592
f67539c2 593 if (m->stamp > previous_acked) {
39ae355f 594 dout(20) << "m->stamp > previous_acked" << dendl;
f67539c2
TL
595 peer_tracker.report_live_connection(prank, m->stamp - previous_acked);
596 peer_acked_ping[prank] = m->stamp;
39ae355f
TL
597 } else{
598 dout(20) << "m->stamp <= previous_acked .. we don't report_live_connection" << dendl;
f67539c2
TL
599 }
600 utime_t now = ceph_clock_now();
39ae355f
TL
601 dout(30) << "now: " << now << " m->stamp: " << m->stamp << " ping_timeout: "
602 << ping_timeout << " PING_DIVISOR: " << PING_DIVISOR << dendl;
f67539c2 603 if (now - m->stamp > ping_timeout / PING_DIVISOR) {
39ae355f 604 if (!send_peer_ping(prank, &now)) return;
f67539c2
TL
605 }
606 break;
607 }
608}
609
7c673cae
FG
610void Elector::dispatch(MonOpRequestRef op)
611{
612 op->mark_event("elector:dispatch");
f67539c2 613 ceph_assert(op->is_type_election_or_ping());
7c673cae
FG
614
615 switch (op->get_req()->get_type()) {
616
617 case MSG_MON_ELECTION:
618 {
9f95a23c 619 if (!logic.participating) {
7c673cae
FG
620 return;
621 }
622 if (op->get_req()->get_source().num() >= mon->monmap->size()) {
623 dout(5) << " ignoring bogus election message with bad mon rank "
624 << op->get_req()->get_source() << dendl;
625 return;
626 }
627
9f95a23c 628 auto em = op->get_req<MMonElection>();
39ae355f 629 dout(20) << __func__ << " from: " << mon->monmap->get_rank(em->get_source_addr()) << dendl;
7c673cae
FG
630 // assume an old message encoding would have matched
631 if (em->fsid != mon->monmap->fsid) {
632 dout(0) << " ignoring election msg fsid "
633 << em->fsid << " != " << mon->monmap->fsid << dendl;
634 return;
635 }
636
637 if (!mon->monmap->contains(em->get_source_addr())) {
638 dout(1) << "discarding election message: " << em->get_source_addr()
639 << " not in my monmap " << *mon->monmap << dendl;
640 return;
641 }
642
643 MonMap peermap;
644 peermap.decode(em->monmap_bl);
645 if (peermap.epoch > mon->monmap->epoch) {
646 dout(0) << em->get_source_inst() << " has newer monmap epoch " << peermap.epoch
647 << " > my epoch " << mon->monmap->epoch
648 << ", taking it"
649 << dendl;
650 mon->monmap->decode(em->monmap_bl);
651 auto t(std::make_shared<MonitorDBStore::Transaction>());
652 t->put("monmap", mon->monmap->epoch, em->monmap_bl);
653 t->put("monmap", "last_committed", mon->monmap->epoch);
654 mon->store->apply_transaction(t);
655 //mon->monmon()->paxos->stash_latest(mon->monmap->epoch, em->monmap_bl);
656 cancel_timer();
b3b6e05e 657 mon->notify_new_monmap(false);
7c673cae
FG
658 mon->bootstrap();
659 return;
660 }
661 if (peermap.epoch < mon->monmap->epoch) {
662 dout(0) << em->get_source_inst() << " has older monmap epoch " << peermap.epoch
663 << " < my epoch " << mon->monmap->epoch
664 << dendl;
f67539c2 665 }
7c673cae 666
f67539c2
TL
667 if (em->strategy != logic.strategy) {
668 dout(5) << __func__ << " somehow got an Election message with different strategy "
669 << em->strategy << " from local " << logic.strategy
670 << "; dropping for now to let race resolve" << dendl;
671 return;
672 }
673
674 if (em->scoring_bl.length()) {
675 assimilate_connection_reports(em->scoring_bl);
676 }
677
678 begin_peer_ping(mon->monmap->get_rank(em->get_source_addr()));
7c673cae
FG
679 switch (em->op) {
680 case MMonElection::OP_PROPOSE:
681 handle_propose(op);
682 return;
683 }
684
9f95a23c 685 if (em->epoch < get_epoch()) {
7c673cae
FG
686 dout(5) << "old epoch, dropping" << dendl;
687 break;
688 }
689
690 switch (em->op) {
691 case MMonElection::OP_ACK:
692 handle_ack(op);
693 return;
694 case MMonElection::OP_VICTORY:
695 handle_victory(op);
696 return;
697 case MMonElection::OP_NAK:
698 handle_nak(op);
699 return;
700 default:
701 ceph_abort();
702 }
703 }
704 break;
f67539c2
TL
705
706 case MSG_MON_PING:
707 handle_ping(op);
708 break;
7c673cae
FG
709
710 default:
711 ceph_abort();
712 }
713}
714
715void Elector::start_participating()
716{
9f95a23c 717 logic.participating = true;
7c673cae 718}
f67539c2 719
39ae355f
TL
720bool Elector::peer_tracker_is_clean()
721{
722 return peer_tracker.is_clean(mon->rank, paxos_size());
723}
724
f67539c2
TL
725void Elector::notify_clear_peer_state()
726{
39ae355f
TL
727 dout(10) << __func__ << dendl;
728 dout(20) << " peer_tracker before: " << peer_tracker << dendl;
f67539c2 729 peer_tracker.notify_reset();
39ae355f
TL
730 peer_tracker.set_rank(mon->rank);
731 dout(20) << " peer_tracker after: " << peer_tracker << dendl;
f67539c2
TL
732}
733
734void Elector::notify_rank_changed(int new_rank)
735{
39ae355f 736 dout(10) << __func__ << " to " << new_rank << dendl;
f67539c2
TL
737 peer_tracker.notify_rank_changed(new_rank);
738 live_pinging.erase(new_rank);
739 dead_pinging.erase(new_rank);
740}
741
39ae355f 742void Elector::notify_rank_removed(int rank_removed, int new_rank)
f67539c2 743{
39ae355f
TL
744 dout(10) << __func__ << ": " << rank_removed << dendl;
745 peer_tracker.notify_rank_removed(rank_removed, new_rank);
f67539c2
TL
746 /* we have to clean up the pinging state, which is annoying
747 because it's not indexed anywhere (and adding indexing
2a845540
TL
748 would also be annoying).
749 In the case where we are removing any rank that is not the
750 higest, we start with the removed rank and examine the state
751 of the surrounding ranks.
f67539c2
TL
752 Everybody who remains with larger rank gets a new rank one lower
753 than before, and we have to figure out the remaining scheduled
754 ping contexts. So, starting one past with the removed rank, we:
755 * check if the current rank is alive or dead
756 * examine our new rank (one less than before, initially the removed
757 rank)
758 * * erase it if it's in the wrong set
759 * * start pinging it if we're not already
760 * check if the next rank is in the same pinging set, and delete
761 * ourselves if not.
2a845540
TL
762 In the case where we are removing the highest rank,
763 we erase the removed rank from all sets.
f67539c2 764 */
2a845540
TL
765 if (rank_removed < paxos_size()) {
766 for (unsigned i = rank_removed + 1; i <= paxos_size() ; ++i) {
767 if (live_pinging.count(i)) {
768 dead_pinging.erase(i-1);
769 if (!live_pinging.count(i-1)) {
770 begin_peer_ping(i-1);
771 }
772 if (!live_pinging.count(i+1)) {
773 live_pinging.erase(i);
774 }
f67539c2 775 }
2a845540
TL
776 else if (dead_pinging.count(i)) {
777 live_pinging.erase(i-1);
778 if (!dead_pinging.count(i-1)) {
779 begin_dead_ping(i-1);
780 }
781 if (!dead_pinging.count(i+1)) {
782 dead_pinging.erase(i);
783 }
784 } else {
785 // we aren't pinging rank i at all
786 if (i-1 == (unsigned)rank_removed) {
787 // so we special case to make sure we
788 // actually nuke the removed rank
789 dead_pinging.erase(rank_removed);
790 live_pinging.erase(rank_removed);
791 }
f67539c2 792 }
2a845540
TL
793 }
794 } else {
795 if (live_pinging.count(rank_removed)) {
796 live_pinging.erase(rank_removed);
797 }
798 if (dead_pinging.count(rank_removed)) {
799 dead_pinging.erase(rank_removed);
800 }
801 }
f67539c2
TL
802}
803
804void Elector::notify_strategy_maybe_changed(int strategy)
805{
806 logic.set_election_strategy(static_cast<ElectionLogic::election_strategy>(strategy));
807}