]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/Elector.cc
update sources to v12.1.1
[ceph.git] / ceph / src / mon / Elector.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "Elector.h"
16#include "Monitor.h"
17
18#include "common/Timer.h"
19#include "MonitorDBStore.h"
20#include "messages/MMonElection.h"
21
22#include "common/config.h"
23#include "include/assert.h"
24
25#define dout_subsys ceph_subsys_mon
26#undef dout_prefix
27#define dout_prefix _prefix(_dout, mon, epoch)
28static ostream& _prefix(std::ostream *_dout, Monitor *mon, epoch_t epoch) {
29 return *_dout << "mon." << mon->name << "@" << mon->rank
30 << "(" << mon->get_state_name()
31 << ").elector(" << epoch << ") ";
32}
33
34
35void Elector::init()
36{
37 epoch = mon->store->get(Monitor::MONITOR_NAME, "election_epoch");
38 if (!epoch)
39 epoch = 1;
40 dout(1) << "init, last seen epoch " << epoch << dendl;
41}
42
43void Elector::shutdown()
44{
45 cancel_timer();
46}
47
48void Elector::bump_epoch(epoch_t e)
49{
50 dout(10) << "bump_epoch " << epoch << " to " << e << dendl;
51 assert(epoch <= e);
52 epoch = e;
53 auto t(std::make_shared<MonitorDBStore::Transaction>());
54 t->put(Monitor::MONITOR_NAME, "election_epoch", epoch);
55 mon->store->apply_transaction(t);
56
57 mon->join_election();
58
59 // clear up some state
60 electing_me = false;
61 acked_me.clear();
62}
63
64
65void Elector::start()
66{
67 if (!participating) {
68 dout(0) << "not starting new election -- not participating" << dendl;
69 return;
70 }
71 dout(5) << "start -- can i be leader?" << dendl;
72
73 acked_me.clear();
74 init();
75
76 // start by trying to elect me
77 if (epoch % 2 == 0) {
78 bump_epoch(epoch+1); // odd == election cycle
79 } else {
80 // do a trivial db write just to ensure it is writeable.
81 auto t(std::make_shared<MonitorDBStore::Transaction>());
82 t->put(Monitor::MONITOR_NAME, "election_writeable_test", rand());
83 int r = mon->store->apply_transaction(t);
84 assert(r >= 0);
85 }
86 start_stamp = ceph_clock_now();
87 electing_me = true;
88 acked_me[mon->rank].cluster_features = CEPH_FEATURES_ALL;
89 acked_me[mon->rank].mon_features = ceph::features::mon::get_supported();
224ce89b 90 mon->collect_metadata(&acked_me[mon->rank].metadata);
7c673cae
FG
91 leader_acked = -1;
92
93 // bcast to everyone else
94 for (unsigned i=0; i<mon->monmap->size(); ++i) {
95 if ((int)i == mon->rank) continue;
96 MMonElection *m =
97 new MMonElection(MMonElection::OP_PROPOSE, epoch, mon->monmap);
98 m->mon_features = ceph::features::mon::get_supported();
99 mon->messenger->send_message(m, mon->monmap->get_inst(i));
100 }
101
102 reset_timer();
103}
104
105void Elector::defer(int who)
106{
107 dout(5) << "defer to " << who << dendl;
108
109 if (electing_me) {
110 // drop out
111 acked_me.clear();
112 electing_me = false;
113 }
114
115 // ack them
116 leader_acked = who;
117 ack_stamp = ceph_clock_now();
118 MMonElection *m = new MMonElection(MMonElection::OP_ACK, epoch, mon->monmap);
119 m->mon_features = ceph::features::mon::get_supported();
120 m->sharing_bl = mon->get_supported_commands_bl();
224ce89b 121 mon->collect_metadata(&m->metadata);
7c673cae
FG
122 mon->messenger->send_message(m, mon->monmap->get_inst(who));
123
124 // set a timer
125 reset_timer(1.0); // give the leader some extra time to declare victory
126}
127
128
129void Elector::reset_timer(double plus)
130{
131 // set the timer
132 cancel_timer();
133 /**
134 * This class is used as the callback when the expire_event timer fires up.
135 *
136 * If the expire_event is fired, then it means that we had an election going,
137 * either started by us or by some other participant, but it took too long,
138 * thus expiring.
139 *
140 * When the election expires, we will check if we were the ones who won, and
141 * if so we will declare victory. If that is not the case, then we assume
142 * that the one we defered to didn't declare victory quickly enough (in fact,
143 * as far as we know, we may even be dead); so, just propose ourselves as the
144 * Leader.
145 */
146 expire_event = new C_MonContext(mon, [this](int) {
147 expire();
148 });
149 mon->timer.add_event_after(g_conf->mon_election_timeout + plus,
150 expire_event);
151}
152
153
154void Elector::cancel_timer()
155{
156 if (expire_event) {
157 mon->timer.cancel_event(expire_event);
158 expire_event = 0;
159 }
160}
161
162void Elector::expire()
163{
164 dout(5) << "election timer expired" << dendl;
165
166 // did i win?
167 if (electing_me &&
168 acked_me.size() > (unsigned)(mon->monmap->size() / 2)) {
169 // i win
170 victory();
171 } else {
172 // whoever i deferred to didn't declare victory quickly enough.
173 if (mon->has_ever_joined)
174 start();
175 else
176 mon->bootstrap();
177 }
178}
179
180
181void Elector::victory()
182{
183 leader_acked = -1;
184 electing_me = false;
185
186 uint64_t cluster_features = CEPH_FEATURES_ALL;
187 mon_feature_t mon_features = ceph::features::mon::get_supported();
188 set<int> quorum;
224ce89b
WB
189 map<int,Metadata> metadata;
190 for (map<int, elector_info_t>::iterator p = acked_me.begin();
7c673cae
FG
191 p != acked_me.end();
192 ++p) {
193 quorum.insert(p->first);
194 cluster_features &= p->second.cluster_features;
195 mon_features &= p->second.mon_features;
224ce89b 196 metadata[p->first] = p->second.metadata;
7c673cae
FG
197 }
198
199 cancel_timer();
200
201 assert(epoch % 2 == 1); // election
202 bump_epoch(epoch+1); // is over!
203
204 // decide my supported commands for peons to advertise
205 const bufferlist *cmds_bl = NULL;
206 const MonCommand *cmds;
207 int cmdsize;
208 mon->get_locally_supported_monitor_commands(&cmds, &cmdsize);
209 cmds_bl = &mon->get_supported_commands_bl();
210
211 // tell everyone!
212 for (set<int>::iterator p = quorum.begin();
213 p != quorum.end();
214 ++p) {
215 if (*p == mon->rank) continue;
216 MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, epoch, mon->monmap);
217 m->quorum = quorum;
218 m->quorum_features = cluster_features;
219 m->mon_features = mon_features;
220 m->sharing_bl = *cmds_bl;
221 mon->messenger->send_message(m, mon->monmap->get_inst(*p));
222 }
224ce89b 223
7c673cae
FG
224 // tell monitor
225 mon->win_election(epoch, quorum,
224ce89b 226 cluster_features, mon_features, metadata,
7c673cae
FG
227 cmds, cmdsize);
228}
229
230
231void Elector::handle_propose(MonOpRequestRef op)
232{
233 op->mark_event("elector:handle_propose");
234 MMonElection *m = static_cast<MMonElection*>(op->get_req());
235 dout(5) << "handle_propose from " << m->get_source() << dendl;
236 int from = m->get_source().num();
237
238 assert(m->epoch % 2 == 1); // election
239 uint64_t required_features = mon->get_required_features();
240 mon_feature_t required_mon_features = mon->get_required_mon_features();
241
242 dout(10) << __func__ << " required features " << required_features
243 << " " << required_mon_features
244 << ", peer features " << m->get_connection()->get_features()
245 << " " << m->mon_features
246 << dendl;
247
248 if ((required_features ^ m->get_connection()->get_features()) &
249 required_features) {
250 dout(5) << " ignoring propose from mon" << from
251 << " without required features" << dendl;
252 nak_old_peer(op);
253 return;
254 } else if (!m->mon_features.contains_all(required_mon_features)) {
255 // all the features in 'required_mon_features' not in 'm->mon_features'
256 mon_feature_t missing = required_mon_features.diff(m->mon_features);
257 dout(5) << " ignoring propose from mon." << from
258 << " without required mon_features " << missing
259 << dendl;
260 nak_old_peer(op);
261 } else if (m->epoch > epoch) {
262 bump_epoch(m->epoch);
263 } else if (m->epoch < epoch) {
264 // got an "old" propose,
265 if (epoch % 2 == 0 && // in a non-election cycle
266 mon->quorum.count(from) == 0) { // from someone outside the quorum
267 // a mon just started up, call a new election so they can rejoin!
268 dout(5) << " got propose from old epoch, quorum is " << mon->quorum
269 << ", " << m->get_source() << " must have just started" << dendl;
270 // we may be active; make sure we reset things in the monitor appropriately.
271 mon->start_election();
272 } else {
273 dout(5) << " ignoring old propose" << dendl;
274 return;
275 }
276 }
277
278 if (mon->rank < from) {
279 // i would win over them.
280 if (leader_acked >= 0) { // we already acked someone
281 assert(leader_acked < from); // and they still win, of course
282 dout(5) << "no, we already acked " << leader_acked << dendl;
283 } else {
284 // wait, i should win!
285 if (!electing_me) {
286 mon->start_election();
287 }
288 }
289 } else {
290 // they would win over me
291 if (leader_acked < 0 || // haven't acked anyone yet, or
292 leader_acked > from || // they would win over who you did ack, or
293 leader_acked == from) { // this is the guy we're already deferring to
294 defer(from);
295 } else {
296 // ignore them!
297 dout(5) << "no, we already acked " << leader_acked << dendl;
298 }
299 }
300}
301
302void Elector::handle_ack(MonOpRequestRef op)
303{
304 op->mark_event("elector:handle_ack");
305 MMonElection *m = static_cast<MMonElection*>(op->get_req());
306 dout(5) << "handle_ack from " << m->get_source() << dendl;
307 int from = m->get_source().num();
308
309 assert(m->epoch % 2 == 1); // election
310 if (m->epoch > epoch) {
311 dout(5) << "woah, that's a newer epoch, i must have rebooted. bumping and re-starting!" << dendl;
312 bump_epoch(m->epoch);
313 start();
314 return;
315 }
316 assert(m->epoch == epoch);
317 uint64_t required_features = mon->get_required_features();
318 if ((required_features ^ m->get_connection()->get_features()) &
319 required_features) {
320 dout(5) << " ignoring ack from mon" << from
321 << " without required features" << dendl;
322 return;
323 }
324
325 mon_feature_t required_mon_features = mon->get_required_mon_features();
326 if (!m->mon_features.contains_all(required_mon_features)) {
327 mon_feature_t missing = required_mon_features.diff(m->mon_features);
328 dout(5) << " ignoring ack from mon." << from
329 << " without required mon_features " << missing
330 << dendl;
331 return;
332 }
333
334 if (electing_me) {
335 // thanks
336 acked_me[from].cluster_features = m->get_connection()->get_features();
337 acked_me[from].mon_features = m->mon_features;
224ce89b 338 acked_me[from].metadata = m->metadata;
7c673cae 339 dout(5) << " so far i have {";
224ce89b 340 for (map<int, elector_info_t>::const_iterator p = acked_me.begin();
7c673cae
FG
341 p != acked_me.end();
342 ++p) {
343 if (p != acked_me.begin())
344 *_dout << ",";
345 *_dout << " mon." << p->first << ":"
346 << " features " << p->second.cluster_features
347 << " " << p->second.mon_features;
348 }
349 *_dout << " }" << dendl;
350
351 // is that _everyone_?
352 if (acked_me.size() == mon->monmap->size()) {
353 // if yes, shortcut to election finish
354 victory();
355 }
356 } else {
357 // ignore, i'm deferring already.
358 assert(leader_acked >= 0);
359 }
360}
361
362
363void Elector::handle_victory(MonOpRequestRef op)
364{
365 op->mark_event("elector:handle_victory");
366 MMonElection *m = static_cast<MMonElection*>(op->get_req());
367 dout(5) << "handle_victory from " << m->get_source()
368 << " quorum_features " << m->quorum_features
369 << " " << m->mon_features
370 << dendl;
371 int from = m->get_source().num();
372
373 assert(from < mon->rank);
374 assert(m->epoch % 2 == 0);
375
376 leader_acked = -1;
377
378 // i should have seen this election if i'm getting the victory.
379 if (m->epoch != epoch + 1) {
380 dout(5) << "woah, that's a funny epoch, i must have rebooted. bumping and re-starting!" << dendl;
381 bump_epoch(m->epoch);
382 start();
383 return;
384 }
385
386 bump_epoch(m->epoch);
387
388 // they win
389 mon->lose_election(epoch, m->quorum, from,
390 m->quorum_features, m->mon_features);
391
392 // cancel my timer
393 cancel_timer();
394
395 // stash leader's commands
396 assert(m->sharing_bl.length());
397 MonCommand *new_cmds;
398 int cmdsize;
399 bufferlist::iterator bi = m->sharing_bl.begin();
400 MonCommand::decode_array(&new_cmds, &cmdsize, bi);
401 mon->set_leader_supported_commands(new_cmds, cmdsize);
402}
403
404void Elector::nak_old_peer(MonOpRequestRef op)
405{
406 op->mark_event("elector:nak_old_peer");
407 MMonElection *m = static_cast<MMonElection*>(op->get_req());
408 uint64_t supported_features = m->get_connection()->get_features();
409 uint64_t required_features = mon->get_required_features();
410 mon_feature_t required_mon_features = mon->get_required_mon_features();
411 dout(10) << "sending nak to peer " << m->get_source()
412 << " that only supports " << supported_features
413 << " " << m->mon_features
414 << " of the required " << required_features
415 << " " << required_mon_features
416 << dendl;
417
418 MMonElection *reply = new MMonElection(MMonElection::OP_NAK, m->epoch,
419 mon->monmap);
420 reply->quorum_features = required_features;
421 reply->mon_features = required_mon_features;
422 mon->features.encode(reply->sharing_bl);
423 m->get_connection()->send_message(reply);
424}
425
426void Elector::handle_nak(MonOpRequestRef op)
427{
428 op->mark_event("elector:handle_nak");
429 MMonElection *m = static_cast<MMonElection*>(op->get_req());
430 dout(1) << "handle_nak from " << m->get_source()
431 << " quorum_features " << m->quorum_features
432 << " " << m->mon_features
433 << dendl;
434
435 CompatSet other;
436 bufferlist::iterator bi = m->sharing_bl.begin();
437 other.decode(bi);
438 CompatSet diff = Monitor::get_supported_features().unsupported(other);
439
440 mon_feature_t mon_supported = ceph::features::mon::get_supported();
441 // all features in 'm->mon_features' not in 'mon_supported'
442 mon_feature_t mon_diff = m->mon_features.diff(mon_supported);
443
444 derr << "Shutting down because I do not support required monitor features: { "
445 << diff << " } " << mon_diff << dendl;
446
447 exit(0);
448 // the end!
449}
450
451void Elector::dispatch(MonOpRequestRef op)
452{
453 op->mark_event("elector:dispatch");
454 assert(op->is_type_election());
455
456 switch (op->get_req()->get_type()) {
457
458 case MSG_MON_ELECTION:
459 {
460 if (!participating) {
461 return;
462 }
463 if (op->get_req()->get_source().num() >= mon->monmap->size()) {
464 dout(5) << " ignoring bogus election message with bad mon rank "
465 << op->get_req()->get_source() << dendl;
466 return;
467 }
468
469 MMonElection *em = static_cast<MMonElection*>(op->get_req());
470
471 // assume an old message encoding would have matched
472 if (em->fsid != mon->monmap->fsid) {
473 dout(0) << " ignoring election msg fsid "
474 << em->fsid << " != " << mon->monmap->fsid << dendl;
475 return;
476 }
477
478 if (!mon->monmap->contains(em->get_source_addr())) {
479 dout(1) << "discarding election message: " << em->get_source_addr()
480 << " not in my monmap " << *mon->monmap << dendl;
481 return;
482 }
483
484 MonMap peermap;
485 peermap.decode(em->monmap_bl);
486 if (peermap.epoch > mon->monmap->epoch) {
487 dout(0) << em->get_source_inst() << " has newer monmap epoch " << peermap.epoch
488 << " > my epoch " << mon->monmap->epoch
489 << ", taking it"
490 << dendl;
491 mon->monmap->decode(em->monmap_bl);
492 auto t(std::make_shared<MonitorDBStore::Transaction>());
493 t->put("monmap", mon->monmap->epoch, em->monmap_bl);
494 t->put("monmap", "last_committed", mon->monmap->epoch);
495 mon->store->apply_transaction(t);
496 //mon->monmon()->paxos->stash_latest(mon->monmap->epoch, em->monmap_bl);
497 cancel_timer();
498 mon->bootstrap();
499 return;
500 }
501 if (peermap.epoch < mon->monmap->epoch) {
502 dout(0) << em->get_source_inst() << " has older monmap epoch " << peermap.epoch
503 << " < my epoch " << mon->monmap->epoch
504 << dendl;
505 }
506
507 switch (em->op) {
508 case MMonElection::OP_PROPOSE:
509 handle_propose(op);
510 return;
511 }
512
513 if (em->epoch < epoch) {
514 dout(5) << "old epoch, dropping" << dendl;
515 break;
516 }
517
518 switch (em->op) {
519 case MMonElection::OP_ACK:
520 handle_ack(op);
521 return;
522 case MMonElection::OP_VICTORY:
523 handle_victory(op);
524 return;
525 case MMonElection::OP_NAK:
526 handle_nak(op);
527 return;
528 default:
529 ceph_abort();
530 }
531 }
532 break;
533
534 default:
535 ceph_abort();
536 }
537}
538
539void Elector::start_participating()
540{
541 if (!participating) {
542 participating = true;
543 }
544}