]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/Paxos.cc
update sources to 12.2.8
[ceph.git] / ceph / src / mon / Paxos.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <sstream>
16#include "Paxos.h"
17#include "Monitor.h"
18#include "messages/MMonPaxos.h"
19
31f18b77 20#include "mon/mon_types.h"
7c673cae
FG
21#include "common/config.h"
22#include "include/assert.h"
23#include "include/stringify.h"
24#include "common/Timer.h"
25#include "messages/PaxosServiceMessage.h"
26
27#define dout_subsys ceph_subsys_paxos
28#undef dout_prefix
29#define dout_prefix _prefix(_dout, mon, mon->name, mon->rank, paxos_name, state, first_committed, last_committed)
30static ostream& _prefix(std::ostream *_dout, Monitor *mon, const string& name,
31 int rank, const string& paxos_name, int state,
32 version_t first_committed, version_t last_committed)
33{
34 return *_dout << "mon." << name << "@" << rank
35 << "(" << mon->get_state_name() << ")"
36 << ".paxos(" << paxos_name << " " << Paxos::get_statename(state)
37 << " c " << first_committed << ".." << last_committed
38 << ") ";
39}
40
41class Paxos::C_Trimmed : public Context {
42 Paxos *paxos;
43public:
44 explicit C_Trimmed(Paxos *p) : paxos(p) { }
45 void finish(int r) override {
46 paxos->trimming = false;
47 }
48};
49
50MonitorDBStore *Paxos::get_store()
51{
52 return mon->store;
53}
54
55void Paxos::read_and_prepare_transactions(MonitorDBStore::TransactionRef tx,
56 version_t first, version_t last)
57{
58 dout(10) << __func__ << " first " << first << " last " << last << dendl;
59 for (version_t v = first; v <= last; ++v) {
60 dout(30) << __func__ << " apply version " << v << dendl;
61 bufferlist bl;
62 int err = get_store()->get(get_name(), v, bl);
63 assert(err == 0);
64 assert(bl.length());
65 decode_append_transaction(tx, bl);
66 }
67 dout(15) << __func__ << " total versions " << (last-first) << dendl;
68}
69
70void Paxos::init()
71{
72 // load paxos variables from stable storage
73 last_pn = get_store()->get(get_name(), "last_pn");
74 accepted_pn = get_store()->get(get_name(), "accepted_pn");
75 last_committed = get_store()->get(get_name(), "last_committed");
76 first_committed = get_store()->get(get_name(), "first_committed");
77
78 dout(10) << __func__ << " last_pn: " << last_pn << " accepted_pn: "
79 << accepted_pn << " last_committed: " << last_committed
80 << " first_committed: " << first_committed << dendl;
81
82 dout(10) << "init" << dendl;
83 assert(is_consistent());
84}
85
// Build the "paxos" perf counter set (indices l_paxos_first..l_paxos_last),
// store it in `logger`, and register it with the global perf counter
// collection. Every counter defaults to PRIO_USEFUL (see below).
86void Paxos::init_logger()
87{
88 PerfCountersBuilder pcb(g_ceph_context, "paxos", l_paxos_first, l_paxos_last);
3efd9988
FG
89
90 // Because monitors are so few in number, the resource cost of capturing
91 // almost all their perf counters at USEFUL is trivial.
92 pcb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
93
7c673cae
FG
// Role transitions and refreshes.
94 pcb.add_u64_counter(l_paxos_start_leader, "start_leader", "Starts in leader role");
95 pcb.add_u64_counter(l_paxos_start_peon, "start_peon", "Starts in peon role");
96 pcb.add_u64_counter(l_paxos_restart, "restart", "Restarts");
97 pcb.add_u64_counter(l_paxos_refresh, "refresh", "Refreshes");
98 pcb.add_time_avg(l_paxos_refresh_latency, "refresh_latency", "Refresh latency");
// Phase-2 begin: count, average keys/bytes per transaction, latency.
99 pcb.add_u64_counter(l_paxos_begin, "begin", "Started and handled begins");
100 pcb.add_u64_avg(l_paxos_begin_keys, "begin_keys", "Keys in transaction on begin");
1adf2230 101 pcb.add_u64_avg(l_paxos_begin_bytes, "begin_bytes", "Data in transaction on begin", NULL, 0, unit_t(BYTES));
7c673cae
FG
102 pcb.add_time_avg(l_paxos_begin_latency, "begin_latency", "Latency of begin operation");
// Commit: count, average keys/bytes per transaction, latency.
103 pcb.add_u64_counter(l_paxos_commit, "commit",
104 "Commits", "cmt");
105 pcb.add_u64_avg(l_paxos_commit_keys, "commit_keys", "Keys in transaction on commit");
1adf2230 106 pcb.add_u64_avg(l_paxos_commit_bytes, "commit_bytes", "Data in transaction on commit", NULL, 0, unit_t(BYTES));
7c673cae
FG
107 pcb.add_time_avg(l_paxos_commit_latency, "commit_latency",
108 "Commit latency", "clat");
// Phase-1 collect as seen by a peon, plus uncommitted values found.
109 pcb.add_u64_counter(l_paxos_collect, "collect", "Peon collects");
110 pcb.add_u64_avg(l_paxos_collect_keys, "collect_keys", "Keys in transaction on peon collect");
1adf2230 111 pcb.add_u64_avg(l_paxos_collect_bytes, "collect_bytes", "Data in transaction on peon collect", NULL, 0, unit_t(BYTES));
7c673cae
FG
112 pcb.add_time_avg(l_paxos_collect_latency, "collect_latency", "Peon collect latency");
113 pcb.add_u64_counter(l_paxos_collect_uncommitted, "collect_uncommitted", "Uncommitted values in started and handled collects");
// Timeouts of each protocol phase / lease.
114 pcb.add_u64_counter(l_paxos_collect_timeout, "collect_timeout", "Collect timeouts");
115 pcb.add_u64_counter(l_paxos_accept_timeout, "accept_timeout", "Accept timeouts");
116 pcb.add_u64_counter(l_paxos_lease_ack_timeout, "lease_ack_timeout", "Lease acknowledgement timeouts");
117 pcb.add_u64_counter(l_paxos_lease_timeout, "lease_timeout", "Lease timeouts");
// store_state(): applying committed values received from another monitor.
118 pcb.add_u64_counter(l_paxos_store_state, "store_state", "Store a shared state on disk");
119 pcb.add_u64_avg(l_paxos_store_state_keys, "store_state_keys", "Keys in transaction in stored state");
1adf2230 120 pcb.add_u64_avg(l_paxos_store_state_bytes, "store_state_bytes", "Data in transaction in stored state", NULL, 0, unit_t(BYTES));
7c673cae
FG
121 pcb.add_time_avg(l_paxos_store_state_latency, "store_state_latency", "Storing state latency");
// share_state(): committed values sent to a lagging monitor.
122 pcb.add_u64_counter(l_paxos_share_state, "share_state", "Sharings of state");
123 pcb.add_u64_avg(l_paxos_share_state_keys, "share_state_keys", "Keys in shared state");
1adf2230 124 pcb.add_u64_avg(l_paxos_share_state_bytes, "share_state_bytes", "Data in shared state", NULL, 0, unit_t(BYTES));
7c673cae
FG
// Proposal-number generation.
125 pcb.add_u64_counter(l_paxos_new_pn, "new_pn", "New proposal number queries");
126 pcb.add_time_avg(l_paxos_new_pn_latency, "new_pn_latency", "New proposal number getting latency");
127 logger = pcb.create_perf_counters();
128 g_ceph_context->get_perfcounters_collection()->add(logger);
129}
130
131void Paxos::dump_info(Formatter *f)
132{
133 f->open_object_section("paxos");
134 f->dump_unsigned("first_committed", first_committed);
135 f->dump_unsigned("last_committed", last_committed);
136 f->dump_unsigned("last_pn", last_pn);
137 f->dump_unsigned("accepted_pn", accepted_pn);
138 f->close_section();
139}
140
141// ---------------------------------
142
143// PHASE 1
144
145// leader
// Leader, phase 1 (collect). Enter STATE_RECOVERING and reset per-round
// state. Recover any value we stored at last_committed+1 but never
// committed. Pick a new proposal number and send OP_COLLECT to every other
// quorum member. Finally, arm a timer that calls collect_timeout() (which
// bootstraps) if the round does not complete.
//
// @param oldpn  lower bound, together with accepted_pn, passed to
//               get_new_proposal_number(). handle_last() passes a peer's
//               higher pn here when restarting the collect.
146void Paxos::collect(version_t oldpn)
147{
148 // we're recovering, it seems!
149 state = STATE_RECOVERING;
150 assert(mon->is_leader());
151
152 // reset uncommitted-value and per-peer state left from any previous round
153 uncommitted_v = 0;
154 uncommitted_pn = 0;
155 uncommitted_value.clear();
156 peer_first_committed.clear();
157 peer_last_committed.clear();
158
159 // look for uncommitted value
// begin()/handle_begin() write the proposed value at last_committed+1
// before it commits, so its presence here means "accepted, not committed".
160 if (get_store()->exists(get_name(), last_committed+1)) {
// pending_v/pending_pn record the pn the pending value was accepted under;
// older stores may lack them, hence the accepted_pn fallback below.
161 version_t v = get_store()->get(get_name(), "pending_v");
162 version_t pn = get_store()->get(get_name(), "pending_pn");
163 if (v && pn && v == last_committed + 1) {
164 uncommitted_pn = pn;
165 } else {
166 dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn
167 << " and crossing our fingers" << dendl;
168 uncommitted_pn = accepted_pn;
169 }
170 uncommitted_v = last_committed+1;
171
172 get_store()->get(get_name(), last_committed+1, uncommitted_value);
173 assert(uncommitted_value.length());
174 dout(10) << "learned uncommitted " << (last_committed+1)
175 << " pn " << uncommitted_pn
176 << " (" << uncommitted_value.length() << " bytes) from myself"
177 << dendl;
178
179 logger->inc(l_paxos_collect_uncommitted);
180 }
181
182 // pick new pn
183 accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn));
184 accepted_pn_from = last_committed;
// num_last counts monitors that accepted this pn; we count ourselves.
185 num_last = 1;
186 dout(10) << "collect with pn " << accepted_pn << dendl;
187
188 // send collect
// NOTE(review): pn_from is not set on OP_COLLECT, yet handle_collect()
// copies collect->pn_from into the peon's accepted_pn_from. Peons therefore
// record whatever MMonPaxos defaults it to. Confirm this is intended.
189 for (set<int>::const_iterator p = mon->get_quorum().begin();
190 p != mon->get_quorum().end();
191 ++p) {
192 if (*p == mon->rank) continue;
193
194 MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT,
195 ceph_clock_now());
196 collect->last_committed = last_committed;
197 collect->first_committed = first_committed;
198 collect->pn = accepted_pn;
199 mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
200 }
201
202 // set timeout event
// The timer fires after mon_accept_timeout_factor * mon_lease seconds.
// A cancelled event (-ECANCELED, e.g. via handle_last()) is a no-op.
3efd9988
FG
203 collect_timeout_event = mon->timer.add_event_after(
204 g_conf->mon_accept_timeout_factor *
205 g_conf->mon_lease,
206 new C_MonContext(mon, [this](int r) {
7c673cae
FG
207 if (r == -ECANCELED)
208 return;
209 collect_timeout();
3efd9988 210 }));
7c673cae
FG
211}
212
213
214// peon
// Peon, phase 1: answer the leader's OP_COLLECT with OP_LAST.
//
// - Enters STATE_RECOVERING and resets the lease timeout.
// - Bootstraps if the leader's first_committed is beyond what we can catch
//   up to from our last_committed.
// - If the collect's pn exceeds accepted_pn, promises it and persists the
//   promise (accepted_pn) before replying. init() reloads accepted_pn from
//   the store, so the promise survives a restart.
// - Replies with our pn, any committed versions the leader lacks (via
//   share_state()), and any value we accepted but have not committed at
//   last_committed+1, together with the pn it was accepted under.
215void Paxos::handle_collect(MonOpRequestRef op)
216{
217
218 op->mark_paxos_event("handle_collect");
219
220 MMonPaxos *collect = static_cast<MMonPaxos*>(op->get_req());
221 dout(10) << "handle_collect " << *collect << dendl;
222
223 assert(mon->is_peon()); // mon epoch filter should catch strays
224
225 // we're recovering, it seems!
226 state = STATE_RECOVERING;
227
228 //update the peon recovery timeout
229 reset_lease_timeout();
230
// The leader has trimmed past anything we could receive incrementally;
// only a full bootstrap can resync us.
231 if (collect->first_committed > last_committed+1) {
232 dout(2) << __func__
233 << " leader's lowest version is too high for our last committed"
234 << " (theirs: " << collect->first_committed
235 << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
236 op->mark_paxos_event("need to bootstrap");
237 mon->bootstrap();
238 return;
239 }
240
241 // reply
242 MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST,
243 ceph_clock_now());
244 last->last_committed = last_committed;
245 last->first_committed = first_committed;
246
// Remember the pn in force before this collect. It is the fallback for the
// uncommitted value's pn when pending_pn is missing on disk.
247 version_t previous_pn = accepted_pn;
248
249 // can we accept this pn?
250 if (collect->pn > accepted_pn) {
251 // ok, accept it
252 accepted_pn = collect->pn;
253 accepted_pn_from = collect->pn_from;
254 dout(10) << "accepting pn " << accepted_pn << " from "
255 << accepted_pn_from << dendl;
256
257 auto t(std::make_shared<MonitorDBStore::Transaction>());
258 t->put(get_name(), "accepted_pn", accepted_pn);
259
260 dout(30) << __func__ << " transaction dump:\n";
261 JSONFormatter f(true);
262 t->dump(&f);
263 f.flush(*_dout);
264 *_dout << dendl;
265
// The collect_* counters are only updated when a new pn is persisted.
266 logger->inc(l_paxos_collect);
267 logger->inc(l_paxos_collect_keys, t->get_keys());
268 logger->inc(l_paxos_collect_bytes, t->get_bytes());
269 utime_t start = ceph_clock_now();
270
271 get_store()->apply_transaction(t);
272
273 utime_t end = ceph_clock_now();
274 logger->tinc(l_paxos_collect_latency, end - start);
275 } else {
276 // don't accept!
277 dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from
278 << ", we already accepted " << accepted_pn
279 << " from " << accepted_pn_from << dendl;
280 }
// Always report the pn we currently honour. The leader compares it with
// its own in handle_last().
281 last->pn = accepted_pn;
282 last->pn_from = accepted_pn_from;
283
284 // share whatever committed values we have
285 if (collect->last_committed < last_committed)
286 share_state(last, collect->first_committed, collect->last_committed);
287
288 // do we have an accepted but uncommitted value?
289 // (it'll be at last_committed+1)
290 bufferlist bl;
291 if (collect->last_committed <= last_committed &&
292 get_store()->exists(get_name(), last_committed+1)) {
293 get_store()->get(get_name(), last_committed+1, bl);
294 assert(bl.length() > 0);
295 dout(10) << " sharing our accepted but uncommitted value for "
296 << last_committed+1 << " (" << bl.length() << " bytes)" << dendl;
297 last->values[last_committed+1] = bl;
298
299 version_t v = get_store()->get(get_name(), "pending_v");
300 version_t pn = get_store()->get(get_name(), "pending_pn");
301 if (v && pn && v == last_committed + 1) {
302 last->uncommitted_pn = pn;
303 } else {
304 // previously we didn't record which pn a value was accepted
305 // under! use the pn value we just had... :(
306 dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << previous_pn
307 << " and crossing our fingers" << dendl;
308 last->uncommitted_pn = previous_pn;
309 }
310
311 logger->inc(l_paxos_collect_uncommitted);
312 }
313
314 // send reply
315 collect->get_connection()->send_message(last);
316}
317
318/**
319 * @note This is Okay. We share our versions after peer_last_committed, up to
320 * and including our last_committed, and add their bufferlists to the
321 * message. It will be the peer's job to apply them to its store, as
322 * these bufferlists will contain raw transactions.
323 * This function is called by both the Peon and the Leader. The Peon will
324 * share the state with the Leader during handle_collect(), sharing any
325 * values the leader may be missing (i.e., the leader's last_committed is
326 * lower than the peon's last_committed). The Leader will share the state
327 * with the Peon during handle_last(), if the peon's last_committed is
328 * lower than the leader's last_committed.
329 */
330void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed,
331 version_t peer_last_committed)
332{
333 assert(peer_last_committed < last_committed);
334
335 dout(10) << "share_state peer has fc " << peer_first_committed
336 << " lc " << peer_last_committed << dendl;
337 version_t v = peer_last_committed + 1;
338
339 // include incrementals
340 uint64_t bytes = 0;
341 for ( ; v <= last_committed; v++) {
342 if (get_store()->exists(get_name(), v)) {
343 get_store()->get(get_name(), v, m->values[v]);
344 assert(m->values[v].length());
345 dout(10) << " sharing " << v << " ("
346 << m->values[v].length() << " bytes)" << dendl;
347 bytes += m->values[v].length() + 16; // paxos_ + 10 digits = 16
348 }
349 }
350 logger->inc(l_paxos_share_state);
351 logger->inc(l_paxos_share_state_keys, m->values.size());
352 logger->inc(l_paxos_share_state_bytes, bytes);
353
354 m->last_committed = last_committed;
355}
356
357/**
358 * Store on disk a state that was shared with us
359 *
360 * Basically, we received a set of versions (possibly just one; it doesn't matter).
361 * What matters is that we have to stash it in the store. So, we will simply
362 * write every single bufferlist into their own versions on our side (i.e.,
363 * onto paxos-related keys), and then we will decode those same bufferlists
364 * we just wrote and apply the transactions they hold. We will also update
365 * our first and last committed values to point to the new values, if need
366 * be. All of this is done tightly wrapped in a transaction to ensure we
367 * enjoy the atomicity guarantees given by our awesome k/v store.
368 */
// Apply the committed versions carried in m->values that lie in
// (last_committed, m->last_committed]. Values beyond m->last_committed are
// not applied; handle_collect() may include an uncommitted value at
// m->last_committed+1, and it is filtered out here.
//
// Returns true iff a transaction was applied to the store.
//
// Side effects: the in-memory last_committed is advanced while scanning,
// before the transaction is applied. An obsolete uncommitted value is
// dropped. first_committed is reloaded after applying.
369bool Paxos::store_state(MMonPaxos *m)
370{
371 auto t(std::make_shared<MonitorDBStore::Transaction>());
372 map<version_t,bufferlist>::iterator start = m->values.begin();
373 bool changed = false;
374
375 // build map of values to store
376 // we want to write the range (last_committed, m->last_committed] only.
// Only the first (lowest) entry is checked: if even it leaves a gap after
// last_committed, nothing in the message can be applied contiguously.
377 if (start != m->values.end() &&
378 start->first > last_committed + 1) {
379 // ignore everything if values start in the future.
380 dout(10) << "store_state ignoring all values, they start at " << start->first
381 << " > last_committed+1" << dendl;
382 return false;
383 }
384
385 // push forward the start position on the message's values iterator, up until
386 // we run out of positions or reach the first version after 'last_committed'.
387 while (start != m->values.end() && start->first <= last_committed) {
388 ++start;
389 }
390
391 // make sure we get the right interval of values to apply by pushing forward
392 // the 'end' iterator until it matches the message's 'last_committed'.
// NOTE: last_committed is updated here, before anything is persisted.
393 map<version_t,bufferlist>::iterator end = start;
394 while (end != m->values.end() && end->first <= m->last_committed) {
395 last_committed = end->first;
396 ++end;
397 }
398
399 if (start == end) {
400 dout(10) << "store_state nothing to commit" << dendl;
401 } else {
402 dout(10) << "store_state [" << start->first << ".."
403 << last_committed << "]" << dendl;
404 t->put(get_name(), "last_committed", last_committed);
405
406 // we should apply the state here -- decode every single bufferlist in the
407 // map and append the transactions to 't'.
408 map<version_t,bufferlist>::iterator it;
409 for (it = start; it != end; ++it) {
410 // write the bufferlist as the version's value
411 t->put(get_name(), it->first, it->second);
412 // decode the bufferlist and append it to the transaction we will shortly
413 // apply.
414 decode_append_transaction(t, it->second);
415 }
416
417 // discard obsolete uncommitted value?
418 if (uncommitted_v && uncommitted_v <= last_committed) {
419 dout(10) << " forgetting obsolete uncommitted value " << uncommitted_v
420 << " pn " << uncommitted_pn << dendl;
421 uncommitted_v = 0;
422 uncommitted_pn = 0;
423 uncommitted_value.clear();
424 }
425 }
426 if (!t->empty()) {
427 dout(30) << __func__ << " transaction dump:\n";
428 JSONFormatter f(true);
429 t->dump(&f);
430 f.flush(*_dout);
431 *_dout << dendl;
432
433 logger->inc(l_paxos_store_state);
434 logger->inc(l_paxos_store_state_bytes, t->get_bytes());
435 logger->inc(l_paxos_store_state_keys, t->get_keys());
// (these timestamps shadow the start/end iterators declared above)
436 utime_t start = ceph_clock_now();
437
438 get_store()->apply_transaction(t);
439
440 utime_t end = ceph_clock_now();
441 logger->tinc(l_paxos_store_state_latency, end - start);
442
443 // refresh first_committed; this txn may have trimmed.
444 first_committed = get_store()->get(get_name(), "first_committed");
445
446 _sanity_check_store();
447 changed = true;
448 }
449
450 return changed;
451}
452
453void Paxos::_sanity_check_store()
454{
455 version_t lc = get_store()->get(get_name(), "last_committed");
456 assert(lc == last_committed);
457}
458
459
460// leader
// Leader, phase 1: process a peon's OP_LAST reply to our collect.
//
// - Records the peer's committed window and applies any committed values
//   it sent (store_state()).
// - Bootstraps if any peer's window cannot be bridged incrementally.
// - Pushes OP_COMMIT with missing versions to every lagging peer.
// - If the peer reports a higher pn, restarts collect() above it.
// - If it accepted our pn, counts it and adopts its uncommitted value when
//   that value is at least as recent as what we know.
// - Once every quorum member has answered, either re-proposes the learned
//   uncommitted value (begin() in STATE_UPDATING_PREVIOUS) or goes active
//   (extend_lease(), refresh, finish_round()).
461void Paxos::handle_last(MonOpRequestRef op)
462{
463 op->mark_paxos_event("handle_last");
464 MMonPaxos *last = static_cast<MMonPaxos*>(op->get_req());
465 bool need_refresh = false;
466 int from = last->get_source().num();
467
468 dout(10) << "handle_last " << *last << dendl;
469
470 if (!mon->is_leader()) {
471 dout(10) << "not leader, dropping" << dendl;
472 return;
473 }
474
475 // note peer's first_ and last_committed, in case we learn a new
476 // commit and need to push it to them.
477 peer_first_committed[from] = last->first_committed;
478 peer_last_committed[from] = last->last_committed;
479
480 if (last->first_committed > last_committed + 1) {
481 dout(5) << __func__
482 << " mon." << from
483 << " lowest version is too high for our last committed"
484 << " (theirs: " << last->first_committed
485 << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
486 op->mark_paxos_event("need to bootstrap");
487 mon->bootstrap();
488 return;
489 }
490
491 assert(g_conf->paxos_kill_at != 1);
492
493 // store any committed values if any are specified in the message
// This may advance last_committed/first_committed, so the peer loop below
// sees the updated window.
494 need_refresh = store_state(last);
495
496 assert(g_conf->paxos_kill_at != 2);
497
498 // is everyone contiguous and up to date?
499 for (map<int,version_t>::iterator p = peer_last_committed.begin();
500 p != peer_last_committed.end();
501 ++p) {
// We have trimmed versions this peer still needs; it cannot catch up
// incrementally.
502 if (p->second + 1 < first_committed && first_committed > 1) {
503 dout(5) << __func__
504 << " peon " << p->first
505 << " last_committed (" << p->second
506 << ") is too low for our first_committed (" << first_committed
507 << ") -- bootstrap!" << dendl;
508 op->mark_paxos_event("need to bootstrap");
509 mon->bootstrap();
510 return;
511 }
512 if (p->second < last_committed) {
513 // share committed values
514 dout(10) << " sending commit to mon." << p->first << dendl;
515 MMonPaxos *commit = new MMonPaxos(mon->get_epoch(),
516 MMonPaxos::OP_COMMIT,
517 ceph_clock_now());
518 share_state(commit, peer_first_committed[p->first], p->second);
519 mon->messenger->send_message(commit, mon->monmap->get_inst(p->first));
520 }
521 }
522
523 // do they accept your pn?
524 if (last->pn > accepted_pn) {
525 // no, try again.
526 dout(10) << " they had a higher pn than us, picking a new one." << dendl;
527
528 // cancel timeout event
529 mon->timer.cancel_event(collect_timeout_event);
530 collect_timeout_event = 0;
531
532 collect(last->pn);
533 } else if (last->pn == accepted_pn) {
534 // yes, they accepted our pn. great.
// num_last started at 1 (ourselves) in collect().
535 num_last++;
536 dout(10) << " they accepted our pn, we now have "
537 << num_last << " peons" << dendl;
538
539 // did this person send back an accepted but uncommitted value?
// Adopt it only if its pn is not older than what we hold and it sits right
// after a last_committed that is at least ours.
// NOTE: values[...] uses operator[], which default-inserts into the incoming
// message's map when the key is absent.
540 if (last->uncommitted_pn) {
541 if (last->uncommitted_pn >= uncommitted_pn &&
542 last->last_committed >= last_committed &&
543 last->last_committed + 1 >= uncommitted_v) {
544 uncommitted_v = last->last_committed+1;
545 uncommitted_pn = last->uncommitted_pn;
546 uncommitted_value = last->values[uncommitted_v];
547 dout(10) << "we learned an uncommitted value for " << uncommitted_v
548 << " pn " << uncommitted_pn
549 << " " << uncommitted_value.length() << " bytes"
550 << dendl;
551 } else {
552 dout(10) << "ignoring uncommitted value for " << (last->last_committed+1)
553 << " pn " << last->uncommitted_pn
554 << " " << last->values[last->last_committed+1].length() << " bytes"
555 << dendl;
556 }
557 }
558
559 // is that everyone?
560 if (num_last == mon->get_quorum().size()) {
561 // cancel timeout event
562 mon->timer.cancel_event(collect_timeout_event);
563 collect_timeout_event = 0;
564 peer_first_committed.clear();
565 peer_last_committed.clear();
566
567 // almost...
568
569 // did we learn an old value?
570 if (uncommitted_v == last_committed+1 &&
571 uncommitted_value.length()) {
572 dout(10) << "that's everyone. begin on old learned value" << dendl;
573 state = STATE_UPDATING_PREVIOUS;
574 begin(uncommitted_value);
575 } else {
576 // active!
577 dout(10) << "that's everyone. active!" << dendl;
578 extend_lease();
579
// do_refresh() runs right here, so skip the trailing refresh below.
580 need_refresh = false;
581 if (do_refresh()) {
582 finish_round();
583 }
584 }
585 }
586 } else {
587 // no, this is an old message, discard
588 dout(10) << "old pn, ignoring" << dendl;
589 }
590
591 if (need_refresh)
592 (void)do_refresh();
593}
594
595void Paxos::collect_timeout()
596{
597 dout(1) << "collect timeout, calling fresh election" << dendl;
598 collect_timeout_event = 0;
599 logger->inc(l_paxos_collect_timeout);
600 assert(mon->is_leader());
601 mon->bootstrap();
602}
603
604
605// leader
// Leader, phase 2: propose @p v as version last_committed+1 under accepted_pn.
//
// The value is persisted locally together with pending_v/pending_pn, so a
// later collect() can tell which pn it was accepted under. For the very
// first version (last_committed == 0) the value is re-encoded so it also
// sets first_committed = 1. If we are the only quorum member, commit_start()
// runs immediately. Otherwise OP_BEGIN goes to every peer and an accept
// timeout is armed that calls accept_timeout().
606void Paxos::begin(bufferlist& v)
607{
608 dout(10) << "begin for " << last_committed+1 << " "
609 << v.length() << " bytes"
610 << dendl;
611
612 assert(mon->is_leader());
613 assert(is_updating() || is_updating_previous());
614
615 // we must already have a majority for this to work.
616 assert(mon->get_quorum().size() == 1 ||
617 num_last > (unsigned)mon->monmap->size()/2);
618
619 // and no value, yet.
620 assert(new_value.length() == 0);
621
622 // accept it ourselves
623 accepted.clear();
624 accepted.insert(mon->rank);
625 new_value = v;
626
627 if (last_committed == 0) {
628 auto t(std::make_shared<MonitorDBStore::Transaction>());
629 // initial base case; set first_committed too
630 t->put(get_name(), "first_committed", 1);
631 decode_append_transaction(t, new_value);
632
633 bufferlist tx_bl;
634 t->encode(tx_bl);
635
636 new_value = tx_bl;
637 }
638
639 // store the proposed value in the store. IF it is accepted, we will then
640 // have to decode it into a transaction and apply it.
641 auto t(std::make_shared<MonitorDBStore::Transaction>());
642 t->put(get_name(), last_committed+1, new_value);
643
644 // note which pn this pending value is for.
645 t->put(get_name(), "pending_v", last_committed + 1);
646 t->put(get_name(), "pending_pn", accepted_pn);
647
// Debug dump of both the persisted transaction and the decoded proposal.
// NOTE(review): assumes Ceph's dout(30)...dendl macro pair gates everything in
// between, so the scratch decode only runs at debug level 30. Confirm.
648 dout(30) << __func__ << " transaction dump:\n";
649 JSONFormatter f(true);
650 t->dump(&f);
651 f.flush(*_dout);
652 auto debug_tx(std::make_shared<MonitorDBStore::Transaction>());
653 bufferlist::iterator new_value_it = new_value.begin();
654 debug_tx->decode(new_value_it);
655 debug_tx->dump(&f);
656 *_dout << "\nbl dump:\n";
657 f.flush(*_dout);
658 *_dout << dendl;
659
660 logger->inc(l_paxos_begin);
661 logger->inc(l_paxos_begin_keys, t->get_keys());
662 logger->inc(l_paxos_begin_bytes, t->get_bytes());
663 utime_t start = ceph_clock_now();
664
665 get_store()->apply_transaction(t);
666
667 utime_t end = ceph_clock_now();
668 logger->tinc(l_paxos_begin_latency, end - start);
669
670 assert(g_conf->paxos_kill_at != 3);
671
672 if (mon->get_quorum().size() == 1) {
673 // we're alone, take it easy
674 commit_start();
675 return;
676 }
677
678 // ask others to accept it too!
679 for (set<int>::const_iterator p = mon->get_quorum().begin();
680 p != mon->get_quorum().end();
681 ++p) {
682 if (*p == mon->rank) continue;
683
684 dout(10) << " sending begin to mon." << *p << dendl;
685 MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN,
686 ceph_clock_now());
687 begin->values[last_committed+1] = new_value;
688 begin->last_committed = last_committed;
689 begin->pn = accepted_pn;
690
691 mon->messenger->send_message(begin, mon->monmap->get_inst(*p));
692 }
693
694 // set timeout event
// Cancelled by commit_start() once every quorum member has accepted.
3efd9988
FG
695 accept_timeout_event = mon->timer.add_event_after(
696 g_conf->mon_accept_timeout_factor * g_conf->mon_lease,
697 new C_MonContext(mon, [this](int r) {
698 if (r == -ECANCELED)
699 return;
700 accept_timeout();
701 }));
7c673cae
FG
702}
703
704// peon
705void Paxos::handle_begin(MonOpRequestRef op)
706{
707 op->mark_paxos_event("handle_begin");
708 MMonPaxos *begin = static_cast<MMonPaxos*>(op->get_req());
709 dout(10) << "handle_begin " << *begin << dendl;
710
711 // can we accept this?
712 if (begin->pn < accepted_pn) {
713 dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
714 op->mark_paxos_event("have higher pn, ignore");
715 return;
716 }
717 assert(begin->pn == accepted_pn);
718 assert(begin->last_committed == last_committed);
719
720 assert(g_conf->paxos_kill_at != 4);
721
722 logger->inc(l_paxos_begin);
723
724 // set state.
725 state = STATE_UPDATING;
726 lease_expire = utime_t(); // cancel lease
727
728 // yes.
729 version_t v = last_committed+1;
730 dout(10) << "accepting value for " << v << " pn " << accepted_pn << dendl;
731 // store the accepted value onto our store. We will have to decode it and
732 // apply its transaction once we receive permission to commit.
733 auto t(std::make_shared<MonitorDBStore::Transaction>());
734 t->put(get_name(), v, begin->values[v]);
735
736 // note which pn this pending value is for.
737 t->put(get_name(), "pending_v", v);
738 t->put(get_name(), "pending_pn", accepted_pn);
739
740 dout(30) << __func__ << " transaction dump:\n";
741 JSONFormatter f(true);
742 t->dump(&f);
743 f.flush(*_dout);
744 *_dout << dendl;
745
746 logger->inc(l_paxos_begin_bytes, t->get_bytes());
747 utime_t start = ceph_clock_now();
748
749 get_store()->apply_transaction(t);
750
751 utime_t end = ceph_clock_now();
752 logger->tinc(l_paxos_begin_latency, end - start);
753
754 assert(g_conf->paxos_kill_at != 5);
755
756 // reply
757 MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT,
758 ceph_clock_now());
759 accept->pn = accepted_pn;
760 accept->last_committed = last_committed;
761 begin->get_connection()->send_message(accept);
762}
763
764// leader
// Leader, phase 2: record a peon's OP_ACCEPT for the in-flight proposal.
// Accepts for a different pn, or from a round older than
// last_committed-1, are ignored. commit_start() runs only once *every*
// quorum member (including us, inserted by begin()) has accepted.
765void Paxos::handle_accept(MonOpRequestRef op)
766{
767 op->mark_paxos_event("handle_accept");
768 MMonPaxos *accept = static_cast<MMonPaxos*>(op->get_req());
769 dout(10) << "handle_accept " << *accept << dendl;
770 int from = accept->get_source().num();
771
// NOTE: this rejects any pn different from ours, not only higher ones.
772 if (accept->pn != accepted_pn) {
773 // we accepted a higher pn, from some other leader
774 dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
775 op->mark_paxos_event("have higher pn, ignore");
776 return;
777 }
778 if (last_committed > 0 &&
779 accept->last_committed < last_committed-1) {
780 dout(10) << " this is from an old round, ignoring" << dendl;
781 op->mark_paxos_event("old round, ignore");
782 return;
783 }
784 assert(accept->last_committed == last_committed || // not committed
785 accept->last_committed == last_committed-1); // committed
786
787 assert(is_updating() || is_updating_previous());
788 assert(accepted.count(from) == 0);
789 accepted.insert(from);
790 dout(10) << " now " << accepted << " have accepted" << dendl;
791
792 assert(g_conf->paxos_kill_at != 6);
793
794 // only commit (and expose committed state) when we get *all* quorum
795 // members to accept. otherwise, they may still be sharing the now
796 // stale state.
797 // FIXME: we can improve this with an additional lease revocation message
798 // that doesn't block for the persist.
// NOTE(review): the log line below says "majority", but the condition is the
// full quorum.
799 if (accepted == mon->get_quorum()) {
800 // yay, commit!
801 dout(10) << " got majority, committing, done with update" << dendl;
802 op->mark_paxos_event("commit_start");
803 commit_start();
804 }
805}
806
807void Paxos::accept_timeout()
808{
809 dout(1) << "accept timeout, calling fresh election" << dendl;
810 accept_timeout_event = 0;
811 assert(mon->is_leader());
812 assert(is_updating() || is_updating_previous() || is_writing() ||
813 is_writing_previous());
814 logger->inc(l_paxos_accept_timeout);
815 mon->bootstrap();
816}
817
818struct C_Committed : public Context {
819 Paxos *paxos;
820 explicit C_Committed(Paxos *p) : paxos(p) {}
821 void finish(int r) override {
822 assert(r >= 0);
823 Mutex::Locker l(paxos->mon->lock);
35e4c445
FG
824 if (paxos->is_shutdown()) {
825 paxos->abort_commit();
826 return;
827 }
7c673cae
FG
828 paxos->commit_finish();
829 }
830};
831
35e4c445
FG
832void Paxos::abort_commit()
833{
834 assert(commits_started > 0);
835 --commits_started;
836 if (commits_started == 0)
837 shutdown_cond.Signal();
838}
839
7c673cae
FG
// Begin committing new_value as version last_committed+1.
// Builds a transaction that bumps last_committed and applies the decoded
// value, then queues it asynchronously; C_Committed later calls
// commit_finish(), or abort_commit() on shutdown. The state moves
// UPDATING -> WRITING (or UPDATING_PREVIOUS -> WRITING_PREVIOUS) and the
// transaction is counted in commits_started. The in-memory last_committed is
// only advanced in commit_finish().
840void Paxos::commit_start()
841{
842 dout(10) << __func__ << " " << (last_committed+1) << dendl;
843
844 assert(g_conf->paxos_kill_at != 7);
845
846 auto t(std::make_shared<MonitorDBStore::Transaction>());
847
848 // commit locally
849 t->put(get_name(), "last_committed", last_committed + 1);
850
851 // decode the value and apply its transaction to the store.
852 // this value can now be read from last_committed.
853 decode_append_transaction(t, new_value);
854
855 dout(30) << __func__ << " transaction dump:\n";
856 JSONFormatter f(true);
857 t->dump(&f);
858 f.flush(*_dout);
859 *_dout << dendl;
860
861 logger->inc(l_paxos_commit);
862 logger->inc(l_paxos_commit_keys, t->get_keys());
863 logger->inc(l_paxos_commit_bytes, t->get_bytes());
// commit_finish() measures commit latency from this stamp.
864 commit_start_stamp = ceph_clock_now();
865
866 get_store()->queue_transaction(t, new C_Committed(this));
867
868 if (is_updating_previous())
869 state = STATE_WRITING_PREVIOUS;
870 else if (is_updating())
871 state = STATE_WRITING;
872 else
873 ceph_abort();
35e4c445 874 ++commits_started;
7c673cae
FG
875
// begin() only arms the accept timeout when the quorum has other members.
876 if (mon->get_quorum().size() > 1) {
877 // cancel timeout event
878 mon->timer.cancel_event(accept_timeout_event);
879 accept_timeout_event = 0;
880 }
881}
882
// Complete a commit once its queued transaction is durable. Called from
// C_Committed with mon->lock held. Advances last_committed, reloads
// first_committed, broadcasts OP_COMMIT (carrying the value) to the other
// quorum members, and moves to STATE_REFRESH. If do_refresh() succeeds, it
// then finalizes the proposal, extends the lease (multi-member quorum
// only), wakes waiting_for_commit, and calls finish_round().
883void Paxos::commit_finish()
884{
885 dout(20) << __func__ << " " << (last_committed+1) << dendl;
886 utime_t end = ceph_clock_now();
887 logger->tinc(l_paxos_commit_latency, end - commit_start_stamp);
888
889 assert(g_conf->paxos_kill_at != 8);
890
891 // cancel lease - it was for the old value.
892 // (this would only happen if message layer lost the 'begin', but
893 // leader still got a majority and committed without us.)
894 lease_expire = utime_t(); // cancel lease
895
896 last_committed++;
897 last_commit_time = ceph_clock_now();
898
899 // refresh first_committed; this txn may have trimmed.
900 first_committed = get_store()->get(get_name(), "first_committed");
901
902 _sanity_check_store();
903
904 // tell everyone
905 for (set<int>::const_iterator p = mon->get_quorum().begin();
906 p != mon->get_quorum().end();
907 ++p) {
908 if (*p == mon->rank) continue;
909
910 dout(10) << " sending commit to mon." << *p << dendl;
911 MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
912 ceph_clock_now());
913 commit->values[last_committed] = new_value;
914 commit->pn = accepted_pn;
915 commit->last_committed = last_committed;
916
917 mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
918 }
919
920 assert(g_conf->paxos_kill_at != 9);
921
922 // get ready for a new round.
923 new_value.clear();
924
925 // WRITING -> REFRESH
926 // among other things, this lets do_refresh() -> mon->bootstrap() know
927 // it doesn't need to flush the store queue
928 assert(is_writing() || is_writing_previous());
929 state = STATE_REFRESH;
// balance the ++commits_started done in commit_start()
35e4c445
FG
930 assert(commits_started > 0);
931 --commits_started;
7c673cae
FG
932
933 if (do_refresh()) {
934 commit_proposal();
935 if (mon->get_quorum().size() > 1) {
936 extend_lease();
937 }
938
939 finish_contexts(g_ceph_context, waiting_for_commit);
940
941 assert(g_conf->paxos_kill_at != 10);
942
943 finish_round();
944 }
945}
946
947
948void Paxos::handle_commit(MonOpRequestRef op)
949{
950 op->mark_paxos_event("handle_commit");
951 MMonPaxos *commit = static_cast<MMonPaxos*>(op->get_req());
952 dout(10) << "handle_commit on " << commit->last_committed << dendl;
953
954 logger->inc(l_paxos_commit);
955
956 if (!mon->is_peon()) {
957 dout(10) << "not a peon, dropping" << dendl;
958 ceph_abort();
959 return;
960 }
961
962 op->mark_paxos_event("store_state");
963 store_state(commit);
964
965 if (do_refresh()) {
966 finish_contexts(g_ceph_context, waiting_for_commit);
967 }
968}
969
970void Paxos::extend_lease()
971{
972 assert(mon->is_leader());
973 //assert(is_active());
974
975 lease_expire = ceph_clock_now();
976 lease_expire += g_conf->mon_lease;
977 acked_lease.clear();
978 acked_lease.insert(mon->rank);
979
980 dout(7) << "extend_lease now+" << g_conf->mon_lease
981 << " (" << lease_expire << ")" << dendl;
982
983 // bcast
984 for (set<int>::const_iterator p = mon->get_quorum().begin();
985 p != mon->get_quorum().end(); ++p) {
986
987 if (*p == mon->rank) continue;
988 MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE,
989 ceph_clock_now());
990 lease->last_committed = last_committed;
991 lease->lease_timestamp = lease_expire;
992 lease->first_committed = first_committed;
993 mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
994 }
995
996 // set timeout event.
997 // if old timeout is still in place, leave it.
998 if (!lease_ack_timeout_event) {
3efd9988
FG
999 lease_ack_timeout_event = mon->timer.add_event_after(
1000 g_conf->mon_lease_ack_timeout_factor * g_conf->mon_lease,
1001 new C_MonContext(mon, [this](int r) {
1002 if (r == -ECANCELED)
1003 return;
1004 lease_ack_timeout();
1005 }));
7c673cae
FG
1006 }
1007
1008 // set renew event
7c673cae
FG
1009 utime_t at = lease_expire;
1010 at -= g_conf->mon_lease;
1011 at += g_conf->mon_lease_renew_interval_factor * g_conf->mon_lease;
3efd9988
FG
1012 lease_renew_event = mon->timer.add_event_at(
1013 at, new C_MonContext(mon, [this](int r) {
1014 if (r == -ECANCELED)
1015 return;
1016 lease_renew_timeout();
1017 }));
7c673cae
FG
1018}
1019
1020void Paxos::warn_on_future_time(utime_t t, entity_name_t from)
1021{
1022 utime_t now = ceph_clock_now();
1023 if (t > now) {
1024 utime_t diff = t - now;
1025 if (diff > g_conf->mon_clock_drift_allowed) {
1026 utime_t warn_diff = now - last_clock_drift_warn;
1027 if (warn_diff >
1028 pow(g_conf->mon_clock_drift_warn_backoff, clock_drift_warned)) {
1029 mon->clog->warn() << "message from " << from << " was stamped " << diff
1030 << "s in the future, clocks not synchronized";
1031 last_clock_drift_warn = ceph_clock_now();
1032 ++clock_drift_warned;
1033 }
1034 }
1035 }
1036
1037}
1038
1039bool Paxos::do_refresh()
1040{
1041 bool need_bootstrap = false;
1042
1043 utime_t start = ceph_clock_now();
1044
1045 // make sure we have the latest state loaded up
1046 mon->refresh_from_paxos(&need_bootstrap);
1047
1048 utime_t end = ceph_clock_now();
1049 logger->inc(l_paxos_refresh);
1050 logger->tinc(l_paxos_refresh_latency, end - start);
1051
1052 if (need_bootstrap) {
1053 dout(10) << " doing requested bootstrap" << dendl;
1054 mon->bootstrap();
1055 return false;
1056 }
1057
1058 return true;
1059}
1060
1061void Paxos::commit_proposal()
1062{
1063 dout(10) << __func__ << dendl;
1064 assert(mon->is_leader());
1065 assert(is_refresh());
1066
1067 finish_contexts(g_ceph_context, committing_finishers);
1068}
1069
1070void Paxos::finish_round()
1071{
1072 dout(10) << __func__ << dendl;
1073 assert(mon->is_leader());
1074
1075 // ok, now go active!
1076 state = STATE_ACTIVE;
1077
1078 dout(20) << __func__ << " waiting_for_acting" << dendl;
1079 finish_contexts(g_ceph_context, waiting_for_active);
1080 dout(20) << __func__ << " waiting_for_readable" << dendl;
1081 finish_contexts(g_ceph_context, waiting_for_readable);
1082 dout(20) << __func__ << " waiting_for_writeable" << dendl;
1083 finish_contexts(g_ceph_context, waiting_for_writeable);
1084
1085 dout(10) << __func__ << " done w/ waiters, state " << get_statename(state) << dendl;
1086
1087 if (should_trim()) {
1088 trim();
1089 }
1090
1091 if (is_active() && pending_proposal) {
1092 propose_pending();
1093 }
1094}
1095
1096
1097// peon
1098void Paxos::handle_lease(MonOpRequestRef op)
1099{
1100 op->mark_paxos_event("handle_lease");
1101 MMonPaxos *lease = static_cast<MMonPaxos*>(op->get_req());
1102 // sanity
1103 if (!mon->is_peon() ||
1104 last_committed != lease->last_committed) {
1105 dout(10) << "handle_lease i'm not a peon, or they're not the leader,"
1106 << " or the last_committed doesn't match, dropping" << dendl;
1107 op->mark_paxos_event("invalid lease, ignore");
1108 return;
1109 }
1110
1111 warn_on_future_time(lease->sent_timestamp, lease->get_source());
1112
1113 // extend lease
1114 if (lease_expire < lease->lease_timestamp) {
1115 lease_expire = lease->lease_timestamp;
1116
1117 utime_t now = ceph_clock_now();
1118 if (lease_expire < now) {
1119 utime_t diff = now - lease_expire;
1120 derr << "lease_expire from " << lease->get_source_inst() << " is " << diff << " seconds in the past; mons are probably laggy (or possibly clocks are too skewed)" << dendl;
1121 }
1122 }
1123
1124 state = STATE_ACTIVE;
1125
1126 dout(10) << "handle_lease on " << lease->last_committed
1127 << " now " << lease_expire << dendl;
1128
1129 // ack
1130 MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK,
1131 ceph_clock_now());
1132 ack->last_committed = last_committed;
1133 ack->first_committed = first_committed;
1134 ack->lease_timestamp = ceph_clock_now();
31f18b77 1135 ::encode(mon->session_map.feature_map, ack->feature_map);
7c673cae
FG
1136 lease->get_connection()->send_message(ack);
1137
1138 // (re)set timeout event.
1139 reset_lease_timeout();
1140
1141 // kick waiters
1142 finish_contexts(g_ceph_context, waiting_for_active);
1143 if (is_readable())
1144 finish_contexts(g_ceph_context, waiting_for_readable);
1145}
1146
1147void Paxos::handle_lease_ack(MonOpRequestRef op)
1148{
1149 op->mark_paxos_event("handle_lease_ack");
1150 MMonPaxos *ack = static_cast<MMonPaxos*>(op->get_req());
1151 int from = ack->get_source().num();
1152
1153 if (!lease_ack_timeout_event) {
224ce89b 1154 dout(10) << "handle_lease_ack from " << ack->get_source()
7c673cae 1155 << " -- stray (probably since revoked)" << dendl;
224ce89b
WB
1156
1157 } else if (acked_lease.count(from) == 0) {
7c673cae 1158 acked_lease.insert(from);
31f18b77
FG
1159 if (ack->feature_map.length()) {
1160 auto p = ack->feature_map.begin();
1161 FeatureMap& t = mon->quorum_feature_map[from];
1162 ::decode(t, p);
1163 }
7c673cae
FG
1164 if (acked_lease == mon->get_quorum()) {
1165 // yay!
224ce89b 1166 dout(10) << "handle_lease_ack from " << ack->get_source()
7c673cae
FG
1167 << " -- got everyone" << dendl;
1168 mon->timer.cancel_event(lease_ack_timeout_event);
1169 lease_ack_timeout_event = 0;
1170
1171
1172 } else {
224ce89b 1173 dout(10) << "handle_lease_ack from " << ack->get_source()
7c673cae
FG
1174 << " -- still need "
1175 << mon->get_quorum().size() - acked_lease.size()
1176 << " more" << dendl;
1177 }
1178 } else {
224ce89b 1179 dout(10) << "handle_lease_ack from " << ack->get_source()
7c673cae
FG
1180 << " dup (lagging!), ignoring" << dendl;
1181 }
1182
1183 warn_on_future_time(ack->sent_timestamp, ack->get_source());
1184}
1185
1186void Paxos::lease_ack_timeout()
1187{
1188 dout(1) << "lease_ack_timeout -- calling new election" << dendl;
1189 assert(mon->is_leader());
1190 assert(is_active());
1191 logger->inc(l_paxos_lease_ack_timeout);
1192 lease_ack_timeout_event = 0;
1193 mon->bootstrap();
1194}
1195
1196void Paxos::reset_lease_timeout()
1197{
1198 dout(20) << "reset_lease_timeout - setting timeout event" << dendl;
1199 if (lease_timeout_event)
1200 mon->timer.cancel_event(lease_timeout_event);
3efd9988
FG
1201 lease_timeout_event = mon->timer.add_event_after(
1202 g_conf->mon_lease_ack_timeout_factor * g_conf->mon_lease,
1203 new C_MonContext(mon, [this](int r) {
1204 if (r == -ECANCELED)
1205 return;
1206 lease_timeout();
1207 }));
7c673cae
FG
1208}
1209
1210void Paxos::lease_timeout()
1211{
1212 dout(1) << "lease_timeout -- calling new election" << dendl;
1213 assert(mon->is_peon());
1214 logger->inc(l_paxos_lease_timeout);
1215 lease_timeout_event = 0;
1216 mon->bootstrap();
1217}
1218
1219void Paxos::lease_renew_timeout()
1220{
1221 lease_renew_event = 0;
1222 extend_lease();
1223}
1224
1225
1226/*
1227 * trim old states
1228 */
1229void Paxos::trim()
1230{
1231 assert(should_trim());
1232 version_t end = MIN(get_version() - g_conf->paxos_min,
1233 get_first_committed() + g_conf->paxos_trim_max);
1234
1235 if (first_committed >= end)
1236 return;
1237
1238 dout(10) << "trim to " << end << " (was " << first_committed << ")" << dendl;
1239
1240 MonitorDBStore::TransactionRef t = get_pending_transaction();
1241
1242 for (version_t v = first_committed; v < end; ++v) {
1243 dout(10) << "trim " << v << dendl;
1244 t->erase(get_name(), v);
1245 }
1246 t->put(get_name(), "first_committed", end);
1247 if (g_conf->mon_compact_on_trim) {
1248 dout(10) << " compacting trimmed range" << dendl;
1249 t->compact_range(get_name(), stringify(first_committed - 1), stringify(end));
1250 }
1251
1252 trimming = true;
1253 queue_pending_finisher(new C_Trimmed(this));
1254}
1255
1256/*
1257 * return a globally unique, monotonically increasing proposal number
1258 */
1259version_t Paxos::get_new_proposal_number(version_t gt)
1260{
1261 if (last_pn < gt)
1262 last_pn = gt;
1263
1264 // update. make it unique among all monitors.
1265 last_pn /= 100;
1266 last_pn++;
1267 last_pn *= 100;
1268 last_pn += (version_t)mon->rank;
1269
1270 // write
1271 auto t(std::make_shared<MonitorDBStore::Transaction>());
1272 t->put(get_name(), "last_pn", last_pn);
1273
1274 dout(30) << __func__ << " transaction dump:\n";
1275 JSONFormatter f(true);
1276 t->dump(&f);
1277 f.flush(*_dout);
1278 *_dout << dendl;
1279
1280 logger->inc(l_paxos_new_pn);
1281 utime_t start = ceph_clock_now();
1282
1283 get_store()->apply_transaction(t);
1284
1285 utime_t end = ceph_clock_now();
1286 logger->tinc(l_paxos_new_pn_latency, end - start);
1287
1288 dout(10) << "get_new_proposal_number = " << last_pn << dendl;
1289 return last_pn;
1290}
1291
1292
1293void Paxos::cancel_events()
1294{
1295 if (collect_timeout_event) {
1296 mon->timer.cancel_event(collect_timeout_event);
1297 collect_timeout_event = 0;
1298 }
1299 if (accept_timeout_event) {
1300 mon->timer.cancel_event(accept_timeout_event);
1301 accept_timeout_event = 0;
1302 }
1303 if (lease_renew_event) {
1304 mon->timer.cancel_event(lease_renew_event);
1305 lease_renew_event = 0;
1306 }
1307 if (lease_ack_timeout_event) {
1308 mon->timer.cancel_event(lease_ack_timeout_event);
1309 lease_ack_timeout_event = 0;
1310 }
1311 if (lease_timeout_event) {
1312 mon->timer.cancel_event(lease_timeout_event);
1313 lease_timeout_event = 0;
1314 }
1315}
1316
1317void Paxos::shutdown()
1318{
1319 dout(10) << __func__ << " cancel all contexts" << dendl;
1320
35e4c445
FG
1321 state = STATE_SHUTDOWN;
1322
7c673cae
FG
1323 // discard pending transaction
1324 pending_proposal.reset();
1325
35e4c445
FG
1326 // Let store finish commits in progress
1327 // XXX: I assume I can't use finish_contexts() because the store
1328 // is going to trigger
1329 while(commits_started > 0)
1330 shutdown_cond.Wait(mon->lock);
1331
7c673cae
FG
1332 finish_contexts(g_ceph_context, waiting_for_writeable, -ECANCELED);
1333 finish_contexts(g_ceph_context, waiting_for_commit, -ECANCELED);
1334 finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED);
1335 finish_contexts(g_ceph_context, waiting_for_active, -ECANCELED);
1336 finish_contexts(g_ceph_context, pending_finishers, -ECANCELED);
1337 finish_contexts(g_ceph_context, committing_finishers, -ECANCELED);
1338 if (logger)
1339 g_ceph_context->get_perfcounters_collection()->remove(logger);
1340 delete logger;
1341}
1342
1343void Paxos::leader_init()
1344{
1345 cancel_events();
1346 new_value.clear();
1347
1348 // discard pending transaction
1349 pending_proposal.reset();
1350
1351 finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
1352 finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
1353
1354 logger->inc(l_paxos_start_leader);
1355
1356 if (mon->get_quorum().size() == 1) {
1357 state = STATE_ACTIVE;
1358 return;
1359 }
1360
1361 state = STATE_RECOVERING;
1362 lease_expire = utime_t();
1363 dout(10) << "leader_init -- starting paxos recovery" << dendl;
1364 collect(0);
1365}
1366
1367void Paxos::peon_init()
1368{
1369 cancel_events();
1370 new_value.clear();
1371
1372 state = STATE_RECOVERING;
1373 lease_expire = utime_t();
1374 dout(10) << "peon_init -- i am a peon" << dendl;
1375
1376 // start a timer, in case the leader never manages to issue a lease
1377 reset_lease_timeout();
1378
1379 // discard pending transaction
1380 pending_proposal.reset();
1381
1382 // no chance to write now!
1383 finish_contexts(g_ceph_context, waiting_for_writeable, -EAGAIN);
1384 finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
1385 finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
1386 finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
1387
1388 logger->inc(l_paxos_start_peon);
1389}
1390
1391void Paxos::restart()
1392{
1393 dout(10) << "restart -- canceling timeouts" << dendl;
1394 cancel_events();
1395 new_value.clear();
1396
1397 if (is_writing() || is_writing_previous()) {
1398 dout(10) << __func__ << " flushing" << dendl;
1399 mon->lock.Unlock();
1400 mon->store->flush();
1401 mon->lock.Lock();
1402 dout(10) << __func__ << " flushed" << dendl;
1403 }
1404 state = STATE_RECOVERING;
1405
1406 // discard pending transaction
1407 pending_proposal.reset();
1408
1409 finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
1410 finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
1411 finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
1412 finish_contexts(g_ceph_context, waiting_for_active, -EAGAIN);
1413
1414 logger->inc(l_paxos_restart);
1415}
1416
1417
1418void Paxos::dispatch(MonOpRequestRef op)
1419{
1420 assert(op->is_type_paxos());
1421 op->mark_paxos_event("dispatch");
1422 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
1423 // election in progress?
1424 if (!mon->is_leader() && !mon->is_peon()) {
1425 dout(5) << "election in progress, dropping " << *m << dendl;
1426 return;
1427 }
1428
1429 // check sanity
1430 assert(mon->is_leader() ||
1431 (mon->is_peon() && m->get_source().num() == mon->get_leader()));
1432
1433 switch (m->get_type()) {
1434
1435 case MSG_MON_PAXOS:
1436 {
1437 MMonPaxos *pm = reinterpret_cast<MMonPaxos*>(m);
1438
1439 // NOTE: these ops are defined in messages/MMonPaxos.h
1440 switch (pm->op) {
1441 // learner
1442 case MMonPaxos::OP_COLLECT:
1443 handle_collect(op);
1444 break;
1445 case MMonPaxos::OP_LAST:
1446 handle_last(op);
1447 break;
1448 case MMonPaxos::OP_BEGIN:
1449 handle_begin(op);
1450 break;
1451 case MMonPaxos::OP_ACCEPT:
1452 handle_accept(op);
1453 break;
1454 case MMonPaxos::OP_COMMIT:
1455 handle_commit(op);
1456 break;
1457 case MMonPaxos::OP_LEASE:
1458 handle_lease(op);
1459 break;
1460 case MMonPaxos::OP_LEASE_ACK:
1461 handle_lease_ack(op);
1462 break;
1463 default:
1464 ceph_abort();
1465 }
1466 }
1467 break;
1468
1469 default:
1470 ceph_abort();
1471 }
1472}
1473
1474
1475// -----------------
1476// service interface
1477
1478// -- READ --
1479
1480bool Paxos::is_readable(version_t v)
1481{
1482 bool ret;
1483 if (v > last_committed)
1484 ret = false;
1485 else
1486 ret =
1487 (mon->is_peon() || mon->is_leader()) &&
1488 (is_active() || is_updating() || is_writing()) &&
1489 last_committed > 0 && is_lease_valid(); // must have a value alone, or have lease
1490 dout(5) << __func__ << " = " << (int)ret
1491 << " - now=" << ceph_clock_now()
1492 << " lease_expire=" << lease_expire
1493 << " has v" << v << " lc " << last_committed
1494 << dendl;
1495 return ret;
1496}
1497
1498bool Paxos::read(version_t v, bufferlist &bl)
1499{
1500 if (!get_store()->get(get_name(), v, bl))
1501 return false;
1502 return true;
1503}
1504
1505version_t Paxos::read_current(bufferlist &bl)
1506{
1507 if (read(last_committed, bl))
1508 return last_committed;
1509 return 0;
1510}
1511
1512
1513bool Paxos::is_lease_valid()
1514{
1515 return ((mon->get_quorum().size() == 1)
1516 || (ceph_clock_now() < lease_expire));
1517}
1518
1519// -- WRITE --
1520
1521bool Paxos::is_writeable()
1522{
1523 return
1524 mon->is_leader() &&
1525 is_active() &&
1526 is_lease_valid();
1527}
1528
1529void Paxos::propose_pending()
1530{
1531 assert(is_active());
1532 assert(pending_proposal);
1533
1534 cancel_events();
1535
1536 bufferlist bl;
1537 pending_proposal->encode(bl);
1538
1539 dout(10) << __func__ << " " << (last_committed + 1)
1540 << " " << bl.length() << " bytes" << dendl;
1541 dout(30) << __func__ << " transaction dump:\n";
1542 JSONFormatter f(true);
1543 pending_proposal->dump(&f);
1544 f.flush(*_dout);
1545 *_dout << dendl;
1546
1547 pending_proposal.reset();
1548
1549 committing_finishers.swap(pending_finishers);
1550 state = STATE_UPDATING;
1551 begin(bl);
1552}
1553
1554void Paxos::queue_pending_finisher(Context *onfinished)
1555{
1556 dout(5) << __func__ << " " << onfinished << dendl;
1557 assert(onfinished);
1558 pending_finishers.push_back(onfinished);
1559}
1560
1561MonitorDBStore::TransactionRef Paxos::get_pending_transaction()
1562{
1563 assert(mon->is_leader());
1564 if (!pending_proposal) {
1565 pending_proposal.reset(new MonitorDBStore::Transaction);
1566 assert(pending_finishers.empty());
1567 }
1568 return pending_proposal;
1569}
1570
1571bool Paxos::trigger_propose()
1572{
31f18b77
FG
1573 if (plugged) {
1574 dout(10) << __func__ << " plugged, not proposing now" << dendl;
1575 return false;
1576 } else if (is_active()) {
7c673cae
FG
1577 dout(10) << __func__ << " active, proposing now" << dendl;
1578 propose_pending();
1579 return true;
1580 } else {
1581 dout(10) << __func__ << " not active, will propose later" << dendl;
1582 return false;
1583 }
1584}
1585
1586bool Paxos::is_consistent()
1587{
1588 return (first_committed <= last_committed);
1589}
1590