]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/Monitor.cc
f756c1f07e4477e6e662a8866d0dc4d068045acb
[ceph.git] / ceph / src / mon / Monitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include <iterator>
17 #include <sstream>
18 #include <tuple>
19 #include <stdlib.h>
20 #include <signal.h>
21 #include <limits.h>
22 #include <cstring>
23 #include <boost/scope_exit.hpp>
24 #include <boost/algorithm/string/predicate.hpp>
25
26 #include "json_spirit/json_spirit_reader.h"
27 #include "json_spirit/json_spirit_writer.h"
28
29 #include "Monitor.h"
30 #include "common/version.h"
31 #include "common/blkdev.h"
32 #include "common/cmdparse.h"
33 #include "common/signal.h"
34
35 #include "osd/OSDMap.h"
36
37 #include "MonitorDBStore.h"
38
39 #include "messages/PaxosServiceMessage.h"
40 #include "messages/MMonMap.h"
41 #include "messages/MMonGetMap.h"
42 #include "messages/MMonGetVersion.h"
43 #include "messages/MMonGetVersionReply.h"
44 #include "messages/MGenericMessage.h"
45 #include "messages/MMonCommand.h"
46 #include "messages/MMonCommandAck.h"
47 #include "messages/MMonSync.h"
48 #include "messages/MMonScrub.h"
49 #include "messages/MMonProbe.h"
50 #include "messages/MMonJoin.h"
51 #include "messages/MMonPaxos.h"
52 #include "messages/MRoute.h"
53 #include "messages/MForward.h"
54
55 #include "messages/MMonSubscribe.h"
56 #include "messages/MMonSubscribeAck.h"
57
58 #include "messages/MCommand.h"
59 #include "messages/MCommandReply.h"
60
61 #include "messages/MTimeCheck2.h"
62 #include "messages/MPing.h"
63
64 #include "common/strtol.h"
65 #include "common/ceph_argparse.h"
66 #include "common/Timer.h"
67 #include "common/Clock.h"
68 #include "common/errno.h"
69 #include "common/perf_counters.h"
70 #include "common/admin_socket.h"
71 #include "global/signal_handler.h"
72 #include "common/Formatter.h"
73 #include "include/stringify.h"
74 #include "include/color.h"
75 #include "include/ceph_fs.h"
76 #include "include/str_list.h"
77
78 #include "OSDMonitor.h"
79 #include "MDSMonitor.h"
80 #include "MonmapMonitor.h"
81 #include "LogMonitor.h"
82 #include "AuthMonitor.h"
83 #include "MgrMonitor.h"
84 #include "MgrStatMonitor.h"
85 #include "ConfigMonitor.h"
86 #include "KVMonitor.h"
87 #include "mon/HealthMonitor.h"
88 #include "common/config.h"
89 #include "common/cmdparse.h"
90 #include "include/ceph_assert.h"
91 #include "include/compat.h"
92 #include "perfglue/heap_profiler.h"
93
94 #include "auth/none/AuthNoneClientHandler.h"
95
96 #define dout_subsys ceph_subsys_mon
97 #undef dout_prefix
98 #define dout_prefix _prefix(_dout, this)
99 using namespace TOPNSPC::common;
100
101 using std::cout;
102 using std::dec;
103 using std::hex;
104 using std::list;
105 using std::map;
106 using std::make_pair;
107 using std::ostream;
108 using std::ostringstream;
109 using std::pair;
110 using std::set;
111 using std::setfill;
112 using std::string;
113 using std::stringstream;
114 using std::to_string;
115 using std::vector;
116 using std::unique_ptr;
117
118 using ceph::bufferlist;
119 using ceph::decode;
120 using ceph::encode;
121 using ceph::ErasureCodeInterfaceRef;
122 using ceph::ErasureCodeProfile;
123 using ceph::Formatter;
124 using ceph::JSONFormatter;
125 using ceph::make_message;
126 using ceph::mono_clock;
127 using ceph::mono_time;
128 using ceph::timespan_str;
129
130
131 static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
132 return *_dout << "mon." << mon->name << "@" << mon->rank
133 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
134 }
135
136 const string Monitor::MONITOR_NAME = "monitor";
137 const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
138
139
140 #undef FLAG
141 #undef COMMAND
142 #undef COMMAND_WITH_FLAG
143 #define FLAG(f) (MonCommand::FLAG_##f)
144 #define COMMAND(parsesig, helptext, modulename, req_perms) \
145 {parsesig, helptext, modulename, req_perms, FLAG(NONE)},
146 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, flags) \
147 {parsesig, helptext, modulename, req_perms, flags},
148 MonCommand mon_commands[] = {
149 #include <mon/MonCommands.h>
150 };
151 #undef COMMAND
152 #undef COMMAND_WITH_FLAG
153
154 Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
155 Messenger *m, Messenger *mgr_m, MonMap *map) :
156 Dispatcher(cct_),
157 AuthServer(cct_),
158 name(nm),
159 rank(-1),
160 messenger(m),
161 con_self(m ? m->get_loopback_connection() : NULL),
162 timer(cct_, lock),
163 finisher(cct_, "mon_finisher", "fin"),
164 cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads),
165 has_ever_joined(false),
166 logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
167 monmap(map),
168 log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
169 key_server(cct, &keyring),
170 auth_cluster_required(cct,
171 cct->_conf->auth_supported.empty() ?
172 cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
173 auth_service_required(cct,
174 cct->_conf->auth_supported.empty() ?
175 cct->_conf->auth_service_required : cct->_conf->auth_supported),
176 mgr_messenger(mgr_m),
177 mgr_client(cct_, mgr_m, monmap),
178 gss_ktfile_client(cct->_conf.get_val<std::string>("gss_ktab_client_file")),
179 store(s),
180
181 elector(this, map->strategy),
182 required_features(0),
183 leader(0),
184 quorum_con_features(0),
185 // scrub
186 scrub_version(0),
187 scrub_event(NULL),
188 scrub_timeout_event(NULL),
189
190 // sync state
191 sync_provider_count(0),
192 sync_cookie(0),
193 sync_full(false),
194 sync_start_version(0),
195 sync_timeout_event(NULL),
196 sync_last_committed_floor(0),
197
198 timecheck_round(0),
199 timecheck_acks(0),
200 timecheck_rounds_since_clean(0),
201 timecheck_event(NULL),
202
203 admin_hook(NULL),
204 routed_request_tid(0),
205 op_tracker(cct, g_conf().get_val<bool>("mon_enable_op_tracker"), 1)
206 {
207 clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
208 audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);
209
210 update_log_clients();
211
212 if (!gss_ktfile_client.empty()) {
213 // Assert we can export environment variable
214 /*
215 The default client keytab is used, if it is present and readable,
216 to automatically obtain initial credentials for GSSAPI client
217 applications. The principal name of the first entry in the client
218 keytab is used by default when obtaining initial credentials.
219 1. The KRB5_CLIENT_KTNAME environment variable.
220 2. The default_client_keytab_name profile variable in [libdefaults].
221 3. The hardcoded default, DEFCKTNAME.
222 */
223 const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
224 gss_ktfile_client.c_str(), 1));
225 ceph_assert(set_result == 0);
226 }
227
228 op_tracker.set_complaint_and_threshold(
229 g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count(),
230 g_conf().get_val<int64_t>("mon_op_log_threshold"));
231 op_tracker.set_history_size_and_duration(
232 g_conf().get_val<uint64_t>("mon_op_history_size"),
233 g_conf().get_val<std::chrono::seconds>("mon_op_history_duration").count());
234 op_tracker.set_history_slow_op_size_and_threshold(
235 g_conf().get_val<uint64_t>("mon_op_history_slow_op_size"),
236 g_conf().get_val<std::chrono::seconds>("mon_op_history_slow_op_threshold").count());
237
238 paxos = std::make_unique<Paxos>(*this, "paxos");
239
240 paxos_service[PAXOS_MDSMAP].reset(new MDSMonitor(*this, *paxos, "mdsmap"));
241 paxos_service[PAXOS_MONMAP].reset(new MonmapMonitor(*this, *paxos, "monmap"));
242 paxos_service[PAXOS_OSDMAP].reset(new OSDMonitor(cct, *this, *paxos, "osdmap"));
243 paxos_service[PAXOS_LOG].reset(new LogMonitor(*this, *paxos, "logm"));
244 paxos_service[PAXOS_AUTH].reset(new AuthMonitor(*this, *paxos, "auth"));
245 paxos_service[PAXOS_MGR].reset(new MgrMonitor(*this, *paxos, "mgr"));
246 paxos_service[PAXOS_MGRSTAT].reset(new MgrStatMonitor(*this, *paxos, "mgrstat"));
247 paxos_service[PAXOS_HEALTH].reset(new HealthMonitor(*this, *paxos, "health"));
248 paxos_service[PAXOS_CONFIG].reset(new ConfigMonitor(*this, *paxos, "config"));
249 paxos_service[PAXOS_KV].reset(new KVMonitor(*this, *paxos, "kv"));
250
251 bool r = mon_caps.parse("allow *", NULL);
252 ceph_assert(r);
253
254 exited_quorum = ceph_clock_now();
255
256 // prepare local commands
257 local_mon_commands.resize(std::size(mon_commands));
258 for (unsigned i = 0; i < std::size(mon_commands); ++i) {
259 local_mon_commands[i] = mon_commands[i];
260 }
261 MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);
262
263 prenautilus_local_mon_commands = local_mon_commands;
264 for (auto& i : prenautilus_local_mon_commands) {
265 std::string n = cmddesc_get_prenautilus_compat(i.cmdstring);
266 if (n != i.cmdstring) {
267 dout(20) << " pre-nautilus cmd " << i.cmdstring << " -> " << n << dendl;
268 i.cmdstring = n;
269 }
270 }
271 MonCommand::encode_vector(prenautilus_local_mon_commands, prenautilus_local_mon_commands_bl);
272
273 // assume our commands until we have an election. this only means
274 // we won't reply with EINVAL before the election; any command that
275 // actually matters will wait until we have quorum etc and then
276 // retry (and revalidate).
277 leader_mon_commands = local_mon_commands;
278 }
279
280 Monitor::~Monitor()
281 {
282 op_tracker.on_shutdown();
283
284 delete logger;
285 ceph_assert(session_map.sessions.empty());
286 }
287
288
289 class AdminHook : public AdminSocketHook {
290 Monitor *mon;
291 public:
292 explicit AdminHook(Monitor *m) : mon(m) {}
293 int call(std::string_view command, const cmdmap_t& cmdmap,
294 Formatter *f,
295 std::ostream& errss,
296 bufferlist& out) override {
297 stringstream outss;
298 int r = mon->do_admin_command(command, cmdmap, f, errss, outss);
299 out.append(outss);
300 return r;
301 }
302 };
303
304 int Monitor::do_admin_command(
305 std::string_view command,
306 const cmdmap_t& cmdmap,
307 Formatter *f,
308 std::ostream& err,
309 std::ostream& out)
310 {
311 std::lock_guard l(lock);
312
313 int r = 0;
314 string args;
315 for (auto p = cmdmap.begin();
316 p != cmdmap.end(); ++p) {
317 if (p->first == "prefix")
318 continue;
319 if (!args.empty())
320 args += ", ";
321 args += cmd_vartype_stringify(p->second);
322 }
323 args = "[" + args + "]";
324
325 bool read_only = (command == "mon_status" ||
326 command == "mon metadata" ||
327 command == "quorum_status" ||
328 command == "ops" ||
329 command == "sessions");
330
331 (read_only ? audit_clog->debug() : audit_clog->info())
332 << "from='admin socket' entity='admin socket' "
333 << "cmd='" << command << "' args=" << args << ": dispatch";
334
335 if (command == "mon_status") {
336 get_mon_status(f);
337 } else if (command == "quorum_status") {
338 _quorum_status(f, out);
339 } else if (command == "sync_force") {
340 bool validate = false;
341 if (!cmd_getval(cmdmap, "yes_i_really_mean_it", validate)) {
342 std::string v;
343 if (cmd_getval(cmdmap, "validate", v) &&
344 v == "--yes-i-really-mean-it") {
345 validate = true;
346 }
347 }
348 if (!validate) {
349 err << "are you SURE? this will mean the monitor store will be erased "
350 "the next time the monitor is restarted. pass "
351 "'--yes-i-really-mean-it' if you really do.";
352 r = -EPERM;
353 goto abort;
354 }
355 sync_force(f);
356 } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0 ||
357 command.compare(0, 24, "add_bootstrap_peer_hintv") == 0) {
358 if (!_add_bootstrap_peer_hint(command, cmdmap, out))
359 goto abort;
360 } else if (command == "quorum enter") {
361 elector.start_participating();
362 start_election();
363 out << "started responding to quorum, initiated new election";
364 } else if (command == "quorum exit") {
365 start_election();
366 elector.stop_participating();
367 out << "stopped responding to quorum, initiated new election";
368 } else if (command == "ops") {
369 (void)op_tracker.dump_ops_in_flight(f);
370 } else if (command == "sessions") {
371 f->open_array_section("sessions");
372 for (auto p : session_map.sessions) {
373 f->dump_object("session", *p);
374 }
375 f->close_section();
376 } else if (command == "dump_historic_ops") {
377 if (!op_tracker.dump_historic_ops(f)) {
378 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
379 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
380 }
381 } else if (command == "dump_historic_ops_by_duration" ) {
382 if (op_tracker.dump_historic_ops(f, true)) {
383 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
384 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
385 }
386 } else if (command == "dump_historic_slow_ops") {
387 if (op_tracker.dump_historic_slow_ops(f, {})) {
388 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
389 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
390 }
391 } else if (command == "quorum") {
392 string quorumcmd;
393 cmd_getval(cmdmap, "quorumcmd", quorumcmd);
394 if (quorumcmd == "exit") {
395 start_election();
396 elector.stop_participating();
397 out << "stopped responding to quorum, initiated new election" << std::endl;
398 } else if (quorumcmd == "enter") {
399 elector.start_participating();
400 start_election();
401 out << "started responding to quorum, initiated new election" << std::endl;
402 } else {
403 err << "needs a valid 'quorum' command" << std::endl;
404 }
405 } else if (command == "connection scores dump") {
406 if (!get_quorum_mon_features().contains_all(
407 ceph::features::mon::FEATURE_PINGING)) {
408 err << "Not all monitors support changing election strategies; \
409 please upgrade them first!";
410 }
411 elector.dump_connection_scores(f);
412 } else if (command == "connection scores reset") {
413 if (!get_quorum_mon_features().contains_all(
414 ceph::features::mon::FEATURE_PINGING)) {
415 err << "Not all monitors support changing election strategies; \
416 please upgrade them first!";
417 }
418 elector.notify_clear_peer_state();
419 } else if (command == "smart") {
420 string want_devid;
421 cmd_getval(cmdmap, "devid", want_devid);
422
423 string devname = store->get_devname();
424 if (devname.empty()) {
425 err << "could not determine device name for " << store->get_path();
426 r = -ENOENT;
427 goto abort;
428 }
429 set<string> devnames;
430 get_raw_devices(devname, &devnames);
431 json_spirit::mObject json_map;
432 uint64_t smart_timeout = cct->_conf.get_val<uint64_t>(
433 "mon_smart_report_timeout");
434 for (auto& devname : devnames) {
435 string err;
436 string devid = get_device_id(devname, &err);
437 if (want_devid.size() && want_devid != devid) {
438 derr << "get_device_id failed on " << devname << ": " << err << dendl;
439 continue;
440 }
441 json_spirit::mValue smart_json;
442 if (block_device_get_metrics(devname, smart_timeout,
443 &smart_json)) {
444 dout(10) << "block_device_get_metrics failed for /dev/" << devname
445 << dendl;
446 continue;
447 }
448 json_map[devid] = smart_json;
449 }
450 json_spirit::write(json_map, out, json_spirit::pretty_print);
451 } else if (command == "heap") {
452 if (!ceph_using_tcmalloc()) {
453 err << "could not issue heap profiler command -- not using tcmalloc!";
454 r = -EOPNOTSUPP;
455 goto abort;
456 }
457 string cmd;
458 if (!cmd_getval(cmdmap, "heapcmd", cmd)) {
459 err << "unable to get value for command \"" << cmd << "\"";
460 r = -EINVAL;
461 goto abort;
462 }
463 std::vector<std::string> cmd_vec;
464 get_str_vec(cmd, cmd_vec);
465 string val;
466 if (cmd_getval(cmdmap, "value", val)) {
467 cmd_vec.push_back(val);
468 }
469 ceph_heap_profiler_handle_command(cmd_vec, out);
470 } else if (command == "compact") {
471 dout(1) << "triggering manual compaction" << dendl;
472 auto start = ceph::coarse_mono_clock::now();
473 store->compact_async();
474 auto end = ceph::coarse_mono_clock::now();
475 auto duration = ceph::to_seconds<double>(end - start);
476 dout(1) << "finished manual compaction in "
477 << duration << " seconds" << dendl;
478 out << "compacted " << g_conf().get_val<std::string>("mon_keyvaluedb")
479 << " in " << duration << " seconds";
480 } else {
481 ceph_abort_msg("bad AdminSocket command binding");
482 }
483 (read_only ? audit_clog->debug() : audit_clog->info())
484 << "from='admin socket' "
485 << "entity='admin socket' "
486 << "cmd=" << command << " "
487 << "args=" << args << ": finished";
488 return r;
489
490 abort:
491 (read_only ? audit_clog->debug() : audit_clog->info())
492 << "from='admin socket' "
493 << "entity='admin socket' "
494 << "cmd=" << command << " "
495 << "args=" << args << ": aborted";
496 return r;
497 }
498
499 void Monitor::handle_signal(int signum)
500 {
501 derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
502 if (signum == SIGHUP) {
503 sighup_handler(signum);
504 logmon()->reopen_logs();
505 } else {
506 ceph_assert(signum == SIGINT || signum == SIGTERM);
507 shutdown();
508 }
509 }
510
511 CompatSet Monitor::get_initial_supported_features()
512 {
513 CompatSet::FeatureSet ceph_mon_feature_compat;
514 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
515 CompatSet::FeatureSet ceph_mon_feature_incompat;
516 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
517 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
518 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
519 ceph_mon_feature_incompat);
520 }
521
522 CompatSet Monitor::get_supported_features()
523 {
524 CompatSet compat = get_initial_supported_features();
525 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
526 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
527 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
528 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
529 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
530 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
531 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
532 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
533 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS);
534 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_PACIFIC);
535 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_QUINCY);
536 return compat;
537 }
538
539 CompatSet Monitor::get_legacy_features()
540 {
541 CompatSet::FeatureSet ceph_mon_feature_compat;
542 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
543 CompatSet::FeatureSet ceph_mon_feature_incompat;
544 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
545 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
546 ceph_mon_feature_incompat);
547 }
548
549 int Monitor::check_features(MonitorDBStore *store)
550 {
551 CompatSet required = get_supported_features();
552 CompatSet ondisk;
553
554 read_features_off_disk(store, &ondisk);
555
556 if (!required.writeable(ondisk)) {
557 CompatSet diff = required.unsupported(ondisk);
558 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
559 return -EPERM;
560 }
561
562 return 0;
563 }
564
565 void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
566 {
567 bufferlist featuresbl;
568 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
569 if (featuresbl.length() == 0) {
570 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
571 << "Assuming it is old-style and introducing one." << dendl;
572 //we only want the baseline ~v.18 features assumed to be on disk.
573 //If new features are introduced this code needs to disappear or
574 //be made smarter.
575 *features = get_legacy_features();
576
577 features->encode(featuresbl);
578 auto t(std::make_shared<MonitorDBStore::Transaction>());
579 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
580 store->apply_transaction(t);
581 } else {
582 auto it = featuresbl.cbegin();
583 features->decode(it);
584 }
585 }
586
587 void Monitor::read_features()
588 {
589 read_features_off_disk(store, &features);
590 dout(10) << "features " << features << dendl;
591
592 calc_quorum_requirements();
593 dout(10) << "required_features " << required_features << dendl;
594 }
595
596 void Monitor::write_features(MonitorDBStore::TransactionRef t)
597 {
598 bufferlist bl;
599 features.encode(bl);
600 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
601 }
602
603 const char** Monitor::get_tracked_conf_keys() const
604 {
605 static const char* KEYS[] = {
606 "crushtool", // helpful for testing
607 "mon_election_timeout",
608 "mon_lease",
609 "mon_lease_renew_interval_factor",
610 "mon_lease_ack_timeout_factor",
611 "mon_accept_timeout_factor",
612 // clog & admin clog
613 "clog_to_monitors",
614 "clog_to_syslog",
615 "clog_to_syslog_facility",
616 "clog_to_syslog_level",
617 "clog_to_graylog",
618 "clog_to_graylog_host",
619 "clog_to_graylog_port",
620 "mon_cluster_log_to_file",
621 "host",
622 "fsid",
623 // periodic health to clog
624 "mon_health_to_clog",
625 "mon_health_to_clog_interval",
626 "mon_health_to_clog_tick_interval",
627 // scrub interval
628 "mon_scrub_interval",
629 "mon_allow_pool_delete",
630 // osdmap pruning - observed, not handled.
631 "mon_osdmap_full_prune_enabled",
632 "mon_osdmap_full_prune_min",
633 "mon_osdmap_full_prune_interval",
634 "mon_osdmap_full_prune_txsize",
635 // debug options - observed, not handled
636 "mon_debug_extra_checks",
637 "mon_debug_block_osdmap_trim",
638 NULL
639 };
640 return KEYS;
641 }
642
643 void Monitor::handle_conf_change(const ConfigProxy& conf,
644 const std::set<std::string> &changed)
645 {
646 sanitize_options();
647
648 dout(10) << __func__ << " " << changed << dendl;
649
650 if (changed.count("clog_to_monitors") ||
651 changed.count("clog_to_syslog") ||
652 changed.count("clog_to_syslog_level") ||
653 changed.count("clog_to_syslog_facility") ||
654 changed.count("clog_to_graylog") ||
655 changed.count("clog_to_graylog_host") ||
656 changed.count("clog_to_graylog_port") ||
657 changed.count("host") ||
658 changed.count("fsid")) {
659 update_log_clients();
660 }
661
662 if (changed.count("mon_health_to_clog") ||
663 changed.count("mon_health_to_clog_interval") ||
664 changed.count("mon_health_to_clog_tick_interval")) {
665 finisher.queue(new C_MonContext{this, [this, changed](int) {
666 std::lock_guard l{lock};
667 health_to_clog_update_conf(changed);
668 }});
669 }
670
671 if (changed.count("mon_scrub_interval")) {
672 auto scrub_interval =
673 conf.get_val<std::chrono::seconds>("mon_scrub_interval");
674 finisher.queue(new C_MonContext{this, [this, scrub_interval](int) {
675 std::lock_guard l{lock};
676 scrub_update_interval(scrub_interval);
677 }});
678 }
679 }
680
681 void Monitor::update_log_clients()
682 {
683 clog->parse_client_options(g_ceph_context);
684 audit_clog->parse_client_options(g_ceph_context);
685 }
686
687 int Monitor::sanitize_options()
688 {
689 int r = 0;
690
691 // mon_lease must be greater than mon_lease_renewal; otherwise we
692 // may incur in leases expiring before they are renewed.
693 if (g_conf()->mon_lease_renew_interval_factor >= 1.0) {
694 clog->error() << "mon_lease_renew_interval_factor ("
695 << g_conf()->mon_lease_renew_interval_factor
696 << ") must be less than 1.0";
697 r = -EINVAL;
698 }
699
700 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
701 // got time to renew the lease and get an ack for it. Having both options
702 // with the same value, for a given small vale, could mean timing out if
703 // the monitors happened to be overloaded -- or even under normal load for
704 // a small enough value.
705 if (g_conf()->mon_lease_ack_timeout_factor <= 1.0) {
706 clog->error() << "mon_lease_ack_timeout_factor ("
707 << g_conf()->mon_lease_ack_timeout_factor
708 << ") must be greater than 1.0";
709 r = -EINVAL;
710 }
711
712 return r;
713 }
714
715 int Monitor::preinit()
716 {
717 std::unique_lock l(lock);
718
719 dout(1) << "preinit fsid " << monmap->fsid << dendl;
720
721 int r = sanitize_options();
722 if (r < 0) {
723 derr << "option sanitization failed!" << dendl;
724 return r;
725 }
726
727 ceph_assert(!logger);
728 {
729 PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
730 pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
731 PerfCountersBuilder::PRIO_USEFUL);
732 pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions",
733 "sadd", PerfCountersBuilder::PRIO_INTERESTING);
734 pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions",
735 "srm", PerfCountersBuilder::PRIO_INTERESTING);
736 pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions",
737 "strm", PerfCountersBuilder::PRIO_USEFUL);
738 pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in",
739 "ecnt", PerfCountersBuilder::PRIO_USEFUL);
740 pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started",
741 "estt", PerfCountersBuilder::PRIO_INTERESTING);
742 pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won",
743 "ewon", PerfCountersBuilder::PRIO_INTERESTING);
744 pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost",
745 "elst", PerfCountersBuilder::PRIO_INTERESTING);
746 logger = pcb.create_perf_counters();
747 cct->get_perfcounters_collection()->add(logger);
748 }
749
750 ceph_assert(!cluster_logger);
751 {
752 PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
753 pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
754 pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
755 pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
756 pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
757 pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
758 pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
759 pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster", NULL, 0, unit_t(UNIT_BYTES));
760 pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space", NULL, 0, unit_t(UNIT_BYTES));
761 pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space", NULL, 0, unit_t(UNIT_BYTES));
762 pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
763 pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
764 pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
765 pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
766 pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
767 pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
768 pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
769 pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
770 pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
771 pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects", NULL, 0, unit_t(UNIT_BYTES));
772 cluster_logger = pcb.create_perf_counters();
773 }
774
775 paxos->init_logger();
776
777 // verify cluster_uuid
778 {
779 int r = check_fsid();
780 if (r == -ENOENT)
781 r = write_fsid();
782 if (r < 0) {
783 return r;
784 }
785 }
786
787 // open compatset
788 read_features();
789
790 // have we ever joined a quorum?
791 has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
792 dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;
793
794 if (!has_ever_joined) {
795 // impose initial quorum restrictions?
796 list<string> initial_members;
797 get_str_list(g_conf()->mon_initial_members, initial_members);
798
799 if (!initial_members.empty()) {
800 dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
801
802 monmap->set_initial_members(
803 g_ceph_context, initial_members, name, messenger->get_myaddrs(),
804 &extra_probe_peers);
805
806 dout(10) << " monmap is " << *monmap << dendl;
807 dout(10) << " extra probe peers " << extra_probe_peers << dendl;
808 }
809 } else if (!monmap->contains(name)) {
810 derr << "not in monmap and have been in a quorum before; "
811 << "must have been removed" << dendl;
812 if (g_conf()->mon_force_quorum_join) {
813 dout(0) << "we should have died but "
814 << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
815 } else {
816 derr << "commit suicide!" << dendl;
817 return -ENOENT;
818 }
819 }
820
821 {
822 // We have a potentially inconsistent store state in hands. Get rid of it
823 // and start fresh.
824 bool clear_store = false;
825 if (store->exists("mon_sync", "in_sync")) {
826 dout(1) << __func__ << " clean up potentially inconsistent store state"
827 << dendl;
828 clear_store = true;
829 }
830
831 if (store->get("mon_sync", "force_sync") > 0) {
832 dout(1) << __func__ << " force sync by clearing store state" << dendl;
833 clear_store = true;
834 }
835
836 if (clear_store) {
837 set<string> sync_prefixes = get_sync_targets_names();
838 store->clear(sync_prefixes);
839 }
840 }
841
842 sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
843 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
844
845 init_paxos();
846
847 if (is_keyring_required()) {
848 // we need to bootstrap authentication keys so we can form an
849 // initial quorum.
850 if (authmon()->get_last_committed() == 0) {
851 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
852 bufferlist bl;
853 int err = store->get("mkfs", "keyring", bl);
854 if (err == 0 && bl.length() > 0) {
855 // Attempt to decode and extract keyring only if it is found.
856 KeyRing keyring;
857 auto p = bl.cbegin();
858 decode(keyring, p);
859 extract_save_mon_key(keyring);
860 }
861 }
862
863 string keyring_loc = g_conf()->mon_data + "/keyring";
864
865 r = keyring.load(cct, keyring_loc);
866 if (r < 0) {
867 EntityName mon_name;
868 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
869 EntityAuth mon_key;
870 if (key_server.get_auth(mon_name, mon_key)) {
871 dout(1) << "copying mon. key from old db to external keyring" << dendl;
872 keyring.add(mon_name, mon_key);
873 bufferlist bl;
874 keyring.encode_plaintext(bl);
875 write_default_keyring(bl);
876 } else {
877 derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
878 return r;
879 }
880 }
881 }
882
883 admin_hook = new AdminHook(this);
884 AdminSocket* admin_socket = cct->get_admin_socket();
885
886 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
887 l.unlock();
888 // register tell/asock commands
889 for (const auto& command : local_mon_commands) {
890 if (!command.is_tell()) {
891 continue;
892 }
893 const auto prefix = cmddesc_get_prefix(command.cmdstring);
894 if (prefix == "injectargs" ||
895 prefix == "version" ||
896 prefix == "tell") {
897 // not registerd by me
898 continue;
899 }
900 r = admin_socket->register_command(command.cmdstring, admin_hook,
901 command.helpstring);
902 ceph_assert(r == 0);
903 }
904 l.lock();
905
906 // add ourselves as a conf observer
907 g_conf().add_observer(this);
908
909 messenger->set_auth_client(this);
910 messenger->set_auth_server(this);
911 mgr_messenger->set_auth_client(this);
912
913 auth_registry.refresh_config();
914
915 return 0;
916 }
917
/**
 * Bring the monitor online.
 *
 * Under the monitor lock: starts the finisher, timer (plus the first tick)
 * and CPU thread pool, registers this monitor as a dispatcher on the main
 * messenger, initializes the mgr client on the mgr messenger, records the
 * filestore OSD list, and enters STATE_PROBING via bootstrap().
 *
 * @return always 0
 */
