// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#include "PaxosService.h"

#include <algorithm>
#include <ostream>
#include <string>

#include "common/Clock.h"
#include "common/config.h"
#include "include/stringify.h"
#include "include/ceph_assert.h"
#include "mon/MonOpRequest.h"

using std::ostream;
using std::string;

using ceph::bufferlist;
#define dout_subsys ceph_subsys_paxos
// Replace any dout_prefix an included header may already define: redefining
// a macro with a different replacement list without #undef is ill-formed.
#undef dout_prefix
#define dout_prefix _prefix(_dout, mon, paxos, service_name, get_first_committed(), get_last_committed())
30 static ostream
& _prefix(std::ostream
*_dout
, Monitor
&mon
, Paxos
&paxos
, string service_name
,
31 version_t fc
, version_t lc
) {
32 return *_dout
<< "mon." << mon
.name
<< "@" << mon
.rank
33 << "(" << mon
.get_state_name()
34 << ").paxosservice(" << service_name
<< " " << fc
<< ".." << lc
<< ") ";
37 bool PaxosService::dispatch(MonOpRequestRef op
)
39 ceph_assert(op
->is_type_service() || op
->is_type_command());
40 auto m
= op
->get_req
<PaxosServiceMessage
>();
41 op
->mark_event("psvc:dispatch");
43 dout(10) << __func__
<< " " << m
<< " " << *m
44 << " from " << m
->get_orig_source_inst()
45 << " con " << m
->get_connection() << dendl
;
47 if (mon
.is_shutdown()) {
51 // make sure this message isn't forwarded from a previous election epoch
52 if (m
->rx_election_epoch
&&
53 m
->rx_election_epoch
< mon
.get_epoch()) {
54 dout(10) << " discarding forwarded message from previous election epoch "
55 << m
->rx_election_epoch
<< " < " << mon
.get_epoch() << dendl
;
59 // make sure the client is still connected. note that a proxied
60 // connection will be disconnected with a null message; don't drop
61 // those. also ignore loopback (e.g., log) messages.
62 if (m
->get_connection() &&
63 !m
->get_connection()->is_connected() &&
64 m
->get_connection() != mon
.con_self
&&
65 m
->get_connection()->get_messenger() != NULL
) {
66 dout(10) << " discarding message from disconnected client "
67 << m
->get_source_inst() << " " << *m
<< dendl
;
71 // make sure our map is readable and up to date
72 if (!is_readable(m
->version
)) {
73 dout(10) << " waiting for paxos -> readable (v" << m
->version
<< ")" << dendl
;
74 wait_for_readable(op
, new C_RetryMessage(this, op
), m
->version
);
79 if (preprocess_query(op
))
83 if (!mon
.is_leader()) {
84 mon
.forward_request_leader(op
);
89 if (!is_writeable()) {
90 dout(10) << " waiting for paxos -> writeable" << dendl
;
91 wait_for_writeable(op
, new C_RetryMessage(this, op
));
96 if (!prepare_update(op
)) {
101 if (need_immediate_propose
) {
102 dout(10) << __func__
<< " forced immediate propose" << dendl
;
103 need_immediate_propose
= false;
109 if (!should_propose(delay
)) {
110 dout(10) << " not proposing" << dendl
;
120 if (!proposal_timer
) {
122 * Callback class used to propose the pending value once the proposal_timer
125 auto do_propose
= new C_MonContext
{&mon
, [this](int r
) {
129 } else if (r
== -ECANCELED
|| r
== -EAGAIN
) {
132 ceph_abort_msg("bad return value for proposal_timer");
135 dout(10) << " setting proposal_timer " << do_propose
136 << " with delay of " << delay
<< dendl
;
137 proposal_timer
= mon
.timer
.add_event_after(delay
, do_propose
);
139 dout(10) << " proposal_timer already set" << dendl
;
144 void PaxosService::refresh(bool *need_bootstrap
)
146 // update cached versions
147 cached_first_committed
= mon
.store
->get(get_service_name(), first_committed_name
);
148 cached_last_committed
= mon
.store
->get(get_service_name(), last_committed_name
);
150 version_t new_format
= get_value("format_version");
151 if (new_format
!= format_version
) {
152 dout(1) << __func__
<< " upgraded, format " << format_version
<< " -> " << new_format
<< dendl
;
155 format_version
= new_format
;
157 dout(10) << __func__
<< dendl
;
159 update_from_paxos(need_bootstrap
);
162 void PaxosService::post_refresh()
164 dout(10) << __func__
<< dendl
;
168 if (mon
.is_peon() && !waiting_for_finished_proposal
.empty()) {
169 finish_contexts(g_ceph_context
, waiting_for_finished_proposal
, -EAGAIN
);
173 bool PaxosService::should_propose(double& delay
)
175 // simple default policy: quick startup, then some damping.
176 if (get_last_committed() <= 1) {
179 utime_t now
= ceph_clock_now();
180 if ((now
- paxos
.last_commit_time
) > g_conf()->paxos_propose_interval
)
181 delay
= (double)g_conf()->paxos_min_wait
;
183 delay
= (double)(g_conf()->paxos_propose_interval
+ paxos
.last_commit_time
190 void PaxosService::propose_pending()
192 dout(10) << __func__
<< dendl
;
193 ceph_assert(have_pending
);
194 ceph_assert(!proposing
);
195 ceph_assert(mon
.is_leader());
196 ceph_assert(is_active());
198 if (proposal_timer
) {
199 dout(10) << " canceling proposal_timer " << proposal_timer
<< dendl
;
200 mon
.timer
.cancel_event(proposal_timer
);
201 proposal_timer
= NULL
;
205 * @note What we contribute to the pending Paxos transaction is
206 * obtained by calling a function that must be implemented by
207 * the class implementing us. I.e., the function
208 * encode_pending will be the one responsible to encode
209 * whatever is pending on the implementation class into a
210 * bufferlist, so we can then propose that as a value through
213 MonitorDBStore::TransactionRef t
= paxos
.get_pending_transaction();
215 if (should_stash_full())
219 have_pending
= false;
221 if (format_version
> 0) {
222 t
->put(get_service_name(), "format_version", format_version
);
228 * Callback class used to mark us as active once a proposal finishes going
231 * We should wake people up *only* *after* we inform the service we
232 * just went active. And we should wake people up only once we finish
233 * going active. This is why we first go active, avoiding to wake up the
234 * wrong people at the wrong time, such as waking up a C_RetryMessage
235 * before waking up a C_Active, thus ending up without a pending value.
237 class C_Committed
: public Context
{
240 explicit C_Committed(PaxosService
*p
) : ps(p
) { }
241 void finish(int r
) override
{
242 ps
->proposing
= false;
245 else if (r
== -ECANCELED
|| r
== -EAGAIN
)
248 ceph_abort_msg("bad return value for C_Committed");
251 paxos
.queue_pending_finisher(new C_Committed(this));
252 paxos
.trigger_propose();
255 bool PaxosService::should_stash_full()
257 version_t latest_full
= get_version_latest_full();
258 /* @note The first member of the condition is moot and it is here just for
259 * clarity's sake. The second member would end up returing true
260 * nonetheless because, in that event,
261 * latest_full == get_trim_to() == 0.
263 return (!latest_full
||
264 (latest_full
<= get_trim_to()) ||
265 (get_last_committed() - latest_full
> (version_t
)g_conf()->paxos_stash_full_interval
));
268 void PaxosService::restart()
270 dout(10) << __func__
<< dendl
;
271 if (proposal_timer
) {
272 dout(10) << " canceling proposal_timer " << proposal_timer
<< dendl
;
273 mon
.timer
.cancel_event(proposal_timer
);
277 finish_contexts(g_ceph_context
, waiting_for_finished_proposal
, -EAGAIN
);
281 have_pending
= false;
288 void PaxosService::election_finished()
290 dout(10) << __func__
<< dendl
;
292 finish_contexts(g_ceph_context
, waiting_for_finished_proposal
, -EAGAIN
);
294 // make sure we update our state
298 void PaxosService::_active()
300 if (is_proposing()) {
301 dout(10) << __func__
<< " - proposing" << dendl
;
305 dout(10) << __func__
<< " - not active" << dendl
;
307 * Callback used to make sure we call the PaxosService::_active function
308 * whenever a condition is fulfilled.
310 * This is used in multiple situations, from waiting for the Paxos to commit
311 * our proposed value, to waiting for the Paxos to become active once an
312 * election is finished.
314 class C_Active
: public Context
{
317 explicit C_Active(PaxosService
*s
) : svc(s
) {}
318 void finish(int r
) override
{
323 wait_for_active_ctx(new C_Active(this));
326 dout(10) << __func__
<< dendl
;
328 // create pending state?
329 if (mon
.is_leader()) {
330 dout(7) << __func__
<< " creating new pending" << dendl
;
336 if (get_last_committed() == 0) {
337 // create initial state
343 dout(7) << __func__
<< " we are not the leader, hence we propose nothing!" << dendl
;
346 // wake up anyone who came in while we were proposing. note that
347 // anyone waiting for the previous proposal to commit is no longer
348 // on this list; it is on Paxos's.
349 finish_contexts(g_ceph_context
, waiting_for_finished_proposal
, 0);
354 // NOTE: it's possible that this will get called twice if we commit
355 // an old paxos value. Implementations should be mindful of that.
360 void PaxosService::shutdown()
364 if (proposal_timer
) {
365 dout(10) << " canceling proposal_timer " << proposal_timer
<< dendl
;
366 mon
.timer
.cancel_event(proposal_timer
);
370 finish_contexts(g_ceph_context
, waiting_for_finished_proposal
, -EAGAIN
);
375 void PaxosService::maybe_trim()
380 const version_t first_committed
= get_first_committed();
381 version_t trim_to
= get_trim_to();
382 dout(20) << __func__
<< " " << first_committed
<< "~" << trim_to
<< dendl
;
384 if (trim_to
< first_committed
) {
385 dout(10) << __func__
<< " trim_to " << trim_to
<< " < first_committed "
386 << first_committed
<< dendl
;
390 version_t to_remove
= trim_to
- first_committed
;
391 const version_t trim_min
= g_conf().get_val
<version_t
>("paxos_service_trim_min");
393 to_remove
< trim_min
) {
394 dout(10) << __func__
<< " trim_to " << trim_to
<< " would only trim " << to_remove
395 << " < paxos_service_trim_min " << trim_min
<< dendl
;
399 to_remove
= [to_remove
, trim_to
, this] {
400 const version_t trim_max
= g_conf().get_val
<version_t
>("paxos_service_trim_max");
401 if (trim_max
== 0 || to_remove
< trim_max
) {
404 if (to_remove
< trim_max
* 1.5) {
405 dout(10) << __func__
<< " trim to " << trim_to
<< " would only trim " << to_remove
406 << " > paxos_service_trim_max, limiting to " << trim_max
410 const version_t new_trim_max
= (trim_max
+ to_remove
) / 2;
411 const uint64_t trim_max_multiplier
= g_conf().get_val
<uint64_t>("paxos_service_trim_max_multiplier");
412 if (trim_max_multiplier
) {
413 return std::min(new_trim_max
, trim_max
* trim_max_multiplier
);
418 trim_to
= first_committed
+ to_remove
;
420 dout(10) << __func__
<< " trimming to " << trim_to
<< ", " << to_remove
<< " states" << dendl
;
421 MonitorDBStore::TransactionRef t
= paxos
.get_pending_transaction();
422 trim(t
, first_committed
, trim_to
);
423 put_first_committed(t
, trim_to
);
424 cached_first_committed
= trim_to
;
426 // let the service add any extra stuff
427 encode_trim_extra(t
, trim_to
);
429 paxos
.trigger_propose();
432 void PaxosService::trim(MonitorDBStore::TransactionRef t
,
433 version_t from
, version_t to
)
435 dout(10) << __func__
<< " from " << from
<< " to " << to
<< dendl
;
436 ceph_assert(from
!= to
);
438 for (version_t v
= from
; v
< to
; ++v
) {
439 dout(20) << __func__
<< " " << v
<< dendl
;
440 t
->erase(get_service_name(), v
);
442 string full_key
= mon
.store
->combine_strings("full", v
);
443 if (mon
.store
->exists(get_service_name(), full_key
)) {
444 dout(20) << __func__
<< " " << full_key
<< dendl
;
445 t
->erase(get_service_name(), full_key
);
448 if (g_conf()->mon_compact_on_trim
) {
449 dout(20) << " compacting prefix " << get_service_name() << dendl
;
450 t
->compact_range(get_service_name(), stringify(from
- 1), stringify(to
));
451 t
->compact_range(get_service_name(),
452 mon
.store
->combine_strings(full_prefix_name
, from
- 1),
453 mon
.store
->combine_strings(full_prefix_name
, to
));
// Load this service's persisted health checks: read the value stored under
// the "health" prefix with key service_name into `bl`, then decode it into
// health_checks.
// NOTE(review): the declaration of `bl` and the end of this definition are
// not visible in this chunk; confirm against the full file (e.g. whether an
// empty value is skipped before decoding).
457 void PaxosService::load_health()
460 mon
.store
->get("health", service_name
, bl
);
462 auto p
= bl
.cbegin();
464 decode(health_checks
, p
);