]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/Monitor.cc
eecd2f68c315af2efb9080e33d05d859ddac9489
[ceph.git] / ceph / src / mon / Monitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include <iterator>
17 #include <sstream>
18 #include <tuple>
19 #include <stdlib.h>
20 #include <signal.h>
21 #include <limits.h>
22 #include <cstring>
23 #include <boost/scope_exit.hpp>
24 #include <boost/algorithm/string/predicate.hpp>
25
26 #include "json_spirit/json_spirit_reader.h"
27 #include "json_spirit/json_spirit_writer.h"
28
29 #include "Monitor.h"
30 #include "common/version.h"
31 #include "common/blkdev.h"
32 #include "common/cmdparse.h"
33 #include "common/signal.h"
34
35 #include "osd/OSDMap.h"
36
37 #include "MonitorDBStore.h"
38
39 #include "messages/PaxosServiceMessage.h"
40 #include "messages/MMonMap.h"
41 #include "messages/MMonGetMap.h"
42 #include "messages/MMonGetVersion.h"
43 #include "messages/MMonGetVersionReply.h"
44 #include "messages/MGenericMessage.h"
45 #include "messages/MMonCommand.h"
46 #include "messages/MMonCommandAck.h"
47 #include "messages/MMonMetadata.h"
48 #include "messages/MMonSync.h"
49 #include "messages/MMonScrub.h"
50 #include "messages/MMonProbe.h"
51 #include "messages/MMonJoin.h"
52 #include "messages/MMonPaxos.h"
53 #include "messages/MRoute.h"
54 #include "messages/MForward.h"
55
56 #include "messages/MMonSubscribe.h"
57 #include "messages/MMonSubscribeAck.h"
58
59 #include "messages/MCommand.h"
60 #include "messages/MCommandReply.h"
61
62 #include "messages/MAuthReply.h"
63
64 #include "messages/MTimeCheck2.h"
65 #include "messages/MPing.h"
66
67 #include "common/strtol.h"
68 #include "common/ceph_argparse.h"
69 #include "common/Timer.h"
70 #include "common/Clock.h"
71 #include "common/errno.h"
72 #include "common/perf_counters.h"
73 #include "common/admin_socket.h"
74 #include "global/signal_handler.h"
75 #include "common/Formatter.h"
76 #include "include/stringify.h"
77 #include "include/color.h"
78 #include "include/ceph_fs.h"
79 #include "include/str_list.h"
80
81 #include "OSDMonitor.h"
82 #include "MDSMonitor.h"
83 #include "MonmapMonitor.h"
84 #include "LogMonitor.h"
85 #include "AuthMonitor.h"
86 #include "MgrMonitor.h"
87 #include "MgrStatMonitor.h"
88 #include "ConfigMonitor.h"
89 #include "mon/QuorumService.h"
90 #include "mon/HealthMonitor.h"
91 #include "mon/ConfigKeyService.h"
92 #include "common/config.h"
93 #include "common/cmdparse.h"
94 #include "include/ceph_assert.h"
95 #include "include/compat.h"
96 #include "perfglue/heap_profiler.h"
97
98 #include "auth/none/AuthNoneClientHandler.h"
99
100 #define dout_subsys ceph_subsys_mon
101 #undef dout_prefix
102 #define dout_prefix _prefix(_dout, this)
103 using namespace TOPNSPC::common;
104 static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
105 return *_dout << "mon." << mon->name << "@" << mon->rank
106 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
107 }
108
109 const string Monitor::MONITOR_NAME = "monitor";
110 const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
111
112
113 #undef FLAG
114 #undef COMMAND
115 #undef COMMAND_WITH_FLAG
116 #define FLAG(f) (MonCommand::FLAG_##f)
117 #define COMMAND(parsesig, helptext, modulename, req_perms) \
118 {parsesig, helptext, modulename, req_perms, FLAG(NONE)},
119 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, flags) \
120 {parsesig, helptext, modulename, req_perms, flags},
121 MonCommand mon_commands[] = {
122 #include <mon/MonCommands.h>
123 };
124 #undef COMMAND
125 #undef COMMAND_WITH_FLAG
126
127 Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
128 Messenger *m, Messenger *mgr_m, MonMap *map) :
129 Dispatcher(cct_),
130 AuthServer(cct_),
131 name(nm),
132 rank(-1),
133 messenger(m),
134 con_self(m ? m->get_loopback_connection() : NULL),
135 timer(cct_, lock),
136 finisher(cct_, "mon_finisher", "fin"),
137 cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads),
138 has_ever_joined(false),
139 logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
140 monmap(map),
141 log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
142 key_server(cct, &keyring),
143 auth_cluster_required(cct,
144 cct->_conf->auth_supported.empty() ?
145 cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
146 auth_service_required(cct,
147 cct->_conf->auth_supported.empty() ?
148 cct->_conf->auth_service_required : cct->_conf->auth_supported),
149 mgr_messenger(mgr_m),
150 mgr_client(cct_, mgr_m, monmap),
151 gss_ktfile_client(cct->_conf.get_val<std::string>("gss_ktab_client_file")),
152 store(s),
153
154 elector(this),
155 required_features(0),
156 leader(0),
157 quorum_con_features(0),
158 // scrub
159 scrub_version(0),
160 scrub_event(NULL),
161 scrub_timeout_event(NULL),
162
163 // sync state
164 sync_provider_count(0),
165 sync_cookie(0),
166 sync_full(false),
167 sync_start_version(0),
168 sync_timeout_event(NULL),
169 sync_last_committed_floor(0),
170
171 timecheck_round(0),
172 timecheck_acks(0),
173 timecheck_rounds_since_clean(0),
174 timecheck_event(NULL),
175
176 paxos_service(PAXOS_NUM),
177 admin_hook(NULL),
178 routed_request_tid(0),
179 op_tracker(cct, g_conf().get_val<bool>("mon_enable_op_tracker"), 1)
180 {
181 clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
182 audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);
183
184 update_log_clients();
185
186 if (!gss_ktfile_client.empty()) {
187 // Assert we can export environment variable
188 /*
189 The default client keytab is used, if it is present and readable,
190 to automatically obtain initial credentials for GSSAPI client
191 applications. The principal name of the first entry in the client
192 keytab is used by default when obtaining initial credentials.
193 1. The KRB5_CLIENT_KTNAME environment variable.
194 2. The default_client_keytab_name profile variable in [libdefaults].
195 3. The hardcoded default, DEFCKTNAME.
196 */
197 const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
198 gss_ktfile_client.c_str(), 1));
199 ceph_assert(set_result == 0);
200 }
201
202 op_tracker.set_complaint_and_threshold(
203 g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count(),
204 g_conf().get_val<int64_t>("mon_op_log_threshold"));
205 op_tracker.set_history_size_and_duration(
206 g_conf().get_val<uint64_t>("mon_op_history_size"),
207 g_conf().get_val<std::chrono::seconds>("mon_op_history_duration").count());
208 op_tracker.set_history_slow_op_size_and_threshold(
209 g_conf().get_val<uint64_t>("mon_op_history_slow_op_size"),
210 g_conf().get_val<std::chrono::seconds>("mon_op_history_slow_op_threshold").count());
211
212 paxos = new Paxos(this, "paxos");
213
214 paxos_service[PAXOS_MDSMAP].reset(new MDSMonitor(this, paxos, "mdsmap"));
215 paxos_service[PAXOS_MONMAP].reset(new MonmapMonitor(this, paxos, "monmap"));
216 paxos_service[PAXOS_OSDMAP].reset(new OSDMonitor(cct, this, paxos, "osdmap"));
217 paxos_service[PAXOS_LOG].reset(new LogMonitor(this, paxos, "logm"));
218 paxos_service[PAXOS_AUTH].reset(new AuthMonitor(this, paxos, "auth"));
219 paxos_service[PAXOS_MGR].reset(new MgrMonitor(this, paxos, "mgr"));
220 paxos_service[PAXOS_MGRSTAT].reset(new MgrStatMonitor(this, paxos, "mgrstat"));
221 paxos_service[PAXOS_HEALTH].reset(new HealthMonitor(this, paxos, "health"));
222 paxos_service[PAXOS_CONFIG].reset(new ConfigMonitor(this, paxos, "config"));
223
224 config_key_service = new ConfigKeyService(this, paxos);
225
226 bool r = mon_caps.parse("allow *", NULL);
227 ceph_assert(r);
228
229 exited_quorum = ceph_clock_now();
230
231 // prepare local commands
232 local_mon_commands.resize(std::size(mon_commands));
233 for (unsigned i = 0; i < std::size(mon_commands); ++i) {
234 local_mon_commands[i] = mon_commands[i];
235 }
236 MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);
237
238 prenautilus_local_mon_commands = local_mon_commands;
239 for (auto& i : prenautilus_local_mon_commands) {
240 std::string n = cmddesc_get_prenautilus_compat(i.cmdstring);
241 if (n != i.cmdstring) {
242 dout(20) << " pre-nautilus cmd " << i.cmdstring << " -> " << n << dendl;
243 i.cmdstring = n;
244 }
245 }
246 MonCommand::encode_vector(prenautilus_local_mon_commands, prenautilus_local_mon_commands_bl);
247
248 // assume our commands until we have an election. this only means
249 // we won't reply with EINVAL before the election; any command that
250 // actually matters will wait until we have quorum etc and then
251 // retry (and revalidate).
252 leader_mon_commands = local_mon_commands;
253 }
254
// Tear down what the constructor created: notify the op tracker, drop the
// paxos services, then free the config-key service and Paxos itself.
Monitor::~Monitor()
{
  op_tracker.on_shutdown();

  // services are cleared before the Paxos instance they were built on is deleted
  paxos_service.clear();
  delete config_key_service;
  delete paxos;
  // every session must already be gone by the time the monitor is destroyed
  ceph_assert(session_map.sessions.empty());
}
264
265
266 class AdminHook : public AdminSocketHook {
267 Monitor *mon;
268 public:
269 explicit AdminHook(Monitor *m) : mon(m) {}
270 int call(std::string_view command, const cmdmap_t& cmdmap,
271 Formatter *f,
272 std::ostream& errss,
273 bufferlist& out) override {
274 stringstream outss;
275 int r = mon->do_admin_command(command, cmdmap, f, errss, outss);
276 out.append(outss);
277 return r;
278 }
279 };
280
281 int Monitor::do_admin_command(
282 std::string_view command,
283 const cmdmap_t& cmdmap,
284 Formatter *f,
285 std::ostream& err,
286 std::ostream& out)
287 {
288 std::lock_guard l(lock);
289
290 int r = 0;
291 string args;
292 for (auto p = cmdmap.begin();
293 p != cmdmap.end(); ++p) {
294 if (p->first == "prefix")
295 continue;
296 if (!args.empty())
297 args += ", ";
298 args += cmd_vartype_stringify(p->second);
299 }
300 args = "[" + args + "]";
301
302 bool read_only = (command == "mon_status" ||
303 command == "mon metadata" ||
304 command == "quorum_status" ||
305 command == "ops" ||
306 command == "sessions");
307
308 (read_only ? audit_clog->debug() : audit_clog->info())
309 << "from='admin socket' entity='admin socket' "
310 << "cmd='" << command << "' args=" << args << ": dispatch";
311
312 if (command == "mon_status") {
313 get_mon_status(f);
314 } else if (command == "quorum_status") {
315 _quorum_status(f, out);
316 } else if (command == "sync_force") {
317 string validate;
318 if ((!cmd_getval(cmdmap, "validate", validate)) ||
319 (validate != "--yes-i-really-mean-it")) {
320 err << "are you SURE? this will mean the monitor store will be erased "
321 "the next time the monitor is restarted. pass "
322 "'--yes-i-really-mean-it' if you really do.";
323 r = -EPERM;
324 goto abort;
325 }
326 sync_force(f);
327 } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0 ||
328 command.compare(0, 24, "add_bootstrap_peer_hintv") == 0) {
329 if (!_add_bootstrap_peer_hint(command, cmdmap, out))
330 goto abort;
331 } else if (command == "quorum enter") {
332 elector.start_participating();
333 start_election();
334 out << "started responding to quorum, initiated new election";
335 } else if (command == "quorum exit") {
336 start_election();
337 elector.stop_participating();
338 out << "stopped responding to quorum, initiated new election";
339 } else if (command == "ops") {
340 (void)op_tracker.dump_ops_in_flight(f);
341 } else if (command == "sessions") {
342 f->open_array_section("sessions");
343 for (auto p : session_map.sessions) {
344 f->dump_object("session", *p);
345 }
346 f->close_section();
347 } else if (command == "dump_historic_ops") {
348 if (!op_tracker.dump_historic_ops(f)) {
349 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
350 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
351 }
352 } else if (command == "dump_historic_ops_by_duration" ) {
353 if (op_tracker.dump_historic_ops(f, true)) {
354 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
355 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
356 }
357 } else if (command == "dump_historic_slow_ops") {
358 if (op_tracker.dump_historic_slow_ops(f, {})) {
359 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
360 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
361 }
362 } else if (command == "quorum") {
363 string quorumcmd;
364 cmd_getval(cmdmap, "quorumcmd", quorumcmd);
365 if (quorumcmd == "exit") {
366 start_election();
367 elector.stop_participating();
368 out << "stopped responding to quorum, initiated new election" << std::endl;
369 } else if (quorumcmd == "enter") {
370 elector.start_participating();
371 start_election();
372 out << "started responding to quorum, initiated new election" << std::endl;
373 } else {
374 err << "needs a valid 'quorum' command" << std::endl;
375 }
376 } else if (command == "smart") {
377 string want_devid;
378 cmd_getval(cmdmap, "devid", want_devid);
379
380 string devname = store->get_devname();
381 set<string> devnames;
382 get_raw_devices(devname, &devnames);
383 json_spirit::mObject json_map;
384 uint64_t smart_timeout = cct->_conf.get_val<uint64_t>(
385 "mon_smart_report_timeout");
386 for (auto& devname : devnames) {
387 string err;
388 string devid = get_device_id(devname, &err);
389 if (want_devid.size() && want_devid != devid) {
390 derr << "get_device_id failed on " << devname << ": " << err << dendl;
391 continue;
392 }
393 json_spirit::mValue smart_json;
394 if (block_device_get_metrics(devname, smart_timeout,
395 &smart_json)) {
396 dout(10) << "block_device_get_metrics failed for /dev/" << devname
397 << dendl;
398 continue;
399 }
400 json_map[devid] = smart_json;
401 }
402 json_spirit::write(json_map, out, json_spirit::pretty_print);
403 } else if (command == "heap") {
404 if (!ceph_using_tcmalloc()) {
405 err << "could not issue heap profiler command -- not using tcmalloc!";
406 r = -EOPNOTSUPP;
407 goto abort;
408 }
409 string cmd;
410 if (!cmd_getval(cmdmap, "heapcmd", cmd)) {
411 err << "unable to get value for command \"" << cmd << "\"";
412 r = -EINVAL;
413 goto abort;
414 }
415 std::vector<std::string> cmd_vec;
416 get_str_vec(cmd, cmd_vec);
417 string val;
418 if (cmd_getval(cmdmap, "value", val)) {
419 cmd_vec.push_back(val);
420 }
421 ceph_heap_profiler_handle_command(cmd_vec, out);
422 } else if (command == "compact") {
423 dout(1) << "triggering manual compaction" << dendl;
424 auto start = ceph::coarse_mono_clock::now();
425 store->compact_async();
426 auto end = ceph::coarse_mono_clock::now();
427 auto duration = ceph::to_seconds<double>(end - start);
428 dout(1) << "finished manual compaction in "
429 << duration << " seconds" << dendl;
430 out << "compacted " << g_conf().get_val<std::string>("mon_keyvaluedb")
431 << " in " << duration << " seconds";
432 } else {
433 ceph_abort_msg("bad AdminSocket command binding");
434 }
435 (read_only ? audit_clog->debug() : audit_clog->info())
436 << "from='admin socket' "
437 << "entity='admin socket' "
438 << "cmd=" << command << " "
439 << "args=" << args << ": finished";
440 return r;
441
442 abort:
443 (read_only ? audit_clog->debug() : audit_clog->info())
444 << "from='admin socket' "
445 << "entity='admin socket' "
446 << "cmd=" << command << " "
447 << "args=" << args << ": aborted";
448 return r;
449 }
450
451 void Monitor::handle_signal(int signum)
452 {
453 ceph_assert(signum == SIGINT || signum == SIGTERM);
454 derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
455 shutdown();
456 }
457
458 CompatSet Monitor::get_initial_supported_features()
459 {
460 CompatSet::FeatureSet ceph_mon_feature_compat;
461 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
462 CompatSet::FeatureSet ceph_mon_feature_incompat;
463 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
464 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
465 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
466 ceph_mon_feature_incompat);
467 }
468
469 CompatSet Monitor::get_supported_features()
470 {
471 CompatSet compat = get_initial_supported_features();
472 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
473 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
474 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
475 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
476 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
477 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
478 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
479 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
480 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS);
481 return compat;
482 }
483
484 CompatSet Monitor::get_legacy_features()
485 {
486 CompatSet::FeatureSet ceph_mon_feature_compat;
487 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
488 CompatSet::FeatureSet ceph_mon_feature_incompat;
489 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
490 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
491 ceph_mon_feature_incompat);
492 }
493
494 int Monitor::check_features(MonitorDBStore *store)
495 {
496 CompatSet required = get_supported_features();
497 CompatSet ondisk;
498
499 read_features_off_disk(store, &ondisk);
500
501 if (!required.writeable(ondisk)) {
502 CompatSet diff = required.unsupported(ondisk);
503 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
504 return -EPERM;
505 }
506
507 return 0;
508 }
509
510 void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
511 {
512 bufferlist featuresbl;
513 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
514 if (featuresbl.length() == 0) {
515 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
516 << "Assuming it is old-style and introducing one." << dendl;
517 //we only want the baseline ~v.18 features assumed to be on disk.
518 //If new features are introduced this code needs to disappear or
519 //be made smarter.
520 *features = get_legacy_features();
521
522 features->encode(featuresbl);
523 auto t(std::make_shared<MonitorDBStore::Transaction>());
524 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
525 store->apply_transaction(t);
526 } else {
527 auto it = featuresbl.cbegin();
528 features->decode(it);
529 }
530 }
531
// Refresh the in-memory feature set from this monitor's store and then
// recompute the quorum requirements.
void Monitor::read_features()
{
  read_features_off_disk(store, &features);
  dout(10) << "features " << features << dendl;

  // logged right after the recalculation below
  calc_quorum_requirements();
  dout(10) << "required_features " << required_features << dendl;
}
540
541 void Monitor::write_features(MonitorDBStore::TransactionRef t)
542 {
543 bufferlist bl;
544 features.encode(bl);
545 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
546 }
547
548 const char** Monitor::get_tracked_conf_keys() const
549 {
550 static const char* KEYS[] = {
551 "crushtool", // helpful for testing
552 "mon_election_timeout",
553 "mon_lease",
554 "mon_lease_renew_interval_factor",
555 "mon_lease_ack_timeout_factor",
556 "mon_accept_timeout_factor",
557 // clog & admin clog
558 "clog_to_monitors",
559 "clog_to_syslog",
560 "clog_to_syslog_facility",
561 "clog_to_syslog_level",
562 "clog_to_graylog",
563 "clog_to_graylog_host",
564 "clog_to_graylog_port",
565 "host",
566 "fsid",
567 // periodic health to clog
568 "mon_health_to_clog",
569 "mon_health_to_clog_interval",
570 "mon_health_to_clog_tick_interval",
571 // scrub interval
572 "mon_scrub_interval",
573 "mon_allow_pool_delete",
574 // osdmap pruning - observed, not handled.
575 "mon_osdmap_full_prune_enabled",
576 "mon_osdmap_full_prune_min",
577 "mon_osdmap_full_prune_interval",
578 "mon_osdmap_full_prune_txsize",
579 // debug options - observed, not handled
580 "mon_debug_extra_checks",
581 "mon_debug_block_osdmap_trim",
582 NULL
583 };
584 return KEYS;
585 }
586
587 void Monitor::handle_conf_change(const ConfigProxy& conf,
588 const std::set<std::string> &changed)
589 {
590 sanitize_options();
591
592 dout(10) << __func__ << " " << changed << dendl;
593
594 if (changed.count("clog_to_monitors") ||
595 changed.count("clog_to_syslog") ||
596 changed.count("clog_to_syslog_level") ||
597 changed.count("clog_to_syslog_facility") ||
598 changed.count("clog_to_graylog") ||
599 changed.count("clog_to_graylog_host") ||
600 changed.count("clog_to_graylog_port") ||
601 changed.count("host") ||
602 changed.count("fsid")) {
603 update_log_clients();
604 }
605
606 if (changed.count("mon_health_to_clog") ||
607 changed.count("mon_health_to_clog_interval") ||
608 changed.count("mon_health_to_clog_tick_interval")) {
609 finisher.queue(new C_MonContext{this, [this, changed](int) {
610 std::lock_guard l{lock};
611 health_to_clog_update_conf(changed);
612 }});
613 }
614
615 if (changed.count("mon_scrub_interval")) {
616 int scrub_interval = conf->mon_scrub_interval;
617 finisher.queue(new C_MonContext{this, [this, scrub_interval](int) {
618 std::lock_guard l{lock};
619 scrub_update_interval(scrub_interval);
620 }});
621 }
622 }
623
624 void Monitor::update_log_clients()
625 {
626 map<string,string> log_to_monitors;
627 map<string,string> log_to_syslog;
628 map<string,string> log_channel;
629 map<string,string> log_prio;
630 map<string,string> log_to_graylog;
631 map<string,string> log_to_graylog_host;
632 map<string,string> log_to_graylog_port;
633 uuid_d fsid;
634 string host;
635
636 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
637 log_channel, log_prio, log_to_graylog,
638 log_to_graylog_host, log_to_graylog_port,
639 fsid, host))
640 return;
641
642 clog->update_config(log_to_monitors, log_to_syslog,
643 log_channel, log_prio, log_to_graylog,
644 log_to_graylog_host, log_to_graylog_port,
645 fsid, host);
646
647 audit_clog->update_config(log_to_monitors, log_to_syslog,
648 log_channel, log_prio, log_to_graylog,
649 log_to_graylog_host, log_to_graylog_port,
650 fsid, host);
651 }
652
653 int Monitor::sanitize_options()
654 {
655 int r = 0;
656
657 // mon_lease must be greater than mon_lease_renewal; otherwise we
658 // may incur in leases expiring before they are renewed.
659 if (g_conf()->mon_lease_renew_interval_factor >= 1.0) {
660 clog->error() << "mon_lease_renew_interval_factor ("
661 << g_conf()->mon_lease_renew_interval_factor
662 << ") must be less than 1.0";
663 r = -EINVAL;
664 }
665
666 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
667 // got time to renew the lease and get an ack for it. Having both options
668 // with the same value, for a given small vale, could mean timing out if
669 // the monitors happened to be overloaded -- or even under normal load for
670 // a small enough value.
671 if (g_conf()->mon_lease_ack_timeout_factor <= 1.0) {
672 clog->error() << "mon_lease_ack_timeout_factor ("
673 << g_conf()->mon_lease_ack_timeout_factor
674 << ") must be greater than 1.0";
675 r = -EINVAL;
676 }
677
678 return r;
679 }
680
681 int Monitor::preinit()
682 {
683 std::unique_lock l(lock);
684
685 dout(1) << "preinit fsid " << monmap->fsid << dendl;
686
687 int r = sanitize_options();
688 if (r < 0) {
689 derr << "option sanitization failed!" << dendl;
690 return r;
691 }
692
693 ceph_assert(!logger);
694 {
695 PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
696 pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
697 PerfCountersBuilder::PRIO_USEFUL);
698 pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions",
699 "sadd", PerfCountersBuilder::PRIO_INTERESTING);
700 pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions",
701 "srm", PerfCountersBuilder::PRIO_INTERESTING);
702 pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions",
703 "strm", PerfCountersBuilder::PRIO_USEFUL);
704 pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in",
705 "ecnt", PerfCountersBuilder::PRIO_USEFUL);
706 pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started",
707 "estt", PerfCountersBuilder::PRIO_INTERESTING);
708 pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won",
709 "ewon", PerfCountersBuilder::PRIO_INTERESTING);
710 pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost",
711 "elst", PerfCountersBuilder::PRIO_INTERESTING);
712 logger = pcb.create_perf_counters();
713 cct->get_perfcounters_collection()->add(logger);
714 }
715
716 ceph_assert(!cluster_logger);
717 {
718 PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
719 pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
720 pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
721 pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
722 pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
723 pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
724 pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
725 pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster", NULL, 0, unit_t(UNIT_BYTES));
726 pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space", NULL, 0, unit_t(UNIT_BYTES));
727 pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space", NULL, 0, unit_t(UNIT_BYTES));
728 pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
729 pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
730 pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
731 pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
732 pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
733 pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
734 pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
735 pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
736 pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
737 pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects", NULL, 0, unit_t(UNIT_BYTES));
738 cluster_logger = pcb.create_perf_counters();
739 }
740
741 paxos->init_logger();
742
743 // verify cluster_uuid
744 {
745 int r = check_fsid();
746 if (r == -ENOENT)
747 r = write_fsid();
748 if (r < 0) {
749 return r;
750 }
751 }
752
753 // open compatset
754 read_features();
755
756 // have we ever joined a quorum?
757 has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
758 dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;
759
760 if (!has_ever_joined) {
761 // impose initial quorum restrictions?
762 list<string> initial_members;
763 get_str_list(g_conf()->mon_initial_members, initial_members);
764
765 if (!initial_members.empty()) {
766 dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
767
768 monmap->set_initial_members(
769 g_ceph_context, initial_members, name, messenger->get_myaddrs(),
770 &extra_probe_peers);
771
772 dout(10) << " monmap is " << *monmap << dendl;
773 dout(10) << " extra probe peers " << extra_probe_peers << dendl;
774 }
775 } else if (!monmap->contains(name)) {
776 derr << "not in monmap and have been in a quorum before; "
777 << "must have been removed" << dendl;
778 if (g_conf()->mon_force_quorum_join) {
779 dout(0) << "we should have died but "
780 << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
781 } else {
782 derr << "commit suicide!" << dendl;
783 return -ENOENT;
784 }
785 }
786
787 {
788 // We have a potentially inconsistent store state in hands. Get rid of it
789 // and start fresh.
790 bool clear_store = false;
791 if (store->exists("mon_sync", "in_sync")) {
792 dout(1) << __func__ << " clean up potentially inconsistent store state"
793 << dendl;
794 clear_store = true;
795 }
796
797 if (store->get("mon_sync", "force_sync") > 0) {
798 dout(1) << __func__ << " force sync by clearing store state" << dendl;
799 clear_store = true;
800 }
801
802 if (clear_store) {
803 set<string> sync_prefixes = get_sync_targets_names();
804 store->clear(sync_prefixes);
805 }
806 }
807
808 sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
809 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
810
811 init_paxos();
812
813 if (is_keyring_required()) {
814 // we need to bootstrap authentication keys so we can form an
815 // initial quorum.
816 if (authmon()->get_last_committed() == 0) {
817 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
818 bufferlist bl;
819 int err = store->get("mkfs", "keyring", bl);
820 if (err == 0 && bl.length() > 0) {
821 // Attempt to decode and extract keyring only if it is found.
822 KeyRing keyring;
823 auto p = bl.cbegin();
824 decode(keyring, p);
825 extract_save_mon_key(keyring);
826 }
827 }
828
829 string keyring_loc = g_conf()->mon_data + "/keyring";
830
831 r = keyring.load(cct, keyring_loc);
832 if (r < 0) {
833 EntityName mon_name;
834 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
835 EntityAuth mon_key;
836 if (key_server.get_auth(mon_name, mon_key)) {
837 dout(1) << "copying mon. key from old db to external keyring" << dendl;
838 keyring.add(mon_name, mon_key);
839 bufferlist bl;
840 keyring.encode_plaintext(bl);
841 write_default_keyring(bl);
842 } else {
843 derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
844 return r;
845 }
846 }
847 }
848
849 admin_hook = new AdminHook(this);
850 AdminSocket* admin_socket = cct->get_admin_socket();
851
852 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
853 l.unlock();
854 // register tell/asock commands
855 for (const auto& command : local_mon_commands) {
856 if (!command.is_tell()) {
857 continue;
858 }
859 const auto prefix = cmddesc_get_prefix(command.cmdstring);
860 if (prefix == "injectargs" ||
861 prefix == "version" ||
862 prefix == "tell") {
863 // not registerd by me
864 continue;
865 }
866 r = admin_socket->register_command(command.cmdstring, admin_hook,
867 command.helpstring);
868 ceph_assert(r == 0);
869 }
870 l.lock();
871
872 // add ourselves as a conf observer
873 g_conf().add_observer(this);
874
875 messenger->set_auth_client(this);
876 messenger->set_auth_server(this);
877 mgr_messenger->set_auth_client(this);
878
879 auth_registry.refresh_config();
880
881 return 0;
882 }
883
884 int Monitor::init()
885 {
886 dout(2) << "init" << dendl;
887 std::lock_guard l(lock);
888
889 finisher.start();
890
891 // start ticker
892 timer.init();
893 new_tick();
894
895 cpu_tp.start();
896
897 // i'm ready!
898 messenger->add_dispatcher_tail(this);
899
900 // kickstart pet mgrclient
901 mgr_client.init();
902 mgr_messenger->add_dispatcher_tail(&mgr_client);
903 mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls
904 mgrmon()->prime_mgr_client();
905
906 state = STATE_PROBING;
907 bootstrap();
908 // add features of myself into feature_map
909 session_map.feature_map.add_mon(con_self->get_features());
910 return 0;
911 }
912
913 void Monitor::init_paxos()
914 {
915 dout(10) << __func__ << dendl;
916 paxos->init();
917
918 // init services
919 for (auto& svc : paxos_service) {
920 svc->init();
921 }
922
923 refresh_from_paxos(NULL);
924 }
925
926 void Monitor::refresh_from_paxos(bool *need_bootstrap)
927 {
928 dout(10) << __func__ << dendl;
929
930 bufferlist bl;
931 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
932 if (r >= 0) {
933 try {
934 auto p = bl.cbegin();
935 decode(fingerprint, p);
936 }
937 catch (buffer::error& e) {
938 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
939 }
940 } else {
941 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
942 }
943
944 for (auto& svc : paxos_service) {
945 svc->refresh(need_bootstrap);
946 }
947 for (auto& svc : paxos_service) {
948 svc->post_refresh();
949 }
950 load_metadata();
951 }
952
953 void Monitor::register_cluster_logger()
954 {
955 if (!cluster_logger_registered) {
956 dout(10) << "register_cluster_logger" << dendl;
957 cluster_logger_registered = true;
958 cct->get_perfcounters_collection()->add(cluster_logger);
959 } else {
960 dout(10) << "register_cluster_logger - already registered" << dendl;
961 }
962 }
963
964 void Monitor::unregister_cluster_logger()
965 {
966 if (cluster_logger_registered) {
967 dout(10) << "unregister_cluster_logger" << dendl;
968 cluster_logger_registered = false;
969 cct->get_perfcounters_collection()->remove(cluster_logger);
970 } else {
971 dout(10) << "unregister_cluster_logger - not registered" << dendl;
972 }
973 }
974
975 void Monitor::update_logger()
976 {
977 cluster_logger->set(l_cluster_num_mon, monmap->size());
978 cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
979 }
980
981 void Monitor::shutdown()
982 {
983 dout(1) << "shutdown" << dendl;
984
985 lock.lock();
986
987 wait_for_paxos_write();
988
989 {
990 std::lock_guard l(auth_lock);
991 authmon()->_set_mon_num_rank(0, 0);
992 }
993
994 state = STATE_SHUTDOWN;
995
996 lock.unlock();
997 g_conf().remove_observer(this);
998 lock.lock();
999
1000 if (admin_hook) {
1001 cct->get_admin_socket()->unregister_commands(admin_hook);
1002 delete admin_hook;
1003 admin_hook = NULL;
1004 }
1005
1006 elector.shutdown();
1007
1008 mgr_client.shutdown();
1009
1010 lock.unlock();
1011 finisher.wait_for_empty();
1012 finisher.stop();
1013 lock.lock();
1014
1015 // clean up
1016 paxos->shutdown();
1017 for (auto& svc : paxos_service) {
1018 svc->shutdown();
1019 }
1020
1021 finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
1022 finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
1023
1024 timer.shutdown();
1025
1026 cpu_tp.stop();
1027
1028 remove_all_sessions();
1029
1030 log_client.shutdown();
1031
1032 // unlock before msgr shutdown...
1033 lock.unlock();
1034
1035 // shutdown messenger before removing logger from perfcounter collection,
1036 // otherwise _ms_dispatch() will try to update deleted logger
1037 messenger->shutdown();
1038 mgr_messenger->shutdown();
1039
1040 if (logger) {
1041 cct->get_perfcounters_collection()->remove(logger);
1042 delete logger;
1043 logger = NULL;
1044 }
1045 if (cluster_logger) {
1046 if (cluster_logger_registered)
1047 cct->get_perfcounters_collection()->remove(cluster_logger);
1048 delete cluster_logger;
1049 cluster_logger = NULL;
1050 }
1051 }
1052
1053 void Monitor::wait_for_paxos_write()
1054 {
1055 if (paxos->is_writing() || paxos->is_writing_previous()) {
1056 dout(10) << __func__ << " flushing pending write" << dendl;
1057 lock.unlock();
1058 store->flush();
1059 lock.lock();
1060 dout(10) << __func__ << " flushed pending write" << dendl;
1061 }
1062 }
1063
1064 void Monitor::respawn()
1065 {
1066 // --- WARNING TO FUTURE COPY/PASTERS ---
1067 // You must also add a call like
1068 //
1069 // ceph_pthread_setname(pthread_self(), "ceph-mon");
1070 //
1071 // to main() so that /proc/$pid/stat field 2 contains "(ceph-mon)"
1072 // instead of "(exe)", so that killall (and log rotation) will work.
1073
1074 dout(0) << __func__ << dendl;
1075
1076 char *new_argv[orig_argc+1];
1077 dout(1) << " e: '" << orig_argv[0] << "'" << dendl;
1078 for (int i=0; i<orig_argc; i++) {
1079 new_argv[i] = (char *)orig_argv[i];
1080 dout(1) << " " << i << ": '" << orig_argv[i] << "'" << dendl;
1081 }
1082 new_argv[orig_argc] = NULL;
1083
1084 /* Determine the path to our executable, test if Linux /proc/self/exe exists.
1085 * This allows us to exec the same executable even if it has since been
1086 * unlinked.
1087 */
1088 char exe_path[PATH_MAX] = "";
1089 #ifdef PROCPREFIX
1090 if (readlink(PROCPREFIX "/proc/self/exe", exe_path, PATH_MAX-1) != -1) {
1091 dout(1) << "respawning with exe " << exe_path << dendl;
1092 strcpy(exe_path, PROCPREFIX "/proc/self/exe");
1093 } else {
1094 #else
1095 {
1096 #endif
1097 /* Print CWD for the user's interest */
1098 char buf[PATH_MAX];
1099 char *cwd = getcwd(buf, sizeof(buf));
1100 ceph_assert(cwd);
1101 dout(1) << " cwd " << cwd << dendl;
1102
1103 /* Fall back to a best-effort: just running in our CWD */
1104 strncpy(exe_path, orig_argv[0], PATH_MAX-1);
1105 }
1106
1107 dout(1) << " exe_path " << exe_path << dendl;
1108
1109 unblock_all_signals(NULL);
1110 execv(exe_path, new_argv);
1111
1112 dout(0) << "respawn execv " << orig_argv[0]
1113 << " failed with " << cpp_strerror(errno) << dendl;
1114
1115 // We have to assert out here, because suicide() returns, and callers
1116 // to respawn expect it never to return.
1117 ceph_abort();
1118 }
1119
1120 void Monitor::bootstrap()
1121 {
1122 dout(10) << "bootstrap" << dendl;
1123 wait_for_paxos_write();
1124
1125 sync_reset_requester();
1126 unregister_cluster_logger();
1127 cancel_probe_timeout();
1128
1129 if (monmap->get_epoch() == 0) {
1130 dout(10) << "reverting to legacy ranks for seed monmap (epoch 0)" << dendl;
1131 monmap->calc_legacy_ranks();
1132 }
1133 dout(10) << "monmap " << *monmap << dendl;
1134 {
1135 auto from_release = monmap->min_mon_release;
1136 ostringstream err;
1137 if (!can_upgrade_from(from_release, "min_mon_release", err)) {
1138 derr << "current monmap has " << err.str() << " stopping." << dendl;
1139 exit(0);
1140 }
1141 }
1142 // note my rank
1143 int newrank = monmap->get_rank(messenger->get_myaddrs());
1144 if (newrank < 0 && rank >= 0) {
1145 // was i ever part of the quorum?
1146 if (has_ever_joined) {
1147 dout(0) << " removed from monmap, suicide." << dendl;
1148 exit(0);
1149 }
1150 }
1151 if (newrank >= 0 &&
1152 monmap->get_addrs(newrank) != messenger->get_myaddrs()) {
1153 dout(0) << " monmap addrs for rank " << newrank << " changed, i am "
1154 << messenger->get_myaddrs()
1155 << ", monmap is " << monmap->get_addrs(newrank) << ", respawning"
1156 << dendl;
1157
1158 if (monmap->get_epoch()) {
1159 // store this map in temp mon_sync location so that we use it on
1160 // our next startup
1161 derr << " stashing newest monmap " << monmap->get_epoch()
1162 << " for next startup" << dendl;
1163 bufferlist bl;
1164 monmap->encode(bl, -1);
1165 auto t(std::make_shared<MonitorDBStore::Transaction>());
1166 t->put("mon_sync", "temp_newer_monmap", bl);
1167 store->apply_transaction(t);
1168 }
1169
1170 respawn();
1171 }
1172 if (newrank != rank) {
1173 dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
1174 messenger->set_myname(entity_name_t::MON(newrank));
1175 rank = newrank;
1176
1177 // reset all connections, or else our peers will think we are someone else.
1178 messenger->mark_down_all();
1179 }
1180
1181 // reset
1182 state = STATE_PROBING;
1183
1184 _reset();
1185
1186 // sync store
1187 if (g_conf()->mon_compact_on_bootstrap) {
1188 dout(10) << "bootstrap -- triggering compaction" << dendl;
1189 store->compact();
1190 dout(10) << "bootstrap -- finished compaction" << dendl;
1191 }
1192
1193 // singleton monitor?
1194 if (monmap->size() == 1 && rank == 0) {
1195 win_standalone_election();
1196 return;
1197 }
1198
1199 reset_probe_timeout();
1200
1201 // i'm outside the quorum
1202 if (monmap->contains(name))
1203 outside_quorum.insert(name);
1204
1205 // probe monitors
1206 dout(10) << "probing other monitors" << dendl;
1207 for (unsigned i = 0; i < monmap->size(); i++) {
1208 if ((int)i != rank)
1209 send_mon_message(
1210 new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
1211 ceph_release()),
1212 i);
1213 }
1214 for (auto& av : extra_probe_peers) {
1215 if (av != messenger->get_myaddrs()) {
1216 messenger->send_to_mon(
1217 new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
1218 ceph_release()),
1219 av);
1220 }
1221 }
1222 }
1223
1224 bool Monitor::_add_bootstrap_peer_hint(std::string_view cmd,
1225 const cmdmap_t& cmdmap,
1226 ostream& ss)
1227 {
1228 if (is_leader() || is_peon()) {
1229 ss << "mon already active; ignoring bootstrap hint";
1230 return true;
1231 }
1232
1233 entity_addrvec_t addrs;
1234 string addrstr;
1235 if (cmd_getval(cmdmap, "addr", addrstr)) {
1236 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' addr '"
1237 << addrstr << "'" << dendl;
1238
1239 entity_addr_t addr;
1240 const char *end = 0;
1241 if (!addr.parse(addrstr.c_str(), &end, entity_addr_t::TYPE_ANY)) {
1242 ss << "failed to parse addrs '" << addrstr
1243 << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1244 return false;
1245 }
1246
1247 addrs.v.push_back(addr);
1248 if (addr.get_port() == 0) {
1249 addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
1250 addrs.v[0].set_port(CEPH_MON_PORT_IANA);
1251 addrs.v.push_back(addr);
1252 addrs.v[1].set_type(entity_addr_t::TYPE_LEGACY);
1253 addrs.v[1].set_port(CEPH_MON_PORT_LEGACY);
1254 } else if (addr.get_type() == entity_addr_t::TYPE_ANY) {
1255 if (addr.get_port() == CEPH_MON_PORT_LEGACY) {
1256 addrs.v[0].set_type(entity_addr_t::TYPE_LEGACY);
1257 } else {
1258 addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
1259 }
1260 }
1261 } else if (cmd_getval(cmdmap, "addrv", addrstr)) {
1262 dout(10) << "_add_bootstrap_peer_hintv '" << cmd << "' addrv '"
1263 << addrstr << "'" << dendl;
1264 const char *end = 0;
1265 if (!addrs.parse(addrstr.c_str(), &end)) {
1266 ss << "failed to parse addrs '" << addrstr
1267 << "'; syntax is 'add_bootstrap_peer_hintv v2:ip:port[,v1:ip:port]'";
1268 return false;
1269 }
1270 } else {
1271 ss << "no addr or addrv provided";
1272 return false;
1273 }
1274
1275 extra_probe_peers.insert(addrs);
1276 ss << "adding peer " << addrs << " to list: " << extra_probe_peers;
1277 return true;
1278 }
1279
1280 // called by bootstrap(), or on leader|peon -> electing
1281 void Monitor::_reset()
1282 {
1283 dout(10) << __func__ << dendl;
1284
1285 // disable authentication
1286 {
1287 std::lock_guard l(auth_lock);
1288 authmon()->_set_mon_num_rank(0, 0);
1289 }
1290
1291 cancel_probe_timeout();
1292 timecheck_finish();
1293 health_events_cleanup();
1294 health_check_log_times.clear();
1295 scrub_event_cancel();
1296
1297 leader_since = utime_t();
1298 quorum_since = {};
1299 if (!quorum.empty()) {
1300 exited_quorum = ceph_clock_now();
1301 }
1302 quorum.clear();
1303 outside_quorum.clear();
1304 quorum_feature_map.clear();
1305
1306 scrub_reset();
1307
1308 paxos->restart();
1309
1310 for (auto& svc : paxos_service) {
1311 svc->restart();
1312 }
1313 }
1314
1315
1316 // -----------------------------------------------------------
1317 // sync
1318
1319 set<string> Monitor::get_sync_targets_names()
1320 {
1321 set<string> targets;
1322 targets.insert(paxos->get_name());
1323 for (auto& svc : paxos_service) {
1324 svc->get_store_prefixes(targets);
1325 }
1326 ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
1327 ceph_assert(config_key_service_ptr);
1328 config_key_service_ptr->get_store_prefixes(targets);
1329 return targets;
1330 }
1331
1332
// Invoked by the timer event that sync_reset_timeout() schedules, i.e.
// when the sync timeout elapses.  We must still be in
// STATE_SYNCHRONIZING; abandon the current attempt and start over with
// bootstrap().
1333 void Monitor::sync_timeout()
1334 {
1335 dout(10) << __func__ << dendl;
1336 ceph_assert(state == STATE_SYNCHRONIZING);
1337 bootstrap();
1338 }
1339
1340 void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1341 {
1342 dout(1) << __func__ << dendl;
1343
1344 MonMap latest_monmap;
1345
1346 // Grab latest monmap from MonmapMonitor
1347 bufferlist monmon_bl;
1348 int err = monmon()->get_monmap(monmon_bl);
1349 if (err < 0) {
1350 if (err != -ENOENT) {
1351 derr << __func__
1352 << " something wrong happened while reading the store: "
1353 << cpp_strerror(err) << dendl;
1354 ceph_abort_msg("error reading the store");
1355 }
1356 } else {
1357 latest_monmap.decode(monmon_bl);
1358 }
1359
1360 // Grab last backed up monmap (if any) and compare epochs
1361 if (store->exists("mon_sync", "latest_monmap")) {
1362 bufferlist backup_bl;
1363 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1364 if (err < 0) {
1365 derr << __func__
1366 << " something wrong happened while reading the store: "
1367 << cpp_strerror(err) << dendl;
1368 ceph_abort_msg("error reading the store");
1369 }
1370 ceph_assert(backup_bl.length() > 0);
1371
1372 MonMap backup_monmap;
1373 backup_monmap.decode(backup_bl);
1374
1375 if (backup_monmap.epoch > latest_monmap.epoch)
1376 latest_monmap = backup_monmap;
1377 }
1378
1379 // Check if our current monmap's epoch is greater than the one we've
1380 // got so far.
1381 if (monmap->epoch > latest_monmap.epoch)
1382 latest_monmap = *monmap;
1383
1384 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1385
1386 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1387 }
1388
1389 void Monitor::sync_reset_requester()
1390 {
1391 dout(10) << __func__ << dendl;
1392
1393 if (sync_timeout_event) {
1394 timer.cancel_event(sync_timeout_event);
1395 sync_timeout_event = NULL;
1396 }
1397
1398 sync_provider = entity_addrvec_t();
1399 sync_cookie = 0;
1400 sync_full = false;
1401 sync_start_version = 0;
1402 }
1403
// Drop every sync-provider record we hold (the cookie -> SyncProvider
// entries in sync_providers), so that we no longer act as a sync
// provider for any requester.
1404 void Monitor::sync_reset_provider()
1405 {
1406 dout(10) << __func__ << dendl;
1407 sync_providers.clear();
1408 }
1409
1410 void Monitor::sync_start(entity_addrvec_t &addrs, bool full)
1411 {
1412 dout(10) << __func__ << " " << addrs << (full ? " full" : " recent") << dendl;
1413
1414 ceph_assert(state == STATE_PROBING ||
1415 state == STATE_SYNCHRONIZING);
1416 state = STATE_SYNCHRONIZING;
1417
1418 // make sure are not a provider for anyone!
1419 sync_reset_provider();
1420
1421 sync_full = full;
1422
1423 if (sync_full) {
1424 // stash key state, and mark that we are syncing
1425 auto t(std::make_shared<MonitorDBStore::Transaction>());
1426 sync_stash_critical_state(t);
1427 t->put("mon_sync", "in_sync", 1);
1428
1429 sync_last_committed_floor = std::max(sync_last_committed_floor, paxos->get_version());
1430 dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
1431 << sync_last_committed_floor << dendl;
1432 t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
1433
1434 store->apply_transaction(t);
1435
1436 ceph_assert(g_conf()->mon_sync_requester_kill_at != 1);
1437
1438 // clear the underlying store
1439 set<string> targets = get_sync_targets_names();
1440 dout(10) << __func__ << " clearing prefixes " << targets << dendl;
1441 store->clear(targets);
1442
1443 // make sure paxos knows it has been reset. this prevents a
1444 // bootstrap and then different probe reply order from possibly
1445 // deciding a partial or no sync is needed.
1446 paxos->init();
1447
1448 ceph_assert(g_conf()->mon_sync_requester_kill_at != 2);
1449 }
1450
1451 // assume 'other' as the leader. We will update the leader once we receive
1452 // a reply to the sync start.
1453 sync_provider = addrs;
1454
1455 sync_reset_timeout();
1456
1457 MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
1458 if (!sync_full)
1459 m->last_committed = paxos->get_version();
1460 messenger->send_to_mon(m, sync_provider);
1461 }
1462
1463 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1464 {
1465 dout(10) << __func__ << dendl;
1466 bufferlist backup_monmap;
1467 sync_obtain_latest_monmap(backup_monmap);
1468 ceph_assert(backup_monmap.length() > 0);
1469 t->put("mon_sync", "latest_monmap", backup_monmap);
1470 }
1471
1472 void Monitor::sync_reset_timeout()
1473 {
1474 dout(10) << __func__ << dendl;
1475 if (sync_timeout_event)
1476 timer.cancel_event(sync_timeout_event);
1477 sync_timeout_event = timer.add_event_after(
1478 g_conf()->mon_sync_timeout,
1479 new C_MonContext{this, [this](int) {
1480 sync_timeout();
1481 }});
1482 }
1483
1484 void Monitor::sync_finish(version_t last_committed)
1485 {
1486 dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
1487
1488 ceph_assert(g_conf()->mon_sync_requester_kill_at != 7);
1489
1490 if (sync_full) {
1491 // finalize the paxos commits
1492 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1493 paxos->read_and_prepare_transactions(tx, sync_start_version,
1494 last_committed);
1495 tx->put(paxos->get_name(), "last_committed", last_committed);
1496
1497 dout(30) << __func__ << " final tx dump:\n";
1498 JSONFormatter f(true);
1499 tx->dump(&f);
1500 f.flush(*_dout);
1501 *_dout << dendl;
1502
1503 store->apply_transaction(tx);
1504 }
1505
1506 ceph_assert(g_conf()->mon_sync_requester_kill_at != 8);
1507
1508 auto t(std::make_shared<MonitorDBStore::Transaction>());
1509 t->erase("mon_sync", "in_sync");
1510 t->erase("mon_sync", "force_sync");
1511 t->erase("mon_sync", "last_committed_floor");
1512 store->apply_transaction(t);
1513
1514 ceph_assert(g_conf()->mon_sync_requester_kill_at != 9);
1515
1516 init_paxos();
1517
1518 ceph_assert(g_conf()->mon_sync_requester_kill_at != 10);
1519
1520 bootstrap();
1521 }
1522
1523 void Monitor::handle_sync(MonOpRequestRef op)
1524 {
1525 auto m = op->get_req<MMonSync>();
1526 dout(10) << __func__ << " " << *m << dendl;
1527 switch (m->op) {
1528
1529 // provider ---------
1530
1531 case MMonSync::OP_GET_COOKIE_FULL:
1532 case MMonSync::OP_GET_COOKIE_RECENT:
1533 handle_sync_get_cookie(op);
1534 break;
1535 case MMonSync::OP_GET_CHUNK:
1536 handle_sync_get_chunk(op);
1537 break;
1538
1539 // client -----------
1540
1541 case MMonSync::OP_COOKIE:
1542 handle_sync_cookie(op);
1543 break;
1544
1545 case MMonSync::OP_CHUNK:
1546 case MMonSync::OP_LAST_CHUNK:
1547 handle_sync_chunk(op);
1548 break;
1549 case MMonSync::OP_NO_COOKIE:
1550 handle_sync_no_cookie(op);
1551 break;
1552
1553 default:
1554 dout(0) << __func__ << " unknown op " << m->op << dendl;
1555 ceph_abort_msg("unknown op");
1556 }
1557 }
1558
1559 // leader
1560
1561 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1562 {
1563 auto m = op->get_req<MMonSync>();
1564 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1565 m->get_connection()->send_message(reply);
1566 }
1567
1568 void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1569 {
1570 auto m = op->get_req<MMonSync>();
1571 if (is_synchronizing()) {
1572 _sync_reply_no_cookie(op);
1573 return;
1574 }
1575
1576 ceph_assert(g_conf()->mon_sync_provider_kill_at != 1);
1577
1578 // make sure they can understand us.
1579 if ((required_features ^ m->get_connection()->get_features()) &
1580 required_features) {
1581 dout(5) << " ignoring peer mon." << m->get_source().num()
1582 << " has features " << std::hex
1583 << m->get_connection()->get_features()
1584 << " but we require " << required_features << std::dec << dendl;
1585 return;
1586 }
1587
1588 // make up a unique cookie. include election epoch (which persists
1589 // across restarts for the whole cluster) and a counter for this
1590 // process instance. there is no need to be unique *across*
1591 // monitors, though.
1592 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
1593 ceph_assert(sync_providers.count(cookie) == 0);
1594
1595 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1596
1597 SyncProvider& sp = sync_providers[cookie];
1598 sp.cookie = cookie;
1599 sp.addrs = m->get_source_addrs();
1600 sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
1601
1602 set<string> sync_targets;
1603 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1604 // full scan
1605 sync_targets = get_sync_targets_names();
1606 sp.last_committed = paxos->get_version();
1607 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1608 sp.full = true;
1609 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1610 } else {
1611 // just catch up paxos
1612 sp.last_committed = m->last_committed;
1613 }
1614 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1615
1616 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1617 reply->last_committed = sp.last_committed;
1618 m->get_connection()->send_message(reply);
1619 }
1620
1621 void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
1622 {
1623 auto m = op->get_req<MMonSync>();
1624 dout(10) << __func__ << " " << *m << dendl;
1625
1626 if (sync_providers.count(m->cookie) == 0) {
1627 dout(10) << __func__ << " no cookie " << m->cookie << dendl;
1628 _sync_reply_no_cookie(op);
1629 return;
1630 }
1631
1632 ceph_assert(g_conf()->mon_sync_provider_kill_at != 2);
1633
1634 SyncProvider& sp = sync_providers[m->cookie];
1635 sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
1636
1637 if (sp.last_committed < paxos->get_first_committed() &&
1638 paxos->get_first_committed() > 1) {
1639 dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
1640 << " < our fc " << paxos->get_first_committed() << dendl;
1641 sync_providers.erase(m->cookie);
1642 _sync_reply_no_cookie(op);
1643 return;
1644 }
1645
1646 MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
1647 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1648
1649 int bytes_left = g_conf()->mon_sync_max_payload_size;
1650 int keys_left = g_conf()->mon_sync_max_payload_keys;
1651 while (sp.last_committed < paxos->get_version() &&
1652 bytes_left > 0 &&
1653 keys_left > 0) {
1654 bufferlist bl;
1655 sp.last_committed++;
1656
1657 int err = store->get(paxos->get_name(), sp.last_committed, bl);
1658 ceph_assert(err == 0);
1659
1660 tx->put(paxos->get_name(), sp.last_committed, bl);
1661 bytes_left -= bl.length();
1662 --keys_left;
1663 dout(20) << __func__ << " including paxos state " << sp.last_committed
1664 << dendl;
1665 }
1666 reply->last_committed = sp.last_committed;
1667
1668 if (sp.full && bytes_left > 0 && keys_left > 0) {
1669 sp.synchronizer->get_chunk_tx(tx, bytes_left, keys_left);
1670 sp.last_key = sp.synchronizer->get_last_key();
1671 reply->last_key = sp.last_key;
1672 }
1673
1674 if ((sp.full && sp.synchronizer->has_next_chunk()) ||
1675 sp.last_committed < paxos->get_version()) {
1676 dout(10) << __func__ << " chunk, through version " << sp.last_committed
1677 << " key " << sp.last_key << dendl;
1678 } else {
1679 dout(10) << __func__ << " last chunk, through version " << sp.last_committed
1680 << " key " << sp.last_key << dendl;
1681 reply->op = MMonSync::OP_LAST_CHUNK;
1682
1683 ceph_assert(g_conf()->mon_sync_provider_kill_at != 3);
1684
1685 // clean up our local state
1686 sync_providers.erase(sp.cookie);
1687 }
1688
1689 encode(*tx, reply->chunk_bl);
1690
1691 m->get_connection()->send_message(reply);
1692 }
1693
1694 // requester
1695
1696 void Monitor::handle_sync_cookie(MonOpRequestRef op)
1697 {
1698 auto m = op->get_req<MMonSync>();
1699 dout(10) << __func__ << " " << *m << dendl;
1700 if (sync_cookie) {
1701 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1702 return;
1703 }
1704 if (m->get_source_addrs() != sync_provider) {
1705 dout(10) << __func__ << " source does not match, discarding" << dendl;
1706 return;
1707 }
1708 sync_cookie = m->cookie;
1709 sync_start_version = m->last_committed;
1710
1711 sync_reset_timeout();
1712 sync_get_next_chunk();
1713
1714 ceph_assert(g_conf()->mon_sync_requester_kill_at != 3);
1715 }
1716
1717 void Monitor::sync_get_next_chunk()
1718 {
1719 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
1720 if (g_conf()->mon_inject_sync_get_chunk_delay > 0) {
1721 dout(20) << __func__ << " injecting delay of " << g_conf()->mon_inject_sync_get_chunk_delay << dendl;
1722 usleep((long long)(g_conf()->mon_inject_sync_get_chunk_delay * 1000000.0));
1723 }
1724 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
1725 messenger->send_to_mon(r, sync_provider);
1726
1727 ceph_assert(g_conf()->mon_sync_requester_kill_at != 4);
1728 }
1729
1730 void Monitor::handle_sync_chunk(MonOpRequestRef op)
1731 {
1732 auto m = op->get_req<MMonSync>();
1733 dout(10) << __func__ << " " << *m << dendl;
1734
1735 if (m->cookie != sync_cookie) {
1736 dout(10) << __func__ << " cookie does not match, discarding" << dendl;
1737 return;
1738 }
1739 if (m->get_source_addrs() != sync_provider) {
1740 dout(10) << __func__ << " source does not match, discarding" << dendl;
1741 return;
1742 }
1743
1744 ceph_assert(state == STATE_SYNCHRONIZING);
1745 ceph_assert(g_conf()->mon_sync_requester_kill_at != 5);
1746
1747 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1748 tx->append_from_encoded(m->chunk_bl);
1749
1750 dout(30) << __func__ << " tx dump:\n";
1751 JSONFormatter f(true);
1752 tx->dump(&f);
1753 f.flush(*_dout);
1754 *_dout << dendl;
1755
1756 store->apply_transaction(tx);
1757
1758 ceph_assert(g_conf()->mon_sync_requester_kill_at != 6);
1759
1760 if (!sync_full) {
1761 dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
1762 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1763 paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
1764 m->last_committed);
1765 tx->put(paxos->get_name(), "last_committed", m->last_committed);
1766
1767 dout(30) << __func__ << " tx dump:\n";
1768 JSONFormatter f(true);
1769 tx->dump(&f);
1770 f.flush(*_dout);
1771 *_dout << dendl;
1772
1773 store->apply_transaction(tx);
1774 paxos->init(); // to refresh what we just wrote
1775 }
1776
1777 if (m->op == MMonSync::OP_CHUNK) {
1778 sync_reset_timeout();
1779 sync_get_next_chunk();
1780 } else if (m->op == MMonSync::OP_LAST_CHUNK) {
1781 sync_finish(m->last_committed);
1782 }
1783 }
1784
// Handles an OP_NO_COOKIE sync message (dispatched from handle_sync()).
// The message contents are not inspected: we simply start over with
// bootstrap().
1785 void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1786 {
1787 dout(10) << __func__ << dendl;
1788 bootstrap();
1789 }
1790
1791 void Monitor::sync_trim_providers()
1792 {
1793 dout(20) << __func__ << dendl;
1794
1795 utime_t now = ceph_clock_now();
1796 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1797 while (p != sync_providers.end()) {
1798 if (now > p->second.timeout) {
1799 dout(10) << __func__ << " expiring cookie " << p->second.cookie
1800 << " for " << p->second.addrs << dendl;
1801 sync_providers.erase(p++);
1802 } else {
1803 ++p;
1804 }
1805 }
1806 }
1807
1808 // ---------------------------------------------------
1809 // probe
1810
1811 void Monitor::cancel_probe_timeout()
1812 {
1813 if (probe_timeout_event) {
1814 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1815 timer.cancel_event(probe_timeout_event);
1816 probe_timeout_event = NULL;
1817 } else {
1818 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1819 }
1820 }
1821
1822 void Monitor::reset_probe_timeout()
1823 {
1824 cancel_probe_timeout();
1825 probe_timeout_event = new C_MonContext{this, [this](int r) {
1826 probe_timeout(r);
1827 }};
1828 double t = g_conf()->mon_probe_timeout;
1829 if (timer.add_event_after(t, probe_timeout_event)) {
1830 dout(10) << "reset_probe_timeout " << probe_timeout_event
1831 << " after " << t << " seconds" << dendl;
1832 } else {
1833 probe_timeout_event = nullptr;
1834 }
1835 }
1836
1837 void Monitor::probe_timeout(int r)
1838 {
1839 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
1840 ceph_assert(is_probing() || is_synchronizing());
1841 ceph_assert(probe_timeout_event);
1842 probe_timeout_event = NULL;
1843 bootstrap();
1844 }
1845
1846 void Monitor::handle_probe(MonOpRequestRef op)
1847 {
1848 auto m = op->get_req<MMonProbe>();
1849 dout(10) << "handle_probe " << *m << dendl;
1850
1851 if (m->fsid != monmap->fsid) {
1852 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1853 return;
1854 }
1855
1856 switch (m->op) {
1857 case MMonProbe::OP_PROBE:
1858 handle_probe_probe(op);
1859 break;
1860
1861 case MMonProbe::OP_REPLY:
1862 handle_probe_reply(op);
1863 break;
1864
1865 case MMonProbe::OP_MISSING_FEATURES:
1866 derr << __func__ << " require release " << (int)m->mon_release << " > "
1867 << (int)ceph_release()
1868 << ", or missing features (have " << CEPH_FEATURES_ALL
1869 << ", required " << m->required_features
1870 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL) << ")"
1871 << dendl;
1872 break;
1873 }
1874 }
1875
1876 void Monitor::handle_probe_probe(MonOpRequestRef op)
1877 {
1878 auto m = op->get_req<MMonProbe>();
1879
1880 dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
1881 << " features " << m->get_connection()->get_features() << dendl;
1882 uint64_t missing = required_features & ~m->get_connection()->get_features();
1883 if ((m->mon_release != ceph_release_t::unknown &&
1884 m->mon_release < monmap->min_mon_release) ||
1885 missing) {
1886 dout(1) << " peer " << m->get_source_addr()
1887 << " release " << m->mon_release
1888 << " < min_mon_release " << monmap->min_mon_release
1889 << ", or missing features " << missing << dendl;
1890 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1891 name, has_ever_joined, monmap->min_mon_release);
1892 m->required_features = required_features;
1893 m->get_connection()->send_message(r);
1894 goto out;
1895 }
1896
1897 if (!is_probing() && !is_synchronizing()) {
1898 // If the probing mon is way ahead of us, we need to re-bootstrap.
1899 // Normally we capture this case when we initially bootstrap, but
1900 // it is possible we pass those checks (we overlap with
1901 // quorum-to-be) but fail to join a quorum before it moves past
1902 // us. We need to be kicked back to bootstrap so we can
1903 // synchonize, not keep calling elections.
1904 if (paxos->get_version() + 1 < m->paxos_first_version) {
1905 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1906 << "ahead of us, re-bootstrapping" << dendl;
1907 bootstrap();
1908 goto out;
1909
1910 }
1911 }
1912
1913 MMonProbe *r;
1914 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined,
1915 ceph_release());
1916 r->name = name;
1917 r->quorum = quorum;
1918 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1919 r->paxos_first_version = paxos->get_first_committed();
1920 r->paxos_last_version = paxos->get_version();
1921 m->get_connection()->send_message(r);
1922
1923 // did we discover a peer here?
1924 if (!monmap->contains(m->get_source_addr())) {
1925 dout(1) << " adding peer " << m->get_source_addrs()
1926 << " to list of hints" << dendl;
1927 extra_probe_peers.insert(m->get_source_addrs());
1928 }
1929
1930 out:
1931 return;
1932 }
1933
1934 void Monitor::handle_probe_reply(MonOpRequestRef op)
1935 {
1936 auto m = op->get_req<MMonProbe>();
1937 dout(10) << "handle_probe_reply " << m->get_source_inst()
1938 << " " << *m << dendl;
1939 dout(10) << " monmap is " << *monmap << dendl;
1940
1941 // discover name and addrs during probing or electing states.
1942 if (!is_probing() && !is_electing()) {
1943 return;
1944 }
1945
1946 // newer map, or they've joined a quorum and we haven't?
1947 bufferlist mybl;
1948 monmap->encode(mybl, m->get_connection()->get_features());
1949 // make sure it's actually different; the checks below err toward
1950 // taking the other guy's map, which could cause us to loop.
1951 if (!mybl.contents_equal(m->monmap_bl)) {
1952 MonMap *newmap = new MonMap;
1953 newmap->decode(m->monmap_bl);
1954 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1955 !has_ever_joined)) {
1956 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
1957 << ", mine was " << monmap->get_epoch() << dendl;
1958 delete newmap;
1959 monmap->decode(m->monmap_bl);
1960
1961 bootstrap();
1962 return;
1963 }
1964 delete newmap;
1965 }
1966
1967 // rename peer?
1968 string peer_name = monmap->get_name(m->get_source_addr());
1969 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
1970 dout(10) << " renaming peer " << m->get_source_addr() << " "
1971 << peer_name << " -> " << m->name << " in my monmap"
1972 << dendl;
1973 monmap->rename(peer_name, m->name);
1974
1975 if (is_electing()) {
1976 bootstrap();
1977 return;
1978 }
1979 } else if (peer_name.size()) {
1980 dout(10) << " peer name is " << peer_name << dendl;
1981 } else {
1982 dout(10) << " peer " << m->get_source_addr() << " not in map" << dendl;
1983 }
1984
1985 // new initial peer?
1986 if (monmap->get_epoch() == 0 &&
1987 monmap->contains(m->name) &&
1988 monmap->get_addrs(m->name).front().is_blank_ip()) {
1989 dout(1) << " learned initial mon " << m->name
1990 << " addrs " << m->get_source_addrs() << dendl;
1991 monmap->set_addrvec(m->name, m->get_source_addrs());
1992
1993 bootstrap();
1994 return;
1995 }
1996
1997 // end discover phase
1998 if (!is_probing()) {
1999 return;
2000 }
2001
2002 ceph_assert(paxos != NULL);
2003
2004 if (is_synchronizing()) {
2005 dout(10) << " currently syncing" << dendl;
2006 return;
2007 }
2008
2009 entity_addrvec_t other = m->get_source_addrs();
2010
2011 if (m->paxos_last_version < sync_last_committed_floor) {
2012 dout(10) << " peer paxos versions [" << m->paxos_first_version
2013 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
2014 << sync_last_committed_floor << ", ignoring"
2015 << dendl;
2016 } else {
2017 if (paxos->get_version() < m->paxos_first_version &&
2018 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
2019 dout(10) << " peer paxos first versions [" << m->paxos_first_version
2020 << "," << m->paxos_last_version << "]"
2021 << " vs my version " << paxos->get_version()
2022 << " (too far ahead)"
2023 << dendl;
2024 cancel_probe_timeout();
2025 sync_start(other, true);
2026 return;
2027 }
2028 if (paxos->get_version() + g_conf()->paxos_max_join_drift < m->paxos_last_version) {
2029 dout(10) << " peer paxos last version " << m->paxos_last_version
2030 << " vs my version " << paxos->get_version()
2031 << " (too far ahead)"
2032 << dendl;
2033 cancel_probe_timeout();
2034 sync_start(other, false);
2035 return;
2036 }
2037 }
2038
2039 // did the existing cluster complete upgrade to luminous?
2040 if (osdmon()->osdmap.get_epoch()) {
2041 if (osdmon()->osdmap.require_osd_release < ceph_release_t::luminous) {
2042 derr << __func__ << " existing cluster has not completed upgrade to"
2043 << " luminous; 'ceph osd require_osd_release luminous' before"
2044 << " upgrading" << dendl;
2045 exit(0);
2046 }
2047 if (!osdmon()->osdmap.test_flag(CEPH_OSDMAP_PURGED_SNAPDIRS) ||
2048 !osdmon()->osdmap.test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
2049 derr << __func__ << " existing cluster has not completed a full luminous"
2050 << " scrub to purge legacy snapdir objects; please scrub before"
2051 << " upgrading beyond luminous." << dendl;
2052 exit(0);
2053 }
2054 }
2055
2056 // is there an existing quorum?
2057 if (m->quorum.size()) {
2058 dout(10) << " existing quorum " << m->quorum << dendl;
2059
2060 dout(10) << " peer paxos version " << m->paxos_last_version
2061 << " vs my version " << paxos->get_version()
2062 << " (ok)"
2063 << dendl;
2064
2065 if (monmap->contains(name) &&
2066 !monmap->get_addrs(name).front().is_blank_ip()) {
2067 // i'm part of the cluster; just initiate a new election
2068 start_election();
2069 } else {
2070 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
2071 send_mon_message(
2072 new MMonJoin(monmap->fsid, name, messenger->get_myaddrs()),
2073 *m->quorum.begin());
2074 }
2075 } else {
2076 if (monmap->contains(m->name)) {
2077 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
2078 outside_quorum.insert(m->name);
2079 } else {
2080 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
2081 return;
2082 }
2083
2084 unsigned need = monmap->min_quorum_size();
2085 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
2086 if (outside_quorum.size() >= need) {
2087 if (outside_quorum.count(name)) {
2088 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
2089 start_election();
2090 } else {
2091 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
2092 }
2093 } else {
2094 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
2095 }
2096 }
2097 }
2098
2099 void Monitor::join_election()
2100 {
2101 dout(10) << __func__ << dendl;
2102 wait_for_paxos_write();
2103 _reset();
2104 state = STATE_ELECTING;
2105
2106 logger->inc(l_mon_num_elections);
2107 }
2108
2109 void Monitor::start_election()
2110 {
2111 dout(10) << "start_election" << dendl;
2112 wait_for_paxos_write();
2113 _reset();
2114 state = STATE_ELECTING;
2115
2116 logger->inc(l_mon_num_elections);
2117 logger->inc(l_mon_election_call);
2118
2119 clog->info() << "mon." << name << " calling monitor election";
2120 elector.call_election();
2121 }
2122
2123 void Monitor::win_standalone_election()
2124 {
2125 dout(1) << "win_standalone_election" << dendl;
2126
2127 // bump election epoch, in case the previous epoch included other
2128 // monitors; we need to be able to make the distinction.
2129 elector.declare_standalone_victory();
2130
2131 rank = monmap->get_rank(name);
2132 ceph_assert(rank == 0);
2133 set<int> q;
2134 q.insert(rank);
2135
2136 map<int,Metadata> metadata;
2137 collect_metadata(&metadata[0]);
2138
2139 win_election(elector.get_epoch(), q,
2140 CEPH_FEATURES_ALL,
2141 ceph::features::mon::get_supported(),
2142 ceph_release(),
2143 metadata);
2144 }
2145
2146 const utime_t& Monitor::get_leader_since() const
2147 {
2148 ceph_assert(state == STATE_LEADER);
2149 return leader_since;
2150 }
2151
2152 epoch_t Monitor::get_epoch()
2153 {
2154 return elector.get_epoch();
2155 }
2156
2157 void Monitor::_finish_svc_election()
2158 {
2159 ceph_assert(state == STATE_LEADER || state == STATE_PEON);
2160
2161 for (auto& svc : paxos_service) {
2162 // we already called election_finished() on monmon(); avoid callig twice
2163 if (state == STATE_LEADER && svc.get() == monmon())
2164 continue;
2165 svc->election_finished();
2166 }
2167 }
2168
2169 void Monitor::win_election(epoch_t epoch, const set<int>& active, uint64_t features,
2170 const mon_feature_t& mon_features,
2171 ceph_release_t min_mon_release,
2172 const map<int,Metadata>& metadata)
2173 {
2174 dout(10) << __func__ << " epoch " << epoch << " quorum " << active
2175 << " features " << features
2176 << " mon_features " << mon_features
2177 << " min_mon_release " << min_mon_release
2178 << dendl;
2179 ceph_assert(is_electing());
2180 state = STATE_LEADER;
2181 leader_since = ceph_clock_now();
2182 quorum_since = mono_clock::now();
2183 leader = rank;
2184 quorum = active;
2185 quorum_con_features = features;
2186 quorum_mon_features = mon_features;
2187 quorum_min_mon_release = min_mon_release;
2188 pending_metadata = metadata;
2189 outside_quorum.clear();
2190
2191 clog->info() << "mon." << name << " is new leader, mons " << get_quorum_names()
2192 << " in quorum (ranks " << quorum << ")";
2193
2194 set_leader_commands(get_local_commands(mon_features));
2195
2196 paxos->leader_init();
2197 // NOTE: tell monmap monitor first. This is important for the
2198 // bootstrap case to ensure that the very first paxos proposal
2199 // codifies the monmap. Otherwise any manner of chaos can ensue
2200 // when monitors are call elections or participating in a paxos
2201 // round without agreeing on who the participants are.
2202 monmon()->election_finished();
2203 _finish_svc_election();
2204
2205 logger->inc(l_mon_election_win);
2206
2207 // inject new metadata in first transaction.
2208 {
2209 // include previous metadata for missing mons (that aren't part of
2210 // the current quorum).
2211 map<int,Metadata> m = metadata;
2212 for (unsigned rank = 0; rank < monmap->size(); ++rank) {
2213 if (m.count(rank) == 0 &&
2214 mon_metadata.count(rank)) {
2215 m[rank] = mon_metadata[rank];
2216 }
2217 }
2218
2219 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
2220 // a new transaction immediately after the election finishes. We should
2221 // do that anyway for other reasons, though.
2222 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
2223 bufferlist bl;
2224 encode(m, bl);
2225 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
2226 }
2227
2228 finish_election();
2229 if (monmap->size() > 1 &&
2230 monmap->get_epoch() > 0) {
2231 timecheck_start();
2232 health_tick_start();
2233
2234 // Freshen the health status before doing health_to_clog in case
2235 // our just-completed election changed the health
2236 healthmon()->wait_for_active_ctx(new LambdaContext([this](int r){
2237 dout(20) << "healthmon now active" << dendl;
2238 healthmon()->tick();
2239 if (healthmon()->is_proposing()) {
2240 dout(20) << __func__ << " healthmon proposing, waiting" << dendl;
2241 healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext{this,
2242 [this](int r){
2243 ceph_assert(ceph_mutex_is_locked_by_me(lock));
2244 do_health_to_clog_interval();
2245 }});
2246
2247 } else {
2248 do_health_to_clog_interval();
2249 }
2250 }));
2251
2252 scrub_event_start();
2253 }
2254 }
2255
2256 void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
2257 uint64_t features,
2258 const mon_feature_t& mon_features,
2259 ceph_release_t min_mon_release)
2260 {
2261 state = STATE_PEON;
2262 leader_since = utime_t();
2263 quorum_since = mono_clock::now();
2264 leader = l;
2265 quorum = q;
2266 outside_quorum.clear();
2267 quorum_con_features = features;
2268 quorum_mon_features = mon_features;
2269 quorum_min_mon_release = min_mon_release;
2270 dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
2271 << " quorum is " << quorum << " features are " << quorum_con_features
2272 << " mon_features are " << quorum_mon_features
2273 << " min_mon_release " << min_mon_release
2274 << dendl;
2275
2276 paxos->peon_init();
2277 _finish_svc_election();
2278
2279 logger->inc(l_mon_election_lose);
2280
2281 finish_election();
2282 }
2283
2284 namespace {
2285 std::string collect_compression_algorithms()
2286 {
2287 ostringstream os;
2288 bool printed = false;
2289 for (auto [name, key] : Compressor::compression_algorithms) {
2290 if (printed) {
2291 os << ", ";
2292 } else {
2293 printed = true;
2294 }
2295 std::ignore = key;
2296 os << name;
2297 }
2298 return os.str();
2299 }
2300 }
2301
2302 void Monitor::collect_metadata(Metadata *m)
2303 {
2304 collect_sys_info(m, g_ceph_context);
2305 (*m)["addrs"] = stringify(messenger->get_myaddrs());
2306 (*m)["compression_algorithms"] = collect_compression_algorithms();
2307
2308 // infer storage device
2309 string devname = store->get_devname();
2310 set<string> devnames;
2311 get_raw_devices(devname, &devnames);
2312 map<string,string> errs;
2313 get_device_metadata(devnames, m, &errs);
2314 for (auto& i : errs) {
2315 dout(1) << __func__ << " " << i.first << ": " << i.second << dendl;
2316 }
2317 }
2318
2319 void Monitor::finish_election()
2320 {
2321 apply_quorum_to_compatset_features();
2322 apply_monmap_to_compatset_features();
2323 timecheck_finish();
2324 exited_quorum = utime_t();
2325 finish_contexts(g_ceph_context, waitfor_quorum);
2326 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2327 resend_routed_requests();
2328 update_logger();
2329 register_cluster_logger();
2330
2331 // enable authentication
2332 {
2333 std::lock_guard l(auth_lock);
2334 authmon()->_set_mon_num_rank(monmap->size(), rank);
2335 }
2336
2337 // am i named properly?
2338 string cur_name = monmap->get_name(messenger->get_myaddrs());
2339 if (cur_name != name) {
2340 dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
2341 send_mon_message(
2342 new MMonJoin(monmap->fsid, name, messenger->get_myaddrs()),
2343 *quorum.begin());
2344 }
2345 }
2346
2347 void Monitor::_apply_compatset_features(CompatSet &new_features)
2348 {
2349 if (new_features.compare(features) != 0) {
2350 CompatSet diff = features.unsupported(new_features);
2351 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2352 features = new_features;
2353
2354 auto t = std::make_shared<MonitorDBStore::Transaction>();
2355 write_features(t);
2356 store->apply_transaction(t);
2357
2358 calc_quorum_requirements();
2359 }
2360 }
2361
2362 void Monitor::apply_quorum_to_compatset_features()
2363 {
2364 CompatSet new_features(features);
2365 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
2366 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2367 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2368 }
2369 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2370 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
2371 dout(5) << __func__ << dendl;
2372 _apply_compatset_features(new_features);
2373 }
2374
2375 void Monitor::apply_monmap_to_compatset_features()
2376 {
2377 CompatSet new_features(features);
2378 mon_feature_t monmap_features = monmap->get_required_features();
2379
2380 /* persistent monmap features may go into the compatset.
2381 * optional monmap features may not - why?
2382 * because optional monmap features may be set/unset by the admin,
2383 * and possibly by other means that haven't yet been thought out,
2384 * so we can't make the monitor enforce them on start - because they
2385 * may go away.
2386 * this, of course, does not invalidate setting a compatset feature
2387 * for an optional feature - as long as you make sure to clean it up
2388 * once you unset it.
2389 */
2390 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
2391 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2392 ceph::features::mon::FEATURE_KRAKEN));
2393 // this feature should only ever be set if the quorum supports it.
2394 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
2395 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2396 }
2397 if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
2398 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2399 ceph::features::mon::FEATURE_LUMINOUS));
2400 // this feature should only ever be set if the quorum supports it.
2401 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
2402 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
2403 }
2404 if (monmap_features.contains_all(ceph::features::mon::FEATURE_MIMIC)) {
2405 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2406 ceph::features::mon::FEATURE_MIMIC));
2407 // this feature should only ever be set if the quorum supports it.
2408 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_MIMIC));
2409 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
2410 }
2411 if (monmap_features.contains_all(ceph::features::mon::FEATURE_NAUTILUS)) {
2412 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2413 ceph::features::mon::FEATURE_NAUTILUS));
2414 // this feature should only ever be set if the quorum supports it.
2415 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_NAUTILUS));
2416 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
2417 }
2418 if (monmap_features.contains_all(ceph::features::mon::FEATURE_OCTOPUS)) {
2419 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2420 ceph::features::mon::FEATURE_OCTOPUS));
2421 // this feature should only ever be set if the quorum supports it.
2422 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_OCTOPUS));
2423 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS);
2424 }
2425
2426 dout(5) << __func__ << dendl;
2427 _apply_compatset_features(new_features);
2428 }
2429
2430 void Monitor::calc_quorum_requirements()
2431 {
2432 required_features = 0;
2433
2434 // compatset
2435 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2436 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2437 }
2438 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2439 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2440 }
2441 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
2442 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2443 }
2444 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_MIMIC)) {
2445 required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
2446 }
2447 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS)) {
2448 required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
2449 CEPH_FEATUREMASK_CEPHX_V2;
2450 }
2451 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS)) {
2452 required_features |= CEPH_FEATUREMASK_SERVER_OCTOPUS;
2453 }
2454
2455 // monmap
2456 if (monmap->get_required_features().contains_all(
2457 ceph::features::mon::FEATURE_KRAKEN)) {
2458 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2459 }
2460 if (monmap->get_required_features().contains_all(
2461 ceph::features::mon::FEATURE_LUMINOUS)) {
2462 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2463 }
2464 if (monmap->get_required_features().contains_all(
2465 ceph::features::mon::FEATURE_MIMIC)) {
2466 required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
2467 }
2468 if (monmap->get_required_features().contains_all(
2469 ceph::features::mon::FEATURE_NAUTILUS)) {
2470 required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
2471 CEPH_FEATUREMASK_CEPHX_V2;
2472 }
2473 dout(10) << __func__ << " required_features " << required_features << dendl;
2474 }
2475
2476 void Monitor::get_combined_feature_map(FeatureMap *fm)
2477 {
2478 *fm += session_map.feature_map;
2479 for (auto id : quorum) {
2480 if (id != rank) {
2481 *fm += quorum_feature_map[id];
2482 }
2483 }
2484 }
2485
2486 void Monitor::sync_force(Formatter *f)
2487 {
2488 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2489 sync_stash_critical_state(tx);
2490 tx->put("mon_sync", "force_sync", 1);
2491 store->apply_transaction(tx);
2492
2493 f->open_object_section("sync_force");
2494 f->dump_int("ret", 0);
2495 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2496 f->close_section(); // sync_force
2497 }
2498
2499 void Monitor::_quorum_status(Formatter *f, ostream& ss)
2500 {
2501 bool free_formatter = false;
2502
2503 if (!f) {
2504 // louzy/lazy hack: default to json if no formatter has been defined
2505 f = new JSONFormatter();
2506 free_formatter = true;
2507 }
2508 f->open_object_section("quorum_status");
2509 f->dump_int("election_epoch", get_epoch());
2510
2511 f->open_array_section("quorum");
2512 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2513 f->dump_int("mon", *p);
2514 f->close_section(); // quorum
2515
2516 list<string> quorum_names = get_quorum_names();
2517 f->open_array_section("quorum_names");
2518 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2519 f->dump_string("mon", *p);
2520 f->close_section(); // quorum_names
2521
2522 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
2523
2524 if (!quorum.empty()) {
2525 f->dump_int(
2526 "quorum_age",
2527 std::chrono::duration_cast<std::chrono::seconds>(
2528 mono_clock::now() - quorum_since).count());
2529 }
2530
2531 f->open_object_section("features");
2532 f->dump_stream("quorum_con") << quorum_con_features;
2533 quorum_mon_features.dump(f, "quorum_mon");
2534 f->close_section();
2535
2536 f->open_object_section("monmap");
2537 monmap->dump(f);
2538 f->close_section(); // monmap
2539
2540 f->close_section(); // quorum_status
2541 f->flush(ss);
2542 if (free_formatter)
2543 delete f;
2544 }
2545
2546 void Monitor::get_mon_status(Formatter *f)
2547 {
2548 f->open_object_section("mon_status");
2549 f->dump_string("name", name);
2550 f->dump_int("rank", rank);
2551 f->dump_string("state", get_state_name());
2552 f->dump_int("election_epoch", get_epoch());
2553
2554 f->open_array_section("quorum");
2555 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2556 f->dump_int("mon", *p);
2557 }
2558 f->close_section(); // quorum
2559
2560 if (!quorum.empty()) {
2561 f->dump_int(
2562 "quorum_age",
2563 std::chrono::duration_cast<std::chrono::seconds>(
2564 mono_clock::now() - quorum_since).count());
2565 }
2566
2567 f->open_object_section("features");
2568 f->dump_stream("required_con") << required_features;
2569 mon_feature_t req_mon_features = get_required_mon_features();
2570 req_mon_features.dump(f, "required_mon");
2571 f->dump_stream("quorum_con") << quorum_con_features;
2572 quorum_mon_features.dump(f, "quorum_mon");
2573 f->close_section(); // features
2574
2575 f->open_array_section("outside_quorum");
2576 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2577 f->dump_string("mon", *p);
2578 f->close_section(); // outside_quorum
2579
2580 f->open_array_section("extra_probe_peers");
2581 for (set<entity_addrvec_t>::iterator p = extra_probe_peers.begin();
2582 p != extra_probe_peers.end();
2583 ++p) {
2584 f->dump_object("peer", *p);
2585 }
2586 f->close_section(); // extra_probe_peers
2587
2588 f->open_array_section("sync_provider");
2589 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2590 p != sync_providers.end();
2591 ++p) {
2592 f->dump_unsigned("cookie", p->second.cookie);
2593 f->dump_object("addrs", p->second.addrs);
2594 f->dump_stream("timeout") << p->second.timeout;
2595 f->dump_unsigned("last_committed", p->second.last_committed);
2596 f->dump_stream("last_key") << p->second.last_key;
2597 }
2598 f->close_section();
2599
2600 if (is_synchronizing()) {
2601 f->open_object_section("sync");
2602 f->dump_stream("sync_provider") << sync_provider;
2603 f->dump_unsigned("sync_cookie", sync_cookie);
2604 f->dump_unsigned("sync_start_version", sync_start_version);
2605 f->close_section();
2606 }
2607
2608 if (g_conf()->mon_sync_provider_kill_at > 0)
2609 f->dump_int("provider_kill_at", g_conf()->mon_sync_provider_kill_at);
2610 if (g_conf()->mon_sync_requester_kill_at > 0)
2611 f->dump_int("requester_kill_at", g_conf()->mon_sync_requester_kill_at);
2612
2613 f->open_object_section("monmap");
2614 monmap->dump(f);
2615 f->close_section();
2616
2617 f->dump_object("feature_map", session_map.feature_map);
2618 f->close_section(); // mon_status
2619 }
2620
2621
2622 // health status to clog
2623
2624 void Monitor::health_tick_start()
2625 {
2626 if (!cct->_conf->mon_health_to_clog ||
2627 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2628 return;
2629
2630 dout(15) << __func__ << dendl;
2631
2632 health_tick_stop();
2633 health_tick_event = timer.add_event_after(
2634 cct->_conf->mon_health_to_clog_tick_interval,
2635 new C_MonContext{this, [this](int r) {
2636 if (r < 0)
2637 return;
2638 health_tick_start();
2639 }});
2640 }
2641
2642 void Monitor::health_tick_stop()
2643 {
2644 dout(15) << __func__ << dendl;
2645
2646 if (health_tick_event) {
2647 timer.cancel_event(health_tick_event);
2648 health_tick_event = NULL;
2649 }
2650 }
2651
2652 ceph::real_clock::time_point Monitor::health_interval_calc_next_update()
2653 {
2654 auto now = ceph::real_clock::now();
2655
2656 auto secs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch());
2657 int remainder = secs.count() % cct->_conf->mon_health_to_clog_interval;
2658 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
2659 auto next = secs + std::chrono::seconds(adjustment);
2660
2661 dout(20) << __func__
2662 << " now: " << now << ","
2663 << " next: " << next << ","
2664 << " interval: " << cct->_conf->mon_health_to_clog_interval
2665 << dendl;
2666
2667 return ceph::real_clock::time_point{next};
2668 }
2669
2670 void Monitor::health_interval_start()
2671 {
2672 dout(15) << __func__ << dendl;
2673
2674 if (!cct->_conf->mon_health_to_clog ||
2675 cct->_conf->mon_health_to_clog_interval <= 0) {
2676 return;
2677 }
2678
2679 health_interval_stop();
2680 auto next = health_interval_calc_next_update();
2681 health_interval_event = new C_MonContext{this, [this](int r) {
2682 if (r < 0)
2683 return;
2684 do_health_to_clog_interval();
2685 }};
2686 if (!timer.add_event_at(next, health_interval_event)) {
2687 health_interval_event = nullptr;
2688 }
2689 }
2690
2691 void Monitor::health_interval_stop()
2692 {
2693 dout(15) << __func__ << dendl;
2694 if (health_interval_event) {
2695 timer.cancel_event(health_interval_event);
2696 }
2697 health_interval_event = NULL;
2698 }
2699
2700 void Monitor::health_events_cleanup()
2701 {
2702 health_tick_stop();
2703 health_interval_stop();
2704 health_status_cache.reset();
2705 }
2706
2707 void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2708 {
2709 dout(20) << __func__ << dendl;
2710
2711 if (changed.count("mon_health_to_clog")) {
2712 if (!cct->_conf->mon_health_to_clog) {
2713 health_events_cleanup();
2714 return;
2715 } else {
2716 if (!health_tick_event) {
2717 health_tick_start();
2718 }
2719 if (!health_interval_event) {
2720 health_interval_start();
2721 }
2722 }
2723 }
2724
2725 if (changed.count("mon_health_to_clog_interval")) {
2726 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2727 health_interval_stop();
2728 } else {
2729 health_interval_start();
2730 }
2731 }
2732
2733 if (changed.count("mon_health_to_clog_tick_interval")) {
2734 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2735 health_tick_stop();
2736 } else {
2737 health_tick_start();
2738 }
2739 }
2740 }
2741
2742 void Monitor::do_health_to_clog_interval()
2743 {
2744 // outputting to clog may have been disabled in the conf
2745 // since we were scheduled.
2746 if (!cct->_conf->mon_health_to_clog ||
2747 cct->_conf->mon_health_to_clog_interval <= 0)
2748 return;
2749
2750 dout(10) << __func__ << dendl;
2751
2752 // do we have a cached value for next_clog_update? if not,
2753 // do we know when the last update was?
2754
2755 do_health_to_clog(true);
2756 health_interval_start();
2757 }
2758
2759 void Monitor::do_health_to_clog(bool force)
2760 {
2761 // outputting to clog may have been disabled in the conf
2762 // since we were scheduled.
2763 if (!cct->_conf->mon_health_to_clog ||
2764 cct->_conf->mon_health_to_clog_interval <= 0)
2765 return;
2766
2767 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2768
2769 string summary;
2770 health_status_t level = healthmon()->get_health_status(false, nullptr, &summary);
2771 if (!force &&
2772 summary == health_status_cache.summary &&
2773 level == health_status_cache.overall)
2774 return;
2775 clog->health(level) << "overall " << summary;
2776 health_status_cache.summary = summary;
2777 health_status_cache.overall = level;
2778 }
2779
2780 void Monitor::log_health(
2781 const health_check_map_t& updated,
2782 const health_check_map_t& previous,
2783 MonitorDBStore::TransactionRef t)
2784 {
2785 if (!g_conf()->mon_health_to_clog) {
2786 return;
2787 }
2788
2789 const utime_t now = ceph_clock_now();
2790
2791 // FIXME: log atomically as part of @t instead of using clog.
2792 dout(10) << __func__ << " updated " << updated.checks.size()
2793 << " previous " << previous.checks.size()
2794 << dendl;
2795 const auto min_log_period = g_conf().get_val<int64_t>(
2796 "mon_health_log_update_period");
2797 for (auto& p : updated.checks) {
2798 auto q = previous.checks.find(p.first);
2799 bool logged = false;
2800 if (q == previous.checks.end()) {
2801 // new
2802 ostringstream ss;
2803 ss << "Health check failed: " << p.second.summary << " ("
2804 << p.first << ")";
2805 clog->health(p.second.severity) << ss.str();
2806
2807 logged = true;
2808 } else {
2809 if (p.second.summary != q->second.summary ||
2810 p.second.severity != q->second.severity) {
2811
2812 auto status_iter = health_check_log_times.find(p.first);
2813 if (status_iter != health_check_log_times.end()) {
2814 if (p.second.severity == q->second.severity &&
2815 now - status_iter->second.updated_at < min_log_period) {
2816 // We already logged this recently and the severity is unchanged,
2817 // so skip emitting an update of the summary string.
2818 // We'll get an update out of tick() later if the check
2819 // is still failing.
2820 continue;
2821 }
2822 }
2823
2824 // summary or severity changed (ignore detail changes at this level)
2825 ostringstream ss;
2826 ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
2827 clog->health(p.second.severity) << ss.str();
2828
2829 logged = true;
2830 }
2831 }
2832 // Record the time at which we last logged, so that we can check this
2833 // when considering whether/when to print update messages.
2834 if (logged) {
2835 auto iter = health_check_log_times.find(p.first);
2836 if (iter == health_check_log_times.end()) {
2837 health_check_log_times.emplace(p.first, HealthCheckLogStatus(
2838 p.second.severity, p.second.summary, now));
2839 } else {
2840 iter->second = HealthCheckLogStatus(
2841 p.second.severity, p.second.summary, now);
2842 }
2843 }
2844 }
2845 for (auto& p : previous.checks) {
2846 if (!updated.checks.count(p.first)) {
2847 // cleared
2848 ostringstream ss;
2849 if (p.first == "DEGRADED_OBJECTS") {
2850 clog->info() << "All degraded objects recovered";
2851 } else if (p.first == "OSD_FLAGS") {
2852 clog->info() << "OSD flags cleared";
2853 } else {
2854 clog->info() << "Health check cleared: " << p.first << " (was: "
2855 << p.second.summary << ")";
2856 }
2857
2858 if (health_check_log_times.count(p.first)) {
2859 health_check_log_times.erase(p.first);
2860 }
2861 }
2862 }
2863
2864 if (previous.checks.size() && updated.checks.size() == 0) {
2865 // We might be going into a fully healthy state, check
2866 // other subsystems
2867 bool any_checks = false;
2868 for (auto& svc : paxos_service) {
2869 if (&(svc->get_health_checks()) == &(previous)) {
2870 // Ignore the ones we're clearing right now
2871 continue;
2872 }
2873
2874 if (svc->get_health_checks().checks.size() > 0) {
2875 any_checks = true;
2876 break;
2877 }
2878 }
2879 if (!any_checks) {
2880 clog->info() << "Cluster is now healthy";
2881 }
2882 }
2883 }
2884
2885 void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
2886 {
2887 if (f)
2888 f->open_object_section("status");
2889
2890 mono_clock::time_point now = mono_clock::now();
2891 if (f) {
2892 f->dump_stream("fsid") << monmap->get_fsid();
2893 healthmon()->get_health_status(false, f, nullptr);
2894 f->dump_unsigned("election_epoch", get_epoch());
2895 {
2896 f->open_array_section("quorum");
2897 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2898 f->dump_int("rank", *p);
2899 f->close_section();
2900 f->open_array_section("quorum_names");
2901 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2902 f->dump_string("id", monmap->get_name(*p));
2903 f->close_section();
2904 f->dump_int(
2905 "quorum_age",
2906 std::chrono::duration_cast<std::chrono::seconds>(
2907 mono_clock::now() - quorum_since).count());
2908 }
2909 f->open_object_section("monmap");
2910 monmap->dump_summary(f);
2911 f->close_section();
2912 f->open_object_section("osdmap");
2913 osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
2914 f->close_section();
2915 f->open_object_section("pgmap");
2916 mgrstatmon()->print_summary(f, NULL);
2917 f->close_section();
2918 f->open_object_section("fsmap");
2919 mdsmon()->get_fsmap().print_summary(f, NULL);
2920 f->close_section();
2921 f->open_object_section("mgrmap");
2922 mgrmon()->get_map().print_summary(f, nullptr);
2923 f->close_section();
2924
2925 f->dump_object("servicemap", mgrstatmon()->get_service_map());
2926
2927 f->open_object_section("progress_events");
2928 for (auto& i : mgrstatmon()->get_progress_events()) {
2929 f->dump_object(i.first.c_str(), i.second);
2930 }
2931 f->close_section();
2932
2933 f->close_section();
2934 } else {
2935 ss << " cluster:\n";
2936 ss << " id: " << monmap->get_fsid() << "\n";
2937
2938 string health;
2939 healthmon()->get_health_status(false, nullptr, &health,
2940 "\n ", "\n ");
2941 ss << " health: " << health << "\n";
2942
2943 ss << "\n \n services:\n";
2944 {
2945 size_t maxlen = 3;
2946 auto& service_map = mgrstatmon()->get_service_map();
2947 for (auto& p : service_map.services) {
2948 maxlen = std::max(maxlen, p.first.size());
2949 }
2950 string spacing(maxlen - 3, ' ');
2951 const auto quorum_names = get_quorum_names();
2952 const auto mon_count = monmap->mon_info.size();
2953 ss << " mon: " << spacing << mon_count << " daemons, quorum "
2954 << quorum_names << " (age " << timespan_str(now - quorum_since) << ")";
2955 if (quorum_names.size() != mon_count) {
2956 std::list<std::string> out_of_q;
2957 for (size_t i = 0; i < monmap->ranks.size(); ++i) {
2958 if (quorum.count(i) == 0) {
2959 out_of_q.push_back(monmap->ranks[i]);
2960 }
2961 }
2962 ss << ", out of quorum: " << joinify(out_of_q.begin(),
2963 out_of_q.end(), std::string(", "));
2964 }
2965 ss << "\n";
2966 if (mgrmon()->in_use()) {
2967 ss << " mgr: " << spacing;
2968 mgrmon()->get_map().print_summary(nullptr, &ss);
2969 ss << "\n";
2970 }
2971 if (mdsmon()->should_print_status()) {
2972 ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n";
2973 }
2974 ss << " osd: " << spacing;
2975 osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
2976 ss << "\n";
2977 for (auto& p : service_map.services) {
2978 const std::string &service = p.first;
2979 // filter out normal ceph entity types
2980 if (ServiceMap::is_normal_ceph_entity(service)) {
2981 continue;
2982 }
2983 ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
2984 << p.second.get_summary() << "\n";
2985 }
2986 }
2987
2988 {
2989 auto& service_map = mgrstatmon()->get_service_map();
2990 if (!service_map.services.empty()) {
2991 ss << "\n \n task status:\n";
2992 {
2993 for (auto &p : service_map.services) {
2994 ss << p.second.get_task_summary(p.first);
2995 }
2996 }
2997 }
2998 }
2999
3000 ss << "\n \n data:\n";
3001 mgrstatmon()->print_summary(NULL, &ss);
3002
3003 auto& pem = mgrstatmon()->get_progress_events();
3004 if (!pem.empty()) {
3005 ss << "\n \n progress:\n";
3006 for (auto& i : pem) {
3007 ss << " " << i.second.message << "\n";
3008 }
3009 }
3010 ss << "\n ";
3011 }
3012 }
3013
3014 void Monitor::_generate_command_map(cmdmap_t& cmdmap,
3015 map<string,string> &param_str_map)
3016 {
3017 for (auto p = cmdmap.begin(); p != cmdmap.end(); ++p) {
3018 if (p->first == "prefix")
3019 continue;
3020 if (p->first == "caps") {
3021 vector<string> cv;
3022 if (cmd_getval(cmdmap, "caps", cv) &&
3023 cv.size() % 2 == 0) {
3024 for (unsigned i = 0; i < cv.size(); i += 2) {
3025 string k = string("caps_") + cv[i];
3026 param_str_map[k] = cv[i + 1];
3027 }
3028 continue;
3029 }
3030 }
3031 param_str_map[p->first] = cmd_vartype_stringify(p->second);
3032 }
3033 }
3034
3035 const MonCommand *Monitor::_get_moncommand(
3036 const string &cmd_prefix,
3037 const vector<MonCommand>& cmds)
3038 {
3039 for (auto& c : cmds) {
3040 if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
3041 return &c;
3042 }
3043 }
3044 return nullptr;
3045 }
3046
3047 bool Monitor::_allowed_command(MonSession *s, const string &module,
3048 const string &prefix, const cmdmap_t& cmdmap,
3049 const map<string,string>& param_str_map,
3050 const MonCommand *this_cmd) {
3051
3052 bool cmd_r = this_cmd->requires_perm('r');
3053 bool cmd_w = this_cmd->requires_perm('w');
3054 bool cmd_x = this_cmd->requires_perm('x');
3055
3056 bool capable = s->caps.is_capable(
3057 g_ceph_context,
3058 s->entity_name,
3059 module, prefix, param_str_map,
3060 cmd_r, cmd_w, cmd_x,
3061 s->get_peer_socket_addr());
3062
3063 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
3064 return capable;
3065 }
3066
3067 void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
3068 Formatter *f,
3069 uint64_t features,
3070 bufferlist *rdata)
3071 {
3072 int cmdnum = 0;
3073 f->open_object_section("command_descriptions");
3074 for (const auto &cmd : commands) {
3075 unsigned flags = cmd.flags;
3076 ostringstream secname;
3077 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
3078 dump_cmddesc_to_json(f, features, secname.str(),
3079 cmd.cmdstring, cmd.helpstring, cmd.module,
3080 cmd.req_perms, flags);
3081 cmdnum++;
3082 }
3083 f->close_section(); // command_descriptions
3084
3085 f->flush(*rdata);
3086 }
3087
3088 bool Monitor::is_keyring_required()
3089 {
3090 return auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX) ||
3091 auth_service_required.is_supported_auth(CEPH_AUTH_CEPHX) ||
3092 auth_cluster_required.is_supported_auth(CEPH_AUTH_GSS) ||
3093 auth_service_required.is_supported_auth(CEPH_AUTH_GSS);
3094 }
3095
3096 struct C_MgrProxyCommand : public Context {
3097 Monitor *mon;
3098 MonOpRequestRef op;
3099 uint64_t size;
3100 bufferlist outbl;
3101 string outs;
3102 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
3103 : mon(mon), op(op), size(s) { }
3104 void finish(int r) {
3105 std::lock_guard l(mon->lock);
3106 mon->mgr_proxy_bytes -= size;
3107 mon->reply_command(op, r, outs, outbl, 0);
3108 }
3109 };
3110
3111 void Monitor::handle_tell_command(MonOpRequestRef op)
3112 {
3113 ceph_assert(op->is_type_command());
3114 MCommand *m = static_cast<MCommand*>(op->get_req());
3115 if (m->fsid != monmap->fsid) {
3116 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
3117 return reply_tell_command(op, -EACCES, "wrong fsid");
3118 }
3119 MonSession *session = op->get_session();
3120 if (!session) {
3121 dout(5) << __func__ << " dropping stray message " << *m << dendl;
3122 return;
3123 }
3124 cmdmap_t cmdmap;
3125 if (stringstream ss; !cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3126 return reply_tell_command(op, -EINVAL, ss.str());
3127 }
3128 map<string,string> param_str_map;
3129 _generate_command_map(cmdmap, param_str_map);
3130 string prefix;
3131 if (!cmd_getval(cmdmap, "prefix", prefix)) {
3132 return reply_tell_command(op, -EINVAL, "no prefix");
3133 }
3134 if (auto cmd = _get_moncommand(prefix,
3135 get_local_commands(quorum_mon_features));
3136 cmd) {
3137 if (cmd->is_obsolete() ||
3138 (cct->_conf->mon_debug_deprecated_as_obsolete &&
3139 cmd->is_deprecated())) {
3140 return reply_tell_command(op, -ENOTSUP,
3141 "command is obsolete; "
3142 "please check usage and/or man page");
3143 }
3144 }
3145 // see if command is whitelisted
3146 if (!session->caps.is_capable(
3147 g_ceph_context,
3148 session->entity_name,
3149 "mon", prefix, param_str_map,
3150 true, true, true,
3151 session->get_peer_socket_addr())) {
3152 return reply_tell_command(op, -EACCES, "insufficient caps");
3153 }
3154 // pass it to asok
3155 cct->get_admin_socket()->queue_tell_command(m);
3156 }
3157
3158 void Monitor::handle_command(MonOpRequestRef op)
3159 {
3160 ceph_assert(op->is_type_command());
3161 auto m = op->get_req<MMonCommand>();
3162 if (m->fsid != monmap->fsid) {
3163 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid
3164 << dendl;
3165 reply_command(op, -EPERM, "wrong fsid", 0);
3166 return;
3167 }
3168
3169 MonSession *session = op->get_session();
3170 if (!session) {
3171 dout(5) << __func__ << " dropping stray message " << *m << dendl;
3172 return;
3173 }
3174
3175 if (m->cmd.empty()) {
3176 reply_command(op, -EINVAL, "no command specified", 0);
3177 return;
3178 }
3179
3180 string prefix;
3181 vector<string> fullcmd;
3182 cmdmap_t cmdmap;
3183 stringstream ss, ds;
3184 bufferlist rdata;
3185 string rs;
3186 int r = -EINVAL;
3187 rs = "unrecognized command";
3188
3189 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3190 // ss has reason for failure
3191 r = -EINVAL;
3192 rs = ss.str();
3193 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3194 reply_command(op, r, rs, 0);
3195 return;
3196 }
3197
3198 // check return value. If no prefix parameter provided,
3199 // return value will be false, then return error info.
3200 if (!cmd_getval(cmdmap, "prefix", prefix)) {
3201 reply_command(op, -EINVAL, "command prefix not found", 0);
3202 return;
3203 }
3204
3205 // check prefix is empty
3206 if (prefix.empty()) {
3207 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
3208 return;
3209 }
3210
3211 if (prefix == "get_command_descriptions") {
3212 bufferlist rdata;
3213 Formatter *f = Formatter::create("json");
3214
3215 std::vector<MonCommand> commands = static_cast<MgrMonitor*>(
3216 paxos_service[PAXOS_MGR].get())->get_command_descs();
3217
3218 for (auto& c : leader_mon_commands) {
3219 commands.push_back(c);
3220 }
3221
3222 auto features = m->get_connection()->get_features();
3223 format_command_descriptions(commands, f, features, &rdata);
3224 delete f;
3225 reply_command(op, 0, "", rdata, 0);
3226 return;
3227 }
3228
3229 string module;
3230 string err;
3231
3232 dout(0) << "handle_command " << *m << dendl;
3233
3234 string format;
3235 cmd_getval(cmdmap, "format", format, string("plain"));
3236 boost::scoped_ptr<Formatter> f(Formatter::create(format));
3237
3238 get_str_vec(prefix, fullcmd);
3239
3240 // make sure fullcmd is not empty.
3241 // invalid prefix will cause empty vector fullcmd.
3242 // such as, prefix=";,,;"
3243 if (fullcmd.empty()) {
3244 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3245 return;
3246 }
3247
3248 module = fullcmd[0];
3249
3250 // validate command is in leader map
3251
3252 const MonCommand *leader_cmd;
3253 const auto& mgr_cmds = mgrmon()->get_command_descs();
3254 const MonCommand *mgr_cmd = nullptr;
3255 if (!mgr_cmds.empty()) {
3256 mgr_cmd = _get_moncommand(prefix, mgr_cmds);
3257 }
3258 leader_cmd = _get_moncommand(prefix, leader_mon_commands);
3259 if (!leader_cmd) {
3260 leader_cmd = mgr_cmd;
3261 if (!leader_cmd) {
3262 reply_command(op, -EINVAL, "command not known", 0);
3263 return;
3264 }
3265 }
3266 // validate command is in our map & matches, or forward if it is allowed
3267 const MonCommand *mon_cmd = _get_moncommand(
3268 prefix,
3269 get_local_commands(quorum_mon_features));
3270 if (!mon_cmd) {
3271 mon_cmd = mgr_cmd;
3272 }
3273 if (!is_leader()) {
3274 if (!mon_cmd) {
3275 if (leader_cmd->is_noforward()) {
3276 reply_command(op, -EINVAL,
3277 "command not locally supported and not allowed to forward",
3278 0);
3279 return;
3280 }
3281 dout(10) << "Command not locally supported, forwarding request "
3282 << m << dendl;
3283 forward_request_leader(op);
3284 return;
3285 } else if (!mon_cmd->is_compat(leader_cmd)) {
3286 if (mon_cmd->is_noforward()) {
3287 reply_command(op, -EINVAL,
3288 "command not compatible with leader and not allowed to forward",
3289 0);
3290 return;
3291 }
3292 dout(10) << "Command not compatible with leader, forwarding request "
3293 << m << dendl;
3294 forward_request_leader(op);
3295 return;
3296 }
3297 }
3298
3299 if (mon_cmd->is_obsolete() ||
3300 (cct->_conf->mon_debug_deprecated_as_obsolete
3301 && mon_cmd->is_deprecated())) {
3302 reply_command(op, -ENOTSUP,
3303 "command is obsolete; please check usage and/or man page",
3304 0);
3305 return;
3306 }
3307
3308 if (session->proxy_con && mon_cmd->is_noforward()) {
3309 dout(10) << "Got forward for noforward command " << m << dendl;
3310 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3311 return;
3312 }
3313
3314 /* what we perceive as being the service the command falls under */
3315 string service(mon_cmd->module);
3316
3317 dout(25) << __func__ << " prefix='" << prefix
3318 << "' module='" << module
3319 << "' service='" << service << "'" << dendl;
3320
3321 bool cmd_is_rw =
3322 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3323
3324 // validate user's permissions for requested command
3325 map<string,string> param_str_map;
3326 _generate_command_map(cmdmap, param_str_map);
3327 if (!_allowed_command(session, service, prefix, cmdmap,
3328 param_str_map, mon_cmd)) {
3329 dout(1) << __func__ << " access denied" << dendl;
3330 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3331 << "from='" << session->name << " " << session->addrs << "' "
3332 << "entity='" << session->entity_name << "' "
3333 << "cmd=" << m->cmd << ": access denied";
3334 reply_command(op, -EACCES, "access denied", 0);
3335 return;
3336 }
3337
3338 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3339 << "from='" << session->name << " " << session->addrs << "' "
3340 << "entity='" << session->entity_name << "' "
3341 << "cmd=" << m->cmd << ": dispatch";
3342
3343 // compat kludge for legacy clients trying to tell commands that are
3344 // new. see bottom of MonCommands.h. we need to handle both (1)
3345 // pre-octopus clients and (2) octopus clients with a mix of pre-octopus
3346 // and octopus mons.
3347 if ((!HAVE_FEATURE(m->get_connection()->get_features(), SERVER_OCTOPUS) ||
3348 monmap->min_mon_release < ceph_release_t::octopus) &&
3349 (prefix == "injectargs" ||
3350 prefix == "smart" ||
3351 prefix == "mon_status" ||
3352 prefix == "heap")) {
3353 if (m->get_connection()->get_messenger() == 0) {
3354 // Prior to octopus, monitors might forward these messages
3355 // around. that was broken at baseline, and if we try to process
3356 // this message now, it will assert out when we try to send a
3357 // message in reply from the asok/tell worker (see
3358 // AnonConnection). Just reply with an error.
3359 dout(5) << __func__ << " failing forwarded command from a (presumably) "
3360 << "pre-octopus peer" << dendl;
3361 reply_command(
3362 op, -EBUSY,
3363 "failing forwarded tell command in mixed-version mon cluster", 0);
3364 return;
3365 }
3366 dout(5) << __func__ << " passing command to tell/asok" << dendl;
3367 cct->get_admin_socket()->queue_tell_command(m);
3368 return;
3369 }
3370
3371 if (mon_cmd->is_mgr()) {
3372 const auto& hdr = m->get_header();
3373 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
3374 uint64_t max = g_conf().get_val<Option::size_t>("mon_client_bytes")
3375 * g_conf().get_val<double>("mon_mgr_proxy_client_bytes_ratio");
3376 if (mgr_proxy_bytes + size > max) {
3377 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3378 << " + " << size << " > max " << max << dendl;
3379 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3380 return;
3381 }
3382 mgr_proxy_bytes += size;
3383 dout(10) << __func__ << " proxying mgr command (+" << size
3384 << " -> " << mgr_proxy_bytes << ")" << dendl;
3385 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3386 mgr_client.start_command(m->cmd,
3387 m->get_data(),
3388 &fin->outbl,
3389 &fin->outs,
3390 new C_OnFinisher(fin, &finisher));
3391 return;
3392 }
3393
3394 if ((module == "mds" || module == "fs") &&
3395 prefix != "fs authorize") {
3396 mdsmon()->dispatch(op);
3397 return;
3398 }
3399 if ((module == "osd" ||
3400 prefix == "pg map" ||
3401 prefix == "pg repeer") &&
3402 prefix != "osd last-stat-seq") {
3403 osdmon()->dispatch(op);
3404 return;
3405 }
3406 if (module == "config") {
3407 configmon()->dispatch(op);
3408 return;
3409 }
3410
3411 if (module == "mon" &&
3412 /* Let the Monitor class handle the following commands:
3413 * 'mon scrub'
3414 */
3415 prefix != "mon scrub" &&
3416 prefix != "mon metadata" &&
3417 prefix != "mon versions" &&
3418 prefix != "mon count-metadata" &&
3419 prefix != "mon ok-to-stop" &&
3420 prefix != "mon ok-to-add-offline" &&
3421 prefix != "mon ok-to-rm") {
3422 monmon()->dispatch(op);
3423 return;
3424 }
3425 if (module == "health" && prefix != "health") {
3426 healthmon()->dispatch(op);
3427 return;
3428 }
3429 if (module == "auth" || prefix == "fs authorize") {
3430 authmon()->dispatch(op);
3431 return;
3432 }
3433 if (module == "log") {
3434 logmon()->dispatch(op);
3435 return;
3436 }
3437
3438 if (module == "config-key") {
3439 config_key_service->dispatch(op);
3440 return;
3441 }
3442
3443 if (module == "mgr") {
3444 mgrmon()->dispatch(op);
3445 return;
3446 }
3447
3448 if (prefix == "fsid") {
3449 if (f) {
3450 f->open_object_section("fsid");
3451 f->dump_stream("fsid") << monmap->fsid;
3452 f->close_section();
3453 f->flush(rdata);
3454 } else {
3455 ds << monmap->fsid;
3456 rdata.append(ds);
3457 }
3458 reply_command(op, 0, "", rdata, 0);
3459 return;
3460 }
3461
3462 if (prefix == "mon scrub") {
3463 wait_for_paxos_write();
3464 if (is_leader()) {
3465 int r = scrub_start();
3466 reply_command(op, r, "", rdata, 0);
3467 } else if (is_peon()) {
3468 forward_request_leader(op);
3469 } else {
3470 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3471 }
3472 return;
3473 }
3474
3475 if (prefix == "time-sync-status") {
3476 if (!f)
3477 f.reset(Formatter::create("json-pretty"));
3478 f->open_object_section("time_sync");
3479 if (!timecheck_skews.empty()) {
3480 f->open_object_section("time_skew_status");
3481 for (auto& i : timecheck_skews) {
3482 double skew = i.second;
3483 double latency = timecheck_latencies[i.first];
3484 string name = monmap->get_name(i.first);
3485 ostringstream tcss;
3486 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3487 f->open_object_section(name.c_str());
3488 f->dump_float("skew", skew);
3489 f->dump_float("latency", latency);
3490 f->dump_stream("health") << tcstatus;
3491 if (tcstatus != HEALTH_OK) {
3492 f->dump_stream("details") << tcss.str();
3493 }
3494 f->close_section();
3495 }
3496 f->close_section();
3497 }
3498 f->open_object_section("timechecks");
3499 f->dump_unsigned("epoch", get_epoch());
3500 f->dump_int("round", timecheck_round);
3501 f->dump_stream("round_status") << ((timecheck_round%2) ?
3502 "on-going" : "finished");
3503 f->close_section();
3504 f->close_section();
3505 f->flush(rdata);
3506 r = 0;
3507 rs = "";
3508 } else if (prefix == "status" ||
3509 prefix == "health" ||
3510 prefix == "df") {
3511 string detail;
3512 cmd_getval(cmdmap, "detail", detail);
3513
3514 if (prefix == "status") {
3515 // get_cluster_status handles f == NULL
3516 get_cluster_status(ds, f.get());
3517
3518 if (f) {
3519 f->flush(ds);
3520 ds << '\n';
3521 }
3522 rdata.append(ds);
3523 } else if (prefix == "health") {
3524 string plain;
3525 healthmon()->get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
3526 if (f) {
3527 f->flush(rdata);
3528 } else {
3529 rdata.append(plain);
3530 }
3531 } else if (prefix == "df") {
3532 bool verbose = (detail == "detail");
3533 if (f)
3534 f->open_object_section("stats");
3535
3536 mgrstatmon()->dump_cluster_stats(&ds, f.get(), verbose);
3537 if (!f) {
3538 ds << "\n \n";
3539 }
3540 mgrstatmon()->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
3541
3542 if (f) {
3543 f->close_section();
3544 f->flush(ds);
3545 ds << '\n';
3546 }
3547 } else {
3548 ceph_abort_msg("We should never get here!");
3549 return;
3550 }
3551 rdata.append(ds);
3552 rs = "";
3553 r = 0;
3554 } else if (prefix == "report") {
3555
3556 // this must be formatted, in its current form
3557 if (!f)
3558 f.reset(Formatter::create("json-pretty"));
3559 f->open_object_section("report");
3560 f->dump_stream("cluster_fingerprint") << fingerprint;
3561 f->dump_string("version", ceph_version_to_str());
3562 f->dump_string("commit", git_version_to_str());
3563 f->dump_stream("timestamp") << ceph_clock_now();
3564
3565 vector<string> tagsvec;
3566 cmd_getval(cmdmap, "tags", tagsvec);
3567 string tagstr = str_join(tagsvec, " ");
3568 if (!tagstr.empty())
3569 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3570 f->dump_string("tag", tagstr);
3571
3572 healthmon()->get_health_status(true, f.get(), nullptr);
3573
3574 monmon()->dump_info(f.get());
3575 osdmon()->dump_info(f.get());
3576 mdsmon()->dump_info(f.get());
3577 authmon()->dump_info(f.get());
3578 mgrstatmon()->dump_info(f.get());
3579
3580 paxos->dump_info(f.get());
3581
3582 f->close_section();
3583 f->flush(rdata);
3584
3585 ostringstream ss2;
3586 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT_LEGACY);
3587 rs = ss2.str();
3588 r = 0;
3589 } else if (prefix == "osd last-stat-seq") {
3590 int64_t osd;
3591 cmd_getval(cmdmap, "id", osd);
3592 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3593 if (f) {
3594 f->dump_unsigned("seq", seq);
3595 f->flush(ds);
3596 } else {
3597 ds << seq;
3598 rdata.append(ds);
3599 }
3600 rs = "";
3601 r = 0;
3602 } else if (prefix == "node ls") {
3603 string node_type("all");
3604 cmd_getval(cmdmap, "type", node_type);
3605 if (!f)
3606 f.reset(Formatter::create("json-pretty"));
3607 if (node_type == "all") {
3608 f->open_object_section("nodes");
3609 print_nodes(f.get(), ds);
3610 osdmon()->print_nodes(f.get());
3611 mdsmon()->print_nodes(f.get());
3612 mgrmon()->print_nodes(f.get());
3613 f->close_section();
3614 } else if (node_type == "mon") {
3615 print_nodes(f.get(), ds);
3616 } else if (node_type == "osd") {
3617 osdmon()->print_nodes(f.get());
3618 } else if (node_type == "mds") {
3619 mdsmon()->print_nodes(f.get());
3620 } else if (node_type == "mgr") {
3621 mgrmon()->print_nodes(f.get());
3622 }
3623 f->flush(ds);
3624 rdata.append(ds);
3625 rs = "";
3626 r = 0;
3627 } else if (prefix == "features") {
3628 if (!is_leader() && !is_peon()) {
3629 dout(10) << " waiting for quorum" << dendl;
3630 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3631 return;
3632 }
3633 if (!is_leader()) {
3634 forward_request_leader(op);
3635 return;
3636 }
3637 if (!f)
3638 f.reset(Formatter::create("json-pretty"));
3639 FeatureMap fm;
3640 get_combined_feature_map(&fm);
3641 f->dump_object("features", fm);
3642 f->flush(rdata);
3643 rs = "";
3644 r = 0;
3645 } else if (prefix == "mon metadata") {
3646 if (!f)
3647 f.reset(Formatter::create("json-pretty"));
3648
3649 string name;
3650 bool all = !cmd_getval(cmdmap, "id", name);
3651 if (!all) {
3652 // Dump a single mon's metadata
3653 int mon = monmap->get_rank(name);
3654 if (mon < 0) {
3655 rs = "requested mon not found";
3656 r = -ENOENT;
3657 goto out;
3658 }
3659 f->open_object_section("mon_metadata");
3660 r = get_mon_metadata(mon, f.get(), ds);
3661 f->close_section();
3662 } else {
3663 // Dump all mons' metadata
3664 r = 0;
3665 f->open_array_section("mon_metadata");
3666 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3667 std::ostringstream get_err;
3668 f->open_object_section("mon");
3669 f->dump_string("name", monmap->get_name(rank));
3670 r = get_mon_metadata(rank, f.get(), get_err);
3671 f->close_section();
3672 if (r == -ENOENT || r == -EINVAL) {
3673 dout(1) << get_err.str() << dendl;
3674 // Drop error, list what metadata we do have
3675 r = 0;
3676 } else if (r != 0) {
3677 derr << "Unexpected error from get_mon_metadata: "
3678 << cpp_strerror(r) << dendl;
3679 ds << get_err.str();
3680 break;
3681 }
3682 }
3683 f->close_section();
3684 }
3685
3686 f->flush(ds);
3687 rdata.append(ds);
3688 rs = "";
3689 } else if (prefix == "mon versions") {
3690 if (!f)
3691 f.reset(Formatter::create("json-pretty"));
3692 count_metadata("ceph_version", f.get());
3693 f->flush(ds);
3694 rdata.append(ds);
3695 rs = "";
3696 r = 0;
3697 } else if (prefix == "mon count-metadata") {
3698 if (!f)
3699 f.reset(Formatter::create("json-pretty"));
3700 string field;
3701 cmd_getval(cmdmap, "property", field);
3702 count_metadata(field, f.get());
3703 f->flush(ds);
3704 rdata.append(ds);
3705 rs = "";
3706 r = 0;
3707 } else if (prefix == "quorum_status") {
3708 // make sure our map is readable and up to date
3709 if (!is_leader() && !is_peon()) {
3710 dout(10) << " waiting for quorum" << dendl;
3711 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3712 return;
3713 }
3714 _quorum_status(f.get(), ds);
3715 rdata.append(ds);
3716 rs = "";
3717 r = 0;
3718 } else if (prefix == "mon ok-to-stop") {
3719 vector<string> ids;
3720 if (!cmd_getval(cmdmap, "ids", ids)) {
3721 r = -EINVAL;
3722 goto out;
3723 }
3724 set<string> wouldbe;
3725 for (auto rank : quorum) {
3726 wouldbe.insert(monmap->get_name(rank));
3727 }
3728 for (auto& n : ids) {
3729 if (monmap->contains(n)) {
3730 wouldbe.erase(n);
3731 }
3732 }
3733 if (wouldbe.size() < monmap->min_quorum_size()) {
3734 r = -EBUSY;
3735 rs = "not enough monitors would be available (" + stringify(wouldbe) +
3736 ") after stopping mons " + stringify(ids);
3737 goto out;
3738 }
3739 r = 0;
3740 rs = "quorum should be preserved (" + stringify(wouldbe) +
3741 ") after stopping " + stringify(ids);
3742 } else if (prefix == "mon ok-to-add-offline") {
3743 if (quorum.size() < monmap->min_quorum_size(monmap->size() + 1)) {
3744 rs = "adding a monitor may break quorum (until that monitor starts)";
3745 r = -EBUSY;
3746 goto out;
3747 }
3748 rs = "adding another mon that is not yet online will not break quorum";
3749 r = 0;
3750 } else if (prefix == "mon ok-to-rm") {
3751 string id;
3752 if (!cmd_getval(cmdmap, "id", id)) {
3753 r = -EINVAL;
3754 rs = "must specify a monitor id";
3755 goto out;
3756 }
3757 if (!monmap->contains(id)) {
3758 r = 0;
3759 rs = "mon." + id + " does not exist";
3760 goto out;
3761 }
3762 int rank = monmap->get_rank(id);
3763 if (quorum.count(rank) &&
3764 quorum.size() - 1 < monmap->min_quorum_size(monmap->size() - 1)) {
3765 r = -EBUSY;
3766 rs = "removing mon." + id + " would break quorum";
3767 goto out;
3768 }
3769 r = 0;
3770 rs = "safe to remove mon." + id;
3771 } else if (prefix == "version") {
3772 if (f) {
3773 f->open_object_section("version");
3774 f->dump_string("version", pretty_version_to_str());
3775 f->close_section();
3776 f->flush(ds);
3777 } else {
3778 ds << pretty_version_to_str();
3779 }
3780 rdata.append(ds);
3781 rs = "";
3782 r = 0;
3783 } else if (prefix == "versions") {
3784 if (!f)
3785 f.reset(Formatter::create("json-pretty"));
3786 map<string,int> overall;
3787 f->open_object_section("version");
3788 map<string,int> mon, mgr, osd, mds;
3789
3790 count_metadata("ceph_version", &mon);
3791 f->open_object_section("mon");
3792 for (auto& p : mon) {
3793 f->dump_int(p.first.c_str(), p.second);
3794 overall[p.first] += p.second;
3795 }
3796 f->close_section();
3797
3798 mgrmon()->count_metadata("ceph_version", &mgr);
3799 f->open_object_section("mgr");
3800 for (auto& p : mgr) {
3801 f->dump_int(p.first.c_str(), p.second);
3802 overall[p.first] += p.second;
3803 }
3804 f->close_section();
3805
3806 osdmon()->count_metadata("ceph_version", &osd);
3807 f->open_object_section("osd");
3808 for (auto& p : osd) {
3809 f->dump_int(p.first.c_str(), p.second);
3810 overall[p.first] += p.second;
3811 }
3812 f->close_section();
3813
3814 mdsmon()->count_metadata("ceph_version", &mds);
3815 f->open_object_section("mds");
3816 for (auto& p : mds) {
3817 f->dump_int(p.first.c_str(), p.second);
3818 overall[p.first] += p.second;
3819 }
3820 f->close_section();
3821
3822 for (auto& p : mgrstatmon()->get_service_map().services) {
3823 auto &service = p.first;
3824 if (ServiceMap::is_normal_ceph_entity(service)) {
3825 continue;
3826 }
3827 f->open_object_section(service.c_str());
3828 map<string,int> m;
3829 p.second.count_metadata("ceph_version", &m);
3830 for (auto& q : m) {
3831 f->dump_int(q.first.c_str(), q.second);
3832 overall[q.first] += q.second;
3833 }
3834 f->close_section();
3835 }
3836
3837 f->open_object_section("overall");
3838 for (auto& p : overall) {
3839 f->dump_int(p.first.c_str(), p.second);
3840 }
3841 f->close_section();
3842 f->close_section();
3843 f->flush(rdata);
3844 rs = "";
3845 r = 0;
3846 }
3847
3848 out:
3849 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3850 reply_command(op, r, rs, rdata, 0);
3851 }
3852
3853 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3854 {
3855 bufferlist rdata;
3856 reply_command(op, rc, rs, rdata, version);
3857 }
3858
3859 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3860 bufferlist& rdata, version_t version)
3861 {
3862 auto m = op->get_req<MMonCommand>();
3863 ceph_assert(m->get_type() == MSG_MON_COMMAND);
3864 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
3865 reply->set_tid(m->get_tid());
3866 reply->set_data(rdata);
3867 send_reply(op, reply);
3868 }
3869
3870 void Monitor::reply_tell_command(
3871 MonOpRequestRef op, int rc, const string &rs)
3872 {
3873 MCommand *m = static_cast<MCommand*>(op->get_req());
3874 ceph_assert(m->get_type() == MSG_COMMAND);
3875 MCommandReply *reply = new MCommandReply(rc, rs);
3876 reply->set_tid(m->get_tid());
3877 m->get_connection()->send_message(reply);
3878 }
3879
3880
3881 // ------------------------
3882 // request/reply routing
3883 //
3884 // a client/mds/osd will connect to a random monitor. we need to forward any
3885 // messages requiring state updates to the leader, and then route any replies
3886 // back via the correct monitor and back to them. (the monitor will not
3887 // initiate any connections.)
3888
3889 void Monitor::forward_request_leader(MonOpRequestRef op)
3890 {
3891 op->mark_event(__func__);
3892
3893 int mon = get_leader();
3894 MonSession *session = op->get_session();
3895 PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
3896
3897 if (req->get_source().is_mon() && req->get_source_addrs() != messenger->get_myaddrs()) {
3898 dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
3899 } else if (session->proxy_con) {
3900 dout(10) << "forward_request won't double fwd request " << *req << dendl;
3901 } else if (!session->closed) {
3902 RoutedRequest *rr = new RoutedRequest;
3903 rr->tid = ++routed_request_tid;
3904 rr->con = req->get_connection();
3905 rr->con_features = rr->con->get_features();
3906 encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
3907 rr->session = static_cast<MonSession *>(session->get());
3908 rr->op = op;
3909 routed_requests[rr->tid] = rr;
3910 session->routed_request_tids.insert(rr->tid);
3911
3912 dout(10) << "forward_request " << rr->tid << " request " << *req
3913 << " features " << rr->con_features << dendl;
3914
3915 MForward *forward = new MForward(rr->tid,
3916 req,
3917 rr->con_features,
3918 rr->session->caps);
3919 forward->set_priority(req->get_priority());
3920 if (session->auth_handler) {
3921 forward->entity_name = session->entity_name;
3922 } else if (req->get_source().is_mon()) {
3923 forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
3924 }
3925 send_mon_message(forward, mon);
3926 op->mark_forwarded();
3927 ceph_assert(op->get_req()->get_type() != 0);
3928 } else {
3929 dout(10) << "forward_request no session for request " << *req << dendl;
3930 }
3931 }
3932
3933 // fake connection attached to forwarded messages
3934 struct AnonConnection : public Connection {
3935 entity_addr_t socket_addr;
3936
3937 int send_message(Message *m) override {
3938 ceph_assert(!"send_message on anonymous connection");
3939 }
3940 void send_keepalive() override {
3941 ceph_assert(!"send_keepalive on anonymous connection");
3942 }
3943 void mark_down() override {
3944 // silently ignore
3945 }
3946 void mark_disposable() override {
3947 // silengtly ignore
3948 }
3949 bool is_connected() override { return false; }
3950 entity_addr_t get_peer_socket_addr() const override {
3951 return socket_addr;
3952 }
3953
3954 private:
3955 FRIEND_MAKE_REF(AnonConnection);
3956 explicit AnonConnection(CephContext *cct, const entity_addr_t& sa)
3957 : Connection(cct, nullptr),
3958 socket_addr(sa) {}
3959 };
3960
3961 //extract the original message and put it into the regular dispatch function
3962 void Monitor::handle_forward(MonOpRequestRef op)
3963 {
3964 auto m = op->get_req<MForward>();
3965 dout(10) << "received forwarded message from "
3966 << ceph_entity_type_name(m->client_type)
3967 << " " << m->client_addrs
3968 << " via " << m->get_source_inst() << dendl;
3969 MonSession *session = op->get_session();
3970 ceph_assert(session);
3971
3972 if (!session->is_capable("mon", MON_CAP_X)) {
3973 dout(0) << "forward from entity with insufficient caps! "
3974 << session->caps << dendl;
3975 } else {
3976 // see PaxosService::dispatch(); we rely on this being anon
3977 // (c->msgr == NULL)
3978 PaxosServiceMessage *req = m->claim_message();
3979 ceph_assert(req != NULL);
3980
3981 auto c = ceph::make_ref<AnonConnection>(cct, m->client_socket_addr);
3982 MonSession *s = new MonSession(static_cast<Connection*>(c.get()));
3983 s->_ident(req->get_source(),
3984 req->get_source_addrs());
3985 c->set_priv(RefCountedPtr{s, false});
3986 c->set_peer_addrs(m->client_addrs);
3987 c->set_peer_type(m->client_type);
3988 c->set_features(m->con_features);
3989
3990 s->authenticated = true;
3991 s->caps = m->client_caps;
3992 dout(10) << " caps are " << s->caps << dendl;
3993 s->entity_name = m->entity_name;
3994 dout(10) << " entity name '" << s->entity_name << "' type "
3995 << s->entity_name.get_type() << dendl;
3996 s->proxy_con = m->get_connection();
3997 s->proxy_tid = m->tid;
3998
3999 req->set_connection(c);
4000
4001 // not super accurate, but better than nothing.
4002 req->set_recv_stamp(m->get_recv_stamp());
4003
4004 /*
4005 * note which election epoch this is; we will drop the message if
4006 * there is a future election since our peers will resend routed
4007 * requests in that case.
4008 */
4009 req->rx_election_epoch = get_epoch();
4010
4011 dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
4012 _ms_dispatch(req);
4013
4014 // break the session <-> con ref loop by removing the con->session
4015 // reference, which is no longer needed once the MonOpRequest is
4016 // set up.
4017 c->set_priv(NULL);
4018 }
4019 }
4020
4021 void Monitor::send_reply(MonOpRequestRef op, Message *reply)
4022 {
4023 op->mark_event(__func__);
4024
4025 MonSession *session = op->get_session();
4026 ceph_assert(session);
4027 Message *req = op->get_req();
4028 ConnectionRef con = op->get_connection();
4029
4030 reply->set_cct(g_ceph_context);
4031 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
4032
4033 if (!con) {
4034 dout(2) << "send_reply no connection, dropping reply " << *reply
4035 << " to " << req << " " << *req << dendl;
4036 reply->put();
4037 op->mark_event("reply: no connection");
4038 return;
4039 }
4040
4041 if (!session->con && !session->proxy_con) {
4042 dout(2) << "send_reply no connection, dropping reply " << *reply
4043 << " to " << req << " " << *req << dendl;
4044 reply->put();
4045 op->mark_event("reply: no connection");
4046 return;
4047 }
4048
4049 if (session->proxy_con) {
4050 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
4051 << " via " << session->proxy_con->get_peer_addr()
4052 << " for request " << *req << dendl;
4053 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
4054 op->mark_event("reply: send routed request");
4055 } else {
4056 session->con->send_message(reply);
4057 op->mark_event("reply: send");
4058 }
4059 }
4060
4061 void Monitor::no_reply(MonOpRequestRef op)
4062 {
4063 MonSession *session = op->get_session();
4064 Message *req = op->get_req();
4065
4066 if (session->proxy_con) {
4067 dout(10) << "no_reply to " << req->get_source_inst()
4068 << " via " << session->proxy_con->get_peer_addr()
4069 << " for request " << *req << dendl;
4070 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
4071 op->mark_event("no_reply: send routed request");
4072 } else {
4073 dout(10) << "no_reply to " << req->get_source_inst()
4074 << " " << *req << dendl;
4075 op->mark_event("no_reply");
4076 }
4077 }
4078
4079 void Monitor::handle_route(MonOpRequestRef op)
4080 {
4081 auto m = op->get_req<MRoute>();
4082 MonSession *session = op->get_session();
4083 //check privileges
4084 if (!session->is_capable("mon", MON_CAP_X)) {
4085 dout(0) << "MRoute received from entity without appropriate perms! "
4086 << dendl;
4087 return;
4088 }
4089 if (m->msg)
4090 dout(10) << "handle_route tid " << m->session_mon_tid << " " << *m->msg
4091 << dendl;
4092 else
4093 dout(10) << "handle_route tid " << m->session_mon_tid << " null" << dendl;
4094
4095 // look it up
4096 if (m->session_mon_tid) {
4097 if (routed_requests.count(m->session_mon_tid)) {
4098 RoutedRequest *rr = routed_requests[m->session_mon_tid];
4099
4100 // reset payload, in case encoding is dependent on target features
4101 if (m->msg) {
4102 m->msg->clear_payload();
4103 rr->con->send_message(m->msg);
4104 m->msg = NULL;
4105 }
4106 if (m->send_osdmap_first) {
4107 dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
4108 osdmon()->send_incremental(m->send_osdmap_first, rr->session,
4109 true, MonOpRequestRef());
4110 }
4111 ceph_assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
4112 routed_requests.erase(m->session_mon_tid);
4113 rr->session->routed_request_tids.erase(m->session_mon_tid);
4114 delete rr;
4115 } else {
4116 dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
4117 }
4118 } else {
4119 dout(10) << " not a routed request, ignoring" << dendl;
4120 }
4121 }
4122
4123 void Monitor::resend_routed_requests()
4124 {
4125 dout(10) << "resend_routed_requests" << dendl;
4126 int mon = get_leader();
4127 list<Context*> retry;
4128 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
4129 p != routed_requests.end();
4130 ++p) {
4131 RoutedRequest *rr = p->second;
4132
4133 if (mon == rank) {
4134 dout(10) << " requeue for self tid " << rr->tid << dendl;
4135 rr->op->mark_event("retry routed request");
4136 retry.push_back(new C_RetryMessage(this, rr->op));
4137 if (rr->session) {
4138 ceph_assert(rr->session->routed_request_tids.count(p->first));
4139 rr->session->routed_request_tids.erase(p->first);
4140 }
4141 delete rr;
4142 } else {
4143 auto q = rr->request_bl.cbegin();
4144 PaxosServiceMessage *req =
4145 (PaxosServiceMessage *)decode_message(cct, 0, q);
4146 rr->op->mark_event("resend forwarded message to leader");
4147 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req
4148 << dendl;
4149 MForward *forward = new MForward(rr->tid,
4150 req,
4151 rr->con_features,
4152 rr->session->caps);
4153 req->put(); // forward takes its own ref; drop ours.
4154 forward->client_type = rr->con->get_peer_type();
4155 forward->client_addrs = rr->con->get_peer_addrs();
4156 forward->client_socket_addr = rr->con->get_peer_socket_addr();
4157 forward->set_priority(req->get_priority());
4158 send_mon_message(forward, mon);
4159 }
4160 }
4161 if (mon == rank) {
4162 routed_requests.clear();
4163 finish_contexts(g_ceph_context, retry);
4164 }
4165 }
4166
4167 void Monitor::remove_session(MonSession *s)
4168 {
4169 dout(10) << "remove_session " << s << " " << s->name << " " << s->addrs
4170 << " features 0x" << std::hex << s->con_features << std::dec << dendl;
4171 ceph_assert(s->con);
4172 ceph_assert(!s->closed);
4173 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
4174 p != s->routed_request_tids.end();
4175 ++p) {
4176 ceph_assert(routed_requests.count(*p));
4177 RoutedRequest *rr = routed_requests[*p];
4178 dout(10) << " dropping routed request " << rr->tid << dendl;
4179 delete rr;
4180 routed_requests.erase(*p);
4181 }
4182 s->routed_request_tids.clear();
4183 s->con->set_priv(nullptr);
4184 session_map.remove_session(s);
4185 logger->set(l_mon_num_sessions, session_map.get_size());
4186 logger->inc(l_mon_session_rm);
4187 }
4188
4189 void Monitor::remove_all_sessions()
4190 {
4191 std::lock_guard l(session_map_lock);
4192 while (!session_map.sessions.empty()) {
4193 MonSession *s = session_map.sessions.front();
4194 remove_session(s);
4195 logger->inc(l_mon_session_rm);
4196 }
4197 if (logger)
4198 logger->set(l_mon_num_sessions, session_map.get_size());
4199 }
4200
4201 void Monitor::send_mon_message(Message *m, int rank)
4202 {
4203 messenger->send_to_mon(m, monmap->get_addrs(rank));
4204 }
4205
4206 void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
4207 {
4208 /**
4209 * Wait list the new session until we're in the quorum, assuming it's
4210 * sufficiently new.
4211 * tick() will periodically send them back through so we can send
4212 * the client elsewhere if we don't think we're getting back in.
4213 *
4214 * But we whitelist a few sorts of messages:
4215 * 1) Monitors can talk to us at any time, of course.
4216 * 2) auth messages. It's unlikely to go through much faster, but
4217 * it's possible we've just lost our quorum status and we want to take...
4218 * 3) command messages. We want to accept these under all possible
4219 * circumstances.
4220 */
4221 Message *m = op->get_req();
4222 MonSession *s = op->get_session();
4223 ConnectionRef con = op->get_connection();
4224 utime_t too_old = ceph_clock_now();
4225 too_old -= g_ceph_context->_conf->mon_lease;
4226 if (m->get_recv_stamp() > too_old &&
4227 con->is_connected()) {
4228 dout(5) << "waitlisting message " << *m << dendl;
4229 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
4230 op->mark_wait_for_quorum();
4231 } else {
4232 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
4233 con->mark_down();
4234 // proxied sessions aren't registered and don't have a con; don't remove
4235 // those.
4236 if (!s->proxy_con) {
4237 std::lock_guard l(session_map_lock);
4238 remove_session(s);
4239 }
4240 op->mark_zap();
4241 }
4242 }
4243
4244 void Monitor::_ms_dispatch(Message *m)
4245 {
4246 if (is_shutdown()) {
4247 m->put();
4248 return;
4249 }
4250
4251 MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
4252 bool src_is_mon = op->is_src_mon();
4253 op->mark_event("mon:_ms_dispatch");
4254 MonSession *s = op->get_session();
4255 if (s && s->closed) {
4256 return;
4257 }
4258
4259 if (src_is_mon && s) {
4260 ConnectionRef con = m->get_connection();
4261 if (con->get_messenger() && con->get_features() != s->con_features) {
4262 // only update features if this is a non-anonymous connection
4263 dout(10) << __func__ << " feature change for " << m->get_source_inst()
4264 << " (was " << s->con_features
4265 << ", now " << con->get_features() << ")" << dendl;
4266 // connection features changed - recreate session.
4267 if (s->con && s->con != con) {
4268 dout(10) << __func__ << " connection for " << m->get_source_inst()
4269 << " changed from session; mark down and replace" << dendl;
4270 s->con->mark_down();
4271 }
4272 if (s->item.is_on_list()) {
4273 // forwarded messages' sessions are not in the sessions map and
4274 // exist only while the op is being handled.
4275 std::lock_guard l(session_map_lock);
4276 remove_session(s);
4277 }
4278 s = nullptr;
4279 }
4280 }
4281
4282 if (!s) {
4283 // if the sender is not a monitor, make sure their first message for a
4284 // session is an MAuth. If it is not, assume it's a stray message,
4285 // and considering that we are creating a new session it is safe to
4286 // assume that the sender hasn't authenticated yet, so we have no way
4287 // of assessing whether we should handle it or not.
4288 if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
4289 m->get_type() != CEPH_MSG_MON_GET_MAP &&
4290 m->get_type() != CEPH_MSG_PING)) {
4291 dout(1) << __func__ << " dropping stray message " << *m
4292 << " from " << m->get_source_inst() << dendl;
4293 return;
4294 }
4295
4296 ConnectionRef con = m->get_connection();
4297 {
4298 std::lock_guard l(session_map_lock);
4299 s = session_map.new_session(m->get_source(),
4300 m->get_source_addrs(),
4301 con.get());
4302 }
4303 ceph_assert(s);
4304 con->set_priv(RefCountedPtr{s, false});
4305 dout(10) << __func__ << " new session " << s << " " << *s
4306 << " features 0x" << std::hex
4307 << s->con_features << std::dec << dendl;
4308 op->set_session(s);
4309
4310 logger->set(l_mon_num_sessions, session_map.get_size());
4311 logger->inc(l_mon_session_add);
4312
4313 if (src_is_mon) {
4314 // give it monitor caps; the peer type has been authenticated
4315 dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
4316 if (!s->caps.is_allow_all()) // but no need to repeatedly copy
4317 s->caps = mon_caps;
4318 s->authenticated = true;
4319 }
4320 } else {
4321 dout(20) << __func__ << " existing session " << s << " for " << s->name
4322 << dendl;
4323 }
4324
4325 ceph_assert(s);
4326
4327 s->session_timeout = ceph_clock_now();
4328 s->session_timeout += g_conf()->mon_session_timeout;
4329
4330 if (s->auth_handler) {
4331 s->entity_name = s->auth_handler->get_entity_name();
4332 }
4333 dout(20) << " entity " << s->entity_name
4334 << " caps " << s->caps.get_str() << dendl;
4335
4336 if ((is_synchronizing() ||
4337 (!s->authenticated && !exited_quorum.is_zero())) &&
4338 !src_is_mon &&
4339 m->get_type() != CEPH_MSG_PING) {
4340 waitlist_or_zap_client(op);
4341 } else {
4342 dispatch_op(op);
4343 }
4344 return;
4345 }
4346
4347 void Monitor::dispatch_op(MonOpRequestRef op)
4348 {
4349 op->mark_event("mon:dispatch_op");
4350 MonSession *s = op->get_session();
4351 ceph_assert(s);
4352 if (s->closed) {
4353 dout(10) << " session closed, dropping " << op->get_req() << dendl;
4354 return;
4355 }
4356
4357 /* we will consider the default type as being 'monitor' until proven wrong */
4358 op->set_type_monitor();
4359 /* deal with all messages that do not necessarily need caps */
4360 switch (op->get_req()->get_type()) {
4361 // auth
4362 case MSG_MON_GLOBAL_ID:
4363 case CEPH_MSG_AUTH:
4364 op->set_type_service();
4365 /* no need to check caps here */
4366 paxos_service[PAXOS_AUTH]->dispatch(op);
4367 return;
4368
4369 case CEPH_MSG_PING:
4370 handle_ping(op);
4371 return;
4372 case MSG_COMMAND:
4373 op->set_type_command();
4374 handle_tell_command(op);
4375 return;
4376 }
4377
4378 if (!op->get_session()->authenticated) {
4379 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4380 << " is not authenticated, dropping " << *(op->get_req())
4381 << dendl;
4382 return;
4383 }
4384
4385 switch (op->get_req()->get_type()) {
4386 case CEPH_MSG_MON_GET_MAP:
4387 handle_mon_get_map(op);
4388 return;
4389
4390 case MSG_GET_CONFIG:
4391 configmon()->handle_get_config(op);
4392 return;
4393
4394 case CEPH_MSG_MON_METADATA:
4395 return handle_mon_metadata(op);
4396
4397 case CEPH_MSG_MON_SUBSCRIBE:
4398 /* FIXME: check what's being subscribed, filter accordingly */
4399 handle_subscribe(op);
4400 return;
4401 }
4402
4403 /* well, maybe the op belongs to a service... */
4404 op->set_type_service();
4405 /* deal with all messages which caps should be checked somewhere else */
4406 switch (op->get_req()->get_type()) {
4407
4408 // OSDs
4409 case CEPH_MSG_MON_GET_OSDMAP:
4410 case CEPH_MSG_POOLOP:
4411 case MSG_OSD_BEACON:
4412 case MSG_OSD_MARK_ME_DOWN:
4413 case MSG_OSD_MARK_ME_DEAD:
4414 case MSG_OSD_FULL:
4415 case MSG_OSD_FAILURE:
4416 case MSG_OSD_BOOT:
4417 case MSG_OSD_ALIVE:
4418 case MSG_OSD_PGTEMP:
4419 case MSG_OSD_PG_CREATED:
4420 case MSG_REMOVE_SNAPS:
4421 case MSG_MON_GET_PURGED_SNAPS:
4422 case MSG_OSD_PG_READY_TO_MERGE:
4423 paxos_service[PAXOS_OSDMAP]->dispatch(op);
4424 return;
4425
4426 // MDSs
4427 case MSG_MDS_BEACON:
4428 case MSG_MDS_OFFLOAD_TARGETS:
4429 paxos_service[PAXOS_MDSMAP]->dispatch(op);
4430 return;
4431
4432 // Mgrs
4433 case MSG_MGR_BEACON:
4434 paxos_service[PAXOS_MGR]->dispatch(op);
4435 return;
4436
4437 // MgrStat
4438 case MSG_MON_MGR_REPORT:
4439 case CEPH_MSG_STATFS:
4440 case MSG_GETPOOLSTATS:
4441 paxos_service[PAXOS_MGRSTAT]->dispatch(op);
4442 return;
4443
4444 // log
4445 case MSG_LOG:
4446 paxos_service[PAXOS_LOG]->dispatch(op);
4447 return;
4448
4449 // handle_command() does its own caps checking
4450 case MSG_MON_COMMAND:
4451 op->set_type_command();
4452 handle_command(op);
4453 return;
4454 }
4455
4456 /* nop, looks like it's not a service message; revert back to monitor */
4457 op->set_type_monitor();
4458
4459 /* messages we, the Monitor class, need to deal with
4460 * but may be sent by clients. */
4461
4462 if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
4463 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4464 << " not enough caps for " << *(op->get_req()) << " -- dropping"
4465 << dendl;
4466 return;
4467 }
4468
4469 switch (op->get_req()->get_type()) {
4470 // misc
4471 case CEPH_MSG_MON_GET_VERSION:
4472 handle_get_version(op);
4473 return;
4474 }
4475
4476 if (!op->is_src_mon()) {
4477 dout(1) << __func__ << " unexpected monitor message from"
4478 << " non-monitor entity " << op->get_req()->get_source_inst()
4479 << " " << *(op->get_req()) << " -- dropping" << dendl;
4480 return;
4481 }
4482
4483 /* messages that should only be sent by another monitor */
4484 switch (op->get_req()->get_type()) {
4485
4486 case MSG_ROUTE:
4487 handle_route(op);
4488 return;
4489
4490 case MSG_MON_PROBE:
4491 handle_probe(op);
4492 return;
4493
4494 // Sync (i.e., the new slurp, but on steroids)
4495 case MSG_MON_SYNC:
4496 handle_sync(op);
4497 return;
4498 case MSG_MON_SCRUB:
4499 handle_scrub(op);
4500 return;
4501
4502 /* log acks are sent from a monitor we sent the MLog to, and are
4503 never sent by clients to us. */
4504 case MSG_LOGACK:
4505 log_client.handle_log_ack((MLogAck*)op->get_req());
4506 return;
4507
4508 // monmap
4509 case MSG_MON_JOIN:
4510 op->set_type_service();
4511 paxos_service[PAXOS_MONMAP]->dispatch(op);
4512 return;
4513
4514 // paxos
4515 case MSG_MON_PAXOS:
4516 {
4517 op->set_type_paxos();
4518 auto pm = op->get_req<MMonPaxos>();
4519 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4520 //can't send these!
4521 return;
4522 }
4523
4524 if (state == STATE_SYNCHRONIZING) {
4525 // we are synchronizing. These messages would do us no
4526 // good, thus just drop them and ignore them.
4527 dout(10) << __func__ << " ignore paxos msg from "
4528 << pm->get_source_inst() << dendl;
4529 return;
4530 }
4531
4532 // sanitize
4533 if (pm->epoch > get_epoch()) {
4534 bootstrap();
4535 return;
4536 }
4537 if (pm->epoch != get_epoch()) {
4538 return;
4539 }
4540
4541 paxos->dispatch(op);
4542 }
4543 return;
4544
4545 // elector messages
4546 case MSG_MON_ELECTION:
4547 op->set_type_election();
4548 //check privileges here for simplicity
4549 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4550 dout(0) << "MMonElection received from entity without enough caps!"
4551 << op->get_session()->caps << dendl;
4552 return;;
4553 }
4554 if (!is_probing() && !is_synchronizing()) {
4555 elector.dispatch(op);
4556 }
4557 return;
4558
4559 case MSG_FORWARD:
4560 handle_forward(op);
4561 return;
4562
4563 case MSG_TIMECHECK:
4564 dout(5) << __func__ << " ignoring " << op << dendl;
4565 return;
4566 case MSG_TIMECHECK2:
4567 handle_timecheck(op);
4568 return;
4569
4570 case MSG_MON_HEALTH:
4571 dout(5) << __func__ << " dropping deprecated message: "
4572 << *op->get_req() << dendl;
4573 break;
4574 case MSG_MON_HEALTH_CHECKS:
4575 op->set_type_service();
4576 paxos_service[PAXOS_HEALTH]->dispatch(op);
4577 return;
4578 }
4579 dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
4580 return;
4581 }
4582
4583 void Monitor::handle_ping(MonOpRequestRef op)
4584 {
4585 auto m = op->get_req<MPing>();
4586 dout(10) << __func__ << " " << *m << dendl;
4587 MPing *reply = new MPing;
4588 bufferlist payload;
4589 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4590 f->open_object_section("pong");
4591
4592 healthmon()->get_health_status(false, f.get(), nullptr);
4593 get_mon_status(f.get());
4594
4595 f->close_section();
4596 stringstream ss;
4597 f->flush(ss);
4598 encode(ss.str(), payload);
4599 reply->set_payload(payload);
4600 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
4601 m->get_connection()->send_message(reply);
4602 }
4603
4604 void Monitor::timecheck_start()
4605 {
4606 dout(10) << __func__ << dendl;
4607 timecheck_cleanup();
4608 if (get_quorum_mon_features().contains_all(
4609 ceph::features::mon::FEATURE_NAUTILUS)) {
4610 timecheck_start_round();
4611 }
4612 }
4613
4614 void Monitor::timecheck_finish()
4615 {
4616 dout(10) << __func__ << dendl;
4617 timecheck_cleanup();
4618 }
4619
4620 void Monitor::timecheck_start_round()
4621 {
4622 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4623 ceph_assert(is_leader());
4624
4625 if (monmap->size() == 1) {
4626 ceph_abort_msg("We are alone; this shouldn't have been scheduled!");
4627 return;
4628 }
4629
4630 if (timecheck_round % 2) {
4631 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4632 utime_t curr_time = ceph_clock_now();
4633 double max = g_conf()->mon_timecheck_interval*3;
4634 if (curr_time - timecheck_round_start < max) {
4635 dout(10) << __func__ << " keep current round going" << dendl;
4636 goto out;
4637 } else {
4638 dout(10) << __func__
4639 << " finish current timecheck and start new" << dendl;
4640 timecheck_cancel_round();
4641 }
4642 }
4643
4644 ceph_assert(timecheck_round % 2 == 0);
4645 timecheck_acks = 0;
4646 timecheck_round ++;
4647 timecheck_round_start = ceph_clock_now();
4648 dout(10) << __func__ << " new " << timecheck_round << dendl;
4649
4650 timecheck();
4651 out:
4652 dout(10) << __func__ << " setting up next event" << dendl;
4653 timecheck_reset_event();
4654 }
4655
4656 void Monitor::timecheck_finish_round(bool success)
4657 {
4658 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4659 ceph_assert(timecheck_round % 2);
4660 timecheck_round ++;
4661 timecheck_round_start = utime_t();
4662
4663 if (success) {
4664 ceph_assert(timecheck_waiting.empty());
4665 ceph_assert(timecheck_acks == quorum.size());
4666 timecheck_report();
4667 timecheck_check_skews();
4668 return;
4669 }
4670
4671 dout(10) << __func__ << " " << timecheck_waiting.size()
4672 << " peers still waiting:";
4673 for (auto& p : timecheck_waiting) {
4674 *_dout << " mon." << p.first;
4675 }
4676 *_dout << dendl;
4677 timecheck_waiting.clear();
4678
4679 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4680 }
4681
4682 void Monitor::timecheck_cancel_round()
4683 {
4684 timecheck_finish_round(false);
4685 }
4686
4687 void Monitor::timecheck_cleanup()
4688 {
4689 timecheck_round = 0;
4690 timecheck_acks = 0;
4691 timecheck_round_start = utime_t();
4692
4693 if (timecheck_event) {
4694 timer.cancel_event(timecheck_event);
4695 timecheck_event = NULL;
4696 }
4697 timecheck_waiting.clear();
4698 timecheck_skews.clear();
4699 timecheck_latencies.clear();
4700
4701 timecheck_rounds_since_clean = 0;
4702 }
4703
4704 void Monitor::timecheck_reset_event()
4705 {
4706 if (timecheck_event) {
4707 timer.cancel_event(timecheck_event);
4708 timecheck_event = NULL;
4709 }
4710
4711 double delay =
4712 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4713
4714 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4715 delay = cct->_conf->mon_timecheck_interval;
4716 }
4717
4718 dout(10) << __func__ << " delay " << delay
4719 << " rounds_since_clean " << timecheck_rounds_since_clean
4720 << dendl;
4721
4722 timecheck_event = timer.add_event_after(
4723 delay,
4724 new C_MonContext{this, [this](int) {
4725 timecheck_start_round();
4726 }});
4727 }
4728
4729 void Monitor::timecheck_check_skews()
4730 {
4731 dout(10) << __func__ << dendl;
4732 ceph_assert(is_leader());
4733 ceph_assert((timecheck_round % 2) == 0);
4734 if (monmap->size() == 1) {
4735 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4736 return;
4737 }
4738 ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
4739
4740 bool found_skew = false;
4741 for (auto& p : timecheck_skews) {
4742 double abs_skew;
4743 if (timecheck_has_skew(p.second, &abs_skew)) {
4744 dout(10) << __func__
4745 << " " << p.first << " skew " << abs_skew << dendl;
4746 found_skew = true;
4747 }
4748 }
4749
4750 if (found_skew) {
4751 ++timecheck_rounds_since_clean;
4752 timecheck_reset_event();
4753 } else if (timecheck_rounds_since_clean > 0) {
4754 dout(1) << __func__
4755 << " no clock skews found after " << timecheck_rounds_since_clean
4756 << " rounds" << dendl;
4757 // make sure the skews are really gone and not just a transient success
4758 // this will run just once if not in the presence of skews again.
4759 timecheck_rounds_since_clean = 1;
4760 timecheck_reset_event();
4761 timecheck_rounds_since_clean = 0;
4762 }
4763
4764 }
4765
4766 void Monitor::timecheck_report()
4767 {
4768 dout(10) << __func__ << dendl;
4769 ceph_assert(is_leader());
4770 ceph_assert((timecheck_round % 2) == 0);
4771 if (monmap->size() == 1) {
4772 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4773 return;
4774 }
4775
4776 ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
4777 bool do_output = true; // only output report once
4778 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4779 if (monmap->get_name(*q) == name)
4780 continue;
4781
4782 MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_REPORT);
4783 m->epoch = get_epoch();
4784 m->round = timecheck_round;
4785
4786 for (auto& it : timecheck_skews) {
4787 double skew = it.second;
4788 double latency = timecheck_latencies[it.first];
4789
4790 m->skews[it.first] = skew;
4791 m->latencies[it.first] = latency;
4792
4793 if (do_output) {
4794 dout(25) << __func__ << " mon." << it.first
4795 << " latency " << latency
4796 << " skew " << skew << dendl;
4797 }
4798 }
4799 do_output = false;
4800 dout(10) << __func__ << " send report to mon." << *q << dendl;
4801 send_mon_message(m, *q);
4802 }
4803 }
4804
4805 void Monitor::timecheck()
4806 {
4807 dout(10) << __func__ << dendl;
4808 ceph_assert(is_leader());
4809 if (monmap->size() == 1) {
4810 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4811 return;
4812 }
4813 ceph_assert(timecheck_round % 2 != 0);
4814
4815 timecheck_acks = 1; // we ack ourselves
4816
4817 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4818 << " round " << timecheck_round << dendl;
4819
4820 // we are at the eye of the storm; the point of reference
4821 timecheck_skews[rank] = 0.0;
4822 timecheck_latencies[rank] = 0.0;
4823
4824 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4825 if (monmap->get_name(*it) == name)
4826 continue;
4827
4828 utime_t curr_time = ceph_clock_now();
4829 timecheck_waiting[*it] = curr_time;
4830 MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_PING);
4831 m->epoch = get_epoch();
4832 m->round = timecheck_round;
4833 dout(10) << __func__ << " send " << *m << " to mon." << *it << dendl;
4834 send_mon_message(m, *it);
4835 }
4836 }
4837
4838 health_status_t Monitor::timecheck_status(ostringstream &ss,
4839 const double skew_bound,
4840 const double latency)
4841 {
4842 health_status_t status = HEALTH_OK;
4843 ceph_assert(latency >= 0);
4844
4845 double abs_skew;
4846 if (timecheck_has_skew(skew_bound, &abs_skew)) {
4847 status = HEALTH_WARN;
4848 ss << "clock skew " << abs_skew << "s"
4849 << " > max " << g_conf()->mon_clock_drift_allowed << "s";
4850 }
4851
4852 return status;
4853 }
4854
4855 void Monitor::handle_timecheck_leader(MonOpRequestRef op)
4856 {
4857 auto m = op->get_req<MTimeCheck2>();
4858 dout(10) << __func__ << " " << *m << dendl;
4859 /* handles PONG's */
4860 ceph_assert(m->op == MTimeCheck2::OP_PONG);
4861
4862 int other = m->get_source().num();
4863 if (m->epoch < get_epoch()) {
4864 dout(1) << __func__ << " got old timecheck epoch " << m->epoch
4865 << " from " << other
4866 << " curr " << get_epoch()
4867 << " -- severely lagged? discard" << dendl;
4868 return;
4869 }
4870 ceph_assert(m->epoch == get_epoch());
4871
4872 if (m->round < timecheck_round) {
4873 dout(1) << __func__ << " got old round " << m->round
4874 << " from " << other
4875 << " curr " << timecheck_round << " -- discard" << dendl;
4876 return;
4877 }
4878
4879 utime_t curr_time = ceph_clock_now();
4880
4881 ceph_assert(timecheck_waiting.count(other) > 0);
4882 utime_t timecheck_sent = timecheck_waiting[other];
4883 timecheck_waiting.erase(other);
4884 if (curr_time < timecheck_sent) {
4885 // our clock was readjusted -- drop everything until it all makes sense.
4886 dout(1) << __func__ << " our clock was readjusted --"
4887 << " bump round and drop current check"
4888 << dendl;
4889 timecheck_cancel_round();
4890 return;
4891 }
4892
4893 /* update peer latencies */
4894 double latency = (double)(curr_time - timecheck_sent);
4895
4896 if (timecheck_latencies.count(other) == 0)
4897 timecheck_latencies[other] = latency;
4898 else {
4899 double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
4900 timecheck_latencies[other] = avg_latency;
4901 }
4902
4903 /*
4904 * update skews
4905 *
4906 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4907 * and 'a' happens to be lower than 'b'; so we use double instead.
4908 *
4909 * latency is always expected to be >= 0.
4910 *
4911 * delta, the difference between theirs timestamp and ours, may either be
4912 * lower or higher than 0; will hardly ever be 0.
4913 *
4914 * The absolute skew is the absolute delta minus the latency, which is
4915 * taken as a whole instead of an rtt given that there is some queueing
4916 * and dispatch times involved and it's hard to assess how long exactly
4917 * it took for the message to travel to the other side and be handled. So
4918 * we call it a bounded skew, the worst case scenario.
4919 *
4920 * Now, to math!
4921 *
4922 * Given that the latency is always positive, we can establish that the
4923 * bounded skew will be:
4924 *
4925 * 1. positive if the absolute delta is higher than the latency and
4926 * delta is positive
4927 * 2. negative if the absolute delta is higher than the latency and
4928 * delta is negative.
4929 * 3. zero if the absolute delta is lower than the latency.
4930 *
4931 * On 3. we make a judgement call and treat the skew as non-existent.
4932 * This is because that, if the absolute delta is lower than the
4933 * latency, then the apparently existing skew is nothing more than a
4934 * side-effect of the high latency at work.
4935 *
4936 * This may not be entirely true though, as a severely skewed clock
4937 * may be masked by an even higher latency, but with high latencies
4938 * we probably have worse issues to deal with than just skewed clocks.
4939 */
4940 ceph_assert(latency >= 0);
4941
4942 double delta = ((double) m->timestamp) - ((double) curr_time);
4943 double abs_delta = (delta > 0 ? delta : -delta);
4944 double skew_bound = abs_delta - latency;
4945 if (skew_bound < 0)
4946 skew_bound = 0;
4947 else if (delta < 0)
4948 skew_bound = -skew_bound;
4949
4950 ostringstream ss;
4951 health_status_t status = timecheck_status(ss, skew_bound, latency);
4952 if (status != HEALTH_OK) {
4953 clog->health(status) << other << " " << ss.str();
4954 }
4955
4956 dout(10) << __func__ << " from " << other << " ts " << m->timestamp
4957 << " delta " << delta << " skew_bound " << skew_bound
4958 << " latency " << latency << dendl;
4959
4960 timecheck_skews[other] = skew_bound;
4961
4962 timecheck_acks++;
4963 if (timecheck_acks == quorum.size()) {
4964 dout(10) << __func__ << " got pongs from everybody ("
4965 << timecheck_acks << " total)" << dendl;
4966 ceph_assert(timecheck_skews.size() == timecheck_acks);
4967 ceph_assert(timecheck_waiting.empty());
4968 // everyone has acked, so bump the round to finish it.
4969 timecheck_finish_round();
4970 }
4971 }
4972
4973 void Monitor::handle_timecheck_peon(MonOpRequestRef op)
4974 {
4975 auto m = op->get_req<MTimeCheck2>();
4976 dout(10) << __func__ << " " << *m << dendl;
4977
4978 ceph_assert(is_peon());
4979 ceph_assert(m->op == MTimeCheck2::OP_PING || m->op == MTimeCheck2::OP_REPORT);
4980
4981 if (m->epoch != get_epoch()) {
4982 dout(1) << __func__ << " got wrong epoch "
4983 << "(ours " << get_epoch()
4984 << " theirs: " << m->epoch << ") -- discarding" << dendl;
4985 return;
4986 }
4987
4988 if (m->round < timecheck_round) {
4989 dout(1) << __func__ << " got old round " << m->round
4990 << " current " << timecheck_round
4991 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
4992 return;
4993 }
4994
4995 timecheck_round = m->round;
4996
4997 if (m->op == MTimeCheck2::OP_REPORT) {
4998 ceph_assert((timecheck_round % 2) == 0);
4999 timecheck_latencies.swap(m->latencies);
5000 timecheck_skews.swap(m->skews);
5001 return;
5002 }
5003
5004 ceph_assert((timecheck_round % 2) != 0);
5005 MTimeCheck2 *reply = new MTimeCheck2(MTimeCheck2::OP_PONG);
5006 utime_t curr_time = ceph_clock_now();
5007 reply->timestamp = curr_time;
5008 reply->epoch = m->epoch;
5009 reply->round = m->round;
5010 dout(10) << __func__ << " send " << *m
5011 << " to " << m->get_source_inst() << dendl;
5012 m->get_connection()->send_message(reply);
5013 }
5014
5015 void Monitor::handle_timecheck(MonOpRequestRef op)
5016 {
5017 auto m = op->get_req<MTimeCheck2>();
5018 dout(10) << __func__ << " " << *m << dendl;
5019
5020 if (is_leader()) {
5021 if (m->op != MTimeCheck2::OP_PONG) {
5022 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
5023 } else {
5024 handle_timecheck_leader(op);
5025 }
5026 } else if (is_peon()) {
5027 if (m->op != MTimeCheck2::OP_PING && m->op != MTimeCheck2::OP_REPORT) {
5028 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
5029 } else {
5030 handle_timecheck_peon(op);
5031 }
5032 } else {
5033 dout(1) << __func__ << " drop unexpected msg" << dendl;
5034 }
5035 }
5036
5037 void Monitor::handle_subscribe(MonOpRequestRef op)
5038 {
5039 auto m = op->get_req<MMonSubscribe>();
5040 dout(10) << "handle_subscribe " << *m << dendl;
5041
5042 bool reply = false;
5043
5044 MonSession *s = op->get_session();
5045 ceph_assert(s);
5046
5047 if (m->hostname.size()) {
5048 s->remote_host = m->hostname;
5049 }
5050
5051 for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
5052 p != m->what.end();
5053 ++p) {
5054 if (p->first == "monmap" || p->first == "config") {
5055 // these require no caps
5056 } else if (!s->is_capable("mon", MON_CAP_R)) {
5057 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
5058 << " not enough caps for " << *(op->get_req()) << " -- dropping"
5059 << dendl;
5060 continue;
5061 }
5062
5063 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
5064 if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
5065 reply = true;
5066
5067 // remove conflicting subscribes
5068 if (logmon()->sub_name_to_id(p->first) >= 0) {
5069 for (map<string, Subscription*>::iterator it = s->sub_map.begin();
5070 it != s->sub_map.end(); ) {
5071 if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
5072 std::lock_guard l(session_map_lock);
5073 session_map.remove_sub((it++)->second);
5074 } else {
5075 ++it;
5076 }
5077 }
5078 }
5079
5080 {
5081 std::lock_guard l(session_map_lock);
5082 session_map.add_update_sub(s, p->first, p->second.start,
5083 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
5084 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
5085 }
5086
5087 if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
5088 dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
5089 if ((int)s->is_capable("mds", MON_CAP_R)) {
5090 Subscription *sub = s->sub_map[p->first];
5091 ceph_assert(sub != nullptr);
5092 mdsmon()->check_sub(sub);
5093 }
5094 } else if (p->first == "osdmap") {
5095 if ((int)s->is_capable("osd", MON_CAP_R)) {
5096 if (s->osd_epoch > p->second.start) {
5097 // client needs earlier osdmaps on purpose, so reset the sent epoch
5098 s->osd_epoch = 0;
5099 }
5100 osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
5101 }
5102 } else if (p->first == "osd_pg_creates") {
5103 if ((int)s->is_capable("osd", MON_CAP_W)) {
5104 osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
5105 }
5106 } else if (p->first == "monmap") {
5107 monmon()->check_sub(s->sub_map[p->first]);
5108 } else if (logmon()->sub_name_to_id(p->first) >= 0) {
5109 logmon()->check_sub(s->sub_map[p->first]);
5110 } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
5111 mgrmon()->check_sub(s->sub_map[p->first]);
5112 } else if (p->first == "servicemap") {
5113 mgrstatmon()->check_sub(s->sub_map[p->first]);
5114 } else if (p->first == "config") {
5115 configmon()->check_sub(s);
5116 }
5117 }
5118
5119 if (reply) {
5120 // we only need to reply if the client is old enough to think it
5121 // has to send renewals.
5122 ConnectionRef con = m->get_connection();
5123 if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
5124 m->get_connection()->send_message(new MMonSubscribeAck(
5125 monmap->get_fsid(), (int)g_conf()->mon_subscribe_interval));
5126 }
5127
5128 }
5129
5130 void Monitor::handle_get_version(MonOpRequestRef op)
5131 {
5132 auto m = op->get_req<MMonGetVersion>();
5133 dout(10) << "handle_get_version " << *m << dendl;
5134 PaxosService *svc = NULL;
5135
5136 MonSession *s = op->get_session();
5137 ceph_assert(s);
5138
5139 if (!is_leader() && !is_peon()) {
5140 dout(10) << " waiting for quorum" << dendl;
5141 waitfor_quorum.push_back(new C_RetryMessage(this, op));
5142 goto out;
5143 }
5144
5145 if (m->what == "mdsmap") {
5146 svc = mdsmon();
5147 } else if (m->what == "fsmap") {
5148 svc = mdsmon();
5149 } else if (m->what == "osdmap") {
5150 svc = osdmon();
5151 } else if (m->what == "monmap") {
5152 svc = monmon();
5153 } else {
5154 derr << "invalid map type " << m->what << dendl;
5155 }
5156
5157 if (svc) {
5158 if (!svc->is_readable()) {
5159 svc->wait_for_readable(op, new C_RetryMessage(this, op));
5160 goto out;
5161 }
5162
5163 MMonGetVersionReply *reply = new MMonGetVersionReply();
5164 reply->handle = m->handle;
5165 reply->version = svc->get_last_committed();
5166 reply->oldest_version = svc->get_first_committed();
5167 reply->set_tid(m->get_tid());
5168
5169 m->get_connection()->send_message(reply);
5170 }
5171 out:
5172 return;
5173 }
5174
5175 bool Monitor::ms_handle_reset(Connection *con)
5176 {
5177 dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
5178
5179 // ignore lossless monitor sessions
5180 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
5181 return false;
5182
5183 auto priv = con->get_priv();
5184 auto s = static_cast<MonSession*>(priv.get());
5185 if (!s)
5186 return false;
5187
5188 // break any con <-> session ref cycle
5189 s->con->set_priv(nullptr);
5190
5191 if (is_shutdown())
5192 return false;
5193
5194 std::lock_guard l(lock);
5195
5196 dout(10) << "reset/close on session " << s->name << " " << s->addrs << dendl;
5197 if (!s->closed && s->item.is_on_list()) {
5198 std::lock_guard l(session_map_lock);
5199 remove_session(s);
5200 }
5201 return true;
5202 }
5203
5204 bool Monitor::ms_handle_refused(Connection *con)
5205 {
5206 // just log for now...
5207 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
5208 return false;
5209 }
5210
5211 // -----
5212
5213 void Monitor::send_latest_monmap(Connection *con)
5214 {
5215 bufferlist bl;
5216 monmap->encode(bl, con->get_features());
5217 con->send_message(new MMonMap(bl));
5218 }
5219
5220 void Monitor::handle_mon_get_map(MonOpRequestRef op)
5221 {
5222 auto m = op->get_req<MMonGetMap>();
5223 dout(10) << "handle_mon_get_map" << dendl;
5224 send_latest_monmap(m->get_connection().get());
5225 }
5226
5227 void Monitor::handle_mon_metadata(MonOpRequestRef op)
5228 {
5229 auto m = op->get_req<MMonMetadata>();
5230 if (is_leader()) {
5231 dout(10) << __func__ << dendl;
5232 update_mon_metadata(m->get_source().num(), std::move(m->data));
5233 }
5234 }
5235
5236 void Monitor::update_mon_metadata(int from, Metadata&& m)
5237 {
5238 // NOTE: this is now for legacy (kraken or jewel) mons only.
5239 pending_metadata[from] = std::move(m);
5240
5241 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5242 bufferlist bl;
5243 encode(pending_metadata, bl);
5244 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
5245 paxos->trigger_propose();
5246 }
5247
5248 int Monitor::load_metadata()
5249 {
5250 bufferlist bl;
5251 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
5252 if (r)
5253 return r;
5254 auto it = bl.cbegin();
5255 decode(mon_metadata, it);
5256
5257 pending_metadata = mon_metadata;
5258 return 0;
5259 }
5260
5261 int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
5262 {
5263 ceph_assert(f);
5264 if (!mon_metadata.count(mon)) {
5265 err << "mon." << mon << " not found";
5266 return -EINVAL;
5267 }
5268 const Metadata& m = mon_metadata[mon];
5269 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
5270 f->dump_string(p->first.c_str(), p->second);
5271 }
5272 return 0;
5273 }
5274
5275 void Monitor::count_metadata(const string& field, map<string,int> *out)
5276 {
5277 for (auto& p : mon_metadata) {
5278 auto q = p.second.find(field);
5279 if (q == p.second.end()) {
5280 (*out)["unknown"]++;
5281 } else {
5282 (*out)[q->second]++;
5283 }
5284 }
5285 }
5286
5287 void Monitor::count_metadata(const string& field, Formatter *f)
5288 {
5289 map<string,int> by_val;
5290 count_metadata(field, &by_val);
5291 f->open_object_section(field.c_str());
5292 for (auto& p : by_val) {
5293 f->dump_int(p.first.c_str(), p.second);
5294 }
5295 f->close_section();
5296 }
5297
5298 int Monitor::print_nodes(Formatter *f, ostream& err)
5299 {
5300 map<string, list<string> > mons; // hostname => mon
5301 for (map<int, Metadata>::iterator it = mon_metadata.begin();
5302 it != mon_metadata.end(); ++it) {
5303 const Metadata& m = it->second;
5304 Metadata::const_iterator hostname = m.find("hostname");
5305 if (hostname == m.end()) {
5306 // not likely though
5307 continue;
5308 }
5309 mons[hostname->second].push_back(monmap->get_name(it->first));
5310 }
5311
5312 dump_services(f, mons, "mon");
5313 return 0;
5314 }
5315
5316 // ----------------------------------------------
5317 // scrub
5318
5319 int Monitor::scrub_start()
5320 {
5321 dout(10) << __func__ << dendl;
5322 ceph_assert(is_leader());
5323
5324 if (!scrub_result.empty()) {
5325 clog->info() << "scrub already in progress";
5326 return -EBUSY;
5327 }
5328
5329 scrub_event_cancel();
5330 scrub_result.clear();
5331 scrub_state.reset(new ScrubState);
5332
5333 scrub();
5334 return 0;
5335 }
5336
5337 int Monitor::scrub()
5338 {
5339 ceph_assert(is_leader());
5340 ceph_assert(scrub_state);
5341
5342 scrub_cancel_timeout();
5343 wait_for_paxos_write();
5344 scrub_version = paxos->get_version();
5345
5346
5347 // scrub all keys if we're the only monitor in the quorum
5348 int32_t num_keys =
5349 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5350
5351 for (set<int>::iterator p = quorum.begin();
5352 p != quorum.end();
5353 ++p) {
5354 if (*p == rank)
5355 continue;
5356 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5357 num_keys);
5358 r->key = scrub_state->last_key;
5359 send_mon_message(r, *p);
5360 }
5361
5362 // scrub my keys
5363 bool r = _scrub(&scrub_result[rank],
5364 &scrub_state->last_key,
5365 &num_keys);
5366
5367 scrub_state->finished = !r;
5368
5369 // only after we got our scrub results do we really care whether the
5370 // other monitors are late on their results. Also, this way we avoid
5371 // triggering the timeout if we end up getting stuck in _scrub() for
5372 // longer than the duration of the timeout.
5373 scrub_reset_timeout();
5374
5375 if (quorum.size() == 1) {
5376 ceph_assert(scrub_state->finished == true);
5377 scrub_finish();
5378 }
5379 return 0;
5380 }
5381
5382 void Monitor::handle_scrub(MonOpRequestRef op)
5383 {
5384 auto m = op->get_req<MMonScrub>();
5385 dout(10) << __func__ << " " << *m << dendl;
5386 switch (m->op) {
5387 case MMonScrub::OP_SCRUB:
5388 {
5389 if (!is_peon())
5390 break;
5391
5392 wait_for_paxos_write();
5393
5394 if (m->version != paxos->get_version())
5395 break;
5396
5397 MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
5398 m->version,
5399 m->num_keys);
5400
5401 reply->key = m->key;
5402 _scrub(&reply->result, &reply->key, &reply->num_keys);
5403 m->get_connection()->send_message(reply);
5404 }
5405 break;
5406
5407 case MMonScrub::OP_RESULT:
5408 {
5409 if (!is_leader())
5410 break;
5411 if (m->version != scrub_version)
5412 break;
5413 // reset the timeout each time we get a result
5414 scrub_reset_timeout();
5415
5416 int from = m->get_source().num();
5417 ceph_assert(scrub_result.count(from) == 0);
5418 scrub_result[from] = m->result;
5419
5420 if (scrub_result.size() == quorum.size()) {
5421 scrub_check_results();
5422 scrub_result.clear();
5423 if (scrub_state->finished)
5424 scrub_finish();
5425 else
5426 scrub();
5427 }
5428 }
5429 break;
5430 }
5431 }
5432
5433 bool Monitor::_scrub(ScrubResult *r,
5434 pair<string,string> *start,
5435 int *num_keys)
5436 {
5437 ceph_assert(r != NULL);
5438 ceph_assert(start != NULL);
5439 ceph_assert(num_keys != NULL);
5440
5441 set<string> prefixes = get_sync_targets_names();
5442 prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5443
5444 dout(10) << __func__ << " start (" << *start << ")"
5445 << " num_keys " << *num_keys << dendl;
5446
5447 MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
5448
5449 int scrubbed_keys = 0;
5450 pair<string,string> last_key;
5451
5452 while (it->has_next_chunk()) {
5453
5454 if (*num_keys > 0 && scrubbed_keys == *num_keys)
5455 break;
5456
5457 pair<string,string> k = it->get_next_key();
5458 if (prefixes.count(k.first) == 0)
5459 continue;
5460
5461 if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
5462 (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
5463 dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
5464 << dendl;
5465 continue;
5466 }
5467
5468 bufferlist bl;
5469 int err = store->get(k.first, k.second, bl);
5470 ceph_assert(err == 0);
5471
5472 uint32_t key_crc = bl.crc32c(0);
5473 dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
5474 << " crc " << key_crc << dendl;
5475 r->prefix_keys[k.first]++;
5476 if (r->prefix_crc.count(k.first) == 0) {
5477 r->prefix_crc[k.first] = 0;
5478 }
5479 r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
5480
5481 if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
5482 (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
5483 dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
5484 r->prefix_crc[k.first] += 1;
5485 }
5486
5487 ++scrubbed_keys;
5488 last_key = k;
5489 }
5490
5491 dout(20) << __func__ << " last_key (" << last_key << ")"
5492 << " scrubbed_keys " << scrubbed_keys
5493 << " has_next " << it->has_next_chunk() << dendl;
5494
5495 *start = last_key;
5496 *num_keys = scrubbed_keys;
5497
5498 return it->has_next_chunk();
5499 }
5500
5501 void Monitor::scrub_check_results()
5502 {
5503 dout(10) << __func__ << dendl;
5504
5505 // compare
5506 int errors = 0;
5507 ScrubResult& mine = scrub_result[rank];
5508 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5509 p != scrub_result.end();
5510 ++p) {
5511 if (p->first == rank)
5512 continue;
5513 if (p->second != mine) {
5514 ++errors;
5515 clog->error() << "scrub mismatch";
5516 clog->error() << " mon." << rank << " " << mine;
5517 clog->error() << " mon." << p->first << " " << p->second;
5518 }
5519 }
5520 if (!errors)
5521 clog->debug() << "scrub ok on " << quorum << ": " << mine;
5522 }
5523
5524 inline void Monitor::scrub_timeout()
5525 {
5526 dout(1) << __func__ << " restarting scrub" << dendl;
5527 scrub_reset();
5528 scrub_start();
5529 }
5530
5531 void Monitor::scrub_finish()
5532 {
5533 dout(10) << __func__ << dendl;
5534 scrub_reset();
5535 scrub_event_start();
5536 }
5537
5538 void Monitor::scrub_reset()
5539 {
5540 dout(10) << __func__ << dendl;
5541 scrub_cancel_timeout();
5542 scrub_version = 0;
5543 scrub_result.clear();
5544 scrub_state.reset();
5545 }
5546
5547 inline void Monitor::scrub_update_interval(int secs)
5548 {
5549 // we don't care about changes if we are not the leader.
5550 // changes will be visible if we become the leader.
5551 if (!is_leader())
5552 return;
5553
5554 dout(1) << __func__ << " new interval = " << secs << dendl;
5555
5556 // if scrub already in progress, all changes will already be visible during
5557 // the next round. Nothing to do.
5558 if (scrub_state != NULL)
5559 return;
5560
5561 scrub_event_cancel();
5562 scrub_event_start();
5563 }
5564
5565 void Monitor::scrub_event_start()
5566 {
5567 dout(10) << __func__ << dendl;
5568
5569 if (scrub_event)
5570 scrub_event_cancel();
5571
5572 if (cct->_conf->mon_scrub_interval <= 0) {
5573 dout(1) << __func__ << " scrub event is disabled"
5574 << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval
5575 << ")" << dendl;
5576 return;
5577 }
5578
5579 scrub_event = timer.add_event_after(
5580 cct->_conf->mon_scrub_interval,
5581 new C_MonContext{this, [this](int) {
5582 scrub_start();
5583 }});
5584 }
5585
5586 void Monitor::scrub_event_cancel()
5587 {
5588 dout(10) << __func__ << dendl;
5589 if (scrub_event) {
5590 timer.cancel_event(scrub_event);
5591 scrub_event = NULL;
5592 }
5593 }
5594
5595 inline void Monitor::scrub_cancel_timeout()
5596 {
5597 if (scrub_timeout_event) {
5598 timer.cancel_event(scrub_timeout_event);
5599 scrub_timeout_event = NULL;
5600 }
5601 }
5602
5603 void Monitor::scrub_reset_timeout()
5604 {
5605 dout(15) << __func__ << " reset timeout event" << dendl;
5606 scrub_cancel_timeout();
5607 scrub_timeout_event = timer.add_event_after(
5608 g_conf()->mon_scrub_timeout,
5609 new C_MonContext{this, [this](int) {
5610 scrub_timeout();
5611 }});
5612 }
5613
5614 /************ TICK ***************/
5615 void Monitor::new_tick()
5616 {
5617 timer.add_event_after(g_conf()->mon_tick_interval, new C_MonContext{this, [this](int) {
5618 tick();
5619 }});
5620 }
5621
5622 void Monitor::tick()
5623 {
5624 // ok go.
5625 dout(11) << "tick" << dendl;
5626 const utime_t now = ceph_clock_now();
5627
5628 // Check if we need to emit any delayed health check updated messages
5629 if (is_leader()) {
5630 const auto min_period = g_conf().get_val<int64_t>(
5631 "mon_health_log_update_period");
5632 for (auto& svc : paxos_service) {
5633 auto health = svc->get_health_checks();
5634
5635 for (const auto &i : health.checks) {
5636 const std::string &code = i.first;
5637 const std::string &summary = i.second.summary;
5638 const health_status_t severity = i.second.severity;
5639
5640 auto status_iter = health_check_log_times.find(code);
5641 if (status_iter == health_check_log_times.end()) {
5642 continue;
5643 }
5644
5645 auto &log_status = status_iter->second;
5646 bool const changed = log_status.last_message != summary
5647 || log_status.severity != severity;
5648
5649 if (changed && now - log_status.updated_at > min_period) {
5650 log_status.last_message = summary;
5651 log_status.updated_at = now;
5652 log_status.severity = severity;
5653
5654 ostringstream ss;
5655 ss << "Health check update: " << summary << " (" << code << ")";
5656 clog->health(severity) << ss.str();
5657 }
5658 }
5659 }
5660 }
5661
5662
5663 for (auto& svc : paxos_service) {
5664 svc->tick();
5665 svc->maybe_trim();
5666 }
5667
5668 // trim sessions
5669 {
5670 std::lock_guard l(session_map_lock);
5671 auto p = session_map.sessions.begin();
5672
5673 bool out_for_too_long = (!exited_quorum.is_zero() &&
5674 now > (exited_quorum + 2*g_conf()->mon_lease));
5675
5676 while (!p.end()) {
5677 MonSession *s = *p;
5678 ++p;
5679
5680 // don't trim monitors
5681 if (s->name.is_mon())
5682 continue;
5683
5684 if (s->session_timeout < now && s->con) {
5685 // check keepalive, too
5686 s->session_timeout = s->con->get_last_keepalive();
5687 s->session_timeout += g_conf()->mon_session_timeout;
5688 }
5689 if (s->session_timeout < now) {
5690 dout(10) << " trimming session " << s->con << " " << s->name
5691 << " " << s->addrs
5692 << " (timeout " << s->session_timeout
5693 << " < now " << now << ")" << dendl;
5694 } else if (out_for_too_long) {
5695 // boot the client Session because we've taken too long getting back in
5696 dout(10) << " trimming session " << s->con << " " << s->name
5697 << " because we've been out of quorum too long" << dendl;
5698 } else {
5699 continue;
5700 }
5701
5702 s->con->mark_down();
5703 remove_session(s);
5704 logger->inc(l_mon_session_trim);
5705 }
5706 }
5707 sync_trim_providers();
5708
5709 if (!maybe_wait_for_quorum.empty()) {
5710 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5711 }
5712
5713 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5714 // this is only necessary on upgraded clusters.
5715 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5716 prepare_new_fingerprint(t);
5717 paxos->trigger_propose();
5718 }
5719
5720 mgr_client.update_daemon_health(get_health_metrics());
5721 new_tick();
5722 }
5723
5724 vector<DaemonHealthMetric> Monitor::get_health_metrics()
5725 {
5726 vector<DaemonHealthMetric> metrics;
5727
5728 utime_t oldest_secs;
5729 const utime_t now = ceph_clock_now();
5730 auto too_old = now;
5731 too_old -= g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count();
5732 int slow = 0;
5733 TrackedOpRef oldest_op;
5734 auto count_slow_ops = [&](TrackedOp& op) {
5735 if (op.get_initiated() < too_old) {
5736 slow++;
5737 if (!oldest_op || op.get_initiated() < oldest_op->get_initiated()) {
5738 oldest_op = &op;
5739 }
5740 return true;
5741 } else {
5742 return false;
5743 }
5744 };
5745 if (op_tracker.visit_ops_in_flight(&oldest_secs, count_slow_ops)) {
5746 if (slow) {
5747 derr << __func__ << " reporting " << slow << " slow ops, oldest is "
5748 << oldest_op->get_desc() << dendl;
5749 }
5750 metrics.emplace_back(daemon_metric::SLOW_OPS, slow, oldest_secs);
5751 } else {
5752 metrics.emplace_back(daemon_metric::SLOW_OPS, 0, 0);
5753 }
5754 return metrics;
5755 }
5756
5757 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5758 {
5759 uuid_d nf;
5760 nf.generate_random();
5761 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5762
5763 bufferlist bl;
5764 encode(nf, bl);
5765 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5766 }
5767
5768 int Monitor::check_fsid()
5769 {
5770 bufferlist ebl;
5771 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5772 if (r == -ENOENT)
5773 return r;
5774 ceph_assert(r == 0);
5775
5776 string es(ebl.c_str(), ebl.length());
5777
5778 // only keep the first line
5779 size_t pos = es.find_first_of('\n');
5780 if (pos != string::npos)
5781 es.resize(pos);
5782
5783 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5784 uuid_d ondisk;
5785 if (!ondisk.parse(es.c_str())) {
5786 derr << "error: unable to parse uuid" << dendl;
5787 return -EINVAL;
5788 }
5789
5790 if (monmap->get_fsid() != ondisk) {
5791 derr << "error: cluster_uuid file exists with value " << ondisk
5792 << ", != our uuid " << monmap->get_fsid() << dendl;
5793 return -EEXIST;
5794 }
5795
5796 return 0;
5797 }
5798
5799 int Monitor::write_fsid()
5800 {
5801 auto t(std::make_shared<MonitorDBStore::Transaction>());
5802 write_fsid(t);
5803 int r = store->apply_transaction(t);
5804 return r;
5805 }
5806
5807 int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5808 {
5809 ostringstream ss;
5810 ss << monmap->get_fsid() << "\n";
5811 string us = ss.str();
5812
5813 bufferlist b;
5814 b.append(us);
5815
5816 t->put(MONITOR_NAME, "cluster_uuid", b);
5817 return 0;
5818 }
5819
5820 /*
5821 * this is the closest thing to a traditional 'mkfs' for ceph.
5822 * initialize the monitor state machines to their initial values.
5823 */
5824 int Monitor::mkfs(bufferlist& osdmapbl)
5825 {
5826 auto t(std::make_shared<MonitorDBStore::Transaction>());
5827
5828 // verify cluster fsid
5829 int r = check_fsid();
5830 if (r < 0 && r != -ENOENT)
5831 return r;
5832
5833 bufferlist magicbl;
5834 magicbl.append(CEPH_MON_ONDISK_MAGIC);
5835 magicbl.append("\n");
5836 t->put(MONITOR_NAME, "magic", magicbl);
5837
5838
5839 features = get_initial_supported_features();
5840 write_features(t);
5841
5842 // save monmap, osdmap, keyring.
5843 bufferlist monmapbl;
5844 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
5845 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5846 t->put("mkfs", "monmap", monmapbl);
5847
5848 if (osdmapbl.length()) {
5849 // make sure it's a valid osdmap
5850 try {
5851 OSDMap om;
5852 om.decode(osdmapbl);
5853 }
5854 catch (buffer::error& e) {
5855 derr << "error decoding provided osdmap: " << e.what() << dendl;
5856 return -EINVAL;
5857 }
5858 t->put("mkfs", "osdmap", osdmapbl);
5859 }
5860
5861 if (is_keyring_required()) {
5862 KeyRing keyring;
5863 string keyring_filename;
5864
5865 r = ceph_resolve_file_search(g_conf()->keyring, keyring_filename);
5866 if (r) {
5867 derr << "unable to find a keyring file on " << g_conf()->keyring
5868 << ": " << cpp_strerror(r) << dendl;
5869 if (g_conf()->key != "") {
5870 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf()->key +
5871 "\n\tcaps mon = \"allow *\"\n";
5872 bufferlist bl;
5873 bl.append(keyring_plaintext);
5874 try {
5875 auto i = bl.cbegin();
5876 keyring.decode_plaintext(i);
5877 }
5878 catch (const buffer::error& e) {
5879 derr << "error decoding keyring " << keyring_plaintext
5880 << ": " << e.what() << dendl;
5881 return -EINVAL;
5882 }
5883 } else {
5884 return -ENOENT;
5885 }
5886 } else {
5887 r = keyring.load(g_ceph_context, keyring_filename);
5888 if (r < 0) {
5889 derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
5890 return r;
5891 }
5892 }
5893
5894 // put mon. key in external keyring; seed with everything else.
5895 extract_save_mon_key(keyring);
5896
5897 bufferlist keyringbl;
5898 keyring.encode_plaintext(keyringbl);
5899 t->put("mkfs", "keyring", keyringbl);
5900 }
5901 write_fsid(t);
5902 store->apply_transaction(t);
5903
5904 return 0;
5905 }
5906
5907 int Monitor::write_default_keyring(bufferlist& bl)
5908 {
5909 ostringstream os;
5910 os << g_conf()->mon_data << "/keyring";
5911
5912 int err = 0;
5913 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT|O_CLOEXEC, 0600);
5914 if (fd < 0) {
5915 err = -errno;
5916 dout(0) << __func__ << " failed to open " << os.str()
5917 << ": " << cpp_strerror(err) << dendl;
5918 return err;
5919 }
5920
5921 err = bl.write_fd(fd);
5922 if (!err)
5923 ::fsync(fd);
5924 VOID_TEMP_FAILURE_RETRY(::close(fd));
5925
5926 return err;
5927 }
5928
5929 void Monitor::extract_save_mon_key(KeyRing& keyring)
5930 {
5931 EntityName mon_name;
5932 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
5933 EntityAuth mon_key;
5934 if (keyring.get_auth(mon_name, mon_key)) {
5935 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
5936 KeyRing pkey;
5937 pkey.add(mon_name, mon_key);
5938 bufferlist bl;
5939 pkey.encode_plaintext(bl);
5940 write_default_keyring(bl);
5941 keyring.remove(mon_name);
5942 }
5943 }
5944
5945 // AuthClient methods -- for mon <-> mon communication
5946 int Monitor::get_auth_request(
5947 Connection *con,
5948 AuthConnectionMeta *auth_meta,
5949 uint32_t *method,
5950 vector<uint32_t> *preferred_modes,
5951 bufferlist *out)
5952 {
5953 std::scoped_lock l(auth_lock);
5954 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
5955 con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
5956 return -EACCES;
5957 }
5958 AuthAuthorizer *auth;
5959 if (!get_authorizer(con->get_peer_type(), &auth)) {
5960 return -EACCES;
5961 }
5962 auth_meta->authorizer.reset(auth);
5963 auth_registry.get_supported_modes(con->get_peer_type(),
5964 auth->protocol,
5965 preferred_modes);
5966 *method = auth->protocol;
5967 *out = auth->bl;
5968 return 0;
5969 }
5970
5971 int Monitor::handle_auth_reply_more(
5972 Connection *con,
5973 AuthConnectionMeta *auth_meta,
5974 const bufferlist& bl,
5975 bufferlist *reply)
5976 {
5977 std::scoped_lock l(auth_lock);
5978 if (!auth_meta->authorizer) {
5979 derr << __func__ << " no authorizer?" << dendl;
5980 return -EACCES;
5981 }
5982 auth_meta->authorizer->add_challenge(cct, bl);
5983 *reply = auth_meta->authorizer->bl;
5984 return 0;
5985 }
5986
5987 int Monitor::handle_auth_done(
5988 Connection *con,
5989 AuthConnectionMeta *auth_meta,
5990 uint64_t global_id,
5991 uint32_t con_mode,
5992 const bufferlist& bl,
5993 CryptoKey *session_key,
5994 std::string *connection_secret)
5995 {
5996 std::scoped_lock l(auth_lock);
5997 // verify authorizer reply
5998 auto p = bl.begin();
5999 if (!auth_meta->authorizer->verify_reply(p, connection_secret)) {
6000 dout(0) << __func__ << " failed verifying authorizer reply" << dendl;
6001 return -EACCES;
6002 }
6003 auth_meta->session_key = auth_meta->authorizer->session_key;
6004 return 0;
6005 }
6006
6007 int Monitor::handle_auth_bad_method(
6008 Connection *con,
6009 AuthConnectionMeta *auth_meta,
6010 uint32_t old_auth_method,
6011 int result,
6012 const std::vector<uint32_t>& allowed_methods,
6013 const std::vector<uint32_t>& allowed_modes)
6014 {
6015 derr << __func__ << " hmm, they didn't like " << old_auth_method
6016 << " result " << cpp_strerror(result) << dendl;
6017 return -EACCES;
6018 }
6019
6020 bool Monitor::get_authorizer(int service_id, AuthAuthorizer **authorizer)
6021 {
6022 dout(10) << "get_authorizer for " << ceph_entity_type_name(service_id)
6023 << dendl;
6024
6025 if (is_shutdown())
6026 return false;
6027
6028 // we only connect to other monitors and mgr; every else connects to us.
6029 if (service_id != CEPH_ENTITY_TYPE_MON &&
6030 service_id != CEPH_ENTITY_TYPE_MGR)
6031 return false;
6032
6033 if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
6034 // auth_none
6035 dout(20) << __func__ << " building auth_none authorizer" << dendl;
6036 AuthNoneClientHandler handler{g_ceph_context};
6037 handler.set_global_id(0);
6038 *authorizer = handler.build_authorizer(service_id);
6039 return true;
6040 }
6041
6042 CephXServiceTicketInfo auth_ticket_info;
6043 CephXSessionAuthInfo info;
6044 int ret;
6045
6046 EntityName name;
6047 name.set_type(CEPH_ENTITY_TYPE_MON);
6048 auth_ticket_info.ticket.name = name;
6049 auth_ticket_info.ticket.global_id = 0;
6050
6051 if (service_id == CEPH_ENTITY_TYPE_MON) {
6052 // mon to mon authentication uses the private monitor shared key and not the
6053 // rotating key
6054 CryptoKey secret;
6055 if (!keyring.get_secret(name, secret) &&
6056 !key_server.get_secret(name, secret)) {
6057 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
6058 << dendl;
6059 stringstream ss, ds;
6060 int err = key_server.list_secrets(ds);
6061 if (err < 0)
6062 ss << "no installed auth entries!";
6063 else
6064 ss << "installed auth entries:";
6065 dout(0) << ss.str() << "\n" << ds.str() << dendl;
6066 return false;
6067 }
6068
6069 ret = key_server.build_session_auth_info(
6070 service_id, auth_ticket_info.ticket, info, secret, (uint64_t)-1);
6071 if (ret < 0) {
6072 dout(0) << __func__ << " failed to build mon session_auth_info "
6073 << cpp_strerror(ret) << dendl;
6074 return false;
6075 }
6076 } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
6077 // mgr
6078 ret = key_server.build_session_auth_info(
6079 service_id, auth_ticket_info.ticket, info);
6080 if (ret < 0) {
6081 derr << __func__ << " failed to build mgr service session_auth_info "
6082 << cpp_strerror(ret) << dendl;
6083 return false;
6084 }
6085 } else {
6086 ceph_abort(); // see check at top of fn
6087 }
6088
6089 CephXTicketBlob blob;
6090 if (!cephx_build_service_ticket_blob(cct, info, blob)) {
6091 dout(0) << "get_authorizer failed to build service ticket" << dendl;
6092 return false;
6093 }
6094 bufferlist ticket_data;
6095 encode(blob, ticket_data);
6096
6097 auto iter = ticket_data.cbegin();
6098 CephXTicketHandler handler(g_ceph_context, service_id);
6099 decode(handler.ticket, iter);
6100
6101 handler.session_key = info.session_key;
6102
6103 *authorizer = handler.build_authorizer(0);
6104
6105 return true;
6106 }
6107
6108 int Monitor::handle_auth_request(
6109 Connection *con,
6110 AuthConnectionMeta *auth_meta,
6111 bool more,
6112 uint32_t auth_method,
6113 const bufferlist &payload,
6114 bufferlist *reply)
6115 {
6116 std::scoped_lock l(auth_lock);
6117
6118 // NOTE: be careful, the Connection hasn't fully negotiated yet, so
6119 // e.g., peer_features, peer_addrs, and others are still unknown.
6120
6121 dout(10) << __func__ << " con " << con << (more ? " (more)":" (start)")
6122 << " method " << auth_method
6123 << " payload " << payload.length()
6124 << dendl;
6125 if (!payload.length()) {
6126 if (!con->is_msgr2() &&
6127 con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
6128 // for v1 connections, we tolerate no authorizer (from
6129 // non-monitors), because authentication happens via MAuth
6130 // messages.
6131 return 1;
6132 }
6133 return -EACCES;
6134 }
6135 if (!more) {
6136 auth_meta->auth_mode = payload[0];
6137 }
6138
6139 if (auth_meta->auth_mode >= AUTH_MODE_AUTHORIZER &&
6140 auth_meta->auth_mode <= AUTH_MODE_AUTHORIZER_MAX) {
6141 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
6142 auth_method);
6143 if (!ah) {
6144 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
6145 << auth_method << dendl;
6146 return -EOPNOTSUPP;
6147 }
6148 bool was_challenge = (bool)auth_meta->authorizer_challenge;
6149 bool isvalid = ah->verify_authorizer(
6150 cct,
6151 keyring,
6152 payload,
6153 auth_meta->get_connection_secret_length(),
6154 reply,
6155 &con->peer_name,
6156 &con->peer_global_id,
6157 &con->peer_caps_info,
6158 &auth_meta->session_key,
6159 &auth_meta->connection_secret,
6160 &auth_meta->authorizer_challenge);
6161 if (isvalid) {
6162 ms_handle_authentication(con);
6163 return 1;
6164 }
6165 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
6166 return 0;
6167 }
6168 dout(10) << __func__ << " bad authorizer on " << con << dendl;
6169 return -EACCES;
6170 } else if (auth_meta->auth_mode < AUTH_MODE_MON ||
6171 auth_meta->auth_mode > AUTH_MODE_MON_MAX) {
6172 derr << __func__ << " unrecognized auth mode " << auth_meta->auth_mode
6173 << dendl;
6174 return -EACCES;
6175 }
6176
6177 // wait until we've formed an initial quorum on mkfs so that we have
6178 // the initial keys (e.g., client.admin).
6179 if (authmon()->get_last_committed() == 0) {
6180 dout(10) << __func__ << " haven't formed initial quorum, EBUSY" << dendl;
6181 return -EBUSY;
6182 }
6183
6184 RefCountedPtr priv;
6185 MonSession *s;
6186 int32_t r = 0;
6187 auto p = payload.begin();
6188 if (!more) {
6189 if (con->get_priv()) {
6190 return -EACCES; // wtf
6191 }
6192
6193 // handler?
6194 unique_ptr<AuthServiceHandler> auth_handler{get_auth_service_handler(
6195 auth_method, g_ceph_context, &key_server)};
6196 if (!auth_handler) {
6197 dout(1) << __func__ << " auth_method " << auth_method << " not supported"
6198 << dendl;
6199 return -EOPNOTSUPP;
6200 }
6201
6202 uint8_t mode;
6203 EntityName entity_name;
6204
6205 try {
6206 decode(mode, p);
6207 if (mode < AUTH_MODE_MON ||
6208 mode > AUTH_MODE_MON_MAX) {
6209 dout(1) << __func__ << " invalid mode " << (int)mode << dendl;
6210 return -EACCES;
6211 }
6212 assert(mode >= AUTH_MODE_MON && mode <= AUTH_MODE_MON_MAX);
6213 decode(entity_name, p);
6214 decode(con->peer_global_id, p);
6215 } catch (buffer::error& e) {
6216 dout(1) << __func__ << " failed to decode, " << e.what() << dendl;
6217 return -EACCES;
6218 }
6219
6220 // supported method?
6221 if (entity_name.get_type() == CEPH_ENTITY_TYPE_MON ||
6222 entity_name.get_type() == CEPH_ENTITY_TYPE_OSD ||
6223 entity_name.get_type() == CEPH_ENTITY_TYPE_MDS ||
6224 entity_name.get_type() == CEPH_ENTITY_TYPE_MGR) {
6225 if (!auth_cluster_required.is_supported_auth(auth_method)) {
6226 dout(10) << __func__ << " entity " << entity_name << " method "
6227 << auth_method << " not among supported "
6228 << auth_cluster_required.get_supported_set() << dendl;
6229 return -EOPNOTSUPP;
6230 }
6231 } else {
6232 if (!auth_service_required.is_supported_auth(auth_method)) {
6233 dout(10) << __func__ << " entity " << entity_name << " method "
6234 << auth_method << " not among supported "
6235 << auth_cluster_required.get_supported_set() << dendl;
6236 return -EOPNOTSUPP;
6237 }
6238 }
6239
6240 // for msgr1 we would do some weirdness here to ensure signatures
6241 // are supported by the client if we require it. for msgr2 that
6242 // is not necessary.
6243
6244 if (!con->peer_global_id) {
6245 con->peer_global_id = authmon()->_assign_global_id();
6246 if (!con->peer_global_id) {
6247 dout(1) << __func__ << " failed to assign global_id" << dendl;
6248 return -EBUSY;
6249 }
6250 dout(10) << __func__ << " assigned global_id " << con->peer_global_id
6251 << dendl;
6252 }
6253
6254 // set up partial session
6255 s = new MonSession(con);
6256 s->auth_handler = auth_handler.release();
6257 con->set_priv(RefCountedPtr{s, false});
6258
6259 r = s->auth_handler->start_session(
6260 entity_name,
6261 auth_meta->get_connection_secret_length(),
6262 reply,
6263 &con->peer_caps_info,
6264 &auth_meta->session_key,
6265 &auth_meta->connection_secret);
6266 } else {
6267 priv = con->get_priv();
6268 if (!priv) {
6269 // this can happen if the async ms_handle_reset event races with
6270 // the unlocked call into handle_auth_request
6271 return -EACCES;
6272 }
6273 s = static_cast<MonSession*>(priv.get());
6274 r = s->auth_handler->handle_request(
6275 p,
6276 auth_meta->get_connection_secret_length(),
6277 reply,
6278 &con->peer_global_id,
6279 &con->peer_caps_info,
6280 &auth_meta->session_key,
6281 &auth_meta->connection_secret);
6282 }
6283 if (r > 0 &&
6284 !s->authenticated) {
6285 ms_handle_authentication(con);
6286 }
6287
6288 dout(30) << " r " << r << " reply:\n";
6289 reply->hexdump(*_dout);
6290 *_dout << dendl;
6291 return r;
6292 }
6293
6294 void Monitor::ms_handle_accept(Connection *con)
6295 {
6296 auto priv = con->get_priv();
6297 MonSession *s = static_cast<MonSession*>(priv.get());
6298 if (!s) {
6299 // legacy protocol v1?
6300 dout(10) << __func__ << " con " << con << " no session" << dendl;
6301 return;
6302 }
6303
6304 if (s->item.is_on_list()) {
6305 dout(10) << __func__ << " con " << con << " session " << s
6306 << " already on list" << dendl;
6307 } else {
6308 dout(10) << __func__ << " con " << con << " session " << s
6309 << " registering session for "
6310 << con->get_peer_addrs() << dendl;
6311 s->_ident(entity_name_t(con->get_peer_type(), con->get_peer_id()),
6312 con->get_peer_addrs());
6313 std::lock_guard l(session_map_lock);
6314 session_map.add_session(s);
6315 }
6316 }
6317
6318 int Monitor::ms_handle_authentication(Connection *con)
6319 {
6320 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
6321 // mon <-> mon connections need no Session, and setting one up
6322 // creates an awkward ref cycle between Session and Connection.
6323 return 1;
6324 }
6325
6326 auto priv = con->get_priv();
6327 MonSession *s = static_cast<MonSession*>(priv.get());
6328 if (!s) {
6329 // must be msgr2, otherwise dispatch would have set up the session.
6330 s = session_map.new_session(
6331 entity_name_t(con->get_peer_type(), -1), // we don't know yet
6332 con->get_peer_addrs(),
6333 con);
6334 assert(s);
6335 dout(10) << __func__ << " adding session " << s << " to con " << con
6336 << dendl;
6337 con->set_priv(s);
6338 logger->set(l_mon_num_sessions, session_map.get_size());
6339 logger->inc(l_mon_session_add);
6340 }
6341 dout(10) << __func__ << " session " << s << " con " << con
6342 << " addr " << s->con->get_peer_addr()
6343 << " " << *s << dendl;
6344
6345 AuthCapsInfo &caps_info = con->get_peer_caps_info();
6346 int ret = 0;
6347 if (caps_info.allow_all) {
6348 s->caps.set_allow_all();
6349 s->authenticated = true;
6350 ret = 1;
6351 } else if (caps_info.caps.length()) {
6352 bufferlist::const_iterator p = caps_info.caps.cbegin();
6353 string str;
6354 try {
6355 decode(str, p);
6356 } catch (const buffer::error &err) {
6357 derr << __func__ << " corrupt cap data for " << con->get_peer_entity_name()
6358 << " in auth db" << dendl;
6359 str.clear();
6360 ret = -EACCES;
6361 }
6362 if (ret >= 0) {
6363 if (s->caps.parse(str, NULL)) {
6364 s->authenticated = true;
6365 ret = 1;
6366 } else {
6367 derr << __func__ << " unparseable caps '" << str << "' for "
6368 << con->get_peer_entity_name() << dendl;
6369 ret = -EACCES;
6370 }
6371 }
6372 }
6373
6374 return ret;
6375 }