]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/Paxos.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / mon / Paxos.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <sstream>
16 #include "Paxos.h"
17 #include "Monitor.h"
18 #include "messages/MMonPaxos.h"
19
20 #include "common/config.h"
21 #include "include/assert.h"
22 #include "include/stringify.h"
23 #include "common/Timer.h"
24 #include "messages/PaxosServiceMessage.h"
25
26 #define dout_subsys ceph_subsys_paxos
27 #undef dout_prefix
28 #define dout_prefix _prefix(_dout, mon, mon->name, mon->rank, paxos_name, state, first_committed, last_committed)
29 static ostream& _prefix(std::ostream *_dout, Monitor *mon, const string& name,
30 int rank, const string& paxos_name, int state,
31 version_t first_committed, version_t last_committed)
32 {
33 return *_dout << "mon." << name << "@" << rank
34 << "(" << mon->get_state_name() << ")"
35 << ".paxos(" << paxos_name << " " << Paxos::get_statename(state)
36 << " c " << first_committed << ".." << last_committed
37 << ") ";
38 }
39
40 class Paxos::C_Trimmed : public Context {
41 Paxos *paxos;
42 public:
43 explicit C_Trimmed(Paxos *p) : paxos(p) { }
44 void finish(int r) override {
45 paxos->trimming = false;
46 }
47 };
48
49 MonitorDBStore *Paxos::get_store()
50 {
51 return mon->store;
52 }
53
54 void Paxos::read_and_prepare_transactions(MonitorDBStore::TransactionRef tx,
55 version_t first, version_t last)
56 {
57 dout(10) << __func__ << " first " << first << " last " << last << dendl;
58 for (version_t v = first; v <= last; ++v) {
59 dout(30) << __func__ << " apply version " << v << dendl;
60 bufferlist bl;
61 int err = get_store()->get(get_name(), v, bl);
62 assert(err == 0);
63 assert(bl.length());
64 decode_append_transaction(tx, bl);
65 }
66 dout(15) << __func__ << " total versions " << (last-first) << dendl;
67 }
68
69 void Paxos::init()
70 {
71 // load paxos variables from stable storage
72 last_pn = get_store()->get(get_name(), "last_pn");
73 accepted_pn = get_store()->get(get_name(), "accepted_pn");
74 last_committed = get_store()->get(get_name(), "last_committed");
75 first_committed = get_store()->get(get_name(), "first_committed");
76
77 dout(10) << __func__ << " last_pn: " << last_pn << " accepted_pn: "
78 << accepted_pn << " last_committed: " << last_committed
79 << " first_committed: " << first_committed << dendl;
80
81 dout(10) << "init" << dendl;
82 assert(is_consistent());
83 }
84
85 void Paxos::init_logger()
86 {
87 PerfCountersBuilder pcb(g_ceph_context, "paxos", l_paxos_first, l_paxos_last);
88 pcb.add_u64_counter(l_paxos_start_leader, "start_leader", "Starts in leader role");
89 pcb.add_u64_counter(l_paxos_start_peon, "start_peon", "Starts in peon role");
90 pcb.add_u64_counter(l_paxos_restart, "restart", "Restarts");
91 pcb.add_u64_counter(l_paxos_refresh, "refresh", "Refreshes");
92 pcb.add_time_avg(l_paxos_refresh_latency, "refresh_latency", "Refresh latency");
93 pcb.add_u64_counter(l_paxos_begin, "begin", "Started and handled begins");
94 pcb.add_u64_avg(l_paxos_begin_keys, "begin_keys", "Keys in transaction on begin");
95 pcb.add_u64_avg(l_paxos_begin_bytes, "begin_bytes", "Data in transaction on begin");
96 pcb.add_time_avg(l_paxos_begin_latency, "begin_latency", "Latency of begin operation");
97 pcb.add_u64_counter(l_paxos_commit, "commit",
98 "Commits", "cmt");
99 pcb.add_u64_avg(l_paxos_commit_keys, "commit_keys", "Keys in transaction on commit");
100 pcb.add_u64_avg(l_paxos_commit_bytes, "commit_bytes", "Data in transaction on commit");
101 pcb.add_time_avg(l_paxos_commit_latency, "commit_latency",
102 "Commit latency", "clat");
103 pcb.add_u64_counter(l_paxos_collect, "collect", "Peon collects");
104 pcb.add_u64_avg(l_paxos_collect_keys, "collect_keys", "Keys in transaction on peon collect");
105 pcb.add_u64_avg(l_paxos_collect_bytes, "collect_bytes", "Data in transaction on peon collect");
106 pcb.add_time_avg(l_paxos_collect_latency, "collect_latency", "Peon collect latency");
107 pcb.add_u64_counter(l_paxos_collect_uncommitted, "collect_uncommitted", "Uncommitted values in started and handled collects");
108 pcb.add_u64_counter(l_paxos_collect_timeout, "collect_timeout", "Collect timeouts");
109 pcb.add_u64_counter(l_paxos_accept_timeout, "accept_timeout", "Accept timeouts");
110 pcb.add_u64_counter(l_paxos_lease_ack_timeout, "lease_ack_timeout", "Lease acknowledgement timeouts");
111 pcb.add_u64_counter(l_paxos_lease_timeout, "lease_timeout", "Lease timeouts");
112 pcb.add_u64_counter(l_paxos_store_state, "store_state", "Store a shared state on disk");
113 pcb.add_u64_avg(l_paxos_store_state_keys, "store_state_keys", "Keys in transaction in stored state");
114 pcb.add_u64_avg(l_paxos_store_state_bytes, "store_state_bytes", "Data in transaction in stored state");
115 pcb.add_time_avg(l_paxos_store_state_latency, "store_state_latency", "Storing state latency");
116 pcb.add_u64_counter(l_paxos_share_state, "share_state", "Sharings of state");
117 pcb.add_u64_avg(l_paxos_share_state_keys, "share_state_keys", "Keys in shared state");
118 pcb.add_u64_avg(l_paxos_share_state_bytes, "share_state_bytes", "Data in shared state");
119 pcb.add_u64_counter(l_paxos_new_pn, "new_pn", "New proposal number queries");
120 pcb.add_time_avg(l_paxos_new_pn_latency, "new_pn_latency", "New proposal number getting latency");
121 logger = pcb.create_perf_counters();
122 g_ceph_context->get_perfcounters_collection()->add(logger);
123 }
124
125 void Paxos::dump_info(Formatter *f)
126 {
127 f->open_object_section("paxos");
128 f->dump_unsigned("first_committed", first_committed);
129 f->dump_unsigned("last_committed", last_committed);
130 f->dump_unsigned("last_pn", last_pn);
131 f->dump_unsigned("accepted_pn", accepted_pn);
132 f->close_section();
133 }
134
135 // ---------------------------------
136
137 // PHASE 1
138
139 // leader
140 void Paxos::collect(version_t oldpn)
141 {
142 // we're recoverying, it seems!
143 state = STATE_RECOVERING;
144 assert(mon->is_leader());
145
146 // reset the number of lasts received
147 uncommitted_v = 0;
148 uncommitted_pn = 0;
149 uncommitted_value.clear();
150 peer_first_committed.clear();
151 peer_last_committed.clear();
152
153 // look for uncommitted value
154 if (get_store()->exists(get_name(), last_committed+1)) {
155 version_t v = get_store()->get(get_name(), "pending_v");
156 version_t pn = get_store()->get(get_name(), "pending_pn");
157 if (v && pn && v == last_committed + 1) {
158 uncommitted_pn = pn;
159 } else {
160 dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn
161 << " and crossing our fingers" << dendl;
162 uncommitted_pn = accepted_pn;
163 }
164 uncommitted_v = last_committed+1;
165
166 get_store()->get(get_name(), last_committed+1, uncommitted_value);
167 assert(uncommitted_value.length());
168 dout(10) << "learned uncommitted " << (last_committed+1)
169 << " pn " << uncommitted_pn
170 << " (" << uncommitted_value.length() << " bytes) from myself"
171 << dendl;
172
173 logger->inc(l_paxos_collect_uncommitted);
174 }
175
176 // pick new pn
177 accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn));
178 accepted_pn_from = last_committed;
179 num_last = 1;
180 dout(10) << "collect with pn " << accepted_pn << dendl;
181
182 // send collect
183 for (set<int>::const_iterator p = mon->get_quorum().begin();
184 p != mon->get_quorum().end();
185 ++p) {
186 if (*p == mon->rank) continue;
187
188 MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT,
189 ceph_clock_now());
190 collect->last_committed = last_committed;
191 collect->first_committed = first_committed;
192 collect->pn = accepted_pn;
193 mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
194 }
195
196 // set timeout event
197 collect_timeout_event = new C_MonContext(mon, [this](int r) {
198 if (r == -ECANCELED)
199 return;
200 collect_timeout();
201 });
202 mon->timer.add_event_after(g_conf->mon_accept_timeout_factor *
203 g_conf->mon_lease,
204 collect_timeout_event);
205 }
206
207
208 // peon
209 void Paxos::handle_collect(MonOpRequestRef op)
210 {
211
212 op->mark_paxos_event("handle_collect");
213
214 MMonPaxos *collect = static_cast<MMonPaxos*>(op->get_req());
215 dout(10) << "handle_collect " << *collect << dendl;
216
217 assert(mon->is_peon()); // mon epoch filter should catch strays
218
219 // we're recoverying, it seems!
220 state = STATE_RECOVERING;
221
222 //update the peon recovery timeout
223 reset_lease_timeout();
224
225 if (collect->first_committed > last_committed+1) {
226 dout(2) << __func__
227 << " leader's lowest version is too high for our last committed"
228 << " (theirs: " << collect->first_committed
229 << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
230 op->mark_paxos_event("need to bootstrap");
231 mon->bootstrap();
232 return;
233 }
234
235 // reply
236 MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST,
237 ceph_clock_now());
238 last->last_committed = last_committed;
239 last->first_committed = first_committed;
240
241 version_t previous_pn = accepted_pn;
242
243 // can we accept this pn?
244 if (collect->pn > accepted_pn) {
245 // ok, accept it
246 accepted_pn = collect->pn;
247 accepted_pn_from = collect->pn_from;
248 dout(10) << "accepting pn " << accepted_pn << " from "
249 << accepted_pn_from << dendl;
250
251 auto t(std::make_shared<MonitorDBStore::Transaction>());
252 t->put(get_name(), "accepted_pn", accepted_pn);
253
254 dout(30) << __func__ << " transaction dump:\n";
255 JSONFormatter f(true);
256 t->dump(&f);
257 f.flush(*_dout);
258 *_dout << dendl;
259
260 logger->inc(l_paxos_collect);
261 logger->inc(l_paxos_collect_keys, t->get_keys());
262 logger->inc(l_paxos_collect_bytes, t->get_bytes());
263 utime_t start = ceph_clock_now();
264
265 get_store()->apply_transaction(t);
266
267 utime_t end = ceph_clock_now();
268 logger->tinc(l_paxos_collect_latency, end - start);
269 } else {
270 // don't accept!
271 dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from
272 << ", we already accepted " << accepted_pn
273 << " from " << accepted_pn_from << dendl;
274 }
275 last->pn = accepted_pn;
276 last->pn_from = accepted_pn_from;
277
278 // share whatever committed values we have
279 if (collect->last_committed < last_committed)
280 share_state(last, collect->first_committed, collect->last_committed);
281
282 // do we have an accepted but uncommitted value?
283 // (it'll be at last_committed+1)
284 bufferlist bl;
285 if (collect->last_committed <= last_committed &&
286 get_store()->exists(get_name(), last_committed+1)) {
287 get_store()->get(get_name(), last_committed+1, bl);
288 assert(bl.length() > 0);
289 dout(10) << " sharing our accepted but uncommitted value for "
290 << last_committed+1 << " (" << bl.length() << " bytes)" << dendl;
291 last->values[last_committed+1] = bl;
292
293 version_t v = get_store()->get(get_name(), "pending_v");
294 version_t pn = get_store()->get(get_name(), "pending_pn");
295 if (v && pn && v == last_committed + 1) {
296 last->uncommitted_pn = pn;
297 } else {
298 // previously we didn't record which pn a value was accepted
299 // under! use the pn value we just had... :(
300 dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << previous_pn
301 << " and crossing our fingers" << dendl;
302 last->uncommitted_pn = previous_pn;
303 }
304
305 logger->inc(l_paxos_collect_uncommitted);
306 }
307
308 // send reply
309 collect->get_connection()->send_message(last);
310 }
311
312 /**
313 * @note This is Okay. We share our versions between peer_last_committed and
314 * our last_committed (inclusive), and add their bufferlists to the
315 * message. It will be the peer's job to apply them to its store, as
316 * these bufferlists will contain raw transactions.
317 * This function is called by both the Peon and the Leader. The Peon will
318 * share the state with the Leader during handle_collect(), sharing any
319 * values the leader may be missing (i.e., the leader's last_committed is
320 * lower than the peon's last_committed). The Leader will share the state
321 * with the Peon during handle_last(), if the peon's last_committed is
322 * lower than the leader's last_committed.
323 */
324 void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed,
325 version_t peer_last_committed)
326 {
327 assert(peer_last_committed < last_committed);
328
329 dout(10) << "share_state peer has fc " << peer_first_committed
330 << " lc " << peer_last_committed << dendl;
331 version_t v = peer_last_committed + 1;
332
333 // include incrementals
334 uint64_t bytes = 0;
335 for ( ; v <= last_committed; v++) {
336 if (get_store()->exists(get_name(), v)) {
337 get_store()->get(get_name(), v, m->values[v]);
338 assert(m->values[v].length());
339 dout(10) << " sharing " << v << " ("
340 << m->values[v].length() << " bytes)" << dendl;
341 bytes += m->values[v].length() + 16; // paxos_ + 10 digits = 16
342 }
343 }
344 logger->inc(l_paxos_share_state);
345 logger->inc(l_paxos_share_state_keys, m->values.size());
346 logger->inc(l_paxos_share_state_bytes, bytes);
347
348 m->last_committed = last_committed;
349 }
350
351 /**
352 * Store on disk a state that was shared with us
353 *
354 * Basically, we received a set of version. Or just one. It doesn't matter.
355 * What matters is that we have to stash it in the store. So, we will simply
356 * write every single bufferlist into their own versions on our side (i.e.,
357 * onto paxos-related keys), and then we will decode those same bufferlists
358 * we just wrote and apply the transactions they hold. We will also update
359 * our first and last committed values to point to the new values, if need
360 * be. All all this is done tightly wrapped in a transaction to ensure we
361 * enjoy the atomicity guarantees given by our awesome k/v store.
362 */
363 bool Paxos::store_state(MMonPaxos *m)
364 {
365 auto t(std::make_shared<MonitorDBStore::Transaction>());
366 map<version_t,bufferlist>::iterator start = m->values.begin();
367 bool changed = false;
368
369 // build map of values to store
370 // we want to write the range [last_committed, m->last_committed] only.
371 if (start != m->values.end() &&
372 start->first > last_committed + 1) {
373 // ignore everything if values start in the future.
374 dout(10) << "store_state ignoring all values, they start at " << start->first
375 << " > last_committed+1" << dendl;
376 return false;
377 }
378
379 // push forward the start position on the message's values iterator, up until
380 // we run out of positions or we find a position matching 'last_committed'.
381 while (start != m->values.end() && start->first <= last_committed) {
382 ++start;
383 }
384
385 // make sure we get the right interval of values to apply by pushing forward
386 // the 'end' iterator until it matches the message's 'last_committed'.
387 map<version_t,bufferlist>::iterator end = start;
388 while (end != m->values.end() && end->first <= m->last_committed) {
389 last_committed = end->first;
390 ++end;
391 }
392
393 if (start == end) {
394 dout(10) << "store_state nothing to commit" << dendl;
395 } else {
396 dout(10) << "store_state [" << start->first << ".."
397 << last_committed << "]" << dendl;
398 t->put(get_name(), "last_committed", last_committed);
399
400 // we should apply the state here -- decode every single bufferlist in the
401 // map and append the transactions to 't'.
402 map<version_t,bufferlist>::iterator it;
403 for (it = start; it != end; ++it) {
404 // write the bufferlist as the version's value
405 t->put(get_name(), it->first, it->second);
406 // decode the bufferlist and append it to the transaction we will shortly
407 // apply.
408 decode_append_transaction(t, it->second);
409 }
410
411 // discard obsolete uncommitted value?
412 if (uncommitted_v && uncommitted_v <= last_committed) {
413 dout(10) << " forgetting obsolete uncommitted value " << uncommitted_v
414 << " pn " << uncommitted_pn << dendl;
415 uncommitted_v = 0;
416 uncommitted_pn = 0;
417 uncommitted_value.clear();
418 }
419 }
420 if (!t->empty()) {
421 dout(30) << __func__ << " transaction dump:\n";
422 JSONFormatter f(true);
423 t->dump(&f);
424 f.flush(*_dout);
425 *_dout << dendl;
426
427 logger->inc(l_paxos_store_state);
428 logger->inc(l_paxos_store_state_bytes, t->get_bytes());
429 logger->inc(l_paxos_store_state_keys, t->get_keys());
430 utime_t start = ceph_clock_now();
431
432 get_store()->apply_transaction(t);
433
434 utime_t end = ceph_clock_now();
435 logger->tinc(l_paxos_store_state_latency, end - start);
436
437 // refresh first_committed; this txn may have trimmed.
438 first_committed = get_store()->get(get_name(), "first_committed");
439
440 _sanity_check_store();
441 changed = true;
442 }
443
444 return changed;
445 }
446
447 void Paxos::_sanity_check_store()
448 {
449 version_t lc = get_store()->get(get_name(), "last_committed");
450 assert(lc == last_committed);
451 }
452
453
454 // leader
455 void Paxos::handle_last(MonOpRequestRef op)
456 {
457 op->mark_paxos_event("handle_last");
458 MMonPaxos *last = static_cast<MMonPaxos*>(op->get_req());
459 bool need_refresh = false;
460 int from = last->get_source().num();
461
462 dout(10) << "handle_last " << *last << dendl;
463
464 if (!mon->is_leader()) {
465 dout(10) << "not leader, dropping" << dendl;
466 return;
467 }
468
469 // note peer's first_ and last_committed, in case we learn a new
470 // commit and need to push it to them.
471 peer_first_committed[from] = last->first_committed;
472 peer_last_committed[from] = last->last_committed;
473
474 if (last->first_committed > last_committed + 1) {
475 dout(5) << __func__
476 << " mon." << from
477 << " lowest version is too high for our last committed"
478 << " (theirs: " << last->first_committed
479 << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
480 op->mark_paxos_event("need to bootstrap");
481 mon->bootstrap();
482 return;
483 }
484
485 assert(g_conf->paxos_kill_at != 1);
486
487 // store any committed values if any are specified in the message
488 need_refresh = store_state(last);
489
490 assert(g_conf->paxos_kill_at != 2);
491
492 // is everyone contiguous and up to date?
493 for (map<int,version_t>::iterator p = peer_last_committed.begin();
494 p != peer_last_committed.end();
495 ++p) {
496 if (p->second + 1 < first_committed && first_committed > 1) {
497 dout(5) << __func__
498 << " peon " << p->first
499 << " last_committed (" << p->second
500 << ") is too low for our first_committed (" << first_committed
501 << ") -- bootstrap!" << dendl;
502 op->mark_paxos_event("need to bootstrap");
503 mon->bootstrap();
504 return;
505 }
506 if (p->second < last_committed) {
507 // share committed values
508 dout(10) << " sending commit to mon." << p->first << dendl;
509 MMonPaxos *commit = new MMonPaxos(mon->get_epoch(),
510 MMonPaxos::OP_COMMIT,
511 ceph_clock_now());
512 share_state(commit, peer_first_committed[p->first], p->second);
513 mon->messenger->send_message(commit, mon->monmap->get_inst(p->first));
514 }
515 }
516
517 // do they accept your pn?
518 if (last->pn > accepted_pn) {
519 // no, try again.
520 dout(10) << " they had a higher pn than us, picking a new one." << dendl;
521
522 // cancel timeout event
523 mon->timer.cancel_event(collect_timeout_event);
524 collect_timeout_event = 0;
525
526 collect(last->pn);
527 } else if (last->pn == accepted_pn) {
528 // yes, they accepted our pn. great.
529 num_last++;
530 dout(10) << " they accepted our pn, we now have "
531 << num_last << " peons" << dendl;
532
533 // did this person send back an accepted but uncommitted value?
534 if (last->uncommitted_pn) {
535 if (last->uncommitted_pn >= uncommitted_pn &&
536 last->last_committed >= last_committed &&
537 last->last_committed + 1 >= uncommitted_v) {
538 uncommitted_v = last->last_committed+1;
539 uncommitted_pn = last->uncommitted_pn;
540 uncommitted_value = last->values[uncommitted_v];
541 dout(10) << "we learned an uncommitted value for " << uncommitted_v
542 << " pn " << uncommitted_pn
543 << " " << uncommitted_value.length() << " bytes"
544 << dendl;
545 } else {
546 dout(10) << "ignoring uncommitted value for " << (last->last_committed+1)
547 << " pn " << last->uncommitted_pn
548 << " " << last->values[last->last_committed+1].length() << " bytes"
549 << dendl;
550 }
551 }
552
553 // is that everyone?
554 if (num_last == mon->get_quorum().size()) {
555 // cancel timeout event
556 mon->timer.cancel_event(collect_timeout_event);
557 collect_timeout_event = 0;
558 peer_first_committed.clear();
559 peer_last_committed.clear();
560
561 // almost...
562
563 // did we learn an old value?
564 if (uncommitted_v == last_committed+1 &&
565 uncommitted_value.length()) {
566 dout(10) << "that's everyone. begin on old learned value" << dendl;
567 state = STATE_UPDATING_PREVIOUS;
568 begin(uncommitted_value);
569 } else {
570 // active!
571 dout(10) << "that's everyone. active!" << dendl;
572 extend_lease();
573
574 need_refresh = false;
575 if (do_refresh()) {
576 finish_round();
577 }
578 }
579 }
580 } else {
581 // no, this is an old message, discard
582 dout(10) << "old pn, ignoring" << dendl;
583 }
584
585 if (need_refresh)
586 (void)do_refresh();
587 }
588
589 void Paxos::collect_timeout()
590 {
591 dout(1) << "collect timeout, calling fresh election" << dendl;
592 collect_timeout_event = 0;
593 logger->inc(l_paxos_collect_timeout);
594 assert(mon->is_leader());
595 mon->bootstrap();
596 }
597
598
599 // leader
600 void Paxos::begin(bufferlist& v)
601 {
602 dout(10) << "begin for " << last_committed+1 << " "
603 << v.length() << " bytes"
604 << dendl;
605
606 assert(mon->is_leader());
607 assert(is_updating() || is_updating_previous());
608
609 // we must already have a majority for this to work.
610 assert(mon->get_quorum().size() == 1 ||
611 num_last > (unsigned)mon->monmap->size()/2);
612
613 // and no value, yet.
614 assert(new_value.length() == 0);
615
616 // accept it ourselves
617 accepted.clear();
618 accepted.insert(mon->rank);
619 new_value = v;
620
621 if (last_committed == 0) {
622 auto t(std::make_shared<MonitorDBStore::Transaction>());
623 // initial base case; set first_committed too
624 t->put(get_name(), "first_committed", 1);
625 decode_append_transaction(t, new_value);
626
627 bufferlist tx_bl;
628 t->encode(tx_bl);
629
630 new_value = tx_bl;
631 }
632
633 // store the proposed value in the store. IF it is accepted, we will then
634 // have to decode it into a transaction and apply it.
635 auto t(std::make_shared<MonitorDBStore::Transaction>());
636 t->put(get_name(), last_committed+1, new_value);
637
638 // note which pn this pending value is for.
639 t->put(get_name(), "pending_v", last_committed + 1);
640 t->put(get_name(), "pending_pn", accepted_pn);
641
642 dout(30) << __func__ << " transaction dump:\n";
643 JSONFormatter f(true);
644 t->dump(&f);
645 f.flush(*_dout);
646 auto debug_tx(std::make_shared<MonitorDBStore::Transaction>());
647 bufferlist::iterator new_value_it = new_value.begin();
648 debug_tx->decode(new_value_it);
649 debug_tx->dump(&f);
650 *_dout << "\nbl dump:\n";
651 f.flush(*_dout);
652 *_dout << dendl;
653
654 logger->inc(l_paxos_begin);
655 logger->inc(l_paxos_begin_keys, t->get_keys());
656 logger->inc(l_paxos_begin_bytes, t->get_bytes());
657 utime_t start = ceph_clock_now();
658
659 get_store()->apply_transaction(t);
660
661 utime_t end = ceph_clock_now();
662 logger->tinc(l_paxos_begin_latency, end - start);
663
664 assert(g_conf->paxos_kill_at != 3);
665
666 if (mon->get_quorum().size() == 1) {
667 // we're alone, take it easy
668 commit_start();
669 return;
670 }
671
672 // ask others to accept it too!
673 for (set<int>::const_iterator p = mon->get_quorum().begin();
674 p != mon->get_quorum().end();
675 ++p) {
676 if (*p == mon->rank) continue;
677
678 dout(10) << " sending begin to mon." << *p << dendl;
679 MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN,
680 ceph_clock_now());
681 begin->values[last_committed+1] = new_value;
682 begin->last_committed = last_committed;
683 begin->pn = accepted_pn;
684
685 mon->messenger->send_message(begin, mon->monmap->get_inst(*p));
686 }
687
688 // set timeout event
689 accept_timeout_event = new C_MonContext(mon, [this](int r) {
690 if (r == -ECANCELED)
691 return;
692 accept_timeout();
693 });
694 mon->timer.add_event_after(g_conf->mon_accept_timeout_factor *
695 g_conf->mon_lease,
696 accept_timeout_event);
697 }
698
699 // peon
700 void Paxos::handle_begin(MonOpRequestRef op)
701 {
702 op->mark_paxos_event("handle_begin");
703 MMonPaxos *begin = static_cast<MMonPaxos*>(op->get_req());
704 dout(10) << "handle_begin " << *begin << dendl;
705
706 // can we accept this?
707 if (begin->pn < accepted_pn) {
708 dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
709 op->mark_paxos_event("have higher pn, ignore");
710 return;
711 }
712 assert(begin->pn == accepted_pn);
713 assert(begin->last_committed == last_committed);
714
715 assert(g_conf->paxos_kill_at != 4);
716
717 logger->inc(l_paxos_begin);
718
719 // set state.
720 state = STATE_UPDATING;
721 lease_expire = utime_t(); // cancel lease
722
723 // yes.
724 version_t v = last_committed+1;
725 dout(10) << "accepting value for " << v << " pn " << accepted_pn << dendl;
726 // store the accepted value onto our store. We will have to decode it and
727 // apply its transaction once we receive permission to commit.
728 auto t(std::make_shared<MonitorDBStore::Transaction>());
729 t->put(get_name(), v, begin->values[v]);
730
731 // note which pn this pending value is for.
732 t->put(get_name(), "pending_v", v);
733 t->put(get_name(), "pending_pn", accepted_pn);
734
735 dout(30) << __func__ << " transaction dump:\n";
736 JSONFormatter f(true);
737 t->dump(&f);
738 f.flush(*_dout);
739 *_dout << dendl;
740
741 logger->inc(l_paxos_begin_bytes, t->get_bytes());
742 utime_t start = ceph_clock_now();
743
744 get_store()->apply_transaction(t);
745
746 utime_t end = ceph_clock_now();
747 logger->tinc(l_paxos_begin_latency, end - start);
748
749 assert(g_conf->paxos_kill_at != 5);
750
751 // reply
752 MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT,
753 ceph_clock_now());
754 accept->pn = accepted_pn;
755 accept->last_committed = last_committed;
756 begin->get_connection()->send_message(accept);
757 }
758
759 // leader
760 void Paxos::handle_accept(MonOpRequestRef op)
761 {
762 op->mark_paxos_event("handle_accept");
763 MMonPaxos *accept = static_cast<MMonPaxos*>(op->get_req());
764 dout(10) << "handle_accept " << *accept << dendl;
765 int from = accept->get_source().num();
766
767 if (accept->pn != accepted_pn) {
768 // we accepted a higher pn, from some other leader
769 dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
770 op->mark_paxos_event("have higher pn, ignore");
771 return;
772 }
773 if (last_committed > 0 &&
774 accept->last_committed < last_committed-1) {
775 dout(10) << " this is from an old round, ignoring" << dendl;
776 op->mark_paxos_event("old round, ignore");
777 return;
778 }
779 assert(accept->last_committed == last_committed || // not committed
780 accept->last_committed == last_committed-1); // committed
781
782 assert(is_updating() || is_updating_previous());
783 assert(accepted.count(from) == 0);
784 accepted.insert(from);
785 dout(10) << " now " << accepted << " have accepted" << dendl;
786
787 assert(g_conf->paxos_kill_at != 6);
788
789 // only commit (and expose committed state) when we get *all* quorum
790 // members to accept. otherwise, they may still be sharing the now
791 // stale state.
792 // FIXME: we can improve this with an additional lease revocation message
793 // that doesn't block for the persist.
794 if (accepted == mon->get_quorum()) {
795 // yay, commit!
796 dout(10) << " got majority, committing, done with update" << dendl;
797 op->mark_paxos_event("commit_start");
798 commit_start();
799 }
800 }
801
802 void Paxos::accept_timeout()
803 {
804 dout(1) << "accept timeout, calling fresh election" << dendl;
805 accept_timeout_event = 0;
806 assert(mon->is_leader());
807 assert(is_updating() || is_updating_previous() || is_writing() ||
808 is_writing_previous());
809 logger->inc(l_paxos_accept_timeout);
810 mon->bootstrap();
811 }
812
813 struct C_Committed : public Context {
814 Paxos *paxos;
815 explicit C_Committed(Paxos *p) : paxos(p) {}
816 void finish(int r) override {
817 assert(r >= 0);
818 Mutex::Locker l(paxos->mon->lock);
819 paxos->commit_finish();
820 }
821 };
822
823 void Paxos::commit_start()
824 {
825 dout(10) << __func__ << " " << (last_committed+1) << dendl;
826
827 assert(g_conf->paxos_kill_at != 7);
828
829 auto t(std::make_shared<MonitorDBStore::Transaction>());
830
831 // commit locally
832 t->put(get_name(), "last_committed", last_committed + 1);
833
834 // decode the value and apply its transaction to the store.
835 // this value can now be read from last_committed.
836 decode_append_transaction(t, new_value);
837
838 dout(30) << __func__ << " transaction dump:\n";
839 JSONFormatter f(true);
840 t->dump(&f);
841 f.flush(*_dout);
842 *_dout << dendl;
843
844 logger->inc(l_paxos_commit);
845 logger->inc(l_paxos_commit_keys, t->get_keys());
846 logger->inc(l_paxos_commit_bytes, t->get_bytes());
847 commit_start_stamp = ceph_clock_now();
848
849 get_store()->queue_transaction(t, new C_Committed(this));
850
851 if (is_updating_previous())
852 state = STATE_WRITING_PREVIOUS;
853 else if (is_updating())
854 state = STATE_WRITING;
855 else
856 ceph_abort();
857
858 if (mon->get_quorum().size() > 1) {
859 // cancel timeout event
860 mon->timer.cancel_event(accept_timeout_event);
861 accept_timeout_event = 0;
862 }
863 }
864
865 void Paxos::commit_finish()
866 {
867 dout(20) << __func__ << " " << (last_committed+1) << dendl;
868 utime_t end = ceph_clock_now();
869 logger->tinc(l_paxos_commit_latency, end - commit_start_stamp);
870
871 assert(g_conf->paxos_kill_at != 8);
872
873 // cancel lease - it was for the old value.
874 // (this would only happen if message layer lost the 'begin', but
875 // leader still got a majority and committed with out us.)
876 lease_expire = utime_t(); // cancel lease
877
878 last_committed++;
879 last_commit_time = ceph_clock_now();
880
881 // refresh first_committed; this txn may have trimmed.
882 first_committed = get_store()->get(get_name(), "first_committed");
883
884 _sanity_check_store();
885
886 // tell everyone
887 for (set<int>::const_iterator p = mon->get_quorum().begin();
888 p != mon->get_quorum().end();
889 ++p) {
890 if (*p == mon->rank) continue;
891
892 dout(10) << " sending commit to mon." << *p << dendl;
893 MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
894 ceph_clock_now());
895 commit->values[last_committed] = new_value;
896 commit->pn = accepted_pn;
897 commit->last_committed = last_committed;
898
899 mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
900 }
901
902 assert(g_conf->paxos_kill_at != 9);
903
904 // get ready for a new round.
905 new_value.clear();
906
907 // WRITING -> REFRESH
908 // among other things, this lets do_refresh() -> mon->bootstrap() know
909 // it doesn't need to flush the store queue
910 assert(is_writing() || is_writing_previous());
911 state = STATE_REFRESH;
912
913 if (do_refresh()) {
914 commit_proposal();
915 if (mon->get_quorum().size() > 1) {
916 extend_lease();
917 }
918
919 finish_contexts(g_ceph_context, waiting_for_commit);
920
921 assert(g_conf->paxos_kill_at != 10);
922
923 finish_round();
924 }
925 }
926
927
928 void Paxos::handle_commit(MonOpRequestRef op)
929 {
930 op->mark_paxos_event("handle_commit");
931 MMonPaxos *commit = static_cast<MMonPaxos*>(op->get_req());
932 dout(10) << "handle_commit on " << commit->last_committed << dendl;
933
934 logger->inc(l_paxos_commit);
935
936 if (!mon->is_peon()) {
937 dout(10) << "not a peon, dropping" << dendl;
938 ceph_abort();
939 return;
940 }
941
942 op->mark_paxos_event("store_state");
943 store_state(commit);
944
945 if (do_refresh()) {
946 finish_contexts(g_ceph_context, waiting_for_commit);
947 }
948 }
949
950 void Paxos::extend_lease()
951 {
952 assert(mon->is_leader());
953 //assert(is_active());
954
955 lease_expire = ceph_clock_now();
956 lease_expire += g_conf->mon_lease;
957 acked_lease.clear();
958 acked_lease.insert(mon->rank);
959
960 dout(7) << "extend_lease now+" << g_conf->mon_lease
961 << " (" << lease_expire << ")" << dendl;
962
963 // bcast
964 for (set<int>::const_iterator p = mon->get_quorum().begin();
965 p != mon->get_quorum().end(); ++p) {
966
967 if (*p == mon->rank) continue;
968 MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE,
969 ceph_clock_now());
970 lease->last_committed = last_committed;
971 lease->lease_timestamp = lease_expire;
972 lease->first_committed = first_committed;
973 mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
974 }
975
976 // set timeout event.
977 // if old timeout is still in place, leave it.
978 if (!lease_ack_timeout_event) {
979 lease_ack_timeout_event = new C_MonContext(mon, [this](int r) {
980 if (r == -ECANCELED)
981 return;
982 lease_ack_timeout();
983 });
984 mon->timer.add_event_after(g_conf->mon_lease_ack_timeout_factor *
985 g_conf->mon_lease,
986 lease_ack_timeout_event);
987 }
988
989 // set renew event
990 lease_renew_event = new C_MonContext(mon, [this](int r) {
991 if (r == -ECANCELED)
992 return;
993 lease_renew_timeout();
994 });
995 utime_t at = lease_expire;
996 at -= g_conf->mon_lease;
997 at += g_conf->mon_lease_renew_interval_factor * g_conf->mon_lease;
998 mon->timer.add_event_at(at, lease_renew_event);
999 }
1000
1001 void Paxos::warn_on_future_time(utime_t t, entity_name_t from)
1002 {
1003 utime_t now = ceph_clock_now();
1004 if (t > now) {
1005 utime_t diff = t - now;
1006 if (diff > g_conf->mon_clock_drift_allowed) {
1007 utime_t warn_diff = now - last_clock_drift_warn;
1008 if (warn_diff >
1009 pow(g_conf->mon_clock_drift_warn_backoff, clock_drift_warned)) {
1010 mon->clog->warn() << "message from " << from << " was stamped " << diff
1011 << "s in the future, clocks not synchronized";
1012 last_clock_drift_warn = ceph_clock_now();
1013 ++clock_drift_warned;
1014 }
1015 }
1016 }
1017
1018 }
1019
1020 bool Paxos::do_refresh()
1021 {
1022 bool need_bootstrap = false;
1023
1024 utime_t start = ceph_clock_now();
1025
1026 // make sure we have the latest state loaded up
1027 mon->refresh_from_paxos(&need_bootstrap);
1028
1029 utime_t end = ceph_clock_now();
1030 logger->inc(l_paxos_refresh);
1031 logger->tinc(l_paxos_refresh_latency, end - start);
1032
1033 if (need_bootstrap) {
1034 dout(10) << " doing requested bootstrap" << dendl;
1035 mon->bootstrap();
1036 return false;
1037 }
1038
1039 return true;
1040 }
1041
1042 void Paxos::commit_proposal()
1043 {
1044 dout(10) << __func__ << dendl;
1045 assert(mon->is_leader());
1046 assert(is_refresh());
1047
1048 finish_contexts(g_ceph_context, committing_finishers);
1049 }
1050
1051 void Paxos::finish_round()
1052 {
1053 dout(10) << __func__ << dendl;
1054 assert(mon->is_leader());
1055
1056 // ok, now go active!
1057 state = STATE_ACTIVE;
1058
1059 dout(20) << __func__ << " waiting_for_acting" << dendl;
1060 finish_contexts(g_ceph_context, waiting_for_active);
1061 dout(20) << __func__ << " waiting_for_readable" << dendl;
1062 finish_contexts(g_ceph_context, waiting_for_readable);
1063 dout(20) << __func__ << " waiting_for_writeable" << dendl;
1064 finish_contexts(g_ceph_context, waiting_for_writeable);
1065
1066 dout(10) << __func__ << " done w/ waiters, state " << get_statename(state) << dendl;
1067
1068 if (should_trim()) {
1069 trim();
1070 }
1071
1072 if (is_active() && pending_proposal) {
1073 propose_pending();
1074 }
1075 }
1076
1077
1078 // peon
1079 void Paxos::handle_lease(MonOpRequestRef op)
1080 {
1081 op->mark_paxos_event("handle_lease");
1082 MMonPaxos *lease = static_cast<MMonPaxos*>(op->get_req());
1083 // sanity
1084 if (!mon->is_peon() ||
1085 last_committed != lease->last_committed) {
1086 dout(10) << "handle_lease i'm not a peon, or they're not the leader,"
1087 << " or the last_committed doesn't match, dropping" << dendl;
1088 op->mark_paxos_event("invalid lease, ignore");
1089 return;
1090 }
1091
1092 warn_on_future_time(lease->sent_timestamp, lease->get_source());
1093
1094 // extend lease
1095 if (lease_expire < lease->lease_timestamp) {
1096 lease_expire = lease->lease_timestamp;
1097
1098 utime_t now = ceph_clock_now();
1099 if (lease_expire < now) {
1100 utime_t diff = now - lease_expire;
1101 derr << "lease_expire from " << lease->get_source_inst() << " is " << diff << " seconds in the past; mons are probably laggy (or possibly clocks are too skewed)" << dendl;
1102 }
1103 }
1104
1105 state = STATE_ACTIVE;
1106
1107 dout(10) << "handle_lease on " << lease->last_committed
1108 << " now " << lease_expire << dendl;
1109
1110 // ack
1111 MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK,
1112 ceph_clock_now());
1113 ack->last_committed = last_committed;
1114 ack->first_committed = first_committed;
1115 ack->lease_timestamp = ceph_clock_now();
1116 lease->get_connection()->send_message(ack);
1117
1118 // (re)set timeout event.
1119 reset_lease_timeout();
1120
1121 // kick waiters
1122 finish_contexts(g_ceph_context, waiting_for_active);
1123 if (is_readable())
1124 finish_contexts(g_ceph_context, waiting_for_readable);
1125 }
1126
1127 void Paxos::handle_lease_ack(MonOpRequestRef op)
1128 {
1129 op->mark_paxos_event("handle_lease_ack");
1130 MMonPaxos *ack = static_cast<MMonPaxos*>(op->get_req());
1131 int from = ack->get_source().num();
1132
1133 if (!lease_ack_timeout_event) {
1134 dout(10) << "handle_lease_ack from " << ack->get_source()
1135 << " -- stray (probably since revoked)" << dendl;
1136 }
1137 else if (acked_lease.count(from) == 0) {
1138 acked_lease.insert(from);
1139
1140 if (acked_lease == mon->get_quorum()) {
1141 // yay!
1142 dout(10) << "handle_lease_ack from " << ack->get_source()
1143 << " -- got everyone" << dendl;
1144 mon->timer.cancel_event(lease_ack_timeout_event);
1145 lease_ack_timeout_event = 0;
1146
1147
1148 } else {
1149 dout(10) << "handle_lease_ack from " << ack->get_source()
1150 << " -- still need "
1151 << mon->get_quorum().size() - acked_lease.size()
1152 << " more" << dendl;
1153 }
1154 } else {
1155 dout(10) << "handle_lease_ack from " << ack->get_source()
1156 << " dup (lagging!), ignoring" << dendl;
1157 }
1158
1159 warn_on_future_time(ack->sent_timestamp, ack->get_source());
1160 }
1161
1162 void Paxos::lease_ack_timeout()
1163 {
1164 dout(1) << "lease_ack_timeout -- calling new election" << dendl;
1165 assert(mon->is_leader());
1166 assert(is_active());
1167 logger->inc(l_paxos_lease_ack_timeout);
1168 lease_ack_timeout_event = 0;
1169 mon->bootstrap();
1170 }
1171
1172 void Paxos::reset_lease_timeout()
1173 {
1174 dout(20) << "reset_lease_timeout - setting timeout event" << dendl;
1175 if (lease_timeout_event)
1176 mon->timer.cancel_event(lease_timeout_event);
1177 lease_timeout_event = new C_MonContext(mon, [this](int r) {
1178 if (r == -ECANCELED)
1179 return;
1180 lease_timeout();
1181 });
1182 mon->timer.add_event_after(g_conf->mon_lease_ack_timeout_factor *
1183 g_conf->mon_lease,
1184 lease_timeout_event);
1185 }
1186
1187 void Paxos::lease_timeout()
1188 {
1189 dout(1) << "lease_timeout -- calling new election" << dendl;
1190 assert(mon->is_peon());
1191 logger->inc(l_paxos_lease_timeout);
1192 lease_timeout_event = 0;
1193 mon->bootstrap();
1194 }
1195
1196 void Paxos::lease_renew_timeout()
1197 {
1198 lease_renew_event = 0;
1199 extend_lease();
1200 }
1201
1202
1203 /*
1204 * trim old states
1205 */
1206 void Paxos::trim()
1207 {
1208 assert(should_trim());
1209 version_t end = MIN(get_version() - g_conf->paxos_min,
1210 get_first_committed() + g_conf->paxos_trim_max);
1211
1212 if (first_committed >= end)
1213 return;
1214
1215 dout(10) << "trim to " << end << " (was " << first_committed << ")" << dendl;
1216
1217 MonitorDBStore::TransactionRef t = get_pending_transaction();
1218
1219 for (version_t v = first_committed; v < end; ++v) {
1220 dout(10) << "trim " << v << dendl;
1221 t->erase(get_name(), v);
1222 }
1223 t->put(get_name(), "first_committed", end);
1224 if (g_conf->mon_compact_on_trim) {
1225 dout(10) << " compacting trimmed range" << dendl;
1226 t->compact_range(get_name(), stringify(first_committed - 1), stringify(end));
1227 }
1228
1229 trimming = true;
1230 queue_pending_finisher(new C_Trimmed(this));
1231 }
1232
1233 /*
1234 * return a globally unique, monotonically increasing proposal number
1235 */
1236 version_t Paxos::get_new_proposal_number(version_t gt)
1237 {
1238 if (last_pn < gt)
1239 last_pn = gt;
1240
1241 // update. make it unique among all monitors.
1242 last_pn /= 100;
1243 last_pn++;
1244 last_pn *= 100;
1245 last_pn += (version_t)mon->rank;
1246
1247 // write
1248 auto t(std::make_shared<MonitorDBStore::Transaction>());
1249 t->put(get_name(), "last_pn", last_pn);
1250
1251 dout(30) << __func__ << " transaction dump:\n";
1252 JSONFormatter f(true);
1253 t->dump(&f);
1254 f.flush(*_dout);
1255 *_dout << dendl;
1256
1257 logger->inc(l_paxos_new_pn);
1258 utime_t start = ceph_clock_now();
1259
1260 get_store()->apply_transaction(t);
1261
1262 utime_t end = ceph_clock_now();
1263 logger->tinc(l_paxos_new_pn_latency, end - start);
1264
1265 dout(10) << "get_new_proposal_number = " << last_pn << dendl;
1266 return last_pn;
1267 }
1268
1269
1270 void Paxos::cancel_events()
1271 {
1272 if (collect_timeout_event) {
1273 mon->timer.cancel_event(collect_timeout_event);
1274 collect_timeout_event = 0;
1275 }
1276 if (accept_timeout_event) {
1277 mon->timer.cancel_event(accept_timeout_event);
1278 accept_timeout_event = 0;
1279 }
1280 if (lease_renew_event) {
1281 mon->timer.cancel_event(lease_renew_event);
1282 lease_renew_event = 0;
1283 }
1284 if (lease_ack_timeout_event) {
1285 mon->timer.cancel_event(lease_ack_timeout_event);
1286 lease_ack_timeout_event = 0;
1287 }
1288 if (lease_timeout_event) {
1289 mon->timer.cancel_event(lease_timeout_event);
1290 lease_timeout_event = 0;
1291 }
1292 }
1293
1294 void Paxos::shutdown()
1295 {
1296 dout(10) << __func__ << " cancel all contexts" << dendl;
1297
1298 // discard pending transaction
1299 pending_proposal.reset();
1300
1301 finish_contexts(g_ceph_context, waiting_for_writeable, -ECANCELED);
1302 finish_contexts(g_ceph_context, waiting_for_commit, -ECANCELED);
1303 finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED);
1304 finish_contexts(g_ceph_context, waiting_for_active, -ECANCELED);
1305 finish_contexts(g_ceph_context, pending_finishers, -ECANCELED);
1306 finish_contexts(g_ceph_context, committing_finishers, -ECANCELED);
1307 if (logger)
1308 g_ceph_context->get_perfcounters_collection()->remove(logger);
1309 delete logger;
1310 }
1311
1312 void Paxos::leader_init()
1313 {
1314 cancel_events();
1315 new_value.clear();
1316
1317 // discard pending transaction
1318 pending_proposal.reset();
1319
1320 finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
1321 finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
1322
1323 logger->inc(l_paxos_start_leader);
1324
1325 if (mon->get_quorum().size() == 1) {
1326 state = STATE_ACTIVE;
1327 return;
1328 }
1329
1330 state = STATE_RECOVERING;
1331 lease_expire = utime_t();
1332 dout(10) << "leader_init -- starting paxos recovery" << dendl;
1333 collect(0);
1334 }
1335
1336 void Paxos::peon_init()
1337 {
1338 cancel_events();
1339 new_value.clear();
1340
1341 state = STATE_RECOVERING;
1342 lease_expire = utime_t();
1343 dout(10) << "peon_init -- i am a peon" << dendl;
1344
1345 // start a timer, in case the leader never manages to issue a lease
1346 reset_lease_timeout();
1347
1348 // discard pending transaction
1349 pending_proposal.reset();
1350
1351 // no chance to write now!
1352 finish_contexts(g_ceph_context, waiting_for_writeable, -EAGAIN);
1353 finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
1354 finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
1355 finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
1356
1357 logger->inc(l_paxos_start_peon);
1358 }
1359
1360 void Paxos::restart()
1361 {
1362 dout(10) << "restart -- canceling timeouts" << dendl;
1363 cancel_events();
1364 new_value.clear();
1365
1366 if (is_writing() || is_writing_previous()) {
1367 dout(10) << __func__ << " flushing" << dendl;
1368 mon->lock.Unlock();
1369 mon->store->flush();
1370 mon->lock.Lock();
1371 dout(10) << __func__ << " flushed" << dendl;
1372 }
1373 state = STATE_RECOVERING;
1374
1375 // discard pending transaction
1376 pending_proposal.reset();
1377
1378 finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
1379 finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
1380 finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
1381 finish_contexts(g_ceph_context, waiting_for_active, -EAGAIN);
1382
1383 logger->inc(l_paxos_restart);
1384 }
1385
1386
1387 void Paxos::dispatch(MonOpRequestRef op)
1388 {
1389 assert(op->is_type_paxos());
1390 op->mark_paxos_event("dispatch");
1391 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
1392 // election in progress?
1393 if (!mon->is_leader() && !mon->is_peon()) {
1394 dout(5) << "election in progress, dropping " << *m << dendl;
1395 return;
1396 }
1397
1398 // check sanity
1399 assert(mon->is_leader() ||
1400 (mon->is_peon() && m->get_source().num() == mon->get_leader()));
1401
1402 switch (m->get_type()) {
1403
1404 case MSG_MON_PAXOS:
1405 {
1406 MMonPaxos *pm = reinterpret_cast<MMonPaxos*>(m);
1407
1408 // NOTE: these ops are defined in messages/MMonPaxos.h
1409 switch (pm->op) {
1410 // learner
1411 case MMonPaxos::OP_COLLECT:
1412 handle_collect(op);
1413 break;
1414 case MMonPaxos::OP_LAST:
1415 handle_last(op);
1416 break;
1417 case MMonPaxos::OP_BEGIN:
1418 handle_begin(op);
1419 break;
1420 case MMonPaxos::OP_ACCEPT:
1421 handle_accept(op);
1422 break;
1423 case MMonPaxos::OP_COMMIT:
1424 handle_commit(op);
1425 break;
1426 case MMonPaxos::OP_LEASE:
1427 handle_lease(op);
1428 break;
1429 case MMonPaxos::OP_LEASE_ACK:
1430 handle_lease_ack(op);
1431 break;
1432 default:
1433 ceph_abort();
1434 }
1435 }
1436 break;
1437
1438 default:
1439 ceph_abort();
1440 }
1441 }
1442
1443
1444 // -----------------
1445 // service interface
1446
1447 // -- READ --
1448
1449 bool Paxos::is_readable(version_t v)
1450 {
1451 bool ret;
1452 if (v > last_committed)
1453 ret = false;
1454 else
1455 ret =
1456 (mon->is_peon() || mon->is_leader()) &&
1457 (is_active() || is_updating() || is_writing()) &&
1458 last_committed > 0 && is_lease_valid(); // must have a value alone, or have lease
1459 dout(5) << __func__ << " = " << (int)ret
1460 << " - now=" << ceph_clock_now()
1461 << " lease_expire=" << lease_expire
1462 << " has v" << v << " lc " << last_committed
1463 << dendl;
1464 return ret;
1465 }
1466
1467 bool Paxos::read(version_t v, bufferlist &bl)
1468 {
1469 if (!get_store()->get(get_name(), v, bl))
1470 return false;
1471 return true;
1472 }
1473
1474 version_t Paxos::read_current(bufferlist &bl)
1475 {
1476 if (read(last_committed, bl))
1477 return last_committed;
1478 return 0;
1479 }
1480
1481
1482 bool Paxos::is_lease_valid()
1483 {
1484 return ((mon->get_quorum().size() == 1)
1485 || (ceph_clock_now() < lease_expire));
1486 }
1487
1488 // -- WRITE --
1489
1490 bool Paxos::is_writeable()
1491 {
1492 return
1493 mon->is_leader() &&
1494 is_active() &&
1495 is_lease_valid();
1496 }
1497
1498 void Paxos::propose_pending()
1499 {
1500 assert(is_active());
1501 assert(pending_proposal);
1502
1503 cancel_events();
1504
1505 bufferlist bl;
1506 pending_proposal->encode(bl);
1507
1508 dout(10) << __func__ << " " << (last_committed + 1)
1509 << " " << bl.length() << " bytes" << dendl;
1510 dout(30) << __func__ << " transaction dump:\n";
1511 JSONFormatter f(true);
1512 pending_proposal->dump(&f);
1513 f.flush(*_dout);
1514 *_dout << dendl;
1515
1516 pending_proposal.reset();
1517
1518 committing_finishers.swap(pending_finishers);
1519 state = STATE_UPDATING;
1520 begin(bl);
1521 }
1522
1523 void Paxos::queue_pending_finisher(Context *onfinished)
1524 {
1525 dout(5) << __func__ << " " << onfinished << dendl;
1526 assert(onfinished);
1527 pending_finishers.push_back(onfinished);
1528 }
1529
1530 MonitorDBStore::TransactionRef Paxos::get_pending_transaction()
1531 {
1532 assert(mon->is_leader());
1533 if (!pending_proposal) {
1534 pending_proposal.reset(new MonitorDBStore::Transaction);
1535 assert(pending_finishers.empty());
1536 }
1537 return pending_proposal;
1538 }
1539
1540 bool Paxos::trigger_propose()
1541 {
1542 if (is_active()) {
1543 dout(10) << __func__ << " active, proposing now" << dendl;
1544 propose_pending();
1545 return true;
1546 } else {
1547 dout(10) << __func__ << " not active, will propose later" << dendl;
1548 return false;
1549 }
1550 }
1551
1552 bool Paxos::is_consistent()
1553 {
1554 return (first_committed <= last_committed);
1555 }
1556