]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/Monitor.cc
5635e5ebcb6958736131fa93539636fd4edb30c7
[ceph.git] / ceph / src / mon / Monitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include <iterator>
17 #include <sstream>
18 #include <tuple>
19 #include <stdlib.h>
20 #include <signal.h>
21 #include <limits.h>
22 #include <cstring>
23 #include <boost/scope_exit.hpp>
24 #include <boost/algorithm/string/predicate.hpp>
25
26 #include "json_spirit/json_spirit_reader.h"
27 #include "json_spirit/json_spirit_writer.h"
28
29 #include "Monitor.h"
30 #include "common/version.h"
31 #include "common/blkdev.h"
32 #include "common/cmdparse.h"
33 #include "common/signal.h"
34
35 #include "osd/OSDMap.h"
36
37 #include "MonitorDBStore.h"
38
39 #include "messages/PaxosServiceMessage.h"
40 #include "messages/MMonMap.h"
41 #include "messages/MMonGetMap.h"
42 #include "messages/MMonGetVersion.h"
43 #include "messages/MMonGetVersionReply.h"
44 #include "messages/MGenericMessage.h"
45 #include "messages/MMonCommand.h"
46 #include "messages/MMonCommandAck.h"
47 #include "messages/MMonSync.h"
48 #include "messages/MMonScrub.h"
49 #include "messages/MMonProbe.h"
50 #include "messages/MMonJoin.h"
51 #include "messages/MMonPaxos.h"
52 #include "messages/MRoute.h"
53 #include "messages/MForward.h"
54
55 #include "messages/MMonSubscribe.h"
56 #include "messages/MMonSubscribeAck.h"
57
58 #include "messages/MCommand.h"
59 #include "messages/MCommandReply.h"
60
61 #include "messages/MTimeCheck2.h"
62 #include "messages/MPing.h"
63
64 #include "common/strtol.h"
65 #include "common/ceph_argparse.h"
66 #include "common/Timer.h"
67 #include "common/Clock.h"
68 #include "common/errno.h"
69 #include "common/perf_counters.h"
70 #include "common/admin_socket.h"
71 #include "global/signal_handler.h"
72 #include "common/Formatter.h"
73 #include "include/stringify.h"
74 #include "include/color.h"
75 #include "include/ceph_fs.h"
76 #include "include/str_list.h"
77
78 #include "OSDMonitor.h"
79 #include "MDSMonitor.h"
80 #include "MonmapMonitor.h"
81 #include "LogMonitor.h"
82 #include "AuthMonitor.h"
83 #include "MgrMonitor.h"
84 #include "MgrStatMonitor.h"
85 #include "ConfigMonitor.h"
86 #include "KVMonitor.h"
87 #include "mon/HealthMonitor.h"
88 #include "common/config.h"
89 #include "common/cmdparse.h"
90 #include "include/ceph_assert.h"
91 #include "include/compat.h"
92 #include "perfglue/heap_profiler.h"
93
94 #include "auth/none/AuthNoneClientHandler.h"
95
96 #define dout_subsys ceph_subsys_mon
97 #undef dout_prefix
98 #define dout_prefix _prefix(_dout, this)
99 using namespace TOPNSPC::common;
100
101 using std::cout;
102 using std::dec;
103 using std::hex;
104 using std::list;
105 using std::map;
106 using std::make_pair;
107 using std::ostream;
108 using std::ostringstream;
109 using std::pair;
110 using std::set;
111 using std::setfill;
112 using std::string;
113 using std::stringstream;
114 using std::to_string;
115 using std::vector;
116 using std::unique_ptr;
117
118 using ceph::bufferlist;
119 using ceph::decode;
120 using ceph::encode;
121 using ceph::ErasureCodeInterfaceRef;
122 using ceph::ErasureCodeProfile;
123 using ceph::Formatter;
124 using ceph::JSONFormatter;
125 using ceph::make_message;
126 using ceph::mono_clock;
127 using ceph::mono_time;
128 using ceph::timespan_str;
129
130
131 static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
132 return *_dout << "mon." << mon->name << "@" << mon->rank
133 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
134 }
135
136 const string Monitor::MONITOR_NAME = "monitor";
137 const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
138
139
140 #undef FLAG
141 #undef COMMAND
142 #undef COMMAND_WITH_FLAG
143 #define FLAG(f) (MonCommand::FLAG_##f)
144 #define COMMAND(parsesig, helptext, modulename, req_perms) \
145 {parsesig, helptext, modulename, req_perms, FLAG(NONE)},
146 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, flags) \
147 {parsesig, helptext, modulename, req_perms, flags},
148 MonCommand mon_commands[] = {
149 #include <mon/MonCommands.h>
150 };
151 #undef COMMAND
152 #undef COMMAND_WITH_FLAG
153
154 Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
155 Messenger *m, Messenger *mgr_m, MonMap *map) :
156 Dispatcher(cct_),
157 AuthServer(cct_),
158 name(nm),
159 rank(-1),
160 messenger(m),
161 con_self(m ? m->get_loopback_connection() : NULL),
162 timer(cct_, lock),
163 finisher(cct_, "mon_finisher", "fin"),
164 cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads),
165 has_ever_joined(false),
166 logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
167 monmap(map),
168 log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
169 key_server(cct, &keyring),
170 auth_cluster_required(cct,
171 cct->_conf->auth_supported.empty() ?
172 cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
173 auth_service_required(cct,
174 cct->_conf->auth_supported.empty() ?
175 cct->_conf->auth_service_required : cct->_conf->auth_supported),
176 mgr_messenger(mgr_m),
177 mgr_client(cct_, mgr_m, monmap),
178 gss_ktfile_client(cct->_conf.get_val<std::string>("gss_ktab_client_file")),
179 store(s),
180
181 elector(this, map->strategy),
182 required_features(0),
183 leader(0),
184 quorum_con_features(0),
185 // scrub
186 scrub_version(0),
187 scrub_event(NULL),
188 scrub_timeout_event(NULL),
189
190 // sync state
191 sync_provider_count(0),
192 sync_cookie(0),
193 sync_full(false),
194 sync_start_version(0),
195 sync_timeout_event(NULL),
196 sync_last_committed_floor(0),
197
198 timecheck_round(0),
199 timecheck_acks(0),
200 timecheck_rounds_since_clean(0),
201 timecheck_event(NULL),
202
203 admin_hook(NULL),
204 routed_request_tid(0),
205 op_tracker(cct, g_conf().get_val<bool>("mon_enable_op_tracker"), 1)
206 {
207 clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
208 audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);
209
210 update_log_clients();
211
212 if (!gss_ktfile_client.empty()) {
213 // Assert we can export environment variable
214 /*
215 The default client keytab is used, if it is present and readable,
216 to automatically obtain initial credentials for GSSAPI client
217 applications. The principal name of the first entry in the client
218 keytab is used by default when obtaining initial credentials.
219 1. The KRB5_CLIENT_KTNAME environment variable.
220 2. The default_client_keytab_name profile variable in [libdefaults].
221 3. The hardcoded default, DEFCKTNAME.
222 */
223 const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
224 gss_ktfile_client.c_str(), 1));
225 ceph_assert(set_result == 0);
226 }
227
228 op_tracker.set_complaint_and_threshold(
229 g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count(),
230 g_conf().get_val<int64_t>("mon_op_log_threshold"));
231 op_tracker.set_history_size_and_duration(
232 g_conf().get_val<uint64_t>("mon_op_history_size"),
233 g_conf().get_val<std::chrono::seconds>("mon_op_history_duration").count());
234 op_tracker.set_history_slow_op_size_and_threshold(
235 g_conf().get_val<uint64_t>("mon_op_history_slow_op_size"),
236 g_conf().get_val<std::chrono::seconds>("mon_op_history_slow_op_threshold").count());
237
238 paxos = std::make_unique<Paxos>(*this, "paxos");
239
240 paxos_service[PAXOS_MDSMAP].reset(new MDSMonitor(*this, *paxos, "mdsmap"));
241 paxos_service[PAXOS_MONMAP].reset(new MonmapMonitor(*this, *paxos, "monmap"));
242 paxos_service[PAXOS_OSDMAP].reset(new OSDMonitor(cct, *this, *paxos, "osdmap"));
243 paxos_service[PAXOS_LOG].reset(new LogMonitor(*this, *paxos, "logm"));
244 paxos_service[PAXOS_AUTH].reset(new AuthMonitor(*this, *paxos, "auth"));
245 paxos_service[PAXOS_MGR].reset(new MgrMonitor(*this, *paxos, "mgr"));
246 paxos_service[PAXOS_MGRSTAT].reset(new MgrStatMonitor(*this, *paxos, "mgrstat"));
247 paxos_service[PAXOS_HEALTH].reset(new HealthMonitor(*this, *paxos, "health"));
248 paxos_service[PAXOS_CONFIG].reset(new ConfigMonitor(*this, *paxos, "config"));
249 paxos_service[PAXOS_KV].reset(new KVMonitor(*this, *paxos, "kv"));
250
251 bool r = mon_caps.parse("allow *", NULL);
252 ceph_assert(r);
253
254 exited_quorum = ceph_clock_now();
255
256 // prepare local commands
257 local_mon_commands.resize(std::size(mon_commands));
258 for (unsigned i = 0; i < std::size(mon_commands); ++i) {
259 local_mon_commands[i] = mon_commands[i];
260 }
261 MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);
262
263 prenautilus_local_mon_commands = local_mon_commands;
264 for (auto& i : prenautilus_local_mon_commands) {
265 std::string n = cmddesc_get_prenautilus_compat(i.cmdstring);
266 if (n != i.cmdstring) {
267 dout(20) << " pre-nautilus cmd " << i.cmdstring << " -> " << n << dendl;
268 i.cmdstring = n;
269 }
270 }
271 MonCommand::encode_vector(prenautilus_local_mon_commands, prenautilus_local_mon_commands_bl);
272
273 // assume our commands until we have an election. this only means
274 // we won't reply with EINVAL before the election; any command that
275 // actually matters will wait until we have quorum etc and then
276 // retry (and revalidate).
277 leader_mon_commands = local_mon_commands;
278 }
279
// Destroy the monitor: notify the op tracker of shutdown, free the "mon"
// perf counters created in preinit(), and require that no sessions remain.
Monitor::~Monitor()
{
  op_tracker.on_shutdown();

  // logger is allocated in preinit(); deleting nullptr is a no-op if it never ran.
  delete logger;
  // All sessions must already have been removed before destruction.
  ceph_assert(session_map.sessions.empty());
}
287
288
289 class AdminHook : public AdminSocketHook {
290 Monitor *mon;
291 public:
292 explicit AdminHook(Monitor *m) : mon(m) {}
293 int call(std::string_view command, const cmdmap_t& cmdmap,
294 const bufferlist&,
295 Formatter *f,
296 std::ostream& errss,
297 bufferlist& out) override {
298 stringstream outss;
299 int r = mon->do_admin_command(command, cmdmap, f, errss, outss);
300 out.append(outss);
301 return r;
302 }
303 };
304
305 int Monitor::do_admin_command(
306 std::string_view command,
307 const cmdmap_t& cmdmap,
308 Formatter *f,
309 std::ostream& err,
310 std::ostream& out)
311 {
312 std::lock_guard l(lock);
313
314 int r = 0;
315 string args;
316 for (auto p = cmdmap.begin();
317 p != cmdmap.end(); ++p) {
318 if (p->first == "prefix")
319 continue;
320 if (!args.empty())
321 args += ", ";
322 args += cmd_vartype_stringify(p->second);
323 }
324 args = "[" + args + "]";
325
326 bool read_only = (command == "mon_status" ||
327 command == "mon metadata" ||
328 command == "quorum_status" ||
329 command == "ops" ||
330 command == "sessions");
331
332 (read_only ? audit_clog->debug() : audit_clog->info())
333 << "from='admin socket' entity='admin socket' "
334 << "cmd='" << command << "' args=" << args << ": dispatch";
335
336 if (command == "mon_status") {
337 get_mon_status(f);
338 } else if (command == "quorum_status") {
339 _quorum_status(f, out);
340 } else if (command == "sync_force") {
341 bool validate = false;
342 if (!cmd_getval(cmdmap, "yes_i_really_mean_it", validate)) {
343 std::string v;
344 if (cmd_getval(cmdmap, "validate", v) &&
345 v == "--yes-i-really-mean-it") {
346 validate = true;
347 }
348 }
349 if (!validate) {
350 err << "are you SURE? this will mean the monitor store will be erased "
351 "the next time the monitor is restarted. pass "
352 "'--yes-i-really-mean-it' if you really do.";
353 r = -EPERM;
354 goto abort;
355 }
356 sync_force(f);
357 } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0 ||
358 command.compare(0, 24, "add_bootstrap_peer_hintv") == 0) {
359 if (!_add_bootstrap_peer_hint(command, cmdmap, out))
360 goto abort;
361 } else if (command == "quorum enter") {
362 elector.start_participating();
363 start_election();
364 out << "started responding to quorum, initiated new election";
365 } else if (command == "quorum exit") {
366 start_election();
367 elector.stop_participating();
368 out << "stopped responding to quorum, initiated new election";
369 } else if (command == "ops") {
370 (void)op_tracker.dump_ops_in_flight(f);
371 } else if (command == "sessions") {
372 f->open_array_section("sessions");
373 for (auto p : session_map.sessions) {
374 f->dump_object("session", *p);
375 }
376 f->close_section();
377 } else if (command == "dump_historic_ops") {
378 if (!op_tracker.dump_historic_ops(f)) {
379 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
380 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
381 }
382 } else if (command == "dump_historic_ops_by_duration" ) {
383 if (op_tracker.dump_historic_ops(f, true)) {
384 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
385 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
386 }
387 } else if (command == "dump_historic_slow_ops") {
388 if (op_tracker.dump_historic_slow_ops(f, {})) {
389 err << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
390 please enable \"mon_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
391 }
392 } else if (command == "quorum") {
393 string quorumcmd;
394 cmd_getval(cmdmap, "quorumcmd", quorumcmd);
395 if (quorumcmd == "exit") {
396 start_election();
397 elector.stop_participating();
398 out << "stopped responding to quorum, initiated new election" << std::endl;
399 } else if (quorumcmd == "enter") {
400 elector.start_participating();
401 start_election();
402 out << "started responding to quorum, initiated new election" << std::endl;
403 } else {
404 err << "needs a valid 'quorum' command" << std::endl;
405 }
406 } else if (command == "connection scores dump") {
407 if (!get_quorum_mon_features().contains_all(
408 ceph::features::mon::FEATURE_PINGING)) {
409 err << "Not all monitors support changing election strategies; \
410 please upgrade them first!";
411 }
412 elector.dump_connection_scores(f);
413 } else if (command == "connection scores reset") {
414 if (!get_quorum_mon_features().contains_all(
415 ceph::features::mon::FEATURE_PINGING)) {
416 err << "Not all monitors support changing election strategies; \
417 please upgrade them first!";
418 }
419 elector.notify_clear_peer_state();
420 } else if (command == "smart") {
421 string want_devid;
422 cmd_getval(cmdmap, "devid", want_devid);
423
424 string devname = store->get_devname();
425 if (devname.empty()) {
426 err << "could not determine device name for " << store->get_path();
427 r = -ENOENT;
428 goto abort;
429 }
430 set<string> devnames;
431 get_raw_devices(devname, &devnames);
432 json_spirit::mObject json_map;
433 uint64_t smart_timeout = cct->_conf.get_val<uint64_t>(
434 "mon_smart_report_timeout");
435 for (auto& devname : devnames) {
436 string err;
437 string devid = get_device_id(devname, &err);
438 if (want_devid.size() && want_devid != devid) {
439 derr << "get_device_id failed on " << devname << ": " << err << dendl;
440 continue;
441 }
442 json_spirit::mValue smart_json;
443 if (block_device_get_metrics(devname, smart_timeout,
444 &smart_json)) {
445 dout(10) << "block_device_get_metrics failed for /dev/" << devname
446 << dendl;
447 continue;
448 }
449 json_map[devid] = smart_json;
450 }
451 json_spirit::write(json_map, out, json_spirit::pretty_print);
452 } else if (command == "heap") {
453 if (!ceph_using_tcmalloc()) {
454 err << "could not issue heap profiler command -- not using tcmalloc!";
455 r = -EOPNOTSUPP;
456 goto abort;
457 }
458 string cmd;
459 if (!cmd_getval(cmdmap, "heapcmd", cmd)) {
460 err << "unable to get value for command \"" << cmd << "\"";
461 r = -EINVAL;
462 goto abort;
463 }
464 std::vector<std::string> cmd_vec;
465 get_str_vec(cmd, cmd_vec);
466 string val;
467 if (cmd_getval(cmdmap, "value", val)) {
468 cmd_vec.push_back(val);
469 }
470 ceph_heap_profiler_handle_command(cmd_vec, out);
471 } else if (command == "compact") {
472 dout(1) << "triggering manual compaction" << dendl;
473 auto start = ceph::coarse_mono_clock::now();
474 store->compact_async();
475 auto end = ceph::coarse_mono_clock::now();
476 auto duration = ceph::to_seconds<double>(end - start);
477 dout(1) << "finished manual compaction in "
478 << duration << " seconds" << dendl;
479 out << "compacted " << g_conf().get_val<std::string>("mon_keyvaluedb")
480 << " in " << duration << " seconds";
481 } else {
482 ceph_abort_msg("bad AdminSocket command binding");
483 }
484 (read_only ? audit_clog->debug() : audit_clog->info())
485 << "from='admin socket' "
486 << "entity='admin socket' "
487 << "cmd=" << command << " "
488 << "args=" << args << ": finished";
489 return r;
490
491 abort:
492 (read_only ? audit_clog->debug() : audit_clog->info())
493 << "from='admin socket' "
494 << "entity='admin socket' "
495 << "cmd=" << command << " "
496 << "args=" << args << ": aborted";
497 return r;
498 }
499
500 void Monitor::handle_signal(int signum)
501 {
502 derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
503 if (signum == SIGHUP) {
504 sighup_handler(signum);
505 logmon()->reopen_logs();
506 } else {
507 ceph_assert(signum == SIGINT || signum == SIGTERM);
508 shutdown();
509 }
510 }
511
512 CompatSet Monitor::get_initial_supported_features()
513 {
514 CompatSet::FeatureSet ceph_mon_feature_compat;
515 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
516 CompatSet::FeatureSet ceph_mon_feature_incompat;
517 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
518 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
519 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
520 ceph_mon_feature_incompat);
521 }
522
523 CompatSet Monitor::get_supported_features()
524 {
525 CompatSet compat = get_initial_supported_features();
526 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
527 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
528 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
529 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
530 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
531 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
532 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
533 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
534 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS);
535 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_PACIFIC);
536 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_QUINCY);
537 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_REEF);
538 return compat;
539 }
540
541 CompatSet Monitor::get_legacy_features()
542 {
543 CompatSet::FeatureSet ceph_mon_feature_compat;
544 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
545 CompatSet::FeatureSet ceph_mon_feature_incompat;
546 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
547 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
548 ceph_mon_feature_incompat);
549 }
550
551 int Monitor::check_features(MonitorDBStore *store)
552 {
553 CompatSet required = get_supported_features();
554 CompatSet ondisk;
555
556 read_features_off_disk(store, &ondisk);
557
558 if (!required.writeable(ondisk)) {
559 CompatSet diff = required.unsupported(ondisk);
560 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
561 return -EPERM;
562 }
563
564 return 0;
565 }
566
567 void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
568 {
569 bufferlist featuresbl;
570 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
571 if (featuresbl.length() == 0) {
572 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
573 << "Assuming it is old-style and introducing one." << dendl;
574 //we only want the baseline ~v.18 features assumed to be on disk.
575 //If new features are introduced this code needs to disappear or
576 //be made smarter.
577 *features = get_legacy_features();
578
579 features->encode(featuresbl);
580 auto t(std::make_shared<MonitorDBStore::Transaction>());
581 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
582 store->apply_transaction(t);
583 } else {
584 auto it = featuresbl.cbegin();
585 features->decode(it);
586 }
587 }
588
589 void Monitor::read_features()
590 {
591 read_features_off_disk(store, &features);
592 dout(10) << "features " << features << dendl;
593
594 calc_quorum_requirements();
595 dout(10) << "required_features " << required_features << dendl;
596 }
597
598 void Monitor::write_features(MonitorDBStore::TransactionRef t)
599 {
600 bufferlist bl;
601 features.encode(bl);
602 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
603 }
604
605 const char** Monitor::get_tracked_conf_keys() const
606 {
607 static const char* KEYS[] = {
608 "crushtool", // helpful for testing
609 "mon_election_timeout",
610 "mon_lease",
611 "mon_lease_renew_interval_factor",
612 "mon_lease_ack_timeout_factor",
613 "mon_accept_timeout_factor",
614 // clog & admin clog
615 "clog_to_monitors",
616 "clog_to_syslog",
617 "clog_to_syslog_facility",
618 "clog_to_syslog_level",
619 "clog_to_graylog",
620 "clog_to_graylog_host",
621 "clog_to_graylog_port",
622 "mon_cluster_log_to_file",
623 "host",
624 "fsid",
625 // periodic health to clog
626 "mon_health_to_clog",
627 "mon_health_to_clog_interval",
628 "mon_health_to_clog_tick_interval",
629 // scrub interval
630 "mon_scrub_interval",
631 "mon_allow_pool_delete",
632 // osdmap pruning - observed, not handled.
633 "mon_osdmap_full_prune_enabled",
634 "mon_osdmap_full_prune_min",
635 "mon_osdmap_full_prune_interval",
636 "mon_osdmap_full_prune_txsize",
637 // debug options - observed, not handled
638 "mon_debug_extra_checks",
639 "mon_debug_block_osdmap_trim",
640 NULL
641 };
642 return KEYS;
643 }
644
645 void Monitor::handle_conf_change(const ConfigProxy& conf,
646 const std::set<std::string> &changed)
647 {
648 sanitize_options();
649
650 dout(10) << __func__ << " " << changed << dendl;
651
652 if (changed.count("clog_to_monitors") ||
653 changed.count("clog_to_syslog") ||
654 changed.count("clog_to_syslog_level") ||
655 changed.count("clog_to_syslog_facility") ||
656 changed.count("clog_to_graylog") ||
657 changed.count("clog_to_graylog_host") ||
658 changed.count("clog_to_graylog_port") ||
659 changed.count("host") ||
660 changed.count("fsid")) {
661 update_log_clients();
662 }
663
664 if (changed.count("mon_health_to_clog") ||
665 changed.count("mon_health_to_clog_interval") ||
666 changed.count("mon_health_to_clog_tick_interval")) {
667 finisher.queue(new C_MonContext{this, [this, changed](int) {
668 std::lock_guard l{lock};
669 health_to_clog_update_conf(changed);
670 }});
671 }
672
673 if (changed.count("mon_scrub_interval")) {
674 auto scrub_interval =
675 conf.get_val<std::chrono::seconds>("mon_scrub_interval");
676 finisher.queue(new C_MonContext{this, [this, scrub_interval](int) {
677 std::lock_guard l{lock};
678 scrub_update_interval(scrub_interval);
679 }});
680 }
681 }
682
683 void Monitor::update_log_clients()
684 {
685 clog->parse_client_options(g_ceph_context);
686 audit_clog->parse_client_options(g_ceph_context);
687 }
688
689 int Monitor::sanitize_options()
690 {
691 int r = 0;
692
693 // mon_lease must be greater than mon_lease_renewal; otherwise we
694 // may incur in leases expiring before they are renewed.
695 if (g_conf()->mon_lease_renew_interval_factor >= 1.0) {
696 clog->error() << "mon_lease_renew_interval_factor ("
697 << g_conf()->mon_lease_renew_interval_factor
698 << ") must be less than 1.0";
699 r = -EINVAL;
700 }
701
702 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
703 // got time to renew the lease and get an ack for it. Having both options
704 // with the same value, for a given small vale, could mean timing out if
705 // the monitors happened to be overloaded -- or even under normal load for
706 // a small enough value.
707 if (g_conf()->mon_lease_ack_timeout_factor <= 1.0) {
708 clog->error() << "mon_lease_ack_timeout_factor ("
709 << g_conf()->mon_lease_ack_timeout_factor
710 << ") must be greater than 1.0";
711 r = -EINVAL;
712 }
713
714 return r;
715 }
716
717 int Monitor::preinit()
718 {
719 std::unique_lock l(lock);
720
721 dout(1) << "preinit fsid " << monmap->fsid << dendl;
722
723 int r = sanitize_options();
724 if (r < 0) {
725 derr << "option sanitization failed!" << dendl;
726 return r;
727 }
728
729 ceph_assert(!logger);
730 {
731 PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
732 pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
733 PerfCountersBuilder::PRIO_USEFUL);
734 pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions",
735 "sadd", PerfCountersBuilder::PRIO_INTERESTING);
736 pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions",
737 "srm", PerfCountersBuilder::PRIO_INTERESTING);
738 pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions",
739 "strm", PerfCountersBuilder::PRIO_USEFUL);
740 pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in",
741 "ecnt", PerfCountersBuilder::PRIO_USEFUL);
742 pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started",
743 "estt", PerfCountersBuilder::PRIO_INTERESTING);
744 pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won",
745 "ewon", PerfCountersBuilder::PRIO_INTERESTING);
746 pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost",
747 "elst", PerfCountersBuilder::PRIO_INTERESTING);
748 logger = pcb.create_perf_counters();
749 cct->get_perfcounters_collection()->add(logger);
750 }
751
752 ceph_assert(!cluster_logger);
753 {
754 PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
755 pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
756 pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
757 pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
758 pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
759 pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
760 pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
761 pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster", NULL, 0, unit_t(UNIT_BYTES));
762 pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space", NULL, 0, unit_t(UNIT_BYTES));
763 pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space", NULL, 0, unit_t(UNIT_BYTES));
764 pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
765 pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
766 pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
767 pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
768 pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
769 pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
770 pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
771 pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
772 pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
773 pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects", NULL, 0, unit_t(UNIT_BYTES));
774 cluster_logger = pcb.create_perf_counters();
775 }
776
777 paxos->init_logger();
778
779 // verify cluster_uuid
780 {
781 int r = check_fsid();
782 if (r == -ENOENT)
783 r = write_fsid();
784 if (r < 0) {
785 return r;
786 }
787 }
788
789 // open compatset
790 read_features();
791
792 // have we ever joined a quorum?
793 has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
794 dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;
795
796 if (!has_ever_joined) {
797 // impose initial quorum restrictions?
798 list<string> initial_members;
799 get_str_list(g_conf()->mon_initial_members, initial_members);
800
801 if (!initial_members.empty()) {
802 dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
803
804 monmap->set_initial_members(
805 g_ceph_context, initial_members, name, messenger->get_myaddrs(),
806 &extra_probe_peers);
807
808 dout(10) << " monmap is " << *monmap << dendl;
809 dout(10) << " extra probe peers " << extra_probe_peers << dendl;
810 }
811 } else if (!monmap->contains(name)) {
812 derr << "not in monmap and have been in a quorum before; "
813 << "must have been removed" << dendl;
814 if (g_conf()->mon_force_quorum_join) {
815 dout(0) << "we should have died but "
816 << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
817 } else {
818 derr << "commit suicide!" << dendl;
819 return -ENOENT;
820 }
821 }
822
823 {
824 // We have a potentially inconsistent store state in hands. Get rid of it
825 // and start fresh.
826 bool clear_store = false;
827 if (store->exists("mon_sync", "in_sync")) {
828 dout(1) << __func__ << " clean up potentially inconsistent store state"
829 << dendl;
830 clear_store = true;
831 }
832
833 if (store->get("mon_sync", "force_sync") > 0) {
834 dout(1) << __func__ << " force sync by clearing store state" << dendl;
835 clear_store = true;
836 }
837
838 if (clear_store) {
839 set<string> sync_prefixes = get_sync_targets_names();
840 store->clear(sync_prefixes);
841 }
842 }
843
844 sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
845 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
846
847 init_paxos();
848
849 if (is_keyring_required()) {
850 // we need to bootstrap authentication keys so we can form an
851 // initial quorum.
852 if (authmon()->get_last_committed() == 0) {
853 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
854 bufferlist bl;
855 int err = store->get("mkfs", "keyring", bl);
856 if (err == 0 && bl.length() > 0) {
857 // Attempt to decode and extract keyring only if it is found.
858 KeyRing keyring;
859 auto p = bl.cbegin();
860 decode(keyring, p);
861 extract_save_mon_key(keyring);
862 }
863 }
864
865 string keyring_loc = g_conf()->mon_data + "/keyring";
866
867 r = keyring.load(cct, keyring_loc);
868 if (r < 0) {
869 EntityName mon_name;
870 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
871 EntityAuth mon_key;
872 if (key_server.get_auth(mon_name, mon_key)) {
873 dout(1) << "copying mon. key from old db to external keyring" << dendl;
874 keyring.add(mon_name, mon_key);
875 bufferlist bl;
876 keyring.encode_plaintext(bl);
877 write_default_keyring(bl);
878 } else {
879 derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
880 return r;
881 }
882 }
883 }
884
885 admin_hook = new AdminHook(this);
886 AdminSocket* admin_socket = cct->get_admin_socket();
887
888 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
889 l.unlock();
890 // register tell/asock commands
891 for (const auto& command : local_mon_commands) {
892 if (!command.is_tell()) {
893 continue;
894 }
895 const auto prefix = cmddesc_get_prefix(command.cmdstring);
896 if (prefix == "injectargs" ||
897 prefix == "version" ||
898 prefix == "tell") {
899 // not registerd by me
900 continue;
901 }
902 r = admin_socket->register_command(command.cmdstring, admin_hook,
903 command.helpstring);
904 ceph_assert(r == 0);
905 }
906 l.lock();
907
908 // add ourselves as a conf observer
909 g_conf().add_observer(this);
910
911 messenger->set_auth_client(this);
912 messenger->set_auth_server(this);
913 mgr_messenger->set_auth_client(this);
914
915 auth_registry.refresh_config();
916
917 return 0;
918 }
919
920 int Monitor::init()
921 {
922 dout(2) << "init" << dendl;
923 std::lock_guard l(lock);
924
925 finisher.start();
926
927 // start ticker
928 timer.init();
929 new_tick();
930
931 cpu_tp.start();
932
933 // i'm ready!
934 messenger->add_dispatcher_tail(this);
935
936 // kickstart pet mgrclient
937 mgr_client.init();
938 mgr_messenger->add_dispatcher_tail(&mgr_client);
939 mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls
940 mgrmon()->prime_mgr_client();
941
942 // generate list of filestore OSDs
943 osdmon()->get_filestore_osd_list();
944
945 state = STATE_PROBING;
946
947 bootstrap();
948
949 if (!elector.peer_tracker_is_clean()){
950 dout(10) << "peer_tracker looks inconsistent"
951 << " previous bad logic, clearing ..." << dendl;
952 elector.notify_clear_peer_state();
953 }
954
955 // add features of myself into feature_map
956 session_map.feature_map.add_mon(con_self->get_features());
957 return 0;
958 }
959
960 void Monitor::init_paxos()
961 {
962 dout(10) << __func__ << dendl;
963 paxos->init();
964
965 // init services
966 for (auto& svc : paxos_service) {
967 svc->init();
968 }
969
970 refresh_from_paxos(NULL);
971 }
972
973 void Monitor::refresh_from_paxos(bool *need_bootstrap)
974 {
975 dout(10) << __func__ << dendl;
976
977 bufferlist bl;
978 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
979 if (r >= 0) {
980 try {
981 auto p = bl.cbegin();
982 decode(fingerprint, p);
983 }
984 catch (ceph::buffer::error& e) {
985 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
986 }
987 } else {
988 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
989 }
990
991 for (auto& svc : paxos_service) {
992 svc->refresh(need_bootstrap);
993 }
994 for (auto& svc : paxos_service) {
995 svc->post_refresh();
996 }
997 load_metadata();
998 }
999
1000 void Monitor::register_cluster_logger()
1001 {
1002 if (!cluster_logger_registered) {
1003 dout(10) << "register_cluster_logger" << dendl;
1004 cluster_logger_registered = true;
1005 cct->get_perfcounters_collection()->add(cluster_logger);
1006 } else {
1007 dout(10) << "register_cluster_logger - already registered" << dendl;
1008 }
1009 }
1010
1011 void Monitor::unregister_cluster_logger()
1012 {
1013 if (cluster_logger_registered) {
1014 dout(10) << "unregister_cluster_logger" << dendl;
1015 cluster_logger_registered = false;
1016 cct->get_perfcounters_collection()->remove(cluster_logger);
1017 } else {
1018 dout(10) << "unregister_cluster_logger - not registered" << dendl;
1019 }
1020 }
1021
1022 void Monitor::update_logger()
1023 {
1024 cluster_logger->set(l_cluster_num_mon, monmap->size());
1025 cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
1026 }
1027
// Stop the monitor.
//
// Sequence: flush any pending paxos write, clear the auth monitor's mon
// num/rank, set STATE_SHUTDOWN, remove the config observer and admin-socket
// hooks, shut down the elector and mgr client, drain and stop the finisher,
// shut down paxos and its services, cancel quorum waiters with -ECANCELED,
// stop the timer, CPU thread pool, sessions and log client, then (with
// `lock` released) shut down both messengers and drop the perf counters.
//
// Locking: acquires `lock` itself, releases it several times along the way,
// and returns with `lock` released.
1028 void Monitor::shutdown()
1029 {
1030 dout(1) << "shutdown" << dendl;
1031
1032 lock.lock();
1033
// may drop and re-take `lock` while the store flushes
1034 wait_for_paxos_write();
1035
1036 {
1037 std::lock_guard l(auth_lock);
1038 authmon()->_set_mon_num_rank(0, 0);
1039 }
1040
1041 state = STATE_SHUTDOWN;
1042
// NOTE(review): `lock` is dropped around remove_observer, presumably to
// avoid holding the mon lock while calling into the config subsystem — TODO confirm.
1043 lock.unlock();
1044 g_conf().remove_observer(this);
1045 lock.lock();
1046
1047 if (admin_hook) {
1048 cct->get_admin_socket()->unregister_commands(admin_hook);
1049 delete admin_hook;
1050 admin_hook = NULL;
1051 }
1052
1053 elector.shutdown();
1054
1055 mgr_client.shutdown();
1056
// drain the finisher without holding `lock`
// NOTE(review): presumably because queued callbacks may need `lock` — TODO confirm.
1057 lock.unlock();
1058 finisher.wait_for_empty();
1059 finisher.stop();
1060 lock.lock();
1061
1062 // clean up
1063 paxos->shutdown();
1064 for (auto& svc : paxos_service) {
1065 svc->shutdown();
1066 }
1067
// complete anyone still waiting for a quorum with -ECANCELED
1068 finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
1069 finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
1070
1071 timer.shutdown();
1072
1073 cpu_tp.stop();
1074
1075 remove_all_sessions();
1076
1077 log_client.shutdown();
1078
1079 // unlock before msgr shutdown...
1080 lock.unlock();
1081
1082 // shutdown messenger before removing logger from perfcounter collection,
1083 // otherwise _ms_dispatch() will try to update deleted logger
1084 messenger->shutdown();
1085 mgr_messenger->shutdown();
1086
1087 if (logger) {
1088 cct->get_perfcounters_collection()->remove(logger);
1089 }
// cluster_logger is only in the collection if register_cluster_logger() ran
1090 if (cluster_logger) {
1091 if (cluster_logger_registered)
1092 cct->get_perfcounters_collection()->remove(cluster_logger);
1093 delete cluster_logger;
1094 cluster_logger = NULL;
1095 }
1096 }
1097
1098 void Monitor::wait_for_paxos_write()
1099 {
1100 if (paxos->is_writing() || paxos->is_writing_previous()) {
1101 dout(10) << __func__ << " flushing pending write" << dendl;
1102 lock.unlock();
1103 store->flush();
1104 lock.lock();
1105 dout(10) << __func__ << " flushed pending write" << dendl;
1106 }
1107 }
1108
// Re-exec this process with its original command line (orig_argc/orig_argv).
//
// When PROCPREFIX is defined and PROCPREFIX "/proc/self/exe" is readable, that
// path is exec'd, so the same binary runs even if it has been unlinked;
// otherwise orig_argv[0] is used, resolved against the current directory.
// Never returns: if execv() fails the process aborts.
1109 void Monitor::respawn()
1110 {
1111 // --- WARNING TO FUTURE COPY/PASTERS ---
1112 // You must also add a call like
1113 //
1114 // ceph_pthread_setname(pthread_self(), "ceph-mon");
1115 //
1116 // to main() so that /proc/$pid/stat field 2 contains "(ceph-mon)"
1117 // instead of "(exe)", so that killall (and log rotation) will work.
1118
1119 dout(0) << __func__ << dendl;
1120
// copy argv into a NULL-terminated array for execv()
// NOTE(review): variable-length array — a GCC/Clang extension, not standard C++.
1121 char *new_argv[orig_argc+1];
1122 dout(1) << " e: '" << orig_argv[0] << "'" << dendl;
1123 for (int i=0; i<orig_argc; i++) {
1124 new_argv[i] = (char *)orig_argv[i];
1125 dout(1) << " " << i << ": '" << orig_argv[i] << "'" << dendl;
1126 }
1127 new_argv[orig_argc] = NULL;
1128
1129 /* Determine the path to our executable, test if Linux /proc/self/exe exists.
1130 * This allows us to exec the same executable even if it has since been
1131 * unlinked.
1132 */
1133 char exe_path[PATH_MAX] = "";
1134 #ifdef PROCPREFIX
1135 if (readlink(PROCPREFIX "/proc/self/exe", exe_path, PATH_MAX-1) != -1) {
1136 dout(1) << "respawning with exe " << exe_path << dendl;
1137 strcpy(exe_path, PROCPREFIX "/proc/self/exe");
1138 } else {
1139 #else
1140 {
1141 #endif
1142 /* Print CWD for the user's interest */
1143 char buf[PATH_MAX];
1144 char *cwd = getcwd(buf, sizeof(buf));
1145 ceph_assert(cwd);
1146 dout(1) << " cwd " << cwd << dendl;
1147
1148 /* Fall back to a best-effort: just running in our CWD */
1149 strncpy(exe_path, orig_argv[0], PATH_MAX-1);
1150 }
1151
1152 dout(1) << " exe_path " << exe_path << dendl;
1153
// the signal mask is inherited across execv(), so unblock everything first
1154 unblock_all_signals(NULL);
1155 execv(exe_path, new_argv);
1156
// reaching here means execv() failed
1157 dout(0) << "respawn execv " << orig_argv[0]
1158 << " failed with " << cpp_strerror(errno) << dendl;
1159
1160 // We have to assert out here, because suicide() returns, and callers
1161 // to respawn expect it never to return.
1162 ceph_abort();
1163 }
1164
// (Re)start this monitor's search for a quorum.
//
// Steps:
// - Flush any pending paxos write, reset sync-requester state and the probe timer.
// - Recompute our rank from the monmap.
// - Enter STATE_PROBING and _reset() quorum state.
// - Then either win a standalone election (single-mon monmap and we are
//   rank 0), or arm the probe timeout and send OP_PROBE to every other
//   monmap rank and to every address in extra_probe_peers.
//
// May not return:
// - exits if the monmap's min_mon_release cannot be upgraded from;
// - exits if we lost our rank after having joined a quorum;
// - respawns if the monmap lists different addresses for our rank.
1165 void Monitor::bootstrap()
1166 {
1167 dout(10) << "bootstrap" << dendl;
1168 wait_for_paxos_write();
1169
1170 sync_reset_requester();
1171 unregister_cluster_logger();
1172 cancel_probe_timeout();
1173
1174 if (monmap->get_epoch() == 0) {
1175 dout(10) << "reverting to legacy ranks for seed monmap (epoch 0)" << dendl;
1176 monmap->calc_legacy_ranks();
1177 }
1178 dout(10) << "monmap " << *monmap << dendl;
// refuse to run against a monmap whose min_mon_release we cannot upgrade from
1179 {
1180 auto from_release = monmap->min_mon_release;
1181 ostringstream err;
1182 if (!can_upgrade_from(from_release, "min_mon_release", err)) {
1183 derr << "current monmap has " << err.str() << " stopping." << dendl;
1184 exit(0);
1185 }
1186 }
1187 // note my rank
1188 int newrank = monmap->get_rank(messenger->get_myaddrs());
// we had a rank before but our addrs are no longer in the monmap
1189 if (newrank < 0 && rank >= 0) {
1190 // was i ever part of the quorum?
1191 if (has_ever_joined) {
1192 dout(0) << " removed from monmap, suicide." << dendl;
1193 exit(0);
1194 }
1195 elector.notify_clear_peer_state();
1196 }
// our rank's addrs in the monmap differ from what we are bound to: restart
1197 if (newrank >= 0 &&
1198 monmap->get_addrs(newrank) != messenger->get_myaddrs()) {
1199 dout(0) << " monmap addrs for rank " << newrank << " changed, i am "
1200 << messenger->get_myaddrs()
1201 << ", monmap is " << monmap->get_addrs(newrank) << ", respawning"
1202 << dendl;
1203
1204 if (monmap->get_epoch()) {
1205 // store this map in temp mon_sync location so that we use it on
1206 // our next startup
1207 derr << " stashing newest monmap " << monmap->get_epoch()
1208 << " for next startup" << dendl;
1209 bufferlist bl;
1210 monmap->encode(bl, -1);
1211 auto t(std::make_shared<MonitorDBStore::Transaction>());
1212 t->put("mon_sync", "temp_newer_monmap", bl);
1213 store->apply_transaction(t);
1214 }
1215
// respawn() does not return
1216 respawn();
1217 }
1218 if (newrank != rank) {
1219 dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
1220 messenger->set_myname(entity_name_t::MON(newrank));
1221 rank = newrank;
1222 elector.notify_rank_changed(rank);
1223
1224 // reset all connections, or else our peers will think we are someone else.
1225 messenger->mark_down_all();
1226 }
1227
1228 // reset
1229 state = STATE_PROBING;
1230
// clears auth/quorum/scrub state and restarts paxos and its services
1231 _reset();
1232
// optional compaction of the local store, controlled by mon_compact_on_bootstrap
1233 // sync store
1234 if (g_conf()->mon_compact_on_bootstrap) {
1235 dout(10) << "bootstrap -- triggering compaction" << dendl;
1236 store->compact();
1237 dout(10) << "bootstrap -- finished compaction" << dendl;
1238 }
1239
1240 // stretch mode bits
1241 set_elector_disallowed_leaders(false);
1242
1243 // singleton monitor?
1244 if (monmap->size() == 1 && rank == 0) {
1245 win_standalone_election();
1246 return;
1247 }
1248
1249 reset_probe_timeout();
1250
1251 // i'm outside the quorum
1252 if (monmap->contains(name))
1253 outside_quorum.insert(name);
1254
1255 // probe monitors
1256 dout(10) << "probing other monitors" << dendl;
1257 for (unsigned i = 0; i < monmap->size(); i++) {
1258 if ((int)i != rank)
1259 send_mon_message(
1260 new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
1261 ceph_release()),
1262 i);
1263 }
// also probe hinted peers that are not in the monmap, skipping ourselves
1264 for (auto& av : extra_probe_peers) {
1265 if (av != messenger->get_myaddrs()) {
1266 messenger->send_to_mon(
1267 new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,
1268 ceph_release()),
1269 av);
1270 }
1271 }
1272 }
1273
1274 bool Monitor::_add_bootstrap_peer_hint(std::string_view cmd,
1275 const cmdmap_t& cmdmap,
1276 ostream& ss)
1277 {
1278 if (is_leader() || is_peon()) {
1279 ss << "mon already active; ignoring bootstrap hint";
1280 return true;
1281 }
1282
1283 entity_addrvec_t addrs;
1284 string addrstr;
1285 if (cmd_getval(cmdmap, "addr", addrstr)) {
1286 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' addr '"
1287 << addrstr << "'" << dendl;
1288
1289 entity_addr_t addr;
1290 if (!addr.parse(addrstr, entity_addr_t::TYPE_ANY)) {
1291 ss << "failed to parse addrs '" << addrstr
1292 << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1293 return false;
1294 }
1295
1296 addrs.v.push_back(addr);
1297 if (addr.get_port() == 0) {
1298 addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
1299 addrs.v[0].set_port(CEPH_MON_PORT_IANA);
1300 addrs.v.push_back(addr);
1301 addrs.v[1].set_type(entity_addr_t::TYPE_LEGACY);
1302 addrs.v[1].set_port(CEPH_MON_PORT_LEGACY);
1303 } else if (addr.get_type() == entity_addr_t::TYPE_ANY) {
1304 if (addr.get_port() == CEPH_MON_PORT_LEGACY) {
1305 addrs.v[0].set_type(entity_addr_t::TYPE_LEGACY);
1306 } else {
1307 addrs.v[0].set_type(entity_addr_t::TYPE_MSGR2);
1308 }
1309 }
1310 } else if (cmd_getval(cmdmap, "addrv", addrstr)) {
1311 dout(10) << "_add_bootstrap_peer_hintv '" << cmd << "' addrv '"
1312 << addrstr << "'" << dendl;
1313 const char *end = 0;
1314 if (!addrs.parse(addrstr.c_str(), &end)) {
1315 ss << "failed to parse addrs '" << addrstr
1316 << "'; syntax is 'add_bootstrap_peer_hintv v2:ip:port[,v1:ip:port]'";
1317 return false;
1318 }
1319 } else {
1320 ss << "no addr or addrv provided";
1321 return false;
1322 }
1323
1324 extra_probe_peers.insert(addrs);
1325 ss << "adding peer " << addrs << " to list: " << extra_probe_peers;
1326 return true;
1327 }
1328
1329 // called by bootstrap(), or on leader|peon -> electing
1330 void Monitor::_reset()
1331 {
1332 dout(10) << __func__ << dendl;
1333
1334 // disable authentication
1335 {
1336 std::lock_guard l(auth_lock);
1337 authmon()->_set_mon_num_rank(0, 0);
1338 }
1339
1340 cancel_probe_timeout();
1341 timecheck_finish();
1342 health_events_cleanup();
1343 health_check_log_times.clear();
1344 scrub_event_cancel();
1345
1346 leader_since = utime_t();
1347 quorum_since = {};
1348 if (!quorum.empty()) {
1349 exited_quorum = ceph_clock_now();
1350 }
1351 quorum.clear();
1352 outside_quorum.clear();
1353 quorum_feature_map.clear();
1354
1355 scrub_reset();
1356
1357 paxos->restart();
1358
1359 for (auto& svc : paxos_service) {
1360 svc->restart();
1361 }
1362 }
1363
1364
1365 // -----------------------------------------------------------
1366 // sync
1367
1368 set<string> Monitor::get_sync_targets_names()
1369 {
1370 set<string> targets;
1371 targets.insert(paxos->get_name());
1372 for (auto& svc : paxos_service) {
1373 svc->get_store_prefixes(targets);
1374 }
1375 return targets;
1376 }
1377
1378
1379 void Monitor::sync_timeout()
1380 {
1381 dout(10) << __func__ << dendl;
1382 ceph_assert(state == STATE_SYNCHRONIZING);
1383 bootstrap();
1384 }
1385
1386 void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1387 {
1388 dout(1) << __func__ << dendl;
1389
1390 MonMap latest_monmap;
1391
1392 // Grab latest monmap from MonmapMonitor
1393 bufferlist monmon_bl;
1394 int err = monmon()->get_monmap(monmon_bl);
1395 if (err < 0) {
1396 if (err != -ENOENT) {
1397 derr << __func__
1398 << " something wrong happened while reading the store: "
1399 << cpp_strerror(err) << dendl;
1400 ceph_abort_msg("error reading the store");
1401 }
1402 } else {
1403 latest_monmap.decode(monmon_bl);
1404 }
1405
1406 // Grab last backed up monmap (if any) and compare epochs
1407 if (store->exists("mon_sync", "latest_monmap")) {
1408 bufferlist backup_bl;
1409 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1410 if (err < 0) {
1411 derr << __func__
1412 << " something wrong happened while reading the store: "
1413 << cpp_strerror(err) << dendl;
1414 ceph_abort_msg("error reading the store");
1415 }
1416 ceph_assert(backup_bl.length() > 0);
1417
1418 MonMap backup_monmap;
1419 backup_monmap.decode(backup_bl);
1420
1421 if (backup_monmap.epoch > latest_monmap.epoch)
1422 latest_monmap = backup_monmap;
1423 }
1424
1425 // Check if our current monmap's epoch is greater than the one we've
1426 // got so far.
1427 if (monmap->epoch > latest_monmap.epoch)
1428 latest_monmap = *monmap;
1429
1430 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1431
1432 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1433 }
1434
1435 void Monitor::sync_reset_requester()
1436 {
1437 dout(10) << __func__ << dendl;
1438
1439 if (sync_timeout_event) {
1440 timer.cancel_event(sync_timeout_event);
1441 sync_timeout_event = NULL;
1442 }
1443
1444 sync_provider = entity_addrvec_t();
1445 sync_cookie = 0;
1446 sync_full = false;
1447 sync_start_version = 0;
1448 }
1449
1450 void Monitor::sync_reset_provider()
1451 {
1452 dout(10) << __func__ << dendl;
1453 sync_providers.clear();
1454 }
1455
1456 void Monitor::sync_start(entity_addrvec_t &addrs, bool full)
1457 {
1458 dout(10) << __func__ << " " << addrs << (full ? " full" : " recent") << dendl;
1459
1460 ceph_assert(state == STATE_PROBING ||
1461 state == STATE_SYNCHRONIZING);
1462 state = STATE_SYNCHRONIZING;
1463
1464 // make sure are not a provider for anyone!
1465 sync_reset_provider();
1466
1467 sync_full = full;
1468
1469 if (sync_full) {
1470 // stash key state, and mark that we are syncing
1471 auto t(std::make_shared<MonitorDBStore::Transaction>());
1472 sync_stash_critical_state(t);
1473 t->put("mon_sync", "in_sync", 1);
1474
1475 sync_last_committed_floor = std::max(sync_last_committed_floor, paxos->get_version());
1476 dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
1477 << sync_last_committed_floor << dendl;
1478 t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
1479
1480 store->apply_transaction(t);
1481
1482 ceph_assert(g_conf()->mon_sync_requester_kill_at != 1);
1483
1484 // clear the underlying store
1485 set<string> targets = get_sync_targets_names();
1486 dout(10) << __func__ << " clearing prefixes " << targets << dendl;
1487 store->clear(targets);
1488
1489 // make sure paxos knows it has been reset. this prevents a
1490 // bootstrap and then different probe reply order from possibly
1491 // deciding a partial or no sync is needed.
1492 paxos->init();
1493
1494 ceph_assert(g_conf()->mon_sync_requester_kill_at != 2);
1495 }
1496
1497 // assume 'other' as the leader. We will update the leader once we receive
1498 // a reply to the sync start.
1499 sync_provider = addrs;
1500
1501 sync_reset_timeout();
1502
1503 MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
1504 if (!sync_full)
1505 m->last_committed = paxos->get_version();
1506 messenger->send_to_mon(m, sync_provider);
1507 }
1508
1509 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1510 {
1511 dout(10) << __func__ << dendl;
1512 bufferlist backup_monmap;
1513 sync_obtain_latest_monmap(backup_monmap);
1514 ceph_assert(backup_monmap.length() > 0);
1515 t->put("mon_sync", "latest_monmap", backup_monmap);
1516 }
1517
1518 void Monitor::sync_reset_timeout()
1519 {
1520 dout(10) << __func__ << dendl;
1521 if (sync_timeout_event)
1522 timer.cancel_event(sync_timeout_event);
1523 sync_timeout_event = timer.add_event_after(
1524 g_conf()->mon_sync_timeout,
1525 new C_MonContext{this, [this](int) {
1526 sync_timeout();
1527 }});
1528 }
1529
1530 void Monitor::sync_finish(version_t last_committed)
1531 {
1532 dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
1533
1534 ceph_assert(g_conf()->mon_sync_requester_kill_at != 7);
1535
1536 if (sync_full) {
1537 // finalize the paxos commits
1538 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1539 paxos->read_and_prepare_transactions(tx, sync_start_version,
1540 last_committed);
1541 tx->put(paxos->get_name(), "last_committed", last_committed);
1542
1543 dout(30) << __func__ << " final tx dump:\n";
1544 JSONFormatter f(true);
1545 tx->dump(&f);
1546 f.flush(*_dout);
1547 *_dout << dendl;
1548
1549 store->apply_transaction(tx);
1550 }
1551
1552 ceph_assert(g_conf()->mon_sync_requester_kill_at != 8);
1553
1554 auto t(std::make_shared<MonitorDBStore::Transaction>());
1555 t->erase("mon_sync", "in_sync");
1556 t->erase("mon_sync", "force_sync");
1557 t->erase("mon_sync", "last_committed_floor");
1558 store->apply_transaction(t);
1559
1560 ceph_assert(g_conf()->mon_sync_requester_kill_at != 9);
1561
1562 init_paxos();
1563
1564 ceph_assert(g_conf()->mon_sync_requester_kill_at != 10);
1565
1566 bootstrap();
1567 }
1568
1569 void Monitor::handle_sync(MonOpRequestRef op)
1570 {
1571 auto m = op->get_req<MMonSync>();
1572 dout(10) << __func__ << " " << *m << dendl;
1573 switch (m->op) {
1574
1575 // provider ---------
1576
1577 case MMonSync::OP_GET_COOKIE_FULL:
1578 case MMonSync::OP_GET_COOKIE_RECENT:
1579 handle_sync_get_cookie(op);
1580 break;
1581 case MMonSync::OP_GET_CHUNK:
1582 handle_sync_get_chunk(op);
1583 break;
1584
1585 // client -----------
1586
1587 case MMonSync::OP_COOKIE:
1588 handle_sync_cookie(op);
1589 break;
1590
1591 case MMonSync::OP_CHUNK:
1592 case MMonSync::OP_LAST_CHUNK:
1593 handle_sync_chunk(op);
1594 break;
1595 case MMonSync::OP_NO_COOKIE:
1596 handle_sync_no_cookie(op);
1597 break;
1598
1599 default:
1600 dout(0) << __func__ << " unknown op " << m->op << dendl;
1601 ceph_abort_msg("unknown op");
1602 }
1603 }
1604
1605 // leader
1606
1607 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1608 {
1609 auto m = op->get_req<MMonSync>();
1610 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1611 m->get_connection()->send_message(reply);
1612 }
1613
1614 void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1615 {
1616 auto m = op->get_req<MMonSync>();
1617 if (is_synchronizing()) {
1618 _sync_reply_no_cookie(op);
1619 return;
1620 }
1621
1622 ceph_assert(g_conf()->mon_sync_provider_kill_at != 1);
1623
1624 // make sure they can understand us.
1625 if ((required_features ^ m->get_connection()->get_features()) &
1626 required_features) {
1627 dout(5) << " ignoring peer mon." << m->get_source().num()
1628 << " has features " << std::hex
1629 << m->get_connection()->get_features()
1630 << " but we require " << required_features << std::dec << dendl;
1631 return;
1632 }
1633
1634 // make up a unique cookie. include election epoch (which persists
1635 // across restarts for the whole cluster) and a counter for this
1636 // process instance. there is no need to be unique *across*
1637 // monitors, though.
1638 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
1639 ceph_assert(sync_providers.count(cookie) == 0);
1640
1641 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1642
1643 SyncProvider& sp = sync_providers[cookie];
1644 sp.cookie = cookie;
1645 sp.addrs = m->get_source_addrs();
1646 sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
1647
1648 set<string> sync_targets;
1649 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1650 // full scan
1651 sync_targets = get_sync_targets_names();
1652 sp.last_committed = paxos->get_version();
1653 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1654 sp.full = true;
1655 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1656 } else {
1657 // just catch up paxos
1658 sp.last_committed = m->last_committed;
1659 }
1660 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1661
1662 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1663 reply->last_committed = sp.last_committed;
1664 m->get_connection()->send_message(reply);
1665 }
1666
// Provider side: answer an OP_GET_CHUNK for an existing cookie.
//
// Each reply carries a MonitorDBStore transaction bounded by
// mon_sync_max_payload_size bytes and mon_sync_max_payload_keys keys:
// - first, the paxos versions after sp.last_committed, up to our current version;
// - then, for a full sync, the next slice of the store from the synchronizer.
// When nothing remains, the reply is OP_LAST_CHUNK and the session is dropped.
// An unknown cookie, or a requester whose position is below our first
// committed paxos version (when that is > 1), gets OP_NO_COOKIE instead.
1667 void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
1668 {
1669 auto m = op->get_req<MMonSync>();
1670 dout(10) << __func__ << " " << *m << dendl;
1671
1672 if (sync_providers.count(m->cookie) == 0) {
1673 dout(10) << __func__ << " no cookie " << m->cookie << dendl;
1674 _sync_reply_no_cookie(op);
1675 return;
1676 }
1677
1678 ceph_assert(g_conf()->mon_sync_provider_kill_at != 2);
1679
1680 SyncProvider& sp = sync_providers[m->cookie];
1681 sp.reset_timeout(g_ceph_context, g_conf()->mon_sync_timeout * 2);
1682
// the versions the requester still needs have been trimmed from our paxos
1683 if (sp.last_committed < paxos->get_first_committed() &&
1684 paxos->get_first_committed() > 1) {
1685 dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
1686 << " < our fc " << paxos->get_first_committed() << dendl;
1687 sync_providers.erase(m->cookie);
1688 _sync_reply_no_cookie(op);
1689 return;
1690 }
1691
1692 MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
1693 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1694
// per-chunk budget; both are consumed by the paxos versions first
// NOTE(review): config values are narrowed to int here.
1695 int bytes_left = g_conf()->mon_sync_max_payload_size;
1696 int keys_left = g_conf()->mon_sync_max_payload_keys;
1697 while (sp.last_committed < paxos->get_version() &&
1698 bytes_left > 0 &&
1699 keys_left > 0) {
1700 bufferlist bl;
1701 sp.last_committed++;
1702
1703 int err = store->get(paxos->get_name(), sp.last_committed, bl);
1704 ceph_assert(err == 0);
1705
1706 tx->put(paxos->get_name(), sp.last_committed, bl);
1707 bytes_left -= bl.length();
1708 --keys_left;
1709 dout(20) << __func__ << " including paxos state " << sp.last_committed
1710 << dendl;
1711 }
1712 reply->last_committed = sp.last_committed;
1713
// fill any remaining budget with store contents (full sync only)
1714 if (sp.full && bytes_left > 0 && keys_left > 0) {
1715 sp.synchronizer->get_chunk_tx(tx, bytes_left, keys_left);
1716 sp.last_key = sp.synchronizer->get_last_key();
1717 reply->last_key = sp.last_key;
1718 }
1719
// more to send if the synchronizer has data left or paxos is still ahead
1720 if ((sp.full && sp.synchronizer->has_next_chunk()) ||
1721 sp.last_committed < paxos->get_version()) {
1722 dout(10) << __func__ << " chunk, through version " << sp.last_committed
1723 << " key " << sp.last_key << dendl;
1724 } else {
1725 dout(10) << __func__ << " last chunk, through version " << sp.last_committed
1726 << " key " << sp.last_key << dendl;
1727 reply->op = MMonSync::OP_LAST_CHUNK;
1728
1729 ceph_assert(g_conf()->mon_sync_provider_kill_at != 3);
1730
1731 // clean up our local state
// NOTE(review): `sp` refers into sync_providers and must not be used after this erase.
1732 sync_providers.erase(sp.cookie);
1733 }
1734
1735 encode(*tx, reply->chunk_bl);
1736
1737 m->get_connection()->send_message(reply);
1738 }
1739
1740 // requester
1741
1742 void Monitor::handle_sync_cookie(MonOpRequestRef op)
1743 {
1744 auto m = op->get_req<MMonSync>();
1745 dout(10) << __func__ << " " << *m << dendl;
1746 if (sync_cookie) {
1747 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1748 return;
1749 }
1750 if (m->get_source_addrs() != sync_provider) {
1751 dout(10) << __func__ << " source does not match, discarding" << dendl;
1752 return;
1753 }
1754 sync_cookie = m->cookie;
1755 sync_start_version = m->last_committed;
1756
1757 sync_reset_timeout();
1758 sync_get_next_chunk();
1759
1760 ceph_assert(g_conf()->mon_sync_requester_kill_at != 3);
1761 }
1762
1763 void Monitor::sync_get_next_chunk()
1764 {
1765 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
1766 if (g_conf()->mon_inject_sync_get_chunk_delay > 0) {
1767 dout(20) << __func__ << " injecting delay of " << g_conf()->mon_inject_sync_get_chunk_delay << dendl;
1768 usleep((long long)(g_conf()->mon_inject_sync_get_chunk_delay * 1000000.0));
1769 }
1770 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
1771 messenger->send_to_mon(r, sync_provider);
1772
1773 ceph_assert(g_conf()->mon_sync_requester_kill_at != 4);
1774 }
1775
1776 void Monitor::handle_sync_chunk(MonOpRequestRef op)
1777 {
1778 auto m = op->get_req<MMonSync>();
1779 dout(10) << __func__ << " " << *m << dendl;
1780
1781 if (m->cookie != sync_cookie) {
1782 dout(10) << __func__ << " cookie does not match, discarding" << dendl;
1783 return;
1784 }
1785 if (m->get_source_addrs() != sync_provider) {
1786 dout(10) << __func__ << " source does not match, discarding" << dendl;
1787 return;
1788 }
1789
1790 ceph_assert(state == STATE_SYNCHRONIZING);
1791 ceph_assert(g_conf()->mon_sync_requester_kill_at != 5);
1792
1793 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1794 tx->append_from_encoded(m->chunk_bl);
1795
1796 dout(30) << __func__ << " tx dump:\n";
1797 JSONFormatter f(true);
1798 tx->dump(&f);
1799 f.flush(*_dout);
1800 *_dout << dendl;
1801
1802 store->apply_transaction(tx);
1803
1804 ceph_assert(g_conf()->mon_sync_requester_kill_at != 6);
1805
1806 if (!sync_full) {
1807 dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
1808 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1809 paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
1810 m->last_committed);
1811 tx->put(paxos->get_name(), "last_committed", m->last_committed);
1812
1813 dout(30) << __func__ << " tx dump:\n";
1814 JSONFormatter f(true);
1815 tx->dump(&f);
1816 f.flush(*_dout);
1817 *_dout << dendl;
1818
1819 store->apply_transaction(tx);
1820 paxos->init(); // to refresh what we just wrote
1821 }
1822
1823 if (m->op == MMonSync::OP_CHUNK) {
1824 sync_reset_timeout();
1825 sync_get_next_chunk();
1826 } else if (m->op == MMonSync::OP_LAST_CHUNK) {
1827 sync_finish(m->last_committed);
1828 }
1829 }
1830
1831 void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1832 {
1833 dout(10) << __func__ << dendl;
1834 bootstrap();
1835 }
1836
1837 void Monitor::sync_trim_providers()
1838 {
1839 dout(20) << __func__ << dendl;
1840
1841 utime_t now = ceph_clock_now();
1842 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1843 while (p != sync_providers.end()) {
1844 if (now > p->second.timeout) {
1845 dout(10) << __func__ << " expiring cookie " << p->second.cookie
1846 << " for " << p->second.addrs << dendl;
1847 sync_providers.erase(p++);
1848 } else {
1849 ++p;
1850 }
1851 }
1852 }
1853
1854 // ---------------------------------------------------
1855 // probe
1856
1857 void Monitor::cancel_probe_timeout()
1858 {
1859 if (probe_timeout_event) {
1860 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1861 timer.cancel_event(probe_timeout_event);
1862 probe_timeout_event = NULL;
1863 } else {
1864 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1865 }
1866 }
1867
1868 void Monitor::reset_probe_timeout()
1869 {
1870 cancel_probe_timeout();
1871 probe_timeout_event = new C_MonContext{this, [this](int r) {
1872 probe_timeout(r);
1873 }};
1874 double t = g_conf()->mon_probe_timeout;
1875 if (timer.add_event_after(t, probe_timeout_event)) {
1876 dout(10) << "reset_probe_timeout " << probe_timeout_event
1877 << " after " << t << " seconds" << dendl;
1878 } else {
1879 probe_timeout_event = nullptr;
1880 }
1881 }
1882
1883 void Monitor::probe_timeout(int r)
1884 {
1885 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
1886 ceph_assert(is_probing() || is_synchronizing());
1887 ceph_assert(probe_timeout_event);
1888 probe_timeout_event = NULL;
1889 bootstrap();
1890 }
1891
1892 void Monitor::handle_probe(MonOpRequestRef op)
1893 {
1894 auto m = op->get_req<MMonProbe>();
1895 dout(10) << "handle_probe " << *m << dendl;
1896
1897 if (m->fsid != monmap->fsid) {
1898 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1899 return;
1900 }
1901
1902 switch (m->op) {
1903 case MMonProbe::OP_PROBE:
1904 handle_probe_probe(op);
1905 break;
1906
1907 case MMonProbe::OP_REPLY:
1908 handle_probe_reply(op);
1909 break;
1910
1911 case MMonProbe::OP_MISSING_FEATURES:
1912 derr << __func__ << " require release " << (int)m->mon_release << " > "
1913 << (int)ceph_release()
1914 << ", or missing features (have " << CEPH_FEATURES_ALL
1915 << ", required " << m->required_features
1916 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL) << ")"
1917 << dendl;
1918 break;
1919 }
1920 }
1921
1922 void Monitor::handle_probe_probe(MonOpRequestRef op)
1923 {
1924 auto m = op->get_req<MMonProbe>();
1925
1926 dout(10) << "handle_probe_probe " << m->get_source_inst() << " " << *m
1927 << " features " << m->get_connection()->get_features() << dendl;
1928 uint64_t missing = required_features & ~m->get_connection()->get_features();
1929 if ((m->mon_release != ceph_release_t::unknown &&
1930 m->mon_release < monmap->min_mon_release) ||
1931 missing) {
1932 dout(1) << " peer " << m->get_source_addr()
1933 << " release " << m->mon_release
1934 << " < min_mon_release " << monmap->min_mon_release
1935 << ", or missing features " << missing << dendl;
1936 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1937 name, has_ever_joined, monmap->min_mon_release);
1938 m->required_features = required_features;
1939 m->get_connection()->send_message(r);
1940 goto out;
1941 }
1942
1943 if (!is_probing() && !is_synchronizing()) {
1944 // If the probing mon is way ahead of us, we need to re-bootstrap.
1945 // Normally we capture this case when we initially bootstrap, but
1946 // it is possible we pass those checks (we overlap with
1947 // quorum-to-be) but fail to join a quorum before it moves past
1948 // us. We need to be kicked back to bootstrap so we can
1949 // synchonize, not keep calling elections.
1950 if (paxos->get_version() + 1 < m->paxos_first_version) {
1951 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1952 << "ahead of us, re-bootstrapping" << dendl;
1953 bootstrap();
1954 goto out;
1955
1956 }
1957 }
1958
1959 MMonProbe *r;
1960 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined,
1961 ceph_release());
1962 r->name = name;
1963 r->quorum = quorum;
1964 r->leader = leader;
1965 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1966 r->paxos_first_version = paxos->get_first_committed();
1967 r->paxos_last_version = paxos->get_version();
1968 m->get_connection()->send_message(r);
1969
1970 // did we discover a peer here?
1971 if (!monmap->contains(m->get_source_addr())) {
1972 dout(1) << " adding peer " << m->get_source_addrs()
1973 << " to list of hints" << dendl;
1974 extra_probe_peers.insert(m->get_source_addrs());
1975 } else {
1976 elector.begin_peer_ping(monmap->get_rank(m->get_source_addr()));
1977 }
1978
1979 out:
1980 return;
1981 }
1982
1983 void Monitor::handle_probe_reply(MonOpRequestRef op)
1984 {
1985 auto m = op->get_req<MMonProbe>();
1986 dout(10) << "handle_probe_reply " << m->get_source_inst()
1987 << " " << *m << dendl;
1988 dout(10) << " monmap is " << *monmap << dendl;
1989
1990 // discover name and addrs during probing or electing states.
1991 if (!is_probing() && !is_electing()) {
1992 return;
1993 }
1994
1995 // newer map, or they've joined a quorum and we haven't?
1996 bufferlist mybl;
1997 monmap->encode(mybl, m->get_connection()->get_features());
1998 // make sure it's actually different; the checks below err toward
1999 // taking the other guy's map, which could cause us to loop.
2000 if (!mybl.contents_equal(m->monmap_bl)) {
2001 MonMap *newmap = new MonMap;
2002 newmap->decode(m->monmap_bl);
2003 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
2004 !has_ever_joined)) {
2005 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
2006 << ", mine was " << monmap->get_epoch() << dendl;
2007 int epoch_diff = newmap->get_epoch() - monmap->get_epoch();
2008 delete newmap;
2009 monmap->decode(m->monmap_bl);
2010 dout(20) << "has_ever_joined: " << has_ever_joined << dendl;
2011 if (epoch_diff == 1 && has_ever_joined) {
2012 notify_new_monmap(false);
2013 } else {
2014 notify_new_monmap(false, false);
2015 elector.notify_clear_peer_state();
2016 }
2017 bootstrap();
2018 return;
2019 }
2020 delete newmap;
2021 }
2022
2023 // rename peer?
2024 string peer_name = monmap->get_name(m->get_source_addr());
2025 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
2026 dout(10) << " renaming peer " << m->get_source_addr() << " "
2027 << peer_name << " -> " << m->name << " in my monmap"
2028 << dendl;
2029 monmap->rename(peer_name, m->name);
2030
2031 if (is_electing()) {
2032 bootstrap();
2033 return;
2034 }
2035 } else if (peer_name.size()) {
2036 dout(10) << " peer name is " << peer_name << dendl;
2037 } else {
2038 dout(10) << " peer " << m->get_source_addr() << " not in map" << dendl;
2039 }
2040
2041 // new initial peer?
2042 if (monmap->get_epoch() == 0 &&
2043 monmap->contains(m->name) &&
2044 monmap->get_addrs(m->name).front().is_blank_ip()) {
2045 dout(1) << " learned initial mon " << m->name
2046 << " addrs " << m->get_source_addrs() << dendl;
2047 monmap->set_addrvec(m->name, m->get_source_addrs());
2048
2049 bootstrap();
2050 return;
2051 }
2052
2053 // end discover phase
2054 if (!is_probing()) {
2055 return;
2056 }
2057
2058 ceph_assert(paxos != NULL);
2059
2060 if (is_synchronizing()) {
2061 dout(10) << " currently syncing" << dendl;
2062 return;
2063 }
2064
2065 entity_addrvec_t other = m->get_source_addrs();
2066
2067 if (m->paxos_last_version < sync_last_committed_floor) {
2068 dout(10) << " peer paxos versions [" << m->paxos_first_version
2069 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
2070 << sync_last_committed_floor << ", ignoring"
2071 << dendl;
2072 } else {
2073 if (paxos->get_version() < m->paxos_first_version &&
2074 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
2075 dout(10) << " peer paxos first versions [" << m->paxos_first_version
2076 << "," << m->paxos_last_version << "]"
2077 << " vs my version " << paxos->get_version()
2078 << " (too far ahead)"
2079 << dendl;
2080 cancel_probe_timeout();
2081 sync_start(other, true);
2082 return;
2083 }
2084 if (paxos->get_version() + g_conf()->paxos_max_join_drift < m->paxos_last_version) {
2085 dout(10) << " peer paxos last version " << m->paxos_last_version
2086 << " vs my version " << paxos->get_version()
2087 << " (too far ahead)"
2088 << dendl;
2089 cancel_probe_timeout();
2090 sync_start(other, false);
2091 return;
2092 }
2093 }
2094
2095 // did the existing cluster complete upgrade to luminous?
2096 if (osdmon()->osdmap.get_epoch()) {
2097 if (osdmon()->osdmap.require_osd_release < ceph_release_t::luminous) {
2098 derr << __func__ << " existing cluster has not completed upgrade to"
2099 << " luminous; 'ceph osd require_osd_release luminous' before"
2100 << " upgrading" << dendl;
2101 exit(0);
2102 }
2103 if (!osdmon()->osdmap.test_flag(CEPH_OSDMAP_PURGED_SNAPDIRS) ||
2104 !osdmon()->osdmap.test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
2105 derr << __func__ << " existing cluster has not completed a full luminous"
2106 << " scrub to purge legacy snapdir objects; please scrub before"
2107 << " upgrading beyond luminous." << dendl;
2108 exit(0);
2109 }
2110 }
2111
2112 // is there an existing quorum?
2113 if (m->quorum.size()) {
2114 dout(10) << " existing quorum " << m->quorum << dendl;
2115
2116 dout(10) << " peer paxos version " << m->paxos_last_version
2117 << " vs my version " << paxos->get_version()
2118 << " (ok)"
2119 << dendl;
2120 bool in_map = false;
2121 const auto my_info = monmap->mon_info.find(name);
2122 const map<string,string> *map_crush_loc{nullptr};
2123 if (my_info != monmap->mon_info.end()) {
2124 in_map = true;
2125 map_crush_loc = &my_info->second.crush_loc;
2126 }
2127 if (in_map &&
2128 !monmap->get_addrs(name).front().is_blank_ip() &&
2129 (!need_set_crush_loc || (*map_crush_loc == crush_loc))) {
2130 // i'm part of the cluster; just initiate a new election
2131 start_election();
2132 } else {
2133 dout(10) << " ready to join, but i'm not in the monmap/"
2134 "my addr is blank/location is wrong, trying to join" << dendl;
2135 send_mon_message(new MMonJoin(monmap->fsid, name,
2136 messenger->get_myaddrs(), crush_loc,
2137 need_set_crush_loc),
2138 m->leader);
2139 }
2140 } else {
2141 if (monmap->contains(m->name)) {
2142 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
2143 outside_quorum.insert(m->name);
2144 } else {
2145 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
2146 return;
2147 }
2148
2149 unsigned need = monmap->min_quorum_size();
2150 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
2151 if (outside_quorum.size() >= need) {
2152 if (outside_quorum.count(name)) {
2153 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
2154 start_election();
2155 } else {
2156 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
2157 }
2158 } else {
2159 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
2160 }
2161 }
2162 }
2163
2164 void Monitor::join_election()
2165 {
2166 dout(10) << __func__ << dendl;
2167 wait_for_paxos_write();
2168 _reset();
2169 state = STATE_ELECTING;
2170
2171 logger->inc(l_mon_num_elections);
2172 }
2173
2174 void Monitor::start_election()
2175 {
2176 dout(10) << "start_election" << dendl;
2177 wait_for_paxos_write();
2178 _reset();
2179 state = STATE_ELECTING;
2180
2181 logger->inc(l_mon_num_elections);
2182 logger->inc(l_mon_election_call);
2183
2184 clog->info() << "mon." << name << " calling monitor election";
2185 elector.call_election();
2186 }
2187
2188 void Monitor::win_standalone_election()
2189 {
2190 dout(1) << "win_standalone_election" << dendl;
2191
2192 // bump election epoch, in case the previous epoch included other
2193 // monitors; we need to be able to make the distinction.
2194 elector.declare_standalone_victory();
2195
2196 rank = monmap->get_rank(name);
2197 ceph_assert(rank == 0);
2198 set<int> q;
2199 q.insert(rank);
2200
2201 map<int,Metadata> metadata;
2202 collect_metadata(&metadata[0]);
2203
2204 win_election(elector.get_epoch(), q,
2205 CEPH_FEATURES_ALL,
2206 ceph::features::mon::get_supported(),
2207 ceph_release(),
2208 metadata);
2209 }
2210
2211 const utime_t& Monitor::get_leader_since() const
2212 {
2213 ceph_assert(state == STATE_LEADER);
2214 return leader_since;
2215 }
2216
2217 epoch_t Monitor::get_epoch()
2218 {
2219 return elector.get_epoch();
2220 }
2221
2222 void Monitor::_finish_svc_election()
2223 {
2224 ceph_assert(state == STATE_LEADER || state == STATE_PEON);
2225
2226 for (auto& svc : paxos_service) {
2227 // we already called election_finished() on monmon(); avoid callig twice
2228 if (state == STATE_LEADER && svc.get() == monmon())
2229 continue;
2230 svc->election_finished();
2231 }
2232 }
2233
2234 void Monitor::win_election(epoch_t epoch, const set<int>& active, uint64_t features,
2235 const mon_feature_t& mon_features,
2236 ceph_release_t min_mon_release,
2237 const map<int,Metadata>& metadata)
2238 {
2239 dout(10) << __func__ << " epoch " << epoch << " quorum " << active
2240 << " features " << features
2241 << " mon_features " << mon_features
2242 << " min_mon_release " << min_mon_release
2243 << dendl;
2244 ceph_assert(is_electing());
2245 state = STATE_LEADER;
2246 leader_since = ceph_clock_now();
2247 quorum_since = mono_clock::now();
2248 leader = rank;
2249 quorum = active;
2250 quorum_con_features = features;
2251 quorum_mon_features = mon_features;
2252 quorum_min_mon_release = min_mon_release;
2253 pending_metadata = metadata;
2254 outside_quorum.clear();
2255
2256 clog->info() << "mon." << name << " is new leader, mons " << get_quorum_names()
2257 << " in quorum (ranks " << quorum << ")";
2258
2259 set_leader_commands(get_local_commands(mon_features));
2260
2261 paxos->leader_init();
2262 // NOTE: tell monmap monitor first. This is important for the
2263 // bootstrap case to ensure that the very first paxos proposal
2264 // codifies the monmap. Otherwise any manner of chaos can ensue
2265 // when monitors are call elections or participating in a paxos
2266 // round without agreeing on who the participants are.
2267 monmon()->election_finished();
2268 _finish_svc_election();
2269
2270 logger->inc(l_mon_election_win);
2271
2272 // inject new metadata in first transaction.
2273 {
2274 // include previous metadata for missing mons (that aren't part of
2275 // the current quorum).
2276 map<int,Metadata> m = metadata;
2277 for (unsigned rank = 0; rank < monmap->size(); ++rank) {
2278 if (m.count(rank) == 0 &&
2279 mon_metadata.count(rank)) {
2280 m[rank] = mon_metadata[rank];
2281 }
2282 }
2283
2284 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
2285 // a new transaction immediately after the election finishes. We should
2286 // do that anyway for other reasons, though.
2287 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
2288 bufferlist bl;
2289 encode(m, bl);
2290 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
2291 }
2292
2293 finish_election();
2294 if (monmap->size() > 1 &&
2295 monmap->get_epoch() > 0) {
2296 timecheck_start();
2297 health_tick_start();
2298
2299 // Freshen the health status before doing health_to_clog in case
2300 // our just-completed election changed the health
2301 healthmon()->wait_for_active_ctx(new LambdaContext([this](int r){
2302 dout(20) << "healthmon now active" << dendl;
2303 healthmon()->tick();
2304 if (healthmon()->is_proposing()) {
2305 dout(20) << __func__ << " healthmon proposing, waiting" << dendl;
2306 healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext{this,
2307 [this](int r){
2308 ceph_assert(ceph_mutex_is_locked_by_me(lock));
2309 do_health_to_clog_interval();
2310 }});
2311
2312 } else {
2313 do_health_to_clog_interval();
2314 }
2315 }));
2316
2317 scrub_event_start();
2318 }
2319 }
2320
2321 void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
2322 uint64_t features,
2323 const mon_feature_t& mon_features,
2324 ceph_release_t min_mon_release)
2325 {
2326 state = STATE_PEON;
2327 leader_since = utime_t();
2328 quorum_since = mono_clock::now();
2329 leader = l;
2330 quorum = q;
2331 outside_quorum.clear();
2332 quorum_con_features = features;
2333 quorum_mon_features = mon_features;
2334 quorum_min_mon_release = min_mon_release;
2335 dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
2336 << " quorum is " << quorum << " features are " << quorum_con_features
2337 << " mon_features are " << quorum_mon_features
2338 << " min_mon_release " << min_mon_release
2339 << dendl;
2340
2341 paxos->peon_init();
2342 _finish_svc_election();
2343
2344 logger->inc(l_mon_election_lose);
2345
2346 finish_election();
2347 }
2348
2349 namespace {
2350 std::string collect_compression_algorithms()
2351 {
2352 ostringstream os;
2353 bool printed = false;
2354 for (auto [name, key] : Compressor::compression_algorithms) {
2355 if (printed) {
2356 os << ", ";
2357 } else {
2358 printed = true;
2359 }
2360 std::ignore = key;
2361 os << name;
2362 }
2363 return os.str();
2364 }
2365 }
2366
2367 void Monitor::collect_metadata(Metadata *m)
2368 {
2369 collect_sys_info(m, g_ceph_context);
2370 (*m)["addrs"] = stringify(messenger->get_myaddrs());
2371 (*m)["compression_algorithms"] = collect_compression_algorithms();
2372
2373 // infer storage device
2374 string devname = store->get_devname();
2375 set<string> devnames;
2376 get_raw_devices(devname, &devnames);
2377 map<string,string> errs;
2378 get_device_metadata(devnames, m, &errs);
2379 for (auto& i : errs) {
2380 dout(1) << __func__ << " " << i.first << ": " << i.second << dendl;
2381 }
2382 }
2383
2384 void Monitor::finish_election()
2385 {
2386 apply_quorum_to_compatset_features();
2387 apply_monmap_to_compatset_features();
2388 timecheck_finish();
2389 exited_quorum = utime_t();
2390 finish_contexts(g_ceph_context, waitfor_quorum);
2391 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2392 resend_routed_requests();
2393 update_logger();
2394 register_cluster_logger();
2395
2396 // enable authentication
2397 {
2398 std::lock_guard l(auth_lock);
2399 authmon()->_set_mon_num_rank(monmap->size(), rank);
2400 }
2401
2402 // am i named and located properly?
2403 string cur_name = monmap->get_name(messenger->get_myaddrs());
2404 const auto my_infop = monmap->mon_info.find(cur_name);
2405 const map<string,string>& map_crush_loc = my_infop->second.crush_loc;
2406
2407 if (cur_name != name ||
2408 (need_set_crush_loc && map_crush_loc != crush_loc)) {
2409 dout(10) << " renaming/moving myself from " << cur_name << "/"
2410 << map_crush_loc <<" -> " << name << "/" << crush_loc << dendl;
2411 send_mon_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddrs(),
2412 crush_loc, need_set_crush_loc),
2413 leader);
2414 return;
2415 }
2416 do_stretch_mode_election_work();
2417 }
2418
2419 void Monitor::_apply_compatset_features(CompatSet &new_features)
2420 {
2421 if (new_features.compare(features) != 0) {
2422 CompatSet diff = features.unsupported(new_features);
2423 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2424 features = new_features;
2425
2426 auto t = std::make_shared<MonitorDBStore::Transaction>();
2427 write_features(t);
2428 store->apply_transaction(t);
2429
2430 calc_quorum_requirements();
2431 }
2432 }
2433
2434 void Monitor::apply_quorum_to_compatset_features()
2435 {
2436 CompatSet new_features(features);
2437 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
2438 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2439 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2440 }
2441 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2442 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
2443 dout(5) << __func__ << dendl;
2444 _apply_compatset_features(new_features);
2445 }
2446
2447 void Monitor::apply_monmap_to_compatset_features()
2448 {
2449 CompatSet new_features(features);
2450 mon_feature_t monmap_features = monmap->get_required_features();
2451
2452 /* persistent monmap features may go into the compatset.
2453 * optional monmap features may not - why?
2454 * because optional monmap features may be set/unset by the admin,
2455 * and possibly by other means that haven't yet been thought out,
2456 * so we can't make the monitor enforce them on start - because they
2457 * may go away.
2458 * this, of course, does not invalidate setting a compatset feature
2459 * for an optional feature - as long as you make sure to clean it up
2460 * once you unset it.
2461 */
2462 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
2463 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2464 ceph::features::mon::FEATURE_KRAKEN));
2465 // this feature should only ever be set if the quorum supports it.
2466 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
2467 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2468 }
2469 if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
2470 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2471 ceph::features::mon::FEATURE_LUMINOUS));
2472 // this feature should only ever be set if the quorum supports it.
2473 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
2474 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
2475 }
2476 if (monmap_features.contains_all(ceph::features::mon::FEATURE_MIMIC)) {
2477 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2478 ceph::features::mon::FEATURE_MIMIC));
2479 // this feature should only ever be set if the quorum supports it.
2480 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_MIMIC));
2481 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_MIMIC);
2482 }
2483 if (monmap_features.contains_all(ceph::features::mon::FEATURE_NAUTILUS)) {
2484 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2485 ceph::features::mon::FEATURE_NAUTILUS));
2486 // this feature should only ever be set if the quorum supports it.
2487 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_NAUTILUS));
2488 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS);
2489 }
2490 if (monmap_features.contains_all(ceph::features::mon::FEATURE_OCTOPUS)) {
2491 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2492 ceph::features::mon::FEATURE_OCTOPUS));
2493 // this feature should only ever be set if the quorum supports it.
2494 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_OCTOPUS));
2495 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS);
2496 }
2497 if (monmap_features.contains_all(ceph::features::mon::FEATURE_PACIFIC)) {
2498 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2499 ceph::features::mon::FEATURE_PACIFIC));
2500 // this feature should only ever be set if the quorum supports it.
2501 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_PACIFIC));
2502 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_PACIFIC);
2503 }
2504 if (monmap_features.contains_all(ceph::features::mon::FEATURE_QUINCY)) {
2505 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2506 ceph::features::mon::FEATURE_QUINCY));
2507 // this feature should only ever be set if the quorum supports it.
2508 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_QUINCY));
2509 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_QUINCY);
2510 }
2511 if (monmap_features.contains_all(ceph::features::mon::FEATURE_REEF)) {
2512 ceph_assert(ceph::features::mon::get_persistent().contains_all(
2513 ceph::features::mon::FEATURE_REEF));
2514 // this feature should only ever be set if the quorum supports it.
2515 ceph_assert(HAVE_FEATURE(quorum_con_features, SERVER_REEF));
2516 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_REEF);
2517 }
2518
2519 dout(5) << __func__ << dendl;
2520 _apply_compatset_features(new_features);
2521 }
2522
2523 void Monitor::calc_quorum_requirements()
2524 {
2525 required_features = 0;
2526
2527 // compatset
2528 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2529 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2530 }
2531 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2532 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2533 }
2534 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
2535 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2536 }
2537 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_MIMIC)) {
2538 required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
2539 }
2540 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_NAUTILUS)) {
2541 required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
2542 CEPH_FEATUREMASK_CEPHX_V2;
2543 }
2544 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OCTOPUS)) {
2545 required_features |= CEPH_FEATUREMASK_SERVER_OCTOPUS;
2546 }
2547 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_PACIFIC)) {
2548 required_features |= CEPH_FEATUREMASK_SERVER_PACIFIC;
2549 }
2550 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_QUINCY)) {
2551 required_features |= CEPH_FEATUREMASK_SERVER_QUINCY;
2552 }
2553 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_REEF)) {
2554 required_features |= CEPH_FEATUREMASK_SERVER_REEF;
2555 }
2556
2557 // monmap
2558 if (monmap->get_required_features().contains_all(
2559 ceph::features::mon::FEATURE_KRAKEN)) {
2560 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2561 }
2562 if (monmap->get_required_features().contains_all(
2563 ceph::features::mon::FEATURE_LUMINOUS)) {
2564 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2565 }
2566 if (monmap->get_required_features().contains_all(
2567 ceph::features::mon::FEATURE_MIMIC)) {
2568 required_features |= CEPH_FEATUREMASK_SERVER_MIMIC;
2569 }
2570 if (monmap->get_required_features().contains_all(
2571 ceph::features::mon::FEATURE_NAUTILUS)) {
2572 required_features |= CEPH_FEATUREMASK_SERVER_NAUTILUS |
2573 CEPH_FEATUREMASK_CEPHX_V2;
2574 }
2575 dout(10) << __func__ << " required_features " << required_features << dendl;
2576 }
2577
2578 void Monitor::get_combined_feature_map(FeatureMap *fm)
2579 {
2580 *fm += session_map.feature_map;
2581 for (auto id : quorum) {
2582 if (id != rank) {
2583 *fm += quorum_feature_map[id];
2584 }
2585 }
2586 }
2587
2588 void Monitor::sync_force(Formatter *f)
2589 {
2590 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2591 sync_stash_critical_state(tx);
2592 tx->put("mon_sync", "force_sync", 1);
2593 store->apply_transaction(tx);
2594
2595 f->open_object_section("sync_force");
2596 f->dump_int("ret", 0);
2597 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2598 f->close_section(); // sync_force
2599 }
2600
2601 void Monitor::_quorum_status(Formatter *f, ostream& ss)
2602 {
2603 bool free_formatter = false;
2604
2605 if (!f) {
2606 // louzy/lazy hack: default to json if no formatter has been defined
2607 f = new JSONFormatter();
2608 free_formatter = true;
2609 }
2610 f->open_object_section("quorum_status");
2611 f->dump_int("election_epoch", get_epoch());
2612
2613 f->open_array_section("quorum");
2614 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2615 f->dump_int("mon", *p);
2616 f->close_section(); // quorum
2617
2618 list<string> quorum_names = get_quorum_names();
2619 f->open_array_section("quorum_names");
2620 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2621 f->dump_string("mon", *p);
2622 f->close_section(); // quorum_names
2623
2624 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(leader));
2625
2626 if (!quorum.empty()) {
2627 f->dump_int(
2628 "quorum_age",
2629 quorum_age());
2630 }
2631
2632 f->open_object_section("features");
2633 f->dump_stream("quorum_con") << quorum_con_features;
2634 quorum_mon_features.dump(f, "quorum_mon");
2635 f->close_section();
2636
2637 f->open_object_section("monmap");
2638 monmap->dump(f);
2639 f->close_section(); // monmap
2640
2641 f->close_section(); // quorum_status
2642 f->flush(ss);
2643 if (free_formatter)
2644 delete f;
2645 }
2646
2647 void Monitor::get_mon_status(Formatter *f)
2648 {
2649 f->open_object_section("mon_status");
2650 f->dump_string("name", name);
2651 f->dump_int("rank", rank);
2652 f->dump_string("state", get_state_name());
2653 f->dump_int("election_epoch", get_epoch());
2654
2655 f->open_array_section("quorum");
2656 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2657 f->dump_int("mon", *p);
2658 }
2659 f->close_section(); // quorum
2660
2661 if (!quorum.empty()) {
2662 f->dump_int(
2663 "quorum_age",
2664 quorum_age());
2665 }
2666
2667 f->open_object_section("features");
2668 f->dump_stream("required_con") << required_features;
2669 mon_feature_t req_mon_features = get_required_mon_features();
2670 req_mon_features.dump(f, "required_mon");
2671 f->dump_stream("quorum_con") << quorum_con_features;
2672 quorum_mon_features.dump(f, "quorum_mon");
2673 f->close_section(); // features
2674
2675 f->open_array_section("outside_quorum");
2676 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2677 f->dump_string("mon", *p);
2678 f->close_section(); // outside_quorum
2679
2680 f->open_array_section("extra_probe_peers");
2681 for (set<entity_addrvec_t>::iterator p = extra_probe_peers.begin();
2682 p != extra_probe_peers.end();
2683 ++p) {
2684 f->dump_object("peer", *p);
2685 }
2686 f->close_section(); // extra_probe_peers
2687
2688 f->open_array_section("sync_provider");
2689 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2690 p != sync_providers.end();
2691 ++p) {
2692 f->dump_unsigned("cookie", p->second.cookie);
2693 f->dump_object("addrs", p->second.addrs);
2694 f->dump_stream("timeout") << p->second.timeout;
2695 f->dump_unsigned("last_committed", p->second.last_committed);
2696 f->dump_stream("last_key") << p->second.last_key;
2697 }
2698 f->close_section();
2699
2700 if (is_synchronizing()) {
2701 f->open_object_section("sync");
2702 f->dump_stream("sync_provider") << sync_provider;
2703 f->dump_unsigned("sync_cookie", sync_cookie);
2704 f->dump_unsigned("sync_start_version", sync_start_version);
2705 f->close_section();
2706 }
2707
2708 if (g_conf()->mon_sync_provider_kill_at > 0)
2709 f->dump_int("provider_kill_at", g_conf()->mon_sync_provider_kill_at);
2710 if (g_conf()->mon_sync_requester_kill_at > 0)
2711 f->dump_int("requester_kill_at", g_conf()->mon_sync_requester_kill_at);
2712
2713 f->open_object_section("monmap");
2714 monmap->dump(f);
2715 f->close_section();
2716
2717 f->dump_object("feature_map", session_map.feature_map);
2718 f->dump_bool("stretch_mode", stretch_mode_engaged);
2719 f->close_section(); // mon_status
2720 }
2721
2722
2723 // health status to clog
2724
2725 void Monitor::health_tick_start()
2726 {
2727 if (!cct->_conf->mon_health_to_clog ||
2728 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2729 return;
2730
2731 dout(15) << __func__ << dendl;
2732
2733 health_tick_stop();
2734 health_tick_event = timer.add_event_after(
2735 cct->_conf->mon_health_to_clog_tick_interval,
2736 new C_MonContext{this, [this](int r) {
2737 if (r < 0)
2738 return;
2739 health_tick_start();
2740 }});
2741 }
2742
2743 void Monitor::health_tick_stop()
2744 {
2745 dout(15) << __func__ << dendl;
2746
2747 if (health_tick_event) {
2748 timer.cancel_event(health_tick_event);
2749 health_tick_event = NULL;
2750 }
2751 }
2752
2753 ceph::real_clock::time_point Monitor::health_interval_calc_next_update()
2754 {
2755 auto now = ceph::real_clock::now();
2756
2757 auto secs = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch());
2758 int remainder = secs.count() % cct->_conf->mon_health_to_clog_interval;
2759 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
2760 auto next = secs + std::chrono::seconds(adjustment);
2761
2762 dout(20) << __func__
2763 << " now: " << now << ","
2764 << " next: " << next << ","
2765 << " interval: " << cct->_conf->mon_health_to_clog_interval
2766 << dendl;
2767
2768 return ceph::real_clock::time_point{next};
2769 }
2770
2771 void Monitor::health_interval_start()
2772 {
2773 dout(15) << __func__ << dendl;
2774
2775 if (!cct->_conf->mon_health_to_clog ||
2776 cct->_conf->mon_health_to_clog_interval <= 0) {
2777 return;
2778 }
2779
2780 health_interval_stop();
2781 auto next = health_interval_calc_next_update();
2782 health_interval_event = new C_MonContext{this, [this](int r) {
2783 if (r < 0)
2784 return;
2785 do_health_to_clog_interval();
2786 }};
2787 if (!timer.add_event_at(next, health_interval_event)) {
2788 health_interval_event = nullptr;
2789 }
2790 }
2791
2792 void Monitor::health_interval_stop()
2793 {
2794 dout(15) << __func__ << dendl;
2795 if (health_interval_event) {
2796 timer.cancel_event(health_interval_event);
2797 }
2798 health_interval_event = NULL;
2799 }
2800
2801 void Monitor::health_events_cleanup()
2802 {
2803 health_tick_stop();
2804 health_interval_stop();
2805 health_status_cache.reset();
2806 }
2807
2808 void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2809 {
2810 dout(20) << __func__ << dendl;
2811
2812 if (changed.count("mon_health_to_clog")) {
2813 if (!cct->_conf->mon_health_to_clog) {
2814 health_events_cleanup();
2815 return;
2816 } else {
2817 if (!health_tick_event) {
2818 health_tick_start();
2819 }
2820 if (!health_interval_event) {
2821 health_interval_start();
2822 }
2823 }
2824 }
2825
2826 if (changed.count("mon_health_to_clog_interval")) {
2827 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2828 health_interval_stop();
2829 } else {
2830 health_interval_start();
2831 }
2832 }
2833
2834 if (changed.count("mon_health_to_clog_tick_interval")) {
2835 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2836 health_tick_stop();
2837 } else {
2838 health_tick_start();
2839 }
2840 }
2841 }
2842
2843 void Monitor::do_health_to_clog_interval()
2844 {
2845 // outputting to clog may have been disabled in the conf
2846 // since we were scheduled.
2847 if (!cct->_conf->mon_health_to_clog ||
2848 cct->_conf->mon_health_to_clog_interval <= 0)
2849 return;
2850
2851 dout(10) << __func__ << dendl;
2852
2853 // do we have a cached value for next_clog_update? if not,
2854 // do we know when the last update was?
2855
2856 do_health_to_clog(true);
2857 health_interval_start();
2858 }
2859
2860 void Monitor::do_health_to_clog(bool force)
2861 {
2862 // outputting to clog may have been disabled in the conf
2863 // since we were scheduled.
2864 if (!cct->_conf->mon_health_to_clog ||
2865 cct->_conf->mon_health_to_clog_interval <= 0)
2866 return;
2867
2868 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2869
2870 string summary;
2871 health_status_t level = healthmon()->get_health_status(false, nullptr, &summary);
2872 if (!force &&
2873 summary == health_status_cache.summary &&
2874 level == health_status_cache.overall)
2875 return;
2876
2877 if (g_conf()->mon_health_detail_to_clog &&
2878 summary != health_status_cache.summary &&
2879 level != HEALTH_OK) {
2880 string details;
2881 level = healthmon()->get_health_status(true, nullptr, &details);
2882 clog->health(level) << "Health detail: " << details;
2883 } else {
2884 clog->health(level) << "overall " << summary;
2885 }
2886 health_status_cache.summary = summary;
2887 health_status_cache.overall = level;
2888 }
2889
2890 void Monitor::log_health(
2891 const health_check_map_t& updated,
2892 const health_check_map_t& previous,
2893 MonitorDBStore::TransactionRef t)
2894 {
2895 if (!g_conf()->mon_health_to_clog) {
2896 return;
2897 }
2898
2899 const utime_t now = ceph_clock_now();
2900
2901 // FIXME: log atomically as part of @t instead of using clog.
2902 dout(10) << __func__ << " updated " << updated.checks.size()
2903 << " previous " << previous.checks.size()
2904 << dendl;
2905 const auto min_log_period = g_conf().get_val<int64_t>(
2906 "mon_health_log_update_period");
2907 for (auto& p : updated.checks) {
2908 auto q = previous.checks.find(p.first);
2909 bool logged = false;
2910 if (q == previous.checks.end()) {
2911 // new
2912 ostringstream ss;
2913 ss << "Health check failed: " << p.second.summary << " ("
2914 << p.first << ")";
2915 clog->health(p.second.severity) << ss.str();
2916
2917 logged = true;
2918 } else {
2919 if (p.second.summary != q->second.summary ||
2920 p.second.severity != q->second.severity) {
2921
2922 auto status_iter = health_check_log_times.find(p.first);
2923 if (status_iter != health_check_log_times.end()) {
2924 if (p.second.severity == q->second.severity &&
2925 now - status_iter->second.updated_at < min_log_period) {
2926 // We already logged this recently and the severity is unchanged,
2927 // so skip emitting an update of the summary string.
2928 // We'll get an update out of tick() later if the check
2929 // is still failing.
2930 continue;
2931 }
2932 }
2933
2934 // summary or severity changed (ignore detail changes at this level)
2935 ostringstream ss;
2936 ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
2937 clog->health(p.second.severity) << ss.str();
2938
2939 logged = true;
2940 }
2941 }
2942 // Record the time at which we last logged, so that we can check this
2943 // when considering whether/when to print update messages.
2944 if (logged) {
2945 auto iter = health_check_log_times.find(p.first);
2946 if (iter == health_check_log_times.end()) {
2947 health_check_log_times.emplace(p.first, HealthCheckLogStatus(
2948 p.second.severity, p.second.summary, now));
2949 } else {
2950 iter->second = HealthCheckLogStatus(
2951 p.second.severity, p.second.summary, now);
2952 }
2953 }
2954 }
2955 for (auto& p : previous.checks) {
2956 if (!updated.checks.count(p.first)) {
2957 // cleared
2958 ostringstream ss;
2959 if (p.first == "DEGRADED_OBJECTS") {
2960 clog->info() << "All degraded objects recovered";
2961 } else if (p.first == "OSD_FLAGS") {
2962 clog->info() << "OSD flags cleared";
2963 } else {
2964 clog->info() << "Health check cleared: " << p.first << " (was: "
2965 << p.second.summary << ")";
2966 }
2967
2968 if (health_check_log_times.count(p.first)) {
2969 health_check_log_times.erase(p.first);
2970 }
2971 }
2972 }
2973
2974 if (previous.checks.size() && updated.checks.size() == 0) {
2975 // We might be going into a fully healthy state, check
2976 // other subsystems
2977 bool any_checks = false;
2978 for (auto& svc : paxos_service) {
2979 if (&(svc->get_health_checks()) == &(previous)) {
2980 // Ignore the ones we're clearing right now
2981 continue;
2982 }
2983
2984 if (svc->get_health_checks().checks.size() > 0) {
2985 any_checks = true;
2986 break;
2987 }
2988 }
2989 if (!any_checks) {
2990 clog->info() << "Cluster is now healthy";
2991 }
2992 }
2993 }
2994
2995 void Monitor::update_pending_metadata()
2996 {
2997 Metadata metadata;
2998 collect_metadata(&metadata);
2999 size_t version_size = mon_metadata[rank]["ceph_version_short"].size();
3000 const std::string current_version = mon_metadata[rank]["ceph_version_short"];
3001 const std::string pending_version = metadata["ceph_version_short"];
3002
3003 if (current_version.compare(0, version_size, pending_version) != 0) {
3004 mgr_client.update_daemon_metadata("mon", name, metadata);
3005 }
3006 }
3007
3008 void Monitor::get_cluster_status(stringstream &ss, Formatter *f,
3009 MonSession *session)
3010 {
3011 if (f)
3012 f->open_object_section("status");
3013
3014 const auto&& fs_names = session->get_allowed_fs_names();
3015
3016 if (f) {
3017 f->dump_stream("fsid") << monmap->get_fsid();
3018 healthmon()->get_health_status(false, f, nullptr);
3019 f->dump_unsigned("election_epoch", get_epoch());
3020 {
3021 f->open_array_section("quorum");
3022 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
3023 f->dump_int("rank", *p);
3024 f->close_section();
3025 f->open_array_section("quorum_names");
3026 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
3027 f->dump_string("id", monmap->get_name(*p));
3028 f->close_section();
3029 f->dump_int(
3030 "quorum_age",
3031 quorum_age());
3032 }
3033 f->open_object_section("monmap");
3034 monmap->dump_summary(f);
3035 f->close_section();
3036 f->open_object_section("osdmap");
3037 osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
3038 f->close_section();
3039 f->open_object_section("pgmap");
3040 mgrstatmon()->print_summary(f, NULL);
3041 f->close_section();
3042 f->open_object_section("fsmap");
3043
3044 FSMap fsmap_copy = mdsmon()->get_fsmap();
3045 if (!fs_names.empty()) {
3046 fsmap_copy.filter(fs_names);
3047 }
3048 const FSMap *fsmapp = &fsmap_copy;
3049
3050 fsmapp->print_summary(f, NULL);
3051 f->close_section();
3052 f->open_object_section("mgrmap");
3053 mgrmon()->get_map().print_summary(f, nullptr);
3054 f->close_section();
3055
3056 f->dump_object("servicemap", mgrstatmon()->get_service_map());
3057
3058 f->open_object_section("progress_events");
3059 for (auto& i : mgrstatmon()->get_progress_events()) {
3060 f->dump_object(i.first.c_str(), i.second);
3061 }
3062 f->close_section();
3063
3064 f->close_section();
3065 } else {
3066 ss << " cluster:\n";
3067 ss << " id: " << monmap->get_fsid() << "\n";
3068
3069 string health;
3070 healthmon()->get_health_status(false, nullptr, &health,
3071 "\n ", "\n ");
3072 ss << " health: " << health << "\n";
3073
3074 ss << "\n \n services:\n";
3075 {
3076 size_t maxlen = 3;
3077 auto& service_map = mgrstatmon()->get_service_map();
3078 for (auto& p : service_map.services) {
3079 maxlen = std::max(maxlen, p.first.size());
3080 }
3081 string spacing(maxlen - 3, ' ');
3082 const auto quorum_names = get_quorum_names();
3083 const auto mon_count = monmap->mon_info.size();
3084 auto mnow = ceph::mono_clock::now();
3085 ss << " mon: " << spacing << mon_count << " daemons, quorum "
3086 << quorum_names << " (age " << timespan_str(mnow - quorum_since) << ")";
3087 if (quorum_names.size() != mon_count) {
3088 std::list<std::string> out_of_q;
3089 for (size_t i = 0; i < monmap->ranks.size(); ++i) {
3090 if (quorum.count(i) == 0) {
3091 out_of_q.push_back(monmap->ranks[i]);
3092 }
3093 }
3094 ss << ", out of quorum: " << joinify(out_of_q.begin(),
3095 out_of_q.end(), std::string(", "));
3096 }
3097 ss << "\n";
3098 if (mgrmon()->in_use()) {
3099 ss << " mgr: " << spacing;
3100 mgrmon()->get_map().print_summary(nullptr, &ss);
3101 ss << "\n";
3102 }
3103
3104 FSMap fsmap_copy = mdsmon()->get_fsmap();
3105 if (!fs_names.empty()) {
3106 fsmap_copy.filter(fs_names);
3107 }
3108 const FSMap *fsmapp = &fsmap_copy;
3109
3110 if (fsmapp->filesystem_count() > 0 and mdsmon()->should_print_status()){
3111 ss << " mds: " << spacing;
3112 fsmapp->print_daemon_summary(ss);
3113 ss << "\n";
3114 }
3115
3116 ss << " osd: " << spacing;
3117 osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
3118 ss << "\n";
3119 for (auto& p : service_map.services) {
3120 const std::string &service = p.first;
3121 // filter out normal ceph entity types
3122 if (ServiceMap::is_normal_ceph_entity(service)) {
3123 continue;
3124 }
3125 ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
3126 << p.second.get_summary() << "\n";
3127 }
3128 }
3129
3130 if (auto& service_map = mgrstatmon()->get_service_map();
3131 std::any_of(service_map.services.begin(),
3132 service_map.services.end(),
3133 [](auto& service) {
3134 return service.second.has_running_tasks();
3135 })) {
3136 ss << "\n \n task status:\n";
3137 for (auto& [name, service] : service_map.services) {
3138 ss << service.get_task_summary(name);
3139 }
3140 }
3141
3142 ss << "\n \n data:\n";
3143 mdsmon()->print_fs_summary(ss);
3144 mgrstatmon()->print_summary(NULL, &ss);
3145
3146 auto& pem = mgrstatmon()->get_progress_events();
3147 if (!pem.empty()) {
3148 ss << "\n \n progress:\n";
3149 for (auto& i : pem) {
3150 if (i.second.add_to_ceph_s){
3151 ss << " " << i.second.message << "\n";
3152 }
3153 }
3154 }
3155 ss << "\n ";
3156 }
3157 }
3158
3159 void Monitor::_generate_command_map(cmdmap_t& cmdmap,
3160 map<string,string> &param_str_map)
3161 {
3162 for (auto p = cmdmap.begin(); p != cmdmap.end(); ++p) {
3163 if (p->first == "prefix")
3164 continue;
3165 if (p->first == "caps") {
3166 vector<string> cv;
3167 if (cmd_getval(cmdmap, "caps", cv) &&
3168 cv.size() % 2 == 0) {
3169 for (unsigned i = 0; i < cv.size(); i += 2) {
3170 string k = string("caps_") + cv[i];
3171 param_str_map[k] = cv[i + 1];
3172 }
3173 continue;
3174 }
3175 }
3176 param_str_map[p->first] = cmd_vartype_stringify(p->second);
3177 }
3178 }
3179
3180 const MonCommand *Monitor::_get_moncommand(
3181 const string &cmd_prefix,
3182 const vector<MonCommand>& cmds)
3183 {
3184 for (auto& c : cmds) {
3185 if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
3186 return &c;
3187 }
3188 }
3189 return nullptr;
3190 }
3191
3192 bool Monitor::_allowed_command(MonSession *s, const string &module,
3193 const string &prefix, const cmdmap_t& cmdmap,
3194 const map<string,string>& param_str_map,
3195 const MonCommand *this_cmd) {
3196
3197 bool cmd_r = this_cmd->requires_perm('r');
3198 bool cmd_w = this_cmd->requires_perm('w');
3199 bool cmd_x = this_cmd->requires_perm('x');
3200
3201 bool capable = s->caps.is_capable(
3202 g_ceph_context,
3203 s->entity_name,
3204 module, prefix, param_str_map,
3205 cmd_r, cmd_w, cmd_x,
3206 s->get_peer_socket_addr());
3207
3208 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
3209 return capable;
3210 }
3211
3212 void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
3213 Formatter *f,
3214 uint64_t features,
3215 bufferlist *rdata)
3216 {
3217 int cmdnum = 0;
3218 f->open_object_section("command_descriptions");
3219 for (const auto &cmd : commands) {
3220 unsigned flags = cmd.flags;
3221 ostringstream secname;
3222 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
3223 dump_cmddesc_to_json(f, features, secname.str(),
3224 cmd.cmdstring, cmd.helpstring, cmd.module,
3225 cmd.req_perms, flags);
3226 cmdnum++;
3227 }
3228 f->close_section(); // command_descriptions
3229
3230 f->flush(*rdata);
3231 }
3232
3233 bool Monitor::is_keyring_required()
3234 {
3235 return auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX) ||
3236 auth_service_required.is_supported_auth(CEPH_AUTH_CEPHX) ||
3237 auth_cluster_required.is_supported_auth(CEPH_AUTH_GSS) ||
3238 auth_service_required.is_supported_auth(CEPH_AUTH_GSS);
3239 }
3240
3241 struct C_MgrProxyCommand : public Context {
3242 Monitor *mon;
3243 MonOpRequestRef op;
3244 uint64_t size;
3245 bufferlist outbl;
3246 string outs;
3247 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
3248 : mon(mon), op(op), size(s) { }
3249 void finish(int r) {
3250 std::lock_guard l(mon->lock);
3251 mon->mgr_proxy_bytes -= size;
3252 mon->reply_command(op, r, outs, outbl, 0);
3253 }
3254 };
3255
3256 void Monitor::handle_tell_command(MonOpRequestRef op)
3257 {
3258 ceph_assert(op->is_type_command());
3259 MCommand *m = static_cast<MCommand*>(op->get_req());
3260 if (m->fsid != monmap->fsid) {
3261 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
3262 return reply_tell_command(op, -EACCES, "wrong fsid");
3263 }
3264 MonSession *session = op->get_session();
3265 if (!session) {
3266 dout(5) << __func__ << " dropping stray message " << *m << dendl;
3267 return;
3268 }
3269 cmdmap_t cmdmap;
3270 if (stringstream ss; !cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3271 return reply_tell_command(op, -EINVAL, ss.str());
3272 }
3273 map<string,string> param_str_map;
3274 _generate_command_map(cmdmap, param_str_map);
3275 string prefix;
3276 if (!cmd_getval(cmdmap, "prefix", prefix)) {
3277 return reply_tell_command(op, -EINVAL, "no prefix");
3278 }
3279 if (auto cmd = _get_moncommand(prefix,
3280 get_local_commands(quorum_mon_features));
3281 cmd) {
3282 if (cmd->is_obsolete() ||
3283 (cct->_conf->mon_debug_deprecated_as_obsolete &&
3284 cmd->is_deprecated())) {
3285 return reply_tell_command(op, -ENOTSUP,
3286 "command is obsolete; "
3287 "please check usage and/or man page");
3288 }
3289 }
3290 // see if command is allowed
3291 if (!session->caps.is_capable(
3292 g_ceph_context,
3293 session->entity_name,
3294 "mon", prefix, param_str_map,
3295 true, true, true,
3296 session->get_peer_socket_addr())) {
3297 return reply_tell_command(op, -EACCES, "insufficient caps");
3298 }
3299 // pass it to asok
3300 cct->get_admin_socket()->queue_tell_command(m);
3301 }
3302
3303 void Monitor::handle_command(MonOpRequestRef op)
3304 {
3305 ceph_assert(op->is_type_command());
3306 auto m = op->get_req<MMonCommand>();
3307 if (m->fsid != monmap->fsid) {
3308 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid
3309 << dendl;
3310 reply_command(op, -EPERM, "wrong fsid", 0);
3311 return;
3312 }
3313
3314 MonSession *session = op->get_session();
3315 if (!session) {
3316 dout(5) << __func__ << " dropping stray message " << *m << dendl;
3317 return;
3318 }
3319
3320 if (m->cmd.empty()) {
3321 reply_command(op, -EINVAL, "no command specified", 0);
3322 return;
3323 }
3324
3325 string prefix;
3326 vector<string> fullcmd;
3327 cmdmap_t cmdmap;
3328 stringstream ss, ds;
3329 bufferlist rdata;
3330 string rs;
3331 int r = -EINVAL;
3332 rs = "unrecognized command";
3333
3334 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3335 // ss has reason for failure
3336 r = -EINVAL;
3337 rs = ss.str();
3338 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3339 reply_command(op, r, rs, 0);
3340 return;
3341 }
3342
3343 // check return value. If no prefix parameter provided,
3344 // return value will be false, then return error info.
3345 if (!cmd_getval(cmdmap, "prefix", prefix)) {
3346 reply_command(op, -EINVAL, "command prefix not found", 0);
3347 return;
3348 }
3349
3350 // check prefix is empty
3351 if (prefix.empty()) {
3352 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
3353 return;
3354 }
3355
3356 if (prefix == "get_command_descriptions") {
3357 bufferlist rdata;
3358 Formatter *f = Formatter::create("json");
3359
3360 std::vector<MonCommand> commands = static_cast<MgrMonitor*>(
3361 paxos_service[PAXOS_MGR].get())->get_command_descs();
3362
3363 for (auto& c : leader_mon_commands) {
3364 commands.push_back(c);
3365 }
3366
3367 auto features = m->get_connection()->get_features();
3368 format_command_descriptions(commands, f, features, &rdata);
3369 delete f;
3370 reply_command(op, 0, "", rdata, 0);
3371 return;
3372 }
3373
3374 dout(0) << "handle_command " << *m << dendl;
3375
3376 string format = cmd_getval_or<string>(cmdmap, "format", "plain");
3377 boost::scoped_ptr<Formatter> f(Formatter::create(format));
3378
3379 get_str_vec(prefix, fullcmd);
3380
3381 // make sure fullcmd is not empty.
3382 // invalid prefix will cause empty vector fullcmd.
3383 // such as, prefix=";,,;"
3384 if (fullcmd.empty()) {
3385 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3386 return;
3387 }
3388
3389 std::string_view module = fullcmd[0];
3390
3391 // validate command is in leader map
3392
3393 const MonCommand *leader_cmd;
3394 const auto& mgr_cmds = mgrmon()->get_command_descs();
3395 const MonCommand *mgr_cmd = nullptr;
3396 if (!mgr_cmds.empty()) {
3397 mgr_cmd = _get_moncommand(prefix, mgr_cmds);
3398 }
3399 leader_cmd = _get_moncommand(prefix, leader_mon_commands);
3400 if (!leader_cmd) {
3401 leader_cmd = mgr_cmd;
3402 if (!leader_cmd) {
3403 reply_command(op, -EINVAL, "command not known", 0);
3404 return;
3405 }
3406 }
3407 // validate command is in our map & matches, or forward if it is allowed
3408 const MonCommand *mon_cmd = _get_moncommand(
3409 prefix,
3410 get_local_commands(quorum_mon_features));
3411 if (!mon_cmd) {
3412 mon_cmd = mgr_cmd;
3413 }
3414 if (!is_leader()) {
3415 if (!mon_cmd) {
3416 if (leader_cmd->is_noforward()) {
3417 reply_command(op, -EINVAL,
3418 "command not locally supported and not allowed to forward",
3419 0);
3420 return;
3421 }
3422 dout(10) << "Command not locally supported, forwarding request "
3423 << m << dendl;
3424 forward_request_leader(op);
3425 return;
3426 } else if (!mon_cmd->is_compat(leader_cmd)) {
3427 if (mon_cmd->is_noforward()) {
3428 reply_command(op, -EINVAL,
3429 "command not compatible with leader and not allowed to forward",
3430 0);
3431 return;
3432 }
3433 dout(10) << "Command not compatible with leader, forwarding request "
3434 << m << dendl;
3435 forward_request_leader(op);
3436 return;
3437 }
3438 }
3439
3440 if (mon_cmd->is_obsolete() ||
3441 (cct->_conf->mon_debug_deprecated_as_obsolete
3442 && mon_cmd->is_deprecated())) {
3443 reply_command(op, -ENOTSUP,
3444 "command is obsolete; please check usage and/or man page",
3445 0);
3446 return;
3447 }
3448
3449 if (session->proxy_con && mon_cmd->is_noforward()) {
3450 dout(10) << "Got forward for noforward command " << m << dendl;
3451 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3452 return;
3453 }
3454
3455 /* what we perceive as being the service the command falls under */
3456 string service(mon_cmd->module);
3457
3458 dout(25) << __func__ << " prefix='" << prefix
3459 << "' module='" << module
3460 << "' service='" << service << "'" << dendl;
3461
3462 bool cmd_is_rw =
3463 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3464
3465 // validate user's permissions for requested command
3466 map<string,string> param_str_map;
3467
3468 // Catch bad_cmd_get exception if _generate_command_map() throws it
3469 try {
3470 _generate_command_map(cmdmap, param_str_map);
3471 } catch (const bad_cmd_get& e) {
3472 reply_command(op, -EINVAL, e.what(), 0);
3473 return;
3474 }
3475
3476 if (!_allowed_command(session, service, prefix, cmdmap,
3477 param_str_map, mon_cmd)) {
3478 dout(1) << __func__ << " access denied" << dendl;
3479 if (prefix != "config set" && prefix != "config-key set")
3480 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3481 << "from='" << session->name << " " << session->addrs << "' "
3482 << "entity='" << session->entity_name << "' "
3483 << "cmd=" << m->cmd << ": access denied";
3484 reply_command(op, -EACCES, "access denied", 0);
3485 return;
3486 }
3487
3488 if (prefix != "config set" && prefix != "config-key set")
3489 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3490 << "from='" << session->name << " " << session->addrs << "' "
3491 << "entity='" << session->entity_name << "' "
3492 << "cmd=" << m->cmd << ": dispatch";
3493
3494 // compat kludge for legacy clients trying to tell commands that are
3495 // new. see bottom of MonCommands.h. we need to handle both (1)
3496 // pre-octopus clients and (2) octopus clients with a mix of pre-octopus
3497 // and octopus mons.
3498 if ((!HAVE_FEATURE(m->get_connection()->get_features(), SERVER_OCTOPUS) ||
3499 monmap->min_mon_release < ceph_release_t::octopus) &&
3500 (prefix == "injectargs" ||
3501 prefix == "smart" ||
3502 prefix == "mon_status" ||
3503 prefix == "heap")) {
3504 if (m->get_connection()->get_messenger() == 0) {
3505 // Prior to octopus, monitors might forward these messages
3506 // around. that was broken at baseline, and if we try to process
3507 // this message now, it will assert out when we try to send a
3508 // message in reply from the asok/tell worker (see
3509 // AnonConnection). Just reply with an error.
3510 dout(5) << __func__ << " failing forwarded command from a (presumably) "
3511 << "pre-octopus peer" << dendl;
3512 reply_command(
3513 op, -EBUSY,
3514 "failing forwarded tell command in mixed-version mon cluster", 0);
3515 return;
3516 }
3517 dout(5) << __func__ << " passing command to tell/asok" << dendl;
3518 cct->get_admin_socket()->queue_tell_command(m);
3519 return;
3520 }
3521
3522 if (mon_cmd->is_mgr()) {
3523 const auto& hdr = m->get_header();
3524 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
3525 uint64_t max = g_conf().get_val<Option::size_t>("mon_client_bytes")
3526 * g_conf().get_val<double>("mon_mgr_proxy_client_bytes_ratio");
3527 if (mgr_proxy_bytes + size > max) {
3528 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3529 << " + " << size << " > max " << max << dendl;
3530 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3531 return;
3532 }
3533 mgr_proxy_bytes += size;
3534 dout(10) << __func__ << " proxying mgr command (+" << size
3535 << " -> " << mgr_proxy_bytes << ")" << dendl;
3536 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3537 mgr_client.start_command(m->cmd,
3538 m->get_data(),
3539 &fin->outbl,
3540 &fin->outs,
3541 new C_OnFinisher(fin, &finisher));
3542 return;
3543 }
3544
3545 if ((module == "mds" || module == "fs") &&
3546 prefix != "fs authorize") {
3547 mdsmon()->dispatch(op);
3548 return;
3549 }
3550 if ((module == "osd" ||
3551 prefix == "pg map" ||
3552 prefix == "pg repeer") &&
3553 prefix != "osd last-stat-seq") {
3554 osdmon()->dispatch(op);
3555 return;
3556 }
3557 if (module == "config") {
3558 configmon()->dispatch(op);
3559 return;
3560 }
3561
3562 if (module == "mon" &&
3563 /* Let the Monitor class handle the following commands:
3564 * 'mon scrub'
3565 */
3566 prefix != "mon scrub" &&
3567 prefix != "mon metadata" &&
3568 prefix != "mon versions" &&
3569 prefix != "mon count-metadata" &&
3570 prefix != "mon ok-to-stop" &&
3571 prefix != "mon ok-to-add-offline" &&
3572 prefix != "mon ok-to-rm") {
3573 monmon()->dispatch(op);
3574 return;
3575 }
3576 if (module == "health" && prefix != "health") {
3577 healthmon()->dispatch(op);
3578 return;
3579 }
3580 if (module == "auth" || prefix == "fs authorize") {
3581 authmon()->dispatch(op);
3582 return;
3583 }
3584 if (module == "log") {
3585 logmon()->dispatch(op);
3586 return;
3587 }
3588
3589 if (module == "config-key") {
3590 kvmon()->dispatch(op);
3591 return;
3592 }
3593
3594 if (module == "mgr") {
3595 mgrmon()->dispatch(op);
3596 return;
3597 }
3598
3599 if (prefix == "fsid") {
3600 if (f) {
3601 f->open_object_section("fsid");
3602 f->dump_stream("fsid") << monmap->fsid;
3603 f->close_section();
3604 f->flush(rdata);
3605 } else {
3606 ds << monmap->fsid;
3607 rdata.append(ds);
3608 }
3609 reply_command(op, 0, "", rdata, 0);
3610 return;
3611 }
3612
3613 if (prefix == "mon scrub") {
3614 wait_for_paxos_write();
3615 if (is_leader()) {
3616 int r = scrub_start();
3617 reply_command(op, r, "", rdata, 0);
3618 } else if (is_peon()) {
3619 forward_request_leader(op);
3620 } else {
3621 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3622 }
3623 return;
3624 }
3625
3626 if (prefix == "time-sync-status") {
3627 if (!f)
3628 f.reset(Formatter::create("json-pretty"));
3629 f->open_object_section("time_sync");
3630 if (!timecheck_skews.empty()) {
3631 f->open_object_section("time_skew_status");
3632 for (auto& i : timecheck_skews) {
3633 double skew = i.second;
3634 double latency = timecheck_latencies[i.first];
3635 string name = monmap->get_name(i.first);
3636 ostringstream tcss;
3637 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3638 f->open_object_section(name.c_str());
3639 f->dump_float("skew", skew);
3640 f->dump_float("latency", latency);
3641 f->dump_stream("health") << tcstatus;
3642 if (tcstatus != HEALTH_OK) {
3643 f->dump_stream("details") << tcss.str();
3644 }
3645 f->close_section();
3646 }
3647 f->close_section();
3648 }
3649 f->open_object_section("timechecks");
3650 f->dump_unsigned("epoch", get_epoch());
3651 f->dump_int("round", timecheck_round);
3652 f->dump_stream("round_status") << ((timecheck_round%2) ?
3653 "on-going" : "finished");
3654 f->close_section();
3655 f->close_section();
3656 f->flush(rdata);
3657 r = 0;
3658 rs = "";
3659 } else if (prefix == "status" ||
3660 prefix == "health" ||
3661 prefix == "df") {
3662 string detail;
3663 cmd_getval(cmdmap, "detail", detail);
3664
3665 if (prefix == "status") {
3666 // get_cluster_status handles f == NULL
3667 get_cluster_status(ds, f.get(), session);
3668
3669 if (f) {
3670 f->flush(ds);
3671 ds << '\n';
3672 }
3673 rdata.append(ds);
3674 } else if (prefix == "health") {
3675 string plain;
3676 healthmon()->get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
3677 if (f) {
3678 f->flush(ds);
3679 rdata.append(ds);
3680 } else {
3681 rdata.append(plain);
3682 }
3683 } else if (prefix == "df") {
3684 bool verbose = (detail == "detail");
3685 if (f)
3686 f->open_object_section("stats");
3687
3688 mgrstatmon()->dump_cluster_stats(&ds, f.get(), verbose);
3689 if (!f) {
3690 ds << "\n \n";
3691 }
3692 mgrstatmon()->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
3693
3694 if (f) {
3695 f->close_section();
3696 f->flush(ds);
3697 ds << '\n';
3698 }
3699 } else {
3700 ceph_abort_msg("We should never get here!");
3701 return;
3702 }
3703 rdata.append(ds);
3704 rs = "";
3705 r = 0;
3706 } else if (prefix == "report") {
3707 // some of the report data is only known by leader, e.g. osdmap_clean_epochs
3708 if (!is_leader() && !is_peon()) {
3709 dout(10) << " waiting for quorum" << dendl;
3710 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3711 return;
3712 }
3713 if (!is_leader()) {
3714 forward_request_leader(op);
3715 return;
3716 }
3717 // this must be formatted, in its current form
3718 if (!f)
3719 f.reset(Formatter::create("json-pretty"));
3720 f->open_object_section("report");
3721 f->dump_stream("cluster_fingerprint") << fingerprint;
3722 f->dump_string("version", ceph_version_to_str());
3723 f->dump_string("commit", git_version_to_str());
3724 f->dump_stream("timestamp") << ceph_clock_now();
3725
3726 vector<string> tagsvec;
3727 cmd_getval(cmdmap, "tags", tagsvec);
3728 string tagstr = str_join(tagsvec, " ");
3729 if (!tagstr.empty())
3730 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3731 f->dump_string("tag", tagstr);
3732
3733 healthmon()->get_health_status(true, f.get(), nullptr);
3734
3735 monmon()->dump_info(f.get());
3736 osdmon()->dump_info(f.get());
3737 mdsmon()->dump_info(f.get());
3738 authmon()->dump_info(f.get());
3739 mgrstatmon()->dump_info(f.get());
3740 logmon()->dump_info(f.get());
3741
3742 paxos->dump_info(f.get());
3743
3744 f->close_section();
3745 f->flush(rdata);
3746
3747 ostringstream ss2;
3748 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT_LEGACY);
3749 rs = ss2.str();
3750 r = 0;
3751 } else if (prefix == "osd last-stat-seq") {
3752 int64_t osd = 0;
3753 cmd_getval(cmdmap, "id", osd);
3754 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3755 if (f) {
3756 f->dump_unsigned("seq", seq);
3757 f->flush(ds);
3758 } else {
3759 ds << seq;
3760 rdata.append(ds);
3761 }
3762 rs = "";
3763 r = 0;
3764 } else if (prefix == "node ls") {
3765 string node_type("all");
3766 cmd_getval(cmdmap, "type", node_type);
3767 if (!f)
3768 f.reset(Formatter::create("json-pretty"));
3769 if (node_type == "all") {
3770 f->open_object_section("nodes");
3771 print_nodes(f.get(), ds);
3772 osdmon()->print_nodes(f.get());
3773 mdsmon()->print_nodes(f.get());
3774 mgrmon()->print_nodes(f.get());
3775 f->close_section();
3776 } else if (node_type == "mon") {
3777 print_nodes(f.get(), ds);
3778 } else if (node_type == "osd") {
3779 osdmon()->print_nodes(f.get());
3780 } else if (node_type == "mds") {
3781 mdsmon()->print_nodes(f.get());
3782 } else if (node_type == "mgr") {
3783 mgrmon()->print_nodes(f.get());
3784 }
3785 f->flush(ds);
3786 rdata.append(ds);
3787 rs = "";
3788 r = 0;
3789 } else if (prefix == "features") {
3790 if (!is_leader() && !is_peon()) {
3791 dout(10) << " waiting for quorum" << dendl;
3792 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3793 return;
3794 }
3795 if (!is_leader()) {
3796 forward_request_leader(op);
3797 return;
3798 }
3799 if (!f)
3800 f.reset(Formatter::create("json-pretty"));
3801 FeatureMap fm;
3802 get_combined_feature_map(&fm);
3803 f->dump_object("features", fm);
3804 f->flush(rdata);
3805 rs = "";
3806 r = 0;
3807 } else if (prefix == "mon metadata") {
3808 if (!f)
3809 f.reset(Formatter::create("json-pretty"));
3810
3811 string name;
3812 bool all = !cmd_getval(cmdmap, "id", name);
3813 if (!all) {
3814 // Dump a single mon's metadata
3815 int mon = monmap->get_rank(name);
3816 if (mon < 0) {
3817 rs = "requested mon not found";
3818 r = -ENOENT;
3819 goto out;
3820 }
3821 f->open_object_section("mon_metadata");
3822 r = get_mon_metadata(mon, f.get(), ds);
3823 f->close_section();
3824 } else {
3825 // Dump all mons' metadata
3826 r = 0;
3827 f->open_array_section("mon_metadata");
3828 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3829 std::ostringstream get_err;
3830 f->open_object_section("mon");
3831 f->dump_string("name", monmap->get_name(rank));
3832 r = get_mon_metadata(rank, f.get(), get_err);
3833 f->close_section();
3834 if (r == -ENOENT || r == -EINVAL) {
3835 dout(1) << get_err.str() << dendl;
3836 // Drop error, list what metadata we do have
3837 r = 0;
3838 } else if (r != 0) {
3839 derr << "Unexpected error from get_mon_metadata: "
3840 << cpp_strerror(r) << dendl;
3841 ds << get_err.str();
3842 break;
3843 }
3844 }
3845 f->close_section();
3846 }
3847
3848 f->flush(ds);
3849 rdata.append(ds);
3850 rs = "";
3851 } else if (prefix == "mon versions") {
3852 if (!f)
3853 f.reset(Formatter::create("json-pretty"));
3854 count_metadata("ceph_version", f.get());
3855 f->flush(ds);
3856 rdata.append(ds);
3857 rs = "";
3858 r = 0;
3859 } else if (prefix == "mon count-metadata") {
3860 if (!f)
3861 f.reset(Formatter::create("json-pretty"));
3862 string field;
3863 cmd_getval(cmdmap, "property", field);
3864 count_metadata(field, f.get());
3865 f->flush(ds);
3866 rdata.append(ds);
3867 rs = "";
3868 r = 0;
3869 } else if (prefix == "quorum_status") {
3870 // make sure our map is readable and up to date
3871 if (!is_leader() && !is_peon()) {
3872 dout(10) << " waiting for quorum" << dendl;
3873 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3874 return;
3875 }
3876 _quorum_status(f.get(), ds);
3877 rdata.append(ds);
3878 rs = "";
3879 r = 0;
3880 } else if (prefix == "mon ok-to-stop") {
3881 vector<string> ids, invalid_ids;
3882 if (!cmd_getval(cmdmap, "ids", ids)) {
3883 r = -EINVAL;
3884 goto out;
3885 }
3886 set<string> wouldbe;
3887 for (auto rank : quorum) {
3888 wouldbe.insert(monmap->get_name(rank));
3889 }
3890 for (auto& n : ids) {
3891 if (monmap->contains(n)) {
3892 wouldbe.erase(n);
3893 } else {
3894 invalid_ids.push_back(n);
3895 }
3896 }
3897 if (!invalid_ids.empty()) {
3898 r = 0;
3899 rs = "invalid mon(s) specified: " + stringify(invalid_ids);
3900 goto out;
3901 }
3902
3903 if (wouldbe.size() < monmap->min_quorum_size()) {
3904 r = -EBUSY;
3905 rs = "not enough monitors would be available (" + stringify(wouldbe) +
3906 ") after stopping mons " + stringify(ids);
3907 goto out;
3908 }
3909 r = 0;
3910 rs = "quorum should be preserved (" + stringify(wouldbe) +
3911 ") after stopping " + stringify(ids);
3912 } else if (prefix == "mon ok-to-add-offline") {
3913 if (quorum.size() < monmap->min_quorum_size(monmap->size() + 1)) {
3914 rs = "adding a monitor may break quorum (until that monitor starts)";
3915 r = -EBUSY;
3916 goto out;
3917 }
3918 rs = "adding another mon that is not yet online will not break quorum";
3919 r = 0;
3920 } else if (prefix == "mon ok-to-rm") {
3921 string id;
3922 if (!cmd_getval(cmdmap, "id", id)) {
3923 r = -EINVAL;
3924 rs = "must specify a monitor id";
3925 goto out;
3926 }
3927 if (!monmap->contains(id)) {
3928 r = 0;
3929 rs = "mon." + id + " does not exist";
3930 goto out;
3931 }
3932 int rank = monmap->get_rank(id);
3933 if (quorum.count(rank) &&
3934 quorum.size() - 1 < monmap->min_quorum_size(monmap->size() - 1)) {
3935 r = -EBUSY;
3936 rs = "removing mon." + id + " would break quorum";
3937 goto out;
3938 }
3939 r = 0;
3940 rs = "safe to remove mon." + id;
3941 } else if (prefix == "version") {
3942 if (f) {
3943 f->open_object_section("version");
3944 f->dump_string("version", pretty_version_to_str());
3945 f->close_section();
3946 f->flush(ds);
3947 } else {
3948 ds << pretty_version_to_str();
3949 }
3950 rdata.append(ds);
3951 rs = "";
3952 r = 0;
3953 } else if (prefix == "versions") {
3954 if (!f)
3955 f.reset(Formatter::create("json-pretty"));
3956 map<string,int> overall;
3957 f->open_object_section("version");
3958 map<string,int> mon, mgr, osd, mds;
3959
3960 count_metadata("ceph_version", &mon);
3961 f->open_object_section("mon");
3962 for (auto& p : mon) {
3963 f->dump_int(p.first.c_str(), p.second);
3964 overall[p.first] += p.second;
3965 }
3966 f->close_section();
3967
3968 mgrmon()->count_metadata("ceph_version", &mgr);
3969 if (!mgr.empty()) {
3970 f->open_object_section("mgr");
3971 for (auto& p : mgr) {
3972 f->dump_int(p.first.c_str(), p.second);
3973 overall[p.first] += p.second;
3974 }
3975 f->close_section();
3976 }
3977
3978 osdmon()->count_metadata("ceph_version", &osd);
3979 if (!osd.empty()) {
3980 f->open_object_section("osd");
3981 for (auto& p : osd) {
3982 f->dump_int(p.first.c_str(), p.second);
3983 overall[p.first] += p.second;
3984 }
3985 f->close_section();
3986 }
3987
3988 mdsmon()->count_metadata("ceph_version", &mds);
3989 if (!mds.empty()) {
3990 f->open_object_section("mds");
3991 for (auto& p : mds) {
3992 f->dump_int(p.first.c_str(), p.second);
3993 overall[p.first] += p.second;
3994 }
3995 f->close_section();
3996 }
3997
3998 for (auto& p : mgrstatmon()->get_service_map().services) {
3999 auto &service = p.first;
4000 if (ServiceMap::is_normal_ceph_entity(service)) {
4001 continue;
4002 }
4003 f->open_object_section(service.c_str());
4004 map<string,int> m;
4005 p.second.count_metadata("ceph_version", &m);
4006 for (auto& q : m) {
4007 f->dump_int(q.first.c_str(), q.second);
4008 overall[q.first] += q.second;
4009 }
4010 f->close_section();
4011 }
4012
4013 f->open_object_section("overall");
4014 for (auto& p : overall) {
4015 f->dump_int(p.first.c_str(), p.second);
4016 }
4017 f->close_section();
4018 f->close_section();
4019 f->flush(rdata);
4020 rs = "";
4021 r = 0;
4022 }
4023
4024 out:
4025 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
4026 reply_command(op, r, rs, rdata, 0);
4027 }
4028
4029 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
4030 {
4031 bufferlist rdata;
4032 reply_command(op, rc, rs, rdata, version);
4033 }
4034
4035 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
4036 bufferlist& rdata, version_t version)
4037 {
4038 auto m = op->get_req<MMonCommand>();
4039 ceph_assert(m->get_type() == MSG_MON_COMMAND);
4040 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
4041 reply->set_tid(m->get_tid());
4042 reply->set_data(rdata);
4043 send_reply(op, reply);
4044 }
4045
4046 void Monitor::reply_tell_command(
4047 MonOpRequestRef op, int rc, const string &rs)
4048 {
4049 MCommand *m = static_cast<MCommand*>(op->get_req());
4050 ceph_assert(m->get_type() == MSG_COMMAND);
4051 MCommandReply *reply = new MCommandReply(rc, rs);
4052 reply->set_tid(m->get_tid());
4053 m->get_connection()->send_message(reply);
4054 }
4055
4056
4057 // ------------------------
4058 // request/reply routing
4059 //
4060 // a client/mds/osd will connect to a random monitor. we need to forward any
4061 // messages requiring state updates to the leader, and then route any replies
4062 // back via the correct monitor and back to them. (the monitor will not
4063 // initiate any connections.)
4064
4065 void Monitor::forward_request_leader(MonOpRequestRef op)
4066 {
4067 op->mark_event(__func__);
4068
4069 int mon = get_leader();
4070 MonSession *session = op->get_session();
4071 PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
4072
4073 if (req->get_source().is_mon() && req->get_source_addrs() != messenger->get_myaddrs()) {
4074 dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
4075 } else if (session->proxy_con) {
4076 dout(10) << "forward_request won't double fwd request " << *req << dendl;
4077 } else if (!session->closed) {
4078 RoutedRequest *rr = new RoutedRequest;
4079 rr->tid = ++routed_request_tid;
4080 rr->con = req->get_connection();
4081 rr->con_features = rr->con->get_features();
4082 encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
4083 rr->session = static_cast<MonSession *>(session->get());
4084 rr->op = op;
4085 routed_requests[rr->tid] = rr;
4086 session->routed_request_tids.insert(rr->tid);
4087
4088 dout(10) << "forward_request " << rr->tid << " request " << *req
4089 << " features " << rr->con_features << dendl;
4090
4091 MForward *forward = new MForward(rr->tid,
4092 req,
4093 rr->con_features,
4094 rr->session->caps);
4095 forward->set_priority(req->get_priority());
4096 if (session->auth_handler) {
4097 forward->entity_name = session->entity_name;
4098 } else if (req->get_source().is_mon()) {
4099 forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
4100 }
4101 send_mon_message(forward, mon);
4102 op->mark_forwarded();
4103 ceph_assert(op->get_req()->get_type() != 0);
4104 } else {
4105 dout(10) << "forward_request no session for request " << *req << dendl;
4106 }
4107 }
4108
4109 // fake connection attached to forwarded messages
4110 struct AnonConnection : public Connection {
4111 entity_addr_t socket_addr;
4112
4113 int send_message(Message *m) override {
4114 ceph_assert(!"send_message on anonymous connection");
4115 }
4116 void send_keepalive() override {
4117 ceph_assert(!"send_keepalive on anonymous connection");
4118 }
4119 void mark_down() override {
4120 // silently ignore
4121 }
4122 void mark_disposable() override {
4123 // silengtly ignore
4124 }
4125 bool is_connected() override { return false; }
4126 entity_addr_t get_peer_socket_addr() const override {
4127 return socket_addr;
4128 }
4129
4130 private:
4131 FRIEND_MAKE_REF(AnonConnection);
4132 explicit AnonConnection(CephContext *cct, const entity_addr_t& sa)
4133 : Connection(cct, nullptr),
4134 socket_addr(sa) {}
4135 };
4136
4137 //extract the original message and put it into the regular dispatch function
4138 void Monitor::handle_forward(MonOpRequestRef op)
4139 {
4140 auto m = op->get_req<MForward>();
4141 dout(10) << "received forwarded message from "
4142 << ceph_entity_type_name(m->client_type)
4143 << " " << m->client_addrs
4144 << " via " << m->get_source_inst() << dendl;
4145 MonSession *session = op->get_session();
4146 ceph_assert(session);
4147
4148 if (!session->is_capable("mon", MON_CAP_X)) {
4149 dout(0) << "forward from entity with insufficient caps! "
4150 << session->caps << dendl;
4151 } else {
4152 // see PaxosService::dispatch(); we rely on this being anon
4153 // (c->msgr == NULL)
4154 PaxosServiceMessage *req = m->claim_message();
4155 ceph_assert(req != NULL);
4156
4157 auto c = ceph::make_ref<AnonConnection>(cct, m->client_socket_addr);
4158 MonSession *s = new MonSession(static_cast<Connection*>(c.get()));
4159 s->_ident(req->get_source(),
4160 req->get_source_addrs());
4161 c->set_priv(RefCountedPtr{s, false});
4162 c->set_peer_addrs(m->client_addrs);
4163 c->set_peer_type(m->client_type);
4164 c->set_features(m->con_features);
4165
4166 s->authenticated = true;
4167 s->caps = m->client_caps;
4168 dout(10) << " caps are " << s->caps << dendl;
4169 s->entity_name = m->entity_name;
4170 dout(10) << " entity name '" << s->entity_name << "' type "
4171 << s->entity_name.get_type() << dendl;
4172 s->proxy_con = m->get_connection();
4173 s->proxy_tid = m->tid;
4174
4175 req->set_connection(c);
4176
4177 // not super accurate, but better than nothing.
4178 req->set_recv_stamp(m->get_recv_stamp());
4179
4180 /*
4181 * note which election epoch this is; we will drop the message if
4182 * there is a future election since our peers will resend routed
4183 * requests in that case.
4184 */
4185 req->rx_election_epoch = get_epoch();
4186
4187 dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
4188 _ms_dispatch(req);
4189
4190 // break the session <-> con ref loop by removing the con->session
4191 // reference, which is no longer needed once the MonOpRequest is
4192 // set up.
4193 c->set_priv(NULL);
4194 }
4195 }
4196
4197 void Monitor::send_reply(MonOpRequestRef op, Message *reply)
4198 {
4199 op->mark_event(__func__);
4200
4201 MonSession *session = op->get_session();
4202 ceph_assert(session);
4203 Message *req = op->get_req();
4204 ConnectionRef con = op->get_connection();
4205
4206 reply->set_cct(g_ceph_context);
4207 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
4208
4209 if (!con) {
4210 dout(2) << "send_reply no connection, dropping reply " << *reply
4211 << " to " << req << " " << *req << dendl;
4212 reply->put();
4213 op->mark_event("reply: no connection");
4214 return;
4215 }
4216
4217 if (!session->con && !session->proxy_con) {
4218 dout(2) << "send_reply no connection, dropping reply " << *reply
4219 << " to " << req << " " << *req << dendl;
4220 reply->put();
4221 op->mark_event("reply: no connection");
4222 return;
4223 }
4224
4225 if (session->proxy_con) {
4226 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
4227 << " via " << session->proxy_con->get_peer_addr()
4228 << " for request " << *req << dendl;
4229 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
4230 op->mark_event("reply: send routed request");
4231 } else {
4232 session->con->send_message(reply);
4233 op->mark_event("reply: send");
4234 }
4235 }
4236
4237 void Monitor::no_reply(MonOpRequestRef op)
4238 {
4239 MonSession *session = op->get_session();
4240 Message *req = op->get_req();
4241
4242 if (session->proxy_con) {
4243 dout(10) << "no_reply to " << req->get_source_inst()
4244 << " via " << session->proxy_con->get_peer_addr()
4245 << " for request " << *req << dendl;
4246 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
4247 op->mark_event("no_reply: send routed request");
4248 } else {
4249 dout(10) << "no_reply to " << req->get_source_inst()
4250 << " " << *req << dendl;
4251 op->mark_event("no_reply");
4252 }
4253 }
4254
4255 void Monitor::handle_route(MonOpRequestRef op)
4256 {
4257 auto m = op->get_req<MRoute>();
4258 MonSession *session = op->get_session();
4259 //check privileges
4260 if (!session->is_capable("mon", MON_CAP_X)) {
4261 dout(0) << "MRoute received from entity without appropriate perms! "
4262 << dendl;
4263 return;
4264 }
4265 if (m->msg)
4266 dout(10) << "handle_route tid " << m->session_mon_tid << " " << *m->msg
4267 << dendl;
4268 else
4269 dout(10) << "handle_route tid " << m->session_mon_tid << " null" << dendl;
4270
4271 // look it up
4272 if (!m->session_mon_tid) {
4273 dout(10) << " not a routed request, ignoring" << dendl;
4274 return;
4275 }
4276 auto found = routed_requests.find(m->session_mon_tid);
4277 if (found == routed_requests.end()) {
4278 dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
4279 return;
4280 }
4281 std::unique_ptr<RoutedRequest> rr{found->second};
4282 // reset payload, in case encoding is dependent on target features
4283 if (m->msg) {
4284 m->msg->clear_payload();
4285 rr->con->send_message(m->msg);
4286 m->msg = NULL;
4287 }
4288 if (m->send_osdmap_first) {
4289 dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
4290 osdmon()->send_incremental(m->send_osdmap_first, rr->session,
4291 true, MonOpRequestRef());
4292 }
4293 ceph_assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
4294 routed_requests.erase(found);
4295 rr->session->routed_request_tids.erase(m->session_mon_tid);
4296 }
4297
4298 void Monitor::resend_routed_requests()
4299 {
4300 dout(10) << "resend_routed_requests" << dendl;
4301 int mon = get_leader();
4302 list<Context*> retry;
4303 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
4304 p != routed_requests.end();
4305 ++p) {
4306 RoutedRequest *rr = p->second;
4307
4308 if (mon == rank) {
4309 dout(10) << " requeue for self tid " << rr->tid << dendl;
4310 rr->op->mark_event("retry routed request");
4311 retry.push_back(new C_RetryMessage(this, rr->op));
4312 if (rr->session) {
4313 ceph_assert(rr->session->routed_request_tids.count(p->first));
4314 rr->session->routed_request_tids.erase(p->first);
4315 }
4316 delete rr;
4317 } else {
4318 auto q = rr->request_bl.cbegin();
4319 PaxosServiceMessage *req =
4320 (PaxosServiceMessage *)decode_message(cct, 0, q);
4321 rr->op->mark_event("resend forwarded message to leader");
4322 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req
4323 << dendl;
4324 MForward *forward = new MForward(rr->tid,
4325 req,
4326 rr->con_features,
4327 rr->session->caps);
4328 req->put(); // forward takes its own ref; drop ours.
4329 forward->client_type = rr->con->get_peer_type();
4330 forward->client_addrs = rr->con->get_peer_addrs();
4331 forward->client_socket_addr = rr->con->get_peer_socket_addr();
4332 forward->set_priority(req->get_priority());
4333 send_mon_message(forward, mon);
4334 }
4335 }
4336 if (mon == rank) {
4337 routed_requests.clear();
4338 finish_contexts(g_ceph_context, retry);
4339 }
4340 }
4341
4342 void Monitor::remove_session(MonSession *s)
4343 {
4344 dout(10) << "remove_session " << s << " " << s->name << " " << s->addrs
4345 << " features 0x" << std::hex << s->con_features << std::dec << dendl;
4346 ceph_assert(s->con);
4347 ceph_assert(!s->closed);
4348 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
4349 p != s->routed_request_tids.end();
4350 ++p) {
4351 ceph_assert(routed_requests.count(*p));
4352 RoutedRequest *rr = routed_requests[*p];
4353 dout(10) << " dropping routed request " << rr->tid << dendl;
4354 delete rr;
4355 routed_requests.erase(*p);
4356 }
4357 s->routed_request_tids.clear();
4358 s->con->set_priv(nullptr);
4359 session_map.remove_session(s);
4360 logger->set(l_mon_num_sessions, session_map.get_size());
4361 logger->inc(l_mon_session_rm);
4362 }
4363
4364 void Monitor::remove_all_sessions()
4365 {
4366 std::lock_guard l(session_map_lock);
4367 while (!session_map.sessions.empty()) {
4368 MonSession *s = session_map.sessions.front();
4369 remove_session(s);
4370 logger->inc(l_mon_session_rm);
4371 }
4372 if (logger)
4373 logger->set(l_mon_num_sessions, session_map.get_size());
4374 }
4375
4376 void Monitor::send_mon_message(Message *m, int rank)
4377 {
4378 messenger->send_to_mon(m, monmap->get_addrs(rank));
4379 }
4380
4381 void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
4382 {
4383 /**
4384 * Wait list the new session until we're in the quorum, assuming it's
4385 * sufficiently new.
4386 * tick() will periodically send them back through so we can send
4387 * the client elsewhere if we don't think we're getting back in.
4388 *
4389 * But we allow a few sorts of messages:
4390 * 1) Monitors can talk to us at any time, of course.
4391 * 2) auth messages. It's unlikely to go through much faster, but
4392 * it's possible we've just lost our quorum status and we want to take...
4393 * 3) command messages. We want to accept these under all possible
4394 * circumstances.
4395 */
4396 Message *m = op->get_req();
4397 MonSession *s = op->get_session();
4398 ConnectionRef con = op->get_connection();
4399 utime_t too_old = ceph_clock_now();
4400 too_old -= g_ceph_context->_conf->mon_lease;
4401 if (m->get_recv_stamp() > too_old &&
4402 con->is_connected()) {
4403 dout(5) << "waitlisting message " << *m << dendl;
4404 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
4405 op->mark_wait_for_quorum();
4406 } else {
4407 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
4408 con->mark_down();
4409 // proxied sessions aren't registered and don't have a con; don't remove
4410 // those.
4411 if (!s->proxy_con) {
4412 std::lock_guard l(session_map_lock);
4413 remove_session(s);
4414 }
4415 op->mark_zap();
4416 }
4417 }
4418
4419 void Monitor::_ms_dispatch(Message *m)
4420 {
4421 if (is_shutdown()) {
4422 m->put();
4423 return;
4424 }
4425
4426 MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
4427 bool src_is_mon = op->is_src_mon();
4428 op->mark_event("mon:_ms_dispatch");
4429 MonSession *s = op->get_session();
4430 if (s && s->closed) {
4431 return;
4432 }
4433
4434 if (src_is_mon && s) {
4435 ConnectionRef con = m->get_connection();
4436 if (con->get_messenger() && con->get_features() != s->con_features) {
4437 // only update features if this is a non-anonymous connection
4438 dout(10) << __func__ << " feature change for " << m->get_source_inst()
4439 << " (was " << s->con_features
4440 << ", now " << con->get_features() << ")" << dendl;
4441 // connection features changed - recreate session.
4442 if (s->con && s->con != con) {
4443 dout(10) << __func__ << " connection for " << m->get_source_inst()
4444 << " changed from session; mark down and replace" << dendl;
4445 s->con->mark_down();
4446 }
4447 if (s->item.is_on_list()) {
4448 // forwarded messages' sessions are not in the sessions map and
4449 // exist only while the op is being handled.
4450 std::lock_guard l(session_map_lock);
4451 remove_session(s);
4452 }
4453 s = nullptr;
4454 }
4455 }
4456
4457 if (!s) {
4458 // if the sender is not a monitor, make sure their first message for a
4459 // session is an MAuth. If it is not, assume it's a stray message,
4460 // and considering that we are creating a new session it is safe to
4461 // assume that the sender hasn't authenticated yet, so we have no way
4462 // of assessing whether we should handle it or not.
4463 if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
4464 m->get_type() != CEPH_MSG_MON_GET_MAP &&
4465 m->get_type() != CEPH_MSG_PING)) {
4466 dout(1) << __func__ << " dropping stray message " << *m
4467 << " from " << m->get_source_inst() << dendl;
4468 return;
4469 }
4470
4471 ConnectionRef con = m->get_connection();
4472 {
4473 std::lock_guard l(session_map_lock);
4474 s = session_map.new_session(m->get_source(),
4475 m->get_source_addrs(),
4476 con.get());
4477 }
4478 ceph_assert(s);
4479 con->set_priv(RefCountedPtr{s, false});
4480 dout(10) << __func__ << " new session " << s << " " << *s
4481 << " features 0x" << std::hex
4482 << s->con_features << std::dec << dendl;
4483 op->set_session(s);
4484
4485 logger->set(l_mon_num_sessions, session_map.get_size());
4486 logger->inc(l_mon_session_add);
4487
4488 if (src_is_mon) {
4489 // give it monitor caps; the peer type has been authenticated
4490 dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
4491 if (!s->caps.is_allow_all()) // but no need to repeatedly copy
4492 s->caps = mon_caps;
4493 s->authenticated = true;
4494 }
4495 } else {
4496 dout(20) << __func__ << " existing session " << s << " for " << s->name
4497 << dendl;
4498 }
4499
4500 ceph_assert(s);
4501
4502 s->session_timeout = ceph_clock_now();
4503 s->session_timeout += g_conf()->mon_session_timeout;
4504
4505 if (s->auth_handler) {
4506 s->entity_name = s->auth_handler->get_entity_name();
4507 s->global_id = s->auth_handler->get_global_id();
4508 s->global_id_status = s->auth_handler->get_global_id_status();
4509 }
4510 dout(20) << " entity_name " << s->entity_name
4511 << " global_id " << s->global_id
4512 << " (" << s->global_id_status
4513 << ") caps " << s->caps.get_str() << dendl;
4514
4515 if (!session_stretch_allowed(s, op)) {
4516 return;
4517 }
4518 if ((is_synchronizing() ||
4519 (!s->authenticated && !exited_quorum.is_zero())) &&
4520 !src_is_mon &&
4521 m->get_type() != CEPH_MSG_PING) {
4522 waitlist_or_zap_client(op);
4523 } else {
4524 dispatch_op(op);
4525 }
4526 return;
4527 }
4528
4529 void Monitor::dispatch_op(MonOpRequestRef op)
4530 {
4531 op->mark_event("mon:dispatch_op");
4532 MonSession *s = op->get_session();
4533 ceph_assert(s);
4534 if (s->closed) {
4535 dout(10) << " session closed, dropping " << op->get_req() << dendl;
4536 return;
4537 }
4538
4539 /* we will consider the default type as being 'monitor' until proven wrong */
4540 op->set_type_monitor();
4541 /* deal with all messages that do not necessarily need caps */
4542 switch (op->get_req()->get_type()) {
4543 // auth
4544 case MSG_MON_GLOBAL_ID:
4545 case MSG_MON_USED_PENDING_KEYS:
4546 case CEPH_MSG_AUTH:
4547 op->set_type_service();
4548 /* no need to check caps here */
4549 paxos_service[PAXOS_AUTH]->dispatch(op);
4550 return;
4551
4552 case CEPH_MSG_PING:
4553 handle_ping(op);
4554 return;
4555 case MSG_COMMAND:
4556 op->set_type_command();
4557 handle_tell_command(op);
4558 return;
4559 }
4560
4561 if (!op->get_session()->authenticated) {
4562 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4563 << " is not authenticated, dropping " << *(op->get_req())
4564 << dendl;
4565 return;
4566 }
4567
4568 // global_id_status == NONE: all sessions for auth_none and krb,
4569 // mon <-> mon sessions (including proxied sessions) for cephx
4570 ceph_assert(s->global_id_status == global_id_status_t::NONE ||
4571 s->global_id_status == global_id_status_t::NEW_OK ||
4572 s->global_id_status == global_id_status_t::NEW_NOT_EXPOSED ||
4573 s->global_id_status == global_id_status_t::RECLAIM_OK ||
4574 s->global_id_status == global_id_status_t::RECLAIM_INSECURE);
4575
4576 // let mon_getmap through for "ping" (which doesn't reconnect)
4577 // and "tell" (which reconnects but doesn't attempt to preserve
4578 // its global_id and stays in NEW_NOT_EXPOSED, retrying until
4579 // ->send_attempts reaches 0)
4580 if (cct->_conf->auth_expose_insecure_global_id_reclaim &&
4581 s->global_id_status == global_id_status_t::NEW_NOT_EXPOSED &&
4582 op->get_req()->get_type() != CEPH_MSG_MON_GET_MAP) {
4583 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4584 << " may omit old_ticket on reconnects, discarding "
4585 << *op->get_req() << " and forcing reconnect" << dendl;
4586 ceph_assert(s->con && !s->proxy_con);
4587 s->con->mark_down();
4588 {
4589 std::lock_guard l(session_map_lock);
4590 remove_session(s);
4591 }
4592 op->mark_zap();
4593 return;
4594 }
4595
4596 switch (op->get_req()->get_type()) {
4597 case CEPH_MSG_MON_GET_MAP:
4598 handle_mon_get_map(op);
4599 return;
4600
4601 case MSG_GET_CONFIG:
4602 configmon()->handle_get_config(op);
4603 return;
4604
4605 case CEPH_MSG_MON_SUBSCRIBE:
4606 /* FIXME: check what's being subscribed, filter accordingly */
4607 handle_subscribe(op);
4608 return;
4609 }
4610
4611 /* well, maybe the op belongs to a service... */
4612 op->set_type_service();
4613 /* deal with all messages which caps should be checked somewhere else */
4614 switch (op->get_req()->get_type()) {
4615
4616 // OSDs
4617 case CEPH_MSG_MON_GET_OSDMAP:
4618 case CEPH_MSG_POOLOP:
4619 case MSG_OSD_BEACON:
4620 case MSG_OSD_MARK_ME_DOWN:
4621 case MSG_OSD_MARK_ME_DEAD:
4622 case MSG_OSD_FULL:
4623 case MSG_OSD_FAILURE:
4624 case MSG_OSD_BOOT:
4625 case MSG_OSD_ALIVE:
4626 case MSG_OSD_PGTEMP:
4627 case MSG_OSD_PG_CREATED:
4628 case MSG_REMOVE_SNAPS:
4629 case MSG_MON_GET_PURGED_SNAPS:
4630 case MSG_OSD_PG_READY_TO_MERGE:
4631 paxos_service[PAXOS_OSDMAP]->dispatch(op);
4632 return;
4633
4634 // MDSs
4635 case MSG_MDS_BEACON:
4636 case MSG_MDS_OFFLOAD_TARGETS:
4637 paxos_service[PAXOS_MDSMAP]->dispatch(op);
4638 return;
4639
4640 // Mgrs
4641 case MSG_MGR_BEACON:
4642 paxos_service[PAXOS_MGR]->dispatch(op);
4643 return;
4644
4645 // MgrStat
4646 case MSG_MON_MGR_REPORT:
4647 case CEPH_MSG_STATFS:
4648 case MSG_GETPOOLSTATS:
4649 paxos_service[PAXOS_MGRSTAT]->dispatch(op);
4650 return;
4651
4652 // log
4653 case MSG_LOG:
4654 paxos_service[PAXOS_LOG]->dispatch(op);
4655 return;
4656
4657 // handle_command() does its own caps checking
4658 case MSG_MON_COMMAND:
4659 op->set_type_command();
4660 handle_command(op);
4661 return;
4662 }
4663
4664 /* nop, looks like it's not a service message; revert back to monitor */
4665 op->set_type_monitor();
4666
4667 /* messages we, the Monitor class, need to deal with
4668 * but may be sent by clients. */
4669
4670 if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
4671 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4672 << " not enough caps for " << *(op->get_req()) << " -- dropping"
4673 << dendl;
4674 return;
4675 }
4676
4677 switch (op->get_req()->get_type()) {
4678 // misc
4679 case CEPH_MSG_MON_GET_VERSION:
4680 handle_get_version(op);
4681 return;
4682 }
4683
4684 if (!op->is_src_mon()) {
4685 dout(1) << __func__ << " unexpected monitor message from"
4686 << " non-monitor entity " << op->get_req()->get_source_inst()
4687 << " " << *(op->get_req()) << " -- dropping" << dendl;
4688 return;
4689 }
4690
4691 /* messages that should only be sent by another monitor */
4692 switch (op->get_req()->get_type()) {
4693
4694 case MSG_ROUTE:
4695 handle_route(op);
4696 return;
4697
4698 case MSG_MON_PROBE:
4699 handle_probe(op);
4700 return;
4701
4702 // Sync (i.e., the new slurp, but on steroids)
4703 case MSG_MON_SYNC:
4704 handle_sync(op);
4705 return;
4706 case MSG_MON_SCRUB:
4707 handle_scrub(op);
4708 return;
4709
4710 /* log acks are sent from a monitor we sent the MLog to, and are
4711 never sent by clients to us. */
4712 case MSG_LOGACK:
4713 log_client.handle_log_ack((MLogAck*)op->get_req());
4714 return;
4715
4716 // monmap
4717 case MSG_MON_JOIN:
4718 op->set_type_service();
4719 paxos_service[PAXOS_MONMAP]->dispatch(op);
4720 return;
4721
4722 // paxos
4723 case MSG_MON_PAXOS:
4724 {
4725 op->set_type_paxos();
4726 auto pm = op->get_req<MMonPaxos>();
4727 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4728 //can't send these!
4729 return;
4730 }
4731
4732 if (state == STATE_SYNCHRONIZING) {
4733 // we are synchronizing. These messages would do us no
4734 // good, thus just drop them and ignore them.
4735 dout(10) << __func__ << " ignore paxos msg from "
4736 << pm->get_source_inst() << dendl;
4737 return;
4738 }
4739
4740 // sanitize
4741 if (pm->epoch > get_epoch()) {
4742 bootstrap();
4743 return;
4744 }
4745 if (pm->epoch != get_epoch()) {
4746 return;
4747 }
4748
4749 paxos->dispatch(op);
4750 }
4751 return;
4752
4753 // elector messages
4754 case MSG_MON_ELECTION:
4755 op->set_type_election_or_ping();
4756 //check privileges here for simplicity
4757 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4758 dout(0) << "MMonElection received from entity without enough caps!"
4759 << op->get_session()->caps << dendl;
4760 return;;
4761 }
4762 if (!is_probing() && !is_synchronizing()) {
4763 elector.dispatch(op);
4764 }
4765 return;
4766
4767 case MSG_MON_PING:
4768 op->set_type_election_or_ping();
4769 elector.dispatch(op);
4770 return;
4771
4772 case MSG_FORWARD:
4773 handle_forward(op);
4774 return;
4775
4776 case MSG_TIMECHECK:
4777 dout(5) << __func__ << " ignoring " << op << dendl;
4778 return;
4779 case MSG_TIMECHECK2:
4780 handle_timecheck(op);
4781 return;
4782
4783 case MSG_MON_HEALTH:
4784 dout(5) << __func__ << " dropping deprecated message: "
4785 << *op->get_req() << dendl;
4786 break;
4787 case MSG_MON_HEALTH_CHECKS:
4788 op->set_type_service();
4789 paxos_service[PAXOS_HEALTH]->dispatch(op);
4790 return;
4791 }
4792 dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
4793 return;
4794 }
4795
4796 void Monitor::handle_ping(MonOpRequestRef op)
4797 {
4798 auto m = op->get_req<MPing>();
4799 dout(10) << __func__ << " " << *m << dendl;
4800 MPing *reply = new MPing;
4801 bufferlist payload;
4802 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4803 f->open_object_section("pong");
4804
4805 healthmon()->get_health_status(false, f.get(), nullptr);
4806 get_mon_status(f.get());
4807
4808 f->close_section();
4809 stringstream ss;
4810 f->flush(ss);
4811 encode(ss.str(), payload);
4812 reply->set_payload(payload);
4813 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
4814 m->get_connection()->send_message(reply);
4815 }
4816
4817 void Monitor::timecheck_start()
4818 {
4819 dout(10) << __func__ << dendl;
4820 timecheck_cleanup();
4821 if (get_quorum_mon_features().contains_all(
4822 ceph::features::mon::FEATURE_NAUTILUS)) {
4823 timecheck_start_round();
4824 }
4825 }
4826
4827 void Monitor::timecheck_finish()
4828 {
4829 dout(10) << __func__ << dendl;
4830 timecheck_cleanup();
4831 }
4832
4833 void Monitor::timecheck_start_round()
4834 {
4835 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4836 ceph_assert(is_leader());
4837
4838 if (monmap->size() == 1) {
4839 ceph_abort_msg("We are alone; this shouldn't have been scheduled!");
4840 return;
4841 }
4842
4843 if (timecheck_round % 2) {
4844 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4845 utime_t curr_time = ceph_clock_now();
4846 double max = g_conf()->mon_timecheck_interval*3;
4847 if (curr_time - timecheck_round_start < max) {
4848 dout(10) << __func__ << " keep current round going" << dendl;
4849 goto out;
4850 } else {
4851 dout(10) << __func__
4852 << " finish current timecheck and start new" << dendl;
4853 timecheck_cancel_round();
4854 }
4855 }
4856
4857 ceph_assert(timecheck_round % 2 == 0);
4858 timecheck_acks = 0;
4859 timecheck_round ++;
4860 timecheck_round_start = ceph_clock_now();
4861 dout(10) << __func__ << " new " << timecheck_round << dendl;
4862
4863 timecheck();
4864 out:
4865 dout(10) << __func__ << " setting up next event" << dendl;
4866 timecheck_reset_event();
4867 }
4868
4869 void Monitor::timecheck_finish_round(bool success)
4870 {
4871 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4872 ceph_assert(timecheck_round % 2);
4873 timecheck_round ++;
4874 timecheck_round_start = utime_t();
4875
4876 if (success) {
4877 ceph_assert(timecheck_waiting.empty());
4878 ceph_assert(timecheck_acks == quorum.size());
4879 timecheck_report();
4880 timecheck_check_skews();
4881 return;
4882 }
4883
4884 dout(10) << __func__ << " " << timecheck_waiting.size()
4885 << " peers still waiting:";
4886 for (auto& p : timecheck_waiting) {
4887 *_dout << " mon." << p.first;
4888 }
4889 *_dout << dendl;
4890 timecheck_waiting.clear();
4891
4892 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4893 }
4894
4895 void Monitor::timecheck_cancel_round()
4896 {
4897 timecheck_finish_round(false);
4898 }
4899
4900 void Monitor::timecheck_cleanup()
4901 {
4902 timecheck_round = 0;
4903 timecheck_acks = 0;
4904 timecheck_round_start = utime_t();
4905
4906 if (timecheck_event) {
4907 timer.cancel_event(timecheck_event);
4908 timecheck_event = NULL;
4909 }
4910 timecheck_waiting.clear();
4911 timecheck_skews.clear();
4912 timecheck_latencies.clear();
4913
4914 timecheck_rounds_since_clean = 0;
4915 }
4916
4917 void Monitor::timecheck_reset_event()
4918 {
4919 if (timecheck_event) {
4920 timer.cancel_event(timecheck_event);
4921 timecheck_event = NULL;
4922 }
4923
4924 double delay =
4925 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4926
4927 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4928 delay = cct->_conf->mon_timecheck_interval;
4929 }
4930
4931 dout(10) << __func__ << " delay " << delay
4932 << " rounds_since_clean " << timecheck_rounds_since_clean
4933 << dendl;
4934
4935 timecheck_event = timer.add_event_after(
4936 delay,
4937 new C_MonContext{this, [this](int) {
4938 timecheck_start_round();
4939 }});
4940 }
4941
/**
 * Leader-only: inspect the skews gathered in the last round and decide when
 * the next time-check round should run.
 *
 * If timecheck_has_skew() flags any entry in timecheck_skews, the count of
 * consecutive skewed rounds is incremented and the next round is rescheduled
 * through timecheck_reset_event(), whose delay scales with that count.
 * If no skew is found after one or more skewed rounds, one more round is
 * scheduled with the single-round skew delay to confirm the clocks really
 * settled, and the counter is then cleared.
 */
