]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/Monitor.cc
import 15.2.2 octopus source
[ceph.git] / ceph / src / mon / Monitor.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
11fdf7f2 16#include <iterator>
7c673cae 17#include <sstream>
11fdf7f2 18#include <tuple>
7c673cae
FG
19#include <stdlib.h>
20#include <signal.h>
21#include <limits.h>
22#include <cstring>
23#include <boost/scope_exit.hpp>
24#include <boost/algorithm/string/predicate.hpp>
25
11fdf7f2
TL
26#include "json_spirit/json_spirit_reader.h"
27#include "json_spirit/json_spirit_writer.h"
28
7c673cae
FG
29#include "Monitor.h"
30#include "common/version.h"
11fdf7f2
TL
31#include "common/blkdev.h"
32#include "common/cmdparse.h"
33#include "common/signal.h"
7c673cae
FG
34
35#include "osd/OSDMap.h"
36
37#include "MonitorDBStore.h"
38
39#include "messages/PaxosServiceMessage.h"
40#include "messages/MMonMap.h"
41#include "messages/MMonGetMap.h"
42#include "messages/MMonGetVersion.h"
43#include "messages/MMonGetVersionReply.h"
44#include "messages/MGenericMessage.h"
45#include "messages/MMonCommand.h"
46#include "messages/MMonCommandAck.h"
47#include "messages/MMonMetadata.h"
48#include "messages/MMonSync.h"
49#include "messages/MMonScrub.h"
50#include "messages/MMonProbe.h"
51#include "messages/MMonJoin.h"
52#include "messages/MMonPaxos.h"
53#include "messages/MRoute.h"
54#include "messages/MForward.h"
55
56#include "messages/MMonSubscribe.h"
57#include "messages/MMonSubscribeAck.h"
58
9f95a23c
TL
59#include "messages/MCommand.h"
60#include "messages/MCommandReply.h"
61
7c673cae
FG
62#include "messages/MAuthReply.h"
63
11fdf7f2 64#include "messages/MTimeCheck2.h"
7c673cae
FG
65#include "messages/MPing.h"
66
67#include "common/strtol.h"
68#include "common/ceph_argparse.h"
69#include "common/Timer.h"
70#include "common/Clock.h"
71#include "common/errno.h"
72#include "common/perf_counters.h"
73#include "common/admin_socket.h"
74#include "global/signal_handler.h"
75#include "common/Formatter.h"
76#include "include/stringify.h"
77#include "include/color.h"
78#include "include/ceph_fs.h"
79#include "include/str_list.h"
80
81#include "OSDMonitor.h"
82#include "MDSMonitor.h"
83#include "MonmapMonitor.h"
7c673cae
FG
84#include "LogMonitor.h"
85#include "AuthMonitor.h"
86#include "MgrMonitor.h"
31f18b77 87#include "MgrStatMonitor.h"
11fdf7f2 88#include "ConfigMonitor.h"
7c673cae
FG
89#include "mon/QuorumService.h"
90#include "mon/HealthMonitor.h"
91#include "mon/ConfigKeyService.h"
92#include "common/config.h"
93#include "common/cmdparse.h"
11fdf7f2 94#include "include/ceph_assert.h"
7c673cae
FG
95#include "include/compat.h"
96#include "perfglue/heap_profiler.h"
97
98#include "auth/none/AuthNoneClientHandler.h"
99
100#define dout_subsys ceph_subsys_mon
101#undef dout_prefix
102#define dout_prefix _prefix(_dout, this)
9f95a23c 103using namespace TOPNSPC::common;
7c673cae
FG
104static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
105 return *_dout << "mon." << mon->name << "@" << mon->rank
106 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
107}
108
109const string Monitor::MONITOR_NAME = "monitor";
110const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
111
112
// Static table of monitor commands, generated from <mon/MonCommands.h>.
// That header is expected to consist of COMMAND(...) / COMMAND_WITH_FLAG(...)
// invocations; the macros defined here expand each invocation into one
// brace-initialized MonCommand element.  The constructor copies this table
// into local_mon_commands.
#undef FLAG
#undef COMMAND
#undef COMMAND_WITH_FLAG
// FLAG(X) expands to MonCommand::FLAG_X.
#define FLAG(f) (MonCommand::FLAG_##f)
// Commands declared without explicit flags get FLAG(NONE).
#define COMMAND(parsesig, helptext, modulename, req_perms)	\
  {parsesig, helptext, modulename, req_perms, FLAG(NONE)},
#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, flags) \
  {parsesig, helptext, modulename, req_perms, flags},