int Monitor::init()
{
  dout(2) << "init" << dendl;
  // the whole init sequence runs with the monitor lock held
  std::lock_guard l(lock);

  finisher.start();

  // start ticker
  timer.init();
  new_tick();

  cpu_tp.start();

  // i'm ready!
  messenger->add_dispatcher_tail(this);

  // kickstart pet mgrclient
  mgr_client.init();
  mgr_messenger->add_dispatcher_tail(&mgr_client);
  mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls
  mgrmon()->prime_mgr_client();

  // generate list of filestore OSDs
  osdmon()->get_filestore_osd_list();

  state = STATE_PROBING;
  bootstrap();
  // add features of myself into feature_map
  session_map.feature_map.add_mon(con_self->get_features());
  return 0;
}
949
950 void Monitor::init_paxos()
951 {
952 dout(10) << __func__ << dendl;
953 paxos->init();
954
955 // init services
956 for (auto& svc : paxos_service) {
957 svc->init();
958 }
959
960 refresh_from_paxos(NULL);
961 }
962
963 void Monitor::refresh_from_paxos(bool *need_bootstrap)
964 {
965 dout(10) << __func__ << dendl;
966
967 bufferlist bl;
968 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
969 if (r >= 0) {
970 try {
971 auto p = bl.cbegin();
972 decode(fingerprint, p);
973 }
974 catch (ceph::buffer::error& e) {
975 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
976 }
977 } else {
978 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
979 }
980
981 for (auto& svc : paxos_service) {
982 svc->refresh(need_bootstrap);
983 }
984 for (auto& svc : paxos_service) {
985 svc->post_refresh();
986 }
987 load_metadata();
988 }
989
990 void Monitor::register_cluster_logger()
991 {
992 if (!cluster_logger_registered) {
993 dout(10) << "register_cluster_logger" << dendl;
994 cluster_logger_registered = true;
995 cct->get_perfcounters_collection()->add(cluster_logger);
996 } else {
997 dout(10) << "register_cluster_logger - already registered" << dendl;
998 }
999 }
1000
1001 void Monitor::unregister_cluster_logger()
1002 {
1003 if (cluster_logger_registered) {
1004 dout(10) << "unregister_cluster_logger" << dendl;
1005 cluster_logger_registered = false;
1006 cct->get_perfcounters_collection()->remove(cluster_logger);
1007 } else {
1008 dout(10) << "unregister_cluster_logger - not registered" << dendl;
1009 }
1010 }
1011
1012 void Monitor::update_logger()
1013 {
1014 cluster_logger->set(l_cluster_num_mon, monmap->size());
1015 cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
1016 }
1017
/**
 * Stop the monitor.
 *
 * Flushes any in-flight paxos write, disables authentication (mon rank and
 * count set to 0), enters STATE_SHUTDOWN and then tears down, in order: the
 * config observer, the admin-socket hook, the elector, the mgr client, the
 * finisher, paxos and its services, pending quorum waiters, the timer, the
 * CPU thread pool, all sessions, the log client, both messengers and
 * finally the perf-counter loggers.
 *
 * Acquires `lock` itself (so presumably must be entered without it held)
 * and returns with it released.
 */
void Monitor::shutdown()
{
  dout(1) << "shutdown" << dendl;

  lock.lock();

  wait_for_paxos_write();

  {
    std::lock_guard l(auth_lock);
    authmon()->_set_mon_num_rank(0, 0);
  }

  state = STATE_SHUTDOWN;

  // the config observer is removed with the monitor lock dropped
  // (NOTE(review): presumably to avoid a lock-order inversion with the
  // config subsystem -- confirm)
  lock.unlock();
  g_conf().remove_observer(this);
  lock.lock();

  if (admin_hook) {
    cct->get_admin_socket()->unregister_commands(admin_hook);
    delete admin_hook;
    admin_hook = NULL;
  }

  elector.shutdown();

  mgr_client.shutdown();

  // drain and stop the finisher with the monitor lock released
  lock.unlock();
  finisher.wait_for_empty();
  finisher.stop();
  lock.lock();

  // clean up
  paxos->shutdown();
  for (auto& svc : paxos_service) {
    svc->shutdown();
  }

  // complete anything still waiting for a quorum with -ECANCELED
  finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
  finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);

  timer.shutdown();

  cpu_tp.stop();

  remove_all_sessions();

  log_client.shutdown();

  // unlock before msgr shutdown...
  lock.unlock();

  // shutdown messenger before removing logger from perfcounter collection,
  // otherwise _ms_dispatch() will try to update deleted logger
  messenger->shutdown();
  mgr_messenger->shutdown();

  if (logger) {
    cct->get_perfcounters_collection()->remove(logger);
  }
  if (cluster_logger) {
    // only remove it from the collection if register_cluster_logger() added it
    if (cluster_logger_registered)
      cct->get_perfcounters_collection()->remove(cluster_logger);
    delete cluster_logger;
    cluster_logger = NULL;
  }
}
1087
1088 void Monitor::wait_for_paxos_write()
1089 {
1090 if (paxos->is_writing() || paxos->is_writing_previous()) {
1091 dout(10) << __func__ << " flushing pending write" << dendl;
1092 lock.unlock();
1093 store->flush();
1094 lock.lock();
1095 dout(10) << __func__ << " flushed pending write" << dendl;
1096 }
1097 }
1098
1099 void Monitor::respawn()
1100 {
1101 // --- WARNING TO FUTURE COPY/PASTERS ---
1102 // You must also add a call like
1103 //
1104 // ceph_pthread_setname(pthread_self(), "ceph-mon");
1105 //
1106 // to main() so that /proc/$pid/stat field 2 contains "(ceph-mon)"
1107 // instead of "(exe)", so that killall (and log rotation) will work.
1108
1109 dout(0) << __func__ << dendl;
1110
1111 char *new_argv[orig_argc+1];
1112 dout(1) << " e: '" << orig_argv[0] << "'" << dendl;
1113 for (int i=0; i<orig_argc; i++) {
1114 new_argv[i] = (char *)orig_argv[i];
1115 dout(1) << " " << i << ": '" << orig_argv[i] << "'" << dendl;
1116 }
1117 new_argv[orig_argc] = NULL;
1118
1119 /* Determine the path to our executable, test if Linux /proc/self/exe exists.
1120 * This allows us to exec the same executable even if it has since been
1121 * unlinked.
1122 */
1123 char exe_path[PATH_MAX] = "";
1124 #ifdef PROCPREFIX
1125 if (readlink(PROCPREFIX "/proc/self/exe", exe_path, PATH_MAX-1) != -1) {
1126 dout(1) << "respawning with exe " << exe_path << dendl;
1127 strcpy(exe_path, PROCPREFIX "/proc/self/exe");
1128 } else {
1129 #else
1130 {
1131 #endif
1132 /* Print CWD for the user's interest */
1133 char buf[PATH_MAX];
1134 char *cwd = getcwd(buf, sizeof(buf));
1135 ceph_assert(cwd);
1136 dout(1) << " cwd " << cwd << dendl;
1137
1138 /* Fall back to a best-effort: just running in our CWD */
1139 strncpy(exe_path, orig_argv[0], PATH_MAX-1);
1140 }
1141
1142 dout(1) << " exe_path " << exe_path << dendl;
1143
1144 unblock_all_signals(NULL);
1145 execv(exe_path, new_argv);
1146
1147 dout(0) << "respawn execv " << orig_argv[0]
1148 << " failed with " << cpp_strerror(errno) << dendl;
1149
1150 // We have to assert out here, because suicide() returns, and callers
1151 // to respawn expect it never to return.
1152 ceph_abort();
1153 }
1154
/**
 * (Re)enter the discovery phase of the monitor state machine.
 *
 * Setup and rank checks:
 *  - Flushes any pending paxos write, forgets sync-requester state,
 *    unregisters the cluster perf counters and cancels the probe timeout.
 *  - Exits the process if can_upgrade_from() rejects the monmap's
 *    min_mon_release.
 *  - Exits the process if we were removed from the monmap after having
 *    ever joined a quorum.
 *  - Respawns if the monmap lists different addresses for our rank; a
 *    non-seed monmap is stashed for the next startup first.
 *  - Adopts a changed rank, re-naming the messenger and marking down all
 *    connections.
 *
 * Entering STATE_PROBING:
 *  - Calls _reset() and, if configured, compacts the store.
 *  - A single-member monmap in which we are rank 0 wins the election
 *    immediately.
 *  - Otherwise a probe timeout is armed and OP_PROBE is sent to every
 *    other rank and to every extra_probe_peers address.
 */
void Monitor::bootstrap()
{
  dout(10) << "bootstrap" << dendl;
  wait_for_paxos_write();

  sync_reset_requester();
  unregister_cluster_logger();
  cancel_probe_timeout();

  if (monmap->get_epoch() == 0) {
    dout(10) << "reverting to legacy ranks for seed monmap (epoch 0)" << dendl;
    monmap->calc_legacy_ranks();
  }
  dout(10) << "monmap " << *monmap << dendl;
  {
    // stop if this build cannot run against the monmap's min_mon_release
    auto from_release = monmap->min_mon_release;
    ostringstream err;
    if (!can_upgrade_from(from_release, "min_mon_release", err)) {
      derr << "current monmap has " << err.str() << " stopping." << dendl;
      exit(0);
    }
  }
  // note my rank
  int newrank = monmap->get_rank(messenger->get_myaddrs());
  if (newrank < 0 && rank >= 0) {
    // was i ever part of the quorum?
    if (has_ever_joined) {
      dout(0) << " removed from monmap, suicide." << dendl;
      exit(0);
    }
    elector.notify_clear_peer_state();
  }
  if (newrank >= 0 &&
      monmap->get_addrs(newrank) != messenger->get_myaddrs()) {
    dout(0) << " monmap addrs for rank " << newrank << " changed, i am "
	    << messenger->get_myaddrs()
	    << ", monmap is " << monmap->get_addrs(newrank) << ", respawning"
	    << dendl;

    if (monmap->get_epoch()) {
      // store this map in temp mon_sync location so that we use it on
      // our next startup
      derr << " stashing newest monmap " << monmap->get_epoch()
	   << " for next startup" << dendl;
      bufferlist bl;
      monmap->encode(bl, -1);
      auto t(std::make_shared<MonitorDBStore::Transaction>());
      t->put("mon_sync", "temp_newer_monmap", bl);
      store->apply_transaction(t);
    }

    // respawn() execs a new process or aborts; it does not return
    respawn();
  }
  if (newrank != rank) {
    dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
    messenger->set_myname(entity_name_t::MON(newrank));
    rank = newrank;
    elector.notify_rank_changed(rank);

    // reset all connections, or else our peers will think we are someone else.
    messenger->mark_down_all();
  }

  // reset
  state = STATE_PROBING;

  _reset();

  // sync store
  if (g_conf()->mon_compact_on_bootstrap) {
    dout(10) << "bootstrap -- triggering compaction" << dendl;
    store->compact();
    dout(10) << "bootstrap -- finished compaction" << dendl;
  }

  // stretch mode bits
  set_elector_disallowed_leaders(false);

  // singleton monitor?
  if (monmap->size() == 1 && rank == 0) {
    win_standalone_election();
    return;
  }

  reset_probe_timeout();

  // i'm outside the quorum
  if (monmap->contains(name))
    outside_quorum.insert(name);

  // probe monitors
  dout(10) << "probing other monitors" << dendl;
  for (unsigned i = 0; i < monmap->size(); i++) {
    if ((int)i != rank)
      send_mon_message(
	new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
		      ceph_release()),
	i);
  }
  // also probe addresses learned via hints or unknown probers
  for (auto& av : extra_probe_peers) {
    if (av != messenger->get_myaddrs()) {
      messenger->send_to_mon(
	new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
		      ceph_release()),
	av);
    }
  }
}
1263
1264 bool Monitor::_add_bootstrap_peer_hint(std::string_view cmd,
1265 const cmdmap_t& cmdmap,
1266 ostream& ss)
1267 {
1268 if (is_leader() || is_peon()) {
1269 ss << "mon already active; ignoring bootstrap hint";
1270 return true;
1271 }
1272
1273 entity_addrvec_t addrs;
1274 string addrstr;
1275 if (cmd_getval(cmdmap, "addr", addrstr)) {
1276 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' addr '"
1277 << addrstr << "'" << dendl;
1278
1279 entity_addr_t addr;
1280 if (!addr.parse(addrstr, entity_addr_t::TYPE_ANY)) {
1281 ss << "failed to parse addrs '" << addrstr
1282 << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1283 return false;
1284 }
1285
1286 addrs.v.push_back(addr);
1287 if (addr.get_port() == 0) {
1288 addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
1289 addrs.v[0].set_port(CEPH_MON_PORT_IANA);
1290 addrs.v.push_back(addr);
1291 addrs.v[1].set_type(entity_addr_t::TYPE_LEGACY);
1292 addrs.v[1].set_port(CEPH_MON_PORT_LEGACY);
1293 } else if (addr.get_type() == entity_addr_t::TYPE_ANY) {
1294 if (addr.get_port() == CEPH_MON_PORT_LEGACY) {
1295 addrs.v[0].set_type(entity_addr_t::TYPE_LEGACY);
1296 } else {
1297 addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
1298 }
1299 }
1300 } else if (cmd_getval(cmdmap, "addrv", addrstr)) {
1301 dout(10) << "_add_bootstrap_peer_hintv '" << cmd << "' addrv '"
1302 << addrstr << "'" << dendl;
1303 const char *end = 0;
1304 if (!addrs.parse(addrstr.c_str(), &end)) {
1305 ss << "failed to parse addrs '" << addrstr
1306 << "'; syntax is 'add_bootstrap_peer_hintv v2:ip:port[,v1:ip:port]'";
1307 return false;
1308 }
1309 } else {
1310 ss << "no addr or addrv provided";
1311 return false;
1312 }
1313
1314 extra_probe_peers.insert(addrs);
1315 ss << "adding peer " << addrs << " to list: " << extra_probe_peers;
1316 return true;
1317 }
1318
/**
 * Drop all quorum-related volatile state.
 *
 * Called by bootstrap(), or on leader|peon -> electing.
 *
 * Disables authentication (mon rank/count set to 0 under auth_lock), then:
 *  - cancels the probe timeout, time checks, health events and scrub
 *    scheduling;
 *  - clears leader/quorum bookkeeping, stamping exited_quorum if we were
 *    in a quorum;
 *  - restarts paxos and every paxos service.
 */
void Monitor::_reset()
{
  dout(10) << __func__ << dendl;

  // disable authentication
  {
    std::lock_guard l(auth_lock);
    authmon()->_set_mon_num_rank(0, 0);
  }

  cancel_probe_timeout();
  timecheck_finish();
  health_events_cleanup();
  health_check_log_times.clear();
  scrub_event_cancel();

  leader_since = utime_t();
  quorum_since = {};
  // remember when we left, but only if we were actually in a quorum
  if (!quorum.empty()) {
    exited_quorum = ceph_clock_now();
  }
  quorum.clear();
  outside_quorum.clear();
  quorum_feature_map.clear();

  scrub_reset();

  paxos->restart();

  for (auto& svc : paxos_service) {
    svc->restart();
  }
}
1353
1354
1355 // -----------------------------------------------------------
1356 // sync
1357
1358 set<string> Monitor::get_sync_targets_names()
1359 {
1360 set<string> targets;
1361 targets.insert(paxos->get_name());
1362 for (auto& svc : paxos_service) {
1363 svc->get_store_prefixes(targets);
1364 }
1365 return targets;
1366 }
1367
1368
1369 void Monitor::sync_timeout()
1370 {
1371 dout(10) << __func__ << dendl;
1372 ceph_assert(state == STATE_SYNCHRONIZING);
1373 bootstrap();
1374 }
1375
1376 void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1377 {
1378 dout(1) << __func__ << dendl;
1379
1380 MonMap latest_monmap;
1381
1382 // Grab latest monmap from MonmapMonitor
1383 bufferlist monmon_bl;
1384 int err = monmon()->get_monmap(monmon_bl);
1385 if (err < 0) {
1386 if (err != -ENOENT) {
1387 derr << __func__
1388 << " something wrong happened while reading the store: "
1389 << cpp_strerror(err) << dendl;
1390 ceph_abort_msg("error reading the store");
1391 }
1392 } else {
1393 latest_monmap.decode(monmon_bl);
1394 }
1395
1396 // Grab last backed up monmap (if any) and compare epochs
1397 if (store->exists("mon_sync", "latest_monmap")) {
1398 bufferlist backup_bl;
1399 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1400 if (err < 0) {
1401 derr << __func__
1402 << " something wrong happened while reading the store: "
1403 << cpp_strerror(err) << dendl;
1404 ceph_abort_msg("error reading the store");
1405 }
1406 ceph_assert(backup_bl.length() > 0);
1407
1408 MonMap backup_monmap;
1409 backup_monmap.decode(backup_bl);
1410
1411 if (backup_monmap.epoch > latest_monmap.epoch)
1412 latest_monmap = backup_monmap;
1413 }
1414
1415 // Check if our current monmap's epoch is greater than the one we've
1416 // got so far.
1417 if (monmap->epoch > latest_monmap.epoch)
1418 latest_monmap = *monmap;
1419
1420 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1421
1422 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1423 }
1424
1425 void Monitor::sync_reset_requester()
1426 {
1427 dout(10) << __func__ << dendl;
1428
1429 if (sync_timeout_event) {
1430 timer.cancel_event(sync_timeout_event);
1431 sync_timeout_event = NULL;
1432 }
1433
1434 sync_provider = entity_addrvec_t();
1435 sync_cookie = 0;
1436 sync_full = false;
1437 sync_start_version = 0;
1438 }
1439
1440 void Monitor::sync_reset_provider()
1441 {
1442 dout(10) << __func__ << dendl;
1443 sync_providers.clear();
1444 }
1445
/**
 * Begin synchronizing our store from the monitor at @p addrs.
 *
 * Moves us to STATE_SYNCHRONIZING and drops any sync sessions we were
 * serving.
 *
 * For a full sync (@p full), before anything else:
 *  - persist a backup of the newest monmap, the mon_sync/in_sync marker and
 *    mon_sync/last_committed_floor;
 *  - clear every sync-target prefix from the store;
 *  - re-init paxos.
 *
 * Finally arms the sync timeout and asks the provider for a cookie:
 * OP_GET_COOKIE_FULL, or OP_GET_COOKIE_RECENT carrying our current paxos
 * version.
 *
 * @param addrs  provider to sync from (the reply may redirect us later)
 * @param full   true for a full store sync, false to only catch up paxos
 */
void Monitor::sync_start(entity_addrvec_t &addrs, bool full)
{
  dout(10) << __func__ << " " << addrs << (full ? " full" : " recent") << dendl;

  ceph_assert(state == STATE_PROBING ||
	      state == STATE_SYNCHRONIZING);
  state = STATE_SYNCHRONIZING;

  // make sure are not a provider for anyone!
  sync_reset_provider();

  sync_full = full;

  if (sync_full) {
    // stash key state, and mark that we are syncing
    auto t(std::make_shared<MonitorDBStore::Transaction>());
    sync_stash_critical_state(t);
    t->put("mon_sync", "in_sync", 1);

    // the floor only ever moves up
    sync_last_committed_floor = std::max(sync_last_committed_floor, paxos->get_version());
    dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
	     << sync_last_committed_floor << dendl;
    t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);

    store->apply_transaction(t);

    ceph_assert(g_conf()->mon_sync_requester_kill_at != 1);

    // clear the underlying store
    set<string> targets = get_sync_targets_names();
    dout(10) << __func__ << " clearing prefixes " << targets << dendl;
    store->clear(targets);

    // make sure paxos knows it has been reset.  this prevents a
    // bootstrap and then different probe reply order from possibly
    // deciding a partial or no sync is needed.
    paxos->init();

    ceph_assert(g_conf()->mon_sync_requester_kill_at != 2);
  }

  // assume 'other' as the leader. We will update the leader once we receive
  // a reply to the sync start.
  sync_provider = addrs;

  sync_reset_timeout();

  MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
  // a recent-only sync tells the provider where our paxos log ends
  if (!sync_full)
    m->last_committed = paxos->get_version();
  messenger->send_to_mon(m, sync_provider);
}
1498
1499 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1500 {
1501 dout(10) << __func__ << dendl;
1502 bufferlist backup_monmap;
1503 sync_obtain_latest_monmap(backup_monmap);
1504 ceph_assert(backup_monmap.length() > 0);
1505 t->put("mon_sync", "latest_monmap", backup_monmap);
1506 }
1507
1508 void Monitor::sync_reset_timeout()
1509 {
1510 dout(10) << __func__ << dendl;
1511 if (sync_timeout_event)
1512 timer.cancel_event(sync_timeout_event);
1513 sync_timeout_event = timer.add_event_after(
1514 g_conf()->mon_sync_timeout,
1515 new C_MonContext{this, [this](int) {
1516 sync_timeout();
1517 }});
1518 }
1519
/**
 * Complete a sync after the provider's last chunk was applied.
 *
 * For a full sync:
 *  - append the stored paxos transactions for versions
 *    sync_start_version..@p last_committed (via
 *    read_and_prepare_transactions);
 *  - record last_committed;
 *  - apply the result.
 *
 * In all cases, the mon_sync in_sync / force_sync / last_committed_floor
 * markers are then erased, paxos and its services are re-initialized from
 * the store, and we bootstrap again.
 *
 * @param last_committed  paxos version the provider synced us through
 */
void Monitor::sync_finish(version_t last_committed)
{
  dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;

  ceph_assert(g_conf()->mon_sync_requester_kill_at != 7);

  if (sync_full) {
    // finalize the paxos commits
    auto tx(std::make_shared<MonitorDBStore::Transaction>());
    paxos->read_and_prepare_transactions(tx, sync_start_version,
					 last_committed);
    tx->put(paxos->get_name(), "last_committed", last_committed);

    // NOTE: the statements up to `dendl` are part of the dout(30) log
    // statement (they write to *_dout); keep them together.
    dout(30) << __func__ << " final tx dump:\n";
    JSONFormatter f(true);
    tx->dump(&f);
    f.flush(*_dout);
    *_dout << dendl;

    store->apply_transaction(tx);
  }

  ceph_assert(g_conf()->mon_sync_requester_kill_at != 8);

  // the sync is done: drop the in-progress markers
  auto t(std::make_shared<MonitorDBStore::Transaction>());
  t->erase("mon_sync", "in_sync");
  t->erase("mon_sync", "force_sync");
  t->erase("mon_sync", "last_committed_floor");
  store->apply_transaction(t);

  ceph_assert(g_conf()->mon_sync_requester_kill_at != 9);

  init_paxos();

  ceph_assert(g_conf()->mon_sync_requester_kill_at != 10);

  bootstrap();
}
1558
1559 void Monitor::handle_sync(MonOpRequestRef op)
1560 {
1561 auto m = op->get_req<MMonSync>();
1562 dout(10) << __func__ << " " << *m << dendl;
1563 switch (m->op) {
1564
1565 // provider ---------
1566
1567 case MMonSync::OP_GET_COOKIE_FULL:
1568 case MMonSync::OP_GET_COOKIE_RECENT:
1569 handle_sync_get_cookie(op);
1570 break;
1571 case MMonSync::OP_GET_CHUNK:
1572 handle_sync_get_chunk(op);
1573 break;
1574
1575 // client -----------
1576
1577 case MMonSync::OP_COOKIE:
1578 handle_sync_cookie(op);
1579 break;
1580
1581 case MMonSync::OP_CHUNK:
1582 case MMonSync::OP_LAST_CHUNK:
1583 handle_sync_chunk(op);
1584 break;
1585 case MMonSync::OP_NO_COOKIE:
1586 handle_sync_no_cookie(op);
1587 break;
1588
1589 default:
1590 dout(0) << __func__ << " unknown op " << m->op << dendl;
1591 ceph_abort_msg("unknown op");
1592 }
1593 }
1594
1595 // leader
1596
1597 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1598 {
1599 auto m = op->get_req<MMonSync>();
1600 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1601 m->get_connection()->send_message(reply);
1602 }
1603
1604 void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1605 {
1606 auto m = op->get_req<MMonSync>();
1607 if (is_synchronizing()) {
1608 _sync_reply_no_cookie(op);
1609 return;
1610 }
1611
1612 ceph_assert(g_conf()->mon_sync_provider_kill_at != 1);
1613
1614 // make sure they can understand us.
1615 if ((required_features ^ m->get_connection()->get_features()) &
1616 required_features) {
1617 dout(5) << " ignoring peer mon." << m->get_source().num()
1618 << " has features " << std::hex
1619 << m->get_connection()->get_features()
1620 << " but we require " << required_features << std::dec << dendl;
1621 return;
1622 }
1623
1624 // make up a unique cookie. include election epoch (which persists
1625 // across restarts for the whole cluster) and a counter for this
1626 // process instance. there is no need to be unique *across*
1627 // monitors, though.
1628 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
1629 ceph_assert(sync_providers.count(cookie) == 0);
1630
1631 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1632
1633 SyncProvider& sp = sync_providers[cookie];
1634 sp.cookie = cookie;
1635 sp.addrs = m->get_source_addrs();
1636 sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
1637
1638 set<string> sync_targets;
1639 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1640 // full scan
1641 sync_targets = get_sync_targets_names();
1642 sp.last_committed = paxos->get_version();
1643 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1644 sp.full = true;
1645 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1646 } else {
1647 // just catch up paxos
1648 sp.last_committed = m->last_committed;
1649 }
1650 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1651
1652 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1653 reply->last_committed = sp.last_committed;
1654 m->get_connection()->send_message(reply);
1655 }
1656
1657 void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
1658 {
1659 auto m = op->get_req<MMonSync>();
1660 dout(10) << __func__ << " " << *m << dendl;
1661
1662 if (sync_providers.count(m->cookie) == 0) {
1663 dout(10) << __func__ << " no cookie " << m->cookie << dendl;
1664 _sync_reply_no_cookie(op);
1665 return;
1666 }
1667
1668 ceph_assert(g_conf()->mon_sync_provider_kill_at != 2);
1669
1670 SyncProvider& sp = sync_providers[m->cookie];
1671 sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
1672
1673 if (sp.last_committed < paxos->get_first_committed() &&
1674 paxos->get_first_committed() > 1) {
1675 dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
1676 << " < our fc " << paxos->get_first_committed() << dendl;
1677 sync_providers.erase(m->cookie);
1678 _sync_reply_no_cookie(op);
1679 return;
1680 }
1681
1682 MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
1683 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1684
1685 int bytes_left = g_conf()->mon_sync_max_payload_size;
1686 int keys_left = g_conf()->mon_sync_max_payload_keys;
1687 while (sp.last_committed < paxos->get_version() &&
1688 bytes_left > 0 &&
1689 keys_left > 0) {
1690 bufferlist bl;
1691 sp.last_committed++;
1692
1693 int err = store->get(paxos->get_name(), sp.last_committed, bl);
1694 ceph_assert(err == 0);
1695
1696 tx->put(paxos->get_name(), sp.last_committed, bl);
1697 bytes_left -= bl.length();
1698 --keys_left;
1699 dout(20) << __func__ << " including paxos state " << sp.last_committed
1700 << dendl;
1701 }
1702 reply->last_committed = sp.last_committed;
1703
1704 if (sp.full && bytes_left > 0 && keys_left > 0) {
1705 sp.synchronizer->get_chunk_tx(tx, bytes_left, keys_left);
1706 sp.last_key = sp.synchronizer->get_last_key();
1707 reply->last_key = sp.last_key;
1708 }
1709
1710 if ((sp.full && sp.synchronizer->has_next_chunk()) ||
1711 sp.last_committed < paxos->get_version()) {
1712 dout(10) << __func__ << " chunk, through version " << sp.last_committed
1713 << " key " << sp.last_key << dendl;
1714 } else {
1715 dout(10) << __func__ << " last chunk, through version " << sp.last_committed
1716 << " key " << sp.last_key << dendl;
1717 reply->op = MMonSync::OP_LAST_CHUNK;
1718
1719 ceph_assert(g_conf()->mon_sync_provider_kill_at != 3);
1720
1721 // clean up our local state
1722 sync_providers.erase(sp.cookie);
1723 }
1724
1725 encode(*tx, reply->chunk_bl);
1726
1727 m->get_connection()->send_message(reply);
1728 }
1729
1730 // requester
1731
1732 void Monitor::handle_sync_cookie(MonOpRequestRef op)
1733 {
1734 auto m = op->get_req<MMonSync>();
1735 dout(10) << __func__ << " " << *m << dendl;
1736 if (sync_cookie) {
1737 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1738 return;
1739 }
1740 if (m->get_source_addrs() != sync_provider) {
1741 dout(10) << __func__ << " source does not match, discarding" << dendl;
1742 return;
1743 }
1744 sync_cookie = m->cookie;
1745 sync_start_version = m->last_committed;
1746
1747 sync_reset_timeout();
1748 sync_get_next_chunk();
1749
1750 ceph_assert(g_conf()->mon_sync_requester_kill_at != 3);
1751 }
1752
1753 void Monitor::sync_get_next_chunk()
1754 {
1755 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
1756 if (g_conf()->mon_inject_sync_get_chunk_delay > 0) {
1757 dout(20) << __func__ << " injecting delay of " << g_conf()->mon_inject_sync_get_chunk_delay << dendl;
1758 usleep((long long)(g_conf()->mon_inject_sync_get_chunk_delay * 1000000.0));
1759 }
1760 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
1761 messenger->send_to_mon(r, sync_provider);
1762
1763 ceph_assert(g_conf()->mon_sync_requester_kill_at != 4);
1764 }
1765
/**
 * Requester side: apply one chunk received from the sync provider.
 *
 * Chunks with a foreign cookie or from an unexpected source are dropped.
 * Otherwise:
 *  - the chunk's encoded transaction is applied to the store as-is;
 *  - for a recent-only (non-full) sync, the paxos transactions from our
 *    version + 1 through the chunk's last_committed are also applied right
 *    away, and paxos is re-initialized to pick them up;
 *  - OP_CHUNK then requests the next chunk, and OP_LAST_CHUNK finishes the
 *    sync.
 */
