]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/Elector.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / mon / Elector.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "Elector.h"
16#include "Monitor.h"
17
18#include "common/Timer.h"
19#include "MonitorDBStore.h"
20#include "messages/MMonElection.h"
21
22#include "common/config.h"
11fdf7f2 23#include "include/ceph_assert.h"
7c673cae
FG
24
25#define dout_subsys ceph_subsys_mon
26#undef dout_prefix
27#define dout_prefix _prefix(_dout, mon, epoch)
28static ostream& _prefix(std::ostream *_dout, Monitor *mon, epoch_t epoch) {
29 return *_dout << "mon." << mon->name << "@" << mon->rank
30 << "(" << mon->get_state_name()
31 << ").elector(" << epoch << ") ";
32}
33
34
35void Elector::init()
36{
37 epoch = mon->store->get(Monitor::MONITOR_NAME, "election_epoch");
d2e6a577
FG
38 if (!epoch) {
39 dout(1) << "init, first boot, initializing epoch at 1 " << dendl;
7c673cae 40 epoch = 1;
d2e6a577
FG
41 } else if (epoch % 2) {
42 dout(1) << "init, last seen epoch " << epoch
43 << ", mid-election, bumping" << dendl;
44 ++epoch;
45 auto t(std::make_shared<MonitorDBStore::Transaction>());
46 t->put(Monitor::MONITOR_NAME, "election_epoch", epoch);
47 mon->store->apply_transaction(t);
48 } else {
49 dout(1) << "init, last seen epoch " << epoch << dendl;
50 }
7c673cae
FG
51}
52
53void Elector::shutdown()
54{
55 cancel_timer();
56}
57
58void Elector::bump_epoch(epoch_t e)
59{
60 dout(10) << "bump_epoch " << epoch << " to " << e << dendl;
11fdf7f2 61 ceph_assert(epoch <= e);
7c673cae
FG
62 epoch = e;
63 auto t(std::make_shared<MonitorDBStore::Transaction>());
64 t->put(Monitor::MONITOR_NAME, "election_epoch", epoch);
65 mon->store->apply_transaction(t);
66
67 mon->join_election();
68
69 // clear up some state
70 electing_me = false;
71 acked_me.clear();
72}
73
74
75void Elector::start()
76{
77 if (!participating) {
78 dout(0) << "not starting new election -- not participating" << dendl;
79 return;
80 }
81 dout(5) << "start -- can i be leader?" << dendl;
82
83 acked_me.clear();
84 init();
85
86 // start by trying to elect me
87 if (epoch % 2 == 0) {
88 bump_epoch(epoch+1); // odd == election cycle
89 } else {
90 // do a trivial db write just to ensure it is writeable.
91 auto t(std::make_shared<MonitorDBStore::Transaction>());
92 t->put(Monitor::MONITOR_NAME, "election_writeable_test", rand());
93 int r = mon->store->apply_transaction(t);
11fdf7f2 94 ceph_assert(r >= 0);
7c673cae 95 }
7c673cae
FG
96 electing_me = true;
97 acked_me[mon->rank].cluster_features = CEPH_FEATURES_ALL;
11fdf7f2 98 acked_me[mon->rank].mon_release = ceph_release();
7c673cae 99 acked_me[mon->rank].mon_features = ceph::features::mon::get_supported();
224ce89b 100 mon->collect_metadata(&acked_me[mon->rank].metadata);
7c673cae
FG
101 leader_acked = -1;
102
103 // bcast to everyone else
104 for (unsigned i=0; i<mon->monmap->size(); ++i) {
105 if ((int)i == mon->rank) continue;
106 MMonElection *m =
107 new MMonElection(MMonElection::OP_PROPOSE, epoch, mon->monmap);
108 m->mon_features = ceph::features::mon::get_supported();
11fdf7f2
TL
109 m->mon_release = ceph_release();
110 mon->send_mon_message(m, i);
7c673cae
FG
111 }
112
113 reset_timer();
114}
115
116void Elector::defer(int who)
117{
118 dout(5) << "defer to " << who << dendl;
119
120 if (electing_me) {
121 // drop out
122 acked_me.clear();
123 electing_me = false;
124 }
125
126 // ack them
127 leader_acked = who;
7c673cae
FG
128 MMonElection *m = new MMonElection(MMonElection::OP_ACK, epoch, mon->monmap);
129 m->mon_features = ceph::features::mon::get_supported();
11fdf7f2 130 m->mon_release = ceph_release();
224ce89b 131 mon->collect_metadata(&m->metadata);
d2e6a577 132
11fdf7f2 133 mon->send_mon_message(m, who);
7c673cae
FG
134
135 // set a timer
136 reset_timer(1.0); // give the leader some extra time to declare victory
137}
138
139
140void Elector::reset_timer(double plus)
141{
142 // set the timer
143 cancel_timer();
144 /**
145 * This class is used as the callback when the expire_event timer fires up.
146 *
147 * If the expire_event is fired, then it means that we had an election going,
148 * either started by us or by some other participant, but it took too long,
149 * thus expiring.
150 *
151 * When the election expires, we will check if we were the ones who won, and
152 * if so we will declare victory. If that is not the case, then we assume
153 * that the one we defered to didn't declare victory quickly enough (in fact,
154 * as far as we know, we may even be dead); so, just propose ourselves as the
155 * Leader.
156 */
3efd9988 157 expire_event = mon->timer.add_event_after(
11fdf7f2 158 g_conf()->mon_election_timeout + plus,
3efd9988
FG
159 new C_MonContext(mon, [this](int) {
160 expire();
161 }));
7c673cae
FG
162}
163
164
165void Elector::cancel_timer()
166{
167 if (expire_event) {
168 mon->timer.cancel_event(expire_event);
169 expire_event = 0;
170 }
171}
172
173void Elector::expire()
174{
175 dout(5) << "election timer expired" << dendl;
176
177 // did i win?
178 if (electing_me &&
179 acked_me.size() > (unsigned)(mon->monmap->size() / 2)) {
180 // i win
181 victory();
182 } else {
183 // whoever i deferred to didn't declare victory quickly enough.
184 if (mon->has_ever_joined)
185 start();
186 else
187 mon->bootstrap();
188 }
189}
190
191
192void Elector::victory()
193{
194 leader_acked = -1;
195 electing_me = false;
196
197 uint64_t cluster_features = CEPH_FEATURES_ALL;
198 mon_feature_t mon_features = ceph::features::mon::get_supported();
199 set<int> quorum;
224ce89b 200 map<int,Metadata> metadata;
11fdf7f2 201 int min_mon_release = -1;
224ce89b 202 for (map<int, elector_info_t>::iterator p = acked_me.begin();
7c673cae
FG
203 p != acked_me.end();
204 ++p) {
205 quorum.insert(p->first);
206 cluster_features &= p->second.cluster_features;
207 mon_features &= p->second.mon_features;
224ce89b 208 metadata[p->first] = p->second.metadata;
11fdf7f2
TL
209 if (min_mon_release < 0 || p->second.mon_release < min_mon_release) {
210 min_mon_release = p->second.mon_release;
211 }
7c673cae
FG
212 }
213
214 cancel_timer();
215
11fdf7f2 216 ceph_assert(epoch % 2 == 1); // election
7c673cae
FG
217 bump_epoch(epoch+1); // is over!
218
7c673cae
FG
219 // tell everyone!
220 for (set<int>::iterator p = quorum.begin();
221 p != quorum.end();
222 ++p) {
223 if (*p == mon->rank) continue;
d2e6a577
FG
224 MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, epoch,
225 mon->monmap);
7c673cae
FG
226 m->quorum = quorum;
227 m->quorum_features = cluster_features;
228 m->mon_features = mon_features;
d2e6a577 229 m->sharing_bl = mon->get_local_commands_bl(mon_features);
11fdf7f2
TL
230 m->mon_release = min_mon_release;
231 mon->send_mon_message(m, *p);
7c673cae 232 }
224ce89b 233
7c673cae
FG
234 // tell monitor
235 mon->win_election(epoch, quorum,
11fdf7f2
TL
236 cluster_features, mon_features, min_mon_release,
237 metadata);
7c673cae
FG
238}
239
240
241void Elector::handle_propose(MonOpRequestRef op)
242{
243 op->mark_event("elector:handle_propose");
244 MMonElection *m = static_cast<MMonElection*>(op->get_req());
245 dout(5) << "handle_propose from " << m->get_source() << dendl;
246 int from = m->get_source().num();
247
11fdf7f2 248 ceph_assert(m->epoch % 2 == 1); // election
7c673cae
FG
249 uint64_t required_features = mon->get_required_features();
250 mon_feature_t required_mon_features = mon->get_required_mon_features();
251
252 dout(10) << __func__ << " required features " << required_features
253 << " " << required_mon_features
254 << ", peer features " << m->get_connection()->get_features()
255 << " " << m->mon_features
256 << dendl;
257
258 if ((required_features ^ m->get_connection()->get_features()) &
259 required_features) {
260 dout(5) << " ignoring propose from mon" << from
261 << " without required features" << dendl;
262 nak_old_peer(op);
263 return;
11fdf7f2
TL
264 } else if (mon->monmap->min_mon_release > m->mon_release) {
265 dout(5) << " ignoring propose from mon" << from
266 << " release " << m->mon_release
267 << " < min_mon_release " << mon->monmap->min_mon_release << dendl;
268 nak_old_peer(op);
269 return;
7c673cae
FG
270 } else if (!m->mon_features.contains_all(required_mon_features)) {
271 // all the features in 'required_mon_features' not in 'm->mon_features'
272 mon_feature_t missing = required_mon_features.diff(m->mon_features);
273 dout(5) << " ignoring propose from mon." << from
274 << " without required mon_features " << missing
275 << dendl;
276 nak_old_peer(op);
277 } else if (m->epoch > epoch) {
278 bump_epoch(m->epoch);
279 } else if (m->epoch < epoch) {
280 // got an "old" propose,
281 if (epoch % 2 == 0 && // in a non-election cycle
282 mon->quorum.count(from) == 0) { // from someone outside the quorum
283 // a mon just started up, call a new election so they can rejoin!
284 dout(5) << " got propose from old epoch, quorum is " << mon->quorum
285 << ", " << m->get_source() << " must have just started" << dendl;
286 // we may be active; make sure we reset things in the monitor appropriately.
287 mon->start_election();
288 } else {
289 dout(5) << " ignoring old propose" << dendl;
290 return;
291 }
292 }
293
294 if (mon->rank < from) {
295 // i would win over them.
296 if (leader_acked >= 0) { // we already acked someone
11fdf7f2 297 ceph_assert(leader_acked < from); // and they still win, of course
7c673cae
FG
298 dout(5) << "no, we already acked " << leader_acked << dendl;
299 } else {
300 // wait, i should win!
301 if (!electing_me) {
302 mon->start_election();
303 }
304 }
305 } else {
306 // they would win over me
307 if (leader_acked < 0 || // haven't acked anyone yet, or
308 leader_acked > from || // they would win over who you did ack, or
309 leader_acked == from) { // this is the guy we're already deferring to
310 defer(from);
311 } else {
312 // ignore them!
313 dout(5) << "no, we already acked " << leader_acked << dendl;
314 }
315 }
316}
317
318void Elector::handle_ack(MonOpRequestRef op)
319{
320 op->mark_event("elector:handle_ack");
321 MMonElection *m = static_cast<MMonElection*>(op->get_req());
322 dout(5) << "handle_ack from " << m->get_source() << dendl;
323 int from = m->get_source().num();
324
11fdf7f2 325 ceph_assert(m->epoch % 2 == 1); // election
7c673cae
FG
326 if (m->epoch > epoch) {
327 dout(5) << "woah, that's a newer epoch, i must have rebooted. bumping and re-starting!" << dendl;
328 bump_epoch(m->epoch);
329 start();
330 return;
331 }
11fdf7f2 332 ceph_assert(m->epoch == epoch);
7c673cae
FG
333 uint64_t required_features = mon->get_required_features();
334 if ((required_features ^ m->get_connection()->get_features()) &
335 required_features) {
336 dout(5) << " ignoring ack from mon" << from
337 << " without required features" << dendl;
338 return;
339 }
340
341 mon_feature_t required_mon_features = mon->get_required_mon_features();
342 if (!m->mon_features.contains_all(required_mon_features)) {
343 mon_feature_t missing = required_mon_features.diff(m->mon_features);
344 dout(5) << " ignoring ack from mon." << from
345 << " without required mon_features " << missing
346 << dendl;
347 return;
348 }
349
350 if (electing_me) {
351 // thanks
352 acked_me[from].cluster_features = m->get_connection()->get_features();
353 acked_me[from].mon_features = m->mon_features;
11fdf7f2 354 acked_me[from].mon_release = m->mon_release;
224ce89b 355 acked_me[from].metadata = m->metadata;
7c673cae 356 dout(5) << " so far i have {";
224ce89b 357 for (map<int, elector_info_t>::const_iterator p = acked_me.begin();
7c673cae
FG
358 p != acked_me.end();
359 ++p) {
360 if (p != acked_me.begin())
361 *_dout << ",";
362 *_dout << " mon." << p->first << ":"
363 << " features " << p->second.cluster_features
364 << " " << p->second.mon_features;
365 }
366 *_dout << " }" << dendl;
367
368 // is that _everyone_?
369 if (acked_me.size() == mon->monmap->size()) {
370 // if yes, shortcut to election finish
371 victory();
372 }
373 } else {
374 // ignore, i'm deferring already.
11fdf7f2 375 ceph_assert(leader_acked >= 0);
7c673cae
FG
376 }
377}
378
379
380void Elector::handle_victory(MonOpRequestRef op)
381{
382 op->mark_event("elector:handle_victory");
383 MMonElection *m = static_cast<MMonElection*>(op->get_req());
384 dout(5) << "handle_victory from " << m->get_source()
385 << " quorum_features " << m->quorum_features
386 << " " << m->mon_features
387 << dendl;
388 int from = m->get_source().num();
389
11fdf7f2
TL
390 ceph_assert(from < mon->rank);
391 ceph_assert(m->epoch % 2 == 0);
7c673cae
FG
392
393 leader_acked = -1;
394
395 // i should have seen this election if i'm getting the victory.
396 if (m->epoch != epoch + 1) {
397 dout(5) << "woah, that's a funny epoch, i must have rebooted. bumping and re-starting!" << dendl;
398 bump_epoch(m->epoch);
399 start();
400 return;
401 }
402
403 bump_epoch(m->epoch);
404
405 // they win
406 mon->lose_election(epoch, m->quorum, from,
11fdf7f2 407 m->quorum_features, m->mon_features, m->mon_release);
7c673cae
FG
408
409 // cancel my timer
410 cancel_timer();
411
412 // stash leader's commands
11fdf7f2 413 ceph_assert(m->sharing_bl.length());
d2e6a577 414 vector<MonCommand> new_cmds;
11fdf7f2 415 auto bi = m->sharing_bl.cbegin();
d2e6a577
FG
416 MonCommand::decode_vector(new_cmds, bi);
417 mon->set_leader_commands(new_cmds);
7c673cae
FG
418}
419
420void Elector::nak_old_peer(MonOpRequestRef op)
421{
422 op->mark_event("elector:nak_old_peer");
423 MMonElection *m = static_cast<MMonElection*>(op->get_req());
424 uint64_t supported_features = m->get_connection()->get_features();
425 uint64_t required_features = mon->get_required_features();
426 mon_feature_t required_mon_features = mon->get_required_mon_features();
427 dout(10) << "sending nak to peer " << m->get_source()
11fdf7f2
TL
428 << " supports " << supported_features << " " << m->mon_features
429 << ", required " << required_features << " " << required_mon_features
430 << ", release " << m->mon_release
431 << " vs required " << mon->monmap->min_mon_release
432 << dendl;
7c673cae
FG
433 MMonElection *reply = new MMonElection(MMonElection::OP_NAK, m->epoch,
434 mon->monmap);
435 reply->quorum_features = required_features;
436 reply->mon_features = required_mon_features;
11fdf7f2 437 reply->mon_release = mon->monmap->min_mon_release;
7c673cae
FG
438 mon->features.encode(reply->sharing_bl);
439 m->get_connection()->send_message(reply);
440}
441
442void Elector::handle_nak(MonOpRequestRef op)
443{
444 op->mark_event("elector:handle_nak");
445 MMonElection *m = static_cast<MMonElection*>(op->get_req());
446 dout(1) << "handle_nak from " << m->get_source()
447 << " quorum_features " << m->quorum_features
448 << " " << m->mon_features
11fdf7f2 449 << " min_mon_release " << m->mon_release
7c673cae
FG
450 << dendl;
451
11fdf7f2
TL
452 if (m->mon_release > ceph_release()) {
453 derr << "Shutting down because I am release " << ceph_release()
454 << " < min_mon_release " << m->mon_release << dendl;
455 } else {
456 CompatSet other;
457 auto bi = m->sharing_bl.cbegin();
458 other.decode(bi);
459 CompatSet diff = Monitor::get_supported_features().unsupported(other);
7c673cae 460
11fdf7f2
TL
461 mon_feature_t mon_supported = ceph::features::mon::get_supported();
462 // all features in 'm->mon_features' not in 'mon_supported'
463 mon_feature_t mon_diff = m->mon_features.diff(mon_supported);
7c673cae 464
11fdf7f2
TL
465 derr << "Shutting down because I lack required monitor features: { "
466 << diff << " } " << mon_diff << dendl;
467 }
7c673cae
FG
468 exit(0);
469 // the end!
470}
471
472void Elector::dispatch(MonOpRequestRef op)
473{
474 op->mark_event("elector:dispatch");
11fdf7f2 475 ceph_assert(op->is_type_election());
7c673cae
FG
476
477 switch (op->get_req()->get_type()) {
478
479 case MSG_MON_ELECTION:
480 {
481 if (!participating) {
482 return;
483 }
484 if (op->get_req()->get_source().num() >= mon->monmap->size()) {
485 dout(5) << " ignoring bogus election message with bad mon rank "
486 << op->get_req()->get_source() << dendl;
487 return;
488 }
489
490 MMonElection *em = static_cast<MMonElection*>(op->get_req());
491
492 // assume an old message encoding would have matched
493 if (em->fsid != mon->monmap->fsid) {
494 dout(0) << " ignoring election msg fsid "
495 << em->fsid << " != " << mon->monmap->fsid << dendl;
496 return;
497 }
498
499 if (!mon->monmap->contains(em->get_source_addr())) {
500 dout(1) << "discarding election message: " << em->get_source_addr()
501 << " not in my monmap " << *mon->monmap << dendl;
502 return;
503 }
504
505 MonMap peermap;
506 peermap.decode(em->monmap_bl);
507 if (peermap.epoch > mon->monmap->epoch) {
508 dout(0) << em->get_source_inst() << " has newer monmap epoch " << peermap.epoch
509 << " > my epoch " << mon->monmap->epoch
510 << ", taking it"
511 << dendl;
512 mon->monmap->decode(em->monmap_bl);
513 auto t(std::make_shared<MonitorDBStore::Transaction>());
514 t->put("monmap", mon->monmap->epoch, em->monmap_bl);
515 t->put("monmap", "last_committed", mon->monmap->epoch);
516 mon->store->apply_transaction(t);
517 //mon->monmon()->paxos->stash_latest(mon->monmap->epoch, em->monmap_bl);
518 cancel_timer();
519 mon->bootstrap();
520 return;
521 }
522 if (peermap.epoch < mon->monmap->epoch) {
523 dout(0) << em->get_source_inst() << " has older monmap epoch " << peermap.epoch
524 << " < my epoch " << mon->monmap->epoch
525 << dendl;
526 }
527
528 switch (em->op) {
529 case MMonElection::OP_PROPOSE:
530 handle_propose(op);
531 return;
532 }
533
534 if (em->epoch < epoch) {
535 dout(5) << "old epoch, dropping" << dendl;
536 break;
537 }
538
539 switch (em->op) {
540 case MMonElection::OP_ACK:
541 handle_ack(op);
542 return;
543 case MMonElection::OP_VICTORY:
544 handle_victory(op);
545 return;
546 case MMonElection::OP_NAK:
547 handle_nak(op);
548 return;
549 default:
550 ceph_abort();
551 }
552 }
553 break;
554
555 default:
556 ceph_abort();
557 }
558}
559
560void Elector::start_participating()
561{
562 if (!participating) {
563 participating = true;
564 }
565}