]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/PaxosService.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / mon / PaxosService.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "PaxosService.h"
16#include "common/Clock.h"
17#include "common/config.h"
18#include "include/stringify.h"
11fdf7f2 19#include "include/ceph_assert.h"
7c673cae
FG
20#include "mon/MonOpRequest.h"
21
f67539c2
TL
22using std::ostream;
23using std::string;
24
25using ceph::bufferlist;
26
7c673cae
FG
27#define dout_subsys ceph_subsys_paxos
28#undef dout_prefix
29#define dout_prefix _prefix(_dout, mon, paxos, service_name, get_first_committed(), get_last_committed())
f67539c2 30static ostream& _prefix(std::ostream *_dout, Monitor &mon, Paxos &paxos, string service_name,
7c673cae 31 version_t fc, version_t lc) {
f67539c2
TL
32 return *_dout << "mon." << mon.name << "@" << mon.rank
33 << "(" << mon.get_state_name()
7c673cae
FG
34 << ").paxosservice(" << service_name << " " << fc << ".." << lc << ") ";
35}
36
37bool PaxosService::dispatch(MonOpRequestRef op)
38{
11fdf7f2 39 ceph_assert(op->is_type_service() || op->is_type_command());
9f95a23c 40 auto m = op->get_req<PaxosServiceMessage>();
7c673cae
FG
41 op->mark_event("psvc:dispatch");
42
224ce89b 43 dout(10) << __func__ << " " << m << " " << *m
7c673cae
FG
44 << " from " << m->get_orig_source_inst()
45 << " con " << m->get_connection() << dendl;
46
f67539c2 47 if (mon.is_shutdown()) {
7c673cae
FG
48 return true;
49 }
50
51 // make sure this message isn't forwarded from a previous election epoch
52 if (m->rx_election_epoch &&
f67539c2 53 m->rx_election_epoch < mon.get_epoch()) {
7c673cae 54 dout(10) << " discarding forwarded message from previous election epoch "
f67539c2 55 << m->rx_election_epoch << " < " << mon.get_epoch() << dendl;
7c673cae
FG
56 return true;
57 }
58
59 // make sure the client is still connected. note that a proxied
60 // connection will be disconnected with a null message; don't drop
61 // those. also ignore loopback (e.g., log) messages.
62 if (m->get_connection() &&
63 !m->get_connection()->is_connected() &&
f67539c2 64 m->get_connection() != mon.con_self &&
7c673cae
FG
65 m->get_connection()->get_messenger() != NULL) {
66 dout(10) << " discarding message from disconnected client "
67 << m->get_source_inst() << " " << *m << dendl;
68 return true;
69 }
70
71 // make sure our map is readable and up to date
72 if (!is_readable(m->version)) {
73 dout(10) << " waiting for paxos -> readable (v" << m->version << ")" << dendl;
74 wait_for_readable(op, new C_RetryMessage(this, op), m->version);
75 return true;
76 }
77
78 // preprocess
79 if (preprocess_query(op))
80 return true; // easy!
81
82 // leader?
f67539c2
TL
83 if (!mon.is_leader()) {
84 mon.forward_request_leader(op);
7c673cae
FG
85 return true;
86 }
87
88 // writeable?
89 if (!is_writeable()) {
90 dout(10) << " waiting for paxos -> writeable" << dendl;
91 wait_for_writeable(op, new C_RetryMessage(this, op));
92 return true;
93 }
94
95 // update
31f18b77
FG
96 if (!prepare_update(op)) {
97 // no changes made.
98 return true;
99 }
100
101 if (need_immediate_propose) {
102 dout(10) << __func__ << " forced immediate propose" << dendl;
31f18b77
FG
103 propose_pending();
104 return true;
105 }
106
107 double delay = 0.0;
108 if (!should_propose(delay)) {
109 dout(10) << " not proposing" << dendl;
110 return true;
111 }
112
113 if (delay == 0.0) {
114 propose_pending();
115 return true;
116 }
117
118 // delay a bit
119 if (!proposal_timer) {
120 /**
121 * Callback class used to propose the pending value once the proposal_timer
122 * fires up.
123 */
f67539c2 124 auto do_propose = new C_MonContext{&mon, [this](int r) {
31f18b77
FG
125 proposal_timer = 0;
126 if (r >= 0) {
127 propose_pending();
128 } else if (r == -ECANCELED || r == -EAGAIN) {
129 return;
130 } else {
11fdf7f2 131 ceph_abort_msg("bad return value for proposal_timer");
31f18b77 132 }
9f95a23c 133 }};
3efd9988 134 dout(10) << " setting proposal_timer " << do_propose
31f18b77 135 << " with delay of " << delay << dendl;
f67539c2 136 proposal_timer = mon.timer.add_event_after(delay, do_propose);
31f18b77
FG
137 } else {
138 dout(10) << " proposal_timer already set" << dendl;
139 }
7c673cae
FG
140 return true;
141}
142
143void PaxosService::refresh(bool *need_bootstrap)
144{
145 // update cached versions
f67539c2
TL
146 cached_first_committed = mon.store->get(get_service_name(), first_committed_name);
147 cached_last_committed = mon.store->get(get_service_name(), last_committed_name);
7c673cae
FG
148
149 version_t new_format = get_value("format_version");
150 if (new_format != format_version) {
151 dout(1) << __func__ << " upgraded, format " << format_version << " -> " << new_format << dendl;
152 on_upgrade();
153 }
154 format_version = new_format;
155
156 dout(10) << __func__ << dendl;
157
158 update_from_paxos(need_bootstrap);
159}
160
161void PaxosService::post_refresh()
162{
163 dout(10) << __func__ << dendl;
164
165 post_paxos_update();
166
f67539c2 167 if (mon.is_peon() && !waiting_for_finished_proposal.empty()) {
7c673cae
FG
168 finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN);
169 }
170}
171
172bool PaxosService::should_propose(double& delay)
173{
174 // simple default policy: quick startup, then some damping.
31f18b77 175 if (get_last_committed() <= 1) {
7c673cae 176 delay = 0.0;
31f18b77 177 } else {
7c673cae 178 utime_t now = ceph_clock_now();
f67539c2 179 if ((now - paxos.last_commit_time) > g_conf()->paxos_propose_interval)
11fdf7f2 180 delay = (double)g_conf()->paxos_min_wait;
7c673cae 181 else
f67539c2 182 delay = (double)(g_conf()->paxos_propose_interval + paxos.last_commit_time
7c673cae
FG
183 - now);
184 }
185 return true;
186}
187
188
189void PaxosService::propose_pending()
190{
224ce89b 191 dout(10) << __func__ << dendl;
11fdf7f2
TL
192 ceph_assert(have_pending);
193 ceph_assert(!proposing);
f67539c2 194 ceph_assert(mon.is_leader());
11fdf7f2 195 ceph_assert(is_active());
7c673cae
FG
196
197 if (proposal_timer) {
198 dout(10) << " canceling proposal_timer " << proposal_timer << dendl;
f67539c2 199 mon.timer.cancel_event(proposal_timer);
7c673cae
FG
200 proposal_timer = NULL;
201 }
202
203 /**
c07f9fc5 204 * @note What we contribute to the pending Paxos transaction is
7c673cae
FG
205 * obtained by calling a function that must be implemented by
206 * the class implementing us. I.e., the function
207 * encode_pending will be the one responsible to encode
208 * whatever is pending on the implementation class into a
209 * bufferlist, so we can then propose that as a value through
210 * Paxos.
211 */
f67539c2 212 MonitorDBStore::TransactionRef t = paxos.get_pending_transaction();
7c673cae
FG
213
214 if (should_stash_full())
215 encode_full(t);
216
217 encode_pending(t);
218 have_pending = false;
219
220 if (format_version > 0) {
221 t->put(get_service_name(), "format_version", format_version);
222 }
223
224 // apply to paxos
225 proposing = true;
1e59de90 226 need_immediate_propose = false; /* reset whenever we propose */
7c673cae
FG
227 /**
228 * Callback class used to mark us as active once a proposal finishes going
229 * through Paxos.
230 *
231 * We should wake people up *only* *after* we inform the service we
232 * just went active. And we should wake people up only once we finish
233 * going active. This is why we first go active, avoiding to wake up the
234 * wrong people at the wrong time, such as waking up a C_RetryMessage
235 * before waking up a C_Active, thus ending up without a pending value.
236 */
237 class C_Committed : public Context {
238 PaxosService *ps;
239 public:
240 explicit C_Committed(PaxosService *p) : ps(p) { }
241 void finish(int r) override {
242 ps->proposing = false;
243 if (r >= 0)
244 ps->_active();
245 else if (r == -ECANCELED || r == -EAGAIN)
246 return;
247 else
11fdf7f2 248 ceph_abort_msg("bad return value for C_Committed");
7c673cae
FG
249 }
250 };
f67539c2
TL
251 paxos.queue_pending_finisher(new C_Committed(this));
252 paxos.trigger_propose();
7c673cae
FG
253}
254
255bool PaxosService::should_stash_full()
256{
257 version_t latest_full = get_version_latest_full();
258 /* @note The first member of the condition is moot and it is here just for
259 * clarity's sake. The second member would end up returing true
260 * nonetheless because, in that event,
261 * latest_full == get_trim_to() == 0.
262 */
263 return (!latest_full ||
264 (latest_full <= get_trim_to()) ||
11fdf7f2 265 (get_last_committed() - latest_full > (version_t)g_conf()->paxos_stash_full_interval));
7c673cae
FG
266}
267
268void PaxosService::restart()
269{
224ce89b 270 dout(10) << __func__ << dendl;
7c673cae
FG
271 if (proposal_timer) {
272 dout(10) << " canceling proposal_timer " << proposal_timer << dendl;
f67539c2 273 mon.timer.cancel_event(proposal_timer);
7c673cae
FG
274 proposal_timer = 0;
275 }
276
277 finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN);
278
279 if (have_pending) {
280 discard_pending();
281 have_pending = false;
282 }
283 proposing = false;
284
285 on_restart();
286}
287
288void PaxosService::election_finished()
289{
224ce89b 290 dout(10) << __func__ << dendl;
7c673cae
FG
291
292 finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN);
293
294 // make sure we update our state
295 _active();
296}
297
298void PaxosService::_active()
299{
300 if (is_proposing()) {
224ce89b 301 dout(10) << __func__ << " - proposing" << dendl;
7c673cae
FG
302 return;
303 }
304 if (!is_active()) {
224ce89b 305 dout(10) << __func__ << " - not active" << dendl;
7c673cae
FG
306 /**
307 * Callback used to make sure we call the PaxosService::_active function
308 * whenever a condition is fulfilled.
309 *
310 * This is used in multiple situations, from waiting for the Paxos to commit
311 * our proposed value, to waiting for the Paxos to become active once an
312 * election is finished.
313 */
314 class C_Active : public Context {
315 PaxosService *svc;
316 public:
317 explicit C_Active(PaxosService *s) : svc(s) {}
318 void finish(int r) override {
319 if (r >= 0)
320 svc->_active();
321 }
322 };
323 wait_for_active_ctx(new C_Active(this));
324 return;
325 }
224ce89b 326 dout(10) << __func__ << dendl;
7c673cae
FG
327
328 // create pending state?
f67539c2 329 if (mon.is_leader()) {
224ce89b 330 dout(7) << __func__ << " creating new pending" << dendl;
7c673cae
FG
331 if (!have_pending) {
332 create_pending();
333 have_pending = true;
334 }
335
336 if (get_last_committed() == 0) {
337 // create initial state
338 create_initial();
339 propose_pending();
340 return;
341 }
342 } else {
11fdf7f2 343 dout(7) << __func__ << " we are not the leader, hence we propose nothing!" << dendl;
7c673cae
FG
344 }
345
346 // wake up anyone who came in while we were proposing. note that
347 // anyone waiting for the previous proposal to commit is no longer
348 // on this list; it is on Paxos's.
349 finish_contexts(g_ceph_context, waiting_for_finished_proposal, 0);
350
f67539c2 351 if (mon.is_leader())
7c673cae
FG
352 upgrade_format();
353
354 // NOTE: it's possible that this will get called twice if we commit
355 // an old paxos value. Implementations should be mindful of that.
356 on_active();
357}
358
359
360void PaxosService::shutdown()
361{
362 cancel_events();
363
364 if (proposal_timer) {
365 dout(10) << " canceling proposal_timer " << proposal_timer << dendl;
f67539c2 366 mon.timer.cancel_event(proposal_timer);
7c673cae
FG
367 proposal_timer = 0;
368 }
369
370 finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN);
371
372 on_shutdown();
373}
374
375void PaxosService::maybe_trim()
376{
377 if (!is_writeable())
378 return;
379
522d829b 380 const version_t first_committed = get_first_committed();
7c673cae 381 version_t trim_to = get_trim_to();
522d829b
TL
382 dout(20) << __func__ << " " << first_committed << "~" << trim_to << dendl;
383
384 if (trim_to < first_committed) {
385 dout(10) << __func__ << " trim_to " << trim_to << " < first_committed "
386 << first_committed << dendl;
7c673cae 387 return;
f67539c2 388 }
7c673cae 389
522d829b 390 version_t to_remove = trim_to - first_committed;
f67539c2
TL
391 const version_t trim_min = g_conf().get_val<version_t>("paxos_service_trim_min");
392 if (trim_min > 0 &&
393 to_remove < trim_min) {
7c673cae 394 dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove
f67539c2 395 << " < paxos_service_trim_min " << trim_min << dendl;
7c673cae
FG
396 return;
397 }
398
522d829b 399 to_remove = [to_remove, trim_to, this] {
f67539c2
TL
400 const version_t trim_max = g_conf().get_val<version_t>("paxos_service_trim_max");
401 if (trim_max == 0 || to_remove < trim_max) {
402 return to_remove;
403 }
404 if (to_remove < trim_max * 1.5) {
522d829b 405 dout(10) << __func__ << " trim to " << trim_to << " would only trim " << to_remove
f67539c2
TL
406 << " > paxos_service_trim_max, limiting to " << trim_max
407 << dendl;
408 return trim_max;
409 }
410 const version_t new_trim_max = (trim_max + to_remove) / 2;
411 const uint64_t trim_max_multiplier = g_conf().get_val<uint64_t>("paxos_service_trim_max_multiplier");
412 if (trim_max_multiplier) {
413 return std::min(new_trim_max, trim_max * trim_max_multiplier);
414 } else {
415 return new_trim_max;
416 }
417 }();
522d829b 418 trim_to = first_committed + to_remove;
7c673cae
FG
419
420 dout(10) << __func__ << " trimming to " << trim_to << ", " << to_remove << " states" << dendl;
f67539c2 421 MonitorDBStore::TransactionRef t = paxos.get_pending_transaction();
522d829b 422 trim(t, first_committed, trim_to);
7c673cae 423 put_first_committed(t, trim_to);
94b18763 424 cached_first_committed = trim_to;
7c673cae
FG
425
426 // let the service add any extra stuff
427 encode_trim_extra(t, trim_to);
428
f67539c2 429 paxos.trigger_propose();
7c673cae
FG
430}
431
432void PaxosService::trim(MonitorDBStore::TransactionRef t,
433 version_t from, version_t to)
434{
435 dout(10) << __func__ << " from " << from << " to " << to << dendl;
11fdf7f2 436 ceph_assert(from != to);
7c673cae
FG
437
438 for (version_t v = from; v < to; ++v) {
439 dout(20) << __func__ << " " << v << dendl;
440 t->erase(get_service_name(), v);
441
f67539c2
TL
442 string full_key = mon.store->combine_strings("full", v);
443 if (mon.store->exists(get_service_name(), full_key)) {
7c673cae
FG
444 dout(20) << __func__ << " " << full_key << dendl;
445 t->erase(get_service_name(), full_key);
446 }
447 }
11fdf7f2 448 if (g_conf()->mon_compact_on_trim) {
7c673cae
FG
449 dout(20) << " compacting prefix " << get_service_name() << dendl;
450 t->compact_range(get_service_name(), stringify(from - 1), stringify(to));
451 t->compact_range(get_service_name(),
f67539c2
TL
452 mon.store->combine_strings(full_prefix_name, from - 1),
453 mon.store->combine_strings(full_prefix_name, to));
7c673cae
FG
454 }
455}
456
224ce89b
WB
457void PaxosService::load_health()
458{
459 bufferlist bl;
f67539c2 460 mon.store->get("health", service_name, bl);
224ce89b 461 if (bl.length()) {
11fdf7f2 462 auto p = bl.cbegin();
f67539c2 463 using ceph::decode;
11fdf7f2 464 decode(health_checks, p);
224ce89b
WB
465 }
466}