]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/PaxosService.cc
update sources to v12.1.1
[ceph.git] / ceph / src / mon / PaxosService.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "PaxosService.h"
16#include "common/Clock.h"
17#include "common/config.h"
18#include "include/stringify.h"
19#include "include/assert.h"
20#include "mon/MonOpRequest.h"
21
22#define dout_subsys ceph_subsys_paxos
23#undef dout_prefix
24#define dout_prefix _prefix(_dout, mon, paxos, service_name, get_first_committed(), get_last_committed())
25static ostream& _prefix(std::ostream *_dout, Monitor *mon, Paxos *paxos, string service_name,
26 version_t fc, version_t lc) {
27 return *_dout << "mon." << mon->name << "@" << mon->rank
28 << "(" << mon->get_state_name()
29 << ").paxosservice(" << service_name << " " << fc << ".." << lc << ") ";
30}
31
32bool PaxosService::dispatch(MonOpRequestRef op)
33{
34 assert(op->is_type_service() || op->is_type_command());
35 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
36 op->mark_event("psvc:dispatch");
37
224ce89b 38 dout(10) << __func__ << " " << m << " " << *m
7c673cae
FG
39 << " from " << m->get_orig_source_inst()
40 << " con " << m->get_connection() << dendl;
41
42 if (mon->is_shutdown()) {
43 return true;
44 }
45
46 // make sure this message isn't forwarded from a previous election epoch
47 if (m->rx_election_epoch &&
48 m->rx_election_epoch < mon->get_epoch()) {
49 dout(10) << " discarding forwarded message from previous election epoch "
50 << m->rx_election_epoch << " < " << mon->get_epoch() << dendl;
51 return true;
52 }
53
54 // make sure the client is still connected. note that a proxied
55 // connection will be disconnected with a null message; don't drop
56 // those. also ignore loopback (e.g., log) messages.
57 if (m->get_connection() &&
58 !m->get_connection()->is_connected() &&
59 m->get_connection() != mon->con_self &&
60 m->get_connection()->get_messenger() != NULL) {
61 dout(10) << " discarding message from disconnected client "
62 << m->get_source_inst() << " " << *m << dendl;
63 return true;
64 }
65
66 // make sure our map is readable and up to date
67 if (!is_readable(m->version)) {
68 dout(10) << " waiting for paxos -> readable (v" << m->version << ")" << dendl;
69 wait_for_readable(op, new C_RetryMessage(this, op), m->version);
70 return true;
71 }
72
73 // preprocess
74 if (preprocess_query(op))
75 return true; // easy!
76
77 // leader?
78 if (!mon->is_leader()) {
79 mon->forward_request_leader(op);
80 return true;
81 }
82
83 // writeable?
84 if (!is_writeable()) {
85 dout(10) << " waiting for paxos -> writeable" << dendl;
86 wait_for_writeable(op, new C_RetryMessage(this, op));
87 return true;
88 }
89
90 // update
31f18b77
FG
91 if (!prepare_update(op)) {
92 // no changes made.
93 return true;
94 }
95
96 if (need_immediate_propose) {
97 dout(10) << __func__ << " forced immediate propose" << dendl;
98 need_immediate_propose = false;
99 propose_pending();
100 return true;
101 }
102
103 double delay = 0.0;
104 if (!should_propose(delay)) {
105 dout(10) << " not proposing" << dendl;
106 return true;
107 }
108
109 if (delay == 0.0) {
110 propose_pending();
111 return true;
112 }
113
114 // delay a bit
115 if (!proposal_timer) {
116 /**
117 * Callback class used to propose the pending value once the proposal_timer
118 * fires up.
119 */
120 proposal_timer = new C_MonContext(mon, [this](int r) {
121 proposal_timer = 0;
122 if (r >= 0) {
123 propose_pending();
124 } else if (r == -ECANCELED || r == -EAGAIN) {
125 return;
126 } else {
127 assert(0 == "bad return value for proposal_timer");
128 }
129 });
130 dout(10) << " setting proposal_timer " << proposal_timer
131 << " with delay of " << delay << dendl;
132 mon->timer.add_event_after(delay, proposal_timer);
133 } else {
134 dout(10) << " proposal_timer already set" << dendl;
135 }
7c673cae
FG
136 return true;
137}
138
139void PaxosService::refresh(bool *need_bootstrap)
140{
141 // update cached versions
142 cached_first_committed = mon->store->get(get_service_name(), first_committed_name);
143 cached_last_committed = mon->store->get(get_service_name(), last_committed_name);
144
145 version_t new_format = get_value("format_version");
146 if (new_format != format_version) {
147 dout(1) << __func__ << " upgraded, format " << format_version << " -> " << new_format << dendl;
148 on_upgrade();
149 }
150 format_version = new_format;
151
152 dout(10) << __func__ << dendl;
153
154 update_from_paxos(need_bootstrap);
155}
156
157void PaxosService::post_refresh()
158{
159 dout(10) << __func__ << dendl;
160
161 post_paxos_update();
162
163 if (mon->is_peon() && !waiting_for_finished_proposal.empty()) {
164 finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN);
165 }
166}
167
168bool PaxosService::should_propose(double& delay)
169{
170 // simple default policy: quick startup, then some damping.
31f18b77 171 if (get_last_committed() <= 1) {
7c673cae 172 delay = 0.0;
31f18b77 173 } else {
7c673cae
FG
174 utime_t now = ceph_clock_now();
175 if ((now - paxos->last_commit_time) > g_conf->paxos_propose_interval)
176 delay = (double)g_conf->paxos_min_wait;
177 else
178 delay = (double)(g_conf->paxos_propose_interval + paxos->last_commit_time
179 - now);
180 }
181 return true;
182}
183
184
185void PaxosService::propose_pending()
186{
224ce89b 187 dout(10) << __func__ << dendl;
7c673cae
FG
188 assert(have_pending);
189 assert(!proposing);
190 assert(mon->is_leader());
191 assert(is_active());
192
193 if (proposal_timer) {
194 dout(10) << " canceling proposal_timer " << proposal_timer << dendl;
195 mon->timer.cancel_event(proposal_timer);
196 proposal_timer = NULL;
197 }
198
199 /**
200 * @note What we contirbute to the pending Paxos transaction is
201 * obtained by calling a function that must be implemented by
202 * the class implementing us. I.e., the function
203 * encode_pending will be the one responsible to encode
204 * whatever is pending on the implementation class into a
205 * bufferlist, so we can then propose that as a value through
206 * Paxos.
207 */
208 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
209
210 if (should_stash_full())
211 encode_full(t);
212
213 encode_pending(t);
214 have_pending = false;
215
216 if (format_version > 0) {
217 t->put(get_service_name(), "format_version", format_version);
218 }
219
220 // apply to paxos
221 proposing = true;
222 /**
223 * Callback class used to mark us as active once a proposal finishes going
224 * through Paxos.
225 *
226 * We should wake people up *only* *after* we inform the service we
227 * just went active. And we should wake people up only once we finish
228 * going active. This is why we first go active, avoiding to wake up the
229 * wrong people at the wrong time, such as waking up a C_RetryMessage
230 * before waking up a C_Active, thus ending up without a pending value.
231 */
232 class C_Committed : public Context {
233 PaxosService *ps;
234 public:
235 explicit C_Committed(PaxosService *p) : ps(p) { }
236 void finish(int r) override {
237 ps->proposing = false;
238 if (r >= 0)
239 ps->_active();
240 else if (r == -ECANCELED || r == -EAGAIN)
241 return;
242 else
243 assert(0 == "bad return value for C_Committed");
244 }
245 };
246 paxos->queue_pending_finisher(new C_Committed(this));
247 paxos->trigger_propose();
248}
249
250bool PaxosService::should_stash_full()
251{
252 version_t latest_full = get_version_latest_full();
253 /* @note The first member of the condition is moot and it is here just for
254 * clarity's sake. The second member would end up returing true
255 * nonetheless because, in that event,
256 * latest_full == get_trim_to() == 0.
257 */
258 return (!latest_full ||
259 (latest_full <= get_trim_to()) ||
260 (get_last_committed() - latest_full > (version_t)g_conf->paxos_stash_full_interval));
261}
262
263void PaxosService::restart()
264{
224ce89b 265 dout(10) << __func__ << dendl;
7c673cae
FG
266 if (proposal_timer) {
267 dout(10) << " canceling proposal_timer " << proposal_timer << dendl;
268 mon->timer.cancel_event(proposal_timer);
269 proposal_timer = 0;
270 }
271
272 finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN);
273
274 if (have_pending) {
275 discard_pending();
276 have_pending = false;
277 }
278 proposing = false;
279
280 on_restart();
281}
282
283void PaxosService::election_finished()
284{
224ce89b 285 dout(10) << __func__ << dendl;
7c673cae
FG
286
287 finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN);
288
289 // make sure we update our state
290 _active();
291}
292
293void PaxosService::_active()
294{
295 if (is_proposing()) {
224ce89b 296 dout(10) << __func__ << " - proposing" << dendl;
7c673cae
FG
297 return;
298 }
299 if (!is_active()) {
224ce89b 300 dout(10) << __func__ << " - not active" << dendl;
7c673cae
FG
301 /**
302 * Callback used to make sure we call the PaxosService::_active function
303 * whenever a condition is fulfilled.
304 *
305 * This is used in multiple situations, from waiting for the Paxos to commit
306 * our proposed value, to waiting for the Paxos to become active once an
307 * election is finished.
308 */
309 class C_Active : public Context {
310 PaxosService *svc;
311 public:
312 explicit C_Active(PaxosService *s) : svc(s) {}
313 void finish(int r) override {
314 if (r >= 0)
315 svc->_active();
316 }
317 };
318 wait_for_active_ctx(new C_Active(this));
319 return;
320 }
224ce89b 321 dout(10) << __func__ << dendl;
7c673cae
FG
322
323 // create pending state?
324 if (mon->is_leader()) {
224ce89b 325 dout(7) << __func__ << " creating new pending" << dendl;
7c673cae
FG
326 if (!have_pending) {
327 create_pending();
328 have_pending = true;
329 }
330
331 if (get_last_committed() == 0) {
332 // create initial state
333 create_initial();
334 propose_pending();
335 return;
336 }
337 } else {
338 if (!mon->is_leader()) {
339 dout(7) << __func__ << " we are not the leader, hence we propose nothing!" << dendl;
340 }
341 }
342
343 // wake up anyone who came in while we were proposing. note that
344 // anyone waiting for the previous proposal to commit is no longer
345 // on this list; it is on Paxos's.
346 finish_contexts(g_ceph_context, waiting_for_finished_proposal, 0);
347
348 if (mon->is_leader())
349 upgrade_format();
350
351 // NOTE: it's possible that this will get called twice if we commit
352 // an old paxos value. Implementations should be mindful of that.
353 on_active();
354}
355
356
357void PaxosService::shutdown()
358{
359 cancel_events();
360
361 if (proposal_timer) {
362 dout(10) << " canceling proposal_timer " << proposal_timer << dendl;
363 mon->timer.cancel_event(proposal_timer);
364 proposal_timer = 0;
365 }
366
367 finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN);
368
369 on_shutdown();
370}
371
372void PaxosService::maybe_trim()
373{
374 if (!is_writeable())
375 return;
376
377 version_t trim_to = get_trim_to();
378 if (trim_to < get_first_committed())
379 return;
380
381 version_t to_remove = trim_to - get_first_committed();
382 if (g_conf->paxos_service_trim_min > 0 &&
383 to_remove < (version_t)g_conf->paxos_service_trim_min) {
384 dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove
385 << " < paxos_service_trim_min " << g_conf->paxos_service_trim_min << dendl;
386 return;
387 }
388
389 if (g_conf->paxos_service_trim_max > 0 &&
390 to_remove > (version_t)g_conf->paxos_service_trim_max) {
391 dout(10) << __func__ << " trim_to " << trim_to << " would only trim " << to_remove
392 << " > paxos_service_trim_max, limiting to " << g_conf->paxos_service_trim_max
393 << dendl;
394 trim_to = get_first_committed() + g_conf->paxos_service_trim_max;
395 to_remove = g_conf->paxos_service_trim_max;
396 }
397
398 dout(10) << __func__ << " trimming to " << trim_to << ", " << to_remove << " states" << dendl;
399 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
400 trim(t, get_first_committed(), trim_to);
401 put_first_committed(t, trim_to);
402
403 // let the service add any extra stuff
404 encode_trim_extra(t, trim_to);
405
406 paxos->trigger_propose();
407}
408
409void PaxosService::trim(MonitorDBStore::TransactionRef t,
410 version_t from, version_t to)
411{
412 dout(10) << __func__ << " from " << from << " to " << to << dendl;
413 assert(from != to);
414
415 for (version_t v = from; v < to; ++v) {
416 dout(20) << __func__ << " " << v << dendl;
417 t->erase(get_service_name(), v);
418
419 string full_key = mon->store->combine_strings("full", v);
420 if (mon->store->exists(get_service_name(), full_key)) {
421 dout(20) << __func__ << " " << full_key << dendl;
422 t->erase(get_service_name(), full_key);
423 }
424 }
425 if (g_conf->mon_compact_on_trim) {
426 dout(20) << " compacting prefix " << get_service_name() << dendl;
427 t->compact_range(get_service_name(), stringify(from - 1), stringify(to));
428 t->compact_range(get_service_name(),
429 mon->store->combine_strings(full_prefix_name, from - 1),
430 mon->store->combine_strings(full_prefix_name, to));
431 }
432}
433
224ce89b
WB
434void PaxosService::load_health()
435{
436 bufferlist bl;
437 mon->store->get("health", service_name, bl);
438 if (bl.length()) {
439 auto p = bl.begin();
440 ::decode(health_checks, p);
441 }
442}