void Monitor::handle_sync_chunk(MonOpRequestRef op)
{
  auto m = op->get_req<MMonSync>();
  dout(10) << __func__ << " " << *m << dendl;

  if (m->cookie != sync_cookie) {
    dout(10) << __func__ << " cookie does not match, discarding" << dendl;
    return;
  }
  if (m->get_source_addrs() != sync_provider) {
    dout(10) << __func__ << " source does not match, discarding" << dendl;
    return;
  }

  ceph_assert(state == STATE_SYNCHRONIZING);
  ceph_assert(g_conf()->mon_sync_requester_kill_at != 5);

  auto tx(std::make_shared<MonitorDBStore::Transaction>());
  tx->append_from_encoded(m->chunk_bl);

  // NOTE: the statements up to `dendl` are part of the dout(30) log
  // statement (they write to *_dout); keep them together.
  dout(30) << __func__ << " tx dump:\n";
  JSONFormatter f(true);
  tx->dump(&f);
  f.flush(*_dout);
  *_dout << dendl;

  store->apply_transaction(tx);

  ceph_assert(g_conf()->mon_sync_requester_kill_at != 6);

  if (!sync_full) {
    dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
    // a separate transaction (shadows the outer `tx`)
    auto tx(std::make_shared<MonitorDBStore::Transaction>());
    paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
					 m->last_committed);
    tx->put(paxos->get_name(), "last_committed", m->last_committed);

    dout(30) << __func__ << " tx dump:\n";
    JSONFormatter f(true);
    tx->dump(&f);
    f.flush(*_dout);
    *_dout << dendl;

    store->apply_transaction(tx);
    paxos->init(); // to refresh what we just wrote
  }

  if (m->op == MMonSync::OP_CHUNK) {
    sync_reset_timeout();
    sync_get_next_chunk();
  } else if (m->op == MMonSync::OP_LAST_CHUNK) {
    sync_finish(m->last_committed);
  }
}
1820
1821 void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1822 {
1823 dout(10) << __func__ << dendl;
1824 bootstrap();
1825 }
1826
1827 void Monitor::sync_trim_providers()
1828 {
1829 dout(20) << __func__ << dendl;
1830
1831 utime_t now = ceph_clock_now();
1832 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1833 while (p != sync_providers.end()) {
1834 if (now > p->second.timeout) {
1835 dout(10) << __func__ << " expiring cookie " << p->second.cookie
1836 << " for " << p->second.addrs << dendl;
1837 sync_providers.erase(p++);
1838 } else {
1839 ++p;
1840 }
1841 }
1842 }
1843
1844 // ---------------------------------------------------
1845 // probe
1846
1847 void Monitor::cancel_probe_timeout()
1848 {
1849 if (probe_timeout_event) {
1850 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1851 timer.cancel_event(probe_timeout_event);
1852 probe_timeout_event = NULL;
1853 } else {
1854 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1855 }
1856 }
1857
1858 void Monitor::reset_probe_timeout()
1859 {
1860 cancel_probe_timeout();
1861 probe_timeout_event = new C_MonContext{this, [this](int r) {
1862 probe_timeout(r);
1863 }};
1864 double t = g_conf()->mon_probe_timeout;
1865 if (timer.add_event_after(t, probe_timeout_event)) {
1866 dout(10) << "reset_probe_timeout " << probe_timeout_event
1867 << " after " << t << " seconds" << dendl;
1868 } else {
1869 probe_timeout_event = nullptr;
1870 }
1871 }
1872
1873 void Monitor::probe_timeout(int r)
1874 {
1875 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
1876 ceph_assert(is_probing() || is_synchronizing());
1877 ceph_assert(probe_timeout_event);
1878 probe_timeout_event = NULL;
1879 bootstrap();
1880 }
1881
1882 void Monitor::handle_probe(MonOpRequestRef op)
1883 {
1884 auto m = op->get_req<MMonProbe>();
1885 dout(10) << "handle_probe " << *m << dendl;
1886
1887 if (m->fsid != monmap->fsid) {
1888 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1889 return;
1890 }
1891
1892 switch (m->op) {
1893 case MMonProbe::OP_PROBE:
1894 handle_probe_probe(op);
1895 break;
1896
1897 case MMonProbe::OP_REPLY:
1898 handle_probe_reply(op);
1899 break;
1900
1901 case MMonProbe::OP_MISSING_FEATURES:
1902 derr << __func__ << " require release " << (int)m->mon_release << " > "
1903 << (int)ceph_release()
1904 << ", or missing features (have " << CEPH_FEATURES_ALL
1905 << ", required " << m->required_features
1906 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL) << ")"
1907 << dendl;
1908 break;
1909 }
1910 }
1911
1912 void Monitor::handle_probe_probe(MonOpRequestRef op)
1913 {
1914 auto m = op->get_req<MMonProbe>();
1915
1916 dout(10) << "handle_probe_probe " << m->get_source_inst() << " " << *m
1917 << " features " << m->get_connection()->get_features() << dendl;
1918 uint64_t missing = required_features & ~m->get_connection()->get_features();
1919 if ((m->mon_release != ceph_release_t::unknown &&
1920 m->mon_release < monmap->min_mon_release) ||
1921 missing) {
1922 dout(1) << " peer " << m->get_source_addr()
1923 << " release " << m->mon_release
1924 << " < min_mon_release " << monmap->min_mon_release
1925 << ", or missing features " << missing << dendl;
1926 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1927 name, has_ever_joined, monmap->min_mon_release);
1928 m->required_features = required_features;
1929 m->get_connection()->send_message(r);
1930 goto out;
1931 }
1932
1933 if (!is_probing() && !is_synchronizing()) {
1934 // If the probing mon is way ahead of us, we need to re-bootstrap.
1935 // Normally we capture this case when we initially bootstrap, but
1936 // it is possible we pass those checks (we overlap with
1937 // quorum-to-be) but fail to join a quorum before it moves past
1938 // us. We need to be kicked back to bootstrap so we can
1939 // synchonize, not keep calling elections.
1940 if (paxos->get_version() + 1 < m->paxos_first_version) {
1941 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1942 << "ahead of us, re-bootstrapping" << dendl;
1943 bootstrap();
1944 goto out;
1945
1946 }
1947 }
1948
1949 MMonProbe *r;
1950 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined,
1951 ceph_release());
1952 r->name = name;
1953 r->quorum = quorum;
1954 r->leader = leader;
1955 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1956 r->paxos_first_version = paxos->get_first_committed();
1957 r->paxos_last_version = paxos->get_version();
1958 m->get_connection()->send_message(r);
1959
1960 // did we discover a peer here?
1961 if (!monmap->contains(m->get_source_addr())) {
1962 dout(1) << " adding peer " << m->get_source_addrs()
1963 << " to list of hints" << dendl;
1964 extra_probe_peers.insert(m->get_source_addrs());
1965 } else {
1966 elector.begin_peer_ping(monmap->get_rank(m->get_source_addr()));
1967 }
1968
1969 out:
1970 return;
1971 }
1972
1973 void Monitor::handle_probe_reply(MonOpRequestRef op)
1974 {
1975 auto m = op->get_req<MMonProbe>();
1976 dout(10) << "handle_probe_reply " << m->get_source_inst()
1977 << " " << *m << dendl;
1978 dout(10) << " monmap is " << *monmap << dendl;
1979
1980 // discover name and addrs during probing or electing states.
1981 if (!is_probing() && !is_electing()) {
1982 return;
1983 }
1984
1985 // newer map, or they've joined a quorum and we haven't?
1986 bufferlist mybl;
1987 monmap->encode(mybl, m->get_connection()->get_features());
1988 // make sure it's actually different; the checks below err toward
1989 // taking the other guy's map, which could cause us to loop.
1990 if (!mybl.contents_equal(m->monmap_bl)) {
1991 MonMap *newmap = new MonMap;
1992 newmap->decode(m->monmap_bl);
1993 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1994 !has_ever_joined)) {
1995 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
1996 << ", mine was " << monmap->get_epoch() << dendl;
1997 delete newmap;
1998 monmap->decode(m->monmap_bl);
1999 notify_new_monmap(false);
2000
2001 bootstrap();
2002 return;
2003 }
2004 delete newmap;
2005 }
2006
2007 // rename peer?
2008 string peer_name = monmap->get_name(m->get_source_addr());
2009 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
2010 dout(10) << " renaming peer " << m->get_source_addr() << " "
2011 << peer_name << " -> " << m->name << " in my monmap"
2012 << dendl;
2013 monmap->rename(peer_name, m->name);
2014
2015 if (is_electing()) {
2016 bootstrap();
2017 return;
2018 }
2019 } else if (peer_name.size()) {
2020 dout(10) << " peer name is " << peer_name << dendl;
2021 } else {
2022 dout(10) << " peer " << m->get_source_addr() << " not in map" << dendl;
2023 }
2024
2025 // new initial peer?
2026 if (monmap->get_epoch() == 0 &&
2027 monmap->contains(m->name) &&
2028 monmap->get_addrs(m->name).front().is_blank_ip()) {
2029 dout(1) << " learned initial mon " << m->name
2030 << " addrs " << m->get_source_addrs() << dendl;
2031 monmap->set_addrvec(m->name, m->get_source_addrs());
2032
2033 bootstrap();
2034 return;
2035 }
2036
2037 // end discover phase
2038 if (!is_probing()) {
2039 return;
2040 }
2041
2042 ceph_assert(paxos != NULL);
2043
2044 if (is_synchronizing()) {
2045 dout(10) << " currently syncing" << dendl;
2046 return;
2047 }
2048
2049 entity_addrvec_t other = m->get_source_addrs();
2050
2051 if (m->paxos_last_version < sync_last_committed_floor) {
2052 dout(10) << " peer paxos versions [" << m->paxos_first_version
2053 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
2054 << sync_last_committed_floor << ", ignoring"
2055 << dendl;
2056 } else {
2057 if (paxos->get_version() < m->paxos_first_version &&
2058 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
2059 dout(10) << " peer paxos first versions [" << m->paxos_first_version
2060 << "," << m->paxos_last_version << "]"
2061 << " vs my version " << paxos->get_version()
2062 << " (too far ahead)"
2063 << dendl;
2064 cancel_probe_timeout();
2065 sync_start(other, true);
2066 return;
2067 }
2068 if (paxos->get_version() + g_conf()->paxos_max_join_drift < m->paxos_last_version) {
2069 dout(10) << " peer paxos last version " << m->paxos_last_version
2070 << " vs my version " << paxos->get_version()
2071 << " (too far ahead)"
2072 << dendl;
2073 cancel_probe_timeout();
2074 sync_start(other, false);
2075 return;
2076 }
2077 }
2078
2079 // did the existing cluster complete upgrade to luminous?
2080 if (osdmon()->osdmap.get_epoch()) {
2081 if (osdmon()->osdmap.require_osd_release < ceph_release_t::luminous) {
2082 derr << __func__ << " existing cluster has not completed upgrade to"
2083 << " luminous; 'ceph osd require_osd_release luminous' before"
2084 << " upgrading" << dendl;
2085 exit(0);
2086 }
2087 if (!osdmon()->osdmap.test_flag(CEPH_OSDMAP_PURGED_SNAPDIRS) ||
2088 !osdmon()->osdmap.test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
2089 derr << __func__ << " existing cluster has not completed a full luminous"
2090 << " scrub to purge legacy snapdir objects; please scrub before"
2091 << " upgrading beyond luminous." << dendl;
2092 exit(0);
2093 }
2094 }
2095
2096 // is there an existing quorum?
2097 if (m->quorum.size()) {
2098 dout(10) << " existing quorum " << m->quorum << dendl;
2099
2100 dout(10) << " peer paxos version " << m->paxos_last_version
2101 << " vs my version " << paxos->get_version()
2102 << " (ok)"
2103 << dendl;
2104 bool in_map = false;
2105 const auto my_info = monmap->mon_info.find(name);
2106 const map<string,string> *map_crush_loc{nullptr};
2107 if (my_info != monmap->mon_info.end()) {
2108 in_map = true;
2109 map_crush_loc = &my_info->second.crush_loc;
2110 }
2111 if (in_map &&
2112 !monmap->get_addrs(name).front().is_blank_ip() &&
2113 (!need_set_crush_loc || (*map_crush_loc == crush_loc))) {
2114 // i'm part of the cluster; just initiate a new election
2115 start_election();
2116 } else {
2117 dout(10) << " ready to join, but i'm not in the monmap/"
2118 "my addr is blank/location is wrong, trying to join" << dendl;
2119 send_mon_message(new MMonJoin(monmap->fsid, name,
2120 messenger->get_myaddrs(), crush_loc,
2121 need_set_crush_loc),
2122 m->leader);
2123 }
2124 } else {
2125 if (monmap->contains(m->name)) {
2126 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
2127 outside_quorum.insert(m->name);
2128 } else {
2129 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
2130 return;
2131 }
2132
2133 unsigned need = monmap->min_quorum_size();
2134 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
2135 if (outside_quorum.size() >= need) {
2136 if (outside_quorum.count(name)) {
2137 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
2138 start_election();
2139 } else {
2140 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
2141 }
2142 } else {
2143 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
2144 }
2145 }
2146 }
2147
2148 void Monitor::join_election()
2149 {
2150 dout(10) << __func__ << dendl;
2151 wait_for_paxos_write();
2152 _reset();
2153 state = STATE_ELECTING;
2154
2155 logger->inc(l_mon_num_elections);
2156 }
2157
2158 void Monitor::start_election()
2159 {
2160 dout(10) << "start_election" << dendl;
2161 wait_for_paxos_write();
2162 _reset();
2163 state = STATE_ELECTING;
2164
2165 logger->inc(l_mon_num_elections);
2166 logger->inc(l_mon_election_call);
2167
2168 clog->info() << "mon." << name << " calling monitor election";
2169 elector.call_election();
2170 }
2171
2172 void Monitor::win_standalone_election()
2173 {
2174 dout(1) << "win_standalone_election" << dendl;
2175
2176 // bump election epoch, in case the previous epoch included other
2177 // monitors; we need to be able to make the distinction.
2178 elector.declare_standalone_victory();
2179
2180 rank = monmap->get_rank(name);
2181 ceph_assert(rank == 0);
2182 set<int> q;
2183 q.insert(rank);
2184
2185 map<int,Metadata> metadata;
2186 collect_metadata(&metadata[0]);
2187
2188 win_election(elector.get_epoch(), q,
2189 CEPH_FEATURES_ALL,
2190 ceph::features::mon::get_supported(),
2191 ceph_release(),
2192 metadata);
2193 }
2194
2195 const utime_t& Monitor::get_leader_since() const
2196 {
2197 ceph_assert(state == STATE_LEADER);
2198 return leader_since;
2199 }
2200
2201 epoch_t Monitor::get_epoch()
2202 {
2203 return elector.get_epoch();
2204 }
2205
2206 void Monitor::_finish_svc_election()
2207 {
2208 ceph_assert(state == STATE_LEADER || state == STATE_PEON);
2209
2210 for (auto& svc : paxos_service) {
2211 // we already called election_finished() on monmon(); avoid callig twice
2212 if (state == STATE_LEADER && svc.get() == monmon())
2213 continue;
2214 svc->election_finished();
2215 }
2216 }
2217
2218 void Monitor::win_election(epoch_t epoch, const set<int>& active, uint64_t features,
2219 const mon_feature_t& mon_features,
2220 ceph_release_t min_mon_release,
2221 const map<int,Metadata>& metadata)
2222 {
2223 dout(10) << __func__ << " epoch " << epoch << " quorum " << active
2224 << " features " << features
2225 << " mon_features " << mon_features
2226 << " min_mon_release " << min_mon_release
2227 << dendl;
2228 ceph_assert(is_electing());
2229 state = STATE_LEADER;
2230 leader_since = ceph_clock_now();
2231 quorum_since = mono_clock::now();
2232 leader = rank;
2233 quorum = active;
2234 quorum_con_features = features;
2235 quorum_mon_features = mon_features;
2236 quorum_min_mon_release = min_mon_release;
2237 pending_metadata = metadata;
2238 outside_quorum.clear();
2239
2240 clog->info() << "mon." << name << " is new leader, mons " << get_quorum_names()
2241 << " in quorum (ranks " << quorum << ")";
2242
2243 set_leader_commands(get_local_commands(mon_features));
2244
2245 paxos->leader_init();
2246 // NOTE: tell monmap monitor first. This is important for the
2247 // bootstrap case to ensure that the very first paxos proposal
2248 // codifies the monmap. Otherwise any manner of chaos can ensue
2249 // when monitors are call elections or participating in a paxos
2250 // round without agreeing on who the participants are.
2251 monmon()->election_finished();
2252 _finish_svc_election();
2253
2254 logger->inc(l_mon_election_win);
2255
2256 // inject new metadata in first transaction.
2257 {
2258 // include previous metadata for missing mons (that aren't part of
2259 // the current quorum).
2260 map<int,Metadata> m = metadata;
2261 for (unsigned rank = 0; rank < monmap->size(); ++rank) {
2262 if (m.count(rank) == 0 &&
2263 mon_metadata.count(rank)) {
2264 m[rank] = mon_metadata[rank];
2265 }
2266 }
2267
2268 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
2269 // a new transaction immediately after the election finishes. We should
2270 // do that anyway for other reasons, though.
2271 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
2272 bufferlist bl;
2273 encode(m, bl);
2274 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
2275 }
2276
2277 finish_election();
2278 if (monmap->size() > 1 &&
2279 monmap->get_epoch() > 0) {
2280 timecheck_start();
2281 health_tick_start();
2282
2283 // Freshen the health status before doing health_to_clog in case
2284 // our just-completed election changed the health
2285 healthmon()->wait_for_active_ctx(new LambdaContext([this](int r){
2286 dout(20) << "healthmon now active" << dendl;
2287 healthmon()->tick();
2288 if (healthmon()->is_proposing()) {
2289 dout(20) << __func__ << " healthmon proposing, waiting" << dendl;
2290 healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext{this,
2291 [this](int r){
2292 ceph_assert(ceph_mutex_is_locked_by_me(lock));
2293 do_health_to_clog_interval();
2294 }});
2295
2296 } else {
2297 do_health_to_clog_interval();
2298 }
2299 }));
2300
2301 scrub_event_start();
2302 }
2303 }
2304
2305 void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
2306 uint64_t features,
2307 const mon_feature_t& mon_features,
2308 ceph_release_t min_mon_release)
2309 {
2310 state = STATE_PEON;
2311 leader_since = utime_t();
2312 quorum_since = mono_clock::now();
2313 leader = l;
2314 quorum = q;
2315 outside_quorum.clear();
2316 quorum_con_features = features;
2317 quorum_mon_features = mon_features;
2318 quorum_min_mon_release = min_mon_release;
2319 dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
2320 << " quorum is " << quorum << " features are " << quorum_con_features
2321 << " mon_features are " << quorum_mon_features
2322 << " min_mon_release " << min_mon_release
2323 << dendl;
2324
2325 paxos->peon_init();
2326 _finish_svc_election();
2327
2328 logger->inc(l_mon_election_lose);
2329
2330 finish_election();
2331 }
2332
2333 namespace {
2334 std::string collect_compression_algorithms()
2335 {
2336 ostringstream os;
2337 bool printed = false;
2338 for (auto [name, key] : Compressor::compression_algorithms) {
2339 if (printed) {
2340 os << ", ";
2341 } else {
2342 printed = true;
2343 }
2344 std::ignore = key;
2345 os << name;
2346 }
2347 return os.str();
2348 }
2349 }
2350
2351 void Monitor::collect_metadata(Metadata *m)
2352 {
2353 collect_sys_info(m, g_ceph_context);
2354 (*m)["addrs"] = stringify(messenger->get_myaddrs());
2355 (*m)["compression_algorithms"] = collect_compression_algorithms();
2356
2357 // infer storage device
2358 string devname = store->get_devname();
2359 set<string> devnames;
2360 get_raw_devices(devname, &devnames);
2361 map<string,string> errs;
2362 get_device_metadata(devnames, m, &errs);
2363 for (auto& i : errs) {
2364 dout(1) << __func__ << " " << i.first << ": " << i.second << dendl;
2365 }
2366 }
2367
2368 void Monitor::finish_election()
2369 {
2370 apply_quorum_to_compatset_features();
2371 apply_monmap_to_compatset_features();
2372 timecheck_finish();
2373 exited_quorum = utime_t();
2374 finish_contexts(g_ceph_context, waitfor_quorum);
2375 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2376 resend_routed_requests();
2377 update_logger();
2378 register_cluster_logger();
2379
2380 // enable authentication
2381 {
2382 std::lock_guard l(auth_lock);
2383 authmon()->_set_mon_num_rank(monmap->size(), rank);
2384 }
2385
2386 // am i named and located properly?
2387 string cur_name = monmap->get_name(messenger->get_myaddrs());
2388 const auto my_infop = monmap->mon_info.find(cur_name);
2389 const map<string,string>& map_crush_loc = my_infop->second.crush_loc;
2390
2391 if (cur_name != name ||
2392 (need_set_crush_loc && map_crush_loc != crush_loc)) {
2393 dout(10) << " renaming/moving myself from " << cur_name << "/"
2394 << map_crush_loc <<" -> " << name << "/" << crush_loc << dendl;
2395 send_mon_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddrs(),
2396 crush_loc, need_set_crush_loc),
2397 leader);
2398 return;
2399 }
2400 do_stretch_mode_election_work();
2401 }
2402
2403 void Monitor::_apply_compatset_features(CompatSet &new_features)
2404 {
2405 if (new_features.compare(features) != 0) {
2406 CompatSet diff = features.unsupported(new_features);
2407 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2408 features = new_features;
2409
2410 auto t = std::make_shared<MonitorDBStore::Transaction>();
2411 write_features(t);
2412 store->apply_transaction(t);
2413
2414 calc_quorum_requirements();
2415 }
2416 }
2417
2418 void Monitor::apply_quorum_to_compatset_features()
2419 {
2420 CompatSet new_features(features);
2421 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
2422 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2423 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2424 }
2425 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2426 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
2427 dout(5) << __func__ << dendl;
2428 _apply_compatset_features(new_features);
2429 }
2430
2431 void Monitor::apply_monmap_to_compatset_features()
2432 {
2433 CompatSet new_features(features);
2434 mon_feature_t monmap_features = monmap->get_required_features();
2435
2436 /* persistent monmap features may go into the compatset.
2437 * optional monmap features may not - why?
2438 * because optional monmap features may be set/unset by the admin,
2439 * and possibly by other means that haven't yet been thought out,
2440 * so we can't make the monitor enforce them on start - because they
2441 * may go away.
2442 * this, of course, does not invalidate setting a compatset feature
2443 * for an optional feature - as long as you make sure to clean it up
2444 * once you unset it.
2445 */
2446 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
2447 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2448 ceph::features::mon::FEATURE_KRAKEN));
2449 // this feature should only ever be set if the quorum supports it.
2450 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
2451 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2452 }
2453 if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
2454 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2455 ceph::features::mon::FEATURE_LUMINOUS));
2456 // this feature should only ever be set if the quorum supports it.
2457 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
2458 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
2459 }
2460 if (monmap_features.contains_all(ceph::features::mon::FEATURE_MIMIC)) {
2461 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2462 ceph::features::mon::FEATURE_MIMIC));
2463 // this feature should only ever be set if the quorum supports it.
2464 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_MIMIC));
2465 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
2466 }
2467 if (monmap_features.contains_all(ceph::features::mon::FEATURE_NAUTILUS)) {
2468 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2469 ceph::features::mon::FEATURE_NAUTILUS));
2470 // this feature should only ever be set if the quorum supports it.
2471 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_NAUTILUS));
2472 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
2473 }
2474 if (monmap_features.contains_all(ceph::features::mon::FEATURE_OCTOPUS)) {
2475 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2476 ceph::features::mon::FEATURE_OCTOPUS));
2477 // this feature should only ever be set if the quorum supports it.
2478 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_OCTOPUS));
2479 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS);
2480 }
2481 if (monmap_features.contains_all(ceph::features::mon::FEATURE_PACIFIC)) {
2482 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2483 ceph::features::mon::FEATURE_PACIFIC));
2484 // this feature should only ever be set if the quorum supports it.
2485 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_PACIFIC));
2486 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_PACIFIC);
2487 }
2488 if (monmap_features.contains_all(ceph::features::mon::FEATURE_QUINCY)) {
2489 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2490 ceph::features::mon::FEATURE_QUINCY));
2491 // this feature should only ever be set if the quorum supports it.
2492 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_QUINCY));
2493 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_QUINCY);
2494 }
2495
2496 dout(5) << __func__ << dendl;
2497 _apply_compatset_features(new_features);
2498 }
2499
2500 void Monitor::calc_quorum_requirements()
2501 {
2502 required_features = 0;
2503
2504 // compatset
2505 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2506 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2507 }
2508 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2509 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2510 }
2511 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
2512 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2513 }
2514 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_MIMIC)) {
2515 required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
2516 }
2517 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS)) {
2518 required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
2519 CEPH_FEATUREMASK_CEPHX_V2;
2520 }
2521 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS)) {
2522 required_features |= CEPH_FEATUREMASK_SERVER_OCTOPUS;
2523 }
2524 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_PACIFIC)) {
2525 required_features |= CEPH_FEATUREMASK_SERVER_PACIFIC;
2526 }
2527 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_QUINCY)) {
2528 required_features |= CEPH_FEATUREMASK_SERVER_QUINCY;
2529 }
2530
2531 // monmap
2532 if (monmap->get_required_features().contains_all(
2533 ceph::features::mon::FEATURE_KRAKEN)) {
2534 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2535 }
2536 if (monmap->get_required_features().contains_all(
2537 ceph::features::mon::FEATURE_LUMINOUS)) {
2538 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2539 }
2540 if (monmap->get_required_features().contains_all(
2541 ceph::features::mon::FEATURE_MIMIC)) {
2542 required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
2543 }
2544 if (monmap->get_required_features().contains_all(
2545 ceph::features::mon::FEATURE_NAUTILUS)) {
2546 required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
2547 CEPH_FEATUREMASK_CEPHX_V2;
2548 }
2549 dout(10) << __func__ << " required_features " << required_features << dendl;
2550 }
2551
2552 void Monitor::get_combined_feature_map(FeatureMap *fm)
2553 {
2554 *fm += session_map.feature_map;
2555 for (auto id : quorum) {
2556 if (id != rank) {
2557 *fm += quorum_feature_map[id];
2558 }
2559 }
2560 }
2561
2562 void Monitor::sync_force(Formatter *f)
2563 {
2564 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2565 sync_stash_critical_state(tx);
2566 tx->put("mon_sync", "force_sync", 1);
2567 store->apply_transaction(tx);
2568
2569 f->open_object_section("sync_force");
2570 f->dump_int("ret", 0);
2571 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2572 f->close_section(); // sync_force
2573 }
2574
2575 void Monitor::_quorum_status(Formatter *f, ostream& ss)
2576 {
2577 bool free_formatter = false;
2578
2579 if (!f) {
2580 // louzy/lazy hack: default to json if no formatter has been defined
2581 f = new JSONFormatter();
2582 free_formatter = true;
2583 }
2584 f->open_object_section("quorum_status");
2585 f->dump_int("election_epoch", get_epoch());
2586
2587 f->open_array_section("quorum");
2588 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2589 f->dump_int("mon", *p);
2590 f->close_section(); // quorum
2591
2592 list<string> quorum_names = get_quorum_names();
2593 f->open_array_section("quorum_names");
2594 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2595 f->dump_string("mon", *p);
2596 f->close_section(); // quorum_names
2597
2598 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(leader));
2599
2600 if (!quorum.empty()) {
2601 f->dump_int(
2602 "quorum_age",
2603 quorum_age());
2604 }
2605
2606 f->open_object_section("features");
2607 f->dump_stream("quorum_con") << quorum_con_features;
2608 quorum_mon_features.dump(f, "quorum_mon");
2609 f->close_section();
2610
2611 f->open_object_section("monmap");
2612 monmap->dump(f);
2613 f->close_section(); // monmap
2614
2615 f->close_section(); // quorum_status
2616 f->flush(ss);
2617 if (free_formatter)
2618 delete f;
2619 }
2620
2621 void Monitor::get_mon_status(Formatter *f)
2622 {
2623 f->open_object_section("mon_status");
2624 f->dump_string("name", name);
2625 f->dump_int("rank", rank);
2626 f->dump_string("state", get_state_name());
2627 f->dump_int("election_epoch", get_epoch());
2628
2629 f->open_array_section("quorum");
2630 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2631 f->dump_int("mon", *p);
2632 }
2633 f->close_section(); // quorum
2634
2635 if (!quorum.empty()) {
2636 f->dump_int(
2637 "quorum_age",
2638 quorum_age());
2639 }
2640
2641 f->open_object_section("features");
2642 f->dump_stream("required_con") << required_features;
2643 mon_feature_t req_mon_features = get_required_mon_features();
2644 req_mon_features.dump(f, "required_mon");
2645 f->dump_stream("quorum_con") << quorum_con_features;
2646 quorum_mon_features.dump(f, "quorum_mon");
2647 f->close_section(); // features
2648
2649 f->open_array_section("outside_quorum");
2650 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2651 f->dump_string("mon", *p);
2652 f->close_section(); // outside_quorum
2653
2654 f->open_array_section("extra_probe_peers");
2655 for (set<entity_addrvec_t>::iterator p = extra_probe_peers.begin();
2656 p != extra_probe_peers.end();
2657 ++p) {
2658 f->dump_object("peer", *p);
2659 }
2660 f->close_section(); // extra_probe_peers
2661
2662 f->open_array_section("sync_provider");
2663 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2664 p != sync_providers.end();
2665 ++p) {
2666 f->dump_unsigned("cookie", p->second.cookie);
2667 f->dump_object("addrs", p->second.addrs);
2668 f->dump_stream("timeout") << p->second.timeout;
2669 f->dump_unsigned("last_committed", p->second.last_committed);
2670 f->dump_stream("last_key") << p->second.last_key;
2671 }
2672 f->close_section();
2673
2674 if (is_synchronizing()) {
2675 f->open_object_section("sync");
2676 f->dump_stream("sync_provider") << sync_provider;
2677 f->dump_unsigned("sync_cookie", sync_cookie);
2678 f->dump_unsigned("sync_start_version", sync_start_version);
2679 f->close_section();
2680 }
2681
2682 if (g_conf()->mon_sync_provider_kill_at > 0)
2683 f->dump_int("provider_kill_at", g_conf()->mon_sync_provider_kill_at);
2684 if (g_conf()->mon_sync_requester_kill_at > 0)
2685 f->dump_int("requester_kill_at", g_conf()->mon_sync_requester_kill_at);
2686
2687 f->open_object_section("monmap");
2688 monmap->dump(f);
2689 f->close_section();
2690
2691 f->dump_object("feature_map", session_map.feature_map);
2692 f->dump_bool("stretch_mode", stretch_mode_engaged);
2693 f->close_section(); // mon_status
2694 }
2695
2696
2697 // health status to clog
2698
2699 void Monitor::health_tick_start()
2700 {
2701 if (!cct->_conf->mon_health_to_clog ||
2702 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2703 return;
2704
2705 dout(15) << __func__ << dendl;
2706
2707 health_tick_stop();
2708 health_tick_event = timer.add_event_after(
2709 cct->_conf->mon_health_to_clog_tick_interval,
2710 new C_MonContext{this, [this](int r) {
2711 if (r < 0)
2712 return;
2713 health_tick_start();
2714 }});
2715 }
2716
2717 void Monitor::health_tick_stop()
2718 {
2719 dout(15) << __func__ << dendl;
2720
2721 if (health_tick_event) {
2722 timer.cancel_event(health_tick_event);
2723 health_tick_event = NULL;
2724 }
2725 }
2726
2727 ceph::real_clock::time_point Monitor::health_interval_calc_next_update()
2728 {
2729 auto now = ceph::real_clock::now();
2730
2731 auto secs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch());
2732 int remainder = secs.count() % cct->_conf->mon_health_to_clog_interval;
2733 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
2734 auto next = secs + std::chrono::seconds(adjustment);
2735
2736 dout(20) << __func__
2737 << " now: " << now << ","
2738 << " next: " << next << ","
2739 << " interval: " << cct->_conf->mon_health_to_clog_interval
2740 << dendl;
2741
2742 return ceph::real_clock::time_point{next};
2743 }
2744
2745 void Monitor::health_interval_start()
2746 {
2747 dout(15) << __func__ << dendl;
2748
2749 if (!cct->_conf->mon_health_to_clog ||
2750 cct->_conf->mon_health_to_clog_interval <= 0) {
2751 return;
2752 }
2753
2754 health_interval_stop();
2755 auto next = health_interval_calc_next_update();
2756 health_interval_event = new C_MonContext{this, [this](int r) {
2757 if (r < 0)
2758 return;
2759 do_health_to_clog_interval();
2760 }};
2761 if (!timer.add_event_at(next, health_interval_event)) {
2762 health_interval_event = nullptr;
2763 }
2764 }
2765
2766 void Monitor::health_interval_stop()
2767 {
2768 dout(15) << __func__ << dendl;
2769 if (health_interval_event) {
2770 timer.cancel_event(health_interval_event);
2771 }
2772 health_interval_event = NULL;
2773 }
2774
2775 void Monitor::health_events_cleanup()
2776 {
2777 health_tick_stop();
2778 health_interval_stop();
2779 health_status_cache.reset();
2780 }
2781
2782 void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2783 {
2784 dout(20) << __func__ << dendl;
2785
2786 if (changed.count("mon_health_to_clog")) {
2787 if (!cct->_conf->mon_health_to_clog) {
2788 health_events_cleanup();
2789 return;
2790 } else {
2791 if (!health_tick_event) {
2792 health_tick_start();
2793 }
2794 if (!health_interval_event) {
2795 health_interval_start();
2796 }
2797 }
2798 }
2799
2800 if (changed.count("mon_health_to_clog_interval")) {
2801 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2802 health_interval_stop();
2803 } else {
2804 health_interval_start();
2805 }
2806 }
2807
2808 if (changed.count("mon_health_to_clog_tick_interval")) {
2809 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2810 health_tick_stop();
2811 } else {
2812 health_tick_start();
2813 }
2814 }
2815 }
2816
2817 void Monitor::do_health_to_clog_interval()
2818 {
2819 // outputting to clog may have been disabled in the conf
2820 // since we were scheduled.
2821 if (!cct->_conf->mon_health_to_clog ||
2822 cct->_conf->mon_health_to_clog_interval <= 0)
2823 return;
2824
2825 dout(10) << __func__ << dendl;
2826
2827 // do we have a cached value for next_clog_update? if not,
2828 // do we know when the last update was?
2829
2830 do_health_to_clog(true);
2831 health_interval_start();
2832 }
2833
2834 void Monitor::do_health_to_clog(bool force)
2835 {
2836 // outputting to clog may have been disabled in the conf
2837 // since we were scheduled.
2838 if (!cct->_conf->mon_health_to_clog ||
2839 cct->_conf->mon_health_to_clog_interval <= 0)
2840 return;
2841
2842 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2843
2844 string summary;
2845 health_status_t level = healthmon()->get_health_status(false, nullptr, &summary);
2846 if (!force &&
2847 summary == health_status_cache.summary &&
2848 level == health_status_cache.overall)
2849 return;
2850
2851 if (g_conf()->mon_health_detail_to_clog &&
2852 summary != health_status_cache.summary &&
2853 level != HEALTH_OK) {
2854 string details;
2855 level = healthmon()->get_health_status(true, nullptr, &details);
2856 clog->health(level) << "Health detail: " << details;
2857 } else {
2858 clog->health(level) << "overall " << summary;
2859 }
2860 health_status_cache.summary = summary;
2861 health_status_cache.overall = level;
2862 }
2863
2864 void Monitor::log_health(
2865 const health_check_map_t& updated,
2866 const health_check_map_t& previous,
2867 MonitorDBStore::TransactionRef t)
2868 {
2869 if (!g_conf()->mon_health_to_clog) {
2870 return;
2871 }
2872
2873 const utime_t now = ceph_clock_now();
2874
2875 // FIXME: log atomically as part of @t instead of using clog.
2876 dout(10) << __func__ << " updated " << updated.checks.size()
2877 << " previous " << previous.checks.size()
2878 << dendl;
2879 const auto min_log_period = g_conf().get_val<int64_t>(
2880 "mon_health_log_update_period");
2881 for (auto& p : updated.checks) {
2882 auto q = previous.checks.find(p.first);
2883 bool logged = false;
2884 if (q == previous.checks.end()) {
2885 // new
2886 ostringstream ss;
2887 ss << "Health check failed: " << p.second.summary << " ("
2888 << p.first << ")";
2889 clog->health(p.second.severity) << ss.str();
2890
2891 logged = true;
2892 } else {
2893 if (p.second.summary != q->second.summary ||
2894 p.second.severity != q->second.severity) {
2895
2896 auto status_iter = health_check_log_times.find(p.first);
2897 if (status_iter != health_check_log_times.end()) {
2898 if (p.second.severity == q->second.severity &&
2899 now - status_iter->second.updated_at < min_log_period) {
2900 // We already logged this recently and the severity is unchanged,
2901 // so skip emitting an update of the summary string.
2902 // We'll get an update out of tick() later if the check
2903 // is still failing.
2904 continue;
2905 }
2906 }
2907
2908 // summary or severity changed (ignore detail changes at this level)
2909 ostringstream ss;
2910 ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
2911 clog->health(p.second.severity) << ss.str();
2912
2913 logged = true;
2914 }
2915 }
2916 // Record the time at which we last logged, so that we can check this
2917 // when considering whether/when to print update messages.
2918 if (logged) {
2919 auto iter = health_check_log_times.find(p.first);
2920 if (iter == health_check_log_times.end()) {
2921 health_check_log_times.emplace(p.first, HealthCheckLogStatus(
2922 p.second.severity, p.second.summary, now));
2923 } else {
2924 iter->second = HealthCheckLogStatus(
2925 p.second.severity, p.second.summary, now);
2926 }
2927 }
2928 }
2929 for (auto& p : previous.checks) {
2930 if (!updated.checks.count(p.first)) {
2931 // cleared
2932 ostringstream ss;
2933 if (p.first == "DEGRADED_OBJECTS") {
2934 clog->info() << "All degraded objects recovered";
2935 } else if (p.first == "OSD_FLAGS") {
2936 clog->info() << "OSD flags cleared";
2937 } else {
2938 clog->info() << "Health check cleared: " << p.first << " (was: "
2939 << p.second.summary << ")";
2940 }
2941
2942 if (health_check_log_times.count(p.first)) {
2943 health_check_log_times.erase(p.first);
2944 }
2945 }
2946 }
2947
2948 if (previous.checks.size() && updated.checks.size() == 0) {
2949 // We might be going into a fully healthy state, check
2950 // other subsystems
2951 bool any_checks = false;
2952 for (auto& svc : paxos_service) {
2953 if (&(svc->get_health_checks()) == &(previous)) {
2954 // Ignore the ones we're clearing right now
2955 continue;
2956 }
2957
2958 if (svc->get_health_checks().checks.size() > 0) {
2959 any_checks = true;
2960 break;
2961 }
2962 }
2963 if (!any_checks) {
2964 clog->info() << "Cluster is now healthy";
2965 }
2966 }
2967 }
2968
2969 void Monitor::update_pending_metadata()
2970 {
2971 Metadata metadata;
2972 collect_metadata(&metadata);
2973 size_t version_size = mon_metadata[rank]["ceph_version_short"].size();
2974 const std::string current_version = mon_metadata[rank]["ceph_version_short"];
2975 const std::string pending_version = metadata["ceph_version_short"];
2976
2977 if (current_version.compare(0, version_size, pending_version) < 0) {
2978 mgr_client.update_daemon_metadata("mon", name, metadata);
2979 }
2980 }
2981
2982 void Monitor::get_cluster_status(stringstream &ss, Formatter *f,
2983 MonSession *session)
2984 {
2985 if (f)
2986 f->open_object_section("status");
2987
2988 const auto&& fs_names = session->get_allowed_fs_names();
2989
2990 if (f) {
2991 f->dump_stream("fsid") << monmap->get_fsid();
2992 healthmon()->get_health_status(false, f, nullptr);
2993 f->dump_unsigned("election_epoch", get_epoch());
2994 {
2995 f->open_array_section("quorum");
2996 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2997 f->dump_int("rank", *p);
2998 f->close_section();
2999 f->open_array_section("quorum_names");
3000 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
3001 f->dump_string("id", monmap->get_name(*p));
3002 f->close_section();
3003 f->dump_int(
3004 "quorum_age",
3005 quorum_age());
3006 }
3007 f->open_object_section("monmap");
3008 monmap->dump_summary(f);
3009 f->close_section();
3010 f->open_object_section("osdmap");
3011 osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
3012 f->close_section();
3013 f->open_object_section("pgmap");
3014 mgrstatmon()->print_summary(f, NULL);
3015 f->close_section();
3016 f->open_object_section("fsmap");
3017
3018 FSMap fsmap_copy = mdsmon()->get_fsmap();
3019 if (!fs_names.empty()) {
3020 fsmap_copy.filter(fs_names);
3021 }
3022 const FSMap *fsmapp = &fsmap_copy;
3023
3024 fsmapp->print_summary(f, NULL);
3025 f->close_section();
3026 f->open_object_section("mgrmap");
3027 mgrmon()->get_map().print_summary(f, nullptr);
3028 f->close_section();
3029
3030 f->dump_object("servicemap", mgrstatmon()->get_service_map());
3031
3032 f->open_object_section("progress_events");
3033 for (auto& i : mgrstatmon()->get_progress_events()) {
3034 f->dump_object(i.first.c_str(), i.second);
3035 }
3036 f->close_section();
3037
3038 f->close_section();
3039 } else {
3040 ss << " cluster:\n";
3041 ss << " id: " << monmap->get_fsid() << "\n";
3042
3043 string health;
3044 healthmon()->get_health_status(false, nullptr, &health,
3045 "\n ", "\n ");
3046 ss << " health: " << health << "\n";
3047
3048 ss << "\n \n services:\n";
3049 {
3050 size_t maxlen = 3;
3051 auto& service_map = mgrstatmon()->get_service_map();
3052 for (auto& p : service_map.services) {
3053 maxlen = std::max(maxlen, p.first.size());
3054 }
3055 string spacing(maxlen - 3, ' ');
3056 const auto quorum_names = get_quorum_names();
3057 const auto mon_count = monmap->mon_info.size();
3058 auto mnow = ceph::mono_clock::now();
3059 ss << " mon: " << spacing << mon_count << " daemons, quorum "
3060 << quorum_names << " (age " << timespan_str(mnow - quorum_since) << ")";
3061 if (quorum_names.size() != mon_count) {
3062 std::list<std::string> out_of_q;
3063 for (size_t i = 0; i < monmap->ranks.size(); ++i) {
3064 if (quorum.count(i) == 0) {
3065 out_of_q.push_back(monmap->ranks[i]);
3066 }
3067 }
3068 ss << ", out of quorum: " << joinify(out_of_q.begin(),
3069 out_of_q.end(), std::string(", "));
3070 }
3071 ss << "\n";
3072 if (mgrmon()->in_use()) {
3073 ss << " mgr: " << spacing;
3074 mgrmon()->get_map().print_summary(nullptr, &ss);
3075 ss << "\n";
3076 }
3077
3078 FSMap fsmap_copy = mdsmon()->get_fsmap();
3079 if (!fs_names.empty()) {
3080 fsmap_copy.filter(fs_names);
3081 }
3082 const FSMap *fsmapp = &fsmap_copy;
3083
3084 if (fsmapp->filesystem_count() > 0 and mdsmon()->should_print_status()){
3085 ss << " mds: " << spacing;
3086 fsmapp->print_daemon_summary(ss);
3087 ss << "\n";
3088 }
3089
3090 ss << " osd: " << spacing;
3091 osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
3092 ss << "\n";
3093 for (auto& p : service_map.services) {
3094 const std::string &service = p.first;
3095 // filter out normal ceph entity types
3096 if (ServiceMap::is_normal_ceph_entity(service)) {
3097 continue;
3098 }
3099 ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
3100 << p.second.get_summary() << "\n";
3101 }
3102 }
3103
3104 if (auto& service_map = mgrstatmon()->get_service_map();
3105 std::any_of(service_map.services.begin(),
3106 service_map.services.end(),
3107 [](auto& service) {
3108 return service.second.has_running_tasks();
3109 })) {
3110 ss << "\n \n task status:\n";
3111 for (auto& [name, service] : service_map.services) {
3112 ss << service.get_task_summary(name);
3113 }
3114 }
3115
3116 ss << "\n \n data:\n";
3117 mdsmon()->print_fs_summary(ss);
3118 mgrstatmon()->print_summary(NULL, &ss);
3119
3120 auto& pem = mgrstatmon()->get_progress_events();
3121 if (!pem.empty()) {
3122 ss << "\n \n progress:\n";
3123 for (auto& i : pem) {
3124 if (i.second.add_to_ceph_s){
3125 ss << " " << i.second.message << "\n";
3126 }
3127 }
3128 }
3129 ss << "\n ";
3130 }
3131 }
3132
3133 void Monitor::_generate_command_map(cmdmap_t& cmdmap,
3134 map<string,string> &param_str_map)
3135 {
3136 for (auto p = cmdmap.begin(); p != cmdmap.end(); ++p) {
3137 if (p->first == "prefix")
3138 continue;
3139 if (p->first == "caps") {
3140 vector<string> cv;
3141 if (cmd_getval(cmdmap, "caps", cv) &&
3142 cv.size() % 2 == 0) {
3143 for (unsigned i = 0; i < cv.size(); i += 2) {
3144 string k = string("caps_") + cv[i];
3145 param_str_map[k] = cv[i + 1];
3146 }
3147 continue;
3148 }
3149 }
3150 param_str_map[p->first] = cmd_vartype_stringify(p->second);
3151 }
3152 }
3153
3154 const MonCommand *Monitor::_get_moncommand(
3155 const string &cmd_prefix,
3156 const vector<MonCommand>& cmds)
3157 {
3158 for (auto& c : cmds) {
3159 if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
3160 return &c;
3161 }
3162 }
3163 return nullptr;
3164 }
3165
3166 bool Monitor::_allowed_command(MonSession *s, const string &module,
3167 const string &prefix, const cmdmap_t& cmdmap,
3168 const map<string,string>& param_str_map,
3169 const MonCommand *this_cmd) {
3170
3171 bool cmd_r = this_cmd->requires_perm('r');
3172 bool cmd_w = this_cmd->requires_perm('w');
3173 bool cmd_x = this_cmd->requires_perm('x');
3174
3175 bool capable = s->caps.is_capable(
3176 g_ceph_context,
3177 s->entity_name,
3178 module, prefix, param_str_map,
3179 cmd_r, cmd_w, cmd_x,
3180 s->get_peer_socket_addr());
3181
3182 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
3183 return capable;
3184 }
3185
3186 void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
3187 Formatter *f,
3188 uint64_t features,
3189 bufferlist *rdata)
3190 {
3191 int cmdnum = 0;
3192 f->open_object_section("command_descriptions");
3193 for (const auto &cmd : commands) {
3194 unsigned flags = cmd.flags;
3195 ostringstream secname;
3196 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
3197 dump_cmddesc_to_json(f, features, secname.str(),
3198 cmd.cmdstring, cmd.helpstring, cmd.module,
3199 cmd.req_perms, flags);
3200 cmdnum++;
3201 }
3202 f->close_section(); // command_descriptions
3203
3204 f->flush(*rdata);
3205 }
3206
3207 bool Monitor::is_keyring_required()
3208 {
3209 return auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX) ||
3210 auth_service_required.is_supported_auth(CEPH_AUTH_CEPHX) ||
3211 auth_cluster_required.is_supported_auth(CEPH_AUTH_GSS) ||
3212 auth_service_required.is_supported_auth(CEPH_AUTH_GSS);
3213 }
3214
3215 struct C_MgrProxyCommand : public Context {
3216 Monitor *mon;
3217 MonOpRequestRef op;
3218 uint64_t size;
3219 bufferlist outbl;
3220 string outs;
3221 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
3222 : mon(mon), op(op), size(s) { }
3223 void finish(int r) {
3224 std::lock_guard l(mon->lock);
3225 mon->mgr_proxy_bytes -= size;
3226 mon->reply_command(op, r, outs, outbl, 0);
3227 }
3228 };
3229
3230 void Monitor::handle_tell_command(MonOpRequestRef op)
3231 {
3232 ceph_assert(op->is_type_command());
3233 MCommand *m = static_cast<MCommand*>(op->get_req());
3234 if (m->fsid != monmap->fsid) {
3235 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
3236 return reply_tell_command(op, -EACCES, "wrong fsid");
3237 }
3238 MonSession *session = op->get_session();
3239 if (!session) {
3240 dout(5) << __func__ << " dropping stray message " << *m << dendl;
3241 return;
3242 }
3243 cmdmap_t cmdmap;
3244 if (stringstream ss; !cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3245 return reply_tell_command(op, -EINVAL, ss.str());
3246 }
3247 map<string,string> param_str_map;
3248 _generate_command_map(cmdmap, param_str_map);
3249 string prefix;
3250 if (!cmd_getval(cmdmap, "prefix", prefix)) {
3251 return reply_tell_command(op, -EINVAL, "no prefix");
3252 }
3253 if (auto cmd = _get_moncommand(prefix,
3254 get_local_commands(quorum_mon_features));
3255 cmd) {
3256 if (cmd->is_obsolete() ||
3257 (cct->_conf->mon_debug_deprecated_as_obsolete &&
3258 cmd->is_deprecated())) {
3259 return reply_tell_command(op, -ENOTSUP,
3260 "command is obsolete; "
3261 "please check usage and/or man page");
3262 }
3263 }
3264 // see if command is allowed
3265 if (!session->caps.is_capable(
3266 g_ceph_context,
3267 session->entity_name,
3268 "mon", prefix, param_str_map,
3269 true, true, true,
3270 session->get_peer_socket_addr())) {
3271 return reply_tell_command(op, -EACCES, "insufficient caps");
3272 }
3273 // pass it to asok
3274 cct->get_admin_socket()->queue_tell_command(m);
3275 }
3276
3277 void Monitor::handle_command(MonOpRequestRef op)
3278 {
3279 ceph_assert(op->is_type_command());
3280 auto m = op->get_req<MMonCommand>();
3281 if (m->fsid != monmap->fsid) {
3282 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid
3283 << dendl;
3284 reply_command(op, -EPERM, "wrong fsid", 0);
3285 return;
3286 }
3287
3288 MonSession *session = op->get_session();
3289 if (!session) {
3290 dout(5) << __func__ << " dropping stray message " << *m << dendl;
3291 return;
3292 }
3293
3294 if (m->cmd.empty()) {
3295 reply_command(op, -EINVAL, "no command specified", 0);
3296 return;
3297 }
3298
3299 string prefix;
3300 vector<string> fullcmd;
3301 cmdmap_t cmdmap;
3302 stringstream ss, ds;
3303 bufferlist rdata;
3304 string rs;
3305 int r = -EINVAL;
3306 rs = "unrecognized command";
3307
3308 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3309 // ss has reason for failure
3310 r = -EINVAL;
3311 rs = ss.str();
3312 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3313 reply_command(op, r, rs, 0);
3314 return;
3315 }
3316
3317 // check return value. If no prefix parameter provided,
3318 // return value will be false, then return error info.
3319 if (!cmd_getval(cmdmap, "prefix", prefix)) {
3320 reply_command(op, -EINVAL, "command prefix not found", 0);
3321 return;
3322 }
3323
3324 // check prefix is empty
3325 if (prefix.empty()) {
3326 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
3327 return;
3328 }
3329
3330 if (prefix == "get_command_descriptions") {
3331 bufferlist rdata;
3332 Formatter *f = Formatter::create("json");
3333
3334 std::vector<MonCommand> commands = static_cast<MgrMonitor*>(
3335 paxos_service[PAXOS_MGR].get())->get_command_descs();
3336
3337 for (auto& c : leader_mon_commands) {
3338 commands.push_back(c);
3339 }
3340
3341 auto features = m->get_connection()->get_features();
3342 format_command_descriptions(commands, f, features, &rdata);
3343 delete f;
3344 reply_command(op, 0, "", rdata, 0);
3345 return;
3346 }
3347
3348 dout(0) << "handle_command " << *m << dendl;
3349
3350 string format = cmd_getval_or<string>(cmdmap, "format", "plain");
3351 boost::scoped_ptr<Formatter> f(Formatter::create(format));
3352
3353 get_str_vec(prefix, fullcmd);
3354
3355 // make sure fullcmd is not empty.
3356 // invalid prefix will cause empty vector fullcmd.
3357 // such as, prefix=";,,;"
3358 if (fullcmd.empty()) {
3359 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3360 return;
3361 }
3362
3363 std::string_view module = fullcmd[0];
3364
3365 // validate command is in leader map
3366
3367 const MonCommand *leader_cmd;
3368 const auto& mgr_cmds = mgrmon()->get_command_descs();
3369 const MonCommand *mgr_cmd = nullptr;
3370 if (!mgr_cmds.empty()) {
3371 mgr_cmd = _get_moncommand(prefix, mgr_cmds);
3372 }
3373 leader_cmd = _get_moncommand(prefix, leader_mon_commands);
3374 if (!leader_cmd) {
3375 leader_cmd = mgr_cmd;
3376 if (!leader_cmd) {
3377 reply_command(op, -EINVAL, "command not known", 0);
3378 return;
3379 }
3380 }
3381 // validate command is in our map & matches, or forward if it is allowed
3382 const MonCommand *mon_cmd = _get_moncommand(
3383 prefix,
3384 get_local_commands(quorum_mon_features));
3385 if (!mon_cmd) {
3386 mon_cmd = mgr_cmd;
3387 }
3388 if (!is_leader()) {
3389 if (!mon_cmd) {
3390 if (leader_cmd->is_noforward()) {
3391 reply_command(op, -EINVAL,
3392 "command not locally supported and not allowed to forward",
3393 0);
3394 return;
3395 }
3396 dout(10) << "Command not locally supported, forwarding request "
3397 << m << dendl;
3398 forward_request_leader(op);
3399 return;
3400 } else if (!mon_cmd->is_compat(leader_cmd)) {
3401 if (mon_cmd->is_noforward()) {
3402 reply_command(op, -EINVAL,
3403 "command not compatible with leader and not allowed to forward",
3404 0);
3405 return;
3406 }
3407 dout(10) << "Command not compatible with leader, forwarding request "
3408 << m << dendl;
3409 forward_request_leader(op);
3410 return;
3411 }
3412 }
3413
3414 if (mon_cmd->is_obsolete() ||
3415 (cct->_conf->mon_debug_deprecated_as_obsolete
3416 && mon_cmd->is_deprecated())) {
3417 reply_command(op, -ENOTSUP,
3418 "command is obsolete; please check usage and/or man page",
3419 0);
3420 return;
3421 }
3422
3423 if (session->proxy_con && mon_cmd->is_noforward()) {
3424 dout(10) << "Got forward for noforward command " << m << dendl;
3425 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3426 return;
3427 }
3428
3429 /* what we perceive as being the service the command falls under */
3430 string service(mon_cmd->module);
3431
3432 dout(25) << __func__ << " prefix='" << prefix
3433 << "' module='" << module
3434 << "' service='" << service << "'" << dendl;
3435
3436 bool cmd_is_rw =
3437 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3438
3439 // validate user's permissions for requested command
3440 map<string,string> param_str_map;
3441
3442 // Catch bad_cmd_get exception if _generate_command_map() throws it
3443 try {
3444 _generate_command_map(cmdmap, param_str_map);
3445 }
3446 catch(bad_cmd_get& e) {
3447 reply_command(op, -EINVAL, e.what(), 0);
3448 }
3449
3450 if (!_allowed_command(session, service, prefix, cmdmap,
3451 param_str_map, mon_cmd)) {
3452 dout(1) << __func__ << " access denied" << dendl;
3453 if (prefix != "config set" && prefix != "config-key set")
3454 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3455 << "from='" << session->name << " " << session->addrs << "' "
3456 << "entity='" << session->entity_name << "' "
3457 << "cmd=" << m->cmd << ": access denied";
3458 reply_command(op, -EACCES, "access denied", 0);
3459 return;
3460 }
3461
3462 if (prefix != "config set" && prefix != "config-key set")
3463 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3464 << "from='" << session->name << " " << session->addrs << "' "
3465 << "entity='" << session->entity_name << "' "
3466 << "cmd=" << m->cmd << ": dispatch";
3467
3468 // compat kludge for legacy clients trying to tell commands that are
3469 // new. see bottom of MonCommands.h. we need to handle both (1)
3470 // pre-octopus clients and (2) octopus clients with a mix of pre-octopus
3471 // and octopus mons.
3472 if ((!HAVE_FEATURE(m->get_connection()->get_features(), SERVER_OCTOPUS) ||
3473 monmap->min_mon_release < ceph_release_t::octopus) &&
3474 (prefix == "injectargs" ||
3475 prefix == "smart" ||
3476 prefix == "mon_status" ||
3477 prefix == "heap")) {
3478 if (m->get_connection()->get_messenger() == 0) {
3479 // Prior to octopus, monitors might forward these messages
3480 // around. that was broken at baseline, and if we try to process
3481 // this message now, it will assert out when we try to send a
3482 // message in reply from the asok/tell worker (see
3483 // AnonConnection). Just reply with an error.
3484 dout(5) << __func__ << " failing forwarded command from a (presumably) "
3485 << "pre-octopus peer" << dendl;
3486 reply_command(
3487 op, -EBUSY,
3488 "failing forwarded tell command in mixed-version mon cluster", 0);
3489 return;
3490 }
3491 dout(5) << __func__ << " passing command to tell/asok" << dendl;
3492 cct->get_admin_socket()->queue_tell_command(m);
3493 return;
3494 }
3495
3496 if (mon_cmd->is_mgr()) {
3497 const auto& hdr = m->get_header();
3498 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
3499 uint64_t max = g_conf().get_val<Option::size_t>("mon_client_bytes")
3500 * g_conf().get_val<double>("mon_mgr_proxy_client_bytes_ratio");
3501 if (mgr_proxy_bytes + size > max) {
3502 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3503 << " + " << size << " > max " << max << dendl;
3504 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3505 return;
3506 }
3507 mgr_proxy_bytes += size;
3508 dout(10) << __func__ << " proxying mgr command (+" << size
3509 << " -> " << mgr_proxy_bytes << ")" << dendl;
3510 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3511 mgr_client.start_command(m->cmd,
3512 m->get_data(),
3513 &fin->outbl,
3514 &fin->outs,
3515 new C_OnFinisher(fin, &finisher));
3516 return;
3517 }
3518
3519 if ((module == "mds" || module == "fs") &&
3520 prefix != "fs authorize") {
3521 mdsmon()->dispatch(op);
3522 return;
3523 }
3524 if ((module == "osd" ||
3525 prefix == "pg map" ||
3526 prefix == "pg repeer") &&
3527 prefix != "osd last-stat-seq") {
3528 osdmon()->dispatch(op);
3529 return;
3530 }
3531 if (module == "config") {
3532 configmon()->dispatch(op);
3533 return;
3534 }
3535
3536 if (module == "mon" &&
3537 /* Let the Monitor class handle the following commands:
3538 * 'mon scrub'
3539 */
3540 prefix != "mon scrub" &&
3541 prefix != "mon metadata" &&
3542 prefix != "mon versions" &&
3543 prefix != "mon count-metadata" &&
3544 prefix != "mon ok-to-stop" &&
3545 prefix != "mon ok-to-add-offline" &&
3546 prefix != "mon ok-to-rm") {
3547 monmon()->dispatch(op);
3548 return;
3549 }
3550 if (module == "health" && prefix != "health") {
3551 healthmon()->dispatch(op);
3552 return;
3553 }
3554 if (module == "auth" || prefix == "fs authorize") {
3555 authmon()->dispatch(op);
3556 return;
3557 }
3558 if (module == "log") {
3559 logmon()->dispatch(op);
3560 return;
3561 }
3562
3563 if (module == "config-key") {
3564 kvmon()->dispatch(op);
3565 return;
3566 }
3567
3568 if (module == "mgr") {
3569 mgrmon()->dispatch(op);
3570 return;
3571 }
3572
3573 if (prefix == "fsid") {
3574 if (f) {
3575 f->open_object_section("fsid");
3576 f->dump_stream("fsid") << monmap->fsid;
3577 f->close_section();
3578 f->flush(rdata);
3579 } else {
3580 ds << monmap->fsid;
3581 rdata.append(ds);
3582 }
3583 reply_command(op, 0, "", rdata, 0);
3584 return;
3585 }
3586
3587 if (prefix == "mon scrub") {
3588 wait_for_paxos_write();
3589 if (is_leader()) {
3590 int r = scrub_start();
3591 reply_command(op, r, "", rdata, 0);
3592 } else if (is_peon()) {
3593 forward_request_leader(op);
3594 } else {
3595 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3596 }
3597 return;
3598 }
3599
3600 if (prefix == "time-sync-status") {
3601 if (!f)
3602 f.reset(Formatter::create("json-pretty"));
3603 f->open_object_section("time_sync");
3604 if (!timecheck_skews.empty()) {
3605 f->open_object_section("time_skew_status");
3606 for (auto& i : timecheck_skews) {
3607 double skew = i.second;
3608 double latency = timecheck_latencies[i.first];
3609 string name = monmap->get_name(i.first);
3610 ostringstream tcss;
3611 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3612 f->open_object_section(name.c_str());
3613 f->dump_float("skew", skew);
3614 f->dump_float("latency", latency);
3615 f->dump_stream("health") << tcstatus;
3616 if (tcstatus != HEALTH_OK) {
3617 f->dump_stream("details") << tcss.str();
3618 }
3619 f->close_section();
3620 }
3621 f->close_section();
3622 }
3623 f->open_object_section("timechecks");
3624 f->dump_unsigned("epoch", get_epoch());
3625 f->dump_int("round", timecheck_round);
3626 f->dump_stream("round_status") << ((timecheck_round%2) ?
3627 "on-going" : "finished");
3628 f->close_section();
3629 f->close_section();
3630 f->flush(rdata);
3631 r = 0;
3632 rs = "";
3633 } else if (prefix == "status" ||
3634 prefix == "health" ||
3635 prefix == "df") {
3636 string detail;
3637 cmd_getval(cmdmap, "detail", detail);
3638
3639 if (prefix == "status") {
3640 // get_cluster_status handles f == NULL
3641 get_cluster_status(ds, f.get(), session);
3642
3643 if (f) {
3644 f->flush(ds);
3645 ds << '\n';
3646 }
3647 rdata.append(ds);
3648 } else if (prefix == "health") {
3649 string plain;
3650 healthmon()->get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
3651 if (f) {
3652 f->flush(ds);
3653 rdata.append(ds);
3654 } else {
3655 rdata.append(plain);
3656 }
3657 } else if (prefix == "df") {
3658 bool verbose = (detail == "detail");
3659 if (f)
3660 f->open_object_section("stats");
3661
3662 mgrstatmon()->dump_cluster_stats(&ds, f.get(), verbose);
3663 if (!f) {
3664 ds << "\n \n";
3665 }
3666 mgrstatmon()->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
3667
3668 if (f) {
3669 f->close_section();
3670 f->flush(ds);
3671 ds << '\n';
3672 }
3673 } else {
3674 ceph_abort_msg("We should never get here!");
3675 return;
3676 }
3677 rdata.append(ds);
3678 rs = "";
3679 r = 0;
3680 } else if (prefix == "report") {
3681
3682 // this must be formatted, in its current form
3683 if (!f)
3684 f.reset(Formatter::create("json-pretty"));
3685 f->open_object_section("report");
3686 f->dump_stream("cluster_fingerprint") << fingerprint;
3687 f->dump_string("version", ceph_version_to_str());
3688 f->dump_string("commit", git_version_to_str());
3689 f->dump_stream("timestamp") << ceph_clock_now();
3690
3691 vector<string> tagsvec;
3692 cmd_getval(cmdmap, "tags", tagsvec);
3693 string tagstr = str_join(tagsvec, " ");
3694 if (!tagstr.empty())
3695 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3696 f->dump_string("tag", tagstr);
3697
3698 healthmon()->get_health_status(true, f.get(), nullptr);
3699
3700 monmon()->dump_info(f.get());
3701 osdmon()->dump_info(f.get());
3702 mdsmon()->dump_info(f.get());
3703 authmon()->dump_info(f.get());
3704 mgrstatmon()->dump_info(f.get());
3705
3706 paxos->dump_info(f.get());
3707
3708 f->close_section();
3709 f->flush(rdata);
3710
3711 ostringstream ss2;
3712 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT_LEGACY);
3713 rs = ss2.str();
3714 r = 0;
3715 } else if (prefix == "osd last-stat-seq") {
3716 int64_t osd = 0;
3717 cmd_getval(cmdmap, "id", osd);
3718 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3719 if (f) {
3720 f->dump_unsigned("seq", seq);
3721 f->flush(ds);
3722 } else {
3723 ds << seq;
3724 rdata.append(ds);
3725 }
3726 rs = "";
3727 r = 0;
3728 } else if (prefix == "node ls") {
3729 string node_type("all");
3730 cmd_getval(cmdmap, "type", node_type);
3731 if (!f)
3732 f.reset(Formatter::create("json-pretty"));
3733 if (node_type == "all") {
3734 f->open_object_section("nodes");
3735 print_nodes(f.get(), ds);
3736 osdmon()->print_nodes(f.get());
3737 mdsmon()->print_nodes(f.get());
3738 mgrmon()->print_nodes(f.get());
3739 f->close_section();
3740 } else if (node_type == "mon") {
3741 print_nodes(f.get(), ds);
3742 } else if (node_type == "osd") {
3743 osdmon()->print_nodes(f.get());
3744 } else if (node_type == "mds") {
3745 mdsmon()->print_nodes(f.get());
3746 } else if (node_type == "mgr") {
3747 mgrmon()->print_nodes(f.get());
3748 }
3749 f->flush(ds);
3750 rdata.append(ds);
3751 rs = "";
3752 r = 0;
3753 } else if (prefix == "features") {
3754 if (!is_leader() && !is_peon()) {
3755 dout(10) << " waiting for quorum" << dendl;
3756 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3757 return;
3758 }
3759 if (!is_leader()) {
3760 forward_request_leader(op);
3761 return;
3762 }
3763 if (!f)
3764 f.reset(Formatter::create("json-pretty"));
3765 FeatureMap fm;
3766 get_combined_feature_map(&fm);
3767 f->dump_object("features", fm);
3768 f->flush(rdata);
3769 rs = "";
3770 r = 0;
3771 } else if (prefix == "mon metadata") {
3772 if (!f)
3773 f.reset(Formatter::create("json-pretty"));
3774
3775 string name;
3776 bool all = !cmd_getval(cmdmap, "id", name);
3777 if (!all) {
3778 // Dump a single mon's metadata
3779 int mon = monmap->get_rank(name);
3780 if (mon < 0) {
3781 rs = "requested mon not found";
3782 r = -ENOENT;
3783 goto out;
3784 }
3785 f->open_object_section("mon_metadata");
3786 r = get_mon_metadata(mon, f.get(), ds);
3787 f->close_section();
3788 } else {
3789 // Dump all mons' metadata
3790 r = 0;
3791 f->open_array_section("mon_metadata");
3792 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3793 std::ostringstream get_err;
3794 f->open_object_section("mon");
3795 f->dump_string("name", monmap->get_name(rank));
3796 r = get_mon_metadata(rank, f.get(), get_err);
3797 f->close_section();
3798 if (r == -ENOENT || r == -EINVAL) {
3799 dout(1) << get_err.str() << dendl;
3800 // Drop error, list what metadata we do have
3801 r = 0;
3802 } else if (r != 0) {
3803 derr << "Unexpected error from get_mon_metadata: "
3804 << cpp_strerror(r) << dendl;
3805 ds << get_err.str();
3806 break;
3807 }
3808 }
3809 f->close_section();
3810 }
3811
3812 f->flush(ds);
3813 rdata.append(ds);
3814 rs = "";
3815 } else if (prefix == "mon versions") {
3816 if (!f)
3817 f.reset(Formatter::create("json-pretty"));
3818 count_metadata("ceph_version", f.get());
3819 f->flush(ds);
3820 rdata.append(ds);
3821 rs = "";
3822 r = 0;
3823 } else if (prefix == "mon count-metadata") {
3824 if (!f)
3825 f.reset(Formatter::create("json-pretty"));
3826 string field;
3827 cmd_getval(cmdmap, "property", field);
3828 count_metadata(field, f.get());
3829 f->flush(ds);
3830 rdata.append(ds);
3831 rs = "";
3832 r = 0;
3833 } else if (prefix == "quorum_status") {
3834 // make sure our map is readable and up to date
3835 if (!is_leader() && !is_peon()) {
3836 dout(10) << " waiting for quorum" << dendl;
3837 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3838 return;
3839 }
3840 _quorum_status(f.get(), ds);
3841 rdata.append(ds);
3842 rs = "";
3843 r = 0;
3844 } else if (prefix == "mon ok-to-stop") {
3845 vector<string> ids;
3846 if (!cmd_getval(cmdmap, "ids", ids)) {
3847 r = -EINVAL;
3848 goto out;
3849 }
3850 set<string> wouldbe;
3851 for (auto rank : quorum) {
3852 wouldbe.insert(monmap->get_name(rank));
3853 }
3854 for (auto& n : ids) {
3855 if (monmap->contains(n)) {
3856 wouldbe.erase(n);
3857 }
3858 }
3859 if (wouldbe.size() < monmap->min_quorum_size()) {
3860 r = -EBUSY;
3861 rs = "not enough monitors would be available (" + stringify(wouldbe) +
3862 ") after stopping mons " + stringify(ids);
3863 goto out;
3864 }
3865 r = 0;
3866 rs = "quorum should be preserved (" + stringify(wouldbe) +
3867 ") after stopping " + stringify(ids);
3868 } else if (prefix == "mon ok-to-add-offline") {
3869 if (quorum.size() < monmap->min_quorum_size(monmap->size() + 1)) {
3870 rs = "adding a monitor may break quorum (until that monitor starts)";
3871 r = -EBUSY;
3872 goto out;
3873 }
3874 rs = "adding another mon that is not yet online will not break quorum";
3875 r = 0;
3876 } else if (prefix == "mon ok-to-rm") {
3877 string id;
3878 if (!cmd_getval(cmdmap, "id", id)) {
3879 r = -EINVAL;
3880 rs = "must specify a monitor id";
3881 goto out;
3882 }
3883 if (!monmap->contains(id)) {
3884 r = 0;
3885 rs = "mon." + id + " does not exist";
3886 goto out;
3887 }
3888 int rank = monmap->get_rank(id);
3889 if (quorum.count(rank) &&
3890 quorum.size() - 1 < monmap->min_quorum_size(monmap->size() - 1)) {
3891 r = -EBUSY;
3892 rs = "removing mon." + id + " would break quorum";
3893 goto out;
3894 }
3895 r = 0;
3896 rs = "safe to remove mon." + id;
3897 } else if (prefix == "version") {
3898 if (f) {
3899 f->open_object_section("version");
3900 f->dump_string("version", pretty_version_to_str());
3901 f->close_section();
3902 f->flush(ds);
3903 } else {
3904 ds << pretty_version_to_str();
3905 }
3906 rdata.append(ds);
3907 rs = "";
3908 r = 0;
3909 } else if (prefix == "versions") {
3910 if (!f)
3911 f.reset(Formatter::create("json-pretty"));
3912 map<string,int> overall;
3913 f->open_object_section("version");
3914 map<string,int> mon, mgr, osd, mds;
3915
3916 count_metadata("ceph_version", &mon);
3917 f->open_object_section("mon");
3918 for (auto& p : mon) {
3919 f->dump_int(p.first.c_str(), p.second);
3920 overall[p.first] += p.second;
3921 }
3922 f->close_section();
3923
3924 mgrmon()->count_metadata("ceph_version", &mgr);
3925 f->open_object_section("mgr");
3926 for (auto& p : mgr) {
3927 f->dump_int(p.first.c_str(), p.second);
3928 overall[p.first] += p.second;
3929 }
3930 f->close_section();
3931
3932 osdmon()->count_metadata("ceph_version", &osd);
3933 f->open_object_section("osd");
3934 for (auto& p : osd) {
3935 f->dump_int(p.first.c_str(), p.second);
3936 overall[p.first] += p.second;
3937 }
3938 f->close_section();
3939
3940 mdsmon()->count_metadata("ceph_version", &mds);
3941 f->open_object_section("mds");
3942 for (auto& p : mds) {
3943 f->dump_int(p.first.c_str(), p.second);
3944 overall[p.first] += p.second;
3945 }
3946 f->close_section();
3947
3948 for (auto& p : mgrstatmon()->get_service_map().services) {
3949 auto &service = p.first;
3950 if (ServiceMap::is_normal_ceph_entity(service)) {
3951 continue;
3952 }
3953 f->open_object_section(service.c_str());
3954 map<string,int> m;
3955 p.second.count_metadata("ceph_version", &m);
3956 for (auto& q : m) {
3957 f->dump_int(q.first.c_str(), q.second);
3958 overall[q.first] += q.second;
3959 }
3960 f->close_section();
3961 }
3962
3963 f->open_object_section("overall");
3964 for (auto& p : overall) {
3965 f->dump_int(p.first.c_str(), p.second);
3966 }
3967 f->close_section();
3968 f->close_section();
3969 f->flush(rdata);
3970 rs = "";
3971 r = 0;
3972 }
3973
3974 out:
3975 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3976 reply_command(op, r, rs, rdata, 0);
3977 }
3978
3979 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3980 {
3981 bufferlist rdata;
3982 reply_command(op, rc, rs, rdata, version);
3983 }
3984
3985 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3986 bufferlist& rdata, version_t version)
3987 {
3988 auto m = op->get_req<MMonCommand>();
3989 ceph_assert(m->get_type() == MSG_MON_COMMAND);
3990 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
3991 reply->set_tid(m->get_tid());
3992 reply->set_data(rdata);
3993 send_reply(op, reply);
3994 }
3995
3996 void Monitor::reply_tell_command(
3997 MonOpRequestRef op, int rc, const string &rs)
3998 {
3999 MCommand *m = static_cast<MCommand*>(op->get_req());
4000 ceph_assert(m->get_type() == MSG_COMMAND);
4001 MCommandReply *reply = new MCommandReply(rc, rs);
4002 reply->set_tid(m->get_tid());
4003 m->get_connection()->send_message(reply);
4004 }
4005
4006
4007 // ------------------------
4008 // request/reply routing
4009 //
// a client/mds/osd will connect to a random monitor.  we need to forward any
// messages requiring state updates to the leader, and then route any replies
// back to the client via the monitor it is connected to.  (the monitor will
// not initiate any connections.)
4014
/*
 * Forward the request carried by @op to the current leader.
 *
 * The request is wrapped in an MForward tagged with a fresh routed-request
 * tid.  A RoutedRequest is recorded in routed_requests (and its tid in the
 * session's routed_request_tids).  It holds:
 *  - an encoded copy of the request,
 *  - the client connection and its features,
 *  - a reference to the session.
 * This lets handle_route() deliver the eventual reply, and lets
 * resend_routed_requests() re-send the request.
 *
 * Nothing is forwarded in three cases:
 *  - the request came from another monitor (one whose addrs are not ours);
 *  - the session is itself proxied (no double forwarding);
 *  - the session is closed.
 */