MonCommand mon_commands[] = {
#include <mon/MonCommands.h>
};
// Drop the generator macros again (FLAG stays defined).
#undef COMMAND
#undef COMMAND_WITH_FLAG
7c673cae 126
7c673cae
FG
// Construct a monitor.
//
// cct_  - context handed to the Dispatcher and AuthServer bases.
// nm    - this monitor's name.
// s     - backing MonitorDBStore; ~Monitor() does not delete it.
// m     - main messenger; also provides the loopback connection (con_self).
// mgr_m - messenger used by the mgr client.
// map   - initial monmap; ~Monitor() does not delete it.
//
// Besides initializing members, the constructor creates the cluster/audit log
// channels, configures the op tracker from config, creates Paxos and every
// PaxosService, and prepares the local (and pre-nautilus) command
// descriptions.
Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
		 Messenger *m, Messenger *mgr_m, MonMap *map) :
  Dispatcher(cct_),
  AuthServer(cct_),
  name(nm),
  rank(-1),
  messenger(m),
  con_self(m ? m->get_loopback_connection() : NULL),
  timer(cct_, lock),
  finisher(cct_, "mon_finisher", "fin"),
  // NOTE(review): `cct` here and below is an inherited member, presumably
  // set from cct_ by the Dispatcher base initialized above -- confirm.
  cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads),
  has_ever_joined(false),
  logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
  monmap(map),
  log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
  key_server(cct, &keyring),
  // A non-empty auth_supported overrides both *_required options.
  auth_cluster_required(cct,
			cct->_conf->auth_supported.empty() ?
			cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
  auth_service_required(cct,
			cct->_conf->auth_supported.empty() ?
			cct->_conf->auth_service_required : cct->_conf->auth_supported),
  mgr_messenger(mgr_m),
  mgr_client(cct_, mgr_m, monmap),
  gss_ktfile_client(cct->_conf.get_val<std::string>("gss_ktab_client_file")),
  store(s),

  elector(this),
  required_features(0),
  leader(0),
  quorum_con_features(0),
  // scrub
  scrub_version(0),
  scrub_event(NULL),
  scrub_timeout_event(NULL),

  // sync state
  sync_provider_count(0),
  sync_cookie(0),
  sync_full(false),
  sync_start_version(0),
  sync_timeout_event(NULL),
  sync_last_committed_floor(0),

  timecheck_round(0),
  timecheck_acks(0),
  timecheck_rounds_since_clean(0),
  timecheck_event(NULL),

  paxos_service(PAXOS_NUM),
  admin_hook(NULL),
  routed_request_tid(0),
  op_tracker(cct, g_conf().get_val<bool>("mon_enable_op_tracker"), 1)
{
  // Cluster and audit log channels; update_log_clients() applies the
  // clog_* options to both.
  clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
  audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);

  update_log_clients();

  if (!gss_ktfile_client.empty()) {
    // Assert we can export environment variable
    /*
      The default client keytab is used, if it is present and readable,
      to automatically obtain initial credentials for GSSAPI client
      applications. The principal name of the first entry in the client
      keytab is used by default when obtaining initial credentials.
      1. The KRB5_CLIENT_KTNAME environment variable.
      2. The default_client_keytab_name profile variable in [libdefaults].
      3. The hardcoded default, DEFCKTNAME.
    */
    const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
                                    gss_ktfile_client.c_str(), 1));
    ceph_assert(set_result == 0);
  }

  // Op tracker thresholds and history limits come from mon_op_* options.
  op_tracker.set_complaint_and_threshold(
      g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count(),
      g_conf().get_val<int64_t>("mon_op_log_threshold"));
  op_tracker.set_history_size_and_duration(
      g_conf().get_val<uint64_t>("mon_op_history_size"),
      g_conf().get_val<std::chrono::seconds>("mon_op_history_duration").count());
  op_tracker.set_history_slow_op_size_and_threshold(
      g_conf().get_val<uint64_t>("mon_op_history_slow_op_size"),
      g_conf().get_val<std::chrono::seconds>("mon_op_history_slow_op_threshold").count());

  // Paxos must exist before the services, which are all handed this pointer.
  paxos = new Paxos(this, "paxos");

  paxos_service[PAXOS_MDSMAP].reset(new MDSMonitor(this, paxos, "mdsmap"));
  paxos_service[PAXOS_MONMAP].reset(new MonmapMonitor(this, paxos, "monmap"));
  paxos_service[PAXOS_OSDMAP].reset(new OSDMonitor(cct, this, paxos, "osdmap"));
  paxos_service[PAXOS_LOG].reset(new LogMonitor(this, paxos, "logm"));
  paxos_service[PAXOS_AUTH].reset(new AuthMonitor(this, paxos, "auth"));
  paxos_service[PAXOS_MGR].reset(new MgrMonitor(this, paxos, "mgr"));
  paxos_service[PAXOS_MGRSTAT].reset(new MgrStatMonitor(this, paxos, "mgrstat"));
  paxos_service[PAXOS_HEALTH].reset(new HealthMonitor(this, paxos, "health"));
  paxos_service[PAXOS_CONFIG].reset(new ConfigMonitor(this, paxos, "config"));

  config_key_service = new ConfigKeyService(this, paxos);

  // mon_caps is the all-permissive cap string; failing to parse it is a
  // programming error.
  bool r = mon_caps.parse("allow *", NULL);
  ceph_assert(r);

  exited_quorum = ceph_clock_now();

  // prepare local commands: copy the static mon_commands table and keep an
  // encoded copy of it.
  local_mon_commands.resize(std::size(mon_commands));
  for (unsigned i = 0; i < std::size(mon_commands); ++i) {
    local_mon_commands[i] = mon_commands[i];
  }
  MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);

  // Same table with each command signature rewritten by
  // cmddesc_get_prenautilus_compat(), also kept encoded.
  prenautilus_local_mon_commands = local_mon_commands;
  for (auto& i : prenautilus_local_mon_commands) {
    std::string n = cmddesc_get_prenautilus_compat(i.cmdstring);
    if (n != i.cmdstring) {
      dout(20) << " pre-nautilus cmd " << i.cmdstring << " -> " << n << dendl;
      i.cmdstring = n;
    }
  }
  MonCommand::encode_vector(prenautilus_local_mon_commands, prenautilus_local_mon_commands_bl);

  // assume our commands until we have an election.  this only means
  // we won't reply with EINVAL before the election; any command that
  // actually matters will wait until we have quorum etc and then
  // retry (and revalidate).
  leader_mon_commands = local_mon_commands;
}
254
7c673cae
FG
// Tear down the monitor.  The PaxosService objects (created in the
// constructor with the `paxos` pointer, held via reset()-able handles) are
// released before `paxos` itself is deleted.  By this point every session
// must already be gone; this is asserted.
Monitor::~Monitor()
{
  op_tracker.on_shutdown();

  paxos_service.clear();
  delete config_key_service;
  delete paxos;
  ceph_assert(session_map.sessions.empty());
}
264
265
266class AdminHook : public AdminSocketHook {
267 Monitor *mon;
268public:
269 explicit AdminHook(Monitor *m) : mon(m) {}
9f95a23c
TL
270 int call(std::string_view command, const cmdmap_t& cmdmap,
271 Formatter *f,
272 std::ostream& errss,
273 bufferlist& out) override {
274 stringstream outss;
275 int r = mon->do_admin_command(command, cmdmap, f, errss, outss);
276 out.append(outss);
277 return r;
7c673cae
FG
278 }
279};
280
9f95a23c
TL
281int Monitor::do_admin_command(
282 std::string_view command,
283 const cmdmap_t& cmdmap,
284 Formatter *f,
285 std::ostream& err,
286 std::ostream& out)
7c673cae 287{
11fdf7f2 288 std::lock_guard l(lock);
7c673cae 289
9f95a23c 290 int r = 0;
7c673cae 291 string args;
11fdf7f2 292 for (auto p = cmdmap.begin();
7c673cae
FG
293 p != cmdmap.end(); ++p) {
294 if (p->first == "prefix")
295 continue;
296 if (!args.empty())
297 args += ", ";
298 args += cmd_vartype_stringify(p->second);
299 }
300 args = "[" + args + "]";
301
302 bool read_only = (command == "mon_status" ||
303 command == "mon metadata" ||
304 command == "quorum_status" ||
224ce89b
WB
305 command == "ops" ||
306 command == "sessions");
7c673cae
FG
307
308 (read_only ? audit_clog->debug() : audit_clog->info())
309 << "from='admin socket' entity='admin socket' "
310 << "cmd='" << command << "' args=" << args << ": dispatch";
311
312 if (command == "mon_status") {
9f95a23c 313 get_mon_status(f);
7c673cae 314 } else if (command == "quorum_status") {
9f95a23c 315 _quorum_status(f, out);
7c673cae
FG
316 } else if (command == "sync_force") {
317 string validate;
9f95a23c 318 if ((!cmd_getval(cmdmap, "validate", validate)) ||
7c673cae 319 (validate != "--yes-i-really-mean-it")) {
9f95a23c
TL
320 err << "are you SURE? this will mean the monitor store will be erased "
321 "the next time the monitor is restarted. pass "
322 "'--yes-i-really-mean-it' if you really do.";
323 r = -EPERM;
7c673cae
FG
324 goto abort;
325 }
9f95a23c 326 sync_force(f);
11fdf7f2
TL
327 } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0 ||
328 command.compare(0, 24, "add_bootstrap_peer_hintv") == 0) {
9f95a23c 329 if (!_add_bootstrap_peer_hint(command, cmdmap, out))
7c673cae
FG
330 goto abort;
331 } else if (command == "quorum enter") {
332 elector.start_participating();
333 start_election();
9f95a23c 334 out << "started responding to quorum, initiated new election";
7c673cae
FG
335 } else if (command == "quorum exit") {
336 start_election();
337 elector.stop_participating();
9f95a23c 338 out << "stopped responding to quorum, initiated new election";
7c673cae 339 } else if (command == "ops") {
9f95a23c 340 (void)op_tracker.dump_ops_in_flight(f);
224ce89b 341 } else if (command == "sessions") {
9f95a23c
TL
342 f->open_array_section("sessions");
343 for (auto p : session_map.sessions) {
344 f->dump_object("session", *p);
224ce89b 345 }
9f95a23c 346 f->close_section();
11fdf7f2 347 } else if (command == "dump_historic_ops") {
9f95a23c
TL
348 if (!op_tracker.dump_historic_ops(f)) {
349 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
11fdf7f2
TL
350 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
351 }
352 } else if (command == "dump_historic_ops_by_duration" ) {
9f95a23c
TL
353 if (op_tracker.dump_historic_ops(f, true)) {
354 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
11fdf7f2
TL
355 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
356 }
357 } else if (command == "dump_historic_slow_ops") {
9f95a23c
TL
358 if (op_tracker.dump_historic_slow_ops(f, {})) {
359 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
11fdf7f2
TL
360 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
361 }
9f95a23c
TL
362 } else if (command == "quorum") {
363 string quorumcmd;
364 cmd_getval(cmdmap, "quorumcmd", quorumcmd);
365 if (quorumcmd == "exit") {
366 start_election();
367 elector.stop_participating();
368 out << "stopped responding to quorum, initiated new election" << std::endl;
369 } else if (quorumcmd == "enter") {
370 elector.start_participating();
371 start_election();
372 out << "started responding to quorum, initiated new election" << std::endl;
373 } else {
374 err << "needs a valid 'quorum' command" << std::endl;
375 }
376 } else if (command == "smart") {
377 string want_devid;
378 cmd_getval(cmdmap, "devid", want_devid);
379
380 string devname = store->get_devname();
381 set<string> devnames;
382 get_raw_devices(devname, &devnames);
383 json_spirit::mObject json_map;
384 uint64_t smart_timeout = cct->_conf.get_val<uint64_t>(
385 "mon_smart_report_timeout");
386 for (auto& devname : devnames) {
387 string err;
388 string devid = get_device_id(devname, &err);
389 if (want_devid.size() && want_devid != devid) {
390 derr << "get_device_id failed on " << devname << ": " << err << dendl;
391 continue;
392 }
393 json_spirit::mValue smart_json;
394 if (block_device_get_metrics(devname, smart_timeout,
395 &smart_json)) {
396 dout(10) << "block_device_get_metrics failed for /dev/" << devname
397 << dendl;
398 continue;
399 }
400 json_map[devid] = smart_json;
401 }
402 json_spirit::write(json_map, out, json_spirit::pretty_print);
403 } else if (command == "heap") {
404 if (!ceph_using_tcmalloc()) {
405 err << "could not issue heap profiler command -- not using tcmalloc!";
406 r = -EOPNOTSUPP;
407 goto abort;
408 }
409 string cmd;
410 if (!cmd_getval(cmdmap, "heapcmd", cmd)) {
411 err << "unable to get value for command \"" << cmd << "\"";
412 r = -EINVAL;
413 goto abort;
414 }
415 std::vector<std::string> cmd_vec;
416 get_str_vec(cmd, cmd_vec);
417 string val;
418 if (cmd_getval(cmdmap, "value", val)) {
419 cmd_vec.push_back(val);
420 }
421 ceph_heap_profiler_handle_command(cmd_vec, out);
422 } else if (command == "compact") {
423 dout(1) << "triggering manual compaction" << dendl;
424 auto start = ceph::coarse_mono_clock::now();
425 store->compact_async();
426 auto end = ceph::coarse_mono_clock::now();
427 auto duration = ceph::to_seconds<double>(end - start);
428 dout(1) << "finished manual compaction in "
429 << duration << " seconds" << dendl;
430 out << "compacted " << g_conf().get_val<std::string>("mon_keyvaluedb")
431 << " in " << duration << " seconds";
432 } else {
11fdf7f2 433 ceph_abort_msg("bad AdminSocket command binding");
7c673cae
FG
434 }
435 (read_only ? audit_clog->debug() : audit_clog->info())
436 << "from='admin socket' "
437 << "entity='admin socket' "
438 << "cmd=" << command << " "
439 << "args=" << args << ": finished";
9f95a23c 440 return r;
7c673cae
FG
441
442abort:
443 (read_only ? audit_clog->debug() : audit_clog->info())
444 << "from='admin socket' "
445 << "entity='admin socket' "
446 << "cmd=" << command << " "
447 << "args=" << args << ": aborted";
9f95a23c 448 return r;
7c673cae
FG
449}
450
// Handle a termination signal: only SIGINT and SIGTERM are accepted
// (asserted).  Logs the signal name and shuts the monitor down.
void Monitor::handle_signal(int signum)
{
  ceph_assert(signum == SIGINT || signum == SIGTERM);
  derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
  shutdown();
}
457
458CompatSet Monitor::get_initial_supported_features()
459{
460 CompatSet::FeatureSet ceph_mon_feature_compat;
461 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
462 CompatSet::FeatureSet ceph_mon_feature_incompat;
463 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
464 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
465 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
466 ceph_mon_feature_incompat);
467}
468
469CompatSet Monitor::get_supported_features()
470{
471 CompatSet compat = get_initial_supported_features();
472 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
473 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
474 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
475 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
476 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
181888fb 477 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
11fdf7f2
TL
478 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
479 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
9f95a23c 480 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS);
7c673cae
FG
481 return compat;
482}
483
484CompatSet Monitor::get_legacy_features()
485{
486 CompatSet::FeatureSet ceph_mon_feature_compat;
487 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
488 CompatSet::FeatureSet ceph_mon_feature_incompat;
489 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
490 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
491 ceph_mon_feature_incompat);
492}
493
494int Monitor::check_features(MonitorDBStore *store)
495{
496 CompatSet required = get_supported_features();
497 CompatSet ondisk;
498
499 read_features_off_disk(store, &ondisk);
500
501 if (!required.writeable(ondisk)) {
502 CompatSet diff = required.unsupported(ondisk);
503 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
504 return -EPERM;
505 }
506
507 return 0;
508}
509
510void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
511{
512 bufferlist featuresbl;
513 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
514 if (featuresbl.length() == 0) {
515 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
516 << "Assuming it is old-style and introducing one." << dendl;
517 //we only want the baseline ~v.18 features assumed to be on disk.
518 //If new features are introduced this code needs to disappear or
519 //be made smarter.
520 *features = get_legacy_features();
521
522 features->encode(featuresbl);
523 auto t(std::make_shared<MonitorDBStore::Transaction>());
524 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
525 store->apply_transaction(t);
526 } else {
11fdf7f2 527 auto it = featuresbl.cbegin();
7c673cae
FG
528 features->decode(it);
529 }
530}
531
// Load this monitor's compat features from its store into `features`, then
// call calc_quorum_requirements() (which presumably recomputes
// required_features -- the value logged below).
void Monitor::read_features()
{
  read_features_off_disk(store, &features);
  dout(10) << "features " << features << dendl;

  calc_quorum_requirements();
  dout(10) << "required_features " << required_features << dendl;
}
540
541void Monitor::write_features(MonitorDBStore::TransactionRef t)
542{
543 bufferlist bl;
544 features.encode(bl);
545 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
546}
547
// NULL-terminated list of config keys this monitor observes.  The monitor
// registers itself as an observer via g_conf().add_observer(this) in
// preinit(); changes arrive in handle_conf_change().  Entries marked
// "observed, not handled" are not acted on there.
const char** Monitor::get_tracked_conf_keys() const
{
  static const char* KEYS[] = {
    "crushtool", // helpful for testing
    "mon_election_timeout",
    "mon_lease",
    "mon_lease_renew_interval_factor",
    "mon_lease_ack_timeout_factor",
    "mon_accept_timeout_factor",
    // clog & admin clog
    "clog_to_monitors",
    "clog_to_syslog",
    "clog_to_syslog_facility",
    "clog_to_syslog_level",
    "clog_to_graylog",
    "clog_to_graylog_host",
    "clog_to_graylog_port",
    "host",
    "fsid",
    // periodic health to clog
    "mon_health_to_clog",
    "mon_health_to_clog_interval",
    "mon_health_to_clog_tick_interval",
    // scrub interval
    "mon_scrub_interval",
    "mon_allow_pool_delete",
    // osdmap pruning - observed, not handled.
    "mon_osdmap_full_prune_enabled",
    "mon_osdmap_full_prune_min",
    "mon_osdmap_full_prune_interval",
    "mon_osdmap_full_prune_txsize",
    // debug options - observed, not handled
    "mon_debug_extra_checks",
    "mon_debug_block_osdmap_trim",
    NULL
  };
  return KEYS;
}
586
11fdf7f2 587void Monitor::handle_conf_change(const ConfigProxy& conf,
7c673cae
FG
588 const std::set<std::string> &changed)
589{
590 sanitize_options();
591
592 dout(10) << __func__ << " " << changed << dendl;
593
594 if (changed.count("clog_to_monitors") ||
595 changed.count("clog_to_syslog") ||
596 changed.count("clog_to_syslog_level") ||
597 changed.count("clog_to_syslog_facility") ||
598 changed.count("clog_to_graylog") ||
599 changed.count("clog_to_graylog_host") ||
600 changed.count("clog_to_graylog_port") ||
601 changed.count("host") ||
602 changed.count("fsid")) {
603 update_log_clients();
604 }
605
606 if (changed.count("mon_health_to_clog") ||
607 changed.count("mon_health_to_clog_interval") ||
608 changed.count("mon_health_to_clog_tick_interval")) {
9f95a23c
TL
609 finisher.queue(new C_MonContext{this, [this, changed](int) {
610 std::lock_guard l{lock};
611 health_to_clog_update_conf(changed);
612 }});
7c673cae
FG
613 }
614
615 if (changed.count("mon_scrub_interval")) {
494da23a 616 int scrub_interval = conf->mon_scrub_interval;
9f95a23c
TL
617 finisher.queue(new C_MonContext{this, [this, scrub_interval](int) {
618 std::lock_guard l{lock};
494da23a 619 scrub_update_interval(scrub_interval);
9f95a23c 620 }});
7c673cae
FG
621 }
622}
623
624void Monitor::update_log_clients()
625{
626 map<string,string> log_to_monitors;
627 map<string,string> log_to_syslog;
628 map<string,string> log_channel;
629 map<string,string> log_prio;
630 map<string,string> log_to_graylog;
631 map<string,string> log_to_graylog_host;
632 map<string,string> log_to_graylog_port;
633 uuid_d fsid;
634 string host;
635
636 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
637 log_channel, log_prio, log_to_graylog,
638 log_to_graylog_host, log_to_graylog_port,
639 fsid, host))
640 return;
641
642 clog->update_config(log_to_monitors, log_to_syslog,
643 log_channel, log_prio, log_to_graylog,
644 log_to_graylog_host, log_to_graylog_port,
645 fsid, host);
646
647 audit_clog->update_config(log_to_monitors, log_to_syslog,
648 log_channel, log_prio, log_to_graylog,
649 log_to_graylog_host, log_to_graylog_port,
650 fsid, host);
651}
652
653int Monitor::sanitize_options()
654{
655 int r = 0;
656
657 // mon_lease must be greater than mon_lease_renewal; otherwise we
658 // may incur in leases expiring before they are renewed.
11fdf7f2 659 if (g_conf()->mon_lease_renew_interval_factor >= 1.0) {
7c673cae 660 clog->error() << "mon_lease_renew_interval_factor ("
11fdf7f2 661 << g_conf()->mon_lease_renew_interval_factor
7c673cae
FG
662 << ") must be less than 1.0";
663 r = -EINVAL;
664 }
665
666 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
667 // got time to renew the lease and get an ack for it. Having both options
668 // with the same value, for a given small vale, could mean timing out if
669 // the monitors happened to be overloaded -- or even under normal load for
670 // a small enough value.
11fdf7f2 671 if (g_conf()->mon_lease_ack_timeout_factor <= 1.0) {
7c673cae 672 clog->error() << "mon_lease_ack_timeout_factor ("
11fdf7f2 673 << g_conf()->mon_lease_ack_timeout_factor
7c673cae
FG
674 << ") must be greater than 1.0";
675 r = -EINVAL;
676 }
677
678 return r;
679}
680
// First startup stage, run before init().  With the monitor lock held, it:
//  - validates the lease options (sanitize_options());
//  - builds the "mon" and "cluster" perf counters;
//  - checks the fsid, writing it when check_fsid() reports -ENOENT;
//  - loads the compat features;
//  - applies the mon_initial_members filter (never-joined monitors), or
//    refuses to start when a previously-joined monitor is missing from the
//    monmap (unless mon_force_quorum_join);
//  - clears store prefixes left by an interrupted or forced sync;
//  - initializes paxos and its services;
//  - loads or creates the keyring when one is required.
// It then registers the tell/asock commands with the admin socket (with the
// lock temporarily dropped), registers as a config observer, and installs
// the monitor as auth client/server on the messengers.
//
// Returns 0 on success, or a negative error code from the failing step.
int Monitor::preinit()
{
  std::unique_lock l(lock);

  dout(1) << "preinit fsid " << monmap->fsid << dendl;

  int r = sanitize_options();
  if (r < 0) {
    derr << "option sanitization failed!" << dendl;
    return r;
  }

  // Per-daemon "mon" counters, added to the perf counter collection here.
  ceph_assert(!logger);
  {
    PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
    pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
                PerfCountersBuilder::PRIO_USEFUL);
    pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions",
                        "sadd", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions",
                        "srm", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions",
                        "strm", PerfCountersBuilder::PRIO_USEFUL);
    pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in",
                        "ecnt", PerfCountersBuilder::PRIO_USEFUL);
    pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started",
                        "estt", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won",
                        "ewon", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost",
                        "elst", PerfCountersBuilder::PRIO_INTERESTING);
    logger = pcb.create_perf_counters();
    cct->get_perfcounters_collection()->add(logger);
  }

  // Cluster-wide "cluster" counters.  Unlike `logger`, they are not added to
  // the perf counter collection in this function.
  ceph_assert(!cluster_logger);
  {
    PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
    pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
    pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
    pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
    pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
    pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
    pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
    pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster", NULL, 0, unit_t(UNIT_BYTES));
    pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space", NULL, 0, unit_t(UNIT_BYTES));
    pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space", NULL, 0, unit_t(UNIT_BYTES));
    pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
    pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
    pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
    pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
    pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
    pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
    pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
    pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
    pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
    pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects", NULL, 0, unit_t(UNIT_BYTES));
    cluster_logger = pcb.create_perf_counters();
  }

  paxos->init_logger();

  // verify cluster_uuid
  {
    // NOTE: this `r` shadows the function-level one; only an error escapes.
    int r = check_fsid();
    if (r == -ENOENT)
      r = write_fsid();
    if (r < 0) {
      return r;
    }
  }

  // open compatset
  read_features();

  // have we ever joined a quorum?
  has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
  dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;

  if (!has_ever_joined) {
    // impose initial quorum restrictions?
    list<string> initial_members;
    get_str_list(g_conf()->mon_initial_members, initial_members);

    if (!initial_members.empty()) {
      dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;

      monmap->set_initial_members(
        g_ceph_context, initial_members, name, messenger->get_myaddrs(),
        &extra_probe_peers);

      dout(10) << " monmap is " << *monmap << dendl;
      dout(10) << " extra probe peers " << extra_probe_peers << dendl;
    }
  } else if (!monmap->contains(name)) {
    derr << "not in monmap and have been in a quorum before; "
         << "must have been removed" << dendl;
    if (g_conf()->mon_force_quorum_join) {
      dout(0) << "we should have died but "
              << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
    } else {
      derr << "commit suicide!" << dendl;
      return -ENOENT;
    }
  }

  {
    // We have a potentially inconsistent store state in hands. Get rid of it
    // and start fresh.
    bool clear_store = false;
    if (store->exists("mon_sync", "in_sync")) {
      dout(1) << __func__ << " clean up potentially inconsistent store state"
              << dendl;
      clear_store = true;
    }

    if (store->get("mon_sync", "force_sync") > 0) {
      dout(1) << __func__ << " force sync by clearing store state" << dendl;
      clear_store = true;
    }

    if (clear_store) {
      set<string> sync_prefixes = get_sync_targets_names();
      store->clear(sync_prefixes);
    }
  }

  sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
  dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;

  init_paxos();

  if (is_keyring_required()) {
    // we need to bootstrap authentication keys so we can form an
    // initial quorum.
    if (authmon()->get_last_committed() == 0) {
      dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
      bufferlist bl;
      int err = store->get("mkfs", "keyring", bl);
      if (err == 0 && bl.length() > 0) {
        // Attempt to decode and extract keyring only if it is found.
        // NOTE: this local `keyring` shadows the member of the same name.
        KeyRing keyring;
        auto p = bl.cbegin();
        decode(keyring, p);
        extract_save_mon_key(keyring);
      }
    }

    string keyring_loc = g_conf()->mon_data + "/keyring";

    // If the external keyring cannot be loaded, fall back to the mon. key
    // held by key_server and write it out as the default keyring.
    r = keyring.load(cct, keyring_loc);
    if (r < 0) {
      EntityName mon_name;
      mon_name.set_type(CEPH_ENTITY_TYPE_MON);
      EntityAuth mon_key;
      if (key_server.get_auth(mon_name, mon_key)) {
        dout(1) << "copying mon. key from old db to external keyring" << dendl;
        keyring.add(mon_name, mon_key);
        bufferlist bl;
        keyring.encode_plaintext(bl);
        write_default_keyring(bl);
      } else {
        derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
        return r;
      }
    }
  }

  admin_hook = new AdminHook(this);
  AdminSocket* admin_socket = cct->get_admin_socket();

  // unlock while registering to avoid mon_lock -> admin socket lock dependency.
  l.unlock();
  // register tell/asock commands
  for (const auto& command : local_mon_commands) {
    if (!command.is_tell()) {
      continue;
    }
    const auto prefix = cmddesc_get_prefix(command.cmdstring);
    if (prefix == "injectargs" ||
        prefix == "version" ||
        prefix == "tell") {
      // not registered by me
      continue;
    }
    r = admin_socket->register_command(command.cmdstring, admin_hook,
                                       command.helpstring);
    ceph_assert(r == 0);
  }
  l.lock();

  // add ourselves as a conf observer
  g_conf().add_observer(this);

  messenger->set_auth_client(this);
  messenger->set_auth_server(this);
  mgr_messenger->set_auth_client(this);

  auth_registry.refresh_config();

  return 0;
}
883
884int Monitor::init()
885{
886 dout(2) << "init" << dendl;
11fdf7f2 887 std::lock_guard l(lock);
7c673cae
FG
888
889 finisher.start();
890
891 // start ticker
892 timer.init();
893 new_tick();
894
895 cpu_tp.start();
896
897 // i'm ready!
898 messenger->add_dispatcher_tail(this);
899
11fdf7f2 900 // kickstart pet mgrclient
7c673cae
FG
901 mgr_client.init();
902 mgr_messenger->add_dispatcher_tail(&mgr_client);
903 mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls
11fdf7f2 904 mgrmon()->prime_mgr_client();
7c673cae 905
11fdf7f2 906 state = STATE_PROBING;
7c673cae 907 bootstrap();
b5b8bbf5
FG
908 // add features of myself into feature_map
909 session_map.feature_map.add_mon(con_self->get_features());
7c673cae
FG
910 return 0;
911}
912
913void Monitor::init_paxos()
914{
915 dout(10) << __func__ << dendl;
916 paxos->init();
917
918 // init services
11fdf7f2
TL
919 for (auto& svc : paxos_service) {
920 svc->init();
7c673cae
FG
921 }
922
923 refresh_from_paxos(NULL);
924}
925
926void Monitor::refresh_from_paxos(bool *need_bootstrap)
927{
928 dout(10) << __func__ << dendl;
929
930 bufferlist bl;
931 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
932 if (r >= 0) {
933 try {
11fdf7f2
TL
934 auto p = bl.cbegin();
935 decode(fingerprint, p);
7c673cae
FG
936 }
937 catch (buffer::error& e) {
938 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
939 }
940 } else {
941 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
942 }
943
11fdf7f2
TL
944 for (auto& svc : paxos_service) {
945 svc->refresh(need_bootstrap);
7c673cae 946 }
11fdf7f2
TL
947 for (auto& svc : paxos_service) {
948 svc->post_refresh();
7c673cae 949 }
224ce89b 950 load_metadata();
7c673cae
FG
951}
952
953void Monitor::register_cluster_logger()
954{
955 if (!cluster_logger_registered) {
956 dout(10) << "register_cluster_logger" << dendl;
957 cluster_logger_registered = true;
958 cct->get_perfcounters_collection()->add(cluster_logger);
959 } else {
960 dout(10) << "register_cluster_logger - already registered" << dendl;
961 }
962}
963
964void Monitor::unregister_cluster_logger()
965{
966 if (cluster_logger_registered) {
967 dout(10) << "unregister_cluster_logger" << dendl;
968 cluster_logger_registered = false;
969 cct->get_perfcounters_collection()->remove(cluster_logger);
970 } else {
971 dout(10) << "unregister_cluster_logger - not registered" << dendl;
972 }
973}
974
975void Monitor::update_logger()
976{
977 cluster_logger->set(l_cluster_num_mon, monmap->size());
978 cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
979}
980
981void Monitor::shutdown()
982{
983 dout(1) << "shutdown" << dendl;
984
9f95a23c 985 lock.lock();
7c673cae
FG
986
987 wait_for_paxos_write();
988
11fdf7f2
TL
989 {
990 std::lock_guard l(auth_lock);
991 authmon()->_set_mon_num_rank(0, 0);
992 }
993
7c673cae
FG
994 state = STATE_SHUTDOWN;
995
9f95a23c 996 lock.unlock();
11fdf7f2 997 g_conf().remove_observer(this);
9f95a23c 998 lock.lock();
7c673cae
FG
999
1000 if (admin_hook) {
11fdf7f2 1001 cct->get_admin_socket()->unregister_commands(admin_hook);
7c673cae
FG
1002 delete admin_hook;
1003 admin_hook = NULL;
1004 }
1005
1006 elector.shutdown();
1007
1008 mgr_client.shutdown();
1009
9f95a23c 1010 lock.unlock();
7c673cae
FG
1011 finisher.wait_for_empty();
1012 finisher.stop();
9f95a23c 1013 lock.lock();
7c673cae
FG
1014
1015 // clean up
1016 paxos->shutdown();
11fdf7f2
TL
1017 for (auto& svc : paxos_service) {
1018 svc->shutdown();
1019 }
7c673cae
FG
1020
1021 finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
1022 finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
1023
1024 timer.shutdown();
1025
1026 cpu_tp.stop();
1027
1028 remove_all_sessions();
1029
f64942e4
AA
1030 log_client.shutdown();
1031
1032 // unlock before msgr shutdown...
9f95a23c 1033 lock.unlock();
f64942e4
AA
1034
1035 // shutdown messenger before removing logger from perfcounter collection,
1036 // otherwise _ms_dispatch() will try to update deleted logger
1037 messenger->shutdown();
1038 mgr_messenger->shutdown();
1039
7c673cae
FG
1040 if (logger) {
1041 cct->get_perfcounters_collection()->remove(logger);
1042 delete logger;
1043 logger = NULL;
1044 }
1045 if (cluster_logger) {
1046 if (cluster_logger_registered)
1047 cct->get_perfcounters_collection()->remove(cluster_logger);
1048 delete cluster_logger;
1049 cluster_logger = NULL;
1050 }
7c673cae
FG
1051}
1052
1053void Monitor::wait_for_paxos_write()
1054{
1055 if (paxos->is_writing() || paxos->is_writing_previous()) {
1056 dout(10) << __func__ << " flushing pending write" << dendl;
9f95a23c 1057 lock.unlock();
7c673cae 1058 store->flush();
9f95a23c 1059 lock.lock();
7c673cae
FG
1060 dout(10) << __func__ << " flushed pending write" << dendl;
1061 }
1062}
1063
11fdf7f2
TL
1064void Monitor::respawn()
1065{
1066 // --- WARNING TO FUTURE COPY/PASTERS ---
1067 // You must also add a call like
1068 //
1069 // ceph_pthread_setname(pthread_self(), "ceph-mon");
1070 //
1071 // to main() so that /proc/$pid/stat field 2 contains "(ceph-mon)"
1072 // instead of "(exe)", so that killall (and log rotation) will work.
1073
1074 dout(0) << __func__ << dendl;
1075
1076 char *new_argv[orig_argc+1];
1077 dout(1) << " e: '" << orig_argv[0] << "'" << dendl;
1078 for (int i=0; i<orig_argc; i++) {
1079 new_argv[i] = (char *)orig_argv[i];
1080 dout(1) << " " << i << ": '" << orig_argv[i] << "'" << dendl;
1081 }
1082 new_argv[orig_argc] = NULL;
1083
1084 /* Determine the path to our executable, test if Linux /proc/self/exe exists.
1085 * This allows us to exec the same executable even if it has since been
1086 * unlinked.
1087 */
1088 char exe_path[PATH_MAX] = "";
1089#ifdef PROCPREFIX
1090 if (readlink(PROCPREFIX "/proc/self/exe", exe_path, PATH_MAX-1) != -1) {
1091 dout(1) << "respawning with exe " << exe_path << dendl;
1092 strcpy(exe_path, PROCPREFIX "/proc/self/exe");
1093 } else {
1094#else
1095 {
1096#endif
1097 /* Print CWD for the user's interest */
1098 char buf[PATH_MAX];
1099 char *cwd = getcwd(buf, sizeof(buf));
1100 ceph_assert(cwd);
1101 dout(1) << " cwd " << cwd << dendl;
1102
1103 /* Fall back to a best-effort: just running in our CWD */
1104 strncpy(exe_path, orig_argv[0], PATH_MAX-1);
1105 }
1106
1107 dout(1) << " exe_path " << exe_path << dendl;
1108
1109 unblock_all_signals(NULL);
1110 execv(exe_path, new_argv);
1111
1112 dout(0) << "respawn execv " << orig_argv[0]
1113 << " failed with " << cpp_strerror(errno) << dendl;
1114
1115 // We have to assert out here, because suicide() returns, and callers
1116 // to respawn expect it never to return.
1117 ceph_abort();
1118}
1119
7c673cae
FG
1120void Monitor::bootstrap()
1121{
1122 dout(10) << "bootstrap" << dendl;
1123 wait_for_paxos_write();
1124
1125 sync_reset_requester();
1126 unregister_cluster_logger();
1127 cancel_probe_timeout();
1128
11fdf7f2
TL
1129 if (monmap->get_epoch() == 0) {
1130 dout(10) << "reverting to legacy ranks for seed monmap (epoch 0)" << dendl;
1131 monmap->calc_legacy_ranks();
1132 }
1133 dout(10) << "monmap " << *monmap << dendl;
9f95a23c
TL
1134 {
1135 auto from_release = monmap->min_mon_release;
1136 ostringstream err;
1137 if (!can_upgrade_from(from_release, "min_mon_release", err)) {
1138 derr << "current monmap has " << err.str() << " stopping." << dendl;
1139 exit(0);
1140 }
11fdf7f2 1141 }
7c673cae 1142 // note my rank
11fdf7f2 1143 int newrank = monmap->get_rank(messenger->get_myaddrs());
7c673cae
FG
1144 if (newrank < 0 && rank >= 0) {
1145 // was i ever part of the quorum?
1146 if (has_ever_joined) {
1147 dout(0) << " removed from monmap, suicide." << dendl;
1148 exit(0);
1149 }
1150 }
11fdf7f2
TL
1151 if (newrank >= 0 &&
1152 monmap->get_addrs(newrank) != messenger->get_myaddrs()) {
1153 dout(0) << " monmap addrs for rank " << newrank << " changed, i am "
1154 << messenger->get_myaddrs()
1155 << ", monmap is " << monmap->get_addrs(newrank) << ", respawning"
1156 << dendl;
9f95a23c
TL
1157
1158 if (monmap->get_epoch()) {
1159 // store this map in temp mon_sync location so that we use it on
1160 // our next startup
1161 derr << " stashing newest monmap " << monmap->get_epoch()
1162 << " for next startup" << dendl;
1163 bufferlist bl;
1164 monmap->encode(bl, -1);
1165 auto t(std::make_shared<MonitorDBStore::Transaction>());
1166 t->put("mon_sync", "temp_newer_monmap", bl);
1167 store->apply_transaction(t);
1168 }
1169
11fdf7f2
TL
1170 respawn();
1171 }
7c673cae
FG
1172 if (newrank != rank) {
1173 dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
1174 messenger->set_myname(entity_name_t::MON(newrank));
1175 rank = newrank;
1176
1177 // reset all connections, or else our peers will think we are someone else.
1178 messenger->mark_down_all();
1179 }
1180
1181 // reset
1182 state = STATE_PROBING;
1183
1184 _reset();
1185
1186 // sync store
11fdf7f2 1187 if (g_conf()->mon_compact_on_bootstrap) {
7c673cae
FG
1188 dout(10) << "bootstrap -- triggering compaction" << dendl;
1189 store->compact();
1190 dout(10) << "bootstrap -- finished compaction" << dendl;
1191 }
1192
1193 // singleton monitor?
1194 if (monmap->size() == 1 && rank == 0) {
1195 win_standalone_election();
1196 return;
1197 }
1198
1199 reset_probe_timeout();
1200
1201 // i'm outside the quorum
1202 if (monmap->contains(name))
1203 outside_quorum.insert(name);
1204
1205 // probe monitors
1206 dout(10) << "probing other monitors" << dendl;
1207 for (unsigned i = 0; i < monmap->size(); i++) {
1208 if ((int)i != rank)
11fdf7f2
TL
1209 send_mon_message(
1210 new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
1211 ceph_release()),
1212 i);
1213 }
1214 for (auto& av : extra_probe_peers) {
1215 if (av != messenger->get_myaddrs()) {
1216 messenger->send_to_mon(
1217 new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
1218 ceph_release()),
1219 av);
7c673cae
FG
1220 }
1221 }
1222}
1223
11fdf7f2
TL
1224bool Monitor::_add_bootstrap_peer_hint(std::string_view cmd,
1225 const cmdmap_t& cmdmap,
1226 ostream& ss)
7c673cae 1227{
7c673cae
FG
1228 if (is_leader() || is_peon()) {
1229 ss << "mon already active; ignoring bootstrap hint";
1230 return true;
1231 }
1232
11fdf7f2
TL
1233 entity_addrvec_t addrs;
1234 string addrstr;
9f95a23c 1235 if (cmd_getval(cmdmap, "addr", addrstr)) {
11fdf7f2
TL
1236 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' addr '"
1237 << addrstr << "'" << dendl;
1238
1239 entity_addr_t addr;
1240 const char *end = 0;
1241 if (!addr.parse(addrstr.c_str(), &end, entity_addr_t::TYPE_ANY)) {
1242 ss << "failed to parse addrs '" << addrstr
1243 << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1244 return false;
1245 }
1246
1247 addrs.v.push_back(addr);
1248 if (addr.get_port() == 0) {
1249 addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
1250 addrs.v[0].set_port(CEPH_MON_PORT_IANA);
1251 addrs.v.push_back(addr);
1252 addrs.v[1].set_type(entity_addr_t::TYPE_LEGACY);
1253 addrs.v[1].set_port(CEPH_MON_PORT_LEGACY);
1254 } else if (addr.get_type() == entity_addr_t::TYPE_ANY) {
1255 if (addr.get_port() == CEPH_MON_PORT_LEGACY) {
1256 addrs.v[0].set_type(entity_addr_t::TYPE_LEGACY);
1257 } else {
1258 addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
1259 }
1260 }
9f95a23c 1261 } else if (cmd_getval(cmdmap, "addrv", addrstr)) {
11fdf7f2
TL
1262 dout(10) << "_add_bootstrap_peer_hintv '" << cmd << "' addrv '"
1263 << addrstr << "'" << dendl;
1264 const char *end = 0;
1265 if (!addrs.parse(addrstr.c_str(), &end)) {
1266 ss << "failed to parse addrs '" << addrstr
1267 << "'; syntax is 'add_bootstrap_peer_hintv v2:ip:port[,v1:ip:port]'";
1268 return false;
1269 }
1270 } else {
1271 ss << "no addr or addrv provided";
1272 return false;
1273 }
7c673cae 1274
11fdf7f2
TL
1275 extra_probe_peers.insert(addrs);
1276 ss << "adding peer " << addrs << " to list: " << extra_probe_peers;
7c673cae
FG
1277 return true;
1278}
1279
1280// called by bootstrap(), or on leader|peon -> electing
1281void Monitor::_reset()
1282{
1283 dout(10) << __func__ << dendl;
1284
11fdf7f2
TL
1285 // disable authentication
1286 {
1287 std::lock_guard l(auth_lock);
1288 authmon()->_set_mon_num_rank(0, 0);
1289 }
1290
7c673cae
FG
1291 cancel_probe_timeout();
1292 timecheck_finish();
1293 health_events_cleanup();
181888fb 1294 health_check_log_times.clear();
7c673cae
FG
1295 scrub_event_cancel();
1296
1297 leader_since = utime_t();
11fdf7f2 1298 quorum_since = {};
7c673cae
FG
1299 if (!quorum.empty()) {
1300 exited_quorum = ceph_clock_now();
1301 }
1302 quorum.clear();
1303 outside_quorum.clear();
31f18b77 1304 quorum_feature_map.clear();
7c673cae
FG
1305
1306 scrub_reset();
1307
1308 paxos->restart();
1309
11fdf7f2
TL
1310 for (auto& svc : paxos_service) {
1311 svc->restart();
1312 }
7c673cae
FG
1313}
1314
1315
1316// -----------------------------------------------------------
1317// sync
1318
1319set<string> Monitor::get_sync_targets_names()
1320{
1321 set<string> targets;
1322 targets.insert(paxos->get_name());
11fdf7f2
TL
1323 for (auto& svc : paxos_service) {
1324 svc->get_store_prefixes(targets);
1325 }
7c673cae 1326 ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
11fdf7f2 1327 ceph_assert(config_key_service_ptr);
7c673cae
FG
1328 config_key_service_ptr->get_store_prefixes(targets);
1329 return targets;
1330}
1331
1332
1333void Monitor::sync_timeout()
1334{
1335 dout(10) << __func__ << dendl;
11fdf7f2 1336 ceph_assert(state == STATE_SYNCHRONIZING);
7c673cae
FG
1337 bootstrap();
1338}
1339
1340void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1341{
1342 dout(1) << __func__ << dendl;
1343
1344 MonMap latest_monmap;
1345
1346 // Grab latest monmap from MonmapMonitor
1347 bufferlist monmon_bl;
1348 int err = monmon()->get_monmap(monmon_bl);
1349 if (err < 0) {
1350 if (err != -ENOENT) {
1351 derr << __func__
1352 << " something wrong happened while reading the store: "
1353 << cpp_strerror(err) << dendl;
11fdf7f2 1354 ceph_abort_msg("error reading the store");
7c673cae
FG
1355 }
1356 } else {
1357 latest_monmap.decode(monmon_bl);
1358 }
1359
1360 // Grab last backed up monmap (if any) and compare epochs
1361 if (store->exists("mon_sync", "latest_monmap")) {
1362 bufferlist backup_bl;
1363 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1364 if (err < 0) {
1365 derr << __func__
1366 << " something wrong happened while reading the store: "
1367 << cpp_strerror(err) << dendl;
11fdf7f2 1368 ceph_abort_msg("error reading the store");
7c673cae 1369 }
11fdf7f2 1370 ceph_assert(backup_bl.length() > 0);
7c673cae
FG
1371
1372 MonMap backup_monmap;
1373 backup_monmap.decode(backup_bl);
1374
1375 if (backup_monmap.epoch > latest_monmap.epoch)
1376 latest_monmap = backup_monmap;
1377 }
1378
1379 // Check if our current monmap's epoch is greater than the one we've
1380 // got so far.
1381 if (monmap->epoch > latest_monmap.epoch)
1382 latest_monmap = *monmap;
1383
1384 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1385
1386 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1387}
1388
1389void Monitor::sync_reset_requester()
1390{
1391 dout(10) << __func__ << dendl;
1392
1393 if (sync_timeout_event) {
1394 timer.cancel_event(sync_timeout_event);
1395 sync_timeout_event = NULL;
1396 }
1397
11fdf7f2 1398 sync_provider = entity_addrvec_t();
7c673cae
FG
1399 sync_cookie = 0;
1400 sync_full = false;
1401 sync_start_version = 0;
1402}
1403
1404void Monitor::sync_reset_provider()
1405{
1406 dout(10) << __func__ << dendl;
1407 sync_providers.clear();
1408}
1409
11fdf7f2 1410void Monitor::sync_start(entity_addrvec_t &addrs, bool full)
7c673cae 1411{
11fdf7f2 1412 dout(10) << __func__ << " " << addrs << (full ? " full" : " recent") << dendl;
7c673cae 1413
11fdf7f2 1414 ceph_assert(state == STATE_PROBING ||
7c673cae
FG
1415 state == STATE_SYNCHRONIZING);
1416 state = STATE_SYNCHRONIZING;
1417
1418 // make sure are not a provider for anyone!
1419 sync_reset_provider();
1420
1421 sync_full = full;
1422
1423 if (sync_full) {
1424 // stash key state, and mark that we are syncing
1425 auto t(std::make_shared<MonitorDBStore::Transaction>());
1426 sync_stash_critical_state(t);
1427 t->put("mon_sync", "in_sync", 1);
1428
11fdf7f2 1429 sync_last_committed_floor = std::max(sync_last_committed_floor, paxos->get_version());
7c673cae
FG
1430 dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
1431 << sync_last_committed_floor << dendl;
1432 t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
1433
1434 store->apply_transaction(t);
1435
11fdf7f2 1436 ceph_assert(g_conf()->mon_sync_requester_kill_at != 1);
7c673cae
FG
1437
1438 // clear the underlying store
1439 set<string> targets = get_sync_targets_names();
1440 dout(10) << __func__ << " clearing prefixes " << targets << dendl;
1441 store->clear(targets);
1442
1443 // make sure paxos knows it has been reset. this prevents a
1444 // bootstrap and then different probe reply order from possibly
1445 // deciding a partial or no sync is needed.
1446 paxos->init();
1447
11fdf7f2 1448 ceph_assert(g_conf()->mon_sync_requester_kill_at != 2);
7c673cae
FG
1449 }
1450
1451 // assume 'other' as the leader. We will update the leader once we receive
1452 // a reply to the sync start.
11fdf7f2 1453 sync_provider = addrs;
7c673cae
FG
1454
1455 sync_reset_timeout();
1456
1457 MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
1458 if (!sync_full)
1459 m->last_committed = paxos->get_version();
11fdf7f2 1460 messenger->send_to_mon(m, sync_provider);
7c673cae
FG
1461}
1462
1463void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1464{
1465 dout(10) << __func__ << dendl;
1466 bufferlist backup_monmap;
1467 sync_obtain_latest_monmap(backup_monmap);
11fdf7f2 1468 ceph_assert(backup_monmap.length() > 0);
7c673cae
FG
1469 t->put("mon_sync", "latest_monmap", backup_monmap);
1470}
1471
1472void Monitor::sync_reset_timeout()
1473{
1474 dout(10) << __func__ << dendl;
1475 if (sync_timeout_event)
1476 timer.cancel_event(sync_timeout_event);
3efd9988 1477 sync_timeout_event = timer.add_event_after(
11fdf7f2 1478 g_conf()->mon_sync_timeout,
9f95a23c 1479 new C_MonContext{this, [this](int) {
3efd9988 1480 sync_timeout();
9f95a23c 1481 }});
7c673cae
FG
1482}
1483
1484void Monitor::sync_finish(version_t last_committed)
1485{
1486 dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
1487
11fdf7f2 1488 ceph_assert(g_conf()->mon_sync_requester_kill_at != 7);
7c673cae
FG
1489
1490 if (sync_full) {
1491 // finalize the paxos commits
1492 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1493 paxos->read_and_prepare_transactions(tx, sync_start_version,
1494 last_committed);
1495 tx->put(paxos->get_name(), "last_committed", last_committed);
1496
1497 dout(30) << __func__ << " final tx dump:\n";
1498 JSONFormatter f(true);
1499 tx->dump(&f);
1500 f.flush(*_dout);
1501 *_dout << dendl;
1502
1503 store->apply_transaction(tx);
1504 }
1505
11fdf7f2 1506 ceph_assert(g_conf()->mon_sync_requester_kill_at != 8);
7c673cae
FG
1507
1508 auto t(std::make_shared<MonitorDBStore::Transaction>());
1509 t->erase("mon_sync", "in_sync");
1510 t->erase("mon_sync", "force_sync");
1511 t->erase("mon_sync", "last_committed_floor");
1512 store->apply_transaction(t);
1513
11fdf7f2 1514 ceph_assert(g_conf()->mon_sync_requester_kill_at != 9);
7c673cae
FG
1515
1516 init_paxos();
1517
11fdf7f2 1518 ceph_assert(g_conf()->mon_sync_requester_kill_at != 10);
7c673cae
FG
1519
1520 bootstrap();
1521}
1522
1523void Monitor::handle_sync(MonOpRequestRef op)
1524{
9f95a23c 1525 auto m = op->get_req<MMonSync>();
7c673cae
FG
1526 dout(10) << __func__ << " " << *m << dendl;
1527 switch (m->op) {
1528
1529 // provider ---------
1530
1531 case MMonSync::OP_GET_COOKIE_FULL:
1532 case MMonSync::OP_GET_COOKIE_RECENT:
1533 handle_sync_get_cookie(op);
1534 break;
1535 case MMonSync::OP_GET_CHUNK:
1536 handle_sync_get_chunk(op);
1537 break;
1538
1539 // client -----------
1540
1541 case MMonSync::OP_COOKIE:
1542 handle_sync_cookie(op);
1543 break;
1544
1545 case MMonSync::OP_CHUNK:
1546 case MMonSync::OP_LAST_CHUNK:
1547 handle_sync_chunk(op);
1548 break;
1549 case MMonSync::OP_NO_COOKIE:
1550 handle_sync_no_cookie(op);
1551 break;
1552
1553 default:
1554 dout(0) << __func__ << " unknown op " << m->op << dendl;
11fdf7f2 1555 ceph_abort_msg("unknown op");
7c673cae
FG
1556 }
1557}
1558
1559// leader
1560
1561void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1562{
9f95a23c 1563 auto m = op->get_req<MMonSync>();
7c673cae
FG
1564 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1565 m->get_connection()->send_message(reply);
1566}
1567
1568void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1569{
9f95a23c 1570 auto m = op->get_req<MMonSync>();
7c673cae
FG
1571 if (is_synchronizing()) {
1572 _sync_reply_no_cookie(op);
1573 return;
1574 }
1575
11fdf7f2 1576 ceph_assert(g_conf()->mon_sync_provider_kill_at != 1);
7c673cae
FG
1577
1578 // make sure they can understand us.
1579 if ((required_features ^ m->get_connection()->get_features()) &
1580 required_features) {
1581 dout(5) << " ignoring peer mon." << m->get_source().num()
1582 << " has features " << std::hex
1583 << m->get_connection()->get_features()
1584 << " but we require " << required_features << std::dec << dendl;
1585 return;
1586 }
1587
1588 // make up a unique cookie. include election epoch (which persists
1589 // across restarts for the whole cluster) and a counter for this
1590 // process instance. there is no need to be unique *across*
1591 // monitors, though.
1592 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
11fdf7f2 1593 ceph_assert(sync_providers.count(cookie) == 0);
7c673cae
FG
1594
1595 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1596
1597 SyncProvider& sp = sync_providers[cookie];
1598 sp.cookie = cookie;
11fdf7f2
TL
1599 sp.addrs = m->get_source_addrs();
1600 sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
7c673cae
FG
1601
1602 set<string> sync_targets;
1603 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1604 // full scan
1605 sync_targets = get_sync_targets_names();
1606 sp.last_committed = paxos->get_version();
1607 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1608 sp.full = true;
1609 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1610 } else {
1611 // just catch up paxos
1612 sp.last_committed = m->last_committed;
1613 }
1614 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1615
1616 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1617 reply->last_committed = sp.last_committed;
1618 m->get_connection()->send_message(reply);
1619}
1620
1621void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
1622{
9f95a23c 1623 auto m = op->get_req<MMonSync>();
7c673cae
FG
1624 dout(10) << __func__ << " " << *m << dendl;
1625
1626 if (sync_providers.count(m->cookie) == 0) {
1627 dout(10) << __func__ << " no cookie " << m->cookie << dendl;
1628 _sync_reply_no_cookie(op);
1629 return;
1630 }
1631
11fdf7f2 1632 ceph_assert(g_conf()->mon_sync_provider_kill_at != 2);
7c673cae
FG
1633
1634 SyncProvider& sp = sync_providers[m->cookie];
11fdf7f2 1635 sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
7c673cae
FG
1636
1637 if (sp.last_committed < paxos->get_first_committed() &&
1638 paxos->get_first_committed() > 1) {
1639 dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
1640 << " < our fc " << paxos->get_first_committed() << dendl;
1641 sync_providers.erase(m->cookie);
1642 _sync_reply_no_cookie(op);
1643 return;
1644 }
1645
1646 MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
1647 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1648
9f95a23c
TL
1649 int bytes_left = g_conf()->mon_sync_max_payload_size;
1650 int keys_left = g_conf()->mon_sync_max_payload_keys;
1651 while (sp.last_committed < paxos->get_version() &&
1652 bytes_left > 0 &&
1653 keys_left > 0) {
7c673cae
FG
1654 bufferlist bl;
1655 sp.last_committed++;
224ce89b
WB
1656
1657 int err = store->get(paxos->get_name(), sp.last_committed, bl);
11fdf7f2 1658 ceph_assert(err == 0);
224ce89b 1659
7c673cae 1660 tx->put(paxos->get_name(), sp.last_committed, bl);
9f95a23c
TL
1661 bytes_left -= bl.length();
1662 --keys_left;
7c673cae
FG
1663 dout(20) << __func__ << " including paxos state " << sp.last_committed
1664 << dendl;
1665 }
1666 reply->last_committed = sp.last_committed;
1667
9f95a23c
TL
1668 if (sp.full && bytes_left > 0 && keys_left > 0) {
1669 sp.synchronizer->get_chunk_tx(tx, bytes_left, keys_left);
7c673cae
FG
1670 sp.last_key = sp.synchronizer->get_last_key();
1671 reply->last_key = sp.last_key;
1672 }
1673
1674 if ((sp.full && sp.synchronizer->has_next_chunk()) ||
1675 sp.last_committed < paxos->get_version()) {
1676 dout(10) << __func__ << " chunk, through version " << sp.last_committed
1677 << " key " << sp.last_key << dendl;
1678 } else {
1679 dout(10) << __func__ << " last chunk, through version " << sp.last_committed
1680 << " key " << sp.last_key << dendl;
1681 reply->op = MMonSync::OP_LAST_CHUNK;
1682
11fdf7f2 1683 ceph_assert(g_conf()->mon_sync_provider_kill_at != 3);
7c673cae
FG
1684
1685 // clean up our local state
1686 sync_providers.erase(sp.cookie);
1687 }
1688
11fdf7f2 1689 encode(*tx, reply->chunk_bl);
7c673cae
FG
1690
1691 m->get_connection()->send_message(reply);
1692}
1693
1694// requester
1695
1696void Monitor::handle_sync_cookie(MonOpRequestRef op)
1697{
9f95a23c 1698 auto m = op->get_req<MMonSync>();
7c673cae
FG
1699 dout(10) << __func__ << " " << *m << dendl;
1700 if (sync_cookie) {
1701 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1702 return;
1703 }
11fdf7f2 1704 if (m->get_source_addrs() != sync_provider) {
7c673cae
FG
1705 dout(10) << __func__ << " source does not match, discarding" << dendl;
1706 return;
1707 }
1708 sync_cookie = m->cookie;
1709 sync_start_version = m->last_committed;
1710
1711 sync_reset_timeout();
1712 sync_get_next_chunk();
1713
11fdf7f2 1714 ceph_assert(g_conf()->mon_sync_requester_kill_at != 3);
7c673cae
FG
1715}
1716
1717void Monitor::sync_get_next_chunk()
1718{
1719 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
11fdf7f2
TL
1720 if (g_conf()->mon_inject_sync_get_chunk_delay > 0) {
1721 dout(20) << __func__ << " injecting delay of " << g_conf()->mon_inject_sync_get_chunk_delay << dendl;
1722 usleep((long long)(g_conf()->mon_inject_sync_get_chunk_delay * 1000000.0));
7c673cae
FG
1723 }
1724 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
11fdf7f2 1725 messenger->send_to_mon(r, sync_provider);
7c673cae 1726
11fdf7f2 1727 ceph_assert(g_conf()->mon_sync_requester_kill_at != 4);
7c673cae
FG
1728}
1729
1730void Monitor::handle_sync_chunk(MonOpRequestRef op)
1731{
9f95a23c 1732 auto m = op->get_req<MMonSync>();
7c673cae
FG
1733 dout(10) << __func__ << " " << *m << dendl;
1734
1735 if (m->cookie != sync_cookie) {
1736 dout(10) << __func__ << " cookie does not match, discarding" << dendl;
1737 return;
1738 }
11fdf7f2 1739 if (m->get_source_addrs() != sync_provider) {
7c673cae
FG
1740 dout(10) << __func__ << " source does not match, discarding" << dendl;
1741 return;
1742 }
1743
11fdf7f2
TL
1744 ceph_assert(state == STATE_SYNCHRONIZING);
1745 ceph_assert(g_conf()->mon_sync_requester_kill_at != 5);
7c673cae
FG
1746
1747 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1748 tx->append_from_encoded(m->chunk_bl);
1749
1750 dout(30) << __func__ << " tx dump:\n";
1751 JSONFormatter f(true);
1752 tx->dump(&f);
1753 f.flush(*_dout);
1754 *_dout << dendl;
1755
1756 store->apply_transaction(tx);
1757
11fdf7f2 1758 ceph_assert(g_conf()->mon_sync_requester_kill_at != 6);
7c673cae
FG
1759
1760 if (!sync_full) {
1761 dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
1762 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1763 paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
1764 m->last_committed);
1765 tx->put(paxos->get_name(), "last_committed", m->last_committed);
1766
1767 dout(30) << __func__ << " tx dump:\n";
1768 JSONFormatter f(true);
1769 tx->dump(&f);
1770 f.flush(*_dout);
1771 *_dout << dendl;
1772
1773 store->apply_transaction(tx);
1774 paxos->init(); // to refresh what we just wrote
1775 }
1776
1777 if (m->op == MMonSync::OP_CHUNK) {
1778 sync_reset_timeout();
1779 sync_get_next_chunk();
1780 } else if (m->op == MMonSync::OP_LAST_CHUNK) {
1781 sync_finish(m->last_committed);
1782 }
1783}
1784
1785void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1786{
1787 dout(10) << __func__ << dendl;
1788 bootstrap();
1789}
1790
1791void Monitor::sync_trim_providers()
1792{
1793 dout(20) << __func__ << dendl;
1794
1795 utime_t now = ceph_clock_now();
1796 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1797 while (p != sync_providers.end()) {
1798 if (now > p->second.timeout) {
11fdf7f2
TL
1799 dout(10) << __func__ << " expiring cookie " << p->second.cookie
1800 << " for " << p->second.addrs << dendl;
7c673cae
FG
1801 sync_providers.erase(p++);
1802 } else {
1803 ++p;
1804 }
1805 }
1806}
1807
1808// ---------------------------------------------------
1809// probe
1810
1811void Monitor::cancel_probe_timeout()
1812{
1813 if (probe_timeout_event) {
1814 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1815 timer.cancel_event(probe_timeout_event);
1816 probe_timeout_event = NULL;
1817 } else {
1818 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1819 }
1820}
1821
1822void Monitor::reset_probe_timeout()
1823{
1824 cancel_probe_timeout();
9f95a23c 1825 probe_timeout_event = new C_MonContext{this, [this](int r) {
7c673cae 1826 probe_timeout(r);
9f95a23c 1827 }};
11fdf7f2 1828 double t = g_conf()->mon_probe_timeout;
3efd9988
FG
1829 if (timer.add_event_after(t, probe_timeout_event)) {
1830 dout(10) << "reset_probe_timeout " << probe_timeout_event
1831 << " after " << t << " seconds" << dendl;
1832 } else {
1833 probe_timeout_event = nullptr;
1834 }
7c673cae
FG
1835}
1836
1837void Monitor::probe_timeout(int r)
1838{
1839 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
11fdf7f2
TL
1840 ceph_assert(is_probing() || is_synchronizing());
1841 ceph_assert(probe_timeout_event);
7c673cae
FG
1842 probe_timeout_event = NULL;
1843 bootstrap();
1844}
1845
1846void Monitor::handle_probe(MonOpRequestRef op)
1847{
9f95a23c 1848 auto m = op->get_req<MMonProbe>();
7c673cae
FG
1849 dout(10) << "handle_probe " << *m << dendl;
1850
1851 if (m->fsid != monmap->fsid) {
1852 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1853 return;
1854 }
1855
1856 switch (m->op) {
1857 case MMonProbe::OP_PROBE:
1858 handle_probe_probe(op);
1859 break;
1860
1861 case MMonProbe::OP_REPLY:
1862 handle_probe_reply(op);
1863 break;
1864
1865 case MMonProbe::OP_MISSING_FEATURES:
81eedcae
TL
1866 derr << __func__ << " require release " << (int)m->mon_release << " > "
1867 << (int)ceph_release()
1868 << ", or missing features (have " << CEPH_FEATURES_ALL
7c673cae 1869 << ", required " << m->required_features
11fdf7f2 1870 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL) << ")"
7c673cae
FG
1871 << dendl;
1872 break;
1873 }
1874}
1875
1876void Monitor::handle_probe_probe(MonOpRequestRef op)
1877{
9f95a23c 1878 auto m = op->get_req<MMonProbe>();
7c673cae
FG
1879
1880 dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
1881 << " features " << m->get_connection()->get_features() << dendl;
1882 uint64_t missing = required_features & ~m->get_connection()->get_features();
9f95a23c
TL
1883 if ((m->mon_release != ceph_release_t::unknown &&
1884 m->mon_release < monmap->min_mon_release) ||
81eedcae
TL
1885 missing) {
1886 dout(1) << " peer " << m->get_source_addr()
9f95a23c
TL
1887 << " release " << m->mon_release
1888 << " < min_mon_release " << monmap->min_mon_release
11fdf7f2
TL
1889 << ", or missing features " << missing << dendl;
1890 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1891 name, has_ever_joined, monmap->min_mon_release);
1892 m->required_features = required_features;
1893 m->get_connection()->send_message(r);
7c673cae
FG
1894 goto out;
1895 }
1896
1897 if (!is_probing() && !is_synchronizing()) {
1898 // If the probing mon is way ahead of us, we need to re-bootstrap.
1899 // Normally we capture this case when we initially bootstrap, but
1900 // it is possible we pass those checks (we overlap with
1901 // quorum-to-be) but fail to join a quorum before it moves past
1902 // us. We need to be kicked back to bootstrap so we can
1903 // synchonize, not keep calling elections.
1904 if (paxos->get_version() + 1 < m->paxos_first_version) {
1905 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1906 << "ahead of us, re-bootstrapping" << dendl;
1907 bootstrap();
1908 goto out;
1909
1910 }
1911 }
1912
1913 MMonProbe *r;
11fdf7f2
TL
1914 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined,
1915 ceph_release());
7c673cae
FG
1916 r->name = name;
1917 r->quorum = quorum;
1918 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1919 r->paxos_first_version = paxos->get_first_committed();
1920 r->paxos_last_version = paxos->get_version();
1921 m->get_connection()->send_message(r);
1922
1923 // did we discover a peer here?
1924 if (!monmap->contains(m->get_source_addr())) {
11fdf7f2 1925 dout(1) << " adding peer " << m->get_source_addrs()
7c673cae 1926 << " to list of hints" << dendl;
11fdf7f2 1927 extra_probe_peers.insert(m->get_source_addrs());
7c673cae
FG
1928 }
1929
1930 out:
1931 return;
1932}
1933
1934void Monitor::handle_probe_reply(MonOpRequestRef op)
1935{
9f95a23c 1936 auto m = op->get_req<MMonProbe>();
11fdf7f2
TL
1937 dout(10) << "handle_probe_reply " << m->get_source_inst()
1938 << " " << *m << dendl;
7c673cae
FG
1939 dout(10) << " monmap is " << *monmap << dendl;
1940
1941 // discover name and addrs during probing or electing states.
1942 if (!is_probing() && !is_electing()) {
1943 return;
1944 }
1945
1946 // newer map, or they've joined a quorum and we haven't?
1947 bufferlist mybl;
1948 monmap->encode(mybl, m->get_connection()->get_features());
1949 // make sure it's actually different; the checks below err toward
1950 // taking the other guy's map, which could cause us to loop.
1951 if (!mybl.contents_equal(m->monmap_bl)) {
1952 MonMap *newmap = new MonMap;
1953 newmap->decode(m->monmap_bl);
1954 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1955 !has_ever_joined)) {
1956 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
1957 << ", mine was " << monmap->get_epoch() << dendl;
1958 delete newmap;
1959 monmap->decode(m->monmap_bl);
1960
1961 bootstrap();
1962 return;
1963 }
1964 delete newmap;
1965 }
1966
1967 // rename peer?
1968 string peer_name = monmap->get_name(m->get_source_addr());
1969 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
1970 dout(10) << " renaming peer " << m->get_source_addr() << " "
1971 << peer_name << " -> " << m->name << " in my monmap"
1972 << dendl;
1973 monmap->rename(peer_name, m->name);
1974
1975 if (is_electing()) {
1976 bootstrap();
1977 return;
1978 }
11fdf7f2 1979 } else if (peer_name.size()) {
7c673cae 1980 dout(10) << " peer name is " << peer_name << dendl;
11fdf7f2
TL
1981 } else {
1982 dout(10) << " peer " << m->get_source_addr() << " not in map" << dendl;
7c673cae
FG
1983 }
1984
1985 // new initial peer?
1986 if (monmap->get_epoch() == 0 &&
1987 monmap->contains(m->name) &&
11fdf7f2
TL
1988 monmap->get_addrs(m->name).front().is_blank_ip()) {
1989 dout(1) << " learned initial mon " << m->name
1990 << " addrs " << m->get_source_addrs() << dendl;
1991 monmap->set_addrvec(m->name, m->get_source_addrs());
7c673cae
FG
1992
1993 bootstrap();
1994 return;
1995 }
1996
1997 // end discover phase
1998 if (!is_probing()) {
1999 return;
2000 }
2001
11fdf7f2 2002 ceph_assert(paxos != NULL);
7c673cae
FG
2003
2004 if (is_synchronizing()) {
2005 dout(10) << " currently syncing" << dendl;
2006 return;
2007 }
2008
11fdf7f2 2009 entity_addrvec_t other = m->get_source_addrs();
7c673cae
FG
2010
2011 if (m->paxos_last_version < sync_last_committed_floor) {
2012 dout(10) << " peer paxos versions [" << m->paxos_first_version
2013 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
2014 << sync_last_committed_floor << ", ignoring"
2015 << dendl;
2016 } else {
2017 if (paxos->get_version() < m->paxos_first_version &&
2018 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
2019 dout(10) << " peer paxos first versions [" << m->paxos_first_version
2020 << "," << m->paxos_last_version << "]"
2021 << " vs my version " << paxos->get_version()
2022 << " (too far ahead)"
2023 << dendl;
2024 cancel_probe_timeout();
2025 sync_start(other, true);
2026 return;
2027 }
11fdf7f2 2028 if (paxos->get_version() + g_conf()->paxos_max_join_drift < m->paxos_last_version) {
7c673cae
FG
2029 dout(10) << " peer paxos last version " << m->paxos_last_version
2030 << " vs my version " << paxos->get_version()
2031 << " (too far ahead)"
2032 << dendl;
2033 cancel_probe_timeout();
2034 sync_start(other, false);
2035 return;
2036 }
2037 }
2038
11fdf7f2
TL
2039 // did the existing cluster complete upgrade to luminous?
2040 if (osdmon()->osdmap.get_epoch()) {
9f95a23c 2041 if (osdmon()->osdmap.require_osd_release < ceph_release_t::luminous) {
11fdf7f2
TL
2042 derr << __func__ << " existing cluster has not completed upgrade to"
2043 << " luminous; 'ceph osd require_osd_release luminous' before"
2044 << " upgrading" << dendl;
2045 exit(0);
2046 }
2047 if (!osdmon()->osdmap.test_flag(CEPH_OSDMAP_PURGED_SNAPDIRS) ||
2048 !osdmon()->osdmap.test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
2049 derr << __func__ << " existing cluster has not completed a full luminous"
2050 << " scrub to purge legacy snapdir objects; please scrub before"
2051 << " upgrading beyond luminous." << dendl;
2052 exit(0);
2053 }
2054 }
2055
7c673cae
FG
2056 // is there an existing quorum?
2057 if (m->quorum.size()) {
2058 dout(10) << " existing quorum " << m->quorum << dendl;
2059
2060 dout(10) << " peer paxos version " << m->paxos_last_version
2061 << " vs my version " << paxos->get_version()
2062 << " (ok)"
2063 << dendl;
2064
2065 if (monmap->contains(name) &&
11fdf7f2 2066 !monmap->get_addrs(name).front().is_blank_ip()) {
7c673cae
FG
2067 // i'm part of the cluster; just initiate a new election
2068 start_election();
2069 } else {
2070 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
11fdf7f2
TL
2071 send_mon_message(
2072 new MMonJoin(monmap->fsid, name, messenger->get_myaddrs()),
2073 *m->quorum.begin());
7c673cae
FG
2074 }
2075 } else {
2076 if (monmap->contains(m->name)) {
2077 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
2078 outside_quorum.insert(m->name);
2079 } else {
2080 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
2081 return;
2082 }
2083
11fdf7f2 2084 unsigned need = monmap->min_quorum_size();
7c673cae
FG
2085 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
2086 if (outside_quorum.size() >= need) {
2087 if (outside_quorum.count(name)) {
2088 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
2089 start_election();
2090 } else {
2091 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
2092 }
2093 } else {
2094 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
2095 }
2096 }
2097}
2098
2099void Monitor::join_election()
2100{
2101 dout(10) << __func__ << dendl;
2102 wait_for_paxos_write();
2103 _reset();
2104 state = STATE_ELECTING;
2105
2106 logger->inc(l_mon_num_elections);
2107}
2108
2109void Monitor::start_election()
2110{
2111 dout(10) << "start_election" << dendl;
2112 wait_for_paxos_write();
2113 _reset();
2114 state = STATE_ELECTING;
2115
2116 logger->inc(l_mon_num_elections);
2117 logger->inc(l_mon_election_call);
2118
b32b8144 2119 clog->info() << "mon." << name << " calling monitor election";
7c673cae
FG
2120 elector.call_election();
2121}
2122
2123void Monitor::win_standalone_election()
2124{
2125 dout(1) << "win_standalone_election" << dendl;
2126
2127 // bump election epoch, in case the previous epoch included other
2128 // monitors; we need to be able to make the distinction.
9f95a23c 2129 elector.declare_standalone_victory();
7c673cae
FG
2130
2131 rank = monmap->get_rank(name);
11fdf7f2 2132 ceph_assert(rank == 0);
7c673cae
FG
2133 set<int> q;
2134 q.insert(rank);
2135
224ce89b
WB
2136 map<int,Metadata> metadata;
2137 collect_metadata(&metadata[0]);
2138
7c673cae
FG
2139 win_election(elector.get_epoch(), q,
2140 CEPH_FEATURES_ALL,
2141 ceph::features::mon::get_supported(),
11fdf7f2 2142 ceph_release(),
d2e6a577 2143 metadata);
7c673cae
FG
2144}
2145
2146const utime_t& Monitor::get_leader_since() const
2147{
11fdf7f2 2148 ceph_assert(state == STATE_LEADER);
7c673cae
FG
2149 return leader_since;
2150}
2151
2152epoch_t Monitor::get_epoch()
2153{
2154 return elector.get_epoch();
2155}
2156
2157void Monitor::_finish_svc_election()
2158{
11fdf7f2 2159 ceph_assert(state == STATE_LEADER || state == STATE_PEON);
7c673cae 2160
11fdf7f2 2161 for (auto& svc : paxos_service) {
7c673cae 2162 // we already called election_finished() on monmon(); avoid callig twice
11fdf7f2 2163 if (state == STATE_LEADER && svc.get() == monmon())
7c673cae 2164 continue;
11fdf7f2 2165 svc->election_finished();
7c673cae
FG
2166 }
2167}
2168
9f95a23c 2169void Monitor::win_election(epoch_t epoch, const set<int>& active, uint64_t features,
7c673cae 2170 const mon_feature_t& mon_features,
9f95a23c 2171 ceph_release_t min_mon_release,
d2e6a577 2172 const map<int,Metadata>& metadata)
7c673cae
FG
2173{
2174 dout(10) << __func__ << " epoch " << epoch << " quorum " << active
2175 << " features " << features
2176 << " mon_features " << mon_features
11fdf7f2 2177 << " min_mon_release " << min_mon_release
7c673cae 2178 << dendl;
11fdf7f2 2179 ceph_assert(is_electing());
7c673cae
FG
2180 state = STATE_LEADER;
2181 leader_since = ceph_clock_now();
11fdf7f2 2182 quorum_since = mono_clock::now();
7c673cae
FG
2183 leader = rank;
2184 quorum = active;
2185 quorum_con_features = features;
2186 quorum_mon_features = mon_features;
11fdf7f2 2187 quorum_min_mon_release = min_mon_release;
224ce89b 2188 pending_metadata = metadata;
7c673cae
FG
2189 outside_quorum.clear();
2190
b32b8144
FG
2191 clog->info() << "mon." << name << " is new leader, mons " << get_quorum_names()
2192 << " in quorum (ranks " << quorum << ")";
7c673cae 2193
d2e6a577 2194 set_leader_commands(get_local_commands(mon_features));
7c673cae
FG
2195
2196 paxos->leader_init();
2197 // NOTE: tell monmap monitor first. This is important for the
2198 // bootstrap case to ensure that the very first paxos proposal
2199 // codifies the monmap. Otherwise any manner of chaos can ensue
2200 // when monitors are call elections or participating in a paxos
2201 // round without agreeing on who the participants are.
2202 monmon()->election_finished();
2203 _finish_svc_election();
7c673cae
FG
2204
2205 logger->inc(l_mon_election_win);
2206
224ce89b
WB
2207 // inject new metadata in first transaction.
2208 {
2209 // include previous metadata for missing mons (that aren't part of
2210 // the current quorum).
2211 map<int,Metadata> m = metadata;
2212 for (unsigned rank = 0; rank < monmap->size(); ++rank) {
2213 if (m.count(rank) == 0 &&
2214 mon_metadata.count(rank)) {
2215 m[rank] = mon_metadata[rank];
2216 }
2217 }
2218
2219 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
2220 // a new transaction immediately after the election finishes. We should
2221 // do that anyway for other reasons, though.
2222 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
2223 bufferlist bl;
11fdf7f2 2224 encode(m, bl);
224ce89b
WB
2225 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
2226 }
2227
7c673cae
FG
2228 finish_election();
2229 if (monmap->size() > 1 &&
2230 monmap->get_epoch() > 0) {
2231 timecheck_start();
2232 health_tick_start();
b32b8144
FG
2233
2234 // Freshen the health status before doing health_to_clog in case
2235 // our just-completed election changed the health
9f95a23c 2236 healthmon()->wait_for_active_ctx(new LambdaContext([this](int r){
b32b8144
FG
2237 dout(20) << "healthmon now active" << dendl;
2238 healthmon()->tick();
2239 if (healthmon()->is_proposing()) {
2240 dout(20) << __func__ << " healthmon proposing, waiting" << dendl;
9f95a23c 2241 healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext{this,
b32b8144 2242 [this](int r){
9f95a23c 2243 ceph_assert(ceph_mutex_is_locked_by_me(lock));
b32b8144 2244 do_health_to_clog_interval();
9f95a23c 2245 }});
b32b8144
FG
2246
2247 } else {
2248 do_health_to_clog_interval();
2249 }
2250 }));
2251
7c673cae
FG
2252 scrub_event_start();
2253 }
7c673cae
FG
2254}
2255
2256void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
2257 uint64_t features,
11fdf7f2 2258 const mon_feature_t& mon_features,
9f95a23c 2259 ceph_release_t min_mon_release)
7c673cae
FG
2260{
2261 state = STATE_PEON;
2262 leader_since = utime_t();
11fdf7f2 2263 quorum_since = mono_clock::now();
7c673cae
FG
2264 leader = l;
2265 quorum = q;
2266 outside_quorum.clear();
2267 quorum_con_features = features;
2268 quorum_mon_features = mon_features;
11fdf7f2 2269 quorum_min_mon_release = min_mon_release;
7c673cae
FG
2270 dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
2271 << " quorum is " << quorum << " features are " << quorum_con_features
2272 << " mon_features are " << quorum_mon_features
11fdf7f2 2273 << " min_mon_release " << min_mon_release
7c673cae
FG
2274 << dendl;
2275
2276 paxos->peon_init();
2277 _finish_svc_election();
7c673cae
FG
2278
2279 logger->inc(l_mon_election_lose);
2280
2281 finish_election();
11fdf7f2 2282}
7c673cae 2283
11fdf7f2
TL
2284namespace {
2285std::string collect_compression_algorithms()
2286{
2287 ostringstream os;
2288 bool printed = false;
2289 for (auto [name, key] : Compressor::compression_algorithms) {
2290 if (printed) {
2291 os << ", ";
2292 } else {
2293 printed = true;
2294 }
2295 std::ignore = key;
2296 os << name;
7c673cae 2297 }
11fdf7f2
TL
2298 return os.str();
2299}
7c673cae
FG
2300}
2301
224ce89b
WB
2302void Monitor::collect_metadata(Metadata *m)
2303{
2304 collect_sys_info(m, g_ceph_context);
11fdf7f2
TL
2305 (*m)["addrs"] = stringify(messenger->get_myaddrs());
2306 (*m)["compression_algorithms"] = collect_compression_algorithms();
2307
2308 // infer storage device
2309 string devname = store->get_devname();
2310 set<string> devnames;
2311 get_raw_devices(devname, &devnames);
9f95a23c
TL
2312 map<string,string> errs;
2313 get_device_metadata(devnames, m, &errs);
2314 for (auto& i : errs) {
2315 dout(1) << __func__ << " " << i.first << ": " << i.second << dendl;
11fdf7f2 2316 }
224ce89b
WB
2317}
2318
7c673cae
FG
2319void Monitor::finish_election()
2320{
2321 apply_quorum_to_compatset_features();
2322 apply_monmap_to_compatset_features();
2323 timecheck_finish();
2324 exited_quorum = utime_t();
2325 finish_contexts(g_ceph_context, waitfor_quorum);
2326 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2327 resend_routed_requests();
2328 update_logger();
2329 register_cluster_logger();
2330
11fdf7f2
TL
2331 // enable authentication
2332 {
2333 std::lock_guard l(auth_lock);
2334 authmon()->_set_mon_num_rank(monmap->size(), rank);
2335 }
2336
7c673cae 2337 // am i named properly?
11fdf7f2 2338 string cur_name = monmap->get_name(messenger->get_myaddrs());
7c673cae
FG
2339 if (cur_name != name) {
2340 dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
11fdf7f2
TL
2341 send_mon_message(
2342 new MMonJoin(monmap->fsid, name, messenger->get_myaddrs()),
2343 *quorum.begin());
7c673cae
FG
2344 }
2345}
2346
2347void Monitor::_apply_compatset_features(CompatSet &new_features)
2348{
2349 if (new_features.compare(features) != 0) {
2350 CompatSet diff = features.unsupported(new_features);
2351 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2352 features = new_features;
2353
2354 auto t = std::make_shared<MonitorDBStore::Transaction>();
2355 write_features(t);
2356 store->apply_transaction(t);
2357
2358 calc_quorum_requirements();
2359 }
2360}
2361
2362void Monitor::apply_quorum_to_compatset_features()
2363{
2364 CompatSet new_features(features);
11fdf7f2 2365 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
7c673cae
FG
2366 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2367 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2368 }
11fdf7f2
TL
2369 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2370 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
7c673cae
FG
2371 dout(5) << __func__ << dendl;
2372 _apply_compatset_features(new_features);
2373}
2374
2375void Monitor::apply_monmap_to_compatset_features()
2376{
2377 CompatSet new_features(features);
2378 mon_feature_t monmap_features = monmap->get_required_features();
2379
2380 /* persistent monmap features may go into the compatset.
2381 * optional monmap features may not - why?
2382 * because optional monmap features may be set/unset by the admin,
2383 * and possibly by other means that haven't yet been thought out,
2384 * so we can't make the monitor enforce them on start - because they
2385 * may go away.
2386 * this, of course, does not invalidate setting a compatset feature
2387 * for an optional feature - as long as you make sure to clean it up
2388 * once you unset it.
2389 */
2390 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
11fdf7f2 2391 ceph_assert(ceph::features::mon::get_persistent().contains_all(
7c673cae
FG
2392 ceph::features::mon::FEATURE_KRAKEN));
2393 // this feature should only ever be set if the quorum supports it.
11fdf7f2 2394 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
7c673cae
FG
2395 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2396 }
181888fb 2397 if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
11fdf7f2 2398 ceph_assert(ceph::features::mon::get_persistent().contains_all(
181888fb
FG
2399 ceph::features::mon::FEATURE_LUMINOUS));
2400 // this feature should only ever be set if the quorum supports it.
11fdf7f2 2401 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
181888fb
FG
2402 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
2403 }
11fdf7f2
TL
2404 if (monmap_features.contains_all(ceph::features::mon::FEATURE_MIMIC)) {
2405 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2406 ceph::features::mon::FEATURE_MIMIC));
2407 // this feature should only ever be set if the quorum supports it.
2408 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_MIMIC));
2409 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
2410 }
2411 if (monmap_features.contains_all(ceph::features::mon::FEATURE_NAUTILUS)) {
2412 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2413 ceph::features::mon::FEATURE_NAUTILUS));
2414 // this feature should only ever be set if the quorum supports it.
2415 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_NAUTILUS));
2416 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
2417 }
9f95a23c
TL
2418 if (monmap_features.contains_all(ceph::features::mon::FEATURE_OCTOPUS)) {
2419 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2420 ceph::features::mon::FEATURE_OCTOPUS));
2421 // this feature should only ever be set if the quorum supports it.
2422 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_OCTOPUS));
2423 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS);
2424 }
7c673cae
FG
2425
2426 dout(5) << __func__ << dendl;
2427 _apply_compatset_features(new_features);
2428}
2429
2430void Monitor::calc_quorum_requirements()
2431{
2432 required_features = 0;
2433
2434 // compatset
7c673cae
FG
2435 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2436 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2437 }
7c673cae
FG
2438 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2439 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2440 }
181888fb
FG
2441 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
2442 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2443 }
11fdf7f2
TL
2444 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_MIMIC)) {
2445 required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
2446 }
2447 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS)) {
2448 required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
2449 CEPH_FEATUREMASK_CEPHX_V2;
2450 }
9f95a23c
TL
2451 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS)) {
2452 required_features |= CEPH_FEATUREMASK_SERVER_OCTOPUS;
2453 }
7c673cae
FG
2454
2455 // monmap
2456 if (monmap->get_required_features().contains_all(
2457 ceph::features::mon::FEATURE_KRAKEN)) {
2458 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2459 }
2460 if (monmap->get_required_features().contains_all(
2461 ceph::features::mon::FEATURE_LUMINOUS)) {
2462 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2463 }
11fdf7f2
TL
2464 if (monmap->get_required_features().contains_all(
2465 ceph::features::mon::FEATURE_MIMIC)) {
2466 required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
2467 }
2468 if (monmap->get_required_features().contains_all(
2469 ceph::features::mon::FEATURE_NAUTILUS)) {
2470 required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
2471 CEPH_FEATUREMASK_CEPHX_V2;
2472 }
7c673cae
FG
2473 dout(10) << __func__ << " required_features " << required_features << dendl;
2474}
2475
31f18b77
FG
2476void Monitor::get_combined_feature_map(FeatureMap *fm)
2477{
2478 *fm += session_map.feature_map;
2479 for (auto id : quorum) {
2480 if (id != rank) {
2481 *fm += quorum_feature_map[id];
2482 }
2483 }
2484}
2485
9f95a23c 2486void Monitor::sync_force(Formatter *f)
7c673cae 2487{
7c673cae
FG
2488 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2489 sync_stash_critical_state(tx);
2490 tx->put("mon_sync", "force_sync", 1);
2491 store->apply_transaction(tx);
2492
2493 f->open_object_section("sync_force");
2494 f->dump_int("ret", 0);
2495 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2496 f->close_section(); // sync_force
7c673cae
FG
2497}
2498
2499void Monitor::_quorum_status(Formatter *f, ostream& ss)
2500{
2501 bool free_formatter = false;
2502
2503 if (!f) {
2504 // louzy/lazy hack: default to json if no formatter has been defined
2505 f = new JSONFormatter();
2506 free_formatter = true;
2507 }
2508 f->open_object_section("quorum_status");
2509 f->dump_int("election_epoch", get_epoch());
2510
2511 f->open_array_section("quorum");
2512 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2513 f->dump_int("mon", *p);
2514 f->close_section(); // quorum
2515
2516 list<string> quorum_names = get_quorum_names();
2517 f->open_array_section("quorum_names");
2518 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2519 f->dump_string("mon", *p);
2520 f->close_section(); // quorum_names
2521
2522 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
2523
11fdf7f2
TL
2524 if (!quorum.empty()) {
2525 f->dump_int(
2526 "quorum_age",
2527 std::chrono::duration_cast<std::chrono::seconds>(
2528 mono_clock::now() - quorum_since).count());
2529 }
2530
9f95a23c
TL
2531 f->open_object_section("features");
2532 f->dump_stream("quorum_con") << quorum_con_features;
2533 quorum_mon_features.dump(f, "quorum_mon");
2534 f->close_section();
2535
7c673cae
FG
2536 f->open_object_section("monmap");
2537 monmap->dump(f);
2538 f->close_section(); // monmap
2539
2540 f->close_section(); // quorum_status
2541 f->flush(ss);
2542 if (free_formatter)
2543 delete f;
2544}
2545
9f95a23c 2546void Monitor::get_mon_status(Formatter *f)
7c673cae 2547{
7c673cae
FG
2548 f->open_object_section("mon_status");
2549 f->dump_string("name", name);
2550 f->dump_int("rank", rank);
2551 f->dump_string("state", get_state_name());
2552 f->dump_int("election_epoch", get_epoch());
2553
2554 f->open_array_section("quorum");
2555 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2556 f->dump_int("mon", *p);
2557 }
7c673cae
FG
2558 f->close_section(); // quorum
2559
11fdf7f2
TL
2560 if (!quorum.empty()) {
2561 f->dump_int(
2562 "quorum_age",
2563 std::chrono::duration_cast<std::chrono::seconds>(
2564 mono_clock::now() - quorum_since).count());
2565 }
2566
7c673cae
FG
2567 f->open_object_section("features");
2568 f->dump_stream("required_con") << required_features;
2569 mon_feature_t req_mon_features = get_required_mon_features();
2570 req_mon_features.dump(f, "required_mon");
2571 f->dump_stream("quorum_con") << quorum_con_features;
2572 quorum_mon_features.dump(f, "quorum_mon");
2573 f->close_section(); // features
2574
2575 f->open_array_section("outside_quorum");
2576 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2577 f->dump_string("mon", *p);
2578 f->close_section(); // outside_quorum
2579
2580 f->open_array_section("extra_probe_peers");
11fdf7f2 2581 for (set<entity_addrvec_t>::iterator p = extra_probe_peers.begin();
7c673cae 2582 p != extra_probe_peers.end();
11fdf7f2
TL
2583 ++p) {
2584 f->dump_object("peer", *p);
2585 }
7c673cae
FG
2586 f->close_section(); // extra_probe_peers
2587
2588 f->open_array_section("sync_provider");
2589 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2590 p != sync_providers.end();
2591 ++p) {
2592 f->dump_unsigned("cookie", p->second.cookie);
11fdf7f2 2593 f->dump_object("addrs", p->second.addrs);
7c673cae
FG
2594 f->dump_stream("timeout") << p->second.timeout;
2595 f->dump_unsigned("last_committed", p->second.last_committed);
2596 f->dump_stream("last_key") << p->second.last_key;
2597 }
2598 f->close_section();
2599
2600 if (is_synchronizing()) {
2601 f->open_object_section("sync");
2602 f->dump_stream("sync_provider") << sync_provider;
2603 f->dump_unsigned("sync_cookie", sync_cookie);
2604 f->dump_unsigned("sync_start_version", sync_start_version);
2605 f->close_section();
2606 }
2607
11fdf7f2
TL
2608 if (g_conf()->mon_sync_provider_kill_at > 0)
2609 f->dump_int("provider_kill_at", g_conf()->mon_sync_provider_kill_at);
2610 if (g_conf()->mon_sync_requester_kill_at > 0)
2611 f->dump_int("requester_kill_at", g_conf()->mon_sync_requester_kill_at);
7c673cae
FG
2612
2613 f->open_object_section("monmap");
2614 monmap->dump(f);
2615 f->close_section();
2616
31f18b77 2617 f->dump_object("feature_map", session_map.feature_map);
7c673cae 2618 f->close_section(); // mon_status
7c673cae
FG
2619}
2620
2621
2622// health status to clog
2623
2624void Monitor::health_tick_start()
2625{
2626 if (!cct->_conf->mon_health_to_clog ||
2627 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2628 return;
2629
2630 dout(15) << __func__ << dendl;
2631
2632 health_tick_stop();
3efd9988
FG
2633 health_tick_event = timer.add_event_after(
2634 cct->_conf->mon_health_to_clog_tick_interval,
9f95a23c 2635 new C_MonContext{this, [this](int r) {
3efd9988
FG
2636 if (r < 0)
2637 return;
3efd9988 2638 health_tick_start();
9f95a23c 2639 }});
7c673cae
FG
2640}
2641
2642void Monitor::health_tick_stop()
2643{
2644 dout(15) << __func__ << dendl;
2645
2646 if (health_tick_event) {
2647 timer.cancel_event(health_tick_event);
2648 health_tick_event = NULL;
2649 }
2650}
2651
9f95a23c 2652ceph::real_clock::time_point Monitor::health_interval_calc_next_update()
7c673cae 2653{
9f95a23c 2654 auto now = ceph::real_clock::now();
7c673cae 2655
9f95a23c
TL
2656 auto secs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch());
2657 int remainder = secs.count() % cct->_conf->mon_health_to_clog_interval;
7c673cae 2658 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
9f95a23c 2659 auto next = secs + std::chrono::seconds(adjustment);
7c673cae
FG
2660
2661 dout(20) << __func__
2662 << " now: " << now << ","
2663 << " next: " << next << ","
2664 << " interval: " << cct->_conf->mon_health_to_clog_interval
2665 << dendl;
2666
9f95a23c 2667 return ceph::real_clock::time_point{next};
7c673cae
FG
2668}
2669
2670void Monitor::health_interval_start()
2671{
2672 dout(15) << __func__ << dendl;
2673
2674 if (!cct->_conf->mon_health_to_clog ||
2675 cct->_conf->mon_health_to_clog_interval <= 0) {
2676 return;
2677 }
2678
2679 health_interval_stop();
9f95a23c
TL
2680 auto next = health_interval_calc_next_update();
2681 health_interval_event = new C_MonContext{this, [this](int r) {
7c673cae
FG
2682 if (r < 0)
2683 return;
2684 do_health_to_clog_interval();
9f95a23c 2685 }};
3efd9988
FG
2686 if (!timer.add_event_at(next, health_interval_event)) {
2687 health_interval_event = nullptr;
2688 }
7c673cae
FG
2689}
2690
2691void Monitor::health_interval_stop()
2692{
2693 dout(15) << __func__ << dendl;
2694 if (health_interval_event) {
2695 timer.cancel_event(health_interval_event);
2696 }
2697 health_interval_event = NULL;
2698}
2699
2700void Monitor::health_events_cleanup()
2701{
2702 health_tick_stop();
2703 health_interval_stop();
2704 health_status_cache.reset();
2705}
2706
2707void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2708{
2709 dout(20) << __func__ << dendl;
2710
2711 if (changed.count("mon_health_to_clog")) {
2712 if (!cct->_conf->mon_health_to_clog) {
2713 health_events_cleanup();
11fdf7f2 2714 return;
7c673cae
FG
2715 } else {
2716 if (!health_tick_event) {
2717 health_tick_start();
2718 }
2719 if (!health_interval_event) {
2720 health_interval_start();
2721 }
2722 }
2723 }
2724
2725 if (changed.count("mon_health_to_clog_interval")) {
2726 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2727 health_interval_stop();
2728 } else {
2729 health_interval_start();
2730 }
2731 }
2732
2733 if (changed.count("mon_health_to_clog_tick_interval")) {
2734 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2735 health_tick_stop();
2736 } else {
2737 health_tick_start();
2738 }
2739 }
2740}
2741
2742void Monitor::do_health_to_clog_interval()
2743{
2744 // outputting to clog may have been disabled in the conf
2745 // since we were scheduled.
2746 if (!cct->_conf->mon_health_to_clog ||
2747 cct->_conf->mon_health_to_clog_interval <= 0)
2748 return;
2749
2750 dout(10) << __func__ << dendl;
2751
2752 // do we have a cached value for next_clog_update? if not,
2753 // do we know when the last update was?
2754
2755 do_health_to_clog(true);
2756 health_interval_start();
2757}
2758
2759void Monitor::do_health_to_clog(bool force)
2760{
2761 // outputting to clog may have been disabled in the conf
2762 // since we were scheduled.
2763 if (!cct->_conf->mon_health_to_clog ||
2764 cct->_conf->mon_health_to_clog_interval <= 0)
2765 return;
2766
2767 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2768
11fdf7f2 2769 string summary;
9f95a23c 2770 health_status_t level = healthmon()->get_health_status(false, nullptr, &summary);
11fdf7f2
TL
2771 if (!force &&
2772 summary == health_status_cache.summary &&
2773 level == health_status_cache.overall)
2774 return;
2775 clog->health(level) << "overall " << summary;
2776 health_status_cache.summary = summary;
2777 health_status_cache.overall = level;
224ce89b
WB
2778}
2779
224ce89b
WB
2780void Monitor::log_health(
2781 const health_check_map_t& updated,
2782 const health_check_map_t& previous,
2783 MonitorDBStore::TransactionRef t)
2784{
11fdf7f2 2785 if (!g_conf()->mon_health_to_clog) {
7c673cae
FG
2786 return;
2787 }
181888fb
FG
2788
2789 const utime_t now = ceph_clock_now();
2790
224ce89b
WB
2791 // FIXME: log atomically as part of @t instead of using clog.
2792 dout(10) << __func__ << " updated " << updated.checks.size()
2793 << " previous " << previous.checks.size()
2794 << dendl;
11fdf7f2 2795 const auto min_log_period = g_conf().get_val<int64_t>(
181888fb 2796 "mon_health_log_update_period");
224ce89b
WB
2797 for (auto& p : updated.checks) {
2798 auto q = previous.checks.find(p.first);
181888fb 2799 bool logged = false;
224ce89b
WB
2800 if (q == previous.checks.end()) {
2801 // new
2802 ostringstream ss;
2803 ss << "Health check failed: " << p.second.summary << " ("
2804 << p.first << ")";
181888fb
FG
2805 clog->health(p.second.severity) << ss.str();
2806
2807 logged = true;
224ce89b
WB
2808 } else {
2809 if (p.second.summary != q->second.summary ||
2810 p.second.severity != q->second.severity) {
181888fb
FG
2811
2812 auto status_iter = health_check_log_times.find(p.first);
2813 if (status_iter != health_check_log_times.end()) {
2814 if (p.second.severity == q->second.severity &&
2815 now - status_iter->second.updated_at < min_log_period) {
2816 // We already logged this recently and the severity is unchanged,
2817 // so skip emitting an update of the summary string.
2818 // We'll get an update out of tick() later if the check
2819 // is still failing.
2820 continue;
2821 }
2822 }
2823
2824 // summary or severity changed (ignore detail changes at this level)
2825 ostringstream ss;
224ce89b 2826 ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
181888fb
FG
2827 clog->health(p.second.severity) << ss.str();
2828
2829 logged = true;
2830 }
2831 }
2832 // Record the time at which we last logged, so that we can check this
2833 // when considering whether/when to print update messages.
2834 if (logged) {
2835 auto iter = health_check_log_times.find(p.first);
2836 if (iter == health_check_log_times.end()) {
2837 health_check_log_times.emplace(p.first, HealthCheckLogStatus(
2838 p.second.severity, p.second.summary, now));
2839 } else {
2840 iter->second = HealthCheckLogStatus(
2841 p.second.severity, p.second.summary, now);
224ce89b
WB
2842 }
2843 }
2844 }
2845 for (auto& p : previous.checks) {
2846 if (!updated.checks.count(p.first)) {
2847 // cleared
2848 ostringstream ss;
2849 if (p.first == "DEGRADED_OBJECTS") {
2850 clog->info() << "All degraded objects recovered";
2851 } else if (p.first == "OSD_FLAGS") {
2852 clog->info() << "OSD flags cleared";
2853 } else {
2854 clog->info() << "Health check cleared: " << p.first << " (was: "
2855 << p.second.summary << ")";
2856 }
181888fb
FG
2857
2858 if (health_check_log_times.count(p.first)) {
2859 health_check_log_times.erase(p.first);
2860 }
224ce89b
WB
2861 }
2862 }
7c673cae 2863
224ce89b
WB
2864 if (previous.checks.size() && updated.checks.size() == 0) {
2865 // We might be going into a fully healthy state, check
2866 // other subsystems
2867 bool any_checks = false;
2868 for (auto& svc : paxos_service) {
2869 if (&(svc->get_health_checks()) == &(previous)) {
2870 // Ignore the ones we're clearing right now
2871 continue;
2872 }
7c673cae 2873
224ce89b
WB
2874 if (svc->get_health_checks().checks.size() > 0) {
2875 any_checks = true;
2876 break;
2877 }
2878 }
2879 if (!any_checks) {
2880 clog->info() << "Cluster is now healthy";
2881 }
2882 }
7c673cae
FG
2883}
2884
7c673cae
FG
2885void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
2886{
2887 if (f)
2888 f->open_object_section("status");
2889
11fdf7f2 2890 mono_clock::time_point now = mono_clock::now();
7c673cae
FG
2891 if (f) {
2892 f->dump_stream("fsid") << monmap->get_fsid();
9f95a23c 2893 healthmon()->get_health_status(false, f, nullptr);
7c673cae
FG
2894 f->dump_unsigned("election_epoch", get_epoch());
2895 {
2896 f->open_array_section("quorum");
2897 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2898 f->dump_int("rank", *p);
2899 f->close_section();
2900 f->open_array_section("quorum_names");
2901 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2902 f->dump_string("id", monmap->get_name(*p));
2903 f->close_section();
11fdf7f2
TL
2904 f->dump_int(
2905 "quorum_age",
2906 std::chrono::duration_cast<std::chrono::seconds>(
2907 mono_clock::now() - quorum_since).count());
7c673cae
FG
2908 }
2909 f->open_object_section("monmap");
9f95a23c 2910 monmap->dump_summary(f);
7c673cae
FG
2911 f->close_section();
2912 f->open_object_section("osdmap");
224ce89b 2913 osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
7c673cae
FG
2914 f->close_section();
2915 f->open_object_section("pgmap");
11fdf7f2 2916 mgrstatmon()->print_summary(f, NULL);
7c673cae
FG
2917 f->close_section();
2918 f->open_object_section("fsmap");
2919 mdsmon()->get_fsmap().print_summary(f, NULL);
2920 f->close_section();
7c673cae
FG
2921 f->open_object_section("mgrmap");
2922 mgrmon()->get_map().print_summary(f, nullptr);
2923 f->close_section();
224ce89b
WB
2924
2925 f->dump_object("servicemap", mgrstatmon()->get_service_map());
11fdf7f2
TL
2926
2927 f->open_object_section("progress_events");
2928 for (auto& i : mgrstatmon()->get_progress_events()) {
2929 f->dump_object(i.first.c_str(), i.second);
2930 }
2931 f->close_section();
2932
7c673cae
FG
2933 f->close_section();
2934 } else {
31f18b77
FG
2935 ss << " cluster:\n";
2936 ss << " id: " << monmap->get_fsid() << "\n";
224ce89b
WB
2937
2938 string health;
9f95a23c
TL
2939 healthmon()->get_health_status(false, nullptr, &health,
2940 "\n ", "\n ");
224ce89b
WB
2941 ss << " health: " << health << "\n";
2942
31f18b77 2943 ss << "\n \n services:\n";
224ce89b
WB
2944 {
2945 size_t maxlen = 3;
2946 auto& service_map = mgrstatmon()->get_service_map();
2947 for (auto& p : service_map.services) {
2948 maxlen = std::max(maxlen, p.first.size());
2949 }
2950 string spacing(maxlen - 3, ' ');
2951 const auto quorum_names = get_quorum_names();
2952 const auto mon_count = monmap->mon_info.size();
2953 ss << " mon: " << spacing << mon_count << " daemons, quorum "
11fdf7f2 2954 << quorum_names << " (age " << timespan_str(now - quorum_since) << ")";
224ce89b
WB
2955 if (quorum_names.size() != mon_count) {
2956 std::list<std::string> out_of_q;
2957 for (size_t i = 0; i < monmap->ranks.size(); ++i) {
2958 if (quorum.count(i) == 0) {
2959 out_of_q.push_back(monmap->ranks[i]);
2960 }
2961 }
2962 ss << ", out of quorum: " << joinify(out_of_q.begin(),
2963 out_of_q.end(), std::string(", "));
31f18b77 2964 }
7c673cae 2965 ss << "\n";
224ce89b
WB
2966 if (mgrmon()->in_use()) {
2967 ss << " mgr: " << spacing;
2968 mgrmon()->get_map().print_summary(nullptr, &ss);
2969 ss << "\n";
2970 }
92f5a8d4
TL
2971 if (mdsmon()->should_print_status()) {
2972 ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n";
224ce89b
WB
2973 }
2974 ss << " osd: " << spacing;
2975 osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
2976 ss << "\n";
2977 for (auto& p : service_map.services) {
9f95a23c
TL
2978 const std::string &service = p.first;
2979 // filter out normal ceph entity types
2980 if (ServiceMap::is_normal_ceph_entity(service)) {
2981 continue;
2982 }
224ce89b
WB
2983 ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
2984 << p.second.get_summary() << "\n";
2985 }
7c673cae 2986 }
31f18b77 2987
9f95a23c
TL
2988 {
2989 auto& service_map = mgrstatmon()->get_service_map();
2990 if (!service_map.services.empty()) {
2991 ss << "\n \n task status:\n";
2992 {
2993 for (auto &p : service_map.services) {
2994 ss << p.second.get_task_summary(p.first);
2995 }
2996 }
2997 }
2998 }
2999
31f18b77 3000 ss << "\n \n data:\n";
11fdf7f2
TL
3001 mgrstatmon()->print_summary(NULL, &ss);
3002
3003 auto& pem = mgrstatmon()->get_progress_events();
3004 if (!pem.empty()) {
3005 ss << "\n \n progress:\n";
3006 for (auto& i : pem) {
3007 ss << " " << i.second.message << "\n";
11fdf7f2
TL
3008 }
3009 }
31f18b77 3010 ss << "\n ";
7c673cae
FG
3011 }
3012}
3013
11fdf7f2 3014void Monitor::_generate_command_map(cmdmap_t& cmdmap,
7c673cae
FG
3015 map<string,string> &param_str_map)
3016{
11fdf7f2 3017 for (auto p = cmdmap.begin(); p != cmdmap.end(); ++p) {
7c673cae
FG
3018 if (p->first == "prefix")
3019 continue;
3020 if (p->first == "caps") {
3021 vector<string> cv;
9f95a23c 3022 if (cmd_getval(cmdmap, "caps", cv) &&
7c673cae
FG
3023 cv.size() % 2 == 0) {
3024 for (unsigned i = 0; i < cv.size(); i += 2) {
3025 string k = string("caps_") + cv[i];
3026 param_str_map[k] = cv[i + 1];
3027 }
3028 continue;
3029 }
3030 }
3031 param_str_map[p->first] = cmd_vartype_stringify(p->second);
3032 }
3033}
3034
c07f9fc5
FG
3035const MonCommand *Monitor::_get_moncommand(
3036 const string &cmd_prefix,
d2e6a577
FG
3037 const vector<MonCommand>& cmds)
3038{
3039 for (auto& c : cmds) {
3040 if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
3041 return &c;
7c673cae
FG
3042 }
3043 }
d2e6a577 3044 return nullptr;
7c673cae
FG
3045}
3046
11fdf7f2
TL
3047bool Monitor::_allowed_command(MonSession *s, const string &module,
3048 const string &prefix, const cmdmap_t& cmdmap,
7c673cae
FG
3049 const map<string,string>& param_str_map,
3050 const MonCommand *this_cmd) {
3051
3052 bool cmd_r = this_cmd->requires_perm('r');
3053 bool cmd_w = this_cmd->requires_perm('w');
3054 bool cmd_x = this_cmd->requires_perm('x');
3055
3056 bool capable = s->caps.is_capable(
3057 g_ceph_context,
7c673cae
FG
3058 s->entity_name,
3059 module, prefix, param_str_map,
11fdf7f2
TL
3060 cmd_r, cmd_w, cmd_x,
3061 s->get_peer_socket_addr());
7c673cae
FG
3062
3063 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
3064 return capable;
3065}
3066
c07f9fc5 3067void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
7c673cae 3068 Formatter *f,
11fdf7f2
TL
3069 uint64_t features,
3070 bufferlist *rdata)
7c673cae
FG
3071{
3072 int cmdnum = 0;
3073 f->open_object_section("command_descriptions");
c07f9fc5
FG
3074 for (const auto &cmd : commands) {
3075 unsigned flags = cmd.flags;
7c673cae
FG
3076 ostringstream secname;
3077 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
11fdf7f2 3078 dump_cmddesc_to_json(f, features, secname.str(),
c07f9fc5 3079 cmd.cmdstring, cmd.helpstring, cmd.module,
11fdf7f2 3080 cmd.req_perms, flags);
7c673cae
FG
3081 cmdnum++;
3082 }
3083 f->close_section(); // command_descriptions
3084
3085 f->flush(*rdata);
3086}
3087
7c673cae
FG
3088bool Monitor::is_keyring_required()
3089{
11fdf7f2
TL
3090 return auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX) ||
3091 auth_service_required.is_supported_auth(CEPH_AUTH_CEPHX) ||
3092 auth_cluster_required.is_supported_auth(CEPH_AUTH_GSS) ||
3093 auth_service_required.is_supported_auth(CEPH_AUTH_GSS);
7c673cae
FG
3094}
3095
3096struct C_MgrProxyCommand : public Context {
3097 Monitor *mon;
3098 MonOpRequestRef op;
3099 uint64_t size;
3100 bufferlist outbl;
3101 string outs;
3102 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
3103 : mon(mon), op(op), size(s) { }
3104 void finish(int r) {
11fdf7f2 3105 std::lock_guard l(mon->lock);
7c673cae
FG
3106 mon->mgr_proxy_bytes -= size;
3107 mon->reply_command(op, r, outs, outbl, 0);
3108 }
3109};
3110
9f95a23c 3111void Monitor::handle_tell_command(MonOpRequestRef op)
7c673cae 3112{
11fdf7f2 3113 ceph_assert(op->is_type_command());
9f95a23c 3114 MCommand *m = static_cast<MCommand*>(op->get_req());
7c673cae
FG
3115 if (m->fsid != monmap->fsid) {
3116 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
9f95a23c
TL
3117 return reply_tell_command(op, -EACCES, "wrong fsid");
3118 }
3119 MonSession *session = op->get_session();
3120 if (!session) {
3121 dout(5) << __func__ << " dropping stray message " << *m << dendl;
3122 return;
3123 }
3124 cmdmap_t cmdmap;
3125 if (stringstream ss; !cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3126 return reply_tell_command(op, -EINVAL, ss.str());
3127 }
3128 map<string,string> param_str_map;
3129 _generate_command_map(cmdmap, param_str_map);
3130 string prefix;
3131 if (!cmd_getval(cmdmap, "prefix", prefix)) {
3132 return reply_tell_command(op, -EINVAL, "no prefix");
3133 }
3134 if (auto cmd = _get_moncommand(prefix,
3135 get_local_commands(quorum_mon_features));
3136 cmd) {
3137 if (cmd->is_obsolete() ||
3138 (cct->_conf->mon_debug_deprecated_as_obsolete &&
3139 cmd->is_deprecated())) {
3140 return reply_tell_command(op, -ENOTSUP,
3141 "command is obsolete; "
3142 "please check usage and/or man page");
3143 }
3144 }
3145 // see if command is whitelisted
3146 if (!session->caps.is_capable(
3147 g_ceph_context,
3148 session->entity_name,
3149 "mon", prefix, param_str_map,
3150 true, true, true,
3151 session->get_peer_socket_addr())) {
3152 return reply_tell_command(op, -EACCES, "insufficient caps");
3153 }
3154 // pass it to asok
3155 cct->get_admin_socket()->queue_tell_command(m);
3156}
3157
3158void Monitor::handle_command(MonOpRequestRef op)
3159{
3160 ceph_assert(op->is_type_command());
3161 auto m = op->get_req<MMonCommand>();
3162 if (m->fsid != monmap->fsid) {
3163 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid
3164 << dendl;
7c673cae
FG
3165 reply_command(op, -EPERM, "wrong fsid", 0);
3166 return;
3167 }
3168
11fdf7f2 3169 MonSession *session = op->get_session();
7c673cae
FG
3170 if (!session) {
3171 dout(5) << __func__ << " dropping stray message " << *m << dendl;
3172 return;
3173 }
7c673cae
FG
3174
3175 if (m->cmd.empty()) {
9f95a23c 3176 reply_command(op, -EINVAL, "no command specified", 0);
7c673cae
FG
3177 return;
3178 }
3179
3180 string prefix;
3181 vector<string> fullcmd;
11fdf7f2 3182 cmdmap_t cmdmap;
7c673cae
FG
3183 stringstream ss, ds;
3184 bufferlist rdata;
3185 string rs;
3186 int r = -EINVAL;
3187 rs = "unrecognized command";
3188
3189 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3190 // ss has reason for failure
3191 r = -EINVAL;
3192 rs = ss.str();
3193 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3194 reply_command(op, r, rs, 0);
3195 return;
3196 }
3197
3198 // check return value. If no prefix parameter provided,
3199 // return value will be false, then return error info.
9f95a23c 3200 if (!cmd_getval(cmdmap, "prefix", prefix)) {
7c673cae
FG
3201 reply_command(op, -EINVAL, "command prefix not found", 0);
3202 return;
3203 }
3204
3205 // check prefix is empty
3206 if (prefix.empty()) {
3207 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
3208 return;
3209 }
3210
3211 if (prefix == "get_command_descriptions") {
3212 bufferlist rdata;
3213 Formatter *f = Formatter::create("json");
d2e6a577 3214
11fdf7f2
TL
3215 std::vector<MonCommand> commands = static_cast<MgrMonitor*>(
3216 paxos_service[PAXOS_MGR].get())->get_command_descs();
c07f9fc5 3217
d2e6a577
FG
3218 for (auto& c : leader_mon_commands) {
3219 commands.push_back(c);
c07f9fc5
FG
3220 }
3221
11fdf7f2
TL
3222 auto features = m->get_connection()->get_features();
3223 format_command_descriptions(commands, f, features, &rdata);
7c673cae
FG
3224 delete f;
3225 reply_command(op, 0, "", rdata, 0);
3226 return;
3227 }
3228
3229 string module;
3230 string err;
3231
3232 dout(0) << "handle_command " << *m << dendl;
3233
3234 string format;
9f95a23c 3235 cmd_getval(cmdmap, "format", format, string("plain"));
7c673cae
FG
3236 boost::scoped_ptr<Formatter> f(Formatter::create(format));
3237
3238 get_str_vec(prefix, fullcmd);
3239
3240 // make sure fullcmd is not empty.
3241 // invalid prefix will cause empty vector fullcmd.
3242 // such as, prefix=";,,;"
3243 if (fullcmd.empty()) {
3244 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3245 return;
3246 }
3247
3248 module = fullcmd[0];
3249
3250 // validate command is in leader map
3251
3252 const MonCommand *leader_cmd;
c07f9fc5
FG
3253 const auto& mgr_cmds = mgrmon()->get_command_descs();
3254 const MonCommand *mgr_cmd = nullptr;
3255 if (!mgr_cmds.empty()) {
d2e6a577 3256 mgr_cmd = _get_moncommand(prefix, mgr_cmds);
c07f9fc5 3257 }
d2e6a577 3258 leader_cmd = _get_moncommand(prefix, leader_mon_commands);
7c673cae 3259 if (!leader_cmd) {
c07f9fc5
FG
3260 leader_cmd = mgr_cmd;
3261 if (!leader_cmd) {
3262 reply_command(op, -EINVAL, "command not known", 0);
3263 return;
3264 }
7c673cae
FG
3265 }
3266 // validate command is in our map & matches, or forward if it is allowed
d2e6a577
FG
3267 const MonCommand *mon_cmd = _get_moncommand(
3268 prefix,
3269 get_local_commands(quorum_mon_features));
c07f9fc5
FG
3270 if (!mon_cmd) {
3271 mon_cmd = mgr_cmd;
3272 }
7c673cae
FG
3273 if (!is_leader()) {
3274 if (!mon_cmd) {
3275 if (leader_cmd->is_noforward()) {
3276 reply_command(op, -EINVAL,
3277 "command not locally supported and not allowed to forward",
3278 0);
3279 return;
3280 }
3281 dout(10) << "Command not locally supported, forwarding request "
3282 << m << dendl;
3283 forward_request_leader(op);
3284 return;
3285 } else if (!mon_cmd->is_compat(leader_cmd)) {
3286 if (mon_cmd->is_noforward()) {
3287 reply_command(op, -EINVAL,
3288 "command not compatible with leader and not allowed to forward",
3289 0);
3290 return;
3291 }
3292 dout(10) << "Command not compatible with leader, forwarding request "
3293 << m << dendl;
3294 forward_request_leader(op);
3295 return;
3296 }
3297 }
3298
3299 if (mon_cmd->is_obsolete() ||
3300 (cct->_conf->mon_debug_deprecated_as_obsolete
3301 && mon_cmd->is_deprecated())) {
3302 reply_command(op, -ENOTSUP,
3303 "command is obsolete; please check usage and/or man page",
3304 0);
3305 return;
3306 }
3307
3308 if (session->proxy_con && mon_cmd->is_noforward()) {
3309 dout(10) << "Got forward for noforward command " << m << dendl;
3310 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3311 return;
3312 }
3313
3314 /* what we perceive as being the service the command falls under */
3315 string service(mon_cmd->module);
3316
3317 dout(25) << __func__ << " prefix='" << prefix
3318 << "' module='" << module
3319 << "' service='" << service << "'" << dendl;
3320
3321 bool cmd_is_rw =
3322 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3323
3324 // validate user's permissions for requested command
3325 map<string,string> param_str_map;
3326 _generate_command_map(cmdmap, param_str_map);
3327 if (!_allowed_command(session, service, prefix, cmdmap,
3328 param_str_map, mon_cmd)) {
3329 dout(1) << __func__ << " access denied" << dendl;
3330 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
11fdf7f2 3331 << "from='" << session->name << " " << session->addrs << "' "
7c673cae
FG
3332 << "entity='" << session->entity_name << "' "
3333 << "cmd=" << m->cmd << ": access denied";
3334 reply_command(op, -EACCES, "access denied", 0);
3335 return;
3336 }
3337
3338 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
11fdf7f2
TL
3339 << "from='" << session->name << " " << session->addrs << "' "
3340 << "entity='" << session->entity_name << "' "
3341 << "cmd=" << m->cmd << ": dispatch";
7c673cae 3342
1911f103
TL
3343 // compat kludge for legacy clients trying to tell commands that are
3344 // new. see bottom of MonCommands.h. we need to handle both (1)
3345 // pre-octopus clients and (2) octopus clients with a mix of pre-octopus
3346 // and octopus mons.
3347 if ((!HAVE_FEATURE(m->get_connection()->get_features(), SERVER_OCTOPUS) ||
3348 monmap->min_mon_release < ceph_release_t::octopus) &&
3349 (prefix == "injectargs" ||
3350 prefix == "smart" ||
3351 prefix == "mon_status" ||
3352 prefix == "heap")) {
3353 if (m->get_connection()->get_messenger() == 0) {
3354 // Prior to octopus, monitors might forward these messages
3355 // around. that was broken at baseline, and if we try to process
3356 // this message now, it will assert out when we try to send a
3357 // message in reply from the asok/tell worker (see
3358 // AnonConnection). Just reply with an error.
3359 dout(5) << __func__ << " failing forwarded command from a (presumably) "
3360 << "pre-octopus peer" << dendl;
3361 reply_command(
3362 op, -EBUSY,
3363 "failing forwarded tell command in mixed-version mon cluster", 0);
3364 return;
3365 }
3366 dout(5) << __func__ << " passing command to tell/asok" << dendl;
3367 cct->get_admin_socket()->queue_tell_command(m);
3368 return;
3369 }
3370
11fdf7f2 3371 if (mon_cmd->is_mgr()) {
7c673cae
FG
3372 const auto& hdr = m->get_header();
3373 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
11fdf7f2
TL
3374 uint64_t max = g_conf().get_val<Option::size_t>("mon_client_bytes")
3375 * g_conf().get_val<double>("mon_mgr_proxy_client_bytes_ratio");
7c673cae
FG
3376 if (mgr_proxy_bytes + size > max) {
3377 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3378 << " + " << size << " > max " << max << dendl;
3379 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3380 return;
3381 }
3382 mgr_proxy_bytes += size;
3383 dout(10) << __func__ << " proxying mgr command (+" << size
3384 << " -> " << mgr_proxy_bytes << ")" << dendl;
3385 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3386 mgr_client.start_command(m->cmd,
3387 m->get_data(),
3388 &fin->outbl,
3389 &fin->outs,
3390 new C_OnFinisher(fin, &finisher));
3391 return;
3392 }
3393
d2e6a577
FG
3394 if ((module == "mds" || module == "fs") &&
3395 prefix != "fs authorize") {
7c673cae
FG
3396 mdsmon()->dispatch(op);
3397 return;
3398 }
11fdf7f2
TL
3399 if ((module == "osd" ||
3400 prefix == "pg map" ||
3401 prefix == "pg repeer") &&
31f18b77 3402 prefix != "osd last-stat-seq") {
7c673cae
FG
3403 osdmon()->dispatch(op);
3404 return;
3405 }
11fdf7f2
TL
3406 if (module == "config") {
3407 configmon()->dispatch(op);
7c673cae
FG
3408 return;
3409 }
11fdf7f2 3410
7c673cae
FG
3411 if (module == "mon" &&
3412 /* Let the Monitor class handle the following commands:
7c673cae 3413 * 'mon scrub'
7c673cae 3414 */
7c673cae 3415 prefix != "mon scrub" &&
31f18b77
FG
3416 prefix != "mon metadata" &&
3417 prefix != "mon versions" &&
11fdf7f2
TL
3418 prefix != "mon count-metadata" &&
3419 prefix != "mon ok-to-stop" &&
3420 prefix != "mon ok-to-add-offline" &&
3421 prefix != "mon ok-to-rm") {
7c673cae
FG
3422 monmon()->dispatch(op);
3423 return;
3424 }
9f95a23c
TL
3425 if (module == "health" && prefix != "health") {
3426 healthmon()->dispatch(op);
3427 return;
3428 }
d2e6a577 3429 if (module == "auth" || prefix == "fs authorize") {
7c673cae
FG
3430 authmon()->dispatch(op);
3431 return;
3432 }
3433 if (module == "log") {
3434 logmon()->dispatch(op);
3435 return;
3436 }
3437
3438 if (module == "config-key") {
3439 config_key_service->dispatch(op);
3440 return;
3441 }
3442
3443 if (module == "mgr") {
3444 mgrmon()->dispatch(op);
3445 return;
3446 }
3447
3448 if (prefix == "fsid") {
3449 if (f) {
3450 f->open_object_section("fsid");
3451 f->dump_stream("fsid") << monmap->fsid;
3452 f->close_section();
3453 f->flush(rdata);
3454 } else {
3455 ds << monmap->fsid;
3456 rdata.append(ds);
3457 }
3458 reply_command(op, 0, "", rdata, 0);
3459 return;
3460 }
3461
9f95a23c 3462 if (prefix == "mon scrub") {
7c673cae
FG
3463 wait_for_paxos_write();
3464 if (is_leader()) {
3465 int r = scrub_start();
3466 reply_command(op, r, "", rdata, 0);
3467 } else if (is_peon()) {
3468 forward_request_leader(op);
3469 } else {
3470 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3471 }
3472 return;
3473 }
3474
9f95a23c 3475 if (prefix == "time-sync-status") {
224ce89b
WB
3476 if (!f)
3477 f.reset(Formatter::create("json-pretty"));
3478 f->open_object_section("time_sync");
3479 if (!timecheck_skews.empty()) {
3480 f->open_object_section("time_skew_status");
3481 for (auto& i : timecheck_skews) {
224ce89b 3482 double skew = i.second;
11fdf7f2
TL
3483 double latency = timecheck_latencies[i.first];
3484 string name = monmap->get_name(i.first);
224ce89b
WB
3485 ostringstream tcss;
3486 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3487 f->open_object_section(name.c_str());
3488 f->dump_float("skew", skew);
3489 f->dump_float("latency", latency);
3490 f->dump_stream("health") << tcstatus;
3491 if (tcstatus != HEALTH_OK) {
3492 f->dump_stream("details") << tcss.str();
3493 }
3494 f->close_section();
3495 }
3496 f->close_section();
3497 }
3498 f->open_object_section("timechecks");
3499 f->dump_unsigned("epoch", get_epoch());
3500 f->dump_int("round", timecheck_round);
3501 f->dump_stream("round_status") << ((timecheck_round%2) ?
3502 "on-going" : "finished");
3503 f->close_section();
3504 f->close_section();
3505 f->flush(rdata);
3506 r = 0;
3507 rs = "";
7c673cae
FG
3508 } else if (prefix == "status" ||
3509 prefix == "health" ||
3510 prefix == "df") {
3511 string detail;
9f95a23c 3512 cmd_getval(cmdmap, "detail", detail);
7c673cae
FG
3513
3514 if (prefix == "status") {
3515 // get_cluster_status handles f == NULL
3516 get_cluster_status(ds, f.get());
3517
3518 if (f) {
3519 f->flush(ds);
3520 ds << '\n';
3521 }
3522 rdata.append(ds);
3523 } else if (prefix == "health") {
11fdf7f2 3524 string plain;
9f95a23c 3525 healthmon()->get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
11fdf7f2
TL
3526 if (f) {
3527 f->flush(rdata);
7c673cae 3528 } else {
11fdf7f2 3529 rdata.append(plain);
7c673cae 3530 }
7c673cae
FG
3531 } else if (prefix == "df") {
3532 bool verbose = (detail == "detail");
3533 if (f)
3534 f->open_object_section("stats");
3535
11fdf7f2
TL
3536 mgrstatmon()->dump_cluster_stats(&ds, f.get(), verbose);
3537 if (!f) {
3538 ds << "\n \n";
3539 }
3540 mgrstatmon()->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
7c673cae
FG
3541
3542 if (f) {
3543 f->close_section();
3544 f->flush(ds);
3545 ds << '\n';
3546 }
3547 } else {
11fdf7f2 3548 ceph_abort_msg("We should never get here!");
7c673cae
FG
3549 return;
3550 }
3551 rdata.append(ds);
3552 rs = "";
3553 r = 0;
3554 } else if (prefix == "report") {
3555
3556 // this must be formatted, in its current form
3557 if (!f)
3558 f.reset(Formatter::create("json-pretty"));
3559 f->open_object_section("report");
3560 f->dump_stream("cluster_fingerprint") << fingerprint;
3561 f->dump_string("version", ceph_version_to_str());
3562 f->dump_string("commit", git_version_to_str());
3563 f->dump_stream("timestamp") << ceph_clock_now();
3564
3565 vector<string> tagsvec;
9f95a23c 3566 cmd_getval(cmdmap, "tags", tagsvec);
7c673cae
FG
3567 string tagstr = str_join(tagsvec, " ");
3568 if (!tagstr.empty())
3569 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3570 f->dump_string("tag", tagstr);
3571
9f95a23c 3572 healthmon()->get_health_status(true, f.get(), nullptr);
7c673cae
FG
3573
3574 monmon()->dump_info(f.get());
3575 osdmon()->dump_info(f.get());
3576 mdsmon()->dump_info(f.get());
7c673cae 3577 authmon()->dump_info(f.get());
11fdf7f2 3578 mgrstatmon()->dump_info(f.get());
7c673cae
FG
3579
3580 paxos->dump_info(f.get());
3581
3582 f->close_section();
3583 f->flush(rdata);
3584
3585 ostringstream ss2;
11fdf7f2 3586 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT_LEGACY);
7c673cae
FG
3587 rs = ss2.str();
3588 r = 0;
31f18b77
FG
3589 } else if (prefix == "osd last-stat-seq") {
3590 int64_t osd;
9f95a23c 3591 cmd_getval(cmdmap, "id", osd);
31f18b77
FG
3592 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3593 if (f) {
3594 f->dump_unsigned("seq", seq);
3595 f->flush(ds);
3596 } else {
3597 ds << seq;
3598 rdata.append(ds);
3599 }
3600 rs = "";
3601 r = 0;
7c673cae
FG
3602 } else if (prefix == "node ls") {
3603 string node_type("all");
9f95a23c 3604 cmd_getval(cmdmap, "type", node_type);
7c673cae
FG
3605 if (!f)
3606 f.reset(Formatter::create("json-pretty"));
3607 if (node_type == "all") {
3608 f->open_object_section("nodes");
3609 print_nodes(f.get(), ds);
3610 osdmon()->print_nodes(f.get());
3611 mdsmon()->print_nodes(f.get());
11fdf7f2 3612 mgrmon()->print_nodes(f.get());
7c673cae
FG
3613 f->close_section();
3614 } else if (node_type == "mon") {
3615 print_nodes(f.get(), ds);
3616 } else if (node_type == "osd") {
3617 osdmon()->print_nodes(f.get());
3618 } else if (node_type == "mds") {
3619 mdsmon()->print_nodes(f.get());
11fdf7f2
TL
3620 } else if (node_type == "mgr") {
3621 mgrmon()->print_nodes(f.get());
7c673cae
FG
3622 }
3623 f->flush(ds);
3624 rdata.append(ds);
3625 rs = "";
3626 r = 0;
31f18b77
FG
3627 } else if (prefix == "features") {
3628 if (!is_leader() && !is_peon()) {
3629 dout(10) << " waiting for quorum" << dendl;
3630 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3631 return;
3632 }
3633 if (!is_leader()) {
3634 forward_request_leader(op);
3635 return;
3636 }
3637 if (!f)
3638 f.reset(Formatter::create("json-pretty"));
3639 FeatureMap fm;
3640 get_combined_feature_map(&fm);
3641 f->dump_object("features", fm);
3642 f->flush(rdata);
3643 rs = "";
3644 r = 0;
7c673cae
FG
3645 } else if (prefix == "mon metadata") {
3646 if (!f)
3647 f.reset(Formatter::create("json-pretty"));
3648
3649 string name;
9f95a23c 3650 bool all = !cmd_getval(cmdmap, "id", name);
7c673cae
FG
3651 if (!all) {
3652 // Dump a single mon's metadata
3653 int mon = monmap->get_rank(name);
3654 if (mon < 0) {
3655 rs = "requested mon not found";
3656 r = -ENOENT;
3657 goto out;
3658 }
3659 f->open_object_section("mon_metadata");
3660 r = get_mon_metadata(mon, f.get(), ds);
3661 f->close_section();
3662 } else {
3663 // Dump all mons' metadata
3664 r = 0;
3665 f->open_array_section("mon_metadata");
3666 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3667 std::ostringstream get_err;
3668 f->open_object_section("mon");
3669 f->dump_string("name", monmap->get_name(rank));
3670 r = get_mon_metadata(rank, f.get(), get_err);
3671 f->close_section();
3672 if (r == -ENOENT || r == -EINVAL) {
3673 dout(1) << get_err.str() << dendl;
3674 // Drop error, list what metadata we do have
3675 r = 0;
3676 } else if (r != 0) {
3677 derr << "Unexpected error from get_mon_metadata: "
3678 << cpp_strerror(r) << dendl;
3679 ds << get_err.str();
3680 break;
3681 }
3682 }
3683 f->close_section();
3684 }
3685
3686 f->flush(ds);
3687 rdata.append(ds);
3688 rs = "";
31f18b77
FG
3689 } else if (prefix == "mon versions") {
3690 if (!f)
3691 f.reset(Formatter::create("json-pretty"));
3692 count_metadata("ceph_version", f.get());
3693 f->flush(ds);
3694 rdata.append(ds);
3695 rs = "";
3696 r = 0;
3697 } else if (prefix == "mon count-metadata") {
3698 if (!f)
3699 f.reset(Formatter::create("json-pretty"));
3700 string field;
9f95a23c 3701 cmd_getval(cmdmap, "property", field);
31f18b77
FG
3702 count_metadata(field, f.get());
3703 f->flush(ds);
3704 rdata.append(ds);
3705 rs = "";
3706 r = 0;
7c673cae
FG
3707 } else if (prefix == "quorum_status") {
3708 // make sure our map is readable and up to date
3709 if (!is_leader() && !is_peon()) {
3710 dout(10) << " waiting for quorum" << dendl;
3711 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3712 return;
3713 }
3714 _quorum_status(f.get(), ds);
3715 rdata.append(ds);
3716 rs = "";
3717 r = 0;
11fdf7f2
TL
3718 } else if (prefix == "mon ok-to-stop") {
3719 vector<string> ids;
9f95a23c 3720 if (!cmd_getval(cmdmap, "ids", ids)) {
11fdf7f2
TL
3721 r = -EINVAL;
3722 goto out;
3723 }
3724 set<string> wouldbe;
3725 for (auto rank : quorum) {
3726 wouldbe.insert(monmap->get_name(rank));
3727 }
3728 for (auto& n : ids) {
3729 if (monmap->contains(n)) {
3730 wouldbe.erase(n);
3731 }
3732 }
3733 if (wouldbe.size() < monmap->min_quorum_size()) {
3734 r = -EBUSY;
3735 rs = "not enough monitors would be available (" + stringify(wouldbe) +
3736 ") after stopping mons " + stringify(ids);
3737 goto out;
3738 }
3739 r = 0;
3740 rs = "quorum should be preserved (" + stringify(wouldbe) +
3741 ") after stopping " + stringify(ids);
3742 } else if (prefix == "mon ok-to-add-offline") {
3743 if (quorum.size() < monmap->min_quorum_size(monmap->size() + 1)) {
3744 rs = "adding a monitor may break quorum (until that monitor starts)";
3745 r = -EBUSY;
3746 goto out;
3747 }
3748 rs = "adding another mon that is not yet online will not break quorum";
3749 r = 0;
3750 } else if (prefix == "mon ok-to-rm") {
3751 string id;
9f95a23c 3752 if (!cmd_getval(cmdmap, "id", id)) {
11fdf7f2
TL
3753 r = -EINVAL;
3754 rs = "must specify a monitor id";
3755 goto out;
3756 }
3757 if (!monmap->contains(id)) {
3758 r = 0;
3759 rs = "mon." + id + " does not exist";
3760 goto out;
3761 }
3762 int rank = monmap->get_rank(id);
3763 if (quorum.count(rank) &&
3764 quorum.size() - 1 < monmap->min_quorum_size(monmap->size() - 1)) {
3765 r = -EBUSY;
3766 rs = "removing mon." + id + " would break quorum";
3767 goto out;
3768 }
3769 r = 0;
3770 rs = "safe to remove mon." + id;
7c673cae
FG
3771 } else if (prefix == "version") {
3772 if (f) {
3773 f->open_object_section("version");
3774 f->dump_string("version", pretty_version_to_str());
3775 f->close_section();
3776 f->flush(ds);
3777 } else {
3778 ds << pretty_version_to_str();
3779 }
3780 rdata.append(ds);
3781 rs = "";
3782 r = 0;
c07f9fc5
FG
3783 } else if (prefix == "versions") {
3784 if (!f)
3785 f.reset(Formatter::create("json-pretty"));
3786 map<string,int> overall;
3787 f->open_object_section("version");
3788 map<string,int> mon, mgr, osd, mds;
3789
3790 count_metadata("ceph_version", &mon);
3791 f->open_object_section("mon");
3792 for (auto& p : mon) {
3793 f->dump_int(p.first.c_str(), p.second);
3794 overall[p.first] += p.second;
3795 }
3796 f->close_section();
3797
3798 mgrmon()->count_metadata("ceph_version", &mgr);
3799 f->open_object_section("mgr");
3800 for (auto& p : mgr) {
3801 f->dump_int(p.first.c_str(), p.second);
3802 overall[p.first] += p.second;
3803 }
3804 f->close_section();
3805
3806 osdmon()->count_metadata("ceph_version", &osd);
3807 f->open_object_section("osd");
3808 for (auto& p : osd) {
3809 f->dump_int(p.first.c_str(), p.second);
3810 overall[p.first] += p.second;
3811 }
3812 f->close_section();
3813
3814 mdsmon()->count_metadata("ceph_version", &mds);
3815 f->open_object_section("mds");
35e4c445 3816 for (auto& p : mds) {
c07f9fc5
FG
3817 f->dump_int(p.first.c_str(), p.second);
3818 overall[p.first] += p.second;
3819 }
3820 f->close_section();
3821
3822 for (auto& p : mgrstatmon()->get_service_map().services) {
9f95a23c
TL
3823 auto &service = p.first;
3824 if (ServiceMap::is_normal_ceph_entity(service)) {
3825 continue;
3826 }
3827 f->open_object_section(service.c_str());
c07f9fc5
FG
3828 map<string,int> m;
3829 p.second.count_metadata("ceph_version", &m);
3830 for (auto& q : m) {
3831 f->dump_int(q.first.c_str(), q.second);
3832 overall[q.first] += q.second;
3833 }
3834 f->close_section();
3835 }
3836
3837 f->open_object_section("overall");
3838 for (auto& p : overall) {
3839 f->dump_int(p.first.c_str(), p.second);
3840 }
3841 f->close_section();
3842 f->close_section();
3843 f->flush(rdata);
3844 rs = "";
3845 r = 0;
7c673cae
FG
3846 }
3847
3848 out:
3849 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3850 reply_command(op, r, rs, rdata, 0);
3851}
3852
3853void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3854{
3855 bufferlist rdata;
3856 reply_command(op, rc, rs, rdata, version);
3857}
3858
3859void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3860 bufferlist& rdata, version_t version)
3861{
9f95a23c 3862 auto m = op->get_req<MMonCommand>();
11fdf7f2 3863 ceph_assert(m->get_type() == MSG_MON_COMMAND);
7c673cae
FG
3864 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
3865 reply->set_tid(m->get_tid());
3866 reply->set_data(rdata);
3867 send_reply(op, reply);
3868}
3869
9f95a23c
TL
3870void Monitor::reply_tell_command(
3871 MonOpRequestRef op, int rc, const string &rs)
3872{
3873 MCommand *m = static_cast<MCommand*>(op->get_req());
3874 ceph_assert(m->get_type() == MSG_COMMAND);
3875 MCommandReply *reply = new MCommandReply(rc, rs);
3876 reply->set_tid(m->get_tid());
3877 m->get_connection()->send_message(reply);
3878}
3879
7c673cae
FG
3880
3881// ------------------------
3882// request/reply routing
3883//
3884// a client/mds/osd will connect to a random monitor. we need to forward any
3885// messages requiring state updates to the leader, and then route any replies
3886// back via the correct monitor and back to them. (the monitor will not
3887// initiate any connections.)
3888
3889void Monitor::forward_request_leader(MonOpRequestRef op)
3890{
3891 op->mark_event(__func__);
3892
3893 int mon = get_leader();
3894 MonSession *session = op->get_session();
3895 PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
3896
11fdf7f2 3897 if (req->get_source().is_mon() && req->get_source_addrs() != messenger->get_myaddrs()) {
7c673cae
FG
3898 dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
3899 } else if (session->proxy_con) {
3900 dout(10) << "forward_request won't double fwd request " << *req << dendl;
3901 } else if (!session->closed) {
3902 RoutedRequest *rr = new RoutedRequest;
3903 rr->tid = ++routed_request_tid;
7c673cae
FG
3904 rr->con = req->get_connection();
3905 rr->con_features = rr->con->get_features();
3906 encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
3907 rr->session = static_cast<MonSession *>(session->get());
3908 rr->op = op;
3909 routed_requests[rr->tid] = rr;
3910 session->routed_request_tids.insert(rr->tid);
3911
3912 dout(10) << "forward_request " << rr->tid << " request " << *req
3913 << " features " << rr->con_features << dendl;
3914
3915 MForward *forward = new MForward(rr->tid,
3916 req,
3917 rr->con_features,
3918 rr->session->caps);
3919 forward->set_priority(req->get_priority());
3920 if (session->auth_handler) {
3921 forward->entity_name = session->entity_name;
3922 } else if (req->get_source().is_mon()) {
3923 forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
3924 }
11fdf7f2 3925 send_mon_message(forward, mon);
7c673cae 3926 op->mark_forwarded();
11fdf7f2 3927 ceph_assert(op->get_req()->get_type() != 0);
7c673cae
FG
3928 } else {
3929 dout(10) << "forward_request no session for request " << *req << dendl;
3930 }
3931}
3932
3933// fake connection attached to forwarded messages
3934struct AnonConnection : public Connection {
11fdf7f2 3935 entity_addr_t socket_addr;
7c673cae
FG
3936
3937 int send_message(Message *m) override {
11fdf7f2 3938 ceph_assert(!"send_message on anonymous connection");
7c673cae
FG
3939 }
3940 void send_keepalive() override {
11fdf7f2 3941 ceph_assert(!"send_keepalive on anonymous connection");
7c673cae
FG
3942 }
3943 void mark_down() override {
3944 // silently ignore
3945 }
3946 void mark_disposable() override {
3947 // silengtly ignore
3948 }
3949 bool is_connected() override { return false; }
11fdf7f2
TL
3950 entity_addr_t get_peer_socket_addr() const override {
3951 return socket_addr;
3952 }
9f95a23c
TL
3953
3954private:
3955 FRIEND_MAKE_REF(AnonConnection);
3956 explicit AnonConnection(CephContext *cct, const entity_addr_t& sa)
3957 : Connection(cct, nullptr),
3958 socket_addr(sa) {}
7c673cae
FG
3959};
3960
3961//extract the original message and put it into the regular dispatch function
3962void Monitor::handle_forward(MonOpRequestRef op)
3963{
9f95a23c 3964 auto m = op->get_req<MForward>();
11fdf7f2
TL
3965 dout(10) << "received forwarded message from "
3966 << ceph_entity_type_name(m->client_type)
3967 << " " << m->client_addrs
7c673cae
FG
3968 << " via " << m->get_source_inst() << dendl;
3969 MonSession *session = op->get_session();
11fdf7f2 3970 ceph_assert(session);
7c673cae
FG
3971
3972 if (!session->is_capable("mon", MON_CAP_X)) {
3973 dout(0) << "forward from entity with insufficient caps! "
3974 << session->caps << dendl;
3975 } else {
3976 // see PaxosService::dispatch(); we rely on this being anon
3977 // (c->msgr == NULL)
3978 PaxosServiceMessage *req = m->claim_message();
11fdf7f2
TL
3979 ceph_assert(req != NULL);
3980
9f95a23c 3981 auto c = ceph::make_ref<AnonConnection>(cct, m->client_socket_addr);
11fdf7f2
TL
3982 MonSession *s = new MonSession(static_cast<Connection*>(c.get()));
3983 s->_ident(req->get_source(),
3984 req->get_source_addrs());
3985 c->set_priv(RefCountedPtr{s, false});
3986 c->set_peer_addrs(m->client_addrs);
3987 c->set_peer_type(m->client_type);
7c673cae
FG
3988 c->set_features(m->con_features);
3989
11fdf7f2 3990 s->authenticated = true;
7c673cae
FG
3991 s->caps = m->client_caps;
3992 dout(10) << " caps are " << s->caps << dendl;
3993 s->entity_name = m->entity_name;
3994 dout(10) << " entity name '" << s->entity_name << "' type "
3995 << s->entity_name.get_type() << dendl;
3996 s->proxy_con = m->get_connection();
3997 s->proxy_tid = m->tid;
3998
3999 req->set_connection(c);
4000
4001 // not super accurate, but better than nothing.
4002 req->set_recv_stamp(m->get_recv_stamp());
4003
4004 /*
4005 * note which election epoch this is; we will drop the message if
4006 * there is a future election since our peers will resend routed
4007 * requests in that case.
4008 */
4009 req->rx_election_epoch = get_epoch();
4010
7c673cae 4011 dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
7c673cae 4012 _ms_dispatch(req);
7c673cae 4013
11fdf7f2
TL
4014 // break the session <-> con ref loop by removing the con->session
4015 // reference, which is no longer needed once the MonOpRequest is
4016 // set up.
4017 c->set_priv(NULL);
7c673cae
FG
4018 }
4019}
4020
4021void Monitor::send_reply(MonOpRequestRef op, Message *reply)
4022{
4023 op->mark_event(__func__);
4024
4025 MonSession *session = op->get_session();
11fdf7f2 4026 ceph_assert(session);
7c673cae
FG
4027 Message *req = op->get_req();
4028 ConnectionRef con = op->get_connection();
4029
4030 reply->set_cct(g_ceph_context);
4031 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
4032
4033 if (!con) {
4034 dout(2) << "send_reply no connection, dropping reply " << *reply
4035 << " to " << req << " " << *req << dendl;
4036 reply->put();
4037 op->mark_event("reply: no connection");
4038 return;
4039 }
4040
4041 if (!session->con && !session->proxy_con) {
4042 dout(2) << "send_reply no connection, dropping reply " << *reply
4043 << " to " << req << " " << *req << dendl;
4044 reply->put();
4045 op->mark_event("reply: no connection");
4046 return;
4047 }
4048
4049 if (session->proxy_con) {
4050 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
4051 << " via " << session->proxy_con->get_peer_addr()
4052 << " for request " << *req << dendl;
4053 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
4054 op->mark_event("reply: send routed request");
4055 } else {
4056 session->con->send_message(reply);
4057 op->mark_event("reply: send");
4058 }
4059}
4060
4061void Monitor::no_reply(MonOpRequestRef op)
4062{
4063 MonSession *session = op->get_session();
4064 Message *req = op->get_req();
4065
4066 if (session->proxy_con) {
4067 dout(10) << "no_reply to " << req->get_source_inst()
4068 << " via " << session->proxy_con->get_peer_addr()
4069 << " for request " << *req << dendl;
4070 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
4071 op->mark_event("no_reply: send routed request");
4072 } else {
4073 dout(10) << "no_reply to " << req->get_source_inst()
4074 << " " << *req << dendl;
4075 op->mark_event("no_reply");
4076 }
4077}
4078
4079void Monitor::handle_route(MonOpRequestRef op)
4080{
9f95a23c 4081 auto m = op->get_req<MRoute>();
7c673cae
FG
4082 MonSession *session = op->get_session();
4083 //check privileges
4084 if (!session->is_capable("mon", MON_CAP_X)) {
4085 dout(0) << "MRoute received from entity without appropriate perms! "
4086 << dendl;
4087 return;
4088 }
4089 if (m->msg)
11fdf7f2
TL
4090 dout(10) << "handle_route tid " << m->session_mon_tid << " " << *m->msg
4091 << dendl;
7c673cae 4092 else
11fdf7f2 4093 dout(10) << "handle_route tid " << m->session_mon_tid << " null" << dendl;
7c673cae
FG
4094
4095 // look it up
4096 if (m->session_mon_tid) {
4097 if (routed_requests.count(m->session_mon_tid)) {
4098 RoutedRequest *rr = routed_requests[m->session_mon_tid];
4099
4100 // reset payload, in case encoding is dependent on target features
4101 if (m->msg) {
4102 m->msg->clear_payload();
4103 rr->con->send_message(m->msg);
4104 m->msg = NULL;
4105 }
4106 if (m->send_osdmap_first) {
4107 dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
4108 osdmon()->send_incremental(m->send_osdmap_first, rr->session,
4109 true, MonOpRequestRef());
4110 }
11fdf7f2 4111 ceph_assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
7c673cae
FG
4112 routed_requests.erase(m->session_mon_tid);
4113 rr->session->routed_request_tids.erase(m->session_mon_tid);
4114 delete rr;
4115 } else {
4116 dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
4117 }
4118 } else {
11fdf7f2 4119 dout(10) << " not a routed request, ignoring" << dendl;
7c673cae
FG
4120 }
4121}
4122
4123void Monitor::resend_routed_requests()
4124{
4125 dout(10) << "resend_routed_requests" << dendl;
4126 int mon = get_leader();
4127 list<Context*> retry;
4128 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
4129 p != routed_requests.end();
4130 ++p) {
4131 RoutedRequest *rr = p->second;
4132
4133 if (mon == rank) {
4134 dout(10) << " requeue for self tid " << rr->tid << dendl;
4135 rr->op->mark_event("retry routed request");
4136 retry.push_back(new C_RetryMessage(this, rr->op));
4137 if (rr->session) {
11fdf7f2 4138 ceph_assert(rr->session->routed_request_tids.count(p->first));
7c673cae
FG
4139 rr->session->routed_request_tids.erase(p->first);
4140 }
4141 delete rr;
4142 } else {
11fdf7f2
TL
4143 auto q = rr->request_bl.cbegin();
4144 PaxosServiceMessage *req =
4145 (PaxosServiceMessage *)decode_message(cct, 0, q);
7c673cae 4146 rr->op->mark_event("resend forwarded message to leader");
11fdf7f2
TL
4147 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req
4148 << dendl;
4149 MForward *forward = new MForward(rr->tid,
4150 req,
4151 rr->con_features,
7c673cae
FG
4152 rr->session->caps);
4153 req->put(); // forward takes its own ref; drop ours.
11fdf7f2
TL
4154 forward->client_type = rr->con->get_peer_type();
4155 forward->client_addrs = rr->con->get_peer_addrs();
4156 forward->client_socket_addr = rr->con->get_peer_socket_addr();
7c673cae 4157 forward->set_priority(req->get_priority());
11fdf7f2 4158 send_mon_message(forward, mon);
7c673cae
FG
4159 }
4160 }
4161 if (mon == rank) {
4162 routed_requests.clear();
4163 finish_contexts(g_ceph_context, retry);
4164 }
4165}
4166
4167void Monitor::remove_session(MonSession *s)
4168{
11fdf7f2 4169 dout(10) << "remove_session " << s << " " << s->name << " " << s->addrs
224ce89b 4170 << " features 0x" << std::hex << s->con_features << std::dec << dendl;
11fdf7f2
TL
4171 ceph_assert(s->con);
4172 ceph_assert(!s->closed);
7c673cae
FG
4173 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
4174 p != s->routed_request_tids.end();
4175 ++p) {
11fdf7f2 4176 ceph_assert(routed_requests.count(*p));
7c673cae
FG
4177 RoutedRequest *rr = routed_requests[*p];
4178 dout(10) << " dropping routed request " << rr->tid << dendl;
4179 delete rr;
4180 routed_requests.erase(*p);
4181 }
4182 s->routed_request_tids.clear();
11fdf7f2 4183 s->con->set_priv(nullptr);
7c673cae
FG
4184 session_map.remove_session(s);
4185 logger->set(l_mon_num_sessions, session_map.get_size());
4186 logger->inc(l_mon_session_rm);
4187}
4188
4189void Monitor::remove_all_sessions()
4190{
11fdf7f2 4191 std::lock_guard l(session_map_lock);
7c673cae
FG
4192 while (!session_map.sessions.empty()) {
4193 MonSession *s = session_map.sessions.front();
4194 remove_session(s);
11fdf7f2 4195 logger->inc(l_mon_session_rm);
7c673cae
FG
4196 }
4197 if (logger)
4198 logger->set(l_mon_num_sessions, session_map.get_size());
4199}
4200
11fdf7f2 4201void Monitor::send_mon_message(Message *m, int rank)
7c673cae 4202{
11fdf7f2 4203 messenger->send_to_mon(m, monmap->get_addrs(rank));
7c673cae
FG
4204}
4205
4206void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
4207{
4208 /**
4209 * Wait list the new session until we're in the quorum, assuming it's
4210 * sufficiently new.
4211 * tick() will periodically send them back through so we can send
4212 * the client elsewhere if we don't think we're getting back in.
4213 *
4214 * But we whitelist a few sorts of messages:
4215 * 1) Monitors can talk to us at any time, of course.
4216 * 2) auth messages. It's unlikely to go through much faster, but
4217 * it's possible we've just lost our quorum status and we want to take...
4218 * 3) command messages. We want to accept these under all possible
4219 * circumstances.
4220 */
4221 Message *m = op->get_req();
4222 MonSession *s = op->get_session();
4223 ConnectionRef con = op->get_connection();
4224 utime_t too_old = ceph_clock_now();
4225 too_old -= g_ceph_context->_conf->mon_lease;
4226 if (m->get_recv_stamp() > too_old &&
4227 con->is_connected()) {
4228 dout(5) << "waitlisting message " << *m << dendl;
4229 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
4230 op->mark_wait_for_quorum();
4231 } else {
4232 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
4233 con->mark_down();
4234 // proxied sessions aren't registered and don't have a con; don't remove
4235 // those.
4236 if (!s->proxy_con) {
11fdf7f2 4237 std::lock_guard l(session_map_lock);
7c673cae
FG
4238 remove_session(s);
4239 }
4240 op->mark_zap();
4241 }
4242}
4243
4244void Monitor::_ms_dispatch(Message *m)
4245{
4246 if (is_shutdown()) {
4247 m->put();
4248 return;
4249 }
4250
4251 MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
4252 bool src_is_mon = op->is_src_mon();
4253 op->mark_event("mon:_ms_dispatch");
4254 MonSession *s = op->get_session();
4255 if (s && s->closed) {
4256 return;
4257 }
224ce89b
WB
4258
4259 if (src_is_mon && s) {
4260 ConnectionRef con = m->get_connection();
4261 if (con->get_messenger() && con->get_features() != s->con_features) {
4262 // only update features if this is a non-anonymous connection
4263 dout(10) << __func__ << " feature change for " << m->get_source_inst()
4264 << " (was " << s->con_features
4265 << ", now " << con->get_features() << ")" << dendl;
4266 // connection features changed - recreate session.
4267 if (s->con && s->con != con) {
4268 dout(10) << __func__ << " connection for " << m->get_source_inst()
4269 << " changed from session; mark down and replace" << dendl;
4270 s->con->mark_down();
4271 }
4272 if (s->item.is_on_list()) {
4273 // forwarded messages' sessions are not in the sessions map and
4274 // exist only while the op is being handled.
9f95a23c 4275 std::lock_guard l(session_map_lock);
224ce89b
WB
4276 remove_session(s);
4277 }
224ce89b
WB
4278 s = nullptr;
4279 }
4280 }
4281
7c673cae
FG
4282 if (!s) {
4283 // if the sender is not a monitor, make sure their first message for a
4284 // session is an MAuth. If it is not, assume it's a stray message,
4285 // and considering that we are creating a new session it is safe to
4286 // assume that the sender hasn't authenticated yet, so we have no way
4287 // of assessing whether we should handle it or not.
4288 if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
4289 m->get_type() != CEPH_MSG_MON_GET_MAP &&
4290 m->get_type() != CEPH_MSG_PING)) {
4291 dout(1) << __func__ << " dropping stray message " << *m
4292 << " from " << m->get_source_inst() << dendl;
4293 return;
4294 }
4295
4296 ConnectionRef con = m->get_connection();
4297 {
11fdf7f2
TL
4298 std::lock_guard l(session_map_lock);
4299 s = session_map.new_session(m->get_source(),
4300 m->get_source_addrs(),
4301 con.get());
7c673cae 4302 }
11fdf7f2
TL
4303 ceph_assert(s);
4304 con->set_priv(RefCountedPtr{s, false});
224ce89b
WB
4305 dout(10) << __func__ << " new session " << s << " " << *s
4306 << " features 0x" << std::hex
4307 << s->con_features << std::dec << dendl;
7c673cae
FG
4308 op->set_session(s);
4309
4310 logger->set(l_mon_num_sessions, session_map.get_size());
4311 logger->inc(l_mon_session_add);
4312
4313 if (src_is_mon) {
4314 // give it monitor caps; the peer type has been authenticated
4315 dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
4316 if (!s->caps.is_allow_all()) // but no need to repeatedly copy
11fdf7f2
TL
4317 s->caps = mon_caps;
4318 s->authenticated = true;
7c673cae 4319 }
7c673cae 4320 } else {
11fdf7f2 4321 dout(20) << __func__ << " existing session " << s << " for " << s->name
7c673cae
FG
4322 << dendl;
4323 }
4324
11fdf7f2 4325 ceph_assert(s);
7c673cae
FG
4326
4327 s->session_timeout = ceph_clock_now();
11fdf7f2 4328 s->session_timeout += g_conf()->mon_session_timeout;
7c673cae
FG
4329
4330 if (s->auth_handler) {
4331 s->entity_name = s->auth_handler->get_entity_name();
4332 }
9f95a23c
TL
4333 dout(20) << " entity " << s->entity_name
4334 << " caps " << s->caps.get_str() << dendl;
7c673cae
FG
4335
4336 if ((is_synchronizing() ||
11fdf7f2 4337 (!s->authenticated && !exited_quorum.is_zero())) &&
7c673cae
FG
4338 !src_is_mon &&
4339 m->get_type() != CEPH_MSG_PING) {
4340 waitlist_or_zap_client(op);
4341 } else {
4342 dispatch_op(op);
4343 }
4344 return;
4345}
4346
4347void Monitor::dispatch_op(MonOpRequestRef op)
4348{
4349 op->mark_event("mon:dispatch_op");
4350 MonSession *s = op->get_session();
11fdf7f2 4351 ceph_assert(s);
7c673cae
FG
4352 if (s->closed) {
4353 dout(10) << " session closed, dropping " << op->get_req() << dendl;
4354 return;
4355 }
4356
4357 /* we will consider the default type as being 'monitor' until proven wrong */
4358 op->set_type_monitor();
4359 /* deal with all messages that do not necessarily need caps */
7c673cae
FG
4360 switch (op->get_req()->get_type()) {
4361 // auth
4362 case MSG_MON_GLOBAL_ID:
4363 case CEPH_MSG_AUTH:
4364 op->set_type_service();
4365 /* no need to check caps here */
4366 paxos_service[PAXOS_AUTH]->dispatch(op);
11fdf7f2 4367 return;
7c673cae
FG
4368
4369 case CEPH_MSG_PING:
4370 handle_ping(op);
11fdf7f2 4371 return;
9f95a23c
TL
4372 case MSG_COMMAND:
4373 op->set_type_command();
4374 handle_tell_command(op);
4375 return;
11fdf7f2 4376 }
7c673cae 4377
11fdf7f2
TL
4378 if (!op->get_session()->authenticated) {
4379 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4380 << " is not authenticated, dropping " << *(op->get_req())
4381 << dendl;
4382 return;
4383 }
4384
4385 switch (op->get_req()->get_type()) {
7c673cae
FG
4386 case CEPH_MSG_MON_GET_MAP:
4387 handle_mon_get_map(op);
11fdf7f2
TL
4388 return;
4389
4390 case MSG_GET_CONFIG:
4391 configmon()->handle_get_config(op);
4392 return;
7c673cae
FG
4393
4394 case CEPH_MSG_MON_METADATA:
4395 return handle_mon_metadata(op);
4396
11fdf7f2
TL
4397 case CEPH_MSG_MON_SUBSCRIBE:
4398 /* FIXME: check what's being subscribed, filter accordingly */
4399 handle_subscribe(op);
4400 return;
7c673cae 4401 }
7c673cae
FG
4402
4403 /* well, maybe the op belongs to a service... */
4404 op->set_type_service();
4405 /* deal with all messages which caps should be checked somewhere else */
7c673cae
FG
4406 switch (op->get_req()->get_type()) {
4407
4408 // OSDs
4409 case CEPH_MSG_MON_GET_OSDMAP:
4410 case CEPH_MSG_POOLOP:
4411 case MSG_OSD_BEACON:
4412 case MSG_OSD_MARK_ME_DOWN:
9f95a23c 4413 case MSG_OSD_MARK_ME_DEAD:
7c673cae
FG
4414 case MSG_OSD_FULL:
4415 case MSG_OSD_FAILURE:
4416 case MSG_OSD_BOOT:
4417 case MSG_OSD_ALIVE:
4418 case MSG_OSD_PGTEMP:
4419 case MSG_OSD_PG_CREATED:
4420 case MSG_REMOVE_SNAPS:
9f95a23c 4421 case MSG_MON_GET_PURGED_SNAPS:
11fdf7f2 4422 case MSG_OSD_PG_READY_TO_MERGE:
7c673cae 4423 paxos_service[PAXOS_OSDMAP]->dispatch(op);
11fdf7f2 4424 return;
7c673cae
FG
4425
4426 // MDSs
4427 case MSG_MDS_BEACON:
4428 case MSG_MDS_OFFLOAD_TARGETS:
4429 paxos_service[PAXOS_MDSMAP]->dispatch(op);
11fdf7f2 4430 return;
7c673cae
FG
4431
4432 // Mgrs
4433 case MSG_MGR_BEACON:
4434 paxos_service[PAXOS_MGR]->dispatch(op);
11fdf7f2 4435 return;
7c673cae 4436
31f18b77 4437 // MgrStat
b32b8144 4438 case MSG_MON_MGR_REPORT:
11fdf7f2 4439 case CEPH_MSG_STATFS:
7c673cae 4440 case MSG_GETPOOLSTATS:
31f18b77 4441 paxos_service[PAXOS_MGRSTAT]->dispatch(op);
11fdf7f2 4442 return;
7c673cae 4443
11fdf7f2 4444 // log
7c673cae
FG
4445 case MSG_LOG:
4446 paxos_service[PAXOS_LOG]->dispatch(op);
11fdf7f2 4447 return;
7c673cae
FG
4448
4449 // handle_command() does its own caps checking
4450 case MSG_MON_COMMAND:
4451 op->set_type_command();
4452 handle_command(op);
11fdf7f2 4453 return;
7c673cae 4454 }
7c673cae
FG
4455
4456 /* nop, looks like it's not a service message; revert back to monitor */
4457 op->set_type_monitor();
4458
4459 /* messages we, the Monitor class, need to deal with
4460 * but may be sent by clients. */
4461
4462 if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
4463 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4464 << " not enough caps for " << *(op->get_req()) << " -- dropping"
4465 << dendl;
11fdf7f2 4466 return;
7c673cae
FG
4467 }
4468
7c673cae 4469 switch (op->get_req()->get_type()) {
7c673cae
FG
4470 // misc
4471 case CEPH_MSG_MON_GET_VERSION:
4472 handle_get_version(op);
11fdf7f2 4473 return;
7c673cae 4474 }
7c673cae
FG
4475
4476 if (!op->is_src_mon()) {
4477 dout(1) << __func__ << " unexpected monitor message from"
4478 << " non-monitor entity " << op->get_req()->get_source_inst()
4479 << " " << *(op->get_req()) << " -- dropping" << dendl;
11fdf7f2 4480 return;
7c673cae
FG
4481 }
4482
4483 /* messages that should only be sent by another monitor */
7c673cae
FG
4484 switch (op->get_req()->get_type()) {
4485
4486 case MSG_ROUTE:
4487 handle_route(op);
11fdf7f2 4488 return;
7c673cae
FG
4489
4490 case MSG_MON_PROBE:
4491 handle_probe(op);
11fdf7f2 4492 return;
7c673cae
FG
4493
4494 // Sync (i.e., the new slurp, but on steroids)
4495 case MSG_MON_SYNC:
4496 handle_sync(op);
11fdf7f2 4497 return;
7c673cae
FG
4498 case MSG_MON_SCRUB:
4499 handle_scrub(op);
11fdf7f2 4500 return;
7c673cae
FG
4501
4502 /* log acks are sent from a monitor we sent the MLog to, and are
4503 never sent by clients to us. */
4504 case MSG_LOGACK:
4505 log_client.handle_log_ack((MLogAck*)op->get_req());
11fdf7f2 4506 return;
7c673cae
FG
4507
4508 // monmap
4509 case MSG_MON_JOIN:
4510 op->set_type_service();
4511 paxos_service[PAXOS_MONMAP]->dispatch(op);
11fdf7f2 4512 return;
7c673cae
FG
4513
4514 // paxos
4515 case MSG_MON_PAXOS:
4516 {
4517 op->set_type_paxos();
9f95a23c 4518 auto pm = op->get_req<MMonPaxos>();
7c673cae
FG
4519 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4520 //can't send these!
11fdf7f2 4521 return;
7c673cae
FG
4522 }
4523
4524 if (state == STATE_SYNCHRONIZING) {
4525 // we are synchronizing. These messages would do us no
4526 // good, thus just drop them and ignore them.
4527 dout(10) << __func__ << " ignore paxos msg from "
4528 << pm->get_source_inst() << dendl;
11fdf7f2 4529 return;
7c673cae
FG
4530 }
4531
4532 // sanitize
4533 if (pm->epoch > get_epoch()) {
4534 bootstrap();
11fdf7f2 4535 return;
7c673cae
FG
4536 }
4537 if (pm->epoch != get_epoch()) {
11fdf7f2 4538 return;
7c673cae
FG
4539 }
4540
4541 paxos->dispatch(op);
4542 }
11fdf7f2 4543 return;
7c673cae
FG
4544
4545 // elector messages
4546 case MSG_MON_ELECTION:
4547 op->set_type_election();
4548 //check privileges here for simplicity
4549 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4550 dout(0) << "MMonElection received from entity without enough caps!"
4551 << op->get_session()->caps << dendl;
11fdf7f2 4552 return;;
7c673cae
FG
4553 }
4554 if (!is_probing() && !is_synchronizing()) {
4555 elector.dispatch(op);
4556 }
11fdf7f2 4557 return;
7c673cae
FG
4558
4559 case MSG_FORWARD:
4560 handle_forward(op);
11fdf7f2 4561 return;
7c673cae
FG
4562
4563 case MSG_TIMECHECK:
11fdf7f2
TL
4564 dout(5) << __func__ << " ignoring " << op << dendl;
4565 return;
4566 case MSG_TIMECHECK2:
7c673cae 4567 handle_timecheck(op);
11fdf7f2 4568 return;
7c673cae
FG
4569
4570 case MSG_MON_HEALTH:
11fdf7f2
TL
4571 dout(5) << __func__ << " dropping deprecated message: "
4572 << *op->get_req() << dendl;
7c673cae 4573 break;
224ce89b
WB
4574 case MSG_MON_HEALTH_CHECKS:
4575 op->set_type_service();
4576 paxos_service[PAXOS_HEALTH]->dispatch(op);
11fdf7f2 4577 return;
7c673cae 4578 }
11fdf7f2 4579 dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
7c673cae
FG
4580 return;
4581}
4582
4583void Monitor::handle_ping(MonOpRequestRef op)
4584{
9f95a23c 4585 auto m = op->get_req<MPing>();
7c673cae
FG
4586 dout(10) << __func__ << " " << *m << dendl;
4587 MPing *reply = new MPing;
7c673cae
FG
4588 bufferlist payload;
4589 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4590 f->open_object_section("pong");
4591
9f95a23c
TL
4592 healthmon()->get_health_status(false, f.get(), nullptr);
4593 get_mon_status(f.get());
7c673cae
FG
4594
4595 f->close_section();
4596 stringstream ss;
4597 f->flush(ss);
11fdf7f2 4598 encode(ss.str(), payload);
7c673cae
FG
4599 reply->set_payload(payload);
4600 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
11fdf7f2 4601 m->get_connection()->send_message(reply);
7c673cae
FG
4602}
4603
4604void Monitor::timecheck_start()
4605{
4606 dout(10) << __func__ << dendl;
4607 timecheck_cleanup();
11fdf7f2
TL
4608 if (get_quorum_mon_features().contains_all(
4609 ceph::features::mon::FEATURE_NAUTILUS)) {
4610 timecheck_start_round();
4611 }
7c673cae
FG
4612}
4613
4614void Monitor::timecheck_finish()
4615{
4616 dout(10) << __func__ << dendl;
4617 timecheck_cleanup();
4618}
4619
4620void Monitor::timecheck_start_round()
4621{
4622 dout(10) << __func__ << " curr " << timecheck_round << dendl;
11fdf7f2 4623 ceph_assert(is_leader());
7c673cae
FG
4624
4625 if (monmap->size() == 1) {
11fdf7f2 4626 ceph_abort_msg("We are alone; this shouldn't have been scheduled!");
7c673cae
FG
4627 return;
4628 }
4629
4630 if (timecheck_round % 2) {
4631 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4632 utime_t curr_time = ceph_clock_now();
11fdf7f2 4633 double max = g_conf()->mon_timecheck_interval*3;
7c673cae
FG
4634 if (curr_time - timecheck_round_start < max) {
4635 dout(10) << __func__ << " keep current round going" << dendl;
4636 goto out;
4637 } else {
4638 dout(10) << __func__
4639 << " finish current timecheck and start new" << dendl;
4640 timecheck_cancel_round();
4641 }
4642 }
4643
11fdf7f2 4644 ceph_assert(timecheck_round % 2 == 0);
7c673cae
FG
4645 timecheck_acks = 0;
4646 timecheck_round ++;
4647 timecheck_round_start = ceph_clock_now();
4648 dout(10) << __func__ << " new " << timecheck_round << dendl;
4649
4650 timecheck();
4651out:
4652 dout(10) << __func__ << " setting up next event" << dendl;
4653 timecheck_reset_event();
4654}
4655
4656void Monitor::timecheck_finish_round(bool success)
4657{
4658 dout(10) << __func__ << " curr " << timecheck_round << dendl;
11fdf7f2 4659 ceph_assert(timecheck_round % 2);
7c673cae
FG
4660 timecheck_round ++;
4661 timecheck_round_start = utime_t();
4662
4663 if (success) {
11fdf7f2
TL
4664 ceph_assert(timecheck_waiting.empty());
4665 ceph_assert(timecheck_acks == quorum.size());
7c673cae
FG
4666 timecheck_report();
4667 timecheck_check_skews();
4668 return;
4669 }
4670
4671 dout(10) << __func__ << " " << timecheck_waiting.size()
4672 << " peers still waiting:";
11fdf7f2
TL
4673 for (auto& p : timecheck_waiting) {
4674 *_dout << " mon." << p.first;
7c673cae
FG
4675 }
4676 *_dout << dendl;
4677 timecheck_waiting.clear();
4678
4679 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4680}
4681
4682void Monitor::timecheck_cancel_round()
4683{
4684 timecheck_finish_round(false);
4685}
4686
4687void Monitor::timecheck_cleanup()
4688{
4689 timecheck_round = 0;
4690 timecheck_acks = 0;
4691 timecheck_round_start = utime_t();
4692
4693 if (timecheck_event) {
4694 timer.cancel_event(timecheck_event);
4695 timecheck_event = NULL;
4696 }
4697 timecheck_waiting.clear();
4698 timecheck_skews.clear();
4699 timecheck_latencies.clear();
4700
4701 timecheck_rounds_since_clean = 0;
4702}
4703
4704void Monitor::timecheck_reset_event()
4705{
4706 if (timecheck_event) {
4707 timer.cancel_event(timecheck_event);
4708 timecheck_event = NULL;
4709 }
4710
4711 double delay =
4712 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4713
4714 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4715 delay = cct->_conf->mon_timecheck_interval;
4716 }
4717
4718 dout(10) << __func__ << " delay " << delay
4719 << " rounds_since_clean " << timecheck_rounds_since_clean
4720 << dendl;
4721
3efd9988
FG
4722 timecheck_event = timer.add_event_after(
4723 delay,
9f95a23c 4724 new C_MonContext{this, [this](int) {
3efd9988 4725 timecheck_start_round();
9f95a23c 4726 }});
7c673cae
FG
4727}
4728
4729void Monitor::timecheck_check_skews()
4730{
4731 dout(10) << __func__ << dendl;
11fdf7f2
TL
4732 ceph_assert(is_leader());
4733 ceph_assert((timecheck_round % 2) == 0);
7c673cae 4734 if (monmap->size() == 1) {
11fdf7f2 4735 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
7c673cae
FG
4736 return;
4737 }
11fdf7f2 4738 ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
7c673cae
FG
4739
4740 bool found_skew = false;
11fdf7f2 4741 for (auto& p : timecheck_skews) {
7c673cae 4742 double abs_skew;
11fdf7f2 4743 if (timecheck_has_skew(p.second, &abs_skew)) {
7c673cae 4744 dout(10) << __func__
11fdf7f2 4745 << " " << p.first << " skew " << abs_skew << dendl;
7c673cae
FG
4746 found_skew = true;
4747 }
4748 }
4749
4750 if (found_skew) {
4751 ++timecheck_rounds_since_clean;
4752 timecheck_reset_event();
4753 } else if (timecheck_rounds_since_clean > 0) {
4754 dout(1) << __func__
4755 << " no clock skews found after " << timecheck_rounds_since_clean
4756 << " rounds" << dendl;
4757 // make sure the skews are really gone and not just a transient success
4758 // this will run just once if not in the presence of skews again.
4759 timecheck_rounds_since_clean = 1;
4760 timecheck_reset_event();
4761 timecheck_rounds_since_clean = 0;
4762 }
4763
4764}
4765
4766void Monitor::timecheck_report()
4767{
4768 dout(10) << __func__ << dendl;
11fdf7f2
TL
4769 ceph_assert(is_leader());
4770 ceph_assert((timecheck_round % 2) == 0);
7c673cae 4771 if (monmap->size() == 1) {
11fdf7f2 4772 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
7c673cae
FG
4773 return;
4774 }
4775
11fdf7f2 4776 ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
7c673cae
FG
4777 bool do_output = true; // only output report once
4778 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4779 if (monmap->get_name(*q) == name)
4780 continue;
4781
11fdf7f2 4782 MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_REPORT);
7c673cae
FG
4783 m->epoch = get_epoch();
4784 m->round = timecheck_round;
4785
11fdf7f2
TL
4786 for (auto& it : timecheck_skews) {
4787 double skew = it.second;
4788 double latency = timecheck_latencies[it.first];
7c673cae 4789
11fdf7f2
TL
4790 m->skews[it.first] = skew;
4791 m->latencies[it.first] = latency;
7c673cae
FG
4792
4793 if (do_output) {
11fdf7f2 4794 dout(25) << __func__ << " mon." << it.first
7c673cae
FG
4795 << " latency " << latency
4796 << " skew " << skew << dendl;
4797 }
4798 }
4799 do_output = false;
11fdf7f2
TL
4800 dout(10) << __func__ << " send report to mon." << *q << dendl;
4801 send_mon_message(m, *q);
7c673cae
FG
4802 }
4803}
4804
4805void Monitor::timecheck()
4806{
4807 dout(10) << __func__ << dendl;
11fdf7f2 4808 ceph_assert(is_leader());
7c673cae 4809 if (monmap->size() == 1) {
11fdf7f2 4810 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
7c673cae
FG
4811 return;
4812 }
11fdf7f2 4813 ceph_assert(timecheck_round % 2 != 0);
7c673cae
FG
4814
4815 timecheck_acks = 1; // we ack ourselves
4816
4817 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4818 << " round " << timecheck_round << dendl;
4819
4820 // we are at the eye of the storm; the point of reference
11fdf7f2
TL
4821 timecheck_skews[rank] = 0.0;
4822 timecheck_latencies[rank] = 0.0;
7c673cae
FG
4823
4824 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4825 if (monmap->get_name(*it) == name)
4826 continue;
4827
7c673cae 4828 utime_t curr_time = ceph_clock_now();
11fdf7f2
TL
4829 timecheck_waiting[*it] = curr_time;
4830 MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_PING);
7c673cae
FG
4831 m->epoch = get_epoch();
4832 m->round = timecheck_round;
11fdf7f2
TL
4833 dout(10) << __func__ << " send " << *m << " to mon." << *it << dendl;
4834 send_mon_message(m, *it);
7c673cae
FG
4835 }
4836}
4837
4838health_status_t Monitor::timecheck_status(ostringstream &ss,
4839 const double skew_bound,
4840 const double latency)
4841{
4842 health_status_t status = HEALTH_OK;
11fdf7f2 4843 ceph_assert(latency >= 0);
7c673cae
FG
4844
4845 double abs_skew;
4846 if (timecheck_has_skew(skew_bound, &abs_skew)) {
4847 status = HEALTH_WARN;
4848 ss << "clock skew " << abs_skew << "s"
11fdf7f2 4849 << " > max " << g_conf()->mon_clock_drift_allowed << "s";
7c673cae
FG
4850 }
4851
4852 return status;
4853}
4854
4855void Monitor::handle_timecheck_leader(MonOpRequestRef op)
4856{
9f95a23c 4857 auto m = op->get_req<MTimeCheck2>();
7c673cae
FG
4858 dout(10) << __func__ << " " << *m << dendl;
4859 /* handles PONG's */
11fdf7f2 4860 ceph_assert(m->op == MTimeCheck2::OP_PONG);
7c673cae 4861
11fdf7f2 4862 int other = m->get_source().num();
7c673cae
FG
4863 if (m->epoch < get_epoch()) {
4864 dout(1) << __func__ << " got old timecheck epoch " << m->epoch
4865 << " from " << other
4866 << " curr " << get_epoch()
4867 << " -- severely lagged? discard" << dendl;
4868 return;
4869 }
11fdf7f2 4870 ceph_assert(m->epoch == get_epoch());
7c673cae
FG
4871
4872 if (m->round < timecheck_round) {
4873 dout(1) << __func__ << " got old round " << m->round
4874 << " from " << other
4875 << " curr " << timecheck_round << " -- discard" << dendl;
4876 return;
4877 }
4878
4879 utime_t curr_time = ceph_clock_now();
4880
11fdf7f2 4881 ceph_assert(timecheck_waiting.count(other) > 0);
7c673cae
FG
4882 utime_t timecheck_sent = timecheck_waiting[other];
4883 timecheck_waiting.erase(other);
4884 if (curr_time < timecheck_sent) {
4885 // our clock was readjusted -- drop everything until it all makes sense.
4886 dout(1) << __func__ << " our clock was readjusted --"
4887 << " bump round and drop current check"
4888 << dendl;
4889 timecheck_cancel_round();
4890 return;
4891 }
4892
4893 /* update peer latencies */
4894 double latency = (double)(curr_time - timecheck_sent);
4895
4896 if (timecheck_latencies.count(other) == 0)
4897 timecheck_latencies[other] = latency;
4898 else {
4899 double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
4900 timecheck_latencies[other] = avg_latency;
4901 }
4902
4903 /*
4904 * update skews
4905 *
4906 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4907 * and 'a' happens to be lower than 'b'; so we use double instead.
4908 *
4909 * latency is always expected to be >= 0.
4910 *
4911 * delta, the difference between theirs timestamp and ours, may either be
4912 * lower or higher than 0; will hardly ever be 0.
4913 *
4914 * The absolute skew is the absolute delta minus the latency, which is
4915 * taken as a whole instead of an rtt given that there is some queueing
4916 * and dispatch times involved and it's hard to assess how long exactly
4917 * it took for the message to travel to the other side and be handled. So
4918 * we call it a bounded skew, the worst case scenario.
4919 *
4920 * Now, to math!
4921 *
4922 * Given that the latency is always positive, we can establish that the
4923 * bounded skew will be:
4924 *
4925 * 1. positive if the absolute delta is higher than the latency and
4926 * delta is positive
4927 * 2. negative if the absolute delta is higher than the latency and
4928 * delta is negative.
4929 * 3. zero if the absolute delta is lower than the latency.
4930 *
4931 * On 3. we make a judgement call and treat the skew as non-existent.
4932 * This is because that, if the absolute delta is lower than the
4933 * latency, then the apparently existing skew is nothing more than a
4934 * side-effect of the high latency at work.
4935 *
4936 * This may not be entirely true though, as a severely skewed clock
4937 * may be masked by an even higher latency, but with high latencies
4938 * we probably have worse issues to deal with than just skewed clocks.
4939 */
11fdf7f2 4940 ceph_assert(latency >= 0);
7c673cae
FG
4941
4942 double delta = ((double) m->timestamp) - ((double) curr_time);
4943 double abs_delta = (delta > 0 ? delta : -delta);
4944 double skew_bound = abs_delta - latency;
4945 if (skew_bound < 0)
4946 skew_bound = 0;
4947 else if (delta < 0)
4948 skew_bound = -skew_bound;
4949
4950 ostringstream ss;
4951 health_status_t status = timecheck_status(ss, skew_bound, latency);
b32b8144
FG
4952 if (status != HEALTH_OK) {
4953 clog->health(status) << other << " " << ss.str();
4954 }
7c673cae
FG
4955
4956 dout(10) << __func__ << " from " << other << " ts " << m->timestamp
4957 << " delta " << delta << " skew_bound " << skew_bound
4958 << " latency " << latency << dendl;
4959
4960 timecheck_skews[other] = skew_bound;
4961
4962 timecheck_acks++;
4963 if (timecheck_acks == quorum.size()) {
4964 dout(10) << __func__ << " got pongs from everybody ("
4965 << timecheck_acks << " total)" << dendl;
11fdf7f2
TL
4966 ceph_assert(timecheck_skews.size() == timecheck_acks);
4967 ceph_assert(timecheck_waiting.empty());
7c673cae
FG
4968 // everyone has acked, so bump the round to finish it.
4969 timecheck_finish_round();
4970 }
4971}
4972
4973void Monitor::handle_timecheck_peon(MonOpRequestRef op)
4974{
9f95a23c 4975 auto m = op->get_req<MTimeCheck2>();
7c673cae
FG
4976 dout(10) << __func__ << " " << *m << dendl;
4977
11fdf7f2
TL
4978 ceph_assert(is_peon());
4979 ceph_assert(m->op == MTimeCheck2::OP_PING || m->op == MTimeCheck2::OP_REPORT);
7c673cae
FG
4980
4981 if (m->epoch != get_epoch()) {
4982 dout(1) << __func__ << " got wrong epoch "
4983 << "(ours " << get_epoch()
4984 << " theirs: " << m->epoch << ") -- discarding" << dendl;
4985 return;
4986 }
4987
4988 if (m->round < timecheck_round) {
4989 dout(1) << __func__ << " got old round " << m->round
4990 << " current " << timecheck_round
4991 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
4992 return;
4993 }
4994
4995 timecheck_round = m->round;
4996
11fdf7f2
TL
4997 if (m->op == MTimeCheck2::OP_REPORT) {
4998 ceph_assert((timecheck_round % 2) == 0);
7c673cae
FG
4999 timecheck_latencies.swap(m->latencies);
5000 timecheck_skews.swap(m->skews);
5001 return;
5002 }
5003
11fdf7f2
TL
5004 ceph_assert((timecheck_round % 2) != 0);
5005 MTimeCheck2 *reply = new MTimeCheck2(MTimeCheck2::OP_PONG);
7c673cae
FG
5006 utime_t curr_time = ceph_clock_now();
5007 reply->timestamp = curr_time;
5008 reply->epoch = m->epoch;
5009 reply->round = m->round;
5010 dout(10) << __func__ << " send " << *m
5011 << " to " << m->get_source_inst() << dendl;
5012 m->get_connection()->send_message(reply);
5013}
5014
5015void Monitor::handle_timecheck(MonOpRequestRef op)
5016{
9f95a23c 5017 auto m = op->get_req<MTimeCheck2>();
7c673cae
FG
5018 dout(10) << __func__ << " " << *m << dendl;
5019
5020 if (is_leader()) {
11fdf7f2 5021 if (m->op != MTimeCheck2::OP_PONG) {
7c673cae
FG
5022 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
5023 } else {
5024 handle_timecheck_leader(op);
5025 }
5026 } else if (is_peon()) {
11fdf7f2 5027 if (m->op != MTimeCheck2::OP_PING && m->op != MTimeCheck2::OP_REPORT) {
7c673cae
FG
5028 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
5029 } else {
5030 handle_timecheck_peon(op);
5031 }
5032 } else {
5033 dout(1) << __func__ << " drop unexpected msg" << dendl;
5034 }
5035}
5036
5037void Monitor::handle_subscribe(MonOpRequestRef op)
5038{
9f95a23c 5039 auto m = op->get_req<MMonSubscribe>();
7c673cae
FG
5040 dout(10) << "handle_subscribe " << *m << dendl;
5041
5042 bool reply = false;
5043
5044 MonSession *s = op->get_session();
11fdf7f2
TL
5045 ceph_assert(s);
5046
5047 if (m->hostname.size()) {
5048 s->remote_host = m->hostname;
5049 }
7c673cae
FG
5050
5051 for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
5052 p != m->what.end();
5053 ++p) {
11fdf7f2
TL
5054 if (p->first == "monmap" || p->first == "config") {
5055 // these require no caps
5056 } else if (!s->is_capable("mon", MON_CAP_R)) {
5057 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
5058 << " not enough caps for " << *(op->get_req()) << " -- dropping"
5059 << dendl;
5060 continue;
5061 }
5062
7c673cae
FG
5063 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
5064 if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
5065 reply = true;
5066
5067 // remove conflicting subscribes
5068 if (logmon()->sub_name_to_id(p->first) >= 0) {
5069 for (map<string, Subscription*>::iterator it = s->sub_map.begin();
5070 it != s->sub_map.end(); ) {
5071 if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
11fdf7f2 5072 std::lock_guard l(session_map_lock);
7c673cae
FG
5073 session_map.remove_sub((it++)->second);
5074 } else {
5075 ++it;
5076 }
5077 }
5078 }
5079
5080 {
11fdf7f2 5081 std::lock_guard l(session_map_lock);
7c673cae
FG
5082 session_map.add_update_sub(s, p->first, p->second.start,
5083 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
5084 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
5085 }
5086
5087 if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
5088 dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
5089 if ((int)s->is_capable("mds", MON_CAP_R)) {
5090 Subscription *sub = s->sub_map[p->first];
11fdf7f2 5091 ceph_assert(sub != nullptr);
7c673cae
FG
5092 mdsmon()->check_sub(sub);
5093 }
5094 } else if (p->first == "osdmap") {
5095 if ((int)s->is_capable("osd", MON_CAP_R)) {
5096 if (s->osd_epoch > p->second.start) {
5097 // client needs earlier osdmaps on purpose, so reset the sent epoch
5098 s->osd_epoch = 0;
5099 }
5100 osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
5101 }
5102 } else if (p->first == "osd_pg_creates") {
5103 if ((int)s->is_capable("osd", MON_CAP_W)) {
11fdf7f2 5104 osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
7c673cae
FG
5105 }
5106 } else if (p->first == "monmap") {
5107 monmon()->check_sub(s->sub_map[p->first]);
5108 } else if (logmon()->sub_name_to_id(p->first) >= 0) {
5109 logmon()->check_sub(s->sub_map[p->first]);
5110 } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
5111 mgrmon()->check_sub(s->sub_map[p->first]);
224ce89b
WB
5112 } else if (p->first == "servicemap") {
5113 mgrstatmon()->check_sub(s->sub_map[p->first]);
11fdf7f2
TL
5114 } else if (p->first == "config") {
5115 configmon()->check_sub(s);
7c673cae
FG
5116 }
5117 }
5118
5119 if (reply) {
5120 // we only need to reply if the client is old enough to think it
5121 // has to send renewals.
5122 ConnectionRef con = m->get_connection();
5123 if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
5124 m->get_connection()->send_message(new MMonSubscribeAck(
11fdf7f2 5125 monmap->get_fsid(), (int)g_conf()->mon_subscribe_interval));
7c673cae
FG
5126 }
5127
5128}
5129
5130void Monitor::handle_get_version(MonOpRequestRef op)
5131{
9f95a23c 5132 auto m = op->get_req<MMonGetVersion>();
7c673cae
FG
5133 dout(10) << "handle_get_version " << *m << dendl;
5134 PaxosService *svc = NULL;
5135
5136 MonSession *s = op->get_session();
11fdf7f2 5137 ceph_assert(s);
7c673cae
FG
5138
5139 if (!is_leader() && !is_peon()) {
5140 dout(10) << " waiting for quorum" << dendl;
5141 waitfor_quorum.push_back(new C_RetryMessage(this, op));
5142 goto out;
5143 }
5144
5145 if (m->what == "mdsmap") {
5146 svc = mdsmon();
5147 } else if (m->what == "fsmap") {
5148 svc = mdsmon();
5149 } else if (m->what == "osdmap") {
5150 svc = osdmon();
5151 } else if (m->what == "monmap") {
5152 svc = monmon();
5153 } else {
5154 derr << "invalid map type " << m->what << dendl;
5155 }
5156
5157 if (svc) {
5158 if (!svc->is_readable()) {
5159 svc->wait_for_readable(op, new C_RetryMessage(this, op));
5160 goto out;
5161 }
5162
5163 MMonGetVersionReply *reply = new MMonGetVersionReply();
5164 reply->handle = m->handle;
5165 reply->version = svc->get_last_committed();
5166 reply->oldest_version = svc->get_first_committed();
5167 reply->set_tid(m->get_tid());
5168
5169 m->get_connection()->send_message(reply);
5170 }
5171 out:
5172 return;
5173}
5174
5175bool Monitor::ms_handle_reset(Connection *con)
5176{
5177 dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
5178
5179 // ignore lossless monitor sessions
5180 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
5181 return false;
5182
11fdf7f2
TL
5183 auto priv = con->get_priv();
5184 auto s = static_cast<MonSession*>(priv.get());
7c673cae
FG
5185 if (!s)
5186 return false;
5187
5188 // break any con <-> session ref cycle
11fdf7f2 5189 s->con->set_priv(nullptr);
7c673cae
FG
5190
5191 if (is_shutdown())
5192 return false;
5193
11fdf7f2 5194 std::lock_guard l(lock);
7c673cae 5195
11fdf7f2
TL
5196 dout(10) << "reset/close on session " << s->name << " " << s->addrs << dendl;
5197 if (!s->closed && s->item.is_on_list()) {
5198 std::lock_guard l(session_map_lock);
7c673cae
FG
5199 remove_session(s);
5200 }
7c673cae
FG
5201 return true;
5202}
5203
5204bool Monitor::ms_handle_refused(Connection *con)
5205{
5206 // just log for now...
5207 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
5208 return false;
5209}
5210
5211// -----
5212
5213void Monitor::send_latest_monmap(Connection *con)
5214{
5215 bufferlist bl;
5216 monmap->encode(bl, con->get_features());
5217 con->send_message(new MMonMap(bl));
5218}
5219
5220void Monitor::handle_mon_get_map(MonOpRequestRef op)
5221{
9f95a23c 5222 auto m = op->get_req<MMonGetMap>();
7c673cae
FG
5223 dout(10) << "handle_mon_get_map" << dendl;
5224 send_latest_monmap(m->get_connection().get());
5225}
5226
5227void Monitor::handle_mon_metadata(MonOpRequestRef op)
5228{
9f95a23c 5229 auto m = op->get_req<MMonMetadata>();
7c673cae
FG
5230 if (is_leader()) {
5231 dout(10) << __func__ << dendl;
5232 update_mon_metadata(m->get_source().num(), std::move(m->data));
5233 }
5234}
5235
5236void Monitor::update_mon_metadata(int from, Metadata&& m)
5237{
224ce89b
WB
5238 // NOTE: this is now for legacy (kraken or jewel) mons only.
5239 pending_metadata[from] = std::move(m);
7c673cae
FG
5240
5241 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
224ce89b 5242 bufferlist bl;
11fdf7f2 5243 encode(pending_metadata, bl);
7c673cae
FG
5244 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
5245 paxos->trigger_propose();
5246}
5247
224ce89b 5248int Monitor::load_metadata()
7c673cae
FG
5249{
5250 bufferlist bl;
5251 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
5252 if (r)
5253 return r;
11fdf7f2
TL
5254 auto it = bl.cbegin();
5255 decode(mon_metadata, it);
224ce89b
WB
5256
5257 pending_metadata = mon_metadata;
7c673cae
FG
5258 return 0;
5259}
5260
5261int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
5262{
11fdf7f2 5263 ceph_assert(f);
224ce89b 5264 if (!mon_metadata.count(mon)) {
7c673cae
FG
5265 err << "mon." << mon << " not found";
5266 return -EINVAL;
5267 }
224ce89b 5268 const Metadata& m = mon_metadata[mon];
7c673cae
FG
5269 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
5270 f->dump_string(p->first.c_str(), p->second);
5271 }
5272 return 0;
5273}
5274
c07f9fc5 5275void Monitor::count_metadata(const string& field, map<string,int> *out)
31f18b77 5276{
224ce89b 5277 for (auto& p : mon_metadata) {
31f18b77
FG
5278 auto q = p.second.find(field);
5279 if (q == p.second.end()) {
c07f9fc5 5280 (*out)["unknown"]++;
31f18b77 5281 } else {
c07f9fc5 5282 (*out)[q->second]++;
31f18b77
FG
5283 }
5284 }
c07f9fc5
FG
5285}
5286
5287void Monitor::count_metadata(const string& field, Formatter *f)
5288{
5289 map<string,int> by_val;
5290 count_metadata(field, &by_val);
31f18b77
FG
5291 f->open_object_section(field.c_str());
5292 for (auto& p : by_val) {
5293 f->dump_int(p.first.c_str(), p.second);
5294 }
5295 f->close_section();
5296}
5297
7c673cae
FG
5298int Monitor::print_nodes(Formatter *f, ostream& err)
5299{
11fdf7f2 5300 map<string, list<string> > mons; // hostname => mon
224ce89b
WB
5301 for (map<int, Metadata>::iterator it = mon_metadata.begin();
5302 it != mon_metadata.end(); ++it) {
7c673cae
FG
5303 const Metadata& m = it->second;
5304 Metadata::const_iterator hostname = m.find("hostname");
5305 if (hostname == m.end()) {
5306 // not likely though
5307 continue;
5308 }
11fdf7f2 5309 mons[hostname->second].push_back(monmap->get_name(it->first));
7c673cae
FG
5310 }
5311
5312 dump_services(f, mons, "mon");
5313 return 0;
5314}
5315
5316// ----------------------------------------------
5317// scrub
5318
5319int Monitor::scrub_start()
5320{
5321 dout(10) << __func__ << dendl;
11fdf7f2 5322 ceph_assert(is_leader());
7c673cae
FG
5323
5324 if (!scrub_result.empty()) {
5325 clog->info() << "scrub already in progress";
5326 return -EBUSY;
5327 }
5328
5329 scrub_event_cancel();
5330 scrub_result.clear();
5331 scrub_state.reset(new ScrubState);
5332
5333 scrub();
5334 return 0;
5335}
5336
5337int Monitor::scrub()
5338{
11fdf7f2
TL
5339 ceph_assert(is_leader());
5340 ceph_assert(scrub_state);
7c673cae
FG
5341
5342 scrub_cancel_timeout();
5343 wait_for_paxos_write();
5344 scrub_version = paxos->get_version();
5345
5346
5347 // scrub all keys if we're the only monitor in the quorum
5348 int32_t num_keys =
5349 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5350
5351 for (set<int>::iterator p = quorum.begin();
5352 p != quorum.end();
5353 ++p) {
5354 if (*p == rank)
5355 continue;
5356 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5357 num_keys);
5358 r->key = scrub_state->last_key;
11fdf7f2 5359 send_mon_message(r, *p);
7c673cae
FG
5360 }
5361
5362 // scrub my keys
5363 bool r = _scrub(&scrub_result[rank],
5364 &scrub_state->last_key,
5365 &num_keys);
5366
5367 scrub_state->finished = !r;
5368
5369 // only after we got our scrub results do we really care whether the
5370 // other monitors are late on their results. Also, this way we avoid
5371 // triggering the timeout if we end up getting stuck in _scrub() for
5372 // longer than the duration of the timeout.
5373 scrub_reset_timeout();
5374
5375 if (quorum.size() == 1) {
11fdf7f2 5376 ceph_assert(scrub_state->finished == true);
7c673cae
FG
5377 scrub_finish();
5378 }
5379 return 0;
5380}
5381
5382void Monitor::handle_scrub(MonOpRequestRef op)
5383{
9f95a23c 5384 auto m = op->get_req<MMonScrub>();
7c673cae
FG
5385 dout(10) << __func__ << " " << *m << dendl;
5386 switch (m->op) {
5387 case MMonScrub::OP_SCRUB:
5388 {
5389 if (!is_peon())
5390 break;
5391
5392 wait_for_paxos_write();
5393
5394 if (m->version != paxos->get_version())
5395 break;
5396
5397 MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
5398 m->version,
5399 m->num_keys);
5400
5401 reply->key = m->key;
5402 _scrub(&reply->result, &reply->key, &reply->num_keys);
5403 m->get_connection()->send_message(reply);
5404 }
5405 break;
5406
5407 case MMonScrub::OP_RESULT:
5408 {
5409 if (!is_leader())
5410 break;
5411 if (m->version != scrub_version)
5412 break;
5413 // reset the timeout each time we get a result
5414 scrub_reset_timeout();
5415
5416 int from = m->get_source().num();
11fdf7f2 5417 ceph_assert(scrub_result.count(from) == 0);
7c673cae
FG
5418 scrub_result[from] = m->result;
5419
5420 if (scrub_result.size() == quorum.size()) {
5421 scrub_check_results();
5422 scrub_result.clear();
5423 if (scrub_state->finished)
5424 scrub_finish();
5425 else
5426 scrub();
5427 }
5428 }
5429 break;
5430 }
5431}
5432
5433bool Monitor::_scrub(ScrubResult *r,
5434 pair<string,string> *start,
5435 int *num_keys)
5436{
11fdf7f2
TL
5437 ceph_assert(r != NULL);
5438 ceph_assert(start != NULL);
5439 ceph_assert(num_keys != NULL);
7c673cae
FG
5440
5441 set<string> prefixes = get_sync_targets_names();
5442 prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5443
5444 dout(10) << __func__ << " start (" << *start << ")"
5445 << " num_keys " << *num_keys << dendl;
5446
5447 MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
5448
5449 int scrubbed_keys = 0;
5450 pair<string,string> last_key;
5451
5452 while (it->has_next_chunk()) {
5453
5454 if (*num_keys > 0 && scrubbed_keys == *num_keys)
5455 break;
5456
5457 pair<string,string> k = it->get_next_key();
5458 if (prefixes.count(k.first) == 0)
5459 continue;
5460
5461 if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
5462 (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
5463 dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
5464 << dendl;
5465 continue;
5466 }
5467
5468 bufferlist bl;
224ce89b 5469 int err = store->get(k.first, k.second, bl);
11fdf7f2 5470 ceph_assert(err == 0);
224ce89b 5471
7c673cae
FG
5472 uint32_t key_crc = bl.crc32c(0);
5473 dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
5474 << " crc " << key_crc << dendl;
5475 r->prefix_keys[k.first]++;
224ce89b 5476 if (r->prefix_crc.count(k.first) == 0) {
7c673cae 5477 r->prefix_crc[k.first] = 0;
224ce89b 5478 }
7c673cae
FG
5479 r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
5480
5481 if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
5482 (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
5483 dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
5484 r->prefix_crc[k.first] += 1;
5485 }
5486
5487 ++scrubbed_keys;
5488 last_key = k;
5489 }
5490
5491 dout(20) << __func__ << " last_key (" << last_key << ")"
5492 << " scrubbed_keys " << scrubbed_keys
5493 << " has_next " << it->has_next_chunk() << dendl;
5494
5495 *start = last_key;
5496 *num_keys = scrubbed_keys;
5497
5498 return it->has_next_chunk();
5499}
5500
5501void Monitor::scrub_check_results()
5502{
5503 dout(10) << __func__ << dendl;
5504
5505 // compare
5506 int errors = 0;
5507 ScrubResult& mine = scrub_result[rank];
5508 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5509 p != scrub_result.end();
5510 ++p) {
5511 if (p->first == rank)
5512 continue;
5513 if (p->second != mine) {
5514 ++errors;
5515 clog->error() << "scrub mismatch";
5516 clog->error() << " mon." << rank << " " << mine;
5517 clog->error() << " mon." << p->first << " " << p->second;
5518 }
5519 }
5520 if (!errors)
d2e6a577 5521 clog->debug() << "scrub ok on " << quorum << ": " << mine;
7c673cae
FG
5522}
5523
5524inline void Monitor::scrub_timeout()
5525{
5526 dout(1) << __func__ << " restarting scrub" << dendl;
5527 scrub_reset();
5528 scrub_start();
5529}
5530
5531void Monitor::scrub_finish()
5532{
5533 dout(10) << __func__ << dendl;
5534 scrub_reset();
5535 scrub_event_start();
5536}
5537
5538void Monitor::scrub_reset()
5539{
5540 dout(10) << __func__ << dendl;
5541 scrub_cancel_timeout();
5542 scrub_version = 0;
5543 scrub_result.clear();
5544 scrub_state.reset();
5545}
5546
5547inline void Monitor::scrub_update_interval(int secs)
5548{
5549 // we don't care about changes if we are not the leader.
5550 // changes will be visible if we become the leader.
5551 if (!is_leader())
5552 return;
5553
5554 dout(1) << __func__ << " new interval = " << secs << dendl;
5555
5556 // if scrub already in progress, all changes will already be visible during
5557 // the next round. Nothing to do.
5558 if (scrub_state != NULL)
5559 return;
5560
5561 scrub_event_cancel();
5562 scrub_event_start();
5563}
5564
5565void Monitor::scrub_event_start()
5566{
5567 dout(10) << __func__ << dendl;
5568
5569 if (scrub_event)
5570 scrub_event_cancel();
5571
5572 if (cct->_conf->mon_scrub_interval <= 0) {
5573 dout(1) << __func__ << " scrub event is disabled"
5574 << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval
5575 << ")" << dendl;
5576 return;
5577 }
5578
3efd9988
FG
5579 scrub_event = timer.add_event_after(
5580 cct->_conf->mon_scrub_interval,
9f95a23c 5581 new C_MonContext{this, [this](int) {
7c673cae 5582 scrub_start();
9f95a23c 5583 }});
7c673cae
FG
5584}
5585
5586void Monitor::scrub_event_cancel()
5587{
5588 dout(10) << __func__ << dendl;
5589 if (scrub_event) {
5590 timer.cancel_event(scrub_event);
5591 scrub_event = NULL;
5592 }
5593}
5594
5595inline void Monitor::scrub_cancel_timeout()
5596{
5597 if (scrub_timeout_event) {
5598 timer.cancel_event(scrub_timeout_event);
5599 scrub_timeout_event = NULL;
5600 }
5601}
5602
5603void Monitor::scrub_reset_timeout()
5604{
5605 dout(15) << __func__ << " reset timeout event" << dendl;
5606 scrub_cancel_timeout();
3efd9988 5607 scrub_timeout_event = timer.add_event_after(
11fdf7f2 5608 g_conf()->mon_scrub_timeout,
9f95a23c 5609 new C_MonContext{this, [this](int) {
7c673cae 5610 scrub_timeout();
9f95a23c 5611 }});
7c673cae
FG
5612}
5613
5614/************ TICK ***************/
5615void Monitor::new_tick()
5616{
9f95a23c 5617 timer.add_event_after(g_conf()->mon_tick_interval, new C_MonContext{this, [this](int) {
7c673cae 5618 tick();
9f95a23c 5619 }});
7c673cae
FG
5620}
5621
5622void Monitor::tick()
5623{
5624 // ok go.
5625 dout(11) << "tick" << dendl;
181888fb 5626 const utime_t now = ceph_clock_now();
7c673cae 5627
181888fb
FG
5628 // Check if we need to emit any delayed health check updated messages
5629 if (is_leader()) {
11fdf7f2 5630 const auto min_period = g_conf().get_val<int64_t>(
181888fb
FG
5631 "mon_health_log_update_period");
5632 for (auto& svc : paxos_service) {
5633 auto health = svc->get_health_checks();
5634
5635 for (const auto &i : health.checks) {
5636 const std::string &code = i.first;
5637 const std::string &summary = i.second.summary;
5638 const health_status_t severity = i.second.severity;
5639
5640 auto status_iter = health_check_log_times.find(code);
5641 if (status_iter == health_check_log_times.end()) {
5642 continue;
5643 }
5644
5645 auto &log_status = status_iter->second;
5646 bool const changed = log_status.last_message != summary
5647 || log_status.severity != severity;
5648
5649 if (changed && now - log_status.updated_at > min_period) {
5650 log_status.last_message = summary;
5651 log_status.updated_at = now;
5652 log_status.severity = severity;
5653
5654 ostringstream ss;
5655 ss << "Health check update: " << summary << " (" << code << ")";
5656 clog->health(severity) << ss.str();
5657 }
5658 }
5659 }
5660 }
5661
5662
11fdf7f2
TL
5663 for (auto& svc : paxos_service) {
5664 svc->tick();
5665 svc->maybe_trim();
7c673cae
FG
5666 }
5667
5668 // trim sessions
7c673cae 5669 {
11fdf7f2 5670 std::lock_guard l(session_map_lock);
7c673cae
FG
5671 auto p = session_map.sessions.begin();
5672
5673 bool out_for_too_long = (!exited_quorum.is_zero() &&
11fdf7f2 5674 now > (exited_quorum + 2*g_conf()->mon_lease));
7c673cae
FG
5675
5676 while (!p.end()) {
5677 MonSession *s = *p;
5678 ++p;
5679
5680 // don't trim monitors
11fdf7f2 5681 if (s->name.is_mon())
7c673cae
FG
5682 continue;
5683
5684 if (s->session_timeout < now && s->con) {
5685 // check keepalive, too
5686 s->session_timeout = s->con->get_last_keepalive();
11fdf7f2 5687 s->session_timeout += g_conf()->mon_session_timeout;
7c673cae
FG
5688 }
5689 if (s->session_timeout < now) {
11fdf7f2
TL
5690 dout(10) << " trimming session " << s->con << " " << s->name
5691 << " " << s->addrs
7c673cae
FG
5692 << " (timeout " << s->session_timeout
5693 << " < now " << now << ")" << dendl;
5694 } else if (out_for_too_long) {
5695 // boot the client Session because we've taken too long getting back in
11fdf7f2 5696 dout(10) << " trimming session " << s->con << " " << s->name
7c673cae
FG
5697 << " because we've been out of quorum too long" << dendl;
5698 } else {
5699 continue;
5700 }
5701
5702 s->con->mark_down();
5703 remove_session(s);
5704 logger->inc(l_mon_session_trim);
5705 }
5706 }
5707 sync_trim_providers();
5708
5709 if (!maybe_wait_for_quorum.empty()) {
5710 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5711 }
5712
5713 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5714 // this is only necessary on upgraded clusters.
5715 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5716 prepare_new_fingerprint(t);
5717 paxos->trigger_propose();
5718 }
5719
11fdf7f2 5720 mgr_client.update_daemon_health(get_health_metrics());
7c673cae
FG
5721 new_tick();
5722}
5723
11fdf7f2
TL
5724vector<DaemonHealthMetric> Monitor::get_health_metrics()
5725{
5726 vector<DaemonHealthMetric> metrics;
5727
5728 utime_t oldest_secs;
5729 const utime_t now = ceph_clock_now();
5730 auto too_old = now;
5731 too_old -= g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count();
5732 int slow = 0;
5733 TrackedOpRef oldest_op;
5734 auto count_slow_ops = [&](TrackedOp& op) {
5735 if (op.get_initiated() < too_old) {
5736 slow++;
5737 if (!oldest_op || op.get_initiated() < oldest_op->get_initiated()) {
5738 oldest_op = &op;
5739 }
5740 return true;
5741 } else {
5742 return false;
5743 }
5744 };
5745 if (op_tracker.visit_ops_in_flight(&oldest_secs, count_slow_ops)) {
5746 if (slow) {
5747 derr << __func__ << " reporting " << slow << " slow ops, oldest is "
5748 << oldest_op->get_desc() << dendl;
5749 }
5750 metrics.emplace_back(daemon_metric::SLOW_OPS, slow, oldest_secs);
5751 } else {
5752 metrics.emplace_back(daemon_metric::SLOW_OPS, 0, 0);
5753 }
5754 return metrics;
5755}
5756
7c673cae
FG
5757void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5758{
5759 uuid_d nf;
5760 nf.generate_random();
5761 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5762
5763 bufferlist bl;
11fdf7f2 5764 encode(nf, bl);
7c673cae
FG
5765 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5766}
5767
5768int Monitor::check_fsid()
5769{
5770 bufferlist ebl;
5771 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5772 if (r == -ENOENT)
5773 return r;
11fdf7f2 5774 ceph_assert(r == 0);
7c673cae
FG
5775
5776 string es(ebl.c_str(), ebl.length());
5777
5778 // only keep the first line
5779 size_t pos = es.find_first_of('\n');
5780 if (pos != string::npos)
5781 es.resize(pos);
5782
5783 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5784 uuid_d ondisk;
5785 if (!ondisk.parse(es.c_str())) {
5786 derr << "error: unable to parse uuid" << dendl;
5787 return -EINVAL;
5788 }
5789
5790 if (monmap->get_fsid() != ondisk) {
5791 derr << "error: cluster_uuid file exists with value " << ondisk
5792 << ", != our uuid " << monmap->get_fsid() << dendl;
5793 return -EEXIST;
5794 }
5795
5796 return 0;
5797}
5798
5799int Monitor::write_fsid()
5800{
5801 auto t(std::make_shared<MonitorDBStore::Transaction>());
5802 write_fsid(t);
5803 int r = store->apply_transaction(t);
5804 return r;
5805}
5806
5807int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5808{
5809 ostringstream ss;
5810 ss << monmap->get_fsid() << "\n";
5811 string us = ss.str();
5812
5813 bufferlist b;
5814 b.append(us);
5815
5816 t->put(MONITOR_NAME, "cluster_uuid", b);
5817 return 0;
5818}
5819
5820/*
5821 * this is the closest thing to a traditional 'mkfs' for ceph.
5822 * initialize the monitor state machines to their initial values.
5823 */
5824int Monitor::mkfs(bufferlist& osdmapbl)
5825{
5826 auto t(std::make_shared<MonitorDBStore::Transaction>());
5827
5828 // verify cluster fsid
5829 int r = check_fsid();
5830 if (r < 0 && r != -ENOENT)
5831 return r;
5832
5833 bufferlist magicbl;
5834 magicbl.append(CEPH_MON_ONDISK_MAGIC);
5835 magicbl.append("\n");
5836 t->put(MONITOR_NAME, "magic", magicbl);
5837
5838
5839 features = get_initial_supported_features();
5840 write_features(t);
5841
5842 // save monmap, osdmap, keyring.
5843 bufferlist monmapbl;
5844 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
5845 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5846 t->put("mkfs", "monmap", monmapbl);
5847
5848 if (osdmapbl.length()) {
5849 // make sure it's a valid osdmap
5850 try {
5851 OSDMap om;
5852 om.decode(osdmapbl);
5853 }
5854 catch (buffer::error& e) {
5855 derr << "error decoding provided osdmap: " << e.what() << dendl;
5856 return -EINVAL;
5857 }
5858 t->put("mkfs", "osdmap", osdmapbl);
5859 }
5860
5861 if (is_keyring_required()) {
5862 KeyRing keyring;
5863 string keyring_filename;
5864
11fdf7f2 5865 r = ceph_resolve_file_search(g_conf()->keyring, keyring_filename);
7c673cae 5866 if (r) {
11fdf7f2 5867 derr << "unable to find a keyring file on " << g_conf()->keyring
7c673cae 5868 << ": " << cpp_strerror(r) << dendl;
11fdf7f2
TL
5869 if (g_conf()->key != "") {
5870 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf()->key +
7c673cae
FG
5871 "\n\tcaps mon = \"allow *\"\n";
5872 bufferlist bl;
5873 bl.append(keyring_plaintext);
5874 try {
11fdf7f2 5875 auto i = bl.cbegin();
7c673cae
FG
5876 keyring.decode_plaintext(i);
5877 }
5878 catch (const buffer::error& e) {
5879 derr << "error decoding keyring " << keyring_plaintext
5880 << ": " << e.what() << dendl;
5881 return -EINVAL;
5882 }
5883 } else {
5884 return -ENOENT;
5885 }
5886 } else {
5887 r = keyring.load(g_ceph_context, keyring_filename);
5888 if (r < 0) {
11fdf7f2 5889 derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
7c673cae
FG
5890 return r;
5891 }
5892 }
5893
5894 // put mon. key in external keyring; seed with everything else.
5895 extract_save_mon_key(keyring);
5896
5897 bufferlist keyringbl;
5898 keyring.encode_plaintext(keyringbl);
5899 t->put("mkfs", "keyring", keyringbl);
5900 }
5901 write_fsid(t);
5902 store->apply_transaction(t);
5903
5904 return 0;
5905}
5906
5907int Monitor::write_default_keyring(bufferlist& bl)
5908{
5909 ostringstream os;
11fdf7f2 5910 os << g_conf()->mon_data << "/keyring";
7c673cae
FG
5911
5912 int err = 0;
91327a77 5913 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT|O_CLOEXEC, 0600);
7c673cae
FG
5914 if (fd < 0) {
5915 err = -errno;
5916 dout(0) << __func__ << " failed to open " << os.str()
5917 << ": " << cpp_strerror(err) << dendl;
5918 return err;
5919 }
5920
5921 err = bl.write_fd(fd);
5922 if (!err)
5923 ::fsync(fd);
5924 VOID_TEMP_FAILURE_RETRY(::close(fd));
5925
5926 return err;
5927}
5928
5929void Monitor::extract_save_mon_key(KeyRing& keyring)
5930{
5931 EntityName mon_name;
5932 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
5933 EntityAuth mon_key;
5934 if (keyring.get_auth(mon_name, mon_key)) {
5935 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
5936 KeyRing pkey;
5937 pkey.add(mon_name, mon_key);
5938 bufferlist bl;
5939 pkey.encode_plaintext(bl);
5940 write_default_keyring(bl);
5941 keyring.remove(mon_name);
5942 }
5943}
5944
11fdf7f2
TL
5945// AuthClient methods -- for mon <-> mon communication
5946int Monitor::get_auth_request(
5947 Connection *con,
5948 AuthConnectionMeta *auth_meta,
5949 uint32_t *method,
5950 vector<uint32_t> *preferred_modes,
5951 bufferlist *out)
5952{
5953 std::scoped_lock l(auth_lock);
5954 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
5955 con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
5956 return -EACCES;
5957 }
5958 AuthAuthorizer *auth;
9f95a23c 5959 if (!get_authorizer(con->get_peer_type(), &auth)) {
11fdf7f2
TL
5960 return -EACCES;
5961 }
5962 auth_meta->authorizer.reset(auth);
5963 auth_registry.get_supported_modes(con->get_peer_type(),
5964 auth->protocol,
5965 preferred_modes);
5966 *method = auth->protocol;
5967 *out = auth->bl;
5968 return 0;
5969}
5970
5971int Monitor::handle_auth_reply_more(
5972 Connection *con,
5973 AuthConnectionMeta *auth_meta,
5974 const bufferlist& bl,
5975 bufferlist *reply)
5976{
5977 std::scoped_lock l(auth_lock);
5978 if (!auth_meta->authorizer) {
5979 derr << __func__ << " no authorizer?" << dendl;
5980 return -EACCES;
5981 }
5982 auth_meta->authorizer->add_challenge(cct, bl);
5983 *reply = auth_meta->authorizer->bl;
5984 return 0;
5985}
5986
5987int Monitor::handle_auth_done(
5988 Connection *con,
5989 AuthConnectionMeta *auth_meta,
5990 uint64_t global_id,
5991 uint32_t con_mode,
5992 const bufferlist& bl,
5993 CryptoKey *session_key,
5994 std::string *connection_secret)
5995{
5996 std::scoped_lock l(auth_lock);
5997 // verify authorizer reply
5998 auto p = bl.begin();
5999 if (!auth_meta->authorizer->verify_reply(p, connection_secret)) {
6000 dout(0) << __func__ << " failed verifying authorizer reply" << dendl;
6001 return -EACCES;
6002 }
6003 auth_meta->session_key = auth_meta->authorizer->session_key;
6004 return 0;
6005}
6006
6007int Monitor::handle_auth_bad_method(
6008 Connection *con,
6009 AuthConnectionMeta *auth_meta,
6010 uint32_t old_auth_method,
6011 int result,
6012 const std::vector<uint32_t>& allowed_methods,
6013 const std::vector<uint32_t>& allowed_modes)
6014{
6015 derr << __func__ << " hmm, they didn't like " << old_auth_method
6016 << " result " << cpp_strerror(result) << dendl;
6017 return -EACCES;
6018}
6019
9f95a23c 6020bool Monitor::get_authorizer(int service_id, AuthAuthorizer **authorizer)
7c673cae 6021{
9f95a23c 6022 dout(10) << "get_authorizer for " << ceph_entity_type_name(service_id)
7c673cae
FG
6023 << dendl;
6024
6025 if (is_shutdown())
6026 return false;
6027
6028 // we only connect to other monitors and mgr; every else connects to us.
6029 if (service_id != CEPH_ENTITY_TYPE_MON &&
6030 service_id != CEPH_ENTITY_TYPE_MGR)
6031 return false;
6032
6033 if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
6034 // auth_none
6035 dout(20) << __func__ << " building auth_none authorizer" << dendl;
11fdf7f2 6036 AuthNoneClientHandler handler{g_ceph_context};
7c673cae
FG
6037 handler.set_global_id(0);
6038 *authorizer = handler.build_authorizer(service_id);
6039 return true;
6040 }
6041
6042 CephXServiceTicketInfo auth_ticket_info;
6043 CephXSessionAuthInfo info;
6044 int ret;
6045
6046 EntityName name;
6047 name.set_type(CEPH_ENTITY_TYPE_MON);
6048 auth_ticket_info.ticket.name = name;
6049 auth_ticket_info.ticket.global_id = 0;
6050
6051 if (service_id == CEPH_ENTITY_TYPE_MON) {
6052 // mon to mon authentication uses the private monitor shared key and not the
6053 // rotating key
6054 CryptoKey secret;
6055 if (!keyring.get_secret(name, secret) &&
6056 !key_server.get_secret(name, secret)) {
6057 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
6058 << dendl;
6059 stringstream ss, ds;
6060 int err = key_server.list_secrets(ds);
6061 if (err < 0)
6062 ss << "no installed auth entries!";
6063 else
6064 ss << "installed auth entries:";
6065 dout(0) << ss.str() << "\n" << ds.str() << dendl;
6066 return false;
6067 }
6068
11fdf7f2
TL
6069 ret = key_server.build_session_auth_info(
6070 service_id, auth_ticket_info.ticket, info, secret, (uint64_t)-1);
7c673cae
FG
6071 if (ret < 0) {
6072 dout(0) << __func__ << " failed to build mon session_auth_info "
6073 << cpp_strerror(ret) << dendl;
6074 return false;
6075 }
6076 } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
6077 // mgr
11fdf7f2
TL
6078 ret = key_server.build_session_auth_info(
6079 service_id, auth_ticket_info.ticket, info);
7c673cae
FG
6080 if (ret < 0) {
6081 derr << __func__ << " failed to build mgr service session_auth_info "
6082 << cpp_strerror(ret) << dendl;
6083 return false;
6084 }
6085 } else {
6086 ceph_abort(); // see check at top of fn
6087 }
6088
6089 CephXTicketBlob blob;
6090 if (!cephx_build_service_ticket_blob(cct, info, blob)) {
9f95a23c 6091 dout(0) << "get_authorizer failed to build service ticket" << dendl;
7c673cae
FG
6092 return false;
6093 }
6094 bufferlist ticket_data;
11fdf7f2 6095 encode(blob, ticket_data);
7c673cae 6096
11fdf7f2 6097 auto iter = ticket_data.cbegin();
7c673cae 6098 CephXTicketHandler handler(g_ceph_context, service_id);
11fdf7f2 6099 decode(handler.ticket, iter);
7c673cae
FG
6100
6101 handler.session_key = info.session_key;
6102
6103 *authorizer = handler.build_authorizer(0);
6104
6105 return true;
6106}
6107
11fdf7f2
TL
6108int Monitor::handle_auth_request(
6109 Connection *con,
6110 AuthConnectionMeta *auth_meta,
6111 bool more,
6112 uint32_t auth_method,
6113 const bufferlist &payload,
6114 bufferlist *reply)
6115{
6116 std::scoped_lock l(auth_lock);
7c673cae 6117
11fdf7f2
TL
6118 // NOTE: be careful, the Connection hasn't fully negotiated yet, so
6119 // e.g., peer_features, peer_addrs, and others are still unknown.
6120
6121 dout(10) << __func__ << " con " << con << (more ? " (more)":" (start)")
6122 << " method " << auth_method
6123 << " payload " << payload.length()
6124 << dendl;
9f95a23c
TL
6125 if (!payload.length()) {
6126 if (!con->is_msgr2() &&
6127 con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
6128 // for v1 connections, we tolerate no authorizer (from
6129 // non-monitors), because authentication happens via MAuth
6130 // messages.
6131 return 1;
6132 }
6133 return -EACCES;
6134 }
11fdf7f2
TL
6135 if (!more) {
6136 auth_meta->auth_mode = payload[0];
6137 }
6138
6139 if (auth_meta->auth_mode >= AUTH_MODE_AUTHORIZER &&
6140 auth_meta->auth_mode <= AUTH_MODE_AUTHORIZER_MAX) {
6141 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
6142 auth_method);
6143 if (!ah) {
6144 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
6145 << auth_method << dendl;
6146 return -EOPNOTSUPP;
6147 }
6148 bool was_challenge = (bool)auth_meta->authorizer_challenge;
6149 bool isvalid = ah->verify_authorizer(
6150 cct,
9f95a23c 6151 keyring,
11fdf7f2
TL
6152 payload,
6153 auth_meta->get_connection_secret_length(),
6154 reply,
6155 &con->peer_name,
6156 &con->peer_global_id,
6157 &con->peer_caps_info,
6158 &auth_meta->session_key,
6159 &auth_meta->connection_secret,
6160 &auth_meta->authorizer_challenge);
6161 if (isvalid) {
6162 ms_handle_authentication(con);
6163 return 1;
6164 }
6165 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
6166 return 0;
6167 }
6168 dout(10) << __func__ << " bad authorizer on " << con << dendl;
6169 return -EACCES;
eafe8130 6170 } else if (auth_meta->auth_mode < AUTH_MODE_MON ||
11fdf7f2
TL
6171 auth_meta->auth_mode > AUTH_MODE_MON_MAX) {
6172 derr << __func__ << " unrecognized auth mode " << auth_meta->auth_mode
6173 << dendl;
6174 return -EACCES;
6175 }
6176
6177 // wait until we've formed an initial quorum on mkfs so that we have
6178 // the initial keys (e.g., client.admin).
6179 if (authmon()->get_last_committed() == 0) {
6180 dout(10) << __func__ << " haven't formed initial quorum, EBUSY" << dendl;
6181 return -EBUSY;
6182 }
6183
6184 RefCountedPtr priv;
6185 MonSession *s;
6186 int32_t r = 0;
6187 auto p = payload.begin();
6188 if (!more) {
6189 if (con->get_priv()) {
6190 return -EACCES; // wtf
6191 }
6192
6193 // handler?
6194 unique_ptr<AuthServiceHandler> auth_handler{get_auth_service_handler(
6195 auth_method, g_ceph_context, &key_server)};
6196 if (!auth_handler) {
6197 dout(1) << __func__ << " auth_method " << auth_method << " not supported"
6198 << dendl;
6199 return -EOPNOTSUPP;
6200 }
6201
6202 uint8_t mode;
6203 EntityName entity_name;
6204
6205 try {
6206 decode(mode, p);
6207 if (mode < AUTH_MODE_MON ||
6208 mode > AUTH_MODE_MON_MAX) {
6209 dout(1) << __func__ << " invalid mode " << (int)mode << dendl;
6210 return -EACCES;
6211 }
6212 assert(mode >= AUTH_MODE_MON && mode <= AUTH_MODE_MON_MAX);
6213 decode(entity_name, p);
6214 decode(con->peer_global_id, p);
6215 } catch (buffer::error& e) {
6216 dout(1) << __func__ << " failed to decode, " << e.what() << dendl;
6217 return -EACCES;
6218 }
6219
6220 // supported method?
6221 if (entity_name.get_type() == CEPH_ENTITY_TYPE_MON ||
6222 entity_name.get_type() == CEPH_ENTITY_TYPE_OSD ||
6223 entity_name.get_type() == CEPH_ENTITY_TYPE_MDS ||
6224 entity_name.get_type() == CEPH_ENTITY_TYPE_MGR) {
6225 if (!auth_cluster_required.is_supported_auth(auth_method)) {
6226 dout(10) << __func__ << " entity " << entity_name << " method "
6227 << auth_method << " not among supported "
6228 << auth_cluster_required.get_supported_set() << dendl;
6229 return -EOPNOTSUPP;
7c673cae
FG
6230 }
6231 } else {
11fdf7f2
TL
6232 if (!auth_service_required.is_supported_auth(auth_method)) {
6233 dout(10) << __func__ << " entity " << entity_name << " method "
6234 << auth_method << " not among supported "
6235 << auth_cluster_required.get_supported_set() << dendl;
6236 return -EOPNOTSUPP;
6237 }
6238 }
6239
6240 // for msgr1 we would do some weirdness here to ensure signatures
6241 // are supported by the client if we require it. for msgr2 that
6242 // is not necessary.
6243
6244 if (!con->peer_global_id) {
6245 con->peer_global_id = authmon()->_assign_global_id();
6246 if (!con->peer_global_id) {
6247 dout(1) << __func__ << " failed to assign global_id" << dendl;
6248 return -EBUSY;
6249 }
6250 dout(10) << __func__ << " assigned global_id " << con->peer_global_id
6251 << dendl;
6252 }
6253
6254 // set up partial session
6255 s = new MonSession(con);
6256 s->auth_handler = auth_handler.release();
6257 con->set_priv(RefCountedPtr{s, false});
6258
6259 r = s->auth_handler->start_session(
6260 entity_name,
6261 auth_meta->get_connection_secret_length(),
6262 reply,
6263 &con->peer_caps_info,
6264 &auth_meta->session_key,
6265 &auth_meta->connection_secret);
6266 } else {
6267 priv = con->get_priv();
6268 if (!priv) {
6269 // this can happen if the async ms_handle_reset event races with
6270 // the unlocked call into handle_auth_request
6271 return -EACCES;
7c673cae 6272 }
11fdf7f2
TL
6273 s = static_cast<MonSession*>(priv.get());
6274 r = s->auth_handler->handle_request(
6275 p,
6276 auth_meta->get_connection_secret_length(),
6277 reply,
6278 &con->peer_global_id,
6279 &con->peer_caps_info,
6280 &auth_meta->session_key,
6281 &auth_meta->connection_secret);
6282 }
6283 if (r > 0 &&
6284 !s->authenticated) {
6285 ms_handle_authentication(con);
6286 }
6287
6288 dout(30) << " r " << r << " reply:\n";
6289 reply->hexdump(*_dout);
6290 *_dout << dendl;
6291 return r;
6292}
6293
6294void Monitor::ms_handle_accept(Connection *con)
6295{
6296 auto priv = con->get_priv();
6297 MonSession *s = static_cast<MonSession*>(priv.get());
6298 if (!s) {
6299 // legacy protocol v1?
6300 dout(10) << __func__ << " con " << con << " no session" << dendl;
6301 return;
6302 }
6303
6304 if (s->item.is_on_list()) {
6305 dout(10) << __func__ << " con " << con << " session " << s
6306 << " already on list" << dendl;
7c673cae 6307 } else {
11fdf7f2
TL
6308 dout(10) << __func__ << " con " << con << " session " << s
6309 << " registering session for "
6310 << con->get_peer_addrs() << dendl;
6311 s->_ident(entity_name_t(con->get_peer_type(), con->get_peer_id()),
6312 con->get_peer_addrs());
6313 std::lock_guard l(session_map_lock);
6314 session_map.add_session(s);
7c673cae 6315 }
11fdf7f2
TL
6316}
6317
6318int Monitor::ms_handle_authentication(Connection *con)
6319{
6320 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
6321 // mon <-> mon connections need no Session, and setting one up
6322 // creates an awkward ref cycle between Session and Connection.
6323 return 1;
6324 }
6325
6326 auto priv = con->get_priv();
6327 MonSession *s = static_cast<MonSession*>(priv.get());
6328 if (!s) {
6329 // must be msgr2, otherwise dispatch would have set up the session.
6330 s = session_map.new_session(
6331 entity_name_t(con->get_peer_type(), -1), // we don't know yet
6332 con->get_peer_addrs(),
6333 con);
6334 assert(s);
6335 dout(10) << __func__ << " adding session " << s << " to con " << con
6336 << dendl;
6337 con->set_priv(s);
6338 logger->set(l_mon_num_sessions, session_map.get_size());
6339 logger->inc(l_mon_session_add);
6340 }
6341 dout(10) << __func__ << " session " << s << " con " << con
6342 << " addr " << s->con->get_peer_addr()
6343 << " " << *s << dendl;
6344
6345 AuthCapsInfo &caps_info = con->get_peer_caps_info();
6346 int ret = 0;
6347 if (caps_info.allow_all) {
6348 s->caps.set_allow_all();
6349 s->authenticated = true;
6350 ret = 1;
9f95a23c 6351 } else if (caps_info.caps.length()) {
11fdf7f2
TL
6352 bufferlist::const_iterator p = caps_info.caps.cbegin();
6353 string str;
6354 try {
6355 decode(str, p);
6356 } catch (const buffer::error &err) {
6357 derr << __func__ << " corrupt cap data for " << con->get_peer_entity_name()
6358 << " in auth db" << dendl;
6359 str.clear();
9f95a23c 6360 ret = -EACCES;
11fdf7f2
TL
6361 }
6362 if (ret >= 0) {
6363 if (s->caps.parse(str, NULL)) {
6364 s->authenticated = true;
6365 ret = 1;
6366 } else {
6367 derr << __func__ << " unparseable caps '" << str << "' for "
6368 << con->get_peer_entity_name() << dendl;
9f95a23c 6369 ret = -EACCES;
11fdf7f2
TL
6370 }
6371 }
6372 }
6373
6374 return ret;
7c673cae 6375}