1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "PaxosService.h"
16 #include "common/Clock.h"
17 #include "common/config.h"
18 #include "include/stringify.h"
19 #include "include/assert.h"
20 #include "mon/MonOpRequest.h"
22 #define dout_subsys ceph_subsys_paxos
24 #define dout_prefix _prefix(_dout, mon, paxos, service_name, get_first_committed(), get_last_committed())
25 static ostream
& _prefix(std::ostream
*_dout
, Monitor
*mon
, Paxos
*paxos
, string service_name
,
26 version_t fc
, version_t lc
) {
27 return *_dout
<< "mon." << mon
->name
<< "@" << mon
->rank
28 << "(" << mon
->get_state_name()
29 << ").paxosservice(" << service_name
<< " " << fc
<< ".." << lc
<< ") ";
32 bool PaxosService::dispatch(MonOpRequestRef op
)
34 assert(op
->is_type_service() || op
->is_type_command());
35 PaxosServiceMessage
*m
= static_cast<PaxosServiceMessage
*>(op
->get_req());
36 op
->mark_event("psvc:dispatch");
38 dout(10) << __func__
<< " " << m
<< " " << *m
39 << " from " << m
->get_orig_source_inst()
40 << " con " << m
->get_connection() << dendl
;
42 if (mon
->is_shutdown()) {
46 // make sure this message isn't forwarded from a previous election epoch
47 if (m
->rx_election_epoch
&&
48 m
->rx_election_epoch
< mon
->get_epoch()) {
49 dout(10) << " discarding forwarded message from previous election epoch "
50 << m
->rx_election_epoch
<< " < " << mon
->get_epoch() << dendl
;
54 // make sure the client is still connected. note that a proxied
55 // connection will be disconnected with a null message; don't drop
56 // those. also ignore loopback (e.g., log) messages.
57 if (m
->get_connection() &&
58 !m
->get_connection()->is_connected() &&
59 m
->get_connection() != mon
->con_self
&&
60 m
->get_connection()->get_messenger() != NULL
) {
61 dout(10) << " discarding message from disconnected client "
62 << m
->get_source_inst() << " " << *m
<< dendl
;
66 // make sure our map is readable and up to date
67 if (!is_readable(m
->version
)) {
68 dout(10) << " waiting for paxos -> readable (v" << m
->version
<< ")" << dendl
;
69 wait_for_readable(op
, new C_RetryMessage(this, op
), m
->version
);
74 if (preprocess_query(op
))
78 if (!mon
->is_leader()) {
79 mon
->forward_request_leader(op
);
84 if (!is_writeable()) {
85 dout(10) << " waiting for paxos -> writeable" << dendl
;
86 wait_for_writeable(op
, new C_RetryMessage(this, op
));
91 if (!prepare_update(op
)) {
96 if (need_immediate_propose
) {
97 dout(10) << __func__
<< " forced immediate propose" << dendl
;
98 need_immediate_propose
= false;
104 if (!should_propose(delay
)) {
105 dout(10) << " not proposing" << dendl
;
115 if (!proposal_timer
) {
117 * Callback class used to propose the pending value once the proposal_timer
120 proposal_timer
= new C_MonContext(mon
, [this](int r
) {
124 } else if (r
== -ECANCELED
|| r
== -EAGAIN
) {
127 assert(0 == "bad return value for proposal_timer");
130 dout(10) << " setting proposal_timer " << proposal_timer
131 << " with delay of " << delay
<< dendl
;
132 mon
->timer
.add_event_after(delay
, proposal_timer
);
134 dout(10) << " proposal_timer already set" << dendl
;
139 void PaxosService::refresh(bool *need_bootstrap
)
141 // update cached versions
142 cached_first_committed
= mon
->store
->get(get_service_name(), first_committed_name
);
143 cached_last_committed
= mon
->store
->get(get_service_name(), last_committed_name
);
145 version_t new_format
= get_value("format_version");
146 if (new_format
!= format_version
) {
147 dout(1) << __func__
<< " upgraded, format " << format_version
<< " -> " << new_format
<< dendl
;
150 format_version
= new_format
;
152 dout(10) << __func__
<< dendl
;
154 update_from_paxos(need_bootstrap
);
157 void PaxosService::post_refresh()
159 dout(10) << __func__
<< dendl
;
163 if (mon
->is_peon() && !waiting_for_finished_proposal
.empty()) {
164 finish_contexts(g_ceph_context
, waiting_for_finished_proposal
, -EAGAIN
);
168 bool PaxosService::should_propose(double& delay
)
170 // simple default policy: quick startup, then some damping.
171 if (get_last_committed() <= 1) {
174 utime_t now
= ceph_clock_now();
175 if ((now
- paxos
->last_commit_time
) > g_conf
->paxos_propose_interval
)
176 delay
= (double)g_conf
->paxos_min_wait
;
178 delay
= (double)(g_conf
->paxos_propose_interval
+ paxos
->last_commit_time
185 void PaxosService::propose_pending()
187 dout(10) << __func__
<< dendl
;
188 assert(have_pending
);
190 assert(mon
->is_leader());
193 if (proposal_timer
) {
194 dout(10) << " canceling proposal_timer " << proposal_timer
<< dendl
;
195 mon
->timer
.cancel_event(proposal_timer
);
196 proposal_timer
= NULL
;
200 * @note What we contribute to the pending Paxos transaction is
201 * obtained by calling a function that must be implemented by
202 * the class implementing us. I.e., the function
203 * encode_pending will be the one responsible to encode
204 * whatever is pending on the implementation class into a
205 * bufferlist, so we can then propose that as a value through
208 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
210 if (should_stash_full())
214 have_pending
= false;
216 if (format_version
> 0) {
217 t
->put(get_service_name(), "format_version", format_version
);
223 * Callback class used to mark us as active once a proposal finishes going
226 * We should wake people up *only* *after* we inform the service we
227 * just went active. And we should wake people up only once we finish
228 * going active. This is why we first go active, to avoid waking up the
229 * wrong people at the wrong time, such as waking up a C_RetryMessage
230 * before waking up a C_Active, thus ending up without a pending value.
232 class C_Committed
: public Context
{
235 explicit C_Committed(PaxosService
*p
) : ps(p
) { }
236 void finish(int r
) override
{
237 ps
->proposing
= false;
240 else if (r
== -ECANCELED
|| r
== -EAGAIN
)
243 assert(0 == "bad return value for C_Committed");
246 paxos
->queue_pending_finisher(new C_Committed(this));
247 paxos
->trigger_propose();
250 bool PaxosService::should_stash_full()
252 version_t latest_full
= get_version_latest_full();
253 /* @note The first member of the condition is moot and it is here just for
254 * clarity's sake. The second member would end up returing true
255 * nonetheless because, in that event,
256 * latest_full == get_trim_to() == 0.
258 return (!latest_full
||
259 (latest_full
<= get_trim_to()) ||
260 (get_last_committed() - latest_full
> (version_t
)g_conf
->paxos_stash_full_interval
));
263 void PaxosService::restart()
265 dout(10) << __func__
<< dendl
;
266 if (proposal_timer
) {
267 dout(10) << " canceling proposal_timer " << proposal_timer
<< dendl
;
268 mon
->timer
.cancel_event(proposal_timer
);
272 finish_contexts(g_ceph_context
, waiting_for_finished_proposal
, -EAGAIN
);
276 have_pending
= false;
283 void PaxosService::election_finished()
285 dout(10) << __func__
<< dendl
;
287 finish_contexts(g_ceph_context
, waiting_for_finished_proposal
, -EAGAIN
);
289 // make sure we update our state
293 void PaxosService::_active()
295 if (is_proposing()) {
296 dout(10) << __func__
<< " - proposing" << dendl
;
300 dout(10) << __func__
<< " - not active" << dendl
;
302 * Callback used to make sure we call the PaxosService::_active function
303 * whenever a condition is fulfilled.
305 * This is used in multiple situations, from waiting for the Paxos to commit
306 * our proposed value, to waiting for the Paxos to become active once an
307 * election is finished.
309 class C_Active
: public Context
{
312 explicit C_Active(PaxosService
*s
) : svc(s
) {}
313 void finish(int r
) override
{
318 wait_for_active_ctx(new C_Active(this));
321 dout(10) << __func__
<< dendl
;
323 // create pending state?
324 if (mon
->is_leader()) {
325 dout(7) << __func__
<< " creating new pending" << dendl
;
331 if (get_last_committed() == 0) {
332 // create initial state
338 if (!mon
->is_leader()) {
339 dout(7) << __func__
<< " we are not the leader, hence we propose nothing!" << dendl
;
343 // wake up anyone who came in while we were proposing. note that
344 // anyone waiting for the previous proposal to commit is no longer
345 // on this list; it is on Paxos's.
346 finish_contexts(g_ceph_context
, waiting_for_finished_proposal
, 0);
348 if (mon
->is_leader())
351 // NOTE: it's possible that this will get called twice if we commit
352 // an old paxos value. Implementations should be mindful of that.
357 void PaxosService::shutdown()
361 if (proposal_timer
) {
362 dout(10) << " canceling proposal_timer " << proposal_timer
<< dendl
;
363 mon
->timer
.cancel_event(proposal_timer
);
367 finish_contexts(g_ceph_context
, waiting_for_finished_proposal
, -EAGAIN
);
372 void PaxosService::maybe_trim()
377 version_t trim_to
= get_trim_to();
378 if (trim_to
< get_first_committed())
381 version_t to_remove
= trim_to
- get_first_committed();
382 if (g_conf
->paxos_service_trim_min
> 0 &&
383 to_remove
< (version_t
)g_conf
->paxos_service_trim_min
) {
384 dout(10) << __func__
<< " trim_to " << trim_to
<< " would only trim " << to_remove
385 << " < paxos_service_trim_min " << g_conf
->paxos_service_trim_min
<< dendl
;
389 if (g_conf
->paxos_service_trim_max
> 0 &&
390 to_remove
> (version_t
)g_conf
->paxos_service_trim_max
) {
391 dout(10) << __func__
<< " trim_to " << trim_to
<< " would only trim " << to_remove
392 << " > paxos_service_trim_max, limiting to " << g_conf
->paxos_service_trim_max
394 trim_to
= get_first_committed() + g_conf
->paxos_service_trim_max
;
395 to_remove
= g_conf
->paxos_service_trim_max
;
398 dout(10) << __func__
<< " trimming to " << trim_to
<< ", " << to_remove
<< " states" << dendl
;
399 MonitorDBStore::TransactionRef t
= paxos
->get_pending_transaction();
400 trim(t
, get_first_committed(), trim_to
);
401 put_first_committed(t
, trim_to
);
403 // let the service add any extra stuff
404 encode_trim_extra(t
, trim_to
);
406 paxos
->trigger_propose();
409 void PaxosService::trim(MonitorDBStore::TransactionRef t
,
410 version_t from
, version_t to
)
412 dout(10) << __func__
<< " from " << from
<< " to " << to
<< dendl
;
415 for (version_t v
= from
; v
< to
; ++v
) {
416 dout(20) << __func__
<< " " << v
<< dendl
;
417 t
->erase(get_service_name(), v
);
419 string full_key
= mon
->store
->combine_strings("full", v
);
420 if (mon
->store
->exists(get_service_name(), full_key
)) {
421 dout(20) << __func__
<< " " << full_key
<< dendl
;
422 t
->erase(get_service_name(), full_key
);
425 if (g_conf
->mon_compact_on_trim
) {
426 dout(20) << " compacting prefix " << get_service_name() << dendl
;
427 t
->compact_range(get_service_name(), stringify(from
- 1), stringify(to
));
428 t
->compact_range(get_service_name(),
429 mon
->store
->combine_strings(full_prefix_name
, from
- 1),
430 mon
->store
->combine_strings(full_prefix_name
, to
));
434 void PaxosService::load_health()
437 mon
->store
->get("health", service_name
, bl
);
440 ::decode(health_checks
, p
);