]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/Monitor.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / mon / Monitor.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
11fdf7f2 16#include <iterator>
7c673cae 17#include <sstream>
11fdf7f2 18#include <tuple>
7c673cae
FG
19#include <stdlib.h>
20#include <signal.h>
21#include <limits.h>
22#include <cstring>
23#include <boost/scope_exit.hpp>
24#include <boost/algorithm/string/predicate.hpp>
25
11fdf7f2
TL
26#include "json_spirit/json_spirit_reader.h"
27#include "json_spirit/json_spirit_writer.h"
28
7c673cae
FG
29#include "Monitor.h"
30#include "common/version.h"
11fdf7f2
TL
31#include "common/blkdev.h"
32#include "common/cmdparse.h"
33#include "common/signal.h"
7c673cae
FG
34
35#include "osd/OSDMap.h"
36
37#include "MonitorDBStore.h"
38
39#include "messages/PaxosServiceMessage.h"
40#include "messages/MMonMap.h"
41#include "messages/MMonGetMap.h"
42#include "messages/MMonGetVersion.h"
43#include "messages/MMonGetVersionReply.h"
44#include "messages/MGenericMessage.h"
45#include "messages/MMonCommand.h"
46#include "messages/MMonCommandAck.h"
7c673cae
FG
47#include "messages/MMonSync.h"
48#include "messages/MMonScrub.h"
49#include "messages/MMonProbe.h"
50#include "messages/MMonJoin.h"
51#include "messages/MMonPaxos.h"
52#include "messages/MRoute.h"
53#include "messages/MForward.h"
54
55#include "messages/MMonSubscribe.h"
56#include "messages/MMonSubscribeAck.h"
57
9f95a23c
TL
58#include "messages/MCommand.h"
59#include "messages/MCommandReply.h"
60
7c673cae
FG
61#include "messages/MAuthReply.h"
62
11fdf7f2 63#include "messages/MTimeCheck2.h"
7c673cae
FG
64#include "messages/MPing.h"
65
66#include "common/strtol.h"
67#include "common/ceph_argparse.h"
68#include "common/Timer.h"
69#include "common/Clock.h"
70#include "common/errno.h"
71#include "common/perf_counters.h"
72#include "common/admin_socket.h"
73#include "global/signal_handler.h"
74#include "common/Formatter.h"
75#include "include/stringify.h"
76#include "include/color.h"
77#include "include/ceph_fs.h"
78#include "include/str_list.h"
79
80#include "OSDMonitor.h"
81#include "MDSMonitor.h"
82#include "MonmapMonitor.h"
7c673cae
FG
83#include "LogMonitor.h"
84#include "AuthMonitor.h"
85#include "MgrMonitor.h"
31f18b77 86#include "MgrStatMonitor.h"
11fdf7f2 87#include "ConfigMonitor.h"
f67539c2 88#include "KVMonitor.h"
7c673cae 89#include "mon/HealthMonitor.h"
7c673cae
FG
90#include "common/config.h"
91#include "common/cmdparse.h"
11fdf7f2 92#include "include/ceph_assert.h"
7c673cae
FG
93#include "include/compat.h"
94#include "perfglue/heap_profiler.h"
95
96#include "auth/none/AuthNoneClientHandler.h"
97
98#define dout_subsys ceph_subsys_mon
99#undef dout_prefix
100#define dout_prefix _prefix(_dout, this)
9f95a23c 101using namespace TOPNSPC::common;
f67539c2
TL
102
103using std::cout;
104using std::dec;
105using std::hex;
106using std::list;
107using std::map;
108using std::make_pair;
109using std::ostream;
110using std::ostringstream;
111using std::pair;
112using std::set;
113using std::setfill;
114using std::string;
115using std::stringstream;
116using std::to_string;
117using std::vector;
118using std::unique_ptr;
119
120using ceph::bufferlist;
121using ceph::decode;
122using ceph::encode;
123using ceph::ErasureCodeInterfaceRef;
124using ceph::ErasureCodeProfile;
125using ceph::Formatter;
126using ceph::JSONFormatter;
127using ceph::make_message;
128using ceph::mono_clock;
129using ceph::mono_time;
130using ceph::timespan_str;
131
132
7c673cae
FG
133static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
134 return *_dout << "mon." << mon->name << "@" << mon->rank
135 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
136}
137
138const string Monitor::MONITOR_NAME = "monitor";
139const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
140
141
142#undef FLAG
143#undef COMMAND
144#undef COMMAND_WITH_FLAG
7c673cae 145#define FLAG(f) (MonCommand::FLAG_##f)
11fdf7f2
TL
146#define COMMAND(parsesig, helptext, modulename, req_perms) \
147 {parsesig, helptext, modulename, req_perms, FLAG(NONE)},
148#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, flags) \
149 {parsesig, helptext, modulename, req_perms, flags},
d2e6a577 150MonCommand mon_commands[] = {
7c673cae 151#include <mon/MonCommands.h>
d2e6a577 152};
7c673cae
FG
153#undef COMMAND
154#undef COMMAND_WITH_FLAG
7c673cae 155
7c673cae
FG
156Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
157 Messenger *m, Messenger *mgr_m, MonMap *map) :
158 Dispatcher(cct_),
11fdf7f2 159 AuthServer(cct_),
7c673cae
FG
160 name(nm),
161 rank(-1),
162 messenger(m),
163 con_self(m ? m->get_loopback_connection() : NULL),
7c673cae
FG
164 timer(cct_, lock),
165 finisher(cct_, "mon_finisher", "fin"),
11fdf7f2 166 cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads),
7c673cae
FG
167 has_ever_joined(false),
168 logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
169 monmap(map),
170 log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
171 key_server(cct, &keyring),
172 auth_cluster_required(cct,
173 cct->_conf->auth_supported.empty() ?
174 cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
175 auth_service_required(cct,
176 cct->_conf->auth_supported.empty() ?
11fdf7f2 177 cct->_conf->auth_service_required : cct->_conf->auth_supported),
7c673cae 178 mgr_messenger(mgr_m),
9f95a23c 179 mgr_client(cct_, mgr_m, monmap),
11fdf7f2 180 gss_ktfile_client(cct->_conf.get_val<std::string>("gss_ktab_client_file")),
7c673cae
FG
181 store(s),
182
f67539c2 183 elector(this, map->strategy),
7c673cae
FG
184 required_features(0),
185 leader(0),
186 quorum_con_features(0),
187 // scrub
188 scrub_version(0),
189 scrub_event(NULL),
190 scrub_timeout_event(NULL),
191
192 // sync state
193 sync_provider_count(0),
194 sync_cookie(0),
195 sync_full(false),
196 sync_start_version(0),
197 sync_timeout_event(NULL),
198 sync_last_committed_floor(0),
199
200 timecheck_round(0),
201 timecheck_acks(0),
202 timecheck_rounds_since_clean(0),
203 timecheck_event(NULL),
204
7c673cae
FG
205 admin_hook(NULL),
206 routed_request_tid(0),
11fdf7f2 207 op_tracker(cct, g_conf().get_val<bool>("mon_enable_op_tracker"), 1)
7c673cae
FG
208{
209 clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
210 audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);
211
212 update_log_clients();
213
11fdf7f2
TL
214 if (!gss_ktfile_client.empty()) {
215 // Assert we can export environment variable
216 /*
217 The default client keytab is used, if it is present and readable,
218 to automatically obtain initial credentials for GSSAPI client
219 applications. The principal name of the first entry in the client
220 keytab is used by default when obtaining initial credentials.
221 1. The KRB5_CLIENT_KTNAME environment variable.
222 2. The default_client_keytab_name profile variable in [libdefaults].
223 3. The hardcoded default, DEFCKTNAME.
224 */
225 const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
226 gss_ktfile_client.c_str(), 1));
227 ceph_assert(set_result == 0);
228 }
229
230 op_tracker.set_complaint_and_threshold(
231 g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count(),
232 g_conf().get_val<int64_t>("mon_op_log_threshold"));
233 op_tracker.set_history_size_and_duration(
234 g_conf().get_val<uint64_t>("mon_op_history_size"),
235 g_conf().get_val<std::chrono::seconds>("mon_op_history_duration").count());
236 op_tracker.set_history_slow_op_size_and_threshold(
237 g_conf().get_val<uint64_t>("mon_op_history_slow_op_size"),
238 g_conf().get_val<std::chrono::seconds>("mon_op_history_slow_op_threshold").count());
239
f67539c2 240 paxos = std::make_unique<Paxos>(*this, "paxos");
7c673cae 241
f67539c2
TL
242 paxos_service[PAXOS_MDSMAP].reset(new MDSMonitor(*this, *paxos, "mdsmap"));
243 paxos_service[PAXOS_MONMAP].reset(new MonmapMonitor(*this, *paxos, "monmap"));
244 paxos_service[PAXOS_OSDMAP].reset(new OSDMonitor(cct, *this, *paxos, "osdmap"));
245 paxos_service[PAXOS_LOG].reset(new LogMonitor(*this, *paxos, "logm"));
246 paxos_service[PAXOS_AUTH].reset(new AuthMonitor(*this, *paxos, "auth"));
247 paxos_service[PAXOS_MGR].reset(new MgrMonitor(*this, *paxos, "mgr"));
248 paxos_service[PAXOS_MGRSTAT].reset(new MgrStatMonitor(*this, *paxos, "mgrstat"));
249 paxos_service[PAXOS_HEALTH].reset(new HealthMonitor(*this, *paxos, "health"));
250 paxos_service[PAXOS_CONFIG].reset(new ConfigMonitor(*this, *paxos, "config"));
251 paxos_service[PAXOS_KV].reset(new KVMonitor(*this, *paxos, "kv"));
7c673cae 252
11fdf7f2
TL
253 bool r = mon_caps.parse("allow *", NULL);
254 ceph_assert(r);
7c673cae
FG
255
256 exited_quorum = ceph_clock_now();
257
d2e6a577 258 // prepare local commands
11fdf7f2
TL
259 local_mon_commands.resize(std::size(mon_commands));
260 for (unsigned i = 0; i < std::size(mon_commands); ++i) {
d2e6a577
FG
261 local_mon_commands[i] = mon_commands[i];
262 }
263 MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);
264
11fdf7f2
TL
265 prenautilus_local_mon_commands = local_mon_commands;
266 for (auto& i : prenautilus_local_mon_commands) {
267 std::string n = cmddesc_get_prenautilus_compat(i.cmdstring);
268 if (n != i.cmdstring) {
269 dout(20) << " pre-nautilus cmd " << i.cmdstring << " -> " << n << dendl;
270 i.cmdstring = n;
271 }
d2e6a577 272 }
11fdf7f2 273 MonCommand::encode_vector(prenautilus_local_mon_commands, prenautilus_local_mon_commands_bl);
d2e6a577 274
7c673cae
FG
275 // assume our commands until we have an election. this only means
276 // we won't reply with EINVAL before the election; any command that
277 // actually matters will wait until we have quorum etc and then
278 // retry (and revalidate).
d2e6a577 279 leader_mon_commands = local_mon_commands;
7c673cae
FG
280}
281
7c673cae
FG
282Monitor::~Monitor()
283{
11fdf7f2
TL
284 op_tracker.on_shutdown();
285
f67539c2 286 delete logger;
11fdf7f2 287 ceph_assert(session_map.sessions.empty());
7c673cae
FG
288}
289
290
291class AdminHook : public AdminSocketHook {
292 Monitor *mon;
293public:
294 explicit AdminHook(Monitor *m) : mon(m) {}
9f95a23c
TL
295 int call(std::string_view command, const cmdmap_t& cmdmap,
296 Formatter *f,
297 std::ostream& errss,
298 bufferlist& out) override {
299 stringstream outss;
300 int r = mon->do_admin_command(command, cmdmap, f, errss, outss);
301 out.append(outss);
302 return r;
7c673cae
FG
303 }
304};
305
9f95a23c
TL
306int Monitor::do_admin_command(
307 std::string_view command,
308 const cmdmap_t& cmdmap,
309 Formatter *f,
310 std::ostream& err,
311 std::ostream& out)
7c673cae 312{
11fdf7f2 313 std::lock_guard l(lock);
7c673cae 314
9f95a23c 315 int r = 0;
7c673cae 316 string args;
11fdf7f2 317 for (auto p = cmdmap.begin();
7c673cae
FG
318 p != cmdmap.end(); ++p) {
319 if (p->first == "prefix")
320 continue;
321 if (!args.empty())
322 args += ", ";
323 args += cmd_vartype_stringify(p->second);
324 }
325 args = "[" + args + "]";
326
327 bool read_only = (command == "mon_status" ||
328 command == "mon metadata" ||
329 command == "quorum_status" ||
224ce89b
WB
330 command == "ops" ||
331 command == "sessions");
7c673cae
FG
332
333 (read_only ? audit_clog->debug() : audit_clog->info())
334 << "from='admin socket' entity='admin socket' "
335 << "cmd='" << command << "' args=" << args << ": dispatch";
336
337 if (command == "mon_status") {
9f95a23c 338 get_mon_status(f);
7c673cae 339 } else if (command == "quorum_status") {
9f95a23c 340 _quorum_status(f, out);
7c673cae
FG
341 } else if (command == "sync_force") {
342 string validate;
9f95a23c 343 if ((!cmd_getval(cmdmap, "validate", validate)) ||
7c673cae 344 (validate != "--yes-i-really-mean-it")) {
9f95a23c
TL
345 err << "are you SURE? this will mean the monitor store will be erased "
346 "the next time the monitor is restarted. pass "
347 "'--yes-i-really-mean-it' if you really do.";
348 r = -EPERM;
7c673cae
FG
349 goto abort;
350 }
9f95a23c 351 sync_force(f);
11fdf7f2
TL
352 } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0 ||
353 command.compare(0, 24, "add_bootstrap_peer_hintv") == 0) {
9f95a23c 354 if (!_add_bootstrap_peer_hint(command, cmdmap, out))
7c673cae
FG
355 goto abort;
356 } else if (command == "quorum enter") {
357 elector.start_participating();
358 start_election();
9f95a23c 359 out << "started responding to quorum, initiated new election";
7c673cae
FG
360 } else if (command == "quorum exit") {
361 start_election();
362 elector.stop_participating();
9f95a23c 363 out << "stopped responding to quorum, initiated new election";
7c673cae 364 } else if (command == "ops") {
9f95a23c 365 (void)op_tracker.dump_ops_in_flight(f);
224ce89b 366 } else if (command == "sessions") {
9f95a23c
TL
367 f->open_array_section("sessions");
368 for (auto p : session_map.sessions) {
369 f->dump_object("session", *p);
224ce89b 370 }
9f95a23c 371 f->close_section();
11fdf7f2 372 } else if (command == "dump_historic_ops") {
9f95a23c
TL
373 if (!op_tracker.dump_historic_ops(f)) {
374 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
11fdf7f2
TL
375 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
376 }
377 } else if (command == "dump_historic_ops_by_duration" ) {
9f95a23c
TL
378 if (op_tracker.dump_historic_ops(f, true)) {
379 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
11fdf7f2
TL
380 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
381 }
382 } else if (command == "dump_historic_slow_ops") {
9f95a23c
TL
383 if (op_tracker.dump_historic_slow_ops(f, {})) {
384 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
11fdf7f2
TL
385 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
386 }
9f95a23c
TL
387 } else if (command == "quorum") {
388 string quorumcmd;
389 cmd_getval(cmdmap, "quorumcmd", quorumcmd);
390 if (quorumcmd == "exit") {
391 start_election();
392 elector.stop_participating();
393 out << "stopped responding to quorum, initiated new election" << std::endl;
394 } else if (quorumcmd == "enter") {
395 elector.start_participating();
396 start_election();
397 out << "started responding to quorum, initiated new election" << std::endl;
398 } else {
399 err << "needs a valid 'quorum' command" << std::endl;
400 }
f67539c2
TL
401 } else if (command == "connection scores dump") {
402 if (!get_quorum_mon_features().contains_all(
403 ceph::features::mon::FEATURE_PINGING)) {
404 err << "Not all monitors support changing election strategies; \
405 please upgrade them first!";
406 }
407 elector.dump_connection_scores(f);
408 } else if (command == "connection scores reset") {
409 if (!get_quorum_mon_features().contains_all(
410 ceph::features::mon::FEATURE_PINGING)) {
411 err << "Not all monitors support changing election strategies; \
412 please upgrade them first!";
413 }
414 elector.notify_clear_peer_state();
9f95a23c
TL
415 } else if (command == "smart") {
416 string want_devid;
417 cmd_getval(cmdmap, "devid", want_devid);
418
419 string devname = store->get_devname();
420 set<string> devnames;
421 get_raw_devices(devname, &devnames);
422 json_spirit::mObject json_map;
423 uint64_t smart_timeout = cct->_conf.get_val<uint64_t>(
424 "mon_smart_report_timeout");
425 for (auto& devname : devnames) {
426 string err;
427 string devid = get_device_id(devname, &err);
428 if (want_devid.size() && want_devid != devid) {
429 derr << "get_device_id failed on " << devname << ": " << err << dendl;
430 continue;
431 }
432 json_spirit::mValue smart_json;
433 if (block_device_get_metrics(devname, smart_timeout,
434 &smart_json)) {
435 dout(10) << "block_device_get_metrics failed for /dev/" << devname
436 << dendl;
437 continue;
438 }
439 json_map[devid] = smart_json;
440 }
441 json_spirit::write(json_map, out, json_spirit::pretty_print);
442 } else if (command == "heap") {
443 if (!ceph_using_tcmalloc()) {
444 err << "could not issue heap profiler command -- not using tcmalloc!";
445 r = -EOPNOTSUPP;
446 goto abort;
447 }
448 string cmd;
449 if (!cmd_getval(cmdmap, "heapcmd", cmd)) {
450 err << "unable to get value for command \"" << cmd << "\"";
451 r = -EINVAL;
452 goto abort;
453 }
454 std::vector<std::string> cmd_vec;
455 get_str_vec(cmd, cmd_vec);
456 string val;
457 if (cmd_getval(cmdmap, "value", val)) {
458 cmd_vec.push_back(val);
459 }
460 ceph_heap_profiler_handle_command(cmd_vec, out);
461 } else if (command == "compact") {
462 dout(1) << "triggering manual compaction" << dendl;
463 auto start = ceph::coarse_mono_clock::now();
464 store->compact_async();
465 auto end = ceph::coarse_mono_clock::now();
466 auto duration = ceph::to_seconds<double>(end - start);
467 dout(1) << "finished manual compaction in "
468 << duration << " seconds" << dendl;
469 out << "compacted " << g_conf().get_val<std::string>("mon_keyvaluedb")
470 << " in " << duration << " seconds";
471 } else {
11fdf7f2 472 ceph_abort_msg("bad AdminSocket command binding");
7c673cae
FG
473 }
474 (read_only ? audit_clog->debug() : audit_clog->info())
475 << "from='admin socket' "
476 << "entity='admin socket' "
477 << "cmd=" << command << " "
478 << "args=" << args << ": finished";
9f95a23c 479 return r;
7c673cae
FG
480
481abort:
482 (read_only ? audit_clog->debug() : audit_clog->info())
483 << "from='admin socket' "
484 << "entity='admin socket' "
485 << "cmd=" << command << " "
486 << "args=" << args << ": aborted";
9f95a23c 487 return r;
7c673cae
FG
488}
489
490void Monitor::handle_signal(int signum)
491{
11fdf7f2 492 ceph_assert(signum == SIGINT || signum == SIGTERM);
7c673cae
FG
493 derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
494 shutdown();
495}
496
497CompatSet Monitor::get_initial_supported_features()
498{
499 CompatSet::FeatureSet ceph_mon_feature_compat;
500 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
501 CompatSet::FeatureSet ceph_mon_feature_incompat;
502 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
503 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
504 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
505 ceph_mon_feature_incompat);
506}
507
508CompatSet Monitor::get_supported_features()
509{
510 CompatSet compat = get_initial_supported_features();
511 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
512 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
513 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
514 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
515 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
181888fb 516 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
11fdf7f2
TL
517 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
518 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
9f95a23c 519 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS);
f67539c2 520 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_PACIFIC);
7c673cae
FG
521 return compat;
522}
523
524CompatSet Monitor::get_legacy_features()
525{
526 CompatSet::FeatureSet ceph_mon_feature_compat;
527 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
528 CompatSet::FeatureSet ceph_mon_feature_incompat;
529 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
530 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
531 ceph_mon_feature_incompat);
532}
533
534int Monitor::check_features(MonitorDBStore *store)
535{
536 CompatSet required = get_supported_features();
537 CompatSet ondisk;
538
539 read_features_off_disk(store, &ondisk);
540
541 if (!required.writeable(ondisk)) {
542 CompatSet diff = required.unsupported(ondisk);
543 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
544 return -EPERM;
545 }
546
547 return 0;
548}
549
550void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
551{
552 bufferlist featuresbl;
553 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
554 if (featuresbl.length() == 0) {
555 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
556 << "Assuming it is old-style and introducing one." << dendl;
557 //we only want the baseline ~v.18 features assumed to be on disk.
558 //If new features are introduced this code needs to disappear or
559 //be made smarter.
560 *features = get_legacy_features();
561
562 features->encode(featuresbl);
563 auto t(std::make_shared<MonitorDBStore::Transaction>());
564 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
565 store->apply_transaction(t);
566 } else {
11fdf7f2 567 auto it = featuresbl.cbegin();
7c673cae
FG
568 features->decode(it);
569 }
570}
571
572void Monitor::read_features()
573{
574 read_features_off_disk(store, &features);
575 dout(10) << "features " << features << dendl;
576
577 calc_quorum_requirements();
578 dout(10) << "required_features " << required_features << dendl;
579}
580
581void Monitor::write_features(MonitorDBStore::TransactionRef t)
582{
583 bufferlist bl;
584 features.encode(bl);
585 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
586}
587
588const char** Monitor::get_tracked_conf_keys() const
589{
590 static const char* KEYS[] = {
591 "crushtool", // helpful for testing
592 "mon_election_timeout",
593 "mon_lease",
594 "mon_lease_renew_interval_factor",
595 "mon_lease_ack_timeout_factor",
596 "mon_accept_timeout_factor",
597 // clog & admin clog
598 "clog_to_monitors",
599 "clog_to_syslog",
600 "clog_to_syslog_facility",
601 "clog_to_syslog_level",
602 "clog_to_graylog",
603 "clog_to_graylog_host",
604 "clog_to_graylog_port",
605 "host",
606 "fsid",
607 // periodic health to clog
608 "mon_health_to_clog",
609 "mon_health_to_clog_interval",
610 "mon_health_to_clog_tick_interval",
611 // scrub interval
612 "mon_scrub_interval",
11fdf7f2
TL
613 "mon_allow_pool_delete",
614 // osdmap pruning - observed, not handled.
615 "mon_osdmap_full_prune_enabled",
616 "mon_osdmap_full_prune_min",
617 "mon_osdmap_full_prune_interval",
618 "mon_osdmap_full_prune_txsize",
619 // debug options - observed, not handled
620 "mon_debug_extra_checks",
621 "mon_debug_block_osdmap_trim",
7c673cae
FG
622 NULL
623 };
624 return KEYS;
625}
626
11fdf7f2 627void Monitor::handle_conf_change(const ConfigProxy& conf,
7c673cae
FG
628 const std::set<std::string> &changed)
629{
630 sanitize_options();
631
632 dout(10) << __func__ << " " << changed << dendl;
633
634 if (changed.count("clog_to_monitors") ||
635 changed.count("clog_to_syslog") ||
636 changed.count("clog_to_syslog_level") ||
637 changed.count("clog_to_syslog_facility") ||
638 changed.count("clog_to_graylog") ||
639 changed.count("clog_to_graylog_host") ||
640 changed.count("clog_to_graylog_port") ||
641 changed.count("host") ||
642 changed.count("fsid")) {
643 update_log_clients();
644 }
645
646 if (changed.count("mon_health_to_clog") ||
647 changed.count("mon_health_to_clog_interval") ||
648 changed.count("mon_health_to_clog_tick_interval")) {
9f95a23c
TL
649 finisher.queue(new C_MonContext{this, [this, changed](int) {
650 std::lock_guard l{lock};
651 health_to_clog_update_conf(changed);
652 }});
7c673cae
FG
653 }
654
655 if (changed.count("mon_scrub_interval")) {
f67539c2
TL
656 auto scrub_interval =
657 conf.get_val<std::chrono::seconds>("mon_scrub_interval");
9f95a23c
TL
658 finisher.queue(new C_MonContext{this, [this, scrub_interval](int) {
659 std::lock_guard l{lock};
494da23a 660 scrub_update_interval(scrub_interval);
9f95a23c 661 }});
7c673cae
FG
662 }
663}
664
665void Monitor::update_log_clients()
666{
667 map<string,string> log_to_monitors;
668 map<string,string> log_to_syslog;
669 map<string,string> log_channel;
670 map<string,string> log_prio;
671 map<string,string> log_to_graylog;
672 map<string,string> log_to_graylog_host;
673 map<string,string> log_to_graylog_port;
674 uuid_d fsid;
675 string host;
676
677 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
678 log_channel, log_prio, log_to_graylog,
679 log_to_graylog_host, log_to_graylog_port,
680 fsid, host))
681 return;
682
683 clog->update_config(log_to_monitors, log_to_syslog,
684 log_channel, log_prio, log_to_graylog,
685 log_to_graylog_host, log_to_graylog_port,
686 fsid, host);
687
688 audit_clog->update_config(log_to_monitors, log_to_syslog,
689 log_channel, log_prio, log_to_graylog,
690 log_to_graylog_host, log_to_graylog_port,
691 fsid, host);
692}
693
694int Monitor::sanitize_options()
695{
696 int r = 0;
697
698 // mon_lease must be greater than mon_lease_renewal; otherwise we
699 // may incur in leases expiring before they are renewed.
11fdf7f2 700 if (g_conf()->mon_lease_renew_interval_factor >= 1.0) {
7c673cae 701 clog->error() << "mon_lease_renew_interval_factor ("
11fdf7f2 702 << g_conf()->mon_lease_renew_interval_factor
7c673cae
FG
703 << ") must be less than 1.0";
704 r = -EINVAL;
705 }
706
707 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
708 // got time to renew the lease and get an ack for it. Having both options
709 // with the same value, for a given small vale, could mean timing out if
710 // the monitors happened to be overloaded -- or even under normal load for
711 // a small enough value.
11fdf7f2 712 if (g_conf()->mon_lease_ack_timeout_factor <= 1.0) {
7c673cae 713 clog->error() << "mon_lease_ack_timeout_factor ("
11fdf7f2 714 << g_conf()->mon_lease_ack_timeout_factor
7c673cae
FG
715 << ") must be greater than 1.0";
716 r = -EINVAL;
717 }
718
719 return r;
720}
721
722int Monitor::preinit()
723{
9f95a23c 724 std::unique_lock l(lock);
7c673cae
FG
725
726 dout(1) << "preinit fsid " << monmap->fsid << dendl;
727
728 int r = sanitize_options();
729 if (r < 0) {
730 derr << "option sanitization failed!" << dendl;
7c673cae
FG
731 return r;
732 }
733
11fdf7f2 734 ceph_assert(!logger);
7c673cae
FG
735 {
736 PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
3efd9988
FG
737 pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
738 PerfCountersBuilder::PRIO_USEFUL);
739 pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions",
740 "sadd", PerfCountersBuilder::PRIO_INTERESTING);
741 pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions",
742 "srm", PerfCountersBuilder::PRIO_INTERESTING);
743 pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions",
744 "strm", PerfCountersBuilder::PRIO_USEFUL);
745 pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in",
746 "ecnt", PerfCountersBuilder::PRIO_USEFUL);
747 pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started",
748 "estt", PerfCountersBuilder::PRIO_INTERESTING);
749 pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won",
750 "ewon", PerfCountersBuilder::PRIO_INTERESTING);
751 pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost",
752 "elst", PerfCountersBuilder::PRIO_INTERESTING);
7c673cae
FG
753 logger = pcb.create_perf_counters();
754 cct->get_perfcounters_collection()->add(logger);
755 }
756
11fdf7f2 757 ceph_assert(!cluster_logger);
7c673cae
FG
758 {
759 PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
760 pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
761 pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
762 pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
763 pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
764 pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
765 pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
11fdf7f2
TL
766 pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster", NULL, 0, unit_t(UNIT_BYTES));
767 pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space", NULL, 0, unit_t(UNIT_BYTES));
768 pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space", NULL, 0, unit_t(UNIT_BYTES));
7c673cae
FG
769 pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
770 pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
771 pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
772 pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
773 pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
774 pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
775 pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
776 pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
777 pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
11fdf7f2 778 pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects", NULL, 0, unit_t(UNIT_BYTES));
7c673cae
FG
779 cluster_logger = pcb.create_perf_counters();
780 }
781
782 paxos->init_logger();
783
784 // verify cluster_uuid
785 {
786 int r = check_fsid();
787 if (r == -ENOENT)
788 r = write_fsid();
789 if (r < 0) {
7c673cae
FG
790 return r;
791 }
792 }
793
794 // open compatset
795 read_features();
796
797 // have we ever joined a quorum?
798 has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
799 dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;
800
801 if (!has_ever_joined) {
802 // impose initial quorum restrictions?
803 list<string> initial_members;
11fdf7f2 804 get_str_list(g_conf()->mon_initial_members, initial_members);
7c673cae
FG
805
806 if (!initial_members.empty()) {
807 dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
808
11fdf7f2
TL
809 monmap->set_initial_members(
810 g_ceph_context, initial_members, name, messenger->get_myaddrs(),
811 &extra_probe_peers);
7c673cae
FG
812
813 dout(10) << " monmap is " << *monmap << dendl;
814 dout(10) << " extra probe peers " << extra_probe_peers << dendl;
815 }
816 } else if (!monmap->contains(name)) {
817 derr << "not in monmap and have been in a quorum before; "
818 << "must have been removed" << dendl;
11fdf7f2 819 if (g_conf()->mon_force_quorum_join) {
7c673cae
FG
820 dout(0) << "we should have died but "
821 << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
822 } else {
823 derr << "commit suicide!" << dendl;
7c673cae
FG
824 return -ENOENT;
825 }
826 }
827
828 {
829 // We have a potentially inconsistent store state in hands. Get rid of it
830 // and start fresh.
831 bool clear_store = false;
832 if (store->exists("mon_sync", "in_sync")) {
833 dout(1) << __func__ << " clean up potentially inconsistent store state"
834 << dendl;
835 clear_store = true;
836 }
837
838 if (store->get("mon_sync", "force_sync") > 0) {
839 dout(1) << __func__ << " force sync by clearing store state" << dendl;
840 clear_store = true;
841 }
842
843 if (clear_store) {
844 set<string> sync_prefixes = get_sync_targets_names();
845 store->clear(sync_prefixes);
846 }
847 }
848
849 sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
850 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
851
852 init_paxos();
7c673cae
FG
853
854 if (is_keyring_required()) {
855 // we need to bootstrap authentication keys so we can form an
856 // initial quorum.
857 if (authmon()->get_last_committed() == 0) {
858 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
859 bufferlist bl;
860 int err = store->get("mkfs", "keyring", bl);
861 if (err == 0 && bl.length() > 0) {
862 // Attempt to decode and extract keyring only if it is found.
863 KeyRing keyring;
11fdf7f2
TL
864 auto p = bl.cbegin();
865 decode(keyring, p);
7c673cae
FG
866 extract_save_mon_key(keyring);
867 }
868 }
869
11fdf7f2 870 string keyring_loc = g_conf()->mon_data + "/keyring";
7c673cae
FG
871
872 r = keyring.load(cct, keyring_loc);
873 if (r < 0) {
874 EntityName mon_name;
875 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
876 EntityAuth mon_key;
877 if (key_server.get_auth(mon_name, mon_key)) {
878 dout(1) << "copying mon. key from old db to external keyring" << dendl;
879 keyring.add(mon_name, mon_key);
880 bufferlist bl;
881 keyring.encode_plaintext(bl);
882 write_default_keyring(bl);
883 } else {
11fdf7f2 884 derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
7c673cae
FG
885 return r;
886 }
887 }
888 }
889
890 admin_hook = new AdminHook(this);
891 AdminSocket* admin_socket = cct->get_admin_socket();
892
893 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
9f95a23c
TL
894 l.unlock();
895 // register tell/asock commands
896 for (const auto& command : local_mon_commands) {
897 if (!command.is_tell()) {
898 continue;
899 }
900 const auto prefix = cmddesc_get_prefix(command.cmdstring);
901 if (prefix == "injectargs" ||
902 prefix == "version" ||
903 prefix == "tell") {
904 // not registered by me
905 continue;
906 }
907 r = admin_socket->register_command(command.cmdstring, admin_hook,
908 command.helpstring);
909 ceph_assert(r == 0);
910 }
911 l.lock();
7c673cae
FG
912
913 // add ourselves as a conf observer
11fdf7f2
TL
914 g_conf().add_observer(this);
915
916 messenger->set_auth_client(this);
917 messenger->set_auth_server(this);
918 mgr_messenger->set_auth_client(this);
919
920 auth_registry.refresh_config();
7c673cae 921
7c673cae
FG
922 return 0;
923}
924
925int Monitor::init()
926{
927 dout(2) << "init" << dendl;
11fdf7f2 928 std::lock_guard l(lock);
7c673cae
FG
929
930 finisher.start();
931
932 // start ticker
933 timer.init();
934 new_tick();
935
936 cpu_tp.start();
937
938 // i'm ready!
939 messenger->add_dispatcher_tail(this);
940
11fdf7f2 941 // kickstart pet mgrclient
7c673cae
FG
942 mgr_client.init();
943 mgr_messenger->add_dispatcher_tail(&mgr_client);
944 mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls
11fdf7f2 945 mgrmon()->prime_mgr_client();
7c673cae 946
11fdf7f2 947 state = STATE_PROBING;
7c673cae 948 bootstrap();
b5b8bbf5
FG
949 // add features of myself into feature_map
950 session_map.feature_map.add_mon(con_self->get_features());
7c673cae
FG
951 return 0;
952}
953
954void Monitor::init_paxos()
955{
956 dout(10) << __func__ << dendl;
957 paxos->init();
958
959 // init services
11fdf7f2
TL
960 for (auto& svc : paxos_service) {
961 svc->init();
7c673cae
FG
962 }
963
964 refresh_from_paxos(NULL);
965}
966
967void Monitor::refresh_from_paxos(bool *need_bootstrap)
968{
969 dout(10) << __func__ << dendl;
970
971 bufferlist bl;
972 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
973 if (r >= 0) {
974 try {
11fdf7f2
TL
975 auto p = bl.cbegin();
976 decode(fingerprint, p);
7c673cae 977 }
f67539c2 978 catch (ceph::buffer::error& e) {
7c673cae
FG
979 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
980 }
981 } else {
982 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
983 }
984
11fdf7f2
TL
985 for (auto& svc : paxos_service) {
986 svc->refresh(need_bootstrap);
7c673cae 987 }
11fdf7f2
TL
988 for (auto& svc : paxos_service) {
989 svc->post_refresh();
7c673cae 990 }
224ce89b 991 load_metadata();
7c673cae
FG
992}
993
994void Monitor::register_cluster_logger()
995{
996 if (!cluster_logger_registered) {
997 dout(10) << "register_cluster_logger" << dendl;
998 cluster_logger_registered = true;
999 cct->get_perfcounters_collection()->add(cluster_logger);
1000 } else {
1001 dout(10) << "register_cluster_logger - already registered" << dendl;
1002 }
1003}
1004
1005void Monitor::unregister_cluster_logger()
1006{
1007 if (cluster_logger_registered) {
1008 dout(10) << "unregister_cluster_logger" << dendl;
1009 cluster_logger_registered = false;
1010 cct->get_perfcounters_collection()->remove(cluster_logger);
1011 } else {
1012 dout(10) << "unregister_cluster_logger - not registered" << dendl;
1013 }
1014}
1015
1016void Monitor::update_logger()
1017{
1018 cluster_logger->set(l_cluster_num_mon, monmap->size());
1019 cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
1020}
1021
1022void Monitor::shutdown()
1023{
1024 dout(1) << "shutdown" << dendl;
1025
9f95a23c 1026 lock.lock();
7c673cae
FG
1027
1028 wait_for_paxos_write();
1029
11fdf7f2
TL
1030 {
1031 std::lock_guard l(auth_lock);
1032 authmon()->_set_mon_num_rank(0, 0);
1033 }
1034
7c673cae
FG
1035 state = STATE_SHUTDOWN;
1036
9f95a23c 1037 lock.unlock();
11fdf7f2 1038 g_conf().remove_observer(this);
9f95a23c 1039 lock.lock();
7c673cae
FG
1040
1041 if (admin_hook) {
11fdf7f2 1042 cct->get_admin_socket()->unregister_commands(admin_hook);
7c673cae
FG
1043 delete admin_hook;
1044 admin_hook = NULL;
1045 }
1046
1047 elector.shutdown();
1048
1049 mgr_client.shutdown();
1050
9f95a23c 1051 lock.unlock();
7c673cae
FG
1052 finisher.wait_for_empty();
1053 finisher.stop();
9f95a23c 1054 lock.lock();
7c673cae
FG
1055
1056 // clean up
1057 paxos->shutdown();
11fdf7f2
TL
1058 for (auto& svc : paxos_service) {
1059 svc->shutdown();
1060 }
7c673cae
FG
1061
1062 finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
1063 finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
1064
1065 timer.shutdown();
1066
1067 cpu_tp.stop();
1068
1069 remove_all_sessions();
1070
f64942e4
AA
1071 log_client.shutdown();
1072
1073 // unlock before msgr shutdown...
9f95a23c 1074 lock.unlock();
f64942e4
AA
1075
1076 // shutdown messenger before removing logger from perfcounter collection,
1077 // otherwise _ms_dispatch() will try to update deleted logger
1078 messenger->shutdown();
1079 mgr_messenger->shutdown();
1080
7c673cae
FG
1081 if (logger) {
1082 cct->get_perfcounters_collection()->remove(logger);
7c673cae
FG
1083 }
1084 if (cluster_logger) {
1085 if (cluster_logger_registered)
1086 cct->get_perfcounters_collection()->remove(cluster_logger);
1087 delete cluster_logger;
1088 cluster_logger = NULL;
1089 }
7c673cae
FG
1090}
1091
1092void Monitor::wait_for_paxos_write()
1093{
1094 if (paxos->is_writing() || paxos->is_writing_previous()) {
1095 dout(10) << __func__ << " flushing pending write" << dendl;
9f95a23c 1096 lock.unlock();
7c673cae 1097 store->flush();
9f95a23c 1098 lock.lock();
7c673cae
FG
1099 dout(10) << __func__ << " flushed pending write" << dendl;
1100 }
1101}
1102
11fdf7f2
TL
1103void Monitor::respawn()
1104{
1105 // --- WARNING TO FUTURE COPY/PASTERS ---
1106 // You must also add a call like
1107 //
1108 // ceph_pthread_setname(pthread_self(), "ceph-mon");
1109 //
1110 // to main() so that /proc/$pid/stat field 2 contains "(ceph-mon)"
1111 // instead of "(exe)", so that killall (and log rotation) will work.
1112
1113 dout(0) << __func__ << dendl;
1114
1115 char *new_argv[orig_argc+1];
1116 dout(1) << " e: '" << orig_argv[0] << "'" << dendl;
1117 for (int i=0; i<orig_argc; i++) {
1118 new_argv[i] = (char *)orig_argv[i];
1119 dout(1) << " " << i << ": '" << orig_argv[i] << "'" << dendl;
1120 }
1121 new_argv[orig_argc] = NULL;
1122
1123 /* Determine the path to our executable, test if Linux /proc/self/exe exists.
1124 * This allows us to exec the same executable even if it has since been
1125 * unlinked.
1126 */
1127 char exe_path[PATH_MAX] = "";
1128#ifdef PROCPREFIX
1129 if (readlink(PROCPREFIX "/proc/self/exe", exe_path, PATH_MAX-1) != -1) {
1130 dout(1) << "respawning with exe " << exe_path << dendl;
1131 strcpy(exe_path, PROCPREFIX "/proc/self/exe");
1132 } else {
1133#else
1134 {
1135#endif
1136 /* Print CWD for the user's interest */
1137 char buf[PATH_MAX];
1138 char *cwd = getcwd(buf, sizeof(buf));
1139 ceph_assert(cwd);
1140 dout(1) << " cwd " << cwd << dendl;
1141
1142 /* Fall back to a best-effort: just running in our CWD */
1143 strncpy(exe_path, orig_argv[0], PATH_MAX-1);
1144 }
1145
1146 dout(1) << " exe_path " << exe_path << dendl;
1147
1148 unblock_all_signals(NULL);
1149 execv(exe_path, new_argv);
1150
1151 dout(0) << "respawn execv " << orig_argv[0]
1152 << " failed with " << cpp_strerror(errno) << dendl;
1153
1154 // We have to assert out here, because suicide() returns, and callers
1155 // to respawn expect it never to return.
1156 ceph_abort();
1157}
1158
7c673cae
FG
1159void Monitor::bootstrap()
1160{
1161 dout(10) << "bootstrap" << dendl;
1162 wait_for_paxos_write();
1163
1164 sync_reset_requester();
1165 unregister_cluster_logger();
1166 cancel_probe_timeout();
1167
11fdf7f2
TL
1168 if (monmap->get_epoch() == 0) {
1169 dout(10) << "reverting to legacy ranks for seed monmap (epoch 0)" << dendl;
1170 monmap->calc_legacy_ranks();
1171 }
1172 dout(10) << "monmap " << *monmap << dendl;
9f95a23c
TL
1173 {
1174 auto from_release = monmap->min_mon_release;
1175 ostringstream err;
1176 if (!can_upgrade_from(from_release, "min_mon_release", err)) {
1177 derr << "current monmap has " << err.str() << " stopping." << dendl;
1178 exit(0);
1179 }
11fdf7f2 1180 }
7c673cae 1181 // note my rank
11fdf7f2 1182 int newrank = monmap->get_rank(messenger->get_myaddrs());
7c673cae
FG
1183 if (newrank < 0 && rank >= 0) {
1184 // was i ever part of the quorum?
1185 if (has_ever_joined) {
1186 dout(0) << " removed from monmap, suicide." << dendl;
1187 exit(0);
1188 }
f67539c2 1189 elector.notify_clear_peer_state();
7c673cae 1190 }
11fdf7f2
TL
1191 if (newrank >= 0 &&
1192 monmap->get_addrs(newrank) != messenger->get_myaddrs()) {
1193 dout(0) << " monmap addrs for rank " << newrank << " changed, i am "
1194 << messenger->get_myaddrs()
1195 << ", monmap is " << monmap->get_addrs(newrank) << ", respawning"
1196 << dendl;
9f95a23c
TL
1197
1198 if (monmap->get_epoch()) {
1199 // store this map in temp mon_sync location so that we use it on
1200 // our next startup
1201 derr << " stashing newest monmap " << monmap->get_epoch()
1202 << " for next startup" << dendl;
1203 bufferlist bl;
1204 monmap->encode(bl, -1);
1205 auto t(std::make_shared<MonitorDBStore::Transaction>());
1206 t->put("mon_sync", "temp_newer_monmap", bl);
1207 store->apply_transaction(t);
1208 }
1209
11fdf7f2
TL
1210 respawn();
1211 }
7c673cae
FG
1212 if (newrank != rank) {
1213 dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
1214 messenger->set_myname(entity_name_t::MON(newrank));
1215 rank = newrank;
f67539c2 1216 elector.notify_rank_changed(rank);
7c673cae
FG
1217
1218 // reset all connections, or else our peers will think we are someone else.
1219 messenger->mark_down_all();
1220 }
1221
1222 // reset
1223 state = STATE_PROBING;
1224
1225 _reset();
1226
1227 // sync store
11fdf7f2 1228 if (g_conf()->mon_compact_on_bootstrap) {
7c673cae
FG
1229 dout(10) << "bootstrap -- triggering compaction" << dendl;
1230 store->compact();
1231 dout(10) << "bootstrap -- finished compaction" << dendl;
1232 }
1233
f67539c2
TL
1234 // stretch mode bits
1235 set_elector_disallowed_leaders(false);
1236
7c673cae
FG
1237 // singleton monitor?
1238 if (monmap->size() == 1 && rank == 0) {
1239 win_standalone_election();
1240 return;
1241 }
1242
1243 reset_probe_timeout();
1244
1245 // i'm outside the quorum
1246 if (monmap->contains(name))
1247 outside_quorum.insert(name);
1248
1249 // probe monitors
1250 dout(10) << "probing other monitors" << dendl;
1251 for (unsigned i = 0; i < monmap->size(); i++) {
1252 if ((int)i != rank)
11fdf7f2
TL
1253 send_mon_message(
1254 new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
1255 ceph_release()),
1256 i);
1257 }
1258 for (auto& av : extra_probe_peers) {
1259 if (av != messenger->get_myaddrs()) {
1260 messenger->send_to_mon(
1261 new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
1262 ceph_release()),
1263 av);
7c673cae
FG
1264 }
1265 }
1266}
1267
11fdf7f2
TL
1268bool Monitor::_add_bootstrap_peer_hint(std::string_view cmd,
1269 const cmdmap_t& cmdmap,
1270 ostream& ss)
7c673cae 1271{
7c673cae
FG
1272 if (is_leader() || is_peon()) {
1273 ss << "mon already active; ignoring bootstrap hint";
1274 return true;
1275 }
1276
11fdf7f2
TL
1277 entity_addrvec_t addrs;
1278 string addrstr;
9f95a23c 1279 if (cmd_getval(cmdmap, "addr", addrstr)) {
11fdf7f2
TL
1280 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' addr '"
1281 << addrstr << "'" << dendl;
1282
1283 entity_addr_t addr;
1284 const char *end = 0;
1285 if (!addr.parse(addrstr.c_str(), &end, entity_addr_t::TYPE_ANY)) {
1286 ss << "failed to parse addrs '" << addrstr
1287 << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1288 return false;
1289 }
1290
1291 addrs.v.push_back(addr);
1292 if (addr.get_port() == 0) {
1293 addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
1294 addrs.v[0].set_port(CEPH_MON_PORT_IANA);
1295 addrs.v.push_back(addr);
1296 addrs.v[1].set_type(entity_addr_t::TYPE_LEGACY);
1297 addrs.v[1].set_port(CEPH_MON_PORT_LEGACY);
1298 } else if (addr.get_type() == entity_addr_t::TYPE_ANY) {
1299 if (addr.get_port() == CEPH_MON_PORT_LEGACY) {
1300 addrs.v[0].set_type(entity_addr_t::TYPE_LEGACY);
1301 } else {
1302 addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
1303 }
1304 }
9f95a23c 1305 } else if (cmd_getval(cmdmap, "addrv", addrstr)) {
11fdf7f2
TL
1306 dout(10) << "_add_bootstrap_peer_hintv '" << cmd << "' addrv '"
1307 << addrstr << "'" << dendl;
1308 const char *end = 0;
1309 if (!addrs.parse(addrstr.c_str(), &end)) {
1310 ss << "failed to parse addrs '" << addrstr
1311 << "'; syntax is 'add_bootstrap_peer_hintv v2:ip:port[,v1:ip:port]'";
1312 return false;
1313 }
1314 } else {
1315 ss << "no addr or addrv provided";
1316 return false;
1317 }
7c673cae 1318
11fdf7f2
TL
1319 extra_probe_peers.insert(addrs);
1320 ss << "adding peer " << addrs << " to list: " << extra_probe_peers;
7c673cae
FG
1321 return true;
1322}
1323
1324// called by bootstrap(), or on leader|peon -> electing
1325void Monitor::_reset()
1326{
1327 dout(10) << __func__ << dendl;
1328
11fdf7f2
TL
1329 // disable authentication
1330 {
1331 std::lock_guard l(auth_lock);
1332 authmon()->_set_mon_num_rank(0, 0);
1333 }
1334
7c673cae
FG
1335 cancel_probe_timeout();
1336 timecheck_finish();
1337 health_events_cleanup();
181888fb 1338 health_check_log_times.clear();
7c673cae
FG
1339 scrub_event_cancel();
1340
1341 leader_since = utime_t();
11fdf7f2 1342 quorum_since = {};
7c673cae
FG
1343 if (!quorum.empty()) {
1344 exited_quorum = ceph_clock_now();
1345 }
1346 quorum.clear();
1347 outside_quorum.clear();
31f18b77 1348 quorum_feature_map.clear();
7c673cae
FG
1349
1350 scrub_reset();
1351
1352 paxos->restart();
1353
11fdf7f2
TL
1354 for (auto& svc : paxos_service) {
1355 svc->restart();
1356 }
7c673cae
FG
1357}
1358
1359
1360// -----------------------------------------------------------
1361// sync
1362
1363set<string> Monitor::get_sync_targets_names()
1364{
1365 set<string> targets;
1366 targets.insert(paxos->get_name());
11fdf7f2
TL
1367 for (auto& svc : paxos_service) {
1368 svc->get_store_prefixes(targets);
1369 }
7c673cae
FG
1370 return targets;
1371}
1372
1373
1374void Monitor::sync_timeout()
1375{
1376 dout(10) << __func__ << dendl;
11fdf7f2 1377 ceph_assert(state == STATE_SYNCHRONIZING);
7c673cae
FG
1378 bootstrap();
1379}
1380
1381void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1382{
1383 dout(1) << __func__ << dendl;
1384
1385 MonMap latest_monmap;
1386
1387 // Grab latest monmap from MonmapMonitor
1388 bufferlist monmon_bl;
1389 int err = monmon()->get_monmap(monmon_bl);
1390 if (err < 0) {
1391 if (err != -ENOENT) {
1392 derr << __func__
1393 << " something wrong happened while reading the store: "
1394 << cpp_strerror(err) << dendl;
11fdf7f2 1395 ceph_abort_msg("error reading the store");
7c673cae
FG
1396 }
1397 } else {
1398 latest_monmap.decode(monmon_bl);
1399 }
1400
1401 // Grab last backed up monmap (if any) and compare epochs
1402 if (store->exists("mon_sync", "latest_monmap")) {
1403 bufferlist backup_bl;
1404 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1405 if (err < 0) {
1406 derr << __func__
1407 << " something wrong happened while reading the store: "
1408 << cpp_strerror(err) << dendl;
11fdf7f2 1409 ceph_abort_msg("error reading the store");
7c673cae 1410 }
11fdf7f2 1411 ceph_assert(backup_bl.length() > 0);
7c673cae
FG
1412
1413 MonMap backup_monmap;
1414 backup_monmap.decode(backup_bl);
1415
1416 if (backup_monmap.epoch > latest_monmap.epoch)
1417 latest_monmap = backup_monmap;
1418 }
1419
1420 // Check if our current monmap's epoch is greater than the one we've
1421 // got so far.
1422 if (monmap->epoch > latest_monmap.epoch)
1423 latest_monmap = *monmap;
1424
1425 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1426
1427 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1428}
1429
1430void Monitor::sync_reset_requester()
1431{
1432 dout(10) << __func__ << dendl;
1433
1434 if (sync_timeout_event) {
1435 timer.cancel_event(sync_timeout_event);
1436 sync_timeout_event = NULL;
1437 }
1438
11fdf7f2 1439 sync_provider = entity_addrvec_t();
7c673cae
FG
1440 sync_cookie = 0;
1441 sync_full = false;
1442 sync_start_version = 0;
1443}
1444
1445void Monitor::sync_reset_provider()
1446{
1447 dout(10) << __func__ << dendl;
1448 sync_providers.clear();
1449}
1450
11fdf7f2 1451void Monitor::sync_start(entity_addrvec_t &addrs, bool full)
7c673cae 1452{
11fdf7f2 1453 dout(10) << __func__ << " " << addrs << (full ? " full" : " recent") << dendl;
7c673cae 1454
11fdf7f2 1455 ceph_assert(state == STATE_PROBING ||
7c673cae
FG
1456 state == STATE_SYNCHRONIZING);
1457 state = STATE_SYNCHRONIZING;
1458
1459 // make sure are not a provider for anyone!
1460 sync_reset_provider();
1461
1462 sync_full = full;
1463
1464 if (sync_full) {
1465 // stash key state, and mark that we are syncing
1466 auto t(std::make_shared<MonitorDBStore::Transaction>());
1467 sync_stash_critical_state(t);
1468 t->put("mon_sync", "in_sync", 1);
1469
11fdf7f2 1470 sync_last_committed_floor = std::max(sync_last_committed_floor, paxos->get_version());
7c673cae
FG
1471 dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
1472 << sync_last_committed_floor << dendl;
1473 t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
1474
1475 store->apply_transaction(t);
1476
11fdf7f2 1477 ceph_assert(g_conf()->mon_sync_requester_kill_at != 1);
7c673cae
FG
1478
1479 // clear the underlying store
1480 set<string> targets = get_sync_targets_names();
1481 dout(10) << __func__ << " clearing prefixes " << targets << dendl;
1482 store->clear(targets);
1483
1484 // make sure paxos knows it has been reset. this prevents a
1485 // bootstrap and then different probe reply order from possibly
1486 // deciding a partial or no sync is needed.
1487 paxos->init();
1488
11fdf7f2 1489 ceph_assert(g_conf()->mon_sync_requester_kill_at != 2);
7c673cae
FG
1490 }
1491
1492 // assume 'other' as the leader. We will update the leader once we receive
1493 // a reply to the sync start.
11fdf7f2 1494 sync_provider = addrs;
7c673cae
FG
1495
1496 sync_reset_timeout();
1497
1498 MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
1499 if (!sync_full)
1500 m->last_committed = paxos->get_version();
11fdf7f2 1501 messenger->send_to_mon(m, sync_provider);
7c673cae
FG
1502}
1503
1504void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1505{
1506 dout(10) << __func__ << dendl;
1507 bufferlist backup_monmap;
1508 sync_obtain_latest_monmap(backup_monmap);
11fdf7f2 1509 ceph_assert(backup_monmap.length() > 0);
7c673cae
FG
1510 t->put("mon_sync", "latest_monmap", backup_monmap);
1511}
1512
1513void Monitor::sync_reset_timeout()
1514{
1515 dout(10) << __func__ << dendl;
1516 if (sync_timeout_event)
1517 timer.cancel_event(sync_timeout_event);
3efd9988 1518 sync_timeout_event = timer.add_event_after(
11fdf7f2 1519 g_conf()->mon_sync_timeout,
9f95a23c 1520 new C_MonContext{this, [this](int) {
3efd9988 1521 sync_timeout();
9f95a23c 1522 }});
7c673cae
FG
1523}
1524
1525void Monitor::sync_finish(version_t last_committed)
1526{
1527 dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
1528
11fdf7f2 1529 ceph_assert(g_conf()->mon_sync_requester_kill_at != 7);
7c673cae
FG
1530
1531 if (sync_full) {
1532 // finalize the paxos commits
1533 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1534 paxos->read_and_prepare_transactions(tx, sync_start_version,
1535 last_committed);
1536 tx->put(paxos->get_name(), "last_committed", last_committed);
1537
1538 dout(30) << __func__ << " final tx dump:\n";
1539 JSONFormatter f(true);
1540 tx->dump(&f);
1541 f.flush(*_dout);
1542 *_dout << dendl;
1543
1544 store->apply_transaction(tx);
1545 }
1546
11fdf7f2 1547 ceph_assert(g_conf()->mon_sync_requester_kill_at != 8);
7c673cae
FG
1548
1549 auto t(std::make_shared<MonitorDBStore::Transaction>());
1550 t->erase("mon_sync", "in_sync");
1551 t->erase("mon_sync", "force_sync");
1552 t->erase("mon_sync", "last_committed_floor");
1553 store->apply_transaction(t);
1554
11fdf7f2 1555 ceph_assert(g_conf()->mon_sync_requester_kill_at != 9);
7c673cae
FG
1556
1557 init_paxos();
1558
11fdf7f2 1559 ceph_assert(g_conf()->mon_sync_requester_kill_at != 10);
7c673cae
FG
1560
1561 bootstrap();
1562}
1563
1564void Monitor::handle_sync(MonOpRequestRef op)
1565{
9f95a23c 1566 auto m = op->get_req<MMonSync>();
7c673cae
FG
1567 dout(10) << __func__ << " " << *m << dendl;
1568 switch (m->op) {
1569
1570 // provider ---------
1571
1572 case MMonSync::OP_GET_COOKIE_FULL:
1573 case MMonSync::OP_GET_COOKIE_RECENT:
1574 handle_sync_get_cookie(op);
1575 break;
1576 case MMonSync::OP_GET_CHUNK:
1577 handle_sync_get_chunk(op);
1578 break;
1579
1580 // client -----------
1581
1582 case MMonSync::OP_COOKIE:
1583 handle_sync_cookie(op);
1584 break;
1585
1586 case MMonSync::OP_CHUNK:
1587 case MMonSync::OP_LAST_CHUNK:
1588 handle_sync_chunk(op);
1589 break;
1590 case MMonSync::OP_NO_COOKIE:
1591 handle_sync_no_cookie(op);
1592 break;
1593
1594 default:
1595 dout(0) << __func__ << " unknown op " << m->op << dendl;
11fdf7f2 1596 ceph_abort_msg("unknown op");
7c673cae
FG
1597 }
1598}
1599
1600// leader
1601
1602void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1603{
9f95a23c 1604 auto m = op->get_req<MMonSync>();
7c673cae
FG
1605 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1606 m->get_connection()->send_message(reply);
1607}
1608
1609void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1610{
9f95a23c 1611 auto m = op->get_req<MMonSync>();
7c673cae
FG
1612 if (is_synchronizing()) {
1613 _sync_reply_no_cookie(op);
1614 return;
1615 }
1616
11fdf7f2 1617 ceph_assert(g_conf()->mon_sync_provider_kill_at != 1);
7c673cae
FG
1618
1619 // make sure they can understand us.
1620 if ((required_features ^ m->get_connection()->get_features()) &
1621 required_features) {
1622 dout(5) << " ignoring peer mon." << m->get_source().num()
1623 << " has features " << std::hex
1624 << m->get_connection()->get_features()
1625 << " but we require " << required_features << std::dec << dendl;
1626 return;
1627 }
1628
1629 // make up a unique cookie. include election epoch (which persists
1630 // across restarts for the whole cluster) and a counter for this
1631 // process instance. there is no need to be unique *across*
1632 // monitors, though.
1633 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
11fdf7f2 1634 ceph_assert(sync_providers.count(cookie) == 0);
7c673cae
FG
1635
1636 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1637
1638 SyncProvider& sp = sync_providers[cookie];
1639 sp.cookie = cookie;
11fdf7f2
TL
1640 sp.addrs = m->get_source_addrs();
1641 sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
7c673cae
FG
1642
1643 set<string> sync_targets;
1644 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1645 // full scan
1646 sync_targets = get_sync_targets_names();
1647 sp.last_committed = paxos->get_version();
1648 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1649 sp.full = true;
1650 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1651 } else {
1652 // just catch up paxos
1653 sp.last_committed = m->last_committed;
1654 }
1655 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1656
1657 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1658 reply->last_committed = sp.last_committed;
1659 m->get_connection()->send_message(reply);
1660}
1661
1662void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
1663{
9f95a23c 1664 auto m = op->get_req<MMonSync>();
7c673cae
FG
1665 dout(10) << __func__ << " " << *m << dendl;
1666
1667 if (sync_providers.count(m->cookie) == 0) {
1668 dout(10) << __func__ << " no cookie " << m->cookie << dendl;
1669 _sync_reply_no_cookie(op);
1670 return;
1671 }
1672
11fdf7f2 1673 ceph_assert(g_conf()->mon_sync_provider_kill_at != 2);
7c673cae
FG
1674
1675 SyncProvider& sp = sync_providers[m->cookie];
11fdf7f2 1676 sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
7c673cae
FG
1677
1678 if (sp.last_committed < paxos->get_first_committed() &&
1679 paxos->get_first_committed() > 1) {
1680 dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
1681 << " < our fc " << paxos->get_first_committed() << dendl;
1682 sync_providers.erase(m->cookie);
1683 _sync_reply_no_cookie(op);
1684 return;
1685 }
1686
1687 MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
1688 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1689
9f95a23c
TL
1690 int bytes_left = g_conf()->mon_sync_max_payload_size;
1691 int keys_left = g_conf()->mon_sync_max_payload_keys;
1692 while (sp.last_committed < paxos->get_version() &&
1693 bytes_left > 0 &&
1694 keys_left > 0) {
7c673cae
FG
1695 bufferlist bl;
1696 sp.last_committed++;
224ce89b
WB
1697
1698 int err = store->get(paxos->get_name(), sp.last_committed, bl);
11fdf7f2 1699 ceph_assert(err == 0);
224ce89b 1700
7c673cae 1701 tx->put(paxos->get_name(), sp.last_committed, bl);
9f95a23c
TL
1702 bytes_left -= bl.length();
1703 --keys_left;
7c673cae
FG
1704 dout(20) << __func__ << " including paxos state " << sp.last_committed
1705 << dendl;
1706 }
1707 reply->last_committed = sp.last_committed;
1708
9f95a23c
TL
1709 if (sp.full && bytes_left > 0 && keys_left > 0) {
1710 sp.synchronizer->get_chunk_tx(tx, bytes_left, keys_left);
7c673cae
FG
1711 sp.last_key = sp.synchronizer->get_last_key();
1712 reply->last_key = sp.last_key;
1713 }
1714
1715 if ((sp.full && sp.synchronizer->has_next_chunk()) ||
1716 sp.last_committed < paxos->get_version()) {
1717 dout(10) << __func__ << " chunk, through version " << sp.last_committed
1718 << " key " << sp.last_key << dendl;
1719 } else {
1720 dout(10) << __func__ << " last chunk, through version " << sp.last_committed
1721 << " key " << sp.last_key << dendl;
1722 reply->op = MMonSync::OP_LAST_CHUNK;
1723
11fdf7f2 1724 ceph_assert(g_conf()->mon_sync_provider_kill_at != 3);
7c673cae
FG
1725
1726 // clean up our local state
1727 sync_providers.erase(sp.cookie);
1728 }
1729
11fdf7f2 1730 encode(*tx, reply->chunk_bl);
7c673cae
FG
1731
1732 m->get_connection()->send_message(reply);
1733}
1734
1735// requester
1736
1737void Monitor::handle_sync_cookie(MonOpRequestRef op)
1738{
9f95a23c 1739 auto m = op->get_req<MMonSync>();
7c673cae
FG
1740 dout(10) << __func__ << " " << *m << dendl;
1741 if (sync_cookie) {
1742 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1743 return;
1744 }
11fdf7f2 1745 if (m->get_source_addrs() != sync_provider) {
7c673cae
FG
1746 dout(10) << __func__ << " source does not match, discarding" << dendl;
1747 return;
1748 }
1749 sync_cookie = m->cookie;
1750 sync_start_version = m->last_committed;
1751
1752 sync_reset_timeout();
1753 sync_get_next_chunk();
1754
11fdf7f2 1755 ceph_assert(g_conf()->mon_sync_requester_kill_at != 3);
7c673cae
FG
1756}
1757
1758void Monitor::sync_get_next_chunk()
1759{
1760 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
11fdf7f2
TL
1761 if (g_conf()->mon_inject_sync_get_chunk_delay > 0) {
1762 dout(20) << __func__ << " injecting delay of " << g_conf()->mon_inject_sync_get_chunk_delay << dendl;
1763 usleep((long long)(g_conf()->mon_inject_sync_get_chunk_delay * 1000000.0));
7c673cae
FG
1764 }
1765 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
11fdf7f2 1766 messenger->send_to_mon(r, sync_provider);
7c673cae 1767
11fdf7f2 1768 ceph_assert(g_conf()->mon_sync_requester_kill_at != 4);
7c673cae
FG
1769}
1770
1771void Monitor::handle_sync_chunk(MonOpRequestRef op)
1772{
9f95a23c 1773 auto m = op->get_req<MMonSync>();
7c673cae
FG
1774 dout(10) << __func__ << " " << *m << dendl;
1775
1776 if (m->cookie != sync_cookie) {
1777 dout(10) << __func__ << " cookie does not match, discarding" << dendl;
1778 return;
1779 }
11fdf7f2 1780 if (m->get_source_addrs() != sync_provider) {
7c673cae
FG
1781 dout(10) << __func__ << " source does not match, discarding" << dendl;
1782 return;
1783 }
1784
11fdf7f2
TL
1785 ceph_assert(state == STATE_SYNCHRONIZING);
1786 ceph_assert(g_conf()->mon_sync_requester_kill_at != 5);
7c673cae
FG
1787
1788 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1789 tx->append_from_encoded(m->chunk_bl);
1790
1791 dout(30) << __func__ << " tx dump:\n";
1792 JSONFormatter f(true);
1793 tx->dump(&f);
1794 f.flush(*_dout);
1795 *_dout << dendl;
1796
1797 store->apply_transaction(tx);
1798
11fdf7f2 1799 ceph_assert(g_conf()->mon_sync_requester_kill_at != 6);
7c673cae
FG
1800
1801 if (!sync_full) {
1802 dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
1803 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1804 paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
1805 m->last_committed);
1806 tx->put(paxos->get_name(), "last_committed", m->last_committed);
1807
1808 dout(30) << __func__ << " tx dump:\n";
1809 JSONFormatter f(true);
1810 tx->dump(&f);
1811 f.flush(*_dout);
1812 *_dout << dendl;
1813
1814 store->apply_transaction(tx);
1815 paxos->init(); // to refresh what we just wrote
1816 }
1817
1818 if (m->op == MMonSync::OP_CHUNK) {
1819 sync_reset_timeout();
1820 sync_get_next_chunk();
1821 } else if (m->op == MMonSync::OP_LAST_CHUNK) {
1822 sync_finish(m->last_committed);
1823 }
1824}
1825
1826void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1827{
1828 dout(10) << __func__ << dendl;
1829 bootstrap();
1830}
1831
1832void Monitor::sync_trim_providers()
1833{
1834 dout(20) << __func__ << dendl;
1835
1836 utime_t now = ceph_clock_now();
1837 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1838 while (p != sync_providers.end()) {
1839 if (now > p->second.timeout) {
11fdf7f2
TL
1840 dout(10) << __func__ << " expiring cookie " << p->second.cookie
1841 << " for " << p->second.addrs << dendl;
7c673cae
FG
1842 sync_providers.erase(p++);
1843 } else {
1844 ++p;
1845 }
1846 }
1847}
1848
1849// ---------------------------------------------------
1850// probe
1851
1852void Monitor::cancel_probe_timeout()
1853{
1854 if (probe_timeout_event) {
1855 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1856 timer.cancel_event(probe_timeout_event);
1857 probe_timeout_event = NULL;
1858 } else {
1859 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1860 }
1861}
1862
1863void Monitor::reset_probe_timeout()
1864{
1865 cancel_probe_timeout();
9f95a23c 1866 probe_timeout_event = new C_MonContext{this, [this](int r) {
7c673cae 1867 probe_timeout(r);
9f95a23c 1868 }};
11fdf7f2 1869 double t = g_conf()->mon_probe_timeout;
3efd9988
FG
1870 if (timer.add_event_after(t, probe_timeout_event)) {
1871 dout(10) << "reset_probe_timeout " << probe_timeout_event
1872 << " after " << t << " seconds" << dendl;
1873 } else {
1874 probe_timeout_event = nullptr;
1875 }
7c673cae
FG
1876}
1877
1878void Monitor::probe_timeout(int r)
1879{
1880 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
11fdf7f2
TL
1881 ceph_assert(is_probing() || is_synchronizing());
1882 ceph_assert(probe_timeout_event);
7c673cae
FG
1883 probe_timeout_event = NULL;
1884 bootstrap();
1885}
1886
1887void Monitor::handle_probe(MonOpRequestRef op)
1888{
9f95a23c 1889 auto m = op->get_req<MMonProbe>();
7c673cae
FG
1890 dout(10) << "handle_probe " << *m << dendl;
1891
1892 if (m->fsid != monmap->fsid) {
1893 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1894 return;
1895 }
1896
1897 switch (m->op) {
1898 case MMonProbe::OP_PROBE:
1899 handle_probe_probe(op);
1900 break;
1901
1902 case MMonProbe::OP_REPLY:
1903 handle_probe_reply(op);
1904 break;
1905
1906 case MMonProbe::OP_MISSING_FEATURES:
81eedcae
TL
1907 derr << __func__ << " require release " << (int)m->mon_release << " > "
1908 << (int)ceph_release()
1909 << ", or missing features (have " << CEPH_FEATURES_ALL
7c673cae 1910 << ", required " << m->required_features
11fdf7f2 1911 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL) << ")"
7c673cae
FG
1912 << dendl;
1913 break;
1914 }
1915}
1916
1917void Monitor::handle_probe_probe(MonOpRequestRef op)
1918{
9f95a23c 1919 auto m = op->get_req<MMonProbe>();
7c673cae
FG
1920
1921 dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
1922 << " features " << m->get_connection()->get_features() << dendl;
1923 uint64_t missing = required_features & ~m->get_connection()->get_features();
9f95a23c
TL
1924 if ((m->mon_release != ceph_release_t::unknown &&
1925 m->mon_release < monmap->min_mon_release) ||
81eedcae
TL
1926 missing) {
1927 dout(1) << " peer " << m->get_source_addr()
9f95a23c
TL
1928 << " release " << m->mon_release
1929 << " < min_mon_release " << monmap->min_mon_release
11fdf7f2
TL
1930 << ", or missing features " << missing << dendl;
1931 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1932 name, has_ever_joined, monmap->min_mon_release);
1933 m->required_features = required_features;
1934 m->get_connection()->send_message(r);
7c673cae
FG
1935 goto out;
1936 }
1937
1938 if (!is_probing() && !is_synchronizing()) {
1939 // If the probing mon is way ahead of us, we need to re-bootstrap.
1940 // Normally we capture this case when we initially bootstrap, but
1941 // it is possible we pass those checks (we overlap with
1942 // quorum-to-be) but fail to join a quorum before it moves past
1943 // us. We need to be kicked back to bootstrap so we can
1944 // synchonize, not keep calling elections.
1945 if (paxos->get_version() + 1 < m->paxos_first_version) {
1946 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1947 << "ahead of us, re-bootstrapping" << dendl;
1948 bootstrap();
1949 goto out;
1950
1951 }
1952 }
1953
1954 MMonProbe *r;
11fdf7f2
TL
1955 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined,
1956 ceph_release());
7c673cae
FG
1957 r->name = name;
1958 r->quorum = quorum;
b3b6e05e 1959 r->leader = leader;
7c673cae
FG
1960 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1961 r->paxos_first_version = paxos->get_first_committed();
1962 r->paxos_last_version = paxos->get_version();
1963 m->get_connection()->send_message(r);
1964
1965 // did we discover a peer here?
1966 if (!monmap->contains(m->get_source_addr())) {
11fdf7f2 1967 dout(1) << " adding peer " << m->get_source_addrs()
7c673cae 1968 << " to list of hints" << dendl;
11fdf7f2 1969 extra_probe_peers.insert(m->get_source_addrs());
f67539c2
TL
1970 } else {
1971 elector.begin_peer_ping(monmap->get_rank(m->get_source_addr()));
7c673cae
FG
1972 }
1973
1974 out:
1975 return;
1976}
1977
1978void Monitor::handle_probe_reply(MonOpRequestRef op)
1979{
9f95a23c 1980 auto m = op->get_req<MMonProbe>();
11fdf7f2
TL
1981 dout(10) << "handle_probe_reply " << m->get_source_inst()
1982 << " " << *m << dendl;
7c673cae
FG
1983 dout(10) << " monmap is " << *monmap << dendl;
1984
1985 // discover name and addrs during probing or electing states.
1986 if (!is_probing() && !is_electing()) {
1987 return;
1988 }
1989
1990 // newer map, or they've joined a quorum and we haven't?
1991 bufferlist mybl;
1992 monmap->encode(mybl, m->get_connection()->get_features());
1993 // make sure it's actually different; the checks below err toward
1994 // taking the other guy's map, which could cause us to loop.
1995 if (!mybl.contents_equal(m->monmap_bl)) {
1996 MonMap *newmap = new MonMap;
1997 newmap->decode(m->monmap_bl);
1998 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1999 !has_ever_joined)) {
2000 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
2001 << ", mine was " << monmap->get_epoch() << dendl;
2002 delete newmap;
2003 monmap->decode(m->monmap_bl);
b3b6e05e 2004 notify_new_monmap(false);
7c673cae
FG
2005
2006 bootstrap();
2007 return;
2008 }
2009 delete newmap;
2010 }
2011
2012 // rename peer?
2013 string peer_name = monmap->get_name(m->get_source_addr());
2014 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
2015 dout(10) << " renaming peer " << m->get_source_addr() << " "
2016 << peer_name << " -> " << m->name << " in my monmap"
2017 << dendl;
2018 monmap->rename(peer_name, m->name);
2019
2020 if (is_electing()) {
2021 bootstrap();
2022 return;
2023 }
11fdf7f2 2024 } else if (peer_name.size()) {
7c673cae 2025 dout(10) << " peer name is " << peer_name << dendl;
11fdf7f2
TL
2026 } else {
2027 dout(10) << " peer " << m->get_source_addr() << " not in map" << dendl;
7c673cae
FG
2028 }
2029
2030 // new initial peer?
2031 if (monmap->get_epoch() == 0 &&
2032 monmap->contains(m->name) &&
11fdf7f2
TL
2033 monmap->get_addrs(m->name).front().is_blank_ip()) {
2034 dout(1) << " learned initial mon " << m->name
2035 << " addrs " << m->get_source_addrs() << dendl;
2036 monmap->set_addrvec(m->name, m->get_source_addrs());
7c673cae
FG
2037
2038 bootstrap();
2039 return;
2040 }
2041
2042 // end discover phase
2043 if (!is_probing()) {
2044 return;
2045 }
2046
11fdf7f2 2047 ceph_assert(paxos != NULL);
7c673cae
FG
2048
2049 if (is_synchronizing()) {
2050 dout(10) << " currently syncing" << dendl;
2051 return;
2052 }
2053
11fdf7f2 2054 entity_addrvec_t other = m->get_source_addrs();
7c673cae
FG
2055
2056 if (m->paxos_last_version < sync_last_committed_floor) {
2057 dout(10) << " peer paxos versions [" << m->paxos_first_version
2058 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
2059 << sync_last_committed_floor << ", ignoring"
2060 << dendl;
2061 } else {
2062 if (paxos->get_version() < m->paxos_first_version &&
2063 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
2064 dout(10) << " peer paxos first versions [" << m->paxos_first_version
2065 << "," << m->paxos_last_version << "]"
2066 << " vs my version " << paxos->get_version()
2067 << " (too far ahead)"
2068 << dendl;
2069 cancel_probe_timeout();
2070 sync_start(other, true);
2071 return;
2072 }
11fdf7f2 2073 if (paxos->get_version() + g_conf()->paxos_max_join_drift < m->paxos_last_version) {
7c673cae
FG
2074 dout(10) << " peer paxos last version " << m->paxos_last_version
2075 << " vs my version " << paxos->get_version()
2076 << " (too far ahead)"
2077 << dendl;
2078 cancel_probe_timeout();
2079 sync_start(other, false);
2080 return;
2081 }
2082 }
2083
11fdf7f2
TL
2084 // did the existing cluster complete upgrade to luminous?
2085 if (osdmon()->osdmap.get_epoch()) {
9f95a23c 2086 if (osdmon()->osdmap.require_osd_release < ceph_release_t::luminous) {
11fdf7f2
TL
2087 derr << __func__ << " existing cluster has not completed upgrade to"
2088 << " luminous; 'ceph osd require_osd_release luminous' before"
2089 << " upgrading" << dendl;
2090 exit(0);
2091 }
2092 if (!osdmon()->osdmap.test_flag(CEPH_OSDMAP_PURGED_SNAPDIRS) ||
2093 !osdmon()->osdmap.test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
2094 derr << __func__ << " existing cluster has not completed a full luminous"
2095 << " scrub to purge legacy snapdir objects; please scrub before"
2096 << " upgrading beyond luminous." << dendl;
2097 exit(0);
2098 }
2099 }
2100
7c673cae
FG
2101 // is there an existing quorum?
2102 if (m->quorum.size()) {
2103 dout(10) << " existing quorum " << m->quorum << dendl;
2104
2105 dout(10) << " peer paxos version " << m->paxos_last_version
2106 << " vs my version " << paxos->get_version()
2107 << " (ok)"
2108 << dendl;
b3b6e05e
TL
2109 bool in_map = false;
2110 const auto my_info = monmap->mon_info.find(name);
2111 const map<string,string> *map_crush_loc{nullptr};
2112 if (my_info != monmap->mon_info.end()) {
2113 in_map = true;
2114 map_crush_loc = &my_info->second.crush_loc;
2115 }
2116 if (in_map &&
2117 !monmap->get_addrs(name).front().is_blank_ip() &&
2118 (!need_set_crush_loc || (*map_crush_loc == crush_loc))) {
7c673cae
FG
2119 // i'm part of the cluster; just initiate a new election
2120 start_election();
2121 } else {
b3b6e05e
TL
2122 dout(10) << " ready to join, but i'm not in the monmap/"
2123 "my addr is blank/location is wrong, trying to join" << dendl;
2124 send_mon_message(new MMonJoin(monmap->fsid, name,
2125 messenger->get_myaddrs(), crush_loc,
2126 need_set_crush_loc),
2127 m->leader);
7c673cae
FG
2128 }
2129 } else {
2130 if (monmap->contains(m->name)) {
2131 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
2132 outside_quorum.insert(m->name);
2133 } else {
2134 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
2135 return;
2136 }
2137
11fdf7f2 2138 unsigned need = monmap->min_quorum_size();
7c673cae
FG
2139 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
2140 if (outside_quorum.size() >= need) {
2141 if (outside_quorum.count(name)) {
2142 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
2143 start_election();
2144 } else {
2145 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
2146 }
2147 } else {
2148 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
2149 }
2150 }
2151}
2152
2153void Monitor::join_election()
2154{
2155 dout(10) << __func__ << dendl;
2156 wait_for_paxos_write();
2157 _reset();
2158 state = STATE_ELECTING;
2159
2160 logger->inc(l_mon_num_elections);
2161}
2162
2163void Monitor::start_election()
2164{
2165 dout(10) << "start_election" << dendl;
2166 wait_for_paxos_write();
2167 _reset();
2168 state = STATE_ELECTING;
2169
2170 logger->inc(l_mon_num_elections);
2171 logger->inc(l_mon_election_call);
2172
b32b8144 2173 clog->info() << "mon." << name << " calling monitor election";
7c673cae
FG
2174 elector.call_election();
2175}
2176
2177void Monitor::win_standalone_election()
2178{
2179 dout(1) << "win_standalone_election" << dendl;
2180
2181 // bump election epoch, in case the previous epoch included other
2182 // monitors; we need to be able to make the distinction.
9f95a23c 2183 elector.declare_standalone_victory();
7c673cae
FG
2184
2185 rank = monmap->get_rank(name);
11fdf7f2 2186 ceph_assert(rank == 0);
7c673cae
FG
2187 set<int> q;
2188 q.insert(rank);
2189
224ce89b
WB
2190 map<int,Metadata> metadata;
2191 collect_metadata(&metadata[0]);
2192
7c673cae
FG
2193 win_election(elector.get_epoch(), q,
2194 CEPH_FEATURES_ALL,
2195 ceph::features::mon::get_supported(),
11fdf7f2 2196 ceph_release(),
d2e6a577 2197 metadata);
7c673cae
FG
2198}
2199
2200const utime_t& Monitor::get_leader_since() const
2201{
11fdf7f2 2202 ceph_assert(state == STATE_LEADER);
7c673cae
FG
2203 return leader_since;
2204}
2205
2206epoch_t Monitor::get_epoch()
2207{
2208 return elector.get_epoch();
2209}
2210
2211void Monitor::_finish_svc_election()
2212{
11fdf7f2 2213 ceph_assert(state == STATE_LEADER || state == STATE_PEON);
7c673cae 2214
11fdf7f2 2215 for (auto& svc : paxos_service) {
7c673cae 2216 // we already called election_finished() on monmon(); avoid callig twice
11fdf7f2 2217 if (state == STATE_LEADER && svc.get() == monmon())
7c673cae 2218 continue;
11fdf7f2 2219 svc->election_finished();
7c673cae
FG
2220 }
2221}
2222
9f95a23c 2223void Monitor::win_election(epoch_t epoch, const set<int>& active, uint64_t features,
7c673cae 2224 const mon_feature_t& mon_features,
9f95a23c 2225 ceph_release_t min_mon_release,
d2e6a577 2226 const map<int,Metadata>& metadata)
7c673cae
FG
2227{
2228 dout(10) << __func__ << " epoch " << epoch << " quorum " << active
2229 << " features " << features
2230 << " mon_features " << mon_features
11fdf7f2 2231 << " min_mon_release " << min_mon_release
7c673cae 2232 << dendl;
11fdf7f2 2233 ceph_assert(is_electing());
7c673cae
FG
2234 state = STATE_LEADER;
2235 leader_since = ceph_clock_now();
11fdf7f2 2236 quorum_since = mono_clock::now();
7c673cae
FG
2237 leader = rank;
2238 quorum = active;
2239 quorum_con_features = features;
2240 quorum_mon_features = mon_features;
11fdf7f2 2241 quorum_min_mon_release = min_mon_release;
224ce89b 2242 pending_metadata = metadata;
7c673cae
FG
2243 outside_quorum.clear();
2244
b32b8144
FG
2245 clog->info() << "mon." << name << " is new leader, mons " << get_quorum_names()
2246 << " in quorum (ranks " << quorum << ")";
7c673cae 2247
d2e6a577 2248 set_leader_commands(get_local_commands(mon_features));
7c673cae
FG
2249
2250 paxos->leader_init();
2251 // NOTE: tell monmap monitor first. This is important for the
2252 // bootstrap case to ensure that the very first paxos proposal
2253 // codifies the monmap. Otherwise any manner of chaos can ensue
2254 // when monitors are call elections or participating in a paxos
2255 // round without agreeing on who the participants are.
2256 monmon()->election_finished();
2257 _finish_svc_election();
7c673cae
FG
2258
2259 logger->inc(l_mon_election_win);
2260
224ce89b
WB
2261 // inject new metadata in first transaction.
2262 {
2263 // include previous metadata for missing mons (that aren't part of
2264 // the current quorum).
2265 map<int,Metadata> m = metadata;
2266 for (unsigned rank = 0; rank < monmap->size(); ++rank) {
2267 if (m.count(rank) == 0 &&
2268 mon_metadata.count(rank)) {
2269 m[rank] = mon_metadata[rank];
2270 }
2271 }
2272
2273 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
2274 // a new transaction immediately after the election finishes. We should
2275 // do that anyway for other reasons, though.
2276 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
2277 bufferlist bl;
11fdf7f2 2278 encode(m, bl);
224ce89b
WB
2279 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
2280 }
2281
7c673cae
FG
2282 finish_election();
2283 if (monmap->size() > 1 &&
2284 monmap->get_epoch() > 0) {
2285 timecheck_start();
2286 health_tick_start();
b32b8144
FG
2287
2288 // Freshen the health status before doing health_to_clog in case
2289 // our just-completed election changed the health
9f95a23c 2290 healthmon()->wait_for_active_ctx(new LambdaContext([this](int r){
b32b8144
FG
2291 dout(20) << "healthmon now active" << dendl;
2292 healthmon()->tick();
2293 if (healthmon()->is_proposing()) {
2294 dout(20) << __func__ << " healthmon proposing, waiting" << dendl;
9f95a23c 2295 healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext{this,
b32b8144 2296 [this](int r){
9f95a23c 2297 ceph_assert(ceph_mutex_is_locked_by_me(lock));
b32b8144 2298 do_health_to_clog_interval();
9f95a23c 2299 }});
b32b8144
FG
2300
2301 } else {
2302 do_health_to_clog_interval();
2303 }
2304 }));
2305
7c673cae
FG
2306 scrub_event_start();
2307 }
7c673cae
FG
2308}
2309
2310void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
2311 uint64_t features,
11fdf7f2 2312 const mon_feature_t& mon_features,
9f95a23c 2313 ceph_release_t min_mon_release)
7c673cae
FG
2314{
2315 state = STATE_PEON;
2316 leader_since = utime_t();
11fdf7f2 2317 quorum_since = mono_clock::now();
7c673cae
FG
2318 leader = l;
2319 quorum = q;
2320 outside_quorum.clear();
2321 quorum_con_features = features;
2322 quorum_mon_features = mon_features;
11fdf7f2 2323 quorum_min_mon_release = min_mon_release;
7c673cae
FG
2324 dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
2325 << " quorum is " << quorum << " features are " << quorum_con_features
2326 << " mon_features are " << quorum_mon_features
11fdf7f2 2327 << " min_mon_release " << min_mon_release
7c673cae
FG
2328 << dendl;
2329
2330 paxos->peon_init();
2331 _finish_svc_election();
7c673cae
FG
2332
2333 logger->inc(l_mon_election_lose);
2334
2335 finish_election();
11fdf7f2 2336}
7c673cae 2337
11fdf7f2
TL
2338namespace {
2339std::string collect_compression_algorithms()
2340{
2341 ostringstream os;
2342 bool printed = false;
2343 for (auto [name, key] : Compressor::compression_algorithms) {
2344 if (printed) {
2345 os << ", ";
2346 } else {
2347 printed = true;
2348 }
2349 std::ignore = key;
2350 os << name;
7c673cae 2351 }
11fdf7f2
TL
2352 return os.str();
2353}
7c673cae
FG
2354}
2355
224ce89b
WB
2356void Monitor::collect_metadata(Metadata *m)
2357{
2358 collect_sys_info(m, g_ceph_context);
11fdf7f2
TL
2359 (*m)["addrs"] = stringify(messenger->get_myaddrs());
2360 (*m)["compression_algorithms"] = collect_compression_algorithms();
2361
2362 // infer storage device
2363 string devname = store->get_devname();
2364 set<string> devnames;
2365 get_raw_devices(devname, &devnames);
9f95a23c
TL
2366 map<string,string> errs;
2367 get_device_metadata(devnames, m, &errs);
2368 for (auto& i : errs) {
2369 dout(1) << __func__ << " " << i.first << ": " << i.second << dendl;
11fdf7f2 2370 }
224ce89b
WB
2371}
2372
7c673cae
FG
2373void Monitor::finish_election()
2374{
2375 apply_quorum_to_compatset_features();
2376 apply_monmap_to_compatset_features();
2377 timecheck_finish();
2378 exited_quorum = utime_t();
2379 finish_contexts(g_ceph_context, waitfor_quorum);
2380 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2381 resend_routed_requests();
2382 update_logger();
2383 register_cluster_logger();
2384
11fdf7f2
TL
2385 // enable authentication
2386 {
2387 std::lock_guard l(auth_lock);
2388 authmon()->_set_mon_num_rank(monmap->size(), rank);
2389 }
2390
b3b6e05e 2391 // am i named and located properly?
11fdf7f2 2392 string cur_name = monmap->get_name(messenger->get_myaddrs());
b3b6e05e
TL
2393 const auto my_infop = monmap->mon_info.find(cur_name);
2394 const map<string,string>& map_crush_loc = my_infop->second.crush_loc;
2395
2396 if (cur_name != name ||
2397 (need_set_crush_loc && map_crush_loc != crush_loc)) {
2398 dout(10) << " renaming/moving myself from " << cur_name << "/"
2399 << map_crush_loc <<" -> " << name << "/" << crush_loc << dendl;
2400 send_mon_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddrs(),
2401 crush_loc, need_set_crush_loc),
2402 leader);
f67539c2 2403 return;
7c673cae 2404 }
f67539c2 2405 do_stretch_mode_election_work();
7c673cae
FG
2406}
2407
2408void Monitor::_apply_compatset_features(CompatSet &new_features)
2409{
2410 if (new_features.compare(features) != 0) {
2411 CompatSet diff = features.unsupported(new_features);
2412 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2413 features = new_features;
2414
2415 auto t = std::make_shared<MonitorDBStore::Transaction>();
2416 write_features(t);
2417 store->apply_transaction(t);
2418
2419 calc_quorum_requirements();
2420 }
2421}
2422
2423void Monitor::apply_quorum_to_compatset_features()
2424{
2425 CompatSet new_features(features);
11fdf7f2 2426 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
7c673cae
FG
2427 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2428 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2429 }
11fdf7f2
TL
2430 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2431 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
7c673cae
FG
2432 dout(5) << __func__ << dendl;
2433 _apply_compatset_features(new_features);
2434}
2435
2436void Monitor::apply_monmap_to_compatset_features()
2437{
2438 CompatSet new_features(features);
2439 mon_feature_t monmap_features = monmap->get_required_features();
2440
2441 /* persistent monmap features may go into the compatset.
2442 * optional monmap features may not - why?
2443 * because optional monmap features may be set/unset by the admin,
2444 * and possibly by other means that haven't yet been thought out,
2445 * so we can't make the monitor enforce them on start - because they
2446 * may go away.
2447 * this, of course, does not invalidate setting a compatset feature
2448 * for an optional feature - as long as you make sure to clean it up
2449 * once you unset it.
2450 */
2451 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
11fdf7f2 2452 ceph_assert(ceph::features::mon::get_persistent().contains_all(
7c673cae
FG
2453 ceph::features::mon::FEATURE_KRAKEN));
2454 // this feature should only ever be set if the quorum supports it.
11fdf7f2 2455 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
7c673cae
FG
2456 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2457 }
181888fb 2458 if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
11fdf7f2 2459 ceph_assert(ceph::features::mon::get_persistent().contains_all(
181888fb
FG
2460 ceph::features::mon::FEATURE_LUMINOUS));
2461 // this feature should only ever be set if the quorum supports it.
11fdf7f2 2462 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
181888fb
FG
2463 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
2464 }
11fdf7f2
TL
2465 if (monmap_features.contains_all(ceph::features::mon::FEATURE_MIMIC)) {
2466 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2467 ceph::features::mon::FEATURE_MIMIC));
2468 // this feature should only ever be set if the quorum supports it.
2469 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_MIMIC));
2470 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
2471 }
2472 if (monmap_features.contains_all(ceph::features::mon::FEATURE_NAUTILUS)) {
2473 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2474 ceph::features::mon::FEATURE_NAUTILUS));
2475 // this feature should only ever be set if the quorum supports it.
2476 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_NAUTILUS));
2477 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
2478 }
9f95a23c
TL
2479 if (monmap_features.contains_all(ceph::features::mon::FEATURE_OCTOPUS)) {
2480 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2481 ceph::features::mon::FEATURE_OCTOPUS));
2482 // this feature should only ever be set if the quorum supports it.
2483 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_OCTOPUS));
2484 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS);
2485 }
f67539c2
TL
2486 if (monmap_features.contains_all(ceph::features::mon::FEATURE_PACIFIC)) {
2487 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2488 ceph::features::mon::FEATURE_PACIFIC));
2489 // this feature should only ever be set if the quorum supports it.
2490 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_PACIFIC));
2491 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_PACIFIC);
2492 }
7c673cae
FG
2493
2494 dout(5) << __func__ << dendl;
2495 _apply_compatset_features(new_features);
2496}
2497
2498void Monitor::calc_quorum_requirements()
2499{
2500 required_features = 0;
2501
2502 // compatset
7c673cae
FG
2503 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2504 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2505 }
7c673cae
FG
2506 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2507 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2508 }
181888fb
FG
2509 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
2510 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2511 }
11fdf7f2
TL
2512 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_MIMIC)) {
2513 required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
2514 }
2515 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS)) {
2516 required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
2517 CEPH_FEATUREMASK_CEPHX_V2;
2518 }
9f95a23c
TL
2519 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS)) {
2520 required_features |= CEPH_FEATUREMASK_SERVER_OCTOPUS;
2521 }
f67539c2
TL
2522 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_PACIFIC)) {
2523 required_features |= CEPH_FEATUREMASK_SERVER_PACIFIC;
2524 }
7c673cae
FG
2525
2526 // monmap
2527 if (monmap->get_required_features().contains_all(
2528 ceph::features::mon::FEATURE_KRAKEN)) {
2529 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2530 }
2531 if (monmap->get_required_features().contains_all(
2532 ceph::features::mon::FEATURE_LUMINOUS)) {
2533 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2534 }
11fdf7f2
TL
2535 if (monmap->get_required_features().contains_all(
2536 ceph::features::mon::FEATURE_MIMIC)) {
2537 required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
2538 }
2539 if (monmap->get_required_features().contains_all(
2540 ceph::features::mon::FEATURE_NAUTILUS)) {
2541 required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
2542 CEPH_FEATUREMASK_CEPHX_V2;
2543 }
7c673cae
FG
2544 dout(10) << __func__ << " required_features " << required_features << dendl;
2545}
2546
31f18b77
FG
2547void Monitor::get_combined_feature_map(FeatureMap *fm)
2548{
2549 *fm += session_map.feature_map;
2550 for (auto id : quorum) {
2551 if (id != rank) {
2552 *fm += quorum_feature_map[id];
2553 }
2554 }
2555}
2556
9f95a23c 2557void Monitor::sync_force(Formatter *f)
7c673cae 2558{
7c673cae
FG
2559 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2560 sync_stash_critical_state(tx);
2561 tx->put("mon_sync", "force_sync", 1);
2562 store->apply_transaction(tx);
2563
2564 f->open_object_section("sync_force");
2565 f->dump_int("ret", 0);
2566 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2567 f->close_section(); // sync_force
7c673cae
FG
2568}
2569
2570void Monitor::_quorum_status(Formatter *f, ostream& ss)
2571{
2572 bool free_formatter = false;
2573
2574 if (!f) {
2575 // louzy/lazy hack: default to json if no formatter has been defined
2576 f = new JSONFormatter();
2577 free_formatter = true;
2578 }
2579 f->open_object_section("quorum_status");
2580 f->dump_int("election_epoch", get_epoch());
2581
2582 f->open_array_section("quorum");
2583 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2584 f->dump_int("mon", *p);
2585 f->close_section(); // quorum
2586
2587 list<string> quorum_names = get_quorum_names();
2588 f->open_array_section("quorum_names");
2589 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2590 f->dump_string("mon", *p);
2591 f->close_section(); // quorum_names
2592
f67539c2 2593 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(leader));
7c673cae 2594
11fdf7f2
TL
2595 if (!quorum.empty()) {
2596 f->dump_int(
2597 "quorum_age",
2598 std::chrono::duration_cast<std::chrono::seconds>(
2599 mono_clock::now() - quorum_since).count());
2600 }
2601
9f95a23c
TL
2602 f->open_object_section("features");
2603 f->dump_stream("quorum_con") << quorum_con_features;
2604 quorum_mon_features.dump(f, "quorum_mon");
2605 f->close_section();
2606
7c673cae
FG
2607 f->open_object_section("monmap");
2608 monmap->dump(f);
2609 f->close_section(); // monmap
2610
2611 f->close_section(); // quorum_status
2612 f->flush(ss);
2613 if (free_formatter)
2614 delete f;
2615}
2616
9f95a23c 2617void Monitor::get_mon_status(Formatter *f)
7c673cae 2618{
7c673cae
FG
2619 f->open_object_section("mon_status");
2620 f->dump_string("name", name);
2621 f->dump_int("rank", rank);
2622 f->dump_string("state", get_state_name());
2623 f->dump_int("election_epoch", get_epoch());
2624
2625 f->open_array_section("quorum");
2626 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2627 f->dump_int("mon", *p);
2628 }
7c673cae
FG
2629 f->close_section(); // quorum
2630
11fdf7f2
TL
2631 if (!quorum.empty()) {
2632 f->dump_int(
2633 "quorum_age",
2634 std::chrono::duration_cast<std::chrono::seconds>(
2635 mono_clock::now() - quorum_since).count());
2636 }
2637
7c673cae
FG
2638 f->open_object_section("features");
2639 f->dump_stream("required_con") << required_features;
2640 mon_feature_t req_mon_features = get_required_mon_features();
2641 req_mon_features.dump(f, "required_mon");
2642 f->dump_stream("quorum_con") << quorum_con_features;
2643 quorum_mon_features.dump(f, "quorum_mon");
2644 f->close_section(); // features
2645
2646 f->open_array_section("outside_quorum");
2647 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2648 f->dump_string("mon", *p);
2649 f->close_section(); // outside_quorum
2650
2651 f->open_array_section("extra_probe_peers");
11fdf7f2 2652 for (set<entity_addrvec_t>::iterator p = extra_probe_peers.begin();
7c673cae 2653 p != extra_probe_peers.end();
11fdf7f2
TL
2654 ++p) {
2655 f->dump_object("peer", *p);
2656 }
7c673cae
FG
2657 f->close_section(); // extra_probe_peers
2658
2659 f->open_array_section("sync_provider");
2660 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2661 p != sync_providers.end();
2662 ++p) {
2663 f->dump_unsigned("cookie", p->second.cookie);
11fdf7f2 2664 f->dump_object("addrs", p->second.addrs);
7c673cae
FG
2665 f->dump_stream("timeout") << p->second.timeout;
2666 f->dump_unsigned("last_committed", p->second.last_committed);
2667 f->dump_stream("last_key") << p->second.last_key;
2668 }
2669 f->close_section();
2670
2671 if (is_synchronizing()) {
2672 f->open_object_section("sync");
2673 f->dump_stream("sync_provider") << sync_provider;
2674 f->dump_unsigned("sync_cookie", sync_cookie);
2675 f->dump_unsigned("sync_start_version", sync_start_version);
2676 f->close_section();
2677 }
2678
11fdf7f2
TL
2679 if (g_conf()->mon_sync_provider_kill_at > 0)
2680 f->dump_int("provider_kill_at", g_conf()->mon_sync_provider_kill_at);
2681 if (g_conf()->mon_sync_requester_kill_at > 0)
2682 f->dump_int("requester_kill_at", g_conf()->mon_sync_requester_kill_at);
7c673cae
FG
2683
2684 f->open_object_section("monmap");
2685 monmap->dump(f);
2686 f->close_section();
2687
31f18b77 2688 f->dump_object("feature_map", session_map.feature_map);
f67539c2 2689 f->dump_bool("stretch_mode", stretch_mode_engaged);
7c673cae 2690 f->close_section(); // mon_status
7c673cae
FG
2691}
2692
2693
2694// health status to clog
2695
2696void Monitor::health_tick_start()
2697{
2698 if (!cct->_conf->mon_health_to_clog ||
2699 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2700 return;
2701
2702 dout(15) << __func__ << dendl;
2703
2704 health_tick_stop();
3efd9988
FG
2705 health_tick_event = timer.add_event_after(
2706 cct->_conf->mon_health_to_clog_tick_interval,
9f95a23c 2707 new C_MonContext{this, [this](int r) {
3efd9988
FG
2708 if (r < 0)
2709 return;
3efd9988 2710 health_tick_start();
9f95a23c 2711 }});
7c673cae
FG
2712}
2713
2714void Monitor::health_tick_stop()
2715{
2716 dout(15) << __func__ << dendl;
2717
2718 if (health_tick_event) {
2719 timer.cancel_event(health_tick_event);
2720 health_tick_event = NULL;
2721 }
2722}
2723
9f95a23c 2724ceph::real_clock::time_point Monitor::health_interval_calc_next_update()
7c673cae 2725{
9f95a23c 2726 auto now = ceph::real_clock::now();
7c673cae 2727
9f95a23c
TL
2728 auto secs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch());
2729 int remainder = secs.count() % cct->_conf->mon_health_to_clog_interval;
7c673cae 2730 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
9f95a23c 2731 auto next = secs + std::chrono::seconds(adjustment);
7c673cae
FG
2732
2733 dout(20) << __func__
2734 << " now: " << now << ","
2735 << " next: " << next << ","
2736 << " interval: " << cct->_conf->mon_health_to_clog_interval
2737 << dendl;
2738
9f95a23c 2739 return ceph::real_clock::time_point{next};
7c673cae
FG
2740}
2741
2742void Monitor::health_interval_start()
2743{
2744 dout(15) << __func__ << dendl;
2745
2746 if (!cct->_conf->mon_health_to_clog ||
2747 cct->_conf->mon_health_to_clog_interval <= 0) {
2748 return;
2749 }
2750
2751 health_interval_stop();
9f95a23c
TL
2752 auto next = health_interval_calc_next_update();
2753 health_interval_event = new C_MonContext{this, [this](int r) {
7c673cae
FG
2754 if (r < 0)
2755 return;
2756 do_health_to_clog_interval();
9f95a23c 2757 }};
3efd9988
FG
2758 if (!timer.add_event_at(next, health_interval_event)) {
2759 health_interval_event = nullptr;
2760 }
7c673cae
FG
2761}
2762
2763void Monitor::health_interval_stop()
2764{
2765 dout(15) << __func__ << dendl;
2766 if (health_interval_event) {
2767 timer.cancel_event(health_interval_event);
2768 }
2769 health_interval_event = NULL;
2770}
2771
2772void Monitor::health_events_cleanup()
2773{
2774 health_tick_stop();
2775 health_interval_stop();
2776 health_status_cache.reset();
2777}
2778
2779void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2780{
2781 dout(20) << __func__ << dendl;
2782
2783 if (changed.count("mon_health_to_clog")) {
2784 if (!cct->_conf->mon_health_to_clog) {
2785 health_events_cleanup();
11fdf7f2 2786 return;
7c673cae
FG
2787 } else {
2788 if (!health_tick_event) {
2789 health_tick_start();
2790 }
2791 if (!health_interval_event) {
2792 health_interval_start();
2793 }
2794 }
2795 }
2796
2797 if (changed.count("mon_health_to_clog_interval")) {
2798 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2799 health_interval_stop();
2800 } else {
2801 health_interval_start();
2802 }
2803 }
2804
2805 if (changed.count("mon_health_to_clog_tick_interval")) {
2806 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2807 health_tick_stop();
2808 } else {
2809 health_tick_start();
2810 }
2811 }
2812}
2813
2814void Monitor::do_health_to_clog_interval()
2815{
2816 // outputting to clog may have been disabled in the conf
2817 // since we were scheduled.
2818 if (!cct->_conf->mon_health_to_clog ||
2819 cct->_conf->mon_health_to_clog_interval <= 0)
2820 return;
2821
2822 dout(10) << __func__ << dendl;
2823
2824 // do we have a cached value for next_clog_update? if not,
2825 // do we know when the last update was?
2826
2827 do_health_to_clog(true);
2828 health_interval_start();
2829}
2830
2831void Monitor::do_health_to_clog(bool force)
2832{
2833 // outputting to clog may have been disabled in the conf
2834 // since we were scheduled.
2835 if (!cct->_conf->mon_health_to_clog ||
2836 cct->_conf->mon_health_to_clog_interval <= 0)
2837 return;
2838
2839 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2840
11fdf7f2 2841 string summary;
9f95a23c 2842 health_status_t level = healthmon()->get_health_status(false, nullptr, &summary);
11fdf7f2
TL
2843 if (!force &&
2844 summary == health_status_cache.summary &&
2845 level == health_status_cache.overall)
2846 return;
f91f0fd5
TL
2847
2848 if (g_conf()->mon_health_detail_to_clog &&
2849 summary != health_status_cache.summary &&
2850 level != HEALTH_OK) {
2851 string details;
2852 level = healthmon()->get_health_status(true, nullptr, &details);
2853 clog->health(level) << "Health detail: " << details;
2854 } else {
2855 clog->health(level) << "overall " << summary;
2856 }
11fdf7f2
TL
2857 health_status_cache.summary = summary;
2858 health_status_cache.overall = level;
224ce89b
WB
2859}
2860
224ce89b
WB
2861void Monitor::log_health(
2862 const health_check_map_t& updated,
2863 const health_check_map_t& previous,
2864 MonitorDBStore::TransactionRef t)
2865{
11fdf7f2 2866 if (!g_conf()->mon_health_to_clog) {
7c673cae
FG
2867 return;
2868 }
181888fb
FG
2869
2870 const utime_t now = ceph_clock_now();
2871
224ce89b
WB
2872 // FIXME: log atomically as part of @t instead of using clog.
2873 dout(10) << __func__ << " updated " << updated.checks.size()
2874 << " previous " << previous.checks.size()
2875 << dendl;
11fdf7f2 2876 const auto min_log_period = g_conf().get_val<int64_t>(
181888fb 2877 "mon_health_log_update_period");
224ce89b
WB
2878 for (auto& p : updated.checks) {
2879 auto q = previous.checks.find(p.first);
181888fb 2880 bool logged = false;
224ce89b
WB
2881 if (q == previous.checks.end()) {
2882 // new
2883 ostringstream ss;
2884 ss << "Health check failed: " << p.second.summary << " ("
2885 << p.first << ")";
181888fb
FG
2886 clog->health(p.second.severity) << ss.str();
2887
2888 logged = true;
224ce89b
WB
2889 } else {
2890 if (p.second.summary != q->second.summary ||
2891 p.second.severity != q->second.severity) {
181888fb
FG
2892
2893 auto status_iter = health_check_log_times.find(p.first);
2894 if (status_iter != health_check_log_times.end()) {
2895 if (p.second.severity == q->second.severity &&
2896 now - status_iter->second.updated_at < min_log_period) {
2897 // We already logged this recently and the severity is unchanged,
2898 // so skip emitting an update of the summary string.
2899 // We'll get an update out of tick() later if the check
2900 // is still failing.
2901 continue;
2902 }
2903 }
2904
2905 // summary or severity changed (ignore detail changes at this level)
2906 ostringstream ss;
224ce89b 2907 ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
181888fb
FG
2908 clog->health(p.second.severity) << ss.str();
2909
2910 logged = true;
2911 }
2912 }
2913 // Record the time at which we last logged, so that we can check this
2914 // when considering whether/when to print update messages.
2915 if (logged) {
2916 auto iter = health_check_log_times.find(p.first);
2917 if (iter == health_check_log_times.end()) {
2918 health_check_log_times.emplace(p.first, HealthCheckLogStatus(
2919 p.second.severity, p.second.summary, now));
2920 } else {
2921 iter->second = HealthCheckLogStatus(
2922 p.second.severity, p.second.summary, now);
224ce89b
WB
2923 }
2924 }
2925 }
2926 for (auto& p : previous.checks) {
2927 if (!updated.checks.count(p.first)) {
2928 // cleared
2929 ostringstream ss;
2930 if (p.first == "DEGRADED_OBJECTS") {
2931 clog->info() << "All degraded objects recovered";
2932 } else if (p.first == "OSD_FLAGS") {
2933 clog->info() << "OSD flags cleared";
2934 } else {
2935 clog->info() << "Health check cleared: " << p.first << " (was: "
2936 << p.second.summary << ")";
2937 }
181888fb
FG
2938
2939 if (health_check_log_times.count(p.first)) {
2940 health_check_log_times.erase(p.first);
2941 }
224ce89b
WB
2942 }
2943 }
7c673cae 2944
224ce89b
WB
2945 if (previous.checks.size() && updated.checks.size() == 0) {
2946 // We might be going into a fully healthy state, check
2947 // other subsystems
2948 bool any_checks = false;
2949 for (auto& svc : paxos_service) {
2950 if (&(svc->get_health_checks()) == &(previous)) {
2951 // Ignore the ones we're clearing right now
2952 continue;
2953 }
7c673cae 2954
224ce89b
WB
2955 if (svc->get_health_checks().checks.size() > 0) {
2956 any_checks = true;
2957 break;
2958 }
2959 }
2960 if (!any_checks) {
2961 clog->info() << "Cluster is now healthy";
2962 }
2963 }
7c673cae
FG
2964}
2965
f67539c2
TL
2966void Monitor::get_cluster_status(stringstream &ss, Formatter *f,
2967 MonSession *session)
7c673cae
FG
2968{
2969 if (f)
2970 f->open_object_section("status");
2971
f67539c2
TL
2972 const auto&& fs_names = session->get_allowed_fs_names();
2973
11fdf7f2 2974 mono_clock::time_point now = mono_clock::now();
7c673cae
FG
2975 if (f) {
2976 f->dump_stream("fsid") << monmap->get_fsid();
9f95a23c 2977 healthmon()->get_health_status(false, f, nullptr);
7c673cae
FG
2978 f->dump_unsigned("election_epoch", get_epoch());
2979 {
2980 f->open_array_section("quorum");
2981 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2982 f->dump_int("rank", *p);
2983 f->close_section();
2984 f->open_array_section("quorum_names");
2985 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2986 f->dump_string("id", monmap->get_name(*p));
2987 f->close_section();
11fdf7f2
TL
2988 f->dump_int(
2989 "quorum_age",
2990 std::chrono::duration_cast<std::chrono::seconds>(
2991 mono_clock::now() - quorum_since).count());
7c673cae
FG
2992 }
2993 f->open_object_section("monmap");
9f95a23c 2994 monmap->dump_summary(f);
7c673cae
FG
2995 f->close_section();
2996 f->open_object_section("osdmap");
224ce89b 2997 osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
7c673cae
FG
2998 f->close_section();
2999 f->open_object_section("pgmap");
11fdf7f2 3000 mgrstatmon()->print_summary(f, NULL);
7c673cae
FG
3001 f->close_section();
3002 f->open_object_section("fsmap");
f67539c2
TL
3003
3004 FSMap fsmap_copy = mdsmon()->get_fsmap();
3005 if (!fs_names.empty()) {
3006 fsmap_copy.filter(fs_names);
3007 }
3008 const FSMap *fsmapp = &fsmap_copy;
3009
3010 fsmapp->print_summary(f, NULL);
7c673cae 3011 f->close_section();
7c673cae
FG
3012 f->open_object_section("mgrmap");
3013 mgrmon()->get_map().print_summary(f, nullptr);
3014 f->close_section();
224ce89b
WB
3015
3016 f->dump_object("servicemap", mgrstatmon()->get_service_map());
11fdf7f2
TL
3017
3018 f->open_object_section("progress_events");
3019 for (auto& i : mgrstatmon()->get_progress_events()) {
3020 f->dump_object(i.first.c_str(), i.second);
3021 }
3022 f->close_section();
3023
7c673cae
FG
3024 f->close_section();
3025 } else {
31f18b77
FG
3026 ss << " cluster:\n";
3027 ss << " id: " << monmap->get_fsid() << "\n";
224ce89b
WB
3028
3029 string health;
9f95a23c
TL
3030 healthmon()->get_health_status(false, nullptr, &health,
3031 "\n ", "\n ");
224ce89b
WB
3032 ss << " health: " << health << "\n";
3033
31f18b77 3034 ss << "\n \n services:\n";
224ce89b
WB
3035 {
3036 size_t maxlen = 3;
3037 auto& service_map = mgrstatmon()->get_service_map();
3038 for (auto& p : service_map.services) {
3039 maxlen = std::max(maxlen, p.first.size());
3040 }
3041 string spacing(maxlen - 3, ' ');
3042 const auto quorum_names = get_quorum_names();
3043 const auto mon_count = monmap->mon_info.size();
3044 ss << " mon: " << spacing << mon_count << " daemons, quorum "
11fdf7f2 3045 << quorum_names << " (age " << timespan_str(now - quorum_since) << ")";
224ce89b
WB
3046 if (quorum_names.size() != mon_count) {
3047 std::list<std::string> out_of_q;
3048 for (size_t i = 0; i < monmap->ranks.size(); ++i) {
3049 if (quorum.count(i) == 0) {
3050 out_of_q.push_back(monmap->ranks[i]);
3051 }
3052 }
3053 ss << ", out of quorum: " << joinify(out_of_q.begin(),
3054 out_of_q.end(), std::string(", "));
31f18b77 3055 }
7c673cae 3056 ss << "\n";
224ce89b
WB
3057 if (mgrmon()->in_use()) {
3058 ss << " mgr: " << spacing;
3059 mgrmon()->get_map().print_summary(nullptr, &ss);
3060 ss << "\n";
3061 }
f67539c2
TL
3062
3063 FSMap fsmap_copy = mdsmon()->get_fsmap();
3064 if (!fs_names.empty()) {
3065 fsmap_copy.filter(fs_names);
3066 }
3067 const FSMap *fsmapp = &fsmap_copy;
3068
3069 if (fsmapp->filesystem_count() > 0 and mdsmon()->should_print_status()){
3070 ss << " mds: " << spacing;
3071 fsmapp->print_daemon_summary(ss);
3072 ss << "\n";
224ce89b 3073 }
f67539c2 3074
224ce89b
WB
3075 ss << " osd: " << spacing;
3076 osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
3077 ss << "\n";
3078 for (auto& p : service_map.services) {
9f95a23c
TL
3079 const std::string &service = p.first;
3080 // filter out normal ceph entity types
3081 if (ServiceMap::is_normal_ceph_entity(service)) {
3082 continue;
3083 }
224ce89b
WB
3084 ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
3085 << p.second.get_summary() << "\n";
3086 }
7c673cae 3087 }
31f18b77 3088
f67539c2
TL
3089 if (auto& service_map = mgrstatmon()->get_service_map();
3090 std::any_of(service_map.services.begin(),
3091 service_map.services.end(),
3092 [](auto& service) {
3093 return service.second.has_running_tasks();
3094 })) {
3095 ss << "\n \n task status:\n";
3096 for (auto& [name, service] : service_map.services) {
3097 ss << service.get_task_summary(name);
9f95a23c
TL
3098 }
3099 }
3100
31f18b77 3101 ss << "\n \n data:\n";
f67539c2 3102 mdsmon()->print_fs_summary(ss);
11fdf7f2
TL
3103 mgrstatmon()->print_summary(NULL, &ss);
3104
3105 auto& pem = mgrstatmon()->get_progress_events();
3106 if (!pem.empty()) {
3107 ss << "\n \n progress:\n";
3108 for (auto& i : pem) {
f67539c2 3109 if (i.second.add_to_ceph_s){
11fdf7f2 3110 ss << " " << i.second.message << "\n";
f67539c2 3111 }
11fdf7f2
TL
3112 }
3113 }
31f18b77 3114 ss << "\n ";
7c673cae
FG
3115 }
3116}
3117
11fdf7f2 3118void Monitor::_generate_command_map(cmdmap_t& cmdmap,
7c673cae
FG
3119 map<string,string> &param_str_map)
3120{
11fdf7f2 3121 for (auto p = cmdmap.begin(); p != cmdmap.end(); ++p) {
7c673cae
FG
3122 if (p->first == "prefix")
3123 continue;
3124 if (p->first == "caps") {
3125 vector<string> cv;
9f95a23c 3126 if (cmd_getval(cmdmap, "caps", cv) &&
7c673cae
FG
3127 cv.size() % 2 == 0) {
3128 for (unsigned i = 0; i < cv.size(); i += 2) {
3129 string k = string("caps_") + cv[i];
3130 param_str_map[k] = cv[i + 1];
3131 }
3132 continue;
3133 }
3134 }
3135 param_str_map[p->first] = cmd_vartype_stringify(p->second);
3136 }
3137}
3138
c07f9fc5
FG
3139const MonCommand *Monitor::_get_moncommand(
3140 const string &cmd_prefix,
d2e6a577
FG
3141 const vector<MonCommand>& cmds)
3142{
3143 for (auto& c : cmds) {
3144 if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
3145 return &c;
7c673cae
FG
3146 }
3147 }
d2e6a577 3148 return nullptr;
7c673cae
FG
3149}
3150
11fdf7f2
TL
3151bool Monitor::_allowed_command(MonSession *s, const string &module,
3152 const string &prefix, const cmdmap_t& cmdmap,
7c673cae
FG
3153 const map<string,string>& param_str_map,
3154 const MonCommand *this_cmd) {
3155
3156 bool cmd_r = this_cmd->requires_perm('r');
3157 bool cmd_w = this_cmd->requires_perm('w');
3158 bool cmd_x = this_cmd->requires_perm('x');
3159
3160 bool capable = s->caps.is_capable(
3161 g_ceph_context,
7c673cae
FG
3162 s->entity_name,
3163 module, prefix, param_str_map,
11fdf7f2
TL
3164 cmd_r, cmd_w, cmd_x,
3165 s->get_peer_socket_addr());
7c673cae
FG
3166
3167 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
3168 return capable;
3169}
3170
c07f9fc5 3171void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
7c673cae 3172 Formatter *f,
11fdf7f2
TL
3173 uint64_t features,
3174 bufferlist *rdata)
7c673cae
FG
3175{
3176 int cmdnum = 0;
3177 f->open_object_section("command_descriptions");
c07f9fc5
FG
3178 for (const auto &cmd : commands) {
3179 unsigned flags = cmd.flags;
7c673cae
FG
3180 ostringstream secname;
3181 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
11fdf7f2 3182 dump_cmddesc_to_json(f, features, secname.str(),
c07f9fc5 3183 cmd.cmdstring, cmd.helpstring, cmd.module,
11fdf7f2 3184 cmd.req_perms, flags);
7c673cae
FG
3185 cmdnum++;
3186 }
3187 f->close_section(); // command_descriptions
3188
3189 f->flush(*rdata);
3190}
3191
7c673cae
FG
3192bool Monitor::is_keyring_required()
3193{
11fdf7f2
TL
3194 return auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX) ||
3195 auth_service_required.is_supported_auth(CEPH_AUTH_CEPHX) ||
3196 auth_cluster_required.is_supported_auth(CEPH_AUTH_GSS) ||
3197 auth_service_required.is_supported_auth(CEPH_AUTH_GSS);
7c673cae
FG
3198}
3199
3200struct C_MgrProxyCommand : public Context {
3201 Monitor *mon;
3202 MonOpRequestRef op;
3203 uint64_t size;
3204 bufferlist outbl;
3205 string outs;
3206 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
3207 : mon(mon), op(op), size(s) { }
3208 void finish(int r) {
11fdf7f2 3209 std::lock_guard l(mon->lock);
7c673cae
FG
3210 mon->mgr_proxy_bytes -= size;
3211 mon->reply_command(op, r, outs, outbl, 0);
3212 }
3213};
3214
9f95a23c 3215void Monitor::handle_tell_command(MonOpRequestRef op)
7c673cae 3216{
11fdf7f2 3217 ceph_assert(op->is_type_command());
9f95a23c 3218 MCommand *m = static_cast<MCommand*>(op->get_req());
7c673cae
FG
3219 if (m->fsid != monmap->fsid) {
3220 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
9f95a23c
TL
3221 return reply_tell_command(op, -EACCES, "wrong fsid");
3222 }
3223 MonSession *session = op->get_session();
3224 if (!session) {
3225 dout(5) << __func__ << " dropping stray message " << *m << dendl;
3226 return;
3227 }
3228 cmdmap_t cmdmap;
3229 if (stringstream ss; !cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3230 return reply_tell_command(op, -EINVAL, ss.str());
3231 }
3232 map<string,string> param_str_map;
3233 _generate_command_map(cmdmap, param_str_map);
3234 string prefix;
3235 if (!cmd_getval(cmdmap, "prefix", prefix)) {
3236 return reply_tell_command(op, -EINVAL, "no prefix");
3237 }
3238 if (auto cmd = _get_moncommand(prefix,
3239 get_local_commands(quorum_mon_features));
3240 cmd) {
3241 if (cmd->is_obsolete() ||
3242 (cct->_conf->mon_debug_deprecated_as_obsolete &&
3243 cmd->is_deprecated())) {
3244 return reply_tell_command(op, -ENOTSUP,
3245 "command is obsolete; "
3246 "please check usage and/or man page");
3247 }
3248 }
f67539c2 3249 // see if command is allowed
9f95a23c
TL
3250 if (!session->caps.is_capable(
3251 g_ceph_context,
3252 session->entity_name,
3253 "mon", prefix, param_str_map,
3254 true, true, true,
3255 session->get_peer_socket_addr())) {
3256 return reply_tell_command(op, -EACCES, "insufficient caps");
3257 }
3258 // pass it to asok
3259 cct->get_admin_socket()->queue_tell_command(m);
3260}
3261
3262void Monitor::handle_command(MonOpRequestRef op)
3263{
3264 ceph_assert(op->is_type_command());
3265 auto m = op->get_req<MMonCommand>();
3266 if (m->fsid != monmap->fsid) {
3267 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid
3268 << dendl;
7c673cae
FG
3269 reply_command(op, -EPERM, "wrong fsid", 0);
3270 return;
3271 }
3272
11fdf7f2 3273 MonSession *session = op->get_session();
7c673cae
FG
3274 if (!session) {
3275 dout(5) << __func__ << " dropping stray message " << *m << dendl;
3276 return;
3277 }
7c673cae
FG
3278
3279 if (m->cmd.empty()) {
9f95a23c 3280 reply_command(op, -EINVAL, "no command specified", 0);
7c673cae
FG
3281 return;
3282 }
3283
3284 string prefix;
3285 vector<string> fullcmd;
11fdf7f2 3286 cmdmap_t cmdmap;
7c673cae
FG
3287 stringstream ss, ds;
3288 bufferlist rdata;
3289 string rs;
3290 int r = -EINVAL;
3291 rs = "unrecognized command";
3292
3293 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3294 // ss has reason for failure
3295 r = -EINVAL;
3296 rs = ss.str();
3297 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3298 reply_command(op, r, rs, 0);
3299 return;
3300 }
3301
3302 // check return value. If no prefix parameter provided,
3303 // return value will be false, then return error info.
9f95a23c 3304 if (!cmd_getval(cmdmap, "prefix", prefix)) {
7c673cae
FG
3305 reply_command(op, -EINVAL, "command prefix not found", 0);
3306 return;
3307 }
3308
3309 // check prefix is empty
3310 if (prefix.empty()) {
3311 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
3312 return;
3313 }
3314
3315 if (prefix == "get_command_descriptions") {
3316 bufferlist rdata;
3317 Formatter *f = Formatter::create("json");
d2e6a577 3318
11fdf7f2
TL
3319 std::vector<MonCommand> commands = static_cast<MgrMonitor*>(
3320 paxos_service[PAXOS_MGR].get())->get_command_descs();
c07f9fc5 3321
d2e6a577
FG
3322 for (auto& c : leader_mon_commands) {
3323 commands.push_back(c);
c07f9fc5
FG
3324 }
3325
11fdf7f2
TL
3326 auto features = m->get_connection()->get_features();
3327 format_command_descriptions(commands, f, features, &rdata);
7c673cae
FG
3328 delete f;
3329 reply_command(op, 0, "", rdata, 0);
3330 return;
3331 }
3332
7c673cae
FG
3333 dout(0) << "handle_command " << *m << dendl;
3334
3335 string format;
9f95a23c 3336 cmd_getval(cmdmap, "format", format, string("plain"));
7c673cae
FG
3337 boost::scoped_ptr<Formatter> f(Formatter::create(format));
3338
3339 get_str_vec(prefix, fullcmd);
3340
3341 // make sure fullcmd is not empty.
3342 // invalid prefix will cause empty vector fullcmd.
3343 // such as, prefix=";,,;"
3344 if (fullcmd.empty()) {
3345 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3346 return;
3347 }
3348
f67539c2 3349 std::string_view module = fullcmd[0];
7c673cae
FG
3350
3351 // validate command is in leader map
3352
3353 const MonCommand *leader_cmd;
c07f9fc5
FG
3354 const auto& mgr_cmds = mgrmon()->get_command_descs();
3355 const MonCommand *mgr_cmd = nullptr;
3356 if (!mgr_cmds.empty()) {
d2e6a577 3357 mgr_cmd = _get_moncommand(prefix, mgr_cmds);
c07f9fc5 3358 }
d2e6a577 3359 leader_cmd = _get_moncommand(prefix, leader_mon_commands);
7c673cae 3360 if (!leader_cmd) {
c07f9fc5
FG
3361 leader_cmd = mgr_cmd;
3362 if (!leader_cmd) {
3363 reply_command(op, -EINVAL, "command not known", 0);
3364 return;
3365 }
7c673cae
FG
3366 }
3367 // validate command is in our map & matches, or forward if it is allowed
d2e6a577
FG
3368 const MonCommand *mon_cmd = _get_moncommand(
3369 prefix,
3370 get_local_commands(quorum_mon_features));
c07f9fc5
FG
3371 if (!mon_cmd) {
3372 mon_cmd = mgr_cmd;
3373 }
7c673cae
FG
3374 if (!is_leader()) {
3375 if (!mon_cmd) {
3376 if (leader_cmd->is_noforward()) {
3377 reply_command(op, -EINVAL,
3378 "command not locally supported and not allowed to forward",
3379 0);
3380 return;
3381 }
3382 dout(10) << "Command not locally supported, forwarding request "
3383 << m << dendl;
3384 forward_request_leader(op);
3385 return;
3386 } else if (!mon_cmd->is_compat(leader_cmd)) {
3387 if (mon_cmd->is_noforward()) {
3388 reply_command(op, -EINVAL,
3389 "command not compatible with leader and not allowed to forward",
3390 0);
3391 return;
3392 }
3393 dout(10) << "Command not compatible with leader, forwarding request "
3394 << m << dendl;
3395 forward_request_leader(op);
3396 return;
3397 }
3398 }
3399
3400 if (mon_cmd->is_obsolete() ||
3401 (cct->_conf->mon_debug_deprecated_as_obsolete
3402 && mon_cmd->is_deprecated())) {
3403 reply_command(op, -ENOTSUP,
3404 "command is obsolete; please check usage and/or man page",
3405 0);
3406 return;
3407 }
3408
3409 if (session->proxy_con && mon_cmd->is_noforward()) {
3410 dout(10) << "Got forward for noforward command " << m << dendl;
3411 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3412 return;
3413 }
3414
3415 /* what we perceive as being the service the command falls under */
3416 string service(mon_cmd->module);
3417
3418 dout(25) << __func__ << " prefix='" << prefix
3419 << "' module='" << module
3420 << "' service='" << service << "'" << dendl;
3421
3422 bool cmd_is_rw =
3423 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3424
3425 // validate user's permissions for requested command
3426 map<string,string> param_str_map;
3427 _generate_command_map(cmdmap, param_str_map);
3428 if (!_allowed_command(session, service, prefix, cmdmap,
3429 param_str_map, mon_cmd)) {
3430 dout(1) << __func__ << " access denied" << dendl;
adb31ebb
TL
3431 if (prefix != "config set" && prefix != "config-key set")
3432 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3433 << "from='" << session->name << " " << session->addrs << "' "
3434 << "entity='" << session->entity_name << "' "
3435 << "cmd=" << m->cmd << ": access denied";
7c673cae
FG
3436 reply_command(op, -EACCES, "access denied", 0);
3437 return;
3438 }
3439
adb31ebb
TL
3440 if (prefix != "config set" && prefix != "config-key set")
3441 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3442 << "from='" << session->name << " " << session->addrs << "' "
3443 << "entity='" << session->entity_name << "' "
3444 << "cmd=" << m->cmd << ": dispatch";
7c673cae 3445
1911f103
TL
3446 // compat kludge for legacy clients trying to tell commands that are
3447 // new. see bottom of MonCommands.h. we need to handle both (1)
3448 // pre-octopus clients and (2) octopus clients with a mix of pre-octopus
3449 // and octopus mons.
3450 if ((!HAVE_FEATURE(m->get_connection()->get_features(), SERVER_OCTOPUS) ||
3451 monmap->min_mon_release < ceph_release_t::octopus) &&
3452 (prefix == "injectargs" ||
3453 prefix == "smart" ||
3454 prefix == "mon_status" ||
3455 prefix == "heap")) {
3456 if (m->get_connection()->get_messenger() == 0) {
3457 // Prior to octopus, monitors might forward these messages
3458 // around. that was broken at baseline, and if we try to process
3459 // this message now, it will assert out when we try to send a
3460 // message in reply from the asok/tell worker (see
3461 // AnonConnection). Just reply with an error.
3462 dout(5) << __func__ << " failing forwarded command from a (presumably) "
3463 << "pre-octopus peer" << dendl;
3464 reply_command(
3465 op, -EBUSY,
3466 "failing forwarded tell command in mixed-version mon cluster", 0);
3467 return;
3468 }
3469 dout(5) << __func__ << " passing command to tell/asok" << dendl;
3470 cct->get_admin_socket()->queue_tell_command(m);
3471 return;
3472 }
3473
11fdf7f2 3474 if (mon_cmd->is_mgr()) {
7c673cae
FG
3475 const auto& hdr = m->get_header();
3476 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
11fdf7f2
TL
3477 uint64_t max = g_conf().get_val<Option::size_t>("mon_client_bytes")
3478 * g_conf().get_val<double>("mon_mgr_proxy_client_bytes_ratio");
7c673cae
FG
3479 if (mgr_proxy_bytes + size > max) {
3480 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3481 << " + " << size << " > max " << max << dendl;
3482 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3483 return;
3484 }
3485 mgr_proxy_bytes += size;
3486 dout(10) << __func__ << " proxying mgr command (+" << size
3487 << " -> " << mgr_proxy_bytes << ")" << dendl;
3488 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3489 mgr_client.start_command(m->cmd,
3490 m->get_data(),
3491 &fin->outbl,
3492 &fin->outs,
3493 new C_OnFinisher(fin, &finisher));
3494 return;
3495 }
3496
d2e6a577
FG
3497 if ((module == "mds" || module == "fs") &&
3498 prefix != "fs authorize") {
7c673cae
FG
3499 mdsmon()->dispatch(op);
3500 return;
3501 }
11fdf7f2
TL
3502 if ((module == "osd" ||
3503 prefix == "pg map" ||
3504 prefix == "pg repeer") &&
31f18b77 3505 prefix != "osd last-stat-seq") {
7c673cae
FG
3506 osdmon()->dispatch(op);
3507 return;
3508 }
11fdf7f2
TL
3509 if (module == "config") {
3510 configmon()->dispatch(op);
7c673cae
FG
3511 return;
3512 }
11fdf7f2 3513
7c673cae
FG
3514 if (module == "mon" &&
3515 /* Let the Monitor class handle the following commands:
7c673cae 3516 * 'mon scrub'
7c673cae 3517 */
7c673cae 3518 prefix != "mon scrub" &&
31f18b77
FG
3519 prefix != "mon metadata" &&
3520 prefix != "mon versions" &&
11fdf7f2
TL
3521 prefix != "mon count-metadata" &&
3522 prefix != "mon ok-to-stop" &&
3523 prefix != "mon ok-to-add-offline" &&
3524 prefix != "mon ok-to-rm") {
7c673cae
FG
3525 monmon()->dispatch(op);
3526 return;
3527 }
9f95a23c
TL
3528 if (module == "health" && prefix != "health") {
3529 healthmon()->dispatch(op);
3530 return;
3531 }
d2e6a577 3532 if (module == "auth" || prefix == "fs authorize") {
7c673cae
FG
3533 authmon()->dispatch(op);
3534 return;
3535 }
3536 if (module == "log") {
3537 logmon()->dispatch(op);
3538 return;
3539 }
3540
3541 if (module == "config-key") {
f67539c2 3542 kvmon()->dispatch(op);
7c673cae
FG
3543 return;
3544 }
3545
3546 if (module == "mgr") {
3547 mgrmon()->dispatch(op);
3548 return;
3549 }
3550
3551 if (prefix == "fsid") {
3552 if (f) {
3553 f->open_object_section("fsid");
3554 f->dump_stream("fsid") << monmap->fsid;
3555 f->close_section();
3556 f->flush(rdata);
3557 } else {
3558 ds << monmap->fsid;
3559 rdata.append(ds);
3560 }
3561 reply_command(op, 0, "", rdata, 0);
3562 return;
3563 }
3564
9f95a23c 3565 if (prefix == "mon scrub") {
7c673cae
FG
3566 wait_for_paxos_write();
3567 if (is_leader()) {
3568 int r = scrub_start();
3569 reply_command(op, r, "", rdata, 0);
3570 } else if (is_peon()) {
3571 forward_request_leader(op);
3572 } else {
3573 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3574 }
3575 return;
3576 }
3577
9f95a23c 3578 if (prefix == "time-sync-status") {
224ce89b
WB
3579 if (!f)
3580 f.reset(Formatter::create("json-pretty"));
3581 f->open_object_section("time_sync");
3582 if (!timecheck_skews.empty()) {
3583 f->open_object_section("time_skew_status");
3584 for (auto& i : timecheck_skews) {
224ce89b 3585 double skew = i.second;
11fdf7f2
TL
3586 double latency = timecheck_latencies[i.first];
3587 string name = monmap->get_name(i.first);
224ce89b
WB
3588 ostringstream tcss;
3589 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3590 f->open_object_section(name.c_str());
3591 f->dump_float("skew", skew);
3592 f->dump_float("latency", latency);
3593 f->dump_stream("health") << tcstatus;
3594 if (tcstatus != HEALTH_OK) {
3595 f->dump_stream("details") << tcss.str();
3596 }
3597 f->close_section();
3598 }
3599 f->close_section();
3600 }
3601 f->open_object_section("timechecks");
3602 f->dump_unsigned("epoch", get_epoch());
3603 f->dump_int("round", timecheck_round);
3604 f->dump_stream("round_status") << ((timecheck_round%2) ?
3605 "on-going" : "finished");
3606 f->close_section();
3607 f->close_section();
3608 f->flush(rdata);
3609 r = 0;
3610 rs = "";
7c673cae
FG
3611 } else if (prefix == "status" ||
3612 prefix == "health" ||
3613 prefix == "df") {
3614 string detail;
9f95a23c 3615 cmd_getval(cmdmap, "detail", detail);
7c673cae
FG
3616
3617 if (prefix == "status") {
3618 // get_cluster_status handles f == NULL
f67539c2 3619 get_cluster_status(ds, f.get(), session);
7c673cae
FG
3620
3621 if (f) {
3622 f->flush(ds);
3623 ds << '\n';
3624 }
3625 rdata.append(ds);
3626 } else if (prefix == "health") {
11fdf7f2 3627 string plain;
9f95a23c 3628 healthmon()->get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
11fdf7f2
TL
3629 if (f) {
3630 f->flush(rdata);
7c673cae 3631 } else {
11fdf7f2 3632 rdata.append(plain);
7c673cae 3633 }
7c673cae
FG
3634 } else if (prefix == "df") {
3635 bool verbose = (detail == "detail");
3636 if (f)
3637 f->open_object_section("stats");
3638
11fdf7f2
TL
3639 mgrstatmon()->dump_cluster_stats(&ds, f.get(), verbose);
3640 if (!f) {
3641 ds << "\n \n";
3642 }
3643 mgrstatmon()->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
7c673cae
FG
3644
3645 if (f) {
3646 f->close_section();
3647 f->flush(ds);
3648 ds << '\n';
3649 }
3650 } else {
11fdf7f2 3651 ceph_abort_msg("We should never get here!");
7c673cae
FG
3652 return;
3653 }
3654 rdata.append(ds);
3655 rs = "";
3656 r = 0;
3657 } else if (prefix == "report") {
3658
3659 // this must be formatted, in its current form
3660 if (!f)
3661 f.reset(Formatter::create("json-pretty"));
3662 f->open_object_section("report");
3663 f->dump_stream("cluster_fingerprint") << fingerprint;
3664 f->dump_string("version", ceph_version_to_str());
3665 f->dump_string("commit", git_version_to_str());
3666 f->dump_stream("timestamp") << ceph_clock_now();
3667
3668 vector<string> tagsvec;
9f95a23c 3669 cmd_getval(cmdmap, "tags", tagsvec);
7c673cae
FG
3670 string tagstr = str_join(tagsvec, " ");
3671 if (!tagstr.empty())
3672 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3673 f->dump_string("tag", tagstr);
3674
9f95a23c 3675 healthmon()->get_health_status(true, f.get(), nullptr);
7c673cae
FG
3676
3677 monmon()->dump_info(f.get());
3678 osdmon()->dump_info(f.get());
3679 mdsmon()->dump_info(f.get());
7c673cae 3680 authmon()->dump_info(f.get());
11fdf7f2 3681 mgrstatmon()->dump_info(f.get());
7c673cae
FG
3682
3683 paxos->dump_info(f.get());
3684
3685 f->close_section();
3686 f->flush(rdata);
3687
3688 ostringstream ss2;
11fdf7f2 3689 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT_LEGACY);
7c673cae
FG
3690 rs = ss2.str();
3691 r = 0;
31f18b77
FG
3692 } else if (prefix == "osd last-stat-seq") {
3693 int64_t osd;
9f95a23c 3694 cmd_getval(cmdmap, "id", osd);
31f18b77
FG
3695 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3696 if (f) {
3697 f->dump_unsigned("seq", seq);
3698 f->flush(ds);
3699 } else {
3700 ds << seq;
3701 rdata.append(ds);
3702 }
3703 rs = "";
3704 r = 0;
7c673cae
FG
3705 } else if (prefix == "node ls") {
3706 string node_type("all");
9f95a23c 3707 cmd_getval(cmdmap, "type", node_type);
7c673cae
FG
3708 if (!f)
3709 f.reset(Formatter::create("json-pretty"));
3710 if (node_type == "all") {
3711 f->open_object_section("nodes");
3712 print_nodes(f.get(), ds);
3713 osdmon()->print_nodes(f.get());
3714 mdsmon()->print_nodes(f.get());
11fdf7f2 3715 mgrmon()->print_nodes(f.get());
7c673cae
FG
3716 f->close_section();
3717 } else if (node_type == "mon") {
3718 print_nodes(f.get(), ds);
3719 } else if (node_type == "osd") {
3720 osdmon()->print_nodes(f.get());
3721 } else if (node_type == "mds") {
3722 mdsmon()->print_nodes(f.get());
11fdf7f2
TL
3723 } else if (node_type == "mgr") {
3724 mgrmon()->print_nodes(f.get());
7c673cae
FG
3725 }
3726 f->flush(ds);
3727 rdata.append(ds);
3728 rs = "";
3729 r = 0;
31f18b77
FG
3730 } else if (prefix == "features") {
3731 if (!is_leader() && !is_peon()) {
3732 dout(10) << " waiting for quorum" << dendl;
3733 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3734 return;
3735 }
3736 if (!is_leader()) {
3737 forward_request_leader(op);
3738 return;
3739 }
3740 if (!f)
3741 f.reset(Formatter::create("json-pretty"));
3742 FeatureMap fm;
3743 get_combined_feature_map(&fm);
3744 f->dump_object("features", fm);
3745 f->flush(rdata);
3746 rs = "";
3747 r = 0;
7c673cae
FG
3748 } else if (prefix == "mon metadata") {
3749 if (!f)
3750 f.reset(Formatter::create("json-pretty"));
3751
3752 string name;
9f95a23c 3753 bool all = !cmd_getval(cmdmap, "id", name);
7c673cae
FG
3754 if (!all) {
3755 // Dump a single mon's metadata
3756 int mon = monmap->get_rank(name);
3757 if (mon < 0) {
3758 rs = "requested mon not found";
3759 r = -ENOENT;
3760 goto out;
3761 }
3762 f->open_object_section("mon_metadata");
3763 r = get_mon_metadata(mon, f.get(), ds);
3764 f->close_section();
3765 } else {
3766 // Dump all mons' metadata
3767 r = 0;
3768 f->open_array_section("mon_metadata");
3769 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3770 std::ostringstream get_err;
3771 f->open_object_section("mon");
3772 f->dump_string("name", monmap->get_name(rank));
3773 r = get_mon_metadata(rank, f.get(), get_err);
3774 f->close_section();
3775 if (r == -ENOENT || r == -EINVAL) {
3776 dout(1) << get_err.str() << dendl;
3777 // Drop error, list what metadata we do have
3778 r = 0;
3779 } else if (r != 0) {
3780 derr << "Unexpected error from get_mon_metadata: "
3781 << cpp_strerror(r) << dendl;
3782 ds << get_err.str();
3783 break;
3784 }
3785 }
3786 f->close_section();
3787 }
3788
3789 f->flush(ds);
3790 rdata.append(ds);
3791 rs = "";
31f18b77
FG
3792 } else if (prefix == "mon versions") {
3793 if (!f)
3794 f.reset(Formatter::create("json-pretty"));
3795 count_metadata("ceph_version", f.get());
3796 f->flush(ds);
3797 rdata.append(ds);
3798 rs = "";
3799 r = 0;
3800 } else if (prefix == "mon count-metadata") {
3801 if (!f)
3802 f.reset(Formatter::create("json-pretty"));
3803 string field;
9f95a23c 3804 cmd_getval(cmdmap, "property", field);
31f18b77
FG
3805 count_metadata(field, f.get());
3806 f->flush(ds);
3807 rdata.append(ds);
3808 rs = "";
3809 r = 0;
7c673cae
FG
3810 } else if (prefix == "quorum_status") {
3811 // make sure our map is readable and up to date
3812 if (!is_leader() && !is_peon()) {
3813 dout(10) << " waiting for quorum" << dendl;
3814 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3815 return;
3816 }
3817 _quorum_status(f.get(), ds);
3818 rdata.append(ds);
3819 rs = "";
3820 r = 0;
11fdf7f2
TL
3821 } else if (prefix == "mon ok-to-stop") {
3822 vector<string> ids;
9f95a23c 3823 if (!cmd_getval(cmdmap, "ids", ids)) {
11fdf7f2
TL
3824 r = -EINVAL;
3825 goto out;
3826 }
3827 set<string> wouldbe;
3828 for (auto rank : quorum) {
3829 wouldbe.insert(monmap->get_name(rank));
3830 }
3831 for (auto& n : ids) {
3832 if (monmap->contains(n)) {
3833 wouldbe.erase(n);
3834 }
3835 }
3836 if (wouldbe.size() < monmap->min_quorum_size()) {
3837 r = -EBUSY;
3838 rs = "not enough monitors would be available (" + stringify(wouldbe) +
3839 ") after stopping mons " + stringify(ids);
3840 goto out;
3841 }
3842 r = 0;
3843 rs = "quorum should be preserved (" + stringify(wouldbe) +
3844 ") after stopping " + stringify(ids);
3845 } else if (prefix == "mon ok-to-add-offline") {
3846 if (quorum.size() < monmap->min_quorum_size(monmap->size() + 1)) {
3847 rs = "adding a monitor may break quorum (until that monitor starts)";
3848 r = -EBUSY;
3849 goto out;
3850 }
3851 rs = "adding another mon that is not yet online will not break quorum";
3852 r = 0;
3853 } else if (prefix == "mon ok-to-rm") {
3854 string id;
9f95a23c 3855 if (!cmd_getval(cmdmap, "id", id)) {
11fdf7f2
TL
3856 r = -EINVAL;
3857 rs = "must specify a monitor id";
3858 goto out;
3859 }
3860 if (!monmap->contains(id)) {
3861 r = 0;
3862 rs = "mon." + id + " does not exist";
3863 goto out;
3864 }
3865 int rank = monmap->get_rank(id);
3866 if (quorum.count(rank) &&
3867 quorum.size() - 1 < monmap->min_quorum_size(monmap->size() - 1)) {
3868 r = -EBUSY;
3869 rs = "removing mon." + id + " would break quorum";
3870 goto out;
3871 }
3872 r = 0;
3873 rs = "safe to remove mon." + id;
7c673cae
FG
3874 } else if (prefix == "version") {
3875 if (f) {
3876 f->open_object_section("version");
3877 f->dump_string("version", pretty_version_to_str());
3878 f->close_section();
3879 f->flush(ds);
3880 } else {
3881 ds << pretty_version_to_str();
3882 }
3883 rdata.append(ds);
3884 rs = "";
3885 r = 0;
c07f9fc5
FG
3886 } else if (prefix == "versions") {
3887 if (!f)
3888 f.reset(Formatter::create("json-pretty"));
3889 map<string,int> overall;
3890 f->open_object_section("version");
3891 map<string,int> mon, mgr, osd, mds;
3892
3893 count_metadata("ceph_version", &mon);
3894 f->open_object_section("mon");
3895 for (auto& p : mon) {
3896 f->dump_int(p.first.c_str(), p.second);
3897 overall[p.first] += p.second;
3898 }
3899 f->close_section();
3900
3901 mgrmon()->count_metadata("ceph_version", &mgr);
3902 f->open_object_section("mgr");
3903 for (auto& p : mgr) {
3904 f->dump_int(p.first.c_str(), p.second);
3905 overall[p.first] += p.second;
3906 }
3907 f->close_section();
3908
3909 osdmon()->count_metadata("ceph_version", &osd);
3910 f->open_object_section("osd");
3911 for (auto& p : osd) {
3912 f->dump_int(p.first.c_str(), p.second);
3913 overall[p.first] += p.second;
3914 }
3915 f->close_section();
3916
3917 mdsmon()->count_metadata("ceph_version", &mds);
3918 f->open_object_section("mds");
35e4c445 3919 for (auto& p : mds) {
c07f9fc5
FG
3920 f->dump_int(p.first.c_str(), p.second);
3921 overall[p.first] += p.second;
3922 }
3923 f->close_section();
3924
3925 for (auto& p : mgrstatmon()->get_service_map().services) {
9f95a23c
TL
3926 auto &service = p.first;
3927 if (ServiceMap::is_normal_ceph_entity(service)) {
3928 continue;
3929 }
3930 f->open_object_section(service.c_str());
c07f9fc5
FG
3931 map<string,int> m;
3932 p.second.count_metadata("ceph_version", &m);
3933 for (auto& q : m) {
3934 f->dump_int(q.first.c_str(), q.second);
3935 overall[q.first] += q.second;
3936 }
3937 f->close_section();
3938 }
3939
3940 f->open_object_section("overall");
3941 for (auto& p : overall) {
3942 f->dump_int(p.first.c_str(), p.second);
3943 }
3944 f->close_section();
3945 f->close_section();
3946 f->flush(rdata);
3947 rs = "";
3948 r = 0;
7c673cae
FG
3949 }
3950
3951 out:
3952 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3953 reply_command(op, r, rs, rdata, 0);
3954}
3955
3956void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3957{
3958 bufferlist rdata;
3959 reply_command(op, rc, rs, rdata, version);
3960}
3961
3962void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3963 bufferlist& rdata, version_t version)
3964{
9f95a23c 3965 auto m = op->get_req<MMonCommand>();
11fdf7f2 3966 ceph_assert(m->get_type() == MSG_MON_COMMAND);
7c673cae
FG
3967 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
3968 reply->set_tid(m->get_tid());
3969 reply->set_data(rdata);
3970 send_reply(op, reply);
3971}
3972
9f95a23c
TL
3973void Monitor::reply_tell_command(
3974 MonOpRequestRef op, int rc, const string &rs)
3975{
3976 MCommand *m = static_cast<MCommand*>(op->get_req());
3977 ceph_assert(m->get_type() == MSG_COMMAND);
3978 MCommandReply *reply = new MCommandReply(rc, rs);
3979 reply->set_tid(m->get_tid());
3980 m->get_connection()->send_message(reply);
3981}
3982
7c673cae
FG
3983
3984// ------------------------
3985// request/reply routing
3986//
3987// a client/mds/osd will connect to a random monitor. we need to forward any
3988// messages requiring state updates to the leader, and then route any replies
3989// back via the correct monitor and back to them. (the monitor will not
3990// initiate any connections.)
3991
3992void Monitor::forward_request_leader(MonOpRequestRef op)
3993{
3994 op->mark_event(__func__);
3995
3996 int mon = get_leader();
3997 MonSession *session = op->get_session();
3998 PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
3999
11fdf7f2 4000 if (req->get_source().is_mon() && req->get_source_addrs() != messenger->get_myaddrs()) {
7c673cae
FG
4001 dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
4002 } else if (session->proxy_con) {
4003 dout(10) << "forward_request won't double fwd request " << *req << dendl;
4004 } else if (!session->closed) {
4005 RoutedRequest *rr = new RoutedRequest;
4006 rr->tid = ++routed_request_tid;
7c673cae
FG
4007 rr->con = req->get_connection();
4008 rr->con_features = rr->con->get_features();
4009 encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
4010 rr->session = static_cast<MonSession *>(session->get());
4011 rr->op = op;
4012 routed_requests[rr->tid] = rr;
4013 session->routed_request_tids.insert(rr->tid);
4014
4015 dout(10) << "forward_request " << rr->tid << " request " << *req
4016 << " features " << rr->con_features << dendl;
4017
4018 MForward *forward = new MForward(rr->tid,
4019 req,
4020 rr->con_features,
4021 rr->session->caps);
4022 forward->set_priority(req->get_priority());
4023 if (session->auth_handler) {
4024 forward->entity_name = session->entity_name;
4025 } else if (req->get_source().is_mon()) {
4026 forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
4027 }
11fdf7f2 4028 send_mon_message(forward, mon);
7c673cae 4029 op->mark_forwarded();
11fdf7f2 4030 ceph_assert(op->get_req()->get_type() != 0);
7c673cae
FG
4031 } else {
4032 dout(10) << "forward_request no session for request " << *req << dendl;
4033 }
4034}
4035
4036// fake connection attached to forwarded messages
4037struct AnonConnection : public Connection {
11fdf7f2 4038 entity_addr_t socket_addr;
7c673cae
FG
4039
4040 int send_message(Message *m) override {
11fdf7f2 4041 ceph_assert(!"send_message on anonymous connection");
7c673cae
FG
4042 }
4043 void send_keepalive() override {
11fdf7f2 4044 ceph_assert(!"send_keepalive on anonymous connection");
7c673cae
FG
4045 }
4046 void mark_down() override {
4047 // silently ignore
4048 }
4049 void mark_disposable() override {
4050 // silengtly ignore
4051 }
4052 bool is_connected() override { return false; }
11fdf7f2
TL
4053 entity_addr_t get_peer_socket_addr() const override {
4054 return socket_addr;
4055 }
9f95a23c
TL
4056
4057private:
4058 FRIEND_MAKE_REF(AnonConnection);
4059 explicit AnonConnection(CephContext *cct, const entity_addr_t& sa)
4060 : Connection(cct, nullptr),
4061 socket_addr(sa) {}
7c673cae
FG
4062};
4063
4064//extract the original message and put it into the regular dispatch function
4065void Monitor::handle_forward(MonOpRequestRef op)
4066{
9f95a23c 4067 auto m = op->get_req<MForward>();
11fdf7f2
TL
4068 dout(10) << "received forwarded message from "
4069 << ceph_entity_type_name(m->client_type)
4070 << " " << m->client_addrs
7c673cae
FG
4071 << " via " << m->get_source_inst() << dendl;
4072 MonSession *session = op->get_session();
11fdf7f2 4073 ceph_assert(session);
7c673cae
FG
4074
4075 if (!session->is_capable("mon", MON_CAP_X)) {
4076 dout(0) << "forward from entity with insufficient caps! "
4077 << session->caps << dendl;
4078 } else {
4079 // see PaxosService::dispatch(); we rely on this being anon
4080 // (c->msgr == NULL)
4081 PaxosServiceMessage *req = m->claim_message();
11fdf7f2
TL
4082 ceph_assert(req != NULL);
4083
9f95a23c 4084 auto c = ceph::make_ref<AnonConnection>(cct, m->client_socket_addr);
11fdf7f2
TL
4085 MonSession *s = new MonSession(static_cast<Connection*>(c.get()));
4086 s->_ident(req->get_source(),
4087 req->get_source_addrs());
4088 c->set_priv(RefCountedPtr{s, false});
4089 c->set_peer_addrs(m->client_addrs);
4090 c->set_peer_type(m->client_type);
7c673cae
FG
4091 c->set_features(m->con_features);
4092
11fdf7f2 4093 s->authenticated = true;
7c673cae
FG
4094 s->caps = m->client_caps;
4095 dout(10) << " caps are " << s->caps << dendl;
4096 s->entity_name = m->entity_name;
4097 dout(10) << " entity name '" << s->entity_name << "' type "
4098 << s->entity_name.get_type() << dendl;
4099 s->proxy_con = m->get_connection();
4100 s->proxy_tid = m->tid;
4101
4102 req->set_connection(c);
4103
4104 // not super accurate, but better than nothing.
4105 req->set_recv_stamp(m->get_recv_stamp());
4106
4107 /*
4108 * note which election epoch this is; we will drop the message if
4109 * there is a future election since our peers will resend routed
4110 * requests in that case.
4111 */
4112 req->rx_election_epoch = get_epoch();
4113
7c673cae 4114 dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
7c673cae 4115 _ms_dispatch(req);
7c673cae 4116
11fdf7f2
TL
4117 // break the session <-> con ref loop by removing the con->session
4118 // reference, which is no longer needed once the MonOpRequest is
4119 // set up.
4120 c->set_priv(NULL);
7c673cae
FG
4121 }
4122}
4123
4124void Monitor::send_reply(MonOpRequestRef op, Message *reply)
4125{
4126 op->mark_event(__func__);
4127
4128 MonSession *session = op->get_session();
11fdf7f2 4129 ceph_assert(session);
7c673cae
FG
4130 Message *req = op->get_req();
4131 ConnectionRef con = op->get_connection();
4132
4133 reply->set_cct(g_ceph_context);
4134 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
4135
4136 if (!con) {
4137 dout(2) << "send_reply no connection, dropping reply " << *reply
4138 << " to " << req << " " << *req << dendl;
4139 reply->put();
4140 op->mark_event("reply: no connection");
4141 return;
4142 }
4143
4144 if (!session->con && !session->proxy_con) {
4145 dout(2) << "send_reply no connection, dropping reply " << *reply
4146 << " to " << req << " " << *req << dendl;
4147 reply->put();
4148 op->mark_event("reply: no connection");
4149 return;
4150 }
4151
4152 if (session->proxy_con) {
4153 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
4154 << " via " << session->proxy_con->get_peer_addr()
4155 << " for request " << *req << dendl;
4156 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
4157 op->mark_event("reply: send routed request");
4158 } else {
4159 session->con->send_message(reply);
4160 op->mark_event("reply: send");
4161 }
4162}
4163
4164void Monitor::no_reply(MonOpRequestRef op)
4165{
4166 MonSession *session = op->get_session();
4167 Message *req = op->get_req();
4168
4169 if (session->proxy_con) {
4170 dout(10) << "no_reply to " << req->get_source_inst()
4171 << " via " << session->proxy_con->get_peer_addr()
4172 << " for request " << *req << dendl;
4173 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
4174 op->mark_event("no_reply: send routed request");
4175 } else {
4176 dout(10) << "no_reply to " << req->get_source_inst()
4177 << " " << *req << dendl;
4178 op->mark_event("no_reply");
4179 }
4180}
4181
4182void Monitor::handle_route(MonOpRequestRef op)
4183{
9f95a23c 4184 auto m = op->get_req<MRoute>();
7c673cae
FG
4185 MonSession *session = op->get_session();
4186 //check privileges
4187 if (!session->is_capable("mon", MON_CAP_X)) {
4188 dout(0) << "MRoute received from entity without appropriate perms! "
4189 << dendl;
4190 return;
4191 }
4192 if (m->msg)
11fdf7f2
TL
4193 dout(10) << "handle_route tid " << m->session_mon_tid << " " << *m->msg
4194 << dendl;
7c673cae 4195 else
11fdf7f2 4196 dout(10) << "handle_route tid " << m->session_mon_tid << " null" << dendl;
7c673cae
FG
4197
4198 // look it up
b3b6e05e 4199 if (!m->session_mon_tid) {
11fdf7f2 4200 dout(10) << " not a routed request, ignoring" << dendl;
b3b6e05e
TL
4201 return;
4202 }
4203 auto found = routed_requests.find(m->session_mon_tid);
4204 if (found == routed_requests.end()) {
4205 dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
4206 return;
4207 }
4208 std::unique_ptr<RoutedRequest> rr{found->second};
4209 // reset payload, in case encoding is dependent on target features
4210 if (m->msg) {
4211 m->msg->clear_payload();
4212 rr->con->send_message(m->msg);
4213 m->msg = NULL;
7c673cae 4214 }
b3b6e05e
TL
4215 if (m->send_osdmap_first) {
4216 dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
4217 osdmon()->send_incremental(m->send_osdmap_first, rr->session,
4218 true, MonOpRequestRef());
4219 }
4220 ceph_assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
4221 routed_requests.erase(found);
4222 rr->session->routed_request_tids.erase(m->session_mon_tid);
7c673cae
FG
4223}
4224
4225void Monitor::resend_routed_requests()
4226{
4227 dout(10) << "resend_routed_requests" << dendl;
4228 int mon = get_leader();
4229 list<Context*> retry;
4230 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
4231 p != routed_requests.end();
4232 ++p) {
4233 RoutedRequest *rr = p->second;
4234
4235 if (mon == rank) {
4236 dout(10) << " requeue for self tid " << rr->tid << dendl;
4237 rr->op->mark_event("retry routed request");
4238 retry.push_back(new C_RetryMessage(this, rr->op));
4239 if (rr->session) {
11fdf7f2 4240 ceph_assert(rr->session->routed_request_tids.count(p->first));
7c673cae
FG
4241 rr->session->routed_request_tids.erase(p->first);
4242 }
4243 delete rr;
4244 } else {
11fdf7f2
TL
4245 auto q = rr->request_bl.cbegin();
4246 PaxosServiceMessage *req =
4247 (PaxosServiceMessage *)decode_message(cct, 0, q);
7c673cae 4248 rr->op->mark_event("resend forwarded message to leader");
11fdf7f2
TL
4249 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req
4250 << dendl;
4251 MForward *forward = new MForward(rr->tid,
4252 req,
4253 rr->con_features,
7c673cae
FG
4254 rr->session->caps);
4255 req->put(); // forward takes its own ref; drop ours.
11fdf7f2
TL
4256 forward->client_type = rr->con->get_peer_type();
4257 forward->client_addrs = rr->con->get_peer_addrs();
4258 forward->client_socket_addr = rr->con->get_peer_socket_addr();
7c673cae 4259 forward->set_priority(req->get_priority());
11fdf7f2 4260 send_mon_message(forward, mon);
7c673cae
FG
4261 }
4262 }
4263 if (mon == rank) {
4264 routed_requests.clear();
4265 finish_contexts(g_ceph_context, retry);
4266 }
4267}
4268
4269void Monitor::remove_session(MonSession *s)
4270{
11fdf7f2 4271 dout(10) << "remove_session " << s << " " << s->name << " " << s->addrs
224ce89b 4272 << " features 0x" << std::hex << s->con_features << std::dec << dendl;
11fdf7f2
TL
4273 ceph_assert(s->con);
4274 ceph_assert(!s->closed);
7c673cae
FG
4275 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
4276 p != s->routed_request_tids.end();
4277 ++p) {
11fdf7f2 4278 ceph_assert(routed_requests.count(*p));
7c673cae
FG
4279 RoutedRequest *rr = routed_requests[*p];
4280 dout(10) << " dropping routed request " << rr->tid << dendl;
4281 delete rr;
4282 routed_requests.erase(*p);
4283 }
4284 s->routed_request_tids.clear();
11fdf7f2 4285 s->con->set_priv(nullptr);
7c673cae
FG
4286 session_map.remove_session(s);
4287 logger->set(l_mon_num_sessions, session_map.get_size());
4288 logger->inc(l_mon_session_rm);
4289}
4290
4291void Monitor::remove_all_sessions()
4292{
11fdf7f2 4293 std::lock_guard l(session_map_lock);
7c673cae
FG
4294 while (!session_map.sessions.empty()) {
4295 MonSession *s = session_map.sessions.front();
4296 remove_session(s);
11fdf7f2 4297 logger->inc(l_mon_session_rm);
7c673cae
FG
4298 }
4299 if (logger)
4300 logger->set(l_mon_num_sessions, session_map.get_size());
4301}
4302
11fdf7f2 4303void Monitor::send_mon_message(Message *m, int rank)
7c673cae 4304{
11fdf7f2 4305 messenger->send_to_mon(m, monmap->get_addrs(rank));
7c673cae
FG
4306}
4307
4308void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
4309{
4310 /**
4311 * Wait list the new session until we're in the quorum, assuming it's
4312 * sufficiently new.
4313 * tick() will periodically send them back through so we can send
4314 * the client elsewhere if we don't think we're getting back in.
4315 *
f67539c2 4316 * But we allow a few sorts of messages:
7c673cae
FG
4317 * 1) Monitors can talk to us at any time, of course.
4318 * 2) auth messages. It's unlikely to go through much faster, but
4319 * it's possible we've just lost our quorum status and we want to take...
4320 * 3) command messages. We want to accept these under all possible
4321 * circumstances.
4322 */
4323 Message *m = op->get_req();
4324 MonSession *s = op->get_session();
4325 ConnectionRef con = op->get_connection();
4326 utime_t too_old = ceph_clock_now();
4327 too_old -= g_ceph_context->_conf->mon_lease;
4328 if (m->get_recv_stamp() > too_old &&
4329 con->is_connected()) {
4330 dout(5) << "waitlisting message " << *m << dendl;
4331 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
4332 op->mark_wait_for_quorum();
4333 } else {
4334 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
4335 con->mark_down();
4336 // proxied sessions aren't registered and don't have a con; don't remove
4337 // those.
4338 if (!s->proxy_con) {
11fdf7f2 4339 std::lock_guard l(session_map_lock);
7c673cae
FG
4340 remove_session(s);
4341 }
4342 op->mark_zap();
4343 }
4344}
4345
4346void Monitor::_ms_dispatch(Message *m)
4347{
4348 if (is_shutdown()) {
4349 m->put();
4350 return;
4351 }
4352
4353 MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
4354 bool src_is_mon = op->is_src_mon();
4355 op->mark_event("mon:_ms_dispatch");
4356 MonSession *s = op->get_session();
4357 if (s && s->closed) {
4358 return;
4359 }
224ce89b
WB
4360
4361 if (src_is_mon && s) {
4362 ConnectionRef con = m->get_connection();
4363 if (con->get_messenger() && con->get_features() != s->con_features) {
4364 // only update features if this is a non-anonymous connection
4365 dout(10) << __func__ << " feature change for " << m->get_source_inst()
4366 << " (was " << s->con_features
4367 << ", now " << con->get_features() << ")" << dendl;
4368 // connection features changed - recreate session.
4369 if (s->con && s->con != con) {
4370 dout(10) << __func__ << " connection for " << m->get_source_inst()
4371 << " changed from session; mark down and replace" << dendl;
4372 s->con->mark_down();
4373 }
4374 if (s->item.is_on_list()) {
4375 // forwarded messages' sessions are not in the sessions map and
4376 // exist only while the op is being handled.
9f95a23c 4377 std::lock_guard l(session_map_lock);
224ce89b
WB
4378 remove_session(s);
4379 }
224ce89b
WB
4380 s = nullptr;
4381 }
4382 }
4383
7c673cae
FG
4384 if (!s) {
4385 // if the sender is not a monitor, make sure their first message for a
4386 // session is an MAuth. If it is not, assume it's a stray message,
4387 // and considering that we are creating a new session it is safe to
4388 // assume that the sender hasn't authenticated yet, so we have no way
4389 // of assessing whether we should handle it or not.
4390 if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
4391 m->get_type() != CEPH_MSG_MON_GET_MAP &&
4392 m->get_type() != CEPH_MSG_PING)) {
4393 dout(1) << __func__ << " dropping stray message " << *m
4394 << " from " << m->get_source_inst() << dendl;
4395 return;
4396 }
4397
4398 ConnectionRef con = m->get_connection();
4399 {
11fdf7f2
TL
4400 std::lock_guard l(session_map_lock);
4401 s = session_map.new_session(m->get_source(),
4402 m->get_source_addrs(),
4403 con.get());
7c673cae 4404 }
11fdf7f2
TL
4405 ceph_assert(s);
4406 con->set_priv(RefCountedPtr{s, false});
224ce89b
WB
4407 dout(10) << __func__ << " new session " << s << " " << *s
4408 << " features 0x" << std::hex
4409 << s->con_features << std::dec << dendl;
7c673cae
FG
4410 op->set_session(s);
4411
4412 logger->set(l_mon_num_sessions, session_map.get_size());
4413 logger->inc(l_mon_session_add);
4414
4415 if (src_is_mon) {
4416 // give it monitor caps; the peer type has been authenticated
4417 dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
4418 if (!s->caps.is_allow_all()) // but no need to repeatedly copy
11fdf7f2
TL
4419 s->caps = mon_caps;
4420 s->authenticated = true;
7c673cae 4421 }
7c673cae 4422 } else {
11fdf7f2 4423 dout(20) << __func__ << " existing session " << s << " for " << s->name
7c673cae
FG
4424 << dendl;
4425 }
4426
11fdf7f2 4427 ceph_assert(s);
7c673cae
FG
4428
4429 s->session_timeout = ceph_clock_now();
11fdf7f2 4430 s->session_timeout += g_conf()->mon_session_timeout;
7c673cae
FG
4431
4432 if (s->auth_handler) {
4433 s->entity_name = s->auth_handler->get_entity_name();
c5c27e9a
TL
4434 s->global_id = s->auth_handler->get_global_id();
4435 s->global_id_status = s->auth_handler->get_global_id_status();
7c673cae 4436 }
c5c27e9a
TL
4437 dout(20) << " entity_name " << s->entity_name
4438 << " global_id " << s->global_id
4439 << " (" << s->global_id_status
4440 << ") caps " << s->caps.get_str() << dendl;
7c673cae 4441
f67539c2
TL
4442 if (!session_stretch_allowed(s, op)) {
4443 return;
4444 }
7c673cae 4445 if ((is_synchronizing() ||
11fdf7f2 4446 (!s->authenticated && !exited_quorum.is_zero())) &&
7c673cae
FG
4447 !src_is_mon &&
4448 m->get_type() != CEPH_MSG_PING) {
4449 waitlist_or_zap_client(op);
4450 } else {
4451 dispatch_op(op);
4452 }
4453 return;
4454}
4455
4456void Monitor::dispatch_op(MonOpRequestRef op)
4457{
4458 op->mark_event("mon:dispatch_op");
4459 MonSession *s = op->get_session();
11fdf7f2 4460 ceph_assert(s);
7c673cae
FG
4461 if (s->closed) {
4462 dout(10) << " session closed, dropping " << op->get_req() << dendl;
4463 return;
4464 }
4465
4466 /* we will consider the default type as being 'monitor' until proven wrong */
4467 op->set_type_monitor();
4468 /* deal with all messages that do not necessarily need caps */
7c673cae
FG
4469 switch (op->get_req()->get_type()) {
4470 // auth
4471 case MSG_MON_GLOBAL_ID:
4472 case CEPH_MSG_AUTH:
4473 op->set_type_service();
4474 /* no need to check caps here */
4475 paxos_service[PAXOS_AUTH]->dispatch(op);
11fdf7f2 4476 return;
7c673cae
FG
4477
4478 case CEPH_MSG_PING:
4479 handle_ping(op);
11fdf7f2 4480 return;
9f95a23c
TL
4481 case MSG_COMMAND:
4482 op->set_type_command();
4483 handle_tell_command(op);
4484 return;
11fdf7f2 4485 }
7c673cae 4486
11fdf7f2
TL
4487 if (!op->get_session()->authenticated) {
4488 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4489 << " is not authenticated, dropping " << *(op->get_req())
4490 << dendl;
4491 return;
4492 }
4493
c5c27e9a
TL
4494 // global_id_status == NONE: all sessions for auth_none and krb,
4495 // mon <-> mon sessions (including proxied sessions) for cephx
4496 ceph_assert(s->global_id_status == global_id_status_t::NONE ||
4497 s->global_id_status == global_id_status_t::NEW_OK ||
4498 s->global_id_status == global_id_status_t::NEW_NOT_EXPOSED ||
4499 s->global_id_status == global_id_status_t::RECLAIM_OK ||
4500 s->global_id_status == global_id_status_t::RECLAIM_INSECURE);
4501
4502 // let mon_getmap through for "ping" (which doesn't reconnect)
4503 // and "tell" (which reconnects but doesn't attempt to preserve
4504 // its global_id and stays in NEW_NOT_EXPOSED, retrying until
4505 // ->send_attempts reaches 0)
4506 if (cct->_conf->auth_expose_insecure_global_id_reclaim &&
4507 s->global_id_status == global_id_status_t::NEW_NOT_EXPOSED &&
4508 op->get_req()->get_type() != CEPH_MSG_MON_GET_MAP) {
4509 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4510 << " may omit old_ticket on reconnects, discarding "
4511 << *op->get_req() << " and forcing reconnect" << dendl;
4512 ceph_assert(s->con && !s->proxy_con);
4513 s->con->mark_down();
4514 {
4515 std::lock_guard l(session_map_lock);
4516 remove_session(s);
4517 }
4518 op->mark_zap();
4519 return;
4520 }
4521
11fdf7f2 4522 switch (op->get_req()->get_type()) {
7c673cae
FG
4523 case CEPH_MSG_MON_GET_MAP:
4524 handle_mon_get_map(op);
11fdf7f2
TL
4525 return;
4526
4527 case MSG_GET_CONFIG:
4528 configmon()->handle_get_config(op);
4529 return;
7c673cae 4530
11fdf7f2
TL
4531 case CEPH_MSG_MON_SUBSCRIBE:
4532 /* FIXME: check what's being subscribed, filter accordingly */
4533 handle_subscribe(op);
4534 return;
7c673cae 4535 }
7c673cae
FG
4536
4537 /* well, maybe the op belongs to a service... */
4538 op->set_type_service();
4539 /* deal with all messages which caps should be checked somewhere else */
7c673cae
FG
4540 switch (op->get_req()->get_type()) {
4541
4542 // OSDs
4543 case CEPH_MSG_MON_GET_OSDMAP:
4544 case CEPH_MSG_POOLOP:
4545 case MSG_OSD_BEACON:
4546 case MSG_OSD_MARK_ME_DOWN:
9f95a23c 4547 case MSG_OSD_MARK_ME_DEAD:
7c673cae
FG
4548 case MSG_OSD_FULL:
4549 case MSG_OSD_FAILURE:
4550 case MSG_OSD_BOOT:
4551 case MSG_OSD_ALIVE:
4552 case MSG_OSD_PGTEMP:
4553 case MSG_OSD_PG_CREATED:
4554 case MSG_REMOVE_SNAPS:
9f95a23c 4555 case MSG_MON_GET_PURGED_SNAPS:
11fdf7f2 4556 case MSG_OSD_PG_READY_TO_MERGE:
7c673cae 4557 paxos_service[PAXOS_OSDMAP]->dispatch(op);
11fdf7f2 4558 return;
7c673cae
FG
4559
4560 // MDSs
4561 case MSG_MDS_BEACON:
4562 case MSG_MDS_OFFLOAD_TARGETS:
4563 paxos_service[PAXOS_MDSMAP]->dispatch(op);
11fdf7f2 4564 return;
7c673cae
FG
4565
4566 // Mgrs
4567 case MSG_MGR_BEACON:
4568 paxos_service[PAXOS_MGR]->dispatch(op);
11fdf7f2 4569 return;
7c673cae 4570
31f18b77 4571 // MgrStat
b32b8144 4572 case MSG_MON_MGR_REPORT:
11fdf7f2 4573 case CEPH_MSG_STATFS:
7c673cae 4574 case MSG_GETPOOLSTATS:
31f18b77 4575 paxos_service[PAXOS_MGRSTAT]->dispatch(op);
11fdf7f2 4576 return;
7c673cae 4577
11fdf7f2 4578 // log
7c673cae
FG
4579 case MSG_LOG:
4580 paxos_service[PAXOS_LOG]->dispatch(op);
11fdf7f2 4581 return;
7c673cae
FG
4582
4583 // handle_command() does its own caps checking
4584 case MSG_MON_COMMAND:
4585 op->set_type_command();
4586 handle_command(op);
11fdf7f2 4587 return;
7c673cae 4588 }
7c673cae
FG
4589
4590 /* nop, looks like it's not a service message; revert back to monitor */
4591 op->set_type_monitor();
4592
4593 /* messages we, the Monitor class, need to deal with
4594 * but may be sent by clients. */
4595
4596 if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
4597 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4598 << " not enough caps for " << *(op->get_req()) << " -- dropping"
4599 << dendl;
11fdf7f2 4600 return;
7c673cae
FG
4601 }
4602
7c673cae 4603 switch (op->get_req()->get_type()) {
7c673cae
FG
4604 // misc
4605 case CEPH_MSG_MON_GET_VERSION:
4606 handle_get_version(op);
11fdf7f2 4607 return;
7c673cae 4608 }
7c673cae
FG
4609
4610 if (!op->is_src_mon()) {
4611 dout(1) << __func__ << " unexpected monitor message from"
4612 << " non-monitor entity " << op->get_req()->get_source_inst()
4613 << " " << *(op->get_req()) << " -- dropping" << dendl;
11fdf7f2 4614 return;
7c673cae
FG
4615 }
4616
4617 /* messages that should only be sent by another monitor */
7c673cae
FG
4618 switch (op->get_req()->get_type()) {
4619
4620 case MSG_ROUTE:
4621 handle_route(op);
11fdf7f2 4622 return;
7c673cae
FG
4623
4624 case MSG_MON_PROBE:
4625 handle_probe(op);
11fdf7f2 4626 return;
7c673cae
FG
4627
4628 // Sync (i.e., the new slurp, but on steroids)
4629 case MSG_MON_SYNC:
4630 handle_sync(op);
11fdf7f2 4631 return;
7c673cae
FG
4632 case MSG_MON_SCRUB:
4633 handle_scrub(op);
11fdf7f2 4634 return;
7c673cae
FG
4635
4636 /* log acks are sent from a monitor we sent the MLog to, and are
4637 never sent by clients to us. */
4638 case MSG_LOGACK:
4639 log_client.handle_log_ack((MLogAck*)op->get_req());
11fdf7f2 4640 return;
7c673cae
FG
4641
4642 // monmap
4643 case MSG_MON_JOIN:
4644 op->set_type_service();
4645 paxos_service[PAXOS_MONMAP]->dispatch(op);
11fdf7f2 4646 return;
7c673cae
FG
4647
4648 // paxos
4649 case MSG_MON_PAXOS:
4650 {
4651 op->set_type_paxos();
9f95a23c 4652 auto pm = op->get_req<MMonPaxos>();
7c673cae
FG
4653 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4654 //can't send these!
11fdf7f2 4655 return;
7c673cae
FG
4656 }
4657
4658 if (state == STATE_SYNCHRONIZING) {
4659 // we are synchronizing. These messages would do us no
4660 // good, thus just drop them and ignore them.
4661 dout(10) << __func__ << " ignore paxos msg from "
4662 << pm->get_source_inst() << dendl;
11fdf7f2 4663 return;
7c673cae
FG
4664 }
4665
4666 // sanitize
4667 if (pm->epoch > get_epoch()) {
4668 bootstrap();
11fdf7f2 4669 return;
7c673cae
FG
4670 }
4671 if (pm->epoch != get_epoch()) {
11fdf7f2 4672 return;
7c673cae
FG
4673 }
4674
4675 paxos->dispatch(op);
4676 }
11fdf7f2 4677 return;
7c673cae
FG
4678
4679 // elector messages
4680 case MSG_MON_ELECTION:
f67539c2 4681 op->set_type_election_or_ping();
7c673cae
FG
4682 //check privileges here for simplicity
4683 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4684 dout(0) << "MMonElection received from entity without enough caps!"
4685 << op->get_session()->caps << dendl;
11fdf7f2 4686 return;;
7c673cae
FG
4687 }
4688 if (!is_probing() && !is_synchronizing()) {
4689 elector.dispatch(op);
4690 }
11fdf7f2 4691 return;
7c673cae 4692
f67539c2
TL
4693 case MSG_MON_PING:
4694 op->set_type_election_or_ping();
4695 elector.dispatch(op);
4696 return;
4697
7c673cae
FG
4698 case MSG_FORWARD:
4699 handle_forward(op);
11fdf7f2 4700 return;
7c673cae
FG
4701
4702 case MSG_TIMECHECK:
11fdf7f2
TL
4703 dout(5) << __func__ << " ignoring " << op << dendl;
4704 return;
4705 case MSG_TIMECHECK2:
7c673cae 4706 handle_timecheck(op);
11fdf7f2 4707 return;
7c673cae
FG
4708
4709 case MSG_MON_HEALTH:
11fdf7f2
TL
4710 dout(5) << __func__ << " dropping deprecated message: "
4711 << *op->get_req() << dendl;
7c673cae 4712 break;
224ce89b
WB
4713 case MSG_MON_HEALTH_CHECKS:
4714 op->set_type_service();
4715 paxos_service[PAXOS_HEALTH]->dispatch(op);
11fdf7f2 4716 return;
7c673cae 4717 }
11fdf7f2 4718 dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
7c673cae
FG
4719 return;
4720}
4721
4722void Monitor::handle_ping(MonOpRequestRef op)
4723{
9f95a23c 4724 auto m = op->get_req<MPing>();
7c673cae
FG
4725 dout(10) << __func__ << " " << *m << dendl;
4726 MPing *reply = new MPing;
7c673cae
FG
4727 bufferlist payload;
4728 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4729 f->open_object_section("pong");
4730
9f95a23c
TL
4731 healthmon()->get_health_status(false, f.get(), nullptr);
4732 get_mon_status(f.get());
7c673cae
FG
4733
4734 f->close_section();
4735 stringstream ss;
4736 f->flush(ss);
11fdf7f2 4737 encode(ss.str(), payload);
7c673cae
FG
4738 reply->set_payload(payload);
4739 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
11fdf7f2 4740 m->get_connection()->send_message(reply);
7c673cae
FG
4741}
4742
4743void Monitor::timecheck_start()
4744{
4745 dout(10) << __func__ << dendl;
4746 timecheck_cleanup();
11fdf7f2
TL
4747 if (get_quorum_mon_features().contains_all(
4748 ceph::features::mon::FEATURE_NAUTILUS)) {
4749 timecheck_start_round();
4750 }
7c673cae
FG
4751}
4752
4753void Monitor::timecheck_finish()
4754{
4755 dout(10) << __func__ << dendl;
4756 timecheck_cleanup();
4757}
4758
4759void Monitor::timecheck_start_round()
4760{
4761 dout(10) << __func__ << " curr " << timecheck_round << dendl;
11fdf7f2 4762 ceph_assert(is_leader());
7c673cae
FG
4763
4764 if (monmap->size() == 1) {
11fdf7f2 4765 ceph_abort_msg("We are alone; this shouldn't have been scheduled!");
7c673cae
FG
4766 return;
4767 }
4768
4769 if (timecheck_round % 2) {
4770 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4771 utime_t curr_time = ceph_clock_now();
11fdf7f2 4772 double max = g_conf()->mon_timecheck_interval*3;
7c673cae
FG
4773 if (curr_time - timecheck_round_start < max) {
4774 dout(10) << __func__ << " keep current round going" << dendl;
4775 goto out;
4776 } else {
4777 dout(10) << __func__
4778 << " finish current timecheck and start new" << dendl;
4779 timecheck_cancel_round();
4780 }
4781 }
4782
11fdf7f2 4783 ceph_assert(timecheck_round % 2 == 0);
7c673cae
FG
4784 timecheck_acks = 0;
4785 timecheck_round ++;
4786 timecheck_round_start = ceph_clock_now();
4787 dout(10) << __func__ << " new " << timecheck_round << dendl;
4788
4789 timecheck();
4790out:
4791 dout(10) << __func__ << " setting up next event" << dendl;
4792 timecheck_reset_event();
4793}
4794
4795void Monitor::timecheck_finish_round(bool success)
4796{
4797 dout(10) << __func__ << " curr " << timecheck_round << dendl;
11fdf7f2 4798 ceph_assert(timecheck_round % 2);
7c673cae
FG
4799 timecheck_round ++;
4800 timecheck_round_start = utime_t();
4801
4802 if (success) {
11fdf7f2
TL
4803 ceph_assert(timecheck_waiting.empty());
4804 ceph_assert(timecheck_acks == quorum.size());
7c673cae
FG
4805 timecheck_report();
4806 timecheck_check_skews();
4807 return;
4808 }
4809
4810 dout(10) << __func__ << " " << timecheck_waiting.size()
4811 << " peers still waiting:";
11fdf7f2
TL
4812 for (auto& p : timecheck_waiting) {
4813 *_dout << " mon." << p.first;
7c673cae
FG
4814 }
4815 *_dout << dendl;
4816 timecheck_waiting.clear();
4817
4818 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4819}
4820
4821void Monitor::timecheck_cancel_round()
4822{
4823 timecheck_finish_round(false);
4824}
4825
4826void Monitor::timecheck_cleanup()
4827{
4828 timecheck_round = 0;
4829 timecheck_acks = 0;
4830 timecheck_round_start = utime_t();
4831
4832 if (timecheck_event) {
4833 timer.cancel_event(timecheck_event);
4834 timecheck_event = NULL;
4835 }
4836 timecheck_waiting.clear();
4837 timecheck_skews.clear();
4838 timecheck_latencies.clear();
4839
4840 timecheck_rounds_since_clean = 0;
4841}
4842
4843void Monitor::timecheck_reset_event()
4844{
4845 if (timecheck_event) {
4846 timer.cancel_event(timecheck_event);
4847 timecheck_event = NULL;
4848 }
4849
4850 double delay =
4851 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4852
4853 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4854 delay = cct->_conf->mon_timecheck_interval;
4855 }
4856
4857 dout(10) << __func__ << " delay " << delay
4858 << " rounds_since_clean " << timecheck_rounds_since_clean
4859 << dendl;
4860
3efd9988
FG
4861 timecheck_event = timer.add_event_after(
4862 delay,
9f95a23c 4863 new C_MonContext{this, [this](int) {
3efd9988 4864 timecheck_start_round();
9f95a23c 4865 }});
7c673cae
FG
4866}
4867
4868void Monitor::timecheck_check_skews()
4869{
4870 dout(10) << __func__ << dendl;
11fdf7f2
TL
4871 ceph_assert(is_leader());
4872 ceph_assert((timecheck_round % 2) == 0);
7c673cae 4873 if (monmap->size() == 1) {
11fdf7f2 4874 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
7c673cae
FG
4875 return;
4876 }
11fdf7f2 4877 ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
7c673cae
FG
4878
4879 bool found_skew = false;
11fdf7f2 4880 for (auto& p : timecheck_skews) {
7c673cae 4881 double abs_skew;
11fdf7f2 4882 if (timecheck_has_skew(p.second, &abs_skew)) {
7c673cae 4883 dout(10) << __func__
11fdf7f2 4884 << " " << p.first << " skew " << abs_skew << dendl;
7c673cae
FG
4885 found_skew = true;
4886 }
4887 }
4888
4889 if (found_skew) {
4890 ++timecheck_rounds_since_clean;
4891 timecheck_reset_event();
4892 } else if (timecheck_rounds_since_clean > 0) {
4893 dout(1) << __func__
4894 << " no clock skews found after " << timecheck_rounds_since_clean
4895 << " rounds" << dendl;
4896 // make sure the skews are really gone and not just a transient success
4897 // this will run just once if not in the presence of skews again.
4898 timecheck_rounds_since_clean = 1;
4899 timecheck_reset_event();
4900 timecheck_rounds_since_clean = 0;
4901 }
4902
4903}
4904
4905void Monitor::timecheck_report()
4906{
4907 dout(10) << __func__ << dendl;
11fdf7f2
TL
4908 ceph_assert(is_leader());
4909 ceph_assert((timecheck_round % 2) == 0);
7c673cae 4910 if (monmap->size() == 1) {
11fdf7f2 4911 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
7c673cae
FG
4912 return;
4913 }
4914
11fdf7f2 4915 ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
7c673cae
FG
4916 bool do_output = true; // only output report once
4917 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4918 if (monmap->get_name(*q) == name)
4919 continue;
4920
11fdf7f2 4921 MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_REPORT);
7c673cae
FG
4922 m->epoch = get_epoch();
4923 m->round = timecheck_round;
4924
11fdf7f2
TL
4925 for (auto& it : timecheck_skews) {
4926 double skew = it.second;
4927 double latency = timecheck_latencies[it.first];
7c673cae 4928
11fdf7f2
TL
4929 m->skews[it.first] = skew;
4930 m->latencies[it.first] = latency;
7c673cae
FG
4931
4932 if (do_output) {
11fdf7f2 4933 dout(25) << __func__ << " mon." << it.first
7c673cae
FG
4934 << " latency " << latency
4935 << " skew " << skew << dendl;
4936 }
4937 }
4938 do_output = false;
11fdf7f2
TL
4939 dout(10) << __func__ << " send report to mon." << *q << dendl;
4940 send_mon_message(m, *q);
7c673cae
FG
4941 }
4942}
4943
4944void Monitor::timecheck()
4945{
4946 dout(10) << __func__ << dendl;
11fdf7f2 4947 ceph_assert(is_leader());
7c673cae 4948 if (monmap->size() == 1) {
11fdf7f2 4949 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
7c673cae
FG
4950 return;
4951 }
11fdf7f2 4952 ceph_assert(timecheck_round % 2 != 0);
7c673cae
FG
4953
4954 timecheck_acks = 1; // we ack ourselves
4955
4956 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4957 << " round " << timecheck_round << dendl;
4958
4959 // we are at the eye of the storm; the point of reference
11fdf7f2
TL
4960 timecheck_skews[rank] = 0.0;
4961 timecheck_latencies[rank] = 0.0;
7c673cae
FG
4962
4963 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4964 if (monmap->get_name(*it) == name)
4965 continue;
4966
7c673cae 4967 utime_t curr_time = ceph_clock_now();
11fdf7f2
TL
4968 timecheck_waiting[*it] = curr_time;
4969 MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_PING);
7c673cae
FG
4970 m->epoch = get_epoch();
4971 m->round = timecheck_round;
11fdf7f2
TL
4972 dout(10) << __func__ << " send " << *m << " to mon." << *it << dendl;
4973 send_mon_message(m, *it);
7c673cae
FG
4974 }
4975}
4976
4977health_status_t Monitor::timecheck_status(ostringstream &ss,
4978 const double skew_bound,
4979 const double latency)
4980{
4981 health_status_t status = HEALTH_OK;
11fdf7f2 4982 ceph_assert(latency >= 0);
7c673cae
FG
4983
4984 double abs_skew;
4985 if (timecheck_has_skew(skew_bound, &abs_skew)) {
4986 status = HEALTH_WARN;
4987 ss << "clock skew " << abs_skew << "s"
11fdf7f2 4988 << " > max " << g_conf()->mon_clock_drift_allowed << "s";
7c673cae
FG
4989 }
4990
4991 return status;
4992}
4993
4994void Monitor::handle_timecheck_leader(MonOpRequestRef op)
4995{
9f95a23c 4996 auto m = op->get_req<MTimeCheck2>();
7c673cae
FG
4997 dout(10) << __func__ << " " << *m << dendl;
4998 /* handles PONG's */
11fdf7f2 4999 ceph_assert(m->op == MTimeCheck2::OP_PONG);
7c673cae 5000
11fdf7f2 5001 int other = m->get_source().num();
7c673cae
FG
5002 if (m->epoch < get_epoch()) {
5003 dout(1) << __func__ << " got old timecheck epoch " << m->epoch
5004 << " from " << other
5005 << " curr " << get_epoch()
5006 << " -- severely lagged? discard" << dendl;
5007 return;
5008 }
11fdf7f2 5009 ceph_assert(m->epoch == get_epoch());
7c673cae
FG
5010
5011 if (m->round < timecheck_round) {
5012 dout(1) << __func__ << " got old round " << m->round
5013 << " from " << other
5014 << " curr " << timecheck_round << " -- discard" << dendl;
5015 return;
5016 }
5017
5018 utime_t curr_time = ceph_clock_now();
5019
11fdf7f2 5020 ceph_assert(timecheck_waiting.count(other) > 0);
7c673cae
FG
5021 utime_t timecheck_sent = timecheck_waiting[other];
5022 timecheck_waiting.erase(other);
5023 if (curr_time < timecheck_sent) {
5024 // our clock was readjusted -- drop everything until it all makes sense.
5025 dout(1) << __func__ << " our clock was readjusted --"
5026 << " bump round and drop current check"
5027 << dendl;
5028 timecheck_cancel_round();
5029 return;
5030 }
5031
5032 /* update peer latencies */
5033 double latency = (double)(curr_time - timecheck_sent);
5034
5035 if (timecheck_latencies.count(other) == 0)
5036 timecheck_latencies[other] = latency;
5037 else {
5038 double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
5039 timecheck_latencies[other] = avg_latency;
5040 }
5041
5042 /*
5043 * update skews
5044 *
5045 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
5046 * and 'a' happens to be lower than 'b'; so we use double instead.
5047 *
5048 * latency is always expected to be >= 0.
5049 *
5050 * delta, the difference between theirs timestamp and ours, may either be
5051 * lower or higher than 0; will hardly ever be 0.
5052 *
5053 * The absolute skew is the absolute delta minus the latency, which is
5054 * taken as a whole instead of an rtt given that there is some queueing
5055 * and dispatch times involved and it's hard to assess how long exactly
5056 * it took for the message to travel to the other side and be handled. So
5057 * we call it a bounded skew, the worst case scenario.
5058 *
5059 * Now, to math!
5060 *
5061 * Given that the latency is always positive, we can establish that the
5062 * bounded skew will be:
5063 *
5064 * 1. positive if the absolute delta is higher than the latency and
5065 * delta is positive
5066 * 2. negative if the absolute delta is higher than the latency and
5067 * delta is negative.
5068 * 3. zero if the absolute delta is lower than the latency.
5069 *
5070 * On 3. we make a judgement call and treat the skew as non-existent.
5071 * This is because that, if the absolute delta is lower than the
5072 * latency, then the apparently existing skew is nothing more than a
5073 * side-effect of the high latency at work.
5074 *
5075 * This may not be entirely true though, as a severely skewed clock
5076 * may be masked by an even higher latency, but with high latencies
5077 * we probably have worse issues to deal with than just skewed clocks.
5078 */
11fdf7f2 5079 ceph_assert(latency >= 0);
7c673cae
FG
5080
5081 double delta = ((double) m->timestamp) - ((double) curr_time);
5082 double abs_delta = (delta > 0 ? delta : -delta);
5083 double skew_bound = abs_delta - latency;
5084 if (skew_bound < 0)
5085 skew_bound = 0;
5086 else if (delta < 0)
5087 skew_bound = -skew_bound;
5088
5089 ostringstream ss;
5090 health_status_t status = timecheck_status(ss, skew_bound, latency);
b32b8144
FG
5091 if (status != HEALTH_OK) {
5092 clog->health(status) << other << " " << ss.str();
5093 }
7c673cae
FG
5094
5095 dout(10) << __func__ << " from " << other << " ts " << m->timestamp
5096 << " delta " << delta << " skew_bound " << skew_bound
5097 << " latency " << latency << dendl;
5098
5099 timecheck_skews[other] = skew_bound;
5100
5101 timecheck_acks++;
5102 if (timecheck_acks == quorum.size()) {
5103 dout(10) << __func__ << " got pongs from everybody ("
5104 << timecheck_acks << " total)" << dendl;
11fdf7f2
TL
5105 ceph_assert(timecheck_skews.size() == timecheck_acks);
5106 ceph_assert(timecheck_waiting.empty());
7c673cae
FG
5107 // everyone has acked, so bump the round to finish it.
5108 timecheck_finish_round();
5109 }
5110}
5111
5112void Monitor::handle_timecheck_peon(MonOpRequestRef op)
5113{
9f95a23c 5114 auto m = op->get_req<MTimeCheck2>();
7c673cae
FG
5115 dout(10) << __func__ << " " << *m << dendl;
5116
11fdf7f2
TL
5117 ceph_assert(is_peon());
5118 ceph_assert(m->op == MTimeCheck2::OP_PING || m->op == MTimeCheck2::OP_REPORT);
7c673cae
FG
5119
5120 if (m->epoch != get_epoch()) {
5121 dout(1) << __func__ << " got wrong epoch "
5122 << "(ours " << get_epoch()
5123 << " theirs: " << m->epoch << ") -- discarding" << dendl;
5124 return;
5125 }
5126
5127 if (m->round < timecheck_round) {
5128 dout(1) << __func__ << " got old round " << m->round
5129 << " current " << timecheck_round
5130 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
5131 return;
5132 }
5133
5134 timecheck_round = m->round;
5135
11fdf7f2
TL
5136 if (m->op == MTimeCheck2::OP_REPORT) {
5137 ceph_assert((timecheck_round % 2) == 0);
7c673cae
FG
5138 timecheck_latencies.swap(m->latencies);
5139 timecheck_skews.swap(m->skews);
5140 return;
5141 }
5142
11fdf7f2
TL
5143 ceph_assert((timecheck_round % 2) != 0);
5144 MTimeCheck2 *reply = new MTimeCheck2(MTimeCheck2::OP_PONG);
7c673cae
FG
5145 utime_t curr_time = ceph_clock_now();
5146 reply->timestamp = curr_time;
5147 reply->epoch = m->epoch;
5148 reply->round = m->round;
5149 dout(10) << __func__ << " send " << *m
5150 << " to " << m->get_source_inst() << dendl;
5151 m->get_connection()->send_message(reply);
5152}
5153
5154void Monitor::handle_timecheck(MonOpRequestRef op)
5155{
9f95a23c 5156 auto m = op->get_req<MTimeCheck2>();
7c673cae
FG
5157 dout(10) << __func__ << " " << *m << dendl;
5158
5159 if (is_leader()) {
11fdf7f2 5160 if (m->op != MTimeCheck2::OP_PONG) {
7c673cae
FG
5161 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
5162 } else {
5163 handle_timecheck_leader(op);
5164 }
5165 } else if (is_peon()) {
11fdf7f2 5166 if (m->op != MTimeCheck2::OP_PING && m->op != MTimeCheck2::OP_REPORT) {
7c673cae
FG
5167 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
5168 } else {
5169 handle_timecheck_peon(op);
5170 }
5171 } else {
5172 dout(1) << __func__ << " drop unexpected msg" << dendl;
5173 }
5174}
5175
5176void Monitor::handle_subscribe(MonOpRequestRef op)
5177{
9f95a23c 5178 auto m = op->get_req<MMonSubscribe>();
7c673cae
FG
5179 dout(10) << "handle_subscribe " << *m << dendl;
5180
5181 bool reply = false;
5182
5183 MonSession *s = op->get_session();
11fdf7f2
TL
5184 ceph_assert(s);
5185
5186 if (m->hostname.size()) {
5187 s->remote_host = m->hostname;
5188 }
7c673cae
FG
5189
5190 for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
5191 p != m->what.end();
5192 ++p) {
11fdf7f2
TL
5193 if (p->first == "monmap" || p->first == "config") {
5194 // these require no caps
5195 } else if (!s->is_capable("mon", MON_CAP_R)) {
5196 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
5197 << " not enough caps for " << *(op->get_req()) << " -- dropping"
5198 << dendl;
5199 continue;
5200 }
5201
7c673cae
FG
5202 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
5203 if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
5204 reply = true;
5205
5206 // remove conflicting subscribes
5207 if (logmon()->sub_name_to_id(p->first) >= 0) {
5208 for (map<string, Subscription*>::iterator it = s->sub_map.begin();
5209 it != s->sub_map.end(); ) {
5210 if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
11fdf7f2 5211 std::lock_guard l(session_map_lock);
7c673cae
FG
5212 session_map.remove_sub((it++)->second);
5213 } else {
5214 ++it;
5215 }
5216 }
5217 }
5218
5219 {
11fdf7f2 5220 std::lock_guard l(session_map_lock);
7c673cae
FG
5221 session_map.add_update_sub(s, p->first, p->second.start,
5222 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
5223 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
5224 }
5225
5226 if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
5227 dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
5228 if ((int)s->is_capable("mds", MON_CAP_R)) {
5229 Subscription *sub = s->sub_map[p->first];
11fdf7f2 5230 ceph_assert(sub != nullptr);
7c673cae
FG
5231 mdsmon()->check_sub(sub);
5232 }
5233 } else if (p->first == "osdmap") {
5234 if ((int)s->is_capable("osd", MON_CAP_R)) {
5235 if (s->osd_epoch > p->second.start) {
5236 // client needs earlier osdmaps on purpose, so reset the sent epoch
5237 s->osd_epoch = 0;
5238 }
5239 osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
5240 }
5241 } else if (p->first == "osd_pg_creates") {
5242 if ((int)s->is_capable("osd", MON_CAP_W)) {
11fdf7f2 5243 osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
7c673cae
FG
5244 }
5245 } else if (p->first == "monmap") {
5246 monmon()->check_sub(s->sub_map[p->first]);
5247 } else if (logmon()->sub_name_to_id(p->first) >= 0) {
5248 logmon()->check_sub(s->sub_map[p->first]);
5249 } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
5250 mgrmon()->check_sub(s->sub_map[p->first]);
224ce89b
WB
5251 } else if (p->first == "servicemap") {
5252 mgrstatmon()->check_sub(s->sub_map[p->first]);
11fdf7f2
TL
5253 } else if (p->first == "config") {
5254 configmon()->check_sub(s);
f67539c2
TL
5255 } else if (p->first.find("kv:") == 0) {
5256 kvmon()->check_sub(s->sub_map[p->first]);
7c673cae
FG
5257 }
5258 }
5259
5260 if (reply) {
5261 // we only need to reply if the client is old enough to think it
5262 // has to send renewals.
5263 ConnectionRef con = m->get_connection();
5264 if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
5265 m->get_connection()->send_message(new MMonSubscribeAck(
11fdf7f2 5266 monmap->get_fsid(), (int)g_conf()->mon_subscribe_interval));
7c673cae
FG
5267 }
5268
5269}
5270
5271void Monitor::handle_get_version(MonOpRequestRef op)
5272{
9f95a23c 5273 auto m = op->get_req<MMonGetVersion>();
7c673cae
FG
5274 dout(10) << "handle_get_version " << *m << dendl;
5275 PaxosService *svc = NULL;
5276
5277 MonSession *s = op->get_session();
11fdf7f2 5278 ceph_assert(s);
7c673cae
FG
5279
5280 if (!is_leader() && !is_peon()) {
5281 dout(10) << " waiting for quorum" << dendl;
5282 waitfor_quorum.push_back(new C_RetryMessage(this, op));
5283 goto out;
5284 }
5285
5286 if (m->what == "mdsmap") {
5287 svc = mdsmon();
5288 } else if (m->what == "fsmap") {
5289 svc = mdsmon();
5290 } else if (m->what == "osdmap") {
5291 svc = osdmon();
5292 } else if (m->what == "monmap") {
5293 svc = monmon();
5294 } else {
5295 derr << "invalid map type " << m->what << dendl;
5296 }
5297
5298 if (svc) {
5299 if (!svc->is_readable()) {
5300 svc->wait_for_readable(op, new C_RetryMessage(this, op));
5301 goto out;
5302 }
5303
5304 MMonGetVersionReply *reply = new MMonGetVersionReply();
5305 reply->handle = m->handle;
5306 reply->version = svc->get_last_committed();
5307 reply->oldest_version = svc->get_first_committed();
5308 reply->set_tid(m->get_tid());
5309
5310 m->get_connection()->send_message(reply);
5311 }
5312 out:
5313 return;
5314}
5315
5316bool Monitor::ms_handle_reset(Connection *con)
5317{
5318 dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
5319
5320 // ignore lossless monitor sessions
5321 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
5322 return false;
5323
11fdf7f2
TL
5324 auto priv = con->get_priv();
5325 auto s = static_cast<MonSession*>(priv.get());
7c673cae
FG
5326 if (!s)
5327 return false;
5328
5329 // break any con <-> session ref cycle
11fdf7f2 5330 s->con->set_priv(nullptr);
7c673cae
FG
5331
5332 if (is_shutdown())
5333 return false;
5334
11fdf7f2 5335 std::lock_guard l(lock);
7c673cae 5336
11fdf7f2
TL
5337 dout(10) << "reset/close on session " << s->name << " " << s->addrs << dendl;
5338 if (!s->closed && s->item.is_on_list()) {
5339 std::lock_guard l(session_map_lock);
7c673cae
FG
5340 remove_session(s);
5341 }
7c673cae
FG
5342 return true;
5343}
5344
5345bool Monitor::ms_handle_refused(Connection *con)
5346{
5347 // just log for now...
5348 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
5349 return false;
5350}
5351
5352// -----
5353
5354void Monitor::send_latest_monmap(Connection *con)
5355{
5356 bufferlist bl;
5357 monmap->encode(bl, con->get_features());
5358 con->send_message(new MMonMap(bl));
5359}
5360
5361void Monitor::handle_mon_get_map(MonOpRequestRef op)
5362{
9f95a23c 5363 auto m = op->get_req<MMonGetMap>();
7c673cae
FG
5364 dout(10) << "handle_mon_get_map" << dendl;
5365 send_latest_monmap(m->get_connection().get());
5366}
5367
224ce89b 5368int Monitor::load_metadata()
7c673cae
FG
5369{
5370 bufferlist bl;
5371 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
5372 if (r)
5373 return r;
11fdf7f2
TL
5374 auto it = bl.cbegin();
5375 decode(mon_metadata, it);
224ce89b
WB
5376
5377 pending_metadata = mon_metadata;
7c673cae
FG
5378 return 0;
5379}
5380
5381int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
5382{
11fdf7f2 5383 ceph_assert(f);
224ce89b 5384 if (!mon_metadata.count(mon)) {
7c673cae
FG
5385 err << "mon." << mon << " not found";
5386 return -EINVAL;
5387 }
224ce89b 5388 const Metadata& m = mon_metadata[mon];
7c673cae
FG
5389 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
5390 f->dump_string(p->first.c_str(), p->second);
5391 }
5392 return 0;
5393}
5394
c07f9fc5 5395void Monitor::count_metadata(const string& field, map<string,int> *out)
31f18b77 5396{
224ce89b 5397 for (auto& p : mon_metadata) {
31f18b77
FG
5398 auto q = p.second.find(field);
5399 if (q == p.second.end()) {
c07f9fc5 5400 (*out)["unknown"]++;
31f18b77 5401 } else {
c07f9fc5 5402 (*out)[q->second]++;
31f18b77
FG
5403 }
5404 }
c07f9fc5
FG
5405}
5406
5407void Monitor::count_metadata(const string& field, Formatter *f)
5408{
5409 map<string,int> by_val;
5410 count_metadata(field, &by_val);
31f18b77
FG
5411 f->open_object_section(field.c_str());
5412 for (auto& p : by_val) {
5413 f->dump_int(p.first.c_str(), p.second);
5414 }
5415 f->close_section();
5416}
5417
f67539c2
TL
5418void Monitor::get_all_versions(std::map<string, list<string> > &versions)
5419{
5420 // mon
5421 get_versions(versions);
5422 // osd
5423 osdmon()->get_versions(versions);
5424 // mgr
5425 mgrmon()->get_versions(versions);
5426 // mds
5427 mdsmon()->get_versions(versions);
5428 dout(20) << __func__ << " all versions=" << versions << dendl;
5429}
5430
5431void Monitor::get_versions(std::map<string, list<string> > &versions)
5432{
5433 for (auto& [rank, metadata] : mon_metadata) {
5434 auto q = metadata.find("ceph_version_short");
5435 if (q == metadata.end()) {
5436 // not likely
5437 continue;
5438 }
5439 versions[q->second].push_back(string("mon.") + monmap->get_name(rank));
5440 }
5441}
5442
7c673cae
FG
5443int Monitor::print_nodes(Formatter *f, ostream& err)
5444{
11fdf7f2 5445 map<string, list<string> > mons; // hostname => mon
224ce89b
WB
5446 for (map<int, Metadata>::iterator it = mon_metadata.begin();
5447 it != mon_metadata.end(); ++it) {
7c673cae
FG
5448 const Metadata& m = it->second;
5449 Metadata::const_iterator hostname = m.find("hostname");
5450 if (hostname == m.end()) {
5451 // not likely though
5452 continue;
5453 }
11fdf7f2 5454 mons[hostname->second].push_back(monmap->get_name(it->first));
7c673cae
FG
5455 }
5456
5457 dump_services(f, mons, "mon");
5458 return 0;
5459}
5460
5461// ----------------------------------------------
5462// scrub
5463
5464int Monitor::scrub_start()
5465{
5466 dout(10) << __func__ << dendl;
11fdf7f2 5467 ceph_assert(is_leader());
7c673cae
FG
5468
5469 if (!scrub_result.empty()) {
5470 clog->info() << "scrub already in progress";
5471 return -EBUSY;
5472 }
5473
5474 scrub_event_cancel();
5475 scrub_result.clear();
5476 scrub_state.reset(new ScrubState);
5477
5478 scrub();
5479 return 0;
5480}
5481
5482int Monitor::scrub()
5483{
11fdf7f2
TL
5484 ceph_assert(is_leader());
5485 ceph_assert(scrub_state);
7c673cae
FG
5486
5487 scrub_cancel_timeout();
5488 wait_for_paxos_write();
5489 scrub_version = paxos->get_version();
5490
5491
5492 // scrub all keys if we're the only monitor in the quorum
5493 int32_t num_keys =
5494 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5495
5496 for (set<int>::iterator p = quorum.begin();
5497 p != quorum.end();
5498 ++p) {
5499 if (*p == rank)
5500 continue;
5501 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5502 num_keys);
5503 r->key = scrub_state->last_key;
11fdf7f2 5504 send_mon_message(r, *p);
7c673cae
FG
5505 }
5506
5507 // scrub my keys
5508 bool r = _scrub(&scrub_result[rank],
5509 &scrub_state->last_key,
5510 &num_keys);
5511
5512 scrub_state->finished = !r;
5513
5514 // only after we got our scrub results do we really care whether the
5515 // other monitors are late on their results. Also, this way we avoid
5516 // triggering the timeout if we end up getting stuck in _scrub() for
5517 // longer than the duration of the timeout.
5518 scrub_reset_timeout();
5519
5520 if (quorum.size() == 1) {
11fdf7f2 5521 ceph_assert(scrub_state->finished == true);
7c673cae
FG
5522 scrub_finish();
5523 }
5524 return 0;
5525}
5526
5527void Monitor::handle_scrub(MonOpRequestRef op)
5528{
9f95a23c 5529 auto m = op->get_req<MMonScrub>();
7c673cae
FG
5530 dout(10) << __func__ << " " << *m << dendl;
5531 switch (m->op) {
5532 case MMonScrub::OP_SCRUB:
5533 {
5534 if (!is_peon())
5535 break;
5536
5537 wait_for_paxos_write();
5538
5539 if (m->version != paxos->get_version())
5540 break;
5541
5542 MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
5543 m->version,
5544 m->num_keys);
5545
5546 reply->key = m->key;
5547 _scrub(&reply->result, &reply->key, &reply->num_keys);
5548 m->get_connection()->send_message(reply);
5549 }
5550 break;
5551
5552 case MMonScrub::OP_RESULT:
5553 {
5554 if (!is_leader())
5555 break;
5556 if (m->version != scrub_version)
5557 break;
5558 // reset the timeout each time we get a result
5559 scrub_reset_timeout();
5560
5561 int from = m->get_source().num();
11fdf7f2 5562 ceph_assert(scrub_result.count(from) == 0);
7c673cae
FG
5563 scrub_result[from] = m->result;
5564
5565 if (scrub_result.size() == quorum.size()) {
5566 scrub_check_results();
5567 scrub_result.clear();
5568 if (scrub_state->finished)
5569 scrub_finish();
5570 else
5571 scrub();
5572 }
5573 }
5574 break;
5575 }
5576}
5577
5578bool Monitor::_scrub(ScrubResult *r,
5579 pair<string,string> *start,
5580 int *num_keys)
5581{
11fdf7f2
TL
5582 ceph_assert(r != NULL);
5583 ceph_assert(start != NULL);
5584 ceph_assert(num_keys != NULL);
7c673cae
FG
5585
5586 set<string> prefixes = get_sync_targets_names();
5587 prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5588
5589 dout(10) << __func__ << " start (" << *start << ")"
5590 << " num_keys " << *num_keys << dendl;
5591
5592 MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
5593
5594 int scrubbed_keys = 0;
5595 pair<string,string> last_key;
5596
5597 while (it->has_next_chunk()) {
5598
5599 if (*num_keys > 0 && scrubbed_keys == *num_keys)
5600 break;
5601
5602 pair<string,string> k = it->get_next_key();
5603 if (prefixes.count(k.first) == 0)
5604 continue;
5605
5606 if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
5607 (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
5608 dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
5609 << dendl;
5610 continue;
5611 }
5612
5613 bufferlist bl;
224ce89b 5614 int err = store->get(k.first, k.second, bl);
11fdf7f2 5615 ceph_assert(err == 0);
224ce89b 5616
7c673cae
FG
5617 uint32_t key_crc = bl.crc32c(0);
5618 dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
5619 << " crc " << key_crc << dendl;
5620 r->prefix_keys[k.first]++;
224ce89b 5621 if (r->prefix_crc.count(k.first) == 0) {
7c673cae 5622 r->prefix_crc[k.first] = 0;
224ce89b 5623 }
7c673cae
FG
5624 r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
5625
5626 if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
5627 (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
5628 dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
5629 r->prefix_crc[k.first] += 1;
5630 }
5631
5632 ++scrubbed_keys;
5633 last_key = k;
5634 }
5635
5636 dout(20) << __func__ << " last_key (" << last_key << ")"
5637 << " scrubbed_keys " << scrubbed_keys
5638 << " has_next " << it->has_next_chunk() << dendl;
5639
5640 *start = last_key;
5641 *num_keys = scrubbed_keys;
5642
5643 return it->has_next_chunk();
5644}
5645
5646void Monitor::scrub_check_results()
5647{
5648 dout(10) << __func__ << dendl;
5649
5650 // compare
5651 int errors = 0;
5652 ScrubResult& mine = scrub_result[rank];
5653 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5654 p != scrub_result.end();
5655 ++p) {
5656 if (p->first == rank)
5657 continue;
5658 if (p->second != mine) {
5659 ++errors;
5660 clog->error() << "scrub mismatch";
5661 clog->error() << " mon." << rank << " " << mine;
5662 clog->error() << " mon." << p->first << " " << p->second;
5663 }
5664 }
5665 if (!errors)
d2e6a577 5666 clog->debug() << "scrub ok on " << quorum << ": " << mine;
7c673cae
FG
5667}
5668
5669inline void Monitor::scrub_timeout()
5670{
5671 dout(1) << __func__ << " restarting scrub" << dendl;
5672 scrub_reset();
5673 scrub_start();
5674}
5675
5676void Monitor::scrub_finish()
5677{
5678 dout(10) << __func__ << dendl;
5679 scrub_reset();
5680 scrub_event_start();
5681}
5682
5683void Monitor::scrub_reset()
5684{
5685 dout(10) << __func__ << dendl;
5686 scrub_cancel_timeout();
5687 scrub_version = 0;
5688 scrub_result.clear();
5689 scrub_state.reset();
5690}
5691
f67539c2 5692inline void Monitor::scrub_update_interval(ceph::timespan interval)
7c673cae
FG
5693{
5694 // we don't care about changes if we are not the leader.
5695 // changes will be visible if we become the leader.
5696 if (!is_leader())
5697 return;
5698
f67539c2 5699 dout(1) << __func__ << " new interval = " << interval << dendl;
7c673cae
FG
5700
5701 // if scrub already in progress, all changes will already be visible during
5702 // the next round. Nothing to do.
5703 if (scrub_state != NULL)
5704 return;
5705
5706 scrub_event_cancel();
5707 scrub_event_start();
5708}
5709
5710void Monitor::scrub_event_start()
5711{
5712 dout(10) << __func__ << dendl;
5713
5714 if (scrub_event)
5715 scrub_event_cancel();
5716
f67539c2
TL
5717 auto scrub_interval =
5718 cct->_conf.get_val<std::chrono::seconds>("mon_scrub_interval");
5719 if (scrub_interval == std::chrono::seconds::zero()) {
7c673cae 5720 dout(1) << __func__ << " scrub event is disabled"
f67539c2 5721 << " (mon_scrub_interval = " << scrub_interval
7c673cae
FG
5722 << ")" << dendl;
5723 return;
5724 }
5725
3efd9988 5726 scrub_event = timer.add_event_after(
f67539c2 5727 scrub_interval,
9f95a23c 5728 new C_MonContext{this, [this](int) {
7c673cae 5729 scrub_start();
9f95a23c 5730 }});
7c673cae
FG
5731}
5732
5733void Monitor::scrub_event_cancel()
5734{
5735 dout(10) << __func__ << dendl;
5736 if (scrub_event) {
5737 timer.cancel_event(scrub_event);
5738 scrub_event = NULL;
5739 }
5740}
5741
5742inline void Monitor::scrub_cancel_timeout()
5743{
5744 if (scrub_timeout_event) {
5745 timer.cancel_event(scrub_timeout_event);
5746 scrub_timeout_event = NULL;
5747 }
5748}
5749
5750void Monitor::scrub_reset_timeout()
5751{
5752 dout(15) << __func__ << " reset timeout event" << dendl;
5753 scrub_cancel_timeout();
3efd9988 5754 scrub_timeout_event = timer.add_event_after(
11fdf7f2 5755 g_conf()->mon_scrub_timeout,
9f95a23c 5756 new C_MonContext{this, [this](int) {
7c673cae 5757 scrub_timeout();
9f95a23c 5758 }});
7c673cae
FG
5759}
5760
5761/************ TICK ***************/
5762void Monitor::new_tick()
5763{
9f95a23c 5764 timer.add_event_after(g_conf()->mon_tick_interval, new C_MonContext{this, [this](int) {
7c673cae 5765 tick();
9f95a23c 5766 }});
7c673cae
FG
5767}
5768
5769void Monitor::tick()
5770{
5771 // ok go.
5772 dout(11) << "tick" << dendl;
181888fb 5773 const utime_t now = ceph_clock_now();
7c673cae 5774
181888fb
FG
5775 // Check if we need to emit any delayed health check updated messages
5776 if (is_leader()) {
11fdf7f2 5777 const auto min_period = g_conf().get_val<int64_t>(
181888fb
FG
5778 "mon_health_log_update_period");
5779 for (auto& svc : paxos_service) {
5780 auto health = svc->get_health_checks();
5781
5782 for (const auto &i : health.checks) {
5783 const std::string &code = i.first;
5784 const std::string &summary = i.second.summary;
5785 const health_status_t severity = i.second.severity;
5786
5787 auto status_iter = health_check_log_times.find(code);
5788 if (status_iter == health_check_log_times.end()) {
5789 continue;
5790 }
5791
5792 auto &log_status = status_iter->second;
5793 bool const changed = log_status.last_message != summary
5794 || log_status.severity != severity;
5795
5796 if (changed && now - log_status.updated_at > min_period) {
5797 log_status.last_message = summary;
5798 log_status.updated_at = now;
5799 log_status.severity = severity;
5800
5801 ostringstream ss;
5802 ss << "Health check update: " << summary << " (" << code << ")";
5803 clog->health(severity) << ss.str();
5804 }
5805 }
5806 }
5807 }
5808
5809
11fdf7f2
TL
5810 for (auto& svc : paxos_service) {
5811 svc->tick();
5812 svc->maybe_trim();
7c673cae
FG
5813 }
5814
5815 // trim sessions
7c673cae 5816 {
11fdf7f2 5817 std::lock_guard l(session_map_lock);
7c673cae
FG
5818 auto p = session_map.sessions.begin();
5819
5820 bool out_for_too_long = (!exited_quorum.is_zero() &&
11fdf7f2 5821 now > (exited_quorum + 2*g_conf()->mon_lease));
7c673cae
FG
5822
5823 while (!p.end()) {
5824 MonSession *s = *p;
5825 ++p;
5826
5827 // don't trim monitors
11fdf7f2 5828 if (s->name.is_mon())
7c673cae
FG
5829 continue;
5830
5831 if (s->session_timeout < now && s->con) {
5832 // check keepalive, too
5833 s->session_timeout = s->con->get_last_keepalive();
11fdf7f2 5834 s->session_timeout += g_conf()->mon_session_timeout;
7c673cae
FG
5835 }
5836 if (s->session_timeout < now) {
11fdf7f2
TL
5837 dout(10) << " trimming session " << s->con << " " << s->name
5838 << " " << s->addrs
7c673cae
FG
5839 << " (timeout " << s->session_timeout
5840 << " < now " << now << ")" << dendl;
5841 } else if (out_for_too_long) {
5842 // boot the client Session because we've taken too long getting back in
11fdf7f2 5843 dout(10) << " trimming session " << s->con << " " << s->name
7c673cae
FG
5844 << " because we've been out of quorum too long" << dendl;
5845 } else {
5846 continue;
5847 }
5848
5849 s->con->mark_down();
5850 remove_session(s);
5851 logger->inc(l_mon_session_trim);
5852 }
5853 }
5854 sync_trim_providers();
5855
5856 if (!maybe_wait_for_quorum.empty()) {
5857 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5858 }
5859
5860 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5861 // this is only necessary on upgraded clusters.
5862 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5863 prepare_new_fingerprint(t);
5864 paxos->trigger_propose();
5865 }
5866
11fdf7f2 5867 mgr_client.update_daemon_health(get_health_metrics());
7c673cae
FG
5868 new_tick();
5869}
5870
11fdf7f2
TL
5871vector<DaemonHealthMetric> Monitor::get_health_metrics()
5872{
5873 vector<DaemonHealthMetric> metrics;
5874
5875 utime_t oldest_secs;
5876 const utime_t now = ceph_clock_now();
5877 auto too_old = now;
5878 too_old -= g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count();
5879 int slow = 0;
5880 TrackedOpRef oldest_op;
5881 auto count_slow_ops = [&](TrackedOp& op) {
5882 if (op.get_initiated() < too_old) {
5883 slow++;
5884 if (!oldest_op || op.get_initiated() < oldest_op->get_initiated()) {
5885 oldest_op = &op;
5886 }
5887 return true;
5888 } else {
5889 return false;
5890 }
5891 };
5892 if (op_tracker.visit_ops_in_flight(&oldest_secs, count_slow_ops)) {
5893 if (slow) {
5894 derr << __func__ << " reporting " << slow << " slow ops, oldest is "
5895 << oldest_op->get_desc() << dendl;
5896 }
5897 metrics.emplace_back(daemon_metric::SLOW_OPS, slow, oldest_secs);
5898 } else {
5899 metrics.emplace_back(daemon_metric::SLOW_OPS, 0, 0);
5900 }
5901 return metrics;
5902}
5903
7c673cae
FG
5904void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5905{
5906 uuid_d nf;
5907 nf.generate_random();
5908 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5909
5910 bufferlist bl;
11fdf7f2 5911 encode(nf, bl);
7c673cae
FG
5912 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5913}
5914
5915int Monitor::check_fsid()
5916{
5917 bufferlist ebl;
5918 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5919 if (r == -ENOENT)
5920 return r;
11fdf7f2 5921 ceph_assert(r == 0);
7c673cae
FG
5922
5923 string es(ebl.c_str(), ebl.length());
5924
5925 // only keep the first line
5926 size_t pos = es.find_first_of('\n');
5927 if (pos != string::npos)
5928 es.resize(pos);
5929
5930 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5931 uuid_d ondisk;
5932 if (!ondisk.parse(es.c_str())) {
5933 derr << "error: unable to parse uuid" << dendl;
5934 return -EINVAL;
5935 }
5936
5937 if (monmap->get_fsid() != ondisk) {
5938 derr << "error: cluster_uuid file exists with value " << ondisk
5939 << ", != our uuid " << monmap->get_fsid() << dendl;
5940 return -EEXIST;
5941 }
5942
5943 return 0;
5944}
5945
5946int Monitor::write_fsid()
5947{
5948 auto t(std::make_shared<MonitorDBStore::Transaction>());
5949 write_fsid(t);
5950 int r = store->apply_transaction(t);
5951 return r;
5952}
5953
5954int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5955{
5956 ostringstream ss;
5957 ss << monmap->get_fsid() << "\n";
5958 string us = ss.str();
5959
5960 bufferlist b;
5961 b.append(us);
5962
5963 t->put(MONITOR_NAME, "cluster_uuid", b);
5964 return 0;
5965}
5966
5967/*
5968 * this is the closest thing to a traditional 'mkfs' for ceph.
5969 * initialize the monitor state machines to their initial values.
5970 */
5971int Monitor::mkfs(bufferlist& osdmapbl)
5972{
5973 auto t(std::make_shared<MonitorDBStore::Transaction>());
5974
5975 // verify cluster fsid
5976 int r = check_fsid();
5977 if (r < 0 && r != -ENOENT)
5978 return r;
5979
5980 bufferlist magicbl;
5981 magicbl.append(CEPH_MON_ONDISK_MAGIC);
5982 magicbl.append("\n");
5983 t->put(MONITOR_NAME, "magic", magicbl);
5984
5985
5986 features = get_initial_supported_features();
5987 write_features(t);
5988
5989 // save monmap, osdmap, keyring.
5990 bufferlist monmapbl;
5991 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
5992 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5993 t->put("mkfs", "monmap", monmapbl);
5994
5995 if (osdmapbl.length()) {
5996 // make sure it's a valid osdmap
5997 try {
5998 OSDMap om;
5999 om.decode(osdmapbl);
6000 }
f67539c2 6001 catch (ceph::buffer::error& e) {
7c673cae
FG
6002 derr << "error decoding provided osdmap: " << e.what() << dendl;
6003 return -EINVAL;
6004 }
6005 t->put("mkfs", "osdmap", osdmapbl);
6006 }
6007
6008 if (is_keyring_required()) {
6009 KeyRing keyring;
6010 string keyring_filename;
6011
11fdf7f2 6012 r = ceph_resolve_file_search(g_conf()->keyring, keyring_filename);
7c673cae 6013 if (r) {
11fdf7f2 6014 derr << "unable to find a keyring file on " << g_conf()->keyring
7c673cae 6015 << ": " << cpp_strerror(r) << dendl;
11fdf7f2
TL
6016 if (g_conf()->key != "") {
6017 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf()->key +
7c673cae
FG
6018 "\n\tcaps mon = \"allow *\"\n";
6019 bufferlist bl;
6020 bl.append(keyring_plaintext);
6021 try {
11fdf7f2 6022 auto i = bl.cbegin();
7c673cae
FG
6023 keyring.decode_plaintext(i);
6024 }
f67539c2 6025 catch (const ceph::buffer::error& e) {
7c673cae
FG
6026 derr << "error decoding keyring " << keyring_plaintext
6027 << ": " << e.what() << dendl;
6028 return -EINVAL;
6029 }
6030 } else {
6031 return -ENOENT;
6032 }
6033 } else {
6034 r = keyring.load(g_ceph_context, keyring_filename);
6035 if (r < 0) {
11fdf7f2 6036 derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
7c673cae
FG
6037 return r;
6038 }
6039 }
6040
6041 // put mon. key in external keyring; seed with everything else.
6042 extract_save_mon_key(keyring);
6043
6044 bufferlist keyringbl;
6045 keyring.encode_plaintext(keyringbl);
6046 t->put("mkfs", "keyring", keyringbl);
6047 }
6048 write_fsid(t);
6049 store->apply_transaction(t);
6050
6051 return 0;
6052}
6053
6054int Monitor::write_default_keyring(bufferlist& bl)
6055{
6056 ostringstream os;
11fdf7f2 6057 os << g_conf()->mon_data << "/keyring";
7c673cae
FG
6058
6059 int err = 0;
91327a77 6060 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT|O_CLOEXEC, 0600);
7c673cae
FG
6061 if (fd < 0) {
6062 err = -errno;
6063 dout(0) << __func__ << " failed to open " << os.str()
6064 << ": " << cpp_strerror(err) << dendl;
6065 return err;
6066 }
6067
6068 err = bl.write_fd(fd);
6069 if (!err)
6070 ::fsync(fd);
6071 VOID_TEMP_FAILURE_RETRY(::close(fd));
6072
6073 return err;
6074}
6075
6076void Monitor::extract_save_mon_key(KeyRing& keyring)
6077{
6078 EntityName mon_name;
6079 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
6080 EntityAuth mon_key;
6081 if (keyring.get_auth(mon_name, mon_key)) {
6082 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
6083 KeyRing pkey;
6084 pkey.add(mon_name, mon_key);
6085 bufferlist bl;
6086 pkey.encode_plaintext(bl);
6087 write_default_keyring(bl);
6088 keyring.remove(mon_name);
6089 }
6090}
6091
11fdf7f2
TL
6092// AuthClient methods -- for mon <-> mon communication
6093int Monitor::get_auth_request(
6094 Connection *con,
6095 AuthConnectionMeta *auth_meta,
6096 uint32_t *method,
6097 vector<uint32_t> *preferred_modes,
6098 bufferlist *out)
6099{
6100 std::scoped_lock l(auth_lock);
6101 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
6102 con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
6103 return -EACCES;
6104 }
6105 AuthAuthorizer *auth;
9f95a23c 6106 if (!get_authorizer(con->get_peer_type(), &auth)) {
11fdf7f2
TL
6107 return -EACCES;
6108 }
6109 auth_meta->authorizer.reset(auth);
6110 auth_registry.get_supported_modes(con->get_peer_type(),
6111 auth->protocol,
6112 preferred_modes);
6113 *method = auth->protocol;
6114 *out = auth->bl;
6115 return 0;
6116}
6117
6118int Monitor::handle_auth_reply_more(
6119 Connection *con,
6120 AuthConnectionMeta *auth_meta,
6121 const bufferlist& bl,
6122 bufferlist *reply)
6123{
6124 std::scoped_lock l(auth_lock);
6125 if (!auth_meta->authorizer) {
6126 derr << __func__ << " no authorizer?" << dendl;
6127 return -EACCES;
6128 }
6129 auth_meta->authorizer->add_challenge(cct, bl);
6130 *reply = auth_meta->authorizer->bl;
6131 return 0;
6132}
6133
6134int Monitor::handle_auth_done(
6135 Connection *con,
6136 AuthConnectionMeta *auth_meta,
6137 uint64_t global_id,
6138 uint32_t con_mode,
6139 const bufferlist& bl,
6140 CryptoKey *session_key,
6141 std::string *connection_secret)
6142{
6143 std::scoped_lock l(auth_lock);
6144 // verify authorizer reply
6145 auto p = bl.begin();
6146 if (!auth_meta->authorizer->verify_reply(p, connection_secret)) {
6147 dout(0) << __func__ << " failed verifying authorizer reply" << dendl;
6148 return -EACCES;
6149 }
6150 auth_meta->session_key = auth_meta->authorizer->session_key;
6151 return 0;
6152}
6153
6154int Monitor::handle_auth_bad_method(
6155 Connection *con,
6156 AuthConnectionMeta *auth_meta,
6157 uint32_t old_auth_method,
6158 int result,
6159 const std::vector<uint32_t>& allowed_methods,
6160 const std::vector<uint32_t>& allowed_modes)
6161{
6162 derr << __func__ << " hmm, they didn't like " << old_auth_method
6163 << " result " << cpp_strerror(result) << dendl;
6164 return -EACCES;
6165}
6166
9f95a23c 6167bool Monitor::get_authorizer(int service_id, AuthAuthorizer **authorizer)
7c673cae 6168{
9f95a23c 6169 dout(10) << "get_authorizer for " << ceph_entity_type_name(service_id)
7c673cae
FG
6170 << dendl;
6171
6172 if (is_shutdown())
6173 return false;
6174
6175 // we only connect to other monitors and mgr; every else connects to us.
6176 if (service_id != CEPH_ENTITY_TYPE_MON &&
6177 service_id != CEPH_ENTITY_TYPE_MGR)
6178 return false;
6179
6180 if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
6181 // auth_none
6182 dout(20) << __func__ << " building auth_none authorizer" << dendl;
11fdf7f2 6183 AuthNoneClientHandler handler{g_ceph_context};
7c673cae
FG
6184 handler.set_global_id(0);
6185 *authorizer = handler.build_authorizer(service_id);
6186 return true;
6187 }
6188
6189 CephXServiceTicketInfo auth_ticket_info;
6190 CephXSessionAuthInfo info;
6191 int ret;
6192
6193 EntityName name;
6194 name.set_type(CEPH_ENTITY_TYPE_MON);
6195 auth_ticket_info.ticket.name = name;
6196 auth_ticket_info.ticket.global_id = 0;
6197
6198 if (service_id == CEPH_ENTITY_TYPE_MON) {
6199 // mon to mon authentication uses the private monitor shared key and not the
6200 // rotating key
6201 CryptoKey secret;
6202 if (!keyring.get_secret(name, secret) &&
6203 !key_server.get_secret(name, secret)) {
6204 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
6205 << dendl;
6206 stringstream ss, ds;
6207 int err = key_server.list_secrets(ds);
6208 if (err < 0)
6209 ss << "no installed auth entries!";
6210 else
6211 ss << "installed auth entries:";
6212 dout(0) << ss.str() << "\n" << ds.str() << dendl;
6213 return false;
6214 }
6215
11fdf7f2 6216 ret = key_server.build_session_auth_info(
c5c27e9a 6217 service_id, auth_ticket_info.ticket, secret, (uint64_t)-1, info);
7c673cae
FG
6218 if (ret < 0) {
6219 dout(0) << __func__ << " failed to build mon session_auth_info "
6220 << cpp_strerror(ret) << dendl;
6221 return false;
6222 }
6223 } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
6224 // mgr
11fdf7f2
TL
6225 ret = key_server.build_session_auth_info(
6226 service_id, auth_ticket_info.ticket, info);
7c673cae
FG
6227 if (ret < 0) {
6228 derr << __func__ << " failed to build mgr service session_auth_info "
6229 << cpp_strerror(ret) << dendl;
6230 return false;
6231 }
6232 } else {
6233 ceph_abort(); // see check at top of fn
6234 }
6235
6236 CephXTicketBlob blob;
6237 if (!cephx_build_service_ticket_blob(cct, info, blob)) {
9f95a23c 6238 dout(0) << "get_authorizer failed to build service ticket" << dendl;
7c673cae
FG
6239 return false;
6240 }
6241 bufferlist ticket_data;
11fdf7f2 6242 encode(blob, ticket_data);
7c673cae 6243
11fdf7f2 6244 auto iter = ticket_data.cbegin();
7c673cae 6245 CephXTicketHandler handler(g_ceph_context, service_id);
11fdf7f2 6246 decode(handler.ticket, iter);
7c673cae
FG
6247
6248 handler.session_key = info.session_key;
6249
6250 *authorizer = handler.build_authorizer(0);
6251
6252 return true;
6253}
6254
11fdf7f2
TL
6255int Monitor::handle_auth_request(
6256 Connection *con,
6257 AuthConnectionMeta *auth_meta,
6258 bool more,
6259 uint32_t auth_method,
6260 const bufferlist &payload,
6261 bufferlist *reply)
6262{
6263 std::scoped_lock l(auth_lock);
7c673cae 6264
11fdf7f2
TL
6265 // NOTE: be careful, the Connection hasn't fully negotiated yet, so
6266 // e.g., peer_features, peer_addrs, and others are still unknown.
6267
6268 dout(10) << __func__ << " con " << con << (more ? " (more)":" (start)")
6269 << " method " << auth_method
6270 << " payload " << payload.length()
6271 << dendl;
9f95a23c
TL
6272 if (!payload.length()) {
6273 if (!con->is_msgr2() &&
6274 con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
6275 // for v1 connections, we tolerate no authorizer (from
6276 // non-monitors), because authentication happens via MAuth
6277 // messages.
6278 return 1;
6279 }
6280 return -EACCES;
6281 }
11fdf7f2
TL
6282 if (!more) {
6283 auth_meta->auth_mode = payload[0];
6284 }
6285
6286 if (auth_meta->auth_mode >= AUTH_MODE_AUTHORIZER &&
6287 auth_meta->auth_mode <= AUTH_MODE_AUTHORIZER_MAX) {
6288 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
6289 auth_method);
6290 if (!ah) {
6291 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
6292 << auth_method << dendl;
6293 return -EOPNOTSUPP;
6294 }
6295 bool was_challenge = (bool)auth_meta->authorizer_challenge;
6296 bool isvalid = ah->verify_authorizer(
6297 cct,
9f95a23c 6298 keyring,
11fdf7f2
TL
6299 payload,
6300 auth_meta->get_connection_secret_length(),
6301 reply,
6302 &con->peer_name,
6303 &con->peer_global_id,
6304 &con->peer_caps_info,
6305 &auth_meta->session_key,
6306 &auth_meta->connection_secret,
6307 &auth_meta->authorizer_challenge);
6308 if (isvalid) {
6309 ms_handle_authentication(con);
6310 return 1;
6311 }
6312 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
6313 return 0;
6314 }
6315 dout(10) << __func__ << " bad authorizer on " << con << dendl;
6316 return -EACCES;
eafe8130 6317 } else if (auth_meta->auth_mode < AUTH_MODE_MON ||
11fdf7f2
TL
6318 auth_meta->auth_mode > AUTH_MODE_MON_MAX) {
6319 derr << __func__ << " unrecognized auth mode " << auth_meta->auth_mode
6320 << dendl;
6321 return -EACCES;
6322 }
6323
6324 // wait until we've formed an initial quorum on mkfs so that we have
6325 // the initial keys (e.g., client.admin).
6326 if (authmon()->get_last_committed() == 0) {
6327 dout(10) << __func__ << " haven't formed initial quorum, EBUSY" << dendl;
6328 return -EBUSY;
6329 }
6330
6331 RefCountedPtr priv;
6332 MonSession *s;
6333 int32_t r = 0;
6334 auto p = payload.begin();
6335 if (!more) {
6336 if (con->get_priv()) {
6337 return -EACCES; // wtf
6338 }
6339
6340 // handler?
6341 unique_ptr<AuthServiceHandler> auth_handler{get_auth_service_handler(
6342 auth_method, g_ceph_context, &key_server)};
6343 if (!auth_handler) {
6344 dout(1) << __func__ << " auth_method " << auth_method << " not supported"
6345 << dendl;
6346 return -EOPNOTSUPP;
6347 }
6348
6349 uint8_t mode;
6350 EntityName entity_name;
6351
6352 try {
6353 decode(mode, p);
6354 if (mode < AUTH_MODE_MON ||
6355 mode > AUTH_MODE_MON_MAX) {
6356 dout(1) << __func__ << " invalid mode " << (int)mode << dendl;
6357 return -EACCES;
6358 }
6359 assert(mode >= AUTH_MODE_MON && mode <= AUTH_MODE_MON_MAX);
6360 decode(entity_name, p);
6361 decode(con->peer_global_id, p);
f67539c2 6362 } catch (ceph::buffer::error& e) {
11fdf7f2
TL
6363 dout(1) << __func__ << " failed to decode, " << e.what() << dendl;
6364 return -EACCES;
6365 }
6366
6367 // supported method?
6368 if (entity_name.get_type() == CEPH_ENTITY_TYPE_MON ||
6369 entity_name.get_type() == CEPH_ENTITY_TYPE_OSD ||
6370 entity_name.get_type() == CEPH_ENTITY_TYPE_MDS ||
6371 entity_name.get_type() == CEPH_ENTITY_TYPE_MGR) {
6372 if (!auth_cluster_required.is_supported_auth(auth_method)) {
6373 dout(10) << __func__ << " entity " << entity_name << " method "
6374 << auth_method << " not among supported "
6375 << auth_cluster_required.get_supported_set() << dendl;
6376 return -EOPNOTSUPP;
7c673cae
FG
6377 }
6378 } else {
11fdf7f2
TL
6379 if (!auth_service_required.is_supported_auth(auth_method)) {
6380 dout(10) << __func__ << " entity " << entity_name << " method "
6381 << auth_method << " not among supported "
6382 << auth_cluster_required.get_supported_set() << dendl;
6383 return -EOPNOTSUPP;
6384 }
6385 }
6386
6387 // for msgr1 we would do some weirdness here to ensure signatures
6388 // are supported by the client if we require it. for msgr2 that
6389 // is not necessary.
6390
c5c27e9a 6391 bool is_new_global_id = false;
11fdf7f2
TL
6392 if (!con->peer_global_id) {
6393 con->peer_global_id = authmon()->_assign_global_id();
6394 if (!con->peer_global_id) {
6395 dout(1) << __func__ << " failed to assign global_id" << dendl;
6396 return -EBUSY;
6397 }
c5c27e9a 6398 is_new_global_id = true;
11fdf7f2
TL
6399 }
6400
6401 // set up partial session
6402 s = new MonSession(con);
6403 s->auth_handler = auth_handler.release();
6404 con->set_priv(RefCountedPtr{s, false});
6405
6406 r = s->auth_handler->start_session(
6407 entity_name,
c5c27e9a
TL
6408 con->peer_global_id,
6409 is_new_global_id,
11fdf7f2 6410 reply,
c5c27e9a 6411 &con->peer_caps_info);
11fdf7f2
TL
6412 } else {
6413 priv = con->get_priv();
6414 if (!priv) {
6415 // this can happen if the async ms_handle_reset event races with
6416 // the unlocked call into handle_auth_request
6417 return -EACCES;
7c673cae 6418 }
11fdf7f2
TL
6419 s = static_cast<MonSession*>(priv.get());
6420 r = s->auth_handler->handle_request(
6421 p,
6422 auth_meta->get_connection_secret_length(),
6423 reply,
11fdf7f2
TL
6424 &con->peer_caps_info,
6425 &auth_meta->session_key,
6426 &auth_meta->connection_secret);
6427 }
6428 if (r > 0 &&
6429 !s->authenticated) {
6430 ms_handle_authentication(con);
6431 }
6432
6433 dout(30) << " r " << r << " reply:\n";
6434 reply->hexdump(*_dout);
6435 *_dout << dendl;
6436 return r;
6437}
6438
6439void Monitor::ms_handle_accept(Connection *con)
6440{
6441 auto priv = con->get_priv();
6442 MonSession *s = static_cast<MonSession*>(priv.get());
6443 if (!s) {
6444 // legacy protocol v1?
6445 dout(10) << __func__ << " con " << con << " no session" << dendl;
6446 return;
6447 }
6448
6449 if (s->item.is_on_list()) {
6450 dout(10) << __func__ << " con " << con << " session " << s
6451 << " already on list" << dendl;
7c673cae 6452 } else {
11fdf7f2
TL
6453 dout(10) << __func__ << " con " << con << " session " << s
6454 << " registering session for "
6455 << con->get_peer_addrs() << dendl;
6456 s->_ident(entity_name_t(con->get_peer_type(), con->get_peer_id()),
6457 con->get_peer_addrs());
6458 std::lock_guard l(session_map_lock);
6459 session_map.add_session(s);
7c673cae 6460 }
11fdf7f2
TL
6461}
6462
6463int Monitor::ms_handle_authentication(Connection *con)
6464{
6465 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
6466 // mon <-> mon connections need no Session, and setting one up
6467 // creates an awkward ref cycle between Session and Connection.
6468 return 1;
6469 }
6470
6471 auto priv = con->get_priv();
6472 MonSession *s = static_cast<MonSession*>(priv.get());
6473 if (!s) {
6474 // must be msgr2, otherwise dispatch would have set up the session.
6475 s = session_map.new_session(
6476 entity_name_t(con->get_peer_type(), -1), // we don't know yet
6477 con->get_peer_addrs(),
6478 con);
6479 assert(s);
6480 dout(10) << __func__ << " adding session " << s << " to con " << con
6481 << dendl;
6482 con->set_priv(s);
6483 logger->set(l_mon_num_sessions, session_map.get_size());
6484 logger->inc(l_mon_session_add);
6485 }
6486 dout(10) << __func__ << " session " << s << " con " << con
6487 << " addr " << s->con->get_peer_addr()
6488 << " " << *s << dendl;
6489
6490 AuthCapsInfo &caps_info = con->get_peer_caps_info();
6491 int ret = 0;
6492 if (caps_info.allow_all) {
6493 s->caps.set_allow_all();
6494 s->authenticated = true;
6495 ret = 1;
9f95a23c 6496 } else if (caps_info.caps.length()) {
11fdf7f2
TL
6497 bufferlist::const_iterator p = caps_info.caps.cbegin();
6498 string str;
6499 try {
6500 decode(str, p);
f67539c2 6501 } catch (const ceph::buffer::error &err) {
11fdf7f2
TL
6502 derr << __func__ << " corrupt cap data for " << con->get_peer_entity_name()
6503 << " in auth db" << dendl;
6504 str.clear();
9f95a23c 6505 ret = -EACCES;
11fdf7f2
TL
6506 }
6507 if (ret >= 0) {
6508 if (s->caps.parse(str, NULL)) {
6509 s->authenticated = true;
6510 ret = 1;
6511 } else {
6512 derr << __func__ << " unparseable caps '" << str << "' for "
6513 << con->get_peer_entity_name() << dendl;
9f95a23c 6514 ret = -EACCES;
11fdf7f2
TL
6515 }
6516 }
6517 }
6518
6519 return ret;
7c673cae 6520}
f67539c2 6521
b3b6e05e
TL
6522void Monitor::set_mon_crush_location(const string& loc)
6523{
6524 if (loc.empty()) {
6525 return;
6526 }
6527 vector<string> loc_vec;
6528 loc_vec.push_back(loc);
6529 CrushWrapper::parse_loc_map(loc_vec, &crush_loc);
6530 need_set_crush_loc = true;
6531}
6532
6533void Monitor::notify_new_monmap(bool can_change_external_state)
f67539c2 6534{
b3b6e05e
TL
6535 if (need_set_crush_loc) {
6536 auto my_info_i = monmap->mon_info.find(name);
6537 if (my_info_i != monmap->mon_info.end() &&
6538 my_info_i->second.crush_loc == crush_loc) {
6539 need_set_crush_loc = false;
6540 }
6541 }
f67539c2
TL
6542 elector.notify_strategy_maybe_changed(monmap->strategy);
6543 dout(30) << __func__ << "we have " << monmap->removed_ranks.size() << " removed ranks" << dendl;
6544 for (auto i = monmap->removed_ranks.rbegin();
6545 i != monmap->removed_ranks.rend(); ++i) {
6546 int rank = *i;
6547 dout(10) << __func__ << "removing rank " << rank << dendl;
6548 elector.notify_rank_removed(rank);
6549 }
6550
6551 if (monmap->stretch_mode_enabled) {
b3b6e05e 6552 try_engage_stretch_mode();
f67539c2
TL
6553 }
6554
6555 if (is_stretch_mode()) {
6556 if (!monmap->stretch_marked_down_mons.empty()) {
6557 set_degraded_stretch_mode();
6558 }
6559 }
b3b6e05e 6560 set_elector_disallowed_leaders(can_change_external_state);
f67539c2
TL
6561}
6562
6563void Monitor::set_elector_disallowed_leaders(bool allow_election)
6564{
6565 set<int> dl;
6566 for (auto name : monmap->disallowed_leaders) {
6567 dl.insert(monmap->get_rank(name));
6568 }
6569 if (is_stretch_mode()) {
6570 for (auto name : monmap->stretch_marked_down_mons) {
6571 dl.insert(monmap->get_rank(name));
6572 }
6573 dl.insert(monmap->get_rank(monmap->tiebreaker_mon));
6574 }
6575
6576 bool disallowed_changed = elector.set_disallowed_leaders(dl);
6577 if (disallowed_changed && allow_election) {
6578 elector.call_election();
6579 }
6580}
6581
6582struct CMonEnableStretchMode : public Context {
6583 Monitor *m;
6584 CMonEnableStretchMode(Monitor *mon) : m(mon) {}
6585 void finish(int r) {
b3b6e05e 6586 m->try_engage_stretch_mode();
f67539c2
TL
6587 }
6588};
b3b6e05e 6589void Monitor::try_engage_stretch_mode()
f67539c2
TL
6590{
6591 dout(20) << __func__ << dendl;
6592 if (stretch_mode_engaged) return;
6593 if (!osdmon()->is_readable()) {
6594 osdmon()->wait_for_readable_ctx(new CMonEnableStretchMode(this));
6595 }
6596 if (osdmon()->osdmap.stretch_mode_enabled &&
6597 monmap->stretch_mode_enabled) {
6598 dout(10) << "Engaging stretch mode!" << dendl;
6599 stretch_mode_engaged = true;
6600 int32_t stretch_divider_id = osdmon()->osdmap.stretch_mode_bucket;
6601 stretch_bucket_divider = osdmon()->osdmap.
6602 crush->get_type_name(stretch_divider_id);
6603 disconnect_disallowed_stretch_sessions();
6604 }
6605}
6606
6607void Monitor::do_stretch_mode_election_work()
6608{
6609 dout(20) << __func__ << dendl;
6610 if (!is_stretch_mode() ||
6611 !is_leader()) return;
6612 dout(20) << "checking for degraded stretch mode" << dendl;
6613 map<string, set<string>> old_dead_buckets;
6614 old_dead_buckets.swap(dead_mon_buckets);
6615 up_mon_buckets.clear();
6616 // identify if we've lost a CRUSH bucket, request OSDMonitor check for death
6617 map<string,set<string>> down_mon_buckets;
6618 for (unsigned i = 0; i < monmap->size(); ++i) {
6619 const auto &mi = monmap->mon_info[monmap->get_name(i)];
6620 auto ci = mi.crush_loc.find(stretch_bucket_divider);
6621 ceph_assert(ci != mi.crush_loc.end());
6622 if (quorum.count(i)) {
6623 up_mon_buckets.insert(ci->second);
6624 } else {
6625 down_mon_buckets[ci->second].insert(mi.name);
6626 }
6627 }
6628 dout(20) << "prior dead_mon_buckets: " << old_dead_buckets
6629 << "; down_mon_buckets: " << down_mon_buckets
6630 << "; up_mon_buckets: " << up_mon_buckets << dendl;
6631 for (auto di : down_mon_buckets) {
6632 if (!up_mon_buckets.count(di.first)) {
6633 dead_mon_buckets[di.first] = di.second;
6634 }
6635 }
6636 dout(20) << "new dead_mon_buckets " << dead_mon_buckets << dendl;
6637
6638 if (dead_mon_buckets != old_dead_buckets &&
6639 dead_mon_buckets.size() >= old_dead_buckets.size()) {
6640 maybe_go_degraded_stretch_mode();
6641 }
6642}
6643
6644struct CMonGoDegraded : public Context {
6645 Monitor *m;
6646 CMonGoDegraded(Monitor *mon) : m(mon) {}
6647 void finish(int r) {
6648 m->maybe_go_degraded_stretch_mode();
6649 }
6650};
6651
6652struct CMonGoRecovery : public Context {
6653 Monitor *m;
6654 CMonGoRecovery(Monitor *mon) : m(mon) {}
6655 void finish(int r) {
6656 m->go_recovery_stretch_mode();
6657 }
6658};
6659void Monitor::go_recovery_stretch_mode()
6660{
6661 dout(20) << __func__ << dendl;
6662 if (!is_leader()) return;
6663 if (!is_degraded_stretch_mode()) return;
6664 if (is_recovering_stretch_mode()) return;
6665
6666 if (dead_mon_buckets.size()) {
6667 ceph_assert( 0 == "how did we try and do stretch recovery while we have dead monitor buckets?");
6668 // we can't recover if we are missing monitors in a zone!
6669 return;
6670 }
6671
6672 if (!osdmon()->is_readable()) {
6673 osdmon()->wait_for_readable_ctx(new CMonGoRecovery(this));
6674 return;
6675 }
6676
6677 if (!osdmon()->is_writeable()) {
6678 osdmon()->wait_for_writeable_ctx(new CMonGoRecovery(this));
6679 }
b3b6e05e
TL
6680 osdmon()->trigger_recovery_stretch_mode();
6681}
6682
6683void Monitor::set_recovery_stretch_mode()
6684{
6685 degraded_stretch_mode = true;
f67539c2 6686 recovering_stretch_mode = true;
b3b6e05e 6687 osdmon()->set_recovery_stretch_mode();
f67539c2
TL
6688}
6689
6690void Monitor::maybe_go_degraded_stretch_mode()
6691{
6692 dout(20) << __func__ << dendl;
6693 if (is_degraded_stretch_mode()) return;
6694 if (!is_leader()) return;
6695 if (dead_mon_buckets.empty()) return;
6696 if (!osdmon()->is_readable()) {
6697 osdmon()->wait_for_readable_ctx(new CMonGoDegraded(this));
6698 return;
6699 }
6700 ceph_assert(monmap->contains(monmap->tiebreaker_mon));
6701 // filter out the tiebreaker zone and check if remaining sites are down by OSDs too
6702 const auto &mi = monmap->mon_info[monmap->tiebreaker_mon];
6703 auto ci = mi.crush_loc.find(stretch_bucket_divider);
6704 map<string, set<string>> filtered_dead_buckets = dead_mon_buckets;
6705 filtered_dead_buckets.erase(ci->second);
6706
6707 set<int> matched_down_buckets;
6708 set<string> matched_down_mons;
6709 bool dead = osdmon()->check_for_dead_crush_zones(filtered_dead_buckets,
6710 &matched_down_buckets,
6711 &matched_down_mons);
6712 if (dead) {
6713 if (!osdmon()->is_writeable()) {
6714 osdmon()->wait_for_writeable_ctx(new CMonGoDegraded(this));
6715 }
6716 if (!monmon()->is_writeable()) {
6717 monmon()->wait_for_writeable_ctx(new CMonGoDegraded(this));
6718 }
6719 trigger_degraded_stretch_mode(matched_down_mons, matched_down_buckets);
6720 }
6721}
6722
6723void Monitor::trigger_degraded_stretch_mode(const set<string>& dead_mons,
6724 const set<int>& dead_buckets)
6725{
6726 dout(20) << __func__ << dendl;
6727 ceph_assert(osdmon()->is_writeable());
6728 ceph_assert(monmon()->is_writeable());
6729
6730 // figure out which OSD zone(s) remains alive by removing
6731 // tiebreaker mon from up_mon_buckets
6732 set<string> live_zones = up_mon_buckets;
6733 ceph_assert(monmap->contains(monmap->tiebreaker_mon));
6734 const auto &mi = monmap->mon_info[monmap->tiebreaker_mon];
6735 auto ci = mi.crush_loc.find(stretch_bucket_divider);
6736 live_zones.erase(ci->second);
6737 ceph_assert(live_zones.size() == 1); // only support 2 zones right now
6738
6739 osdmon()->trigger_degraded_stretch_mode(dead_buckets, live_zones);
6740 monmon()->trigger_degraded_stretch_mode(dead_mons);
6741 set_degraded_stretch_mode();
6742}
6743
6744void Monitor::set_degraded_stretch_mode()
6745{
6746 degraded_stretch_mode = true;
6747 recovering_stretch_mode = false;
b3b6e05e 6748 osdmon()->set_degraded_stretch_mode();
f67539c2
TL
6749}
6750
6751struct CMonGoHealthy : public Context {
6752 Monitor *m;
6753 CMonGoHealthy(Monitor *mon) : m(mon) {}
6754 void finish(int r) {
6755 m->trigger_healthy_stretch_mode();
6756 }
6757};
6758
6759
6760void Monitor::trigger_healthy_stretch_mode()
6761{
6762 dout(20) << __func__ << dendl;
6763 if (!is_degraded_stretch_mode()) return;
6764 if (!is_leader()) return;
6765 if (!osdmon()->is_writeable()) {
6766 osdmon()->wait_for_writeable_ctx(new CMonGoHealthy(this));
6767 }
6768 if (!monmon()->is_writeable()) {
6769 monmon()->wait_for_writeable_ctx(new CMonGoHealthy(this));
6770 }
6771
6772 ceph_assert(osdmon()->osdmap.recovering_stretch_mode);
f67539c2
TL
6773 osdmon()->trigger_healthy_stretch_mode();
6774 monmon()->trigger_healthy_stretch_mode();
6775}
6776
6777void Monitor::set_healthy_stretch_mode()
6778{
6779 degraded_stretch_mode = false;
6780 recovering_stretch_mode = false;
b3b6e05e 6781 osdmon()->set_healthy_stretch_mode();
f67539c2
TL
6782}
6783
6784bool Monitor::session_stretch_allowed(MonSession *s, MonOpRequestRef& op)
6785{
6786 if (!is_stretch_mode()) return true;
6787 if (s->proxy_con) return true;
6788 if (s->validated_stretch_connection) return true;
6789 if (!s->con) return true;
6790 if (s->con->peer_is_osd()) {
6791 dout(20) << __func__ << "checking OSD session" << s << dendl;
6792 // okay, check the crush location
6793 int barrier_id;
6794 int retval = osdmon()->osdmap.crush->get_validated_type_id(stretch_bucket_divider,
6795 &barrier_id);
6796 ceph_assert(retval >= 0);
6797 int osd_bucket_id = osdmon()->osdmap.crush->get_parent_of_type(s->con->peer_id,
6798 barrier_id);
6799 const auto &mi = monmap->mon_info.find(name);
6800 ceph_assert(mi != monmap->mon_info.end());
6801 auto ci = mi->second.crush_loc.find(stretch_bucket_divider);
6802 ceph_assert(ci != mi->second.crush_loc.end());
6803 int mon_bucket_id = osdmon()->osdmap.crush->get_item_id(ci->second);
6804
6805 if (osd_bucket_id != mon_bucket_id) {
6806 dout(5) << "discarding session " << *s
6807 << " and sending OSD to matched zone" << dendl;
6808 s->con->mark_down();
6809 std::lock_guard l(session_map_lock);
6810 remove_session(s);
6811 if (op) {
6812 op->mark_zap();
6813 }
6814 return false;
6815 }
6816 }
6817
6818 s->validated_stretch_connection = true;
6819 return true;
6820}
6821
6822void Monitor::disconnect_disallowed_stretch_sessions()
6823{
6824 dout(20) << __func__ << dendl;
6825 MonOpRequestRef blank;
6826 auto i = session_map.sessions.begin();
6827 while (i != session_map.sessions.end()) {
6828 auto j = i;
6829 ++i;
6830 session_stretch_allowed(*j, blank);
6831 }
6832}