]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/Monitor.cc
Import ceph 15.2.8
[ceph.git] / ceph / src / mon / Monitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include <iterator>
17 #include <sstream>
18 #include <tuple>
19 #include <stdlib.h>
20 #include <signal.h>
21 #include <limits.h>
22 #include <cstring>
23 #include <boost/scope_exit.hpp>
24 #include <boost/algorithm/string/predicate.hpp>
25
26 #include "json_spirit/json_spirit_reader.h"
27 #include "json_spirit/json_spirit_writer.h"
28
29 #include "Monitor.h"
30 #include "common/version.h"
31 #include "common/blkdev.h"
32 #include "common/cmdparse.h"
33 #include "common/signal.h"
34
35 #include "osd/OSDMap.h"
36
37 #include "MonitorDBStore.h"
38
39 #include "messages/PaxosServiceMessage.h"
40 #include "messages/MMonMap.h"
41 #include "messages/MMonGetMap.h"
42 #include "messages/MMonGetVersion.h"
43 #include "messages/MMonGetVersionReply.h"
44 #include "messages/MGenericMessage.h"
45 #include "messages/MMonCommand.h"
46 #include "messages/MMonCommandAck.h"
47 #include "messages/MMonMetadata.h"
48 #include "messages/MMonSync.h"
49 #include "messages/MMonScrub.h"
50 #include "messages/MMonProbe.h"
51 #include "messages/MMonJoin.h"
52 #include "messages/MMonPaxos.h"
53 #include "messages/MRoute.h"
54 #include "messages/MForward.h"
55
56 #include "messages/MMonSubscribe.h"
57 #include "messages/MMonSubscribeAck.h"
58
59 #include "messages/MCommand.h"
60 #include "messages/MCommandReply.h"
61
62 #include "messages/MAuthReply.h"
63
64 #include "messages/MTimeCheck2.h"
65 #include "messages/MPing.h"
66
67 #include "common/strtol.h"
68 #include "common/ceph_argparse.h"
69 #include "common/Timer.h"
70 #include "common/Clock.h"
71 #include "common/errno.h"
72 #include "common/perf_counters.h"
73 #include "common/admin_socket.h"
74 #include "global/signal_handler.h"
75 #include "common/Formatter.h"
76 #include "include/stringify.h"
77 #include "include/color.h"
78 #include "include/ceph_fs.h"
79 #include "include/str_list.h"
80
81 #include "OSDMonitor.h"
82 #include "MDSMonitor.h"
83 #include "MonmapMonitor.h"
84 #include "LogMonitor.h"
85 #include "AuthMonitor.h"
86 #include "MgrMonitor.h"
87 #include "MgrStatMonitor.h"
88 #include "ConfigMonitor.h"
89 #include "mon/QuorumService.h"
90 #include "mon/HealthMonitor.h"
91 #include "mon/ConfigKeyService.h"
92 #include "common/config.h"
93 #include "common/cmdparse.h"
94 #include "include/ceph_assert.h"
95 #include "include/compat.h"
96 #include "perfglue/heap_profiler.h"
97
98 #include "auth/none/AuthNoneClientHandler.h"
99
100 #define dout_subsys ceph_subsys_mon
101 #undef dout_prefix
102 #define dout_prefix _prefix(_dout, this)
103 using namespace TOPNSPC::common;
104 static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
105 return *_dout << "mon." << mon->name << "@" << mon->rank
106 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
107 }
108
109 const string Monitor::MONITOR_NAME = "monitor";
110 const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
111
112
// Build the static mon_commands[] table from mon/MonCommands.h.  Each
// COMMAND(...) entry in that header expands to one brace-initialized
// MonCommand with FLAG(NONE); COMMAND_WITH_FLAG(...) supplies its own flags.
// Any previous definitions of these macros are dropped first.
113 #undef FLAG
114 #undef COMMAND
115 #undef COMMAND_WITH_FLAG
116 #define FLAG(f) (MonCommand::FLAG_##f)
117 #define COMMAND(parsesig, helptext, modulename, req_perms) \
118 {parsesig, helptext, modulename, req_perms, FLAG(NONE)},
119 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, flags) \
120 {parsesig, helptext, modulename, req_perms, flags},
// Copied into local_mon_commands by the Monitor constructor.
121 MonCommand mon_commands[] = {
122 #include <mon/MonCommands.h>
123 };
// Only the per-entry macros are undefined here; FLAG stays defined.
// NOTE(review): confirm whether later code in this file relies on FLAG.
124 #undef COMMAND
125 #undef COMMAND_WITH_FLAG
126
// Construct a monitor named `nm` backed by store `s`.
//
// @param cct_   ceph context shared with the Dispatcher/AuthServer bases
// @param nm     this monitor's name
// @param s      monitor key/value store
// @param m      cluster messenger (may be null; con_self is then null)
// @param mgr_m  messenger used by the embedded mgr client
// @param map    initial monmap
//
// The body wires up the cluster/audit log channels, exports the GSSAPI
// client keytab when one is configured, configures the op tracker from
// mon_op_* options, creates Paxos and every PaxosService, and snapshots
// the compiled-in command table (plus its pre-nautilus spelling).
127 Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
128 Messenger *m, Messenger *mgr_m, MonMap *map) :
129 Dispatcher(cct_),
130 AuthServer(cct_),
131 name(nm),
132 rank(-1),
133 messenger(m),
134 con_self(m ? m->get_loopback_connection() : NULL),
135 timer(cct_, lock),
136 finisher(cct_, "mon_finisher", "fin"),
137 cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads),
138 has_ever_joined(false),
139 logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
140 monmap(map),
141 log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
142 key_server(cct, &keyring),
// auth_supported, when set, overrides both the cluster and service lists.
143 auth_cluster_required(cct,
144 cct->_conf->auth_supported.empty() ?
145 cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
146 auth_service_required(cct,
147 cct->_conf->auth_supported.empty() ?
148 cct->_conf->auth_service_required : cct->_conf->auth_supported),
149 mgr_messenger(mgr_m),
150 mgr_client(cct_, mgr_m, monmap),
151 gss_ktfile_client(cct->_conf.get_val<std::string>("gss_ktab_client_file")),
152 store(s),
153
154 elector(this),
155 required_features(0),
156 leader(0),
157 quorum_con_features(0),
158 // scrub
159 scrub_version(0),
160 scrub_event(NULL),
161 scrub_timeout_event(NULL),
162
163 // sync state
164 sync_provider_count(0),
165 sync_cookie(0),
166 sync_full(false),
167 sync_start_version(0),
168 sync_timeout_event(NULL),
169 sync_last_committed_floor(0),
170
171 timecheck_round(0),
172 timecheck_acks(0),
173 timecheck_rounds_since_clean(0),
174 timecheck_event(NULL),
175
// one slot per PaxosService, filled in the constructor body below
176 paxos_service(PAXOS_NUM),
177 admin_hook(NULL),
178 routed_request_tid(0),
179 op_tracker(cct, g_conf().get_val<bool>("mon_enable_op_tracker"), 1)
180 {
// Log channels must exist before update_log_clients() configures them.
181 clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
182 audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);
183
184 update_log_clients();
185
186 if (!gss_ktfile_client.empty()) {
187 // Assert we can export environment variable
188 /*
189 The default client keytab is used, if it is present and readable,
190 to automatically obtain initial credentials for GSSAPI client
191 applications. The principal name of the first entry in the client
192 keytab is used by default when obtaining initial credentials.
193 1. The KRB5_CLIENT_KTNAME environment variable.
194 2. The default_client_keytab_name profile variable in [libdefaults].
195 3. The hardcoded default, DEFCKTNAME.
196 */
197 const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
198 gss_ktfile_client.c_str(), 1));
199 ceph_assert(set_result == 0);
200 }
201
// Op-tracker complaint/history limits come from the mon_op_* options.
202 op_tracker.set_complaint_and_threshold(
203 g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count(),
204 g_conf().get_val<int64_t>("mon_op_log_threshold"));
205 op_tracker.set_history_size_and_duration(
206 g_conf().get_val<uint64_t>("mon_op_history_size"),
207 g_conf().get_val<std::chrono::seconds>("mon_op_history_duration").count());
208 op_tracker.set_history_slow_op_size_and_threshold(
209 g_conf().get_val<uint64_t>("mon_op_history_slow_op_size"),
210 g_conf().get_val<std::chrono::seconds>("mon_op_history_slow_op_threshold").count());
211
// Paxos is created first because every service below is handed the pointer.
212 paxos = new Paxos(this, "paxos");
213
214 paxos_service[PAXOS_MDSMAP].reset(new MDSMonitor(this, paxos, "mdsmap"));
215 paxos_service[PAXOS_MONMAP].reset(new MonmapMonitor(this, paxos, "monmap"));
216 paxos_service[PAXOS_OSDMAP].reset(new OSDMonitor(cct, this, paxos, "osdmap"));
217 paxos_service[PAXOS_LOG].reset(new LogMonitor(this, paxos, "logm"));
218 paxos_service[PAXOS_AUTH].reset(new AuthMonitor(this, paxos, "auth"));
219 paxos_service[PAXOS_MGR].reset(new MgrMonitor(this, paxos, "mgr"));
220 paxos_service[PAXOS_MGRSTAT].reset(new MgrStatMonitor(this, paxos, "mgrstat"));
221 paxos_service[PAXOS_HEALTH].reset(new HealthMonitor(this, paxos, "health"));
222 paxos_service[PAXOS_CONFIG].reset(new ConfigMonitor(this, paxos, "config"));
223
224 config_key_service = new ConfigKeyService(this, paxos);
225
// The monitor's own capabilities: full access ("allow *").
226 bool r = mon_caps.parse("allow *", NULL);
227 ceph_assert(r);
228
229 exited_quorum = ceph_clock_now();
230
231 // prepare local commands
232 local_mon_commands.resize(std::size(mon_commands));
233 for (unsigned i = 0; i < std::size(mon_commands); ++i) {
234 local_mon_commands[i] = mon_commands[i];
235 }
236 MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);
237
// Same table with each command string rewritten to its pre-nautilus form.
238 prenautilus_local_mon_commands = local_mon_commands;
239 for (auto& i : prenautilus_local_mon_commands) {
240 std::string n = cmddesc_get_prenautilus_compat(i.cmdstring);
241 if (n != i.cmdstring) {
242 dout(20) << " pre-nautilus cmd " << i.cmdstring << " -> " << n << dendl;
243 i.cmdstring = n;
244 }
245 }
246 MonCommand::encode_vector(prenautilus_local_mon_commands, prenautilus_local_mon_commands_bl);
247
248 // assume our commands until we have an election. this only means
249 // we won't reply with EINVAL before the election; any command that
250 // actually matters will wait until we have quorum etc and then
251 // retry (and revalidate).
252 leader_mon_commands = local_mon_commands;
253 }
254
// Destroy the monitor.  The PaxosService instances are released before
// `paxos` is deleted.  Presumably this order matters because each service
// was constructed with the `paxos` pointer (see the constructor).  Every
// session must already be gone; this is asserted.
255 Monitor::~Monitor()
256 {
257 op_tracker.on_shutdown();
258
259 paxos_service.clear();
260 delete config_key_service;
261 delete paxos;
262 ceph_assert(session_map.sessions.empty());
263 }
264
265
266 class AdminHook : public AdminSocketHook {
267 Monitor *mon;
268 public:
269 explicit AdminHook(Monitor *m) : mon(m) {}
270 int call(std::string_view command, const cmdmap_t& cmdmap,
271 Formatter *f,
272 std::ostream& errss,
273 bufferlist& out) override {
274 stringstream outss;
275 int r = mon->do_admin_command(command, cmdmap, f, errss, outss);
276 out.append(outss);
277 return r;
278 }
279 };
280
281 int Monitor::do_admin_command(
282 std::string_view command,
283 const cmdmap_t& cmdmap,
284 Formatter *f,
285 std::ostream& err,
286 std::ostream& out)
287 {
288 std::lock_guard l(lock);
289
290 int r = 0;
291 string args;
292 for (auto p = cmdmap.begin();
293 p != cmdmap.end(); ++p) {
294 if (p->first == "prefix")
295 continue;
296 if (!args.empty())
297 args += ", ";
298 args += cmd_vartype_stringify(p->second);
299 }
300 args = "[" + args + "]";
301
302 bool read_only = (command == "mon_status" ||
303 command == "mon metadata" ||
304 command == "quorum_status" ||
305 command == "ops" ||
306 command == "sessions");
307
308 (read_only ? audit_clog->debug() : audit_clog->info())
309 << "from='admin socket' entity='admin socket' "
310 << "cmd='" << command << "' args=" << args << ": dispatch";
311
312 if (command == "mon_status") {
313 get_mon_status(f);
314 } else if (command == "quorum_status") {
315 _quorum_status(f, out);
316 } else if (command == "sync_force") {
317 string validate;
318 if ((!cmd_getval(cmdmap, "validate", validate)) ||
319 (validate != "--yes-i-really-mean-it")) {
320 err << "are you SURE? this will mean the monitor store will be erased "
321 "the next time the monitor is restarted. pass "
322 "'--yes-i-really-mean-it' if you really do.";
323 r = -EPERM;
324 goto abort;
325 }
326 sync_force(f);
327 } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0 ||
328 command.compare(0, 24, "add_bootstrap_peer_hintv") == 0) {
329 if (!_add_bootstrap_peer_hint(command, cmdmap, out))
330 goto abort;
331 } else if (command == "quorum enter") {
332 elector.start_participating();
333 start_election();
334 out << "started responding to quorum, initiated new election";
335 } else if (command == "quorum exit") {
336 start_election();
337 elector.stop_participating();
338 out << "stopped responding to quorum, initiated new election";
339 } else if (command == "ops") {
340 (void)op_tracker.dump_ops_in_flight(f);
341 } else if (command == "sessions") {
342 f->open_array_section("sessions");
343 for (auto p : session_map.sessions) {
344 f->dump_object("session", *p);
345 }
346 f->close_section();
347 } else if (command == "dump_historic_ops") {
348 if (!op_tracker.dump_historic_ops(f)) {
349 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
350 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
351 }
352 } else if (command == "dump_historic_ops_by_duration" ) {
353 if (op_tracker.dump_historic_ops(f, true)) {
354 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
355 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
356 }
357 } else if (command == "dump_historic_slow_ops") {
358 if (op_tracker.dump_historic_slow_ops(f, {})) {
359 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
360 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
361 }
362 } else if (command == "quorum") {
363 string quorumcmd;
364 cmd_getval(cmdmap, "quorumcmd", quorumcmd);
365 if (quorumcmd == "exit") {
366 start_election();
367 elector.stop_participating();
368 out << "stopped responding to quorum, initiated new election" << std::endl;
369 } else if (quorumcmd == "enter") {
370 elector.start_participating();
371 start_election();
372 out << "started responding to quorum, initiated new election" << std::endl;
373 } else {
374 err << "needs a valid 'quorum' command" << std::endl;
375 }
376 } else if (command == "smart") {
377 string want_devid;
378 cmd_getval(cmdmap, "devid", want_devid);
379
380 string devname = store->get_devname();
381 set<string> devnames;
382 get_raw_devices(devname, &devnames);
383 json_spirit::mObject json_map;
384 uint64_t smart_timeout = cct->_conf.get_val<uint64_t>(
385 "mon_smart_report_timeout");
386 for (auto& devname : devnames) {
387 string err;
388 string devid = get_device_id(devname, &err);
389 if (want_devid.size() && want_devid != devid) {
390 derr << "get_device_id failed on " << devname << ": " << err << dendl;
391 continue;
392 }
393 json_spirit::mValue smart_json;
394 if (block_device_get_metrics(devname, smart_timeout,
395 &smart_json)) {
396 dout(10) << "block_device_get_metrics failed for /dev/" << devname
397 << dendl;
398 continue;
399 }
400 json_map[devid] = smart_json;
401 }
402 json_spirit::write(json_map, out, json_spirit::pretty_print);
403 } else if (command == "heap") {
404 if (!ceph_using_tcmalloc()) {
405 err << "could not issue heap profiler command -- not using tcmalloc!";
406 r = -EOPNOTSUPP;
407 goto abort;
408 }
409 string cmd;
410 if (!cmd_getval(cmdmap, "heapcmd", cmd)) {
411 err << "unable to get value for command \"" << cmd << "\"";
412 r = -EINVAL;
413 goto abort;
414 }
415 std::vector<std::string> cmd_vec;
416 get_str_vec(cmd, cmd_vec);
417 string val;
418 if (cmd_getval(cmdmap, "value", val)) {
419 cmd_vec.push_back(val);
420 }
421 ceph_heap_profiler_handle_command(cmd_vec, out);
422 } else if (command == "compact") {
423 dout(1) << "triggering manual compaction" << dendl;
424 auto start = ceph::coarse_mono_clock::now();
425 store->compact_async();
426 auto end = ceph::coarse_mono_clock::now();
427 auto duration = ceph::to_seconds<double>(end - start);
428 dout(1) << "finished manual compaction in "
429 << duration << " seconds" << dendl;
430 out << "compacted " << g_conf().get_val<std::string>("mon_keyvaluedb")
431 << " in " << duration << " seconds";
432 } else {
433 ceph_abort_msg("bad AdminSocket command binding");
434 }
435 (read_only ? audit_clog->debug() : audit_clog->info())
436 << "from='admin socket' "
437 << "entity='admin socket' "
438 << "cmd=" << command << " "
439 << "args=" << args << ": finished";
440 return r;
441
442 abort:
443 (read_only ? audit_clog->debug() : audit_clog->info())
444 << "from='admin socket' "
445 << "entity='admin socket' "
446 << "cmd=" << command << " "
447 << "args=" << args << ": aborted";
448 return r;
449 }
450
451 void Monitor::handle_signal(int signum)
452 {
453 ceph_assert(signum == SIGINT || signum == SIGTERM);
454 derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
455 shutdown();
456 }
457
458 CompatSet Monitor::get_initial_supported_features()
459 {
460 CompatSet::FeatureSet ceph_mon_feature_compat;
461 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
462 CompatSet::FeatureSet ceph_mon_feature_incompat;
463 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
464 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
465 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
466 ceph_mon_feature_incompat);
467 }
468
469 CompatSet Monitor::get_supported_features()
470 {
471 CompatSet compat = get_initial_supported_features();
472 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
473 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
474 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
475 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
476 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
477 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
478 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
479 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
480 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS);
481 return compat;
482 }
483
484 CompatSet Monitor::get_legacy_features()
485 {
486 CompatSet::FeatureSet ceph_mon_feature_compat;
487 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
488 CompatSet::FeatureSet ceph_mon_feature_incompat;
489 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
490 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
491 ceph_mon_feature_incompat);
492 }
493
494 int Monitor::check_features(MonitorDBStore *store)
495 {
496 CompatSet required = get_supported_features();
497 CompatSet ondisk;
498
499 read_features_off_disk(store, &ondisk);
500
501 if (!required.writeable(ondisk)) {
502 CompatSet diff = required.unsupported(ondisk);
503 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
504 return -EPERM;
505 }
506
507 return 0;
508 }
509
510 void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
511 {
512 bufferlist featuresbl;
513 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
514 if (featuresbl.length() == 0) {
515 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
516 << "Assuming it is old-style and introducing one." << dendl;
517 //we only want the baseline ~v.18 features assumed to be on disk.
518 //If new features are introduced this code needs to disappear or
519 //be made smarter.
520 *features = get_legacy_features();
521
522 features->encode(featuresbl);
523 auto t(std::make_shared<MonitorDBStore::Transaction>());
524 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
525 store->apply_transaction(t);
526 } else {
527 auto it = featuresbl.cbegin();
528 features->decode(it);
529 }
530 }
531
// Load this store's CompatSet into `features`, then recompute the quorum
// requirements.  calc_quorum_requirements() is defined elsewhere; it is
// presumably what updates required_features, which is logged afterwards.
532 void Monitor::read_features()
533 {
534 read_features_off_disk(store, &features);
535 dout(10) << "features " << features << dendl;
536
537 calc_quorum_requirements();
538 dout(10) << "required_features " << required_features << dendl;
539 }
540
541 void Monitor::write_features(MonitorDBStore::TransactionRef t)
542 {
543 bufferlist bl;
544 features.encode(bl);
545 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
546 }
547
548 const char** Monitor::get_tracked_conf_keys() const
549 {
550 static const char* KEYS[] = {
551 "crushtool", // helpful for testing
552 "mon_election_timeout",
553 "mon_lease",
554 "mon_lease_renew_interval_factor",
555 "mon_lease_ack_timeout_factor",
556 "mon_accept_timeout_factor",
557 // clog & admin clog
558 "clog_to_monitors",
559 "clog_to_syslog",
560 "clog_to_syslog_facility",
561 "clog_to_syslog_level",
562 "clog_to_graylog",
563 "clog_to_graylog_host",
564 "clog_to_graylog_port",
565 "host",
566 "fsid",
567 // periodic health to clog
568 "mon_health_to_clog",
569 "mon_health_to_clog_interval",
570 "mon_health_to_clog_tick_interval",
571 // scrub interval
572 "mon_scrub_interval",
573 "mon_allow_pool_delete",
574 // osdmap pruning - observed, not handled.
575 "mon_osdmap_full_prune_enabled",
576 "mon_osdmap_full_prune_min",
577 "mon_osdmap_full_prune_interval",
578 "mon_osdmap_full_prune_txsize",
579 // debug options - observed, not handled
580 "mon_debug_extra_checks",
581 "mon_debug_block_osdmap_trim",
582 NULL
583 };
584 return KEYS;
585 }
586
587 void Monitor::handle_conf_change(const ConfigProxy& conf,
588 const std::set<std::string> &changed)
589 {
590 sanitize_options();
591
592 dout(10) << __func__ << " " << changed << dendl;
593
594 if (changed.count("clog_to_monitors") ||
595 changed.count("clog_to_syslog") ||
596 changed.count("clog_to_syslog_level") ||
597 changed.count("clog_to_syslog_facility") ||
598 changed.count("clog_to_graylog") ||
599 changed.count("clog_to_graylog_host") ||
600 changed.count("clog_to_graylog_port") ||
601 changed.count("host") ||
602 changed.count("fsid")) {
603 update_log_clients();
604 }
605
606 if (changed.count("mon_health_to_clog") ||
607 changed.count("mon_health_to_clog_interval") ||
608 changed.count("mon_health_to_clog_tick_interval")) {
609 finisher.queue(new C_MonContext{this, [this, changed](int) {
610 std::lock_guard l{lock};
611 health_to_clog_update_conf(changed);
612 }});
613 }
614
615 if (changed.count("mon_scrub_interval")) {
616 int scrub_interval = conf->mon_scrub_interval;
617 finisher.queue(new C_MonContext{this, [this, scrub_interval](int) {
618 std::lock_guard l{lock};
619 scrub_update_interval(scrub_interval);
620 }});
621 }
622 }
623
624 void Monitor::update_log_clients()
625 {
626 map<string,string> log_to_monitors;
627 map<string,string> log_to_syslog;
628 map<string,string> log_channel;
629 map<string,string> log_prio;
630 map<string,string> log_to_graylog;
631 map<string,string> log_to_graylog_host;
632 map<string,string> log_to_graylog_port;
633 uuid_d fsid;
634 string host;
635
636 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
637 log_channel, log_prio, log_to_graylog,
638 log_to_graylog_host, log_to_graylog_port,
639 fsid, host))
640 return;
641
642 clog->update_config(log_to_monitors, log_to_syslog,
643 log_channel, log_prio, log_to_graylog,
644 log_to_graylog_host, log_to_graylog_port,
645 fsid, host);
646
647 audit_clog->update_config(log_to_monitors, log_to_syslog,
648 log_channel, log_prio, log_to_graylog,
649 log_to_graylog_host, log_to_graylog_port,
650 fsid, host);
651 }
652
653 int Monitor::sanitize_options()
654 {
655 int r = 0;
656
657 // mon_lease must be greater than mon_lease_renewal; otherwise we
658 // may incur in leases expiring before they are renewed.
659 if (g_conf()->mon_lease_renew_interval_factor >= 1.0) {
660 clog->error() << "mon_lease_renew_interval_factor ("
661 << g_conf()->mon_lease_renew_interval_factor
662 << ") must be less than 1.0";
663 r = -EINVAL;
664 }
665
666 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
667 // got time to renew the lease and get an ack for it. Having both options
668 // with the same value, for a given small vale, could mean timing out if
669 // the monitors happened to be overloaded -- or even under normal load for
670 // a small enough value.
671 if (g_conf()->mon_lease_ack_timeout_factor <= 1.0) {
672 clog->error() << "mon_lease_ack_timeout_factor ("
673 << g_conf()->mon_lease_ack_timeout_factor
674 << ") must be greater than 1.0";
675 r = -EINVAL;
676 }
677
678 return r;
679 }
680
681 int Monitor::preinit()
682 {
683 std::unique_lock l(lock);
684
685 dout(1) << "preinit fsid " << monmap->fsid << dendl;
686
687 int r = sanitize_options();
688 if (r < 0) {
689 derr << "option sanitization failed!" << dendl;
690 return r;
691 }
692
693 ceph_assert(!logger);
694 {
695 PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
696 pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
697 PerfCountersBuilder::PRIO_USEFUL);
698 pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions",
699 "sadd", PerfCountersBuilder::PRIO_INTERESTING);
700 pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions",
701 "srm", PerfCountersBuilder::PRIO_INTERESTING);
702 pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions",
703 "strm", PerfCountersBuilder::PRIO_USEFUL);
704 pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in",
705 "ecnt", PerfCountersBuilder::PRIO_USEFUL);
706 pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started",
707 "estt", PerfCountersBuilder::PRIO_INTERESTING);
708 pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won",
709 "ewon", PerfCountersBuilder::PRIO_INTERESTING);
710 pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost",
711 "elst", PerfCountersBuilder::PRIO_INTERESTING);
712 logger = pcb.create_perf_counters();
713 cct->get_perfcounters_collection()->add(logger);
714 }
715
716 ceph_assert(!cluster_logger);
717 {
718 PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
719 pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
720 pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
721 pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
722 pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
723 pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
724 pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
725 pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster", NULL, 0, unit_t(UNIT_BYTES));
726 pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space", NULL, 0, unit_t(UNIT_BYTES));
727 pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space", NULL, 0, unit_t(UNIT_BYTES));
728 pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
729 pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
730 pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
731 pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
732 pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
733 pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
734 pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
735 pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
736 pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
737 pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects", NULL, 0, unit_t(UNIT_BYTES));
738 cluster_logger = pcb.create_perf_counters();
739 }
740
741 paxos->init_logger();
742
743 // verify cluster_uuid
744 {
745 int r = check_fsid();
746 if (r == -ENOENT)
747 r = write_fsid();
748 if (r < 0) {
749 return r;
750 }
751 }
752
753 // open compatset
754 read_features();
755
756 // have we ever joined a quorum?
757 has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
758 dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;
759
760 if (!has_ever_joined) {
761 // impose initial quorum restrictions?
762 list<string> initial_members;
763 get_str_list(g_conf()->mon_initial_members, initial_members);
764
765 if (!initial_members.empty()) {
766 dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
767
768 monmap->set_initial_members(
769 g_ceph_context, initial_members, name, messenger->get_myaddrs(),
770 &extra_probe_peers);
771
772 dout(10) << " monmap is " << *monmap << dendl;
773 dout(10) << " extra probe peers " << extra_probe_peers << dendl;
774 }
775 } else if (!monmap->contains(name)) {
776 derr << "not in monmap and have been in a quorum before; "
777 << "must have been removed" << dendl;
778 if (g_conf()->mon_force_quorum_join) {
779 dout(0) << "we should have died but "
780 << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
781 } else {
782 derr << "commit suicide!" << dendl;
783 return -ENOENT;
784 }
785 }
786
787 {
788 // We have a potentially inconsistent store state in hands. Get rid of it
789 // and start fresh.
790 bool clear_store = false;
791 if (store->exists("mon_sync", "in_sync")) {
792 dout(1) << __func__ << " clean up potentially inconsistent store state"
793 << dendl;
794 clear_store = true;
795 }
796
797 if (store->get("mon_sync", "force_sync") > 0) {
798 dout(1) << __func__ << " force sync by clearing store state" << dendl;
799 clear_store = true;
800 }
801
802 if (clear_store) {
803 set<string> sync_prefixes = get_sync_targets_names();
804 store->clear(sync_prefixes);
805 }
806 }
807
808 sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
809 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
810
811 init_paxos();
812
813 if (is_keyring_required()) {
814 // we need to bootstrap authentication keys so we can form an
815 // initial quorum.
816 if (authmon()->get_last_committed() == 0) {
817 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
818 bufferlist bl;
819 int err = store->get("mkfs", "keyring", bl);
820 if (err == 0 && bl.length() > 0) {
821 // Attempt to decode and extract keyring only if it is found.
822 KeyRing keyring;
823 auto p = bl.cbegin();
824 decode(keyring, p);
825 extract_save_mon_key(keyring);
826 }
827 }
828
829 string keyring_loc = g_conf()->mon_data + "/keyring";
830
831 r = keyring.load(cct, keyring_loc);
832 if (r < 0) {
833 EntityName mon_name;
834 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
835 EntityAuth mon_key;
836 if (key_server.get_auth(mon_name, mon_key)) {
837 dout(1) << "copying mon. key from old db to external keyring" << dendl;
838 keyring.add(mon_name, mon_key);
839 bufferlist bl;
840 keyring.encode_plaintext(bl);
841 write_default_keyring(bl);
842 } else {
843 derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
844 return r;
845 }
846 }
847 }
848
849 admin_hook = new AdminHook(this);
850 AdminSocket* admin_socket = cct->get_admin_socket();
851
852 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
853 l.unlock();
854 // register tell/asock commands
855 for (const auto& command : local_mon_commands) {
856 if (!command.is_tell()) {
857 continue;
858 }
859 const auto prefix = cmddesc_get_prefix(command.cmdstring);
860 if (prefix == "injectargs" ||
861 prefix == "version" ||
862 prefix == "tell") {
863 // not registerd by me
864 continue;
865 }
866 r = admin_socket->register_command(command.cmdstring, admin_hook,
867 command.helpstring);
868 ceph_assert(r == 0);
869 }
870 l.lock();
871
872 // add ourselves as a conf observer
873 g_conf().add_observer(this);
874
875 messenger->set_auth_client(this);
876 messenger->set_auth_server(this);
877 mgr_messenger->set_auth_client(this);
878
879 auth_registry.refresh_config();
880
881 return 0;
882 }
883
// Second stage of startup (after preinit()), run under the monitor lock.
// It starts the finisher, timer tick and CPU thread pool, attaches this
// monitor and the mgr client as messenger dispatchers, then enters
// STATE_PROBING and calls bootstrap().  Always returns 0.
884 int Monitor::init()
885 {
886 dout(2) << "init" << dendl;
887 std::lock_guard l(lock);
888
889 finisher.start();
890
891 // start ticker
892 timer.init();
893 new_tick();
894
895 cpu_tp.start();
896
897 // i'm ready!
898 messenger->add_dispatcher_tail(this);
899
900 // kickstart pet mgrclient
901 mgr_client.init();
902 mgr_messenger->add_dispatcher_tail(&mgr_client);
903 mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls
904 mgrmon()->prime_mgr_client();
905
906 state = STATE_PROBING;
907 bootstrap();
908 // add features of myself into feature_map
909 session_map.feature_map.add_mon(con_self->get_features());
910 return 0;
911 }
912
913 void Monitor::init_paxos()
914 {
915 dout(10) << __func__ << dendl;
916 paxos->init();
917
918 // init services
919 for (auto& svc : paxos_service) {
920 svc->init();
921 }
922
923 refresh_from_paxos(NULL);
924 }
925
926 void Monitor::refresh_from_paxos(bool *need_bootstrap)
927 {
928 dout(10) << __func__ << dendl;
929
930 bufferlist bl;
931 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
932 if (r >= 0) {
933 try {
934 auto p = bl.cbegin();
935 decode(fingerprint, p);
936 }
937 catch (buffer::error& e) {
938 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
939 }
940 } else {
941 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
942 }
943
944 for (auto& svc : paxos_service) {
945 svc->refresh(need_bootstrap);
946 }
947 for (auto& svc : paxos_service) {
948 svc->post_refresh();
949 }
950 load_metadata();
951 }
952
953 void Monitor::register_cluster_logger()
954 {
955 if (!cluster_logger_registered) {
956 dout(10) << "register_cluster_logger" << dendl;
957 cluster_logger_registered = true;
958 cct->get_perfcounters_collection()->add(cluster_logger);
959 } else {
960 dout(10) << "register_cluster_logger - already registered" << dendl;
961 }
962 }
963
964 void Monitor::unregister_cluster_logger()
965 {
966 if (cluster_logger_registered) {
967 dout(10) << "unregister_cluster_logger" << dendl;
968 cluster_logger_registered = false;
969 cct->get_perfcounters_collection()->remove(cluster_logger);
970 } else {
971 dout(10) << "unregister_cluster_logger - not registered" << dendl;
972 }
973 }
974
975 void Monitor::update_logger()
976 {
977 cluster_logger->set(l_cluster_num_mon, monmap->size());
978 cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
979 }
980
981 void Monitor::shutdown()
982 {
983 dout(1) << "shutdown" << dendl;
984
985 lock.lock();
986
987 wait_for_paxos_write();
988
989 {
990 std::lock_guard l(auth_lock);
991 authmon()->_set_mon_num_rank(0, 0);
992 }
993
994 state = STATE_SHUTDOWN;
995
996 lock.unlock();
997 g_conf().remove_observer(this);
998 lock.lock();
999
1000 if (admin_hook) {
1001 cct->get_admin_socket()->unregister_commands(admin_hook);
1002 delete admin_hook;
1003 admin_hook = NULL;
1004 }
1005
1006 elector.shutdown();
1007
1008 mgr_client.shutdown();
1009
1010 lock.unlock();
1011 finisher.wait_for_empty();
1012 finisher.stop();
1013 lock.lock();
1014
1015 // clean up
1016 paxos->shutdown();
1017 for (auto& svc : paxos_service) {
1018 svc->shutdown();
1019 }
1020
1021 finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
1022 finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
1023
1024 timer.shutdown();
1025
1026 cpu_tp.stop();
1027
1028 remove_all_sessions();
1029
1030 log_client.shutdown();
1031
1032 // unlock before msgr shutdown...
1033 lock.unlock();
1034
1035 // shutdown messenger before removing logger from perfcounter collection,
1036 // otherwise _ms_dispatch() will try to update deleted logger
1037 messenger->shutdown();
1038 mgr_messenger->shutdown();
1039
1040 if (logger) {
1041 cct->get_perfcounters_collection()->remove(logger);
1042 delete logger;
1043 logger = NULL;
1044 }
1045 if (cluster_logger) {
1046 if (cluster_logger_registered)
1047 cct->get_perfcounters_collection()->remove(cluster_logger);
1048 delete cluster_logger;
1049 cluster_logger = NULL;
1050 }
1051 }
1052
1053 void Monitor::wait_for_paxos_write()
1054 {
1055 if (paxos->is_writing() || paxos->is_writing_previous()) {
1056 dout(10) << __func__ << " flushing pending write" << dendl;
1057 lock.unlock();
1058 store->flush();
1059 lock.lock();
1060 dout(10) << __func__ << " flushed pending write" << dendl;
1061 }
1062 }
1063
// Monitor::respawn(): re-exec the monitor binary with the original argv.
// Never returns: execv() replaces the process image on success, and on
// failure the error is logged and ceph_abort() is called.
1064 void Monitor::respawn()
1065 {
1066 // --- WARNING TO FUTURE COPY/PASTERS ---
1067 // You must also add a call like
1068 //
1069 // ceph_pthread_setname(pthread_self(), "ceph-mon");
1070 //
1071 // to main() so that /proc/$pid/stat field 2 contains "(ceph-mon)"
1072 // instead of "(exe)", so that killall (and log rotation) will work.
1073
1074 dout(0) << __func__ << dendl;
1075
// Copy orig_argv into a NULL-terminated array suitable for execv().
// NOTE(review): this is a variable-length array, a GCC/Clang extension
// rather than standard C++; a std::vector<char*> would be portable.
1076 char *new_argv[orig_argc+1];
1077 dout(1) << " e: '" << orig_argv[0] << "'" << dendl;
1078 for (int i=0; i<orig_argc; i++) {
1079 new_argv[i] = (char *)orig_argv[i];
1080 dout(1) << " " << i << ": '" << orig_argv[i] << "'" << dendl;
1081 }
1082 new_argv[orig_argc] = NULL;
1083
1084 /* Determine the path to our executable, test if Linux /proc/self/exe exists.
1085 * This allows us to exec the same executable even if it has since been
1086 * unlinked.
1087 */
// exe_path is zero-filled and readlink() is limited to PATH_MAX-1 bytes, so
// the result logged below stays NUL-terminated.  On success we exec the
// /proc/self/exe link itself rather than the resolved path.
1088 char exe_path[PATH_MAX] = "";
1089 #ifdef PROCPREFIX
1090 if (readlink(PROCPREFIX "/proc/self/exe", exe_path, PATH_MAX-1) != -1) {
1091 dout(1) << "respawning with exe " << exe_path << dendl;
1092 strcpy(exe_path, PROCPREFIX "/proc/self/exe");
1093 } else {
1094 #else
1095 {
1096 #endif
1097 /* Print CWD for the user's interest */
1098 char buf[PATH_MAX];
1099 char *cwd = getcwd(buf, sizeof(buf));
1100 ceph_assert(cwd);
1101 dout(1) << " cwd " << cwd << dendl;
1102
1103 /* Fall back to a best-effort: just running in our CWD */
1104 strncpy(exe_path, orig_argv[0], PATH_MAX-1);
1105 }
1106
1107 dout(1) << " exe_path " << exe_path << dendl;
1108
1109 unblock_all_signals(NULL);
1110 execv(exe_path, new_argv);
1111
// Only reached if execv() failed.
// NOTE(review): the message names orig_argv[0], not exe_path (the path that
// was actually passed to execv()).
1112 dout(0) << "respawn execv " << orig_argv[0]
1113 << " failed with " << cpp_strerror(errno) << dendl;
1114
1115 // We have to assert out here, because suicide() returns, and callers
1116 // to respawn expect it never to return.
1117 ceph_abort();
1118 }
1119
// Monitor::bootstrap(): (re)start membership discovery from scratch.
// Flushes any pending paxos write, resets sync-requester state, unregisters
// the cluster logger and cancels the probe timeout, then works out our rank
// in the current monmap, resets election/quorum state via _reset(), and
// either wins a standalone election (one-monitor map where we are rank 0) or
// arms the probe timeout and sends OP_PROBE to every other monmap rank and to
// every address in extra_probe_peers.
//
// Process-level side effects:
//  - exit(0) if the monmap's min_mon_release is not one we can upgrade from;
//  - exit(0) if we were removed from the monmap after having joined;
//  - respawn() (does not return) if the monmap's addrs for our rank differ
//    from our messenger's addrs.
1120 void Monitor::bootstrap()
1121 {
1122 dout(10) << "bootstrap" << dendl;
1123 wait_for_paxos_write();
1124
1125 sync_reset_requester();
1126 unregister_cluster_logger();
1127 cancel_probe_timeout();
1128
1129 if (monmap->get_epoch() == 0) {
1130 dout(10) << "reverting to legacy ranks for seed monmap (epoch 0)" << dendl;
1131 monmap->calc_legacy_ranks();
1132 }
1133 dout(10) << "monmap " << *monmap << dendl;
1134 {
1135 auto from_release = monmap->min_mon_release;
1136 ostringstream err;
1137 if (!can_upgrade_from(from_release, "min_mon_release", err)) {
1138 derr << "current monmap has " << err.str() << " stopping." << dendl;
1139 exit(0);
1140 }
1141 }
1142 // note my rank
// newrank < 0 means our messenger addrs are not in the monmap.
1143 int newrank = monmap->get_rank(messenger->get_myaddrs());
1144 if (newrank < 0 && rank >= 0) {
1145 // was i ever part of the quorum?
1146 if (has_ever_joined) {
1147 dout(0) << " removed from monmap, suicide." << dendl;
1148 exit(0);
1149 }
1150 }
1151 if (newrank >= 0 &&
1152 monmap->get_addrs(newrank) != messenger->get_myaddrs()) {
1153 dout(0) << " monmap addrs for rank " << newrank << " changed, i am "
1154 << messenger->get_myaddrs()
1155 << ", monmap is " << monmap->get_addrs(newrank) << ", respawning"
1156 << dendl;
1157
1158 if (monmap->get_epoch()) {
1159 // store this map in temp mon_sync location so that we use it on
1160 // our next startup
1161 derr << " stashing newest monmap " << monmap->get_epoch()
1162 << " for next startup" << dendl;
1163 bufferlist bl;
1164 monmap->encode(bl, -1);
1165 auto t(std::make_shared<MonitorDBStore::Transaction>());
1166 t->put("mon_sync", "temp_newer_monmap", bl);
1167 store->apply_transaction(t);
1168 }
1169
1170 respawn();
1171 }
1172 if (newrank != rank) {
1173 dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
1174 messenger->set_myname(entity_name_t::MON(newrank));
1175 rank = newrank;
1176
1177 // reset all connections, or else our peers will think we are someone else.
1178 messenger->mark_down_all();
1179 }
1180
1181 // reset
1182 state = STATE_PROBING;
1183
1184 _reset();
1185
1186 // optionally compact the store (mon_compact_on_bootstrap)
1187 if (g_conf()->mon_compact_on_bootstrap) {
1188 dout(10) << "bootstrap -- triggering compaction" << dendl;
1189 store->compact();
1190 dout(10) << "bootstrap -- finished compaction" << dendl;
1191 }
1192
1193 // singleton monitor?
1194 if (monmap->size() == 1 && rank == 0) {
1195 win_standalone_election();
1196 return;
1197 }
1198
1199 reset_probe_timeout();
1200
1201 // i'm outside the quorum
1202 if (monmap->contains(name))
1203 outside_quorum.insert(name);
1204
1205 // probe monitors
1206 dout(10) << "probing other monitors" << dendl;
1207 for (unsigned i = 0; i < monmap->size(); i++) {
1208 if ((int)i != rank)
1209 send_mon_message(
1210 new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
1211 ceph_release()),
1212 i);
1213 }
// Also probe operator-supplied / discovered hint addresses, skipping our own.
1214 for (auto& av : extra_probe_peers) {
1215 if (av != messenger->get_myaddrs()) {
1216 messenger->send_to_mon(
1217 new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
1218 ceph_release()),
1219 av);
1220 }
1221 }
1222 }
1223
1224 bool Monitor::_add_bootstrap_peer_hint(std::string_view cmd,
1225 const cmdmap_t& cmdmap,
1226 ostream& ss)
1227 {
1228 if (is_leader() || is_peon()) {
1229 ss << "mon already active; ignoring bootstrap hint";
1230 return true;
1231 }
1232
1233 entity_addrvec_t addrs;
1234 string addrstr;
1235 if (cmd_getval(cmdmap, "addr", addrstr)) {
1236 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' addr '"
1237 << addrstr << "'" << dendl;
1238
1239 entity_addr_t addr;
1240 const char *end = 0;
1241 if (!addr.parse(addrstr.c_str(), &end, entity_addr_t::TYPE_ANY)) {
1242 ss << "failed to parse addrs '" << addrstr
1243 << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1244 return false;
1245 }
1246
1247 addrs.v.push_back(addr);
1248 if (addr.get_port() == 0) {
1249 addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
1250 addrs.v[0].set_port(CEPH_MON_PORT_IANA);
1251 addrs.v.push_back(addr);
1252 addrs.v[1].set_type(entity_addr_t::TYPE_LEGACY);
1253 addrs.v[1].set_port(CEPH_MON_PORT_LEGACY);
1254 } else if (addr.get_type() == entity_addr_t::TYPE_ANY) {
1255 if (addr.get_port() == CEPH_MON_PORT_LEGACY) {
1256 addrs.v[0].set_type(entity_addr_t::TYPE_LEGACY);
1257 } else {
1258 addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
1259 }
1260 }
1261 } else if (cmd_getval(cmdmap, "addrv", addrstr)) {
1262 dout(10) << "_add_bootstrap_peer_hintv '" << cmd << "' addrv '"
1263 << addrstr << "'" << dendl;
1264 const char *end = 0;
1265 if (!addrs.parse(addrstr.c_str(), &end)) {
1266 ss << "failed to parse addrs '" << addrstr
1267 << "'; syntax is 'add_bootstrap_peer_hintv v2:ip:port[,v1:ip:port]'";
1268 return false;
1269 }
1270 } else {
1271 ss << "no addr or addrv provided";
1272 return false;
1273 }
1274
1275 extra_probe_peers.insert(addrs);
1276 ss << "adding peer " << addrs << " to list: " << extra_probe_peers;
1277 return true;
1278 }
1279
// Monitor::_reset(): drop all per-election and per-quorum state so the
// monitor can start discovery/election again.  Zeroes the auth mon
// num/rank, cancels the probe timeout, time checks, health events and scrub
// events, clears leader/quorum bookkeeping (stamping exited_quorum only if
// we were in a quorum), resets scrub state, and restarts paxos and every
// paxos service.
1280 // called by bootstrap(), or on leader|peon -> electing
1281 void Monitor::_reset()
1282 {
1283 dout(10) << __func__ << dendl;
1284
1285 // disable authentication
1286 {
1287 std::lock_guard l(auth_lock);
1288 authmon()->_set_mon_num_rank(0, 0);
1289 }
1290
1291 cancel_probe_timeout();
1292 timecheck_finish();
1293 health_events_cleanup();
1294 health_check_log_times.clear();
1295 scrub_event_cancel();
1296
1297 leader_since = utime_t();
1298 quorum_since = {};
// Remember when we left quorum, but only if we were actually in one.
1299 if (!quorum.empty()) {
1300 exited_quorum = ceph_clock_now();
1301 }
1302 quorum.clear();
1303 outside_quorum.clear();
1304 quorum_feature_map.clear();
1305
1306 scrub_reset();
1307
1308 paxos->restart();
1309
1310 for (auto& svc : paxos_service) {
1311 svc->restart();
1312 }
1313 }
1314
1315
1316 // -----------------------------------------------------------
1317 // sync
1318
1319 set<string> Monitor::get_sync_targets_names()
1320 {
1321 set<string> targets;
1322 targets.insert(paxos->get_name());
1323 for (auto& svc : paxos_service) {
1324 svc->get_store_prefixes(targets);
1325 }
1326 ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
1327 ceph_assert(config_key_service_ptr);
1328 config_key_service_ptr->get_store_prefixes(targets);
1329 return targets;
1330 }
1331
1332
1333 void Monitor::sync_timeout()
1334 {
1335 dout(10) << __func__ << dendl;
1336 ceph_assert(state == STATE_SYNCHRONIZING);
1337 bootstrap();
1338 }
1339
1340 void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1341 {
1342 dout(1) << __func__ << dendl;
1343
1344 MonMap latest_monmap;
1345
1346 // Grab latest monmap from MonmapMonitor
1347 bufferlist monmon_bl;
1348 int err = monmon()->get_monmap(monmon_bl);
1349 if (err < 0) {
1350 if (err != -ENOENT) {
1351 derr << __func__
1352 << " something wrong happened while reading the store: "
1353 << cpp_strerror(err) << dendl;
1354 ceph_abort_msg("error reading the store");
1355 }
1356 } else {
1357 latest_monmap.decode(monmon_bl);
1358 }
1359
1360 // Grab last backed up monmap (if any) and compare epochs
1361 if (store->exists("mon_sync", "latest_monmap")) {
1362 bufferlist backup_bl;
1363 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1364 if (err < 0) {
1365 derr << __func__
1366 << " something wrong happened while reading the store: "
1367 << cpp_strerror(err) << dendl;
1368 ceph_abort_msg("error reading the store");
1369 }
1370 ceph_assert(backup_bl.length() > 0);
1371
1372 MonMap backup_monmap;
1373 backup_monmap.decode(backup_bl);
1374
1375 if (backup_monmap.epoch > latest_monmap.epoch)
1376 latest_monmap = backup_monmap;
1377 }
1378
1379 // Check if our current monmap's epoch is greater than the one we've
1380 // got so far.
1381 if (monmap->epoch > latest_monmap.epoch)
1382 latest_monmap = *monmap;
1383
1384 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1385
1386 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1387 }
1388
1389 void Monitor::sync_reset_requester()
1390 {
1391 dout(10) << __func__ << dendl;
1392
1393 if (sync_timeout_event) {
1394 timer.cancel_event(sync_timeout_event);
1395 sync_timeout_event = NULL;
1396 }
1397
1398 sync_provider = entity_addrvec_t();
1399 sync_cookie = 0;
1400 sync_full = false;
1401 sync_start_version = 0;
1402 }
1403
1404 void Monitor::sync_reset_provider()
1405 {
1406 dout(10) << __func__ << dendl;
1407 sync_providers.clear();
1408 }
1409
// Monitor::sync_start(): begin synchronizing our store from the monitor at
// `addrs`.  Must be called while probing or already synchronizing; moves to
// STATE_SYNCHRONIZING and drops any sessions we were providing.
//
// full == true: persist the newest known monmap plus the mon_sync/in_sync
// and last_committed_floor markers, wipe all sync target prefixes from the
// store and re-init paxos, then request OP_GET_COOKIE_FULL.
// full == false: request OP_GET_COOKIE_RECENT starting from our current
// paxos version.
//
// The mon_sync_requester_kill_at asserts are test hooks that abort at a
// given step when the option equals that step number.
1410 void Monitor::sync_start(entity_addrvec_t &addrs, bool full)
1411 {
1412 dout(10) << __func__ << " " << addrs << (full ? " full" : " recent") << dendl;
1413
1414 ceph_assert(state == STATE_PROBING ||
1415 state == STATE_SYNCHRONIZING);
1416 state = STATE_SYNCHRONIZING;
1417
1418 // make sure we are not a provider for anyone!
1419 sync_reset_provider();
1420
1421 sync_full = full;
1422
1423 if (sync_full) {
1424 // stash key state, and mark that we are syncing
1425 auto t(std::make_shared<MonitorDBStore::Transaction>());
1426 sync_stash_critical_state(t);
1427 t->put("mon_sync", "in_sync", 1);
1428
// The floor only ever grows; it is persisted alongside in_sync.
1429 sync_last_committed_floor = std::max(sync_last_committed_floor, paxos->get_version());
1430 dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
1431 << sync_last_committed_floor << dendl;
1432 t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
1433
1434 store->apply_transaction(t);
1435
1436 ceph_assert(g_conf()->mon_sync_requester_kill_at != 1);
1437
1438 // clear the underlying store
1439 set<string> targets = get_sync_targets_names();
1440 dout(10) << __func__ << " clearing prefixes " << targets << dendl;
1441 store->clear(targets);
1442
1443 // make sure paxos knows it has been reset. this prevents a
1444 // bootstrap and then different probe reply order from possibly
1445 // deciding a partial or no sync is needed.
1446 paxos->init();
1447
1448 ceph_assert(g_conf()->mon_sync_requester_kill_at != 2);
1449 }
1450
1451 // Treat `addrs` as our sync provider; handle_sync_cookie() and
1452 // handle_sync_chunk() discard replies from any other source.
1453 sync_provider = addrs;
1454
// Arm the requester-side timeout; if nothing arrives, sync_timeout()
// bootstraps again.
1455 sync_reset_timeout();
1456
1457 MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
1458 if (!sync_full)
1459 m->last_committed = paxos->get_version();
1460 messenger->send_to_mon(m, sync_provider);
1461 }
1462
1463 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1464 {
1465 dout(10) << __func__ << dendl;
1466 bufferlist backup_monmap;
1467 sync_obtain_latest_monmap(backup_monmap);
1468 ceph_assert(backup_monmap.length() > 0);
1469 t->put("mon_sync", "latest_monmap", backup_monmap);
1470 }
1471
1472 void Monitor::sync_reset_timeout()
1473 {
1474 dout(10) << __func__ << dendl;
1475 if (sync_timeout_event)
1476 timer.cancel_event(sync_timeout_event);
1477 sync_timeout_event = timer.add_event_after(
1478 g_conf()->mon_sync_timeout,
1479 new C_MonContext{this, [this](int) {
1480 sync_timeout();
1481 }});
1482 }
1483
// Monitor::sync_finish(): complete a sync once the provider has sent its last
// chunk.  For a full sync, replay the received paxos versions
// [sync_start_version, last_committed] into the store and record
// last_committed.  Then clear the mon_sync in_sync / force_sync /
// last_committed_floor markers, re-init paxos and all services from the
// store, and bootstrap() to rejoin.
// The mon_sync_requester_kill_at asserts are step-numbered test hooks.
1484 void Monitor::sync_finish(version_t last_committed)
1485 {
1486 dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
1487
1488 ceph_assert(g_conf()->mon_sync_requester_kill_at != 7);
1489
1490 if (sync_full) {
1491 // finalize the paxos commits
1492 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1493 paxos->read_and_prepare_transactions(tx, sync_start_version,
1494 last_committed);
1495 tx->put(paxos->get_name(), "last_committed", last_committed);
1496
1497 dout(30) << __func__ << " final tx dump:\n";
1498 JSONFormatter f(true);
1499 tx->dump(&f);
1500 f.flush(*_dout);
1501 *_dout << dendl;
1502
1503 store->apply_transaction(tx);
1504 }
1505
1506 ceph_assert(g_conf()->mon_sync_requester_kill_at != 8);
1507
// Clear the "sync in progress" markers written by sync_start().
1508 auto t(std::make_shared<MonitorDBStore::Transaction>());
1509 t->erase("mon_sync", "in_sync");
1510 t->erase("mon_sync", "force_sync");
1511 t->erase("mon_sync", "last_committed_floor");
1512 store->apply_transaction(t);
1513
1514 ceph_assert(g_conf()->mon_sync_requester_kill_at != 9);
1515
1516 init_paxos();
1517
1518 ceph_assert(g_conf()->mon_sync_requester_kill_at != 10);
1519
1520 bootstrap();
1521 }
1522
1523 void Monitor::handle_sync(MonOpRequestRef op)
1524 {
1525 auto m = op->get_req<MMonSync>();
1526 dout(10) << __func__ << " " << *m << dendl;
1527 switch (m->op) {
1528
1529 // provider ---------
1530
1531 case MMonSync::OP_GET_COOKIE_FULL:
1532 case MMonSync::OP_GET_COOKIE_RECENT:
1533 handle_sync_get_cookie(op);
1534 break;
1535 case MMonSync::OP_GET_CHUNK:
1536 handle_sync_get_chunk(op);
1537 break;
1538
1539 // client -----------
1540
1541 case MMonSync::OP_COOKIE:
1542 handle_sync_cookie(op);
1543 break;
1544
1545 case MMonSync::OP_CHUNK:
1546 case MMonSync::OP_LAST_CHUNK:
1547 handle_sync_chunk(op);
1548 break;
1549 case MMonSync::OP_NO_COOKIE:
1550 handle_sync_no_cookie(op);
1551 break;
1552
1553 default:
1554 dout(0) << __func__ << " unknown op " << m->op << dendl;
1555 ceph_abort_msg("unknown op");
1556 }
1557 }
1558
1559 // leader
1560
1561 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1562 {
1563 auto m = op->get_req<MMonSync>();
1564 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1565 m->get_connection()->send_message(reply);
1566 }
1567
// Monitor::handle_sync_get_cookie(): provider side of sync session setup.
// Replies OP_NO_COOKIE if we are synchronizing ourselves.  Otherwise
// allocates a new SyncProvider keyed by a fresh cookie, with its timeout set
// to 2 * mon_sync_timeout (expired entries are dropped by
// sync_trim_providers()), and replies OP_COOKIE carrying the version the
// requester will sync from.
//  - OP_GET_COOKIE_FULL: snapshot our current paxos version and open a store
//    synchronizer over all sync target prefixes.
//  - OP_GET_COOKIE_RECENT: resume from the requester's last_committed.
1568 void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1569 {
1570 auto m = op->get_req<MMonSync>();
1571 if (is_synchronizing()) {
1572 _sync_reply_no_cookie(op);
1573 return;
1574 }
1575
1576 ceph_assert(g_conf()->mon_sync_provider_kill_at != 1);
1577
1578 // make sure they can understand us.
// (required ^ have) & required == required & ~have: the required bits the
// peer lacks.
// NOTE(review): no reply is sent in this case, so the requester only gives
// up when its own sync timeout (armed in sync_start()) fires.
1579 if ((required_features ^ m->get_connection()->get_features()) &
1580 required_features) {
1581 dout(5) << " ignoring peer mon." << m->get_source().num()
1582 << " has features " << std::hex
1583 << m->get_connection()->get_features()
1584 << " but we require " << required_features << std::dec << dendl;
1585 return;
1586 }
1587
1588 // make up a unique cookie. include election epoch (which persists
1589 // across restarts for the whole cluster) and a counter for this
1590 // process instance. there is no need to be unique *across*
1591 // monitors, though.
1592 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
1593 ceph_assert(sync_providers.count(cookie) == 0);
1594
1595 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1596
1597 SyncProvider& sp = sync_providers[cookie];
1598 sp.cookie = cookie;
1599 sp.addrs = m->get_source_addrs();
1600 sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
1601
1602 set<string> sync_targets;
1603 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1604 // full scan
1605 sync_targets = get_sync_targets_names();
1606 sp.last_committed = paxos->get_version();
1607 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1608 sp.full = true;
1609 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1610 } else {
1611 // just catch up paxos
1612 sp.last_committed = m->last_committed;
1613 }
1614 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1615
1616 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1617 reply->last_committed = sp.last_committed;
1618 m->get_connection()->send_message(reply);
1619 }
1620
// Monitor::handle_sync_get_chunk(): provider side; build and send the next
// chunk for the session identified by m->cookie.
// Unknown cookie, or a requester whose position predates our first
// committed paxos version (when that is > 1): reply OP_NO_COOKIE (dropping
// the session in the latter case).
// Otherwise the chunk first carries consecutive paxos versions after
// sp.last_committed, up to our current version, bounded by
// mon_sync_max_payload_size bytes and mon_sync_max_payload_keys keys.  For a
// full sync, any remaining budget is filled from the store synchronizer.  If
// nothing is left to send, the reply becomes OP_LAST_CHUNK and the session
// is erased.
1621 void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
1622 {
1623 auto m = op->get_req<MMonSync>();
1624 dout(10) << __func__ << " " << *m << dendl;
1625
1626 if (sync_providers.count(m->cookie) == 0) {
1627 dout(10) << __func__ << " no cookie " << m->cookie << dendl;
1628 _sync_reply_no_cookie(op);
1629 return;
1630 }
1631
1632 ceph_assert(g_conf()->mon_sync_provider_kill_at != 2);
1633
// Any chunk request keeps the session alive for another 2 * mon_sync_timeout.
1634 SyncProvider& sp = sync_providers[m->cookie];
1635 sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
1636
1637 if (sp.last_committed < paxos->get_first_committed() &&
1638 paxos->get_first_committed() > 1) {
1639 dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
1640 << " < our fc " << paxos->get_first_committed() << dendl;
1641 sync_providers.erase(m->cookie);
1642 _sync_reply_no_cookie(op);
1643 return;
1644 }
1645
1646 MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
1647 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1648
// Per-chunk budget; decremented as paxos versions are added below.
1649 int bytes_left = g_conf()->mon_sync_max_payload_size;
1650 int keys_left = g_conf()->mon_sync_max_payload_keys;
1651 while (sp.last_committed < paxos->get_version() &&
1652 bytes_left > 0 &&
1653 keys_left > 0) {
1654 bufferlist bl;
1655 sp.last_committed++;
1656
1657 int err = store->get(paxos->get_name(), sp.last_committed, bl);
1658 ceph_assert(err == 0);
1659
1660 tx->put(paxos->get_name(), sp.last_committed, bl);
1661 bytes_left -= bl.length();
1662 --keys_left;
1663 dout(20) << __func__ << " including paxos state " << sp.last_committed
1664 << dendl;
1665 }
1666 reply->last_committed = sp.last_committed;
1667
1668 if (sp.full && bytes_left > 0 && keys_left > 0) {
1669 sp.synchronizer->get_chunk_tx(tx, bytes_left, keys_left);
1670 sp.last_key = sp.synchronizer->get_last_key();
1671 reply->last_key = sp.last_key;
1672 }
1673
1674 if ((sp.full && sp.synchronizer->has_next_chunk()) ||
1675 sp.last_committed < paxos->get_version()) {
1676 dout(10) << __func__ << " chunk, through version " << sp.last_committed
1677 << " key " << sp.last_key << dendl;
1678 } else {
1679 dout(10) << __func__ << " last chunk, through version " << sp.last_committed
1680 << " key " << sp.last_key << dendl;
1681 reply->op = MMonSync::OP_LAST_CHUNK;
1682
1683 ceph_assert(g_conf()->mon_sync_provider_kill_at != 3);
1684
1685 // clean up our local state
// NOTE(review): `sp` refers into sync_providers and dangles after this
// erase; nothing below may touch it.  The key argument also aliases the
// element being erased; erasing by m->cookie (same value) would avoid that.
1686 sync_providers.erase(sp.cookie);
1687 }
1688
1689 encode(*tx, reply->chunk_bl);
1690
1691 m->get_connection()->send_message(reply);
1692 }
1693
1694 // requester
1695
1696 void Monitor::handle_sync_cookie(MonOpRequestRef op)
1697 {
1698 auto m = op->get_req<MMonSync>();
1699 dout(10) << __func__ << " " << *m << dendl;
1700 if (sync_cookie) {
1701 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1702 return;
1703 }
1704 if (m->get_source_addrs() != sync_provider) {
1705 dout(10) << __func__ << " source does not match, discarding" << dendl;
1706 return;
1707 }
1708 sync_cookie = m->cookie;
1709 sync_start_version = m->last_committed;
1710
1711 sync_reset_timeout();
1712 sync_get_next_chunk();
1713
1714 ceph_assert(g_conf()->mon_sync_requester_kill_at != 3);
1715 }
1716
1717 void Monitor::sync_get_next_chunk()
1718 {
1719 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
1720 if (g_conf()->mon_inject_sync_get_chunk_delay > 0) {
1721 dout(20) << __func__ << " injecting delay of " << g_conf()->mon_inject_sync_get_chunk_delay << dendl;
1722 usleep((long long)(g_conf()->mon_inject_sync_get_chunk_delay * 1000000.0));
1723 }
1724 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
1725 messenger->send_to_mon(r, sync_provider);
1726
1727 ceph_assert(g_conf()->mon_sync_requester_kill_at != 4);
1728 }
1729
// Monitor::handle_sync_chunk(): requester side; apply one OP_CHUNK /
// OP_LAST_CHUNK from the provider.  Messages whose cookie or source does not
// match the current session are discarded.  The encoded transaction in the
// chunk is applied to the store.  For a recent (non-full) sync, the newly
// received paxos versions are also replayed immediately.  Then either the
// next chunk is requested (OP_CHUNK) or the sync is finished (OP_LAST_CHUNK).
1730 void Monitor::handle_sync_chunk(MonOpRequestRef op)
1731 {
1732 auto m = op->get_req<MMonSync>();
1733 dout(10) << __func__ << " " << *m << dendl;
1734
1735 if (m->cookie != sync_cookie) {
1736 dout(10) << __func__ << " cookie does not match, discarding" << dendl;
1737 return;
1738 }
1739 if (m->get_source_addrs() != sync_provider) {
1740 dout(10) << __func__ << " source does not match, discarding" << dendl;
1741 return;
1742 }
1743
1744 ceph_assert(state == STATE_SYNCHRONIZING);
1745 ceph_assert(g_conf()->mon_sync_requester_kill_at != 5);
1746
1747 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1748 tx->append_from_encoded(m->chunk_bl);
1749
1750 dout(30) << __func__ << " tx dump:\n";
1751 JSONFormatter f(true);
1752 tx->dump(&f);
1753 f.flush(*_dout);
1754 *_dout << dendl;
1755
1756 store->apply_transaction(tx);
1757
1758 ceph_assert(g_conf()->mon_sync_requester_kill_at != 6);
1759
// Recent sync: replay paxos versions (our version + 1 .. m->last_committed)
// now rather than at sync_finish().  Note this inner `tx` shadows the outer.
1760 if (!sync_full) {
1761 dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
1762 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1763 paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
1764 m->last_committed);
1765 tx->put(paxos->get_name(), "last_committed", m->last_committed);
1766
1767 dout(30) << __func__ << " tx dump:\n";
1768 JSONFormatter f(true);
1769 tx->dump(&f);
1770 f.flush(*_dout);
1771 *_dout << dendl;
1772
1773 store->apply_transaction(tx);
1774 paxos->init(); // to refresh what we just wrote
1775 }
1776
1777 if (m->op == MMonSync::OP_CHUNK) {
1778 sync_reset_timeout();
1779 sync_get_next_chunk();
1780 } else if (m->op == MMonSync::OP_LAST_CHUNK) {
1781 sync_finish(m->last_committed);
1782 }
1783 }
1784
1785 void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1786 {
1787 dout(10) << __func__ << dendl;
1788 bootstrap();
1789 }
1790
1791 void Monitor::sync_trim_providers()
1792 {
1793 dout(20) << __func__ << dendl;
1794
1795 utime_t now = ceph_clock_now();
1796 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1797 while (p != sync_providers.end()) {
1798 if (now > p->second.timeout) {
1799 dout(10) << __func__ << " expiring cookie " << p->second.cookie
1800 << " for " << p->second.addrs << dendl;
1801 sync_providers.erase(p++);
1802 } else {
1803 ++p;
1804 }
1805 }
1806 }
1807
1808 // ---------------------------------------------------
1809 // probe
1810
1811 void Monitor::cancel_probe_timeout()
1812 {
1813 if (probe_timeout_event) {
1814 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1815 timer.cancel_event(probe_timeout_event);
1816 probe_timeout_event = NULL;
1817 } else {
1818 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1819 }
1820 }
1821
1822 void Monitor::reset_probe_timeout()
1823 {
1824 cancel_probe_timeout();
1825 probe_timeout_event = new C_MonContext{this, [this](int r) {
1826 probe_timeout(r);
1827 }};
1828 double t = g_conf()->mon_probe_timeout;
1829 if (timer.add_event_after(t, probe_timeout_event)) {
1830 dout(10) << "reset_probe_timeout " << probe_timeout_event
1831 << " after " << t << " seconds" << dendl;
1832 } else {
1833 probe_timeout_event = nullptr;
1834 }
1835 }
1836
1837 void Monitor::probe_timeout(int r)
1838 {
1839 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
1840 ceph_assert(is_probing() || is_synchronizing());
1841 ceph_assert(probe_timeout_event);
1842 probe_timeout_event = NULL;
1843 bootstrap();
1844 }
1845
1846 void Monitor::handle_probe(MonOpRequestRef op)
1847 {
1848 auto m = op->get_req<MMonProbe>();
1849 dout(10) << "handle_probe " << *m << dendl;
1850
1851 if (m->fsid != monmap->fsid) {
1852 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1853 return;
1854 }
1855
1856 switch (m->op) {
1857 case MMonProbe::OP_PROBE:
1858 handle_probe_probe(op);
1859 break;
1860
1861 case MMonProbe::OP_REPLY:
1862 handle_probe_reply(op);
1863 break;
1864
1865 case MMonProbe::OP_MISSING_FEATURES:
1866 derr << __func__ << " require release " << (int)m->mon_release << " > "
1867 << (int)ceph_release()
1868 << ", or missing features (have " << CEPH_FEATURES_ALL
1869 << ", required " << m->required_features
1870 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL) << ")"
1871 << dendl;
1872 break;
1873 }
1874 }
1875
1876 void Monitor::handle_probe_probe(MonOpRequestRef op)
1877 {
1878 auto m = op->get_req<MMonProbe>();
1879
1880 dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
1881 << " features " << m->get_connection()->get_features() << dendl;
1882 uint64_t missing = required_features & ~m->get_connection()->get_features();
1883 if ((m->mon_release != ceph_release_t::unknown &&
1884 m->mon_release < monmap->min_mon_release) ||
1885 missing) {
1886 dout(1) << " peer " << m->get_source_addr()
1887 << " release " << m->mon_release
1888 << " < min_mon_release " << monmap->min_mon_release
1889 << ", or missing features " << missing << dendl;
1890 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1891 name, has_ever_joined, monmap->min_mon_release);
1892 m->required_features = required_features;
1893 m->get_connection()->send_message(r);
1894 goto out;
1895 }
1896
1897 if (!is_probing() && !is_synchronizing()) {
1898 // If the probing mon is way ahead of us, we need to re-bootstrap.
1899 // Normally we capture this case when we initially bootstrap, but
1900 // it is possible we pass those checks (we overlap with
1901 // quorum-to-be) but fail to join a quorum before it moves past
1902 // us. We need to be kicked back to bootstrap so we can
1903 // synchonize, not keep calling elections.
1904 if (paxos->get_version() + 1 < m->paxos_first_version) {
1905 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1906 << "ahead of us, re-bootstrapping" << dendl;
1907 bootstrap();
1908 goto out;
1909
1910 }
1911 }
1912
1913 MMonProbe *r;
1914 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined,
1915 ceph_release());
1916 r->name = name;
1917 r->quorum = quorum;
1918 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1919 r->paxos_first_version = paxos->get_first_committed();
1920 r->paxos_last_version = paxos->get_version();
1921 m->get_connection()->send_message(r);
1922
1923 // did we discover a peer here?
1924 if (!monmap->contains(m->get_source_addr())) {
1925 dout(1) << " adding peer " << m->get_source_addrs()
1926 << " to list of hints" << dendl;
1927 extra_probe_peers.insert(m->get_source_addrs());
1928 }
1929
1930 out:
1931 return;
1932 }
1933
1934 void Monitor::handle_probe_reply(MonOpRequestRef op)
1935 {
1936 auto m = op->get_req<MMonProbe>();
1937 dout(10) << "handle_probe_reply " << m->get_source_inst()
1938 << " " << *m << dendl;
1939 dout(10) << " monmap is " << *monmap << dendl;
1940
1941 // discover name and addrs during probing or electing states.
1942 if (!is_probing() && !is_electing()) {
1943 return;
1944 }
1945
1946 // newer map, or they've joined a quorum and we haven't?
1947 bufferlist mybl;
1948 monmap->encode(mybl, m->get_connection()->get_features());
1949 // make sure it's actually different; the checks below err toward
1950 // taking the other guy's map, which could cause us to loop.
1951 if (!mybl.contents_equal(m->monmap_bl)) {
1952 MonMap *newmap = new MonMap;
1953 newmap->decode(m->monmap_bl);
1954 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1955 !has_ever_joined)) {
1956 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
1957 << ", mine was " << monmap->get_epoch() << dendl;
1958 delete newmap;
1959 monmap->decode(m->monmap_bl);
1960
1961 bootstrap();
1962 return;
1963 }
1964 delete newmap;
1965 }
1966
1967 // rename peer?
1968 string peer_name = monmap->get_name(m->get_source_addr());
1969 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
1970 dout(10) << " renaming peer " << m->get_source_addr() << " "
1971 << peer_name << " -> " << m->name << " in my monmap"
1972 << dendl;
1973 monmap->rename(peer_name, m->name);
1974
1975 if (is_electing()) {
1976 bootstrap();
1977 return;
1978 }
1979 } else if (peer_name.size()) {
1980 dout(10) << " peer name is " << peer_name << dendl;
1981 } else {
1982 dout(10) << " peer " << m->get_source_addr() << " not in map" << dendl;
1983 }
1984
1985 // new initial peer?
1986 if (monmap->get_epoch() == 0 &&
1987 monmap->contains(m->name) &&
1988 monmap->get_addrs(m->name).front().is_blank_ip()) {
1989 dout(1) << " learned initial mon " << m->name
1990 << " addrs " << m->get_source_addrs() << dendl;
1991 monmap->set_addrvec(m->name, m->get_source_addrs());
1992
1993 bootstrap();
1994 return;
1995 }
1996
1997 // end discover phase
1998 if (!is_probing()) {
1999 return;
2000 }
2001
2002 ceph_assert(paxos != NULL);
2003
2004 if (is_synchronizing()) {
2005 dout(10) << " currently syncing" << dendl;
2006 return;
2007 }
2008
2009 entity_addrvec_t other = m->get_source_addrs();
2010
2011 if (m->paxos_last_version < sync_last_committed_floor) {
2012 dout(10) << " peer paxos versions [" << m->paxos_first_version
2013 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
2014 << sync_last_committed_floor << ", ignoring"
2015 << dendl;
2016 } else {
2017 if (paxos->get_version() < m->paxos_first_version &&
2018 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
2019 dout(10) << " peer paxos first versions [" << m->paxos_first_version
2020 << "," << m->paxos_last_version << "]"
2021 << " vs my version " << paxos->get_version()
2022 << " (too far ahead)"
2023 << dendl;
2024 cancel_probe_timeout();
2025 sync_start(other, true);
2026 return;
2027 }
2028 if (paxos->get_version() + g_conf()->paxos_max_join_drift < m->paxos_last_version) {
2029 dout(10) << " peer paxos last version " << m->paxos_last_version
2030 << " vs my version " << paxos->get_version()
2031 << " (too far ahead)"
2032 << dendl;
2033 cancel_probe_timeout();
2034 sync_start(other, false);
2035 return;
2036 }
2037 }
2038
2039 // did the existing cluster complete upgrade to luminous?
2040 if (osdmon()->osdmap.get_epoch()) {
2041 if (osdmon()->osdmap.require_osd_release < ceph_release_t::luminous) {
2042 derr << __func__ << " existing cluster has not completed upgrade to"
2043 << " luminous; 'ceph osd require_osd_release luminous' before"
2044 << " upgrading" << dendl;
2045 exit(0);
2046 }
2047 if (!osdmon()->osdmap.test_flag(CEPH_OSDMAP_PURGED_SNAPDIRS) ||
2048 !osdmon()->osdmap.test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
2049 derr << __func__ << " existing cluster has not completed a full luminous"
2050 << " scrub to purge legacy snapdir objects; please scrub before"
2051 << " upgrading beyond luminous." << dendl;
2052 exit(0);
2053 }
2054 }
2055
2056 // is there an existing quorum?
2057 if (m->quorum.size()) {
2058 dout(10) << " existing quorum " << m->quorum << dendl;
2059
2060 dout(10) << " peer paxos version " << m->paxos_last_version
2061 << " vs my version " << paxos->get_version()
2062 << " (ok)"
2063 << dendl;
2064
2065 if (monmap->contains(name) &&
2066 !monmap->get_addrs(name).front().is_blank_ip()) {
2067 // i'm part of the cluster; just initiate a new election
2068 start_election();
2069 } else {
2070 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
2071 send_mon_message(
2072 new MMonJoin(monmap->fsid, name, messenger->get_myaddrs()),
2073 *m->quorum.begin());
2074 }
2075 } else {
2076 if (monmap->contains(m->name)) {
2077 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
2078 outside_quorum.insert(m->name);
2079 } else {
2080 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
2081 return;
2082 }
2083
2084 unsigned need = monmap->min_quorum_size();
2085 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
2086 if (outside_quorum.size() >= need) {
2087 if (outside_quorum.count(name)) {
2088 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
2089 start_election();
2090 } else {
2091 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
2092 }
2093 } else {
2094 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
2095 }
2096 }
2097 }
2098
2099 void Monitor::join_election()
2100 {
2101 dout(10) << __func__ << dendl;
2102 wait_for_paxos_write();
2103 _reset();
2104 state = STATE_ELECTING;
2105
2106 logger->inc(l_mon_num_elections);
2107 }
2108
2109 void Monitor::start_election()
2110 {
2111 dout(10) << "start_election" << dendl;
2112 wait_for_paxos_write();
2113 _reset();
2114 state = STATE_ELECTING;
2115
2116 logger->inc(l_mon_num_elections);
2117 logger->inc(l_mon_election_call);
2118
2119 clog->info() << "mon." << name << " calling monitor election";
2120 elector.call_election();
2121 }
2122
2123 void Monitor::win_standalone_election()
2124 {
2125 dout(1) << "win_standalone_election" << dendl;
2126
2127 // bump election epoch, in case the previous epoch included other
2128 // monitors; we need to be able to make the distinction.
2129 elector.declare_standalone_victory();
2130
2131 rank = monmap->get_rank(name);
2132 ceph_assert(rank == 0);
2133 set<int> q;
2134 q.insert(rank);
2135
2136 map<int,Metadata> metadata;
2137 collect_metadata(&metadata[0]);
2138
2139 win_election(elector.get_epoch(), q,
2140 CEPH_FEATURES_ALL,
2141 ceph::features::mon::get_supported(),
2142 ceph_release(),
2143 metadata);
2144 }
2145
2146 const utime_t& Monitor::get_leader_since() const
2147 {
2148 ceph_assert(state == STATE_LEADER);
2149 return leader_since;
2150 }
2151
2152 epoch_t Monitor::get_epoch()
2153 {
2154 return elector.get_epoch();
2155 }
2156
2157 void Monitor::_finish_svc_election()
2158 {
2159 ceph_assert(state == STATE_LEADER || state == STATE_PEON);
2160
2161 for (auto& svc : paxos_service) {
2162 // we already called election_finished() on monmon(); avoid callig twice
2163 if (state == STATE_LEADER && svc.get() == monmon())
2164 continue;
2165 svc->election_finished();
2166 }
2167 }
2168
2169 void Monitor::win_election(epoch_t epoch, const set<int>& active, uint64_t features,
2170 const mon_feature_t& mon_features,
2171 ceph_release_t min_mon_release,
2172 const map<int,Metadata>& metadata)
2173 {
2174 dout(10) << __func__ << " epoch " << epoch << " quorum " << active
2175 << " features " << features
2176 << " mon_features " << mon_features
2177 << " min_mon_release " << min_mon_release
2178 << dendl;
2179 ceph_assert(is_electing());
2180 state = STATE_LEADER;
2181 leader_since = ceph_clock_now();
2182 quorum_since = mono_clock::now();
2183 leader = rank;
2184 quorum = active;
2185 quorum_con_features = features;
2186 quorum_mon_features = mon_features;
2187 quorum_min_mon_release = min_mon_release;
2188 pending_metadata = metadata;
2189 outside_quorum.clear();
2190
2191 clog->info() << "mon." << name << " is new leader, mons " << get_quorum_names()
2192 << " in quorum (ranks " << quorum << ")";
2193
2194 set_leader_commands(get_local_commands(mon_features));
2195
2196 paxos->leader_init();
2197 // NOTE: tell monmap monitor first. This is important for the
2198 // bootstrap case to ensure that the very first paxos proposal
2199 // codifies the monmap. Otherwise any manner of chaos can ensue
2200 // when monitors are call elections or participating in a paxos
2201 // round without agreeing on who the participants are.
2202 monmon()->election_finished();
2203 _finish_svc_election();
2204
2205 logger->inc(l_mon_election_win);
2206
2207 // inject new metadata in first transaction.
2208 {
2209 // include previous metadata for missing mons (that aren't part of
2210 // the current quorum).
2211 map<int,Metadata> m = metadata;
2212 for (unsigned rank = 0; rank < monmap->size(); ++rank) {
2213 if (m.count(rank) == 0 &&
2214 mon_metadata.count(rank)) {
2215 m[rank] = mon_metadata[rank];
2216 }
2217 }
2218
2219 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
2220 // a new transaction immediately after the election finishes. We should
2221 // do that anyway for other reasons, though.
2222 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
2223 bufferlist bl;
2224 encode(m, bl);
2225 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
2226 }
2227
2228 finish_election();
2229 if (monmap->size() > 1 &&
2230 monmap->get_epoch() > 0) {
2231 timecheck_start();
2232 health_tick_start();
2233
2234 // Freshen the health status before doing health_to_clog in case
2235 // our just-completed election changed the health
2236 healthmon()->wait_for_active_ctx(new LambdaContext([this](int r){
2237 dout(20) << "healthmon now active" << dendl;
2238 healthmon()->tick();
2239 if (healthmon()->is_proposing()) {
2240 dout(20) << __func__ << " healthmon proposing, waiting" << dendl;
2241 healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext{this,
2242 [this](int r){
2243 ceph_assert(ceph_mutex_is_locked_by_me(lock));
2244 do_health_to_clog_interval();
2245 }});
2246
2247 } else {
2248 do_health_to_clog_interval();
2249 }
2250 }));
2251
2252 scrub_event_start();
2253 }
2254 }
2255
2256 void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
2257 uint64_t features,
2258 const mon_feature_t& mon_features,
2259 ceph_release_t min_mon_release)
2260 {
2261 state = STATE_PEON;
2262 leader_since = utime_t();
2263 quorum_since = mono_clock::now();
2264 leader = l;
2265 quorum = q;
2266 outside_quorum.clear();
2267 quorum_con_features = features;
2268 quorum_mon_features = mon_features;
2269 quorum_min_mon_release = min_mon_release;
2270 dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
2271 << " quorum is " << quorum << " features are " << quorum_con_features
2272 << " mon_features are " << quorum_mon_features
2273 << " min_mon_release " << min_mon_release
2274 << dendl;
2275
2276 paxos->peon_init();
2277 _finish_svc_election();
2278
2279 logger->inc(l_mon_election_lose);
2280
2281 finish_election();
2282 }
2283
2284 namespace {
2285 std::string collect_compression_algorithms()
2286 {
2287 ostringstream os;
2288 bool printed = false;
2289 for (auto [name, key] : Compressor::compression_algorithms) {
2290 if (printed) {
2291 os << ", ";
2292 } else {
2293 printed = true;
2294 }
2295 std::ignore = key;
2296 os << name;
2297 }
2298 return os.str();
2299 }
2300 }
2301
2302 void Monitor::collect_metadata(Metadata *m)
2303 {
2304 collect_sys_info(m, g_ceph_context);
2305 (*m)["addrs"] = stringify(messenger->get_myaddrs());
2306 (*m)["compression_algorithms"] = collect_compression_algorithms();
2307
2308 // infer storage device
2309 string devname = store->get_devname();
2310 set<string> devnames;
2311 get_raw_devices(devname, &devnames);
2312 map<string,string> errs;
2313 get_device_metadata(devnames, m, &errs);
2314 for (auto& i : errs) {
2315 dout(1) << __func__ << " " << i.first << ": " << i.second << dendl;
2316 }
2317 }
2318
2319 void Monitor::finish_election()
2320 {
2321 apply_quorum_to_compatset_features();
2322 apply_monmap_to_compatset_features();
2323 timecheck_finish();
2324 exited_quorum = utime_t();
2325 finish_contexts(g_ceph_context, waitfor_quorum);
2326 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2327 resend_routed_requests();
2328 update_logger();
2329 register_cluster_logger();
2330
2331 // enable authentication
2332 {
2333 std::lock_guard l(auth_lock);
2334 authmon()->_set_mon_num_rank(monmap->size(), rank);
2335 }
2336
2337 // am i named properly?
2338 string cur_name = monmap->get_name(messenger->get_myaddrs());
2339 if (cur_name != name) {
2340 dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
2341 send_mon_message(
2342 new MMonJoin(monmap->fsid, name, messenger->get_myaddrs()),
2343 *quorum.begin());
2344 }
2345 }
2346
2347 void Monitor::_apply_compatset_features(CompatSet &new_features)
2348 {
2349 if (new_features.compare(features) != 0) {
2350 CompatSet diff = features.unsupported(new_features);
2351 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2352 features = new_features;
2353
2354 auto t = std::make_shared<MonitorDBStore::Transaction>();
2355 write_features(t);
2356 store->apply_transaction(t);
2357
2358 calc_quorum_requirements();
2359 }
2360 }
2361
2362 void Monitor::apply_quorum_to_compatset_features()
2363 {
2364 CompatSet new_features(features);
2365 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
2366 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2367 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2368 }
2369 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2370 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
2371 dout(5) << __func__ << dendl;
2372 _apply_compatset_features(new_features);
2373 }
2374
2375 void Monitor::apply_monmap_to_compatset_features()
2376 {
2377 CompatSet new_features(features);
2378 mon_feature_t monmap_features = monmap->get_required_features();
2379
2380 /* persistent monmap features may go into the compatset.
2381 * optional monmap features may not - why?
2382 * because optional monmap features may be set/unset by the admin,
2383 * and possibly by other means that haven't yet been thought out,
2384 * so we can't make the monitor enforce them on start - because they
2385 * may go away.
2386 * this, of course, does not invalidate setting a compatset feature
2387 * for an optional feature - as long as you make sure to clean it up
2388 * once you unset it.
2389 */
2390 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
2391 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2392 ceph::features::mon::FEATURE_KRAKEN));
2393 // this feature should only ever be set if the quorum supports it.
2394 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
2395 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2396 }
2397 if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
2398 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2399 ceph::features::mon::FEATURE_LUMINOUS));
2400 // this feature should only ever be set if the quorum supports it.
2401 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
2402 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
2403 }
2404 if (monmap_features.contains_all(ceph::features::mon::FEATURE_MIMIC)) {
2405 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2406 ceph::features::mon::FEATURE_MIMIC));
2407 // this feature should only ever be set if the quorum supports it.
2408 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_MIMIC));
2409 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
2410 }
2411 if (monmap_features.contains_all(ceph::features::mon::FEATURE_NAUTILUS)) {
2412 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2413 ceph::features::mon::FEATURE_NAUTILUS));
2414 // this feature should only ever be set if the quorum supports it.
2415 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_NAUTILUS));
2416 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
2417 }
2418 if (monmap_features.contains_all(ceph::features::mon::FEATURE_OCTOPUS)) {
2419 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2420 ceph::features::mon::FEATURE_OCTOPUS));
2421 // this feature should only ever be set if the quorum supports it.
2422 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_OCTOPUS));
2423 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS);
2424 }
2425
2426 dout(5) << __func__ << dendl;
2427 _apply_compatset_features(new_features);
2428 }
2429
2430 void Monitor::calc_quorum_requirements()
2431 {
2432 required_features = 0;
2433
2434 // compatset
2435 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2436 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2437 }
2438 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2439 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2440 }
2441 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
2442 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2443 }
2444 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_MIMIC)) {
2445 required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
2446 }
2447 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS)) {
2448 required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
2449 CEPH_FEATUREMASK_CEPHX_V2;
2450 }
2451 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS)) {
2452 required_features |= CEPH_FEATUREMASK_SERVER_OCTOPUS;
2453 }
2454
2455 // monmap
2456 if (monmap->get_required_features().contains_all(
2457 ceph::features::mon::FEATURE_KRAKEN)) {
2458 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2459 }
2460 if (monmap->get_required_features().contains_all(
2461 ceph::features::mon::FEATURE_LUMINOUS)) {
2462 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2463 }
2464 if (monmap->get_required_features().contains_all(
2465 ceph::features::mon::FEATURE_MIMIC)) {
2466 required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
2467 }
2468 if (monmap->get_required_features().contains_all(
2469 ceph::features::mon::FEATURE_NAUTILUS)) {
2470 required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
2471 CEPH_FEATUREMASK_CEPHX_V2;
2472 }
2473 dout(10) << __func__ << " required_features " << required_features << dendl;
2474 }
2475
2476 void Monitor::get_combined_feature_map(FeatureMap *fm)
2477 {
2478 *fm += session_map.feature_map;
2479 for (auto id : quorum) {
2480 if (id != rank) {
2481 *fm += quorum_feature_map[id];
2482 }
2483 }
2484 }
2485
2486 void Monitor::sync_force(Formatter *f)
2487 {
2488 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2489 sync_stash_critical_state(tx);
2490 tx->put("mon_sync", "force_sync", 1);
2491 store->apply_transaction(tx);
2492
2493 f->open_object_section("sync_force");
2494 f->dump_int("ret", 0);
2495 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2496 f->close_section(); // sync_force
2497 }
2498
2499 void Monitor::_quorum_status(Formatter *f, ostream& ss)
2500 {
2501 bool free_formatter = false;
2502
2503 if (!f) {
2504 // louzy/lazy hack: default to json if no formatter has been defined
2505 f = new JSONFormatter();
2506 free_formatter = true;
2507 }
2508 f->open_object_section("quorum_status");
2509 f->dump_int("election_epoch", get_epoch());
2510
2511 f->open_array_section("quorum");
2512 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2513 f->dump_int("mon", *p);
2514 f->close_section(); // quorum
2515
2516 list<string> quorum_names = get_quorum_names();
2517 f->open_array_section("quorum_names");
2518 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2519 f->dump_string("mon", *p);
2520 f->close_section(); // quorum_names
2521
2522 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
2523
2524 if (!quorum.empty()) {
2525 f->dump_int(
2526 "quorum_age",
2527 std::chrono::duration_cast<std::chrono::seconds>(
2528 mono_clock::now() - quorum_since).count());
2529 }
2530
2531 f->open_object_section("features");
2532 f->dump_stream("quorum_con") << quorum_con_features;
2533 quorum_mon_features.dump(f, "quorum_mon");
2534 f->close_section();
2535
2536 f->open_object_section("monmap");
2537 monmap->dump(f);
2538 f->close_section(); // monmap
2539
2540 f->close_section(); // quorum_status
2541 f->flush(ss);
2542 if (free_formatter)
2543 delete f;
2544 }
2545
2546 void Monitor::get_mon_status(Formatter *f)
2547 {
2548 f->open_object_section("mon_status");
2549 f->dump_string("name", name);
2550 f->dump_int("rank", rank);
2551 f->dump_string("state", get_state_name());
2552 f->dump_int("election_epoch", get_epoch());
2553
2554 f->open_array_section("quorum");
2555 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2556 f->dump_int("mon", *p);
2557 }
2558 f->close_section(); // quorum
2559
2560 if (!quorum.empty()) {
2561 f->dump_int(
2562 "quorum_age",
2563 std::chrono::duration_cast<std::chrono::seconds>(
2564 mono_clock::now() - quorum_since).count());
2565 }
2566
2567 f->open_object_section("features");
2568 f->dump_stream("required_con") << required_features;
2569 mon_feature_t req_mon_features = get_required_mon_features();
2570 req_mon_features.dump(f, "required_mon");
2571 f->dump_stream("quorum_con") << quorum_con_features;
2572 quorum_mon_features.dump(f, "quorum_mon");
2573 f->close_section(); // features
2574
2575 f->open_array_section("outside_quorum");
2576 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2577 f->dump_string("mon", *p);
2578 f->close_section(); // outside_quorum
2579
2580 f->open_array_section("extra_probe_peers");
2581 for (set<entity_addrvec_t>::iterator p = extra_probe_peers.begin();
2582 p != extra_probe_peers.end();
2583 ++p) {
2584 f->dump_object("peer", *p);
2585 }
2586 f->close_section(); // extra_probe_peers
2587
2588 f->open_array_section("sync_provider");
2589 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2590 p != sync_providers.end();
2591 ++p) {
2592 f->dump_unsigned("cookie", p->second.cookie);
2593 f->dump_object("addrs", p->second.addrs);
2594 f->dump_stream("timeout") << p->second.timeout;
2595 f->dump_unsigned("last_committed", p->second.last_committed);
2596 f->dump_stream("last_key") << p->second.last_key;
2597 }
2598 f->close_section();
2599
2600 if (is_synchronizing()) {
2601 f->open_object_section("sync");
2602 f->dump_stream("sync_provider") << sync_provider;
2603 f->dump_unsigned("sync_cookie", sync_cookie);
2604 f->dump_unsigned("sync_start_version", sync_start_version);
2605 f->close_section();
2606 }
2607
2608 if (g_conf()->mon_sync_provider_kill_at > 0)
2609 f->dump_int("provider_kill_at", g_conf()->mon_sync_provider_kill_at);
2610 if (g_conf()->mon_sync_requester_kill_at > 0)
2611 f->dump_int("requester_kill_at", g_conf()->mon_sync_requester_kill_at);
2612
2613 f->open_object_section("monmap");
2614 monmap->dump(f);
2615 f->close_section();
2616
2617 f->dump_object("feature_map", session_map.feature_map);
2618 f->close_section(); // mon_status
2619 }
2620
2621
2622 // health status to clog
2623
2624 void Monitor::health_tick_start()
2625 {
2626 if (!cct->_conf->mon_health_to_clog ||
2627 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2628 return;
2629
2630 dout(15) << __func__ << dendl;
2631
2632 health_tick_stop();
2633 health_tick_event = timer.add_event_after(
2634 cct->_conf->mon_health_to_clog_tick_interval,
2635 new C_MonContext{this, [this](int r) {
2636 if (r < 0)
2637 return;
2638 health_tick_start();
2639 }});
2640 }
2641
2642 void Monitor::health_tick_stop()
2643 {
2644 dout(15) << __func__ << dendl;
2645
2646 if (health_tick_event) {
2647 timer.cancel_event(health_tick_event);
2648 health_tick_event = NULL;
2649 }
2650 }
2651
2652 ceph::real_clock::time_point Monitor::health_interval_calc_next_update()
2653 {
2654 auto now = ceph::real_clock::now();
2655
2656 auto secs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch());
2657 int remainder = secs.count() % cct->_conf->mon_health_to_clog_interval;
2658 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
2659 auto next = secs + std::chrono::seconds(adjustment);
2660
2661 dout(20) << __func__
2662 << " now: " << now << ","
2663 << " next: " << next << ","
2664 << " interval: " << cct->_conf->mon_health_to_clog_interval
2665 << dendl;
2666
2667 return ceph::real_clock::time_point{next};
2668 }
2669
2670 void Monitor::health_interval_start()
2671 {
2672 dout(15) << __func__ << dendl;
2673
2674 if (!cct->_conf->mon_health_to_clog ||
2675 cct->_conf->mon_health_to_clog_interval <= 0) {
2676 return;
2677 }
2678
2679 health_interval_stop();
2680 auto next = health_interval_calc_next_update();
2681 health_interval_event = new C_MonContext{this, [this](int r) {
2682 if (r < 0)
2683 return;
2684 do_health_to_clog_interval();
2685 }};
2686 if (!timer.add_event_at(next, health_interval_event)) {
2687 health_interval_event = nullptr;
2688 }
2689 }
2690
2691 void Monitor::health_interval_stop()
2692 {
2693 dout(15) << __func__ << dendl;
2694 if (health_interval_event) {
2695 timer.cancel_event(health_interval_event);
2696 }
2697 health_interval_event = NULL;
2698 }
2699
2700 void Monitor::health_events_cleanup()
2701 {
2702 health_tick_stop();
2703 health_interval_stop();
2704 health_status_cache.reset();
2705 }
2706
2707 void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2708 {
2709 dout(20) << __func__ << dendl;
2710
2711 if (changed.count("mon_health_to_clog")) {
2712 if (!cct->_conf->mon_health_to_clog) {
2713 health_events_cleanup();
2714 return;
2715 } else {
2716 if (!health_tick_event) {
2717 health_tick_start();
2718 }
2719 if (!health_interval_event) {
2720 health_interval_start();
2721 }
2722 }
2723 }
2724
2725 if (changed.count("mon_health_to_clog_interval")) {
2726 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2727 health_interval_stop();
2728 } else {
2729 health_interval_start();
2730 }
2731 }
2732
2733 if (changed.count("mon_health_to_clog_tick_interval")) {
2734 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2735 health_tick_stop();
2736 } else {
2737 health_tick_start();
2738 }
2739 }
2740 }
2741
2742 void Monitor::do_health_to_clog_interval()
2743 {
2744 // outputting to clog may have been disabled in the conf
2745 // since we were scheduled.
2746 if (!cct->_conf->mon_health_to_clog ||
2747 cct->_conf->mon_health_to_clog_interval <= 0)
2748 return;
2749
2750 dout(10) << __func__ << dendl;
2751
2752 // do we have a cached value for next_clog_update? if not,
2753 // do we know when the last update was?
2754
2755 do_health_to_clog(true);
2756 health_interval_start();
2757 }
2758
2759 void Monitor::do_health_to_clog(bool force)
2760 {
2761 // outputting to clog may have been disabled in the conf
2762 // since we were scheduled.
2763 if (!cct->_conf->mon_health_to_clog ||
2764 cct->_conf->mon_health_to_clog_interval <= 0)
2765 return;
2766
2767 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2768
2769 string summary;
2770 health_status_t level = healthmon()->get_health_status(false, nullptr, &summary);
2771 if (!force &&
2772 summary == health_status_cache.summary &&
2773 level == health_status_cache.overall)
2774 return;
2775
2776 if (g_conf()->mon_health_detail_to_clog &&
2777 summary != health_status_cache.summary &&
2778 level != HEALTH_OK) {
2779 string details;
2780 level = healthmon()->get_health_status(true, nullptr, &details);
2781 clog->health(level) << "Health detail: " << details;
2782 } else {
2783 clog->health(level) << "overall " << summary;
2784 }
2785 health_status_cache.summary = summary;
2786 health_status_cache.overall = level;
2787 }
2788
2789 void Monitor::log_health(
2790 const health_check_map_t& updated,
2791 const health_check_map_t& previous,
2792 MonitorDBStore::TransactionRef t)
2793 {
2794 if (!g_conf()->mon_health_to_clog) {
2795 return;
2796 }
2797
2798 const utime_t now = ceph_clock_now();
2799
2800 // FIXME: log atomically as part of @t instead of using clog.
2801 dout(10) << __func__ << " updated " << updated.checks.size()
2802 << " previous " << previous.checks.size()
2803 << dendl;
2804 const auto min_log_period = g_conf().get_val<int64_t>(
2805 "mon_health_log_update_period");
2806 for (auto& p : updated.checks) {
2807 auto q = previous.checks.find(p.first);
2808 bool logged = false;
2809 if (q == previous.checks.end()) {
2810 // new
2811 ostringstream ss;
2812 ss << "Health check failed: " << p.second.summary << " ("
2813 << p.first << ")";
2814 clog->health(p.second.severity) << ss.str();
2815
2816 logged = true;
2817 } else {
2818 if (p.second.summary != q->second.summary ||
2819 p.second.severity != q->second.severity) {
2820
2821 auto status_iter = health_check_log_times.find(p.first);
2822 if (status_iter != health_check_log_times.end()) {
2823 if (p.second.severity == q->second.severity &&
2824 now - status_iter->second.updated_at < min_log_period) {
2825 // We already logged this recently and the severity is unchanged,
2826 // so skip emitting an update of the summary string.
2827 // We'll get an update out of tick() later if the check
2828 // is still failing.
2829 continue;
2830 }
2831 }
2832
2833 // summary or severity changed (ignore detail changes at this level)
2834 ostringstream ss;
2835 ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
2836 clog->health(p.second.severity) << ss.str();
2837
2838 logged = true;
2839 }
2840 }
2841 // Record the time at which we last logged, so that we can check this
2842 // when considering whether/when to print update messages.
2843 if (logged) {
2844 auto iter = health_check_log_times.find(p.first);
2845 if (iter == health_check_log_times.end()) {
2846 health_check_log_times.emplace(p.first, HealthCheckLogStatus(
2847 p.second.severity, p.second.summary, now));
2848 } else {
2849 iter->second = HealthCheckLogStatus(
2850 p.second.severity, p.second.summary, now);
2851 }
2852 }
2853 }
2854 for (auto& p : previous.checks) {
2855 if (!updated.checks.count(p.first)) {
2856 // cleared
2857 ostringstream ss;
2858 if (p.first == "DEGRADED_OBJECTS") {
2859 clog->info() << "All degraded objects recovered";
2860 } else if (p.first == "OSD_FLAGS") {
2861 clog->info() << "OSD flags cleared";
2862 } else {
2863 clog->info() << "Health check cleared: " << p.first << " (was: "
2864 << p.second.summary << ")";
2865 }
2866
2867 if (health_check_log_times.count(p.first)) {
2868 health_check_log_times.erase(p.first);
2869 }
2870 }
2871 }
2872
2873 if (previous.checks.size() && updated.checks.size() == 0) {
2874 // We might be going into a fully healthy state, check
2875 // other subsystems
2876 bool any_checks = false;
2877 for (auto& svc : paxos_service) {
2878 if (&(svc->get_health_checks()) == &(previous)) {
2879 // Ignore the ones we're clearing right now
2880 continue;
2881 }
2882
2883 if (svc->get_health_checks().checks.size() > 0) {
2884 any_checks = true;
2885 break;
2886 }
2887 }
2888 if (!any_checks) {
2889 clog->info() << "Cluster is now healthy";
2890 }
2891 }
2892 }
2893
// Render a snapshot of cluster state (used by the "status" command).
//
// When f is non-null everything is emitted through the formatter inside a
// "status" object section and ss is left untouched.  When f is null a
// human-readable report (cluster / services / task status / data /
// progress sections) is appended to ss instead.
//
// @param ss  text output stream; only written when f is null
// @param f   optional structured formatter; may be nullptr
void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
{
  if (f)
    f->open_object_section("status");

  // Sampled once; used for the plain-text quorum "age" below.
  mono_clock::time_point now = mono_clock::now();
  if (f) {
    f->dump_stream("fsid") << monmap->get_fsid();
    healthmon()->get_health_status(false, f, nullptr);
    f->dump_unsigned("election_epoch", get_epoch());
    {
      // Quorum membership: ranks, then the matching mon names.
      f->open_array_section("quorum");
      for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
        f->dump_int("rank", *p);
      f->close_section();
      f->open_array_section("quorum_names");
      for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
        f->dump_string("id", monmap->get_name(*p));
      f->close_section();
      // Quorum age in whole seconds.
      // NOTE(review): this re-reads mono_clock::now() instead of reusing
      // `now` captured above; harmless but slightly inconsistent.
      f->dump_int(
        "quorum_age",
        std::chrono::duration_cast<std::chrono::seconds>(
          mono_clock::now() - quorum_since).count());
    }
    f->open_object_section("monmap");
    monmap->dump_summary(f);
    f->close_section();
    f->open_object_section("osdmap");
    // NOTE(review): cout is only passed to satisfy the signature; presumably
    // print_summary() does not write to the stream when f is set -- confirm.
    osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
    f->close_section();
    f->open_object_section("pgmap");
    mgrstatmon()->print_summary(f, NULL);
    f->close_section();
    f->open_object_section("fsmap");
    mdsmon()->get_fsmap().print_summary(f, NULL);
    f->close_section();
    f->open_object_section("mgrmap");
    mgrmon()->get_map().print_summary(f, nullptr);
    f->close_section();

    f->dump_object("servicemap", mgrstatmon()->get_service_map());

    // One entry per in-flight progress event, keyed by the event's map key.
    f->open_object_section("progress_events");
    for (auto& i : mgrstatmon()->get_progress_events()) {
      f->dump_object(i.first.c_str(), i.second);
    }
    f->close_section();

    f->close_section();  // "status"
  } else {
    ss << " cluster:\n";
    ss << " id: " << monmap->get_fsid() << "\n";

    string health;
    healthmon()->get_health_status(false, nullptr, &health,
                                   "\n ", "\n ");
    ss << " health: " << health << "\n";

    ss << "\n \n services:\n";
    {
      // maxlen is the widest label (at least 3, the width of "mon"/"mgr"/
      // "mds"/"osd"); labels are padded to it so the values line up.
      size_t maxlen = 3;
      auto& service_map = mgrstatmon()->get_service_map();
      for (auto& p : service_map.services) {
        maxlen = std::max(maxlen, p.first.size());
      }
      string spacing(maxlen - 3, ' ');
      const auto quorum_names = get_quorum_names();
      const auto mon_count = monmap->mon_info.size();
      ss << " mon: " << spacing << mon_count << " daemons, quorum "
         << quorum_names << " (age " << timespan_str(now - quorum_since) << ")";
      if (quorum_names.size() != mon_count) {
        // List every rank that is not currently in the quorum set.
        std::list<std::string> out_of_q;
        for (size_t i = 0; i < monmap->ranks.size(); ++i) {
          if (quorum.count(i) == 0) {
            out_of_q.push_back(monmap->ranks[i]);
          }
        }
        ss << ", out of quorum: " << joinify(out_of_q.begin(),
                                            out_of_q.end(), std::string(", "));
      }
      ss << "\n";
      if (mgrmon()->in_use()) {
        ss << " mgr: " << spacing;
        mgrmon()->get_map().print_summary(nullptr, &ss);
        ss << "\n";
      }
      if (mdsmon()->should_print_status()) {
        ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n";
      }
      ss << " osd: " << spacing;
      osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
      ss << "\n";
      // Remaining (non-core) services from the service map, one line each.
      for (auto& p : service_map.services) {
        const std::string &service = p.first;
        // filter out normal ceph entity types
        if (ServiceMap::is_normal_ceph_entity(service)) {
          continue;
        }
        ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
           << p.second.get_summary() << "\n";
      }
    }

    {
      // "task status" section is emitted only when the service map has
      // any services at all.
      auto& service_map = mgrstatmon()->get_service_map();
      if (!service_map.services.empty()) {
        ss << "\n \n task status:\n";
        {
          for (auto &p : service_map.services) {
            ss << p.second.get_task_summary(p.first);
          }
        }
      }
    }

    ss << "\n \n data:\n";
    mgrstatmon()->print_summary(NULL, &ss);

    // "progress" section: one line per pending progress event message.
    auto& pem = mgrstatmon()->get_progress_events();
    if (!pem.empty()) {
      ss << "\n \n progress:\n";
      for (auto& i : pem) {
        ss << " " << i.second.message << "\n";
      }
    }
    ss << "\n ";
  }
}
3022
3023 void Monitor::_generate_command_map(cmdmap_t& cmdmap,
3024 map<string,string> &param_str_map)
3025 {
3026 for (auto p = cmdmap.begin(); p != cmdmap.end(); ++p) {
3027 if (p->first == "prefix")
3028 continue;
3029 if (p->first == "caps") {
3030 vector<string> cv;
3031 if (cmd_getval(cmdmap, "caps", cv) &&
3032 cv.size() % 2 == 0) {
3033 for (unsigned i = 0; i < cv.size(); i += 2) {
3034 string k = string("caps_") + cv[i];
3035 param_str_map[k] = cv[i + 1];
3036 }
3037 continue;
3038 }
3039 }
3040 param_str_map[p->first] = cmd_vartype_stringify(p->second);
3041 }
3042 }
3043
3044 const MonCommand *Monitor::_get_moncommand(
3045 const string &cmd_prefix,
3046 const vector<MonCommand>& cmds)
3047 {
3048 for (auto& c : cmds) {
3049 if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
3050 return &c;
3051 }
3052 }
3053 return nullptr;
3054 }
3055
3056 bool Monitor::_allowed_command(MonSession *s, const string &module,
3057 const string &prefix, const cmdmap_t& cmdmap,
3058 const map<string,string>& param_str_map,
3059 const MonCommand *this_cmd) {
3060
3061 bool cmd_r = this_cmd->requires_perm('r');
3062 bool cmd_w = this_cmd->requires_perm('w');
3063 bool cmd_x = this_cmd->requires_perm('x');
3064
3065 bool capable = s->caps.is_capable(
3066 g_ceph_context,
3067 s->entity_name,
3068 module, prefix, param_str_map,
3069 cmd_r, cmd_w, cmd_x,
3070 s->get_peer_socket_addr());
3071
3072 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
3073 return capable;
3074 }
3075
3076 void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
3077 Formatter *f,
3078 uint64_t features,
3079 bufferlist *rdata)
3080 {
3081 int cmdnum = 0;
3082 f->open_object_section("command_descriptions");
3083 for (const auto &cmd : commands) {
3084 unsigned flags = cmd.flags;
3085 ostringstream secname;
3086 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
3087 dump_cmddesc_to_json(f, features, secname.str(),
3088 cmd.cmdstring, cmd.helpstring, cmd.module,
3089 cmd.req_perms, flags);
3090 cmdnum++;
3091 }
3092 f->close_section(); // command_descriptions
3093
3094 f->flush(*rdata);
3095 }
3096
3097 bool Monitor::is_keyring_required()
3098 {
3099 return auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX) ||
3100 auth_service_required.is_supported_auth(CEPH_AUTH_CEPHX) ||
3101 auth_cluster_required.is_supported_auth(CEPH_AUTH_GSS) ||
3102 auth_service_required.is_supported_auth(CEPH_AUTH_GSS);
3103 }
3104
3105 struct C_MgrProxyCommand : public Context {
3106 Monitor *mon;
3107 MonOpRequestRef op;
3108 uint64_t size;
3109 bufferlist outbl;
3110 string outs;
3111 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
3112 : mon(mon), op(op), size(s) { }
3113 void finish(int r) {
3114 std::lock_guard l(mon->lock);
3115 mon->mgr_proxy_bytes -= size;
3116 mon->reply_command(op, r, outs, outbl, 0);
3117 }
3118 };
3119
3120 void Monitor::handle_tell_command(MonOpRequestRef op)
3121 {
3122 ceph_assert(op->is_type_command());
3123 MCommand *m = static_cast<MCommand*>(op->get_req());
3124 if (m->fsid != monmap->fsid) {
3125 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
3126 return reply_tell_command(op, -EACCES, "wrong fsid");
3127 }
3128 MonSession *session = op->get_session();
3129 if (!session) {
3130 dout(5) << __func__ << " dropping stray message " << *m << dendl;
3131 return;
3132 }
3133 cmdmap_t cmdmap;
3134 if (stringstream ss; !cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3135 return reply_tell_command(op, -EINVAL, ss.str());
3136 }
3137 map<string,string> param_str_map;
3138 _generate_command_map(cmdmap, param_str_map);
3139 string prefix;
3140 if (!cmd_getval(cmdmap, "prefix", prefix)) {
3141 return reply_tell_command(op, -EINVAL, "no prefix");
3142 }
3143 if (auto cmd = _get_moncommand(prefix,
3144 get_local_commands(quorum_mon_features));
3145 cmd) {
3146 if (cmd->is_obsolete() ||
3147 (cct->_conf->mon_debug_deprecated_as_obsolete &&
3148 cmd->is_deprecated())) {
3149 return reply_tell_command(op, -ENOTSUP,
3150 "command is obsolete; "
3151 "please check usage and/or man page");
3152 }
3153 }
3154 // see if command is whitelisted
3155 if (!session->caps.is_capable(
3156 g_ceph_context,
3157 session->entity_name,
3158 "mon", prefix, param_str_map,
3159 true, true, true,
3160 session->get_peer_socket_addr())) {
3161 return reply_tell_command(op, -EACCES, "insufficient caps");
3162 }
3163 // pass it to asok
3164 cct->get_admin_socket()->queue_tell_command(m);
3165 }
3166
3167 void Monitor::handle_command(MonOpRequestRef op)
3168 {
3169 ceph_assert(op->is_type_command());
3170 auto m = op->get_req<MMonCommand>();
3171 if (m->fsid != monmap->fsid) {
3172 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid
3173 << dendl;
3174 reply_command(op, -EPERM, "wrong fsid", 0);
3175 return;
3176 }
3177
3178 MonSession *session = op->get_session();
3179 if (!session) {
3180 dout(5) << __func__ << " dropping stray message " << *m << dendl;
3181 return;
3182 }
3183
3184 if (m->cmd.empty()) {
3185 reply_command(op, -EINVAL, "no command specified", 0);
3186 return;
3187 }
3188
3189 string prefix;
3190 vector<string> fullcmd;
3191 cmdmap_t cmdmap;
3192 stringstream ss, ds;
3193 bufferlist rdata;
3194 string rs;
3195 int r = -EINVAL;
3196 rs = "unrecognized command";
3197
3198 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3199 // ss has reason for failure
3200 r = -EINVAL;
3201 rs = ss.str();
3202 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3203 reply_command(op, r, rs, 0);
3204 return;
3205 }
3206
3207 // check return value. If no prefix parameter provided,
3208 // return value will be false, then return error info.
3209 if (!cmd_getval(cmdmap, "prefix", prefix)) {
3210 reply_command(op, -EINVAL, "command prefix not found", 0);
3211 return;
3212 }
3213
3214 // check prefix is empty
3215 if (prefix.empty()) {
3216 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
3217 return;
3218 }
3219
3220 if (prefix == "get_command_descriptions") {
3221 bufferlist rdata;
3222 Formatter *f = Formatter::create("json");
3223
3224 std::vector<MonCommand> commands = static_cast<MgrMonitor*>(
3225 paxos_service[PAXOS_MGR].get())->get_command_descs();
3226
3227 for (auto& c : leader_mon_commands) {
3228 commands.push_back(c);
3229 }
3230
3231 auto features = m->get_connection()->get_features();
3232 format_command_descriptions(commands, f, features, &rdata);
3233 delete f;
3234 reply_command(op, 0, "", rdata, 0);
3235 return;
3236 }
3237
3238 string module;
3239 string err;
3240
3241 dout(0) << "handle_command " << *m << dendl;
3242
3243 string format;
3244 cmd_getval(cmdmap, "format", format, string("plain"));
3245 boost::scoped_ptr<Formatter> f(Formatter::create(format));
3246
3247 get_str_vec(prefix, fullcmd);
3248
3249 // make sure fullcmd is not empty.
3250 // invalid prefix will cause empty vector fullcmd.
3251 // such as, prefix=";,,;"
3252 if (fullcmd.empty()) {
3253 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3254 return;
3255 }
3256
3257 module = fullcmd[0];
3258
3259 // validate command is in leader map
3260
3261 const MonCommand *leader_cmd;
3262 const auto& mgr_cmds = mgrmon()->get_command_descs();
3263 const MonCommand *mgr_cmd = nullptr;
3264 if (!mgr_cmds.empty()) {
3265 mgr_cmd = _get_moncommand(prefix, mgr_cmds);
3266 }
3267 leader_cmd = _get_moncommand(prefix, leader_mon_commands);
3268 if (!leader_cmd) {
3269 leader_cmd = mgr_cmd;
3270 if (!leader_cmd) {
3271 reply_command(op, -EINVAL, "command not known", 0);
3272 return;
3273 }
3274 }
3275 // validate command is in our map & matches, or forward if it is allowed
3276 const MonCommand *mon_cmd = _get_moncommand(
3277 prefix,
3278 get_local_commands(quorum_mon_features));
3279 if (!mon_cmd) {
3280 mon_cmd = mgr_cmd;
3281 }
3282 if (!is_leader()) {
3283 if (!mon_cmd) {
3284 if (leader_cmd->is_noforward()) {
3285 reply_command(op, -EINVAL,
3286 "command not locally supported and not allowed to forward",
3287 0);
3288 return;
3289 }
3290 dout(10) << "Command not locally supported, forwarding request "
3291 << m << dendl;
3292 forward_request_leader(op);
3293 return;
3294 } else if (!mon_cmd->is_compat(leader_cmd)) {
3295 if (mon_cmd->is_noforward()) {
3296 reply_command(op, -EINVAL,
3297 "command not compatible with leader and not allowed to forward",
3298 0);
3299 return;
3300 }
3301 dout(10) << "Command not compatible with leader, forwarding request "
3302 << m << dendl;
3303 forward_request_leader(op);
3304 return;
3305 }
3306 }
3307
3308 if (mon_cmd->is_obsolete() ||
3309 (cct->_conf->mon_debug_deprecated_as_obsolete
3310 && mon_cmd->is_deprecated())) {
3311 reply_command(op, -ENOTSUP,
3312 "command is obsolete; please check usage and/or man page",
3313 0);
3314 return;
3315 }
3316
3317 if (session->proxy_con && mon_cmd->is_noforward()) {
3318 dout(10) << "Got forward for noforward command " << m << dendl;
3319 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3320 return;
3321 }
3322
3323 /* what we perceive as being the service the command falls under */
3324 string service(mon_cmd->module);
3325
3326 dout(25) << __func__ << " prefix='" << prefix
3327 << "' module='" << module
3328 << "' service='" << service << "'" << dendl;
3329
3330 bool cmd_is_rw =
3331 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3332
3333 // validate user's permissions for requested command
3334 map<string,string> param_str_map;
3335 _generate_command_map(cmdmap, param_str_map);
3336 if (!_allowed_command(session, service, prefix, cmdmap,
3337 param_str_map, mon_cmd)) {
3338 dout(1) << __func__ << " access denied" << dendl;
3339 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3340 << "from='" << session->name << " " << session->addrs << "' "
3341 << "entity='" << session->entity_name << "' "
3342 << "cmd=" << m->cmd << ": access denied";
3343 reply_command(op, -EACCES, "access denied", 0);
3344 return;
3345 }
3346
3347 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3348 << "from='" << session->name << " " << session->addrs << "' "
3349 << "entity='" << session->entity_name << "' "
3350 << "cmd=" << m->cmd << ": dispatch";
3351
3352 // compat kludge for legacy clients trying to tell commands that are
3353 // new. see bottom of MonCommands.h. we need to handle both (1)
3354 // pre-octopus clients and (2) octopus clients with a mix of pre-octopus
3355 // and octopus mons.
3356 if ((!HAVE_FEATURE(m->get_connection()->get_features(), SERVER_OCTOPUS) ||
3357 monmap->min_mon_release < ceph_release_t::octopus) &&
3358 (prefix == "injectargs" ||
3359 prefix == "smart" ||
3360 prefix == "mon_status" ||
3361 prefix == "heap")) {
3362 if (m->get_connection()->get_messenger() == 0) {
3363 // Prior to octopus, monitors might forward these messages
3364 // around. that was broken at baseline, and if we try to process
3365 // this message now, it will assert out when we try to send a
3366 // message in reply from the asok/tell worker (see
3367 // AnonConnection). Just reply with an error.
3368 dout(5) << __func__ << " failing forwarded command from a (presumably) "
3369 << "pre-octopus peer" << dendl;
3370 reply_command(
3371 op, -EBUSY,
3372 "failing forwarded tell command in mixed-version mon cluster", 0);
3373 return;
3374 }
3375 dout(5) << __func__ << " passing command to tell/asok" << dendl;
3376 cct->get_admin_socket()->queue_tell_command(m);
3377 return;
3378 }
3379
3380 if (mon_cmd->is_mgr()) {
3381 const auto& hdr = m->get_header();
3382 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
3383 uint64_t max = g_conf().get_val<Option::size_t>("mon_client_bytes")
3384 * g_conf().get_val<double>("mon_mgr_proxy_client_bytes_ratio");
3385 if (mgr_proxy_bytes + size > max) {
3386 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3387 << " + " << size << " > max " << max << dendl;
3388 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3389 return;
3390 }
3391 mgr_proxy_bytes += size;
3392 dout(10) << __func__ << " proxying mgr command (+" << size
3393 << " -> " << mgr_proxy_bytes << ")" << dendl;
3394 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3395 mgr_client.start_command(m->cmd,
3396 m->get_data(),
3397 &fin->outbl,
3398 &fin->outs,
3399 new C_OnFinisher(fin, &finisher));
3400 return;
3401 }
3402
3403 if ((module == "mds" || module == "fs") &&
3404 prefix != "fs authorize") {
3405 mdsmon()->dispatch(op);
3406 return;
3407 }
3408 if ((module == "osd" ||
3409 prefix == "pg map" ||
3410 prefix == "pg repeer") &&
3411 prefix != "osd last-stat-seq") {
3412 osdmon()->dispatch(op);
3413 return;
3414 }
3415 if (module == "config") {
3416 configmon()->dispatch(op);
3417 return;
3418 }
3419
3420 if (module == "mon" &&
3421 /* Let the Monitor class handle the following commands:
3422 * 'mon scrub'
3423 */
3424 prefix != "mon scrub" &&
3425 prefix != "mon metadata" &&
3426 prefix != "mon versions" &&
3427 prefix != "mon count-metadata" &&
3428 prefix != "mon ok-to-stop" &&
3429 prefix != "mon ok-to-add-offline" &&
3430 prefix != "mon ok-to-rm") {
3431 monmon()->dispatch(op);
3432 return;
3433 }
3434 if (module == "health" && prefix != "health") {
3435 healthmon()->dispatch(op);
3436 return;
3437 }
3438 if (module == "auth" || prefix == "fs authorize") {
3439 authmon()->dispatch(op);
3440 return;
3441 }
3442 if (module == "log") {
3443 logmon()->dispatch(op);
3444 return;
3445 }
3446
3447 if (module == "config-key") {
3448 config_key_service->dispatch(op);
3449 return;
3450 }
3451
3452 if (module == "mgr") {
3453 mgrmon()->dispatch(op);
3454 return;
3455 }
3456
3457 if (prefix == "fsid") {
3458 if (f) {
3459 f->open_object_section("fsid");
3460 f->dump_stream("fsid") << monmap->fsid;
3461 f->close_section();
3462 f->flush(rdata);
3463 } else {
3464 ds << monmap->fsid;
3465 rdata.append(ds);
3466 }
3467 reply_command(op, 0, "", rdata, 0);
3468 return;
3469 }
3470
3471 if (prefix == "mon scrub") {
3472 wait_for_paxos_write();
3473 if (is_leader()) {
3474 int r = scrub_start();
3475 reply_command(op, r, "", rdata, 0);
3476 } else if (is_peon()) {
3477 forward_request_leader(op);
3478 } else {
3479 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3480 }
3481 return;
3482 }
3483
3484 if (prefix == "time-sync-status") {
3485 if (!f)
3486 f.reset(Formatter::create("json-pretty"));
3487 f->open_object_section("time_sync");
3488 if (!timecheck_skews.empty()) {
3489 f->open_object_section("time_skew_status");
3490 for (auto& i : timecheck_skews) {
3491 double skew = i.second;
3492 double latency = timecheck_latencies[i.first];
3493 string name = monmap->get_name(i.first);
3494 ostringstream tcss;
3495 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3496 f->open_object_section(name.c_str());
3497 f->dump_float("skew", skew);
3498 f->dump_float("latency", latency);
3499 f->dump_stream("health") << tcstatus;
3500 if (tcstatus != HEALTH_OK) {
3501 f->dump_stream("details") << tcss.str();
3502 }
3503 f->close_section();
3504 }
3505 f->close_section();
3506 }
3507 f->open_object_section("timechecks");
3508 f->dump_unsigned("epoch", get_epoch());
3509 f->dump_int("round", timecheck_round);
3510 f->dump_stream("round_status") << ((timecheck_round%2) ?
3511 "on-going" : "finished");
3512 f->close_section();
3513 f->close_section();
3514 f->flush(rdata);
3515 r = 0;
3516 rs = "";
3517 } else if (prefix == "status" ||
3518 prefix == "health" ||
3519 prefix == "df") {
3520 string detail;
3521 cmd_getval(cmdmap, "detail", detail);
3522
3523 if (prefix == "status") {
3524 // get_cluster_status handles f == NULL
3525 get_cluster_status(ds, f.get());
3526
3527 if (f) {
3528 f->flush(ds);
3529 ds << '\n';
3530 }
3531 rdata.append(ds);
3532 } else if (prefix == "health") {
3533 string plain;
3534 healthmon()->get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
3535 if (f) {
3536 f->flush(rdata);
3537 } else {
3538 rdata.append(plain);
3539 }
3540 } else if (prefix == "df") {
3541 bool verbose = (detail == "detail");
3542 if (f)
3543 f->open_object_section("stats");
3544
3545 mgrstatmon()->dump_cluster_stats(&ds, f.get(), verbose);
3546 if (!f) {
3547 ds << "\n \n";
3548 }
3549 mgrstatmon()->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
3550
3551 if (f) {
3552 f->close_section();
3553 f->flush(ds);
3554 ds << '\n';
3555 }
3556 } else {
3557 ceph_abort_msg("We should never get here!");
3558 return;
3559 }
3560 rdata.append(ds);
3561 rs = "";
3562 r = 0;
3563 } else if (prefix == "report") {
3564
3565 // this must be formatted, in its current form
3566 if (!f)
3567 f.reset(Formatter::create("json-pretty"));
3568 f->open_object_section("report");
3569 f->dump_stream("cluster_fingerprint") << fingerprint;
3570 f->dump_string("version", ceph_version_to_str());
3571 f->dump_string("commit", git_version_to_str());
3572 f->dump_stream("timestamp") << ceph_clock_now();
3573
3574 vector<string> tagsvec;
3575 cmd_getval(cmdmap, "tags", tagsvec);
3576 string tagstr = str_join(tagsvec, " ");
3577 if (!tagstr.empty())
3578 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3579 f->dump_string("tag", tagstr);
3580
3581 healthmon()->get_health_status(true, f.get(), nullptr);
3582
3583 monmon()->dump_info(f.get());
3584 osdmon()->dump_info(f.get());
3585 mdsmon()->dump_info(f.get());
3586 authmon()->dump_info(f.get());
3587 mgrstatmon()->dump_info(f.get());
3588
3589 paxos->dump_info(f.get());
3590
3591 f->close_section();
3592 f->flush(rdata);
3593
3594 ostringstream ss2;
3595 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT_LEGACY);
3596 rs = ss2.str();
3597 r = 0;
3598 } else if (prefix == "osd last-stat-seq") {
3599 int64_t osd;
3600 cmd_getval(cmdmap, "id", osd);
3601 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3602 if (f) {
3603 f->dump_unsigned("seq", seq);
3604 f->flush(ds);
3605 } else {
3606 ds << seq;
3607 rdata.append(ds);
3608 }
3609 rs = "";
3610 r = 0;
3611 } else if (prefix == "node ls") {
3612 string node_type("all");
3613 cmd_getval(cmdmap, "type", node_type);
3614 if (!f)
3615 f.reset(Formatter::create("json-pretty"));
3616 if (node_type == "all") {
3617 f->open_object_section("nodes");
3618 print_nodes(f.get(), ds);
3619 osdmon()->print_nodes(f.get());
3620 mdsmon()->print_nodes(f.get());
3621 mgrmon()->print_nodes(f.get());
3622 f->close_section();
3623 } else if (node_type == "mon") {
3624 print_nodes(f.get(), ds);
3625 } else if (node_type == "osd") {
3626 osdmon()->print_nodes(f.get());
3627 } else if (node_type == "mds") {
3628 mdsmon()->print_nodes(f.get());
3629 } else if (node_type == "mgr") {
3630 mgrmon()->print_nodes(f.get());
3631 }
3632 f->flush(ds);
3633 rdata.append(ds);
3634 rs = "";
3635 r = 0;
3636 } else if (prefix == "features") {
3637 if (!is_leader() && !is_peon()) {
3638 dout(10) << " waiting for quorum" << dendl;
3639 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3640 return;
3641 }
3642 if (!is_leader()) {
3643 forward_request_leader(op);
3644 return;
3645 }
3646 if (!f)
3647 f.reset(Formatter::create("json-pretty"));
3648 FeatureMap fm;
3649 get_combined_feature_map(&fm);
3650 f->dump_object("features", fm);
3651 f->flush(rdata);
3652 rs = "";
3653 r = 0;
3654 } else if (prefix == "mon metadata") {
3655 if (!f)
3656 f.reset(Formatter::create("json-pretty"));
3657
3658 string name;
3659 bool all = !cmd_getval(cmdmap, "id", name);
3660 if (!all) {
3661 // Dump a single mon's metadata
3662 int mon = monmap->get_rank(name);
3663 if (mon < 0) {
3664 rs = "requested mon not found";
3665 r = -ENOENT;
3666 goto out;
3667 }
3668 f->open_object_section("mon_metadata");
3669 r = get_mon_metadata(mon, f.get(), ds);
3670 f->close_section();
3671 } else {
3672 // Dump all mons' metadata
3673 r = 0;
3674 f->open_array_section("mon_metadata");
3675 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3676 std::ostringstream get_err;
3677 f->open_object_section("mon");
3678 f->dump_string("name", monmap->get_name(rank));
3679 r = get_mon_metadata(rank, f.get(), get_err);
3680 f->close_section();
3681 if (r == -ENOENT || r == -EINVAL) {
3682 dout(1) << get_err.str() << dendl;
3683 // Drop error, list what metadata we do have
3684 r = 0;
3685 } else if (r != 0) {
3686 derr << "Unexpected error from get_mon_metadata: "
3687 << cpp_strerror(r) << dendl;
3688 ds << get_err.str();
3689 break;
3690 }
3691 }
3692 f->close_section();
3693 }
3694
3695 f->flush(ds);
3696 rdata.append(ds);
3697 rs = "";
3698 } else if (prefix == "mon versions") {
3699 if (!f)
3700 f.reset(Formatter::create("json-pretty"));
3701 count_metadata("ceph_version", f.get());
3702 f->flush(ds);
3703 rdata.append(ds);
3704 rs = "";
3705 r = 0;
3706 } else if (prefix == "mon count-metadata") {
3707 if (!f)
3708 f.reset(Formatter::create("json-pretty"));
3709 string field;
3710 cmd_getval(cmdmap, "property", field);
3711 count_metadata(field, f.get());
3712 f->flush(ds);
3713 rdata.append(ds);
3714 rs = "";
3715 r = 0;
3716 } else if (prefix == "quorum_status") {
3717 // make sure our map is readable and up to date
3718 if (!is_leader() && !is_peon()) {
3719 dout(10) << " waiting for quorum" << dendl;
3720 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3721 return;
3722 }
3723 _quorum_status(f.get(), ds);
3724 rdata.append(ds);
3725 rs = "";
3726 r = 0;
3727 } else if (prefix == "mon ok-to-stop") {
3728 vector<string> ids;
3729 if (!cmd_getval(cmdmap, "ids", ids)) {
3730 r = -EINVAL;
3731 goto out;
3732 }
3733 set<string> wouldbe;
3734 for (auto rank : quorum) {
3735 wouldbe.insert(monmap->get_name(rank));
3736 }
3737 for (auto& n : ids) {
3738 if (monmap->contains(n)) {
3739 wouldbe.erase(n);
3740 }
3741 }
3742 if (wouldbe.size() < monmap->min_quorum_size()) {
3743 r = -EBUSY;
3744 rs = "not enough monitors would be available (" + stringify(wouldbe) +
3745 ") after stopping mons " + stringify(ids);
3746 goto out;
3747 }
3748 r = 0;
3749 rs = "quorum should be preserved (" + stringify(wouldbe) +
3750 ") after stopping " + stringify(ids);
3751 } else if (prefix == "mon ok-to-add-offline") {
3752 if (quorum.size() < monmap->min_quorum_size(monmap->size() + 1)) {
3753 rs = "adding a monitor may break quorum (until that monitor starts)";
3754 r = -EBUSY;
3755 goto out;
3756 }
3757 rs = "adding another mon that is not yet online will not break quorum";
3758 r = 0;
3759 } else if (prefix == "mon ok-to-rm") {
3760 string id;
3761 if (!cmd_getval(cmdmap, "id", id)) {
3762 r = -EINVAL;
3763 rs = "must specify a monitor id";
3764 goto out;
3765 }
3766 if (!monmap->contains(id)) {
3767 r = 0;
3768 rs = "mon." + id + " does not exist";
3769 goto out;
3770 }
3771 int rank = monmap->get_rank(id);
3772 if (quorum.count(rank) &&
3773 quorum.size() - 1 < monmap->min_quorum_size(monmap->size() - 1)) {
3774 r = -EBUSY;
3775 rs = "removing mon." + id + " would break quorum";
3776 goto out;
3777 }
3778 r = 0;
3779 rs = "safe to remove mon." + id;
3780 } else if (prefix == "version") {
3781 if (f) {
3782 f->open_object_section("version");
3783 f->dump_string("version", pretty_version_to_str());
3784 f->close_section();
3785 f->flush(ds);
3786 } else {
3787 ds << pretty_version_to_str();
3788 }
3789 rdata.append(ds);
3790 rs = "";
3791 r = 0;
3792 } else if (prefix == "versions") {
3793 if (!f)
3794 f.reset(Formatter::create("json-pretty"));
3795 map<string,int> overall;
3796 f->open_object_section("version");
3797 map<string,int> mon, mgr, osd, mds;
3798
3799 count_metadata("ceph_version", &mon);
3800 f->open_object_section("mon");
3801 for (auto& p : mon) {
3802 f->dump_int(p.first.c_str(), p.second);
3803 overall[p.first] += p.second;
3804 }
3805 f->close_section();
3806
3807 mgrmon()->count_metadata("ceph_version", &mgr);
3808 f->open_object_section("mgr");
3809 for (auto& p : mgr) {
3810 f->dump_int(p.first.c_str(), p.second);
3811 overall[p.first] += p.second;
3812 }
3813 f->close_section();
3814
3815 osdmon()->count_metadata("ceph_version", &osd);
3816 f->open_object_section("osd");
3817 for (auto& p : osd) {
3818 f->dump_int(p.first.c_str(), p.second);
3819 overall[p.first] += p.second;
3820 }
3821 f->close_section();
3822
3823 mdsmon()->count_metadata("ceph_version", &mds);
3824 f->open_object_section("mds");
3825 for (auto& p : mds) {
3826 f->dump_int(p.first.c_str(), p.second);
3827 overall[p.first] += p.second;
3828 }
3829 f->close_section();
3830
3831 for (auto& p : mgrstatmon()->get_service_map().services) {
3832 auto &service = p.first;
3833 if (ServiceMap::is_normal_ceph_entity(service)) {
3834 continue;
3835 }
3836 f->open_object_section(service.c_str());
3837 map<string,int> m;
3838 p.second.count_metadata("ceph_version", &m);
3839 for (auto& q : m) {
3840 f->dump_int(q.first.c_str(), q.second);
3841 overall[q.first] += q.second;
3842 }
3843 f->close_section();
3844 }
3845
3846 f->open_object_section("overall");
3847 for (auto& p : overall) {
3848 f->dump_int(p.first.c_str(), p.second);
3849 }
3850 f->close_section();
3851 f->close_section();
3852 f->flush(rdata);
3853 rs = "";
3854 r = 0;
3855 }
3856
3857 out:
3858 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3859 reply_command(op, r, rs, rdata, 0);
3860 }
3861
3862 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3863 {
3864 bufferlist rdata;
3865 reply_command(op, rc, rs, rdata, version);
3866 }
3867
3868 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3869 bufferlist& rdata, version_t version)
3870 {
3871 auto m = op->get_req<MMonCommand>();
3872 ceph_assert(m->get_type() == MSG_MON_COMMAND);
3873 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
3874 reply->set_tid(m->get_tid());
3875 reply->set_data(rdata);
3876 send_reply(op, reply);
3877 }
3878
3879 void Monitor::reply_tell_command(
3880 MonOpRequestRef op, int rc, const string &rs)
3881 {
3882 MCommand *m = static_cast<MCommand*>(op->get_req());
3883 ceph_assert(m->get_type() == MSG_COMMAND);
3884 MCommandReply *reply = new MCommandReply(rc, rs);
3885 reply->set_tid(m->get_tid());
3886 m->get_connection()->send_message(reply);
3887 }
3888
3889
3890 // ------------------------
3891 // request/reply routing
3892 //
3893 // a client/mds/osd will connect to a random monitor. we need to forward any
3894 // messages requiring state updates to the leader, and then route any replies
3895 // back via the correct monitor and back to them. (the monitor will not
3896 // initiate any connections.)
3897
3898 void Monitor::forward_request_leader(MonOpRequestRef op)
3899 {
3900 op->mark_event(__func__);
3901
3902 int mon = get_leader();
3903 MonSession *session = op->get_session();
3904 PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
3905
3906 if (req->get_source().is_mon() && req->get_source_addrs() != messenger->get_myaddrs()) {
3907 dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
3908 } else if (session->proxy_con) {
3909 dout(10) << "forward_request won't double fwd request " << *req << dendl;
3910 } else if (!session->closed) {
3911 RoutedRequest *rr = new RoutedRequest;
3912 rr->tid = ++routed_request_tid;
3913 rr->con = req->get_connection();
3914 rr->con_features = rr->con->get_features();
3915 encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
3916 rr->session = static_cast<MonSession *>(session->get());
3917 rr->op = op;
3918 routed_requests[rr->tid] = rr;
3919 session->routed_request_tids.insert(rr->tid);
3920
3921 dout(10) << "forward_request " << rr->tid << " request " << *req
3922 << " features " << rr->con_features << dendl;
3923
3924 MForward *forward = new MForward(rr->tid,
3925 req,
3926 rr->con_features,
3927 rr->session->caps);
3928 forward->set_priority(req->get_priority());
3929 if (session->auth_handler) {
3930 forward->entity_name = session->entity_name;
3931 } else if (req->get_source().is_mon()) {
3932 forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
3933 }
3934 send_mon_message(forward, mon);
3935 op->mark_forwarded();
3936 ceph_assert(op->get_req()->get_type() != 0);
3937 } else {
3938 dout(10) << "forward_request no session for request " << *req << dendl;
3939 }
3940 }
3941
3942 // fake connection attached to forwarded messages
3943 struct AnonConnection : public Connection {
3944 entity_addr_t socket_addr;
3945
3946 int send_message(Message *m) override {
3947 ceph_assert(!"send_message on anonymous connection");
3948 }
3949 void send_keepalive() override {
3950 ceph_assert(!"send_keepalive on anonymous connection");
3951 }
3952 void mark_down() override {
3953 // silently ignore
3954 }
3955 void mark_disposable() override {
3956 // silengtly ignore
3957 }
3958 bool is_connected() override { return false; }
3959 entity_addr_t get_peer_socket_addr() const override {
3960 return socket_addr;
3961 }
3962
3963 private:
3964 FRIEND_MAKE_REF(AnonConnection);
3965 explicit AnonConnection(CephContext *cct, const entity_addr_t& sa)
3966 : Connection(cct, nullptr),
3967 socket_addr(sa) {}
3968 };
3969
// Unwrap a request that a peer monitor forwarded to us in an MForward and
// feed the original message through the regular dispatch path
// (_ms_dispatch()), as if the client had sent it to us directly.
void Monitor::handle_forward(MonOpRequestRef op)
{
  auto m = op->get_req<MForward>();
  dout(10) << "received forwarded message from "
           << ceph_entity_type_name(m->client_type)
           << " " << m->client_addrs
           << " via " << m->get_source_inst() << dendl;
  MonSession *session = op->get_session();
  ceph_assert(session);

  // Only a peer holding mon 'x' caps may inject requests on behalf of
  // clients; otherwise the forward is logged and ignored.
  if (!session->is_capable("mon", MON_CAP_X)) {
    dout(0) << "forward from entity with insufficient caps! "
            << session->caps << dendl;
  } else {
    // see PaxosService::dispatch(); we rely on this being anon
    // (c->msgr == NULL)
    PaxosServiceMessage *req = m->claim_message();
    ceph_assert(req != NULL);

    // Build a stand-in connection plus a throw-away session that describe
    // the original client. Its addresses, type, features, caps and entity
    // name all come from the MForward envelope.
    auto c = ceph::make_ref<AnonConnection>(cct, m->client_socket_addr);
    MonSession *s = new MonSession(static_cast<Connection*>(c.get()));
    s->_ident(req->get_source(),
              req->get_source_addrs());
    // The connection now refers to the session. The `false` presumably
    // adopts the reference from `new` rather than taking another one.
    c->set_priv(RefCountedPtr{s, false});
    c->set_peer_addrs(m->client_addrs);
    c->set_peer_type(m->client_type);
    c->set_features(m->con_features);

    // Identity and caps are taken from the forwarding monitor rather than
    // re-authenticated here.
    s->authenticated = true;
    s->caps = m->client_caps;
    dout(10) << " caps are " << s->caps << dendl;
    s->entity_name = m->entity_name;
    dout(10) << " entity name '" << s->entity_name << "' type "
             << s->entity_name.get_type() << dendl;
    // Replies go back through the forwarding monitor: send_reply() and
    // no_reply() wrap them in an MRoute tagged with proxy_tid.
    s->proxy_con = m->get_connection();
    s->proxy_tid = m->tid;

    req->set_connection(c);

    // not super accurate, but better than nothing.
    req->set_recv_stamp(m->get_recv_stamp());

    /*
     * note which election epoch this is; we will drop the message if
     * there is a future election since our peers will resend routed
     * requests in that case.
     */
    req->rx_election_epoch = get_epoch();

    dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
    _ms_dispatch(req);

    // break the session <-> con ref loop by removing the con->session
    // reference, which is no longer needed once the MonOpRequest is
    // set up.
    c->set_priv(NULL);
  }
}
4029
4030 void Monitor::send_reply(MonOpRequestRef op, Message *reply)
4031 {
4032 op->mark_event(__func__);
4033
4034 MonSession *session = op->get_session();
4035 ceph_assert(session);
4036 Message *req = op->get_req();
4037 ConnectionRef con = op->get_connection();
4038
4039 reply->set_cct(g_ceph_context);
4040 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
4041
4042 if (!con) {
4043 dout(2) << "send_reply no connection, dropping reply " << *reply
4044 << " to " << req << " " << *req << dendl;
4045 reply->put();
4046 op->mark_event("reply: no connection");
4047 return;
4048 }
4049
4050 if (!session->con && !session->proxy_con) {
4051 dout(2) << "send_reply no connection, dropping reply " << *reply
4052 << " to " << req << " " << *req << dendl;
4053 reply->put();
4054 op->mark_event("reply: no connection");
4055 return;
4056 }
4057
4058 if (session->proxy_con) {
4059 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
4060 << " via " << session->proxy_con->get_peer_addr()
4061 << " for request " << *req << dendl;
4062 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
4063 op->mark_event("reply: send routed request");
4064 } else {
4065 session->con->send_message(reply);
4066 op->mark_event("reply: send");
4067 }
4068 }
4069
4070 void Monitor::no_reply(MonOpRequestRef op)
4071 {
4072 MonSession *session = op->get_session();
4073 Message *req = op->get_req();
4074
4075 if (session->proxy_con) {
4076 dout(10) << "no_reply to " << req->get_source_inst()
4077 << " via " << session->proxy_con->get_peer_addr()
4078 << " for request " << *req << dendl;
4079 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
4080 op->mark_event("no_reply: send routed request");
4081 } else {
4082 dout(10) << "no_reply to " << req->get_source_inst()
4083 << " " << *req << dendl;
4084 op->mark_event("no_reply");
4085 }
4086 }
4087
4088 void Monitor::handle_route(MonOpRequestRef op)
4089 {
4090 auto m = op->get_req<MRoute>();
4091 MonSession *session = op->get_session();
4092 //check privileges
4093 if (!session->is_capable("mon", MON_CAP_X)) {
4094 dout(0) << "MRoute received from entity without appropriate perms! "
4095 << dendl;
4096 return;
4097 }
4098 if (m->msg)
4099 dout(10) << "handle_route tid " << m->session_mon_tid << " " << *m->msg
4100 << dendl;
4101 else
4102 dout(10) << "handle_route tid " << m->session_mon_tid << " null" << dendl;
4103
4104 // look it up
4105 if (m->session_mon_tid) {
4106 if (routed_requests.count(m->session_mon_tid)) {
4107 RoutedRequest *rr = routed_requests[m->session_mon_tid];
4108
4109 // reset payload, in case encoding is dependent on target features
4110 if (m->msg) {
4111 m->msg->clear_payload();
4112 rr->con->send_message(m->msg);
4113 m->msg = NULL;
4114 }
4115 if (m->send_osdmap_first) {
4116 dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
4117 osdmon()->send_incremental(m->send_osdmap_first, rr->session,
4118 true, MonOpRequestRef());
4119 }
4120 ceph_assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
4121 routed_requests.erase(m->session_mon_tid);
4122 rr->session->routed_request_tids.erase(m->session_mon_tid);
4123 delete rr;
4124 } else {
4125 dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
4126 }
4127 } else {
4128 dout(10) << " not a routed request, ignoring" << dendl;
4129 }
4130 }
4131
4132 void Monitor::resend_routed_requests()
4133 {
4134 dout(10) << "resend_routed_requests" << dendl;
4135 int mon = get_leader();
4136 list<Context*> retry;
4137 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
4138 p != routed_requests.end();
4139 ++p) {
4140 RoutedRequest *rr = p->second;
4141
4142 if (mon == rank) {
4143 dout(10) << " requeue for self tid " << rr->tid << dendl;
4144 rr->op->mark_event("retry routed request");
4145 retry.push_back(new C_RetryMessage(this, rr->op));
4146 if (rr->session) {
4147 ceph_assert(rr->session->routed_request_tids.count(p->first));
4148 rr->session->routed_request_tids.erase(p->first);
4149 }
4150 delete rr;
4151 } else {
4152 auto q = rr->request_bl.cbegin();
4153 PaxosServiceMessage *req =
4154 (PaxosServiceMessage *)decode_message(cct, 0, q);
4155 rr->op->mark_event("resend forwarded message to leader");
4156 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req
4157 << dendl;
4158 MForward *forward = new MForward(rr->tid,
4159 req,
4160 rr->con_features,
4161 rr->session->caps);
4162 req->put(); // forward takes its own ref; drop ours.
4163 forward->client_type = rr->con->get_peer_type();
4164 forward->client_addrs = rr->con->get_peer_addrs();
4165 forward->client_socket_addr = rr->con->get_peer_socket_addr();
4166 forward->set_priority(req->get_priority());
4167 send_mon_message(forward, mon);
4168 }
4169 }
4170 if (mon == rank) {
4171 routed_requests.clear();
4172 finish_contexts(g_ceph_context, retry);
4173 }
4174 }
4175
4176 void Monitor::remove_session(MonSession *s)
4177 {
4178 dout(10) << "remove_session " << s << " " << s->name << " " << s->addrs
4179 << " features 0x" << std::hex << s->con_features << std::dec << dendl;
4180 ceph_assert(s->con);
4181 ceph_assert(!s->closed);
4182 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
4183 p != s->routed_request_tids.end();
4184 ++p) {
4185 ceph_assert(routed_requests.count(*p));
4186 RoutedRequest *rr = routed_requests[*p];
4187 dout(10) << " dropping routed request " << rr->tid << dendl;
4188 delete rr;
4189 routed_requests.erase(*p);
4190 }
4191 s->routed_request_tids.clear();
4192 s->con->set_priv(nullptr);
4193 session_map.remove_session(s);
4194 logger->set(l_mon_num_sessions, session_map.get_size());
4195 logger->inc(l_mon_session_rm);
4196 }
4197
4198 void Monitor::remove_all_sessions()
4199 {
4200 std::lock_guard l(session_map_lock);
4201 while (!session_map.sessions.empty()) {
4202 MonSession *s = session_map.sessions.front();
4203 remove_session(s);
4204 logger->inc(l_mon_session_rm);
4205 }
4206 if (logger)
4207 logger->set(l_mon_num_sessions, session_map.get_size());
4208 }
4209
4210 void Monitor::send_mon_message(Message *m, int rank)
4211 {
4212 messenger->send_to_mon(m, monmap->get_addrs(rank));
4213 }
4214
4215 void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
4216 {
4217 /**
4218 * Wait list the new session until we're in the quorum, assuming it's
4219 * sufficiently new.
4220 * tick() will periodically send them back through so we can send
4221 * the client elsewhere if we don't think we're getting back in.
4222 *
4223 * But we whitelist a few sorts of messages:
4224 * 1) Monitors can talk to us at any time, of course.
4225 * 2) auth messages. It's unlikely to go through much faster, but
4226 * it's possible we've just lost our quorum status and we want to take...
4227 * 3) command messages. We want to accept these under all possible
4228 * circumstances.
4229 */
4230 Message *m = op->get_req();
4231 MonSession *s = op->get_session();
4232 ConnectionRef con = op->get_connection();
4233 utime_t too_old = ceph_clock_now();
4234 too_old -= g_ceph_context->_conf->mon_lease;
4235 if (m->get_recv_stamp() > too_old &&
4236 con->is_connected()) {
4237 dout(5) << "waitlisting message " << *m << dendl;
4238 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
4239 op->mark_wait_for_quorum();
4240 } else {
4241 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
4242 con->mark_down();
4243 // proxied sessions aren't registered and don't have a con; don't remove
4244 // those.
4245 if (!s->proxy_con) {
4246 std::lock_guard l(session_map_lock);
4247 remove_session(s);
4248 }
4249 op->mark_zap();
4250 }
4251 }
4252
/*
 * Entry point for messages handed to the monitor. handle_forward() also
 * uses it to dispatch the message it unwrapped from an MForward.
 *
 * Steps:
 *  - wrap the message in a MonOpRequest;
 *  - find or create the MonSession it belongs to;
 *  - refresh the session timeout;
 *  - dispatch the op, or hand it to waitlist_or_zap_client(). That happens
 *    while we are synchronizing, or for an unauthenticated non-monitor
 *    session once exited_quorum is set.
 *
 * Messages are dropped without a reply when:
 *  - we are shut down;
 *  - the session is already closed;
 *  - a brand-new non-monitor session opens with anything other than
 *    CEPH_MSG_AUTH, CEPH_MSG_MON_GET_MAP or CEPH_MSG_PING.
 */
