]> git.proxmox.com Git - ceph.git/blame_incremental - ceph/src/mon/Elector.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / mon / Elector.cc
... / ...
CommitLineData
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "Elector.h"
16#include "Monitor.h"
17
18#include "common/Timer.h"
19#include "MonitorDBStore.h"
20#include "messages/MMonElection.h"
21#include "messages/MMonPing.h"
22
23#include "common/config.h"
24#include "include/ceph_assert.h"
25
26#define dout_subsys ceph_subsys_mon
27#undef dout_prefix
28#define dout_prefix _prefix(_dout, mon, get_epoch())
29using std::cerr;
30using std::cout;
31using std::dec;
32using std::hex;
33using std::list;
34using std::map;
35using std::make_pair;
36using std::ostream;
37using std::ostringstream;
38using std::pair;
39using std::set;
40using std::setfill;
41using std::string;
42using std::stringstream;
43using std::to_string;
44using std::vector;
45using std::unique_ptr;
46
47using ceph::bufferlist;
48using ceph::decode;
49using ceph::encode;
50using ceph::Formatter;
51using ceph::JSONFormatter;
52using ceph::mono_clock;
53using ceph::mono_time;
54using ceph::timespan_str;
55static ostream& _prefix(std::ostream *_dout, Monitor *mon, epoch_t epoch) {
56 return *_dout << "mon." << mon->name << "@" << mon->rank
57 << "(" << mon->get_state_name()
58 << ").elector(" << epoch << ") ";
59}
60
61Elector::Elector(Monitor *m, int strategy) : logic(this, static_cast<ElectionLogic::election_strategy>(strategy),
62 &peer_tracker,
63 m->cct->_conf.get_val<double>("mon_elector_ignore_propose_margin"),
64 m->cct),
65 peer_tracker(this, m->rank,
66 m->cct->_conf.get_val<uint64_t>("mon_con_tracker_score_halflife"),
67 m->cct->_conf.get_val<uint64_t>("mon_con_tracker_persist_interval"), m->cct),
68 ping_timeout(m->cct->_conf.get_val<double>("mon_elector_ping_timeout")),
69 PING_DIVISOR(m->cct->_conf.get_val<uint64_t>("mon_elector_ping_divisor")),
70 mon(m), elector(this) {
71 bufferlist bl;
72 mon->store->get(Monitor::MONITOR_NAME, "connectivity_scores", bl);
73 if (bl.length()) {
74 bufferlist::const_iterator bi = bl.begin();
75 peer_tracker.decode(bi);
76 }
77}
78
79
80void Elector::persist_epoch(epoch_t e)
81{
82 auto t(std::make_shared<MonitorDBStore::Transaction>());
83 t->put(Monitor::MONITOR_NAME, "election_epoch", e);
84 t->put(Monitor::MONITOR_NAME, "connectivity_scores", peer_tracker.get_encoded_bl());
85 mon->store->apply_transaction(t);
86}
87
88void Elector::persist_connectivity_scores()
89{
90 dout(20) << __func__ << dendl;
91 auto t(std::make_shared<MonitorDBStore::Transaction>());
92 t->put(Monitor::MONITOR_NAME, "connectivity_scores", peer_tracker.get_encoded_bl());
93 mon->store->apply_transaction(t);
94}
95
96epoch_t Elector::read_persisted_epoch() const
97{
98 return mon->store->get(Monitor::MONITOR_NAME, "election_epoch");
99}
100
101void Elector::validate_store()
102{
103 auto t(std::make_shared<MonitorDBStore::Transaction>());
104 t->put(Monitor::MONITOR_NAME, "election_writeable_test", rand());
105 int r = mon->store->apply_transaction(t);
106 ceph_assert(r >= 0);
107}
108
109bool Elector::is_current_member(int rank) const
110{
111 return mon->quorum.count(rank);
112}
113
114void Elector::trigger_new_election()
115{
116 mon->start_election();
117}
118
119int Elector::get_my_rank() const
120{
121 return mon->rank;
122}
123
124void Elector::reset_election()
125{
126 mon->bootstrap();
127}
128
129bool Elector::ever_participated() const
130{
131 return mon->has_ever_joined;
132}
133
134unsigned Elector::paxos_size() const
135{
136 return (unsigned)mon->monmap->size();
137}
138
139void Elector::shutdown()
140{
141 cancel_timer();
142}
143
144void Elector::notify_bump_epoch()
145{
146 mon->join_election();
147}
148
149void Elector::propose_to_peers(epoch_t e, bufferlist& logic_bl)
150{
151 // bcast to everyone else
152 for (unsigned i=0; i<mon->monmap->size(); ++i) {
153 if ((int)i == mon->rank) continue;
154 MMonElection *m =
155 new MMonElection(MMonElection::OP_PROPOSE, e,
156 peer_tracker.get_encoded_bl(),
157 logic.strategy, mon->monmap);
158 m->sharing_bl = logic_bl;
159 m->mon_features = ceph::features::mon::get_supported();
160 m->mon_release = ceph_release();
161 mon->send_mon_message(m, i);
162 }
163}
164
165void Elector::_start()
166{
167 peer_info.clear();
168 peer_info[mon->rank].cluster_features = CEPH_FEATURES_ALL;
169 peer_info[mon->rank].mon_release = ceph_release();
170 peer_info[mon->rank].mon_features = ceph::features::mon::get_supported();
171 mon->collect_metadata(&peer_info[mon->rank].metadata);
172 reset_timer();
173}
174
175void Elector::_defer_to(int who)
176{
177 MMonElection *m = new MMonElection(MMonElection::OP_ACK, get_epoch(),
178 peer_tracker.get_encoded_bl(),
179 logic.strategy, mon->monmap);
180 m->mon_features = ceph::features::mon::get_supported();
181 m->mon_release = ceph_release();
182 mon->collect_metadata(&m->metadata);
183
184 mon->send_mon_message(m, who);
185
186 // set a timer
187 reset_timer(1.0); // give the leader some extra time to declare victory
188}
189
190
191void Elector::reset_timer(double plus)
192{
193 // set the timer
194 cancel_timer();
195 /**
196 * This class is used as the callback when the expire_event timer fires up.
197 *
198 * If the expire_event is fired, then it means that we had an election going,
199 * either started by us or by some other participant, but it took too long,
200 * thus expiring.
201 *
202 * When the election expires, we will check if we were the ones who won, and
203 * if so we will declare victory. If that is not the case, then we assume
204 * that the one we defered to didn't declare victory quickly enough (in fact,
205 * as far as we know, we may even be dead); so, just propose ourselves as the
206 * Leader.
207 */
208 expire_event = mon->timer.add_event_after(
209 g_conf()->mon_election_timeout + plus,
210 new C_MonContext{mon, [this](int) {
211 logic.end_election_period();
212 }});
213}
214
215
216void Elector::cancel_timer()
217{
218 if (expire_event) {
219 mon->timer.cancel_event(expire_event);
220 expire_event = 0;
221 }
222}
223
224void Elector::assimilate_connection_reports(const bufferlist& tbl)
225{
226 dout(10) << __func__ << dendl;
227 ConnectionTracker pct(tbl, mon->cct);
228 peer_tracker.receive_peer_report(pct);
229}
230
231void Elector::message_victory(const std::set<int>& quorum)
232{
233 uint64_t cluster_features = CEPH_FEATURES_ALL;
234 mon_feature_t mon_features = ceph::features::mon::get_supported();
235 map<int,Metadata> metadata;
236 ceph_release_t min_mon_release{ceph_release_t::unknown};
237 for (auto id : quorum) {
238 auto i = peer_info.find(id);
239 ceph_assert(i != peer_info.end());
240 auto& info = i->second;
241 cluster_features &= info.cluster_features;
242 mon_features &= info.mon_features;
243 metadata[id] = info.metadata;
244 if (min_mon_release == ceph_release_t::unknown ||
245 info.mon_release < min_mon_release) {
246 min_mon_release = info.mon_release;
247 }
248 }
249
250 cancel_timer();
251
252
253 // tell everyone!
254 for (set<int>::iterator p = quorum.begin();
255 p != quorum.end();
256 ++p) {
257 if (*p == mon->rank) continue;
258 MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, get_epoch(),
259 peer_tracker.get_encoded_bl(),
260 logic.strategy, mon->monmap);
261 m->quorum = quorum;
262 m->quorum_features = cluster_features;
263 m->mon_features = mon_features;
264 m->sharing_bl = mon->get_local_commands_bl(mon_features);
265 m->mon_release = min_mon_release;
266 mon->send_mon_message(m, *p);
267 }
268
269 // tell monitor
270 mon->win_election(get_epoch(), quorum,
271 cluster_features, mon_features, min_mon_release,
272 metadata);
273}
274
275
276void Elector::handle_propose(MonOpRequestRef op)
277{
278 op->mark_event("elector:handle_propose");
279 auto m = op->get_req<MMonElection>();
280 dout(5) << "handle_propose from " << m->get_source() << dendl;
281 int from = m->get_source().num();
282
283 ceph_assert(m->epoch % 2 == 1); // election
284 uint64_t required_features = mon->get_required_features();
285 mon_feature_t required_mon_features = mon->get_required_mon_features();
286
287 dout(10) << __func__ << " required features " << required_features
288 << " " << required_mon_features
289 << ", peer features " << m->get_connection()->get_features()
290 << " " << m->mon_features
291 << dendl;
292
293 if ((required_features ^ m->get_connection()->get_features()) &
294 required_features) {
295 dout(5) << " ignoring propose from mon" << from
296 << " without required features" << dendl;
297 nak_old_peer(op);
298 return;
299 } else if (mon->monmap->min_mon_release > m->mon_release) {
300 dout(5) << " ignoring propose from mon" << from
301 << " release " << (int)m->mon_release
302 << " < min_mon_release " << (int)mon->monmap->min_mon_release
303 << dendl;
304 nak_old_peer(op);
305 return;
306 } else if (!m->mon_features.contains_all(required_mon_features)) {
307 // all the features in 'required_mon_features' not in 'm->mon_features'
308 mon_feature_t missing = required_mon_features.diff(m->mon_features);
309 dout(5) << " ignoring propose from mon." << from
310 << " without required mon_features " << missing
311 << dendl;
312 nak_old_peer(op);
313 }
314 ConnectionTracker *oct = NULL;
315 if (m->sharing_bl.length()) {
316 oct = new ConnectionTracker(m->sharing_bl, mon->cct);
317 }
318 logic.receive_propose(from, m->epoch, oct);
319 delete oct;
320}
321
322void Elector::handle_ack(MonOpRequestRef op)
323{
324 op->mark_event("elector:handle_ack");
325 auto m = op->get_req<MMonElection>();
326 dout(5) << "handle_ack from " << m->get_source() << dendl;
327 int from = m->get_source().num();
328
329 ceph_assert(m->epoch == get_epoch());
330 uint64_t required_features = mon->get_required_features();
331 if ((required_features ^ m->get_connection()->get_features()) &
332 required_features) {
333 dout(5) << " ignoring ack from mon" << from
334 << " without required features" << dendl;
335 return;
336 }
337
338 mon_feature_t required_mon_features = mon->get_required_mon_features();
339 if (!m->mon_features.contains_all(required_mon_features)) {
340 mon_feature_t missing = required_mon_features.diff(m->mon_features);
341 dout(5) << " ignoring ack from mon." << from
342 << " without required mon_features " << missing
343 << dendl;
344 return;
345 }
346
347 if (logic.electing_me) {
348 // thanks
349 peer_info[from].cluster_features = m->get_connection()->get_features();
350 peer_info[from].mon_features = m->mon_features;
351 peer_info[from].mon_release = m->mon_release;
352 peer_info[from].metadata = m->metadata;
353 dout(5) << " so far i have {";
354 for (auto q = logic.acked_me.begin();
355 q != logic.acked_me.end();
356 ++q) {
357 auto p = peer_info.find(*q);
358 ceph_assert(p != peer_info.end());
359 if (q != logic.acked_me.begin())
360 *_dout << ",";
361 *_dout << " mon." << p->first << ":"
362 << " features " << p->second.cluster_features
363 << " " << p->second.mon_features;
364 }
365 *_dout << " }" << dendl;
366 }
367
368 logic.receive_ack(from, m->epoch);
369}
370
371void Elector::handle_victory(MonOpRequestRef op)
372{
373 op->mark_event("elector:handle_victory");
374 auto m = op->get_req<MMonElection>();
375 dout(5) << "handle_victory from " << m->get_source()
376 << " quorum_features " << m->quorum_features
377 << " " << m->mon_features
378 << dendl;
379 int from = m->get_source().num();
380
381 bool accept_victory = logic.receive_victory_claim(from, m->epoch);
382
383 if (!accept_victory) {
384 return;
385 }
386
387 mon->lose_election(get_epoch(), m->quorum, from,
388 m->quorum_features, m->mon_features, m->mon_release);
389
390 // cancel my timer
391 cancel_timer();
392
393 // stash leader's commands
394 ceph_assert(m->sharing_bl.length());
395 vector<MonCommand> new_cmds;
396 auto bi = m->sharing_bl.cbegin();
397 MonCommand::decode_vector(new_cmds, bi);
398 mon->set_leader_commands(new_cmds);
399}
400
401void Elector::nak_old_peer(MonOpRequestRef op)
402{
403 op->mark_event("elector:nak_old_peer");
404 auto m = op->get_req<MMonElection>();
405 uint64_t supported_features = m->get_connection()->get_features();
406 uint64_t required_features = mon->get_required_features();
407 mon_feature_t required_mon_features = mon->get_required_mon_features();
408 dout(10) << "sending nak to peer " << m->get_source()
409 << " supports " << supported_features << " " << m->mon_features
410 << ", required " << required_features << " " << required_mon_features
411 << ", release " << (int)m->mon_release
412 << " vs required " << (int)mon->monmap->min_mon_release
413 << dendl;
414 MMonElection *reply = new MMonElection(MMonElection::OP_NAK, m->epoch,
415 peer_tracker.get_encoded_bl(),
416 logic.strategy, mon->monmap);
417 reply->quorum_features = required_features;
418 reply->mon_features = required_mon_features;
419 reply->mon_release = mon->monmap->min_mon_release;
420 mon->features.encode(reply->sharing_bl);
421 m->get_connection()->send_message(reply);
422}
423
424void Elector::handle_nak(MonOpRequestRef op)
425{
426 op->mark_event("elector:handle_nak");
427 auto m = op->get_req<MMonElection>();
428 dout(1) << "handle_nak from " << m->get_source()
429 << " quorum_features " << m->quorum_features
430 << " " << m->mon_features
431 << " min_mon_release " << (int)m->mon_release
432 << dendl;
433
434 if (m->mon_release > ceph_release()) {
435 derr << "Shutting down because I am release " << (int)ceph_release()
436 << " < min_mon_release " << (int)m->mon_release << dendl;
437 } else {
438 CompatSet other;
439 auto bi = m->sharing_bl.cbegin();
440 other.decode(bi);
441 CompatSet diff = Monitor::get_supported_features().unsupported(other);
442
443 mon_feature_t mon_supported = ceph::features::mon::get_supported();
444 // all features in 'm->mon_features' not in 'mon_supported'
445 mon_feature_t mon_diff = m->mon_features.diff(mon_supported);
446
447 derr << "Shutting down because I lack required monitor features: { "
448 << diff << " } " << mon_diff << dendl;
449 }
450 exit(0);
451 // the end!
452}
453
454void Elector::begin_peer_ping(int peer)
455{
456 dout(20) << __func__ << " against " << peer << dendl;
457 if (live_pinging.count(peer)) {
458 dout(20) << peer << " already in live_pinging ... return " << dendl;
459 return;
460 }
461
462 if (!mon->get_quorum_mon_features().contains_all(
463 ceph::features::mon::FEATURE_PINGING)) {
464 return;
465 }
466
467 peer_tracker.report_live_connection(peer, 0); // init this peer as existing
468 live_pinging.insert(peer);
469 dead_pinging.erase(peer);
470 peer_acked_ping[peer] = ceph_clock_now();
471 if (!send_peer_ping(peer)) return;
472 mon->timer.add_event_after(ping_timeout / PING_DIVISOR,
473 new C_MonContext{mon, [this, peer](int) {
474 ping_check(peer);
475 }});
476}
477
478bool Elector::send_peer_ping(int peer, const utime_t *n)
479{
480 dout(10) << __func__ << " to peer " << peer << dendl;
481 if (peer >= mon->monmap->ranks.size()) {
482 // Monitor no longer exists in the monmap,
483 // therefore, we shouldn't ping this monitor
484 // since we cannot lookup the address!
485 dout(5) << "peer: " << peer << " >= ranks_size: "
486 << mon->monmap->ranks.size() << " ... dropping to prevent "
487 << "https://tracker.ceph.com/issues/50089" << dendl;
488 live_pinging.erase(peer);
489 return false;
490 }
491 utime_t now;
492 if (n != NULL) {
493 now = *n;
494 } else {
495 now = ceph_clock_now();
496 }
497 MMonPing *ping = new MMonPing(MMonPing::PING, now, peer_tracker.get_encoded_bl());
498 mon->messenger->send_to_mon(ping, mon->monmap->get_addrs(peer));
499 peer_sent_ping[peer] = now;
500 return true;
501}
502
503void Elector::ping_check(int peer)
504{
505 dout(20) << __func__ << " to peer " << peer << dendl;
506
507 if (!live_pinging.count(peer) &&
508 !dead_pinging.count(peer)) {
509 dout(20) << __func__ << peer << " is no longer marked for pinging" << dendl;
510 return;
511 }
512 utime_t now = ceph_clock_now();
513 utime_t& acked_ping = peer_acked_ping[peer];
514 utime_t& newest_ping = peer_sent_ping[peer];
515 if (!acked_ping.is_zero() && acked_ping < now - ping_timeout) {
516 peer_tracker.report_dead_connection(peer, now - acked_ping);
517 acked_ping = now;
518 begin_dead_ping(peer);
519 return;
520 }
521
522 if (acked_ping == newest_ping) {
523 if (!send_peer_ping(peer, &now)) return;
524 }
525
526 mon->timer.add_event_after(ping_timeout / PING_DIVISOR,
527 new C_MonContext{mon, [this, peer](int) {
528 ping_check(peer);
529 }});
530}
531
532void Elector::begin_dead_ping(int peer)
533{
534 dout(20) << __func__ << " to peer " << peer << dendl;
535 if (dead_pinging.count(peer)) {
536 return;
537 }
538
539 live_pinging.erase(peer);
540 dead_pinging.insert(peer);
541 mon->timer.add_event_after(ping_timeout,
542 new C_MonContext{mon, [this, peer](int) {
543 dead_ping(peer);
544 }});
545}
546
547void Elector::dead_ping(int peer)
548{
549 dout(20) << __func__ << " to peer " << peer << dendl;
550 if (!dead_pinging.count(peer)) {
551 dout(20) << __func__ << peer << " is no longer marked for dead pinging" << dendl;
552 return;
553 }
554 ceph_assert(!live_pinging.count(peer));
555
556 utime_t now = ceph_clock_now();
557 utime_t& acked_ping = peer_acked_ping[peer];
558
559 peer_tracker.report_dead_connection(peer, now - acked_ping);
560 acked_ping = now;
561 mon->timer.add_event_after(ping_timeout,
562 new C_MonContext{mon, [this, peer](int) {
563 dead_ping(peer);
564 }});
565}
566
567void Elector::handle_ping(MonOpRequestRef op)
568{
569 MMonPing *m = static_cast<MMonPing*>(op->get_req());
570 int prank = mon->monmap->get_rank(m->get_source_addr());
571 dout(20) << __func__ << " from: " << prank << dendl;
572 begin_peer_ping(prank);
573 assimilate_connection_reports(m->tracker_bl);
574 switch(m->op) {
575 case MMonPing::PING:
576 {
577 MMonPing *reply = new MMonPing(MMonPing::PING_REPLY, m->stamp, peer_tracker.get_encoded_bl());
578 m->get_connection()->send_message(reply);
579 }
580 break;
581
582 case MMonPing::PING_REPLY:
583
584 const utime_t& previous_acked = peer_acked_ping[prank];
585 const utime_t& newest = peer_sent_ping[prank];
586
587 if (m->stamp > newest && !newest.is_zero()) {
588 derr << "dropping PING_REPLY stamp " << m->stamp
589 << " as it is newer than newest sent " << newest << dendl;
590 return;
591 }
592
593 if (m->stamp > previous_acked) {
594 dout(20) << "m->stamp > previous_acked" << dendl;
595 peer_tracker.report_live_connection(prank, m->stamp - previous_acked);
596 peer_acked_ping[prank] = m->stamp;
597 } else{
598 dout(20) << "m->stamp <= previous_acked .. we don't report_live_connection" << dendl;
599 }
600 utime_t now = ceph_clock_now();
601 dout(30) << "now: " << now << " m->stamp: " << m->stamp << " ping_timeout: "
602 << ping_timeout << " PING_DIVISOR: " << PING_DIVISOR << dendl;
603 if (now - m->stamp > ping_timeout / PING_DIVISOR) {
604 if (!send_peer_ping(prank, &now)) return;
605 }
606 break;
607 }
608}
609
610void Elector::dispatch(MonOpRequestRef op)
611{
612 op->mark_event("elector:dispatch");
613 ceph_assert(op->is_type_election_or_ping());
614
615 switch (op->get_req()->get_type()) {
616
617 case MSG_MON_ELECTION:
618 {
619 if (!logic.participating) {
620 return;
621 }
622 if (op->get_req()->get_source().num() >= mon->monmap->size()) {
623 dout(5) << " ignoring bogus election message with bad mon rank "
624 << op->get_req()->get_source() << dendl;
625 return;
626 }
627
628 auto em = op->get_req<MMonElection>();
629 dout(20) << __func__ << " from: " << mon->monmap->get_rank(em->get_source_addr()) << dendl;
630 // assume an old message encoding would have matched
631 if (em->fsid != mon->monmap->fsid) {
632 dout(0) << " ignoring election msg fsid "
633 << em->fsid << " != " << mon->monmap->fsid << dendl;
634 return;
635 }
636
637 if (!mon->monmap->contains(em->get_source_addr())) {
638 dout(1) << "discarding election message: " << em->get_source_addr()
639 << " not in my monmap " << *mon->monmap << dendl;
640 return;
641 }
642
643 MonMap peermap;
644 peermap.decode(em->monmap_bl);
645 if (peermap.epoch > mon->monmap->epoch) {
646 dout(0) << em->get_source_inst() << " has newer monmap epoch " << peermap.epoch
647 << " > my epoch " << mon->monmap->epoch
648 << ", taking it"
649 << dendl;
650 mon->monmap->decode(em->monmap_bl);
651 auto t(std::make_shared<MonitorDBStore::Transaction>());
652 t->put("monmap", mon->monmap->epoch, em->monmap_bl);
653 t->put("monmap", "last_committed", mon->monmap->epoch);
654 mon->store->apply_transaction(t);
655 //mon->monmon()->paxos->stash_latest(mon->monmap->epoch, em->monmap_bl);
656 cancel_timer();
657 mon->notify_new_monmap(false);
658 mon->bootstrap();
659 return;
660 }
661 if (peermap.epoch < mon->monmap->epoch) {
662 dout(0) << em->get_source_inst() << " has older monmap epoch " << peermap.epoch
663 << " < my epoch " << mon->monmap->epoch
664 << dendl;
665 }
666
667 if (em->strategy != logic.strategy) {
668 dout(5) << __func__ << " somehow got an Election message with different strategy "
669 << em->strategy << " from local " << logic.strategy
670 << "; dropping for now to let race resolve" << dendl;
671 return;
672 }
673
674 if (em->scoring_bl.length()) {
675 assimilate_connection_reports(em->scoring_bl);
676 }
677
678 begin_peer_ping(mon->monmap->get_rank(em->get_source_addr()));
679 switch (em->op) {
680 case MMonElection::OP_PROPOSE:
681 handle_propose(op);
682 return;
683 }
684
685 if (em->epoch < get_epoch()) {
686 dout(5) << "old epoch, dropping" << dendl;
687 break;
688 }
689
690 switch (em->op) {
691 case MMonElection::OP_ACK:
692 handle_ack(op);
693 return;
694 case MMonElection::OP_VICTORY:
695 handle_victory(op);
696 return;
697 case MMonElection::OP_NAK:
698 handle_nak(op);
699 return;
700 default:
701 ceph_abort();
702 }
703 }
704 break;
705
706 case MSG_MON_PING:
707 handle_ping(op);
708 break;
709
710 default:
711 ceph_abort();
712 }
713}
714
715void Elector::start_participating()
716{
717 logic.participating = true;
718}
719
720bool Elector::peer_tracker_is_clean()
721{
722 return peer_tracker.is_clean(mon->rank, paxos_size());
723}
724
725void Elector::notify_clear_peer_state()
726{
727 dout(10) << __func__ << dendl;
728 dout(20) << " peer_tracker before: " << peer_tracker << dendl;
729 peer_tracker.notify_reset();
730 peer_tracker.set_rank(mon->rank);
731 dout(20) << " peer_tracker after: " << peer_tracker << dendl;
732}
733
734void Elector::notify_rank_changed(int new_rank)
735{
736 dout(10) << __func__ << " to " << new_rank << dendl;
737 peer_tracker.notify_rank_changed(new_rank);
738 live_pinging.erase(new_rank);
739 dead_pinging.erase(new_rank);
740}
741
742void Elector::notify_rank_removed(int rank_removed, int new_rank)
743{
744 dout(10) << __func__ << ": " << rank_removed << dendl;
745 peer_tracker.notify_rank_removed(rank_removed, new_rank);
746 /* we have to clean up the pinging state, which is annoying
747 because it's not indexed anywhere (and adding indexing
748 would also be annoying).
749 In the case where we are removing any rank that is not the
750 higest, we start with the removed rank and examine the state
751 of the surrounding ranks.
752 Everybody who remains with larger rank gets a new rank one lower
753 than before, and we have to figure out the remaining scheduled
754 ping contexts. So, starting one past with the removed rank, we:
755 * check if the current rank is alive or dead
756 * examine our new rank (one less than before, initially the removed
757 rank)
758 * * erase it if it's in the wrong set
759 * * start pinging it if we're not already
760 * check if the next rank is in the same pinging set, and delete
761 * ourselves if not.
762 In the case where we are removing the highest rank,
763 we erase the removed rank from all sets.
764 */
765 if (rank_removed < paxos_size()) {
766 for (unsigned i = rank_removed + 1; i <= paxos_size() ; ++i) {
767 if (live_pinging.count(i)) {
768 dead_pinging.erase(i-1);
769 if (!live_pinging.count(i-1)) {
770 begin_peer_ping(i-1);
771 }
772 if (!live_pinging.count(i+1)) {
773 live_pinging.erase(i);
774 }
775 }
776 else if (dead_pinging.count(i)) {
777 live_pinging.erase(i-1);
778 if (!dead_pinging.count(i-1)) {
779 begin_dead_ping(i-1);
780 }
781 if (!dead_pinging.count(i+1)) {
782 dead_pinging.erase(i);
783 }
784 } else {
785 // we aren't pinging rank i at all
786 if (i-1 == (unsigned)rank_removed) {
787 // so we special case to make sure we
788 // actually nuke the removed rank
789 dead_pinging.erase(rank_removed);
790 live_pinging.erase(rank_removed);
791 }
792 }
793 }
794 } else {
795 if (live_pinging.count(rank_removed)) {
796 live_pinging.erase(rank_removed);
797 }
798 if (dead_pinging.count(rank_removed)) {
799 dead_pinging.erase(rank_removed);
800 }
801 }
802}
803
804void Elector::notify_strategy_maybe_changed(int strategy)
805{
806 logic.set_election_strategy(static_cast<ElectionLogic::election_strategy>(strategy));
807}