]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/Paxos.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / mon / Paxos.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <sstream>
16#include "Paxos.h"
17#include "Monitor.h"
18#include "messages/MMonPaxos.h"
19
31f18b77 20#include "mon/mon_types.h"
7c673cae 21#include "common/config.h"
11fdf7f2 22#include "include/ceph_assert.h"
7c673cae
FG
23#include "include/stringify.h"
24#include "common/Timer.h"
25#include "messages/PaxosServiceMessage.h"
26
f67539c2
TL
27using std::string;
28using std::unique_lock;
29
30using ceph::bufferlist;
31using ceph::Formatter;
32using ceph::JSONFormatter;
33using ceph::to_timespan;
34
7c673cae
FG
35#define dout_subsys ceph_subsys_paxos
36#undef dout_prefix
f67539c2
TL
37#define dout_prefix _prefix(_dout, mon, mon.name, mon.rank, paxos_name, state, first_committed, last_committed)
38static std::ostream& _prefix(std::ostream *_dout, Monitor &mon, const string& name,
39 int rank, const string& paxos_name, int state,
40 version_t first_committed, version_t last_committed)
7c673cae
FG
41{
42 return *_dout << "mon." << name << "@" << rank
f67539c2 43 << "(" << mon.get_state_name() << ")"
7c673cae
FG
44 << ".paxos(" << paxos_name << " " << Paxos::get_statename(state)
45 << " c " << first_committed << ".." << last_committed
46 << ") ";
47}
48
49class Paxos::C_Trimmed : public Context {
50 Paxos *paxos;
51public:
52 explicit C_Trimmed(Paxos *p) : paxos(p) { }
53 void finish(int r) override {
54 paxos->trimming = false;
55 }
56};
57
58MonitorDBStore *Paxos::get_store()
59{
f67539c2 60 return mon.store;
7c673cae
FG
61}
62
63void Paxos::read_and_prepare_transactions(MonitorDBStore::TransactionRef tx,
64 version_t first, version_t last)
65{
66 dout(10) << __func__ << " first " << first << " last " << last << dendl;
67 for (version_t v = first; v <= last; ++v) {
68 dout(30) << __func__ << " apply version " << v << dendl;
69 bufferlist bl;
70 int err = get_store()->get(get_name(), v, bl);
11fdf7f2
TL
71 ceph_assert(err == 0);
72 ceph_assert(bl.length());
7c673cae
FG
73 decode_append_transaction(tx, bl);
74 }
75 dout(15) << __func__ << " total versions " << (last-first) << dendl;
76}
77
78void Paxos::init()
79{
80 // load paxos variables from stable storage
81 last_pn = get_store()->get(get_name(), "last_pn");
82 accepted_pn = get_store()->get(get_name(), "accepted_pn");
83 last_committed = get_store()->get(get_name(), "last_committed");
84 first_committed = get_store()->get(get_name(), "first_committed");
85
86 dout(10) << __func__ << " last_pn: " << last_pn << " accepted_pn: "
87 << accepted_pn << " last_committed: " << last_committed
88 << " first_committed: " << first_committed << dendl;
89
90 dout(10) << "init" << dendl;
11fdf7f2 91 ceph_assert(is_consistent());
7c673cae
FG
92}
93
94void Paxos::init_logger()
95{
96 PerfCountersBuilder pcb(g_ceph_context, "paxos", l_paxos_first, l_paxos_last);
3efd9988
FG
97
98 // Because monitors are so few in number, the resource cost of capturing
99 // almost all their perf counters at USEFUL is trivial.
100 pcb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
101
7c673cae
FG
102 pcb.add_u64_counter(l_paxos_start_leader, "start_leader", "Starts in leader role");
103 pcb.add_u64_counter(l_paxos_start_peon, "start_peon", "Starts in peon role");
104 pcb.add_u64_counter(l_paxos_restart, "restart", "Restarts");
105 pcb.add_u64_counter(l_paxos_refresh, "refresh", "Refreshes");
106 pcb.add_time_avg(l_paxos_refresh_latency, "refresh_latency", "Refresh latency");
107 pcb.add_u64_counter(l_paxos_begin, "begin", "Started and handled begins");
108 pcb.add_u64_avg(l_paxos_begin_keys, "begin_keys", "Keys in transaction on begin");
11fdf7f2 109 pcb.add_u64_avg(l_paxos_begin_bytes, "begin_bytes", "Data in transaction on begin", NULL, 0, unit_t(UNIT_BYTES));
7c673cae
FG
110 pcb.add_time_avg(l_paxos_begin_latency, "begin_latency", "Latency of begin operation");
111 pcb.add_u64_counter(l_paxos_commit, "commit",
112 "Commits", "cmt");
113 pcb.add_u64_avg(l_paxos_commit_keys, "commit_keys", "Keys in transaction on commit");
11fdf7f2 114 pcb.add_u64_avg(l_paxos_commit_bytes, "commit_bytes", "Data in transaction on commit", NULL, 0, unit_t(UNIT_BYTES));
7c673cae
FG
115 pcb.add_time_avg(l_paxos_commit_latency, "commit_latency",
116 "Commit latency", "clat");
117 pcb.add_u64_counter(l_paxos_collect, "collect", "Peon collects");
118 pcb.add_u64_avg(l_paxos_collect_keys, "collect_keys", "Keys in transaction on peon collect");
11fdf7f2 119 pcb.add_u64_avg(l_paxos_collect_bytes, "collect_bytes", "Data in transaction on peon collect", NULL, 0, unit_t(UNIT_BYTES));
7c673cae
FG
120 pcb.add_time_avg(l_paxos_collect_latency, "collect_latency", "Peon collect latency");
121 pcb.add_u64_counter(l_paxos_collect_uncommitted, "collect_uncommitted", "Uncommitted values in started and handled collects");
122 pcb.add_u64_counter(l_paxos_collect_timeout, "collect_timeout", "Collect timeouts");
123 pcb.add_u64_counter(l_paxos_accept_timeout, "accept_timeout", "Accept timeouts");
124 pcb.add_u64_counter(l_paxos_lease_ack_timeout, "lease_ack_timeout", "Lease acknowledgement timeouts");
125 pcb.add_u64_counter(l_paxos_lease_timeout, "lease_timeout", "Lease timeouts");
126 pcb.add_u64_counter(l_paxos_store_state, "store_state", "Store a shared state on disk");
127 pcb.add_u64_avg(l_paxos_store_state_keys, "store_state_keys", "Keys in transaction in stored state");
11fdf7f2 128 pcb.add_u64_avg(l_paxos_store_state_bytes, "store_state_bytes", "Data in transaction in stored state", NULL, 0, unit_t(UNIT_BYTES));
7c673cae
FG
129 pcb.add_time_avg(l_paxos_store_state_latency, "store_state_latency", "Storing state latency");
130 pcb.add_u64_counter(l_paxos_share_state, "share_state", "Sharings of state");
131 pcb.add_u64_avg(l_paxos_share_state_keys, "share_state_keys", "Keys in shared state");
11fdf7f2 132 pcb.add_u64_avg(l_paxos_share_state_bytes, "share_state_bytes", "Data in shared state", NULL, 0, unit_t(UNIT_BYTES));
7c673cae
FG
133 pcb.add_u64_counter(l_paxos_new_pn, "new_pn", "New proposal number queries");
134 pcb.add_time_avg(l_paxos_new_pn_latency, "new_pn_latency", "New proposal number getting latency");
135 logger = pcb.create_perf_counters();
136 g_ceph_context->get_perfcounters_collection()->add(logger);
137}
138
139void Paxos::dump_info(Formatter *f)
140{
141 f->open_object_section("paxos");
142 f->dump_unsigned("first_committed", first_committed);
143 f->dump_unsigned("last_committed", last_committed);
144 f->dump_unsigned("last_pn", last_pn);
145 f->dump_unsigned("accepted_pn", accepted_pn);
146 f->close_section();
147}
148
149// ---------------------------------
150
151// PHASE 1
152
153// leader
154void Paxos::collect(version_t oldpn)
155{
156 // we're recoverying, it seems!
157 state = STATE_RECOVERING;
f67539c2 158 ceph_assert(mon.is_leader());
7c673cae
FG
159
160 // reset the number of lasts received
161 uncommitted_v = 0;
162 uncommitted_pn = 0;
163 uncommitted_value.clear();
164 peer_first_committed.clear();
165 peer_last_committed.clear();
166
167 // look for uncommitted value
168 if (get_store()->exists(get_name(), last_committed+1)) {
169 version_t v = get_store()->get(get_name(), "pending_v");
170 version_t pn = get_store()->get(get_name(), "pending_pn");
171 if (v && pn && v == last_committed + 1) {
172 uncommitted_pn = pn;
173 } else {
174 dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn
175 << " and crossing our fingers" << dendl;
176 uncommitted_pn = accepted_pn;
177 }
178 uncommitted_v = last_committed+1;
179
180 get_store()->get(get_name(), last_committed+1, uncommitted_value);
11fdf7f2 181 ceph_assert(uncommitted_value.length());
7c673cae
FG
182 dout(10) << "learned uncommitted " << (last_committed+1)
183 << " pn " << uncommitted_pn
184 << " (" << uncommitted_value.length() << " bytes) from myself"
185 << dendl;
186
187 logger->inc(l_paxos_collect_uncommitted);
188 }
189
190 // pick new pn
11fdf7f2 191 accepted_pn = get_new_proposal_number(std::max(accepted_pn, oldpn));
7c673cae
FG
192 accepted_pn_from = last_committed;
193 num_last = 1;
194 dout(10) << "collect with pn " << accepted_pn << dendl;
195
196 // send collect
f67539c2
TL
197 for (auto p = mon.get_quorum().begin();
198 p != mon.get_quorum().end();
7c673cae 199 ++p) {
f67539c2
TL
200 if (*p == mon.rank) continue;
201
202 MMonPaxos *collect = new MMonPaxos(mon.get_epoch(), MMonPaxos::OP_COLLECT,
7c673cae
FG
203 ceph_clock_now());
204 collect->last_committed = last_committed;
205 collect->first_committed = first_committed;
206 collect->pn = accepted_pn;
f67539c2 207 mon.send_mon_message(collect, *p);
7c673cae
FG
208 }
209
210 // set timeout event
f67539c2 211 collect_timeout_event = mon.timer.add_event_after(
11fdf7f2
TL
212 g_conf()->mon_accept_timeout_factor *
213 g_conf()->mon_lease,
f67539c2 214 new C_MonContext{&mon, [this](int r) {
7c673cae
FG
215 if (r == -ECANCELED)
216 return;
217 collect_timeout();
9f95a23c 218 }});
7c673cae
FG
219}
220
221
222// peon
223void Paxos::handle_collect(MonOpRequestRef op)
224{
225
226 op->mark_paxos_event("handle_collect");
227
9f95a23c 228 auto collect = op->get_req<MMonPaxos>();
7c673cae
FG
229 dout(10) << "handle_collect " << *collect << dendl;
230
f67539c2 231 ceph_assert(mon.is_peon()); // mon epoch filter should catch strays
7c673cae
FG
232
233 // we're recoverying, it seems!
234 state = STATE_RECOVERING;
235
236 //update the peon recovery timeout
237 reset_lease_timeout();
238
239 if (collect->first_committed > last_committed+1) {
240 dout(2) << __func__
241 << " leader's lowest version is too high for our last committed"
242 << " (theirs: " << collect->first_committed
243 << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
244 op->mark_paxos_event("need to bootstrap");
f67539c2 245 mon.bootstrap();
7c673cae
FG
246 return;
247 }
248
249 // reply
f67539c2 250 MMonPaxos *last = new MMonPaxos(mon.get_epoch(), MMonPaxos::OP_LAST,
7c673cae
FG
251 ceph_clock_now());
252 last->last_committed = last_committed;
253 last->first_committed = first_committed;
254
255 version_t previous_pn = accepted_pn;
256
257 // can we accept this pn?
258 if (collect->pn > accepted_pn) {
259 // ok, accept it
260 accepted_pn = collect->pn;
261 accepted_pn_from = collect->pn_from;
262 dout(10) << "accepting pn " << accepted_pn << " from "
263 << accepted_pn_from << dendl;
264
265 auto t(std::make_shared<MonitorDBStore::Transaction>());
266 t->put(get_name(), "accepted_pn", accepted_pn);
267
268 dout(30) << __func__ << " transaction dump:\n";
269 JSONFormatter f(true);
270 t->dump(&f);
271 f.flush(*_dout);
272 *_dout << dendl;
273
274 logger->inc(l_paxos_collect);
275 logger->inc(l_paxos_collect_keys, t->get_keys());
276 logger->inc(l_paxos_collect_bytes, t->get_bytes());
7c673cae 277
11fdf7f2 278 auto start = ceph::coarse_mono_clock::now();
7c673cae 279 get_store()->apply_transaction(t);
11fdf7f2 280 auto end = ceph::coarse_mono_clock::now();
7c673cae 281
11fdf7f2 282 logger->tinc(l_paxos_collect_latency, to_timespan(end - start));
7c673cae
FG
283 } else {
284 // don't accept!
285 dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from
286 << ", we already accepted " << accepted_pn
287 << " from " << accepted_pn_from << dendl;
288 }
289 last->pn = accepted_pn;
290 last->pn_from = accepted_pn_from;
291
292 // share whatever committed values we have
293 if (collect->last_committed < last_committed)
294 share_state(last, collect->first_committed, collect->last_committed);
295
296 // do we have an accepted but uncommitted value?
297 // (it'll be at last_committed+1)
298 bufferlist bl;
299 if (collect->last_committed <= last_committed &&
300 get_store()->exists(get_name(), last_committed+1)) {
301 get_store()->get(get_name(), last_committed+1, bl);
11fdf7f2 302 ceph_assert(bl.length() > 0);
7c673cae
FG
303 dout(10) << " sharing our accepted but uncommitted value for "
304 << last_committed+1 << " (" << bl.length() << " bytes)" << dendl;
305 last->values[last_committed+1] = bl;
306
307 version_t v = get_store()->get(get_name(), "pending_v");
308 version_t pn = get_store()->get(get_name(), "pending_pn");
309 if (v && pn && v == last_committed + 1) {
310 last->uncommitted_pn = pn;
311 } else {
312 // previously we didn't record which pn a value was accepted
313 // under! use the pn value we just had... :(
314 dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << previous_pn
315 << " and crossing our fingers" << dendl;
316 last->uncommitted_pn = previous_pn;
317 }
318
319 logger->inc(l_paxos_collect_uncommitted);
320 }
321
322 // send reply
323 collect->get_connection()->send_message(last);
324}
325
326/**
327 * @note This is Okay. We share our versions between peer_last_committed and
328 * our last_committed (inclusive), and add their bufferlists to the
329 * message. It will be the peer's job to apply them to its store, as
330 * these bufferlists will contain raw transactions.
331 * This function is called by both the Peon and the Leader. The Peon will
332 * share the state with the Leader during handle_collect(), sharing any
333 * values the leader may be missing (i.e., the leader's last_committed is
334 * lower than the peon's last_committed). The Leader will share the state
335 * with the Peon during handle_last(), if the peon's last_committed is
336 * lower than the leader's last_committed.
337 */
338void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed,
339 version_t peer_last_committed)
340{
11fdf7f2 341 ceph_assert(peer_last_committed < last_committed);
7c673cae
FG
342
343 dout(10) << "share_state peer has fc " << peer_first_committed
344 << " lc " << peer_last_committed << dendl;
345 version_t v = peer_last_committed + 1;
346
347 // include incrementals
348 uint64_t bytes = 0;
349 for ( ; v <= last_committed; v++) {
350 if (get_store()->exists(get_name(), v)) {
351 get_store()->get(get_name(), v, m->values[v]);
11fdf7f2 352 ceph_assert(m->values[v].length());
7c673cae
FG
353 dout(10) << " sharing " << v << " ("
354 << m->values[v].length() << " bytes)" << dendl;
355 bytes += m->values[v].length() + 16; // paxos_ + 10 digits = 16
356 }
357 }
358 logger->inc(l_paxos_share_state);
359 logger->inc(l_paxos_share_state_keys, m->values.size());
360 logger->inc(l_paxos_share_state_bytes, bytes);
361
362 m->last_committed = last_committed;
363}
364
365/**
366 * Store on disk a state that was shared with us
367 *
368 * Basically, we received a set of version. Or just one. It doesn't matter.
369 * What matters is that we have to stash it in the store. So, we will simply
370 * write every single bufferlist into their own versions on our side (i.e.,
371 * onto paxos-related keys), and then we will decode those same bufferlists
372 * we just wrote and apply the transactions they hold. We will also update
373 * our first and last committed values to point to the new values, if need
374 * be. All all this is done tightly wrapped in a transaction to ensure we
375 * enjoy the atomicity guarantees given by our awesome k/v store.
376 */
377bool Paxos::store_state(MMonPaxos *m)
378{
379 auto t(std::make_shared<MonitorDBStore::Transaction>());
f67539c2 380 auto start = m->values.begin();
7c673cae
FG
381 bool changed = false;
382
383 // build map of values to store
384 // we want to write the range [last_committed, m->last_committed] only.
385 if (start != m->values.end() &&
386 start->first > last_committed + 1) {
387 // ignore everything if values start in the future.
388 dout(10) << "store_state ignoring all values, they start at " << start->first
389 << " > last_committed+1" << dendl;
390 return false;
391 }
392
393 // push forward the start position on the message's values iterator, up until
394 // we run out of positions or we find a position matching 'last_committed'.
395 while (start != m->values.end() && start->first <= last_committed) {
396 ++start;
397 }
398
399 // make sure we get the right interval of values to apply by pushing forward
400 // the 'end' iterator until it matches the message's 'last_committed'.
f67539c2 401 auto end = start;
7c673cae
FG
402 while (end != m->values.end() && end->first <= m->last_committed) {
403 last_committed = end->first;
404 ++end;
405 }
406
407 if (start == end) {
408 dout(10) << "store_state nothing to commit" << dendl;
409 } else {
410 dout(10) << "store_state [" << start->first << ".."
411 << last_committed << "]" << dendl;
412 t->put(get_name(), "last_committed", last_committed);
413
414 // we should apply the state here -- decode every single bufferlist in the
415 // map and append the transactions to 't'.
f67539c2 416 for (auto it = start; it != end; ++it) {
7c673cae
FG
417 // write the bufferlist as the version's value
418 t->put(get_name(), it->first, it->second);
419 // decode the bufferlist and append it to the transaction we will shortly
420 // apply.
421 decode_append_transaction(t, it->second);
422 }
423
424 // discard obsolete uncommitted value?
425 if (uncommitted_v && uncommitted_v <= last_committed) {
426 dout(10) << " forgetting obsolete uncommitted value " << uncommitted_v
427 << " pn " << uncommitted_pn << dendl;
428 uncommitted_v = 0;
429 uncommitted_pn = 0;
430 uncommitted_value.clear();
431 }
432 }
433 if (!t->empty()) {
434 dout(30) << __func__ << " transaction dump:\n";
435 JSONFormatter f(true);
436 t->dump(&f);
437 f.flush(*_dout);
438 *_dout << dendl;
439
440 logger->inc(l_paxos_store_state);
441 logger->inc(l_paxos_store_state_bytes, t->get_bytes());
442 logger->inc(l_paxos_store_state_keys, t->get_keys());
7c673cae 443
11fdf7f2 444 auto start = ceph::coarse_mono_clock::now();
7c673cae 445 get_store()->apply_transaction(t);
11fdf7f2 446 auto end = ceph::coarse_mono_clock::now();
7c673cae 447
11fdf7f2 448 logger->tinc(l_paxos_store_state_latency, to_timespan(end-start));
7c673cae
FG
449
450 // refresh first_committed; this txn may have trimmed.
451 first_committed = get_store()->get(get_name(), "first_committed");
452
453 _sanity_check_store();
454 changed = true;
455 }
456
457 return changed;
458}
459
460void Paxos::_sanity_check_store()
461{
462 version_t lc = get_store()->get(get_name(), "last_committed");
11fdf7f2 463 ceph_assert(lc == last_committed);
7c673cae
FG
464}
465
466
467// leader
// Handles an MMonPaxos reply during recovery (presumably the OP_LAST built by
// handle_collect() -- the dispatcher is not visible here).
//
// In order, this:
//  - drops the message if we are not the leader;
//  - records the sender's first/last committed versions;
//  - bootstraps if the sender's first_committed is beyond last_committed+1;
//  - stores any committed values carried by the message (store_state());
//  - pushes commits to every recorded peer that is behind us, or bootstraps
//    if a peer is too far behind our first_committed;
//  - compares the reply's pn against accepted_pn: higher restarts collect(),
//    equal counts the peer, lower is ignored;
//  - once num_last equals the quorum size, either re-proposes the learned
//    uncommitted value via begin() or extends the lease and refreshes.
468void Paxos::handle_last(MonOpRequestRef op)
469{
470 op->mark_paxos_event("handle_last");
9f95a23c 471 auto last = op->get_req<MMonPaxos>();
7c673cae
FG
// need_refresh ends up true when store_state() applied a transaction; it
// triggers the do_refresh() at the bottom of the function.
472 bool need_refresh = false;
473 int from = last->get_source().num();
474
475 dout(10) << "handle_last " << *last << dendl;
476
f67539c2 477 if (!mon.is_leader()) {
7c673cae
FG
478 dout(10) << "not leader, dropping" << dendl;
479 return;
480 }
481
482 // note peer's first_ and last_committed, in case we learn a new
483 // commit and need to push it to them.
484 peer_first_committed[from] = last->first_committed;
485 peer_last_committed[from] = last->last_committed;
486
// the peer's oldest version is newer than the next version we need, so
// there is a gap we cannot fill from its values.
487 if (last->first_committed > last_committed + 1) {
488 dout(5) << __func__
489 << " mon." << from
490 << " lowest version is too high for our last committed"
491 << " (theirs: " << last->first_committed
492 << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
493 op->mark_paxos_event("need to bootstrap");
f67539c2 494 mon.bootstrap();
7c673cae
FG
495 return;
496 }
497
// NOTE(review): the paxos_kill_at asserts fire when the config option equals
// the checkpoint number -- presumably a fault-injection hook for testing.
11fdf7f2 498 ceph_assert(g_conf()->paxos_kill_at != 1);
7c673cae
FG
499
500 // store any committed values if any are specified in the message
501 need_refresh = store_state(last);
502
11fdf7f2 503 ceph_assert(g_conf()->paxos_kill_at != 2);
7c673cae
FG
504
505 // is everyone contiguous and up to date?
f67539c2 506 for (auto p = peer_last_committed.begin();
7c673cae
FG
507 p != peer_last_committed.end();
508 ++p) {
// p->first is the peer's rank, p->second its last committed version.
509 if (p->second + 1 < first_committed && first_committed > 1) {
510 dout(5) << __func__
511 << " peon " << p->first
512 << " last_committed (" << p->second
513 << ") is too low for our first_committed (" << first_committed
514 << ") -- bootstrap!" << dendl;
515 op->mark_paxos_event("need to bootstrap");
f67539c2 516 mon.bootstrap();
7c673cae
FG
517 return;
518 }
519 if (p->second < last_committed) {
520 // share committed values
521 dout(10) << " sending commit to mon." << p->first << dendl;
f67539c2 522 MMonPaxos *commit = new MMonPaxos(mon.get_epoch(),
7c673cae
FG
523 MMonPaxos::OP_COMMIT,
524 ceph_clock_now());
525 share_state(commit, peer_first_committed[p->first], p->second);
f67539c2 526 mon.send_mon_message(commit, p->first);
7c673cae
FG
527 }
528 }
529
530 // do they accept your pn?
531 if (last->pn > accepted_pn) {
532 // no, try again.
533 dout(10) << " they had a higher pn than us, picking a new one." << dendl;
534
535 // cancel timeout event
f67539c2 536 mon.timer.cancel_event(collect_timeout_event);
7c673cae
FG
537 collect_timeout_event = 0;
538
// restart the collect phase, seeding the new pn from the peer's higher pn.
539 collect(last->pn);
540 } else if (last->pn == accepted_pn) {
541 // yes, they accepted our pn. great.
542 num_last++;
543 dout(10) << " they accepted our pn, we now have "
544 << num_last << " peons" << dendl;
545
546 // did this person send back an accepted but uncommitted value?
547 if (last->uncommitted_pn) {
// adopt the peer's value only if its pn is at least the one we already
// hold, the peer is not behind our last_committed, and its version is not
// older than the uncommitted version we already know of.
548 if (last->uncommitted_pn >= uncommitted_pn &&
549 last->last_committed >= last_committed &&
550 last->last_committed + 1 >= uncommitted_v) {
551 uncommitted_v = last->last_committed+1;
552 uncommitted_pn = last->uncommitted_pn;
553 uncommitted_value = last->values[uncommitted_v];
554 dout(10) << "we learned an uncommitted value for " << uncommitted_v
555 << " pn " << uncommitted_pn
556 << " " << uncommitted_value.length() << " bytes"
557 << dendl;
558 } else {
559 dout(10) << "ignoring uncommitted value for " << (last->last_committed+1)
560 << " pn " << last->uncommitted_pn
561 << " " << last->values[last->last_committed+1].length() << " bytes"
562 << dendl;
563 }
564 }
565
566 // is that everyone?
f67539c2 567 if (num_last == mon.get_quorum().size()) {
7c673cae 568 // cancel timeout event
f67539c2 569 mon.timer.cancel_event(collect_timeout_event);
7c673cae
FG
570 collect_timeout_event = 0;
571 peer_first_committed.clear();
572 peer_last_committed.clear();
573
574 // almost...
575
576 // did we learn an old value?
577 if (uncommitted_v == last_committed+1 &&
578 uncommitted_value.length()) {
579 dout(10) << "that's everyone. begin on old learned value" << dendl;
580 state = STATE_UPDATING_PREVIOUS;
581 begin(uncommitted_value);
582 } else {
583 // active!
584 dout(10) << "that's everyone. active!" << dendl;
585 extend_lease();
586
// we refresh right here, so clear the flag to skip the refresh at the end.
587 need_refresh = false;
588 if (do_refresh()) {
589 finish_round();
590 }
591 }
592 }
593 } else {
594 // no, this is an old message, discard
595 dout(10) << "old pn, ignoring" << dendl;
596 }
597
// store_state() changed the store and nothing above refreshed yet.
598 if (need_refresh)
599 (void)do_refresh();
600}
601
602void Paxos::collect_timeout()
603{
604 dout(1) << "collect timeout, calling fresh election" << dendl;
605 collect_timeout_event = 0;
606 logger->inc(l_paxos_collect_timeout);
f67539c2
TL
607 ceph_assert(mon.is_leader());
608 mon.bootstrap();
7c673cae
FG
609}
610
611
612// leader
613void Paxos::begin(bufferlist& v)
614{
615 dout(10) << "begin for " << last_committed+1 << " "
616 << v.length() << " bytes"
617 << dendl;
618
f67539c2 619 ceph_assert(mon.is_leader());
11fdf7f2 620 ceph_assert(is_updating() || is_updating_previous());
7c673cae
FG
621
622 // we must already have a majority for this to work.
f67539c2
TL
623 ceph_assert(mon.get_quorum().size() == 1 ||
624 num_last > (unsigned)mon.monmap->size()/2);
7c673cae
FG
625
626 // and no value, yet.
11fdf7f2 627 ceph_assert(new_value.length() == 0);
7c673cae
FG
628
629 // accept it ourselves
630 accepted.clear();
f67539c2 631 accepted.insert(mon.rank);
7c673cae
FG
632 new_value = v;
633
634 if (last_committed == 0) {
635 auto t(std::make_shared<MonitorDBStore::Transaction>());
636 // initial base case; set first_committed too
637 t->put(get_name(), "first_committed", 1);
638 decode_append_transaction(t, new_value);
639
640 bufferlist tx_bl;
641 t->encode(tx_bl);
642
643 new_value = tx_bl;
644 }
645
646 // store the proposed value in the store. IF it is accepted, we will then
647 // have to decode it into a transaction and apply it.
648 auto t(std::make_shared<MonitorDBStore::Transaction>());
649 t->put(get_name(), last_committed+1, new_value);
650
651 // note which pn this pending value is for.
652 t->put(get_name(), "pending_v", last_committed + 1);
653 t->put(get_name(), "pending_pn", accepted_pn);
654
655 dout(30) << __func__ << " transaction dump:\n";
656 JSONFormatter f(true);
657 t->dump(&f);
658 f.flush(*_dout);
659 auto debug_tx(std::make_shared<MonitorDBStore::Transaction>());
11fdf7f2 660 auto new_value_it = new_value.cbegin();
7c673cae
FG
661 debug_tx->decode(new_value_it);
662 debug_tx->dump(&f);
663 *_dout << "\nbl dump:\n";
664 f.flush(*_dout);
665 *_dout << dendl;
666
667 logger->inc(l_paxos_begin);
668 logger->inc(l_paxos_begin_keys, t->get_keys());
669 logger->inc(l_paxos_begin_bytes, t->get_bytes());
7c673cae 670
11fdf7f2 671 auto start = ceph::coarse_mono_clock::now();
7c673cae 672 get_store()->apply_transaction(t);
11fdf7f2 673 auto end = ceph::coarse_mono_clock::now();
7c673cae 674
11fdf7f2 675 logger->tinc(l_paxos_begin_latency, to_timespan(end - start));
7c673cae 676
11fdf7f2 677 ceph_assert(g_conf()->paxos_kill_at != 3);
7c673cae 678
f67539c2 679 if (mon.get_quorum().size() == 1) {
7c673cae
FG
680 // we're alone, take it easy
681 commit_start();
682 return;
683 }
684
685 // ask others to accept it too!
f67539c2
TL
686 for (auto p = mon.get_quorum().begin();
687 p != mon.get_quorum().end();
7c673cae 688 ++p) {
f67539c2 689 if (*p == mon.rank) continue;
7c673cae
FG
690
691 dout(10) << " sending begin to mon." << *p << dendl;
f67539c2 692 MMonPaxos *begin = new MMonPaxos(mon.get_epoch(), MMonPaxos::OP_BEGIN,
7c673cae
FG
693 ceph_clock_now());
694 begin->values[last_committed+1] = new_value;
695 begin->last_committed = last_committed;
696 begin->pn = accepted_pn;
697
f67539c2 698 mon.send_mon_message(begin, *p);
7c673cae
FG
699 }
700
701 // set timeout event
f67539c2 702 accept_timeout_event = mon.timer.add_event_after(
11fdf7f2 703 g_conf()->mon_accept_timeout_factor * g_conf()->mon_lease,
f67539c2 704 new C_MonContext{&mon, [this](int r) {
3efd9988
FG
705 if (r == -ECANCELED)
706 return;
707 accept_timeout();
9f95a23c 708 }});
7c673cae
FG
709}
710
711// peon
712void Paxos::handle_begin(MonOpRequestRef op)
713{
714 op->mark_paxos_event("handle_begin");
9f95a23c 715 auto begin = op->get_req<MMonPaxos>();
7c673cae
FG
716 dout(10) << "handle_begin " << *begin << dendl;
717
718 // can we accept this?
719 if (begin->pn < accepted_pn) {
720 dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
721 op->mark_paxos_event("have higher pn, ignore");
722 return;
723 }
11fdf7f2
TL
724 ceph_assert(begin->pn == accepted_pn);
725 ceph_assert(begin->last_committed == last_committed);
7c673cae 726
11fdf7f2 727 ceph_assert(g_conf()->paxos_kill_at != 4);
7c673cae
FG
728
729 logger->inc(l_paxos_begin);
730
731 // set state.
732 state = STATE_UPDATING;
9f95a23c 733 lease_expire = {}; // cancel lease
7c673cae
FG
734
735 // yes.
736 version_t v = last_committed+1;
737 dout(10) << "accepting value for " << v << " pn " << accepted_pn << dendl;
738 // store the accepted value onto our store. We will have to decode it and
739 // apply its transaction once we receive permission to commit.
740 auto t(std::make_shared<MonitorDBStore::Transaction>());
741 t->put(get_name(), v, begin->values[v]);
742
743 // note which pn this pending value is for.
744 t->put(get_name(), "pending_v", v);
745 t->put(get_name(), "pending_pn", accepted_pn);
746
747 dout(30) << __func__ << " transaction dump:\n";
748 JSONFormatter f(true);
749 t->dump(&f);
750 f.flush(*_dout);
751 *_dout << dendl;
752
753 logger->inc(l_paxos_begin_bytes, t->get_bytes());
7c673cae 754
11fdf7f2 755 auto start = ceph::coarse_mono_clock::now();
7c673cae 756 get_store()->apply_transaction(t);
11fdf7f2 757 auto end = ceph::coarse_mono_clock::now();
7c673cae 758
11fdf7f2 759 logger->tinc(l_paxos_begin_latency, to_timespan(end - start));
7c673cae 760
11fdf7f2 761 ceph_assert(g_conf()->paxos_kill_at != 5);
7c673cae
FG
762
763 // reply
f67539c2 764 MMonPaxos *accept = new MMonPaxos(mon.get_epoch(), MMonPaxos::OP_ACCEPT,
7c673cae
FG
765 ceph_clock_now());
766 accept->pn = accepted_pn;
767 accept->last_committed = last_committed;
768 begin->get_connection()->send_message(accept);
769}
770
771// leader
772void Paxos::handle_accept(MonOpRequestRef op)
773{
774 op->mark_paxos_event("handle_accept");
9f95a23c 775 auto accept = op->get_req<MMonPaxos>();
7c673cae
FG
776 dout(10) << "handle_accept " << *accept << dendl;
777 int from = accept->get_source().num();
778
779 if (accept->pn != accepted_pn) {
780 // we accepted a higher pn, from some other leader
781 dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
782 op->mark_paxos_event("have higher pn, ignore");
783 return;
784 }
785 if (last_committed > 0 &&
786 accept->last_committed < last_committed-1) {
787 dout(10) << " this is from an old round, ignoring" << dendl;
788 op->mark_paxos_event("old round, ignore");
789 return;
790 }
11fdf7f2 791 ceph_assert(accept->last_committed == last_committed || // not committed
7c673cae
FG
792 accept->last_committed == last_committed-1); // committed
793
11fdf7f2
TL
794 ceph_assert(is_updating() || is_updating_previous());
795 ceph_assert(accepted.count(from) == 0);
7c673cae
FG
796 accepted.insert(from);
797 dout(10) << " now " << accepted << " have accepted" << dendl;
798
11fdf7f2 799 ceph_assert(g_conf()->paxos_kill_at != 6);
7c673cae
FG
800
801 // only commit (and expose committed state) when we get *all* quorum
802 // members to accept. otherwise, they may still be sharing the now
803 // stale state.
804 // FIXME: we can improve this with an additional lease revocation message
805 // that doesn't block for the persist.
f67539c2 806 if (accepted == mon.get_quorum()) {
7c673cae
FG
807 // yay, commit!
808 dout(10) << " got majority, committing, done with update" << dendl;
809 op->mark_paxos_event("commit_start");
810 commit_start();
811 }
812}
813
814void Paxos::accept_timeout()
815{
816 dout(1) << "accept timeout, calling fresh election" << dendl;
817 accept_timeout_event = 0;
f67539c2 818 ceph_assert(mon.is_leader());
11fdf7f2 819 ceph_assert(is_updating() || is_updating_previous() || is_writing() ||
7c673cae
FG
820 is_writing_previous());
821 logger->inc(l_paxos_accept_timeout);
f67539c2 822 mon.bootstrap();
7c673cae
FG
823}
824
825struct C_Committed : public Context {
826 Paxos *paxos;
827 explicit C_Committed(Paxos *p) : paxos(p) {}
828 void finish(int r) override {
11fdf7f2 829 ceph_assert(r >= 0);
f67539c2 830 std::lock_guard l(paxos->mon.lock);
35e4c445
FG
831 if (paxos->is_shutdown()) {
832 paxos->abort_commit();
833 return;
834 }
7c673cae
FG
835 paxos->commit_finish();
836 }
837};
838
35e4c445
FG
839void Paxos::abort_commit()
840{
11fdf7f2 841 ceph_assert(commits_started > 0);
35e4c445
FG
842 --commits_started;
843 if (commits_started == 0)
9f95a23c 844 shutdown_cond.notify_all();
35e4c445
FG
845}
846
7c673cae
FG
847void Paxos::commit_start()
848{
849 dout(10) << __func__ << " " << (last_committed+1) << dendl;
850
11fdf7f2 851 ceph_assert(g_conf()->paxos_kill_at != 7);
7c673cae
FG
852
853 auto t(std::make_shared<MonitorDBStore::Transaction>());
854
855 // commit locally
856 t->put(get_name(), "last_committed", last_committed + 1);
857
858 // decode the value and apply its transaction to the store.
859 // this value can now be read from last_committed.
860 decode_append_transaction(t, new_value);
861
862 dout(30) << __func__ << " transaction dump:\n";
863 JSONFormatter f(true);
864 t->dump(&f);
865 f.flush(*_dout);
866 *_dout << dendl;
867
868 logger->inc(l_paxos_commit);
869 logger->inc(l_paxos_commit_keys, t->get_keys());
870 logger->inc(l_paxos_commit_bytes, t->get_bytes());
871 commit_start_stamp = ceph_clock_now();
872
873 get_store()->queue_transaction(t, new C_Committed(this));
874
875 if (is_updating_previous())
876 state = STATE_WRITING_PREVIOUS;
877 else if (is_updating())
878 state = STATE_WRITING;
879 else
880 ceph_abort();
35e4c445 881 ++commits_started;
7c673cae 882
f67539c2 883 if (mon.get_quorum().size() > 1) {
7c673cae 884 // cancel timeout event
f67539c2 885 mon.timer.cancel_event(accept_timeout_event);
7c673cae
FG
886 accept_timeout_event = 0;
887 }
888}
889
890void Paxos::commit_finish()
891{
892 dout(20) << __func__ << " " << (last_committed+1) << dendl;
893 utime_t end = ceph_clock_now();
894 logger->tinc(l_paxos_commit_latency, end - commit_start_stamp);
895
11fdf7f2 896 ceph_assert(g_conf()->paxos_kill_at != 8);
7c673cae
FG
897
898 // cancel lease - it was for the old value.
899 // (this would only happen if message layer lost the 'begin', but
900 // leader still got a majority and committed with out us.)
9f95a23c 901 lease_expire = {}; // cancel lease
7c673cae
FG
902
903 last_committed++;
904 last_commit_time = ceph_clock_now();
905
906 // refresh first_committed; this txn may have trimmed.
907 first_committed = get_store()->get(get_name(), "first_committed");
908
909 _sanity_check_store();
910
911 // tell everyone
f67539c2
TL
912 for (auto p = mon.get_quorum().begin();
913 p != mon.get_quorum().end();
7c673cae 914 ++p) {
f67539c2 915 if (*p == mon.rank) continue;
7c673cae
FG
916
917 dout(10) << " sending commit to mon." << *p << dendl;
f67539c2 918 MMonPaxos *commit = new MMonPaxos(mon.get_epoch(), MMonPaxos::OP_COMMIT,
7c673cae
FG
919 ceph_clock_now());
920 commit->values[last_committed] = new_value;
921 commit->pn = accepted_pn;
922 commit->last_committed = last_committed;
923
f67539c2 924 mon.send_mon_message(commit, *p);
7c673cae
FG
925 }
926
11fdf7f2 927 ceph_assert(g_conf()->paxos_kill_at != 9);
7c673cae
FG
928
929 // get ready for a new round.
930 new_value.clear();
931
932 // WRITING -> REFRESH
f67539c2 933 // among other things, this lets do_refresh() -> mon.bootstrap() ->
11fdf7f2
TL
934 // wait_for_paxos_write() know that it doesn't need to flush the store
935 // queue. and it should not, as we are in the async completion thread now!
936 ceph_assert(is_writing() || is_writing_previous());
7c673cae 937 state = STATE_REFRESH;
11fdf7f2 938 ceph_assert(commits_started > 0);
35e4c445 939 --commits_started;
7c673cae
FG
940
941 if (do_refresh()) {
942 commit_proposal();
f67539c2 943 if (mon.get_quorum().size() > 1) {
7c673cae
FG
944 extend_lease();
945 }
946
11fdf7f2 947 ceph_assert(g_conf()->paxos_kill_at != 10);
7c673cae
FG
948
949 finish_round();
950 }
951}
952
953
954void Paxos::handle_commit(MonOpRequestRef op)
955{
956 op->mark_paxos_event("handle_commit");
9f95a23c 957 auto commit = op->get_req<MMonPaxos>();
7c673cae
FG
958 dout(10) << "handle_commit on " << commit->last_committed << dendl;
959
960 logger->inc(l_paxos_commit);
961
f67539c2 962 if (!mon.is_peon()) {
7c673cae
FG
963 dout(10) << "not a peon, dropping" << dendl;
964 ceph_abort();
965 return;
966 }
967
968 op->mark_paxos_event("store_state");
969 store_state(commit);
970
11fdf7f2 971 (void)do_refresh();
7c673cae
FG
972}
973
974void Paxos::extend_lease()
975{
f67539c2 976 ceph_assert(mon.is_leader());
7c673cae
FG
977 //assert(is_active());
978
9f95a23c
TL
979 lease_expire = ceph::real_clock::now();
980 lease_expire += ceph::make_timespan(g_conf()->mon_lease);
7c673cae 981 acked_lease.clear();
f67539c2 982 acked_lease.insert(mon.rank);
7c673cae 983
11fdf7f2 984 dout(7) << "extend_lease now+" << g_conf()->mon_lease
7c673cae
FG
985 << " (" << lease_expire << ")" << dendl;
986
987 // bcast
f67539c2
TL
988 for (auto p = mon.get_quorum().begin();
989 p != mon.get_quorum().end(); ++p) {
7c673cae 990
f67539c2
TL
991 if (*p == mon.rank) continue;
992 MMonPaxos *lease = new MMonPaxos(mon.get_epoch(), MMonPaxos::OP_LEASE,
7c673cae
FG
993 ceph_clock_now());
994 lease->last_committed = last_committed;
9f95a23c 995 lease->lease_timestamp = utime_t{lease_expire};
7c673cae 996 lease->first_committed = first_committed;
f67539c2 997 mon.send_mon_message(lease, *p);
7c673cae
FG
998 }
999
1000 // set timeout event.
1001 // if old timeout is still in place, leave it.
1002 if (!lease_ack_timeout_event) {
f67539c2 1003 lease_ack_timeout_event = mon.timer.add_event_after(
11fdf7f2 1004 g_conf()->mon_lease_ack_timeout_factor * g_conf()->mon_lease,
f67539c2 1005 new C_MonContext{&mon, [this](int r) {
3efd9988
FG
1006 if (r == -ECANCELED)
1007 return;
1008 lease_ack_timeout();
9f95a23c 1009 }});
7c673cae
FG
1010 }
1011
1012 // set renew event
9f95a23c
TL
1013 auto at = lease_expire;
1014 at -= ceph::make_timespan(g_conf()->mon_lease);
1015 at += ceph::make_timespan(g_conf()->mon_lease_renew_interval_factor *
1016 g_conf()->mon_lease);
f67539c2
TL
1017 lease_renew_event = mon.timer.add_event_at(
1018 at, new C_MonContext{&mon, [this](int r) {
3efd9988
FG
1019 if (r == -ECANCELED)
1020 return;
1021 lease_renew_timeout();
9f95a23c 1022 }});
7c673cae
FG
1023}
1024
1025void Paxos::warn_on_future_time(utime_t t, entity_name_t from)
1026{
1027 utime_t now = ceph_clock_now();
1028 if (t > now) {
1029 utime_t diff = t - now;
11fdf7f2 1030 if (diff > g_conf()->mon_clock_drift_allowed) {
7c673cae
FG
1031 utime_t warn_diff = now - last_clock_drift_warn;
1032 if (warn_diff >
11fdf7f2 1033 pow(g_conf()->mon_clock_drift_warn_backoff, clock_drift_warned)) {
f67539c2 1034 mon.clog->warn() << "message from " << from << " was stamped " << diff
7c673cae
FG
1035 << "s in the future, clocks not synchronized";
1036 last_clock_drift_warn = ceph_clock_now();
1037 ++clock_drift_warned;
1038 }
1039 }
1040 }
1041
1042}
1043
1044bool Paxos::do_refresh()
1045{
1046 bool need_bootstrap = false;
1047
7c673cae 1048 // make sure we have the latest state loaded up
11fdf7f2 1049 auto start = ceph::coarse_mono_clock::now();
f67539c2 1050 mon.refresh_from_paxos(&need_bootstrap);
11fdf7f2 1051 auto end = ceph::coarse_mono_clock::now();
7c673cae 1052
7c673cae 1053 logger->inc(l_paxos_refresh);
11fdf7f2 1054 logger->tinc(l_paxos_refresh_latency, to_timespan(end - start));
7c673cae
FG
1055
1056 if (need_bootstrap) {
1057 dout(10) << " doing requested bootstrap" << dendl;
f67539c2 1058 mon.bootstrap();
7c673cae
FG
1059 return false;
1060 }
1061
1062 return true;
1063}
1064
1065void Paxos::commit_proposal()
1066{
1067 dout(10) << __func__ << dendl;
f67539c2 1068 ceph_assert(mon.is_leader());
11fdf7f2 1069 ceph_assert(is_refresh());
7c673cae
FG
1070
1071 finish_contexts(g_ceph_context, committing_finishers);
1072}
1073
1074void Paxos::finish_round()
1075{
1076 dout(10) << __func__ << dendl;
f67539c2 1077 ceph_assert(mon.is_leader());
7c673cae
FG
1078
1079 // ok, now go active!
1080 state = STATE_ACTIVE;
1081
1082 dout(20) << __func__ << " waiting_for_acting" << dendl;
1083 finish_contexts(g_ceph_context, waiting_for_active);
1084 dout(20) << __func__ << " waiting_for_readable" << dendl;
1085 finish_contexts(g_ceph_context, waiting_for_readable);
1086 dout(20) << __func__ << " waiting_for_writeable" << dendl;
1087 finish_contexts(g_ceph_context, waiting_for_writeable);
1088
1089 dout(10) << __func__ << " done w/ waiters, state " << get_statename(state) << dendl;
1090
1091 if (should_trim()) {
1092 trim();
1093 }
1094
1095 if (is_active() && pending_proposal) {
1096 propose_pending();
1097 }
1098}
1099
1100
1101// peon
1102void Paxos::handle_lease(MonOpRequestRef op)
1103{
1104 op->mark_paxos_event("handle_lease");
9f95a23c 1105 auto lease = op->get_req<MMonPaxos>();
7c673cae 1106 // sanity
f67539c2 1107 if (!mon.is_peon() ||
7c673cae
FG
1108 last_committed != lease->last_committed) {
1109 dout(10) << "handle_lease i'm not a peon, or they're not the leader,"
1110 << " or the last_committed doesn't match, dropping" << dendl;
1111 op->mark_paxos_event("invalid lease, ignore");
1112 return;
1113 }
1114
1115 warn_on_future_time(lease->sent_timestamp, lease->get_source());
1116
1117 // extend lease
9f95a23c
TL
1118 if (auto new_expire = lease->lease_timestamp.to_real_time();
1119 lease_expire < new_expire) {
1120 lease_expire = new_expire;
7c673cae 1121
9f95a23c 1122 auto now = ceph::real_clock::now();
7c673cae 1123 if (lease_expire < now) {
9f95a23c 1124 auto diff = now - lease_expire;
7c673cae
FG
1125 derr << "lease_expire from " << lease->get_source_inst() << " is " << diff << " seconds in the past; mons are probably laggy (or possibly clocks are too skewed)" << dendl;
1126 }
1127 }
1128
1129 state = STATE_ACTIVE;
1130
1131 dout(10) << "handle_lease on " << lease->last_committed
1132 << " now " << lease_expire << dendl;
1133
1134 // ack
f67539c2 1135 MMonPaxos *ack = new MMonPaxos(mon.get_epoch(), MMonPaxos::OP_LEASE_ACK,
7c673cae
FG
1136 ceph_clock_now());
1137 ack->last_committed = last_committed;
1138 ack->first_committed = first_committed;
1139 ack->lease_timestamp = ceph_clock_now();
f67539c2 1140 encode(mon.session_map.feature_map, ack->feature_map);
7c673cae
FG
1141 lease->get_connection()->send_message(ack);
1142
1143 // (re)set timeout event.
1144 reset_lease_timeout();
1145
1146 // kick waiters
1147 finish_contexts(g_ceph_context, waiting_for_active);
1148 if (is_readable())
1149 finish_contexts(g_ceph_context, waiting_for_readable);
1150}
1151
1152void Paxos::handle_lease_ack(MonOpRequestRef op)
1153{
1154 op->mark_paxos_event("handle_lease_ack");
9f95a23c 1155 auto ack = op->get_req<MMonPaxos>();
7c673cae
FG
1156 int from = ack->get_source().num();
1157
1158 if (!lease_ack_timeout_event) {
224ce89b 1159 dout(10) << "handle_lease_ack from " << ack->get_source()
7c673cae 1160 << " -- stray (probably since revoked)" << dendl;
224ce89b
WB
1161
1162 } else if (acked_lease.count(from) == 0) {
7c673cae 1163 acked_lease.insert(from);
31f18b77 1164 if (ack->feature_map.length()) {
11fdf7f2 1165 auto p = ack->feature_map.cbegin();
f67539c2 1166 FeatureMap& t = mon.quorum_feature_map[from];
11fdf7f2 1167 decode(t, p);
31f18b77 1168 }
f67539c2 1169 if (acked_lease == mon.get_quorum()) {
7c673cae 1170 // yay!
224ce89b 1171 dout(10) << "handle_lease_ack from " << ack->get_source()
7c673cae 1172 << " -- got everyone" << dendl;
f67539c2 1173 mon.timer.cancel_event(lease_ack_timeout_event);
7c673cae
FG
1174 lease_ack_timeout_event = 0;
1175
1176
1177 } else {
224ce89b 1178 dout(10) << "handle_lease_ack from " << ack->get_source()
7c673cae 1179 << " -- still need "
f67539c2 1180 << mon.get_quorum().size() - acked_lease.size()
7c673cae
FG
1181 << " more" << dendl;
1182 }
1183 } else {
224ce89b 1184 dout(10) << "handle_lease_ack from " << ack->get_source()
7c673cae
FG
1185 << " dup (lagging!), ignoring" << dendl;
1186 }
1187
1188 warn_on_future_time(ack->sent_timestamp, ack->get_source());
1189}
1190
1191void Paxos::lease_ack_timeout()
1192{
1193 dout(1) << "lease_ack_timeout -- calling new election" << dendl;
f67539c2 1194 ceph_assert(mon.is_leader());
11fdf7f2 1195 ceph_assert(is_active());
7c673cae
FG
1196 logger->inc(l_paxos_lease_ack_timeout);
1197 lease_ack_timeout_event = 0;
f67539c2 1198 mon.bootstrap();
7c673cae
FG
1199}
1200
1201void Paxos::reset_lease_timeout()
1202{
1203 dout(20) << "reset_lease_timeout - setting timeout event" << dendl;
1204 if (lease_timeout_event)
f67539c2
TL
1205 mon.timer.cancel_event(lease_timeout_event);
1206 lease_timeout_event = mon.timer.add_event_after(
11fdf7f2 1207 g_conf()->mon_lease_ack_timeout_factor * g_conf()->mon_lease,
f67539c2 1208 new C_MonContext{&mon, [this](int r) {
3efd9988
FG
1209 if (r == -ECANCELED)
1210 return;
1211 lease_timeout();
9f95a23c 1212 }});
7c673cae
FG
1213}
1214
1215void Paxos::lease_timeout()
1216{
1217 dout(1) << "lease_timeout -- calling new election" << dendl;
f67539c2 1218 ceph_assert(mon.is_peon());
7c673cae
FG
1219 logger->inc(l_paxos_lease_timeout);
1220 lease_timeout_event = 0;
f67539c2 1221 mon.bootstrap();
7c673cae
FG
1222}
1223
1224void Paxos::lease_renew_timeout()
1225{
1226 lease_renew_event = 0;
1227 extend_lease();
1228}
1229
1230
1231/*
1232 * trim old states
1233 */
1234void Paxos::trim()
1235{
11fdf7f2
TL
1236 ceph_assert(should_trim());
1237 version_t end = std::min(get_version() - g_conf()->paxos_min,
1238 get_first_committed() + g_conf()->paxos_trim_max);
7c673cae
FG
1239
1240 if (first_committed >= end)
1241 return;
1242
1243 dout(10) << "trim to " << end << " (was " << first_committed << ")" << dendl;
1244
1245 MonitorDBStore::TransactionRef t = get_pending_transaction();
1246
1247 for (version_t v = first_committed; v < end; ++v) {
1248 dout(10) << "trim " << v << dendl;
1249 t->erase(get_name(), v);
1250 }
1251 t->put(get_name(), "first_committed", end);
11fdf7f2 1252 if (g_conf()->mon_compact_on_trim) {
7c673cae
FG
1253 dout(10) << " compacting trimmed range" << dendl;
1254 t->compact_range(get_name(), stringify(first_committed - 1), stringify(end));
1255 }
1256
1257 trimming = true;
1258 queue_pending_finisher(new C_Trimmed(this));
1259}
1260
1261/*
1262 * return a globally unique, monotonically increasing proposal number
1263 */
1264version_t Paxos::get_new_proposal_number(version_t gt)
1265{
1266 if (last_pn < gt)
1267 last_pn = gt;
1268
1269 // update. make it unique among all monitors.
1270 last_pn /= 100;
1271 last_pn++;
1272 last_pn *= 100;
f67539c2 1273 last_pn += (version_t)mon.rank;
7c673cae
FG
1274
1275 // write
1276 auto t(std::make_shared<MonitorDBStore::Transaction>());
1277 t->put(get_name(), "last_pn", last_pn);
1278
1279 dout(30) << __func__ << " transaction dump:\n";
1280 JSONFormatter f(true);
1281 t->dump(&f);
1282 f.flush(*_dout);
1283 *_dout << dendl;
1284
1285 logger->inc(l_paxos_new_pn);
7c673cae 1286
11fdf7f2 1287 auto start = ceph::coarse_mono_clock::now();
7c673cae 1288 get_store()->apply_transaction(t);
11fdf7f2 1289 auto end = ceph::coarse_mono_clock::now();
7c673cae 1290
11fdf7f2 1291 logger->tinc(l_paxos_new_pn_latency, to_timespan(end - start));
7c673cae
FG
1292
1293 dout(10) << "get_new_proposal_number = " << last_pn << dendl;
1294 return last_pn;
1295}
1296
1297
1298void Paxos::cancel_events()
1299{
1300 if (collect_timeout_event) {
f67539c2 1301 mon.timer.cancel_event(collect_timeout_event);
7c673cae
FG
1302 collect_timeout_event = 0;
1303 }
1304 if (accept_timeout_event) {
f67539c2 1305 mon.timer.cancel_event(accept_timeout_event);
7c673cae
FG
1306 accept_timeout_event = 0;
1307 }
1308 if (lease_renew_event) {
f67539c2 1309 mon.timer.cancel_event(lease_renew_event);
7c673cae
FG
1310 lease_renew_event = 0;
1311 }
1312 if (lease_ack_timeout_event) {
f67539c2 1313 mon.timer.cancel_event(lease_ack_timeout_event);
7c673cae
FG
1314 lease_ack_timeout_event = 0;
1315 }
1316 if (lease_timeout_event) {
f67539c2 1317 mon.timer.cancel_event(lease_timeout_event);
7c673cae
FG
1318 lease_timeout_event = 0;
1319 }
1320}
1321
1322void Paxos::shutdown()
1323{
1324 dout(10) << __func__ << " cancel all contexts" << dendl;
1325
35e4c445
FG
1326 state = STATE_SHUTDOWN;
1327
7c673cae
FG
1328 // discard pending transaction
1329 pending_proposal.reset();
1330
35e4c445
FG
1331 // Let store finish commits in progress
1332 // XXX: I assume I can't use finish_contexts() because the store
1333 // is going to trigger
f67539c2 1334 unique_lock l{mon.lock, std::adopt_lock};
9f95a23c
TL
1335 shutdown_cond.wait(l, [this] { return commits_started <= 0; });
1336 // Monitor::shutdown() will unlock it
1337 l.release();
35e4c445 1338
7c673cae 1339 finish_contexts(g_ceph_context, waiting_for_writeable, -ECANCELED);
7c673cae
FG
1340 finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED);
1341 finish_contexts(g_ceph_context, waiting_for_active, -ECANCELED);
1342 finish_contexts(g_ceph_context, pending_finishers, -ECANCELED);
1343 finish_contexts(g_ceph_context, committing_finishers, -ECANCELED);
1344 if (logger)
1345 g_ceph_context->get_perfcounters_collection()->remove(logger);
7c673cae
FG
1346}
1347
1348void Paxos::leader_init()
1349{
1350 cancel_events();
1351 new_value.clear();
1352
1353 // discard pending transaction
1354 pending_proposal.reset();
1355
494da23a 1356 reset_pending_committing_finishers();
7c673cae
FG
1357
1358 logger->inc(l_paxos_start_leader);
1359
f67539c2 1360 if (mon.get_quorum().size() == 1) {
7c673cae
FG
1361 state = STATE_ACTIVE;
1362 return;
1363 }
1364
1365 state = STATE_RECOVERING;
9f95a23c 1366 lease_expire = {};
7c673cae
FG
1367 dout(10) << "leader_init -- starting paxos recovery" << dendl;
1368 collect(0);
1369}
1370
1371void Paxos::peon_init()
1372{
1373 cancel_events();
1374 new_value.clear();
1375
1376 state = STATE_RECOVERING;
9f95a23c 1377 lease_expire = {};
7c673cae
FG
1378 dout(10) << "peon_init -- i am a peon" << dendl;
1379
1380 // start a timer, in case the leader never manages to issue a lease
1381 reset_lease_timeout();
1382
1383 // discard pending transaction
1384 pending_proposal.reset();
1385
1386 // no chance to write now!
494da23a 1387 reset_pending_committing_finishers();
7c673cae 1388 finish_contexts(g_ceph_context, waiting_for_writeable, -EAGAIN);
7c673cae
FG
1389
1390 logger->inc(l_paxos_start_peon);
1391}
1392
1393void Paxos::restart()
1394{
1395 dout(10) << "restart -- canceling timeouts" << dendl;
1396 cancel_events();
1397 new_value.clear();
1398
1399 if (is_writing() || is_writing_previous()) {
1400 dout(10) << __func__ << " flushing" << dendl;
f67539c2
TL
1401 mon.lock.unlock();
1402 mon.store->flush();
1403 mon.lock.lock();
7c673cae
FG
1404 dout(10) << __func__ << " flushed" << dendl;
1405 }
1406 state = STATE_RECOVERING;
1407
1408 // discard pending transaction
1409 pending_proposal.reset();
1410
494da23a 1411 reset_pending_committing_finishers();
7c673cae
FG
1412 finish_contexts(g_ceph_context, waiting_for_active, -EAGAIN);
1413
1414 logger->inc(l_paxos_restart);
1415}
1416
494da23a
TL
1417void Paxos::reset_pending_committing_finishers()
1418{
1419 committing_finishers.splice(committing_finishers.end(), pending_finishers);
1420 finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
1421}
7c673cae
FG
1422
1423void Paxos::dispatch(MonOpRequestRef op)
1424{
11fdf7f2 1425 ceph_assert(op->is_type_paxos());
7c673cae 1426 op->mark_paxos_event("dispatch");
9f95a23c
TL
1427
1428 if (op->get_req()->get_type() != MSG_MON_PAXOS) {
1429 dout(0) << "Got unexpected message type " << op->get_req()->get_type()
1430 << " in Paxos::dispatch, aborting!" << dendl;
1431 ceph_abort();
1432 }
1433
1434 auto *req = op->get_req<MMonPaxos>();
1435
7c673cae 1436 // election in progress?
f67539c2 1437 if (!mon.is_leader() && !mon.is_peon()) {
9f95a23c 1438 dout(5) << "election in progress, dropping " << *req << dendl;
7c673cae
FG
1439 return;
1440 }
1441
1442 // check sanity
f67539c2
TL
1443 ceph_assert(mon.is_leader() ||
1444 (mon.is_peon() && req->get_source().num() == mon.get_leader()));
9f95a23c
TL
1445
1446 // NOTE: these ops are defined in messages/MMonPaxos.h
1447 switch (req->op) {
1448 // learner
1449 case MMonPaxos::OP_COLLECT:
1450 handle_collect(op);
1451 break;
1452 case MMonPaxos::OP_LAST:
1453 handle_last(op);
1454 break;
1455 case MMonPaxos::OP_BEGIN:
1456 handle_begin(op);
1457 break;
1458 case MMonPaxos::OP_ACCEPT:
1459 handle_accept(op);
1460 break;
1461 case MMonPaxos::OP_COMMIT:
1462 handle_commit(op);
1463 break;
1464 case MMonPaxos::OP_LEASE:
1465 handle_lease(op);
1466 break;
1467 case MMonPaxos::OP_LEASE_ACK:
1468 handle_lease_ack(op);
7c673cae 1469 break;
7c673cae
FG
1470 default:
1471 ceph_abort();
1472 }
1473}
1474
1475
1476// -----------------
1477// service interface
1478
1479// -- READ --
1480
1481bool Paxos::is_readable(version_t v)
1482{
1483 bool ret;
1484 if (v > last_committed)
1485 ret = false;
1486 else
1487 ret =
f67539c2 1488 (mon.is_peon() || mon.is_leader()) &&
7c673cae
FG
1489 (is_active() || is_updating() || is_writing()) &&
1490 last_committed > 0 && is_lease_valid(); // must have a value alone, or have lease
1491 dout(5) << __func__ << " = " << (int)ret
1492 << " - now=" << ceph_clock_now()
1493 << " lease_expire=" << lease_expire
1494 << " has v" << v << " lc " << last_committed
1495 << dendl;
1496 return ret;
1497}
1498
1499bool Paxos::read(version_t v, bufferlist &bl)
1500{
1501 if (!get_store()->get(get_name(), v, bl))
1502 return false;
1503 return true;
1504}
1505
1506version_t Paxos::read_current(bufferlist &bl)
1507{
1508 if (read(last_committed, bl))
1509 return last_committed;
1510 return 0;
1511}
1512
1513
1514bool Paxos::is_lease_valid()
1515{
f67539c2 1516 return ((mon.get_quorum().size() == 1)
9f95a23c 1517 || (ceph::real_clock::now() < lease_expire));
7c673cae
FG
1518}
1519
1520// -- WRITE --
1521
1522bool Paxos::is_writeable()
1523{
1524 return
f67539c2 1525 mon.is_leader() &&
7c673cae
FG
1526 is_active() &&
1527 is_lease_valid();
1528}
1529
1530void Paxos::propose_pending()
1531{
11fdf7f2
TL
1532 ceph_assert(is_active());
1533 ceph_assert(pending_proposal);
7c673cae
FG
1534
1535 cancel_events();
1536
1537 bufferlist bl;
1538 pending_proposal->encode(bl);
1539
1540 dout(10) << __func__ << " " << (last_committed + 1)
1541 << " " << bl.length() << " bytes" << dendl;
1542 dout(30) << __func__ << " transaction dump:\n";
1543 JSONFormatter f(true);
1544 pending_proposal->dump(&f);
1545 f.flush(*_dout);
1546 *_dout << dendl;
1547
1548 pending_proposal.reset();
1549
1550 committing_finishers.swap(pending_finishers);
1551 state = STATE_UPDATING;
1552 begin(bl);
1553}
1554
1555void Paxos::queue_pending_finisher(Context *onfinished)
1556{
1557 dout(5) << __func__ << " " << onfinished << dendl;
11fdf7f2 1558 ceph_assert(onfinished);
7c673cae
FG
1559 pending_finishers.push_back(onfinished);
1560}
1561
1562MonitorDBStore::TransactionRef Paxos::get_pending_transaction()
1563{
f67539c2 1564 ceph_assert(mon.is_leader());
7c673cae
FG
1565 if (!pending_proposal) {
1566 pending_proposal.reset(new MonitorDBStore::Transaction);
11fdf7f2 1567 ceph_assert(pending_finishers.empty());
7c673cae
FG
1568 }
1569 return pending_proposal;
1570}
1571
1572bool Paxos::trigger_propose()
1573{
31f18b77
FG
1574 if (plugged) {
1575 dout(10) << __func__ << " plugged, not proposing now" << dendl;
1576 return false;
1577 } else if (is_active()) {
7c673cae
FG
1578 dout(10) << __func__ << " active, proposing now" << dendl;
1579 propose_pending();
1580 return true;
1581 } else {
1582 dout(10) << __func__ << " not active, will propose later" << dendl;
1583 return false;
1584 }
1585}
1586
1587bool Paxos::is_consistent()
1588{
1589 return (first_committed <= last_committed);
1590}
1591