1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
18 #include "messages/MMonPaxos.h"
20 #include "common/config.h"
21 #include "include/assert.h"
22 #include "include/stringify.h"
23 #include "common/Timer.h"
24 #include "messages/PaxosServiceMessage.h"
26 #define dout_subsys ceph_subsys_paxos
28 #define dout_prefix _prefix(_dout, mon, mon->name, mon->rank, paxos_name, state, first_committed, last_committed)
// Builds the log-line prefix used by dout in this file (see the dout_prefix macro).
// Streams "mon.<name>@<rank>(<monitor state name>).paxos(<paxos_name> <paxos state name> c <first_committed>..<last_committed>"
// into *_dout and returns that stream.
// NOTE(review): the tail of the return expression is not visible in this excerpt.
29 static ostream
& _prefix(std::ostream
*_dout
, Monitor
*mon
, const string
& name
,
30 int rank
, const string
& paxos_name
, int state
,
31 version_t first_committed
, version_t last_committed
)
33 return *_dout
<< "mon." << name
<< "@" << rank
34 << "(" << mon
->get_state_name() << ")"
35 << ".paxos(" << paxos_name
<< " " << Paxos::get_statename(state
)
36 << " c " << first_committed
<< ".." << last_committed
// Completion context bound to a Paxos instance; when finished it clears the
// instance's `trimming` flag.
// NOTE(review): the member declaration and the rest of the class are not visible in this excerpt.
40 class Paxos::C_Trimmed
: public Context
{
43 explicit C_Trimmed(Paxos
*p
) : paxos(p
) { }
// r is not used by the visible body.
44 void finish(int r
) override
{
45 paxos
->trimming
= false;
// Accessor returning a pointer to the MonitorDBStore used by the rest of this file.
// NOTE(review): the body is not visible in this excerpt — confirm where the store comes from.
49 MonitorDBStore
*Paxos::get_store()
// Walks versions first..last (inclusive); for each version v it reads the
// stored value for v from the store and appends the decoded transaction to tx.
//
// tx    - transaction that accumulates the decoded operations
// first - first version to read
// last  - last version to read (inclusive)
54 void Paxos::read_and_prepare_transactions(MonitorDBStore::TransactionRef tx
,
55 version_t first
, version_t last
)
57 dout(10) << __func__
<< " first " << first
<< " last " << last
<< dendl
;
58 for (version_t v
= first
; v
<= last
; ++v
) {
59 dout(30) << __func__
<< " apply version " << v
<< dendl
;
// NOTE(review): the declaration of bl and any check of err are not visible in this excerpt.
61 int err
= get_store()->get(get_name(), v
, bl
);
64 decode_append_transaction(tx
, bl
);
// NOTE(review): the loop is inclusive of `last`, so (last-first) is one less
// than the number of versions visited; this only affects the log text.
66 dout(15) << __func__
<< " total versions " << (last
-first
) << dendl
;
// NOTE(review): the enclosing function header is not visible in this excerpt;
// judging by the "init" log line below this is the initialization routine.
// Reads last_pn, accepted_pn, last_committed and first_committed from the
// store, logs them, and asserts that is_consistent() holds.
71 // load paxos variables from stable storage
72 last_pn
= get_store()->get(get_name(), "last_pn");
73 accepted_pn
= get_store()->get(get_name(), "accepted_pn");
74 last_committed
= get_store()->get(get_name(), "last_committed");
75 first_committed
= get_store()->get(get_name(), "first_committed");
77 dout(10) << __func__
<< " last_pn: " << last_pn
<< " accepted_pn: "
78 << accepted_pn
<< " last_committed: " << last_committed
79 << " first_committed: " << first_committed
<< dendl
;
81 dout(10) << "init" << dendl
;
82 assert(is_consistent());
// Registers the "paxos" perf counters (ids between l_paxos_first and
// l_paxos_last) on a PerfCountersBuilder, creates the counter set, stores it
// in `logger`, and adds it to g_ceph_context's perf counters collection.
// Each add_* call pairs a counter id with its name and description.
85 void Paxos::init_logger()
87 PerfCountersBuilder
pcb(g_ceph_context
, "paxos", l_paxos_first
, l_paxos_last
);
88 pcb
.add_u64_counter(l_paxos_start_leader
, "start_leader", "Starts in leader role");
89 pcb
.add_u64_counter(l_paxos_start_peon
, "start_peon", "Starts in peon role");
90 pcb
.add_u64_counter(l_paxos_restart
, "restart", "Restarts");
91 pcb
.add_u64_counter(l_paxos_refresh
, "refresh", "Refreshes");
92 pcb
.add_time_avg(l_paxos_refresh_latency
, "refresh_latency", "Refresh latency");
93 pcb
.add_u64_counter(l_paxos_begin
, "begin", "Started and handled begins");
94 pcb
.add_u64_avg(l_paxos_begin_keys
, "begin_keys", "Keys in transaction on begin");
95 pcb
.add_u64_avg(l_paxos_begin_bytes
, "begin_bytes", "Data in transaction on begin");
96 pcb
.add_time_avg(l_paxos_begin_latency
, "begin_latency", "Latency of begin operation");
// NOTE(review): the remaining arguments of this call are not visible in this excerpt.
97 pcb
.add_u64_counter(l_paxos_commit
, "commit",
99 pcb
.add_u64_avg(l_paxos_commit_keys
, "commit_keys", "Keys in transaction on commit");
100 pcb
.add_u64_avg(l_paxos_commit_bytes
, "commit_bytes", "Data in transaction on commit");
101 pcb
.add_time_avg(l_paxos_commit_latency
, "commit_latency",
102 "Commit latency", "clat");
103 pcb
.add_u64_counter(l_paxos_collect
, "collect", "Peon collects");
104 pcb
.add_u64_avg(l_paxos_collect_keys
, "collect_keys", "Keys in transaction on peon collect");
105 pcb
.add_u64_avg(l_paxos_collect_bytes
, "collect_bytes", "Data in transaction on peon collect");
106 pcb
.add_time_avg(l_paxos_collect_latency
, "collect_latency", "Peon collect latency");
107 pcb
.add_u64_counter(l_paxos_collect_uncommitted
, "collect_uncommitted", "Uncommitted values in started and handled collects");
108 pcb
.add_u64_counter(l_paxos_collect_timeout
, "collect_timeout", "Collect timeouts");
109 pcb
.add_u64_counter(l_paxos_accept_timeout
, "accept_timeout", "Accept timeouts");
110 pcb
.add_u64_counter(l_paxos_lease_ack_timeout
, "lease_ack_timeout", "Lease acknowledgement timeouts");
111 pcb
.add_u64_counter(l_paxos_lease_timeout
, "lease_timeout", "Lease timeouts");
112 pcb
.add_u64_counter(l_paxos_store_state
, "store_state", "Store a shared state on disk");
113 pcb
.add_u64_avg(l_paxos_store_state_keys
, "store_state_keys", "Keys in transaction in stored state");
114 pcb
.add_u64_avg(l_paxos_store_state_bytes
, "store_state_bytes", "Data in transaction in stored state");
115 pcb
.add_time_avg(l_paxos_store_state_latency
, "store_state_latency", "Storing state latency");
116 pcb
.add_u64_counter(l_paxos_share_state
, "share_state", "Sharings of state");
117 pcb
.add_u64_avg(l_paxos_share_state_keys
, "share_state_keys", "Keys in shared state");
118 pcb
.add_u64_avg(l_paxos_share_state_bytes
, "share_state_bytes", "Data in shared state");
119 pcb
.add_u64_counter(l_paxos_new_pn
, "new_pn", "New proposal number queries");
120 pcb
.add_time_avg(l_paxos_new_pn_latency
, "new_pn_latency", "New proposal number getting latency");
// Materialize the counters and publish them through the context's collection.
121 logger
= pcb
.create_perf_counters();
122 g_ceph_context
->get_perfcounters_collection()->add(logger
);
// Writes a "paxos" object section to the formatter containing
// first_committed, last_committed, last_pn and accepted_pn as unsigned values.
// NOTE(review): the code closing the section is not visible in this excerpt.
//
// f - formatter that receives the dump
125 void Paxos::dump_info(Formatter
*f
)
127 f
->open_object_section("paxos");
128 f
->dump_unsigned("first_committed", first_committed
);
129 f
->dump_unsigned("last_committed", last_committed
);
130 f
->dump_unsigned("last_pn", last_pn
);
131 f
->dump_unsigned("accepted_pn", accepted_pn
);
135 // ---------------------------------
// Leader side: starts a recovery round.
// Visible steps: switch to STATE_RECOVERING, clear the per-round bookkeeping,
// pick up any value already stored at last_committed+1 as "uncommitted",
// choose a new proposal number no lower than max(accepted_pn, oldpn), send an
// OP_COLLECT message to every quorum member other than ourselves, and arm the
// collect timeout.
// NOTE(review): several statements of this function are not visible in this excerpt.
//
// oldpn - proposal number the new one must exceed (combined with accepted_pn via MAX)
140 void Paxos::collect(version_t oldpn
)
142 // we're recovering, it seems!
143 state
= STATE_RECOVERING
;
144 assert(mon
->is_leader());
146 // reset the number of lasts received
149 uncommitted_value
.clear();
150 peer_first_committed
.clear();
151 peer_last_committed
.clear();
153 // look for uncommitted value
154 if (get_store()->exists(get_name(), last_committed
+1)) {
155 version_t v
= get_store()->get(get_name(), "pending_v");
156 version_t pn
= get_store()->get(get_name(), "pending_pn");
// Use the recorded pn only when pending_v/pending_pn exist and refer to last_committed+1.
157 if (v
&& pn
&& v
== last_committed
+ 1) {
160 dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn
161 << " and crossing our fingers" << dendl
;
162 uncommitted_pn
= accepted_pn
;
164 uncommitted_v
= last_committed
+1;
166 get_store()->get(get_name(), last_committed
+1, uncommitted_value
);
167 assert(uncommitted_value
.length());
168 dout(10) << "learned uncommitted " << (last_committed
+1)
169 << " pn " << uncommitted_pn
170 << " (" << uncommitted_value
.length() << " bytes) from myself"
173 logger
->inc(l_paxos_collect_uncommitted
);
177 accepted_pn
= get_new_proposal_number(MAX(accepted_pn
, oldpn
));
178 accepted_pn_from
= last_committed
;
180 dout(10) << "collect with pn " << accepted_pn
<< dendl
;
// Send the collect request to every other quorum member.
183 for (set
<int>::const_iterator p
= mon
->get_quorum().begin();
184 p
!= mon
->get_quorum().end();
186 if (*p
== mon
->rank
) continue;
188 MMonPaxos
*collect
= new MMonPaxos(mon
->get_epoch(), MMonPaxos::OP_COLLECT
,
190 collect
->last_committed
= last_committed
;
191 collect
->first_committed
= first_committed
;
192 collect
->pn
= accepted_pn
;
193 mon
->messenger
->send_message(collect
, mon
->monmap
->get_inst(*p
));
// Arm the collect timeout; the callback body is not visible in this excerpt.
197 collect_timeout_event
= new C_MonContext(mon
, [this](int r
) {
// NOTE(review): the delay is mon_accept_timeout_factor times a factor that is not visible here.
202 mon
->timer
.add_event_after(g_conf
->mon_accept_timeout_factor
*
204 collect_timeout_event
);
// Peon side: handles an OP_COLLECT message.
// Visible steps: enter STATE_RECOVERING, reset the lease timeout, detect the
// case where the sender's first_committed is beyond our last_committed+1
// ("need to bootstrap"), build an OP_LAST reply, accept the sender's pn if it
// is higher than accepted_pn (persisting "accepted_pn"), share committed
// values the sender lacks, attach any accepted-but-uncommitted value stored at
// last_committed+1, and send the reply on the request's connection.
// NOTE(review): several statements of this function are not visible in this excerpt.
//
// op - request wrapping the MMonPaxos collect message
209 void Paxos::handle_collect(MonOpRequestRef op
)
212 op
->mark_paxos_event("handle_collect");
214 MMonPaxos
*collect
= static_cast<MMonPaxos
*>(op
->get_req());
215 dout(10) << "handle_collect " << *collect
<< dendl
;
217 assert(mon
->is_peon()); // mon epoch filter should catch strays
219 // we're recovering, it seems!
220 state
= STATE_RECOVERING
;
222 //update the peon recovery timeout
223 reset_lease_timeout();
// The sender's oldest version is newer than anything we could apply next.
225 if (collect
->first_committed
> last_committed
+1) {
227 << " leader's lowest version is too high for our last committed"
228 << " (theirs: " << collect
->first_committed
229 << "; ours: " << last_committed
<< ") -- bootstrap!" << dendl
;
230 op
->mark_paxos_event("need to bootstrap");
// Build the OP_LAST reply.
236 MMonPaxos
*last
= new MMonPaxos(mon
->get_epoch(), MMonPaxos::OP_LAST
,
238 last
->last_committed
= last_committed
;
239 last
->first_committed
= first_committed
;
// Remember the pn we held before possibly accepting the new one; used below
// as a fallback for uncommitted_pn.
241 version_t previous_pn
= accepted_pn
;
243 // can we accept this pn?
244 if (collect
->pn
> accepted_pn
) {
246 accepted_pn
= collect
->pn
;
247 accepted_pn_from
= collect
->pn_from
;
248 dout(10) << "accepting pn " << accepted_pn
<< " from "
249 << accepted_pn_from
<< dendl
;
// Persist the newly accepted pn.
251 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
252 t
->put(get_name(), "accepted_pn", accepted_pn
);
254 dout(30) << __func__
<< " transaction dump:\n";
255 JSONFormatter
f(true);
260 logger
->inc(l_paxos_collect
);
261 logger
->inc(l_paxos_collect_keys
, t
->get_keys());
262 logger
->inc(l_paxos_collect_bytes
, t
->get_bytes());
// Time the store write for the collect latency counter.
263 utime_t start
= ceph_clock_now();
265 get_store()->apply_transaction(t
);
267 utime_t end
= ceph_clock_now();
268 logger
->tinc(l_paxos_collect_latency
, end
- start
);
271 dout(10) << "NOT accepting pn " << collect
->pn
<< " from " << collect
->pn_from
272 << ", we already accepted " << accepted_pn
273 << " from " << accepted_pn_from
<< dendl
;
// Report the pn we currently hold, whether or not we accepted the sender's.
275 last
->pn
= accepted_pn
;
276 last
->pn_from
= accepted_pn_from
;
278 // share whatever committed values we have
279 if (collect
->last_committed
< last_committed
)
280 share_state(last
, collect
->first_committed
, collect
->last_committed
);
282 // do we have an accepted but uncommitted value?
283 // (it'll be at last_committed+1)
285 if (collect
->last_committed
<= last_committed
&&
286 get_store()->exists(get_name(), last_committed
+1)) {
287 get_store()->get(get_name(), last_committed
+1, bl
);
288 assert(bl
.length() > 0);
289 dout(10) << " sharing our accepted but uncommitted value for "
290 << last_committed
+1 << " (" << bl
.length() << " bytes)" << dendl
;
291 last
->values
[last_committed
+1] = bl
;
293 version_t v
= get_store()->get(get_name(), "pending_v");
294 version_t pn
= get_store()->get(get_name(), "pending_pn");
295 if (v
&& pn
&& v
== last_committed
+ 1) {
296 last
->uncommitted_pn
= pn
;
298 // previously we didn't record which pn a value was accepted
299 // under! use the pn value we just had... :(
300 dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << previous_pn
301 << " and crossing our fingers" << dendl
;
302 last
->uncommitted_pn
= previous_pn
;
305 logger
->inc(l_paxos_collect_uncommitted
);
// Reply on the connection the collect arrived on.
309 collect
->get_connection()->send_message(last
);
313 * @note This is Okay. We share our versions from peer_last_committed + 1 up to
314 * our last_committed (inclusive), and add their bufferlists to the
315 * message. It will be the peer's job to apply them to its store, as
316 * these bufferlists will contain raw transactions.
317 * This function is called by both the Peon and the Leader. The Peon will
318 * share the state with the Leader during handle_collect(), sharing any
319 * values the leader may be missing (i.e., the leader's last_committed is
320 * lower than the peon's last_committed). The Leader will share the state
321 * with the Peon during handle_last(), if the peon's last_committed is
322 * lower than the leader's last_committed.
// Copies the stored values for versions peer_last_committed+1 .. last_committed
// into m->values (skipping versions absent from the store), updates the
// share_state perf counters, and sets m->last_committed to our last_committed.
// Requires peer_last_committed < last_committed (asserted).
//
// m                    - message that receives the values
// peer_first_committed - peer's first committed version (only logged in the visible code)
// peer_last_committed  - peer's last committed version; sharing starts right after it
324 void Paxos::share_state(MMonPaxos
*m
, version_t peer_first_committed
,
325 version_t peer_last_committed
)
327 assert(peer_last_committed
< last_committed
);
329 dout(10) << "share_state peer has fc " << peer_first_committed
330 << " lc " << peer_last_committed
<< dendl
;
331 version_t v
= peer_last_committed
+ 1;
333 // include incrementals
// NOTE(review): the declaration of `bytes` is not visible in this excerpt.
335 for ( ; v
<= last_committed
; v
++) {
336 if (get_store()->exists(get_name(), v
)) {
337 get_store()->get(get_name(), v
, m
->values
[v
]);
338 assert(m
->values
[v
].length());
339 dout(10) << " sharing " << v
<< " ("
340 << m
->values
[v
].length() << " bytes)" << dendl
;
341 bytes
+= m
->values
[v
].length() + 16; // paxos_ + 10 digits = 16
344 logger
->inc(l_paxos_share_state
);
345 logger
->inc(l_paxos_share_state_keys
, m
->values
.size());
346 logger
->inc(l_paxos_share_state_bytes
, bytes
);
348 m
->last_committed
= last_committed
;
352 * Store on disk a state that was shared with us
354 * Basically, we received a set of versions. Or just one. It doesn't matter.
355 * What matters is that we have to stash it in the store. So, we will simply
356 * write every single bufferlist into their own versions on our side (i.e.,
357 * onto paxos-related keys), and then we will decode those same bufferlists
358 * we just wrote and apply the transactions they hold. We will also update
359 * our first and last committed values to point to the new values, if need
360 * be. All of this is done tightly wrapped in a transaction to ensure we
361 * enjoy the atomicity guarantees given by our awesome k/v store.
// Stores committed values carried by message m.
// Visible steps: ignore everything when the first value is beyond
// last_committed+1; skip values at or below last_committed; advance
// last_committed over the values up to m->last_committed; write each value
// under its version and append its decoded transaction to a single store
// transaction; forget an uncommitted value made obsolete by the new
// last_committed; apply the transaction; reload first_committed and
// sanity-check the store.
// NOTE(review): the return statements and several other lines are not visible
// in this excerpt; the caller in handle_last() treats the result as "need refresh".
//
// m - message whose `values` map and `last_committed` are consumed
363 bool Paxos::store_state(MMonPaxos
*m
)
365 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
366 map
<version_t
,bufferlist
>::iterator start
= m
->values
.begin();
367 bool changed
= false;
369 // build map of values to store
370 // we want to write the range [last_committed, m->last_committed] only.
371 if (start
!= m
->values
.end() &&
372 start
->first
> last_committed
+ 1) {
373 // ignore everything if values start in the future.
374 dout(10) << "store_state ignoring all values, they start at " << start
->first
375 << " > last_committed+1" << dendl
;
379 // push forward the start position on the message's values iterator, up until
380 // we run out of positions or we find a position matching 'last_committed'.
381 while (start
!= m
->values
.end() && start
->first
<= last_committed
) {
385 // make sure we get the right interval of values to apply by pushing forward
386 // the 'end' iterator until it matches the message's 'last_committed'.
387 map
<version_t
,bufferlist
>::iterator end
= start
;
388 while (end
!= m
->values
.end() && end
->first
<= m
->last_committed
) {
389 last_committed
= end
->first
;
394 dout(10) << "store_state nothing to commit" << dendl
;
396 dout(10) << "store_state [" << start
->first
<< ".."
397 << last_committed
<< "]" << dendl
;
398 t
->put(get_name(), "last_committed", last_committed
);
400 // we should apply the state here -- decode every single bufferlist in the
401 // map and append the transactions to 't'.
402 map
<version_t
,bufferlist
>::iterator it
;
403 for (it
= start
; it
!= end
; ++it
) {
404 // write the bufferlist as the version's value
405 t
->put(get_name(), it
->first
, it
->second
);
406 // decode the bufferlist and append it to the transaction we will shortly
408 decode_append_transaction(t
, it
->second
);
411 // discard obsolete uncommitted value?
412 if (uncommitted_v
&& uncommitted_v
<= last_committed
) {
413 dout(10) << " forgetting obsolete uncommitted value " << uncommitted_v
414 << " pn " << uncommitted_pn
<< dendl
;
417 uncommitted_value
.clear();
421 dout(30) << __func__
<< " transaction dump:\n";
422 JSONFormatter
f(true);
427 logger
->inc(l_paxos_store_state
);
428 logger
->inc(l_paxos_store_state_bytes
, t
->get_bytes());
429 logger
->inc(l_paxos_store_state_keys
, t
->get_keys());
// Time the store write for the store_state latency counter.
430 utime_t start
= ceph_clock_now();
432 get_store()->apply_transaction(t
);
434 utime_t end
= ceph_clock_now();
435 logger
->tinc(l_paxos_store_state_latency
, end
- start
);
437 // refresh first_committed; this txn may have trimmed.
438 first_committed
= get_store()->get(get_name(), "first_committed");
440 _sanity_check_store();
// Asserts that the "last_committed" value persisted in the store equals the
// in-memory last_committed.
447 void Paxos::_sanity_check_store()
449 version_t lc
= get_store()->get(get_name(), "last_committed");
450 assert(lc
== last_committed
);
// Leader side: handles an OP_LAST reply to a collect.
// Visible steps: drop the message when we are not leader; record the sender's
// first/last committed; detect version gaps that require a bootstrap; store
// any committed values carried by the message; push commits to peers that are
// behind; then compare the reply's pn with accepted_pn:
//   higher -> the peer knows a higher pn; cancel the collect timeout (a new pn is picked),
//   equal  -> the peer accepted our pn; possibly learn its uncommitted value and,
//             once num_last equals the quorum size, finish the round,
//   lower  -> stale message, ignored.
// NOTE(review): several statements (returns, counters, calls) are not visible in this excerpt.
//
// op - request wrapping the MMonPaxos last message
455 void Paxos::handle_last(MonOpRequestRef op
)
457 op
->mark_paxos_event("handle_last");
458 MMonPaxos
*last
= static_cast<MMonPaxos
*>(op
->get_req());
459 bool need_refresh
= false;
460 int from
= last
->get_source().num();
462 dout(10) << "handle_last " << *last
<< dendl
;
464 if (!mon
->is_leader()) {
465 dout(10) << "not leader, dropping" << dendl
;
469 // note peer's first_ and last_committed, in case we learn a new
470 // commit and need to push it to them.
471 peer_first_committed
[from
] = last
->first_committed
;
472 peer_last_committed
[from
] = last
->last_committed
;
// The peer's oldest version is newer than anything we could apply next.
474 if (last
->first_committed
> last_committed
+ 1) {
477 << " lowest version is too high for our last committed"
478 << " (theirs: " << last
->first_committed
479 << "; ours: " << last_committed
<< ") -- bootstrap!" << dendl
;
480 op
->mark_paxos_event("need to bootstrap");
// paxos_kill_at asserts abort the process at a configured step.
// NOTE(review): presumably a fault-injection aid for testing — confirm.
485 assert(g_conf
->paxos_kill_at
!= 1);
487 // store any committed values if any are specified in the message
488 need_refresh
= store_state(last
);
490 assert(g_conf
->paxos_kill_at
!= 2);
492 // is everyone contiguous and up to date?
493 for (map
<int,version_t
>::iterator p
= peer_last_committed
.begin();
494 p
!= peer_last_committed
.end();
// A peer whose last_committed+1 is below our first_committed cannot be caught
// up from the versions we still hold.
496 if (p
->second
+ 1 < first_committed
&& first_committed
> 1) {
498 << " peon " << p
->first
499 << " last_committed (" << p
->second
500 << ") is too low for our first_committed (" << first_committed
501 << ") -- bootstrap!" << dendl
;
502 op
->mark_paxos_event("need to bootstrap");
506 if (p
->second
< last_committed
) {
507 // share committed values
508 dout(10) << " sending commit to mon." << p
->first
<< dendl
;
509 MMonPaxos
*commit
= new MMonPaxos(mon
->get_epoch(),
510 MMonPaxos::OP_COMMIT
,
512 share_state(commit
, peer_first_committed
[p
->first
], p
->second
);
513 mon
->messenger
->send_message(commit
, mon
->monmap
->get_inst(p
->first
));
517 // do they accept your pn?
518 if (last
->pn
> accepted_pn
) {
520 dout(10) << " they had a higher pn than us, picking a new one." << dendl
;
522 // cancel timeout event
523 mon
->timer
.cancel_event(collect_timeout_event
);
524 collect_timeout_event
= 0;
527 } else if (last
->pn
== accepted_pn
) {
528 // yes, they accepted our pn. great.
530 dout(10) << " they accepted our pn, we now have "
531 << num_last
<< " peons" << dendl
;
533 // did this person send back an accepted but uncommitted value?
534 if (last
->uncommitted_pn
) {
// Keep the peer's uncommitted value only if its pn is at least ours and it is
// for a version at or beyond both our last_committed+1 and our current candidate.
535 if (last
->uncommitted_pn
>= uncommitted_pn
&&
536 last
->last_committed
>= last_committed
&&
537 last
->last_committed
+ 1 >= uncommitted_v
) {
538 uncommitted_v
= last
->last_committed
+1;
539 uncommitted_pn
= last
->uncommitted_pn
;
540 uncommitted_value
= last
->values
[uncommitted_v
];
541 dout(10) << "we learned an uncommitted value for " << uncommitted_v
542 << " pn " << uncommitted_pn
543 << " " << uncommitted_value
.length() << " bytes"
546 dout(10) << "ignoring uncommitted value for " << (last
->last_committed
+1)
547 << " pn " << last
->uncommitted_pn
548 << " " << last
->values
[last
->last_committed
+1].length() << " bytes"
// Every quorum member has answered.
554 if (num_last
== mon
->get_quorum().size()) {
555 // cancel timeout event
556 mon
->timer
.cancel_event(collect_timeout_event
);
557 collect_timeout_event
= 0;
558 peer_first_committed
.clear();
559 peer_last_committed
.clear();
563 // did we learn an old value?
564 if (uncommitted_v
== last_committed
+1 &&
565 uncommitted_value
.length()) {
566 dout(10) << "that's everyone. begin on old learned value" << dendl
;
567 state
= STATE_UPDATING_PREVIOUS
;
568 begin(uncommitted_value
);
571 dout(10) << "that's everyone. active!" << dendl
;
574 need_refresh
= false;
581 // no, this is an old message, discard
582 dout(10) << "old pn, ignoring" << dendl
;
// Called when the collect timeout fires: logs it, clears
// collect_timeout_event, bumps the collect_timeout counter and asserts we are leader.
// NOTE(review): the log says a fresh election is called, but that call is not
// visible in this excerpt.
589 void Paxos::collect_timeout()
591 dout(1) << "collect timeout, calling fresh election" << dendl
;
592 collect_timeout_event
= 0;
593 logger
->inc(l_paxos_collect_timeout
);
594 assert(mon
->is_leader());
// Leader side: starts the accept phase for the value destined for last_committed+1.
// Visible steps: assert preconditions (leader, updating state, majority of
// `last` replies unless the quorum has a single member, no value in flight);
// accept the value ourselves; persist it at last_committed+1 together with
// "pending_v"/"pending_pn"; when the quorum has more than one member, send
// OP_BEGIN to every other member and arm the accept timeout.
// NOTE(review): several statements (including how new_value is set from v) are
// not visible in this excerpt.
//
// v - encoded value being proposed
600 void Paxos::begin(bufferlist
& v
)
602 dout(10) << "begin for " << last_committed
+1 << " "
603 << v
.length() << " bytes"
606 assert(mon
->is_leader());
607 assert(is_updating() || is_updating_previous());
609 // we must already have a majority for this to work.
610 assert(mon
->get_quorum().size() == 1 ||
611 num_last
> (unsigned)mon
->monmap
->size()/2);
613 // and no value, yet.
614 assert(new_value
.length() == 0);
616 // accept it ourselves
618 accepted
.insert(mon
->rank
);
621 if (last_committed
== 0) {
622 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
623 // initial base case; set first_committed too
624 t
->put(get_name(), "first_committed", 1);
625 decode_append_transaction(t
, new_value
);
633 // store the proposed value in the store. IF it is accepted, we will then
634 // have to decode it into a transaction and apply it.
635 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
636 t
->put(get_name(), last_committed
+1, new_value
);
638 // note which pn this pending value is for.
639 t
->put(get_name(), "pending_v", last_committed
+ 1);
640 t
->put(get_name(), "pending_pn", accepted_pn
);
642 dout(30) << __func__
<< " transaction dump:\n";
643 JSONFormatter
f(true);
// Decode the proposed value into a separate transaction used only for the debug dump.
646 auto debug_tx(std::make_shared
<MonitorDBStore::Transaction
>());
647 bufferlist::iterator new_value_it
= new_value
.begin();
648 debug_tx
->decode(new_value_it
);
650 *_dout
<< "\nbl dump:\n";
654 logger
->inc(l_paxos_begin
);
655 logger
->inc(l_paxos_begin_keys
, t
->get_keys());
656 logger
->inc(l_paxos_begin_bytes
, t
->get_bytes());
// Time the store write for the begin latency counter.
657 utime_t start
= ceph_clock_now();
659 get_store()->apply_transaction(t
);
661 utime_t end
= ceph_clock_now();
662 logger
->tinc(l_paxos_begin_latency
, end
- start
);
664 assert(g_conf
->paxos_kill_at
!= 3);
666 if (mon
->get_quorum().size() == 1) {
667 // we're alone, take it easy
672 // ask others to accept it too!
673 for (set
<int>::const_iterator p
= mon
->get_quorum().begin();
674 p
!= mon
->get_quorum().end();
676 if (*p
== mon
->rank
) continue;
678 dout(10) << " sending begin to mon." << *p
<< dendl
;
679 MMonPaxos
*begin
= new MMonPaxos(mon
->get_epoch(), MMonPaxos::OP_BEGIN
,
681 begin
->values
[last_committed
+1] = new_value
;
682 begin
->last_committed
= last_committed
;
683 begin
->pn
= accepted_pn
;
685 mon
->messenger
->send_message(begin
, mon
->monmap
->get_inst(*p
));
// Arm the accept timeout; the callback body is not visible in this excerpt.
689 accept_timeout_event
= new C_MonContext(mon
, [this](int r
) {
// NOTE(review): the delay is mon_accept_timeout_factor times a factor that is not visible here.
694 mon
->timer
.add_event_after(g_conf
->mon_accept_timeout_factor
*
696 accept_timeout_event
);
// Peon side: handles an OP_BEGIN message.
// Visible steps: ignore the message if its pn is lower than accepted_pn;
// otherwise assert pn and last_committed match ours, enter STATE_UPDATING,
// cancel the lease, persist the proposed value at last_committed+1 together
// with "pending_v"/"pending_pn", and reply with OP_ACCEPT on the request's connection.
// NOTE(review): several statements are not visible in this excerpt.
//
// op - request wrapping the MMonPaxos begin message
700 void Paxos::handle_begin(MonOpRequestRef op
)
702 op
->mark_paxos_event("handle_begin");
703 MMonPaxos
*begin
= static_cast<MMonPaxos
*>(op
->get_req());
704 dout(10) << "handle_begin " << *begin
<< dendl
;
706 // can we accept this?
707 if (begin
->pn
< accepted_pn
) {
708 dout(10) << " we accepted a higher pn " << accepted_pn
<< ", ignoring" << dendl
;
709 op
->mark_paxos_event("have higher pn, ignore");
712 assert(begin
->pn
== accepted_pn
);
713 assert(begin
->last_committed
== last_committed
);
715 assert(g_conf
->paxos_kill_at
!= 4);
717 logger
->inc(l_paxos_begin
);
720 state
= STATE_UPDATING
;
721 lease_expire
= utime_t(); // cancel lease
724 version_t v
= last_committed
+1;
725 dout(10) << "accepting value for " << v
<< " pn " << accepted_pn
<< dendl
;
726 // store the accepted value onto our store. We will have to decode it and
727 // apply its transaction once we receive permission to commit.
728 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
729 t
->put(get_name(), v
, begin
->values
[v
]);
731 // note which pn this pending value is for.
732 t
->put(get_name(), "pending_v", v
);
733 t
->put(get_name(), "pending_pn", accepted_pn
);
735 dout(30) << __func__
<< " transaction dump:\n";
736 JSONFormatter
f(true);
741 logger
->inc(l_paxos_begin_bytes
, t
->get_bytes());
// Time the store write for the begin latency counter.
742 utime_t start
= ceph_clock_now();
744 get_store()->apply_transaction(t
);
746 utime_t end
= ceph_clock_now();
747 logger
->tinc(l_paxos_begin_latency
, end
- start
);
749 assert(g_conf
->paxos_kill_at
!= 5);
// Reply to the sender with our accept.
752 MMonPaxos
*accept
= new MMonPaxos(mon
->get_epoch(), MMonPaxos::OP_ACCEPT
,
754 accept
->pn
= accepted_pn
;
755 accept
->last_committed
= last_committed
;
756 begin
->get_connection()->send_message(accept
);
// Leader side: handles an OP_ACCEPT reply to a begin.
// Visible steps: ignore replies whose pn differs from accepted_pn or that
// belong to an old round; record the sender in `accepted`; once `accepted`
// equals the whole quorum, mark the "commit_start" event.
// NOTE(review): the statements that follow the final check (and the early
// returns) are not visible in this excerpt.
//
// op - request wrapping the MMonPaxos accept message
760 void Paxos::handle_accept(MonOpRequestRef op
)
762 op
->mark_paxos_event("handle_accept");
763 MMonPaxos
*accept
= static_cast<MMonPaxos
*>(op
->get_req());
764 dout(10) << "handle_accept " << *accept
<< dendl
;
765 int from
= accept
->get_source().num();
// NOTE(review): the test is `!=`, so any pn mismatch (not only a lower one) is ignored.
767 if (accept
->pn
!= accepted_pn
) {
768 // we accepted a higher pn, from some other leader
769 dout(10) << " we accepted a higher pn " << accepted_pn
<< ", ignoring" << dendl
;
770 op
->mark_paxos_event("have higher pn, ignore");
773 if (last_committed
> 0 &&
774 accept
->last_committed
< last_committed
-1) {
775 dout(10) << " this is from an old round, ignoring" << dendl
;
776 op
->mark_paxos_event("old round, ignore");
779 assert(accept
->last_committed
== last_committed
|| // not committed
780 accept
->last_committed
== last_committed
-1); // committed
782 assert(is_updating() || is_updating_previous());
// Each peer may accept only once per round.
783 assert(accepted
.count(from
) == 0);
784 accepted
.insert(from
);
785 dout(10) << " now " << accepted
<< " have accepted" << dendl
;
787 assert(g_conf
->paxos_kill_at
!= 6);
789 // only commit (and expose committed state) when we get *all* quorum
790 // members to accept. otherwise, they may still be sharing the now
792 // FIXME: we can improve this with an additional lease revocation message
793 // that doesn't block for the persist.
// NOTE(review): the log below says "majority" although the condition requires
// the accepted set to equal the full quorum.
794 if (accepted
== mon
->get_quorum()) {
796 dout(10) << " got majority, committing, done with update" << dendl
;
797 op
->mark_paxos_event("commit_start");
// Called when the accept timeout fires: logs it, clears accept_timeout_event,
// asserts we are leader and in one of the updating/writing states, and bumps
// the accept_timeout counter.
// NOTE(review): the log says a fresh election is called, but that call is not
// visible in this excerpt.
802 void Paxos::accept_timeout()
804 dout(1) << "accept timeout, calling fresh election" << dendl
;
805 accept_timeout_event
= 0;
806 assert(mon
->is_leader());
807 assert(is_updating() || is_updating_previous() || is_writing() ||
808 is_writing_previous());
809 logger
->inc(l_paxos_accept_timeout
);
// Completion context queued with the commit transaction (see commit_start()).
// On completion it takes the monitor lock and calls Paxos::commit_finish().
// NOTE(review): the member declaration and the rest of the struct are not visible in this excerpt.
813 struct C_Committed
: public Context
{
815 explicit C_Committed(Paxos
*p
) : paxos(p
) {}
// r is not used by the visible body.
816 void finish(int r
) override
{
// Hold the monitor lock for the duration of commit_finish().
818 Mutex::Locker
l(paxos
->mon
->lock
);
819 paxos
->commit_finish();
// Leader side: begins committing the accepted value.
// Visible steps: build a transaction that sets "last_committed" to
// last_committed+1 and appends the decoded new_value; bump the commit
// counters; record commit_start_stamp; queue the transaction with a
// C_Committed completion; move UPDATING(_PREVIOUS) to WRITING(_PREVIOUS);
// cancel the accept timeout when the quorum has more than one member.
// NOTE(review): some statements are not visible in this excerpt.
823 void Paxos::commit_start()
825 dout(10) << __func__
<< " " << (last_committed
+1) << dendl
;
827 assert(g_conf
->paxos_kill_at
!= 7);
829 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
832 t
->put(get_name(), "last_committed", last_committed
+ 1);
834 // decode the value and apply its transaction to the store.
835 // this value can now be read from last_committed.
836 decode_append_transaction(t
, new_value
);
838 dout(30) << __func__
<< " transaction dump:\n";
839 JSONFormatter
f(true);
844 logger
->inc(l_paxos_commit
);
845 logger
->inc(l_paxos_commit_keys
, t
->get_keys());
846 logger
->inc(l_paxos_commit_bytes
, t
->get_bytes());
// Start of the interval measured by commit_finish() for the commit latency counter.
847 commit_start_stamp
= ceph_clock_now();
// The write is queued rather than applied inline; C_Committed runs commit_finish() on completion.
849 get_store()->queue_transaction(t
, new C_Committed(this));
851 if (is_updating_previous())
852 state
= STATE_WRITING_PREVIOUS
;
853 else if (is_updating())
854 state
= STATE_WRITING
;
858 if (mon
->get_quorum().size() > 1) {
859 // cancel timeout event
860 mon
->timer
.cancel_event(accept_timeout_event
);
861 accept_timeout_event
= 0;
// Leader side: runs after the commit transaction queued by commit_start() completes.
// Visible steps: record commit latency; cancel the lease; stamp
// last_commit_time; reload first_committed and sanity-check the store; send
// OP_COMMIT carrying new_value to every other quorum member; move from
// WRITING(_PREVIOUS) to STATE_REFRESH; finish the waiting_for_commit contexts.
// NOTE(review): several statements (including the update of last_committed)
// are not visible in this excerpt.
865 void Paxos::commit_finish()
867 dout(20) << __func__
<< " " << (last_committed
+1) << dendl
;
868 utime_t end
= ceph_clock_now();
869 logger
->tinc(l_paxos_commit_latency
, end
- commit_start_stamp
);
871 assert(g_conf
->paxos_kill_at
!= 8);
873 // cancel lease - it was for the old value.
874 // (this would only happen if message layer lost the 'begin', but
875 // leader still got a majority and committed with out us.)
876 lease_expire
= utime_t(); // cancel lease
879 last_commit_time
= ceph_clock_now();
881 // refresh first_committed; this txn may have trimmed.
882 first_committed
= get_store()->get(get_name(), "first_committed");
884 _sanity_check_store();
// Tell every other quorum member about the commit.
887 for (set
<int>::const_iterator p
= mon
->get_quorum().begin();
888 p
!= mon
->get_quorum().end();
890 if (*p
== mon
->rank
) continue;
892 dout(10) << " sending commit to mon." << *p
<< dendl
;
893 MMonPaxos
*commit
= new MMonPaxos(mon
->get_epoch(), MMonPaxos::OP_COMMIT
,
895 commit
->values
[last_committed
] = new_value
;
896 commit
->pn
= accepted_pn
;
897 commit
->last_committed
= last_committed
;
899 mon
->messenger
->send_message(commit
, mon
->monmap
->get_inst(*p
));
902 assert(g_conf
->paxos_kill_at
!= 9);
904 // get ready for a new round.
907 // WRITING -> REFRESH
908 // among other things, this lets do_refresh() -> mon->bootstrap() know
909 // it doesn't need to flush the store queue
910 assert(is_writing() || is_writing_previous());
911 state
= STATE_REFRESH
;
915 if (mon
->get_quorum().size() > 1) {
919 finish_contexts(g_ceph_context
, waiting_for_commit
);
921 assert(g_conf
->paxos_kill_at
!= 10);
// Peon side: handles an OP_COMMIT message.
// Visible steps: bump the commit counter, drop the message when we are not a
// peon, mark the "store_state" event and finish the waiting_for_commit contexts.
// NOTE(review): the call that actually stores the state is not visible in this excerpt.
//
// op - request wrapping the MMonPaxos commit message
928 void Paxos::handle_commit(MonOpRequestRef op
)
930 op
->mark_paxos_event("handle_commit");
931 MMonPaxos
*commit
= static_cast<MMonPaxos
*>(op
->get_req());
932 dout(10) << "handle_commit on " << commit
->last_committed
<< dendl
;
934 logger
->inc(l_paxos_commit
);
936 if (!mon
->is_peon()) {
937 dout(10) << "not a peon, dropping" << dendl
;
942 op
->mark_paxos_event("store_state");
946 finish_contexts(g_ceph_context
, waiting_for_commit
);
// Leader side: extends the read lease.
// Visible steps: set lease_expire to now + mon_lease; record ourselves in
// acked_lease; send OP_LEASE (with last/first committed and the expiry) to
// every other quorum member; arm the lease-ack timeout unless one is already
// pending; schedule the lease renewal at
// lease_expire - mon_lease + mon_lease_renew_interval_factor * mon_lease.
// NOTE(review): some statements are not visible in this excerpt.
950 void Paxos::extend_lease()
952 assert(mon
->is_leader());
953 //assert(is_active());
955 lease_expire
= ceph_clock_now();
956 lease_expire
+= g_conf
->mon_lease
;
958 acked_lease
.insert(mon
->rank
);
960 dout(7) << "extend_lease now+" << g_conf
->mon_lease
961 << " (" << lease_expire
<< ")" << dendl
;
// Send the lease to every other quorum member.
964 for (set
<int>::const_iterator p
= mon
->get_quorum().begin();
965 p
!= mon
->get_quorum().end(); ++p
) {
967 if (*p
== mon
->rank
) continue;
968 MMonPaxos
*lease
= new MMonPaxos(mon
->get_epoch(), MMonPaxos::OP_LEASE
,
970 lease
->last_committed
= last_committed
;
971 lease
->lease_timestamp
= lease_expire
;
972 lease
->first_committed
= first_committed
;
973 mon
->messenger
->send_message(lease
, mon
->monmap
->get_inst(*p
));
976 // set timeout event.
977 // if old timeout is still in place, leave it.
978 if (!lease_ack_timeout_event
) {
979 lease_ack_timeout_event
= new C_MonContext(mon
, [this](int r
) {
// NOTE(review): the delay is mon_lease_ack_timeout_factor times a factor that is not visible here.
984 mon
->timer
.add_event_after(g_conf
->mon_lease_ack_timeout_factor
*
986 lease_ack_timeout_event
);
// Schedule the next renewal.
990 lease_renew_event
= new C_MonContext(mon
, [this](int r
) {
993 lease_renew_timeout();
// Renewal time = lease start (expiry minus mon_lease) plus the renew-interval fraction of mon_lease.
995 utime_t at
= lease_expire
;
996 at
-= g_conf
->mon_lease
;
997 at
+= g_conf
->mon_lease_renew_interval_factor
* g_conf
->mon_lease
;
998 mon
->timer
.add_event_at(at
, lease_renew_event
);
// Emits a cluster-log warning when timestamp t is more than
// mon_clock_drift_allowed ahead of the local clock, then records the warning
// time and increments clock_drift_warned.
// NOTE(review): the start of the inner condition is not visible in this
// excerpt; it involves warn_diff and pow(mon_clock_drift_warn_backoff,
// clock_drift_warned), presumably to space repeated warnings out — confirm.
//
// t    - timestamp carried by the message
// from - sender named in the warning
1001 void Paxos::warn_on_future_time(utime_t t
, entity_name_t from
)
1003 utime_t now
= ceph_clock_now();
1005 utime_t diff
= t
- now
;
1006 if (diff
> g_conf
->mon_clock_drift_allowed
) {
// Time elapsed since the previous warning.
1007 utime_t warn_diff
= now
- last_clock_drift_warn
;
1009 pow(g_conf
->mon_clock_drift_warn_backoff
, clock_drift_warned
)) {
1010 mon
->clog
->warn() << "message from " << from
<< " was stamped " << diff
1011 << "s in the future, clocks not synchronized";
1012 last_clock_drift_warn
= ceph_clock_now();
1013 ++clock_drift_warned
;
// Asks the monitor to reload its state from paxos, records the refresh count
// and latency, and logs when the refresh requested a bootstrap.
// NOTE(review): the return statements and the bootstrap call itself are not
// visible in this excerpt.
1020 bool Paxos::do_refresh()
1022 bool need_bootstrap
= false;
// Time the refresh for the refresh latency counter.
1024 utime_t start
= ceph_clock_now();
1026 // make sure we have the latest state loaded up
1027 mon
->refresh_from_paxos(&need_bootstrap
);
1029 utime_t end
= ceph_clock_now();
1030 logger
->inc(l_paxos_refresh
);
1031 logger
->tinc(l_paxos_refresh_latency
, end
- start
);
1033 if (need_bootstrap
) {
1034 dout(10) << " doing requested bootstrap" << dendl
;
// Leader side, in the refresh state (both asserted): completes the contexts
// queued in committing_finishers.
1042 void Paxos::commit_proposal()
1044 dout(10) << __func__
<< dendl
;
1045 assert(mon
->is_leader());
1046 assert(is_refresh());
1048 finish_contexts(g_ceph_context
, committing_finishers
);
// Leader side: ends the current round.
// Visible steps: switch to STATE_ACTIVE; finish the waiting_for_active,
// waiting_for_readable and waiting_for_writeable contexts in that order; then
// check should_trim() and whether a pending proposal exists while active.
// NOTE(review): the bodies of the two trailing `if` statements are not visible
// in this excerpt.
1051 void Paxos::finish_round()
1053 dout(10) << __func__
<< dendl
;
1054 assert(mon
->is_leader());
1056 // ok, now go active!
1057 state
= STATE_ACTIVE
;
// NOTE(review): the log label says "waiting_for_acting" but the list finished
// below is waiting_for_active.
1059 dout(20) << __func__
<< " waiting_for_acting" << dendl
;
1060 finish_contexts(g_ceph_context
, waiting_for_active
);
1061 dout(20) << __func__
<< " waiting_for_readable" << dendl
;
1062 finish_contexts(g_ceph_context
, waiting_for_readable
);
1063 dout(20) << __func__
<< " waiting_for_writeable" << dendl
;
1064 finish_contexts(g_ceph_context
, waiting_for_writeable
);
1066 dout(10) << __func__
<< " done w/ waiters, state " << get_statename(state
) << dendl
;
1068 if (should_trim()) {
// Re-check is_active(): the contexts finished above may have changed the state.
// NOTE(review): presumed reason for the re-check — confirm.
1072 if (is_active() && pending_proposal
) {
// Handle a lease message: validate it, extend lease_expire if the message
// carries a later timestamp, go to STATE_ACTIVE, reply with OP_LEASE_ACK,
// re-arm the lease timeout and complete waiters.
//   op - request wrapping the MMonPaxos lease message
// NOTE(review): some original lines are not visible in this chunk (e.g. what
// follows the "invalid lease" branch, part of the ack constructor call, and
// any conditions guarding the final finish_contexts calls).
1079 void Paxos::handle_lease(MonOpRequestRef op
)
1081 op
->mark_paxos_event("handle_lease");
1082 MMonPaxos
*lease
= static_cast<MMonPaxos
*>(op
->get_req());
// reject unless we are a peon and our last_committed equals the sender's.
// NOTE(review): the log text also mentions "they're not the leader", but the
// visible condition does not test the sender -- confirm whether that is intended.
1084 if (!mon
->is_peon() ||
1085 last_committed
!= lease
->last_committed
) {
1086 dout(10) << "handle_lease i'm not a peon, or they're not the leader,"
1087 << " or the last_committed doesn't match, dropping" << dendl
;
1088 op
->mark_paxos_event("invalid lease, ignore");
// complain if the sender stamped the message ahead of our clock
1092 warn_on_future_time(lease
->sent_timestamp
, lease
->get_source());
// the lease only ever moves forward: adopt the new expiry if it is later
1095 if (lease_expire
< lease
->lease_timestamp
) {
1096 lease_expire
= lease
->lease_timestamp
;
// an expiry that is already in the past is reported via derr
1098 utime_t now
= ceph_clock_now();
1099 if (lease_expire
< now
) {
1100 utime_t diff
= now
- lease_expire
;
1101 derr
<< "lease_expire from " << lease
->get_source_inst() << " is " << diff
<< " seconds in the past; mons are probably laggy (or possibly clocks are too skewed)" << dendl
;
1105 state
= STATE_ACTIVE
;
1107 dout(10) << "handle_lease on " << lease
->last_committed
1108 << " now " << lease_expire
<< dendl
;
// build the acknowledgement carrying our committed range and the current time
1111 MMonPaxos
*ack
= new MMonPaxos(mon
->get_epoch(), MMonPaxos::OP_LEASE_ACK
,
1113 ack
->last_committed
= last_committed
;
1114 ack
->first_committed
= first_committed
;
1115 ack
->lease_timestamp
= ceph_clock_now();
// reply on the connection the lease arrived on
1116 lease
->get_connection()->send_message(ack
);
1118 // (re)set timeout event.
1119 reset_lease_timeout();
// wake contexts waiting for the active state and for readability
1122 finish_contexts(g_ceph_context
, waiting_for_active
);
1124 finish_contexts(g_ceph_context
, waiting_for_readable
);
// Handle an OP_LEASE_ACK: record which monitor acknowledged the lease and,
// once the set of acks equals the quorum, cancel the ack-timeout timer.
//   op - request wrapping the MMonPaxos ack message
// Three outcomes are logged: stray ack (no timeout event pending), new ack
// (either "got everyone" or "still need N more"), and duplicate ack.
// NOTE(review): braces/else lines are not all visible in this chunk.
1127 void Paxos::handle_lease_ack(MonOpRequestRef op
)
1129 op
->mark_paxos_event("handle_lease_ack");
1130 MMonPaxos
*ack
= static_cast<MMonPaxos
*>(op
->get_req());
// numeric id of the sender, used as the key in acked_lease
1131 int from
= ack
->get_source().num();
// no pending ack-timeout event: the ack is treated as stray
1133 if (!lease_ack_timeout_event
) {
1134 dout(10) << "handle_lease_ack from " << ack
->get_source()
1135 << " -- stray (probably since revoked)" << dendl
;
// first ack from this sender
1137 else if (acked_lease
.count(from
) == 0) {
1138 acked_lease
.insert(from
);
// every quorum member has acked: the timeout is no longer needed
1140 if (acked_lease
== mon
->get_quorum()) {
1142 dout(10) << "handle_lease_ack from " << ack
->get_source()
1143 << " -- got everyone" << dendl
;
1144 mon
->timer
.cancel_event(lease_ack_timeout_event
);
1145 lease_ack_timeout_event
= 0;
// otherwise report how many acks are still outstanding
1149 dout(10) << "handle_lease_ack from " << ack
->get_source()
1150 << " -- still need "
1151 << mon
->get_quorum().size() - acked_lease
.size()
1152 << " more" << dendl
;
// sender was already in acked_lease
1155 dout(10) << "handle_lease_ack from " << ack
->get_source()
1156 << " dup (lagging!), ignoring" << dendl
;
// complain if the sender stamped the message ahead of our clock
1159 warn_on_future_time(ack
->sent_timestamp
, ack
->get_source());
// Called when the lease-ack timeout fires: logs, bumps the
// l_paxos_lease_ack_timeout counter and clears the event pointer.
// Preconditions (asserted): this monitor is the leader and paxos is active.
// NOTE(review): the log says a new election is called, but the statement that
// does so is not visible in this chunk.
1162 void Paxos::lease_ack_timeout()
1164 dout(1) << "lease_ack_timeout -- calling new election" << dendl
;
1165 assert(mon
->is_leader());
1166 assert(is_active());
1167 logger
->inc(l_paxos_lease_ack_timeout
);
// the event has fired, so the stored pointer is cleared
1168 lease_ack_timeout_event
= 0;
// (Re)arm the lease timeout: cancel any pending lease_timeout_event, create a
// fresh C_MonContext callback and schedule it on the monitor timer.
// NOTE(review): the lambda body past the -ECANCELED check and the second
// factor of the delay expression are not visible in this chunk.
1172 void Paxos::reset_lease_timeout()
1174 dout(20) << "reset_lease_timeout - setting timeout event" << dendl
;
// drop the previously scheduled timeout, if any, before replacing it
1175 if (lease_timeout_event
)
1176 mon
->timer
.cancel_event(lease_timeout_event
);
1177 lease_timeout_event
= new C_MonContext(mon
, [this](int r
) {
// the callback special-cases a cancelled completion
1178 if (r
== -ECANCELED
)
// delay is mon_lease_ack_timeout_factor times an operand not visible here
1182 mon
->timer
.add_event_after(g_conf
->mon_lease_ack_timeout_factor
*
1184 lease_timeout_event
);
// Called when the lease timeout fires: logs, bumps the l_paxos_lease_timeout
// counter and clears the event pointer.
// Precondition (asserted): this monitor is a peon.
// NOTE(review): the log says a new election is called, but the statement that
// does so is not visible in this chunk.
1187 void Paxos::lease_timeout()
1189 dout(1) << "lease_timeout -- calling new election" << dendl
;
1190 assert(mon
->is_peon());
1191 logger
->inc(l_paxos_lease_timeout
);
// the event has fired, so the stored pointer is cleared
1192 lease_timeout_event
= 0;
// Lease-renew timer callback. The visible part only clears lease_renew_event.
// NOTE(review): the rest of the body is not visible in this chunk; presumably
// it goes on to renew the lease -- confirm against the full file.
1196 void Paxos::lease_renew_timeout()
1198 lease_renew_event
= 0;
// NOTE(review): the signature line of this routine is not visible in this
// chunk; judging by the log text below it is the trim routine -- confirm.
// It erases committed versions [first_committed, end) through the pending
// transaction, records the new "first_committed", optionally compacts the
// trimmed key range and queues a C_Trimmed finisher.
// Precondition (asserted): should_trim() holds.
1208 assert(should_trim());
// trim target: keep at least paxos_min versions behind get_version(), and
// remove at most paxos_trim_max versions in one go
1209 version_t end
= MIN(get_version() - g_conf
->paxos_min
,
1210 get_first_committed() + g_conf
->paxos_trim_max
);
// nothing below `end` to remove (the action taken is not visible here)
1212 if (first_committed
>= end
)
1215 dout(10) << "trim to " << end
<< " (was " << first_committed
<< ")" << dendl
;
// the erases are added to the pending proposal's transaction
1217 MonitorDBStore::TransactionRef t
= get_pending_transaction();
1219 for (version_t v
= first_committed
; v
< end
; ++v
) {
1220 dout(10) << "trim " << v
<< dendl
;
1221 t
->erase(get_name(), v
);
// persist the new lower bound in the same transaction
1223 t
->put(get_name(), "first_committed", end
);
1224 if (g_conf
->mon_compact_on_trim
) {
1225 dout(10) << " compacting trimmed range" << dendl
;
// range keys are the stringified versions, starting one below first_committed
1226 t
->compact_range(get_name(), stringify(first_committed
- 1), stringify(end
));
// C_Trimmed::finish() sets paxos->trimming back to false
1230 queue_pending_finisher(new C_Trimmed(this));
1234 * return a globally unique, monotonically increasing proposal number
// Advance last_pn, persist it under the "last_pn" key in the store, and log
// the result.
//   gt - NOTE(review): not used in the visible lines; presumably a lower bound
//        for the new number -- confirm against the full body.
// The store write is timed and accounted in l_paxos_new_pn_latency.
// NOTE(review): parts of the last_pn computation, the dump's completion and
// the return statement are not visible in this chunk.
1236 version_t
Paxos::get_new_proposal_number(version_t gt
)
1241 // update. make it unique among all monitors.
// the monitor's rank is added into last_pn
1245 last_pn
+= (version_t
)mon
->rank
;
// write the new value through a dedicated transaction
1248 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1249 t
->put(get_name(), "last_pn", last_pn
);
// at debug level 30 the transaction is dumped via a JSON formatter
1251 dout(30) << __func__
<< " transaction dump:\n";
1252 JSONFormatter
f(true);
1257 logger
->inc(l_paxos_new_pn
);
// timestamps around apply_transaction feed the latency counter
1258 utime_t start
= ceph_clock_now();
1260 get_store()->apply_transaction(t
);
1262 utime_t end
= ceph_clock_now();
1263 logger
->tinc(l_paxos_new_pn_latency
, end
- start
);
1265 dout(10) << "get_new_proposal_number = " << last_pn
<< dendl
;
// Cancel every pending paxos timer event and clear its stored pointer.
// Each of the five events is handled the same way: if set, cancel it on the
// monitor timer, then reset the member to 0.
1270 void Paxos::cancel_events()
// collect timeout
1272 if (collect_timeout_event
) {
1273 mon
->timer
.cancel_event(collect_timeout_event
);
1274 collect_timeout_event
= 0;
// accept timeout
1276 if (accept_timeout_event
) {
1277 mon
->timer
.cancel_event(accept_timeout_event
);
1278 accept_timeout_event
= 0;
// lease renewal
1280 if (lease_renew_event
) {
1281 mon
->timer
.cancel_event(lease_renew_event
);
1282 lease_renew_event
= 0;
// lease-ack timeout
1284 if (lease_ack_timeout_event
) {
1285 mon
->timer
.cancel_event(lease_ack_timeout_event
);
1286 lease_ack_timeout_event
= 0;
// lease timeout
1288 if (lease_timeout_event
) {
1289 mon
->timer
.cancel_event(lease_timeout_event
);
1290 lease_timeout_event
= 0;
// Shut paxos down: drop the pending proposal, complete every waiter and
// finisher list with -ECANCELED, and unregister the perf counters.
// NOTE(review): some original lines are not visible in this chunk, so
// additional steps may exist in the full function.
1294 void Paxos::shutdown()
1296 dout(10) << __func__
<< " cancel all contexts" << dendl
;
1298 // discard pending transaction
1299 pending_proposal
.reset();
// every queued context is completed with -ECANCELED
1301 finish_contexts(g_ceph_context
, waiting_for_writeable
, -ECANCELED
);
1302 finish_contexts(g_ceph_context
, waiting_for_commit
, -ECANCELED
);
1303 finish_contexts(g_ceph_context
, waiting_for_readable
, -ECANCELED
);
1304 finish_contexts(g_ceph_context
, waiting_for_active
, -ECANCELED
);
1305 finish_contexts(g_ceph_context
, pending_finishers
, -ECANCELED
);
1306 finish_contexts(g_ceph_context
, committing_finishers
, -ECANCELED
);
// detach our counters from the context-wide collection
1308 g_ceph_context
->get_perfcounters_collection()->remove(logger
);
// Initialise paxos on a monitor that has become leader: drop the pending
// proposal, fail queued finishers with -EAGAIN, then either go active
// (single-member quorum) or enter STATE_RECOVERING with a cleared lease.
// NOTE(review): some original lines are not visible in this chunk (the start
// of the body, the rest of the single-member branch, and whatever follows the
// final log line).
1312 void Paxos::leader_init()
1317 // discard pending transaction
1318 pending_proposal
.reset();
// finishers of the discarded/in-flight proposals are told to retry
1320 finish_contexts(g_ceph_context
, pending_finishers
, -EAGAIN
);
1321 finish_contexts(g_ceph_context
, committing_finishers
, -EAGAIN
);
1323 logger
->inc(l_paxos_start_leader
);
// a quorum of one goes straight to active
1325 if (mon
->get_quorum().size() == 1) {
1326 state
= STATE_ACTIVE
;
// otherwise start recovery with no valid lease
1330 state
= STATE_RECOVERING
;
1331 lease_expire
= utime_t();
1332 dout(10) << "leader_init -- starting paxos recovery" << dendl
;
// Initialise paxos on a monitor that has become a peon: enter
// STATE_RECOVERING with a cleared lease, arm the lease timeout, drop the
// pending proposal and fail write-side waiters/finishers with -EAGAIN.
// NOTE(review): the first lines of the body are not visible in this chunk.
1336 void Paxos::peon_init()
1341 state
= STATE_RECOVERING
;
// a default-constructed utime_t replaces any previous lease expiry
1342 lease_expire
= utime_t();
1343 dout(10) << "peon_init -- i am a peon" << dendl
;
1345 // start a timer, in case the leader never manages to issue a lease
1346 reset_lease_timeout();
1348 // discard pending transaction
1349 pending_proposal
.reset();
1351 // no chance to write now!
1352 finish_contexts(g_ceph_context
, waiting_for_writeable
, -EAGAIN
);
1353 finish_contexts(g_ceph_context
, waiting_for_commit
, -EAGAIN
);
1354 finish_contexts(g_ceph_context
, pending_finishers
, -EAGAIN
);
1355 finish_contexts(g_ceph_context
, committing_finishers
, -EAGAIN
);
1357 logger
->inc(l_paxos_start_peon
);
// Restart paxos: if a write is in flight, flush the store first; then enter
// STATE_RECOVERING, drop the pending proposal and fail finishers and
// commit/active waiters with -EAGAIN.
// NOTE(review): the log says timeouts are cancelled, but the statement doing
// so is not visible in this chunk; other lines are elided as well.
1360 void Paxos::restart()
1362 dout(10) << "restart -- canceling timeouts" << dendl
;
// an in-flight write is flushed before the state is reset
1366 if (is_writing() || is_writing_previous()) {
1367 dout(10) << __func__
<< " flushing" << dendl
;
1369 mon
->store
->flush();
1371 dout(10) << __func__
<< " flushed" << dendl
;
1373 state
= STATE_RECOVERING
;
1375 // discard pending transaction
1376 pending_proposal
.reset();
// queued contexts are told to retry
1378 finish_contexts(g_ceph_context
, committing_finishers
, -EAGAIN
);
1379 finish_contexts(g_ceph_context
, pending_finishers
, -EAGAIN
);
1380 finish_contexts(g_ceph_context
, waiting_for_commit
, -EAGAIN
);
1381 finish_contexts(g_ceph_context
, waiting_for_active
, -EAGAIN
);
1383 logger
->inc(l_paxos_restart
);
// Entry point for paxos messages: drop them while no leader/peon role is
// held, sanity-check the sender, then switch on the message type and the
// MMonPaxos op.
//   op - request; asserted to be of paxos type
// NOTE(review): most case bodies and the outer case label are not visible in
// this chunk; only the OP_LEASE_ACK handler call is shown.
1387 void Paxos::dispatch(MonOpRequestRef op
)
1389 assert(op
->is_type_paxos());
1390 op
->mark_paxos_event("dispatch");
1391 PaxosServiceMessage
*m
= static_cast<PaxosServiceMessage
*>(op
->get_req());
1392 // election in progress?
1393 if (!mon
->is_leader() && !mon
->is_peon()) {
1394 dout(5) << "election in progress, dropping " << *m
<< dendl
;
// a leader accepts any sender; a peon only accepts messages from the leader
1399 assert(mon
->is_leader() ||
1400 (mon
->is_peon() && m
->get_source().num() == mon
->get_leader()));
1402 switch (m
->get_type()) {
// NOTE(review): reinterpret_cast between related message classes; a
// static_cast (as used for `m` above) would be the conventional choice --
// confirm the class hierarchy before changing.
1406 MMonPaxos
*pm
= reinterpret_cast<MMonPaxos
*>(m
);
1408 // NOTE: these ops are defined in messages/MMonPaxos.h
1411 case MMonPaxos::OP_COLLECT
:
1414 case MMonPaxos::OP_LAST
:
1417 case MMonPaxos::OP_BEGIN
:
1420 case MMonPaxos::OP_ACCEPT
:
1423 case MMonPaxos::OP_COMMIT
:
1426 case MMonPaxos::OP_LEASE
:
1429 case MMonPaxos::OP_LEASE_ACK
:
1430 handle_lease_ack(op
);
1444 // -----------------
1445 // service interface
// Report whether version `v` can currently be read.
//   v - version being asked about
// The visible condition requires: a peon or leader role, state active /
// updating / writing, last_committed > 0, and a valid lease.
// NOTE(review): the declaration of `ret`, the action for v > last_committed
// and the return statement are not visible in this chunk.
1449 bool Paxos::is_readable(version_t v
)
// versions beyond last_committed are handled separately (action not visible)
1452 if (v
> last_committed
)
1456 (mon
->is_peon() || mon
->is_leader()) &&
1457 (is_active() || is_updating() || is_writing()) &&
1458 last_committed
> 0 && is_lease_valid(); // must have a value alone, or have lease
// log the verdict together with the inputs that determined it
1459 dout(5) << __func__
<< " = " << (int)ret
1460 << " - now=" << ceph_clock_now()
1461 << " lease_expire=" << lease_expire
1462 << " has v" << v
<< " lc " << last_committed
// Fetch version `v` of this paxos instance from the store into `bl`.
//   v  - version to read
//   bl - buffer receiving the stored value
// NOTE(review): the return statements are not visible in this chunk;
// presumably a zero result from the store's get() means success -- confirm.
1467 bool Paxos::read(version_t v
, bufferlist
&bl
)
1469 if (!get_store()->get(get_name(), v
, bl
))
// Read the value stored at last_committed into `bl`.
//   bl - buffer receiving the stored value
// Returns last_committed when read() succeeds.
// NOTE(review): the value returned on failure is not visible in this chunk.
1474 version_t
Paxos::read_current(bufferlist
&bl
)
1476 if (read(last_committed
, bl
))
1477 return last_committed
;
// Returns true when the quorum has exactly one member, or when the current
// time is still before lease_expire.
1482 bool Paxos::is_lease_valid()
1484 return ((mon
->get_quorum().size() == 1)
1485 || (ceph_clock_now() < lease_expire
));
// NOTE(review): only the signature is visible in this chunk; presumably this
// reports whether a new value may be proposed right now -- confirm against
// the body in the full file.
1490 bool Paxos::is_writeable()
// Turn the pending proposal into an encoded buffer, release it, hand its
// finishers over to committing_finishers and move to STATE_UPDATING.
// Preconditions (asserted): paxos is active and a pending proposal exists.
// NOTE(review): the declaration of `bl`, the end of the debug dump and the
// statement that consumes `bl` after the state change are not visible in this
// chunk.
1498 void Paxos::propose_pending()
1500 assert(is_active());
1501 assert(pending_proposal
);
// serialise the queued transaction
1506 pending_proposal
->encode(bl
);
// the proposal will become version last_committed + 1
1508 dout(10) << __func__
<< " " << (last_committed
+ 1)
1509 << " " << bl
.length() << " bytes" << dendl
;
// at debug level 30 the transaction is dumped via a JSON formatter
1510 dout(30) << __func__
<< " transaction dump:\n";
1511 JSONFormatter
f(true);
1512 pending_proposal
->dump(&f
);
// the encoded copy in `bl` is what is used from here on
1516 pending_proposal
.reset();
// finishers queued for this proposal now wait on its commit
1518 committing_finishers
.swap(pending_finishers
);
1519 state
= STATE_UPDATING
;
// Append a completion context to pending_finishers.
//   onfinished - context to queue; its pointer value is logged
// NOTE(review): one original line between the log and the push_back is not
// visible in this chunk.
1523 void Paxos::queue_pending_finisher(Context
*onfinished
)
1525 dout(5) << __func__
<< " " << onfinished
<< dendl
;
1527 pending_finishers
.push_back(onfinished
);
// Return the transaction of the pending proposal, creating an empty one on
// first use.
// Precondition (asserted): this monitor is the leader.
1530 MonitorDBStore::TransactionRef
Paxos::get_pending_transaction()
1532 assert(mon
->is_leader());
// lazily create the proposal; no finishers may be queued before it exists
1533 if (!pending_proposal
) {
1534 pending_proposal
.reset(new MonitorDBStore::Transaction
);
1535 assert(pending_finishers
.empty());
1537 return pending_proposal
;
// Either proposes right away or defers, and logs which of the two happened.
// NOTE(review): the condition, the propose call and the return values are not
// visible in this chunk; per the log texts the decision depends on whether
// paxos is active -- confirm against the full file.
1540 bool Paxos::trigger_propose()
1543 dout(10) << __func__
<< " active, proposing now" << dendl
;
1547 dout(10) << __func__
<< " not active, will propose later" << dendl
;
// Returns true when first_committed does not exceed last_committed.
1552 bool Paxos::is_consistent()
1554 return (first_committed
<= last_committed
);