// Configure a PerfCountersBuilder for the "paxos" counter group (index range
// l_paxos_first..l_paxos_last) and register the role-start/restart counters.
// NOTE(review): nothing in the lines shown here turns `pcb` into a live
// counters object; presumably the remainder of the function is elided from
// this excerpt -- confirm against the full file.
void Paxos::init_logger()
{
PerfCountersBuilder pcb(g_ceph_context, "paxos", l_paxos_first, l_paxos_last);
+
+ // Because monitors are so few in number, the resource cost of capturing
+ // almost all their perf counters at USEFUL is trivial.
+ pcb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
+
pcb.add_u64_counter(l_paxos_start_leader, "start_leader", "Starts in leader role");  // u64 counter keyed by l_paxos_start_leader
pcb.add_u64_counter(l_paxos_start_peon, "start_peon", "Starts in peon role");  // u64 counter keyed by l_paxos_start_peon
pcb.add_u64_counter(l_paxos_restart, "restart", "Restarts");  // u64 counter keyed by l_paxos_restart
}
// set timeout event
- collect_timeout_event = new C_MonContext(mon, [this](int r) {
+ collect_timeout_event = mon->timer.add_event_after(
+ g_conf->mon_accept_timeout_factor *
+ g_conf->mon_lease,
+ new C_MonContext(mon, [this](int r) {
if (r == -ECANCELED)
return;
collect_timeout();
- });
- mon->timer.add_event_after(g_conf->mon_accept_timeout_factor *
- g_conf->mon_lease,
- collect_timeout_event);
+ }));
}
}
// set timeout event
- accept_timeout_event = new C_MonContext(mon, [this](int r) {
- if (r == -ECANCELED)
- return;
- accept_timeout();
- });
- mon->timer.add_event_after(g_conf->mon_accept_timeout_factor *
- g_conf->mon_lease,
- accept_timeout_event);
+ accept_timeout_event = mon->timer.add_event_after(
+ g_conf->mon_accept_timeout_factor * g_conf->mon_lease,
+ new C_MonContext(mon, [this](int r) {
+ if (r == -ECANCELED)
+ return;
+ accept_timeout();
+ }));
}
// peon
// Completion callback: finishes the commit on `paxos`, or abandons it if
// paxos is already shut down. `r` is the result code and must be >= 0.
// NOTE(review): presumably invoked by the store once the commit transaction
// has been applied -- confirm against the enclosing context class.
void finish(int r) override {
assert(r >= 0);  // a negative result is treated as a programming error
Mutex::Locker l(paxos->mon->lock);  // takes the monitor lock before touching paxos state
+ if (paxos->is_shutdown()) {
+ paxos->abort_commit();  // shut down: call abort_commit() and skip commit_finish()
+ return;
+ }
paxos->commit_finish();
}
};
+void Paxos::abort_commit()  // Drop one started commit from commits_started without finishing it.
+{
+ assert(commits_started > 0);  // there must be a started commit to abort
+ --commits_started;
+ if (commits_started == 0)
+ shutdown_cond.Signal();  // no started commits remain: signal shutdown_cond for any waiter
+}
+
void Paxos::commit_start()
{
dout(10) << __func__ << " " << (last_committed+1) << dendl;
state = STATE_WRITING;
else
ceph_abort();
+ ++commits_started;
if (mon->get_quorum().size() > 1) {
// cancel timeout event
// it doesn't need to flush the store queue
assert(is_writing() || is_writing_previous());
state = STATE_REFRESH;
+ assert(commits_started > 0);
+ --commits_started;
if (do_refresh()) {
commit_proposal();
// set timeout event.
// if old timeout is still in place, leave it.
if (!lease_ack_timeout_event) {
- lease_ack_timeout_event = new C_MonContext(mon, [this](int r) {
- if (r == -ECANCELED)
- return;
- lease_ack_timeout();
- });
- mon->timer.add_event_after(g_conf->mon_lease_ack_timeout_factor *
- g_conf->mon_lease,
- lease_ack_timeout_event);
+ lease_ack_timeout_event = mon->timer.add_event_after(
+ g_conf->mon_lease_ack_timeout_factor * g_conf->mon_lease,
+ new C_MonContext(mon, [this](int r) {
+ if (r == -ECANCELED)
+ return;
+ lease_ack_timeout();
+ }));
}
// set renew event
- lease_renew_event = new C_MonContext(mon, [this](int r) {
- if (r == -ECANCELED)
- return;
- lease_renew_timeout();
- });
utime_t at = lease_expire;
at -= g_conf->mon_lease;
at += g_conf->mon_lease_renew_interval_factor * g_conf->mon_lease;
- mon->timer.add_event_at(at, lease_renew_event);
+ lease_renew_event = mon->timer.add_event_at(
+ at, new C_MonContext(mon, [this](int r) {
+ if (r == -ECANCELED)
+ return;
+ lease_renew_timeout();
+ }));
}
void Paxos::warn_on_future_time(utime_t t, entity_name_t from)
int from = ack->get_source().num();
if (!lease_ack_timeout_event) {
- dout(10) << "handle_lease_ack from " << ack->get_source()
+ dout(10) << "handle_lease_ack from " << ack->get_source()
<< " -- stray (probably since revoked)" << dendl;
- }
- else if (acked_lease.count(from) == 0) {
+
+ } else if (acked_lease.count(from) == 0) {
acked_lease.insert(from);
if (ack->feature_map.length()) {
auto p = ack->feature_map.begin();
}
if (acked_lease == mon->get_quorum()) {
// yay!
- dout(10) << "handle_lease_ack from " << ack->get_source()
+ dout(10) << "handle_lease_ack from " << ack->get_source()
<< " -- got everyone" << dendl;
mon->timer.cancel_event(lease_ack_timeout_event);
lease_ack_timeout_event = 0;
} else {
- dout(10) << "handle_lease_ack from " << ack->get_source()
+ dout(10) << "handle_lease_ack from " << ack->get_source()
<< " -- still need "
<< mon->get_quorum().size() - acked_lease.size()
<< " more" << dendl;
}
} else {
- dout(10) << "handle_lease_ack from " << ack->get_source()
+ dout(10) << "handle_lease_ack from " << ack->get_source()
<< " dup (lagging!), ignoring" << dendl;
}
dout(20) << "reset_lease_timeout - setting timeout event" << dendl;
if (lease_timeout_event)
mon->timer.cancel_event(lease_timeout_event);
- lease_timeout_event = new C_MonContext(mon, [this](int r) {
- if (r == -ECANCELED)
- return;
- lease_timeout();
- });
- mon->timer.add_event_after(g_conf->mon_lease_ack_timeout_factor *
- g_conf->mon_lease,
- lease_timeout_event);
+ lease_timeout_event = mon->timer.add_event_after(
+ g_conf->mon_lease_ack_timeout_factor * g_conf->mon_lease,
+ new C_MonContext(mon, [this](int r) {
+ if (r == -ECANCELED)
+ return;
+ lease_timeout();
+ }));
}
void Paxos::lease_timeout()
{
dout(10) << __func__ << " cancel all contexts" << dendl;
+ state = STATE_SHUTDOWN;
+
// discard pending transaction
pending_proposal.reset();
+ // Let store finish commits in progress
+ // XXX: I assume I can't use finish_contexts() because the store
+ // is going to trigger
+ while(commits_started > 0)
+ shutdown_cond.Wait(mon->lock);
+
finish_contexts(g_ceph_context, waiting_for_writeable, -ECANCELED);
finish_contexts(g_ceph_context, waiting_for_commit, -ECANCELED);
finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED);