1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
18 #include "messages/MMonPaxos.h"
20 #include "mon/mon_types.h"
21 #include "common/config.h"
22 #include "include/ceph_assert.h"
23 #include "include/stringify.h"
24 #include "common/Timer.h"
25 #include "messages/PaxosServiceMessage.h"
27 #define dout_subsys ceph_subsys_paxos
29 #define dout_prefix _prefix(_dout, mon, mon->name, mon->rank, paxos_name, state, first_committed, last_committed)
// Log-prefix helper used by the dout_prefix macro defined above.
// Streams "mon.<name>@<rank>(<monitor state name>).paxos(<paxos_name>
// <paxos state name> c <first_committed>..<last_committed>" into *_dout
// and returns that stream so the caller can keep appending to it.
30 static ostream
& _prefix(std::ostream
*_dout
, Monitor
*mon
, const string
& name
,
31 int rank
, const string
& paxos_name
, int state
,
32 version_t first_committed
, version_t last_committed
)
34 return *_dout
<< "mon." << name
<< "@" << rank
35 << "(" << mon
->get_state_name() << ")"
36 << ".paxos(" << paxos_name
<< " " << Paxos::get_statename(state
)
37 << " c " << first_committed
<< ".." << last_committed
// Completion context bound to a Paxos instance: when finished it clears
// that instance's 'trimming' flag. The return code 'r' is not consulted
// in the visible part of finish().
41 class Paxos::C_Trimmed
: public Context
{
44 explicit C_Trimmed(Paxos
*p
) : paxos(p
) { }
45 void finish(int r
) override
{
46 paxos
->trimming
= false;
// Accessor returning the MonitorDBStore pointer used by the rest of this
// file for all reads and writes.
// NOTE(review): only the signature is visible here; confirm where the
// store pointer actually comes from.
50 MonitorDBStore
*Paxos::get_store()
// Appends the stored transactions for versions [first, last] (inclusive)
// to 'tx'.
//
// For each version the encoded value is read from the store under
// (get_name(), v); the read must succeed and be non-empty (both are
// asserted) before it is handed to decode_append_transaction().
55 void Paxos::read_and_prepare_transactions(MonitorDBStore::TransactionRef tx
,
56 version_t first
, version_t last
)
58 dout(10) << __func__
<< " first " << first
<< " last " << last
<< dendl
;
59 for (version_t v
= first
; v
<= last
; ++v
) {
60 dout(30) << __func__
<< " apply version " << v
<< dendl
;
62 int err
= get_store()->get(get_name(), v
, bl
);
63 ceph_assert(err
== 0);
64 ceph_assert(bl
.length());
65 decode_append_transaction(tx
, bl
);
// Note: the figure logged below is (last - first), whereas the inclusive
// loop above visits (last - first + 1) versions when last >= first.
67 dout(15) << __func__
<< " total versions " << (last
-first
) << dendl
;
// NOTE(review): the signature of the enclosing function is not part of this
// listing; judging by the "init" log line below this is presumably the body
// of the Paxos init routine -- confirm.
//
// Reloads the four persistent paxos counters (last_pn, accepted_pn,
// last_committed, first_committed) from the store, logs them, and asserts
// is_consistent() on the result.
72 // load paxos variables from stable storage
73 last_pn
= get_store()->get(get_name(), "last_pn");
74 accepted_pn
= get_store()->get(get_name(), "accepted_pn");
75 last_committed
= get_store()->get(get_name(), "last_committed");
76 first_committed
= get_store()->get(get_name(), "first_committed");
78 dout(10) << __func__
<< " last_pn: " << last_pn
<< " accepted_pn: "
79 << accepted_pn
<< " last_committed: " << last_committed
80 << " first_committed: " << first_committed
<< dendl
;
82 dout(10) << "init" << dendl
;
83 ceph_assert(is_consistent());
// Registers the "paxos" perf counters and installs the resulting logger.
//
// Every counter is declared on a PerfCountersBuilder covering the id range
// (l_paxos_first, l_paxos_last); the builder's product is stored in 'logger'
// and added to the global context's perf counter collection.
//
// Counter families declared below: role starts / restarts / refreshes,
// begin, commit, collect, timeouts, store_state, share_state and new_pn.
// The *_bytes averages are tagged with unit_t(UNIT_BYTES).
86 void Paxos::init_logger()
88 PerfCountersBuilder
pcb(g_ceph_context
, "paxos", l_paxos_first
, l_paxos_last
);
90 // Because monitors are so few in number, the resource cost of capturing
91 // almost all their perf counters at USEFUL is trivial.
92 pcb
.set_prio_default(PerfCountersBuilder::PRIO_USEFUL
);
94 pcb
.add_u64_counter(l_paxos_start_leader
, "start_leader", "Starts in leader role");
95 pcb
.add_u64_counter(l_paxos_start_peon
, "start_peon", "Starts in peon role");
96 pcb
.add_u64_counter(l_paxos_restart
, "restart", "Restarts");
97 pcb
.add_u64_counter(l_paxos_refresh
, "refresh", "Refreshes");
98 pcb
.add_time_avg(l_paxos_refresh_latency
, "refresh_latency", "Refresh latency");
99 pcb
.add_u64_counter(l_paxos_begin
, "begin", "Started and handled begins");
100 pcb
.add_u64_avg(l_paxos_begin_keys
, "begin_keys", "Keys in transaction on begin");
101 pcb
.add_u64_avg(l_paxos_begin_bytes
, "begin_bytes", "Data in transaction on begin", NULL
, 0, unit_t(UNIT_BYTES
));
102 pcb
.add_time_avg(l_paxos_begin_latency
, "begin_latency", "Latency of begin operation");
103 pcb
.add_u64_counter(l_paxos_commit
, "commit",
105 pcb
.add_u64_avg(l_paxos_commit_keys
, "commit_keys", "Keys in transaction on commit");
106 pcb
.add_u64_avg(l_paxos_commit_bytes
, "commit_bytes", "Data in transaction on commit", NULL
, 0, unit_t(UNIT_BYTES
));
// commit_latency is the only counter in this list that passes an extra
// short-name argument ("clat").
107 pcb
.add_time_avg(l_paxos_commit_latency
, "commit_latency",
108 "Commit latency", "clat");
109 pcb
.add_u64_counter(l_paxos_collect
, "collect", "Peon collects");
110 pcb
.add_u64_avg(l_paxos_collect_keys
, "collect_keys", "Keys in transaction on peon collect");
111 pcb
.add_u64_avg(l_paxos_collect_bytes
, "collect_bytes", "Data in transaction on peon collect", NULL
, 0, unit_t(UNIT_BYTES
));
112 pcb
.add_time_avg(l_paxos_collect_latency
, "collect_latency", "Peon collect latency");
113 pcb
.add_u64_counter(l_paxos_collect_uncommitted
, "collect_uncommitted", "Uncommitted values in started and handled collects");
114 pcb
.add_u64_counter(l_paxos_collect_timeout
, "collect_timeout", "Collect timeouts");
115 pcb
.add_u64_counter(l_paxos_accept_timeout
, "accept_timeout", "Accept timeouts");
116 pcb
.add_u64_counter(l_paxos_lease_ack_timeout
, "lease_ack_timeout", "Lease acknowledgement timeouts");
117 pcb
.add_u64_counter(l_paxos_lease_timeout
, "lease_timeout", "Lease timeouts");
118 pcb
.add_u64_counter(l_paxos_store_state
, "store_state", "Store a shared state on disk");
119 pcb
.add_u64_avg(l_paxos_store_state_keys
, "store_state_keys", "Keys in transaction in stored state");
120 pcb
.add_u64_avg(l_paxos_store_state_bytes
, "store_state_bytes", "Data in transaction in stored state", NULL
, 0, unit_t(UNIT_BYTES
));
121 pcb
.add_time_avg(l_paxos_store_state_latency
, "store_state_latency", "Storing state latency");
122 pcb
.add_u64_counter(l_paxos_share_state
, "share_state", "Sharings of state");
123 pcb
.add_u64_avg(l_paxos_share_state_keys
, "share_state_keys", "Keys in shared state");
124 pcb
.add_u64_avg(l_paxos_share_state_bytes
, "share_state_bytes", "Data in shared state", NULL
, 0, unit_t(UNIT_BYTES
));
125 pcb
.add_u64_counter(l_paxos_new_pn
, "new_pn", "New proposal number queries");
126 pcb
.add_time_avg(l_paxos_new_pn_latency
, "new_pn_latency", "New proposal number getting latency");
// Materialize the counters and publish them through the global collection.
127 logger
= pcb
.create_perf_counters();
128 g_ceph_context
->get_perfcounters_collection()->add(logger
);
// Dumps the core paxos counters into formatter 'f' inside an object
// section named "paxos": first_committed, last_committed, last_pn and
// accepted_pn, each as an unsigned value.
131 void Paxos::dump_info(Formatter
*f
)
133 f
->open_object_section("paxos");
134 f
->dump_unsigned("first_committed", first_committed
);
135 f
->dump_unsigned("last_committed", last_committed
);
136 f
->dump_unsigned("last_pn", last_pn
);
137 f
->dump_unsigned("accepted_pn", accepted_pn
);
141 // ---------------------------------
// Leader side of the recovery (collect) phase.
//
// Moves to STATE_RECOVERING (asserting we are the leader), clears the
// per-peer bookkeeping and any remembered uncommitted value, re-learns our
// own uncommitted value from the store if one exists at last_committed+1,
// chooses a fresh proposal number above max(accepted_pn, oldpn), sends an
// OP_COLLECT to every other quorum member, and arms the collect timeout.
//
// @param oldpn  a proposal number the new one must exceed (combined with
//               accepted_pn via std::max below)
146 void Paxos::collect(version_t oldpn
)
148 // we're recovering, it seems!
149 state
= STATE_RECOVERING
;
150 ceph_assert(mon
->is_leader());
152 // reset the number of lasts received
155 uncommitted_value
.clear();
156 peer_first_committed
.clear();
157 peer_last_committed
.clear();
159 // look for uncommitted value
160 if (get_store()->exists(get_name(), last_committed
+1)) {
// pending_v / pending_pn record which version and pn the stored pending
// value belongs to; they are only trusted when pending_v matches
// last_committed+1.
161 version_t v
= get_store()->get(get_name(), "pending_v");
162 version_t pn
= get_store()->get(get_name(), "pending_pn");
163 if (v
&& pn
&& v
== last_committed
+ 1) {
// NOTE(review): the fallback below presumably sits in the else arm of the
// check above (the branch structure is not fully visible in this listing).
166 dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn
167 << " and crossing our fingers" << dendl
;
168 uncommitted_pn
= accepted_pn
;
170 uncommitted_v
= last_committed
+1;
172 get_store()->get(get_name(), last_committed
+1, uncommitted_value
);
173 ceph_assert(uncommitted_value
.length());
174 dout(10) << "learned uncommitted " << (last_committed
+1)
175 << " pn " << uncommitted_pn
176 << " (" << uncommitted_value
.length() << " bytes) from myself"
179 logger
->inc(l_paxos_collect_uncommitted
);
// pick a proposal number strictly newer than anything seen so far
183 accepted_pn
= get_new_proposal_number(std::max(accepted_pn
, oldpn
));
184 accepted_pn_from
= last_committed
;
186 dout(10) << "collect with pn " << accepted_pn
<< dendl
;
// send OP_COLLECT to every quorum member except ourselves
189 for (set
<int>::const_iterator p
= mon
->get_quorum().begin();
190 p
!= mon
->get_quorum().end();
192 if (*p
== mon
->rank
) continue;
194 MMonPaxos
*collect
= new MMonPaxos(mon
->get_epoch(), MMonPaxos::OP_COLLECT
,
196 collect
->last_committed
= last_committed
;
197 collect
->first_committed
= first_committed
;
198 collect
->pn
= accepted_pn
;
199 mon
->send_mon_message(collect
, *p
);
// arm the collect timeout; the callback is wrapped in a C_MonContext
203 collect_timeout_event
= mon
->timer
.add_event_after(
204 g_conf()->mon_accept_timeout_factor
*
206 new C_MonContext(mon
, [this](int r
) {
// Peon side of the recovery phase: handles an OP_COLLECT from the leader.
//
// Steps visible below:
//  - assert we are a peon, enter STATE_RECOVERING and reset the lease timeout;
//  - if the leader's first_committed is beyond our last_committed+1, log and
//    mark the op "need to bootstrap";
//  - build an OP_LAST reply carrying our first/last committed;
//  - if the collect's pn is higher than accepted_pn, adopt it and persist
//    "accepted_pn" through a store transaction (counted and timed via logger);
//  - share committed values the sender lacks (share_state);
//  - attach any accepted-but-uncommitted value at last_committed+1 together
//    with the pn it was accepted under;
//  - send the reply over the connection the collect arrived on.
215 void Paxos::handle_collect(MonOpRequestRef op
)
218 op
->mark_paxos_event("handle_collect");
220 MMonPaxos
*collect
= static_cast<MMonPaxos
*>(op
->get_req());
221 dout(10) << "handle_collect " << *collect
<< dendl
;
223 ceph_assert(mon
->is_peon()); // mon epoch filter should catch strays
225 // we're recovering, it seems!
226 state
= STATE_RECOVERING
;
228 //update the peon recovery timeout
229 reset_lease_timeout();
// the leader has already trimmed past what we have: incrementals cannot
// bridge the gap
231 if (collect
->first_committed
> last_committed
+1) {
233 << " leader's lowest version is too high for our last committed"
234 << " (theirs: " << collect
->first_committed
235 << "; ours: " << last_committed
<< ") -- bootstrap!" << dendl
;
236 op
->mark_paxos_event("need to bootstrap");
// reply message
242 MMonPaxos
*last
= new MMonPaxos(mon
->get_epoch(), MMonPaxos::OP_LAST
,
244 last
->last_committed
= last_committed
;
245 last
->first_committed
= first_committed
;
// remember the pn we held before possibly adopting the collector's; it is
// used further down as a fallback for uncommitted_pn
247 version_t previous_pn
= accepted_pn
;
249 // can we accept this pn?
250 if (collect
->pn
> accepted_pn
) {
252 accepted_pn
= collect
->pn
;
253 accepted_pn_from
= collect
->pn_from
;
254 dout(10) << "accepting pn " << accepted_pn
<< " from "
255 << accepted_pn_from
<< dendl
;
// persist the newly accepted pn
257 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
258 t
->put(get_name(), "accepted_pn", accepted_pn
);
260 dout(30) << __func__
<< " transaction dump:\n";
261 JSONFormatter
f(true);
266 logger
->inc(l_paxos_collect
);
267 logger
->inc(l_paxos_collect_keys
, t
->get_keys());
268 logger
->inc(l_paxos_collect_bytes
, t
->get_bytes());
// time the store write for the collect_latency counter
270 auto start
= ceph::coarse_mono_clock::now();
271 get_store()->apply_transaction(t
);
272 auto end
= ceph::coarse_mono_clock::now();
274 logger
->tinc(l_paxos_collect_latency
, to_timespan(end
- start
));
277 dout(10) << "NOT accepting pn " << collect
->pn
<< " from " << collect
->pn_from
278 << ", we already accepted " << accepted_pn
279 << " from " << accepted_pn_from
<< dendl
;
// either way, tell the sender which pn we currently hold
281 last
->pn
= accepted_pn
;
282 last
->pn_from
= accepted_pn_from
;
284 // share whatever committed values we have
285 if (collect
->last_committed
< last_committed
)
286 share_state(last
, collect
->first_committed
, collect
->last_committed
);
288 // do we have an accepted but uncommitted value?
289 // (it'll be at last_committed+1)
291 if (collect
->last_committed
<= last_committed
&&
292 get_store()->exists(get_name(), last_committed
+1)) {
293 get_store()->get(get_name(), last_committed
+1, bl
);
294 ceph_assert(bl
.length() > 0);
295 dout(10) << " sharing our accepted but uncommitted value for "
296 << last_committed
+1 << " (" << bl
.length() << " bytes)" << dendl
;
297 last
->values
[last_committed
+1] = bl
;
// look up which pn the pending value was accepted under
299 version_t v
= get_store()->get(get_name(), "pending_v");
300 version_t pn
= get_store()->get(get_name(), "pending_pn");
301 if (v
&& pn
&& v
== last_committed
+ 1) {
302 last
->uncommitted_pn
= pn
;
304 // previously we didn't record which pn a value was accepted
305 // under! use the pn value we just had... :(
306 dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << previous_pn
307 << " and crossing our fingers" << dendl
;
308 last
->uncommitted_pn
= previous_pn
;
311 logger
->inc(l_paxos_collect_uncommitted
);
// reply on the connection the collect came in on
315 collect
->get_connection()->send_message(last
);
319 * @note This is Okay. We share our versions between peer_last_committed and
320 * our last_committed (inclusive), and add their bufferlists to the
321 * message. It will be the peer's job to apply them to its store, as
322 * these bufferlists will contain raw transactions.
323 * This function is called by both the Peon and the Leader. The Peon will
324 * share the state with the Leader during handle_collect(), sharing any
325 * values the leader may be missing (i.e., the leader's last_committed is
326 * lower than the peon's last_committed). The Leader will share the state
327 * with the Peon during handle_last(), if the peon's last_committed is
328 * lower than the leader's last_committed.
// Fills message 'm' with the committed versions the peer is missing.
//
// Precondition (asserted): peer_last_committed < last_committed.
// Every version in (peer_last_committed, last_committed] that exists in the
// store is copied into m->values, and m->last_committed is set to ours.
// The number of shared values and an approximate byte count are reported
// through the share_state perf counters.
//
// @param m                     message to populate
// @param peer_first_committed  peer's first committed version (only logged
//                              in the visible code)
// @param peer_last_committed   peer's last committed version
330 void Paxos::share_state(MMonPaxos
*m
, version_t peer_first_committed
,
331 version_t peer_last_committed
)
333 ceph_assert(peer_last_committed
< last_committed
);
335 dout(10) << "share_state peer has fc " << peer_first_committed
336 << " lc " << peer_last_committed
<< dendl
;
// first version the peer does not have yet
337 version_t v
= peer_last_committed
+ 1;
339 // include incrementals
341 for ( ; v
<= last_committed
; v
++) {
342 if (get_store()->exists(get_name(), v
)) {
343 get_store()->get(get_name(), v
, m
->values
[v
]);
344 ceph_assert(m
->values
[v
].length());
345 dout(10) << " sharing " << v
<< " ("
346 << m
->values
[v
].length() << " bytes)" << dendl
;
// value length plus a fixed 16-byte allowance for the key
347 bytes
+= m
->values
[v
].length() + 16; // paxos_ + 10 digits = 16
350 logger
->inc(l_paxos_share_state
);
351 logger
->inc(l_paxos_share_state_keys
, m
->values
.size());
352 logger
->inc(l_paxos_share_state_bytes
, bytes
);
354 m
->last_committed
= last_committed
;
358 * Store on disk a state that was shared with us
360 * Basically, we received a set of versions. Or just one. It doesn't matter.
361 * What matters is that we have to stash it in the store. So, we will simply
362 * write every single bufferlist into their own versions on our side (i.e.,
363 * onto paxos-related keys), and then we will decode those same bufferlists
364 * we just wrote and apply the transactions they hold. We will also update
365 * our first and last committed values to point to the new values, if need
366 * be. And all this is done tightly wrapped in a transaction to ensure we
367 * enjoy the atomicity guarantees given by our awesome k/v store.
// Applies committed values carried by message 'm' to the local store.
//
// Visible flow:
//  - if the first value in the message is beyond last_committed+1, log that
//    all values are being ignored (there would be a gap);
//  - skip values at or below our last_committed;
//  - walk forward up to m->last_committed, advancing last_committed to each
//    version visited;
//  - write "last_committed", each raw value under its version key, and the
//    decoded contents of each value into a single transaction;
//  - forget a remembered uncommitted value that the new last_committed has
//    overtaken;
//  - apply the transaction (counted and timed via logger), then re-read
//    first_committed and run _sanity_check_store().
//
// Returns bool; 'changed' below is presumably the result -- the return
// statements are not visible in this listing.
369 bool Paxos::store_state(MMonPaxos
*m
)
371 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
372 map
<version_t
,bufferlist
>::iterator start
= m
->values
.begin();
373 bool changed
= false;
375 // build map of values to store
376 // we want to write the range [last_committed, m->last_committed] only.
377 if (start
!= m
->values
.end() &&
378 start
->first
> last_committed
+ 1) {
379 // ignore everything if values start in the future.
380 dout(10) << "store_state ignoring all values, they start at " << start
->first
381 << " > last_committed+1" << dendl
;
385 // push forward the start position on the message's values iterator, up until
386 // we run out of positions or we find a position matching 'last_committed'.
387 while (start
!= m
->values
.end() && start
->first
<= last_committed
) {
391 // make sure we get the right interval of values to apply by pushing forward
392 // the 'end' iterator until it matches the message's 'last_committed'.
393 map
<version_t
,bufferlist
>::iterator end
= start
;
394 while (end
!= m
->values
.end() && end
->first
<= m
->last_committed
) {
// in-memory last_committed advances as we go; it is written to the
// transaction further down
395 last_committed
= end
->first
;
400 dout(10) << "store_state nothing to commit" << dendl
;
402 dout(10) << "store_state [" << start
->first
<< ".."
403 << last_committed
<< "]" << dendl
;
404 t
->put(get_name(), "last_committed", last_committed
);
406 // we should apply the state here -- decode every single bufferlist in the
407 // map and append the transactions to 't'.
408 map
<version_t
,bufferlist
>::iterator it
;
409 for (it
= start
; it
!= end
; ++it
) {
410 // write the bufferlist as the version's value
411 t
->put(get_name(), it
->first
, it
->second
);
412 // decode the bufferlist and append it to the transaction we will shortly
414 decode_append_transaction(t
, it
->second
);
417 // discard obsolete uncommitted value?
418 if (uncommitted_v
&& uncommitted_v
<= last_committed
) {
419 dout(10) << " forgetting obsolete uncommitted value " << uncommitted_v
420 << " pn " << uncommitted_pn
<< dendl
;
423 uncommitted_value
.clear();
427 dout(30) << __func__
<< " transaction dump:\n";
428 JSONFormatter
f(true);
433 logger
->inc(l_paxos_store_state
);
434 logger
->inc(l_paxos_store_state_bytes
, t
->get_bytes());
435 logger
->inc(l_paxos_store_state_keys
, t
->get_keys());
// time the store write for the store_state_latency counter
437 auto start
= ceph::coarse_mono_clock::now();
438 get_store()->apply_transaction(t
);
439 auto end
= ceph::coarse_mono_clock::now();
441 logger
->tinc(l_paxos_store_state_latency
, to_timespan(end
-start
));
443 // refresh first_committed; this txn may have trimmed.
444 first_committed
= get_store()->get(get_name(), "first_committed");
446 _sanity_check_store();
// Consistency check: the "last_committed" value persisted in the store must
// equal the in-memory last_committed; asserts otherwise.
453 void Paxos::_sanity_check_store()
455 version_t lc
= get_store()->get(get_name(), "last_committed");
456 ceph_assert(lc
== last_committed
);
// Leader side of the recovery phase: handles an OP_LAST reply to a collect.
//
// Visible flow:
//  - drop the message if we are not the leader;
//  - record the sender's first/last committed versions;
//  - if the sender's first_committed is beyond our last_committed+1, log and
//    mark "need to bootstrap";
//  - store any committed values carried by the message (store_state);
//  - for every recorded peer: flag bootstrap if it is too far behind our
//    first_committed, otherwise send it an OP_COMMIT with the values it lacks;
//  - compare the reply's pn with ours:
//      higher -> the peer promised a newer pn; cancel the collect timeout
//                (the log line says a new pn is picked);
//      equal  -> the peer accepted our pn; possibly learn its uncommitted
//                value, and once num_last equals the quorum size either
//                re-propose the learned value or go active;
//      lower  -> stale message, ignored.
461 void Paxos::handle_last(MonOpRequestRef op
)
463 op
->mark_paxos_event("handle_last");
464 MMonPaxos
*last
= static_cast<MMonPaxos
*>(op
->get_req());
465 bool need_refresh
= false;
// rank of the monitor that sent this reply
466 int from
= last
->get_source().num();
468 dout(10) << "handle_last " << *last
<< dendl
;
470 if (!mon
->is_leader()) {
471 dout(10) << "not leader, dropping" << dendl
;
475 // note peer's first_ and last_committed, in case we learn a new
476 // commit and need to push it to them.
477 peer_first_committed
[from
] = last
->first_committed
;
478 peer_last_committed
[from
] = last
->last_committed
;
480 if (last
->first_committed
> last_committed
+ 1) {
483 << " lowest version is too high for our last committed"
484 << " (theirs: " << last
->first_committed
485 << "; ours: " << last_committed
<< ") -- bootstrap!" << dendl
;
486 op
->mark_paxos_event("need to bootstrap");
// NOTE(review): the paxos_kill_at assertions fire when the config option
// equals the given step number -- presumably a fault-injection hook; confirm.
491 ceph_assert(g_conf()->paxos_kill_at
!= 1);
493 // store any committed values if any are specified in the message
494 need_refresh
= store_state(last
);
496 ceph_assert(g_conf()->paxos_kill_at
!= 2);
498 // is everyone contiguous and up to date?
499 for (map
<int,version_t
>::iterator p
= peer_last_committed
.begin();
500 p
!= peer_last_committed
.end();
// peer's last_committed is below what we still retain: cannot catch it up
// with incrementals
502 if (p
->second
+ 1 < first_committed
&& first_committed
> 1) {
504 << " peon " << p
->first
505 << " last_committed (" << p
->second
506 << ") is too low for our first_committed (" << first_committed
507 << ") -- bootstrap!" << dendl
;
508 op
->mark_paxos_event("need to bootstrap");
512 if (p
->second
< last_committed
) {
513 // share committed values
514 dout(10) << " sending commit to mon." << p
->first
<< dendl
;
515 MMonPaxos
*commit
= new MMonPaxos(mon
->get_epoch(),
516 MMonPaxos::OP_COMMIT
,
518 share_state(commit
, peer_first_committed
[p
->first
], p
->second
);
519 mon
->send_mon_message(commit
, p
->first
);
523 // do they accept your pn?
524 if (last
->pn
> accepted_pn
) {
526 dout(10) << " they had a higher pn than us, picking a new one." << dendl
;
528 // cancel timeout event
529 mon
->timer
.cancel_event(collect_timeout_event
);
530 collect_timeout_event
= 0;
533 } else if (last
->pn
== accepted_pn
) {
534 // yes, they accepted our pn. great.
536 dout(10) << " they accepted our pn, we now have "
537 << num_last
<< " peons" << dendl
;
539 // did this person send back an accepted but uncommitted value?
540 if (last
->uncommitted_pn
) {
// adopt the peer's uncommitted value only if it is at least as new as the
// one we hold, both in pn and in version
541 if (last
->uncommitted_pn
>= uncommitted_pn
&&
542 last
->last_committed
>= last_committed
&&
543 last
->last_committed
+ 1 >= uncommitted_v
) {
544 uncommitted_v
= last
->last_committed
+1;
545 uncommitted_pn
= last
->uncommitted_pn
;
546 uncommitted_value
= last
->values
[uncommitted_v
];
547 dout(10) << "we learned an uncommitted value for " << uncommitted_v
548 << " pn " << uncommitted_pn
549 << " " << uncommitted_value
.length() << " bytes"
552 dout(10) << "ignoring uncommitted value for " << (last
->last_committed
+1)
553 << " pn " << last
->uncommitted_pn
554 << " " << last
->values
[last
->last_committed
+1].length() << " bytes"
// every quorum member has answered
560 if (num_last
== mon
->get_quorum().size()) {
561 // cancel timeout event
562 mon
->timer
.cancel_event(collect_timeout_event
);
563 collect_timeout_event
= 0;
564 peer_first_committed
.clear();
565 peer_last_committed
.clear();
569 // did we learn an old value?
570 if (uncommitted_v
== last_committed
+1 &&
571 uncommitted_value
.length()) {
572 dout(10) << "that's everyone. begin on old learned value" << dendl
;
573 state
= STATE_UPDATING_PREVIOUS
;
574 begin(uncommitted_value
);
577 dout(10) << "that's everyone. active!" << dendl
;
580 need_refresh
= false;
587 // no, this is an old message, discard
588 dout(10) << "old pn, ignoring" << dendl
;
// Timer callback for an expired collect phase: logs the timeout, clears the
// stored event handle, bumps the collect_timeout counter and asserts that
// we are still the leader. The log line states that a fresh election is
// called; that call itself is not visible in this listing.
595 void Paxos::collect_timeout()
597 dout(1) << "collect timeout, calling fresh election" << dendl
;
598 collect_timeout_event
= 0;
599 logger
->inc(l_paxos_collect_timeout
);
600 ceph_assert(mon
->is_leader());
// Leader: starts an update round proposing value 'v' for last_committed+1.
//
// Preconditions (asserted): we are the leader, the state is updating or
// updating-previous, we either are the whole quorum or have more than half
// the monmap behind us (num_last), and no value is currently in flight
// (new_value is empty).
//
// Visible flow: record our own acceptance, persist the proposed value along
// with pending_v / pending_pn in one transaction (counted and timed via
// logger), then send OP_BEGIN to every other quorum member and arm the
// accept timeout. A quorum of one takes a shortcut whose body is not visible
// in this listing.
//
// @param v  encoded transaction to propose
606 void Paxos::begin(bufferlist
& v
)
608 dout(10) << "begin for " << last_committed
+1 << " "
609 << v
.length() << " bytes"
612 ceph_assert(mon
->is_leader());
613 ceph_assert(is_updating() || is_updating_previous());
615 // we must already have a majority for this to work.
616 ceph_assert(mon
->get_quorum().size() == 1 ||
617 num_last
> (unsigned)mon
->monmap
->size()/2);
619 // and no value, yet.
620 ceph_assert(new_value
.length() == 0);
622 // accept it ourselves
624 accepted
.insert(mon
->rank
);
// very first commit on this store
627 if (last_committed
== 0) {
628 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
629 // initial base case; set first_committed too
630 t
->put(get_name(), "first_committed", 1);
631 decode_append_transaction(t
, new_value
);
639 // store the proposed value in the store. IF it is accepted, we will then
640 // have to decode it into a transaction and apply it.
641 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
642 t
->put(get_name(), last_committed
+1, new_value
);
644 // note which pn this pending value is for.
645 t
->put(get_name(), "pending_v", last_committed
+ 1);
646 t
->put(get_name(), "pending_pn", accepted_pn
);
648 dout(30) << __func__
<< " transaction dump:\n";
649 JSONFormatter
f(true);
// decode the proposed value into a scratch transaction, for the debug dump
652 auto debug_tx(std::make_shared
<MonitorDBStore::Transaction
>());
653 auto new_value_it
= new_value
.cbegin();
654 debug_tx
->decode(new_value_it
);
656 *_dout
<< "\nbl dump:\n";
660 logger
->inc(l_paxos_begin
);
661 logger
->inc(l_paxos_begin_keys
, t
->get_keys());
662 logger
->inc(l_paxos_begin_bytes
, t
->get_bytes());
// time the store write for the begin_latency counter
664 auto start
= ceph::coarse_mono_clock::now();
665 get_store()->apply_transaction(t
);
666 auto end
= ceph::coarse_mono_clock::now();
668 logger
->tinc(l_paxos_begin_latency
, to_timespan(end
- start
));
670 ceph_assert(g_conf()->paxos_kill_at
!= 3);
672 if (mon
->get_quorum().size() == 1) {
673 // we're alone, take it easy
678 // ask others to accept it too!
679 for (set
<int>::const_iterator p
= mon
->get_quorum().begin();
680 p
!= mon
->get_quorum().end();
682 if (*p
== mon
->rank
) continue;
684 dout(10) << " sending begin to mon." << *p
<< dendl
;
685 MMonPaxos
*begin
= new MMonPaxos(mon
->get_epoch(), MMonPaxos::OP_BEGIN
,
687 begin
->values
[last_committed
+1] = new_value
;
688 begin
->last_committed
= last_committed
;
689 begin
->pn
= accepted_pn
;
691 mon
->send_mon_message(begin
, *p
);
// arm the accept timeout: mon_accept_timeout_factor * mon_lease from now
695 accept_timeout_event
= mon
->timer
.add_event_after(
696 g_conf()->mon_accept_timeout_factor
* g_conf()->mon_lease
,
697 new C_MonContext(mon
, [this](int r
) {
// Peon: handles an OP_BEGIN proposal from the leader.
//
// A proposal carrying a pn lower than accepted_pn is logged and marked as
// ignored. Otherwise the pn and last_committed must match ours exactly
// (asserted); the peon enters STATE_UPDATING, cancels its lease, persists
// the proposed value for last_committed+1 together with pending_v /
// pending_pn, and replies with OP_ACCEPT on the connection the begin
// arrived on.
705 void Paxos::handle_begin(MonOpRequestRef op
)
707 op
->mark_paxos_event("handle_begin");
708 MMonPaxos
*begin
= static_cast<MMonPaxos
*>(op
->get_req());
709 dout(10) << "handle_begin " << *begin
<< dendl
;
711 // can we accept this?
712 if (begin
->pn
< accepted_pn
) {
713 dout(10) << " we accepted a higher pn " << accepted_pn
<< ", ignoring" << dendl
;
714 op
->mark_paxos_event("have higher pn, ignore");
717 ceph_assert(begin
->pn
== accepted_pn
);
718 ceph_assert(begin
->last_committed
== last_committed
);
720 ceph_assert(g_conf()->paxos_kill_at
!= 4);
722 logger
->inc(l_paxos_begin
);
725 state
= STATE_UPDATING
;
726 lease_expire
= utime_t(); // cancel lease
// version being proposed
729 version_t v
= last_committed
+1;
730 dout(10) << "accepting value for " << v
<< " pn " << accepted_pn
<< dendl
;
731 // store the accepted value onto our store. We will have to decode it and
732 // apply its transaction once we receive permission to commit.
733 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
734 t
->put(get_name(), v
, begin
->values
[v
]);
736 // note which pn this pending value is for.
737 t
->put(get_name(), "pending_v", v
);
738 t
->put(get_name(), "pending_pn", accepted_pn
);
740 dout(30) << __func__
<< " transaction dump:\n";
741 JSONFormatter
f(true);
746 logger
->inc(l_paxos_begin_bytes
, t
->get_bytes());
// time the store write for the begin_latency counter
748 auto start
= ceph::coarse_mono_clock::now();
749 get_store()->apply_transaction(t
);
750 auto end
= ceph::coarse_mono_clock::now();
752 logger
->tinc(l_paxos_begin_latency
, to_timespan(end
- start
));
754 ceph_assert(g_conf()->paxos_kill_at
!= 5);
// acknowledge to the sender
757 MMonPaxos
*accept
= new MMonPaxos(mon
->get_epoch(), MMonPaxos::OP_ACCEPT
,
759 accept
->pn
= accepted_pn
;
760 accept
->last_committed
= last_committed
;
761 begin
->get_connection()->send_message(accept
);
// Leader: handles an OP_ACCEPT acknowledgement from a peon.
//
// Accepts whose pn differs from accepted_pn, or whose last_committed is more
// than one behind ours, are logged and marked as ignored. Otherwise the
// sender is added to 'accepted' (it must not already be there, asserted),
// and once 'accepted' equals the full quorum set the op is marked
// "commit_start".
765 void Paxos::handle_accept(MonOpRequestRef op
)
767 op
->mark_paxos_event("handle_accept");
768 MMonPaxos
*accept
= static_cast<MMonPaxos
*>(op
->get_req());
769 dout(10) << "handle_accept " << *accept
<< dendl
;
// rank of the monitor that sent this accept
770 int from
= accept
->get_source().num();
// note: the test is inequality, so any pn mismatch takes this path
772 if (accept
->pn
!= accepted_pn
) {
773 // we accepted a higher pn, from some other leader
774 dout(10) << " we accepted a higher pn " << accepted_pn
<< ", ignoring" << dendl
;
775 op
->mark_paxos_event("have higher pn, ignore");
778 if (last_committed
> 0 &&
779 accept
->last_committed
< last_committed
-1) {
780 dout(10) << " this is from an old round, ignoring" << dendl
;
781 op
->mark_paxos_event("old round, ignore");
784 ceph_assert(accept
->last_committed
== last_committed
|| // not committed
785 accept
->last_committed
== last_committed
-1); // committed
787 ceph_assert(is_updating() || is_updating_previous());
788 ceph_assert(accepted
.count(from
) == 0);
789 accepted
.insert(from
);
790 dout(10) << " now " << accepted
<< " have accepted" << dendl
;
792 ceph_assert(g_conf()->paxos_kill_at
!= 6);
794 // only commit (and expose committed state) when we get *all* quorum
795 // members to accept. otherwise, they may still be sharing the now
797 // FIXME: we can improve this with an additional lease revocation message
798 // that doesn't block for the persist.
// the log text below says "majority", but the condition requires the
// accepted set to equal the entire quorum
799 if (accepted
== mon
->get_quorum()) {
801 dout(10) << " got majority, committing, done with update" << dendl
;
802 op
->mark_paxos_event("commit_start");
// Timer callback for an expired accept phase: logs the timeout, clears the
// stored event handle, asserts we are the leader and in one of the
// updating / writing states, and bumps the accept_timeout counter. The log
// line states that a fresh election is called; that call itself is not
// visible in this listing.
807 void Paxos::accept_timeout()
809 dout(1) << "accept timeout, calling fresh election" << dendl
;
810 accept_timeout_event
= 0;
811 ceph_assert(mon
->is_leader());
812 ceph_assert(is_updating() || is_updating_previous() || is_writing() ||
813 is_writing_previous());
814 logger
->inc(l_paxos_accept_timeout
);
// Completion context queued together with the commit transaction (see
// commit_start). When finished it takes the monitor lock and then either
// aborts the commit, if paxos has been shut down in the meantime, or
// completes it via commit_finish().
818 struct C_Committed
: public Context
{
820 explicit C_Committed(Paxos
*p
) : paxos(p
) {}
821 void finish(int r
) override
{
// hold the monitor lock for the remainder of this callback
823 std::lock_guard
l(paxos
->mon
->lock
);
824 if (paxos
->is_shutdown()) {
825 paxos
->abort_commit();
828 paxos
->commit_finish();
// Called when a queued commit completes after shutdown. Requires at least
// one commit in flight (asserted) and signals shutdown_cond when
// commits_started is zero.
// NOTE(review): the statement that changes commits_started between the
// assertion and the check is not visible in this listing.
832 void Paxos::abort_commit()
834 ceph_assert(commits_started
> 0);
836 if (commits_started
== 0)
837 shutdown_cond
.Signal();
// Starts the asynchronous commit of the value accepted for
// last_committed+1.
//
// Builds one transaction that bumps "last_committed" and appends the decoded
// contents of new_value, records the commit counters and a start timestamp,
// and queues the transaction with a C_Committed completion. The state then
// moves from updating(-previous) to writing(-previous), and with a quorum
// larger than one the accept timeout is cancelled.
840 void Paxos::commit_start()
842 dout(10) << __func__
<< " " << (last_committed
+1) << dendl
;
844 ceph_assert(g_conf()->paxos_kill_at
!= 7);
846 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
849 t
->put(get_name(), "last_committed", last_committed
+ 1);
851 // decode the value and apply its transaction to the store.
852 // this value can now be read from last_committed.
853 decode_append_transaction(t
, new_value
);
855 dout(30) << __func__
<< " transaction dump:\n";
856 JSONFormatter
f(true);
861 logger
->inc(l_paxos_commit
);
862 logger
->inc(l_paxos_commit_keys
, t
->get_keys());
863 logger
->inc(l_paxos_commit_bytes
, t
->get_bytes());
// commit_finish() measures commit latency against this stamp
864 commit_start_stamp
= ceph_clock_now();
// queued rather than applied inline; C_Committed runs on completion
866 get_store()->queue_transaction(t
, new C_Committed(this));
868 if (is_updating_previous())
869 state
= STATE_WRITING_PREVIOUS
;
870 else if (is_updating())
871 state
= STATE_WRITING
;
876 if (mon
->get_quorum().size() > 1) {
877 // cancel timeout event
878 mon
->timer
.cancel_event(accept_timeout_event
);
879 accept_timeout_event
= 0;
// Completion half of a commit, invoked from C_Committed.
//
// Records commit latency against commit_start_stamp, cancels the lease,
// stamps last_commit_time, re-reads first_committed from the store, runs
// _sanity_check_store(), sends OP_COMMIT carrying new_value to every other
// quorum member, and moves from a writing state to STATE_REFRESH.
883 void Paxos::commit_finish()
885 dout(20) << __func__
<< " " << (last_committed
+1) << dendl
;
886 utime_t end
= ceph_clock_now();
887 logger
->tinc(l_paxos_commit_latency
, end
- commit_start_stamp
);
889 ceph_assert(g_conf()->paxos_kill_at
!= 8);
891 // cancel lease - it was for the old value.
892 // (this would only happen if message layer lost the 'begin', but
893 // leader still got a majority and committed without us.)
894 lease_expire
= utime_t(); // cancel lease
897 last_commit_time
= ceph_clock_now();
899 // refresh first_committed; this txn may have trimmed.
900 first_committed
= get_store()->get(get_name(), "first_committed");
902 _sanity_check_store();
// tell every other quorum member about the commit
905 for (set
<int>::const_iterator p
= mon
->get_quorum().begin();
906 p
!= mon
->get_quorum().end();
908 if (*p
== mon
->rank
) continue;
910 dout(10) << " sending commit to mon." << *p
<< dendl
;
911 MMonPaxos
*commit
= new MMonPaxos(mon
->get_epoch(), MMonPaxos::OP_COMMIT
,
913 commit
->values
[last_committed
] = new_value
;
914 commit
->pn
= accepted_pn
;
915 commit
->last_committed
= last_committed
;
917 mon
->send_mon_message(commit
, *p
);
920 ceph_assert(g_conf()->paxos_kill_at
!= 9);
922 // get ready for a new round.
925 // WRITING -> REFRESH
926 // among other things, this lets do_refresh() -> mon->bootstrap() ->
927 // wait_for_paxos_write() know that it doesn't need to flush the store
928 // queue. and it should not, as we are in the async completion thread now!
929 ceph_assert(is_writing() || is_writing_previous());
930 state
= STATE_REFRESH
;
931 ceph_assert(commits_started
> 0);
936 if (mon
->get_quorum().size() > 1) {
940 ceph_assert(g_conf()->paxos_kill_at
!= 10);
// Peon: handles an OP_COMMIT from the leader. Bumps the commit counter,
// logs and drops the message when we are not a peon, and otherwise marks the
// op "store_state" (the store call itself is not visible in this listing).
947 void Paxos::handle_commit(MonOpRequestRef op
)
949 op
->mark_paxos_event("handle_commit");
950 MMonPaxos
*commit
= static_cast<MMonPaxos
*>(op
->get_req());
951 dout(10) << "handle_commit on " << commit
->last_committed
<< dendl
;
// counted before the peon check, so dropped messages are counted too
953 logger
->inc(l_paxos_commit
);
955 if (!mon
->is_peon()) {
956 dout(10) << "not a peon, dropping" << dendl
;
961 op
->mark_paxos_event("store_state");
// Leader: grants (or renews) the read lease.
//
// Sets lease_expire to now + mon_lease, counts ourselves as having acked,
// sends OP_LEASE with the new expiry to every other quorum member, arms the
// lease-ack timeout if none is pending, and schedules the next renewal at
// lease_expire - mon_lease + mon_lease_renew_interval_factor * mon_lease.
967 void Paxos::extend_lease()
969 ceph_assert(mon
->is_leader());
970 //assert(is_active());
972 lease_expire
= ceph_clock_now();
973 lease_expire
+= g_conf()->mon_lease
;
975 acked_lease
.insert(mon
->rank
);
977 dout(7) << "extend_lease now+" << g_conf()->mon_lease
978 << " (" << lease_expire
<< ")" << dendl
;
// send the lease to every quorum member except ourselves
981 for (set
<int>::const_iterator p
= mon
->get_quorum().begin();
982 p
!= mon
->get_quorum().end(); ++p
) {
984 if (*p
== mon
->rank
) continue;
985 MMonPaxos
*lease
= new MMonPaxos(mon
->get_epoch(), MMonPaxos::OP_LEASE
,
987 lease
->last_committed
= last_committed
;
988 lease
->lease_timestamp
= lease_expire
;
989 lease
->first_committed
= first_committed
;
990 mon
->send_mon_message(lease
, *p
);
993 // set timeout event.
994 // if old timeout is still in place, leave it.
995 if (!lease_ack_timeout_event
) {
996 lease_ack_timeout_event
= mon
->timer
.add_event_after(
997 g_conf()->mon_lease_ack_timeout_factor
* g_conf()->mon_lease
,
998 new C_MonContext(mon
, [this](int r
) {
1001 lease_ack_timeout();
// renewal time: start of this lease plus the configured fraction of mon_lease
1006 utime_t at
= lease_expire
;
1007 at
-= g_conf()->mon_lease
;
1008 at
+= g_conf()->mon_lease_renew_interval_factor
* g_conf()->mon_lease
;
1009 lease_renew_event
= mon
->timer
.add_event_at(
1010 at
, new C_MonContext(mon
, [this](int r
) {
1011 if (r
== -ECANCELED
)
1013 lease_renew_timeout();
// Emits a cluster-log warning when timestamp 't' (taken from a message sent
// by 'from') lies further in the future than mon_clock_drift_allowed.
//
// Warnings are rate-limited: the time since the previous warning is compared
// against a term that grows as mon_clock_drift_warn_backoff raised to the
// number of warnings already issued. After warning, last_clock_drift_warn
// and clock_drift_warned are updated.
// NOTE(review): the left-hand side of that comparison is on a line missing
// from this listing; presumably it is warn_diff.
//
// @param t     timestamp to check against the local clock
// @param from  sender named in the warning text
1017 void Paxos::warn_on_future_time(utime_t t
, entity_name_t from
)
1019 utime_t now
= ceph_clock_now();
// how far ahead of our clock the timestamp is
1021 utime_t diff
= t
- now
;
1022 if (diff
> g_conf()->mon_clock_drift_allowed
) {
1023 utime_t warn_diff
= now
- last_clock_drift_warn
;
1025 pow(g_conf()->mon_clock_drift_warn_backoff
, clock_drift_warned
)) {
1026 mon
->clog
->warn() << "message from " << from
<< " was stamped " << diff
1027 << "s in the future, clocks not synchronized";
1028 last_clock_drift_warn
= ceph_clock_now();
1029 ++clock_drift_warned
;
// Asks the monitor to reload its state from paxos, timing the call for the
// refresh counters. refresh_from_paxos() may set need_bootstrap, in which
// case the request is logged here.
// Returns bool; the return statements are not visible in this listing.
1036 bool Paxos::do_refresh()
1038 bool need_bootstrap
= false;
1040 // make sure we have the latest state loaded up
1041 auto start
= ceph::coarse_mono_clock::now();
1042 mon
->refresh_from_paxos(&need_bootstrap
);
1043 auto end
= ceph::coarse_mono_clock::now();
1045 logger
->inc(l_paxos_refresh
);
1046 logger
->tinc(l_paxos_refresh_latency
, to_timespan(end
- start
));
1048 if (need_bootstrap
) {
1049 dout(10) << " doing requested bootstrap" << dendl
;
1057 void Paxos::commit_proposal()
1059 dout(10) << __func__
<< dendl
;
1060 ceph_assert(mon
->is_leader());
1061 ceph_assert(is_refresh());
1063 finish_contexts(g_ceph_context
, committing_finishers
);
// Leader-side end of a round: switch to STATE_ACTIVE and complete the
// contexts waiting for active, readable and writeable, in that order.
// NOTE(review): the bodies of the two trailing `if` statements are absent
// from this view; only their conditions are visible.
1066 void Paxos::finish_round()
1068 dout(10) << __func__
<< dendl
;
1069 ceph_assert(mon
->is_leader());
1071 // ok, now go active!
1072 state
= STATE_ACTIVE
;
// NOTE(review): the log text says "waiting_for_acting" but the list that is
// completed is waiting_for_active
1074 dout(20) << __func__
<< " waiting_for_acting" << dendl
;
1075 finish_contexts(g_ceph_context
, waiting_for_active
);
1076 dout(20) << __func__
<< " waiting_for_readable" << dendl
;
1077 finish_contexts(g_ceph_context
, waiting_for_readable
);
1078 dout(20) << __func__
<< " waiting_for_writeable" << dendl
;
1079 finish_contexts(g_ceph_context
, waiting_for_writeable
);
// the waiters may have changed `state`, hence it is logged again here
1081 dout(10) << __func__
<< " done w/ waiters, state " << get_statename(state
) << dendl
;
// trim check (action not visible here)
1083 if (should_trim()) {
// a proposal queued meanwhile is only considered if we are still active
1087 if (is_active() && pending_proposal
) {
// Handle an OP_LEASE message: validate it, extend our lease_expire, go
// active, reply with an OP_LEASE_ACK, re-arm the lease timeout and wake
// contexts waiting for active/readable.
// NOTE(review): some original lines are absent from this view (e.g. part of
// the sanity condition, the early return, and the last MMonPaxos constructor
// argument); the comments describe only the visible statements.
1094 void Paxos::handle_lease(MonOpRequestRef op
)
1096 op
->mark_paxos_event("handle_lease");
1097 MMonPaxos
*lease
= static_cast<MMonPaxos
*>(op
->get_req());
// sanity: we must be a peon and agree with the sender on last_committed
1099 if (!mon
->is_peon() ||
1100 last_committed
!= lease
->last_committed
) {
1101 dout(10) << "handle_lease i'm not a peon, or they're not the leader,"
1102 << " or the last_committed doesn't match, dropping" << dendl
;
1103 op
->mark_paxos_event("invalid lease, ignore");
// complain if the sender's clock looks ahead of ours
1107 warn_on_future_time(lease
->sent_timestamp
, lease
->get_source());
// only ever move lease_expire forward
1110 if (lease_expire
< lease
->lease_timestamp
) {
1111 lease_expire
= lease
->lease_timestamp
;
// a lease that is already expired on arrival is reported via derr
1113 utime_t now
= ceph_clock_now();
1114 if (lease_expire
< now
) {
1115 utime_t diff
= now
- lease_expire
;
1116 derr
<< "lease_expire from " << lease
->get_source_inst() << " is " << diff
<< " seconds in the past; mons are probably laggy (or possibly clocks are too skewed)" << dendl
;
1120 state
= STATE_ACTIVE
;
1122 dout(10) << "handle_lease on " << lease
->last_committed
1123 << " now " << lease_expire
<< dendl
;
// build the ack: our commit range, a timestamp, and our encoded feature map
1126 MMonPaxos
*ack
= new MMonPaxos(mon
->get_epoch(), MMonPaxos::OP_LEASE_ACK
,
1128 ack
->last_committed
= last_committed
;
1129 ack
->first_committed
= first_committed
;
1130 ack
->lease_timestamp
= ceph_clock_now();
1131 encode(mon
->session_map
.feature_map
, ack
->feature_map
);
// reply on the connection the lease arrived on
1132 lease
->get_connection()->send_message(ack
);
1134 // (re)set timeout event.
1135 reset_lease_timeout();
// wake anyone waiting for us to become active / readable
1138 finish_contexts(g_ceph_context
, waiting_for_active
);
1140 finish_contexts(g_ceph_context
, waiting_for_readable
);
// Handle an OP_LEASE_ACK: record which quorum member acked, pick up its
// feature map, and cancel the ack timeout once everyone has answered.
// NOTE(review): some original lines are absent from this view (closing
// braces, `else` keywords, the feature-map decode); the comments describe
// only the visible statements.
1143 void Paxos::handle_lease_ack(MonOpRequestRef op
)
1145 op
->mark_paxos_event("handle_lease_ack");
1146 MMonPaxos
*ack
= static_cast<MMonPaxos
*>(op
->get_req());
// numeric id of the sender, used as the key in acked_lease
1147 int from
= ack
->get_source().num();
// no ack timeout armed: nothing is waiting for this ack
1149 if (!lease_ack_timeout_event
) {
1150 dout(10) << "handle_lease_ack from " << ack
->get_source()
1151 << " -- stray (probably since revoked)" << dendl
;
// first ack from this sender
1153 } else if (acked_lease
.count(from
) == 0) {
1154 acked_lease
.insert(from
);
// a non-empty feature map is stored per sender in quorum_feature_map
1155 if (ack
->feature_map
.length()) {
1156 auto p
= ack
->feature_map
.cbegin();
1157 FeatureMap
& t
= mon
->quorum_feature_map
[from
];
// everyone in the quorum has acked: drop the timeout
1160 if (acked_lease
== mon
->get_quorum()) {
1162 dout(10) << "handle_lease_ack from " << ack
->get_source()
1163 << " -- got everyone" << dendl
;
1164 mon
->timer
.cancel_event(lease_ack_timeout_event
);
1165 lease_ack_timeout_event
= 0;
1169 dout(10) << "handle_lease_ack from " << ack
->get_source()
1170 << " -- still need "
1171 << mon
->get_quorum().size() - acked_lease
.size()
1172 << " more" << dendl
;
// sender was already recorded in acked_lease
1175 dout(10) << "handle_lease_ack from " << ack
->get_source()
1176 << " dup (lagging!), ignoring" << dendl
;
// complain if the sender's clock looks ahead of ours
1179 warn_on_future_time(ack
->sent_timestamp
, ack
->get_source());
// Timer callback for missing lease acks. Asserts leader + active, bumps the
// l_paxos_lease_ack_timeout counter and clears the event handle.
// NOTE(review): the statement that actually starts the election announced by
// the log line is absent from this view.
1182 void Paxos::lease_ack_timeout()
1184 dout(1) << "lease_ack_timeout -- calling new election" << dendl
;
1185 ceph_assert(mon
->is_leader());
1186 ceph_assert(is_active());
1187 logger
->inc(l_paxos_lease_ack_timeout
);
// the event has fired, so the stored handle is cleared
1188 lease_ack_timeout_event
= 0;
// (Re)arm the lease timeout: cancel any pending event, then schedule a new
// one mon_lease_ack_timeout_factor * mon_lease from now.
// NOTE(review): the callback body after the -ECANCELED check is absent from
// this view.
1192 void Paxos::reset_lease_timeout()
1194 dout(20) << "reset_lease_timeout - setting timeout event" << dendl
;
// drop the previous timer first so only one is ever pending
1195 if (lease_timeout_event
)
1196 mon
->timer
.cancel_event(lease_timeout_event
);
1197 lease_timeout_event
= mon
->timer
.add_event_after(
1198 g_conf()->mon_lease_ack_timeout_factor
* g_conf()->mon_lease
,
1199 new C_MonContext(mon
, [this](int r
) {
// r == -ECANCELED is special-cased in the callback
1200 if (r
== -ECANCELED
)
// Timer callback for an expired lease. Asserts we are a peon, bumps the
// l_paxos_lease_timeout counter and clears the event handle.
// NOTE(review): the statement that actually starts the election announced by
// the log line is absent from this view.
1206 void Paxos::lease_timeout()
1208 dout(1) << "lease_timeout -- calling new election" << dendl
;
1209 ceph_assert(mon
->is_peon());
1210 logger
->inc(l_paxos_lease_timeout
);
// the event has fired, so the stored handle is cleared
1211 lease_timeout_event
= 0;
// Timer callback for lease renewal; clears the stored event handle.
// NOTE(review): the rest of the body is absent from this view.
1215 void Paxos::lease_renew_timeout()
1217 lease_renew_event
= 0;
// NOTE(review): the signature line of this body is absent from this view;
// the log text ("trim to ...") suggests it is the trim routine -- confirm.
// Visible behavior: queue erasure of versions [first_committed, end) on the
// pending transaction and store `end` as the new "first_committed".
1227 ceph_assert(should_trim());
// trim target: the lower of (version - paxos_min) and
// (first_committed + paxos_trim_max)
1228 version_t end
= std::min(get_version() - g_conf()->paxos_min
,
1229 get_first_committed() + g_conf()->paxos_trim_max
);
// nothing to trim if we are already at/after the target
1231 if (first_committed
>= end
)
1234 dout(10) << "trim to " << end
<< " (was " << first_committed
<< ")" << dendl
;
// the erasures are added to the pending proposal transaction
1236 MonitorDBStore::TransactionRef t
= get_pending_transaction();
1238 for (version_t v
= first_committed
; v
< end
; ++v
) {
1239 dout(10) << "trim " << v
<< dendl
;
1240 t
->erase(get_name(), v
);
1242 t
->put(get_name(), "first_committed", end
);
// optional compaction of the key range that was just erased
1243 if (g_conf()->mon_compact_on_trim
) {
1244 dout(10) << " compacting trimmed range" << dendl
;
1245 t
->compact_range(get_name(), stringify(first_committed
- 1), stringify(end
));
// C_Trimmed::finish() clears the `trimming` flag when it runs
1249 queue_pending_finisher(new C_Trimmed(this));
1253 * return a globally unique, monotonically increasing proposal number
// Produce a new proposal number and persist it as "last_pn" in the store.
// `gt` is presumably a lower bound the new number must exceed -- confirm;
// the lines using it are absent from this view.
// NOTE(review): some original lines are absent (the arithmetic before the
// rank is added, the dump/flush of the formatter, the return); the comments
// describe only the visible statements.
1255 version_t
Paxos::get_new_proposal_number(version_t gt
)
1260 // update. make it unique among all monitors.
1264 last_pn
+= (version_t
)mon
->rank
;
// persist last_pn in its own transaction, applied synchronously below
1267 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
1268 t
->put(get_name(), "last_pn", last_pn
);
1270 dout(30) << __func__
<< " transaction dump:\n";
1271 JSONFormatter
f(true);
1276 logger
->inc(l_paxos_new_pn
);
// time the store write for the l_paxos_new_pn_latency counter
1278 auto start
= ceph::coarse_mono_clock::now();
1279 get_store()->apply_transaction(t
);
1280 auto end
= ceph::coarse_mono_clock::now();
1282 logger
->tinc(l_paxos_new_pn_latency
, to_timespan(end
- start
));
1284 dout(10) << "get_new_proposal_number = " << last_pn
<< dendl
;
1289 void Paxos::cancel_events()
1291 if (collect_timeout_event
) {
1292 mon
->timer
.cancel_event(collect_timeout_event
);
1293 collect_timeout_event
= 0;
1295 if (accept_timeout_event
) {
1296 mon
->timer
.cancel_event(accept_timeout_event
);
1297 accept_timeout_event
= 0;
1299 if (lease_renew_event
) {
1300 mon
->timer
.cancel_event(lease_renew_event
);
1301 lease_renew_event
= 0;
1303 if (lease_ack_timeout_event
) {
1304 mon
->timer
.cancel_event(lease_ack_timeout_event
);
1305 lease_ack_timeout_event
= 0;
1307 if (lease_timeout_event
) {
1308 mon
->timer
.cancel_event(lease_timeout_event
);
1309 lease_timeout_event
= 0;
// Shut paxos down: enter STATE_SHUTDOWN, drop the pending proposal, wait for
// in-flight store commits, then complete every waiter/finisher list with
// -ECANCELED and unregister the perf counters.
// NOTE(review): some original lines near the end are absent from this view;
// the comments describe only the visible statements.
1313 void Paxos::shutdown()
1315 dout(10) << __func__
<< " cancel all contexts" << dendl
;
1317 state
= STATE_SHUTDOWN
;
1319 // discard pending transaction
1320 pending_proposal
.reset();
1322 // Let store finish commits in progress
1323 // XXX: I assume I can't use finish_contexts() because the store
1324 // is going to trigger
// waits on shutdown_cond (with mon->lock) until commits_started drops to 0
1325 while(commits_started
> 0)
1326 shutdown_cond
.Wait(mon
->lock
);
// everything still queued is completed with -ECANCELED
1328 finish_contexts(g_ceph_context
, waiting_for_writeable
, -ECANCELED
);
1329 finish_contexts(g_ceph_context
, waiting_for_readable
, -ECANCELED
);
1330 finish_contexts(g_ceph_context
, waiting_for_active
, -ECANCELED
);
1331 finish_contexts(g_ceph_context
, pending_finishers
, -ECANCELED
);
1332 finish_contexts(g_ceph_context
, committing_finishers
, -ECANCELED
);
// unregister our perf counters from the global collection
1334 g_ceph_context
->get_perfcounters_collection()->remove(logger
);
// Initialise paxos after this monitor became leader: drop the pending
// proposal, fail queued finishers, and either go active right away (quorum
// of one) or enter STATE_RECOVERING with a cleared lease.
// NOTE(review): several original lines are absent from this view (the start
// of the body, what follows the single-monitor case, and what starts the
// recovery); the comments describe only the visible statements.
1338 void Paxos::leader_init()
1343 // discard pending transaction
1344 pending_proposal
.reset();
// pending + committing finishers are completed with -EAGAIN
1346 reset_pending_committing_finishers();
1348 logger
->inc(l_paxos_start_leader
);
// alone in the quorum: nothing to recover from peers
1350 if (mon
->get_quorum().size() == 1) {
1351 state
= STATE_ACTIVE
;
1355 state
= STATE_RECOVERING
;
// reset the lease to a default-constructed utime_t
1356 lease_expire
= utime_t();
1357 dout(10) << "leader_init -- starting paxos recovery" << dendl
;
// Initialise paxos after this monitor became a peon: enter
// STATE_RECOVERING with a cleared lease, arm the lease timeout, drop the
// pending proposal and fail finishers and writeable-waiters with -EAGAIN.
// NOTE(review): the first lines of the body are absent from this view.
1361 void Paxos::peon_init()
1366 state
= STATE_RECOVERING
;
// reset the lease to a default-constructed utime_t
1367 lease_expire
= utime_t();
1368 dout(10) << "peon_init -- i am a peon" << dendl
;
1370 // start a timer, in case the leader never manages to issue a lease
1371 reset_lease_timeout();
1373 // discard pending transaction
1374 pending_proposal
.reset();
1376 // no chance to write now!
1377 reset_pending_committing_finishers();
1378 finish_contexts(g_ceph_context
, waiting_for_writeable
, -EAGAIN
);
1380 logger
->inc(l_paxos_start_peon
);
// Restart paxos: flush the store if a write is in flight, fall back to
// STATE_RECOVERING, drop the pending proposal and fail finishers and
// active-waiters with -EAGAIN.
// NOTE(review): the lines between the first log message and the is_writing
// check are absent from this view (the log text mentions canceling timeouts).
1383 void Paxos::restart()
1385 dout(10) << "restart -- canceling timeouts" << dendl
;
// a write (or write of a previous value) is in progress: flush the store
1389 if (is_writing() || is_writing_previous()) {
1390 dout(10) << __func__
<< " flushing" << dendl
;
1392 mon
->store
->flush();
1394 dout(10) << __func__
<< " flushed" << dendl
;
1396 state
= STATE_RECOVERING
;
1398 // discard pending transaction
1399 pending_proposal
.reset();
// pending + committing finishers are completed with -EAGAIN
1401 reset_pending_committing_finishers();
1402 finish_contexts(g_ceph_context
, waiting_for_active
, -EAGAIN
);
1404 logger
->inc(l_paxos_restart
);
1407 void Paxos::reset_pending_committing_finishers()
1409 committing_finishers
.splice(committing_finishers
.end(), pending_finishers
);
1410 finish_contexts(g_ceph_context
, committing_finishers
, -EAGAIN
);
// Entry point for paxos messages: validates the op, drops it when we are
// neither leader nor peon, and switches on the message/op type.
// NOTE(review): most of the switch (outer case labels, per-op handler calls,
// breaks, defaults) is absent from this view; only the labels and the
// OP_LEASE_ACK handler call are visible.
1413 void Paxos::dispatch(MonOpRequestRef op
)
1415 ceph_assert(op
->is_type_paxos());
1416 op
->mark_paxos_event("dispatch");
1417 PaxosServiceMessage
*m
= static_cast<PaxosServiceMessage
*>(op
->get_req());
1418 // election in progress?
1419 if (!mon
->is_leader() && !mon
->is_peon()) {
1420 dout(5) << "election in progress, dropping " << *m
<< dendl
;
// a peon only accepts paxos traffic that comes from the current leader
1425 ceph_assert(mon
->is_leader() ||
1426 (mon
->is_peon() && m
->get_source().num() == mon
->get_leader()));
1428 switch (m
->get_type()) {
// NOTE(review): reinterpret_cast here, while handle_lease() in this file
// uses static_cast for the same target type -- consider aligning
1432 MMonPaxos
*pm
= reinterpret_cast<MMonPaxos
*>(m
);
1434 // NOTE: these ops are defined in messages/MMonPaxos.h
1437 case MMonPaxos::OP_COLLECT
:
1440 case MMonPaxos::OP_LAST
:
1443 case MMonPaxos::OP_BEGIN
:
1446 case MMonPaxos::OP_ACCEPT
:
1449 case MMonPaxos::OP_COMMIT
:
1452 case MMonPaxos::OP_LEASE
:
1455 case MMonPaxos::OP_LEASE_ACK
:
1456 handle_lease_ack(op
);
1470 // -----------------
1471 // service interface
// Report whether version `v` can be read right now.
// NOTE(review): the declaration of `ret`, the result of the early check and
// the return statement are absent from this view; the comments describe only
// the visible conditions.
1475 bool Paxos::is_readable(version_t v
)
// versions beyond last_committed are checked first
1478 if (v
> last_committed
)
// visible conjunction: in quorum (peon or leader), in an
// active/updating/writing state, something committed, and a valid lease
1482 (mon
->is_peon() || mon
->is_leader()) &&
1483 (is_active() || is_updating() || is_writing()) &&
1484 last_committed
> 0 && is_lease_valid(); // must have a value alone, or have lease
1485 dout(5) << __func__
<< " = " << (int)ret
1486 << " - now=" << ceph_clock_now()
1487 << " lease_expire=" << lease_expire
1488 << " has v" << v
<< " lc " << last_committed
// Read version `v` of this paxos instance from the store into `bl`.
// NOTE(review): the return statements are absent from this view, so the
// mapping from the store result to the bool result cannot be confirmed here.
1493 bool Paxos::read(version_t v
, bufferlist
&bl
)
// elsewhere in this file the store's get() result is asserted == 0, so this
// branch presumably is the success path -- confirm
1495 if (!get_store()->get(get_name(), v
, bl
))
// Read the value at last_committed into `bl`; on a successful read() the
// version that was read (last_committed) is returned.
// NOTE(review): the return for the failure path is absent from this view.
1500 version_t
Paxos::read_current(bufferlist
&bl
)
1502 if (read(last_committed
, bl
))
1503 return last_committed
;
1508 bool Paxos::is_lease_valid()
1510 return ((mon
->get_quorum().size() == 1)
1511 || (ceph_clock_now() < lease_expire
));
// NOTE(review): only the signature of is_writeable() is present in this
// view; its body is absent, so its conditions are not documented here.
1516 bool Paxos::is_writeable()
// Turn the pending transaction into a proposal: encode it, log it, release
// the pending transaction, move its finishers to the committing list and
// enter STATE_UPDATING.
// NOTE(review): some original lines are absent from this view (the
// declaration of `bl`, the formatter flush, and whatever follows the state
// change); the comments describe only the visible statements.
1524 void Paxos::propose_pending()
1526 ceph_assert(is_active());
1527 ceph_assert(pending_proposal
);
// serialise the pending transaction into bl
1532 pending_proposal
->encode(bl
);
// the proposal is logged as version last_committed + 1
1534 dout(10) << __func__
<< " " << (last_committed
+ 1)
1535 << " " << bl
.length() << " bytes" << dendl
;
1536 dout(30) << __func__
<< " transaction dump:\n";
1537 JSONFormatter
f(true);
1538 pending_proposal
->dump(&f
);
// the encoded copy in bl is what is used from here on
1542 pending_proposal
.reset();
// finishers queued for the pending proposal now belong to the committing one
1544 committing_finishers
.swap(pending_finishers
);
1545 state
= STATE_UPDATING
;
1549 void Paxos::queue_pending_finisher(Context
*onfinished
)
1551 dout(5) << __func__
<< " " << onfinished
<< dendl
;
1552 ceph_assert(onfinished
);
1553 pending_finishers
.push_back(onfinished
);
1556 MonitorDBStore::TransactionRef
Paxos::get_pending_transaction()
1558 ceph_assert(mon
->is_leader());
1559 if (!pending_proposal
) {
1560 pending_proposal
.reset(new MonitorDBStore::Transaction
);
1561 ceph_assert(pending_finishers
.empty());
1563 return pending_proposal
;
// Decide whether the pending proposal can be proposed now. The log messages
// distinguish three cases: plugged, active, and not active.
// NOTE(review): the first condition, the action taken in the active case and
// all return statements are absent from this view.
1566 bool Paxos::trigger_propose()
1569 dout(10) << __func__
<< " plugged, not proposing now" << dendl
;
1571 } else if (is_active()) {
1572 dout(10) << __func__
<< " active, proposing now" << dendl
;
1576 dout(10) << __func__
<< " not active, will propose later" << dendl
;
1581 bool Paxos::is_consistent()
1583 return (first_committed
<= last_committed
);