]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/Paxos.cc
update sources to v12.1.1
[ceph.git] / ceph / src / mon / Paxos.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <sstream>
16 #include "Paxos.h"
17 #include "Monitor.h"
18 #include "messages/MMonPaxos.h"
19
20 #include "mon/mon_types.h"
21 #include "common/config.h"
22 #include "include/assert.h"
23 #include "include/stringify.h"
24 #include "common/Timer.h"
25 #include "messages/PaxosServiceMessage.h"
26
27 #define dout_subsys ceph_subsys_paxos
28 #undef dout_prefix
29 #define dout_prefix _prefix(_dout, mon, mon->name, mon->rank, paxos_name, state, first_committed, last_committed)
30 static ostream& _prefix(std::ostream *_dout, Monitor *mon, const string& name,
31 int rank, const string& paxos_name, int state,
32 version_t first_committed, version_t last_committed)
33 {
34 return *_dout << "mon." << name << "@" << rank
35 << "(" << mon->get_state_name() << ")"
36 << ".paxos(" << paxos_name << " " << Paxos::get_statename(state)
37 << " c " << first_committed << ".." << last_committed
38 << ") ";
39 }
40
41 class Paxos::C_Trimmed : public Context {
42 Paxos *paxos;
43 public:
44 explicit C_Trimmed(Paxos *p) : paxos(p) { }
45 void finish(int r) override {
46 paxos->trimming = false;
47 }
48 };
49
50 MonitorDBStore *Paxos::get_store()
51 {
52 return mon->store;
53 }
54
55 void Paxos::read_and_prepare_transactions(MonitorDBStore::TransactionRef tx,
56 version_t first, version_t last)
57 {
58 dout(10) << __func__ << " first " << first << " last " << last << dendl;
59 for (version_t v = first; v <= last; ++v) {
60 dout(30) << __func__ << " apply version " << v << dendl;
61 bufferlist bl;
62 int err = get_store()->get(get_name(), v, bl);
63 assert(err == 0);
64 assert(bl.length());
65 decode_append_transaction(tx, bl);
66 }
67 dout(15) << __func__ << " total versions " << (last-first) << dendl;
68 }
69
70 void Paxos::init()
71 {
72 // load paxos variables from stable storage
73 last_pn = get_store()->get(get_name(), "last_pn");
74 accepted_pn = get_store()->get(get_name(), "accepted_pn");
75 last_committed = get_store()->get(get_name(), "last_committed");
76 first_committed = get_store()->get(get_name(), "first_committed");
77
78 dout(10) << __func__ << " last_pn: " << last_pn << " accepted_pn: "
79 << accepted_pn << " last_committed: " << last_committed
80 << " first_committed: " << first_committed << dendl;
81
82 dout(10) << "init" << dendl;
83 assert(is_consistent());
84 }
85
86 void Paxos::init_logger()
87 {
88 PerfCountersBuilder pcb(g_ceph_context, "paxos", l_paxos_first, l_paxos_last);
89 pcb.add_u64_counter(l_paxos_start_leader, "start_leader", "Starts in leader role");
90 pcb.add_u64_counter(l_paxos_start_peon, "start_peon", "Starts in peon role");
91 pcb.add_u64_counter(l_paxos_restart, "restart", "Restarts");
92 pcb.add_u64_counter(l_paxos_refresh, "refresh", "Refreshes");
93 pcb.add_time_avg(l_paxos_refresh_latency, "refresh_latency", "Refresh latency");
94 pcb.add_u64_counter(l_paxos_begin, "begin", "Started and handled begins");
95 pcb.add_u64_avg(l_paxos_begin_keys, "begin_keys", "Keys in transaction on begin");
96 pcb.add_u64_avg(l_paxos_begin_bytes, "begin_bytes", "Data in transaction on begin");
97 pcb.add_time_avg(l_paxos_begin_latency, "begin_latency", "Latency of begin operation");
98 pcb.add_u64_counter(l_paxos_commit, "commit",
99 "Commits", "cmt");
100 pcb.add_u64_avg(l_paxos_commit_keys, "commit_keys", "Keys in transaction on commit");
101 pcb.add_u64_avg(l_paxos_commit_bytes, "commit_bytes", "Data in transaction on commit");
102 pcb.add_time_avg(l_paxos_commit_latency, "commit_latency",
103 "Commit latency", "clat");
104 pcb.add_u64_counter(l_paxos_collect, "collect", "Peon collects");
105 pcb.add_u64_avg(l_paxos_collect_keys, "collect_keys", "Keys in transaction on peon collect");
106 pcb.add_u64_avg(l_paxos_collect_bytes, "collect_bytes", "Data in transaction on peon collect");
107 pcb.add_time_avg(l_paxos_collect_latency, "collect_latency", "Peon collect latency");
108 pcb.add_u64_counter(l_paxos_collect_uncommitted, "collect_uncommitted", "Uncommitted values in started and handled collects");
109 pcb.add_u64_counter(l_paxos_collect_timeout, "collect_timeout", "Collect timeouts");
110 pcb.add_u64_counter(l_paxos_accept_timeout, "accept_timeout", "Accept timeouts");
111 pcb.add_u64_counter(l_paxos_lease_ack_timeout, "lease_ack_timeout", "Lease acknowledgement timeouts");
112 pcb.add_u64_counter(l_paxos_lease_timeout, "lease_timeout", "Lease timeouts");
113 pcb.add_u64_counter(l_paxos_store_state, "store_state", "Store a shared state on disk");
114 pcb.add_u64_avg(l_paxos_store_state_keys, "store_state_keys", "Keys in transaction in stored state");
115 pcb.add_u64_avg(l_paxos_store_state_bytes, "store_state_bytes", "Data in transaction in stored state");
116 pcb.add_time_avg(l_paxos_store_state_latency, "store_state_latency", "Storing state latency");
117 pcb.add_u64_counter(l_paxos_share_state, "share_state", "Sharings of state");
118 pcb.add_u64_avg(l_paxos_share_state_keys, "share_state_keys", "Keys in shared state");
119 pcb.add_u64_avg(l_paxos_share_state_bytes, "share_state_bytes", "Data in shared state");
120 pcb.add_u64_counter(l_paxos_new_pn, "new_pn", "New proposal number queries");
121 pcb.add_time_avg(l_paxos_new_pn_latency, "new_pn_latency", "New proposal number getting latency");
122 logger = pcb.create_perf_counters();
123 g_ceph_context->get_perfcounters_collection()->add(logger);
124 }
125
126 void Paxos::dump_info(Formatter *f)
127 {
128 f->open_object_section("paxos");
129 f->dump_unsigned("first_committed", first_committed);
130 f->dump_unsigned("last_committed", last_committed);
131 f->dump_unsigned("last_pn", last_pn);
132 f->dump_unsigned("accepted_pn", accepted_pn);
133 f->close_section();
134 }
135
136 // ---------------------------------
137
138 // PHASE 1
139
140 // leader
141 void Paxos::collect(version_t oldpn)
142 {
143 // we're recoverying, it seems!
144 state = STATE_RECOVERING;
145 assert(mon->is_leader());
146
147 // reset the number of lasts received
148 uncommitted_v = 0;
149 uncommitted_pn = 0;
150 uncommitted_value.clear();
151 peer_first_committed.clear();
152 peer_last_committed.clear();
153
154 // look for uncommitted value
155 if (get_store()->exists(get_name(), last_committed+1)) {
156 version_t v = get_store()->get(get_name(), "pending_v");
157 version_t pn = get_store()->get(get_name(), "pending_pn");
158 if (v && pn && v == last_committed + 1) {
159 uncommitted_pn = pn;
160 } else {
161 dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn
162 << " and crossing our fingers" << dendl;
163 uncommitted_pn = accepted_pn;
164 }
165 uncommitted_v = last_committed+1;
166
167 get_store()->get(get_name(), last_committed+1, uncommitted_value);
168 assert(uncommitted_value.length());
169 dout(10) << "learned uncommitted " << (last_committed+1)
170 << " pn " << uncommitted_pn
171 << " (" << uncommitted_value.length() << " bytes) from myself"
172 << dendl;
173
174 logger->inc(l_paxos_collect_uncommitted);
175 }
176
177 // pick new pn
178 accepted_pn = get_new_proposal_number(MAX(accepted_pn, oldpn));
179 accepted_pn_from = last_committed;
180 num_last = 1;
181 dout(10) << "collect with pn " << accepted_pn << dendl;
182
183 // send collect
184 for (set<int>::const_iterator p = mon->get_quorum().begin();
185 p != mon->get_quorum().end();
186 ++p) {
187 if (*p == mon->rank) continue;
188
189 MMonPaxos *collect = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COLLECT,
190 ceph_clock_now());
191 collect->last_committed = last_committed;
192 collect->first_committed = first_committed;
193 collect->pn = accepted_pn;
194 mon->messenger->send_message(collect, mon->monmap->get_inst(*p));
195 }
196
197 // set timeout event
198 collect_timeout_event = new C_MonContext(mon, [this](int r) {
199 if (r == -ECANCELED)
200 return;
201 collect_timeout();
202 });
203 mon->timer.add_event_after(g_conf->mon_accept_timeout_factor *
204 g_conf->mon_lease,
205 collect_timeout_event);
206 }
207
208
209 // peon
210 void Paxos::handle_collect(MonOpRequestRef op)
211 {
212
213 op->mark_paxos_event("handle_collect");
214
215 MMonPaxos *collect = static_cast<MMonPaxos*>(op->get_req());
216 dout(10) << "handle_collect " << *collect << dendl;
217
218 assert(mon->is_peon()); // mon epoch filter should catch strays
219
220 // we're recoverying, it seems!
221 state = STATE_RECOVERING;
222
223 //update the peon recovery timeout
224 reset_lease_timeout();
225
226 if (collect->first_committed > last_committed+1) {
227 dout(2) << __func__
228 << " leader's lowest version is too high for our last committed"
229 << " (theirs: " << collect->first_committed
230 << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
231 op->mark_paxos_event("need to bootstrap");
232 mon->bootstrap();
233 return;
234 }
235
236 // reply
237 MMonPaxos *last = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LAST,
238 ceph_clock_now());
239 last->last_committed = last_committed;
240 last->first_committed = first_committed;
241
242 version_t previous_pn = accepted_pn;
243
244 // can we accept this pn?
245 if (collect->pn > accepted_pn) {
246 // ok, accept it
247 accepted_pn = collect->pn;
248 accepted_pn_from = collect->pn_from;
249 dout(10) << "accepting pn " << accepted_pn << " from "
250 << accepted_pn_from << dendl;
251
252 auto t(std::make_shared<MonitorDBStore::Transaction>());
253 t->put(get_name(), "accepted_pn", accepted_pn);
254
255 dout(30) << __func__ << " transaction dump:\n";
256 JSONFormatter f(true);
257 t->dump(&f);
258 f.flush(*_dout);
259 *_dout << dendl;
260
261 logger->inc(l_paxos_collect);
262 logger->inc(l_paxos_collect_keys, t->get_keys());
263 logger->inc(l_paxos_collect_bytes, t->get_bytes());
264 utime_t start = ceph_clock_now();
265
266 get_store()->apply_transaction(t);
267
268 utime_t end = ceph_clock_now();
269 logger->tinc(l_paxos_collect_latency, end - start);
270 } else {
271 // don't accept!
272 dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from
273 << ", we already accepted " << accepted_pn
274 << " from " << accepted_pn_from << dendl;
275 }
276 last->pn = accepted_pn;
277 last->pn_from = accepted_pn_from;
278
279 // share whatever committed values we have
280 if (collect->last_committed < last_committed)
281 share_state(last, collect->first_committed, collect->last_committed);
282
283 // do we have an accepted but uncommitted value?
284 // (it'll be at last_committed+1)
285 bufferlist bl;
286 if (collect->last_committed <= last_committed &&
287 get_store()->exists(get_name(), last_committed+1)) {
288 get_store()->get(get_name(), last_committed+1, bl);
289 assert(bl.length() > 0);
290 dout(10) << " sharing our accepted but uncommitted value for "
291 << last_committed+1 << " (" << bl.length() << " bytes)" << dendl;
292 last->values[last_committed+1] = bl;
293
294 version_t v = get_store()->get(get_name(), "pending_v");
295 version_t pn = get_store()->get(get_name(), "pending_pn");
296 if (v && pn && v == last_committed + 1) {
297 last->uncommitted_pn = pn;
298 } else {
299 // previously we didn't record which pn a value was accepted
300 // under! use the pn value we just had... :(
301 dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << previous_pn
302 << " and crossing our fingers" << dendl;
303 last->uncommitted_pn = previous_pn;
304 }
305
306 logger->inc(l_paxos_collect_uncommitted);
307 }
308
309 // send reply
310 collect->get_connection()->send_message(last);
311 }
312
313 /**
314 * @note This is Okay. We share our versions between peer_last_committed and
315 * our last_committed (inclusive), and add their bufferlists to the
316 * message. It will be the peer's job to apply them to its store, as
317 * these bufferlists will contain raw transactions.
318 * This function is called by both the Peon and the Leader. The Peon will
319 * share the state with the Leader during handle_collect(), sharing any
320 * values the leader may be missing (i.e., the leader's last_committed is
321 * lower than the peon's last_committed). The Leader will share the state
322 * with the Peon during handle_last(), if the peon's last_committed is
323 * lower than the leader's last_committed.
324 */
325 void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed,
326 version_t peer_last_committed)
327 {
328 assert(peer_last_committed < last_committed);
329
330 dout(10) << "share_state peer has fc " << peer_first_committed
331 << " lc " << peer_last_committed << dendl;
332 version_t v = peer_last_committed + 1;
333
334 // include incrementals
335 uint64_t bytes = 0;
336 for ( ; v <= last_committed; v++) {
337 if (get_store()->exists(get_name(), v)) {
338 get_store()->get(get_name(), v, m->values[v]);
339 assert(m->values[v].length());
340 dout(10) << " sharing " << v << " ("
341 << m->values[v].length() << " bytes)" << dendl;
342 bytes += m->values[v].length() + 16; // paxos_ + 10 digits = 16
343 }
344 }
345 logger->inc(l_paxos_share_state);
346 logger->inc(l_paxos_share_state_keys, m->values.size());
347 logger->inc(l_paxos_share_state_bytes, bytes);
348
349 m->last_committed = last_committed;
350 }
351
352 /**
353 * Store on disk a state that was shared with us
354 *
355 * Basically, we received a set of version. Or just one. It doesn't matter.
356 * What matters is that we have to stash it in the store. So, we will simply
357 * write every single bufferlist into their own versions on our side (i.e.,
358 * onto paxos-related keys), and then we will decode those same bufferlists
359 * we just wrote and apply the transactions they hold. We will also update
360 * our first and last committed values to point to the new values, if need
361 * be. All all this is done tightly wrapped in a transaction to ensure we
362 * enjoy the atomicity guarantees given by our awesome k/v store.
363 */
364 bool Paxos::store_state(MMonPaxos *m)
365 {
366 auto t(std::make_shared<MonitorDBStore::Transaction>());
367 map<version_t,bufferlist>::iterator start = m->values.begin();
368 bool changed = false;
369
370 // build map of values to store
371 // we want to write the range [last_committed, m->last_committed] only.
372 if (start != m->values.end() &&
373 start->first > last_committed + 1) {
374 // ignore everything if values start in the future.
375 dout(10) << "store_state ignoring all values, they start at " << start->first
376 << " > last_committed+1" << dendl;
377 return false;
378 }
379
380 // push forward the start position on the message's values iterator, up until
381 // we run out of positions or we find a position matching 'last_committed'.
382 while (start != m->values.end() && start->first <= last_committed) {
383 ++start;
384 }
385
386 // make sure we get the right interval of values to apply by pushing forward
387 // the 'end' iterator until it matches the message's 'last_committed'.
388 map<version_t,bufferlist>::iterator end = start;
389 while (end != m->values.end() && end->first <= m->last_committed) {
390 last_committed = end->first;
391 ++end;
392 }
393
394 if (start == end) {
395 dout(10) << "store_state nothing to commit" << dendl;
396 } else {
397 dout(10) << "store_state [" << start->first << ".."
398 << last_committed << "]" << dendl;
399 t->put(get_name(), "last_committed", last_committed);
400
401 // we should apply the state here -- decode every single bufferlist in the
402 // map and append the transactions to 't'.
403 map<version_t,bufferlist>::iterator it;
404 for (it = start; it != end; ++it) {
405 // write the bufferlist as the version's value
406 t->put(get_name(), it->first, it->second);
407 // decode the bufferlist and append it to the transaction we will shortly
408 // apply.
409 decode_append_transaction(t, it->second);
410 }
411
412 // discard obsolete uncommitted value?
413 if (uncommitted_v && uncommitted_v <= last_committed) {
414 dout(10) << " forgetting obsolete uncommitted value " << uncommitted_v
415 << " pn " << uncommitted_pn << dendl;
416 uncommitted_v = 0;
417 uncommitted_pn = 0;
418 uncommitted_value.clear();
419 }
420 }
421 if (!t->empty()) {
422 dout(30) << __func__ << " transaction dump:\n";
423 JSONFormatter f(true);
424 t->dump(&f);
425 f.flush(*_dout);
426 *_dout << dendl;
427
428 logger->inc(l_paxos_store_state);
429 logger->inc(l_paxos_store_state_bytes, t->get_bytes());
430 logger->inc(l_paxos_store_state_keys, t->get_keys());
431 utime_t start = ceph_clock_now();
432
433 get_store()->apply_transaction(t);
434
435 utime_t end = ceph_clock_now();
436 logger->tinc(l_paxos_store_state_latency, end - start);
437
438 // refresh first_committed; this txn may have trimmed.
439 first_committed = get_store()->get(get_name(), "first_committed");
440
441 _sanity_check_store();
442 changed = true;
443 }
444
445 return changed;
446 }
447
448 void Paxos::_sanity_check_store()
449 {
450 version_t lc = get_store()->get(get_name(), "last_committed");
451 assert(lc == last_committed);
452 }
453
454
455 // leader
456 void Paxos::handle_last(MonOpRequestRef op)
457 {
458 op->mark_paxos_event("handle_last");
459 MMonPaxos *last = static_cast<MMonPaxos*>(op->get_req());
460 bool need_refresh = false;
461 int from = last->get_source().num();
462
463 dout(10) << "handle_last " << *last << dendl;
464
465 if (!mon->is_leader()) {
466 dout(10) << "not leader, dropping" << dendl;
467 return;
468 }
469
470 // note peer's first_ and last_committed, in case we learn a new
471 // commit and need to push it to them.
472 peer_first_committed[from] = last->first_committed;
473 peer_last_committed[from] = last->last_committed;
474
475 if (last->first_committed > last_committed + 1) {
476 dout(5) << __func__
477 << " mon." << from
478 << " lowest version is too high for our last committed"
479 << " (theirs: " << last->first_committed
480 << "; ours: " << last_committed << ") -- bootstrap!" << dendl;
481 op->mark_paxos_event("need to bootstrap");
482 mon->bootstrap();
483 return;
484 }
485
486 assert(g_conf->paxos_kill_at != 1);
487
488 // store any committed values if any are specified in the message
489 need_refresh = store_state(last);
490
491 assert(g_conf->paxos_kill_at != 2);
492
493 // is everyone contiguous and up to date?
494 for (map<int,version_t>::iterator p = peer_last_committed.begin();
495 p != peer_last_committed.end();
496 ++p) {
497 if (p->second + 1 < first_committed && first_committed > 1) {
498 dout(5) << __func__
499 << " peon " << p->first
500 << " last_committed (" << p->second
501 << ") is too low for our first_committed (" << first_committed
502 << ") -- bootstrap!" << dendl;
503 op->mark_paxos_event("need to bootstrap");
504 mon->bootstrap();
505 return;
506 }
507 if (p->second < last_committed) {
508 // share committed values
509 dout(10) << " sending commit to mon." << p->first << dendl;
510 MMonPaxos *commit = new MMonPaxos(mon->get_epoch(),
511 MMonPaxos::OP_COMMIT,
512 ceph_clock_now());
513 share_state(commit, peer_first_committed[p->first], p->second);
514 mon->messenger->send_message(commit, mon->monmap->get_inst(p->first));
515 }
516 }
517
518 // do they accept your pn?
519 if (last->pn > accepted_pn) {
520 // no, try again.
521 dout(10) << " they had a higher pn than us, picking a new one." << dendl;
522
523 // cancel timeout event
524 mon->timer.cancel_event(collect_timeout_event);
525 collect_timeout_event = 0;
526
527 collect(last->pn);
528 } else if (last->pn == accepted_pn) {
529 // yes, they accepted our pn. great.
530 num_last++;
531 dout(10) << " they accepted our pn, we now have "
532 << num_last << " peons" << dendl;
533
534 // did this person send back an accepted but uncommitted value?
535 if (last->uncommitted_pn) {
536 if (last->uncommitted_pn >= uncommitted_pn &&
537 last->last_committed >= last_committed &&
538 last->last_committed + 1 >= uncommitted_v) {
539 uncommitted_v = last->last_committed+1;
540 uncommitted_pn = last->uncommitted_pn;
541 uncommitted_value = last->values[uncommitted_v];
542 dout(10) << "we learned an uncommitted value for " << uncommitted_v
543 << " pn " << uncommitted_pn
544 << " " << uncommitted_value.length() << " bytes"
545 << dendl;
546 } else {
547 dout(10) << "ignoring uncommitted value for " << (last->last_committed+1)
548 << " pn " << last->uncommitted_pn
549 << " " << last->values[last->last_committed+1].length() << " bytes"
550 << dendl;
551 }
552 }
553
554 // is that everyone?
555 if (num_last == mon->get_quorum().size()) {
556 // cancel timeout event
557 mon->timer.cancel_event(collect_timeout_event);
558 collect_timeout_event = 0;
559 peer_first_committed.clear();
560 peer_last_committed.clear();
561
562 // almost...
563
564 // did we learn an old value?
565 if (uncommitted_v == last_committed+1 &&
566 uncommitted_value.length()) {
567 dout(10) << "that's everyone. begin on old learned value" << dendl;
568 state = STATE_UPDATING_PREVIOUS;
569 begin(uncommitted_value);
570 } else {
571 // active!
572 dout(10) << "that's everyone. active!" << dendl;
573 extend_lease();
574
575 need_refresh = false;
576 if (do_refresh()) {
577 finish_round();
578 }
579 }
580 }
581 } else {
582 // no, this is an old message, discard
583 dout(10) << "old pn, ignoring" << dendl;
584 }
585
586 if (need_refresh)
587 (void)do_refresh();
588 }
589
590 void Paxos::collect_timeout()
591 {
592 dout(1) << "collect timeout, calling fresh election" << dendl;
593 collect_timeout_event = 0;
594 logger->inc(l_paxos_collect_timeout);
595 assert(mon->is_leader());
596 mon->bootstrap();
597 }
598
599
600 // leader
601 void Paxos::begin(bufferlist& v)
602 {
603 dout(10) << "begin for " << last_committed+1 << " "
604 << v.length() << " bytes"
605 << dendl;
606
607 assert(mon->is_leader());
608 assert(is_updating() || is_updating_previous());
609
610 // we must already have a majority for this to work.
611 assert(mon->get_quorum().size() == 1 ||
612 num_last > (unsigned)mon->monmap->size()/2);
613
614 // and no value, yet.
615 assert(new_value.length() == 0);
616
617 // accept it ourselves
618 accepted.clear();
619 accepted.insert(mon->rank);
620 new_value = v;
621
622 if (last_committed == 0) {
623 auto t(std::make_shared<MonitorDBStore::Transaction>());
624 // initial base case; set first_committed too
625 t->put(get_name(), "first_committed", 1);
626 decode_append_transaction(t, new_value);
627
628 bufferlist tx_bl;
629 t->encode(tx_bl);
630
631 new_value = tx_bl;
632 }
633
634 // store the proposed value in the store. IF it is accepted, we will then
635 // have to decode it into a transaction and apply it.
636 auto t(std::make_shared<MonitorDBStore::Transaction>());
637 t->put(get_name(), last_committed+1, new_value);
638
639 // note which pn this pending value is for.
640 t->put(get_name(), "pending_v", last_committed + 1);
641 t->put(get_name(), "pending_pn", accepted_pn);
642
643 dout(30) << __func__ << " transaction dump:\n";
644 JSONFormatter f(true);
645 t->dump(&f);
646 f.flush(*_dout);
647 auto debug_tx(std::make_shared<MonitorDBStore::Transaction>());
648 bufferlist::iterator new_value_it = new_value.begin();
649 debug_tx->decode(new_value_it);
650 debug_tx->dump(&f);
651 *_dout << "\nbl dump:\n";
652 f.flush(*_dout);
653 *_dout << dendl;
654
655 logger->inc(l_paxos_begin);
656 logger->inc(l_paxos_begin_keys, t->get_keys());
657 logger->inc(l_paxos_begin_bytes, t->get_bytes());
658 utime_t start = ceph_clock_now();
659
660 get_store()->apply_transaction(t);
661
662 utime_t end = ceph_clock_now();
663 logger->tinc(l_paxos_begin_latency, end - start);
664
665 assert(g_conf->paxos_kill_at != 3);
666
667 if (mon->get_quorum().size() == 1) {
668 // we're alone, take it easy
669 commit_start();
670 return;
671 }
672
673 // ask others to accept it too!
674 for (set<int>::const_iterator p = mon->get_quorum().begin();
675 p != mon->get_quorum().end();
676 ++p) {
677 if (*p == mon->rank) continue;
678
679 dout(10) << " sending begin to mon." << *p << dendl;
680 MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN,
681 ceph_clock_now());
682 begin->values[last_committed+1] = new_value;
683 begin->last_committed = last_committed;
684 begin->pn = accepted_pn;
685
686 mon->messenger->send_message(begin, mon->monmap->get_inst(*p));
687 }
688
689 // set timeout event
690 accept_timeout_event = new C_MonContext(mon, [this](int r) {
691 if (r == -ECANCELED)
692 return;
693 accept_timeout();
694 });
695 mon->timer.add_event_after(g_conf->mon_accept_timeout_factor *
696 g_conf->mon_lease,
697 accept_timeout_event);
698 }
699
700 // peon
701 void Paxos::handle_begin(MonOpRequestRef op)
702 {
703 op->mark_paxos_event("handle_begin");
704 MMonPaxos *begin = static_cast<MMonPaxos*>(op->get_req());
705 dout(10) << "handle_begin " << *begin << dendl;
706
707 // can we accept this?
708 if (begin->pn < accepted_pn) {
709 dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
710 op->mark_paxos_event("have higher pn, ignore");
711 return;
712 }
713 assert(begin->pn == accepted_pn);
714 assert(begin->last_committed == last_committed);
715
716 assert(g_conf->paxos_kill_at != 4);
717
718 logger->inc(l_paxos_begin);
719
720 // set state.
721 state = STATE_UPDATING;
722 lease_expire = utime_t(); // cancel lease
723
724 // yes.
725 version_t v = last_committed+1;
726 dout(10) << "accepting value for " << v << " pn " << accepted_pn << dendl;
727 // store the accepted value onto our store. We will have to decode it and
728 // apply its transaction once we receive permission to commit.
729 auto t(std::make_shared<MonitorDBStore::Transaction>());
730 t->put(get_name(), v, begin->values[v]);
731
732 // note which pn this pending value is for.
733 t->put(get_name(), "pending_v", v);
734 t->put(get_name(), "pending_pn", accepted_pn);
735
736 dout(30) << __func__ << " transaction dump:\n";
737 JSONFormatter f(true);
738 t->dump(&f);
739 f.flush(*_dout);
740 *_dout << dendl;
741
742 logger->inc(l_paxos_begin_bytes, t->get_bytes());
743 utime_t start = ceph_clock_now();
744
745 get_store()->apply_transaction(t);
746
747 utime_t end = ceph_clock_now();
748 logger->tinc(l_paxos_begin_latency, end - start);
749
750 assert(g_conf->paxos_kill_at != 5);
751
752 // reply
753 MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT,
754 ceph_clock_now());
755 accept->pn = accepted_pn;
756 accept->last_committed = last_committed;
757 begin->get_connection()->send_message(accept);
758 }
759
760 // leader
761 void Paxos::handle_accept(MonOpRequestRef op)
762 {
763 op->mark_paxos_event("handle_accept");
764 MMonPaxos *accept = static_cast<MMonPaxos*>(op->get_req());
765 dout(10) << "handle_accept " << *accept << dendl;
766 int from = accept->get_source().num();
767
768 if (accept->pn != accepted_pn) {
769 // we accepted a higher pn, from some other leader
770 dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl;
771 op->mark_paxos_event("have higher pn, ignore");
772 return;
773 }
774 if (last_committed > 0 &&
775 accept->last_committed < last_committed-1) {
776 dout(10) << " this is from an old round, ignoring" << dendl;
777 op->mark_paxos_event("old round, ignore");
778 return;
779 }
780 assert(accept->last_committed == last_committed || // not committed
781 accept->last_committed == last_committed-1); // committed
782
783 assert(is_updating() || is_updating_previous());
784 assert(accepted.count(from) == 0);
785 accepted.insert(from);
786 dout(10) << " now " << accepted << " have accepted" << dendl;
787
788 assert(g_conf->paxos_kill_at != 6);
789
790 // only commit (and expose committed state) when we get *all* quorum
791 // members to accept. otherwise, they may still be sharing the now
792 // stale state.
793 // FIXME: we can improve this with an additional lease revocation message
794 // that doesn't block for the persist.
795 if (accepted == mon->get_quorum()) {
796 // yay, commit!
797 dout(10) << " got majority, committing, done with update" << dendl;
798 op->mark_paxos_event("commit_start");
799 commit_start();
800 }
801 }
802
803 void Paxos::accept_timeout()
804 {
805 dout(1) << "accept timeout, calling fresh election" << dendl;
806 accept_timeout_event = 0;
807 assert(mon->is_leader());
808 assert(is_updating() || is_updating_previous() || is_writing() ||
809 is_writing_previous());
810 logger->inc(l_paxos_accept_timeout);
811 mon->bootstrap();
812 }
813
814 struct C_Committed : public Context {
815 Paxos *paxos;
816 explicit C_Committed(Paxos *p) : paxos(p) {}
817 void finish(int r) override {
818 assert(r >= 0);
819 Mutex::Locker l(paxos->mon->lock);
820 paxos->commit_finish();
821 }
822 };
823
824 void Paxos::commit_start()
825 {
826 dout(10) << __func__ << " " << (last_committed+1) << dendl;
827
828 assert(g_conf->paxos_kill_at != 7);
829
830 auto t(std::make_shared<MonitorDBStore::Transaction>());
831
832 // commit locally
833 t->put(get_name(), "last_committed", last_committed + 1);
834
835 // decode the value and apply its transaction to the store.
836 // this value can now be read from last_committed.
837 decode_append_transaction(t, new_value);
838
839 dout(30) << __func__ << " transaction dump:\n";
840 JSONFormatter f(true);
841 t->dump(&f);
842 f.flush(*_dout);
843 *_dout << dendl;
844
845 logger->inc(l_paxos_commit);
846 logger->inc(l_paxos_commit_keys, t->get_keys());
847 logger->inc(l_paxos_commit_bytes, t->get_bytes());
848 commit_start_stamp = ceph_clock_now();
849
850 get_store()->queue_transaction(t, new C_Committed(this));
851
852 if (is_updating_previous())
853 state = STATE_WRITING_PREVIOUS;
854 else if (is_updating())
855 state = STATE_WRITING;
856 else
857 ceph_abort();
858
859 if (mon->get_quorum().size() > 1) {
860 // cancel timeout event
861 mon->timer.cancel_event(accept_timeout_event);
862 accept_timeout_event = 0;
863 }
864 }
865
866 void Paxos::commit_finish()
867 {
868 dout(20) << __func__ << " " << (last_committed+1) << dendl;
869 utime_t end = ceph_clock_now();
870 logger->tinc(l_paxos_commit_latency, end - commit_start_stamp);
871
872 assert(g_conf->paxos_kill_at != 8);
873
874 // cancel lease - it was for the old value.
875 // (this would only happen if message layer lost the 'begin', but
876 // leader still got a majority and committed with out us.)
877 lease_expire = utime_t(); // cancel lease
878
879 last_committed++;
880 last_commit_time = ceph_clock_now();
881
882 // refresh first_committed; this txn may have trimmed.
883 first_committed = get_store()->get(get_name(), "first_committed");
884
885 _sanity_check_store();
886
887 // tell everyone
888 for (set<int>::const_iterator p = mon->get_quorum().begin();
889 p != mon->get_quorum().end();
890 ++p) {
891 if (*p == mon->rank) continue;
892
893 dout(10) << " sending commit to mon." << *p << dendl;
894 MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
895 ceph_clock_now());
896 commit->values[last_committed] = new_value;
897 commit->pn = accepted_pn;
898 commit->last_committed = last_committed;
899
900 mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
901 }
902
903 assert(g_conf->paxos_kill_at != 9);
904
905 // get ready for a new round.
906 new_value.clear();
907
908 // WRITING -> REFRESH
909 // among other things, this lets do_refresh() -> mon->bootstrap() know
910 // it doesn't need to flush the store queue
911 assert(is_writing() || is_writing_previous());
912 state = STATE_REFRESH;
913
914 if (do_refresh()) {
915 commit_proposal();
916 if (mon->get_quorum().size() > 1) {
917 extend_lease();
918 }
919
920 finish_contexts(g_ceph_context, waiting_for_commit);
921
922 assert(g_conf->paxos_kill_at != 10);
923
924 finish_round();
925 }
926 }
927
928
929 void Paxos::handle_commit(MonOpRequestRef op)
930 {
931 op->mark_paxos_event("handle_commit");
932 MMonPaxos *commit = static_cast<MMonPaxos*>(op->get_req());
933 dout(10) << "handle_commit on " << commit->last_committed << dendl;
934
935 logger->inc(l_paxos_commit);
936
937 if (!mon->is_peon()) {
938 dout(10) << "not a peon, dropping" << dendl;
939 ceph_abort();
940 return;
941 }
942
943 op->mark_paxos_event("store_state");
944 store_state(commit);
945
946 if (do_refresh()) {
947 finish_contexts(g_ceph_context, waiting_for_commit);
948 }
949 }
950
951 void Paxos::extend_lease()
952 {
953 assert(mon->is_leader());
954 //assert(is_active());
955
956 lease_expire = ceph_clock_now();
957 lease_expire += g_conf->mon_lease;
958 acked_lease.clear();
959 acked_lease.insert(mon->rank);
960
961 dout(7) << "extend_lease now+" << g_conf->mon_lease
962 << " (" << lease_expire << ")" << dendl;
963
964 // bcast
965 for (set<int>::const_iterator p = mon->get_quorum().begin();
966 p != mon->get_quorum().end(); ++p) {
967
968 if (*p == mon->rank) continue;
969 MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE,
970 ceph_clock_now());
971 lease->last_committed = last_committed;
972 lease->lease_timestamp = lease_expire;
973 lease->first_committed = first_committed;
974 mon->messenger->send_message(lease, mon->monmap->get_inst(*p));
975 }
976
977 // set timeout event.
978 // if old timeout is still in place, leave it.
979 if (!lease_ack_timeout_event) {
980 lease_ack_timeout_event = new C_MonContext(mon, [this](int r) {
981 if (r == -ECANCELED)
982 return;
983 lease_ack_timeout();
984 });
985 mon->timer.add_event_after(g_conf->mon_lease_ack_timeout_factor *
986 g_conf->mon_lease,
987 lease_ack_timeout_event);
988 }
989
990 // set renew event
991 lease_renew_event = new C_MonContext(mon, [this](int r) {
992 if (r == -ECANCELED)
993 return;
994 lease_renew_timeout();
995 });
996 utime_t at = lease_expire;
997 at -= g_conf->mon_lease;
998 at += g_conf->mon_lease_renew_interval_factor * g_conf->mon_lease;
999 mon->timer.add_event_at(at, lease_renew_event);
1000 }
1001
1002 void Paxos::warn_on_future_time(utime_t t, entity_name_t from)
1003 {
1004 utime_t now = ceph_clock_now();
1005 if (t > now) {
1006 utime_t diff = t - now;
1007 if (diff > g_conf->mon_clock_drift_allowed) {
1008 utime_t warn_diff = now - last_clock_drift_warn;
1009 if (warn_diff >
1010 pow(g_conf->mon_clock_drift_warn_backoff, clock_drift_warned)) {
1011 mon->clog->warn() << "message from " << from << " was stamped " << diff
1012 << "s in the future, clocks not synchronized";
1013 last_clock_drift_warn = ceph_clock_now();
1014 ++clock_drift_warned;
1015 }
1016 }
1017 }
1018
1019 }
1020
1021 bool Paxos::do_refresh()
1022 {
1023 bool need_bootstrap = false;
1024
1025 utime_t start = ceph_clock_now();
1026
1027 // make sure we have the latest state loaded up
1028 mon->refresh_from_paxos(&need_bootstrap);
1029
1030 utime_t end = ceph_clock_now();
1031 logger->inc(l_paxos_refresh);
1032 logger->tinc(l_paxos_refresh_latency, end - start);
1033
1034 if (need_bootstrap) {
1035 dout(10) << " doing requested bootstrap" << dendl;
1036 mon->bootstrap();
1037 return false;
1038 }
1039
1040 return true;
1041 }
1042
1043 void Paxos::commit_proposal()
1044 {
1045 dout(10) << __func__ << dendl;
1046 assert(mon->is_leader());
1047 assert(is_refresh());
1048
1049 finish_contexts(g_ceph_context, committing_finishers);
1050 }
1051
1052 void Paxos::finish_round()
1053 {
1054 dout(10) << __func__ << dendl;
1055 assert(mon->is_leader());
1056
1057 // ok, now go active!
1058 state = STATE_ACTIVE;
1059
1060 dout(20) << __func__ << " waiting_for_acting" << dendl;
1061 finish_contexts(g_ceph_context, waiting_for_active);
1062 dout(20) << __func__ << " waiting_for_readable" << dendl;
1063 finish_contexts(g_ceph_context, waiting_for_readable);
1064 dout(20) << __func__ << " waiting_for_writeable" << dendl;
1065 finish_contexts(g_ceph_context, waiting_for_writeable);
1066
1067 dout(10) << __func__ << " done w/ waiters, state " << get_statename(state) << dendl;
1068
1069 if (should_trim()) {
1070 trim();
1071 }
1072
1073 if (is_active() && pending_proposal) {
1074 propose_pending();
1075 }
1076 }
1077
1078
1079 // peon
1080 void Paxos::handle_lease(MonOpRequestRef op)
1081 {
1082 op->mark_paxos_event("handle_lease");
1083 MMonPaxos *lease = static_cast<MMonPaxos*>(op->get_req());
1084 // sanity
1085 if (!mon->is_peon() ||
1086 last_committed != lease->last_committed) {
1087 dout(10) << "handle_lease i'm not a peon, or they're not the leader,"
1088 << " or the last_committed doesn't match, dropping" << dendl;
1089 op->mark_paxos_event("invalid lease, ignore");
1090 return;
1091 }
1092
1093 warn_on_future_time(lease->sent_timestamp, lease->get_source());
1094
1095 // extend lease
1096 if (lease_expire < lease->lease_timestamp) {
1097 lease_expire = lease->lease_timestamp;
1098
1099 utime_t now = ceph_clock_now();
1100 if (lease_expire < now) {
1101 utime_t diff = now - lease_expire;
1102 derr << "lease_expire from " << lease->get_source_inst() << " is " << diff << " seconds in the past; mons are probably laggy (or possibly clocks are too skewed)" << dendl;
1103 }
1104 }
1105
1106 state = STATE_ACTIVE;
1107
1108 dout(10) << "handle_lease on " << lease->last_committed
1109 << " now " << lease_expire << dendl;
1110
1111 // ack
1112 MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK,
1113 ceph_clock_now());
1114 ack->last_committed = last_committed;
1115 ack->first_committed = first_committed;
1116 ack->lease_timestamp = ceph_clock_now();
1117 ::encode(mon->session_map.feature_map, ack->feature_map);
1118 lease->get_connection()->send_message(ack);
1119
1120 // (re)set timeout event.
1121 reset_lease_timeout();
1122
1123 // kick waiters
1124 finish_contexts(g_ceph_context, waiting_for_active);
1125 if (is_readable())
1126 finish_contexts(g_ceph_context, waiting_for_readable);
1127 }
1128
1129 void Paxos::handle_lease_ack(MonOpRequestRef op)
1130 {
1131 op->mark_paxos_event("handle_lease_ack");
1132 MMonPaxos *ack = static_cast<MMonPaxos*>(op->get_req());
1133 int from = ack->get_source().num();
1134
1135 if (!lease_ack_timeout_event) {
1136 dout(10) << "handle_lease_ack from " << ack->get_source()
1137 << " -- stray (probably since revoked)" << dendl;
1138
1139 } else if (acked_lease.count(from) == 0) {
1140 acked_lease.insert(from);
1141 if (ack->feature_map.length()) {
1142 auto p = ack->feature_map.begin();
1143 FeatureMap& t = mon->quorum_feature_map[from];
1144 ::decode(t, p);
1145 }
1146 if (acked_lease == mon->get_quorum()) {
1147 // yay!
1148 dout(10) << "handle_lease_ack from " << ack->get_source()
1149 << " -- got everyone" << dendl;
1150 mon->timer.cancel_event(lease_ack_timeout_event);
1151 lease_ack_timeout_event = 0;
1152
1153
1154 } else {
1155 dout(10) << "handle_lease_ack from " << ack->get_source()
1156 << " -- still need "
1157 << mon->get_quorum().size() - acked_lease.size()
1158 << " more" << dendl;
1159 }
1160 } else {
1161 dout(10) << "handle_lease_ack from " << ack->get_source()
1162 << " dup (lagging!), ignoring" << dendl;
1163 }
1164
1165 warn_on_future_time(ack->sent_timestamp, ack->get_source());
1166 }
1167
1168 void Paxos::lease_ack_timeout()
1169 {
1170 dout(1) << "lease_ack_timeout -- calling new election" << dendl;
1171 assert(mon->is_leader());
1172 assert(is_active());
1173 logger->inc(l_paxos_lease_ack_timeout);
1174 lease_ack_timeout_event = 0;
1175 mon->bootstrap();
1176 }
1177
1178 void Paxos::reset_lease_timeout()
1179 {
1180 dout(20) << "reset_lease_timeout - setting timeout event" << dendl;
1181 if (lease_timeout_event)
1182 mon->timer.cancel_event(lease_timeout_event);
1183 lease_timeout_event = new C_MonContext(mon, [this](int r) {
1184 if (r == -ECANCELED)
1185 return;
1186 lease_timeout();
1187 });
1188 mon->timer.add_event_after(g_conf->mon_lease_ack_timeout_factor *
1189 g_conf->mon_lease,
1190 lease_timeout_event);
1191 }
1192
1193 void Paxos::lease_timeout()
1194 {
1195 dout(1) << "lease_timeout -- calling new election" << dendl;
1196 assert(mon->is_peon());
1197 logger->inc(l_paxos_lease_timeout);
1198 lease_timeout_event = 0;
1199 mon->bootstrap();
1200 }
1201
1202 void Paxos::lease_renew_timeout()
1203 {
1204 lease_renew_event = 0;
1205 extend_lease();
1206 }
1207
1208
1209 /*
1210 * trim old states
1211 */
1212 void Paxos::trim()
1213 {
1214 assert(should_trim());
1215 version_t end = MIN(get_version() - g_conf->paxos_min,
1216 get_first_committed() + g_conf->paxos_trim_max);
1217
1218 if (first_committed >= end)
1219 return;
1220
1221 dout(10) << "trim to " << end << " (was " << first_committed << ")" << dendl;
1222
1223 MonitorDBStore::TransactionRef t = get_pending_transaction();
1224
1225 for (version_t v = first_committed; v < end; ++v) {
1226 dout(10) << "trim " << v << dendl;
1227 t->erase(get_name(), v);
1228 }
1229 t->put(get_name(), "first_committed", end);
1230 if (g_conf->mon_compact_on_trim) {
1231 dout(10) << " compacting trimmed range" << dendl;
1232 t->compact_range(get_name(), stringify(first_committed - 1), stringify(end));
1233 }
1234
1235 trimming = true;
1236 queue_pending_finisher(new C_Trimmed(this));
1237 }
1238
1239 /*
1240 * return a globally unique, monotonically increasing proposal number
1241 */
1242 version_t Paxos::get_new_proposal_number(version_t gt)
1243 {
1244 if (last_pn < gt)
1245 last_pn = gt;
1246
1247 // update. make it unique among all monitors.
1248 last_pn /= 100;
1249 last_pn++;
1250 last_pn *= 100;
1251 last_pn += (version_t)mon->rank;
1252
1253 // write
1254 auto t(std::make_shared<MonitorDBStore::Transaction>());
1255 t->put(get_name(), "last_pn", last_pn);
1256
1257 dout(30) << __func__ << " transaction dump:\n";
1258 JSONFormatter f(true);
1259 t->dump(&f);
1260 f.flush(*_dout);
1261 *_dout << dendl;
1262
1263 logger->inc(l_paxos_new_pn);
1264 utime_t start = ceph_clock_now();
1265
1266 get_store()->apply_transaction(t);
1267
1268 utime_t end = ceph_clock_now();
1269 logger->tinc(l_paxos_new_pn_latency, end - start);
1270
1271 dout(10) << "get_new_proposal_number = " << last_pn << dendl;
1272 return last_pn;
1273 }
1274
1275
1276 void Paxos::cancel_events()
1277 {
1278 if (collect_timeout_event) {
1279 mon->timer.cancel_event(collect_timeout_event);
1280 collect_timeout_event = 0;
1281 }
1282 if (accept_timeout_event) {
1283 mon->timer.cancel_event(accept_timeout_event);
1284 accept_timeout_event = 0;
1285 }
1286 if (lease_renew_event) {
1287 mon->timer.cancel_event(lease_renew_event);
1288 lease_renew_event = 0;
1289 }
1290 if (lease_ack_timeout_event) {
1291 mon->timer.cancel_event(lease_ack_timeout_event);
1292 lease_ack_timeout_event = 0;
1293 }
1294 if (lease_timeout_event) {
1295 mon->timer.cancel_event(lease_timeout_event);
1296 lease_timeout_event = 0;
1297 }
1298 }
1299
1300 void Paxos::shutdown()
1301 {
1302 dout(10) << __func__ << " cancel all contexts" << dendl;
1303
1304 // discard pending transaction
1305 pending_proposal.reset();
1306
1307 finish_contexts(g_ceph_context, waiting_for_writeable, -ECANCELED);
1308 finish_contexts(g_ceph_context, waiting_for_commit, -ECANCELED);
1309 finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED);
1310 finish_contexts(g_ceph_context, waiting_for_active, -ECANCELED);
1311 finish_contexts(g_ceph_context, pending_finishers, -ECANCELED);
1312 finish_contexts(g_ceph_context, committing_finishers, -ECANCELED);
1313 if (logger)
1314 g_ceph_context->get_perfcounters_collection()->remove(logger);
1315 delete logger;
1316 }
1317
1318 void Paxos::leader_init()
1319 {
1320 cancel_events();
1321 new_value.clear();
1322
1323 // discard pending transaction
1324 pending_proposal.reset();
1325
1326 finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
1327 finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
1328
1329 logger->inc(l_paxos_start_leader);
1330
1331 if (mon->get_quorum().size() == 1) {
1332 state = STATE_ACTIVE;
1333 return;
1334 }
1335
1336 state = STATE_RECOVERING;
1337 lease_expire = utime_t();
1338 dout(10) << "leader_init -- starting paxos recovery" << dendl;
1339 collect(0);
1340 }
1341
1342 void Paxos::peon_init()
1343 {
1344 cancel_events();
1345 new_value.clear();
1346
1347 state = STATE_RECOVERING;
1348 lease_expire = utime_t();
1349 dout(10) << "peon_init -- i am a peon" << dendl;
1350
1351 // start a timer, in case the leader never manages to issue a lease
1352 reset_lease_timeout();
1353
1354 // discard pending transaction
1355 pending_proposal.reset();
1356
1357 // no chance to write now!
1358 finish_contexts(g_ceph_context, waiting_for_writeable, -EAGAIN);
1359 finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
1360 finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
1361 finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
1362
1363 logger->inc(l_paxos_start_peon);
1364 }
1365
1366 void Paxos::restart()
1367 {
1368 dout(10) << "restart -- canceling timeouts" << dendl;
1369 cancel_events();
1370 new_value.clear();
1371
1372 if (is_writing() || is_writing_previous()) {
1373 dout(10) << __func__ << " flushing" << dendl;
1374 mon->lock.Unlock();
1375 mon->store->flush();
1376 mon->lock.Lock();
1377 dout(10) << __func__ << " flushed" << dendl;
1378 }
1379 state = STATE_RECOVERING;
1380
1381 // discard pending transaction
1382 pending_proposal.reset();
1383
1384 finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
1385 finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
1386 finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
1387 finish_contexts(g_ceph_context, waiting_for_active, -EAGAIN);
1388
1389 logger->inc(l_paxos_restart);
1390 }
1391
1392
1393 void Paxos::dispatch(MonOpRequestRef op)
1394 {
1395 assert(op->is_type_paxos());
1396 op->mark_paxos_event("dispatch");
1397 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
1398 // election in progress?
1399 if (!mon->is_leader() && !mon->is_peon()) {
1400 dout(5) << "election in progress, dropping " << *m << dendl;
1401 return;
1402 }
1403
1404 // check sanity
1405 assert(mon->is_leader() ||
1406 (mon->is_peon() && m->get_source().num() == mon->get_leader()));
1407
1408 switch (m->get_type()) {
1409
1410 case MSG_MON_PAXOS:
1411 {
1412 MMonPaxos *pm = reinterpret_cast<MMonPaxos*>(m);
1413
1414 // NOTE: these ops are defined in messages/MMonPaxos.h
1415 switch (pm->op) {
1416 // learner
1417 case MMonPaxos::OP_COLLECT:
1418 handle_collect(op);
1419 break;
1420 case MMonPaxos::OP_LAST:
1421 handle_last(op);
1422 break;
1423 case MMonPaxos::OP_BEGIN:
1424 handle_begin(op);
1425 break;
1426 case MMonPaxos::OP_ACCEPT:
1427 handle_accept(op);
1428 break;
1429 case MMonPaxos::OP_COMMIT:
1430 handle_commit(op);
1431 break;
1432 case MMonPaxos::OP_LEASE:
1433 handle_lease(op);
1434 break;
1435 case MMonPaxos::OP_LEASE_ACK:
1436 handle_lease_ack(op);
1437 break;
1438 default:
1439 ceph_abort();
1440 }
1441 }
1442 break;
1443
1444 default:
1445 ceph_abort();
1446 }
1447 }
1448
1449
1450 // -----------------
1451 // service interface
1452
1453 // -- READ --
1454
1455 bool Paxos::is_readable(version_t v)
1456 {
1457 bool ret;
1458 if (v > last_committed)
1459 ret = false;
1460 else
1461 ret =
1462 (mon->is_peon() || mon->is_leader()) &&
1463 (is_active() || is_updating() || is_writing()) &&
1464 last_committed > 0 && is_lease_valid(); // must have a value alone, or have lease
1465 dout(5) << __func__ << " = " << (int)ret
1466 << " - now=" << ceph_clock_now()
1467 << " lease_expire=" << lease_expire
1468 << " has v" << v << " lc " << last_committed
1469 << dendl;
1470 return ret;
1471 }
1472
1473 bool Paxos::read(version_t v, bufferlist &bl)
1474 {
1475 if (!get_store()->get(get_name(), v, bl))
1476 return false;
1477 return true;
1478 }
1479
1480 version_t Paxos::read_current(bufferlist &bl)
1481 {
1482 if (read(last_committed, bl))
1483 return last_committed;
1484 return 0;
1485 }
1486
1487
1488 bool Paxos::is_lease_valid()
1489 {
1490 return ((mon->get_quorum().size() == 1)
1491 || (ceph_clock_now() < lease_expire));
1492 }
1493
1494 // -- WRITE --
1495
1496 bool Paxos::is_writeable()
1497 {
1498 return
1499 mon->is_leader() &&
1500 is_active() &&
1501 is_lease_valid();
1502 }
1503
1504 void Paxos::propose_pending()
1505 {
1506 assert(is_active());
1507 assert(pending_proposal);
1508
1509 cancel_events();
1510
1511 bufferlist bl;
1512 pending_proposal->encode(bl);
1513
1514 dout(10) << __func__ << " " << (last_committed + 1)
1515 << " " << bl.length() << " bytes" << dendl;
1516 dout(30) << __func__ << " transaction dump:\n";
1517 JSONFormatter f(true);
1518 pending_proposal->dump(&f);
1519 f.flush(*_dout);
1520 *_dout << dendl;
1521
1522 pending_proposal.reset();
1523
1524 committing_finishers.swap(pending_finishers);
1525 state = STATE_UPDATING;
1526 begin(bl);
1527 }
1528
1529 void Paxos::queue_pending_finisher(Context *onfinished)
1530 {
1531 dout(5) << __func__ << " " << onfinished << dendl;
1532 assert(onfinished);
1533 pending_finishers.push_back(onfinished);
1534 }
1535
1536 MonitorDBStore::TransactionRef Paxos::get_pending_transaction()
1537 {
1538 assert(mon->is_leader());
1539 if (!pending_proposal) {
1540 pending_proposal.reset(new MonitorDBStore::Transaction);
1541 assert(pending_finishers.empty());
1542 }
1543 return pending_proposal;
1544 }
1545
1546 bool Paxos::trigger_propose()
1547 {
1548 if (plugged) {
1549 dout(10) << __func__ << " plugged, not proposing now" << dendl;
1550 return false;
1551 } else if (is_active()) {
1552 dout(10) << __func__ << " active, proposing now" << dendl;
1553 propose_pending();
1554 return true;
1555 } else {
1556 dout(10) << __func__ << " not active, will propose later" << dendl;
1557 return false;
1558 }
1559 }
1560
1561 bool Paxos::is_consistent()
1562 {
1563 return (first_committed <= last_committed);
1564 }
1565