void Monitor::forward_request_leader(MonOpRequestRef op)
{
  op->mark_event(__func__);

  int mon = get_leader();
  MonSession *session = op->get_session();
  PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();

  // a mon-originated request is only forwarded if it came from ourselves
  if (req->get_source().is_mon() && req->get_source_addrs() != messenger->get_myaddrs()) {
    dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
  } else if (session->proxy_con) {
    // this request already reached us through another monitor
    dout(10) << "forward_request won't double fwd request " << *req << dendl;
  } else if (!session->closed) {
    RoutedRequest *rr = new RoutedRequest;
    rr->tid = ++routed_request_tid;
    rr->con = req->get_connection();
    rr->con_features = rr->con->get_features();
    encode_message(req, CEPH_FEATURES_ALL, rr->request_bl);   // for my use only; use all features
    // the RoutedRequest holds its own reference on the session
    rr->session = static_cast<MonSession *>(session->get());
    rr->op = op;
    routed_requests[rr->tid] = rr;
    session->routed_request_tids.insert(rr->tid);

    dout(10) << "forward_request " << rr->tid << " request " << *req
             << " features " << rr->con_features << dendl;

    MForward *forward = new MForward(rr->tid,
                                     req,
                                     rr->con_features,
                                     rr->session->caps);
    forward->set_priority(req->get_priority());
    // pass along the authenticated identity when we have one; for a
    // mon-originated request without an auth handler only the entity type
    // is filled in
    if (session->auth_handler) {
      forward->entity_name = session->entity_name;
    } else if (req->get_source().is_mon()) {
      forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
    }
    send_mon_message(forward, mon);
    op->mark_forwarded();
    ceph_assert(op->get_req()->get_type() != 0);
  } else {
    dout(10) << "forward_request no session for request " << *req << dendl;
  }
}
4058
4059 // fake connection attached to forwarded messages
4060 struct AnonConnection : public Connection {
4061 entity_addr_t socket_addr;
4062
4063 int send_message(Message *m) override {
4064 ceph_assert(!"send_message on anonymous connection");
4065 }
4066 void send_keepalive() override {
4067 ceph_assert(!"send_keepalive on anonymous connection");
4068 }
4069 void mark_down() override {
4070 // silently ignore
4071 }
4072 void mark_disposable() override {
4073 // silengtly ignore
4074 }
4075 bool is_connected() override { return false; }
4076 entity_addr_t get_peer_socket_addr() const override {
4077 return socket_addr;
4078 }
4079
4080 private:
4081 FRIEND_MAKE_REF(AnonConnection);
4082 explicit AnonConnection(CephContext *cct, const entity_addr_t& sa)
4083 : Connection(cct, nullptr),
4084 socket_addr(sa) {}
4085 };
4086
/*
 * Handle an MForward sent by another monitor: extract the original client
 * request and run it through the regular dispatch path (_ms_dispatch()) as
 * if the client had sent it to us directly.
 *
 * The forwarding entity must have "mon x" caps; otherwise the message is
 * logged and dropped.
 *
 * The unwrapped request is attached to an AnonConnection that carries the
 * client's addrs, type and features.  A new MonSession is created for it (not
 * through session_map).  That session is marked authenticated, with the caps
 * and entity name supplied in the MForward.  Its proxy_con/proxy_tid point at
 * the forwarding connection and tid, so that send_reply()/no_reply() route
 * the answer back through the forwarder.
 */