void Monitor::timecheck_check_skews()
{
  dout(10) << __func__ << dendl;
  ceph_assert(is_leader());
  // timecheck_round is expected to be even here (timecheck() asserts the
  // opposite while pinging)
  ceph_assert((timecheck_round % 2) == 0);
  if (monmap->size() == 1) {
    ceph_abort_msg("We are alone; we shouldn't have gotten here!");
    return;
  }
  ceph_assert(timecheck_latencies.size() == timecheck_skews.size());

  bool found_skew = false;
  for (auto& p : timecheck_skews) {
    double abs_skew;
    if (timecheck_has_skew(p.second, &abs_skew)) {
      dout(10) << __func__
               << " " << p.first << " skew " << abs_skew << dendl;
      found_skew = true;
    }
  }

  if (found_skew) {
    ++timecheck_rounds_since_clean;
    timecheck_reset_event();
  } else if (timecheck_rounds_since_clean > 0) {
    dout(1) << __func__
            << " no clock skews found after " << timecheck_rounds_since_clean
            << " rounds" << dendl;
    // make sure the skews are really gone and not just a transient success
    // this will run just once if not in the presence of skews again.
    // Setting the counter to 1 makes timecheck_reset_event() use a single
    // mon_timecheck_skew_interval delay; the counter is cleared right after.
    timecheck_rounds_since_clean = 1;
    timecheck_reset_event();
    timecheck_rounds_since_clean = 0;
  }

}
4978
4979 void Monitor::timecheck_report()
4980 {
4981 dout(10) << __func__ << dendl;
4982 ceph_assert(is_leader());
4983 ceph_assert((timecheck_round % 2) == 0);
4984 if (monmap->size() == 1) {
4985 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
4986 return;
4987 }
4988
4989 ceph_assert(timecheck_latencies.size() == timecheck_skews.size());
4990 bool do_output = true; // only output report once
4991 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4992 if (monmap->get_name(*q) == name)
4993 continue;
4994
4995 MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_REPORT);
4996 m->epoch = get_epoch();
4997 m->round = timecheck_round;
4998
4999 for (auto& it : timecheck_skews) {
5000 double skew = it.second;
5001 double latency = timecheck_latencies[it.first];
5002
5003 m->skews[it.first] = skew;
5004 m->latencies[it.first] = latency;
5005
5006 if (do_output) {
5007 dout(25) << __func__ << " mon." << it.first
5008 << " latency " << latency
5009 << " skew " << skew << dendl;
5010 }
5011 }
5012 do_output = false;
5013 dout(10) << __func__ << " send report to mon." << *q << dendl;
5014 send_mon_message(m, *q);
5015 }
5016 }
5017
5018 void Monitor::timecheck()
5019 {
5020 dout(10) << __func__ << dendl;
5021 ceph_assert(is_leader());
5022 if (monmap->size() == 1) {
5023 ceph_abort_msg("We are alone; we shouldn't have gotten here!");
5024 return;
5025 }
5026 ceph_assert(timecheck_round % 2 != 0);
5027
5028 timecheck_acks = 1; // we ack ourselves
5029
5030 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
5031 << " round " << timecheck_round << dendl;
5032
5033 // we are at the eye of the storm; the point of reference
5034 timecheck_skews[rank] = 0.0;
5035 timecheck_latencies[rank] = 0.0;
5036
5037 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
5038 if (monmap->get_name(*it) == name)
5039 continue;
5040
5041 utime_t curr_time = ceph_clock_now();
5042 timecheck_waiting[*it] = curr_time;
5043 MTimeCheck2 *m = new MTimeCheck2(MTimeCheck2::OP_PING);
5044 m->epoch = get_epoch();
5045 m->round = timecheck_round;
5046 dout(10) << __func__ << " send " << *m << " to mon." << *it << dendl;
5047 send_mon_message(m, *it);
5048 }
5049 }
5050
5051 health_status_t Monitor::timecheck_status(ostringstream &ss,
5052 const double skew_bound,
5053 const double latency)
5054 {
5055 health_status_t status = HEALTH_OK;
5056 ceph_assert(latency >= 0);
5057
5058 double abs_skew;
5059 if (timecheck_has_skew(skew_bound, &abs_skew)) {
5060 status = HEALTH_WARN;
5061 ss << "clock skew " << abs_skew << "s"
5062 << " > max " << g_conf()->mon_clock_drift_allowed << "s";
5063 }
5064
5065 return status;
5066 }
5067
/**
 * Leader-side handler for MTimeCheck2 OP_PONG replies.
 *
 * Drops pongs from an older election epoch or an older round. Otherwise it:
 * - removes the peer from timecheck_waiting;
 * - folds the round-trip time into the peer's latency estimate;
 * - derives a latency-bounded skew from the peer's timestamp;
 * - logs a health message if timecheck_status() reports a problem;
 * - finishes the round once every quorum member (including us) has acked.
 */
