]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "PaxosService.h" | |
16 | #include "common/Clock.h" | |
17 | #include "common/config.h" | |
18 | #include "include/stringify.h" | |
11fdf7f2 | 19 | #include "include/ceph_assert.h" |
7c673cae FG |
20 | #include "mon/MonOpRequest.h" |
21 | ||
f67539c2 TL |
22 | using std::ostream; |
23 | using std::string; | |
24 | ||
25 | using ceph::bufferlist; | |
26 | ||
7c673cae FG |
27 | #define dout_subsys ceph_subsys_paxos |
28 | #undef dout_prefix | |
29 | #define dout_prefix _prefix(_dout, mon, paxos, service_name, get_first_committed(), get_last_committed()) | |
f67539c2 | 30 | static ostream& _prefix(std::ostream *_dout, Monitor &mon, Paxos &paxos, string service_name, |
7c673cae | 31 | version_t fc, version_t lc) { |
f67539c2 TL |
32 | return *_dout << "mon." << mon.name << "@" << mon.rank |
33 | << "(" << mon.get_state_name() | |
7c673cae FG |
34 | << ").paxosservice(" << service_name << " " << fc << ".." << lc << ") "; |
35 | } | |
36 | ||
37 | bool PaxosService::dispatch(MonOpRequestRef op) | |
38 | { | |
11fdf7f2 | 39 | ceph_assert(op->is_type_service() || op->is_type_command()); |
9f95a23c | 40 | auto m = op->get_req<PaxosServiceMessage>(); |
7c673cae FG |
41 | op->mark_event("psvc:dispatch"); |
42 | ||
224ce89b | 43 | dout(10) << __func__ << " " << m << " " << *m |
7c673cae FG |
44 | << " from " << m->get_orig_source_inst() |
45 | << " con " << m->get_connection() << dendl; | |
46 | ||
f67539c2 | 47 | if (mon.is_shutdown()) { |
7c673cae FG |
48 | return true; |
49 | } | |
50 | ||
51 | // make sure this message isn't forwarded from a previous election epoch | |
52 | if (m->rx_election_epoch && | |
f67539c2 | 53 | m->rx_election_epoch < mon.get_epoch()) { |
7c673cae | 54 | dout(10) << " discarding forwarded message from previous election epoch " |
f67539c2 | 55 | << m->rx_election_epoch << " < " << mon.get_epoch() << dendl; |
7c673cae FG |
56 | return true; |
57 | } | |
58 | ||
59 | // make sure the client is still connected. note that a proxied | |
60 | // connection will be disconnected with a null message; don't drop | |
61 | // those. also ignore loopback (e.g., log) messages. | |
62 | if (m->get_connection() && | |
63 | !m->get_connection()->is_connected() && | |
f67539c2 | 64 | m->get_connection() != mon.con_self && |
7c673cae FG |
65 | m->get_connection()->get_messenger() != NULL) { |
66 | dout(10) << " discarding message from disconnected client " | |
67 | << m->get_source_inst() << " " << *m << dendl; | |
68 | return true; | |
69 | } | |
70 | ||
71 | // make sure our map is readable and up to date | |
72 | if (!is_readable(m->version)) { | |
73 | dout(10) << " waiting for paxos -> readable (v" << m->version << ")" << dendl; | |
74 | wait_for_readable(op, new C_RetryMessage(this, op), m->version); | |
75 | return true; | |
76 | } | |
77 | ||
78 | // preprocess | |
79 | if (preprocess_query(op)) | |
80 | return true; // easy! | |
81 | ||
82 | // leader? | |
f67539c2 TL |
83 | if (!mon.is_leader()) { |
84 | mon.forward_request_leader(op); | |
7c673cae FG |
85 | return true; |
86 | } | |
87 | ||
88 | // writeable? | |
89 | if (!is_writeable()) { | |
90 | dout(10) << " waiting for paxos -> writeable" << dendl; | |
91 | wait_for_writeable(op, new C_RetryMessage(this, op)); | |
92 | return true; | |
93 | } | |
94 | ||
95 | // update | |
31f18b77 FG |
96 | if (!prepare_update(op)) { |
97 | // no changes made. | |
98 | return true; | |
99 | } | |
100 | ||
101 | if (need_immediate_propose) { | |
102 | dout(10) << __func__ << " forced immediate propose" << dendl; | |
103 | need_immediate_propose = false; | |
104 | propose_pending(); | |
105 | return true; | |
106 | } | |
107 | ||
108 | double delay = 0.0; | |
109 | if (!should_propose(delay)) { | |
110 | dout(10) << " not proposing" << dendl; | |
111 | return true; | |
112 | } | |
113 | ||
114 | if (delay == 0.0) { | |
115 | propose_pending(); | |
116 | return true; | |
117 | } | |
118 | ||
119 | // delay a bit | |
120 | if (!proposal_timer) { | |
121 | /** | |
122 | * Callback class used to propose the pending value once the proposal_timer | |
123 | * fires up. | |
124 | */ | |
f67539c2 | 125 | auto do_propose = new C_MonContext{&mon, [this](int r) { |
31f18b77 FG |
126 | proposal_timer = 0; |
127 | if (r >= 0) { | |
128 | propose_pending(); | |
129 | } else if (r == -ECANCELED || r == -EAGAIN) { | |
130 | return; | |
131 | } else { | |
11fdf7f2 | 132 | ceph_abort_msg("bad return value for proposal_timer"); |
31f18b77 | 133 | } |
9f95a23c | 134 | }}; |
3efd9988 | 135 | dout(10) << " setting proposal_timer " << do_propose |
31f18b77 | 136 | << " with delay of " << delay << dendl; |
f67539c2 | 137 | proposal_timer = mon.timer.add_event_after(delay, do_propose); |
31f18b77 FG |
138 | } else { |
139 | dout(10) << " proposal_timer already set" << dendl; | |
140 | } | |
7c673cae FG |
141 | return true; |
142 | } | |
143 | ||
144 | void PaxosService::refresh(bool *need_bootstrap) | |
145 | { | |
146 | // update cached versions | |
f67539c2 TL |
147 | cached_first_committed = mon.store->get(get_service_name(), first_committed_name); |
148 | cached_last_committed = mon.store->get(get_service_name(), last_committed_name); | |
7c673cae FG |
149 | |
150 | version_t new_format = get_value("format_version"); | |
151 | if (new_format != format_version) { | |
152 | dout(1) << __func__ << " upgraded, format " << format_version << " -> " << new_format << dendl; | |
153 | on_upgrade(); | |
154 | } | |
155 | format_version = new_format; | |
156 | ||
157 | dout(10) << __func__ << dendl; | |
158 | ||
159 | update_from_paxos(need_bootstrap); | |
160 | } | |
161 | ||
162 | void PaxosService::post_refresh() | |
163 | { | |
164 | dout(10) << __func__ << dendl; | |
165 | ||
166 | post_paxos_update(); | |
167 | ||
f67539c2 | 168 | if (mon.is_peon() && !waiting_for_finished_proposal.empty()) { |
7c673cae FG |
169 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); |
170 | } | |
171 | } | |
172 | ||
173 | bool PaxosService::should_propose(double& delay) | |
174 | { | |
175 | // simple default policy: quick startup, then some damping. | |
31f18b77 | 176 | if (get_last_committed() <= 1) { |
7c673cae | 177 | delay = 0.0; |
31f18b77 | 178 | } else { |
7c673cae | 179 | utime_t now = ceph_clock_now(); |
f67539c2 | 180 | if ((now - paxos.last_commit_time) > g_conf()->paxos_propose_interval) |
11fdf7f2 | 181 | delay = (double)g_conf()->paxos_min_wait; |
7c673cae | 182 | else |
f67539c2 | 183 | delay = (double)(g_conf()->paxos_propose_interval + paxos.last_commit_time |
7c673cae FG |
184 | - now); |
185 | } | |
186 | return true; | |
187 | } | |
188 | ||
189 | ||
190 | void PaxosService::propose_pending() | |
191 | { | |
224ce89b | 192 | dout(10) << __func__ << dendl; |
11fdf7f2 TL |
193 | ceph_assert(have_pending); |
194 | ceph_assert(!proposing); | |
f67539c2 | 195 | ceph_assert(mon.is_leader()); |
11fdf7f2 | 196 | ceph_assert(is_active()); |
7c673cae FG |
197 | |
198 | if (proposal_timer) { | |
199 | dout(10) << " canceling proposal_timer " << proposal_timer << dendl; | |
f67539c2 | 200 | mon.timer.cancel_event(proposal_timer); |
7c673cae FG |
201 | proposal_timer = NULL; |
202 | } | |
203 | ||
204 | /** | |
c07f9fc5 | 205 | * @note What we contribute to the pending Paxos transaction is |
7c673cae FG |
206 | * obtained by calling a function that must be implemented by |
207 | * the class implementing us. I.e., the function | |
208 | * encode_pending will be the one responsible to encode | |
209 | * whatever is pending on the implementation class into a | |
210 | * bufferlist, so we can then propose that as a value through | |
211 | * Paxos. | |
212 | */ | |
f67539c2 | 213 | MonitorDBStore::TransactionRef t = paxos.get_pending_transaction(); |
7c673cae FG |
214 | |
215 | if (should_stash_full()) | |
216 | encode_full(t); | |
217 | ||
218 | encode_pending(t); | |
219 | have_pending = false; | |
220 | ||
221 | if (format_version > 0) { | |
222 | t->put(get_service_name(), "format_version", format_version); | |
223 | } | |
224 | ||
225 | // apply to paxos | |
226 | proposing = true; | |
227 | /** | |
228 | * Callback class used to mark us as active once a proposal finishes going | |
229 | * through Paxos. | |
230 | * | |
231 | * We should wake people up *only* *after* we inform the service we | |
232 | * just went active. And we should wake people up only once we finish | |
233 | * going active. This is why we first go active, avoiding to wake up the | |
234 | * wrong people at the wrong time, such as waking up a C_RetryMessage | |
235 | * before waking up a C_Active, thus ending up without a pending value. | |
236 | */ | |
237 | class C_Committed : public Context { | |
238 | PaxosService *ps; | |
239 | public: | |
240 | explicit C_Committed(PaxosService *p) : ps(p) { } | |
241 | void finish(int r) override { | |
242 | ps->proposing = false; | |
243 | if (r >= 0) | |
244 | ps->_active(); | |
245 | else if (r == -ECANCELED || r == -EAGAIN) | |
246 | return; | |
247 | else | |
11fdf7f2 | 248 | ceph_abort_msg("bad return value for C_Committed"); |
7c673cae FG |
249 | } |
250 | }; | |
f67539c2 TL |
251 | paxos.queue_pending_finisher(new C_Committed(this)); |
252 | paxos.trigger_propose(); | |
7c673cae FG |
253 | } |
254 | ||
255 | bool PaxosService::should_stash_full() | |
256 | { | |
257 | version_t latest_full = get_version_latest_full(); | |
258 | /* @note The first member of the condition is moot and it is here just for | |
259 | * clarity's sake. The second member would end up returing true | |
260 | * nonetheless because, in that event, | |
261 | * latest_full == get_trim_to() == 0. | |
262 | */ | |
263 | return (!latest_full || | |
264 | (latest_full <= get_trim_to()) || | |
11fdf7f2 | 265 | (get_last_committed() - latest_full > (version_t)g_conf()->paxos_stash_full_interval)); |
7c673cae FG |
266 | } |
267 | ||
268 | void PaxosService::restart() | |
269 | { | |
224ce89b | 270 | dout(10) << __func__ << dendl; |
7c673cae FG |
271 | if (proposal_timer) { |
272 | dout(10) << " canceling proposal_timer " << proposal_timer << dendl; | |
f67539c2 | 273 | mon.timer.cancel_event(proposal_timer); |
7c673cae FG |
274 | proposal_timer = 0; |
275 | } | |
276 | ||
277 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); | |
278 | ||
279 | if (have_pending) { | |
280 | discard_pending(); | |
281 | have_pending = false; | |
282 | } | |
283 | proposing = false; | |
284 | ||
285 | on_restart(); | |
286 | } | |
287 | ||
288 | void PaxosService::election_finished() | |
289 | { | |
224ce89b | 290 | dout(10) << __func__ << dendl; |
7c673cae FG |
291 | |
292 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); | |
293 | ||
294 | // make sure we update our state | |
295 | _active(); | |
296 | } | |
297 | ||
298 | void PaxosService::_active() | |
299 | { | |
300 | if (is_proposing()) { | |
224ce89b | 301 | dout(10) << __func__ << " - proposing" << dendl; |
7c673cae FG |
302 | return; |
303 | } | |
304 | if (!is_active()) { | |
224ce89b | 305 | dout(10) << __func__ << " - not active" << dendl; |
7c673cae FG |
306 | /** |
307 | * Callback used to make sure we call the PaxosService::_active function | |
308 | * whenever a condition is fulfilled. | |
309 | * | |
310 | * This is used in multiple situations, from waiting for the Paxos to commit | |
311 | * our proposed value, to waiting for the Paxos to become active once an | |
312 | * election is finished. | |
313 | */ | |
314 | class C_Active : public Context { | |
315 | PaxosService *svc; | |
316 | public: | |
317 | explicit C_Active(PaxosService *s) : svc(s) {} | |
318 | void finish(int r) override { | |
319 | if (r >= 0) | |
320 | svc->_active(); | |
321 | } | |
322 | }; | |
323 | wait_for_active_ctx(new C_Active(this)); | |
324 | return; | |
325 | } | |
224ce89b | 326 | dout(10) << __func__ << dendl; |
7c673cae FG |
327 | |
328 | // create pending state? | |
f67539c2 | 329 | if (mon.is_leader()) { |
224ce89b | 330 | dout(7) << __func__ << " creating new pending" << dendl; |
7c673cae FG |
331 | if (!have_pending) { |
332 | create_pending(); | |
333 | have_pending = true; | |
334 | } | |
335 | ||
336 | if (get_last_committed() == 0) { | |
337 | // create initial state | |
338 | create_initial(); | |
339 | propose_pending(); | |
340 | return; | |
341 | } | |
342 | } else { | |
11fdf7f2 | 343 | dout(7) << __func__ << " we are not the leader, hence we propose nothing!" << dendl; |
7c673cae FG |
344 | } |
345 | ||
346 | // wake up anyone who came in while we were proposing. note that | |
347 | // anyone waiting for the previous proposal to commit is no longer | |
348 | // on this list; it is on Paxos's. | |
349 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, 0); | |
350 | ||
f67539c2 | 351 | if (mon.is_leader()) |
7c673cae FG |
352 | upgrade_format(); |
353 | ||
354 | // NOTE: it's possible that this will get called twice if we commit | |
355 | // an old paxos value. Implementations should be mindful of that. | |
356 | on_active(); | |
357 | } | |
358 | ||
359 | ||
360 | void PaxosService::shutdown() | |
361 | { | |
362 | cancel_events(); | |
363 | ||
364 | if (proposal_timer) { | |
365 | dout(10) << " canceling proposal_timer " << proposal_timer << dendl; | |
f67539c2 | 366 | mon.timer.cancel_event(proposal_timer); |
7c673cae FG |
367 | proposal_timer = 0; |
368 | } | |
369 | ||
370 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); | |
371 | ||
372 | on_shutdown(); | |
373 | } | |
374 | ||
375 | void PaxosService::maybe_trim() | |
376 | { | |
377 | if (!is_writeable()) | |
378 | return; | |
379 | ||
380 | version_t trim_to = get_trim_to(); | |
f67539c2 TL |
381 | if (trim_to < get_first_committed()) { |
382 | dout(10) << __func__ << " trim_to " << trim_to << " < first_committed" | |
383 | << get_first_committed() << dendl; | |
7c673cae | 384 | return; |
f67539c2 | 385 | } |
7c673cae FG |
386 | |
387 | version_t to_remove = trim_to - get_first_committed(); | |
f67539c2 TL |
388 | const version_t trim_min = g_conf().get_val<version_t>("paxos_service_trim_min"); |
389 | if (trim_min > 0 && | |
390 | to_remove < trim_min) { | |
7c673cae | 391 | dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove |
f67539c2 | 392 | << " < paxos_service_trim_min " << trim_min << dendl; |
7c673cae FG |
393 | return; |
394 | } | |
395 | ||
f67539c2 TL |
396 | to_remove = [to_remove, this] { |
397 | const version_t trim_max = g_conf().get_val<version_t>("paxos_service_trim_max"); | |
398 | if (trim_max == 0 || to_remove < trim_max) { | |
399 | return to_remove; | |
400 | } | |
401 | if (to_remove < trim_max * 1.5) { | |
402 | dout(10) << __func__ << " trim to " << get_trim_to() << " would only trim " << to_remove | |
403 | << " > paxos_service_trim_max, limiting to " << trim_max | |
404 | << dendl; | |
405 | return trim_max; | |
406 | } | |
407 | const version_t new_trim_max = (trim_max + to_remove) / 2; | |
408 | const uint64_t trim_max_multiplier = g_conf().get_val<uint64_t>("paxos_service_trim_max_multiplier"); | |
409 | if (trim_max_multiplier) { | |
410 | return std::min(new_trim_max, trim_max * trim_max_multiplier); | |
411 | } else { | |
412 | return new_trim_max; | |
413 | } | |
414 | }(); | |
415 | trim_to = get_first_committed() + to_remove; | |
7c673cae FG |
416 | |
417 | dout(10) << __func__ << " trimming to " << trim_to << ", " << to_remove << " states" << dendl; | |
f67539c2 | 418 | MonitorDBStore::TransactionRef t = paxos.get_pending_transaction(); |
7c673cae FG |
419 | trim(t, get_first_committed(), trim_to); |
420 | put_first_committed(t, trim_to); | |
94b18763 | 421 | cached_first_committed = trim_to; |
7c673cae FG |
422 | |
423 | // let the service add any extra stuff | |
424 | encode_trim_extra(t, trim_to); | |
425 | ||
f67539c2 | 426 | paxos.trigger_propose(); |
7c673cae FG |
427 | } |
428 | ||
429 | void PaxosService::trim(MonitorDBStore::TransactionRef t, | |
430 | version_t from, version_t to) | |
431 | { | |
432 | dout(10) << __func__ << " from " << from << " to " << to << dendl; | |
11fdf7f2 | 433 | ceph_assert(from != to); |
7c673cae FG |
434 | |
435 | for (version_t v = from; v < to; ++v) { | |
436 | dout(20) << __func__ << " " << v << dendl; | |
437 | t->erase(get_service_name(), v); | |
438 | ||
f67539c2 TL |
439 | string full_key = mon.store->combine_strings("full", v); |
440 | if (mon.store->exists(get_service_name(), full_key)) { | |
7c673cae FG |
441 | dout(20) << __func__ << " " << full_key << dendl; |
442 | t->erase(get_service_name(), full_key); | |
443 | } | |
444 | } | |
11fdf7f2 | 445 | if (g_conf()->mon_compact_on_trim) { |
7c673cae FG |
446 | dout(20) << " compacting prefix " << get_service_name() << dendl; |
447 | t->compact_range(get_service_name(), stringify(from - 1), stringify(to)); | |
448 | t->compact_range(get_service_name(), | |
f67539c2 TL |
449 | mon.store->combine_strings(full_prefix_name, from - 1), |
450 | mon.store->combine_strings(full_prefix_name, to)); | |
7c673cae FG |
451 | } |
452 | } | |
453 | ||
224ce89b WB |
454 | void PaxosService::load_health() |
455 | { | |
456 | bufferlist bl; | |
f67539c2 | 457 | mon.store->get("health", service_name, bl); |
224ce89b | 458 | if (bl.length()) { |
11fdf7f2 | 459 | auto p = bl.cbegin(); |
f67539c2 | 460 | using ceph::decode; |
11fdf7f2 | 461 | decode(health_checks, p); |
224ce89b WB |
462 | } |
463 | } |