]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "PaxosService.h" | |
16 | #include "common/Clock.h" | |
17 | #include "common/config.h" | |
18 | #include "include/stringify.h" | |
19 | #include "include/assert.h" | |
20 | #include "mon/MonOpRequest.h" | |
21 | ||
22 | #define dout_subsys ceph_subsys_paxos | |
23 | #undef dout_prefix | |
24 | #define dout_prefix _prefix(_dout, mon, paxos, service_name, get_first_committed(), get_last_committed()) | |
25 | static ostream& _prefix(std::ostream *_dout, Monitor *mon, Paxos *paxos, string service_name, | |
26 | version_t fc, version_t lc) { | |
27 | return *_dout << "mon." << mon->name << "@" << mon->rank | |
28 | << "(" << mon->get_state_name() | |
29 | << ").paxosservice(" << service_name << " " << fc << ".." << lc << ") "; | |
30 | } | |
31 | ||
32 | bool PaxosService::dispatch(MonOpRequestRef op) | |
33 | { | |
34 | assert(op->is_type_service() || op->is_type_command()); | |
35 | PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req()); | |
36 | op->mark_event("psvc:dispatch"); | |
37 | ||
224ce89b | 38 | dout(10) << __func__ << " " << m << " " << *m |
7c673cae FG |
39 | << " from " << m->get_orig_source_inst() |
40 | << " con " << m->get_connection() << dendl; | |
41 | ||
42 | if (mon->is_shutdown()) { | |
43 | return true; | |
44 | } | |
45 | ||
46 | // make sure this message isn't forwarded from a previous election epoch | |
47 | if (m->rx_election_epoch && | |
48 | m->rx_election_epoch < mon->get_epoch()) { | |
49 | dout(10) << " discarding forwarded message from previous election epoch " | |
50 | << m->rx_election_epoch << " < " << mon->get_epoch() << dendl; | |
51 | return true; | |
52 | } | |
53 | ||
54 | // make sure the client is still connected. note that a proxied | |
55 | // connection will be disconnected with a null message; don't drop | |
56 | // those. also ignore loopback (e.g., log) messages. | |
57 | if (m->get_connection() && | |
58 | !m->get_connection()->is_connected() && | |
59 | m->get_connection() != mon->con_self && | |
60 | m->get_connection()->get_messenger() != NULL) { | |
61 | dout(10) << " discarding message from disconnected client " | |
62 | << m->get_source_inst() << " " << *m << dendl; | |
63 | return true; | |
64 | } | |
65 | ||
66 | // make sure our map is readable and up to date | |
67 | if (!is_readable(m->version)) { | |
68 | dout(10) << " waiting for paxos -> readable (v" << m->version << ")" << dendl; | |
69 | wait_for_readable(op, new C_RetryMessage(this, op), m->version); | |
70 | return true; | |
71 | } | |
72 | ||
73 | // preprocess | |
74 | if (preprocess_query(op)) | |
75 | return true; // easy! | |
76 | ||
77 | // leader? | |
78 | if (!mon->is_leader()) { | |
79 | mon->forward_request_leader(op); | |
80 | return true; | |
81 | } | |
82 | ||
83 | // writeable? | |
84 | if (!is_writeable()) { | |
85 | dout(10) << " waiting for paxos -> writeable" << dendl; | |
86 | wait_for_writeable(op, new C_RetryMessage(this, op)); | |
87 | return true; | |
88 | } | |
89 | ||
90 | // update | |
31f18b77 FG |
91 | if (!prepare_update(op)) { |
92 | // no changes made. | |
93 | return true; | |
94 | } | |
95 | ||
96 | if (need_immediate_propose) { | |
97 | dout(10) << __func__ << " forced immediate propose" << dendl; | |
98 | need_immediate_propose = false; | |
99 | propose_pending(); | |
100 | return true; | |
101 | } | |
102 | ||
103 | double delay = 0.0; | |
104 | if (!should_propose(delay)) { | |
105 | dout(10) << " not proposing" << dendl; | |
106 | return true; | |
107 | } | |
108 | ||
109 | if (delay == 0.0) { | |
110 | propose_pending(); | |
111 | return true; | |
112 | } | |
113 | ||
114 | // delay a bit | |
115 | if (!proposal_timer) { | |
116 | /** | |
117 | * Callback class used to propose the pending value once the proposal_timer | |
118 | * fires up. | |
119 | */ | |
120 | proposal_timer = new C_MonContext(mon, [this](int r) { | |
121 | proposal_timer = 0; | |
122 | if (r >= 0) { | |
123 | propose_pending(); | |
124 | } else if (r == -ECANCELED || r == -EAGAIN) { | |
125 | return; | |
126 | } else { | |
127 | assert(0 == "bad return value for proposal_timer"); | |
128 | } | |
129 | }); | |
130 | dout(10) << " setting proposal_timer " << proposal_timer | |
131 | << " with delay of " << delay << dendl; | |
132 | mon->timer.add_event_after(delay, proposal_timer); | |
133 | } else { | |
134 | dout(10) << " proposal_timer already set" << dendl; | |
135 | } | |
7c673cae FG |
136 | return true; |
137 | } | |
138 | ||
139 | void PaxosService::refresh(bool *need_bootstrap) | |
140 | { | |
141 | // update cached versions | |
142 | cached_first_committed = mon->store->get(get_service_name(), first_committed_name); | |
143 | cached_last_committed = mon->store->get(get_service_name(), last_committed_name); | |
144 | ||
145 | version_t new_format = get_value("format_version"); | |
146 | if (new_format != format_version) { | |
147 | dout(1) << __func__ << " upgraded, format " << format_version << " -> " << new_format << dendl; | |
148 | on_upgrade(); | |
149 | } | |
150 | format_version = new_format; | |
151 | ||
152 | dout(10) << __func__ << dendl; | |
153 | ||
154 | update_from_paxos(need_bootstrap); | |
155 | } | |
156 | ||
157 | void PaxosService::post_refresh() | |
158 | { | |
159 | dout(10) << __func__ << dendl; | |
160 | ||
161 | post_paxos_update(); | |
162 | ||
163 | if (mon->is_peon() && !waiting_for_finished_proposal.empty()) { | |
164 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); | |
165 | } | |
166 | } | |
167 | ||
168 | bool PaxosService::should_propose(double& delay) | |
169 | { | |
170 | // simple default policy: quick startup, then some damping. | |
31f18b77 | 171 | if (get_last_committed() <= 1) { |
7c673cae | 172 | delay = 0.0; |
31f18b77 | 173 | } else { |
7c673cae FG |
174 | utime_t now = ceph_clock_now(); |
175 | if ((now - paxos->last_commit_time) > g_conf->paxos_propose_interval) | |
176 | delay = (double)g_conf->paxos_min_wait; | |
177 | else | |
178 | delay = (double)(g_conf->paxos_propose_interval + paxos->last_commit_time | |
179 | - now); | |
180 | } | |
181 | return true; | |
182 | } | |
183 | ||
184 | ||
185 | void PaxosService::propose_pending() | |
186 | { | |
224ce89b | 187 | dout(10) << __func__ << dendl; |
7c673cae FG |
188 | assert(have_pending); |
189 | assert(!proposing); | |
190 | assert(mon->is_leader()); | |
191 | assert(is_active()); | |
192 | ||
193 | if (proposal_timer) { | |
194 | dout(10) << " canceling proposal_timer " << proposal_timer << dendl; | |
195 | mon->timer.cancel_event(proposal_timer); | |
196 | proposal_timer = NULL; | |
197 | } | |
198 | ||
199 | /** | |
200 | * @note What we contirbute to the pending Paxos transaction is | |
201 | * obtained by calling a function that must be implemented by | |
202 | * the class implementing us. I.e., the function | |
203 | * encode_pending will be the one responsible to encode | |
204 | * whatever is pending on the implementation class into a | |
205 | * bufferlist, so we can then propose that as a value through | |
206 | * Paxos. | |
207 | */ | |
208 | MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); | |
209 | ||
210 | if (should_stash_full()) | |
211 | encode_full(t); | |
212 | ||
213 | encode_pending(t); | |
214 | have_pending = false; | |
215 | ||
216 | if (format_version > 0) { | |
217 | t->put(get_service_name(), "format_version", format_version); | |
218 | } | |
219 | ||
220 | // apply to paxos | |
221 | proposing = true; | |
222 | /** | |
223 | * Callback class used to mark us as active once a proposal finishes going | |
224 | * through Paxos. | |
225 | * | |
226 | * We should wake people up *only* *after* we inform the service we | |
227 | * just went active. And we should wake people up only once we finish | |
228 | * going active. This is why we first go active, avoiding to wake up the | |
229 | * wrong people at the wrong time, such as waking up a C_RetryMessage | |
230 | * before waking up a C_Active, thus ending up without a pending value. | |
231 | */ | |
232 | class C_Committed : public Context { | |
233 | PaxosService *ps; | |
234 | public: | |
235 | explicit C_Committed(PaxosService *p) : ps(p) { } | |
236 | void finish(int r) override { | |
237 | ps->proposing = false; | |
238 | if (r >= 0) | |
239 | ps->_active(); | |
240 | else if (r == -ECANCELED || r == -EAGAIN) | |
241 | return; | |
242 | else | |
243 | assert(0 == "bad return value for C_Committed"); | |
244 | } | |
245 | }; | |
246 | paxos->queue_pending_finisher(new C_Committed(this)); | |
247 | paxos->trigger_propose(); | |
248 | } | |
249 | ||
250 | bool PaxosService::should_stash_full() | |
251 | { | |
252 | version_t latest_full = get_version_latest_full(); | |
253 | /* @note The first member of the condition is moot and it is here just for | |
254 | * clarity's sake. The second member would end up returing true | |
255 | * nonetheless because, in that event, | |
256 | * latest_full == get_trim_to() == 0. | |
257 | */ | |
258 | return (!latest_full || | |
259 | (latest_full <= get_trim_to()) || | |
260 | (get_last_committed() - latest_full > (version_t)g_conf->paxos_stash_full_interval)); | |
261 | } | |
262 | ||
263 | void PaxosService::restart() | |
264 | { | |
224ce89b | 265 | dout(10) << __func__ << dendl; |
7c673cae FG |
266 | if (proposal_timer) { |
267 | dout(10) << " canceling proposal_timer " << proposal_timer << dendl; | |
268 | mon->timer.cancel_event(proposal_timer); | |
269 | proposal_timer = 0; | |
270 | } | |
271 | ||
272 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); | |
273 | ||
274 | if (have_pending) { | |
275 | discard_pending(); | |
276 | have_pending = false; | |
277 | } | |
278 | proposing = false; | |
279 | ||
280 | on_restart(); | |
281 | } | |
282 | ||
283 | void PaxosService::election_finished() | |
284 | { | |
224ce89b | 285 | dout(10) << __func__ << dendl; |
7c673cae FG |
286 | |
287 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); | |
288 | ||
289 | // make sure we update our state | |
290 | _active(); | |
291 | } | |
292 | ||
293 | void PaxosService::_active() | |
294 | { | |
295 | if (is_proposing()) { | |
224ce89b | 296 | dout(10) << __func__ << " - proposing" << dendl; |
7c673cae FG |
297 | return; |
298 | } | |
299 | if (!is_active()) { | |
224ce89b | 300 | dout(10) << __func__ << " - not active" << dendl; |
7c673cae FG |
301 | /** |
302 | * Callback used to make sure we call the PaxosService::_active function | |
303 | * whenever a condition is fulfilled. | |
304 | * | |
305 | * This is used in multiple situations, from waiting for the Paxos to commit | |
306 | * our proposed value, to waiting for the Paxos to become active once an | |
307 | * election is finished. | |
308 | */ | |
309 | class C_Active : public Context { | |
310 | PaxosService *svc; | |
311 | public: | |
312 | explicit C_Active(PaxosService *s) : svc(s) {} | |
313 | void finish(int r) override { | |
314 | if (r >= 0) | |
315 | svc->_active(); | |
316 | } | |
317 | }; | |
318 | wait_for_active_ctx(new C_Active(this)); | |
319 | return; | |
320 | } | |
224ce89b | 321 | dout(10) << __func__ << dendl; |
7c673cae FG |
322 | |
323 | // create pending state? | |
324 | if (mon->is_leader()) { | |
224ce89b | 325 | dout(7) << __func__ << " creating new pending" << dendl; |
7c673cae FG |
326 | if (!have_pending) { |
327 | create_pending(); | |
328 | have_pending = true; | |
329 | } | |
330 | ||
331 | if (get_last_committed() == 0) { | |
332 | // create initial state | |
333 | create_initial(); | |
334 | propose_pending(); | |
335 | return; | |
336 | } | |
337 | } else { | |
338 | if (!mon->is_leader()) { | |
339 | dout(7) << __func__ << " we are not the leader, hence we propose nothing!" << dendl; | |
340 | } | |
341 | } | |
342 | ||
343 | // wake up anyone who came in while we were proposing. note that | |
344 | // anyone waiting for the previous proposal to commit is no longer | |
345 | // on this list; it is on Paxos's. | |
346 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, 0); | |
347 | ||
348 | if (mon->is_leader()) | |
349 | upgrade_format(); | |
350 | ||
351 | // NOTE: it's possible that this will get called twice if we commit | |
352 | // an old paxos value. Implementations should be mindful of that. | |
353 | on_active(); | |
354 | } | |
355 | ||
356 | ||
357 | void PaxosService::shutdown() | |
358 | { | |
359 | cancel_events(); | |
360 | ||
361 | if (proposal_timer) { | |
362 | dout(10) << " canceling proposal_timer " << proposal_timer << dendl; | |
363 | mon->timer.cancel_event(proposal_timer); | |
364 | proposal_timer = 0; | |
365 | } | |
366 | ||
367 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); | |
368 | ||
369 | on_shutdown(); | |
370 | } | |
371 | ||
372 | void PaxosService::maybe_trim() | |
373 | { | |
374 | if (!is_writeable()) | |
375 | return; | |
376 | ||
377 | version_t trim_to = get_trim_to(); | |
378 | if (trim_to < get_first_committed()) | |
379 | return; | |
380 | ||
381 | version_t to_remove = trim_to - get_first_committed(); | |
382 | if (g_conf->paxos_service_trim_min > 0 && | |
383 | to_remove < (version_t)g_conf->paxos_service_trim_min) { | |
384 | dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove | |
385 | << " < paxos_service_trim_min " << g_conf->paxos_service_trim_min << dendl; | |
386 | return; | |
387 | } | |
388 | ||
389 | if (g_conf->paxos_service_trim_max > 0 && | |
390 | to_remove > (version_t)g_conf->paxos_service_trim_max) { | |
391 | dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove | |
392 | << " > paxos_service_trim_max, limiting to " << g_conf->paxos_service_trim_max | |
393 | << dendl; | |
394 | trim_to = get_first_committed() + g_conf->paxos_service_trim_max; | |
395 | to_remove = g_conf->paxos_service_trim_max; | |
396 | } | |
397 | ||
398 | dout(10) << __func__ << " trimming to " << trim_to << ", " << to_remove << " states" << dendl; | |
399 | MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); | |
400 | trim(t, get_first_committed(), trim_to); | |
401 | put_first_committed(t, trim_to); | |
402 | ||
403 | // let the service add any extra stuff | |
404 | encode_trim_extra(t, trim_to); | |
405 | ||
406 | paxos->trigger_propose(); | |
407 | } | |
408 | ||
409 | void PaxosService::trim(MonitorDBStore::TransactionRef t, | |
410 | version_t from, version_t to) | |
411 | { | |
412 | dout(10) << __func__ << " from " << from << " to " << to << dendl; | |
413 | assert(from != to); | |
414 | ||
415 | for (version_t v = from; v < to; ++v) { | |
416 | dout(20) << __func__ << " " << v << dendl; | |
417 | t->erase(get_service_name(), v); | |
418 | ||
419 | string full_key = mon->store->combine_strings("full", v); | |
420 | if (mon->store->exists(get_service_name(), full_key)) { | |
421 | dout(20) << __func__ << " " << full_key << dendl; | |
422 | t->erase(get_service_name(), full_key); | |
423 | } | |
424 | } | |
425 | if (g_conf->mon_compact_on_trim) { | |
426 | dout(20) << " compacting prefix " << get_service_name() << dendl; | |
427 | t->compact_range(get_service_name(), stringify(from - 1), stringify(to)); | |
428 | t->compact_range(get_service_name(), | |
429 | mon->store->combine_strings(full_prefix_name, from - 1), | |
430 | mon->store->combine_strings(full_prefix_name, to)); | |
431 | } | |
432 | } | |
433 | ||
224ce89b WB |
434 | void PaxosService::load_health() |
435 | { | |
436 | bufferlist bl; | |
437 | mon->store->get("health", service_name, bl); | |
438 | if (bl.length()) { | |
439 | auto p = bl.begin(); | |
440 | ::decode(health_checks, p); | |
441 | } | |
442 | } |