void Monitor::handle_timecheck_leader(MonOpRequestRef op)
{
  auto m = op->get_req<MTimeCheck2>();
  dout(10) << __func__ << " " << *m << dendl;
  /* handles PONG's */
  ceph_assert(m->op == MTimeCheck2::OP_PONG);

  int other = m->get_source().num();
  if (m->epoch < get_epoch()) {
    dout(1) << __func__ << " got old timecheck epoch " << m->epoch
            << " from " << other
            << " curr " << get_epoch()
            << " -- severely lagged? discard" << dendl;
    return;
  }
  // a pong from a newer epoch than ours is treated as impossible
  ceph_assert(m->epoch == get_epoch());

  if (m->round < timecheck_round) {
    dout(1) << __func__ << " got old round " << m->round
            << " from " << other
            << " curr " << timecheck_round << " -- discard" << dendl;
    return;
  }

  utime_t curr_time = ceph_clock_now();

  // we must have pinged this peer and not yet seen its pong
  ceph_assert(timecheck_waiting.count(other) > 0);
  utime_t timecheck_sent = timecheck_waiting[other];
  timecheck_waiting.erase(other);
  if (curr_time < timecheck_sent) {
    // our clock was readjusted -- drop everything until it all makes sense.
    dout(1) << __func__ << " our clock was readjusted --"
            << " bump round and drop current check"
            << dendl;
    timecheck_cancel_round();
    return;
  }

  /* update peer latencies */
  double latency = (double)(curr_time - timecheck_sent);

  // the first sample is stored as-is; later samples are blended as an
  // exponential moving average (0.8 * previous + 0.2 * new)
  if (timecheck_latencies.count(other) == 0)
    timecheck_latencies[other] = latency;
  else {
    double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
    timecheck_latencies[other] = avg_latency;
  }

  /*
   * update skews
   *
   * some nasty thing goes on if we were to do 'a - b' between two utime_t,
   * and 'a' happens to be lower than 'b'; so we use double instead.
   *
   * latency is always expected to be >= 0.
   *
   * delta, the difference between their timestamp and ours, may either be
   * lower or higher than 0; will hardly ever be 0.
   *
   * The absolute skew is the absolute delta minus the latency, which is
   * taken as a whole instead of an rtt given that there is some queueing
   * and dispatch times involved and it's hard to assess how long exactly
   * it took for the message to travel to the other side and be handled. So
   * we call it a bounded skew, the worst case scenario.
   *
   * Now, to math!
   *
   * Given that the latency is always positive, we can establish that the
   * bounded skew will be:
   *
   *  1. positive if the absolute delta is higher than the latency and
   *     delta is positive
   *  2. negative if the absolute delta is higher than the latency and
   *     delta is negative.
   *  3. zero if the absolute delta is lower than the latency.
   *
   * On 3. we make a judgement call and treat the skew as non-existent.
   * This is because, if the absolute delta is lower than the latency, then
   * the apparently existing skew is nothing more than a side-effect of the
   * high latency at work.
   *
   * This may not be entirely true though, as a severely skewed clock
   * may be masked by an even higher latency, but with high latencies
   * we probably have worse issues to deal with than just skewed clocks.
   */
  ceph_assert(latency >= 0);

  double delta = ((double) m->timestamp) - ((double) curr_time);
  double abs_delta = (delta > 0 ? delta : -delta);
  double skew_bound = abs_delta - latency;
  if (skew_bound < 0)
    skew_bound = 0;
  else if (delta < 0)
    skew_bound = -skew_bound;

  ostringstream ss;
  health_status_t status = timecheck_status(ss, skew_bound, latency);
  if (status != HEALTH_OK) {
    clog->health(status) << other << " " << ss.str();
  }

  dout(10) << __func__ << " from " << other << " ts " << m->timestamp
           << " delta " << delta << " skew_bound " << skew_bound
           << " latency " << latency << dendl;

  timecheck_skews[other] = skew_bound;

  timecheck_acks++;
  if (timecheck_acks == quorum.size()) {
    dout(10) << __func__ << " got pongs from everybody ("
             << timecheck_acks << " total)" << dendl;
    ceph_assert(timecheck_skews.size() == timecheck_acks);
    ceph_assert(timecheck_waiting.empty());
    // everyone has acked, so bump the round to finish it.
    timecheck_finish_round();
  }
}
5185
5186 void Monitor::handle_timecheck_peon(MonOpRequestRef op)
5187 {
5188 auto m = op->get_req<MTimeCheck2>();
5189 dout(10) << __func__ << " " << *m << dendl;
5190
5191 ceph_assert(is_peon());
5192 ceph_assert(m->op == MTimeCheck2::OP_PING || m->op == MTimeCheck2::OP_REPORT);
5193
5194 if (m->epoch != get_epoch()) {
5195 dout(1) << __func__ << " got wrong epoch "
5196 << "(ours " << get_epoch()
5197 << " theirs: " << m->epoch << ") -- discarding" << dendl;
5198 return;
5199 }
5200
5201 if (m->round < timecheck_round) {
5202 dout(1) << __func__ << " got old round " << m->round
5203 << " current " << timecheck_round
5204 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
5205 return;
5206 }
5207
5208 timecheck_round = m->round;
5209
5210 if (m->op == MTimeCheck2::OP_REPORT) {
5211 ceph_assert((timecheck_round % 2) == 0);
5212 timecheck_latencies.swap(m->latencies);
5213 timecheck_skews.swap(m->skews);
5214 return;
5215 }
5216
5217 ceph_assert((timecheck_round % 2) != 0);
5218 MTimeCheck2 *reply = new MTimeCheck2(MTimeCheck2::OP_PONG);
5219 utime_t curr_time = ceph_clock_now();
5220 reply->timestamp = curr_time;
5221 reply->epoch = m->epoch;
5222 reply->round = m->round;
5223 dout(10) << __func__ << " send " << *m
5224 << " to " << m->get_source_inst() << dendl;
5225 m->get_connection()->send_message(reply);
5226 }
5227
5228 void Monitor::handle_timecheck(MonOpRequestRef op)
5229 {
5230 auto m = op->get_req<MTimeCheck2>();
5231 dout(10) << __func__ << " " << *m << dendl;
5232
5233 if (is_leader()) {
5234 if (m->op != MTimeCheck2::OP_PONG) {
5235 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
5236 } else {
5237 handle_timecheck_leader(op);
5238 }
5239 } else if (is_peon()) {
5240 if (m->op != MTimeCheck2::OP_PING && m->op != MTimeCheck2::OP_REPORT) {
5241 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
5242 } else {
5243 handle_timecheck_peon(op);
5244 }
5245 } else {
5246 dout(1) << __func__ << " drop unexpected msg" << dendl;
5247 }
5248 }
5249
5250 void Monitor::handle_subscribe(MonOpRequestRef op)
5251 {
5252 auto m = op->get_req<MMonSubscribe>();
5253 dout(10) << "handle_subscribe " << *m << dendl;
5254
5255 bool reply = false;
5256
5257 MonSession *s = op->get_session();
5258 ceph_assert(s);
5259
5260 if (m->hostname.size()) {
5261 s->remote_host = m->hostname;
5262 }
5263
5264 for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
5265 p != m->what.end();
5266 ++p) {
5267 if (p->first == "monmap" || p->first == "config") {
5268 // these require no caps
5269 } else if (!s->is_capable("mon", MON_CAP_R)) {
5270 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
5271 << " not enough caps for " << *(op->get_req()) << " -- dropping"
5272 << dendl;
5273 continue;
5274 }
5275
5276 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
5277 if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
5278 reply = true;
5279
5280 // remove conflicting subscribes
5281 if (logmon()->sub_name_to_id(p->first) >= 0) {
5282 for (map<string, Subscription*>::iterator it = s->sub_map.begin();
5283 it != s->sub_map.end(); ) {
5284 if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
5285 std::lock_guard l(session_map_lock);
5286 session_map.remove_sub((it++)->second);
5287 } else {
5288 ++it;
5289 }
5290 }
5291 }
5292
5293 {
5294 std::lock_guard l(session_map_lock);
5295 session_map.add_update_sub(s, p->first, p->second.start,
5296 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
5297 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
5298 }
5299
5300 if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
5301 dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
5302 if ((int)s->is_capable("mds", MON_CAP_R)) {
5303 Subscription *sub = s->sub_map[p->first];
5304 ceph_assert(sub != nullptr);
5305 mdsmon()->check_sub(sub);
5306 }
5307 } else if (p->first == "osdmap") {
5308 if ((int)s->is_capable("osd", MON_CAP_R)) {
5309 if (s->osd_epoch > p->second.start) {
5310 // client needs earlier osdmaps on purpose, so reset the sent epoch
5311 s->osd_epoch = 0;
5312 }
5313 osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
5314 }
5315 } else if (p->first == "osd_pg_creates") {
5316 if ((int)s->is_capable("osd", MON_CAP_W)) {
5317 osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
5318 }
5319 } else if (p->first == "monmap") {
5320 monmon()->check_sub(s->sub_map[p->first]);
5321 } else if (logmon()->sub_name_to_id(p->first) >= 0) {
5322 logmon()->check_sub(s->sub_map[p->first]);
5323 } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
5324 mgrmon()->check_sub(s->sub_map[p->first]);
5325 } else if (p->first == "servicemap") {
5326 mgrstatmon()->check_sub(s->sub_map[p->first]);
5327 } else if (p->first == "config") {
5328 configmon()->check_sub(s);
5329 } else if (p->first.find("kv:") == 0) {
5330 kvmon()->check_sub(s->sub_map[p->first]);
5331 }
5332 }
5333
5334 if (reply) {
5335 // we only need to reply if the client is old enough to think it
5336 // has to send renewals.
5337 ConnectionRef con = m->get_connection();
5338 if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
5339 m->get_connection()->send_message(new MMonSubscribeAck(
5340 monmap->get_fsid(), (int)g_conf()->mon_subscribe_interval));
5341 }
5342
5343 }
5344
5345 void Monitor::handle_get_version(MonOpRequestRef op)
5346 {
5347 auto m = op->get_req<MMonGetVersion>();
5348 dout(10) << "handle_get_version " << *m << dendl;
5349 PaxosService *svc = NULL;
5350
5351 MonSession *s = op->get_session();
5352 ceph_assert(s);
5353
5354 if (!is_leader() && !is_peon()) {
5355 dout(10) << " waiting for quorum" << dendl;
5356 waitfor_quorum.push_back(new C_RetryMessage(this, op));
5357 goto out;
5358 }
5359
5360 if (m->what == "mdsmap") {
5361 svc = mdsmon();
5362 } else if (m->what == "fsmap") {
5363 svc = mdsmon();
5364 } else if (m->what == "osdmap") {
5365 svc = osdmon();
5366 } else if (m->what == "monmap") {
5367 svc = monmon();
5368 } else {
5369 derr << "invalid map type " << m->what << dendl;
5370 }
5371
5372 if (svc) {
5373 if (!svc->is_readable()) {
5374 svc->wait_for_readable(op, new C_RetryMessage(this, op));
5375 goto out;
5376 }
5377
5378 MMonGetVersionReply *reply = new MMonGetVersionReply();
5379 reply->handle = m->handle;
5380 reply->version = svc->get_last_committed();
5381 reply->oldest_version = svc->get_first_committed();
5382 reply->set_tid(m->get_tid());
5383
5384 m->get_connection()->send_message(reply);
5385 }
5386 out:
5387 return;
5388 }
5389
/**
 * Messenger callback for a reset connection.
 *
 * Connections to other monitors are ignored. For any other peer, the
 * MonSession stored in the connection's priv is detached from the
 * connection. Unless we are shutting down, the session is then removed from
 * the session map if it is not already closed and is still linked on a list.
 *
 * @return false for monitor peers, when no session is attached, or when
 *         shutting down; true otherwise
 */
