#include "common/Timer.h"
#include "messages/PaxosServiceMessage.h"
+using std::string;
+using std::unique_lock;
+
+using ceph::bufferlist;
+using ceph::Formatter;
+using ceph::JSONFormatter;
+using ceph::to_timespan;
+
#define dout_subsys ceph_subsys_paxos
#undef dout_prefix
-#define dout_prefix _prefix(_dout, mon, mon->name, mon->rank, paxos_name, state, first_committed, last_committed)
-static ostream& _prefix(std::ostream *_dout, Monitor *mon, const string& name,
- int rank, const string& paxos_name, int state,
- version_t first_committed, version_t last_committed)
+#define dout_prefix _prefix(_dout, mon, mon.name, mon.rank, paxos_name, state, first_committed, last_committed)
+static std::ostream& _prefix(std::ostream *_dout, Monitor &mon, const string& name,
+ int rank, const string& paxos_name, int state,
+ version_t first_committed, version_t last_committed)
{
return *_dout << "mon." << name << "@" << rank
- << "(" << mon->get_state_name() << ")"
+ << "(" << mon.get_state_name() << ")"
<< ".paxos(" << paxos_name << " " << Paxos::get_statename(state)
<< " c " << first_committed << ".." << last_committed
<< ") ";
MonitorDBStore *Paxos::get_store()
{
- return mon->store;
+ return mon.store;
}
void Paxos::read_and_prepare_transactions(MonitorDBStore::TransactionRef tx,
{
// we're recoverying, it seems!
state = STATE_RECOVERING;
- ceph_assert(mon->is_leader());
+ ceph_assert(mon.is_leader());
// reset the number of lasts received
uncommitted_v = 0;
dout(10) << "collect with pn " << accepted_pn << dendl;
// send collect
- for (set<int>::const_iterator p = mon->get_quorum().begin();
- p != mon->get_quorum().end();
+ for (auto p = mon.get_quorum().begin();
+ p != mon.get_quorum().end();
++p) {
- if (*p == mon->rank) continue;
-
- MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT,
+ if (*p == mon.rank) continue;
+
+ MMonPaxos *collect = new MMonPaxos(mon.get_epoch(), MMonPaxos::OP_COLLECT,
ceph_clock_now());
collect->last_committed = last_committed;
collect->first_committed = first_committed;
collect->pn = accepted_pn;
- mon->send_mon_message(collect, *p);
+ mon.send_mon_message(collect, *p);
}
// set timeout event
- collect_timeout_event = mon->timer.add_event_after(
+ collect_timeout_event = mon.timer.add_event_after(
g_conf()->mon_accept_timeout_factor *
g_conf()->mon_lease,
- new C_MonContext{mon, [this](int r) {
+ new C_MonContext{&mon, [this](int r) {
if (r == -ECANCELED)
return;
collect_timeout();
auto collect = op->get_req<MMonPaxos>();
dout(10) << "handle_collect " << *collect << dendl;
- ceph_assert(mon->is_peon()); // mon epoch filter should catch strays
+ ceph_assert(mon.is_peon()); // mon epoch filter should catch strays
// we're recoverying, it seems!
state = STATE_RECOVERING;
<< " (theirs: " << collect->first_committed
<< "; ours: " << last_committed << ") -- bootstrap!" << dendl;
op->mark_paxos_event("need to bootstrap");
- mon->bootstrap();
+ mon.bootstrap();
return;
}
// reply
- MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST,
+ MMonPaxos *last = new MMonPaxos(mon.get_epoch(), MMonPaxos::OP_LAST,
ceph_clock_now());
last->last_committed = last_committed;
last->first_committed = first_committed;
bool Paxos::store_state(MMonPaxos *m)
{
auto t(std::make_shared<MonitorDBStore::Transaction>());
- map<version_t,bufferlist>::iterator start = m->values.begin();
+ auto start = m->values.begin();
bool changed = false;
// build map of values to store
// make sure we get the right interval of values to apply by pushing forward
// the 'end' iterator until it matches the message's 'last_committed'.
- map<version_t,bufferlist>::iterator end = start;
+ auto end = start;
while (end != m->values.end() && end->first <= m->last_committed) {
last_committed = end->first;
++end;
// we should apply the state here -- decode every single bufferlist in the
// map and append the transactions to 't'.
- map<version_t,bufferlist>::iterator it;
- for (it = start; it != end; ++it) {
+ for (auto it = start; it != end; ++it) {
// write the bufferlist as the version's value
t->put(get_name(), it->first, it->second);
// decode the bufferlist and append it to the transaction we will shortly
dout(10) << "handle_last " << *last << dendl;
- if (!mon->is_leader()) {
+ if (!mon.is_leader()) {
dout(10) << "not leader, dropping" << dendl;
return;
}
<< " (theirs: " << last->first_committed
<< "; ours: " << last_committed << ") -- bootstrap!" << dendl;
op->mark_paxos_event("need to bootstrap");
- mon->bootstrap();
+ mon.bootstrap();
return;
}
ceph_assert(g_conf()->paxos_kill_at != 2);
// is everyone contiguous and up to date?
- for (map<int,version_t>::iterator p = peer_last_committed.begin();
+ for (auto p = peer_last_committed.begin();
p != peer_last_committed.end();
++p) {
if (p->second + 1 < first_committed && first_committed > 1) {
<< ") is too low for our first_committed (" << first_committed
<< ") -- bootstrap!" << dendl;
op->mark_paxos_event("need to bootstrap");
- mon->bootstrap();
+ mon.bootstrap();
return;
}
if (p->second < last_committed) {
// share committed values
dout(10) << " sending commit to mon." << p->first << dendl;
- MMonPaxos *commit = new MMonPaxos(mon->get_epoch(),
+ MMonPaxos *commit = new MMonPaxos(mon.get_epoch(),
MMonPaxos::OP_COMMIT,
ceph_clock_now());
share_state(commit, peer_first_committed[p->first], p->second);
- mon->send_mon_message(commit, p->first);
+ mon.send_mon_message(commit, p->first);
}
}
dout(10) << " they had a higher pn than us, picking a new one." << dendl;
// cancel timeout event
- mon->timer.cancel_event(collect_timeout_event);
+ mon.timer.cancel_event(collect_timeout_event);
collect_timeout_event = 0;
collect(last->pn);
}
// is that everyone?
- if (num_last == mon->get_quorum().size()) {
+ if (num_last == mon.get_quorum().size()) {
// cancel timeout event
- mon->timer.cancel_event(collect_timeout_event);
+ mon.timer.cancel_event(collect_timeout_event);
collect_timeout_event = 0;
peer_first_committed.clear();
peer_last_committed.clear();
dout(1) << "collect timeout, calling fresh election" << dendl;
collect_timeout_event = 0;
logger->inc(l_paxos_collect_timeout);
- ceph_assert(mon->is_leader());
- mon->bootstrap();
+ ceph_assert(mon.is_leader());
+ mon.bootstrap();
}
<< v.length() << " bytes"
<< dendl;
- ceph_assert(mon->is_leader());
+ ceph_assert(mon.is_leader());
ceph_assert(is_updating() || is_updating_previous());
// we must already have a majority for this to work.
- ceph_assert(mon->get_quorum().size() == 1 ||
- num_last > (unsigned)mon->monmap->size()/2);
+ ceph_assert(mon.get_quorum().size() == 1 ||
+ num_last > (unsigned)mon.monmap->size()/2);
// and no value, yet.
ceph_assert(new_value.length() == 0);
// accept it ourselves
accepted.clear();
- accepted.insert(mon->rank);
+ accepted.insert(mon.rank);
new_value = v;
if (last_committed == 0) {
ceph_assert(g_conf()->paxos_kill_at != 3);
- if (mon->get_quorum().size() == 1) {
+ if (mon.get_quorum().size() == 1) {
// we're alone, take it easy
commit_start();
return;
}
// ask others to accept it too!
- for (set<int>::const_iterator p = mon->get_quorum().begin();
- p != mon->get_quorum().end();
+ for (auto p = mon.get_quorum().begin();
+ p != mon.get_quorum().end();
++p) {
- if (*p == mon->rank) continue;
+ if (*p == mon.rank) continue;
dout(10) << " sending begin to mon." << *p << dendl;
- MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN,
+ MMonPaxos *begin = new MMonPaxos(mon.get_epoch(), MMonPaxos::OP_BEGIN,
ceph_clock_now());
begin->values[last_committed+1] = new_value;
begin->last_committed = last_committed;
begin->pn = accepted_pn;
- mon->send_mon_message(begin, *p);
+ mon.send_mon_message(begin, *p);
}
// set timeout event
- accept_timeout_event = mon->timer.add_event_after(
+ accept_timeout_event = mon.timer.add_event_after(
g_conf()->mon_accept_timeout_factor * g_conf()->mon_lease,
- new C_MonContext{mon, [this](int r) {
+ new C_MonContext{&mon, [this](int r) {
if (r == -ECANCELED)
return;
accept_timeout();
ceph_assert(g_conf()->paxos_kill_at != 5);
// reply
- MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT,
+ MMonPaxos *accept = new MMonPaxos(mon.get_epoch(), MMonPaxos::OP_ACCEPT,
ceph_clock_now());
accept->pn = accepted_pn;
accept->last_committed = last_committed;
// stale state.
// FIXME: we can improve this with an additional lease revocation message
// that doesn't block for the persist.
- if (accepted == mon->get_quorum()) {
+ if (accepted == mon.get_quorum()) {
// yay, commit!
dout(10) << " got majority, committing, done with update" << dendl;
op->mark_paxos_event("commit_start");
{
dout(1) << "accept timeout, calling fresh election" << dendl;
accept_timeout_event = 0;
- ceph_assert(mon->is_leader());
+ ceph_assert(mon.is_leader());
ceph_assert(is_updating() || is_updating_previous() || is_writing() ||
is_writing_previous());
logger->inc(l_paxos_accept_timeout);
- mon->bootstrap();
+ mon.bootstrap();
}
struct C_Committed : public Context {
explicit C_Committed(Paxos *p) : paxos(p) {}
void finish(int r) override {
ceph_assert(r >= 0);
- std::lock_guard l(paxos->mon->lock);
+ std::lock_guard l(paxos->mon.lock);
if (paxos->is_shutdown()) {
paxos->abort_commit();
return;
ceph_abort();
++commits_started;
- if (mon->get_quorum().size() > 1) {
+ if (mon.get_quorum().size() > 1) {
// cancel timeout event
- mon->timer.cancel_event(accept_timeout_event);
+ mon.timer.cancel_event(accept_timeout_event);
accept_timeout_event = 0;
}
}
_sanity_check_store();
// tell everyone
- for (set<int>::const_iterator p = mon->get_quorum().begin();
- p != mon->get_quorum().end();
+ for (auto p = mon.get_quorum().begin();
+ p != mon.get_quorum().end();
++p) {
- if (*p == mon->rank) continue;
+ if (*p == mon.rank) continue;
dout(10) << " sending commit to mon." << *p << dendl;
- MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
+ MMonPaxos *commit = new MMonPaxos(mon.get_epoch(), MMonPaxos::OP_COMMIT,
ceph_clock_now());
commit->values[last_committed] = new_value;
commit->pn = accepted_pn;
commit->last_committed = last_committed;
- mon->send_mon_message(commit, *p);
+ mon.send_mon_message(commit, *p);
}
ceph_assert(g_conf()->paxos_kill_at != 9);
new_value.clear();
// WRITING -> REFRESH
- // among other things, this lets do_refresh() -> mon->bootstrap() ->
+ // among other things, this lets do_refresh() -> mon.bootstrap() ->
// wait_for_paxos_write() know that it doesn't need to flush the store
// queue. and it should not, as we are in the async completion thread now!
ceph_assert(is_writing() || is_writing_previous());
if (do_refresh()) {
commit_proposal();
- if (mon->get_quorum().size() > 1) {
+ if (mon.get_quorum().size() > 1) {
extend_lease();
}
logger->inc(l_paxos_commit);
- if (!mon->is_peon()) {
+ if (!mon.is_peon()) {
dout(10) << "not a peon, dropping" << dendl;
ceph_abort();
return;
void Paxos::extend_lease()
{
- ceph_assert(mon->is_leader());
+ ceph_assert(mon.is_leader());
//assert(is_active());
lease_expire = ceph::real_clock::now();
lease_expire += ceph::make_timespan(g_conf()->mon_lease);
acked_lease.clear();
- acked_lease.insert(mon->rank);
+ acked_lease.insert(mon.rank);
dout(7) << "extend_lease now+" << g_conf()->mon_lease
<< " (" << lease_expire << ")" << dendl;
// bcast
- for (set<int>::const_iterator p = mon->get_quorum().begin();
- p != mon->get_quorum().end(); ++p) {
+ for (auto p = mon.get_quorum().begin();
+ p != mon.get_quorum().end(); ++p) {
- if (*p == mon->rank) continue;
- MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE,
+ if (*p == mon.rank) continue;
+ MMonPaxos *lease = new MMonPaxos(mon.get_epoch(), MMonPaxos::OP_LEASE,
ceph_clock_now());
lease->last_committed = last_committed;
lease->lease_timestamp = utime_t{lease_expire};
lease->first_committed = first_committed;
- mon->send_mon_message(lease, *p);
+ mon.send_mon_message(lease, *p);
}
// set timeout event.
// if old timeout is still in place, leave it.
if (!lease_ack_timeout_event) {
- lease_ack_timeout_event = mon->timer.add_event_after(
+ lease_ack_timeout_event = mon.timer.add_event_after(
g_conf()->mon_lease_ack_timeout_factor * g_conf()->mon_lease,
- new C_MonContext{mon, [this](int r) {
+ new C_MonContext{&mon, [this](int r) {
if (r == -ECANCELED)
return;
lease_ack_timeout();
at -= ceph::make_timespan(g_conf()->mon_lease);
at += ceph::make_timespan(g_conf()->mon_lease_renew_interval_factor *
g_conf()->mon_lease);
- lease_renew_event = mon->timer.add_event_at(
- at, new C_MonContext{mon, [this](int r) {
+ lease_renew_event = mon.timer.add_event_at(
+ at, new C_MonContext{&mon, [this](int r) {
if (r == -ECANCELED)
return;
lease_renew_timeout();
utime_t warn_diff = now - last_clock_drift_warn;
if (warn_diff >
pow(g_conf()->mon_clock_drift_warn_backoff, clock_drift_warned)) {
- mon->clog->warn() << "message from " << from << " was stamped " << diff
+ mon.clog->warn() << "message from " << from << " was stamped " << diff
<< "s in the future, clocks not synchronized";
last_clock_drift_warn = ceph_clock_now();
++clock_drift_warned;
// make sure we have the latest state loaded up
auto start = ceph::coarse_mono_clock::now();
- mon->refresh_from_paxos(&need_bootstrap);
+ mon.refresh_from_paxos(&need_bootstrap);
auto end = ceph::coarse_mono_clock::now();
logger->inc(l_paxos_refresh);
if (need_bootstrap) {
dout(10) << " doing requested bootstrap" << dendl;
- mon->bootstrap();
+ mon.bootstrap();
return false;
}
void Paxos::commit_proposal()
{
dout(10) << __func__ << dendl;
- ceph_assert(mon->is_leader());
+ ceph_assert(mon.is_leader());
ceph_assert(is_refresh());
finish_contexts(g_ceph_context, committing_finishers);
void Paxos::finish_round()
{
dout(10) << __func__ << dendl;
- ceph_assert(mon->is_leader());
+ ceph_assert(mon.is_leader());
// ok, now go active!
state = STATE_ACTIVE;
op->mark_paxos_event("handle_lease");
auto lease = op->get_req<MMonPaxos>();
// sanity
- if (!mon->is_peon() ||
+ if (!mon.is_peon() ||
last_committed != lease->last_committed) {
dout(10) << "handle_lease i'm not a peon, or they're not the leader,"
<< " or the last_committed doesn't match, dropping" << dendl;
<< " now " << lease_expire << dendl;
// ack
- MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK,
+ MMonPaxos *ack = new MMonPaxos(mon.get_epoch(), MMonPaxos::OP_LEASE_ACK,
ceph_clock_now());
ack->last_committed = last_committed;
ack->first_committed = first_committed;
ack->lease_timestamp = ceph_clock_now();
- encode(mon->session_map.feature_map, ack->feature_map);
+ encode(mon.session_map.feature_map, ack->feature_map);
lease->get_connection()->send_message(ack);
// (re)set timeout event.
acked_lease.insert(from);
if (ack->feature_map.length()) {
auto p = ack->feature_map.cbegin();
- FeatureMap& t = mon->quorum_feature_map[from];
+ FeatureMap& t = mon.quorum_feature_map[from];
decode(t, p);
}
- if (acked_lease == mon->get_quorum()) {
+ if (acked_lease == mon.get_quorum()) {
// yay!
dout(10) << "handle_lease_ack from " << ack->get_source()
<< " -- got everyone" << dendl;
- mon->timer.cancel_event(lease_ack_timeout_event);
+ mon.timer.cancel_event(lease_ack_timeout_event);
lease_ack_timeout_event = 0;
} else {
dout(10) << "handle_lease_ack from " << ack->get_source()
<< " -- still need "
- << mon->get_quorum().size() - acked_lease.size()
+ << mon.get_quorum().size() - acked_lease.size()
<< " more" << dendl;
}
} else {
void Paxos::lease_ack_timeout()
{
dout(1) << "lease_ack_timeout -- calling new election" << dendl;
- ceph_assert(mon->is_leader());
+ ceph_assert(mon.is_leader());
ceph_assert(is_active());
logger->inc(l_paxos_lease_ack_timeout);
lease_ack_timeout_event = 0;
- mon->bootstrap();
+ mon.bootstrap();
}
void Paxos::reset_lease_timeout()
{
dout(20) << "reset_lease_timeout - setting timeout event" << dendl;
if (lease_timeout_event)
- mon->timer.cancel_event(lease_timeout_event);
- lease_timeout_event = mon->timer.add_event_after(
+ mon.timer.cancel_event(lease_timeout_event);
+ lease_timeout_event = mon.timer.add_event_after(
g_conf()->mon_lease_ack_timeout_factor * g_conf()->mon_lease,
- new C_MonContext{mon, [this](int r) {
+ new C_MonContext{&mon, [this](int r) {
if (r == -ECANCELED)
return;
lease_timeout();
void Paxos::lease_timeout()
{
dout(1) << "lease_timeout -- calling new election" << dendl;
- ceph_assert(mon->is_peon());
+ ceph_assert(mon.is_peon());
logger->inc(l_paxos_lease_timeout);
lease_timeout_event = 0;
- mon->bootstrap();
+ mon.bootstrap();
}
void Paxos::lease_renew_timeout()
last_pn /= 100;
last_pn++;
last_pn *= 100;
- last_pn += (version_t)mon->rank;
+ last_pn += (version_t)mon.rank;
// write
auto t(std::make_shared<MonitorDBStore::Transaction>());
void Paxos::cancel_events()
{
if (collect_timeout_event) {
- mon->timer.cancel_event(collect_timeout_event);
+ mon.timer.cancel_event(collect_timeout_event);
collect_timeout_event = 0;
}
if (accept_timeout_event) {
- mon->timer.cancel_event(accept_timeout_event);
+ mon.timer.cancel_event(accept_timeout_event);
accept_timeout_event = 0;
}
if (lease_renew_event) {
- mon->timer.cancel_event(lease_renew_event);
+ mon.timer.cancel_event(lease_renew_event);
lease_renew_event = 0;
}
if (lease_ack_timeout_event) {
- mon->timer.cancel_event(lease_ack_timeout_event);
+ mon.timer.cancel_event(lease_ack_timeout_event);
lease_ack_timeout_event = 0;
}
if (lease_timeout_event) {
- mon->timer.cancel_event(lease_timeout_event);
+ mon.timer.cancel_event(lease_timeout_event);
lease_timeout_event = 0;
}
}
// Let store finish commits in progress
// XXX: I assume I can't use finish_contexts() because the store
// is going to trigger
- unique_lock l{mon->lock, std::adopt_lock};
+ unique_lock l{mon.lock, std::adopt_lock};
shutdown_cond.wait(l, [this] { return commits_started <= 0; });
// Monitor::shutdown() will unlock it
l.release();
logger->inc(l_paxos_start_leader);
- if (mon->get_quorum().size() == 1) {
+ if (mon.get_quorum().size() == 1) {
state = STATE_ACTIVE;
return;
}
if (is_writing() || is_writing_previous()) {
dout(10) << __func__ << " flushing" << dendl;
- mon->lock.unlock();
- mon->store->flush();
- mon->lock.lock();
+ mon.lock.unlock();
+ mon.store->flush();
+ mon.lock.lock();
dout(10) << __func__ << " flushed" << dendl;
}
state = STATE_RECOVERING;
auto *req = op->get_req<MMonPaxos>();
// election in progress?
- if (!mon->is_leader() && !mon->is_peon()) {
+ if (!mon.is_leader() && !mon.is_peon()) {
dout(5) << "election in progress, dropping " << *req << dendl;
return;
}
// check sanity
- ceph_assert(mon->is_leader() ||
- (mon->is_peon() && req->get_source().num() == mon->get_leader()));
+ ceph_assert(mon.is_leader() ||
+ (mon.is_peon() && req->get_source().num() == mon.get_leader()));
// NOTE: these ops are defined in messages/MMonPaxos.h
switch (req->op) {
ret = false;
else
ret =
- (mon->is_peon() || mon->is_leader()) &&
+ (mon.is_peon() || mon.is_leader()) &&
(is_active() || is_updating() || is_writing()) &&
last_committed > 0 && is_lease_valid(); // must have a value alone, or have lease
dout(5) << __func__ << " = " << (int)ret
bool Paxos::is_lease_valid()
{
- return ((mon->get_quorum().size() == 1)
+ return ((mon.get_quorum().size() == 1)
|| (ceph::real_clock::now() < lease_expire));
}
bool Paxos::is_writeable()
{
return
- mon->is_leader() &&
+ mon.is_leader() &&
is_active() &&
is_lease_valid();
}
MonitorDBStore::TransactionRef Paxos::get_pending_transaction()
{
- ceph_assert(mon->is_leader());
+ ceph_assert(mon.is_leader());
if (!pending_proposal) {
pending_proposal.reset(new MonitorDBStore::Transaction);
ceph_assert(pending_finishers.empty());