]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/PaxosService.cc
import ceph 16.2.6
[ceph.git] / ceph / src / mon / PaxosService.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "PaxosService.h"
16 #include "common/Clock.h"
17 #include "common/config.h"
18 #include "include/stringify.h"
19 #include "include/ceph_assert.h"
20 #include "mon/MonOpRequest.h"
21
22 using std::ostream;
23 using std::string;
24
25 using ceph::bufferlist;
26
27 #define dout_subsys ceph_subsys_paxos
28 #undef dout_prefix
29 #define dout_prefix _prefix(_dout, mon, paxos, service_name, get_first_committed(), get_last_committed())
30 static ostream& _prefix(std::ostream *_dout, Monitor &mon, Paxos &paxos, string service_name,
31 version_t fc, version_t lc) {
32 return *_dout << "mon." << mon.name << "@" << mon.rank
33 << "(" << mon.get_state_name()
34 << ").paxosservice(" << service_name << " " << fc << ".." << lc << ") ";
35 }
36
37 bool PaxosService::dispatch(MonOpRequestRef op)
38 {
39 ceph_assert(op->is_type_service() || op->is_type_command());
40 auto m = op->get_req<PaxosServiceMessage>();
41 op->mark_event("psvc:dispatch");
42
43 dout(10) << __func__ << " " << m << " " << *m
44 << " from " << m->get_orig_source_inst()
45 << " con " << m->get_connection() << dendl;
46
47 if (mon.is_shutdown()) {
48 return true;
49 }
50
51 // make sure this message isn't forwarded from a previous election epoch
52 if (m->rx_election_epoch &&
53 m->rx_election_epoch < mon.get_epoch()) {
54 dout(10) << " discarding forwarded message from previous election epoch "
55 << m->rx_election_epoch << " < " << mon.get_epoch() << dendl;
56 return true;
57 }
58
59 // make sure the client is still connected. note that a proxied
60 // connection will be disconnected with a null message; don't drop
61 // those. also ignore loopback (e.g., log) messages.
62 if (m->get_connection() &&
63 !m->get_connection()->is_connected() &&
64 m->get_connection() != mon.con_self &&
65 m->get_connection()->get_messenger() != NULL) {
66 dout(10) << " discarding message from disconnected client "
67 << m->get_source_inst() << " " << *m << dendl;
68 return true;
69 }
70
71 // make sure our map is readable and up to date
72 if (!is_readable(m->version)) {
73 dout(10) << " waiting for paxos -> readable (v" << m->version << ")" << dendl;
74 wait_for_readable(op, new C_RetryMessage(this, op), m->version);
75 return true;
76 }
77
78 // preprocess
79 if (preprocess_query(op))
80 return true; // easy!
81
82 // leader?
83 if (!mon.is_leader()) {
84 mon.forward_request_leader(op);
85 return true;
86 }
87
88 // writeable?
89 if (!is_writeable()) {
90 dout(10) << " waiting for paxos -> writeable" << dendl;
91 wait_for_writeable(op, new C_RetryMessage(this, op));
92 return true;
93 }
94
95 // update
96 if (!prepare_update(op)) {
97 // no changes made.
98 return true;
99 }
100
101 if (need_immediate_propose) {
102 dout(10) << __func__ << " forced immediate propose" << dendl;
103 need_immediate_propose = false;
104 propose_pending();
105 return true;
106 }
107
108 double delay = 0.0;
109 if (!should_propose(delay)) {
110 dout(10) << " not proposing" << dendl;
111 return true;
112 }
113
114 if (delay == 0.0) {
115 propose_pending();
116 return true;
117 }
118
119 // delay a bit
120 if (!proposal_timer) {
121 /**
122 * Callback class used to propose the pending value once the proposal_timer
123 * fires up.
124 */
125 auto do_propose = new C_MonContext{&mon, [this](int r) {
126 proposal_timer = 0;
127 if (r >= 0) {
128 propose_pending();
129 } else if (r == -ECANCELED || r == -EAGAIN) {
130 return;
131 } else {
132 ceph_abort_msg("bad return value for proposal_timer");
133 }
134 }};
135 dout(10) << " setting proposal_timer " << do_propose
136 << " with delay of " << delay << dendl;
137 proposal_timer = mon.timer.add_event_after(delay, do_propose);
138 } else {
139 dout(10) << " proposal_timer already set" << dendl;
140 }
141 return true;
142 }
143
144 void PaxosService::refresh(bool *need_bootstrap)
145 {
146 // update cached versions
147 cached_first_committed = mon.store->get(get_service_name(), first_committed_name);
148 cached_last_committed = mon.store->get(get_service_name(), last_committed_name);
149
150 version_t new_format = get_value("format_version");
151 if (new_format != format_version) {
152 dout(1) << __func__ << " upgraded, format " << format_version << " -> " << new_format << dendl;
153 on_upgrade();
154 }
155 format_version = new_format;
156
157 dout(10) << __func__ << dendl;
158
159 update_from_paxos(need_bootstrap);
160 }
161
162 void PaxosService::post_refresh()
163 {
164 dout(10) << __func__ << dendl;
165
166 post_paxos_update();
167
168 if (mon.is_peon() && !waiting_for_finished_proposal.empty()) {
169 finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN);
170 }
171 }
172
173 bool PaxosService::should_propose(double& delay)
174 {
175 // simple default policy: quick startup, then some damping.
176 if (get_last_committed() <= 1) {
177 delay = 0.0;
178 } else {
179 utime_t now = ceph_clock_now();
180 if ((now - paxos.last_commit_time) > g_conf()->paxos_propose_interval)
181 delay = (double)g_conf()->paxos_min_wait;
182 else
183 delay = (double)(g_conf()->paxos_propose_interval + paxos.last_commit_time
184 - now);
185 }
186 return true;
187 }
188
189
190 void PaxosService::propose_pending()
191 {
192 dout(10) << __func__ << dendl;
193 ceph_assert(have_pending);
194 ceph_assert(!proposing);
195 ceph_assert(mon.is_leader());
196 ceph_assert(is_active());
197
198 if (proposal_timer) {
199 dout(10) << " canceling proposal_timer " << proposal_timer << dendl;
200 mon.timer.cancel_event(proposal_timer);
201 proposal_timer = NULL;
202 }
203
204 /**
205 * @note What we contribute to the pending Paxos transaction is
206 * obtained by calling a function that must be implemented by
207 * the class implementing us. I.e., the function
208 * encode_pending will be the one responsible to encode
209 * whatever is pending on the implementation class into a
210 * bufferlist, so we can then propose that as a value through
211 * Paxos.
212 */
213 MonitorDBStore::TransactionRef t = paxos.get_pending_transaction();
214
215 if (should_stash_full())
216 encode_full(t);
217
218 encode_pending(t);
219 have_pending = false;
220
221 if (format_version > 0) {
222 t->put(get_service_name(), "format_version", format_version);
223 }
224
225 // apply to paxos
226 proposing = true;
227 /**
228 * Callback class used to mark us as active once a proposal finishes going
229 * through Paxos.
230 *
231 * We should wake people up *only* *after* we inform the service we
232 * just went active. And we should wake people up only once we finish
233 * going active. This is why we first go active, avoiding to wake up the
234 * wrong people at the wrong time, such as waking up a C_RetryMessage
235 * before waking up a C_Active, thus ending up without a pending value.
236 */
237 class C_Committed : public Context {
238 PaxosService *ps;
239 public:
240 explicit C_Committed(PaxosService *p) : ps(p) { }
241 void finish(int r) override {
242 ps->proposing = false;
243 if (r >= 0)
244 ps->_active();
245 else if (r == -ECANCELED || r == -EAGAIN)
246 return;
247 else
248 ceph_abort_msg("bad return value for C_Committed");
249 }
250 };
251 paxos.queue_pending_finisher(new C_Committed(this));
252 paxos.trigger_propose();
253 }
254
255 bool PaxosService::should_stash_full()
256 {
257 version_t latest_full = get_version_latest_full();
258 /* @note The first member of the condition is moot and it is here just for
259 * clarity's sake. The second member would end up returing true
260 * nonetheless because, in that event,
261 * latest_full == get_trim_to() == 0.
262 */
263 return (!latest_full ||
264 (latest_full <= get_trim_to()) ||
265 (get_last_committed() - latest_full > (version_t)g_conf()->paxos_stash_full_interval));
266 }
267
268 void PaxosService::restart()
269 {
270 dout(10) << __func__ << dendl;
271 if (proposal_timer) {
272 dout(10) << " canceling proposal_timer " << proposal_timer << dendl;
273 mon.timer.cancel_event(proposal_timer);
274 proposal_timer = 0;
275 }
276
277 finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN);
278
279 if (have_pending) {
280 discard_pending();
281 have_pending = false;
282 }
283 proposing = false;
284
285 on_restart();
286 }
287
288 void PaxosService::election_finished()
289 {
290 dout(10) << __func__ << dendl;
291
292 finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN);
293
294 // make sure we update our state
295 _active();
296 }
297
298 void PaxosService::_active()
299 {
300 if (is_proposing()) {
301 dout(10) << __func__ << " - proposing" << dendl;
302 return;
303 }
304 if (!is_active()) {
305 dout(10) << __func__ << " - not active" << dendl;
306 /**
307 * Callback used to make sure we call the PaxosService::_active function
308 * whenever a condition is fulfilled.
309 *
310 * This is used in multiple situations, from waiting for the Paxos to commit
311 * our proposed value, to waiting for the Paxos to become active once an
312 * election is finished.
313 */
314 class C_Active : public Context {
315 PaxosService *svc;
316 public:
317 explicit C_Active(PaxosService *s) : svc(s) {}
318 void finish(int r) override {
319 if (r >= 0)
320 svc->_active();
321 }
322 };
323 wait_for_active_ctx(new C_Active(this));
324 return;
325 }
326 dout(10) << __func__ << dendl;
327
328 // create pending state?
329 if (mon.is_leader()) {
330 dout(7) << __func__ << " creating new pending" << dendl;
331 if (!have_pending) {
332 create_pending();
333 have_pending = true;
334 }
335
336 if (get_last_committed() == 0) {
337 // create initial state
338 create_initial();
339 propose_pending();
340 return;
341 }
342 } else {
343 dout(7) << __func__ << " we are not the leader, hence we propose nothing!" << dendl;
344 }
345
346 // wake up anyone who came in while we were proposing. note that
347 // anyone waiting for the previous proposal to commit is no longer
348 // on this list; it is on Paxos's.
349 finish_contexts(g_ceph_context, waiting_for_finished_proposal, 0);
350
351 if (mon.is_leader())
352 upgrade_format();
353
354 // NOTE: it's possible that this will get called twice if we commit
355 // an old paxos value. Implementations should be mindful of that.
356 on_active();
357 }
358
359
360 void PaxosService::shutdown()
361 {
362 cancel_events();
363
364 if (proposal_timer) {
365 dout(10) << " canceling proposal_timer " << proposal_timer << dendl;
366 mon.timer.cancel_event(proposal_timer);
367 proposal_timer = 0;
368 }
369
370 finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN);
371
372 on_shutdown();
373 }
374
375 void PaxosService::maybe_trim()
376 {
377 if (!is_writeable())
378 return;
379
380 const version_t first_committed = get_first_committed();
381 version_t trim_to = get_trim_to();
382 dout(20) << __func__ << " " << first_committed << "~" << trim_to << dendl;
383
384 if (trim_to < first_committed) {
385 dout(10) << __func__ << " trim_to " << trim_to << " < first_committed "
386 << first_committed << dendl;
387 return;
388 }
389
390 version_t to_remove = trim_to - first_committed;
391 const version_t trim_min = g_conf().get_val<version_t>("paxos_service_trim_min");
392 if (trim_min > 0 &&
393 to_remove < trim_min) {
394 dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove
395 << " < paxos_service_trim_min " << trim_min << dendl;
396 return;
397 }
398
399 to_remove = [to_remove, trim_to, this] {
400 const version_t trim_max = g_conf().get_val<version_t>("paxos_service_trim_max");
401 if (trim_max == 0 || to_remove < trim_max) {
402 return to_remove;
403 }
404 if (to_remove < trim_max * 1.5) {
405 dout(10) << __func__ << " trim to " << trim_to << " would only trim " << to_remove
406 << " > paxos_service_trim_max, limiting to " << trim_max
407 << dendl;
408 return trim_max;
409 }
410 const version_t new_trim_max = (trim_max + to_remove) / 2;
411 const uint64_t trim_max_multiplier = g_conf().get_val<uint64_t>("paxos_service_trim_max_multiplier");
412 if (trim_max_multiplier) {
413 return std::min(new_trim_max, trim_max * trim_max_multiplier);
414 } else {
415 return new_trim_max;
416 }
417 }();
418 trim_to = first_committed + to_remove;
419
420 dout(10) << __func__ << " trimming to " << trim_to << ", " << to_remove << " states" << dendl;
421 MonitorDBStore::TransactionRef t = paxos.get_pending_transaction();
422 trim(t, first_committed, trim_to);
423 put_first_committed(t, trim_to);
424 cached_first_committed = trim_to;
425
426 // let the service add any extra stuff
427 encode_trim_extra(t, trim_to);
428
429 paxos.trigger_propose();
430 }
431
432 void PaxosService::trim(MonitorDBStore::TransactionRef t,
433 version_t from, version_t to)
434 {
435 dout(10) << __func__ << " from " << from << " to " << to << dendl;
436 ceph_assert(from != to);
437
438 for (version_t v = from; v < to; ++v) {
439 dout(20) << __func__ << " " << v << dendl;
440 t->erase(get_service_name(), v);
441
442 string full_key = mon.store->combine_strings("full", v);
443 if (mon.store->exists(get_service_name(), full_key)) {
444 dout(20) << __func__ << " " << full_key << dendl;
445 t->erase(get_service_name(), full_key);
446 }
447 }
448 if (g_conf()->mon_compact_on_trim) {
449 dout(20) << " compacting prefix " << get_service_name() << dendl;
450 t->compact_range(get_service_name(), stringify(from - 1), stringify(to));
451 t->compact_range(get_service_name(),
452 mon.store->combine_strings(full_prefix_name, from - 1),
453 mon.store->combine_strings(full_prefix_name, to));
454 }
455 }
456
457 void PaxosService::load_health()
458 {
459 bufferlist bl;
460 mon.store->get("health", service_name, bl);
461 if (bl.length()) {
462 auto p = bl.cbegin();
463 using ceph::decode;
464 decode(health_checks, p);
465 }
466 }