]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/Elector.cc
303510530b5c223709a69decb79c1dcf9f462d7b
[ceph.git] / ceph / src / mon / Elector.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "Elector.h"
16 #include "Monitor.h"
17
18 #include "common/Timer.h"
19 #include "MonitorDBStore.h"
20 #include "messages/MMonElection.h"
21
22 #include "common/config.h"
23 #include "include/assert.h"
24
25 #define dout_subsys ceph_subsys_mon
26 #undef dout_prefix
27 #define dout_prefix _prefix(_dout, mon, epoch)
28 static ostream& _prefix(std::ostream *_dout, Monitor *mon, epoch_t epoch) {
29 return *_dout << "mon." << mon->name << "@" << mon->rank
30 << "(" << mon->get_state_name()
31 << ").elector(" << epoch << ") ";
32 }
33
34
35 void Elector::init()
36 {
37 epoch = mon->store->get(Monitor::MONITOR_NAME, "election_epoch");
38 if (!epoch)
39 epoch = 1;
40 dout(1) << "init, last seen epoch " << epoch << dendl;
41 }
42
43 void Elector::shutdown()
44 {
45 cancel_timer();
46 }
47
48 void Elector::bump_epoch(epoch_t e)
49 {
50 dout(10) << "bump_epoch " << epoch << " to " << e << dendl;
51 assert(epoch <= e);
52 epoch = e;
53 auto t(std::make_shared<MonitorDBStore::Transaction>());
54 t->put(Monitor::MONITOR_NAME, "election_epoch", epoch);
55 mon->store->apply_transaction(t);
56
57 mon->join_election();
58
59 // clear up some state
60 electing_me = false;
61 acked_me.clear();
62 }
63
64
65 void Elector::start()
66 {
67 if (!participating) {
68 dout(0) << "not starting new election -- not participating" << dendl;
69 return;
70 }
71 dout(5) << "start -- can i be leader?" << dendl;
72
73 acked_me.clear();
74 init();
75
76 // start by trying to elect me
77 if (epoch % 2 == 0) {
78 bump_epoch(epoch+1); // odd == election cycle
79 } else {
80 // do a trivial db write just to ensure it is writeable.
81 auto t(std::make_shared<MonitorDBStore::Transaction>());
82 t->put(Monitor::MONITOR_NAME, "election_writeable_test", rand());
83 int r = mon->store->apply_transaction(t);
84 assert(r >= 0);
85 }
86 start_stamp = ceph_clock_now();
87 electing_me = true;
88 acked_me[mon->rank].cluster_features = CEPH_FEATURES_ALL;
89 acked_me[mon->rank].mon_features = ceph::features::mon::get_supported();
90 leader_acked = -1;
91
92 // bcast to everyone else
93 for (unsigned i=0; i<mon->monmap->size(); ++i) {
94 if ((int)i == mon->rank) continue;
95 MMonElection *m =
96 new MMonElection(MMonElection::OP_PROPOSE, epoch, mon->monmap);
97 m->mon_features = ceph::features::mon::get_supported();
98 mon->messenger->send_message(m, mon->monmap->get_inst(i));
99 }
100
101 reset_timer();
102 }
103
104 void Elector::defer(int who)
105 {
106 dout(5) << "defer to " << who << dendl;
107
108 if (electing_me) {
109 // drop out
110 acked_me.clear();
111 electing_me = false;
112 }
113
114 // ack them
115 leader_acked = who;
116 ack_stamp = ceph_clock_now();
117 MMonElection *m = new MMonElection(MMonElection::OP_ACK, epoch, mon->monmap);
118 m->mon_features = ceph::features::mon::get_supported();
119 m->sharing_bl = mon->get_supported_commands_bl();
120 mon->messenger->send_message(m, mon->monmap->get_inst(who));
121
122 // set a timer
123 reset_timer(1.0); // give the leader some extra time to declare victory
124 }
125
126
127 void Elector::reset_timer(double plus)
128 {
129 // set the timer
130 cancel_timer();
131 /**
132 * This class is used as the callback when the expire_event timer fires up.
133 *
134 * If the expire_event is fired, then it means that we had an election going,
135 * either started by us or by some other participant, but it took too long,
136 * thus expiring.
137 *
138 * When the election expires, we will check if we were the ones who won, and
139 * if so we will declare victory. If that is not the case, then we assume
140 * that the one we defered to didn't declare victory quickly enough (in fact,
141 * as far as we know, we may even be dead); so, just propose ourselves as the
142 * Leader.
143 */
144 expire_event = new C_MonContext(mon, [this](int) {
145 expire();
146 });
147 mon->timer.add_event_after(g_conf->mon_election_timeout + plus,
148 expire_event);
149 }
150
151
152 void Elector::cancel_timer()
153 {
154 if (expire_event) {
155 mon->timer.cancel_event(expire_event);
156 expire_event = 0;
157 }
158 }
159
160 void Elector::expire()
161 {
162 dout(5) << "election timer expired" << dendl;
163
164 // did i win?
165 if (electing_me &&
166 acked_me.size() > (unsigned)(mon->monmap->size() / 2)) {
167 // i win
168 victory();
169 } else {
170 // whoever i deferred to didn't declare victory quickly enough.
171 if (mon->has_ever_joined)
172 start();
173 else
174 mon->bootstrap();
175 }
176 }
177
178
179 void Elector::victory()
180 {
181 leader_acked = -1;
182 electing_me = false;
183
184 uint64_t cluster_features = CEPH_FEATURES_ALL;
185 mon_feature_t mon_features = ceph::features::mon::get_supported();
186 set<int> quorum;
187 for (map<int, elector_features_t>::iterator p = acked_me.begin();
188 p != acked_me.end();
189 ++p) {
190 quorum.insert(p->first);
191 cluster_features &= p->second.cluster_features;
192 mon_features &= p->second.mon_features;
193 }
194
195 cancel_timer();
196
197 assert(epoch % 2 == 1); // election
198 bump_epoch(epoch+1); // is over!
199
200 // decide my supported commands for peons to advertise
201 const bufferlist *cmds_bl = NULL;
202 const MonCommand *cmds;
203 int cmdsize;
204 mon->get_locally_supported_monitor_commands(&cmds, &cmdsize);
205 cmds_bl = &mon->get_supported_commands_bl();
206
207 // tell everyone!
208 for (set<int>::iterator p = quorum.begin();
209 p != quorum.end();
210 ++p) {
211 if (*p == mon->rank) continue;
212 MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, epoch, mon->monmap);
213 m->quorum = quorum;
214 m->quorum_features = cluster_features;
215 m->mon_features = mon_features;
216 m->sharing_bl = *cmds_bl;
217 mon->messenger->send_message(m, mon->monmap->get_inst(*p));
218 }
219
220 // tell monitor
221 mon->win_election(epoch, quorum,
222 cluster_features, mon_features,
223 cmds, cmdsize);
224 }
225
226
227 void Elector::handle_propose(MonOpRequestRef op)
228 {
229 op->mark_event("elector:handle_propose");
230 MMonElection *m = static_cast<MMonElection*>(op->get_req());
231 dout(5) << "handle_propose from " << m->get_source() << dendl;
232 int from = m->get_source().num();
233
234 assert(m->epoch % 2 == 1); // election
235 uint64_t required_features = mon->get_required_features();
236 mon_feature_t required_mon_features = mon->get_required_mon_features();
237
238 dout(10) << __func__ << " required features " << required_features
239 << " " << required_mon_features
240 << ", peer features " << m->get_connection()->get_features()
241 << " " << m->mon_features
242 << dendl;
243
244 if ((required_features ^ m->get_connection()->get_features()) &
245 required_features) {
246 dout(5) << " ignoring propose from mon" << from
247 << " without required features" << dendl;
248 nak_old_peer(op);
249 return;
250 } else if (!m->mon_features.contains_all(required_mon_features)) {
251 // all the features in 'required_mon_features' not in 'm->mon_features'
252 mon_feature_t missing = required_mon_features.diff(m->mon_features);
253 dout(5) << " ignoring propose from mon." << from
254 << " without required mon_features " << missing
255 << dendl;
256 nak_old_peer(op);
257 } else if (m->epoch > epoch) {
258 bump_epoch(m->epoch);
259 } else if (m->epoch < epoch) {
260 // got an "old" propose,
261 if (epoch % 2 == 0 && // in a non-election cycle
262 mon->quorum.count(from) == 0) { // from someone outside the quorum
263 // a mon just started up, call a new election so they can rejoin!
264 dout(5) << " got propose from old epoch, quorum is " << mon->quorum
265 << ", " << m->get_source() << " must have just started" << dendl;
266 // we may be active; make sure we reset things in the monitor appropriately.
267 mon->start_election();
268 } else {
269 dout(5) << " ignoring old propose" << dendl;
270 return;
271 }
272 }
273
274 if (mon->rank < from) {
275 // i would win over them.
276 if (leader_acked >= 0) { // we already acked someone
277 assert(leader_acked < from); // and they still win, of course
278 dout(5) << "no, we already acked " << leader_acked << dendl;
279 } else {
280 // wait, i should win!
281 if (!electing_me) {
282 mon->start_election();
283 }
284 }
285 } else {
286 // they would win over me
287 if (leader_acked < 0 || // haven't acked anyone yet, or
288 leader_acked > from || // they would win over who you did ack, or
289 leader_acked == from) { // this is the guy we're already deferring to
290 defer(from);
291 } else {
292 // ignore them!
293 dout(5) << "no, we already acked " << leader_acked << dendl;
294 }
295 }
296 }
297
298 void Elector::handle_ack(MonOpRequestRef op)
299 {
300 op->mark_event("elector:handle_ack");
301 MMonElection *m = static_cast<MMonElection*>(op->get_req());
302 dout(5) << "handle_ack from " << m->get_source() << dendl;
303 int from = m->get_source().num();
304
305 assert(m->epoch % 2 == 1); // election
306 if (m->epoch > epoch) {
307 dout(5) << "woah, that's a newer epoch, i must have rebooted. bumping and re-starting!" << dendl;
308 bump_epoch(m->epoch);
309 start();
310 return;
311 }
312 assert(m->epoch == epoch);
313 uint64_t required_features = mon->get_required_features();
314 if ((required_features ^ m->get_connection()->get_features()) &
315 required_features) {
316 dout(5) << " ignoring ack from mon" << from
317 << " without required features" << dendl;
318 return;
319 }
320
321 mon_feature_t required_mon_features = mon->get_required_mon_features();
322 if (!m->mon_features.contains_all(required_mon_features)) {
323 mon_feature_t missing = required_mon_features.diff(m->mon_features);
324 dout(5) << " ignoring ack from mon." << from
325 << " without required mon_features " << missing
326 << dendl;
327 return;
328 }
329
330 if (electing_me) {
331 // thanks
332 acked_me[from].cluster_features = m->get_connection()->get_features();
333 acked_me[from].mon_features = m->mon_features;
334 dout(5) << " so far i have {";
335 for (map<int, elector_features_t>::const_iterator p = acked_me.begin();
336 p != acked_me.end();
337 ++p) {
338 if (p != acked_me.begin())
339 *_dout << ",";
340 *_dout << " mon." << p->first << ":"
341 << " features " << p->second.cluster_features
342 << " " << p->second.mon_features;
343 }
344 *_dout << " }" << dendl;
345
346 // is that _everyone_?
347 if (acked_me.size() == mon->monmap->size()) {
348 // if yes, shortcut to election finish
349 victory();
350 }
351 } else {
352 // ignore, i'm deferring already.
353 assert(leader_acked >= 0);
354 }
355 }
356
357
358 void Elector::handle_victory(MonOpRequestRef op)
359 {
360 op->mark_event("elector:handle_victory");
361 MMonElection *m = static_cast<MMonElection*>(op->get_req());
362 dout(5) << "handle_victory from " << m->get_source()
363 << " quorum_features " << m->quorum_features
364 << " " << m->mon_features
365 << dendl;
366 int from = m->get_source().num();
367
368 assert(from < mon->rank);
369 assert(m->epoch % 2 == 0);
370
371 leader_acked = -1;
372
373 // i should have seen this election if i'm getting the victory.
374 if (m->epoch != epoch + 1) {
375 dout(5) << "woah, that's a funny epoch, i must have rebooted. bumping and re-starting!" << dendl;
376 bump_epoch(m->epoch);
377 start();
378 return;
379 }
380
381 bump_epoch(m->epoch);
382
383 // they win
384 mon->lose_election(epoch, m->quorum, from,
385 m->quorum_features, m->mon_features);
386
387 // cancel my timer
388 cancel_timer();
389
390 // stash leader's commands
391 assert(m->sharing_bl.length());
392 MonCommand *new_cmds;
393 int cmdsize;
394 bufferlist::iterator bi = m->sharing_bl.begin();
395 MonCommand::decode_array(&new_cmds, &cmdsize, bi);
396 mon->set_leader_supported_commands(new_cmds, cmdsize);
397 }
398
399 void Elector::nak_old_peer(MonOpRequestRef op)
400 {
401 op->mark_event("elector:nak_old_peer");
402 MMonElection *m = static_cast<MMonElection*>(op->get_req());
403 uint64_t supported_features = m->get_connection()->get_features();
404 uint64_t required_features = mon->get_required_features();
405 mon_feature_t required_mon_features = mon->get_required_mon_features();
406 dout(10) << "sending nak to peer " << m->get_source()
407 << " that only supports " << supported_features
408 << " " << m->mon_features
409 << " of the required " << required_features
410 << " " << required_mon_features
411 << dendl;
412
413 MMonElection *reply = new MMonElection(MMonElection::OP_NAK, m->epoch,
414 mon->monmap);
415 reply->quorum_features = required_features;
416 reply->mon_features = required_mon_features;
417 mon->features.encode(reply->sharing_bl);
418 m->get_connection()->send_message(reply);
419 }
420
421 void Elector::handle_nak(MonOpRequestRef op)
422 {
423 op->mark_event("elector:handle_nak");
424 MMonElection *m = static_cast<MMonElection*>(op->get_req());
425 dout(1) << "handle_nak from " << m->get_source()
426 << " quorum_features " << m->quorum_features
427 << " " << m->mon_features
428 << dendl;
429
430 CompatSet other;
431 bufferlist::iterator bi = m->sharing_bl.begin();
432 other.decode(bi);
433 CompatSet diff = Monitor::get_supported_features().unsupported(other);
434
435 mon_feature_t mon_supported = ceph::features::mon::get_supported();
436 // all features in 'm->mon_features' not in 'mon_supported'
437 mon_feature_t mon_diff = m->mon_features.diff(mon_supported);
438
439 derr << "Shutting down because I do not support required monitor features: { "
440 << diff << " } " << mon_diff << dendl;
441
442 exit(0);
443 // the end!
444 }
445
446 void Elector::dispatch(MonOpRequestRef op)
447 {
448 op->mark_event("elector:dispatch");
449 assert(op->is_type_election());
450
451 switch (op->get_req()->get_type()) {
452
453 case MSG_MON_ELECTION:
454 {
455 if (!participating) {
456 return;
457 }
458 if (op->get_req()->get_source().num() >= mon->monmap->size()) {
459 dout(5) << " ignoring bogus election message with bad mon rank "
460 << op->get_req()->get_source() << dendl;
461 return;
462 }
463
464 MMonElection *em = static_cast<MMonElection*>(op->get_req());
465
466 // assume an old message encoding would have matched
467 if (em->fsid != mon->monmap->fsid) {
468 dout(0) << " ignoring election msg fsid "
469 << em->fsid << " != " << mon->monmap->fsid << dendl;
470 return;
471 }
472
473 if (!mon->monmap->contains(em->get_source_addr())) {
474 dout(1) << "discarding election message: " << em->get_source_addr()
475 << " not in my monmap " << *mon->monmap << dendl;
476 return;
477 }
478
479 MonMap peermap;
480 peermap.decode(em->monmap_bl);
481 if (peermap.epoch > mon->monmap->epoch) {
482 dout(0) << em->get_source_inst() << " has newer monmap epoch " << peermap.epoch
483 << " > my epoch " << mon->monmap->epoch
484 << ", taking it"
485 << dendl;
486 mon->monmap->decode(em->monmap_bl);
487 auto t(std::make_shared<MonitorDBStore::Transaction>());
488 t->put("monmap", mon->monmap->epoch, em->monmap_bl);
489 t->put("monmap", "last_committed", mon->monmap->epoch);
490 mon->store->apply_transaction(t);
491 //mon->monmon()->paxos->stash_latest(mon->monmap->epoch, em->monmap_bl);
492 cancel_timer();
493 mon->bootstrap();
494 return;
495 }
496 if (peermap.epoch < mon->monmap->epoch) {
497 dout(0) << em->get_source_inst() << " has older monmap epoch " << peermap.epoch
498 << " < my epoch " << mon->monmap->epoch
499 << dendl;
500 }
501
502 switch (em->op) {
503 case MMonElection::OP_PROPOSE:
504 handle_propose(op);
505 return;
506 }
507
508 if (em->epoch < epoch) {
509 dout(5) << "old epoch, dropping" << dendl;
510 break;
511 }
512
513 switch (em->op) {
514 case MMonElection::OP_ACK:
515 handle_ack(op);
516 return;
517 case MMonElection::OP_VICTORY:
518 handle_victory(op);
519 return;
520 case MMonElection::OP_NAK:
521 handle_nak(op);
522 return;
523 default:
524 ceph_abort();
525 }
526 }
527 break;
528
529 default:
530 ceph_abort();
531 }
532 }
533
534 void Elector::start_participating()
535 {
536 if (!participating) {
537 participating = true;
538 }
539 }