bool Monitor::ms_handle_reset(Connection *con)
{
  dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;

  // ignore lossless monitor sessions
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
    return false;

  // NOTE(review): the local `priv` presumably keeps `s` referenced after the
  // connection drops its own pointer below -- confirm against get_priv()
  auto priv = con->get_priv();
  auto s = static_cast<MonSession*>(priv.get());
  if (!s)
    return false;

  // break any con <-> session ref cycle
  s->con->set_priv(nullptr);

  if (is_shutdown())
    return false;

  std::lock_guard l(lock);

  dout(10) << "reset/close on session " << s->name << " " << s->addrs << dendl;
  if (!s->closed && s->item.is_on_list()) {
    // session_map_lock is taken while `lock` is held (this `l` shadows the
    // outer guard)
    std::lock_guard l(session_map_lock);
    remove_session(s);
  }
  return true;
}
5418
5419 bool Monitor::ms_handle_refused(Connection *con)
5420 {
5421 // just log for now...
5422 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
5423 return false;
5424 }
5425
5426 // -----
5427
5428 void Monitor::send_latest_monmap(Connection *con)
5429 {
5430 bufferlist bl;
5431 monmap->encode(bl, con->get_features());
5432 con->send_message(new MMonMap(bl));
5433 }
5434
5435 void Monitor::handle_mon_get_map(MonOpRequestRef op)
5436 {
5437 auto m = op->get_req<MMonGetMap>();
5438 dout(10) << "handle_mon_get_map" << dendl;
5439 send_latest_monmap(m->get_connection().get());
5440 }
5441
5442 int Monitor::load_metadata()
5443 {
5444 bufferlist bl;
5445 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
5446 if (r)
5447 return r;
5448 auto it = bl.cbegin();
5449 decode(mon_metadata, it);
5450
5451 pending_metadata = mon_metadata;
5452 return 0;
5453 }
5454
5455 int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
5456 {
5457 ceph_assert(f);
5458 if (!mon_metadata.count(mon)) {
5459 err << "mon." << mon << " not found";
5460 return -EINVAL;
5461 }
5462 const Metadata& m = mon_metadata[mon];
5463 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
5464 f->dump_string(p->first.c_str(), p->second);
5465 }
5466 return 0;
5467 }
5468
5469 void Monitor::count_metadata(const string& field, map<string,int> *out)
5470 {
5471 for (auto& p : mon_metadata) {
5472 auto q = p.second.find(field);
5473 if (q == p.second.end()) {
5474 (*out)["unknown"]++;
5475 } else {
5476 (*out)[q->second]++;
5477 }
5478 }
5479 }
5480
5481 void Monitor::count_metadata(const string& field, Formatter *f)
5482 {
5483 map<string,int> by_val;
5484 count_metadata(field, &by_val);
5485 f->open_object_section(field.c_str());
5486 for (auto& p : by_val) {
5487 f->dump_int(p.first.c_str(), p.second);
5488 }
5489 f->close_section();
5490 }
5491
/**
 * Collect daemon versions into `versions` (version string -> daemon names).
 *
 * Monitors are gathered first, then OSDs, mgrs and MDSs, each through its own
 * get_versions(). Entries are appended to whatever `versions` already holds.
 */
void Monitor::get_all_versions(std::map<string, list<string> > &versions)
{
  // mon
  get_versions(versions);
  // osd
  osdmon()->get_versions(versions);
  // mgr
  mgrmon()->get_versions(versions);
  // mds
  mdsmon()->get_versions(versions);
  dout(20) << __func__ << " all versions=" << versions << dendl;
}
5504
5505 void Monitor::get_versions(std::map<string, list<string> > &versions)
5506 {
5507 for (auto& [rank, metadata] : mon_metadata) {
5508 auto q = metadata.find("ceph_version_short");
5509 if (q == metadata.end()) {
5510 // not likely
5511 continue;
5512 }
5513 versions[q->second].push_back(string("mon.") + monmap->get_name(rank));
5514 }
5515 }
5516
5517 int Monitor::print_nodes(Formatter *f, ostream& err)
5518 {
5519 map<string, list<string> > mons; // hostname => mon
5520 for (map<int, Metadata>::iterator it = mon_metadata.begin();
5521 it != mon_metadata.end(); ++it) {
5522 const Metadata& m = it->second;
5523 Metadata::const_iterator hostname = m.find("hostname");
5524 if (hostname == m.end()) {
5525 // not likely though
5526 continue;
5527 }
5528 mons[hostname->second].push_back(monmap->get_name(it->first));
5529 }
5530
5531 dump_services(f, mons, "mon");
5532 return 0;
5533 }
5534
5535 // ----------------------------------------------
5536 // scrub
5537
/**
 * Leader-only: begin a new scrub of the monitor store.
 *
 * If scrub_result still holds results from an ongoing scrub, this logs
 * "scrub already in progress" to the cluster log and returns -EBUSY.
 * Otherwise it cancels the periodic scrub event, creates fresh scrub state
 * and runs the first chunk via scrub().
 *
 * @return 0 once started, -EBUSY if a scrub is already in progress
 */
int Monitor::scrub_start()
{
  dout(10) << __func__ << dendl;
  ceph_assert(is_leader());

  if (!scrub_result.empty()) {
    clog->info() << "scrub already in progress";
    return -EBUSY;
  }

  scrub_event_cancel();
  scrub_result.clear();
  scrub_state.reset(new ScrubState);

  scrub();
  return 0;
}
5555
5556 int Monitor::scrub()
5557 {
5558 ceph_assert(is_leader());
5559 ceph_assert(scrub_state);
5560
5561 scrub_cancel_timeout();
5562 wait_for_paxos_write();
5563 scrub_version = paxos->get_version();
5564
5565
5566 // scrub all keys if we're the only monitor in the quorum
5567 int32_t num_keys =
5568 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5569
5570 for (set<int>::iterator p = quorum.begin();
5571 p != quorum.end();
5572 ++p) {
5573 if (*p == rank)
5574 continue;
5575 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5576 num_keys);
5577 r->key = scrub_state->last_key;
5578 send_mon_message(r, *p);
5579 }
5580
5581 // scrub my keys
5582 bool r = _scrub(&scrub_result[rank],
5583 &scrub_state->last_key,
5584 &num_keys);
5585
5586 scrub_state->finished = !r;
5587
5588 // only after we got our scrub results do we really care whether the
5589 // other monitors are late on their results. Also, this way we avoid
5590 // triggering the timeout if we end up getting stuck in _scrub() for
5591 // longer than the duration of the timeout.
5592 scrub_reset_timeout();
5593
5594 if (quorum.size() == 1) {
5595 ceph_assert(scrub_state->finished == true);
5596 scrub_finish();
5597 }
5598 return 0;
5599 }
5600
/**
 * Dispatch an MMonScrub message.
 *
 * OP_SCRUB (peons only): after pending paxos writes are flushed, and only if
 * our paxos version matches the requested one, scrub up to m->num_keys keys
 * starting from m->key and send an OP_RESULT back on the same connection.
 *
 * OP_RESULT (leader only, for the current scrub_version): record the peer's
 * result and re-arm the scrub timeout. Once every quorum member (including
 * ourselves) has reported, compare the results, then either finish the scrub
 * or start the next chunk.
 */
void Monitor::handle_scrub(MonOpRequestRef op)
{
  auto m = op->get_req<MMonScrub>();
  dout(10) << __func__ << " " << *m << dendl;
  switch (m->op) {
  case MMonScrub::OP_SCRUB:
    {
      if (!is_peon())
        break;

      wait_for_paxos_write();

      // a version mismatch means the leader's scrub no longer applies to us
      if (m->version != paxos->get_version())
        break;

      MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
                                       m->version,
                                       m->num_keys);

      reply->key = m->key;
      _scrub(&reply->result, &reply->key, &reply->num_keys);
      m->get_connection()->send_message(reply);
    }
    break;

  case MMonScrub::OP_RESULT:
    {
      if (!is_leader())
        break;
      // ignore results belonging to a different (e.g. restarted) scrub
      if (m->version != scrub_version)
        break;
      // reset the timeout each time we get a result
      scrub_reset_timeout();

      int from = m->get_source().num();
      ceph_assert(scrub_result.count(from) == 0);
      scrub_result[from] = m->result;

      if (scrub_result.size() == quorum.size()) {
        scrub_check_results();
        scrub_result.clear();
        if (scrub_state->finished)
          scrub_finish();
        else
          scrub();
      }
    }
    break;
  }
}
5651
/**
 * Scrub (checksum) one chunk of the local monitor store.
 *
 * Walks the keys of every sync-target prefix except "paxos", starting at
 * *start. For each key it increments the per-prefix key count and chains
 * the key's value into the per-prefix crc32c, both stored in *r.
 *
 * @param r        result accumulator; must be non-null
 * @param start    in: position to resume from; out: the last key scrubbed
 *                 (a default-constructed pair if nothing was scrubbed)
 * @param num_keys in: maximum keys to scrub (<= 0 means no limit);
 *                 out: number of keys actually scrubbed
 * @return true if the store iterator still has more to scrub
 */
bool Monitor::_scrub(ScrubResult *r,
                     pair<string,string> *start,
                     int *num_keys)
{
  ceph_assert(r != NULL);
  ceph_assert(start != NULL);
  ceph_assert(num_keys != NULL);

  set<string> prefixes = get_sync_targets_names();
  prefixes.erase("paxos");  // exclude paxos, as this one may have extra states for proposals, etc.

  dout(10) << __func__ << " start (" << *start << ")"
           << " num_keys " << *num_keys << dendl;

  MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);

  int scrubbed_keys = 0;
  pair<string,string> last_key;

  while (it->has_next_chunk()) {

    // stop once the requested chunk size has been reached
    if (*num_keys > 0 && scrubbed_keys == *num_keys)
      break;

    pair<string,string> k = it->get_next_key();
    if (prefixes.count(k.first) == 0)
      continue;

    // test hook: randomly skip keys with probability
    // mon_scrub_inject_missing_keys to simulate a missing key
    if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
        (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
      dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
               << dendl;
      continue;
    }

    bufferlist bl;
    int err = store->get(k.first, k.second, bl);
    ceph_assert(err == 0);

    uint32_t key_crc = bl.crc32c(0);
    dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
             << " crc " << key_crc << dendl;
    r->prefix_keys[k.first]++;
    if (r->prefix_crc.count(k.first) == 0) {
      r->prefix_crc[k.first] = 0;
    }
    // chain this value into the running per-prefix crc
    r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);

    // test hook: randomly corrupt the crc with probability
    // mon_scrub_inject_crc_mismatch to simulate a mismatch
    if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
        (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
      dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
      r->prefix_crc[k.first] += 1;
    }

    ++scrubbed_keys;
    last_key = k;
  }

  dout(20) << __func__ << " last_key (" << last_key << ")"
           << " scrubbed_keys " << scrubbed_keys
           << " has_next " << it->has_next_chunk() << dendl;

  *start = last_key;
  *num_keys = scrubbed_keys;

  return it->has_next_chunk();
}
5719
5720 void Monitor::scrub_check_results()
5721 {
5722 dout(10) << __func__ << dendl;
5723
5724 // compare
5725 int errors = 0;
5726 ScrubResult& mine = scrub_result[rank];
5727 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5728 p != scrub_result.end();
5729 ++p) {
5730 if (p->first == rank)
5731 continue;
5732 if (p->second != mine) {
5733 ++errors;
5734 clog->error() << "scrub mismatch";
5735 clog->error() << " mon." << rank << " " << mine;
5736 clog->error() << " mon." << p->first << " " << p->second;
5737 }
5738 }
5739 if (!errors)
5740 clog->debug() << "scrub ok on " << quorum << ": " << mine;
5741 }
5742
/**
 * Timer callback armed by scrub_reset_timeout(). It runs when no scrub
 * result arrived in time: the current scrub is discarded and a new one is
 * started from scratch.
 */
inline void Monitor::scrub_timeout()
{
  dout(1) << __func__ << " restarting scrub" << dendl;
  scrub_reset();
  scrub_start();
}
5749
/**
 * Conclude a completed scrub: clear all scrub state and timers, then
 * schedule the next periodic scrub.
 */