void Monitor::handle_forward(MonOpRequestRef op)
{
  auto m = op->get_req<MForward>();
  dout(10) << "received forwarded message from "
           << ceph_entity_type_name(m->client_type)
           << " " << m->client_addrs
           << " via " << m->get_source_inst() << dendl;
  MonSession *session = op->get_session();
  ceph_assert(session);

  if (!session->is_capable("mon", MON_CAP_X)) {
    dout(0) << "forward from entity with insufficient caps! "
            << session->caps << dendl;
  } else {
    // see PaxosService::dispatch(); we rely on this being anon
    // (c->msgr == NULL)
    PaxosServiceMessage *req = m->claim_message();
    ceph_assert(req != NULL);

    auto c = ceph::make_ref<AnonConnection>(cct, m->client_socket_addr);
    MonSession *s = new MonSession(static_cast<Connection*>(c.get()));
    s->_ident(req->get_source(),
              req->get_source_addrs());
    // the connection adopts the session's initial reference
    // (add_ref == false)
    c->set_priv(RefCountedPtr{s, false});
    c->set_peer_addrs(m->client_addrs);
    c->set_peer_type(m->client_type);
    c->set_features(m->con_features);

    // trust the identity and caps vouched for by the forwarding monitor
    s->authenticated = true;
    s->caps = m->client_caps;
    dout(10) << " caps are " << s->caps << dendl;
    s->entity_name = m->entity_name;
    dout(10) << " entity name '" << s->entity_name << "' type "
             << s->entity_name.get_type() << dendl;
    // replies for this session go back through the forwarder
    s->proxy_con = m->get_connection();
    s->proxy_tid = m->tid;

    req->set_connection(c);

    // not super accurate, but better than nothing.
    req->set_recv_stamp(m->get_recv_stamp());

    /*
     * note which election epoch this is; we will drop the message if
     * there is a future election since our peers will resend routed
     * requests in that case.
     */
    req->rx_election_epoch = get_epoch();

    dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
    _ms_dispatch(req);

    // break the session <-> con ref loop by removing the con->session
    // reference, which is no longer needed once the MonOpRequest is
    // set up.
    c->set_priv(NULL);
  }
}
4146
4147 void Monitor::send_reply(MonOpRequestRef op, Message *reply)
4148 {
4149 op->mark_event(__func__);
4150
4151 MonSession *session = op->get_session();
4152 ceph_assert(session);
4153 Message *req = op->get_req();
4154 ConnectionRef con = op->get_connection();
4155
4156 reply->set_cct(g_ceph_context);
4157 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
4158
4159 if (!con) {
4160 dout(2) << "send_reply no connection, dropping reply " << *reply
4161 << " to " << req << " " << *req << dendl;
4162 reply->put();
4163 op->mark_event("reply: no connection");
4164 return;
4165 }
4166
4167 if (!session->con && !session->proxy_con) {
4168 dout(2) << "send_reply no connection, dropping reply " << *reply
4169 << " to " << req << " " << *req << dendl;
4170 reply->put();
4171 op->mark_event("reply: no connection");
4172 return;
4173 }
4174
4175 if (session->proxy_con) {
4176 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
4177 << " via " << session->proxy_con->get_peer_addr()
4178 << " for request " << *req << dendl;
4179 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
4180 op->mark_event("reply: send routed request");
4181 } else {
4182 session->con->send_message(reply);
4183 op->mark_event("reply: send");
4184 }
4185 }
4186
4187 void Monitor::no_reply(MonOpRequestRef op)
4188 {
4189 MonSession *session = op->get_session();
4190 Message *req = op->get_req();
4191
4192 if (session->proxy_con) {
4193 dout(10) << "no_reply to " << req->get_source_inst()
4194 << " via " << session->proxy_con->get_peer_addr()
4195 << " for request " << *req << dendl;
4196 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
4197 op->mark_event("no_reply: send routed request");
4198 } else {
4199 dout(10) << "no_reply to " << req->get_source_inst()
4200 << " " << *req << dendl;
4201 op->mark_event("no_reply");
4202 }
4203 }
4204
/*
 * Handle an MRoute: the answer to a request we forwarded with
 * forward_request_leader(), sent back by the monitor that processed it
 * (see send_reply()/no_reply()).
 *
 * If the MRoute carries a reply message, it is delivered on the original
 * client connection.  If send_osdmap_first is set, osdmaps from that epoch
 * are sent to the client session via osdmon()->send_incremental().  The
 * matching RoutedRequest is then freed and unregistered, whether or not a
 * message was carried.
 *
 * MRoutes are ignored in three cases:
 *  - the sender lacks "mon x" caps;
 *  - session_mon_tid is 0;
 *  - the tid is not in routed_requests.
 */
void Monitor::handle_route(MonOpRequestRef op)
{
  auto m = op->get_req<MRoute>();
  MonSession *session = op->get_session();
  //check privileges
  if (!session->is_capable("mon", MON_CAP_X)) {
    dout(0) << "MRoute received from entity without appropriate perms! "
            << dendl;
    return;
  }
  if (m->msg)
    dout(10) << "handle_route tid " << m->session_mon_tid << " " << *m->msg
             << dendl;
  else
    dout(10) << "handle_route tid " << m->session_mon_tid << " null" << dendl;

  // look it up
  if (!m->session_mon_tid) {
    dout(10) << " not a routed request, ignoring" << dendl;
    return;
  }
  auto found = routed_requests.find(m->session_mon_tid);
  if (found == routed_requests.end()) {
    dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
    return;
  }
  // take ownership: the RoutedRequest is freed when this function returns
  std::unique_ptr<RoutedRequest> rr{found->second};
  // reset payload, in case encoding is dependent on target features
  if (m->msg) {
    m->msg->clear_payload();
    rr->con->send_message(m->msg);
    // presumably so the MRoute no longer refers to (and releases) the
    // message that send_message() has taken over -- confirm in MRoute.h
    m->msg = NULL;
  }
  if (m->send_osdmap_first) {
    dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
    osdmon()->send_incremental(m->send_osdmap_first, rr->session,
                               true, MonOpRequestRef());
  }
  ceph_assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
  // unregister from both the global map and the owning session
  routed_requests.erase(found);
  rr->session->routed_request_tids.erase(m->session_mon_tid);
}
4247
4248 void Monitor::resend_routed_requests()
4249 {
4250 dout(10) << "resend_routed_requests" << dendl;
4251 int mon = get_leader();
4252 list<Context*> retry;
4253 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
4254 p != routed_requests.end();
4255 ++p) {
4256 RoutedRequest *rr = p->second;
4257
4258 if (mon == rank) {
4259 dout(10) << " requeue for self tid " << rr->tid << dendl;
4260 rr->op->mark_event("retry routed request");
4261 retry.push_back(new C_RetryMessage(this, rr->op));
4262 if (rr->session) {
4263 ceph_assert(rr->session->routed_request_tids.count(p->first));
4264 rr->session->routed_request_tids.erase(p->first);
4265 }
4266 delete rr;
4267 } else {
4268 auto q = rr->request_bl.cbegin();
4269 PaxosServiceMessage *req =
4270 (PaxosServiceMessage *)decode_message(cct, 0, q);
4271 rr->op->mark_event("resend forwarded message to leader");
4272 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req
4273 << dendl;
4274 MForward *forward = new MForward(rr->tid,
4275 req,
4276 rr->con_features,
4277 rr->session->caps);
4278 req->put(); // forward takes its own ref; drop ours.
4279 forward->client_type = rr->con->get_peer_type();
4280 forward->client_addrs = rr->con->get_peer_addrs();
4281 forward->client_socket_addr = rr->con->get_peer_socket_addr();
4282 forward->set_priority(req->get_priority());
4283 send_mon_message(forward, mon);
4284 }
4285 }
4286 if (mon == rank) {
4287 routed_requests.clear();
4288 finish_contexts(g_ceph_context, retry);
4289 }
4290 }
4291
4292 void Monitor::remove_session(MonSession *s)
4293 {
4294 dout(10) << "remove_session " << s << " " << s->name << " " << s->addrs
4295 << " features 0x" << std::hex << s->con_features << std::dec << dendl;
4296 ceph_assert(s->con);
4297 ceph_assert(!s->closed);
4298 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
4299 p != s->routed_request_tids.end();
4300 ++p) {
4301 ceph_assert(routed_requests.count(*p));
4302 RoutedRequest *rr = routed_requests[*p];
4303 dout(10) << " dropping routed request " << rr->tid << dendl;
4304 delete rr;
4305 routed_requests.erase(*p);
4306 }
4307 s->routed_request_tids.clear();
4308 s->con->set_priv(nullptr);
4309 session_map.remove_session(s);
4310 logger->set(l_mon_num_sessions, session_map.get_size());
4311 logger->inc(l_mon_session_rm);
4312 }
4313
4314 void Monitor::remove_all_sessions()
4315 {
4316 std::lock_guard l(session_map_lock);
4317 while (!session_map.sessions.empty()) {
4318 MonSession *s = session_map.sessions.front();
4319 remove_session(s);
4320 logger->inc(l_mon_session_rm);
4321 }
4322 if (logger)
4323 logger->set(l_mon_num_sessions, session_map.get_size());
4324 }
4325
4326 void Monitor::send_mon_message(Message *m, int rank)
4327 {
4328 messenger->send_to_mon(m, monmap->get_addrs(rank));
4329 }
4330
4331 void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
4332 {
4333 /**
4334 * Wait list the new session until we're in the quorum, assuming it's
4335 * sufficiently new.
4336 * tick() will periodically send them back through so we can send
4337 * the client elsewhere if we don't think we're getting back in.
4338 *
4339 * But we allow a few sorts of messages:
4340 * 1) Monitors can talk to us at any time, of course.
4341 * 2) auth messages. It's unlikely to go through much faster, but
4342 * it's possible we've just lost our quorum status and we want to take...
4343 * 3) command messages. We want to accept these under all possible
4344 * circumstances.
4345 */
4346 Message *m = op->get_req();
4347 MonSession *s = op->get_session();
4348 ConnectionRef con = op->get_connection();
4349 utime_t too_old = ceph_clock_now();
4350 too_old -= g_ceph_context->_conf->mon_lease;
4351 if (m->get_recv_stamp() > too_old &&
4352 con->is_connected()) {
4353 dout(5) << "waitlisting message " << *m << dendl;
4354 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
4355 op->mark_wait_for_quorum();
4356 } else {
4357 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
4358 con->mark_down();
4359 // proxied sessions aren't registered and don't have a con; don't remove
4360 // those.
4361 if (!s->proxy_con) {
4362 std::lock_guard l(session_map_lock);
4363 remove_session(s);
4364 }
4365 op->mark_zap();
4366 }
4367 }
4368
/*
 * Common dispatch path for incoming messages; also used by handle_forward()
 * for unwrapped forwarded requests.
 *
 * Steps:
 *  1. Wrap @m in a MonOpRequest.
 *  2. Find its MonSession, or create one.  A monitor peer's session is
 *     recreated when the connection's features changed.
 *  3. Refresh the session timeout and the identity cached from its
 *     auth_handler.
 *  4. Route the op:
 *     - while we are synchronizing, or for an unauthenticated session after
 *       we have exited quorum (exited_quorum set), non-monitor, non-ping
 *       messages go to waitlist_or_zap_client();
 *     - everything else goes to dispatch_op().
 *
 * The message is dropped when:
 *  - we are shut down;
 *  - its session is closed;
 *  - session_stretch_allowed() rejects it;
 *  - a non-monitor opens a new session with anything other than MAuth,
 *    MMonGetMap or MPing.
 */
