1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
18 #include "common/Timer.h"
19 #include "MonitorDBStore.h"
20 #include "messages/MMonElection.h"
21 #include "messages/MMonPing.h"
23 #include "common/config.h"
24 #include "include/ceph_assert.h"
26 #define dout_subsys ceph_subsys_mon
28 #define dout_prefix _prefix(_dout, mon, get_epoch())
37 using std::ostringstream
;
42 using std::stringstream
;
45 using std::unique_ptr
;
47 using ceph::bufferlist
;
50 using ceph::Formatter
;
51 using ceph::JSONFormatter
;
52 using ceph::mono_clock
;
53 using ceph::mono_time
;
54 using ceph::timespan_str
;
55 static ostream
& _prefix(std::ostream
*_dout
, Monitor
*mon
, epoch_t epoch
) {
56 return *_dout
<< "mon." << mon
->name
<< "@" << mon
->rank
57 << "(" << mon
->get_state_name()
58 << ").elector(" << epoch
<< ") ";
// Construct the Elector for monitor `m` with the given election `strategy`.
// The initializer list reads these options from m->cct->_conf:
//   mon_elector_ignore_propose_margin (double)  -> passed to `logic`
//   mon_con_tracker_score_halflife    (uint64_t) -> passed to `peer_tracker`
//   mon_con_tracker_persist_interval  (uint64_t) -> passed to `peer_tracker`
//   mon_elector_ping_timeout          (double)   -> ping_timeout
//   mon_elector_ping_divisor          (uint64_t) -> PING_DIVISOR
// The body then reads the "connectivity_scores" key from the monitor store
// into `bl` and decodes it into peer_tracker.
// NOTE(review): some lines of this definition are absent from this listing
// (further `logic` arguments, the declaration of `bl`, and whatever guards
// the decode) -- confirm against the full file.
61 Elector::Elector(Monitor
*m
, int strategy
) : logic(this, static_cast<ElectionLogic::election_strategy
>(strategy
),
63 m
->cct
->_conf
.get_val
<double>("mon_elector_ignore_propose_margin"),
65 peer_tracker(this, m
->rank
,
66 m
->cct
->_conf
.get_val
<uint64_t>("mon_con_tracker_score_halflife"),
67 m
->cct
->_conf
.get_val
<uint64_t>("mon_con_tracker_persist_interval")),
68 ping_timeout(m
->cct
->_conf
.get_val
<double>("mon_elector_ping_timeout")),
69 PING_DIVISOR(m
->cct
->_conf
.get_val
<uint64_t>("mon_elector_ping_divisor")),
70 mon(m
), elector(this) {
// Load the stored connectivity scores blob from the monitor store.
72 mon
->store
->get(Monitor::MONITOR_NAME
, "connectivity_scores", bl
);
74 bufferlist::const_iterator bi
= bl
.begin();
75 peer_tracker
.decode(bi
);
80 void Elector::persist_epoch(epoch_t e
)
82 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
83 t
->put(Monitor::MONITOR_NAME
, "election_epoch", e
);
84 t
->put(Monitor::MONITOR_NAME
, "connectivity_scores", peer_tracker
.get_encoded_bl());
85 mon
->store
->apply_transaction(t
);
88 void Elector::persist_connectivity_scores()
90 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
91 t
->put(Monitor::MONITOR_NAME
, "connectivity_scores", peer_tracker
.get_encoded_bl());
92 mon
->store
->apply_transaction(t
);
95 epoch_t
Elector::read_persisted_epoch() const
97 return mon
->store
->get(Monitor::MONITOR_NAME
, "election_epoch");
// Write-test the monitor store: put a rand() value under the
// "election_writeable_test" key and apply the transaction, capturing the
// result of apply_transaction() in `r`.
// NOTE(review): the statement that consumes `r` is not visible in this
// listing -- presumably the result is checked; confirm in the full file.
100 void Elector::validate_store()
102 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
103 t
->put(Monitor::MONITOR_NAME
, "election_writeable_test", rand());
104 int r
= mon
->store
->apply_transaction(t
);
108 bool Elector::is_current_member(int rank
) const
110 return mon
->quorum
.count(rank
);
113 void Elector::trigger_new_election()
115 mon
->start_election();
// NOTE(review): the body of this accessor is not visible in this listing;
// presumably it returns this monitor's rank (mon->rank) -- confirm.
118 int Elector::get_my_rank() const
// NOTE(review): the body is not visible in this listing; presumably it
// resets the local election state -- confirm against the full file.
123 void Elector::reset_election()
128 bool Elector::ever_participated() const
130 return mon
->has_ever_joined
;
133 unsigned Elector::paxos_size() const
135 return (unsigned)mon
->monmap
->size();
// NOTE(review): the body is not visible in this listing; presumably it
// tears down the pending election timer -- confirm against the full file.
138 void Elector::shutdown()
143 void Elector::notify_bump_epoch()
145 mon
->join_election();
// Send an OP_PROPOSE election message for epoch `e` to every rank in the
// monmap except our own. Each message is built with our encoded connectivity
// scores, the current election strategy and the monmap; it then gets
// `logic_bl` as sharing_bl, our supported mon features and our release
// before being sent to rank `i`.
// NOTE(review): the line declaring `m` is missing from this listing;
// presumably it receives the MMonElection allocated below -- confirm.
148 void Elector::propose_to_peers(epoch_t e
, bufferlist
& logic_bl
)
150 // bcast to everyone else
151 for (unsigned i
=0; i
<mon
->monmap
->size(); ++i
) {
// Skip ourselves.
152 if ((int)i
== mon
->rank
) continue;
154 new MMonElection(MMonElection::OP_PROPOSE
, e
,
155 peer_tracker
.get_encoded_bl(),
156 logic
.strategy
, mon
->monmap
);
157 m
->sharing_bl
= logic_bl
;
158 m
->mon_features
= ceph::features::mon::get_supported();
159 m
->mon_release
= ceph_release();
160 mon
->send_mon_message(m
, i
);
// Fill in peer_info for our own rank: CEPH_FEATURES_ALL as cluster features,
// our release, our supported mon features, and the metadata gathered by
// mon->collect_metadata().
// NOTE(review): at least one body line between the signature and the first
// visible statement is absent from this listing -- confirm in the full file.
164 void Elector::_start()
167 peer_info
[mon
->rank
].cluster_features
= CEPH_FEATURES_ALL
;
168 peer_info
[mon
->rank
].mon_release
= ceph_release();
169 peer_info
[mon
->rank
].mon_features
= ceph::features::mon::get_supported();
170 mon
->collect_metadata(&peer_info
[mon
->rank
].metadata
);
// Defer to rank `who`: build an OP_ACK for the current epoch carrying our
// encoded connectivity scores, election strategy and monmap, attach our
// supported mon features, release and metadata, and send it to `who`.
// Afterwards the election timer is re-armed with 1.0 extra on top of the
// normal timeout.
// NOTE(review): a few lines of this body are absent from this listing.
174 void Elector::_defer_to(int who
)
176 MMonElection
*m
= new MMonElection(MMonElection::OP_ACK
, get_epoch(),
177 peer_tracker
.get_encoded_bl(),
178 logic
.strategy
, mon
->monmap
);
179 m
->mon_features
= ceph::features::mon::get_supported();
180 m
->mon_release
= ceph_release();
181 mon
->collect_metadata(&m
->metadata
);
183 mon
->send_mon_message(m
, who
);
186 reset_timer(1.0); // give the leader some extra time to declare victory
// Arm expire_event on the monitor's timer to fire after
// g_conf()->mon_election_timeout + `plus`. The scheduled callback calls
// logic.end_election_period().
// NOTE(review): the lines between the signature and the explanatory text
// below, the comment delimiters around that text, and the tail of the
// callback are absent from this listing -- confirm against the full file.
190 void Elector::reset_timer(double plus
)
195 * This class is used as the callback when the expire_event timer fires up.
197 * If the expire_event is fired, then it means that we had an election going,
198 * either started by us or by some other participant, but it took too long,
201 * When the election expires, we will check if we were the ones who won, and
202 * if so we will declare victory. If that is not the case, then we assume
203 * that the one we defered to didn't declare victory quickly enough (in fact,
204 * as far as we know, we may even be dead); so, just propose ourselves as the
207 expire_event
= mon
->timer
.add_event_after(
208 g_conf()->mon_election_timeout
+ plus
,
209 new C_MonContext
{mon
, [this](int) {
210 logic
.end_election_period();
// Cancel expire_event on the monitor's timer.
// NOTE(review): the lines around the cancel call (any guard on expire_event
// and any reset of it afterwards) are absent from this listing -- confirm.
215 void Elector::cancel_timer()
218 mon
->timer
.cancel_event(expire_event
);
223 void Elector::assimilate_connection_reports(const bufferlist
& tbl
)
225 ConnectionTracker
pct(tbl
);
226 peer_tracker
.receive_peer_report(pct
);
// Announce an election win for the given `quorum`.
// First pass over the quorum: every member must already be in peer_info
// (asserted); its cluster_features and mon_features are AND-ed into the
// running values, its metadata is copied into `metadata`, and the smallest
// mon_release seen is tracked (starting from ceph_release_t::unknown).
// Second pass: every member other than our own rank is sent an OP_VICTORY
// carrying the combined features, our local commands blob for those mon
// features, and the minimum release.
// Finally mon->win_election() is called with the epoch, quorum and the
// combined values.
// NOTE(review): several lines (loop headers/closers and the trailing
// win_election arguments) are absent from this listing.
229 void Elector::message_victory(const std::set
<int>& quorum
)
231 uint64_t cluster_features
= CEPH_FEATURES_ALL
;
232 mon_feature_t mon_features
= ceph::features::mon::get_supported();
233 map
<int,Metadata
> metadata
;
234 ceph_release_t min_mon_release
{ceph_release_t::unknown
};
235 for (auto id
: quorum
) {
236 auto i
= peer_info
.find(id
);
237 ceph_assert(i
!= peer_info
.end());
238 auto& info
= i
->second
;
239 cluster_features
&= info
.cluster_features
;
240 mon_features
&= info
.mon_features
;
241 metadata
[id
] = info
.metadata
;
242 if (min_mon_release
== ceph_release_t::unknown
||
243 info
.mon_release
< min_mon_release
) {
244 min_mon_release
= info
.mon_release
;
252 for (set
<int>::iterator p
= quorum
.begin();
// Do not message ourselves.
255 if (*p
== mon
->rank
) continue;
256 MMonElection
*m
= new MMonElection(MMonElection::OP_VICTORY
, get_epoch(),
257 peer_tracker
.get_encoded_bl(),
258 logic
.strategy
, mon
->monmap
);
260 m
->quorum_features
= cluster_features
;
261 m
->mon_features
= mon_features
;
262 m
->sharing_bl
= mon
->get_local_commands_bl(mon_features
);
263 m
->mon_release
= min_mon_release
;
264 mon
->send_mon_message(m
, *p
);
268 mon
->win_election(get_epoch(), quorum
,
269 cluster_features
, mon_features
, min_mon_release
,
// Handle an election proposal (MMonElection) from a peer.
// Asserts the message epoch is odd, then screens the sender on three
// criteria, logging an "ignoring propose" message for each failure:
//   * connection features vs. mon->get_required_features()
//   * the sender's mon_release vs. the monmap's min_mon_release
//   * the sender's mon_features vs. mon->get_required_mon_features()
// In the visible lines that follow, a ConnectionTracker is built from
// m->sharing_bl when it is non-empty and handed, with the sender rank and
// epoch, to logic.receive_propose().
// NOTE(review): lines are absent from this listing, including what each
// rejecting branch does after logging and who releases `oct` -- confirm.
274 void Elector::handle_propose(MonOpRequestRef op
)
276 op
->mark_event("elector:handle_propose");
277 auto m
= op
->get_req
<MMonElection
>();
278 dout(5) << "handle_propose from " << m
->get_source() << dendl
;
279 int from
= m
->get_source().num();
281 ceph_assert(m
->epoch
% 2 == 1); // election
282 uint64_t required_features
= mon
->get_required_features();
283 mon_feature_t required_mon_features
= mon
->get_required_mon_features();
285 dout(10) << __func__
<< " required features " << required_features
286 << " " << required_mon_features
287 << ", peer features " << m
->get_connection()->get_features()
288 << " " << m
->mon_features
291 if ((required_features
^ m
->get_connection()->get_features()) &
293 dout(5) << " ignoring propose from mon" << from
294 << " without required features" << dendl
;
297 } else if (mon
->monmap
->min_mon_release
> m
->mon_release
) {
298 dout(5) << " ignoring propose from mon" << from
299 << " release " << (int)m
->mon_release
300 << " < min_mon_release " << (int)mon
->monmap
->min_mon_release
304 } else if (!m
->mon_features
.contains_all(required_mon_features
)) {
305 // all the features in 'required_mon_features' not in 'm->mon_features'
306 mon_feature_t missing
= required_mon_features
.diff(m
->mon_features
);
307 dout(5) << " ignoring propose from mon." << from
308 << " without required mon_features " << missing
312 ConnectionTracker
*oct
= NULL
;
313 if (m
->sharing_bl
.length()) {
314 oct
= new ConnectionTracker(m
->sharing_bl
);
316 logic
.receive_propose(from
, m
->epoch
, oct
);
// Handle an ack (MMonElection) from a peer.
// Asserts the message epoch equals our current epoch. The sender is screened
// against mon->get_required_features() (via its connection features) and
// mon->get_required_mon_features(); each failure logs an "ignoring ack"
// message.
// When logic.electing_me is set, the sender's connection features, mon
// features, release and metadata are recorded in peer_info[from], and the
// features of every rank in logic.acked_me are logged (each must be present
// in peer_info, asserted).
// The last visible statement passes the ack to logic.receive_ack().
// NOTE(review): lines are absent from this listing (what the rejecting
// branches do after logging, loop/branch closers) -- confirm nesting.
320 void Elector::handle_ack(MonOpRequestRef op
)
322 op
->mark_event("elector:handle_ack");
323 auto m
= op
->get_req
<MMonElection
>();
324 dout(5) << "handle_ack from " << m
->get_source() << dendl
;
325 int from
= m
->get_source().num();
327 ceph_assert(m
->epoch
== get_epoch());
328 uint64_t required_features
= mon
->get_required_features();
329 if ((required_features
^ m
->get_connection()->get_features()) &
331 dout(5) << " ignoring ack from mon" << from
332 << " without required features" << dendl
;
336 mon_feature_t required_mon_features
= mon
->get_required_mon_features();
337 if (!m
->mon_features
.contains_all(required_mon_features
)) {
338 mon_feature_t missing
= required_mon_features
.diff(m
->mon_features
);
339 dout(5) << " ignoring ack from mon." << from
340 << " without required mon_features " << missing
345 if (logic
.electing_me
) {
// Remember what this peer told us about itself.
347 peer_info
[from
].cluster_features
= m
->get_connection()->get_features();
348 peer_info
[from
].mon_features
= m
->mon_features
;
349 peer_info
[from
].mon_release
= m
->mon_release
;
350 peer_info
[from
].metadata
= m
->metadata
;
351 dout(5) << " so far i have {";
352 for (auto q
= logic
.acked_me
.begin();
353 q
!= logic
.acked_me
.end();
355 auto p
= peer_info
.find(*q
);
356 ceph_assert(p
!= peer_info
.end());
357 if (q
!= logic
.acked_me
.begin())
359 *_dout
<< " mon." << p
->first
<< ":"
360 << " features " << p
->second
.cluster_features
361 << " " << p
->second
.mon_features
;
363 *_dout
<< " }" << dendl
;
366 logic
.receive_ack(from
, m
->epoch
);
// Handle a victory claim (MMonElection) from a peer.
// The claim is offered to logic.receive_victory_claim(); its boolean result
// is tested in the `if (!accept_victory)` branch. The visible lines after it
// call mon->lose_election() with our epoch, the sender's quorum, the sender
// rank and the announced features/release, then assert that sharing_bl is
// non-empty, decode it as a vector<MonCommand> and install it via
// mon->set_leader_commands().
// NOTE(review): lines are absent from this listing, including the body of
// the `!accept_victory` branch -- confirm against the full file.
369 void Elector::handle_victory(MonOpRequestRef op
)
371 op
->mark_event("elector:handle_victory");
372 auto m
= op
->get_req
<MMonElection
>();
373 dout(5) << "handle_victory from " << m
->get_source()
374 << " quorum_features " << m
->quorum_features
375 << " " << m
->mon_features
377 int from
= m
->get_source().num();
379 bool accept_victory
= logic
.receive_victory_claim(from
, m
->epoch
);
381 if (!accept_victory
) {
385 mon
->lose_election(get_epoch(), m
->quorum
, from
,
386 m
->quorum_features
, m
->mon_features
, m
->mon_release
);
391 // stash leader's commands
392 ceph_assert(m
->sharing_bl
.length());
393 vector
<MonCommand
> new_cmds
;
394 auto bi
= m
->sharing_bl
.cbegin();
395 MonCommand::decode_vector(new_cmds
, bi
);
396 mon
->set_leader_commands(new_cmds
);
// Reply to the sender of `op` with an OP_NAK.
// Logs the peer's supported features/release against ours, then builds an
// OP_NAK for the peer's epoch whose quorum_features, mon_features and
// mon_release carry our *required* features and the monmap's
// min_mon_release. mon->features is encoded into the reply's sharing_bl and
// the reply is sent back over the connection the request arrived on.
// NOTE(review): the line terminating the dout statement is absent from this
// listing.
399 void Elector::nak_old_peer(MonOpRequestRef op
)
401 op
->mark_event("elector:nak_old_peer");
402 auto m
= op
->get_req
<MMonElection
>();
403 uint64_t supported_features
= m
->get_connection()->get_features();
404 uint64_t required_features
= mon
->get_required_features();
405 mon_feature_t required_mon_features
= mon
->get_required_mon_features();
406 dout(10) << "sending nak to peer " << m
->get_source()
407 << " supports " << supported_features
<< " " << m
->mon_features
408 << ", required " << required_features
<< " " << required_mon_features
409 << ", release " << (int)m
->mon_release
410 << " vs required " << (int)mon
->monmap
->min_mon_release
412 MMonElection
*reply
= new MMonElection(MMonElection::OP_NAK
, m
->epoch
,
413 peer_tracker
.get_encoded_bl(),
414 logic
.strategy
, mon
->monmap
);
415 reply
->quorum_features
= required_features
;
416 reply
->mon_features
= required_mon_features
;
417 reply
->mon_release
= mon
->monmap
->min_mon_release
;
418 mon
->features
.encode(reply
->sharing_bl
);
419 m
->get_connection()->send_message(reply
);
// Handle an OP_NAK received from a peer.
// Logs the sender's quorum features, mon features and min_mon_release.
// If the peer's mon_release is newer than ceph_release(), an error is logged
// stating we are shutting down because our release is too old.
// Otherwise-visible lines compute which features we lack: `diff` from
// Monitor::get_supported_features().unsupported(other) and `mon_diff` from
// the peer's mon_features minus our supported mon features, and log them in
// a second "Shutting down" error.
// NOTE(review): lines are absent from this listing, including the
// declaration/decoding of `other` and whatever performs the shutdown after
// each error message -- confirm against the full file.
422 void Elector::handle_nak(MonOpRequestRef op
)
424 op
->mark_event("elector:handle_nak");
425 auto m
= op
->get_req
<MMonElection
>();
426 dout(1) << "handle_nak from " << m
->get_source()
427 << " quorum_features " << m
->quorum_features
428 << " " << m
->mon_features
429 << " min_mon_release " << (int)m
->mon_release
432 if (m
->mon_release
> ceph_release()) {
433 derr
<< "Shutting down because I am release " << (int)ceph_release()
434 << " < min_mon_release " << (int)m
->mon_release
<< dendl
;
437 auto bi
= m
->sharing_bl
.cbegin();
439 CompatSet diff
= Monitor::get_supported_features().unsupported(other
);
441 mon_feature_t mon_supported
= ceph::features::mon::get_supported();
442 // all features in 'm->mon_features' not in 'mon_supported'
443 mon_feature_t mon_diff
= m
->mon_features
.diff(mon_supported
);
445 derr
<< "Shutting down because I lack required monitor features: { "
446 << diff
<< " } " << mon_diff
<< dendl
;
// Start live-pinging `peer`.
// Two guards come first: one on `peer` already being in live_pinging, one on
// the quorum's mon features not containing FEATURE_PINGING.
// Then the peer is registered with peer_tracker (report_live_connection with
// 0), moved into live_pinging and out of dead_pinging, its last-acked time
// is set to now, a ping is sent, and a timer event is scheduled after
// ping_timeout / PING_DIVISOR.
// NOTE(review): the bodies of both guards and of the timer callback are
// absent from this listing -- presumably the guards return early and the
// callback runs the periodic check; confirm against the full file.
452 void Elector::begin_peer_ping(int peer
)
454 if (live_pinging
.count(peer
)) {
458 if (!mon
->get_quorum_mon_features().contains_all(
459 ceph::features::mon::FEATURE_PINGING
)) {
463 dout(5) << __func__
<< " against " << peer
<< dendl
;
465 peer_tracker
.report_live_connection(peer
, 0); // init this peer as existing
466 live_pinging
.insert(peer
);
467 dead_pinging
.erase(peer
);
468 peer_acked_ping
[peer
] = ceph_clock_now();
469 send_peer_ping(peer
);
470 mon
->timer
.add_event_after(ping_timeout
/ PING_DIVISOR
,
471 new C_MonContext
{mon
, [this, peer
](int) {
// Send an MMonPing::PING stamped `now`, carrying our encoded connectivity
// scores, to the addresses the monmap lists for `peer`, and record `now` as
// the latest ping sent to that peer in peer_sent_ping.
// NOTE(review): the lines that declare `now` and decide between the
// caller-supplied `n` and ceph_clock_now() are absent from this listing --
// confirm how `n` is used in the full file.
476 void Elector::send_peer_ping(int peer
, const utime_t
*n
)
478 dout(10) << __func__
<< " to peer " << peer
<< dendl
;
484 now
= ceph_clock_now();
486 MMonPing
*ping
= new MMonPing(MMonPing::PING
, now
, peer_tracker
.get_encoded_bl());
487 mon
->messenger
->send_to_mon(ping
, mon
->monmap
->get_addrs(peer
));
488 peer_sent_ping
[peer
] = now
;
// Periodic liveness check for `peer`.
// If the peer is in neither live_pinging nor dead_pinging, that is logged.
// If the last acked ping is non-zero and older than now - ping_timeout, the
// connection is reported dead to peer_tracker (for now - acked_ping) and
// begin_dead_ping() is called.
// If the last acked ping equals the newest sent ping, a new ping stamped
// `now` is sent. A timer event is then scheduled after
// ping_timeout / PING_DIVISOR.
// NOTE(review): lines are absent from this listing (what follows each
// branch, and the timer callback body) -- confirm against the full file.
491 void Elector::ping_check(int peer
)
493 dout(20) << __func__
<< " to peer " << peer
<< dendl
;
494 if (!live_pinging
.count(peer
) &&
495 !dead_pinging
.count(peer
)) {
496 dout(20) << __func__
<< peer
<< " is no longer marked for pinging" << dendl
;
499 utime_t now
= ceph_clock_now();
500 utime_t
& acked_ping
= peer_acked_ping
[peer
];
501 utime_t
& newest_ping
= peer_sent_ping
[peer
];
502 if (!acked_ping
.is_zero() && acked_ping
< now
- ping_timeout
) {
503 peer_tracker
.report_dead_connection(peer
, now
- acked_ping
);
505 begin_dead_ping(peer
);
509 if (acked_ping
== newest_ping
) {
510 send_peer_ping(peer
, &now
);
513 mon
->timer
.add_event_after(ping_timeout
/ PING_DIVISOR
,
514 new C_MonContext
{mon
, [this, peer
](int) {
// Move `peer` from live_pinging to dead_pinging and schedule a timer event
// after ping_timeout. A guard on `peer` already being in dead_pinging comes
// first.
// NOTE(review): the guard body and the timer callback body are absent from
// this listing -- confirm against the full file.
519 void Elector::begin_dead_ping(int peer
)
521 if (dead_pinging
.count(peer
)) {
525 live_pinging
.erase(peer
);
526 dead_pinging
.insert(peer
);
527 mon
->timer
.add_event_after(ping_timeout
,
528 new C_MonContext
{mon
, [this, peer
](int) {
// Periodic handling for a peer in the dead-pinging set.
// If `peer` is no longer in dead_pinging, that is logged. The visible lines
// after that assert the peer is not in live_pinging, report the connection
// as dead to peer_tracker for the time elapsed since the last acked ping,
// and schedule another timer event after ping_timeout.
// NOTE(review): lines are absent from this listing (what follows the first
// branch, any update of the acked time, and the timer callback body).
533 void Elector::dead_ping(int peer
)
535 dout(20) << __func__
<< " to peer " << peer
<< dendl
;
536 if (!dead_pinging
.count(peer
)) {
537 dout(20) << __func__
<< peer
<< " is no longer marked for dead pinging" << dendl
;
540 ceph_assert(!live_pinging
.count(peer
));
542 utime_t now
= ceph_clock_now();
543 utime_t
& acked_ping
= peer_acked_ping
[peer
];
545 peer_tracker
.report_dead_connection(peer
, now
- acked_ping
);
547 mon
->timer
.add_event_after(ping_timeout
,
548 new C_MonContext
{mon
, [this, peer
](int) {
// Handle an incoming MMonPing.
// The sender's rank is looked up from its source address in the monmap;
// begin_peer_ping() is called for it and the connectivity report in
// m->tracker_bl is assimilated.
// Visible handling that follows:
//   * a PING_REPLY echoing m->stamp, with our encoded connectivity scores,
//     is sent back on the same connection;
//   * under `case MMonPing::PING_REPLY`: a reply whose stamp is newer than
//     the newest ping we sent (when that is non-zero) is logged as dropped;
//     a stamp newer than the previously acked one is reported to
//     peer_tracker as a live connection and becomes the new acked time; and
//     if more than ping_timeout / PING_DIVISOR has passed since the stamp, a
//     fresh ping is sent.
// NOTE(review): the switch header, the first case label, and branch bodies
// and closers are absent from this listing -- confirm against the full file.
553 void Elector::handle_ping(MonOpRequestRef op
)
555 MMonPing
*m
= static_cast<MMonPing
*>(op
->get_req());
556 dout(10) << __func__
<< " " << *m
<< dendl
;
558 int prank
= mon
->monmap
->get_rank(m
->get_source_addr());
559 begin_peer_ping(prank
);
560 assimilate_connection_reports(m
->tracker_bl
);
564 MMonPing
*reply
= new MMonPing(MMonPing::PING_REPLY
, m
->stamp
, peer_tracker
.get_encoded_bl());
565 m
->get_connection()->send_message(reply
);
569 case MMonPing::PING_REPLY
:
570 const utime_t
& previous_acked
= peer_acked_ping
[prank
];
571 const utime_t
& newest
= peer_sent_ping
[prank
];
572 if (m
->stamp
> newest
&& !newest
.is_zero()) {
573 derr
<< "dropping PING_REPLY stamp " << m
->stamp
574 << " as it is newer than newest sent " << newest
<< dendl
;
577 if (m
->stamp
> previous_acked
) {
578 peer_tracker
.report_live_connection(prank
, m
->stamp
- previous_acked
);
579 peer_acked_ping
[prank
] = m
->stamp
;
581 utime_t now
= ceph_clock_now();
582 if (now
- m
->stamp
> ping_timeout
/ PING_DIVISOR
) {
583 send_peer_ping(prank
, &now
);
// Entry point for election and ping messages.
// Asserts the op is an election-or-ping type and switches on the message
// type. For MSG_MON_ELECTION the visible checks are, in order:
//   * logic.participating is tested;
//   * a source rank >= monmap size is logged as a bogus message;
//   * a cluster fsid mismatch is logged;
//   * a source address not in our monmap is logged as discarded;
//   * the peer's monmap is decoded from em->monmap_bl; if its epoch is newer
//     than ours, our monmap is replaced by it, written to the store under
//     the "monmap" prefix (by epoch and as "last_committed"), and
//     mon->notify_new_monmap(false) is called; if it is older, that is
//     logged;
//   * an election strategy different from ours is logged as dropped;
//   * a non-empty em->scoring_bl is assimilated as a connectivity report;
//   * begin_peer_ping() is called for the sender's rank.
// The message is then dispatched per op (OP_PROPOSE, OP_ACK, OP_VICTORY,
// OP_NAK); for OP_PROPOSE an epoch older than ours is logged as dropped.
// NOTE(review): many lines are absent from this listing -- notably what each
// check does after logging (presumably an early return), the declaration of
// `peermap`, the inner switch header, the per-op handler calls and any
// ping-message case. Confirm against the full file.
589 void Elector::dispatch(MonOpRequestRef op
)
591 op
->mark_event("elector:dispatch");
592 ceph_assert(op
->is_type_election_or_ping());
594 switch (op
->get_req()->get_type()) {
596 case MSG_MON_ELECTION
:
598 if (!logic
.participating
) {
601 if (op
->get_req()->get_source().num() >= mon
->monmap
->size()) {
602 dout(5) << " ignoring bogus election message with bad mon rank "
603 << op
->get_req()->get_source() << dendl
;
607 auto em
= op
->get_req
<MMonElection
>();
609 // assume an old message encoding would have matched
610 if (em
->fsid
!= mon
->monmap
->fsid
) {
611 dout(0) << " ignoring election msg fsid "
612 << em
->fsid
<< " != " << mon
->monmap
->fsid
<< dendl
;
616 if (!mon
->monmap
->contains(em
->get_source_addr())) {
617 dout(1) << "discarding election message: " << em
->get_source_addr()
618 << " not in my monmap " << *mon
->monmap
<< dendl
;
623 peermap
.decode(em
->monmap_bl
);
624 if (peermap
.epoch
> mon
->monmap
->epoch
) {
625 dout(0) << em
->get_source_inst() << " has newer monmap epoch " << peermap
.epoch
626 << " > my epoch " << mon
->monmap
->epoch
// Adopt the peer's newer monmap and write it to the store.
629 mon
->monmap
->decode(em
->monmap_bl
);
630 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
631 t
->put("monmap", mon
->monmap
->epoch
, em
->monmap_bl
);
632 t
->put("monmap", "last_committed", mon
->monmap
->epoch
);
633 mon
->store
->apply_transaction(t
);
634 //mon->monmon()->paxos->stash_latest(mon->monmap->epoch, em->monmap_bl);
636 mon
->notify_new_monmap(false);
640 if (peermap
.epoch
< mon
->monmap
->epoch
) {
641 dout(0) << em
->get_source_inst() << " has older monmap epoch " << peermap
.epoch
642 << " < my epoch " << mon
->monmap
->epoch
646 if (em
->strategy
!= logic
.strategy
) {
647 dout(5) << __func__
<< " somehow got an Election message with different strategy "
648 << em
->strategy
<< " from local " << logic
.strategy
649 << "; dropping for now to let race resolve" << dendl
;
653 if (em
->scoring_bl
.length()) {
654 assimilate_connection_reports(em
->scoring_bl
);
657 begin_peer_ping(mon
->monmap
->get_rank(em
->get_source_addr()));
659 case MMonElection::OP_PROPOSE
:
664 if (em
->epoch
< get_epoch()) {
665 dout(5) << "old epoch, dropping" << dendl
;
670 case MMonElection::OP_ACK
:
673 case MMonElection::OP_VICTORY
:
676 case MMonElection::OP_NAK
:
694 void Elector::start_participating()
696 logic
.participating
= true;
699 void Elector::notify_clear_peer_state()
701 peer_tracker
.notify_reset();
704 void Elector::notify_rank_changed(int new_rank
)
706 peer_tracker
.notify_rank_changed(new_rank
);
707 live_pinging
.erase(new_rank
);
708 dead_pinging
.erase(new_rank
);
// A rank was removed from the monmap: inform the connectivity tracker, then
// repair live_pinging / dead_pinging so they describe the shifted ranks.
// When rank_removed < paxos_size(), ranks rank_removed+1 .. paxos_size() are
// walked; for each rank i found in one set, i-1 is erased from the other
// set, pinging of i-1 in the matching mode is started if not already
// active, and i is erased when i+1 is not in the same set. A rank i in
// neither set only triggers erasing rank_removed itself (when
// i-1 == rank_removed). The trailing visible lines erase rank_removed from
// whichever set contains it.
// NOTE(review): lines are absent from this listing (the end of the block
// comment, branch closers/else lines) -- confirm nesting in the full file.
// NOTE(review): `rank_removed < paxos_size()` compares int with unsigned.
711 void Elector::notify_rank_removed(int rank_removed
)
713 peer_tracker
.notify_rank_removed(rank_removed
);
714 /* we have to clean up the pinging state, which is annoying
715 because it's not indexed anywhere (and adding indexing
716 would also be annoying).
717 In the case where we are removing any rank that is not the
718 higest, we start with the removed rank and examine the state
719 of the surrounding ranks.
720 Everybody who remains with larger rank gets a new rank one lower
721 than before, and we have to figure out the remaining scheduled
722 ping contexts. So, starting one past with the removed rank, we:
723 * check if the current rank is alive or dead
724 * examine our new rank (one less than before, initially the removed
726 * * erase it if it's in the wrong set
727 * * start pinging it if we're not already
728 * check if the next rank is in the same pinging set, and delete
730 In the case where we are removing the highest rank,
731 we erase the removed rank from all sets.
733 if (rank_removed
< paxos_size()) {
734 for (unsigned i
= rank_removed
+ 1; i
<= paxos_size() ; ++i
) {
735 if (live_pinging
.count(i
)) {
736 dead_pinging
.erase(i
-1);
737 if (!live_pinging
.count(i
-1)) {
738 begin_peer_ping(i
-1);
740 if (!live_pinging
.count(i
+1)) {
741 live_pinging
.erase(i
);
744 else if (dead_pinging
.count(i
)) {
745 live_pinging
.erase(i
-1);
746 if (!dead_pinging
.count(i
-1)) {
747 begin_dead_ping(i
-1);
749 if (!dead_pinging
.count(i
+1)) {
750 dead_pinging
.erase(i
);
753 // we aren't pinging rank i at all
754 if (i
-1 == (unsigned)rank_removed
) {
755 // so we special case to make sure we
756 // actually nuke the removed rank
757 dead_pinging
.erase(rank_removed
);
758 live_pinging
.erase(rank_removed
);
763 if (live_pinging
.count(rank_removed
)) {
764 live_pinging
.erase(rank_removed
);
766 if (dead_pinging
.count(rank_removed
)) {
767 dead_pinging
.erase(rank_removed
);
772 void Elector::notify_strategy_maybe_changed(int strategy
)
774 logic
.set_election_strategy(static_cast<ElectionLogic::election_strategy
>(strategy
));