void Monitor::scrub_finish()
{
  dout(10) << __func__ << dendl;
  scrub_reset();
  scrub_event_start();
}
5756
5757 void Monitor::scrub_reset()
5758 {
5759 dout(10) << __func__ << dendl;
5760 scrub_cancel_timeout();
5761 scrub_version = 0;
5762 scrub_result.clear();
5763 scrub_state.reset();
5764 }
5765
5766 inline void Monitor::scrub_update_interval(ceph::timespan interval)
5767 {
5768 // we don't care about changes if we are not the leader.
5769 // changes will be visible if we become the leader.
5770 if (!is_leader())
5771 return;
5772
5773 dout(1) << __func__ << " new interval = " << interval << dendl;
5774
5775 // if scrub already in progress, all changes will already be visible during
5776 // the next round. Nothing to do.
5777 if (scrub_state != NULL)
5778 return;
5779
5780 scrub_event_cancel();
5781 scrub_event_start();
5782 }
5783
5784 void Monitor::scrub_event_start()
5785 {
5786 dout(10) << __func__ << dendl;
5787
5788 if (scrub_event)
5789 scrub_event_cancel();
5790
5791 auto scrub_interval =
5792 cct->_conf.get_val<std::chrono::seconds>("mon_scrub_interval");
5793 if (scrub_interval == std::chrono::seconds::zero()) {
5794 dout(1) << __func__ << " scrub event is disabled"
5795 << " (mon_scrub_interval = " << scrub_interval
5796 << ")" << dendl;
5797 return;
5798 }
5799
5800 scrub_event = timer.add_event_after(
5801 scrub_interval,
5802 new C_MonContext{this, [this](int) {
5803 scrub_start();
5804 }});
5805 }
5806
5807 void Monitor::scrub_event_cancel()
5808 {
5809 dout(10) << __func__ << dendl;
5810 if (scrub_event) {
5811 timer.cancel_event(scrub_event);
5812 scrub_event = NULL;
5813 }
5814 }
5815
5816 inline void Monitor::scrub_cancel_timeout()
5817 {
5818 if (scrub_timeout_event) {
5819 timer.cancel_event(scrub_timeout_event);
5820 scrub_timeout_event = NULL;
5821 }
5822 }
5823
5824 void Monitor::scrub_reset_timeout()
5825 {
5826 dout(15) << __func__ << " reset timeout event" << dendl;
5827 scrub_cancel_timeout();
5828 scrub_timeout_event = timer.add_event_after(
5829 g_conf()->mon_scrub_timeout,
5830 new C_MonContext{this, [this](int) {
5831 scrub_timeout();
5832 }});
5833 }
5834
5835 /************ TICK ***************/
5836 void Monitor::new_tick()
5837 {
5838 timer.add_event_after(g_conf()->mon_tick_interval, new C_MonContext{this, [this](int) {
5839 tick();
5840 }});
5841 }
5842
5843 void Monitor::tick()
5844 {
5845 // ok go.
5846 dout(11) << "tick" << dendl;
5847 const utime_t now = ceph_clock_now();
5848
5849 // Check if we need to emit any delayed health check updated messages
5850 if (is_leader()) {
5851 const auto min_period = g_conf().get_val<int64_t>(
5852 "mon_health_log_update_period");
5853 for (auto& svc : paxos_service) {
5854 auto health = svc->get_health_checks();
5855
5856 for (const auto &i : health.checks) {
5857 const std::string &code = i.first;
5858 const std::string &summary = i.second.summary;
5859 const health_status_t severity = i.second.severity;
5860
5861 auto status_iter = health_check_log_times.find(code);
5862 if (status_iter == health_check_log_times.end()) {
5863 continue;
5864 }
5865
5866 auto &log_status = status_iter->second;
5867 bool const changed = log_status.last_message != summary
5868 || log_status.severity != severity;
5869
5870 if (changed && now - log_status.updated_at > min_period) {
5871 log_status.last_message = summary;
5872 log_status.updated_at = now;
5873 log_status.severity = severity;
5874
5875 ostringstream ss;
5876 ss << "Health check update: " << summary << " (" << code << ")";
5877 clog->health(severity) << ss.str();
5878 }
5879 }
5880 }
5881 }
5882
5883
5884 for (auto& svc : paxos_service) {
5885 svc->tick();
5886 svc->maybe_trim();
5887 }
5888
5889 // trim sessions
5890 {
5891 std::lock_guard l(session_map_lock);
5892 auto p = session_map.sessions.begin();
5893
5894 bool out_for_too_long = (!exited_quorum.is_zero() &&
5895 now > (exited_quorum + 2*g_conf()->mon_lease));
5896
5897 while (!p.end()) {
5898 MonSession *s = *p;
5899 ++p;
5900
5901 // don't trim monitors
5902 if (s->name.is_mon())
5903 continue;
5904
5905 if (s->session_timeout < now && s->con) {
5906 // check keepalive, too
5907 s->session_timeout = s->con->get_last_keepalive();
5908 s->session_timeout += g_conf()->mon_session_timeout;
5909 }
5910 if (s->session_timeout < now) {
5911 dout(10) << " trimming session " << s->con << " " << s->name
5912 << " " << s->addrs
5913 << " (timeout " << s->session_timeout
5914 << " < now " << now << ")" << dendl;
5915 } else if (out_for_too_long) {
5916 // boot the client Session because we've taken too long getting back in
5917 dout(10) << " trimming session " << s->con << " " << s->name
5918 << " because we've been out of quorum too long" << dendl;
5919 } else {
5920 continue;
5921 }
5922
5923 s->con->mark_down();
5924 remove_session(s);
5925 logger->inc(l_mon_session_trim);
5926 }
5927 }
5928 sync_trim_providers();
5929
5930 if (!maybe_wait_for_quorum.empty()) {
5931 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5932 }
5933
5934 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5935 // this is only necessary on upgraded clusters.
5936 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5937 prepare_new_fingerprint(t);
5938 paxos->trigger_propose();
5939 }
5940
5941 mgr_client.update_daemon_health(get_health_metrics());
5942 new_tick();
5943 }
5944
5945 vector<DaemonHealthMetric> Monitor::get_health_metrics()
5946 {
5947 vector<DaemonHealthMetric> metrics;
5948
5949 utime_t oldest_secs;
5950 const utime_t now = ceph_clock_now();
5951 auto too_old = now;
5952 too_old -= g_conf().get_val<std::chrono::seconds>("mon_op_complaint_time").count();
5953 int slow = 0;
5954 TrackedOpRef oldest_op;
5955 auto count_slow_ops = [&](TrackedOp& op) {
5956 if (op.get_initiated() < too_old) {
5957 slow++;
5958 if (!oldest_op || op.get_initiated() < oldest_op->get_initiated()) {
5959 oldest_op = &op;
5960 }
5961 return true;
5962 } else {
5963 return false;
5964 }
5965 };
5966 if (op_tracker.visit_ops_in_flight(&oldest_secs, count_slow_ops)) {
5967 if (slow) {
5968 derr << __func__ << " reporting " << slow << " slow ops, oldest is "
5969 << oldest_op->get_desc() << dendl;
5970 }
5971 metrics.emplace_back(daemon_metric::SLOW_OPS, slow, oldest_secs);
5972 } else {
5973 metrics.emplace_back(daemon_metric::SLOW_OPS, 0, 0);
5974 }
5975 return metrics;
5976 }
5977
5978 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5979 {
5980 uuid_d nf;
5981 nf.generate_random();
5982 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5983
5984 bufferlist bl;
5985 encode(nf, bl);
5986 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5987 }
5988
5989 int Monitor::check_fsid()
5990 {
5991 bufferlist ebl;
5992 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5993 if (r == -ENOENT)
5994 return r;
5995 ceph_assert(r == 0);
5996
5997 string es(ebl.c_str(), ebl.length());
5998
5999 // only keep the first line
6000 size_t pos = es.find_first_of('\n');
6001 if (pos != string::npos)
6002 es.resize(pos);
6003
6004 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
6005 uuid_d ondisk;
6006 if (!ondisk.parse(es.c_str())) {
6007 derr << "error: unable to parse uuid" << dendl;
6008 return -EINVAL;
6009 }
6010
6011 if (monmap->get_fsid() != ondisk) {
6012 derr << "error: cluster_uuid file exists with value " << ondisk
6013 << ", != our uuid " << monmap->get_fsid() << dendl;
6014 return -EEXIST;
6015 }
6016
6017 return 0;
6018 }
6019
6020 int Monitor::write_fsid()
6021 {
6022 auto t(std::make_shared<MonitorDBStore::Transaction>());
6023 write_fsid(t);
6024 int r = store->apply_transaction(t);
6025 return r;
6026 }
6027
6028 int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
6029 {
6030 ostringstream ss;
6031 ss << monmap->get_fsid() << "\n";
6032 string us = ss.str();
6033
6034 bufferlist b;
6035 b.append(us);
6036
6037 t->put(MONITOR_NAME, "cluster_uuid", b);
6038 return 0;
6039 }
6040
6041 /*
6042 * this is the closest thing to a traditional 'mkfs' for ceph.
6043 * initialize the monitor state machines to their initial values.
6044 */
6045 int Monitor::mkfs(bufferlist& osdmapbl)
6046 {
6047 auto t(std::make_shared<MonitorDBStore::Transaction>());
6048
6049 // verify cluster fsid
6050 int r = check_fsid();
6051 if (r < 0 && r != -ENOENT)
6052 return r;
6053
6054 bufferlist magicbl;
6055 magicbl.append(CEPH_MON_ONDISK_MAGIC);
6056 magicbl.append("\n");
6057 t->put(MONITOR_NAME, "magic", magicbl);
6058
6059
6060 features = get_initial_supported_features();
6061 write_features(t);
6062
6063 // save monmap, osdmap, keyring.
6064 bufferlist monmapbl;
6065 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
6066 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
6067 t->put("mkfs", "monmap", monmapbl);
6068
6069 if (osdmapbl.length()) {
6070 // make sure it's a valid osdmap
6071 try {
6072 OSDMap om;
6073 om.decode(osdmapbl);
6074 }
6075 catch (ceph::buffer::error& e) {
6076 derr << "error decoding provided osdmap: " << e.what() << dendl;
6077 return -EINVAL;
6078 }
6079 t->put("mkfs", "osdmap", osdmapbl);
6080 }
6081
6082 if (is_keyring_required()) {
6083 KeyRing keyring;
6084 string keyring_filename;
6085
6086 r = ceph_resolve_file_search(g_conf()->keyring, keyring_filename);
6087 if (r) {
6088 if (g_conf()->key != "") {
6089 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf()->key +
6090 "\n\tcaps mon = \"allow *\"\n";
6091 bufferlist bl;
6092 bl.append(keyring_plaintext);
6093 try {
6094 auto i = bl.cbegin();
6095 keyring.decode(i);
6096 }
6097 catch (const ceph::buffer::error& e) {
6098 derr << "error decoding keyring " << keyring_plaintext
6099 << ": " << e.what() << dendl;
6100 return -EINVAL;
6101 }
6102 } else {
6103 derr << "unable to find a keyring on " << g_conf()->keyring
6104 << ": " << cpp_strerror(r) << dendl;
6105 return r;
6106 }
6107 } else {
6108 r = keyring.load(g_ceph_context, keyring_filename);
6109 if (r < 0) {
6110 derr << "unable to load initial keyring " << g_conf()->keyring << dendl;
6111 return r;
6112 }
6113 }
6114
6115 // put mon. key in external keyring; seed with everything else.
6116 extract_save_mon_key(keyring);
6117
6118 bufferlist keyringbl;
6119 keyring.encode_plaintext(keyringbl);
6120 t->put("mkfs", "keyring", keyringbl);
6121 }
6122 write_fsid(t);
6123 store->apply_transaction(t);
6124
6125 return 0;
6126 }
6127
6128 int Monitor::write_default_keyring(bufferlist& bl)
6129 {
6130 ostringstream os;
6131 os << g_conf()->mon_data << "/keyring";
6132
6133 int err = 0;
6134 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT|O_CLOEXEC, 0600);
6135 if (fd < 0) {
6136 err = -errno;
6137 dout(0) << __func__ << " failed to open " << os.str()
6138 << ": " << cpp_strerror(err) << dendl;
6139 return err;
6140 }
6141
6142 err = bl.write_fd(fd);
6143 if (!err)
6144 ::fsync(fd);
6145 VOID_TEMP_FAILURE_RETRY(::close(fd));
6146
6147 return err;
6148 }
6149
6150 void Monitor::extract_save_mon_key(KeyRing& keyring)
6151 {
6152 EntityName mon_name;
6153 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
6154 EntityAuth mon_key;
6155 if (keyring.get_auth(mon_name, mon_key)) {
6156 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
6157 KeyRing pkey;
6158 pkey.add(mon_name, mon_key);
6159 bufferlist bl;
6160 pkey.encode_plaintext(bl);
6161 write_default_keyring(bl);
6162 keyring.remove(mon_name);
6163 }
6164 }
6165
6166 // AuthClient methods -- for mon <-> mon communication
6167 int Monitor::get_auth_request(
6168 Connection *con,
6169 AuthConnectionMeta *auth_meta,
6170 uint32_t *method,
6171 vector<uint32_t> *preferred_modes,
6172 bufferlist *out)
6173 {
6174 std::scoped_lock l(auth_lock);
6175 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
6176 con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
6177 return -EACCES;
6178 }
6179 AuthAuthorizer *auth;
6180 if (!get_authorizer(con->get_peer_type(), &auth)) {
6181 return -EACCES;
6182 }
6183 auth_meta->authorizer.reset(auth);
6184 auth_registry.get_supported_modes(con->get_peer_type(),
6185 auth->protocol,
6186 preferred_modes);
6187 *method = auth->protocol;
6188 *out = auth->bl;
6189 return 0;
6190 }
6191
6192 int Monitor::handle_auth_reply_more(
6193 Connection *con,
6194 AuthConnectionMeta *auth_meta,
6195 const bufferlist& bl,
6196 bufferlist *reply)
6197 {
6198 std::scoped_lock l(auth_lock);
6199 if (!auth_meta->authorizer) {
6200 derr << __func__ << " no authorizer?" << dendl;
6201 return -EACCES;
6202 }
6203 auth_meta->authorizer->add_challenge(cct, bl);
6204 *reply = auth_meta->authorizer->bl;
6205 return 0;
6206 }
6207
6208 int Monitor::handle_auth_done(
6209 Connection *con,
6210 AuthConnectionMeta *auth_meta,
6211 uint64_t global_id,
6212 uint32_t con_mode,
6213 const bufferlist& bl,
6214 CryptoKey *session_key,
6215 std::string *connection_secret)
6216 {
6217 std::scoped_lock l(auth_lock);
6218 // verify authorizer reply
6219 auto p = bl.begin();
6220 if (!auth_meta->authorizer->verify_reply(p, connection_secret)) {
6221 dout(0) << __func__ << " failed verifying authorizer reply" << dendl;
6222 return -EACCES;
6223 }
6224 auth_meta->session_key = auth_meta->authorizer->session_key;
6225 return 0;
6226 }
6227
6228 int Monitor::handle_auth_bad_method(
6229 Connection *con,
6230 AuthConnectionMeta *auth_meta,
6231 uint32_t old_auth_method,
6232 int result,
6233 const std::vector<uint32_t>& allowed_methods,
6234 const std::vector<uint32_t>& allowed_modes)
6235 {
6236 derr << __func__ << " hmm, they didn't like " << old_auth_method
6237 << " result " << cpp_strerror(result) << dendl;
6238 return -EACCES;
6239 }
6240
6241 bool Monitor::get_authorizer(int service_id, AuthAuthorizer **authorizer)
6242 {
6243 dout(10) << "get_authorizer for " << ceph_entity_type_name(service_id)
6244 << dendl;
6245
6246 if (is_shutdown())
6247 return false;
6248
6249 // we only connect to other monitors and mgr; every else connects to us.
6250 if (service_id != CEPH_ENTITY_TYPE_MON &&
6251 service_id != CEPH_ENTITY_TYPE_MGR)
6252 return false;
6253
6254 if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
6255 // auth_none
6256 dout(20) << __func__ << " building auth_none authorizer" << dendl;
6257 AuthNoneClientHandler handler{g_ceph_context};
6258 handler.set_global_id(0);
6259 *authorizer = handler.build_authorizer(service_id);
6260 return true;
6261 }
6262
6263 CephXServiceTicketInfo auth_ticket_info;
6264 CephXSessionAuthInfo info;
6265 int ret;
6266
6267 EntityName name;
6268 name.set_type(CEPH_ENTITY_TYPE_MON);
6269 auth_ticket_info.ticket.name = name;
6270 auth_ticket_info.ticket.global_id = 0;
6271
6272 if (service_id == CEPH_ENTITY_TYPE_MON) {
6273 // mon to mon authentication uses the private monitor shared key and not the
6274 // rotating key
6275 CryptoKey secret;
6276 if (!keyring.get_secret(name, secret) &&
6277 !key_server.get_secret(name, secret)) {
6278 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
6279 << dendl;
6280 stringstream ss, ds;
6281 int err = key_server.list_secrets(ds);
6282 if (err < 0)
6283 ss << "no installed auth entries!";
6284 else
6285 ss << "installed auth entries:";
6286 dout(0) << ss.str() << "\n" << ds.str() << dendl;
6287 return false;
6288 }
6289
6290 ret = key_server.build_session_auth_info(
6291 service_id, auth_ticket_info.ticket, secret, (uint64_t)-1, info);
6292 if (ret < 0) {
6293 dout(0) << __func__ << " failed to build mon session_auth_info "
6294 << cpp_strerror(ret) << dendl;
6295 return false;
6296 }
6297 } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
6298 // mgr
6299 ret = key_server.build_session_auth_info(
6300 service_id, auth_ticket_info.ticket, info);
6301 if (ret < 0) {
6302 derr << __func__ << " failed to build mgr service session_auth_info "
6303 << cpp_strerror(ret) << dendl;
6304 return false;
6305 }
6306 } else {
6307 ceph_abort(); // see check at top of fn
6308 }
6309
6310 CephXTicketBlob blob;
6311 if (!cephx_build_service_ticket_blob(cct, info, blob)) {
6312 dout(0) << "get_authorizer failed to build service ticket" << dendl;
6313 return false;
6314 }
6315 bufferlist ticket_data;
6316 encode(blob, ticket_data);
6317
6318 auto iter = ticket_data.cbegin();
6319 CephXTicketHandler handler(g_ceph_context, service_id);
6320 decode(handler.ticket, iter);
6321
6322 handler.session_key = info.session_key;
6323
6324 *authorizer = handler.build_authorizer(0);
6325
6326 return true;
6327 }
6328
6329 int Monitor::handle_auth_request(
6330 Connection *con,
6331 AuthConnectionMeta *auth_meta,
6332 bool more,
6333 uint32_t auth_method,
6334 const bufferlist &payload,
6335 bufferlist *reply)
6336 {
6337 std::scoped_lock l(auth_lock);
6338
6339 // NOTE: be careful, the Connection hasn't fully negotiated yet, so
6340 // e.g., peer_features, peer_addrs, and others are still unknown.
6341
6342 dout(10) << __func__ << " con " << con << (more ? " (more)":" (start)")
6343 << " method " << auth_method
6344 << " payload " << payload.length()
6345 << dendl;
6346 if (!payload.length()) {
6347 if (!con->is_msgr2() &&
6348 con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
6349 // for v1 connections, we tolerate no authorizer (from
6350 // non-monitors), because authentication happens via MAuth
6351 // messages.
6352 return 1;
6353 }
6354 return -EACCES;
6355 }
6356 if (!more) {
6357 auth_meta->auth_mode = payload[0];
6358 }
6359
6360 if (auth_meta->auth_mode >= AUTH_MODE_AUTHORIZER &&
6361 auth_meta->auth_mode <= AUTH_MODE_AUTHORIZER_MAX) {
6362 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
6363 auth_method);
6364 if (!ah) {
6365 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
6366 << auth_method << dendl;
6367 return -EOPNOTSUPP;
6368 }
6369 bool was_challenge = (bool)auth_meta->authorizer_challenge;
6370 bool isvalid = ah->verify_authorizer(
6371 cct,
6372 keyring,
6373 payload,
6374 auth_meta->get_connection_secret_length(),
6375 reply,
6376 &con->peer_name,
6377 &con->peer_global_id,
6378 &con->peer_caps_info,
6379 &auth_meta->session_key,
6380 &auth_meta->connection_secret,
6381 &auth_meta->authorizer_challenge);
6382 if (isvalid) {
6383 ms_handle_authentication(con);
6384 return 1;
6385 }
6386 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
6387 return 0;
6388 }
6389 dout(10) << __func__ << " bad authorizer on " << con << dendl;
6390 return -EACCES;
6391 } else if (auth_meta->auth_mode < AUTH_MODE_MON ||
6392 auth_meta->auth_mode > AUTH_MODE_MON_MAX) {
6393 derr << __func__ << " unrecognized auth mode " << auth_meta->auth_mode
6394 << dendl;
6395 return -EACCES;
6396 }
6397
6398 // wait until we've formed an initial quorum on mkfs so that we have
6399 // the initial keys (e.g., client.admin).
6400 if (authmon()->get_last_committed() == 0) {
6401 dout(10) << __func__ << " haven't formed initial quorum, EBUSY" << dendl;
6402 return -EBUSY;
6403 }
6404
6405 RefCountedPtr priv;
6406 MonSession *s;
6407 int32_t r = 0;
6408 auto p = payload.begin();
6409 if (!more) {
6410 if (con->get_priv()) {
6411 return -EACCES; // wtf
6412 }
6413
6414 // handler?
6415 unique_ptr<AuthServiceHandler> auth_handler{get_auth_service_handler(
6416 auth_method, g_ceph_context, &key_server)};
6417 if (!auth_handler) {
6418 dout(1) << __func__ << " auth_method " << auth_method << " not supported"
6419 << dendl;
6420 return -EOPNOTSUPP;
6421 }
6422
6423 uint8_t mode;
6424 EntityName entity_name;
6425
6426 try {
6427 decode(mode, p);
6428 if (mode < AUTH_MODE_MON ||
6429 mode > AUTH_MODE_MON_MAX) {
6430 dout(1) << __func__ << " invalid mode " << (int)mode << dendl;
6431 return -EACCES;
6432 }
6433 assert(mode >= AUTH_MODE_MON && mode <= AUTH_MODE_MON_MAX);
6434 decode(entity_name, p);
6435 decode(con->peer_global_id, p);
6436 } catch (ceph::buffer::error& e) {
6437 dout(1) << __func__ << " failed to decode, " << e.what() << dendl;
6438 return -EACCES;
6439 }
6440
6441 // supported method?
6442 if (entity_name.get_type() == CEPH_ENTITY_TYPE_MON ||
6443 entity_name.get_type() == CEPH_ENTITY_TYPE_OSD ||
6444 entity_name.get_type() == CEPH_ENTITY_TYPE_MDS ||
6445 entity_name.get_type() == CEPH_ENTITY_TYPE_MGR) {
6446 if (!auth_cluster_required.is_supported_auth(auth_method)) {
6447 dout(10) << __func__ << " entity " << entity_name << " method "
6448 << auth_method << " not among supported "
6449 << auth_cluster_required.get_supported_set() << dendl;
6450 return -EOPNOTSUPP;
6451 }
6452 } else {
6453 if (!auth_service_required.is_supported_auth(auth_method)) {
6454 dout(10) << __func__ << " entity " << entity_name << " method "
6455 << auth_method << " not among supported "
6456 << auth_cluster_required.get_supported_set() << dendl;
6457 return -EOPNOTSUPP;
6458 }
6459 }
6460
6461 // for msgr1 we would do some weirdness here to ensure signatures
6462 // are supported by the client if we require it. for msgr2 that
6463 // is not necessary.
6464
6465 bool is_new_global_id = false;
6466 if (!con->peer_global_id) {
6467 con->peer_global_id = authmon()->_assign_global_id();
6468 if (!con->peer_global_id) {
6469 dout(1) << __func__ << " failed to assign global_id" << dendl;
6470 return -EBUSY;
6471 }
6472 is_new_global_id = true;
6473 }
6474
6475 // set up partial session
6476 s = new MonSession(con);
6477 s->auth_handler = auth_handler.release();
6478 con->set_priv(RefCountedPtr{s, false});
6479
6480 r = s->auth_handler->start_session(
6481 entity_name,
6482 con->peer_global_id,
6483 is_new_global_id,
6484 reply,
6485 &con->peer_caps_info);
6486 } else {
6487 priv = con->get_priv();
6488 if (!priv) {
6489 // this can happen if the async ms_handle_reset event races with
6490 // the unlocked call into handle_auth_request
6491 return -EACCES;
6492 }
6493 s = static_cast<MonSession*>(priv.get());
6494 r = s->auth_handler->handle_request(
6495 p,
6496 auth_meta->get_connection_secret_length(),
6497 reply,
6498 &con->peer_caps_info,
6499 &auth_meta->session_key,
6500 &auth_meta->connection_secret);
6501 }
6502 if (r > 0 &&
6503 !s->authenticated) {
6504 ms_handle_authentication(con);
6505 }
6506
6507 dout(30) << " r " << r << " reply:\n";
6508 reply->hexdump(*_dout);
6509 *_dout << dendl;
6510 return r;
6511 }
6512
6513 void Monitor::ms_handle_accept(Connection *con)
6514 {
6515 auto priv = con->get_priv();
6516 MonSession *s = static_cast<MonSession*>(priv.get());
6517 if (!s) {
6518 // legacy protocol v1?
6519 dout(10) << __func__ << " con " << con << " no session" << dendl;
6520 return;
6521 }
6522
6523 if (s->item.is_on_list()) {
6524 dout(10) << __func__ << " con " << con << " session " << s
6525 << " already on list" << dendl;
6526 } else {
6527 std::lock_guard l(session_map_lock);
6528 if (state == STATE_SHUTDOWN) {
6529 dout(10) << __func__ << " ignoring new con " << con << " (shutdown)" << dendl;
6530 con->mark_down();
6531 return;
6532 }
6533 dout(10) << __func__ << " con " << con << " session " << s
6534 << " registering session for "
6535 << con->get_peer_addrs() << dendl;
6536 s->_ident(entity_name_t(con->get_peer_type(), con->get_peer_id()),
6537 con->get_peer_addrs());
6538 session_map.add_session(s);
6539 }
6540 }
6541
6542 int Monitor::ms_handle_authentication(Connection *con)
6543 {
6544 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
6545 // mon <-> mon connections need no Session, and setting one up
6546 // creates an awkward ref cycle between Session and Connection.
6547 return 1;
6548 }
6549
6550 auto priv = con->get_priv();
6551 MonSession *s = static_cast<MonSession*>(priv.get());
6552 if (!s) {
6553 // must be msgr2, otherwise dispatch would have set up the session.
6554 s = session_map.new_session(
6555 entity_name_t(con->get_peer_type(), -1), // we don't know yet
6556 con->get_peer_addrs(),
6557 con);
6558 assert(s);
6559 dout(10) << __func__ << " adding session " << s << " to con " << con
6560 << dendl;
6561 con->set_priv(s);
6562 logger->set(l_mon_num_sessions, session_map.get_size());
6563 logger->inc(l_mon_session_add);
6564 }
6565 dout(10) << __func__ << " session " << s << " con " << con
6566 << " addr " << s->con->get_peer_addr()
6567 << " " << *s << dendl;
6568
6569 AuthCapsInfo &caps_info = con->get_peer_caps_info();
6570 int ret = 0;
6571 if (caps_info.allow_all) {
6572 s->caps.set_allow_all();
6573 s->authenticated = true;
6574 ret = 1;
6575 } else if (caps_info.caps.length()) {
6576 bufferlist::const_iterator p = caps_info.caps.cbegin();
6577 string str;
6578 try {
6579 decode(str, p);
6580 } catch (const ceph::buffer::error &err) {
6581 derr << __func__ << " corrupt cap data for " << con->get_peer_entity_name()
6582 << " in auth db" << dendl;
6583 str.clear();
6584 ret = -EACCES;
6585 }
6586 if (ret >= 0) {
6587 if (s->caps.parse(str, NULL)) {
6588 s->authenticated = true;
6589 ret = 1;
6590 } else {
6591 derr << __func__ << " unparseable caps '" << str << "' for "
6592 << con->get_peer_entity_name() << dendl;
6593 ret = -EACCES;
6594 }
6595 }
6596 }
6597
6598 return ret;
6599 }
6600
6601 void Monitor::set_mon_crush_location(const string& loc)
6602 {
6603 if (loc.empty()) {
6604 return;
6605 }
6606 vector<string> loc_vec;
6607 loc_vec.push_back(loc);
6608 CrushWrapper::parse_loc_map(loc_vec, &crush_loc);
6609 need_set_crush_loc = true;
6610 }
6611
6612 void Monitor::notify_new_monmap(bool can_change_external_state, bool remove_rank_elector)
6613 {
6614 if (need_set_crush_loc) {
6615 auto my_info_i = monmap->mon_info.find(name);
6616 if (my_info_i != monmap->mon_info.end() &&
6617 my_info_i->second.crush_loc == crush_loc) {
6618 need_set_crush_loc = false;
6619 }
6620 }
6621 elector.notify_strategy_maybe_changed(monmap->strategy);
6622 if (remove_rank_elector){
6623 dout(10) << __func__ << " we have " << monmap->ranks.size()<< " ranks" << dendl;
6624 dout(10) << __func__ << " we have " << monmap->removed_ranks.size() << " removed ranks" << dendl;
6625 for (auto i = monmap->removed_ranks.rbegin();
6626 i != monmap->removed_ranks.rend(); ++i) {
6627 int remove_rank = *i;
6628 dout(10) << __func__ << " removing rank " << remove_rank << dendl;
6629 if (rank == remove_rank) {
6630 dout(5) << "We are removing our own rank, probably we"
6631 << " are removed from monmap before we shutdown ... dropping." << dendl;
6632 continue;
6633 }
6634 int new_rank = monmap->get_rank(messenger->get_myaddrs());
6635 if (new_rank == -1) {
6636 dout(5) << "We no longer exists in the monmap! ... dropping." << dendl;
6637 continue;
6638 }
6639 elector.notify_rank_removed(remove_rank, new_rank);
6640 }
6641 }
6642
6643 if (monmap->stretch_mode_enabled) {
6644 try_engage_stretch_mode();
6645 }
6646
6647 if (is_stretch_mode()) {
6648 if (!monmap->stretch_marked_down_mons.empty()) {
6649 dout(20) << __func__ << " stretch_marked_down_mons: " << monmap->stretch_marked_down_mons << dendl;
6650 set_degraded_stretch_mode();
6651 }
6652 }
6653 set_elector_disallowed_leaders(can_change_external_state);
6654 }
6655
6656 void Monitor::set_elector_disallowed_leaders(bool allow_election)
6657 {
6658 set<int> dl;
6659 for (auto name : monmap->disallowed_leaders) {
6660 dl.insert(monmap->get_rank(name));
6661 }
6662 if (is_stretch_mode()) {
6663 for (auto name : monmap->stretch_marked_down_mons) {
6664 dl.insert(monmap->get_rank(name));
6665 }
6666 dl.insert(monmap->get_rank(monmap->tiebreaker_mon));
6667 }
6668
6669 bool disallowed_changed = elector.set_disallowed_leaders(dl);
6670 if (disallowed_changed && allow_election) {
6671 elector.call_election();
6672 }
6673 }
6674
6675 struct CMonEnableStretchMode : public Context {
6676 Monitor *m;
6677 CMonEnableStretchMode(Monitor *mon) : m(mon) {}
6678 void finish(int r) {
6679 m->try_engage_stretch_mode();
6680 }
6681 };
6682 void Monitor::try_engage_stretch_mode()
6683 {
6684 dout(20) << __func__ << dendl;
6685 if (stretch_mode_engaged) return;
6686 if (!osdmon()->is_readable()) {
6687 dout(20) << "osdmon is not readable" << dendl;
6688 osdmon()->wait_for_readable_ctx(new CMonEnableStretchMode(this));
6689 return;
6690 }
6691 if (osdmon()->osdmap.stretch_mode_enabled &&
6692 monmap->stretch_mode_enabled) {
6693 dout(10) << "Engaging stretch mode!" << dendl;
6694 stretch_mode_engaged = true;
6695 int32_t stretch_divider_id = osdmon()->osdmap.stretch_mode_bucket;
6696 stretch_bucket_divider = osdmon()->osdmap.
6697 crush->get_type_name(stretch_divider_id);
6698 disconnect_disallowed_stretch_sessions();
6699 }
6700 }
6701
6702 void Monitor::do_stretch_mode_election_work()
6703 {
6704 dout(20) << __func__ << dendl;
6705 if (!is_stretch_mode() ||
6706 !is_leader()) return;
6707 dout(20) << "checking for degraded stretch mode" << dendl;
6708 map<string, set<string>> old_dead_buckets;
6709 old_dead_buckets.swap(dead_mon_buckets);
6710 up_mon_buckets.clear();
6711 // identify if we've lost a CRUSH bucket, request OSDMonitor check for death
6712 map<string,set<string>> down_mon_buckets;
6713 for (unsigned i = 0; i < monmap->size(); ++i) {
6714 const auto &mi = monmap->mon_info[monmap->get_name(i)];
6715 auto ci = mi.crush_loc.find(stretch_bucket_divider);
6716 ceph_assert(ci != mi.crush_loc.end());
6717 if (quorum.count(i)) {
6718 up_mon_buckets.insert(ci->second);
6719 } else {
6720 down_mon_buckets[ci->second].insert(mi.name);
6721 }
6722 }
6723 dout(20) << "prior dead_mon_buckets: " << old_dead_buckets
6724 << "; down_mon_buckets: " << down_mon_buckets
6725 << "; up_mon_buckets: " << up_mon_buckets << dendl;
6726 for (const auto& di : down_mon_buckets) {
6727 if (!up_mon_buckets.count(di.first)) {
6728 dead_mon_buckets[di.first] = di.second;
6729 }
6730 }
6731 dout(20) << "new dead_mon_buckets " << dead_mon_buckets << dendl;
6732
6733 if (dead_mon_buckets != old_dead_buckets &&
6734 dead_mon_buckets.size() >= old_dead_buckets.size()) {
6735 maybe_go_degraded_stretch_mode();
6736 }
6737 }
6738
6739 struct CMonGoDegraded : public Context {
6740 Monitor *m;
6741 CMonGoDegraded(Monitor *mon) : m(mon) {}
6742 void finish(int r) {
6743 m->maybe_go_degraded_stretch_mode();
6744 }
6745 };
6746
6747 struct CMonGoRecovery : public Context {
6748 Monitor *m;
6749 CMonGoRecovery(Monitor *mon) : m(mon) {}
6750 void finish(int r) {
6751 m->go_recovery_stretch_mode();
6752 }
6753 };
6754 void Monitor::go_recovery_stretch_mode()
6755 {
6756 dout(20) << __func__ << dendl;
6757 dout(20) << "is_leader(): " << is_leader() << dendl;
6758 if (!is_leader()) return;
6759 dout(20) << "is_degraded_stretch_mode(): " << is_degraded_stretch_mode() << dendl;
6760 if (!is_degraded_stretch_mode()) return;
6761 dout(20) << "is_recovering_stretch_mode(): " << is_recovering_stretch_mode() << dendl;
6762 if (is_recovering_stretch_mode()) return;
6763 dout(20) << "dead_mon_buckets.size(): " << dead_mon_buckets.size() << dendl;
6764 dout(20) << "dead_mon_buckets: " << dead_mon_buckets << dendl;
6765 if (dead_mon_buckets.size()) {
6766 ceph_assert( 0 == "how did we try and do stretch recovery while we have dead monitor buckets?");
6767 // we can't recover if we are missing monitors in a zone!
6768 return;
6769 }
6770
6771 if (!osdmon()->is_readable()) {
6772 dout(20) << "osdmon is not readable" << dendl;
6773 osdmon()->wait_for_readable_ctx(new CMonGoRecovery(this));
6774 return;
6775 }
6776
6777 if (!osdmon()->is_writeable()) {
6778 dout(20) << "osdmon is not writeable" << dendl;
6779 osdmon()->wait_for_writeable_ctx(new CMonGoRecovery(this));
6780 return;
6781 }
6782 osdmon()->trigger_recovery_stretch_mode();
6783 }
6784
6785 void Monitor::set_recovery_stretch_mode()
6786 {
6787 degraded_stretch_mode = true;
6788 recovering_stretch_mode = true;
6789 osdmon()->set_recovery_stretch_mode();
6790 }
6791
6792 void Monitor::maybe_go_degraded_stretch_mode()
6793 {
6794 dout(20) << __func__ << dendl;
6795 if (is_degraded_stretch_mode()) return;
6796 if (!is_leader()) return;
6797 if (dead_mon_buckets.empty()) return;
6798 if (!osdmon()->is_readable()) {
6799 osdmon()->wait_for_readable_ctx(new CMonGoDegraded(this));
6800 return;
6801 }
6802 ceph_assert(monmap->contains(monmap->tiebreaker_mon));
6803 // filter out the tiebreaker zone and check if remaining sites are down by OSDs too
6804 const auto &mi = monmap->mon_info[monmap->tiebreaker_mon];
6805 auto ci = mi.crush_loc.find(stretch_bucket_divider);
6806 map<string, set<string>> filtered_dead_buckets = dead_mon_buckets;
6807 filtered_dead_buckets.erase(ci->second);
6808
6809 set<int> matched_down_buckets;
6810 set<string> matched_down_mons;
6811 bool dead = osdmon()->check_for_dead_crush_zones(filtered_dead_buckets,
6812 &matched_down_buckets,
6813 &matched_down_mons);
6814 if (dead) {
6815 if (!osdmon()->is_writeable()) {
6816 dout(20) << "osdmon is not writeable" << dendl;
6817 osdmon()->wait_for_writeable_ctx(new CMonGoDegraded(this));
6818 return;
6819 }
6820 if (!monmon()->is_writeable()) {
6821 dout(20) << "monmon is not writeable" << dendl;
6822 monmon()->wait_for_writeable_ctx(new CMonGoDegraded(this));
6823 return;
6824 }
6825 trigger_degraded_stretch_mode(matched_down_mons, matched_down_buckets);
6826 }
6827 }
6828
6829 void Monitor::trigger_degraded_stretch_mode(const set<string>& dead_mons,
6830 const set<int>& dead_buckets)
6831 {
6832 dout(20) << __func__ << dendl;
6833 ceph_assert(osdmon()->is_writeable());
6834 ceph_assert(monmon()->is_writeable());
6835
6836 // figure out which OSD zone(s) remains alive by removing
6837 // tiebreaker mon from up_mon_buckets
6838 set<string> live_zones = up_mon_buckets;
6839 ceph_assert(monmap->contains(monmap->tiebreaker_mon));
6840 const auto &mi = monmap->mon_info[monmap->tiebreaker_mon];
6841 auto ci = mi.crush_loc.find(stretch_bucket_divider);
6842 live_zones.erase(ci->second);
6843 ceph_assert(live_zones.size() == 1); // only support 2 zones right now
6844
6845 osdmon()->trigger_degraded_stretch_mode(dead_buckets, live_zones);
6846 monmon()->trigger_degraded_stretch_mode(dead_mons);
6847 set_degraded_stretch_mode();
6848 }
6849
6850 void Monitor::set_degraded_stretch_mode()
6851 {
6852 dout(20) << __func__ << dendl;
6853 degraded_stretch_mode = true;
6854 recovering_stretch_mode = false;
6855 osdmon()->set_degraded_stretch_mode();
6856 }
6857
6858 struct CMonGoHealthy : public Context {
6859 Monitor *m;
6860 CMonGoHealthy(Monitor *mon) : m(mon) {}
6861 void finish(int r) {
6862 m->trigger_healthy_stretch_mode();
6863 }
6864 };
6865
6866
6867 void Monitor::trigger_healthy_stretch_mode()
6868 {
6869 dout(20) << __func__ << dendl;
6870 if (!is_degraded_stretch_mode()) return;
6871 if (!is_leader()) return;
6872 if (!osdmon()->is_writeable()) {
6873 dout(20) << "osdmon is not writeable" << dendl;
6874 osdmon()->wait_for_writeable_ctx(new CMonGoHealthy(this));
6875 return;
6876 }
6877 if (!monmon()->is_writeable()) {
6878 dout(20) << "monmon is not writeable" << dendl;
6879 monmon()->wait_for_writeable_ctx(new CMonGoHealthy(this));
6880 return;
6881 }
6882
6883 ceph_assert(osdmon()->osdmap.recovering_stretch_mode);
6884 osdmon()->trigger_healthy_stretch_mode();
6885 monmon()->trigger_healthy_stretch_mode();
6886 }
6887
6888 void Monitor::set_healthy_stretch_mode()
6889 {
6890 degraded_stretch_mode = false;
6891 recovering_stretch_mode = false;
6892 osdmon()->set_healthy_stretch_mode();
6893 }
6894
6895 bool Monitor::session_stretch_allowed(MonSession *s, MonOpRequestRef& op)
6896 {
6897 if (!is_stretch_mode()) return true;
6898 if (s->proxy_con) return true;
6899 if (s->validated_stretch_connection) return true;
6900 if (!s->con) return true;
6901 if (s->con->peer_is_osd()) {
6902 dout(20) << __func__ << "checking OSD session" << s << dendl;
6903 // okay, check the crush location
6904 int barrier_id = [&] {
6905 auto type_id = osdmon()->osdmap.crush->get_validated_type_id(
6906 stretch_bucket_divider);
6907 ceph_assert(type_id.has_value());
6908 return *type_id;
6909 }();
6910 int osd_bucket_id = osdmon()->osdmap.crush->get_parent_of_type(s->con->peer_id,
6911 barrier_id);
6912 const auto &mi = monmap->mon_info.find(name);
6913 ceph_assert(mi != monmap->mon_info.end());
6914 auto ci = mi->second.crush_loc.find(stretch_bucket_divider);
6915 ceph_assert(ci != mi->second.crush_loc.end());
6916 int mon_bucket_id = osdmon()->osdmap.crush->get_item_id(ci->second);
6917
6918 if (osd_bucket_id != mon_bucket_id) {
6919 dout(5) << "discarding session " << *s
6920 << " and sending OSD to matched zone" << dendl;
6921 s->con->mark_down();
6922 std::lock_guard l(session_map_lock);
6923 remove_session(s);
6924 if (op) {
6925 op->mark_zap();
6926 }
6927 return false;
6928 }
6929 }
6930
6931 s->validated_stretch_connection = true;
6932 return true;
6933 }
6934
6935 void Monitor::disconnect_disallowed_stretch_sessions()
6936 {
6937 dout(20) << __func__ << dendl;
6938 MonOpRequestRef blank;
6939 auto i = session_map.sessions.begin();
6940 while (i != session_map.sessions.end()) {
6941 auto j = i;
6942 ++i;
6943 session_stretch_allowed(*j, blank);
6944 }
6945 }