void Monitor::_ms_dispatch(Message *m)
{
  if (is_shutdown()) {
    m->put();
    return;
  }

  MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
  bool src_is_mon = op->is_src_mon();
  op->mark_event("mon:_ms_dispatch");
  MonSession *s = op->get_session();
  if (s && s->closed) {
    return;
  }

  if (src_is_mon && s) {
    ConnectionRef con = m->get_connection();
    if (con->get_messenger() && con->get_features() != s->con_features) {
      // only update features if this is a non-anonymous connection
      dout(10) << __func__ << " feature change for " << m->get_source_inst()
               << " (was " << s->con_features
               << ", now " << con->get_features() << ")" << dendl;
      // connection features changed - recreate session.
      if (s->con && s->con != con) {
        dout(10) << __func__ << " connection for " << m->get_source_inst()
                 << " changed from session; mark down and replace" << dendl;
        s->con->mark_down();
      }
      if (s->item.is_on_list()) {
        // forwarded messages' sessions are not in the sessions map and
        // exist only while the op is being handled.
        std::lock_guard l(session_map_lock);
        remove_session(s);
      }
      // fall through to the "new session" path below
      s = nullptr;
    }
  }

  if (!s) {
    // if the sender is not a monitor, make sure their first message for a
    // session is an MAuth (MMonGetMap and MPing are also let through). If
    // it is not, assume it's a stray message, and considering that we are
    // creating a new session it is safe to assume that the sender hasn't
    // authenticated yet, so we have no way of assessing whether we should
    // handle it or not.
    if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
                        m->get_type() != CEPH_MSG_MON_GET_MAP &&
                        m->get_type() != CEPH_MSG_PING)) {
      dout(1) << __func__ << " dropping stray message " << *m
              << " from " << m->get_source_inst() << dendl;
      return;
    }

    ConnectionRef con = m->get_connection();
    {
      std::lock_guard l(session_map_lock);
      s = session_map.new_session(m->get_source(),
                                  m->get_source_addrs(),
                                  con.get());
    }
    ceph_assert(s);
    // the connection adopts the session reference (add_ref == false)
    con->set_priv(RefCountedPtr{s, false});
    dout(10) << __func__ << " new session " << s << " " << *s
             << " features 0x" << std::hex
             << s->con_features << std::dec << dendl;
    op->set_session(s);

    logger->set(l_mon_num_sessions, session_map.get_size());
    logger->inc(l_mon_session_add);

    if (src_is_mon) {
      // give it monitor caps; the peer type has been authenticated
      dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
      if (!s->caps.is_allow_all()) // but no need to repeatedly copy
        s->caps = mon_caps;
      s->authenticated = true;
    }
  } else {
    dout(20) << __func__ << " existing session " << s << " for " << s->name
             << dendl;
  }

  ceph_assert(s);

  // every message pushes the session's expiry forward
  s->session_timeout = ceph_clock_now();
  s->session_timeout += g_conf()->mon_session_timeout;

  // cache the identity established by the auth handler on the session
  if (s->auth_handler) {
    s->entity_name = s->auth_handler->get_entity_name();
    s->global_id = s->auth_handler->get_global_id();
    s->global_id_status = s->auth_handler->get_global_id_status();
  }
  dout(20) << " entity_name " << s->entity_name
           << " global_id " << s->global_id
           << " (" << s->global_id_status
           << ") caps " << s->caps.get_str() << dendl;

  if (!session_stretch_allowed(s, op)) {
    return;
  }
  if ((is_synchronizing() ||
       (!s->authenticated && !exited_quorum.is_zero())) &&
      !src_is_mon &&
      m->get_type() != CEPH_MSG_PING) {
    waitlist_or_zap_client(op);
  } else {
    dispatch_op(op);
  }
  return;
}
4478
4479 void Monitor::dispatch_op(MonOpRequestRef op)
4480 {
4481 op->mark_event("mon:dispatch_op");
4482 MonSession *s = op->get_session();
4483 ceph_assert(s);
4484 if (s->closed) {
4485 dout(10) << " session closed, dropping " << op->get_req() << dendl;
4486 return;
4487 }
4488
4489 /* we will consider the default type as being 'monitor' until proven wrong */
4490 op->set_type_monitor();
4491 /* deal with all messages that do not necessarily need caps */
4492 switch (op->get_req()->get_type()) {
4493 // auth
4494 case MSG_MON_GLOBAL_ID:
4495 case CEPH_MSG_AUTH:
4496 op->set_type_service();
4497 /* no need to check caps here */
4498 paxos_service[PAXOS_AUTH]->dispatch(op);
4499 return;
4500
4501 case CEPH_MSG_PING:
4502 handle_ping(op);
4503 return;
4504 case MSG_COMMAND:
4505 op->set_type_command();
4506 handle_tell_command(op);
4507 return;
4508 }
4509
4510 if (!op->get_session()->authenticated) {
4511 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4512 << " is not authenticated, dropping " << *(op->get_req())
4513 << dendl;
4514 return;
4515 }
4516
4517 // global_id_status == NONE: all sessions for auth_none and krb,
4518 // mon <-> mon sessions (including proxied sessions) for cephx
4519 ceph_assert(s->global_id_status == global_id_status_t::NONE ||
4520 s->global_id_status == global_id_status_t::NEW_OK ||
4521 s->global_id_status == global_id_status_t::NEW_NOT_EXPOSED ||
4522 s->global_id_status == global_id_status_t::RECLAIM_OK ||
4523 s->global_id_status == global_id_status_t::RECLAIM_INSECURE);
4524
4525 // let mon_getmap through for "ping" (which doesn't reconnect)
4526 // and "tell" (which reconnects but doesn't attempt to preserve
4527 // its global_id and stays in NEW_NOT_EXPOSED, retrying until
4528 // ->send_attempts reaches 0)
4529 if (cct->_conf->auth_expose_insecure_global_id_reclaim &&
4530 s->global_id_status == global_id_status_t::NEW_NOT_EXPOSED &&
4531 op->get_req()->get_type() != CEPH_MSG_MON_GET_MAP) {
4532 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4533 << " may omit old_ticket on reconnects, discarding "
4534 << *op->get_req() << " and forcing reconnect" << dendl;
4535 ceph_assert(s->con && !s->proxy_con);
4536 s->con->mark_down();
4537 {
4538 std::lock_guard l(session_map_lock);
4539 remove_session(s);
4540 }
4541 op->mark_zap();
4542 return;
4543 }
4544
4545 switch (op->get_req()->get_type()) {
4546 case CEPH_MSG_MON_GET_MAP:
4547 handle_mon_get_map(op);
4548 return;
4549
4550 case MSG_GET_CONFIG:
4551 configmon()->handle_get_config(op);
4552 return;
4553
4554 case CEPH_MSG_MON_SUBSCRIBE:
4555 /* FIXME: check what's being subscribed, filter accordingly */
4556 handle_subscribe(op);
4557 return;
4558 }
4559
4560 /* well, maybe the op belongs to a service... */
4561 op->set_type_service();
4562 /* deal with all messages which caps should be checked somewhere else */
4563 switch (op->get_req()->get_type()) {
4564
4565 // OSDs
4566 case CEPH_MSG_MON_GET_OSDMAP:
4567 case CEPH_MSG_POOLOP:
4568 case MSG_OSD_BEACON:
4569 case MSG_OSD_MARK_ME_DOWN:
4570 case MSG_OSD_MARK_ME_DEAD:
4571 case MSG_OSD_FULL:
4572 case MSG_OSD_FAILURE:
4573 case MSG_OSD_BOOT:
4574 case MSG_OSD_ALIVE:
4575 case MSG_OSD_PGTEMP:
4576 case MSG_OSD_PG_CREATED:
4577 case MSG_REMOVE_SNAPS:
4578 case MSG_MON_GET_PURGED_SNAPS:
4579 case MSG_OSD_PG_READY_TO_MERGE:
4580 paxos_service[PAXOS_OSDMAP]->dispatch(op);
4581 return;
4582
4583 // MDSs
4584 case MSG_MDS_BEACON:
4585 case MSG_MDS_OFFLOAD_TARGETS:
4586 paxos_service[PAXOS_MDSMAP]->dispatch(op);
4587 return;
4588
4589 // Mgrs
4590 case MSG_MGR_BEACON:
4591 paxos_service[PAXOS_MGR]->dispatch(op);
4592 return;
4593
4594 // MgrStat
4595 case MSG_MON_MGR_REPORT:
4596 case CEPH_MSG_STATFS:
4597 case MSG_GETPOOLSTATS:
4598 paxos_service[PAXOS_MGRSTAT]->dispatch(op);
4599 return;
4600
4601 // log
4602 case MSG_LOG:
4603 paxos_service[PAXOS_LOG]->dispatch(op);
4604 return;
4605
4606 // handle_command() does its own caps checking
4607 case MSG_MON_COMMAND:
4608 op->set_type_command();
4609 handle_command(op);
4610 return;
4611 }
4612
4613 /* nop, looks like it's not a service message; revert back to monitor */
4614 op->set_type_monitor();
4615
4616 /* messages we, the Monitor class, need to deal with
4617 * but may be sent by clients. */
4618
4619 if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
4620 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4621 << " not enough caps for " << *(op->get_req()) << " -- dropping"
4622 << dendl;
4623 return;
4624 }
4625
4626 switch (op->get_req()->get_type()) {
4627 // misc
4628 case CEPH_MSG_MON_GET_VERSION:
4629 handle_get_version(op);
4630 return;
4631 }
4632
4633 if (!op->is_src_mon()) {
4634 dout(1) << __func__ << " unexpected monitor message from"
4635 << " non-monitor entity " << op->get_req()->get_source_inst()
4636 << " " << *(op->get_req()) << " -- dropping" << dendl;
4637 return;
4638 }
4639
4640 /* messages that should only be sent by another monitor */
4641 switch (op->get_req()->get_type()) {
4642
4643 case MSG_ROUTE:
4644 handle_route(op);
4645 return;
4646
4647 case MSG_MON_PROBE:
4648 handle_probe(op);
4649 return;
4650
4651 // Sync (i.e., the new slurp, but on steroids)
4652 case MSG_MON_SYNC:
4653 handle_sync(op);
4654 return;
4655 case MSG_MON_SCRUB:
4656 handle_scrub(op);
4657 return;
4658
4659 /* log acks are sent from a monitor we sent the MLog to, and are
4660 never sent by clients to us. */
4661 case MSG_LOGACK:
4662 log_client.handle_log_ack((MLogAck*)op->get_req());
4663 return;
4664
4665 // monmap
4666 case MSG_MON_JOIN:
4667 op->set_type_service();
4668 paxos_service[PAXOS_MONMAP]->dispatch(op);
4669 return;
4670
4671 // paxos
4672 case MSG_MON_PAXOS:
4673 {
4674 op->set_type_paxos();
4675 auto pm = op->get_req<MMonPaxos>();
4676 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4677 //can't send these!
4678 return;
4679 }
4680
4681 if (state == STATE_SYNCHRONIZING) {
4682 // we are synchronizing. These messages would do us no
4683 // good, thus just drop them and ignore them.
4684 dout(10) << __func__ << " ignore paxos msg from "
4685 << pm->get_source_inst() << dendl;
4686 return;
4687 }
4688
4689 // sanitize
4690 if (pm->epoch > get_epoch()) {
4691 bootstrap();
4692 return;
4693 }
4694 if (pm->epoch != get_epoch()) {
4695 return;
4696 }
4697
4698 paxos->dispatch(op);
4699 }
4700 return;
4701
4702 // elector messages
4703 case MSG_MON_ELECTION:
4704 op->set_type_election_or_ping();
4705 //check privileges here for simplicity
4706 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4707 dout(0) << "MMonElection received from entity without enough caps!"
4708 << op->get_session()->caps << dendl;
4709 return;;
4710 }
4711 if (!is_probing() && !is_synchronizing()) {
4712 elector.dispatch(op);
4713 }
4714 return;
4715
4716 case MSG_MON_PING:
4717 op->set_type_election_or_ping();
4718 elector.dispatch(op);
4719 return;
4720
4721 case MSG_FORWARD:
4722 handle_forward(op);
4723 return;
4724
4725 case MSG_TIMECHECK:
4726 dout(5) << __func__ << " ignoring " << op << dendl;
4727 return;
4728 case MSG_TIMECHECK2:
4729 handle_timecheck(op);
4730 return;
4731
4732 case MSG_MON_HEALTH:
4733 dout(5) << __func__ << " dropping deprecated message: "
4734 << *op->get_req() << dendl;
4735 break;
4736 case MSG_MON_HEALTH_CHECKS:
4737 op->set_type_service();
4738 paxos_service[PAXOS_HEALTH]->dispatch(op);
4739 return;
4740 }
4741 dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
4742 return;
4743 }
4744
4745 void Monitor::handle_ping(MonOpRequestRef op)
4746 {
4747 auto m = op->get_req<MPing>();
4748 dout(10) << __func__ << " " << *m << dendl;
4749 MPing *reply = new MPing;
4750 bufferlist payload;
4751 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4752 f->open_object_section("pong");
4753
4754 healthmon()->get_health_status(false, f.get(), nullptr);
4755 get_mon_status(f.get());
4756
4757 f->close_section();
4758 stringstream ss;
4759 f->flush(ss);
4760 encode(ss.str(), payload);
4761 reply->set_payload(payload);
4762 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
4763 m->get_connection()->send_message(reply);
4764 }
4765
4766 void Monitor::timecheck_start()
4767 {
4768 dout(10) << __func__ << dendl;
4769 timecheck_cleanup();
4770 if (get_quorum_mon_features().contains_all(
4771 ceph::features::mon::FEATURE_NAUTILUS)) {
4772 timecheck_start_round();
4773 }
4774 }
4775
4776 void Monitor::timecheck_finish()
4777 {
4778 dout(10) << __func__ << dendl;
4779 timecheck_cleanup();
4780 }
4781
4782 void Monitor::timecheck_start_round()
4783 {
4784 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4785 ceph_assert(is_leader());
4786
4787 if (monmap->size() == 1) {
4788 ceph_abort_msg("We are alone; this shouldn't have been scheduled!");
4789 return;
4790 }
4791
4792 if (timecheck_round % 2) {
4793 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4794 utime_t curr_time = ceph_clock_now();
4795 double max = g_conf()->mon_timecheck_interval*3;
4796 if (curr_time - timecheck_round_start < max) {
4797 dout(10) << __func__ << " keep current round going" << dendl;
4798 goto out;
4799 } else {
4800 dout(10) << __func__
4801 << " finish current timecheck and start new" << dendl;
4802 timecheck_cancel_round();
4803 }
4804 }
4805
4806 ceph_assert(timecheck_round % 2 == 0);
4807 timecheck_acks = 0;
4808 timecheck_round ++;
4809 timecheck_round_start = ceph_clock_now();
4810 dout(10) << __func__ << " new " << timecheck_round << dendl;
4811
4812 timecheck();
4813 out:
4814 dout(10) << __func__ << " setting up next event" << dendl;
4815 timecheck_reset_event();
4816 }
4817
4818 void Monitor::timecheck_finish_round(bool success)
4819 {
4820 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4821 ceph_assert(timecheck_round % 2);
4822 timecheck_round ++;
4823 timecheck_round_start = utime_t();
4824
4825 if (success) {
4826 ceph_assert(timecheck_waiting.empty());
4827 ceph_assert(timecheck_acks == quorum.size());
4828 timecheck_report();
4829 timecheck_check_skews();
4830 return;
4831 }
4832
4833 dout(10) << __func__ << " " << timecheck_waiting.size()
4834 << " peers still waiting:";
4835 for (auto& p : timecheck_waiting) {
4836 *_dout << " mon." << p.first;
4837 }
4838 *_dout << dendl;
4839 timecheck_waiting.clear();
4840
4841 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4842 }
4843
4844 void Monitor::timecheck_cancel_round()
4845 {
4846 timecheck_finish_round(false);
4847 }
4848
4849 void Monitor::timecheck_cleanup()
4850 {
4851 timecheck_round = 0;
4852 timecheck_acks = 0;
4853 timecheck_round_start = utime_t();
4854
4855 if (timecheck_event) {
4856 timer.cancel_event(timecheck_event);
4857 timecheck_event = NULL;
4858 }
4859 timecheck_waiting.clear();
4860 timecheck_skews.clear();
4861 timecheck_latencies.clear();
4862
4863 timecheck_rounds_since_clean = 0;
4864 }
4865
4866 void Monitor::timecheck_reset_event()
4867 {
4868 if (timecheck_event) {
4869 timer.cancel_event(timecheck_event);
4870 timecheck_event = NULL;
4871 }
4872
4873 double delay =
4874 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4875
4876 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4877 delay = cct->_conf->mon_timecheck_interval;
4878 }
4879
4880 dout(10) << __func__ << " delay " << delay
4881 << " rounds_since_clean " << timecheck_rounds_since_clean
4882 << dendl;
4883
4884 timecheck_event = timer.add_event_after(
4885 delay,
4886 new C_MonContext{this, [this](int) {
4887 timecheck_start_round();
4888 }});
4889 }
4890
4891 void Monitor::timecheck_check_skews()
4892 {
4893 dout(10) << __func__ << dendl;
4894 ceph_assert(is_leader());
4895 ceph_assert((timecheck_round % 2) == 0);
4896 if (monmap->size() == 1) {
4897 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4898 return;
4899 }
4900 ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
4901
4902 bool found_skew = false;
4903 for (auto& p : timecheck_skews) {
4904 double abs_skew;
4905 if (timecheck_has_skew(p.second, &abs_skew)) {
4906 dout(10) << __func__
4907 << " " << p.first << " skew " << abs_skew << dendl;
4908 found_skew = true;
4909 }
4910 }
4911
4912 if (found_skew) {
4913 ++timecheck_rounds_since_clean;
4914 timecheck_reset_event();
4915 } else if (timecheck_rounds_since_clean > 0) {
4916 dout(1) << __func__
4917 << " no clock skews found after " << timecheck_rounds_since_clean
4918 << " rounds" << dendl;
4919 // make sure the skews are really gone and not just a transient success
4920 // this will run just once if not in the presence of skews again.
4921 timecheck_rounds_since_clean = 1;
4922 timecheck_reset_event();
4923 timecheck_rounds_since_clean = 0;
4924 }
4925
4926 }
4927
4928 void Monitor::timecheck_report()
4929 {
4930 dout(10) << __func__ << dendl;
4931 ceph_assert(is_leader());
4932 ceph_assert((timecheck_round % 2) == 0);
4933 if (monmap->size() == 1) {
4934 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4935 return;
4936 }
4937
4938 ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
4939 bool do_output = true; // only output report once
4940 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4941 if (monmap->get_name(*q) == name)
4942 continue;
4943
4944 MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_REPORT);
4945 m->epoch = get_epoch();
4946 m->round = timecheck_round;
4947
4948 for (auto& it : timecheck_skews) {
4949 double skew = it.second;
4950 double latency = timecheck_latencies[it.first];
4951
4952 m->skews[it.first] = skew;
4953 m->latencies[it.first] = latency;
4954
4955 if (do_output) {
4956 dout(25) << __func__ << " mon." << it.first
4957 << " latency " << latency
4958 << " skew " << skew << dendl;
4959 }
4960 }
4961 do_output = false;
4962 dout(10) << __func__ << " send report to mon." << *q << dendl;
4963 send_mon_message(m, *q);
4964 }
4965 }
4966
4967 void Monitor::timecheck()
4968 {
4969 dout(10) << __func__ << dendl;
4970 ceph_assert(is_leader());
4971 if (monmap->size() == 1) {
4972 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4973 return;
4974 }
4975 ceph_assert(timecheck_round % 2 != 0);
4976
4977 timecheck_acks = 1; // we ack ourselves
4978
4979 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4980 << " round " << timecheck_round << dendl;
4981
4982 // we are at the eye of the storm; the point of reference
4983 timecheck_skews[rank] = 0.0;
4984 timecheck_latencies[rank] = 0.0;
4985
4986 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4987 if (monmap->get_name(*it) == name)
4988 continue;
4989
4990 utime_t curr_time = ceph_clock_now();
4991 timecheck_waiting[*it] = curr_time;
4992 MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_PING);
4993 m->epoch = get_epoch();
4994 m->round = timecheck_round;
4995 dout(10) << __func__ << " send " << *m << " to mon." << *it << dendl;
4996 send_mon_message(m, *it);
4997 }
4998 }
4999
5000 health_status_t Monitor::timecheck_status(ostringstream &ss,
5001 const double skew_bound,
5002 const double latency)
5003 {
5004 health_status_t status = HEALTH_OK;
5005 ceph_assert(latency >= 0);
5006
5007 double abs_skew;
5008 if (timecheck_has_skew(skew_bound, &abs_skew)) {
5009 status = HEALTH_WARN;
5010 ss << "clock skew " << abs_skew << "s"
5011 << " > max " << g_conf()->mon_clock_drift_allowed << "s";
5012 }
5013
5014 return status;
5015 }
5016
5017 void Monitor::handle_timecheck_leader(MonOpRequestRef op)
5018 {
5019 auto m = op->get_req<MTimeCheck2>();
5020 dout(10) << __func__ << " " << *m << dendl;
5021 /* handles PONG's */
5022 ceph_assert(m->op == MTimeCheck2::OP_PONG);
5023
5024 int other = m->get_source().num();
5025 if (m->epoch < get_epoch()) {
5026 dout(1) << __func__ << " got old timecheck epoch " << m->epoch
5027 << " from " << other
5028 << " curr " << get_epoch()
5029 << " -- severely lagged? discard" << dendl;
5030 return;
5031 }
5032 ceph_assert(m->epoch == get_epoch());
5033
5034 if (m->round < timecheck_round) {
5035 dout(1) << __func__ << " got old round " << m->round
5036 << " from " << other
5037 << " curr " << timecheck_round << " -- discard" << dendl;
5038 return;
5039 }
5040
5041 utime_t curr_time = ceph_clock_now();
5042
5043 ceph_assert(timecheck_waiting.count(other) > 0);
5044 utime_t timecheck_sent = timecheck_waiting[other];
5045 timecheck_waiting.erase(other);
5046 if (curr_time < timecheck_sent) {
5047 // our clock was readjusted -- drop everything until it all makes sense.
5048 dout(1) << __func__ << " our clock was readjusted --"
5049 << " bump round and drop current check"
5050 << dendl;
5051 timecheck_cancel_round();
5052 return;
5053 }
5054
5055 /* update peer latencies */
5056 double latency = (double)(curr_time - timecheck_sent);
5057
5058 if (timecheck_latencies.count(other) == 0)
5059 timecheck_latencies[other] = latency;
5060 else {
5061 double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
5062 timecheck_latencies[other] = avg_latency;
5063 }
5064
5065 /*
5066 * update skews
5067 *
5068 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
5069 * and 'a' happens to be lower than 'b'; so we use double instead.
5070 *
5071 * latency is always expected to be >= 0.
5072 *
5073 * delta, the difference between theirs timestamp and ours, may either be
5074 * lower or higher than 0; will hardly ever be 0.
5075 *
5076 * The absolute skew is the absolute delta minus the latency, which is
5077 * taken as a whole instead of an rtt given that there is some queueing
5078 * and dispatch times involved and it's hard to assess how long exactly
5079 * it took for the message to travel to the other side and be handled. So
5080 * we call it a bounded skew, the worst case scenario.
5081 *
5082 * Now, to math!
5083 *
5084 * Given that the latency is always positive, we can establish that the
5085 * bounded skew will be:
5086 *
5087 * 1. positive if the absolute delta is higher than the latency and
5088 * delta is positive
5089 * 2. negative if the absolute delta is higher than the latency and
5090 * delta is negative.
5091 * 3. zero if the absolute delta is lower than the latency.
5092 *
5093 * On 3. we make a judgement call and treat the skew as non-existent.
5094 * This is because that, if the absolute delta is lower than the
5095 * latency, then the apparently existing skew is nothing more than a
5096 * side-effect of the high latency at work.
5097 *
5098 * This may not be entirely true though, as a severely skewed clock
5099 * may be masked by an even higher latency, but with high latencies
5100 * we probably have worse issues to deal with than just skewed clocks.
5101 */
5102 ceph_assert(latency >= 0);
5103
5104 double delta = ((double) m->timestamp) - ((double) curr_time);
5105 double abs_delta = (delta > 0 ? delta : -delta);
5106 double skew_bound = abs_delta - latency;
5107 if (skew_bound < 0)
5108 skew_bound = 0;
5109 else if (delta < 0)
5110 skew_bound = -skew_bound;
5111
5112 ostringstream ss;
5113 health_status_t status = timecheck_status(ss, skew_bound, latency);
5114 if (status != HEALTH_OK) {
5115 clog->health(status) << other << " " << ss.str();
5116 }
5117
5118 dout(10) << __func__ << " from " << other << " ts " << m->timestamp
5119 << " delta " << delta << " skew_bound " << skew_bound
5120 << " latency " << latency << dendl;
5121
5122 timecheck_skews[other] = skew_bound;
5123
5124 timecheck_acks++;
5125 if (timecheck_acks == quorum.size()) {
5126 dout(10) << __func__ << " got pongs from everybody ("
5127 << timecheck_acks << " total)" << dendl;
5128 ceph_assert(timecheck_skews.size() == timecheck_acks);
5129 ceph_assert(timecheck_waiting.empty());
5130 // everyone has acked, so bump the round to finish it.
5131 timecheck_finish_round();
5132 }
5133 }
5134
5135 void Monitor::handle_timecheck_peon(MonOpRequestRef op)
5136 {
5137 auto m = op->get_req<MTimeCheck2>();
5138 dout(10) << __func__ << " " << *m << dendl;
5139
5140 ceph_assert(is_peon());
5141 ceph_assert(m->op == MTimeCheck2::OP_PING || m->op == MTimeCheck2::OP_REPORT);
5142
5143 if (m->epoch != get_epoch()) {
5144 dout(1) << __func__ << " got wrong epoch "
5145 << "(ours " << get_epoch()
5146 << " theirs: " << m->epoch << ") -- discarding" << dendl;
5147 return;
5148 }
5149
5150 if (m->round < timecheck_round) {
5151 dout(1) << __func__ << " got old round " << m->round
5152 << " current " << timecheck_round
5153 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
5154 return;
5155 }
5156
5157 timecheck_round = m->round;
5158
5159 if (m->op == MTimeCheck2::OP_REPORT) {
5160 ceph_assert((timecheck_round % 2) == 0);
5161 timecheck_latencies.swap(m->latencies);
5162 timecheck_skews.swap(m->skews);
5163 return;
5164 }
5165
5166 ceph_assert((timecheck_round % 2) != 0);
5167 MTimeCheck2 *reply = new MTimeCheck2(MTimeCheck2::OP_PONG);
5168 utime_t curr_time = ceph_clock_now();
5169 reply->timestamp = curr_time;
5170 reply->epoch = m->epoch;
5171 reply->round = m->round;
5172 dout(10) << __func__ << " send " << *m
5173 << " to " << m->get_source_inst() << dendl;
5174 m->get_connection()->send_message(reply);
5175 }
5176
5177 void Monitor::handle_timecheck(MonOpRequestRef op)
5178 {
5179 auto m = op->get_req<MTimeCheck2>();
5180 dout(10) << __func__ << " " << *m << dendl;
5181
5182 if (is_leader()) {
5183 if (m->op != MTimeCheck2::OP_PONG) {
5184 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
5185 } else {
5186 handle_timecheck_leader(op);
5187 }
5188 } else if (is_peon()) {
5189 if (m->op != MTimeCheck2::OP_PING && m->op != MTimeCheck2::OP_REPORT) {
5190 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
5191 } else {
5192 handle_timecheck_peon(op);
5193 }
5194 } else {
5195 dout(1) << __func__ << " drop unexpected msg" << dendl;
5196 }
5197 }
5198
5199 void Monitor::handle_subscribe(MonOpRequestRef op)
5200 {
5201 auto m = op->get_req<MMonSubscribe>();
5202 dout(10) << "handle_subscribe " << *m << dendl;
5203
5204 bool reply = false;
5205
5206 MonSession *s = op->get_session();
5207 ceph_assert(s);
5208
5209 if (m->hostname.size()) {
5210 s->remote_host = m->hostname;
5211 }
5212
5213 for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
5214 p != m->what.end();
5215 ++p) {
5216 if (p->first == "monmap" || p->first == "config") {
5217 // these require no caps
5218 } else if (!s->is_capable("mon", MON_CAP_R)) {
5219 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
5220 << " not enough caps for " << *(op->get_req()) << " -- dropping"
5221 << dendl;
5222 continue;
5223 }
5224
5225 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
5226 if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
5227 reply = true;
5228
5229 // remove conflicting subscribes
5230 if (logmon()->sub_name_to_id(p->first) >= 0) {
5231 for (map<string, Subscription*>::iterator it = s->sub_map.begin();
5232 it != s->sub_map.end(); ) {
5233 if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
5234 std::lock_guard l(session_map_lock);
5235 session_map.remove_sub((it++)->second);
5236 } else {
5237 ++it;
5238 }
5239 }
5240 }
5241
5242 {
5243 std::lock_guard l(session_map_lock);
5244 session_map.add_update_sub(s, p->first, p->second.start,
5245 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
5246 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
5247 }
5248
5249 if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
5250 dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
5251 if ((int)s->is_capable("mds", MON_CAP_R)) {
5252 Subscription *sub = s->sub_map[p->first];
5253 ceph_assert(sub != nullptr);
5254 mdsmon()->check_sub(sub);
5255 }
5256 } else if (p->first == "osdmap") {
5257 if ((int)s->is_capable("osd", MON_CAP_R)) {
5258 if (s->osd_epoch > p->second.start) {
5259 // client needs earlier osdmaps on purpose, so reset the sent epoch
5260 s->osd_epoch = 0;
5261 }
5262 osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
5263 }
5264 } else if (p->first == "osd_pg_creates") {
5265 if ((int)s->is_capable("osd", MON_CAP_W)) {
5266 osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
5267 }
5268 } else if (p->first == "monmap") {
5269 monmon()->check_sub(s->sub_map[p->first]);
5270 } else if (logmon()->sub_name_to_id(p->first) >= 0) {
5271 logmon()->check_sub(s->sub_map[p->first]);
5272 } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
5273 mgrmon()->check_sub(s->sub_map[p->first]);
5274 } else if (p->first == "servicemap") {
5275 mgrstatmon()->check_sub(s->sub_map[p->first]);
5276 } else if (p->first == "config") {
5277 configmon()->check_sub(s);
5278 } else if (p->first.find("kv:") == 0) {
5279 kvmon()->check_sub(s->sub_map[p->first]);
5280 }
5281 }
5282
5283 if (reply) {
5284 // we only need to reply if the client is old enough to think it
5285 // has to send renewals.
5286 ConnectionRef con = m->get_connection();
5287 if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
5288 m->get_connection()->send_message(new MMonSubscribeAck(
5289 monmap->get_fsid(), (int)g_conf()->mon_subscribe_interval));
5290 }
5291
5292 }
5293
5294 void Monitor::handle_get_version(MonOpRequestRef op)
5295 {
5296 auto m = op->get_req<MMonGetVersion>();
5297 dout(10) << "handle_get_version " << *m << dendl;
5298 PaxosService *svc = NULL;
5299
5300 MonSession *s = op->get_session();
5301 ceph_assert(s);
5302
5303 if (!is_leader() && !is_peon()) {
5304 dout(10) << " waiting for quorum" << dendl;
5305 waitfor_quorum.push_back(new C_RetryMessage(this, op));
5306 goto out;
5307 }
5308
5309 if (m->what == "mdsmap") {
5310 svc = mdsmon();
5311 } else if (m->what == "fsmap") {
5312 svc = mdsmon();
5313 } else if (m->what == "osdmap") {
5314 svc = osdmon();
5315 } else if (m->what == "monmap") {
5316 svc = monmon();
5317 } else {
5318 derr << "invalid map type " << m->what << dendl;
5319 }
5320
5321 if (svc) {
5322 if (!svc->is_readable()) {
5323 svc->wait_for_readable(op, new C_RetryMessage(this, op));
5324 goto out;
5325 }
5326
5327 MMonGetVersionReply *reply = new MMonGetVersionReply();
5328 reply->handle = m->handle;
5329 reply->version = svc->get_last_committed();
5330 reply->oldest_version = svc->get_first_committed();
5331 reply->set_tid(m->get_tid());
5332
5333 m->get_connection()->send_message(reply);
5334 }
5335 out:
5336 return;
5337 }
5338
5339 bool Monitor::ms_handle_reset(Connection *con)
5340 {
5341 dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
5342
5343 // ignore lossless monitor sessions
5344 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
5345 return false;
5346
5347 auto priv = con->get_priv();
5348 auto s = static_cast<MonSession*>(priv.get());
5349 if (!s)
5350 return false;
5351
5352 // break any con <-> session ref cycle
5353 s->con->set_priv(nullptr);
5354
5355 if (is_shutdown())
5356 return false;
5357
5358 std::lock_guard l(lock);
5359
5360 dout(10) << "reset/close on session " << s->name << " " << s->addrs << dendl;
5361 if (!s->closed && s->item.is_on_list()) {
5362 std::lock_guard l(session_map_lock);
5363 remove_session(s);
5364 }
5365 return true;
5366 }
5367
5368 bool Monitor::ms_handle_refused(Connection *con)
5369 {
5370 // just log for now...
5371 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
5372 return false;
5373 }
5374
5375 // -----
5376
5377 void Monitor::send_latest_monmap(Connection *con)
5378 {
5379 bufferlist bl;
5380 monmap->encode(bl, con->get_features());
5381 con->send_message(new MMonMap(bl));
5382 }
5383
5384 void Monitor::handle_mon_get_map(MonOpRequestRef op)
5385 {
5386 auto m = op->get_req<MMonGetMap>();
5387 dout(10) << "handle_mon_get_map" << dendl;
5388 send_latest_monmap(m->get_connection().get());
5389 }
5390
5391 int Monitor::load_metadata()
5392 {
5393 bufferlist bl;
5394 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
5395 if (r)
5396 return r;
5397 auto it = bl.cbegin();
5398 decode(mon_metadata, it);
5399
5400 pending_metadata = mon_metadata;
5401 return 0;
5402 }
5403
5404 int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
5405 {
5406 ceph_assert(f);
5407 if (!mon_metadata.count(mon)) {
5408 err << "mon." << mon << " not found";
5409 return -EINVAL;
5410 }
5411 const Metadata& m = mon_metadata[mon];
5412 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
5413 f->dump_string(p->first.c_str(), p->second);
5414 }
5415 return 0;
5416 }
5417
5418 void Monitor::count_metadata(const string& field, map<string,int> *out)
5419 {
5420 for (auto& p : mon_metadata) {
5421 auto q = p.second.find(field);
5422 if (q == p.second.end()) {
5423 (*out)["unknown"]++;
5424 } else {
5425 (*out)[q->second]++;
5426 }
5427 }
5428 }
5429
5430 void Monitor::count_metadata(const string& field, Formatter *f)
5431 {
5432 map<string,int> by_val;
5433 count_metadata(field, &by_val);
5434 f->open_object_section(field.c_str());
5435 for (auto& p : by_val) {
5436 f->dump_int(p.first.c_str(), p.second);
5437 }
5438 f->close_section();
5439 }
5440
5441 void Monitor::get_all_versions(std::map<string, list<string> > &versions)
5442 {
5443 // mon
5444 get_versions(versions);
5445 // osd
5446 osdmon()->get_versions(versions);
5447 // mgr
5448 mgrmon()->get_versions(versions);
5449 // mds
5450 mdsmon()->get_versions(versions);
5451 dout(20) << __func__ << " all versions=" << versions << dendl;
5452 }
5453
5454 void Monitor::get_versions(std::map<string, list<string> > &versions)
5455 {
5456 for (auto& [rank, metadata] : mon_metadata) {
5457 auto q = metadata.find("ceph_version_short");
5458 if (q == metadata.end()) {
5459 // not likely
5460 continue;
5461 }
5462 versions[q->second].push_back(string("mon.") + monmap->get_name(rank));
5463 }
5464 }
5465
5466 int Monitor::print_nodes(Formatter *f, ostream& err)
5467 {
5468 map<string, list<string> > mons; // hostname => mon
5469 for (map<int, Metadata>::iterator it = mon_metadata.begin();
5470 it != mon_metadata.end(); ++it) {
5471 const Metadata& m = it->second;
5472 Metadata::const_iterator hostname = m.find("hostname");
5473 if (hostname == m.end()) {
5474 // not likely though
5475 continue;
5476 }
5477 mons[hostname->second].push_back(monmap->get_name(it->first));
5478 }
5479
5480 dump_services(f, mons, "mon");
5481 return 0;
5482 }
5483
5484 // ----------------------------------------------
5485 // scrub
5486
5487 int Monitor::scrub_start()
5488 {
5489 dout(10) << __func__ << dendl;
5490 ceph_assert(is_leader());
5491
5492 if (!scrub_result.empty()) {
5493 clog->info() << "scrub already in progress";
5494 return -EBUSY;
5495 }
5496
5497 scrub_event_cancel();
5498 scrub_result.clear();
5499 scrub_state.reset(new ScrubState);
5500
5501 scrub();
5502 return 0;
5503 }
5504
5505 int Monitor::scrub()
5506 {
5507 ceph_assert(is_leader());
5508 ceph_assert(scrub_state);
5509
5510 scrub_cancel_timeout();
5511 wait_for_paxos_write();
5512 scrub_version = paxos->get_version();
5513
5514
5515 // scrub all keys if we're the only monitor in the quorum
5516 int32_t num_keys =
5517 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5518
5519 for (set<int>::iterator p = quorum.begin();
5520 p != quorum.end();
5521 ++p) {
5522 if (*p == rank)
5523 continue;
5524 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5525 num_keys);
5526 r->key = scrub_state->last_key;
5527 send_mon_message(r, *p);
5528 }
5529
5530 // scrub my keys
5531 bool r = _scrub(&scrub_result[rank],
5532 &scrub_state->last_key,
5533 &num_keys);
5534
5535 scrub_state->finished = !r;
5536
5537 // only after we got our scrub results do we really care whether the
5538 // other monitors are late on their results. Also, this way we avoid
5539 // triggering the timeout if we end up getting stuck in _scrub() for
5540 // longer than the duration of the timeout.
5541 scrub_reset_timeout();
5542
5543 if (quorum.size() == 1) {
5544 ceph_assert(scrub_state->finished == true);
5545 scrub_finish();
5546 }
5547 return 0;
5548 }
5549
5550 void Monitor::handle_scrub(MonOpRequestRef op)
5551 {
5552 auto m = op->get_req<MMonScrub>();
5553 dout(10) << __func__ << " " << *m << dendl;
5554 switch (m->op) {
5555 case MMonScrub::OP_SCRUB:
5556 {
5557 if (!is_peon())
5558 break;
5559
5560 wait_for_paxos_write();
5561
5562 if (m->version != paxos->get_version())
5563 break;
5564
5565 MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
5566 m->version,
5567 m->num_keys);
5568
5569 reply->key = m->key;
5570 _scrub(&reply->result, &reply->key, &reply->num_keys);
5571 m->get_connection()->send_message(reply);
5572 }
5573 break;
5574
5575 case MMonScrub::OP_RESULT:
5576 {
5577 if (!is_leader())
5578 break;
5579 if (m->version != scrub_version)
5580 break;
5581 // reset the timeout each time we get a result
5582 scrub_reset_timeout();
5583
5584 int from = m->get_source().num();
5585 ceph_assert(scrub_result.count(from) == 0);
5586 scrub_result[from] = m->result;
5587
5588 if (scrub_result.size() == quorum.size()) {
5589 scrub_check_results();
5590 scrub_result.clear();
5591 if (scrub_state->finished)
5592 scrub_finish();
5593 else
5594 scrub();
5595 }
5596 }
5597 break;
5598 }
5599 }
5600
5601 bool Monitor::_scrub(ScrubResult *r,
5602 pair<string,string> *start,
5603 int *num_keys)
5604 {
5605 ceph_assert(r != NULL);
5606 ceph_assert(start != NULL);
5607 ceph_assert(num_keys != NULL);
5608
5609 set<string> prefixes = get_sync_targets_names();
5610 prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5611
5612 dout(10) << __func__ << " start (" << *start << ")"
5613 << " num_keys " << *num_keys << dendl;
5614
5615 MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
5616
5617 int scrubbed_keys = 0;
5618 pair<string,string> last_key;
5619
5620 while (it->has_next_chunk()) {
5621
5622 if (*num_keys > 0 && scrubbed_keys == *num_keys)
5623 break;
5624
5625 pair<string,string> k = it->get_next_key();
5626 if (prefixes.count(k.first) == 0)
5627 continue;
5628
5629 if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
5630 (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
5631 dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
5632 << dendl;
5633 continue;
5634 }
5635
5636 bufferlist bl;
5637 int err = store->get(k.first, k.second, bl);
5638 ceph_assert(err == 0);
5639
5640 uint32_t key_crc = bl.crc32c(0);
5641 dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
5642 << " crc " << key_crc << dendl;
5643 r->prefix_keys[k.first]++;
5644 if (r->prefix_crc.count(k.first) == 0) {
5645 r->prefix_crc[k.first] = 0;
5646 }
5647 r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
5648
5649 if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
5650 (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
5651 dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
5652 r->prefix_crc[k.first] += 1;
5653 }
5654
5655 ++scrubbed_keys;
5656 last_key = k;
5657 }
5658
5659 dout(20) << __func__ << " last_key (" << last_key << ")"
5660 << " scrubbed_keys " << scrubbed_keys
5661 << " has_next " << it->has_next_chunk() << dendl;
5662
5663 *start = last_key;
5664 *num_keys = scrubbed_keys;
5665
5666 return it->has_next_chunk();
5667 }
5668
5669 void Monitor::scrub_check_results()
5670 {
5671 dout(10) << __func__ << dendl;
5672
5673 // compare
5674 int errors = 0;
5675 ScrubResult& mine = scrub_result[rank];
5676 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5677 p != scrub_result.end();
5678 ++p) {
5679 if (p->first == rank)
5680 continue;
5681 if (p->second != mine) {
5682 ++errors;
5683 clog->error() << "scrub mismatch";
5684 clog->error() << " mon." << rank << " " << mine;
5685 clog->error() << " mon." << p->first << " " << p->second;
5686 }
5687 }
5688 if (!errors)
5689 clog->debug() << "scrub ok on " << quorum << ": " << mine;
5690 }
5691
5692 inline void Monitor::scrub_timeout()
5693 {
5694 dout(1) << __func__ << " restarting scrub" << dendl;
5695 scrub_reset();
5696 scrub_start();
5697 }
5698
5699 void Monitor::scrub_finish()
5700 {
5701 dout(10) << __func__ << dendl;
5702 scrub_reset();
5703 scrub_event_start();
5704 }
5705
5706 void Monitor::scrub_reset()
5707 {
5708 dout(10) << __func__ << dendl;
5709 scrub_cancel_timeout();
5710 scrub_version = 0;
5711 scrub_result.clear();
5712 scrub_state.reset();
5713 }
5714
5715 inline void Monitor::scrub_update_interval(ceph::timespan interval)
5716 {
5717 // we don't care about changes if we are not the leader.
5718 // changes will be visible if we become the leader.
5719 if (!is_leader())
5720 return;
5721
5722 dout(1) << __func__ << " new interval = " << interval << dendl;
5723
5724 // if scrub already in progress, all changes will already be visible during
5725 // the next round. Nothing to do.
5726 if (scrub_state != NULL)
5727 return;
5728
5729 scrub_event_cancel();
5730 scrub_event_start();
5731 }
5732
5733 void Monitor::scrub_event_start()
5734 {
5735 dout(10) << __func__ << dendl;
5736
5737 if (scrub_event)
5738 scrub_event_cancel();
5739
5740 auto scrub_interval =
5741 cct->_conf.get_val<std::chrono::seconds>("mon_scrub_interval");
5742 if (scrub_interval == std::chrono::seconds::zero()) {
5743 dout(1) << __func__ << " scrub event is disabled"
5744 << " (mon_scrub_interval = " << scrub_interval
5745 << ")" << dendl;
5746 return;
5747 }
5748
5749 scrub_event = timer.add_event_after(
5750 scrub_interval,
5751 new C_MonContext{this, [this](int) {
5752 scrub_start();
5753 }});
5754 }
5755
5756 void Monitor::scrub_event_cancel()
5757 {
5758 dout(10) << __func__ << dendl;
5759 if (scrub_event) {
5760 timer.cancel_event(scrub_event);
5761 scrub_event = NULL;
5762 }
5763 }
5764
5765 inline void Monitor::scrub_cancel_timeout()
5766 {
5767 if (scrub_timeout_event) {
5768 timer.cancel_event(scrub_timeout_event);
5769 scrub_timeout_event = NULL;
5770 }
5771 }
5772
5773 void Monitor::scrub_reset_timeout()
5774 {
5775 dout(15) << __func__ << " reset timeout event" << dendl;
5776 scrub_cancel_timeout();
5777 scrub_timeout_event = timer.add_event_after(
5778 g_conf()->mon_scrub_timeout,
5779 new C_MonContext{this, [this](int) {
5780 scrub_timeout();
5781 }});
5782 }
5783
5784 /************ TICK ***************/
5785 void Monitor::new_tick()
5786 {
5787 timer.add_event_after(g_conf()->mon_tick_interval, new C_MonContext{this, [this](int) {
5788 tick();
5789 }});
5790 }
5791
5792 void Monitor::tick()
5793 {
5794 // ok go.
5795 dout(11) << "tick" << dendl;
5796 const utime_t now = ceph_clock_now();
5797
5798 // Check if we need to emit any delayed health check updated messages
5799 if (is_leader()) {
5800 const auto min_period = g_conf().get_val<int64_t>(
5801 "mon_health_log_update_period");
5802 for (auto& svc : paxos_service) {
5803 auto health = svc->get_health_checks();
5804
5805 for (const auto &i : health.checks) {
5806 const std::string &code = i.first;
5807 const std::string &summary = i.second.summary;
5808 const health_status_t severity = i.second.severity;
5809
5810 auto status_iter = health_check_log_times.find(code);
5811 if (status_iter == health_check_log_times.end()) {
5812 continue;
5813 }
5814
5815 auto &log_status = status_iter->second;
5816 bool const changed = log_status.last_message != summary
5817 || log_status.severity != severity;
5818
5819 if (changed && now - log_status.updated_at > min_period) {
5820 log_status.last_message = summary;
5821 log_status.updated_at = now;
5822 log_status.severity = severity;
5823
5824 ostringstream ss;
5825 ss << "Health check update: " << summary << " (" << code << ")";
5826 clog->health(severity) << ss.str();
5827 }
5828 }
5829 }
5830 }
5831
5832
5833 for (auto& svc : paxos_service) {
5834 svc->tick();
5835 svc->maybe_trim();
5836 }
5837
5838 // trim sessions
5839 {
5840 std::lock_guard l(session_map_lock);
5841 auto p = session_map.sessions.begin();
5842
5843 bool out_for_too_long = (!exited_quorum.is_zero() &&
5844 now > (exited_quorum + 2*g_conf()->mon_lease));
5845
5846 while (!p.end()) {
5847 MonSession *s = *p;
5848 ++p;
5849
5850 // don't trim monitors
5851 if (s->name.is_mon())
5852 continue;
5853
5854 if (s->session_timeout < now && s->con) {
5855 // check keepalive, too
5856 s->session_timeout = s->con->get_last_keepalive();
5857 s->session_timeout += g_conf()->mon_session_timeout;
5858 }
5859 if (s->session_timeout < now) {
5860 dout(10) << " trimming session " << s->con << " " << s->name
5861 << " " << s->addrs
5862 << " (timeout " << s->session_timeout
5863 << " < now " << now << ")" << dendl;
5864 } else if (out_for_too_long) {
5865 // boot the client Session because we've taken too long getting back in
5866 dout(10) << " trimming session " << s->con << " " << s->name
5867 << " because we've been out of quorum too long" << dendl;
5868 } else {
5869 continue;
5870 }
5871
5872 s->con->mark_down();
5873 remove_session(s);
5874 logger->inc(l_mon_session_trim);
5875 }
5876 }
5877 sync_trim_providers();
5878
5879 if (!maybe_wait_for_quorum.empty()) {
5880 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5881 }
5882
5883 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5884 // this is only necessary on upgraded clusters.
5885 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5886 prepare_new_fingerprint(t);
5887 paxos->trigger_propose();
5888 }
5889
5890 mgr_client.update_daemon_health(get_health_metrics());
5891 new_tick();
5892 }
5893
5894 vector<DaemonHealthMetric> Monitor::get_health_metrics()
5895 {
5896 vector<DaemonHealthMetric> metrics;
5897
5898 utime_t oldest_secs;
5899 const utime_t now = ceph_clock_now();
5900 auto too_old = now;
5901 too_old -= g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count();
5902 int slow = 0;
5903 TrackedOpRef oldest_op;
5904 auto count_slow_ops = [&](TrackedOp& op) {
5905 if (op.get_initiated() < too_old) {
5906 slow++;
5907 if (!oldest_op || op.get_initiated() < oldest_op->get_initiated()) {
5908 oldest_op = &op;
5909 }
5910 return true;
5911 } else {
5912 return false;
5913 }
5914 };
5915 if (op_tracker.visit_ops_in_flight(&oldest_secs, count_slow_ops)) {
5916 if (slow) {
5917 derr << __func__ << " reporting " << slow << " slow ops, oldest is "
5918 << oldest_op->get_desc() << dendl;
5919 }
5920 metrics.emplace_back(daemon_metric::SLOW_OPS, slow, oldest_secs);
5921 } else {
5922 metrics.emplace_back(daemon_metric::SLOW_OPS, 0, 0);
5923 }
5924 return metrics;
5925 }
5926
5927 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5928 {
5929 uuid_d nf;
5930 nf.generate_random();
5931 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5932
5933 bufferlist bl;
5934 encode(nf, bl);
5935 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5936 }
5937
5938 int Monitor::check_fsid()
5939 {
5940 bufferlist ebl;
5941 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5942 if (r == -ENOENT)
5943 return r;
5944 ceph_assert(r == 0);
5945
5946 string es(ebl.c_str(), ebl.length());
5947
5948 // only keep the first line
5949 size_t pos = es.find_first_of('\n');
5950 if (pos != string::npos)
5951 es.resize(pos);
5952
5953 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5954 uuid_d ondisk;
5955 if (!ondisk.parse(es.c_str())) {
5956 derr << "error: unable to parse uuid" << dendl;
5957 return -EINVAL;
5958 }
5959
5960 if (monmap->get_fsid() != ondisk) {
5961 derr << "error: cluster_uuid file exists with value " << ondisk
5962 << ", != our uuid " << monmap->get_fsid() << dendl;
5963 return -EEXIST;
5964 }
5965
5966 return 0;
5967 }
5968
5969 int Monitor::write_fsid()
5970 {
5971 auto t(std::make_shared<MonitorDBStore::Transaction>());
5972 write_fsid(t);
5973 int r = store->apply_transaction(t);
5974 return r;
5975 }
5976
5977 int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5978 {
5979 ostringstream ss;
5980 ss << monmap->get_fsid() << "\n";
5981 string us = ss.str();
5982
5983 bufferlist b;
5984 b.append(us);
5985
5986 t->put(MONITOR_NAME, "cluster_uuid", b);
5987 return 0;
5988 }
5989
5990 /*
5991 * this is the closest thing to a traditional 'mkfs' for ceph.
5992 * initialize the monitor state machines to their initial values.
5993 */
5994 int Monitor::mkfs(bufferlist& osdmapbl)
5995 {
5996 auto t(std::make_shared<MonitorDBStore::Transaction>());
5997
5998 // verify cluster fsid
5999 int r = check_fsid();
6000 if (r < 0 && r != -ENOENT)
6001 return r;
6002
6003 bufferlist magicbl;
6004 magicbl.append(CEPH_MON_ONDISK_MAGIC);
6005 magicbl.append("\n");
6006 t->put(MONITOR_NAME, "magic", magicbl);
6007
6008
6009 features = get_initial_supported_features();
6010 write_features(t);
6011
6012 // save monmap, osdmap, keyring.
6013 bufferlist monmapbl;
6014 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
6015 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
6016 t->put("mkfs", "monmap", monmapbl);
6017
6018 if (osdmapbl.length()) {
6019 // make sure it's a valid osdmap
6020 try {
6021 OSDMap om;
6022 om.decode(osdmapbl);
6023 }
6024 catch (ceph::buffer::error& e) {
6025 derr << "error decoding provided osdmap: " << e.what() << dendl;
6026 return -EINVAL;
6027 }
6028 t->put("mkfs", "osdmap", osdmapbl);
6029 }
6030
6031 if (is_keyring_required()) {
6032 KeyRing keyring;
6033 string keyring_filename;
6034
6035 r = ceph_resolve_file_search(g_conf()->keyring, keyring_filename);
6036 if (r) {
6037 if (g_conf()->key != "") {
6038 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf()->key +
6039 "\n\tcaps mon = \"allow *\"\n";
6040 bufferlist bl;
6041 bl.append(keyring_plaintext);
6042 try {
6043 auto i = bl.cbegin();
6044 keyring.decode(i);
6045 }
6046 catch (const ceph::buffer::error& e) {
6047 derr << "error decoding keyring " << keyring_plaintext
6048 << ": " << e.what() << dendl;
6049 return -EINVAL;
6050 }
6051 } else {
6052 derr << "unable to find a keyring on " << g_conf()->keyring
6053 << ": " << cpp_strerror(r) << dendl;
6054 return r;
6055 }
6056 } else {
6057 r = keyring.load(g_ceph_context, keyring_filename);
6058 if (r < 0) {
6059 derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
6060 return r;
6061 }
6062 }
6063
6064 // put mon. key in external keyring; seed with everything else.
6065 extract_save_mon_key(keyring);
6066
6067 bufferlist keyringbl;
6068 keyring.encode_plaintext(keyringbl);
6069 t->put("mkfs", "keyring", keyringbl);
6070 }
6071 write_fsid(t);
6072 store->apply_transaction(t);
6073
6074 return 0;
6075 }
6076
6077 int Monitor::write_default_keyring(bufferlist& bl)
6078 {
6079 ostringstream os;
6080 os << g_conf()->mon_data << "/keyring";
6081
6082 int err = 0;
6083 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT|O_CLOEXEC, 0600);
6084 if (fd < 0) {
6085 err = -errno;
6086 dout(0) << __func__ << " failed to open " << os.str()
6087 << ": " << cpp_strerror(err) << dendl;
6088 return err;
6089 }
6090
6091 err = bl.write_fd(fd);
6092 if (!err)
6093 ::fsync(fd);
6094 VOID_TEMP_FAILURE_RETRY(::close(fd));
6095
6096 return err;
6097 }
6098
6099 void Monitor::extract_save_mon_key(KeyRing& keyring)
6100 {
6101 EntityName mon_name;
6102 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
6103 EntityAuth mon_key;
6104 if (keyring.get_auth(mon_name, mon_key)) {
6105 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
6106 KeyRing pkey;
6107 pkey.add(mon_name, mon_key);
6108 bufferlist bl;
6109 pkey.encode_plaintext(bl);
6110 write_default_keyring(bl);
6111 keyring.remove(mon_name);
6112 }
6113 }
6114
6115 // AuthClient methods -- for mon <-> mon communication
6116 int Monitor::get_auth_request(
6117 Connection *con,
6118 AuthConnectionMeta *auth_meta,
6119 uint32_t *method,
6120 vector<uint32_t> *preferred_modes,
6121 bufferlist *out)
6122 {
6123 std::scoped_lock l(auth_lock);
6124 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
6125 con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
6126 return -EACCES;
6127 }
6128 AuthAuthorizer *auth;
6129 if (!get_authorizer(con->get_peer_type(), &auth)) {
6130 return -EACCES;
6131 }
6132 auth_meta->authorizer.reset(auth);
6133 auth_registry.get_supported_modes(con->get_peer_type(),
6134 auth->protocol,
6135 preferred_modes);
6136 *method = auth->protocol;
6137 *out = auth->bl;
6138 return 0;
6139 }
6140
6141 int Monitor::handle_auth_reply_more(
6142 Connection *con,
6143 AuthConnectionMeta *auth_meta,
6144 const bufferlist& bl,
6145 bufferlist *reply)
6146 {
6147 std::scoped_lock l(auth_lock);
6148 if (!auth_meta->authorizer) {
6149 derr << __func__ << " no authorizer?" << dendl;
6150 return -EACCES;
6151 }
6152 auth_meta->authorizer->add_challenge(cct, bl);
6153 *reply = auth_meta->authorizer->bl;
6154 return 0;
6155 }
6156
6157 int Monitor::handle_auth_done(
6158 Connection *con,
6159 AuthConnectionMeta *auth_meta,
6160 uint64_t global_id,
6161 uint32_t con_mode,
6162 const bufferlist& bl,
6163 CryptoKey *session_key,
6164 std::string *connection_secret)
6165 {
6166 std::scoped_lock l(auth_lock);
6167 // verify authorizer reply
6168 auto p = bl.begin();
6169 if (!auth_meta->authorizer->verify_reply(p, connection_secret)) {
6170 dout(0) << __func__ << " failed verifying authorizer reply" << dendl;
6171 return -EACCES;
6172 }
6173 auth_meta->session_key = auth_meta->authorizer->session_key;
6174 return 0;
6175 }
6176
6177 int Monitor::handle_auth_bad_method(
6178 Connection *con,
6179 AuthConnectionMeta *auth_meta,
6180 uint32_t old_auth_method,
6181 int result,
6182 const std::vector<uint32_t>& allowed_methods,
6183 const std::vector<uint32_t>& allowed_modes)
6184 {
6185 derr << __func__ << " hmm, they didn't like " << old_auth_method
6186 << " result " << cpp_strerror(result) << dendl;
6187 return -EACCES;
6188 }
6189
/**
 * Build an authorizer this monitor presents when connecting to a peer.
 *
 * @param service_id  entity type of the peer; only CEPH_ENTITY_TYPE_MON and
 *                    CEPH_ENTITY_TYPE_MGR are accepted
 * @param authorizer  out: newly built authorizer on success
 * @return true on success; false when shutting down, for other peer types,
 *         or when the secret / session info / ticket cannot be built
 */