void Monitor::_ms_dispatch(Message *m)
{
  if (is_shutdown()) {
    m->put();
    return;
  }

  // Presumably the op now holds the message reference: none of the early
  // returns below put() m themselves.
  MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
  bool src_is_mon = op->is_src_mon();
  op->mark_event("mon:_ms_dispatch");
  MonSession *s = op->get_session();
  if (s && s->closed) {
    return;
  }

  // A monitor peer whose (non-anonymous) connection now advertises different
  // features gets a fresh session. Clearing `s` makes the code below create it.
  if (src_is_mon && s) {
    ConnectionRef con = m->get_connection();
    if (con->get_messenger() && con->get_features() != s->con_features) {
      // only update features if this is a non-anonymous connection
      dout(10) << __func__ << " feature change for " << m->get_source_inst()
               << " (was " << s->con_features
               << ", now " << con->get_features() << ")" << dendl;
      // connection features changed - recreate session.
      if (s->con && s->con != con) {
        dout(10) << __func__ << " connection for " << m->get_source_inst()
                 << " changed from session; mark down and replace" << dendl;
        s->con->mark_down();
      }
      if (s->item.is_on_list()) {
        // forwarded messages' sessions are not in the sessions map and
        // exist only while the op is being handled.
        std::lock_guard l(session_map_lock);
        remove_session(s);
      }
      s = nullptr;
    }
  }

  if (!s) {
    // if the sender is not a monitor, make sure their first message for a
    // session is an MAuth. If it is not, assume it's a stray message,
    // and considering that we are creating a new session it is safe to
    // assume that the sender hasn't authenticated yet, so we have no way
    // of assessing whether we should handle it or not.
    if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
                        m->get_type() != CEPH_MSG_MON_GET_MAP &&
                        m->get_type() != CEPH_MSG_PING)) {
      dout(1) << __func__ << " dropping stray message " << *m
              << " from " << m->get_source_inst() << dendl;
      return;
    }

    ConnectionRef con = m->get_connection();
    {
      std::lock_guard l(session_map_lock);
      s = session_map.new_session(m->get_source(),
                                  m->get_source_addrs(),
                                  con.get());
    }
    ceph_assert(s);
    // Attach the session to the connection. `false` presumably adopts the
    // reference returned by new_session() rather than adding one.
    con->set_priv(RefCountedPtr{s, false});
    dout(10) << __func__ << " new session " << s << " " << *s
             << " features 0x" << std::hex
             << s->con_features << std::dec << dendl;
    op->set_session(s);

    // NOTE(review): get_size() is read after session_map_lock was released
    // above; remove_session() updates the same gauge under that lock.
    logger->set(l_mon_num_sessions, session_map.get_size());
    logger->inc(l_mon_session_add);

    if (src_is_mon) {
      // give it monitor caps; the peer type has been authenticated
      dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
      if (!s->caps.is_allow_all()) // but no need to repeatedly copy
        s->caps = mon_caps;
      s->authenticated = true;
    }
  } else {
    dout(20) << __func__ << " existing session " << s << " for " << s->name
             << dendl;
  }

  ceph_assert(s);

  // every message pushes the session's expiry mon_session_timeout into the future
  s->session_timeout = ceph_clock_now();
  s->session_timeout += g_conf()->mon_session_timeout;

  if (s->auth_handler) {
    s->entity_name = s->auth_handler->get_entity_name();
  }
  dout(20) << " entity " << s->entity_name
           << " caps " << s->caps.get_str() << dendl;

  // Client traffic we cannot serve right now is parked or bounced; monitor
  // peers and pings are always dispatched.
  if ((is_synchronizing() ||
       (!s->authenticated && !exited_quorum.is_zero())) &&
      !src_is_mon &&
      m->get_type() != CEPH_MSG_PING) {
    waitlist_or_zap_client(op);
  } else {
    dispatch_op(op);
  }
  return;
}
4355
4356 void Monitor::dispatch_op(MonOpRequestRef op)
4357 {
4358 op->mark_event("mon:dispatch_op");
4359 MonSession *s = op->get_session();
4360 ceph_assert(s);
4361 if (s->closed) {
4362 dout(10) << " session closed, dropping " << op->get_req() << dendl;
4363 return;
4364 }
4365
4366 /* we will consider the default type as being 'monitor' until proven wrong */
4367 op->set_type_monitor();
4368 /* deal with all messages that do not necessarily need caps */
4369 switch (op->get_req()->get_type()) {
4370 // auth
4371 case MSG_MON_GLOBAL_ID:
4372 case CEPH_MSG_AUTH:
4373 op->set_type_service();
4374 /* no need to check caps here */
4375 paxos_service[PAXOS_AUTH]->dispatch(op);
4376 return;
4377
4378 case CEPH_MSG_PING:
4379 handle_ping(op);
4380 return;
4381 case MSG_COMMAND:
4382 op->set_type_command();
4383 handle_tell_command(op);
4384 return;
4385 }
4386
4387 if (!op->get_session()->authenticated) {
4388 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4389 << " is not authenticated, dropping " << *(op->get_req())
4390 << dendl;
4391 return;
4392 }
4393
4394 switch (op->get_req()->get_type()) {
4395 case CEPH_MSG_MON_GET_MAP:
4396 handle_mon_get_map(op);
4397 return;
4398
4399 case MSG_GET_CONFIG:
4400 configmon()->handle_get_config(op);
4401 return;
4402
4403 case CEPH_MSG_MON_METADATA:
4404 return handle_mon_metadata(op);
4405
4406 case CEPH_MSG_MON_SUBSCRIBE:
4407 /* FIXME: check what's being subscribed, filter accordingly */
4408 handle_subscribe(op);
4409 return;
4410 }
4411
4412 /* well, maybe the op belongs to a service... */
4413 op->set_type_service();
4414 /* deal with all messages which caps should be checked somewhere else */
4415 switch (op->get_req()->get_type()) {
4416
4417 // OSDs
4418 case CEPH_MSG_MON_GET_OSDMAP:
4419 case CEPH_MSG_POOLOP:
4420 case MSG_OSD_BEACON:
4421 case MSG_OSD_MARK_ME_DOWN:
4422 case MSG_OSD_MARK_ME_DEAD:
4423 case MSG_OSD_FULL:
4424 case MSG_OSD_FAILURE:
4425 case MSG_OSD_BOOT:
4426 case MSG_OSD_ALIVE:
4427 case MSG_OSD_PGTEMP:
4428 case MSG_OSD_PG_CREATED:
4429 case MSG_REMOVE_SNAPS:
4430 case MSG_MON_GET_PURGED_SNAPS:
4431 case MSG_OSD_PG_READY_TO_MERGE:
4432 paxos_service[PAXOS_OSDMAP]->dispatch(op);
4433 return;
4434
4435 // MDSs
4436 case MSG_MDS_BEACON:
4437 case MSG_MDS_OFFLOAD_TARGETS:
4438 paxos_service[PAXOS_MDSMAP]->dispatch(op);
4439 return;
4440
4441 // Mgrs
4442 case MSG_MGR_BEACON:
4443 paxos_service[PAXOS_MGR]->dispatch(op);
4444 return;
4445
4446 // MgrStat
4447 case MSG_MON_MGR_REPORT:
4448 case CEPH_MSG_STATFS:
4449 case MSG_GETPOOLSTATS:
4450 paxos_service[PAXOS_MGRSTAT]->dispatch(op);
4451 return;
4452
4453 // log
4454 case MSG_LOG:
4455 paxos_service[PAXOS_LOG]->dispatch(op);
4456 return;
4457
4458 // handle_command() does its own caps checking
4459 case MSG_MON_COMMAND:
4460 op->set_type_command();
4461 handle_command(op);
4462 return;
4463 }
4464
4465 /* nop, looks like it's not a service message; revert back to monitor */
4466 op->set_type_monitor();
4467
4468 /* messages we, the Monitor class, need to deal with
4469 * but may be sent by clients. */
4470
4471 if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
4472 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4473 << " not enough caps for " << *(op->get_req()) << " -- dropping"
4474 << dendl;
4475 return;
4476 }
4477
4478 switch (op->get_req()->get_type()) {
4479 // misc
4480 case CEPH_MSG_MON_GET_VERSION:
4481 handle_get_version(op);
4482 return;
4483 }
4484
4485 if (!op->is_src_mon()) {
4486 dout(1) << __func__ << " unexpected monitor message from"
4487 << " non-monitor entity " << op->get_req()->get_source_inst()
4488 << " " << *(op->get_req()) << " -- dropping" << dendl;
4489 return;
4490 }
4491
4492 /* messages that should only be sent by another monitor */
4493 switch (op->get_req()->get_type()) {
4494
4495 case MSG_ROUTE:
4496 handle_route(op);
4497 return;
4498
4499 case MSG_MON_PROBE:
4500 handle_probe(op);
4501 return;
4502
4503 // Sync (i.e., the new slurp, but on steroids)
4504 case MSG_MON_SYNC:
4505 handle_sync(op);
4506 return;
4507 case MSG_MON_SCRUB:
4508 handle_scrub(op);
4509 return;
4510
4511 /* log acks are sent from a monitor we sent the MLog to, and are
4512 never sent by clients to us. */
4513 case MSG_LOGACK:
4514 log_client.handle_log_ack((MLogAck*)op->get_req());
4515 return;
4516
4517 // monmap
4518 case MSG_MON_JOIN:
4519 op->set_type_service();
4520 paxos_service[PAXOS_MONMAP]->dispatch(op);
4521 return;
4522
4523 // paxos
4524 case MSG_MON_PAXOS:
4525 {
4526 op->set_type_paxos();
4527 auto pm = op->get_req<MMonPaxos>();
4528 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4529 //can't send these!
4530 return;
4531 }
4532
4533 if (state == STATE_SYNCHRONIZING) {
4534 // we are synchronizing. These messages would do us no
4535 // good, thus just drop them and ignore them.
4536 dout(10) << __func__ << " ignore paxos msg from "
4537 << pm->get_source_inst() << dendl;
4538 return;
4539 }
4540
4541 // sanitize
4542 if (pm->epoch > get_epoch()) {
4543 bootstrap();
4544 return;
4545 }
4546 if (pm->epoch != get_epoch()) {
4547 return;
4548 }
4549
4550 paxos->dispatch(op);
4551 }
4552 return;
4553
4554 // elector messages
4555 case MSG_MON_ELECTION:
4556 op->set_type_election();
4557 //check privileges here for simplicity
4558 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4559 dout(0) << "MMonElection received from entity without enough caps!"
4560 << op->get_session()->caps << dendl;
4561 return;;
4562 }
4563 if (!is_probing() && !is_synchronizing()) {
4564 elector.dispatch(op);
4565 }
4566 return;
4567
4568 case MSG_FORWARD:
4569 handle_forward(op);
4570 return;
4571
4572 case MSG_TIMECHECK:
4573 dout(5) << __func__ << " ignoring " << op << dendl;
4574 return;
4575 case MSG_TIMECHECK2:
4576 handle_timecheck(op);
4577 return;
4578
4579 case MSG_MON_HEALTH:
4580 dout(5) << __func__ << " dropping deprecated message: "
4581 << *op->get_req() << dendl;
4582 break;
4583 case MSG_MON_HEALTH_CHECKS:
4584 op->set_type_service();
4585 paxos_service[PAXOS_HEALTH]->dispatch(op);
4586 return;
4587 }
4588 dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
4589 return;
4590 }
4591
4592 void Monitor::handle_ping(MonOpRequestRef op)
4593 {
4594 auto m = op->get_req<MPing>();
4595 dout(10) << __func__ << " " << *m << dendl;
4596 MPing *reply = new MPing;
4597 bufferlist payload;
4598 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4599 f->open_object_section("pong");
4600
4601 healthmon()->get_health_status(false, f.get(), nullptr);
4602 get_mon_status(f.get());
4603
4604 f->close_section();
4605 stringstream ss;
4606 f->flush(ss);
4607 encode(ss.str(), payload);
4608 reply->set_payload(payload);
4609 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
4610 m->get_connection()->send_message(reply);
4611 }
4612
4613 void Monitor::timecheck_start()
4614 {
4615 dout(10) << __func__ << dendl;
4616 timecheck_cleanup();
4617 if (get_quorum_mon_features().contains_all(
4618 ceph::features::mon::FEATURE_NAUTILUS)) {
4619 timecheck_start_round();
4620 }
4621 }
4622
4623 void Monitor::timecheck_finish()
4624 {
4625 dout(10) << __func__ << dendl;
4626 timecheck_cleanup();
4627 }
4628
4629 void Monitor::timecheck_start_round()
4630 {
4631 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4632 ceph_assert(is_leader());
4633
4634 if (monmap->size() == 1) {
4635 ceph_abort_msg("We are alone; this shouldn't have been scheduled!");
4636 return;
4637 }
4638
4639 if (timecheck_round % 2) {
4640 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4641 utime_t curr_time = ceph_clock_now();
4642 double max = g_conf()->mon_timecheck_interval*3;
4643 if (curr_time - timecheck_round_start < max) {
4644 dout(10) << __func__ << " keep current round going" << dendl;
4645 goto out;
4646 } else {
4647 dout(10) << __func__
4648 << " finish current timecheck and start new" << dendl;
4649 timecheck_cancel_round();
4650 }
4651 }
4652
4653 ceph_assert(timecheck_round % 2 == 0);
4654 timecheck_acks = 0;
4655 timecheck_round ++;
4656 timecheck_round_start = ceph_clock_now();
4657 dout(10) << __func__ << " new " << timecheck_round << dendl;
4658
4659 timecheck();
4660 out:
4661 dout(10) << __func__ << " setting up next event" << dendl;
4662 timecheck_reset_event();
4663 }
4664
4665 void Monitor::timecheck_finish_round(bool success)
4666 {
4667 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4668 ceph_assert(timecheck_round % 2);
4669 timecheck_round ++;
4670 timecheck_round_start = utime_t();
4671
4672 if (success) {
4673 ceph_assert(timecheck_waiting.empty());
4674 ceph_assert(timecheck_acks == quorum.size());
4675 timecheck_report();
4676 timecheck_check_skews();
4677 return;
4678 }
4679
4680 dout(10) << __func__ << " " << timecheck_waiting.size()
4681 << " peers still waiting:";
4682 for (auto& p : timecheck_waiting) {
4683 *_dout << " mon." << p.first;
4684 }
4685 *_dout << dendl;
4686 timecheck_waiting.clear();
4687
4688 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4689 }
4690
4691 void Monitor::timecheck_cancel_round()
4692 {
4693 timecheck_finish_round(false);
4694 }
4695
4696 void Monitor::timecheck_cleanup()
4697 {
4698 timecheck_round = 0;
4699 timecheck_acks = 0;
4700 timecheck_round_start = utime_t();
4701
4702 if (timecheck_event) {
4703 timer.cancel_event(timecheck_event);
4704 timecheck_event = NULL;
4705 }
4706 timecheck_waiting.clear();
4707 timecheck_skews.clear();
4708 timecheck_latencies.clear();
4709
4710 timecheck_rounds_since_clean = 0;
4711 }
4712
4713 void Monitor::timecheck_reset_event()
4714 {
4715 if (timecheck_event) {
4716 timer.cancel_event(timecheck_event);
4717 timecheck_event = NULL;
4718 }
4719
4720 double delay =
4721 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4722
4723 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4724 delay = cct->_conf->mon_timecheck_interval;
4725 }
4726
4727 dout(10) << __func__ << " delay " << delay
4728 << " rounds_since_clean " << timecheck_rounds_since_clean
4729 << dendl;
4730
4731 timecheck_event = timer.add_event_after(
4732 delay,
4733 new C_MonContext{this, [this](int) {
4734 timecheck_start_round();
4735 }});
4736 }
4737
4738 void Monitor::timecheck_check_skews()
4739 {
4740 dout(10) << __func__ << dendl;
4741 ceph_assert(is_leader());
4742 ceph_assert((timecheck_round % 2) == 0);
4743 if (monmap->size() == 1) {
4744 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4745 return;
4746 }
4747 ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
4748
4749 bool found_skew = false;
4750 for (auto& p : timecheck_skews) {
4751 double abs_skew;
4752 if (timecheck_has_skew(p.second, &abs_skew)) {
4753 dout(10) << __func__
4754 << " " << p.first << " skew " << abs_skew << dendl;
4755 found_skew = true;
4756 }
4757 }
4758
4759 if (found_skew) {
4760 ++timecheck_rounds_since_clean;
4761 timecheck_reset_event();
4762 } else if (timecheck_rounds_since_clean > 0) {
4763 dout(1) << __func__
4764 << " no clock skews found after " << timecheck_rounds_since_clean
4765 << " rounds" << dendl;
4766 // make sure the skews are really gone and not just a transient success
4767 // this will run just once if not in the presence of skews again.
4768 timecheck_rounds_since_clean = 1;
4769 timecheck_reset_event();
4770 timecheck_rounds_since_clean = 0;
4771 }
4772
4773 }
4774
4775 void Monitor::timecheck_report()
4776 {
4777 dout(10) << __func__ << dendl;
4778 ceph_assert(is_leader());
4779 ceph_assert((timecheck_round % 2) == 0);
4780 if (monmap->size() == 1) {
4781 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4782 return;
4783 }
4784
4785 ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
4786 bool do_output = true; // only output report once
4787 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4788 if (monmap->get_name(*q) == name)
4789 continue;
4790
4791 MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_REPORT);
4792 m->epoch = get_epoch();
4793 m->round = timecheck_round;
4794
4795 for (auto& it : timecheck_skews) {
4796 double skew = it.second;
4797 double latency = timecheck_latencies[it.first];
4798
4799 m->skews[it.first] = skew;
4800 m->latencies[it.first] = latency;
4801
4802 if (do_output) {
4803 dout(25) << __func__ << " mon." << it.first
4804 << " latency " << latency
4805 << " skew " << skew << dendl;
4806 }
4807 }
4808 do_output = false;
4809 dout(10) << __func__ << " send report to mon." << *q << dendl;
4810 send_mon_message(m, *q);
4811 }
4812 }
4813
4814 void Monitor::timecheck()
4815 {
4816 dout(10) << __func__ << dendl;
4817 ceph_assert(is_leader());
4818 if (monmap->size() == 1) {
4819 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4820 return;
4821 }
4822 ceph_assert(timecheck_round % 2 != 0);
4823
4824 timecheck_acks = 1; // we ack ourselves
4825
4826 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4827 << " round " << timecheck_round << dendl;
4828
4829 // we are at the eye of the storm; the point of reference
4830 timecheck_skews[rank] = 0.0;
4831 timecheck_latencies[rank] = 0.0;
4832
4833 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4834 if (monmap->get_name(*it) == name)
4835 continue;
4836
4837 utime_t curr_time = ceph_clock_now();
4838 timecheck_waiting[*it] = curr_time;
4839 MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_PING);
4840 m->epoch = get_epoch();
4841 m->round = timecheck_round;
4842 dout(10) << __func__ << " send " << *m << " to mon." << *it << dendl;
4843 send_mon_message(m, *it);
4844 }
4845 }
4846
4847 health_status_t Monitor::timecheck_status(ostringstream &ss,
4848 const double skew_bound,
4849 const double latency)
4850 {
4851 health_status_t status = HEALTH_OK;
4852 ceph_assert(latency >= 0);
4853
4854 double abs_skew;
4855 if (timecheck_has_skew(skew_bound, &abs_skew)) {
4856 status = HEALTH_WARN;
4857 ss << "clock skew " << abs_skew << "s"
4858 << " > max " << g_conf()->mon_clock_drift_allowed << "s";
4859 }
4860
4861 return status;
4862 }
4863
/*
 * Leader side of a timecheck round: process a peon's OP_PONG.
 *
 * The pong's round-trip time is folded into the peer's latency. The first
 * sample is stored as-is; later ones use an 80/20 moving average.
 * A latency-bounded skew is computed from the peer's timestamp, and a health
 * entry is logged if it is excessive. Once every quorum member has answered,
 * the round is finished.
 *
 * Pongs from an older epoch or an older round are discarded. If our own
 * clock went backwards since the ping was sent, the round is cancelled.
 */
