]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "PaxosService.h" | |
16 | #include "common/Clock.h" | |
17 | #include "common/config.h" | |
18 | #include "include/stringify.h" | |
11fdf7f2 | 19 | #include "include/ceph_assert.h" |
7c673cae FG |
20 | #include "mon/MonOpRequest.h" |
21 | ||
f67539c2 TL |
22 | using std::ostream; |
23 | using std::string; | |
24 | ||
25 | using ceph::bufferlist; | |
26 | ||
7c673cae FG |
27 | #define dout_subsys ceph_subsys_paxos |
28 | #undef dout_prefix | |
29 | #define dout_prefix _prefix(_dout, mon, paxos, service_name, get_first_committed(), get_last_committed()) | |
f67539c2 | 30 | static ostream& _prefix(std::ostream *_dout, Monitor &mon, Paxos &paxos, string service_name, |
7c673cae | 31 | version_t fc, version_t lc) { |
f67539c2 TL |
32 | return *_dout << "mon." << mon.name << "@" << mon.rank |
33 | << "(" << mon.get_state_name() | |
7c673cae FG |
34 | << ").paxosservice(" << service_name << " " << fc << ".." << lc << ") "; |
35 | } | |
36 | ||
37 | bool PaxosService::dispatch(MonOpRequestRef op) | |
38 | { | |
11fdf7f2 | 39 | ceph_assert(op->is_type_service() || op->is_type_command()); |
9f95a23c | 40 | auto m = op->get_req<PaxosServiceMessage>(); |
7c673cae FG |
41 | op->mark_event("psvc:dispatch"); |
42 | ||
224ce89b | 43 | dout(10) << __func__ << " " << m << " " << *m |
7c673cae FG |
44 | << " from " << m->get_orig_source_inst() |
45 | << " con " << m->get_connection() << dendl; | |
46 | ||
f67539c2 | 47 | if (mon.is_shutdown()) { |
7c673cae FG |
48 | return true; |
49 | } | |
50 | ||
51 | // make sure this message isn't forwarded from a previous election epoch | |
52 | if (m->rx_election_epoch && | |
f67539c2 | 53 | m->rx_election_epoch < mon.get_epoch()) { |
7c673cae | 54 | dout(10) << " discarding forwarded message from previous election epoch " |
f67539c2 | 55 | << m->rx_election_epoch << " < " << mon.get_epoch() << dendl; |
7c673cae FG |
56 | return true; |
57 | } | |
58 | ||
59 | // make sure the client is still connected. note that a proxied | |
60 | // connection will be disconnected with a null message; don't drop | |
61 | // those. also ignore loopback (e.g., log) messages. | |
62 | if (m->get_connection() && | |
63 | !m->get_connection()->is_connected() && | |
f67539c2 | 64 | m->get_connection() != mon.con_self && |
7c673cae FG |
65 | m->get_connection()->get_messenger() != NULL) { |
66 | dout(10) << " discarding message from disconnected client " | |
67 | << m->get_source_inst() << " " << *m << dendl; | |
68 | return true; | |
69 | } | |
70 | ||
71 | // make sure our map is readable and up to date | |
72 | if (!is_readable(m->version)) { | |
73 | dout(10) << " waiting for paxos -> readable (v" << m->version << ")" << dendl; | |
74 | wait_for_readable(op, new C_RetryMessage(this, op), m->version); | |
75 | return true; | |
76 | } | |
77 | ||
78 | // preprocess | |
79 | if (preprocess_query(op)) | |
80 | return true; // easy! | |
81 | ||
82 | // leader? | |
f67539c2 TL |
83 | if (!mon.is_leader()) { |
84 | mon.forward_request_leader(op); | |
7c673cae FG |
85 | return true; |
86 | } | |
87 | ||
88 | // writeable? | |
89 | if (!is_writeable()) { | |
90 | dout(10) << " waiting for paxos -> writeable" << dendl; | |
91 | wait_for_writeable(op, new C_RetryMessage(this, op)); | |
92 | return true; | |
93 | } | |
94 | ||
95 | // update | |
31f18b77 FG |
96 | if (!prepare_update(op)) { |
97 | // no changes made. | |
98 | return true; | |
99 | } | |
100 | ||
101 | if (need_immediate_propose) { | |
102 | dout(10) << __func__ << " forced immediate propose" << dendl; | |
31f18b77 FG |
103 | propose_pending(); |
104 | return true; | |
105 | } | |
106 | ||
107 | double delay = 0.0; | |
108 | if (!should_propose(delay)) { | |
109 | dout(10) << " not proposing" << dendl; | |
110 | return true; | |
111 | } | |
112 | ||
113 | if (delay == 0.0) { | |
114 | propose_pending(); | |
115 | return true; | |
116 | } | |
117 | ||
118 | // delay a bit | |
119 | if (!proposal_timer) { | |
120 | /** | |
121 | * Callback class used to propose the pending value once the proposal_timer | |
122 | * fires up. | |
123 | */ | |
f67539c2 | 124 | auto do_propose = new C_MonContext{&mon, [this](int r) { |
31f18b77 FG |
125 | proposal_timer = 0; |
126 | if (r >= 0) { | |
127 | propose_pending(); | |
128 | } else if (r == -ECANCELED || r == -EAGAIN) { | |
129 | return; | |
130 | } else { | |
11fdf7f2 | 131 | ceph_abort_msg("bad return value for proposal_timer"); |
31f18b77 | 132 | } |
9f95a23c | 133 | }}; |
3efd9988 | 134 | dout(10) << " setting proposal_timer " << do_propose |
31f18b77 | 135 | << " with delay of " << delay << dendl; |
f67539c2 | 136 | proposal_timer = mon.timer.add_event_after(delay, do_propose); |
31f18b77 FG |
137 | } else { |
138 | dout(10) << " proposal_timer already set" << dendl; | |
139 | } | |
7c673cae FG |
140 | return true; |
141 | } | |
142 | ||
143 | void PaxosService::refresh(bool *need_bootstrap) | |
144 | { | |
145 | // update cached versions | |
f67539c2 TL |
146 | cached_first_committed = mon.store->get(get_service_name(), first_committed_name); |
147 | cached_last_committed = mon.store->get(get_service_name(), last_committed_name); | |
7c673cae FG |
148 | |
149 | version_t new_format = get_value("format_version"); | |
150 | if (new_format != format_version) { | |
151 | dout(1) << __func__ << " upgraded, format " << format_version << " -> " << new_format << dendl; | |
152 | on_upgrade(); | |
153 | } | |
154 | format_version = new_format; | |
155 | ||
156 | dout(10) << __func__ << dendl; | |
157 | ||
158 | update_from_paxos(need_bootstrap); | |
159 | } | |
160 | ||
161 | void PaxosService::post_refresh() | |
162 | { | |
163 | dout(10) << __func__ << dendl; | |
164 | ||
165 | post_paxos_update(); | |
166 | ||
f67539c2 | 167 | if (mon.is_peon() && !waiting_for_finished_proposal.empty()) { |
7c673cae FG |
168 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); |
169 | } | |
170 | } | |
171 | ||
172 | bool PaxosService::should_propose(double& delay) | |
173 | { | |
174 | // simple default policy: quick startup, then some damping. | |
31f18b77 | 175 | if (get_last_committed() <= 1) { |
7c673cae | 176 | delay = 0.0; |
31f18b77 | 177 | } else { |
7c673cae | 178 | utime_t now = ceph_clock_now(); |
f67539c2 | 179 | if ((now - paxos.last_commit_time) > g_conf()->paxos_propose_interval) |
11fdf7f2 | 180 | delay = (double)g_conf()->paxos_min_wait; |
7c673cae | 181 | else |
f67539c2 | 182 | delay = (double)(g_conf()->paxos_propose_interval + paxos.last_commit_time |
7c673cae FG |
183 | - now); |
184 | } | |
185 | return true; | |
186 | } | |
187 | ||
188 | ||
189 | void PaxosService::propose_pending() | |
190 | { | |
224ce89b | 191 | dout(10) << __func__ << dendl; |
11fdf7f2 TL |
192 | ceph_assert(have_pending); |
193 | ceph_assert(!proposing); | |
f67539c2 | 194 | ceph_assert(mon.is_leader()); |
11fdf7f2 | 195 | ceph_assert(is_active()); |
7c673cae FG |
196 | |
197 | if (proposal_timer) { | |
198 | dout(10) << " canceling proposal_timer " << proposal_timer << dendl; | |
f67539c2 | 199 | mon.timer.cancel_event(proposal_timer); |
7c673cae FG |
200 | proposal_timer = NULL; |
201 | } | |
202 | ||
203 | /** | |
c07f9fc5 | 204 | * @note What we contribute to the pending Paxos transaction is |
7c673cae FG |
205 | * obtained by calling a function that must be implemented by |
206 | * the class implementing us. I.e., the function | |
207 | * encode_pending will be the one responsible to encode | |
208 | * whatever is pending on the implementation class into a | |
209 | * bufferlist, so we can then propose that as a value through | |
210 | * Paxos. | |
211 | */ | |
f67539c2 | 212 | MonitorDBStore::TransactionRef t = paxos.get_pending_transaction(); |
7c673cae FG |
213 | |
214 | if (should_stash_full()) | |
215 | encode_full(t); | |
216 | ||
217 | encode_pending(t); | |
218 | have_pending = false; | |
219 | ||
220 | if (format_version > 0) { | |
221 | t->put(get_service_name(), "format_version", format_version); | |
222 | } | |
223 | ||
224 | // apply to paxos | |
225 | proposing = true; | |
1e59de90 | 226 | need_immediate_propose = false; /* reset whenever we propose */ |
7c673cae FG |
227 | /** |
228 | * Callback class used to mark us as active once a proposal finishes going | |
229 | * through Paxos. | |
230 | * | |
231 | * We should wake people up *only* *after* we inform the service we | |
232 | * just went active. And we should wake people up only once we finish | |
233 | * going active. This is why we first go active, avoiding to wake up the | |
234 | * wrong people at the wrong time, such as waking up a C_RetryMessage | |
235 | * before waking up a C_Active, thus ending up without a pending value. | |
236 | */ | |
237 | class C_Committed : public Context { | |
238 | PaxosService *ps; | |
239 | public: | |
240 | explicit C_Committed(PaxosService *p) : ps(p) { } | |
241 | void finish(int r) override { | |
242 | ps->proposing = false; | |
243 | if (r >= 0) | |
244 | ps->_active(); | |
245 | else if (r == -ECANCELED || r == -EAGAIN) | |
246 | return; | |
247 | else | |
11fdf7f2 | 248 | ceph_abort_msg("bad return value for C_Committed"); |
7c673cae FG |
249 | } |
250 | }; | |
f67539c2 TL |
251 | paxos.queue_pending_finisher(new C_Committed(this)); |
252 | paxos.trigger_propose(); | |
7c673cae FG |
253 | } |
254 | ||
255 | bool PaxosService::should_stash_full() | |
256 | { | |
257 | version_t latest_full = get_version_latest_full(); | |
258 | /* @note The first member of the condition is moot and it is here just for | |
259 | * clarity's sake. The second member would end up returing true | |
260 | * nonetheless because, in that event, | |
261 | * latest_full == get_trim_to() == 0. | |
262 | */ | |
263 | return (!latest_full || | |
264 | (latest_full <= get_trim_to()) || | |
11fdf7f2 | 265 | (get_last_committed() - latest_full > (version_t)g_conf()->paxos_stash_full_interval)); |
7c673cae FG |
266 | } |
267 | ||
268 | void PaxosService::restart() | |
269 | { | |
224ce89b | 270 | dout(10) << __func__ << dendl; |
7c673cae FG |
271 | if (proposal_timer) { |
272 | dout(10) << " canceling proposal_timer " << proposal_timer << dendl; | |
f67539c2 | 273 | mon.timer.cancel_event(proposal_timer); |
7c673cae FG |
274 | proposal_timer = 0; |
275 | } | |
276 | ||
277 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); | |
278 | ||
279 | if (have_pending) { | |
280 | discard_pending(); | |
281 | have_pending = false; | |
282 | } | |
283 | proposing = false; | |
284 | ||
285 | on_restart(); | |
286 | } | |
287 | ||
288 | void PaxosService::election_finished() | |
289 | { | |
224ce89b | 290 | dout(10) << __func__ << dendl; |
7c673cae FG |
291 | |
292 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); | |
293 | ||
294 | // make sure we update our state | |
295 | _active(); | |
296 | } | |
297 | ||
298 | void PaxosService::_active() | |
299 | { | |
300 | if (is_proposing()) { | |
224ce89b | 301 | dout(10) << __func__ << " - proposing" << dendl; |
7c673cae FG |
302 | return; |
303 | } | |
304 | if (!is_active()) { | |
224ce89b | 305 | dout(10) << __func__ << " - not active" << dendl; |
7c673cae FG |
306 | /** |
307 | * Callback used to make sure we call the PaxosService::_active function | |
308 | * whenever a condition is fulfilled. | |
309 | * | |
310 | * This is used in multiple situations, from waiting for the Paxos to commit | |
311 | * our proposed value, to waiting for the Paxos to become active once an | |
312 | * election is finished. | |
313 | */ | |
314 | class C_Active : public Context { | |
315 | PaxosService *svc; | |
316 | public: | |
317 | explicit C_Active(PaxosService *s) : svc(s) {} | |
318 | void finish(int r) override { | |
319 | if (r >= 0) | |
320 | svc->_active(); | |
321 | } | |
322 | }; | |
323 | wait_for_active_ctx(new C_Active(this)); | |
324 | return; | |
325 | } | |
224ce89b | 326 | dout(10) << __func__ << dendl; |
7c673cae FG |
327 | |
328 | // create pending state? | |
f67539c2 | 329 | if (mon.is_leader()) { |
224ce89b | 330 | dout(7) << __func__ << " creating new pending" << dendl; |
7c673cae FG |
331 | if (!have_pending) { |
332 | create_pending(); | |
333 | have_pending = true; | |
334 | } | |
335 | ||
336 | if (get_last_committed() == 0) { | |
337 | // create initial state | |
338 | create_initial(); | |
339 | propose_pending(); | |
340 | return; | |
341 | } | |
342 | } else { | |
11fdf7f2 | 343 | dout(7) << __func__ << " we are not the leader, hence we propose nothing!" << dendl; |
7c673cae FG |
344 | } |
345 | ||
346 | // wake up anyone who came in while we were proposing. note that | |
347 | // anyone waiting for the previous proposal to commit is no longer | |
348 | // on this list; it is on Paxos's. | |
349 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, 0); | |
350 | ||
f67539c2 | 351 | if (mon.is_leader()) |
7c673cae FG |
352 | upgrade_format(); |
353 | ||
354 | // NOTE: it's possible that this will get called twice if we commit | |
355 | // an old paxos value. Implementations should be mindful of that. | |
356 | on_active(); | |
357 | } | |
358 | ||
359 | ||
360 | void PaxosService::shutdown() | |
361 | { | |
362 | cancel_events(); | |
363 | ||
364 | if (proposal_timer) { | |
365 | dout(10) << " canceling proposal_timer " << proposal_timer << dendl; | |
f67539c2 | 366 | mon.timer.cancel_event(proposal_timer); |
7c673cae FG |
367 | proposal_timer = 0; |
368 | } | |
369 | ||
370 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); | |
371 | ||
372 | on_shutdown(); | |
373 | } | |
374 | ||
375 | void PaxosService::maybe_trim() | |
376 | { | |
377 | if (!is_writeable()) | |
378 | return; | |
379 | ||
522d829b | 380 | const version_t first_committed = get_first_committed(); |
7c673cae | 381 | version_t trim_to = get_trim_to(); |
522d829b TL |
382 | dout(20) << __func__ << " " << first_committed << "~" << trim_to << dendl; |
383 | ||
384 | if (trim_to < first_committed) { | |
385 | dout(10) << __func__ << " trim_to " << trim_to << " < first_committed " | |
386 | << first_committed << dendl; | |
7c673cae | 387 | return; |
f67539c2 | 388 | } |
7c673cae | 389 | |
522d829b | 390 | version_t to_remove = trim_to - first_committed; |
f67539c2 TL |
391 | const version_t trim_min = g_conf().get_val<version_t>("paxos_service_trim_min"); |
392 | if (trim_min > 0 && | |
393 | to_remove < trim_min) { | |
7c673cae | 394 | dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove |
f67539c2 | 395 | << " < paxos_service_trim_min " << trim_min << dendl; |
7c673cae FG |
396 | return; |
397 | } | |
398 | ||
522d829b | 399 | to_remove = [to_remove, trim_to, this] { |
f67539c2 TL |
400 | const version_t trim_max = g_conf().get_val<version_t>("paxos_service_trim_max"); |
401 | if (trim_max == 0 || to_remove < trim_max) { | |
402 | return to_remove; | |
403 | } | |
404 | if (to_remove < trim_max * 1.5) { | |
522d829b | 405 | dout(10) << __func__ << " trim to " << trim_to << " would only trim " << to_remove |
f67539c2 TL |
406 | << " > paxos_service_trim_max, limiting to " << trim_max |
407 | << dendl; | |
408 | return trim_max; | |
409 | } | |
410 | const version_t new_trim_max = (trim_max + to_remove) / 2; | |
411 | const uint64_t trim_max_multiplier = g_conf().get_val<uint64_t>("paxos_service_trim_max_multiplier"); | |
412 | if (trim_max_multiplier) { | |
413 | return std::min(new_trim_max, trim_max * trim_max_multiplier); | |
414 | } else { | |
415 | return new_trim_max; | |
416 | } | |
417 | }(); | |
522d829b | 418 | trim_to = first_committed + to_remove; |
7c673cae FG |
419 | |
420 | dout(10) << __func__ << " trimming to " << trim_to << ", " << to_remove << " states" << dendl; | |
f67539c2 | 421 | MonitorDBStore::TransactionRef t = paxos.get_pending_transaction(); |
522d829b | 422 | trim(t, first_committed, trim_to); |
7c673cae | 423 | put_first_committed(t, trim_to); |
94b18763 | 424 | cached_first_committed = trim_to; |
7c673cae FG |
425 | |
426 | // let the service add any extra stuff | |
427 | encode_trim_extra(t, trim_to); | |
428 | ||
f67539c2 | 429 | paxos.trigger_propose(); |
7c673cae FG |
430 | } |
431 | ||
432 | void PaxosService::trim(MonitorDBStore::TransactionRef t, | |
433 | version_t from, version_t to) | |
434 | { | |
435 | dout(10) << __func__ << " from " << from << " to " << to << dendl; | |
11fdf7f2 | 436 | ceph_assert(from != to); |
7c673cae FG |
437 | |
438 | for (version_t v = from; v < to; ++v) { | |
439 | dout(20) << __func__ << " " << v << dendl; | |
440 | t->erase(get_service_name(), v); | |
441 | ||
f67539c2 TL |
442 | string full_key = mon.store->combine_strings("full", v); |
443 | if (mon.store->exists(get_service_name(), full_key)) { | |
7c673cae FG |
444 | dout(20) << __func__ << " " << full_key << dendl; |
445 | t->erase(get_service_name(), full_key); | |
446 | } | |
447 | } | |
11fdf7f2 | 448 | if (g_conf()->mon_compact_on_trim) { |
7c673cae FG |
449 | dout(20) << " compacting prefix " << get_service_name() << dendl; |
450 | t->compact_range(get_service_name(), stringify(from - 1), stringify(to)); | |
451 | t->compact_range(get_service_name(), | |
f67539c2 TL |
452 | mon.store->combine_strings(full_prefix_name, from - 1), |
453 | mon.store->combine_strings(full_prefix_name, to)); | |
7c673cae FG |
454 | } |
455 | } | |
456 | ||
224ce89b WB |
457 | void PaxosService::load_health() |
458 | { | |
459 | bufferlist bl; | |
f67539c2 | 460 | mon.store->get("health", service_name, bl); |
224ce89b | 461 | if (bl.length()) { |
11fdf7f2 | 462 | auto p = bl.cbegin(); |
f67539c2 | 463 | using ceph::decode; |
11fdf7f2 | 464 | decode(health_checks, p); |
224ce89b WB |
465 | } |
466 | } |