bool Monitor::get_authorizer(int service_id, AuthAuthorizer **authorizer)
{
  dout(10) << "get_authorizer for " << ceph_entity_type_name(service_id)
           << dendl;

  if (is_shutdown())
    return false;

  // we only connect to other monitors and mgr; every else connects to us.
  if (service_id != CEPH_ENTITY_TYPE_MON &&
      service_id != CEPH_ENTITY_TYPE_MGR)
    return false;

  // If the cluster does not support cephx, hand out an auth_none authorizer
  // with global_id 0 and skip all ticket handling below.
  if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
    // auth_none
    dout(20) << __func__ << " building auth_none authorizer" << dendl;
    AuthNoneClientHandler handler{g_ceph_context};
    handler.set_global_id(0);
    *authorizer = handler.build_authorizer(service_id);
    return true;
  }

  CephXServiceTicketInfo auth_ticket_info;
  CephXSessionAuthInfo info;
  int ret;

  // The ticket is issued to the generic "mon." entity with global_id 0.
  EntityName name;
  name.set_type(CEPH_ENTITY_TYPE_MON);
  auth_ticket_info.ticket.name = name;
  auth_ticket_info.ticket.global_id = 0;

  if (service_id == CEPH_ENTITY_TYPE_MON) {
    // mon to mon authentication uses the private monitor shared key and not the
    // rotating key
    // Look in the local keyring first, then fall back to the key server.
    CryptoKey secret;
    if (!keyring.get_secret(name, secret) &&
        !key_server.get_secret(name, secret)) {
      dout(0) << " couldn't get secret for mon service from keyring or keyserver"
              << dendl;
      stringstream ss, ds;
      int err = key_server.list_secrets(ds);
      if (err < 0)
        ss << "no installed auth entries!";
      else
        ss << "installed auth entries:";
      dout(0) << ss.str() << "\n" << ds.str() << dendl;
      return false;
    }

    // NOTE(review): (uint64_t)-1 is presumably the secret_id placeholder for
    // "not a rotating key" -- confirm against KeyServer::build_session_auth_info.
    ret = key_server.build_session_auth_info(
      service_id, auth_ticket_info.ticket, secret, (uint64_t)-1, info);
    if (ret < 0) {
      dout(0) << __func__ << " failed to build mon session_auth_info "
              << cpp_strerror(ret) << dendl;
      return false;
    }
  } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
    // mgr
    // This overload takes no explicit secret; presumably the key server picks
    // the mgr service (rotating) secret itself -- TODO confirm.
    ret = key_server.build_session_auth_info(
      service_id, auth_ticket_info.ticket, info);
    if (ret < 0) {
      derr << __func__ << " failed to build mgr service session_auth_info "
           << cpp_strerror(ret) << dendl;
      return false;
    }
  } else {
    ceph_abort(); // see check at top of fn
  }

  CephXTicketBlob blob;
  if (!cephx_build_service_ticket_blob(cct, info, blob)) {
    dout(0) << "get_authorizer failed to build service ticket" << dendl;
    return false;
  }
  // Round-trip the blob through its encoding so the ticket handler receives
  // it in the same form it would be decoded from the wire.
  bufferlist ticket_data;
  encode(blob, ticket_data);

  auto iter = ticket_data.cbegin();
  CephXTicketHandler handler(g_ceph_context, service_id);
  decode(handler.ticket, iter);

  handler.session_key = info.session_key;

  // NOTE(review): the argument is presumably the global_id; 0 matches the
  // ticket's global_id set above -- confirm.
  *authorizer = handler.build_authorizer(0);

  return true;
}
6277
6278 int Monitor::handle_auth_request(
6279 Connection *con,
6280 AuthConnectionMeta *auth_meta,
6281 bool more,
6282 uint32_t auth_method,
6283 const bufferlist &payload,
6284 bufferlist *reply)
6285 {
6286 std::scoped_lock l(auth_lock);
6287
6288 // NOTE: be careful, the Connection hasn't fully negotiated yet, so
6289 // e.g., peer_features, peer_addrs, and others are still unknown.
6290
6291 dout(10) << __func__ << " con " << con << (more ? " (more)":" (start)")
6292 << " method " << auth_method
6293 << " payload " << payload.length()
6294 << dendl;
6295 if (!payload.length()) {
6296 if (!con->is_msgr2() &&
6297 con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
6298 // for v1 connections, we tolerate no authorizer (from
6299 // non-monitors), because authentication happens via MAuth
6300 // messages.
6301 return 1;
6302 }
6303 return -EACCES;
6304 }
6305 if (!more) {
6306 auth_meta->auth_mode = payload[0];
6307 }
6308
6309 if (auth_meta->auth_mode >= AUTH_MODE_AUTHORIZER &&
6310 auth_meta->auth_mode <= AUTH_MODE_AUTHORIZER_MAX) {
6311 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
6312 auth_method);
6313 if (!ah) {
6314 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
6315 << auth_method << dendl;
6316 return -EOPNOTSUPP;
6317 }
6318 bool was_challenge = (bool)auth_meta->authorizer_challenge;
6319 bool isvalid = ah->verify_authorizer(
6320 cct,
6321 keyring,
6322 payload,
6323 auth_meta->get_connection_secret_length(),
6324 reply,
6325 &con->peer_name,
6326 &con->peer_global_id,
6327 &con->peer_caps_info,
6328 &auth_meta->session_key,
6329 &auth_meta->connection_secret,
6330 &auth_meta->authorizer_challenge);
6331 if (isvalid) {
6332 ms_handle_authentication(con);
6333 return 1;
6334 }
6335 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
6336 return 0;
6337 }
6338 dout(10) << __func__ << " bad authorizer on " << con << dendl;
6339 return -EACCES;
6340 } else if (auth_meta->auth_mode < AUTH_MODE_MON ||
6341 auth_meta->auth_mode > AUTH_MODE_MON_MAX) {
6342 derr << __func__ << " unrecognized auth mode " << auth_meta->auth_mode
6343 << dendl;
6344 return -EACCES;
6345 }
6346
6347 // wait until we've formed an initial quorum on mkfs so that we have
6348 // the initial keys (e.g., client.admin).
6349 if (authmon()->get_last_committed() == 0) {
6350 dout(10) << __func__ << " haven't formed initial quorum, EBUSY" << dendl;
6351 return -EBUSY;
6352 }
6353
6354 RefCountedPtr priv;
6355 MonSession *s;
6356 int32_t r = 0;
6357 auto p = payload.begin();
6358 if (!more) {
6359 if (con->get_priv()) {
6360 return -EACCES; // wtf
6361 }
6362
6363 // handler?
6364 unique_ptr<AuthServiceHandler> auth_handler{get_auth_service_handler(
6365 auth_method, g_ceph_context, &key_server)};
6366 if (!auth_handler) {
6367 dout(1) << __func__ << " auth_method " << auth_method << " not supported"
6368 << dendl;
6369 return -EOPNOTSUPP;
6370 }
6371
6372 uint8_t mode;
6373 EntityName entity_name;
6374
6375 try {
6376 decode(mode, p);
6377 if (mode < AUTH_MODE_MON ||
6378 mode > AUTH_MODE_MON_MAX) {
6379 dout(1) << __func__ << " invalid mode " << (int)mode << dendl;
6380 return -EACCES;
6381 }
6382 assert(mode >= AUTH_MODE_MON && mode <= AUTH_MODE_MON_MAX);
6383 decode(entity_name, p);
6384 decode(con->peer_global_id, p);
6385 } catch (ceph::buffer::error& e) {
6386 dout(1) << __func__ << " failed to decode, " << e.what() << dendl;
6387 return -EACCES;
6388 }
6389
6390 // supported method?
6391 if (entity_name.get_type() == CEPH_ENTITY_TYPE_MON ||
6392 entity_name.get_type() == CEPH_ENTITY_TYPE_OSD ||
6393 entity_name.get_type() == CEPH_ENTITY_TYPE_MDS ||
6394 entity_name.get_type() == CEPH_ENTITY_TYPE_MGR) {
6395 if (!auth_cluster_required.is_supported_auth(auth_method)) {
6396 dout(10) << __func__ << " entity " << entity_name << " method "
6397 << auth_method << " not among supported "
6398 << auth_cluster_required.get_supported_set() << dendl;
6399 return -EOPNOTSUPP;
6400 }
6401 } else {
6402 if (!auth_service_required.is_supported_auth(auth_method)) {
6403 dout(10) << __func__ << " entity " << entity_name << " method "
6404 << auth_method << " not among supported "
6405 << auth_cluster_required.get_supported_set() << dendl;
6406 return -EOPNOTSUPP;
6407 }
6408 }
6409
6410 // for msgr1 we would do some weirdness here to ensure signatures
6411 // are supported by the client if we require it. for msgr2 that
6412 // is not necessary.
6413
6414 bool is_new_global_id = false;
6415 if (!con->peer_global_id) {
6416 con->peer_global_id = authmon()->_assign_global_id();
6417 if (!con->peer_global_id) {
6418 dout(1) << __func__ << " failed to assign global_id" << dendl;
6419 return -EBUSY;
6420 }
6421 is_new_global_id = true;
6422 }
6423
6424 // set up partial session
6425 s = new MonSession(con);
6426 s->auth_handler = auth_handler.release();
6427 con->set_priv(RefCountedPtr{s, false});
6428
6429 r = s->auth_handler->start_session(
6430 entity_name,
6431 con->peer_global_id,
6432 is_new_global_id,
6433 reply,
6434 &con->peer_caps_info);
6435 } else {
6436 priv = con->get_priv();
6437 if (!priv) {
6438 // this can happen if the async ms_handle_reset event races with
6439 // the unlocked call into handle_auth_request
6440 return -EACCES;
6441 }
6442 s = static_cast<MonSession*>(priv.get());
6443 r = s->auth_handler->handle_request(
6444 p,
6445 auth_meta->get_connection_secret_length(),
6446 reply,
6447 &con->peer_caps_info,
6448 &auth_meta->session_key,
6449 &auth_meta->connection_secret);
6450 }
6451 if (r > 0 &&
6452 !s->authenticated) {
6453 ms_handle_authentication(con);
6454 }
6455
6456 dout(30) << " r " << r << " reply:\n";
6457 reply->hexdump(*_dout);
6458 *_dout << dendl;
6459 return r;
6460 }
6461
6462 void Monitor::ms_handle_accept(Connection *con)
6463 {
6464 auto priv = con->get_priv();
6465 MonSession *s = static_cast<MonSession*>(priv.get());
6466 if (!s) {
6467 // legacy protocol v1?
6468 dout(10) << __func__ << " con " << con << " no session" << dendl;
6469 return;
6470 }
6471
6472 if (s->item.is_on_list()) {
6473 dout(10) << __func__ << " con " << con << " session " << s
6474 << " already on list" << dendl;
6475 } else {
6476 std::lock_guard l(session_map_lock);
6477 if (state == STATE_SHUTDOWN) {
6478 dout(10) << __func__ << " ignoring new con " << con << " (shutdown)" << dendl;
6479 con->mark_down();
6480 return;
6481 }
6482 dout(10) << __func__ << " con " << con << " session " << s
6483 << " registering session for "
6484 << con->get_peer_addrs() << dendl;
6485 s->_ident(entity_name_t(con->get_peer_type(), con->get_peer_id()),
6486 con->get_peer_addrs());
6487 session_map.add_session(s);
6488 }
6489 }
6490
6491 int Monitor::ms_handle_authentication(Connection *con)
6492 {
6493 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
6494 // mon <-> mon connections need no Session, and setting one up
6495 // creates an awkward ref cycle between Session and Connection.
6496 return 1;
6497 }
6498
6499 auto priv = con->get_priv();
6500 MonSession *s = static_cast<MonSession*>(priv.get());
6501 if (!s) {
6502 // must be msgr2, otherwise dispatch would have set up the session.
6503 s = session_map.new_session(
6504 entity_name_t(con->get_peer_type(), -1), // we don't know yet
6505 con->get_peer_addrs(),
6506 con);
6507 assert(s);
6508 dout(10) << __func__ << " adding session " << s << " to con " << con
6509 << dendl;
6510 con->set_priv(s);
6511 logger->set(l_mon_num_sessions, session_map.get_size());
6512 logger->inc(l_mon_session_add);
6513 }
6514 dout(10) << __func__ << " session " << s << " con " << con
6515 << " addr " << s->con->get_peer_addr()
6516 << " " << *s << dendl;
6517
6518 AuthCapsInfo &caps_info = con->get_peer_caps_info();
6519 int ret = 0;
6520 if (caps_info.allow_all) {
6521 s->caps.set_allow_all();
6522 s->authenticated = true;
6523 ret = 1;
6524 } else if (caps_info.caps.length()) {
6525 bufferlist::const_iterator p = caps_info.caps.cbegin();
6526 string str;
6527 try {
6528 decode(str, p);
6529 } catch (const ceph::buffer::error &err) {
6530 derr << __func__ << " corrupt cap data for " << con->get_peer_entity_name()
6531 << " in auth db" << dendl;
6532 str.clear();
6533 ret = -EACCES;
6534 }
6535 if (ret >= 0) {
6536 if (s->caps.parse(str, NULL)) {
6537 s->authenticated = true;
6538 ret = 1;
6539 } else {
6540 derr << __func__ << " unparseable caps '" << str << "' for "
6541 << con->get_peer_entity_name() << dendl;
6542 ret = -EACCES;
6543 }
6544 }
6545 }
6546
6547 return ret;
6548 }
6549
6550 void Monitor::set_mon_crush_location(const string& loc)
6551 {
6552 if (loc.empty()) {
6553 return;
6554 }
6555 vector<string> loc_vec;
6556 loc_vec.push_back(loc);
6557 CrushWrapper::parse_loc_map(loc_vec, &crush_loc);
6558 need_set_crush_loc = true;
6559 }
6560
6561 void Monitor::notify_new_monmap(bool can_change_external_state)
6562 {
6563 if (need_set_crush_loc) {
6564 auto my_info_i = monmap->mon_info.find(name);
6565 if (my_info_i != monmap->mon_info.end() &&
6566 my_info_i->second.crush_loc == crush_loc) {
6567 need_set_crush_loc = false;
6568 }
6569 }
6570 elector.notify_strategy_maybe_changed(monmap->strategy);
6571 dout(30) << __func__ << "we have " << monmap->removed_ranks.size() << " removed ranks" << dendl;
6572 for (auto i = monmap->removed_ranks.rbegin();
6573 i != monmap->removed_ranks.rend(); ++i) {
6574 int rank = *i;
6575 dout(10) << __func__ << "removing rank " << rank << dendl;
6576 elector.notify_rank_removed(rank);
6577 }
6578
6579 if (monmap->stretch_mode_enabled) {
6580 try_engage_stretch_mode();
6581 }
6582
6583 if (is_stretch_mode()) {
6584 if (!monmap->stretch_marked_down_mons.empty()) {
6585 set_degraded_stretch_mode();
6586 }
6587 }
6588 set_elector_disallowed_leaders(can_change_external_state);
6589 }
6590
6591 void Monitor::set_elector_disallowed_leaders(bool allow_election)
6592 {
6593 set<int> dl;
6594 for (auto name : monmap->disallowed_leaders) {
6595 dl.insert(monmap->get_rank(name));
6596 }
6597 if (is_stretch_mode()) {
6598 for (auto name : monmap->stretch_marked_down_mons) {
6599 dl.insert(monmap->get_rank(name));
6600 }
6601 dl.insert(monmap->get_rank(monmap->tiebreaker_mon));
6602 }
6603
6604 bool disallowed_changed = elector.set_disallowed_leaders(dl);
6605 if (disallowed_changed && allow_election) {
6606 elector.call_election();
6607 }
6608 }
6609
6610 struct CMonEnableStretchMode : public Context {
6611 Monitor *m;
6612 CMonEnableStretchMode(Monitor *mon) : m(mon) {}
6613 void finish(int r) {
6614 m->try_engage_stretch_mode();
6615 }
6616 };
6617 void Monitor::try_engage_stretch_mode()
6618 {
6619 dout(20) << __func__ << dendl;
6620 if (stretch_mode_engaged) return;
6621 if (!osdmon()->is_readable()) {
6622 osdmon()->wait_for_readable_ctx(new CMonEnableStretchMode(this));
6623 }
6624 if (osdmon()->osdmap.stretch_mode_enabled &&
6625 monmap->stretch_mode_enabled) {
6626 dout(10) << "Engaging stretch mode!" << dendl;
6627 stretch_mode_engaged = true;
6628 int32_t stretch_divider_id = osdmon()->osdmap.stretch_mode_bucket;
6629 stretch_bucket_divider = osdmon()->osdmap.
6630 crush->get_type_name(stretch_divider_id);
6631 disconnect_disallowed_stretch_sessions();
6632 }
6633 }
6634
6635 void Monitor::do_stretch_mode_election_work()
6636 {
6637 dout(20) << __func__ << dendl;
6638 if (!is_stretch_mode() ||
6639 !is_leader()) return;
6640 dout(20) << "checking for degraded stretch mode" << dendl;
6641 map<string, set<string>> old_dead_buckets;
6642 old_dead_buckets.swap(dead_mon_buckets);
6643 up_mon_buckets.clear();
6644 // identify if we've lost a CRUSH bucket, request OSDMonitor check for death
6645 map<string,set<string>> down_mon_buckets;
6646 for (unsigned i = 0; i < monmap->size(); ++i) {
6647 const auto &mi = monmap->mon_info[monmap->get_name(i)];
6648 auto ci = mi.crush_loc.find(stretch_bucket_divider);
6649 ceph_assert(ci != mi.crush_loc.end());
6650 if (quorum.count(i)) {
6651 up_mon_buckets.insert(ci->second);
6652 } else {
6653 down_mon_buckets[ci->second].insert(mi.name);
6654 }
6655 }
6656 dout(20) << "prior dead_mon_buckets: " << old_dead_buckets
6657 << "; down_mon_buckets: " << down_mon_buckets
6658 << "; up_mon_buckets: " << up_mon_buckets << dendl;
6659 for (const auto& di : down_mon_buckets) {
6660 if (!up_mon_buckets.count(di.first)) {
6661 dead_mon_buckets[di.first] = di.second;
6662 }
6663 }
6664 dout(20) << "new dead_mon_buckets " << dead_mon_buckets << dendl;
6665
6666 if (dead_mon_buckets != old_dead_buckets &&
6667 dead_mon_buckets.size() >= old_dead_buckets.size()) {
6668 maybe_go_degraded_stretch_mode();
6669 }
6670 }
6671
6672 struct CMonGoDegraded : public Context {
6673 Monitor *m;
6674 CMonGoDegraded(Monitor *mon) : m(mon) {}
6675 void finish(int r) {
6676 m->maybe_go_degraded_stretch_mode();
6677 }
6678 };
6679
6680 struct CMonGoRecovery : public Context {
6681 Monitor *m;
6682 CMonGoRecovery(Monitor *mon) : m(mon) {}
6683 void finish(int r) {
6684 m->go_recovery_stretch_mode();
6685 }
6686 };
6687 void Monitor::go_recovery_stretch_mode()
6688 {
6689 dout(20) << __func__ << dendl;
6690 if (!is_leader()) return;
6691 if (!is_degraded_stretch_mode()) return;
6692 if (is_recovering_stretch_mode()) return;
6693
6694 if (dead_mon_buckets.size()) {
6695 ceph_assert( 0 == "how did we try and do stretch recovery while we have dead monitor buckets?");
6696 // we can't recover if we are missing monitors in a zone!
6697 return;
6698 }
6699
6700 if (!osdmon()->is_readable()) {
6701 osdmon()->wait_for_readable_ctx(new CMonGoRecovery(this));
6702 return;
6703 }
6704
6705 if (!osdmon()->is_writeable()) {
6706 osdmon()->wait_for_writeable_ctx(new CMonGoRecovery(this));
6707 }
6708 osdmon()->trigger_recovery_stretch_mode();
6709 }
6710
6711 void Monitor::set_recovery_stretch_mode()
6712 {
6713 degraded_stretch_mode = true;
6714 recovering_stretch_mode = true;
6715 osdmon()->set_recovery_stretch_mode();
6716 }
6717
6718 void Monitor::maybe_go_degraded_stretch_mode()
6719 {
6720 dout(20) << __func__ << dendl;
6721 if (is_degraded_stretch_mode()) return;
6722 if (!is_leader()) return;
6723 if (dead_mon_buckets.empty()) return;
6724 if (!osdmon()->is_readable()) {
6725 osdmon()->wait_for_readable_ctx(new CMonGoDegraded(this));
6726 return;
6727 }
6728 ceph_assert(monmap->contains(monmap->tiebreaker_mon));
6729 // filter out the tiebreaker zone and check if remaining sites are down by OSDs too
6730 const auto &mi = monmap->mon_info[monmap->tiebreaker_mon];
6731 auto ci = mi.crush_loc.find(stretch_bucket_divider);
6732 map<string, set<string>> filtered_dead_buckets = dead_mon_buckets;
6733 filtered_dead_buckets.erase(ci->second);
6734
6735 set<int> matched_down_buckets;
6736 set<string> matched_down_mons;
6737 bool dead = osdmon()->check_for_dead_crush_zones(filtered_dead_buckets,
6738 &matched_down_buckets,
6739 &matched_down_mons);
6740 if (dead) {
6741 if (!osdmon()->is_writeable()) {
6742 osdmon()->wait_for_writeable_ctx(new CMonGoDegraded(this));
6743 }
6744 if (!monmon()->is_writeable()) {
6745 monmon()->wait_for_writeable_ctx(new CMonGoDegraded(this));
6746 }
6747 trigger_degraded_stretch_mode(matched_down_mons, matched_down_buckets);
6748 }
6749 }
6750
6751 void Monitor::trigger_degraded_stretch_mode(const set<string>& dead_mons,
6752 const set<int>& dead_buckets)
6753 {
6754 dout(20) << __func__ << dendl;
6755 ceph_assert(osdmon()->is_writeable());
6756 ceph_assert(monmon()->is_writeable());
6757
6758 // figure out which OSD zone(s) remains alive by removing
6759 // tiebreaker mon from up_mon_buckets
6760 set<string> live_zones = up_mon_buckets;
6761 ceph_assert(monmap->contains(monmap->tiebreaker_mon));
6762 const auto &mi = monmap->mon_info[monmap->tiebreaker_mon];
6763 auto ci = mi.crush_loc.find(stretch_bucket_divider);
6764 live_zones.erase(ci->second);
6765 ceph_assert(live_zones.size() == 1); // only support 2 zones right now
6766
6767 osdmon()->trigger_degraded_stretch_mode(dead_buckets, live_zones);
6768 monmon()->trigger_degraded_stretch_mode(dead_mons);
6769 set_degraded_stretch_mode();
6770 }
6771
6772 void Monitor::set_degraded_stretch_mode()
6773 {
6774 degraded_stretch_mode = true;
6775 recovering_stretch_mode = false;
6776 osdmon()->set_degraded_stretch_mode();
6777 }
6778
6779 struct CMonGoHealthy : public Context {
6780 Monitor *m;
6781 CMonGoHealthy(Monitor *mon) : m(mon) {}
6782 void finish(int r) {
6783 m->trigger_healthy_stretch_mode();
6784 }
6785 };
6786
6787
6788 void Monitor::trigger_healthy_stretch_mode()
6789 {
6790 dout(20) << __func__ << dendl;
6791 if (!is_degraded_stretch_mode()) return;
6792 if (!is_leader()) return;
6793 if (!osdmon()->is_writeable()) {
6794 osdmon()->wait_for_writeable_ctx(new CMonGoHealthy(this));
6795 }
6796 if (!monmon()->is_writeable()) {
6797 monmon()->wait_for_writeable_ctx(new CMonGoHealthy(this));
6798 }
6799
6800 ceph_assert(osdmon()->osdmap.recovering_stretch_mode);
6801 osdmon()->trigger_healthy_stretch_mode();
6802 monmon()->trigger_healthy_stretch_mode();
6803 }
6804
6805 void Monitor::set_healthy_stretch_mode()
6806 {
6807 degraded_stretch_mode = false;
6808 recovering_stretch_mode = false;
6809 osdmon()->set_healthy_stretch_mode();
6810 }
6811
6812 bool Monitor::session_stretch_allowed(MonSession *s, MonOpRequestRef& op)
6813 {
6814 if (!is_stretch_mode()) return true;
6815 if (s->proxy_con) return true;
6816 if (s->validated_stretch_connection) return true;
6817 if (!s->con) return true;
6818 if (s->con->peer_is_osd()) {
6819 dout(20) << __func__ << "checking OSD session" << s << dendl;
6820 // okay, check the crush location
6821 int barrier_id = [&] {
6822 auto type_id = osdmon()->osdmap.crush->get_validated_type_id(
6823 stretch_bucket_divider);
6824 ceph_assert(type_id.has_value());
6825 return *type_id;
6826 }();
6827 int osd_bucket_id = osdmon()->osdmap.crush->get_parent_of_type(s->con->peer_id,
6828 barrier_id);
6829 const auto &mi = monmap->mon_info.find(name);
6830 ceph_assert(mi != monmap->mon_info.end());
6831 auto ci = mi->second.crush_loc.find(stretch_bucket_divider);
6832 ceph_assert(ci != mi->second.crush_loc.end());
6833 int mon_bucket_id = osdmon()->osdmap.crush->get_item_id(ci->second);
6834
6835 if (osd_bucket_id != mon_bucket_id) {
6836 dout(5) << "discarding session " << *s
6837 << " and sending OSD to matched zone" << dendl;
6838 s->con->mark_down();
6839 std::lock_guard l(session_map_lock);
6840 remove_session(s);
6841 if (op) {
6842 op->mark_zap();
6843 }
6844 return false;
6845 }
6846 }
6847
6848 s->validated_stretch_connection = true;
6849 return true;
6850 }
6851
/**
 * Run session_stretch_allowed() over every session in session_map. Any OSD
 * session whose CRUSH zone does not match this monitor's is disconnected.
 */
void Monitor::disconnect_disallowed_stretch_sessions()
{
  dout(20) << __func__ << dendl;
  // No op is associated with these checks, so pass an empty ref.
  MonOpRequestRef blank;
  auto i = session_map.sessions.begin();
  while (i != session_map.sessions.end()) {
    // Advance before the call: session_stretch_allowed() may call
    // remove_session() on *j, which presumably unlinks it from
    // session_map.sessions and would invalidate an iterator still on it.
    auto j = i;
    ++i;
    session_stretch_allowed(*j, blank);
  }
}
6860 session_stretch_allowed(*j, blank);
6861 }
6862 }