1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
18 #include "common/Timer.h"
19 #include "MonitorDBStore.h"
20 #include "messages/MMonElection.h"
22 #include "common/config.h"
23 #include "include/assert.h"
25 #define dout_subsys ceph_subsys_mon
27 #define dout_prefix _prefix(_dout, mon, epoch)
28 static ostream
& _prefix(std::ostream
*_dout
, Monitor
*mon
, epoch_t epoch
) {
29 return *_dout
<< "mon." << mon
->name
<< "@" << mon
->rank
30 << "(" << mon
->get_state_name()
31 << ").elector(" << epoch
<< ") ";
37 epoch
= mon
->store
->get(Monitor::MONITOR_NAME
, "election_epoch");
40 dout(1) << "init, last seen epoch " << epoch
<< dendl
;
43 void Elector::shutdown()
48 void Elector::bump_epoch(epoch_t e
)
50 dout(10) << "bump_epoch " << epoch
<< " to " << e
<< dendl
;
53 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
54 t
->put(Monitor::MONITOR_NAME
, "election_epoch", epoch
);
55 mon
->store
->apply_transaction(t
);
59 // clear up some state
68 dout(0) << "not starting new election -- not participating" << dendl
;
71 dout(5) << "start -- can i be leader?" << dendl
;
76 // start by trying to elect me
78 bump_epoch(epoch
+1); // odd == election cycle
80 // do a trivial db write just to ensure it is writeable.
81 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
82 t
->put(Monitor::MONITOR_NAME
, "election_writeable_test", rand());
83 int r
= mon
->store
->apply_transaction(t
);
86 start_stamp
= ceph_clock_now();
88 acked_me
[mon
->rank
].cluster_features
= CEPH_FEATURES_ALL
;
89 acked_me
[mon
->rank
].mon_features
= ceph::features::mon::get_supported();
90 mon
->collect_metadata(&acked_me
[mon
->rank
].metadata
);
93 // bcast to everyone else
94 for (unsigned i
=0; i
<mon
->monmap
->size(); ++i
) {
95 if ((int)i
== mon
->rank
) continue;
97 new MMonElection(MMonElection::OP_PROPOSE
, epoch
, mon
->monmap
);
98 m
->mon_features
= ceph::features::mon::get_supported();
99 mon
->messenger
->send_message(m
, mon
->monmap
->get_inst(i
));
105 void Elector::defer(int who
)
107 dout(5) << "defer to " << who
<< dendl
;
117 ack_stamp
= ceph_clock_now();
118 MMonElection
*m
= new MMonElection(MMonElection::OP_ACK
, epoch
, mon
->monmap
);
119 m
->mon_features
= ceph::features::mon::get_supported();
120 m
->sharing_bl
= mon
->get_supported_commands_bl();
121 mon
->collect_metadata(&m
->metadata
);
122 mon
->messenger
->send_message(m
, mon
->monmap
->get_inst(who
));
125 reset_timer(1.0); // give the leader some extra time to declare victory
129 void Elector::reset_timer(double plus
)
134 * This class is used as the callback when the expire_event timer fires up.
136 * If the expire_event is fired, then it means that we had an election going,
137 * either started by us or by some other participant, but it took too long,
140 * When the election expires, we will check if we were the ones who won, and
141 * if so we will declare victory. If that is not the case, then we assume
142 * that the one we defered to didn't declare victory quickly enough (in fact,
143 * as far as we know, we may even be dead); so, just propose ourselves as the
146 expire_event
= new C_MonContext(mon
, [this](int) {
149 mon
->timer
.add_event_after(g_conf
->mon_election_timeout
+ plus
,
154 void Elector::cancel_timer()
157 mon
->timer
.cancel_event(expire_event
);
162 void Elector::expire()
164 dout(5) << "election timer expired" << dendl
;
168 acked_me
.size() > (unsigned)(mon
->monmap
->size() / 2)) {
172 // whoever i deferred to didn't declare victory quickly enough.
173 if (mon
->has_ever_joined
)
181 void Elector::victory()
186 uint64_t cluster_features
= CEPH_FEATURES_ALL
;
187 mon_feature_t mon_features
= ceph::features::mon::get_supported();
189 map
<int,Metadata
> metadata
;
190 for (map
<int, elector_info_t
>::iterator p
= acked_me
.begin();
193 quorum
.insert(p
->first
);
194 cluster_features
&= p
->second
.cluster_features
;
195 mon_features
&= p
->second
.mon_features
;
196 metadata
[p
->first
] = p
->second
.metadata
;
201 assert(epoch
% 2 == 1); // election
202 bump_epoch(epoch
+1); // is over!
204 // decide my supported commands for peons to advertise
205 const bufferlist
*cmds_bl
= NULL
;
206 const MonCommand
*cmds
;
208 mon
->get_locally_supported_monitor_commands(&cmds
, &cmdsize
);
209 cmds_bl
= &mon
->get_supported_commands_bl();
212 for (set
<int>::iterator p
= quorum
.begin();
215 if (*p
== mon
->rank
) continue;
216 MMonElection
*m
= new MMonElection(MMonElection::OP_VICTORY
, epoch
, mon
->monmap
);
218 m
->quorum_features
= cluster_features
;
219 m
->mon_features
= mon_features
;
220 m
->sharing_bl
= *cmds_bl
;
221 mon
->messenger
->send_message(m
, mon
->monmap
->get_inst(*p
));
225 mon
->win_election(epoch
, quorum
,
226 cluster_features
, mon_features
, metadata
,
231 void Elector::handle_propose(MonOpRequestRef op
)
233 op
->mark_event("elector:handle_propose");
234 MMonElection
*m
= static_cast<MMonElection
*>(op
->get_req());
235 dout(5) << "handle_propose from " << m
->get_source() << dendl
;
236 int from
= m
->get_source().num();
238 assert(m
->epoch
% 2 == 1); // election
239 uint64_t required_features
= mon
->get_required_features();
240 mon_feature_t required_mon_features
= mon
->get_required_mon_features();
242 dout(10) << __func__
<< " required features " << required_features
243 << " " << required_mon_features
244 << ", peer features " << m
->get_connection()->get_features()
245 << " " << m
->mon_features
248 if ((required_features
^ m
->get_connection()->get_features()) &
250 dout(5) << " ignoring propose from mon" << from
251 << " without required features" << dendl
;
254 } else if (!m
->mon_features
.contains_all(required_mon_features
)) {
255 // all the features in 'required_mon_features' not in 'm->mon_features'
256 mon_feature_t missing
= required_mon_features
.diff(m
->mon_features
);
257 dout(5) << " ignoring propose from mon." << from
258 << " without required mon_features " << missing
261 } else if (m
->epoch
> epoch
) {
262 bump_epoch(m
->epoch
);
263 } else if (m
->epoch
< epoch
) {
264 // got an "old" propose,
265 if (epoch
% 2 == 0 && // in a non-election cycle
266 mon
->quorum
.count(from
) == 0) { // from someone outside the quorum
267 // a mon just started up, call a new election so they can rejoin!
268 dout(5) << " got propose from old epoch, quorum is " << mon
->quorum
269 << ", " << m
->get_source() << " must have just started" << dendl
;
270 // we may be active; make sure we reset things in the monitor appropriately.
271 mon
->start_election();
273 dout(5) << " ignoring old propose" << dendl
;
278 if (mon
->rank
< from
) {
279 // i would win over them.
280 if (leader_acked
>= 0) { // we already acked someone
281 assert(leader_acked
< from
); // and they still win, of course
282 dout(5) << "no, we already acked " << leader_acked
<< dendl
;
284 // wait, i should win!
286 mon
->start_election();
290 // they would win over me
291 if (leader_acked
< 0 || // haven't acked anyone yet, or
292 leader_acked
> from
|| // they would win over who you did ack, or
293 leader_acked
== from
) { // this is the guy we're already deferring to
297 dout(5) << "no, we already acked " << leader_acked
<< dendl
;
302 void Elector::handle_ack(MonOpRequestRef op
)
304 op
->mark_event("elector:handle_ack");
305 MMonElection
*m
= static_cast<MMonElection
*>(op
->get_req());
306 dout(5) << "handle_ack from " << m
->get_source() << dendl
;
307 int from
= m
->get_source().num();
309 assert(m
->epoch
% 2 == 1); // election
310 if (m
->epoch
> epoch
) {
311 dout(5) << "woah, that's a newer epoch, i must have rebooted. bumping and re-starting!" << dendl
;
312 bump_epoch(m
->epoch
);
316 assert(m
->epoch
== epoch
);
317 uint64_t required_features
= mon
->get_required_features();
318 if ((required_features
^ m
->get_connection()->get_features()) &
320 dout(5) << " ignoring ack from mon" << from
321 << " without required features" << dendl
;
325 mon_feature_t required_mon_features
= mon
->get_required_mon_features();
326 if (!m
->mon_features
.contains_all(required_mon_features
)) {
327 mon_feature_t missing
= required_mon_features
.diff(m
->mon_features
);
328 dout(5) << " ignoring ack from mon." << from
329 << " without required mon_features " << missing
336 acked_me
[from
].cluster_features
= m
->get_connection()->get_features();
337 acked_me
[from
].mon_features
= m
->mon_features
;
338 acked_me
[from
].metadata
= m
->metadata
;
339 dout(5) << " so far i have {";
340 for (map
<int, elector_info_t
>::const_iterator p
= acked_me
.begin();
343 if (p
!= acked_me
.begin())
345 *_dout
<< " mon." << p
->first
<< ":"
346 << " features " << p
->second
.cluster_features
347 << " " << p
->second
.mon_features
;
349 *_dout
<< " }" << dendl
;
351 // is that _everyone_?
352 if (acked_me
.size() == mon
->monmap
->size()) {
353 // if yes, shortcut to election finish
357 // ignore, i'm deferring already.
358 assert(leader_acked
>= 0);
363 void Elector::handle_victory(MonOpRequestRef op
)
365 op
->mark_event("elector:handle_victory");
366 MMonElection
*m
= static_cast<MMonElection
*>(op
->get_req());
367 dout(5) << "handle_victory from " << m
->get_source()
368 << " quorum_features " << m
->quorum_features
369 << " " << m
->mon_features
371 int from
= m
->get_source().num();
373 assert(from
< mon
->rank
);
374 assert(m
->epoch
% 2 == 0);
378 // i should have seen this election if i'm getting the victory.
379 if (m
->epoch
!= epoch
+ 1) {
380 dout(5) << "woah, that's a funny epoch, i must have rebooted. bumping and re-starting!" << dendl
;
381 bump_epoch(m
->epoch
);
386 bump_epoch(m
->epoch
);
389 mon
->lose_election(epoch
, m
->quorum
, from
,
390 m
->quorum_features
, m
->mon_features
);
395 // stash leader's commands
396 assert(m
->sharing_bl
.length());
397 MonCommand
*new_cmds
;
399 bufferlist::iterator bi
= m
->sharing_bl
.begin();
400 MonCommand::decode_array(&new_cmds
, &cmdsize
, bi
);
401 mon
->set_leader_supported_commands(new_cmds
, cmdsize
);
404 void Elector::nak_old_peer(MonOpRequestRef op
)
406 op
->mark_event("elector:nak_old_peer");
407 MMonElection
*m
= static_cast<MMonElection
*>(op
->get_req());
408 uint64_t supported_features
= m
->get_connection()->get_features();
409 uint64_t required_features
= mon
->get_required_features();
410 mon_feature_t required_mon_features
= mon
->get_required_mon_features();
411 dout(10) << "sending nak to peer " << m
->get_source()
412 << " that only supports " << supported_features
413 << " " << m
->mon_features
414 << " of the required " << required_features
415 << " " << required_mon_features
418 MMonElection
*reply
= new MMonElection(MMonElection::OP_NAK
, m
->epoch
,
420 reply
->quorum_features
= required_features
;
421 reply
->mon_features
= required_mon_features
;
422 mon
->features
.encode(reply
->sharing_bl
);
423 m
->get_connection()->send_message(reply
);
426 void Elector::handle_nak(MonOpRequestRef op
)
428 op
->mark_event("elector:handle_nak");
429 MMonElection
*m
= static_cast<MMonElection
*>(op
->get_req());
430 dout(1) << "handle_nak from " << m
->get_source()
431 << " quorum_features " << m
->quorum_features
432 << " " << m
->mon_features
436 bufferlist::iterator bi
= m
->sharing_bl
.begin();
438 CompatSet diff
= Monitor::get_supported_features().unsupported(other
);
440 mon_feature_t mon_supported
= ceph::features::mon::get_supported();
441 // all features in 'm->mon_features' not in 'mon_supported'
442 mon_feature_t mon_diff
= m
->mon_features
.diff(mon_supported
);
444 derr
<< "Shutting down because I do not support required monitor features: { "
445 << diff
<< " } " << mon_diff
<< dendl
;
451 void Elector::dispatch(MonOpRequestRef op
)
453 op
->mark_event("elector:dispatch");
454 assert(op
->is_type_election());
456 switch (op
->get_req()->get_type()) {
458 case MSG_MON_ELECTION
:
460 if (!participating
) {
463 if (op
->get_req()->get_source().num() >= mon
->monmap
->size()) {
464 dout(5) << " ignoring bogus election message with bad mon rank "
465 << op
->get_req()->get_source() << dendl
;
469 MMonElection
*em
= static_cast<MMonElection
*>(op
->get_req());
471 // assume an old message encoding would have matched
472 if (em
->fsid
!= mon
->monmap
->fsid
) {
473 dout(0) << " ignoring election msg fsid "
474 << em
->fsid
<< " != " << mon
->monmap
->fsid
<< dendl
;
478 if (!mon
->monmap
->contains(em
->get_source_addr())) {
479 dout(1) << "discarding election message: " << em
->get_source_addr()
480 << " not in my monmap " << *mon
->monmap
<< dendl
;
485 peermap
.decode(em
->monmap_bl
);
486 if (peermap
.epoch
> mon
->monmap
->epoch
) {
487 dout(0) << em
->get_source_inst() << " has newer monmap epoch " << peermap
.epoch
488 << " > my epoch " << mon
->monmap
->epoch
491 mon
->monmap
->decode(em
->monmap_bl
);
492 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
493 t
->put("monmap", mon
->monmap
->epoch
, em
->monmap_bl
);
494 t
->put("monmap", "last_committed", mon
->monmap
->epoch
);
495 mon
->store
->apply_transaction(t
);
496 //mon->monmon()->paxos->stash_latest(mon->monmap->epoch, em->monmap_bl);
501 if (peermap
.epoch
< mon
->monmap
->epoch
) {
502 dout(0) << em
->get_source_inst() << " has older monmap epoch " << peermap
.epoch
503 << " < my epoch " << mon
->monmap
->epoch
508 case MMonElection::OP_PROPOSE
:
513 if (em
->epoch
< epoch
) {
514 dout(5) << "old epoch, dropping" << dendl
;
519 case MMonElection::OP_ACK
:
522 case MMonElection::OP_VICTORY
:
525 case MMonElection::OP_NAK
:
539 void Elector::start_participating()
541 if (!participating
) {
542 participating
= true;