void Monitor::handle_timecheck_leader(MonOpRequestRef op)
{
  auto m = op->get_req<MTimeCheck2>();
  dout(10) << __func__ << " " << *m << dendl;
  /* handles PONG's */
  ceph_assert(m->op == MTimeCheck2::OP_PONG);

  int other = m->get_source().num();
  if (m->epoch < get_epoch()) {
    dout(1) << __func__ << " got old timecheck epoch " << m->epoch
            << " from " << other
            << " curr " << get_epoch()
            << " -- severely lagged? discard" << dendl;
    return;
  }
  ceph_assert(m->epoch == get_epoch());

  if (m->round < timecheck_round) {
    dout(1) << __func__ << " got old round " << m->round
            << " from " << other
            << " curr " << timecheck_round << " -- discard" << dendl;
    return;
  }

  utime_t curr_time = ceph_clock_now();

  // NOTE(review): a second pong from the same peer in one round would trip
  // this assert, since the first one already erased its entry.
  ceph_assert(timecheck_waiting.count(other) > 0);
  utime_t timecheck_sent = timecheck_waiting[other];
  timecheck_waiting.erase(other);
  if (curr_time < timecheck_sent) {
    // our clock was readjusted -- drop everything until it all makes sense.
    dout(1) << __func__ << " our clock was readjusted --"
            << " bump round and drop current check"
            << dendl;
    timecheck_cancel_round();
    return;
  }

  /* update peer latencies */
  double latency = (double)(curr_time - timecheck_sent);

  if (timecheck_latencies.count(other) == 0)
    timecheck_latencies[other] = latency;
  else {
    double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
    timecheck_latencies[other] = avg_latency;
  }

  /*
   * update skews
   *
   * some nasty thing goes on if we were to do 'a - b' between two utime_t,
   * and 'a' happens to be lower than 'b'; so we use double instead.
   *
   * latency is always expected to be >= 0.
   *
   * delta, the difference between their timestamp and ours, may either be
   * lower or higher than 0; will hardly ever be 0.
   *
   * The absolute skew is the absolute delta minus the latency, which is
   * taken as a whole instead of an rtt given that there is some queueing
   * and dispatch times involved and it's hard to assess how long exactly
   * it took for the message to travel to the other side and be handled. So
   * we call it a bounded skew, the worst case scenario.
   *
   * Now, to math!
   *
   * Given that the latency is always positive, we can establish that the
   * bounded skew will be:
   *
   * 1. positive if the absolute delta is higher than the latency and
   * delta is positive
   * 2. negative if the absolute delta is higher than the latency and
   * delta is negative.
   * 3. zero if the absolute delta is lower than the latency.
   *
   * On 3. we make a judgement call and treat the skew as non-existent.
   * This is because that, if the absolute delta is lower than the
   * latency, then the apparently existing skew is nothing more than a
   * side-effect of the high latency at work.
   *
   * This may not be entirely true though, as a severely skewed clock
   * may be masked by an even higher latency, but with high latencies
   * we probably have worse issues to deal with than just skewed clocks.
   */
  ceph_assert(latency >= 0);

  double delta = ((double) m->timestamp) - ((double) curr_time);
  double abs_delta = (delta > 0 ? delta : -delta);
  double skew_bound = abs_delta - latency;
  if (skew_bound < 0)
    skew_bound = 0;
  else if (delta < 0)
    skew_bound = -skew_bound;

  ostringstream ss;
  health_status_t status = timecheck_status(ss, skew_bound, latency);
  if (status != HEALTH_OK) {
    clog->health(status) << other << " " << ss.str();
  }

  dout(10) << __func__ << " from " << other << " ts " << m->timestamp
           << " delta " << delta << " skew_bound " << skew_bound
           << " latency " << latency << dendl;

  timecheck_skews[other] = skew_bound;

  // timecheck() pre-counted the leader itself as one ack
  timecheck_acks++;
  if (timecheck_acks == quorum.size()) {
    dout(10) << __func__ << " got pongs from everybody ("
             << timecheck_acks << " total)" << dendl;
    ceph_assert(timecheck_skews.size() == timecheck_acks);
    ceph_assert(timecheck_waiting.empty());
    // everyone has acked, so bump the round to finish it.
    timecheck_finish_round();
  }
}
4981
4982 void Monitor::handle_timecheck_peon(MonOpRequestRef op)
4983 {
4984 auto m = op->get_req<MTimeCheck2>();
4985 dout(10) << __func__ << " " << *m << dendl;
4986
4987 ceph_assert(is_peon());
4988 ceph_assert(m->op == MTimeCheck2::OP_PING || m->op == MTimeCheck2::OP_REPORT);
4989
4990 if (m->epoch != get_epoch()) {
4991 dout(1) << __func__ << " got wrong epoch "
4992 << "(ours " << get_epoch()
4993 << " theirs: " << m->epoch << ") -- discarding" << dendl;
4994 return;
4995 }
4996
4997 if (m->round < timecheck_round) {
4998 dout(1) << __func__ << " got old round " << m->round
4999 << " current " << timecheck_round
5000 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
5001 return;
5002 }
5003
5004 timecheck_round = m->round;
5005
5006 if (m->op == MTimeCheck2::OP_REPORT) {
5007 ceph_assert((timecheck_round % 2) == 0);
5008 timecheck_latencies.swap(m->latencies);
5009 timecheck_skews.swap(m->skews);
5010 return;
5011 }
5012
5013 ceph_assert((timecheck_round % 2) != 0);
5014 MTimeCheck2 *reply = new MTimeCheck2(MTimeCheck2::OP_PONG);
5015 utime_t curr_time = ceph_clock_now();
5016 reply->timestamp = curr_time;
5017 reply->epoch = m->epoch;
5018 reply->round = m->round;
5019 dout(10) << __func__ << " send " << *m
5020 << " to " << m->get_source_inst() << dendl;
5021 m->get_connection()->send_message(reply);
5022 }
5023
5024 void Monitor::handle_timecheck(MonOpRequestRef op)
5025 {
5026 auto m = op->get_req<MTimeCheck2>();
5027 dout(10) << __func__ << " " << *m << dendl;
5028
5029 if (is_leader()) {
5030 if (m->op != MTimeCheck2::OP_PONG) {
5031 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
5032 } else {
5033 handle_timecheck_leader(op);
5034 }
5035 } else if (is_peon()) {
5036 if (m->op != MTimeCheck2::OP_PING && m->op != MTimeCheck2::OP_REPORT) {
5037 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
5038 } else {
5039 handle_timecheck_peon(op);
5040 }
5041 } else {
5042 dout(1) << __func__ << " drop unexpected msg" << dendl;
5043 }
5044 }
5045
5046 void Monitor::handle_subscribe(MonOpRequestRef op)
5047 {
5048 auto m = op->get_req<MMonSubscribe>();
5049 dout(10) << "handle_subscribe " << *m << dendl;
5050
5051 bool reply = false;
5052
5053 MonSession *s = op->get_session();
5054 ceph_assert(s);
5055
5056 if (m->hostname.size()) {
5057 s->remote_host = m->hostname;
5058 }
5059
5060 for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
5061 p != m->what.end();
5062 ++p) {
5063 if (p->first == "monmap" || p->first == "config") {
5064 // these require no caps
5065 } else if (!s->is_capable("mon", MON_CAP_R)) {
5066 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
5067 << " not enough caps for " << *(op->get_req()) << " -- dropping"
5068 << dendl;
5069 continue;
5070 }
5071
5072 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
5073 if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
5074 reply = true;
5075
5076 // remove conflicting subscribes
5077 if (logmon()->sub_name_to_id(p->first) >= 0) {
5078 for (map<string, Subscription*>::iterator it = s->sub_map.begin();
5079 it != s->sub_map.end(); ) {
5080 if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
5081 std::lock_guard l(session_map_lock);
5082 session_map.remove_sub((it++)->second);
5083 } else {
5084 ++it;
5085 }
5086 }
5087 }
5088
5089 {
5090 std::lock_guard l(session_map_lock);
5091 session_map.add_update_sub(s, p->first, p->second.start,
5092 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
5093 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
5094 }
5095
5096 if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
5097 dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
5098 if ((int)s->is_capable("mds", MON_CAP_R)) {
5099 Subscription *sub = s->sub_map[p->first];
5100 ceph_assert(sub != nullptr);
5101 mdsmon()->check_sub(sub);
5102 }
5103 } else if (p->first == "osdmap") {
5104 if ((int)s->is_capable("osd", MON_CAP_R)) {
5105 if (s->osd_epoch > p->second.start) {
5106 // client needs earlier osdmaps on purpose, so reset the sent epoch
5107 s->osd_epoch = 0;
5108 }
5109 osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
5110 }
5111 } else if (p->first == "osd_pg_creates") {
5112 if ((int)s->is_capable("osd", MON_CAP_W)) {
5113 osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
5114 }
5115 } else if (p->first == "monmap") {
5116 monmon()->check_sub(s->sub_map[p->first]);
5117 } else if (logmon()->sub_name_to_id(p->first) >= 0) {
5118 logmon()->check_sub(s->sub_map[p->first]);
5119 } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
5120 mgrmon()->check_sub(s->sub_map[p->first]);
5121 } else if (p->first == "servicemap") {
5122 mgrstatmon()->check_sub(s->sub_map[p->first]);
5123 } else if (p->first == "config") {
5124 configmon()->check_sub(s);
5125 }
5126 }
5127
5128 if (reply) {
5129 // we only need to reply if the client is old enough to think it
5130 // has to send renewals.
5131 ConnectionRef con = m->get_connection();
5132 if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
5133 m->get_connection()->send_message(new MMonSubscribeAck(
5134 monmap->get_fsid(), (int)g_conf()->mon_subscribe_interval));
5135 }
5136
5137 }
5138
5139 void Monitor::handle_get_version(MonOpRequestRef op)
5140 {
5141 auto m = op->get_req<MMonGetVersion>();
5142 dout(10) << "handle_get_version " << *m << dendl;
5143 PaxosService *svc = NULL;
5144
5145 MonSession *s = op->get_session();
5146 ceph_assert(s);
5147
5148 if (!is_leader() && !is_peon()) {
5149 dout(10) << " waiting for quorum" << dendl;
5150 waitfor_quorum.push_back(new C_RetryMessage(this, op));
5151 goto out;
5152 }
5153
5154 if (m->what == "mdsmap") {
5155 svc = mdsmon();
5156 } else if (m->what == "fsmap") {
5157 svc = mdsmon();
5158 } else if (m->what == "osdmap") {
5159 svc = osdmon();
5160 } else if (m->what == "monmap") {
5161 svc = monmon();
5162 } else {
5163 derr << "invalid map type " << m->what << dendl;
5164 }
5165
5166 if (svc) {
5167 if (!svc->is_readable()) {
5168 svc->wait_for_readable(op, new C_RetryMessage(this, op));
5169 goto out;
5170 }
5171
5172 MMonGetVersionReply *reply = new MMonGetVersionReply();
5173 reply->handle = m->handle;
5174 reply->version = svc->get_last_committed();
5175 reply->oldest_version = svc->get_first_committed();
5176 reply->set_tid(m->get_tid());
5177
5178 m->get_connection()->send_message(reply);
5179 }
5180 out:
5181 return;
5182 }
5183
5184 bool Monitor::ms_handle_reset(Connection *con)
5185 {
5186 dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
5187
5188 // ignore lossless monitor sessions
5189 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
5190 return false;
5191
5192 auto priv = con->get_priv();
5193 auto s = static_cast<MonSession*>(priv.get());
5194 if (!s)
5195 return false;
5196
5197 // break any con <-> session ref cycle
5198 s->con->set_priv(nullptr);
5199
5200 if (is_shutdown())
5201 return false;
5202
5203 std::lock_guard l(lock);
5204
5205 dout(10) << "reset/close on session " << s->name << " " << s->addrs << dendl;
5206 if (!s->closed && s->item.is_on_list()) {
5207 std::lock_guard l(session_map_lock);
5208 remove_session(s);
5209 }
5210 return true;
5211 }
5212
5213 bool Monitor::ms_handle_refused(Connection *con)
5214 {
5215 // just log for now...
5216 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
5217 return false;
5218 }
5219
5220 // -----
5221
5222 void Monitor::send_latest_monmap(Connection *con)
5223 {
5224 bufferlist bl;
5225 monmap->encode(bl, con->get_features());
5226 con->send_message(new MMonMap(bl));
5227 }
5228
5229 void Monitor::handle_mon_get_map(MonOpRequestRef op)
5230 {
5231 auto m = op->get_req<MMonGetMap>();
5232 dout(10) << "handle_mon_get_map" << dendl;
5233 send_latest_monmap(m->get_connection().get());
5234 }
5235
5236 void Monitor::handle_mon_metadata(MonOpRequestRef op)
5237 {
5238 auto m = op->get_req<MMonMetadata>();
5239 if (is_leader()) {
5240 dout(10) << __func__ << dendl;
5241 update_mon_metadata(m->get_source().num(), std::move(m->data));
5242 }
5243 }
5244
5245 void Monitor::update_mon_metadata(int from, Metadata&& m)
5246 {
5247 // NOTE: this is now for legacy (kraken or jewel) mons only.
5248 pending_metadata[from] = std::move(m);
5249
5250 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5251 bufferlist bl;
5252 encode(pending_metadata, bl);
5253 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
5254 paxos->trigger_propose();
5255 }
5256
5257 int Monitor::load_metadata()
5258 {
5259 bufferlist bl;
5260 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
5261 if (r)
5262 return r;
5263 auto it = bl.cbegin();
5264 decode(mon_metadata, it);
5265
5266 pending_metadata = mon_metadata;
5267 return 0;
5268 }
5269
5270 int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
5271 {
5272 ceph_assert(f);
5273 if (!mon_metadata.count(mon)) {
5274 err << "mon." << mon << " not found";
5275 return -EINVAL;
5276 }
5277 const Metadata& m = mon_metadata[mon];
5278 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
5279 f->dump_string(p->first.c_str(), p->second);
5280 }
5281 return 0;
5282 }
5283
5284 void Monitor::count_metadata(const string& field, map<string,int> *out)
5285 {
5286 for (auto& p : mon_metadata) {
5287 auto q = p.second.find(field);
5288 if (q == p.second.end()) {
5289 (*out)["unknown"]++;
5290 } else {
5291 (*out)[q->second]++;
5292 }
5293 }
5294 }
5295
5296 void Monitor::count_metadata(const string& field, Formatter *f)
5297 {
5298 map<string,int> by_val;
5299 count_metadata(field, &by_val);
5300 f->open_object_section(field.c_str());
5301 for (auto& p : by_val) {
5302 f->dump_int(p.first.c_str(), p.second);
5303 }
5304 f->close_section();
5305 }
5306
5307 int Monitor::print_nodes(Formatter *f, ostream& err)
5308 {
5309 map<string, list<string> > mons; // hostname => mon
5310 for (map<int, Metadata>::iterator it = mon_metadata.begin();
5311 it != mon_metadata.end(); ++it) {
5312 const Metadata& m = it->second;
5313 Metadata::const_iterator hostname = m.find("hostname");
5314 if (hostname == m.end()) {
5315 // not likely though
5316 continue;
5317 }
5318 mons[hostname->second].push_back(monmap->get_name(it->first));
5319 }
5320
5321 dump_services(f, mons, "mon");
5322 return 0;
5323 }
5324
5325 // ----------------------------------------------
5326 // scrub
5327
5328 int Monitor::scrub_start()
5329 {
5330 dout(10) << __func__ << dendl;
5331 ceph_assert(is_leader());
5332
5333 if (!scrub_result.empty()) {
5334 clog->info() << "scrub already in progress";
5335 return -EBUSY;
5336 }
5337
5338 scrub_event_cancel();
5339 scrub_result.clear();
5340 scrub_state.reset(new ScrubState);
5341
5342 scrub();
5343 return 0;
5344 }
5345
5346 int Monitor::scrub()
5347 {
5348 ceph_assert(is_leader());
5349 ceph_assert(scrub_state);
5350
5351 scrub_cancel_timeout();
5352 wait_for_paxos_write();
5353 scrub_version = paxos->get_version();
5354
5355
5356 // scrub all keys if we're the only monitor in the quorum
5357 int32_t num_keys =
5358 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5359
5360 for (set<int>::iterator p = quorum.begin();
5361 p != quorum.end();
5362 ++p) {
5363 if (*p == rank)
5364 continue;
5365 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5366 num_keys);
5367 r->key = scrub_state->last_key;
5368 send_mon_message(r, *p);
5369 }
5370
5371 // scrub my keys
5372 bool r = _scrub(&scrub_result[rank],
5373 &scrub_state->last_key,
5374 &num_keys);
5375
5376 scrub_state->finished = !r;
5377
5378 // only after we got our scrub results do we really care whether the
5379 // other monitors are late on their results. Also, this way we avoid
5380 // triggering the timeout if we end up getting stuck in _scrub() for
5381 // longer than the duration of the timeout.
5382 scrub_reset_timeout();
5383
5384 if (quorum.size() == 1) {
5385 ceph_assert(scrub_state->finished == true);
5386 scrub_finish();
5387 }
5388 return 0;
5389 }
5390
5391 void Monitor::handle_scrub(MonOpRequestRef op)
5392 {
5393 auto m = op->get_req<MMonScrub>();
5394 dout(10) << __func__ << " " << *m << dendl;
5395 switch (m->op) {
5396 case MMonScrub::OP_SCRUB:
5397 {
5398 if (!is_peon())
5399 break;
5400
5401 wait_for_paxos_write();
5402
5403 if (m->version != paxos->get_version())
5404 break;
5405
5406 MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
5407 m->version,
5408 m->num_keys);
5409
5410 reply->key = m->key;
5411 _scrub(&reply->result, &reply->key, &reply->num_keys);
5412 m->get_connection()->send_message(reply);
5413 }
5414 break;
5415
5416 case MMonScrub::OP_RESULT:
5417 {
5418 if (!is_leader())
5419 break;
5420 if (m->version != scrub_version)
5421 break;
5422 // reset the timeout each time we get a result
5423 scrub_reset_timeout();
5424
5425 int from = m->get_source().num();
5426 ceph_assert(scrub_result.count(from) == 0);
5427 scrub_result[from] = m->result;
5428
5429 if (scrub_result.size() == quorum.size()) {
5430 scrub_check_results();
5431 scrub_result.clear();
5432 if (scrub_state->finished)
5433 scrub_finish();
5434 else
5435 scrub();
5436 }
5437 }
5438 break;
5439 }
5440 }
5441
5442 bool Monitor::_scrub(ScrubResult *r,
5443 pair<string,string> *start,
5444 int *num_keys)
5445 {
5446 ceph_assert(r != NULL);
5447 ceph_assert(start != NULL);
5448 ceph_assert(num_keys != NULL);
5449
5450 set<string> prefixes = get_sync_targets_names();
5451 prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5452
5453 dout(10) << __func__ << " start (" << *start << ")"
5454 << " num_keys " << *num_keys << dendl;
5455
5456 MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
5457
5458 int scrubbed_keys = 0;
5459 pair<string,string> last_key;
5460
5461 while (it->has_next_chunk()) {
5462
5463 if (*num_keys > 0 && scrubbed_keys == *num_keys)
5464 break;
5465
5466 pair<string,string> k = it->get_next_key();
5467 if (prefixes.count(k.first) == 0)
5468 continue;
5469
5470 if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
5471 (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
5472 dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
5473 << dendl;
5474 continue;
5475 }
5476
5477 bufferlist bl;
5478 int err = store->get(k.first, k.second, bl);
5479 ceph_assert(err == 0);
5480
5481 uint32_t key_crc = bl.crc32c(0);
5482 dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
5483 << " crc " << key_crc << dendl;
5484 r->prefix_keys[k.first]++;
5485 if (r->prefix_crc.count(k.first) == 0) {
5486 r->prefix_crc[k.first] = 0;
5487 }
5488 r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
5489
5490 if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
5491 (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
5492 dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
5493 r->prefix_crc[k.first] += 1;
5494 }
5495
5496 ++scrubbed_keys;
5497 last_key = k;
5498 }
5499
5500 dout(20) << __func__ << " last_key (" << last_key << ")"
5501 << " scrubbed_keys " << scrubbed_keys
5502 << " has_next " << it->has_next_chunk() << dendl;
5503
5504 *start = last_key;
5505 *num_keys = scrubbed_keys;
5506
5507 return it->has_next_chunk();
5508 }
5509
5510 void Monitor::scrub_check_results()
5511 {
5512 dout(10) << __func__ << dendl;
5513
5514 // compare
5515 int errors = 0;
5516 ScrubResult& mine = scrub_result[rank];
5517 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5518 p != scrub_result.end();
5519 ++p) {
5520 if (p->first == rank)
5521 continue;
5522 if (p->second != mine) {
5523 ++errors;
5524 clog->error() << "scrub mismatch";
5525 clog->error() << " mon." << rank << " " << mine;
5526 clog->error() << " mon." << p->first << " " << p->second;
5527 }
5528 }
5529 if (!errors)
5530 clog->debug() << "scrub ok on " << quorum << ": " << mine;
5531 }
5532
5533 inline void Monitor::scrub_timeout()
5534 {
5535 dout(1) << __func__ << " restarting scrub" << dendl;
5536 scrub_reset();
5537 scrub_start();
5538 }
5539
5540 void Monitor::scrub_finish()
5541 {
5542 dout(10) << __func__ << dendl;
5543 scrub_reset();
5544 scrub_event_start();
5545 }
5546
5547 void Monitor::scrub_reset()
5548 {
5549 dout(10) << __func__ << dendl;
5550 scrub_cancel_timeout();
5551 scrub_version = 0;
5552 scrub_result.clear();
5553 scrub_state.reset();
5554 }
5555
5556 inline void Monitor::scrub_update_interval(int secs)
5557 {
5558 // we don't care about changes if we are not the leader.
5559 // changes will be visible if we become the leader.
5560 if (!is_leader())
5561 return;
5562
5563 dout(1) << __func__ << " new interval = " << secs << dendl;
5564
5565 // if scrub already in progress, all changes will already be visible during
5566 // the next round. Nothing to do.
5567 if (scrub_state != NULL)
5568 return;
5569
5570 scrub_event_cancel();
5571 scrub_event_start();
5572 }
5573
5574 void Monitor::scrub_event_start()
5575 {
5576 dout(10) << __func__ << dendl;
5577
5578 if (scrub_event)
5579 scrub_event_cancel();
5580
5581 if (cct->_conf->mon_scrub_interval <= 0) {
5582 dout(1) << __func__ << " scrub event is disabled"
5583 << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval
5584 << ")" << dendl;
5585 return;
5586 }
5587
5588 scrub_event = timer.add_event_after(
5589 cct->_conf->mon_scrub_interval,
5590 new C_MonContext{this, [this](int) {
5591 scrub_start();
5592 }});
5593 }
5594
5595 void Monitor::scrub_event_cancel()
5596 {
5597 dout(10) << __func__ << dendl;
5598 if (scrub_event) {
5599 timer.cancel_event(scrub_event);
5600 scrub_event = NULL;
5601 }
5602 }
5603
5604 inline void Monitor::scrub_cancel_timeout()
5605 {
5606 if (scrub_timeout_event) {
5607 timer.cancel_event(scrub_timeout_event);
5608 scrub_timeout_event = NULL;
5609 }
5610 }
5611
5612 void Monitor::scrub_reset_timeout()
5613 {
5614 dout(15) << __func__ << " reset timeout event" << dendl;
5615 scrub_cancel_timeout();
5616 scrub_timeout_event = timer.add_event_after(
5617 g_conf()->mon_scrub_timeout,
5618 new C_MonContext{this, [this](int) {
5619 scrub_timeout();
5620 }});
5621 }
5622
5623 /************ TICK ***************/
5624 void Monitor::new_tick()
5625 {
5626 timer.add_event_after(g_conf()->mon_tick_interval, new C_MonContext{this, [this](int) {
5627 tick();
5628 }});
5629 }
5630
5631 void Monitor::tick()
5632 {
5633 // ok go.
5634 dout(11) << "tick" << dendl;
5635 const utime_t now = ceph_clock_now();
5636
5637 // Check if we need to emit any delayed health check updated messages
5638 if (is_leader()) {
5639 const auto min_period = g_conf().get_val<int64_t>(
5640 "mon_health_log_update_period");
5641 for (auto& svc : paxos_service) {
5642 auto health = svc->get_health_checks();
5643
5644 for (const auto &i : health.checks) {
5645 const std::string &code = i.first;
5646 const std::string &summary = i.second.summary;
5647 const health_status_t severity = i.second.severity;
5648
5649 auto status_iter = health_check_log_times.find(code);
5650 if (status_iter == health_check_log_times.end()) {
5651 continue;
5652 }
5653
5654 auto &log_status = status_iter->second;
5655 bool const changed = log_status.last_message != summary
5656 || log_status.severity != severity;
5657
5658 if (changed && now - log_status.updated_at > min_period) {
5659 log_status.last_message = summary;
5660 log_status.updated_at = now;
5661 log_status.severity = severity;
5662
5663 ostringstream ss;
5664 ss << "Health check update: " << summary << " (" << code << ")";
5665 clog->health(severity) << ss.str();
5666 }
5667 }
5668 }
5669 }
5670
5671
5672 for (auto& svc : paxos_service) {
5673 svc->tick();
5674 svc->maybe_trim();
5675 }
5676
5677 // trim sessions
5678 {
5679 std::lock_guard l(session_map_lock);
5680 auto p = session_map.sessions.begin();
5681
5682 bool out_for_too_long = (!exited_quorum.is_zero() &&
5683 now > (exited_quorum + 2*g_conf()->mon_lease));
5684
5685 while (!p.end()) {
5686 MonSession *s = *p;
5687 ++p;
5688
5689 // don't trim monitors
5690 if (s->name.is_mon())
5691 continue;
5692
5693 if (s->session_timeout < now && s->con) {
5694 // check keepalive, too
5695 s->session_timeout = s->con->get_last_keepalive();
5696 s->session_timeout += g_conf()->mon_session_timeout;
5697 }
5698 if (s->session_timeout < now) {
5699 dout(10) << " trimming session " << s->con << " " << s->name
5700 << " " << s->addrs
5701 << " (timeout " << s->session_timeout
5702 << " < now " << now << ")" << dendl;
5703 } else if (out_for_too_long) {
5704 // boot the client Session because we've taken too long getting back in
5705 dout(10) << " trimming session " << s->con << " " << s->name
5706 << " because we've been out of quorum too long" << dendl;
5707 } else {
5708 continue;
5709 }
5710
5711 s->con->mark_down();
5712 remove_session(s);
5713 logger->inc(l_mon_session_trim);
5714 }
5715 }
5716 sync_trim_providers();
5717
5718 if (!maybe_wait_for_quorum.empty()) {
5719 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5720 }
5721
5722 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5723 // this is only necessary on upgraded clusters.
5724 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5725 prepare_new_fingerprint(t);
5726 paxos->trigger_propose();
5727 }
5728
5729 mgr_client.update_daemon_health(get_health_metrics());
5730 new_tick();
5731 }
5732
5733 vector<DaemonHealthMetric> Monitor::get_health_metrics()
5734 {
5735 vector<DaemonHealthMetric> metrics;
5736
5737 utime_t oldest_secs;
5738 const utime_t now = ceph_clock_now();
5739 auto too_old = now;
5740 too_old -= g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count();
5741 int slow = 0;
5742 TrackedOpRef oldest_op;
5743 auto count_slow_ops = [&](TrackedOp& op) {
5744 if (op.get_initiated() < too_old) {
5745 slow++;
5746 if (!oldest_op || op.get_initiated() < oldest_op->get_initiated()) {
5747 oldest_op = &op;
5748 }
5749 return true;
5750 } else {
5751 return false;
5752 }
5753 };
5754 if (op_tracker.visit_ops_in_flight(&oldest_secs, count_slow_ops)) {
5755 if (slow) {
5756 derr << __func__ << " reporting " << slow << " slow ops, oldest is "
5757 << oldest_op->get_desc() << dendl;
5758 }
5759 metrics.emplace_back(daemon_metric::SLOW_OPS, slow, oldest_secs);
5760 } else {
5761 metrics.emplace_back(daemon_metric::SLOW_OPS, 0, 0);
5762 }
5763 return metrics;
5764 }
5765
5766 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5767 {
5768 uuid_d nf;
5769 nf.generate_random();
5770 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5771
5772 bufferlist bl;
5773 encode(nf, bl);
5774 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5775 }
5776
5777 int Monitor::check_fsid()
5778 {
5779 bufferlist ebl;
5780 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5781 if (r == -ENOENT)
5782 return r;
5783 ceph_assert(r == 0);
5784
5785 string es(ebl.c_str(), ebl.length());
5786
5787 // only keep the first line
5788 size_t pos = es.find_first_of('\n');
5789 if (pos != string::npos)
5790 es.resize(pos);
5791
5792 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5793 uuid_d ondisk;
5794 if (!ondisk.parse(es.c_str())) {
5795 derr << "error: unable to parse uuid" << dendl;
5796 return -EINVAL;
5797 }
5798
5799 if (monmap->get_fsid() != ondisk) {
5800 derr << "error: cluster_uuid file exists with value " << ondisk
5801 << ", != our uuid " << monmap->get_fsid() << dendl;
5802 return -EEXIST;
5803 }
5804
5805 return 0;
5806 }
5807
5808 int Monitor::write_fsid()
5809 {
5810 auto t(std::make_shared<MonitorDBStore::Transaction>());
5811 write_fsid(t);
5812 int r = store->apply_transaction(t);
5813 return r;
5814 }
5815
5816 int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5817 {
5818 ostringstream ss;
5819 ss << monmap->get_fsid() << "\n";
5820 string us = ss.str();
5821
5822 bufferlist b;
5823 b.append(us);
5824
5825 t->put(MONITOR_NAME, "cluster_uuid", b);
5826 return 0;
5827 }
5828
5829 /*
5830 * this is the closest thing to a traditional 'mkfs' for ceph.
5831 * initialize the monitor state machines to their initial values.
5832 */
5833 int Monitor::mkfs(bufferlist& osdmapbl)
5834 {
5835 auto t(std::make_shared<MonitorDBStore::Transaction>());
5836
5837 // verify cluster fsid
5838 int r = check_fsid();
5839 if (r < 0 && r != -ENOENT)
5840 return r;
5841
5842 bufferlist magicbl;
5843 magicbl.append(CEPH_MON_ONDISK_MAGIC);
5844 magicbl.append("\n");
5845 t->put(MONITOR_NAME, "magic", magicbl);
5846
5847
5848 features = get_initial_supported_features();
5849 write_features(t);
5850
5851 // save monmap, osdmap, keyring.
5852 bufferlist monmapbl;
5853 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
5854 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5855 t->put("mkfs", "monmap", monmapbl);
5856
5857 if (osdmapbl.length()) {
5858 // make sure it's a valid osdmap
5859 try {
5860 OSDMap om;
5861 om.decode(osdmapbl);
5862 }
5863 catch (buffer::error& e) {
5864 derr << "error decoding provided osdmap: " << e.what() << dendl;
5865 return -EINVAL;
5866 }
5867 t->put("mkfs", "osdmap", osdmapbl);
5868 }
5869
5870 if (is_keyring_required()) {
5871 KeyRing keyring;
5872 string keyring_filename;
5873
5874 r = ceph_resolve_file_search(g_conf()->keyring, keyring_filename);
5875 if (r) {
5876 derr << "unable to find a keyring file on " << g_conf()->keyring
5877 << ": " << cpp_strerror(r) << dendl;
5878 if (g_conf()->key != "") {
5879 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf()->key +
5880 "\n\tcaps mon = \"allow *\"\n";
5881 bufferlist bl;
5882 bl.append(keyring_plaintext);
5883 try {
5884 auto i = bl.cbegin();
5885 keyring.decode_plaintext(i);
5886 }
5887 catch (const buffer::error& e) {
5888 derr << "error decoding keyring " << keyring_plaintext
5889 << ": " << e.what() << dendl;
5890 return -EINVAL;
5891 }
5892 } else {
5893 return -ENOENT;
5894 }
5895 } else {
5896 r = keyring.load(g_ceph_context, keyring_filename);
5897 if (r < 0) {
5898 derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
5899 return r;
5900 }
5901 }
5902
5903 // put mon. key in external keyring; seed with everything else.
5904 extract_save_mon_key(keyring);
5905
5906 bufferlist keyringbl;
5907 keyring.encode_plaintext(keyringbl);
5908 t->put("mkfs", "keyring", keyringbl);
5909 }
5910 write_fsid(t);
5911 store->apply_transaction(t);
5912
5913 return 0;
5914 }
5915
5916 int Monitor::write_default_keyring(bufferlist& bl)
5917 {
5918 ostringstream os;
5919 os << g_conf()->mon_data << "/keyring";
5920
5921 int err = 0;
5922 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT|O_CLOEXEC, 0600);
5923 if (fd < 0) {
5924 err = -errno;
5925 dout(0) << __func__ << " failed to open " << os.str()
5926 << ": " << cpp_strerror(err) << dendl;
5927 return err;
5928 }
5929
5930 err = bl.write_fd(fd);
5931 if (!err)
5932 ::fsync(fd);
5933 VOID_TEMP_FAILURE_RETRY(::close(fd));
5934
5935 return err;
5936 }
5937
5938 void Monitor::extract_save_mon_key(KeyRing& keyring)
5939 {
5940 EntityName mon_name;
5941 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
5942 EntityAuth mon_key;
5943 if (keyring.get_auth(mon_name, mon_key)) {
5944 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
5945 KeyRing pkey;
5946 pkey.add(mon_name, mon_key);
5947 bufferlist bl;
5948 pkey.encode_plaintext(bl);
5949 write_default_keyring(bl);
5950 keyring.remove(mon_name);
5951 }
5952 }
5953
5954 // AuthClient methods -- for mon <-> mon communication
5955 int Monitor::get_auth_request(
5956 Connection *con,
5957 AuthConnectionMeta *auth_meta,
5958 uint32_t *method,
5959 vector<uint32_t> *preferred_modes,
5960 bufferlist *out)
5961 {
5962 std::scoped_lock l(auth_lock);
5963 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
5964 con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
5965 return -EACCES;
5966 }
5967 AuthAuthorizer *auth;
5968 if (!get_authorizer(con->get_peer_type(), &auth)) {
5969 return -EACCES;
5970 }
5971 auth_meta->authorizer.reset(auth);
5972 auth_registry.get_supported_modes(con->get_peer_type(),
5973 auth->protocol,
5974 preferred_modes);
5975 *method = auth->protocol;
5976 *out = auth->bl;
5977 return 0;
5978 }
5979
5980 int Monitor::handle_auth_reply_more(
5981 Connection *con,
5982 AuthConnectionMeta *auth_meta,
5983 const bufferlist& bl,
5984 bufferlist *reply)
5985 {
5986 std::scoped_lock l(auth_lock);
5987 if (!auth_meta->authorizer) {
5988 derr << __func__ << " no authorizer?" << dendl;
5989 return -EACCES;
5990 }
5991 auth_meta->authorizer->add_challenge(cct, bl);
5992 *reply = auth_meta->authorizer->bl;
5993 return 0;
5994 }
5995
5996 int Monitor::handle_auth_done(
5997 Connection *con,
5998 AuthConnectionMeta *auth_meta,
5999 uint64_t global_id,
6000 uint32_t con_mode,
6001 const bufferlist& bl,
6002 CryptoKey *session_key,
6003 std::string *connection_secret)
6004 {
6005 std::scoped_lock l(auth_lock);
6006 // verify authorizer reply
6007 auto p = bl.begin();
6008 if (!auth_meta->authorizer->verify_reply(p, connection_secret)) {
6009 dout(0) << __func__ << " failed verifying authorizer reply" << dendl;
6010 return -EACCES;
6011 }
6012 auth_meta->session_key = auth_meta->authorizer->session_key;
6013 return 0;
6014 }
6015
6016 int Monitor::handle_auth_bad_method(
6017 Connection *con,
6018 AuthConnectionMeta *auth_meta,
6019 uint32_t old_auth_method,
6020 int result,
6021 const std::vector<uint32_t>& allowed_methods,
6022 const std::vector<uint32_t>& allowed_modes)
6023 {
6024 derr << __func__ << " hmm, they didn't like " << old_auth_method
6025 << " result " << cpp_strerror(result) << dendl;
6026 return -EACCES;
6027 }
6028
6029 bool Monitor::get_authorizer(int service_id, AuthAuthorizer **authorizer)
6030 {
6031 dout(10) << "get_authorizer for " << ceph_entity_type_name(service_id)
6032 << dendl;
6033
6034 if (is_shutdown())
6035 return false;
6036
6037 // we only connect to other monitors and mgr; every else connects to us.
6038 if (service_id != CEPH_ENTITY_TYPE_MON &&
6039 service_id != CEPH_ENTITY_TYPE_MGR)
6040 return false;
6041
6042 if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
6043 // auth_none
6044 dout(20) << __func__ << " building auth_none authorizer" << dendl;
6045 AuthNoneClientHandler handler{g_ceph_context};
6046 handler.set_global_id(0);
6047 *authorizer = handler.build_authorizer(service_id);
6048 return true;
6049 }
6050
6051 CephXServiceTicketInfo auth_ticket_info;
6052 CephXSessionAuthInfo info;
6053 int ret;
6054
6055 EntityName name;
6056 name.set_type(CEPH_ENTITY_TYPE_MON);
6057 auth_ticket_info.ticket.name = name;
6058 auth_ticket_info.ticket.global_id = 0;
6059
6060 if (service_id == CEPH_ENTITY_TYPE_MON) {
6061 // mon to mon authentication uses the private monitor shared key and not the
6062 // rotating key
6063 CryptoKey secret;
6064 if (!keyring.get_secret(name, secret) &&
6065 !key_server.get_secret(name, secret)) {
6066 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
6067 << dendl;
6068 stringstream ss, ds;
6069 int err = key_server.list_secrets(ds);
6070 if (err < 0)
6071 ss << "no installed auth entries!";
6072 else
6073 ss << "installed auth entries:";
6074 dout(0) << ss.str() << "\n" << ds.str() << dendl;
6075 return false;
6076 }
6077
6078 ret = key_server.build_session_auth_info(
6079 service_id, auth_ticket_info.ticket, info, secret, (uint64_t)-1);
6080 if (ret < 0) {
6081 dout(0) << __func__ << " failed to build mon session_auth_info "
6082 << cpp_strerror(ret) << dendl;
6083 return false;
6084 }
6085 } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
6086 // mgr
6087 ret = key_server.build_session_auth_info(
6088 service_id, auth_ticket_info.ticket, info);
6089 if (ret < 0) {
6090 derr << __func__ << " failed to build mgr service session_auth_info "
6091 << cpp_strerror(ret) << dendl;
6092 return false;
6093 }
6094 } else {
6095 ceph_abort(); // see check at top of fn
6096 }
6097
6098 CephXTicketBlob blob;
6099 if (!cephx_build_service_ticket_blob(cct, info, blob)) {
6100 dout(0) << "get_authorizer failed to build service ticket" << dendl;
6101 return false;
6102 }
6103 bufferlist ticket_data;
6104 encode(blob, ticket_data);
6105
6106 auto iter = ticket_data.cbegin();
6107 CephXTicketHandler handler(g_ceph_context, service_id);
6108 decode(handler.ticket, iter);
6109
6110 handler.session_key = info.session_key;
6111
6112 *authorizer = handler.build_authorizer(0);
6113
6114 return true;
6115 }
6116
6117 int Monitor::handle_auth_request(
6118 Connection *con,
6119 AuthConnectionMeta *auth_meta,
6120 bool more,
6121 uint32_t auth_method,
6122 const bufferlist &payload,
6123 bufferlist *reply)
6124 {
6125 std::scoped_lock l(auth_lock);
6126
6127 // NOTE: be careful, the Connection hasn't fully negotiated yet, so
6128 // e.g., peer_features, peer_addrs, and others are still unknown.
6129
6130 dout(10) << __func__ << " con " << con << (more ? " (more)":" (start)")
6131 << " method " << auth_method
6132 << " payload " << payload.length()
6133 << dendl;
6134 if (!payload.length()) {
6135 if (!con->is_msgr2() &&
6136 con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
6137 // for v1 connections, we tolerate no authorizer (from
6138 // non-monitors), because authentication happens via MAuth
6139 // messages.
6140 return 1;
6141 }
6142 return -EACCES;
6143 }
6144 if (!more) {
6145 auth_meta->auth_mode = payload[0];
6146 }
6147
6148 if (auth_meta->auth_mode >= AUTH_MODE_AUTHORIZER &&
6149 auth_meta->auth_mode <= AUTH_MODE_AUTHORIZER_MAX) {
6150 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
6151 auth_method);
6152 if (!ah) {
6153 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
6154 << auth_method << dendl;
6155 return -EOPNOTSUPP;
6156 }
6157 bool was_challenge = (bool)auth_meta->authorizer_challenge;
6158 bool isvalid = ah->verify_authorizer(
6159 cct,
6160 keyring,
6161 payload,
6162 auth_meta->get_connection_secret_length(),
6163 reply,
6164 &con->peer_name,
6165 &con->peer_global_id,
6166 &con->peer_caps_info,
6167 &auth_meta->session_key,
6168 &auth_meta->connection_secret,
6169 &auth_meta->authorizer_challenge);
6170 if (isvalid) {
6171 ms_handle_authentication(con);
6172 return 1;
6173 }
6174 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
6175 return 0;
6176 }
6177 dout(10) << __func__ << " bad authorizer on " << con << dendl;
6178 return -EACCES;
6179 } else if (auth_meta->auth_mode < AUTH_MODE_MON ||
6180 auth_meta->auth_mode > AUTH_MODE_MON_MAX) {
6181 derr << __func__ << " unrecognized auth mode " << auth_meta->auth_mode
6182 << dendl;
6183 return -EACCES;
6184 }
6185
6186 // wait until we've formed an initial quorum on mkfs so that we have
6187 // the initial keys (e.g., client.admin).
6188 if (authmon()->get_last_committed() == 0) {
6189 dout(10) << __func__ << " haven't formed initial quorum, EBUSY" << dendl;
6190 return -EBUSY;
6191 }
6192
6193 RefCountedPtr priv;
6194 MonSession *s;
6195 int32_t r = 0;
6196 auto p = payload.begin();
6197 if (!more) {
6198 if (con->get_priv()) {
6199 return -EACCES; // wtf
6200 }
6201
6202 // handler?
6203 unique_ptr<AuthServiceHandler> auth_handler{get_auth_service_handler(
6204 auth_method, g_ceph_context, &key_server)};
6205 if (!auth_handler) {
6206 dout(1) << __func__ << " auth_method " << auth_method << " not supported"
6207 << dendl;
6208 return -EOPNOTSUPP;
6209 }
6210
6211 uint8_t mode;
6212 EntityName entity_name;
6213
6214 try {
6215 decode(mode, p);
6216 if (mode < AUTH_MODE_MON ||
6217 mode > AUTH_MODE_MON_MAX) {
6218 dout(1) << __func__ << " invalid mode " << (int)mode << dendl;
6219 return -EACCES;
6220 }
6221 assert(mode >= AUTH_MODE_MON && mode <= AUTH_MODE_MON_MAX);
6222 decode(entity_name, p);
6223 decode(con->peer_global_id, p);
6224 } catch (buffer::error& e) {
6225 dout(1) << __func__ << " failed to decode, " << e.what() << dendl;
6226 return -EACCES;
6227 }
6228
6229 // supported method?
6230 if (entity_name.get_type() == CEPH_ENTITY_TYPE_MON ||
6231 entity_name.get_type() == CEPH_ENTITY_TYPE_OSD ||
6232 entity_name.get_type() == CEPH_ENTITY_TYPE_MDS ||
6233 entity_name.get_type() == CEPH_ENTITY_TYPE_MGR) {
6234 if (!auth_cluster_required.is_supported_auth(auth_method)) {
6235 dout(10) << __func__ << " entity " << entity_name << " method "
6236 << auth_method << " not among supported "
6237 << auth_cluster_required.get_supported_set() << dendl;
6238 return -EOPNOTSUPP;
6239 }
6240 } else {
6241 if (!auth_service_required.is_supported_auth(auth_method)) {
6242 dout(10) << __func__ << " entity " << entity_name << " method "
6243 << auth_method << " not among supported "
6244 << auth_cluster_required.get_supported_set() << dendl;
6245 return -EOPNOTSUPP;
6246 }
6247 }
6248
6249 // for msgr1 we would do some weirdness here to ensure signatures
6250 // are supported by the client if we require it. for msgr2 that
6251 // is not necessary.
6252
6253 if (!con->peer_global_id) {
6254 con->peer_global_id = authmon()->_assign_global_id();
6255 if (!con->peer_global_id) {
6256 dout(1) << __func__ << " failed to assign global_id" << dendl;
6257 return -EBUSY;
6258 }
6259 dout(10) << __func__ << " assigned global_id " << con->peer_global_id
6260 << dendl;
6261 }
6262
6263 // set up partial session
6264 s = new MonSession(con);
6265 s->auth_handler = auth_handler.release();
6266 con->set_priv(RefCountedPtr{s, false});
6267
6268 r = s->auth_handler->start_session(
6269 entity_name,
6270 auth_meta->get_connection_secret_length(),
6271 reply,
6272 &con->peer_caps_info,
6273 &auth_meta->session_key,
6274 &auth_meta->connection_secret);
6275 } else {
6276 priv = con->get_priv();
6277 if (!priv) {
6278 // this can happen if the async ms_handle_reset event races with
6279 // the unlocked call into handle_auth_request
6280 return -EACCES;
6281 }
6282 s = static_cast<MonSession*>(priv.get());
6283 r = s->auth_handler->handle_request(
6284 p,
6285 auth_meta->get_connection_secret_length(),
6286 reply,
6287 &con->peer_global_id,
6288 &con->peer_caps_info,
6289 &auth_meta->session_key,
6290 &auth_meta->connection_secret);
6291 }
6292 if (r > 0 &&
6293 !s->authenticated) {
6294 ms_handle_authentication(con);
6295 }
6296
6297 dout(30) << " r " << r << " reply:\n";
6298 reply->hexdump(*_dout);
6299 *_dout << dendl;
6300 return r;
6301 }
6302
6303 void Monitor::ms_handle_accept(Connection *con)
6304 {
6305 auto priv = con->get_priv();
6306 MonSession *s = static_cast<MonSession*>(priv.get());
6307 if (!s) {
6308 // legacy protocol v1?
6309 dout(10) << __func__ << " con " << con << " no session" << dendl;
6310 return;
6311 }
6312
6313 if (s->item.is_on_list()) {
6314 dout(10) << __func__ << " con " << con << " session " << s
6315 << " already on list" << dendl;
6316 } else {
6317 dout(10) << __func__ << " con " << con << " session " << s
6318 << " registering session for "
6319 << con->get_peer_addrs() << dendl;
6320 s->_ident(entity_name_t(con->get_peer_type(), con->get_peer_id()),
6321 con->get_peer_addrs());
6322 std::lock_guard l(session_map_lock);
6323 session_map.add_session(s);
6324 }
6325 }
6326
6327 int Monitor::ms_handle_authentication(Connection *con)
6328 {
6329 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
6330 // mon <-> mon connections need no Session, and setting one up
6331 // creates an awkward ref cycle between Session and Connection.
6332 return 1;
6333 }
6334
6335 auto priv = con->get_priv();
6336 MonSession *s = static_cast<MonSession*>(priv.get());
6337 if (!s) {
6338 // must be msgr2, otherwise dispatch would have set up the session.
6339 s = session_map.new_session(
6340 entity_name_t(con->get_peer_type(), -1), // we don't know yet
6341 con->get_peer_addrs(),
6342 con);
6343 assert(s);
6344 dout(10) << __func__ << " adding session " << s << " to con " << con
6345 << dendl;
6346 con->set_priv(s);
6347 logger->set(l_mon_num_sessions, session_map.get_size());
6348 logger->inc(l_mon_session_add);
6349 }
6350 dout(10) << __func__ << " session " << s << " con " << con
6351 << " addr " << s->con->get_peer_addr()
6352 << " " << *s << dendl;
6353
6354 AuthCapsInfo &caps_info = con->get_peer_caps_info();
6355 int ret = 0;
6356 if (caps_info.allow_all) {
6357 s->caps.set_allow_all();
6358 s->authenticated = true;
6359 ret = 1;
6360 } else if (caps_info.caps.length()) {
6361 bufferlist::const_iterator p = caps_info.caps.cbegin();
6362 string str;
6363 try {
6364 decode(str, p);
6365 } catch (const buffer::error &err) {
6366 derr << __func__ << " corrupt cap data for " << con->get_peer_entity_name()
6367 << " in auth db" << dendl;
6368 str.clear();
6369 ret = -EACCES;
6370 }
6371 if (ret >= 0) {
6372 if (s->caps.parse(str, NULL)) {
6373 s->authenticated = true;
6374 ret = 1;
6375 } else {
6376 derr << __func__ << " unparseable caps '" << str << "' for "
6377 << con->get_peer_entity_name() << dendl;
6378 ret = -EACCES;
6379 }
6380 }
6381 }
6382
6383 return ret;
6384 }