]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "PaxosService.h" | |
16 | #include "common/Clock.h" | |
17 | #include "common/config.h" | |
18 | #include "include/stringify.h" | |
19 | #include "include/assert.h" | |
20 | #include "mon/MonOpRequest.h" | |
21 | ||
22 | #define dout_subsys ceph_subsys_paxos | |
23 | #undef dout_prefix | |
24 | #define dout_prefix _prefix(_dout, mon, paxos, service_name, get_first_committed(), get_last_committed()) | |
25 | static ostream& _prefix(std::ostream *_dout, Monitor *mon, Paxos *paxos, string service_name, | |
26 | version_t fc, version_t lc) { | |
27 | return *_dout << "mon." << mon->name << "@" << mon->rank | |
28 | << "(" << mon->get_state_name() | |
29 | << ").paxosservice(" << service_name << " " << fc << ".." << lc << ") "; | |
30 | } | |
31 | ||
32 | bool PaxosService::dispatch(MonOpRequestRef op) | |
33 | { | |
34 | assert(op->is_type_service() || op->is_type_command()); | |
35 | PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req()); | |
36 | op->mark_event("psvc:dispatch"); | |
37 | ||
38 | dout(10) << "dispatch " << m << " " << *m | |
39 | << " from " << m->get_orig_source_inst() | |
40 | << " con " << m->get_connection() << dendl; | |
41 | ||
42 | if (mon->is_shutdown()) { | |
43 | return true; | |
44 | } | |
45 | ||
46 | // make sure this message isn't forwarded from a previous election epoch | |
47 | if (m->rx_election_epoch && | |
48 | m->rx_election_epoch < mon->get_epoch()) { | |
49 | dout(10) << " discarding forwarded message from previous election epoch " | |
50 | << m->rx_election_epoch << " < " << mon->get_epoch() << dendl; | |
51 | return true; | |
52 | } | |
53 | ||
54 | // make sure the client is still connected. note that a proxied | |
55 | // connection will be disconnected with a null message; don't drop | |
56 | // those. also ignore loopback (e.g., log) messages. | |
57 | if (m->get_connection() && | |
58 | !m->get_connection()->is_connected() && | |
59 | m->get_connection() != mon->con_self && | |
60 | m->get_connection()->get_messenger() != NULL) { | |
61 | dout(10) << " discarding message from disconnected client " | |
62 | << m->get_source_inst() << " " << *m << dendl; | |
63 | return true; | |
64 | } | |
65 | ||
66 | // make sure our map is readable and up to date | |
67 | if (!is_readable(m->version)) { | |
68 | dout(10) << " waiting for paxos -> readable (v" << m->version << ")" << dendl; | |
69 | wait_for_readable(op, new C_RetryMessage(this, op), m->version); | |
70 | return true; | |
71 | } | |
72 | ||
73 | // preprocess | |
74 | if (preprocess_query(op)) | |
75 | return true; // easy! | |
76 | ||
77 | // leader? | |
78 | if (!mon->is_leader()) { | |
79 | mon->forward_request_leader(op); | |
80 | return true; | |
81 | } | |
82 | ||
83 | // writeable? | |
84 | if (!is_writeable()) { | |
85 | dout(10) << " waiting for paxos -> writeable" << dendl; | |
86 | wait_for_writeable(op, new C_RetryMessage(this, op)); | |
87 | return true; | |
88 | } | |
89 | ||
90 | // update | |
91 | if (prepare_update(op)) { | |
92 | double delay = 0.0; | |
93 | if (should_propose(delay)) { | |
94 | if (delay == 0.0) { | |
95 | propose_pending(); | |
96 | } else { | |
97 | // delay a bit | |
98 | if (!proposal_timer) { | |
99 | /** | |
100 | * Callback class used to propose the pending value once the proposal_timer | |
101 | * fires up. | |
102 | */ | |
103 | proposal_timer = new C_MonContext(mon, [this](int r) { | |
104 | proposal_timer = 0; | |
105 | if (r >= 0) | |
106 | propose_pending(); | |
107 | else if (r == -ECANCELED || r == -EAGAIN) | |
108 | return; | |
109 | else | |
110 | assert(0 == "bad return value for proposal_timer"); | |
111 | }); | |
112 | dout(10) << " setting proposal_timer " << proposal_timer << " with delay of " << delay << dendl; | |
113 | mon->timer.add_event_after(delay, proposal_timer); | |
114 | } else { | |
115 | dout(10) << " proposal_timer already set" << dendl; | |
116 | } | |
117 | } | |
118 | } else { | |
119 | dout(10) << " not proposing" << dendl; | |
120 | } | |
121 | } | |
122 | return true; | |
123 | } | |
124 | ||
125 | void PaxosService::refresh(bool *need_bootstrap) | |
126 | { | |
127 | // update cached versions | |
128 | cached_first_committed = mon->store->get(get_service_name(), first_committed_name); | |
129 | cached_last_committed = mon->store->get(get_service_name(), last_committed_name); | |
130 | ||
131 | version_t new_format = get_value("format_version"); | |
132 | if (new_format != format_version) { | |
133 | dout(1) << __func__ << " upgraded, format " << format_version << " -> " << new_format << dendl; | |
134 | on_upgrade(); | |
135 | } | |
136 | format_version = new_format; | |
137 | ||
138 | dout(10) << __func__ << dendl; | |
139 | ||
140 | update_from_paxos(need_bootstrap); | |
141 | } | |
142 | ||
143 | void PaxosService::post_refresh() | |
144 | { | |
145 | dout(10) << __func__ << dendl; | |
146 | ||
147 | post_paxos_update(); | |
148 | ||
149 | if (mon->is_peon() && !waiting_for_finished_proposal.empty()) { | |
150 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); | |
151 | } | |
152 | } | |
153 | ||
154 | bool PaxosService::should_propose(double& delay) | |
155 | { | |
156 | // simple default policy: quick startup, then some damping. | |
157 | if (get_last_committed() <= 1) | |
158 | delay = 0.0; | |
159 | else { | |
160 | utime_t now = ceph_clock_now(); | |
161 | if ((now - paxos->last_commit_time) > g_conf->paxos_propose_interval) | |
162 | delay = (double)g_conf->paxos_min_wait; | |
163 | else | |
164 | delay = (double)(g_conf->paxos_propose_interval + paxos->last_commit_time | |
165 | - now); | |
166 | } | |
167 | return true; | |
168 | } | |
169 | ||
170 | ||
171 | void PaxosService::propose_pending() | |
172 | { | |
173 | dout(10) << "propose_pending" << dendl; | |
174 | assert(have_pending); | |
175 | assert(!proposing); | |
176 | assert(mon->is_leader()); | |
177 | assert(is_active()); | |
178 | ||
179 | if (proposal_timer) { | |
180 | dout(10) << " canceling proposal_timer " << proposal_timer << dendl; | |
181 | mon->timer.cancel_event(proposal_timer); | |
182 | proposal_timer = NULL; | |
183 | } | |
184 | ||
185 | /** | |
186 | * @note What we contirbute to the pending Paxos transaction is | |
187 | * obtained by calling a function that must be implemented by | |
188 | * the class implementing us. I.e., the function | |
189 | * encode_pending will be the one responsible to encode | |
190 | * whatever is pending on the implementation class into a | |
191 | * bufferlist, so we can then propose that as a value through | |
192 | * Paxos. | |
193 | */ | |
194 | MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); | |
195 | ||
196 | if (should_stash_full()) | |
197 | encode_full(t); | |
198 | ||
199 | encode_pending(t); | |
200 | have_pending = false; | |
201 | ||
202 | if (format_version > 0) { | |
203 | t->put(get_service_name(), "format_version", format_version); | |
204 | } | |
205 | ||
206 | // apply to paxos | |
207 | proposing = true; | |
208 | /** | |
209 | * Callback class used to mark us as active once a proposal finishes going | |
210 | * through Paxos. | |
211 | * | |
212 | * We should wake people up *only* *after* we inform the service we | |
213 | * just went active. And we should wake people up only once we finish | |
214 | * going active. This is why we first go active, avoiding to wake up the | |
215 | * wrong people at the wrong time, such as waking up a C_RetryMessage | |
216 | * before waking up a C_Active, thus ending up without a pending value. | |
217 | */ | |
218 | class C_Committed : public Context { | |
219 | PaxosService *ps; | |
220 | public: | |
221 | explicit C_Committed(PaxosService *p) : ps(p) { } | |
222 | void finish(int r) override { | |
223 | ps->proposing = false; | |
224 | if (r >= 0) | |
225 | ps->_active(); | |
226 | else if (r == -ECANCELED || r == -EAGAIN) | |
227 | return; | |
228 | else | |
229 | assert(0 == "bad return value for C_Committed"); | |
230 | } | |
231 | }; | |
232 | paxos->queue_pending_finisher(new C_Committed(this)); | |
233 | paxos->trigger_propose(); | |
234 | } | |
235 | ||
236 | bool PaxosService::should_stash_full() | |
237 | { | |
238 | version_t latest_full = get_version_latest_full(); | |
239 | /* @note The first member of the condition is moot and it is here just for | |
240 | * clarity's sake. The second member would end up returing true | |
241 | * nonetheless because, in that event, | |
242 | * latest_full == get_trim_to() == 0. | |
243 | */ | |
244 | return (!latest_full || | |
245 | (latest_full <= get_trim_to()) || | |
246 | (get_last_committed() - latest_full > (version_t)g_conf->paxos_stash_full_interval)); | |
247 | } | |
248 | ||
249 | void PaxosService::restart() | |
250 | { | |
251 | dout(10) << "restart" << dendl; | |
252 | if (proposal_timer) { | |
253 | dout(10) << " canceling proposal_timer " << proposal_timer << dendl; | |
254 | mon->timer.cancel_event(proposal_timer); | |
255 | proposal_timer = 0; | |
256 | } | |
257 | ||
258 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); | |
259 | ||
260 | if (have_pending) { | |
261 | discard_pending(); | |
262 | have_pending = false; | |
263 | } | |
264 | proposing = false; | |
265 | ||
266 | on_restart(); | |
267 | } | |
268 | ||
269 | void PaxosService::election_finished() | |
270 | { | |
271 | dout(10) << "election_finished" << dendl; | |
272 | ||
273 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); | |
274 | ||
275 | // make sure we update our state | |
276 | _active(); | |
277 | } | |
278 | ||
279 | void PaxosService::_active() | |
280 | { | |
281 | if (is_proposing()) { | |
282 | dout(10) << "_acting - proposing" << dendl; | |
283 | return; | |
284 | } | |
285 | if (!is_active()) { | |
286 | dout(10) << "_active - not active" << dendl; | |
287 | /** | |
288 | * Callback used to make sure we call the PaxosService::_active function | |
289 | * whenever a condition is fulfilled. | |
290 | * | |
291 | * This is used in multiple situations, from waiting for the Paxos to commit | |
292 | * our proposed value, to waiting for the Paxos to become active once an | |
293 | * election is finished. | |
294 | */ | |
295 | class C_Active : public Context { | |
296 | PaxosService *svc; | |
297 | public: | |
298 | explicit C_Active(PaxosService *s) : svc(s) {} | |
299 | void finish(int r) override { | |
300 | if (r >= 0) | |
301 | svc->_active(); | |
302 | } | |
303 | }; | |
304 | wait_for_active_ctx(new C_Active(this)); | |
305 | return; | |
306 | } | |
307 | dout(10) << "_active" << dendl; | |
308 | ||
309 | // create pending state? | |
310 | if (mon->is_leader()) { | |
311 | dout(7) << "_active creating new pending" << dendl; | |
312 | if (!have_pending) { | |
313 | create_pending(); | |
314 | have_pending = true; | |
315 | } | |
316 | ||
317 | if (get_last_committed() == 0) { | |
318 | // create initial state | |
319 | create_initial(); | |
320 | propose_pending(); | |
321 | return; | |
322 | } | |
323 | } else { | |
324 | if (!mon->is_leader()) { | |
325 | dout(7) << __func__ << " we are not the leader, hence we propose nothing!" << dendl; | |
326 | } | |
327 | } | |
328 | ||
329 | // wake up anyone who came in while we were proposing. note that | |
330 | // anyone waiting for the previous proposal to commit is no longer | |
331 | // on this list; it is on Paxos's. | |
332 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, 0); | |
333 | ||
334 | if (mon->is_leader()) | |
335 | upgrade_format(); | |
336 | ||
337 | // NOTE: it's possible that this will get called twice if we commit | |
338 | // an old paxos value. Implementations should be mindful of that. | |
339 | on_active(); | |
340 | } | |
341 | ||
342 | ||
343 | void PaxosService::shutdown() | |
344 | { | |
345 | cancel_events(); | |
346 | ||
347 | if (proposal_timer) { | |
348 | dout(10) << " canceling proposal_timer " << proposal_timer << dendl; | |
349 | mon->timer.cancel_event(proposal_timer); | |
350 | proposal_timer = 0; | |
351 | } | |
352 | ||
353 | finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); | |
354 | ||
355 | on_shutdown(); | |
356 | } | |
357 | ||
358 | void PaxosService::maybe_trim() | |
359 | { | |
360 | if (!is_writeable()) | |
361 | return; | |
362 | ||
363 | version_t trim_to = get_trim_to(); | |
364 | if (trim_to < get_first_committed()) | |
365 | return; | |
366 | ||
367 | version_t to_remove = trim_to - get_first_committed(); | |
368 | if (g_conf->paxos_service_trim_min > 0 && | |
369 | to_remove < (version_t)g_conf->paxos_service_trim_min) { | |
370 | dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove | |
371 | << " < paxos_service_trim_min " << g_conf->paxos_service_trim_min << dendl; | |
372 | return; | |
373 | } | |
374 | ||
375 | if (g_conf->paxos_service_trim_max > 0 && | |
376 | to_remove > (version_t)g_conf->paxos_service_trim_max) { | |
377 | dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove | |
378 | << " > paxos_service_trim_max, limiting to " << g_conf->paxos_service_trim_max | |
379 | << dendl; | |
380 | trim_to = get_first_committed() + g_conf->paxos_service_trim_max; | |
381 | to_remove = g_conf->paxos_service_trim_max; | |
382 | } | |
383 | ||
384 | dout(10) << __func__ << " trimming to " << trim_to << ", " << to_remove << " states" << dendl; | |
385 | MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); | |
386 | trim(t, get_first_committed(), trim_to); | |
387 | put_first_committed(t, trim_to); | |
388 | ||
389 | // let the service add any extra stuff | |
390 | encode_trim_extra(t, trim_to); | |
391 | ||
392 | paxos->trigger_propose(); | |
393 | } | |
394 | ||
395 | void PaxosService::trim(MonitorDBStore::TransactionRef t, | |
396 | version_t from, version_t to) | |
397 | { | |
398 | dout(10) << __func__ << " from " << from << " to " << to << dendl; | |
399 | assert(from != to); | |
400 | ||
401 | for (version_t v = from; v < to; ++v) { | |
402 | dout(20) << __func__ << " " << v << dendl; | |
403 | t->erase(get_service_name(), v); | |
404 | ||
405 | string full_key = mon->store->combine_strings("full", v); | |
406 | if (mon->store->exists(get_service_name(), full_key)) { | |
407 | dout(20) << __func__ << " " << full_key << dendl; | |
408 | t->erase(get_service_name(), full_key); | |
409 | } | |
410 | } | |
411 | if (g_conf->mon_compact_on_trim) { | |
412 | dout(20) << " compacting prefix " << get_service_name() << dendl; | |
413 | t->compact_range(get_service_name(), stringify(from - 1), stringify(to)); | |
414 | t->compact_range(get_service_name(), | |
415 | mon->store->combine_strings(full_prefix_name, from - 1), | |
416 | mon->store->combine_strings(full_prefix_name, to)); | |
417 | } | |
418 | } | |
419 |