// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include "Elector.h"
#include "Monitor.h"

#include "common/Timer.h"
#include "MonitorDBStore.h"
#include "messages/MMonElection.h"
#include "messages/MMonPing.h"

#include "common/config.h"
#include "include/ceph_assert.h"

#define dout_subsys ceph_subsys_mon
#undef dout_prefix
#define dout_prefix _prefix(_dout, mon, get_epoch())
using std::cerr;
using std::cout;
using std::dec;
using std::hex;
using std::list;
using std::map;
using std::make_pair;
using std::ostream;
using std::ostringstream;
using std::pair;
using std::set;
using std::setfill;
using std::string;
using std::stringstream;
using std::to_string;
using std::vector;
using std::unique_ptr;

using ceph::bufferlist;
using ceph::decode;
using ceph::encode;
using ceph::Formatter;
using ceph::JSONFormatter;
using ceph::mono_clock;
using ceph::mono_time;
using ceph::timespan_str;
static ostream& _prefix(std::ostream *_dout, Monitor *mon, epoch_t epoch) {
  return *_dout << "mon." << mon->name << "@" << mon->rank
                << "(" << mon->get_state_name()
                << ").elector(" << epoch << ") ";
}

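// Build the elector for this monitor: wire the shared election logic and the
// connectivity score tracker to this object, pull the ping tuning options
// from the config, and load any previously persisted connectivity scores
// from the monitor's store.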
Elector::Elector(Monitor *m, int strategy) : logic(this, static_cast<ElectionLogic::election_strategy>(strategy),
                                                   &peer_tracker,
                                                   m->cct->_conf.get_val<double>("mon_elector_ignore_propose_margin"),
                                                   m->cct),
                                             peer_tracker(this, m->rank,
                                                          m->cct->_conf.get_val<uint64_t>("mon_con_tracker_score_halflife"),
                                                          m->cct->_conf.get_val<uint64_t>("mon_con_tracker_persist_interval")),
                                             ping_timeout(m->cct->_conf.get_val<double>("mon_elector_ping_timeout")),
                                             PING_DIVISOR(m->cct->_conf.get_val<uint64_t>("mon_elector_ping_divisor")),
                                             mon(m), elector(this) {
  bufferlist bl;
  mon->store->get(Monitor::MONITOR_NAME, "connectivity_scores", bl);
  if (bl.length()) {
    bufferlist::const_iterator bi = bl.begin();
    peer_tracker.decode(bi);
  }
}


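// Persistence helpers: the election epoch and the connectivity scores are
// stored under the monitor's own prefix so they survive restarts.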
void Elector::persist_epoch(epoch_t e)
{
  auto t(std::make_shared<MonitorDBStore::Transaction>());
  t->put(Monitor::MONITOR_NAME, "election_epoch", e);
  t->put(Monitor::MONITOR_NAME, "connectivity_scores", peer_tracker.get_encoded_bl());
  mon->store->apply_transaction(t);
}

void Elector::persist_connectivity_scores()
{
  auto t(std::make_shared<MonitorDBStore::Transaction>());
  t->put(Monitor::MONITOR_NAME, "connectivity_scores", peer_tracker.get_encoded_bl());
  mon->store->apply_transaction(t);
}

epoch_t Elector::read_persisted_epoch() const
{
  return mon->store->get(Monitor::MONITOR_NAME, "election_epoch");
}

void Elector::validate_store()
{
  auto t(std::make_shared<MonitorDBStore::Transaction>());
  t->put(Monitor::MONITOR_NAME, "election_writeable_test", rand());
  int r = mon->store->apply_transaction(t);
  ceph_assert(r >= 0);
}

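// The helpers below are thin shims the election logic calls to query and
// drive the enclosing monitor (quorum membership, rank, bootstrap, monmap
// size, and so on).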
bool Elector::is_current_member(int rank) const
{
  return mon->quorum.count(rank);
}

void Elector::trigger_new_election()
{
  mon->start_election();
}

int Elector::get_my_rank() const
{
  return mon->rank;
}

void Elector::reset_election()
{
  mon->bootstrap();
}

bool Elector::ever_participated() const
{
  return mon->has_ever_joined;
}

unsigned Elector::paxos_size() const
{
  return (unsigned)mon->monmap->size();
}

void Elector::shutdown()
{
  cancel_timer();
}

void Elector::notify_bump_epoch()
{
  mon->join_election();
}

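// Broadcast an OP_PROPOSE for election epoch e to every other monitor in the
// map, carrying our connectivity scores, election strategy, and whatever
// payload the election logic asked us to share.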
void Elector::propose_to_peers(epoch_t e, bufferlist& logic_bl)
{
  // bcast to everyone else
  for (unsigned i=0; i<mon->monmap->size(); ++i) {
    if ((int)i == mon->rank) continue;
    MMonElection *m =
      new MMonElection(MMonElection::OP_PROPOSE, e,
                       peer_tracker.get_encoded_bl(),
                       logic.strategy, mon->monmap);
    m->sharing_bl = logic_bl;
    m->mon_features = ceph::features::mon::get_supported();
    m->mon_release = ceph_release();
    mon->send_mon_message(m, i);
  }
}

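// Begin a new election round: forget everything we knew about our peers,
// record our own features/release/metadata, and arm the election timer.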
void Elector::_start()
{
  peer_info.clear();
  peer_info[mon->rank].cluster_features = CEPH_FEATURES_ALL;
  peer_info[mon->rank].mon_release = ceph_release();
  peer_info[mon->rank].mon_features = ceph::features::mon::get_supported();
  mon->collect_metadata(&peer_info[mon->rank].metadata);
  reset_timer();
}

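// Acknowledge another monitor's proposal: send it an OP_ACK with our features
// and metadata, then restart the timer with a little extra slack so the
// prospective leader has time to declare victory.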
void Elector::_defer_to(int who)
{
  MMonElection *m = new MMonElection(MMonElection::OP_ACK, get_epoch(),
                                     peer_tracker.get_encoded_bl(),
                                     logic.strategy, mon->monmap);
  m->mon_features = ceph::features::mon::get_supported();
  m->mon_release = ceph_release();
  mon->collect_metadata(&m->metadata);

  mon->send_mon_message(m, who);

  // set a timer
  reset_timer(1.0);  // give the leader some extra time to declare victory
}


void Elector::reset_timer(double plus)
{
  // set the timer
  cancel_timer();
  /**
   * This callback runs when the expire_event timer fires.
   *
   * If expire_event fires, it means an election was in progress, either
   * started by us or by some other participant, but it took too long and
   * timed out.
   *
   * When the election expires, we check whether we were the one who won, and
   * if so we declare victory. Otherwise, we assume that the peer we deferred
   * to failed to declare victory quickly enough (for all we know, it may even
   * be dead), so we simply propose ourselves as the leader.
   */
  expire_event = mon->timer.add_event_after(
    g_conf()->mon_election_timeout + plus,
    new C_MonContext{mon, [this](int) {
        logic.end_election_period();
      }});
}


void Elector::cancel_timer()
{
  if (expire_event) {
    mon->timer.cancel_event(expire_event);
    expire_event = 0;
  }
}

void Elector::assimilate_connection_reports(const bufferlist& tbl)
{
  ConnectionTracker pct(tbl);
  peer_tracker.receive_peer_report(pct);
}

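// We won: compute the intersection of the quorum's features, the lowest mon
// release, and the per-monitor metadata, announce OP_VICTORY to every other
// quorum member, and then tell our own monitor it is the leader.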
void Elector::message_victory(const std::set<int>& quorum)
{
  uint64_t cluster_features = CEPH_FEATURES_ALL;
  mon_feature_t mon_features = ceph::features::mon::get_supported();
  map<int,Metadata> metadata;
  ceph_release_t min_mon_release{ceph_release_t::unknown};
  for (auto id : quorum) {
    auto i = peer_info.find(id);
    ceph_assert(i != peer_info.end());
    auto& info = i->second;
    cluster_features &= info.cluster_features;
    mon_features &= info.mon_features;
    metadata[id] = info.metadata;
    if (min_mon_release == ceph_release_t::unknown ||
        info.mon_release < min_mon_release) {
      min_mon_release = info.mon_release;
    }
  }

  cancel_timer();


  // tell everyone!
  for (set<int>::iterator p = quorum.begin();
       p != quorum.end();
       ++p) {
    if (*p == mon->rank) continue;
    MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, get_epoch(),
                                       peer_tracker.get_encoded_bl(),
                                       logic.strategy, mon->monmap);
    m->quorum = quorum;
    m->quorum_features = cluster_features;
    m->mon_features = mon_features;
    m->sharing_bl = mon->get_local_commands_bl(mon_features);
    m->mon_release = min_mon_release;
    mon->send_mon_message(m, *p);
  }

  // tell monitor
  mon->win_election(get_epoch(), quorum,
                    cluster_features, mon_features, min_mon_release,
                    metadata);
}


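// Handle an incoming OP_PROPOSE. Peers that lack our required feature bits,
// run a release older than the monmap's min_mon_release, or miss required
// mon features get NAKed; otherwise the proposal (plus any shared
// connectivity scores) is handed to the election logic.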
void Elector::handle_propose(MonOpRequestRef op)
{
  op->mark_event("elector:handle_propose");
  auto m = op->get_req<MMonElection>();
  dout(5) << "handle_propose from " << m->get_source() << dendl;
  int from = m->get_source().num();

  ceph_assert(m->epoch % 2 == 1); // election
  uint64_t required_features = mon->get_required_features();
  mon_feature_t required_mon_features = mon->get_required_mon_features();

  dout(10) << __func__ << " required features " << required_features
           << " " << required_mon_features
           << ", peer features " << m->get_connection()->get_features()
           << " " << m->mon_features
           << dendl;

  if ((required_features ^ m->get_connection()->get_features()) &
      required_features) {
    dout(5) << " ignoring propose from mon" << from
            << " without required features" << dendl;
    nak_old_peer(op);
    return;
  } else if (mon->monmap->min_mon_release > m->mon_release) {
    dout(5) << " ignoring propose from mon" << from
            << " release " << (int)m->mon_release
            << " < min_mon_release " << (int)mon->monmap->min_mon_release
            << dendl;
    nak_old_peer(op);
    return;
  } else if (!m->mon_features.contains_all(required_mon_features)) {
    // all the features in 'required_mon_features' not in 'm->mon_features'
    mon_feature_t missing = required_mon_features.diff(m->mon_features);
    dout(5) << " ignoring propose from mon." << from
            << " without required mon_features " << missing
            << dendl;
    nak_old_peer(op);
    return;
  }
  ConnectionTracker *oct = NULL;
  if (m->sharing_bl.length()) {
    oct = new ConnectionTracker(m->sharing_bl);
  }
  logic.receive_propose(from, m->epoch, oct);
  delete oct;
}

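// Handle an OP_ACK from a peer that is deferring to us. If the ack carries
// the required features, stash the peer's features/release/metadata and pass
// the ack on to the election logic.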
void Elector::handle_ack(MonOpRequestRef op)
{
  op->mark_event("elector:handle_ack");
  auto m = op->get_req<MMonElection>();
  dout(5) << "handle_ack from " << m->get_source() << dendl;
  int from = m->get_source().num();

  ceph_assert(m->epoch == get_epoch());
  uint64_t required_features = mon->get_required_features();
  if ((required_features ^ m->get_connection()->get_features()) &
      required_features) {
    dout(5) << " ignoring ack from mon" << from
            << " without required features" << dendl;
    return;
  }

  mon_feature_t required_mon_features = mon->get_required_mon_features();
  if (!m->mon_features.contains_all(required_mon_features)) {
    mon_feature_t missing = required_mon_features.diff(m->mon_features);
    dout(5) << " ignoring ack from mon." << from
            << " without required mon_features " << missing
            << dendl;
    return;
  }

  if (logic.electing_me) {
    // thanks
    peer_info[from].cluster_features = m->get_connection()->get_features();
    peer_info[from].mon_features = m->mon_features;
    peer_info[from].mon_release = m->mon_release;
    peer_info[from].metadata = m->metadata;
    dout(5) << " so far i have {";
    for (auto q = logic.acked_me.begin();
         q != logic.acked_me.end();
         ++q) {
      auto p = peer_info.find(*q);
      ceph_assert(p != peer_info.end());
      if (q != logic.acked_me.begin())
        *_dout << ",";
      *_dout << " mon." << p->first << ":"
             << " features " << p->second.cluster_features
             << " " << p->second.mon_features;
    }
    *_dout << " }" << dendl;
  }

  logic.receive_ack(from, m->epoch);
}

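// Handle an OP_VICTORY claim. If the election logic accepts it we concede,
// cancel our timer, and adopt the leader's command set from the shared
// bufferlist.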
void Elector::handle_victory(MonOpRequestRef op)
{
  op->mark_event("elector:handle_victory");
  auto m = op->get_req<MMonElection>();
  dout(5) << "handle_victory from " << m->get_source()
          << " quorum_features " << m->quorum_features
          << " " << m->mon_features
          << dendl;
  int from = m->get_source().num();

  bool accept_victory = logic.receive_victory_claim(from, m->epoch);

  if (!accept_victory) {
    return;
  }

  mon->lose_election(get_epoch(), m->quorum, from,
                     m->quorum_features, m->mon_features, m->mon_release);

  // cancel my timer
  cancel_timer();

  // stash leader's commands
  ceph_assert(m->sharing_bl.length());
  vector<MonCommand> new_cmds;
  auto bi = m->sharing_bl.cbegin();
  MonCommand::decode_vector(new_cmds, bi);
  mon->set_leader_commands(new_cmds);
}

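// Reply to a peer whose features or release are too old: send an OP_NAK that
// tells it what we require, including our CompatSet, so it can log why it is
// being rejected and shut down.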
void Elector::nak_old_peer(MonOpRequestRef op)
{
  op->mark_event("elector:nak_old_peer");
  auto m = op->get_req<MMonElection>();
  uint64_t supported_features = m->get_connection()->get_features();
  uint64_t required_features = mon->get_required_features();
  mon_feature_t required_mon_features = mon->get_required_mon_features();
  dout(10) << "sending nak to peer " << m->get_source()
           << " supports " << supported_features << " " << m->mon_features
           << ", required " << required_features << " " << required_mon_features
           << ", release " << (int)m->mon_release
           << " vs required " << (int)mon->monmap->min_mon_release
           << dendl;
  MMonElection *reply = new MMonElection(MMonElection::OP_NAK, m->epoch,
                                         peer_tracker.get_encoded_bl(),
                                         logic.strategy, mon->monmap);
  reply->quorum_features = required_features;
  reply->mon_features = required_mon_features;
  reply->mon_release = mon->monmap->min_mon_release;
  mon->features.encode(reply->sharing_bl);
  m->get_connection()->send_message(reply);
}

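// Handle an OP_NAK: a peer has told us we are too old to participate. Log
// which releases or features we are missing and exit so the operator can
// upgrade this monitor.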
void Elector::handle_nak(MonOpRequestRef op)
{
  op->mark_event("elector:handle_nak");
  auto m = op->get_req<MMonElection>();
  dout(1) << "handle_nak from " << m->get_source()
          << " quorum_features " << m->quorum_features
          << " " << m->mon_features
          << " min_mon_release " << (int)m->mon_release
          << dendl;

  if (m->mon_release > ceph_release()) {
    derr << "Shutting down because I am release " << (int)ceph_release()
         << " < min_mon_release " << (int)m->mon_release << dendl;
  } else {
    CompatSet other;
    auto bi = m->sharing_bl.cbegin();
    other.decode(bi);
    CompatSet diff = Monitor::get_supported_features().unsupported(other);

    mon_feature_t mon_supported = ceph::features::mon::get_supported();
    // all features in 'm->mon_features' not in 'mon_supported'
    mon_feature_t mon_diff = m->mon_features.diff(mon_supported);

    derr << "Shutting down because I lack required monitor features: { "
         << diff << " } " << mon_diff << dendl;
  }

  exit(0);
  // the end!
}

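// Connectivity probing. Each peer we track is in exactly one of two sets:
// live_pinging (we heard from it recently and ping it several times per
// timeout window, see PING_DIVISOR) or dead_pinging (it stopped answering
// and we accumulate dead time once per timeout until we hear from it again).
// begin_peer_ping starts, or restarts, live pinging for a peer.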
void Elector::begin_peer_ping(int peer)
{
  if (live_pinging.count(peer)) {
    return;
  }

  if (!mon->get_quorum_mon_features().contains_all(
        ceph::features::mon::FEATURE_PINGING)) {
    return;
  }

  dout(5) << __func__ << " against " << peer << dendl;

  peer_tracker.report_live_connection(peer, 0); // init this peer as existing
  live_pinging.insert(peer);
  dead_pinging.erase(peer);
  peer_acked_ping[peer] = ceph_clock_now();
  send_peer_ping(peer);
  mon->timer.add_event_after(ping_timeout / PING_DIVISOR,
                             new C_MonContext{mon, [this, peer](int) {
                                 ping_check(peer);
                               }});
}

void Elector::send_peer_ping(int peer, const utime_t *n)
{
  dout(10) << __func__ << " to peer " << peer << dendl;

  utime_t now;
  if (n != NULL) {
    now = *n;
  } else {
    now = ceph_clock_now();
  }
  MMonPing *ping = new MMonPing(MMonPing::PING, now, peer_tracker.get_encoded_bl());
  mon->messenger->send_to_mon(ping, mon->monmap->get_addrs(peer));
  peer_sent_ping[peer] = now;
}

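// Periodic check for a live-pinged peer: if its last ack is older than the
// ping timeout, mark the connection dead and switch to dead pinging;
// otherwise send another ping (if the last one was acked) and reschedule.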
void Elector::ping_check(int peer)
{
  dout(20) << __func__ << " to peer " << peer << dendl;
  if (!live_pinging.count(peer) &&
      !dead_pinging.count(peer)) {
    dout(20) << __func__ << " " << peer << " is no longer marked for pinging" << dendl;
    return;
  }
  utime_t now = ceph_clock_now();
  utime_t& acked_ping = peer_acked_ping[peer];
  utime_t& newest_ping = peer_sent_ping[peer];
  if (!acked_ping.is_zero() && acked_ping < now - ping_timeout) {
    peer_tracker.report_dead_connection(peer, now - acked_ping);
    acked_ping = now;
    begin_dead_ping(peer);
    return;
  }

  if (acked_ping == newest_ping) {
    send_peer_ping(peer, &now);
  }

  mon->timer.add_event_after(ping_timeout / PING_DIVISOR,
                             new C_MonContext{mon, [this, peer](int) {
                                 ping_check(peer);
                               }});
}

void Elector::begin_dead_ping(int peer)
{
  if (dead_pinging.count(peer)) {
    return;
  }

  live_pinging.erase(peer);
  dead_pinging.insert(peer);
  mon->timer.add_event_after(ping_timeout,
                             new C_MonContext{mon, [this, peer](int) {
                                 dead_ping(peer);
                               }});
}

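// Periodic check for a dead-pinged peer: keep accumulating dead time against
// it and reschedule. The peer only moves back to live pinging when we hear
// from it again (via begin_peer_ping).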
void Elector::dead_ping(int peer)
{
  dout(20) << __func__ << " to peer " << peer << dendl;
  if (!dead_pinging.count(peer)) {
    dout(20) << __func__ << " " << peer << " is no longer marked for dead pinging" << dendl;
    return;
  }
  ceph_assert(!live_pinging.count(peer));

  utime_t now = ceph_clock_now();
  utime_t& acked_ping = peer_acked_ping[peer];

  peer_tracker.report_dead_connection(peer, now - acked_ping);
  acked_ping = now;
  mon->timer.add_event_after(ping_timeout,
                             new C_MonContext{mon, [this, peer](int) {
                                 dead_ping(peer);
                               }});
}

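// Handle an incoming MMonPing. A PING gets an immediate PING_REPLY carrying
// our connectivity scores; a PING_REPLY updates the peer's acked timestamp
// (feeding report_live_connection) and, if the reply is already older than a
// ping interval, sends the next ping right away.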
void Elector::handle_ping(MonOpRequestRef op)
{
  MMonPing *m = static_cast<MMonPing*>(op->get_req());
  dout(10) << __func__ << " " << *m << dendl;

  int prank = mon->monmap->get_rank(m->get_source_addr());
  begin_peer_ping(prank);
  assimilate_connection_reports(m->tracker_bl);
  switch(m->op) {
  case MMonPing::PING:
    {
      MMonPing *reply = new MMonPing(MMonPing::PING_REPLY, m->stamp, peer_tracker.get_encoded_bl());
      m->get_connection()->send_message(reply);
    }
    break;

  case MMonPing::PING_REPLY:
    const utime_t& previous_acked = peer_acked_ping[prank];
    const utime_t& newest = peer_sent_ping[prank];
    if (m->stamp > newest && !newest.is_zero()) {
      derr << "dropping PING_REPLY stamp " << m->stamp
           << " as it is newer than newest sent " << newest << dendl;
      return;
    }
    if (m->stamp > previous_acked) {
      peer_tracker.report_live_connection(prank, m->stamp - previous_acked);
      peer_acked_ping[prank] = m->stamp;
    }
    utime_t now = ceph_clock_now();
    if (now - m->stamp > ping_timeout / PING_DIVISOR) {
      send_peer_ping(prank, &now);
    }
    break;
  }
}

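// Entry point for all election and ping traffic. Election messages are
// sanity-checked (participating, known rank, matching fsid, sender in our
// monmap, monmap epoch, matching strategy) before being routed to the
// appropriate handler; pings go straight to handle_ping().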
void Elector::dispatch(MonOpRequestRef op)
{
  op->mark_event("elector:dispatch");
  ceph_assert(op->is_type_election_or_ping());

  switch (op->get_req()->get_type()) {

  case MSG_MON_ELECTION:
    {
      if (!logic.participating) {
        return;
      }
      if (op->get_req()->get_source().num() >= mon->monmap->size()) {
        dout(5) << " ignoring bogus election message with bad mon rank "
                << op->get_req()->get_source() << dendl;
        return;
      }

      auto em = op->get_req<MMonElection>();

      // assume an old message encoding would have matched
      if (em->fsid != mon->monmap->fsid) {
        dout(0) << " ignoring election msg fsid "
                << em->fsid << " != " << mon->monmap->fsid << dendl;
        return;
      }

      if (!mon->monmap->contains(em->get_source_addr())) {
        dout(1) << "discarding election message: " << em->get_source_addr()
                << " not in my monmap " << *mon->monmap << dendl;
        return;
      }

      MonMap peermap;
      peermap.decode(em->monmap_bl);
      if (peermap.epoch > mon->monmap->epoch) {
        dout(0) << em->get_source_inst() << " has newer monmap epoch " << peermap.epoch
                << " > my epoch " << mon->monmap->epoch
                << ", taking it"
                << dendl;
        mon->monmap->decode(em->monmap_bl);
        auto t(std::make_shared<MonitorDBStore::Transaction>());
        t->put("monmap", mon->monmap->epoch, em->monmap_bl);
        t->put("monmap", "last_committed", mon->monmap->epoch);
        mon->store->apply_transaction(t);
        //mon->monmon()->paxos->stash_latest(mon->monmap->epoch, em->monmap_bl);
        cancel_timer();
        mon->notify_new_monmap(false);
        mon->bootstrap();
        return;
      }
      if (peermap.epoch < mon->monmap->epoch) {
        dout(0) << em->get_source_inst() << " has older monmap epoch " << peermap.epoch
                << " < my epoch " << mon->monmap->epoch
                << dendl;
      }

      if (em->strategy != logic.strategy) {
        dout(5) << __func__ << " somehow got an Election message with different strategy "
                << em->strategy << " from local " << logic.strategy
                << "; dropping for now to let race resolve" << dendl;
        return;
      }

      if (em->scoring_bl.length()) {
        assimilate_connection_reports(em->scoring_bl);
      }

      begin_peer_ping(mon->monmap->get_rank(em->get_source_addr()));
      switch (em->op) {
      case MMonElection::OP_PROPOSE:
        handle_propose(op);
        return;
      }

      if (em->epoch < get_epoch()) {
        dout(5) << "old epoch, dropping" << dendl;
        break;
      }

      switch (em->op) {
      case MMonElection::OP_ACK:
        handle_ack(op);
        return;
      case MMonElection::OP_VICTORY:
        handle_victory(op);
        return;
      case MMonElection::OP_NAK:
        handle_nak(op);
        return;
      default:
        ceph_abort();
      }
    }
    break;

  case MSG_MON_PING:
    handle_ping(op);
    break;

  default:
    ceph_abort();
  }
}

void Elector::start_participating()
{
  logic.participating = true;
}

void Elector::notify_clear_peer_state()
{
  peer_tracker.notify_reset();
}

void Elector::notify_rank_changed(int new_rank)
{
  peer_tracker.notify_rank_changed(new_rank);
  live_pinging.erase(new_rank);
  dead_pinging.erase(new_rank);
}

void Elector::notify_rank_removed(int rank_removed)
{
  peer_tracker.notify_rank_removed(rank_removed);
  /* We have to clean up the pinging state, which is annoying
     because it's not indexed anywhere (and adding indexing
     would also be annoying).
     When the removed rank is not the highest, we start with the
     removed rank and examine the state of the surrounding ranks.
     Everybody who remains with a larger rank gets a new rank one lower
     than before, and we have to figure out the remaining scheduled
     ping contexts. So, starting one past the removed rank, we:
     * check if the current rank is alive or dead
     * examine our new rank (one less than before, initially the removed
       rank)
     * * erase it if it's in the wrong set
     * * start pinging it if we're not already
     * check if the next rank is in the same pinging set, and delete
       ourselves if not.
     When the removed rank is the highest, we simply erase it
     from both sets.
  */
  if (rank_removed < paxos_size()) {
    for (unsigned i = rank_removed + 1; i <= paxos_size() ; ++i) {
      if (live_pinging.count(i)) {
        dead_pinging.erase(i-1);
        if (!live_pinging.count(i-1)) {
          begin_peer_ping(i-1);
        }
        if (!live_pinging.count(i+1)) {
          live_pinging.erase(i);
        }
      }
      else if (dead_pinging.count(i)) {
        live_pinging.erase(i-1);
        if (!dead_pinging.count(i-1)) {
          begin_dead_ping(i-1);
        }
        if (!dead_pinging.count(i+1)) {
          dead_pinging.erase(i);
        }
      } else {
        // we aren't pinging rank i at all
        if (i-1 == (unsigned)rank_removed) {
          // so we special case to make sure we
          // actually nuke the removed rank
          dead_pinging.erase(rank_removed);
          live_pinging.erase(rank_removed);
        }
      }
    }
  } else {
    if (live_pinging.count(rank_removed)) {
      live_pinging.erase(rank_removed);
    }
    if (dead_pinging.count(rank_removed)) {
      dead_pinging.erase(rank_removed);
    }
  }
}

void Elector::notify_strategy_maybe_changed(int strategy)
{
  logic.set_election_strategy(static_cast<ElectionLogic::election_strategy>(strategy));
}